From a85abe71b718e177c4c1ea92477ee22a628ca276 Mon Sep 17 00:00:00 2001 From: jnjpng Date: Tue, 28 Oct 2025 22:52:50 -0700 Subject: [PATCH] feat: add streaming response type to messages stream for stainless [LET-5949] (#5807) * base * generate * fix * update * done * yaml --------- Co-authored-by: Letta Bot --- fern/openapi.json | 93 ++++++++++++++++++---- letta/schemas/letta_response.py | 40 +++++++++- letta/server/rest_api/routers/v1/agents.py | 4 +- 3 files changed, 115 insertions(+), 22 deletions(-) diff --git a/fern/openapi.json b/fern/openapi.json index 382d63f5..f383b22f 100644 --- a/fern/openapi.json +++ b/fern/openapi.json @@ -7293,7 +7293,9 @@ "description": "Successful response", "content": { "application/json": { - "schema": {} + "schema": { + "$ref": "#/components/schemas/LettaStreamingResponse" + } }, "text/event-stream": { "description": "Server-Sent Events stream" @@ -27280,6 +27282,21 @@ "required": ["file_id"], "title": "LettaImage" }, + "LettaPing": { + "properties": { + "message_type": { + "type": "string", + "const": "ping", + "title": "Message Type", + "description": "The type of the message.", + "default": "ping" + } + }, + "type": "object", + "required": ["message_type"], + "title": "LettaPing", + "description": "Ping messages are a keep-alive to prevent SSE streams from timing out during long running requests." + }, "LettaRequest": { "properties": { "messages": { @@ -27521,6 +27538,65 @@ "required": ["messages"], "title": "LettaStreamingRequest" }, + "LettaStreamingResponse": { + "oneOf": [ + { + "$ref": "#/components/schemas/SystemMessage" + }, + { + "$ref": "#/components/schemas/UserMessage" + }, + { + "$ref": "#/components/schemas/ReasoningMessage" + }, + { + "$ref": "#/components/schemas/HiddenReasoningMessage" + }, + { + "$ref": "#/components/schemas/ToolCallMessage" + }, + { + "$ref": "#/components/schemas/ToolReturnMessage" + }, + { + "$ref": "#/components/schemas/AssistantMessage" + }, + { + "$ref": "#/components/schemas/ApprovalRequestMessage" + }, + { + "$ref": "#/components/schemas/ApprovalResponseMessage" + }, + { + "$ref": "#/components/schemas/LettaPing" + }, + { + "$ref": "#/components/schemas/LettaStopReason" + }, + { + "$ref": "#/components/schemas/LettaUsageStatistics" + } + ], + "title": "LettaStreamingResponse", + "description": "Streaming response type for Server-Sent Events (SSE) endpoints.\nEach event in the stream will be one of these types.", + "discriminator": { + "propertyName": "message_type", + "mapping": { + "approval_request_message": "#/components/schemas/ApprovalRequestMessage", + "approval_response_message": "#/components/schemas/ApprovalResponseMessage", + "assistant_message": "#/components/schemas/AssistantMessage", + "hidden_reasoning_message": "#/components/schemas/HiddenReasoningMessage", + "ping": "#/components/schemas/LettaPing", + "reasoning_message": "#/components/schemas/ReasoningMessage", + "stop_reason": "#/components/schemas/LettaStopReason", + "system_message": "#/components/schemas/SystemMessage", + "tool_call_message": "#/components/schemas/ToolCallMessage", + "tool_return_message": "#/components/schemas/ToolReturnMessage", + "usage_statistics": "#/components/schemas/LettaUsageStatistics", + "user_message": "#/components/schemas/UserMessage" + } + } + }, "LettaUsageStatistics": { "properties": { "message_type": { @@ -36518,21 +36594,6 @@ "image": "#/components/schemas/ImageContent" } } - }, - "LettaPing": { - "properties": { - "message_type": { - "type": "string", - "const": "ping", - "title": "Message Type", - "description": "The type of the message.", - "default": "ping" - } - }, - "type": "object", - "required": ["message_type"], - "title": "LettaPing", - "description": "Ping messages are a keep-alive to prevent SSE streams from timing out during long running requests." } }, "securitySchemes": { diff --git a/letta/schemas/letta_response.py b/letta/schemas/letta_response.py index 428d263a..1a37fba4 100644 --- a/letta/schemas/letta_response.py +++ b/letta/schemas/letta_response.py @@ -4,11 +4,24 @@ import re from datetime import datetime from typing import List, Union -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, RootModel from letta.helpers.json_helpers import json_dumps from letta.schemas.enums import JobStatus, MessageStreamStatus -from letta.schemas.letta_message import LettaMessage, LettaMessageUnion +from letta.schemas.letta_message import ( + ApprovalRequestMessage, + ApprovalResponseMessage, + AssistantMessage, + HiddenReasoningMessage, + LettaMessage, + LettaMessageUnion, + LettaPing, + ReasoningMessage, + SystemMessage, + ToolCallMessage, + ToolReturnMessage, + UserMessage, +) from letta.schemas.letta_stop_reason import LettaStopReason from letta.schemas.message import Message from letta.schemas.usage import LettaUsageStatistics @@ -170,8 +183,27 @@ class LettaResponse(BaseModel): return html_output -# The streaming response is either [DONE], [DONE_STEP], [DONE], an error, or a LettaMessage -LettaStreamingResponse = Union[LettaMessage, MessageStreamStatus, LettaStopReason, LettaUsageStatistics] +# The streaming response can be any of the individual message types, plus metadata types +class LettaStreamingResponse(RootModel): + """ + Streaming response type for Server-Sent Events (SSE) endpoints. + Each event in the stream will be one of these types. + """ + + root: Union[ + SystemMessage, + UserMessage, + ReasoningMessage, + HiddenReasoningMessage, + ToolCallMessage, + ToolReturnMessage, + AssistantMessage, + ApprovalRequestMessage, + ApprovalResponseMessage, + LettaPing, + LettaStopReason, + LettaUsageStatistics, + ] = Field(..., discriminator="message_type") class LettaBatchResponse(BaseModel): diff --git a/letta/server/rest_api/routers/v1/agents.py b/letta/server/rest_api/routers/v1/agents.py index 380404c3..c9a85376 100644 --- a/letta/server/rest_api/routers/v1/agents.py +++ b/letta/server/rest_api/routers/v1/agents.py @@ -41,7 +41,7 @@ from letta.schemas.job import LettaRequestConfig from letta.schemas.letta_message import LettaMessageUnion, LettaMessageUpdateUnion, MessageType from letta.schemas.letta_message_content import TextContent from letta.schemas.letta_request import LettaAsyncRequest, LettaRequest, LettaStreamingRequest -from letta.schemas.letta_response import LettaResponse +from letta.schemas.letta_response import LettaResponse, LettaStreamingResponse from letta.schemas.letta_stop_reason import StopReasonType from letta.schemas.memory import ( ArchivalMemorySearchResponse, @@ -1396,7 +1396,7 @@ async def send_message( # noinspection PyInconsistentReturns @router.post( "/{agent_id}/messages/stream", - response_model=None, + response_model=LettaStreamingResponse, operation_id="create_agent_message_stream", responses={ 200: {