@dataclass
class RunState(Generic[TContext, TAgent]):
    """Serializable snapshot of an agent run, including context, usage, and interruptions.
    ``RunState`` is the durable pause/resume boundary for human-in-the-loop flows. It stores
    enough information to continue an interrupted run, including model responses, generated
    items, approval state, and optional server-managed conversation identifiers.
    Context serialization is intentionally conservative:
    - Mapping contexts round-trip directly.
    - Custom contexts may require a serializer and deserializer.
    - When no safe serializer is available, the snapshot is still written but emits warnings and
    records metadata describing what is required to rebuild the original context type.
    """
    # NOTE: this class also defines an explicit ``__init__`` below. Per the dataclasses
    # docs, ``@dataclass`` does not replace a user-defined ``__init__``, so the defaults
    # here act as field declarations/metadata rather than generated-constructor params.
    _current_turn: int = 0
    """Current turn number in the conversation."""
    _current_agent: TAgent | None = None
    """The agent currently handling the conversation."""
    _original_input: str | list[Any] = field(default_factory=list)
    """Original user input prior to any processing."""
    _model_responses: list[ModelResponse] = field(default_factory=list)
    """Responses from the model so far."""
    _context: RunContextWrapper[TContext] | None = None
    """Run context tracking approvals, usage, and other metadata."""
    _generated_items: list[RunItem] = field(default_factory=list)
    """Items used to build model input when resuming; may be filtered by handoffs."""
    _session_items: list[RunItem] = field(default_factory=list)
    """Full, unfiltered run items for session history."""
    _max_turns: int = 10
    """Maximum allowed turns before forcing termination."""
    _conversation_id: str | None = None
    """Conversation identifier for server-managed conversation tracking."""
    _previous_response_id: str | None = None
    """Response identifier of the last server-managed response."""
    _auto_previous_response_id: bool = False
    """Whether the previous response id should be automatically tracked."""
    _reasoning_item_id_policy: Literal["preserve", "omit"] | None = None
    """How reasoning item IDs are represented in next-turn model input."""
    _input_guardrail_results: list[InputGuardrailResult] = field(default_factory=list)
    """Results from input guardrails applied to the run."""
    _output_guardrail_results: list[OutputGuardrailResult] = field(default_factory=list)
    """Results from output guardrails applied to the run."""
    _tool_input_guardrail_results: list[ToolInputGuardrailResult] = field(default_factory=list)
    """Results from tool input guardrails applied during the run."""
    _tool_output_guardrail_results: list[ToolOutputGuardrailResult] = field(default_factory=list)
    """Results from tool output guardrails applied during the run."""
    _current_step: NextStepInterruption | None = None
    """Current step if the run is interrupted (e.g., for tool approval)."""
    _last_processed_response: ProcessedResponse | None = None
    """The last processed model response. This is needed for resuming from interruptions."""
    _generated_items_last_processed_marker: str | None = field(default=None, repr=False)
    """Tracks whether _generated_items already include the current last_processed_response."""
    _current_turn_persisted_item_count: int = 0
    """Tracks how many items from this turn were already written to the session."""
    _tool_use_tracker_snapshot: dict[str, list[str]] = field(default_factory=dict)
    """Serialized snapshot of the AgentToolUseTracker (agent name -> tools used)."""
    _trace_state: TraceState | None = field(default=None, repr=False)
    """Serialized trace metadata for resuming tracing context."""
    _agent_tool_state_scope_id: str | None = field(default=None, repr=False)
    """Private scope id used to isolate agent-tool pending state per RunState instance."""
def __init__(
self,
context: RunContextWrapper[TContext],
original_input: str | list[Any],
starting_agent: TAgent,
max_turns: int = 10,
*,
conversation_id: str | None = None,
previous_response_id: str | None = None,
auto_previous_response_id: bool = False,
):
"""Initialize a new RunState."""
self._context = context
self._original_input = _clone_original_input(original_input)
self._current_agent = starting_agent
self._max_turns = max_turns
self._conversation_id = conversation_id
self._previous_response_id = previous_response_id
self._auto_previous_response_id = auto_previous_response_id
self._reasoning_item_id_policy = None
self._model_responses = []
self._generated_items = []
self._session_items = []
self._input_guardrail_results = []
self._output_guardrail_results = []
self._tool_input_guardrail_results = []
self._tool_output_guardrail_results = []
self._current_step = None
self._current_turn = 0
self._last_processed_response = None
self._generated_items_last_processed_marker = None
self._current_turn_persisted_item_count = 0
self._tool_use_tracker_snapshot = {}
self._trace_state = None
from .agent_tool_state import get_agent_tool_state_scope
self._agent_tool_state_scope_id = get_agent_tool_state_scope(context)
def get_interruptions(self) -> list[ToolApprovalItem]:
"""Return pending interruptions if the current step is an interruption."""
# Import at runtime to avoid circular import
from .run_internal.run_steps import NextStepInterruption
if self._current_step is None or not isinstance(self._current_step, NextStepInterruption):
return []
return self._current_step.interruptions
def approve(self, approval_item: ToolApprovalItem, always_approve: bool = False) -> None:
"""Approve a tool call and rerun with this state to continue."""
if self._context is None:
raise UserError("Cannot approve tool: RunState has no context")
self._context.approve_tool(approval_item, always_approve=always_approve)
def reject(self, approval_item: ToolApprovalItem, always_reject: bool = False) -> None:
"""Reject a tool call and rerun with this state to continue."""
if self._context is None:
raise UserError("Cannot reject tool: RunState has no context")
self._context.reject_tool(approval_item, always_reject=always_reject)
def _serialize_approvals(self) -> dict[str, dict[str, Any]]:
"""Serialize approval records into a JSON-friendly mapping."""
if self._context is None:
return {}
approvals_dict: dict[str, dict[str, Any]] = {}
for tool_name, record in self._context._approvals.items():
approvals_dict[tool_name] = {
"approved": record.approved
if isinstance(record.approved, bool)
else list(record.approved),
"rejected": record.rejected
if isinstance(record.rejected, bool)
else list(record.rejected),
}
return approvals_dict
def _serialize_model_responses(self) -> list[dict[str, Any]]:
"""Serialize model responses."""
return [
{
"usage": serialize_usage(resp.usage),
"output": [_serialize_raw_item_value(item) for item in resp.output],
"response_id": resp.response_id,
"request_id": resp.request_id,
}
for resp in self._model_responses
]
def _serialize_original_input(self) -> str | list[Any]:
"""Normalize original input into the shape expected by Responses API."""
if not isinstance(self._original_input, list):
return self._original_input
normalized_items = []
for item in self._original_input:
normalized_item = _serialize_raw_item_value(item)
if isinstance(normalized_item, dict):
normalized_item = dict(normalized_item)
role = normalized_item.get("role")
if role == "assistant":
content = normalized_item.get("content")
if isinstance(content, str):
normalized_item["content"] = [{"type": "output_text", "text": content}]
if "status" not in normalized_item:
normalized_item["status"] = "completed"
normalized_items.append(normalized_item)
return normalized_items
    def _serialize_context_payload(
        self,
        *,
        context_serializer: ContextSerializer | None = None,
        strict_context: bool = False,
    ) -> tuple[dict[str, Any] | None, dict[str, Any]]:
        """Validate and serialize the stored run context.

        The returned metadata captures how the context was serialized so restore-time code can
        decide whether a deserializer or override is required. This lets RunState remain durable
        for simple mapping contexts without silently pretending that richer custom objects can be
        reconstructed automatically.

        Args:
            context_serializer: Optional function converting a non-mapping context into a
                mapping; failures are wrapped in UserError.
            strict_context: When True, a non-mapping context without a context_serializer
                raises instead of falling back to model_dump/asdict/empty-dict handling.

        Returns:
            A ``(payload, meta)`` tuple: the JSON-friendly context mapping (or None) and the
            metadata from ``_build_context_meta`` describing how it was produced.

        Raises:
            UserError: If strict mode forbids the fallback, the serializer fails, or any
                serialization path yields a non-mapping.
        """
        # No wrapper at all: nothing to serialize, nothing to restore.
        if self._context is None:
            return None, _build_context_meta(
                None,
                serialized_via="none",
                requires_deserializer=False,
                omitted=False,
            )
        raw_context_payload = self._context.context
        if raw_context_payload is None:
            return None, _build_context_meta(
                raw_context_payload,
                serialized_via="none",
                requires_deserializer=False,
                omitted=False,
            )
        # Mappings round-trip directly; no deserializer is required on restore.
        if isinstance(raw_context_payload, Mapping):
            return (
                dict(raw_context_payload),
                _build_context_meta(
                    raw_context_payload,
                    serialized_via="mapping",
                    requires_deserializer=False,
                    omitted=False,
                ),
            )
        if strict_context and context_serializer is None:
            # Avoid silently dropping non-mapping context data when strict mode is requested.
            raise UserError(
                "RunState serialization requires context to be a mapping when strict_context "
                "is True. Provide context_serializer to serialize custom contexts."
            )
        # A caller-supplied serializer takes precedence over the built-in fallbacks.
        if context_serializer is not None:
            try:
                serialized = context_serializer(raw_context_payload)
            except Exception as exc:
                raise UserError(
                    "Context serializer failed while serializing RunState context."
                ) from exc
            if not isinstance(serialized, Mapping):
                raise UserError("Context serializer must return a mapping.")
            return (
                dict(serialized),
                _build_context_meta(
                    raw_context_payload,
                    serialized_via="context_serializer",
                    requires_deserializer=True,
                    omitted=False,
                ),
            )
        if hasattr(raw_context_payload, "model_dump"):
            try:
                serialized = raw_context_payload.model_dump(exclude_unset=True)
            except TypeError:
                # Some model_dump signatures do not accept exclude_unset; retry without it.
                serialized = raw_context_payload.model_dump()
            if not isinstance(serialized, Mapping):
                raise UserError("RunState context model_dump must return a mapping.")
            # We can persist the data, but the original type is lost unless the caller rebuilds it.
            logger.warning(
                "RunState context was serialized from a Pydantic model. "
                "Provide context_deserializer or context_override to restore the original type."
            )
            return (
                dict(serialized),
                _build_context_meta(
                    raw_context_payload,
                    serialized_via="model_dump",
                    requires_deserializer=True,
                    omitted=False,
                ),
            )
        if dataclasses.is_dataclass(raw_context_payload):
            serialized = dataclasses.asdict(cast(Any, raw_context_payload))
            if not isinstance(serialized, Mapping):
                raise UserError("RunState dataclass context must serialize to a mapping.")
            # Dataclass instances serialize to dicts, so reconstruction requires a deserializer.
            logger.warning(
                "RunState context was serialized from a dataclass. "
                "Provide context_deserializer or context_override to restore the original type."
            )
            return (
                dict(serialized),
                _build_context_meta(
                    raw_context_payload,
                    serialized_via="asdict",
                    requires_deserializer=True,
                    omitted=False,
                ),
            )
        # Fall back to an empty dict so the run state remains serializable, but
        # explicitly warn because the original context will be unavailable on restore.
        logger.warning(
            "RunState context of type %s is not serializable; storing empty context. "
            "Provide context_serializer to preserve it.",
            type(raw_context_payload).__name__,
        )
        return (
            {},
            _build_context_meta(
                raw_context_payload,
                serialized_via="omitted",
                requires_deserializer=True,
                omitted=True,
            ),
        )
    def _serialize_tool_input(self, tool_input: Any) -> Any:
        """Normalize tool input for JSON serialization.

        Handles None, dataclasses (via asdict), objects exposing model_dump, and
        everything else via ``_to_dump_compatible``.
        """
        if tool_input is None:
            return None
        # Dataclasses are checked before model_dump, so a dataclass input always
        # takes the asdict path even if it happens to define model_dump.
        if dataclasses.is_dataclass(tool_input):
            return dataclasses.asdict(cast(Any, tool_input))
        if hasattr(tool_input, "model_dump"):
            try:
                serialized = tool_input.model_dump(exclude_unset=True)
            except TypeError:
                # Some model_dump signatures do not accept exclude_unset; retry without it.
                serialized = tool_input.model_dump()
            return _to_dump_compatible(serialized)
        return _to_dump_compatible(tool_input)
def _current_generated_items_merge_marker(self) -> str | None:
"""Return a marker for the processed response already reflected in _generated_items."""
if not (self._last_processed_response and self._last_processed_response.new_items):
return None
latest_response_id = (
self._model_responses[-1].response_id if self._model_responses else None
)
serialized_items = [
self._serialize_item(item) for item in self._last_processed_response.new_items
]
return json.dumps(
{
"current_turn": self._current_turn,
"last_response_id": latest_response_id,
"new_items": serialized_items,
},
sort_keys=True,
default=str,
)
    def _mark_generated_items_merged_with_last_processed(self) -> None:
        """Remember that _generated_items already include the current processed response."""
        # Storing the marker lets _merge_generated_items_with_processed skip a re-merge.
        self._generated_items_last_processed_marker = self._current_generated_items_merge_marker()
    def _clear_generated_items_last_processed_marker(self) -> None:
        """Forget any prior merge marker after _generated_items is replaced."""
        self._generated_items_last_processed_marker = None
    def _merge_generated_items_with_processed(self) -> list[RunItem]:
        """Merge persisted and newly processed items without duplication.

        Returns:
            A new list combining ``_generated_items`` with any not-yet-merged items from
            ``_last_processed_response``. ``_generated_items`` itself is not mutated.
        """
        generated_items = list(self._generated_items)
        if not (self._last_processed_response and self._last_processed_response.new_items):
            return generated_items
        current_merge_marker = self._current_generated_items_merge_marker()
        # If the stored marker matches, the processed items were already merged in a
        # previous call; merging again would duplicate them.
        if (
            current_merge_marker is not None
            and self._generated_items_last_processed_marker == current_merge_marker
        ):
            return generated_items
        # Dedup keys, in priority order: (call_id, type) pairs, bare call_ids (items
        # lacking a type), and (id, type) pairs.
        seen_id_types: set[tuple[str, str]] = set()
        seen_call_ids: set[str] = set()
        seen_call_id_types: set[tuple[str, str]] = set()
        def _id_type_call(item: Any) -> tuple[str | None, str | None, str | None]:
            # Extract (id, type, call_id) from the item's raw payload, falling back to
            # attributes on the item itself when the raw payload lacks id/type.
            item_id = None
            item_type = None
            call_id = None
            if hasattr(item, "raw_item"):
                raw = item.raw_item
                if isinstance(raw, dict):
                    item_id = raw.get("id")
                    item_type = raw.get("type")
                    call_id = raw.get("call_id")
                else:
                    item_id = _get_attr(raw, "id")
                    item_type = _get_attr(raw, "type")
                    call_id = _get_attr(raw, "call_id")
            if item_id is None and hasattr(item, "id"):
                item_id = _get_attr(item, "id")
            if item_type is None and hasattr(item, "type"):
                item_type = _get_attr(item, "type")
            return item_id, item_type, call_id
        # Seed the dedup sets from the items already persisted.
        for existing in generated_items:
            item_id, item_type, call_id = _id_type_call(existing)
            if item_id and item_type:
                seen_id_types.add((item_id, item_type))
            if call_id and item_type:
                seen_call_id_types.add((call_id, item_type))
            elif call_id:
                seen_call_ids.add(call_id)
        # Append only the processed items whose keys have not been seen yet.
        for new_item in self._last_processed_response.new_items:
            item_id, item_type, call_id = _id_type_call(new_item)
            if call_id and item_type:
                if (call_id, item_type) in seen_call_id_types:
                    continue
            elif call_id and call_id in seen_call_ids:
                continue
            if item_id and item_type and (item_id, item_type) in seen_id_types:
                continue
            if item_id and item_type:
                seen_id_types.add((item_id, item_type))
            if call_id and item_type:
                seen_call_id_types.add((call_id, item_type))
            elif call_id:
                seen_call_ids.add(call_id)
            generated_items.append(new_item)
        # Record that this processed response is now reflected in the merged list.
        if current_merge_marker is not None:
            self._generated_items_last_processed_marker = current_merge_marker
        return generated_items
    def to_json(
        self,
        *,
        context_serializer: ContextSerializer | None = None,
        strict_context: bool = False,
        include_tracing_api_key: bool = False,
    ) -> dict[str, Any]:
        """Serializes the run state to a JSON-compatible dictionary.

        This method is used to serialize the run state to a dictionary that can be used to
        resume the run later.

        Args:
            context_serializer: Optional function to serialize non-mapping context values.
            strict_context: When True, require mapping contexts or a context_serializer.
            include_tracing_api_key: When True, include the tracing API key in the trace payload.

        Returns:
            A dictionary representation of the run state.

        Raises:
            UserError: If required state (agent, context) is missing.
        """
        if self._current_agent is None:
            raise UserError("Cannot serialize RunState: No current agent")
        if self._context is None:
            raise UserError("Cannot serialize RunState: No context")
        approvals_dict = self._serialize_approvals()
        model_responses = self._serialize_model_responses()
        original_input_serialized = self._serialize_original_input()
        context_payload, context_meta = self._serialize_context_payload(
            context_serializer=context_serializer,
            strict_context=strict_context,
        )
        context_entry: dict[str, Any] = {
            "usage": serialize_usage(self._context.usage),
            "approvals": approvals_dict,
            "context": context_payload,
            # Preserve metadata so deserialization can warn when context types were erased.
            "context_meta": context_meta,
        }
        # tool_input is optional; omit the key entirely when there is none.
        tool_input = self._serialize_tool_input(self._context.tool_input)
        if tool_input is not None:
            context_entry["tool_input"] = tool_input
        result = {
            "$schemaVersion": CURRENT_SCHEMA_VERSION,
            "current_turn": self._current_turn,
            # Agents are stored by name and resolved against an agent map on restore.
            "current_agent": {"name": self._current_agent.name},
            "original_input": original_input_serialized,
            "model_responses": model_responses,
            "context": context_entry,
            # Deep-copied so later tracker mutations don't leak into the snapshot.
            "tool_use_tracker": copy.deepcopy(self._tool_use_tracker_snapshot),
            "max_turns": self._max_turns,
            "no_active_agent_run": True,
            "input_guardrail_results": _serialize_guardrail_results(self._input_guardrail_results),
            "output_guardrail_results": _serialize_guardrail_results(
                self._output_guardrail_results
            ),
            "tool_input_guardrail_results": _serialize_tool_guardrail_results(
                self._tool_input_guardrail_results, type_label="tool_input"
            ),
            "tool_output_guardrail_results": _serialize_tool_guardrail_results(
                self._tool_output_guardrail_results, type_label="tool_output"
            ),
            "conversation_id": self._conversation_id,
            "previous_response_id": self._previous_response_id,
            "auto_previous_response_id": self._auto_previous_response_id,
            "reasoning_item_id_policy": self._reasoning_item_id_policy,
        }
        # Merge any not-yet-merged processed items so resuming sees a complete item list.
        generated_items = self._merge_generated_items_with_processed()
        result["generated_items"] = [self._serialize_item(item) for item in generated_items]
        result["session_items"] = [self._serialize_item(item) for item in list(self._session_items)]
        result["current_step"] = self._serialize_current_step()
        result["last_model_response"] = _serialize_last_model_response(model_responses)
        result["last_processed_response"] = (
            self._serialize_processed_response(
                self._last_processed_response,
                context_serializer=context_serializer,
                strict_context=strict_context,
                include_tracing_api_key=include_tracing_api_key,
            )
            if self._last_processed_response
            else None
        )
        result["current_turn_persisted_item_count"] = self._current_turn_persisted_item_count
        result["trace"] = self._serialize_trace_data(
            include_tracing_api_key=include_tracing_api_key
        )
        return result
    def _serialize_processed_response(
        self,
        processed_response: ProcessedResponse,
        *,
        context_serializer: ContextSerializer | None = None,
        strict_context: bool = False,
        include_tracing_api_key: bool = False,
    ) -> dict[str, Any]:
        """Serialize a ProcessedResponse to JSON format.

        Args:
            processed_response: The ProcessedResponse to serialize.
            context_serializer: Optional serializer forwarded to nested agent-tool run
                serialization.
            strict_context: Forwarded to nested agent-tool run serialization.
            include_tracing_api_key: Forwarded to nested agent-tool run serialization.

        Returns:
            A dictionary representation of the ProcessedResponse.
        """
        action_groups = _serialize_tool_action_groups(processed_response)
        # NOTE(review): the return value is unused, so this presumably augments the
        # entries in action_groups["functions"] in place — verify against the helper.
        _serialize_pending_nested_agent_tool_runs(
            parent_state=self,
            function_entries=action_groups.get("functions", []),
            function_runs=processed_response.functions,
            scope_id=self._agent_tool_state_scope_id,
            context_serializer=context_serializer,
            strict_context=strict_context,
            include_tracing_api_key=include_tracing_api_key,
        )
        # Only ToolApprovalItem interruptions are serializable; others are skipped.
        interruptions_data = [
            _serialize_tool_approval_interruption(interruption, include_tool_name=True)
            for interruption in processed_response.interruptions
            if isinstance(interruption, ToolApprovalItem)
        ]
        return {
            "new_items": [self._serialize_item(item) for item in processed_response.new_items],
            "tools_used": processed_response.tools_used,
            **action_groups,
            "interruptions": interruptions_data,
        }
def _serialize_current_step(self) -> dict[str, Any] | None:
"""Serialize the current step if it's an interruption."""
# Import at runtime to avoid circular import
from .run_internal.run_steps import NextStepInterruption
if self._current_step is None or not isinstance(self._current_step, NextStepInterruption):
return None
interruptions_data = [
_serialize_tool_approval_interruption(
item, include_tool_name=item.tool_name is not None
)
for item in self._current_step.interruptions
if isinstance(item, ToolApprovalItem)
]
return {
"type": "next_step_interruption",
"data": {
"interruptions": interruptions_data,
},
}
    def _serialize_item(self, item: RunItem) -> dict[str, Any]:
        """Serialize a run item to JSON-compatible dict.

        Always emits ``type``, ``raw_item``, and the owning agent's name; other fields
        are added only when present on the item (and, for most, non-None).
        """
        raw_item_dict: Any = _serialize_raw_item_value(item.raw_item)
        result: dict[str, Any] = {
            "type": item.type,
            "raw_item": raw_item_dict,
            "agent": {"name": item.agent.name},
        }
        # Add additional fields based on item type
        if hasattr(item, "output"):
            serialized_output = item.output
            try:
                if hasattr(serialized_output, "model_dump"):
                    serialized_output = serialized_output.model_dump(exclude_unset=True)
                elif dataclasses.is_dataclass(serialized_output):
                    serialized_output = dataclasses.asdict(serialized_output)  # type: ignore[arg-type]
                serialized_output = _ensure_json_compatible(serialized_output)
            except Exception:
                # Best-effort fallback: stringify outputs that resist structured serialization.
                serialized_output = str(item.output)
            result["output"] = serialized_output
        if hasattr(item, "source_agent"):
            result["source_agent"] = {"name": item.source_agent.name}
        if hasattr(item, "target_agent"):
            result["target_agent"] = {"name": item.target_agent.name}
        if hasattr(item, "tool_name") and item.tool_name is not None:
            result["tool_name"] = item.tool_name
        if hasattr(item, "tool_namespace") and item.tool_namespace is not None:
            result["tool_namespace"] = item.tool_namespace
        tool_lookup_key = serialize_function_tool_lookup_key(getattr(item, "tool_lookup_key", None))
        if tool_lookup_key is not None:
            result["tool_lookup_key"] = tool_lookup_key
        # Private flag is only emitted when truthy, keeping the payload compact.
        if getattr(item, "_allow_bare_name_alias", False):
            result["allow_bare_name_alias"] = True
        if hasattr(item, "description") and item.description is not None:
            result["description"] = item.description
        return result
    def _lookup_function_name(self, call_id: str) -> str:
        """Attempt to find the function name for the provided call_id.

        Searches, in order: generated items, the last processed response's new items,
        and the original input list. Returns "" when no match is found.
        """
        if not call_id:
            return ""
        def _extract_name(raw: Any) -> str | None:
            # Returns None when raw does not match call_id; returns "" for a matching
            # item whose name is missing/empty (which ends the search with "").
            if isinstance(raw, dict):
                candidate_call_id = cast(Optional[str], raw.get("call_id"))
                if candidate_call_id == call_id:
                    name_value = raw.get("name", "")
                    return str(name_value) if name_value else ""
            else:
                candidate_call_id = cast(Optional[str], _get_attr(raw, "call_id"))
                if candidate_call_id == call_id:
                    name_value = _get_attr(raw, "name", "")
                    return str(name_value) if name_value else ""
            return None
        # Search generated items first
        for run_item in self._generated_items:
            if run_item.type != "tool_call_item":
                continue
            name = _extract_name(run_item.raw_item)
            if name is not None:
                return name
        # Inspect last processed response
        if self._last_processed_response is not None:
            for run_item in self._last_processed_response.new_items:
                if run_item.type != "tool_call_item":
                    continue
                name = _extract_name(run_item.raw_item)
                if name is not None:
                    return name
        # Finally, inspect the original input list where the function call originated
        if isinstance(self._original_input, list):
            for input_item in self._original_input:
                if not isinstance(input_item, dict):
                    continue
                if input_item.get("type") != "function_call":
                    continue
                item_call_id = cast(Optional[str], input_item.get("call_id"))
                if item_call_id == call_id:
                    name_value = input_item.get("name", "")
                    return str(name_value) if name_value else ""
        return ""
    def to_string(
        self,
        *,
        context_serializer: ContextSerializer | None = None,
        strict_context: bool = False,
        include_tracing_api_key: bool = False,
    ) -> str:
        """Serializes the run state to a JSON string.

        Args:
            context_serializer: Optional function to serialize non-mapping context values.
            strict_context: When True, require mapping contexts or a context_serializer.
            include_tracing_api_key: When True, include the tracing API key in the trace payload.

        Returns:
            JSON string representation of the run state.

        Raises:
            UserError: If required state (agent, context) is missing.
        """
        return json.dumps(
            self.to_json(
                context_serializer=context_serializer,
                strict_context=strict_context,
                include_tracing_api_key=include_tracing_api_key,
            ),
            indent=2,
        )
    def set_trace(self, trace: Trace | None) -> None:
        """Capture trace metadata for serialization/resumption."""
        self._trace_state = TraceState.from_trace(trace)
    def _serialize_trace_data(self, *, include_tracing_api_key: bool) -> dict[str, Any] | None:
        """Serialize stored trace metadata, or return None when no trace was captured."""
        if not self._trace_state:
            return None
        return self._trace_state.to_json(include_tracing_api_key=include_tracing_api_key)
def set_tool_use_tracker_snapshot(self, snapshot: Mapping[str, Sequence[str]] | None) -> None:
"""Store a copy of the serialized tool-use tracker data."""
if not snapshot:
self._tool_use_tracker_snapshot = {}
return
normalized: dict[str, list[str]] = {}
for agent_name, tools in snapshot.items():
if not isinstance(agent_name, str):
continue
normalized[agent_name] = [tool for tool in tools if isinstance(tool, str)]
self._tool_use_tracker_snapshot = normalized
    def set_reasoning_item_id_policy(self, policy: Literal["preserve", "omit"] | None) -> None:
        """Store how reasoning item IDs should appear in next-turn model input.

        Args:
            policy: "preserve" to keep IDs, "omit" to drop them, or None (no explicit policy).
        """
        self._reasoning_item_id_policy = policy
def get_tool_use_tracker_snapshot(self) -> dict[str, list[str]]:
"""Return a defensive copy of the tool-use tracker snapshot."""
return {
agent_name: list(tool_names)
for agent_name, tool_names in self._tool_use_tracker_snapshot.items()
}
@staticmethod
async def from_string(
initial_agent: Agent[Any],
state_string: str,
*,
context_override: ContextOverride | None = None,
context_deserializer: ContextDeserializer | None = None,
strict_context: bool = False,
) -> RunState[Any, Agent[Any]]:
"""Deserializes a run state from a JSON string.
This method is used to deserialize a run state from a string that was serialized using
the `to_string()` method.
Args:
initial_agent: The initial agent (used to build agent map for resolution).
state_string: The JSON string to deserialize.
context_override: Optional context mapping or RunContextWrapper to use instead of the
serialized context.
context_deserializer: Optional function to rebuild non-mapping context values.
strict_context: When True, require a deserializer or override for non-mapping contexts.
Returns:
A reconstructed RunState instance.
Raises:
UserError: If the string is invalid JSON or has incompatible schema version.
"""
try:
state_json = json.loads(state_string)
except json.JSONDecodeError as e:
raise UserError(f"Failed to parse run state JSON: {e}") from e
return await RunState.from_json(
initial_agent=initial_agent,
state_json=state_json,
context_override=context_override,
context_deserializer=context_deserializer,
strict_context=strict_context,
)
    @staticmethod
    async def from_json(
        initial_agent: Agent[Any],
        state_json: dict[str, Any],
        *,
        context_override: ContextOverride | None = None,
        context_deserializer: ContextDeserializer | None = None,
        strict_context: bool = False,
    ) -> RunState[Any, Agent[Any]]:
        """Deserializes a run state from a JSON dictionary.

        This method is used to deserialize a run state from a dict that was created using
        the `to_json()` method.

        Args:
            initial_agent: The initial agent (used to build agent map for resolution).
            state_json: The JSON dictionary to deserialize.
            context_override: Optional context mapping or RunContextWrapper to use instead of the
                serialized context.
            context_deserializer: Optional function to rebuild non-mapping context values.
            strict_context: When True, require a deserializer or override for non-mapping contexts.

        Returns:
            A reconstructed RunState instance.

        Raises:
            UserError: If the dict has incompatible schema version.
        """
        # The reconstruction logic lives in a module-level builder function.
        return await _build_run_state_from_json(
            initial_agent=initial_agent,
            state_json=state_json,
            context_override=context_override,
            context_deserializer=context_deserializer,
            strict_context=strict_context,
        )