From 9e94c344b8b6ae626ced1ec42d28ecfd98def37e Mon Sep 17 00:00:00 2001 From: Ari Webb Date: Tue, 14 Oct 2025 14:12:43 -0700 Subject: [PATCH] using uuid and datetime [LET-5508] (#5430) * using uuid and datetime * add run_id --------- Co-authored-by: Ari Webb --- letta/__init__.py | 3 +- letta/schemas/letta_message.py | 33 +++++++++++++++++++++ letta/schemas/letta_ping.py | 28 ----------------- letta/schemas/letta_stop_reason.py | 25 ---------------- letta/server/rest_api/app.py | 3 +- letta/server/rest_api/routers/v1/runs.py | 2 +- letta/server/rest_api/streaming_response.py | 7 +++-- letta/services/streaming_service.py | 2 +- tests/integration_test_send_message.py | 2 +- tests/integration_test_send_message_v2.py | 2 +- 10 files changed, 44 insertions(+), 63 deletions(-) delete mode 100644 letta/schemas/letta_ping.py diff --git a/letta/__init__.py b/letta/__init__.py index a40c00c8..127733d2 100644 --- a/letta/__init__.py +++ b/letta/__init__.py @@ -28,8 +28,7 @@ from letta.schemas.embedding_config import EmbeddingConfig from letta.schemas.enums import JobStatus from letta.schemas.file import FileMetadata from letta.schemas.job import Job -from letta.schemas.letta_message import LettaMessage -from letta.schemas.letta_ping import LettaPing +from letta.schemas.letta_message import LettaMessage, LettaPing from letta.schemas.letta_stop_reason import LettaStopReason from letta.schemas.llm_config import LLMConfig from letta.schemas.memory import ArchivalMemorySummary, BasicBlockMemory, ChatMemory, Memory, RecallMemorySummary diff --git a/letta/schemas/letta_message.py b/letta/schemas/letta_message.py index 0d104230..de170708 100644 --- a/letta/schemas/letta_message.py +++ b/letta/schemas/letta_message.py @@ -348,6 +348,21 @@ class AssistantMessage(LettaMessage): ) +class LettaPing(LettaMessage): + """ + A ping message used as a keepalive to prevent SSE streams from timing out during long running requests. + + Args: + id (str): The ID of the message + date (datetime): The date the message was created in ISO format + """ + + message_type: Literal["ping"] = Field( + "ping", + description="The type of the message. Ping messages are a keep-alive to prevent SSE streams from timing out during long running requests.", + ) + + # NOTE: use Pydantic's discriminated unions feature: https://docs.pydantic.dev/latest/concepts/unions/#discriminated-unions LettaMessageUnion = Annotated[ Union[ @@ -395,6 +410,24 @@ def create_letta_message_union_schema(): } +def create_letta_ping_schema(): + return { + "properties": { + "message_type": { + "type": "string", + "const": "ping", + "title": "Message Type", + "description": "The type of the message.", + "default": "ping", + } + }, + "type": "object", + "required": ["message_type"], + "title": "LettaPing", + "description": "Ping messages are a keep-alive to prevent SSE streams from timing out during long running requests.", + } + + # -------------------------- # Message Update API Schemas # -------------------------- diff --git a/letta/schemas/letta_ping.py b/letta/schemas/letta_ping.py deleted file mode 100644 index 05ba9c65..00000000 --- a/letta/schemas/letta_ping.py +++ /dev/null @@ -1,28 +0,0 @@ -from typing import Literal - -from pydantic import BaseModel, Field - - -def create_letta_ping_schema(): - return { - "properties": { - "message_type": { - "type": "string", - "const": "ping", - "title": "Message Type", - "description": "The type of the message.", - "default": "ping", - } - }, - "type": "object", - "required": ["message_type"], - "title": "LettaPing", - "description": "Ping messages are a keep-alive to prevent SSE streams from timing out during long running requests.", - } - - -class LettaPing(BaseModel): - message_type: Literal["ping"] = Field( - "ping", - description="The type of the message. Ping messages are a keep-alive to prevent SSE streams from timing out during long running requests.", - ) diff --git a/letta/schemas/letta_stop_reason.py b/letta/schemas/letta_stop_reason.py index fe62d742..d0f70915 100644 --- a/letta/schemas/letta_stop_reason.py +++ b/letta/schemas/letta_stop_reason.py @@ -48,28 +48,3 @@ class LettaStopReason(BaseModel): message_type: Literal["stop_reason"] = Field("stop_reason", description="The type of the message.") stop_reason: StopReasonType = Field(..., description="The reason why execution stopped.") - - -def create_letta_ping_schema(): - return { - "properties": { - "message_type": { - "type": "string", - "const": "ping", - "title": "Message Type", - "description": "The type of the message.", - "default": "ping", - } - }, - "type": "object", - "required": ["message_type"], - "title": "LettaPing", - "description": "Ping messages are a keep-alive to prevent SSE streams from timing out during long running requests.", - } - - -class LettaPing(BaseModel): - message_type: Literal["ping"] = Field( - "ping", - description="The type of the message. Ping messages are a keep-alive to prevent SSE streams from timing out during long running requests.", - ) diff --git a/letta/server/rest_api/app.py b/letta/server/rest_api/app.py index 34113bd3..4761d76a 100644 --- a/letta/server/rest_api/app.py +++ b/letta/server/rest_api/app.py @@ -42,13 +42,12 @@ from letta.helpers.pinecone_utils import get_pinecone_indices, should_use_pineco from letta.jobs.scheduler import start_scheduler_with_leader_election from letta.log import get_logger from letta.orm.errors import DatabaseTimeoutError, ForeignKeyConstraintViolationError, NoResultFound, UniqueConstraintViolationError -from letta.schemas.letta_message import create_letta_message_union_schema +from letta.schemas.letta_message import create_letta_message_union_schema, create_letta_ping_schema from letta.schemas.letta_message_content import ( create_letta_assistant_message_content_union_schema, create_letta_message_content_union_schema, create_letta_user_message_content_union_schema, ) -from letta.schemas.letta_ping import create_letta_ping_schema from letta.server.constants import REST_DEFAULT_PORT from letta.server.db import db_registry diff --git a/letta/server/rest_api/routers/v1/runs.py b/letta/server/rest_api/routers/v1/runs.py index 8822722b..4348724e 100644 --- a/letta/server/rest_api/routers/v1/runs.py +++ b/letta/server/rest_api/routers/v1/runs.py @@ -370,7 +370,7 @@ async def retrieve_stream( ) if request.include_pings and settings.enable_keepalive: - stream = add_keepalive_to_stream(stream, keepalive_interval=settings.keepalive_interval) + stream = add_keepalive_to_stream(stream, keepalive_interval=settings.keepalive_interval, run_id=run_id) return StreamingResponseWithStatusCode( stream, diff --git a/letta/server/rest_api/streaming_response.py b/letta/server/rest_api/streaming_response.py index ee332af7..6dab5f23 100644 --- a/letta/server/rest_api/streaming_response.py +++ b/letta/server/rest_api/streaming_response.py @@ -5,6 +5,8 @@ import asyncio import json from collections.abc import AsyncIterator +from datetime import datetime, timezone +from uuid import uuid4 import anyio from fastapi import HTTPException @@ -14,7 +16,7 @@ from starlette.types import Send from letta.errors import LettaUnexpectedStreamCancellationError, PendingApprovalError from letta.log import get_logger from letta.schemas.enums import RunStatus -from letta.schemas.letta_ping import LettaPing +from letta.schemas.letta_message import LettaPing from letta.schemas.user import User from letta.server.rest_api.utils import capture_sentry_exception from letta.services.run_manager import RunManager @@ -34,6 +36,7 @@ class RunCancelledException(Exception): async def add_keepalive_to_stream( stream_generator: AsyncIterator[str | bytes], + run_id: str, keepalive_interval: float = 30.0, ) -> AsyncIterator[str | bytes]: """ @@ -83,7 +86,7 @@ async def add_keepalive_to_stream( # No data received within keepalive interval if not stream_exhausted: # Send keepalive ping in the same format as [DONE] - yield f"data: {LettaPing().model_dump_json()}\n\n" + yield f"data: {LettaPing(id=f'ping-{uuid4()}', date=datetime.now(timezone.utc), run_id=run_id).model_dump_json()}\n\n" else: # Stream is done but queue might be processing # Check if there's anything left diff --git a/letta/services/streaming_service.py b/letta/services/streaming_service.py index 66c03ae4..c6ea8e91 100644 --- a/letta/services/streaming_service.py +++ b/letta/services/streaming_service.py @@ -143,7 +143,7 @@ class StreamingService: # conditionally wrap with keepalive based on request parameter if request.include_pings and settings.enable_keepalive: - stream = add_keepalive_to_stream(raw_stream, keepalive_interval=settings.keepalive_interval) + stream = add_keepalive_to_stream(raw_stream, keepalive_interval=settings.keepalive_interval, run_id=run.id) else: stream = raw_stream diff --git a/tests/integration_test_send_message.py b/tests/integration_test_send_message.py index fce27cfc..91322936 100644 --- a/tests/integration_test_send_message.py +++ b/tests/integration_test_send_message.py @@ -36,7 +36,7 @@ from letta.helpers.reasoning_helper import is_reasoning_completely_disabled from letta.llm_api.openai_client import is_openai_reasoning_model from letta.log import get_logger from letta.schemas.agent import AgentState -from letta.schemas.letta_ping import LettaPing +from letta.schemas.letta_message import LettaPing from letta.schemas.llm_config import LLMConfig logger = get_logger(__name__) diff --git a/tests/integration_test_send_message_v2.py b/tests/integration_test_send_message_v2.py index c56daffb..9553fbbb 100644 --- a/tests/integration_test_send_message_v2.py +++ b/tests/integration_test_send_message_v2.py @@ -36,7 +36,7 @@ from letta_client.types import ( from letta.log import get_logger from letta.schemas.agent import AgentState from letta.schemas.enums import AgentType, JobStatus -from letta.schemas.letta_ping import LettaPing +from letta.schemas.letta_message import LettaPing from letta.schemas.llm_config import LLMConfig logger = get_logger(__name__)