コンテンツにスキップ

ストリーミング

Agents SDK は、モデルおよびその他の実行ステップからの出力を段階的に配信できます。ストリーミングにより UI の応答性を保ち、最終結果全体を待たずにユーザーへの表示を更新できます。

Runner.run(){ stream: true } オプションを渡すと、完全な実行結果ではなくストリーミングオブジェクトを取得できます。

ストリーミングの有効化
import { Agent, run } from '@openai/agents';
const agent = new Agent({
name: 'Storyteller',
instructions:
'You are a storyteller. You will be given a topic and you will tell a story about it.',
});
const result = await run(agent, 'Tell me a story about a cat.', {
stream: true,
});

ストリーミングを有効にすると、返される streamAsyncIterable インターフェイスを実装します。生成される各イベントは、その実行内で何が起きたかを説明するオブジェクトです。ストリームは 3 種類のイベント型のいずれかを返し、それぞれがエージェントの実行の異なる部分を表します。ただし、多くのアプリケーションが必要とするのはモデルのテキストだけなので、ストリームにはヘルパーが用意されています。

stream.toTextStream() を呼び出して、発行されたテキストのストリームを取得します。compatibleWithNodeStreamstrue の場合、戻り値は通常の Node.js Readable です。これを process.stdout や別の出力先へ直接パイプできます。

到着したテキストのログ出力
import { Agent, run } from '@openai/agents';
const agent = new Agent({
name: 'Storyteller',
instructions:
'You are a storyteller. You will be given a topic and you will tell a story about it.',
});
const result = await run(agent, 'Tell me a story about a cat.', {
stream: true,
});
result
.toTextStream({
compatibleWithNodeStreams: true,
})
.pipe(process.stdout);

Promise である stream.completed は、実行と保留中のすべてのコールバックが完了すると解決されます。これ以上出力がないことを確実にしたい場合は、必ず await してください。これには、最後のテキストトークンが到着した後に完了する、セッションの永続化や履歴圧縮フックなどの後処理も含まれます。

toTextStream() はアシスタントのテキストのみを発行します。ツール呼び出し、ハンドオフ、承認、その他のランタイムイベントは、イベントストリーム全体から利用できます。

for await ループを使って、到着した各イベントを調べることができます。有用な情報には、低レベルなモデルイベント、エージェントの切り替え、SDK 固有の実行情報などがあります。

全イベントのリスニング
import { Agent, run } from '@openai/agents';
const agent = new Agent({
name: 'Storyteller',
instructions:
'You are a storyteller. You will be given a topic and you will tell a story about it.',
});
const result = await run(agent, 'Tell me a story about a cat.', {
stream: true,
});
for await (const event of result) {
// these are the raw events from the model
if (event.type === 'raw_model_stream_event') {
console.log(`${event.type} %o`, event.data);
}
// agent updated events
if (event.type === 'agent_updated_stream_event') {
console.log(`${event.type} %s`, event.agent.name);
}
// Agent SDK specific events
if (event.type === 'run_item_stream_event') {
console.log(`${event.type} %o`, event.item);
}
}

プレーンテキストストリームと元のイベントストリームの両方を出力する完成したスクリプトについては、ストリーミングのコード例を参照してください。

Responses WebSocket トランスポート(任意)

Section titled “Responses WebSocket トランスポート(任意)”

このページのストリーミング API は、OpenAI Responses WebSocket トランスポートでも動作します。

setOpenAIResponsesTransport('websocket') でグローバルに有効化するか、useResponsesWebSocket: true を指定した独自の OpenAIProvider を使用してください。

WebSocket 経由でストリーミングするだけであれば、withResponsesWebSocketSession(...) やカスタム OpenAIProvider は必要ありません。実行間の再接続が許容できる場合は、トランスポートを有効化した後も run() / Runner.run(..., { stream: true }) が引き続き動作します。

接続の再利用と、より明示的なプロバイダーライフサイクル制御が必要な場合は、withResponsesWebSocketSession(...) またはカスタム OpenAIProvider / Runner を使用してください。

previousResponseId による継続は、HTTP トランスポートと同じセマンティクスを使用します。違いはトランスポートと接続ライフサイクルだけです。

プロバイダーを自分で構築する場合は、シャットダウン時に await provider.close() を呼び出すことを忘れないでください。WebSocket ベースのモデルラッパーは、デフォルトで再利用のためにキャッシュされます。プロバイダーを閉じると、それらの接続が解放されます。withResponsesWebSocketSession(...) でも同じ再利用が可能ですが、クリーンアップの範囲は単一のコールバックに自動的に限定されます。

ストリーミング、ツール呼び出し、承認、previousResponseId を含む完全なコード例については、examples/basic/stream-ws.tsを参照してください。

ストリームは 3 つの異なるイベント型を返します。

RunRawModelStreamEvent
import {
isOpenAIChatCompletionsRawModelStreamEvent,
isOpenAIResponsesRawModelStreamEvent,
type RunStreamEvent,
} from '@openai/agents';
export function logOpenAIRawModelEvent(event: RunStreamEvent) {
if (isOpenAIResponsesRawModelStreamEvent(event)) {
console.log(event.source);
console.log(event.data.event.type);
return;
}
if (isOpenAIChatCompletionsRawModelStreamEvent(event)) {
console.log(event.source);
console.log(event.data.event.object);
}
}

例:

{
"type": "raw_model_stream_event",
"data": {
"type": "output_text_delta",
"delta": "Hello"
}
}

OpenAI プロバイダーを使用している場合、@openai/agents-openai@openai/agents はどちらも、agents-core における汎用の RunRawModelStreamEvent コントラクトを変更せずに、元の OpenAI ペイロードを絞り込むヘルパーをエクスポートします。

OpenAI の元のモデルイベントの絞り込み
import type { RunStreamEvent } from '@openai/agents';
import { isOpenAIResponsesRawModelStreamEvent } from '@openai/agents';
export function isOpenAIResponsesTextDelta(event: RunStreamEvent): boolean {
return (
isOpenAIResponsesRawModelStreamEvent(event) &&
event.data.event.type === 'response.output_text.delta'
);
}

トランスポート非依存のストリーミングコードだけが必要な場合は、event.type === 'raw_model_stream_event' を確認するだけで十分です。

OpenAI モデルを使用しており、手動キャストなしでプロバイダー固有のペイロードを調べたい場合、SDK は絞り込みヘルパーもエクスポートします。

  • isOpenAIResponsesRawModelStreamEvent(event) は Responses の元イベント用です。
  • isOpenAIChatCompletionsRawModelStreamEvent(event) は Chat Completions チャンク用です。

これらの OpenAI モデルイベントでは、RunRawModelStreamEvent.source にも 'openai-responses' または 'openai-chat-completions' のいずれかが設定されます。

これは、TypeScript が基礎となるイベントの形状を認識できる状態を保ちながら、response.reasoning_summary_text.deltaresponse.output_item.done、MCP 引数のデルタなど、Responses 専用イベントを調べたい場合に特に便利です。

より充実した OpenAI 固有のストリーミングパターンについては、examples/basic/stream-ws.tsexamples/tools/code-interpreter.tsexamples/connectors/index.tsを参照してください。

RunItemStreamEvent
import type { RunItemStreamEvent, RunStreamEvent } from '@openai/agents';
export function isRunItemStreamEvent(
event: RunStreamEvent,
): event is RunItemStreamEvent {
return event.type === 'run_item_stream_event';
}

name は、どの種類のアイテムが生成されたかを識別します。

name意味
message_output_createdメッセージ出力アイテムが作成されました。
handoff_requestedモデルがハンドオフをリクエストしました。
handoff_occurredランタイムが別のエージェントへのハンドオフを完了しました。
tool_search_calledtool_search_call アイテムが発行されました。
tool_search_output_created読み込まれたツール定義を含む tool_search_output アイテムが発行されました。
tool_calledツール呼び出しアイテムが発行されました。
tool_outputツール結果アイテムが発行されました。
reasoning_item_created推論アイテムが発行されました。
tool_approval_requestedツール呼び出しが人間の承認のために一時停止しました。

tool_search_* イベントは、実行中に遅延ツールを読み込むために toolSearchTool() を使用する Responses の実行でのみ発生します。

ハンドオフペイロードの例:

{
"type": "run_item_stream_event",
"name": "handoff_occurred",
"item": {
"type": "handoff_call",
"id": "h1",
"status": "completed",
"name": "transfer_to_refund_agent"
}
}
RunAgentUpdatedStreamEvent
import type {
RunAgentUpdatedStreamEvent,
RunStreamEvent,
} from '@openai/agents';
export function isRunAgentUpdatedStreamEvent(
event: RunStreamEvent,
): event is RunAgentUpdatedStreamEvent {
return event.type === 'agent_updated_stream_event';
}

例:

{
"type": "agent_updated_stream_event",
"agent": {
"name": "Refund Agent"
}
}

ストリーミング中の Human in the loop (人間の介入)

Section titled “ストリーミング中の Human in the loop (人間の介入)”

ストリーミングは、実行を一時停止するハンドオフ(たとえばツールが承認を必要とする場合)と互換性があります。ストリームオブジェクトの interruptions フィールドは保留中の承認を公開し、それぞれに対して state.approve() または state.reject() を呼び出すことで実行を続行できます。ストリームが一時停止した後、stream.completed が解決され、stream.interruptions には処理すべき承認が含まれます。{ stream: true } で再度実行すると、ストリーミング出力が再開されます。

ストリーミング中の人間による承認の処理
import { Agent, run } from '@openai/agents';
const agent = new Agent({
name: 'Storyteller',
instructions:
'You are a storyteller. You will be given a topic and you will tell a story about it.',
});
let stream = await run(
agent,
'What is the weather in San Francisco and Oakland?',
{ stream: true },
);
stream.toTextStream({ compatibleWithNodeStreams: true }).pipe(process.stdout);
await stream.completed;
while (stream.interruptions?.length) {
console.log(
'Human-in-the-loop: approval required for the following tool calls:',
);
const state = stream.state;
for (const interruption of stream.interruptions) {
const approved = confirm(
`Agent ${interruption.agent.name} would like to use the tool ${interruption.name} with "${interruption.arguments}". Do you approve?`,
);
if (approved) {
state.approve(interruption);
} else {
state.reject(interruption);
}
}
// Resume execution with streaming output
stream = await run(agent, state, { stream: true });
const textStream = stream.toTextStream({ compatibleWithNodeStreams: true });
textStream.pipe(process.stdout);
await stream.completed;
}

ユーザーとやり取りする、より充実したコード例は human-in-the-loop-stream.ts です。

ストリームの停止と同じターンの継続

Section titled “ストリームの停止と同じターンの継続”

ストリーミング実行を早めに停止するには、run() に渡した signal を abort するか、stream.toStream() から作成した reader をキャンセルします。どちらの場合でも、実行が確定したとみなす前に stream.completed を await してください。コードがイベントの消費を停止した後も、SDK は現在のターン入力の永続化やその他のクリーンアップを完了している途中の場合があります。

ストリームがキャンセルされると、stream.cancelledtrue になり、現在のターンが完了していないため stream.finalOutput は多くの場合 undefined のままです。後でその未完了のターンを続行したい場合は、新しいユーザーメッセージを追加するのではなく、同じエージェントを stream.state で再実行してください。これによりターン数のカウントが正しく保たれ、RunState にすでに保存されている conversationIdpreviousResponseId が再利用されます。

セッションの永続化も使用している場合は、再開した run() 呼び出しで同じ session をもう一度渡し、会話が同じバッキングストアへ書き込み続けるようにしてください。

承認による一時停止も同じルールに従います。stream.interruptions を解決してから、新しいターンを開始するのではなく stream.state から再開してください。

  • 終了する前に stream.completed を待機し、すべての出力がフラッシュされたことを確認してください。
  • 最初の { stream: true } オプションは、それが指定された呼び出しにのみ適用されます。RunState で再実行する場合は、このオプションを再度指定する必要があります。
  • アプリケーションがテキストの結果だけを扱う場合は、個々のイベントオブジェクトを処理しなくて済むように toTextStream() を使用することをおすすめします。

ストリーミングとイベントシステムにより、エージェントをチャットインターフェイスやターミナルアプリケーション、または段階的な更新がユーザーに役立つあらゆる場所へ統合できます。