Skip to content

OpenAI Responses model

ResponsesWebSocketError

Bases: RuntimeError

Error raised for websocket transport error frames.

Source code in src/agents/models/openai_responses.py
class ResponsesWebSocketError(RuntimeError):
    """Error raised for websocket transport error frames.

    Captures the raw frame payload plus a few commonly-inspected fields
    (code, error type, request id, message) as attributes, and renders the
    whole payload into the exception message.
    """

    def __init__(self, payload: Mapping[str, Any]):
        self.event_type = str(payload.get("type") or "error")
        self.payload = dict(payload)

        raw_error = payload.get("error")
        # Only trust the nested error object when it is actually a mapping.
        details: Mapping[str, Any] = raw_error if isinstance(raw_error, Mapping) else {}
        self.code = self._coerce_optional_str(details.get("code"))
        self.error_type = self._coerce_optional_str(details.get("type"))
        self.request_id = self._coerce_optional_str(
            payload.get("request_id") or details.get("request_id")
        )
        self.error_message = self._coerce_optional_str(details.get("message"))

        if self.event_type == "error":
            prefix = "Responses websocket error"
        else:
            prefix = f"Responses websocket {self.event_type}"
        super().__init__(f"{prefix}: {json.dumps(payload, default=_json_dumps_default)}")

    @staticmethod
    def _coerce_optional_str(value: Any) -> str | None:
        """Return `value` if it is a string, otherwise None."""
        if isinstance(value, str):
            return value
        return None

OpenAIResponsesModel

Bases: Model

Implementation of Model that uses the OpenAI Responses API.

Source code in src/agents/models/openai_responses.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
class OpenAIResponsesModel(Model):
    """
    Implementation of `Model` that uses the OpenAI Responses API.
    """

    def __init__(
        self,
        model: str | ChatModel,
        openai_client: AsyncOpenAI,
        *,
        model_is_explicit: bool = True,
    ) -> None:
        """Create a Responses-API backed model.

        Args:
            model: Model name (or ChatModel) to request.
            openai_client: Async OpenAI client used for all API calls.
            model_is_explicit: Whether the caller chose this model deliberately.
                When False and a server-side `prompt` is supplied, the model
                parameter is omitted from the request (see
                `_build_response_create_kwargs`).
        """
        self.model = model
        self._model_is_explicit = model_is_explicit
        self._client = openai_client

    def _non_null_or_omit(self, value: Any) -> Any:
        """Return `value` unchanged, or the SDK `omit` sentinel when it is None."""
        return value if value is not None else omit

    async def _maybe_aclose_async_iterator(self, iterator: Any) -> None:
        """Best-effort close of an iterator-like stream object.

        Prefers an `aclose()` method (async generators); otherwise falls back
        to `close()`, awaiting its result only if it returns an awaitable.
        Objects with neither method are left untouched.
        """
        aclose = getattr(iterator, "aclose", None)
        if callable(aclose):
            await aclose()
            return

        close = getattr(iterator, "close", None)
        if callable(close):
            close_result = close()
            if inspect.isawaitable(close_result):
                await close_result

    def _schedule_async_iterator_close(self, iterator: Any) -> None:
        """Close `iterator` in a background task.

        Used when the consuming coroutine is being cancelled and cannot await
        the close itself without delaying cancellation.
        """
        task = asyncio.create_task(self._maybe_aclose_async_iterator(iterator))
        task.add_done_callback(self._consume_background_cleanup_task_result)

    @staticmethod
    def _consume_background_cleanup_task_result(task: asyncio.Task[Any]) -> None:
        """Retrieve a background cleanup task's result so failures are not
        reported as "exception was never retrieved"; errors are only logged."""
        try:
            task.result()
        except asyncio.CancelledError:
            pass
        except Exception as exc:
            logger.debug(f"Background stream cleanup failed after cancellation: {exc}")

    async def get_response(
        self,
        system_instructions: str | None,
        input: str | list[TResponseInputItem],
        model_settings: ModelSettings,
        tools: list[Tool],
        output_schema: AgentOutputSchemaBase | None,
        handoffs: list[Handoff],
        tracing: ModelTracing,
        previous_response_id: str | None = None,
        conversation_id: str | None = None,
        prompt: ResponsePromptParam | None = None,
    ) -> ModelResponse:
        """Make a single non-streaming Responses API call.

        Wraps the call in a tracing span, logs the response (unless model-data
        logging is disabled), extracts usage, and returns a `ModelResponse`.
        Exceptions are recorded on the span and re-raised.
        """
        with response_span(disabled=tracing.is_disabled()) as span_response:
            try:
                response = await self._fetch_response(
                    system_instructions,
                    input,
                    model_settings,
                    tools,
                    output_schema,
                    handoffs,
                    previous_response_id=previous_response_id,
                    conversation_id=conversation_id,
                    stream=False,
                    prompt=prompt,
                )

                if _debug.DONT_LOG_MODEL_DATA:
                    logger.debug("LLM responded")
                else:
                    logger.debug(
                        "LLM resp:\n"
                        f"""{
                            json.dumps(
                                [x.model_dump() for x in response.output],
                                indent=2,
                                ensure_ascii=False,
                            )
                        }\n"""
                    )

                # A missing usage payload is treated as zero usage rather than
                # an error.
                usage = (
                    Usage(
                        requests=1,
                        input_tokens=response.usage.input_tokens,
                        output_tokens=response.usage.output_tokens,
                        total_tokens=response.usage.total_tokens,
                        input_tokens_details=response.usage.input_tokens_details,
                        output_tokens_details=response.usage.output_tokens_details,
                    )
                    if response.usage
                    else Usage()
                )

                if tracing.include_data():
                    span_response.span_data.response = response
                    span_response.span_data.input = input
            except Exception as e:
                span_response.set_error(
                    SpanError(
                        message="Error getting response",
                        data={
                            "error": str(e) if tracing.include_data() else e.__class__.__name__,
                        },
                    )
                )
                # request_id is only present on some SDK exceptions; None otherwise.
                request_id = getattr(e, "request_id", None)
                logger.error(f"Error getting response: {e}. (request_id: {request_id})")
                raise

        return ModelResponse(
            output=response.output,
            usage=usage,
            response_id=response.id,
        )

    async def stream_response(
        self,
        system_instructions: str | None,
        input: str | list[TResponseInputItem],
        model_settings: ModelSettings,
        tools: list[Tool],
        output_schema: AgentOutputSchemaBase | None,
        handoffs: list[Handoff],
        tracing: ModelTracing,
        previous_response_id: str | None = None,
        conversation_id: str | None = None,
        prompt: ResponsePromptParam | None = None,
    ) -> AsyncIterator[ResponseStreamEvent]:
        """
        Yields a partial message as it is generated, as well as the usage information.
        """
        with response_span(disabled=tracing.is_disabled()) as span_response:
            try:
                stream = await self._fetch_response(
                    system_instructions,
                    input,
                    model_settings,
                    tools,
                    output_schema,
                    handoffs,
                    previous_response_id=previous_response_id,
                    conversation_id=conversation_id,
                    stream=True,
                    prompt=prompt,
                )

                final_response: Response | None = None
                yielded_terminal_event = False
                close_stream_in_background = False
                try:
                    async for chunk in stream:
                        chunk_type = getattr(chunk, "type", None)
                        # Remember the final Response object for tracing:
                        # either the completed event's response, or the one
                        # attached to a failed/incomplete terminal event.
                        if isinstance(chunk, ResponseCompletedEvent):
                            final_response = chunk.response
                        elif chunk_type in {
                            "response.failed",
                            "response.incomplete",
                        }:
                            terminal_response = getattr(chunk, "response", None)
                            if isinstance(terminal_response, Response):
                                final_response = terminal_response
                        if chunk_type in {
                            "response.completed",
                            "response.failed",
                            "response.incomplete",
                            "response.error",
                        }:
                            yielded_terminal_event = True
                        yield chunk
                except asyncio.CancelledError:
                    # Don't block cancellation on stream cleanup; close it in
                    # a background task instead.
                    close_stream_in_background = True
                    self._schedule_async_iterator_close(stream)
                    raise
                finally:
                    if not close_stream_in_background:
                        try:
                            await self._maybe_aclose_async_iterator(stream)
                        except Exception as exc:
                            # After a terminal event the caller already has the
                            # full result, so a cleanup failure is not fatal.
                            if yielded_terminal_event:
                                logger.debug(
                                    f"Ignoring stream cleanup error after terminal event: {exc}"
                                )
                            else:
                                raise

                if final_response and tracing.include_data():
                    span_response.span_data.response = final_response
                    span_response.span_data.input = input

            except Exception as e:
                span_response.set_error(
                    SpanError(
                        message="Error streaming response",
                        data={
                            "error": str(e) if tracing.include_data() else e.__class__.__name__,
                        },
                    )
                )
                logger.error(f"Error streaming response: {e}")
                raise

    @overload
    async def _fetch_response(
        self,
        system_instructions: str | None,
        input: str | list[TResponseInputItem],
        model_settings: ModelSettings,
        tools: list[Tool],
        output_schema: AgentOutputSchemaBase | None,
        handoffs: list[Handoff],
        previous_response_id: str | None,
        conversation_id: str | None,
        stream: Literal[True],
        prompt: ResponsePromptParam | None = None,
    ) -> AsyncIterator[ResponseStreamEvent]: ...

    @overload
    async def _fetch_response(
        self,
        system_instructions: str | None,
        input: str | list[TResponseInputItem],
        model_settings: ModelSettings,
        tools: list[Tool],
        output_schema: AgentOutputSchemaBase | None,
        handoffs: list[Handoff],
        previous_response_id: str | None,
        conversation_id: str | None,
        stream: Literal[False],
        prompt: ResponsePromptParam | None = None,
    ) -> Response: ...

    async def _fetch_response(
        self,
        system_instructions: str | None,
        input: str | list[TResponseInputItem],
        model_settings: ModelSettings,
        tools: list[Tool],
        output_schema: AgentOutputSchemaBase | None,
        handoffs: list[Handoff],
        previous_response_id: str | None = None,
        conversation_id: str | None = None,
        stream: Literal[True] | Literal[False] = False,
        prompt: ResponsePromptParam | None = None,
    ) -> Response | AsyncIterator[ResponseStreamEvent]:
        """Issue the `responses.create` call; returns a Response or, when
        `stream=True`, an async iterator of stream events."""
        response = await self._client.responses.create(
            **self._build_response_create_kwargs(
                system_instructions=system_instructions,
                input=input,
                model_settings=model_settings,
                tools=tools,
                output_schema=output_schema,
                handoffs=handoffs,
                previous_response_id=previous_response_id,
                conversation_id=conversation_id,
                stream=stream,
                prompt=prompt,
            )
        )
        return cast(Union[Response, AsyncIterator[ResponseStreamEvent]], response)

    def _build_response_create_kwargs(
        self,
        system_instructions: str | None,
        input: str | list[TResponseInputItem],
        model_settings: ModelSettings,
        tools: list[Tool],
        output_schema: AgentOutputSchemaBase | None,
        handoffs: list[Handoff],
        previous_response_id: str | None = None,
        conversation_id: str | None = None,
        stream: bool = False,
        prompt: ResponsePromptParam | None = None,
    ) -> dict[str, Any]:
        """Translate agent-level settings into `responses.create` keyword args.

        Omitted values use the SDK `omit` sentinel rather than None. Raises
        TypeError if `model_settings.extra_args` collides with a key this
        method already sets.
        """
        list_input = ItemHelpers.input_to_new_input_list(input)
        list_input = _to_dump_compatible(list_input)
        list_input = self._remove_openai_responses_api_incompatible_fields(list_input)

        # parallel_tool_calls: only send True when tools exist; send an
        # explicit False when the caller disabled it; otherwise omit.
        if model_settings.parallel_tool_calls and tools:
            parallel_tool_calls: bool | Omit = True
        elif model_settings.parallel_tool_calls is False:
            parallel_tool_calls = False
        else:
            parallel_tool_calls = omit

        tool_choice = Converter.convert_tool_choice(model_settings.tool_choice)
        converted_tools = Converter.convert_tools(tools, handoffs)
        converted_tools_payload = _to_dump_compatible(converted_tools.tools)
        response_format = Converter.get_response_format(output_schema)
        # With a server-side prompt, defer to the prompt's configured model
        # unless the caller explicitly chose one.
        should_omit_model = prompt is not None and not self._model_is_explicit
        model_param: str | ChatModel | Omit = self.model if not should_omit_model else omit
        should_omit_tools = prompt is not None and len(converted_tools_payload) == 0
        # In prompt-managed tool flows without local tools payload, omit only named tool choices
        # that must match an explicit tool list. Keep control literals like "none"/"required".
        should_omit_tool_choice = should_omit_tools and isinstance(tool_choice, dict)
        tools_param: list[ToolParam] | Omit = (
            converted_tools_payload if not should_omit_tools else omit
        )
        tool_choice_param: response_create_params.ToolChoice | Omit = (
            tool_choice if not should_omit_tool_choice else omit
        )

        # Union of tool-implied includes and caller-requested includes; a set
        # is used to deduplicate.
        include_set: set[str] = set(converted_tools.includes)
        if model_settings.response_include is not None:
            include_set.update(model_settings.response_include)
        if model_settings.top_logprobs is not None:
            include_set.add("message.output_text.logprobs")
        include = cast(list[ResponseIncludable], list(include_set))

        if _debug.DONT_LOG_MODEL_DATA:
            logger.debug("Calling LLM")
        else:
            input_json = json.dumps(
                list_input,
                indent=2,
                ensure_ascii=False,
            )
            tools_json = json.dumps(
                converted_tools_payload,
                indent=2,
                ensure_ascii=False,
            )
            logger.debug(
                f"Calling LLM {self.model} with input:\n"
                f"{input_json}\n"
                f"Tools:\n{tools_json}\n"
                f"Stream: {stream}\n"
                f"Tool choice: {tool_choice_param}\n"
                f"Response format: {response_format}\n"
                f"Previous response id: {previous_response_id}\n"
                f"Conversation id: {conversation_id}\n"
            )

        extra_args = dict(model_settings.extra_args or {})
        if model_settings.top_logprobs is not None:
            extra_args["top_logprobs"] = model_settings.top_logprobs
        # verbosity rides inside the `text` (response format) parameter.
        if model_settings.verbosity is not None:
            if response_format is not omit:
                response_format["verbosity"] = model_settings.verbosity  # type: ignore [index]
            else:
                response_format = {"verbosity": model_settings.verbosity}

        stream_param: Literal[True] | Omit = True if stream else omit

        create_kwargs: dict[str, Any] = {
            "previous_response_id": self._non_null_or_omit(previous_response_id),
            "conversation": self._non_null_or_omit(conversation_id),
            "instructions": self._non_null_or_omit(system_instructions),
            "model": model_param,
            "input": list_input,
            "include": include,
            "tools": tools_param,
            "prompt": self._non_null_or_omit(prompt),
            "temperature": self._non_null_or_omit(model_settings.temperature),
            "top_p": self._non_null_or_omit(model_settings.top_p),
            "truncation": self._non_null_or_omit(model_settings.truncation),
            "max_output_tokens": self._non_null_or_omit(model_settings.max_tokens),
            "tool_choice": tool_choice_param,
            "parallel_tool_calls": parallel_tool_calls,
            "stream": cast(Any, stream_param),
            "extra_headers": self._merge_headers(model_settings),
            "extra_query": model_settings.extra_query,
            "extra_body": model_settings.extra_body,
            "text": response_format,
            "store": self._non_null_or_omit(model_settings.store),
            "prompt_cache_retention": self._non_null_or_omit(model_settings.prompt_cache_retention),
            "reasoning": self._non_null_or_omit(model_settings.reasoning),
            "metadata": self._non_null_or_omit(model_settings.metadata),
        }
        # Fail loudly (mirroring Python's duplicate-kwarg TypeError) rather
        # than silently letting extra_args override a first-class parameter.
        duplicate_extra_arg_keys = sorted(set(create_kwargs).intersection(extra_args))
        if duplicate_extra_arg_keys:
            if len(duplicate_extra_arg_keys) == 1:
                key = duplicate_extra_arg_keys[0]
                raise TypeError(
                    f"responses.create() got multiple values for keyword argument '{key}'"
                )
            keys = ", ".join(repr(key) for key in duplicate_extra_arg_keys)
            raise TypeError(f"responses.create() got multiple values for keyword arguments {keys}")
        create_kwargs.update(extra_args)
        return create_kwargs

    def _remove_openai_responses_api_incompatible_fields(self, list_input: list[Any]) -> list[Any]:
        """
        Remove or transform input items that are incompatible with the OpenAI Responses API.

        This data transformation does not always guarantee that items from other provider
        interactions are accepted by the OpenAI Responses API.

        Only items with truthy provider_data are processed.
        This function handles the following incompatibilities:
        - provider_data: Removes fields specific to other providers (e.g., Gemini, Claude).
        - Fake IDs: Removes temporary IDs (FAKE_RESPONSES_ID) that should not be sent to OpenAI.
        - Reasoning items: Filters out provider-specific reasoning items entirely.
        """
        # Early return optimization: if no item has provider_data, return unchanged.
        has_provider_data = any(
            isinstance(item, dict) and item.get("provider_data") for item in list_input
        )
        if not has_provider_data:
            return list_input

        result = []
        for item in list_input:
            cleaned = self._clean_item_for_openai(item)
            if cleaned is not None:
                result.append(cleaned)
        return result

    def _clean_item_for_openai(self, item: Any) -> Any | None:
        """Clean one input item for the OpenAI Responses API.

        Returns None to drop the item entirely, otherwise the (possibly
        mutated) item. NOTE(review): dict items are modified in place, so
        callers' dicts lose `id`/`provider_data` keys as a side effect.
        """
        # Only process dict items
        if not isinstance(item, dict):
            return item

        # Filter out reasoning items with provider_data (provider-specific reasoning).
        if item.get("type") == "reasoning" and item.get("provider_data"):
            return None

        # Remove fake response ID.
        if item.get("id") == FAKE_RESPONSES_ID:
            del item["id"]

        # Remove provider_data field.
        if "provider_data" in item:
            del item["provider_data"]

        return item

    def _get_client(self) -> AsyncOpenAI:
        """Return the client, lazily constructing a default AsyncOpenAI if unset."""
        if self._client is None:
            self._client = AsyncOpenAI()
        return self._client

    def _merge_headers(self, model_settings: ModelSettings):
        """Merge base headers, per-call extra headers, and the contextvar
        override, with later sources taking precedence."""
        return {
            **_HEADERS,
            **(model_settings.extra_headers or {}),
            **(_HEADERS_OVERRIDE.get() or {}),
        }

stream_response async

stream_response(
    system_instructions: str | None,
    input: str | list[TResponseInputItem],
    model_settings: ModelSettings,
    tools: list[Tool],
    output_schema: AgentOutputSchemaBase | None,
    handoffs: list[Handoff],
    tracing: ModelTracing,
    previous_response_id: str | None = None,
    conversation_id: str | None = None,
    prompt: ResponsePromptParam | None = None,
) -> AsyncIterator[ResponseStreamEvent]

Yields a partial message as it is generated, as well as the usage information.

Source code in src/agents/models/openai_responses.py
async def stream_response(
    self,
    system_instructions: str | None,
    input: str | list[TResponseInputItem],
    model_settings: ModelSettings,
    tools: list[Tool],
    output_schema: AgentOutputSchemaBase | None,
    handoffs: list[Handoff],
    tracing: ModelTracing,
    previous_response_id: str | None = None,
    conversation_id: str | None = None,
    prompt: ResponsePromptParam | None = None,
) -> AsyncIterator[ResponseStreamEvent]:
    """
    Yields a partial message as it is generated, as well as the usage information.
    """
    with response_span(disabled=tracing.is_disabled()) as span_response:
        try:
            stream = await self._fetch_response(
                system_instructions,
                input,
                model_settings,
                tools,
                output_schema,
                handoffs,
                previous_response_id=previous_response_id,
                conversation_id=conversation_id,
                stream=True,
                prompt=prompt,
            )

            final_response: Response | None = None
            yielded_terminal_event = False
            close_stream_in_background = False
            try:
                async for chunk in stream:
                    chunk_type = getattr(chunk, "type", None)
                    # Track the final Response for tracing: either from the
                    # completed event or from a failed/incomplete terminal event.
                    if isinstance(chunk, ResponseCompletedEvent):
                        final_response = chunk.response
                    elif chunk_type in {
                        "response.failed",
                        "response.incomplete",
                    }:
                        terminal_response = getattr(chunk, "response", None)
                        if isinstance(terminal_response, Response):
                            final_response = terminal_response
                    if chunk_type in {
                        "response.completed",
                        "response.failed",
                        "response.incomplete",
                        "response.error",
                    }:
                        yielded_terminal_event = True
                    yield chunk
            except asyncio.CancelledError:
                # Don't block cancellation on stream cleanup; close it in a
                # background task instead.
                close_stream_in_background = True
                self._schedule_async_iterator_close(stream)
                raise
            finally:
                if not close_stream_in_background:
                    try:
                        await self._maybe_aclose_async_iterator(stream)
                    except Exception as exc:
                        # After a terminal event the caller already has the
                        # full result, so a cleanup failure is not fatal.
                        if yielded_terminal_event:
                            logger.debug(
                                f"Ignoring stream cleanup error after terminal event: {exc}"
                            )
                        else:
                            raise

            if final_response and tracing.include_data():
                span_response.span_data.response = final_response
                span_response.span_data.input = input

        except Exception as e:
            span_response.set_error(
                SpanError(
                    message="Error streaming response",
                    data={
                        "error": str(e) if tracing.include_data() else e.__class__.__name__,
                    },
                )
            )
            logger.error(f"Error streaming response: {e}")
            raise

close async

close() -> None

Release any resources held by the model.

Models that maintain persistent connections can override this. The default implementation is a no-op.

Source code in src/agents/models/interface.py
async def close(self) -> None:
    """Release any resources held by the model.

    The default implementation holds nothing and is therefore a no-op;
    models that maintain persistent connections can override this.
    """
    return None

OpenAIResponsesWSModel

Bases: OpenAIResponsesModel

Implementation of Model that uses the OpenAI Responses API over a websocket transport.

The websocket transport currently sends response.create frames and always streams events. get_response() is implemented by consuming the streamed events until a terminal response event is received.

Source code in src/agents/models/openai_responses.py
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
class OpenAIResponsesWSModel(OpenAIResponsesModel):
    """
    Implementation of `Model` that uses the OpenAI Responses API over a websocket transport.

    The websocket transport currently sends `response.create` frames and always streams events.
    `get_response()` is implemented by consuming the streamed events until a terminal response
    event is received.
    """

    def __init__(
        self,
        model: str | ChatModel,
        openai_client: AsyncOpenAI,
        *,
        model_is_explicit: bool = True,
    ) -> None:
        super().__init__(
            model=model, openai_client=openai_client, model_is_explicit=model_is_explicit
        )
        # Cached persistent websocket connection, reused across requests when the
        # URL/header identity matches and the connection is still reusable.
        self._ws_connection: Any | None = None
        # (ws_url, sorted lowercased header pairs) identity of the cached connection.
        self._ws_connection_identity: tuple[str, tuple[tuple[str, str], ...]] | None = None
        # Weak reference to the event loop the cached connection was created on; the
        # connection is only reused from that same loop.
        self._ws_connection_loop_ref: weakref.ReferenceType[asyncio.AbstractEventLoop] | None = None
        # Serializes requests over the single websocket. asyncio locks are bound to
        # a loop, so this is lazily (re)created per running loop.
        self._ws_request_lock: asyncio.Lock | None = None
        self._ws_request_lock_loop_ref: weakref.ReferenceType[asyncio.AbstractEventLoop] | None = (
            None
        )
        # Bumped by close()/sync drop; in-flight requests compare generations to
        # detect that the client was closed underneath them.
        self._ws_client_close_generation = 0

    def _get_ws_request_lock(self) -> asyncio.Lock:
        """Return the request lock for the current event loop, recreating it if needed."""
        running_loop = asyncio.get_running_loop()
        # Recreate the lock whenever it was created on a different (or dead) loop.
        if (
            self._ws_request_lock is None
            or self._ws_request_lock_loop_ref is None
            or self._ws_request_lock_loop_ref() is not running_loop
        ):
            self._ws_request_lock = asyncio.Lock()
            self._ws_request_lock_loop_ref = weakref.ref(running_loop)
        return self._ws_request_lock

    @overload
    async def _fetch_response(
        self,
        system_instructions: str | None,
        input: str | list[TResponseInputItem],
        model_settings: ModelSettings,
        tools: list[Tool],
        output_schema: AgentOutputSchemaBase | None,
        handoffs: list[Handoff],
        previous_response_id: str | None,
        conversation_id: str | None,
        stream: Literal[True],
        prompt: ResponsePromptParam | None = None,
    ) -> AsyncIterator[ResponseStreamEvent]: ...

    @overload
    async def _fetch_response(
        self,
        system_instructions: str | None,
        input: str | list[TResponseInputItem],
        model_settings: ModelSettings,
        tools: list[Tool],
        output_schema: AgentOutputSchemaBase | None,
        handoffs: list[Handoff],
        previous_response_id: str | None,
        conversation_id: str | None,
        stream: Literal[False],
        prompt: ResponsePromptParam | None = None,
    ) -> Response: ...

    async def _fetch_response(
        self,
        system_instructions: str | None,
        input: str | list[TResponseInputItem],
        model_settings: ModelSettings,
        tools: list[Tool],
        output_schema: AgentOutputSchemaBase | None,
        handoffs: list[Handoff],
        previous_response_id: str | None = None,
        conversation_id: str | None = None,
        stream: Literal[True] | Literal[False] = False,
        prompt: ResponsePromptParam | None = None,
    ) -> Response | AsyncIterator[ResponseStreamEvent]:
        """Issue a `response.create` request over the websocket transport.

        The transport always streams server events; when `stream=False` the event
        stream is drained here and the terminal `Response` payload is returned.
        Raises `RuntimeError` if the stream ends without a terminal response.
        """
        # `stream=True` is forced in the payload because the websocket transport
        # only supports streamed events; the caller's flag only controls whether
        # we return the iterator or consume it ourselves.
        create_kwargs = self._build_response_create_kwargs(
            system_instructions=system_instructions,
            input=input,
            model_settings=model_settings,
            tools=tools,
            output_schema=output_schema,
            handoffs=handoffs,
            previous_response_id=previous_response_id,
            conversation_id=conversation_id,
            stream=True,
            prompt=prompt,
        )

        if stream:
            return self._iter_websocket_response_events(create_kwargs)

        # Non-streaming path: drain the event stream and keep the terminal payload.
        final_response: Response | None = None
        terminal_event_type: str | None = None
        async for event in self._iter_websocket_response_events(create_kwargs):
            event_type = getattr(event, "type", None)
            if isinstance(event, ResponseCompletedEvent):
                final_response = event.response
                terminal_event_type = event.type
            elif event_type in {"response.incomplete", "response.failed"}:
                terminal_event_type = cast(str, event_type)
                # Incomplete/failed events may still carry a usable response payload.
                terminal_response = getattr(event, "response", None)
                if isinstance(terminal_response, Response):
                    final_response = terminal_response

        if final_response is None:
            terminal_event_hint = (
                f" Terminal event: `{terminal_event_type}`." if terminal_event_type else ""
            )
            raise RuntimeError(
                "Responses websocket stream ended without a terminal response payload."
                f"{terminal_event_hint}"
            )

        return final_response

    async def _iter_websocket_response_events(
        self, create_kwargs: dict[str, Any]
    ) -> AsyncIterator[ResponseStreamEvent]:
        """Yield parsed response stream events for one `response.create` request.

        Holds the per-loop request lock for the whole exchange so only one request
        is in flight on the shared connection at a time.
        """
        request_timeout = create_kwargs.get("timeout", omit)
        if _is_openai_omitted_value(request_timeout):
            # Fall back to the client's configured default timeout when the request
            # did not specify one.
            request_timeout = getattr(self._client, "timeout", None)
        request_timeouts = self._get_websocket_request_timeouts(request_timeout)
        # Snapshot the close generation so we can tell whether close() ran while
        # this request was in flight.
        request_close_generation = self._ws_client_close_generation
        request_lock = self._get_ws_request_lock()
        if request_timeouts.lock == 0 and not request_lock.locked():
            # `wait_for(..., timeout=0)` can time out before an uncontended acquire runs.
            await request_lock.acquire()
        else:
            await self._await_websocket_with_timeout(
                request_lock.acquire(),
                request_timeouts.lock,
                "request lock wait",
            )
        try:
            request_frame, ws_url, request_headers = await self._prepare_websocket_request(
                create_kwargs
            )
            retry_pre_event_disconnect = True
            while True:
                connection = await self._await_websocket_with_timeout(
                    self._ensure_websocket_connection(
                        ws_url, request_headers, connect_timeout=request_timeouts.connect
                    ),
                    request_timeouts.connect,
                    "connect",
                )
                received_any_event = False
                yielded_terminal_event = False
                sent_request_frame = False
                try:
                    # Once we begin awaiting `send()`, treat the request as potentially
                    # transmitted to avoid replaying it on send/close races.
                    sent_request_frame = True
                    await self._await_websocket_with_timeout(
                        connection.send(json.dumps(request_frame, default=_json_dumps_default)),
                        request_timeouts.send,
                        "send",
                    )

                    while True:
                        frame = await self._await_websocket_with_timeout(
                            connection.recv(),
                            request_timeouts.recv,
                            "receive",
                        )
                        if frame is None:
                            raise RuntimeError(
                                "Responses websocket connection closed before a terminal "
                                "response event."
                            )

                        # Frames may arrive as bytes; normalize to text before parsing.
                        if isinstance(frame, bytes):
                            frame = frame.decode("utf-8")

                        payload = json.loads(frame)
                        event_type = payload.get("type")

                        if event_type == "error":
                            raise ResponsesWebSocketError(payload)
                        if event_type == "response.error":
                            # `response.error` counts as server activity for the
                            # pre-event disconnect heuristic below.
                            received_any_event = True
                            raise ResponsesWebSocketError(payload)

                        event = _construct_response_stream_event_from_payload(payload)
                        received_any_event = True
                        is_terminal_event = event_type in {
                            "response.completed",
                            "response.failed",
                            "response.incomplete",
                            "response.error",
                        }
                        if is_terminal_event:
                            yielded_terminal_event = True
                        yield event

                        if is_terminal_event:
                            return
                except BaseException as exc:
                    # GeneratorExit before a terminal event means the consumer
                    # abandoned the stream mid-response; abort rather than close
                    # gracefully so the connection is not reused in a dirty state.
                    is_non_terminal_generator_exit = (
                        isinstance(exc, GeneratorExit) and not yielded_terminal_event
                    )
                    if isinstance(exc, asyncio.CancelledError) or is_non_terminal_generator_exit:
                        self._force_abort_websocket_connection(connection)
                        self._clear_websocket_connection_state()
                    elif not (yielded_terminal_event and isinstance(exc, GeneratorExit)):
                        await self._drop_websocket_connection()

                    is_pre_event_disconnect = (
                        not received_any_event
                        and isinstance(exc, Exception)
                        and self._should_wrap_pre_event_websocket_disconnect(exc)
                    )
                    # Do not replay a request after the frame was sent; the server may already
                    # be executing it even if no response event arrived yet.
                    is_retryable_pre_event_disconnect = (
                        is_pre_event_disconnect and not sent_request_frame
                    )
                    if (
                        is_pre_event_disconnect
                        and self._ws_client_close_generation != request_close_generation
                    ):
                        # close() ran during this request; surface the original error.
                        raise
                    if retry_pre_event_disconnect and is_retryable_pre_event_disconnect:
                        # One reconnect-and-retry attempt for disconnects that
                        # provably happened before the request frame was sent.
                        retry_pre_event_disconnect = False
                        continue
                    if is_pre_event_disconnect:
                        raise RuntimeError(
                            "Responses websocket connection closed before any response events "
                            "were received. The feature may not be enabled for this account/model "
                            "yet, or the server closed the connection."
                        ) from exc
                    raise
        finally:
            request_lock.release()

    def _should_wrap_pre_event_websocket_disconnect(self, exc: Exception) -> bool:
        """Return True if `exc` is a disconnect worth wrapping in a friendlier error."""
        # User/config errors and server error frames are already descriptive.
        if isinstance(exc, UserError):
            return False
        if isinstance(exc, ResponsesWebSocketError):
            return False

        if isinstance(exc, RuntimeError):
            message = str(exc)
            if message.startswith("Responses websocket error:"):
                return False
            return message.startswith(
                "Responses websocket connection closed before a terminal response event."
            )

        # Match `websockets` ConnectionClosed* exceptions structurally to avoid a
        # hard import dependency here.
        exc_module = exc.__class__.__module__
        exc_name = exc.__class__.__name__
        return exc_module.startswith("websockets") and exc_name.startswith("ConnectionClosed")

    def _get_websocket_request_timeouts(self, timeout: Any) -> _WebsocketRequestTimeouts:
        """Map an httpx-style or scalar timeout onto per-phase websocket timeouts."""
        if timeout is None or _is_openai_omitted_value(timeout):
            return _WebsocketRequestTimeouts(lock=None, connect=None, send=None, recv=None)

        if isinstance(timeout, httpx.Timeout):
            # httpx phases map onto websocket phases: pool -> lock wait,
            # connect -> handshake, write -> send, read -> recv.
            return _WebsocketRequestTimeouts(
                lock=None if timeout.pool is None else float(timeout.pool),
                connect=None if timeout.connect is None else float(timeout.connect),
                send=None if timeout.write is None else float(timeout.write),
                recv=None if timeout.read is None else float(timeout.read),
            )

        if isinstance(timeout, (int, float)):
            # A scalar timeout applies to every phase.
            timeout_seconds = float(timeout)
            return _WebsocketRequestTimeouts(
                lock=timeout_seconds,
                connect=timeout_seconds,
                send=timeout_seconds,
                recv=timeout_seconds,
            )

        # Unrecognized timeout objects fall back to "no timeout".
        return _WebsocketRequestTimeouts(lock=None, connect=None, send=None, recv=None)

    async def _await_websocket_with_timeout(
        self,
        awaitable: Awaitable[Any],
        timeout_seconds: float | None,
        phase: str,
    ) -> Any:
        """Await `awaitable`, raising a phase-labelled TimeoutError on expiry."""
        if timeout_seconds is None:
            return await awaitable

        if timeout_seconds == 0:
            # `wait_for(..., timeout=0)` can time out before an immediately-ready awaitable runs.
            task = asyncio.ensure_future(awaitable)
            if not task.done():
                # Give the task one scheduling pass before declaring a timeout.
                await asyncio.sleep(0)
            if task.done():
                return task.result()
            task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await task
            raise TimeoutError(
                f"Responses websocket {phase} timed out after {timeout_seconds} seconds."
            )

        try:
            return await asyncio.wait_for(awaitable, timeout=timeout_seconds)
        except asyncio.TimeoutError as exc:
            raise TimeoutError(
                f"Responses websocket {phase} timed out after {timeout_seconds} seconds."
            ) from exc

    async def _prepare_websocket_request(
        self, create_kwargs: dict[str, Any]
    ) -> tuple[dict[str, Any], str, dict[str, str]]:
        """Build the `response.create` frame, target URL, and handshake headers."""
        await _refresh_openai_client_api_key_if_supported(self._client)

        request_kwargs = dict(create_kwargs)
        extra_headers_raw = request_kwargs.pop("extra_headers", None)
        if extra_headers_raw is None or _is_openai_omitted_value(extra_headers_raw):
            extra_headers_raw = {}
        extra_query = request_kwargs.pop("extra_query", None)
        extra_body = request_kwargs.pop("extra_body", None)
        # Request options like `timeout` are transport-level settings, not websocket
        # `response.create` payload fields. They are applied separately when sending/receiving.
        request_kwargs.pop("timeout", None)

        if not isinstance(extra_headers_raw, Mapping):
            raise UserError("Responses websocket extra headers must be a mapping.")

        handshake_headers = self._merge_websocket_headers(extra_headers_raw)
        ws_url = self._prepare_websocket_url(extra_query)

        frame: dict[str, Any] = {"type": "response.create"}
        # Omitted values are dropped rather than serialized into the frame.
        for key, value in request_kwargs.items():
            if _is_openai_omitted_value(value):
                continue
            frame[key] = value

        frame["stream"] = True

        if extra_body is not None and not _is_openai_omitted_value(extra_body):
            if not isinstance(extra_body, Mapping):
                raise UserError("Responses websocket extra_body must be a mapping.")
            for key, value in extra_body.items():
                if _is_openai_omitted_value(value):
                    continue
                frame[str(key)] = value

        # Preserve websocket envelope fields regardless of `extra_body` contents.
        frame["type"] = "response.create"
        frame["stream"] = True

        return frame, ws_url, handshake_headers

    def _merge_websocket_headers(self, extra_headers: Mapping[str, Any]) -> dict[str, str]:
        """Merge client default headers with per-request extras (extras win)."""
        headers: dict[str, str] = {}
        for key, value in self._client.default_headers.items():
            if _is_openai_omitted_value(value):
                continue
            headers[key] = str(value)

        for key, value in extra_headers.items():
            # NotGiven leaves any existing default header in place.
            if isinstance(value, NotGiven):
                continue
            header_key = str(key)
            # Header names are case-insensitive; drop any existing variant first.
            for existing_key in list(headers):
                if existing_key.lower() == header_key.lower():
                    del headers[existing_key]
            # Omit removes the header entirely without providing a replacement.
            if isinstance(value, Omit):
                continue
            headers[header_key] = str(value)

        return headers

    def _prepare_websocket_url(self, extra_query: Any) -> str:
        """Derive the ws(s):// responses URL from the client's base URL and query opts."""
        if self._client.websocket_base_url is not None:
            base_url = httpx.URL(self._client.websocket_base_url)
            # Upgrade http(s) schemes to their websocket counterparts.
            ws_scheme = {"http": "ws", "https": "wss"}.get(base_url.scheme, base_url.scheme)
            base_url = base_url.copy_with(scheme=ws_scheme)
        else:
            client_base_url = self._client.base_url
            ws_scheme = {"http": "ws", "https": "wss"}.get(
                client_base_url.scheme, client_base_url.scheme
            )
            base_url = client_base_url.copy_with(scheme=ws_scheme)

        # Layer query params: base URL params, then client defaults, then extras.
        params: dict[str, Any] = dict(base_url.params)
        default_query = getattr(self._client, "default_query", None)
        if default_query is not None and not _is_openai_omitted_value(default_query):
            if not isinstance(default_query, Mapping):
                raise UserError("Responses websocket client default_query must be a mapping.")
            for key, value in default_query.items():
                query_key = str(key)
                if isinstance(value, Omit):
                    params.pop(query_key, None)
                    continue
                if isinstance(value, NotGiven):
                    continue
                params[query_key] = value

        if extra_query is not None and not _is_openai_omitted_value(extra_query):
            if not isinstance(extra_query, Mapping):
                raise UserError("Responses websocket extra_query must be a mapping.")
            for key, value in extra_query.items():
                query_key = str(key)
                if isinstance(value, Omit):
                    params.pop(query_key, None)
                    continue
                if isinstance(value, NotGiven):
                    continue
                params[query_key] = value

        path = base_url.path.rstrip("/") + "/responses"
        return str(base_url.copy_with(path=path, params=params))

    async def _ensure_websocket_connection(
        self,
        ws_url: str,
        headers: Mapping[str, str],
        *,
        connect_timeout: float | None,
    ) -> Any:
        """Return a cached reusable connection or open (and cache) a new one."""
        running_loop = asyncio.get_running_loop()
        # Identity is order- and case-insensitive over header names.
        identity = (
            ws_url,
            tuple(sorted((str(key).lower(), str(value)) for key, value in headers.items())),
        )

        if self._ws_connection is not None and self._ws_connection_identity == identity:
            # Reuse only on the same loop and while the socket still looks open.
            if (
                self._ws_connection_loop_ref is not None
                and self._ws_connection_loop_ref() is running_loop
                and self._is_websocket_connection_reusable(self._ws_connection)
            ):
                return self._ws_connection
        if self._ws_connection is not None:
            await self._drop_websocket_connection()
        self._ws_connection = await self._open_websocket_connection(
            ws_url,
            headers,
            connect_timeout=connect_timeout,
        )
        self._ws_connection_identity = identity
        self._ws_connection_loop_ref = weakref.ref(running_loop)
        return self._ws_connection

    def _is_websocket_connection_reusable(self, connection: Any) -> bool:
        """Best-effort openness probe across differing websocket client APIs."""
        # Duck-types several attribute conventions (`state.name`, `closed`, `open`,
        # `close_code`) rather than assuming one specific websocket library.
        try:
            state = getattr(connection, "state", None)
            state_name = getattr(state, "name", None)
            if isinstance(state_name, str):
                return state_name == "OPEN"

            closed = getattr(connection, "closed", None)
            if isinstance(closed, bool):
                return not closed

            is_open = getattr(connection, "open", None)
            if isinstance(is_open, bool):
                return is_open

            close_code = getattr(connection, "close_code", None)
            if close_code is not None:
                return False
        except Exception:
            # Any probing failure means "do not reuse".
            return False

        return True

    async def close(self) -> None:
        """Close the persistent websocket connection, if one is open."""
        self._ws_client_close_generation += 1
        request_lock = self._get_current_loop_ws_request_lock()
        if request_lock is not None and request_lock.locked():
            # A request is in flight; abort the socket rather than awaiting a
            # graceful close that would race with the active request.
            if self._ws_connection is not None:
                self._force_abort_websocket_connection(self._ws_connection)
            self._clear_websocket_connection_state()
            return

        await self._drop_websocket_connection()

    def _get_current_loop_ws_request_lock(self) -> asyncio.Lock | None:
        """Return the request lock only if it belongs to the current running loop."""
        if self._ws_request_lock is None or self._ws_request_lock_loop_ref is None:
            return None

        try:
            running_loop = asyncio.get_running_loop()
        except RuntimeError:
            # Not inside an event loop.
            return None

        if self._ws_request_lock_loop_ref() is not running_loop:
            return None

        return self._ws_request_lock

    def _force_abort_websocket_connection(self, connection: Any) -> None:
        """Best-effort fallback for cross-loop cleanup when awaiting close() fails."""
        try:
            transport = getattr(connection, "transport", None)
            if transport is not None:
                abort = getattr(transport, "abort", None)
                if callable(abort):
                    abort()
                    return
                close_transport = getattr(transport, "close", None)
                if callable(close_transport):
                    close_transport()
                    return
        except Exception:
            # Cleanup is best-effort by design; swallow transport errors.
            pass

    def _force_drop_websocket_connection_sync(self) -> None:
        """Synchronously abort and clear cached websocket state without awaiting close()."""
        self._ws_client_close_generation += 1
        if self._ws_connection is not None:
            self._force_abort_websocket_connection(self._ws_connection)
        self._clear_websocket_connection_state()
        # Also clear the loop-bound lock so closed-loop models don't retain stale lock state.
        self._ws_request_lock = None
        self._ws_request_lock_loop_ref = None

    def _clear_websocket_connection_state(self) -> None:
        """Clear cached websocket connection metadata."""
        self._ws_connection = None
        self._ws_connection_identity = None
        self._ws_connection_loop_ref = None

    async def _drop_websocket_connection(self) -> None:
        """Gracefully close the cached connection, aborting on failure."""
        if self._ws_connection is None:
            self._clear_websocket_connection_state()
            return

        try:
            await self._ws_connection.close()
        except Exception:
            # Graceful close failed; fall back to a hard transport abort.
            self._force_abort_websocket_connection(self._ws_connection)
        finally:
            self._clear_websocket_connection_state()

    async def _open_websocket_connection(
        self,
        ws_url: str,
        headers: Mapping[str, str],
        *,
        connect_timeout: float | None,
    ) -> Any:
        """Open a new websocket connection; requires the `websockets` package."""
        try:
            from websockets.asyncio.client import connect
        except ImportError as exc:
            raise UserError(
                "OpenAIResponsesWSModel requires the `websockets` package. "
                "Install `websockets` or `openai[realtime]`."
            ) from exc

        # `max_size=None` disables the frame-size limit for large responses;
        # `user_agent_header=None` defers to the merged handshake headers.
        return await connect(
            ws_url,
            user_agent_header=None,
            additional_headers=dict(headers),
            max_size=None,
            open_timeout=connect_timeout,
        )

close async

close() -> None

Close the persistent websocket connection, if one is open.

Source code in src/agents/models/openai_responses.py
async def close(self) -> None:
    """Close the persistent websocket connection, if one is open."""
    # Bump the generation so in-flight requests can tell the client was closed.
    self._ws_client_close_generation += 1
    request_lock = self._get_current_loop_ws_request_lock()
    if request_lock is not None and request_lock.locked():
        # A request is in flight on this loop; abort the socket instead of
        # awaiting a graceful close that would race with the active request.
        if self._ws_connection is not None:
            self._force_abort_websocket_connection(self._ws_connection)
        self._clear_websocket_connection_state()
        return

    await self._drop_websocket_connection()

stream_response async

stream_response(
    system_instructions: str | None,
    input: str | list[TResponseInputItem],
    model_settings: ModelSettings,
    tools: list[Tool],
    output_schema: AgentOutputSchemaBase | None,
    handoffs: list[Handoff],
    tracing: ModelTracing,
    previous_response_id: str | None = None,
    conversation_id: str | None = None,
    prompt: ResponsePromptParam | None = None,
) -> AsyncIterator[ResponseStreamEvent]

Yields a partial message as it is generated, as well as the usage information.

Source code in src/agents/models/openai_responses.py
async def stream_response(
    self,
    system_instructions: str | None,
    input: str | list[TResponseInputItem],
    model_settings: ModelSettings,
    tools: list[Tool],
    output_schema: AgentOutputSchemaBase | None,
    handoffs: list[Handoff],
    tracing: ModelTracing,
    previous_response_id: str | None = None,
    conversation_id: str | None = None,
    prompt: ResponsePromptParam | None = None,
) -> AsyncIterator[ResponseStreamEvent]:
    """
    Yields a partial message as it is generated, as well as the usage information.
    """
    with response_span(disabled=tracing.is_disabled()) as span_response:
        try:
            stream = await self._fetch_response(
                system_instructions,
                input,
                model_settings,
                tools,
                output_schema,
                handoffs,
                previous_response_id=previous_response_id,
                conversation_id=conversation_id,
                stream=True,
                prompt=prompt,
            )

            # Track the terminal response so the trace span can record it.
            final_response: Response | None = None
            yielded_terminal_event = False
            close_stream_in_background = False
            try:
                async for chunk in stream:
                    chunk_type = getattr(chunk, "type", None)
                    if isinstance(chunk, ResponseCompletedEvent):
                        final_response = chunk.response
                    elif chunk_type in {
                        "response.failed",
                        "response.incomplete",
                    }:
                        # Failed/incomplete terminals may still carry a response.
                        terminal_response = getattr(chunk, "response", None)
                        if isinstance(terminal_response, Response):
                            final_response = terminal_response
                    if chunk_type in {
                        "response.completed",
                        "response.failed",
                        "response.incomplete",
                        "response.error",
                    }:
                        yielded_terminal_event = True
                    yield chunk
            except asyncio.CancelledError:
                # On cancellation, do not await cleanup here; schedule the
                # iterator close out-of-band and re-raise promptly.
                close_stream_in_background = True
                self._schedule_async_iterator_close(stream)
                raise
            finally:
                if not close_stream_in_background:
                    try:
                        await self._maybe_aclose_async_iterator(stream)
                    except Exception as exc:
                        # After a terminal event the payload is complete, so a
                        # cleanup failure is logged rather than raised.
                        if yielded_terminal_event:
                            logger.debug(
                                f"Ignoring stream cleanup error after terminal event: {exc}"
                            )
                        else:
                            raise

            if final_response and tracing.include_data():
                span_response.span_data.response = final_response
                span_response.span_data.input = input

        except Exception as e:
            span_response.set_error(
                SpanError(
                    message="Error streaming response",
                    data={
                        "error": str(e) if tracing.include_data() else e.__class__.__name__,
                    },
                )
            )
            logger.error(f"Error streaming response: {e}")
            raise

Converter

Source code in src/agents/models/openai_responses.py
class Converter:
    @classmethod
    def _convert_shell_environment(cls, environment: ShellToolEnvironment | None) -> dict[str, Any]:
        """Convert shell environment settings to OpenAI payload shape."""
        if environment is None:
            return {"type": "local"}
        if not isinstance(environment, Mapping):
            raise UserError("Shell environment must be a mapping.")

        payload = dict(environment)
        if "type" not in payload:
            payload["type"] = "local"
        return payload

    @classmethod
    def convert_tool_choice(
        cls, tool_choice: Literal["auto", "required", "none"] | str | MCPToolChoice | None
    ) -> response_create_params.ToolChoice | Omit:
        """Map the SDK-level ``tool_choice`` setting onto the Responses API parameter.

        ``None`` omits the parameter entirely; ``MCPToolChoice`` targets a named
        tool on a specific MCP server; known hosted-tool names become typed
        dicts; any other string is treated as a function tool name.
        """
        if tool_choice is None:
            return omit

        if isinstance(tool_choice, MCPToolChoice):
            return {
                "server_label": tool_choice.server_label,
                "type": "mcp",
                "name": tool_choice.name,
            }

        # The three mode strings pass through unchanged.
        if tool_choice in ("auto", "required", "none"):
            return tool_choice

        if tool_choice == "web_search":
            # TODO: revisit the type: ignore comment when ToolChoice is updated in the future
            return {"type": "web_search"}  # type: ignore[misc, return-value]

        if tool_choice == "mcp":
            # Note that this is still here for backwards compatibility,
            # but migrating to MCPToolChoice is recommended.
            return {"type": "mcp"}  # type: ignore[misc, return-value]

        # Hosted tools that are selected by bare name.
        if tool_choice in (
            "file_search",
            "web_search_preview",
            "computer_use_preview",
            "image_generation",
            "code_interpreter",
        ):
            return {"type": tool_choice}

        # Anything else is interpreted as the name of a function tool.
        return {
            "type": "function",
            "name": tool_choice,
        }

    @classmethod
    def get_response_format(
        cls, output_schema: AgentOutputSchemaBase | None
    ) -> ResponseTextConfigParam | Omit:
        if output_schema is None or output_schema.is_plain_text():
            return omit
        else:
            return {
                "format": {
                    "type": "json_schema",
                    "name": "final_output",
                    "schema": output_schema.json_schema(),
                    "strict": output_schema.is_strict_json_schema(),
                }
            }

    @classmethod
    def convert_tools(
        cls,
        tools: list[Tool],
        handoffs: list[Handoff[Any, Any]],
    ) -> ConvertedTools:
        """Convert SDK tools and handoffs into Responses API tool params and includes.

        Raises:
            UserError: If more than one computer tool is supplied.
        """
        tool_params: list[ToolParam] = []
        include_params: list[ResponseIncludable] = []

        # The Responses API accepts at most one computer tool per request.
        computer_count = sum(1 for candidate in tools if isinstance(candidate, ComputerTool))
        if computer_count > 1:
            raise UserError(f"You can only provide one computer tool. Got {computer_count}")

        for candidate in tools:
            converted, include = cls._convert_tool(candidate)
            tool_params.append(converted)
            if include:
                include_params.append(include)

        tool_params.extend(cls._convert_handoff_tool(handoff) for handoff in handoffs)

        return ConvertedTools(tools=tool_params, includes=include_params)

    @classmethod
    def _convert_tool(cls, tool: Tool) -> tuple[ToolParam, ResponseIncludable | None]:
        """Convert a single SDK tool into its Responses API payload.

        Returns:
            A tuple of the converted tool param and an optional ``include``
            value to request extra data (currently only file-search results).

        Raises:
            UserError: If the tool type is unknown, or a ComputerTool's
                computer has not been resolved to a concrete instance.
        """

        if isinstance(tool, FunctionTool):
            converted_tool: ToolParam = {
                "name": tool.name,
                "parameters": tool.params_json_schema,
                "strict": tool.strict_json_schema,
                "type": "function",
                "description": tool.description,
            }
            includes: ResponseIncludable | None = None
        elif isinstance(tool, WebSearchTool):
            # TODO: revisit the type: ignore comment when ToolParam is updated in the future
            converted_tool = {
                "type": "web_search",
                "filters": tool.filters.model_dump() if tool.filters is not None else None,  # type: ignore [typeddict-item]
                "user_location": tool.user_location,
                "search_context_size": tool.search_context_size,
            }
            includes = None
        elif isinstance(tool, FileSearchTool):
            converted_tool = {
                "type": "file_search",
                "vector_store_ids": tool.vector_store_ids,
            }
            # Optional settings are only sent when explicitly configured.
            if tool.max_num_results:
                converted_tool["max_num_results"] = tool.max_num_results
            if tool.ranking_options:
                converted_tool["ranking_options"] = tool.ranking_options
            if tool.filters:
                converted_tool["filters"] = tool.filters

            includes = "file_search_call.results" if tool.include_search_results else None
        elif isinstance(tool, ComputerTool):
            computer = tool.computer
            if not isinstance(computer, (Computer, AsyncComputer)):
                raise UserError(
                    "Computer tool is not initialized for serialization. Call "
                    "resolve_computer({ tool, run_context }) with a run context first "
                    "when building payloads manually."
                )
            converted_tool = {
                "type": "computer_use_preview",
                "environment": computer.environment,
                "display_width": computer.dimensions[0],
                "display_height": computer.dimensions[1],
            }
            includes = None
        elif isinstance(tool, HostedMCPTool):
            # Hosted MCP tools carry a ready-made config from the user.
            converted_tool = tool.tool_config
            includes = None
        elif isinstance(tool, ApplyPatchTool):
            converted_tool = cast(ToolParam, {"type": "apply_patch"})
            includes = None
        elif isinstance(tool, ShellTool):
            converted_tool = cast(
                ToolParam,
                {
                    "type": "shell",
                    "environment": cls._convert_shell_environment(tool.environment),
                },
            )
            includes = None
        elif isinstance(tool, ImageGenerationTool):
            converted_tool = tool.tool_config
            includes = None
        elif isinstance(tool, CodeInterpreterTool):
            converted_tool = tool.tool_config
            includes = None
        elif isinstance(tool, LocalShellTool):
            converted_tool = {
                "type": "local_shell",
            }
            includes = None
        else:
            # Fixed: the original message ended with a garbled literal ", tool"
            # instead of interpolating the offending tool value.
            raise UserError(f"Unknown tool type: {type(tool)}, tool: {tool}")

        return converted_tool, includes

    @classmethod
    def _convert_handoff_tool(cls, handoff: Handoff) -> ToolParam:
        return {
            "name": handoff.tool_name,
            "parameters": handoff.input_json_schema,
            "strict": handoff.strict_json_schema,
            "type": "function",
            "description": handoff.tool_description,
        }