From 5c7bed77435323ddd9995a355d27493a413fac4d Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Fri, 16 Jan 2026 16:22:06 -0800 Subject: [PATCH] feat: add conversation_id to export export and compact (#8792) --- letta/server/rest_api/routers/v1/agents.py | 6 +- .../rest_api/routers/v1/conversations.py | 110 +++++++++++++++++- letta/services/agent_serialization_manager.py | 17 ++- 3 files changed, 129 insertions(+), 4 deletions(-) diff --git a/letta/server/rest_api/routers/v1/agents.py b/letta/server/rest_api/routers/v1/agents.py index 509fd088..031feaa8 100644 --- a/letta/server/rest_api/routers/v1/agents.py +++ b/letta/server/rest_api/routers/v1/agents.py @@ -243,6 +243,10 @@ async def export_agent( description="If True, exports using the legacy single-agent 'v1' format with inline tools/blocks. If False, exports using the new multi-entity 'v2' format, with separate agents, tools, blocks, files, etc.", deprecated=True, ), + conversation_id: Optional[str] = Query( + None, + description="Conversation ID to export. If provided, uses messages from this conversation instead of the agent's global message history.", + ), # do not remove, used to autogeneration of spec # TODO: Think of a better way to export AgentFileSchema spec: AgentFileSchema | None = None, @@ -254,7 +258,7 @@ async def export_agent( if use_legacy_format: raise HTTPException(status_code=400, detail="Legacy format is not supported") actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) - agent_file_schema = await server.agent_serialization_manager.export(agent_ids=[agent_id], actor=actor) + agent_file_schema = await server.agent_serialization_manager.export(agent_ids=[agent_id], actor=actor, conversation_id=conversation_id) return agent_file_schema.model_dump() diff --git a/letta/server/rest_api/routers/v1/conversations.py b/letta/server/rest_api/routers/v1/conversations.py index e63e111a..9cbba747 100644 --- a/letta/server/rest_api/routers/v1/conversations.py +++ b/letta/server/rest_api/routers/v1/conversations.py @@ -1,10 +1,11 @@ from datetime import timedelta from typing import Annotated, List, Literal, Optional -from fastapi import APIRouter, Body, Depends, HTTPException, Query -from pydantic import Field +from fastapi import APIRouter, Body, Depends, HTTPException, Query, status +from pydantic import BaseModel, Field from starlette.responses import StreamingResponse +from letta.agents.letta_agent_v3 import LettaAgentV3 from letta.data_sources.redis_client import NoopAsyncRedisClient, get_redis_client from letta.errors import LettaExpiredError, LettaInvalidArgumentError, NoActiveRunsToCancelError from letta.helpers.datetime_helpers import get_utc_time @@ -26,6 +27,7 @@ from letta.services.conversation_manager import ConversationManager from letta.services.lettuce import LettuceClient from letta.services.run_manager import RunManager from letta.services.streaming_service import StreamingService +from letta.services.summarizer.summarizer_config import CompactionSettings from letta.settings import settings from letta.validators import ConversationId @@ -358,3 +360,107 @@ async def cancel_conversation( logger.info(f"Cancelled run {run_id}") return results + + +class CompactionRequest(BaseModel): + compaction_settings: Optional[CompactionSettings] = Field( + default=None, + description="Optional compaction settings to use for this summarization request. If not provided, the agent's default settings will be used.", + ) + + +class CompactionResponse(BaseModel): + summary: str + num_messages_before: int + num_messages_after: int + + +@router.post("/{conversation_id}/compact", response_model=CompactionResponse, operation_id="compact_conversation") +async def compact_conversation( + conversation_id: ConversationId, + request: Optional[CompactionRequest] = Body(default=None), + server: SyncServer = Depends(get_letta_server), + headers: HeaderParams = Depends(get_headers), +): + """ + Compact (summarize) a conversation's message history. + + This endpoint summarizes the in-context messages for a specific conversation, + reducing the message count while preserving important context. + """ + actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) + + # Get the conversation to find the agent_id + conversation = await conversation_manager.get_conversation_by_id( + conversation_id=conversation_id, + actor=actor, + ) + + # Get the agent state + agent = await server.agent_manager.get_agent_by_id_async(conversation.agent_id, actor, include_relationships=["multi_agent_group"]) + + # Check eligibility + agent_eligible = agent.multi_agent_group is None or agent.multi_agent_group.manager_type in ["sleeptime", "voice_sleeptime"] + model_compatible = agent.llm_config.model_endpoint_type in [ + "anthropic", + "openai", + "together", + "google_ai", + "google_vertex", + "bedrock", + "ollama", + "azure", + "xai", + "zai", + "groq", + "deepseek", + ] + + if not (agent_eligible and model_compatible): + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Summarization is not currently supported for this agent configuration. Please contact Letta support.", + ) + + # Get in-context messages for this conversation + in_context_messages = await conversation_manager.get_messages_for_conversation( + conversation_id=conversation_id, + actor=actor, + ) + + if not in_context_messages: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="No in-context messages found for this conversation.", + ) + + # Create agent loop with conversation context + agent_loop = LettaAgentV3(agent_state=agent, actor=actor, conversation_id=conversation_id) + + compaction_settings = request.compaction_settings if request else None + num_messages_before = len(in_context_messages) + + # Run compaction + summary_message, messages, summary = await agent_loop.compact( + messages=in_context_messages, + compaction_settings=compaction_settings, + ) + num_messages_after = len(messages) + + # Validate compaction reduced messages + if num_messages_before <= num_messages_after: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Summarization failed to reduce the number of messages. You may need to use a different CompactionSettings (e.g. using `all` mode).", + ) + + # Checkpoint the messages (this will update the conversation_messages table) + await agent_loop._checkpoint_messages(run_id=None, step_id=None, new_messages=[summary_message], in_context_messages=messages) + + logger.info(f"Compacted conversation {conversation_id}: {num_messages_before} messages -> {num_messages_after}") + + return CompactionResponse( + summary=summary, + num_messages_before=num_messages_before, + num_messages_after=num_messages_after, + ) diff --git a/letta/services/agent_serialization_manager.py b/letta/services/agent_serialization_manager.py index 185aad85..eb58d022 100644 --- a/letta/services/agent_serialization_manager.py +++ b/letta/services/agent_serialization_manager.py @@ -358,12 +358,14 @@ class AgentSerializationManager: logger.error(f"Failed to convert group {group.id}: {e}") raise - async def export(self, agent_ids: List[str], actor: User) -> AgentFileSchema: + async def export(self, agent_ids: List[str], actor: User, conversation_id: Optional[str] = None) -> AgentFileSchema: """ Export agents and their related entities to AgentFileSchema format. Args: agent_ids: List of agent UUIDs to export + conversation_id: Optional conversation ID. If provided, uses the conversation's + in-context message_ids instead of the agent's global message_ids. Returns: AgentFileSchema with all related entities @@ -376,6 +378,19 @@ class AgentSerializationManager: agent_states = await self.agent_manager.get_agents_by_ids_async(agent_ids=agent_ids, actor=actor) + # If conversation_id is provided, override the agent's message_ids with conversation's + if conversation_id: + from letta.services.conversation_manager import ConversationManager + + conversation_manager = ConversationManager() + conversation_message_ids = await conversation_manager.get_message_ids_for_conversation( + conversation_id=conversation_id, + actor=actor, + ) + # Override message_ids for the first agent (conversation export is single-agent) + if agent_states: + agent_states[0].message_ids = conversation_message_ids + # Validate that all requested agents were found if len(agent_states) != len(agent_ids): found_ids = {agent.id for agent in agent_states}