运行智能体
你可以通过 Runner
类来运行智能体。你有 3 种选择:
Runner.run()
,异步运行并返回一个RunResult
。Runner.run_sync()
,同步方法,本质上在内部调用.run()
。Runner.run_streamed()
,异步运行并返回一个RunResultStreaming
。它以流式模式调用 LLM,并在接收到事件时将其流式传输给你。
from agents import Agent, Runner
async def main():
agent = Agent(name="Assistant", instructions="You are a helpful assistant")
result = await Runner.run(agent, "Write a haiku about recursion in programming.")
print(result.final_output)
# Code within the code,
# Functions calling themselves,
# Infinite loop's dance
在结果指南中阅读更多内容。
智能体循环
当你在 Runner
中使用 run 方法时,你需要传入一个起始智能体和输入。输入可以是字符串(视为用户消息),也可以是一组输入项列表,即 OpenAI Responses API 中的项目。
runner 随后运行一个循环:
- 我们使用当前输入为当前智能体调用 LLM。
- LLM 生成输出。
- 如果 LLM 返回
final_output
,循环结束并返回结果。 - 如果 LLM 进行任务转移,我们会更新当前智能体和输入,并重新运行循环。
- 如果 LLM 产生工具调用,我们会运行这些工具调用,附加结果,并重新运行循环。
- 如果 LLM 返回
- 如果超过传入的
max_turns
,我们会抛出一个MaxTurnsExceeded
异常。
Note
判断 LLM 输出是否为“最终输出”的规则是:它生成了所需类型的文本输出,且没有工具调用。
流式传输
流式传输允许你在 LLM 运行时额外接收流式事件。流结束后,RunResultStreaming
将包含有关此次运行的完整信息,包括所有新产生的输出。你可以调用 .stream_events()
获取流式事件。更多信息请阅读流式传输指南。
运行配置
run_config
参数允许你为智能体运行配置一些全局设置:
model
:允许设置一个全局使用的 LLM 模型,而不管每个 Agent 的model
是什么。model_provider
:用于查找模型名称的模型提供方,默认为 OpenAI。model_settings
:覆盖智能体特定设置。例如你可以设置全局的temperature
或top_p
。input_guardrails
,output_guardrails
:要在所有运行中包含的输入或输出安全防护措施列表。handoff_input_filter
:应用于所有任务转移的全局输入过滤器(如果该任务转移尚未设置)。输入过滤器允许你编辑发送给新智能体的输入。更多细节见Handoff.input_filter
的文档。tracing_disabled
:允许你为整个运行禁用追踪。trace_include_sensitive_data
:配置追踪中是否包含潜在敏感数据,例如 LLM 与工具调用的输入/输出。workflow_name
,trace_id
,group_id
:为此次运行设置追踪工作流名称、追踪 ID 和追踪分组 ID。我们建议至少设置workflow_name
。分组 ID 是可选字段,可用于跨多个运行关联追踪。trace_metadata
:要包含在所有追踪上的元数据。
会话/聊天线程
调用任一运行方法都可能导致一个或多个智能体运行(因此一个或多个 LLM 调用),但它表示聊天会话中的单个逻辑轮次。例如:
- 用户轮次:用户输入文本
- Runner 运行:第一个智能体调用 LLM,运行工具,进行一次任务转移到第二个智能体,第二个智能体运行更多工具,然后生成输出。
在智能体运行结束时,你可以选择向用户展示什么。例如,你可以向用户展示智能体生成的每个新条目,或仅展示最终输出。无论哪种方式,用户都可能继续追问,此时你可以再次调用 run 方法。
手动会话管理
你可以使用 RunResultBase.to_input_list()
手动管理会话历史,以获取下一轮的输入:
async def main():
agent = Agent(name="Assistant", instructions="Reply very concisely.")
thread_id = "thread_123" # Example thread ID
with trace(workflow_name="Conversation", group_id=thread_id):
# First turn
result = await Runner.run(agent, "What city is the Golden Gate Bridge in?")
print(result.final_output)
# San Francisco
# Second turn
new_input = result.to_input_list() + [{"role": "user", "content": "What state is it in?"}]
result = await Runner.run(agent, new_input)
print(result.final_output)
# California
使用 Sessions 的自动会话管理
更简单的方式是使用 Sessions 自动处理会话历史,而无需手动调用 .to_input_list()
:
from agents import Agent, Runner, SQLiteSession
async def main():
agent = Agent(name="Assistant", instructions="Reply very concisely.")
# Create session instance
session = SQLiteSession("conversation_123")
thread_id = "thread_123" # Example thread ID
with trace(workflow_name="Conversation", group_id=thread_id):
# First turn
result = await Runner.run(agent, "What city is the Golden Gate Bridge in?", session=session)
print(result.final_output)
# San Francisco
# Second turn - agent automatically remembers previous context
result = await Runner.run(agent, "What state is it in?", session=session)
print(result.final_output)
# California
Sessions 会自动:
- 在每次运行前检索会话历史
- 在每次运行后存储新消息
- 为不同的 session ID 维护独立会话
更多细节见Sessions 文档。
服务管理的会话
你也可以让 OpenAI 会话状态功能在服务端管理会话状态,而不是在本地通过 to_input_list()
或 Sessions
进行处理。这样可以在不手动重发所有历史消息的情况下保留会话历史。更多信息请参阅 OpenAI Conversation state 指南。
OpenAI 提供两种跨轮次追踪状态的方式:
1. 使用 conversation_id
你首先使用 OpenAI Conversations API 创建一个会话,然后在后续每次调用中复用其 ID:
from agents import Agent, Runner
from openai import AsyncOpenAI
client = AsyncOpenAI()
async def main():
# Create a server-managed conversation
conversation = await client.conversations.create()
conv_id = conversation.id
agent = Agent(name="Assistant", instructions="Reply very concisely.")
# First turn
result1 = await Runner.run(agent, "What city is the Golden Gate Bridge in?", conversation_id=conv_id)
print(result1.final_output)
# San Francisco
# Second turn reuses the same conversation_id
result2 = await Runner.run(
agent,
"What state is it in?",
conversation_id=conv_id,
)
print(result2.final_output)
# California
2. 使用 previous_response_id
另一种选择是响应链式(response chaining),每一轮显式地链接到上一轮的响应 ID。
from agents import Agent, Runner
async def main():
agent = Agent(name="Assistant", instructions="Reply very concisely.")
# First turn
result1 = await Runner.run(agent, "What city is the Golden Gate Bridge in?")
print(result1.final_output)
# San Francisco
# Second turn, chained to the previous response
result2 = await Runner.run(
agent,
"What state is it in?",
previous_response_id=result1.last_response_id,
)
print(result2.final_output)
# California
长时运行的智能体与人机协同
你可以使用 Agents SDK 与 Temporal 的集成来运行持久的、长时运行的工作流,包括人机协同任务。查看此视频中 Temporal 与 Agents SDK 协同完成长时任务的演示,以及此处的文档。
异常
SDK 在特定情况下会抛出异常。完整列表见 agents.exceptions
。概览如下:
AgentsException
:这是 SDK 内抛出的所有异常的基类。它作为通用类型,其他所有特定异常都从它派生。MaxTurnsExceeded
:当智能体的运行超过传给Runner.run
、Runner.run_sync
或Runner.run_streamed
方法的max_turns
限制时抛出。表示智能体无法在指定的交互轮次内完成任务。ModelBehaviorError
:当底层模型(LLM)产生出乎意料或无效的输出时发生。这可能包括:- JSON 格式错误:当模型为工具调用或其直接输出提供了格式错误的 JSON,尤其是在指定了特定
output_type
的情况下。 - 意外的工具相关失败:当模型未按预期方式使用工具
- JSON 格式错误:当模型为工具调用或其直接输出提供了格式错误的 JSON,尤其是在指定了特定
UserError
:当你(使用 SDK 编写代码的人)在使用 SDK 时出错会抛出该异常。通常由不正确的代码实现、无效的配置或误用 SDK 的 API 引起。InputGuardrailTripwireTriggered
,OutputGuardrailTripwireTriggered
:当输入安全防护措施或输出安全防护措施的条件分别被触发时抛出。输入安全防护措施在处理前检查传入消息,而输出安全防护措施在交付前检查智能体的最终响应。