fix: unbounded gather for agents listing (#8297)

* fix: unbounded gather for agents listing

* more callsites
This commit is contained in:
cthomas
2026-01-05 16:13:01 -08:00
committed by Caren Thomas
parent 3aaab90b4c
commit 1b25154c39
5 changed files with 13 additions and 15 deletions

View File

@@ -111,7 +111,7 @@ from letta.services.passage_manager import PassageManager
from letta.services.source_manager import SourceManager
from letta.services.tool_manager import ToolManager
from letta.settings import DatabaseChoice, model_settings, settings
from letta.utils import calculate_file_defaults_based_on_context_window, enforce_types, united_diff
from letta.utils import bounded_gather, calculate_file_defaults_based_on_context_window, enforce_types, united_diff
from letta.validators import raise_on_invalid_id
logger = get_logger(__name__)
@@ -970,8 +970,8 @@ class AgentManager:
query = query.limit(limit)
result = await session.execute(query)
agents = result.scalars().all()
return await asyncio.gather(
*[agent.to_pydantic_async(include_relationships=include_relationships, include=include) for agent in agents]
return await bounded_gather(
[agent.to_pydantic_async(include_relationships=include_relationships, include=include) for agent in agents]
)
@trace_method
@@ -1068,7 +1068,7 @@ class AgentManager:
query = query.distinct(AgentModel.id).order_by(AgentModel.id).limit(limit)
result = await session.execute(query)
return await asyncio.gather(*[agent.to_pydantic_async() for agent in result.scalars()])
return await bounded_gather([agent.to_pydantic_async() for agent in result.scalars()])
@trace_method
async def size_async(
@@ -1137,7 +1137,7 @@ class AgentManager:
logger.warning(f"No agents found with IDs: {agent_ids}")
return []
return await asyncio.gather(*[agent.to_pydantic_async(include_relationships=include_relationships) for agent in agents])
return await bounded_gather([agent.to_pydantic_async(include_relationships=include_relationships) for agent in agents])
except Exception as e:
logger.error(f"Error fetching agents with IDs {agent_ids}: {str(e)}")
raise

View File

@@ -18,7 +18,7 @@ from letta.schemas.user import User as PydanticUser
from letta.server.db import db_registry
from letta.services.helpers.agent_manager_helper import validate_agent_exists_async
from letta.settings import DatabaseChoice, settings
from letta.utils import enforce_types
from letta.utils import bounded_gather, enforce_types
from letta.validators import raise_on_invalid_id
logger = get_logger(__name__)
@@ -554,7 +554,7 @@ class ArchiveManager:
result = await session.execute(query)
agents_orm = result.scalars().all()
agents = await asyncio.gather(*[agent.to_pydantic_async(include_relationships=[], include=include) for agent in agents_orm])
agents = await bounded_gather([agent.to_pydantic_async(include_relationships=[], include=include) for agent in agents_orm])
return agents
@enforce_types

View File

@@ -19,7 +19,7 @@ from letta.schemas.enums import ActorType, PrimitiveType
from letta.schemas.user import User as PydanticUser
from letta.server.db import db_registry
from letta.settings import DatabaseChoice, settings
from letta.utils import enforce_types
from letta.utils import bounded_gather, enforce_types
from letta.validators import raise_on_invalid_id
logger = get_logger(__name__)
@@ -505,9 +505,7 @@ class BlockManager:
result = await session.execute(query)
agents_orm = result.scalars().all()
agents = await asyncio.gather(
*[agent.to_pydantic_async(include_relationships=include_relationships, include=include) for agent in agents_orm]
)
agents = await bounded_gather([agent.to_pydantic_async(include_relationships=[], include=include) for agent in agents_orm])
return agents
@enforce_types

View File

@@ -24,7 +24,7 @@ from letta.schemas.identity import (
from letta.schemas.user import User as PydanticUser
from letta.server.db import db_registry
from letta.settings import DatabaseChoice, settings
from letta.utils import enforce_types
from letta.utils import bounded_gather, enforce_types
from letta.validators import raise_on_invalid_id
@@ -336,7 +336,7 @@ class IdentityManager:
ascending=ascending,
identity_id=identity.id,
)
return await asyncio.gather(*[agent.to_pydantic_async(include_relationships=[], include=include) for agent in agents])
return await bounded_gather([agent.to_pydantic_async(include_relationships=[], include=include) for agent in agents])
@enforce_types
@raise_on_invalid_id(param_name="identity_id", expected_prefix=PrimitiveType.IDENTITY)

View File

@@ -15,7 +15,7 @@ from letta.schemas.enums import PrimitiveType, VectorDBProvider
from letta.schemas.source import Source as PydanticSource, SourceUpdate
from letta.schemas.user import User as PydanticUser
from letta.server.db import db_registry
from letta.utils import enforce_types, printd
from letta.utils import bounded_gather, enforce_types, printd
from letta.validators import raise_on_invalid_id
@@ -326,7 +326,7 @@ class SourceManager:
result = await session.execute(query)
agents_orm = result.scalars().all()
return await asyncio.gather(*[agent.to_pydantic_async(include=[]) for agent in agents_orm])
return await bounded_gather([agent.to_pydantic_async(include_relationships=[], include=[]) for agent in agents_orm])
@enforce_types
@raise_on_invalid_id(param_name="source_id", expected_prefix=PrimitiveType.SOURCE)