move exceptions out of folders and sources [LET-4631] (#5444)
This commit is contained in:
@@ -98,6 +98,10 @@ class LettaUserNotFoundError(LettaError):
|
||||
"""Error raised when a user is not found."""
|
||||
|
||||
|
||||
class LettaUnsupportedFileUploadError(LettaError):
|
||||
"""Error raised when an unsupported file upload is attempted."""
|
||||
|
||||
|
||||
class LettaInvalidArgumentError(LettaError):
|
||||
"""Error raised when an invalid argument is provided."""
|
||||
|
||||
|
||||
@@ -33,6 +33,7 @@ from letta.errors import (
|
||||
LettaMCPTimeoutError,
|
||||
LettaToolCreateError,
|
||||
LettaToolNameConflictError,
|
||||
LettaUnsupportedFileUploadError,
|
||||
LettaUserNotFoundError,
|
||||
LLMAuthenticationError,
|
||||
LLMError,
|
||||
@@ -244,6 +245,7 @@ def create_application() -> "FastAPI":
|
||||
_error_handler_408 = partial(error_handler_with_code, code=408)
|
||||
_error_handler_409 = partial(error_handler_with_code, code=409)
|
||||
_error_handler_410 = partial(error_handler_with_code, code=410)
|
||||
_error_handler_415 = partial(error_handler_with_code, code=415)
|
||||
_error_handler_422 = partial(error_handler_with_code, code=422)
|
||||
_error_handler_500 = partial(error_handler_with_code, code=500)
|
||||
_error_handler_503 = partial(error_handler_with_code, code=503)
|
||||
@@ -273,6 +275,9 @@ def create_application() -> "FastAPI":
|
||||
app.add_exception_handler(UniqueConstraintViolationError, _error_handler_409)
|
||||
app.add_exception_handler(IntegrityError, _error_handler_409)
|
||||
|
||||
# 415 Unsupported Media Type errors
|
||||
app.add_exception_handler(LettaUnsupportedFileUploadError, _error_handler_415)
|
||||
|
||||
# 422 Validation errors
|
||||
app.add_exception_handler(ValidationError, _error_handler_422)
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ from starlette import status
|
||||
from starlette.responses import Response
|
||||
|
||||
import letta.constants as constants
|
||||
from letta.errors import LettaInvalidArgumentError, LettaUnsupportedFileUploadError
|
||||
from letta.helpers.pinecone_utils import (
|
||||
delete_file_records_from_pinecone_index,
|
||||
delete_source_records_from_pinecone_index,
|
||||
@@ -70,8 +71,6 @@ async def retrieve_folder(
|
||||
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
|
||||
|
||||
folder = await server.source_manager.get_source_by_id(source_id=folder_id, actor=actor)
|
||||
if not folder:
|
||||
raise HTTPException(status_code=404, detail=f"Folder with id={folder_id} not found.")
|
||||
return folder
|
||||
|
||||
|
||||
@@ -90,8 +89,6 @@ async def get_folder_by_name(
|
||||
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
|
||||
|
||||
folder = await server.source_manager.get_source_by_name(source_name=folder_name, actor=actor)
|
||||
if not folder:
|
||||
raise HTTPException(status_code=404, detail=f"Folder with name={folder_name} not found.")
|
||||
return folder.id
|
||||
|
||||
|
||||
@@ -157,8 +154,9 @@ async def create_folder(
|
||||
if not folder_create.embedding_config:
|
||||
if not folder_create.embedding:
|
||||
if settings.default_embedding_handle is None:
|
||||
# TODO: modify error type
|
||||
raise ValueError("Must specify either embedding or embedding_config in request")
|
||||
raise LettaInvalidArgumentError(
|
||||
"Must specify either embedding or embedding_config in request", argument_name="default_embedding_handle"
|
||||
)
|
||||
else:
|
||||
folder_create.embedding = settings.default_embedding_handle
|
||||
folder_create.embedding_config = await server.get_embedding_config_from_handle_async(
|
||||
@@ -188,8 +186,7 @@ async def modify_folder(
|
||||
"""
|
||||
# TODO: allow updating the handle/embedding config
|
||||
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
|
||||
if not await server.source_manager.get_source_by_id(source_id=folder_id, actor=actor):
|
||||
raise HTTPException(status_code=404, detail=f"Folder with id={folder_id} does not exist.")
|
||||
await server.source_manager.get_source_by_id(source_id=folder_id, actor=actor)
|
||||
return await server.source_manager.update_source(source_id=folder_id, source_update=folder, actor=actor)
|
||||
|
||||
|
||||
@@ -222,11 +219,9 @@ async def delete_folder(
|
||||
await server.remove_files_from_context_window(agent_state=agent_state, file_ids=file_ids, actor=actor)
|
||||
|
||||
if agent_state.enable_sleeptime:
|
||||
try:
|
||||
block = await server.agent_manager.get_block_with_label_async(agent_id=agent_state.id, block_label=folder.name, actor=actor)
|
||||
block = await server.agent_manager.get_block_with_label_async(agent_id=agent_state.id, block_label=folder.name, actor=actor)
|
||||
if block:
|
||||
await server.block_manager.delete_block_async(block.id, actor)
|
||||
except:
|
||||
pass
|
||||
await server.delete_source(source_id=folder_id, actor=actor)
|
||||
|
||||
|
||||
@@ -265,9 +260,8 @@ async def upload_file_to_folder(
|
||||
|
||||
# If still not allowed, reject with 415.
|
||||
if media_type not in allowed_media_types:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE,
|
||||
detail=(
|
||||
raise LettaUnsupportedFileUploadError(
|
||||
message=(
|
||||
f"Unsupported file type: {media_type or 'unknown'} "
|
||||
f"(filename: {file.filename}). "
|
||||
f"Supported types: PDF, text files (.txt, .md), JSON, and code files (.py, .js, .java, etc.)."
|
||||
@@ -277,8 +271,6 @@ async def upload_file_to_folder(
|
||||
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
|
||||
|
||||
folder = await server.source_manager.get_source_by_id(source_id=folder_id, actor=actor)
|
||||
if folder is None:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Folder with id={folder_id} not found.")
|
||||
|
||||
content = await file.read()
|
||||
|
||||
@@ -297,8 +289,9 @@ async def upload_file_to_folder(
|
||||
if existing_file:
|
||||
# Duplicate found, handle based on strategy
|
||||
if duplicate_handling == DuplicateFileHandling.ERROR:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_409_CONFLICT, detail=f"File '{original_filename}' already exists in folder '{folder.name}'"
|
||||
raise LettaInvalidArgumentError(
|
||||
message=f"File '{original_filename}' already exists in folder '{folder.name}'",
|
||||
argument_name="duplicate_handling",
|
||||
)
|
||||
elif duplicate_handling == DuplicateFileHandling.SKIP:
|
||||
# Return existing file metadata with custom header to indicate it was skipped
|
||||
@@ -528,8 +521,6 @@ async def delete_file_from_folder(
|
||||
await delete_file_records_from_pinecone_index(file_id=file_id, actor=actor)
|
||||
|
||||
safe_create_task(sleeptime_document_ingest_async(server, folder_id, actor, clear_history=True), label="document_ingest_after_delete")
|
||||
if deleted_file is None:
|
||||
raise HTTPException(status_code=404, detail=f"File with id={file_id} not found.")
|
||||
|
||||
|
||||
async def load_file_to_source_async(server: SyncServer, source_id: str, job_id: str, filename: str, bytes: bytes, actor: User):
|
||||
|
||||
@@ -9,6 +9,7 @@ from starlette import status
|
||||
from starlette.responses import Response
|
||||
|
||||
import letta.constants as constants
|
||||
from letta.errors import LettaInvalidArgumentError, LettaUnsupportedFileUploadError
|
||||
from letta.helpers.pinecone_utils import (
|
||||
delete_file_records_from_pinecone_index,
|
||||
delete_source_records_from_pinecone_index,
|
||||
@@ -66,10 +67,7 @@ async def retrieve_source(
|
||||
Get all sources
|
||||
"""
|
||||
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
|
||||
|
||||
source = await server.source_manager.get_source_by_id(source_id=source_id, actor=actor)
|
||||
if not source:
|
||||
raise HTTPException(status_code=404, detail=f"Source with id={source_id} not found.")
|
||||
return source
|
||||
|
||||
|
||||
@@ -85,8 +83,6 @@ async def get_source_id_by_name(
|
||||
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
|
||||
|
||||
source = await server.source_manager.get_source_by_name(source_name=source_name, actor=actor)
|
||||
if not source:
|
||||
raise HTTPException(status_code=404, detail=f"Source with name={source_name} not found.")
|
||||
return source.id
|
||||
|
||||
|
||||
@@ -138,8 +134,9 @@ async def create_source(
|
||||
if not source_create.embedding_config:
|
||||
if not source_create.embedding:
|
||||
if settings.default_embedding_handle is None:
|
||||
# TODO: modify error type
|
||||
raise ValueError("Must specify either embedding or embedding_config in request")
|
||||
raise LettaInvalidArgumentError(
|
||||
"Must specify either embedding or embedding_config in request", argument_name="default_embedding_handle"
|
||||
)
|
||||
else:
|
||||
source_create.embedding = settings.default_embedding_handle
|
||||
source_create.embedding_config = await server.get_embedding_config_from_handle_async(
|
||||
@@ -169,8 +166,7 @@ async def modify_source(
|
||||
"""
|
||||
# TODO: allow updating the handle/embedding config
|
||||
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
|
||||
if not await server.source_manager.get_source_by_id(source_id=source_id, actor=actor):
|
||||
raise HTTPException(status_code=404, detail=f"Source with id={source_id} does not exist.")
|
||||
await server.source_manager.get_source_by_id(source_id=source_id, actor=actor)
|
||||
return await server.source_manager.update_source(source_id=source_id, source_update=source, actor=actor)
|
||||
|
||||
|
||||
@@ -203,11 +199,9 @@ async def delete_source(
|
||||
await server.remove_files_from_context_window(agent_state=agent_state, file_ids=file_ids, actor=actor)
|
||||
|
||||
if agent_state.enable_sleeptime:
|
||||
try:
|
||||
block = await server.agent_manager.get_block_with_label_async(agent_id=agent_state.id, block_label=source.name, actor=actor)
|
||||
block = await server.agent_manager.get_block_with_label_async(agent_id=agent_state.id, block_label=source.name, actor=actor)
|
||||
if block:
|
||||
await server.block_manager.delete_block_async(block.id, actor)
|
||||
except:
|
||||
pass
|
||||
await server.delete_source(source_id=source_id, actor=actor)
|
||||
|
||||
|
||||
@@ -246,9 +240,8 @@ async def upload_file_to_source(
|
||||
|
||||
# If still not allowed, reject with 415.
|
||||
if media_type not in allowed_media_types:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE,
|
||||
detail=(
|
||||
raise LettaUnsupportedFileUploadError(
|
||||
message=(
|
||||
f"Unsupported file type: {media_type or 'unknown'} "
|
||||
f"(filename: {file.filename}). "
|
||||
f"Supported types: PDF, text files (.txt, .md), JSON, and code files (.py, .js, .java, etc.)."
|
||||
@@ -258,8 +251,6 @@ async def upload_file_to_source(
|
||||
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
|
||||
|
||||
source = await server.source_manager.get_source_by_id(source_id=source_id, actor=actor)
|
||||
if source is None:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Source with id={source_id} not found.")
|
||||
|
||||
content = await file.read()
|
||||
|
||||
@@ -278,8 +269,9 @@ async def upload_file_to_source(
|
||||
if existing_file:
|
||||
# Duplicate found, handle based on strategy
|
||||
if duplicate_handling == DuplicateFileHandling.ERROR:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_409_CONFLICT, detail=f"File '{original_filename}' already exists in source '{source.name}'"
|
||||
raise LettaInvalidArgumentError(
|
||||
message=f"File '{original_filename}' already exists in source '{source.name}'",
|
||||
argument_name="duplicate_handling",
|
||||
)
|
||||
elif duplicate_handling == DuplicateFileHandling.SKIP:
|
||||
# Return existing file metadata with custom header to indicate it was skipped
|
||||
@@ -410,13 +402,6 @@ async def get_file_metadata(
|
||||
file_id=file_id, actor=actor, include_content=include_content, strip_directory_prefix=True
|
||||
)
|
||||
|
||||
if not file_metadata:
|
||||
raise HTTPException(status_code=404, detail=f"File with id={file_id} not found.")
|
||||
|
||||
# Verify the file belongs to the specified source
|
||||
if file_metadata.source_id != source_id:
|
||||
raise HTTPException(status_code=404, detail=f"File with id={file_id} not found in source {source_id}.")
|
||||
|
||||
# Check and update file status (timeout check and pinecone embedding sync)
|
||||
file_metadata = await server.file_manager.check_and_update_file_status(file_metadata, actor)
|
||||
|
||||
@@ -452,8 +437,6 @@ async def delete_file_from_source(
|
||||
await delete_file_records_from_pinecone_index(file_id=file_id, actor=actor)
|
||||
|
||||
safe_create_task(sleeptime_document_ingest_async(server, source_id, actor, clear_history=True), label="document_ingest_after_delete")
|
||||
if deleted_file is None:
|
||||
raise HTTPException(status_code=404, detail=f"File with id={file_id} not found.")
|
||||
|
||||
|
||||
async def load_file_to_source_async(server: SyncServer, source_id: str, job_id: str, filename: str, bytes: bytes, actor: User):
|
||||
|
||||
@@ -761,8 +761,6 @@ class SyncServer(object):
|
||||
|
||||
# TODO: move this into a thread
|
||||
source = await self.source_manager.get_source_by_id(source_id=source_id)
|
||||
if source is None:
|
||||
raise NoResultFound(f"Source {source_id} does not exist")
|
||||
connector = DirectoryConnector(input_files=[file_path])
|
||||
num_passages, num_documents = await self.load_data(user_id=source.created_by_id, source_name=source.name, connector=connector)
|
||||
|
||||
|
||||
@@ -60,7 +60,11 @@ class FileManager:
|
||||
text: Optional[str] = None,
|
||||
) -> PydanticFileMetadata:
|
||||
# short-circuit if it already exists
|
||||
existing = await self.get_file_by_id(file_metadata.id, actor=actor)
|
||||
try:
|
||||
existing = await self.get_file_by_id(file_metadata.id, actor=actor)
|
||||
except NoResultFound:
|
||||
existing = None
|
||||
|
||||
if existing:
|
||||
return existing
|
||||
|
||||
@@ -105,35 +109,29 @@ class FileManager:
|
||||
lazy SELECT (avoids MissingGreenlet).
|
||||
"""
|
||||
async with db_registry.async_session() as session:
|
||||
try:
|
||||
if include_content:
|
||||
# explicit eager load
|
||||
query = (
|
||||
select(FileMetadataModel).where(FileMetadataModel.id == file_id).options(selectinload(FileMetadataModel.content))
|
||||
)
|
||||
# apply org-scoping if actor provided
|
||||
if actor:
|
||||
query = FileMetadataModel.apply_access_predicate(
|
||||
query,
|
||||
actor,
|
||||
access=["read"],
|
||||
access_type=AccessType.ORGANIZATION,
|
||||
)
|
||||
|
||||
result = await session.execute(query)
|
||||
file_orm = result.scalar_one()
|
||||
else:
|
||||
# fast path (metadata only)
|
||||
file_orm = await FileMetadataModel.read_async(
|
||||
db_session=session,
|
||||
identifier=file_id,
|
||||
actor=actor,
|
||||
if include_content:
|
||||
# explicit eager load
|
||||
query = select(FileMetadataModel).where(FileMetadataModel.id == file_id).options(selectinload(FileMetadataModel.content))
|
||||
# apply org-scoping if actor provided
|
||||
if actor:
|
||||
query = FileMetadataModel.apply_access_predicate(
|
||||
query,
|
||||
actor,
|
||||
access=["read"],
|
||||
access_type=AccessType.ORGANIZATION,
|
||||
)
|
||||
|
||||
return await file_orm.to_pydantic_async(include_content=include_content, strip_directory_prefix=strip_directory_prefix)
|
||||
result = await session.execute(query)
|
||||
file_orm = result.scalar_one()
|
||||
else:
|
||||
# fast path (metadata only)
|
||||
file_orm = await FileMetadataModel.read_async(
|
||||
db_session=session,
|
||||
identifier=file_id,
|
||||
actor=actor,
|
||||
)
|
||||
|
||||
except NoResultFound:
|
||||
return None
|
||||
return await file_orm.to_pydantic_async(include_content=include_content, strip_directory_prefix=strip_directory_prefix)
|
||||
|
||||
@enforce_types
|
||||
@trace_method
|
||||
|
||||
@@ -61,7 +61,11 @@ class SourceManager:
|
||||
@trace_method
|
||||
async def create_source(self, source: PydanticSource, actor: PydanticUser) -> PydanticSource:
|
||||
"""Create a new source based on the PydanticSource schema."""
|
||||
db_source = await self.get_source_by_id(source.id, actor=actor)
|
||||
try:
|
||||
db_source = await self.get_source_by_id(source.id, actor=actor)
|
||||
except NoResultFound:
|
||||
db_source = None
|
||||
|
||||
if db_source:
|
||||
return db_source
|
||||
else:
|
||||
@@ -346,11 +350,8 @@ class SourceManager:
|
||||
async def get_source_by_id(self, source_id: str, actor: Optional[PydanticUser] = None) -> Optional[PydanticSource]:
|
||||
"""Retrieve a source by its ID."""
|
||||
async with db_registry.async_session() as session:
|
||||
try:
|
||||
source = await SourceModel.read_async(db_session=session, identifier=source_id, actor=actor)
|
||||
return source.to_pydantic()
|
||||
except NoResultFound:
|
||||
return None
|
||||
source = await SourceModel.read_async(db_session=session, identifier=source_id, actor=actor)
|
||||
return source.to_pydantic()
|
||||
|
||||
@enforce_types
|
||||
@trace_method
|
||||
@@ -364,7 +365,7 @@ class SourceManager:
|
||||
limit=1,
|
||||
)
|
||||
if not sources:
|
||||
return None
|
||||
raise NoResultFound(f"Source with name={source_name} not found.")
|
||||
else:
|
||||
return sources[0].to_pydantic()
|
||||
|
||||
|
||||
@@ -2,6 +2,8 @@ import asyncio
|
||||
import re
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from sqlalchemy.exc import NoResultFound
|
||||
|
||||
from letta.constants import PINECONE_TEXT_FIELD_NAME
|
||||
from letta.functions.types import FileOpenRequest
|
||||
from letta.helpers.pinecone_utils import search_pinecone_index, should_use_pinecone
|
||||
@@ -389,9 +391,9 @@ class LettaFileToolExecutor(ToolExecutor):
|
||||
|
||||
for file_agent in file_agents:
|
||||
# Load file content
|
||||
file = await self.file_manager.get_file_by_id(file_id=file_agent.file_id, actor=self.actor, include_content=True)
|
||||
|
||||
if not file or not file.content:
|
||||
try:
|
||||
file = await self.file_manager.get_file_by_id(file_id=file_agent.file_id, actor=self.actor, include_content=True)
|
||||
except NoResultFound:
|
||||
files_skipped += 1
|
||||
self.logger.warning(f"Grep: Skipping file {file_agent.file_name} - no content available")
|
||||
continue
|
||||
|
||||
Reference in New Issue
Block a user