test(include_pings): Add long-running tests (#3899)

This commit is contained in:
Kian Jones
2025-08-14 18:13:47 -07:00
committed by GitHub
parent 8b8536c3ef
commit 8aceaf677b
3 changed files with 156 additions and 4 deletions

View File

@@ -1,7 +1,10 @@
import functools
import os
import time
from typing import Optional, Union
import requests
from letta.functions.functions import parse_source_code
from letta.functions.schema_generator import generate_schema
from letta.schemas.agent import AgentState, CreateAgent, UpdateAgent
@@ -11,6 +14,7 @@ from letta.schemas.memory import ContextWindowOverview
from letta.schemas.tool import Tool
from letta.schemas.user import User
from letta.schemas.user import User as PydanticUser
from letta.server.rest_api.routers.v1.agents import ImportedAgentsResponse
from letta.server.server import SyncServer
@@ -248,3 +252,29 @@ def validate_context_window_overview(
# Check for tools
assert overview.num_tokens_functions_definitions > 0
assert len(overview.functions_definitions) > 0
def upload_test_agentfile_from_disk(server_url: str, filename: str) -> ImportedAgentsResponse:
"""
Upload a given .af file to live FastAPI server.
"""
path_to_current_file = os.path.dirname(__file__)
path_to_test_agent_files = path_to_current_file.removesuffix("/helpers") + "/test_agent_files"
file_path = os.path.join(path_to_test_agent_files, filename)
with open(file_path, "rb") as f:
files = {"file": (filename, f, "application/json")}
# Send parameters as form data instead of query parameters
form_data = {
"append_copy_suffix": "true",
"override_existing_tools": "false",
}
response = requests.post(
f"{server_url}/v1/agents/import",
headers={"user_id": ""},
files=files,
data=form_data, # Send as form data
)
return ImportedAgentsResponse(**response.json())

View File

@@ -10,6 +10,8 @@ from dotenv import load_dotenv
from letta_client import AsyncLetta, MessageCreate, ReasoningMessage, ToolCallMessage
from letta_client.core import RequestOptions
from tests.helpers.utils import upload_test_agentfile_from_disk
REASONING_THROTTLE_MS = 100
TEST_USER_MESSAGE = "What products or services does 11x AI sell?"
@@ -19,7 +21,7 @@ def server_url() -> str:
"""
Provides the URL for the Letta server.
If LETTA_SERVER_URL is not set, starts the server in a background thread
and polls until its accepting connections.
and polls until it's accepting connections.
"""
def _run_server() -> None:
@@ -60,12 +62,11 @@ def client(server_url: str):
yield async_client_instance
async def test_pinecone_tool(client: AsyncLetta) -> None:
async def test_pinecone_tool(client: AsyncLetta, server_url: str) -> None:
"""
Test the Pinecone tool integration with the Letta client.
"""
with open("../../scripts/test-afs/knowledge-base.af", "rb") as f:
response = await client.agents.import_file(file=f)
response = upload_test_agentfile_from_disk(server_url, "knowledge-base.af")
agent_id = response.agent_ids[0]

View File

@@ -0,0 +1,121 @@
import os
import threading
import time
import httpx
import pytest
import requests
from dotenv import load_dotenv
from letta_client import Letta, MessageCreate, TextContent
from tests.helpers.utils import upload_test_agentfile_from_disk
RESEARCH_INSTRUCTIONS = "\n Lead Name: Kian Jones\n Lead Title: Software Engineer\n Lead LinkedIn URL: https://www.linkedin.com/in/kian-jones\n Company Name: Letta\n Company Domain: letta.com\n Company Industry: technology/software/ai\n \n**Research Instructions**\n"
DEEP_RESEARCH_INSTRUCTIONS = "Let's get started, we have to research mantis shrimps. I need to know everything there is, or my grandmother will die. Please begin immediately and do a great job, they are scaring me."
@pytest.fixture(scope="module")
def server_url() -> str:
"""
Provides the URL for the Letta server.
If LETTA_SERVER_URL is not set, starts the server in a background thread
and polls until it's accepting connections.
"""
def _run_server() -> None:
load_dotenv()
from letta.server.rest_api.app import start_server
start_server(debug=True)
url: str = os.getenv("LETTA_SERVER_URL", "http://localhost:8283")
if not os.getenv("LETTA_SERVER_URL"):
thread = threading.Thread(target=_run_server, daemon=True)
thread.start()
# Poll until the server is up (or timeout)
timeout_seconds = 30
deadline = time.time() + timeout_seconds
while time.time() < deadline:
try:
resp = requests.get(url + "/v1/health")
if resp.status_code < 500:
break
except requests.exceptions.RequestException:
pass
time.sleep(0.1)
else:
raise RuntimeError(f"Could not reach {url} within {timeout_seconds}s")
return url
@pytest.fixture(scope="module")
def client(server_url: str) -> Letta:
"""
Creates and returns a synchronous Letta REST client for testing.
"""
client_instance = Letta(base_url=server_url)
return client_instance
async def test_deep_research_agent(client, server_url, disable_e2b_api_key):
imported_af = upload_test_agentfile_from_disk(server_url, "deep-thought.af")
agent_id = imported_af.agent_ids[0]
try:
response = client.agents.messages.create_stream(
agent_id=agent_id,
stream_tokens=True,
messages=[
MessageCreate(
role="user",
content=[
TextContent(
text=DEEP_RESEARCH_INSTRUCTIONS,
)
],
)
],
)
for chunk in response:
if chunk.message_type is not None:
print(chunk)
except httpx.ReadTimeout as e:
print("Timeout on create_stream. Consider enabling pings in create_stream if you have long running agents. ", e)
assert False
finally:
client.agents.delete(agent_id=agent_id)
async def test_11x_agent(client, server_url, disable_e2b_api_key):
imported_af = upload_test_agentfile_from_disk(server_url, "mock_alice.af")
agent_id = imported_af.agent_ids[0]
try:
response = client.agents.messages.create_stream(
agent_id=agent_id,
include_pings=True,
stream_tokens=True,
messages=[
MessageCreate(
role="user",
content=[
TextContent(
text=RESEARCH_INSTRUCTIONS,
)
],
)
],
)
for chunk in response:
if chunk.message_type is not None:
print(chunk)
except httpx.ReadTimeout as e:
print("Timeout on create_stream. Consider enabling pings in create_stream if you have long running agents. ", e)
assert False
finally:
client.agents.delete(agent_id=agent_id)