From 1b25154c399e0c1a9715a014e762fb352c1281d2 Mon Sep 17 00:00:00 2001 From: cthomas Date: Mon, 5 Jan 2026 16:13:01 -0800 Subject: [PATCH] fix: unbounded gather for agents listing (#8297) * fix: unbounded gather for agents listing * more callsites --- letta/services/agent_manager.py | 10 +++++----- letta/services/archive_manager.py | 4 ++-- letta/services/block_manager.py | 6 ++---- letta/services/identity_manager.py | 4 ++-- letta/services/source_manager.py | 4 ++-- 5 files changed, 13 insertions(+), 15 deletions(-) diff --git a/letta/services/agent_manager.py b/letta/services/agent_manager.py index 4bbf5139..d2261327 100644 --- a/letta/services/agent_manager.py +++ b/letta/services/agent_manager.py @@ -111,7 +111,7 @@ from letta.services.passage_manager import PassageManager from letta.services.source_manager import SourceManager from letta.services.tool_manager import ToolManager from letta.settings import DatabaseChoice, model_settings, settings -from letta.utils import calculate_file_defaults_based_on_context_window, enforce_types, united_diff +from letta.utils import bounded_gather, calculate_file_defaults_based_on_context_window, enforce_types, united_diff from letta.validators import raise_on_invalid_id logger = get_logger(__name__) @@ -970,8 +970,8 @@ class AgentManager: query = query.limit(limit) result = await session.execute(query) agents = result.scalars().all() - return await asyncio.gather( - *[agent.to_pydantic_async(include_relationships=include_relationships, include=include) for agent in agents] + return await bounded_gather( + [agent.to_pydantic_async(include_relationships=include_relationships, include=include) for agent in agents] ) @trace_method @@ -1068,7 +1068,7 @@ class AgentManager: query = query.distinct(AgentModel.id).order_by(AgentModel.id).limit(limit) result = await session.execute(query) - return await asyncio.gather(*[agent.to_pydantic_async() for agent in result.scalars()]) + return await bounded_gather([agent.to_pydantic_async() for agent in result.scalars()]) @trace_method async def size_async( @@ -1137,7 +1137,7 @@ class AgentManager: logger.warning(f"No agents found with IDs: {agent_ids}") return [] - return await asyncio.gather(*[agent.to_pydantic_async(include_relationships=include_relationships) for agent in agents]) + return await bounded_gather([agent.to_pydantic_async(include_relationships=include_relationships) for agent in agents]) except Exception as e: logger.error(f"Error fetching agents with IDs {agent_ids}: {str(e)}") raise diff --git a/letta/services/archive_manager.py b/letta/services/archive_manager.py index 83903b93..52b5843c 100644 --- a/letta/services/archive_manager.py +++ b/letta/services/archive_manager.py @@ -18,7 +18,7 @@ from letta.schemas.user import User as PydanticUser from letta.server.db import db_registry from letta.services.helpers.agent_manager_helper import validate_agent_exists_async from letta.settings import DatabaseChoice, settings -from letta.utils import enforce_types +from letta.utils import bounded_gather, enforce_types from letta.validators import raise_on_invalid_id logger = get_logger(__name__) @@ -554,7 +554,7 @@ class ArchiveManager: result = await session.execute(query) agents_orm = result.scalars().all() - agents = await asyncio.gather(*[agent.to_pydantic_async(include_relationships=[], include=include) for agent in agents_orm]) + agents = await bounded_gather([agent.to_pydantic_async(include_relationships=[], include=include) for agent in agents_orm]) return agents @enforce_types diff --git a/letta/services/block_manager.py b/letta/services/block_manager.py index 684fb163..ebfc03eb 100644 --- a/letta/services/block_manager.py +++ b/letta/services/block_manager.py @@ -19,7 +19,7 @@ from letta.schemas.enums import ActorType, PrimitiveType from letta.schemas.user import User as PydanticUser from letta.server.db import db_registry from letta.settings import DatabaseChoice, settings -from letta.utils import enforce_types +from letta.utils import bounded_gather, enforce_types from letta.validators import raise_on_invalid_id logger = get_logger(__name__) @@ -505,9 +505,7 @@ class BlockManager: result = await session.execute(query) agents_orm = result.scalars().all() - agents = await asyncio.gather( - *[agent.to_pydantic_async(include_relationships=include_relationships, include=include) for agent in agents_orm] - ) + agents = await bounded_gather([agent.to_pydantic_async(include_relationships=[], include=include) for agent in agents_orm]) return agents @enforce_types diff --git a/letta/services/identity_manager.py b/letta/services/identity_manager.py index 40545e9f..6fa9e5bc 100644 --- a/letta/services/identity_manager.py +++ b/letta/services/identity_manager.py @@ -24,7 +24,7 @@ from letta.schemas.identity import ( from letta.schemas.user import User as PydanticUser from letta.server.db import db_registry from letta.settings import DatabaseChoice, settings -from letta.utils import enforce_types +from letta.utils import bounded_gather, enforce_types from letta.validators import raise_on_invalid_id @@ -336,7 +336,7 @@ class IdentityManager: ascending=ascending, identity_id=identity.id, ) - return await asyncio.gather(*[agent.to_pydantic_async(include_relationships=[], include=include) for agent in agents]) + return await bounded_gather([agent.to_pydantic_async(include_relationships=[], include=include) for agent in agents]) @enforce_types @raise_on_invalid_id(param_name="identity_id", expected_prefix=PrimitiveType.IDENTITY) diff --git a/letta/services/source_manager.py b/letta/services/source_manager.py index a2673cc9..123bfc3e 100644 --- a/letta/services/source_manager.py +++ b/letta/services/source_manager.py @@ -15,7 +15,7 @@ from letta.schemas.enums import PrimitiveType, VectorDBProvider from letta.schemas.source import Source as PydanticSource, SourceUpdate from letta.schemas.user import User as PydanticUser from letta.server.db import db_registry -from letta.utils import enforce_types, printd +from letta.utils import bounded_gather, enforce_types, printd from letta.validators import raise_on_invalid_id @@ -326,7 +326,7 @@ class SourceManager: result = await session.execute(query) agents_orm = result.scalars().all() - return await asyncio.gather(*[agent.to_pydantic_async(include=[]) for agent in agents_orm]) + return await bounded_gather([agent.to_pydantic_async(include_relationships=[], include=[]) for agent in agents_orm]) @enforce_types @raise_on_invalid_id(param_name="source_id", expected_prefix=PrimitiveType.SOURCE)