feat: Add timeout to file processing (#3513)
This commit is contained in:
@@ -102,6 +102,10 @@ class FileProcessingStatus(str, Enum):
|
||||
COMPLETED = "completed"
|
||||
ERROR = "error"
|
||||
|
||||
def is_terminal_state(self) -> bool:
    """Return True once processing has finished, either successfully or with an error."""
    # COMPLETED and ERROR are the only states from which processing cannot progress.
    return self is FileProcessingStatus.COMPLETED or self is FileProcessingStatus.ERROR
|
||||
|
||||
|
||||
class ToolType(str, Enum):
|
||||
CUSTOM = "custom"
|
||||
|
||||
@@ -2,6 +2,7 @@ import asyncio
|
||||
import mimetypes
|
||||
import os
|
||||
import tempfile
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
@@ -393,6 +394,31 @@ async def get_file_metadata(
|
||||
if file_metadata.source_id != source_id:
|
||||
raise HTTPException(status_code=404, detail=f"File with id={file_id} not found in source {source_id}.")
|
||||
|
||||
# Check for timeout if status is not terminal
|
||||
if not file_metadata.processing_status.is_terminal_state():
|
||||
if file_metadata.created_at:
|
||||
# Handle timezone differences between PostgreSQL (timezone-aware) and SQLite (timezone-naive)
|
||||
if settings.letta_pg_uri_no_default:
|
||||
# PostgreSQL: both datetimes are timezone-aware
|
||||
timeout_threshold = datetime.now(timezone.utc) - timedelta(minutes=settings.file_processing_timeout_minutes)
|
||||
file_created_at = file_metadata.created_at
|
||||
else:
|
||||
# SQLite: both datetimes should be timezone-naive
|
||||
timeout_threshold = datetime.utcnow() - timedelta(minutes=settings.file_processing_timeout_minutes)
|
||||
file_created_at = file_metadata.created_at
|
||||
|
||||
if file_created_at < timeout_threshold:
|
||||
# Move file to error status with timeout message
|
||||
timeout_message = settings.file_processing_timeout_error_message.format(settings.file_processing_timeout_minutes)
|
||||
try:
|
||||
file_metadata = await server.file_manager.update_file_status(
|
||||
file_id=file_metadata.id, actor=actor, processing_status=FileProcessingStatus.ERROR, error_message=timeout_message
|
||||
)
|
||||
except ValueError as e:
|
||||
# state transition was blocked - log it but don't fail the request
|
||||
logger.warning(f"Could not update file to timeout error state: {str(e)}")
|
||||
# continue with existing file_metadata
|
||||
|
||||
if should_use_pinecone() and file_metadata.processing_status == FileProcessingStatus.EMBEDDING:
|
||||
ids = await list_pinecone_index_for_files(file_id=file_id, actor=actor)
|
||||
logger.info(
|
||||
|
||||
@@ -278,6 +278,10 @@ class Settings(BaseSettings):
|
||||
pinecone_agent_index: Optional[str] = "recall"
|
||||
upsert_pinecone_indices: bool = False
|
||||
|
||||
# File processing timeout settings
|
||||
file_processing_timeout_minutes: int = 30
|
||||
file_processing_timeout_error_message: str = "File processing timed out after {} minutes. Please try again."
|
||||
|
||||
@property
|
||||
def letta_pg_uri(self) -> str:
|
||||
if self.pg_uri:
|
||||
|
||||
@@ -2,6 +2,7 @@ import os
|
||||
import re
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
import pytest
|
||||
from dotenv import load_dotenv
|
||||
@@ -13,6 +14,7 @@ from letta_client.types import AgentState
|
||||
|
||||
from letta.constants import DEFAULT_ORG_ID, FILES_TOOLS
|
||||
from letta.orm.enums import ToolType
|
||||
from letta.schemas.enums import FileProcessingStatus
|
||||
from letta.schemas.message import MessageCreate
|
||||
from letta.schemas.user import User
|
||||
from letta.settings import settings
|
||||
@@ -1045,3 +1047,66 @@ def test_agent_close_all_open_files(disable_pinecone, client: LettaSDKClient, ag
|
||||
# Verify result is a list of strings
|
||||
assert isinstance(result, list), f"Expected list, got {type(result)}"
|
||||
assert all(isinstance(item, str) for item in result), "All items in result should be strings"
|
||||
|
||||
|
||||
def test_file_processing_timeout(disable_pinecone, client: LettaSDKClient):
    """Test that files in non-terminal states are moved to error after timeout.

    Uploads a file, checks `FileProcessingStatus.is_terminal_state()` for every
    status, then verifies the uploaded file's current status is consistent with
    the terminal/non-terminal split the timeout logic relies on.
    """
    # Create a source to hold the uploaded file.
    source = client.sources.create(name="test_timeout_source", embedding="openai/text-embedding-3-small")

    # Upload a file
    file_path = "tests/data/test.txt"
    with open(file_path, "rb") as f:
        file_metadata = client.sources.files.upload(source_id=source.id, file=f)

    # Get the file ID
    file_id = file_metadata.id

    # Test the is_terminal_state method directly (this doesn't require server mocking).
    # Fixed: compare truthiness idiomatically instead of `== True` / `== False` (pycodestyle E712).
    assert FileProcessingStatus.COMPLETED.is_terminal_state()
    assert FileProcessingStatus.ERROR.is_terminal_state()
    assert not FileProcessingStatus.PARSING.is_terminal_state()
    assert not FileProcessingStatus.EMBEDDING.is_terminal_state()
    assert not FileProcessingStatus.PENDING.is_terminal_state()

    # For testing the actual timeout logic, we can check the current file status
    current_file = client.sources.get_file_metadata(source_id=source.id, file_id=file_id)

    # Convert string status to enum for testing
    status_enum = FileProcessingStatus(current_file.processing_status)

    # Verify that files in terminal states are not affected by timeout checks
    if status_enum.is_terminal_state():
        # This is the expected behavior - files that completed processing shouldn't timeout
        print(f"File {file_id} is in terminal state: {current_file.processing_status}")
        assert status_enum in [FileProcessingStatus.COMPLETED, FileProcessingStatus.ERROR]
    else:
        # If file is still processing, it should eventually complete or timeout
        # In a real scenario, we'd wait and check, but for unit tests we just verify the logic exists
        print(f"File {file_id} is still processing: {current_file.processing_status}")
        assert status_enum in [FileProcessingStatus.PENDING, FileProcessingStatus.PARSING, FileProcessingStatus.EMBEDDING]
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_file_processing_timeout_logic():
    """Exercise the timeout-threshold comparison used for file processing, with no server dependencies."""
    from datetime import timezone

    timeout_minutes = 30
    now = datetime.now(timezone.utc)

    # A file times out when its creation time is strictly before this threshold.
    threshold = now - timedelta(minutes=timeout_minutes)

    # Scenario: created 35 minutes ago with a 30-minute timeout -> stale.
    stale_created_at = now - timedelta(minutes=35)
    assert stale_created_at < threshold, "File created 35 minutes ago should be past 30-minute timeout"

    # Boundary: created exactly at the threshold -> not stale (comparison is strict).
    boundary_created_at = now - timedelta(minutes=timeout_minutes)
    assert not (boundary_created_at < threshold), "File created exactly at timeout should not trigger timeout"

    # Fresh file well inside the window -> not stale.
    fresh_created_at = now - timedelta(minutes=10)
    assert not (fresh_created_at < threshold), "Recent file should not trigger timeout"
|
||||
|
||||
Reference in New Issue
Block a user