feat: Move file upload to temporal [LET-6089] (#6024)

* Finish writing temporal upload file activity

* Remove prints

* Rewrite content re-use
This commit is contained in:
Matthew Zhou
2025-11-06 16:21:32 -08:00
committed by Caren Thomas
parent 940fde24e9
commit 8df78e9429
5 changed files with 61 additions and 8 deletions

View File

@@ -238,6 +238,7 @@ async def upload_file_to_folder(
"""
Upload a file to a data folder.
"""
# NEW: Cloud based file processing
# Determine file's MIME type
mimetypes.guess_type(file.filename)[0] or "application/octet-stream"
@@ -271,9 +272,28 @@ async def upload_file_to_folder(
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
folder = await server.source_manager.get_source_by_id(source_id=folder_id, actor=actor)
# Read file bytes once
file_bytes = await file.read()
content = await file.read()
# If enabled, delegate to Temporal workflow (Lettuce) and return its result
if settings.use_lettuce_for_file_uploads:
from letta.services.lettuce import LettuceClient
lettuce_client = await LettuceClient.create()
result = await lettuce_client.upload_file_to_folder(
folder_id=folder_id,
actor_id=actor.id,
file_name=file.filename,
content=file_bytes,
content_type=raw_ct or None,
duplicate_handling=duplicate_handling,
override_name=name,
)
if result is not None:
return result.file_metadata
folder = await server.source_manager.get_source_by_id(source_id=folder_id, actor=actor)
content = file_bytes
file_size_mb = len(content) / (1024 * 1024)
from letta.log import get_logger

View File

@@ -251,9 +251,28 @@ async def upload_file_to_source(
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
source = await server.source_manager.get_source_by_id(source_id=source_id, actor=actor)
# Read file bytes once
file_bytes = await file.read()
content = await file.read()
# If enabled, delegate to Temporal workflow (Lettuce) and return its result
if settings.use_lettuce_for_file_uploads:
from letta.services.lettuce import LettuceClient
lettuce_client = await LettuceClient.create()
result = await lettuce_client.upload_file_to_folder(
folder_id=source_id, # same underlying entity
actor_id=actor.id,
file_name=file.filename,
content=file_bytes,
content_type=raw_ct or None,
duplicate_handling=duplicate_handling,
override_name=name,
)
if result is not None:
return result.file_metadata
source = await server.source_manager.get_source_by_id(source_id=source_id, actor=actor)
content = file_bytes
file_size_mb = len(content) / (1024 * 1024)
from letta.log import get_logger

View File

@@ -1,6 +1,3 @@
try:
from .lettuce_client import LettuceClient
except ImportError:
from .lettuce_client_base import LettuceClient
from .lettuce_client import LettuceClient
__all__ = ["LettuceClient"]

View File

@@ -1,5 +1,6 @@
from letta.constants import DEFAULT_MAX_STEPS
from letta.schemas.agent import AgentState
from letta.schemas.enums import DuplicateFileHandling
from letta.schemas.letta_message import MessageType
from letta.schemas.message import MessageCreate
from letta.schemas.user import User
@@ -84,3 +85,17 @@ class LettuceClient:
str | None: The ID of the run or None if client is not available.
"""
return None
async def upload_file_to_folder(
self,
*,
folder_id: str,
actor_id: str,
file_name: str,
content: bytes,
content_type: str | None = None,
duplicate_handling: DuplicateFileHandling | None = None,
override_name: str | None = None,
):
"""Kick off upload workflow. Base client does nothing and returns None."""
return None

View File

@@ -293,6 +293,8 @@ class Settings(BaseSettings):
# experimental toggle
use_vertex_structured_outputs_experimental: bool = False
use_asyncio_shield: bool = True
# Gate using Temporal (Lettuce) for file uploads via folders endpoint
use_lettuce_for_file_uploads: bool = False
# Database pool monitoring
enable_db_pool_monitoring: bool = True # Enable connection pool monitoring