def to_pydantic(self, strip_directory_prefix: bool = False) -> PydanticFileMetadata:
    """
    Build the Pydantic model synchronously, without any relationship loading.

    Args:
        strip_directory_prefix: When True and the file name contains a "/",
            drop everything up to and including the first "/".

    Returns:
        A PydanticFileMetadata populated from this row's attributes, with
        `content` always set to None.
    """
    display_name = self.file_name
    if strip_directory_prefix and "/" in display_name:
        # Only the leading path segment is removed; deeper segments are kept.
        display_name = display_name.split("/", 1)[1]

    # Attributes copied verbatim onto the Pydantic model, in read order.
    passthrough_fields = (
        "id",
        "organization_id",
        "source_id",
        "original_file_name",
        "file_path",
        "file_type",
        "file_size",
        "file_creation_date",
        "file_last_modified_date",
        "processing_status",
        "error_message",
        "total_chunks",
        "chunks_embedded",
        "created_at",
        "updated_at",
    )
    field_values = {name: getattr(self, name) for name in passthrough_fields}

    return PydanticFileMetadata(
        file_name=display_name,
        content=None,  # content is never populated by this synchronous conversion
        **field_values,
    )
b/letta/services/file_manager.py @@ -22,7 +22,7 @@ from letta.schemas.source_metadata import FileStats, OrganizationSourcesStats, S from letta.schemas.user import User as PydanticUser from letta.server.db import db_registry from letta.settings import settings -from letta.utils import enforce_types +from letta.utils import bounded_gather, enforce_types from letta.validators import raise_on_invalid_id logger = get_logger(__name__) @@ -85,7 +85,7 @@ class FileManager: # invalidate cache for this new file await self._invalidate_file_caches(file_orm.id, actor, file_orm.original_file_name, file_orm.source_id) - return await file_orm.to_pydantic_async() + return file_orm.to_pydantic() except IntegrityError: await session.rollback() @@ -284,7 +284,7 @@ class FileManager: identifier=file_id, actor=actor, ) - return await file_orm.to_pydantic_async() + return file_orm.to_pydantic() @enforce_types @trace_method @@ -451,9 +451,15 @@ class FileManager: ) # convert all files to pydantic models - file_metadatas = await asyncio.gather( - *[file.to_pydantic_async(include_content=include_content, strip_directory_prefix=strip_directory_prefix) for file in files] - ) + if include_content: + file_metadatas = await bounded_gather( + [ + file.to_pydantic_async(include_content=include_content, strip_directory_prefix=strip_directory_prefix) + for file in files + ] + ) + else: + file_metadatas = [file.to_pydantic(strip_directory_prefix=strip_directory_prefix) for file in files] # if status checking is enabled, check all files sequentially to avoid db pool exhaustion # Each status check may update the file in the database, so concurrent checks with many @@ -479,7 +485,7 @@ class FileManager: await self._invalidate_file_caches(file_id, actor, file.original_file_name, file.source_id) await file.hard_delete_async(db_session=session, actor=actor) - return await file.to_pydantic_async() + return file.to_pydantic() @enforce_types @trace_method @@ -561,7 +567,7 @@ class FileManager: file_orm = 
async def bounded_gather(coros: list[Coroutine], max_concurrency: int = 10) -> list:
    """
    Execute coroutines with bounded concurrency to prevent event loop saturation.

    Unlike asyncio.gather() which runs all coroutines concurrently, this limits
    the number of coroutines being awaited at any moment to `max_concurrency`.

    Note: This is a stopgap measure. Prefer fixing the root cause by:
    - Limiting items fetched from DB (e.g., pagination)
    - Using explicit relationship loading instead of eager-loading all
    - Adding concurrency limits at the API/business logic layer

    Args:
        coros: List of coroutines to execute
        max_concurrency: Maximum number of concurrent tasks (default: 10).
            Must be at least 1.

    Returns:
        List of results in the same order as input coroutines

    Raises:
        ValueError: If `coros` is non-empty and `max_concurrency` is less than 1.
            The supplied coroutines are closed without being run.
        Exception: The first exception raised by any coroutine is propagated,
            following asyncio.gather()'s default behaviour (the remaining
            coroutines are not cancelled).
    """
    if not coros:
        return []

    if max_concurrency < 1:
        # A zero-valued semaphore would block every wrapper forever, so fail
        # fast instead. Close the never-started coroutines so they do not emit
        # "coroutine was never awaited" warnings when garbage-collected.
        for coro in coros:
            coro.close()
        raise ValueError(f"max_concurrency must be >= 1, got {max_concurrency}")

    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_bounded(coro: Coroutine):
        # Each wrapper waits for a free slot before awaiting its coroutine.
        async with semaphore:
            return await coro

    # asyncio.gather() already returns results in the order of its arguments,
    # so no index bookkeeping or re-sorting is required.
    return list(await asyncio.gather(*(run_bounded(coro) for coro in coros)))