fix: move all attach detach to be under agents (#723)

Co-authored-by: Mindy Long <mindy@letta.com>
This commit is contained in:
mlong93
2025-01-22 12:16:31 -08:00
committed by GitHub
parent 50de3cb4b7
commit c9e18e66f1
8 changed files with 418 additions and 350 deletions

View File

@@ -13,7 +13,6 @@ from sqlalchemy import delete
from letta import LocalClient, RESTClient, create_client
from letta.orm import SandboxConfig, SandboxEnvironmentVariable
from letta.schemas.agent import AgentState
from letta.schemas.block import CreateBlock
from letta.schemas.embedding_config import EmbeddingConfig
from letta.schemas.enums import MessageRole
from letta.schemas.job import JobStatus
@@ -113,46 +112,6 @@ def clear_tables():
session.commit()
def test_shared_blocks(mock_e2b_api_key_none, client: Union[LocalClient, RESTClient]):
# _reset_config()
# create a block
block = client.create_block(label="human", value="username: sarah")
# create agents with shared block
from letta.schemas.block import Block
from letta.schemas.memory import BasicBlockMemory
# persona1_block = client.create_block(label="persona", value="you are agent 1")
# persona2_block = client.create_block(label="persona", value="you are agent 2")
# create agents
agent_state1 = client.create_agent(
name="agent1", memory=BasicBlockMemory([Block(label="persona", value="you are agent 1")]), block_ids=[block.id]
)
agent_state2 = client.create_agent(
name="agent2", memory=BasicBlockMemory([Block(label="persona", value="you are agent 2")]), block_ids=[block.id]
)
## attach shared block to both agents
# client.link_agent_memory_block(agent_state1.id, block.id)
# client.link_agent_memory_block(agent_state2.id, block.id)
# update memory
client.user_message(agent_id=agent_state1.id, message="my name is actually charles")
# check agent 2 memory
assert "charles" in client.get_block(block.id).value.lower(), f"Shared block update failed {client.get_block(block.id).value}"
client.user_message(agent_id=agent_state2.id, message="whats my name?")
assert (
"charles" in client.get_core_memory(agent_state2.id).get_block("human").value.lower()
), f"Shared block update failed {client.get_core_memory(agent_state2.id).get_block('human').value}"
# cleanup
client.delete_agent(agent_state1.id)
client.delete_agent(agent_state2.id)
def test_sandbox_config_and_env_var_basic(client: Union[LocalClient, RESTClient]):
"""
Test sandbox config and environment variable functions for both LocalClient and RESTClient.
@@ -204,6 +163,11 @@ def test_sandbox_config_and_env_var_basic(client: Union[LocalClient, RESTClient]
client.delete_sandbox_config(sandbox_config_id=sandbox_config.id)
# --------------------------------------------------------------------------------------------------------------------
# Agent tags
# --------------------------------------------------------------------------------------------------------------------
def test_add_and_manage_tags_for_agent(client: Union[LocalClient, RESTClient]):
"""
Comprehensive happy path test for adding, retrieving, and managing tags on an agent.
@@ -306,6 +270,49 @@ def test_agent_tags(client: Union[LocalClient, RESTClient]):
client.delete_agent(agent3.id)
# --------------------------------------------------------------------------------------------------------------------
# Agent memory blocks
# --------------------------------------------------------------------------------------------------------------------
def test_shared_blocks(mock_e2b_api_key_none, client: Union[LocalClient, RESTClient]):
# _reset_config()
# create a block
block = client.create_block(label="human", value="username: sarah")
# create agents with shared block
from letta.schemas.block import Block
from letta.schemas.memory import BasicBlockMemory
# persona1_block = client.create_block(label="persona", value="you are agent 1")
# persona2_block = client.create_block(label="persona", value="you are agent 2")
# create agents
agent_state1 = client.create_agent(
name="agent1", memory=BasicBlockMemory([Block(label="persona", value="you are agent 1")]), block_ids=[block.id]
)
agent_state2 = client.create_agent(
name="agent2", memory=BasicBlockMemory([Block(label="persona", value="you are agent 2")]), block_ids=[block.id]
)
## attach shared block to both agents
# client.link_agent_memory_block(agent_state1.id, block.id)
# client.link_agent_memory_block(agent_state2.id, block.id)
# update memory
client.user_message(agent_id=agent_state1.id, message="my name is actually charles")
# check agent 2 memory
assert "charles" in client.get_block(block.id).value.lower(), f"Shared block update failed {client.get_block(block.id).value}"
client.user_message(agent_id=agent_state2.id, message="whats my name?")
assert (
"charles" in client.get_core_memory(agent_state2.id).get_block("human").value.lower()
), f"Shared block update failed {client.get_core_memory(agent_state2.id).get_block('human').value}"
# cleanup
client.delete_agent(agent_state1.id)
client.delete_agent(agent_state2.id)
def test_update_agent_memory_label(client: Union[LocalClient, RESTClient], agent: AgentState):
"""Test that we can update the label of a block in an agent's memory"""
@@ -326,38 +333,32 @@ def test_update_agent_memory_label(client: Union[LocalClient, RESTClient], agent
client.delete_agent(agent.id)
def test_add_remove_agent_memory_block(client: Union[LocalClient, RESTClient], agent: AgentState):
def test_attach_detach_agent_memory_block(client: Union[LocalClient, RESTClient], agent: AgentState):
"""Test that we can add and remove a block from an agent's memory"""
agent = client.create_agent(name=create_random_username())
current_labels = agent.memory.list_block_labels()
example_new_label = current_labels[0] + "_v2"
example_new_value = "example value"
assert example_new_label not in current_labels
try:
current_labels = agent.memory.list_block_labels()
example_new_label = "example_new_label"
example_new_value = "example value"
assert example_new_label not in current_labels
# Link a new memory block
block = client.create_block(
label=example_new_label,
value=example_new_value,
limit=1000,
)
updated_agent = client.attach_block(
agent_id=agent.id,
block_id=block.id,
)
assert example_new_label in updated_agent.memory.list_block_labels()
# Link a new memory block
client.add_agent_memory_block(
agent_id=agent.id,
create_block=CreateBlock(
label=example_new_label,
value=example_new_value,
limit=1000,
),
)
updated_agent = client.get_agent(agent_id=agent.id)
assert example_new_label in updated_agent.memory.list_block_labels()
# Now unlink the block
client.remove_agent_memory_block(agent_id=agent.id, block_label=example_new_label)
updated_agent = client.get_agent(agent_id=agent.id)
assert example_new_label not in updated_agent.memory.list_block_labels()
finally:
client.delete_agent(agent.id)
# Now unlink the block
updated_agent = client.detach_block(
agent_id=agent.id,
block_id=block.id,
)
assert example_new_label not in updated_agent.memory.list_block_labels()
# def test_core_memory_token_limits(client: Union[LocalClient, RESTClient], agent: AgentState):
@@ -413,24 +414,9 @@ def test_update_agent_memory_limit(client: Union[LocalClient, RESTClient]):
client.delete_agent(agent.id)
def test_messages(client: Union[LocalClient, RESTClient], agent: AgentState):
# _reset_config()
send_message_response = client.send_message(agent_id=agent.id, message="Test message", role="user")
assert send_message_response, "Sending message failed"
messages_response = client.get_messages(agent_id=agent.id, limit=1)
assert len(messages_response) > 0, "Retrieving messages failed"
def test_send_system_message(client: Union[LocalClient, RESTClient], agent: AgentState):
"""Important unit test since the Letta API exposes sending system messages, but some backends don't natively support it (eg Anthropic)"""
send_system_message_response = client.send_message(
agent_id=agent.id, message="Event occurred: The user just logged off.", role="system"
)
assert send_system_message_response, "Sending message failed"
# --------------------------------------------------------------------------------------------------------------------
# Agent Tools
# --------------------------------------------------------------------------------------------------------------------
def test_function_return_limit(client: Union[LocalClient, RESTClient]):
"""Test to see if the function return limit works"""
@@ -503,6 +489,70 @@ def test_function_always_error(client: Union[LocalClient, RESTClient]):
client.delete_agent(agent_id=agent.id)
def test_attach_detach_agent_tool(client: Union[LocalClient, RESTClient], agent: AgentState):
"""Test that we can attach and detach a tool from an agent"""
try:
# Create a tool
def example_tool(x: int) -> int:
"""
This is an example tool.
Parameters:
x (int): The input value.
Returns:
int: The output value.
"""
return x * 2
tool = client.create_or_update_tool(func=example_tool, name="test_tool")
# Initially tool should not be attached
initial_tools = client.list_attached_tools(agent_id=agent.id)
assert tool.id not in [t.id for t in initial_tools]
# Attach tool
new_agent_state = client.attach_tool(agent_id=agent.id, tool_id=tool.id)
assert tool.id in [t.id for t in new_agent_state.tools]
# Verify tool is attached
updated_tools = client.list_attached_tools(agent_id=agent.id)
assert tool.id in [t.id for t in updated_tools]
# Detach tool
new_agent_state = client.detach_tool(agent_id=agent.id, tool_id=tool.id)
assert tool.id not in [t.id for t in new_agent_state.tools]
# Verify tool is detached
final_tools = client.list_attached_tools(agent_id=agent.id)
assert tool.id not in [t.id for t in final_tools]
finally:
client.delete_tool(tool.id)
# --------------------------------------------------------------------------------------------------------------------
# AgentMessages
# --------------------------------------------------------------------------------------------------------------------
def test_messages(client: Union[LocalClient, RESTClient], agent: AgentState):
# _reset_config()
send_message_response = client.send_message(agent_id=agent.id, message="Test message", role="user")
assert send_message_response, "Sending message failed"
messages_response = client.get_messages(agent_id=agent.id, limit=1)
assert len(messages_response) > 0, "Retrieving messages failed"
def test_send_system_message(client: Union[LocalClient, RESTClient], agent: AgentState):
"""Important unit test since the Letta API exposes sending system messages, but some backends don't natively support it (eg Anthropic)"""
send_system_message_response = client.send_message(
agent_id=agent.id, message="Event occurred: The user just logged off.", role="system"
)
assert send_system_message_response, "Sending message failed"
@pytest.mark.asyncio
async def test_send_message_parallel(client: Union[LocalClient, RESTClient], agent: AgentState, request):
"""
@@ -580,9 +630,9 @@ def test_send_message_async(client: Union[LocalClient, RESTClient], agent: Agent
assert usage.total_tokens == usage.completion_tokens + usage.prompt_tokens
# ==========================================
# TESTS FOR AGENT LISTING
# ==========================================
# ----------------------------------------------------------------------------------------------------
# Agent listing
# ----------------------------------------------------------------------------------------------------
def test_agent_listing(client: Union[LocalClient, RESTClient], agent, search_agent_one, search_agent_two):
@@ -678,3 +728,33 @@ def test_agent_creation(client: Union[LocalClient, RESTClient]):
assert all(tool.id in tool_ids for tool in agent_tools)
client.delete_agent(agent_id=agent.id)
# --------------------------------------------------------------------------------------------------------------------
# Agent sources
# --------------------------------------------------------------------------------------------------------------------
def test_attach_detach_agent_source(client: Union[LocalClient, RESTClient], agent: AgentState):
"""Test that we can attach and detach a source from an agent"""
# Create a source
source = client.create_source(
name="test_source",
)
initial_sources = client.list_attached_sources(agent_id=agent.id)
assert source.id not in [s.id for s in initial_sources]
# Attach source
client.attach_source(agent_id=agent.id, source_id=source.id)
# Verify source is attached
final_sources = client.list_attached_sources(agent_id=agent.id)
assert source.id in [s.id for s in final_sources]
# Detach source
client.detach_source(agent_id=agent.id, source_id=source.id)
# Verify source is detached
final_sources = client.list_attached_sources(agent_id=agent.id)
assert source.id not in [s.id for s in final_sources]
client.delete_source(source.id)