Files
letta-server/letta/server/rest_api/routers/v1/conversations.py
cthomas 416ffc7cd7 Add billing context to LLM telemetry traces (#9745)
* feat: add billing context to LLM telemetry traces

Add billing metadata (plan type, cost source, customer ID) to LLM traces in ClickHouse for cost analytics and attribution.

**Data Flow:**
- Cloud-API: Extract billing info from subscription in rate limiting, set x-billing-* headers
- Core: Parse headers into BillingContext object via dependencies
- Adapters: Flow billing_context through all LLM adapters (blocking & streaming)
- Agent: Pass billing_context to step() and stream() methods
- ClickHouse: Store in billing_plan_type, billing_cost_source, billing_customer_id columns

**Changes:**
- Add BillingContext schema to provider_trace.py
- Add billing columns to llm_traces ClickHouse table DDL
- Update getCustomerSubscription to fetch stripeCustomerId from organization_billing_details
- Propagate billing_context through agent step flow, adapters, and streaming service
- Update ProviderTrace and LLMTrace to include billing metadata
- Regenerate SDK with autogen

**Production Deployment:**
Requires env vars: LETTA_PROVIDER_TRACE_BACKEND=clickhouse, LETTA_STORE_LLM_TRACES=true, CLICKHOUSE_*

🐾 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* fix: add billing_context parameter to agent step methods

- Add billing_context to BaseAgent and BaseAgentV2 abstract methods
- Update LettaAgent, LettaAgentV2, LettaAgentV3 step methods
- Update multi-agent groups: SleeptimeMultiAgentV2, V3, V4
- Fix test_utils.py to include billing header parameters
- Import BillingContext in all affected files

* fix: add billing_context to stream methods

- Add billing_context parameter to BaseAgentV2.stream()
- Add billing_context parameter to LettaAgentV2.stream()
- LettaAgentV3.stream() already has it from previous commit

* fix: exclude billing headers from OpenAPI spec

Mark billing headers as internal (include_in_schema=False) so they don't appear in the public API.
These are internal headers between cloud-api and core, not part of the public SDK.

Regenerated SDK with stage-api - removes 10,650 lines of bloat that was causing OOM during Next.js build.

* refactor: return billing context from handleUnifiedRateLimiting instead of mutating req

Instead of passing req into handleUnifiedRateLimiting and mutating headers inside it:
- Return billing context fields (billingPlanType, billingCostSource, billingCustomerId) from handleUnifiedRateLimiting
- Set headers in handleMessageRateLimiting (middleware layer) after getting the result
- This fixes step-orchestrator compatibility since it doesn't have a real Express req object

* chore: remove extra gencode

* p

---------

Co-authored-by: Letta <noreply@letta.com>
2026-03-03 18:34:13 -08:00

770 lines
31 KiB
Python

from datetime import timedelta
from typing import Annotated, List, Literal, Optional
from uuid import uuid4
from fastapi import APIRouter, Body, Depends, HTTPException, Query, status
from pydantic import BaseModel, Field
from starlette.responses import StreamingResponse
from letta.agents.agent_loop import AgentLoop
from letta.agents.letta_agent_v3 import LettaAgentV3
from letta.constants import REDIS_RUN_ID_PREFIX
from letta.data_sources.redis_client import NoopAsyncRedisClient, get_redis_client
from letta.errors import LettaExpiredError, LettaInvalidArgumentError, NoActiveRunsToCancelError
from letta.helpers.datetime_helpers import get_utc_time
from letta.log import get_logger
from letta.schemas.conversation import Conversation, CreateConversation, UpdateConversation
from letta.schemas.enums import RunStatus
from letta.schemas.job import LettaRequestConfig
from letta.schemas.letta_message import LettaMessageUnion
from letta.schemas.letta_request import ConversationMessageRequest, LettaStreamingRequest, RetrieveStreamRequest
from letta.schemas.letta_response import LettaResponse
from letta.schemas.provider_trace import BillingContext
from letta.schemas.run import Run as PydanticRun
from letta.server.rest_api.dependencies import HeaderParams, get_headers, get_letta_server
from letta.server.rest_api.redis_stream_manager import redis_sse_stream_generator
from letta.server.rest_api.streaming_response import (
StreamingResponseWithStatusCode,
add_keepalive_to_stream,
)
from letta.server.server import SyncServer
from letta.services.conversation_manager import ConversationManager
from letta.services.lettuce import LettuceClient
from letta.services.run_manager import RunManager
from letta.services.streaming_service import StreamingService
from letta.services.summarizer.summarizer_config import CompactionSettings
from letta.settings import settings
from letta.validators import ConversationId
router = APIRouter(prefix="/conversations", tags=["conversations"])
logger = get_logger(__name__)
# Instantiate manager
conversation_manager = ConversationManager()
@router.post("/", response_model=Conversation, operation_id="create_conversation")
async def create_conversation(
agent_id: str = Query(..., description="The agent ID to create a conversation for"),
conversation_create: CreateConversation = Body(default_factory=CreateConversation),
server: SyncServer = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""Create a new conversation for an agent."""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
return await conversation_manager.create_conversation(
agent_id=agent_id,
conversation_create=conversation_create,
actor=actor,
)
@router.get("/", response_model=List[Conversation], operation_id="list_conversations")
async def list_conversations(
agent_id: Optional[str] = Query(
None, description="The agent ID to list conversations for (optional - returns all conversations if not provided)"
),
limit: int = Query(50, description="Maximum number of conversations to return"),
after: Optional[str] = Query(None, description="Cursor for pagination (conversation ID)"),
summary_search: Optional[str] = Query(None, description="Search for text within conversation summaries"),
order: Literal["asc", "desc"] = Query(
"desc", description="Sort order for conversations. 'asc' for oldest first, 'desc' for newest first"
),
order_by: Literal["created_at", "last_run_completion"] = Query("created_at", description="Field to sort by"),
server: SyncServer = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""List all conversations for an agent (or all conversations if agent_id not provided)."""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
ascending = order == "asc"
return await conversation_manager.list_conversations(
agent_id=agent_id,
actor=actor,
limit=limit,
after=after,
summary_search=summary_search,
ascending=ascending,
sort_by=order_by,
)
@router.get("/{conversation_id}", response_model=Conversation, operation_id="retrieve_conversation")
async def retrieve_conversation(
conversation_id: ConversationId,
server: SyncServer = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""Retrieve a specific conversation."""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
return await conversation_manager.get_conversation_by_id(
conversation_id=conversation_id,
actor=actor,
)
@router.patch("/{conversation_id}", response_model=Conversation, operation_id="update_conversation")
async def update_conversation(
conversation_id: ConversationId,
conversation_update: UpdateConversation = Body(...),
server: SyncServer = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""Update a conversation."""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
return await conversation_manager.update_conversation(
conversation_id=conversation_id,
conversation_update=conversation_update,
actor=actor,
)
@router.delete("/{conversation_id}", response_model=None, operation_id="delete_conversation")
async def delete_conversation(
conversation_id: ConversationId,
server: SyncServer = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Delete a conversation (soft delete).
This marks the conversation as deleted but does not permanently remove it from the database.
The conversation will no longer appear in list operations.
Any isolated blocks associated with the conversation will be permanently deleted.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
await conversation_manager.delete_conversation(
conversation_id=conversation_id,
actor=actor,
)
ConversationMessagesResponse = Annotated[
List[LettaMessageUnion], Field(json_schema_extra={"type": "array", "items": {"$ref": "#/components/schemas/LettaMessageUnion"}})
]
@router.get(
"/{conversation_id}/messages",
response_model=ConversationMessagesResponse,
operation_id="list_conversation_messages",
)
async def list_conversation_messages(
conversation_id: ConversationId,
server: SyncServer = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
before: Optional[str] = Query(
None, description="Message ID cursor for pagination. Returns messages that come before this message ID in the specified sort order"
),
after: Optional[str] = Query(
None, description="Message ID cursor for pagination. Returns messages that come after this message ID in the specified sort order"
),
limit: Optional[int] = Query(100, description="Maximum number of messages to return"),
order: Literal["asc", "desc"] = Query(
"desc", description="Sort order for messages by creation time. 'asc' for oldest first, 'desc' for newest first"
),
order_by: Literal["created_at"] = Query("created_at", description="Field to sort by"),
group_id: Optional[str] = Query(None, description="Group ID to filter messages by."),
include_err: Optional[bool] = Query(
None, description="Whether to include error messages and error statuses. For debugging purposes only."
),
):
"""
List all messages in a conversation.
Returns LettaMessage objects (UserMessage, AssistantMessage, etc.) for all
messages in the conversation, with support for cursor-based pagination.
If conversation_id is an agent ID (starts with "agent-"), returns messages
from the agent's default conversation (no conversation isolation).
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
# Agent-direct mode: list agent's default conversation messages
if conversation_id.startswith("agent-"):
return await server.get_agent_recall_async(
agent_id=conversation_id,
after=after,
before=before,
limit=limit,
group_id=group_id,
conversation_id=None, # Default conversation (no isolation)
reverse=(order == "desc"),
return_message_object=False,
include_err=include_err,
actor=actor,
)
return await conversation_manager.list_conversation_messages(
conversation_id=conversation_id,
actor=actor,
limit=limit,
before=before,
after=after,
reverse=(order == "desc"),
group_id=group_id,
include_err=include_err,
)
async def _send_agent_direct_message(
agent_id: str,
request: ConversationMessageRequest,
server: SyncServer,
actor,
billing_context: "BillingContext | None" = None,
) -> StreamingResponse | LettaResponse:
"""
Handle agent-direct messaging with locking but without conversation features.
This is used when the conversation_id in the URL is actually an agent ID,
providing a unified endpoint while maintaining agent-level locking.
"""
redis_client = await get_redis_client()
# Streaming mode (default)
if request.streaming:
streaming_request = LettaStreamingRequest(
messages=request.messages,
streaming=True,
stream_tokens=request.stream_tokens,
include_pings=request.include_pings,
background=request.background,
max_steps=request.max_steps,
use_assistant_message=request.use_assistant_message,
assistant_message_tool_name=request.assistant_message_tool_name,
assistant_message_tool_kwarg=request.assistant_message_tool_kwarg,
include_return_message_types=request.include_return_message_types,
override_model=request.override_model,
client_tools=request.client_tools,
)
streaming_service = StreamingService(server)
run, result = await streaming_service.create_agent_stream(
agent_id=agent_id,
actor=actor,
request=streaming_request,
run_type="send_message",
conversation_id=None,
should_lock=True,
billing_context=billing_context,
)
return result
# Non-streaming mode with locking
agent = await server.agent_manager.get_agent_by_id_async(
agent_id,
actor,
include_relationships=["memory", "multi_agent_group", "sources", "tool_exec_environment_variables", "tools", "tags"],
)
# Handle model override if specified in the request
if request.override_model:
override_llm_config = await server.get_llm_config_from_handle_async(
actor=actor,
handle=request.override_model,
)
agent = agent.model_copy(update={"llm_config": override_llm_config})
# Acquire lock using agent_id as lock key
if not isinstance(redis_client, NoopAsyncRedisClient):
await redis_client.acquire_conversation_lock(
conversation_id=agent_id,
token=str(uuid4()),
)
try:
# Create a run for execution tracking
run = None
if settings.track_agent_run:
runs_manager = RunManager()
run = await runs_manager.create_run(
pydantic_run=PydanticRun(
agent_id=agent_id,
background=False,
metadata={
"run_type": "send_message",
},
request_config=LettaRequestConfig.from_letta_request(request),
),
actor=actor,
)
# Set run_id in Redis for cancellation support
await redis_client.set(f"{REDIS_RUN_ID_PREFIX}:{agent_id}", run.id if run else None)
agent_loop = AgentLoop.load(agent_state=agent, actor=actor)
return await agent_loop.step(
request.messages,
max_steps=request.max_steps,
run_id=run.id if run else None,
use_assistant_message=request.use_assistant_message,
include_return_message_types=request.include_return_message_types,
client_tools=request.client_tools,
conversation_id=None,
include_compaction_messages=request.include_compaction_messages,
billing_context=billing_context,
)
finally:
# Release lock
await redis_client.release_conversation_lock(agent_id)
@router.post(
"/{conversation_id}/messages",
response_model=LettaResponse,
operation_id="send_conversation_message",
responses={
200: {
"description": "Successful response",
"content": {
"text/event-stream": {"description": "Server-Sent Events stream (default, when streaming=true)"},
"application/json": {"description": "JSON response (when streaming=false)"},
},
}
},
)
async def send_conversation_message(
conversation_id: ConversationId,
request: ConversationMessageRequest = Body(...),
server: SyncServer = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
) -> StreamingResponse | LettaResponse:
"""
Send a message to a conversation and get a response.
This endpoint sends a message to an existing conversation.
By default (streaming=true), returns a streaming response (Server-Sent Events).
Set streaming=false to get a complete JSON response.
If conversation_id is an agent ID (starts with "agent-"), routes to agent-direct
mode with locking but without conversation-specific features.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
if not request.messages or len(request.messages) == 0:
raise HTTPException(status_code=422, detail="Messages must not be empty")
# Detect agent-direct mode: conversation_id is actually an agent ID
is_agent_direct = conversation_id.startswith("agent-")
if is_agent_direct:
# Agent-direct mode: use agent ID, enable locking, skip conversation features
agent_id = conversation_id
return await _send_agent_direct_message(
agent_id=agent_id,
request=request,
server=server,
actor=actor,
billing_context=headers.billing_context,
)
# Normal conversation mode
conversation = await conversation_manager.get_conversation_by_id(
conversation_id=conversation_id,
actor=actor,
)
# Streaming mode (default)
if request.streaming:
# Convert to LettaStreamingRequest for StreamingService compatibility
streaming_request = LettaStreamingRequest(
messages=request.messages,
streaming=True,
stream_tokens=request.stream_tokens,
include_pings=request.include_pings,
background=request.background,
max_steps=request.max_steps,
use_assistant_message=request.use_assistant_message,
assistant_message_tool_name=request.assistant_message_tool_name,
assistant_message_tool_kwarg=request.assistant_message_tool_kwarg,
include_return_message_types=request.include_return_message_types,
override_model=request.override_model,
client_tools=request.client_tools,
)
streaming_service = StreamingService(server)
run, result = await streaming_service.create_agent_stream(
agent_id=conversation.agent_id,
actor=actor,
request=streaming_request,
run_type="send_conversation_message",
conversation_id=conversation_id,
billing_context=headers.billing_context,
)
return result
# Non-streaming mode
agent = await server.agent_manager.get_agent_by_id_async(
conversation.agent_id,
actor,
include_relationships=["memory", "multi_agent_group", "sources", "tool_exec_environment_variables", "tools", "tags"],
)
# Apply conversation-level model override if set (lower priority than request override)
if conversation.model and not request.override_model:
conversation_llm_config = await server.get_llm_config_from_handle_async(
actor=actor,
handle=conversation.model,
)
if conversation.model_settings is not None:
update_params = conversation.model_settings._to_legacy_config_params()
# Don't clobber max_tokens with the Pydantic default when the caller
# didn't explicitly provide max_output_tokens.
if "max_output_tokens" not in conversation.model_settings.model_fields_set:
update_params.pop("max_tokens", None)
conversation_llm_config = conversation_llm_config.model_copy(update=update_params)
agent = agent.model_copy(update={"llm_config": conversation_llm_config})
if request.override_model:
override_llm_config = await server.get_llm_config_from_handle_async(
actor=actor,
handle=request.override_model,
)
agent = agent.model_copy(update={"llm_config": override_llm_config})
# Create a run for execution tracking
run = None
if settings.track_agent_run:
runs_manager = RunManager()
run = await runs_manager.create_run(
pydantic_run=PydanticRun(
agent_id=conversation.agent_id,
background=False,
metadata={
"run_type": "send_conversation_message",
},
request_config=LettaRequestConfig.from_letta_request(request),
),
actor=actor,
)
# Set run_id in Redis for cancellation support
redis_client = await get_redis_client()
await redis_client.set(f"{REDIS_RUN_ID_PREFIX}:{conversation.agent_id}", run.id if run else None)
agent_loop = AgentLoop.load(agent_state=agent, actor=actor)
return await agent_loop.step(
request.messages,
max_steps=request.max_steps,
run_id=run.id if run else None,
use_assistant_message=request.use_assistant_message,
include_return_message_types=request.include_return_message_types,
client_tools=request.client_tools,
conversation_id=conversation_id,
include_compaction_messages=request.include_compaction_messages,
billing_context=headers.billing_context,
)
@router.post(
"/{conversation_id}/stream",
response_model=None,
operation_id="retrieve_conversation_stream",
responses={
200: {
"description": "Successful response",
"content": {
"text/event-stream": {
"description": "Server-Sent Events stream",
"schema": {
"oneOf": [
{"$ref": "#/components/schemas/SystemMessage"},
{"$ref": "#/components/schemas/UserMessage"},
{"$ref": "#/components/schemas/ReasoningMessage"},
{"$ref": "#/components/schemas/HiddenReasoningMessage"},
{"$ref": "#/components/schemas/ToolCallMessage"},
{"$ref": "#/components/schemas/ToolReturnMessage"},
{"$ref": "#/components/schemas/AssistantMessage"},
{"$ref": "#/components/schemas/ApprovalRequestMessage"},
{"$ref": "#/components/schemas/ApprovalResponseMessage"},
{"$ref": "#/components/schemas/LettaPing"},
{"$ref": "#/components/schemas/LettaErrorMessage"},
{"$ref": "#/components/schemas/LettaStopReason"},
{"$ref": "#/components/schemas/LettaUsageStatistics"},
]
},
},
},
}
},
)
async def retrieve_conversation_stream(
conversation_id: ConversationId,
request: RetrieveStreamRequest = Body(None),
headers: HeaderParams = Depends(get_headers),
server: SyncServer = Depends(get_letta_server),
):
"""
Resume the stream for the most recent active run in a conversation.
This endpoint allows you to reconnect to an active background stream
for a conversation, enabling recovery from network interruptions.
If conversation_id is an agent ID (starts with "agent-"), retrieves the
stream for the agent's most recent active run.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
runs_manager = RunManager()
# Find the most recent active run
if conversation_id.startswith("agent-"):
# Agent-direct mode: find runs by agent_id
active_runs = await runs_manager.list_runs(
actor=actor,
agent_id=conversation_id,
statuses=[RunStatus.created, RunStatus.running],
limit=1,
ascending=False,
)
else:
# Normal mode: find runs by conversation_id
active_runs = await runs_manager.list_runs(
actor=actor,
conversation_id=conversation_id,
statuses=[RunStatus.created, RunStatus.running],
limit=1,
ascending=False,
)
if not active_runs:
raise LettaInvalidArgumentError("No active runs found for this conversation.")
run = active_runs[0]
if not run.background:
raise LettaInvalidArgumentError("Run was not created in background mode, so it cannot be retrieved.")
if run.created_at < get_utc_time() - timedelta(hours=3):
raise LettaExpiredError("Run was created more than 3 hours ago, and is now expired.")
redis_client = await get_redis_client()
if isinstance(redis_client, NoopAsyncRedisClient):
raise HTTPException(
status_code=503,
detail=(
"Background streaming requires Redis to be running. "
"Please ensure Redis is properly configured. "
f"LETTA_REDIS_HOST: {settings.redis_host}, LETTA_REDIS_PORT: {settings.redis_port}"
),
)
stream = redis_sse_stream_generator(
redis_client=redis_client,
run_id=run.id,
starting_after=request.starting_after if request else None,
poll_interval=request.poll_interval if request else None,
batch_size=request.batch_size if request else None,
)
if settings.enable_cancellation_aware_streaming:
from letta.server.rest_api.streaming_response import cancellation_aware_stream_wrapper, get_cancellation_event_for_run
stream = cancellation_aware_stream_wrapper(
stream_generator=stream,
run_manager=server.run_manager,
run_id=run.id,
actor=actor,
cancellation_event=get_cancellation_event_for_run(run.id),
)
if request and request.include_pings and settings.enable_keepalive:
stream = add_keepalive_to_stream(stream, keepalive_interval=settings.keepalive_interval, run_id=run.id)
return StreamingResponseWithStatusCode(
stream,
media_type="text/event-stream",
)
@router.post("/{conversation_id}/cancel", operation_id="cancel_conversation")
async def cancel_conversation(
conversation_id: ConversationId,
server: SyncServer = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
) -> dict:
"""
Cancel runs associated with a conversation.
Note: To cancel active runs, Redis is required.
If conversation_id is an agent ID (starts with "agent-"), cancels runs
for the agent's default conversation.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
logger.info(
"[Interrupt] Cancel request received for conversation=%s by actor=%s (org=%s)",
conversation_id,
actor.id,
actor.organization_id,
)
if not settings.track_agent_run:
raise HTTPException(status_code=400, detail="Agent run tracking is disabled")
# Agent-direct mode: use agent_id directly, skip conversation lookup
if conversation_id.startswith("agent-"):
agent_id = conversation_id
# Find active runs for this agent (default conversation has conversation_id=None)
runs = await server.run_manager.list_runs(
actor=actor,
agent_id=agent_id,
statuses=[RunStatus.created, RunStatus.running],
ascending=False,
limit=100,
)
else:
# Verify conversation exists and get agent_id
conversation = await conversation_manager.get_conversation_by_id(
conversation_id=conversation_id,
actor=actor,
)
agent_id = conversation.agent_id
# Find active runs for this conversation
runs = await server.run_manager.list_runs(
actor=actor,
statuses=[RunStatus.created, RunStatus.running],
ascending=False,
conversation_id=conversation_id,
limit=100,
)
run_ids = [run.id for run in runs]
if not run_ids:
raise NoActiveRunsToCancelError(conversation_id=conversation_id)
results = {}
for run_id in run_ids:
try:
run = await server.run_manager.get_run_by_id(run_id=run_id, actor=actor)
if run.metadata and run.metadata.get("lettuce"):
try:
lettuce_client = await LettuceClient.create()
await lettuce_client.cancel(run_id)
except Exception as e:
logger.error(f"Failed to cancel Lettuce run {run_id}: {e}")
await server.run_manager.cancel_run(actor=actor, agent_id=agent_id, run_id=run_id)
except Exception as e:
results[run_id] = "failed"
logger.error(f"Failed to cancel run {run_id}: {str(e)}")
continue
results[run_id] = "cancelled"
logger.info(f"Cancelled run {run_id}")
return results
class CompactionRequest(BaseModel):
compaction_settings: Optional[CompactionSettings] = Field(
default=None,
description="Optional compaction settings to use for this summarization request. If not provided, the agent's default settings will be used.",
)
class CompactionResponse(BaseModel):
summary: str
num_messages_before: int
num_messages_after: int
@router.post("/{conversation_id}/compact", response_model=CompactionResponse, operation_id="compact_conversation")
async def compact_conversation(
conversation_id: ConversationId,
request: Optional[CompactionRequest] = Body(default=None),
server: SyncServer = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Compact (summarize) a conversation's message history.
This endpoint summarizes the in-context messages for a specific conversation,
reducing the message count while preserving important context.
If conversation_id is an agent ID (starts with "agent-"), compacts the
agent's default conversation messages.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
# Agent-direct mode: compact agent's default conversation
if conversation_id.startswith("agent-"):
agent_id = conversation_id
agent = await server.agent_manager.get_agent_by_id_async(agent_id, actor, include_relationships=["multi_agent_group"])
in_context_messages = await server.message_manager.get_messages_by_ids_async(message_ids=agent.message_ids, actor=actor)
agent_loop = LettaAgentV3(agent_state=agent, actor=actor)
else:
# Get the conversation to find the agent_id
conversation = await conversation_manager.get_conversation_by_id(
conversation_id=conversation_id,
actor=actor,
)
# Get the agent state
agent = await server.agent_manager.get_agent_by_id_async(conversation.agent_id, actor, include_relationships=["multi_agent_group"])
# Get in-context messages for this conversation
in_context_messages = await conversation_manager.get_messages_for_conversation(
conversation_id=conversation_id,
actor=actor,
)
# Create agent loop with conversation context
agent_loop = LettaAgentV3(agent_state=agent, actor=actor, conversation_id=conversation_id)
if not in_context_messages:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="No in-context messages found for this conversation.",
)
# Merge request compaction_settings with agent's settings (request overrides agent)
if agent.compaction_settings and request and request.compaction_settings:
# Start with agent's settings, override with new values from request
# Use model_fields_set to get the fields that were changed in the request (want to ignore the defaults that get set automatically)
compaction_settings = agent.compaction_settings.copy() # do not mutate original agent compaction settings
changed_fields = request.compaction_settings.model_fields_set
for field in changed_fields:
setattr(compaction_settings, field, getattr(request.compaction_settings, field))
# If mode changed from agent's original settings and prompt not explicitly set in request, then use the default prompt for the new mode
# Ex: previously was sliding_window, now is all, so we need to use the default prompt for all mode
if (
"mode" in changed_fields
and "prompt" not in changed_fields
and agent.compaction_settings.mode != request.compaction_settings.mode
):
from letta.services.summarizer.summarizer_config import get_default_prompt_for_mode
compaction_settings.prompt = get_default_prompt_for_mode(compaction_settings.mode)
else:
compaction_settings = (request and request.compaction_settings) or agent.compaction_settings
num_messages_before = len(in_context_messages)
# Run compaction
summary_message, messages, summary = await agent_loop.compact(
messages=in_context_messages,
compaction_settings=compaction_settings,
use_summary_role=True,
)
num_messages_after = len(messages)
# Validate compaction reduced messages
if num_messages_before <= num_messages_after:
logger.warning(f"Summarization failed to reduce the number of messages. {num_messages_before} messages -> {num_messages_after}.")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Summarization failed to reduce the number of messages. You may not have enough messages to compact or need to use a different CompactionSettings (e.g. using `all` mode).",
)
# Checkpoint the messages (this will update the conversation_messages table)
await agent_loop._checkpoint_messages(run_id=None, step_id=None, new_messages=[summary_message], in_context_messages=messages)
logger.info(f"Compacted conversation {conversation_id}: {num_messages_before} messages -> {num_messages_after}")
return CompactionResponse(
summary=summary,
num_messages_before=num_messages_before,
num_messages_after=num_messages_after,
)