Skip to content

MCP Servers

MCPServer

Bases: ABC

Base class for Model Context Protocol servers.

Source code in src/agents/mcp/server.py
class MCPServer(abc.ABC):
    """Base class for Model Context Protocol servers."""

    @abc.abstractmethod
    async def connect(self):
        """Connect to the server. For example, this might mean spawning a subprocess or
        opening a network connection. The server is expected to remain connected until
        `cleanup()` is called.
        """
        pass

    @property
    @abc.abstractmethod
    def name(self) -> str:
        """A readable name for the server."""
        pass

    @abc.abstractmethod
    async def cleanup(self):
        """Cleanup the server. For example, this might mean closing a subprocess or
        closing a network connection.
        """
        pass

    @abc.abstractmethod
    async def list_tools(self) -> list[MCPTool]:
        """List the tools available on the server."""
        pass

    @abc.abstractmethod
    async def call_tool(self, tool_name: str, arguments: dict[str, Any] | None) -> CallToolResult:
        """Invoke a tool on the server."""
        pass

name abstractmethod property

name: str

A readable name for the server.

connect abstractmethod async

connect()

Connect to the server. For example, this might mean spawning a subprocess or opening a network connection. The server is expected to remain connected until cleanup() is called.

Source code in src/agents/mcp/server.py
@abc.abstractmethod
async def connect(self):
    """Connect to the server. For example, this might mean spawning a subprocess or
    opening a network connection. The server is expected to remain connected until
    `cleanup()` is called.
    """
    pass

cleanup abstractmethod async

cleanup()

Cleanup the server. For example, this might mean closing a subprocess or closing a network connection.

Source code in src/agents/mcp/server.py
@abc.abstractmethod
async def cleanup(self):
    """Cleanup the server. For example, this might mean closing a subprocess or
    closing a network connection.
    """
    pass

list_tools abstractmethod async

list_tools() -> list[Tool]

List the tools available on the server.

Source code in src/agents/mcp/server.py
@abc.abstractmethod
async def list_tools(self) -> list[MCPTool]:
    """List the tools available on the server."""
    pass

call_tool abstractmethod async

call_tool(
    tool_name: str, arguments: dict[str, Any] | None
) -> CallToolResult

Invoke a tool on the server.

Source code in src/agents/mcp/server.py
@abc.abstractmethod
async def call_tool(self, tool_name: str, arguments: dict[str, Any] | None) -> CallToolResult:
    """Invoke a tool on the server."""
    pass

MCPServerStdioParams

Bases: TypedDict

Mirrors mcp.client.stdio.StdioServerParameters, but lets you pass params without another import.

Source code in src/agents/mcp/server.py
class MCPServerStdioParams(TypedDict):
    """Mirrors `mcp.client.stdio.StdioServerParameters`, but lets you pass params without another
    import.
    """

    command: str
    """The executable to run to start the server. For example, `python` or `node`."""

    args: NotRequired[list[str]]
    """Command line args to pass to the `command` executable. For example, `['foo.py']` or
    `['server.js', '--port', '8080']`."""

    env: NotRequired[dict[str, str]]
    """The environment variables to set for the server. ."""

    cwd: NotRequired[str | Path]
    """The working directory to use when spawning the process."""

    encoding: NotRequired[str]
    """The text encoding used when sending/receiving messages to the server. Defaults to `utf-8`."""

    encoding_error_handler: NotRequired[Literal["strict", "ignore", "replace"]]
    """The text encoding error handler. Defaults to `strict`.

    See https://docs.python.org/3/library/codecs.html#codec-base-classes for
    explanations of possible values.
    """

command instance-attribute

command: str

The executable to run to start the server. For example, python or node.

args instance-attribute

args: NotRequired[list[str]]

Command line args to pass to the command executable. For example, ['foo.py'] or ['server.js', '--port', '8080'].

env instance-attribute

env: NotRequired[dict[str, str]]

The environment variables to set for the server. .

cwd instance-attribute

cwd: NotRequired[str | Path]

The working directory to use when spawning the process.

encoding instance-attribute

encoding: NotRequired[str]

The text encoding used when sending/receiving messages to the server. Defaults to utf-8.

encoding_error_handler instance-attribute

encoding_error_handler: NotRequired[
    Literal["strict", "ignore", "replace"]
]

The text encoding error handler. Defaults to strict.

See https://docs.python.org/3/library/codecs.html#codec-base-classes for explanations of possible values.

MCPServerStdio

Bases: _MCPServerWithClientSession

MCP server implementation that uses the stdio transport. See the [spec] (https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#stdio) for details.

Source code in src/agents/mcp/server.py
class MCPServerStdio(_MCPServerWithClientSession):
    """MCP server implementation that uses the stdio transport. See the [spec]
    (https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#stdio) for
    details.
    """

    def __init__(
        self,
        params: MCPServerStdioParams,
        cache_tools_list: bool = False,
        name: str | None = None,
    ):
        """Create a new MCP server based on the stdio transport.

        Args:
            params: The params that configure the server. This includes the command to run to
                start the server, the args to pass to the command, the environment variables to
                set for the server, the working directory to use when spawning the process, and
                the text encoding used when sending/receiving messages to the server.
            cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be
                cached and only fetched from the server once. If `False`, the tools list will be
                fetched from the server on each call to `list_tools()`. The cache can be
                invalidated by calling `invalidate_tools_cache()`. You should set this to `True`
                if you know the server will not change its tools list, because it can drastically
                improve latency (by avoiding a round-trip to the server every time).
            name: A readable name for the server. If not provided, we'll create one from the
                command.
        """
        super().__init__(cache_tools_list)

        self.params = StdioServerParameters(
            command=params["command"],
            args=params.get("args", []),
            env=params.get("env"),
            cwd=params.get("cwd"),
            encoding=params.get("encoding", "utf-8"),
            encoding_error_handler=params.get("encoding_error_handler", "strict"),
        )

        self._name = name or f"stdio: {self.params.command}"

    def create_streams(
        self,
    ) -> AbstractAsyncContextManager[
        tuple[
            MemoryObjectReceiveStream[JSONRPCMessage | Exception],
            MemoryObjectSendStream[JSONRPCMessage],
        ]
    ]:
        """Create the streams for the server."""
        return stdio_client(self.params)

    @property
    def name(self) -> str:
        """A readable name for the server."""
        return self._name

name property

name: str

A readable name for the server.

connect async

connect()

Connect to the server.

Source code in src/agents/mcp/server.py
async def connect(self):
    """Connect to the server."""
    try:
        transport = await self.exit_stack.enter_async_context(self.create_streams())
        read, write = transport
        session = await self.exit_stack.enter_async_context(ClientSession(read, write))
        await session.initialize()
        self.session = session
    except Exception as e:
        logger.error(f"Error initializing MCP server: {e}")
        await self.cleanup()
        raise

cleanup async

cleanup()

Cleanup the server.

Source code in src/agents/mcp/server.py
async def cleanup(self):
    """Cleanup the server."""
    async with self._cleanup_lock:
        try:
            await self.exit_stack.aclose()
            self.session = None
        except Exception as e:
            logger.error(f"Error cleaning up server: {e}")

list_tools async

list_tools() -> list[Tool]

List the tools available on the server.

Source code in src/agents/mcp/server.py
async def list_tools(self) -> list[MCPTool]:
    """List the tools available on the server."""
    if not self.session:
        raise UserError("Server not initialized. Make sure you call `connect()` first.")

    # Return from cache if caching is enabled, we have tools, and the cache is not dirty
    if self.cache_tools_list and not self._cache_dirty and self._tools_list:
        return self._tools_list

    # Reset the cache dirty to False
    self._cache_dirty = False

    # Fetch the tools from the server
    self._tools_list = (await self.session.list_tools()).tools
    return self._tools_list

call_tool async

call_tool(
    tool_name: str, arguments: dict[str, Any] | None
) -> CallToolResult

Invoke a tool on the server.

Source code in src/agents/mcp/server.py
async def call_tool(self, tool_name: str, arguments: dict[str, Any] | None) -> CallToolResult:
    """Invoke a tool on the server."""
    if not self.session:
        raise UserError("Server not initialized. Make sure you call `connect()` first.")

    return await self.session.call_tool(tool_name, arguments)

invalidate_tools_cache

invalidate_tools_cache()

Invalidate the tools cache.

Source code in src/agents/mcp/server.py
def invalidate_tools_cache(self):
    """Invalidate the tools cache."""
    self._cache_dirty = True

__init__

__init__(
    params: MCPServerStdioParams,
    cache_tools_list: bool = False,
    name: str | None = None,
)

Create a new MCP server based on the stdio transport.

Parameters:

Name Type Description Default
params MCPServerStdioParams

The params that configure the server. This includes the command to run to start the server, the args to pass to the command, the environment variables to set for the server, the working directory to use when spawning the process, and the text encoding used when sending/receiving messages to the server.

required
cache_tools_list bool

Whether to cache the tools list. If True, the tools list will be cached and only fetched from the server once. If False, the tools list will be fetched from the server on each call to list_tools(). The cache can be invalidated by calling invalidate_tools_cache(). You should set this to True if you know the server will not change its tools list, because it can drastically improve latency (by avoiding a round-trip to the server every time).

False
name str | None

A readable name for the server. If not provided, we'll create one from the command.

None
Source code in src/agents/mcp/server.py
def __init__(
    self,
    params: MCPServerStdioParams,
    cache_tools_list: bool = False,
    name: str | None = None,
):
    """Create a new MCP server based on the stdio transport.

    Args:
        params: The params that configure the server. This includes the command to run to
            start the server, the args to pass to the command, the environment variables to
            set for the server, the working directory to use when spawning the process, and
            the text encoding used when sending/receiving messages to the server.
        cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be
            cached and only fetched from the server once. If `False`, the tools list will be
            fetched from the server on each call to `list_tools()`. The cache can be
            invalidated by calling `invalidate_tools_cache()`. You should set this to `True`
            if you know the server will not change its tools list, because it can drastically
            improve latency (by avoiding a round-trip to the server every time).
        name: A readable name for the server. If not provided, we'll create one from the
            command.
    """
    super().__init__(cache_tools_list)

    self.params = StdioServerParameters(
        command=params["command"],
        args=params.get("args", []),
        env=params.get("env"),
        cwd=params.get("cwd"),
        encoding=params.get("encoding", "utf-8"),
        encoding_error_handler=params.get("encoding_error_handler", "strict"),
    )

    self._name = name or f"stdio: {self.params.command}"

create_streams

create_streams() -> AbstractAsyncContextManager[
    tuple[
        MemoryObjectReceiveStream[
            JSONRPCMessage | Exception
        ],
        MemoryObjectSendStream[JSONRPCMessage],
    ]
]

Create the streams for the server.

Source code in src/agents/mcp/server.py
def create_streams(
    self,
) -> AbstractAsyncContextManager[
    tuple[
        MemoryObjectReceiveStream[JSONRPCMessage | Exception],
        MemoryObjectSendStream[JSONRPCMessage],
    ]
]:
    """Create the streams for the server."""
    return stdio_client(self.params)

MCPServerSseParams

Bases: TypedDict

Mirrors the params inmcp.client.sse.sse_client.

Source code in src/agents/mcp/server.py
class MCPServerSseParams(TypedDict):
    """Mirrors the params in`mcp.client.sse.sse_client`."""

    url: str
    """The URL of the server."""

    headers: NotRequired[dict[str, str]]
    """The headers to send to the server."""

    timeout: NotRequired[float]
    """The timeout for the HTTP request. Defaults to 5 seconds."""

    sse_read_timeout: NotRequired[float]
    """The timeout for the SSE connection, in seconds. Defaults to 5 minutes."""

url instance-attribute

url: str

The URL of the server.

headers instance-attribute

headers: NotRequired[dict[str, str]]

The headers to send to the server.

timeout instance-attribute

timeout: NotRequired[float]

The timeout for the HTTP request. Defaults to 5 seconds.

sse_read_timeout instance-attribute

sse_read_timeout: NotRequired[float]

The timeout for the SSE connection, in seconds. Defaults to 5 minutes.

MCPServerSse

Bases: _MCPServerWithClientSession

MCP server implementation that uses the HTTP with SSE transport. See the [spec] (https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse) for details.

Source code in src/agents/mcp/server.py
class MCPServerSse(_MCPServerWithClientSession):
    """MCP server implementation that uses the HTTP with SSE transport. See the [spec]
    (https://spec.modelcontextprotocol.io/specification/2024-11-05/basic/transports/#http-with-sse)
    for details.
    """

    def __init__(
        self,
        params: MCPServerSseParams,
        cache_tools_list: bool = False,
        name: str | None = None,
    ):
        """Create a new MCP server based on the HTTP with SSE transport.

        Args:
            params: The params that configure the server. This includes the URL of the server,
                the headers to send to the server, the timeout for the HTTP request, and the
                timeout for the SSE connection.

            cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be
                cached and only fetched from the server once. If `False`, the tools list will be
                fetched from the server on each call to `list_tools()`. The cache can be
                invalidated by calling `invalidate_tools_cache()`. You should set this to `True`
                if you know the server will not change its tools list, because it can drastically
                improve latency (by avoiding a round-trip to the server every time).

            name: A readable name for the server. If not provided, we'll create one from the
                URL.
        """
        super().__init__(cache_tools_list)

        self.params = params
        self._name = name or f"sse: {self.params['url']}"

    def create_streams(
        self,
    ) -> AbstractAsyncContextManager[
        tuple[
            MemoryObjectReceiveStream[JSONRPCMessage | Exception],
            MemoryObjectSendStream[JSONRPCMessage],
        ]
    ]:
        """Create the streams for the server."""
        return sse_client(
            url=self.params["url"],
            headers=self.params.get("headers", None),
            timeout=self.params.get("timeout", 5),
            sse_read_timeout=self.params.get("sse_read_timeout", 60 * 5),
        )

    @property
    def name(self) -> str:
        """A readable name for the server."""
        return self._name

name property

name: str

A readable name for the server.

connect async

connect()

Connect to the server.

Source code in src/agents/mcp/server.py
async def connect(self):
    """Connect to the server."""
    try:
        transport = await self.exit_stack.enter_async_context(self.create_streams())
        read, write = transport
        session = await self.exit_stack.enter_async_context(ClientSession(read, write))
        await session.initialize()
        self.session = session
    except Exception as e:
        logger.error(f"Error initializing MCP server: {e}")
        await self.cleanup()
        raise

cleanup async

cleanup()

Cleanup the server.

Source code in src/agents/mcp/server.py
async def cleanup(self):
    """Cleanup the server."""
    async with self._cleanup_lock:
        try:
            await self.exit_stack.aclose()
            self.session = None
        except Exception as e:
            logger.error(f"Error cleaning up server: {e}")

list_tools async

list_tools() -> list[Tool]

List the tools available on the server.

Source code in src/agents/mcp/server.py
async def list_tools(self) -> list[MCPTool]:
    """List the tools available on the server."""
    if not self.session:
        raise UserError("Server not initialized. Make sure you call `connect()` first.")

    # Return from cache if caching is enabled, we have tools, and the cache is not dirty
    if self.cache_tools_list and not self._cache_dirty and self._tools_list:
        return self._tools_list

    # Reset the cache dirty to False
    self._cache_dirty = False

    # Fetch the tools from the server
    self._tools_list = (await self.session.list_tools()).tools
    return self._tools_list

call_tool async

call_tool(
    tool_name: str, arguments: dict[str, Any] | None
) -> CallToolResult

Invoke a tool on the server.

Source code in src/agents/mcp/server.py
async def call_tool(self, tool_name: str, arguments: dict[str, Any] | None) -> CallToolResult:
    """Invoke a tool on the server."""
    if not self.session:
        raise UserError("Server not initialized. Make sure you call `connect()` first.")

    return await self.session.call_tool(tool_name, arguments)

invalidate_tools_cache

invalidate_tools_cache()

Invalidate the tools cache.

Source code in src/agents/mcp/server.py
def invalidate_tools_cache(self):
    """Invalidate the tools cache."""
    self._cache_dirty = True

__init__

__init__(
    params: MCPServerSseParams,
    cache_tools_list: bool = False,
    name: str | None = None,
)

Create a new MCP server based on the HTTP with SSE transport.

Parameters:

Name Type Description Default
params MCPServerSseParams

The params that configure the server. This includes the URL of the server, the headers to send to the server, the timeout for the HTTP request, and the timeout for the SSE connection.

required
cache_tools_list bool

Whether to cache the tools list. If True, the tools list will be cached and only fetched from the server once. If False, the tools list will be fetched from the server on each call to list_tools(). The cache can be invalidated by calling invalidate_tools_cache(). You should set this to True if you know the server will not change its tools list, because it can drastically improve latency (by avoiding a round-trip to the server every time).

False
name str | None

A readable name for the server. If not provided, we'll create one from the URL.

None
Source code in src/agents/mcp/server.py
def __init__(
    self,
    params: MCPServerSseParams,
    cache_tools_list: bool = False,
    name: str | None = None,
):
    """Create a new MCP server based on the HTTP with SSE transport.

    Args:
        params: The params that configure the server. This includes the URL of the server,
            the headers to send to the server, the timeout for the HTTP request, and the
            timeout for the SSE connection.

        cache_tools_list: Whether to cache the tools list. If `True`, the tools list will be
            cached and only fetched from the server once. If `False`, the tools list will be
            fetched from the server on each call to `list_tools()`. The cache can be
            invalidated by calling `invalidate_tools_cache()`. You should set this to `True`
            if you know the server will not change its tools list, because it can drastically
            improve latency (by avoiding a round-trip to the server every time).

        name: A readable name for the server. If not provided, we'll create one from the
            URL.
    """
    super().__init__(cache_tools_list)

    self.params = params
    self._name = name or f"sse: {self.params['url']}"

create_streams

create_streams() -> AbstractAsyncContextManager[
    tuple[
        MemoryObjectReceiveStream[
            JSONRPCMessage | Exception
        ],
        MemoryObjectSendStream[JSONRPCMessage],
    ]
]

Create the streams for the server.

Source code in src/agents/mcp/server.py
def create_streams(
    self,
) -> AbstractAsyncContextManager[
    tuple[
        MemoryObjectReceiveStream[JSONRPCMessage | Exception],
        MemoryObjectSendStream[JSONRPCMessage],
    ]
]:
    """Create the streams for the server."""
    return sse_client(
        url=self.params["url"],
        headers=self.params.get("headers", None),
        timeout=self.params.get("timeout", 5),
        sse_read_timeout=self.params.get("sse_read_timeout", 60 * 5),
    )