콘텐츠로 이동

스트리밍

Agents SDK는 모델과 기타 실행 단계의 출력을 점진적으로 전달할 수 있습니다. 스트리밍은 UI의 응답성을 유지하고, 전체 최종 결과를 기다리지 않고도 사용자에게 업데이트할 수 있게 해 줍니다.

Runner.run(){ stream: true } 옵션을 전달하면 전체 결과 대신 스트리밍 객체를 얻을 수 있습니다:

스트리밍 활성화
import { Agent, run } from '@openai/agents';
const agent = new Agent({
name: 'Storyteller',
instructions:
'You are a storyteller. You will be given a topic and you will tell a story about it.',
});
const result = await run(agent, 'Tell me a story about a cat.', {
stream: true,
});

스트리밍이 활성화되면 반환되는 streamAsyncIterable 인터페이스를 구현합니다. yield되는 각 이벤트는 실행 내에서 발생한 일을 설명하는 객체입니다. 스트림은 세 가지 이벤트 유형 중 하나를 yield하며, 각각은 에이전트 실행의 서로 다른 부분을 설명합니다. 다만 대부분의 애플리케이션은 모델의 텍스트만 필요하므로, 스트림은 이를 위한 헬퍼를 제공합니다.

방출된 텍스트 스트림을 얻으려면 stream.toTextStream()을 호출합니다. compatibleWithNodeStreamstrue이면 반환값은 일반 Node.js Readable입니다. 이를 process.stdout이나 다른 대상으로 직접 pipe할 수 있습니다.

텍스트가 도착하는 대로 로그 출력
import { Agent, run } from '@openai/agents';
const agent = new Agent({
name: 'Storyteller',
instructions:
'You are a storyteller. You will be given a topic and you will tell a story about it.',
});
const result = await run(agent, 'Tell me a story about a cat.', {
stream: true,
});
result
.toTextStream({
compatibleWithNodeStreams: true,
})
.pipe(process.stdout);

Promise인 stream.completed는 실행과 대기 중인 모든 콜백이 완료되면 resolve됩니다. 더 이상 출력이 없음을 보장하려면 항상 이를 await 하세요. 여기에는 마지막 텍스트 토큰이 도착한 뒤 완료되는 세션 영속성 또는 히스토리 압축 훅 같은 후처리 작업도 포함됩니다.

toTextStream()은 어시스턴트 텍스트만 내보냅니다. 도구 호출, 핸드오프, 승인 및 기타 런타임 이벤트는 전체 이벤트 스트림에서 사용할 수 있습니다.

for await 루프를 사용해 각 이벤트가 도착하는 대로 검사할 수 있습니다. 유용한 정보에는 저수준 모델 이벤트, 에이전트 전환 및 SDK별 실행 정보가 포함됩니다:

모든 이벤트 수신
import { Agent, run } from '@openai/agents';
const agent = new Agent({
name: 'Storyteller',
instructions:
'You are a storyteller. You will be given a topic and you will tell a story about it.',
});
const result = await run(agent, 'Tell me a story about a cat.', {
stream: true,
});
for await (const event of result) {
// these are the raw events from the model
if (event.type === 'raw_model_stream_event') {
console.log(`${event.type} %o`, event.data);
}
// agent updated events
if (event.type === 'agent_updated_stream_event') {
console.log(`${event.type} %s`, event.agent.name);
}
// Agent SDK specific events
if (event.type === 'run_item_stream_event') {
console.log(`${event.type} %o`, event.item);
}
}

일반 텍스트 스트림과 원문 이벤트 스트림을 모두 출력하는 완성된 스크립트는 스트리밍 예제를 참고하세요.

Responses WebSocket 전송 방식(선택 사항)

섹션 제목: “Responses WebSocket 전송 방식(선택 사항)”

이 페이지의 스트리밍 API는 OpenAI Responses WebSocket 전송 방식에서도 작동합니다.

전역으로 활성화하려면 setOpenAIResponsesTransport('websocket')를 사용하거나, useResponsesWebSocket: true를 설정한 자체 OpenAIProvider를 사용하세요.

WebSocket으로 스트리밍하기 위해 withResponsesWebSocketSession(...)나 사용자 지정 OpenAIProvider가 꼭 필요하지는 않습니다. 실행 사이에 재연결해도 괜찮다면, 전송 방식을 활성화한 뒤에도 run() / Runner.run(..., { stream: true })가 계속 작동합니다.

연결 재사용과 더 명시적인 프로바이더 생명주기 제어가 필요할 때는 withResponsesWebSocketSession(...) 또는 사용자 지정 OpenAIProvider / Runner를 사용하세요.

previousResponseId를 사용한 이어가기는 HTTP 전송 방식과 동일한 의미 체계를 사용합니다. 차이는 전송 방식과 연결 생명주기뿐입니다.

프로바이더를 직접 빌드하는 경우 종료 시 await provider.close()를 호출해야 합니다. WebSocket 기반 모델 래퍼는 기본적으로 재사용을 위해 캐시되며, 프로바이더를 닫으면 해당 연결이 해제됩니다. withResponsesWebSocketSession(...)은 동일한 재사용을 제공하지만 정리를 단일 콜백 범위로 자동 제한합니다.

스트리밍, 도구 호출, 승인, previousResponseId를 포함한 전체 예제는 examples/basic/stream-ws.ts를 참고하세요.

스트림은 세 가지 이벤트 유형을 yield합니다:

RunRawModelStreamEvent
import {
isOpenAIChatCompletionsRawModelStreamEvent,
isOpenAIResponsesRawModelStreamEvent,
type RunStreamEvent,
} from '@openai/agents';
export function logOpenAIRawModelEvent(event: RunStreamEvent) {
if (isOpenAIResponsesRawModelStreamEvent(event)) {
console.log(event.source);
console.log(event.data.event.type);
return;
}
if (isOpenAIChatCompletionsRawModelStreamEvent(event)) {
console.log(event.source);
console.log(event.data.event.object);
}
}

예:

{
"type": "raw_model_stream_event",
"data": {
"type": "output_text_delta",
"delta": "Hello"
}
}

OpenAI provider를 사용하는 경우, @openai/agents-openai@openai/agents는 모두 agents-core의 제네릭 RunRawModelStreamEvent 계약을 변경하지 않고 원문 OpenAI 페이로드의 타입을 좁히는 헬퍼를 내보냅니다.

OpenAI 원문 모델 이벤트 타입 좁히기
import type { RunStreamEvent } from '@openai/agents';
import { isOpenAIResponsesRawModelStreamEvent } from '@openai/agents';
export function isOpenAIResponsesTextDelta(event: RunStreamEvent): boolean {
return (
isOpenAIResponsesRawModelStreamEvent(event) &&
event.data.event.type === 'response.output_text.delta'
);
}

전송 방식에 구애받지 않는 스트리밍 코드만 필요하다면, event.type === 'raw_model_stream_event'를 확인하는 것만으로도 충분합니다.

OpenAI 모델을 사용하며 직접 캐스팅하지 않고 provider별 페이로드를 검사하려면, SDK는 타입 좁히기 헬퍼도 내보냅니다:

  • Responses 원문 이벤트용 isOpenAIResponsesRawModelStreamEvent(event)
  • Chat Completions 청크용 isOpenAIChatCompletionsRawModelStreamEvent(event)

이러한 OpenAI 모델 이벤트의 경우, RunRawModelStreamEvent.source에도 'openai-responses' 또는 'openai-chat-completions' 중 하나가 채워집니다.

이는 response.reasoning_summary_text.delta, response.output_item.done 또는 MCP 인수 델타 같은 Responses 전용 이벤트를 검사하면서도 TypeScript가 기본 이벤트 형태를 알 수 있게 유지하고 싶을 때 특히 유용합니다.

보다 자세한 OpenAI 전용 스트리밍 패턴은 examples/basic/stream-ws.ts, examples/tools/code-interpreter.ts, examples/connectors/index.ts를 참고하세요.

RunItemStreamEvent
import type { RunItemStreamEvent, RunStreamEvent } from '@openai/agents';
export function isRunItemStreamEvent(
event: RunStreamEvent,
): event is RunItemStreamEvent {
return event.type === 'run_item_stream_event';
}

name은 어떤 종류의 항목이 생성되었는지 식별합니다:

name의미
message_output_created메시지 출력 항목이 생성되었습니다.
handoff_requested모델이 핸드오프를 요청했습니다.
handoff_occurred런타임이 다른 에이전트로의 핸드오프를 완료했습니다.
tool_search_calledtool_search_call 항목이 내보내졌습니다.
tool_search_output_created로드된 도구 정의가 포함된 tool_search_output 항목이 내보내졌습니다.
tool_called도구 호출 항목이 내보내졌습니다.
tool_output도구 결과 항목이 내보내졌습니다.
reasoning_item_created추론 항목이 내보내졌습니다.
tool_approval_requested도구 호출이 사람의 승인을 위해 일시 중지되었습니다.

tool_search_* 이벤트는 실행 중 지연된 도구를 로드하기 위해 toolSearchTool()을 사용하는 Responses 실행에만 나타납니다.

핸드오프 페이로드 예:

{
"type": "run_item_stream_event",
"name": "handoff_occurred",
"item": {
"type": "handoff_call",
"id": "h1",
"status": "completed",
"name": "transfer_to_refund_agent"
}
}
RunAgentUpdatedStreamEvent
import type {
RunAgentUpdatedStreamEvent,
RunStreamEvent,
} from '@openai/agents';
export function isRunAgentUpdatedStreamEvent(
event: RunStreamEvent,
): event is RunAgentUpdatedStreamEvent {
return event.type === 'agent_updated_stream_event';
}

예:

{
"type": "agent_updated_stream_event",
"agent": {
"name": "Refund Agent"
}
}

스트리밍 중 휴먼인더루프 (HITL)

섹션 제목: “스트리밍 중 휴먼인더루프 (HITL)”

스트리밍은 실행을 일시 중지하는 핸드오프(예: 도구에 승인이 필요한 경우)와 호환됩니다. 스트림 객체의 interruptions 필드는 대기 중인 승인들을 노출하며, 각각에 대해 state.approve() 또는 state.reject()를 호출해 실행을 계속할 수 있습니다. 스트림이 일시 중지되면 stream.completed가 resolve되고 stream.interruptions에는 처리할 승인들이 포함됩니다. { stream: true }로 다시 실행하면 스트리밍 출력이 재개됩니다.

스트리밍 중 사람 승인 처리
import { Agent, run } from '@openai/agents';
const agent = new Agent({
name: 'Storyteller',
instructions:
'You are a storyteller. You will be given a topic and you will tell a story about it.',
});
let stream = await run(
agent,
'What is the weather in San Francisco and Oakland?',
{ stream: true },
);
stream.toTextStream({ compatibleWithNodeStreams: true }).pipe(process.stdout);
await stream.completed;
while (stream.interruptions?.length) {
console.log(
'Human-in-the-loop: approval required for the following tool calls:',
);
const state = stream.state;
for (const interruption of stream.interruptions) {
const approved = confirm(
`Agent ${interruption.agent.name} would like to use the tool ${interruption.name} with "${interruption.arguments}". Do you approve?`,
);
if (approved) {
state.approve(interruption);
} else {
state.reject(interruption);
}
}
// Resume execution with streaming output
stream = await run(agent, state, { stream: true });
const textStream = stream.toTextStream({ compatibleWithNodeStreams: true });
textStream.pipe(process.stdout);
await stream.completed;
}

사용자와 상호작용하는 더 완전한 예제는 human-in-the-loop-stream.ts입니다.

스트림 중지 및 동일 턴 계속 진행

섹션 제목: “스트림 중지 및 동일 턴 계속 진행”

스트리밍 실행을 조기에 중지하려면 run()에 전달한 signal을 abort하거나 stream.toStream()에서 생성된 reader를 취소하세요. 어느 방법이든 실행이 정리된 것으로 간주하기 전에 여전히 stream.completed를 await 하세요. 코드가 이벤트 소비를 중단한 뒤에도 SDK가 현재 턴 입력을 영속화하거나 다른 정리 작업을 완료하고 있을 수 있습니다.

스트림이 취소되면 stream.cancelledtrue가 되고, 현재 턴이 완료되지 않았기 때문에 stream.finalOutput은 종종 undefined로 남습니다. 나중에 해당 미완료 턴을 계속하려면 새 사용자 메시지를 추가하는 대신 동일한 에이전트를 stream.state로 다시 실행하세요. 이렇게 하면 턴 카운팅이 올바르게 유지되고 RunState에 이미 저장된 conversationId 또는 previousResponseId가 재사용됩니다.

세션 영속성도 사용 중이라면, 대화가 동일한 백킹 스토어에 계속 기록되도록 재개된 run() 호출에 동일한 session을 다시 전달하세요.

승인으로 인한 일시 중지도 같은 규칙을 따릅니다. stream.interruptions를 처리한 다음, 새 턴을 시작하지 말고 stream.state에서 재개하세요.

  • 종료하기 전에 모든 출력이 flush되었는지 확인하려면 stream.completed를 기다리세요.
  • 초기 { stream: true } 옵션은 제공된 호출에만 적용됩니다. RunState로 다시 실행하는 경우 이 옵션을 다시 지정해야 합니다.
  • 애플리케이션이 텍스트 결과에만 관심이 있다면 개별 이벤트 객체를 다루지 않도록 toTextStream()을 사용하는 것이 좋습니다.

스트리밍과 이벤트 시스템을 사용하면 에이전트를 채팅 인터페이스, 터미널 애플리케이션 또는 사용자가 점진적 업데이트의 이점을 얻는 모든 곳에 통합할 수 있습니다.