Files
letta-server/letta/services/mcp/fastmcp_client.py
Kian Jones ddcfeb26b1 fix(core): catch all MCP tool execution errors instead of re-raising (#9419)
* fix(core): catch all MCP tool execution errors instead of re-raising

MCP tools are external user-configured servers - any failure during
tool execution is expected and should be returned as (error_msg, False)
to the agent, not raised as an exception that hits Datadog as a 500.

Previously:
- base_client.py only caught McpError/ToolError, re-raised everything else
- fastmcp_client.py (both SSE and StreamableHTTP) always re-raised

Now all three execute_tool() methods catch all exceptions and return
the error message to the agent conversation. The agent handles tool
failures via the error message naturally.

This silences ~15 Datadog issue types including:
- fastmcp.exceptions.ToolError (validation, permissions)
- mcp.shared.exceptions.McpError (connection closed, credentials)
- httpx.HTTPStatusError (503 from Zapier, etc.)
- httpx.ConnectError, ReadTimeout, RemoteProtocolError
- requests.exceptions.ConnectionError
- builtins.ConnectionError

🐾 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* fix(core): log unexpected MCP errors at warning level with traceback

Expected MCP errors (ToolError, McpError, httpx.*, ConnectionError, etc.)
log at info level. Anything else (e.g. TypeError, AttributeError from
our own code) logs at warning with exc_info=True so it still surfaces
in Datadog without crashing the request.

🐾 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

---------

Co-authored-by: Letta <noreply@letta.com>
2026-02-24 10:52:07 -08:00

340 lines
13 KiB
Python

"""FastMCP-based MCP clients with server-side OAuth support.
This module provides MCP client implementations using the FastMCP library,
with support for server-side OAuth flows where authorization URLs are
forwarded to web clients instead of opening a browser.
These clients replace the existing AsyncSSEMCPClient and AsyncStreamableHTTPMCPClient
implementations that used the lower-level MCP SDK directly.
"""
from contextlib import AsyncExitStack
from typing import List, Optional, Tuple
import httpx
from fastmcp import Client
from fastmcp.client.transports import SSETransport, StreamableHttpTransport
from mcp import Tool as MCPTool
from letta.errors import LettaMCPConnectionError
from letta.functions.mcp_client.types import SSEServerConfig, StreamableHTTPServerConfig
from letta.log import get_logger
from letta.services.mcp.base_client import _log_mcp_tool_error
from letta.services.mcp.server_side_oauth import ServerSideOAuth
# Module-level logger used by both FastMCP client classes defined in this file.
logger = get_logger(__name__)
class AsyncFastMCPSSEClient:
    """SSE MCP client using FastMCP with server-side OAuth support.

    This client connects to MCP servers using Server-Sent Events (SSE) transport.
    It supports both authenticated and unauthenticated connections, with OAuth
    handled via the ServerSideOAuth class for server-side flows.

    Args:
        server_config: SSE server configuration including URL, headers, and auth settings
        oauth: Optional ServerSideOAuth instance for OAuth authentication
        agent_id: Optional agent ID to include in request headers
    """

    # Name of the request header that carries the agent ID (when one is set).
    AGENT_ID_HEADER = "X-Agent-Id"

    def __init__(
        self,
        server_config: SSEServerConfig,
        oauth: Optional[ServerSideOAuth] = None,
        agent_id: Optional[str] = None,
    ):
        self.server_config = server_config
        self.oauth = oauth
        self.agent_id = agent_id
        # Populated by connect_to_server(); None until then.
        self.client: Optional[Client] = None
        # Flipped to True only after a successful connect, back to False in cleanup().
        self.initialized = False
        # Not used by any method of this class; the attribute is kept for callers that may read it.
        self.exit_stack = AsyncExitStack()

    async def connect_to_server(self):
        """Establish connection to the MCP server.

        Request headers are assembled in this order, later entries overriding
        earlier ones: custom headers, the auth header/token pair, the agent ID
        header.

        Raises:
            LettaMCPConnectionError: If connecting fails for any reason. The
                underlying exception is chained as the cause.
        """
        try:
            headers = {}
            if self.server_config.custom_headers:
                headers.update(self.server_config.custom_headers)
            if self.server_config.auth_header and self.server_config.auth_token:
                headers[self.server_config.auth_header] = self.server_config.auth_token
            if self.agent_id:
                headers[self.AGENT_ID_HEADER] = self.agent_id

            transport = SSETransport(
                url=self.server_config.server_url,
                headers=headers if headers else None,
                auth=self.oauth,  # Pass ServerSideOAuth instance (or None)
            )
            self.client = Client(transport)
            # NOTE(review): relies on FastMCP's private `_connect()`; presumably so the
            # session stays open across calls until cleanup() — confirm on FastMCP upgrades.
            await self.client._connect()
            self.initialized = True
        except httpx.HTTPStatusError as e:
            # 401 gets a short, fixed message; every other status keeps the full error text.
            if e.response.status_code == 401:
                raise LettaMCPConnectionError(message="401 Unauthorized", server_name=self.server_config.server_name) from e
            raise LettaMCPConnectionError(
                message=f"HTTP error connecting to MCP server at {self.server_config.server_url}: {e}",
                server_name=self.server_config.server_name,
            ) from e
        except LettaMCPConnectionError:
            # Already in the target shape; do not wrap it a second time.
            raise
        except ConnectionError as e:
            raise LettaMCPConnectionError(message=str(e), server_name=self.server_config.server_name) from e
        except Exception as e:
            # Credential-bearing fields are left out of the dump so secrets never reach the logs.
            safe_config = self.server_config.model_dump_json(indent=4, exclude={"auth_token", "custom_headers"})
            logger.warning(
                f"Connecting to MCP server failed. Please review your server config: {safe_config}. Error: {str(e)}"
            )
            raise LettaMCPConnectionError(
                message=f"Failed to connect to MCP server at '{self.server_config.server_url}'. "
                f"Please check your configuration and ensure the server is accessible. Error: {str(e)}",
                server_name=self.server_config.server_name,
            ) from e

    async def list_tools(self, serialize: bool = False) -> List[MCPTool]:
        """List available tools from the MCP server.

        Args:
            serialize: If True, return tools as dictionaries instead of MCPTool objects

        Returns:
            List of tools available on the server. With ``serialize=True`` each
            element is the result of ``model_dump()``, ``dict()`` or ``__dict__``
            (first one available), falling back to ``str(tool)``.

        Raises:
            RuntimeError: If client has not been initialized
        """
        self._check_initialized()
        tools = await self.client.list_tools()
        if serialize:
            serializable_tools = []
            for tool in tools:
                if hasattr(tool, "model_dump"):
                    serializable_tools.append(tool.model_dump())
                elif hasattr(tool, "dict"):
                    serializable_tools.append(tool.dict())
                elif hasattr(tool, "__dict__"):
                    serializable_tools.append(tool.__dict__)
                else:
                    serializable_tools.append(str(tool))
            return serializable_tools
        return tools

    async def execute_tool(self, tool_name: str, tool_args: dict) -> Tuple[str, bool]:
        """Execute a tool on the MCP server.

        Any exception raised by the tool call itself is logged and returned as
        ``(error_message, False)`` rather than propagated.

        Args:
            tool_name: Name of the tool to execute
            tool_args: Arguments to pass to the tool

        Returns:
            Tuple of (result_content, success_flag)

        Raises:
            RuntimeError: If client has not been initialized
        """
        self._check_initialized()
        try:
            result = await self.client.call_tool(tool_name, tool_args)
        except Exception as e:
            # An exception exposing exactly one entry in `.exceptions` (e.g. an
            # ExceptionGroup) is unwrapped so the real cause is logged and returned.
            exception_to_check = e
            if hasattr(e, "exceptions") and e.exceptions and len(e.exceptions) == 1:
                exception_to_check = e.exceptions[0]
            _log_mcp_tool_error(logger, tool_name, exception_to_check)
            # Some exceptions (e.g. a bare TimeoutError()) stringify to "", which would
            # hand the caller an empty error message; fall back to the class name.
            error_message = str(exception_to_check) or type(exception_to_check).__name__
            return error_message, False

        # Parse content from result
        parsed_content = []
        for content_piece in result.content:
            if hasattr(content_piece, "text"):
                parsed_content.append(content_piece.text)
                logger.debug(f"MCP tool result parsed content (text): {parsed_content}")
            else:
                parsed_content.append(str(content_piece))
                logger.debug(f"MCP tool result parsed content (other): {parsed_content}")

        if parsed_content:
            final_content = " ".join(parsed_content)
        else:
            final_content = "Empty response from tool"
        return final_content, not result.is_error

    def _check_initialized(self):
        """Check if the client has been initialized.

        Raises:
            RuntimeError: If connect_to_server() has not completed successfully.
        """
        if not self.initialized:
            logger.error("MCPClient has not been initialized")
            raise RuntimeError("MCPClient has not been initialized")

    async def cleanup(self):
        """Clean up client resources.

        Errors raised while closing the client are logged and suppressed; the
        client is marked as uninitialized either way.
        """
        if self.client:
            try:
                await self.client.close()
            except Exception as e:
                logger.warning(f"Error during FastMCP client cleanup: {e}")
        self.initialized = False
class AsyncFastMCPStreamableHTTPClient:
    """Streamable HTTP MCP client using FastMCP with server-side OAuth support.

    This client connects to MCP servers using Streamable HTTP transport.
    It supports both authenticated and unauthenticated connections, with OAuth
    handled via the ServerSideOAuth class for server-side flows.

    Args:
        server_config: Streamable HTTP server configuration
        oauth: Optional ServerSideOAuth instance for OAuth authentication
        agent_id: Optional agent ID to include in request headers
    """

    # Name of the request header that carries the agent ID (when one is set).
    AGENT_ID_HEADER = "X-Agent-Id"

    def __init__(
        self,
        server_config: StreamableHTTPServerConfig,
        oauth: Optional[ServerSideOAuth] = None,
        agent_id: Optional[str] = None,
    ):
        self.server_config = server_config
        self.oauth = oauth
        self.agent_id = agent_id
        # Populated by connect_to_server(); None until then.
        self.client: Optional[Client] = None
        # Flipped to True only after a successful connect, back to False in cleanup().
        self.initialized = False
        # Not used by any method of this class; the attribute is kept for callers that may read it.
        self.exit_stack = AsyncExitStack()

    async def connect_to_server(self):
        """Establish connection to the MCP server.

        Request headers are assembled in this order, later entries overriding
        earlier ones: custom headers, the auth header/token pair, the agent ID
        header.

        Raises:
            LettaMCPConnectionError: If connecting fails for any reason. The
                underlying exception is chained as the cause.
        """
        try:
            headers = {}
            if self.server_config.custom_headers:
                headers.update(self.server_config.custom_headers)
            if self.server_config.auth_header and self.server_config.auth_token:
                headers[self.server_config.auth_header] = self.server_config.auth_token
            if self.agent_id:
                headers[self.AGENT_ID_HEADER] = self.agent_id

            transport = StreamableHttpTransport(
                url=self.server_config.server_url,
                headers=headers if headers else None,
                auth=self.oauth,  # Pass ServerSideOAuth instance (or None)
            )
            self.client = Client(transport)
            # NOTE(review): relies on FastMCP's private `_connect()`; presumably so the
            # session stays open across calls until cleanup() — confirm on FastMCP upgrades.
            await self.client._connect()
            self.initialized = True
        except httpx.HTTPStatusError as e:
            # 401 gets a short, fixed message; every other status keeps the full error text.
            if e.response.status_code == 401:
                raise LettaMCPConnectionError(message="401 Unauthorized", server_name=self.server_config.server_name) from e
            raise LettaMCPConnectionError(
                message=f"HTTP error connecting to MCP server at {self.server_config.server_url}: {e}",
                server_name=self.server_config.server_name,
            ) from e
        except LettaMCPConnectionError:
            # Already in the target shape; do not wrap it a second time.
            raise
        except ConnectionError as e:
            raise LettaMCPConnectionError(message=str(e), server_name=self.server_config.server_name) from e
        except Exception as e:
            # Credential-bearing fields are left out of the dump so secrets never reach the logs.
            safe_config = self.server_config.model_dump_json(indent=4, exclude={"auth_token", "custom_headers"})
            logger.warning(
                f"Connecting to MCP server failed. Please review your server config: {safe_config}. Error: {str(e)}"
            )
            raise LettaMCPConnectionError(
                message=f"Failed to connect to MCP server at '{self.server_config.server_url}'. "
                f"Please check your configuration and ensure the server is accessible. Error: {str(e)}",
                server_name=self.server_config.server_name,
            ) from e

    async def list_tools(self, serialize: bool = False) -> List[MCPTool]:
        """List available tools from the MCP server.

        Args:
            serialize: If True, return tools as dictionaries instead of MCPTool objects

        Returns:
            List of tools available on the server. With ``serialize=True`` each
            element is the result of ``model_dump()``, ``dict()`` or ``__dict__``
            (first one available), falling back to ``str(tool)``.

        Raises:
            RuntimeError: If client has not been initialized
        """
        self._check_initialized()
        tools = await self.client.list_tools()
        if serialize:
            serializable_tools = []
            for tool in tools:
                if hasattr(tool, "model_dump"):
                    serializable_tools.append(tool.model_dump())
                elif hasattr(tool, "dict"):
                    serializable_tools.append(tool.dict())
                elif hasattr(tool, "__dict__"):
                    serializable_tools.append(tool.__dict__)
                else:
                    serializable_tools.append(str(tool))
            return serializable_tools
        return tools

    async def execute_tool(self, tool_name: str, tool_args: dict) -> Tuple[str, bool]:
        """Execute a tool on the MCP server.

        Any exception raised by the tool call itself is logged and returned as
        ``(error_message, False)`` rather than propagated.

        Args:
            tool_name: Name of the tool to execute
            tool_args: Arguments to pass to the tool

        Returns:
            Tuple of (result_content, success_flag)

        Raises:
            RuntimeError: If client has not been initialized
        """
        self._check_initialized()
        try:
            result = await self.client.call_tool(tool_name, tool_args)
        except Exception as e:
            # An exception exposing exactly one entry in `.exceptions` (e.g. an
            # ExceptionGroup) is unwrapped so the real cause is logged and returned.
            exception_to_check = e
            if hasattr(e, "exceptions") and e.exceptions and len(e.exceptions) == 1:
                exception_to_check = e.exceptions[0]
            _log_mcp_tool_error(logger, tool_name, exception_to_check)
            # Some exceptions (e.g. a bare TimeoutError()) stringify to "", which would
            # hand the caller an empty error message; fall back to the class name.
            error_message = str(exception_to_check) or type(exception_to_check).__name__
            return error_message, False

        # Parse content from result
        parsed_content = []
        for content_piece in result.content:
            if hasattr(content_piece, "text"):
                parsed_content.append(content_piece.text)
                logger.debug(f"MCP tool result parsed content (text): {parsed_content}")
            else:
                parsed_content.append(str(content_piece))
                logger.debug(f"MCP tool result parsed content (other): {parsed_content}")

        if parsed_content:
            final_content = " ".join(parsed_content)
        else:
            final_content = "Empty response from tool"
        return final_content, not result.is_error

    def _check_initialized(self):
        """Check if the client has been initialized.

        Raises:
            RuntimeError: If connect_to_server() has not completed successfully.
        """
        if not self.initialized:
            logger.error("MCPClient has not been initialized")
            raise RuntimeError("MCPClient has not been initialized")

    async def cleanup(self):
        """Clean up client resources.

        Errors raised while closing the client are logged and suppressed; the
        client is marked as uninitialized either way.
        """
        if self.client:
            try:
                await self.client.close()
            except Exception as e:
                logger.warning(f"Error during FastMCP client cleanup: {e}")
        self.initialized = False