diff --git a/letta/server/rest_api/redis_stream_manager.py b/letta/server/rest_api/redis_stream_manager.py index f5c96168..8f1839a8 100644 --- a/letta/server/rest_api/redis_stream_manager.py +++ b/letta/server/rest_api/redis_stream_manager.py @@ -263,11 +263,10 @@ async def create_background_stream_processor( # Don't let parsing failures interfere with streaming error_metadata = {"error": {"message": "Failed to parse error payload from stream."}} - is_done = saw_done or saw_error + await writer.write_chunk(run_id=run_id, data=chunk, is_complete=saw_done) - await writer.write_chunk(run_id=run_id, data=chunk, is_complete=is_done) - - if is_done: + # Only break after seeing [DONE] or error + if saw_done or saw_error: break try: @@ -394,20 +393,19 @@ async def create_background_stream_processor( conversation_id=conversation_id, ) - # Belt-and-suspenders: always append a terminal [DONE] chunk to ensure clients terminate - # Even if a previous chunk set `complete`, an extra [DONE] is harmless and ensures SDKs that - # rely on explicit [DONE] will exit. - logger.warning( - "[Stream Finalizer] Appending forced [DONE] for run=%s (saw_error=%s, saw_done=%s, final_stop_reason=%s)", - run_id, - saw_error, - saw_done, - final_stop_reason, - ) - try: - await writer.mark_complete(run_id) - except Exception as e: - logger.warning(f"Failed to append terminal [DONE] for run {run_id}: {e}") + # Only append [DONE] if we didn't already see it (fallback safety mechanism) + if not saw_done: + logger.warning( + "[Stream Finalizer] Appending forced [DONE] for run=%s (saw_error=%s, saw_done=%s, final_stop_reason=%s)", + run_id, + saw_error, + saw_done, + final_stop_reason, + ) + try: + await writer.mark_complete(run_id) + except Exception as e: + logger.warning(f"Failed to append terminal [DONE] for run {run_id}: {e}") async def redis_sse_stream_generator(