스트리밍
Agents SDK는 모델과 기타 실행 단계의 출력을 점진적으로 전달할 수 있습니다. 스트리밍을 사용하면 UI의 응답성을 유지할 수 있고, 전체 최종 결과를 기다린 뒤에야 사용자를 업데이트해야 하는 상황을 피할 수 있습니다.
스트리밍 활성화
섹션 제목: “스트리밍 활성화”전체 결과 대신 스트리밍 객체를 얻으려면 Runner.run()에 { stream: true } 옵션을 전달합니다.
import { Agent, run } from '@openai/agents';
const agent = new Agent({ name: 'Storyteller', instructions: 'You are a storyteller. You will be given a topic and you will tell a story about it.',});
const result = await run(agent, 'Tell me a story about a cat.', { stream: true,});스트리밍이 활성화되면 반환되는 stream은 AsyncIterable 인터페이스를 구현합니다. 각 이벤트는 실행 중에 발생한 일을 설명하는 객체로 yield 됩니다. 스트림은 세 가지 이벤트 타입 중 하나를 yield하며, 각각은 에이전트 실행의 서로 다른 부분을 설명합니다. 하지만 대부분의 애플리케이션은 모델의 텍스트만 필요하므로, 스트림은 이를 위한 헬퍼도 제공합니다.
텍스트 출력 가져오기
섹션 제목: “텍스트 출력 가져오기”방출된 텍스트의 스트림을 얻으려면 stream.toTextStream()을 호출합니다. compatibleWithNodeStreams가 true이면 반환값은 일반적인 Node.js Readable입니다. 이를 process.stdout이나 다른 대상으로 직접 파이프할 수 있습니다.
import { Agent, run } from '@openai/agents';
const agent = new Agent({ name: 'Storyteller', instructions: 'You are a storyteller. You will be given a topic and you will tell a story about it.',});
const result = await run(agent, 'Tell me a story about a cat.', { stream: true,});
result .toTextStream({ compatibleWithNodeStreams: true, }) .pipe(process.stdout);stream.completed 프라미스는 실행과 모든 보류 중인 콜백이 완료되면 resolve 됩니다. 더 이상 출력이 없음을 확실히 하려면 항상 이를 await 하세요. 여기에는 마지막 텍스트 토큰이 도착한 뒤 완료되는 세션 영속화나 히스토리 압축 훅 같은 후처리 작업도 포함됩니다.
toTextStream()은 assistant 텍스트만 방출합니다. 도구 호출, 핸드오프, 승인, 기타 런타임 이벤트는 전체 이벤트 스트림에서 확인할 수 있습니다.
모든 이벤트 수신
섹션 제목: “모든 이벤트 수신”for await 루프를 사용해 각 이벤트가 도착할 때마다 확인할 수 있습니다. 유용한 정보에는 저수준 모델 이벤트, 에이전트 전환, SDK 전용 실행 정보가 포함됩니다.
import { Agent, run } from '@openai/agents';
const agent = new Agent({ name: 'Storyteller', instructions: 'You are a storyteller. You will be given a topic and you will tell a story about it.',});
const result = await run(agent, 'Tell me a story about a cat.', { stream: true,});
for await (const event of result) { // these are the raw events from the model if (event.type === 'raw_model_stream_event') { console.log(`${event.type} %o`, event.data); } // agent updated events if (event.type === 'agent_updated_stream_event') { console.log(`${event.type} %s`, event.agent.name); } // Agent SDK specific events if (event.type === 'run_item_stream_event') { console.log(`${event.type} %o`, event.item); }}일반 텍스트 스트림과 원문 이벤트 스트림을 모두 출력하는 완전한 스크립트는 스트리밍 예제를 참고하세요.
Responses WebSocket 전송 방식 (선택 사항)
섹션 제목: “Responses WebSocket 전송 방식 (선택 사항)”이 페이지의 스트리밍 API는 OpenAI Responses WebSocket 전송 방식에서도 작동합니다.
전역으로 활성화하려면 setOpenAIResponsesTransport('websocket')를 사용하고, 직접 OpenAIProvider를 사용할 경우 useResponsesWebSocket: true를 사용하세요.
WebSocket으로 스트리밍하기 위해 꼭 withResponsesWebSocketSession(...)이나 사용자 정의 OpenAIProvider가 필요한 것은 아닙니다. 실행 간 재연결이 허용된다면, 전송 방식을 활성화한 뒤에도 run() / Runner.run(..., { stream: true })는 계속 작동합니다.
연결 재사용과 더 명시적인 provider 생명주기 제어가 필요하다면 withResponsesWebSocketSession(...)이나 사용자 정의 OpenAIProvider / Runner를 사용하세요.
previousResponseId를 사용한 이어서 실행은 HTTP 전송 방식과 동일한 의미 체계를 사용합니다. 차이는 전송 방식과 연결 생명주기뿐입니다.
직접 provider를 구성하는 경우 종료 시 await provider.close()를 호출하는 것을 잊지 마세요. Websocket 기반 모델 래퍼는 기본적으로 재사용을 위해 캐시되며, provider를 닫으면 해당 연결이 해제됩니다. withResponsesWebSocketSession(...)은 동일한 재사용을 제공하지만 정리 범위를 단일 콜백으로 자동 제한합니다.
스트리밍, 도구 호출, 승인, previousResponseId를 포함한 완전한 예제는 examples/basic/stream-ws.ts를 참고하세요.
이벤트 타입
섹션 제목: “이벤트 타입”스트림은 세 가지 서로 다른 이벤트 타입을 yield합니다.
raw_model_stream_event
섹션 제목: “raw_model_stream_event”import { isOpenAIChatCompletionsRawModelStreamEvent, isOpenAIResponsesRawModelStreamEvent, type RunStreamEvent,} from '@openai/agents';
export function logOpenAIRawModelEvent(event: RunStreamEvent) { if (isOpenAIResponsesRawModelStreamEvent(event)) { console.log(event.source); console.log(event.data.event.type); return; }
if (isOpenAIChatCompletionsRawModelStreamEvent(event)) { console.log(event.source); console.log(event.data.event.object); }}예시:
{ "type": "raw_model_stream_event", "data": { "type": "output_text_delta", "delta": "Hello" }}OpenAI provider를 사용 중이라면 @openai/agents-openai와 @openai/agents 모두 agents-core의 일반적인 RunRawModelStreamEvent 계약을 변경하지 않으면서 원문 OpenAI 페이로드의 타입을 좁혀 주는 헬퍼를 내보냅니다.
import type { RunStreamEvent } from '@openai/agents';import { isOpenAIResponsesRawModelStreamEvent } from '@openai/agents';
export function isOpenAIResponsesTextDelta(event: RunStreamEvent): boolean { return ( isOpenAIResponsesRawModelStreamEvent(event) && event.data.event.type === 'response.output_text.delta' );}전송 방식에 독립적인 스트리밍 코드만 필요하다면 event.type === 'raw_model_stream_event'만 확인해도 충분합니다.
OpenAI 모델을 사용하면서 수동 캐스팅 없이 provider별 페이로드를 검사하고 싶다면, SDK는 타입 좁히기 헬퍼도 제공합니다.
- Responses 원문 이벤트용
isOpenAIResponsesRawModelStreamEvent(event) - Chat Completions 청크용
isOpenAIChatCompletionsRawModelStreamEvent(event)
이러한 OpenAI 모델 이벤트에서는 RunRawModelStreamEvent.source도 'openai-responses' 또는 'openai-chat-completions'로 채워집니다.
이는 response.reasoning_summary_text.delta, response.output_item.done, MCP 인수 델타 같은 Responses 전용 이벤트를 검사하면서도 TypeScript가 기본 이벤트 형태를 인식하도록 유지하고 싶을 때 특히 유용합니다.
더 완전한 OpenAI 전용 스트리밍 패턴은 examples/basic/stream-ws.ts, examples/tools/code-interpreter.ts, examples/connectors/index.ts를 참고하세요.
run_item_stream_event
섹션 제목: “run_item_stream_event”import type { RunItemStreamEvent, RunStreamEvent } from '@openai/agents';
export function isRunItemStreamEvent( event: RunStreamEvent,): event is RunItemStreamEvent { return event.type === 'run_item_stream_event';}name은 어떤 종류의 항목이 생성되었는지 식별합니다.
name | 의미 |
|---|---|
message_output_created | 메시지 출력 항목이 생성되었습니다. |
handoff_requested | 모델이 핸드오프를 요청했습니다. |
handoff_occurred | 런타임이 다른 에이전트로의 핸드오프를 완료했습니다. |
tool_search_called | tool_search_call 항목이 방출되었습니다. |
tool_search_output_created | 로드된 도구 정의가 포함된 tool_search_output 항목이 방출되었습니다. |
tool_called | 도구 호출 항목이 방출되었습니다. |
tool_output | 도구 결과 항목이 방출되었습니다. |
reasoning_item_created | reasoning 항목이 방출되었습니다. |
tool_approval_requested | 도구 호출이 사람의 승인을 위해 일시 중지되었습니다. |
tool_search_* 이벤트는 실행 중 toolSearchTool()을 사용해 지연된 도구를 로드하는 Responses 실행에서만 나타납니다.
핸드오프 페이로드 예시:
{ "type": "run_item_stream_event", "name": "handoff_occurred", "item": { "type": "handoff_call", "id": "h1", "status": "completed", "name": "transfer_to_refund_agent" }}agent_updated_stream_event
섹션 제목: “agent_updated_stream_event”import type { RunAgentUpdatedStreamEvent, RunStreamEvent,} from '@openai/agents';
export function isRunAgentUpdatedStreamEvent( event: RunStreamEvent,): event is RunAgentUpdatedStreamEvent { return event.type === 'agent_updated_stream_event';}예시:
{ "type": "agent_updated_stream_event", "agent": { "name": "Refund Agent" }}스트리밍 중 휴먼인더루프 (HITL)
섹션 제목: “스트리밍 중 휴먼인더루프 (HITL)”스트리밍은 실행을 일시 중지하는 핸드오프와 호환됩니다(예: 도구에 승인이 필요한 경우). 스트림 객체의 interruptions 필드는 보류 중인 승인을 노출하며, 각각에 대해 state.approve() 또는 state.reject()를 호출해 실행을 계속할 수 있습니다. 스트림이 일시 중지된 뒤 stream.completed가 resolve되고, stream.interruptions에는 처리할 승인이 포함됩니다. 이후 { stream: true }로 다시 실행하면 스트리밍 출력이 재개됩니다.
import { Agent, run } from '@openai/agents';
const agent = new Agent({ name: 'Storyteller', instructions: 'You are a storyteller. You will be given a topic and you will tell a story about it.',});
let stream = await run( agent, 'What is the weather in San Francisco and Oakland?', { stream: true },);stream.toTextStream({ compatibleWithNodeStreams: true }).pipe(process.stdout);await stream.completed;
while (stream.interruptions?.length) { console.log( 'Human-in-the-loop: approval required for the following tool calls:', ); const state = stream.state; for (const interruption of stream.interruptions) { const approved = confirm( `Agent ${interruption.agent.name} would like to use the tool ${interruption.name} with "${interruption.arguments}". Do you approve?`, ); if (approved) { state.approve(interruption); } else { state.reject(interruption); } }
// Resume execution with streaming output stream = await run(agent, state, { stream: true }); const textStream = stream.toTextStream({ compatibleWithNodeStreams: true }); textStream.pipe(process.stdout); await stream.completed;}사용자와 상호작용하는 더 완전한 예제는 human-in-the-loop-stream.ts입니다.
스트림 중지 및 동일 턴 계속하기
섹션 제목: “스트림 중지 및 동일 턴 계속하기”스트리밍 실행을 조기에 중지하려면 run()에 전달한 signal을 abort 하거나 stream.toStream()에서 생성한 reader를 취소하세요. 어느 경우든 실행이 정리되었다고 간주하기 전에 반드시 stream.completed를 await 해야 합니다. 코드가 이벤트 소비를 멈춘 뒤에도 SDK가 현재 턴 입력을 영속화하거나 기타 정리 작업을 마무리하고 있을 수 있습니다.
스트림이 취소되면 stream.cancelled는 true가 되며, 현재 턴이 끝나지 않았기 때문에 stream.finalOutput은 종종 undefined로 남습니다. 이 미완료 턴을 나중에 이어가고 싶다면, 새 사용자 메시지를 추가하는 대신 동일한 에이전트를 stream.state와 함께 다시 실행하세요. 이렇게 하면 턴 수가 올바르게 유지되고 RunState에 이미 저장된 conversationId나 previousResponseId도 재사용됩니다.
세션 영속화도 함께 사용 중이라면, 재개하는 run() 호출에도 동일한 session을 다시 전달하여 대화가 같은 백킹 스토어에 계속 기록되도록 하세요.
승인으로 인한 일시 중지도 같은 규칙을 따릅니다. stream.interruptions를 처리한 뒤 새 턴을 시작하지 말고 stream.state에서 재개하세요.
- 모든 출력이 플러시되었는지 보장하려면 종료 전에
stream.completed를 기다리세요 - 초기
{ stream: true }옵션은 해당 옵션이 제공된 호출에만 적용됩니다.RunState로 다시 실행하는 경우 이 옵션을 다시 지정해야 합니다 - 애플리케이션이 텍스트 결과만 필요하다면 개별 이벤트 객체를 다루지 않도록
toTextStream()사용을 우선 고려하세요
스트리밍과 이벤트 시스템을 사용하면 채팅 인터페이스, 터미널 애플리케이션, 또는 사용자가 점진적 업데이트의 이점을 누릴 수 있는 모든 곳에 에이전트를 통합할 수 있습니다.