fix: send [DONE] for known LLM errors and re-raise unknown errors in stream (#5805)

This commit is contained in:
Sarah Wooders
2025-10-28 21:50:18 -07:00
committed by Caren Thomas
parent 7f128544d7
commit 655c9489d8
2 changed files with 48 additions and 14 deletions

View File

@@ -271,7 +271,7 @@ def create_application() -> "FastAPI":
return JSONResponse(
status_code=500,
content={
"detail": "An internal server error occurred",
"detail": "An unknown error occurred",
# Only include error details in debug/development mode
# "debug_info": str(exc) if settings.debug else None
},

View File

@@ -31,6 +31,7 @@ from letta.schemas.letta_message import AssistantMessage, MessageType
from letta.schemas.letta_message_content import TextContent
from letta.schemas.letta_request import LettaStreamingRequest
from letta.schemas.letta_response import LettaResponse
from letta.schemas.letta_stop_reason import LettaStopReason, StopReasonType
from letta.schemas.message import MessageCreate
from letta.schemas.run import Run as PydanticRun, RunUpdate
from letta.schemas.usage import LettaUsageStatistics
@@ -273,6 +274,11 @@ class StreamingService:
async def error_aware_stream():
"""Stream that handles early LLM errors gracefully in streaming format."""
run_status = None
run_update_metadata = None
stop_reason = None
error_data = None
try:
stream = agent_loop.stream(
input_messages=messages,
@@ -287,23 +293,22 @@ class StreamingService:
async for chunk in stream:
yield chunk
# update run status after completion
if run_id and self.runs_manager:
if agent_loop.stop_reason.stop_reason.value == "cancelled":
run_status = RunStatus.cancelled
else:
run_status = RunStatus.completed
await self.runs_manager.update_run_by_id_async(
run_id=run_id,
update=RunUpdate(status=run_status, stop_reason=agent_loop.stop_reason.stop_reason.value),
actor=actor,
)
# set run status after successful completion
if agent_loop.stop_reason.stop_reason.value == "cancelled":
run_status = RunStatus.cancelled
else:
run_status = RunStatus.completed
stop_reason = agent_loop.stop_reason.stop_reason.value
except LLMTimeoutError as e:
run_status = RunStatus.failed
error_data = {"error": {"type": "llm_timeout", "message": "The LLM request timed out. Please try again.", "detail": str(e)}}
stop_reason = StopReasonType.llm_api_error
yield (f"data: {json.dumps(error_data)}\n\n", 504)
# Send [DONE] marker to properly close the stream
yield "data: [DONE]\n\n"
except LLMRateLimitError as e:
run_status = RunStatus.failed
error_data = {
"error": {
"type": "llm_rate_limit",
@@ -311,8 +316,12 @@ class StreamingService:
"detail": str(e),
}
}
stop_reason = StopReasonType.llm_api_error
yield (f"data: {json.dumps(error_data)}\n\n", 429)
# Send [DONE] marker to properly close the stream
yield "data: [DONE]\n\n"
except LLMAuthenticationError as e:
run_status = RunStatus.failed
error_data = {
"error": {
"type": "llm_authentication",
@@ -320,13 +329,38 @@ class StreamingService:
"detail": str(e),
}
}
stop_reason = StopReasonType.llm_api_error
yield (f"data: {json.dumps(error_data)}\n\n", 401)
# Send [DONE] marker to properly close the stream
yield "data: [DONE]\n\n"
except LLMError as e:
run_status = RunStatus.failed
error_data = {"error": {"type": "llm_error", "message": "An error occurred with the LLM request.", "detail": str(e)}}
yield (f"data: {json.dumps(error_data)}\n\n", 502)
# Send [DONE] marker to properly close the stream
stop_reason = StopReasonType.llm_api_error
yield "data: [DONE]\n\n"
except Exception as e:
error_data = {"error": {"type": "internal_error", "message": "An internal server error occurred.", "detail": str(e)}}
run_status = RunStatus.failed
error_data = {
"error": {
"type": "internal_error",
"message": "An unknown error occurred with the LLM streaming request.",
"detail": str(e),
}
}
stop_reason = StopReasonType.error
yield (f"data: {json.dumps(error_data)}\n\n", 500)
# Re-raise to ensure proper error handling and Sentry capture
raise
finally:
# always update run status, whether success or failure
if run_id and self.runs_manager and run_status:
await self.runs_manager.update_run_by_id_async(
run_id=run_id,
update=RunUpdate(status=run_status, stop_reason=stop_reason, metadata=error_data),
actor=actor,
)
return error_aware_stream()