跳转到内容

流式传输

Agents SDK 可以增量地提供模型和其他执行步骤的输出。流式传输让你的 UI 更加灵敏,无需等待整个最终结果再更新给用户。

Runner.run() 传入 { stream: true } 选项,以获得一个流式对象而非完整结果:

Enabling streaming
import { Agent, run } from '@openai/agents';
const agent = new Agent({
name: 'Storyteller',
instructions:
'You are a storyteller. You will be given a topic and you will tell a story about it.',
});
const result = await run(agent, 'Tell me a story about a cat.', {
stream: true,
});

启用流式传输后,返回的 stream 实现了 AsyncIterable 接口。每个产出的事件都是一个对象,描述运行期间发生的事情。该流会产出三种事件类型之一,分别描述智能体执行的不同部分。 大多数应用只需要模型的文本,因此流提供了便捷方法。

调用 stream.toTextStream() 获取已发出的文本流。 当 compatibleWithNodeStreamstrue 时,返回值是常规的 Node.js Readable。我们可以将其直接管道到 process.stdout 或其他目标。

Logging out the text as it arrives
import { Agent, run } from '@openai/agents';
const agent = new Agent({
name: 'Storyteller',
instructions:
'You are a storyteller. You will be given a topic and you will tell a story about it.',
});
const result = await run(agent, 'Tell me a story about a cat.', {
stream: true,
});
result
.toTextStream({
compatibleWithNodeStreams: true,
})
.pipe(process.stdout);

当运行和所有挂起的回调完成后,stream.completed 这个 promise 会被 resolve。如果你想确保没有更多输出,请务必等待它。

你可以使用 for await 循环在每个事件到达时进行检查。 有用的信息包括底层模型事件、任意智能体切换以及 SDK 特定的运行信息:

Listening to all events
import { Agent, run } from '@openai/agents';
const agent = new Agent({
name: 'Storyteller',
instructions:
'You are a storyteller. You will be given a topic and you will tell a story about it.',
});
const result = await run(agent, 'Tell me a story about a cat.', {
stream: true,
});
for await (const event of result) {
// these are the raw events from the model
if (event.type === 'raw_model_stream_event') {
console.log(`${event.type} %o`, event.data);
}
// agent updated events
if (event.type === 'agent_updated_stream_event') {
console.log(`${event.type} %s`, event.agent.name);
}
// Agent SDK specific events
if (event.type === 'run_item_stream_event') {
console.log(`${event.type} %o`, event.item);
}
}

参见流式示例,其中包含一个完整脚本,同时打印纯文本流和原始事件流。

该流会产出三种不同的事件类型:

type RunRawModelStreamEvent = {
type: 'raw_model_stream_event';
data: ResponseStreamEvent;
};

示例:

{
"type": "raw_model_stream_event",
"data": {
"type": "output_text_delta",
"delta": "Hello"
}
}
type RunItemStreamEvent = {
type: 'run_item_stream_event';
name: RunItemStreamEventName;
item: RunItem;
};

交接负载示例:

{
"type": "run_item_stream_event",
"name": "handoff_occurred",
"item": {
"type": "handoff_call",
"id": "h1",
"status": "completed",
"name": "transfer_to_refund_agent"
}
}
type RunAgentUpdatedStreamEvent = {
type: 'agent_updated_stream_event';
agent: Agent<any, any>;
};

示例:

{
"type": "agent_updated_stream_event",
"agent": {
"name": "Refund Agent"
}
}

流式传输与会暂停执行的交接兼容(例如当某个工具需要审批时)。流对象上的 interruption 字段暴露了这些中断,你可以通过调用每个中断的 state.approve()state.reject() 来继续执行。再次以 { stream: true } 执行会恢复流式输出。

Handling human approval while streaming
import { Agent, run } from '@openai/agents';
const agent = new Agent({
name: 'Storyteller',
instructions:
'You are a storyteller. You will be given a topic and you will tell a story about it.',
});
let stream = await run(
agent,
'What is the weather in San Francisco and Oakland?',
{ stream: true },
);
stream.toTextStream({ compatibleWithNodeStreams: true }).pipe(process.stdout);
await stream.completed;
while (stream.interruptions?.length) {
console.log(
'Human-in-the-loop: approval required for the following tool calls:',
);
const state = stream.state;
for (const interruption of stream.interruptions) {
const approved = confirm(
`Agent ${interruption.agent.name} would like to use the tool ${interruption.rawItem.name} with "${interruption.rawItem.arguments}". Do you approve?`,
);
if (approved) {
state.approve(interruption);
} else {
state.reject(interruption);
}
}
// Resume execution with streaming output
stream = await run(agent, state, { stream: true });
const textStream = stream.toTextStream({ compatibleWithNodeStreams: true });
textStream.pipe(process.stdout);
await stream.completed;
}

一个与用户交互的更完整示例见 human-in-the-loop-stream.ts

  • 退出前记得等待 stream.completed,以确保所有输出都已刷新。
  • 初始的 { stream: true } 选项只适用于提供它的那次调用。如果你用 RunState 重新执行,必须再次指定该选项。
  • 如果你的应用只关心文本结果,优先使用 toTextStream(),以避免处理单独的事件对象。

借助流式传输和事件系统,你可以将智能体集成到聊天界面、终端应用,或任何用户受益于增量更新的场景。