feat: populate seq_id for ping messages (#8844)
* feat: populate seq_id for ping messages * fix import
This commit is contained in:
@@ -4,6 +4,7 @@
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import re
|
||||
from collections.abc import AsyncIterator
|
||||
from datetime import datetime, timezone
|
||||
from uuid import uuid4
|
||||
@@ -57,6 +58,7 @@ async def add_keepalive_to_stream(
|
||||
# A small maxsize prevents unbounded memory growth if the client is slow
|
||||
queue = asyncio.Queue(maxsize=1)
|
||||
stream_exhausted = False
|
||||
last_seq_id = None
|
||||
|
||||
async def stream_reader():
|
||||
"""Read from the original stream and put items in the queue."""
|
||||
@@ -81,13 +83,19 @@ async def add_keepalive_to_stream(
|
||||
# Stream finished
|
||||
break
|
||||
elif msg_type == "data":
|
||||
# Track seq_id from chunks for ping messages
|
||||
if isinstance(data, str):
|
||||
seq_id_match = re.search(r'"seq_id":(\d+)', data) # Look for "seq_id":<number> pattern in the SSE chunk
|
||||
if seq_id_match:
|
||||
last_seq_id = int(seq_id_match.group(1))
|
||||
|
||||
yield data
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
# No data received within keepalive interval
|
||||
if not stream_exhausted:
|
||||
# Send keepalive ping in the same format as [DONE]
|
||||
yield f"data: {LettaPing(id=f'ping-{uuid4()}', date=datetime.now(timezone.utc), run_id=run_id).model_dump_json()}\n\n"
|
||||
# Send keepalive ping with the last seq_id to allow clients to track progress
|
||||
yield f"data: {LettaPing(id=f'ping-{uuid4()}', date=datetime.now(timezone.utc), run_id=run_id, seq_id=last_seq_id).model_dump_json()}\n\n"
|
||||
else:
|
||||
# Stream is done but queue might be processing
|
||||
# Check if there's anything left
|
||||
|
||||
Reference in New Issue
Block a user