fix(core): patch sse streaming errors (#5906)

* fix: patch sse streaming errors

* fix: don't re-raise, but log explicitly with Sentry

* chore: cleanup comments

* fix: revert change from #5907, also make sure to write out a [DONE] marker to close the stream
This commit is contained in:
Charles Packer
2025-11-02 20:38:37 -08:00
committed by Caren Thomas
parent ac599145bb
commit 468b47bef5
3 changed files with 27 additions and 12 deletions

View File

@@ -283,8 +283,8 @@ class LettaAgentV3(LettaAgentV2):
# Clear to avoid duplication in next iteration
self.response_messages = []
# if not self.should_continue:
# break
if not self.should_continue:
break
input_messages_to_persist = []
@@ -305,7 +305,18 @@ class LettaAgentV3(LettaAgentV2):
except Exception as e:
self.logger.warning(f"Error during agent stream: {e}", exc_info=True)
if first_chunk:
raise # only raise if first chunk has not been streamed yet
# Raise if no chunks sent yet (response not started, can return error status code)
raise
else:
# Mid-stream error: yield error event to client in SSE format
error_chunk = {
"error": {
"type": "internal_error",
"message": "An error occurred during agent execution.",
"detail": str(e),
}
}
yield f"event: error\ndata: {json.dumps(error_chunk)}\n\n"
if run_id:
letta_messages = Message.to_letta_messages_from_list(

View File

@@ -247,7 +247,8 @@ async def create_background_stream_processor(
# Handle cancellation gracefully - don't write error chunk, cancellation event was already sent
logger.info(f"Stream processing stopped due to cancellation for run {run_id}")
# The cancellation event was already yielded by cancellation_aware_stream_wrapper
# Just mark as complete, don't write additional error chunks
# Write [DONE] marker to properly close the stream for clients reading from Redis
await writer.write_chunk(run_id=run_id, data="data: [DONE]\n\n", is_complete=True)
except Exception as e:
logger.error(f"Error processing stream for run {run_id}: {e}")
# Write error chunk

View File

@@ -42,6 +42,7 @@ from letta.server.rest_api.streaming_response import (
add_keepalive_to_stream,
cancellation_aware_stream_wrapper,
)
from letta.server.rest_api.utils import capture_sentry_exception
from letta.services.run_manager import RunManager
from letta.settings import settings
from letta.utils import safe_create_task
@@ -327,7 +328,7 @@ class StreamingService:
error_data = {"error": {"type": "llm_timeout", "message": "The LLM request timed out. Please try again.", "detail": str(e)}}
stop_reason = StopReasonType.llm_api_error
logger.error(f"Run {run_id} stopped with LLM timeout error: {e}, error_data: {error_data}")
yield (f"data: {json.dumps(error_data)}\n\n", 504)
yield f"event: error\ndata: {json.dumps(error_data)}\n\n"
# Send [DONE] marker to properly close the stream
yield "data: [DONE]\n\n"
except LLMRateLimitError as e:
@@ -341,7 +342,7 @@ class StreamingService:
}
stop_reason = StopReasonType.llm_api_error
logger.warning(f"Run {run_id} stopped with LLM rate limit error: {e}, error_data: {error_data}")
yield (f"data: {json.dumps(error_data)}\n\n", 429)
yield f"event: error\ndata: {json.dumps(error_data)}\n\n"
# Send [DONE] marker to properly close the stream
yield "data: [DONE]\n\n"
except LLMAuthenticationError as e:
@@ -355,16 +356,16 @@ class StreamingService:
}
logger.warning(f"Run {run_id} stopped with LLM authentication error: {e}, error_data: {error_data}")
stop_reason = StopReasonType.llm_api_error
yield (f"data: {json.dumps(error_data)}\n\n", 401)
yield f"event: error\ndata: {json.dumps(error_data)}\n\n"
# Send [DONE] marker to properly close the stream
yield "data: [DONE]\n\n"
except LLMError as e:
run_status = RunStatus.failed
error_data = {"error": {"type": "llm_error", "message": "An error occurred with the LLM request.", "detail": str(e)}}
logger.error(f"Run {run_id} stopped with LLM error: {e}, error_data: {error_data}")
yield (f"data: {json.dumps(error_data)}\n\n", 502)
# Send [DONE] marker to properly close the stream
stop_reason = StopReasonType.llm_api_error
yield f"event: error\ndata: {json.dumps(error_data)}\n\n"
# Send [DONE] marker to properly close the stream
yield "data: [DONE]\n\n"
except Exception as e:
run_status = RunStatus.failed
@@ -377,9 +378,11 @@ class StreamingService:
}
logger.error(f"Run {run_id} stopped with unknown error: {e}, error_data: {error_data}")
stop_reason = StopReasonType.error
yield (f"data: {json.dumps(error_data)}\n\n", 500)
# Re-raise to ensure proper error handling and Sentry capture
raise
yield f"event: error\ndata: {json.dumps(error_data)}\n\n"
# Send [DONE] marker to properly close the stream
yield "data: [DONE]\n\n"
# Capture for Sentry but don't re-raise to allow stream to complete gracefully
capture_sentry_exception(e)
finally:
# always update run status, whether success or failure
if run_id and self.runs_manager and run_status: