From 15cede728121d12bcd8a47cea21dd0f4ba283da0 Mon Sep 17 00:00:00 2001 From: Kian Jones <11655409+kianjones9@users.noreply.github.com> Date: Wed, 10 Dec 2025 17:16:53 -0800 Subject: [PATCH] fix: prevent db connection pool exhaustion in multi-agent tool executor (#6619) Problem: When executing a tool that sends messages to many agents matching tags, the code used asyncio.gather to process all agents concurrently. Each agent processing creates database operations (run creation, message storage), leading to N concurrent database connections. Example: If 100 agents match the tags, 100 simultaneous database connections are created, exhausting the connection pool and causing errors. Root cause: asyncio.gather(*[_process_agent(...) for agent in agents]) creates all coroutines and runs them concurrently, each opening a DB session. Solution: Process agents sequentially instead of concurrently. While this is slower, it prevents database connection pool exhaustion. The operation is still async, so it won't block the event loop. Changes: - apps/core/letta/services/tool_executor/multi_agent_tool_executor.py: - Replaced asyncio.gather with sequential for loop - Added explanatory comment about why sequential processing is needed Impact: With 100 matching agents: - Before: 100 concurrent DB connections (pool exhaustion) - After: 1 DB connection at a time (no pool exhaustion) Note: This follows the same pattern as PR #6617 which fixed a similar issue in file attachment operations. --- .../tool_executor/multi_agent_tool_executor.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/letta/services/tool_executor/multi_agent_tool_executor.py b/letta/services/tool_executor/multi_agent_tool_executor.py index 901e2022..8ecfc569 100644 --- a/letta/services/tool_executor/multi_agent_tool_executor.py +++ b/letta/services/tool_executor/multi_agent_tool_executor.py @@ -80,10 +80,13 @@ class LettaMultiAgentToolExecutor(ToolExecutor): f"{message}" ) - # Run concurrent requests and collect their return values. - # Note: Do not wrap with safe_create_task here — it swallows return values (returns None). - coros = [self._process_agent(agent_state=a_state, message=augmented_message, actor=actor) for a_state in matching_agents] - results = await asyncio.gather(*coros) + # Process agents sequentially to avoid exhausting the database connection pool. + # When many agents match the tags, concurrent execution can create too many simultaneous + # database connections, causing pool exhaustion errors. + results = [] + for agent_state in matching_agents: + result = await self._process_agent(agent_state=agent_state, message=augmented_message, actor=actor) + results.append(result) return str(results) async def _process_agent(self, agent_state: AgentState, message: str, actor: User) -> Dict[str, Any]: