Respond to a user message
This guide covers how to implement and run a ChatKit server that responds to user messages, including thread loading, inference, event streaming, and persistence.
Install ChatKit
Install the SDK from PyPI:
pip install openai-chatkit
Build and run your ChatKit server
Your ChatKit server does three main things:
- Accept HTTP requests from your client.
- Construct a request context (user id, auth, feature flags, etc.).
- Call ChatKitServer.respond to produce streamed events.
Define a request context
First, define a small context object that will be created per request and passed through your server, store, and agents:
from dataclasses import dataclass

@dataclass
class MyRequestContext:
    user_id: str
Implement your ChatKitServer
Subclass ChatKitServer and implement respond. It runs once per user turn and should yield the events that make up your response. We'll keep this example simple for now and fill in history loading and model calls in later sections.
from collections.abc import AsyncIterator
from datetime import datetime

from chatkit.server import ChatKitServer
from chatkit.types import (
    AssistantMessageContent,
    AssistantMessageItem,
    ThreadItemDoneEvent,
    ThreadMetadata,
    ThreadStreamEvent,
    UserMessageItem,
)

class MyChatKitServer(ChatKitServer[MyRequestContext]):
    async def respond(
        self,
        thread: ThreadMetadata,
        input: UserMessageItem | None,
        context: MyRequestContext,
    ) -> AsyncIterator[ThreadStreamEvent]:
        # Replace this with your inference pipeline.
        yield ThreadItemDoneEvent(
            item=AssistantMessageItem(
                thread_id=thread.id,
                id=self.store.generate_item_id("message", thread, context),
                created_at=datetime.now(),
                content=[AssistantMessageContent(text="Hi there!")],
            )
        )
Wire ChatKit to your web framework
Expose a single /chatkit endpoint that forwards requests to your MyChatKitServer instance. For example, with FastAPI:
from fastapi import FastAPI, Request, Response
from fastapi.responses import StreamingResponse

from chatkit.server import ChatKitServer, StreamingResult

app = FastAPI()
store = MyPostgresStore(conn_info)
server = MyChatKitServer(store)

@app.post("/chatkit")
async def chatkit_endpoint(request: Request):
    # Build a per-request context from the incoming HTTP request.
    context = MyRequestContext(user_id="abc123")
    # Let ChatKit handle the request and return either a streaming or JSON result.
    result = await server.process(await request.body(), context)
    if isinstance(result, StreamingResult):
        return StreamingResponse(result, media_type="text/event-stream")
    return Response(content=result.json, media_type="application/json")
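With the endpoint wired up, you can run the server with any ASGI server. For example, assuming the code above lives in main.py:
uvicorn main:app --reload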
How request context flows into ChatKit
ChatKitServer[TContext] and Store[TContext] are generic over a request context type you choose (user id, org, auth scopes, feature flags). Construct it per request and pass it to server.process; it flows into respond and your store methods.
context = MyRequestContext(user_id="abc123")
result = await server.process(await request.body(), context)
Request metadata in the payload is available before calling process; include it in your context for auth, tracing, or feature flags.
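For example, here is a sketch of an endpoint that derives the user id from the incoming request instead of hardcoding it; decode_user_id is a hypothetical helper standing in for your auth logic:
@app.post("/chatkit")
async def chatkit_endpoint(request: Request):
    # Hypothetical helper: resolve the caller from your auth system.
    user_id = decode_user_id(request.headers.get("Authorization"))
    context = MyRequestContext(user_id=user_id)
    result = await server.process(await request.body(), context)
    if isinstance(result, StreamingResult):
        return StreamingResponse(result, media_type="text/event-stream")
    return Response(content=result.json, media_type="application/json")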
Implement your ChatKit data store
Implement the Store interface to control how threads, messages, tool calls, and widgets are stored. Prefer serializing thread items as JSON so schema changes do not break storage. Example Postgres store:
import psycopg
from psycopg.rows import tuple_row
from psycopg.types.json import Jsonb

from chatkit.store import NotFoundError, Store
from chatkit.types import ThreadMetadata

class MyPostgresStore(Store[MyRequestContext]):
    def __init__(self, conninfo: str) -> None:
        self._conninfo = conninfo
        self._init_schema()

    def _connection(self) -> psycopg.Connection:
        return psycopg.connect(self._conninfo)

    def _init_schema(self) -> None:
        with self._connection() as conn, conn.cursor() as cur:
            cur.execute(
                """
                CREATE TABLE IF NOT EXISTS threads (
                    id TEXT PRIMARY KEY,
                    user_id TEXT NOT NULL,
                    created_at TIMESTAMPTZ NOT NULL,
                    data JSONB NOT NULL
                );
                """
            )
            cur.execute(
                """
                CREATE TABLE IF NOT EXISTS items (
                    id TEXT PRIMARY KEY,
                    thread_id TEXT NOT NULL
                        REFERENCES threads (id)
                        ON DELETE CASCADE,
                    user_id TEXT NOT NULL,
                    created_at TIMESTAMPTZ NOT NULL,
                    data JSONB NOT NULL
                );
                """
            )
            conn.commit()

    async def load_thread(
        self, thread_id: str, context: MyRequestContext
    ) -> ThreadMetadata:
        with self._connection() as conn, conn.cursor(row_factory=tuple_row) as cur:
            cur.execute(
                "SELECT data FROM threads WHERE id = %s AND user_id = %s",
                (thread_id, context.user_id),
            )
            row = cur.fetchone()
            if row is None:
                raise NotFoundError(f"Thread {thread_id} not found")
            return ThreadMetadata.model_validate(row[0])

    async def save_thread(
        self, thread: ThreadMetadata, context: MyRequestContext
    ) -> None:
        # Upsert so repeated saves of the same thread update its data.
        payload = Jsonb(thread.model_dump(mode="json"))
        with self._connection() as conn, conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO threads (id, user_id, created_at, data)
                VALUES (%s, %s, %s, %s)
                ON CONFLICT (id) DO UPDATE SET data = EXCLUDED.data
                """,
                (thread.id, context.user_id, thread.created_at, payload),
            )
            conn.commit()

    # Implement the remaining Store methods following the same pattern.
Customize ID generation by overriding generate_thread_id and generate_item_id if you need external or deterministic IDs. Store integration-specific metadata, such as the model's last_response_id, on ThreadMetadata to drive your inference pipeline.
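A minimal sketch of such overrides on the store above; the generate_item_id call shape matches the respond example earlier in this guide, while the generate_thread_id signature is an assumption:
import uuid

class MyPostgresStore(Store[MyRequestContext]):
    def generate_thread_id(self, context: MyRequestContext) -> str:
        # Assumed signature; swap in external or deterministic IDs as needed.
        return f"thread_{uuid.uuid4().hex}"

    def generate_item_id(
        self, item_type: str, thread: ThreadMetadata, context: MyRequestContext
    ) -> str:
        # Matches the call self.store.generate_item_id("message", thread, context).
        return f"{item_type}_{uuid.uuid4().hex}"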
Generate a response using your model
Inside respond, you'll usually:
- Load recent thread history.
- Prepare model input for your agent.
- Run inference and stream events back to the client.
Prepare model input
Before you call your model, decide what conversation context you want the model to see.
Load thread history (recommended default)
Fetch recent items so the model sees the conversation state before you build the next turn:
items_page = await self.store.load_thread_items(
    thread.id,
    after=None,
    limit=20,  # Tune this limit based on your model/context budget.
    order="desc",
    context=context,
)
items = list(reversed(items_page.data))
- Start with the defaults: simple_to_agent_input(items) is a convenience wrapper around the default ThreadItemConverter (it calls ThreadItemConverter().to_agent_input(items) under the hood).
- Customize for your integration: some item types require app-specific translation into model input (for example: attachments, tags, and some hidden context items). In those cases, subclass ThreadItemConverter and call your converter directly instead of simple_to_agent_input, as sketched after the next code block. (If your thread includes attachments or tags and you haven't implemented the converter hooks, conversion will raise NotImplementedError.)
from agents import Runner
from chatkit.agents import AgentContext, ThreadItemConverter, simple_to_agent_input

# Option A (defaults):
input_items = await simple_to_agent_input(items)

# Option B (your integration-specific converter):
# input_items = await MyThreadItemConverter().to_agent_input(items)

agent_context = AgentContext(
    thread=thread,
    store=self.store,
    request_context=context,
)
Respect any input.inference_options the client sends (model, tool choice, etc.) when you build your request to the model.
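For example, a hedged sketch that honors a client-selected model when constructing your agent; the model field on inference_options is an assumption based on the description above:
from agents import Agent

requested_model = None
if input and input.inference_options:
    # `model` here is assumed; inspect InferenceOptions for the exact fields.
    requested_model = input.inference_options.model

assistant_agent = Agent(
    name="Assistant",
    model=requested_model or "gpt-4.1",  # fall back to your default model
)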
Using previous_response_id (OpenAI Responses API only)
If you are using OpenAI models through the Responses API, you can pass previous_response_id to Runner.run_streamed(...) and (often) send only the new user message as model input. This can simplify input construction when the provider can retrieve prior context server-side.
Terminology note: Agents exposes the ID of the most recent model response as result.last_response_id. On the next turn, you pass that saved value as the previous_response_id parameter.
Important restrictions:
- OpenAI Responses API only. Other model providers won't be able to follow a previous_response_id, so you must send thread history yourself.
- Only includes model-visible history. If your integration streams ChatKit-only items (e.g. widgets or workflows emitted directly to the client), the model won't know about them unless you also include them in input_items.
- Works only while the referenced response is retrievable. Persist result.last_response_id and ensure responses are stored (store=True / ModelSettings(store=True)); otherwise fall back to rebuilding input from thread items.
Example:
# `ThreadMetadata.metadata` is a free-form dict for integration-specific state.
# ChatKit does not define a first-class `last_response_id` field on `ThreadMetadata`;
# your integration can store it under a key and reuse it on the next turn.
last_response_id = thread.metadata.get("last_response_id")
last_response_id = last_response_id if isinstance(last_response_id, str) else None

# Often: send only the new message as input when chaining on the server.
input_items = await simple_to_agent_input([input] if input else [])

result = Runner.run_streamed(
    assistant_agent,
    input_items,
    context=agent_context,
    previous_response_id=last_response_id,
    auto_previous_response_id=True,
)

# After the stream has been consumed (see the next section), persist the new
# response ID so the next turn can chain again.
if result.last_response_id:
    thread.metadata["last_response_id"] = result.last_response_id
    await self.store.save_thread(thread, context=context)
Run inference and stream events
Run your agent and stream events back to the client. stream_agent_response converts an Agents run into ChatKit events; you can also yield events manually.
from agents import (
    InputGuardrailTripwireTriggered,
    OutputGuardrailTripwireTriggered,
    Runner,
)
from chatkit.agents import stream_agent_response
from chatkit.types import ErrorEvent

result = Runner.run_streamed(
    assistant_agent,
    input_items,
    context=agent_context,
)

try:
    async for event in stream_agent_response(agent_context, result):
        yield event
except InputGuardrailTripwireTriggered:
    yield ErrorEvent(message="We blocked that message for safety.")
except OutputGuardrailTripwireTriggered:
    yield ErrorEvent(
        message="The assistant response was blocked.",
        allow_retry=False,
    )
To stream events from a server tool during the same turn, use ctx.context.stream(...) inside the tool:
from agents import RunContextWrapper, function_tool
from chatkit.agents import AgentContext
from chatkit.types import ProgressUpdateEvent

@function_tool()
async def load_document(ctx: RunContextWrapper[AgentContext], document_id: str):
    # Surface progress to the client while the tool runs.
    await ctx.context.stream(
        ProgressUpdateEvent(icon="document", text="Loading document...")
    )
    # `get_document_by_id` stands in for your application's data access.
    return await get_document_by_id(document_id)
stream_agent_response will forward these events alongside any assistant text or tool call updates. Client tool calls are also supported via ctx.context.client_tool_call when you register the tool on both client and server.
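As a heavily hedged sketch, a server tool might hand off to a client-registered tool like this; the exact shape of client_tool_call is an assumption, so consult the client tools guide for the real signature:
@function_tool()
async def set_theme(ctx: RunContextWrapper[AgentContext], theme: str):
    # Assumed call shape: tool name plus an arguments payload, forwarded to
    # the tool of the same name registered in the ChatKit client.
    await ctx.context.client_tool_call("set_theme", {"theme": theme})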
Next: add features
- Let users browse past threads
- Accept rich user input
- Let users pick tools and models
- Pass extra app context to your model
- Update the client during a response
- Build interactive responses with widgets
- Add annotations in assistant messages
- Stream generated images
- Keep your app in sync with ChatKit
- Let your app draft and send messages
- Handle feedback
- Prepare your app for production