diff --git a/letta/agents/letta_agent.py b/letta/agents/letta_agent.py index 460a96e2..55e5603d 100644 --- a/letta/agents/letta_agent.py +++ b/letta/agents/letta_agent.py @@ -470,7 +470,6 @@ class LettaAgent(BaseAgent): ToolType.LETTA_VOICE_SLEEPTIME_CORE, ToolType.LETTA_BUILTIN, } - or (t.tool_type == ToolType.LETTA_MULTI_AGENT_CORE and t.name == "send_message_to_agents_matching_tags") or (t.tool_type == ToolType.EXTERNAL_COMPOSIO) ] @@ -604,20 +603,21 @@ class LettaAgent(BaseAgent): # TODO: This temp. Move this logic and code to executors try: - if target_tool.name == "send_message_to_agents_matching_tags" and target_tool.tool_type == ToolType.LETTA_MULTI_AGENT_CORE: - log_event(name="start_send_message_to_agents_matching_tags", attributes=tool_args) - results = await self._send_message_to_agents_matching_tags(**tool_args) - log_event(name="finish_send_message_to_agents_matching_tags", attributes=tool_args) - return json.dumps(results), True - else: - tool_execution_manager = ToolExecutionManager(agent_state=agent_state, actor=self.actor) - # TODO: Integrate sandbox result - log_event(name=f"start_{tool_name}_execution", attributes=tool_args) - tool_execution_result = await tool_execution_manager.execute_tool_async( - function_name=tool_name, function_args=tool_args, tool=target_tool - ) - log_event(name=f"finish_{tool_name}_execution", attributes=tool_args) - return tool_execution_result.func_return, True + tool_execution_manager = ToolExecutionManager( + agent_state=agent_state, + message_manager=self.message_manager, + agent_manager=self.agent_manager, + block_manager=self.block_manager, + passage_manager=self.passage_manager, + actor=self.actor, + ) + # TODO: Integrate sandbox result + log_event(name=f"start_{tool_name}_execution", attributes=tool_args) + tool_execution_result = await tool_execution_manager.execute_tool_async( + function_name=tool_name, function_args=tool_args, tool=target_tool + ) + log_event(name=f"finish_{tool_name}_execution", attributes=tool_args) + return tool_execution_result.func_return, True except Exception as e: return f"Failed to call tool. Error: {e}", False diff --git a/letta/services/tool_executor/tool_execution_manager.py b/letta/services/tool_executor/tool_execution_manager.py index 6580415a..4c378621 100644 --- a/letta/services/tool_executor/tool_execution_manager.py +++ b/letta/services/tool_executor/tool_execution_manager.py @@ -8,6 +8,10 @@ from letta.schemas.sandbox_config import SandboxConfig from letta.schemas.tool import Tool from letta.schemas.tool_execution_result import ToolExecutionResult from letta.schemas.user import User +from letta.services.agent_manager import AgentManager +from letta.services.block_manager import BlockManager +from letta.services.message_manager import MessageManager +from letta.services.passage_manager import PassageManager from letta.services.tool_executor.tool_executor import ( ExternalComposioToolExecutor, ExternalMCPToolExecutor, @@ -35,10 +39,24 @@ class ToolExecutorFactory: } @classmethod - def get_executor(cls, tool_type: ToolType) -> ToolExecutor: + def get_executor( + cls, + tool_type: ToolType, + message_manager: MessageManager, + agent_manager: AgentManager, + block_manager: BlockManager, + passage_manager: PassageManager, + actor: User, + ) -> ToolExecutor: """Get the appropriate executor for the given tool type.""" executor_class = cls._executor_map.get(tool_type, SandboxToolExecutor) - return executor_class() + return executor_class( + message_manager=message_manager, + agent_manager=agent_manager, + block_manager=block_manager, + passage_manager=passage_manager, + actor=actor, + ) class ToolExecutionManager: @@ -46,11 +64,19 @@ class ToolExecutionManager: def __init__( self, + message_manager: MessageManager, + agent_manager: AgentManager, + block_manager: BlockManager, + passage_manager: PassageManager, agent_state: AgentState, actor: User, sandbox_config: Optional[SandboxConfig] = None, sandbox_env_vars: Optional[Dict[str, Any]] = None, ): + self.message_manager = message_manager + self.agent_manager = agent_manager + self.block_manager = block_manager + self.passage_manager = passage_manager self.agent_state = agent_state self.logger = get_logger(__name__) self.actor = actor @@ -70,7 +96,14 @@ class ToolExecutionManager: Tuple containing the function response and sandbox run result (if applicable) """ try: - executor = ToolExecutorFactory.get_executor(tool.tool_type) + executor = ToolExecutorFactory.get_executor( + tool.tool_type, + message_manager=self.message_manager, + agent_manager=self.agent_manager, + block_manager=self.block_manager, + passage_manager=self.passage_manager, + actor=self.actor, + ) return executor.execute( function_name, function_args, @@ -100,9 +133,18 @@ class ToolExecutionManager: Execute a tool asynchronously and persist any state changes. """ try: - executor = ToolExecutorFactory.get_executor(tool.tool_type) + executor = ToolExecutorFactory.get_executor( + tool.tool_type, + message_manager=self.message_manager, + agent_manager=self.agent_manager, + block_manager=self.block_manager, + passage_manager=self.passage_manager, + actor=self.actor, + ) # TODO: Extend this async model to composio - if isinstance(executor, (SandboxToolExecutor, ExternalComposioToolExecutor, LettaBuiltinToolExecutor)): + if isinstance( + executor, (SandboxToolExecutor, ExternalComposioToolExecutor, LettaBuiltinToolExecutor, LettaMultiAgentToolExecutor) + ): result = await executor.execute(function_name, function_args, self.agent_state, tool, self.actor) else: result = executor.execute(function_name, function_args, self.agent_state, tool, self.actor) diff --git a/letta/services/tool_executor/tool_executor.py b/letta/services/tool_executor/tool_executor.py index 14369699..51fda3d7 100644 --- a/letta/services/tool_executor/tool_executor.py +++ b/letta/services/tool_executor/tool_executor.py @@ -1,9 +1,10 @@ +import asyncio import json import math import traceback from abc import ABC, abstractmethod from textwrap import shorten -from typing import Any, Dict, Literal, Optional +from typing import Any, Dict, List, Literal, Optional from letta.constants import ( COMPOSIO_ENTITY_ENV_VAR_KEY, @@ -18,12 +19,18 @@ from letta.functions.ast_parsers import coerce_dict_args_by_annotations, get_fun from letta.functions.composio_helpers import execute_composio_action_async, generate_composio_action_from_func_name from letta.helpers.composio_helpers import get_composio_api_key from letta.helpers.json_helpers import json_dumps +from letta.log import get_logger from letta.schemas.agent import AgentState +from letta.schemas.enums import MessageRole +from letta.schemas.letta_message import AssistantMessage +from letta.schemas.letta_message_content import TextContent +from letta.schemas.message import MessageCreate from letta.schemas.sandbox_config import SandboxConfig from letta.schemas.tool import Tool from letta.schemas.tool_execution_result import ToolExecutionResult from letta.schemas.user import User from letta.services.agent_manager import AgentManager +from letta.services.block_manager import BlockManager from letta.services.message_manager import MessageManager from letta.services.passage_manager import PassageManager from letta.services.tool_sandbox.e2b_sandbox import AsyncToolSandboxE2B @@ -32,10 +39,26 @@ from letta.settings import tool_settings from letta.tracing import trace_method from letta.utils import get_friendly_error_msg +logger = get_logger(__name__) + class ToolExecutor(ABC): """Abstract base class for tool executors.""" + def __init__( + self, + message_manager: MessageManager, + agent_manager: AgentManager, + block_manager: BlockManager, + passage_manager: PassageManager, + actor: User, + ): + self.message_manager = message_manager + self.agent_manager = agent_manager + self.block_manager = block_manager + self.passage_manager = passage_manager + self.actor = actor + @abstractmethod def execute( self, @@ -499,12 +522,107 @@ class LettaCoreToolExecutor(ToolExecutor): class LettaMultiAgentToolExecutor(ToolExecutor): """Executor for LETTA multi-agent core tools.""" - # TODO: Implement - # def execute(self, function_name: str, function_args: dict, agent: "Agent", tool: Tool) -> ToolExecutionResult: - # callable_func = get_function_from_module(LETTA_MULTI_AGENT_TOOL_MODULE_NAME, function_name) - # function_args["self"] = agent # need to attach self to arg since it's dynamically linked - # function_response = callable_func(**function_args) - # return ToolExecutionResult(func_return=function_response) + async def execute( + self, + function_name: str, + function_args: dict, + agent_state: AgentState, + tool: Tool, + actor: User, + sandbox_config: Optional[SandboxConfig] = None, + sandbox_env_vars: Optional[Dict[str, Any]] = None, + ) -> ToolExecutionResult: + function_map = { + "send_message_to_agent_and_wait_for_reply": self.send_message_to_agent_and_wait_for_reply, + "send_message_to_agent_async": self.send_message_to_agent_async, + "send_message_to_agents_matching_tags": self.send_message_to_agents_matching_tags, + } + + if function_name not in function_map: + raise ValueError(f"Unknown function: {function_name}") + + # Execute the appropriate function + function_args_copy = function_args.copy() # Make a copy to avoid modifying the original + function_response = await function_map[function_name](agent_state, **function_args_copy) + return ToolExecutionResult( + status="success", + func_return=function_response, + ) + + async def send_message_to_agent_and_wait_for_reply(self, agent_state: AgentState, message: str, other_agent_id: str) -> str: + augmented_message = ( + f"[Incoming message from agent with ID '{agent_state.id}' - to reply to this message, " + f"make sure to use the 'send_message' at the end, and the system will notify the sender of your response] " + f"{message}" + ) + + return str(await self._process_agent(agent_id=other_agent_id, message=augmented_message)) + + async def send_message_to_agent_async(self, agent_state: AgentState, message: str, other_agent_id: str) -> str: + # 1) Build the prefixed system‐message + prefixed = ( + f"[Incoming message from agent with ID '{agent_state.id}' - " + f"to reply to this message, make sure to use the " + f"'send_message_to_agent_async' tool, or the agent will not receive your message] " + f"{message}" + ) + + task = asyncio.create_task(self._process_agent(agent_id=other_agent_id, message=prefixed)) + + task.add_done_callback(lambda t: (logger.error(f"Async send_message task failed: {t.exception()}") if t.exception() else None)) + + return "Successfully sent message" + + async def send_message_to_agents_matching_tags( + self, agent_state: AgentState, message: str, match_all: List[str], match_some: List[str] + ) -> str: + # Find matching agents + matching_agents = self.agent_manager.list_agents_matching_tags(actor=self.actor, match_all=match_all, match_some=match_some) + if not matching_agents: + return str([]) + + augmented_message = ( + "[Incoming message from external Letta agent - to reply to this message, " + "make sure to use the 'send_message' at the end, and the system will notify " + "the sender of your response] " + f"{message}" + ) + + tasks = [ + asyncio.create_task(self._process_agent(agent_id=agent_state.id, message=augmented_message)) for agent_state in matching_agents + ] + results = await asyncio.gather(*tasks) + return str(results) + + async def _process_agent(self, agent_id: str, message: str) -> Dict[str, Any]: + from letta.agents.letta_agent import LettaAgent + + try: + letta_agent = LettaAgent( + agent_id=agent_id, + message_manager=self.message_manager, + agent_manager=self.agent_manager, + block_manager=self.block_manager, + passage_manager=self.passage_manager, + actor=self.actor, + ) + + letta_response = await letta_agent.step([MessageCreate(role=MessageRole.system, content=[TextContent(text=message)])]) + messages = letta_response.messages + + send_message_content = [message.content for message in messages if isinstance(message, AssistantMessage)] + + return { + "agent_id": agent_id, + "response": send_message_content if send_message_content else [""], + } + + except Exception as e: + return { + "agent_id": agent_id, + "error": str(e), + "type": type(e).__name__, + } class ExternalComposioToolExecutor(ToolExecutor): diff --git a/tests/integration_test_builtin_tools.py b/tests/integration_test_builtin_tools.py index 7e3faf78..402fd54e 100644 --- a/tests/integration_test_builtin_tools.py +++ b/tests/integration_test_builtin_tools.py @@ -8,7 +8,7 @@ from typing import List import pytest import requests from dotenv import load_dotenv -from letta_client import AsyncLetta, Letta, MessageCreate +from letta_client import Letta, MessageCreate from letta_client.types import ToolReturnMessage from letta.schemas.agent import AgentState @@ -69,15 +69,6 @@ def client(server_url: str) -> Letta: yield client_instance -@pytest.fixture(scope="function") -def async_client(server_url: str) -> AsyncLetta: - """ - Creates and returns an asynchronous Letta REST client for testing. - """ - async_client_instance = AsyncLetta(base_url=server_url) - yield async_client_instance - - @pytest.fixture(scope="module") def agent_state(client: Letta) -> AgentState: """ diff --git a/tests/integration_test_composio.py b/tests/integration_test_composio.py index e1219d1e..ba700f56 100644 --- a/tests/integration_test_composio.py +++ b/tests/integration_test_composio.py @@ -67,9 +67,14 @@ async def test_composio_tool_execution_e2e(check_composio_key_set, composio_get_ actor=default_user, ) - tool_execution_result = await ToolExecutionManager(agent_state, actor=default_user).execute_tool( - function_name=composio_get_emojis.name, function_args={}, tool=composio_get_emojis - ) + tool_execution_result = await ToolExecutionManager( + message_manager=server.message_manager, + agent_manager=server.agent_manager, + block_manager=server.block_manager, + passage_manager=server.passage_manager, + agent_state=agent_state, + actor=default_user, + ).execute_tool(function_name=composio_get_emojis.name, function_args={}, tool=composio_get_emojis) # Small check, it should return something at least assert len(tool_execution_result.func_return.keys()) > 10 diff --git a/tests/integration_test_multi_agent.py b/tests/integration_test_multi_agent.py index f53f33f1..a4a464b5 100644 --- a/tests/integration_test_multi_agent.py +++ b/tests/integration_test_multi_agent.py @@ -1,56 +1,120 @@ import json +import os +import threading +import time import pytest +import requests +from dotenv import load_dotenv +from letta_client import Letta -from letta import LocalClient, create_client +from letta.config import LettaConfig from letta.functions.functions import derive_openai_json_schema, parse_source_code -from letta.schemas.embedding_config import EmbeddingConfig from letta.schemas.letta_message import SystemMessage, ToolReturnMessage -from letta.schemas.llm_config import LLMConfig -from letta.schemas.memory import ChatMemory from letta.schemas.tool import Tool +from letta.server.server import SyncServer from letta.services.agent_manager import AgentManager +from letta.settings import settings from tests.helpers.utils import retry_until_success from tests.utils import wait_for_incoming_message -@pytest.fixture(scope="function") -def client(): - client = create_client() - client.set_default_llm_config(LLMConfig.default_config("gpt-4o")) - client.set_default_embedding_config(EmbeddingConfig.default_config(provider="openai")) +@pytest.fixture(scope="module") +def server_url() -> str: + """ + Provides the URL for the Letta server. + If LETTA_SERVER_URL is not set, starts the server in a background thread + and polls until it’s accepting connections. + """ - yield client + def _run_server() -> None: + load_dotenv() + from letta.server.rest_api.app import start_server + + start_server(debug=True) + + url: str = os.getenv("LETTA_SERVER_URL", "http://localhost:8283") + + if not os.getenv("LETTA_SERVER_URL"): + thread = threading.Thread(target=_run_server, daemon=True) + thread.start() + + # Poll until the server is up (or timeout) + timeout_seconds = 30 + deadline = time.time() + timeout_seconds + while time.time() < deadline: + try: + resp = requests.get(url + "/v1/health") + if resp.status_code < 500: + break + except requests.exceptions.RequestException: + pass + time.sleep(0.1) + else: + raise RuntimeError(f"Could not reach {url} within {timeout_seconds}s") + + temp = settings.use_experimental + settings.use_experimental = True + yield url + settings.use_experimental = temp + + +@pytest.fixture(scope="module") +def server(): + config = LettaConfig.load() + print("CONFIG PATH", config.config_path) + + config.save() + + server = SyncServer() + return server + + +@pytest.fixture(scope="module") +def client(server_url: str) -> Letta: + """ + Creates and returns a synchronous Letta REST client for testing. + """ + client_instance = Letta(base_url=server_url) + client_instance.tools.upsert_base_tools() + yield client_instance @pytest.fixture(autouse=True) def remove_stale_agents(client): - stale_agents = AgentManager().list_agents(actor=client.user, limit=300) + stale_agents = client.agents.list(limit=300) for agent in stale_agents: - client.delete_agent(agent_id=agent.id) + client.agents.delete(agent_id=agent.id) @pytest.fixture(scope="function") -def agent_obj(client: LocalClient): +def agent_obj(client): """Create a test agent that we can call functions on""" - send_message_to_agent_and_wait_for_reply_tool_id = client.get_tool_id(name="send_message_to_agent_and_wait_for_reply") - agent_state = client.create_agent(tool_ids=[send_message_to_agent_and_wait_for_reply_tool_id]) + send_message_to_agent_tool = client.tools.list(name="send_message_to_agent_and_wait_for_reply")[0] + agent_state_instance = client.agents.create( + include_base_tools=True, + tool_ids=[send_message_to_agent_tool.id], + model="openai/gpt-4o-mini", + embedding="letta/letta-free", + ) + yield agent_state_instance - agent_obj = client.server.load_agent(agent_id=agent_state.id, actor=client.user) - yield agent_obj - - # client.delete_agent(agent_obj.agent_state.id) + client.agents.delete(agent_state_instance.id) @pytest.fixture(scope="function") -def other_agent_obj(client: LocalClient): +def other_agent_obj(client): """Create another test agent that we can call functions on""" - agent_state = client.create_agent(include_multi_agent_tools=False) + agent_state_instance = client.agents.create( + include_base_tools=True, + include_multi_agent_tools=False, + model="openai/gpt-4o-mini", + embedding="letta/letta-free", + ) - other_agent_obj = client.server.load_agent(agent_id=agent_state.id, actor=client.user) - yield other_agent_obj + yield agent_state_instance - client.delete_agent(other_agent_obj.agent_state.id) + client.agents.delete(agent_state_instance.id) @pytest.fixture @@ -77,48 +141,68 @@ def roll_dice_tool(client): tool.json_schema = derived_json_schema tool.name = derived_name - tool = client.server.tool_manager.create_or_update_tool(tool, actor=client.user) + tool = client.tools.upsert_from_function(func=roll_dice) # Yield the created tool yield tool @retry_until_success(max_attempts=5, sleep_time_seconds=2) -def test_send_message_to_agent(client, agent_obj, other_agent_obj): +def test_send_message_to_agent(client, server, agent_obj, other_agent_obj): secret_word = "banana" + actor = server.user_manager.get_user_or_default() # Encourage the agent to send a message to the other agent_obj with the secret string - client.send_message( - agent_id=agent_obj.agent_state.id, - role="user", - message=f"Use your tool to send a message to another agent with id {other_agent_obj.agent_state.id} to share the secret word: {secret_word}!", + client.agents.messages.create( + agent_id=agent_obj.id, + messages=[ + { + "role": "user", + "content": f"Use your tool to send a message to another agent with id {other_agent_obj.id} to share the secret word: {secret_word}!", + } + ], ) # Conversation search the other agent - messages = client.get_messages(other_agent_obj.agent_state.id) + messages = server.get_agent_recall( + user_id=actor.id, + agent_id=other_agent_obj.id, + reverse=True, + return_message_object=False, + ) + # Check for the presence of system message for m in reversed(messages): - print(f"\n\n {other_agent_obj.agent_state.id} -> {m.model_dump_json(indent=4)}") + print(f"\n\n {other_agent_obj.id} -> {m.model_dump_json(indent=4)}") if isinstance(m, SystemMessage): assert secret_word in m.content break # Search the sender agent for the response from another agent - in_context_messages = agent_obj.agent_manager.get_in_context_messages(agent_id=agent_obj.agent_state.id, actor=agent_obj.user) + in_context_messages = AgentManager().get_in_context_messages(agent_id=agent_obj.id, actor=actor) found = False - target_snippet = f"{other_agent_obj.agent_state.id} said:" + target_snippet = f"'agent_id': '{other_agent_obj.id}', 'response': [" for m in in_context_messages: if target_snippet in m.content[0].text: found = True break - print(f"In context messages of the sender agent (without system):\n\n{"\n".join([m.content[0].text for m in in_context_messages[1:]])}") + joined = "\n".join([m.content[0].text for m in in_context_messages[1:]]) + print(f"In context messages of the sender agent (without system):\n\n{joined}") if not found: raise Exception(f"Was not able to find an instance of the target snippet: {target_snippet}") # Test that the agent can still receive messages fine - response = client.send_message(agent_id=agent_obj.agent_state.id, role="user", message="So what did the other agent say?") + response = client.agents.messages.create( + agent_id=agent_obj.id, + messages=[ + { + "role": "user", + "content": "So what did the other agent say?", + } + ], + ) print(response.messages) @@ -127,39 +211,50 @@ def test_send_message_to_agents_with_tags_simple(client): worker_tags_123 = ["worker", "user-123"] worker_tags_456 = ["worker", "user-456"] - # Clean up first from possibly failed tests - prev_worker_agents = client.server.agent_manager.list_agents( - client.user, tags=list(set(worker_tags_123 + worker_tags_456)), match_all_tags=True - ) - for agent in prev_worker_agents: - client.delete_agent(agent.id) - secret_word = "banana" # Create "manager" agent - send_message_to_agents_matching_tags_tool_id = client.get_tool_id(name="send_message_to_agents_matching_tags") - manager_agent_state = client.create_agent(name="manager_agent", tool_ids=[send_message_to_agents_matching_tags_tool_id]) - manager_agent = client.server.load_agent(agent_id=manager_agent_state.id, actor=client.user) + send_message_to_agents_matching_tags_tool_id = client.tools.list(name="send_message_to_agents_matching_tags")[0].id + manager_agent_state = client.agents.create( + name="manager_agent", + tool_ids=[send_message_to_agents_matching_tags_tool_id], + model="openai/gpt-4o-mini", + embedding="letta/letta-free", + ) # Create 3 non-matching worker agents (These should NOT get the message) worker_agents_123 = [] for idx in range(2): - worker_agent_state = client.create_agent(name=f"not_worker_{idx}", include_multi_agent_tools=False, tags=worker_tags_123) - worker_agent = client.server.load_agent(agent_id=worker_agent_state.id, actor=client.user) - worker_agents_123.append(worker_agent) + worker_agent_state = client.agents.create( + name=f"not_worker_{idx}", + include_multi_agent_tools=False, + tags=worker_tags_123, + model="openai/gpt-4o-mini", + embedding="letta/letta-free", + ) + worker_agents_123.append(worker_agent_state) # Create 3 worker agents that should get the message worker_agents_456 = [] for idx in range(2): - worker_agent_state = client.create_agent(name=f"worker_{idx}", include_multi_agent_tools=False, tags=worker_tags_456) - worker_agent = client.server.load_agent(agent_id=worker_agent_state.id, actor=client.user) - worker_agents_456.append(worker_agent) + worker_agent_state = client.agents.create( + name=f"worker_{idx}", + include_multi_agent_tools=False, + tags=worker_tags_456, + model="openai/gpt-4o-mini", + embedding="letta/letta-free", + ) + worker_agents_456.append(worker_agent_state) # Encourage the manager to send a message to the other agent_obj with the secret string - response = client.send_message( - agent_id=manager_agent.agent_state.id, - role="user", - message=f"Send a message to all agents with tags {worker_tags_456} informing them of the secret word: {secret_word}!", + response = client.agents.messages.create( + agent_id=manager_agent_state.id, + messages=[ + { + "role": "user", + "content": f"Send a message to all agents with tags {worker_tags_456} informing them of the secret word: {secret_word}!", + } + ], ) for m in response.messages: @@ -172,62 +267,70 @@ def test_send_message_to_agents_with_tags_simple(client): break # Conversation search the worker agents - for agent in worker_agents_456: - messages = client.get_messages(agent.agent_state.id) + for agent_state in worker_agents_456: + messages = client.agents.messages.list(agent_state.id) # Check for the presence of system message for m in reversed(messages): - print(f"\n\n {agent.agent_state.id} -> {m.model_dump_json(indent=4)}") + print(f"\n\n {agent_state.id} -> {m.model_dump_json(indent=4)}") if isinstance(m, SystemMessage): assert secret_word in m.content break # Ensure it's NOT in the non matching worker agents - for agent in worker_agents_123: - messages = client.get_messages(agent.agent_state.id) + for agent_state in worker_agents_123: + messages = client.agents.messages.list(agent_state.id) # Check for the presence of system message for m in reversed(messages): - print(f"\n\n {agent.agent_state.id} -> {m.model_dump_json(indent=4)}") + print(f"\n\n {agent_state.id} -> {m.model_dump_json(indent=4)}") if isinstance(m, SystemMessage): assert secret_word not in m.content # Test that the agent can still receive messages fine - response = client.send_message(agent_id=manager_agent.agent_state.id, role="user", message="So what did the other agents say?") + response = client.agents.messages.create( + agent_id=manager_agent_state.id, + messages=[ + { + "role": "user", + "content": "So what did the other agent say?", + } + ], + ) print("Manager agent followup message: \n\n" + "\n".join([str(m) for m in response.messages])) - # Clean up agents - client.delete_agent(manager_agent_state.id) - for agent in worker_agents_456 + worker_agents_123: - client.delete_agent(agent.agent_state.id) - @retry_until_success(max_attempts=5, sleep_time_seconds=2) def test_send_message_to_agents_with_tags_complex_tool_use(client, roll_dice_tool): - worker_tags = ["dice-rollers"] - - # Clean up first from possibly failed tests - prev_worker_agents = client.server.agent_manager.list_agents(client.user, tags=worker_tags, match_all_tags=True) - for agent in prev_worker_agents: - client.delete_agent(agent.id) - # Create "manager" agent - send_message_to_agents_matching_tags_tool_id = client.get_tool_id(name="send_message_to_agents_matching_tags") - manager_agent_state = client.create_agent(tool_ids=[send_message_to_agents_matching_tags_tool_id]) - manager_agent = client.server.load_agent(agent_id=manager_agent_state.id, actor=client.user) + send_message_to_agents_matching_tags_tool_id = client.tools.list(name="send_message_to_agents_matching_tags")[0].id + manager_agent_state = client.agents.create( + tool_ids=[send_message_to_agents_matching_tags_tool_id], + model="openai/gpt-4o-mini", + embedding="letta/letta-free", + ) # Create 3 worker agents worker_agents = [] worker_tags = ["dice-rollers"] for _ in range(2): - worker_agent_state = client.create_agent(include_multi_agent_tools=False, tags=worker_tags, tool_ids=[roll_dice_tool.id]) - worker_agent = client.server.load_agent(agent_id=worker_agent_state.id, actor=client.user) - worker_agents.append(worker_agent) + worker_agent_state = client.agents.create( + include_multi_agent_tools=False, + tags=worker_tags, + tool_ids=[roll_dice_tool.id], + model="openai/gpt-4o-mini", + embedding="letta/letta-free", + ) + worker_agents.append(worker_agent_state) # Encourage the manager to send a message to the other agent_obj with the secret string broadcast_message = f"Send a message to all agents with tags {worker_tags} asking them to roll a dice for you!" - response = client.send_message( - agent_id=manager_agent.agent_state.id, - role="user", - message=broadcast_message, + response = client.agents.messages.create( + agent_id=manager_agent_state.id, + messages=[ + { + "role": "user", + "content": broadcast_message, + } + ], ) for m in response.messages: @@ -240,47 +343,65 @@ def test_send_message_to_agents_with_tags_complex_tool_use(client, roll_dice_too break # Test that the agent can still receive messages fine - response = client.send_message(agent_id=manager_agent.agent_state.id, role="user", message="So what did the other agents say?") + response = client.agents.messages.create( + agent_id=manager_agent_state.id, + messages=[ + { + "role": "user", + "content": "So what did the other agent say?", + } + ], + ) print("Manager agent followup message: \n\n" + "\n".join([str(m) for m in response.messages])) - # Clean up agents - client.delete_agent(manager_agent_state.id) - for agent in worker_agents: - client.delete_agent(agent.agent_state.id) - -@retry_until_success(max_attempts=5, sleep_time_seconds=2) +# @retry_until_success(max_attempts=5, sleep_time_seconds=2) def test_agents_async_simple(client): """ Test two agents with multi-agent tools sending messages back and forth to count to 5. The chain is started by prompting one of the agents. """ - # Cleanup from potentially failed previous runs - existing_agents = client.server.agent_manager.list_agents(client.user) - for agent in existing_agents: - client.delete_agent(agent.id) - # Create two agents with multi-agent tools - send_message_to_agent_async_tool_id = client.get_tool_id(name="send_message_to_agent_async") - memory_a = ChatMemory( - human="Chad - I'm interested in hearing poem.", - persona="You are an AI agent that can communicate with your agent buddy using `send_message_to_agent_async`, who has some great poem ideas (so I've heard).", + send_message_to_agent_async_tool_id = client.tools.list(name="send_message_to_agent_async")[0].id + charles_state = client.agents.create( + name="charles", + tool_ids=[send_message_to_agent_async_tool_id], + memory_blocks=[ + { + "label": "human", + "value": "Chad - I'm interested in hearing poem.", + }, + { + "label": "persona", + "value": "You are an AI agent that can communicate with your agent buddy using `send_message_to_agent_async`, who has some great poem ideas (so I've heard).", + }, + ], + model="openai/gpt-4o-mini", + embedding="letta/letta-free", ) - charles_state = client.create_agent(name="charles", memory=memory_a, tool_ids=[send_message_to_agent_async_tool_id]) - charles = client.server.load_agent(agent_id=charles_state.id, actor=client.user) - memory_b = ChatMemory( - human="No human - you are to only communicate with the other AI agent.", - persona="You are an AI agent that can communicate with your agent buddy using `send_message_to_agent_async`, who is interested in great poem ideas.", + sarah_state = client.agents.create( + name="sarah", + tool_ids=[send_message_to_agent_async_tool_id], + memory_blocks=[ + { + "label": "human", + "value": "No human - you are to only communicate with the other AI agent.", + }, + { + "label": "persona", + "value": "You are an AI agent that can communicate with your agent buddy using `send_message_to_agent_async`, who is interested in great poem ideas.", + }, + ], + model="openai/gpt-4o-mini", + embedding="letta/letta-free", ) - sarah_state = client.create_agent(name="sarah", memory=memory_b, tool_ids=[send_message_to_agent_async_tool_id]) # Start the count chain with Agent1 initial_prompt = f"I want you to talk to the other agent with ID {sarah_state.id} using `send_message_to_agent_async`. Specifically, I want you to ask him for a poem idea, and then craft a poem for me." - client.send_message( - agent_id=charles.agent_state.id, - role="user", - message=initial_prompt, + client.agents.messages.create( + agent_id=charles_state.id, + messages=[{"role": "user", "content": initial_prompt}], ) found_in_charles = wait_for_incoming_message( diff --git a/tests/utils.py b/tests/utils.py index 37b8ed6a..04778d11 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -7,13 +7,12 @@ from importlib import util from typing import Dict, Iterator, List, Optional, Tuple import requests +from letta_client import Letta, SystemMessage from letta.config import LettaConfig from letta.data_sources.connectors import DataConnector from letta.functions.functions import parse_source_code -from letta.schemas.enums import MessageRole from letta.schemas.file import FileMetadata -from letta.schemas.message import Message from letta.schemas.tool import Tool from letta.settings import TestSettings @@ -154,7 +153,7 @@ def with_qdrant_storage(storage: list[str]): def wait_for_incoming_message( - client, + client: Letta, agent_id: str, substring: str = "[Incoming message from agent with ID", max_wait_seconds: float = 10.0, @@ -168,13 +167,13 @@ def wait_for_incoming_message( deadline = time.time() + max_wait_seconds while time.time() < deadline: - messages = client.server.message_manager.list_messages_for_agent(agent_id=agent_id, actor=client.user) + messages = client.agents.messages.list(agent_id)[1:] # Check for the system message containing `substring` - def get_message_text(message: Message) -> str: - return message.content[0].text if message.content and len(message.content) == 1 else "" + def get_message_text(message: SystemMessage) -> str: + return message.content if message.content else "" - if any(message.role == MessageRole.system and substring in get_message_text(message) for message in messages): + if any(isinstance(message, SystemMessage) and substring in get_message_text(message) for message in messages): return True time.sleep(sleep_interval)