diff --git a/letta/server/rest_api/redis_stream_manager.py b/letta/server/rest_api/redis_stream_manager.py index 8f1839a8..f5c96168 100644 --- a/letta/server/rest_api/redis_stream_manager.py +++ b/letta/server/rest_api/redis_stream_manager.py @@ -263,10 +263,11 @@ async def create_background_stream_processor( # Don't let parsing failures interfere with streaming error_metadata = {"error": {"message": "Failed to parse error payload from stream."}} - await writer.write_chunk(run_id=run_id, data=chunk, is_complete=saw_done) + is_done = saw_done or saw_error - # Only break after seeing [DONE] or error - if saw_done or saw_error: + await writer.write_chunk(run_id=run_id, data=chunk, is_complete=is_done) + + if is_done: break try: @@ -393,19 +394,20 @@ async def create_background_stream_processor( conversation_id=conversation_id, ) - # Only append [DONE] if we didn't already see it (fallback safety mechanism) - if not saw_done: - logger.warning( - "[Stream Finalizer] Appending forced [DONE] for run=%s (saw_error=%s, saw_done=%s, final_stop_reason=%s)", - run_id, - saw_error, - saw_done, - final_stop_reason, - ) - try: - await writer.mark_complete(run_id) - except Exception as e: - logger.warning(f"Failed to append terminal [DONE] for run {run_id}: {e}") + # Belt-and-suspenders: always append a terminal [DONE] chunk to ensure clients terminate + # Even if a previous chunk set `complete`, an extra [DONE] is harmless and ensures SDKs that + # rely on explicit [DONE] will exit. + logger.warning( + "[Stream Finalizer] Appending forced [DONE] for run=%s (saw_error=%s, saw_done=%s, final_stop_reason=%s)", + run_id, + saw_error, + saw_done, + final_stop_reason, + ) + try: + await writer.mark_complete(run_id) + except Exception as e: + logger.warning(f"Failed to append terminal [DONE] for run {run_id}: {e}") async def redis_sse_stream_generator(