跳转至

运行智能体

你可以通过 Runner 类来运行智能体。你有 3 种选择:

  1. Runner.run(),异步运行并返回一个 RunResult
  2. Runner.run_sync(),同步方法,本质上在内部调用 .run()
  3. Runner.run_streamed(),异步运行并返回一个 RunResultStreaming。它以流式模式调用 LLM,并在接收到事件时将其流式传输给你。
from agents import Agent, Runner

async def main():
    agent = Agent(name="Assistant", instructions="You are a helpful assistant")

    result = await Runner.run(agent, "Write a haiku about recursion in programming.")
    print(result.final_output)
    # Code within the code,
    # Functions calling themselves,
    # Infinite loop's dance

结果指南中阅读更多内容。

智能体循环

当你在 Runner 中使用 run 方法时,你需要传入一个起始智能体和输入。输入可以是字符串(视为用户消息),也可以是一组输入项列表,即 OpenAI Responses API 中的项目。

runner 随后运行一个循环:

  1. 我们使用当前输入为当前智能体调用 LLM。
  2. LLM 生成输出。
    1. 如果 LLM 返回 final_output,循环结束并返回结果。
    2. 如果 LLM 进行任务转移,我们会更新当前智能体和输入,并重新运行循环。
    3. 如果 LLM 产生工具调用,我们会运行这些工具调用,附加结果,并重新运行循环。
  3. 如果超过传入的 max_turns,我们会抛出一个 MaxTurnsExceeded 异常。

Note

判断 LLM 输出是否为“最终输出”的规则是:它生成了所需类型的文本输出,且没有工具调用。

流式传输

流式传输允许你在 LLM 运行时额外接收流式事件。流结束后,RunResultStreaming 将包含有关此次运行的完整信息,包括所有新产生的输出。你可以调用 .stream_events() 获取流式事件。更多信息请阅读流式传输指南

运行配置

run_config 参数允许你为智能体运行配置一些全局设置:

  • model:允许设置一个全局使用的 LLM 模型,而不管每个 Agent 的 model 是什么。
  • model_provider:用于查找模型名称的模型提供方,默认为 OpenAI。
  • model_settings:覆盖智能体特定设置。例如你可以设置全局的 temperaturetop_p
  • input_guardrails, output_guardrails:要在所有运行中包含的输入或输出安全防护措施列表。
  • handoff_input_filter:应用于所有任务转移的全局输入过滤器(如果该任务转移尚未设置)。输入过滤器允许你编辑发送给新智能体的输入。更多细节见 Handoff.input_filter 的文档。
  • tracing_disabled:允许你为整个运行禁用追踪
  • trace_include_sensitive_data:配置追踪中是否包含潜在敏感数据,例如 LLM 与工具调用的输入/输出。
  • workflow_name, trace_id, group_id:为此次运行设置追踪工作流名称、追踪 ID 和追踪分组 ID。我们建议至少设置 workflow_name。分组 ID 是可选字段,可用于跨多个运行关联追踪。
  • trace_metadata:要包含在所有追踪上的元数据。

会话/聊天线程

调用任一运行方法都可能导致一个或多个智能体运行(因此一个或多个 LLM 调用),但它表示聊天会话中的单个逻辑轮次。例如:

  1. 用户轮次:用户输入文本
  2. Runner 运行:第一个智能体调用 LLM,运行工具,进行一次任务转移到第二个智能体,第二个智能体运行更多工具,然后生成输出。

在智能体运行结束时,你可以选择向用户展示什么。例如,你可以向用户展示智能体生成的每个新条目,或仅展示最终输出。无论哪种方式,用户都可能继续追问,此时你可以再次调用 run 方法。

手动会话管理

你可以使用 RunResultBase.to_input_list() 手动管理会话历史,以获取下一轮的输入:

async def main():
    agent = Agent(name="Assistant", instructions="Reply very concisely.")

    thread_id = "thread_123"  # Example thread ID
    with trace(workflow_name="Conversation", group_id=thread_id):
        # First turn
        result = await Runner.run(agent, "What city is the Golden Gate Bridge in?")
        print(result.final_output)
        # San Francisco

        # Second turn
        new_input = result.to_input_list() + [{"role": "user", "content": "What state is it in?"}]
        result = await Runner.run(agent, new_input)
        print(result.final_output)
        # California

使用 Sessions 的自动会话管理

更简单的方式是使用 Sessions 自动处理会话历史,而无需手动调用 .to_input_list()

from agents import Agent, Runner, SQLiteSession

async def main():
    agent = Agent(name="Assistant", instructions="Reply very concisely.")

    # Create session instance
    session = SQLiteSession("conversation_123")

    thread_id = "thread_123"  # Example thread ID
    with trace(workflow_name="Conversation", group_id=thread_id):
        # First turn
        result = await Runner.run(agent, "What city is the Golden Gate Bridge in?", session=session)
        print(result.final_output)
        # San Francisco

        # Second turn - agent automatically remembers previous context
        result = await Runner.run(agent, "What state is it in?", session=session)
        print(result.final_output)
        # California

Sessions 会自动:

  • 在每次运行前检索会话历史
  • 在每次运行后存储新消息
  • 为不同的 session ID 维护独立会话

更多细节见Sessions 文档

服务管理的会话

你也可以让 OpenAI 会话状态功能在服务端管理会话状态,而不是在本地通过 to_input_list()Sessions 进行处理。这样可以在不手动重发所有历史消息的情况下保留会话历史。更多信息请参阅 OpenAI Conversation state 指南

OpenAI 提供两种跨轮次追踪状态的方式:

1. 使用 conversation_id

你首先使用 OpenAI Conversations API 创建一个会话,然后在后续每次调用中复用其 ID:

from agents import Agent, Runner
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def main():
    # Create a server-managed conversation
    conversation = await client.conversations.create()
    conv_id = conversation.id    

    agent = Agent(name="Assistant", instructions="Reply very concisely.")

    # First turn
    result1 = await Runner.run(agent, "What city is the Golden Gate Bridge in?", conversation_id=conv_id)
    print(result1.final_output)
    # San Francisco

    # Second turn reuses the same conversation_id
    result2 = await Runner.run(
        agent,
        "What state is it in?",
        conversation_id=conv_id,
    )
    print(result2.final_output)
    # California

2. 使用 previous_response_id

另一种选择是响应链式(response chaining),每一轮显式地链接到上一轮的响应 ID。

from agents import Agent, Runner

async def main():
    agent = Agent(name="Assistant", instructions="Reply very concisely.")

    # First turn
    result1 = await Runner.run(agent, "What city is the Golden Gate Bridge in?")
    print(result1.final_output)
    # San Francisco

    # Second turn, chained to the previous response
    result2 = await Runner.run(
        agent,
        "What state is it in?",
        previous_response_id=result1.last_response_id,
    )
    print(result2.final_output)
    # California

长时运行的智能体与人机协同

你可以使用 Agents SDK 与 Temporal 的集成来运行持久的、长时运行的工作流,包括人机协同任务。查看此视频中 Temporal 与 Agents SDK 协同完成长时任务的演示,以及此处的文档

异常

SDK 在特定情况下会抛出异常。完整列表见 agents.exceptions。概览如下:

  • AgentsException:这是 SDK 内抛出的所有异常的基类。它作为通用类型,其他所有特定异常都从它派生。
  • MaxTurnsExceeded:当智能体的运行超过传给 Runner.runRunner.run_syncRunner.run_streamed 方法的 max_turns 限制时抛出。表示智能体无法在指定的交互轮次内完成任务。
  • ModelBehaviorError:当底层模型(LLM)产生出乎意料或无效的输出时发生。这可能包括:
    • JSON 格式错误:当模型为工具调用或其直接输出提供了格式错误的 JSON,尤其是在指定了特定 output_type 的情况下。
    • 意外的工具相关失败:当模型未按预期方式使用工具
  • UserError:当你(使用 SDK 编写代码的人)在使用 SDK 时出错会抛出该异常。通常由不正确的代码实现、无效的配置或误用 SDK 的 API 引起。
  • InputGuardrailTripwireTriggered, OutputGuardrailTripwireTriggered:当输入安全防护措施或输出安全防护措施的条件分别被触发时抛出。输入安全防护措施在处理前检查传入消息,而输出安全防护措施在交付前检查智能体的最终响应。