From 6c415b27f8774d39f90466aa16e7844d30fb278d Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Thu, 22 Jan 2026 18:18:46 -0800 Subject: [PATCH] feat: add non-streaming option for conversation messages (#9044) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: add non-streaming option for conversation messages - Add ConversationMessageRequest with stream=True default (backwards compatible) - stream=true (default): SSE streaming via StreamingService - stream=false: JSON response via AgentLoop.load().step() 🤖 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta * chore: regenerate API schema for ConversationMessageRequest --------- Co-authored-by: Letta --- fern/openapi.json | 194 +++++++++++++++++- letta/schemas/letta_request.py | 21 ++ .../rest_api/routers/v1/conversations.py | 77 +++++-- 3 files changed, 270 insertions(+), 22 deletions(-) diff --git a/fern/openapi.json b/fern/openapi.json index 855976dc..b9def2e2 100644 --- a/fern/openapi.json +++ b/fern/openapi.json @@ -8798,7 +8798,7 @@ "post": { "tags": ["conversations"], "summary": "Send Conversation Message", - "description": "Send a message to a conversation and get a streaming response.\n\nThis endpoint sends a message to an existing conversation and streams\nthe agent's response back.", + "description": "Send a message to a conversation and get a response.\n\nThis endpoint sends a message to an existing conversation.\nBy default (stream=true), returns a streaming response (Server-Sent Events).\nSet stream=false to get a complete JSON response.", "operationId": "send_conversation_message", "parameters": [ { @@ -8822,7 +8822,7 @@ "content": { "application/json": { "schema": { - "$ref": "#/components/schemas/LettaStreamingRequest" + "$ref": "#/components/schemas/ConversationMessageRequest" } } } @@ -8834,10 +8834,11 @@ "application/json": { "schema": { "$ref": "#/components/schemas/LettaStreamingResponse" - } + }, + "description": "JSON response (when stream=false)" }, "text/event-stream": { - "description": "Server-Sent Events stream" + "description": "Server-Sent Events stream (default, when stream=true)" } } }, @@ -29849,6 +29850,191 @@ "title": "Conversation", "description": "Represents a conversation on an agent for concurrent messaging." }, + "ConversationMessageRequest": { + "properties": { + "messages": { + "anyOf": [ + { + "items": { + "anyOf": [ + { + "$ref": "#/components/schemas/MessageCreate" + }, + { + "$ref": "#/components/schemas/ApprovalCreate" + } + ] + }, + "type": "array" + }, + { + "type": "null" + } + ], + "title": "Messages", + "description": "The messages to be sent to the agent." + }, + "input": { + "anyOf": [ + { + "type": "string" + }, + { + "items": { + "oneOf": [ + { + "$ref": "#/components/schemas/TextContent" + }, + { + "$ref": "#/components/schemas/ImageContent" + }, + { + "$ref": "#/components/schemas/ToolCallContent" + }, + { + "$ref": "#/components/schemas/ToolReturnContent" + }, + { + "$ref": "#/components/schemas/ReasoningContent" + }, + { + "$ref": "#/components/schemas/RedactedReasoningContent" + }, + { + "$ref": "#/components/schemas/OmittedReasoningContent" + }, + { + "$ref": "#/components/schemas/SummarizedReasoningContent" + } + ], + "discriminator": { + "propertyName": "type", + "mapping": { + "image": "#/components/schemas/ImageContent", + "omitted_reasoning": "#/components/schemas/OmittedReasoningContent", + "reasoning": "#/components/schemas/ReasoningContent", + "redacted_reasoning": "#/components/schemas/RedactedReasoningContent", + "summarized_reasoning": "#/components/schemas/SummarizedReasoningContent", + "text": "#/components/schemas/TextContent", + "tool_call": "#/components/schemas/ToolCallContent", + "tool_return": "#/components/schemas/ToolReturnContent" + } + } + }, + "type": "array" + }, + { + "type": "null" + } + ], + "title": "Input", + "description": "Syntactic sugar for a single user message. Equivalent to messages=[{'role': 'user', 'content': input}]." + }, + "max_steps": { + "type": "integer", + "title": "Max Steps", + "description": "Maximum number of steps the agent should take to process the request.", + "default": 50 + }, + "use_assistant_message": { + "type": "boolean", + "title": "Use Assistant Message", + "description": "Whether the server should parse specific tool call arguments (default `send_message`) as `AssistantMessage` objects. Still supported for legacy agent types, but deprecated for letta_v1_agent onward.", + "default": true, + "deprecated": true + }, + "assistant_message_tool_name": { + "type": "string", + "title": "Assistant Message Tool Name", + "description": "The name of the designated message tool. Still supported for legacy agent types, but deprecated for letta_v1_agent onward.", + "default": "send_message", + "deprecated": true + }, + "assistant_message_tool_kwarg": { + "type": "string", + "title": "Assistant Message Tool Kwarg", + "description": "The name of the message argument in the designated message tool. Still supported for legacy agent types, but deprecated for letta_v1_agent onward.", + "default": "message", + "deprecated": true + }, + "include_return_message_types": { + "anyOf": [ + { + "items": { + "$ref": "#/components/schemas/MessageType" + }, + "type": "array" + }, + { + "type": "null" + } + ], + "title": "Include Return Message Types", + "description": "Only return specified message types in the response. If `None` (default) returns all messages." + }, + "enable_thinking": { + "type": "string", + "title": "Enable Thinking", + "description": "If set to True, enables reasoning before responses or tool calls from the agent.", + "default": true, + "deprecated": true + }, + "client_tools": { + "anyOf": [ + { + "items": { + "$ref": "#/components/schemas/ClientToolSchema" + }, + "type": "array" + }, + { + "type": "null" + } + ], + "title": "Client Tools", + "description": "Client-side tools that the agent can call. When the agent calls a client-side tool, execution pauses and returns control to the client to execute the tool and provide the result via a ToolReturn." + }, + "override_model": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "title": "Override Model", + "description": "Model handle to use for this request instead of the agent's default model. This allows sending a message to a different model without changing the agent's configuration." + }, + "stream": { + "type": "boolean", + "title": "Stream", + "description": "If True (default), returns a streaming response (Server-Sent Events). If False, returns a complete JSON response.", + "default": true + }, + "stream_tokens": { + "type": "boolean", + "title": "Stream Tokens", + "description": "Flag to determine if individual tokens should be streamed, rather than streaming per step (only used when stream=true).", + "default": false + }, + "include_pings": { + "type": "boolean", + "title": "Include Pings", + "description": "Whether to include periodic keepalive ping messages in the stream to prevent connection timeouts (only used when stream=true).", + "default": true + }, + "background": { + "type": "boolean", + "title": "Background", + "description": "Whether to process the request in the background (only used when stream=true).", + "default": false + } + }, + "type": "object", + "title": "ConversationMessageRequest", + "description": "Request for sending messages to a conversation. Streams by default." + }, "CoreMemoryBlockSchema": { "properties": { "created_at": { diff --git a/letta/schemas/letta_request.py b/letta/schemas/letta_request.py index 69c245b1..ba261e16 100644 --- a/letta/schemas/letta_request.py +++ b/letta/schemas/letta_request.py @@ -125,6 +125,27 @@ class LettaStreamingRequest(LettaRequest): ) +class ConversationMessageRequest(LettaRequest): + """Request for sending messages to a conversation. Streams by default.""" + + stream: bool = Field( + default=True, + description="If True (default), returns a streaming response (Server-Sent Events). If False, returns a complete JSON response.", + ) + stream_tokens: bool = Field( + default=False, + description="Flag to determine if individual tokens should be streamed, rather than streaming per step (only used when stream=true).", + ) + include_pings: bool = Field( + default=True, + description="Whether to include periodic keepalive ping messages in the stream to prevent connection timeouts (only used when stream=true).", + ) + background: bool = Field( + default=False, + description="Whether to process the request in the background (only used when stream=true).", + ) + + class LettaAsyncRequest(LettaRequest): callback_url: Optional[str] = Field(None, description="Optional callback URL to POST to when the job completes") diff --git a/letta/server/rest_api/routers/v1/conversations.py b/letta/server/rest_api/routers/v1/conversations.py index 691e89af..40951247 100644 --- a/letta/server/rest_api/routers/v1/conversations.py +++ b/letta/server/rest_api/routers/v1/conversations.py @@ -5,6 +5,7 @@ from fastapi import APIRouter, Body, Depends, HTTPException, Query, status from pydantic import BaseModel, Field from starlette.responses import StreamingResponse +from letta.agents.agent_loop import AgentLoop from letta.agents.letta_agent_v3 import LettaAgentV3 from letta.data_sources.redis_client import NoopAsyncRedisClient, get_redis_client from letta.errors import LettaExpiredError, LettaInvalidArgumentError, NoActiveRunsToCancelError @@ -13,7 +14,7 @@ from letta.log import get_logger from letta.schemas.conversation import Conversation, CreateConversation, UpdateConversation from letta.schemas.enums import RunStatus from letta.schemas.letta_message import LettaMessageUnion -from letta.schemas.letta_request import LettaStreamingRequest, RetrieveStreamRequest +from letta.schemas.letta_request import ConversationMessageRequest, LettaStreamingRequest, RetrieveStreamRequest from letta.schemas.letta_response import LettaResponse, LettaStreamingResponse from letta.server.rest_api.dependencies import HeaderParams, get_headers, get_letta_server from letta.server.rest_api.redis_stream_manager import redis_sse_stream_generator @@ -160,45 +161,85 @@ async def list_conversation_messages( 200: { "description": "Successful response", "content": { - "text/event-stream": {"description": "Server-Sent Events stream"}, + "text/event-stream": {"description": "Server-Sent Events stream (default, when stream=true)"}, + "application/json": {"description": "JSON response (when stream=false)"}, }, } }, ) async def send_conversation_message( conversation_id: ConversationId, - request: LettaStreamingRequest = Body(...), + request: ConversationMessageRequest = Body(...), server: SyncServer = Depends(get_letta_server), headers: HeaderParams = Depends(get_headers), ) -> StreamingResponse | LettaResponse: """ - Send a message to a conversation and get a streaming response. + Send a message to a conversation and get a response. - This endpoint sends a message to an existing conversation and streams - the agent's response back. + This endpoint sends a message to an existing conversation. + By default (stream=true), returns a streaming response (Server-Sent Events). + Set stream=false to get a complete JSON response. """ actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) - # Get the conversation to find the agent_id + if not request.messages or len(request.messages) == 0: + raise HTTPException(status_code=422, detail="Messages must not be empty") + conversation = await conversation_manager.get_conversation_by_id( conversation_id=conversation_id, actor=actor, ) - # Force streaming mode for this endpoint - request.streaming = True + # Streaming mode (default) + if request.stream: + # Convert to LettaStreamingRequest for StreamingService compatibility + streaming_request = LettaStreamingRequest( + messages=request.messages, + streaming=True, + stream_tokens=request.stream_tokens, + include_pings=request.include_pings, + background=request.background, + max_steps=request.max_steps, + use_assistant_message=request.use_assistant_message, + assistant_message_tool_name=request.assistant_message_tool_name, + assistant_message_tool_kwarg=request.assistant_message_tool_kwarg, + include_return_message_types=request.include_return_message_types, + override_model=request.override_model, + client_tools=request.client_tools, + ) + streaming_service = StreamingService(server) + run, result = await streaming_service.create_agent_stream( + agent_id=conversation.agent_id, + actor=actor, + request=streaming_request, + run_type="send_conversation_message", + conversation_id=conversation_id, + ) + return result - # Use streaming service - streaming_service = StreamingService(server) - run, result = await streaming_service.create_agent_stream( - agent_id=conversation.agent_id, - actor=actor, - request=request, - run_type="send_conversation_message", - conversation_id=conversation_id, + # Non-streaming mode + agent = await server.agent_manager.get_agent_by_id_async( + conversation.agent_id, + actor, + include_relationships=["memory", "multi_agent_group", "sources", "tool_exec_environment_variables", "tools", "tags"], ) - return result + if request.override_model: + override_llm_config = await server.get_llm_config_from_handle_async( + actor=actor, + handle=request.override_model, + ) + agent = agent.model_copy(update={"llm_config": override_llm_config}) + + agent_loop = AgentLoop.load(agent_state=agent, actor=actor) + return await agent_loop.step( + request.messages, + max_steps=request.max_steps, + use_assistant_message=request.use_assistant_message, + include_return_message_types=request.include_return_message_types, + client_tools=request.client_tools, + conversation_id=conversation_id, + ) @router.post(