diff --git a/letta/schemas/enums.py b/letta/schemas/enums.py index a76178f9..fba439cb 100644 --- a/letta/schemas/enums.py +++ b/letta/schemas/enums.py @@ -102,6 +102,10 @@ class FileProcessingStatus(str, Enum): COMPLETED = "completed" ERROR = "error" + def is_terminal_state(self) -> bool: + """Check if the processing status is in a terminal state (completed or error).""" + return self in (FileProcessingStatus.COMPLETED, FileProcessingStatus.ERROR) + class ToolType(str, Enum): CUSTOM = "custom" diff --git a/letta/server/rest_api/routers/v1/sources.py b/letta/server/rest_api/routers/v1/sources.py index efba90c3..992eb064 100644 --- a/letta/server/rest_api/routers/v1/sources.py +++ b/letta/server/rest_api/routers/v1/sources.py @@ -2,6 +2,7 @@ import asyncio import mimetypes import os import tempfile +from datetime import datetime, timedelta, timezone from pathlib import Path from typing import List, Optional @@ -393,6 +394,31 @@ async def get_file_metadata( if file_metadata.source_id != source_id: raise HTTPException(status_code=404, detail=f"File with id={file_id} not found in source {source_id}.") + # Check for timeout if status is not terminal + if not file_metadata.processing_status.is_terminal_state(): + if file_metadata.created_at: + # Handle timezone differences between PostgreSQL (timezone-aware) and SQLite (timezone-naive) + if settings.letta_pg_uri_no_default: + # PostgreSQL: both datetimes are timezone-aware + timeout_threshold = datetime.now(timezone.utc) - timedelta(minutes=settings.file_processing_timeout_minutes) + file_created_at = file_metadata.created_at + else: + # SQLite: both datetimes should be timezone-naive + timeout_threshold = datetime.utcnow() - timedelta(minutes=settings.file_processing_timeout_minutes) + file_created_at = file_metadata.created_at + + if file_created_at < timeout_threshold: + # Move file to error status with timeout message + timeout_message = settings.file_processing_timeout_error_message.format(settings.file_processing_timeout_minutes) + try: + file_metadata = await server.file_manager.update_file_status( + file_id=file_metadata.id, actor=actor, processing_status=FileProcessingStatus.ERROR, error_message=timeout_message + ) + except ValueError as e: + # state transition was blocked - log it but don't fail the request + logger.warning(f"Could not update file to timeout error state: {str(e)}") + # continue with existing file_metadata + if should_use_pinecone() and file_metadata.processing_status == FileProcessingStatus.EMBEDDING: ids = await list_pinecone_index_for_files(file_id=file_id, actor=actor) logger.info( diff --git a/letta/settings.py b/letta/settings.py index 1d200552..f7537797 100644 --- a/letta/settings.py +++ b/letta/settings.py @@ -278,6 +278,10 @@ class Settings(BaseSettings): pinecone_agent_index: Optional[str] = "recall" upsert_pinecone_indices: bool = False + # File processing timeout settings + file_processing_timeout_minutes: int = 30 + file_processing_timeout_error_message: str = "File processing timed out after {} minutes. Please try again." + @property def letta_pg_uri(self) -> str: if self.pg_uri: diff --git a/tests/test_sources.py b/tests/test_sources.py index a3f49c82..e9315c3b 100644 --- a/tests/test_sources.py +++ b/tests/test_sources.py @@ -2,6 +2,7 @@ import os import re import threading import time +from datetime import datetime, timedelta import pytest from dotenv import load_dotenv @@ -13,6 +14,7 @@ from letta_client.types import AgentState from letta.constants import DEFAULT_ORG_ID, FILES_TOOLS from letta.orm.enums import ToolType +from letta.schemas.enums import FileProcessingStatus from letta.schemas.message import MessageCreate from letta.schemas.user import User from letta.settings import settings @@ -1045,3 +1047,66 @@ def test_agent_close_all_open_files(disable_pinecone, client: LettaSDKClient, ag # Verify result is a list of strings assert isinstance(result, list), f"Expected list, got {type(result)}" assert all(isinstance(item, str) for item in result), "All items in result should be strings" + + +def test_file_processing_timeout(disable_pinecone, client: LettaSDKClient): + """Test that files in non-terminal states are moved to error after timeout""" + # Create a source + source = client.sources.create(name="test_timeout_source", embedding="openai/text-embedding-3-small") + + # Upload a file + file_path = "tests/data/test.txt" + with open(file_path, "rb") as f: + file_metadata = client.sources.files.upload(source_id=source.id, file=f) + + # Get the file ID + file_id = file_metadata.id + + # Test the is_terminal_state method directly (this doesn't require server mocking) + assert FileProcessingStatus.COMPLETED.is_terminal_state() == True + assert FileProcessingStatus.ERROR.is_terminal_state() == True + assert FileProcessingStatus.PARSING.is_terminal_state() == False + assert FileProcessingStatus.EMBEDDING.is_terminal_state() == False + assert FileProcessingStatus.PENDING.is_terminal_state() == False + + # For testing the actual timeout logic, we can check the current file status + current_file = client.sources.get_file_metadata(source_id=source.id, file_id=file_id) + + # Convert string status to enum for testing + status_enum = FileProcessingStatus(current_file.processing_status) + + # Verify that files in terminal states are not affected by timeout checks + if status_enum.is_terminal_state(): + # This is the expected behavior - files that completed processing shouldn't timeout + print(f"File {file_id} is in terminal state: {current_file.processing_status}") + assert status_enum in [FileProcessingStatus.COMPLETED, FileProcessingStatus.ERROR] + else: + # If file is still processing, it should eventually complete or timeout + # In a real scenario, we'd wait and check, but for unit tests we just verify the logic exists + print(f"File {file_id} is still processing: {current_file.processing_status}") + assert status_enum in [FileProcessingStatus.PENDING, FileProcessingStatus.PARSING, FileProcessingStatus.EMBEDDING] + + +@pytest.mark.unit +def test_file_processing_timeout_logic(): + """Test the timeout logic directly without server dependencies""" + from datetime import timezone + + # Test scenario: file created 35 minutes ago, timeout is 30 minutes + old_time = datetime.now(timezone.utc) - timedelta(minutes=35) + current_time = datetime.now(timezone.utc) + timeout_minutes = 30 + + # Calculate timeout threshold + timeout_threshold = current_time - timedelta(minutes=timeout_minutes) + + # Verify timeout logic + assert old_time < timeout_threshold, "File created 35 minutes ago should be past 30-minute timeout" + + # Test edge case: file created exactly at timeout + edge_time = current_time - timedelta(minutes=timeout_minutes) + assert not (edge_time < timeout_threshold), "File created exactly at timeout should not trigger timeout" + + # Test recent file + recent_time = current_time - timedelta(minutes=10) + assert not (recent_time < timeout_threshold), "Recent file should not trigger timeout"