docs: improve streaming documentation structure and examples (#2930)

This commit is contained in:
Cameron Pfiffer
2025-09-16 10:41:22 -07:00
committed by GitHub
parent 0905f3fb00
commit ed8cd365ed
6 changed files with 1735 additions and 122 deletions

View File

@@ -0,0 +1,84 @@
# Letta Streaming Examples
Minimal examples demonstrating Letta's streaming API in both Python and TypeScript.
## Setup
1. Set your Letta API key:
```bash
export LETTA_API_KEY="your_api_key_here"
```
2. Install dependencies:
```bash
# For TypeScript
npm install
# For Python
pip install letta-client
```
## Run Examples
### Python
```bash
python streaming_demo.py
```
### TypeScript
```bash
npm run demo:typescript
# or directly with tsx:
npx tsx streaming_demo.ts
```
## What These Examples Show
Both examples demonstrate:
1. **Step Streaming** (default) - Complete messages delivered as they're generated
2. **Token Streaming** - Partial chunks for real-time display (ChatGPT-like UX)
The key difference:
- Step streaming: Each event contains a complete message
- Token streaming: Multiple events per message, requiring reassembly by message ID
## Key Concepts
### Python
```python
# Step streaming (default)
stream = client.agents.messages.create_stream(
agent_id=agent.id,
messages=[{"role": "user", "content": "Hello!"}]
)
# Token streaming
stream = client.agents.messages.create_stream(
agent_id=agent.id,
messages=[{"role": "user", "content": "Hello!"}],
stream_tokens=True # Enable token streaming
)
```
### TypeScript
```typescript
// Step streaming (default)
const stream = await client.agents.messages.createStream(
agentId, {
messages: [{role: "user", content: "Hello!"}]
}
);
// Token streaming
const stream = await client.agents.messages.createStream(
agentId, {
messages: [{role: "user", content: "Hello!"}],
streamTokens: true // Enable token streaming
}
);
```
## Learn More
See the full documentation at [docs.letta.com/guides/agents/streaming](https://docs.letta.com/guides/agents/streaming)

1143
examples/streaming/package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,19 @@
{
"name": "letta-streaming-examples",
"version": "1.0.0",
"description": "Examples demonstrating Letta's streaming API in Python and TypeScript",
"scripts": {
"demo:python": "python streaming_demo.py",
"demo:typescript": "tsx streaming_demo.ts",
"demo": "npm run demo:typescript"
},
"keywords": ["letta", "streaming", "ai", "agents"],
"author": "",
"license": "MIT",
"dependencies": {
"@letta-ai/letta-client": "^0.0.68646",
"@types/node": "^24.4.0",
"tsx": "^4.20.5",
"typescript": "^5.9.2"
}
}

View File

@@ -0,0 +1,101 @@
#!/usr/bin/env python3
"""
Minimal examples showing Letta's streaming API.
"""
import os
from typing import Dict, Any
from letta_client import Letta
def step_streaming_example(client: Letta, agent_id: str):
    """Step streaming: each event carries one complete message."""
    # Default streaming mode -- no stream_tokens flag needed.
    stream = client.agents.messages.create_stream(
        agent_id=agent_id,
        messages=[{
            "role": "user",
            "content": "Hi! My name is Alice. What's 2+2?"
        }]
    )
    # Surface only the agent's replies; skip reasoning/tool events.
    for event in stream:
        if getattr(event, 'message_type', None) == 'assistant_message':
            print(event.content)
def token_streaming_example(client: Letta, agent_id: str):
    """Token streaming: receive partial chunks for real-time display.

    Chunks that belong to the same logical message share an ``id``; the
    client reassembles the full text by accumulating content per ID.
    """
    # Send a message with token streaming enabled
    stream = client.agents.messages.create_stream(
        agent_id=agent_id,
        messages=[{
            "role": "user",
            "content": "What's my name? And tell me a short joke."
        }],
        stream_tokens=True  # Enable token streaming
    )
    # Track messages by ID for reassembly
    message_accumulators: Dict[str, str] = {}
    for chunk in stream:
        # Guard both attributes: stream events such as usage statistics can
        # carry an 'id' without a 'message_type' (or vice versa); the
        # previous unguarded chunk.message_type access could raise
        # AttributeError mid-stream.
        if (hasattr(chunk, 'id')
                and getattr(chunk, 'message_type', None) == 'assistant_message'):
            msg_id = chunk.id
            # Initialize accumulator for new messages
            if msg_id not in message_accumulators:
                message_accumulators[msg_id] = ''
            # Accumulate and print content
            content_chunk = chunk.content or ''
            message_accumulators[msg_id] += content_chunk
            print(content_chunk, end="", flush=True)
    print()  # New line after streaming completes
def main():
    """Entry point: create a throwaway agent and run both streaming demos."""
    # Abort early when no API key is configured.
    token = os.environ.get("LETTA_API_KEY")
    if not token:
        print("Please set LETTA_API_KEY environment variable")
        return

    letta = Letta(token=token)

    # Create a temporary agent used only for these examples.
    demo_agent = letta.agents.create(
        model="openai/gpt-4o-mini",
        embedding="openai/text-embedding-3-small",
        memory_blocks=[
            {
                "label": "human",
                "value": "The user is exploring streaming capabilities."
            },
            {
                "label": "persona",
                "value": "I am a helpful assistant demonstrating streaming responses."
            }
        ]
    )
    try:
        # Example 1: step streaming (the default mode)
        print("\nStep Streaming (complete messages):")
        step_streaming_example(letta, demo_agent.id)
        # Example 2: token streaming
        print("\nToken Streaming (real-time chunks):")
        token_streaming_example(letta, demo_agent.id)
    finally:
        # Always delete the demo agent, even if an example fails.
        letta.agents.delete(demo_agent.id)


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,102 @@
#!/usr/bin/env tsx
/**
* Minimal TypeScript examples showing Letta's streaming API.
* Demonstrates both step streaming (default) and token streaming modes.
*/
import { LettaClient } from '@letta-ai/letta-client';
import type { LettaMessage } from '@letta-ai/letta-client/api/types';
/**
 * Step streaming demo: every SSE event is one complete message,
 * so no client-side reassembly is required.
 */
async function stepStreamingExample(client: LettaClient, agentId: string): Promise<void> {
  console.log('\nStep Streaming (complete messages):');
  // Default streaming mode -- no streamTokens flag needed.
  const stream = await client.agents.messages.createStream(agentId, {
    messages: [{ role: "user", content: "Hi! My name is Alice. What's 2+2?" }],
  });
  for await (const message of stream as AsyncIterable<LettaMessage>) {
    // Surface only the agent's replies; skip reasoning/tool events.
    if (message.messageType === 'assistant_message') {
      console.log((message as any).content);
    }
  }
}
/**
 * Token streaming demo: the same message arrives as many partial chunks
 * sharing one message ID; the client accumulates them for reassembly.
 */
async function tokenStreamingExample(client: LettaClient, agentId: string): Promise<void> {
  console.log('\nToken Streaming (real-time chunks):');
  const stream = await client.agents.messages.createStream(agentId, {
    messages: [{ role: "user", content: "What's my name? And tell me a short joke." }],
    streamTokens: true, // Enable token streaming
  });
  // Partial content keyed by message ID.
  const accumulated = new Map<string, string>();
  for await (const chunk of stream as AsyncIterable<LettaMessage>) {
    // Guard clause: ignore events that are not assistant-message chunks.
    if (!chunk.id || chunk.messageType !== 'assistant_message') {
      continue;
    }
    const piece = (chunk as any).content || '';
    accumulated.set(chunk.id, (accumulated.get(chunk.id) ?? '') + piece);
    process.stdout.write(piece);
  }
  console.log(); // New line after streaming completes
}
/** Entry point: create a throwaway agent, run both demos, then clean up. */
async function main(): Promise<void> {
  // Abort early when no API key is configured.
  const apiKey = process.env.LETTA_API_KEY;
  if (!apiKey) {
    console.error('Please set LETTA_API_KEY environment variable');
    process.exit(1);
  }

  const client = new LettaClient({ token: apiKey });

  // Temporary agent used only for these examples.
  const agent = await client.agents.create({
    model: "openai/gpt-4o-mini",
    embedding: "openai/text-embedding-3-small",
    memoryBlocks: [
      {
        label: "human",
        value: "The user is exploring streaming capabilities."
      },
      {
        label: "persona",
        value: "I am a helpful assistant demonstrating streaming responses."
      }
    ]
  });

  try {
    // Example 1: step streaming (default)
    await stepStreamingExample(client, agent.id);
    // Example 2: token streaming
    await tokenStreamingExample(client, agent.id);
  } finally {
    // Always delete the demo agent, even when an example throws.
    await client.agents.delete(agent.id);
  }
}

// Run the example
main().catch(console.error);

View File

@@ -6,162 +6,326 @@ slug: guides/agents/streaming
Messages from the **Letta server** can be **streamed** to the client.
If you're building a UI on the Letta API, enabling streaming allows your UI to update in real-time as the agent generates a response to an input message.
There are two kinds of streaming you can enable: **streaming agent steps** and **streaming tokens**.
To enable streaming (either mode), you need to use the [`/v1/agent/messages/stream`](/api-reference/agents/messages/stream) API route instead of the [`/v1/agent/messages`](/api-reference/agents/messages) API route.
<Warning>
When working with agents that execute long-running operations (e.g., complex tool calls, extensive searches, or code execution), you may encounter timeouts with the message routes.
See our [tips on handling long-running tasks](/guides/agents/long-running) for more info.
</Warning>
## Streaming agent steps
## Quick Start
When you send a message to the Letta server, the agent may run multiple steps while generating a response.
For example, an agent may run a search query, then use the results of that query to generate a response.
Letta supports two streaming modes: **step streaming** (default) and **token streaming**.
To enable streaming, use the [`/v1/agents/{agent_id}/messages/stream`](/api-reference/agents/messages/stream) endpoint instead of `/messages`:
When you use the `/messages/stream` route, `stream_steps` is enabled by default, and the response to the `POST` request will stream back as server-sent events (read more about SSE format [here](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events)):
<CodeGroup>
```curl curl
curl --request POST \
--url http://localhost:8283/v1/agents/$AGENT_ID/messages/stream \
--header 'Content-Type: application/json' \
--data '{
"messages": [
{
"role": "user",
"content": "hows it going????"
}
]
}'
```
```python title="python" maxLines=50
# send a message to the agent (streaming steps)
```python title="python"
# Step streaming (default) - returns complete messages
stream = client.agents.messages.create_stream(
agent_id=agent_state.id,
messages=[
{
"role": "user",
"content": "hows it going????"
}
],
agent_id=agent.id,
messages=[{"role": "user", "content": "Hello!"}]
)
# print the chunks coming back
for chunk in stream:
print(chunk)
print(chunk) # Complete message objects
# Token streaming - returns partial chunks for real-time UX
stream = client.agents.messages.create_stream(
agent_id=agent.id,
messages=[{"role": "user", "content": "Hello!"}],
stream_tokens=True # Enable token streaming
)
for chunk in stream:
print(chunk) # Partial content chunks
```
```typescript maxLines=50 title="node.js"
// send a message to the agent (streaming steps)
const stream = await client.agents.messages.create_stream(
agentState.id, {
messages: [
{
role: "user",
content: "hows it going????"
}
]
```typescript title="typescript"
import { LettaClient } from '@letta-ai/letta-client';
const client = new LettaClient({ token: 'YOUR_API_KEY' });
// Step streaming (default) - returns complete messages
const stream = await client.agents.messages.createStream(
agent.id, {
messages: [{role: "user", content: "Hello!"}]
}
);
// print the chunks coming back
for await (const chunk of stream) {
console.log(chunk);
};
console.log(chunk); // Complete message objects
}
// Token streaming - returns partial chunks for real-time UX
const tokenStream = await client.agents.messages.createStream(
agent.id, {
messages: [{role: "user", content: "Hello!"}],
streamTokens: true // Enable token streaming
}
);
for await (const chunk of tokenStream) {
console.log(chunk); // Partial content chunks
}
```
</CodeGroup>
```json maxLines=50
data: {"id":"...","date":"...","message_type":"reasoning_message","reasoning":"User keeps asking the same question; maybe it's part of their style or humor. I\u2019ll respond warmly and play along."}
## Streaming Modes Comparison
data: {"id":"...","date":"...","message_type":"assistant_message","assistant_message":"Hey! It\u2019s going well! Still here, ready to chat. How about you? Anything exciting happening?"}
| Aspect | Step Streaming (default) | Token Streaming |
|--------|-------------------------|-----------------|
| **What you get** | Complete messages after each step | Partial chunks as tokens generate |
| **When to use** | Simple implementation | ChatGPT-like real-time UX |
| **Reassembly needed** | No | Yes (by message ID) |
| **Message IDs** | Unique per message | Same ID across chunks |
| **Content format** | Full text in each message | Incremental text pieces |
| **Enable with** | Default behavior | `stream_tokens: true` |
data: {"message_type":"usage_statistics","completion_tokens":65,"prompt_tokens":2329,"total_tokens":2394,"step_count":1}
## Understanding Message Flow
data: [DONE]
```
### Message Types and Flow Patterns
## Streaming tokens
The messages you receive depend on your agent's configuration:
You can also stream chunks of tokens from the agent as they are generated by the underlying LLM process by setting `stream_tokens` to `true` in your API request:
<CodeGroup>
```curl curl
curl --request POST \
--url http://localhost:8283/v1/agents/$AGENT_ID/messages/stream \
--header 'Content-Type: application/json' \
--data '{
"messages": [
{
"role": "user",
"content": "hows it going????"
}
],
"stream_tokens": true
}'
```
```python title="python" maxLines=50
# send a message to the agent (streaming steps)
stream = client.agents.messages.create_stream(
agent_id=agent_state.id,
messages=[
{
"role": "user",
"content": "hows it going????"
}
],
stream_tokens=True,
**With reasoning enabled (default):**
- Simple response: `reasoning_message` → `assistant_message`
- With tool use: `reasoning_message` → `tool_call_message` → `tool_return_message` → `reasoning_message` → `assistant_message`
**With reasoning disabled (`reasoning=false`):**
- Simple response: `assistant_message`
- With tool use: `tool_call_message` → `tool_return_message` → `assistant_message`
### Message Type Reference
- **`reasoning_message`**: Agent's internal thinking process (only when `reasoning=true`)
- **`assistant_message`**: The actual response shown to the user
- **`tool_call_message`**: Request to execute a tool
- **`tool_return_message`**: Result from tool execution
- **`stop_reason`**: Indicates end of response (`end_turn`)
- **`usage_statistics`**: Token usage and step count metrics
### Controlling Reasoning Messages
```python
# With reasoning (default) - includes reasoning_message events
agent = client.agents.create(
model="openai/gpt-4o-mini",
# reasoning=True is the default
)
# print the chunks coming back
for chunk in stream:
print(chunk)
# Without reasoning - no reasoning_message events
agent = client.agents.create(
model="openai/gpt-4o-mini",
reasoning=False # Disable reasoning messages
)
```
```typescript maxLines=50 title="node.js"
// send a message to the agent (streaming steps)
const stream = await client.agents.messages.create_stream(
agentState.id, {
messages: [
{
role: "user",
content: "hows it going????"
}
],
streamTokens: true
## Step Streaming (Default)
Step streaming delivers **complete messages** after each agent step completes. This is the default behavior when you use the streaming endpoint.
### How It Works
1. Agent processes your request through steps (reasoning, tool calls, generating responses)
2. After each step completes, you receive a complete `LettaMessage` via SSE
3. Each message can be processed immediately without reassembly
### Example
<CodeGroup>
```python title="python"
stream = client.agents.messages.create_stream(
agent_id=agent.id,
messages=[{"role": "user", "content": "What's 2+2?"}]
)
for chunk in stream:
if hasattr(chunk, 'message_type'):
if chunk.message_type == 'reasoning_message':
print(f"Thinking: {chunk.reasoning}")
elif chunk.message_type == 'assistant_message':
print(f"Response: {chunk.content}")
```
```typescript title="typescript"
import { LettaClient } from '@letta-ai/letta-client';
import type { LettaMessage } from '@letta-ai/letta-client/api/types';
const client = new LettaClient({ token: 'YOUR_API_KEY' });
const stream = await client.agents.messages.createStream(
agent.id, {
messages: [{role: "user", content: "What's 2+2?"}]
}
);
// print the chunks coming back
for await (const chunk of stream) {
console.log(chunk);
};
for await (const chunk of stream as AsyncIterable<LettaMessage>) {
if (chunk.messageType === 'reasoning_message') {
console.log(`Thinking: ${(chunk as any).reasoning}`);
} else if (chunk.messageType === 'assistant_message') {
console.log(`Response: ${(chunk as any).content}`);
}
}
```
```bash title="curl"
curl -N --request POST \
--url https://api.letta.com/v1/agents/$AGENT_ID/messages/stream \
--header "Authorization: Bearer $LETTA_API_KEY" \
--header 'Content-Type: application/json' \
--data '{"messages": [{"role": "user", "content": "What is 2+2?"}]}'
# For self-hosted: Replace https://api.letta.com with http://localhost:8283
```
</CodeGroup>
With token streaming enabled, the response will look very similar to the prior example (agent steps streaming), but instead of receiving complete messages, the client receives multiple messages with chunks of the response.
The client is responsible for reassembling the response from the chunks.
We've omitted most of the chunks for brevity:
```sh
data: {"id":"...","date":"...","message_type":"reasoning_message","reasoning":"It's"}
data: {"id":"...","date":"...","message_type":"reasoning_message","reasoning":" interesting"}
... chunks omitted
data: {"id":"...","date":"...","message_type":"reasoning_message","reasoning":"!"}
data: {"id":"...","date":"...","message_type":"assistant_message","assistant_message":"Well"}
... chunks omitted
data: {"id":"...","date":"...","message_type":"assistant_message","assistant_message":"."}
data: {"message_type":"usage_statistics","completion_tokens":50,"prompt_tokens":2771,"total_tokens":2821,"step_count":1}
### Example Output
```
data: {"id":"msg-123","message_type":"reasoning_message","reasoning":"User is asking a simple math question."}
data: {"id":"msg-456","message_type":"assistant_message","content":"2 + 2 equals 4!"}
data: {"message_type":"stop_reason","stop_reason":"end_turn"}
data: {"message_type":"usage_statistics","completion_tokens":50,"total_tokens":2821}
data: [DONE]
```
## Tips on handling streaming in your client code
The data structure for token streaming is the same as for agent steps streaming (`LettaMessage`) - just instead of returning complete messages, the Letta server will return multiple messages each with a chunk of the response.
Because the format of the data looks the same, if you write your frontend code to handle tokens streaming, it will also work for agent steps streaming.
## Token Streaming
For example, if the Letta server is connected to multiple LLM backend providers and only a subset of them support LLM token streaming, you can use the same frontend code (interacting with the Letta API) to handle both streaming and non-streaming providers.
If you send a message to an agent with token streaming enabled (`stream_tokens` set to `true`), the server will stream back `LettaMessage` objects with chunks if the selected LLM provider supports token streaming, and `LettaMessage` objects with complete strings if the selected LLM provider does not support token streaming.
Token streaming provides **partial content chunks** as they're generated by the LLM, enabling a ChatGPT-like experience where text appears character by character.
### How It Works
1. Set `stream_tokens: true` in your request
2. Receive multiple chunks with the **same message ID**
3. Each chunk contains a piece of the content
4. Client must accumulate chunks by ID to rebuild complete messages
### Example with Reassembly
<CodeGroup>
```python title="python"
# Token streaming with reassembly
message_accumulators = {}
stream = client.agents.messages.create_stream(
agent_id=agent.id,
messages=[{"role": "user", "content": "Tell me a joke"}],
stream_tokens=True
)
for chunk in stream:
if hasattr(chunk, 'id') and hasattr(chunk, 'message_type'):
msg_id = chunk.id
msg_type = chunk.message_type
# Initialize accumulator for new messages
if msg_id not in message_accumulators:
message_accumulators[msg_id] = {
'type': msg_type,
'content': ''
}
# Accumulate content and capture just the new piece
piece = ''
if msg_type == 'reasoning_message':
piece = chunk.reasoning
elif msg_type == 'assistant_message':
piece = chunk.content
message_accumulators[msg_id]['content'] += piece
# Print only the new piece; printing the whole accumulator would repeat text
print(piece, end='', flush=True)
```
```typescript title="typescript"
import { LettaClient } from '@letta-ai/letta-client';
import type { LettaMessage } from '@letta-ai/letta-client/api/types';
const client = new LettaClient({ token: 'YOUR_API_KEY' });
// Token streaming with reassembly
interface MessageAccumulator {
type: string;
content: string;
}
const messageAccumulators = new Map<string, MessageAccumulator>();
const stream = await client.agents.messages.createStream(
agent.id, {
messages: [{role: "user", content: "Tell me a joke"}],
streamTokens: true // Note: camelCase
}
);
for await (const chunk of stream as AsyncIterable<LettaMessage>) {
if (chunk.id && chunk.messageType) {
const msgId = chunk.id;
const msgType = chunk.messageType;
// Initialize accumulator for new messages
if (!messageAccumulators.has(msgId)) {
messageAccumulators.set(msgId, {
type: msgType,
content: ''
});
}
// Accumulate content based on message type
const acc = messageAccumulators.get(msgId)!;
let piece = '';
// Only accumulate if the type matches (in case types share IDs)
if (acc.type === msgType) {
if (msgType === 'reasoning_message') {
piece = (chunk as any).reasoning || '';
} else if (msgType === 'assistant_message') {
piece = (chunk as any).content || '';
}
}
acc.content += piece;
// Write only the new piece; writing the whole accumulator would repeat text
process.stdout.write(piece);
}
}
```
```bash title="curl"
curl -N --request POST \
--url https://api.letta.com/v1/agents/$AGENT_ID/messages/stream \
--header "Authorization: Bearer $LETTA_API_KEY" \
--header 'Content-Type: application/json' \
--data '{
"messages": [{"role": "user", "content": "Tell me a joke"}],
"stream_tokens": true
}'
```
</CodeGroup>
### Example Output
```
# Same ID across chunks of the same message
data: {"id":"msg-abc","message_type":"assistant_message","content":"Why"}
data: {"id":"msg-abc","message_type":"assistant_message","content":" did"}
data: {"id":"msg-abc","message_type":"assistant_message","content":" the"}
data: {"id":"msg-abc","message_type":"assistant_message","content":" scarecrow"}
data: {"id":"msg-abc","message_type":"assistant_message","content":" win"}
# ... more chunks with same ID
data: [DONE]
```
## Implementation Tips
### Universal Handling Pattern
The accumulator pattern shown above works for **both** streaming modes:
- **Step streaming**: Each message is complete (single chunk per ID)
- **Token streaming**: Multiple chunks per ID need accumulation
This means you can write your client code once to handle both cases.
### SSE Format Notes
All streaming responses follow the Server-Sent Events (SSE) format:
- Each event starts with `data: ` followed by JSON
- Stream ends with `data: [DONE]`
- Empty lines separate events
Learn more about SSE format [here](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events).
### Handling Different LLM Providers
If your Letta server connects to multiple LLM providers, some may not support token streaming. Your client code will still work - the server will fall back to step streaming automatically when token streaming isn't available.