Files
letta-server/letta/server/rest_api/routers/v1/agents.py
Sarah Wooders 18a1a16bf4 Revert "feat: add message_types filter to list messages endpoint" (#8314)
Revert "feat: add message_types filter to list messages endpoint (#8280)"

This reverts commit e7ac5df721ec4b3e663dd30239f590ee16bb8630.
2026-01-12 10:57:48 -08:00

2246 lines
91 KiB
Python

import asyncio
import json
import traceback
from datetime import datetime, timezone
from typing import Annotated, Any, Dict, List, Literal, Optional, Union
from fastapi import APIRouter, Body, Depends, File, Form, Header, HTTPException, Query, Request, UploadFile, status
from fastapi.responses import JSONResponse
from marshmallow import ValidationError
from orjson import orjson
from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy.exc import IntegrityError, OperationalError
from starlette.responses import Response, StreamingResponse
from letta.agents.agent_loop import AgentLoop
from letta.agents.base_agent_v2 import BaseAgentV2
from letta.agents.letta_agent import LettaAgent
from letta.agents.letta_agent_v2 import LettaAgentV2
from letta.agents.letta_agent_v3 import LettaAgentV3
from letta.constants import DEFAULT_MAX_STEPS, DEFAULT_MESSAGE_TOOL, DEFAULT_MESSAGE_TOOL_KWARG, REDIS_RUN_ID_PREFIX
from letta.data_sources.redis_client import get_redis_client
from letta.errors import (
AgentExportIdMappingError,
AgentExportProcessingError,
AgentFileImportError,
AgentNotFoundForExportError,
NoActiveRunsToCancelError,
PendingApprovalError,
)
from letta.groups.sleeptime_multi_agent_v4 import SleeptimeMultiAgentV4
from letta.helpers.datetime_helpers import get_utc_time, get_utc_timestamp_ns
from letta.log import get_logger
from letta.orm.errors import NoResultFound
from letta.otel.context import get_ctx_attributes
from letta.otel.metric_registry import MetricRegistry
from letta.schemas.agent import AgentRelationships, AgentState, CreateAgent, UpdateAgent
from letta.schemas.agent_file import AgentFileSchema
from letta.schemas.block import BaseBlock, Block, BlockResponse, BlockUpdate
from letta.schemas.enums import AgentType, MessageRole, RunStatus
from letta.schemas.file import AgentFileAttachment, FileMetadataBase, PaginatedAgentFiles
from letta.schemas.group import Group
from letta.schemas.job import LettaRequestConfig
from letta.schemas.letta_message import LettaMessageUnion, LettaMessageUpdateUnion, MessageType
from letta.schemas.letta_message_content import TextContent
from letta.schemas.letta_request import LettaAsyncRequest, LettaRequest, LettaStreamingRequest
from letta.schemas.letta_response import LettaResponse, LettaStreamingResponse
from letta.schemas.letta_stop_reason import StopReasonType
from letta.schemas.mcp_server import ToolExecuteRequest
from letta.schemas.memory import (
ArchivalMemorySearchResponse,
ArchivalMemorySearchResult,
ContextWindowOverview,
CreateArchivalMemory,
Memory,
)
from letta.schemas.message import Message, MessageCreate, MessageCreateType, MessageSearchRequest, MessageSearchResult
from letta.schemas.passage import Passage
from letta.schemas.run import Run as PydanticRun, RunUpdate
from letta.schemas.source import BaseSource, Source
from letta.schemas.tool import BaseTool, Tool
from letta.schemas.tool_execution_result import ToolExecutionResult
from letta.schemas.user import User
from letta.serialize_schemas.pydantic_agent_schema import AgentSchema
from letta.server.rest_api.dependencies import HeaderParams, get_headers, get_letta_server
from letta.server.server import SyncServer
from letta.services.lettuce import LettuceClient
from letta.services.run_manager import RunManager
from letta.services.streaming_service import StreamingService
from letta.services.summarizer.summarizer_config import CompactionSettings
from letta.settings import settings
from letta.utils import is_1_0_sdk_version, safe_create_shielded_task, safe_create_task, truncate_file_visible_content
from letta.validators import AgentId, BlockId, FileId, MessageId, SourceId, ToolId
# These can be forward refs, but because Fastapi needs them at runtime the must be imported normally
router = APIRouter(prefix="/agents", tags=["agents"])
logger = get_logger(__name__)
@router.get("/", response_model=list[AgentState], operation_id="list_agents")
async def list_agents(
name: str | None = Query(None, description="Name of the agent"),
tags: list[str] | None = Query(None, description="List of tags to filter agents by"),
match_all_tags: bool = Query(
False,
description="If True, only returns agents that match ALL given tags. Otherwise, return agents that have ANY of the passed-in tags.",
),
server: SyncServer = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
before: str | None = Query(None, description="Cursor for pagination"),
after: str | None = Query(None, description="Cursor for pagination"),
limit: int | None = Query(50, description="Limit for pagination"),
query_text: str | None = Query(None, description="Search agents by name"),
project_id: str | None = Query(None, description="Search agents by project ID - this will default to your default project on cloud"),
template_id: str | None = Query(None, description="Search agents by template ID"),
base_template_id: str | None = Query(None, description="Search agents by base template ID"),
identity_id: str | None = Query(None, description="Search agents by identity ID"),
identifier_keys: list[str] | None = Query(None, description="Search agents by identifier keys"),
include_relationships: list[str] | None = Query(
None,
description=(
"Specify which relational fields (e.g., 'tools', 'sources', 'memory') to include in the response. "
"If not provided, all relationships are loaded by default. "
"Using this can optimize performance by reducing unnecessary joins."
"This is a legacy parameter, and no longer supported after 1.0.0 SDK versions."
),
deprecated=True,
),
include: List[AgentRelationships] = Query(
[],
description=("Specify which relational fields to include in the response. No relationships are included by default."),
),
order: Literal["asc", "desc"] = Query(
"desc", description="Sort order for agents by creation time. 'asc' for oldest first, 'desc' for newest first"
),
order_by: Literal["created_at", "last_run_completion"] = Query("created_at", description="Field to sort by"),
ascending: bool = Query(
False,
description="Whether to sort agents oldest to newest (True) or newest to oldest (False, default)",
deprecated=True,
),
sort_by: str | None = Query(
"created_at",
description="Field to sort by. Options: 'created_at' (default), 'last_run_completion'",
deprecated=True,
),
show_hidden_agents: bool | None = Query(
False,
include_in_schema=False,
description="If set to True, include agents marked as hidden in the results.",
),
last_stop_reason: Optional[StopReasonType] = Query(None, description="Filter agents by their last stop reason."),
):
"""
Get a list of all agents.
"""
# Retrieve the actor (user) details
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
# Handle backwards compatibility - prefer new parameters over legacy ones
final_ascending = (order == "asc") if order else ascending
final_sort_by = order_by if order_by else sort_by
if include_relationships is None and is_1_0_sdk_version(headers):
include_relationships = [] # don't default include all if using new SDK version
# Call list_agents directly without unnecessary dict handling
return await server.agent_manager.list_agents_async(
actor=actor,
name=name,
before=before,
after=after,
limit=limit,
query_text=query_text,
tags=tags,
match_all_tags=match_all_tags,
project_id=project_id,
template_id=template_id,
base_template_id=base_template_id,
identity_id=identity_id,
identifier_keys=identifier_keys,
include_relationships=include_relationships,
include=include,
ascending=final_ascending,
sort_by=final_sort_by,
show_hidden_agents=show_hidden_agents,
last_stop_reason=last_stop_reason,
)
@router.get("/count", response_model=int, operation_id="count_agents")
async def count_agents(
name: str | None = Query(None, description="Name of the agent"),
tags: list[str] | None = Query(None, description="List of tags to filter agents by"),
match_all_tags: bool = Query(
False,
description="If True, only counts agents that match ALL given tags. Otherwise, counts agents that have ANY of the passed-in tags.",
),
query_text: str | None = Query(None, description="Search agents by name"),
project_id: str | None = Query(None, description="Search agents by project ID - this will default to your default project on cloud"),
template_id: str | None = Query(None, description="Search agents by template ID"),
base_template_id: str | None = Query(None, description="Search agents by base template ID"),
identity_id: str | None = Query(None, description="Search agents by identity ID"),
identifier_keys: list[str] | None = Query(None, description="Search agents by identifier keys"),
show_hidden_agents: bool | None = Query(
False,
include_in_schema=False,
description="If set to True, include agents marked as hidden in the results.",
),
last_stop_reason: Optional[StopReasonType] = Query(None, description="Filter agents by their last stop reason."),
server: SyncServer = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Get the total number of agents with optional filtering.
Supports the same filters as list_agents for consistent querying.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
# If no filters are provided, use the simpler size_async method
if (
all(
param is None or param is False
for param in [name, tags, query_text, project_id, template_id, base_template_id, identity_id, identifier_keys, last_stop_reason]
)
and not show_hidden_agents
):
return await server.agent_manager.size_async(actor=actor)
return await server.agent_manager.count_agents_async(
actor=actor,
name=name,
tags=tags,
match_all_tags=match_all_tags,
query_text=query_text,
project_id=project_id,
template_id=template_id,
base_template_id=base_template_id,
identity_id=identity_id,
identifier_keys=identifier_keys,
show_hidden_agents=show_hidden_agents,
last_stop_reason=last_stop_reason,
)
class IndentedORJSONResponse(Response):
media_type = "application/json"
def render(self, content: Any) -> bytes:
return orjson.dumps(content, option=orjson.OPT_INDENT_2)
@router.get("/{agent_id}/export", response_class=IndentedORJSONResponse, operation_id="export_agent")
async def export_agent(
agent_id: str = AgentId,
max_steps: int = Query(100, deprecated=True),
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
use_legacy_format: bool = Query(
False,
description="If True, exports using the legacy single-agent 'v1' format with inline tools/blocks. If False, exports using the new multi-entity 'v2' format, with separate agents, tools, blocks, files, etc.",
deprecated=True,
),
# do not remove, used to autogeneration of spec
# TODO: Think of a better way to export AgentFileSchema
spec: AgentFileSchema | None = None,
legacy_spec: AgentSchema | None = None,
) -> JSONResponse:
"""
Export the serialized JSON representation of an agent, formatted with indentation.
"""
if use_legacy_format:
raise HTTPException(status_code=400, detail="Legacy format is not supported")
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
agent_file_schema = await server.agent_serialization_manager.export(agent_ids=[agent_id], actor=actor)
return agent_file_schema.model_dump()
class ImportedAgentsResponse(BaseModel):
"""Response model for imported agents"""
agent_ids: List[str] = Field(..., description="List of IDs of the imported agents")
def import_agent_legacy(
agent_json: dict,
server: "SyncServer",
actor: User,
append_copy_suffix: bool = True,
override_existing_tools: bool = True,
project_id: str | None = None,
strip_messages: bool = False,
env_vars: Optional[dict[str, Any]] = None,
) -> List[str]:
"""
Import an agent using the legacy AgentSchema format.
"""
# Validate the JSON against AgentSchema before passing it to deserialize
agent_schema = AgentSchema.model_validate(agent_json)
new_agent = server.agent_manager.deserialize(
serialized_agent=agent_schema, # Ensure we're passing a validated AgentSchema
actor=actor,
append_copy_suffix=append_copy_suffix,
override_existing_tools=override_existing_tools,
project_id=project_id,
strip_messages=strip_messages,
env_vars=env_vars,
)
return [new_agent.id]
async def _import_agent(
agent_file_json: dict,
server: "SyncServer",
actor: User,
# TODO: Support these fields for new agent file
append_copy_suffix: bool = True,
override_name: Optional[str] = None,
override_existing_tools: bool = True,
project_id: str | None = None,
strip_messages: bool = False,
env_vars: Optional[dict[str, Any]] = None,
override_embedding_handle: Optional[str] = None,
) -> List[str]:
"""
Import an agent using the new AgentFileSchema format.
"""
agent_schema = AgentFileSchema.model_validate(agent_file_json)
if override_embedding_handle:
embedding_config_override = await server.get_embedding_config_from_handle_async(actor=actor, handle=override_embedding_handle)
else:
embedding_config_override = None
import_result = await server.agent_serialization_manager.import_file(
schema=agent_schema,
actor=actor,
append_copy_suffix=append_copy_suffix,
override_name=override_name,
override_existing_tools=override_existing_tools,
env_vars=env_vars,
override_embedding_config=embedding_config_override,
project_id=project_id,
)
if not import_result.success:
from letta.errors import AgentFileImportError
raise AgentFileImportError(f"Import failed: {import_result.message}. Errors: {', '.join(import_result.errors)}")
return import_result.imported_agent_ids
@router.post("/import", response_model=ImportedAgentsResponse, operation_id="import_agent")
async def import_agent(
file: UploadFile = File(...),
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
x_override_embedding_model: str | None = Header(None, alias="x-override-embedding-model"),
# New fields (all optional)
override_existing_tools: bool = Form(
True,
description="If set to True, existing tools can get their source code overwritten by the uploaded tool definitions. Note that Letta core tools can never be updated externally.",
),
strip_messages: bool = Form(
False,
description="If set to True, strips all messages from the agent before importing.",
),
secrets: Optional[str] = Form(None, description="Secrets as a JSON string to pass to the agent for tool execution."),
name: Optional[str] = Form(
None,
description="If provided, overrides the agent name with this value.",
),
embedding: Optional[str] = Form(
None,
description="Embedding handle to override with.",
),
# Deprecated fields (maintain backward compatibility)
append_copy_suffix: bool = Form(
True,
description='If set to True, appends "_copy" to the end of the agent name.',
deprecated=True,
),
override_name: Optional[str] = Form(
None,
description="If provided, overrides the agent name with this value. Use 'name' instead.",
deprecated=True,
),
override_embedding_handle: Optional[str] = Form(
None,
description="Override import with specific embedding handle. Use 'embedding' instead.",
deprecated=True,
),
project_id: str | None = Form(
None, description="The project ID to associate the uploaded agent with. This is now passed via headers.", deprecated=True
),
env_vars_json: Optional[str] = Form(
None,
description="Environment variables as a JSON string to pass to the agent for tool execution. Use 'secrets' instead.",
deprecated=True,
),
):
"""
Import a serialized agent file and recreate the agent(s) in the system.
Returns the IDs of all imported agents.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
try:
serialized_data = file.file.read()
file_size_mb = len(serialized_data) / (1024 * 1024)
logger.info(f"Agent import: loaded {file_size_mb:.2f} MB into memory")
agent_json = json.loads(serialized_data)
# Handle double-encoded JSON (if the result is a string, parse it again)
if isinstance(agent_json, str):
agent_json = json.loads(agent_json)
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail="Corrupted agent file format.")
# Handle backward compatibility: prefer new field names over deprecated ones
final_name = name or override_name
final_embedding_handle = embedding or override_embedding_handle or x_override_embedding_model
# Parse secrets (new) or env_vars_json (deprecated)
env_vars = None
secrets_json = secrets or env_vars_json
if secrets_json:
try:
env_vars = json.loads(secrets_json)
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail="secrets must be a valid JSON string")
if not isinstance(env_vars, dict):
raise HTTPException(status_code=400, detail="secrets must be a valid JSON string")
# Get project_id from headers (preferred) or fall back to form data for backward compatibility
# In cloud environments, project_id should be passed via headers
final_project_id = headers.project_id or project_id
# Check if the JSON is AgentFileSchema or AgentSchema
# TODO: This is kind of hacky, but should work as long as dont' change the schema
if "agents" in agent_json and isinstance(agent_json.get("agents"), list):
# This is an AgentFileSchema
agent_ids = await _import_agent(
agent_file_json=agent_json,
server=server,
actor=actor,
append_copy_suffix=append_copy_suffix,
override_name=final_name,
override_existing_tools=override_existing_tools,
project_id=final_project_id,
strip_messages=strip_messages,
env_vars=env_vars,
override_embedding_handle=final_embedding_handle,
)
else:
# This is a legacy AgentSchema
raise HTTPException(
status_code=400,
detail="Legacy AgentSchema format is deprecated. Please use the new AgentFileSchema format with 'agents' field.",
)
return ImportedAgentsResponse(agent_ids=agent_ids)
@router.get("/{agent_id}/context", response_model=ContextWindowOverview, operation_id="retrieve_agent_context_window", deprecated=True)
async def retrieve_agent_context_window(
agent_id: AgentId,
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Retrieve the context window of a specific agent.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
return await server.agent_manager.get_context_window(agent_id=agent_id, actor=actor)
class CreateAgentRequest(CreateAgent):
"""
CreateAgent model specifically for POST request body, excluding user_id which comes from headers
"""
# Override the user_id field to exclude it from the request body validation
actor_id: str | None = Field(None, exclude=True)
@router.post("/", response_model=AgentState, operation_id="create_agent")
async def create_agent(
agent: CreateAgentRequest = Body(...),
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
x_project: str | None = Header(
None, alias="X-Project", description="The project slug to associate with the agent (cloud only)."
), # Only handled by next js middleware
):
"""
Create an agent.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
return await server.create_agent_async(agent, actor=actor)
@router.patch("/{agent_id}", response_model=AgentState, operation_id="modify_agent")
async def modify_agent(
agent_id: AgentId,
update_agent: UpdateAgent = Body(...),
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""Update an existing agent."""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
return await server.update_agent_async(agent_id=agent_id, request=update_agent, actor=actor)
@router.get("/{agent_id}/tools", response_model=list[Tool], operation_id="list_tools_for_agent")
async def list_tools_for_agent(
agent_id: AgentId,
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
before: Optional[str] = Query(
None, description="Tool ID cursor for pagination. Returns tools that come before this tool ID in the specified sort order"
),
after: Optional[str] = Query(
None, description="Tool ID cursor for pagination. Returns tools that come after this tool ID in the specified sort order"
),
limit: Optional[int] = Query(10, description="Maximum number of tools to return"),
order: Literal["asc", "desc"] = Query(
"desc", description="Sort order for tools by creation time. 'asc' for oldest first, 'desc' for newest first"
),
order_by: Literal["created_at"] = Query("created_at", description="Field to sort by"),
):
"""Get tools from an existing agent."""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
return await server.agent_manager.list_attached_tools_async(
agent_id=agent_id,
actor=actor,
before=before,
after=after,
limit=limit,
ascending=(order == "asc"),
)
@router.patch("/{agent_id}/tools/attach/{tool_id}", response_model=Optional[AgentState], operation_id="attach_tool_to_agent")
async def attach_tool_to_agent(
tool_id: ToolId,
agent_id: AgentId,
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Attach a tool to an agent.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
await server.agent_manager.attach_tool_async(agent_id=agent_id, tool_id=tool_id, actor=actor)
if is_1_0_sdk_version(headers):
return None
# TODO: Unfortunately we need this to preserve our current API behavior
return await server.agent_manager.get_agent_by_id_async(agent_id=agent_id, actor=actor)
@router.patch("/{agent_id}/tools/detach/{tool_id}", response_model=Optional[AgentState], operation_id="detach_tool_from_agent")
async def detach_tool_from_agent(
tool_id: ToolId,
agent_id: AgentId,
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Detach a tool from an agent.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
await server.agent_manager.detach_tool_async(agent_id=agent_id, tool_id=tool_id, actor=actor)
if is_1_0_sdk_version(headers):
return None
# TODO: Unfortunately we need this to preserve our current API behavior
return await server.agent_manager.get_agent_by_id_async(agent_id=agent_id, actor=actor)
class ModifyApprovalRequest(BaseModel):
"""Request body for modifying tool approval requirements."""
requires_approval: bool = Field(..., description="Whether the tool requires approval before execution")
model_config = ConfigDict(extra="forbid")
@router.patch("/{agent_id}/tools/approval/{tool_name}", response_model=Optional[AgentState], operation_id="modify_approval_for_tool")
async def modify_approval_for_tool(
tool_name: str,
agent_id: AgentId,
requires_approval: bool | None = Query(None, description="Whether the tool requires approval before execution", deprecated=True),
request: ModifyApprovalRequest | None = Body(None),
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Modify the approval requirement for a tool attached to an agent.
Accepts requires_approval via request body (preferred) or query parameter (deprecated).
"""
# Prefer body over query param for backwards compatibility
if request is not None:
approval_value = request.requires_approval
elif requires_approval is not None:
approval_value = requires_approval
else:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="requires_approval must be provided either in request body or as query parameter",
)
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
await server.agent_manager.modify_approvals_async(agent_id=agent_id, tool_name=tool_name, requires_approval=approval_value, actor=actor)
if is_1_0_sdk_version(headers):
return None
# TODO: Unfortunately we need this to preserve our current API behavior
return await server.agent_manager.get_agent_by_id_async(agent_id=agent_id, actor=actor)
@router.post("/{agent_id}/tools/{tool_name}/run", response_model=ToolExecutionResult, operation_id="run_tool_for_agent")
async def run_tool_for_agent(
agent_id: AgentId,
tool_name: str,
request: ToolExecuteRequest = Body(default=ToolExecuteRequest()),
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Trigger a tool by name on a specific agent, providing the necessary arguments.
This endpoint executes a tool that is attached to the agent, using the agent's
state and environment variables for execution context.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
# Get agent with all relationships
agent = await server.agent_manager.get_agent_by_id_async(
agent_id, actor, include_relationships=["memory", "multi_agent_group", "sources", "tool_exec_environment_variables", "tools"]
)
# Find the tool by name among attached tools
tool = None
if agent.tools:
for t in agent.tools:
if t.name == tool_name:
tool = t
break
if tool is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Tool '{tool_name}' not found or not attached to agent '{agent_id}'",
)
# Build environment variables dict from agent secrets
# Use pre-decrypted value field (populated in from_orm_async)
sandbox_env_vars = {}
if agent.tool_exec_environment_variables:
for env_var in agent.tool_exec_environment_variables:
sandbox_env_vars[env_var.key] = env_var.value or ""
# Create tool execution manager and execute the tool
from letta.services.tool_executor.tool_execution_manager import ToolExecutionManager
tool_execution_manager = ToolExecutionManager(
agent_state=agent,
message_manager=server.message_manager,
agent_manager=server.agent_manager,
block_manager=server.block_manager,
run_manager=server.run_manager,
passage_manager=server.passage_manager,
actor=actor,
sandbox_env_vars=sandbox_env_vars,
)
tool_execution_result = await tool_execution_manager.execute_tool_async(
function_name=tool_name,
function_args=request.args,
tool=tool,
)
# don't return a result if the tool execution failed
if tool_execution_result.status == "error":
tool_execution_result.func_return = None
# remove deprecated agent_state field
tool_execution_result.agent_state = None
return tool_execution_result
@router.patch("/{agent_id}/sources/attach/{source_id}", response_model=AgentState, operation_id="attach_source_to_agent", deprecated=True)
async def attach_source(
source_id: SourceId,
agent_id: AgentId,
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Attach a source to an agent.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
agent_state = await server.agent_manager.attach_source_async(agent_id=agent_id, source_id=source_id, actor=actor)
# Check if the agent is missing any files tools
agent_state = await server.agent_manager.attach_missing_files_tools_async(agent_state=agent_state, actor=actor)
files = await server.file_manager.list_files(source_id, actor, include_content=True)
if files:
await server.agent_manager.insert_files_into_context_window(agent_state=agent_state, file_metadata_with_content=files, actor=actor)
if agent_state.enable_sleeptime:
source = await server.source_manager.get_source_by_id(source_id=source_id)
safe_create_task(server.sleeptime_document_ingest_async(agent_state, source, actor), label="sleeptime_document_ingest_async")
return agent_state
@router.patch("/{agent_id}/folders/attach/{folder_id}", response_model=Optional[AgentState], operation_id="attach_folder_to_agent")
async def attach_folder_to_agent(
folder_id: SourceId,
agent_id: AgentId,
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Attach a folder to an agent.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
agent_state = await server.agent_manager.attach_source_async(agent_id=agent_id, source_id=folder_id, actor=actor)
# Check if the agent is missing any files tools
agent_state = await server.agent_manager.attach_missing_files_tools_async(agent_state=agent_state, actor=actor)
files = await server.file_manager.list_files(folder_id, actor, include_content=True)
if files:
await server.agent_manager.insert_files_into_context_window(agent_state=agent_state, file_metadata_with_content=files, actor=actor)
if agent_state.enable_sleeptime:
source = await server.source_manager.get_source_by_id(source_id=folder_id)
safe_create_task(server.sleeptime_document_ingest_async(agent_state, source, actor), label="sleeptime_document_ingest_async")
if is_1_0_sdk_version(headers):
return None
return agent_state
@router.patch("/{agent_id}/sources/detach/{source_id}", response_model=AgentState, operation_id="detach_source_from_agent", deprecated=True)
async def detach_source(
source_id: SourceId,
agent_id: AgentId,
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Detach a source from an agent.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
agent_state = await server.agent_manager.detach_source_async(agent_id=agent_id, source_id=source_id, actor=actor)
if not agent_state.sources:
agent_state = await server.agent_manager.detach_all_files_tools_async(agent_state=agent_state, actor=actor)
# Query files_agents directly to get exactly what was attached, regardless of source changes
file_ids = await server.file_agent_manager.get_file_ids_for_agent_by_source(agent_id=agent_id, source_id=source_id, actor=actor)
if file_ids:
await server.remove_files_from_context_window(agent_state=agent_state, file_ids=file_ids, actor=actor)
if agent_state.enable_sleeptime:
try:
source = await server.source_manager.get_source_by_id(source_id=source_id)
block = await server.agent_manager.get_block_with_label_async(agent_id=agent_state.id, block_label=source.name, actor=actor)
await server.block_manager.delete_block_async(block.id, actor)
except:
pass
return agent_state
@router.patch("/{agent_id}/folders/detach/{folder_id}", response_model=Optional[AgentState], operation_id="detach_folder_from_agent")
async def detach_folder_from_agent(
folder_id: SourceId,
agent_id: AgentId,
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Detach a folder from an agent.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
agent_state = await server.agent_manager.detach_source_async(agent_id=agent_id, source_id=folder_id, actor=actor)
if not agent_state.sources:
agent_state = await server.agent_manager.detach_all_files_tools_async(agent_state=agent_state, actor=actor)
# Query files_agents directly to get exactly what was attached, regardless of source changes
file_ids = await server.file_agent_manager.get_file_ids_for_agent_by_source(agent_id=agent_id, source_id=folder_id, actor=actor)
if file_ids:
await server.remove_files_from_context_window(agent_state=agent_state, file_ids=file_ids, actor=actor)
if agent_state.enable_sleeptime:
try:
source = await server.source_manager.get_source_by_id(source_id=folder_id)
block = await server.agent_manager.get_block_with_label_async(agent_id=agent_state.id, block_label=source.name, actor=actor)
await server.block_manager.delete_block_async(block.id, actor)
except:
pass
if is_1_0_sdk_version(headers):
return None
return agent_state
@router.patch("/{agent_id}/files/close-all", response_model=List[str], operation_id="close_all_files_for_agent")
async def close_all_files_for_agent(
agent_id: AgentId,
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Closes all currently open files for a given agent.
This endpoint updates the file state for the agent so that no files are marked as open.
Typically used to reset the working memory view for the agent.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
return await server.file_agent_manager.close_all_other_files(agent_id=agent_id, keep_file_names=[], actor=actor)
@router.patch("/{agent_id}/files/{file_id}/open", response_model=List[str], operation_id="open_file_for_agent")
async def open_file_for_agent(
file_id: FileId,
agent_id: AgentId,
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Opens a specific file for a given agent.
This endpoint marks a specific file as open in the agent's file state.
The file will be included in the agent's working memory view.
Returns a list of file names that were closed due to LRU eviction.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
# Get the agent to access files configuration
per_file_view_window_char_limit, max_files_open = await server.agent_manager.get_agent_files_config_async(
agent_id=agent_id, actor=actor
)
# Get file metadata
file_metadata = await server.file_manager.get_file_by_id(file_id=file_id, actor=actor, include_content=True)
if not file_metadata:
raise HTTPException(status_code=404, detail=f"File with id={file_id} not found")
# Process file content with line numbers using LineChunker
from letta.services.file_processor.chunker.line_chunker import LineChunker
content_lines = LineChunker().chunk_text(file_metadata=file_metadata, validate_range=False)
visible_content = "\n".join(content_lines)
# Truncate if needed
visible_content = truncate_file_visible_content(visible_content, True, per_file_view_window_char_limit)
# Use enforce_max_open_files_and_open for efficient LRU handling
closed_files, was_already_open, _ = await server.file_agent_manager.enforce_max_open_files_and_open(
agent_id=agent_id,
file_id=file_id,
file_name=file_metadata.file_name,
source_id=file_metadata.source_id,
actor=actor,
visible_content=visible_content,
max_files_open=max_files_open,
)
return closed_files
@router.patch("/{agent_id}/files/{file_id}/close", response_model=None, operation_id="close_file_for_agent")
async def close_file_for_agent(
file_id: FileId,
agent_id: AgentId,
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Closes a specific file for a given agent.
This endpoint marks a specific file as closed in the agent's file state.
The file will be removed from the agent's working memory view.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
# Use update_file_agent_by_id to close the file
await server.file_agent_manager.update_file_agent_by_id(
agent_id=agent_id,
file_id=file_id,
actor=actor,
is_open=False,
)
return JSONResponse(status_code=status.HTTP_200_OK, content={"message": f"File id={file_id} successfully closed"})
@router.get("/{agent_id}", response_model=AgentState, operation_id="retrieve_agent")
async def retrieve_agent(
agent_id: AgentId,
include_relationships: list[str] | None = Query(
None,
description=(
"Specify which relational fields (e.g., 'tools', 'sources', 'memory') to include in the response. "
"If not provided, all relationships are loaded by default. "
"Using this can optimize performance by reducing unnecessary joins."
"This is a legacy parameter, and no longer supported after 1.0.0 SDK versions."
),
deprecated=True,
),
include: List[AgentRelationships] = Query(
[],
description=("Specify which relational fields to include in the response. No relationships are included by default."),
),
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Get the state of the agent.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
if include_relationships is None and is_1_0_sdk_version(headers):
include_relationships = [] # don't default include all if using new SDK version
return await server.agent_manager.get_agent_by_id_async(
agent_id=agent_id, include_relationships=include_relationships, include=include, actor=actor
)
@router.delete("/{agent_id}", response_model=None, operation_id="delete_agent")
async def delete_agent(
agent_id: AgentId,
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Delete an agent.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
await server.agent_manager.delete_agent_async(agent_id=agent_id, actor=actor)
return JSONResponse(status_code=status.HTTP_200_OK, content={"message": f"Agent id={agent_id} successfully deleted"})
@router.get("/{agent_id}/sources", response_model=list[Source], operation_id="list_agent_sources", deprecated=True)
async def list_agent_sources(
agent_id: AgentId,
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
before: Optional[str] = Query(
None, description="Source ID cursor for pagination. Returns sources that come before this source ID in the specified sort order"
),
after: Optional[str] = Query(
None, description="Source ID cursor for pagination. Returns sources that come after this source ID in the specified sort order"
),
limit: Optional[int] = Query(100, description="Maximum number of sources to return"),
order: Literal["asc", "desc"] = Query(
"desc", description="Sort order for sources by creation time. 'asc' for oldest first, 'desc' for newest first"
),
order_by: Literal["created_at"] = Query("created_at", description="Field to sort by"),
):
"""
Get the sources associated with an agent.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
return await server.agent_manager.list_attached_sources_async(
agent_id=agent_id,
actor=actor,
before=before,
after=after,
limit=limit,
ascending=(order == "asc"),
)
@router.get("/{agent_id}/folders", response_model=list[Source], operation_id="list_folders_for_agent")
async def list_folders_for_agent(
agent_id: AgentId,
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
before: Optional[str] = Query(
None, description="Source ID cursor for pagination. Returns sources that come before this source ID in the specified sort order"
),
after: Optional[str] = Query(
None, description="Source ID cursor for pagination. Returns sources that come after this source ID in the specified sort order"
),
limit: Optional[int] = Query(100, description="Maximum number of sources to return"),
order: Literal["asc", "desc"] = Query(
"desc", description="Sort order for sources by creation time. 'asc' for oldest first, 'desc' for newest first"
),
order_by: Literal["created_at"] = Query("created_at", description="Field to sort by"),
):
"""
Get the folders associated with an agent.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
return await server.agent_manager.list_attached_sources_async(
agent_id=agent_id,
actor=actor,
before=before,
after=after,
limit=limit,
ascending=(order == "asc"),
)
@router.get("/{agent_id}/files", response_model=PaginatedAgentFiles, operation_id="list_files_for_agent")
async def list_files_for_agent(
agent_id: AgentId,
before: Optional[str] = Query(
None, description="File ID cursor for pagination. Returns files that come before this file ID in the specified sort order"
),
after: Optional[str] = Query(
None, description="File ID cursor for pagination. Returns files that come after this file ID in the specified sort order"
),
limit: Optional[int] = Query(100, description="Maximum number of files to return"),
order: Literal["asc", "desc"] = Query(
"desc", description="Sort order for files by creation time. 'asc' for oldest first, 'desc' for newest first"
),
order_by: Literal["created_at"] = Query("created_at", description="Field to sort by"),
cursor: Optional[str] = Query(
None, description="Pagination cursor from previous response (deprecated, use before/after)", deprecated=True
),
is_open: Optional[bool] = Query(None, description="Filter by open status (true for open files, false for closed files)"),
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Get the files attached to an agent with their open/closed status.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
effective_limit = limit or 20
# get paginated file-agent relationships for this agent
file_agents, next_cursor, has_more = await server.file_agent_manager.list_files_for_agent_paginated(
agent_id=agent_id,
actor=actor,
cursor=cursor, # keep for backwards compatibility
limit=effective_limit,
is_open=is_open,
before=before,
after=after,
ascending=(order == "asc"),
)
# enrich with file and source metadata
enriched_files = []
for fa in file_agents:
# get source/folder metadata
source = await server.source_manager.get_source_by_id(source_id=fa.source_id, actor=actor)
# build response object
attachment = AgentFileAttachment(
id=fa.id,
file_id=fa.file_id,
file_name=fa.file_name,
folder_id=fa.source_id,
folder_name=source.name if source else "Unknown",
is_open=fa.is_open,
last_accessed_at=fa.last_accessed_at,
visible_content=fa.visible_content,
start_line=fa.start_line,
end_line=fa.end_line,
)
enriched_files.append(attachment)
return PaginatedAgentFiles(files=enriched_files, next_cursor=next_cursor, has_more=has_more)
# TODO: remove? can also get with agent blocks
@router.get("/{agent_id}/core-memory", response_model=Memory, operation_id="retrieve_agent_memory", deprecated=True)
async def retrieve_agent_memory(
agent_id: AgentId,
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Retrieve the memory state of a specific agent.
This endpoint fetches the current memory state of the agent identified by the user ID and agent ID.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
return await server.get_agent_memory_async(agent_id=agent_id, actor=actor)
@router.get("/{agent_id}/core-memory/blocks/{block_label}", response_model=BlockResponse, operation_id="retrieve_core_memory_block")
async def retrieve_block_for_agent(
block_label: str,
agent_id: AgentId,
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Retrieve a core memory block from an agent.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
return await server.agent_manager.get_block_with_label_async(agent_id=agent_id, block_label=block_label, actor=actor)
@router.get("/{agent_id}/core-memory/blocks", response_model=list[BlockResponse], operation_id="list_core_memory_blocks")
async def list_blocks_for_agent(
agent_id: AgentId,
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
before: Optional[str] = Query(
None, description="Block ID cursor for pagination. Returns blocks that come before this block ID in the specified sort order"
),
after: Optional[str] = Query(
None, description="Block ID cursor for pagination. Returns blocks that come after this block ID in the specified sort order"
),
limit: Optional[int] = Query(100, description="Maximum number of blocks to return"),
order: Literal["asc", "desc"] = Query(
"desc", description="Sort order for blocks by creation time. 'asc' for oldest first, 'desc' for newest first"
),
order_by: Literal["created_at"] = Query("created_at", description="Field to sort by"),
):
"""
Retrieve the core memory blocks of a specific agent.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
return await server.agent_manager.list_agent_blocks_async(
agent_id=agent_id,
actor=actor,
before=before,
after=after,
limit=limit,
ascending=(order == "asc"),
)
@router.patch("/{agent_id}/core-memory/blocks/{block_label}", response_model=BlockResponse, operation_id="modify_core_memory_block")
async def modify_block_for_agent(
block_label: str,
agent_id: AgentId,
block_update: BlockUpdate = Body(...),
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Updates a core memory block of an agent.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
block = await server.agent_manager.modify_block_by_label_async(
agent_id=agent_id, block_label=block_label, block_update=block_update, actor=actor
)
# This should also trigger a system prompt change in the agent
await server.agent_manager.rebuild_system_prompt_async(agent_id=agent_id, actor=actor, force=True, update_timestamp=False)
return block
@router.patch("/{agent_id}/core-memory/blocks/attach/{block_id}", response_model=AgentState, operation_id="attach_core_memory_block")
async def attach_block_to_agent(
block_id: BlockId,
agent_id: AgentId,
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Attach a core memory block to an agent.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
return await server.agent_manager.attach_block_async(agent_id=agent_id, block_id=block_id, actor=actor)
@router.patch("/{agent_id}/core-memory/blocks/detach/{block_id}", response_model=AgentState, operation_id="detach_core_memory_block")
async def detach_block_from_agent(
block_id: BlockId,
agent_id: AgentId,
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Detach a core memory block from an agent.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
return await server.agent_manager.detach_block_async(agent_id=agent_id, block_id=block_id, actor=actor)
@router.patch("/{agent_id}/archives/attach/{archive_id}", response_model=None, operation_id="attach_archive_to_agent")
async def attach_archive_to_agent(
archive_id: str,
agent_id: AgentId,
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Attach an archive to an agent.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
await server.archive_manager.attach_agent_to_archive_async(
agent_id=agent_id,
archive_id=archive_id,
actor=actor,
)
return None
@router.patch("/{agent_id}/archives/detach/{archive_id}", response_model=None, operation_id="detach_archive_from_agent")
async def detach_archive_from_agent(
archive_id: str,
agent_id: AgentId,
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Detach an archive from an agent.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
await server.archive_manager.detach_agent_from_archive_async(
agent_id=agent_id,
archive_id=archive_id,
actor=actor,
)
return None
@router.patch("/{agent_id}/identities/attach/{identity_id}", response_model=None, operation_id="attach_identity_to_agent")
async def attach_identity_to_agent(
identity_id: str,
agent_id: AgentId,
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Attach an identity to an agent.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
await server.identity_manager.attach_agent_async(
identity_id=identity_id,
agent_id=agent_id,
actor=actor,
)
return None
@router.patch("/{agent_id}/identities/detach/{identity_id}", response_model=None, operation_id="detach_identity_from_agent")
async def detach_identity_from_agent(
identity_id: str,
agent_id: AgentId,
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Detach an identity from an agent.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
await server.identity_manager.detach_agent_async(
identity_id=identity_id,
agent_id=agent_id,
actor=actor,
)
return None
@router.get("/{agent_id}/archival-memory", response_model=list[Passage], operation_id="list_passages", deprecated=True)
async def list_passages(
agent_id: AgentId,
server: "SyncServer" = Depends(get_letta_server),
after: str | None = Query(None, description="Unique ID of the memory to start the query range at."),
before: str | None = Query(None, description="Unique ID of the memory to end the query range at."),
limit: int | None = Query(100, description="How many results to include in the response."),
search: str | None = Query(None, description="Search passages by text"),
ascending: bool | None = Query(
True, description="Whether to sort passages oldest to newest (True, default) or newest to oldest (False)"
),
headers: HeaderParams = Depends(get_headers),
):
"""
Retrieve the memories in an agent's archival memory store (paginated query).
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
return await server.get_agent_archival_async(
agent_id=agent_id,
actor=actor,
after=after,
before=before,
query_text=search,
limit=limit,
ascending=ascending,
)
@router.post("/{agent_id}/archival-memory", response_model=list[Passage], operation_id="create_passage", deprecated=True)
async def create_passage(
agent_id: AgentId,
request: CreateArchivalMemory = Body(...),
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Insert a memory into an agent's archival memory store.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
return await server.insert_archival_memory_async(
agent_id=agent_id, memory_contents=request.text, actor=actor, tags=request.tags, created_at=request.created_at
)
@router.get(
"/{agent_id}/archival-memory/search",
response_model=ArchivalMemorySearchResponse,
operation_id="search_archival_memory",
deprecated=True,
)
async def search_archival_memory(
agent_id: AgentId,
query: str = Query(..., description="String to search for using semantic similarity"),
tags: Optional[List[str]] = Query(None, description="Optional list of tags to filter search results"),
tag_match_mode: Literal["any", "all"] = Query(
"any", description="How to match tags - 'any' to match passages with any of the tags, 'all' to match only passages with all tags"
),
top_k: Optional[int] = Query(None, description="Maximum number of results to return. Uses system default if not specified"),
start_datetime: Optional[datetime] = Query(None, description="Filter results to passages created after this datetime"),
end_datetime: Optional[datetime] = Query(None, description="Filter results to passages created before this datetime"),
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Search archival memory using semantic (embedding-based) search with optional temporal filtering.
This endpoint allows manual triggering of archival memory searches, enabling users to query
an agent's archival memory store directly via the API. The search uses the same functionality
as the agent's archival_memory_search tool but is accessible for external API usage.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
# convert datetime to string in ISO 8601 format
start_datetime = start_datetime.isoformat() if start_datetime else None
end_datetime = end_datetime.isoformat() if end_datetime else None
# Use the shared agent manager method
formatted_results = await server.agent_manager.search_agent_archival_memory_async(
agent_id=agent_id,
actor=actor,
query=query,
tags=tags,
tag_match_mode=tag_match_mode,
top_k=top_k,
start_datetime=start_datetime,
end_datetime=end_datetime,
)
# Convert to proper response schema
search_results = [ArchivalMemorySearchResult(**result) for result in formatted_results]
return ArchivalMemorySearchResponse(results=search_results, count=len(formatted_results))
# TODO(ethan): query or path parameter for memory_id?
# @router.delete("/{agent_id}/archival")
@router.delete("/{agent_id}/archival-memory/{memory_id}", response_model=None, operation_id="delete_passage", deprecated=True)
async def delete_passage(
memory_id: str,
agent_id: AgentId,
# memory_id: str = Query(..., description="Unique ID of the memory to be deleted."),
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Delete a memory from an agent's archival memory store.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
await server.delete_archival_memory_async(memory_id=memory_id, actor=actor)
return JSONResponse(status_code=status.HTTP_200_OK, content={"message": f"Memory id={memory_id} successfully deleted"})
AgentMessagesResponse = Annotated[
list[LettaMessageUnion], Field(json_schema_extra={"type": "array", "items": {"$ref": "#/components/schemas/LettaMessageUnion"}})
]
@router.get("/{agent_id}/messages", response_model=AgentMessagesResponse, operation_id="list_messages")
async def list_messages(
agent_id: AgentId,
server: "SyncServer" = Depends(get_letta_server),
before: Optional[str] = Query(
None, description="Message ID cursor for pagination. Returns messages that come before this message ID in the specified sort order"
),
after: Optional[str] = Query(
None, description="Message ID cursor for pagination. Returns messages that come after this message ID in the specified sort order"
),
limit: Optional[int] = Query(100, description="Maximum number of messages to return"),
order: Literal["asc", "desc"] = Query(
"desc", description="Sort order for messages by creation time. 'asc' for oldest first, 'desc' for newest first"
),
order_by: Literal["created_at"] = Query("created_at", description="Field to sort by"),
group_id: str | None = Query(None, description="Group ID to filter messages by."),
use_assistant_message: bool = Query(True, description="Whether to use assistant messages", deprecated=True),
assistant_message_tool_name: str = Query(DEFAULT_MESSAGE_TOOL, description="The name of the designated message tool.", deprecated=True),
assistant_message_tool_kwarg: str = Query(DEFAULT_MESSAGE_TOOL_KWARG, description="The name of the message argument.", deprecated=True),
include_err: bool | None = Query(
None, description="Whether to include error messages and error statuses. For debugging purposes only."
),
headers: HeaderParams = Depends(get_headers),
):
"""
Retrieve message history for an agent.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
return await server.get_agent_recall_async(
agent_id=agent_id,
after=after,
before=before,
limit=limit,
group_id=group_id,
reverse=(order == "desc"),
return_message_object=False,
use_assistant_message=use_assistant_message,
assistant_message_tool_name=assistant_message_tool_name,
assistant_message_tool_kwarg=assistant_message_tool_kwarg,
include_err=include_err,
actor=actor,
)
@router.patch("/{agent_id}/messages/{message_id}", response_model=LettaMessageUnion, operation_id="modify_message")
async def modify_message(
agent_id: AgentId, # backwards compatible. Consider removing for v1
message_id: MessageId,
request: LettaMessageUpdateUnion = Body(...),
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Update the details of a message associated with an agent.
"""
# TODO: support modifying tool calls/returns
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
return await server.message_manager.update_message_by_letta_message_async(
message_id=message_id, letta_message_update=request, actor=actor
)
# noinspection PyInconsistentReturns
@router.post(
"/{agent_id}/messages",
response_model=LettaResponse,
operation_id="send_message",
responses={
200: {
"description": "Successful response",
"content": {
"application/json": {"schema": {"$ref": "#/components/schemas/LettaResponse"}},
"text/event-stream": {"description": "Server-Sent Events stream (when streaming=true in request body)"},
},
}
},
)
async def send_message(
request_obj: Request, # FastAPI Request
agent_id: AgentId,
server: SyncServer = Depends(get_letta_server),
request: LettaStreamingRequest = Body(...),
headers: HeaderParams = Depends(get_headers),
) -> StreamingResponse | LettaResponse:
"""
Process a user message and return the agent's response.
This endpoint accepts a message from a user and processes it through the agent.
The response format is controlled by the `streaming` field in the request body:
- If `streaming=false` (default): Returns a complete LettaResponse with all messages
- If `streaming=true`: Returns a Server-Sent Events (SSE) stream
Additional streaming options (only used when streaming=true):
- `stream_tokens`: Stream individual tokens instead of complete steps
- `include_pings`: Include keepalive pings to prevent connection timeouts
- `background`: Process the request in the background
"""
# After validation, messages should always be set (converted from input if needed)
if not request.messages or len(request.messages) == 0:
raise HTTPException(status_code=422, detail="Messages must not be empty")
# Validate streaming-specific options are only set when streaming=true
if not request.streaming:
errors = []
if request.stream_tokens is True:
errors.append("stream_tokens can only be true when streaming=true")
if request.include_pings is False:
errors.append("include_pings can only be set to false when streaming=true")
if request.background is True:
errors.append("background can only be true when streaming=true")
if errors:
raise HTTPException(
status_code=422,
detail=f"Streaming options set without streaming enabled. {'; '.join(errors)}. "
"Either set streaming=true or use default values for streaming options.",
)
is_1_0_sdk = is_1_0_sdk_version(headers)
if request.streaming and not is_1_0_sdk:
raise HTTPException(status_code=422, detail="streaming=true is only supported for SDK v1.0+ clients.")
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
if request.streaming and is_1_0_sdk:
streaming_service = StreamingService(server)
run, result = await streaming_service.create_agent_stream(
agent_id=agent_id,
actor=actor,
request=request,
run_type="send_message",
)
return result
request_start_timestamp_ns = get_utc_timestamp_ns()
MetricRegistry().user_message_counter.add(1, get_ctx_attributes())
# TODO: This is redundant, remove soon
agent = await server.agent_manager.get_agent_by_id_async(
agent_id, actor, include_relationships=["memory", "multi_agent_group", "sources", "tool_exec_environment_variables", "tools"]
)
agent_eligible = agent.multi_agent_group is None or agent.multi_agent_group.manager_type in ["sleeptime", "voice_sleeptime"]
model_compatible = agent.llm_config.model_endpoint_type in [
"anthropic",
"openai",
"together",
"google_ai",
"google_vertex",
"bedrock",
"ollama",
"azure",
"xai",
"zai",
"groq",
"deepseek",
]
# Create a new run for execution tracking
if settings.track_agent_run:
runs_manager = RunManager()
run = await runs_manager.create_run(
pydantic_run=PydanticRun(
agent_id=agent_id,
background=False,
metadata={
"run_type": "send_message",
},
request_config=LettaRequestConfig.from_letta_request(request),
),
actor=actor,
)
else:
run = None
# TODO (cliandy): clean this up
redis_client = await get_redis_client()
await redis_client.set(f"{REDIS_RUN_ID_PREFIX}:{agent_id}", run.id if run else None)
run_update_metadata = None
try:
result = None
if agent_eligible and model_compatible:
agent_loop = AgentLoop.load(agent_state=agent, actor=actor)
result = await agent_loop.step(
request.messages,
max_steps=request.max_steps,
run_id=run.id if run else None,
use_assistant_message=request.use_assistant_message,
request_start_timestamp_ns=request_start_timestamp_ns,
include_return_message_types=request.include_return_message_types,
client_tools=request.client_tools,
)
else:
result = await server.send_message_to_agent(
agent_id=agent_id,
actor=actor,
input_messages=request.messages,
stream_steps=False,
stream_tokens=False,
# Support for AssistantMessage
use_assistant_message=request.use_assistant_message,
assistant_message_tool_name=request.assistant_message_tool_name,
assistant_message_tool_kwarg=request.assistant_message_tool_kwarg,
include_return_message_types=request.include_return_message_types,
)
run_status = result.stop_reason.stop_reason.run_status
return result
except PendingApprovalError as e:
run_update_metadata = {"error": str(e)}
run_status = RunStatus.failed
raise HTTPException(
status_code=409, detail={"code": "PENDING_APPROVAL", "message": str(e), "pending_request_id": e.pending_request_id}
)
except Exception as e:
run_update_metadata = {"error": str(e)}
run_status = RunStatus.failed
raise
finally:
if settings.track_agent_run:
if result:
stop_reason = result.stop_reason.stop_reason
else:
# NOTE: we could also consider this an error?
stop_reason = None
await server.run_manager.update_run_by_id_async(
run_id=run.id,
update=RunUpdate(
status=run_status,
metadata=run_update_metadata,
stop_reason=stop_reason,
),
actor=actor,
)
# noinspection PyInconsistentReturns
@router.post(
"/{agent_id}/messages/stream",
response_model=LettaStreamingResponse,
operation_id="create_agent_message_stream",
responses={
200: {
"description": "Successful response",
"content": {
"text/event-stream": {"description": "Server-Sent Events stream"},
},
}
},
deprecated=True,
)
async def send_message_streaming(
request_obj: Request, # FastAPI Request
agent_id: AgentId,
server: SyncServer = Depends(get_letta_server),
request: LettaStreamingRequest = Body(...),
headers: HeaderParams = Depends(get_headers),
) -> StreamingResponse | LettaResponse:
"""
Process a user message and return the agent's response.
Deprecated: Use the `POST /{agent_id}/messages` endpoint with `streaming=true` in the request body instead.
This endpoint accepts a message from a user and processes it through the agent.
It will stream the steps of the response always, and stream the tokens if 'stream_tokens' is set to True.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
# Since this is the dedicated streaming endpoint, ensure streaming is enabled
request.streaming = True
# use the streaming service for unified stream handling
streaming_service = StreamingService(server)
run, result = await streaming_service.create_agent_stream(
agent_id=agent_id,
actor=actor,
request=request,
run_type="send_message_streaming",
)
return result
class CancelAgentRunRequest(BaseModel):
run_ids: list[str] | None = Field(None, description="Optional list of run IDs to cancel")
@router.post("/{agent_id}/messages/cancel", operation_id="cancel_message")
async def cancel_message(
agent_id: AgentId,
request: CancelAgentRunRequest = Body(None),
server: SyncServer = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
) -> dict:
"""
Cancel runs associated with an agent. If run_ids are passed in, cancel those in particular.
Note to cancel active runs associated with an agent, redis is required.
"""
# TODO: WHY DOES THIS CANCEL A LIST OF RUNS?
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
if not settings.track_agent_run:
raise HTTPException(status_code=400, detail="Agent run tracking is disabled")
run_ids = request.run_ids if request else None
if not run_ids:
run_id = None
try:
redis_client = await get_redis_client()
run_id = await redis_client.get(f"{REDIS_RUN_ID_PREFIX}:{agent_id}")
except Exception as e:
# Redis is optional; fall back to DB to avoid surfacing 5XXs for cancellation.
logger.warning(f"Failed to look up run to cancel in redis for agent {agent_id}, falling back to DB: {e}")
if run_id is None:
logger.warning("Cannot find run associated with agent to cancel in redis, fetching from db.")
runs = await server.run_manager.list_runs(
actor=actor,
statuses=[RunStatus.created, RunStatus.running],
ascending=False,
agent_id=agent_id, # NOTE: this will override agent_ids if provided
limit=100, # Limit to 100 most recent active runs for cancellation
)
run_ids = [run.id for run in runs]
else:
run_ids = [run_id]
if not run_ids:
raise NoActiveRunsToCancelError(agent_id=agent_id)
results = {}
for run_id in run_ids:
try:
run = await server.run_manager.get_run_by_id(run_id=run_id, actor=actor)
if run.metadata and run.metadata.get("lettuce"):
try:
lettuce_client = await LettuceClient.create()
await lettuce_client.cancel(run_id)
except Exception as e:
# Do not surface cancellation failures as 5XXs.
logger.error(f"Failed to cancel Lettuce run {run_id}: {e}")
await server.run_manager.cancel_run(actor=actor, agent_id=agent_id, run_id=run_id)
except Exception as e:
results[run_id] = "failed"
# Cancellation failures should not raise errors back to the client.
logger.error(f"Failed to cancel run {run_id}: {str(e)}")
continue
results[run_id] = "cancelled"
logger.info(f"Cancelled run {run_id}")
return results
@router.post("/messages/search", response_model=List[MessageSearchResult], operation_id="search_messages")
async def search_messages(
request: MessageSearchRequest = Body(...),
server: SyncServer = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Search messages across the entire organization with optional project and template filtering. Returns messages with FTS/vector ranks and total RRF score.
This is a cloud-only feature.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
# get embedding config from the default agent if needed
# check if any agents exist in the org
agent_count = await server.agent_manager.size_async(actor=actor)
if agent_count == 0:
raise HTTPException(status_code=400, detail="No agents found in organization to derive embedding configuration from")
results = await server.message_manager.search_messages_org_async(
actor=actor,
query_text=request.query,
search_mode=request.search_mode,
roles=request.roles,
agent_id=request.agent_id,
project_id=request.project_id,
template_id=request.template_id,
limit=request.limit,
start_date=request.start_date,
end_date=request.end_date,
)
return results
async def _process_message_background(
run_id: str,
server: SyncServer,
actor: User,
agent_id: str,
messages: list[MessageCreate],
use_assistant_message: bool,
assistant_message_tool_name: str,
assistant_message_tool_kwarg: str,
max_steps: int = DEFAULT_MAX_STEPS,
include_return_message_types: list[MessageType] | None = None,
) -> None:
"""Background task to process the message and update run status."""
request_start_timestamp_ns = get_utc_timestamp_ns()
agent_loop = None
result = None
try:
agent = await server.agent_manager.get_agent_by_id_async(
agent_id, actor, include_relationships=["memory", "multi_agent_group", "sources", "tool_exec_environment_variables", "tools"]
)
agent_eligible = agent.multi_agent_group is None or agent.multi_agent_group.manager_type in ["sleeptime", "voice_sleeptime"]
model_compatible = agent.llm_config.model_endpoint_type in [
"anthropic",
"openai",
"together",
"google_ai",
"google_vertex",
"bedrock",
"ollama",
"azure",
"xai",
"zai",
"groq",
"deepseek",
]
if agent_eligible and model_compatible:
agent_loop = AgentLoop.load(agent_state=agent, actor=actor)
result = await agent_loop.step(
messages,
max_steps=max_steps,
run_id=run_id,
use_assistant_message=use_assistant_message,
request_start_timestamp_ns=request_start_timestamp_ns,
include_return_message_types=include_return_message_types,
)
else:
result = await server.send_message_to_agent(
agent_id=agent_id,
actor=actor,
input_messages=messages,
stream_steps=False,
stream_tokens=False,
metadata={"run_id": run_id},
# Support for AssistantMessage
use_assistant_message=use_assistant_message,
assistant_message_tool_name=assistant_message_tool_name,
assistant_message_tool_kwarg=assistant_message_tool_kwarg,
include_return_message_types=include_return_message_types,
)
runs_manager = RunManager()
from letta.schemas.enums import RunStatus
if result.stop_reason.stop_reason == "cancelled":
run_status = RunStatus.cancelled
else:
run_status = RunStatus.completed
await runs_manager.update_run_by_id_async(
run_id=run_id,
update=RunUpdate(status=run_status, stop_reason=result.stop_reason.stop_reason),
actor=actor,
)
except PendingApprovalError as e:
# Update run status to failed with specific error info
runs_manager = RunManager()
from letta.schemas.enums import RunStatus
await runs_manager.update_run_by_id_async(
run_id=run_id,
update=RunUpdate(status=RunStatus.failed, metadata={"error": str(e)}),
actor=actor,
)
except Exception as e:
# Update run status to failed
runs_manager = RunManager()
from letta.schemas.enums import RunStatus
await runs_manager.update_run_by_id_async(
run_id=run_id,
update=RunUpdate(status=RunStatus.failed, metadata={"error": str(e)}),
actor=actor,
)
finally:
# Critical: Explicit resource cleanup to prevent accumulation
if agent_loop and result:
await _cleanup_background_task_resources(agent_loop, result)
async def _cleanup_background_task_resources(agent_loop: BaseAgentV2 | LettaAgent, result: StreamingResponse | LettaResponse) -> None:
"""
Explicit cleanup of resources created during background message processing.
Proper cleanup of:
- Agent instances and their internal state
- Message buffers and response accumulation
- Any database connections or sessions
- LLM client resources
"""
import gc
try:
if agent_loop is not None:
if agent_loop.response_messages:
# Clear response message buffer to prevent accumulation
agent_loop.response_messages.clear()
# Clean up agent loop resources
del agent_loop
if result is not None:
del result # Clear result data to free memory
# Force garbage collection to clean up references and release memory
gc.collect()
except Exception as e:
# Handle errors for logging but don't fail the background task
logger.warning(f"Error during background task resource cleanup: {e}")
pass
@router.post(
"/{agent_id}/messages/async",
response_model=PydanticRun,
operation_id="create_agent_message_async",
)
async def send_message_async(
agent_id: AgentId,
server: SyncServer = Depends(get_letta_server),
request: LettaAsyncRequest = Body(...),
headers: HeaderParams = Depends(get_headers),
):
"""
Asynchronously process a user message and return a run object.
The actual processing happens in the background, and the status can be checked using the run ID.
This is "asynchronous" in the sense that it's a background run and explicitly must be fetched by the run ID.
"""
MetricRegistry().user_message_counter.add(1, get_ctx_attributes())
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
try:
is_message_input = request.messages[0].type == MessageCreateType.message
except:
is_message_input = True
use_lettuce = headers.experimental_params.message_async and is_message_input
# Create a new run
run = PydanticRun(
callback_url=request.callback_url,
agent_id=agent_id,
background=True, # Async endpoints are always background
metadata={
"run_type": "send_message_async",
"lettuce": use_lettuce,
},
request_config=LettaRequestConfig.from_letta_request(request),
)
run = await server.run_manager.create_run(
pydantic_run=run,
actor=actor,
)
if use_lettuce:
agent_state = await server.agent_manager.get_agent_by_id_async(
agent_id, actor, include_relationships=["memory", "multi_agent_group", "sources", "tool_exec_environment_variables", "tools"]
)
# Allow V1 agents only if the message async flag is enabled
is_v1_message_async_enabled = (
agent_state.agent_type == AgentType.letta_v1_agent and headers.experimental_params.letta_v1_agent_message_async
)
if agent_state.multi_agent_group is None and (agent_state.agent_type != AgentType.letta_v1_agent or is_v1_message_async_enabled):
lettuce_client = await LettuceClient.create()
run_id_from_lettuce = await lettuce_client.step(
agent_state=agent_state,
actor=actor,
input_messages=request.messages,
max_steps=request.max_steps,
run_id=run.id,
use_assistant_message=request.use_assistant_message,
include_return_message_types=request.include_return_message_types,
)
if run_id_from_lettuce:
return run
# Create asyncio task for background processing (shielded to prevent cancellation)
task = safe_create_shielded_task(
_process_message_background(
run_id=run.id,
server=server,
actor=actor,
agent_id=agent_id,
messages=request.messages,
use_assistant_message=request.use_assistant_message,
assistant_message_tool_name=request.assistant_message_tool_name,
assistant_message_tool_kwarg=request.assistant_message_tool_kwarg,
max_steps=request.max_steps,
include_return_message_types=request.include_return_message_types,
),
label=f"process_message_background_{run.id}",
)
def handle_task_completion(t):
try:
t.result()
except asyncio.CancelledError:
# Note: With shielded tasks, cancellation attempts don't actually stop the task
logger.info(f"Cancellation attempted on shielded background task for run {run.id}, but task continues running")
# Don't mark as failed since the shielded task is still running
except Exception as e:
logger.error(f"Unhandled exception in background task for run {run.id}: {e}")
from letta.services.run_manager import RunManager
error_str = str(e)
async def update_failed_run():
runs_manager = RunManager()
from letta.schemas.enums import RunStatus
await runs_manager.update_run_by_id_async(
run_id=run.id,
update=RunUpdate(status=RunStatus.failed, metadata={"error": error_str}),
actor=actor,
)
safe_create_task(
update_failed_run(),
label=f"update_failed_run_{run.id}",
)
task.add_done_callback(handle_task_completion)
return run
class ResetMessagesRequest(BaseModel):
"""Request body for resetting messages on an agent."""
add_default_initial_messages: bool = Field(
False,
description="If true, adds the default initial messages after resetting.",
)
@router.patch("/{agent_id}/reset-messages", response_model=Optional[AgentState], operation_id="reset_messages")
async def reset_messages(
agent_id: AgentId,
request: ResetMessagesRequest = Body(...),
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""Resets the messages for an agent"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
return await server.agent_manager.reset_messages_async(
agent_id=agent_id,
actor=actor,
add_default_initial_messages=request.add_default_initial_messages,
needs_agent_state=not is_1_0_sdk_version(headers),
)
@router.get("/{agent_id}/groups", response_model=list[Group], operation_id="list_groups_for_agent")
async def list_groups_for_agent(
agent_id: AgentId,
manager_type: str | None = Query(None, description="Manager type to filter groups by"),
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
before: Optional[str] = Query(
None, description="Group ID cursor for pagination. Returns groups that come before this group ID in the specified sort order"
),
after: Optional[str] = Query(
None, description="Group ID cursor for pagination. Returns groups that come after this group ID in the specified sort order"
),
limit: Optional[int] = Query(100, description="Maximum number of groups to return"),
order: Literal["asc", "desc"] = Query(
"desc", description="Sort order for groups by creation time. 'asc' for oldest first, 'desc' for newest first"
),
order_by: Literal["created_at"] = Query("created_at", description="Field to sort by"),
):
"""Lists the groups for an agent."""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
logger.info("in list agents with manager_type: %s", manager_type)
return await server.agent_manager.list_groups_async(
agent_id=agent_id,
manager_type=manager_type,
actor=actor,
before=before,
after=after,
limit=limit,
ascending=(order == "asc"),
)
@router.post(
"/{agent_id}/messages/preview-raw-payload",
response_model=Dict[str, Any],
operation_id="preview_model_request",
)
async def preview_model_request(
agent_id: AgentId,
request: Union[LettaRequest, LettaStreamingRequest] = Body(...),
server: SyncServer = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Inspect the raw LLM request payload without sending it.
This endpoint processes the message through the agent loop up until
the LLM request, then returns the raw request payload that would
be sent to the LLM provider. Useful for debugging and inspection.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
agent = await server.agent_manager.get_agent_by_id_async(
agent_id, actor, include_relationships=["multi_agent_group", "memory", "sources"]
)
agent_eligible = agent.multi_agent_group is None or agent.multi_agent_group.manager_type in ["sleeptime", "voice_sleeptime"]
model_compatible = agent.llm_config.model_endpoint_type in [
"anthropic",
"openai",
"together",
"google_ai",
"google_vertex",
"bedrock",
"ollama",
"azure",
"xai",
"zai",
"groq",
"deepseek",
]
if agent_eligible and model_compatible:
agent_loop = AgentLoop.load(agent_state=agent, actor=actor)
return await agent_loop.build_request(
input_messages=request.messages,
)
else:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Payload inspection is not currently supported for this agent configuration.",
)
class CompactionRequest(BaseModel):
compaction_settings: Optional[CompactionSettings] = Field(
default=None,
description="Optional compaction settings to use for this summarization request. If not provided, the agent's default settings will be used.",
)
class CompactionResponse(BaseModel):
summary: str
num_messages_before: int
num_messages_after: int
@router.post("/{agent_id}/summarize", response_model=CompactionResponse, operation_id="summarize_messages")
async def summarize_messages(
agent_id: AgentId,
request: Optional[CompactionRequest] = Body(default=None),
server: SyncServer = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Summarize an agent's conversation history.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
agent = await server.agent_manager.get_agent_by_id_async(agent_id, actor, include_relationships=["multi_agent_group"])
agent_eligible = agent.multi_agent_group is None or agent.multi_agent_group.manager_type in ["sleeptime", "voice_sleeptime"]
model_compatible = agent.llm_config.model_endpoint_type in [
"anthropic",
"openai",
"together",
"google_ai",
"google_vertex",
"bedrock",
"ollama",
"azure",
"xai",
"zai",
"groq",
"deepseek",
]
if agent_eligible and model_compatible:
agent_loop = LettaAgentV3(agent_state=agent, actor=actor)
in_context_messages = await server.message_manager.get_messages_by_ids_async(message_ids=agent.message_ids, actor=actor)
compaction_settings = request.compaction_settings if request else None
num_messages_before = len(in_context_messages)
summary_message, messages, summary = await agent_loop.compact(
messages=in_context_messages,
compaction_settings=compaction_settings,
)
num_messages_after = len(messages)
# update the agent state
logger.info(f"Summarized {num_messages_before} messages to {num_messages_after}")
if num_messages_before <= num_messages_after:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Summarization failed to reduce the number of messages. You may need to use a different CompactionSettings (e.g. using `all` mode).",
)
await agent_loop._checkpoint_messages(run_id=None, step_id=None, new_messages=[summary_message], in_context_messages=messages)
return CompactionResponse(
summary=summary,
num_messages_before=num_messages_before,
num_messages_after=num_messages_after,
)
else:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Summarization is not currently supported for this agent configuration. Please contact Letta support.",
)
class CaptureMessagesRequest(BaseModel):
provider: str
model: str
request_messages: list[dict[str, Any]]
response_dict: dict[str, Any]
@router.post("/{agent_id}/messages/capture", response_model=str, operation_id="capture_messages", include_in_schema=False)
async def capture_messages(
agent_id: AgentId,
request: CaptureMessagesRequest = Body(...),
server: "SyncServer" = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Capture a list of messages for an agent.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
agent = await server.agent_manager.get_agent_by_id_async(agent_id, actor, include_relationships=["multi_agent_group"])
messages_to_persist = []
# Input user messages
for message in request.request_messages:
if message["role"] == "user":
messages_to_persist.append(
Message(
role=MessageRole.user,
content=[(TextContent(text=message["content"]))],
agent_id=agent_id,
tool_calls=None,
tool_call_id=None,
created_at=get_utc_time(),
)
)
# Assistant response
messages_to_persist.append(
Message(
role=MessageRole.assistant,
content=[(TextContent(text=request.response_dict["content"]))],
agent_id=agent_id,
model=request.model,
tool_calls=None,
tool_call_id=None,
created_at=get_utc_time(),
)
)
response_messages = await server.message_manager.create_many_messages_async(messages_to_persist, actor=actor)
sleeptime_group = agent.multi_agent_group if agent.multi_agent_group and agent.multi_agent_group.manager_type == "sleeptime" else None
if sleeptime_group:
sleeptime_agent_loop = SleeptimeMultiAgentV4(agent_state=agent, actor=actor, group=sleeptime_group)
sleeptime_agent_loop.response_messages = response_messages
run_ids = await sleeptime_agent_loop.run_sleeptime_agents()
return JSONResponse({"success": True, "messages_created": len(response_messages), "run_ids": run_ids})