流式传输
Agents SDK 可以逐步输出模型及其他执行步骤的结果。流式传输可保持 UI 响应灵敏,避免在更新用户界面前等待整个最终结果。
启用流式传输
Section titled “启用流式传输”向 Runner.run() 传入 { stream: true } 选项以获取流式对象,而不是完整结果:
import { Agent, run } from '@openai/agents';
const agent = new Agent({ name: 'Storyteller', instructions: 'You are a storyteller. You will be given a topic and you will tell a story about it.',});
const result = await run(agent, 'Tell me a story about a cat.', { stream: true,});启用流式传输后,返回的 stream 实现了 AsyncIterable 接口。每个产出事件都是一个对象,描述运行过程中的发生情况。该流会产出三种事件类型之一,分别描述智能体执行的不同部分。多数应用只需要模型的文本,因此流提供了辅助方法。
获取文本输出
Section titled “获取文本输出”调用 stream.toTextStream() 获取已发出的文本流。
当 compatibleWithNodeStreams 为 true 时,返回值是常规的 Node.js Readable。我们可以将其直接管道到 process.stdout 或其他目标。
import { Agent, run } from '@openai/agents';
const agent = new Agent({ name: 'Storyteller', instructions: 'You are a storyteller. You will be given a topic and you will tell a story about it.',});
const result = await run(agent, 'Tell me a story about a cat.', { stream: true,});
result .toTextStream({ compatibleWithNodeStreams: true, }) .pipe(process.stdout);当运行和所有挂起的回调完成后,stream.completed 这个 promise 会被 resolve。若要确保没有更多输出,请务必等待它。
监听所有事件
Section titled “监听所有事件”您可以使用 for await 循环在事件到达时逐一检查。
有用的信息包括底层模型事件、任何智能体切换以及 SDK 特定的运行信息:
import { Agent, run } from '@openai/agents';
const agent = new Agent({ name: 'Storyteller', instructions: 'You are a storyteller. You will be given a topic and you will tell a story about it.',});
const result = await run(agent, 'Tell me a story about a cat.', { stream: true,});
for await (const event of result) { // these are the raw events from the model if (event.type === 'raw_model_stream_event') { console.log(`${event.type} %o`, event.data); } // agent updated events if (event.type === 'agent_updated_stream_event') { console.log(`${event.type} %s`, event.agent.name); } // Agent SDK specific events if (event.type === 'run_item_stream_event') { console.log(`${event.type} %o`, event.item); }}请参阅the streamed example,这是一个完整脚本,会同时打印纯文本流和原始事件流。
Responses WebSocket 传输(可选)
Section titled “Responses WebSocket 传输(可选)”本页的流式 API 同样适用于 OpenAI Responses WebSocket 传输。
使用 setOpenAIResponsesTransport('websocket') 全局启用,或使用带有 useResponsesWebSocket: true 的自定义 OpenAIProvider。
仅为了通过 WebSocket 进行流式传输,您不需要 withResponsesWebSocketSession(...) 或自定义 OpenAIProvider。如果允许在运行之间重新连接,启用该传输后,run() / Runner.run(..., { stream: true }) 依然有效。
当您需要连接复用和更明确的 provider 生命周期控制时,请使用 withResponsesWebSocketSession(...) 或自定义的 OpenAIProvider / Runner。
如果您自行构建 provider,请在关闭时记得调用 await provider.close()。基于 WebSocket 的模型封装默认会被缓存以复用,关闭 provider 会释放这些连接。withResponsesWebSocketSession(...) 可提供相同的复用,但自动将清理限定在单个回调范围内。
参见 examples/basic/stream-ws.ts,其中包含流式传输、工具调用、审批以及 previousResponseId 的完整示例。
该流会产出三种不同的事件类型:
raw_model_stream_event
Section titled “raw_model_stream_event”type RunRawModelStreamEvent = { type: 'raw_model_stream_event'; data: ResponseStreamEvent;};示例:
{ "type": "raw_model_stream_event", "data": { "type": "output_text_delta", "delta": "Hello" }}run_item_stream_event
Section titled “run_item_stream_event”type RunItemStreamEvent = { type: 'run_item_stream_event'; name: RunItemStreamEventName; item: RunItem;};name 标识生成了哪种条目:
name | 含义 |
|---|---|
message_output_created | 已创建一条消息输出条目。 |
handoff_requested | 模型请求一次交接。 |
handoff_occurred | 运行时已完成到另一智能体的交接。 |
tool_called | 已发出一个工具调用条目。 |
tool_output | 已发出一个工具结果条目。 |
reasoning_item_created | 已发出一个推理条目。 |
tool_approval_requested | 工具调用已暂停以等待人工审批。 |
交接负载示例:
{ "type": "run_item_stream_event", "name": "handoff_occurred", "item": { "type": "handoff_call", "id": "h1", "status": "completed", "name": "transfer_to_refund_agent" }}agent_updated_stream_event
Section titled “agent_updated_stream_event”type RunAgentUpdatedStreamEvent = { type: 'agent_updated_stream_event'; agent: Agent<any, any>;};示例:
{ "type": "agent_updated_stream_event", "agent": { "name": "Refund Agent" }}流式传输中的人工干预
Section titled “流式传输中的人工干预”流式传输与会暂停执行的交接兼容(例如当某个工具需要审批时)。流对象上的 interruption 字段会暴露这些中断,您可以通过对每个中断调用 state.approve() 或 state.reject() 来继续执行。使用 { stream: true } 再次执行即可恢复流式输出。
import { Agent, run } from '@openai/agents';
const agent = new Agent({ name: 'Storyteller', instructions: 'You are a storyteller. You will be given a topic and you will tell a story about it.',});
let stream = await run( agent, 'What is the weather in San Francisco and Oakland?', { stream: true },);stream.toTextStream({ compatibleWithNodeStreams: true }).pipe(process.stdout);await stream.completed;
while (stream.interruptions?.length) { console.log( 'Human-in-the-loop: approval required for the following tool calls:', ); const state = stream.state; for (const interruption of stream.interruptions) { const approved = confirm( `Agent ${interruption.agent.name} would like to use the tool ${interruption.name} with "${interruption.arguments}". Do you approve?`, ); if (approved) { state.approve(interruption); } else { state.reject(interruption); } }
// Resume execution with streaming output stream = await run(agent, state, { stream: true }); const textStream = stream.toTextStream({ compatibleWithNodeStreams: true }); textStream.pipe(process.stdout); await stream.completed;}一个与用户交互更完整的示例见
human-in-the-loop-stream.ts。
- 退出前请记得等待
stream.completed,以确保所有输出均已刷新。 - 初始的
{ stream: true }选项只适用于提供它的那次调用。若您使用RunState重新运行,必须再次指定该选项。 - 如果您的应用只关心文本结果,优先使用
toTextStream(),以避免处理单独的事件对象。
借助流式传输和事件系统,您可以将智能体集成到聊天界面、终端应用,或任何用户能从增量更新中获益的场景。