class ThreadItemConverter:
"""
Converts thread items to Agent SDK input items.
Widgets, Tasks, and Workflows have default conversions but can be customized.
Attachments, Tags, and HiddenContextItems require custom handling based on the use case.
Other item types are converted automatically.
"""
def attachment_to_message_content(
self, attachment: Attachment
) -> Awaitable[ResponseInputContentParam]:
"""
Convert an attachment in a user message into a message content part to send to the model.
Required when attachments are enabled.
"""
raise NotImplementedError(
"An Attachment was included in a UserMessageItem but Converter.attachment_to_message_content was not implemented"
)
def tag_to_message_content(
self, tag: UserMessageTagContent
) -> ResponseInputContentParam:
"""
Convert a tag in a user message into a message content part to send to the model as context.
Required when tags are used.
"""
raise NotImplementedError(
"A Tag was included in a UserMessageItem but Converter.tag_to_message_content is not implemented"
)
def hidden_context_to_input(
self, item: HiddenContextItem
) -> TResponseInputItem | list[TResponseInputItem] | None:
"""
Convert a HiddenContextItem into input item(s) to send to the model.
Required when HiddenContextItem are used.
"""
raise NotImplementedError(
"HiddenContextItem were present in a user message but Converter.hidden_context_to_input was not implemented"
)
def task_to_input(
self, item: TaskItem
) -> TResponseInputItem | list[TResponseInputItem] | None:
"""
Convert a TaskItem into input item(s) to send to the model.
"""
if item.task.type != "custom" or (
not item.task.title and not item.task.content
):
return None
title = f"{item.task.title}" if item.task.title else ""
content = f"{item.task.content}" if item.task.content else ""
task_text = f"{title}: {content}" if title and content else title or content
text = f"A message was displayed to the user that the following task was performed:\n<Task>\n{task_text}\n</Task>"
return Message(
type="message",
content=[
ResponseInputTextParam(
type="input_text",
text=text,
)
],
role="user",
)
def workflow_to_input(
self, item: WorkflowItem
) -> TResponseInputItem | list[TResponseInputItem] | None:
"""
Convert a TaskItem into input item(s) to send to the model.
Returns WorkflowItem.response_items by default.
"""
messages = []
for task in item.workflow.tasks:
if task.type != "custom" or (not task.title and not task.content):
continue
title = f"{task.title}" if task.title else ""
content = f"{task.content}" if task.content else ""
task_text = f"{title}: {content}" if title and content else title or content
text = f"A message was displayed to the user that the following task was performed:\n<Task>\n{task_text}\n</Task>"
messages.append(
Message(
type="message",
content=[
ResponseInputTextParam(
type="input_text",
text=text,
)
],
role="user",
)
)
return messages
def widget_to_input(
self, item: WidgetItem
) -> TResponseInputItem | list[TResponseInputItem] | None:
"""
Convert a WidgetItem into input item(s) to send to the model.
By default, WidgetItems converted to a text description with a JSON representation of the widget.
"""
return Message(
type="message",
content=[
ResponseInputTextParam(
type="input_text",
text=f"The following graphical UI widget (id: {item.id}) was displayed to the user:"
+ item.widget.model_dump_json(
exclude_unset=True, exclude_none=True
),
)
],
role="user",
)
async def user_message_to_input(
self, item: UserMessageItem, is_last_message: bool = True
) -> TResponseInputItem | list[TResponseInputItem] | None:
# Build the user text exactly as typed, rendering tags as @key
message_text_parts: list[str] = []
# Track tags separately to add system context
raw_tags: list[UserMessageTagContent] = []
for part in item.content:
if isinstance(part, UserMessageTextContent):
message_text_parts.append(part.text)
elif isinstance(part, UserMessageTagContent):
message_text_parts.append(f"@{part.text}")
raw_tags.append(part)
else:
assert_never(part)
user_text_item = Message(
role="user",
type="message",
content=[
ResponseInputTextParam(
type="input_text", text="".join(message_text_parts)
),
*[
await self.attachment_to_message_content(a)
for a in item.attachments
],
],
)
# Build system items (prepend later): quoted text and @-mention context
context_items: list[TResponseInputItem] = []
if item.quoted_text and is_last_message:
context_items.append(
Message(
role="user",
type="message",
content=[
ResponseInputTextParam(
type="input_text",
text=f"The user is referring to this in particular: \n{item.quoted_text}",
)
],
)
)
# Dedupe tags (preserve order) and resolve to message content
if raw_tags:
seen, uniq_tags = set(), []
for t in raw_tags:
if t.text not in seen:
seen.add(t.text)
uniq_tags.append(t)
tag_content: ResponseInputMessageContentListParam = [
# should return summarized text items
self.tag_to_message_content(tag)
for tag in uniq_tags
]
if tag_content:
context_items.append(
Message(
role="user",
type="message",
content=[
ResponseInputTextParam(
type="input_text",
text=cleandoc("""
# User-provided context for @-mentions
- When referencing resolved entities, use their canonical names **without** '@'.
- The '@' form appears only in user text and should not be echoed.
""").strip(),
),
*tag_content,
],
)
)
return [user_text_item, *context_items]
async def assistant_message_to_input(
self, item: AssistantMessageItem
) -> TResponseInputItem | list[TResponseInputItem] | None:
return EasyInputMessageParam(
type="message",
content=[
# content param doesn't support the assistant message content types
cast(
ResponseInputContentParam,
ResponseOutputText(
type="output_text",
text=c.text,
annotations=[], # TODO: these should be sent back as well
).model_dump(),
)
for c in item.content
],
role="assistant",
)
async def client_tool_call_to_input(
self, item: ClientToolCallItem
) -> TResponseInputItem | list[TResponseInputItem] | None:
if item.status == "pending":
# Filter out pending tool calls - they cannot be sent to the model
return None
return [
ResponseFunctionToolCallParam(
type="function_call",
call_id=item.call_id,
name=item.name,
arguments=json.dumps(item.arguments),
),
FunctionCallOutput(
type="function_call_output",
call_id=item.call_id,
output=json.dumps(item.output),
),
]
async def end_of_turn_to_input(
self, item: EndOfTurnItem
) -> TResponseInputItem | list[TResponseInputItem] | None:
# Only used for UI hints - you shouldn't need to override this
return None
async def _thread_item_to_input_item(
self,
item: ThreadItem,
is_last_message: bool = True,
) -> list[TResponseInputItem]:
match item:
case UserMessageItem():
out = await self.user_message_to_input(item, is_last_message) or []
return out if isinstance(out, list) else [out]
case AssistantMessageItem():
out = await self.assistant_message_to_input(item) or []
return out if isinstance(out, list) else [out]
case ClientToolCallItem():
out = await self.client_tool_call_to_input(item) or []
return out if isinstance(out, list) else [out]
case EndOfTurnItem():
out = await self.end_of_turn_to_input(item) or []
return out if isinstance(out, list) else [out]
case WidgetItem():
out = self.widget_to_input(item) or []
return out if isinstance(out, list) else [out]
case WorkflowItem():
out = self.workflow_to_input(item) or []
return out if isinstance(out, list) else [out]
case TaskItem():
out = self.task_to_input(item) or []
return out if isinstance(out, list) else [out]
case HiddenContextItem():
out = self.hidden_context_to_input(item) or []
return out if isinstance(out, list) else [out]
case _:
assert_never(item)
async def to_agent_input(
self,
thread_items: Sequence[ThreadItem] | ThreadItem,
) -> list[TResponseInputItem]:
if isinstance(thread_items, Sequence):
# shallow copy in case caller mutates the list while we're iterating
thread_items = thread_items[:]
else:
thread_items = [thread_items]
output: list[TResponseInputItem] = []
for item in thread_items:
output.extend(
await self._thread_item_to_input_item(
item,
is_last_message=item is thread_items[-1],
)
)
return output