From 6599aa3b4442d7e28692a0739c81e7ee46ac7b76 Mon Sep 17 00:00:00 2001 From: cthomas Date: Fri, 16 Jan 2026 15:41:26 -0800 Subject: [PATCH] feat: populate seq_id for ping messages (#8844) * feat: populate seq_id for ping messages * fix import --- letta/server/rest_api/streaming_response.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/letta/server/rest_api/streaming_response.py b/letta/server/rest_api/streaming_response.py index 385caca9..9869ff5c 100644 --- a/letta/server/rest_api/streaming_response.py +++ b/letta/server/rest_api/streaming_response.py @@ -4,6 +4,7 @@ import asyncio import json +import re from collections.abc import AsyncIterator from datetime import datetime, timezone from uuid import uuid4 @@ -57,6 +58,7 @@ async def add_keepalive_to_stream( # A small maxsize prevents unbounded memory growth if the client is slow queue = asyncio.Queue(maxsize=1) stream_exhausted = False + last_seq_id = None async def stream_reader(): """Read from the original stream and put items in the queue.""" @@ -81,13 +83,19 @@ async def add_keepalive_to_stream( # Stream finished break elif msg_type == "data": + # Track seq_id from chunks for ping messages + if isinstance(data, str): + seq_id_match = re.search(r'"seq_id":(\d+)', data) # Look for "seq_id": pattern in the SSE chunk + if seq_id_match: + last_seq_id = int(seq_id_match.group(1)) + yield data except asyncio.TimeoutError: # No data received within keepalive interval if not stream_exhausted: - # Send keepalive ping in the same format as [DONE] - yield f"data: {LettaPing(id=f'ping-{uuid4()}', date=datetime.now(timezone.utc), run_id=run_id).model_dump_json()}\n\n" + # Send keepalive ping with the last seq_id to allow clients to track progress + yield f"data: {LettaPing(id=f'ping-{uuid4()}', date=datetime.now(timezone.utc), run_id=run_id, seq_id=last_seq_id).model_dump_json()}\n\n" else: # Stream is done but queue might be processing # Check if there's anything left