diff --git a/fern/openapi.json b/fern/openapi.json index 2647f06e..eb428df1 100644 --- a/fern/openapi.json +++ b/fern/openapi.json @@ -30242,6 +30242,60 @@ "title": "Core Memory", "description": "The content of the core memory." }, + "num_tokens_memory_filesystem": { + "type": "integer", + "title": "Num Tokens Memory Filesystem", + "description": "The number of tokens in the memory filesystem section (git-enabled agents only).", + "default": 0 + }, + "memory_filesystem": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "title": "Memory Filesystem", + "description": "The content of the memory filesystem section." + }, + "num_tokens_tool_usage_rules": { + "type": "integer", + "title": "Num Tokens Tool Usage Rules", + "description": "The number of tokens in the tool usage rules section.", + "default": 0 + }, + "tool_usage_rules": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "title": "Tool Usage Rules", + "description": "The content of the tool usage rules section." + }, + "num_tokens_directories": { + "type": "integer", + "title": "Num Tokens Directories", + "description": "The number of tokens in the directories section (attached sources).", + "default": 0 + }, + "directories": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "title": "Directories", + "description": "The content of the directories section." + }, "num_tokens_summary_memory": { "type": "integer", "title": "Num Tokens Summary Memory", diff --git a/letta/agents/base_agent.py b/letta/agents/base_agent.py index e072146d..94d18232 100644 --- a/letta/agents/base_agent.py +++ b/letta/agents/base_agent.py @@ -125,6 +125,9 @@ class BaseAgent(ABC): # extract the dynamic section that includes memory blocks, tool rules, and directories # this avoids timestamp comparison issues + # TODO: This is a separate position-based parser for the same system message format + # parsed by ContextWindowCalculator.extract_system_components(). Consider unifying + # to avoid divergence. See PR #9398 for context. def extract_dynamic_section(text): start_marker = "" end_marker = "" diff --git a/letta/schemas/memory.py b/letta/schemas/memory.py index 496eb308..b4ca07cf 100644 --- a/letta/schemas/memory.py +++ b/letta/schemas/memory.py @@ -43,6 +43,17 @@ class ContextWindowOverview(BaseModel): num_tokens_core_memory: int = Field(..., description="The number of tokens in the core memory.") core_memory: str = Field(..., description="The content of the core memory.") + num_tokens_memory_filesystem: int = Field( + 0, description="The number of tokens in the memory filesystem section (git-enabled agents only)." + ) + memory_filesystem: Optional[str] = Field(None, description="The content of the memory filesystem section.") + + num_tokens_tool_usage_rules: int = Field(0, description="The number of tokens in the tool usage rules section.") + tool_usage_rules: Optional[str] = Field(None, description="The content of the tool usage rules section.") + + num_tokens_directories: int = Field(0, description="The number of tokens in the directories section (attached sources).") + directories: Optional[str] = Field(None, description="The content of the directories section.") + num_tokens_summary_memory: int = Field(..., description="The number of tokens in the summary memory.") summary_memory: Optional[str] = Field(None, description="The content of the summary memory.") diff --git a/letta/services/context_window_calculator/context_window_calculator.py b/letta/services/context_window_calculator/context_window_calculator.py index b228b532..f4ad2e07 100644 --- a/letta/services/context_window_calculator/context_window_calculator.py +++ b/letta/services/context_window_calculator/context_window_calculator.py @@ -1,5 +1,5 @@ import asyncio -from typing import Any, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple from openai.types.beta.function_tool import FunctionTool as OpenAITool @@ -20,68 +20,195 @@ class ContextWindowCalculator: """Handles context window calculations with different token counting strategies""" @staticmethod - def extract_system_components(system_message: str) -> Tuple[str, str, str]: - """Extract system prompt + core memory + metadata from a system message. - - Historically, Letta system messages were formatted with: - - ... - - ... - - ... - - Git-backed memory agents do NOT wrap their rendered memory in . - Instead, the memory content typically begins with followed - by file-like tags such as .... - - This helper supports both formats so the context window preview can display - core memory for git-enabled agents. + def _extract_tag_content(text: str, tag_name: str) -> Optional[str]: """ + Extract content between XML-style opening and closing tags. - base_start = system_message.find("") - memory_blocks_start = system_message.find("") - if memory_blocks_start == -1: - # Git-memory-enabled agents render instead of - memory_blocks_start = system_message.find("") - metadata_start = system_message.find("") + Args: + text: The text to search in + tag_name: The name of the tag (without < >) - system_prompt = "" - core_memory = "" - external_memory_summary = "" + Returns: + The content between tags (inclusive of tags), or None if not found - # Always extract metadata if present - if metadata_start != -1: - external_memory_summary = system_message[metadata_start:].strip() + Note: + If duplicate tags exist, only the first occurrence is extracted. + """ + start_tag = f"<{tag_name}>" + end_tag = f"" - # Preferred (legacy) parsing when tags are present - if base_start != -1 and memory_blocks_start != -1: - system_prompt = system_message[base_start:memory_blocks_start].strip() - if memory_blocks_start != -1 and metadata_start != -1: - core_memory = system_message[memory_blocks_start:metadata_start].strip() + start_idx = text.find(start_tag) + if start_idx == -1: + return None - # Fallback parsing for git-backed memory rendering (no wrapper) - if not core_memory and metadata_start != -1: - # Identify where the "memory" section begins. - candidates = [] - for marker in ( - "", - " - " is present but core_memory wasn't extracted (e.g. missing base tags), - # allow it as a candidate as well. - if memory_blocks_start != -1: - candidates.append(memory_blocks_start) + return text[start_idx : end_idx + len(end_tag)] - if candidates: - mem_start = min(candidates) - core_memory = system_message[mem_start:metadata_start].strip() - if not system_prompt: - system_prompt = system_message[:mem_start].strip() + @staticmethod + def _extract_system_prompt(system_message: str) -> Optional[str]: + """ + Extract the system prompt / base instructions from a system message. - return system_prompt, core_memory, external_memory_summary + First tries to find an explicit tag. If not present + (e.g. custom system prompts from Letta Code agents), falls back to + extracting everything before the first known section tag. + + Returns: + The system prompt text, or None if the message is empty. + + Note: + The returned value is semantically different depending on agent type: + - Standard agents: includes the ... tags + - Custom prompt agents (e.g. Letta Code): raw preamble text without any tags + """ + _extract = ContextWindowCalculator._extract_tag_content + + # Preferred: explicit wrapper + tagged = _extract(system_message, "base_instructions") + if tagged is not None: + return tagged + + # Fallback: everything before the first known section tag + section_tags = ["", "", "", "", ""] + first_section_pos = len(system_message) + for tag in section_tags: + pos = system_message.find(tag) + if pos != -1 and pos < first_section_pos: + first_section_pos = pos + + prompt = system_message[:first_section_pos].strip() + return prompt if prompt else None + + @staticmethod + def _extract_top_level_tag(system_message: str, tag_name: str, container_tag: str = "memory_blocks") -> Optional[str]: + """ + Extract a tag only if it appears outside a container tag. + + This prevents extracting tags that are nested inside as + memory block labels (e.g. a block named "memory_filesystem" rendered as + inside ) from being confused with + top-level sections. + + Handles the case where a tag appears both nested (inside the container) + and at top-level — scans all occurrences to find one outside the container. + + Args: + system_message: The full system message text + tag_name: The tag to extract + container_tag: The container tag to check nesting against + + Returns: + The tag content if found at top level, None otherwise. + """ + _extract = ContextWindowCalculator._extract_tag_content + + start_tag = f"<{tag_name}>" + end_tag = f"" + + # Find the container boundaries + container_start = system_message.find(f"<{container_tag}>") + container_end = system_message.find(f"") + has_container = container_start != -1 and container_end != -1 + + # Scan all occurrences of the tag to find one outside the container + search_start = 0 + while True: + tag_start = system_message.find(start_tag, search_start) + if tag_start == -1: + return None + + # Check if this occurrence is nested inside the container + if has_container and container_start < tag_start < container_end: + # Skip past this nested occurrence + search_start = tag_start + len(start_tag) + continue + + # Found a top-level occurrence — extract it + tag_end = system_message.find(end_tag, tag_start) + if tag_end == -1: + return None + return system_message[tag_start : tag_end + len(end_tag)] + + @staticmethod + def _extract_git_core_memory(system_message: str) -> Optional[str]: + """ + Extract bare file blocks for git-enabled agents. + + Git-enabled agents render individual memory blocks as bare tags like + ... WITHOUT any container tag. + These appear after and before the next known + section tag (, , or ). + + Returns: + The text containing all bare file blocks, or None if not found. + """ + end_marker = "" + end_pos = system_message.find(end_marker) + if end_pos == -1: + return None + + start = end_pos + len(end_marker) + + # Find the next known section tag + next_section_tags = ["", "", ""] + next_section_pos = len(system_message) + for tag in next_section_tags: + pos = system_message.find(tag, start) + if pos != -1 and pos < next_section_pos: + next_section_pos = pos + + content = system_message[start:next_section_pos].strip() + return content if content else None + + @staticmethod + def extract_system_components(system_message: str) -> Dict[str, Optional[str]]: + """ + Extract structured components from a formatted system message. + + Parses the system message to extract sections marked by XML-style tags using + proper end-tag matching. Handles all agent types including: + - Standard agents with wrapper + - Custom system prompts without (e.g. Letta Code agents) + - Git-enabled agents with top-level and bare file blocks + - React/workflow agents that don't render + + Args: + system_message: A formatted system message containing XML-style section markers + + Returns: + A dictionary with the following keys (value is None if section not found): + - system_prompt: The base instructions section (or text before first section tag) + - core_memory: The memory blocks section. For standard agents this is the + ... content. For git-enabled agents (no + but top-level ), this captures the bare + file blocks (e.g. ) that follow . + - memory_filesystem: Top-level memory filesystem (git-enabled agents only, NOT + the memory_filesystem block nested inside ) + - tool_usage_rules: The tool usage rules section + - directories: The directories section (when sources are attached) + - external_memory_summary: The memory metadata section + """ + _extract = ContextWindowCalculator._extract_tag_content + _extract_top = ContextWindowCalculator._extract_top_level_tag + + core_memory = _extract(system_message, "memory_blocks") + memory_filesystem = _extract_top(system_message, "memory_filesystem") + + # Git-enabled agents: no , but bare file blocks after + if core_memory is None and memory_filesystem is not None: + core_memory = ContextWindowCalculator._extract_git_core_memory(system_message) + + return { + "system_prompt": ContextWindowCalculator._extract_system_prompt(system_message), + "core_memory": core_memory, + "memory_filesystem": memory_filesystem, + "tool_usage_rules": _extract_top(system_message, "tool_usage_rules"), + "directories": _extract_top(system_message, "directories"), + "external_memory_summary": _extract(system_message, "memory_metadata"), + } @staticmethod def extract_summary_memory(messages: List[Any]) -> Tuple[Optional[str], int]: @@ -154,9 +281,14 @@ class ContextWindowCalculator: converted_messages = token_counter.convert_messages(in_context_messages) # Extract system components - system_prompt = "" - core_memory = "" - external_memory_summary = "" + components: Dict[str, Optional[str]] = { + "system_prompt": None, + "core_memory": None, + "memory_filesystem": None, + "tool_usage_rules": None, + "directories": None, + "external_memory_summary": None, + } if ( in_context_messages @@ -166,10 +298,15 @@ class ContextWindowCalculator: and isinstance(in_context_messages[0].content[0], TextContent) ): system_message = in_context_messages[0].content[0].text - system_prompt, core_memory, external_memory_summary = self.extract_system_components(system_message) + components = self.extract_system_components(system_message) - # System prompt - system_prompt = system_prompt or agent_state.system + # Extract each component with fallbacks + system_prompt = components.get("system_prompt") or agent_state.system or "" + core_memory = components.get("core_memory") or "" + memory_filesystem = components.get("memory_filesystem") or "" + tool_usage_rules = components.get("tool_usage_rules") or "" + directories = components.get("directories") or "" + external_memory_summary = components.get("external_memory_summary") or "" # Extract summary memory summary_memory, message_start_index = self.extract_summary_memory(in_context_messages) @@ -179,11 +316,14 @@ class ContextWindowCalculator: if agent_state.tools: available_functions_definitions = [OpenAITool(type="function", function=f.json_schema) for f in agent_state.tools] - # Count tokens concurrently + # Count tokens concurrently for all sections, skipping empty ones token_counts = await asyncio.gather( token_counter.count_text_tokens(system_prompt), - token_counter.count_text_tokens(core_memory), - token_counter.count_text_tokens(external_memory_summary), + token_counter.count_text_tokens(core_memory) if core_memory else asyncio.sleep(0, result=0), + token_counter.count_text_tokens(memory_filesystem) if memory_filesystem else asyncio.sleep(0, result=0), + token_counter.count_text_tokens(tool_usage_rules) if tool_usage_rules else asyncio.sleep(0, result=0), + token_counter.count_text_tokens(directories) if directories else asyncio.sleep(0, result=0), + token_counter.count_text_tokens(external_memory_summary) if external_memory_summary else asyncio.sleep(0, result=0), token_counter.count_text_tokens(summary_memory) if summary_memory else asyncio.sleep(0, result=0), ( token_counter.count_message_tokens(converted_messages[message_start_index:]) @@ -200,6 +340,9 @@ class ContextWindowCalculator: ( num_tokens_system, num_tokens_core_memory, + num_tokens_memory_filesystem, + num_tokens_tool_usage_rules, + num_tokens_directories, num_tokens_external_memory_summary, num_tokens_summary_memory, num_tokens_messages, @@ -223,6 +366,14 @@ class ContextWindowCalculator: system_prompt=system_prompt, num_tokens_core_memory=num_tokens_core_memory, core_memory=core_memory, + # New sections + num_tokens_memory_filesystem=num_tokens_memory_filesystem, + memory_filesystem=memory_filesystem if memory_filesystem else None, + num_tokens_tool_usage_rules=num_tokens_tool_usage_rules, + tool_usage_rules=tool_usage_rules if tool_usage_rules else None, + num_tokens_directories=num_tokens_directories, + directories=directories if directories else None, + # Summary and messages num_tokens_summary_memory=num_tokens_summary_memory, summary_memory=summary_memory, num_tokens_messages=num_tokens_messages, diff --git a/tests/helpers/utils.py b/tests/helpers/utils.py index 52b13bc7..17715b91 100644 --- a/tests/helpers/utils.py +++ b/tests/helpers/utils.py @@ -186,6 +186,9 @@ def validate_context_window_overview( # 2. All token counts should be non-negative assert overview.num_tokens_system >= 0, "System token count cannot be negative" assert overview.num_tokens_core_memory >= 0, "Core memory token count cannot be negative" + assert overview.num_tokens_memory_filesystem >= 0, "Memory filesystem token count cannot be negative" + assert overview.num_tokens_tool_usage_rules >= 0, "Tool usage rules token count cannot be negative" + assert overview.num_tokens_directories >= 0, "Directories token count cannot be negative" assert overview.num_tokens_external_memory_summary >= 0, "External memory summary token count cannot be negative" assert overview.num_tokens_summary_memory >= 0, "Summary memory token count cannot be negative" assert overview.num_tokens_messages >= 0, "Messages token count cannot be negative" @@ -195,6 +198,9 @@ def validate_context_window_overview( expected_total = ( overview.num_tokens_system + overview.num_tokens_core_memory + + overview.num_tokens_memory_filesystem + + overview.num_tokens_tool_usage_rules + + overview.num_tokens_directories + overview.num_tokens_external_memory_summary + overview.num_tokens_summary_memory + overview.num_tokens_messages @@ -244,13 +250,14 @@ def validate_context_window_overview( avg_tokens_per_message = overview.num_tokens_messages / overview.num_messages assert avg_tokens_per_message >= 0, "Average tokens per message should be non-negative" - # 16. Check attached file is visible + # 16. Check attached file is visible in the directories section if attached_file: - assert attached_file.visible_content in overview.core_memory, "File must be attached in core memory" - assert '" in overview.core_memory - assert "max_files_open" in overview.core_memory, "Max files should be set in core memory" - assert "current_files_open" in overview.core_memory, "Current files should be set in core memory" + assert overview.directories is not None, "Directories section must exist when files are attached" + assert attached_file.visible_content in overview.directories, "File must be attached in directories" + assert '" in overview.directories + assert "max_files_open" in overview.directories, "Max files should be set in directories" + assert "current_files_open" in overview.directories, "Current files should be set in directories" # Check for tools assert overview.num_tokens_functions_definitions > 0 diff --git a/tests/test_context_window_calculator.py b/tests/test_context_window_calculator.py index de17a685..0f3f3035 100644 --- a/tests/test_context_window_calculator.py +++ b/tests/test_context_window_calculator.py @@ -1,17 +1,400 @@ +from unittest.mock import AsyncMock, MagicMock + import pytest from letta.services.context_window_calculator.context_window_calculator import ContextWindowCalculator -def test_extract_system_components_git_backed_memory_without_memory_blocks_wrapper(): - system_message = """You are some system prompt. +class TestExtractTagContent: + """Tests for the _extract_tag_content helper method""" + + def test_extracts_simple_tag(self): + text = "prefix content suffix" + result = ContextWindowCalculator._extract_tag_content(text, "tag") + assert result == "content" + + def test_returns_none_for_missing_tag(self): + text = "no tags here" + result = ContextWindowCalculator._extract_tag_content(text, "tag") + assert result is None + + def test_returns_none_for_missing_opening_tag(self): + text = "content" + result = ContextWindowCalculator._extract_tag_content(text, "tag") + assert result is None + + def test_returns_none_for_unclosed_tag(self): + text = "content without closing" + result = ContextWindowCalculator._extract_tag_content(text, "tag") + assert result is None + + def test_handles_multiline_content(self): + text = "\nline1\nline2\n" + result = ContextWindowCalculator._extract_tag_content(text, "tag") + assert result == "\nline1\nline2\n" + + def test_handles_nested_content(self): + text = "nested" + result = ContextWindowCalculator._extract_tag_content(text, "outer") + assert result == "nested" + + def test_handles_empty_content(self): + text = "" + result = ContextWindowCalculator._extract_tag_content(text, "tag") + assert result == "" + + def test_extracts_first_occurrence_with_duplicate_tags(self): + """When duplicate tags exist, only the first occurrence is extracted""" + text = "first some text second" + result = ContextWindowCalculator._extract_tag_content(text, "tag") + assert result == "first" + + +class TestExtractSystemComponents: + """Tests for the extract_system_components method""" + + def test_extracts_standard_agent_sections(self): + """Standard agent with base_instructions, memory_blocks, and memory_metadata""" + system_message = """ + +Base prompt here + + + +Core memory content + + + +Metadata here + +""" + result = ContextWindowCalculator.extract_system_components(system_message) + + assert result["system_prompt"] is not None + assert "" in result["system_prompt"] + assert "Base prompt here" in result["system_prompt"] + + assert result["core_memory"] is not None + assert "Core memory content" in result["core_memory"] + + assert result["external_memory_summary"] is not None + assert "" in result["external_memory_summary"] + + # These should be None for standard agent + assert result["memory_filesystem"] is None + assert result["tool_usage_rules"] is None + assert result["directories"] is None + + def test_extracts_git_enabled_agent_sections(self): + """Git-enabled agent has top-level memory_filesystem OUTSIDE memory_blocks""" + system_message = ( + "Base\n" + "\n" + "memory/\n" + " system/\n" + " human.md (100 chars)\n" + "\n" + "Meta" + ) + result = ContextWindowCalculator.extract_system_components(system_message) + + assert result["core_memory"] is None # git-enabled agents don't use + assert result["memory_filesystem"] is not None + assert "memory/" in result["memory_filesystem"] + assert "human.md" in result["memory_filesystem"] + + def test_extracts_tool_usage_rules(self): + """Agent with tool usage rules configured""" + system_message = """ +Base +Memory + +You must use tools in a specific order. + +Meta +""" + result = ContextWindowCalculator.extract_system_components(system_message) + + assert result["tool_usage_rules"] is not None + assert "specific order" in result["tool_usage_rules"] + + def test_extracts_directories(self): + """Agent with attached sources has directories section""" + system_message = """ +Base +Memory + + + +README content + + + +Meta +""" + result = ContextWindowCalculator.extract_system_components(system_message) + + assert result["directories"] is not None + assert '' in result["directories"] + assert "readme.md" in result["directories"] + + def test_handles_react_agent_no_memory_blocks(self): + """React/workflow agents don't render """ + system_message = """ +React agent base + +Some directory content + +Meta +""" + result = ContextWindowCalculator.extract_system_components(system_message) + + assert result["system_prompt"] is not None + assert result["core_memory"] is None # No memory_blocks for react agents + assert result["directories"] is not None + assert result["external_memory_summary"] is not None + + def test_handles_all_sections_present(self): + """Full agent with all optional sections""" + system_message = """ +Base instructions +Memory blocks content +Filesystem tree +Tool rules +Directories content +Metadata +""" + result = ContextWindowCalculator.extract_system_components(system_message) + + assert result["system_prompt"] is not None + assert result["core_memory"] is not None + assert result["memory_filesystem"] is not None + assert result["tool_usage_rules"] is not None + assert result["directories"] is not None + assert result["external_memory_summary"] is not None + + def test_handles_empty_string(self): + """Empty input returns all None values""" + result = ContextWindowCalculator.extract_system_components("") + assert all(v is None for v in result.values()) + + def test_returns_correct_dict_keys(self): + """Verify the returned dict has all expected keys""" + result = ContextWindowCalculator.extract_system_components("") + expected_keys = { + "system_prompt", + "core_memory", + "memory_filesystem", + "tool_usage_rules", + "directories", + "external_memory_summary", + } + assert set(result.keys()) == expected_keys + + def test_no_base_instructions_tag_extracts_preamble(self): + """Custom system prompts without should extract preamble text""" + system_message = ( + "You are a helpful AI agent.\n" + "Use the tools available to you.\n\n" + "\n" + "My name is Letta.\n" + "\n\n" + "Metadata here" + ) + result = ContextWindowCalculator.extract_system_components(system_message) + + assert result["system_prompt"] is not None + assert "helpful AI agent" in result["system_prompt"] + assert "Use the tools" in result["system_prompt"] + # Should NOT include memory_blocks content + assert "" not in result["system_prompt"] + assert "" not in result["system_prompt"] + + assert result["core_memory"] is not None + assert result["external_memory_summary"] is not None + + def test_nested_memory_filesystem_not_extracted_as_top_level(self): + """memory_filesystem block INSIDE memory_blocks should NOT be extracted as top-level""" + system_message = ( + "You are a self-improving AI agent.\n\n" + "\n" + "The following memory blocks are currently engaged:\n\n" + "\n" + "\n" + "/memory/\n" + "\u251c\u2500\u2500 system/\n" + "\u2502 \u251c\u2500\u2500 human.md\n" + "\u2502 \u2514\u2500\u2500 persona.md\n" + "\n" + "\n\n" + "My name is Letta.\n" + "\n\n" + "Metadata" + ) + result = ContextWindowCalculator.extract_system_components(system_message) + + # memory_filesystem is nested inside memory_blocks - should NOT be extracted + assert result["memory_filesystem"] is None + + # core_memory should include the full memory_blocks content (including the nested filesystem) + assert result["core_memory"] is not None + assert "" in result["core_memory"] + assert "human.md" in result["core_memory"] + + def test_top_level_memory_filesystem_outside_memory_blocks(self): + """Top-level memory_filesystem (git-enabled) rendered BEFORE memory_blocks is extracted""" + system_message = ( + "Base\n" + "\n" + "\u251c\u2500\u2500 system/\n" + "\u2502 \u2514\u2500\u2500 human.md\n" + "\n\n" + "\n---\ndescription: About the human\n---\nName: Alice\n\n\n" + "Meta" + ) + result = ContextWindowCalculator.extract_system_components(system_message) + + # This memory_filesystem is top-level (no memory_blocks container) + assert result["memory_filesystem"] is not None + assert "human.md" in result["memory_filesystem"] + + # Bare file blocks after are captured as core_memory + assert result["core_memory"] is not None + assert "" in result["core_memory"] + assert "Name: Alice" in result["core_memory"] + + def test_letta_code_agent_real_format(self): + """Real-world Letta Code agent format: no base_instructions, nested memory_filesystem""" + system_message = ( + "You are a self-improving AI agent with advanced memory.\n" + "You are connected to an interactive CLI tool.\n\n" + "# Memory\n" + "You have an advanced memory system.\n\n" + "\n" + "The following memory blocks are currently engaged:\n\n" + "\n" + "Filesystem view\n" + "\n" + "/memory/\n" + "\u251c\u2500\u2500 system/\n" + "\u2502 \u251c\u2500\u2500 human.md\n" + "\u2502 \u2514\u2500\u2500 persona.md\n" + "\n" + "\n\n" + "\n" + "My name is Letta Code.\n" + "\n\n" + "\n" + "Name: Jin Peng\n" + "\n" + "\n\n" + "\n" + "- The current system date is: February 10, 2026\n" + "- 9663 previous messages in recall memory\n" + "" + ) + result = ContextWindowCalculator.extract_system_components(system_message) + + # System prompt: preamble before + assert result["system_prompt"] is not None + assert "self-improving AI agent" in result["system_prompt"] + assert "advanced memory system" in result["system_prompt"] + assert "" not in result["system_prompt"] + + # Core memory: the full section + assert result["core_memory"] is not None + assert "Letta Code" in result["core_memory"] + assert "Jin Peng" in result["core_memory"] + + # memory_filesystem is NESTED inside memory_blocks - should NOT be extracted + assert result["memory_filesystem"] is None + + # No tool_usage_rules or directories + assert result["tool_usage_rules"] is None + assert result["directories"] is None + + # External memory summary + assert result["external_memory_summary"] is not None + assert "February 10, 2026" in result["external_memory_summary"] + + def test_git_enabled_agent_bare_file_blocks_captured_as_core_memory(self): + """Git-enabled agents render bare file blocks after — these must be captured as core_memory""" + system_message = ( + "Base\n" + "\n" + "\u251c\u2500\u2500 system/\n" + "\u2502 \u251c\u2500\u2500 human.md\n" + "\u2502 \u2514\u2500\u2500 persona.md\n" + "\n\n" + "\n---\ndescription: About the human\nlimit: 2000\n---\nName: Alice\n\n\n" + "\n---\ndescription: Agent persona\n---\nI am a helpful assistant.\n\n\n" + "Always call send_message to respond.\n" + "Meta" + ) + result = ContextWindowCalculator.extract_system_components(system_message) + + # memory_filesystem should be the tree view only + assert result["memory_filesystem"] is not None + assert "\u251c\u2500\u2500 system/" in result["memory_filesystem"] + + # core_memory should capture the bare file blocks + assert result["core_memory"] is not None + assert "" in result["core_memory"] + assert "Name: Alice" in result["core_memory"] + assert "" in result["core_memory"] + assert "helpful assistant" in result["core_memory"] + + # tool_usage_rules should NOT be included in core_memory + assert "" not in result["core_memory"] + + # Other sections + assert result["tool_usage_rules"] is not None + assert result["external_memory_summary"] is not None + + def test_git_enabled_agent_no_bare_blocks(self): + """Git-enabled agent with no file blocks after memory_filesystem returns None for core_memory""" + system_message = ( + "Base\n" + "\n" + "\u251c\u2500\u2500 system/\n" + "\n" + "Meta" + ) + result = ContextWindowCalculator.extract_system_components(system_message) + assert result["memory_filesystem"] is not None + assert result["core_memory"] is None + + def test_extract_top_level_tag_dual_occurrence_nested_first(self): + """When a tag appears nested first and top-level later, the top-level one is extracted""" + system_message = ( + "\n" + "nested rules\n" + "\n\n" + "top-level rules" + ) + result = ContextWindowCalculator._extract_top_level_tag(system_message, "tool_usage_rules") + assert result is not None + assert "top-level rules" in result + assert "nested rules" not in result + + def test_extract_system_prompt_pure_text_no_tags(self): + """System message with no section tags at all returns the full text as system_prompt""" + system_message = "You are a simple agent.\nYou help the user with tasks." + result = ContextWindowCalculator._extract_system_prompt(system_message) + assert result is not None + assert "simple agent" in result + assert "help the user" in result + + def test_git_backed_memory_without_memory_blocks_wrapper(self): + """Regression test from main: git-backed agents without wrapper""" + system_message = """You are some system prompt. Memory Directory: ~/.letta/agents/agent-123/memory /memory/ -└── system/ - └── human.md +\u2514\u2500\u2500 system/ + \u2514\u2500\u2500 human.md @@ -26,17 +409,20 @@ hello - foo=bar """ + result = ContextWindowCalculator.extract_system_components(system_message) - system_prompt, core_memory, external_memory_summary = ContextWindowCalculator.extract_system_components(system_message) + assert "You are some system prompt" in result["system_prompt"] + # memory_filesystem is a top-level section + assert result["memory_filesystem"] is not None + assert "" in result["memory_filesystem"] + # bare file blocks are captured as core_memory + assert result["core_memory"] is not None + assert "" in result["core_memory"] + assert result["external_memory_summary"].startswith("") - assert "You are some system prompt" in system_prompt - assert "" in core_memory - assert "" in core_memory - assert external_memory_summary.startswith("") - - -def test_extract_system_components_legacy_memory_blocks_wrapper(): - system_message = """SYS + def test_legacy_memory_blocks_wrapper(self): + """Regression test from main: legacy memory_blocks wrapper is properly parsed""" + system_message = """SYS p @@ -46,9 +432,217 @@ def test_extract_system_components_legacy_memory_blocks_wrapper(): - x=y """ + result = ContextWindowCalculator.extract_system_components(system_message) - system_prompt, core_memory, external_memory_summary = ContextWindowCalculator.extract_system_components(system_message) + assert result["system_prompt"].startswith("") + assert result["core_memory"].startswith("") + assert result["external_memory_summary"].startswith("") - assert system_prompt.startswith("") - assert core_memory.startswith("") - assert external_memory_summary.startswith("") + +def _make_system_message(text: str): + """Helper to create a real Message object for use as a system message in tests.""" + from letta.schemas.enums import MessageRole + from letta.schemas.letta_message_content import TextContent + from letta.schemas.message import Message + + return Message(role=MessageRole.system, content=[TextContent(text=text)]) + + +def _make_mock_deps(system_text: str): + """Helper to create mocked token_counter, message_manager, and agent_state.""" + token_counter = MagicMock() + token_counter.count_text_tokens = AsyncMock(side_effect=lambda text: len(text) if text else 0) + token_counter.count_message_tokens = AsyncMock(return_value=0) + token_counter.count_tool_tokens = AsyncMock(return_value=0) + token_counter.convert_messages = MagicMock(return_value=[{"role": "system", "content": system_text}]) + + message_manager = MagicMock() + message_manager.get_messages_by_ids_async = AsyncMock(return_value=[]) + + agent_state = MagicMock() + agent_state.id = "agent-test" + agent_state.message_ids = ["msg-sys"] + agent_state.system = "fallback system prompt" + agent_state.tools = [] + agent_state.llm_config.context_window = 128000 + + actor = MagicMock() + + return token_counter, message_manager, agent_state, actor + + +class TestCalculateContextWindow: + """Integration tests for calculate_context_window with mocked dependencies""" + + @pytest.mark.asyncio + async def test_calculate_context_window_standard_agent(self): + """Test full context window calculation with a standard system message""" + system_text = ( + "You are a helpful agent.\n" + "human: User is Alice\n" + "Archival: 5 passages" + ) + + system_msg = _make_system_message(system_text) + token_counter, message_manager, agent_state, actor = _make_mock_deps(system_text) + + calculator = ContextWindowCalculator() + result = await calculator.calculate_context_window( + agent_state=agent_state, + actor=actor, + token_counter=token_counter, + message_manager=message_manager, + system_message_compiled=system_msg, + num_archival_memories=5, + num_messages=10, + message_ids=[], + ) + + assert result.context_window_size_max == 128000 + assert result.num_archival_memory == 5 + assert result.num_recall_memory == 10 + assert result.num_tokens_system > 0 + assert "helpful agent" in result.system_prompt + assert result.num_tokens_core_memory > 0 + assert "User is Alice" in result.core_memory + assert result.num_tokens_external_memory_summary > 0 + + # New sections should be None/0 since not in system message + assert result.memory_filesystem is None + assert result.num_tokens_memory_filesystem == 0 + assert result.tool_usage_rules is None + assert result.num_tokens_tool_usage_rules == 0 + assert result.directories is None + assert result.num_tokens_directories == 0 + + @pytest.mark.asyncio + async def test_calculate_context_window_skips_empty_sections(self): + """Verify that token counting is skipped for empty/missing sections""" + # Only base_instructions, no other sections + system_text = "Simple agent" + + system_msg = _make_system_message(system_text) + token_counter, message_manager, agent_state, actor = _make_mock_deps(system_text) + + calculator = ContextWindowCalculator() + await calculator.calculate_context_window( + agent_state=agent_state, + actor=actor, + token_counter=token_counter, + message_manager=message_manager, + system_message_compiled=system_msg, + num_archival_memories=0, + num_messages=0, + message_ids=[], + ) + + # count_text_tokens should only be called for system_prompt (non-empty) + # and NOT for core_memory, memory_filesystem, tool_usage_rules, directories, + # external_memory_summary, or summary_memory (all empty/None) + calls = token_counter.count_text_tokens.call_args_list + assert len(calls) == 1, f"Expected 1 call to count_text_tokens (system_prompt only), got {len(calls)}: {calls}" + + @pytest.mark.asyncio + async def test_calculate_context_window_all_sections(self): + """Test with all optional sections present""" + system_text = ( + "Agent instructions\n" + "Core memory\n" + "\u251c\u2500\u2500 system/\n\u2502 \u2514\u2500\u2500 human.md\n" + "Always call search first\n" + 'content\n' + "Archival: 10 passages" + ) + + system_msg = _make_system_message(system_text) + token_counter, message_manager, agent_state, actor = _make_mock_deps(system_text) + + calculator = ContextWindowCalculator() + result = await calculator.calculate_context_window( + agent_state=agent_state, + actor=actor, + token_counter=token_counter, + message_manager=message_manager, + system_message_compiled=system_msg, + num_archival_memories=10, + num_messages=5, + message_ids=[], + ) + + # All sections should be populated + assert result.num_tokens_system > 0 + assert result.num_tokens_core_memory > 0 + assert result.num_tokens_memory_filesystem > 0 + assert result.memory_filesystem is not None + assert result.num_tokens_tool_usage_rules > 0 + assert result.tool_usage_rules is not None + assert result.num_tokens_directories > 0 + assert result.directories is not None + assert result.num_tokens_external_memory_summary > 0 + + # Verify total is sum of all parts + expected_total = ( + result.num_tokens_system + + result.num_tokens_core_memory + + result.num_tokens_memory_filesystem + + result.num_tokens_tool_usage_rules + + result.num_tokens_directories + + result.num_tokens_external_memory_summary + + result.num_tokens_summary_memory + + result.num_tokens_messages + + result.num_tokens_functions_definitions + ) + assert result.context_window_size_current == expected_total + + @pytest.mark.asyncio + async def test_calculate_context_window_git_enabled_agent(self): + """Test that git-enabled agents capture bare file blocks as core_memory""" + system_text = ( + "Git agent\n" + "\n" + "\u251c\u2500\u2500 system/\n" + "\u2502 \u251c\u2500\u2500 human.md\n" + "\u2502 \u2514\u2500\u2500 persona.md\n" + "\n\n" + "\n---\ndescription: About the human\n---\nName: Alice\n\n\n" + "\n---\ndescription: Agent persona\n---\nI am helpful.\n\n\n" + "Archival: 3 passages" + ) + + system_msg = _make_system_message(system_text) + token_counter, message_manager, agent_state, actor = _make_mock_deps(system_text) + + calculator = ContextWindowCalculator() + result = await calculator.calculate_context_window( + agent_state=agent_state, + actor=actor, + token_counter=token_counter, + message_manager=message_manager, + system_message_compiled=system_msg, + num_archival_memories=3, + num_messages=5, + message_ids=[], + ) + + # memory_filesystem should capture the tree view + assert result.memory_filesystem is not None + assert result.num_tokens_memory_filesystem > 0 + + # core_memory should capture the bare file blocks + assert result.num_tokens_core_memory > 0 + assert "Name: Alice" in result.core_memory + assert "" in result.core_memory + + # Total should include all sections + expected_total = ( + result.num_tokens_system + + result.num_tokens_core_memory + + result.num_tokens_memory_filesystem + + result.num_tokens_tool_usage_rules + + result.num_tokens_directories + + result.num_tokens_external_memory_summary + + result.num_tokens_summary_memory + + result.num_tokens_messages + + result.num_tokens_functions_definitions + ) + assert result.context_window_size_current == expected_total