fix: unbounded file to pydantic conversion (#8292)

* fix: unbounded file to pydantic conversion

* remove var name
This commit is contained in:
cthomas
2026-01-05 15:31:55 -08:00
committed by Caren Thomas
parent 6e0bf69ed8
commit 3b0b2cbee1
3 changed files with 88 additions and 10 deletions

View File

@@ -69,6 +69,34 @@ class FileMetadata(SqlalchemyBase, OrganizationMixin, SourceMixin, AsyncAttrs):
cascade="all, delete-orphan",
)
def to_pydantic(self, strip_directory_prefix: bool = False) -> PydanticFileMetadata:
    """
    Convert this ORM row to its Pydantic schema without loading any
    relationships, so it is safe to call synchronously.

    Args:
        strip_directory_prefix: When True, drop the leading directory
            component (everything up to and including the first "/")
            from the reported file name.

    Returns:
        A PydanticFileMetadata with `content` always set to None.
    """
    display_name = self.file_name
    if strip_directory_prefix:
        # partition("/") splits on the first separator only, which matches
        # dropping just the leading directory component.
        _, sep, remainder = display_name.partition("/")
        if sep:
            display_name = remainder
    return PydanticFileMetadata(
        id=self.id,
        organization_id=self.organization_id,
        source_id=self.source_id,
        file_name=display_name,
        original_file_name=self.original_file_name,
        file_path=self.file_path,
        file_type=self.file_type,
        file_size=self.file_size,
        file_creation_date=self.file_creation_date,
        file_last_modified_date=self.file_last_modified_date,
        processing_status=self.processing_status,
        error_message=self.error_message,
        total_chunks=self.total_chunks,
        chunks_embedded=self.chunks_embedded,
        created_at=self.created_at,
        updated_at=self.updated_at,
        content=None,
    )
async def to_pydantic_async(self, include_content: bool = False, strip_directory_prefix: bool = False) -> PydanticFileMetadata:
"""
Async version of `to_pydantic` that supports optional relationship loading

View File

@@ -22,7 +22,7 @@ from letta.schemas.source_metadata import FileStats, OrganizationSourcesStats, S
from letta.schemas.user import User as PydanticUser
from letta.server.db import db_registry
from letta.settings import settings
from letta.utils import enforce_types
from letta.utils import bounded_gather, enforce_types
from letta.validators import raise_on_invalid_id
logger = get_logger(__name__)
@@ -85,7 +85,7 @@ class FileManager:
# invalidate cache for this new file
await self._invalidate_file_caches(file_orm.id, actor, file_orm.original_file_name, file_orm.source_id)
return await file_orm.to_pydantic_async()
return file_orm.to_pydantic()
except IntegrityError:
await session.rollback()
@@ -284,7 +284,7 @@ class FileManager:
identifier=file_id,
actor=actor,
)
return await file_orm.to_pydantic_async()
return file_orm.to_pydantic()
@enforce_types
@trace_method
@@ -451,9 +451,15 @@ class FileManager:
)
# convert all files to pydantic models
file_metadatas = await asyncio.gather(
*[file.to_pydantic_async(include_content=include_content, strip_directory_prefix=strip_directory_prefix) for file in files]
)
if include_content:
file_metadatas = await bounded_gather(
[
file.to_pydantic_async(include_content=include_content, strip_directory_prefix=strip_directory_prefix)
for file in files
]
)
else:
file_metadatas = [file.to_pydantic(strip_directory_prefix=strip_directory_prefix) for file in files]
# if status checking is enabled, check all files sequentially to avoid db pool exhaustion
# Each status check may update the file in the database, so concurrent checks with many
@@ -479,7 +485,7 @@ class FileManager:
await self._invalidate_file_caches(file_id, actor, file.original_file_name, file.source_id)
await file.hard_delete_async(db_session=session, actor=actor)
return await file.to_pydantic_async()
return file.to_pydantic()
@enforce_types
@trace_method
@@ -561,7 +567,7 @@ class FileManager:
file_orm = result.scalar_one_or_none()
if file_orm:
return await file_orm.to_pydantic_async()
return file_orm.to_pydantic()
return None
@enforce_types
@@ -670,7 +676,10 @@ class FileManager:
result = await session.execute(query)
files_orm = result.scalars().all()
return await asyncio.gather(*[file.to_pydantic_async(include_content=include_content) for file in files_orm])
if include_content:
return await bounded_gather([file.to_pydantic_async(include_content=include_content) for file in files_orm])
else:
return [file.to_pydantic() for file in files_orm]
@enforce_types
@trace_method
@@ -715,4 +724,7 @@ class FileManager:
result = await session.execute(query)
files_orm = result.scalars().all()
return await asyncio.gather(*[file.to_pydantic_async(include_content=include_content) for file in files_orm])
if include_content:
return await bounded_gather([file.to_pydantic_async(include_content=include_content) for file in files_orm])
else:
return [file.to_pydantic() for file in files_orm]

View File

@@ -1418,3 +1418,41 @@ def is_1_0_sdk_version(headers: HeaderParams):
major_version = version_base.split(".")[0]
return major_version == "1"
async def bounded_gather(coros: list[Coroutine], max_concurrency: int = 10) -> list:
    """
    Execute coroutines with bounded concurrency to prevent event loop saturation.

    Unlike a bare asyncio.gather() which runs all coroutines concurrently, this
    limits the number of concurrently-running tasks to prevent overwhelming the
    event loop.

    Note: This is a stopgap measure. Prefer fixing the root cause by:
    - Limiting items fetched from DB (e.g., pagination)
    - Using explicit relationship loading instead of eager-loading all
    - Adding concurrency limits at the API/business logic layer

    Args:
        coros: List of coroutines to execute
        max_concurrency: Maximum number of concurrent tasks (default: 10)

    Returns:
        List of results in the same order as input coroutines
    """
    if not coros:
        return []

    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded_coro(coro: Coroutine):
        # The semaphore caps how many wrapped coroutines run at once.
        async with semaphore:
            return await coro

    # asyncio.gather() already returns results in the order the awaitables
    # were passed, regardless of completion order, so no index bookkeeping
    # or post-sort is needed to preserve ordering.
    return await asyncio.gather(*(bounded_coro(coro) for coro in coros))