* fix(core): catch all MCP tool execution errors instead of re-raising MCP tools are external user-configured servers - any failure during tool execution is expected and should be returned as (error_msg, False) to the agent, not raised as an exception that hits Datadog as a 500. Previously: - base_client.py only caught McpError/ToolError, re-raised everything else - fastmcp_client.py (both SSE and StreamableHTTP) always re-raised Now all three execute_tool() methods catch all exceptions and return the error message to the agent conversation. The agent handles tool failures via the error message naturally. This silences ~15 Datadog issue types including: - fastmcp.exceptions.ToolError (validation, permissions) - mcp.shared.exceptions.McpError (connection closed, credentials) - httpx.HTTPStatusError (503 from Zapier, etc.) - httpx.ConnectError, ReadTimeout, RemoteProtocolError - requests.exceptions.ConnectionError - builtins.ConnectionError 🐾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta <noreply@letta.com> * fix(core): log unexpected MCP errors at warning level with traceback Expected MCP errors (ToolError, McpError, httpx.*, ConnectionError, etc.) log at info level. Anything else (e.g. TypeError, AttributeError from our own code) logs at warning with exc_info=True so it still surfaces in Datadog without crashing the request. 🐾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta <noreply@letta.com> --------- Co-authored-by: Letta <noreply@letta.com>
340 lines
13 KiB
Python
340 lines
13 KiB
Python
"""FastMCP-based MCP clients with server-side OAuth support.
|
|
|
|
This module provides MCP client implementations using the FastMCP library,
|
|
with support for server-side OAuth flows where authorization URLs are
|
|
forwarded to web clients instead of opening a browser.
|
|
|
|
These clients replace the existing AsyncSSEMCPClient and AsyncStreamableHTTPMCPClient
|
|
implementations that used the lower-level MCP SDK directly.
|
|
"""
|
|
|
|
from contextlib import AsyncExitStack
|
|
from typing import List, Optional, Tuple
|
|
|
|
import httpx
|
|
from fastmcp import Client
|
|
from fastmcp.client.transports import SSETransport, StreamableHttpTransport
|
|
from mcp import Tool as MCPTool
|
|
|
|
from letta.errors import LettaMCPConnectionError
|
|
from letta.functions.mcp_client.types import SSEServerConfig, StreamableHTTPServerConfig
|
|
from letta.log import get_logger
|
|
from letta.services.mcp.base_client import _log_mcp_tool_error
|
|
from letta.services.mcp.server_side_oauth import ServerSideOAuth
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
|
|
class AsyncFastMCPSSEClient:
|
|
"""SSE MCP client using FastMCP with server-side OAuth support.
|
|
|
|
This client connects to MCP servers using Server-Sent Events (SSE) transport.
|
|
It supports both authenticated and unauthenticated connections, with OAuth
|
|
handled via the ServerSideOAuth class for server-side flows.
|
|
|
|
Args:
|
|
server_config: SSE server configuration including URL, headers, and auth settings
|
|
oauth: Optional ServerSideOAuth instance for OAuth authentication
|
|
agent_id: Optional agent ID to include in request headers
|
|
"""
|
|
|
|
AGENT_ID_HEADER = "X-Agent-Id"
|
|
|
|
def __init__(
|
|
self,
|
|
server_config: SSEServerConfig,
|
|
oauth: Optional[ServerSideOAuth] = None,
|
|
agent_id: Optional[str] = None,
|
|
):
|
|
self.server_config = server_config
|
|
self.oauth = oauth
|
|
self.agent_id = agent_id
|
|
self.client: Optional[Client] = None
|
|
self.initialized = False
|
|
self.exit_stack = AsyncExitStack()
|
|
|
|
async def connect_to_server(self):
|
|
"""Establish connection to the MCP server.
|
|
|
|
Raises:
|
|
ConnectionError: If connection to the server fails
|
|
"""
|
|
try:
|
|
headers = {}
|
|
if self.server_config.custom_headers:
|
|
headers.update(self.server_config.custom_headers)
|
|
if self.server_config.auth_header and self.server_config.auth_token:
|
|
headers[self.server_config.auth_header] = self.server_config.auth_token
|
|
if self.agent_id:
|
|
headers[self.AGENT_ID_HEADER] = self.agent_id
|
|
|
|
transport = SSETransport(
|
|
url=self.server_config.server_url,
|
|
headers=headers if headers else None,
|
|
auth=self.oauth, # Pass ServerSideOAuth instance (or None)
|
|
)
|
|
|
|
self.client = Client(transport)
|
|
await self.client._connect()
|
|
self.initialized = True
|
|
except httpx.HTTPStatusError as e:
|
|
if e.response.status_code == 401:
|
|
raise LettaMCPConnectionError(message="401 Unauthorized", server_name=self.server_config.server_name) from e
|
|
raise LettaMCPConnectionError(
|
|
message=f"HTTP error connecting to MCP server at {self.server_config.server_url}: {e}",
|
|
server_name=self.server_config.server_name,
|
|
) from e
|
|
except LettaMCPConnectionError:
|
|
raise
|
|
except ConnectionError as e:
|
|
raise LettaMCPConnectionError(message=str(e), server_name=self.server_config.server_name) from e
|
|
except Exception as e:
|
|
logger.warning(
|
|
f"Connecting to MCP server failed. Please review your server config: {self.server_config.model_dump_json(indent=4)}. Error: {str(e)}"
|
|
)
|
|
raise LettaMCPConnectionError(
|
|
message=f"Failed to connect to MCP server at '{self.server_config.server_url}'. "
|
|
f"Please check your configuration and ensure the server is accessible. Error: {str(e)}",
|
|
server_name=self.server_config.server_name,
|
|
) from e
|
|
|
|
async def list_tools(self, serialize: bool = False) -> List[MCPTool]:
|
|
"""List available tools from the MCP server.
|
|
|
|
Args:
|
|
serialize: If True, return tools as dictionaries instead of MCPTool objects
|
|
|
|
Returns:
|
|
List of tools available on the server
|
|
|
|
Raises:
|
|
RuntimeError: If client has not been initialized
|
|
"""
|
|
self._check_initialized()
|
|
tools = await self.client.list_tools()
|
|
if serialize:
|
|
serializable_tools = []
|
|
for tool in tools:
|
|
if hasattr(tool, "model_dump"):
|
|
serializable_tools.append(tool.model_dump())
|
|
elif hasattr(tool, "dict"):
|
|
serializable_tools.append(tool.dict())
|
|
elif hasattr(tool, "__dict__"):
|
|
serializable_tools.append(tool.__dict__)
|
|
else:
|
|
serializable_tools.append(str(tool))
|
|
return serializable_tools
|
|
return tools
|
|
|
|
async def execute_tool(self, tool_name: str, tool_args: dict) -> Tuple[str, bool]:
|
|
"""Execute a tool on the MCP server.
|
|
|
|
Args:
|
|
tool_name: Name of the tool to execute
|
|
tool_args: Arguments to pass to the tool
|
|
|
|
Returns:
|
|
Tuple of (result_content, success_flag)
|
|
|
|
Raises:
|
|
RuntimeError: If client has not been initialized
|
|
"""
|
|
self._check_initialized()
|
|
try:
|
|
result = await self.client.call_tool(tool_name, tool_args)
|
|
except Exception as e:
|
|
exception_to_check = e
|
|
if hasattr(e, "exceptions") and e.exceptions and len(e.exceptions) == 1:
|
|
exception_to_check = e.exceptions[0]
|
|
_log_mcp_tool_error(logger, tool_name, exception_to_check)
|
|
return str(exception_to_check), False
|
|
|
|
# Parse content from result
|
|
parsed_content = []
|
|
for content_piece in result.content:
|
|
if hasattr(content_piece, "text"):
|
|
parsed_content.append(content_piece.text)
|
|
logger.debug(f"MCP tool result parsed content (text): {parsed_content}")
|
|
else:
|
|
parsed_content.append(str(content_piece))
|
|
logger.debug(f"MCP tool result parsed content (other): {parsed_content}")
|
|
|
|
if parsed_content:
|
|
final_content = " ".join(parsed_content)
|
|
else:
|
|
final_content = "Empty response from tool"
|
|
|
|
return final_content, not result.is_error
|
|
|
|
def _check_initialized(self):
|
|
"""Check if the client has been initialized."""
|
|
if not self.initialized:
|
|
logger.error("MCPClient has not been initialized")
|
|
raise RuntimeError("MCPClient has not been initialized")
|
|
|
|
async def cleanup(self):
|
|
"""Clean up client resources."""
|
|
if self.client:
|
|
try:
|
|
await self.client.close()
|
|
except Exception as e:
|
|
logger.warning(f"Error during FastMCP client cleanup: {e}")
|
|
self.initialized = False
|
|
|
|
|
|
class AsyncFastMCPStreamableHTTPClient:
|
|
"""Streamable HTTP MCP client using FastMCP with server-side OAuth support.
|
|
|
|
This client connects to MCP servers using Streamable HTTP transport.
|
|
It supports both authenticated and unauthenticated connections, with OAuth
|
|
handled via the ServerSideOAuth class for server-side flows.
|
|
|
|
Args:
|
|
server_config: Streamable HTTP server configuration
|
|
oauth: Optional ServerSideOAuth instance for OAuth authentication
|
|
agent_id: Optional agent ID to include in request headers
|
|
"""
|
|
|
|
AGENT_ID_HEADER = "X-Agent-Id"
|
|
|
|
def __init__(
|
|
self,
|
|
server_config: StreamableHTTPServerConfig,
|
|
oauth: Optional[ServerSideOAuth] = None,
|
|
agent_id: Optional[str] = None,
|
|
):
|
|
self.server_config = server_config
|
|
self.oauth = oauth
|
|
self.agent_id = agent_id
|
|
self.client: Optional[Client] = None
|
|
self.initialized = False
|
|
self.exit_stack = AsyncExitStack()
|
|
|
|
async def connect_to_server(self):
|
|
"""Establish connection to the MCP server.
|
|
|
|
Raises:
|
|
ConnectionError: If connection to the server fails
|
|
"""
|
|
try:
|
|
headers = {}
|
|
if self.server_config.custom_headers:
|
|
headers.update(self.server_config.custom_headers)
|
|
if self.server_config.auth_header and self.server_config.auth_token:
|
|
headers[self.server_config.auth_header] = self.server_config.auth_token
|
|
if self.agent_id:
|
|
headers[self.AGENT_ID_HEADER] = self.agent_id
|
|
|
|
transport = StreamableHttpTransport(
|
|
url=self.server_config.server_url,
|
|
headers=headers if headers else None,
|
|
auth=self.oauth, # Pass ServerSideOAuth instance (or None)
|
|
)
|
|
|
|
self.client = Client(transport)
|
|
await self.client._connect()
|
|
self.initialized = True
|
|
except httpx.HTTPStatusError as e:
|
|
if e.response.status_code == 401:
|
|
raise LettaMCPConnectionError(message="401 Unauthorized", server_name=self.server_config.server_name) from e
|
|
raise LettaMCPConnectionError(
|
|
message=f"HTTP error connecting to MCP server at {self.server_config.server_url}: {e}",
|
|
server_name=self.server_config.server_name,
|
|
) from e
|
|
except LettaMCPConnectionError:
|
|
raise
|
|
except ConnectionError as e:
|
|
raise LettaMCPConnectionError(message=str(e), server_name=self.server_config.server_name) from e
|
|
except Exception as e:
|
|
logger.warning(
|
|
f"Connecting to MCP server failed. Please review your server config: {self.server_config.model_dump_json(indent=4)}. Error: {str(e)}"
|
|
)
|
|
raise LettaMCPConnectionError(
|
|
message=f"Failed to connect to MCP server at '{self.server_config.server_url}'. "
|
|
f"Please check your configuration and ensure the server is accessible. Error: {str(e)}",
|
|
server_name=self.server_config.server_name,
|
|
) from e
|
|
|
|
async def list_tools(self, serialize: bool = False) -> List[MCPTool]:
|
|
"""List available tools from the MCP server.
|
|
|
|
Args:
|
|
serialize: If True, return tools as dictionaries instead of MCPTool objects
|
|
|
|
Returns:
|
|
List of tools available on the server
|
|
|
|
Raises:
|
|
RuntimeError: If client has not been initialized
|
|
"""
|
|
self._check_initialized()
|
|
tools = await self.client.list_tools()
|
|
if serialize:
|
|
serializable_tools = []
|
|
for tool in tools:
|
|
if hasattr(tool, "model_dump"):
|
|
serializable_tools.append(tool.model_dump())
|
|
elif hasattr(tool, "dict"):
|
|
serializable_tools.append(tool.dict())
|
|
elif hasattr(tool, "__dict__"):
|
|
serializable_tools.append(tool.__dict__)
|
|
else:
|
|
serializable_tools.append(str(tool))
|
|
return serializable_tools
|
|
return tools
|
|
|
|
async def execute_tool(self, tool_name: str, tool_args: dict) -> Tuple[str, bool]:
|
|
"""Execute a tool on the MCP server.
|
|
|
|
Args:
|
|
tool_name: Name of the tool to execute
|
|
tool_args: Arguments to pass to the tool
|
|
|
|
Returns:
|
|
Tuple of (result_content, success_flag)
|
|
|
|
Raises:
|
|
RuntimeError: If client has not been initialized
|
|
"""
|
|
self._check_initialized()
|
|
try:
|
|
result = await self.client.call_tool(tool_name, tool_args)
|
|
except Exception as e:
|
|
exception_to_check = e
|
|
if hasattr(e, "exceptions") and e.exceptions and len(e.exceptions) == 1:
|
|
exception_to_check = e.exceptions[0]
|
|
_log_mcp_tool_error(logger, tool_name, exception_to_check)
|
|
return str(exception_to_check), False
|
|
|
|
# Parse content from result
|
|
parsed_content = []
|
|
for content_piece in result.content:
|
|
if hasattr(content_piece, "text"):
|
|
parsed_content.append(content_piece.text)
|
|
logger.debug(f"MCP tool result parsed content (text): {parsed_content}")
|
|
else:
|
|
parsed_content.append(str(content_piece))
|
|
logger.debug(f"MCP tool result parsed content (other): {parsed_content}")
|
|
|
|
if parsed_content:
|
|
final_content = " ".join(parsed_content)
|
|
else:
|
|
final_content = "Empty response from tool"
|
|
|
|
return final_content, not result.is_error
|
|
|
|
def _check_initialized(self):
|
|
"""Check if the client has been initialized."""
|
|
if not self.initialized:
|
|
logger.error("MCPClient has not been initialized")
|
|
raise RuntimeError("MCPClient has not been initialized")
|
|
|
|
async def cleanup(self):
|
|
"""Clean up client resources."""
|
|
if self.client:
|
|
try:
|
|
await self.client.close()
|
|
except Exception as e:
|
|
logger.warning(f"Error during FastMCP client cleanup: {e}")
|
|
self.initialized = False
|