* Adjust naming * Add testing and improve message search * Adjust comments * Change query text to query * Fern autogen
2180 lines
79 KiB
Python
2180 lines
79 KiB
Python
import io
|
|
import json
|
|
import os
|
|
import textwrap
|
|
import threading
|
|
import time
|
|
import uuid
|
|
from typing import List, Type
|
|
|
|
import pytest
|
|
from dotenv import load_dotenv
|
|
from letta_client import (
|
|
ContinueToolRule,
|
|
CreateBlock,
|
|
Letta as LettaSDKClient,
|
|
LettaRequest,
|
|
MaxCountPerStepToolRule,
|
|
MessageCreate,
|
|
TerminalToolRule,
|
|
TextContent,
|
|
)
|
|
from letta_client.client import BaseTool
|
|
from letta_client.core import ApiError
|
|
from letta_client.types import AgentState, ToolReturnMessage
|
|
from pydantic import BaseModel, Field
|
|
|
|
from tests.helpers.utils import upload_file_and_wait
|
|
|
|
# Constants
|
|
SERVER_PORT = 8283
|
|
|
|
|
|
def pytest_configure(config):
|
|
"""Override asyncio settings for this test file"""
|
|
# config.option.asyncio_default_fixture_loop_scope = "function"
|
|
config.option.asyncio_default_test_loop_scope = "function"
|
|
|
|
|
|
def run_server():
|
|
load_dotenv()
|
|
|
|
from letta.server.rest_api.app import start_server
|
|
|
|
print("Starting server...")
|
|
start_server(debug=True)
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def client() -> LettaSDKClient:
|
|
# Get URL from environment or start server
|
|
server_url = os.getenv("LETTA_SERVER_URL", f"http://localhost:{SERVER_PORT}")
|
|
if not os.getenv("LETTA_SERVER_URL"):
|
|
print("Starting server thread")
|
|
thread = threading.Thread(target=run_server, daemon=True)
|
|
thread.start()
|
|
time.sleep(5)
|
|
|
|
print("Running client tests with server:", server_url)
|
|
client = LettaSDKClient(base_url=server_url, token=None, timeout=300.0)
|
|
yield client
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def agent(client: LettaSDKClient):
|
|
agent_state = client.agents.create(
|
|
memory_blocks=[
|
|
CreateBlock(
|
|
label="human",
|
|
value="username: sarah",
|
|
),
|
|
],
|
|
model="openai/gpt-4o-mini",
|
|
embedding="openai/text-embedding-3-small",
|
|
)
|
|
yield agent_state
|
|
|
|
# delete agent
|
|
client.agents.delete(agent_id=agent_state.id)
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
def fibonacci_tool(client: LettaSDKClient):
|
|
"""Fixture providing Fibonacci calculation tool."""
|
|
|
|
def calculate_fibonacci(n: int) -> int:
|
|
"""Calculate the nth Fibonacci number.
|
|
|
|
Args:
|
|
n: The position in the Fibonacci sequence to calculate.
|
|
|
|
Returns:
|
|
The nth Fibonacci number.
|
|
"""
|
|
if n <= 0:
|
|
return 0
|
|
elif n == 1:
|
|
return 1
|
|
else:
|
|
a, b = 0, 1
|
|
for _ in range(2, n + 1):
|
|
a, b = b, a + b
|
|
return b
|
|
|
|
tool = client.tools.upsert_from_function(func=calculate_fibonacci, tags=["math", "utility"])
|
|
yield tool
|
|
client.tools.delete(tool.id)
|
|
|
|
|
|
def test_messages_search(client: LettaSDKClient, agent: AgentState):
|
|
"""Exercise org-wide message search with query and filters.
|
|
|
|
Skips when Turbopuffer/OpenAI are not configured or unavailable in this environment.
|
|
"""
|
|
from datetime import timezone
|
|
|
|
from letta.settings import model_settings, settings
|
|
|
|
# Require TPUF + OpenAI to be configured; otherwise this is a cloud-only feature
|
|
if not getattr(settings, "tpuf_api_key", None) or not getattr(model_settings, "openai_api_key", None):
|
|
pytest.skip("Message search requires Turbopuffer and OpenAI; skipping.")
|
|
|
|
original_use_tpuf = settings.use_tpuf
|
|
original_embed_all = settings.embed_all_messages
|
|
try:
|
|
# Enable TPUF + message embedding for this test run
|
|
settings.use_tpuf = True
|
|
settings.embed_all_messages = True
|
|
|
|
unique_term = f"kitten-cats-{uuid.uuid4().hex[:8]}"
|
|
|
|
# Create a couple of messages to search over
|
|
client.agents.messages.create(
|
|
agent_id=agent.id,
|
|
messages=[MessageCreate(role="user", content=f"I love {unique_term} dearly")],
|
|
)
|
|
client.agents.messages.create(
|
|
agent_id=agent.id,
|
|
messages=[MessageCreate(role="user", content=f"Recorded preference: {unique_term}")],
|
|
)
|
|
|
|
# Allow brief time for background indexing (if enabled)
|
|
time.sleep(2)
|
|
|
|
# Call the SDK using the OpenAPI fields
|
|
results = client.agents.messages.search(
|
|
query=unique_term,
|
|
search_mode="hybrid",
|
|
roles=["user"],
|
|
project_id=agent.project_id,
|
|
limit=10,
|
|
start_date=None,
|
|
end_date=None,
|
|
)
|
|
|
|
# Validate shape of response
|
|
assert isinstance(results, list) and len(results) >= 1
|
|
top = results[0]
|
|
assert getattr(top, "message", None) is not None
|
|
assert top.message.role == "user" # role filter applied
|
|
assert hasattr(top, "rrf_score") and top.rrf_score is not None
|
|
finally:
|
|
settings.use_tpuf = original_use_tpuf
|
|
settings.embed_all_messages = original_embed_all
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
def preferences_tool(client: LettaSDKClient):
|
|
"""Fixture providing user preferences tool."""
|
|
|
|
def get_user_preferences(category: str) -> str:
|
|
"""Get user preferences for a specific category.
|
|
|
|
Args:
|
|
category: The preference category to retrieve (notification, theme, language).
|
|
|
|
Returns:
|
|
The user's preference for the specified category, or "not specified" if unknown.
|
|
"""
|
|
preferences = {"notification": "email only", "theme": "dark mode", "language": "english"}
|
|
return preferences.get(category, "not specified")
|
|
|
|
tool = client.tools.upsert_from_function(func=get_user_preferences, tags=["user", "preferences"])
|
|
yield tool
|
|
client.tools.delete(tool.id)
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
def data_analysis_tool(client: LettaSDKClient):
|
|
"""Fixture providing data analysis tool."""
|
|
|
|
def analyze_data(data_type: str, values: List[float]) -> str:
|
|
"""Analyze data and provide insights.
|
|
|
|
Args:
|
|
data_type: Type of data to analyze.
|
|
values: Numerical values to analyze.
|
|
|
|
Returns:
|
|
Analysis results including average, max, and min values.
|
|
"""
|
|
if not values:
|
|
return "No data provided"
|
|
avg = sum(values) / len(values)
|
|
max_val = max(values)
|
|
min_val = min(values)
|
|
return f"Analysis of {data_type}: avg={avg:.2f}, max={max_val}, min={min_val}"
|
|
|
|
tool = client.tools.upsert_from_function(func=analyze_data, tags=["analysis", "data"])
|
|
yield tool
|
|
client.tools.delete(tool.id)
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
def persona_block(client: LettaSDKClient):
|
|
"""Fixture providing persona memory block."""
|
|
block = client.blocks.create(
|
|
label="persona",
|
|
value="You are Alex, a data analyst and mathematician who helps users with calculations and insights. You have extensive experience in statistical analysis and prefer to provide clear, accurate results.",
|
|
limit=8000,
|
|
)
|
|
yield block
|
|
client.blocks.delete(block.id)
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
def human_block(client: LettaSDKClient):
|
|
"""Fixture providing human memory block."""
|
|
block = client.blocks.create(
|
|
label="human",
|
|
value="username: sarah_researcher\noccupation: data scientist\ninterests: machine learning, statistics, fibonacci sequences\npreferred_communication: detailed explanations with examples",
|
|
limit=4000,
|
|
)
|
|
yield block
|
|
client.blocks.delete(block.id)
|
|
|
|
|
|
@pytest.fixture(scope="function")
|
|
def context_block(client: LettaSDKClient):
|
|
"""Fixture providing project context memory block."""
|
|
block = client.blocks.create(
|
|
label="project_context",
|
|
value="Current project: Building predictive models for financial markets. Sarah is working on sequence analysis and pattern recognition. Recently interested in mathematical sequences like Fibonacci for trend analysis.",
|
|
limit=6000,
|
|
)
|
|
yield block
|
|
client.blocks.delete(block.id)
|
|
|
|
|
|
def test_shared_blocks(client: LettaSDKClient):
|
|
# create a block
|
|
block = client.blocks.create(
|
|
label="human",
|
|
value="username: sarah",
|
|
)
|
|
|
|
# create agents with shared block
|
|
agent_state1 = client.agents.create(
|
|
name="agent1",
|
|
memory_blocks=[
|
|
CreateBlock(
|
|
label="persona",
|
|
value="you are agent 1",
|
|
),
|
|
],
|
|
block_ids=[block.id],
|
|
model="openai/gpt-4o-mini",
|
|
embedding="openai/text-embedding-3-small",
|
|
)
|
|
agent_state2 = client.agents.create(
|
|
name="agent2",
|
|
memory_blocks=[
|
|
CreateBlock(
|
|
label="persona",
|
|
value="you are agent 2",
|
|
),
|
|
],
|
|
block_ids=[block.id],
|
|
model="openai/gpt-4o-mini",
|
|
embedding="openai/text-embedding-3-small",
|
|
)
|
|
|
|
# update memory
|
|
client.agents.messages.create(
|
|
agent_id=agent_state1.id,
|
|
messages=[
|
|
MessageCreate(
|
|
role="user",
|
|
content="my name is actually charles",
|
|
)
|
|
],
|
|
)
|
|
|
|
# check agent 2 memory
|
|
block_value = client.blocks.retrieve(block_id=block.id).value
|
|
assert "charles" in block_value.lower(), f"Shared block update failed {block_value}"
|
|
|
|
client.agents.messages.create(
|
|
agent_id=agent_state2.id,
|
|
messages=[
|
|
MessageCreate(
|
|
role="user",
|
|
content="whats my name?",
|
|
)
|
|
],
|
|
)
|
|
block_value = client.agents.blocks.retrieve(agent_id=agent_state2.id, block_label="human").value
|
|
assert "charles" in block_value.lower(), f"Shared block update failed {block_value}"
|
|
|
|
# cleanup
|
|
client.agents.delete(agent_state1.id)
|
|
client.agents.delete(agent_state2.id)
|
|
|
|
|
|
def test_read_only_block(client: LettaSDKClient):
|
|
block_value = "username: sarah"
|
|
agent = client.agents.create(
|
|
memory_blocks=[
|
|
CreateBlock(
|
|
label="human",
|
|
value=block_value,
|
|
read_only=True,
|
|
),
|
|
],
|
|
model="openai/gpt-4o-mini",
|
|
embedding="openai/text-embedding-3-small",
|
|
)
|
|
|
|
# make sure agent cannot update read-only block
|
|
client.agents.messages.create(
|
|
agent_id=agent.id,
|
|
messages=[
|
|
MessageCreate(
|
|
role="user",
|
|
content="my name is actually charles",
|
|
)
|
|
],
|
|
)
|
|
|
|
# make sure block value is still the same
|
|
block = client.agents.blocks.retrieve(agent_id=agent.id, block_label="human")
|
|
assert block.value == block_value
|
|
|
|
# make sure can update from client
|
|
new_value = "hello"
|
|
client.agents.blocks.modify(agent_id=agent.id, block_label="human", value=new_value)
|
|
block = client.agents.blocks.retrieve(agent_id=agent.id, block_label="human")
|
|
assert block.value == new_value
|
|
|
|
# cleanup
|
|
client.agents.delete(agent.id)
|
|
|
|
|
|
def test_add_and_manage_tags_for_agent(client: LettaSDKClient):
|
|
"""
|
|
Comprehensive happy path test for adding, retrieving, and managing tags on an agent.
|
|
"""
|
|
tags_to_add = ["test_tag_1", "test_tag_2", "test_tag_3"]
|
|
|
|
# Step 0: create an agent with no tags
|
|
agent = client.agents.create(
|
|
memory_blocks=[
|
|
CreateBlock(
|
|
label="human",
|
|
value="username: sarah",
|
|
),
|
|
],
|
|
model="openai/gpt-4o-mini",
|
|
embedding="openai/text-embedding-3-small",
|
|
)
|
|
assert len(agent.tags) == 0
|
|
|
|
# Step 1: Add multiple tags to the agent
|
|
client.agents.modify(agent_id=agent.id, tags=tags_to_add)
|
|
|
|
# Step 2: Retrieve tags for the agent and verify they match the added tags
|
|
retrieved_tags = client.agents.retrieve(agent_id=agent.id).tags
|
|
assert set(retrieved_tags) == set(tags_to_add), f"Expected tags {tags_to_add}, but got {retrieved_tags}"
|
|
|
|
# Step 3: Retrieve agents by each tag to ensure the agent is associated correctly
|
|
for tag in tags_to_add:
|
|
agents_with_tag = client.agents.list(tags=[tag])
|
|
assert agent.id in [a.id for a in agents_with_tag], f"Expected agent {agent.id} to be associated with tag '{tag}'"
|
|
|
|
# Step 4: Delete a specific tag from the agent and verify its removal
|
|
tag_to_delete = tags_to_add.pop()
|
|
client.agents.modify(agent_id=agent.id, tags=tags_to_add)
|
|
|
|
# Verify the tag is removed from the agent's tags
|
|
remaining_tags = client.agents.retrieve(agent_id=agent.id).tags
|
|
assert tag_to_delete not in remaining_tags, f"Tag '{tag_to_delete}' was not removed as expected"
|
|
assert set(remaining_tags) == set(tags_to_add), f"Expected remaining tags to be {tags_to_add[1:]}, but got {remaining_tags}"
|
|
|
|
# Step 5: Delete all remaining tags from the agent
|
|
client.agents.modify(agent_id=agent.id, tags=[])
|
|
|
|
# Verify all tags are removed
|
|
final_tags = client.agents.retrieve(agent_id=agent.id).tags
|
|
assert len(final_tags) == 0, f"Expected no tags, but found {final_tags}"
|
|
|
|
# Remove agent
|
|
client.agents.delete(agent.id)
|
|
|
|
|
|
def test_agent_tags(client: LettaSDKClient):
|
|
"""Test creating agents with tags and retrieving tags via the API."""
|
|
# Clear all agents
|
|
all_agents = client.agents.list()
|
|
for agent in all_agents:
|
|
client.agents.delete(agent.id)
|
|
|
|
# Create multiple agents with different tags
|
|
agent1 = client.agents.create(
|
|
memory_blocks=[
|
|
CreateBlock(
|
|
label="human",
|
|
value="username: sarah",
|
|
),
|
|
],
|
|
model="openai/gpt-4o-mini",
|
|
embedding="openai/text-embedding-3-small",
|
|
tags=["test", "agent1", "production"],
|
|
)
|
|
|
|
agent2 = client.agents.create(
|
|
memory_blocks=[
|
|
CreateBlock(
|
|
label="human",
|
|
value="username: sarah",
|
|
),
|
|
],
|
|
model="openai/gpt-4o-mini",
|
|
embedding="openai/text-embedding-3-small",
|
|
tags=["test", "agent2", "development"],
|
|
)
|
|
|
|
agent3 = client.agents.create(
|
|
memory_blocks=[
|
|
CreateBlock(
|
|
label="human",
|
|
value="username: sarah",
|
|
),
|
|
],
|
|
model="openai/gpt-4o-mini",
|
|
embedding="openai/text-embedding-3-small",
|
|
tags=["test", "agent3", "production"],
|
|
)
|
|
|
|
# Test getting all tags
|
|
all_tags = client.tags.list()
|
|
expected_tags = ["agent1", "agent2", "agent3", "development", "production", "test"]
|
|
assert sorted(all_tags) == expected_tags
|
|
|
|
# Test pagination
|
|
paginated_tags = client.tags.list(limit=2)
|
|
assert len(paginated_tags) == 2
|
|
assert paginated_tags[0] == "agent1"
|
|
assert paginated_tags[1] == "agent2"
|
|
|
|
# Test pagination with cursor
|
|
next_page_tags = client.tags.list(after="agent2", limit=2)
|
|
assert len(next_page_tags) == 2
|
|
assert next_page_tags[0] == "agent3"
|
|
assert next_page_tags[1] == "development"
|
|
|
|
# Test text search
|
|
prod_tags = client.tags.list(query_text="prod")
|
|
assert sorted(prod_tags) == ["production"]
|
|
|
|
dev_tags = client.tags.list(query_text="dev")
|
|
assert sorted(dev_tags) == ["development"]
|
|
|
|
agent_tags = client.tags.list(query_text="agent")
|
|
assert sorted(agent_tags) == ["agent1", "agent2", "agent3"]
|
|
|
|
# Remove agents
|
|
client.agents.delete(agent1.id)
|
|
client.agents.delete(agent2.id)
|
|
client.agents.delete(agent3.id)
|
|
|
|
|
|
def test_update_agent_memory_label(client: LettaSDKClient, agent: AgentState):
|
|
"""Test that we can update the label of a block in an agent's memory"""
|
|
current_labels = [block.label for block in client.agents.blocks.list(agent_id=agent.id)]
|
|
example_label = current_labels[0]
|
|
example_new_label = "example_new_label"
|
|
assert example_new_label not in current_labels
|
|
|
|
client.agents.blocks.modify(
|
|
agent_id=agent.id,
|
|
block_label=example_label,
|
|
label=example_new_label,
|
|
)
|
|
|
|
updated_block = client.agents.blocks.retrieve(agent_id=agent.id, block_label=example_new_label)
|
|
assert updated_block.label == example_new_label
|
|
|
|
|
|
def test_add_remove_agent_memory_block(client: LettaSDKClient, agent: AgentState):
|
|
"""Test that we can add and remove a block from an agent's memory"""
|
|
current_labels = [block.label for block in client.agents.blocks.list(agent_id=agent.id)]
|
|
example_new_label = current_labels[0] + "_v2"
|
|
example_new_value = "example value"
|
|
assert example_new_label not in current_labels
|
|
|
|
# Link a new memory block
|
|
block = client.blocks.create(
|
|
label=example_new_label,
|
|
value=example_new_value,
|
|
limit=1000,
|
|
)
|
|
client.agents.blocks.attach(
|
|
agent_id=agent.id,
|
|
block_id=block.id,
|
|
)
|
|
|
|
updated_block = client.agents.blocks.retrieve(
|
|
agent_id=agent.id,
|
|
block_label=example_new_label,
|
|
)
|
|
assert updated_block.value == example_new_value
|
|
|
|
# Now unlink the block
|
|
client.agents.blocks.detach(
|
|
agent_id=agent.id,
|
|
block_id=block.id,
|
|
)
|
|
|
|
current_labels = [block.label for block in client.agents.blocks.list(agent_id=agent.id)]
|
|
assert example_new_label not in current_labels
|
|
|
|
|
|
def test_update_agent_memory_limit(client: LettaSDKClient, agent: AgentState):
|
|
"""Test that we can update the limit of a block in an agent's memory"""
|
|
|
|
current_labels = [block.label for block in client.agents.blocks.list(agent_id=agent.id)]
|
|
example_label = current_labels[0]
|
|
example_new_limit = 1
|
|
current_block = client.agents.blocks.retrieve(agent_id=agent.id, block_label=example_label)
|
|
current_block_length = len(current_block.value)
|
|
|
|
assert example_new_limit != client.agents.blocks.retrieve(agent_id=agent.id, block_label=example_label).limit
|
|
assert example_new_limit < current_block_length
|
|
|
|
# We expect this to throw a value error
|
|
with pytest.raises(ApiError):
|
|
client.agents.blocks.modify(
|
|
agent_id=agent.id,
|
|
block_label=example_label,
|
|
limit=example_new_limit,
|
|
)
|
|
|
|
# Now try the same thing with a higher limit
|
|
example_new_limit = current_block_length + 10000
|
|
assert example_new_limit > current_block_length
|
|
client.agents.blocks.modify(
|
|
agent_id=agent.id,
|
|
block_label=example_label,
|
|
limit=example_new_limit,
|
|
)
|
|
|
|
assert example_new_limit == client.agents.blocks.retrieve(agent_id=agent.id, block_label=example_label).limit
|
|
|
|
|
|
def test_messages(client: LettaSDKClient, agent: AgentState):
|
|
send_message_response = client.agents.messages.create(
|
|
agent_id=agent.id,
|
|
messages=[
|
|
MessageCreate(
|
|
role="user",
|
|
content="Test message",
|
|
),
|
|
],
|
|
)
|
|
assert send_message_response, "Sending message failed"
|
|
|
|
messages_response = client.agents.messages.list(
|
|
agent_id=agent.id,
|
|
limit=1,
|
|
)
|
|
assert len(messages_response) > 0, "Retrieving messages failed"
|
|
|
|
|
|
def test_send_system_message(client: LettaSDKClient, agent: AgentState):
|
|
"""Important unit test since the Letta API exposes sending system messages, but some backends don't natively support it (eg Anthropic)"""
|
|
send_system_message_response = client.agents.messages.create(
|
|
agent_id=agent.id,
|
|
messages=[
|
|
MessageCreate(
|
|
role="system",
|
|
content="Event occurred: The user just logged off.",
|
|
),
|
|
],
|
|
)
|
|
assert send_system_message_response, "Sending message failed"
|
|
|
|
|
|
def test_insert_archival_memory(client: LettaSDKClient, agent: AgentState):
|
|
passage = client.agents.passages.create(
|
|
agent_id=agent.id,
|
|
text="This is a test passage",
|
|
)
|
|
assert passage, "Inserting archival memory failed"
|
|
|
|
# List archival memory and verify content
|
|
archival_memory_response = client.agents.passages.list(agent_id=agent.id, limit=1)
|
|
archival_memories = [memory.text for memory in archival_memory_response]
|
|
assert "This is a test passage" in archival_memories, f"Retrieving archival memory failed: {archival_memories}"
|
|
|
|
# Delete the memory
|
|
memory_id_to_delete = archival_memory_response[0].id
|
|
client.agents.passages.delete(agent_id=agent.id, memory_id=memory_id_to_delete)
|
|
|
|
# Verify memory is gone (implicitly checks that the list call works)
|
|
final_passages = client.agents.passages.list(agent_id=agent.id)
|
|
passage_texts = [p.text for p in final_passages]
|
|
assert "This is a test passage" not in passage_texts, f"Memory was not deleted: {passage_texts}"
|
|
|
|
|
|
def test_search_archival_memory(client: LettaSDKClient, agent: AgentState):
|
|
from datetime import datetime, timezone
|
|
|
|
client.agents.passages.create(
|
|
agent_id=agent.id,
|
|
text="This is a test passage",
|
|
)
|
|
client.agents.passages.create(
|
|
agent_id=agent.id,
|
|
text="This is another test passage",
|
|
)
|
|
client.agents.passages.create(agent_id=agent.id, text="cats")
|
|
# insert old passage: 09/03/2001
|
|
old_passage = "OLD PASSAGE"
|
|
client.agents.passages.create(
|
|
agent_id=agent.id,
|
|
text=old_passage,
|
|
created_at=datetime(2001, 9, 3, 0, 0, 0, 0, timezone.utc),
|
|
)
|
|
|
|
# test seaching for old passage
|
|
search_results = client.agents.passages.search(agent_id=agent.id, query="cats", top_k=1)
|
|
assert len(search_results.results) == 1
|
|
assert search_results.results[0].content == "cats"
|
|
|
|
# test seaching for old passage
|
|
search_results = client.agents.passages.search(agent_id=agent.id, query="cats", top_k=4)
|
|
assert len(search_results.results) == 4
|
|
assert search_results.results[0].content == "cats"
|
|
|
|
# search for old passage
|
|
search_results = client.agents.passages.search(
|
|
agent_id=agent.id,
|
|
query="cats",
|
|
top_k=4,
|
|
start_datetime=datetime(2001, 8, 3, 0, 0, 0, 0, timezone.utc),
|
|
end_datetime=datetime(2001, 10, 3, 0, 0, 0, 0, timezone.utc),
|
|
)
|
|
assert len(search_results.results) == 1
|
|
assert search_results.results[0].content == old_passage
|
|
|
|
|
|
def test_function_return_limit(disable_e2b_api_key, client: LettaSDKClient, agent: AgentState):
|
|
"""Test to see if the function return limit works"""
|
|
|
|
def big_return():
|
|
"""
|
|
Always call this tool.
|
|
|
|
Returns:
|
|
important_data (str): Important data
|
|
"""
|
|
return "x" * 100000
|
|
|
|
tool = client.tools.upsert_from_function(func=big_return, return_char_limit=1000)
|
|
|
|
client.agents.tools.attach(agent_id=agent.id, tool_id=tool.id)
|
|
|
|
# get function response
|
|
response = client.agents.messages.create(
|
|
agent_id=agent.id,
|
|
messages=[
|
|
MessageCreate(
|
|
role="user",
|
|
content="call the big_return function",
|
|
),
|
|
],
|
|
use_assistant_message=False,
|
|
)
|
|
|
|
response_message = None
|
|
for message in response.messages:
|
|
if isinstance(message, ToolReturnMessage):
|
|
response_message = message
|
|
break
|
|
|
|
assert response_message, "ToolReturnMessage message not found in response"
|
|
res = response_message.tool_return
|
|
assert "function output was truncated " in res
|
|
|
|
|
|
@pytest.mark.flaky(max_runs=3)
|
|
def test_function_always_error(client: LettaSDKClient, agent: AgentState):
|
|
"""Test to see if function that errors works correctly"""
|
|
|
|
def testing_method():
|
|
"""
|
|
A method that has test functionalit.
|
|
"""
|
|
return 5 / 0
|
|
|
|
tool = client.tools.upsert_from_function(func=testing_method, return_char_limit=1000)
|
|
|
|
client.agents.tools.attach(agent_id=agent.id, tool_id=tool.id)
|
|
|
|
# get function response
|
|
response = client.agents.messages.create(
|
|
agent_id=agent.id,
|
|
messages=[
|
|
MessageCreate(
|
|
role="user",
|
|
content="call the testing_method function and tell me the result",
|
|
),
|
|
],
|
|
)
|
|
|
|
response_message = None
|
|
for message in response.messages:
|
|
if isinstance(message, ToolReturnMessage):
|
|
response_message = message
|
|
break
|
|
|
|
assert response_message, "ToolReturnMessage message not found in response"
|
|
assert response_message.status == "error"
|
|
|
|
assert "Error executing function testing_method: ZeroDivisionError: division by zero" in response_message.tool_return
|
|
assert "ZeroDivisionError" in response_message.tool_return
|
|
|
|
|
|
# TODO: Add back when the new agent loop hits
|
|
# @pytest.mark.asyncio
|
|
# async def test_send_message_parallel(client: LettaSDKClient, agent: AgentState):
|
|
# """
|
|
# Test that sending two messages in parallel does not error.
|
|
# """
|
|
#
|
|
# # Define a coroutine for sending a message using asyncio.to_thread for synchronous calls
|
|
# async def send_message_task(message: str):
|
|
# response = await asyncio.to_thread(
|
|
# client.agents.messages.create,
|
|
# agent_id=agent.id,
|
|
# messages=[
|
|
# MessageCreate(
|
|
# role="user",
|
|
# content=message,
|
|
# ),
|
|
# ],
|
|
# )
|
|
# assert response, f"Sending message '{message}' failed"
|
|
# return response
|
|
#
|
|
# # Prepare two tasks with different messages
|
|
# messages = ["Test message 1", "Test message 2"]
|
|
# tasks = [send_message_task(message) for message in messages]
|
|
#
|
|
# # Run the tasks concurrently
|
|
# responses = await asyncio.gather(*tasks, return_exceptions=True)
|
|
#
|
|
# # Check for exceptions and validate responses
|
|
# for i, response in enumerate(responses):
|
|
# if isinstance(response, Exception):
|
|
# pytest.fail(f"Task {i} failed with exception: {response}")
|
|
# else:
|
|
# assert response, f"Task {i} returned an invalid response: {response}"
|
|
#
|
|
# # Ensure both tasks completed
|
|
# assert len(responses) == len(messages), "Not all messages were processed"
|
|
|
|
|
|
def test_agent_creation(client: LettaSDKClient):
|
|
"""Test that block IDs are properly attached when creating an agent."""
|
|
sleeptime_agent_system = """
|
|
You are a helpful agent. You will be provided with a list of memory blocks and a user preferences block.
|
|
You should use the memory blocks to remember information about the user and their preferences.
|
|
You should also use the user preferences block to remember information about the user's preferences.
|
|
"""
|
|
|
|
# Create a test block that will represent user preferences
|
|
user_preferences_block = client.blocks.create(
|
|
label="user_preferences",
|
|
value="",
|
|
limit=10000,
|
|
)
|
|
|
|
# Create test tools
|
|
def test_tool():
|
|
"""A simple test tool."""
|
|
return "Hello from test tool!"
|
|
|
|
def another_test_tool():
|
|
"""Another test tool."""
|
|
return "Hello from another test tool!"
|
|
|
|
tool1 = client.tools.upsert_from_function(func=test_tool, tags=["test"])
|
|
tool2 = client.tools.upsert_from_function(func=another_test_tool, tags=["test"])
|
|
|
|
# Create test blocks
|
|
sleeptime_persona_block = client.blocks.create(label="persona", value="persona description", limit=5000)
|
|
mindy_block = client.blocks.create(label="mindy", value="Mindy is a helpful assistant", limit=5000)
|
|
|
|
# Create agent with the blocks and tools
|
|
agent = client.agents.create(
|
|
name=f"test_agent_{str(uuid.uuid4())}",
|
|
memory_blocks=[sleeptime_persona_block, mindy_block],
|
|
model="openai/gpt-4o-mini",
|
|
embedding="openai/text-embedding-3-small",
|
|
tool_ids=[tool1.id, tool2.id],
|
|
include_base_tools=False,
|
|
tags=["test"],
|
|
block_ids=[user_preferences_block.id],
|
|
)
|
|
|
|
# Verify the agent was created successfully
|
|
assert agent is not None
|
|
assert agent.id is not None
|
|
|
|
# Verify all memory blocks are properly attached
|
|
for block in [sleeptime_persona_block, mindy_block, user_preferences_block]:
|
|
agent_block = client.agents.blocks.retrieve(agent_id=agent.id, block_label=block.label)
|
|
assert block.value == agent_block.value and block.limit == agent_block.limit
|
|
|
|
# Verify the tools are properly attached
|
|
agent_tools = client.agents.tools.list(agent_id=agent.id)
|
|
assert len(agent_tools) == 2
|
|
tool_ids = {tool1.id, tool2.id}
|
|
assert all(tool.id in tool_ids for tool in agent_tools)
|
|
|
|
|
|
def test_many_blocks(client: LettaSDKClient):
|
|
users = ["user1", "user2"]
|
|
# Create agent with the blocks
|
|
agent1 = client.agents.create(
|
|
name=f"test_agent_{str(uuid.uuid4())}",
|
|
memory_blocks=[
|
|
CreateBlock(
|
|
label="user1",
|
|
value="user preferences: loud",
|
|
),
|
|
CreateBlock(
|
|
label="user2",
|
|
value="user preferences: happy",
|
|
),
|
|
],
|
|
model="openai/gpt-4o-mini",
|
|
embedding="openai/text-embedding-3-small",
|
|
include_base_tools=False,
|
|
tags=["test"],
|
|
)
|
|
agent2 = client.agents.create(
|
|
name=f"test_agent_{str(uuid.uuid4())}",
|
|
memory_blocks=[
|
|
CreateBlock(
|
|
label="user1",
|
|
value="user preferences: sneezy",
|
|
),
|
|
CreateBlock(
|
|
label="user2",
|
|
value="user preferences: lively",
|
|
),
|
|
],
|
|
model="openai/gpt-4o-mini",
|
|
embedding="openai/text-embedding-3-small",
|
|
include_base_tools=False,
|
|
tags=["test"],
|
|
)
|
|
|
|
# Verify the agent was created successfully
|
|
assert agent1 is not None
|
|
assert agent2 is not None
|
|
|
|
# Verify all memory blocks are properly attached
|
|
for user in users:
|
|
agent_block = client.agents.blocks.retrieve(agent_id=agent1.id, block_label=user)
|
|
assert agent_block is not None
|
|
|
|
blocks = client.blocks.list(label=user)
|
|
assert len(blocks) == 2
|
|
|
|
for block in blocks:
|
|
client.blocks.delete(block.id)
|
|
|
|
client.agents.delete(agent1.id)
|
|
client.agents.delete(agent2.id)
|
|
|
|
|
|
# cases: steam, async, token stream, sync
|
|
@pytest.mark.parametrize("message_create", ["stream_step", "token_stream", "sync", "async"])
|
|
def test_include_return_message_types(client: LettaSDKClient, agent: AgentState, message_create: str):
|
|
"""Test that the include_return_message_types parameter works"""
|
|
|
|
def verify_message_types(messages, message_types):
|
|
for message in messages:
|
|
assert message.message_type in message_types
|
|
|
|
message = "My name is actually Sarah"
|
|
message_types = ["reasoning_message", "tool_call_message"]
|
|
agent = client.agents.create(
|
|
memory_blocks=[
|
|
CreateBlock(label="user", value="Name: Charles"),
|
|
],
|
|
model="letta/letta-free",
|
|
embedding="letta/letta-free",
|
|
)
|
|
|
|
if message_create == "stream_step":
|
|
response = client.agents.messages.create_stream(
|
|
agent_id=agent.id,
|
|
messages=[
|
|
MessageCreate(
|
|
role="user",
|
|
content=message,
|
|
),
|
|
],
|
|
include_return_message_types=message_types,
|
|
)
|
|
messages = [message for message in list(response) if message.message_type not in ["stop_reason", "usage_statistics"]]
|
|
verify_message_types(messages, message_types)
|
|
|
|
elif message_create == "async":
|
|
response = client.agents.messages.create_async(
|
|
agent_id=agent.id,
|
|
messages=[
|
|
MessageCreate(
|
|
role="user",
|
|
content=message,
|
|
)
|
|
],
|
|
include_return_message_types=message_types,
|
|
)
|
|
# wait to finish
|
|
while response.status not in {"failed", "completed", "cancelled", "expired"}:
|
|
time.sleep(1)
|
|
response = client.runs.retrieve(run_id=response.id)
|
|
|
|
if response.status != "completed":
|
|
pytest.fail(f"Response status was NOT completed: {response}")
|
|
|
|
messages = client.runs.messages.list(run_id=response.id)
|
|
verify_message_types(messages, message_types)
|
|
|
|
elif message_create == "token_stream":
|
|
response = client.agents.messages.create_stream(
|
|
agent_id=agent.id,
|
|
messages=[
|
|
MessageCreate(
|
|
role="user",
|
|
content=message,
|
|
),
|
|
],
|
|
include_return_message_types=message_types,
|
|
)
|
|
messages = [message for message in list(response) if message.message_type not in ["stop_reason", "usage_statistics"]]
|
|
verify_message_types(messages, message_types)
|
|
|
|
elif message_create == "sync":
|
|
response = client.agents.messages.create(
|
|
agent_id=agent.id,
|
|
messages=[
|
|
MessageCreate(
|
|
role="user",
|
|
content=message,
|
|
),
|
|
],
|
|
include_return_message_types=message_types,
|
|
)
|
|
messages = response.messages
|
|
verify_message_types(messages, message_types)
|
|
|
|
# cleanup
|
|
client.agents.delete(agent.id)
|
|
|
|
|
|
def test_base_tools_upsert_on_list(client: LettaSDKClient):
|
|
"""Test that base tools are automatically upserted when missing on tools list call"""
|
|
from letta.constants import LETTA_TOOL_SET
|
|
|
|
# First, get the initial list of tools to establish baseline
|
|
initial_tools = client.tools.list()
|
|
initial_tool_names = {tool.name for tool in initial_tools}
|
|
|
|
# Find which base tools might be missing initially
|
|
missing_base_tools = LETTA_TOOL_SET - initial_tool_names
|
|
|
|
# If all base tools are already present, we need to delete some to test the upsert functionality
|
|
# We'll delete a few base tools if they exist to create the condition for testing
|
|
tools_to_delete = []
|
|
if not missing_base_tools:
|
|
# Pick a few base tools to delete for testing
|
|
test_base_tools = ["send_message", "conversation_search"]
|
|
for tool_name in test_base_tools:
|
|
for tool in initial_tools:
|
|
if tool.name == tool_name:
|
|
tools_to_delete.append(tool)
|
|
client.tools.delete(tool_id=tool.id)
|
|
break
|
|
|
|
# Now call list_tools() which should trigger the base tools check and upsert
|
|
updated_tools = client.tools.list()
|
|
updated_tool_names = {tool.name for tool in updated_tools}
|
|
|
|
# Verify that all base tools are now present
|
|
missing_after_upsert = LETTA_TOOL_SET - updated_tool_names
|
|
assert not missing_after_upsert, f"Base tools still missing after upsert: {missing_after_upsert}"
|
|
|
|
# Verify that the base tools are actually in the list
|
|
for base_tool_name in LETTA_TOOL_SET:
|
|
assert base_tool_name in updated_tool_names, f"Base tool {base_tool_name} not found after upsert"
|
|
|
|
# Cleanup: restore any tools we deleted for testing (they should already be restored by the upsert)
|
|
# This is just a double-check that our test cleanup is proper
|
|
final_tools = client.tools.list()
|
|
final_tool_names = {tool.name for tool in final_tools}
|
|
for deleted_tool in tools_to_delete:
|
|
assert deleted_tool.name in final_tool_names, f"Deleted tool {deleted_tool.name} was not properly restored"
|
|
|
|
|
|
@pytest.mark.parametrize("e2b_sandbox_mode", [True, False], indirect=True)
|
|
def test_pydantic_inventory_management_tool(e2b_sandbox_mode, client: LettaSDKClient):
|
|
class InventoryItem(BaseModel):
|
|
sku: str
|
|
name: str
|
|
price: float
|
|
category: str
|
|
|
|
class InventoryEntry(BaseModel):
|
|
timestamp: int
|
|
item: InventoryItem
|
|
transaction_id: str
|
|
|
|
class InventoryEntryData(BaseModel):
|
|
data: InventoryEntry
|
|
quantity_change: int
|
|
|
|
class ManageInventoryTool(BaseTool):
|
|
name: str = "manage_inventory"
|
|
args_schema: Type[BaseModel] = InventoryEntryData
|
|
description: str = "Update inventory catalogue with a new data entry"
|
|
tags: List[str] = ["inventory", "shop"]
|
|
|
|
def run(self, data: InventoryEntry, quantity_change: int) -> bool:
|
|
print(f"Updated inventory for {data.item.name} with a quantity change of {quantity_change}")
|
|
return True
|
|
|
|
# test creation
|
|
tool = client.tools.add(
|
|
tool=ManageInventoryTool(),
|
|
)
|
|
|
|
# test that upserting also works
|
|
new_description = "NEW"
|
|
|
|
class ManageInventoryToolModified(ManageInventoryTool):
|
|
description: str = new_description
|
|
|
|
tool = client.tools.add(
|
|
tool=ManageInventoryToolModified(),
|
|
)
|
|
assert tool.description == new_description
|
|
|
|
assert tool is not None
|
|
assert tool.name == "manage_inventory"
|
|
assert "inventory" in tool.tags
|
|
assert "shop" in tool.tags
|
|
|
|
temp_agent = client.agents.create(
|
|
memory_blocks=[
|
|
CreateBlock(
|
|
label="persona",
|
|
value="You are a helpful inventory management assistant.",
|
|
),
|
|
],
|
|
model="openai/gpt-4o-mini",
|
|
embedding="openai/text-embedding-3-small",
|
|
tool_ids=[tool.id],
|
|
include_base_tools=False,
|
|
)
|
|
|
|
response = client.agents.messages.create(
|
|
agent_id=temp_agent.id,
|
|
messages=[
|
|
MessageCreate(
|
|
role="user",
|
|
content="Update the inventory for product 'iPhone 15' with SKU 'IPH15-001', price $999.99, category 'Electronics', transaction ID 'TXN-12345', timestamp 1640995200, with a quantity change of +10",
|
|
),
|
|
],
|
|
)
|
|
|
|
assert response is not None
|
|
|
|
tool_call_messages = [msg for msg in response.messages if msg.message_type == "tool_call_message"]
|
|
assert len(tool_call_messages) > 0, "Expected at least one tool call message"
|
|
|
|
first_tool_call = tool_call_messages[0]
|
|
assert first_tool_call.tool_call.name == "manage_inventory"
|
|
|
|
args = json.loads(first_tool_call.tool_call.arguments)
|
|
assert "data" in args
|
|
assert "quantity_change" in args
|
|
assert "item" in args["data"]
|
|
assert "name" in args["data"]["item"]
|
|
assert "sku" in args["data"]["item"]
|
|
assert "price" in args["data"]["item"]
|
|
assert "category" in args["data"]["item"]
|
|
assert "transaction_id" in args["data"]
|
|
assert "timestamp" in args["data"]
|
|
|
|
tool_return_messages = [msg for msg in response.messages if msg.message_type == "tool_return_message"]
|
|
assert len(tool_return_messages) > 0, "Expected at least one tool return message"
|
|
|
|
first_tool_return = tool_return_messages[0]
|
|
assert first_tool_return.status == "success"
|
|
assert first_tool_return.tool_return == "True"
|
|
assert "Updated inventory for iPhone 15 with a quantity change of 10" in "\n".join(first_tool_return.stdout)
|
|
|
|
client.agents.delete(temp_agent.id)
|
|
client.tools.delete(tool.id)
|
|
|
|
|
|
@pytest.mark.parametrize("e2b_sandbox_mode", [False], indirect=True)
|
|
def test_pydantic_task_planning_tool(e2b_sandbox_mode, client: LettaSDKClient):
|
|
class Step(BaseModel):
|
|
name: str = Field(..., description="Name of the step.")
|
|
description: str = Field(..., description="An exhaustive description of what this step is trying to achieve.")
|
|
|
|
class StepsList(BaseModel):
|
|
steps: List[Step] = Field(..., description="List of steps to add to the task plan.")
|
|
explanation: str = Field(..., description="Explanation for the list of steps.")
|
|
|
|
def create_task_plan(steps, explanation):
|
|
"""Creates a task plan for the current task."""
|
|
print(f"Created task plan with {len(steps)} steps: {explanation}")
|
|
return steps
|
|
|
|
# test creation
|
|
client.tools.upsert_from_function(func=create_task_plan, args_schema=StepsList, tags=["planning", "task", "pydantic_test"])
|
|
|
|
# test upsert
|
|
new_steps_description = "NEW"
|
|
|
|
class StepsListModified(BaseModel):
|
|
steps: List[Step] = Field(..., description=new_steps_description)
|
|
explanation: str = Field(..., description="Explanation for the list of steps.")
|
|
|
|
tool = client.tools.upsert_from_function(func=create_task_plan, args_schema=StepsListModified, description=new_steps_description)
|
|
assert tool.description == new_steps_description
|
|
|
|
assert tool is not None
|
|
assert tool.name == "create_task_plan"
|
|
assert "planning" in tool.tags
|
|
assert "task" in tool.tags
|
|
|
|
temp_agent = client.agents.create(
|
|
memory_blocks=[
|
|
CreateBlock(
|
|
label="persona",
|
|
value="You are a helpful task planning assistant.",
|
|
),
|
|
],
|
|
model="openai/gpt-4o-mini",
|
|
embedding="openai/text-embedding-3-small",
|
|
tool_ids=[tool.id],
|
|
include_base_tools=False,
|
|
tool_rules=[
|
|
TerminalToolRule(tool_name=tool.name),
|
|
],
|
|
)
|
|
|
|
response = client.agents.messages.create(
|
|
agent_id=temp_agent.id,
|
|
messages=[
|
|
MessageCreate(
|
|
role="user",
|
|
content="Create a task plan for organizing a team meeting with 3 steps: 1) Schedule meeting (find available time slots), 2) Send invitations (notify all team members), 3) Prepare agenda (outline discussion topics). Explanation: This plan ensures a well-organized team meeting.",
|
|
),
|
|
],
|
|
)
|
|
|
|
assert response is not None
|
|
assert hasattr(response, "messages")
|
|
assert len(response.messages) > 0
|
|
|
|
tool_call_messages = [msg for msg in response.messages if msg.message_type == "tool_call_message"]
|
|
assert len(tool_call_messages) > 0, "Expected at least one tool call message"
|
|
|
|
first_tool_call = tool_call_messages[0]
|
|
assert first_tool_call.tool_call.name == "create_task_plan"
|
|
|
|
args = json.loads(first_tool_call.tool_call.arguments)
|
|
|
|
assert "steps" in args
|
|
assert "explanation" in args
|
|
assert isinstance(args["steps"], list)
|
|
assert len(args["steps"]) > 0
|
|
|
|
for step in args["steps"]:
|
|
assert "name" in step
|
|
assert "description" in step
|
|
|
|
tool_return_messages = [msg for msg in response.messages if msg.message_type == "tool_return_message"]
|
|
assert len(tool_return_messages) > 0, "Expected at least one tool return message"
|
|
|
|
first_tool_return = tool_return_messages[0]
|
|
assert first_tool_return.status == "success"
|
|
|
|
client.agents.delete(temp_agent.id)
|
|
client.tools.delete(tool.id)
|
|
|
|
|
|
@pytest.mark.parametrize("e2b_sandbox_mode", [True, False], indirect=True)
|
|
def test_create_tool_from_function_with_docstring(e2b_sandbox_mode, client: LettaSDKClient):
|
|
"""Test creating a tool from a function with a docstring using create_from_function"""
|
|
|
|
def roll_dice() -> str:
|
|
"""
|
|
Simulate the roll of a 20-sided die (d20).
|
|
|
|
This function generates a random integer between 1 and 20, inclusive,
|
|
which represents the outcome of a single roll of a d20.
|
|
|
|
Returns:
|
|
str: The result of the die roll.
|
|
"""
|
|
import random
|
|
|
|
dice_role_outcome = random.randint(1, 20)
|
|
output_string = f"You rolled a {dice_role_outcome}"
|
|
return output_string
|
|
|
|
tool = client.tools.create_from_function(func=roll_dice)
|
|
|
|
assert tool is not None
|
|
assert tool.name == "roll_dice"
|
|
assert "Simulate the roll of a 20-sided die" in tool.description
|
|
assert tool.source_code is not None
|
|
assert "random.randint(1, 20)" in tool.source_code
|
|
|
|
all_tools = client.tools.list()
|
|
tool_names = [t.name for t in all_tools]
|
|
assert "roll_dice" in tool_names
|
|
|
|
client.tools.delete(tool.id)
|
|
|
|
|
|
def test_preview_payload(client: LettaSDKClient):
|
|
temp_agent = client.agents.create(
|
|
memory_blocks=[
|
|
CreateBlock(
|
|
label="human",
|
|
value="username: sarah",
|
|
),
|
|
],
|
|
model="openai/gpt-4o-mini",
|
|
embedding="openai/text-embedding-3-small",
|
|
)
|
|
|
|
try:
|
|
payload = client.agents.messages.preview_raw_payload(
|
|
agent_id=temp_agent.id,
|
|
request=LettaRequest(
|
|
messages=[
|
|
MessageCreate(
|
|
role="user",
|
|
content=[
|
|
TextContent(
|
|
text="text",
|
|
)
|
|
],
|
|
)
|
|
],
|
|
),
|
|
)
|
|
|
|
assert isinstance(payload, dict)
|
|
assert "model" in payload
|
|
assert "messages" in payload
|
|
assert "tools" in payload
|
|
assert "frequency_penalty" in payload
|
|
assert "max_completion_tokens" in payload
|
|
assert "temperature" in payload
|
|
assert "user" in payload
|
|
assert "parallel_tool_calls" in payload
|
|
assert "tool_choice" in payload
|
|
|
|
assert payload["model"] == "gpt-4o-mini"
|
|
|
|
assert isinstance(payload["messages"], list)
|
|
assert len(payload["messages"]) >= 3
|
|
|
|
system_message = payload["messages"][0]
|
|
assert system_message["role"] == "system"
|
|
assert "base_instructions" in system_message["content"]
|
|
assert "memory_blocks" in system_message["content"]
|
|
assert "Letta" in system_message["content"]
|
|
|
|
assert isinstance(payload["tools"], list)
|
|
assert len(payload["tools"]) > 0
|
|
|
|
for tool in payload["tools"]:
|
|
assert tool["type"] == "function"
|
|
assert "function" in tool
|
|
assert "name" in tool["function"]
|
|
assert "description" in tool["function"]
|
|
assert "parameters" in tool["function"]
|
|
assert tool["function"]["strict"] is True
|
|
|
|
assert payload["frequency_penalty"] == 1.0
|
|
assert payload["max_completion_tokens"] is None
|
|
assert payload["temperature"] == 0.7
|
|
assert payload["parallel_tool_calls"] is False
|
|
assert payload["tool_choice"] == "required"
|
|
assert payload["user"].startswith("user-")
|
|
|
|
print(payload)
|
|
finally:
|
|
# Clean up the agent
|
|
client.agents.delete(agent_id=temp_agent.id)
|
|
|
|
|
|
def test_archive_tags_in_system_prompt(client: LettaSDKClient):
|
|
"""Test that archive tags are correctly compiled into the system prompt."""
|
|
# Create a test agent
|
|
temp_agent = client.agents.create(
|
|
memory_blocks=[
|
|
CreateBlock(
|
|
label="human",
|
|
value="username: test_user",
|
|
),
|
|
],
|
|
model="openai/gpt-4o-mini",
|
|
embedding="openai/text-embedding-3-small",
|
|
)
|
|
|
|
try:
|
|
# Add passages with different tags to the agent's archive
|
|
test_tags = ["project_alpha", "meeting_notes", "research", "ideas", "todo_items"]
|
|
|
|
# Create passages with tags
|
|
for i, tag in enumerate(test_tags):
|
|
client.agents.passages.create(agent_id=temp_agent.id, text=f"Test passage {i} with tag {tag}", tags=[tag])
|
|
|
|
# Also create a passage with multiple tags
|
|
client.agents.passages.create(agent_id=temp_agent.id, text="Passage with multiple tags", tags=["multi_tag_1", "multi_tag_2"])
|
|
|
|
# Get the raw payload to check the system prompt
|
|
payload = client.agents.messages.preview_raw_payload(
|
|
agent_id=temp_agent.id,
|
|
request=LettaRequest(
|
|
messages=[
|
|
MessageCreate(
|
|
role="user",
|
|
content=[
|
|
TextContent(
|
|
text="Hello",
|
|
)
|
|
],
|
|
)
|
|
],
|
|
),
|
|
)
|
|
|
|
# Extract the system message
|
|
assert isinstance(payload, dict)
|
|
assert "messages" in payload
|
|
assert len(payload["messages"]) > 0
|
|
|
|
system_message = payload["messages"][0]
|
|
assert system_message["role"] == "system"
|
|
system_content = system_message["content"]
|
|
|
|
# Check that the archive tags are included in the metadata
|
|
assert "Available archival memory tags:" in system_content
|
|
|
|
# Check that all unique tags are present
|
|
all_unique_tags = set(test_tags + ["multi_tag_1", "multi_tag_2"])
|
|
for tag in all_unique_tags:
|
|
assert tag in system_content, f"Tag '{tag}' not found in system prompt"
|
|
|
|
# Verify the tags are in the memory_metadata section
|
|
assert "<memory_metadata>" in system_content
|
|
assert "</memory_metadata>" in system_content
|
|
|
|
# Extract the metadata section to verify format
|
|
metadata_start = system_content.index("<memory_metadata>")
|
|
metadata_end = system_content.index("</memory_metadata>")
|
|
metadata_section = system_content[metadata_start:metadata_end]
|
|
|
|
# Verify the tags line is properly formatted
|
|
assert "- Available archival memory tags:" in metadata_section
|
|
|
|
finally:
|
|
# Clean up the agent
|
|
client.agents.delete(agent_id=temp_agent.id)
|
|
|
|
|
|
def test_agent_tools_list(client: LettaSDKClient):
|
|
"""Test the optimized agent tools list endpoint for correctness."""
|
|
# Create a test agent
|
|
agent_state = client.agents.create(
|
|
name="test_agent_tools_list",
|
|
memory_blocks=[
|
|
CreateBlock(
|
|
label="persona",
|
|
value="You are a helpful assistant.",
|
|
),
|
|
],
|
|
model="openai/gpt-4o-mini",
|
|
embedding="openai/text-embedding-3-small",
|
|
include_base_tools=True,
|
|
)
|
|
|
|
try:
|
|
# Test basic functionality
|
|
tools = client.agents.tools.list(agent_id=agent_state.id)
|
|
assert len(tools) > 0, "Agent should have base tools attached"
|
|
|
|
# Verify tool objects have expected attributes
|
|
for tool in tools:
|
|
assert hasattr(tool, "id"), "Tool should have id attribute"
|
|
assert hasattr(tool, "name"), "Tool should have name attribute"
|
|
assert tool.id is not None, "Tool id should not be None"
|
|
assert tool.name is not None, "Tool name should not be None"
|
|
|
|
finally:
|
|
# Clean up
|
|
client.agents.delete(agent_id=agent_state.id)
|
|
|
|
|
|
def test_agent_tool_rules_deduplication(client: LettaSDKClient):
|
|
"""Test that duplicate tool rules are properly deduplicated when creating/updating agents."""
|
|
# Create agent with duplicate tool rules
|
|
duplicate_rules = [
|
|
TerminalToolRule(tool_name="send_message"),
|
|
TerminalToolRule(tool_name="send_message"), # exact duplicate
|
|
TerminalToolRule(tool_name="send_message"), # another duplicate
|
|
]
|
|
|
|
agent_state = client.agents.create(
|
|
name="test_agent_dedup",
|
|
memory_blocks=[
|
|
CreateBlock(
|
|
label="persona",
|
|
value="You are a helpful assistant.",
|
|
),
|
|
],
|
|
model="openai/gpt-4o-mini",
|
|
embedding="openai/text-embedding-3-small",
|
|
tool_rules=duplicate_rules,
|
|
include_base_tools=False,
|
|
)
|
|
|
|
# Get the agent and check tool rules
|
|
retrieved_agent = client.agents.retrieve(agent_id=agent_state.id)
|
|
assert len(retrieved_agent.tool_rules) == 1, f"Expected 1 unique tool rule, got {len(retrieved_agent.tool_rules)}"
|
|
assert retrieved_agent.tool_rules[0].tool_name == "send_message"
|
|
assert retrieved_agent.tool_rules[0].type == "exit_loop"
|
|
|
|
# Test update with duplicates
|
|
update_rules = [
|
|
ContinueToolRule(tool_name="conversation_search"),
|
|
ContinueToolRule(tool_name="conversation_search"), # duplicate
|
|
MaxCountPerStepToolRule(tool_name="test_tool", max_count_limit=2),
|
|
MaxCountPerStepToolRule(tool_name="test_tool", max_count_limit=2), # exact duplicate
|
|
MaxCountPerStepToolRule(tool_name="test_tool", max_count_limit=3), # different limit, not a duplicate
|
|
]
|
|
|
|
updated_agent = client.agents.modify(agent_id=agent_state.id, tool_rules=update_rules)
|
|
|
|
# Check that duplicates were removed
|
|
assert len(updated_agent.tool_rules) == 3, f"Expected 3 unique tool rules after update, got {len(updated_agent.tool_rules)}"
|
|
|
|
# Verify the specific rules
|
|
rule_set = {(r.tool_name, r.type, getattr(r, "max_count_limit", None)) for r in updated_agent.tool_rules}
|
|
expected_set = {
|
|
("conversation_search", "continue_loop", None),
|
|
("test_tool", "max_count_per_step", 2),
|
|
("test_tool", "max_count_per_step", 3),
|
|
}
|
|
assert rule_set == expected_set, f"Tool rules don't match expected. Got: {rule_set}"
|
|
|
|
|
|
def test_add_tool_with_multiple_functions_in_source_code(client: LettaSDKClient):
|
|
"""Test adding a tool with multiple functions in the source code"""
|
|
import textwrap
|
|
|
|
# Define source code with multiple functions
|
|
source_code = textwrap.dedent(
|
|
"""
|
|
def helper_function(x: int) -> int:
|
|
'''
|
|
Helper function that doubles the input
|
|
|
|
Args:
|
|
x: The input number
|
|
|
|
Returns:
|
|
The input multiplied by 2
|
|
'''
|
|
return x * 2
|
|
|
|
def another_helper(text: str) -> str:
|
|
'''
|
|
Another helper that uppercases text
|
|
|
|
Args:
|
|
text: The input text to uppercase
|
|
|
|
Returns:
|
|
The uppercased text
|
|
'''
|
|
return text.upper()
|
|
|
|
def main_function(x: int, y: int) -> int:
|
|
'''
|
|
Main function that uses the helper
|
|
|
|
Args:
|
|
x: First number
|
|
y: Second number
|
|
|
|
Returns:
|
|
Result of (x * 2) + y
|
|
'''
|
|
doubled_x = helper_function(x)
|
|
return doubled_x + y
|
|
"""
|
|
).strip()
|
|
|
|
# Create the tool with multiple functions
|
|
tool = client.tools.create(
|
|
source_code=source_code,
|
|
)
|
|
|
|
try:
|
|
# Verify the tool was created
|
|
assert tool is not None
|
|
assert tool.name == "main_function"
|
|
assert tool.source_code == source_code
|
|
|
|
# Verify the JSON schema was generated for the main function
|
|
assert tool.json_schema is not None
|
|
assert tool.json_schema["name"] == "main_function"
|
|
assert tool.json_schema["description"] == "Main function that uses the helper"
|
|
|
|
# Check parameters
|
|
params = tool.json_schema.get("parameters", {})
|
|
properties = params.get("properties", {})
|
|
assert "x" in properties
|
|
assert "y" in properties
|
|
assert properties["x"]["type"] == "integer"
|
|
assert properties["y"]["type"] == "integer"
|
|
assert params["required"] == ["x", "y"]
|
|
|
|
# Test that we can retrieve the tool
|
|
retrieved_tool = client.tools.retrieve(tool_id=tool.id)
|
|
assert retrieved_tool.name == "main_function"
|
|
assert retrieved_tool.source_code == source_code
|
|
|
|
finally:
|
|
# Clean up
|
|
client.tools.delete(tool_id=tool.id)
|
|
|
|
|
|
# TODO: add back once behavior is defined
|
|
# def test_tool_name_auto_update_with_multiple_functions(client: LettaSDKClient):
|
|
# """Test that tool name auto-updates when source code changes with multiple functions"""
|
|
# import textwrap
|
|
#
|
|
# # Initial source code with multiple functions
|
|
# initial_source_code = textwrap.dedent(
|
|
# """
|
|
# def helper_function(x: int) -> int:
|
|
# '''
|
|
# Helper function that doubles the input
|
|
#
|
|
# Args:
|
|
# x: The input number
|
|
#
|
|
# Returns:
|
|
# The input multiplied by 2
|
|
# '''
|
|
# return x * 2
|
|
#
|
|
# def another_helper(text: str) -> str:
|
|
# '''
|
|
# Another helper that uppercases text
|
|
#
|
|
# Args:
|
|
# text: The input text to uppercase
|
|
#
|
|
# Returns:
|
|
# The uppercased text
|
|
# '''
|
|
# return text.upper()
|
|
#
|
|
# def main_function(x: int, y: int) -> int:
|
|
# '''
|
|
# Main function that uses the helper
|
|
#
|
|
# Args:
|
|
# x: First number
|
|
# y: Second number
|
|
#
|
|
# Returns:
|
|
# Result of (x * 2) + y
|
|
# '''
|
|
# doubled_x = helper_function(x)
|
|
# return doubled_x + y
|
|
# """
|
|
# ).strip()
|
|
#
|
|
# # Create tool with initial source code
|
|
# tool = client.tools.create(
|
|
# source_code=initial_source_code,
|
|
# )
|
|
#
|
|
# try:
|
|
# # Verify the tool was created with the last function's name
|
|
# assert tool is not None
|
|
# assert tool.name == "main_function"
|
|
# assert tool.source_code == initial_source_code
|
|
#
|
|
# # Now modify the source code with a different function order
|
|
# new_source_code = textwrap.dedent(
|
|
# """
|
|
# def process_data(data: str, count: int) -> str:
|
|
# '''
|
|
# Process data by repeating it
|
|
#
|
|
# Args:
|
|
# data: The input data
|
|
# count: Number of times to repeat
|
|
#
|
|
# Returns:
|
|
# The processed data
|
|
# '''
|
|
# return data * count
|
|
#
|
|
# def helper_utility(x: float) -> float:
|
|
# '''
|
|
# Helper utility function
|
|
#
|
|
# Args:
|
|
# x: Input value
|
|
#
|
|
# Returns:
|
|
# Squared value
|
|
# '''
|
|
# return x * x
|
|
# """
|
|
# ).strip()
|
|
#
|
|
# # Modify the tool with new source code
|
|
# modified_tool = client.tools.modify(name="helper_utility", tool_id=tool.id, source_code=new_source_code)
|
|
#
|
|
# # Verify the name automatically updated to the last function
|
|
# assert modified_tool.name == "helper_utility"
|
|
# assert modified_tool.source_code == new_source_code
|
|
#
|
|
# # Verify the JSON schema updated correctly
|
|
# assert modified_tool.json_schema is not None
|
|
# assert modified_tool.json_schema["name"] == "helper_utility"
|
|
# assert modified_tool.json_schema["description"] == "Helper utility function"
|
|
#
|
|
# # Check parameters updated correctly
|
|
# params = modified_tool.json_schema.get("parameters", {})
|
|
# properties = params.get("properties", {})
|
|
# assert "x" in properties
|
|
# assert properties["x"]["type"] == "number" # float maps to number
|
|
# assert params["required"] == ["x"]
|
|
#
|
|
# # Test one more modification with only one function
|
|
# single_function_code = textwrap.dedent(
|
|
# """
|
|
# def calculate_total(items: list, tax_rate: float) -> float:
|
|
# '''
|
|
# Calculate total with tax
|
|
#
|
|
# Args:
|
|
# items: List of item prices
|
|
# tax_rate: Tax rate as decimal
|
|
#
|
|
# Returns:
|
|
# Total including tax
|
|
# '''
|
|
# subtotal = sum(items)
|
|
# return subtotal * (1 + tax_rate)
|
|
# """
|
|
# ).strip()
|
|
#
|
|
# # Modify again
|
|
# final_tool = client.tools.modify(tool_id=tool.id, source_code=single_function_code)
|
|
#
|
|
# # Verify name updated again
|
|
# assert final_tool.name == "calculate_total"
|
|
# assert final_tool.source_code == single_function_code
|
|
# assert final_tool.json_schema["description"] == "Calculate total with tax"
|
|
#
|
|
# finally:
|
|
# # Clean up
|
|
# client.tools.delete(tool_id=tool.id)
|
|
|
|
|
|
def test_tool_rename_with_json_schema_and_source_code(client: LettaSDKClient):
|
|
"""Test that passing both new JSON schema AND source code still renames the tool based on source code"""
|
|
|
|
# Create initial tool
|
|
def initial_tool(x: int) -> int:
|
|
"""
|
|
Multiply a number by 2
|
|
|
|
Args:
|
|
x: The input number
|
|
|
|
Returns:
|
|
The input multiplied by 2
|
|
"""
|
|
return x * 2
|
|
|
|
# Create the tool
|
|
tool = client.tools.upsert_from_function(func=initial_tool)
|
|
assert tool.name == "initial_tool"
|
|
|
|
try:
|
|
# Define new function source code with different name
|
|
new_source_code = textwrap.dedent(
|
|
"""
|
|
def renamed_function(value: float, multiplier: float = 2.0) -> float:
|
|
'''
|
|
Multiply a value by a multiplier
|
|
|
|
Args:
|
|
value: The input value
|
|
multiplier: The multiplier to use (default 2.0)
|
|
|
|
Returns:
|
|
The value multiplied by the multiplier
|
|
'''
|
|
return value * multiplier
|
|
"""
|
|
).strip()
|
|
|
|
# Create a custom JSON schema that has a different name
|
|
custom_json_schema = {
|
|
"name": "custom_schema_name",
|
|
"description": "Custom description from JSON schema",
|
|
"parameters": {
|
|
"type": "object",
|
|
"properties": {
|
|
"value": {"type": "number", "description": "Input value from JSON schema"},
|
|
"multiplier": {"type": "number", "description": "Multiplier from JSON schema", "default": 2.0},
|
|
},
|
|
"required": ["value"],
|
|
},
|
|
}
|
|
|
|
# verify there is a 400 error when both source code and json schema are provided
|
|
with pytest.raises(Exception) as e:
|
|
client.tools.modify(tool_id=tool.id, source_code=new_source_code, json_schema=custom_json_schema)
|
|
assert e.value.status_code == 400
|
|
|
|
# update with consistent name and schema
|
|
custom_json_schema["name"] = "renamed_function"
|
|
tool = client.tools.modify(tool_id=tool.id, json_schema=custom_json_schema)
|
|
assert tool.json_schema == custom_json_schema
|
|
assert tool.name == "renamed_function"
|
|
|
|
finally:
|
|
# Clean up
|
|
client.tools.delete(tool_id=tool.id)
|
|
|
|
|
|
def test_import_agent_file_from_disk(
|
|
client: LettaSDKClient, fibonacci_tool, preferences_tool, data_analysis_tool, persona_block, human_block, context_block
|
|
):
|
|
"""Test exporting an agent to file and importing it back from disk."""
|
|
# Create a comprehensive agent (similar to test_agent_serialization_v2)
|
|
name = f"test_export_import_{str(uuid.uuid4())}"
|
|
temp_agent = client.agents.create(
|
|
name=name,
|
|
memory_blocks=[persona_block, human_block, context_block],
|
|
model="openai/gpt-4.1-mini",
|
|
embedding="openai/text-embedding-3-small",
|
|
tool_ids=[fibonacci_tool.id, preferences_tool.id, data_analysis_tool.id],
|
|
include_base_tools=True,
|
|
tags=["test", "export", "import"],
|
|
system="You are a helpful assistant specializing in data analysis and mathematical computations.",
|
|
)
|
|
|
|
# Add archival memory
|
|
archival_passages = ["Test archival passage for export/import testing.", "Another passage with data about testing procedures."]
|
|
|
|
for passage_text in archival_passages:
|
|
client.agents.passages.create(agent_id=temp_agent.id, text=passage_text)
|
|
|
|
# Send a test message
|
|
client.agents.messages.create(
|
|
agent_id=temp_agent.id,
|
|
messages=[
|
|
MessageCreate(
|
|
role="user",
|
|
content="Test message for export",
|
|
),
|
|
],
|
|
)
|
|
|
|
# Export the agent
|
|
serialized_v2 = client.agents.export_file(agent_id=temp_agent.id, use_legacy_format=False)
|
|
|
|
# Save to file
|
|
file_path = os.path.join(os.path.dirname(__file__), "test_agent_files", "test_basic_agent_with_blocks_tools_messages_v2.af")
|
|
os.makedirs(os.path.dirname(file_path), exist_ok=True)
|
|
|
|
with open(file_path, "w") as f:
|
|
json.dump(serialized_v2, f, indent=2)
|
|
|
|
# Now import from the file
|
|
with open(file_path, "rb") as f:
|
|
import_result = client.agents.import_file(
|
|
file=f,
|
|
append_copy_suffix=True,
|
|
override_existing_tools=True, # Use suffix to avoid name conflict
|
|
)
|
|
|
|
# Basic verification
|
|
assert import_result is not None, "Import result should not be None"
|
|
assert len(import_result.agent_ids) > 0, "Should have imported at least one agent"
|
|
|
|
# Get the imported agent
|
|
imported_agent_id = import_result.agent_ids[0]
|
|
imported_agent = client.agents.retrieve(agent_id=imported_agent_id)
|
|
|
|
# Basic checks
|
|
assert imported_agent is not None, "Should be able to retrieve imported agent"
|
|
assert imported_agent.name is not None, "Imported agent should have a name"
|
|
assert imported_agent.memory is not None, "Agent should have memory"
|
|
assert len(imported_agent.tools) > 0, "Agent should have tools"
|
|
assert imported_agent.system is not None, "Agent should have a system prompt"
|
|
|
|
|
|
def test_agent_serialization_v2(
|
|
client: LettaSDKClient, fibonacci_tool, preferences_tool, data_analysis_tool, persona_block, human_block, context_block
|
|
):
|
|
"""Test agent serialization with comprehensive setup including custom tools, blocks, messages, and archival memory."""
|
|
name = f"comprehensive_test_agent_{str(uuid.uuid4())}"
|
|
temp_agent = client.agents.create(
|
|
name=name,
|
|
memory_blocks=[persona_block, human_block, context_block],
|
|
model="openai/gpt-4.1-mini",
|
|
embedding="openai/text-embedding-3-small",
|
|
tool_ids=[fibonacci_tool.id, preferences_tool.id, data_analysis_tool.id],
|
|
include_base_tools=True,
|
|
tags=["test", "comprehensive", "serialization"],
|
|
system="You are a helpful assistant specializing in data analysis and mathematical computations.",
|
|
)
|
|
|
|
# Add archival memory
|
|
archival_passages = [
|
|
"Project background: Sarah is working on a financial prediction model that uses Fibonacci retracements for technical analysis.",
|
|
"Research notes: Golden ratio (1.618) derived from Fibonacci sequence is often used in financial markets for support/resistance levels.",
|
|
]
|
|
|
|
for passage_text in archival_passages:
|
|
client.agents.passages.create(agent_id=temp_agent.id, text=passage_text)
|
|
|
|
# Send some messages
|
|
client.agents.messages.create(
|
|
agent_id=temp_agent.id,
|
|
messages=[
|
|
MessageCreate(
|
|
role="user",
|
|
content="Test message",
|
|
),
|
|
],
|
|
)
|
|
|
|
# Serialize using v2
|
|
serialized_v2 = client.agents.export_file(agent_id=temp_agent.id, use_legacy_format=False)
|
|
# Convert dict to JSON bytes for import
|
|
json_str = json.dumps(serialized_v2)
|
|
file_obj = io.BytesIO(json_str.encode("utf-8"))
|
|
|
|
# Import again
|
|
import_result = client.agents.import_file(file=file_obj, append_copy_suffix=False, override_existing_tools=True)
|
|
|
|
# Verify import was successful
|
|
assert len(import_result.agent_ids) == 1, "Should have imported exactly one agent"
|
|
imported_agent_id = import_result.agent_ids[0]
|
|
imported_agent = client.agents.retrieve(agent_id=imported_agent_id)
|
|
|
|
# ========== BASIC AGENT PROPERTIES ==========
|
|
# Name should be the same (if append_copy_suffix=False) or have suffix
|
|
assert imported_agent.name == name, f"Agent name mismatch: {imported_agent.name} != {name}"
|
|
|
|
# LLM and embedding configs should be preserved
|
|
assert imported_agent.llm_config.model == temp_agent.llm_config.model, (
|
|
f"LLM model mismatch: {imported_agent.llm_config.model} != {temp_agent.llm_config.model}"
|
|
)
|
|
assert imported_agent.embedding_config.embedding_model == temp_agent.embedding_config.embedding_model, "Embedding model mismatch"
|
|
|
|
# System prompt should be preserved
|
|
assert imported_agent.system == temp_agent.system, "System prompt was not preserved"
|
|
|
|
# Tags should be preserved
|
|
assert set(imported_agent.tags) == set(temp_agent.tags), f"Tags mismatch: {imported_agent.tags} != {temp_agent.tags}"
|
|
|
|
# Agent type should be preserved
|
|
assert imported_agent.agent_type == temp_agent.agent_type, (
|
|
f"Agent type mismatch: {imported_agent.agent_type} != {temp_agent.agent_type}"
|
|
)
|
|
|
|
# ========== MEMORY BLOCKS ==========
|
|
# Compare memory blocks directly from AgentState objects
|
|
original_blocks = temp_agent.memory.blocks
|
|
imported_blocks = imported_agent.memory.blocks
|
|
|
|
# Should have same number of blocks
|
|
assert len(imported_blocks) == len(original_blocks), f"Block count mismatch: {len(imported_blocks)} != {len(original_blocks)}"
|
|
|
|
# Verify each block by label
|
|
original_blocks_by_label = {block.label: block for block in original_blocks}
|
|
imported_blocks_by_label = {block.label: block for block in imported_blocks}
|
|
|
|
# Check persona block
|
|
assert "persona" in imported_blocks_by_label, "Persona block missing in imported agent"
|
|
assert "Alex" in imported_blocks_by_label["persona"].value, "Persona block content not preserved"
|
|
assert imported_blocks_by_label["persona"].limit == original_blocks_by_label["persona"].limit, "Persona block limit mismatch"
|
|
|
|
# Check human block
|
|
assert "human" in imported_blocks_by_label, "Human block missing in imported agent"
|
|
assert "sarah_researcher" in imported_blocks_by_label["human"].value, "Human block content not preserved"
|
|
assert imported_blocks_by_label["human"].limit == original_blocks_by_label["human"].limit, "Human block limit mismatch"
|
|
|
|
# Check context block
|
|
assert "project_context" in imported_blocks_by_label, "Context block missing in imported agent"
|
|
assert "financial markets" in imported_blocks_by_label["project_context"].value, "Context block content not preserved"
|
|
assert imported_blocks_by_label["project_context"].limit == original_blocks_by_label["project_context"].limit, (
|
|
"Context block limit mismatch"
|
|
)
|
|
|
|
# ========== TOOLS ==========
|
|
# Compare tools directly from AgentState objects
|
|
original_tools = temp_agent.tools
|
|
imported_tools = imported_agent.tools
|
|
|
|
# Should have same number of tools
|
|
assert len(imported_tools) == len(original_tools), f"Tool count mismatch: {len(imported_tools)} != {len(original_tools)}"
|
|
|
|
original_tool_names = {tool.name for tool in original_tools}
|
|
imported_tool_names = {tool.name for tool in imported_tools}
|
|
|
|
# Check custom tools are present
|
|
assert "calculate_fibonacci" in imported_tool_names, "Fibonacci tool missing in imported agent"
|
|
assert "get_user_preferences" in imported_tool_names, "Preferences tool missing in imported agent"
|
|
assert "analyze_data" in imported_tool_names, "Data analysis tool missing in imported agent"
|
|
|
|
# Check for base tools (since we set include_base_tools=True when creating the agent)
|
|
# Base tools should also be present (at least some core ones)
|
|
base_tool_names = {"send_message", "conversation_search"}
|
|
missing_base_tools = base_tool_names - imported_tool_names
|
|
assert len(missing_base_tools) == 0, f"Missing base tools: {missing_base_tools}"
|
|
|
|
# Verify tool names match exactly
|
|
assert original_tool_names == imported_tool_names, f"Tool names don't match: {original_tool_names} != {imported_tool_names}"
|
|
|
|
# ========== MESSAGE HISTORY ==========
|
|
# Get messages for both agents
|
|
original_messages = client.agents.messages.list(agent_id=temp_agent.id, limit=100)
|
|
imported_messages = client.agents.messages.list(agent_id=imported_agent_id, limit=100)
|
|
|
|
# Should have same number of messages
|
|
assert len(imported_messages) >= 1, "Imported agent should have messages"
|
|
|
|
# Filter for user messages (excluding system-generated login messages)
|
|
original_user_msgs = [msg for msg in original_messages if msg.message_type == "user_message" and "Test message" in msg.content]
|
|
imported_user_msgs = [msg for msg in imported_messages if msg.message_type == "user_message" and "Test message" in msg.content]
|
|
|
|
# Should have the same number of test messages
|
|
assert len(imported_user_msgs) == len(original_user_msgs), (
|
|
f"User message count mismatch: {len(imported_user_msgs)} != {len(original_user_msgs)}"
|
|
)
|
|
|
|
# Verify test message content is preserved
|
|
if len(original_user_msgs) > 0 and len(imported_user_msgs) > 0:
|
|
assert imported_user_msgs[0].content == original_user_msgs[0].content, "User message content not preserved"
|
|
assert "Test message" in imported_user_msgs[0].content, "Test message content not found"
|
|
|
|
|
|
def test_export_import_agent_with_files(client: LettaSDKClient):
|
|
"""Test exporting and importing an agent with files attached."""
|
|
|
|
# Clean up any existing source with the same name from previous runs
|
|
existing_sources = client.sources.list()
|
|
for existing_source in existing_sources:
|
|
client.sources.delete(source_id=existing_source.id)
|
|
|
|
# Create a source and upload test files
|
|
source = client.sources.create(name="test_export_source", embedding="openai/text-embedding-3-small")
|
|
|
|
# Upload test files to the source
|
|
test_files = ["tests/data/test.txt", "tests/data/test.md"]
|
|
|
|
for file_path in test_files:
|
|
upload_file_and_wait(client, source.id, file_path)
|
|
|
|
# Verify files were uploaded successfully
|
|
files_in_source = client.sources.files.list(source_id=source.id, limit=10)
|
|
assert len(files_in_source) == len(test_files), f"Expected {len(test_files)} files, got {len(files_in_source)}"
|
|
|
|
# Create a simple agent with the source attached
|
|
temp_agent = client.agents.create(
|
|
memory_blocks=[
|
|
CreateBlock(label="human", value="username: sarah"),
|
|
],
|
|
model="openai/gpt-4o-mini",
|
|
embedding="openai/text-embedding-3-small",
|
|
source_ids=[source.id], # Attach the source with files
|
|
)
|
|
|
|
# Verify the agent has the source and file blocks
|
|
agent_state = client.agents.retrieve(agent_id=temp_agent.id)
|
|
assert len(agent_state.sources) == 1, "Agent should have one source attached"
|
|
assert agent_state.sources[0].id == source.id, "Agent should have the correct source attached"
|
|
|
|
# Verify file blocks are present
|
|
file_blocks = agent_state.memory.file_blocks
|
|
assert len(file_blocks) == len(test_files), f"Expected {len(test_files)} file blocks, got {len(file_blocks)}"
|
|
|
|
# Export the agent
|
|
serialized_agent = client.agents.export_file(agent_id=temp_agent.id, use_legacy_format=False)
|
|
|
|
# Convert to JSON bytes for import
|
|
json_str = json.dumps(serialized_agent)
|
|
file_obj = io.BytesIO(json_str.encode("utf-8"))
|
|
|
|
# Import the agent
|
|
import_result = client.agents.import_file(file=file_obj, append_copy_suffix=True, override_existing_tools=True)
|
|
|
|
# Verify import was successful
|
|
assert len(import_result.agent_ids) == 1, "Should have imported exactly one agent"
|
|
imported_agent_id = import_result.agent_ids[0]
|
|
imported_agent = client.agents.retrieve(agent_id=imported_agent_id)
|
|
|
|
# Verify the source is attached to the imported agent
|
|
assert len(imported_agent.sources) == 1, "Imported agent should have one source attached"
|
|
imported_source = imported_agent.sources[0]
|
|
|
|
# Check that imported source has the same files
|
|
imported_files = client.sources.files.list(source_id=imported_source.id, limit=10)
|
|
assert len(imported_files) == len(test_files), f"Imported source should have {len(test_files)} files"
|
|
|
|
# Verify file blocks are preserved in imported agent
|
|
imported_file_blocks = imported_agent.memory.file_blocks
|
|
assert len(imported_file_blocks) == len(test_files), f"Imported agent should have {len(test_files)} file blocks"
|
|
|
|
# Verify file block content
|
|
for file_block in imported_file_blocks:
|
|
assert file_block.value is not None and len(file_block.value) > 0, "Imported file block should have content"
|
|
assert "[Viewing file start" in file_block.value, "Imported file block should show file viewing header"
|
|
|
|
# Test that files can be opened on the imported agent
|
|
if len(imported_files) > 0:
|
|
test_file = imported_files[0]
|
|
client.agents.files.open(agent_id=imported_agent_id, file_id=test_file.id)
|
|
|
|
# Clean up
|
|
client.agents.delete(agent_id=temp_agent.id)
|
|
client.agents.delete(agent_id=imported_agent_id)
|
|
client.sources.delete(source_id=source.id)
|
|
|
|
|
|
def test_import_agent_with_files_from_disk(client: LettaSDKClient):
|
|
"""Test exporting an agent with files to disk and importing it back."""
|
|
# Upload test files to the source
|
|
test_files = ["tests/data/test.txt", "tests/data/test.md"]
|
|
|
|
# Save to file
|
|
file_path = os.path.join(os.path.dirname(__file__), "test_agent_files", "test_agent_with_files_and_sources.af")
|
|
|
|
# Now import from the file
|
|
with open(file_path, "rb") as f:
|
|
import_result = client.agents.import_file(
|
|
file=f,
|
|
append_copy_suffix=True,
|
|
override_existing_tools=True, # Use suffix to avoid name conflict
|
|
)
|
|
|
|
# Verify import was successful
|
|
assert len(import_result.agent_ids) == 1, "Should have imported exactly one agent"
|
|
imported_agent_id = import_result.agent_ids[0]
|
|
imported_agent = client.agents.retrieve(agent_id=imported_agent_id)
|
|
|
|
# Verify the source is attached to the imported agent
|
|
assert len(imported_agent.sources) == 1, "Imported agent should have one source attached"
|
|
imported_source = imported_agent.sources[0]
|
|
|
|
# Check that imported source has the same files
|
|
imported_files = client.sources.files.list(source_id=imported_source.id, limit=10)
|
|
assert len(imported_files) == len(test_files), f"Imported source should have {len(test_files)} files"
|
|
|
|
# Verify file blocks are preserved in imported agent
|
|
imported_file_blocks = imported_agent.memory.file_blocks
|
|
assert len(imported_file_blocks) == len(test_files), f"Imported agent should have {len(test_files)} file blocks"
|
|
|
|
# Verify file block content
|
|
for file_block in imported_file_blocks:
|
|
assert file_block.value is not None and len(file_block.value) > 0, "Imported file block should have content"
|
|
assert "[Viewing file start" in file_block.value, "Imported file block should show file viewing header"
|
|
|
|
# Test that files can be opened on the imported agent
|
|
if len(imported_files) > 0:
|
|
test_file = imported_files[0]
|
|
client.agents.files.open(agent_id=imported_agent_id, file_id=test_file.id)
|
|
|
|
# Clean up agents and sources
|
|
client.agents.delete(agent_id=imported_agent_id)
|
|
client.sources.delete(source_id=imported_source.id)
|
|
|
|
|
|
def test_upsert_tools(client: LettaSDKClient):
|
|
"""Test upserting tools with complex schemas."""
|
|
from typing import List
|
|
|
|
class WriteReasonOffer(BaseModel):
|
|
biltMerchantId: str = Field(..., description="The merchant ID (e.g. 'MERCHANT_NETWORK-123' or 'LYFT')")
|
|
campaignId: str = Field(
|
|
...,
|
|
description="The campaign ID (e.g. '550e8400-e29b-41d4-a716-446655440000' or '550e8400-e29b-41d4-a716-446655440000_123e4567-e89b-12d3-a456-426614174000')",
|
|
)
|
|
reason: str = Field(
|
|
...,
|
|
description="A detailed explanation of why this offer is relevant to the user. Refer to the category-specific reason_instructions_{category} block for all guidelines on creating personalized reasons.",
|
|
)
|
|
|
|
class WriteReasonArgs(BaseModel):
|
|
"""Arguments for the write_reason tool."""
|
|
|
|
offer_list: List[WriteReasonOffer] = Field(
|
|
...,
|
|
description="List of WriteReasonOffer objects with merchant and campaign information",
|
|
)
|
|
|
|
def write_reason(offer_list: List[WriteReasonOffer]):
|
|
"""
|
|
This tool is used to write detailed reasons for a list of offers.
|
|
It returns the essential information: biltMerchantId, campaignId, and reason.
|
|
|
|
IMPORTANT: When generating reasons, you MUST ONLY follow the guidelines in the
|
|
category-specific instruction block named "reason_instructions_{category}" where
|
|
{category} is the category of the offer (e.g., dining, travel, shopping).
|
|
|
|
These instruction blocks contain all the necessary guidelines for creating
|
|
personalized, detailed reasons for each category. Do not rely on any other
|
|
instructions outside of these blocks.
|
|
|
|
Args:
|
|
offer_list: List of WriteReasonOffer objects, each containing:
|
|
- biltMerchantId: The merchant ID (e.g. 'MERCHANT_NETWORK-123' or 'LYFT')
|
|
- campaignId: The campaign ID (e.g. '124', '28')
|
|
- reason: A detailed explanation generated according to the category-specific reason_instructions_{category} block
|
|
|
|
Returns:
|
|
None: This function prints the offer list but does not return a value.
|
|
"""
|
|
print(offer_list)
|
|
|
|
tool = client.tools.upsert_from_function(func=write_reason, args_schema=WriteReasonArgs)
|
|
assert tool is not None
|
|
assert tool.name == "write_reason"
|
|
|
|
# Clean up
|
|
client.tools.delete(tool.id)
|