ストリーミング
Agents SDK は、モデルやその他の実行ステップからの出力を段階的に返せます。ストリーミングにより UI の応答性を保ち、最終結果全体を待たずにユーザーへの表示を更新できます。
ストリーミングの有効化
Section titled “ストリーミングの有効化”Runner.run() に { stream: true } オプションを渡すと、完全な結果ではなくストリーミングオブジェクトを取得できます。
import { Agent, run } from '@openai/agents';
const agent = new Agent({ name: 'Storyteller', instructions: 'You are a storyteller. You will be given a topic and you will tell a story about it.',});
const result = await run(agent, 'Tell me a story about a cat.', { stream: true,});ストリーミングを有効化すると、返される stream は AsyncIterable インターフェースを実装します。各 yield イベントは、その実行内で何が起きたかを示すオブジェクトです。ストリームは 3 種類のイベントを返し、それぞれがエージェント実行の異なる部分を表します。多くのアプリケーションではモデルのテキストだけが必要なため、ストリームにはヘルパーが用意されています。
テキスト出力の取得
Section titled “テキスト出力の取得”stream.toTextStream() を呼び出すと、出力されたテキストのストリームを取得できます。compatibleWithNodeStreams が true の場合、戻り値は通常の Node.js Readable です。これを process.stdout や他の出力先へ直接 pipe できます。
import { Agent, run } from '@openai/agents';
const agent = new Agent({ name: 'Storyteller', instructions: 'You are a storyteller. You will be given a topic and you will tell a story about it.',});
const result = await run(agent, 'Tell me a story about a cat.', { stream: true,});
result .toTextStream({ compatibleWithNodeStreams: true, }) .pipe(process.stdout);stream.completed の Promise は、実行と保留中のすべてのコールバックが完了すると resolve されます。これ以上出力がないことを保証したい場合は、必ず await してください。これには、最後のテキストトークン到着後に終わるセッション永続化や履歴圧縮フックなどの後処理も含まれます。
toTextStream() が出力するのは assistant のテキストのみです。ツール呼び出し、ハンドオフ、承認、その他のランタイムイベントは完全なイベントストリームから取得できます。
すべてのイベントの監視
Section titled “すべてのイベントの監視”for await ループを使って、到着した各イベントを確認できます。低レベルのモデルイベント、エージェントの切り替え、SDK 固有の実行情報などが得られます。
import { Agent, run } from '@openai/agents';
const agent = new Agent({ name: 'Storyteller', instructions: 'You are a storyteller. You will be given a topic and you will tell a story about it.',});
const result = await run(agent, 'Tell me a story about a cat.', { stream: true,});
for await (const event of result) { // these are the raw events from the model if (event.type === 'raw_model_stream_event') { console.log(`${event.type} %o`, event.data); } // agent updated events if (event.type === 'agent_updated_stream_event') { console.log(`${event.type} %s`, event.agent.name); } // Agent SDK specific events if (event.type === 'run_item_stream_event') { console.log(`${event.type} %o`, event.item); }}プレーンテキストストリームと元イベントストリームの両方を出力する、完全なスクリプトは streamed example を参照してください。
Responses WebSocket トランスポート(任意)
Section titled “Responses WebSocket トランスポート(任意)”このページのストリーミング API は OpenAI Responses WebSocket トランスポートでも動作します。
グローバルに有効化するには setOpenAIResponsesTransport('websocket') を使うか、useResponsesWebSocket: true を設定した独自の OpenAIProvider を使用します。
WebSocket 経由でストリーミングするだけなら、withResponsesWebSocketSession(...) やカスタム OpenAIProvider は不要です。実行間の再接続を許容できる場合、トランスポート有効化後も run() / Runner.run(..., { stream: true }) は動作します。
接続の再利用や、より明示的なプロバイダーライフサイクル制御が必要な場合は、withResponsesWebSocketSession(...) またはカスタム OpenAIProvider / Runner を使用してください。
previousResponseId を使った継続は、HTTP トランスポートと同じセマンティクスです。違いはトランスポートと接続ライフサイクルだけです。
プロバイダーを自作する場合、シャットダウン時に await provider.close() を呼ぶことを忘れないでください。Websocket ベースのモデルラッパーは既定で再利用のためにキャッシュされ、プロバイダーを閉じるとそれらの接続が解放されます。withResponsesWebSocketSession(...) は同じ再利用性を提供しつつ、クリーンアップを単一コールバックに自動でスコープします。
ストリーミング、ツール呼び出し、承認、previousResponseId を含む完全な例は examples/basic/stream-ws.ts を参照してください。
イベントタイプ
Section titled “イベントタイプ”ストリームは 3 種類のイベントを返します。
raw_model_stream_event
Section titled “raw_model_stream_event”import { isOpenAIChatCompletionsRawModelStreamEvent, isOpenAIResponsesRawModelStreamEvent, type RunStreamEvent,} from '@openai/agents';
export function logOpenAIRawModelEvent(event: RunStreamEvent) { if (isOpenAIResponsesRawModelStreamEvent(event)) { console.log(event.source); console.log(event.data.event.type); return; }
if (isOpenAIChatCompletionsRawModelStreamEvent(event)) { console.log(event.source); console.log(event.data.event.object); }}例:
{ "type": "raw_model_stream_event", "data": { "type": "output_text_delta", "delta": "Hello" }}OpenAI provider を使用している場合、@openai/agents-openai と @openai/agents はどちらも、agents-core の汎用 RunRawModelStreamEvent 契約を変更せずに、生の OpenAI ペイロードを絞り込むヘルパーをエクスポートしています。
import type { RunStreamEvent } from '@openai/agents';import { isOpenAIResponsesRawModelStreamEvent } from '@openai/agents';
export function isOpenAIResponsesTextDelta(event: RunStreamEvent): boolean { return ( isOpenAIResponsesRawModelStreamEvent(event) && event.data.event.type === 'response.output_text.delta' );}トランスポート非依存のストリーミングコードだけが必要な場合、event.type === 'raw_model_stream_event' を確認するだけで十分です。
OpenAI モデルを使用し、手動キャストなしで provider 固有のペイロードを確認したい場合、SDK は絞り込みヘルパーもエクスポートしています。
- Responses の生イベント用
isOpenAIResponsesRawModelStreamEvent(event) - Chat Completions のチャンク用
isOpenAIChatCompletionsRawModelStreamEvent(event)
これらの OpenAI モデルイベントでは、RunRawModelStreamEvent.source に 'openai-responses' または 'openai-chat-completions' も設定されます。
これは、response.reasoning_summary_text.delta、response.output_item.done、MCP 引数 delta など Responses 専用イベントを確認しつつ、基盤イベント形状を TypeScript に認識させたい場合に特に有用です。
より充実した OpenAI 固有のストリーミングパターンは examples/basic/stream-ws.ts、examples/tools/code-interpreter.ts、examples/connectors/index.ts を参照してください。
run_item_stream_event
Section titled “run_item_stream_event”import type { RunItemStreamEvent, RunStreamEvent } from '@openai/agents';
export function isRunItemStreamEvent( event: RunStreamEvent,): event is RunItemStreamEvent { return event.type === 'run_item_stream_event';}name は、どの種類の item が生成されたかを識別します。
name | 意味 |
|---|---|
message_output_created | メッセージ出力 item が作成された |
handoff_requested | モデルがハンドオフを要求した |
handoff_occurred | ランタイムが別のエージェントへのハンドオフを完了した |
tool_search_called | tool_search_call item が出力された |
tool_search_output_created | 読み込まれたツール定義を含む tool_search_output item が出力された |
tool_called | ツール呼び出し item が出力された |
tool_output | ツール結果 item が出力された |
reasoning_item_created | reasoning item が出力された |
tool_approval_requested | ツール呼び出しが人間の承認待ちで一時停止した |
tool_search_* イベントは、実行中に toolSearchTool() を使って遅延ツールを読み込む Responses 実行でのみ表示されます。
ハンドオフペイロードの例:
{ "type": "run_item_stream_event", "name": "handoff_occurred", "item": { "type": "handoff_call", "id": "h1", "status": "completed", "name": "transfer_to_refund_agent" }}agent_updated_stream_event
Section titled “agent_updated_stream_event”import type { RunAgentUpdatedStreamEvent, RunStreamEvent,} from '@openai/agents';
export function isRunAgentUpdatedStreamEvent( event: RunStreamEvent,): event is RunAgentUpdatedStreamEvent { return event.type === 'agent_updated_stream_event';}例:
{ "type": "agent_updated_stream_event", "agent": { "name": "Refund Agent" }}ストリーミング中の Human in the loop (人間の介入)
Section titled “ストリーミング中の Human in the loop (人間の介入)”ストリーミングは、実行を一時停止するハンドオフ(例: ツールが承認を必要とする場合)に対応しています。ストリームオブジェクトの interruptions フィールドで保留中の承認を参照でき、それぞれに対して state.approve() または state.reject() を呼んで実行を継続できます。ストリームが停止すると stream.completed が resolve され、stream.interruptions に処理すべき承認が入ります。{ stream: true } で再実行すると、ストリーミング出力が再開されます。
import { Agent, run } from '@openai/agents';
const agent = new Agent({ name: 'Storyteller', instructions: 'You are a storyteller. You will be given a topic and you will tell a story about it.',});
let stream = await run( agent, 'What is the weather in San Francisco and Oakland?', { stream: true },);stream.toTextStream({ compatibleWithNodeStreams: true }).pipe(process.stdout);await stream.completed;
while (stream.interruptions?.length) { console.log( 'Human-in-the-loop: approval required for the following tool calls:', ); const state = stream.state; for (const interruption of stream.interruptions) { const approved = confirm( `Agent ${interruption.agent.name} would like to use the tool ${interruption.name} with "${interruption.arguments}". Do you approve?`, ); if (approved) { state.approve(interruption); } else { state.reject(interruption); } }
// Resume execution with streaming output stream = await run(agent, state, { stream: true }); const textStream = stream.toTextStream({ compatibleWithNodeStreams: true }); textStream.pipe(process.stdout); await stream.completed;}ユーザーと対話する、より完全な例は human-in-the-loop-stream.ts です。
- すべての出力が flush 済みであることを保証するため、終了前に
stream.completedを待機する - 初回の
{ stream: true }オプションは指定した呼び出しにのみ適用されます。RunStateで再実行する場合は再度オプションを指定する - アプリケーションがテキスト結果のみを必要とする場合、個別のイベントオブジェクト処理を避けるため
toTextStream()を優先する
ストリーミングとイベントシステムを使うことで、エージェントをチャットインターフェース、ターミナルアプリケーション、または段階的更新が有益なあらゆる場所に統合できます。