diff --git a/fern/openapi.json b/fern/openapi.json index d86cf7ab..dc495cc5 100644 --- a/fern/openapi.json +++ b/fern/openapi.json @@ -29839,6 +29839,63 @@ "title": "CompactionSettings", "description": "Configuration for conversation compaction / summarization.\n\n``model`` is the only required user-facing field – it specifies the summarizer\nmodel handle (e.g. ``\"openai/gpt-4o-mini\"``). Per-model settings (temperature,\nmax tokens, etc.) are derived from the default configuration for that handle." }, + "CompactionStats": { + "properties": { + "trigger": { + "type": "string", + "title": "Trigger", + "description": "What triggered the compaction (e.g., 'context_window_exceeded', 'post_step_context_check')" + }, + "context_tokens_before": { + "anyOf": [ + { + "type": "integer" + }, + { + "type": "null" + } + ], + "title": "Context Tokens Before", + "description": "Token count before compaction (from LLM usage stats, includes full context sent to LLM)" + }, + "context_tokens_after": { + "anyOf": [ + { + "type": "integer" + }, + { + "type": "null" + } + ], + "title": "Context Tokens After", + "description": "Token count after compaction (message tokens only, does not include tool definitions)" + }, + "context_window": { + "type": "integer", + "title": "Context Window", + "description": "The model's context window size" + }, + "messages_count_before": { + "type": "integer", + "title": "Messages Count Before", + "description": "Number of messages before compaction" + }, + "messages_count_after": { + "type": "integer", + "title": "Messages Count After", + "description": "Number of messages after compaction" + } + }, + "type": "object", + "required": [ + "trigger", + "context_window", + "messages_count_before", + "messages_count_after" + ], + "title": "CompactionStats", + "description": "Statistics about a memory compaction operation." 
def extract_compaction_stats_from_message(message: Message) -> CompactionStats | None:
    """
    Pull compaction statistics out of a message's packed content, if present.

    Only messages holding exactly one content part are inspected; that part's
    ``text`` is handed to ``extract_compaction_stats_from_packed_json`` for parsing.

    Args:
        message: Message object whose single content part holds the packed JSON

    Returns:
        Whatever the packed-JSON helper returns for the part's text, or None when
        the message has no content, has a number of content parts other than one,
        or an attribute lookup fails along the way
    """
    try:
        parts = message.content
        # Empty/None content or multi-part content carries no packed summary here.
        if not parts or len(parts) != 1:
            return None
        return extract_compaction_stats_from_packed_json(parts[0].text)
    except AttributeError:
        # Best effort: e.g. the content part has no ``text`` attribute.
        return None
{self.agent_state.llm_config.context_window}), trying to compact messages" ) + # Capture pre-compaction state for metadata + context_tokens_before = self.context_token_estimate + messages_count_before = len(messages) + # Yield event notification before compaction starts if include_compaction_messages: yield self._create_compaction_event_message( @@ -1027,6 +1070,9 @@ class LettaAgentV3(LettaAgentV2): run_id=run_id, step_id=step_id, use_summary_role=include_compaction_messages, + trigger="post_step_context_check", + context_tokens_before=context_tokens_before, + messages_count_before=messages_count_before, ) self.response_messages.append(summary_message) @@ -1612,6 +1658,9 @@ class LettaAgentV3(LettaAgentV2): run_id: Optional[str] = None, step_id: Optional[str] = None, use_summary_role: bool = False, + trigger: Optional[str] = None, + context_tokens_before: Optional[int] = None, + messages_count_before: Optional[int] = None, ) -> tuple[Message, list[Message], str]: """Compact the current in-context messages for this agent. @@ -1624,6 +1673,9 @@ class LettaAgentV3(LettaAgentV2): use_summary_role: If True, the summary message will be created with role=summary instead of role=user. This enables first-class summary message handling in the database and API responses. + trigger: What triggered the compaction (e.g., "context_window_exceeded", "post_step_context_check"). + context_tokens_before: Token count before compaction (for stats). + messages_count_before: Message count before compaction (for stats). 
class CompactionStats(BaseModel):
    """
    Statistics about a memory compaction operation.
    """

    # Label describing why compaction ran.
    trigger: str = Field(
        default=...,
        description="What triggered the compaction (e.g., 'context_window_exceeded', 'post_step_context_check')",
    )

    # Token counts are optional; the remaining fields are required.
    context_tokens_before: Optional[int] = Field(
        default=None,
        description="Token count before compaction (from LLM usage stats, includes full context sent to LLM)",
    )
    context_tokens_after: Optional[int] = Field(
        default=None,
        description="Token count after compaction (message tokens only, does not include tool definitions)",
    )
    context_window: int = Field(default=..., description="The model's context window size")

    # Message counts on either side of the compaction.
    messages_count_before: int = Field(default=..., description="Number of messages before compaction")
    messages_count_after: int = Field(default=..., description="Number of messages after compaction")


def extract_compaction_stats_from_packed_json(text_content: str) -> Optional[CompactionStats]:
    """
    Parse a packed summary message string and return its compaction stats, if any.

    Args:
        text_content: The packed JSON string from summary message content

    Returns:
        A CompactionStats built from the ``compaction_stats`` entry of the decoded
        JSON object. None when the text cannot be decoded, the decoded value is not
        a dict, the ``compaction_stats`` key is absent, or building the model raises
        TypeError/ValueError.
    """
    # Step 1: decode. Undecodable input simply means "no stats".
    try:
        payload = json.loads(text_content)
    except (json.JSONDecodeError, TypeError, ValueError):
        return None

    # Step 2: only a JSON object carrying the stats key is of interest.
    if not isinstance(payload, dict) or "compaction_stats" not in payload:
        return None

    # Step 3: build the model; a malformed stats entry is treated as absent.
    try:
        return CompactionStats(**payload["compaction_stats"])
    except (TypeError, ValueError):
        return None
@@ -403,6 +439,7 @@ class SummaryMessage(LettaMessage): message_type: Literal["summary_message"] = "summary_message" summary: str + compaction_stats: Optional[CompactionStats] = None class EventMessage(LettaMessage): diff --git a/letta/schemas/message.py b/letta/schemas/message.py index 5391dd5b..71ff53d1 100644 --- a/letta/schemas/message.py +++ b/letta/schemas/message.py @@ -30,6 +30,7 @@ from letta.schemas.letta_message import ( ApprovalReturn, AssistantMessage, AssistantMessageListResult, + CompactionStats, HiddenReasoningMessage, LettaMessage, LettaMessageReturnUnion, @@ -46,6 +47,7 @@ from letta.schemas.letta_message import ( ToolReturnMessage, UserMessage, UserMessageListResult, + extract_compaction_stats_from_packed_json, ) from letta.schemas.letta_message_content import ( ImageContent, @@ -1062,9 +1064,12 @@ class Message(BaseMessage): raise ValueError(f"Invalid summary message (no text object on message): {self.content}") # Unpack the summary from the packed JSON format - # The packed format is: {"type": "system_alert", "message": "...", "time": "..."} + # The packed format is: {"type": "system_alert", "message": "...", "time": "...", "compaction_stats": {...}} summary = unpack_message(text_content) + # Extract compaction_stats from the packed JSON using shared helper + compaction_stats = extract_compaction_stats_from_packed_json(text_content) + if as_user_message: # Return as UserMessage for backward compatibility return UserMessage( @@ -1086,6 +1091,7 @@ class Message(BaseMessage): otid=self.otid, step_id=self.step_id, run_id=self.run_id, + compaction_stats=compaction_stats, ) @staticmethod diff --git a/letta/system.py b/letta/system.py index 95d919e3..e766420b 100644 --- a/letta/system.py +++ b/letta/system.py @@ -204,7 +204,7 @@ def package_summarize_message(summary, summary_message_count, hidden_message_cou return json_dumps(packaged_message) -def package_summarize_message_no_counts(summary, timezone): +def 
# =============================================================================
# CompactionStats tests
# =============================================================================


def _packed_summary_json(message, stats=None, time="2024-01-15T10:00:00"):
    """Build a packed summary JSON string; the ``time`` / ``compaction_stats`` keys are left out when None."""
    payload = {"type": "system_alert", "message": message}
    if time is not None:
        payload["time"] = time
    if stats is not None:
        payload["compaction_stats"] = stats
    return json.dumps(payload)


def _summary_role_message(packed_text):
    """Wrap packed text in a summary-role message holding a single text content part."""
    return PydanticMessage(
        role=MessageRole.summary,
        content=[TextContent(type="text", text=packed_text)],
    )


def test_compaction_stats_embedding_in_packed_json():
    """package_summarize_message_no_counts should embed the supplied compaction_stats in the packed JSON."""
    from letta.system import package_summarize_message_no_counts

    decoded = json.loads(
        package_summarize_message_no_counts(
            summary="Test summary content",
            timezone="UTC",
            compaction_stats={
                "trigger": "post_step_context_check",
                "context_tokens_before": 50000,
                "context_tokens_after": 15000,
                "context_window": 128000,
                "messages_count_before": 45,
                "messages_count_after": 12,
            },
        )
    )

    # Envelope structure
    assert "type" in decoded
    assert decoded["type"] == "system_alert"
    assert "message" in decoded
    assert "Test summary content" in decoded["message"]
    assert "compaction_stats" in decoded

    # Each stat must come back with the value that went in
    embedded = decoded["compaction_stats"]
    for field_name, expected_value in (
        ("trigger", "post_step_context_check"),
        ("context_tokens_before", 50000),
        ("context_tokens_after", 15000),
        ("context_window", 128000),
        ("messages_count_before", 45),
        ("messages_count_after", 12),
    ):
        assert embedded[field_name] == expected_value


def test_compaction_stats_embedding_without_stats():
    """Packing with compaction_stats=None should leave the stats key out entirely."""
    from letta.system import package_summarize_message_no_counts

    decoded = json.loads(
        package_summarize_message_no_counts(
            summary="Test summary content",
            timezone="UTC",
            compaction_stats=None,
        )
    )

    assert "type" in decoded
    assert "message" in decoded
    assert "compaction_stats" not in decoded


def test_extract_compaction_stats_from_packed_json():
    """A packed JSON string carrying stats should parse into a CompactionStats."""
    from letta.schemas.letta_message import CompactionStats, extract_compaction_stats_from_packed_json

    packed = _packed_summary_json(
        "Test summary",
        stats={
            "trigger": "context_window_exceeded",
            "context_tokens_before": 100000,
            "context_tokens_after": 30000,
            "context_window": 128000,
            "messages_count_before": 50,
            "messages_count_after": 15,
        },
    )

    parsed = extract_compaction_stats_from_packed_json(packed)

    assert parsed is not None
    assert isinstance(parsed, CompactionStats)
    assert parsed.trigger == "context_window_exceeded"
    assert parsed.context_tokens_before == 100000
    assert parsed.context_tokens_after == 30000
    assert parsed.context_window == 128000
    assert parsed.messages_count_before == 50
    assert parsed.messages_count_after == 15


def test_extract_compaction_stats_from_packed_json_without_stats():
    """Old-format packed JSON (no compaction_stats key) should yield None (backward compatibility)."""
    from letta.schemas.letta_message import extract_compaction_stats_from_packed_json

    # Old format without compaction_stats
    legacy_packed = _packed_summary_json("Test summary")

    assert extract_compaction_stats_from_packed_json(legacy_packed) is None


def test_extract_compaction_stats_from_packed_json_invalid_json():
    """Input that is not valid JSON should be tolerated and yield None."""
    from letta.schemas.letta_message import extract_compaction_stats_from_packed_json

    for malformed in ("not valid json", ""):
        assert extract_compaction_stats_from_packed_json(malformed) is None


def test_extract_compaction_stats_from_packed_json_invalid_stats():
    """A stats entry that fails model validation should yield None."""
    from letta.schemas.letta_message import extract_compaction_stats_from_packed_json

    # Only "trigger" is supplied: context_window, messages_count_before and
    # messages_count_after are missing, so validation must fail.
    incomplete_packed = _packed_summary_json(
        "Test summary",
        stats={"trigger": "test"},
        time=None,
    )

    # Should return None due to validation failure
    assert extract_compaction_stats_from_packed_json(incomplete_packed) is None


def test_extract_compaction_stats_from_message():
    """Stats embedded in a Message's packed content should be extractable from the Message."""
    from letta.agents.letta_agent_v3 import extract_compaction_stats_from_message
    from letta.schemas.letta_message import CompactionStats

    summary_message = _summary_role_message(
        _packed_summary_json(
            "Test summary",
            stats={
                "trigger": "post_step_context_check",
                "context_tokens_before": 50000,
                "context_tokens_after": 15000,
                "context_window": 128000,
                "messages_count_before": 45,
                "messages_count_after": 12,
            },
        )
    )

    parsed = extract_compaction_stats_from_message(summary_message)

    assert parsed is not None
    assert isinstance(parsed, CompactionStats)
    assert parsed.trigger == "post_step_context_check"
    assert parsed.context_tokens_before == 50000
    assert parsed.messages_count_after == 12


def test_extract_compaction_stats_from_message_without_stats():
    """A Message whose packed content has no stats should yield None."""
    from letta.agents.letta_agent_v3 import extract_compaction_stats_from_message

    legacy_message = _summary_role_message(_packed_summary_json("Old format summary"))

    assert extract_compaction_stats_from_message(legacy_message) is None


def test_message_to_summary_message_with_stats():
    """Message._convert_summary_message should carry compaction_stats onto the SummaryMessage."""
    from letta.schemas.letta_message import CompactionStats

    source_message = _summary_role_message(
        _packed_summary_json(
            "Summary of conversation",
            stats={
                "trigger": "context_window_exceeded",
                "context_tokens_before": 80000,
                "context_tokens_after": 25000,
                "context_window": 128000,
                "messages_count_before": 60,
                "messages_count_after": 20,
            },
        )
    )

    # Convert to SummaryMessage (as_user_message=False)
    converted = source_message._convert_summary_message(as_user_message=False)

    assert converted.message_type == "summary_message"
    assert converted.compaction_stats is not None
    assert isinstance(converted.compaction_stats, CompactionStats)
    assert converted.compaction_stats.trigger == "context_window_exceeded"
    assert converted.compaction_stats.context_tokens_before == 80000


def test_message_to_summary_message_backward_compatible():
    """Old messages without compaction_stats should still convert, with stats left as None."""
    legacy_message = _summary_role_message(_packed_summary_json("Old format summary without stats"))

    converted = legacy_message._convert_summary_message(as_user_message=False)

    assert converted.message_type == "summary_message"
    assert converted.compaction_stats is None  # Should be None for old messages
    assert "Old format summary" in converted.summary


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "llm_config",
    TESTED_LLM_CONFIGS,
    ids=[c.model for c in TESTED_LLM_CONFIGS],
)
async def test_compact_with_stats_params_embeds_stats(server: SyncServer, actor, llm_config: LLMConfig):
    """
    Integration test: calling compact() with trigger / context_tokens_before /
    messages_count_before should embed compaction_stats in the packed content
    of the returned summary message.
    """
    from letta.agents.letta_agent_v3 import extract_compaction_stats_from_message

    # Seed a conversation long enough to summarize: one system prompt, then ten user/assistant turns
    conversation = [
        PydanticMessage(
            role=MessageRole.system,
            content=[TextContent(type="text", text="You are a helpful assistant.")],
        )
    ]
    for turn in range(10):
        conversation.extend(
            [
                PydanticMessage(
                    role=MessageRole.user,
                    content=[TextContent(type="text", text=f"User message {turn}")],
                ),
                PydanticMessage(
                    role=MessageRole.assistant,
                    content=[TextContent(type="text", text=f"Response {turn}")],
                ),
            ]
        )

    agent_state, in_context_messages = await create_agent_with_messages(server, actor, llm_config, conversation)

    handle = llm_config.handle or f"{llm_config.model_endpoint_type}/{llm_config.model}"
    agent_state.compaction_settings = CompactionSettings(model=handle, mode="all")

    agent_loop = LettaAgentV3(agent_state=agent_state, actor=actor)

    # Call compact with stats params
    summary_message_obj, compacted_messages, summary_text = await agent_loop.compact(
        messages=in_context_messages,
        use_summary_role=True,
        trigger="post_step_context_check",
        context_tokens_before=50000,
        messages_count_before=len(in_context_messages),
    )

    # Read the stats back out of the returned summary message
    parsed = extract_compaction_stats_from_message(summary_message_obj)

    assert parsed is not None, "CompactionStats should be embedded in the message"
    assert parsed.trigger == "post_step_context_check"
    assert parsed.context_tokens_before == 50000
    assert parsed.messages_count_before == len(in_context_messages)
    assert parsed.context_tokens_after is not None  # Should be set by compact()
    assert parsed.messages_count_after == len(compacted_messages)  # final_messages already includes summary
    assert parsed.context_window == llm_config.context_window