using uuid and datetime [LET-5508] (#5430)

* using uuid and datetime

* add run_id

---------

Co-authored-by: Ari Webb <ari@letta.com>
This commit is contained in:
Ari Webb
2025-10-14 14:12:43 -07:00
committed by Caren Thomas
parent 09ba075cfa
commit 9e94c344b8
10 changed files with 44 additions and 63 deletions

View File

@@ -28,8 +28,7 @@ from letta.schemas.embedding_config import EmbeddingConfig
from letta.schemas.enums import JobStatus
from letta.schemas.file import FileMetadata
from letta.schemas.job import Job
from letta.schemas.letta_message import LettaMessage
from letta.schemas.letta_ping import LettaPing
from letta.schemas.letta_message import LettaMessage, LettaPing
from letta.schemas.letta_stop_reason import LettaStopReason
from letta.schemas.llm_config import LLMConfig
from letta.schemas.memory import ArchivalMemorySummary, BasicBlockMemory, ChatMemory, Memory, RecallMemorySummary

View File

@@ -348,6 +348,21 @@ class AssistantMessage(LettaMessage):
)
class LettaPing(LettaMessage):
    """
    A ping message used as a keepalive to prevent SSE streams from timing out during long running requests.

    Args:
        id (str): The ID of the message
        date (datetime): The date the message was created in ISO format
    """

    # Discriminator value for this message kind; it is pinned to the literal
    # "ping" and also used as the field default, so callers never pass it.
    # NOTE(review): `id` and `date` are not declared here, so they presumably
    # come from the LettaMessage base class -- confirm against its definition.
    message_type: Literal["ping"] = Field(
        default="ping",
        description="The type of the message. Ping messages are a keep-alive to prevent SSE streams from timing out during long running requests.",
    )
# NOTE: use Pydantic's discriminated unions feature: https://docs.pydantic.dev/latest/concepts/unions/#discriminated-unions
LettaMessageUnion = Annotated[
Union[
@@ -395,6 +410,24 @@ def create_letta_message_union_schema():
}
def create_letta_ping_schema():
    """Build the hand-written JSON-schema dict for the ``LettaPing`` message.

    Returns:
        dict: An object schema titled ``"LettaPing"`` with one required string
        property, ``message_type``, whose constant and default value are both
        ``"ping"``. A new dict is created on every call.
    """
    # Schema for the only declared property: a string fixed to "ping".
    message_type_property = {
        "type": "string",
        "const": "ping",
        "title": "Message Type",
        "description": "The type of the message.",
        "default": "ping",
    }

    # Key order mirrors the original literal (properties, type, required,
    # title, description) so the serialized output is unchanged.
    ping_schema = {
        "properties": {"message_type": message_type_property},
        "type": "object",
        "required": ["message_type"],
        "title": "LettaPing",
        "description": "Ping messages are a keep-alive to prevent SSE streams from timing out during long running requests.",
    }
    return ping_schema
# --------------------------
# Message Update API Schemas
# --------------------------

View File

@@ -1,28 +0,0 @@
from typing import Literal
from pydantic import BaseModel, Field
def create_letta_ping_schema():
    """Build the hand-written JSON-schema dict for the ``LettaPing`` message.

    Returns:
        dict: An object schema titled ``"LettaPing"`` with one required string
        property, ``message_type``, whose constant and default value are both
        ``"ping"``. A new dict is created on every call.
    """
    # Schema for the only declared property: a string fixed to "ping".
    message_type_property = {
        "type": "string",
        "const": "ping",
        "title": "Message Type",
        "description": "The type of the message.",
        "default": "ping",
    }

    # Key order mirrors the original literal (properties, type, required,
    # title, description) so the serialized output is unchanged.
    ping_schema = {
        "properties": {"message_type": message_type_property},
        "type": "object",
        "required": ["message_type"],
        "title": "LettaPing",
        "description": "Ping messages are a keep-alive to prevent SSE streams from timing out during long running requests.",
    }
    return ping_schema
class LettaPing(BaseModel):
    # Keep-alive payload model with a single field. Documented with comments
    # rather than a class docstring so the model's generated schema
    # description is left exactly as it was.
    message_type: Literal["ping"] = Field(
        default="ping",
        description="The type of the message. Ping messages are a keep-alive to prevent SSE streams from timing out during long running requests.",
    )

View File

@@ -48,28 +48,3 @@ class LettaStopReason(BaseModel):
message_type: Literal["stop_reason"] = Field("stop_reason", description="The type of the message.")
stop_reason: StopReasonType = Field(..., description="The reason why execution stopped.")
def create_letta_ping_schema():
    """Build the hand-written JSON-schema dict for the ``LettaPing`` message.

    Returns:
        dict: An object schema titled ``"LettaPing"`` with one required string
        property, ``message_type``, whose constant and default value are both
        ``"ping"``. A new dict is created on every call.
    """
    # Schema for the only declared property: a string fixed to "ping".
    message_type_property = {
        "type": "string",
        "const": "ping",
        "title": "Message Type",
        "description": "The type of the message.",
        "default": "ping",
    }

    # Key order mirrors the original literal (properties, type, required,
    # title, description) so the serialized output is unchanged.
    ping_schema = {
        "properties": {"message_type": message_type_property},
        "type": "object",
        "required": ["message_type"],
        "title": "LettaPing",
        "description": "Ping messages are a keep-alive to prevent SSE streams from timing out during long running requests.",
    }
    return ping_schema
class LettaPing(BaseModel):
    # Keep-alive payload model with a single field. Documented with comments
    # rather than a class docstring so the model's generated schema
    # description is left exactly as it was.
    message_type: Literal["ping"] = Field(
        default="ping",
        description="The type of the message. Ping messages are a keep-alive to prevent SSE streams from timing out during long running requests.",
    )

View File

@@ -42,13 +42,12 @@ from letta.helpers.pinecone_utils import get_pinecone_indices, should_use_pineco
from letta.jobs.scheduler import start_scheduler_with_leader_election
from letta.log import get_logger
from letta.orm.errors import DatabaseTimeoutError, ForeignKeyConstraintViolationError, NoResultFound, UniqueConstraintViolationError
from letta.schemas.letta_message import create_letta_message_union_schema
from letta.schemas.letta_message import create_letta_message_union_schema, create_letta_ping_schema
from letta.schemas.letta_message_content import (
create_letta_assistant_message_content_union_schema,
create_letta_message_content_union_schema,
create_letta_user_message_content_union_schema,
)
from letta.schemas.letta_ping import create_letta_ping_schema
from letta.server.constants import REST_DEFAULT_PORT
from letta.server.db import db_registry

View File

@@ -370,7 +370,7 @@ async def retrieve_stream(
)
if request.include_pings and settings.enable_keepalive:
stream = add_keepalive_to_stream(stream, keepalive_interval=settings.keepalive_interval)
stream = add_keepalive_to_stream(stream, keepalive_interval=settings.keepalive_interval, run_id=run_id)
return StreamingResponseWithStatusCode(
stream,

View File

@@ -5,6 +5,8 @@
import asyncio
import json
from collections.abc import AsyncIterator
from datetime import datetime, timezone
from uuid import uuid4
import anyio
from fastapi import HTTPException
@@ -14,7 +16,7 @@ from starlette.types import Send
from letta.errors import LettaUnexpectedStreamCancellationError, PendingApprovalError
from letta.log import get_logger
from letta.schemas.enums import RunStatus
from letta.schemas.letta_ping import LettaPing
from letta.schemas.letta_message import LettaPing
from letta.schemas.user import User
from letta.server.rest_api.utils import capture_sentry_exception
from letta.services.run_manager import RunManager
@@ -34,6 +36,7 @@ class RunCancelledException(Exception):
async def add_keepalive_to_stream(
stream_generator: AsyncIterator[str | bytes],
run_id: str,
keepalive_interval: float = 30.0,
) -> AsyncIterator[str | bytes]:
"""
@@ -83,7 +86,7 @@ async def add_keepalive_to_stream(
# No data received within keepalive interval
if not stream_exhausted:
# Send keepalive ping in the same format as [DONE]
yield f"data: {LettaPing().model_dump_json()}\n\n"
yield f"data: {LettaPing(id=f'ping-{uuid4()}', date=datetime.now(timezone.utc), run_id=run_id).model_dump_json()}\n\n"
else:
# Stream is done but queue might be processing
# Check if there's anything left

View File

@@ -143,7 +143,7 @@ class StreamingService:
# conditionally wrap with keepalive based on request parameter
if request.include_pings and settings.enable_keepalive:
stream = add_keepalive_to_stream(raw_stream, keepalive_interval=settings.keepalive_interval)
stream = add_keepalive_to_stream(raw_stream, keepalive_interval=settings.keepalive_interval, run_id=run.id)
else:
stream = raw_stream

View File

@@ -36,7 +36,7 @@ from letta.helpers.reasoning_helper import is_reasoning_completely_disabled
from letta.llm_api.openai_client import is_openai_reasoning_model
from letta.log import get_logger
from letta.schemas.agent import AgentState
from letta.schemas.letta_ping import LettaPing
from letta.schemas.letta_message import LettaPing
from letta.schemas.llm_config import LLMConfig
logger = get_logger(__name__)

View File

@@ -36,7 +36,7 @@ from letta_client.types import (
from letta.log import get_logger
from letta.schemas.agent import AgentState
from letta.schemas.enums import AgentType, JobStatus
from letta.schemas.letta_ping import LettaPing
from letta.schemas.letta_message import LettaPing
from letta.schemas.llm_config import LLMConfig
logger = get_logger(__name__)