feat: Add ConditionalToolRules (#2279)
Co-authored-by: Mindy Long <mindy@letta.com>
This commit is contained in:
@@ -4,7 +4,12 @@ import uuid
|
||||
import pytest
|
||||
from letta import create_client
|
||||
from letta.schemas.letta_message import ToolCallMessage
|
||||
from letta.schemas.tool_rule import ChildToolRule, InitToolRule, TerminalToolRule
|
||||
from letta.schemas.tool_rule import (
|
||||
ChildToolRule,
|
||||
ConditionalToolRule,
|
||||
InitToolRule,
|
||||
TerminalToolRule,
|
||||
)
|
||||
from tests.helpers.endpoints_helper import (
|
||||
assert_invoked_function_call,
|
||||
assert_invoked_send_message_with_keyword,
|
||||
@@ -68,6 +73,57 @@ def fourth_secret_word(prev_secret_word: str):
|
||||
return "banana"
|
||||
|
||||
|
||||
def flip_coin():
|
||||
"""
|
||||
Call this to retrieve the password to the secret word, which you will need to output in a send_message later.
|
||||
If it returns an empty string, try flipping again!
|
||||
|
||||
Returns:
|
||||
str: The password or an empty string
|
||||
"""
|
||||
import random
|
||||
|
||||
# Flip a coin with 50% chance
|
||||
if random.random() < 0.5:
|
||||
return ""
|
||||
return "hj2hwibbqm"
|
||||
|
||||
|
||||
def flip_coin_hard():
|
||||
"""
|
||||
Call this to retrieve the password to the secret word, which you will need to output in a send_message later.
|
||||
If it returns an empty string, try flipping again!
|
||||
|
||||
Returns:
|
||||
str: The password or an empty string
|
||||
"""
|
||||
import random
|
||||
|
||||
# Flip a coin with 50% chance
|
||||
result = random.random()
|
||||
if result < 0.5:
|
||||
return ""
|
||||
if result < 0.75:
|
||||
return "START_OVER"
|
||||
return "hj2hwibbqm"
|
||||
|
||||
|
||||
def can_play_game():
|
||||
"""
|
||||
Call this to start the tool chain.
|
||||
"""
|
||||
import random
|
||||
|
||||
return random.random() < 0.5
|
||||
|
||||
|
||||
def return_none():
|
||||
"""
|
||||
Really simple function
|
||||
"""
|
||||
return None
|
||||
|
||||
|
||||
def auto_error():
|
||||
"""
|
||||
If you call this function, it will throw an error automatically.
|
||||
@@ -201,6 +257,7 @@ def test_claude_initial_tool_rule_enforced(mock_e2b_api_key_none):
|
||||
tool_rules = [
|
||||
InitToolRule(tool_name=t1_name),
|
||||
ChildToolRule(tool_name=t1_name, children=[t2_name]),
|
||||
TerminalToolRule(tool_name=t2_name)
|
||||
]
|
||||
tools = [t1, t2]
|
||||
|
||||
@@ -259,26 +316,331 @@ def test_agent_no_structured_output_with_one_child_tool(mock_e2b_api_key_none):
|
||||
]
|
||||
|
||||
for config in config_files:
|
||||
agent_state = setup_agent(client, config, agent_uuid=agent_uuid, tool_ids=[t.id for t in tools], tool_rules=tool_rules)
|
||||
response = client.user_message(agent_id=agent_state.id, message="hi. run archival memory search")
|
||||
max_retries = 3
|
||||
last_error = None
|
||||
|
||||
# Make checks
|
||||
assert_sanity_checks(response)
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
agent_state = setup_agent(client, config, agent_uuid=agent_uuid, tool_ids=[t.id for t in tools], tool_rules=tool_rules)
|
||||
response = client.user_message(agent_id=agent_state.id, message="hi. run archival memory search")
|
||||
|
||||
# Assert the tools were called
|
||||
assert_invoked_function_call(response.messages, "archival_memory_search")
|
||||
assert_invoked_function_call(response.messages, "archival_memory_insert")
|
||||
assert_invoked_function_call(response.messages, "send_message")
|
||||
# Make checks
|
||||
assert_sanity_checks(response)
|
||||
|
||||
# Check ordering of tool calls
|
||||
tool_names = [t.name for t in [archival_memory_search, archival_memory_insert, send_message]]
|
||||
for m in response.messages:
|
||||
if isinstance(m, ToolCallMessage):
|
||||
# Check that it's equal to the first one
|
||||
assert m.tool_call.name == tool_names[0]
|
||||
# Assert the tools were called
|
||||
assert_invoked_function_call(response.messages, "archival_memory_search")
|
||||
assert_invoked_function_call(response.messages, "archival_memory_insert")
|
||||
assert_invoked_function_call(response.messages, "send_message")
|
||||
|
||||
# Pop out first one
|
||||
tool_names = tool_names[1:]
|
||||
# Check ordering of tool calls
|
||||
tool_names = [t.name for t in [archival_memory_search, archival_memory_insert, send_message]]
|
||||
for m in response.messages:
|
||||
if isinstance(m, ToolCallMessage):
|
||||
# Check that it's equal to the first one
|
||||
assert m.tool_call.name == tool_names[0]
|
||||
|
||||
# Pop out first one
|
||||
tool_names = tool_names[1:]
|
||||
|
||||
print(f"Got successful response from client: \n\n{response}")
|
||||
break # Test passed, exit retry loop
|
||||
|
||||
except AssertionError as e:
|
||||
last_error = e
|
||||
print(f"Attempt {attempt + 1} failed, retrying..." if attempt < max_retries - 1 else f"All {max_retries} attempts failed")
|
||||
cleanup(client=client, agent_uuid=agent_uuid)
|
||||
continue
|
||||
|
||||
if last_error and attempt == max_retries - 1:
|
||||
raise last_error # Re-raise the last error if all retries failed
|
||||
|
||||
print(f"Got successful response from client: \n\n{response}")
|
||||
cleanup(client=client, agent_uuid=agent_uuid)
|
||||
|
||||
|
||||
@pytest.mark.timeout(60) # Sets a 60-second timeout for the test since this could loop infinitely
|
||||
def test_agent_conditional_tool_easy(mock_e2b_api_key_none):
|
||||
"""
|
||||
Test the agent with a conditional tool that has a child tool.
|
||||
|
||||
Tool Flow:
|
||||
|
||||
-------
|
||||
| |
|
||||
| v
|
||||
-- flip_coin
|
||||
|
|
||||
v
|
||||
reveal_secret_word
|
||||
"""
|
||||
|
||||
client = create_client()
|
||||
cleanup(client=client, agent_uuid=agent_uuid)
|
||||
|
||||
coin_flip_name = "flip_coin"
|
||||
secret_word_tool = "fourth_secret_word"
|
||||
flip_coin_tool = client.create_or_update_tool(flip_coin, name=coin_flip_name)
|
||||
reveal_secret = client.create_or_update_tool(fourth_secret_word, name=secret_word_tool)
|
||||
|
||||
# Make tool rules
|
||||
tool_rules = [
|
||||
InitToolRule(tool_name=coin_flip_name),
|
||||
ConditionalToolRule(
|
||||
tool_name=coin_flip_name,
|
||||
default_child=coin_flip_name,
|
||||
child_output_mapping={
|
||||
"hj2hwibbqm": secret_word_tool,
|
||||
}
|
||||
),
|
||||
TerminalToolRule(tool_name=secret_word_tool),
|
||||
]
|
||||
tools = [flip_coin_tool, reveal_secret]
|
||||
|
||||
config_file = "tests/configs/llm_model_configs/claude-3-sonnet-20240229.json"
|
||||
agent_state = setup_agent(client, config_file, agent_uuid=agent_uuid, tool_ids=[t.id for t in tools], tool_rules=tool_rules)
|
||||
response = client.user_message(agent_id=agent_state.id, message="flip a coin until you get the secret word")
|
||||
|
||||
# Make checks
|
||||
assert_sanity_checks(response)
|
||||
|
||||
# Assert the tools were called
|
||||
assert_invoked_function_call(response.messages, "flip_coin")
|
||||
assert_invoked_function_call(response.messages, "fourth_secret_word")
|
||||
|
||||
# Check ordering of tool calls
|
||||
found_secret_word = False
|
||||
for m in response.messages:
|
||||
if isinstance(m, ToolCallMessage):
|
||||
if m.tool_call.name == secret_word_tool:
|
||||
# Should be the last tool call
|
||||
found_secret_word = True
|
||||
else:
|
||||
# Before finding secret_word, only flip_coin should be called
|
||||
assert m.tool_call.name == coin_flip_name
|
||||
assert not found_secret_word
|
||||
|
||||
# Ensure we found the secret word exactly once
|
||||
assert found_secret_word
|
||||
|
||||
print(f"Got successful response from client: \n\n{response}")
|
||||
cleanup(client=client, agent_uuid=agent_uuid)
|
||||
|
||||
|
||||
|
||||
@pytest.mark.timeout(90) # Longer timeout since this test has more steps
|
||||
def test_agent_conditional_tool_hard(mock_e2b_api_key_none):
|
||||
"""
|
||||
Test the agent with a complex conditional tool graph
|
||||
|
||||
Tool Flow:
|
||||
|
||||
can_play_game <---+
|
||||
| |
|
||||
v |
|
||||
flip_coin -----+
|
||||
|
|
||||
v
|
||||
fourth_secret_word
|
||||
"""
|
||||
client = create_client()
|
||||
cleanup(client=client, agent_uuid=agent_uuid)
|
||||
|
||||
# Create tools
|
||||
play_game = "can_play_game"
|
||||
coin_flip_name = "flip_coin_hard"
|
||||
final_tool = "fourth_secret_word"
|
||||
play_game_tool = client.create_or_update_tool(can_play_game, name=play_game)
|
||||
flip_coin_tool = client.create_or_update_tool(flip_coin_hard, name=coin_flip_name)
|
||||
reveal_secret = client.create_or_update_tool(fourth_secret_word, name=final_tool)
|
||||
|
||||
# Make tool rules - chain them together with conditional rules
|
||||
tool_rules = [
|
||||
InitToolRule(tool_name=play_game),
|
||||
ConditionalToolRule(
|
||||
tool_name=play_game,
|
||||
default_child=play_game, # Keep trying if we can't play
|
||||
child_output_mapping={
|
||||
True: coin_flip_name # Only allow access when can_play_game returns True
|
||||
}
|
||||
),
|
||||
ConditionalToolRule(
|
||||
tool_name=coin_flip_name,
|
||||
default_child=coin_flip_name,
|
||||
child_output_mapping={
|
||||
"hj2hwibbqm": final_tool, "START_OVER": play_game
|
||||
}
|
||||
),
|
||||
TerminalToolRule(tool_name=final_tool),
|
||||
]
|
||||
|
||||
# Setup agent with all tools
|
||||
tools = [play_game_tool, flip_coin_tool, reveal_secret]
|
||||
config_file = "tests/configs/llm_model_configs/claude-3-sonnet-20240229.json"
|
||||
agent_state = setup_agent(
|
||||
client,
|
||||
config_file,
|
||||
agent_uuid=agent_uuid,
|
||||
tool_ids=[t.id for t in tools],
|
||||
tool_rules=tool_rules
|
||||
)
|
||||
|
||||
# Ask agent to try to get all secret words
|
||||
response = client.user_message(agent_id=agent_state.id, message="hi")
|
||||
|
||||
# Make checks
|
||||
assert_sanity_checks(response)
|
||||
|
||||
# Assert all tools were called
|
||||
assert_invoked_function_call(response.messages, play_game)
|
||||
assert_invoked_function_call(response.messages, final_tool)
|
||||
|
||||
# Check ordering of tool calls
|
||||
found_words = []
|
||||
for m in response.messages:
|
||||
if isinstance(m, ToolCallMessage):
|
||||
name = m.tool_call.name
|
||||
if name in [play_game, coin_flip_name]:
|
||||
# Before finding secret_word, only can_play_game and flip_coin should be called
|
||||
assert name in [play_game, coin_flip_name]
|
||||
else:
|
||||
# Should find secret words in order
|
||||
expected_word = final_tool
|
||||
assert name == expected_word, f"Found {name} but expected {expected_word}"
|
||||
found_words.append(name)
|
||||
|
||||
# Ensure we found all secret words in order
|
||||
assert found_words == [final_tool]
|
||||
|
||||
print(f"Got successful response from client: \n\n{response}")
|
||||
cleanup(client=client, agent_uuid=agent_uuid)
|
||||
|
||||
|
||||
@pytest.mark.timeout(60)
|
||||
def test_agent_conditional_tool_without_default_child(mock_e2b_api_key_none):
|
||||
"""
|
||||
Test the agent with a conditional tool that allows any child tool to be called if a function returns None.
|
||||
|
||||
Tool Flow:
|
||||
|
||||
return_none
|
||||
|
|
||||
v
|
||||
any tool... <-- When output doesn't match mapping, agent can call any tool
|
||||
"""
|
||||
client = create_client()
|
||||
cleanup(client=client, agent_uuid=agent_uuid)
|
||||
|
||||
# Create tools - we'll make several available to the agent
|
||||
tool_name = "return_none"
|
||||
|
||||
tool = client.create_or_update_tool(return_none, name=tool_name)
|
||||
secret_word = client.create_or_update_tool(first_secret_word, name="first_secret_word")
|
||||
|
||||
# Make tool rules - only map one output, let others be free choice
|
||||
tool_rules = [
|
||||
InitToolRule(tool_name=tool_name),
|
||||
ConditionalToolRule(
|
||||
tool_name=tool_name,
|
||||
default_child=None, # Allow any tool to be called if output doesn't match
|
||||
child_output_mapping={
|
||||
"anything but none": "first_secret_word"
|
||||
}
|
||||
)
|
||||
]
|
||||
tools = [tool, secret_word]
|
||||
|
||||
# Setup agent with all tools
|
||||
agent_state = setup_agent(
|
||||
client,
|
||||
config_file,
|
||||
agent_uuid=agent_uuid,
|
||||
tool_ids=[t.id for t in tools],
|
||||
tool_rules=tool_rules
|
||||
)
|
||||
|
||||
# Ask agent to try different tools based on the game output
|
||||
response = client.user_message(
|
||||
agent_id=agent_state.id,
|
||||
message="call a function, any function. then call send_message"
|
||||
)
|
||||
|
||||
# Make checks
|
||||
assert_sanity_checks(response)
|
||||
|
||||
# Assert return_none was called
|
||||
assert_invoked_function_call(response.messages, tool_name)
|
||||
|
||||
# Assert any base function called afterward
|
||||
found_any_tool = False
|
||||
found_return_none = False
|
||||
for m in response.messages:
|
||||
if isinstance(m, ToolCallMessage):
|
||||
if m.tool_call.name == tool_name:
|
||||
found_return_none = True
|
||||
elif found_return_none and m.tool_call.name:
|
||||
found_any_tool = True
|
||||
break
|
||||
|
||||
assert found_any_tool, "Should have called any tool after return_none"
|
||||
|
||||
print(f"Got successful response from client: \n\n{response}")
|
||||
cleanup(client=client, agent_uuid=agent_uuid)
|
||||
|
||||
|
||||
@pytest.mark.timeout(60)
|
||||
def test_agent_reload_remembers_function_response(mock_e2b_api_key_none):
|
||||
"""
|
||||
Test that when an agent is reloaded, it remembers the last function response for conditional tool chaining.
|
||||
|
||||
Tool Flow:
|
||||
|
||||
flip_coin
|
||||
|
|
||||
v
|
||||
fourth_secret_word <-- Should remember coin flip result after reload
|
||||
"""
|
||||
client = create_client()
|
||||
cleanup(client=client, agent_uuid=agent_uuid)
|
||||
|
||||
# Create tools
|
||||
flip_coin_name = "flip_coin"
|
||||
secret_word = "fourth_secret_word"
|
||||
flip_coin_tool = client.create_or_update_tool(flip_coin, name=flip_coin_name)
|
||||
secret_word_tool = client.create_or_update_tool(fourth_secret_word, name=secret_word)
|
||||
|
||||
# Make tool rules - map coin flip to fourth_secret_word
|
||||
tool_rules = [
|
||||
InitToolRule(tool_name=flip_coin_name),
|
||||
ConditionalToolRule(
|
||||
tool_name=flip_coin_name,
|
||||
default_child=flip_coin_name, # Allow any tool to be called if output doesn't match
|
||||
child_output_mapping={
|
||||
"hj2hwibbqm": secret_word
|
||||
}
|
||||
),
|
||||
TerminalToolRule(tool_name=secret_word)
|
||||
]
|
||||
tools = [flip_coin_tool, secret_word_tool]
|
||||
|
||||
# Setup initial agent
|
||||
agent_state = setup_agent(
|
||||
client, config_file, agent_uuid=agent_uuid, tool_ids=[t.id for t in tools], tool_rules=tool_rules
|
||||
)
|
||||
|
||||
# Call flip_coin first
|
||||
response = client.user_message(agent_id=agent_state.id, message="flip a coin")
|
||||
assert_invoked_function_call(response.messages, flip_coin_name)
|
||||
assert_invoked_function_call(response.messages, secret_word)
|
||||
found_fourth_secret = False
|
||||
for m in response.messages:
|
||||
if isinstance(m, ToolCallMessage) and m.tool_call.name == secret_word:
|
||||
found_fourth_secret = True
|
||||
break
|
||||
|
||||
assert found_fourth_secret, "Reloaded agent should remember coin flip result and call fourth_secret_word if True"
|
||||
|
||||
# Reload the agent
|
||||
reloaded_agent = client.server.load_agent(agent_id=agent_state.id, actor=client.user)
|
||||
assert reloaded_agent.last_function_response is not None
|
||||
|
||||
print(f"Got successful response from client: \n\n{response}")
|
||||
cleanup(client=client, agent_uuid=agent_uuid)
|
||||
Reference in New Issue
Block a user