feat: Make multi agent parallelism configurable (#937)

This commit is contained in:
Matthew Zhou
2025-02-06 12:34:24 -05:00
committed by GitHub
parent 5e2fddce06
commit d4aa107cbb
3 changed files with 13 additions and 17 deletions

View File

@@ -51,9 +51,6 @@ BASE_TOOLS = ["send_message", "conversation_search", "archival_memory_insert", "
BASE_MEMORY_TOOLS = ["core_memory_append", "core_memory_replace"]
# Multi agent tools
MULTI_AGENT_TOOLS = ["send_message_to_agent_and_wait_for_reply", "send_message_to_agents_matching_all_tags", "send_message_to_agent_async"]
MULTI_AGENT_SEND_MESSAGE_MAX_RETRIES = 3
MULTI_AGENT_SEND_MESSAGE_TIMEOUT = 20 * 60
MULTI_AGENT_CONCURRENT_SENDS = 15
# The name of the tool used to send message to the user
# May not be relevant in cases where the agent has multiple ways to message to user (send_imessage, send_discord_mesasge, ...)

View File

@@ -7,14 +7,7 @@ import humps
from composio.constants import DEFAULT_ENTITY_ID
from pydantic import BaseModel
from letta.constants import (
COMPOSIO_ENTITY_ENV_VAR_KEY,
DEFAULT_MESSAGE_TOOL,
DEFAULT_MESSAGE_TOOL_KWARG,
MULTI_AGENT_CONCURRENT_SENDS,
MULTI_AGENT_SEND_MESSAGE_MAX_RETRIES,
MULTI_AGENT_SEND_MESSAGE_TIMEOUT,
)
from letta.constants import COMPOSIO_ENTITY_ENV_VAR_KEY, DEFAULT_MESSAGE_TOOL, DEFAULT_MESSAGE_TOOL_KWARG
from letta.functions.interface import MultiAgentMessagingInterface
from letta.orm.errors import NoResultFound
from letta.schemas.enums import MessageRole
@@ -23,6 +16,7 @@ from letta.schemas.letta_response import LettaResponse
from letta.schemas.message import Message, MessageCreate
from letta.schemas.user import User
from letta.server.rest_api.utils import get_letta_server
from letta.settings import settings
# TODO: This is kind of hacky, as this is used to search up the action later on composio's side
@@ -290,8 +284,8 @@ async def async_execute_send_message_to_agent(
sender_agent=sender_agent,
target_agent_id=other_agent_id,
messages=messages,
max_retries=MULTI_AGENT_SEND_MESSAGE_MAX_RETRIES,
timeout=MULTI_AGENT_SEND_MESSAGE_TIMEOUT,
max_retries=settings.multi_agent_send_message_max_retries,
timeout=settings.multi_agent_send_message_timeout,
logging_prefix=log_prefix,
)
@@ -429,8 +423,8 @@ def fire_and_forget_send_to_agent(
sender_agent=sender_agent,
target_agent_id=other_agent_id,
messages=messages,
max_retries=MULTI_AGENT_SEND_MESSAGE_MAX_RETRIES,
timeout=MULTI_AGENT_SEND_MESSAGE_TIMEOUT,
max_retries=settings.multi_agent_send_message_max_retries,
timeout=settings.multi_agent_send_message_timeout,
logging_prefix=log_prefix,
)
sender_agent.logger.info(f"{log_prefix} fire-and-forget success with retries: {result}")
@@ -489,7 +483,7 @@ async def _send_message_to_agents_matching_all_tags_async(sender_agent: "Agent",
messages = [MessageCreate(role=MessageRole.system, content=augmented_message, name=sender_agent.agent_state.name)]
# Possibly limit concurrency to avoid meltdown:
sem = asyncio.Semaphore(MULTI_AGENT_CONCURRENT_SENDS)
sem = asyncio.Semaphore(settings.multi_agent_concurrent_sends)
async def _send_single(agent_state):
async with sem:
@@ -499,7 +493,7 @@ async def _send_message_to_agents_matching_all_tags_async(sender_agent: "Agent",
target_agent_id=agent_state.id,
messages=messages,
max_retries=3,
timeout=30,
timeout=settings.multi_agent_send_message_timeout,
)
tasks = [asyncio.create_task(_send_single(agent_state)) for agent_state in matching_agents]

View File

@@ -146,6 +146,11 @@ class Settings(BaseSettings):
pg_pool_recycle: int = 1800 # When to recycle connections
pg_echo: bool = False # Logging
# multi agent settings
multi_agent_send_message_max_retries: int = 3
multi_agent_send_message_timeout: int = 20 * 60
multi_agent_concurrent_sends: int = 15
@property
def letta_pg_uri(self) -> str:
if self.pg_uri: