import os import re import threading import time import pytest from dotenv import load_dotenv from letta_client import CreateBlock from letta_client import Letta as LettaSDKClient from letta_client import LettaRequest from letta_client import MessageCreate as ClientMessageCreate from letta_client.types import AgentState from letta.constants import DEFAULT_ORG_ID, FILES_TOOLS from letta.orm.enums import ToolType from letta.schemas.message import MessageCreate from letta.schemas.user import User from letta.settings import settings from tests.utils import wait_for_server # Constants SERVER_PORT = 8283 def get_raw_system_message(client: LettaSDKClient, agent_id: str) -> str: """Helper function to get the raw system message from an agent's preview payload.""" raw_payload = client.agents.messages.preview_raw_payload( agent_id=agent_id, request=LettaRequest( messages=[ ClientMessageCreate( role="user", content="Testing", ) ], ), ) return raw_payload["messages"][0]["content"] @pytest.fixture(autouse=True) def clear_sources(client: LettaSDKClient): # Clear existing sources for source in client.sources.list(): client.sources.delete(source_id=source.id) def run_server(): load_dotenv() from letta.server.rest_api.app import start_server print("Starting server...") start_server(debug=True) @pytest.fixture(scope="module") def client() -> LettaSDKClient: # Get URL from environment or start server server_url = os.getenv("LETTA_SERVER_URL", f"http://localhost:{SERVER_PORT}") if not os.getenv("LETTA_SERVER_URL"): print("Starting server thread") thread = threading.Thread(target=run_server, daemon=True) thread.start() wait_for_server(server_url) print("Running client tests with server:", server_url) client = LettaSDKClient(base_url=server_url, token=None) client.tools.upsert_base_tools() yield client def upload_file_and_wait(client: LettaSDKClient, source_id: str, file_path: str, max_wait: int = 60): """Helper function to upload a file and wait for processing to complete""" with open(file_path, "rb") as f: file_metadata = client.sources.files.upload(source_id=source_id, file=f) # Wait for the file to be processed start_time = time.time() while file_metadata.processing_status != "completed" and file_metadata.processing_status != "error": if time.time() - start_time > max_wait: pytest.fail(f"File processing timed out after {max_wait} seconds") time.sleep(1) file_metadata = client.sources.get_file_metadata(source_id=source_id, file_id=file_metadata.id) print("Waiting for file processing to complete...", file_metadata.processing_status) if file_metadata.processing_status == "error": pytest.fail(f"File processing failed: {file_metadata.error_message}") return file_metadata @pytest.fixture def agent_state(disable_pinecone, client: LettaSDKClient): open_file_tool = client.tools.list(name="open_files")[0] search_files_tool = client.tools.list(name="semantic_search_files")[0] grep_tool = client.tools.list(name="grep_files")[0] agent_state = client.agents.create( name="test_sources_agent", memory_blocks=[ CreateBlock( label="human", value="username: sarah", ), ], model="openai/gpt-4o-mini", embedding="openai/text-embedding-3-small", tool_ids=[open_file_tool.id, search_files_tool.id, grep_tool.id], ) yield agent_state # Tests def test_auto_attach_detach_files_tools(disable_pinecone, client: LettaSDKClient): """Test automatic attachment and detachment of file tools when managing agent sources.""" # Create agent with basic configuration agent = client.agents.create( memory_blocks=[ CreateBlock(label="human", value="username: sarah"), ], model="openai/gpt-4o-mini", embedding="openai/text-embedding-3-small", ) # Helper function to get file tools from agent def get_file_tools(agent_state): return {tool.name for tool in agent_state.tools if tool.tool_type == ToolType.LETTA_FILES_CORE} # Helper function to assert file tools presence def assert_file_tools_present(agent_state, expected_tools): actual_tools = get_file_tools(agent_state) assert actual_tools == expected_tools, f"File tools mismatch.\nExpected: {expected_tools}\nFound: {actual_tools}" # Helper function to assert no file tools def assert_no_file_tools(agent_state): has_file_tools = any(tool.tool_type == ToolType.LETTA_FILES_CORE for tool in agent_state.tools) assert not has_file_tools, "File tools should not be present" # Initial state: no file tools assert_no_file_tools(agent) # Create and attach first source source_1 = client.sources.create(name="test_source", embedding="openai/text-embedding-3-small") assert len(client.sources.list()) == 1 agent = client.agents.sources.attach(source_id=source_1.id, agent_id=agent.id) assert len(client.agents.retrieve(agent_id=agent.id).sources) == 1 assert_file_tools_present(agent, set(FILES_TOOLS)) # Create and attach second source source_2 = client.sources.create(name="another_test_source", embedding="openai/text-embedding-3-small") assert len(client.sources.list()) == 2 agent = client.agents.sources.attach(source_id=source_2.id, agent_id=agent.id) assert len(client.agents.retrieve(agent_id=agent.id).sources) == 2 # File tools should remain after attaching second source assert_file_tools_present(agent, set(FILES_TOOLS)) # Detach second source - tools should remain (first source still attached) agent = client.agents.sources.detach(source_id=source_2.id, agent_id=agent.id) assert_file_tools_present(agent, set(FILES_TOOLS)) # Detach first source - all file tools should be removed agent = client.agents.sources.detach(source_id=source_1.id, agent_id=agent.id) assert_no_file_tools(agent) @pytest.mark.parametrize( "file_path, expected_value, expected_label_regex", [ ("tests/data/test.txt", "test", r"test_source/test\.txt"), ("tests/data/memgpt_paper.pdf", "MemGPT", r"test_source/memgpt_paper\.pdf"), ("tests/data/toy_chat_fine_tuning.jsonl", '{"messages"', r"test_source/toy_chat_fine_tuning\.jsonl"), ("tests/data/test.md", "h2 Heading", r"test_source/test\.md"), ("tests/data/test.json", "glossary", r"test_source/test\.json"), ("tests/data/react_component.jsx", "UserProfile", r"test_source/react_component\.jsx"), ("tests/data/task_manager.java", "TaskManager", r"test_source/task_manager\.java"), ("tests/data/data_structures.cpp", "BinarySearchTree", r"test_source/data_structures\.cpp"), ("tests/data/api_server.go", "UserService", r"test_source/api_server\.go"), ("tests/data/data_analysis.py", "StatisticalAnalyzer", r"test_source/data_analysis\.py"), ("tests/data/test.csv", "Smart Fridge Plus", r"test_source/test\.csv"), ], ) def test_file_upload_creates_source_blocks_correctly( disable_pinecone, client: LettaSDKClient, agent_state: AgentState, file_path: str, expected_value: str, expected_label_regex: str, ): # skip pdf tests if mistral api key is missing if file_path.endswith(".pdf") and not settings.mistral_api_key: pytest.skip("mistral api key required for pdf processing") # Create a new source source = client.sources.create(name="test_source", embedding="openai/text-embedding-3-small") assert len(client.sources.list()) == 1 # Attach client.agents.sources.attach(source_id=source.id, agent_id=agent_state.id) # Upload the file upload_file_and_wait(client, source.id, file_path) # Get uploaded files files = client.sources.files.list(source_id=source.id, limit=1) assert len(files) == 1 assert files[0].source_id == source.id # Check that blocks were created agent_state = client.agents.retrieve(agent_id=agent_state.id) blocks = agent_state.memory.file_blocks assert len(blocks) == 1 assert any(expected_value in b.value for b in blocks) assert any(b.value.startswith("[Viewing file start") for b in blocks) assert any(re.fullmatch(expected_label_regex, b.label) for b in blocks) # verify raw system message contains source information raw_system_message = get_raw_system_message(client, agent_state.id) assert "test_source" in raw_system_message assert "" in raw_system_message # verify file-specific details in raw system message file_name = files[0].file_name assert f'name="test_source/{file_name}"' in raw_system_message assert 'status="open"' in raw_system_message # Remove file from source client.sources.files.delete(source_id=source.id, file_id=files[0].id) # Confirm blocks were removed agent_state = client.agents.retrieve(agent_id=agent_state.id) blocks = agent_state.memory.file_blocks assert len(blocks) == 0 assert not any(expected_value in b.value for b in blocks) assert not any(re.fullmatch(expected_label_regex, b.label) for b in blocks) # verify raw system message no longer contains source information raw_system_message_after_removal = get_raw_system_message(client, agent_state.id) # this should be in, because we didn't delete the source assert "test_source" in raw_system_message_after_removal assert "" in raw_system_message_after_removal # verify file-specific details are also removed assert f'name="test_source/{file_name}"' not in raw_system_message_after_removal def test_attach_existing_files_creates_source_blocks_correctly(disable_pinecone, client: LettaSDKClient, agent_state: AgentState): # Create a new source source = client.sources.create(name="test_source", embedding="openai/text-embedding-3-small") assert len(client.sources.list()) == 1 # Load files into the source file_path = "tests/data/test.txt" # Upload the files upload_file_and_wait(client, source.id, file_path) # Get the first file with pagination files = client.sources.files.list(source_id=source.id, limit=1) assert len(files) == 1 assert files[0].source_id == source.id # Attach after uploading the file client.agents.sources.attach(source_id=source.id, agent_id=agent_state.id) raw_system_message = get_raw_system_message(client, agent_state.id) # Assert that the expected chunk is in the raw system message expected_chunk = """ - read_only=true - chars_current=46 - chars_limit=50000 [Viewing file start (out of 1 chunks)] 1: test """ assert expected_chunk in raw_system_message # Get the agent state, check blocks exist agent_state = client.agents.retrieve(agent_id=agent_state.id) blocks = agent_state.memory.file_blocks assert len(blocks) == 1 assert any("test" in b.value for b in blocks) assert any(b.value.startswith("[Viewing file start") for b in blocks) # Detach the source client.agents.sources.detach(source_id=source.id, agent_id=agent_state.id) # Get the agent state, check blocks do NOT exist agent_state = client.agents.retrieve(agent_id=agent_state.id) blocks = agent_state.memory.file_blocks assert len(blocks) == 0 assert not any("test" in b.value for b in blocks) # Verify no traces of the prompt exist in the raw system message after detaching raw_system_message_after_detach = get_raw_system_message(client, agent_state.id) assert expected_chunk not in raw_system_message_after_detach assert "test_source" not in raw_system_message_after_detach assert "" not in raw_system_message_after_detach def test_delete_source_removes_source_blocks_correctly(disable_pinecone, client: LettaSDKClient, agent_state: AgentState): # Create a new source source = client.sources.create(name="test_source", embedding="openai/text-embedding-3-small") assert len(client.sources.list()) == 1 client.agents.sources.attach(source_id=source.id, agent_id=agent_state.id) raw_system_message = get_raw_system_message(client, agent_state.id) assert "test_source" in raw_system_message assert "" in raw_system_message # Load files into the source file_path = "tests/data/test.txt" # Upload the files upload_file_and_wait(client, source.id, file_path) raw_system_message = get_raw_system_message(client, agent_state.id) # Assert that the expected chunk is in the raw system message expected_chunk = """ - read_only=true - chars_current=46 - chars_limit=50000 [Viewing file start (out of 1 chunks)] 1: test """ assert expected_chunk in raw_system_message # Get the agent state, check blocks exist agent_state = client.agents.retrieve(agent_id=agent_state.id) blocks = agent_state.memory.file_blocks assert len(blocks) == 1 assert any("test" in b.value for b in blocks) # Remove file from source client.sources.delete(source_id=source.id) raw_system_message_after_detach = get_raw_system_message(client, agent_state.id) assert expected_chunk not in raw_system_message_after_detach assert "test_source" not in raw_system_message_after_detach assert "" not in raw_system_message_after_detach # Get the agent state, check blocks do NOT exist agent_state = client.agents.retrieve(agent_id=agent_state.id) blocks = agent_state.memory.file_blocks assert len(blocks) == 0 assert not any("test" in b.value for b in blocks) def test_agent_uses_open_close_file_correctly(disable_pinecone, client: LettaSDKClient, agent_state: AgentState): # Create a new source source = client.sources.create(name="test_source", embedding="openai/text-embedding-3-small") sources_list = client.sources.list() assert len(sources_list) == 1 # Attach source to agent client.agents.sources.attach(source_id=source.id, agent_id=agent_state.id) # Load files into the source file_path = "tests/data/long_test.txt" # Upload the files upload_file_and_wait(client, source.id, file_path) # Get uploaded files files = client.sources.files.list(source_id=source.id, limit=1) assert len(files) == 1 assert files[0].source_id == source.id file = files[0] # Check that file is opened initially agent_state = client.agents.retrieve(agent_id=agent_state.id) blocks = agent_state.memory.file_blocks print(f"Agent has {len(blocks)} file block(s)") if blocks: initial_content_length = len(blocks[0].value) print(f"Initial file content length: {initial_content_length} characters") print(f"First 100 chars of content: {blocks[0].value[:100]}...") assert initial_content_length > 10, f"Expected file content > 10 chars, got {initial_content_length}" # Ask agent to open the file for a specific range using offset/length offset, length = 1, 5 # 1-indexed offset, 5 lines print(f"Requesting agent to open file with offset={offset}, length={length}") open_response1 = client.agents.messages.create( agent_id=agent_state.id, messages=[ MessageCreate( role="user", content=f"Use ONLY the open_files tool to open the file named test_source/{file.file_name} with offset {offset} and length {length}", ) ], ) print(f"First open request sent, got {len(open_response1.messages)} message(s) in response") print(open_response1.messages) # Check that file is opened agent_state = client.agents.retrieve(agent_id=agent_state.id) blocks = agent_state.memory.file_blocks old_value = blocks[0].value old_content_length = len(old_value) print(f"File content length after first open: {old_content_length} characters") print(f"First range content: '{old_value}'") assert old_content_length > 10, f"Expected content > 10 chars for offset={offset}, length={length}, got {old_content_length}" # Assert specific content expectations for first range (lines 1-5) assert "[Viewing chunks 1 to 5 (out of 554 chunks)]" in old_value, f"Expected viewing header for lines 1-5, got: {old_value[:100]}..." assert "1: Enrico Letta" in old_value, f"Expected line 1 to start with '1: Enrico Letta', got: {old_value[:200]}..." assert "5: appointed to the Cabinet" in old_value, f"Expected line 5 to contain '5: appointed to the Cabinet', got: {old_value}" # Ask agent to open the file for a different range offset, length = 6, 5 # Different offset, same length open_response2 = client.agents.messages.create( agent_id=agent_state.id, messages=[ MessageCreate( role="user", content=f"Use ONLY the open_files tool to open the file named {file.file_name} with offset {offset} and length {length}", ) ], ) print(f"Second open request sent, got {len(open_response2.messages)} message(s) in response") print(open_response2.messages) # Check that file is opened, but for different range print("Verifying file is opened with second range...") agent_state = client.agents.retrieve(agent_id=agent_state.id) blocks = agent_state.memory.file_blocks new_value = blocks[0].value new_content_length = len(new_value) print(f"File content length after second open: {new_content_length} characters") print(f"Second range content: '{new_value}'") assert new_content_length > 10, f"Expected content > 10 chars for offset={offset}, length={length}, got {new_content_length}" # Assert specific content expectations for second range (lines 6-10) assert "[Viewing chunks 6 to 10 (out of 554 chunks)]" in new_value, f"Expected viewing header for lines 6-10, got: {new_value[:100]}..." assert ( "6: was promoted to become Minister" in new_value ), f"Expected line 6 to start with '6: was promoted to become Minister', got: {new_value[:200]}..." assert ( "10: produced an inconclusive result" in new_value ), f"Expected line 10 to contain '10: produced an inconclusive result', got: {new_value}" print(f"Comparing content ranges:") print(f" First range (offset=1, length=5): '{old_value}'") print(f" Second range (offset=6, length=5): '{new_value}'") assert new_value != old_value, f"Different view ranges should have different content. New: '{new_value}', Old: '{old_value}'" # Assert that ranges don't overlap - first range should not contain line 6, second should not contain line 1 assert "6: was promoted" not in old_value, f"First range (1-5) should not contain line 6, got: {old_value}" assert "1: Enrico Letta" not in new_value, f"Second range (6-10) should not contain line 1, got: {new_value}" print("✓ File successfully opened with different range - content differs as expected") def test_agent_uses_search_files_correctly(disable_pinecone, client: LettaSDKClient, agent_state: AgentState): # Create a new source source = client.sources.create(name="test_source", embedding="openai/text-embedding-3-small") sources_list = client.sources.list() assert len(sources_list) == 1 # Attach source to agent client.agents.sources.attach(source_id=source.id, agent_id=agent_state.id) # Load files into the source file_path = "tests/data/long_test.txt" print(f"Uploading file: {file_path}") # Upload the files file_metadata = upload_file_and_wait(client, source.id, file_path) print(f"File uploaded and processed: {file_metadata.file_name}") # Get uploaded files files = client.sources.files.list(source_id=source.id, limit=1) assert len(files) == 1 assert files[0].source_id == source.id # Ask agent to use the semantic_search_files tool search_files_response = client.agents.messages.create( agent_id=agent_state.id, messages=[ MessageCreate( role="user", content=f"Use ONLY the semantic_search_files tool to search for details regarding the electoral history." ) ], ) print(f"Search file request sent, got {len(search_files_response.messages)} message(s) in response") print(search_files_response.messages) # Check that archival_memory_search was called tool_calls = [msg for msg in search_files_response.messages if msg.message_type == "tool_call_message"] assert len(tool_calls) > 0, "No tool calls found" assert any(tc.tool_call.name == "semantic_search_files" for tc in tool_calls), "semantic_search_files not called" # Check it returned successfully tool_returns = [msg for msg in search_files_response.messages if msg.message_type == "tool_return_message"] assert len(tool_returns) > 0, "No tool returns found" assert all(tr.status == "success" for tr in tool_returns), "Tool call failed" def test_agent_uses_grep_correctly_basic(disable_pinecone, client: LettaSDKClient, agent_state: AgentState): # Create a new source source = client.sources.create(name="test_source", embedding="openai/text-embedding-3-small") sources_list = client.sources.list() assert len(sources_list) == 1 # Attach source to agent client.agents.sources.attach(source_id=source.id, agent_id=agent_state.id) # Load files into the source file_path = "tests/data/long_test.txt" print(f"Uploading file: {file_path}") # Upload the files file_metadata = upload_file_and_wait(client, source.id, file_path) print(f"File uploaded and processed: {file_metadata.file_name}") # Get uploaded files files = client.sources.files.list(source_id=source.id, limit=1) assert len(files) == 1 assert files[0].source_id == source.id # Ask agent to use the semantic_search_files tool search_files_response = client.agents.messages.create( agent_id=agent_state.id, messages=[MessageCreate(role="user", content=f"Use ONLY the grep_files tool to search for `Nunzia De Girolamo`.")], ) print(f"Grep request sent, got {len(search_files_response.messages)} message(s) in response") print(search_files_response.messages) # Check that grep_files was called tool_calls = [msg for msg in search_files_response.messages if msg.message_type == "tool_call_message"] assert len(tool_calls) > 0, "No tool calls found" assert any(tc.tool_call.name == "grep_files" for tc in tool_calls), "semantic_search_files not called" # Check it returned successfully tool_returns = [msg for msg in search_files_response.messages if msg.message_type == "tool_return_message"] assert len(tool_returns) > 0, "No tool returns found" assert all(tr.status == "success" for tr in tool_returns), "Tool call failed" def test_agent_uses_grep_correctly_advanced(disable_pinecone, client: LettaSDKClient, agent_state: AgentState): # Create a new source source = client.sources.create(name="test_source", embedding="openai/text-embedding-3-small") sources_list = client.sources.list() assert len(sources_list) == 1 # Attach source to agent client.agents.sources.attach(source_id=source.id, agent_id=agent_state.id) # Load files into the source file_path = "tests/data/list_tools.json" print(f"Uploading file: {file_path}") # Upload the files file_metadata = upload_file_and_wait(client, source.id, file_path) print(f"File uploaded and processed: {file_metadata.file_name}") # Get uploaded files files = client.sources.files.list(source_id=source.id, limit=1) assert len(files) == 1 assert files[0].source_id == source.id # Ask agent to use the semantic_search_files tool search_files_response = client.agents.messages.create( agent_id=agent_state.id, messages=[ MessageCreate(role="user", content=f"Use ONLY the grep_files tool to search for `tool-f5b80b08-5a45-4a0a-b2cd-dd8a0177b7ef`.") ], ) print(f"Grep request sent, got {len(search_files_response.messages)} message(s) in response") print(search_files_response.messages) tool_return_message = next((m for m in search_files_response.messages if m.message_type == "tool_return_message"), None) assert tool_return_message is not None, "No ToolReturnMessage found in messages" # Basic structural integrity checks assert tool_return_message.name == "grep_files" assert tool_return_message.status == "success" assert "Found 1 matches" in tool_return_message.tool_return assert "tool-f5b80b08-5a45-4a0a-b2cd-dd8a0177b7ef" in tool_return_message.tool_return # Context line integrity (3 lines before and after) assert "507:" in tool_return_message.tool_return assert "508:" in tool_return_message.tool_return assert "509:" in tool_return_message.tool_return assert "> 510:" in tool_return_message.tool_return # Match line with > prefix assert "511:" in tool_return_message.tool_return assert "512:" in tool_return_message.tool_return assert "513:" in tool_return_message.tool_return def test_create_agent_with_source_ids_creates_source_blocks_correctly(disable_pinecone, client: LettaSDKClient): """Test that creating an agent with source_ids parameter correctly creates source blocks.""" # Create a new source source = client.sources.create(name="test_source", embedding="openai/text-embedding-3-small") assert len(client.sources.list()) == 1 # Upload a file to the source before attaching file_path = "tests/data/long_test.txt" upload_file_and_wait(client, source.id, file_path) # Get uploaded files to verify files = client.sources.files.list(source_id=source.id, limit=1) assert len(files) == 1 assert files[0].source_id == source.id # Create agent with source_ids parameter temp_agent_state = client.agents.create( name="test_agent_with_sources", memory_blocks=[ CreateBlock( label="human", value="username: sarah", ), ], model="openai/gpt-4o-mini", embedding="openai/text-embedding-3-small", source_ids=[source.id], # Attach source during creation ) # Verify agent was created successfully assert temp_agent_state is not None assert temp_agent_state.name == "test_agent_with_sources" # Check that source blocks were created correctly blocks = temp_agent_state.memory.file_blocks assert len(blocks) == 1 assert any(b.value.startswith("[Viewing file start (out of 554 chunks)]") for b in blocks) # Verify file tools were automatically attached file_tools = {tool.name for tool in temp_agent_state.tools if tool.tool_type == ToolType.LETTA_FILES_CORE} assert file_tools == set(FILES_TOOLS) def test_view_ranges_have_metadata(disable_pinecone, client: LettaSDKClient, agent_state: AgentState): # Create a new source source = client.sources.create(name="test_source", embedding="openai/text-embedding-3-small") sources_list = client.sources.list() assert len(sources_list) == 1 # Attach source to agent client.agents.sources.attach(source_id=source.id, agent_id=agent_state.id) # Load files into the source file_path = "tests/data/1_to_100.py" # Upload the files upload_file_and_wait(client, source.id, file_path) # Get uploaded files files = client.sources.files.list(source_id=source.id, limit=1) assert len(files) == 1 assert files[0].source_id == source.id file = files[0] # Check that file is opened initially agent_state = client.agents.retrieve(agent_id=agent_state.id) blocks = agent_state.memory.file_blocks assert len(blocks) == 1 block = blocks[0] assert block.value.startswith("[Viewing file start (out of 100 lines)]") # Open a specific range using offset/length offset = 50 # 1-indexed line 50 length = 5 # 5 lines (50-54) open_response = client.agents.messages.create( agent_id=agent_state.id, messages=[ MessageCreate( role="user", content=f"Use ONLY the open_files tool to open the file named test_source/{file.file_name} with offset {offset} and length {length}", ) ], ) print(f"Open request sent, got {len(open_response.messages)} message(s) in response") print(open_response.messages) # Check that file is opened correctly agent_state = client.agents.retrieve(agent_id=agent_state.id) blocks = agent_state.memory.file_blocks assert len(blocks) == 1 block = blocks[0] print(block.value) assert ( block.value == """ [Viewing lines 50 to 54 (out of 100 lines)] 50: x50 = 50 51: x51 = 51 52: x52 = 52 53: x53 = 53 54: x54 = 54 """.strip() ) def test_duplicate_file_renaming(disable_pinecone, client: LettaSDKClient): """Test that duplicate files are renamed with count-based suffixes (e.g., file.txt, file (1).txt, file (2).txt)""" # Create a new source source = client.sources.create(name="test_duplicate_source", embedding="openai/text-embedding-3-small") # Upload the same file three times file_path = "tests/data/test.txt" with open(file_path, "rb") as f: first_file = client.sources.files.upload(source_id=source.id, file=f) with open(file_path, "rb") as f: second_file = client.sources.files.upload(source_id=source.id, file=f) with open(file_path, "rb") as f: third_file = client.sources.files.upload(source_id=source.id, file=f) # Get all uploaded files files = client.sources.files.list(source_id=source.id, limit=10) assert len(files) == 3, f"Expected 3 files, got {len(files)}" # Sort files by creation time to ensure predictable order files.sort(key=lambda f: f.created_at) # Verify filenames follow the count-based pattern expected_filenames = ["test.txt", "test_(1).txt", "test_(2).txt"] actual_filenames = [f.file_name for f in files] assert actual_filenames == expected_filenames, f"Expected {expected_filenames}, got {actual_filenames}" # Verify all files have the same original_file_name for file in files: assert file.original_file_name == "test.txt", f"Expected original_file_name='test.txt', got '{file.original_file_name}'" print(f"✓ Successfully tested duplicate file renaming:") for i, file in enumerate(files): print(f" File {i+1}: original='{file.original_file_name}' → renamed='{file.file_name}'") def test_open_files_schema_descriptions(disable_pinecone, client: LettaSDKClient): """Test that open_files tool schema contains correct descriptions from docstring""" # Get the open_files tool tools = client.tools.list(name="open_files") assert len(tools) == 1, "Expected exactly one open_files tool" open_files_tool = tools[0] schema = open_files_tool.json_schema # Check main function description includes the full multiline docstring with examples description = schema["description"] # Check main description line assert ( "Open one or more files and load their contents into files section in core memory. Maximum of 5 files can be opened simultaneously." in description ) # Check that examples are included assert "Examples:" in description assert 'FileOpenRequest(file_name="project_utils/config.py")' in description assert 'FileOpenRequest(file_name="project_utils/config.py", offset=1, length=50)' in description assert "# Lines 1-50" in description assert "# Lines 100-199" in description assert "# Entire file" in description assert "close_all_others=True" in description assert "View specific portions of large files (e.g. functions or definitions)" in description # Check parameters structure assert "parameters" in schema assert "properties" in schema["parameters"] properties = schema["parameters"]["properties"] # Check file_requests parameter assert "file_requests" in properties file_requests_prop = properties["file_requests"] expected_file_requests_desc = "List of file open requests, each specifying file name and optional view range." assert ( file_requests_prop["description"] == expected_file_requests_desc ), f"Expected file_requests description: '{expected_file_requests_desc}', got: '{file_requests_prop['description']}'" # Check close_all_others parameter assert "close_all_others" in properties close_all_others_prop = properties["close_all_others"] expected_close_all_others_desc = "If True, closes all other currently open files first. Defaults to False." assert ( close_all_others_prop["description"] == expected_close_all_others_desc ), f"Expected close_all_others description: '{expected_close_all_others_desc}', got: '{close_all_others_prop['description']}'" # Check that file_requests is an array type assert file_requests_prop["type"] == "array", f"Expected file_requests type to be 'array', got: '{file_requests_prop['type']}'" # Check FileOpenRequest schema within file_requests items assert "items" in file_requests_prop file_request_items = file_requests_prop["items"] assert file_request_items["type"] == "object", "Expected FileOpenRequest to be object type" # Check FileOpenRequest properties assert "properties" in file_request_items file_request_properties = file_request_items["properties"] # Check file_name field assert "file_name" in file_request_properties file_name_prop = file_request_properties["file_name"] assert file_name_prop["description"] == "Name of the file to open" assert file_name_prop["type"] == "string" # Check offset field assert "offset" in file_request_properties offset_prop = file_request_properties["offset"] expected_offset_desc = "Optional starting line number (1-indexed). If not specified, starts from beginning of file." assert offset_prop["description"] == expected_offset_desc assert offset_prop["type"] == "integer" # Check length field assert "length" in file_request_properties length_prop = file_request_properties["length"] expected_length_desc = "Optional number of lines to view from offset (inclusive). If not specified, views to end of file." assert length_prop["description"] == expected_length_desc assert length_prop["type"] == "integer" # --- Pinecone Tests --- def test_pinecone_search_files_tool(client: LettaSDKClient): """Test that search_files tool uses Pinecone when enabled""" from letta.helpers.pinecone_utils import should_use_pinecone if not should_use_pinecone(verbose=True): pytest.skip("Pinecone not configured (missing API key or disabled), skipping Pinecone-specific tests") print("Testing Pinecone search_files tool functionality") # Create agent with file tools agent = client.agents.create( name="test_pinecone_agent", memory_blocks=[ CreateBlock(label="human", value="username: testuser"), ], model="openai/gpt-4o-mini", embedding="openai/text-embedding-3-small", ) # Create source and attach to agent source = client.sources.create(name="test_pinecone_source", embedding="openai/text-embedding-3-small") client.agents.sources.attach(source_id=source.id, agent_id=agent.id) # Upload a file with searchable content file_path = "tests/data/long_test.txt" upload_file_and_wait(client, source.id, file_path) # Test semantic search using Pinecone search_response = client.agents.messages.create( agent_id=agent.id, messages=[MessageCreate(role="user", content="Use the semantic_search_files tool to search for 'electoral history' in the files.")], ) # Verify tool was called successfully tool_calls = [msg for msg in search_response.messages if msg.message_type == "tool_call_message"] assert len(tool_calls) > 0, "No tool calls found" assert any(tc.tool_call.name == "semantic_search_files" for tc in tool_calls), "semantic_search_files not called" # Verify tool returned results tool_returns = [msg for msg in search_response.messages if msg.message_type == "tool_return_message"] assert len(tool_returns) > 0, "No tool returns found" assert all(tr.status == "success" for tr in tool_returns), "Tool call failed" # Check that results contain expected content search_results = tool_returns[0].tool_return print(search_results) assert ( "electoral" in search_results.lower() or "history" in search_results.lower() ), f"Search results should contain relevant content: {search_results}" def test_pinecone_lifecycle_file_and_source_deletion(client: LettaSDKClient): """Test that file and source deletion removes records from Pinecone""" import asyncio from letta.helpers.pinecone_utils import list_pinecone_index_for_files, should_use_pinecone if not should_use_pinecone(): pytest.skip("Pinecone not configured (missing API key or disabled), skipping Pinecone-specific tests") print("Testing Pinecone file and source deletion lifecycle") # Create source source = client.sources.create(name="test_lifecycle_source", embedding="openai/text-embedding-3-small") # Upload multiple files and wait for processing file_paths = ["tests/data/test.txt", "tests/data/test.md"] uploaded_files = [] for file_path in file_paths: file_metadata = upload_file_and_wait(client, source.id, file_path) uploaded_files.append(file_metadata) # Get temp user for Pinecone operations user = User(name="temp", organization_id=DEFAULT_ORG_ID) # Test file-level deletion first if len(uploaded_files) > 1: file_to_delete = uploaded_files[0] # Check records for the specific file using list function records_before = asyncio.run(list_pinecone_index_for_files(file_to_delete.id, user)) print(f"Found {len(records_before)} records for file before deletion") # Delete the file client.sources.files.delete(source_id=source.id, file_id=file_to_delete.id) # Allow time for deletion to propagate time.sleep(2) # Verify file records are removed records_after = asyncio.run(list_pinecone_index_for_files(file_to_delete.id, user)) print(f"Found {len(records_after)} records for file after deletion") assert len(records_after) == 0, f"File records should be removed from Pinecone after deletion, but found {len(records_after)}" # Test source-level deletion - check remaining files # Check records for remaining files remaining_records = [] for file_metadata in uploaded_files[1:]: # Skip the already deleted file file_records = asyncio.run(list_pinecone_index_for_files(file_metadata.id, user)) remaining_records.extend(file_records) records_before = len(remaining_records) print(f"Found {records_before} records for remaining files before source deletion") # Delete the entire source client.sources.delete(source_id=source.id) # Allow time for deletion to propagate time.sleep(3) # Verify all remaining file records are removed records_after = [] for file_metadata in uploaded_files[1:]: file_records = asyncio.run(list_pinecone_index_for_files(file_metadata.id, user)) records_after.extend(file_records) print(f"Found {len(records_after)} records for files after source deletion") assert ( len(records_after) == 0 ), f"All source records should be removed from Pinecone after source deletion, but found {len(records_after)}" print("✓ Pinecone lifecycle verified - namespace is clean after source deletion")