diff --git a/fern/changelog/2025-03-02.mdx b/fern/changelog/2025-03-02.mdx new file mode 100644 index 00000000..5e6e01ab --- /dev/null +++ b/fern/changelog/2025-03-02.mdx @@ -0,0 +1,29 @@ +## Added List Run Steps API + +We've introduced a new API endpoint that allows you to list all steps associated with a specific run. This feature makes it easier to track and analyze the sequence of steps performed during a run. + + +```python title="python" +from letta_client import Letta +client = Letta( + token="YOUR_API_KEY", +) +steps = client.runs.list_run_steps( + run_id="RUN_ID", +) +for step in steps: + print(f"Step ID: {step.id}, Tokens: {step.total_tokens}") +``` +```typescript title="node.js" +import { LettaClient } from '@letta-ai/letta-client'; +const client = new LettaClient({ + token: "YOUR_API_KEY", +}); +const steps = await client.runs.steps.list({ + run_id: "RUN_ID", +}); +steps.forEach(step => { + console.log(`Step ID: ${step.id}, Tokens: ${step.total_tokens}`); +}); +``` + diff --git a/fern/openapi.json b/fern/openapi.json index 78d043e3..30abc965 100644 --- a/fern/openapi.json +++ b/fern/openapi.json @@ -13976,9 +13976,9 @@ "/v1/runs/{run_id}/messages": { "get": { "tags": ["runs"], - "summary": "List Run Messages", + "summary": "List Messages For Run", "description": "Get response messages associated with a run.", - "operationId": "list_run_messages", + "operationId": "list_messages_for_run", "parameters": [ { "name": "run_id", @@ -14081,7 +14081,7 @@ "items": { "$ref": "#/components/schemas/LettaMessageUnion" }, - "title": "Response List Run Messages" + "title": "Response List Messages For Run" } } } @@ -14102,9 +14102,9 @@ "/v1/runs/{run_id}/usage": { "get": { "tags": ["runs"], - "summary": "Retrieve Run Usage", + "summary": "Retrieve Usage For Run", "description": "Get usage statistics for a run.", - "operationId": "retrieve_run_usage", + "operationId": "retrieve_usage_for_run", "parameters": [ { "name": "run_id", @@ -14184,9 +14184,9 @@ 
"/v1/runs/{run_id}/steps": { "get": { "tags": ["runs"], - "summary": "List Run Steps", + "summary": "List Steps For Run", "description": "Get steps associated with a run with filtering options.", - "operationId": "list_run_steps", + "operationId": "list_steps_for_run", "parameters": [ { "name": "run_id", @@ -14289,7 +14289,7 @@ "items": { "$ref": "#/components/schemas/Step" }, - "title": "Response List Run Steps" + "title": "Response List Steps For Run" } } } @@ -14310,8 +14310,8 @@ "/v1/runs/{run_id}/stream": { "post": { "tags": ["runs"], - "summary": "Retrieve Stream", - "operationId": "retrieve_stream", + "summary": "Retrieve Stream For Run", + "operationId": "retrieve_stream_for_run", "parameters": [ { "name": "run_id", diff --git a/fern/pages/agents/long_running.mdx b/fern/pages/agents/long_running.mdx new file mode 100644 index 00000000..0b67207f --- /dev/null +++ b/fern/pages/agents/long_running.mdx @@ -0,0 +1,602 @@ +--- +title: Long-Running Executions +slug: guides/agents/long-running +subtitle: How to handle long-running agent executions +--- + +When agents need to execute multiple tool calls or perform complex operations (like deep research, data analysis, or multi-step workflows), processing time can vary significantly. 
+ +Letta supports various ways to handle long-running agents, so you can choose the approach that best fits your use case: + +| Use Case | Duration | Recommendation | Key Benefits | +|----------|----------|---------------------|-------------| +| Few-step invocations | < 1 minute | [Standard streaming](/guides/agents/streaming) | Simplest approach | +| Variable length runs | 1-10 minutes | **Background mode** (Keepalive + Timeout as a second choice) | Easy way to reduce timeouts | +| Deep research | 10+ minutes | **Background mode**, or async polling | Survives disconnects, resumable streams | +| Batch jobs | Any | **Async polling** | Fire-and-forget, check results later | + +## Option 1: Background Mode with Resumable Streaming + + +**Best for:** Operations exceeding 10 minutes, unreliable network connections, or critical workflows that must complete regardless of client connectivity. + +**Trade-off:** Slightly higher latency to first token due to background task initialization. + + +Background mode decouples agent execution from your client connection. The agent processes your request on the server while streaming results to a persistent store, allowing you to reconnect and resume from any point — even if your application crashes or network fails. 
+ + +```curl curl maxLines=50 +curl --request POST \ + --url https://api.letta.com/v1/agents/$AGENT_ID/messages/stream \ + --header 'Authorization: Bearer $LETTA_API_KEY' \ + --header 'Content-Type: application/json' \ + --data '{ + "messages": [ + { + "role": "user", + "content": "Run comprehensive analysis on this dataset" + } + ], + "stream_tokens": true, + "background": true +}' + +# Response stream includes run_id and seq_id for each chunk: +data: {"run_id":"run-123","seq_id":0,"message_type":"reasoning_message","reasoning":"Analyzing"} +data: {"run_id":"run-123","seq_id":1,"message_type":"reasoning_message","reasoning":" the dataset"} +data: {"run_id":"run-123","seq_id":2,"message_type":"tool_call","tool_call":{...}} +# ... stream continues + +# Step 2: If disconnected, resume from last received seq_id +curl --request GET \ + --url https://api.letta.com/v1/runs/$RUN_ID/stream \ + --header 'Accept: text/event-stream' \ + --data '{ + "starting_after": 57 +}' +``` +```python python maxLines=50 +stream = client.agents.messages.create_stream( + agent_id=agent_state.id, + messages=[ + { + "role": "user", + "content": "Run comprehensive analysis on this dataset" + } + ], + stream_tokens=True, + background=True, +) +run_id = None +last_seq_id = None +for chunk in stream: + if hasattr(chunk, "run_id") and hasattr(chunk, "seq_id"): + run_id = chunk.run_id # Save this to reconnect if your connection drops + last_seq_id = chunk.seq_id # Save this as your resumption point for cursor-based pagination + print(chunk) + +# If disconnected, resume from last received seq_id: +for chunk in client.runs.stream(run_id, starting_after=last_seq_id): + print(chunk) +``` +```typescript TypeScript maxLines=50 +const stream = await client.agents.messages.createStream({ + agentId: agentState.id, + requestBody: { + messages: [ + { + role: "user", + content: "Run comprehensive analysis on this dataset" + } + ], + streamTokens: true, + background: true, + } +}); + +let runId = null; +let 
lastSeqId = null; +for await (const chunk of stream) { + if (chunk.run_id && chunk.seq_id) { + runId = chunk.run_id; // Save this to reconnect if your connection drops + lastSeqId = chunk.seq_id; // Save this as your resumption point for cursor-based pagination + } + console.log(chunk); +} + +// If disconnected, resume from last received seq_id +for await (const chunk of client.runs.stream(runId, {startingAfter: lastSeqId})) { + console.log(chunk); +} +``` +```python python maxLines=60 +# 1) Start background stream and capture approval request +stream = client.agents.messages.create_stream( + agent_id=agent.id, + messages=[{"role": "user", "content": "Do a sensitive operation"}], + stream_tokens=True, + background=True, +) + +approval_request_id = None +orig_run_id = None +last_seq_id = 0 +for chunk in stream: + if hasattr(chunk, "run_id") and hasattr(chunk, "seq_id"): + orig_run_id = chunk.run_id + last_seq_id = chunk.seq_id + if getattr(chunk, "message_type", None) == "approval_request_message": + approval_request_id = chunk.id + break + +# 2) Approve in background; capture the approval stream cursor (this creates a new run) +approve = client.agents.messages.create_stream( + agent_id=agent.id, + messages=[{"type": "approval", "approve": True, "approval_request_id": approval_request_id}], + stream_tokens=True, + background=True, +) + +run_id = None +approve_seq = 0 +for chunk in approve: + if hasattr(chunk, "run_id") and hasattr(chunk, "seq_id"): + run_id = chunk.run_id + approve_seq = chunk.seq_id + if getattr(chunk, "message_type", None) == "tool_return_message": + # Tool result arrives here on the approval stream + break + +# 3) Resume that run to read follow-up tokens +for chunk in client.runs.stream(run_id, starting_after=approve_seq): + print(chunk) +``` +```typescript TypeScript maxLines=60 +// 1) Start background stream and capture approval request +const stream = await client.agents.messages.createStream( + agent.id, { + messages: [{role: "user", content: 
"Do a sensitive operation"}], + streamTokens: true, + background: true, + } +); + +let approvalRequestId = null; +let origRunId = null; +let lastSeqId = 0; +for await (const chunk of stream) { + if (chunk.runId && chunk.seqId) { + origRunId = chunk.runId; + lastSeqId = chunk.seqId; + } + if (chunk.messageType === "approval_request_message") { + approvalRequestId = chunk.id; + break; + } +} + +// 2) Approve in background; capture the approval stream cursor (this creates a new run) +const approveStream = await client.agents.messages.createStream( + agent.id, { + messages: [{type: "approval", approve: true, approvalRequestId}], + streamTokens: true, + background: true, + } +); + +let runId = null; +let approveSeq = 0; +for await (const chunk of approveStream) { + if (chunk.runId && chunk.seqId) { + runId = chunk.runId; + approveSeq = chunk.seqId; + } + if (chunk.messageType === "tool_return_message") { + // Tool result arrives here on the approval stream + break; + } +} + +// 3) Resume that run to read follow-up tokens +for await (const chunk of client.runs.stream(runId, {startingAfter: approveSeq})) { + console.log(chunk); +} +``` + + +### HITL in Background Mode + +When [Human‑in‑the‑Loop (HITL) approval](/guides/agents/human-in-the-loop) is enabled for a tool, your background stream may pause and emit an `approval_request_message`. In background mode, send the approval via a separate background stream and capture that stream’s `run_id`/`seq_id`. + + +Approval responses in background mode emit the `tool_return_message` on the approval stream itself (with a new `run_id`, different from the original stream). Save the approval stream cursor, then resume with `runs.stream` to consume subsequent reasoning/assistant messages. 
+ + + +```curl curl maxLines=70 +# 1) Start background stream; capture approval request +curl --request POST \ + --url https://api.letta.com/v1/agents/$AGENT_ID/messages/stream \ + --header 'Authorization: Bearer $LETTA_API_KEY' \ + --header 'Content-Type: application/json' \ + --data '{ + "messages": [{"role": "user", "content": "Do a sensitive operation"}], + "stream_tokens": true, + "background": true +}' + +# Example stream output (approval request arrives): +data: {"run_id":"run-abc","seq_id":0,"message_type":"reasoning_message","reasoning":"..."} +data: {"run_id":"run-abc","seq_id":1,"message_type":"approval_request_message","id":"message-abc","tool_call":{"name":"sensitive_operation","arguments":"{...}","tool_call_id":"tool-xyz"}} + +# 2) Approve in background; capture approval stream cursor (this creates a new run) +curl --request POST \ + --url https://api.letta.com/v1/agents/$AGENT_ID/messages/stream \ + --header 'Authorization: Bearer $LETTA_API_KEY' \ + --header 'Content-Type: application/json' \ + --data '{ + "messages": [{"type": "approval", "approve": true, "approval_request_id": "message-abc"}], + "stream_tokens": true, + "background": true +}' + +# Example approval stream output (tool result arrives here): +data: {"run_id":"run-new","seq_id":0,"message_type":"tool_return_message","status":"success","tool_return":"..."} + +# 3) Resume the approval stream's run to continue +curl --request GET \ + --url https://api.letta.com/v1/runs/$RUN_ID/stream \ + --header 'Accept: text/event-stream' \ + --data '{ + "starting_after": 0 +}' +``` +```python python maxLines=70 +# 1) Start background stream and capture approval request +stream = client.agents.messages.create_stream( + agent_id=agent.id, + messages=[{"role": "user", "content": "Do a sensitive operation"}], + stream_tokens=True, + background=True, +) + +approval_request_id = None +orig_run_id = None +last_seq_id = 0 +for chunk in stream: + if hasattr(chunk, "run_id") and hasattr(chunk, "seq_id"): + 
orig_run_id = chunk.run_id + last_seq_id = chunk.seq_id + if getattr(chunk, "message_type", None) == "approval_request_message": + approval_request_id = chunk.id + break + +# 2) Approve in background; capture the approval stream cursor (this creates a new run) +approve = client.agents.messages.create_stream( + agent_id=agent.id, + messages=[{"type": "approval", "approve": True, "approval_request_id": approval_request_id}], + stream_tokens=True, + background=True, +) + +run_id = None +approve_seq = 0 +for chunk in approve: + if hasattr(chunk, "run_id") and hasattr(chunk, "seq_id"): + run_id = chunk.run_id + approve_seq = chunk.seq_id + if getattr(chunk, "message_type", None) == "tool_return_message": + # Tool result arrives here on the approval stream + break + +# 3) Resume that run to read follow-up tokens +for chunk in client.runs.stream(run_id, starting_after=approve_seq): + print(chunk) +``` +```typescript TypeScript maxLines=70 +// 1) Start background stream and capture approval request +const stream = await client.agents.messages.createStream({ + agentId: agent.id, + requestBody: { + messages: [{ role: "user", content: "Do a sensitive operation" }], + streamTokens: true, + background: true, + } +}); + +let approvalRequestId: string | null = null; +let origRunId: string | null = null; +let lastSeqId = 0; +for await (const chunk of stream) { + if (chunk.run_id && chunk.seq_id) { origRunId = chunk.run_id; lastSeqId = chunk.seq_id; } + if (chunk.message_type === "approval_request_message") { + approvalRequestId = chunk.id; break; + } +} + +// 2) Approve in background; capture the approval stream cursor (this creates a new run) +const approve = await client.agents.messages.createStream({ + agentId: agent.id, + requestBody: { + messages: [{ type: "approval", approve: true, approvalRequestId }], + streamTokens: true, + background: true, + } +}); + +let runId: string | null = null; +let approveSeq = 0; +for await (const chunk of approve) { + if (chunk.run_id && 
chunk.seq_id) { runId = chunk.run_id; approveSeq = chunk.seq_id; } + if (chunk.message_type === "tool_return_message") { + // Tool result arrives here on the approval stream + break; + } +} + +// 3) Resume that run to read follow-up tokens +const resume = await client.runs.stream(runId!, { startingAfter: approveSeq }); +for await (const chunk of resume) { + console.log(chunk); +} +``` + + + +### Discovering and Resuming Active Streams + +When your application starts or recovers from a crash, you can check for any active background streams and resume them. This is particularly useful for: +- **Application restarts**: Resume processing after deployments or crashes +- **Load balancing**: Pick up streams started by other instances +- **Monitoring**: Check progress of long-running operations from different clients + + +```curl curl maxLines=50 +# Step 1: Find active background streams for your agents +curl --request GET \ + --url https://api.letta.com/v1/runs/active \ + --header 'Authorization: Bearer $LETTA_API_KEY' \ + --header 'Content-Type: application/json' \ + --data '{ + "agent_ids": [ + "agent-123", + "agent-456" + ], + "background": true +}' +# Returns: [{"run_id": "run-abc", "agent_id": "agent-123", "status": "processing", ...}] + +# Step 2: Resume streaming from the beginning (or any specified seq_id) +curl --request GET \ + --url https://api.letta.com/v1/runs/$RUN_ID/stream \ + --header 'Accept: text/event-stream' \ + --data '{ + "starting_after": 0, # Start from beginning + "batch_size": 1000 # Fetch historical chunks in larger batches +}' +``` +```python python maxLines=50 +# Find and resume active background streams +active_runs = client.runs.active( + agent_ids=["agent-123", "agent-456"], + background=True, +) + +if active_runs: + # Resume the first active stream from the beginning + run = active_runs[0] + print(f"Resuming stream for run {run.id}, status: {run.status}") + + stream = client.runs.stream( + run_id=run.id, + starting_after=0, # Start from 
beginning + batch_size=1000 # Fetch historical chunks in larger batches + ) + + # Each historical chunk is streamed one at a time, followed by new chunks as they become available + for chunk in stream: + print(chunk) +``` +```typescript TypeScript maxLines=50 +// Find and resume active background streams +const activeRuns = await client.runs.active({ + agentIds: ["agent-123", "agent-456"], + background: true, +}); + +if (activeRuns.length > 0) { + // Resume the first active stream from the beginning + const run = activeRuns[0]; + console.log(`Resuming stream for run ${run.id}, status: ${run.status}`); + + const stream = await client.runs.stream(run.id, { + startingAfter: 0, // Start from beginning + batchSize: 1000 // Fetch historical chunks in larger batches + }); + + // Each historical chunk is streamed one at a time, followed by new chunks as they become available + for await (const chunk of stream) { + console.log(chunk); + } +} +``` + + +## Option 2: Async Operations with Polling + + +**Best for:** Use cases where you don't need real-time token streaming. + + +Ideal for batch processing, scheduled jobs, or when you don't need real-time updates. 
The [async SDK method](/api-reference/agents/messages/create-async) queues your request and returns immediately, letting you check results later: + + +```curl curl maxLines=50 +# Start async operation (returns immediately with run ID) +curl --request POST \ + --url https://api.letta.com/v1/agents/$AGENT_ID/messages/async \ + --header 'Authorization: Bearer $LETTA_API_KEY' \ + --header 'Content-Type: application/json' \ + --data '{ + "messages": [ + { + "role": "user", + "content": "Run comprehensive analysis on this dataset" + } + ] +}' + +# Poll for results using the returned run ID +curl --request GET \ + --url https://api.letta.com/v1/runs/$RUN_ID +``` +```python python maxLines=50 +# Start async operation (returns immediately with run ID) +run = client.agents.messages.create_async( + agent_id=agent_state.id, + messages=[ + { + "role": "user", + "content": "Run comprehensive analysis on this dataset" + } + ], +) + +# Poll for completion +import time +while run.status != "completed": + time.sleep(2) + run = client.runs.retrieve(run_id=run.id) + +# Get the messages once complete +messages = client.runs.messages.list(run_id=run.id) +``` +```typescript TypeScript maxLines=50 +// Start async operation (returns immediately with run ID) +const run = await client.agents.createAgentMessageAsync({ + agentId: agentState.id, + requestBody: { + messages: [ + { + role: "user", + content: "Run comprehensive analysis on this dataset" + } + ] + } +}); + +// Poll for completion +while (run.status !== "completed") { + await new Promise(resolve => setTimeout(resolve, 2000)); + run = await client.runs.retrieveRun({ runId: run.id }); +} + +// Get the messages once complete +const messages = await client.runs.messages.list({ runId: run.id }); +``` + + +## Option 3: Configure Streaming with Keepalive Pings and Longer Timeouts + + +**Best for:** Use cases where you are already using the standard [streaming code](/guides/agents/streaming), but are experiencing issues with timeouts or 
disconnects (e.g. due to network interruptions or hanging tool executions). + +**Trade-off:** Not as reliable as background mode, and does not support resuming a disconnected stream/request. + + + +This approach assumes a persistent HTTP connection. We highly recommend using **background mode** (or async polling) for long-running jobs, especially when: +- Your infrastructure uses aggressive proxy timeouts +- You need to handle network interruptions gracefully +- Operations might exceed 10 minutes + + +For operations under 10 minutes that need real-time updates without the complexity of background processing. Configure keepalive pings and timeouts to maintain stable connections: + + +```curl curl maxLines=50 +curl --request POST \ + --url https://api.letta.com/v1/agents/$AGENT_ID/messages/stream \ + --header 'Authorization: Bearer $LETTA_API_KEY' \ + --header 'Content-Type: application/json' \ + --data '{ + "messages": [ + { + "role": "user", + "content": "Execute this long-running analysis" + } + ], + "include_pings": true +}' +``` +```python python +# Configure client with extended timeout +from letta_client import Letta +import os + +client = Letta( + token=os.getenv("LETTA_API_KEY") +) + +# Enable pings to prevent timeout during long operations +stream = client.agents.messages.create_stream( + agent_id=agent_state.id, + messages=[ + { + "role": "user", + "content": "Execute this long-running analysis" + } + ], + include_pings=True, # Sends periodic keepalive messages + request_options={"timeout_in_seconds": 600} # 10 min timeout +) + +# Process the stream (pings will keep connection alive) +for chunk in stream: + if chunk.message_type == "ping": + # Keepalive ping received, connection is still active + continue + print(chunk) +``` +```typescript TypeScript maxLines=50 +// Configure client with extended timeout +import { Letta } from '@letta/sdk'; + +const client = new Letta({ + token: process.env.LETTA_API_KEY +}); + +// Enable pings to prevent timeout during 
long operations +const stream = await client.agents.createAgentMessageStream({ + agentId: agentState.id, + requestBody: { + messages: [ + { + role: "user", + content: "Execute this long-running analysis" + } + ], + includePings: true // Sends periodic keepalive messages + }, { + timeoutInSeconds: 600 // 10 minutes timeout in seconds + } +}); + +// Process the stream (pings will keep connection alive) +for await (const chunk of stream) { + if (chunk.message_type === "ping") { + // Keepalive ping received, connection is still active + continue; + } + console.log(chunk); +} +``` + + +### Configuration Guidelines + +| Parameter | Purpose | When to Use | +|-----------|---------|------------| +| Timeout in seconds | Extends request timeout beyond 60s default | Set to 1.5x your expected max duration | +| Include pings | Sends keepalive messages every ~30s | Enable for operations with long gaps between outputs | diff --git a/letta/server/rest_api/routers/v1/runs.py b/letta/server/rest_api/routers/v1/runs.py index 35669d85..d4b9ee8b 100644 --- a/letta/server/rest_api/routers/v1/runs.py +++ b/letta/server/rest_api/routers/v1/runs.py @@ -160,9 +160,9 @@ RunMessagesResponse = Annotated[ @router.get( "/{run_id}/messages", response_model=RunMessagesResponse, - operation_id="list_run_messages", + operation_id="list_messages_for_run", ) -async def list_run_messages( +async def list_messages_for_run( run_id: str, server: "SyncServer" = Depends(get_letta_server), headers: HeaderParams = Depends(get_headers), @@ -183,8 +183,8 @@ async def list_run_messages( return await server.run_manager.get_run_messages(run_id=run_id, actor=actor, before=before, after=after, limit=limit, order=order) -@router.get("/{run_id}/usage", response_model=UsageStatistics, operation_id="retrieve_run_usage") -async def retrieve_run_usage( +@router.get("/{run_id}/usage", response_model=UsageStatistics, operation_id="retrieve_usage_for_run") +async def retrieve_usage_for_run( run_id: str, headers: HeaderParams 
= Depends(get_headers), server: "SyncServer" = Depends(get_letta_server), @@ -215,9 +215,9 @@ async def retrieve_metrics_for_run( @router.get( "/{run_id}/steps", response_model=List[Step], - operation_id="list_run_steps", + operation_id="list_steps_for_run", ) -async def list_run_steps( +async def list_steps_for_run( run_id: str, server: "SyncServer" = Depends(get_letta_server), headers: HeaderParams = Depends(get_headers), @@ -262,7 +262,7 @@ async def delete_run( @router.post( "/{run_id}/stream", response_model=None, - operation_id="retrieve_stream", + operation_id="retrieve_stream_for_run", responses={ 200: { "description": "Successful response", @@ -292,7 +292,7 @@ async def delete_run( } }, ) -async def retrieve_stream( +async def retrieve_stream_for_run( run_id: str, request: RetrieveStreamRequest = Body(None), headers: HeaderParams = Depends(get_headers),