feat: Add better error catching for files upload (#4145)
This commit is contained in:
@@ -35,7 +35,7 @@ from letta.services.file_processor.file_types import get_allowed_media_types, ge
|
||||
from letta.services.file_processor.parser.markitdown_parser import MarkitdownFileParser
|
||||
from letta.services.file_processor.parser.mistral_parser import MistralFileParser
|
||||
from letta.settings import settings
|
||||
from letta.utils import safe_create_task, sanitize_filename
|
||||
from letta.utils import safe_create_file_processing_task, safe_create_task, sanitize_filename
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
@@ -312,8 +312,11 @@ async def upload_file_to_folder(
|
||||
|
||||
# Use cloud processing for all files (simple files always, complex files with Mistral key)
|
||||
logger.info("Running experimental cloud based file processing...")
|
||||
safe_create_task(
|
||||
safe_create_file_processing_task(
|
||||
load_file_to_source_cloud(server, agent_states, content, folder_id, actor, folder.embedding_config, file_metadata),
|
||||
file_metadata=file_metadata,
|
||||
server=server,
|
||||
actor=actor,
|
||||
logger=logger,
|
||||
label="file_processor.process",
|
||||
)
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import asyncio
|
||||
import mimetypes
|
||||
import os
|
||||
import tempfile
|
||||
@@ -8,6 +7,7 @@ from typing import List, Optional
|
||||
|
||||
from fastapi import APIRouter, Depends, Header, HTTPException, Query, UploadFile
|
||||
from starlette import status
|
||||
from starlette.responses import Response
|
||||
|
||||
import letta.constants as constants
|
||||
from letta.helpers.pinecone_utils import (
|
||||
@@ -35,14 +35,13 @@ from letta.services.file_processor.file_types import get_allowed_media_types, ge
|
||||
from letta.services.file_processor.parser.markitdown_parser import MarkitdownFileParser
|
||||
from letta.services.file_processor.parser.mistral_parser import MistralFileParser
|
||||
from letta.settings import settings
|
||||
from letta.utils import safe_create_task, sanitize_filename
|
||||
from letta.utils import safe_create_file_processing_task, safe_create_task, sanitize_filename
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
# Register all supported file types with Python's mimetypes module
|
||||
register_mime_types()
|
||||
|
||||
|
||||
router = APIRouter(prefix="/sources", tags=["sources"])
|
||||
|
||||
|
||||
@@ -312,8 +311,11 @@ async def upload_file_to_source(
|
||||
|
||||
# Use cloud processing for all files (simple files always, complex files with Mistral key)
|
||||
logger.info("Running experimental cloud based file processing...")
|
||||
safe_create_task(
|
||||
safe_create_file_processing_task(
|
||||
load_file_to_source_cloud(server, agent_states, content, source_id, actor, source.embedding_config, file_metadata),
|
||||
file_metadata=file_metadata,
|
||||
server=server,
|
||||
actor=actor,
|
||||
logger=logger,
|
||||
label="file_processor.process",
|
||||
)
|
||||
|
||||
@@ -174,6 +174,10 @@ class FileManager:
|
||||
if processing_status is None and error_message is None and total_chunks is None and chunks_embedded is None:
|
||||
raise ValueError("Nothing to update")
|
||||
|
||||
# validate that ERROR status must have an error message
|
||||
if processing_status == FileProcessingStatus.ERROR and not error_message:
|
||||
raise ValueError("Error message is required when setting processing status to ERROR")
|
||||
|
||||
values: dict[str, object] = {"updated_at": datetime.utcnow()}
|
||||
if processing_status is not None:
|
||||
values["processing_status"] = processing_status
|
||||
|
||||
@@ -1103,6 +1103,43 @@ def safe_create_task(coro, logger: Logger, label: str = "background task"):
|
||||
return asyncio.create_task(wrapper())
|
||||
|
||||
|
||||
def safe_create_file_processing_task(coro, file_metadata, server, actor, logger: Logger, label: str = "file processing task"):
|
||||
"""
|
||||
Create a task for file processing that updates file status on failure.
|
||||
|
||||
This is a specialized version of safe_create_task that ensures file
|
||||
status is properly updated to ERROR with a meaningful message if the
|
||||
task fails.
|
||||
|
||||
Args:
|
||||
coro: The coroutine to execute
|
||||
file_metadata: FileMetadata object being processed
|
||||
server: Server instance with file_manager
|
||||
actor: User performing the operation
|
||||
logger: Logger instance for error logging
|
||||
label: Description of the task for logging
|
||||
"""
|
||||
from letta.schemas.enums import FileProcessingStatus
|
||||
|
||||
async def wrapper():
|
||||
try:
|
||||
await coro
|
||||
except Exception as e:
|
||||
logger.exception(f"{label} failed for file {file_metadata.file_name} with {type(e).__name__}: {e}")
|
||||
# update file status to ERROR with a meaningful message
|
||||
try:
|
||||
await server.file_manager.update_file_status(
|
||||
file_id=file_metadata.id,
|
||||
actor=actor,
|
||||
processing_status=FileProcessingStatus.ERROR,
|
||||
error_message=f"Processing failed: {str(e)}" if str(e) else "Processing failed due to an unexpected error",
|
||||
)
|
||||
except Exception as update_error:
|
||||
logger.error(f"Failed to update file status to ERROR for {file_metadata.id}: {update_error}")
|
||||
|
||||
return asyncio.create_task(wrapper())
|
||||
|
||||
|
||||
class CancellationSignal:
|
||||
"""
|
||||
A signal that can be checked for cancellation during streaming operations.
|
||||
|
||||
@@ -286,17 +286,11 @@ def upload_file_and_wait(
|
||||
file_path: str,
|
||||
name: Optional[str] = None,
|
||||
max_wait: int = 60,
|
||||
duplicate_handling: Optional["DuplicateFileHandling"] = None,
|
||||
duplicate_handling: Optional[str] = None,
|
||||
):
|
||||
"""Helper function to upload a file and wait for processing to complete"""
|
||||
from letta_client import DuplicateFileHandling as ClientDuplicateFileHandling
|
||||
|
||||
with open(file_path, "rb") as f:
|
||||
if duplicate_handling:
|
||||
# handle both client and server enum types
|
||||
if hasattr(duplicate_handling, "value"):
|
||||
# server enum type
|
||||
duplicate_handling = ClientDuplicateFileHandling(duplicate_handling.value)
|
||||
file_metadata = client.sources.files.upload(source_id=source_id, file=f, duplicate_handling=duplicate_handling, name=name)
|
||||
else:
|
||||
file_metadata = client.sources.files.upload(source_id=source_id, file=f, name=name)
|
||||
|
||||
Reference in New Issue
Block a user