feat: add compaction stats (#9219)

* base

* update

* last

* generate

* fix test
This commit is contained in:
jnjpng
2026-01-30 15:20:16 -08:00
committed by Caren Thomas
parent 720fc9c758
commit 3f23a23227
6 changed files with 500 additions and 3 deletions

View File

@@ -29839,6 +29839,63 @@
"title": "CompactionSettings",
"description": "Configuration for conversation compaction / summarization.\n\n``model`` is the only required user-facing field it specifies the summarizer\nmodel handle (e.g. ``\"openai/gpt-4o-mini\"``). Per-model settings (temperature,\nmax tokens, etc.) are derived from the default configuration for that handle."
},
"CompactionStats": {
"properties": {
"trigger": {
"type": "string",
"title": "Trigger",
"description": "What triggered the compaction (e.g., 'context_window_exceeded', 'post_step_context_check')"
},
"context_tokens_before": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"title": "Context Tokens Before",
"description": "Token count before compaction (from LLM usage stats, includes full context sent to LLM)"
},
"context_tokens_after": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"title": "Context Tokens After",
"description": "Token count after compaction (message tokens only, does not include tool definitions)"
},
"context_window": {
"type": "integer",
"title": "Context Window",
"description": "The model's context window size"
},
"messages_count_before": {
"type": "integer",
"title": "Messages Count Before",
"description": "Number of messages before compaction"
},
"messages_count_after": {
"type": "integer",
"title": "Messages Count After",
"description": "Number of messages after compaction"
}
},
"type": "object",
"required": [
"trigger",
"context_window",
"messages_count_before",
"messages_count_after"
],
"title": "CompactionStats",
"description": "Statistics about a memory compaction operation."
},
"ComparisonOperator": {
"type": "string",
"enum": ["eq", "gte", "lte"],
@@ -43072,6 +43129,16 @@
"summary": {
"type": "string",
"title": "Summary"
},
"compaction_stats": {
"anyOf": [
{
"$ref": "#/components/schemas/CompactionStats"
},
{
"type": "null"
}
]
}
},
"type": "object",

View File

@@ -29,7 +29,16 @@ from letta.local_llm.constants import INNER_THOUGHTS_KWARG
from letta.otel.tracing import trace_method
from letta.schemas.agent import AgentState
from letta.schemas.enums import MessageRole
from letta.schemas.letta_message import ApprovalReturn, EventMessage, LettaErrorMessage, LettaMessage, MessageType, SummaryMessage
from letta.schemas.letta_message import (
ApprovalReturn,
CompactionStats,
EventMessage,
LettaErrorMessage,
LettaMessage,
MessageType,
SummaryMessage,
extract_compaction_stats_from_packed_json,
)
from letta.schemas.letta_message_content import OmittedReasoningContent, ReasoningContent, RedactedReasoningContent, TextContent
from letta.schemas.letta_request import ClientToolSchema
from letta.schemas.letta_response import LettaResponse
@@ -61,6 +70,25 @@ from letta.system import package_function_response, package_summarize_message_no
from letta.utils import log_telemetry, validate_function_response
def extract_compaction_stats_from_message(message: Message) -> CompactionStats | None:
"""
Extract CompactionStats from a Message object's packed content.
Args:
message: Message object with packed JSON content
Returns:
CompactionStats if found and valid, None otherwise
"""
try:
if message.content and len(message.content) == 1:
text_content = message.content[0].text
return extract_compaction_stats_from_packed_json(text_content)
except AttributeError:
pass
return None
class LettaAgentV3(LettaAgentV2):
"""
Similar to V2, but stripped down / simplified, while also generalized:
@@ -614,6 +642,9 @@ class LettaAgentV3(LettaAgentV2):
List of LettaMessage objects to yield to the client
"""
if include_compaction_messages:
# Extract compaction_stats from the packed message content if available
compaction_stats = extract_compaction_stats_from_message(summary_message)
# New behavior: structured SummaryMessage
return [
SummaryMessage(
@@ -623,6 +654,7 @@ class LettaAgentV3(LettaAgentV2):
otid=Message.generate_otid_from_id(summary_message.id, 0),
step_id=step_id,
run_id=run_id,
compaction_stats=compaction_stats,
),
]
else:
@@ -865,6 +897,10 @@ class LettaAgentV3(LettaAgentV2):
f"Context window exceeded (error {e}), trying to compact messages attempt {llm_request_attempt + 1} of {summarizer_settings.max_summarizer_retries + 1}"
)
try:
# Capture pre-compaction state for metadata
context_tokens_before = self.context_token_estimate
messages_count_before = len(messages)
# Yield event notification before compaction starts
if include_compaction_messages:
yield self._create_compaction_event_message(
@@ -879,6 +915,9 @@ class LettaAgentV3(LettaAgentV2):
run_id=run_id,
step_id=step_id,
use_summary_role=include_compaction_messages,
trigger="context_window_exceeded",
context_tokens_before=context_tokens_before,
messages_count_before=messages_count_before,
)
self.logger.info("Summarization succeeded, continuing to retry LLM request")
@@ -1013,6 +1052,10 @@ class LettaAgentV3(LettaAgentV2):
f"Context window exceeded (current: {self.context_token_estimate}, threshold: {self.agent_state.llm_config.context_window}), trying to compact messages"
)
# Capture pre-compaction state for metadata
context_tokens_before = self.context_token_estimate
messages_count_before = len(messages)
# Yield event notification before compaction starts
if include_compaction_messages:
yield self._create_compaction_event_message(
@@ -1027,6 +1070,9 @@ class LettaAgentV3(LettaAgentV2):
run_id=run_id,
step_id=step_id,
use_summary_role=include_compaction_messages,
trigger="post_step_context_check",
context_tokens_before=context_tokens_before,
messages_count_before=messages_count_before,
)
self.response_messages.append(summary_message)
@@ -1612,6 +1658,9 @@ class LettaAgentV3(LettaAgentV2):
run_id: Optional[str] = None,
step_id: Optional[str] = None,
use_summary_role: bool = False,
trigger: Optional[str] = None,
context_tokens_before: Optional[int] = None,
messages_count_before: Optional[int] = None,
) -> tuple[Message, list[Message], str]:
"""Compact the current in-context messages for this agent.
@@ -1624,6 +1673,9 @@ class LettaAgentV3(LettaAgentV2):
use_summary_role: If True, the summary message will be created with
role=summary instead of role=user. This enables first-class
summary message handling in the database and API responses.
trigger: What triggered the compaction (e.g., "context_window_exceeded", "post_step_context_check").
context_tokens_before: Token count before compaction (for stats).
messages_count_before: Message count before compaction (for stats).
"""
# Use the passed-in compaction_settings first, then agent's compaction_settings if set,
@@ -1741,10 +1793,25 @@ class LettaAgentV3(LettaAgentV2):
f"Summarization fallback succeeded in bringing the context size below the trigger threshold: {self.context_token_estimate} < {trigger_threshold}"
)
# Build compaction stats if we have the before values
# Note: messages_count_after = len(compacted_messages) + 1 because final_messages
# will be: [system] + [summary_message] + compacted_messages[1:]
compaction_stats = None
if trigger and context_tokens_before is not None and messages_count_before is not None:
compaction_stats = {
"trigger": trigger,
"context_tokens_before": context_tokens_before,
"context_tokens_after": self.context_token_estimate,
"context_window": self.agent_state.llm_config.context_window,
"messages_count_before": messages_count_before,
"messages_count_after": len(compacted_messages) + 1,
}
# Persist the summary message to DB
summary_message_str_packed = package_summarize_message_no_counts(
summary=summary,
timezone=self.agent_state.timezone,
compaction_stats=compaction_stats,
)
if use_summary_role:

View File

@@ -396,6 +396,42 @@ class LettaErrorMessage(BaseModel):
seq_id: Optional[int] = None
class CompactionStats(BaseModel):
"""
Statistics about a memory compaction operation.
"""
trigger: str = Field(..., description="What triggered the compaction (e.g., 'context_window_exceeded', 'post_step_context_check')")
context_tokens_before: Optional[int] = Field(
None, description="Token count before compaction (from LLM usage stats, includes full context sent to LLM)"
)
context_tokens_after: Optional[int] = Field(
None, description="Token count after compaction (message tokens only, does not include tool definitions)"
)
context_window: int = Field(..., description="The model's context window size")
messages_count_before: int = Field(..., description="Number of messages before compaction")
messages_count_after: int = Field(..., description="Number of messages after compaction")
def extract_compaction_stats_from_packed_json(text_content: str) -> Optional[CompactionStats]:
"""
Extract CompactionStats from a packed summary message JSON string.
Args:
text_content: The packed JSON string from summary message content
Returns:
CompactionStats if found and valid, None otherwise
"""
try:
packed_json = json.loads(text_content)
if isinstance(packed_json, dict) and "compaction_stats" in packed_json:
return CompactionStats(**packed_json["compaction_stats"])
except (json.JSONDecodeError, TypeError, ValueError):
pass
return None
class SummaryMessage(LettaMessage):
"""
A message representing a summary of the conversation. Sent to the LLM as a user or system message depending on the provider.
@@ -403,6 +439,7 @@ class SummaryMessage(LettaMessage):
message_type: Literal["summary_message"] = "summary_message"
summary: str
compaction_stats: Optional[CompactionStats] = None
class EventMessage(LettaMessage):

View File

@@ -30,6 +30,7 @@ from letta.schemas.letta_message import (
ApprovalReturn,
AssistantMessage,
AssistantMessageListResult,
CompactionStats,
HiddenReasoningMessage,
LettaMessage,
LettaMessageReturnUnion,
@@ -46,6 +47,7 @@ from letta.schemas.letta_message import (
ToolReturnMessage,
UserMessage,
UserMessageListResult,
extract_compaction_stats_from_packed_json,
)
from letta.schemas.letta_message_content import (
ImageContent,
@@ -1062,9 +1064,12 @@ class Message(BaseMessage):
raise ValueError(f"Invalid summary message (no text object on message): {self.content}")
# Unpack the summary from the packed JSON format
# The packed format is: {"type": "system_alert", "message": "...", "time": "..."}
# The packed format is: {"type": "system_alert", "message": "...", "time": "...", "compaction_stats": {...}}
summary = unpack_message(text_content)
# Extract compaction_stats from the packed JSON using shared helper
compaction_stats = extract_compaction_stats_from_packed_json(text_content)
if as_user_message:
# Return as UserMessage for backward compatibility
return UserMessage(
@@ -1086,6 +1091,7 @@ class Message(BaseMessage):
otid=self.otid,
step_id=self.step_id,
run_id=self.run_id,
compaction_stats=compaction_stats,
)
@staticmethod

View File

@@ -204,7 +204,7 @@ def package_summarize_message(summary, summary_message_count, hidden_message_cou
return json_dumps(packaged_message)
def package_summarize_message_no_counts(summary, timezone):
def package_summarize_message_no_counts(summary, timezone, compaction_stats: dict | None = None):
context_message = (
"Note: prior messages have been hidden from view due to conversation memory constraints.\n"
+ f"The following is a summary of the previous messages:\n {summary}"
@@ -217,6 +217,9 @@ def package_summarize_message_no_counts(summary, timezone):
"time": formatted_time,
}
if compaction_stats:
packaged_message["compaction_stats"] = compaction_stats
return json_dumps(packaged_message)

View File

@@ -1716,3 +1716,320 @@ async def test_summarize_all(server: SyncServer, actor, llm_config: LLMConfig):
print(f"Successfully summarized {len(messages)} messages using 'all' mode")
print(f"Summary: {summary[:200]}..." if len(summary) > 200 else f"Summary: {summary}")
print(f"Using {llm_config.model_endpoint_type} for model {llm_config.model}")
# =============================================================================
# CompactionStats tests
# =============================================================================
def test_compaction_stats_embedding_in_packed_json():
"""Test that compaction_stats are correctly embedded in the packed JSON by package_summarize_message_no_counts."""
from letta.system import package_summarize_message_no_counts
stats = {
"trigger": "post_step_context_check",
"context_tokens_before": 50000,
"context_tokens_after": 15000,
"context_window": 128000,
"messages_count_before": 45,
"messages_count_after": 12,
}
packed = package_summarize_message_no_counts(
summary="Test summary content",
timezone="UTC",
compaction_stats=stats,
)
# Parse the packed JSON
packed_json = json.loads(packed)
# Verify structure
assert "type" in packed_json
assert packed_json["type"] == "system_alert"
assert "message" in packed_json
assert "Test summary content" in packed_json["message"]
assert "compaction_stats" in packed_json
# Verify stats content
embedded_stats = packed_json["compaction_stats"]
assert embedded_stats["trigger"] == "post_step_context_check"
assert embedded_stats["context_tokens_before"] == 50000
assert embedded_stats["context_tokens_after"] == 15000
assert embedded_stats["context_window"] == 128000
assert embedded_stats["messages_count_before"] == 45
assert embedded_stats["messages_count_after"] == 12
def test_compaction_stats_embedding_without_stats():
"""Test that packed JSON works correctly when no stats are provided."""
from letta.system import package_summarize_message_no_counts
packed = package_summarize_message_no_counts(
summary="Test summary content",
timezone="UTC",
compaction_stats=None,
)
packed_json = json.loads(packed)
assert "type" in packed_json
assert "message" in packed_json
assert "compaction_stats" not in packed_json
def test_extract_compaction_stats_from_packed_json():
"""Test extracting CompactionStats from a packed JSON string."""
from letta.schemas.letta_message import CompactionStats, extract_compaction_stats_from_packed_json
packed_json = json.dumps(
{
"type": "system_alert",
"message": "Test summary",
"time": "2024-01-15T10:00:00",
"compaction_stats": {
"trigger": "context_window_exceeded",
"context_tokens_before": 100000,
"context_tokens_after": 30000,
"context_window": 128000,
"messages_count_before": 50,
"messages_count_after": 15,
},
}
)
stats = extract_compaction_stats_from_packed_json(packed_json)
assert stats is not None
assert isinstance(stats, CompactionStats)
assert stats.trigger == "context_window_exceeded"
assert stats.context_tokens_before == 100000
assert stats.context_tokens_after == 30000
assert stats.context_window == 128000
assert stats.messages_count_before == 50
assert stats.messages_count_after == 15
def test_extract_compaction_stats_from_packed_json_without_stats():
"""Test that extraction returns None when no stats are present (backward compatibility)."""
from letta.schemas.letta_message import extract_compaction_stats_from_packed_json
# Old format without compaction_stats
packed_json = json.dumps(
{
"type": "system_alert",
"message": "Test summary",
"time": "2024-01-15T10:00:00",
}
)
stats = extract_compaction_stats_from_packed_json(packed_json)
assert stats is None
def test_extract_compaction_stats_from_packed_json_invalid_json():
"""Test that extraction handles invalid JSON gracefully."""
from letta.schemas.letta_message import extract_compaction_stats_from_packed_json
stats = extract_compaction_stats_from_packed_json("not valid json")
assert stats is None
stats = extract_compaction_stats_from_packed_json("")
assert stats is None
def test_extract_compaction_stats_from_packed_json_invalid_stats():
"""Test that extraction handles invalid stats structure gracefully."""
from letta.schemas.letta_message import extract_compaction_stats_from_packed_json
# Missing required fields
packed_json = json.dumps(
{
"type": "system_alert",
"message": "Test summary",
"compaction_stats": {
"trigger": "test",
# Missing context_window, messages_count_before, messages_count_after
},
}
)
stats = extract_compaction_stats_from_packed_json(packed_json)
assert stats is None # Should return None due to validation failure
def test_extract_compaction_stats_from_message():
"""Test extracting CompactionStats from a Message object."""
from letta.agents.letta_agent_v3 import extract_compaction_stats_from_message
from letta.schemas.letta_message import CompactionStats
packed_content = json.dumps(
{
"type": "system_alert",
"message": "Test summary",
"time": "2024-01-15T10:00:00",
"compaction_stats": {
"trigger": "post_step_context_check",
"context_tokens_before": 50000,
"context_tokens_after": 15000,
"context_window": 128000,
"messages_count_before": 45,
"messages_count_after": 12,
},
}
)
message = PydanticMessage(
role=MessageRole.summary,
content=[TextContent(type="text", text=packed_content)],
)
stats = extract_compaction_stats_from_message(message)
assert stats is not None
assert isinstance(stats, CompactionStats)
assert stats.trigger == "post_step_context_check"
assert stats.context_tokens_before == 50000
assert stats.messages_count_after == 12
def test_extract_compaction_stats_from_message_without_stats():
"""Test that Message extraction returns None when no stats are present."""
from letta.agents.letta_agent_v3 import extract_compaction_stats_from_message
packed_content = json.dumps(
{
"type": "system_alert",
"message": "Old format summary",
"time": "2024-01-15T10:00:00",
}
)
message = PydanticMessage(
role=MessageRole.summary,
content=[TextContent(type="text", text=packed_content)],
)
stats = extract_compaction_stats_from_message(message)
assert stats is None
def test_message_to_summary_message_with_stats():
"""Test that Message._convert_summary_message extracts compaction_stats."""
from letta.schemas.letta_message import CompactionStats
packed_content = json.dumps(
{
"type": "system_alert",
"message": "Summary of conversation",
"time": "2024-01-15T10:00:00",
"compaction_stats": {
"trigger": "context_window_exceeded",
"context_tokens_before": 80000,
"context_tokens_after": 25000,
"context_window": 128000,
"messages_count_before": 60,
"messages_count_after": 20,
},
}
)
message = PydanticMessage(
role=MessageRole.summary,
content=[TextContent(type="text", text=packed_content)],
)
# Convert to SummaryMessage (as_user_message=False)
summary_msg = message._convert_summary_message(as_user_message=False)
assert summary_msg.message_type == "summary_message"
assert summary_msg.compaction_stats is not None
assert isinstance(summary_msg.compaction_stats, CompactionStats)
assert summary_msg.compaction_stats.trigger == "context_window_exceeded"
assert summary_msg.compaction_stats.context_tokens_before == 80000
def test_message_to_summary_message_backward_compatible():
"""Test that old messages without compaction_stats still convert correctly."""
packed_content = json.dumps(
{
"type": "system_alert",
"message": "Old format summary without stats",
"time": "2024-01-15T10:00:00",
}
)
message = PydanticMessage(
role=MessageRole.summary,
content=[TextContent(type="text", text=packed_content)],
)
summary_msg = message._convert_summary_message(as_user_message=False)
assert summary_msg.message_type == "summary_message"
assert summary_msg.compaction_stats is None # Should be None for old messages
assert "Old format summary" in summary_msg.summary
@pytest.mark.asyncio
@pytest.mark.parametrize(
"llm_config",
TESTED_LLM_CONFIGS,
ids=[c.model for c in TESTED_LLM_CONFIGS],
)
async def test_compact_with_stats_params_embeds_stats(server: SyncServer, actor, llm_config: LLMConfig):
"""
Integration test: compact() with trigger/context_tokens_before/messages_count_before
embeds compaction_stats in the packed message content.
"""
from letta.agents.letta_agent_v3 import extract_compaction_stats_from_message
# Create a conversation with enough messages to summarize
messages = [
PydanticMessage(
role=MessageRole.system,
content=[TextContent(type="text", text="You are a helpful assistant.")],
)
]
for i in range(10):
messages.append(
PydanticMessage(
role=MessageRole.user,
content=[TextContent(type="text", text=f"User message {i}")],
)
)
messages.append(
PydanticMessage(
role=MessageRole.assistant,
content=[TextContent(type="text", text=f"Response {i}")],
)
)
agent_state, in_context_messages = await create_agent_with_messages(server, actor, llm_config, messages)
handle = llm_config.handle or f"{llm_config.model_endpoint_type}/{llm_config.model}"
agent_state.compaction_settings = CompactionSettings(model=handle, mode="all")
agent_loop = LettaAgentV3(agent_state=agent_state, actor=actor)
# Call compact with stats params
summary_message_obj, compacted_messages, summary_text = await agent_loop.compact(
messages=in_context_messages,
use_summary_role=True,
trigger="post_step_context_check",
context_tokens_before=50000,
messages_count_before=len(in_context_messages),
)
# Extract stats from the message
stats = extract_compaction_stats_from_message(summary_message_obj)
assert stats is not None, "CompactionStats should be embedded in the message"
assert stats.trigger == "post_step_context_check"
assert stats.context_tokens_before == 50000
assert stats.messages_count_before == len(in_context_messages)
assert stats.context_tokens_after is not None # Should be set by compact()
assert stats.messages_count_after == len(compacted_messages) # final_messages already includes summary
assert stats.context_window == llm_config.context_window