diff --git a/letta/server/rest_api/routers/v1/folders.py b/letta/server/rest_api/routers/v1/folders.py
index 3a08a63c..d72d8ddd 100644
--- a/letta/server/rest_api/routers/v1/folders.py
+++ b/letta/server/rest_api/routers/v1/folders.py
@@ -35,7 +35,7 @@ from letta.services.file_processor.file_types import get_allowed_media_types, ge
 from letta.services.file_processor.parser.markitdown_parser import MarkitdownFileParser
 from letta.services.file_processor.parser.mistral_parser import MistralFileParser
 from letta.settings import settings
-from letta.utils import safe_create_task, sanitize_filename
+from letta.utils import safe_create_file_processing_task, safe_create_task, sanitize_filename
 
 logger = get_logger(__name__)
 
@@ -312,8 +312,11 @@ async def upload_file_to_folder(
 
     # Use cloud processing for all files (simple files always, complex files with Mistral key)
     logger.info("Running experimental cloud based file processing...")
-    safe_create_task(
+    safe_create_file_processing_task(
         load_file_to_source_cloud(server, agent_states, content, folder_id, actor, folder.embedding_config, file_metadata),
+        file_metadata=file_metadata,
+        server=server,
+        actor=actor,
         logger=logger,
         label="file_processor.process",
     )
diff --git a/letta/server/rest_api/routers/v1/sources.py b/letta/server/rest_api/routers/v1/sources.py
index a8de93ce..cbec503d 100644
--- a/letta/server/rest_api/routers/v1/sources.py
+++ b/letta/server/rest_api/routers/v1/sources.py
@@ -1,4 +1,3 @@
-import asyncio
 import mimetypes
 import os
 import tempfile
@@ -8,6 +7,7 @@ from typing import List, Optional
 
 from fastapi import APIRouter, Depends, Header, HTTPException, Query, UploadFile
 from starlette import status
+from starlette.responses import Response
 
 import letta.constants as constants
 from letta.helpers.pinecone_utils import (
@@ -35,14 +35,13 @@ from letta.services.file_processor.file_types import get_allowed_media_types, ge
 from letta.services.file_processor.parser.markitdown_parser import MarkitdownFileParser
 from letta.services.file_processor.parser.mistral_parser import MistralFileParser
 from letta.settings import settings
-from letta.utils import safe_create_task, sanitize_filename
+from letta.utils import safe_create_file_processing_task, safe_create_task, sanitize_filename
 
 logger = get_logger(__name__)
 
 # Register all supported file types with Python's mimetypes module
 register_mime_types()
 
-
 router = APIRouter(prefix="/sources", tags=["sources"])
 
 
@@ -312,8 +311,11 @@ async def upload_file_to_source(
 
     # Use cloud processing for all files (simple files always, complex files with Mistral key)
     logger.info("Running experimental cloud based file processing...")
-    safe_create_task(
+    safe_create_file_processing_task(
         load_file_to_source_cloud(server, agent_states, content, source_id, actor, source.embedding_config, file_metadata),
+        file_metadata=file_metadata,
+        server=server,
+        actor=actor,
         logger=logger,
         label="file_processor.process",
     )
diff --git a/letta/services/file_manager.py b/letta/services/file_manager.py
index 6dc06778..3f613059 100644
--- a/letta/services/file_manager.py
+++ b/letta/services/file_manager.py
@@ -174,6 +174,10 @@ class FileManager:
         if processing_status is None and error_message is None and total_chunks is None and chunks_embedded is None:
             raise ValueError("Nothing to update")
 
+        # validate that ERROR status must have an error message
+        if processing_status == FileProcessingStatus.ERROR and not error_message:
+            raise ValueError("Error message is required when setting processing status to ERROR")
+
         values: dict[str, object] = {"updated_at": datetime.utcnow()}
         if processing_status is not None:
             values["processing_status"] = processing_status
diff --git a/letta/utils.py b/letta/utils.py
index 4ce4c10c..7ca81bff 100644
--- a/letta/utils.py
+++ b/letta/utils.py
@@ -1103,6 +1103,60 @@ def safe_create_task(coro, logger: Logger, label: str = "background task"):
     return asyncio.create_task(wrapper())
 
 
+# Strong references to in-flight file processing tasks. The event loop only
+# keeps weak references to tasks, so a task that nobody holds on to may be
+# garbage collected before it finishes.
+_file_processing_tasks: set = set()
+
+
+def safe_create_file_processing_task(coro, file_metadata, server, actor, logger: Logger, label: str = "file processing task"):
+    """
+    Create a task for file processing that updates file status on failure.
+
+    This is a specialized version of safe_create_task that ensures file
+    status is properly updated to ERROR with a meaningful message if the
+    task fails.
+
+    Args:
+        coro: The coroutine to execute
+        file_metadata: FileMetadata object being processed
+        server: Server instance with file_manager
+        actor: User performing the operation
+        logger: Logger instance for error logging
+        label: Description of the task for logging
+
+    Returns:
+        The asyncio task wrapping the coroutine. Callers may discard it; a
+        strong reference is held internally until the task completes.
+    """
+    from letta.schemas.enums import FileProcessingStatus
+
+    async def wrapper():
+        try:
+            await coro
+        except Exception as e:
+            logger.exception(f"{label} failed for file {file_metadata.file_name} with {type(e).__name__}: {e}")
+            # update file status to ERROR with a meaningful message; update_file_status
+            # rejects ERROR without a message, so empty str(e) gets a generic fallback
+            detail = str(e)
+            try:
+                await server.file_manager.update_file_status(
+                    file_id=file_metadata.id,
+                    actor=actor,
+                    processing_status=FileProcessingStatus.ERROR,
+                    error_message=f"Processing failed: {detail}" if detail else "Processing failed due to an unexpected error",
+                )
+            except Exception as update_error:
+                # logger.exception keeps the traceback of the failed status update
+                logger.exception(f"Failed to update file status to ERROR for {file_metadata.id}: {update_error}")
+
+    task = asyncio.create_task(wrapper())
+    # hold a strong reference until the task is done, then drop it
+    _file_processing_tasks.add(task)
+    task.add_done_callback(_file_processing_tasks.discard)
+    return task
+
+
 class CancellationSignal:
     """
     A signal that can be checked for cancellation during streaming operations.
diff --git a/tests/helpers/utils.py b/tests/helpers/utils.py
index 7ce5b989..180469ca 100644
--- a/tests/helpers/utils.py
+++ b/tests/helpers/utils.py
@@ -286,17 +286,11 @@ def upload_file_and_wait(
     file_path: str,
     name: Optional[str] = None,
     max_wait: int = 60,
-    duplicate_handling: Optional["DuplicateFileHandling"] = None,
+    duplicate_handling: Optional[str] = None,
 ):
     """Helper function to upload a file and wait for processing to complete"""
-    from letta_client import DuplicateFileHandling as ClientDuplicateFileHandling
-
     with open(file_path, "rb") as f:
         if duplicate_handling:
-            # handle both client and server enum types
-            if hasattr(duplicate_handling, "value"):
-                # server enum type
-                duplicate_handling = ClientDuplicateFileHandling(duplicate_handling.value)
             file_metadata = client.sources.files.upload(source_id=source_id, file=f, duplicate_handling=duplicate_handling, name=name)
         else:
             file_metadata = client.sources.files.upload(source_id=source_id, file=f, name=name)