diff --git a/letta/errors.py b/letta/errors.py index 082fe24a..d1962314 100644 --- a/letta/errors.py +++ b/letta/errors.py @@ -19,6 +19,7 @@ class ErrorCode(Enum): RATE_LIMIT_EXCEEDED = "RATE_LIMIT_EXCEEDED" TIMEOUT = "TIMEOUT" CONFLICT = "CONFLICT" + EXPIRED = "EXPIRED" class LettaError(Exception): @@ -141,6 +142,13 @@ class LettaUnexpectedStreamCancellationError(LettaError): """Error raised when a streaming request is terminated unexpectedly.""" +class LettaExpiredError(LettaError): + """Error raised when a resource has expired.""" + + def __init__(self, message: str): + super().__init__(message=message, code=ErrorCode.EXPIRED) + + class LLMError(LettaError): pass diff --git a/letta/server/rest_api/app.py b/letta/server/rest_api/app.py index 4761d76a..72ff96f4 100644 --- a/letta/server/rest_api/app.py +++ b/letta/server/rest_api/app.py @@ -26,6 +26,7 @@ from letta.errors import ( AgentNotFoundForExportError, BedrockPermissionError, LettaAgentNotFoundError, + LettaExpiredError, LettaInvalidArgumentError, LettaInvalidMCPSchemaError, LettaMCPConnectionError, @@ -242,6 +243,7 @@ def create_application() -> "FastAPI": _error_handler_404_user = partial(_error_handler_404, detail="User not found") _error_handler_408 = partial(error_handler_with_code, code=408) _error_handler_409 = partial(error_handler_with_code, code=409) + _error_handler_410 = partial(error_handler_with_code, code=410) _error_handler_422 = partial(error_handler_with_code, code=422) _error_handler_500 = partial(error_handler_with_code, code=500) _error_handler_503 = partial(error_handler_with_code, code=503) @@ -259,6 +261,9 @@ def create_application() -> "FastAPI": app.add_exception_handler(LettaUserNotFoundError, _error_handler_404_user) app.add_exception_handler(AgentNotFoundForExportError, _error_handler_404) + # 410 Expired errors + app.add_exception_handler(LettaExpiredError, _error_handler_410) + # 408 Timeout errors app.add_exception_handler(LettaMCPTimeoutError, _error_handler_408) app.add_exception_handler(LettaInvalidMCPSchemaError, _error_handler_400) diff --git a/letta/server/rest_api/routers/v1/jobs.py b/letta/server/rest_api/routers/v1/jobs.py index 7aeccfaa..19062455 100644 --- a/letta/server/rest_api/routers/v1/jobs.py +++ b/letta/server/rest_api/routers/v1/jobs.py @@ -1,8 +1,8 @@ from typing import List, Literal, Optional -from fastapi import APIRouter, Depends, HTTPException, Query +from fastapi import APIRouter, Depends, Query -from letta.orm.errors import NoResultFound +from letta.errors import LettaInvalidArgumentError from letta.schemas.enums import JobStatus from letta.schemas.job import Job from letta.server.rest_api.dependencies import HeaderParams, get_headers, get_letta_server @@ -96,11 +96,7 @@ async def retrieve_job( Get the status of a job. """ actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) - - try: - return await server.job_manager.get_job_by_id_async(job_id=job_id, actor=actor) - except NoResultFound: - raise HTTPException(status_code=404, detail="Job not found") + return await server.job_manager.get_job_by_id_async(job_id=job_id, actor=actor) @router.patch("/{job_id}/cancel", response_model=Job, operation_id="cancel_job") @@ -117,19 +113,15 @@ async def cancel_job( """ actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) if not settings.track_agent_run: - raise HTTPException(status_code=400, detail="Agent run tracking is disabled") + raise LettaInvalidArgumentError("Agent run tracking is disabled") - try: - # First check if the job exists and is in a cancellable state - existing_job = await server.job_manager.get_job_by_id_async(job_id=job_id, actor=actor) + # First check if the job exists and is in a cancellable state + existing_job = await server.job_manager.get_job_by_id_async(job_id=job_id, actor=actor) - if existing_job.status.is_terminal: - return False + if existing_job.status.is_terminal: + return False - return await server.job_manager.safe_update_job_status_async(job_id=job_id, new_status=JobStatus.cancelled, actor=actor) - - except NoResultFound: - raise HTTPException(status_code=404, detail="Job not found") + return await server.job_manager.safe_update_job_status_async(job_id=job_id, new_status=JobStatus.cancelled, actor=actor) @router.delete("/{job_id}", response_model=Job, operation_id="delete_job") @@ -142,9 +134,4 @@ async def delete_job( Delete a job by its job_id. """ actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) - - try: - job = await server.job_manager.delete_job_by_id_async(job_id=job_id, actor=actor) - return job - except NoResultFound: - raise HTTPException(status_code=404, detail="Job not found") + return await server.job_manager.delete_job_by_id_async(job_id=job_id, actor=actor) diff --git a/letta/server/rest_api/routers/v1/runs.py b/letta/server/rest_api/routers/v1/runs.py index 4348724e..bb34706c 100644 --- a/letta/server/rest_api/routers/v1/runs.py +++ b/letta/server/rest_api/routers/v1/runs.py @@ -5,8 +5,8 @@ from fastapi import APIRouter, Body, Depends, HTTPException, Query from pydantic import Field from letta.data_sources.redis_client import NoopAsyncRedisClient, get_redis_client +from letta.errors import LettaExpiredError, LettaInvalidArgumentError from letta.helpers.datetime_helpers import get_utc_time -from letta.orm.errors import NoResultFound from letta.schemas.enums import RunStatus from letta.schemas.letta_message import LettaMessageUnion from letta.schemas.letta_request import RetrieveStreamRequest @@ -151,28 +151,25 @@ async def retrieve_run( actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) runs_manager = RunManager() - try: - run = await runs_manager.get_run_by_id(run_id=run_id, actor=actor) + run = await runs_manager.get_run_by_id(run_id=run_id, actor=actor) - use_lettuce = run.metadata and run.metadata.get("lettuce") - if use_lettuce and run.status not in [RunStatus.completed, RunStatus.failed, RunStatus.cancelled]: - lettuce_client = await LettuceClient.create() - status = await lettuce_client.get_status(run_id=run_id) + use_lettuce = run.metadata and run.metadata.get("lettuce") + if use_lettuce and run.status not in [RunStatus.completed, RunStatus.failed, RunStatus.cancelled]: + lettuce_client = await LettuceClient.create() + status = await lettuce_client.get_status(run_id=run_id) - # Map the status to our enum - run_status = run.status - if status == "RUNNING": - run_status = RunStatus.running - elif status == "COMPLETED": - run_status = RunStatus.completed - elif status == "FAILED": - run_status = RunStatus.failed - elif status == "CANCELLED": - run_status = RunStatus.cancelled - run.status = run_status - return run - except NoResultFound: - raise HTTPException(status_code=404, detail="Run not found") + # Map the status to our enum + run_status = run.status + if status == "RUNNING": + run_status = RunStatus.running + elif status == "COMPLETED": + run_status = RunStatus.completed + elif status == "FAILED": + run_status = RunStatus.failed + elif status == "CANCELLED": + run_status = RunStatus.cancelled + run.status = run_status + return run RunMessagesResponse = Annotated[ @@ -218,11 +215,7 @@ async def retrieve_run_usage( actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) runs_manager = RunManager() - try: - usage = await runs_manager.get_run_usage(run_id=run_id, actor=actor) - return usage - except NoResultFound: - raise HTTPException(status_code=404, detail=f"Run '{run_id}' not found") + return await runs_manager.get_run_usage(run_id=run_id, actor=actor) @router.get("/{run_id}/metrics", response_model=RunMetrics, operation_id="retrieve_metrics_for_run") @@ -234,12 +227,9 @@ async def retrieve_metrics_for_run( """ Get run metrics by run ID. """ - try: - actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) - runs_manager = RunManager() - return await runs_manager.get_run_metrics_async(run_id=run_id, actor=actor) - except NoResultFound: - raise HTTPException(status_code=404, detail="Run metrics not found") + actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) + runs_manager = RunManager() + return await runs_manager.get_run_metrics_async(run_id=run_id, actor=actor) @router.get( @@ -330,16 +320,13 @@ async def retrieve_stream( actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) runs_manager = RunManager() - try: - run = await runs_manager.get_run_by_id(run_id=run_id, actor=actor) - except NoResultFound: - raise HTTPException(status_code=404, detail="Run not found") + run = await runs_manager.get_run_by_id(run_id=run_id, actor=actor) if not run.background: - raise HTTPException(status_code=400, detail="Run was not created in background mode, so it cannot be retrieved.") + raise LettaInvalidArgumentError("Run was not created in background mode, so it cannot be retrieved.") if run.created_at < get_utc_time() - timedelta(hours=3): - raise HTTPException(status_code=410, detail="Run was created more than 3 hours ago, and is now expired.") + raise LettaExpiredError("Run was created more than 3 hours ago, and is now expired.") redis_client = await get_redis_client()