运行智能体
你可以通过 Runner 类来运行智能体。你有 3 个选项:
Runner.run():异步运行并返回一个RunResult。Runner.run_sync():同步方法,内部调用.run()。Runner.run_streamed():异步运行并返回一个RunResultStreaming。它以流式方式调用 LLM,并在接收时将事件流式传输给你。
from agents import Agent, Runner
async def main():
agent = Agent(name="Assistant", instructions="You are a helpful assistant")
result = await Runner.run(agent, "Write a haiku about recursion in programming.")
print(result.final_output)
# Code within the code,
# Functions calling themselves,
# Infinite loop's dance
在结果指南中了解更多。
智能体循环
当你在 Runner 中使用 run 方法时,你需要传入一个起始智能体和输入。输入可以是字符串(视为用户消息),也可以是输入项列表——这些是 OpenAI Responses API 中的项。
runner 随后运行一个循环:
- 我们使用当前输入,为当前智能体调用 LLM。
- LLM 生成输出。
- 如果 LLM 返回
final_output,循环结束并返回结果。 - 如果 LLM 进行任务转移,我们会更新当前智能体和输入,并重新运行循环。
- 如果 LLM 产生工具调用,我们会运行这些工具调用,追加结果,并重新运行循环。
- 如果 LLM 返回
- 如果超过传入的
max_turns,我们会抛出一个MaxTurnsExceeded异常。
Note
判断 LLM 输出是否为“最终输出”的规则是:它生成了所需类型的文本输出,且不存在工具调用。
流式传输
流式传输允许你在 LLM 运行时额外接收流事件。流结束后,RunResultStreaming 将包含关于这次运行的完整信息,包括所有新产生的输出。你可以调用 .stream_events() 获取流事件。详见流式传输指南。
运行配置
run_config 参数可让你为智能体运行配置一些全局设置:
model:允许设置一个全局的 LLM 模型使用,而不受每个 Agent 的model限制。model_provider:用于查找模型名称的模型提供方,默认是 OpenAI。model_settings:覆盖智能体特定设置。例如,你可以设置全局的temperature或top_p。input_guardrails、output_guardrails:在所有运行中包含的输入或输出安全防护措施列表。handoff_input_filter:应用于所有任务转移的全局输入过滤器(如果该任务转移尚未定义)。输入过滤器允许你编辑发送给新智能体的输入。详见Handoff.input_filter的文档。tracing_disabled:允许你为整个运行禁用追踪。trace_include_sensitive_data:配置追踪中是否包含潜在敏感数据,例如 LLM 和工具调用的输入/输出。workflow_name、trace_id、group_id:为本次运行设置追踪的工作流名称、追踪 ID 和追踪分组 ID。我们建议至少设置workflow_name。分组 ID 是可选字段,用于在多次运行之间关联追踪。trace_metadata:要包含在所有追踪中的元数据。
会话/聊天线程
调用任一运行方法可能导致一个或多个智能体运行(因此可能有一次或多次 LLM 调用),但它代表一次聊天会话中的单个逻辑轮次。例如:
- 用户轮次:用户输入文本
- Runner 运行:第一个智能体调用 LLM、运行工具、进行一次任务转移到第二个智能体,第二个智能体运行更多工具,然后生成输出。
在智能体运行结束时,你可以选择展示给用户的内容。例如,你可以向用户展示由智能体生成的每个新条目,或仅展示最终输出。无论哪种方式,用户都可能提出后续问题,此时你可以再次调用 run 方法。
手动会话管理
你可以使用 RunResultBase.to_input_list() 方法手动管理会话历史,以获取下一轮的输入:
async def main():
agent = Agent(name="Assistant", instructions="Reply very concisely.")
thread_id = "thread_123" # Example thread ID
with trace(workflow_name="Conversation", group_id=thread_id):
# First turn
result = await Runner.run(agent, "What city is the Golden Gate Bridge in?")
print(result.final_output)
# San Francisco
# Second turn
new_input = result.to_input_list() + [{"role": "user", "content": "What state is it in?"}]
result = await Runner.run(agent, new_input)
print(result.final_output)
# California
使用 Sessions 的自动会话管理
如果需要更简单的方式,你可以使用 Sessions 自动处理会话历史,而无需手动调用 .to_input_list():
from agents import Agent, Runner, SQLiteSession
async def main():
agent = Agent(name="Assistant", instructions="Reply very concisely.")
# Create session instance
session = SQLiteSession("conversation_123")
thread_id = "thread_123" # Example thread ID
with trace(workflow_name="Conversation", group_id=thread_id):
# First turn
result = await Runner.run(agent, "What city is the Golden Gate Bridge in?", session=session)
print(result.final_output)
# San Francisco
# Second turn - agent automatically remembers previous context
result = await Runner.run(agent, "What state is it in?", session=session)
print(result.final_output)
# California
Sessions 会自动:
- 在每次运行前获取会话历史
- 在每次运行后存储新消息
- 为不同的会话 ID 维护独立的会话
更多详情见Sessions 文档。
由服务端管理的会话
你也可以让 OpenAI 的会话状态功能在服务端管理会话状态,而不是使用 to_input_list() 或 Sessions 在本地处理。这样可以在无需手动重发所有历史消息的情况下保留会话历史。更多详情见 OpenAI Conversation state 指南。
OpenAI 提供两种跨轮次跟踪状态的方法:
1. 使用 conversation_id
你首先使用 OpenAI Conversations API 创建一个会话,然后在后续的每次调用中复用其 ID:
from agents import Agent, Runner
from openai import AsyncOpenAI
client = AsyncOpenAI()
async def main():
# Create a server-managed conversation
conversation = await client.conversations.create()
conv_id = conversation.id
agent = Agent(name="Assistant", instructions="Reply very concisely.")
# First turn
result1 = await Runner.run(agent, "What city is the Golden Gate Bridge in?", conversation_id=conv_id)
print(result1.final_output)
# San Francisco
# Second turn reuses the same conversation_id
result2 = await Runner.run(
agent,
"What state is it in?",
conversation_id=conv_id,
)
print(result2.final_output)
# California
2. 使用 previous_response_id
另一种选择是响应链(response chaining),其中每一轮显式链接到上一轮的响应 ID。
from agents import Agent, Runner
async def main():
agent = Agent(name="Assistant", instructions="Reply very concisely.")
# First turn
result1 = await Runner.run(agent, "What city is the Golden Gate Bridge in?")
print(result1.final_output)
# San Francisco
# Second turn, chained to the previous response
result2 = await Runner.run(
agent,
"What state is it in?",
previous_response_id=result1.last_response_id,
)
print(result2.final_output)
# California
长运行智能体与人类参与
你可以使用 Agents SDK 的 Temporal 集成来运行持久的、长时间运行的工作流,包括人类参与的任务。观看 Temporal 与 Agents SDK 协同完成长时任务的演示视频,并查看文档。
异常
SDK 在某些情况下会抛出异常。完整列表见 agents.exceptions。概览如下:
AgentsException:这是 SDK 内抛出的所有异常的基类。它作为通用类型,其他特定异常都从它派生。MaxTurnsExceeded:当智能体运行超过传递给Runner.run、Runner.run_sync或Runner.run_streamed的max_turns限制时抛出该异常。它表示智能体无法在指定的交互轮次内完成任务。ModelBehaviorError:当底层模型(LLM)产生意外或无效输出时发生。包括:- JSON 结构不合法:当模型为工具调用或其直接输出提供了格式错误的 JSON,特别是在定义了特定
output_type时。 - 与工具相关的意外失败:当模型未按预期方式使用工具时
- JSON 结构不合法:当模型为工具调用或其直接输出提供了格式错误的 JSON,特别是在定义了特定
UserError:当你(使用该 SDK 编写代码的人)在使用 SDK 时出现错误会抛出此异常。通常由代码实现不正确、配置无效或误用 SDK 的 API 导致。InputGuardrailTripwireTriggered、OutputGuardrailTripwireTriggered:当输入安全防护措施或输出安全防护措施的条件被满足时分别抛出该异常。输入安全防护措施在处理前检查传入消息,而输出安全防护措施在交付前检查智能体的最终响应。