From 2bccd36382c1a1cd986980de10be01fafa3f77f0 Mon Sep 17 00:00:00 2001 From: cthomas Date: Fri, 23 Jan 2026 17:19:32 -0800 Subject: [PATCH] =?UTF-8?q?Revert=20"fix:=20ensure=20stop=5Freason=20is=20?= =?UTF-8?q?always=20set=20and=20reduce=20noisy=20logs=20(=E2=80=A6=20(#908?= =?UTF-8?q?6)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Revert "fix: ensure stop_reason is always set and reduce noisy logs (#9046)" This reverts commit 4241a360579440d2697124ba69061d0e46ecc5e9. **Problem:** After the original change, caren-code-agent reported streams hanging indefinitely. The trace shows ttft (time to first token) succeeds, but the stream never closes. **Root Cause (suspected):** The change modified `is_complete=is_done` to `is_complete=saw_done`, meaning error events no longer mark the stream as complete immediately. This may cause timing issues where clients wait for more data before the finalizer runs. **Fix:** Revert to the defensive "belt-and-suspenders" approach that always appends [DONE]. The noisy logs are preferable to hanging streams. The original comment noted: "Even if a previous chunk set `complete`, an extra [DONE] is harmless and ensures SDKs that rely on explicit [DONE] will exit." 👾 Generated with [Letta Code](https://letta.com) Co-authored-by: Letta --- letta/server/rest_api/redis_stream_manager.py | 34 ++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/letta/server/rest_api/redis_stream_manager.py b/letta/server/rest_api/redis_stream_manager.py index 8f1839a8..f5c96168 100644 --- a/letta/server/rest_api/redis_stream_manager.py +++ b/letta/server/rest_api/redis_stream_manager.py @@ -263,10 +263,11 @@ async def create_background_stream_processor( # Don't let parsing failures interfere with streaming error_metadata = {"error": {"message": "Failed to parse error payload from stream."}} - await writer.write_chunk(run_id=run_id, data=chunk, is_complete=saw_done) + is_done = saw_done or saw_error - # Only break after seeing [DONE] or error - if saw_done or saw_error: + await writer.write_chunk(run_id=run_id, data=chunk, is_complete=is_done) + + if is_done: break try: @@ -393,19 +394,20 @@ async def create_background_stream_processor( conversation_id=conversation_id, ) - # Only append [DONE] if we didn't already see it (fallback safety mechanism) - if not saw_done: - logger.warning( - "[Stream Finalizer] Appending forced [DONE] for run=%s (saw_error=%s, saw_done=%s, final_stop_reason=%s)", - run_id, - saw_error, - saw_done, - final_stop_reason, - ) - try: - await writer.mark_complete(run_id) - except Exception as e: - logger.warning(f"Failed to append terminal [DONE] for run {run_id}: {e}") + # Belt-and-suspenders: always append a terminal [DONE] chunk to ensure clients terminate + # Even if a previous chunk set `complete`, an extra [DONE] is harmless and ensures SDKs that + # rely on explicit [DONE] will exit. + logger.warning( + "[Stream Finalizer] Appending forced [DONE] for run=%s (saw_error=%s, saw_done=%s, final_stop_reason=%s)", + run_id, + saw_error, + saw_done, + final_stop_reason, + ) + try: + await writer.mark_complete(run_id) + except Exception as e: + logger.warning(f"Failed to append terminal [DONE] for run {run_id}: {e}") async def redis_sse_stream_generator(