Skip to content

Results

RunResultBase dataclass

Bases: ABC

Source code in src/agents/result.py
@dataclass
class RunResultBase(abc.ABC):
    """Shared state and helpers for the result of an agent run.

    Concrete subclasses provide the `last_agent` property and the
    `_release_last_agent_reference` hook used when dropping agent references.
    """

    input: str | list[TResponseInputItem]
    """The original input items i.e. the items before run() was called. This may be a mutated
    version of the input, if there are handoff input filters that mutate the input.
    """

    new_items: list[RunItem]
    """The new items generated during the agent run. These include things like new messages, tool
    calls and their outputs, etc.
    """

    raw_responses: list[ModelResponse]
    """The raw LLM responses generated by the model during the agent run."""

    final_output: Any
    """The output of the last agent."""

    input_guardrail_results: list[InputGuardrailResult]
    """Guardrail results for the input messages."""

    output_guardrail_results: list[OutputGuardrailResult]
    """Guardrail results for the final output of the agent."""

    tool_input_guardrail_results: list[ToolInputGuardrailResult]
    """Tool input guardrail results from all tools executed during the run."""

    tool_output_guardrail_results: list[ToolOutputGuardrailResult]
    """Tool output guardrail results from all tools executed during the run."""

    context_wrapper: RunContextWrapper[Any]
    """The context wrapper for the agent run."""

    interruptions: list[ToolApprovalItem]
    """Pending tool approval requests (interruptions) for this run."""

    # init=False: populated internally during/after the run, never via the constructor.
    _trace_state: TraceState | None = field(default=None, init=False, repr=False)
    """Serialized trace metadata captured during the run."""

    @property
    @abc.abstractmethod
    def last_agent(self) -> Agent[Any]:
        """The last agent that was run."""

    def release_agents(self, *, release_new_items: bool = True) -> None:
        """
        Release strong references to agents held by this result. After calling this method,
        accessing `item.agent` or `last_agent` may return `None` if the agent has been garbage
        collected. Callers can use this when they are done inspecting the result and want to
        eagerly drop any associated agent graph.
        """
        if release_new_items:
            for item in self.new_items:
                # Not every RunItem type exposes release_agent; skip those that don't.
                release = getattr(item, "release_agent", None)
                if callable(release):
                    release()
        # Subclass-specific hook that clears the stored last-agent reference.
        self._release_last_agent_reference()

    def __del__(self) -> None:
        try:
            # Fall back to releasing agents automatically in case the caller never invoked
            # `release_agents()` explicitly so GC of the RunResult drops the last strong reference.
            # We pass `release_new_items=False` so RunItems that the user intentionally keeps
            # continue exposing their originating agent until that agent itself is collected.
            self.release_agents(release_new_items=False)
        except Exception:
            # Avoid raising from __del__.
            pass

    @abc.abstractmethod
    def _release_last_agent_reference(self) -> None:
        """Release stored agent reference specific to the concrete result type."""

    def final_output_as(self, cls: type[T], raise_if_incorrect_type: bool = False) -> T:
        """A convenience method to cast the final output to a specific type. By default, the cast
        is only for the typechecker. If you set `raise_if_incorrect_type` to True, we'll raise a
        TypeError if the final output is not of the given type.

        Args:
            cls: The type to cast the final output to.
            raise_if_incorrect_type: If True, we'll raise a TypeError if the final output is not of
                the given type.

        Returns:
            The final output casted to the given type.
        """
        # Runtime validation is opt-in; by default the cast is purely static.
        if raise_if_incorrect_type and not isinstance(self.final_output, cls):
            raise TypeError(f"Final output is not of type {cls.__name__}")

        return cast(T, self.final_output)

    def to_input_list(self) -> list[TResponseInputItem]:
        """Creates a new input list, merging the original input with all the new items generated."""
        original_items: list[TResponseInputItem] = ItemHelpers.input_to_new_input_list(self.input)
        new_items: list[TResponseInputItem] = []
        for item in self.new_items:
            # Tool approval items are pending interruptions, not model I/O; exclude them.
            if isinstance(item, ToolApprovalItem):
                continue
            new_items.append(item.to_input_item())

        return original_items + new_items

    @property
    def last_response_id(self) -> str | None:
        """Convenience method to get the response ID of the last model response."""
        # No responses yet (e.g. run never reached the model) -> no ID to report.
        if not self.raw_responses:
            return None

        return self.raw_responses[-1].response_id

input instance-attribute

input: str | list[TResponseInputItem]

The original input items i.e. the items before run() was called. This may be a mutated version of the input, if there are handoff input filters that mutate the input.

new_items instance-attribute

new_items: list[RunItem]

The new items generated during the agent run. These include things like new messages, tool calls and their outputs, etc.

raw_responses instance-attribute

raw_responses: list[ModelResponse]

The raw LLM responses generated by the model during the agent run.

final_output instance-attribute

final_output: Any

The output of the last agent.

input_guardrail_results instance-attribute

input_guardrail_results: list[InputGuardrailResult]

Guardrail results for the input messages.

output_guardrail_results instance-attribute

output_guardrail_results: list[OutputGuardrailResult]

Guardrail results for the final output of the agent.

tool_input_guardrail_results instance-attribute

tool_input_guardrail_results: list[ToolInputGuardrailResult]

Tool input guardrail results from all tools executed during the run.

tool_output_guardrail_results instance-attribute

tool_output_guardrail_results: list[
    ToolOutputGuardrailResult
]

Tool output guardrail results from all tools executed during the run.

context_wrapper instance-attribute

context_wrapper: RunContextWrapper[Any]

The context wrapper for the agent run.

interruptions instance-attribute

interruptions: list[ToolApprovalItem]

Pending tool approval requests (interruptions) for this run.

last_agent abstractmethod property

last_agent: Agent[Any]

The last agent that was run.

last_response_id property

last_response_id: str | None

Convenience method to get the response ID of the last model response.

release_agents

release_agents(*, release_new_items: bool = True) -> None

Release strong references to agents held by this result. After calling this method, accessing item.agent or last_agent may return None if the agent has been garbage collected. Callers can use this when they are done inspecting the result and want to eagerly drop any associated agent graph.

Source code in src/agents/result.py
def release_agents(self, *, release_new_items: bool = True) -> None:
    """
    Drop the strong agent references held by this result.

    After this call, `item.agent` or `last_agent` may resolve to `None` once the
    agent object has been garbage collected. Use it to eagerly free the agent
    graph when you are done inspecting the result.
    """
    if release_new_items:
        for run_item in self.new_items:
            # Only some item types expose a release hook; ignore the rest.
            releaser = getattr(run_item, "release_agent", None)
            if callable(releaser):
                releaser()
    self._release_last_agent_reference()

final_output_as

final_output_as(
    cls: type[T], raise_if_incorrect_type: bool = False
) -> T

A convenience method to cast the final output to a specific type. By default, the cast is only for the typechecker. If you set raise_if_incorrect_type to True, we'll raise a TypeError if the final output is not of the given type.

Parameters:

Name Type Description Default
cls type[T]

The type to cast the final output to.

required
raise_if_incorrect_type bool

If True, we'll raise a TypeError if the final output is not of the given type.

False

Returns:

Type Description
T

The final output casted to the given type.

Source code in src/agents/result.py
def final_output_as(self, cls: type[T], raise_if_incorrect_type: bool = False) -> T:
    """Cast `final_output` to `cls`.

    By default this only informs the type checker. Set `raise_if_incorrect_type`
    to True to additionally verify the runtime type.

    Args:
        cls: Target type for the cast.
        raise_if_incorrect_type: When True, raise a TypeError if the output is
            not an instance of `cls`.

    Returns:
        `self.final_output`, typed as `cls`.

    Raises:
        TypeError: On a runtime type mismatch, when checking is enabled.
    """
    output = self.final_output
    if raise_if_incorrect_type and not isinstance(output, cls):
        raise TypeError(f"Final output is not of type {cls.__name__}")
    return cast(T, output)

to_input_list

to_input_list() -> list[TResponseInputItem]

Creates a new input list, merging the original input with all the new items generated.

Source code in src/agents/result.py
def to_input_list(self) -> list[TResponseInputItem]:
    """Build a fresh input list: the normalized original input followed by all
    generated items (pending tool-approval items are excluded)."""
    base: list[TResponseInputItem] = ItemHelpers.input_to_new_input_list(self.input)
    generated: list[TResponseInputItem] = [
        item.to_input_item()
        for item in self.new_items
        if not isinstance(item, ToolApprovalItem)
    ]
    return base + generated

RunResult dataclass

Bases: RunResultBase

Source code in src/agents/result.py
@dataclass
class RunResult(RunResultBase):
    """Result of a (non-streaming) agent run, with resume/serialization state."""

    _last_agent: Agent[Any]
    # Weak fallback reference populated in __post_init__, so `last_agent` can
    # still resolve after release_agents() clears the strong field above.
    _last_agent_ref: weakref.ReferenceType[Agent[Any]] | None = field(
        init=False,
        repr=False,
        default=None,
    )
    _last_processed_response: ProcessedResponse | None = field(default=None, repr=False)
    """The last processed model response. This is needed for resuming from interruptions."""
    _tool_use_tracker_snapshot: dict[str, list[str]] = field(default_factory=dict, repr=False)
    _current_turn_persisted_item_count: int = 0
    """Number of items from new_items already persisted to session for the
    current turn."""
    _current_turn: int = 0
    """The current turn number. This is preserved when converting to RunState."""
    _model_input_items: list[RunItem] = field(default_factory=list, repr=False)
    """Filtered items used to build model input when resuming runs."""
    _original_input: str | list[TResponseInputItem] | None = field(default=None, repr=False)
    """The original input for the current run segment.
    This is updated when handoffs or resume logic replace the input history, and used by to_state()
    to preserve the correct originalInput when serializing state."""
    _conversation_id: str | None = field(default=None, repr=False)
    """Conversation identifier for server-managed runs."""
    _previous_response_id: str | None = field(default=None, repr=False)
    """Response identifier returned by the server for the last turn."""
    _auto_previous_response_id: bool = field(default=False, repr=False)
    """Whether automatic previous response tracking was enabled."""
    max_turns: int = 10
    """The maximum number of turns allowed for this run."""

    def __post_init__(self) -> None:
        # Capture a weak reference up front so the strong field can be dropped later.
        self._last_agent_ref = weakref.ref(self._last_agent)

    @property
    def last_agent(self) -> Agent[Any]:
        """The last agent that was run."""
        # Read the instance dict directly: _release_last_agent_reference() stores
        # None there once the strong reference has been released.
        agent = cast("Agent[Any] | None", self.__dict__.get("_last_agent"))
        if agent is not None:
            return agent
        # NOTE(review): a weakref.ref object is always truthy, so this is
        # effectively a None check on the field's default — confirm intent.
        if self._last_agent_ref:
            agent = self._last_agent_ref()
            if agent is not None:
                return agent
        raise AgentsException("Last agent reference is no longer available.")

    def _release_last_agent_reference(self) -> None:
        # Already released (or never set): nothing to do.
        agent = cast("Agent[Any] | None", self.__dict__.get("_last_agent"))
        if agent is None:
            return
        self._last_agent_ref = weakref.ref(agent)
        # Preserve dataclass field so repr/asdict continue to succeed.
        self.__dict__["_last_agent"] = None

    def to_state(self) -> RunState[Any]:
        """Create a RunState from this result to resume execution.

        This is useful when the run was interrupted (e.g., for tool approval). You can
        approve or reject the tool calls on the returned state, then pass it back to
        `Runner.run()` to continue execution.

        Returns:
            A RunState that can be used to resume the run.

        Example:
            ```python
            # Run agent until it needs approval
            result = await Runner.run(agent, "Use the delete_file tool")

            if result.interruptions:
                # Approve the tool call
                state = result.to_state()
                state.approve(result.interruptions[0])

                # Resume the run
                result = await Runner.run(agent, state)
            ```
        """
        # Create a RunState from the current result
        # Prefer the per-segment input recorded by handoff/resume logic, if any.
        original_input_for_state = getattr(self, "_original_input", None)
        state = RunState(
            context=self.context_wrapper,
            original_input=original_input_for_state
            if original_input_for_state is not None
            else self.input,
            starting_agent=self.last_agent,
            max_turns=self.max_turns,
        )

        return _populate_state_from_result(
            state,
            self,
            current_turn=self._current_turn,
            last_processed_response=self._last_processed_response,
            current_turn_persisted_item_count=self._current_turn_persisted_item_count,
            tool_use_tracker_snapshot=self._tool_use_tracker_snapshot,
            conversation_id=self._conversation_id,
            previous_response_id=self._previous_response_id,
            auto_previous_response_id=self._auto_previous_response_id,
        )

    def __str__(self) -> str:
        return pretty_print_result(self)

max_turns class-attribute instance-attribute

max_turns: int = 10

The maximum number of turns allowed for this run.

last_agent property

last_agent: Agent[Any]

The last agent that was run.

input instance-attribute

input: str | list[TResponseInputItem]

The original input items i.e. the items before run() was called. This may be a mutated version of the input, if there are handoff input filters that mutate the input.

new_items instance-attribute

new_items: list[RunItem]

The new items generated during the agent run. These include things like new messages, tool calls and their outputs, etc.

raw_responses instance-attribute

raw_responses: list[ModelResponse]

The raw LLM responses generated by the model during the agent run.

final_output instance-attribute

final_output: Any

The output of the last agent.

input_guardrail_results instance-attribute

input_guardrail_results: list[InputGuardrailResult]

Guardrail results for the input messages.

output_guardrail_results instance-attribute

output_guardrail_results: list[OutputGuardrailResult]

Guardrail results for the final output of the agent.

tool_input_guardrail_results instance-attribute

tool_input_guardrail_results: list[ToolInputGuardrailResult]

Tool input guardrail results from all tools executed during the run.

tool_output_guardrail_results instance-attribute

tool_output_guardrail_results: list[
    ToolOutputGuardrailResult
]

Tool output guardrail results from all tools executed during the run.

context_wrapper instance-attribute

context_wrapper: RunContextWrapper[Any]

The context wrapper for the agent run.

interruptions instance-attribute

interruptions: list[ToolApprovalItem]

Pending tool approval requests (interruptions) for this run.

last_response_id property

last_response_id: str | None

Convenience method to get the response ID of the last model response.

to_state

to_state() -> RunState[Any]

Create a RunState from this result to resume execution.

This is useful when the run was interrupted (e.g., for tool approval). You can approve or reject the tool calls on the returned state, then pass it back to Runner.run() to continue execution.

Returns:

Type Description
RunState[Any]

A RunState that can be used to resume the run.

Example
# Run agent until it needs approval
result = await Runner.run(agent, "Use the delete_file tool")

if result.interruptions:
    # Approve the tool call
    state = result.to_state()
    state.approve(result.interruptions[0])

    # Resume the run
    result = await Runner.run(agent, state)
Source code in src/agents/result.py
def to_state(self) -> RunState[Any]:
    """Create a RunState from this result so execution can be resumed.

    Typical use: the run was interrupted (e.g. for tool approval). Approve or
    reject the pending tool calls on the returned state, then hand it back to
    `Runner.run()` to continue.

    Returns:
        A RunState that can be used to resume the run.

    Example:
        ```python
        # Run agent until it needs approval
        result = await Runner.run(agent, "Use the delete_file tool")

        if result.interruptions:
            # Approve the tool call
            state = result.to_state()
            state.approve(result.interruptions[0])

            # Resume the run
            result = await Runner.run(agent, state)
        ```
    """
    # Handoffs / resume logic may have replaced the input history; prefer the
    # segment-specific original input when one was recorded.
    segment_input = getattr(self, "_original_input", None)
    if segment_input is None:
        segment_input = self.input

    run_state = RunState(
        context=self.context_wrapper,
        original_input=segment_input,
        starting_agent=self.last_agent,
        max_turns=self.max_turns,
    )

    return _populate_state_from_result(
        run_state,
        self,
        current_turn=self._current_turn,
        last_processed_response=self._last_processed_response,
        current_turn_persisted_item_count=self._current_turn_persisted_item_count,
        tool_use_tracker_snapshot=self._tool_use_tracker_snapshot,
        conversation_id=self._conversation_id,
        previous_response_id=self._previous_response_id,
        auto_previous_response_id=self._auto_previous_response_id,
    )

release_agents

release_agents(*, release_new_items: bool = True) -> None

Release strong references to agents held by this result. After calling this method, accessing item.agent or last_agent may return None if the agent has been garbage collected. Callers can use this when they are done inspecting the result and want to eagerly drop any associated agent graph.

Source code in src/agents/result.py
def release_agents(self, *, release_new_items: bool = True) -> None:
    """
    Release strong references to agents held by this result. After calling this method,
    accessing `item.agent` or `last_agent` may return `None` if the agent has been garbage
    collected. Callers can use this when they are done inspecting the result and want to
    eagerly drop any associated agent graph.
    """
    if release_new_items:
        for item in self.new_items:
            # Not every RunItem type exposes release_agent; skip those that don't.
            release = getattr(item, "release_agent", None)
            if callable(release):
                release()
    # Subclass-specific hook that clears the stored last-agent reference.
    self._release_last_agent_reference()

final_output_as

final_output_as(
    cls: type[T], raise_if_incorrect_type: bool = False
) -> T

A convenience method to cast the final output to a specific type. By default, the cast is only for the typechecker. If you set raise_if_incorrect_type to True, we'll raise a TypeError if the final output is not of the given type.

Parameters:

Name Type Description Default
cls type[T]

The type to cast the final output to.

required
raise_if_incorrect_type bool

If True, we'll raise a TypeError if the final output is not of the given type.

False

Returns:

Type Description
T

The final output casted to the given type.

Source code in src/agents/result.py
def final_output_as(self, cls: type[T], raise_if_incorrect_type: bool = False) -> T:
    """A convenience method to cast the final output to a specific type. By default, the cast
    is only for the typechecker. If you set `raise_if_incorrect_type` to True, we'll raise a
    TypeError if the final output is not of the given type.

    Args:
        cls: The type to cast the final output to.
        raise_if_incorrect_type: If True, we'll raise a TypeError if the final output is not of
            the given type.

    Returns:
        The final output casted to the given type.
    """
    # Runtime validation is opt-in; by default the cast is purely static.
    if raise_if_incorrect_type and not isinstance(self.final_output, cls):
        raise TypeError(f"Final output is not of type {cls.__name__}")

    return cast(T, self.final_output)

to_input_list

to_input_list() -> list[TResponseInputItem]

Creates a new input list, merging the original input with all the new items generated.

Source code in src/agents/result.py
def to_input_list(self) -> list[TResponseInputItem]:
    """Creates a new input list, merging the original input with all the new items generated."""
    original_items: list[TResponseInputItem] = ItemHelpers.input_to_new_input_list(self.input)
    new_items: list[TResponseInputItem] = []
    for item in self.new_items:
        # Tool approval items are pending interruptions, not model I/O; exclude them.
        if isinstance(item, ToolApprovalItem):
            continue
        new_items.append(item.to_input_item())

    return original_items + new_items

RunResultStreaming dataclass

Bases: RunResultBase

The result of an agent run in streaming mode. You can use the stream_events method to receive semantic events as they are generated.

The streaming method will raise:

- A MaxTurnsExceeded exception if the agent exceeds the max_turns limit.
- A GuardrailTripwireTriggered exception if a guardrail is tripped.

Source code in src/agents/result.py
(Source listing line-number gutter — lines 306–685 of src/agents/result.py — omitted.)
@dataclass
class RunResultStreaming(RunResultBase):
    """The result of an agent run in streaming mode. You can use the `stream_events` method to
    receive semantic events as they are generated.

    The streaming method will raise:
    - A MaxTurnsExceeded exception if the agent exceeds the max_turns limit.
    - A GuardrailTripwireTriggered exception if a guardrail is tripped.
    """

    current_agent: Agent[Any]
    """The current agent that is running."""

    current_turn: int
    """The current turn number."""

    max_turns: int
    """The maximum number of turns the agent can run for."""

    final_output: Any
    """The final output of the agent. This is None until the agent has finished running."""

    _current_agent_output_schema: AgentOutputSchemaBase | None = field(repr=False)

    trace: Trace | None = field(repr=False)

    is_complete: bool = False
    """Whether the agent has finished running."""

    # Weak reference to the current agent, captured in __post_init__. Lets `last_agent`
    # keep resolving after `_release_last_agent_reference()` drops the strong field.
    _current_agent_ref: weakref.ReferenceType[Agent[Any]] | None = field(
        init=False,
        repr=False,
        default=None,
    )
    _last_processed_response: ProcessedResponse | None = field(default=None, repr=False)
    """The last processed model response. This is needed for resuming from interruptions."""

    _model_input_items: list[RunItem] = field(default_factory=list, repr=False)
    """Filtered items used to build model input between streaming turns."""

    # Queues that the background run_loop writes to
    _event_queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel] = field(
        default_factory=asyncio.Queue, repr=False
    )
    _input_guardrail_queue: asyncio.Queue[InputGuardrailResult] = field(
        default_factory=asyncio.Queue, repr=False
    )

    # Store the asyncio tasks that we're waiting on
    run_loop_task: asyncio.Task[Any] | None = field(default=None, repr=False)
    _input_guardrails_task: asyncio.Task[Any] | None = field(default=None, repr=False)
    _output_guardrails_task: asyncio.Task[Any] | None = field(default=None, repr=False)
    _stored_exception: Exception | None = field(default=None, repr=False)
    _waiting_on_event_queue: bool = field(default=False, repr=False)

    _current_turn_persisted_item_count: int = 0
    """Number of items from new_items already persisted to session for the
    current turn."""

    _stream_input_persisted: bool = False
    """Whether the input has been persisted to the session. Prevents double-saving."""

    _original_input_for_persistence: list[TResponseInputItem] = field(default_factory=list)
    """Original turn input before session history was merged, used for
    persistence (matches JS sessionInputOriginalSnapshot)."""

    # Soft cancel state
    _cancel_mode: Literal["none", "immediate", "after_turn"] = field(default="none", repr=False)

    _max_turns_handled: bool = field(default=False, repr=False)

    _original_input: str | list[TResponseInputItem] | None = field(default=None, repr=False)
    """The original input from the first turn. Unlike `input`, this is never updated during the run.
    Used by to_state() to preserve the correct originalInput when serializing state."""
    _tool_use_tracker_snapshot: dict[str, list[str]] = field(default_factory=dict, repr=False)
    _state: Any = field(default=None, repr=False)
    """Internal reference to the RunState for streaming results."""
    _conversation_id: str | None = field(default=None, repr=False)
    """Conversation identifier for server-managed runs."""
    _previous_response_id: str | None = field(default=None, repr=False)
    """Response identifier returned by the server for the last turn."""
    _auto_previous_response_id: bool = field(default=False, repr=False)
    """Whether automatic previous response tracking was enabled."""

    def __post_init__(self) -> None:
        # Capture a weak reference up front so `last_agent` can still resolve
        # after release_agents() clears the strong `current_agent` field.
        self._current_agent_ref = weakref.ref(self.current_agent)
        # Store the original input at creation time (it will be set via input field)
        if self._original_input is None:
            self._original_input = self.input

    @property
    def last_agent(self) -> Agent[Any]:
        """The last agent that was run. Updates as the agent run progresses, so the true last agent
        is only available after the agent run is complete.
        """
        # Prefer the strong reference if it has not been released yet.
        agent = cast("Agent[Any] | None", self.__dict__.get("current_agent"))
        if agent is not None:
            return agent
        # Fall back to the weakref; it may have gone dead after release_agents().
        if self._current_agent_ref:
            agent = self._current_agent_ref()
            if agent is not None:
                return agent
        raise AgentsException("Last agent reference is no longer available.")

    def _release_last_agent_reference(self) -> None:
        # Swap the strong `current_agent` field for a weak reference so the agent
        # graph can be garbage collected once no one else holds it.
        agent = cast("Agent[Any] | None", self.__dict__.get("current_agent"))
        if agent is None:
            return
        self._current_agent_ref = weakref.ref(agent)
        # Preserve dataclass field so repr/asdict continue to succeed.
        self.__dict__["current_agent"] = None

    def cancel(self, mode: Literal["immediate", "after_turn"] = "immediate") -> None:
        """Cancel the streaming run.

        Args:
            mode: Cancellation strategy:
                - "immediate": Stop immediately, cancel all tasks, clear queues (default)
                - "after_turn": Complete current turn gracefully before stopping
                    * Allows LLM response to finish
                    * Executes pending tool calls
                    * Saves session state properly
                    * Tracks usage accurately
                    * Stops before next turn begins

        Example:
            ```python
            result = Runner.run_streamed(agent, "Task", session=session)

            async for event in result.stream_events():
                if user_interrupted():
                    result.cancel(mode="after_turn")  # Graceful
                    # result.cancel()  # Immediate (default)
            ```

        Note: After calling cancel(), you should continue consuming stream_events()
        to allow the cancellation to complete properly.
        """
        # Store the cancel mode for the background task to check
        self._cancel_mode = mode

        if mode == "immediate":
            # Existing behavior - immediate shutdown
            self._cleanup_tasks()  # Cancel all running tasks
            self.is_complete = True  # Mark the run as complete to stop event streaming

            # Discard any pending input guardrail results.
            while not self._input_guardrail_queue.empty():
                self._input_guardrail_queue.get_nowait()

            # Unblock any streamers waiting on the event queue.
            self._event_queue.put_nowait(QueueCompleteSentinel())
            if not self._waiting_on_event_queue:
                self._drain_event_queue()

        elif mode == "after_turn":
            # Soft cancel - just set the flag
            # The streaming loop will check this and stop gracefully
            # Don't call _cleanup_tasks() or clear queues yet
            pass

    async def stream_events(self) -> AsyncIterator[StreamEvent]:
        """Stream deltas for new items as they are generated. We're using the types from the
        OpenAI Responses API, so these are semantic events: each event has a `type` field that
        describes the type of the event, along with the data for that event.

        This will raise:
        - A MaxTurnsExceeded exception if the agent exceeds the max_turns limit.
        - A GuardrailTripwireTriggered exception if a guardrail is tripped.
        """
        # Tracks whether the consumer's own task was cancelled mid-await, so the
        # finally block can skip waiting on the (already cancelled) run loop.
        cancelled = False
        try:
            while True:
                self._check_errors()
                if self._stored_exception:
                    logger.debug("Breaking due to stored exception")
                    self.is_complete = True
                    break

                if self.is_complete and self._event_queue.empty():
                    break

                try:
                    # Flag lets cancel() know a consumer is blocked on the queue
                    # and needs a sentinel rather than a drain.
                    self._waiting_on_event_queue = True
                    item = await self._event_queue.get()
                except asyncio.CancelledError:
                    cancelled = True
                    self.cancel()
                    raise
                finally:
                    self._waiting_on_event_queue = False

                if isinstance(item, QueueCompleteSentinel):
                    # Await input guardrails if they are still running, so late
                    # exceptions are captured.
                    await self._await_task_safely(self._input_guardrails_task)

                    self._event_queue.task_done()

                    # Check for errors, in case the queue was completed
                    # due to an exception
                    self._check_errors()
                    break

                yield item
                self._event_queue.task_done()
        finally:
            if cancelled:
                # Cancellation should return promptly, so avoid waiting on long-running tasks.
                # Tasks have already been cancelled above.
                self._cleanup_tasks()
            else:
                # Ensure main execution completes before cleanup to avoid race conditions
                # with session operations
                await self._await_task_safely(self.run_loop_task)
                # Safely terminate all background tasks after main execution has finished
                self._cleanup_tasks()

            # Allow any pending callbacks (e.g., cancellation handlers) to enqueue their
            # completion sentinels before we clear the queues for observability.
            await asyncio.sleep(0)

            # Drain queues so callers observing internal state see them empty after completion.
            self._drain_event_queue()
            self._drain_input_guardrail_queue()

        if self._stored_exception:
            raise self._stored_exception

    def _create_error_details(self) -> RunErrorDetails:
        """Return a `RunErrorDetails` object considering the current attributes of the class."""
        return RunErrorDetails(
            input=self.input,
            new_items=self.new_items,
            raw_responses=self.raw_responses,
            last_agent=self.current_agent,
            context_wrapper=self.context_wrapper,
            input_guardrail_results=self.input_guardrail_results,
            output_guardrail_results=self.output_guardrail_results,
        )

    def _check_errors(self) -> None:
        # Collect any pending failure into _stored_exception. Note: checks run in
        # sequence and each assigns _stored_exception, so a later source (e.g. a
        # guardrail task exception) overwrites an earlier one.
        if self.current_turn > self.max_turns and not self._max_turns_handled:
            max_turns_exc = MaxTurnsExceeded(f"Max turns ({self.max_turns}) exceeded")
            max_turns_exc.run_data = self._create_error_details()
            self._stored_exception = max_turns_exc

        # Fetch all the completed guardrail results from the queue and raise if needed
        while not self._input_guardrail_queue.empty():
            guardrail_result = self._input_guardrail_queue.get_nowait()
            if guardrail_result.output.tripwire_triggered:
                tripwire_exc = InputGuardrailTripwireTriggered(guardrail_result)
                tripwire_exc.run_data = self._create_error_details()
                self._stored_exception = tripwire_exc

        # Check the tasks for any exceptions
        if self.run_loop_task and self.run_loop_task.done():
            if not self.run_loop_task.cancelled():
                run_impl_exc = self.run_loop_task.exception()
                if run_impl_exc and isinstance(run_impl_exc, Exception):
                    if isinstance(run_impl_exc, AgentsException) and run_impl_exc.run_data is None:
                        run_impl_exc.run_data = self._create_error_details()
                    self._stored_exception = run_impl_exc

        if self._input_guardrails_task and self._input_guardrails_task.done():
            if not self._input_guardrails_task.cancelled():
                in_guard_exc = self._input_guardrails_task.exception()
                if in_guard_exc and isinstance(in_guard_exc, Exception):
                    if isinstance(in_guard_exc, AgentsException) and in_guard_exc.run_data is None:
                        in_guard_exc.run_data = self._create_error_details()
                    self._stored_exception = in_guard_exc

        if self._output_guardrails_task and self._output_guardrails_task.done():
            if not self._output_guardrails_task.cancelled():
                out_guard_exc = self._output_guardrails_task.exception()
                if out_guard_exc and isinstance(out_guard_exc, Exception):
                    if (
                        isinstance(out_guard_exc, AgentsException)
                        and out_guard_exc.run_data is None
                    ):
                        out_guard_exc.run_data = self._create_error_details()
                    self._stored_exception = out_guard_exc

    def _cleanup_tasks(self) -> None:
        # Cancel any background tasks that have not already finished.
        if self.run_loop_task and not self.run_loop_task.done():
            self.run_loop_task.cancel()

        if self._input_guardrails_task and not self._input_guardrails_task.done():
            self._input_guardrails_task.cancel()

        if self._output_guardrails_task and not self._output_guardrails_task.done():
            self._output_guardrails_task.cancel()

    def __str__(self) -> str:
        return pretty_print_run_result_streaming(self)

    async def _await_task_safely(self, task: asyncio.Task[Any] | None) -> None:
        """Await a task if present, ignoring cancellation and storing exceptions elsewhere.

        This ensures we do not lose late guardrail exceptions while not surfacing
        CancelledError to callers of stream_events.
        """
        if task and not task.done():
            try:
                await task
            except asyncio.CancelledError:
                # Task was cancelled (e.g., due to result.cancel()). Nothing to do here.
                pass
            except Exception:
                # The exception will be surfaced via _check_errors() if needed.
                pass

    def _drain_event_queue(self) -> None:
        """Remove any pending items from the event queue and mark them done."""
        while not self._event_queue.empty():
            try:
                self._event_queue.get_nowait()
                self._event_queue.task_done()
            except asyncio.QueueEmpty:
                break
            except ValueError:
                # task_done called too many times; nothing more to drain.
                break

    def _drain_input_guardrail_queue(self) -> None:
        """Remove any pending items from the input guardrail queue."""
        while not self._input_guardrail_queue.empty():
            try:
                self._input_guardrail_queue.get_nowait()
            except asyncio.QueueEmpty:
                break

    def to_state(self) -> RunState[Any]:
        """Create a RunState from this streaming result to resume execution.

        This is useful when the run was interrupted (e.g., for tool approval). You can
        approve or reject the tool calls on the returned state, then pass it back to
        `Runner.run_streamed()` to continue execution.

        Returns:
            A RunState that can be used to resume the run.

        Example:
            ```python
            # Run agent until it needs approval
            result = Runner.run_streamed(agent, "Use the delete_file tool")
            async for event in result.stream_events():
                pass

            if result.interruptions:
                # Approve the tool call
                state = result.to_state()
                state.approve(result.interruptions[0])

                # Resume the run
                result = Runner.run_streamed(agent, state)
                async for event in result.stream_events():
                    pass
            ```
        """
        # Create a RunState from the current result
        # Use _original_input (updated on handoffs/resume when input history changes).
        # This avoids serializing a mutated view of input history.
        state = RunState(
            context=self.context_wrapper,
            original_input=self._original_input if self._original_input is not None else self.input,
            starting_agent=self.last_agent,
            max_turns=self.max_turns,
        )

        return _populate_state_from_result(
            state,
            self,
            current_turn=self.current_turn,
            last_processed_response=self._last_processed_response,
            current_turn_persisted_item_count=self._current_turn_persisted_item_count,
            tool_use_tracker_snapshot=self._tool_use_tracker_snapshot,
            conversation_id=self._conversation_id,
            previous_response_id=self._previous_response_id,
            auto_previous_response_id=self._auto_previous_response_id,
        )

current_agent instance-attribute

current_agent: Agent[Any]

The current agent that is running.

current_turn instance-attribute

current_turn: int

The current turn number.

max_turns instance-attribute

max_turns: int

The maximum number of turns the agent can run for.

final_output instance-attribute

final_output: Any

The final output of the agent. This is None until the agent has finished running.

is_complete class-attribute instance-attribute

is_complete: bool = False

Whether the agent has finished running.

last_agent property

last_agent: Agent[Any]

The last agent that was run. Updates as the agent run progresses, so the true last agent is only available after the agent run is complete.

input instance-attribute

input: str | list[TResponseInputItem]

The original input items i.e. the items before run() was called. This may be a mutated version of the input, if there are handoff input filters that mutate the input.

new_items instance-attribute

new_items: list[RunItem]

The new items generated during the agent run. These include things like new messages, tool calls and their outputs, etc.

raw_responses instance-attribute

raw_responses: list[ModelResponse]

The raw LLM responses generated by the model during the agent run.

input_guardrail_results instance-attribute

input_guardrail_results: list[InputGuardrailResult]

Guardrail results for the input messages.

output_guardrail_results instance-attribute

output_guardrail_results: list[OutputGuardrailResult]

Guardrail results for the final output of the agent.

tool_input_guardrail_results instance-attribute

tool_input_guardrail_results: list[ToolInputGuardrailResult]

Tool input guardrail results from all tools executed during the run.

tool_output_guardrail_results instance-attribute

tool_output_guardrail_results: list[
    ToolOutputGuardrailResult
]

Tool output guardrail results from all tools executed during the run.

context_wrapper instance-attribute

context_wrapper: RunContextWrapper[Any]

The context wrapper for the agent run.

interruptions instance-attribute

interruptions: list[ToolApprovalItem]

Pending tool approval requests (interruptions) for this run.

last_response_id property

last_response_id: str | None

Convenience method to get the response ID of the last model response.

cancel

cancel(
    mode: Literal["immediate", "after_turn"] = "immediate",
) -> None

Cancel the streaming run.

Parameters:

Name Type Description Default
mode Literal['immediate', 'after_turn']

Cancellation strategy: - "immediate": Stop immediately, cancel all tasks, clear queues (default) - "after_turn": Complete current turn gracefully before stopping * Allows LLM response to finish * Executes pending tool calls * Saves session state properly * Tracks usage accurately * Stops before next turn begins

'immediate'
Example
result = Runner.run_streamed(agent, "Task", session=session)

async for event in result.stream_events():
    if user_interrupted():
        result.cancel(mode="after_turn")  # Graceful
        # result.cancel()  # Immediate (default)

Note: After calling cancel(), you should continue consuming stream_events() to allow the cancellation to complete properly.

Source code in src/agents/result.py
def cancel(self, mode: Literal["immediate", "after_turn"] = "immediate") -> None:
    """Cancel the streaming run.

    Args:
        mode: Cancellation strategy:
            - "immediate": Stop immediately, cancel all tasks, clear queues (default)
            - "after_turn": Complete current turn gracefully before stopping
                * Allows LLM response to finish
                * Executes pending tool calls
                * Saves session state properly
                * Tracks usage accurately
                * Stops before next turn begins

    Example:
        ```python
        result = Runner.run_streamed(agent, "Task", session=session)

        async for event in result.stream_events():
            if user_interrupted():
                result.cancel(mode="after_turn")  # Graceful
                # result.cancel()  # Immediate (default)
        ```

    Note: After calling cancel(), you should continue consuming stream_events()
    to allow the cancellation to complete properly.
    """
    # Store the cancel mode for the background task to check
    self._cancel_mode = mode

    if mode == "immediate":
        # Existing behavior - immediate shutdown
        self._cleanup_tasks()  # Cancel all running tasks
        self.is_complete = True  # Mark the run as complete to stop event streaming

        # Discard any pending input guardrail results.
        while not self._input_guardrail_queue.empty():
            self._input_guardrail_queue.get_nowait()

        # Unblock any streamers waiting on the event queue.
        self._event_queue.put_nowait(QueueCompleteSentinel())
        if not self._waiting_on_event_queue:
            self._drain_event_queue()

    elif mode == "after_turn":
        # Soft cancel - just set the flag
        # The streaming loop will check this and stop gracefully
        # Don't call _cleanup_tasks() or clear queues yet
        pass

stream_events async

stream_events() -> AsyncIterator[StreamEvent]

Stream deltas for new items as they are generated. We're using the types from the OpenAI Responses API, so these are semantic events: each event has a type field that describes the type of the event, along with the data for that event.

This will raise: - A MaxTurnsExceeded exception if the agent exceeds the max_turns limit. - A GuardrailTripwireTriggered exception if a guardrail is tripped.

Source code in src/agents/result.py
async def stream_events(self) -> AsyncIterator[StreamEvent]:
    """Stream deltas for new items as they are generated. We're using the types from the
    OpenAI Responses API, so these are semantic events: each event has a `type` field that
    describes the type of the event, along with the data for that event.

    This will raise:
    - A MaxTurnsExceeded exception if the agent exceeds the max_turns limit.
    - A GuardrailTripwireTriggered exception if a guardrail is tripped.
    """
    # Tracks whether the consumer's own task was cancelled mid-await, so the
    # finally block can skip waiting on the (already cancelled) run loop.
    cancelled = False
    try:
        while True:
            self._check_errors()
            if self._stored_exception:
                logger.debug("Breaking due to stored exception")
                self.is_complete = True
                break

            if self.is_complete and self._event_queue.empty():
                break

            try:
                # Flag lets cancel() know a consumer is blocked on the queue
                # and needs a sentinel rather than a drain.
                self._waiting_on_event_queue = True
                item = await self._event_queue.get()
            except asyncio.CancelledError:
                cancelled = True
                self.cancel()
                raise
            finally:
                self._waiting_on_event_queue = False

            if isinstance(item, QueueCompleteSentinel):
                # Await input guardrails if they are still running, so late
                # exceptions are captured.
                await self._await_task_safely(self._input_guardrails_task)

                self._event_queue.task_done()

                # Check for errors, in case the queue was completed
                # due to an exception
                self._check_errors()
                break

            yield item
            self._event_queue.task_done()
    finally:
        if cancelled:
            # Cancellation should return promptly, so avoid waiting on long-running tasks.
            # Tasks have already been cancelled above.
            self._cleanup_tasks()
        else:
            # Ensure main execution completes before cleanup to avoid race conditions
            # with session operations
            await self._await_task_safely(self.run_loop_task)
            # Safely terminate all background tasks after main execution has finished
            self._cleanup_tasks()

        # Allow any pending callbacks (e.g., cancellation handlers) to enqueue their
        # completion sentinels before we clear the queues for observability.
        await asyncio.sleep(0)

        # Drain queues so callers observing internal state see them empty after completion.
        self._drain_event_queue()
        self._drain_input_guardrail_queue()

    if self._stored_exception:
        raise self._stored_exception

to_state

to_state() -> RunState[Any]

Create a RunState from this streaming result to resume execution.

This is useful when the run was interrupted (e.g., for tool approval). You can approve or reject the tool calls on the returned state, then pass it back to Runner.run_streamed() to continue execution.

Returns:

Type Description
RunState[Any]

A RunState that can be used to resume the run.

Example
# Run agent until it needs approval
result = Runner.run_streamed(agent, "Use the delete_file tool")
async for event in result.stream_events():
    pass

if result.interruptions:
    # Approve the tool call
    state = result.to_state()
    state.approve(result.interruptions[0])

    # Resume the run
    result = Runner.run_streamed(agent, state)
    async for event in result.stream_events():
        pass
Source code in src/agents/result.py
def to_state(self) -> RunState[Any]:
    """Snapshot this streaming result as a RunState so the run can be resumed.

    Intended for interrupted runs (e.g. pending tool approvals): approve or
    reject the interruptions on the returned state, then hand it back to
    `Runner.run_streamed()` to continue where the run left off.

    Returns:
        A RunState that can be used to resume the run.

    Example:
        ```python
        # Run agent until it needs approval
        result = Runner.run_streamed(agent, "Use the delete_file tool")
        async for event in result.stream_events():
            pass

        if result.interruptions:
            # Approve the tool call
            state = result.to_state()
            state.approve(result.interruptions[0])

            # Resume the run
            result = Runner.run_streamed(agent, state)
            async for event in result.stream_events():
                pass
        ```
    """
    # Serialize from _original_input when available (it is kept up to date on
    # handoffs/resume), so we never persist a mutated view of the input history.
    original_input = self._original_input
    if original_input is None:
        original_input = self.input

    run_state = RunState(
        context=self.context_wrapper,
        original_input=original_input,
        starting_agent=self.last_agent,
        max_turns=self.max_turns,
    )

    return _populate_state_from_result(
        run_state,
        self,
        current_turn=self.current_turn,
        last_processed_response=self._last_processed_response,
        current_turn_persisted_item_count=self._current_turn_persisted_item_count,
        tool_use_tracker_snapshot=self._tool_use_tracker_snapshot,
        conversation_id=self._conversation_id,
        previous_response_id=self._previous_response_id,
        auto_previous_response_id=self._auto_previous_response_id,
    )

release_agents

release_agents(*, release_new_items: bool = True) -> None

Release strong references to agents held by this result. After calling this method, accessing item.agent or last_agent may return None if the agent has been garbage collected. Callers can use this when they are done inspecting the result and want to eagerly drop any associated agent graph.

Source code in src/agents/result.py
def release_agents(self, *, release_new_items: bool = True) -> None:
    """
    Drop the strong agent references held by this result. After this call,
    `item.agent` and `last_agent` may resolve to `None` once the agents are
    garbage collected. Use it when you are finished inspecting the result and
    want to let go of the associated agent graph eagerly.
    """
    if release_new_items:
        for run_item in self.new_items:
            # Items expose an optional `release_agent` hook; invoke it if present.
            releaser = getattr(run_item, "release_agent", None)
            if callable(releaser):
                releaser()
    self._release_last_agent_reference()

final_output_as

final_output_as(
    cls: type[T], raise_if_incorrect_type: bool = False
) -> T

A convenience method to cast the final output to a specific type. By default, the cast is only for the typechecker. If you set raise_if_incorrect_type to True, we'll raise a TypeError if the final output is not of the given type.

Parameters:

Name Type Description Default
cls type[T]

The type to cast the final output to.

required
raise_if_incorrect_type bool

If True, we'll raise a TypeError if the final output is not of the given type.

False

Returns:

Type Description
T

The final output casted to the given type.

Source code in src/agents/result.py
def final_output_as(self, cls: type[T], raise_if_incorrect_type: bool = False) -> T:
    """Cast `final_output` to `cls`.

    By default this is purely a typechecker-level cast with no runtime effect.

    Args:
        cls: The type to cast the final output to.
        raise_if_incorrect_type: If True, check the runtime type and raise a
            TypeError when `final_output` is not an instance of `cls`.

    Returns:
        The final output, typed as `cls`.
    """
    if raise_if_incorrect_type:
        # Only evaluated when explicitly requested; otherwise the cast is free.
        if not isinstance(self.final_output, cls):
            raise TypeError(f"Final output is not of type {cls.__name__}")

    return cast(T, self.final_output)

to_input_list

to_input_list() -> list[TResponseInputItem]

Creates a new input list, merging the original input with all the new items generated.

Source code in src/agents/result.py
def to_input_list(self) -> list[TResponseInputItem]:
    """Build a single input list from the original input plus all generated items."""
    base: list[TResponseInputItem] = ItemHelpers.input_to_new_input_list(self.input)
    generated: list[TResponseInputItem] = [
        item.to_input_item()
        for item in self.new_items
        # Tool-approval items are run bookkeeping and are never sent back as input.
        if not isinstance(item, ToolApprovalItem)
    ]
    return base + generated