From cae23cbdd5d223c13deb165a5de27ed64bb764e8 Mon Sep 17 00:00:00 2001 From: cthomas Date: Tue, 26 Aug 2025 14:33:29 -0700 Subject: [PATCH] feat: add error handling for background mode fetch [LET-4052] (#4210) * feat: add error handling for background mode fetch * improve redis error message * fix typeerror --------- Co-authored-by: Shubham Naik --- letta/schemas/job.py | 2 ++ letta/server/rest_api/routers/v1/agents.py | 6 ++++- letta/server/rest_api/routers/v1/runs.py | 28 +++++++++++++++++++++- 3 files changed, 34 insertions(+), 2 deletions(-) diff --git a/letta/schemas/job.py b/letta/schemas/job.py index 645b332d..257917a0 100644 --- a/letta/schemas/job.py +++ b/letta/schemas/job.py @@ -4,6 +4,7 @@ from typing import List, Optional from pydantic import BaseModel, ConfigDict, Field from letta.constants import DEFAULT_MESSAGE_TOOL, DEFAULT_MESSAGE_TOOL_KWARG +from letta.helpers.datetime_helpers import get_utc_time from letta.schemas.enums import JobStatus, JobType from letta.schemas.letta_base import OrmMetadataBase from letta.schemas.letta_message import MessageType @@ -12,6 +13,7 @@ from letta.schemas.letta_message import MessageType class JobBase(OrmMetadataBase): __id_prefix__ = "job" status: JobStatus = Field(default=JobStatus.created, description="The status of the job.") + created_at: datetime = Field(default_factory=get_utc_time, description="The unix timestamp of when the job was created.") completed_at: Optional[datetime] = Field(None, description="The unix timestamp of when the job was completed.") metadata: Optional[dict] = Field(None, validation_alias="metadata_", description="The metadata of the job.") job_type: JobType = Field(default=JobType.JOB, description="The type of the job.") diff --git a/letta/server/rest_api/routers/v1/agents.py b/letta/server/rest_api/routers/v1/agents.py index 084f2345..d849ba9a 100644 --- a/letta/server/rest_api/routers/v1/agents.py +++ b/letta/server/rest_api/routers/v1/agents.py @@ -1268,7 +1268,11 @@ async def send_message_streaming( if isinstance(redis_client, NoopAsyncRedisClient): raise HTTPException( status_code=503, - detail="Background streaming is not available: Redis is not configured. Please ensure Redis is properly configured and running.", + detail=( + "Background streaming requires Redis to be running. " + "Please ensure Redis is properly configured. " + f"LETTA_REDIS_HOST: {settings.redis_host}, LETTA_REDIS_PORT: {settings.redis_port}" + ), ) if request.stream_tokens and model_compatible_token_streaming: diff --git a/letta/server/rest_api/routers/v1/runs.py b/letta/server/rest_api/routers/v1/runs.py index 8b283bf3..3ac34c3d 100644 --- a/letta/server/rest_api/routers/v1/runs.py +++ b/letta/server/rest_api/routers/v1/runs.py @@ -1,9 +1,11 @@ +from datetime import timedelta from typing import Annotated, List, Optional from fastapi import APIRouter, Body, Depends, Header, HTTPException, Query from pydantic import Field -from letta.data_sources.redis_client import get_redis_client +from letta.data_sources.redis_client import NoopAsyncRedisClient, get_redis_client +from letta.helpers.datetime_helpers import get_utc_time from letta.orm.errors import NoResultFound from letta.schemas.enums import JobStatus, JobType, MessageRole from letta.schemas.letta_message import LettaMessageUnion @@ -260,8 +262,32 @@ async def retrieve_stream( actor_id: Optional[str] = Header(None, alias="user_id"), server: "SyncServer" = Depends(get_letta_server), ): + actor = await server.user_manager.get_actor_or_default_async(actor_id=actor_id) + try: + job = server.job_manager.get_job_by_id(job_id=run_id, actor=actor) + except NoResultFound: + raise HTTPException(status_code=404, detail="Run not found") + + run = Run.from_job(job) + + if "background" not in run.metadata or not run.metadata["background"]: + raise HTTPException(status_code=400, detail="Run was not created in background mode, so it cannot be retrieved.") + + if run.created_at < get_utc_time() - timedelta(hours=3): + raise HTTPException(status_code=410, detail="Run was created more than 3 hours ago, and is now expired.") + redis_client = await get_redis_client() + if isinstance(redis_client, NoopAsyncRedisClient): + raise HTTPException( + status_code=503, + detail=( + "Background streaming requires Redis to be running. " + "Please ensure Redis is properly configured. " + f"LETTA_REDIS_HOST: {settings.redis_host}, LETTA_REDIS_PORT: {settings.redis_port}" + ), + ) + stream = redis_sse_stream_generator( redis_client=redis_client, run_id=run_id,