fix: prevent db connection pool exhaustion in multi-agent tool executor (#6619)
Problem: When executing a tool that sends messages to many agents matching tags, the code used asyncio.gather to process all agents concurrently. Each agent processing creates database operations (run creation, message storage), leading to N concurrent database connections. Example: If 100 agents match the tags, 100 simultaneous database connections are created, exhausting the connection pool and causing errors. Root cause: asyncio.gather(*[_process_agent(...) for agent in agents]) creates all coroutines and runs them concurrently, each opening a DB session. Solution: Process agents sequentially instead of concurrently. While this is slower, it prevents database connection pool exhaustion. The operation is still async, so it won't block the event loop. Changes: - apps/core/letta/services/tool_executor/multi_agent_tool_executor.py: - Replaced asyncio.gather with sequential for loop - Added explanatory comment about why sequential processing is needed Impact: With 100 matching agents: - Before: 100 concurrent DB connections (pool exhaustion) - After: 1 DB connection at a time (no pool exhaustion) Note: This follows the same pattern as PR #6617 which fixed a similar issue in file attachment operations.
This commit is contained in:
@@ -80,10 +80,13 @@ class LettaMultiAgentToolExecutor(ToolExecutor):
|
||||
f"{message}"
|
||||
)
|
||||
|
||||
# Run concurrent requests and collect their return values.
|
||||
# Note: Do not wrap with safe_create_task here — it swallows return values (returns None).
|
||||
coros = [self._process_agent(agent_state=a_state, message=augmented_message, actor=actor) for a_state in matching_agents]
|
||||
results = await asyncio.gather(*coros)
|
||||
# Process agents sequentially to avoid exhausting the database connection pool.
|
||||
# When many agents match the tags, concurrent execution can create too many simultaneous
|
||||
# database connections, causing pool exhaustion errors.
|
||||
results = []
|
||||
for agent_state in matching_agents:
|
||||
result = await self._process_agent(agent_state=agent_state, message=augmented_message, actor=actor)
|
||||
results.append(result)
|
||||
return str(results)
|
||||
|
||||
async def _process_agent(self, agent_state: AgentState, message: str, actor: User) -> Dict[str, Any]:
|
||||
|
||||
Reference in New Issue
Block a user