From 468b47bef53ccb61395766e49ad84550f6b57b24 Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Sun, 2 Nov 2025 20:38:37 -0800 Subject: [PATCH] fix(core): patch sse streaming errors (#5906) * fix: patch sse streaming errors * fix: don't re-raise, but log explicitly with sentry * chore: cleanup comments * fix: revert change from #5907, also make sure to write out a [DONE] to close the stream --- letta/agents/letta_agent_v3.py | 17 ++++++++++++++--- letta/server/rest_api/redis_stream_manager.py | 3 ++- letta/services/streaming_service.py | 19 +++++++++++-------- 3 files changed, 27 insertions(+), 12 deletions(-) diff --git a/letta/agents/letta_agent_v3.py b/letta/agents/letta_agent_v3.py index 3e64228a..4a340e22 100644 --- a/letta/agents/letta_agent_v3.py +++ b/letta/agents/letta_agent_v3.py @@ -283,8 +283,8 @@ class LettaAgentV3(LettaAgentV2): # Clear to avoid duplication in next iteration self.response_messages = [] - # if not self.should_continue: - # break + if not self.should_continue: + break input_messages_to_persist = [] @@ -305,7 +305,18 @@ class LettaAgentV3(LettaAgentV2): except Exception as e: self.logger.warning(f"Error during agent stream: {e}", exc_info=True) if first_chunk: - raise # only raise if first chunk has not been streamed yet + # Raise if no chunks sent yet (response not started, can return error status code) + raise + else: + # Mid-stream error: yield error event to client in SSE format + error_chunk = { + "error": { + "type": "internal_error", + "message": "An error occurred during agent execution.", + "detail": str(e), + } + } + yield f"event: error\ndata: {json.dumps(error_chunk)}\n\n" if run_id: letta_messages = Message.to_letta_messages_from_list( diff --git a/letta/server/rest_api/redis_stream_manager.py b/letta/server/rest_api/redis_stream_manager.py index 06a4cac9..17dcd09d 100644 --- a/letta/server/rest_api/redis_stream_manager.py +++ b/letta/server/rest_api/redis_stream_manager.py @@ -247,7 +247,8 @@ async def create_background_stream_processor( # Handle cancellation gracefully - don't write error chunk, cancellation event was already sent logger.info(f"Stream processing stopped due to cancellation for run {run_id}") # The cancellation event was already yielded by cancellation_aware_stream_wrapper - # Just mark as complete, don't write additional error chunks + # Write [DONE] marker to properly close the stream for clients reading from Redis + await writer.write_chunk(run_id=run_id, data="data: [DONE]\n\n", is_complete=True) except Exception as e: logger.error(f"Error processing stream for run {run_id}: {e}") # Write error chunk diff --git a/letta/services/streaming_service.py b/letta/services/streaming_service.py index 373af5e8..d5291aae 100644 --- a/letta/services/streaming_service.py +++ b/letta/services/streaming_service.py @@ -42,6 +42,7 @@ from letta.server.rest_api.streaming_response import ( add_keepalive_to_stream, cancellation_aware_stream_wrapper, ) +from letta.server.rest_api.utils import capture_sentry_exception from letta.services.run_manager import RunManager from letta.settings import settings from letta.utils import safe_create_task @@ -327,7 +328,7 @@ class StreamingService: error_data = {"error": {"type": "llm_timeout", "message": "The LLM request timed out. Please try again.", "detail": str(e)}} stop_reason = StopReasonType.llm_api_error logger.error(f"Run {run_id} stopped with LLM timeout error: {e}, error_data: {error_data}") - yield (f"data: {json.dumps(error_data)}\n\n", 504) + yield f"event: error\ndata: {json.dumps(error_data)}\n\n" # Send [DONE] marker to properly close the stream yield "data: [DONE]\n\n" except LLMRateLimitError as e: @@ -341,7 +342,7 @@ class StreamingService: } stop_reason = StopReasonType.llm_api_error logger.warning(f"Run {run_id} stopped with LLM rate limit error: {e}, error_data: {error_data}") - yield (f"data: {json.dumps(error_data)}\n\n", 429) + yield f"event: error\ndata: {json.dumps(error_data)}\n\n" # Send [DONE] marker to properly close the stream yield "data: [DONE]\n\n" except LLMAuthenticationError as e: @@ -355,16 +356,16 @@ class StreamingService: } logger.warning(f"Run {run_id} stopped with LLM authentication error: {e}, error_data: {error_data}") stop_reason = StopReasonType.llm_api_error - yield (f"data: {json.dumps(error_data)}\n\n", 401) + yield f"event: error\ndata: {json.dumps(error_data)}\n\n" # Send [DONE] marker to properly close the stream yield "data: [DONE]\n\n" except LLMError as e: run_status = RunStatus.failed error_data = {"error": {"type": "llm_error", "message": "An error occurred with the LLM request.", "detail": str(e)}} logger.error(f"Run {run_id} stopped with LLM error: {e}, error_data: {error_data}") - yield (f"data: {json.dumps(error_data)}\n\n", 502) - # Send [DONE] marker to properly close the stream stop_reason = StopReasonType.llm_api_error + yield f"event: error\ndata: {json.dumps(error_data)}\n\n" + # Send [DONE] marker to properly close the stream yield "data: [DONE]\n\n" except Exception as e: run_status = RunStatus.failed @@ -377,9 +378,11 @@ class StreamingService: } logger.error(f"Run {run_id} stopped with unknown error: {e}, error_data: {error_data}") stop_reason = StopReasonType.error - yield (f"data: {json.dumps(error_data)}\n\n", 500) - # Re-raise to ensure proper error handling and Sentry capture - raise + yield f"event: error\ndata: {json.dumps(error_data)}\n\n" + # Send [DONE] marker to properly close the stream + yield "data: [DONE]\n\n" + # Capture for Sentry but don't re-raise to allow stream to complete gracefully + capture_sentry_exception(e) finally: # always update run status, whether success or failure if run_id and self.runs_manager and run_status: