class OpenAIResponsesModel(Model):
"""
Implementation of `Model` that uses the OpenAI Responses API.
"""

    def __init__(
self,
model: str | ChatModel,
openai_client: AsyncOpenAI,
) -> None:
self.model = model
self._client = openai_client

    def _non_null_or_not_given(self, value: Any) -> Any:
        # Map None to the NOT_GIVEN sentinel so unset settings are omitted from the request.
        return value if value is not None else NOT_GIVEN

    async def get_response(
self,
system_instructions: str | None,
input: str | list[TResponseInputItem],
model_settings: ModelSettings,
tools: list[Tool],
output_schema: AgentOutputSchema | None,
handoffs: list[Handoff],
tracing: ModelTracing,
) -> ModelResponse:
with response_span(disabled=tracing.is_disabled()) as span_response:
try:
response = await self._fetch_response(
system_instructions,
input,
model_settings,
tools,
output_schema,
handoffs,
stream=False,
)
if _debug.DONT_LOG_MODEL_DATA:
logger.debug("LLM responsed")
else:
logger.debug(
"LLM resp:\n"
f"{json.dumps([x.model_dump() for x in response.output], indent=2)}\n"
)
usage = (
Usage(
requests=1,
input_tokens=response.usage.input_tokens,
output_tokens=response.usage.output_tokens,
total_tokens=response.usage.total_tokens,
)
if response.usage
else Usage()
)
if tracing.include_data():
span_response.span_data.response = response
span_response.span_data.input = input
except Exception as e:
span_response.set_error(
SpanError(
message="Error getting response",
data={
"error": str(e) if tracing.include_data() else e.__class__.__name__,
},
)
)
logger.error(f"Error getting response: {e}")
raise
return ModelResponse(
output=response.output,
usage=usage,
referenceable_id=response.id,
)
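
    # Consumption sketch (illustrative, not part of the original module). The `ModelTracing`
    # member and `ModelSettings` defaults are assumed from the surrounding SDK:
    #   resp = await model.get_response(..., tracing=ModelTracing.DISABLED)
    #   resp.usage.total_tokens   # aggregated token counts for the request
    #   resp.output               # raw Responses API output items
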
async def stream_response(
self,
system_instructions: str | None,
input: str | list[TResponseInputItem],
model_settings: ModelSettings,
tools: list[Tool],
output_schema: AgentOutputSchema | None,
handoffs: list[Handoff],
tracing: ModelTracing,
) -> AsyncIterator[ResponseStreamEvent]:
"""
Yields a partial message as it is generated, as well as the usage information.
"""
with response_span(disabled=tracing.is_disabled()) as span_response:
try:
stream = await self._fetch_response(
system_instructions,
input,
model_settings,
tools,
output_schema,
handoffs,
stream=True,
)
final_response: Response | None = None
async for chunk in stream:
if isinstance(chunk, ResponseCompletedEvent):
final_response = chunk.response
yield chunk
if final_response and tracing.include_data():
span_response.span_data.response = final_response
span_response.span_data.input = input
except Exception as e:
span_response.set_error(
SpanError(
message="Error streaming response",
data={
"error": str(e) if tracing.include_data() else e.__class__.__name__,
},
)
)
logger.error(f"Error streaming response: {e}")
raise
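
    # Streaming consumption sketch (illustrative): callers iterate the yielded events and can
    # watch for the terminal ResponseCompletedEvent, which is also what populates the trace span.
    #   async for event in model.stream_response(..., tracing=ModelTracing.DISABLED):
    #       if isinstance(event, ResponseCompletedEvent):
    #           final_response = event.response
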
@overload
async def _fetch_response(
self,
system_instructions: str | None,
input: str | list[TResponseInputItem],
model_settings: ModelSettings,
tools: list[Tool],
output_schema: AgentOutputSchema | None,
handoffs: list[Handoff],
stream: Literal[True],
) -> AsyncStream[ResponseStreamEvent]: ...

    @overload
async def _fetch_response(
self,
system_instructions: str | None,
input: str | list[TResponseInputItem],
model_settings: ModelSettings,
tools: list[Tool],
output_schema: AgentOutputSchema | None,
handoffs: list[Handoff],
stream: Literal[False],
) -> Response: ...

    async def _fetch_response(
self,
system_instructions: str | None,
input: str | list[TResponseInputItem],
model_settings: ModelSettings,
tools: list[Tool],
output_schema: AgentOutputSchema | None,
handoffs: list[Handoff],
stream: Literal[True] | Literal[False] = False,
) -> Response | AsyncStream[ResponseStreamEvent]:
list_input = ItemHelpers.input_to_new_input_list(input)
        # Only request parallel tool calls when the setting is enabled and tools were provided;
        # otherwise leave the field unset via NOT_GIVEN.
        parallel_tool_calls = (
            True if model_settings.parallel_tool_calls and tools else NOT_GIVEN
        )
tool_choice = Converter.convert_tool_choice(model_settings.tool_choice)
converted_tools = Converter.convert_tools(tools, handoffs)
response_format = Converter.get_response_format(output_schema)
if _debug.DONT_LOG_MODEL_DATA:
logger.debug("Calling LLM")
else:
logger.debug(
f"Calling LLM {self.model} with input:\n"
f"{json.dumps(list_input, indent=2)}\n"
f"Tools:\n{json.dumps(converted_tools.tools, indent=2)}\n"
f"Stream: {stream}\n"
f"Tool choice: {tool_choice}\n"
f"Response format: {response_format}\n"
)
return await self._client.responses.create(
instructions=self._non_null_or_not_given(system_instructions),
model=self.model,
input=list_input,
include=converted_tools.includes,
tools=converted_tools.tools,
temperature=self._non_null_or_not_given(model_settings.temperature),
top_p=self._non_null_or_not_given(model_settings.top_p),
truncation=self._non_null_or_not_given(model_settings.truncation),
tool_choice=tool_choice,
parallel_tool_calls=parallel_tool_calls,
stream=stream,
extra_headers=_HEADERS,
text=response_format,
)

    def _get_client(self) -> AsyncOpenAI:
        # Fall back to a default AsyncOpenAI client if one was not supplied at construction time.
        if self._client is None:
            self._client = AsyncOpenAI()
        return self._client
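
# A minimal end-to-end sketch (not part of the original module). It assumes the surrounding
# SDK's ModelSettings defaults and a ModelTracing.DISABLED member; "gpt-4o" is an illustrative
# model name, not a recommendation.
#
# async def _example() -> None:
#     model = OpenAIResponsesModel(model="gpt-4o", openai_client=AsyncOpenAI())
#     response = await model.get_response(
#         system_instructions="You are a concise assistant.",
#         input="Hello!",
#         model_settings=ModelSettings(),
#         tools=[],
#         output_schema=None,
#         handoffs=[],
#         tracing=ModelTracing.DISABLED,
#     )
#     print(response.usage.total_tokens)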