流式传输
Agents SDK 可以逐步传递模型输出以及其他执行步骤的结果。流式传输可让您的 UI 保持响应,并避免在更新用户之前等待完整的最终结果。
启用流式传输
Section titled “启用流式传输”向 Runner.run() 传入 { stream: true } 选项,即可获得一个流式对象,而不是完整结果:
import { Agent, run } from '@openai/agents';
const agent = new Agent({ name: 'Storyteller', instructions: 'You are a storyteller. You will be given a topic and you will tell a story about it.',});
const result = await run(agent, 'Tell me a story about a cat.', { stream: true,});启用流式传输后,返回的 stream 会实现 AsyncIterable 接口。每个产生的事件都是一个对象,用于描述运行过程中发生的情况。该流会产生三种事件类型,每种都对应智能体执行过程中的不同部分。不过,大多数应用通常只关心模型文本,因此该流也提供了辅助方法。
获取文本输出
Section titled “获取文本输出”调用 stream.toTextStream() 可获得已发出的文本流。当 compatibleWithNodeStreams 为 true 时,返回值是一个标准的 Node.js Readable。我们可以将其直接管道传输到 process.stdout 或其他目标。
import { Agent, run } from '@openai/agents';
const agent = new Agent({ name: 'Storyteller', instructions: 'You are a storyteller. You will be given a topic and you will tell a story about it.',});
const result = await run(agent, 'Tell me a story about a cat.', { stream: true,});
result .toTextStream({ compatibleWithNodeStreams: true, }) .pipe(process.stdout);当运行以及所有待处理回调都完成后,Promise stream.completed 会被解析。如果您想确保不会再有更多输出,请始终等待它。这包括一些后处理工作,例如会话持久化或历史压缩钩子,这些工作可能会在最后一个文本 token 到达后才完成。
toTextStream() 只会发出助手文本。工具调用、交接、审批及其他运行时事件可从完整事件流中获取。
监听所有事件
Section titled “监听所有事件”您可以使用 for await 循环在每个事件到达时进行检查。其中的有用信息包括底层模型事件、任何智能体切换以及 SDK 特有的运行信息:
import { Agent, run } from '@openai/agents';
const agent = new Agent({ name: 'Storyteller', instructions: 'You are a storyteller. You will be given a topic and you will tell a story about it.',});
const result = await run(agent, 'Tell me a story about a cat.', { stream: true,});
for await (const event of result) { // these are the raw events from the model if (event.type === 'raw_model_stream_event') { console.log(`${event.type} %o`, event.data); } // agent updated events if (event.type === 'agent_updated_stream_event') { console.log(`${event.type} %s`, event.agent.name); } // Agent SDK specific events if (event.type === 'run_item_stream_event') { console.log(`${event.type} %o`, event.item); }}有关完整脚本示例(同时打印纯文本流和原始事件流),请参见流式传输示例。
Responses WebSocket 传输(可选)
Section titled “Responses WebSocket 传输(可选)”本页中的流式传输 API 也适用于 OpenAI Responses WebSocket 传输。
可通过 setOpenAIResponsesTransport('websocket') 全局启用,或者使用您自己的 OpenAIProvider 并设置 useResponsesWebSocket: true。
您不需要仅仅为了通过 WebSocket 进行流式传输而使用 withResponsesWebSocketSession(...) 或自定义 OpenAIProvider。如果运行之间允许重新连接,那么在启用该传输后,run() / Runner.run(..., { stream: true }) 仍然可以正常工作。
当您希望复用连接,并对 provider 生命周期进行更明确的控制时,请使用 withResponsesWebSocketSession(...) 或自定义 OpenAIProvider / Runner。
通过 previousResponseId 进行续接时,其语义与 HTTP 传输相同。区别仅在于传输方式和连接生命周期。
如果您自行构建 provider,请记得在关闭时调用 await provider.close()。默认情况下,基于 Websocket 的模型包装器会被缓存以供复用,而关闭 provider 会释放这些连接。withResponsesWebSocketSession(...) 也提供相同的复用能力,但会自动将清理范围限定在单个回调内。
完整示例(包含流式传输、工具调用、审批和 previousResponseId)请参见 examples/basic/stream-ws.ts。
该流会产生三种不同的事件类型:
raw_model_stream_event
Section titled “raw_model_stream_event”import { isOpenAIChatCompletionsRawModelStreamEvent, isOpenAIResponsesRawModelStreamEvent, type RunStreamEvent,} from '@openai/agents';
export function logOpenAIRawModelEvent(event: RunStreamEvent) { if (isOpenAIResponsesRawModelStreamEvent(event)) { console.log(event.source); console.log(event.data.event.type); return; }
if (isOpenAIChatCompletionsRawModelStreamEvent(event)) { console.log(event.source); console.log(event.data.event.object); }}示例:
{ "type": "raw_model_stream_event", "data": { "type": "output_text_delta", "delta": "Hello" }}如果您使用的是 OpenAI provider,@openai/agents-openai 和 @openai/agents 都会导出辅助方法,用于缩小原始 OpenAI 载荷的类型范围,同时不会改变 agents-core 中通用的 RunRawModelStreamEvent 契约。
import type { RunStreamEvent } from '@openai/agents';import { isOpenAIResponsesRawModelStreamEvent } from '@openai/agents';
export function isOpenAIResponsesTextDelta(event: RunStreamEvent): boolean { return ( isOpenAIResponsesRawModelStreamEvent(event) && event.data.event.type === 'response.output_text.delta' );}当您只需要与传输无关的流式传输代码时,检查 event.type === 'raw_model_stream_event' 仍然足够。
如果您使用 OpenAI 模型,并希望在不手动类型转换的情况下检查 provider 特有的载荷,SDK 还导出了类型收窄辅助方法:
isOpenAIResponsesRawModelStreamEvent(event):用于 Responses 原始事件。isOpenAIChatCompletionsRawModelStreamEvent(event):用于 Chat Completions 分块。
对于这些 OpenAI 模型事件,RunRawModelStreamEvent.source 也会被填充为 'openai-responses' 或 'openai-chat-completions'。
当您希望检查仅属于 Responses 的事件(如 response.reasoning_summary_text.delta、response.output_item.done 或 MCP 参数增量),同时保持 TypeScript 知晓底层事件结构时,这一点尤其有用。
更完整的 OpenAI 特定流式传输模式,请参见 examples/basic/stream-ws.ts、examples/tools/code-interpreter.ts 和 examples/connectors/index.ts。
run_item_stream_event
Section titled “run_item_stream_event”import type { RunItemStreamEvent, RunStreamEvent } from '@openai/agents';
export function isRunItemStreamEvent( event: RunStreamEvent,): event is RunItemStreamEvent { return event.type === 'run_item_stream_event';}name 用于标识产出的条目类型:
name | 含义 |
|---|---|
message_output_created | 已创建消息输出条目。 |
handoff_requested | 模型请求了一次交接。 |
handoff_occurred | 运行时已完成向另一个智能体的交接。 |
tool_search_called | 发出了一个 tool_search_call 条目。 |
tool_search_output_created | 发出了一个包含已加载工具定义的 tool_search_output 条目。 |
tool_called | 发出了一个工具调用条目。 |
tool_output | 发出了一个工具结果条目。 |
reasoning_item_created | 发出了一个推理条目。 |
tool_approval_requested | 一个工具调用因等待人工审批而暂停。 |
tool_search_* 事件仅会出现在使用 toolSearchTool() 在运行期间加载延迟工具的 Responses 运行中。
交接载荷示例:
{ "type": "run_item_stream_event", "name": "handoff_occurred", "item": { "type": "handoff_call", "id": "h1", "status": "completed", "name": "transfer_to_refund_agent" }}agent_updated_stream_event
Section titled “agent_updated_stream_event”import type { RunAgentUpdatedStreamEvent, RunStreamEvent,} from '@openai/agents';
export function isRunAgentUpdatedStreamEvent( event: RunStreamEvent,): event is RunAgentUpdatedStreamEvent { return event.type === 'agent_updated_stream_event';}示例:
{ "type": "agent_updated_stream_event", "agent": { "name": "Refund Agent" }}流式传输中的人机协作
Section titled “流式传输中的人机协作”流式传输兼容会暂停执行的交接(例如工具需要审批时)。流对象上的 interruptions 字段会暴露待处理的审批项,您可以对每一项调用 state.approve() 或 state.reject() 以继续执行。流暂停后,stream.completed 会被解析,而 stream.interruptions 会包含需要处理的审批项。再次以 { stream: true } 执行时,会恢复流式输出。
import { Agent, run } from '@openai/agents';
const agent = new Agent({ name: 'Storyteller', instructions: 'You are a storyteller. You will be given a topic and you will tell a story about it.',});
let stream = await run( agent, 'What is the weather in San Francisco and Oakland?', { stream: true },);stream.toTextStream({ compatibleWithNodeStreams: true }).pipe(process.stdout);await stream.completed;
while (stream.interruptions?.length) { console.log( 'Human-in-the-loop: approval required for the following tool calls:', ); const state = stream.state; for (const interruption of stream.interruptions) { const approved = confirm( `Agent ${interruption.agent.name} would like to use the tool ${interruption.name} with "${interruption.arguments}". Do you approve?`, ); if (approved) { state.approve(interruption); } else { state.reject(interruption); } }
// Resume execution with streaming output stream = await run(agent, state, { stream: true }); const textStream = stream.toTextStream({ compatibleWithNodeStreams: true }); textStream.pipe(process.stdout); await stream.completed;}与用户交互的更完整示例请参见 human-in-the-loop-stream.ts。
停止流并继续同一轮对话
Section titled “停止流并继续同一轮对话”如果要提前停止一次流式运行,请中止您传给 run() 的 signal,或取消由 stream.toStream() 创建的 reader。无论采用哪种方式,在将该运行视为已稳定结束前,仍应等待 stream.completed。即使您的代码已停止消费事件,SDK 仍可能正在持久化当前轮次输入,或完成其他清理工作。
当流被取消时,stream.cancelled 会变为 true,而 stream.finalOutput 通常会保持为 undefined,因为当前轮次尚未完成。如果您之后想继续这个未完成的轮次,请使用 stream.state 重新运行相同的智能体,而不是追加一条新的用户消息。这样可以保持轮次计数正确,并复用已存储在 RunState 中的 conversationId 或 previousResponseId。
如果您还在使用会话持久化,请在恢复后的 run() 调用中再次传入同一个 session,这样对话就会继续写入同一个后端存储。
审批暂停也遵循相同规则:先处理 stream.interruptions,然后从 stream.state 恢复,而不是开始新一轮对话。
- 请记得在退出前等待
stream.completed,以确保所有输出都已刷新。 - 初始的
{ stream: true }选项只对传入它的那次调用生效。如果您使用RunState重新运行,则必须再次指定该选项。 - 如果您的应用只关心文本结果,优先使用
toTextStream(),以避免处理单独的事件对象。
借助流式传输和事件系统,您可以将智能体集成到聊天界面、终端应用程序,或任何用户能从增量更新中受益的场景中。