From edbff34d6335ec58605156cae34b26bf47a2f940 Mon Sep 17 00:00:00 2001 From: jnjpng Date: Fri, 7 Nov 2025 13:43:28 -0800 Subject: [PATCH] feat: enable streaming flag on send message [LET-6100] (#6040) * base * base * update * stainless * final --------- Co-authored-by: Letta Bot --- fern/openapi.json | 21 +++++-- letta/schemas/letta_request.py | 10 +++- letta/server/rest_api/routers/v1/agents.py | 69 +++++++++++++++++++--- 3 files changed, 84 insertions(+), 16 deletions(-) diff --git a/fern/openapi.json b/fern/openapi.json index 38ebbb36..dfdb66fb 100644 --- a/fern/openapi.json +++ b/fern/openapi.json @@ -7272,7 +7272,7 @@ "post": { "tags": ["agents"], "summary": "Send Message", - "description": "Process a user message and return the agent's response.\nThis endpoint accepts a message from a user and processes it through the agent.", + "description": "Process a user message and return the agent's response.\nThis endpoint accepts a message from a user and processes it through the agent.\n\nThe response format is controlled by the `streaming` field in the request body:\n- If `streaming=false` (default): Returns a complete LettaResponse with all messages\n- If `streaming=true`: Returns a Server-Sent Events (SSE) stream\n\nAdditional streaming options (only used when streaming=true):\n- `stream_tokens`: Stream individual tokens instead of complete steps\n- `include_pings`: Include keepalive pings to prevent connection timeouts\n- `background`: Process the request in the background", "operationId": "send_message", "parameters": [ { @@ -7296,19 +7296,22 @@ "content": { "application/json": { "schema": { - "$ref": "#/components/schemas/LettaRequest" + "$ref": "#/components/schemas/LettaStreamingRequest" } } } }, "responses": { "200": { - "description": "Successful Response", + "description": "Successful response", "content": { "application/json": { "schema": { "$ref": "#/components/schemas/LettaResponse" } + }, + "text/event-stream": { + "description": "Server-Sent Events stream (when streaming=true in request body)" } } }, @@ -28595,22 +28598,28 @@ "default": true, "deprecated": true }, + "streaming": { + "type": "boolean", + "title": "Streaming", + "description": "If True, returns a streaming response (Server-Sent Events). If False (default), returns a complete response.", + "default": false + }, "stream_tokens": { "type": "boolean", "title": "Stream Tokens", - "description": "Flag to determine if individual tokens should be streamed, rather than streaming per step.", + "description": "Flag to determine if individual tokens should be streamed, rather than streaming per step (only used when streaming=true).", "default": false }, "include_pings": { "type": "boolean", "title": "Include Pings", - "description": "Whether to include periodic keepalive ping messages in the stream to prevent connection timeouts.", + "description": "Whether to include periodic keepalive ping messages in the stream to prevent connection timeouts (only used when streaming=true).", "default": true }, "background": { "type": "boolean", "title": "Background", - "description": "Whether to process the request in the background.", + "description": "Whether to process the request in the background (only used when streaming=true).", "default": false } }, diff --git a/letta/schemas/letta_request.py b/letta/schemas/letta_request.py index c75b86a8..7d4d2c18 100644 --- a/letta/schemas/letta_request.py +++ b/letta/schemas/letta_request.py @@ -80,17 +80,21 @@ class LettaRequest(BaseModel): class LettaStreamingRequest(LettaRequest): + streaming: bool = Field( + default=False, + description="If True, returns a streaming response (Server-Sent Events). If False (default), returns a complete response.", + ) stream_tokens: bool = Field( default=False, - description="Flag to determine if individual tokens should be streamed, rather than streaming per step.", + description="Flag to determine if individual tokens should be streamed, rather than streaming per step (only used when streaming=true).", ) include_pings: bool = Field( default=True, - description="Whether to include periodic keepalive ping messages in the stream to prevent connection timeouts.", + description="Whether to include periodic keepalive ping messages in the stream to prevent connection timeouts (only used when streaming=true).", ) background: bool = Field( default=False, - description="Whether to process the request in the background.", + description="Whether to process the request in the background (only used when streaming=true).", ) diff --git a/letta/server/rest_api/routers/v1/agents.py b/letta/server/rest_api/routers/v1/agents.py index e550c843..9683803a 100644 --- a/letta/server/rest_api/routers/v1/agents.py +++ b/letta/server/rest_api/routers/v1/agents.py @@ -61,6 +61,7 @@ from letta.server.rest_api.dependencies import HeaderParams, get_headers, get_le from letta.server.server import SyncServer from letta.services.lettuce import LettuceClient from letta.services.run_manager import RunManager +from letta.services.streaming_service import StreamingService from letta.settings import settings from letta.utils import is_1_0_sdk_version, safe_create_shielded_task, safe_create_task, truncate_file_visible_content from letta.validators import AgentId, BlockId, FileId, MessageId, SourceId, ToolId @@ -1326,25 +1327,78 @@ async def modify_message( "/{agent_id}/messages", response_model=LettaResponse, operation_id="send_message", + responses={ + 200: { + "description": "Successful response", + "content": { + "application/json": {"schema": {"$ref": "#/components/schemas/LettaResponse"}}, + "text/event-stream": {"description": "Server-Sent Events stream (when streaming=true in request body)"}, + }, + } + }, ) async def send_message( request_obj: Request, # FastAPI Request agent_id: AgentId, server: SyncServer = Depends(get_letta_server), - request: LettaRequest = Body(...), + request: LettaStreamingRequest = Body(...), headers: HeaderParams = Depends(get_headers), -): +) -> StreamingResponse | LettaResponse: """ Process a user message and return the agent's response. This endpoint accepts a message from a user and processes it through the agent. + + The response format is controlled by the `streaming` field in the request body: + - If `streaming=false` (default): Returns a complete LettaResponse with all messages + - If `streaming=true`: Returns a Server-Sent Events (SSE) stream + + Additional streaming options (only used when streaming=true): + - `stream_tokens`: Stream individual tokens instead of complete steps + - `include_pings`: Include keepalive pings to prevent connection timeouts + - `background`: Process the request in the background """ # After validation, messages should always be set (converted from input if needed) if not request.messages or len(request.messages) == 0: - raise ValueError("Messages must not be empty") - request_start_timestamp_ns = get_utc_timestamp_ns() - MetricRegistry().user_message_counter.add(1, get_ctx_attributes()) + raise HTTPException(status_code=422, detail="Messages must not be empty") + + # Validate streaming-specific options are only set when streaming=true + if not request.streaming: + errors = [] + + if request.stream_tokens is True: + errors.append("stream_tokens can only be true when streaming=true") + + if request.include_pings is False: + errors.append("include_pings can only be set to false when streaming=true") + + if request.background is True: + errors.append("background can only be true when streaming=true") + + if errors: + raise HTTPException( + status_code=422, + detail=f"Streaming options set without streaming enabled. {'; '.join(errors)}. " "Either set streaming=true or use default values for streaming options.", + ) + + is_1_0_sdk = is_1_0_sdk_version(headers) + if request.streaming and not is_1_0_sdk: + raise HTTPException(status_code=422, detail="streaming=true is only supported for SDK v1.0+ clients.") actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) + + if request.streaming and is_1_0_sdk: + streaming_service = StreamingService(server) + run, result = await streaming_service.create_agent_stream( + agent_id=agent_id, + actor=actor, + request=request, + run_type="send_message", + ) + return result + + request_start_timestamp_ns = get_utc_timestamp_ns() + MetricRegistry().user_message_counter.add(1, get_ctx_attributes()) # TODO: This is redundant, remove soon agent = await server.agent_manager.get_agent_by_id_async( agent_id, actor, include_relationships=["memory", "multi_agent_group", "sources", "tool_exec_environment_variables", "tools"] @@ -1467,10 +1521,11 @@ async def send_message_streaming( This endpoint accepts a message from a user and processes it through the agent. It will stream the steps of the response always, and stream the tokens if 'stream_tokens' is set to True. """ - from letta.services.streaming_service import StreamingService - actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) + # Since this is the dedicated streaming endpoint, ensure streaming is enabled + request.streaming = True + # use the streaming service for unified stream handling streaming_service = StreamingService(server)