fix: test_mcp.py tests [LET-4075]

* fix tests

* update venv cleanup
This commit is contained in:
jnjpng
2025-08-27 15:04:47 -07:00
committed by GitHub
parent 2b23ec7dff
commit a4789aa8f4

View File

@@ -1,13 +1,14 @@
import json
import os
import shutil
import subprocess
import threading
import time
import uuid
import venv
from pathlib import Path
import pytest
import requests
from dotenv import load_dotenv
from letta_client import Letta, McpTool, ToolCallMessage, ToolReturnMessage
@@ -19,7 +20,7 @@ from letta.schemas.message import MessageCreate
from tests.utils import wait_for_server
def create_virtualenv_and_install_requirements(requirements_path: Path, name="venv") -> Path:
def create_virtualenv_and_install_requirements(requirements_path: Path, name="venv", force_recreate=True) -> Path:
requirements_path = requirements_path.resolve()
if not requirements_path.exists():
@@ -29,16 +30,41 @@ def create_virtualenv_and_install_requirements(requirements_path: Path, name="ve
venv_dir = requirements_path.parent / name
# Always clean up existing venv if force_recreate is True (default)
# This prevents corruption issues
if venv_dir.exists() and force_recreate:
try:
shutil.rmtree(venv_dir)
print(f"Cleaned up existing venv at {venv_dir}")
except Exception as e:
print(f"Warning: Failed to remove existing venv: {e}")
# Continue anyway, might still work
# Create fresh venv
if not venv_dir.exists():
venv.EnvBuilder(with_pip=True).create(venv_dir)
venv.EnvBuilder(with_pip=True, clear=True).create(venv_dir)
pip_path = venv_dir / ("Scripts" if os.name == "nt" else "bin") / "pip"
# Wait a moment for venv creation to complete
for _ in range(10):
if pip_path.exists():
break
time.sleep(0.1)
if not pip_path.exists():
raise FileNotFoundError(f"pip executable not found at: {pip_path}")
try:
# Upgrade pip first to avoid potential issues
subprocess.check_call([str(pip_path), "install", "--upgrade", "pip"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
# Install requirements
subprocess.check_call([str(pip_path), "install", "-r", str(requirements_path)])
except subprocess.CalledProcessError as exc:
# On failure, try to clean up and recreate once more
if not force_recreate: # Avoid infinite recursion
print(f"Initial pip install failed, attempting clean recreation...")
return create_virtualenv_and_install_requirements(requirements_path, name, force_recreate=False)
raise RuntimeError(f"pip install failed with exit code {exc.returncode}")
return venv_dir
@@ -63,6 +89,24 @@ def empty_mcp_config():
return path
@pytest.fixture(autouse=True)
def cleanup_test_venvs():
"""Fixture to clean up test virtual environments before and after tests."""
venv_path = Path(__file__).parent / "weather" / "venv"
# Clean before test (in case of previous failure)
if venv_path.exists():
try:
shutil.rmtree(venv_path)
except Exception:
pass # Ignore errors during cleanup
yield # Run the test
# Note: We don't clean after test to allow debugging if needed
# The next test run will clean it anyway
@pytest.fixture()
def server_url(empty_mcp_config):
"""Ensures a server is running and returns its base URL."""
@@ -108,55 +152,59 @@ def agent_state(client):
@pytest.mark.asyncio
async def test_sse_mcp_server(client, agent_state):
mcp_server_name = "deepwiki"
server_url = "https://mcp.deepwiki.com/sse"
sse_mcp_config = SSEServerConfig(server_name=mcp_server_name, server_url=server_url)
client.tools.add_mcp_server(request=sse_mcp_config)
try:
mcp_server_name = "deepwiki"
server_url = "https://mcp.deepwiki.com/sse"
sse_mcp_config = SSEServerConfig(server_name=mcp_server_name, server_url=server_url)
client.tools.add_mcp_server(request=sse_mcp_config)
# Check that it's in the server mapping
mcp_server_mapping = client.tools.list_mcp_servers()
assert mcp_server_name in mcp_server_mapping
# Check that it's in the server mapping
mcp_server_mapping = client.tools.list_mcp_servers()
assert mcp_server_name in mcp_server_mapping
# Check tools
tools = client.tools.list_mcp_tools_by_server(mcp_server_name=mcp_server_name)
assert len(tools) > 0
assert isinstance(tools[0], McpTool)
# Check tools
tools = client.tools.list_mcp_tools_by_server(mcp_server_name=mcp_server_name)
assert len(tools) > 0
assert isinstance(tools[0], McpTool)
# Test with the ask_question tool which is one of the available deepwiki tools
ask_question_tool = next((t for t in tools if t.name == "ask_question"), None)
assert ask_question_tool is not None, f"ask_question tool not found. Available tools: {[t.name for t in tools]}"
# Test with the ask_question tool which is one of the available deepwiki tools
ask_question_tool = next((t for t in tools if t.name == "ask_question"), None)
assert ask_question_tool is not None, f"ask_question tool not found. Available tools: {[t.name for t in tools]}"
# Check that the tool is executable
letta_tool = client.tools.add_mcp_tool(mcp_server_name=mcp_server_name, mcp_tool_name=ask_question_tool.name)
# Check that the tool is executable
letta_tool = client.tools.add_mcp_tool(mcp_server_name=mcp_server_name, mcp_tool_name=ask_question_tool.name)
tool_args = {"repoName": "facebook/react", "question": "What is React?"}
tool_args = {"repoName": "facebook/react", "question": "What is React?"}
# Add to agent, have agent invoke tool
client.agents.tools.attach(agent_id=agent_state.id, tool_id=letta_tool.id)
response = client.agents.messages.create(
agent_id=agent_state.id,
messages=[
MessageCreate(
role="user",
content=[TextContent(text=f"Use the `{letta_tool.name}` tool with these arguments: {tool_args}.")],
)
],
)
seq = response.messages
calls = [m for m in seq if isinstance(m, ToolCallMessage)]
assert calls, "Expected a ToolCallMessage"
assert calls[0].tool_call.name == "ask_question"
# Add to agent, have agent invoke tool
client.agents.tools.attach(agent_id=agent_state.id, tool_id=letta_tool.id)
response = client.agents.messages.create(
agent_id=agent_state.id,
messages=[
MessageCreate(
role="user",
content=[TextContent(text=f"Use the `{letta_tool.name}` tool with these arguments: {tool_args}.")],
)
],
)
seq = response.messages
calls = [m for m in seq if isinstance(m, ToolCallMessage)]
assert calls, "Expected a ToolCallMessage"
assert calls[0].tool_call.name == "ask_question"
returns = [m for m in seq if isinstance(m, ToolReturnMessage)]
assert returns, "Expected a ToolReturnMessage"
tr = returns[0]
# status field
assert tr.status == "success", f"Bad status: {tr.status}"
# Check that we got some content back
assert len(tr.tool_return.strip()) > 0, f"Expected non-empty tool return, got: {tr.tool_return}"
returns = [m for m in seq if isinstance(m, ToolReturnMessage)]
assert returns, "Expected a ToolReturnMessage"
tr = returns[0]
# status field
assert tr.status == "success", f"Bad status: {tr.status}"
# Check that we got some content back
assert len(tr.tool_return.strip()) > 0, f"Expected non-empty tool return, got: {tr.tool_return}"
finally:
client.tools.delete_mcp_server(mcp_server_name=mcp_server_name)
assert mcp_server_name not in client.tools.list_mcp_servers()
def test_stdio_mcp_server(client, agent_state):
def test_stdio_mcp_server(client, agent_state, server_url):
req_file = Path(__file__).parent / "weather" / "requirements.txt"
create_virtualenv_and_install_requirements(req_file, name="venv")
@@ -170,44 +218,48 @@ def test_stdio_mcp_server(client, agent_state):
args=args,
)
client.tools.add_mcp_server(request=stdio_config)
try:
client.tools.add_mcp_server(request=stdio_config)
servers = client.tools.list_mcp_servers()
assert mcp_server_name in servers
servers = client.tools.list_mcp_servers()
assert mcp_server_name in servers
tools = client.tools.list_mcp_tools_by_server(mcp_server_name=mcp_server_name)
assert tools, "Expected at least one tool from the weather MCP server"
assert any(t.name == "get_alerts" for t in tools), f"Got: {[t.name for t in tools]}"
tools = client.tools.list_mcp_tools_by_server(mcp_server_name=mcp_server_name)
assert tools, "Expected at least one tool from the weather MCP server"
assert any(t.name == "get_alerts" for t in tools), f"Got: {[t.name for t in tools]}"
get_alerts = next(t for t in tools if t.name == "get_alerts")
get_alerts = next(t for t in tools if t.name == "get_alerts")
letta_tool = client.tools.add_mcp_tool(
mcp_server_name=mcp_server_name,
mcp_tool_name=get_alerts.name,
)
letta_tool = client.tools.add_mcp_tool(
mcp_server_name=mcp_server_name,
mcp_tool_name=get_alerts.name,
)
client.agents.tools.attach(agent_id=agent_state.id, tool_id=letta_tool.id)
client.agents.tools.attach(agent_id=agent_state.id, tool_id=letta_tool.id)
response = client.agents.messages.create(
agent_id=agent_state.id,
messages=[
MessageCreate(
role="user",
content=[TextContent(text=(f"Use the `{letta_tool.name}` tool with these arguments: {{'state': 'CA'}}."))],
)
],
)
response = client.agents.messages.create(
agent_id=agent_state.id,
messages=[
MessageCreate(
role="user",
content=[TextContent(text=(f"Use the `{letta_tool.name}` tool with these arguments: {{'state': 'CA'}}."))],
)
],
)
calls = [m for m in response.messages if isinstance(m, ToolCallMessage) and m.tool_call.name == "get_alerts"]
assert calls, "Expected a get_alerts ToolCallMessage"
calls = [m for m in response.messages if isinstance(m, ToolCallMessage) and m.tool_call.name == "get_alerts"]
assert calls, "Expected a get_alerts ToolCallMessage"
returns = [m for m in response.messages if isinstance(m, ToolReturnMessage) and m.tool_call_id == calls[0].tool_call.tool_call_id]
assert returns, "Expected a ToolReturnMessage for get_alerts"
ret = returns[0]
returns = [m for m in response.messages if isinstance(m, ToolReturnMessage) and m.tool_call_id == calls[0].tool_call.tool_call_id]
assert returns, "Expected a ToolReturnMessage for get_alerts"
ret = returns[0]
assert ret.status == "success", f"Unexpected status: {ret.status}"
# make sure there's at least some payload
assert len(ret.tool_return.strip()) >= 10, f"Expected at least 10 characters in tool_return, got {len(ret.tool_return.strip())}"
assert ret.status == "success", f"Unexpected status: {ret.status}"
# make sure there's at least some payload
assert len(ret.tool_return.strip()) >= 10, f"Expected at least 10 characters in tool_return, got {len(ret.tool_return.strip())}"
finally:
client.tools.delete_mcp_server(mcp_server_name=mcp_server_name)
assert mcp_server_name not in client.tools.list_mcp_servers()
# Optional OpenAI validation test for MCP-normalized schema
@@ -321,8 +373,5 @@ async def test_streamable_http_mcp_server_update_schema_no_docstring_required(cl
letta_tool_2 = client.tools.add_mcp_tool(mcp_server_name=mcp_server_name, mcp_tool_name=ask_question_tool.name)
assert letta_tool_2 is not None
finally:
# Best-effort cleanup (DELETE by name)
try:
requests.delete(f"{server_url}/v1/tools/mcp/servers/{mcp_server_name}")
except Exception:
pass
client.tools.delete_mcp_server(mcp_server_name=mcp_server_name)
assert mcp_server_name not in client.tools.list_mcp_servers()