feat: enable streaming flag on send message [LET-6100] (#6040)

* base

* base

* update

* stainless

* final

---------

Co-authored-by: Letta Bot <noreply@letta.com>
This commit is contained in:
jnjpng
2025-11-07 13:43:28 -08:00
committed by Caren Thomas
parent 789f6cf9cd
commit edbff34d63
3 changed files with 84 additions and 16 deletions

View File

@@ -7272,7 +7272,7 @@
"post": {
"tags": ["agents"],
"summary": "Send Message",
"description": "Process a user message and return the agent's response.\nThis endpoint accepts a message from a user and processes it through the agent.",
"description": "Process a user message and return the agent's response.\nThis endpoint accepts a message from a user and processes it through the agent.\n\nThe response format is controlled by the `streaming` field in the request body:\n- If `streaming=false` (default): Returns a complete LettaResponse with all messages\n- If `streaming=true`: Returns a Server-Sent Events (SSE) stream\n\nAdditional streaming options (only used when streaming=true):\n- `stream_tokens`: Stream individual tokens instead of complete steps\n- `include_pings`: Include keepalive pings to prevent connection timeouts\n- `background`: Process the request in the background",
"operationId": "send_message",
"parameters": [
{
@@ -7296,19 +7296,22 @@
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/LettaRequest"
"$ref": "#/components/schemas/LettaStreamingRequest"
}
}
}
},
"responses": {
"200": {
"description": "Successful Response",
"description": "Successful response",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/LettaResponse"
}
},
"text/event-stream": {
"description": "Server-Sent Events stream (when streaming=true in request body)"
}
}
},
@@ -28595,22 +28598,28 @@
"default": true,
"deprecated": true
},
"streaming": {
"type": "boolean",
"title": "Streaming",
"description": "If True, returns a streaming response (Server-Sent Events). If False (default), returns a complete response.",
"default": false
},
"stream_tokens": {
"type": "boolean",
"title": "Stream Tokens",
"description": "Flag to determine if individual tokens should be streamed, rather than streaming per step.",
"description": "Flag to determine if individual tokens should be streamed, rather than streaming per step (only used when streaming=true).",
"default": false
},
"include_pings": {
"type": "boolean",
"title": "Include Pings",
"description": "Whether to include periodic keepalive ping messages in the stream to prevent connection timeouts.",
"description": "Whether to include periodic keepalive ping messages in the stream to prevent connection timeouts (only used when streaming=true).",
"default": true
},
"background": {
"type": "boolean",
"title": "Background",
"description": "Whether to process the request in the background.",
"description": "Whether to process the request in the background (only used when streaming=true).",
"default": false
}
},

View File

@@ -80,17 +80,21 @@ class LettaRequest(BaseModel):
class LettaStreamingRequest(LettaRequest):
streaming: bool = Field(
default=False,
description="If True, returns a streaming response (Server-Sent Events). If False (default), returns a complete response.",
)
stream_tokens: bool = Field(
default=False,
description="Flag to determine if individual tokens should be streamed, rather than streaming per step.",
description="Flag to determine if individual tokens should be streamed, rather than streaming per step (only used when streaming=true).",
)
include_pings: bool = Field(
default=True,
description="Whether to include periodic keepalive ping messages in the stream to prevent connection timeouts.",
description="Whether to include periodic keepalive ping messages in the stream to prevent connection timeouts (only used when streaming=true).",
)
background: bool = Field(
default=False,
description="Whether to process the request in the background.",
description="Whether to process the request in the background (only used when streaming=true).",
)

View File

@@ -61,6 +61,7 @@ from letta.server.rest_api.dependencies import HeaderParams, get_headers, get_le
from letta.server.server import SyncServer
from letta.services.lettuce import LettuceClient
from letta.services.run_manager import RunManager
from letta.services.streaming_service import StreamingService
from letta.settings import settings
from letta.utils import is_1_0_sdk_version, safe_create_shielded_task, safe_create_task, truncate_file_visible_content
from letta.validators import AgentId, BlockId, FileId, MessageId, SourceId, ToolId
@@ -1326,25 +1327,78 @@ async def modify_message(
"/{agent_id}/messages",
response_model=LettaResponse,
operation_id="send_message",
responses={
200: {
"description": "Successful response",
"content": {
"application/json": {"schema": {"$ref": "#/components/schemas/LettaResponse"}},
"text/event-stream": {"description": "Server-Sent Events stream (when streaming=true in request body)"},
},
}
},
)
async def send_message(
request_obj: Request, # FastAPI Request
agent_id: AgentId,
server: SyncServer = Depends(get_letta_server),
request: LettaRequest = Body(...),
request: LettaStreamingRequest = Body(...),
headers: HeaderParams = Depends(get_headers),
):
) -> StreamingResponse | LettaResponse:
"""
Process a user message and return the agent's response.
This endpoint accepts a message from a user and processes it through the agent.
The response format is controlled by the `streaming` field in the request body:
- If `streaming=false` (default): Returns a complete LettaResponse with all messages
- If `streaming=true`: Returns a Server-Sent Events (SSE) stream
Additional streaming options (only used when streaming=true):
- `stream_tokens`: Stream individual tokens instead of complete steps
- `include_pings`: Include keepalive pings to prevent connection timeouts
- `background`: Process the request in the background
"""
# After validation, messages should always be set (converted from input if needed)
if not request.messages or len(request.messages) == 0:
raise ValueError("Messages must not be empty")
request_start_timestamp_ns = get_utc_timestamp_ns()
MetricRegistry().user_message_counter.add(1, get_ctx_attributes())
raise HTTPException(status_code=422, detail="Messages must not be empty")
# Validate streaming-specific options are only set when streaming=true
if not request.streaming:
errors = []
if request.stream_tokens is True:
errors.append("stream_tokens can only be true when streaming=true")
if request.include_pings is False:
errors.append("include_pings can only be set to false when streaming=true")
if request.background is True:
errors.append("background can only be true when streaming=true")
if errors:
raise HTTPException(
status_code=422,
detail=f"Streaming options set without streaming enabled. {'; '.join(errors)}. "
"Either set streaming=true or use default values for streaming options.",
)
is_1_0_sdk = is_1_0_sdk_version(headers)
if request.streaming and not is_1_0_sdk:
raise HTTPException(status_code=422, detail="streaming=true is only supported for SDK v1.0+ clients.")
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
if request.streaming and is_1_0_sdk:
streaming_service = StreamingService(server)
run, result = await streaming_service.create_agent_stream(
agent_id=agent_id,
actor=actor,
request=request,
run_type="send_message",
)
return result
request_start_timestamp_ns = get_utc_timestamp_ns()
MetricRegistry().user_message_counter.add(1, get_ctx_attributes())
# TODO: This is redundant, remove soon
agent = await server.agent_manager.get_agent_by_id_async(
agent_id, actor, include_relationships=["memory", "multi_agent_group", "sources", "tool_exec_environment_variables", "tools"]
@@ -1467,10 +1521,11 @@ async def send_message_streaming(
This endpoint accepts a message from a user and processes it through the agent.
It will stream the steps of the response always, and stream the tokens if 'stream_tokens' is set to True.
"""
from letta.services.streaming_service import StreamingService
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
# Since this is the dedicated streaming endpoint, ensure streaming is enabled
request.streaming = True
# use the streaming service for unified stream handling
streaming_service = StreamingService(server)