letta-server/tests/test_client.py
jnjpng ff69c6a32e feat: add /agents/{agent_id}/generate endpoint for direct LLM requests (#9272)
* feat: add /agents/{agent_id}/generate endpoint for direct LLM requests

Add new endpoint that makes direct LLM provider requests without agent
context, memory, tools, or state modification. This enables:
- Quick LLM queries without agent overhead
- Testing model configurations
- Simple chat completions using agent's credentials
- Comparing responses across different models

Features:
- Uses agent's LLM config by default
- Supports model override with full provider config resolution
- Non-streaming, stateless operation
- Proper error handling and validation
- Request/response schemas with Pydantic validation

Implementation:
- Add GenerateRequest and GenerateResponse schemas
- Implement generate_completion endpoint handler
- Add necessary imports (LLMError, LLMClient, HandleNotFoundError)
- Include logging and comprehensive error handling

* fix: improve error handling and fix Message construction

- Fix critical bug: use content=[TextContent(text=...)] instead of text=...
- Add explicit error handling for NoResultFound and HandleNotFoundError
- Add error handling for convert_response_to_chat_completion
- Add structured logging for debugging
- Remove unnecessary .get() calls since Pydantic validates messages
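The content-part fix described above can be sketched with stand-in dataclasses. These types only mirror the *shape* of Letta's `Message`/`TextContent` schemas (the real ones are Pydantic models in the Letta codebase); the point is that message text lives inside a list of content parts, not a bare `text=` field.

```python
from dataclasses import dataclass


# Stand-ins that mirror the shape of Letta's TextContent/Message models;
# the real classes are Pydantic models, not these dataclasses.
@dataclass
class TextContent:
    text: str
    type: str = "text"


@dataclass
class Message:
    role: str
    content: list  # a list of content parts, never a bare string


# Broken (the bug being fixed): Message(role="user", text="...") -- no `text` field exists.
# Fixed: wrap the text in a content-part list.
msg = Message(role="user", content=[TextContent(text="What is 2+2?")])
```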

* refactor: extract generate logic to AgentCompletionService

Move the generate endpoint business logic out of the endpoint handler
into a dedicated AgentCompletionService class for better code organization
and separation of concerns.

Changes:
- Create new AgentCompletionService in services/agent_completion_service.py
- Service handles all business logic: agent validation, LLM config resolution,
  message conversion, LLM client creation, and request/response processing
- Integrate service with SyncServer initialization
- Refactor generate_completion endpoint to use the service
- Endpoint now only handles HTTP concerns (auth, error mapping)

Benefits:
- Cleaner endpoint code (reduced from ~140 lines to ~25 lines)
- Better separation of concerns (HTTP vs business logic)
- Service logic can be reused or tested independently
- Follows established patterns in the codebase (AgentManager, etc.)
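The shape of that split can be sketched with stand-ins. Every name and exception below is illustrative, not Letta's actual code: the service owns business logic and raises domain errors, while the endpoint does nothing but translate results and errors into HTTP responses.

```python
class HandleNotFoundError(Exception):
    """Stand-in for a service-level 'unknown agent / model handle' error."""


class AgentCompletionService:
    """Stand-in service: owns business logic, knows nothing about HTTP."""

    def generate(self, agent_id: str, prompt: str) -> dict:
        # The real service would validate the agent, resolve the LLM config,
        # and call the provider; here we just simulate success/failure.
        if not agent_id.startswith("agent-"):
            raise HandleNotFoundError(agent_id)
        return {"content": f"echo: {prompt}", "model": "stub-model"}


def generate_endpoint(service: AgentCompletionService, agent_id: str, prompt: str) -> tuple[int, dict]:
    """Stand-in endpoint: only HTTP concerns (here, error-to-status mapping)."""
    try:
        return 200, service.generate(agent_id, prompt)
    except HandleNotFoundError as exc:
        return 404, {"error": f"not found: {exc}"}


status, body = generate_endpoint(AgentCompletionService(), "agent-123", "What is 2+2?")
```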

* feat: simplify generate API to accept just prompt text

Simplify the client interface by accepting a simple prompt string instead
of requiring clients to format messages.

Changes:
- Update GenerateRequest schema:
  - Replace 'messages' array with simple 'prompt' string
  - Add optional 'system_prompt' for context/instructions
  - Keep 'override_model' for model selection
- Update AgentCompletionService to format messages automatically:
  - Accepts prompt and optional system_prompt
  - Constructs message array internally (system + user messages)
  - Simpler API surface for clients
- Update endpoint documentation with new simplified examples
- Regenerate OpenAPI spec and TypeScript SDK

Benefits:
- Much simpler client experience - just send text
- No need to understand message formatting
- Still supports system prompts for context
- Cleaner API that matches common use cases

Example (before):
{
  "messages": [{"role": "user", "content": "What is 2+2?"}]
}

Example (after):
{
  "prompt": "What is 2+2?"
}

* test: add comprehensive integration tests for generate endpoint

Add 9 integration tests covering various scenarios:

Happy path tests:
- test_agent_generate_basic: Basic prompt -> response flow
- test_agent_generate_with_system_prompt: System prompt + user prompt
- test_agent_generate_with_model_override: Override model selection
- test_agent_generate_long_prompt: Handle longer prompts
- test_agent_generate_no_persistence: Verify no messages saved to agent

Error handling tests:
- test_agent_generate_empty_prompt_error: Empty prompt validation (422)
- test_agent_generate_invalid_agent_id: Invalid agent ID (404)
- test_agent_generate_invalid_model_override: Invalid model handle (404)

All tests verify:
- Response structure (content, model, usage)
- Proper status codes for errors
- Usage statistics (tokens, counts)
- No side effects on agent state

Tests follow existing test patterns in test_client.py and use the
letta_client SDK (assuming generate_completion method is auto-generated
from the OpenAPI spec).

* openapi

* refactor: rename AgentCompletionService to AgentGenerateCompletionManager

Rename for better clarity and consistency with codebase naming conventions:
- Rename file: agent_completion_service.py → agent_generate_completion_manager.py
- Rename class: AgentCompletionService → AgentGenerateCompletionManager
- Rename attribute: server.agent_completion_service → server.agent_generate_completion_manager
- Update docstrings: 'Service' → 'Manager'

Changes:
- apps/core/letta/services/agent_generate_completion_manager.py (renamed + updated class)
- apps/core/letta/server/server.py (import + initialization)
- apps/core/letta/server/rest_api/routers/v1/agents.py (usage in endpoint)

No functional changes, purely a naming refactor.

* fix: remove invalid Message parameters in generate manager

Remove agent_id=None and user_id=None from Message construction.
The Message model doesn't accept these as None values - only pass
required parameters (role, content).

Fixes validation error:
  'Extra inputs are not permitted [type=extra_forbidden, input_value=None]'

This aligns with other Message construction patterns in the codebase
(see tools.py, memory.py examples).

* feat: improve generate endpoint validation and tests

- Add field validator for whitespace-only prompts
- Always include system message (required by Anthropic)
- Use default "You are a helpful assistant." when no system_prompt provided
- Update tests to use direct HTTP calls via httpx
- Fix test issues:
  - Use valid agent ID format (agent-{uuid})
  - Use available model (openai/gpt-4o-mini)
  - Add whitespace validation test
- All 9 integration tests passing
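The whitespace rule can be sketched as a plain function. Letta implements it as a Pydantic field validator on `GenerateRequest` (which FastAPI surfaces as a 422); this stand-in only shows the rule itself:

```python
def validate_prompt(prompt: str) -> str:
    """Reject empty or whitespace-only prompts, mirroring the API's 422 behavior."""
    if not prompt.strip():
        # In the real schema a Pydantic field validator raises ValueError here,
        # which the server turns into a 422 Unprocessable Entity response.
        raise ValueError("prompt must not be empty or whitespace-only")
    return prompt
```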
2026-02-24 10:52:06 -08:00


import json
import os
import threading
import uuid
from http.server import BaseHTTPRequestHandler, HTTPServer

import httpx
import pytest
from dotenv import load_dotenv
from letta_client import APIError, Letta
from letta_client.types import MessageCreateParam
from letta_client.types.agent_state import AgentState
from sqlalchemy import delete

from letta.orm import SandboxConfig, SandboxEnvironmentVariable
from tests.utils import wait_for_server

# Constants
SERVER_PORT = 8283
SANDBOX_DIR = "/tmp/sandbox"
UPDATED_SANDBOX_DIR = "/tmp/updated_sandbox"
ENV_VAR_KEY = "TEST_VAR"
UPDATED_ENV_VAR_KEY = "UPDATED_VAR"
ENV_VAR_VALUE = "test_value"
UPDATED_ENV_VAR_VALUE = "updated_value"
ENV_VAR_DESCRIPTION = "A test environment variable"


def run_server():
    load_dotenv()

    from letta.server.rest_api.app import start_server

    print("Starting server...")
    start_server(debug=True)


@pytest.fixture(
    scope="module",
)
def mock_openai_server():
    """Local mock for the OpenAI API used by tests.

    These tests should not require a real OPENAI_API_KEY.
    We still exercise the OpenAI embeddings codepath by serving a minimal subset of the API.
    """
    EMBED_DIM = 1536

    class Handler(BaseHTTPRequestHandler):
        def log_message(self, format, *args):
            # Silence noisy HTTP server logs during tests
            return

        def _send_json(self, status_code: int, payload: dict):
            body = json.dumps(payload).encode("utf-8")
            self.send_response(status_code)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def do_GET(self):  # noqa: N802
            # Support OpenAI model listing used during provider sync.
            if self.path in ("/v1/models", "/models"):
                self._send_json(
                    200,
                    {
                        "object": "list",
                        "data": [
                            {"id": "gpt-4o-mini", "object": "model", "context_length": 128000},
                            {"id": "gpt-4.1", "object": "model", "context_length": 128000},
                            {"id": "gpt-4o", "object": "model", "context_length": 128000},
                        ],
                    },
                )
                return
            self._send_json(404, {"error": {"message": f"Not found: {self.path}"}})

        def do_POST(self):  # noqa: N802
            # Support embeddings endpoint
            if self.path not in ("/v1/embeddings", "/embeddings"):
                self._send_json(404, {"error": {"message": f"Not found: {self.path}"}})
                return
            content_len = int(self.headers.get("Content-Length", "0"))
            raw = self.rfile.read(content_len) if content_len else b"{}"
            try:
                req = json.loads(raw.decode("utf-8"))
            except Exception:
                self._send_json(400, {"error": {"message": "Invalid JSON"}})
                return
            inputs = req.get("input", [])
            if isinstance(inputs, str):
                inputs = [inputs]
            if not isinstance(inputs, list):
                self._send_json(400, {"error": {"message": "'input' must be a string or list"}})
                return
            data = [{"object": "embedding", "index": i, "embedding": [0.0] * EMBED_DIM} for i in range(len(inputs))]
            self._send_json(
                200,
                {
                    "object": "list",
                    "data": data,
                    "model": req.get("model", "text-embedding-3-small"),
                    "usage": {"prompt_tokens": 0, "total_tokens": 0},
                },
            )

    # Bind to an ephemeral port
    server = HTTPServer(("127.0.0.1", 0), Handler)
    host, port = server.server_address
    base_url = f"http://{host}:{port}/v1"
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()

    # Ensure the Letta server uses this mock OpenAI endpoint.
    # We *override* values here because a developer's local .env may contain a stale key.
    prev_openai_api_key = os.environ.get("OPENAI_API_KEY")
    prev_openai_base_url = os.environ.get("OPENAI_BASE_URL")
    os.environ["OPENAI_API_KEY"] = "DUMMY_API_KEY"
    os.environ["OPENAI_BASE_URL"] = base_url

    yield base_url

    # Restore env
    if prev_openai_api_key is None:
        os.environ.pop("OPENAI_API_KEY", None)
    else:
        os.environ["OPENAI_API_KEY"] = prev_openai_api_key
    if prev_openai_base_url is None:
        os.environ.pop("OPENAI_BASE_URL", None)
    else:
        os.environ["OPENAI_BASE_URL"] = prev_openai_base_url
    server.shutdown()
    server.server_close()


@pytest.fixture(
    scope="module",
)
def client(request, mock_openai_server):
    # Get URL from environment or start server
    api_url = os.getenv("LETTA_API_URL")
    server_url = os.getenv("LETTA_SERVER_URL", f"http://localhost:{SERVER_PORT}")
    if not os.getenv("LETTA_SERVER_URL"):
        print("Starting server thread")
        thread = threading.Thread(target=run_server, daemon=True)
        thread.start()
        wait_for_server(server_url)
    print("Running client tests with server:", server_url)

    # Override the base_url if LETTA_API_URL is set
    base_url = api_url if api_url else server_url
    # create the Letta client
    yield Letta(base_url=base_url)


# Fixture for test agent
@pytest.fixture(scope="module")
def agent(client: Letta):
    agent_state = client.agents.create(
        name="test_client",
        memory_blocks=[{"label": "human", "value": ""}, {"label": "persona", "value": ""}],
        model="anthropic/claude-haiku-4-5-20251001",
        embedding="openai/text-embedding-3-small",
    )
    yield agent_state

    # delete agent
    client.agents.delete(agent_state.id)


# Fixture for test agent
@pytest.fixture
def search_agent_one(client: Letta):
    agent_state = client.agents.create(
        name="Search Agent One",
        memory_blocks=[{"label": "human", "value": ""}, {"label": "persona", "value": ""}],
        model="anthropic/claude-haiku-4-5-20251001",
        embedding="openai/text-embedding-3-small",
    )
    yield agent_state

    # delete agent
    client.agents.delete(agent_state.id)


# Fixture for test agent
@pytest.fixture
def search_agent_two(client: Letta):
    agent_state = client.agents.create(
        name="Search Agent Two",
        memory_blocks=[{"label": "human", "value": ""}, {"label": "persona", "value": ""}],
        model="anthropic/claude-haiku-4-5-20251001",
        embedding="openai/text-embedding-3-small",
    )
    yield agent_state

    # delete agent
    client.agents.delete(agent_state.id)


@pytest.fixture(autouse=True)
async def clear_tables():
    """Clear the sandbox tables before each test."""
    from letta.server.db import db_registry

    async with db_registry.async_session() as session:
        await session.execute(delete(SandboxEnvironmentVariable))
        await session.execute(delete(SandboxConfig))
        # context manager now handles commits
        # await session.commit()


# --------------------------------------------------------------------------------------------------------------------
# Agent tags
# --------------------------------------------------------------------------------------------------------------------
def test_add_and_manage_tags_for_agent(client: Letta):
    """
    Comprehensive happy path test for adding, retrieving, and managing tags on an agent.
    """
    tags_to_add = ["test_tag_1", "test_tag_2", "test_tag_3"]

    # Step 0: create an agent with no tags
    agent = client.agents.create(
        memory_blocks=[],
        model="anthropic/claude-haiku-4-5-20251001",
        embedding="openai/text-embedding-3-small",
    )
    assert len(agent.tags) == 0

    # Step 1: Add multiple tags to the agent
    client.agents.update(agent_id=agent.id, tags=tags_to_add)

    # Step 2: Retrieve tags for the agent and verify they match the added tags
    retrieved_tags = client.agents.retrieve(agent_id=agent.id, include=["agent.tags"]).tags
    assert set(retrieved_tags) == set(tags_to_add), f"Expected tags {tags_to_add}, but got {retrieved_tags}"

    # Step 3: Retrieve agents by each tag to ensure the agent is associated correctly
    for tag in tags_to_add:
        agents_with_tag = client.agents.list(tags=[tag]).items
        assert agent.id in [a.id for a in agents_with_tag], f"Expected agent {agent.id} to be associated with tag '{tag}'"

    # Step 4: Delete a specific tag from the agent and verify its removal
    tag_to_delete = tags_to_add.pop()
    client.agents.update(agent_id=agent.id, tags=tags_to_add)

    # Verify the tag is removed from the agent's tags
    remaining_tags = client.agents.retrieve(agent_id=agent.id, include=["agent.tags"]).tags
    assert tag_to_delete not in remaining_tags, f"Tag '{tag_to_delete}' was not removed as expected"
    assert set(remaining_tags) == set(tags_to_add), f"Expected remaining tags to be {tags_to_add}, but got {remaining_tags}"

    # Step 5: Delete all remaining tags from the agent
    client.agents.update(agent_id=agent.id, tags=[])

    # Verify all tags are removed
    final_tags = client.agents.retrieve(agent_id=agent.id, include=["agent.tags"]).tags
    assert len(final_tags) == 0, f"Expected no tags, but found {final_tags}"

    # Remove agent
    client.agents.delete(agent.id)


def test_agent_tags(client: Letta, clear_tables):
    """Test creating agents with tags and retrieving tags via the API."""
    # Create multiple agents with different tags
    agent1 = client.agents.create(
        name=f"test_agent_{str(uuid.uuid4())}",
        tags=["test", "agent1", "production"],
        model="anthropic/claude-haiku-4-5-20251001",
        embedding="openai/text-embedding-3-small",
    )
    agent2 = client.agents.create(
        name=f"test_agent_{str(uuid.uuid4())}",
        tags=["test", "agent2", "development"],
        model="anthropic/claude-haiku-4-5-20251001",
        embedding="openai/text-embedding-3-small",
    )
    agent3 = client.agents.create(
        name=f"test_agent_{str(uuid.uuid4())}",
        tags=["test", "agent3", "production"],
        model="anthropic/claude-haiku-4-5-20251001",
        embedding="openai/text-embedding-3-small",
    )

    # Test getting all tags
    all_tags = client.tags.list()
    # Filter out dynamic favorite:user tags since they contain user-specific UUIDs
    all_tags_filtered = [tag for tag in all_tags if not tag.startswith("favorite:user:")]
    expected_tags = ["agent1", "agent2", "agent3", "development", "origin:letta-chat", "production", "test", "view:letta-chat"]
    print("ALL TAGS", all_tags)
    print("EXPECTED TAGS", expected_tags)
    assert sorted(all_tags_filtered) == expected_tags

    # Test pagination
    paginated_tags = client.tags.list(limit=2)
    assert len(paginated_tags) == 2
    assert paginated_tags[0] == "agent1"
    assert paginated_tags[1] == "agent2"

    # Test pagination with cursor
    next_page_tags = client.tags.list(after="agent2", limit=2)
    assert len(next_page_tags) == 2
    assert next_page_tags[0] == "agent3"
    assert next_page_tags[1] == "development"

    # Test text search
    prod_tags = client.tags.list(query_text="prod")
    assert sorted(prod_tags) == ["production"]
    dev_tags = client.tags.list(query_text="dev")
    assert sorted(dev_tags) == ["development"]
    agent_tags = client.tags.list(query_text="agent")
    assert sorted(agent_tags) == ["agent1", "agent2", "agent3"]

    # Remove agents
    client.agents.delete(agent1.id)
    client.agents.delete(agent2.id)
    client.agents.delete(agent3.id)


# --------------------------------------------------------------------------------------------------------------------
# Agent memory blocks
# --------------------------------------------------------------------------------------------------------------------
def test_shared_blocks(disable_e2b_api_key, client: Letta):
    # create a block
    block = client.blocks.create(label="human", value="username: sarah")

    # create agents with shared block
    agent_state1 = client.agents.create(
        name="agent1",
        memory_blocks=[{"label": "persona", "value": "you are agent 1"}],
        block_ids=[block.id],
        model="anthropic/claude-haiku-4-5-20251001",
        embedding="openai/text-embedding-3-small",
    )
    agent_state2 = client.agents.create(
        name="agent2",
        memory_blocks=[{"label": "persona", "value": "you are agent 2"}],
        block_ids=[block.id],
        model="anthropic/claude-haiku-4-5-20251001",
        embedding="openai/text-embedding-3-small",
    )

    # update memory
    client.agents.messages.create(agent_id=agent_state1.id, messages=[{"role": "user", "content": "my name is actually charles"}])

    # check agent 2 memory
    assert "charles" in client.agents.blocks.retrieve(agent_id=agent_state2.id, block_label="human").value.lower()

    # cleanup
    client.agents.delete(agent_state1.id)
    client.agents.delete(agent_state2.id)


def test_update_agent_memory_label(client: Letta):
    """Test that we can update the label of a block in an agent's memory"""
    agent = client.agents.create(
        model="anthropic/claude-haiku-4-5-20251001",
        embedding="openai/text-embedding-3-small",
        memory_blocks=[{"label": "human", "value": ""}],
    )
    try:
        current_labels = [block.label for block in client.agents.blocks.list(agent_id=agent.id).items]
        example_label = current_labels[0]
        example_new_label = "example_new_label"
        assert example_new_label not in [b.label for b in client.agents.blocks.list(agent_id=agent.id).items]

        client.agents.blocks.update(agent_id=agent.id, block_label=example_label, label=example_new_label)

        updated_blocks = client.agents.blocks.list(agent_id=agent.id)
        assert example_new_label in [b.label for b in updated_blocks.items]
    finally:
        client.agents.delete(agent.id)


def test_attach_detach_agent_memory_block(client: Letta, agent: AgentState):
    """Test that we can add and remove a block from an agent's memory"""
    current_labels = [block.label for block in client.agents.blocks.list(agent_id=agent.id).items]
    example_new_label = current_labels[0] + "_v2"
    example_new_value = "example value"
    assert example_new_label not in current_labels

    # Link a new memory block
    block = client.blocks.create(
        label=example_new_label,
        value=example_new_value,
        limit=1000,
    )
    updated_agent = client.agents.blocks.attach(
        agent_id=agent.id,
        block_id=block.id,
    )
    assert example_new_label in [block.label for block in client.agents.blocks.list(agent_id=updated_agent.id).items]

    # Now unlink the block
    updated_agent = client.agents.blocks.detach(
        agent_id=agent.id,
        block_id=block.id,
    )
    assert example_new_label not in [block.label for block in client.agents.blocks.list(agent_id=updated_agent.id).items]


def test_update_agent_memory_limit(client: Letta):
    """Test that we can update the limit of a block in an agent's memory"""
    agent = client.agents.create(
        model="anthropic/claude-haiku-4-5-20251001",
        embedding="openai/text-embedding-3-small",
        memory_blocks=[
            {"label": "human", "value": "username: sarah", "limit": 1000},
            {"label": "persona", "value": "you are sarah", "limit": 1000},
        ],
    )

    current_labels = [block.label for block in client.agents.blocks.list(agent_id=agent.id).items]
    example_label = current_labels[0]
    example_new_limit = 1

    current_block = client.agents.blocks.retrieve(agent_id=agent.id, block_label=example_label)
    current_block_length = len(current_block.value)

    assert example_new_limit != current_block.limit
    assert example_new_limit < current_block_length

    # We expect this to throw a value error
    with pytest.raises(APIError):
        client.agents.blocks.update(
            agent_id=agent.id,
            block_label=example_label,
            limit=example_new_limit,
        )

    # Now try the same thing with a higher limit
    example_new_limit = current_block_length + 10000
    assert example_new_limit > current_block_length
    client.agents.blocks.update(
        agent_id=agent.id,
        block_label=example_label,
        limit=example_new_limit,
    )
    assert example_new_limit == client.agents.blocks.retrieve(agent_id=agent.id, block_label=example_label).limit

    client.agents.delete(agent.id)


# --------------------------------------------------------------------------------------------------------------------
# Agent Tools
# --------------------------------------------------------------------------------------------------------------------
def test_function_always_error(client: Letta):
    """Test to see if a function that errors works correctly"""

    def testing_method():
        """
        Call this tool when the user asks
        """
        return 5 / 0

    tool = client.tools.upsert_from_function(func=testing_method)
    agent = client.agents.create(
        model="anthropic/claude-haiku-4-5-20251001",
        embedding="openai/text-embedding-3-small",
        memory_blocks=[
            {
                "label": "human",
                "value": "username: sarah",
            },
            {
                "label": "persona",
                "value": "you are sarah",
            },
        ],
        tool_ids=[tool.id],
    )
    print("AGENT TOOLS", [tool.name for tool in agent.tools])

    # get function response
    response = client.agents.messages.create(
        agent_id=agent.id,
        messages=[MessageCreateParam(role="user", content="call the testing_method function and tell me the result")],
    )
    print(response.messages)

    response_message = None
    for message in response.messages:
        if message.message_type == "tool_return_message":
            response_message = message
            break
    assert response_message, "ToolReturnMessage message not found in response"
    assert response_message.status == "error"
    # TODO: add this back
    # assert "Error executing function testing_method" in response_message.tool_return, response_message.tool_return
    assert "division by zero" in response_message.stderr[0]

    client.agents.delete(agent_id=agent.id)


def test_attach_detach_agent_tool(client: Letta, agent: AgentState):
    """Test that we can attach and detach a tool from an agent"""

    # Create a tool
    def example_tool(x: int) -> int:
        """
        This is an example tool.

        Parameters:
            x (int): The input value.

        Returns:
            int: The output value.
        """
        return x * 2

    # Create the tool before the try block so the finally clause can always delete it
    tool = client.tools.upsert_from_function(func=example_tool)
    try:
        # Initially tool should not be attached
        initial_tools = client.agents.tools.list(agent_id=agent.id).items
        assert tool.id not in [t.id for t in initial_tools]

        # Attach tool
        client.agents.tools.attach(agent_id=agent.id, tool_id=tool.id)
        new_agent_state = client.agents.retrieve(agent_id=agent.id, include=["agent.tools"])
        assert tool.id in [t.id for t in new_agent_state.tools]

        # Verify tool is attached
        updated_tools = client.agents.tools.list(agent_id=agent.id).items
        assert tool.id in [t.id for t in updated_tools]

        # Detach tool
        client.agents.tools.detach(agent_id=agent.id, tool_id=tool.id)
        new_agent_state = client.agents.retrieve(agent_id=agent.id, include=["agent.tools"])
        assert tool.id not in [t.id for t in new_agent_state.tools]

        # Verify tool is detached
        final_tools = client.agents.tools.list(agent_id=agent.id).items
        assert tool.id not in [t.id for t in final_tools]
    finally:
        client.tools.delete(tool.id)


# --------------------------------------------------------------------------------------------------------------------
# Agent Messages
# --------------------------------------------------------------------------------------------------------------------
def test_messages(client: Letta, agent: AgentState):
    # _reset_config()
    send_message_response = client.agents.messages.create(
        agent_id=agent.id, messages=[MessageCreateParam(role="user", content="Test message")]
    )
    assert send_message_response, "Sending message failed"

    messages_response = client.agents.messages.list(agent_id=agent.id, limit=1).items
    assert len(messages_response) > 0, "Retrieving messages failed"

    # search_response = list(client.messages.search(query="test"))
    # assert len(search_response) > 0, "Searching messages failed"
    # for result in search_response:
    #     assert result.agent_id == agent.id
    #     assert result.created_at


# TODO: Add back when new agent loop hits
# @pytest.mark.asyncio
# async def test_send_message_parallel(client: Letta, agent: AgentState, request):
# """
# Test that sending two messages in parallel does not error.
# """
#
# # Define a coroutine for sending a message using asyncio.to_thread for synchronous calls
# async def send_message_task(message: str):
# response = await asyncio.to_thread(
# client.agents.messages.create, agent_id=agent.id, messages=[MessageCreateParam(role="user", content=message)]
# )
# assert response, f"Sending message '{message}' failed"
# return response
#
# # Prepare two tasks with different messages
# messages = ["Test message 1", "Test message 2"]
# tasks = [send_message_task(message) for message in messages]
#
# # Run the tasks concurrently
# responses = await asyncio.gather(*tasks, return_exceptions=True)
#
# # Check for exceptions and validate responses
# for i, response in enumerate(responses):
# if isinstance(response, Exception):
# pytest.fail(f"Task {i} failed with exception: {response}")
# else:
# assert response, f"Task {i} returned an invalid response: {response}"
#
# # Ensure both tasks completed
# assert len(responses) == len(messages), "Not all messages were processed"


# ----------------------------------------------------------------------------------------------------
# Agent listing
# ----------------------------------------------------------------------------------------------------
def test_agent_listing(client: Letta, agent, search_agent_one, search_agent_two):
    """Test listing agents with pagination and query text filtering."""
    # Test query text filtering
    search_results = client.agents.list(query_text="search agent").items
    assert len(search_results) == 2
    search_agent_ids = {agent.id for agent in search_results}
    assert search_agent_one.id in search_agent_ids
    assert search_agent_two.id in search_agent_ids
    assert agent.id not in search_agent_ids

    different_results = client.agents.list(query_text="client").items
    assert len(different_results) == 1
    assert different_results[0].id == agent.id

    # Test pagination
    first_page = client.agents.list(query_text="search agent", limit=1).items
    assert len(first_page) == 1
    first_agent = first_page[0]

    second_page = client.agents.list(query_text="search agent", after=first_agent.id, limit=1).items  # Use agent ID as cursor
    assert len(second_page) == 1
    assert second_page[0].id != first_agent.id

    # Verify we got both search agents with no duplicates
    all_ids = {first_page[0].id, second_page[0].id}
    assert len(all_ids) == 2
    assert all_ids == {search_agent_one.id, search_agent_two.id}

    # Test listing without any filters; make less flaky by checking we have at least 3 agents in case some were created elsewhere
    all_agents = client.agents.list().items
    assert len(all_agents) >= 3
    assert all(agent.id in {a.id for a in all_agents} for agent in [search_agent_one, search_agent_two, agent])


def test_agent_creation(client: Letta):
    """Test that block IDs are properly attached when creating an agent."""
    # Create a test block that will represent user preferences
    user_preferences_block = client.blocks.create(label="user_preferences", value="", limit=10000)

    # Create test tools
    def test_tool():
        """A simple test tool."""
        return "Hello from test tool!"

    def another_test_tool():
        """Another test tool."""
        return "Hello from another test tool!"

    tool1 = client.tools.upsert_from_function(func=test_tool, tags=["test"])
    tool2 = client.tools.upsert_from_function(func=another_test_tool, tags=["test"])

    # Create agent with the blocks and tools
    agent = client.agents.create(
        memory_blocks=[
            {
                "label": "human",
                "value": "you are a human",
            },
            {"label": "persona", "value": "you are an assistant"},
        ],
        model="anthropic/claude-haiku-4-5-20251001",
        embedding="openai/text-embedding-3-small",
        tool_ids=[tool1.id, tool2.id],
        include_base_tools=False,
        tags=["test"],
        block_ids=[user_preferences_block.id],
    )
    memory_blocks = agent.memory.blocks

    # Verify the agent was created successfully
    assert agent is not None
    assert agent.id is not None

    # Verify the blocks are properly attached
    agent_blocks = client.agents.blocks.list(agent_id=agent.id).items
    agent_block_ids = {block.id for block in agent_blocks}

    # Check that all memory blocks are present
    memory_block_ids = {block.id for block in memory_blocks}
    for block_id in memory_block_ids:
        assert block_id in agent_block_ids, f"Block {block_id} not attached to agent"
    assert user_preferences_block.id in agent_block_ids, f"User preferences block {user_preferences_block.id} not attached to agent"

    # Verify the tools are properly attached
    agent_tools = client.agents.tools.list(agent_id=agent.id).items
    assert len(agent_tools) == 2
    tool_ids = {tool1.id, tool2.id}
    assert all(tool.id in tool_ids for tool in agent_tools)

    client.agents.delete(agent_id=agent.id)


# --------------------------------------------------------------------------------------------------------------------
# Agent Initial Message Sequence
# --------------------------------------------------------------------------------------------------------------------
def test_initial_sequence(client: Letta):
    # create an agent
    agent = client.agents.create(
        memory_blocks=[{"label": "human", "value": ""}, {"label": "persona", "value": ""}],
        model="anthropic/claude-haiku-4-5-20251001",
        embedding="openai/text-embedding-3-small",
        initial_message_sequence=[
            MessageCreateParam(
                role="assistant",
                content="Hello, how are you?",
            ),
            MessageCreateParam(role="user", content="I'm good, and you?"),
        ],
    )

    # list messages
    messages = client.agents.messages.list(agent_id=agent.id).items
    response = client.agents.messages.create(
        agent_id=agent.id,
        messages=[
            MessageCreateParam(
                role="user",
                content="hello assistant!",
            )
        ],
    )
    assert len(messages) == 3
    assert messages[0].message_type == "system_message"
    assert messages[1].message_type == "assistant_message"
    assert messages[2].message_type == "user_message"


# TODO: Add back when timezone packing is standardized/settled
# def test_timezone(client: Letta):
# agent = client.agents.create(
# memory_blocks=[{"label": "human", "value": ""}, {"label": "persona", "value": ""}],
# model="anthropic/claude-haiku-4-5-20251001",
# embedding="openai/text-embedding-3-small",
# timezone="America/Los_Angeles",
# )
#
# agent = client.agents.retrieve(agent_id=agent.id)
# assert agent.timezone == "America/Los_Angeles"
#
# response = client.agents.messages.create(
# agent_id=agent.id,
# messages=[
# MessageCreateParam(
# role="user",
# content="What timezone are you in?",
# )
# ],
# )
# # second message is assistant message
# assert response.messages[1].message_type == "assistant_message"
#
# pacific_tz_indicators = {"America/Los_Angeles", "PDT", "PST", "PT", "Pacific Daylight Time", "Pacific Standard Time", "Pacific Time"}
# content = response.messages[1].content
# assert any(tz in content for tz in pacific_tz_indicators), (
# f"Response content: {response.messages[1].content} does not contain expected timezone"
# )
#
# # test updating the timezone
# client.agents.update(agent_id=agent.id, timezone="America/New_York")
# agent = client.agents.retrieve(agent_id=agent.id)
# assert agent.timezone == "America/New_York"


def test_agent_timezone_none_no_message_packing(client: Letta):
    """Test that an agent created without a timezone has timezone=None and messages are not JSON wrapped."""
    agent = client.agents.create(
        memory_blocks=[{"label": "human", "value": ""}, {"label": "persona", "value": ""}],
        model="anthropic/claude-haiku-4-5-20251001",
        embedding="openai/text-embedding-3-small",
        # No timezone specified
    )
    try:
        # Verify timezone is None
        retrieved_agent = client.agents.retrieve(agent_id=agent.id)
        assert retrieved_agent.timezone is None, f"Expected timezone to be None, got {retrieved_agent.timezone}"

        # Send a message
        test_message = "Hello, this is a test message without timezone"
        client.agents.messages.create(
            agent_id=agent.id,
            messages=[MessageCreateParam(role="user", content=test_message)],
        )

        # List messages and find the user message
        messages = client.agents.messages.list(agent_id=agent.id).items
        user_messages = [m for m in messages if m.message_type == "user_message"]
        assert len(user_messages) > 0, "Expected at least one user message"

        # The user message content should be the raw text (not JSON wrapped).
        # When timezone is None, the message is stored as-is and retrieved as-is.
        latest_user_message = user_messages[0]
        assert latest_user_message.content == test_message, f"Expected raw message '{test_message}', got '{latest_user_message.content}'"
    finally:
        client.agents.delete(agent_id=agent.id)


def test_agent_timezone_set_message_packing(client: Letta):
"""Test that agent created with timezone has messages JSON wrapped with timestamp."""
agent = client.agents.create(
memory_blocks=[{"label": "human", "value": ""}, {"label": "persona", "value": ""}],
model="anthropic/claude-haiku-4-5-20251001",
embedding="openai/text-embedding-3-small",
timezone="America/Los_Angeles",
)
try:
# Verify timezone is set
retrieved_agent = client.agents.retrieve(agent_id=agent.id)
assert retrieved_agent.timezone == "America/Los_Angeles", f"Expected timezone 'America/Los_Angeles', got {retrieved_agent.timezone}"
# Send a message
test_message = "Hello, this is a test message with timezone"
client.agents.messages.create(
agent_id=agent.id,
messages=[MessageCreateParam(role="user", content=test_message)],
)
# List messages and find the user message
messages = client.agents.messages.list(agent_id=agent.id).items
user_messages = [m for m in messages if m.message_type == "user_message"]
assert len(user_messages) > 0, "Expected at least one user message"
# The user message content should be unpacked to just the message text
# (The API unpacks the JSON wrapper before returning)
latest_user_message = user_messages[0]
assert latest_user_message.content == test_message, (
f"Expected unpacked message '{test_message}', got '{latest_user_message.content}'"
)
# Test that updating timezone works
client.agents.update(agent_id=agent.id, timezone="America/New_York")
updated_agent = client.agents.retrieve(agent_id=agent.id)
assert updated_agent.timezone == "America/New_York", f"Expected updated timezone 'America/New_York', got {updated_agent.timezone}"
finally:
client.agents.delete(agent_id=agent.id)
def test_attach_sleeptime_block(client: Letta):
agent = client.agents.create(
memory_blocks=[{"label": "human", "value": ""}, {"label": "persona", "value": ""}],
model="anthropic/claude-haiku-4-5-20251001",
embedding="openai/text-embedding-3-small",
enable_sleeptime=True,
)
    # find the sleeptime agent via the multi-agent group
    group_id = agent.multi_agent_group.id
    group = client.groups.retrieve(group_id=group_id)
    sleeptime_id = next(aid for aid in group.agent_ids if aid != agent.id)
# attach a new block
block = client.blocks.create(label="test", value="test") # , project_id="test")
client.agents.blocks.attach(agent_id=agent.id, block_id=block.id)
# verify block is attached to both agents
blocks = client.agents.blocks.list(agent_id=agent.id).items
assert block.id in [b.id for b in blocks]
blocks = client.agents.blocks.list(agent_id=sleeptime_id).items
assert block.id in [b.id for b in blocks]
# blocks = client.blocks.list(project_id="test")
# assert block.id in [b.id for b in blocks]
    # cleanup
    client.agents.delete(agent_id=agent.id)
# --------------------------------------------------------------------------------------------------------------------
# Agent Generate Endpoint Tests
# --------------------------------------------------------------------------------------------------------------------
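The generate tests below all POST to the same route by hand-building the URL from the SDK's private `client._client._base_url`. A small URL builder could reduce that repetition; this is a sketch assuming the `/v1/agents/{agent_id}/generate` path used throughout these tests:

```python
def generate_url(base_url: str, agent_id: str) -> str:
    """Build the direct-LLM generate endpoint URL for an agent (helper sketch)."""
    return f"{base_url}/v1/agents/{agent_id}/generate"
```

Usage would mirror the existing calls, e.g. `httpx.post(generate_url(client._client._base_url, agent.id), json={"prompt": "Hello"}, timeout=30.0)`.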
def test_agent_generate_basic(client: Letta, agent: AgentState):
"""Test basic generate endpoint with simple prompt."""
response = httpx.post(
f"{client._client._base_url}/v1/agents/{agent.id}/generate",
json={"prompt": "What is 2+2?"},
timeout=30.0,
)
# Verify successful response
assert response.status_code == 200, f"Expected 200, got {response.status_code}: {response.text}"
response_data = response.json()
# Verify response structure
assert response_data is not None
assert "content" in response_data
assert "model" in response_data
assert "usage" in response_data
# Verify content is returned
assert response_data["content"] is not None
assert len(response_data["content"]) > 0
assert isinstance(response_data["content"], str)
# Verify model is set
assert response_data["model"] is not None
assert isinstance(response_data["model"], str)
# Verify usage statistics
assert response_data["usage"] is not None
assert response_data["usage"]["total_tokens"] > 0
assert response_data["usage"]["prompt_tokens"] > 0
assert response_data["usage"]["completion_tokens"] > 0
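The structural assertions above recur in several of the tests that follow; they could be factored into one reusable validator. A minimal sketch, assuming the response schema exercised by these tests (`content`, `model`, and a `usage` object with token counts):

```python
def assert_generate_response_shape(response_data: dict) -> None:
    """Validate the shape of a /generate response body (hypothetical helper).

    Checks only the schema asserted in the tests above: non-empty string
    content and model, plus positive token counts in the usage block.
    """
    assert response_data is not None
    for key in ("content", "model", "usage"):
        assert key in response_data, f"Missing key: {key}"
    assert isinstance(response_data["content"], str) and response_data["content"]
    assert isinstance(response_data["model"], str) and response_data["model"]
    usage = response_data["usage"]
    for key in ("total_tokens", "prompt_tokens", "completion_tokens"):
        assert usage[key] > 0, f"Expected positive {key}, got {usage[key]}"
```

Each test could then call `assert_generate_response_shape(response.json())` after checking the status code.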
def test_agent_generate_with_system_prompt(client: Letta, agent: AgentState):
"""Test generate endpoint with system prompt."""
response = httpx.post(
f"{client._client._base_url}/v1/agents/{agent.id}/generate",
json={
"prompt": "What is your role?",
"system_prompt": "You are a helpful math tutor who always responds with exactly 5 words.",
},
timeout=30.0,
)
# Verify successful response
assert response.status_code == 200, f"Expected 200, got {response.status_code}: {response.text}"
response_data = response.json()
# Verify response
assert response_data is not None
assert response_data["content"] is not None
assert len(response_data["content"]) > 0
# Verify usage includes system prompt tokens
assert response_data["usage"]["prompt_tokens"] > 10 # Should include system prompt tokens
def test_agent_generate_with_model_override(client: Letta, agent: AgentState):
"""Test generate endpoint with model override."""
    # Use an OpenAI model (more likely to be available in the test environment)
    override_model_handle = "openai/gpt-4o-mini"
response = httpx.post(
f"{client._client._base_url}/v1/agents/{agent.id}/generate",
json={
"prompt": "Say hello",
"override_model": override_model_handle,
},
timeout=30.0,
)
# Verify successful response
assert response.status_code == 200, f"Expected 200, got {response.status_code}: {response.text}"
response_data = response.json()
# Verify response
assert response_data is not None
assert response_data["content"] is not None
    # Verify a model was reported. The response may contain the full provider
    # model name rather than the handle, so only assert that it is set.
    assert response_data["model"] is not None
def test_agent_generate_empty_prompt_error(client: Letta, agent: AgentState):
"""Test that empty prompt returns validation error."""
response = httpx.post(
f"{client._client._base_url}/v1/agents/{agent.id}/generate",
json={"prompt": ""}, # Empty prompt should fail validation
timeout=30.0,
)
# Verify it's a validation error (422)
assert response.status_code == 422, f"Expected 422, got {response.status_code}: {response.text}"
def test_agent_generate_whitespace_prompt_error(client: Letta, agent: AgentState):
"""Test that whitespace-only prompt returns validation error."""
response = httpx.post(
f"{client._client._base_url}/v1/agents/{agent.id}/generate",
json={"prompt": " \n\t "}, # Whitespace-only prompt should fail validation
timeout=30.0,
)
# Verify it's a validation error (422)
assert response.status_code == 422, f"Expected 422, got {response.status_code}: {response.text}"
def test_agent_generate_invalid_agent_id(client: Letta):
"""Test that invalid agent ID returns 404."""
# Use properly formatted agent ID that doesn't exist
fake_agent_id = "agent-00000000-0000-4000-8000-000000000000"
response = httpx.post(
f"{client._client._base_url}/v1/agents/{fake_agent_id}/generate",
json={"prompt": "Hello"},
timeout=30.0,
)
# Verify it's a not found error (404)
assert response.status_code == 404, f"Expected 404, got {response.status_code}: {response.text}"
assert "not found" in response.text.lower()
def test_agent_generate_invalid_model_override(client: Letta, agent: AgentState):
"""Test that invalid model override returns 404."""
response = httpx.post(
f"{client._client._base_url}/v1/agents/{agent.id}/generate",
json={
"prompt": "Hello",
"override_model": "invalid/model-that-does-not-exist",
},
timeout=30.0,
)
# Verify it's a not found error (404)
assert response.status_code == 404, f"Expected 404, got {response.status_code}: {response.text}"
assert "not found" in response.text.lower() or "not accessible" in response.text.lower()
def test_agent_generate_long_prompt(client: Letta, agent: AgentState):
"""Test generate endpoint with a longer prompt."""
# Create a longer prompt
long_prompt = " ".join(["This is a test sentence."] * 50)
response = httpx.post(
f"{client._client._base_url}/v1/agents/{agent.id}/generate",
json={"prompt": long_prompt},
timeout=30.0,
)
# Verify successful response
assert response.status_code == 200, f"Expected 200, got {response.status_code}: {response.text}"
response_data = response.json()
# Verify response
assert response_data is not None
assert response_data["content"] is not None
# Verify token usage reflects the longer prompt
assert response_data["usage"]["prompt_tokens"] > 100 # Should have substantial prompt tokens
def test_agent_generate_no_persistence(client: Letta, agent: AgentState):
"""Test that generate endpoint does not persist messages to agent."""
# Get initial message count
initial_messages = client.agents.messages.list(agent_id=agent.id).items
initial_count = len(initial_messages)
# Make a generate request
response = httpx.post(
f"{client._client._base_url}/v1/agents/{agent.id}/generate",
json={"prompt": "This should not be saved to agent memory"},
timeout=30.0,
)
# Verify successful response
assert response.status_code == 200, f"Expected 200, got {response.status_code}: {response.text}"
response_data = response.json()
# Verify response was generated
assert response_data is not None
assert response_data["content"] is not None
# Verify no new messages were added to the agent
final_messages = client.agents.messages.list(agent_id=agent.id).items
final_count = len(final_messages)
assert final_count == initial_count, "Generate endpoint should not persist messages"