feat: add conversation_id to export export and compact (#8792)

This commit is contained in:
Sarah Wooders
2026-01-16 16:22:06 -08:00
parent e2a8a95371
commit 5c7bed7743
3 changed files with 129 additions and 4 deletions

View File

@@ -243,6 +243,10 @@ async def export_agent(
description="If True, exports using the legacy single-agent 'v1' format with inline tools/blocks. If False, exports using the new multi-entity 'v2' format, with separate agents, tools, blocks, files, etc.",
deprecated=True,
),
conversation_id: Optional[str] = Query(
None,
description="Conversation ID to export. If provided, uses messages from this conversation instead of the agent's global message history.",
),
# do not remove, used to autogeneration of spec
# TODO: Think of a better way to export AgentFileSchema
spec: AgentFileSchema | None = None,
@@ -254,7 +258,7 @@ async def export_agent(
if use_legacy_format:
raise HTTPException(status_code=400, detail="Legacy format is not supported")
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
agent_file_schema = await server.agent_serialization_manager.export(agent_ids=[agent_id], actor=actor)
agent_file_schema = await server.agent_serialization_manager.export(agent_ids=[agent_id], actor=actor, conversation_id=conversation_id)
return agent_file_schema.model_dump()

View File

@@ -1,10 +1,11 @@
from datetime import timedelta
from typing import Annotated, List, Literal, Optional
from fastapi import APIRouter, Body, Depends, HTTPException, Query
from pydantic import Field
from fastapi import APIRouter, Body, Depends, HTTPException, Query, status
from pydantic import BaseModel, Field
from starlette.responses import StreamingResponse
from letta.agents.letta_agent_v3 import LettaAgentV3
from letta.data_sources.redis_client import NoopAsyncRedisClient, get_redis_client
from letta.errors import LettaExpiredError, LettaInvalidArgumentError, NoActiveRunsToCancelError
from letta.helpers.datetime_helpers import get_utc_time
@@ -26,6 +27,7 @@ from letta.services.conversation_manager import ConversationManager
from letta.services.lettuce import LettuceClient
from letta.services.run_manager import RunManager
from letta.services.streaming_service import StreamingService
from letta.services.summarizer.summarizer_config import CompactionSettings
from letta.settings import settings
from letta.validators import ConversationId
@@ -358,3 +360,107 @@ async def cancel_conversation(
logger.info(f"Cancelled run {run_id}")
return results
class CompactionRequest(BaseModel):
compaction_settings: Optional[CompactionSettings] = Field(
default=None,
description="Optional compaction settings to use for this summarization request. If not provided, the agent's default settings will be used.",
)
class CompactionResponse(BaseModel):
summary: str
num_messages_before: int
num_messages_after: int
@router.post("/{conversation_id}/compact", response_model=CompactionResponse, operation_id="compact_conversation")
async def compact_conversation(
conversation_id: ConversationId,
request: Optional[CompactionRequest] = Body(default=None),
server: SyncServer = Depends(get_letta_server),
headers: HeaderParams = Depends(get_headers),
):
"""
Compact (summarize) a conversation's message history.
This endpoint summarizes the in-context messages for a specific conversation,
reducing the message count while preserving important context.
"""
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
# Get the conversation to find the agent_id
conversation = await conversation_manager.get_conversation_by_id(
conversation_id=conversation_id,
actor=actor,
)
# Get the agent state
agent = await server.agent_manager.get_agent_by_id_async(conversation.agent_id, actor, include_relationships=["multi_agent_group"])
# Check eligibility
agent_eligible = agent.multi_agent_group is None or agent.multi_agent_group.manager_type in ["sleeptime", "voice_sleeptime"]
model_compatible = agent.llm_config.model_endpoint_type in [
"anthropic",
"openai",
"together",
"google_ai",
"google_vertex",
"bedrock",
"ollama",
"azure",
"xai",
"zai",
"groq",
"deepseek",
]
if not (agent_eligible and model_compatible):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Summarization is not currently supported for this agent configuration. Please contact Letta support.",
)
# Get in-context messages for this conversation
in_context_messages = await conversation_manager.get_messages_for_conversation(
conversation_id=conversation_id,
actor=actor,
)
if not in_context_messages:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="No in-context messages found for this conversation.",
)
# Create agent loop with conversation context
agent_loop = LettaAgentV3(agent_state=agent, actor=actor, conversation_id=conversation_id)
compaction_settings = request.compaction_settings if request else None
num_messages_before = len(in_context_messages)
# Run compaction
summary_message, messages, summary = await agent_loop.compact(
messages=in_context_messages,
compaction_settings=compaction_settings,
)
num_messages_after = len(messages)
# Validate compaction reduced messages
if num_messages_before <= num_messages_after:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Summarization failed to reduce the number of messages. You may need to use a different CompactionSettings (e.g. using `all` mode).",
)
# Checkpoint the messages (this will update the conversation_messages table)
await agent_loop._checkpoint_messages(run_id=None, step_id=None, new_messages=[summary_message], in_context_messages=messages)
logger.info(f"Compacted conversation {conversation_id}: {num_messages_before} messages -> {num_messages_after}")
return CompactionResponse(
summary=summary,
num_messages_before=num_messages_before,
num_messages_after=num_messages_after,
)

View File

@@ -358,12 +358,14 @@ class AgentSerializationManager:
logger.error(f"Failed to convert group {group.id}: {e}")
raise
async def export(self, agent_ids: List[str], actor: User) -> AgentFileSchema:
async def export(self, agent_ids: List[str], actor: User, conversation_id: Optional[str] = None) -> AgentFileSchema:
"""
Export agents and their related entities to AgentFileSchema format.
Args:
agent_ids: List of agent UUIDs to export
conversation_id: Optional conversation ID. If provided, uses the conversation's
in-context message_ids instead of the agent's global message_ids.
Returns:
AgentFileSchema with all related entities
@@ -376,6 +378,19 @@ class AgentSerializationManager:
agent_states = await self.agent_manager.get_agents_by_ids_async(agent_ids=agent_ids, actor=actor)
# If conversation_id is provided, override the agent's message_ids with conversation's
if conversation_id:
from letta.services.conversation_manager import ConversationManager
conversation_manager = ConversationManager()
conversation_message_ids = await conversation_manager.get_message_ids_for_conversation(
conversation_id=conversation_id,
actor=actor,
)
# Override message_ids for the first agent (conversation export is single-agent)
if agent_states:
agent_states[0].message_ids = conversation_message_ids
# Validate that all requested agents were found
if len(agent_states) != len(agent_ids):
found_ids = {agent.id for agent in agent_states}