186 lines
5.6 KiB
Python
186 lines
5.6 KiB
Python
import asyncio
|
|
import logging
|
|
import os
|
|
import threading
|
|
import time
|
|
import uuid
|
|
from pathlib import Path
|
|
|
|
import matplotlib.pyplot as plt
|
|
import numpy as np
|
|
import pytest
|
|
from dotenv import load_dotenv
|
|
from faker import Faker
|
|
from letta_client import AsyncLetta
|
|
from tqdm import tqdm
|
|
|
|
from letta.schemas.embedding_config import EmbeddingConfig
|
|
from letta.schemas.llm_config import LLMConfig
|
|
|
|
logging.getLogger("httpx").setLevel(logging.WARNING)
|
|
logging.getLogger("httpcore").setLevel(logging.WARNING)
|
|
|
|
|
|
# --- Server Management --- #
|
|
|
|
|
|
def _run_server():
|
|
"""Starts the Letta server in a background thread."""
|
|
load_dotenv()
|
|
from letta.server.rest_api.app import start_server
|
|
|
|
start_server(debug=True)
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def server_url():
|
|
"""Ensures a server is running and returns its base URL."""
|
|
url = os.getenv("LETTA_SERVER_URL", "http://localhost:8283")
|
|
|
|
if not os.getenv("LETTA_SERVER_URL"):
|
|
thread = threading.Thread(target=_run_server, daemon=True)
|
|
thread.start()
|
|
time.sleep(2) # Allow server startup time
|
|
|
|
return url
|
|
|
|
|
|
# --- Client Setup --- #
|
|
|
|
|
|
@pytest.fixture(scope="session")
|
|
def client(server_url):
|
|
"""Creates a REST client for testing."""
|
|
client = AsyncLetta(base_url=server_url)
|
|
yield client
|
|
|
|
|
|
# --- Load Test --- #
|
|
|
|
NUM_AGENTS = 30
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_insert_archival_memories_concurrent(client):
|
|
fake = Faker()
|
|
|
|
# 1) Create agents
|
|
agent_ids = []
|
|
for i in tqdm(range(NUM_AGENTS), desc="Creating agents"):
|
|
agent = await client.agents.create(
|
|
name=f"complex_agent_{i}_{uuid.uuid4().hex[:6]}",
|
|
include_base_tools=True,
|
|
memory_blocks=[
|
|
{"label": "human", "value": "Name: Matt"},
|
|
{"label": "persona", "value": "Friendly agent"},
|
|
],
|
|
llm_config=LLMConfig.default_config("gpt-4o-mini"),
|
|
embedding_config=EmbeddingConfig.default_config(provider="openai"),
|
|
)
|
|
agent_ids.append(agent.id)
|
|
|
|
# 2) Measure start and duration of each call
|
|
timeline = []
|
|
|
|
async def measure(agent_index, aid):
|
|
t0 = time.perf_counter()
|
|
await client.agents.passages.create(agent_id=aid, text=fake.paragraph())
|
|
t1 = time.perf_counter()
|
|
timeline.append((agent_index, t0, t1 - t0))
|
|
|
|
await asyncio.gather(*(measure(idx, aid) for idx, aid in enumerate(agent_ids)))
|
|
|
|
# 3) Convert to arrays
|
|
timeline.sort(key=lambda x: x[0])
|
|
indices = np.array([t[0] for t in timeline])
|
|
starts = np.array([t[1] for t in timeline])
|
|
durs = np.array([t[2] for t in timeline])
|
|
start_offset = starts - starts.min()
|
|
|
|
print(f"Latency stats (s): min={durs.min():.3f}, mean={durs.mean():.3f}, " f"max={durs.max():.3f}, std={durs.std():.3f}")
|
|
|
|
# 4) Generate improved plots
|
|
# Helper: concurrency over time
|
|
events = np.concatenate([np.column_stack([starts, np.ones_like(starts)]), np.column_stack([starts + durs, -np.ones_like(durs)])])
|
|
events = events[events[:, 0].argsort()]
|
|
concurrency_t = np.cumsum(events[:, 1])
|
|
concurrency_x = events[:, 0] - starts.min()
|
|
|
|
# Helper: latency CDF
|
|
durs_sorted = np.sort(durs)
|
|
cdf_y = np.arange(1, len(durs_sorted) + 1) / len(durs_sorted)
|
|
|
|
# Plot all 6 subplots
|
|
fig, axes = plt.subplots(2, 3, figsize=(15, 8))
|
|
axs = axes.ravel()
|
|
|
|
# 1) Kickoff timeline
|
|
axs[0].scatter(indices, start_offset, s=15)
|
|
axs[0].set_title("Kick-off timeline")
|
|
axs[0].set_xlabel("Call index")
|
|
axs[0].set_ylabel("Start offset (s)")
|
|
|
|
# 2) Per-call latency
|
|
axs[1].plot(indices, durs, marker="o", linestyle="")
|
|
axs[1].set_title("Per-call latency")
|
|
axs[1].set_xlabel("Call index")
|
|
axs[1].set_ylabel("Duration (s)")
|
|
|
|
# 3) Latency distribution (histogram)
|
|
axs[2].hist(durs, bins="auto")
|
|
axs[2].set_title("Latency distribution")
|
|
axs[2].set_xlabel("Duration (s)")
|
|
axs[2].set_ylabel("Count")
|
|
|
|
# 4) Empirical CDF
|
|
axs[3].step(durs_sorted, cdf_y, where="post")
|
|
axs[3].set_title("Latency CDF")
|
|
axs[3].set_xlabel("Duration (s)")
|
|
axs[3].set_ylabel("Fraction ≤ x")
|
|
|
|
# 5) Concurrency over time
|
|
axs[4].step(concurrency_x, concurrency_t, where="post")
|
|
axs[4].set_title("Concurrency vs. time")
|
|
axs[4].set_xlabel("Time since first start (s)")
|
|
axs[4].set_ylabel("# in-flight")
|
|
|
|
# 6) Summary stats
|
|
axs[5].axis("off")
|
|
summary_text = (
|
|
f"n = {len(durs)}\n"
|
|
f"min = {durs.min():.3f} s\n"
|
|
f"p50 = {np.percentile(durs, 50):.3f} s\n"
|
|
f"mean = {durs.mean():.3f} s\n"
|
|
f"p95 = {np.percentile(durs, 95):.3f} s\n"
|
|
f"max = {durs.max():.3f} s\n"
|
|
f"stdev = {durs.std():.3f} s"
|
|
)
|
|
axs[5].text(0.02, 0.98, summary_text, va="top", ha="left", fontsize=11, family="monospace", transform=axs[5].transAxes)
|
|
|
|
plt.tight_layout()
|
|
plt.savefig("latency_diagnostics.png", dpi=150)
|
|
print("Saved latency_diagnostics.png")
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_insert_large_archival_memory(client):
|
|
# 1) Create 30 agents
|
|
agent = await client.agents.create(
|
|
include_base_tools=True,
|
|
memory_blocks=[
|
|
{"label": "human", "value": "Name: Matt"},
|
|
{"label": "persona", "value": "Friendly agent"},
|
|
],
|
|
llm_config=LLMConfig.default_config("gpt-4o-mini"),
|
|
embedding_config=EmbeddingConfig.default_config(provider="openai"),
|
|
)
|
|
|
|
file_path = Path(__file__).parent / "data" / "paper1.txt"
|
|
text = file_path.read_text()
|
|
|
|
t0 = time.perf_counter()
|
|
await client.agents.passages.create(agent_id=agent.id, text=text)
|
|
t1 = time.perf_counter()
|
|
|
|
print(f"Total time: {t1-t0}")
|