Files
letta-server/tests/test_sdk_client.py

1525 lines
50 KiB
Python

import json
import os
import threading
import time
import uuid
from typing import List, Type
import pytest
from dotenv import load_dotenv
from letta_client import CreateBlock
from letta_client import Letta as LettaSDKClient
from letta_client import LettaRequest, MessageCreate, TextContent
from letta_client.client import BaseTool
from letta_client.core import ApiError
from letta_client.types import AgentState, ToolReturnMessage
from pydantic import BaseModel, Field
# Constants
SERVER_PORT = 8283
def run_server():
load_dotenv()
from letta.server.rest_api.app import start_server
print("Starting server...")
start_server(debug=True)
@pytest.fixture(scope="module")
def client() -> LettaSDKClient:
# Get URL from environment or start server
server_url = os.getenv("LETTA_SERVER_URL", f"http://localhost:{SERVER_PORT}")
if not os.getenv("LETTA_SERVER_URL"):
print("Starting server thread")
thread = threading.Thread(target=run_server, daemon=True)
thread.start()
time.sleep(5)
print("Running client tests with server:", server_url)
client = LettaSDKClient(base_url=server_url, token=None)
yield client
@pytest.fixture(scope="module")
def agent(client: LettaSDKClient):
agent_state = client.agents.create(
memory_blocks=[
CreateBlock(
label="human",
value="username: sarah",
),
],
model="openai/gpt-4o-mini",
embedding="openai/text-embedding-3-small",
)
yield agent_state
# delete agent
client.agents.delete(agent_id=agent_state.id)
def test_shared_blocks(client: LettaSDKClient):
# create a block
block = client.blocks.create(
label="human",
value="username: sarah",
)
# create agents with shared block
agent_state1 = client.agents.create(
name="agent1",
memory_blocks=[
CreateBlock(
label="persona",
value="you are agent 1",
),
],
block_ids=[block.id],
model="openai/gpt-4o-mini",
embedding="openai/text-embedding-3-small",
)
agent_state2 = client.agents.create(
name="agent2",
memory_blocks=[
CreateBlock(
label="persona",
value="you are agent 2",
),
],
block_ids=[block.id],
model="openai/gpt-4o-mini",
embedding="openai/text-embedding-3-small",
)
# update memory
client.agents.messages.create(
agent_id=agent_state1.id,
messages=[
MessageCreate(
role="user",
content="my name is actually charles",
)
],
)
# check agent 2 memory
block_value = client.blocks.retrieve(block_id=block.id).value
assert "charles" in block_value.lower(), f"Shared block update failed {block_value}"
client.agents.messages.create(
agent_id=agent_state2.id,
messages=[
MessageCreate(
role="user",
content="whats my name?",
)
],
)
block_value = client.agents.blocks.retrieve(agent_id=agent_state2.id, block_label="human").value
assert "charles" in block_value.lower(), f"Shared block update failed {block_value}"
# cleanup
client.agents.delete(agent_state1.id)
client.agents.delete(agent_state2.id)
def test_read_only_block(client: LettaSDKClient):
block_value = "username: sarah"
agent = client.agents.create(
memory_blocks=[
CreateBlock(
label="human",
value=block_value,
read_only=True,
),
],
model="openai/gpt-4o-mini",
embedding="openai/text-embedding-3-small",
)
# make sure agent cannot update read-only block
client.agents.messages.create(
agent_id=agent.id,
messages=[
MessageCreate(
role="user",
content="my name is actually charles",
)
],
)
# make sure block value is still the same
block = client.agents.blocks.retrieve(agent_id=agent.id, block_label="human")
assert block.value == block_value
# make sure can update from client
new_value = "hello"
client.agents.blocks.modify(agent_id=agent.id, block_label="human", value=new_value)
block = client.agents.blocks.retrieve(agent_id=agent.id, block_label="human")
assert block.value == new_value
# cleanup
client.agents.delete(agent.id)
def test_add_and_manage_tags_for_agent(client: LettaSDKClient):
"""
Comprehensive happy path test for adding, retrieving, and managing tags on an agent.
"""
tags_to_add = ["test_tag_1", "test_tag_2", "test_tag_3"]
# Step 0: create an agent with no tags
agent = client.agents.create(
memory_blocks=[
CreateBlock(
label="human",
value="username: sarah",
),
],
model="openai/gpt-4o-mini",
embedding="openai/text-embedding-3-small",
)
assert len(agent.tags) == 0
# Step 1: Add multiple tags to the agent
client.agents.modify(agent_id=agent.id, tags=tags_to_add)
# Step 2: Retrieve tags for the agent and verify they match the added tags
retrieved_tags = client.agents.retrieve(agent_id=agent.id).tags
assert set(retrieved_tags) == set(tags_to_add), f"Expected tags {tags_to_add}, but got {retrieved_tags}"
# Step 3: Retrieve agents by each tag to ensure the agent is associated correctly
for tag in tags_to_add:
agents_with_tag = client.agents.list(tags=[tag])
assert agent.id in [a.id for a in agents_with_tag], f"Expected agent {agent.id} to be associated with tag '{tag}'"
# Step 4: Delete a specific tag from the agent and verify its removal
tag_to_delete = tags_to_add.pop()
client.agents.modify(agent_id=agent.id, tags=tags_to_add)
# Verify the tag is removed from the agent's tags
remaining_tags = client.agents.retrieve(agent_id=agent.id).tags
assert tag_to_delete not in remaining_tags, f"Tag '{tag_to_delete}' was not removed as expected"
assert set(remaining_tags) == set(tags_to_add), f"Expected remaining tags to be {tags_to_add[1:]}, but got {remaining_tags}"
# Step 5: Delete all remaining tags from the agent
client.agents.modify(agent_id=agent.id, tags=[])
# Verify all tags are removed
final_tags = client.agents.retrieve(agent_id=agent.id).tags
assert len(final_tags) == 0, f"Expected no tags, but found {final_tags}"
# Remove agent
client.agents.delete(agent.id)
def test_agent_tags(client: LettaSDKClient):
"""Test creating agents with tags and retrieving tags via the API."""
# Clear all agents
all_agents = client.agents.list()
for agent in all_agents:
client.agents.delete(agent.id)
# Create multiple agents with different tags
agent1 = client.agents.create(
memory_blocks=[
CreateBlock(
label="human",
value="username: sarah",
),
],
model="openai/gpt-4o-mini",
embedding="openai/text-embedding-3-small",
tags=["test", "agent1", "production"],
)
agent2 = client.agents.create(
memory_blocks=[
CreateBlock(
label="human",
value="username: sarah",
),
],
model="openai/gpt-4o-mini",
embedding="openai/text-embedding-3-small",
tags=["test", "agent2", "development"],
)
agent3 = client.agents.create(
memory_blocks=[
CreateBlock(
label="human",
value="username: sarah",
),
],
model="openai/gpt-4o-mini",
embedding="openai/text-embedding-3-small",
tags=["test", "agent3", "production"],
)
# Test getting all tags
all_tags = client.tags.list()
expected_tags = ["agent1", "agent2", "agent3", "development", "production", "test"]
assert sorted(all_tags) == expected_tags
# Test pagination
paginated_tags = client.tags.list(limit=2)
assert len(paginated_tags) == 2
assert paginated_tags[0] == "agent1"
assert paginated_tags[1] == "agent2"
# Test pagination with cursor
next_page_tags = client.tags.list(after="agent2", limit=2)
assert len(next_page_tags) == 2
assert next_page_tags[0] == "agent3"
assert next_page_tags[1] == "development"
# Test text search
prod_tags = client.tags.list(query_text="prod")
assert sorted(prod_tags) == ["production"]
dev_tags = client.tags.list(query_text="dev")
assert sorted(dev_tags) == ["development"]
agent_tags = client.tags.list(query_text="agent")
assert sorted(agent_tags) == ["agent1", "agent2", "agent3"]
# Remove agents
client.agents.delete(agent1.id)
client.agents.delete(agent2.id)
client.agents.delete(agent3.id)
def test_update_agent_memory_label(client: LettaSDKClient, agent: AgentState):
"""Test that we can update the label of a block in an agent's memory"""
current_labels = [block.label for block in client.agents.blocks.list(agent_id=agent.id)]
example_label = current_labels[0]
example_new_label = "example_new_label"
assert example_new_label not in current_labels
client.agents.blocks.modify(
agent_id=agent.id,
block_label=example_label,
label=example_new_label,
)
updated_block = client.agents.blocks.retrieve(agent_id=agent.id, block_label=example_new_label)
assert updated_block.label == example_new_label
def test_add_remove_agent_memory_block(client: LettaSDKClient, agent: AgentState):
"""Test that we can add and remove a block from an agent's memory"""
current_labels = [block.label for block in client.agents.blocks.list(agent_id=agent.id)]
example_new_label = current_labels[0] + "_v2"
example_new_value = "example value"
assert example_new_label not in current_labels
# Link a new memory block
block = client.blocks.create(
label=example_new_label,
value=example_new_value,
limit=1000,
)
client.agents.blocks.attach(
agent_id=agent.id,
block_id=block.id,
)
updated_block = client.agents.blocks.retrieve(
agent_id=agent.id,
block_label=example_new_label,
)
assert updated_block.value == example_new_value
# Now unlink the block
client.agents.blocks.detach(
agent_id=agent.id,
block_id=block.id,
)
current_labels = [block.label for block in client.agents.blocks.list(agent_id=agent.id)]
assert example_new_label not in current_labels
def test_update_agent_memory_limit(client: LettaSDKClient, agent: AgentState):
"""Test that we can update the limit of a block in an agent's memory"""
current_labels = [block.label for block in client.agents.blocks.list(agent_id=agent.id)]
example_label = current_labels[0]
example_new_limit = 1
current_block = client.agents.blocks.retrieve(agent_id=agent.id, block_label=example_label)
current_block_length = len(current_block.value)
assert example_new_limit != client.agents.blocks.retrieve(agent_id=agent.id, block_label=example_label).limit
assert example_new_limit < current_block_length
# We expect this to throw a value error
with pytest.raises(ApiError):
client.agents.blocks.modify(
agent_id=agent.id,
block_label=example_label,
limit=example_new_limit,
)
# Now try the same thing with a higher limit
example_new_limit = current_block_length + 10000
assert example_new_limit > current_block_length
client.agents.blocks.modify(
agent_id=agent.id,
block_label=example_label,
limit=example_new_limit,
)
assert example_new_limit == client.agents.blocks.retrieve(agent_id=agent.id, block_label=example_label).limit
def test_messages(client: LettaSDKClient, agent: AgentState):
send_message_response = client.agents.messages.create(
agent_id=agent.id,
messages=[
MessageCreate(
role="user",
content="Test message",
),
],
)
assert send_message_response, "Sending message failed"
messages_response = client.agents.messages.list(
agent_id=agent.id,
limit=1,
)
assert len(messages_response) > 0, "Retrieving messages failed"
def test_send_system_message(client: LettaSDKClient, agent: AgentState):
"""Important unit test since the Letta API exposes sending system messages, but some backends don't natively support it (eg Anthropic)"""
send_system_message_response = client.agents.messages.create(
agent_id=agent.id,
messages=[
MessageCreate(
role="system",
content="Event occurred: The user just logged off.",
),
],
)
assert send_system_message_response, "Sending message failed"
def test_function_return_limit(disable_e2b_api_key, client: LettaSDKClient, agent: AgentState):
"""Test to see if the function return limit works"""
def big_return():
"""
Always call this tool.
Returns:
important_data (str): Important data
"""
return "x" * 100000
tool = client.tools.upsert_from_function(func=big_return, return_char_limit=1000)
client.agents.tools.attach(agent_id=agent.id, tool_id=tool.id)
# get function response
response = client.agents.messages.create(
agent_id=agent.id,
messages=[
MessageCreate(
role="user",
content="call the big_return function",
),
],
use_assistant_message=False,
)
response_message = None
for message in response.messages:
if isinstance(message, ToolReturnMessage):
response_message = message
break
assert response_message, "ToolReturnMessage message not found in response"
res = response_message.tool_return
assert "function output was truncated " in res
@pytest.mark.flaky(max_runs=3)
def test_function_always_error(client: LettaSDKClient, agent: AgentState):
"""Test to see if function that errors works correctly"""
def testing_method():
"""
A method that has test functionalit.
"""
return 5 / 0
tool = client.tools.upsert_from_function(func=testing_method, return_char_limit=1000)
client.agents.tools.attach(agent_id=agent.id, tool_id=tool.id)
# get function response
response = client.agents.messages.create(
agent_id=agent.id,
messages=[
MessageCreate(
role="user",
content="call the testing_method function and tell me the result",
),
],
)
response_message = None
for message in response.messages:
if isinstance(message, ToolReturnMessage):
response_message = message
break
assert response_message, "ToolReturnMessage message not found in response"
assert response_message.status == "error"
assert "Error executing function testing_method: ZeroDivisionError: division by zero" in response_message.tool_return
assert "ZeroDivisionError" in response_message.tool_return
# TODO: Add back when the new agent loop hits
# @pytest.mark.asyncio
# async def test_send_message_parallel(client: LettaSDKClient, agent: AgentState):
# """
# Test that sending two messages in parallel does not error.
# """
#
# # Define a coroutine for sending a message using asyncio.to_thread for synchronous calls
# async def send_message_task(message: str):
# response = await asyncio.to_thread(
# client.agents.messages.create,
# agent_id=agent.id,
# messages=[
# MessageCreate(
# role="user",
# content=message,
# ),
# ],
# )
# assert response, f"Sending message '{message}' failed"
# return response
#
# # Prepare two tasks with different messages
# messages = ["Test message 1", "Test message 2"]
# tasks = [send_message_task(message) for message in messages]
#
# # Run the tasks concurrently
# responses = await asyncio.gather(*tasks, return_exceptions=True)
#
# # Check for exceptions and validate responses
# for i, response in enumerate(responses):
# if isinstance(response, Exception):
# pytest.fail(f"Task {i} failed with exception: {response}")
# else:
# assert response, f"Task {i} returned an invalid response: {response}"
#
# # Ensure both tasks completed
# assert len(responses) == len(messages), "Not all messages were processed"
def test_agent_creation(client: LettaSDKClient):
"""Test that block IDs are properly attached when creating an agent."""
sleeptime_agent_system = """
You are a helpful agent. You will be provided with a list of memory blocks and a user preferences block.
You should use the memory blocks to remember information about the user and their preferences.
You should also use the user preferences block to remember information about the user's preferences.
"""
# Create a test block that will represent user preferences
user_preferences_block = client.blocks.create(
label="user_preferences",
value="",
limit=10000,
)
# Create test tools
def test_tool():
"""A simple test tool."""
return "Hello from test tool!"
def another_test_tool():
"""Another test tool."""
return "Hello from another test tool!"
tool1 = client.tools.upsert_from_function(func=test_tool, tags=["test"])
tool2 = client.tools.upsert_from_function(func=another_test_tool, tags=["test"])
# Create test blocks
sleeptime_persona_block = client.blocks.create(label="persona", value="persona description", limit=5000)
mindy_block = client.blocks.create(label="mindy", value="Mindy is a helpful assistant", limit=5000)
# Create agent with the blocks and tools
agent = client.agents.create(
name=f"test_agent_{str(uuid.uuid4())}",
memory_blocks=[sleeptime_persona_block, mindy_block],
model="openai/gpt-4o-mini",
embedding="openai/text-embedding-3-small",
tool_ids=[tool1.id, tool2.id],
include_base_tools=False,
tags=["test"],
block_ids=[user_preferences_block.id],
)
# Verify the agent was created successfully
assert agent is not None
assert agent.id is not None
# Verify all memory blocks are properly attached
for block in [sleeptime_persona_block, mindy_block, user_preferences_block]:
agent_block = client.agents.blocks.retrieve(agent_id=agent.id, block_label=block.label)
assert block.value == agent_block.value and block.limit == agent_block.limit
# Verify the tools are properly attached
agent_tools = client.agents.tools.list(agent_id=agent.id)
assert len(agent_tools) == 2
tool_ids = {tool1.id, tool2.id}
assert all(tool.id in tool_ids for tool in agent_tools)
def test_many_blocks(client: LettaSDKClient):
users = ["user1", "user2"]
# Create agent with the blocks
agent1 = client.agents.create(
name=f"test_agent_{str(uuid.uuid4())}",
memory_blocks=[
CreateBlock(
label="user1",
value="user preferences: loud",
),
CreateBlock(
label="user2",
value="user preferences: happy",
),
],
model="openai/gpt-4o-mini",
embedding="openai/text-embedding-3-small",
include_base_tools=False,
tags=["test"],
)
agent2 = client.agents.create(
name=f"test_agent_{str(uuid.uuid4())}",
memory_blocks=[
CreateBlock(
label="user1",
value="user preferences: sneezy",
),
CreateBlock(
label="user2",
value="user preferences: lively",
),
],
model="openai/gpt-4o-mini",
embedding="openai/text-embedding-3-small",
include_base_tools=False,
tags=["test"],
)
# Verify the agent was created successfully
assert agent1 is not None
assert agent2 is not None
# Verify all memory blocks are properly attached
for user in users:
agent_block = client.agents.blocks.retrieve(agent_id=agent1.id, block_label=user)
assert agent_block is not None
blocks = client.blocks.list(label=user)
assert len(blocks) == 2
for block in blocks:
client.blocks.delete(block.id)
client.agents.delete(agent1.id)
client.agents.delete(agent2.id)
# cases: steam, async, token stream, sync
@pytest.mark.parametrize("message_create", ["stream_step", "token_stream", "sync", "async"])
def test_include_return_message_types(client: LettaSDKClient, agent: AgentState, message_create: str):
"""Test that the include_return_message_types parameter works"""
def verify_message_types(messages, message_types):
for message in messages:
assert message.message_type in message_types
message = "My name is actually Sarah"
message_types = ["reasoning_message", "tool_call_message"]
agent = client.agents.create(
memory_blocks=[
CreateBlock(label="user", value="Name: Charles"),
],
model="letta/letta-free",
embedding="letta/letta-free",
)
if message_create == "stream_step":
response = client.agents.messages.create_stream(
agent_id=agent.id,
messages=[
MessageCreate(
role="user",
content=message,
),
],
include_return_message_types=message_types,
)
messages = [message for message in list(response) if message.message_type not in ["stop_reason", "usage_statistics"]]
verify_message_types(messages, message_types)
elif message_create == "async":
response = client.agents.messages.create_async(
agent_id=agent.id,
messages=[
MessageCreate(
role="user",
content=message,
)
],
include_return_message_types=message_types,
)
# wait to finish
while response.status not in {"failed", "completed", "cancelled", "expired"}:
time.sleep(1)
response = client.runs.retrieve(run_id=response.id)
if response.status != "completed":
pytest.fail(f"Response status was NOT completed: {response}")
messages = client.runs.messages.list(run_id=response.id)
verify_message_types(messages, message_types)
elif message_create == "token_stream":
response = client.agents.messages.create_stream(
agent_id=agent.id,
messages=[
MessageCreate(
role="user",
content=message,
),
],
include_return_message_types=message_types,
)
messages = [message for message in list(response) if message.message_type not in ["stop_reason", "usage_statistics"]]
verify_message_types(messages, message_types)
elif message_create == "sync":
response = client.agents.messages.create(
agent_id=agent.id,
messages=[
MessageCreate(
role="user",
content=message,
),
],
include_return_message_types=message_types,
)
messages = response.messages
verify_message_types(messages, message_types)
# cleanup
client.agents.delete(agent.id)
def test_base_tools_upsert_on_list(client: LettaSDKClient):
"""Test that base tools are automatically upserted when missing on tools list call"""
from letta.constants import LETTA_TOOL_SET
# First, get the initial list of tools to establish baseline
initial_tools = client.tools.list()
initial_tool_names = {tool.name for tool in initial_tools}
# Find which base tools might be missing initially
missing_base_tools = LETTA_TOOL_SET - initial_tool_names
# If all base tools are already present, we need to delete some to test the upsert functionality
# We'll delete a few base tools if they exist to create the condition for testing
tools_to_delete = []
if not missing_base_tools:
# Pick a few base tools to delete for testing
test_base_tools = ["send_message", "conversation_search"]
for tool_name in test_base_tools:
for tool in initial_tools:
if tool.name == tool_name:
tools_to_delete.append(tool)
client.tools.delete(tool_id=tool.id)
break
# Now call list_tools() which should trigger the base tools check and upsert
updated_tools = client.tools.list()
updated_tool_names = {tool.name for tool in updated_tools}
# Verify that all base tools are now present
missing_after_upsert = LETTA_TOOL_SET - updated_tool_names
assert not missing_after_upsert, f"Base tools still missing after upsert: {missing_after_upsert}"
# Verify that the base tools are actually in the list
for base_tool_name in LETTA_TOOL_SET:
assert base_tool_name in updated_tool_names, f"Base tool {base_tool_name} not found after upsert"
# Cleanup: restore any tools we deleted for testing (they should already be restored by the upsert)
# This is just a double-check that our test cleanup is proper
final_tools = client.tools.list()
final_tool_names = {tool.name for tool in final_tools}
for deleted_tool in tools_to_delete:
assert deleted_tool.name in final_tool_names, f"Deleted tool {deleted_tool.name} was not properly restored"
@pytest.mark.parametrize("e2b_sandbox_mode", [True, False], indirect=True)
def test_pydantic_inventory_management_tool(e2b_sandbox_mode, client: LettaSDKClient):
class InventoryItem(BaseModel):
sku: str
name: str
price: float
category: str
class InventoryEntry(BaseModel):
timestamp: int
item: InventoryItem
transaction_id: str
class InventoryEntryData(BaseModel):
data: InventoryEntry
quantity_change: int
class ManageInventoryTool(BaseTool):
name: str = "manage_inventory"
args_schema: Type[BaseModel] = InventoryEntryData
description: str = "Update inventory catalogue with a new data entry"
tags: List[str] = ["inventory", "shop"]
def run(self, data: InventoryEntry, quantity_change: int) -> bool:
print(f"Updated inventory for {data.item.name} with a quantity change of {quantity_change}")
return True
tool = client.tools.add(
tool=ManageInventoryTool(),
)
assert tool is not None
assert tool.name == "manage_inventory"
assert "inventory" in tool.tags
assert "shop" in tool.tags
temp_agent = client.agents.create(
memory_blocks=[
CreateBlock(
label="persona",
value="You are a helpful inventory management assistant.",
),
],
model="openai/gpt-4o-mini",
embedding="openai/text-embedding-3-small",
tool_ids=[tool.id],
include_base_tools=False,
)
response = client.agents.messages.create(
agent_id=temp_agent.id,
messages=[
MessageCreate(
role="user",
content="Update the inventory for product 'iPhone 15' with SKU 'IPH15-001', price $999.99, category 'Electronics', transaction ID 'TXN-12345', timestamp 1640995200, with a quantity change of +10",
),
],
)
assert response is not None
tool_call_messages = [msg for msg in response.messages if msg.message_type == "tool_call_message"]
assert len(tool_call_messages) > 0, "Expected at least one tool call message"
first_tool_call = tool_call_messages[0]
assert first_tool_call.tool_call.name == "manage_inventory"
args = json.loads(first_tool_call.tool_call.arguments)
assert "data" in args
assert "quantity_change" in args
assert "item" in args["data"]
assert "name" in args["data"]["item"]
assert "sku" in args["data"]["item"]
assert "price" in args["data"]["item"]
assert "category" in args["data"]["item"]
assert "transaction_id" in args["data"]
assert "timestamp" in args["data"]
tool_return_messages = [msg for msg in response.messages if msg.message_type == "tool_return_message"]
assert len(tool_return_messages) > 0, "Expected at least one tool return message"
first_tool_return = tool_return_messages[0]
assert first_tool_return.status == "success"
assert first_tool_return.tool_return == "True"
assert "Updated inventory for iPhone 15 with a quantity change of 10" in "\n".join(first_tool_return.stdout)
client.agents.delete(temp_agent.id)
client.tools.delete(tool.id)
@pytest.mark.parametrize("e2b_sandbox_mode", [True, False], indirect=True)
def test_pydantic_task_planning_tool(e2b_sandbox_mode, client: LettaSDKClient):
class Step(BaseModel):
name: str = Field(..., description="Name of the step.")
description: str = Field(..., description="An exhaustive description of what this step is trying to achieve.")
class StepsList(BaseModel):
steps: List[Step] = Field(..., description="List of steps to add to the task plan.")
explanation: str = Field(..., description="Explanation for the list of steps.")
def create_task_plan(steps, explanation):
"""Creates a task plan for the current task."""
print(f"Created task plan with {len(steps)} steps: {explanation}")
return steps
tool = client.tools.upsert_from_function(func=create_task_plan, args_schema=StepsList, tags=["planning", "task", "pydantic_test"])
assert tool is not None
assert tool.name == "create_task_plan"
assert "planning" in tool.tags
assert "task" in tool.tags
temp_agent = client.agents.create(
memory_blocks=[
CreateBlock(
label="persona",
value="You are a helpful task planning assistant.",
),
],
model="openai/gpt-4o-mini",
embedding="openai/text-embedding-3-small",
tool_ids=[tool.id],
include_base_tools=False,
)
response = client.agents.messages.create(
agent_id=temp_agent.id,
messages=[
MessageCreate(
role="user",
content="Create a task plan for organizing a team meeting with 3 steps: 1) Schedule meeting (find available time slots), 2) Send invitations (notify all team members), 3) Prepare agenda (outline discussion topics). Explanation: This plan ensures a well-organized team meeting.",
),
],
)
assert response is not None
assert hasattr(response, "messages")
assert len(response.messages) > 0
tool_call_messages = [msg for msg in response.messages if msg.message_type == "tool_call_message"]
assert len(tool_call_messages) > 0, "Expected at least one tool call message"
first_tool_call = tool_call_messages[0]
assert first_tool_call.tool_call.name == "create_task_plan"
args = json.loads(first_tool_call.tool_call.arguments)
assert "steps" in args
assert "explanation" in args
assert isinstance(args["steps"], list)
assert len(args["steps"]) > 0
for step in args["steps"]:
assert "name" in step
assert "description" in step
tool_return_messages = [msg for msg in response.messages if msg.message_type == "tool_return_message"]
assert len(tool_return_messages) > 0, "Expected at least one tool return message"
first_tool_return = tool_return_messages[0]
assert first_tool_return.status == "success"
client.agents.delete(temp_agent.id)
client.tools.delete(tool.id)
@pytest.mark.parametrize("e2b_sandbox_mode", [True, False], indirect=True)
def test_create_tool_from_function_with_docstring(e2b_sandbox_mode, client: LettaSDKClient):
"""Test creating a tool from a function with a docstring using create_from_function"""
def roll_dice() -> str:
"""
Simulate the roll of a 20-sided die (d20).
This function generates a random integer between 1 and 20, inclusive,
which represents the outcome of a single roll of a d20.
Returns:
str: The result of the die roll.
"""
import random
dice_role_outcome = random.randint(1, 20)
output_string = f"You rolled a {dice_role_outcome}"
return output_string
tool = client.tools.create_from_function(func=roll_dice)
assert tool is not None
assert tool.name == "roll_dice"
assert "Simulate the roll of a 20-sided die" in tool.description
assert tool.source_code is not None
assert "random.randint(1, 20)" in tool.source_code
all_tools = client.tools.list()
tool_names = [t.name for t in all_tools]
assert "roll_dice" in tool_names
client.tools.delete(tool.id)
def test_preview_payload(client: LettaSDKClient):
temp_agent = client.agents.create(
memory_blocks=[
CreateBlock(
label="human",
value="username: sarah",
),
],
model="openai/gpt-4o-mini",
embedding="openai/text-embedding-3-small",
)
try:
payload = client.agents.messages.preview_raw_payload(
agent_id=temp_agent.id,
request=LettaRequest(
messages=[
MessageCreate(
role="user",
content=[
TextContent(
text="text",
)
],
)
],
),
)
assert isinstance(payload, dict)
assert "model" in payload
assert "messages" in payload
assert "tools" in payload
assert "frequency_penalty" in payload
assert "max_completion_tokens" in payload
assert "temperature" in payload
assert "user" in payload
assert "parallel_tool_calls" in payload
assert "tool_choice" in payload
assert payload["model"] == "gpt-4o-mini"
assert isinstance(payload["messages"], list)
assert len(payload["messages"]) >= 3
system_message = payload["messages"][0]
assert system_message["role"] == "system"
assert "base_instructions" in system_message["content"]
assert "memory_blocks" in system_message["content"]
assert "tool_usage_rules" in system_message["content"]
assert "Letta" in system_message["content"]
assert isinstance(payload["tools"], list)
assert len(payload["tools"]) > 0
for tool in payload["tools"]:
assert tool["type"] == "function"
assert "function" in tool
assert "name" in tool["function"]
assert "description" in tool["function"]
assert "parameters" in tool["function"]
assert tool["function"]["strict"] is True
assert payload["frequency_penalty"] == 1.0
assert payload["max_completion_tokens"] == 4096
assert payload["temperature"] == 0.7
assert payload["parallel_tool_calls"] is False
assert payload["tool_choice"] == "required"
assert payload["user"].startswith("user-")
print(payload)
finally:
# Clean up the agent
client.agents.delete(agent_id=temp_agent.id)
def test_agent_tools_list(client: LettaSDKClient):
"""Test the optimized agent tools list endpoint for correctness."""
# Create a test agent
agent_state = client.agents.create(
name="test_agent_tools_list",
memory_blocks=[
CreateBlock(
label="persona",
value="You are a helpful assistant.",
),
],
model="openai/gpt-4o-mini",
embedding="openai/text-embedding-3-small",
include_base_tools=True,
)
try:
# Test basic functionality
tools = client.agents.tools.list(agent_id=agent_state.id)
assert len(tools) > 0, "Agent should have base tools attached"
# Verify tool objects have expected attributes
for tool in tools:
assert hasattr(tool, "id"), "Tool should have id attribute"
assert hasattr(tool, "name"), "Tool should have name attribute"
assert tool.id is not None, "Tool id should not be None"
assert tool.name is not None, "Tool name should not be None"
finally:
# Clean up
client.agents.delete(agent_id=agent_state.id)
def test_update_tool_source_code_changes_name(client: LettaSDKClient):
"""Test that updating a tool's source code correctly changes its name"""
import textwrap
# Create initial tool
def initial_tool(x: int) -> int:
"""
Multiply a number by 2
Args:
x: The input number
Returns:
The input multiplied by 2
"""
return x * 2
# Create the tool
tool = client.tools.upsert_from_function(func=initial_tool)
assert tool.name == "initial_tool"
try:
# Define new function source code with different name
new_source_code = textwrap.dedent(
"""
def updated_tool(x: int, y: int) -> int:
'''
Add two numbers together
Args:
x: First number
y: Second number
Returns:
Sum of x and y
'''
return x + y
"""
).strip()
# Update the tool's source code
updated = client.tools.modify(tool_id=tool.id, source_code=new_source_code)
# Verify the name changed
assert updated.name == "updated_tool"
assert updated.source_code == new_source_code
# Verify the schema was updated for the new parameters
assert updated.json_schema is not None
assert updated.json_schema["name"] == "updated_tool"
assert updated.json_schema["description"] == "Add two numbers together"
# Check parameters
params = updated.json_schema.get("parameters", {})
properties = params.get("properties", {})
assert "x" in properties
assert "y" in properties
assert properties["x"]["type"] == "integer"
assert properties["y"]["type"] == "integer"
assert properties["x"]["description"] == "First number"
assert properties["y"]["description"] == "Second number"
assert params["required"] == ["x", "y"]
finally:
# Clean up
client.tools.delete(tool_id=tool.id)
def test_update_tool_source_code_duplicate_name_error(client: LettaSDKClient):
"""Test that updating a tool's source code to have the same name as another existing tool raises an error"""
import textwrap
# Create first tool
def first_tool(x: int) -> int:
"""
Multiply a number by 2
Args:
x: The input number
Returns:
The input multiplied by 2
"""
return x * 2
# Create second tool
def second_tool(x: int) -> int:
"""
Multiply a number by 3
Args:
x: The input number
Returns:
The input multiplied by 3
"""
return x * 3
# Create both tools
tool1 = client.tools.upsert_from_function(func=first_tool)
tool2 = client.tools.upsert_from_function(func=second_tool)
assert tool1.name == "first_tool"
assert tool2.name == "second_tool"
try:
# Try to update second_tool to have the same name as first_tool
new_source_code = textwrap.dedent(
"""
def first_tool(x: int) -> int:
'''
Multiply a number by 4
Args:
x: The input number
Returns:
The input multiplied by 4
'''
return x * 4
"""
).strip()
# This should raise an error since first_tool already exists
with pytest.raises(Exception) as exc_info:
client.tools.modify(tool_id=tool2.id, source_code=new_source_code)
# Verify the error message indicates duplicate name
error_message = str(exc_info.value)
assert "already exists" in error_message.lower() or "duplicate" in error_message.lower() or "conflict" in error_message.lower()
# Verify that tool2 was not modified
tool2_check = client.tools.retrieve(tool_id=tool2.id)
assert tool2_check.name == "second_tool" # Name should remain unchanged
finally:
# Clean up both tools
client.tools.delete(tool_id=tool1.id)
client.tools.delete(tool_id=tool2.id)
def test_add_tool_with_multiple_functions_in_source_code(client: LettaSDKClient):
"""Test adding a tool with multiple functions in the source code"""
import textwrap
# Define source code with multiple functions
source_code = textwrap.dedent(
"""
def helper_function(x: int) -> int:
'''
Helper function that doubles the input
Args:
x: The input number
Returns:
The input multiplied by 2
'''
return x * 2
def another_helper(text: str) -> str:
'''
Another helper that uppercases text
Args:
text: The input text to uppercase
Returns:
The uppercased text
'''
return text.upper()
def main_function(x: int, y: int) -> int:
'''
Main function that uses the helper
Args:
x: First number
y: Second number
Returns:
Result of (x * 2) + y
'''
doubled_x = helper_function(x)
return doubled_x + y
"""
).strip()
# Create the tool with multiple functions
tool = client.tools.create(
source_code=source_code,
)
try:
# Verify the tool was created
assert tool is not None
assert tool.name == "main_function"
assert tool.source_code == source_code
# Verify the JSON schema was generated for the main function
assert tool.json_schema is not None
assert tool.json_schema["name"] == "main_function"
assert tool.json_schema["description"] == "Main function that uses the helper"
# Check parameters
params = tool.json_schema.get("parameters", {})
properties = params.get("properties", {})
assert "x" in properties
assert "y" in properties
assert properties["x"]["type"] == "integer"
assert properties["y"]["type"] == "integer"
assert params["required"] == ["x", "y"]
# Test that we can retrieve the tool
retrieved_tool = client.tools.retrieve(tool_id=tool.id)
assert retrieved_tool.name == "main_function"
assert retrieved_tool.source_code == source_code
finally:
# Clean up
client.tools.delete(tool_id=tool.id)
def test_tool_name_auto_update_with_multiple_functions(client: LettaSDKClient):
"""Test that tool name auto-updates when source code changes with multiple functions"""
import textwrap
# Initial source code with multiple functions
initial_source_code = textwrap.dedent(
"""
def helper_function(x: int) -> int:
'''
Helper function that doubles the input
Args:
x: The input number
Returns:
The input multiplied by 2
'''
return x * 2
def another_helper(text: str) -> str:
'''
Another helper that uppercases text
Args:
text: The input text to uppercase
Returns:
The uppercased text
'''
return text.upper()
def main_function(x: int, y: int) -> int:
'''
Main function that uses the helper
Args:
x: First number
y: Second number
Returns:
Result of (x * 2) + y
'''
doubled_x = helper_function(x)
return doubled_x + y
"""
).strip()
# Create tool with initial source code
tool = client.tools.create(
source_code=initial_source_code,
)
try:
# Verify the tool was created with the last function's name
assert tool is not None
assert tool.name == "main_function"
assert tool.source_code == initial_source_code
# Now modify the source code with a different function order
new_source_code = textwrap.dedent(
"""
def process_data(data: str, count: int) -> str:
'''
Process data by repeating it
Args:
data: The input data
count: Number of times to repeat
Returns:
The processed data
'''
return data * count
def helper_utility(x: float) -> float:
'''
Helper utility function
Args:
x: Input value
Returns:
Squared value
'''
return x * x
"""
).strip()
# Modify the tool with new source code
modified_tool = client.tools.modify(tool_id=tool.id, source_code=new_source_code)
# Verify the name automatically updated to the last function
assert modified_tool.name == "helper_utility"
assert modified_tool.source_code == new_source_code
# Verify the JSON schema updated correctly
assert modified_tool.json_schema is not None
assert modified_tool.json_schema["name"] == "helper_utility"
assert modified_tool.json_schema["description"] == "Helper utility function"
# Check parameters updated correctly
params = modified_tool.json_schema.get("parameters", {})
properties = params.get("properties", {})
assert "x" in properties
assert properties["x"]["type"] == "number" # float maps to number
assert params["required"] == ["x"]
# Test one more modification with only one function
single_function_code = textwrap.dedent(
"""
def calculate_total(items: list, tax_rate: float) -> float:
'''
Calculate total with tax
Args:
items: List of item prices
tax_rate: Tax rate as decimal
Returns:
Total including tax
'''
subtotal = sum(items)
return subtotal * (1 + tax_rate)
"""
).strip()
# Modify again
final_tool = client.tools.modify(tool_id=tool.id, source_code=single_function_code)
# Verify name updated again
assert final_tool.name == "calculate_total"
assert final_tool.source_code == single_function_code
assert final_tool.json_schema["description"] == "Calculate total with tax"
finally:
# Clean up
client.tools.delete(tool_id=tool.id)
def test_tool_rename_with_json_schema_and_source_code(client: LettaSDKClient):
"""Test that passing both new JSON schema AND source code still renames the tool based on source code"""
import textwrap
# Create initial tool
def initial_tool(x: int) -> int:
"""
Multiply a number by 2
Args:
x: The input number
Returns:
The input multiplied by 2
"""
return x * 2
# Create the tool
tool = client.tools.upsert_from_function(func=initial_tool)
assert tool.name == "initial_tool"
try:
# Define new function source code with different name
new_source_code = textwrap.dedent(
"""
def renamed_function(value: float, multiplier: float = 2.0) -> float:
'''
Multiply a value by a multiplier
Args:
value: The input value
multiplier: The multiplier to use (default 2.0)
Returns:
The value multiplied by the multiplier
'''
return value * multiplier
"""
).strip()
# Create a custom JSON schema that has a different name
custom_json_schema = {
"name": "custom_schema_name",
"description": "Custom description from JSON schema",
"parameters": {
"type": "object",
"properties": {
"value": {"type": "number", "description": "Input value from JSON schema"},
"multiplier": {"type": "number", "description": "Multiplier from JSON schema", "default": 2.0},
},
"required": ["value"],
},
}
# Modify the tool with both new source code AND JSON schema
modified_tool = client.tools.modify(tool_id=tool.id, source_code=new_source_code, json_schema=custom_json_schema)
# Verify the name comes from the source code function name, not the JSON schema
assert modified_tool.name == "renamed_function"
assert modified_tool.source_code == new_source_code
# Verify the JSON schema was updated to match the function name from source code
assert modified_tool.json_schema is not None
assert modified_tool.json_schema["name"] == "renamed_function"
# The description should come from the source code docstring, not the JSON schema
assert modified_tool.json_schema["description"] == "Multiply a value by a multiplier"
# Verify parameters are from the source code, not the custom JSON schema
params = modified_tool.json_schema.get("parameters", {})
properties = params.get("properties", {})
assert "value" in properties
assert "multiplier" in properties
assert properties["value"]["type"] == "number"
assert properties["multiplier"]["type"] == "number"
assert params["required"] == ["value"]
finally:
# Clean up
client.tools.delete(tool_id=tool.id)