diff --git a/letta/server/rest_api/redis_stream_manager.py b/letta/server/rest_api/redis_stream_manager.py index 0b56c4c6..f5c96168 100644 --- a/letta/server/rest_api/redis_stream_manager.py +++ b/letta/server/rest_api/redis_stream_manager.py @@ -239,11 +239,11 @@ async def create_background_stream_processor( if isinstance(chunk, tuple): chunk = chunk[0] - # Track terminal events + # Track terminal events (check at line start to avoid false positives in message content) if isinstance(chunk, str): - if "data: [DONE]" in chunk: + if "\ndata: [DONE]" in chunk or chunk.startswith("data: [DONE]"): saw_done = True - if "event: error" in chunk: + if "\nevent: error" in chunk or chunk.startswith("event: error"): saw_error = True # Best-effort extraction of the error payload so we can persist it on the run. diff --git a/letta/services/streaming_service.py b/letta/services/streaming_service.py index 29e54879..d7950684 100644 --- a/letta/services/streaming_service.py +++ b/letta/services/streaming_service.py @@ -351,11 +351,11 @@ class StreamingService: ) async for chunk in stream: - # Track terminal events + # Track terminal events (check at line start to avoid false positives in message content) if isinstance(chunk, str): - if "data: [DONE]" in chunk: + if "\ndata: [DONE]" in chunk or chunk.startswith("data: [DONE]"): saw_done = True - if "event: error" in chunk: + if "\nevent: error" in chunk or chunk.startswith("event: error"): saw_error = True yield chunk