跳转到内容

流式传输

Agents SDK 可以增量地交付来自模型以及其他执行步骤的输出。流式传输可以让你的 UI 保持响应,并避免等到完整最终结果生成后才向用户更新内容。

Runner.run() 传入 { stream: true } 选项,即可获得流式对象,而不是完整结果:

启用流式传输
import { Agent, run } from '@openai/agents';
const agent = new Agent({
name: 'Storyteller',
instructions:
'You are a storyteller. You will be given a topic and you will tell a story about it.',
});
const result = await run(agent, 'Tell me a story about a cat.', {
stream: true,
});

启用流式传输后,返回的 stream 会实现 AsyncIterable 接口。每个产出的事件都是一个对象,用于描述运行过程中发生的事情。该流会产出三种事件类型之一,每种类型描述智能体执行过程中的不同部分。不过,大多数应用只需要模型文本,因此该流提供了一些辅助方法。

调用 stream.toTextStream() 可获得已发出文本的流。当 compatibleWithNodeStreamstrue 时,返回值是普通的 Node.js Readable。我们可以将它直接管道传输到 process.stdout 或其他目标。

文本到达时输出日志
import { Agent, run } from '@openai/agents';
const agent = new Agent({
name: 'Storyteller',
instructions:
'You are a storyteller. You will be given a topic and you will tell a story about it.',
});
const result = await run(agent, 'Tell me a story about a cat.', {
stream: true,
});
result
.toTextStream({
compatibleWithNodeStreams: true,
})
.pipe(process.stdout);

stream.completed 这个 promise 会在运行以及所有待处理回调都完成后 resolve。如果你想确保不再有更多输出,请务必 await 它。这也包括后处理工作,例如会话持久化或历史压缩钩子,它们会在最后一个文本 token 到达后完成。

toTextStream() 只会发出助手文本。工具调用、交接、批准和其他运行时事件可从完整事件流中获取。

你可以使用 for await 循环检查每个到达的事件。有用的信息包括低层模型事件、任何智能体切换以及 SDK 专属运行信息:

监听所有事件
import { Agent, run } from '@openai/agents';
const agent = new Agent({
name: 'Storyteller',
instructions:
'You are a storyteller. You will be given a topic and you will tell a story about it.',
});
const result = await run(agent, 'Tell me a story about a cat.', {
stream: true,
});
for await (const event of result) {
// these are the raw events from the model
if (event.type === 'raw_model_stream_event') {
console.log(`${event.type} %o`, event.data);
}
// agent updated events
if (event.type === 'agent_updated_stream_event') {
console.log(`${event.type} %s`, event.agent.name);
}
// Agent SDK specific events
if (event.type === 'run_item_stream_event') {
console.log(`${event.type} %o`, event.item);
}
}

请参阅流式传输示例,其中提供了一个完整可运行脚本,会同时打印纯文本流和原始事件流。

本页的流式传输 API 也适用于 OpenAI Responses WebSocket 传输。

可以使用 setOpenAIResponsesTransport('websocket') 全局启用它,或使用你自己的 OpenAIProvider 并设置 useResponsesWebSocket: true

仅为了通过 WebSocket 进行流式传输,你不需要 withResponsesWebSocketSession(...) 或自定义 OpenAIProvider。如果可以接受在不同运行之间重新连接,启用该传输后,run() / Runner.run(..., { stream: true }) 仍然可用。

当你希望复用连接并更明确地控制提供方生命周期时,请使用 withResponsesWebSocketSession(...),或使用自定义 OpenAIProvider / Runner

使用 previousResponseId 继续时,语义与 HTTP 传输相同。区别仅在于传输方式和连接生命周期。

如果你自行构建提供方,请记得在关闭时调用 await provider.close()。由 WebSocket 支持的模型包装器默认会被缓存以便复用,关闭提供方会释放这些连接。withResponsesWebSocketSession(...) 提供相同的复用能力,但会自动将清理范围限定在单个回调内。

请参阅 examples/basic/stream-ws.ts,其中包含一个完整示例,涵盖流式传输、工具调用、批准以及 previousResponseId

该流会产出三种不同的事件类型:

RunRawModelStreamEvent
import {
isOpenAIChatCompletionsRawModelStreamEvent,
isOpenAIResponsesRawModelStreamEvent,
type RunStreamEvent,
} from '@openai/agents';
export function logOpenAIRawModelEvent(event: RunStreamEvent) {
if (isOpenAIResponsesRawModelStreamEvent(event)) {
console.log(event.source);
console.log(event.data.event.type);
return;
}
if (isOpenAIChatCompletionsRawModelStreamEvent(event)) {
console.log(event.source);
console.log(event.data.event.object);
}
}

示例:

{
"type": "raw_model_stream_event",
"data": {
"type": "output_text_delta",
"delta": "Hello"
}
}

如果你使用 OpenAI 提供方,@openai/agents-openai@openai/agents 都会导出辅助函数,用于对原始 OpenAI 载荷进行类型收窄,而不会改变 agents-core 中通用的 RunRawModelStreamEvent 契约。

收窄 OpenAI 原始模型事件
import type { RunStreamEvent } from '@openai/agents';
import { isOpenAIResponsesRawModelStreamEvent } from '@openai/agents';
export function isOpenAIResponsesTextDelta(event: RunStreamEvent): boolean {
return (
isOpenAIResponsesRawModelStreamEvent(event) &&
event.data.event.type === 'response.output_text.delta'
);
}

当你只需要与传输无关的流式传输代码时,检查 event.type === 'raw_model_stream_event' 仍然足够。

如果你使用 OpenAI 模型,并希望在不手动类型转换的情况下检查提供方特定载荷,SDK 也会导出类型收窄辅助函数:

  • isOpenAIResponsesRawModelStreamEvent(event) 用于 Responses 原始事件。
  • isOpenAIChatCompletionsRawModelStreamEvent(event) 用于 Chat Completions 分块。

对于这些 OpenAI 模型事件,RunRawModelStreamEvent.source 也会填充为 'openai-responses''openai-chat-completions'

当你希望检查仅 Responses 才有的事件时,这一点尤其有用,例如 response.reasoning_summary_text.deltaresponse.output_item.done 或 MCP 参数增量,同时仍让 TypeScript 了解底层事件形状。

请参阅 examples/basic/stream-ws.tsexamples/tools/code-interpreter.tsexamples/connectors/index.ts,了解更完整的 OpenAI 专属流式传输模式。

RunItemStreamEvent
import type { RunItemStreamEvent, RunStreamEvent } from '@openai/agents';
export function isRunItemStreamEvent(
event: RunStreamEvent,
): event is RunItemStreamEvent {
return event.type === 'run_item_stream_event';
}

name 标识生成了哪类项:

name含义
message_output_created已创建消息输出项。
handoff_requested模型请求了交接。
handoff_occurred运行时完成了到另一个智能体的交接。
tool_search_called发出了一个 tool_search_call 项。
tool_search_output_created发出了一个包含已加载工具定义的 tool_search_output 项。
tool_called发出了一个工具调用项。
tool_output发出了一个工具结果项。
reasoning_item_created发出了一个推理项。
tool_approval_requested工具调用因等待人工批准而暂停。

tool_search_* 事件只会出现在使用 toolSearchTool() 在运行期间加载延迟工具的 Responses 运行中。

交接载荷示例:

{
"type": "run_item_stream_event",
"name": "handoff_occurred",
"item": {
"type": "handoff_call",
"id": "h1",
"status": "completed",
"name": "transfer_to_refund_agent"
}
}
RunAgentUpdatedStreamEvent
import type {
RunAgentUpdatedStreamEvent,
RunStreamEvent,
} from '@openai/agents';
export function isRunAgentUpdatedStreamEvent(
event: RunStreamEvent,
): event is RunAgentUpdatedStreamEvent {
return event.type === 'agent_updated_stream_event';
}

示例:

{
"type": "agent_updated_stream_event",
"agent": {
"name": "Refund Agent"
}
}

流式传输与会暂停执行的交接兼容(例如工具需要批准时)。stream 对象上的 interruptions 字段会暴露待处理的批准项,你可以对每个批准项调用 state.approve()state.reject() 来继续执行。流暂停后,stream.completed 会 resolve,stream.interruptions 会包含需要处理的批准项。再次以 { stream: true } 执行会恢复流式输出。

流式传输期间的人工批准处理
import { Agent, run } from '@openai/agents';
const agent = new Agent({
name: 'Storyteller',
instructions:
'You are a storyteller. You will be given a topic and you will tell a story about it.',
});
let stream = await run(
agent,
'What is the weather in San Francisco and Oakland?',
{ stream: true },
);
stream.toTextStream({ compatibleWithNodeStreams: true }).pipe(process.stdout);
await stream.completed;
while (stream.interruptions?.length) {
console.log(
'Human-in-the-loop: approval required for the following tool calls:',
);
const state = stream.state;
for (const interruption of stream.interruptions) {
const approved = confirm(
`Agent ${interruption.agent.name} would like to use the tool ${interruption.name} with "${interruption.arguments}". Do you approve?`,
);
if (approved) {
state.approve(interruption);
} else {
state.reject(interruption);
}
}
// Resume execution with streaming output
stream = await run(agent, state, { stream: true });
const textStream = stream.toTextStream({ compatibleWithNodeStreams: true });
textStream.pipe(process.stdout);
await stream.completed;
}

一个会与用户交互的更完整示例是 human-in-the-loop-stream.ts

要提前停止一次流式运行,可以中止传给 run()signal,或取消从 stream.toStream() 创建的 reader。无论哪种方式,在将该运行视为已完成前,仍应 await stream.completed。在你的代码停止消费事件后,SDK 可能仍在持久化当前轮次输入,或完成其他清理工作。

当流被取消时,stream.cancelled 会变为 true,而 stream.finalOutput 通常仍为 undefined,因为当前轮次从未完成。如果你想稍后继续这个未完成的轮次,请用 stream.state 重新运行同一个智能体,而不是追加新的用户消息。这样可以保持轮次计数正确,并复用任何已存储在 RunState 中的 conversationIdpreviousResponseId

如果你也在使用会话持久化,请在恢复后的 run() 调用中再次传入同一个 session,以便对话继续写入同一个后端存储。

批准导致的暂停也遵循同一规则:先处理 stream.interruptions,再从 stream.state 恢复,而不是开始新轮次。

  • 请记得在退出前等待 stream.completed,以确保所有输出都已刷新。
  • 初始的 { stream: true } 选项只适用于提供该选项的那次调用。如果使用 RunState 重新运行,你必须再次指定该选项。
  • 如果你的应用只关心文本结果,优先使用 toTextStream(),以避免处理单个事件对象。

借助流式传输和事件系统,你可以将智能体集成到聊天界面、终端应用,或任何能让用户受益于增量更新的场景中。