runs and jobs [LET-4634] (#5427)

runs and jobs

Co-authored-by: Ari Webb <ari@letta.com>
This commit is contained in:
Ari Webb
2025-10-14 14:14:00 -07:00
committed by Caren Thomas
parent 9e94c344b8
commit 4431b06881
4 changed files with 48 additions and 61 deletions

View File

@@ -19,6 +19,7 @@ class ErrorCode(Enum):
RATE_LIMIT_EXCEEDED = "RATE_LIMIT_EXCEEDED"
TIMEOUT = "TIMEOUT"
CONFLICT = "CONFLICT"
EXPIRED = "EXPIRED"
class LettaError(Exception):
@@ -141,6 +142,13 @@ class LettaUnexpectedStreamCancellationError(LettaError):
"""Error raised when a streaming request is terminated unexpectedly."""
class LettaExpiredError(LettaError):
"""Error raised when a resource has expired."""
def __init__(self, message: str):
super().__init__(message=message, code=ErrorCode.EXPIRED)
class LLMError(LettaError):
pass

View File

@@ -26,6 +26,7 @@ from letta.errors import (
AgentNotFoundForExportError,
BedrockPermissionError,
LettaAgentNotFoundError,
LettaExpiredError,
LettaInvalidArgumentError,
LettaInvalidMCPSchemaError,
LettaMCPConnectionError,
@@ -242,6 +243,7 @@ def create_application() -> "FastAPI":
_error_handler_404_user = partial(_error_handler_404, detail="User not found")
_error_handler_408 = partial(error_handler_with_code, code=408)
_error_handler_409 = partial(error_handler_with_code, code=409)
_error_handler_410 = partial(error_handler_with_code, code=410)
_error_handler_422 = partial(error_handler_with_code, code=422)
_error_handler_500 = partial(error_handler_with_code, code=500)
_error_handler_503 = partial(error_handler_with_code, code=503)
@@ -259,6 +261,9 @@ def create_application() -> "FastAPI":
app.add_exception_handler(LettaUserNotFoundError, _error_handler_404_user)
app.add_exception_handler(AgentNotFoundForExportError, _error_handler_404)
# 410 Expired errors
app.add_exception_handler(LettaExpiredError, _error_handler_410)
# 408 Timeout errors
app.add_exception_handler(LettaMCPTimeoutError, _error_handler_408)
app.add_exception_handler(LettaInvalidMCPSchemaError, _error_handler_400)

View File

@@ -1,8 +1,8 @@
from typing import List, Literal, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi import APIRouter, Depends, Query
from letta.orm.errors import NoResultFound
from letta.errors import LettaInvalidArgumentError
from letta.schemas.enums import JobStatus
from letta.schemas.job import Job
from letta.server.rest_api.dependencies import HeaderParams, get_headers, get_letta_server
@@ -96,11 +96,7 @@ async def retrieve_job(
Get the status of a job.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
try:
return await server.job_manager.get_job_by_id_async(job_id=job_id, actor=actor)
except NoResultFound:
raise HTTPException(status_code=404, detail="Job not found")
return await server.job_manager.get_job_by_id_async(job_id=job_id, actor=actor)
@router.patch("/{job_id}/cancel", response_model=Job, operation_id="cancel_job")
@@ -117,19 +113,15 @@ async def cancel_job(
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
if not settings.track_agent_run:
raise HTTPException(status_code=400, detail="Agent run tracking is disabled")
raise LettaInvalidArgumentError("Agent run tracking is disabled")
try:
# First check if the job exists and is in a cancellable state
existing_job = await server.job_manager.get_job_by_id_async(job_id=job_id, actor=actor)
# First check if the job exists and is in a cancellable state
existing_job = await server.job_manager.get_job_by_id_async(job_id=job_id, actor=actor)
if existing_job.status.is_terminal:
return False
if existing_job.status.is_terminal:
return False
return await server.job_manager.safe_update_job_status_async(job_id=job_id, new_status=JobStatus.cancelled, actor=actor)
except NoResultFound:
raise HTTPException(status_code=404, detail="Job not found")
return await server.job_manager.safe_update_job_status_async(job_id=job_id, new_status=JobStatus.cancelled, actor=actor)
@router.delete("/{job_id}", response_model=Job, operation_id="delete_job")
@@ -142,9 +134,4 @@ async def delete_job(
Delete a job by its job_id.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
try:
job = await server.job_manager.delete_job_by_id_async(job_id=job_id, actor=actor)
return job
except NoResultFound:
raise HTTPException(status_code=404, detail="Job not found")
return await server.job_manager.delete_job_by_id_async(job_id=job_id, actor=actor)

View File

@@ -5,8 +5,8 @@ from fastapi import APIRouter, Body, Depends, HTTPException, Query
from pydantic import Field
from letta.data_sources.redis_client import NoopAsyncRedisClient, get_redis_client
from letta.errors import LettaExpiredError, LettaInvalidArgumentError
from letta.helpers.datetime_helpers import get_utc_time
from letta.orm.errors import NoResultFound
from letta.schemas.enums import RunStatus
from letta.schemas.letta_message import LettaMessageUnion
from letta.schemas.letta_request import RetrieveStreamRequest
@@ -151,28 +151,25 @@ async def retrieve_run(
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
runs_manager = RunManager()
try:
run = await runs_manager.get_run_by_id(run_id=run_id, actor=actor)
run = await runs_manager.get_run_by_id(run_id=run_id, actor=actor)
use_lettuce = run.metadata and run.metadata.get("lettuce")
if use_lettuce and run.status not in [RunStatus.completed, RunStatus.failed, RunStatus.cancelled]:
lettuce_client = await LettuceClient.create()
status = await lettuce_client.get_status(run_id=run_id)
use_lettuce = run.metadata and run.metadata.get("lettuce")
if use_lettuce and run.status not in [RunStatus.completed, RunStatus.failed, RunStatus.cancelled]:
lettuce_client = await LettuceClient.create()
status = await lettuce_client.get_status(run_id=run_id)
# Map the status to our enum
run_status = run.status
if status == "RUNNING":
run_status = RunStatus.running
elif status == "COMPLETED":
run_status = RunStatus.completed
elif status == "FAILED":
run_status = RunStatus.failed
elif status == "CANCELLED":
run_status = RunStatus.cancelled
run.status = run_status
return run
except NoResultFound:
raise HTTPException(status_code=404, detail="Run not found")
# Map the status to our enum
run_status = run.status
if status == "RUNNING":
run_status = RunStatus.running
elif status == "COMPLETED":
run_status = RunStatus.completed
elif status == "FAILED":
run_status = RunStatus.failed
elif status == "CANCELLED":
run_status = RunStatus.cancelled
run.status = run_status
return run
RunMessagesResponse = Annotated[
@@ -218,11 +215,7 @@ async def retrieve_run_usage(
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
runs_manager = RunManager()
try:
usage = await runs_manager.get_run_usage(run_id=run_id, actor=actor)
return usage
except NoResultFound:
raise HTTPException(status_code=404, detail=f"Run '{run_id}' not found")
return await runs_manager.get_run_usage(run_id=run_id, actor=actor)
@router.get("/{run_id}/metrics", response_model=RunMetrics, operation_id="retrieve_metrics_for_run")
@@ -234,12 +227,9 @@ async def retrieve_metrics_for_run(
"""
Get run metrics by run ID.
"""
try:
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
runs_manager = RunManager()
return await runs_manager.get_run_metrics_async(run_id=run_id, actor=actor)
except NoResultFound:
raise HTTPException(status_code=404, detail="Run metrics not found")
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
runs_manager = RunManager()
return await runs_manager.get_run_metrics_async(run_id=run_id, actor=actor)
@router.get(
@@ -330,16 +320,13 @@ async def retrieve_stream(
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
runs_manager = RunManager()
try:
run = await runs_manager.get_run_by_id(run_id=run_id, actor=actor)
except NoResultFound:
raise HTTPException(status_code=404, detail="Run not found")
run = await runs_manager.get_run_by_id(run_id=run_id, actor=actor)
if not run.background:
raise HTTPException(status_code=400, detail="Run was not created in background mode, so it cannot be retrieved.")
raise LettaInvalidArgumentError("Run was not created in background mode, so it cannot be retrieved.")
if run.created_at < get_utc_time() - timedelta(hours=3):
raise HTTPException(status_code=410, detail="Run was created more than 3 hours ago, and is now expired.")
raise LettaExpiredError("Run was created more than 3 hours ago, and is now expired.")
redis_client = await get_redis_client()