feat: add error handling for background mode fetch [LET-4052] (#4210)

* feat: add error handling for background mode fetch

* improve redis error message

* fix typeerror

---------

Co-authored-by: Shubham Naik <shub@letta.com>
This commit is contained in:
cthomas
2025-08-26 14:33:29 -07:00
committed by GitHub
parent fb6ecfc575
commit cae23cbdd5
3 changed files with 34 additions and 2 deletions

View File

@@ -4,6 +4,7 @@ from typing import List, Optional
from pydantic import BaseModel, ConfigDict, Field
from letta.constants import DEFAULT_MESSAGE_TOOL, DEFAULT_MESSAGE_TOOL_KWARG
from letta.helpers.datetime_helpers import get_utc_time
from letta.schemas.enums import JobStatus, JobType
from letta.schemas.letta_base import OrmMetadataBase
from letta.schemas.letta_message import MessageType
@@ -12,6 +13,7 @@ from letta.schemas.letta_message import MessageType
class JobBase(OrmMetadataBase):
__id_prefix__ = "job"
status: JobStatus = Field(default=JobStatus.created, description="The status of the job.")
created_at: datetime = Field(default_factory=get_utc_time, description="The unix timestamp of when the job was created.")
completed_at: Optional[datetime] = Field(None, description="The unix timestamp of when the job was completed.")
metadata: Optional[dict] = Field(None, validation_alias="metadata_", description="The metadata of the job.")
job_type: JobType = Field(default=JobType.JOB, description="The type of the job.")

View File

@@ -1268,7 +1268,11 @@ async def send_message_streaming(
if isinstance(redis_client, NoopAsyncRedisClient):
raise HTTPException(
status_code=503,
detail="Background streaming is not available: Redis is not configured. Please ensure Redis is properly configured and running.",
detail=(
"Background streaming requires Redis to be running. "
"Please ensure Redis is properly configured. "
f"LETTA_REDIS_HOST: {settings.redis_host}, LETTA_REDIS_PORT: {settings.redis_port}"
),
)
if request.stream_tokens and model_compatible_token_streaming:

View File

@@ -1,9 +1,11 @@
from datetime import timedelta
from typing import Annotated, List, Optional
from fastapi import APIRouter, Body, Depends, Header, HTTPException, Query
from pydantic import Field
from letta.data_sources.redis_client import get_redis_client
from letta.data_sources.redis_client import NoopAsyncRedisClient, get_redis_client
from letta.helpers.datetime_helpers import get_utc_time
from letta.orm.errors import NoResultFound
from letta.schemas.enums import JobStatus, JobType, MessageRole
from letta.schemas.letta_message import LettaMessageUnion
@@ -260,8 +262,32 @@ async def retrieve_stream(
actor_id: Optional[str] = Header(None, alias="user_id"),
server: "SyncServer" = Depends(get_letta_server),
):
actor = await server.user_manager.get_actor_or_default_async(actor_id=actor_id)
try:
job = server.job_manager.get_job_by_id(job_id=run_id, actor=actor)
except NoResultFound:
raise HTTPException(status_code=404, detail="Run not found")
run = Run.from_job(job)
if "background" not in run.metadata or not run.metadata["background"]:
raise HTTPException(status_code=400, detail="Run was not created in background mode, so it cannot be retrieved.")
if run.created_at < get_utc_time() - timedelta(hours=3):
raise HTTPException(status_code=410, detail="Run was created more than 3 hours ago, and is now expired.")
redis_client = await get_redis_client()
if isinstance(redis_client, NoopAsyncRedisClient):
raise HTTPException(
status_code=503,
detail=(
"Background streaming requires Redis to be running. "
"Please ensure Redis is properly configured. "
f"LETTA_REDIS_HOST: {settings.redis_host}, LETTA_REDIS_PORT: {settings.redis_port}"
),
)
stream = redis_sse_stream_generator(
redis_client=redis_client,
run_id=run_id,