Revert "fix: ensure stop_reason is always set and reduce noisy logs (… (#9086)

Revert "fix: ensure stop_reason is always set and reduce noisy logs (#9046)"

This reverts commit 4241a360579440d2697124ba69061d0e46ecc5e9.

**Problem:**
After the original change, caren-code-agent reported streams hanging
indefinitely. The trace shows ttft (time to first token) succeeds, but
the stream never closes.

**Root Cause (suspected):**
The change modified `is_complete=is_done` to `is_complete=saw_done`,
meaning error events no longer mark the stream as complete immediately.
This may cause timing issues where clients wait for more data before
the finalizer runs.

**Fix:**
Revert to the defensive "belt-and-suspenders" approach that always
appends [DONE]. The noisy logs are preferable to hanging streams.

The original comment noted: "Even if a previous chunk set `complete`,
an extra [DONE] is harmless and ensures SDKs that rely on explicit
[DONE] will exit."

👾 Generated with [Letta Code](https://letta.com)

Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
cthomas
2026-01-23 17:19:32 -08:00
committed by Caren Thomas
parent 18274b5a42
commit 2bccd36382

View File

@@ -263,10 +263,11 @@ async def create_background_stream_processor(
# Don't let parsing failures interfere with streaming
error_metadata = {"error": {"message": "Failed to parse error payload from stream."}}
await writer.write_chunk(run_id=run_id, data=chunk, is_complete=saw_done)
is_done = saw_done or saw_error
# Only break after seeing [DONE] or error
if saw_done or saw_error:
await writer.write_chunk(run_id=run_id, data=chunk, is_complete=is_done)
if is_done:
break
try:
@@ -393,19 +394,20 @@ async def create_background_stream_processor(
conversation_id=conversation_id,
)
# Only append [DONE] if we didn't already see it (fallback safety mechanism)
if not saw_done:
logger.warning(
"[Stream Finalizer] Appending forced [DONE] for run=%s (saw_error=%s, saw_done=%s, final_stop_reason=%s)",
run_id,
saw_error,
saw_done,
final_stop_reason,
)
try:
await writer.mark_complete(run_id)
except Exception as e:
logger.warning(f"Failed to append terminal [DONE] for run {run_id}: {e}")
# Belt-and-suspenders: always append a terminal [DONE] chunk to ensure clients terminate
# Even if a previous chunk set `complete`, an extra [DONE] is harmless and ensures SDKs that
# rely on explicit [DONE] will exit.
logger.warning(
"[Stream Finalizer] Appending forced [DONE] for run=%s (saw_error=%s, saw_done=%s, final_stop_reason=%s)",
run_id,
saw_error,
saw_done,
final_stop_reason,
)
try:
await writer.mark_complete(run_id)
except Exception as e:
logger.warning(f"Failed to append terminal [DONE] for run {run_id}: {e}")
async def redis_sse_stream_generator(