fix: prevent db connection pool exhaustion in MCP server manager (#6622)
Problem: When creating an MCP server with many tools, the code used two
asyncio.gather calls - one for tool creation and one for mapping creation.
Each operation opens its own database session for an INSERT/UPDATE, leading
to up to 2N concurrent database connections. Example: an MCP server with 50
tools creates 50 + 50 = 100 simultaneous database connections (tools +
mappings), exhausting the pool.

Root cause:
1. asyncio.gather(*[create_mcp_tool_async(...) for tool in tools])
2. asyncio.gather(*[create_mcp_tool_mapping(...) for tool in results])
Both process operations concurrently, each opening a DB session.

Solution: Process tool creation and mapping sequentially in a single loop.
Create each tool, then immediately create its mapping if it succeeded. This:
- Reduces the peak connection count from 2N to 1
- Maintains proper error handling per tool
- Prevents database connection pool exhaustion

Changes:
- apps/core/letta/services/mcp_server_manager.py:
  - Replaced the two asyncio.gather calls with a single sequential loop
  - Create the mapping immediately after each successful tool creation
  - Maintained the return_exceptions=True behavior with try/except
  - Added an explanatory comment about db pool exhaustion prevention

Impact: With 50 MCP tools:
- Before: 100 concurrent DB connections (50 tools + 50 mappings, pool exhaustion)
- After: 1 DB connection at a time (no pool exhaustion)

Note: This follows the same pattern as PR #6617, #6619, #6620, and #6621,
which fixed similar issues throughout the codebase.
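The concurrency difference described above can be sketched with a small
self-contained script. `FakePool`, `register_parallel`, and
`register_sequential` below are hypothetical stand-ins (not the real
manager code) that track how many "connections" are open at once:

```python
import asyncio

class FakePool:
    """Stand-in for a DB pool: counts how many ops are in flight at once."""
    def __init__(self):
        self.active = 0
        self.peak = 0

    async def op(self):
        # Each op "holds a connection" across an await point, like real I/O.
        self.active += 1
        self.peak = max(self.peak, self.active)
        await asyncio.sleep(0)  # yield to the event loop
        self.active -= 1

async def register_parallel(pool, n):
    # Old pattern: launch every operation at once via asyncio.gather,
    # so all n ops hold a connection simultaneously.
    await asyncio.gather(*[pool.op() for _ in range(n)], return_exceptions=True)

async def register_sequential(pool, n):
    # New pattern: await each operation in a plain loop, so at most one
    # connection is held at a time; errors are handled per item.
    for _ in range(n):
        try:
            await pool.op()
        except Exception:
            pass

pool_a, pool_b = FakePool(), FakePool()
asyncio.run(register_parallel(pool_a, 50))
asyncio.run(register_sequential(pool_b, 50))
print(pool_a.peak, pool_b.peak)  # gather peaks at 50; the loop stays at 1
```

The same reasoning applies twice over in the original code, since tool
creation and mapping creation each ran their own gather.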
@@ -610,31 +610,32 @@ class MCPServerManager:
         # Filter out invalid tools
         valid_tools = [tool for tool in mcp_tools if not (tool.health and tool.health.status == "INVALID")]

-        # Register in parallel
+        # Register tools sequentially to avoid exhausting database connection pool
+        # When an MCP server has many tools (e.g., 50+), concurrent tool creation and mapping
+        # can create too many simultaneous database connections, causing pool exhaustion errors
         if valid_tools:
-            tool_tasks = []
+            results = []
+            successful_count = 0
             for mcp_tool in valid_tools:
                 tool_create = ToolCreate.from_mcp(mcp_server_name=created_server.server_name, mcp_tool=mcp_tool)
-                task = self.tool_manager.create_mcp_tool_async(
-                    tool_create=tool_create, mcp_server_name=created_server.server_name, mcp_server_id=created_server.id, actor=actor
-                )
-                tool_tasks.append(task)
-
-            results = await asyncio.gather(*tool_tasks, return_exceptions=True)
-
-            # Create mappings in MCPTools table for successful tools
-            mapping_tasks = []
-            successful_count = 0
-            for result in results:
-                if not isinstance(result, Exception) and result:
-                    # result should be a PydanticTool
-                    mapping_task = self.create_mcp_tool_mapping(created_server.id, result.id, actor)
-                    mapping_tasks.append(mapping_task)
-                    successful_count += 1
-
-            # Execute mapping creation in parallel
-            if mapping_tasks:
-                await asyncio.gather(*mapping_tasks, return_exceptions=True)
+                try:
+                    result = await self.tool_manager.create_mcp_tool_async(
+                        tool_create=tool_create,
+                        mcp_server_name=created_server.server_name,
+                        mcp_server_id=created_server.id,
+                        actor=actor,
+                    )
+                    results.append(result)
+
+                    # Create mapping for successful tool
+                    if result:
+                        try:
+                            await self.create_mcp_tool_mapping(created_server.id, result.id, actor)
+                            successful_count += 1
+                        except Exception as e:
+                            logger.warning(f"Failed to create mapping for tool {result.id}: {e}")
+                except Exception as e:
+                    results.append(e)

             failed = len(results) - successful_count
             logger.info(