from unittest.mock import AsyncMock, MagicMock import pytest from letta.services.context_window_calculator.context_window_calculator import ContextWindowCalculator class TestExtractTagContent: """Tests for the _extract_tag_content helper method""" def test_extracts_simple_tag(self): text = "prefix content suffix" result = ContextWindowCalculator._extract_tag_content(text, "tag") assert result == "content" def test_returns_none_for_missing_tag(self): text = "no tags here" result = ContextWindowCalculator._extract_tag_content(text, "tag") assert result is None def test_returns_none_for_missing_opening_tag(self): text = "content" result = ContextWindowCalculator._extract_tag_content(text, "tag") assert result is None def test_returns_none_for_unclosed_tag(self): text = "content without closing" result = ContextWindowCalculator._extract_tag_content(text, "tag") assert result is None def test_handles_multiline_content(self): text = "\nline1\nline2\n" result = ContextWindowCalculator._extract_tag_content(text, "tag") assert result == "\nline1\nline2\n" def test_handles_nested_content(self): text = "nested" result = ContextWindowCalculator._extract_tag_content(text, "outer") assert result == "nested" def test_handles_empty_content(self): text = "" result = ContextWindowCalculator._extract_tag_content(text, "tag") assert result == "" def test_extracts_first_occurrence_with_duplicate_tags(self): """When duplicate tags exist, only the first occurrence is extracted""" text = "first some text second" result = ContextWindowCalculator._extract_tag_content(text, "tag") assert result == "first" class TestExtractSystemComponents: """Tests for the extract_system_components method""" def test_extracts_standard_agent_sections(self): """Standard agent with base_instructions, memory_blocks, and memory_metadata""" system_message = """ Base prompt here Core memory content Metadata here """ result = ContextWindowCalculator.extract_system_components(system_message) assert result["system_prompt"] is not None assert "" in result["system_prompt"] assert "Base prompt here" in result["system_prompt"] assert result["core_memory"] is not None assert "Core memory content" in result["core_memory"] assert result["external_memory_summary"] is not None assert "" in result["external_memory_summary"] # These should be None for standard agent assert result["memory_filesystem"] is None assert result["tool_usage_rules"] is None assert result["directories"] is None def test_extracts_git_enabled_agent_sections(self): """Git-enabled agent has top-level memory_filesystem OUTSIDE memory_blocks""" system_message = ( "Base\n" "\n" "memory/\n" " system/\n" " human.md (100 chars)\n" "\n" "Meta" ) result = ContextWindowCalculator.extract_system_components(system_message) assert result["core_memory"] is None # git-enabled agents don't use assert result["memory_filesystem"] is not None assert "memory/" in result["memory_filesystem"] assert "human.md" in result["memory_filesystem"] def test_extracts_tool_usage_rules(self): """Agent with tool usage rules configured""" system_message = """ Base Memory You must use tools in a specific order. Meta """ result = ContextWindowCalculator.extract_system_components(system_message) assert result["tool_usage_rules"] is not None assert "specific order" in result["tool_usage_rules"] def test_extracts_directories(self): """Agent with attached sources has directories section""" system_message = """ Base Memory README content Meta """ result = ContextWindowCalculator.extract_system_components(system_message) assert result["directories"] is not None assert '' in result["directories"] assert "readme.md" in result["directories"] def test_handles_react_agent_no_memory_blocks(self): """React/workflow agents don't render """ system_message = """ React agent base Some directory content Meta """ result = ContextWindowCalculator.extract_system_components(system_message) assert result["system_prompt"] is not None assert result["core_memory"] is None # No memory_blocks for react agents assert result["directories"] is not None assert result["external_memory_summary"] is not None def test_handles_all_sections_present(self): """Full agent with all optional sections""" system_message = """ Base instructions Memory blocks content Filesystem tree Tool rules Directories content Metadata """ result = ContextWindowCalculator.extract_system_components(system_message) assert result["system_prompt"] is not None assert result["core_memory"] is not None assert result["memory_filesystem"] is not None assert result["tool_usage_rules"] is not None assert result["directories"] is not None assert result["external_memory_summary"] is not None def test_handles_empty_string(self): """Empty input returns all None values""" result = ContextWindowCalculator.extract_system_components("") assert all(v is None for v in result.values()) def test_returns_correct_dict_keys(self): """Verify the returned dict has all expected keys""" result = ContextWindowCalculator.extract_system_components("") expected_keys = { "system_prompt", "core_memory", "memory_filesystem", "tool_usage_rules", "directories", "external_memory_summary", } assert set(result.keys()) == expected_keys def test_no_base_instructions_tag_extracts_preamble(self): """Custom system prompts without should extract preamble text""" system_message = ( "You are a helpful AI agent.\n" "Use the tools available to you.\n\n" "\n" "My name is Letta.\n" "\n\n" "Metadata here" ) result = ContextWindowCalculator.extract_system_components(system_message) assert result["system_prompt"] is not None assert "helpful AI agent" in result["system_prompt"] assert "Use the tools" in result["system_prompt"] # Should NOT include memory_blocks content assert "" not in result["system_prompt"] assert "" not in result["system_prompt"] assert result["core_memory"] is not None assert result["external_memory_summary"] is not None def test_nested_memory_filesystem_not_extracted_as_top_level(self): """memory_filesystem block INSIDE memory_blocks should NOT be extracted as top-level""" system_message = ( "You are a self-improving AI agent.\n\n" "\n" "The following memory blocks are currently engaged:\n\n" "\n" "\n" "/memory/\n" "\u251c\u2500\u2500 system/\n" "\u2502 \u251c\u2500\u2500 human.md\n" "\u2502 \u2514\u2500\u2500 persona.md\n" "\n" "\n\n" "My name is Letta.\n" "\n\n" "Metadata" ) result = ContextWindowCalculator.extract_system_components(system_message) # memory_filesystem is nested inside memory_blocks - should NOT be extracted assert result["memory_filesystem"] is None # core_memory should include the full memory_blocks content (including the nested filesystem) assert result["core_memory"] is not None assert "" in result["core_memory"] assert "human.md" in result["core_memory"] def test_top_level_memory_filesystem_outside_memory_blocks(self): """Top-level memory_filesystem (git-enabled) rendered BEFORE memory_blocks is extracted""" system_message = ( "Base\n" "\n" "\u251c\u2500\u2500 system/\n" "\u2502 \u2514\u2500\u2500 human.md\n" "\n\n" "\n---\ndescription: About the human\n---\nName: Alice\n\n\n" "Meta" ) result = ContextWindowCalculator.extract_system_components(system_message) # This memory_filesystem is top-level (no memory_blocks container) assert result["memory_filesystem"] is not None assert "human.md" in result["memory_filesystem"] # Bare file blocks after are captured as core_memory assert result["core_memory"] is not None assert "" in result["core_memory"] assert "Name: Alice" in result["core_memory"] def test_letta_code_agent_real_format(self): """Real-world Letta Code agent format: no base_instructions, nested memory_filesystem""" system_message = ( "You are a self-improving AI agent with advanced memory.\n" "You are connected to an interactive CLI tool.\n\n" "# Memory\n" "You have an advanced memory system.\n\n" "\n" "The following memory blocks are currently engaged:\n\n" "\n" "Filesystem view\n" "\n" "/memory/\n" "\u251c\u2500\u2500 system/\n" "\u2502 \u251c\u2500\u2500 human.md\n" "\u2502 \u2514\u2500\u2500 persona.md\n" "\n" "\n\n" "\n" "My name is Letta Code.\n" "\n\n" "\n" "Name: Jin Peng\n" "\n" "\n\n" "\n" "- The current system date is: February 10, 2026\n" "- 9663 previous messages in recall memory\n" "" ) result = ContextWindowCalculator.extract_system_components(system_message) # System prompt: preamble before assert result["system_prompt"] is not None assert "self-improving AI agent" in result["system_prompt"] assert "advanced memory system" in result["system_prompt"] assert "" not in result["system_prompt"] # Core memory: the full section assert result["core_memory"] is not None assert "Letta Code" in result["core_memory"] assert "Jin Peng" in result["core_memory"] # memory_filesystem is NESTED inside memory_blocks - should NOT be extracted assert result["memory_filesystem"] is None # No tool_usage_rules or directories assert result["tool_usage_rules"] is None assert result["directories"] is None # External memory summary assert result["external_memory_summary"] is not None assert "February 10, 2026" in result["external_memory_summary"] def test_git_enabled_agent_bare_file_blocks_captured_as_core_memory(self): """Git-enabled agents render bare file blocks after — these must be captured as core_memory""" system_message = ( "Base\n" "\n" "\u251c\u2500\u2500 system/\n" "\u2502 \u251c\u2500\u2500 human.md\n" "\u2502 \u2514\u2500\u2500 persona.md\n" "\n\n" "\n---\ndescription: About the human\nlimit: 2000\n---\nName: Alice\n\n\n" "\n---\ndescription: Agent persona\n---\nI am a helpful assistant.\n\n\n" "Always call send_message to respond.\n" "Meta" ) result = ContextWindowCalculator.extract_system_components(system_message) # memory_filesystem should preserve tree connectors with deterministic ordering assert result["memory_filesystem"] is not None assert "\u251c\u2500\u2500 system/" in result["memory_filesystem"] # core_memory should capture the bare file blocks assert result["core_memory"] is not None assert "" in result["core_memory"] assert "Name: Alice" in result["core_memory"] assert "" in result["core_memory"] assert "helpful assistant" in result["core_memory"] # tool_usage_rules should NOT be included in core_memory assert "" not in result["core_memory"] # Other sections assert result["tool_usage_rules"] is not None assert result["external_memory_summary"] is not None def test_git_enabled_agent_no_bare_blocks(self): """Git-enabled agent with no file blocks after memory_filesystem returns None for core_memory""" system_message = ( "Base\n" "\n" "\u251c\u2500\u2500 system/\n" "\n" "Meta" ) result = ContextWindowCalculator.extract_system_components(system_message) assert result["memory_filesystem"] is not None assert result["core_memory"] is None def test_extract_top_level_tag_dual_occurrence_nested_first(self): """When a tag appears nested first and top-level later, the top-level one is extracted""" system_message = ( "\n" "nested rules\n" "\n\n" "top-level rules" ) result = ContextWindowCalculator._extract_top_level_tag(system_message, "tool_usage_rules") assert result is not None assert "top-level rules" in result assert "nested rules" not in result def test_extract_system_prompt_pure_text_no_tags(self): """System message with no section tags at all returns the full text as system_prompt""" system_message = "You are a simple agent.\nYou help the user with tasks." result = ContextWindowCalculator._extract_system_prompt(system_message) assert result is not None assert "simple agent" in result assert "help the user" in result def test_git_backed_memory_without_memory_blocks_wrapper(self): """Regression test from main: git-backed agents without wrapper""" system_message = """You are some system prompt. Memory Directory: ~/.letta/agents/agent-123/memory /memory/ \u2514\u2500\u2500 system/ \u2514\u2500\u2500 human.md --- description: test limit: 10 --- hello - foo=bar """ result = ContextWindowCalculator.extract_system_components(system_message) assert "You are some system prompt" in result["system_prompt"] # memory_filesystem is a top-level section assert result["memory_filesystem"] is not None assert "" in result["memory_filesystem"] # bare file blocks are captured as core_memory assert result["core_memory"] is not None assert "" in result["core_memory"] assert result["external_memory_summary"].startswith("") def test_legacy_memory_blocks_wrapper(self): """Regression test from main: legacy memory_blocks wrapper is properly parsed""" system_message = """SYS p - x=y """ result = ContextWindowCalculator.extract_system_components(system_message) assert result["system_prompt"].startswith("") assert result["core_memory"].startswith("") assert result["external_memory_summary"].startswith("") def _make_system_message(text: str): """Helper to create a real Message object for use as a system message in tests.""" from letta.schemas.enums import MessageRole from letta.schemas.letta_message_content import TextContent from letta.schemas.message import Message return Message(role=MessageRole.system, content=[TextContent(text=text)]) def _make_mock_deps(system_text: str): """Helper to create mocked token_counter, message_manager, and agent_state.""" token_counter = MagicMock() token_counter.count_text_tokens = AsyncMock(side_effect=lambda text: len(text) if text else 0) token_counter.count_message_tokens = AsyncMock(return_value=0) token_counter.count_tool_tokens = AsyncMock(return_value=0) token_counter.convert_messages = MagicMock(return_value=[{"role": "system", "content": system_text}]) message_manager = MagicMock() message_manager.get_messages_by_ids_async = AsyncMock(return_value=[]) agent_state = MagicMock() agent_state.id = "agent-test" agent_state.message_ids = ["msg-sys"] agent_state.system = "fallback system prompt" agent_state.tools = [] agent_state.llm_config.context_window = 128000 actor = MagicMock() return token_counter, message_manager, agent_state, actor class TestCalculateContextWindow: """Integration tests for calculate_context_window with mocked dependencies""" @pytest.mark.asyncio async def test_calculate_context_window_standard_agent(self): """Test full context window calculation with a standard system message""" system_text = ( "You are a helpful agent.\n" "human: User is Alice\n" "Archival: 5 passages" ) system_msg = _make_system_message(system_text) token_counter, message_manager, agent_state, actor = _make_mock_deps(system_text) calculator = ContextWindowCalculator() result = await calculator.calculate_context_window( agent_state=agent_state, actor=actor, token_counter=token_counter, message_manager=message_manager, system_message_compiled=system_msg, num_archival_memories=5, num_messages=10, message_ids=[], ) assert result.context_window_size_max == 128000 assert result.num_archival_memory == 5 assert result.num_recall_memory == 10 assert result.num_tokens_system > 0 assert "helpful agent" in result.system_prompt assert result.num_tokens_core_memory > 0 assert "User is Alice" in result.core_memory assert result.num_tokens_external_memory_summary > 0 # New sections should be None/0 since not in system message assert result.memory_filesystem is None assert result.num_tokens_memory_filesystem == 0 assert result.tool_usage_rules is None assert result.num_tokens_tool_usage_rules == 0 assert result.directories is None assert result.num_tokens_directories == 0 @pytest.mark.asyncio async def test_calculate_context_window_skips_empty_sections(self): """Verify that token counting is skipped for empty/missing sections""" # Only base_instructions, no other sections system_text = "Simple agent" system_msg = _make_system_message(system_text) token_counter, message_manager, agent_state, actor = _make_mock_deps(system_text) calculator = ContextWindowCalculator() await calculator.calculate_context_window( agent_state=agent_state, actor=actor, token_counter=token_counter, message_manager=message_manager, system_message_compiled=system_msg, num_archival_memories=0, num_messages=0, message_ids=[], ) # count_text_tokens should only be called for system_prompt (non-empty) # and NOT for core_memory, memory_filesystem, tool_usage_rules, directories, # external_memory_summary, or summary_memory (all empty/None) calls = token_counter.count_text_tokens.call_args_list assert len(calls) == 1, f"Expected 1 call to count_text_tokens (system_prompt only), got {len(calls)}: {calls}" @pytest.mark.asyncio async def test_calculate_context_window_all_sections(self): """Test with all optional sections present""" system_text = ( "Agent instructions\n" "Core memory\n" "\u251c\u2500\u2500 system/\n\u2502 \u2514\u2500\u2500 human.md\n" "Always call search first\n" 'content\n' "Archival: 10 passages" ) system_msg = _make_system_message(system_text) token_counter, message_manager, agent_state, actor = _make_mock_deps(system_text) calculator = ContextWindowCalculator() result = await calculator.calculate_context_window( agent_state=agent_state, actor=actor, token_counter=token_counter, message_manager=message_manager, system_message_compiled=system_msg, num_archival_memories=10, num_messages=5, message_ids=[], ) # All sections should be populated assert result.num_tokens_system > 0 assert result.num_tokens_core_memory > 0 assert result.num_tokens_memory_filesystem > 0 assert result.memory_filesystem is not None assert result.num_tokens_tool_usage_rules > 0 assert result.tool_usage_rules is not None assert result.num_tokens_directories > 0 assert result.directories is not None assert result.num_tokens_external_memory_summary > 0 # Verify total is sum of all parts expected_total = ( result.num_tokens_system + result.num_tokens_core_memory + result.num_tokens_memory_filesystem + result.num_tokens_tool_usage_rules + result.num_tokens_directories + result.num_tokens_external_memory_summary + result.num_tokens_summary_memory + result.num_tokens_messages + result.num_tokens_functions_definitions ) assert result.context_window_size_current == expected_total @pytest.mark.asyncio async def test_calculate_context_window_git_enabled_agent(self): """Test that git-enabled agents capture bare file blocks as core_memory""" system_text = ( "Git agent\n" "\n" "\u251c\u2500\u2500 system/\n" "\u2502 \u251c\u2500\u2500 human.md\n" "\u2502 \u2514\u2500\u2500 persona.md\n" "\n\n" "\n---\ndescription: About the human\n---\nName: Alice\n\n\n" "\n---\ndescription: Agent persona\n---\nI am helpful.\n\n\n" "Archival: 3 passages" ) system_msg = _make_system_message(system_text) token_counter, message_manager, agent_state, actor = _make_mock_deps(system_text) calculator = ContextWindowCalculator() result = await calculator.calculate_context_window( agent_state=agent_state, actor=actor, token_counter=token_counter, message_manager=message_manager, system_message_compiled=system_msg, num_archival_memories=3, num_messages=5, message_ids=[], ) # memory_filesystem should capture the tree view assert result.memory_filesystem is not None assert result.num_tokens_memory_filesystem > 0 # core_memory should capture the bare file blocks assert result.num_tokens_core_memory > 0 assert "Name: Alice" in result.core_memory assert "" in result.core_memory # Total should include all sections expected_total = ( result.num_tokens_system + result.num_tokens_core_memory + result.num_tokens_memory_filesystem + result.num_tokens_tool_usage_rules + result.num_tokens_directories + result.num_tokens_external_memory_summary + result.num_tokens_summary_memory + result.num_tokens_messages + result.num_tokens_functions_definitions ) assert result.context_window_size_current == expected_total