fix: avoid infinite summarization loops (#6506)
commit 3569721fd4
parent a38475f23d
committed by Caren Thomas
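
LettaAgentV3 previously decided when to summarize based on
self.last_step_usage.total_tokens, a per-step value that is never updated
after summarization shrinks the context. When a summarization pass failed to
bring the context below the trigger threshold, the stale count kept
re-triggering summarization on every subsequent step, looping forever.

This commit:

- introduces self.context_token_estimate on LettaAgentV3, seeded from
  per-step usage in LettaAgentV2, which can be adjusted after summarization
  without mutating the historical per-step usage stored in Step metrics;
- switches the proactive-summarization trigger and the post-step safety net
  to read context_token_estimate instead of last_step_usage;
- recomputes the token count of the post-summarization context and stores it
  in context_token_estimate, so later checks reason over the reduced size;
- falls back to a hard eviction (keep only the system message) with a loud
  error log when summarization still leaves the context above the threshold;
- adds a regression test that mocks count_tokens to force the hard-eviction
  path.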
@@ -899,6 +899,14 @@ class LettaAgentV2(BaseAgentV2):
         # Save per-step usage for Step token details (before accumulating)
         self.last_step_usage = step_usage_stats
+
+        # For newer agent loops (e.g. V3), we also maintain a running
+        # estimate of the current context size derived from the latest
+        # step's total tokens. This can then be safely adjusted after
+        # summarization without mutating the historical per-step usage
+        # stored in Step metrics.
+        if hasattr(self, "context_token_estimate"):
+            self.context_token_estimate = step_usage_stats.total_tokens

         # Accumulate into global usage
         self.usage.step_count += step_usage_stats.step_count
         self.usage.completion_tokens += step_usage_stats.completion_tokens
@@ -49,7 +49,10 @@ from letta.server.rest_api.utils import (
 from letta.services.helpers.tool_parser_helper import runtime_override_tool_json_schema
 from letta.services.summarizer.summarizer_all import summarize_all
 from letta.services.summarizer.summarizer_config import SummarizerConfig, get_default_summarizer_config
-from letta.services.summarizer.summarizer_sliding_window import summarize_via_sliding_window
+from letta.services.summarizer.summarizer_sliding_window import (
+    count_tokens,
+    summarize_via_sliding_window,
+)
 from letta.settings import settings, summarizer_settings
 from letta.system import package_function_response, package_summarize_message_no_counts
 from letta.utils import log_telemetry, validate_function_response
@@ -71,6 +74,11 @@ class LettaAgentV3(LettaAgentV2):
         super()._initialize_state()
         self._require_tool_call = False
         self.response_messages_for_metadata = []  # Separate accumulator for streaming job metadata
+        # Approximate token count for the *current* in-context buffer, used
+        # only for proactive summarization / eviction logic. This is derived
+        # from per-step usage but can be updated after summarization without
+        # affecting step-level telemetry.
+        self.context_token_estimate: int | None = None

     def _compute_tool_return_truncation_chars(self) -> int:
         """Compute a dynamic cap for tool returns in requests.
@@ -141,8 +149,8 @@ class LettaAgentV3(LettaAgentV2):

             # Proactive summarization if approaching context limit
             if (
-                self.last_step_usage
-                and self.last_step_usage.total_tokens > self.agent_state.llm_config.context_window * SUMMARIZATION_TRIGGER_MULTIPLIER
+                self.context_token_estimate is not None
+                and self.context_token_estimate > self.agent_state.llm_config.context_window * SUMMARIZATION_TRIGGER_MULTIPLIER
                 and not self.agent_state.message_buffer_autoclear
             ):
                 self.logger.warning(
@@ -153,7 +161,7 @@ class LettaAgentV3(LettaAgentV2):
                 in_context_messages = await self.summarize_conversation_history(
                     in_context_messages=in_context_messages,
                     new_letta_messages=self.response_messages,
-                    total_tokens=self.last_step_usage.total_tokens,
+                    total_tokens=self.context_token_estimate,
                     force=True,
                 )

@@ -167,11 +175,11 @@ class LettaAgentV3(LettaAgentV2):

         # Rebuild context window after stepping (safety net)
         if not self.agent_state.message_buffer_autoclear:
-            if self.last_step_usage:
+            if self.context_token_estimate is not None:
                 await self.summarize_conversation_history(
                     in_context_messages=in_context_messages,
                     new_letta_messages=self.response_messages,
-                    total_tokens=self.last_step_usage.total_tokens,
+                    total_tokens=self.context_token_estimate,
                     force=False,
                 )
             else:
@@ -276,8 +284,8 @@ class LettaAgentV3(LettaAgentV2):

             # Proactive summarization if approaching context limit
             if (
-                self.last_step_usage
-                and self.last_step_usage.total_tokens > self.agent_state.llm_config.context_window * SUMMARIZATION_TRIGGER_MULTIPLIER
+                self.context_token_estimate is not None
+                and self.context_token_estimate > self.agent_state.llm_config.context_window * SUMMARIZATION_TRIGGER_MULTIPLIER
                 and not self.agent_state.message_buffer_autoclear
             ):
                 self.logger.warning(
@@ -288,7 +296,7 @@ class LettaAgentV3(LettaAgentV2):
                 in_context_messages = await self.summarize_conversation_history(
                     in_context_messages=in_context_messages,
                     new_letta_messages=self.response_messages,
-                    total_tokens=self.last_step_usage.total_tokens,
+                    total_tokens=self.context_token_estimate,
                     force=True,
                 )

@@ -304,11 +312,11 @@ class LettaAgentV3(LettaAgentV2):
             self.stop_reason = LettaStopReason(stop_reason=StopReasonType.max_steps.value)

         if not self.agent_state.message_buffer_autoclear:
-            if self.last_step_usage:
+            if self.context_token_estimate is not None:
                 await self.summarize_conversation_history(
                     in_context_messages=in_context_messages,
                     new_letta_messages=self.response_messages,
-                    total_tokens=self.last_step_usage.total_tokens,
+                    total_tokens=self.context_token_estimate,
                     force=False,
                 )
             else:
@@ -1364,7 +1372,8 @@ class LettaAgentV3(LettaAgentV2):
             template_id=self.agent_state.template_id,
         )

-        # Update the message_ids in the agent state
+        # Update the message_ids in the agent state to include the summary
+        # plus whatever tail we decided to keep.
         new_in_context_messages = [in_context_messages[0], summary_message_obj] + new_in_context_messages
         new_in_context_message_ids = [m.id for m in new_in_context_messages]
         await self.agent_manager.update_message_ids_async(
@@ -1374,4 +1383,71 @@ class LettaAgentV3(LettaAgentV2):
         )
         self.agent_state.message_ids = new_in_context_message_ids

+        # After summarization, recompute an approximate token count for the
+        # updated in-context messages so that subsequent summarization
+        # decisions don't keep firing based on a stale, pre-summarization
+        # total_tokens value.
+        try:
+            new_total_tokens = await count_tokens(
+                actor=self.actor,
+                llm_config=self.agent_state.llm_config,
+                messages=new_in_context_messages,
+            )
+
+            context_limit = self.agent_state.llm_config.context_window
+            trigger_threshold = int(context_limit * SUMMARIZATION_TRIGGER_MULTIPLIER)
+
+            # If even after summarization the context is still at or above
+            # the proactive summarization threshold, treat this as a hard
+            # failure: log loudly and evict all prior conversation state
+            # (keeping only the system message) to avoid getting stuck in
+            # repeated summarization loops.
+            if new_total_tokens > trigger_threshold:
+                self.logger.error(
+                    "Summarization failed to sufficiently reduce context size: "
+                    f"post-summarization tokens={new_total_tokens}, "
+                    f"threshold={trigger_threshold}, context_window={context_limit}. "
+                    "Evicting all prior messages without a summary to break potential loops.",
+                )
+
+                # Keep only the system message in-context.
+                system_message = in_context_messages[0]
+                new_in_context_messages = [system_message]
+                new_in_context_message_ids = [system_message.id]
+
+                await self.agent_manager.update_message_ids_async(
+                    agent_id=self.agent_state.id,
+                    message_ids=new_in_context_message_ids,
+                    actor=self.actor,
+                )
+                self.agent_state.message_ids = new_in_context_message_ids
+
+                # Recompute token usage for this minimal context and update
+                # context_token_estimate so future checks see the reduced size.
+                try:
+                    minimal_tokens = await count_tokens(
+                        actor=self.actor,
+                        llm_config=self.agent_state.llm_config,
+                        messages=new_in_context_messages,
+                    )
+                    self.context_token_estimate = minimal_tokens
+                except Exception as inner_e:
+                    self.logger.warning(
+                        f"Failed to recompute token usage after hard eviction: {inner_e}",
+                        exc_info=True,
+                    )
+
+                return new_in_context_messages
+
+            # Normal case: summarization succeeded in bringing us below the
+            # proactive threshold. Update context_token_estimate so future
+            # summarization checks reason over the *post*-summarization
+            # context size.
+            self.context_token_estimate = new_total_tokens
+        except Exception as e:  # best-effort; never block the agent on this
+            self.logger.warning(
+                f"Failed to recompute token usage after summarization: {e}",
+                exc_info=True,
+            )

         return new_in_context_messages
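
Taken together, the V3 changes above amount to a small state machine around a
single running estimate: trigger only on a live estimate, recount after
summarizing, and hard-evict when the recount is still over the threshold. The
sketch below condenses that control flow outside of Letta. It is a minimal
illustration, not Letta code: the multiplier value, the helper names, and the
stand-in token numbers are all assumptions.

# Condensed, self-contained sketch of the anti-loop guard. All names and the
# multiplier value below are illustrative assumptions, not Letta APIs.
SUMMARIZATION_TRIGGER_MULTIPLIER = 0.75  # assumed; the real constant lives in Letta


def should_summarize(context_token_estimate: int | None, context_window: int) -> bool:
    """Mirror of the new proactive trigger: fire only on a live estimate."""
    return (
        context_token_estimate is not None
        and context_token_estimate > context_window * SUMMARIZATION_TRIGGER_MULTIPLIER
    )


def resolve_after_summarization(new_total_tokens: int, context_window: int) -> tuple[str, int]:
    """Decide what happens once a summarization pass finishes.

    Returns ("ok", estimate) when the recount dropped below the trigger
    threshold, or ("hard_evict", minimal_tokens) when only evicting everything
    but the system message can break the summarize -> still-too-big loop.
    """
    trigger_threshold = int(context_window * SUMMARIZATION_TRIGGER_MULTIPLIER)
    if new_total_tokens > trigger_threshold:
        # Summarization did not help: the caller keeps only the system
        # message and re-counts that minimal context (10 is a stand-in).
        return ("hard_evict", 10)
    # Normal case: adopt the post-summarization size as the new estimate so
    # the next trigger check reasons over fresh numbers.
    return ("ok", new_total_tokens)


if __name__ == "__main__":
    window = 100_000
    assert not should_summarize(None, window)    # no estimate yet -> no trigger
    assert should_summarize(90_000, window)      # 90k > 75k threshold -> summarize
    assert resolve_after_summarization(90_000, window) == ("hard_evict", 10)
    assert resolve_after_summarization(40_000, window) == ("ok", 40_000)

The key design choice the sketch highlights: the estimate lives apart from
last_step_usage, so step-level telemetry stays immutable while the eviction
logic always sees the post-summarization reality.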
@@ -720,6 +720,105 @@ async def test_summarize_with_mode(server: SyncServer, actor, llm_config: LLMConfig
     print(f"Mode '{mode}' with {llm_config.model}: {len(in_context_messages)} -> {len(result)} messages")


+@pytest.mark.asyncio
+@pytest.mark.parametrize("llm_config", TESTED_LLM_CONFIGS, ids=[c.model for c in TESTED_LLM_CONFIGS])
+async def test_v3_summarize_hard_eviction_when_still_over_threshold(
+    server: SyncServer,
+    actor,
+    llm_config: LLMConfig,
+    caplog,
+):
+    """Regression test: ensure V3 summarizer does a hard eviction when
+    summarization fails to bring the context size below the proactive
+    summarization threshold.
+
+    This test simulates the edge case that previously led to summarization
+    loops:
+
+    1. A large pre-summarization token count triggers summarization.
+    2. Even after summarization, the (mocked) post-summarization token count
+       is still above the trigger threshold.
+    3. We verify that LettaAgentV3:
+       - Logs an error about summarization failing to reduce context size.
+       - Evicts all prior messages, keeping only the system message.
+       - Updates `context_token_estimate` to the token count of the minimal
+         context so future steps don't keep re-triggering summarization based
+         on a stale, oversized value.
+    """
+
+    # Build a small but non-trivial conversation with an explicit system
+    # message so that after hard eviction we expect to keep exactly that one
+    # message.
+    messages = [
+        PydanticMessage(
+            role=MessageRole.system,
+            content=[TextContent(type="text", text="You are a helpful assistant.")],
+        ),
+        PydanticMessage(
+            role=MessageRole.user,
+            content=[TextContent(type="text", text="User message 0: hello")],
+        ),
+        PydanticMessage(
+            role=MessageRole.assistant,
+            content=[TextContent(type="text", text="Assistant response 0: hi there")],
+        ),
+    ]
+
+    agent_state, in_context_messages = await create_agent_with_messages(server, actor, llm_config, messages)
+
+    # Create the V3 agent loop
+    agent_loop = LettaAgentV3(agent_state=agent_state, actor=actor)
+
+    # We don't care which summarizer mode is used here; we just need
+    # summarize_conversation_history to run and then hit the branch where the
+    # *post*-summarization token count is still above the proactive
+    # summarization threshold. We simulate that by patching the
+    # letta_agent_v3-level count_tokens helper to report an extremely large
+    # token count for the first call (post-summary) and a small count for the
+    # second call (after hard eviction).
+    with patch("letta.agents.letta_agent_v3.count_tokens") as mock_count_tokens:
+        # First call: pretend the summarized context is still huge relative to
+        # this model's context window so that we always trigger the
+        # hard-eviction path. Second call: minimal context (system only) is
+        # small.
+        context_limit = llm_config.context_window or 100_000
+        huge_tokens = context_limit * 10  # safely above any reasonable trigger
+        mock_count_tokens.side_effect = [huge_tokens, 10]
+
+        caplog.set_level("ERROR")
+
+        result = await agent_loop.summarize_conversation_history(
+            in_context_messages=in_context_messages,
+            new_letta_messages=[],
+            # total_tokens is not used when force=True for triggering, but we
+            # set it to a large value for clarity.
+            total_tokens=llm_config.context_window * 2 if llm_config.context_window else None,
+            force=True,
+        )
+
+        # We should have made exactly two token-count calls: one for the
+        # summarized context, one for the hard-evicted minimal context.
+        assert mock_count_tokens.call_count == 2
+
+        # After hard eviction, only the system message should remain in-context.
+        assert isinstance(result, list)
+        assert len(result) == 1, f"Expected only the system message after hard eviction, got {len(result)} messages"
+        assert result[0].role == MessageRole.system
+
+        # Agent state should also reflect exactly one message id.
+        assert len(agent_loop.agent_state.message_ids) == 1
+
+        # context_token_estimate should be updated to the minimal token count
+        # (second side-effect value from count_tokens), rather than the
+        # original huge value.
+        assert agent_loop.context_token_estimate == 10
+
+        # Verify that we logged an error about summarization failing to reduce
+        # context size.
+        error_logs = [rec for rec in caplog.records if "Summarization failed to sufficiently reduce context size" in rec.getMessage()]
+        assert error_logs, "Expected an error log when summarization fails to reduce context size sufficiently"
+
+
 # ======================================================================================================================
 # Sliding Window Summarizer Unit Tests
 # ======================================================================================================================
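
The regression test leans on one mocking detail worth calling out: a patched
async callable consumes its side_effect list one item per call, which is what
lets the test force a huge recount first and a tiny one second. A minimal
standalone demonstration with a plain AsyncMock and no Letta imports (the
name mock_count_tokens and the token numbers are illustrative):

import asyncio
from unittest.mock import AsyncMock

async def main() -> None:
    # Successive side_effect items feed successive calls, mirroring how the
    # test above sees a huge post-summary count, then a tiny post-eviction one.
    mock_count_tokens = AsyncMock(side_effect=[1_000_000, 10])
    assert await mock_count_tokens() == 1_000_000  # first recount: still over threshold
    assert await mock_count_tokens() == 10         # recount after hard eviction
    assert mock_count_tokens.call_count == 2

asyncio.run(main())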