diff --git a/letta/server/rest_api/app.py b/letta/server/rest_api/app.py
index 4f08b380..a3584f0c 100644
--- a/letta/server/rest_api/app.py
+++ b/letta/server/rest_api/app.py
@@ -271,7 +271,7 @@ def create_application() -> "FastAPI":
         return JSONResponse(
             status_code=500,
             content={
-                "detail": "An internal server error occurred",
+                "detail": "An unknown error occurred",
                 # Only include error details in debug/development mode
                 # "debug_info": str(exc) if settings.debug else None
             },
diff --git a/letta/services/streaming_service.py b/letta/services/streaming_service.py
index 5c7b714c..b42878ad 100644
--- a/letta/services/streaming_service.py
+++ b/letta/services/streaming_service.py
@@ -31,6 +31,7 @@ from letta.schemas.letta_message import AssistantMessage, MessageType
 from letta.schemas.letta_message_content import TextContent
 from letta.schemas.letta_request import LettaStreamingRequest
 from letta.schemas.letta_response import LettaResponse
+from letta.schemas.letta_stop_reason import StopReasonType
 from letta.schemas.message import MessageCreate
 from letta.schemas.run import Run as PydanticRun, RunUpdate
 from letta.schemas.usage import LettaUsageStatistics
@@ -273,6 +274,10 @@ class StreamingService:
 
         async def error_aware_stream():
             """Stream that handles early LLM errors gracefully in streaming format."""
+            run_status = None
+            stop_reason = None
+            error_data = None
+
             try:
                 stream = agent_loop.stream(
                     input_messages=messages,
@@ -287,23 +292,22 @@ class StreamingService:
 
                 async for chunk in stream:
                     yield chunk
 
-                # update run status after completion
-                if run_id and self.runs_manager:
-                    if agent_loop.stop_reason.stop_reason.value == "cancelled":
-                        run_status = RunStatus.cancelled
-                    else:
-                        run_status = RunStatus.completed
-
-                    await self.runs_manager.update_run_by_id_async(
-                        run_id=run_id,
-                        update=RunUpdate(status=run_status, stop_reason=agent_loop.stop_reason.stop_reason.value),
-                        actor=actor,
-                    )
+                # Set run status after successful completion
+                if agent_loop.stop_reason.stop_reason.value == "cancelled":
+                    run_status = RunStatus.cancelled
+                else:
+                    run_status = RunStatus.completed
+                stop_reason = agent_loop.stop_reason.stop_reason.value
             except LLMTimeoutError as e:
+                run_status = RunStatus.failed
                 error_data = {"error": {"type": "llm_timeout", "message": "The LLM request timed out. Please try again.", "detail": str(e)}}
+                stop_reason = StopReasonType.llm_api_error
                 yield (f"data: {json.dumps(error_data)}\n\n", 504)
+                # Send [DONE] marker to properly close the stream
+                yield "data: [DONE]\n\n"
             except LLMRateLimitError as e:
+                run_status = RunStatus.failed
                 error_data = {
                     "error": {
                         "type": "llm_rate_limit",
@@ -311,8 +315,12 @@ class StreamingService:
                         "detail": str(e),
                     }
                 }
+                stop_reason = StopReasonType.llm_api_error
                 yield (f"data: {json.dumps(error_data)}\n\n", 429)
+                # Send [DONE] marker to properly close the stream
+                yield "data: [DONE]\n\n"
             except LLMAuthenticationError as e:
+                run_status = RunStatus.failed
                 error_data = {
                     "error": {
                         "type": "llm_authentication",
@@ -320,13 +328,38 @@ class StreamingService:
                         "detail": str(e),
                     }
                 }
+                stop_reason = StopReasonType.llm_api_error
                 yield (f"data: {json.dumps(error_data)}\n\n", 401)
+                # Send [DONE] marker to properly close the stream
+                yield "data: [DONE]\n\n"
             except LLMError as e:
+                run_status = RunStatus.failed
                 error_data = {"error": {"type": "llm_error", "message": "An error occurred with the LLM request.", "detail": str(e)}}
                 yield (f"data: {json.dumps(error_data)}\n\n", 502)
+                stop_reason = StopReasonType.llm_api_error
+                # Send [DONE] marker to properly close the stream
+                yield "data: [DONE]\n\n"
             except Exception as e:
-                error_data = {"error": {"type": "internal_error", "message": "An internal server error occurred.", "detail": str(e)}}
+                run_status = RunStatus.failed
+                error_data = {
+                    "error": {
+                        "type": "internal_error",
+                        "message": "An unknown error occurred with the LLM streaming request.",
+                        "detail": str(e),
+                    }
+                }
+                stop_reason = StopReasonType.error
                 yield (f"data: {json.dumps(error_data)}\n\n", 500)
+                # Re-raise to ensure proper error handling and Sentry capture
+                raise
+            finally:
+                # Always update run status, whether success or failure
+                if run_id and self.runs_manager and run_status:
+                    await self.runs_manager.update_run_by_id_async(
+                        run_id=run_id,
+                        update=RunUpdate(status=run_status, stop_reason=stop_reason, metadata=error_data),
+                        actor=actor,
+                    )
 
         return error_aware_stream()
 
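
For reference, here is a minimal client-side sketch of how a consumer might read the SSE stream that error_aware_stream produces: it surfaces the error events added above and stops on the [DONE] sentinel. The endpoint URL, request payload, and use of httpx are illustrative assumptions, not part of this diff or of Letta's documented API.

# sse_client_sketch.py -- illustrative only; the URL and payload shapes
# are assumptions, not Letta's documented API.
import json

import httpx


def consume_stream(url: str, payload: dict) -> None:
    # Read the response incrementally; SSE events arrive as "data: ..." lines.
    with httpx.stream("POST", url, json=payload, timeout=60.0) as response:
        for line in response.iter_lines():
            if not line.startswith("data: "):
                continue
            data = line[len("data: "):]
            if data == "[DONE]":
                # Sentinel the error paths above emit to close the stream cleanly
                break
            event = json.loads(data)
            if "error" in event:
                # Shape matches the handlers above, e.g.
                # {"error": {"type": "llm_timeout", "message": "...", "detail": "..."}}
                err = event["error"]
                print(f"stream error [{err['type']}]: {err['message']}")
                continue
            print(event)

Note that the (sse_string, status_code) tuples yielded by the error paths are a server-side detail; over the wire a client only sees the SSE text, which is why this sketch keys off the JSON "error" field and the [DONE] sentinel rather than HTTP status codes.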