From e4c58da682cb64958629f48a4432fba46cbb0319 Mon Sep 17 00:00:00 2001 From: cthomas Date: Wed, 19 Nov 2025 20:47:06 -0800 Subject: [PATCH] fix: streaming error for stop reason chunks (#6285) --- letta/server/rest_api/redis_stream_manager.py | 11 +++++++++-- letta/utils.py | 13 +++++++++++++ 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/letta/server/rest_api/redis_stream_manager.py b/letta/server/rest_api/redis_stream_manager.py index 2e7625e1..cd1385ff 100644 --- a/letta/server/rest_api/redis_stream_manager.py +++ b/letta/server/rest_api/redis_stream_manager.py @@ -273,8 +273,15 @@ async def create_background_stream_processor( message="Stream ended unexpectedly without stop_reason.", detail=None, ) - yield f"data: {LettaStopReason(stop_reason=StopReasonType.error).model_dump_json()}\n\n" - yield f"event: error\ndata: {error_message.model_dump_json()}\n\n" + # Write error chunks to Redis instead of yielding (this is a background task, not a generator) + await writer.write_chunk( + run_id=run_id, + data=f"data: {LettaStopReason(stop_reason=StopReasonType.error).model_dump_json()}\n\n", + is_complete=False, + ) + await writer.write_chunk( + run_id=run_id, data=f"event: error\ndata: {error_message.model_dump_json()}\n\n", is_complete=False + ) await writer.write_chunk(run_id=run_id, data="data: [DONE]\n\n", is_complete=True) saw_error = True saw_done = True diff --git a/letta/utils.py b/letta/utils.py index ae2f6d2c..3401207e 100644 --- a/letta/utils.py +++ b/letta/utils.py @@ -1121,6 +1121,19 @@ def get_background_task_count() -> int: @trace_method def safe_create_task(coro, label: str = "background task"): + # Check if coro is an async generator instead of a coroutine + if inspect.isasyncgen(coro): + raise TypeError( + f"{label}: Cannot create task from async generator. " + "Async generators must be consumed with 'async for', not 'await'. " + "If you need to run an async generator as a task, wrap it in an async function." + ) + + if not inspect.iscoroutine(coro): + raise TypeError( + f"{label}: Expected a coroutine, got {type(coro).__name__}. Make sure you're calling the async function with () parentheses." + ) + async def wrapper(): try: await coro