def upload_test_agentfile_from_disk(server_url: str, filename: str) -> ImportedAgentsResponse:
    """
    Upload a .af agent file from tests/test_agent_files to a live FastAPI server.

    Args:
        server_url: Base URL of the running Letta server (e.g. ``http://localhost:8283``).
        filename: Name of the .af file inside ``tests/test_agent_files``.

    Returns:
        The parsed ``ImportedAgentsResponse`` from the ``/v1/agents/import`` endpoint.

    Raises:
        requests.HTTPError: If the server rejects the upload.
    """
    # Resolve tests/test_agent_files relative to this file (tests/helpers/utils.py).
    # Using dirname twice instead of removesuffix("/helpers") keeps this working
    # on Windows, where the path separator is a backslash.
    tests_dir = os.path.dirname(os.path.dirname(__file__))
    file_path = os.path.join(tests_dir, "test_agent_files", filename)

    with open(file_path, "rb") as f:
        files = {"file": (filename, f, "application/json")}

        # This endpoint expects the parameters as form data, not query parameters.
        form_data = {
            "append_copy_suffix": "true",
            "override_existing_tools": "false",
        }

        # Post while the file handle is still open so requests can read it.
        response = requests.post(
            f"{server_url}/v1/agents/import",
            headers={"user_id": ""},
            files=files,
            data=form_data,
        )

    # Fail fast with a clear HTTP error rather than a confusing pydantic
    # validation error when the server returns a non-2xx response.
    response.raise_for_status()
    return ImportedAgentsResponse(**response.json())
""" - with open("../../scripts/test-afs/knowledge-base.af", "rb") as f: - response = await client.agents.import_file(file=f) + response = upload_test_agentfile_from_disk(server_url, "knowledge-base.af") agent_id = response.agent_ids[0] diff --git a/tests/test_long_running_agents.py b/tests/test_long_running_agents.py new file mode 100644 index 00000000..04f0b9d6 --- /dev/null +++ b/tests/test_long_running_agents.py @@ -0,0 +1,121 @@ +import os +import threading +import time + +import httpx +import pytest +import requests +from dotenv import load_dotenv +from letta_client import Letta, MessageCreate, TextContent + +from tests.helpers.utils import upload_test_agentfile_from_disk + +RESEARCH_INSTRUCTIONS = "\n Lead Name: Kian Jones\n Lead Title: Software Engineer\n Lead LinkedIn URL: https://www.linkedin.com/in/kian-jones\n Company Name: Letta\n Company Domain: letta.com\n Company Industry: technology/software/ai\n \n**Research Instructions**\n" +DEEP_RESEARCH_INSTRUCTIONS = "Let's get started, we have to research mantis shrimps. I need to know everything there is, or my grandmother will die. Please begin immediately and do a great job, they are scaring me." + + +@pytest.fixture(scope="module") +def server_url() -> str: + """ + Provides the URL for the Letta server. + If LETTA_SERVER_URL is not set, starts the server in a background thread + and polls until it's accepting connections. 
@pytest.fixture(scope="module")
def client(server_url: str) -> Letta:
    """Create a synchronous Letta REST client pointed at the test server."""
    return Letta(base_url=server_url)


async def test_deep_research_agent(client, server_url, disable_e2b_api_key):
    """Import the deep-thought agent file and stream a long-running research task."""
    imported_af = upload_test_agentfile_from_disk(server_url, "deep-thought.af")
    agent_id = imported_af.agent_ids[0]

    try:
        response = client.agents.messages.create_stream(
            agent_id=agent_id,
            stream_tokens=True,
            messages=[
                MessageCreate(
                    role="user",
                    content=[
                        TextContent(
                            text=DEEP_RESEARCH_INSTRUCTIONS,
                        )
                    ],
                )
            ],
        )
        # Drain the stream; echo typed chunks for debugging.
        for chunk in response:
            if chunk.message_type is not None:
                print(chunk)
    except httpx.ReadTimeout as e:
        # pytest.fail is the idiomatic replacement for print(...) + assert False:
        # the message lands in the failure report, and it is not stripped
        # when Python runs with -O (plain asserts are).
        pytest.fail(
            "Timeout on create_stream. Consider enabling pings in create_stream "
            f"if you have long running agents. {e}"
        )
    finally:
        # Always clean up the imported agent, even on failure.
        client.agents.delete(agent_id=agent_id)
async def test_11x_agent(client, server_url, disable_e2b_api_key):
    """Import the mock Alice agent file and stream a lead-research task with pings enabled."""
    imported_af = upload_test_agentfile_from_disk(server_url, "mock_alice.af")
    agent_id = imported_af.agent_ids[0]

    try:
        response = client.agents.messages.create_stream(
            agent_id=agent_id,
            # Keep-alive pings prevent read timeouts during long agent runs.
            include_pings=True,
            stream_tokens=True,
            messages=[
                MessageCreate(
                    role="user",
                    content=[
                        TextContent(
                            text=RESEARCH_INSTRUCTIONS,
                        )
                    ],
                )
            ],
        )
        # Drain the stream; echo typed chunks for debugging.
        for chunk in response:
            if chunk.message_type is not None:
                print(chunk)
    except httpx.ReadTimeout as e:
        # pytest.fail is the idiomatic replacement for print(...) + assert False:
        # the message lands in the failure report, and it is not stripped
        # when Python runs with -O (plain asserts are).
        pytest.fail(
            "Timeout on create_stream. Consider enabling pings in create_stream "
            f"if you have long running agents. {e}"
        )
    finally:
        # Always clean up the imported agent, even on failure.
        client.agents.delete(agent_id=agent_id)