From f25844fbe2b346192cf9bb205b33fe7110cc8f2a Mon Sep 17 00:00:00 2001
From: Matthew Zhou
Date: Mon, 25 Aug 2025 16:21:33 -0700
Subject: [PATCH] fix: Add check status update flag to list (#4177)

---
 letta/server/rest_api/routers/v1/sources.py |  54 ++--------
 letta/services/file_manager.py              | 113 ++++++++++++++++++--
 tests/helpers/utils.py                      |  40 +++++++
 tests/test_sources.py                       |  43 +++++++-
 4 files changed, 192 insertions(+), 58 deletions(-)

diff --git a/letta/server/rest_api/routers/v1/sources.py b/letta/server/rest_api/routers/v1/sources.py
index 90ac70ec..3bfbb9d2 100644
--- a/letta/server/rest_api/routers/v1/sources.py
+++ b/letta/server/rest_api/routers/v1/sources.py
@@ -2,7 +2,6 @@ import asyncio
 import mimetypes
 import os
 import tempfile
-from datetime import datetime, timedelta, timezone
 from pathlib import Path
 from typing import List, Optional
 
@@ -14,7 +13,6 @@ import letta.constants as constants
 from letta.helpers.pinecone_utils import (
     delete_file_records_from_pinecone_index,
     delete_source_records_from_pinecone_index,
-    list_pinecone_index_for_files,
     should_use_pinecone,
 )
 from letta.log import get_logger
@@ -366,6 +364,10 @@ async def list_source_files(
     limit: int = Query(1000, description="Number of files to return"),
     after: Optional[str] = Query(None, description="Pagination cursor to fetch the next set of results"),
     include_content: bool = Query(False, description="Whether to include full file content"),
+    check_status_updates: bool = Query(
+        True,
+        description="Whether to check and update file processing status from the vector db service. If False, the status is not fetched or updated, which may improve performance.",
+    ),
     server: "SyncServer" = Depends(get_letta_server),
     actor_id: Optional[str] = Header(None, alias="user_id"),
 ):
@@ -380,6 +382,7 @@ async def list_source_files(
         actor=actor,
         include_content=include_content,
         strip_directory_prefix=True,  # TODO: Reconsider this. This is purely for aesthetics.
+        check_status_updates=check_status_updates,
     )
 
 
@@ -408,51 +411,8 @@ async def get_file_metadata(
     if file_metadata.source_id != source_id:
         raise HTTPException(status_code=404, detail=f"File with id={file_id} not found in source {source_id}.")
 
-    # Check for timeout if status is not terminal
-    if not file_metadata.processing_status.is_terminal_state():
-        if file_metadata.created_at:
-            # Handle timezone differences between PostgreSQL (timezone-aware) and SQLite (timezone-naive)
-            if settings.letta_pg_uri_no_default:
-                # PostgreSQL: both datetimes are timezone-aware
-                timeout_threshold = datetime.now(timezone.utc) - timedelta(minutes=settings.file_processing_timeout_minutes)
-                file_created_at = file_metadata.created_at
-            else:
-                # SQLite: both datetimes should be timezone-naive
-                timeout_threshold = datetime.utcnow() - timedelta(minutes=settings.file_processing_timeout_minutes)
-                file_created_at = file_metadata.created_at
-
-            if file_created_at < timeout_threshold:
-                # Move file to error status with timeout message
-                timeout_message = settings.file_processing_timeout_error_message.format(settings.file_processing_timeout_minutes)
-                try:
-                    file_metadata = await server.file_manager.update_file_status(
-                        file_id=file_metadata.id, actor=actor, processing_status=FileProcessingStatus.ERROR, error_message=timeout_message
-                    )
-                except ValueError as e:
-                    # state transition was blocked - log it but don't fail the request
-                    logger.warning(f"Could not update file to timeout error state: {str(e)}")
-                    # continue with existing file_metadata
-
-    if should_use_pinecone() and file_metadata.processing_status == FileProcessingStatus.EMBEDDING:
-        ids = await list_pinecone_index_for_files(file_id=file_id, actor=actor)
-        logger.info(
-            f"Embedded chunks {len(ids)}/{file_metadata.total_chunks} for {file_id} ({file_metadata.file_name}) in organization {actor.organization_id}"
-        )
-
-        if len(ids) != file_metadata.chunks_embedded or len(ids) == file_metadata.total_chunks:
-            if len(ids) != file_metadata.total_chunks:
-                file_status = file_metadata.processing_status
-            else:
-                file_status = FileProcessingStatus.COMPLETED
-            try:
-                file_metadata = await server.file_manager.update_file_status(
-                    file_id=file_metadata.id, actor=actor, chunks_embedded=len(ids), processing_status=file_status
-                )
-            except ValueError as e:
-                # state transition was blocked - this is a race condition
-                # log it but don't fail the request since we're just reading metadata
-                logger.warning(f"Race condition detected in get_file_metadata: {str(e)}")
-                # return the current file state without updating
+    # Check and update file status (timeout check and pinecone embedding sync)
+    file_metadata = await server.file_manager.check_and_update_file_status(file_metadata, actor)
 
     return file_metadata
 
diff --git a/letta/services/file_manager.py b/letta/services/file_manager.py
index 3f613059..b43c3e19 100644
--- a/letta/services/file_manager.py
+++ b/letta/services/file_manager.py
@@ -1,6 +1,6 @@
 import asyncio
 import os
-from datetime import datetime
+from datetime import datetime, timedelta, timezone
 from typing import List, Optional
 
 from sqlalchemy import func, select, update
@@ -9,6 +9,8 @@ from sqlalchemy.exc import IntegrityError
 from sqlalchemy.orm import selectinload
 
 from letta.constants import MAX_FILENAME_LENGTH
+from letta.helpers.pinecone_utils import list_pinecone_index_for_files, should_use_pinecone
+from letta.log import get_logger
 from letta.orm.errors import NoResultFound
 from letta.orm.file import FileContent as FileContentModel
 from letta.orm.file import FileMetadata as FileMetadataModel
@@ -20,8 +22,11 @@ from letta.schemas.source import Source as PydanticSource
 from letta.schemas.source_metadata import FileStats, OrganizationSourcesStats, SourceStats
 from letta.schemas.user import User as PydanticUser
 from letta.server.db import db_registry
+from letta.settings import settings
 from letta.utils import enforce_types
 
+logger = get_logger(__name__)
+
 
 class DuplicateFileError(Exception):
     """Raised when a duplicate file is encountered and error handling is specified"""
@@ -277,6 +282,79 @@ class FileManager:
         )
         return await file_orm.to_pydantic_async()
 
+    @enforce_types
+    @trace_method
+    async def check_and_update_file_status(
+        self,
+        file_metadata: PydanticFileMetadata,
+        actor: PydanticUser,
+    ) -> PydanticFileMetadata:
+        """
+        Check and update file status for timeout and embedding completion.
+
+        This method consolidates logic for:
+        1. Checking if a file has timed out during processing
+        2. Checking Pinecone embedding status and updating counts
+
+        Args:
+            file_metadata: The file metadata to check
+            actor: User performing the check
+
+        Returns:
+            Updated file metadata with current status
+        """
+        # check for timeout if status is not terminal
+        if not file_metadata.processing_status.is_terminal_state():
+            if file_metadata.created_at:
+                # handle timezone differences between PostgreSQL (timezone-aware) and SQLite (timezone-naive)
+                if settings.letta_pg_uri_no_default:
+                    # postgresql: both datetimes are timezone-aware
+                    timeout_threshold = datetime.now(timezone.utc) - timedelta(minutes=settings.file_processing_timeout_minutes)
+                    file_created_at = file_metadata.created_at
+                else:
+                    # sqlite: both datetimes should be timezone-naive
+                    timeout_threshold = datetime.utcnow() - timedelta(minutes=settings.file_processing_timeout_minutes)
+                    file_created_at = file_metadata.created_at
+
+                if file_created_at < timeout_threshold:
+                    # move file to error status with timeout message
+                    timeout_message = settings.file_processing_timeout_error_message.format(settings.file_processing_timeout_minutes)
+                    try:
+                        file_metadata = await self.update_file_status(
+                            file_id=file_metadata.id,
+                            actor=actor,
+                            processing_status=FileProcessingStatus.ERROR,
+                            error_message=timeout_message,
+                        )
+                    except ValueError as e:
+                        # state transition was blocked - log it but don't fail
+                        logger.warning(f"Could not update file to timeout error state: {str(e)}")
+                        # continue with existing file_metadata
+
+        # check pinecone embedding status
+        if should_use_pinecone() and file_metadata.processing_status == FileProcessingStatus.EMBEDDING:
+            ids = await list_pinecone_index_for_files(file_id=file_metadata.id, actor=actor)
+            logger.info(
+                f"Embedded chunks {len(ids)}/{file_metadata.total_chunks} for {file_metadata.id} ({file_metadata.file_name}) in organization {actor.organization_id}"
+            )
+
+            if len(ids) != file_metadata.chunks_embedded or len(ids) == file_metadata.total_chunks:
+                if len(ids) != file_metadata.total_chunks:
+                    file_status = file_metadata.processing_status
+                else:
+                    file_status = FileProcessingStatus.COMPLETED
+                try:
+                    file_metadata = await self.update_file_status(
+                        file_id=file_metadata.id, actor=actor, chunks_embedded=len(ids), processing_status=file_status
+                    )
+                except ValueError as e:
+                    # state transition was blocked - this is a race condition
+                    # log it but don't fail since we're just checking status
+                    logger.warning(f"Race condition detected in check_and_update_file_status: {str(e)}")
+                    # return the current file state without updating
+
+        return file_metadata
+
     @enforce_types
     @trace_method
     async def upsert_file_content(
@@ -332,8 +410,22 @@ class FileManager:
         limit: Optional[int] = 50,
         include_content: bool = False,
         strip_directory_prefix: bool = False,
+        check_status_updates: bool = False,
     ) -> List[PydanticFileMetadata]:
-        """List all files with optional pagination."""
+        """List all files with optional pagination and status checking.
+
+        Args:
+            source_id: Source to list files from
+            actor: User performing the request
+            after: Pagination cursor
+            limit: Maximum number of files to return
+            include_content: Whether to include file content
+            strip_directory_prefix: Whether to strip directory prefix from filenames
+            check_status_updates: Whether to check and update status for timeout and embedding completion
+
+        Returns:
+            List of file metadata
+        """
         async with db_registry.async_session() as session:
             options = [selectinload(FileMetadataModel.content)] if include_content else None
 
@@ -345,10 +437,19 @@ class FileManager:
                 source_id=source_id,
                 query_options=options,
             )
-            return [
-                await file.to_pydantic_async(include_content=include_content, strip_directory_prefix=strip_directory_prefix)
-                for file in files
-            ]
+
+            # convert all files to pydantic models
+            file_metadatas = await asyncio.gather(
+                *[file.to_pydantic_async(include_content=include_content, strip_directory_prefix=strip_directory_prefix) for file in files]
+            )
+
+            # if status checking is enabled, check all files concurrently
+            if check_status_updates:
+                file_metadatas = await asyncio.gather(
+                    *[self.check_and_update_file_status(file_metadata, actor) for file_metadata in file_metadatas]
+                )
+
+            return file_metadatas
 
     @enforce_types
     @trace_method
diff --git a/tests/helpers/utils.py b/tests/helpers/utils.py
index 180469ca..2d4036ef 100644
--- a/tests/helpers/utils.py
+++ b/tests/helpers/utils.py
@@ -308,3 +308,43 @@ def upload_file_and_wait(
         raise RuntimeError(f"File processing failed: {file_metadata.error_message}")
 
     return file_metadata
+
+
+def upload_file_and_wait_list_files(
+    client: Letta,
+    source_id: str,
+    file_path: str,
+    name: Optional[str] = None,
+    max_wait: int = 60,
+    duplicate_handling: Optional[str] = None,
+):
+    """Helper function to upload a file and wait for processing using list_files instead of get_file_metadata"""
+    with open(file_path, "rb") as f:
+        if duplicate_handling:
+            file_metadata = client.sources.files.upload(source_id=source_id, file=f, duplicate_handling=duplicate_handling, name=name)
+        else:
+            file_metadata = client.sources.files.upload(source_id=source_id, file=f, name=name)
+
+    # wait for the file to be processed using list_files
+    start_time = time.time()
+    while file_metadata.processing_status != "completed" and file_metadata.processing_status != "error":
+        if time.time() - start_time > max_wait:
+            raise TimeoutError(f"File processing timed out after {max_wait} seconds")
+        time.sleep(1)
+
+        # use list_files to get all files and find our specific file
+        files = client.sources.files.list(source_id=source_id, limit=100)
+        # find the file with matching id
+        for file in files:
+            if file.id == file_metadata.id:
+                file_metadata = file
+                break
+        else:
+            raise RuntimeError(f"File {file_metadata.id} not found in source files list")
+
+        print("Waiting for file processing to complete (via list_files)...", file_metadata.processing_status)
+
+    if file_metadata.processing_status == "error":
+        raise RuntimeError(f"File processing failed: {file_metadata.error_message}")
+
+    return file_metadata
diff --git a/tests/test_sources.py b/tests/test_sources.py
index cb0d2d1e..55bb4434 100644
--- a/tests/test_sources.py
+++ b/tests/test_sources.py
@@ -1,3 +1,4 @@
+import asyncio
 import os
 import re
 import tempfile
@@ -14,11 +15,12 @@ from letta_client import MessageCreate as ClientMessageCreate
 from letta_client.types import AgentState
 
 from letta.constants import DEFAULT_ORG_ID, FILES_TOOLS
+from letta.helpers.pinecone_utils import should_use_pinecone
 from letta.schemas.enums import FileProcessingStatus, ToolType
 from letta.schemas.message import MessageCreate
 from letta.schemas.user import User
 from letta.settings import settings
-from tests.helpers.utils import upload_file_and_wait
+from tests.helpers.utils import upload_file_and_wait, upload_file_and_wait_list_files
 from tests.utils import wait_for_server
 
 # Constants
@@ -1131,10 +1133,43 @@ def test_pinecone_search_files_tool(client: LettaSDKClient):
     ), f"Search results should contain relevant content: {search_results}"
 
 
+def test_pinecone_list_files_status(client: LettaSDKClient):
+    """Test that list_source_files properly syncs embedding status with Pinecone"""
+    if not should_use_pinecone():
+        pytest.skip("Pinecone not configured (missing API key or disabled), skipping Pinecone-specific tests")
+
+    # create source
+    source = client.sources.create(name="test_list_files_status", embedding="openai/text-embedding-3-small")
+
+    file_paths = ["tests/data/long_test.txt"]
+    uploaded_files = []
+    for file_path in file_paths:
+        # use the new helper that polls via list_files
+        file_metadata = upload_file_and_wait_list_files(client, source.id, file_path)
+        uploaded_files.append(file_metadata)
+        assert file_metadata.processing_status == "completed", f"File {file_path} should be completed"
+
+    # now get files using list_source_files to verify status checking works
+    files_list = client.sources.files.list(source_id=source.id, limit=100)
+
+    # verify all files show completed status and have proper embedding counts
+    assert len(files_list) == len(uploaded_files), f"Expected {len(uploaded_files)} files, got {len(files_list)}"
+
+    for file_metadata in files_list:
+        assert file_metadata.processing_status == "completed", f"File {file_metadata.file_name} should show completed status"
+
+        # verify embedding counts for files that have chunks
+        if file_metadata.total_chunks and file_metadata.total_chunks > 0:
+            assert (
+                file_metadata.chunks_embedded == file_metadata.total_chunks
+            ), f"File {file_metadata.file_name} should have all chunks embedded: {file_metadata.chunks_embedded}/{file_metadata.total_chunks}"
+
+    # cleanup
+    client.sources.delete(source_id=source.id)
+
+
 def test_pinecone_lifecycle_file_and_source_deletion(client: LettaSDKClient):
     """Test that file and source deletion removes records from Pinecone"""
-    import asyncio
-
     from letta.helpers.pinecone_utils import list_pinecone_index_for_files, should_use_pinecone
 
     if not should_use_pinecone():
@@ -1203,8 +1238,6 @@ def test_pinecone_lifecycle_file_and_source_deletion(client: LettaSDKClient):
         len(records_after) == 0
     ), f"All source records should be removed from Pinecone after source deletion, but found {len(records_after)}"
 
-    print("✓ Pinecone lifecycle verified - namespace is clean after source deletion")
-
 
 def test_agent_open_file(disable_pinecone, client: LettaSDKClient, agent_state: AgentState):
     """Test client.agents.open_file() function"""
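
The new check_status_updates flag is additive and defaults to True, so existing callers of the list endpoint keep the old status-syncing behavior; passing False skips the per-file timeout and Pinecone checks entirely. A minimal sketch of calling the endpoint with the flag disabled, assuming a locally running server; the base URL, route prefix, and the id/header values below are placeholders, not values taken from this patch:

    import httpx

    SOURCE_ID = "source-1234"  # placeholder id
    resp = httpx.get(
        f"http://localhost:8283/v1/sources/{SOURCE_ID}/files",
        params={"limit": 100, "check_status_updates": False},  # skip the vector db status sync
        headers={"user_id": "user-1234"},  # actor header, matching Header(alias="user_id") in the router
    )
    resp.raise_for_status()
    for f in resp.json():
        print(f["file_name"], f["processing_status"])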
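
Because list_files now runs the pydantic conversions and the optional status checks through asyncio.gather, the per-file Pinecone round trips happen concurrently rather than one after another, so a large listing costs roughly one round-trip latency instead of the sum. A self-contained sketch of that fan-out pattern, with stand-in names rather than the real Letta types:

    import asyncio

    async def check_one(file_id: str) -> str:
        # stands in for a per-file timeout + embedding-status round trip
        await asyncio.sleep(0.1)
        return f"{file_id}: completed"

    async def check_all(file_ids: list[str]) -> list[str]:
        # one coroutine per file; wall time ~ the slowest check, not the sum
        return await asyncio.gather(*[check_one(fid) for fid in file_ids])

    print(asyncio.run(check_all(["file-1", "file-2", "file-3"])))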
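
The update condition in the Pinecone branch of check_and_update_file_status is easy to misread: it writes whenever the count in the index drifts from the stored chunks_embedded, or when the index already holds every chunk, and only the latter case flips the status to COMPLETED. The same logic re-expressed as a standalone function with illustrative names:

    def next_update(in_index: int, chunks_embedded: int, total_chunks: int, current: str):
        """Return (new_count, new_status) if a write should happen, else None."""
        if in_index != chunks_embedded or in_index == total_chunks:
            if in_index == total_chunks:
                return in_index, "completed"  # everything embedded: terminal state
            return in_index, current  # count drifted: update it, keep current status
        return None  # nothing changed: skip the write

    print(next_update(5, 3, 10, "embedding"))   # (5, 'embedding') - count updated
    print(next_update(10, 5, 10, "embedding"))  # (10, 'completed') - all chunks in
    print(next_update(3, 3, 10, "embedding"))   # None - no-op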