From 512d97022bb47d4556447b37c3fbe755bc968ec9 Mon Sep 17 00:00:00 2001 From: Matthew Zhou Date: Wed, 19 Mar 2025 11:18:55 -0700 Subject: [PATCH] fix: Tweak performance on multi agent tooling (#1338) --- letta/functions/function_sets/multi_agent.py | 47 +++++++++++++- letta/functions/helpers.py | 65 +++----------------- 2 files changed, 55 insertions(+), 57 deletions(-) diff --git a/letta/functions/function_sets/multi_agent.py b/letta/functions/function_sets/multi_agent.py index 98f513ef..3358c788 100644 --- a/letta/functions/function_sets/multi_agent.py +++ b/letta/functions/function_sets/multi_agent.py @@ -9,6 +9,8 @@ from letta.functions.helpers import ( ) from letta.schemas.enums import MessageRole from letta.schemas.message import MessageCreate +from letta.server.rest_api.utils import get_letta_server +from letta.utils import log_telemetry if TYPE_CHECKING: from letta.agent import Agent @@ -85,8 +87,51 @@ def send_message_to_agents_matching_tags(self: "Agent", message: str, match_all: response corresponds to a single agent. Agents that do not respond will not have an entry in the returned list. """ + log_telemetry( + self.logger, + "_send_message_to_agents_matching_tags_async start", + message=message, + match_all=match_all, + match_some=match_some, + ) + server = get_letta_server() - return asyncio.run(_send_message_to_agents_matching_tags_async(self, message, match_all, match_some)) + augmented_message = ( + f"[Incoming message from agent with ID '{self.agent_state.id}' - to reply to this message, " + f"make sure to use the 'send_message' at the end, and the system will notify the sender of your response] " + f"{message}" + ) + + # Retrieve up to 100 matching agents + log_telemetry( + self.logger, + "_send_message_to_agents_matching_tags_async listing agents start", + message=message, + match_all=match_all, + match_some=match_some, + ) + matching_agents = server.agent_manager.list_agents_matching_tags(actor=self.user, match_all=match_all, match_some=match_some) + + log_telemetry( + self.logger, + "_send_message_to_agents_matching_tags_async listing agents finish", + message=message, + match_all=match_all, + match_some=match_some, + ) + + # Create a system message + messages = [MessageCreate(role=MessageRole.system, content=augmented_message, name=self.agent_state.name)] + + result = asyncio.run(_send_message_to_agents_matching_tags_async(self, server, messages, matching_agents)) + log_telemetry( + self.logger, + "_send_message_to_agents_matching_tags_async finish", + messages=message, + match_all=match_all, + match_some=match_some, + ) + return result def send_message_to_all_agents_in_group(self: "Agent", message: str) -> List[str]: diff --git a/letta/functions/helpers.py b/letta/functions/helpers.py index ebe78a6a..64a01f63 100644 --- a/letta/functions/helpers.py +++ b/letta/functions/helpers.py @@ -533,57 +533,17 @@ def fire_and_forget_send_to_agent( async def _send_message_to_agents_matching_tags_async( - sender_agent: "Agent", message: str, match_all: List[str], match_some: List[str] + sender_agent: "Agent", server: "SyncServer", messages: List[MessageCreate], matching_agents: List["AgentState"] ) -> List[str]: - log_telemetry( - sender_agent.logger, - "_send_message_to_agents_matching_tags_async start", - message=message, - match_all=match_all, - match_some=match_some, - ) - server = get_letta_server() - - augmented_message = ( - f"[Incoming message from agent with ID '{sender_agent.agent_state.id}' - to reply to this message, " - f"make sure to use the 'send_message' at the end, and the system will notify the sender of your response] " - f"{message}" - ) - - # Retrieve up to 100 matching agents - log_telemetry( - sender_agent.logger, - "_send_message_to_agents_matching_tags_async listing agents start", - message=message, - match_all=match_all, - match_some=match_some, - ) - matching_agents = server.agent_manager.list_agents_matching_tags(actor=sender_agent.user, match_all=match_all, match_some=match_some) - - log_telemetry( - sender_agent.logger, - "_send_message_to_agents_matching_tags_async listing agents finish", - message=message, - match_all=match_all, - match_some=match_some, - ) - - # Create a system message - messages = [MessageCreate(role=MessageRole.system, content=augmented_message, name=sender_agent.agent_state.name)] - - # Possibly limit concurrency to avoid meltdown: - sem = asyncio.Semaphore(settings.multi_agent_concurrent_sends) - async def _send_single(agent_state): - async with sem: - return await async_send_message_with_retries( - server=server, - sender_agent=sender_agent, - target_agent_id=agent_state.id, - messages=messages, - max_retries=3, - timeout=settings.multi_agent_send_message_timeout, - ) + return await async_send_message_with_retries( + server=server, + sender_agent=sender_agent, + target_agent_id=agent_state.id, + messages=messages, + max_retries=3, + timeout=settings.multi_agent_send_message_timeout, + ) tasks = [asyncio.create_task(_send_single(agent_state)) for agent_state in matching_agents] results = await asyncio.gather(*tasks, return_exceptions=True) @@ -594,13 +554,6 @@ async def _send_message_to_agents_matching_tags_async( else: final.append(r) - log_telemetry( - sender_agent.logger, - "_send_message_to_agents_matching_tags_async finish", - message=message, - match_all=match_all, - match_some=match_some, - ) return final