diff --git a/tests/mcp_tests/test_mcp.py b/tests/mcp_tests/test_mcp.py index e5027901..71eabbe5 100644 --- a/tests/mcp_tests/test_mcp.py +++ b/tests/mcp_tests/test_mcp.py @@ -1,13 +1,14 @@ import json import os +import shutil import subprocess import threading +import time import uuid import venv from pathlib import Path import pytest -import requests from dotenv import load_dotenv from letta_client import Letta, McpTool, ToolCallMessage, ToolReturnMessage @@ -19,7 +20,7 @@ from letta.schemas.message import MessageCreate from tests.utils import wait_for_server -def create_virtualenv_and_install_requirements(requirements_path: Path, name="venv") -> Path: +def create_virtualenv_and_install_requirements(requirements_path: Path, name="venv", force_recreate=True) -> Path: requirements_path = requirements_path.resolve() if not requirements_path.exists(): @@ -29,16 +30,41 @@ def create_virtualenv_and_install_requirements(requirements_path: Path, name="ve venv_dir = requirements_path.parent / name + # Always clean up existing venv if force_recreate is True (default) + # This prevents corruption issues + if venv_dir.exists() and force_recreate: + try: + shutil.rmtree(venv_dir) + print(f"Cleaned up existing venv at {venv_dir}") + except Exception as e: + print(f"Warning: Failed to remove existing venv: {e}") + # Continue anyway, might still work + + # Create fresh venv if not venv_dir.exists(): - venv.EnvBuilder(with_pip=True).create(venv_dir) + venv.EnvBuilder(with_pip=True, clear=True).create(venv_dir) pip_path = venv_dir / ("Scripts" if os.name == "nt" else "bin") / "pip" + + # Wait a moment for venv creation to complete + for _ in range(10): + if pip_path.exists(): + break + time.sleep(0.1) + if not pip_path.exists(): raise FileNotFoundError(f"pip executable not found at: {pip_path}") try: + # Upgrade pip first to avoid potential issues + subprocess.check_call([str(pip_path), "install", "--upgrade", "pip"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + # Install requirements subprocess.check_call([str(pip_path), "install", "-r", str(requirements_path)]) except subprocess.CalledProcessError as exc: + # On failure, try to clean up and recreate once more + if not force_recreate: # Avoid infinite recursion + print(f"Initial pip install failed, attempting clean recreation...") + return create_virtualenv_and_install_requirements(requirements_path, name, force_recreate=False) raise RuntimeError(f"pip install failed with exit code {exc.returncode}") return venv_dir @@ -63,6 +89,24 @@ def empty_mcp_config(): return path +@pytest.fixture(autouse=True) +def cleanup_test_venvs(): + """Fixture to clean up test virtual environments before and after tests.""" + venv_path = Path(__file__).parent / "weather" / "venv" + + # Clean before test (in case of previous failure) + if venv_path.exists(): + try: + shutil.rmtree(venv_path) + except Exception: + pass # Ignore errors during cleanup + + yield # Run the test + + # Note: We don't clean after test to allow debugging if needed + # The next test run will clean it anyway + + @pytest.fixture() def server_url(empty_mcp_config): """Ensures a server is running and returns its base URL.""" @@ -108,55 +152,59 @@ def agent_state(client): @pytest.mark.asyncio async def test_sse_mcp_server(client, agent_state): - mcp_server_name = "deepwiki" - server_url = "https://mcp.deepwiki.com/sse" - sse_mcp_config = SSEServerConfig(server_name=mcp_server_name, server_url=server_url) - client.tools.add_mcp_server(request=sse_mcp_config) + try: + mcp_server_name = "deepwiki" + server_url = "https://mcp.deepwiki.com/sse" + sse_mcp_config = SSEServerConfig(server_name=mcp_server_name, server_url=server_url) + client.tools.add_mcp_server(request=sse_mcp_config) - # Check that it's in the server mapping - mcp_server_mapping = client.tools.list_mcp_servers() - assert mcp_server_name in mcp_server_mapping + # Check that it's in the server mapping + mcp_server_mapping = client.tools.list_mcp_servers() + assert mcp_server_name in mcp_server_mapping - # Check tools - tools = client.tools.list_mcp_tools_by_server(mcp_server_name=mcp_server_name) - assert len(tools) > 0 - assert isinstance(tools[0], McpTool) + # Check tools + tools = client.tools.list_mcp_tools_by_server(mcp_server_name=mcp_server_name) + assert len(tools) > 0 + assert isinstance(tools[0], McpTool) - # Test with the ask_question tool which is one of the available deepwiki tools - ask_question_tool = next((t for t in tools if t.name == "ask_question"), None) - assert ask_question_tool is not None, f"ask_question tool not found. Available tools: {[t.name for t in tools]}" + # Test with the ask_question tool which is one of the available deepwiki tools + ask_question_tool = next((t for t in tools if t.name == "ask_question"), None) + assert ask_question_tool is not None, f"ask_question tool not found. Available tools: {[t.name for t in tools]}" - # Check that the tool is executable - letta_tool = client.tools.add_mcp_tool(mcp_server_name=mcp_server_name, mcp_tool_name=ask_question_tool.name) + # Check that the tool is executable + letta_tool = client.tools.add_mcp_tool(mcp_server_name=mcp_server_name, mcp_tool_name=ask_question_tool.name) - tool_args = {"repoName": "facebook/react", "question": "What is React?"} + tool_args = {"repoName": "facebook/react", "question": "What is React?"} - # Add to agent, have agent invoke tool - client.agents.tools.attach(agent_id=agent_state.id, tool_id=letta_tool.id) - response = client.agents.messages.create( - agent_id=agent_state.id, - messages=[ - MessageCreate( - role="user", - content=[TextContent(text=f"Use the `{letta_tool.name}` tool with these arguments: {tool_args}.")], - ) - ], - ) - seq = response.messages - calls = [m for m in seq if isinstance(m, ToolCallMessage)] - assert calls, "Expected a ToolCallMessage" - assert calls[0].tool_call.name == "ask_question" + # Add to agent, have agent invoke tool + client.agents.tools.attach(agent_id=agent_state.id, tool_id=letta_tool.id) + response = client.agents.messages.create( + agent_id=agent_state.id, + messages=[ + MessageCreate( + role="user", + content=[TextContent(text=f"Use the `{letta_tool.name}` tool with these arguments: {tool_args}.")], + ) + ], + ) + seq = response.messages + calls = [m for m in seq if isinstance(m, ToolCallMessage)] + assert calls, "Expected a ToolCallMessage" + assert calls[0].tool_call.name == "ask_question" - returns = [m for m in seq if isinstance(m, ToolReturnMessage)] - assert returns, "Expected a ToolReturnMessage" - tr = returns[0] - # status field - assert tr.status == "success", f"Bad status: {tr.status}" - # Check that we got some content back - assert len(tr.tool_return.strip()) > 0, f"Expected non-empty tool return, got: {tr.tool_return}" + returns = [m for m in seq if isinstance(m, ToolReturnMessage)] + assert returns, "Expected a ToolReturnMessage" + tr = returns[0] + # status field + assert tr.status == "success", f"Bad status: {tr.status}" + # Check that we got some content back + assert len(tr.tool_return.strip()) > 0, f"Expected non-empty tool return, got: {tr.tool_return}" + finally: + client.tools.delete_mcp_server(mcp_server_name=mcp_server_name) + assert mcp_server_name not in client.tools.list_mcp_servers() -def test_stdio_mcp_server(client, agent_state): +def test_stdio_mcp_server(client, agent_state, server_url): req_file = Path(__file__).parent / "weather" / "requirements.txt" create_virtualenv_and_install_requirements(req_file, name="venv") @@ -170,44 +218,48 @@ def test_stdio_mcp_server(client, agent_state): args=args, ) - client.tools.add_mcp_server(request=stdio_config) + try: + client.tools.add_mcp_server(request=stdio_config) - servers = client.tools.list_mcp_servers() - assert mcp_server_name in servers + servers = client.tools.list_mcp_servers() + assert mcp_server_name in servers - tools = client.tools.list_mcp_tools_by_server(mcp_server_name=mcp_server_name) - assert tools, "Expected at least one tool from the weather MCP server" - assert any(t.name == "get_alerts" for t in tools), f"Got: {[t.name for t in tools]}" + tools = client.tools.list_mcp_tools_by_server(mcp_server_name=mcp_server_name) + assert tools, "Expected at least one tool from the weather MCP server" + assert any(t.name == "get_alerts" for t in tools), f"Got: {[t.name for t in tools]}" - get_alerts = next(t for t in tools if t.name == "get_alerts") + get_alerts = next(t for t in tools if t.name == "get_alerts") - letta_tool = client.tools.add_mcp_tool( - mcp_server_name=mcp_server_name, - mcp_tool_name=get_alerts.name, - ) + letta_tool = client.tools.add_mcp_tool( + mcp_server_name=mcp_server_name, + mcp_tool_name=get_alerts.name, + ) - client.agents.tools.attach(agent_id=agent_state.id, tool_id=letta_tool.id) + client.agents.tools.attach(agent_id=agent_state.id, tool_id=letta_tool.id) - response = client.agents.messages.create( - agent_id=agent_state.id, - messages=[ - MessageCreate( - role="user", - content=[TextContent(text=(f"Use the `{letta_tool.name}` tool with these arguments: {{'state': 'CA'}}."))], - ) - ], - ) + response = client.agents.messages.create( + agent_id=agent_state.id, + messages=[ + MessageCreate( + role="user", + content=[TextContent(text=(f"Use the `{letta_tool.name}` tool with these arguments: {{'state': 'CA'}}."))], + ) + ], + ) - calls = [m for m in response.messages if isinstance(m, ToolCallMessage) and m.tool_call.name == "get_alerts"] - assert calls, "Expected a get_alerts ToolCallMessage" + calls = [m for m in response.messages if isinstance(m, ToolCallMessage) and m.tool_call.name == "get_alerts"] + assert calls, "Expected a get_alerts ToolCallMessage" - returns = [m for m in response.messages if isinstance(m, ToolReturnMessage) and m.tool_call_id == calls[0].tool_call.tool_call_id] - assert returns, "Expected a ToolReturnMessage for get_alerts" - ret = returns[0] + returns = [m for m in response.messages if isinstance(m, ToolReturnMessage) and m.tool_call_id == calls[0].tool_call.tool_call_id] + assert returns, "Expected a ToolReturnMessage for get_alerts" + ret = returns[0] - assert ret.status == "success", f"Unexpected status: {ret.status}" - # make sure there's at least some payload - assert len(ret.tool_return.strip()) >= 10, f"Expected at least 10 characters in tool_return, got {len(ret.tool_return.strip())}" + assert ret.status == "success", f"Unexpected status: {ret.status}" + # make sure there's at least some payload + assert len(ret.tool_return.strip()) >= 10, f"Expected at least 10 characters in tool_return, got {len(ret.tool_return.strip())}" + finally: + client.tools.delete_mcp_server(mcp_server_name=mcp_server_name) + assert mcp_server_name not in client.tools.list_mcp_servers() # Optional OpenAI validation test for MCP-normalized schema @@ -321,8 +373,5 @@ async def test_streamable_http_mcp_server_update_schema_no_docstring_required(cl letta_tool_2 = client.tools.add_mcp_tool(mcp_server_name=mcp_server_name, mcp_tool_name=ask_question_tool.name) assert letta_tool_2 is not None finally: - # Best-effort cleanup (DELETE by name) - try: - requests.delete(f"{server_url}/v1/tools/mcp/servers/{mcp_server_name}") - except Exception: - pass + client.tools.delete_mcp_server(mcp_server_name=mcp_server_name) + assert mcp_server_name not in client.tools.list_mcp_servers()