From 967cc3decfb622d7fb8fd8e6ab3b340a0bc4794e Mon Sep 17 00:00:00 2001 From: Ari Webb Date: Wed, 15 Oct 2025 10:01:13 -0700 Subject: [PATCH] move exceptions out of folders and sources [LET-4631] (#5444) --- letta/errors.py | 4 ++ letta/server/rest_api/app.py | 5 ++ letta/server/rest_api/routers/v1/folders.py | 33 +++++------- letta/server/rest_api/routers/v1/sources.py | 41 +++++---------- letta/server/server.py | 2 - letta/services/file_manager.py | 52 +++++++++---------- letta/services/source_manager.py | 15 +++--- .../tool_executor/files_tool_executor.py | 8 +-- 8 files changed, 71 insertions(+), 89 deletions(-) diff --git a/letta/errors.py b/letta/errors.py index d1962314..5d56e51a 100644 --- a/letta/errors.py +++ b/letta/errors.py @@ -98,6 +98,10 @@ class LettaUserNotFoundError(LettaError): """Error raised when a user is not found.""" +class LettaUnsupportedFileUploadError(LettaError): + """Error raised when an unsupported file upload is attempted.""" + + class LettaInvalidArgumentError(LettaError): """Error raised when an invalid argument is provided.""" diff --git a/letta/server/rest_api/app.py b/letta/server/rest_api/app.py index 72ff96f4..6622cc2c 100644 --- a/letta/server/rest_api/app.py +++ b/letta/server/rest_api/app.py @@ -33,6 +33,7 @@ from letta.errors import ( LettaMCPTimeoutError, LettaToolCreateError, LettaToolNameConflictError, + LettaUnsupportedFileUploadError, LettaUserNotFoundError, LLMAuthenticationError, LLMError, @@ -244,6 +245,7 @@ def create_application() -> "FastAPI": _error_handler_408 = partial(error_handler_with_code, code=408) _error_handler_409 = partial(error_handler_with_code, code=409) _error_handler_410 = partial(error_handler_with_code, code=410) + _error_handler_415 = partial(error_handler_with_code, code=415) _error_handler_422 = partial(error_handler_with_code, code=422) _error_handler_500 = partial(error_handler_with_code, code=500) _error_handler_503 = partial(error_handler_with_code, code=503) @@ -273,6 +275,9 @@ def create_application() -> "FastAPI": app.add_exception_handler(UniqueConstraintViolationError, _error_handler_409) app.add_exception_handler(IntegrityError, _error_handler_409) + # 415 Unsupported Media Type errors + app.add_exception_handler(LettaUnsupportedFileUploadError, _error_handler_415) + # 422 Validation errors app.add_exception_handler(ValidationError, _error_handler_422) diff --git a/letta/server/rest_api/routers/v1/folders.py b/letta/server/rest_api/routers/v1/folders.py index 4fafda11..6394dd40 100644 --- a/letta/server/rest_api/routers/v1/folders.py +++ b/letta/server/rest_api/routers/v1/folders.py @@ -9,6 +9,7 @@ from starlette import status from starlette.responses import Response import letta.constants as constants +from letta.errors import LettaInvalidArgumentError, LettaUnsupportedFileUploadError from letta.helpers.pinecone_utils import ( delete_file_records_from_pinecone_index, delete_source_records_from_pinecone_index, @@ -70,8 +71,6 @@ async def retrieve_folder( actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) folder = await server.source_manager.get_source_by_id(source_id=folder_id, actor=actor) - if not folder: - raise HTTPException(status_code=404, detail=f"Folder with id={folder_id} not found.") return folder @@ -90,8 +89,6 @@ async def get_folder_by_name( actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) folder = await server.source_manager.get_source_by_name(source_name=folder_name, actor=actor) - if not folder: - raise HTTPException(status_code=404, detail=f"Folder with name={folder_name} not found.") return folder.id @@ -157,8 +154,9 @@ async def create_folder( if not folder_create.embedding_config: if not folder_create.embedding: if settings.default_embedding_handle is None: - # TODO: modify error type - raise ValueError("Must specify either embedding or embedding_config in request") + raise LettaInvalidArgumentError( + "Must specify either embedding or embedding_config in request", argument_name="default_embedding_handle" + ) else: folder_create.embedding = settings.default_embedding_handle folder_create.embedding_config = await server.get_embedding_config_from_handle_async( @@ -188,8 +186,7 @@ async def modify_folder( """ # TODO: allow updating the handle/embedding config actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) - if not await server.source_manager.get_source_by_id(source_id=folder_id, actor=actor): - raise HTTPException(status_code=404, detail=f"Folder with id={folder_id} does not exist.") + await server.source_manager.get_source_by_id(source_id=folder_id, actor=actor) return await server.source_manager.update_source(source_id=folder_id, source_update=folder, actor=actor) @@ -222,11 +219,9 @@ async def delete_folder( await server.remove_files_from_context_window(agent_state=agent_state, file_ids=file_ids, actor=actor) if agent_state.enable_sleeptime: - try: - block = await server.agent_manager.get_block_with_label_async(agent_id=agent_state.id, block_label=folder.name, actor=actor) + block = await server.agent_manager.get_block_with_label_async(agent_id=agent_state.id, block_label=folder.name, actor=actor) + if block: await server.block_manager.delete_block_async(block.id, actor) - except: - pass await server.delete_source(source_id=folder_id, actor=actor) @@ -265,9 +260,8 @@ async def upload_file_to_folder( # If still not allowed, reject with 415. if media_type not in allowed_media_types: - raise HTTPException( - status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE, - detail=( + raise LettaUnsupportedFileUploadError( + message=( f"Unsupported file type: {media_type or 'unknown'} " f"(filename: {file.filename}). " f"Supported types: PDF, text files (.txt, .md), JSON, and code files (.py, .js, .java, etc.)." @@ -277,8 +271,6 @@ async def upload_file_to_folder( actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) folder = await server.source_manager.get_source_by_id(source_id=folder_id, actor=actor) - if folder is None: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Folder with id={folder_id} not found.") content = await file.read() @@ -297,8 +289,9 @@ async def upload_file_to_folder( if existing_file: # Duplicate found, handle based on strategy if duplicate_handling == DuplicateFileHandling.ERROR: - raise HTTPException( - status_code=status.HTTP_409_CONFLICT, detail=f"File '{original_filename}' already exists in folder '{folder.name}'" + raise LettaInvalidArgumentError( + message=f"File '{original_filename}' already exists in folder '{folder.name}'", + argument_name="duplicate_handling", ) elif duplicate_handling == DuplicateFileHandling.SKIP: # Return existing file metadata with custom header to indicate it was skipped @@ -528,8 +521,6 @@ async def delete_file_from_folder( await delete_file_records_from_pinecone_index(file_id=file_id, actor=actor) safe_create_task(sleeptime_document_ingest_async(server, folder_id, actor, clear_history=True), label="document_ingest_after_delete") - if deleted_file is None: - raise HTTPException(status_code=404, detail=f"File with id={file_id} not found.") async def load_file_to_source_async(server: SyncServer, source_id: str, job_id: str, filename: str, bytes: bytes, actor: User): diff --git a/letta/server/rest_api/routers/v1/sources.py b/letta/server/rest_api/routers/v1/sources.py index 7197a710..5402a77a 100644 --- a/letta/server/rest_api/routers/v1/sources.py +++ b/letta/server/rest_api/routers/v1/sources.py @@ -9,6 +9,7 @@ from starlette import status from starlette.responses import Response import letta.constants as constants +from letta.errors import LettaInvalidArgumentError, LettaUnsupportedFileUploadError from letta.helpers.pinecone_utils import ( delete_file_records_from_pinecone_index, delete_source_records_from_pinecone_index, @@ -66,10 +67,7 @@ async def retrieve_source( Get all sources """ actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) - source = await server.source_manager.get_source_by_id(source_id=source_id, actor=actor) - if not source: - raise HTTPException(status_code=404, detail=f"Source with id={source_id} not found.") return source @@ -85,8 +83,6 @@ async def get_source_id_by_name( actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) source = await server.source_manager.get_source_by_name(source_name=source_name, actor=actor) - if not source: - raise HTTPException(status_code=404, detail=f"Source with name={source_name} not found.") return source.id @@ -138,8 +134,9 @@ async def create_source( if not source_create.embedding_config: if not source_create.embedding: if settings.default_embedding_handle is None: - # TODO: modify error type - raise ValueError("Must specify either embedding or embedding_config in request") + raise LettaInvalidArgumentError( + "Must specify either embedding or embedding_config in request", argument_name="default_embedding_handle" + ) else: source_create.embedding = settings.default_embedding_handle source_create.embedding_config = await server.get_embedding_config_from_handle_async( @@ -169,8 +166,7 @@ async def modify_source( """ # TODO: allow updating the handle/embedding config actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) - if not await server.source_manager.get_source_by_id(source_id=source_id, actor=actor): - raise HTTPException(status_code=404, detail=f"Source with id={source_id} does not exist.") + await server.source_manager.get_source_by_id(source_id=source_id, actor=actor) return await server.source_manager.update_source(source_id=source_id, source_update=source, actor=actor) @@ -203,11 +199,9 @@ async def delete_source( await server.remove_files_from_context_window(agent_state=agent_state, file_ids=file_ids, actor=actor) if agent_state.enable_sleeptime: - try: - block = await server.agent_manager.get_block_with_label_async(agent_id=agent_state.id, block_label=source.name, actor=actor) + block = await server.agent_manager.get_block_with_label_async(agent_id=agent_state.id, block_label=source.name, actor=actor) + if block: await server.block_manager.delete_block_async(block.id, actor) - except: - pass await server.delete_source(source_id=source_id, actor=actor) @@ -246,9 +240,8 @@ async def upload_file_to_source( # If still not allowed, reject with 415. if media_type not in allowed_media_types: - raise HTTPException( - status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE, - detail=( + raise LettaUnsupportedFileUploadError( + message=( f"Unsupported file type: {media_type or 'unknown'} " f"(filename: {file.filename}). " f"Supported types: PDF, text files (.txt, .md), JSON, and code files (.py, .js, .java, etc.)." @@ -258,8 +251,6 @@ async def upload_file_to_source( actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) source = await server.source_manager.get_source_by_id(source_id=source_id, actor=actor) - if source is None: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Source with id={source_id} not found.") content = await file.read() @@ -278,8 +269,9 @@ async def upload_file_to_source( if existing_file: # Duplicate found, handle based on strategy if duplicate_handling == DuplicateFileHandling.ERROR: - raise HTTPException( - status_code=status.HTTP_409_CONFLICT, detail=f"File '{original_filename}' already exists in source '{source.name}'" + raise LettaInvalidArgumentError( + message=f"File '{original_filename}' already exists in source '{source.name}'", + argument_name="duplicate_handling", ) elif duplicate_handling == DuplicateFileHandling.SKIP: # Return existing file metadata with custom header to indicate it was skipped @@ -410,13 +402,6 @@ async def get_file_metadata( file_id=file_id, actor=actor, include_content=include_content, strip_directory_prefix=True ) - if not file_metadata: - raise HTTPException(status_code=404, detail=f"File with id={file_id} not found.") - - # Verify the file belongs to the specified source - if file_metadata.source_id != source_id: - raise HTTPException(status_code=404, detail=f"File with id={file_id} not found in source {source_id}.") - # Check and update file status (timeout check and pinecone embedding sync) file_metadata = await server.file_manager.check_and_update_file_status(file_metadata, actor) @@ -452,8 +437,6 @@ async def delete_file_from_source( await delete_file_records_from_pinecone_index(file_id=file_id, actor=actor) safe_create_task(sleeptime_document_ingest_async(server, source_id, actor, clear_history=True), label="document_ingest_after_delete") - if deleted_file is None: - raise HTTPException(status_code=404, detail=f"File with id={file_id} not found.") async def load_file_to_source_async(server: SyncServer, source_id: str, job_id: str, filename: str, bytes: bytes, actor: User): diff --git a/letta/server/server.py b/letta/server/server.py index 0ed583c1..b6a010d0 100644 --- a/letta/server/server.py +++ b/letta/server/server.py @@ -761,8 +761,6 @@ class SyncServer(object): # TODO: move this into a thread source = await self.source_manager.get_source_by_id(source_id=source_id) - if source is None: - raise NoResultFound(f"Source {source_id} does not exist") connector = DirectoryConnector(input_files=[file_path]) num_passages, num_documents = await self.load_data(user_id=source.created_by_id, source_name=source.name, connector=connector) diff --git a/letta/services/file_manager.py b/letta/services/file_manager.py index efff3da9..256b9e7b 100644 --- a/letta/services/file_manager.py +++ b/letta/services/file_manager.py @@ -60,7 +60,11 @@ class FileManager: text: Optional[str] = None, ) -> PydanticFileMetadata: # short-circuit if it already exists - existing = await self.get_file_by_id(file_metadata.id, actor=actor) + try: + existing = await self.get_file_by_id(file_metadata.id, actor=actor) + except NoResultFound: + existing = None + if existing: return existing @@ -105,35 +109,29 @@ class FileManager: lazy SELECT (avoids MissingGreenlet). """ async with db_registry.async_session() as session: - try: - if include_content: - # explicit eager load - query = ( - select(FileMetadataModel).where(FileMetadataModel.id == file_id).options(selectinload(FileMetadataModel.content)) - ) - # apply org-scoping if actor provided - if actor: - query = FileMetadataModel.apply_access_predicate( - query, - actor, - access=["read"], - access_type=AccessType.ORGANIZATION, - ) - - result = await session.execute(query) - file_orm = result.scalar_one() - else: - # fast path (metadata only) - file_orm = await FileMetadataModel.read_async( - db_session=session, - identifier=file_id, - actor=actor, + if include_content: + # explicit eager load + query = select(FileMetadataModel).where(FileMetadataModel.id == file_id).options(selectinload(FileMetadataModel.content)) + # apply org-scoping if actor provided + if actor: + query = FileMetadataModel.apply_access_predicate( + query, + actor, + access=["read"], + access_type=AccessType.ORGANIZATION, ) - return await file_orm.to_pydantic_async(include_content=include_content, strip_directory_prefix=strip_directory_prefix) + result = await session.execute(query) + file_orm = result.scalar_one() + else: + # fast path (metadata only) + file_orm = await FileMetadataModel.read_async( + db_session=session, + identifier=file_id, + actor=actor, + ) - except NoResultFound: - return None + return await file_orm.to_pydantic_async(include_content=include_content, strip_directory_prefix=strip_directory_prefix) @enforce_types @trace_method diff --git a/letta/services/source_manager.py b/letta/services/source_manager.py index d1893c35..d738e4f1 100644 --- a/letta/services/source_manager.py +++ b/letta/services/source_manager.py @@ -61,7 +61,11 @@ class SourceManager: @trace_method async def create_source(self, source: PydanticSource, actor: PydanticUser) -> PydanticSource: """Create a new source based on the PydanticSource schema.""" - db_source = await self.get_source_by_id(source.id, actor=actor) + try: + db_source = await self.get_source_by_id(source.id, actor=actor) + except NoResultFound: + db_source = None + if db_source: return db_source else: @@ -346,11 +350,8 @@ class SourceManager: async def get_source_by_id(self, source_id: str, actor: Optional[PydanticUser] = None) -> Optional[PydanticSource]: """Retrieve a source by its ID.""" async with db_registry.async_session() as session: - try: - source = await SourceModel.read_async(db_session=session, identifier=source_id, actor=actor) - return source.to_pydantic() - except NoResultFound: - return None + source = await SourceModel.read_async(db_session=session, identifier=source_id, actor=actor) + return source.to_pydantic() @enforce_types @trace_method @@ -364,7 +365,7 @@ class SourceManager: limit=1, ) if not sources: - return None + raise NoResultFound(f"Source with name={source_name} not found.") else: return sources[0].to_pydantic() diff --git a/letta/services/tool_executor/files_tool_executor.py b/letta/services/tool_executor/files_tool_executor.py index fb3c4718..d05d42f3 100644 --- a/letta/services/tool_executor/files_tool_executor.py +++ b/letta/services/tool_executor/files_tool_executor.py @@ -2,6 +2,8 @@ import asyncio import re from typing import Any, Dict, List, Optional +from sqlalchemy.exc import NoResultFound + from letta.constants import PINECONE_TEXT_FIELD_NAME from letta.functions.types import FileOpenRequest from letta.helpers.pinecone_utils import search_pinecone_index, should_use_pinecone @@ -389,9 +391,9 @@ class LettaFileToolExecutor(ToolExecutor): for file_agent in file_agents: # Load file content - file = await self.file_manager.get_file_by_id(file_id=file_agent.file_id, actor=self.actor, include_content=True) - - if not file or not file.content: + try: + file = await self.file_manager.get_file_by_id(file_id=file_agent.file_id, actor=self.actor, include_content=True) + except NoResultFound: files_skipped += 1 self.logger.warning(f"Grep: Skipping file {file_agent.file_name} - no content available") continue