test(include_pings): Add long-running tests (#3899)

This commit is contained in:
Kian Jones
2025-08-14 18:13:47 -07:00
committed by GitHub
parent 711c22ec0e
commit 6731f769ca
6 changed files with 1669 additions and 4 deletions

View File

@@ -1,7 +1,10 @@
import functools
import os
import time
from typing import Optional, Union
import requests
from letta.functions.functions import parse_source_code
from letta.functions.schema_generator import generate_schema
from letta.schemas.agent import AgentState, CreateAgent, UpdateAgent
@@ -11,6 +14,7 @@ from letta.schemas.memory import ContextWindowOverview
from letta.schemas.tool import Tool
from letta.schemas.user import User
from letta.schemas.user import User as PydanticUser
from letta.server.rest_api.routers.v1.agents import ImportedAgentsResponse
from letta.server.server import SyncServer
@@ -248,3 +252,29 @@ def validate_context_window_overview(
# Check for tools
assert overview.num_tokens_functions_definitions > 0
assert len(overview.functions_definitions) > 0
def upload_test_agentfile_from_disk(server_url: str, filename: str) -> ImportedAgentsResponse:
"""
Upload a given .af file to live FastAPI server.
"""
path_to_current_file = os.path.dirname(__file__)
path_to_test_agent_files = path_to_current_file.removesuffix("/helpers") + "/test_agent_files"
file_path = os.path.join(path_to_test_agent_files, filename)
with open(file_path, "rb") as f:
files = {"file": (filename, f, "application/json")}
# Send parameters as form data instead of query parameters
form_data = {
"append_copy_suffix": "true",
"override_existing_tools": "false",
}
response = requests.post(
f"{server_url}/v1/agents/import",
headers={"user_id": ""},
files=files,
data=form_data, # Send as form data
)
return ImportedAgentsResponse(**response.json())

View File

@@ -10,6 +10,8 @@ from dotenv import load_dotenv
from letta_client import AsyncLetta, MessageCreate, ReasoningMessage, ToolCallMessage
from letta_client.core import RequestOptions
from tests.helpers.utils import upload_test_agentfile_from_disk
REASONING_THROTTLE_MS = 100
TEST_USER_MESSAGE = "What products or services does 11x AI sell?"
@@ -19,7 +21,7 @@ def server_url() -> str:
"""
Provides the URL for the Letta server.
If LETTA_SERVER_URL is not set, starts the server in a background thread
and polls until its accepting connections.
and polls until it's accepting connections.
"""
def _run_server() -> None:
@@ -60,12 +62,11 @@ def client(server_url: str):
yield async_client_instance
async def test_pinecone_tool(client: AsyncLetta) -> None:
async def test_pinecone_tool(client: AsyncLetta, server_url: str) -> None:
"""
Test the Pinecone tool integration with the Letta client.
"""
with open("../../scripts/test-afs/knowledge-base.af", "rb") as f:
response = await client.agents.import_file(file=f)
response = upload_test_agentfile_from_disk(server_url, "knowledge-base.af")
agent_id = response.agent_ids[0]

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,121 @@
import os
import threading
import time
import httpx
import pytest
import requests
from dotenv import load_dotenv
from letta_client import Letta, MessageCreate, TextContent
from tests.helpers.utils import upload_test_agentfile_from_disk
RESEARCH_INSTRUCTIONS = "\n Lead Name: Kian Jones\n Lead Title: Software Engineer\n Lead LinkedIn URL: https://www.linkedin.com/in/kian-jones\n Company Name: Letta\n Company Domain: letta.com\n Company Industry: technology/software/ai\n \n**Research Instructions**\n"
DEEP_RESEARCH_INSTRUCTIONS = "Let's get started, we have to research mantis shrimps. I need to know everything there is, or my grandmother will die. Please begin immediately and do a great job, they are scaring me."
@pytest.fixture(scope="module")
def server_url() -> str:
"""
Provides the URL for the Letta server.
If LETTA_SERVER_URL is not set, starts the server in a background thread
and polls until it's accepting connections.
"""
def _run_server() -> None:
load_dotenv()
from letta.server.rest_api.app import start_server
start_server(debug=True)
url: str = os.getenv("LETTA_SERVER_URL", "http://localhost:8283")
if not os.getenv("LETTA_SERVER_URL"):
thread = threading.Thread(target=_run_server, daemon=True)
thread.start()
# Poll until the server is up (or timeout)
timeout_seconds = 30
deadline = time.time() + timeout_seconds
while time.time() < deadline:
try:
resp = requests.get(url + "/v1/health")
if resp.status_code < 500:
break
except requests.exceptions.RequestException:
pass
time.sleep(0.1)
else:
raise RuntimeError(f"Could not reach {url} within {timeout_seconds}s")
return url
@pytest.fixture(scope="module")
def client(server_url: str) -> Letta:
"""
Creates and returns a synchronous Letta REST client for testing.
"""
client_instance = Letta(base_url=server_url)
return client_instance
async def test_deep_research_agent(client, server_url, disable_e2b_api_key):
imported_af = upload_test_agentfile_from_disk(server_url, "deep-thought.af")
agent_id = imported_af.agent_ids[0]
try:
response = client.agents.messages.create_stream(
agent_id=agent_id,
stream_tokens=True,
messages=[
MessageCreate(
role="user",
content=[
TextContent(
text=DEEP_RESEARCH_INSTRUCTIONS,
)
],
)
],
)
for chunk in response:
if chunk.message_type is not None:
print(chunk)
except httpx.ReadTimeout as e:
print("Timeout on create_stream. Consider enabling pings in create_stream if you have long running agents. ", e)
assert False
finally:
client.agents.delete(agent_id=agent_id)
async def test_11x_agent(client, server_url, disable_e2b_api_key):
imported_af = upload_test_agentfile_from_disk(server_url, "mock_alice.af")
agent_id = imported_af.agent_ids[0]
try:
response = client.agents.messages.create_stream(
agent_id=agent_id,
include_pings=True,
stream_tokens=True,
messages=[
MessageCreate(
role="user",
content=[
TextContent(
text=RESEARCH_INSTRUCTIONS,
)
],
)
],
)
for chunk in response:
if chunk.message_type is not None:
print(chunk)
except httpx.ReadTimeout as e:
print("Timeout on create_stream. Consider enabling pings in create_stream if you have long running agents. ", e)
assert False
finally:
client.agents.delete(agent_id=agent_id)