fix: update ContextWindowCalculator to parse new system message sections (#9398)

* fix: update ContextWindowCalculator to parse new system message sections

The context window calculator was using outdated position-based parsing
that only handled 3 sections (base_instructions, memory_blocks, memory_metadata).
The actual system message now includes additional sections that were not
being tracked:

- <memory_filesystem> (git-enabled agents)
- <tool_usage_rules> (when tool rules configured)
- <directories> (when sources attached)

Changes:
- Add _extract_tag_content() helper for proper XML tag extraction
- Rewrite extract_system_components() to return a Dict with all 6 sections
- Update calculate_context_window() to count tokens for new sections
- Add new fields to ContextWindowOverview schema with backward-compatible defaults
- Add unit tests for the extraction logic

* update

* generate

* fix: check attached file in directories section instead of core_memory

Files are rendered inside <directories> tags, not <memory_blocks>.
Update validate_context_window_overview assertions accordingly.

* fix: address review feedback for context window parser

- Fix git-enabled agents regression: capture bare file blocks
  (e.g. <system/human.md>) rendered after </memory_filesystem> as
  core_memory via new _extract_git_core_memory() method
- Make _extract_top_level_tag robust: scan all occurrences to find
  tag outside container, handling nested-first + top-level-later case
- Document system_prompt tag inconsistency in docstring
- Add TODO to base_agent.py extract_dynamic_section linking to
  ContextWindowCalculator to flag parallel parser tech debt
- Add tests: git-enabled agent parsing, dual-occurrence tag
  extraction, pure text system prompt, git-enabled integration test
This commit is contained in:
jnjpng
2026-02-10 16:22:54 -08:00
committed by Caren Thomas
parent 7cc1cd3dc0
commit 39b25a0e3c
6 changed files with 906 additions and 86 deletions

View File

@@ -30242,6 +30242,60 @@
"title": "Core Memory", "title": "Core Memory",
"description": "The content of the core memory." "description": "The content of the core memory."
}, },
"num_tokens_memory_filesystem": {
"type": "integer",
"title": "Num Tokens Memory Filesystem",
"description": "The number of tokens in the memory filesystem section (git-enabled agents only).",
"default": 0
},
"memory_filesystem": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Memory Filesystem",
"description": "The content of the memory filesystem section."
},
"num_tokens_tool_usage_rules": {
"type": "integer",
"title": "Num Tokens Tool Usage Rules",
"description": "The number of tokens in the tool usage rules section.",
"default": 0
},
"tool_usage_rules": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Tool Usage Rules",
"description": "The content of the tool usage rules section."
},
"num_tokens_directories": {
"type": "integer",
"title": "Num Tokens Directories",
"description": "The number of tokens in the directories section (attached sources).",
"default": 0
},
"directories": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Directories",
"description": "The content of the directories section."
},
"num_tokens_summary_memory": { "num_tokens_summary_memory": {
"type": "integer", "type": "integer",
"title": "Num Tokens Summary Memory", "title": "Num Tokens Summary Memory",

View File

@@ -125,6 +125,9 @@ class BaseAgent(ABC):
# extract the dynamic section that includes memory blocks, tool rules, and directories # extract the dynamic section that includes memory blocks, tool rules, and directories
# this avoids timestamp comparison issues # this avoids timestamp comparison issues
# TODO: This is a separate position-based parser for the same system message format
# parsed by ContextWindowCalculator.extract_system_components(). Consider unifying
# to avoid divergence. See PR #9398 for context.
def extract_dynamic_section(text): def extract_dynamic_section(text):
start_marker = "</base_instructions>" start_marker = "</base_instructions>"
end_marker = "<memory_metadata>" end_marker = "<memory_metadata>"

View File

@@ -43,6 +43,17 @@ class ContextWindowOverview(BaseModel):
num_tokens_core_memory: int = Field(..., description="The number of tokens in the core memory.") num_tokens_core_memory: int = Field(..., description="The number of tokens in the core memory.")
core_memory: str = Field(..., description="The content of the core memory.") core_memory: str = Field(..., description="The content of the core memory.")
num_tokens_memory_filesystem: int = Field(
0, description="The number of tokens in the memory filesystem section (git-enabled agents only)."
)
memory_filesystem: Optional[str] = Field(None, description="The content of the memory filesystem section.")
num_tokens_tool_usage_rules: int = Field(0, description="The number of tokens in the tool usage rules section.")
tool_usage_rules: Optional[str] = Field(None, description="The content of the tool usage rules section.")
num_tokens_directories: int = Field(0, description="The number of tokens in the directories section (attached sources).")
directories: Optional[str] = Field(None, description="The content of the directories section.")
num_tokens_summary_memory: int = Field(..., description="The number of tokens in the summary memory.") num_tokens_summary_memory: int = Field(..., description="The number of tokens in the summary memory.")
summary_memory: Optional[str] = Field(None, description="The content of the summary memory.") summary_memory: Optional[str] = Field(None, description="The content of the summary memory.")

View File

@@ -1,5 +1,5 @@
import asyncio import asyncio
from typing import Any, List, Optional, Tuple from typing import Any, Dict, List, Optional, Tuple
from openai.types.beta.function_tool import FunctionTool as OpenAITool from openai.types.beta.function_tool import FunctionTool as OpenAITool
@@ -20,68 +20,195 @@ class ContextWindowCalculator:
"""Handles context window calculations with different token counting strategies""" """Handles context window calculations with different token counting strategies"""
@staticmethod @staticmethod
def extract_system_components(system_message: str) -> Tuple[str, str, str]: def _extract_tag_content(text: str, tag_name: str) -> Optional[str]:
"""Extract system prompt + core memory + metadata from a system message.
Historically, Letta system messages were formatted with:
- <base_instructions> ...
- <memory_blocks> ...
- <memory_metadata> ...
Git-backed memory agents do NOT wrap their rendered memory in <memory_blocks>.
Instead, the memory content typically begins with <memory_filesystem> followed
by file-like tags such as <system/human.md>...</system/human.md>.
This helper supports both formats so the context window preview can display
core memory for git-enabled agents.
""" """
Extract content between XML-style opening and closing tags.
base_start = system_message.find("<base_instructions>") Args:
memory_blocks_start = system_message.find("<memory_blocks>") text: The text to search in
if memory_blocks_start == -1: tag_name: The name of the tag (without < >)
# Git-memory-enabled agents render <memory_filesystem> instead of <memory_blocks>
memory_blocks_start = system_message.find("<memory_filesystem>")
metadata_start = system_message.find("<memory_metadata>")
system_prompt = "" Returns:
core_memory = "" The content between tags (inclusive of tags), or None if not found
external_memory_summary = ""
# Always extract metadata if present Note:
if metadata_start != -1: If duplicate tags exist, only the first occurrence is extracted.
external_memory_summary = system_message[metadata_start:].strip() """
start_tag = f"<{tag_name}>"
end_tag = f"</{tag_name}>"
# Preferred (legacy) parsing when tags are present start_idx = text.find(start_tag)
if base_start != -1 and memory_blocks_start != -1: if start_idx == -1:
system_prompt = system_message[base_start:memory_blocks_start].strip() return None
if memory_blocks_start != -1 and metadata_start != -1:
core_memory = system_message[memory_blocks_start:metadata_start].strip()
# Fallback parsing for git-backed memory rendering (no <memory_blocks> wrapper) end_idx = text.find(end_tag, start_idx)
if not core_memory and metadata_start != -1: if end_idx == -1:
# Identify where the "memory" section begins. return None
candidates = []
for marker in (
"<memory_filesystem>",
"<system/", # e.g. <system/human.md>
"<organization/", # future-proofing
):
pos = system_message.find(marker)
if pos != -1:
candidates.append(pos)
# If <memory_blocks> is present but core_memory wasn't extracted (e.g. missing base tags), return text[start_idx : end_idx + len(end_tag)]
# allow it as a candidate as well.
if memory_blocks_start != -1:
candidates.append(memory_blocks_start)
if candidates: @staticmethod
mem_start = min(candidates) def _extract_system_prompt(system_message: str) -> Optional[str]:
core_memory = system_message[mem_start:metadata_start].strip() """
if not system_prompt: Extract the system prompt / base instructions from a system message.
system_prompt = system_message[:mem_start].strip()
return system_prompt, core_memory, external_memory_summary First tries to find an explicit <base_instructions> tag. If not present
(e.g. custom system prompts from Letta Code agents), falls back to
extracting everything before the first known section tag.
Returns:
The system prompt text, or None if the message is empty.
Note:
The returned value is semantically different depending on agent type:
- Standard agents: includes the <base_instructions>...</base_instructions> tags
- Custom prompt agents (e.g. Letta Code): raw preamble text without any tags
"""
_extract = ContextWindowCalculator._extract_tag_content
# Preferred: explicit <base_instructions> wrapper
tagged = _extract(system_message, "base_instructions")
if tagged is not None:
return tagged
# Fallback: everything before the first known section tag
section_tags = ["<memory_blocks>", "<memory_filesystem>", "<tool_usage_rules>", "<directories>", "<memory_metadata>"]
first_section_pos = len(system_message)
for tag in section_tags:
pos = system_message.find(tag)
if pos != -1 and pos < first_section_pos:
first_section_pos = pos
prompt = system_message[:first_section_pos].strip()
return prompt if prompt else None
@staticmethod
def _extract_top_level_tag(system_message: str, tag_name: str, container_tag: str = "memory_blocks") -> Optional[str]:
"""
Extract a tag only if it appears outside a container tag.
This prevents extracting tags that are nested inside <memory_blocks> as
memory block labels (e.g. a block named "memory_filesystem" rendered as
<memory_filesystem> inside <memory_blocks>) from being confused with
top-level sections.
Handles the case where a tag appears both nested (inside the container)
and at top-level — scans all occurrences to find one outside the container.
Args:
system_message: The full system message text
tag_name: The tag to extract
container_tag: The container tag to check nesting against
Returns:
The tag content if found at top level, None otherwise.
"""
_extract = ContextWindowCalculator._extract_tag_content
start_tag = f"<{tag_name}>"
end_tag = f"</{tag_name}>"
# Find the container boundaries
container_start = system_message.find(f"<{container_tag}>")
container_end = system_message.find(f"</{container_tag}>")
has_container = container_start != -1 and container_end != -1
# Scan all occurrences of the tag to find one outside the container
search_start = 0
while True:
tag_start = system_message.find(start_tag, search_start)
if tag_start == -1:
return None
# Check if this occurrence is nested inside the container
if has_container and container_start < tag_start < container_end:
# Skip past this nested occurrence
search_start = tag_start + len(start_tag)
continue
# Found a top-level occurrence — extract it
tag_end = system_message.find(end_tag, tag_start)
if tag_end == -1:
return None
return system_message[tag_start : tag_end + len(end_tag)]
@staticmethod
def _extract_git_core_memory(system_message: str) -> Optional[str]:
"""
Extract bare file blocks for git-enabled agents.
Git-enabled agents render individual memory blocks as bare tags like
<system/human.md>...</system/human.md> WITHOUT any container tag.
These appear after </memory_filesystem> and before the next known
section tag (<tool_usage_rules>, <directories>, or <memory_metadata>).
Returns:
The text containing all bare file blocks, or None if not found.
"""
end_marker = "</memory_filesystem>"
end_pos = system_message.find(end_marker)
if end_pos == -1:
return None
start = end_pos + len(end_marker)
# Find the next known section tag
next_section_tags = ["<tool_usage_rules>", "<directories>", "<memory_metadata>"]
next_section_pos = len(system_message)
for tag in next_section_tags:
pos = system_message.find(tag, start)
if pos != -1 and pos < next_section_pos:
next_section_pos = pos
content = system_message[start:next_section_pos].strip()
return content if content else None
@staticmethod
def extract_system_components(system_message: str) -> Dict[str, Optional[str]]:
"""
Extract structured components from a formatted system message.
Parses the system message to extract sections marked by XML-style tags using
proper end-tag matching. Handles all agent types including:
- Standard agents with <base_instructions> wrapper
- Custom system prompts without <base_instructions> (e.g. Letta Code agents)
- Git-enabled agents with top-level <memory_filesystem> and bare file blocks
- React/workflow agents that don't render <memory_blocks>
Args:
system_message: A formatted system message containing XML-style section markers
Returns:
A dictionary with the following keys (value is None if section not found):
- system_prompt: The base instructions section (or text before first section tag)
- core_memory: The memory blocks section. For standard agents this is the
<memory_blocks>...</memory_blocks> content. For git-enabled agents (no
<memory_blocks> but top-level <memory_filesystem>), this captures the bare
file blocks (e.g. <system/human.md>) that follow </memory_filesystem>.
- memory_filesystem: Top-level memory filesystem (git-enabled agents only, NOT
the memory_filesystem block nested inside <memory_blocks>)
- tool_usage_rules: The tool usage rules section
- directories: The directories section (when sources are attached)
- external_memory_summary: The memory metadata section
"""
_extract = ContextWindowCalculator._extract_tag_content
_extract_top = ContextWindowCalculator._extract_top_level_tag
core_memory = _extract(system_message, "memory_blocks")
memory_filesystem = _extract_top(system_message, "memory_filesystem")
# Git-enabled agents: no <memory_blocks>, but bare file blocks after </memory_filesystem>
if core_memory is None and memory_filesystem is not None:
core_memory = ContextWindowCalculator._extract_git_core_memory(system_message)
return {
"system_prompt": ContextWindowCalculator._extract_system_prompt(system_message),
"core_memory": core_memory,
"memory_filesystem": memory_filesystem,
"tool_usage_rules": _extract_top(system_message, "tool_usage_rules"),
"directories": _extract_top(system_message, "directories"),
"external_memory_summary": _extract(system_message, "memory_metadata"),
}
@staticmethod @staticmethod
def extract_summary_memory(messages: List[Any]) -> Tuple[Optional[str], int]: def extract_summary_memory(messages: List[Any]) -> Tuple[Optional[str], int]:
@@ -154,9 +281,14 @@ class ContextWindowCalculator:
converted_messages = token_counter.convert_messages(in_context_messages) converted_messages = token_counter.convert_messages(in_context_messages)
# Extract system components # Extract system components
system_prompt = "" components: Dict[str, Optional[str]] = {
core_memory = "" "system_prompt": None,
external_memory_summary = "" "core_memory": None,
"memory_filesystem": None,
"tool_usage_rules": None,
"directories": None,
"external_memory_summary": None,
}
if ( if (
in_context_messages in_context_messages
@@ -166,10 +298,15 @@ class ContextWindowCalculator:
and isinstance(in_context_messages[0].content[0], TextContent) and isinstance(in_context_messages[0].content[0], TextContent)
): ):
system_message = in_context_messages[0].content[0].text system_message = in_context_messages[0].content[0].text
system_prompt, core_memory, external_memory_summary = self.extract_system_components(system_message) components = self.extract_system_components(system_message)
# System prompt # Extract each component with fallbacks
system_prompt = system_prompt or agent_state.system system_prompt = components.get("system_prompt") or agent_state.system or ""
core_memory = components.get("core_memory") or ""
memory_filesystem = components.get("memory_filesystem") or ""
tool_usage_rules = components.get("tool_usage_rules") or ""
directories = components.get("directories") or ""
external_memory_summary = components.get("external_memory_summary") or ""
# Extract summary memory # Extract summary memory
summary_memory, message_start_index = self.extract_summary_memory(in_context_messages) summary_memory, message_start_index = self.extract_summary_memory(in_context_messages)
@@ -179,11 +316,14 @@ class ContextWindowCalculator:
if agent_state.tools: if agent_state.tools:
available_functions_definitions = [OpenAITool(type="function", function=f.json_schema) for f in agent_state.tools] available_functions_definitions = [OpenAITool(type="function", function=f.json_schema) for f in agent_state.tools]
# Count tokens concurrently # Count tokens concurrently for all sections, skipping empty ones
token_counts = await asyncio.gather( token_counts = await asyncio.gather(
token_counter.count_text_tokens(system_prompt), token_counter.count_text_tokens(system_prompt),
token_counter.count_text_tokens(core_memory), token_counter.count_text_tokens(core_memory) if core_memory else asyncio.sleep(0, result=0),
token_counter.count_text_tokens(external_memory_summary), token_counter.count_text_tokens(memory_filesystem) if memory_filesystem else asyncio.sleep(0, result=0),
token_counter.count_text_tokens(tool_usage_rules) if tool_usage_rules else asyncio.sleep(0, result=0),
token_counter.count_text_tokens(directories) if directories else asyncio.sleep(0, result=0),
token_counter.count_text_tokens(external_memory_summary) if external_memory_summary else asyncio.sleep(0, result=0),
token_counter.count_text_tokens(summary_memory) if summary_memory else asyncio.sleep(0, result=0), token_counter.count_text_tokens(summary_memory) if summary_memory else asyncio.sleep(0, result=0),
( (
token_counter.count_message_tokens(converted_messages[message_start_index:]) token_counter.count_message_tokens(converted_messages[message_start_index:])
@@ -200,6 +340,9 @@ class ContextWindowCalculator:
( (
num_tokens_system, num_tokens_system,
num_tokens_core_memory, num_tokens_core_memory,
num_tokens_memory_filesystem,
num_tokens_tool_usage_rules,
num_tokens_directories,
num_tokens_external_memory_summary, num_tokens_external_memory_summary,
num_tokens_summary_memory, num_tokens_summary_memory,
num_tokens_messages, num_tokens_messages,
@@ -223,6 +366,14 @@ class ContextWindowCalculator:
system_prompt=system_prompt, system_prompt=system_prompt,
num_tokens_core_memory=num_tokens_core_memory, num_tokens_core_memory=num_tokens_core_memory,
core_memory=core_memory, core_memory=core_memory,
# New sections
num_tokens_memory_filesystem=num_tokens_memory_filesystem,
memory_filesystem=memory_filesystem if memory_filesystem else None,
num_tokens_tool_usage_rules=num_tokens_tool_usage_rules,
tool_usage_rules=tool_usage_rules if tool_usage_rules else None,
num_tokens_directories=num_tokens_directories,
directories=directories if directories else None,
# Summary and messages
num_tokens_summary_memory=num_tokens_summary_memory, num_tokens_summary_memory=num_tokens_summary_memory,
summary_memory=summary_memory, summary_memory=summary_memory,
num_tokens_messages=num_tokens_messages, num_tokens_messages=num_tokens_messages,

View File

@@ -186,6 +186,9 @@ def validate_context_window_overview(
# 2. All token counts should be non-negative # 2. All token counts should be non-negative
assert overview.num_tokens_system >= 0, "System token count cannot be negative" assert overview.num_tokens_system >= 0, "System token count cannot be negative"
assert overview.num_tokens_core_memory >= 0, "Core memory token count cannot be negative" assert overview.num_tokens_core_memory >= 0, "Core memory token count cannot be negative"
assert overview.num_tokens_memory_filesystem >= 0, "Memory filesystem token count cannot be negative"
assert overview.num_tokens_tool_usage_rules >= 0, "Tool usage rules token count cannot be negative"
assert overview.num_tokens_directories >= 0, "Directories token count cannot be negative"
assert overview.num_tokens_external_memory_summary >= 0, "External memory summary token count cannot be negative" assert overview.num_tokens_external_memory_summary >= 0, "External memory summary token count cannot be negative"
assert overview.num_tokens_summary_memory >= 0, "Summary memory token count cannot be negative" assert overview.num_tokens_summary_memory >= 0, "Summary memory token count cannot be negative"
assert overview.num_tokens_messages >= 0, "Messages token count cannot be negative" assert overview.num_tokens_messages >= 0, "Messages token count cannot be negative"
@@ -195,6 +198,9 @@ def validate_context_window_overview(
expected_total = ( expected_total = (
overview.num_tokens_system overview.num_tokens_system
+ overview.num_tokens_core_memory + overview.num_tokens_core_memory
+ overview.num_tokens_memory_filesystem
+ overview.num_tokens_tool_usage_rules
+ overview.num_tokens_directories
+ overview.num_tokens_external_memory_summary + overview.num_tokens_external_memory_summary
+ overview.num_tokens_summary_memory + overview.num_tokens_summary_memory
+ overview.num_tokens_messages + overview.num_tokens_messages
@@ -244,13 +250,14 @@ def validate_context_window_overview(
avg_tokens_per_message = overview.num_tokens_messages / overview.num_messages avg_tokens_per_message = overview.num_tokens_messages / overview.num_messages
assert avg_tokens_per_message >= 0, "Average tokens per message should be non-negative" assert avg_tokens_per_message >= 0, "Average tokens per message should be non-negative"
# 16. Check attached file is visible # 16. Check attached file is visible in the directories section
if attached_file: if attached_file:
assert attached_file.visible_content in overview.core_memory, "File must be attached in core memory" assert overview.directories is not None, "Directories section must exist when files are attached"
assert '<file status="open"' in overview.core_memory assert attached_file.visible_content in overview.directories, "File must be attached in directories"
assert "</file>" in overview.core_memory assert '<file status="open"' in overview.directories
assert "max_files_open" in overview.core_memory, "Max files should be set in core memory" assert "</file>" in overview.directories
assert "current_files_open" in overview.core_memory, "Current files should be set in core memory" assert "max_files_open" in overview.directories, "Max files should be set in directories"
assert "current_files_open" in overview.directories, "Current files should be set in directories"
# Check for tools # Check for tools
assert overview.num_tokens_functions_definitions > 0 assert overview.num_tokens_functions_definitions > 0

View File

@@ -1,17 +1,400 @@
from unittest.mock import AsyncMock, MagicMock
import pytest import pytest
from letta.services.context_window_calculator.context_window_calculator import ContextWindowCalculator from letta.services.context_window_calculator.context_window_calculator import ContextWindowCalculator
def test_extract_system_components_git_backed_memory_without_memory_blocks_wrapper(): class TestExtractTagContent:
system_message = """You are some system prompt. """Tests for the _extract_tag_content helper method"""
def test_extracts_simple_tag(self):
text = "prefix <tag>content</tag> suffix"
result = ContextWindowCalculator._extract_tag_content(text, "tag")
assert result == "<tag>content</tag>"
def test_returns_none_for_missing_tag(self):
text = "no tags here"
result = ContextWindowCalculator._extract_tag_content(text, "tag")
assert result is None
def test_returns_none_for_missing_opening_tag(self):
text = "content</tag>"
result = ContextWindowCalculator._extract_tag_content(text, "tag")
assert result is None
def test_returns_none_for_unclosed_tag(self):
text = "<tag>content without closing"
result = ContextWindowCalculator._extract_tag_content(text, "tag")
assert result is None
def test_handles_multiline_content(self):
text = "<tag>\nline1\nline2\n</tag>"
result = ContextWindowCalculator._extract_tag_content(text, "tag")
assert result == "<tag>\nline1\nline2\n</tag>"
def test_handles_nested_content(self):
text = "<outer><inner>nested</inner></outer>"
result = ContextWindowCalculator._extract_tag_content(text, "outer")
assert result == "<outer><inner>nested</inner></outer>"
def test_handles_empty_content(self):
text = "<tag></tag>"
result = ContextWindowCalculator._extract_tag_content(text, "tag")
assert result == "<tag></tag>"
def test_extracts_first_occurrence_with_duplicate_tags(self):
"""When duplicate tags exist, only the first occurrence is extracted"""
text = "<tag>first</tag> some text <tag>second</tag>"
result = ContextWindowCalculator._extract_tag_content(text, "tag")
assert result == "<tag>first</tag>"
class TestExtractSystemComponents:
"""Tests for the extract_system_components method"""
def test_extracts_standard_agent_sections(self):
"""Standard agent with base_instructions, memory_blocks, and memory_metadata"""
system_message = """
<base_instructions>
Base prompt here
</base_instructions>
<memory_blocks>
Core memory content
</memory_blocks>
<memory_metadata>
Metadata here
</memory_metadata>
"""
result = ContextWindowCalculator.extract_system_components(system_message)
assert result["system_prompt"] is not None
assert "<base_instructions>" in result["system_prompt"]
assert "Base prompt here" in result["system_prompt"]
assert result["core_memory"] is not None
assert "Core memory content" in result["core_memory"]
assert result["external_memory_summary"] is not None
assert "<memory_metadata>" in result["external_memory_summary"]
# These should be None for standard agent
assert result["memory_filesystem"] is None
assert result["tool_usage_rules"] is None
assert result["directories"] is None
def test_extracts_git_enabled_agent_sections(self):
"""Git-enabled agent has top-level memory_filesystem OUTSIDE memory_blocks"""
system_message = (
"<base_instructions>Base</base_instructions>\n"
"<memory_filesystem>\n"
"memory/\n"
" system/\n"
" human.md (100 chars)\n"
"</memory_filesystem>\n"
"<memory_metadata>Meta</memory_metadata>"
)
result = ContextWindowCalculator.extract_system_components(system_message)
assert result["core_memory"] is None # git-enabled agents don't use <memory_blocks>
assert result["memory_filesystem"] is not None
assert "memory/" in result["memory_filesystem"]
assert "human.md" in result["memory_filesystem"]
def test_extracts_tool_usage_rules(self):
"""Agent with tool usage rules configured"""
system_message = """
<base_instructions>Base</base_instructions>
<memory_blocks>Memory</memory_blocks>
<tool_usage_rules>
You must use tools in a specific order.
</tool_usage_rules>
<memory_metadata>Meta</memory_metadata>
"""
result = ContextWindowCalculator.extract_system_components(system_message)
assert result["tool_usage_rules"] is not None
assert "specific order" in result["tool_usage_rules"]
def test_extracts_directories(self):
"""Agent with attached sources has directories section"""
system_message = """
<base_instructions>Base</base_instructions>
<memory_blocks>Memory</memory_blocks>
<directories>
<directory name="project">
<file status="open" name="readme.md">
README content
</file>
</directory>
</directories>
<memory_metadata>Meta</memory_metadata>
"""
result = ContextWindowCalculator.extract_system_components(system_message)
assert result["directories"] is not None
assert '<directory name="project">' in result["directories"]
assert "readme.md" in result["directories"]
def test_handles_react_agent_no_memory_blocks(self):
"""React/workflow agents don't render <memory_blocks>"""
system_message = """
<base_instructions>React agent base</base_instructions>
<directories>
Some directory content
</directories>
<memory_metadata>Meta</memory_metadata>
"""
result = ContextWindowCalculator.extract_system_components(system_message)
assert result["system_prompt"] is not None
assert result["core_memory"] is None # No memory_blocks for react agents
assert result["directories"] is not None
assert result["external_memory_summary"] is not None
def test_handles_all_sections_present(self):
"""Full agent with all optional sections"""
system_message = """
<base_instructions>Base instructions</base_instructions>
<memory_blocks>Memory blocks content</memory_blocks>
<memory_filesystem>Filesystem tree</memory_filesystem>
<tool_usage_rules>Tool rules</tool_usage_rules>
<directories>Directories content</directories>
<memory_metadata>Metadata</memory_metadata>
"""
result = ContextWindowCalculator.extract_system_components(system_message)
assert result["system_prompt"] is not None
assert result["core_memory"] is not None
assert result["memory_filesystem"] is not None
assert result["tool_usage_rules"] is not None
assert result["directories"] is not None
assert result["external_memory_summary"] is not None
def test_handles_empty_string(self):
"""Empty input returns all None values"""
result = ContextWindowCalculator.extract_system_components("")
assert all(v is None for v in result.values())
def test_returns_correct_dict_keys(self):
"""Verify the returned dict has all expected keys"""
result = ContextWindowCalculator.extract_system_components("")
expected_keys = {
"system_prompt",
"core_memory",
"memory_filesystem",
"tool_usage_rules",
"directories",
"external_memory_summary",
}
assert set(result.keys()) == expected_keys
def test_no_base_instructions_tag_extracts_preamble(self):
"""Custom system prompts without <base_instructions> should extract preamble text"""
system_message = (
"You are a helpful AI agent.\n"
"Use the tools available to you.\n\n"
"<memory_blocks>\n"
"<persona>My name is Letta.</persona>\n"
"</memory_blocks>\n\n"
"<memory_metadata>Metadata here</memory_metadata>"
)
result = ContextWindowCalculator.extract_system_components(system_message)
assert result["system_prompt"] is not None
assert "helpful AI agent" in result["system_prompt"]
assert "Use the tools" in result["system_prompt"]
# Should NOT include memory_blocks content
assert "<memory_blocks>" not in result["system_prompt"]
assert "<memory_metadata>" not in result["system_prompt"]
assert result["core_memory"] is not None
assert result["external_memory_summary"] is not None
def test_nested_memory_filesystem_not_extracted_as_top_level(self):
"""memory_filesystem block INSIDE memory_blocks should NOT be extracted as top-level"""
system_message = (
"You are a self-improving AI agent.\n\n"
"<memory_blocks>\n"
"The following memory blocks are currently engaged:\n\n"
"<memory_filesystem>\n"
"<value>\n"
"/memory/\n"
"\u251c\u2500\u2500 system/\n"
"\u2502 \u251c\u2500\u2500 human.md\n"
"\u2502 \u2514\u2500\u2500 persona.md\n"
"</value>\n"
"</memory_filesystem>\n\n"
"<persona>My name is Letta.</persona>\n"
"</memory_blocks>\n\n"
"<memory_metadata>Metadata</memory_metadata>"
)
result = ContextWindowCalculator.extract_system_components(system_message)
# memory_filesystem is nested inside memory_blocks - should NOT be extracted
assert result["memory_filesystem"] is None
# core_memory should include the full memory_blocks content (including the nested filesystem)
assert result["core_memory"] is not None
assert "<memory_filesystem>" in result["core_memory"]
assert "human.md" in result["core_memory"]
def test_top_level_memory_filesystem_outside_memory_blocks(self):
"""Top-level memory_filesystem (git-enabled) rendered BEFORE memory_blocks is extracted"""
system_message = (
"<base_instructions>Base</base_instructions>\n"
"<memory_filesystem>\n"
"\u251c\u2500\u2500 system/\n"
"\u2502 \u2514\u2500\u2500 human.md\n"
"</memory_filesystem>\n\n"
"<system/human.md>\n---\ndescription: About the human\n---\nName: Alice\n</system/human.md>\n\n"
"<memory_metadata>Meta</memory_metadata>"
)
result = ContextWindowCalculator.extract_system_components(system_message)
# This memory_filesystem is top-level (no memory_blocks container)
assert result["memory_filesystem"] is not None
assert "human.md" in result["memory_filesystem"]
# Bare file blocks after </memory_filesystem> are captured as core_memory
assert result["core_memory"] is not None
assert "<system/human.md>" in result["core_memory"]
assert "Name: Alice" in result["core_memory"]
def test_letta_code_agent_real_format(self):
"""Real-world Letta Code agent format: no base_instructions, nested memory_filesystem"""
system_message = (
"You are a self-improving AI agent with advanced memory.\n"
"You are connected to an interactive CLI tool.\n\n"
"# Memory\n"
"You have an advanced memory system.\n\n"
"<memory_blocks>\n"
"The following memory blocks are currently engaged:\n\n"
"<memory_filesystem>\n"
"<description>Filesystem view</description>\n"
"<value>\n"
"/memory/\n"
"\u251c\u2500\u2500 system/\n"
"\u2502 \u251c\u2500\u2500 human.md\n"
"\u2502 \u2514\u2500\u2500 persona.md\n"
"</value>\n"
"</memory_filesystem>\n\n"
"<persona>\n"
"<value>My name is Letta Code.</value>\n"
"</persona>\n\n"
"<human>\n"
"<value>Name: Jin Peng</value>\n"
"</human>\n"
"</memory_blocks>\n\n"
"<memory_metadata>\n"
"- The current system date is: February 10, 2026\n"
"- 9663 previous messages in recall memory\n"
"</memory_metadata>"
)
result = ContextWindowCalculator.extract_system_components(system_message)
# System prompt: preamble before <memory_blocks>
assert result["system_prompt"] is not None
assert "self-improving AI agent" in result["system_prompt"]
assert "advanced memory system" in result["system_prompt"]
assert "<memory_blocks>" not in result["system_prompt"]
# Core memory: the full <memory_blocks> section
assert result["core_memory"] is not None
assert "Letta Code" in result["core_memory"]
assert "Jin Peng" in result["core_memory"]
# memory_filesystem is NESTED inside memory_blocks - should NOT be extracted
assert result["memory_filesystem"] is None
# No tool_usage_rules or directories
assert result["tool_usage_rules"] is None
assert result["directories"] is None
# External memory summary
assert result["external_memory_summary"] is not None
assert "February 10, 2026" in result["external_memory_summary"]
def test_git_enabled_agent_bare_file_blocks_captured_as_core_memory(self):
"""Git-enabled agents render bare file blocks after </memory_filesystem> — these must be captured as core_memory"""
system_message = (
"<base_instructions>Base</base_instructions>\n"
"<memory_filesystem>\n"
"\u251c\u2500\u2500 system/\n"
"\u2502 \u251c\u2500\u2500 human.md\n"
"\u2502 \u2514\u2500\u2500 persona.md\n"
"</memory_filesystem>\n\n"
"<system/human.md>\n---\ndescription: About the human\nlimit: 2000\n---\nName: Alice\n</system/human.md>\n\n"
"<system/persona.md>\n---\ndescription: Agent persona\n---\nI am a helpful assistant.\n</system/persona.md>\n\n"
"<tool_usage_rules>Always call send_message to respond.</tool_usage_rules>\n"
"<memory_metadata>Meta</memory_metadata>"
)
result = ContextWindowCalculator.extract_system_components(system_message)
# memory_filesystem should be the tree view only
assert result["memory_filesystem"] is not None
assert "\u251c\u2500\u2500 system/" in result["memory_filesystem"]
# core_memory should capture the bare file blocks
assert result["core_memory"] is not None
assert "<system/human.md>" in result["core_memory"]
assert "Name: Alice" in result["core_memory"]
assert "<system/persona.md>" in result["core_memory"]
assert "helpful assistant" in result["core_memory"]
# tool_usage_rules should NOT be included in core_memory
assert "<tool_usage_rules>" not in result["core_memory"]
# Other sections
assert result["tool_usage_rules"] is not None
assert result["external_memory_summary"] is not None
def test_git_enabled_agent_no_bare_blocks(self):
"""Git-enabled agent with no file blocks after memory_filesystem returns None for core_memory"""
system_message = (
"<base_instructions>Base</base_instructions>\n"
"<memory_filesystem>\n"
"\u251c\u2500\u2500 system/\n"
"</memory_filesystem>\n"
"<memory_metadata>Meta</memory_metadata>"
)
result = ContextWindowCalculator.extract_system_components(system_message)
assert result["memory_filesystem"] is not None
assert result["core_memory"] is None
def test_extract_top_level_tag_dual_occurrence_nested_first(self):
"""When a tag appears nested first and top-level later, the top-level one is extracted"""
system_message = (
"<memory_blocks>\n"
"<tool_usage_rules>nested rules</tool_usage_rules>\n"
"</memory_blocks>\n\n"
"<tool_usage_rules>top-level rules</tool_usage_rules>"
)
result = ContextWindowCalculator._extract_top_level_tag(system_message, "tool_usage_rules")
assert result is not None
assert "top-level rules" in result
assert "nested rules" not in result
def test_extract_system_prompt_pure_text_no_tags(self):
"""System message with no section tags at all returns the full text as system_prompt"""
system_message = "You are a simple agent.\nYou help the user with tasks."
result = ContextWindowCalculator._extract_system_prompt(system_message)
assert result is not None
assert "simple agent" in result
assert "help the user" in result
def test_git_backed_memory_without_memory_blocks_wrapper(self):
"""Regression test from main: git-backed agents without <memory_blocks> wrapper"""
system_message = """You are some system prompt.
<memory_filesystem> <memory_filesystem>
Memory Directory: ~/.letta/agents/agent-123/memory Memory Directory: ~/.letta/agents/agent-123/memory
/memory/ /memory/
└── system/ \u2514\u2500\u2500 system/
└── human.md \u2514\u2500\u2500 human.md
</memory_filesystem> </memory_filesystem>
<system/human.md> <system/human.md>
@@ -26,17 +409,20 @@ hello
- foo=bar - foo=bar
</memory_metadata> </memory_metadata>
""" """
result = ContextWindowCalculator.extract_system_components(system_message)
system_prompt, core_memory, external_memory_summary = ContextWindowCalculator.extract_system_components(system_message) assert "You are some system prompt" in result["system_prompt"]
# memory_filesystem is a top-level section
assert result["memory_filesystem"] is not None
assert "<memory_filesystem>" in result["memory_filesystem"]
# bare file blocks are captured as core_memory
assert result["core_memory"] is not None
assert "<system/human.md>" in result["core_memory"]
assert result["external_memory_summary"].startswith("<memory_metadata>")
assert "You are some system prompt" in system_prompt def test_legacy_memory_blocks_wrapper(self):
assert "<memory_filesystem>" in core_memory """Regression test from main: legacy memory_blocks wrapper is properly parsed"""
assert "<system/human.md>" in core_memory system_message = """<base_instructions>SYS</base_instructions>
assert external_memory_summary.startswith("<memory_metadata>")
def test_extract_system_components_legacy_memory_blocks_wrapper():
system_message = """<base_instructions>SYS</base_instructions>
<memory_blocks> <memory_blocks>
<persona>p</persona> <persona>p</persona>
@@ -46,9 +432,217 @@ def test_extract_system_components_legacy_memory_blocks_wrapper():
- x=y - x=y
</memory_metadata> </memory_metadata>
""" """
result = ContextWindowCalculator.extract_system_components(system_message)
system_prompt, core_memory, external_memory_summary = ContextWindowCalculator.extract_system_components(system_message) assert result["system_prompt"].startswith("<base_instructions>")
assert result["core_memory"].startswith("<memory_blocks>")
assert result["external_memory_summary"].startswith("<memory_metadata>")
assert system_prompt.startswith("<base_instructions>")
assert core_memory.startswith("<memory_blocks>") def _make_system_message(text: str):
assert external_memory_summary.startswith("<memory_metadata>") """Helper to create a real Message object for use as a system message in tests."""
from letta.schemas.enums import MessageRole
from letta.schemas.letta_message_content import TextContent
from letta.schemas.message import Message
return Message(role=MessageRole.system, content=[TextContent(text=text)])
def _make_mock_deps(system_text: str):
"""Helper to create mocked token_counter, message_manager, and agent_state."""
token_counter = MagicMock()
token_counter.count_text_tokens = AsyncMock(side_effect=lambda text: len(text) if text else 0)
token_counter.count_message_tokens = AsyncMock(return_value=0)
token_counter.count_tool_tokens = AsyncMock(return_value=0)
token_counter.convert_messages = MagicMock(return_value=[{"role": "system", "content": system_text}])
message_manager = MagicMock()
message_manager.get_messages_by_ids_async = AsyncMock(return_value=[])
agent_state = MagicMock()
agent_state.id = "agent-test"
agent_state.message_ids = ["msg-sys"]
agent_state.system = "fallback system prompt"
agent_state.tools = []
agent_state.llm_config.context_window = 128000
actor = MagicMock()
return token_counter, message_manager, agent_state, actor
class TestCalculateContextWindow:
"""Integration tests for calculate_context_window with mocked dependencies"""
@pytest.mark.asyncio
async def test_calculate_context_window_standard_agent(self):
"""Test full context window calculation with a standard system message"""
system_text = (
"<base_instructions>You are a helpful agent.</base_instructions>\n"
"<memory_blocks>human: User is Alice</memory_blocks>\n"
"<memory_metadata>Archival: 5 passages</memory_metadata>"
)
system_msg = _make_system_message(system_text)
token_counter, message_manager, agent_state, actor = _make_mock_deps(system_text)
calculator = ContextWindowCalculator()
result = await calculator.calculate_context_window(
agent_state=agent_state,
actor=actor,
token_counter=token_counter,
message_manager=message_manager,
system_message_compiled=system_msg,
num_archival_memories=5,
num_messages=10,
message_ids=[],
)
assert result.context_window_size_max == 128000
assert result.num_archival_memory == 5
assert result.num_recall_memory == 10
assert result.num_tokens_system > 0
assert "helpful agent" in result.system_prompt
assert result.num_tokens_core_memory > 0
assert "User is Alice" in result.core_memory
assert result.num_tokens_external_memory_summary > 0
# New sections should be None/0 since not in system message
assert result.memory_filesystem is None
assert result.num_tokens_memory_filesystem == 0
assert result.tool_usage_rules is None
assert result.num_tokens_tool_usage_rules == 0
assert result.directories is None
assert result.num_tokens_directories == 0
@pytest.mark.asyncio
async def test_calculate_context_window_skips_empty_sections(self):
"""Verify that token counting is skipped for empty/missing sections"""
# Only base_instructions, no other sections
system_text = "<base_instructions>Simple agent</base_instructions>"
system_msg = _make_system_message(system_text)
token_counter, message_manager, agent_state, actor = _make_mock_deps(system_text)
calculator = ContextWindowCalculator()
await calculator.calculate_context_window(
agent_state=agent_state,
actor=actor,
token_counter=token_counter,
message_manager=message_manager,
system_message_compiled=system_msg,
num_archival_memories=0,
num_messages=0,
message_ids=[],
)
# count_text_tokens should only be called for system_prompt (non-empty)
# and NOT for core_memory, memory_filesystem, tool_usage_rules, directories,
# external_memory_summary, or summary_memory (all empty/None)
calls = token_counter.count_text_tokens.call_args_list
assert len(calls) == 1, f"Expected 1 call to count_text_tokens (system_prompt only), got {len(calls)}: {calls}"
@pytest.mark.asyncio
async def test_calculate_context_window_all_sections(self):
"""Test with all optional sections present"""
system_text = (
"<base_instructions>Agent instructions</base_instructions>\n"
"<memory_blocks>Core memory</memory_blocks>\n"
"<memory_filesystem>\u251c\u2500\u2500 system/\n\u2502 \u2514\u2500\u2500 human.md</memory_filesystem>\n"
"<tool_usage_rules>Always call search first</tool_usage_rules>\n"
'<directories><directory name="docs">content</directory></directories>\n'
"<memory_metadata>Archival: 10 passages</memory_metadata>"
)
system_msg = _make_system_message(system_text)
token_counter, message_manager, agent_state, actor = _make_mock_deps(system_text)
calculator = ContextWindowCalculator()
result = await calculator.calculate_context_window(
agent_state=agent_state,
actor=actor,
token_counter=token_counter,
message_manager=message_manager,
system_message_compiled=system_msg,
num_archival_memories=10,
num_messages=5,
message_ids=[],
)
# All sections should be populated
assert result.num_tokens_system > 0
assert result.num_tokens_core_memory > 0
assert result.num_tokens_memory_filesystem > 0
assert result.memory_filesystem is not None
assert result.num_tokens_tool_usage_rules > 0
assert result.tool_usage_rules is not None
assert result.num_tokens_directories > 0
assert result.directories is not None
assert result.num_tokens_external_memory_summary > 0
# Verify total is sum of all parts
expected_total = (
result.num_tokens_system
+ result.num_tokens_core_memory
+ result.num_tokens_memory_filesystem
+ result.num_tokens_tool_usage_rules
+ result.num_tokens_directories
+ result.num_tokens_external_memory_summary
+ result.num_tokens_summary_memory
+ result.num_tokens_messages
+ result.num_tokens_functions_definitions
)
assert result.context_window_size_current == expected_total
@pytest.mark.asyncio
async def test_calculate_context_window_git_enabled_agent(self):
"""Test that git-enabled agents capture bare file blocks as core_memory"""
system_text = (
"<base_instructions>Git agent</base_instructions>\n"
"<memory_filesystem>\n"
"\u251c\u2500\u2500 system/\n"
"\u2502 \u251c\u2500\u2500 human.md\n"
"\u2502 \u2514\u2500\u2500 persona.md\n"
"</memory_filesystem>\n\n"
"<system/human.md>\n---\ndescription: About the human\n---\nName: Alice\n</system/human.md>\n\n"
"<system/persona.md>\n---\ndescription: Agent persona\n---\nI am helpful.\n</system/persona.md>\n\n"
"<memory_metadata>Archival: 3 passages</memory_metadata>"
)
system_msg = _make_system_message(system_text)
token_counter, message_manager, agent_state, actor = _make_mock_deps(system_text)
calculator = ContextWindowCalculator()
result = await calculator.calculate_context_window(
agent_state=agent_state,
actor=actor,
token_counter=token_counter,
message_manager=message_manager,
system_message_compiled=system_msg,
num_archival_memories=3,
num_messages=5,
message_ids=[],
)
# memory_filesystem should capture the tree view
assert result.memory_filesystem is not None
assert result.num_tokens_memory_filesystem > 0
# core_memory should capture the bare file blocks
assert result.num_tokens_core_memory > 0
assert "Name: Alice" in result.core_memory
assert "<system/persona.md>" in result.core_memory
# Total should include all sections
expected_total = (
result.num_tokens_system
+ result.num_tokens_core_memory
+ result.num_tokens_memory_filesystem
+ result.num_tokens_tool_usage_rules
+ result.num_tokens_directories
+ result.num_tokens_external_memory_summary
+ result.num_tokens_summary_memory
+ result.num_tokens_messages
+ result.num_tokens_functions_definitions
)
assert result.context_window_size_current == expected_total