跳转至

Results

RunResultBase dataclass

Bases: ABC

Source code in src/agents/result.py
@dataclass
class RunResultBase(abc.ABC):
    input: str | list[TResponseInputItem]
    """The original input items i.e. the items before run() was called. This may be a mutated
    version of the input, if there are handoff input filters that mutate the input.
    """

    new_items: list[RunItem]
    """The new items generated during the agent run. These include things like new messages, tool
    calls and their outputs, etc.
    """

    raw_responses: list[ModelResponse]
    """The raw LLM responses generated by the model during the agent run."""

    final_output: Any
    """The output of the last agent."""

    input_guardrail_results: list[InputGuardrailResult]
    """Guardrail results for the input messages."""

    output_guardrail_results: list[OutputGuardrailResult]
    """Guardrail results for the final output of the agent."""

    tool_input_guardrail_results: list[ToolInputGuardrailResult]
    """Tool input guardrail results from all tools executed during the run."""

    tool_output_guardrail_results: list[ToolOutputGuardrailResult]
    """Tool output guardrail results from all tools executed during the run."""

    context_wrapper: RunContextWrapper[Any]
    """The context wrapper for the agent run."""

    @property
    @abc.abstractmethod
    def last_agent(self) -> Agent[Any]:
        """The last agent that was run."""

    def release_agents(self, *, release_new_items: bool = True) -> None:
        """
        Release strong references to agents held by this result. After calling this method,
        accessing `item.agent` or `last_agent` may return `None` if the agent has been garbage
        collected. Callers can use this when they are done inspecting the result and want to
        eagerly drop any associated agent graph.
        """
        if release_new_items:
            for item in self.new_items:
                release = getattr(item, "release_agent", None)
                if callable(release):
                    release()
        self._release_last_agent_reference()

    def __del__(self) -> None:
        try:
            # Fall back to releasing agents automatically in case the caller never invoked
            # `release_agents()` explicitly so GC of the RunResult drops the last strong reference.
            # We pass `release_new_items=False` so RunItems that the user intentionally keeps
            # continue exposing their originating agent until that agent itself is collected.
            self.release_agents(release_new_items=False)
        except Exception:
            # Avoid raising from __del__.
            pass

    @abc.abstractmethod
    def _release_last_agent_reference(self) -> None:
        """Release stored agent reference specific to the concrete result type."""

    def final_output_as(self, cls: type[T], raise_if_incorrect_type: bool = False) -> T:
        """A convenience method to cast the final output to a specific type. By default, the cast
        is only for the typechecker. If you set `raise_if_incorrect_type` to True, we'll raise a
        TypeError if the final output is not of the given type.

        Args:
            cls: The type to cast the final output to.
            raise_if_incorrect_type: If True, we'll raise a TypeError if the final output is not of
                the given type.

        Returns:
            The final output casted to the given type.
        """
        if raise_if_incorrect_type and not isinstance(self.final_output, cls):
            raise TypeError(f"Final output is not of type {cls.__name__}")

        return cast(T, self.final_output)

    def to_input_list(self) -> list[TResponseInputItem]:
        """Creates a new input list, merging the original input with all the new items generated."""
        original_items: list[TResponseInputItem] = ItemHelpers.input_to_new_input_list(self.input)
        new_items = [item.to_input_item() for item in self.new_items]

        return original_items + new_items

    @property
    def last_response_id(self) -> str | None:
        """Convenience method to get the response ID of the last model response."""
        if not self.raw_responses:
            return None

        return self.raw_responses[-1].response_id

input instance-attribute

input: str | list[TResponseInputItem]

The original input items i.e. the items before run() was called. This may be a mutated version of the input, if there are handoff input filters that mutate the input.

new_items instance-attribute

new_items: list[RunItem]

The new items generated during the agent run. These include things like new messages, tool calls and their outputs, etc.

raw_responses instance-attribute

raw_responses: list[ModelResponse]

The raw LLM responses generated by the model during the agent run.

final_output instance-attribute

final_output: Any

The output of the last agent.

input_guardrail_results instance-attribute

input_guardrail_results: list[InputGuardrailResult]

Guardrail results for the input messages.

output_guardrail_results instance-attribute

output_guardrail_results: list[OutputGuardrailResult]

Guardrail results for the final output of the agent.

tool_input_guardrail_results instance-attribute

tool_input_guardrail_results: list[ToolInputGuardrailResult]

Tool input guardrail results from all tools executed during the run.

tool_output_guardrail_results instance-attribute

tool_output_guardrail_results: list[
    ToolOutputGuardrailResult
]

Tool output guardrail results from all tools executed during the run.

context_wrapper instance-attribute

context_wrapper: RunContextWrapper[Any]

The context wrapper for the agent run.

last_agent abstractmethod property

last_agent: Agent[Any]

The last agent that was run.

last_response_id property

last_response_id: str | None

Convenience method to get the response ID of the last model response.

release_agents

release_agents(*, release_new_items: bool = True) -> None

Release strong references to agents held by this result. After calling this method, accessing item.agent or last_agent may return None if the agent has been garbage collected. Callers can use this when they are done inspecting the result and want to eagerly drop any associated agent graph.

Source code in src/agents/result.py
def release_agents(self, *, release_new_items: bool = True) -> None:
    """
    Release strong references to agents held by this result. After calling this method,
    accessing `item.agent` or `last_agent` may return `None` if the agent has been garbage
    collected. Callers can use this when they are done inspecting the result and want to
    eagerly drop any associated agent graph.
    """
    if release_new_items:
        for item in self.new_items:
            release = getattr(item, "release_agent", None)
            if callable(release):
                release()
    self._release_last_agent_reference()

final_output_as

final_output_as(
    cls: type[T], raise_if_incorrect_type: bool = False
) -> T

A convenience method to cast the final output to a specific type. By default, the cast is only for the typechecker. If you set raise_if_incorrect_type to True, we'll raise a TypeError if the final output is not of the given type.

Parameters:

Name Type Description Default
cls type[T]

The type to cast the final output to.

required
raise_if_incorrect_type bool

If True, we'll raise a TypeError if the final output is not of the given type.

False

Returns:

Type Description
T

The final output casted to the given type.

Source code in src/agents/result.py
def final_output_as(self, cls: type[T], raise_if_incorrect_type: bool = False) -> T:
    """A convenience method to cast the final output to a specific type. By default, the cast
    is only for the typechecker. If you set `raise_if_incorrect_type` to True, we'll raise a
    TypeError if the final output is not of the given type.

    Args:
        cls: The type to cast the final output to.
        raise_if_incorrect_type: If True, we'll raise a TypeError if the final output is not of
            the given type.

    Returns:
        The final output casted to the given type.
    """
    if raise_if_incorrect_type and not isinstance(self.final_output, cls):
        raise TypeError(f"Final output is not of type {cls.__name__}")

    return cast(T, self.final_output)

to_input_list

to_input_list() -> list[TResponseInputItem]

Creates a new input list, merging the original input with all the new items generated.

Source code in src/agents/result.py
def to_input_list(self) -> list[TResponseInputItem]:
    """Creates a new input list, merging the original input with all the new items generated."""
    original_items: list[TResponseInputItem] = ItemHelpers.input_to_new_input_list(self.input)
    new_items = [item.to_input_item() for item in self.new_items]

    return original_items + new_items

RunResult dataclass

Bases: RunResultBase

Source code in src/agents/result.py
@dataclass
class RunResult(RunResultBase):
    _last_agent: Agent[Any]
    _last_agent_ref: weakref.ReferenceType[Agent[Any]] | None = field(
        init=False,
        repr=False,
        default=None,
    )

    def __post_init__(self) -> None:
        self._last_agent_ref = weakref.ref(self._last_agent)

    @property
    def last_agent(self) -> Agent[Any]:
        """The last agent that was run."""
        agent = cast("Agent[Any] | None", self.__dict__.get("_last_agent"))
        if agent is not None:
            return agent
        if self._last_agent_ref:
            agent = self._last_agent_ref()
            if agent is not None:
                return agent
        raise AgentsException("Last agent reference is no longer available.")

    def _release_last_agent_reference(self) -> None:
        agent = cast("Agent[Any] | None", self.__dict__.get("_last_agent"))
        if agent is None:
            return
        self._last_agent_ref = weakref.ref(agent)
        # Preserve dataclass field so repr/asdict continue to succeed.
        self.__dict__["_last_agent"] = None

    def __str__(self) -> str:
        return pretty_print_result(self)

last_agent property

last_agent: Agent[Any]

The last agent that was run.

input instance-attribute

input: str | list[TResponseInputItem]

The original input items i.e. the items before run() was called. This may be a mutated version of the input, if there are handoff input filters that mutate the input.

new_items instance-attribute

new_items: list[RunItem]

The new items generated during the agent run. These include things like new messages, tool calls and their outputs, etc.

raw_responses instance-attribute

raw_responses: list[ModelResponse]

The raw LLM responses generated by the model during the agent run.

final_output instance-attribute

final_output: Any

The output of the last agent.

input_guardrail_results instance-attribute

input_guardrail_results: list[InputGuardrailResult]

Guardrail results for the input messages.

output_guardrail_results instance-attribute

output_guardrail_results: list[OutputGuardrailResult]

Guardrail results for the final output of the agent.

tool_input_guardrail_results instance-attribute

tool_input_guardrail_results: list[ToolInputGuardrailResult]

Tool input guardrail results from all tools executed during the run.

tool_output_guardrail_results instance-attribute

tool_output_guardrail_results: list[
    ToolOutputGuardrailResult
]

Tool output guardrail results from all tools executed during the run.

context_wrapper instance-attribute

context_wrapper: RunContextWrapper[Any]

The context wrapper for the agent run.

last_response_id property

last_response_id: str | None

Convenience method to get the response ID of the last model response.

release_agents

release_agents(*, release_new_items: bool = True) -> None

Release strong references to agents held by this result. After calling this method, accessing item.agent or last_agent may return None if the agent has been garbage collected. Callers can use this when they are done inspecting the result and want to eagerly drop any associated agent graph.

Source code in src/agents/result.py
def release_agents(self, *, release_new_items: bool = True) -> None:
    """
    Release strong references to agents held by this result. After calling this method,
    accessing `item.agent` or `last_agent` may return `None` if the agent has been garbage
    collected. Callers can use this when they are done inspecting the result and want to
    eagerly drop any associated agent graph.
    """
    if release_new_items:
        for item in self.new_items:
            release = getattr(item, "release_agent", None)
            if callable(release):
                release()
    self._release_last_agent_reference()

final_output_as

final_output_as(
    cls: type[T], raise_if_incorrect_type: bool = False
) -> T

A convenience method to cast the final output to a specific type. By default, the cast is only for the typechecker. If you set raise_if_incorrect_type to True, we'll raise a TypeError if the final output is not of the given type.

Parameters:

Name Type Description Default
cls type[T]

The type to cast the final output to.

required
raise_if_incorrect_type bool

If True, we'll raise a TypeError if the final output is not of the given type.

False

Returns:

Type Description
T

The final output casted to the given type.

Source code in src/agents/result.py
def final_output_as(self, cls: type[T], raise_if_incorrect_type: bool = False) -> T:
    """A convenience method to cast the final output to a specific type. By default, the cast
    is only for the typechecker. If you set `raise_if_incorrect_type` to True, we'll raise a
    TypeError if the final output is not of the given type.

    Args:
        cls: The type to cast the final output to.
        raise_if_incorrect_type: If True, we'll raise a TypeError if the final output is not of
            the given type.

    Returns:
        The final output casted to the given type.
    """
    if raise_if_incorrect_type and not isinstance(self.final_output, cls):
        raise TypeError(f"Final output is not of type {cls.__name__}")

    return cast(T, self.final_output)

to_input_list

to_input_list() -> list[TResponseInputItem]

Creates a new input list, merging the original input with all the new items generated.

Source code in src/agents/result.py
def to_input_list(self) -> list[TResponseInputItem]:
    """Creates a new input list, merging the original input with all the new items generated."""
    original_items: list[TResponseInputItem] = ItemHelpers.input_to_new_input_list(self.input)
    new_items = [item.to_input_item() for item in self.new_items]

    return original_items + new_items

RunResultStreaming dataclass

Bases: RunResultBase

The result of an agent run in streaming mode. You can use the stream_events method to receive semantic events as they are generated.

The streaming method will raise: - A MaxTurnsExceeded exception if the agent exceeds the max_turns limit. - A GuardrailTripwireTriggered exception if a guardrail is tripped.

Source code in src/agents/result.py
@dataclass
class RunResultStreaming(RunResultBase):
    """The result of an agent run in streaming mode. You can use the `stream_events` method to
    receive semantic events as they are generated.

    The streaming method will raise:
    - A MaxTurnsExceeded exception if the agent exceeds the max_turns limit.
    - A GuardrailTripwireTriggered exception if a guardrail is tripped.
    """

    current_agent: Agent[Any]
    """The current agent that is running."""

    current_turn: int
    """The current turn number."""

    max_turns: int
    """The maximum number of turns the agent can run for."""

    final_output: Any
    """The final output of the agent. This is None until the agent has finished running."""

    _current_agent_output_schema: AgentOutputSchemaBase | None = field(repr=False)

    trace: Trace | None = field(repr=False)

    is_complete: bool = False
    """Whether the agent has finished running."""

    _current_agent_ref: weakref.ReferenceType[Agent[Any]] | None = field(
        init=False,
        repr=False,
        default=None,
    )

    # Queues that the background run_loop writes to
    _event_queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel] = field(
        default_factory=asyncio.Queue, repr=False
    )
    _input_guardrail_queue: asyncio.Queue[InputGuardrailResult] = field(
        default_factory=asyncio.Queue, repr=False
    )

    # Store the asyncio tasks that we're waiting on
    _run_impl_task: asyncio.Task[Any] | None = field(default=None, repr=False)
    _input_guardrails_task: asyncio.Task[Any] | None = field(default=None, repr=False)
    _output_guardrails_task: asyncio.Task[Any] | None = field(default=None, repr=False)
    _stored_exception: Exception | None = field(default=None, repr=False)

    # Soft cancel state
    _cancel_mode: Literal["none", "immediate", "after_turn"] = field(default="none", repr=False)

    def __post_init__(self) -> None:
        self._current_agent_ref = weakref.ref(self.current_agent)

    @property
    def last_agent(self) -> Agent[Any]:
        """The last agent that was run. Updates as the agent run progresses, so the true last agent
        is only available after the agent run is complete.
        """
        agent = cast("Agent[Any] | None", self.__dict__.get("current_agent"))
        if agent is not None:
            return agent
        if self._current_agent_ref:
            agent = self._current_agent_ref()
            if agent is not None:
                return agent
        raise AgentsException("Last agent reference is no longer available.")

    def _release_last_agent_reference(self) -> None:
        agent = cast("Agent[Any] | None", self.__dict__.get("current_agent"))
        if agent is None:
            return
        self._current_agent_ref = weakref.ref(agent)
        # Preserve dataclass field so repr/asdict continue to succeed.
        self.__dict__["current_agent"] = None

    def cancel(self, mode: Literal["immediate", "after_turn"] = "immediate") -> None:
        """Cancel the streaming run.

        Args:
            mode: Cancellation strategy:
                - "immediate": Stop immediately, cancel all tasks, clear queues (default)
                - "after_turn": Complete current turn gracefully before stopping
                    * Allows LLM response to finish
                    * Executes pending tool calls
                    * Saves session state properly
                    * Tracks usage accurately
                    * Stops before next turn begins

        Example:
            ```python
            result = Runner.run_streamed(agent, "Task", session=session)

            async for event in result.stream_events():
                if user_interrupted():
                    result.cancel(mode="after_turn")  # Graceful
                    # result.cancel()  # Immediate (default)
            ```

        Note: After calling cancel(), you should continue consuming stream_events()
        to allow the cancellation to complete properly.
        """
        # Store the cancel mode for the background task to check
        self._cancel_mode = mode

        if mode == "immediate":
            # Existing behavior - immediate shutdown
            self._cleanup_tasks()  # Cancel all running tasks
            self.is_complete = True  # Mark the run as complete to stop event streaming

            # Optionally, clear the event queue to prevent processing stale events
            while not self._event_queue.empty():
                self._event_queue.get_nowait()
            while not self._input_guardrail_queue.empty():
                self._input_guardrail_queue.get_nowait()

        elif mode == "after_turn":
            # Soft cancel - just set the flag
            # The streaming loop will check this and stop gracefully
            # Don't call _cleanup_tasks() or clear queues yet
            pass

    async def stream_events(self) -> AsyncIterator[StreamEvent]:
        """Stream deltas for new items as they are generated. We're using the types from the
        OpenAI Responses API, so these are semantic events: each event has a `type` field that
        describes the type of the event, along with the data for that event.

        This will raise:
        - A MaxTurnsExceeded exception if the agent exceeds the max_turns limit.
        - A GuardrailTripwireTriggered exception if a guardrail is tripped.
        """
        try:
            while True:
                self._check_errors()
                if self._stored_exception:
                    logger.debug("Breaking due to stored exception")
                    self.is_complete = True
                    break

                if self.is_complete and self._event_queue.empty():
                    break

                try:
                    item = await self._event_queue.get()
                except asyncio.CancelledError:
                    break

                if isinstance(item, QueueCompleteSentinel):
                    # Await input guardrails if they are still running, so late
                    # exceptions are captured.
                    await self._await_task_safely(self._input_guardrails_task)

                    self._event_queue.task_done()

                    # Check for errors, in case the queue was completed
                    # due to an exception
                    self._check_errors()
                    break

                yield item
                self._event_queue.task_done()
        finally:
            # Ensure main execution completes before cleanup to avoid race conditions
            # with session operations
            await self._await_task_safely(self._run_impl_task)
            # Safely terminate all background tasks after main execution has finished
            self._cleanup_tasks()

        if self._stored_exception:
            raise self._stored_exception

    def _create_error_details(self) -> RunErrorDetails:
        """Return a `RunErrorDetails` object considering the current attributes of the class."""
        return RunErrorDetails(
            input=self.input,
            new_items=self.new_items,
            raw_responses=self.raw_responses,
            last_agent=self.current_agent,
            context_wrapper=self.context_wrapper,
            input_guardrail_results=self.input_guardrail_results,
            output_guardrail_results=self.output_guardrail_results,
        )

    def _check_errors(self):
        if self.current_turn > self.max_turns:
            max_turns_exc = MaxTurnsExceeded(f"Max turns ({self.max_turns}) exceeded")
            max_turns_exc.run_data = self._create_error_details()
            self._stored_exception = max_turns_exc

        # Fetch all the completed guardrail results from the queue and raise if needed
        while not self._input_guardrail_queue.empty():
            guardrail_result = self._input_guardrail_queue.get_nowait()
            if guardrail_result.output.tripwire_triggered:
                tripwire_exc = InputGuardrailTripwireTriggered(guardrail_result)
                tripwire_exc.run_data = self._create_error_details()
                self._stored_exception = tripwire_exc

        # Check the tasks for any exceptions
        if self._run_impl_task and self._run_impl_task.done():
            run_impl_exc = self._run_impl_task.exception()
            if run_impl_exc and isinstance(run_impl_exc, Exception):
                if isinstance(run_impl_exc, AgentsException) and run_impl_exc.run_data is None:
                    run_impl_exc.run_data = self._create_error_details()
                self._stored_exception = run_impl_exc

        if self._input_guardrails_task and self._input_guardrails_task.done():
            in_guard_exc = self._input_guardrails_task.exception()
            if in_guard_exc and isinstance(in_guard_exc, Exception):
                if isinstance(in_guard_exc, AgentsException) and in_guard_exc.run_data is None:
                    in_guard_exc.run_data = self._create_error_details()
                self._stored_exception = in_guard_exc

        if self._output_guardrails_task and self._output_guardrails_task.done():
            out_guard_exc = self._output_guardrails_task.exception()
            if out_guard_exc and isinstance(out_guard_exc, Exception):
                if isinstance(out_guard_exc, AgentsException) and out_guard_exc.run_data is None:
                    out_guard_exc.run_data = self._create_error_details()
                self._stored_exception = out_guard_exc

    def _cleanup_tasks(self):
        if self._run_impl_task and not self._run_impl_task.done():
            self._run_impl_task.cancel()

        if self._input_guardrails_task and not self._input_guardrails_task.done():
            self._input_guardrails_task.cancel()

        if self._output_guardrails_task and not self._output_guardrails_task.done():
            self._output_guardrails_task.cancel()

    def __str__(self) -> str:
        return pretty_print_run_result_streaming(self)

    async def _await_task_safely(self, task: asyncio.Task[Any] | None) -> None:
        """Await a task if present, ignoring cancellation and storing exceptions elsewhere.

        This ensures we do not lose late guardrail exceptions while not surfacing
        CancelledError to callers of stream_events.
        """
        if task and not task.done():
            try:
                await task
            except asyncio.CancelledError:
                # Task was cancelled (e.g., due to result.cancel()). Nothing to do here.
                pass
            except Exception:
                # The exception will be surfaced via _check_errors() if needed.
                pass

current_agent instance-attribute

current_agent: Agent[Any]

The current agent that is running.

current_turn instance-attribute

current_turn: int

The current turn number.

max_turns instance-attribute

max_turns: int

The maximum number of turns the agent can run for.

final_output instance-attribute

final_output: Any

The final output of the agent. This is None until the agent has finished running.

is_complete class-attribute instance-attribute

is_complete: bool = False

Whether the agent has finished running.

last_agent property

last_agent: Agent[Any]

The last agent that was run. Updates as the agent run progresses, so the true last agent is only available after the agent run is complete.

input instance-attribute

input: str | list[TResponseInputItem]

The original input items i.e. the items before run() was called. This may be a mutated version of the input, if there are handoff input filters that mutate the input.

new_items instance-attribute

new_items: list[RunItem]

The new items generated during the agent run. These include things like new messages, tool calls and their outputs, etc.

raw_responses instance-attribute

raw_responses: list[ModelResponse]

The raw LLM responses generated by the model during the agent run.

input_guardrail_results instance-attribute

input_guardrail_results: list[InputGuardrailResult]

Guardrail results for the input messages.

output_guardrail_results instance-attribute

output_guardrail_results: list[OutputGuardrailResult]

Guardrail results for the final output of the agent.

tool_input_guardrail_results instance-attribute

tool_input_guardrail_results: list[ToolInputGuardrailResult]

Tool input guardrail results from all tools executed during the run.

tool_output_guardrail_results instance-attribute

tool_output_guardrail_results: list[
    ToolOutputGuardrailResult
]

Tool output guardrail results from all tools executed during the run.

context_wrapper instance-attribute

context_wrapper: RunContextWrapper[Any]

The context wrapper for the agent run.

last_response_id property

last_response_id: str | None

Convenience method to get the response ID of the last model response.

cancel

cancel(
    mode: Literal["immediate", "after_turn"] = "immediate",
) -> None

Cancel the streaming run.

Parameters:

Name Type Description Default
mode Literal['immediate', 'after_turn']

Cancellation strategy: - "immediate": Stop immediately, cancel all tasks, clear queues (default) - "after_turn": Complete current turn gracefully before stopping * Allows LLM response to finish * Executes pending tool calls * Saves session state properly * Tracks usage accurately * Stops before next turn begins

'immediate'
Example
result = Runner.run_streamed(agent, "Task", session=session)

async for event in result.stream_events():
    if user_interrupted():
        result.cancel(mode="after_turn")  # Graceful
        # result.cancel()  # Immediate (default)

Note: After calling cancel(), you should continue consuming stream_events() to allow the cancellation to complete properly.

Source code in src/agents/result.py
def cancel(self, mode: Literal["immediate", "after_turn"] = "immediate") -> None:
    """Cancel the streaming run.

    Args:
        mode: Cancellation strategy:
            - "immediate": Stop immediately, cancel all tasks, clear queues (default)
            - "after_turn": Complete current turn gracefully before stopping
                * Allows LLM response to finish
                * Executes pending tool calls
                * Saves session state properly
                * Tracks usage accurately
                * Stops before next turn begins

    Example:
        ```python
        result = Runner.run_streamed(agent, "Task", session=session)

        async for event in result.stream_events():
            if user_interrupted():
                result.cancel(mode="after_turn")  # Graceful
                # result.cancel()  # Immediate (default)
        ```

    Note: After calling cancel(), you should continue consuming stream_events()
    to allow the cancellation to complete properly.
    """
    # Store the cancel mode for the background task to check
    self._cancel_mode = mode

    if mode == "immediate":
        # Existing behavior - immediate shutdown
        self._cleanup_tasks()  # Cancel all running tasks
        self.is_complete = True  # Mark the run as complete to stop event streaming

        # Optionally, clear the event queue to prevent processing stale events
        while not self._event_queue.empty():
            self._event_queue.get_nowait()
        while not self._input_guardrail_queue.empty():
            self._input_guardrail_queue.get_nowait()

    elif mode == "after_turn":
        # Soft cancel - just set the flag
        # The streaming loop will check this and stop gracefully
        # Don't call _cleanup_tasks() or clear queues yet
        pass

stream_events async

stream_events() -> AsyncIterator[StreamEvent]

Stream deltas for new items as they are generated. We're using the types from the OpenAI Responses API, so these are semantic events: each event has a type field that describes the type of the event, along with the data for that event.

This will raise: - A MaxTurnsExceeded exception if the agent exceeds the max_turns limit. - A GuardrailTripwireTriggered exception if a guardrail is tripped.

Source code in src/agents/result.py
async def stream_events(self) -> AsyncIterator[StreamEvent]:
    """Stream deltas for new items as they are generated. We're using the types from the
    OpenAI Responses API, so these are semantic events: each event has a `type` field that
    describes the type of the event, along with the data for that event.

    This will raise:
    - A MaxTurnsExceeded exception if the agent exceeds the max_turns limit.
    - A GuardrailTripwireTriggered exception if a guardrail is tripped.
    """
    try:
        while True:
            self._check_errors()
            if self._stored_exception:
                logger.debug("Breaking due to stored exception")
                self.is_complete = True
                break

            if self.is_complete and self._event_queue.empty():
                break

            try:
                item = await self._event_queue.get()
            except asyncio.CancelledError:
                break

            if isinstance(item, QueueCompleteSentinel):
                # Await input guardrails if they are still running, so late
                # exceptions are captured.
                await self._await_task_safely(self._input_guardrails_task)

                self._event_queue.task_done()

                # Check for errors, in case the queue was completed
                # due to an exception
                self._check_errors()
                break

            yield item
            self._event_queue.task_done()
    finally:
        # Ensure main execution completes before cleanup to avoid race conditions
        # with session operations
        await self._await_task_safely(self._run_impl_task)
        # Safely terminate all background tasks after main execution has finished
        self._cleanup_tasks()

    if self._stored_exception:
        raise self._stored_exception

release_agents

release_agents(*, release_new_items: bool = True) -> None

Release strong references to agents held by this result. After calling this method, accessing item.agent or last_agent may return None if the agent has been garbage collected. Callers can use this when they are done inspecting the result and want to eagerly drop any associated agent graph.

Source code in src/agents/result.py
def release_agents(self, *, release_new_items: bool = True) -> None:
    """
    Release strong references to agents held by this result. After calling this method,
    accessing `item.agent` or `last_agent` may return `None` if the agent has been garbage
    collected. Callers can use this when they are done inspecting the result and want to
    eagerly drop any associated agent graph.
    """
    if release_new_items:
        for item in self.new_items:
            release = getattr(item, "release_agent", None)
            if callable(release):
                release()
    self._release_last_agent_reference()

final_output_as

final_output_as(
    cls: type[T], raise_if_incorrect_type: bool = False
) -> T

A convenience method to cast the final output to a specific type. By default, the cast is only for the typechecker. If you set raise_if_incorrect_type to True, we'll raise a TypeError if the final output is not of the given type.

Parameters:

Name Type Description Default
cls type[T]

The type to cast the final output to.

required
raise_if_incorrect_type bool

If True, we'll raise a TypeError if the final output is not of the given type.

False

Returns:

Type Description
T

The final output casted to the given type.

Source code in src/agents/result.py
def final_output_as(self, cls: type[T], raise_if_incorrect_type: bool = False) -> T:
    """A convenience method to cast the final output to a specific type. By default, the cast
    is only for the typechecker. If you set `raise_if_incorrect_type` to True, we'll raise a
    TypeError if the final output is not of the given type.

    Args:
        cls: The type to cast the final output to.
        raise_if_incorrect_type: If True, we'll raise a TypeError if the final output is not of
            the given type.

    Returns:
        The final output casted to the given type.
    """
    if raise_if_incorrect_type and not isinstance(self.final_output, cls):
        raise TypeError(f"Final output is not of type {cls.__name__}")

    return cast(T, self.final_output)

to_input_list

to_input_list() -> list[TResponseInputItem]

Creates a new input list, merging the original input with all the new items generated.

Source code in src/agents/result.py
def to_input_list(self) -> list[TResponseInputItem]:
    """Creates a new input list, merging the original input with all the new items generated."""
    original_items: list[TResponseInputItem] = ItemHelpers.input_to_new_input_list(self.input)
    new_items = [item.to_input_item() for item in self.new_items]

    return original_items + new_items