コンテンツにスキップ

ストリーミング

Agents SDK は、モデルおよびその他の実行ステップからの出力を段階的に配信できます。ストリーミングにより UI を応答性良く保ち、エージェントの最終的な実行結果をすべて待たずに ユーザー を更新できます。

Runner.run(){ stream: true } オプションを渡すと、完全な結果ではなくストリーミングオブジェクトを取得します:

Enabling streaming
import { Agent, run } from '@openai/agents';
const agent = new Agent({
name: 'Storyteller',
instructions:
'You are a storyteller. You will be given a topic and you will tell a story about it.',
});
const result = await run(agent, 'Tell me a story about a cat.', {
stream: true,
});

ストリーミングが有効な場合、返される streamAsyncIterable インターフェースを実装します。各イベントは、実行内で何が起きたかを示すオブジェクトです。ストリームは 3 種類のイベントタイプのいずれかを生成し、エージェントの実行の異なる部分を記述します。多くのアプリケーションはモデルのテキストのみを必要とするため、ストリームはそのためのヘルパーを提供します。

stream.toTextStream() を呼び出すと、生成されたテキストのストリームを取得できます。compatibleWithNodeStreamstrue の場合、戻り値は通常の Node.js の Readable です。process.stdout などへ直接パイプできます。

Logging out the text as it arrives
import { Agent, run } from '@openai/agents';
const agent = new Agent({
name: 'Storyteller',
instructions:
'You are a storyteller. You will be given a topic and you will tell a story about it.',
});
const result = await run(agent, 'Tell me a story about a cat.', {
stream: true,
});
result
.toTextStream({
compatibleWithNodeStreams: true,
})
.pipe(process.stdout);

stream.completed の Promise は、実行および保留中のすべてのコールバックが完了すると解決されます。出力がもうないことを確実にするには、必ず await してください。

for await ループを使用して、到着した各イベントを検査できます。低レベルのモデルイベント、エージェントの切り替え、SDK 固有の実行情報などが役立ちます:

Listening to all events
import { Agent, run } from '@openai/agents';
const agent = new Agent({
name: 'Storyteller',
instructions:
'You are a storyteller. You will be given a topic and you will tell a story about it.',
});
const result = await run(agent, 'Tell me a story about a cat.', {
stream: true,
});
for await (const event of result) {
// these are the raw events from the model
if (event.type === 'raw_model_stream_event') {
console.log(`${event.type} %o`, event.data);
}
// agent updated events
if (event.type === 'agent_updated_stream_event') {
console.log(`${event.type} %s`, event.agent.name);
}
// Agent SDK specific events
if (event.type === 'run_item_stream_event') {
console.log(`${event.type} %o`, event.item);
}
}

ストリーミングのプレーンテキストと 元 のイベントストリームの両方を出力する完全なスクリプトについては、the streamed example を参照してください。

Responses WebSocket トランスポート(任意)

Section titled “Responses WebSocket トランスポート(任意)”

このページのストリーミング API は、OpenAI Responses WebSocket トランスポートでも動作します。

setOpenAIResponsesTransport('websocket') でグローバルに有効化するか、useResponsesWebSocket: true を指定した独自の OpenAIProvider を使用します。

WebSocket でストリーミングするだけなら、withResponsesWebSocketSession(...) やカスタムの OpenAIProvider は不要です。実行間での再接続が許容できるなら、トランスポートを有効化した後でも run() / Runner.run(..., { stream: true }) はそのまま動作します。

接続の再利用やより明示的なプロバイダーのライフサイクル管理が必要な場合は、withResponsesWebSocketSession(...) またはカスタムの OpenAIProvider / Runner を使用してください。

プロバイダーを自分で構築した場合は、シャットダウン時に await provider.close() を呼び出すことを忘れないでください。WebSocket バックエンドのモデルラッパーはデフォルトで再利用のためキャッシュされ、プロバイダーを閉じるとそれらの接続が解放されます。withResponsesWebSocketSession(...) は同様の再利用を提供しつつ、クリーンアップを単一のコールバックに自動的にスコープします。

ストリーミング、ツール呼び出し、承認、previousResponseId を含む完全な code examples については、examples/basic/stream-ws.ts を参照してください。

ストリームは 3 種類の異なるイベントタイプを生成します:

type RunRawModelStreamEvent = {
type: 'raw_model_stream_event';
data: ResponseStreamEvent;
};

例:

{
"type": "raw_model_stream_event",
"data": {
"type": "output_text_delta",
"delta": "Hello"
}
}
type RunItemStreamEvent = {
type: 'run_item_stream_event';
name: RunItemStreamEventName;
item: RunItem;
};

name は、生成されたアイテムの種類を識別します:

name意味
message_output_createdメッセージ出力アイテムが作成された
handoff_requestedモデルがハンドオフを要求した
handoff_occurredランタイムが別のエージェントへのハンドオフを完了した
tool_calledツール呼び出しアイテムが出力された
tool_outputツールの結果アイテムが出力された
reasoning_item_createdリーズニングアイテムが出力された
tool_approval_requestedツール呼び出しが人による承認のために一時停止した

ハンドオフのペイロード例:

{
"type": "run_item_stream_event",
"name": "handoff_occurred",
"item": {
"type": "handoff_call",
"id": "h1",
"status": "completed",
"name": "transfer_to_refund_agent"
}
}
type RunAgentUpdatedStreamEvent = {
type: 'agent_updated_stream_event';
agent: Agent<any, any>;
};

例:

{
"type": "agent_updated_stream_event",
"agent": {
"name": "Refund Agent"
}
}

ストリーミング中の Human in the loop(人間の介入)

Section titled “ストリーミング中の Human in the loop(人間の介入)”

ストリーミングは、実行を一時停止するハンドオフ(例えばツールが承認を必要とする場合)と互換性があります。ストリームオブジェクト上の interruption フィールドで割り込みにアクセスでき、各割り込みに対して state.approve() または state.reject() を呼び出すことで実行を継続できます。{ stream: true } で再実行するとストリーミング出力が再開します。

Handling human approval while streaming
import { Agent, run } from '@openai/agents';
const agent = new Agent({
name: 'Storyteller',
instructions:
'You are a storyteller. You will be given a topic and you will tell a story about it.',
});
let stream = await run(
agent,
'What is the weather in San Francisco and Oakland?',
{ stream: true },
);
stream.toTextStream({ compatibleWithNodeStreams: true }).pipe(process.stdout);
await stream.completed;
while (stream.interruptions?.length) {
console.log(
'Human-in-the-loop: approval required for the following tool calls:',
);
const state = stream.state;
for (const interruption of stream.interruptions) {
const approved = confirm(
`Agent ${interruption.agent.name} would like to use the tool ${interruption.name} with "${interruption.arguments}". Do you approve?`,
);
if (approved) {
state.approve(interruption);
} else {
state.reject(interruption);
}
}
// Resume execution with streaming output
stream = await run(agent, state, { stream: true });
const textStream = stream.toTextStream({ compatibleWithNodeStreams: true });
textStream.pipe(process.stdout);
await stream.completed;
}

ユーザー と対話する、より完全な code examples は human-in-the-loop-stream.ts を参照してください。

  • すべての出力がフラッシュされることを確実にするため、終了前に stream.completed を待つことを忘れないでください
  • 最初の { stream: true } オプションは、それを指定した呼び出しにのみ適用されます。RunState で再実行する場合は、再度このオプションを指定する必要があります
  • アプリケーションがテキスト結果のみに関心がある場合は、個々のイベントオブジェクトを扱わずに済むように toTextStream() を優先してください

ストリーミングとイベントシステムを使うことで、エージェントをチャットインターフェースやターミナルアプリケーション、増分更新が ユーザー に有益なあらゆる場面に統合できます。