From 3fdf2b6c796eec620b115a89f5c92ab79183be21 Mon Sep 17 00:00:00 2001 From: Sarah Wooders Date: Mon, 2 Feb 2026 22:55:35 -0800 Subject: [PATCH] chore: deprecate old agent messaging (#9120) --- fern/openapi.json | 295 ++++++------------ letta/functions/function_sets/multi_agent.py | 48 +-- letta/server/rest_api/routers/v1/agents.py | 200 +++--------- .../rest_api/routers/v1/conversations.py | 23 -- letta/server/rest_api/routers/v1/groups.py | 72 ----- letta/server/server.py | 241 -------------- letta/services/streaming_service.py | 164 ++++------ ...manual_test_multi_agent_broadcast_large.py | 9 +- tests/test_multi_agent.py | 270 ---------------- 9 files changed, 205 insertions(+), 1117 deletions(-) diff --git a/fern/openapi.json b/fern/openapi.json index 88c0fe28..eda5a677 100644 --- a/fern/openapi.json +++ b/fern/openapi.json @@ -9559,12 +9559,12 @@ } } }, - "/v1/groups/{group_id}/messages": { - "post": { + "/v1/groups/{group_id}/messages/{message_id}": { + "patch": { "tags": ["groups"], - "summary": "Send Group Message", - "description": "Process a user message and return the group's response.\nThis endpoint accepts a message from a user and processes it through through agents in the group based on the specified pattern", - "operationId": "send_group_message", + "summary": "Modify Group Message", + "description": "Update the details of a message associated with an agent.", + "operationId": "modify_group_message", "deprecated": true, "parameters": [ { @@ -9581,6 +9581,21 @@ "title": "Group Id" }, "description": "The ID of the group in the format 'group-'" + }, + { + "name": "message_id", + "in": "path", + "required": true, + "schema": { + "type": "string", + "minLength": 44, + "maxLength": 44, + "pattern": "^message-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$", + "description": "The ID of the message in the format 'message-'", + "examples": ["message-123e4567-e89b-42d3-8456-426614174000"], + "title": "Message Id" + }, + "description": "The ID of the message in the format 'message-'" } ], "requestBody": { @@ -9588,7 +9603,21 @@ "content": { "application/json": { "schema": { - "$ref": "#/components/schemas/LettaRequest" + "anyOf": [ + { + "$ref": "#/components/schemas/UpdateSystemMessage" + }, + { + "$ref": "#/components/schemas/UpdateUserMessage" + }, + { + "$ref": "#/components/schemas/UpdateReasoningMessage" + }, + { + "$ref": "#/components/schemas/UpdateAssistantMessage" + } + ], + "title": "Request" } } } @@ -9599,7 +9628,58 @@ "content": { "application/json": { "schema": { - "$ref": "#/components/schemas/LettaResponse" + "oneOf": [ + { + "$ref": "#/components/schemas/SystemMessage" + }, + { + "$ref": "#/components/schemas/UserMessage" + }, + { + "$ref": "#/components/schemas/ReasoningMessage" + }, + { + "$ref": "#/components/schemas/HiddenReasoningMessage" + }, + { + "$ref": "#/components/schemas/ToolCallMessage" + }, + { + "$ref": "#/components/schemas/ToolReturnMessage" + }, + { + "$ref": "#/components/schemas/AssistantMessage" + }, + { + "$ref": "#/components/schemas/ApprovalRequestMessage" + }, + { + "$ref": "#/components/schemas/ApprovalResponseMessage" + }, + { + "$ref": "#/components/schemas/SummaryMessage" + }, + { + "$ref": "#/components/schemas/EventMessage" + } + ], + "discriminator": { + "propertyName": "message_type", + "mapping": { + "system_message": "#/components/schemas/SystemMessage", + "user_message": "#/components/schemas/UserMessage", + "reasoning_message": "#/components/schemas/ReasoningMessage", + "hidden_reasoning_message": "#/components/schemas/HiddenReasoningMessage", + "tool_call_message": "#/components/schemas/ToolCallMessage", + "tool_return_message": "#/components/schemas/ToolReturnMessage", + "assistant_message": "#/components/schemas/AssistantMessage", + "approval_request_message": "#/components/schemas/ApprovalRequestMessage", + "approval_response_message": "#/components/schemas/ApprovalResponseMessage", + "summary_message": "#/components/schemas/SummaryMessage", + "event_message": "#/components/schemas/EventMessage" + } + }, + "title": "Response Modify Group Message" } } } @@ -9615,7 +9695,9 @@ } } } - }, + } + }, + "/v1/groups/{group_id}/messages": { "get": { "tags": ["groups"], "summary": "List Group Messages", @@ -9790,203 +9872,6 @@ } } }, - "/v1/groups/{group_id}/messages/stream": { - "post": { - "tags": ["groups"], - "summary": "Send Group Message Streaming", - "description": "Process a user message and return the group's responses.\nThis endpoint accepts a message from a user and processes it through agents in the group based on the specified pattern.\nIt will stream the steps of the response always, and stream the tokens if 'stream_tokens' is set to True.", - "operationId": "send_group_message_streaming", - "deprecated": true, - "parameters": [ - { - "name": "group_id", - "in": "path", - "required": true, - "schema": { - "type": "string", - "minLength": 42, - "maxLength": 42, - "pattern": "^group-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$", - "description": "The ID of the group in the format 'group-'", - "examples": ["group-123e4567-e89b-42d3-8456-426614174000"], - "title": "Group Id" - }, - "description": "The ID of the group in the format 'group-'" - } - ], - "requestBody": { - "required": true, - "content": { - "application/json": { - "schema": { - "$ref": "#/components/schemas/LettaStreamingRequest" - } - } - } - }, - "responses": { - "200": { - "description": "Successful response", - "content": { - "application/json": { - "schema": {} - }, - "text/event-stream": { - "description": "Server-Sent Events stream" - } - } - }, - "422": { - "description": "Validation Error", - "content": { - "application/json": { - "schema": { - "$ref": "#/components/schemas/HTTPValidationError" - } - } - } - } - } - } - }, - "/v1/groups/{group_id}/messages/{message_id}": { - "patch": { - "tags": ["groups"], - "summary": "Modify Group Message", - "description": "Update the details of a message associated with an agent.", - "operationId": "modify_group_message", - "deprecated": true, - "parameters": [ - { - "name": "group_id", - "in": "path", - "required": true, - "schema": { - "type": "string", - "minLength": 42, - "maxLength": 42, - "pattern": "^group-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$", - "description": "The ID of the group in the format 'group-'", - "examples": ["group-123e4567-e89b-42d3-8456-426614174000"], - "title": "Group Id" - }, - "description": "The ID of the group in the format 'group-'" - }, - { - "name": "message_id", - "in": "path", - "required": true, - "schema": { - "type": "string", - "minLength": 44, - "maxLength": 44, - "pattern": "^message-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$", - "description": "The ID of the message in the format 'message-'", - "examples": ["message-123e4567-e89b-42d3-8456-426614174000"], - "title": "Message Id" - }, - "description": "The ID of the message in the format 'message-'" - } - ], - "requestBody": { - "required": true, - "content": { - "application/json": { - "schema": { - "anyOf": [ - { - "$ref": "#/components/schemas/UpdateSystemMessage" - }, - { - "$ref": "#/components/schemas/UpdateUserMessage" - }, - { - "$ref": "#/components/schemas/UpdateReasoningMessage" - }, - { - "$ref": "#/components/schemas/UpdateAssistantMessage" - } - ], - "title": "Request" - } - } - } - }, - "responses": { - "200": { - "description": "Successful Response", - "content": { - "application/json": { - "schema": { - "oneOf": [ - { - "$ref": "#/components/schemas/SystemMessage" - }, - { - "$ref": "#/components/schemas/UserMessage" - }, - { - "$ref": "#/components/schemas/ReasoningMessage" - }, - { - "$ref": "#/components/schemas/HiddenReasoningMessage" - }, - { - "$ref": "#/components/schemas/ToolCallMessage" - }, - { - "$ref": "#/components/schemas/ToolReturnMessage" - }, - { - "$ref": "#/components/schemas/AssistantMessage" - }, - { - "$ref": "#/components/schemas/ApprovalRequestMessage" - }, - { - "$ref": "#/components/schemas/ApprovalResponseMessage" - }, - { - "$ref": "#/components/schemas/SummaryMessage" - }, - { - "$ref": "#/components/schemas/EventMessage" - } - ], - "discriminator": { - "propertyName": "message_type", - "mapping": { - "system_message": "#/components/schemas/SystemMessage", - "user_message": "#/components/schemas/UserMessage", - "reasoning_message": "#/components/schemas/ReasoningMessage", - "hidden_reasoning_message": "#/components/schemas/HiddenReasoningMessage", - "tool_call_message": "#/components/schemas/ToolCallMessage", - "tool_return_message": "#/components/schemas/ToolReturnMessage", - "assistant_message": "#/components/schemas/AssistantMessage", - "approval_request_message": "#/components/schemas/ApprovalRequestMessage", - "approval_response_message": "#/components/schemas/ApprovalResponseMessage", - "summary_message": "#/components/schemas/SummaryMessage", - "event_message": "#/components/schemas/EventMessage" - } - }, - "title": "Response Modify Group Message" - } - } - } - }, - "422": { - "description": "Validation Error", - "content": { - "application/json": { - "schema": { - "$ref": "#/components/schemas/HTTPValidationError" - } - } - } - } - } - } - }, "/v1/groups/{group_id}/reset-messages": { "patch": { "tags": ["groups"], diff --git a/letta/functions/function_sets/multi_agent.py b/letta/functions/function_sets/multi_agent.py index 100bf737..1ffec99e 100644 --- a/letta/functions/function_sets/multi_agent.py +++ b/letta/functions/function_sets/multi_agent.py @@ -1,18 +1,15 @@ import asyncio -import json -from concurrent.futures import ThreadPoolExecutor, as_completed from typing import TYPE_CHECKING, List from letta.functions.helpers import ( + _send_message_to_agents_matching_tags_async, _send_message_to_all_agents_in_group_async, execute_send_message_to_agent, - extract_send_message_from_steps_messages, fire_and_forget_send_to_agent, ) from letta.schemas.enums import MessageRole from letta.schemas.message import MessageCreate from letta.server.rest_api.dependencies import get_letta_server -from letta.settings import settings def send_message_to_agent_and_wait_for_reply(self: "Agent", message: str, other_agent_id: str) -> str: @@ -67,46 +64,11 @@ def send_message_to_agents_matching_tags(self: "Agent", message: str, match_all: if not matching_agents: return [] - def process_agent(agent_id: str) -> str: - """Loads an agent, formats the message, and executes .step()""" - actor = self.user # Ensure correct actor context - agent = server.load_agent(agent_id=agent_id, interface=None, actor=actor) + # Prepare the message + messages = [MessageCreate(role=MessageRole.system, content=augmented_message, name=self.agent_state.name)] - # Prepare the message - messages = [MessageCreate(role=MessageRole.system, content=augmented_message, name=self.agent_state.name)] - - # Run .step() and return the response - usage_stats = agent.step( - input_messages=messages, - chaining=True, - max_chaining_steps=None, - stream=False, - skip_verify=True, - metadata=None, - put_inner_thoughts_first=True, - ) - - send_messages = extract_send_message_from_steps_messages(usage_stats.steps_messages, logger=agent.logger) - response_data = { - "agent_id": agent_id, - "response_messages": send_messages if send_messages else [""], - } - - return json.dumps(response_data, indent=2) - - # Use ThreadPoolExecutor for parallel execution - results = [] - with ThreadPoolExecutor(max_workers=settings.multi_agent_concurrent_sends) as executor: - future_to_agent = {executor.submit(process_agent, agent_state.id): agent_state for agent_state in matching_agents} - - for future in as_completed(future_to_agent): - try: - results.append(future.result()) # Collect results - except Exception as e: - # Log or handle failure for specific agents if needed - self.logger.exception(f"Error processing agent {future_to_agent[future]}: {e}") - - return results + # Use async helper for parallel message sending + return asyncio.run(_send_message_to_agents_matching_tags_async(self, server, messages, matching_agents)) def send_message_to_all_agents_in_group(self: "Agent", message: str) -> List[str]: diff --git a/letta/server/rest_api/routers/v1/agents.py b/letta/server/rest_api/routers/v1/agents.py index 3264c03f..a70a9428 100644 --- a/letta/server/rest_api/routers/v1/agents.py +++ b/letta/server/rest_api/routers/v1/agents.py @@ -1557,23 +1557,6 @@ async def send_message( # Create a copy of agent state with the overridden llm_config agent = agent.model_copy(update={"llm_config": override_llm_config}) - agent_eligible = agent.multi_agent_group is None or agent.multi_agent_group.manager_type in ["sleeptime", "voice_sleeptime"] - model_compatible = agent.llm_config.model_endpoint_type in [ - "anthropic", - "openai", - "together", - "google_ai", - "google_vertex", - "bedrock", - "ollama", - "azure", - "xai", - "zai", - "groq", - "deepseek", - "chatgpt_oauth", - ] - # Create a new run for execution tracking if settings.track_agent_run: runs_manager = RunManager() @@ -1597,32 +1580,17 @@ async def send_message( run_update_metadata = None try: - result = None - if agent_eligible and model_compatible: - agent_loop = AgentLoop.load(agent_state=agent, actor=actor) - result = await agent_loop.step( - request.messages, - max_steps=request.max_steps, - run_id=run.id if run else None, - use_assistant_message=request.use_assistant_message, - request_start_timestamp_ns=request_start_timestamp_ns, - include_return_message_types=request.include_return_message_types, - client_tools=request.client_tools, - include_compaction_messages=request.include_compaction_messages, - ) - else: - result = await server.send_message_to_agent( - agent_id=agent_id, - actor=actor, - input_messages=request.messages, - stream_steps=False, - stream_tokens=False, - # Support for AssistantMessage - use_assistant_message=request.use_assistant_message, - assistant_message_tool_name=request.assistant_message_tool_name, - assistant_message_tool_kwarg=request.assistant_message_tool_kwarg, - include_return_message_types=request.include_return_message_types, - ) + agent_loop = AgentLoop.load(agent_state=agent, actor=actor) + result = await agent_loop.step( + request.messages, + max_steps=request.max_steps, + run_id=run.id if run else None, + use_assistant_message=request.use_assistant_message, + request_start_timestamp_ns=request_start_timestamp_ns, + include_return_message_types=request.include_return_message_types, + client_tools=request.client_tools, + include_compaction_messages=request.include_compaction_messages, + ) run_status = result.stop_reason.stop_reason.run_status return result except PendingApprovalError as e: @@ -1844,47 +1812,16 @@ async def _process_message_background( # Create a copy of agent state with the overridden llm_config agent = agent.model_copy(update={"llm_config": override_llm_config}) - agent_eligible = agent.multi_agent_group is None or agent.multi_agent_group.manager_type in ["sleeptime", "voice_sleeptime"] - model_compatible = agent.llm_config.model_endpoint_type in [ - "anthropic", - "openai", - "together", - "google_ai", - "google_vertex", - "bedrock", - "ollama", - "azure", - "xai", - "zai", - "groq", - "deepseek", - ] - if agent_eligible and model_compatible: - agent_loop = AgentLoop.load(agent_state=agent, actor=actor) - result = await agent_loop.step( - messages, - max_steps=max_steps, - run_id=run_id, - use_assistant_message=use_assistant_message, - request_start_timestamp_ns=request_start_timestamp_ns, - include_return_message_types=include_return_message_types, - include_compaction_messages=include_compaction_messages, - ) - else: - result = await server.send_message_to_agent( - agent_id=agent_id, - actor=actor, - input_messages=messages, - stream_steps=False, - stream_tokens=False, - metadata={"run_id": run_id}, - # Support for AssistantMessage - use_assistant_message=use_assistant_message, - assistant_message_tool_name=assistant_message_tool_name, - assistant_message_tool_kwarg=assistant_message_tool_kwarg, - include_return_message_types=include_return_message_types, - ) - + agent_loop = AgentLoop.load(agent_state=agent, actor=actor) + result = await agent_loop.step( + messages, + max_steps=max_steps, + run_id=run_id, + use_assistant_message=use_assistant_message, + request_start_timestamp_ns=request_start_timestamp_ns, + include_return_message_types=include_return_message_types, + include_compaction_messages=include_compaction_messages, + ) runs_manager = RunManager() from letta.schemas.enums import RunStatus from letta.schemas.letta_stop_reason import StopReasonType @@ -2170,33 +2107,10 @@ async def preview_model_request( agent = await server.agent_manager.get_agent_by_id_async( agent_id, actor, include_relationships=["multi_agent_group", "memory", "sources"] ) - agent_eligible = agent.multi_agent_group is None or agent.multi_agent_group.manager_type in ["sleeptime", "voice_sleeptime"] - model_compatible = agent.llm_config.model_endpoint_type in [ - "anthropic", - "openai", - "together", - "google_ai", - "google_vertex", - "bedrock", - "ollama", - "azure", - "xai", - "zai", - "groq", - "deepseek", - "chatgpt_oauth", - ] - - if agent_eligible and model_compatible: - agent_loop = AgentLoop.load(agent_state=agent, actor=actor) - return await agent_loop.build_request( - input_messages=request.messages, - ) - else: - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="Payload inspection is not currently supported for this agent configuration.", - ) + agent_loop = AgentLoop.load(agent_state=agent, actor=actor) + return await agent_loop.build_request( + input_messages=request.messages, + ) class CompactionRequest(BaseModel): @@ -2225,53 +2139,31 @@ async def summarize_messages( actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) agent = await server.agent_manager.get_agent_by_id_async(agent_id, actor, include_relationships=["multi_agent_group"]) - agent_eligible = agent.multi_agent_group is None or agent.multi_agent_group.manager_type in ["sleeptime", "voice_sleeptime"] - model_compatible = agent.llm_config.model_endpoint_type in [ - "anthropic", - "openai", - "together", - "google_ai", - "google_vertex", - "bedrock", - "ollama", - "azure", - "xai", - "zai", - "groq", - "deepseek", - "chatgpt_oauth", - ] - if agent_eligible and model_compatible: - agent_loop = LettaAgentV3(agent_state=agent, actor=actor) - in_context_messages = await server.message_manager.get_messages_by_ids_async(message_ids=agent.message_ids, actor=actor) - compaction_settings = request.compaction_settings if request else None - num_messages_before = len(in_context_messages) - summary_message, messages, summary = await agent_loop.compact( - messages=in_context_messages, - compaction_settings=compaction_settings, - use_summary_role=True, - ) - num_messages_after = len(messages) + agent_loop = LettaAgentV3(agent_state=agent, actor=actor) + in_context_messages = await server.message_manager.get_messages_by_ids_async(message_ids=agent.message_ids, actor=actor) + compaction_settings = request.compaction_settings if request else None + num_messages_before = len(in_context_messages) + summary_message, messages, summary = await agent_loop.compact( + messages=in_context_messages, + compaction_settings=compaction_settings, + use_summary_role=True, + ) + num_messages_after = len(messages) - # update the agent state - logger.info(f"Summarized {num_messages_before} messages to {num_messages_after}") - if num_messages_before <= num_messages_after: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="Summarization failed to reduce the number of messages. You may need to use a different CompactionSettings (e.g. using `all` mode).", - ) - await agent_loop._checkpoint_messages(run_id=None, step_id=None, new_messages=[summary_message], in_context_messages=messages) - return CompactionResponse( - summary=summary, - num_messages_before=num_messages_before, - num_messages_after=num_messages_after, - ) - else: + # update the agent state + logger.info(f"Summarized {num_messages_before} messages to {num_messages_after}") + if num_messages_before <= num_messages_after: raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="Summarization is not currently supported for this agent configuration. Please contact Letta support.", + status_code=status.HTTP_400_BAD_REQUEST, + detail="Summarization failed to reduce the number of messages. You may need to use a different CompactionSettings (e.g. using `all` mode).", ) + await agent_loop._checkpoint_messages(run_id=None, step_id=None, new_messages=[summary_message], in_context_messages=messages) + return CompactionResponse( + summary=summary, + num_messages_before=num_messages_before, + num_messages_after=num_messages_after, + ) class CaptureMessagesRequest(BaseModel): diff --git a/letta/server/rest_api/routers/v1/conversations.py b/letta/server/rest_api/routers/v1/conversations.py index d1646f20..97a224d7 100644 --- a/letta/server/rest_api/routers/v1/conversations.py +++ b/letta/server/rest_api/routers/v1/conversations.py @@ -470,29 +470,6 @@ async def compact_conversation( # Get the agent state agent = await server.agent_manager.get_agent_by_id_async(conversation.agent_id, actor, include_relationships=["multi_agent_group"]) - # Check eligibility - agent_eligible = agent.multi_agent_group is None or agent.multi_agent_group.manager_type in ["sleeptime", "voice_sleeptime"] - model_compatible = agent.llm_config.model_endpoint_type in [ - "anthropic", - "openai", - "together", - "google_ai", - "google_vertex", - "bedrock", - "ollama", - "azure", - "xai", - "zai", - "groq", - "deepseek", - ] - - if not (agent_eligible and model_compatible): - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="Summarization is not currently supported for this agent configuration. Please contact Letta support.", - ) - # Get in-context messages for this conversation in_context_messages = await conversation_manager.get_messages_for_conversation( conversation_id=conversation_id, diff --git a/letta/server/rest_api/routers/v1/groups.py b/letta/server/rest_api/routers/v1/groups.py index 49553224..9117e4e5 100644 --- a/letta/server/rest_api/routers/v1/groups.py +++ b/letta/server/rest_api/routers/v1/groups.py @@ -7,7 +7,6 @@ from pydantic import Field from letta.constants import DEFAULT_MESSAGE_TOOL, DEFAULT_MESSAGE_TOOL_KWARG from letta.schemas.group import Group, GroupBase, GroupCreate, GroupUpdate, ManagerType from letta.schemas.letta_message import LettaMessageUnion, LettaMessageUpdateUnion -from letta.schemas.letta_request import LettaRequest, LettaStreamingRequest from letta.schemas.letta_response import LettaResponse from letta.schemas.message import BaseMessage from letta.server.rest_api.dependencies import HeaderParams, get_headers, get_letta_server @@ -128,77 +127,6 @@ async def delete_group( return JSONResponse(status_code=status.HTTP_200_OK, content={"message": f"Group id={group_id} successfully deleted"}) -@router.post( - "/{group_id}/messages", - response_model=LettaResponse, - operation_id="send_group_message", - deprecated=True, -) -async def send_group_message( - group_id: GroupId, - server: SyncServer = Depends(get_letta_server), - request: LettaRequest = Body(...), - headers: HeaderParams = Depends(get_headers), -): - """ - Process a user message and return the group's response. - This endpoint accepts a message from a user and processes it through through agents in the group based on the specified pattern - """ - actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) - result = await server.send_group_message_to_agent( - group_id=group_id, - actor=actor, - input_messages=request.messages, - stream_steps=False, - stream_tokens=False, - # Support for AssistantMessage - use_assistant_message=request.use_assistant_message, - assistant_message_tool_name=request.assistant_message_tool_name, - assistant_message_tool_kwarg=request.assistant_message_tool_kwarg, - ) - return result - - -@router.post( - "/{group_id}/messages/stream", - response_model=None, - operation_id="send_group_message_streaming", - deprecated=True, - responses={ - 200: { - "description": "Successful response", - "content": { - "text/event-stream": {"description": "Server-Sent Events stream"}, - }, - } - }, -) -async def send_group_message_streaming( - group_id: GroupId, - server: SyncServer = Depends(get_letta_server), - request: LettaStreamingRequest = Body(...), - headers: HeaderParams = Depends(get_headers), -): - """ - Process a user message and return the group's responses. - This endpoint accepts a message from a user and processes it through agents in the group based on the specified pattern. - It will stream the steps of the response always, and stream the tokens if 'stream_tokens' is set to True. - """ - actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) - result = await server.send_group_message_to_agent( - group_id=group_id, - actor=actor, - input_messages=request.messages, - stream_steps=True, - stream_tokens=request.stream_tokens, - # Support for AssistantMessage - use_assistant_message=request.use_assistant_message, - assistant_message_tool_name=request.assistant_message_tool_name, - assistant_message_tool_kwarg=request.assistant_message_tool_kwarg, - ) - return result - - GroupMessagesResponse = Annotated[ List[LettaMessageUnion], Field(json_schema_extra={"type": "array", "items": {"$ref": "#/components/schemas/LettaMessageUnion"}}) ] diff --git a/letta/server/server.py b/letta/server/server.py index 2197c38a..f00a7405 100644 --- a/letta/server/server.py +++ b/letta/server/server.py @@ -1754,244 +1754,3 @@ class SyncServer(object): raise LettaInvalidArgumentError(f"Failed to write MCP config file {mcp_config_path}") return list(current_mcp_servers.values()) - - @trace_method - async def send_message_to_agent( - self, - agent_id: str, - actor: User, - # role: MessageRole, - input_messages: List[MessageCreate], - stream_steps: bool, - stream_tokens: bool, - # related to whether or not we return `LettaMessage`s or `Message`s - chat_completion_mode: bool = False, - # Support for AssistantMessage - use_assistant_message: bool = True, - assistant_message_tool_name: str = constants.DEFAULT_MESSAGE_TOOL, - assistant_message_tool_kwarg: str = constants.DEFAULT_MESSAGE_TOOL_KWARG, - metadata: Optional[dict] = None, - request_start_timestamp_ns: Optional[int] = None, - include_return_message_types: Optional[List[MessageType]] = None, - ) -> Union[StreamingResponse, LettaResponse]: - """Split off into a separate function so that it can be imported in the /chat/completion proxy.""" - # TODO: @charles is this the correct way to handle? - include_final_message = True - - if not stream_steps and stream_tokens: - raise HTTPException(status_code=400, detail="stream_steps must be 'true' if stream_tokens is 'true'") - - # For streaming response - try: - # TODO: move this logic into server.py - - # Get the generator object off of the agent's streaming interface - # This will be attached to the POST SSE request used under-the-hood - letta_agent = self.load_agent(agent_id=agent_id, actor=actor) - - # Disable token streaming if not OpenAI or Anthropic - # TODO: cleanup this logic - llm_config = letta_agent.agent_state.llm_config - # supports_token_streaming = ["openai", "anthropic", "xai", "deepseek"] - supports_token_streaming = ["openai", "anthropic", "deepseek", "chatgpt_oauth"] # TODO re-enable xAI once streaming is patched - if stream_tokens and (llm_config.model_endpoint_type not in supports_token_streaming): - logger.warning( - f"Token streaming is only supported for models with type {' or '.join(supports_token_streaming)} in the model_endpoint: agent has endpoint type {llm_config.model_endpoint_type} and {llm_config.model_endpoint}. Setting stream_tokens to False." - ) - stream_tokens = False - - # Create a new interface per request - letta_agent.interface = StreamingServerInterface( - # multi_step=True, # would we ever want to disable this? - use_assistant_message=use_assistant_message, - assistant_message_tool_name=assistant_message_tool_name, - assistant_message_tool_kwarg=assistant_message_tool_kwarg, - inner_thoughts_in_kwargs=( - llm_config.put_inner_thoughts_in_kwargs if llm_config.put_inner_thoughts_in_kwargs is not None else False - ), - # inner_thoughts_kwarg=INNER_THOUGHTS_KWARG, - ) - streaming_interface = letta_agent.interface - if not isinstance(streaming_interface, StreamingServerInterface): - raise LettaInvalidArgumentError( - f"Agent has wrong type of interface: {type(streaming_interface)}", argument_name="interface" - ) - - # Enable token-streaming within the request if desired - streaming_interface.streaming_mode = stream_tokens - # "chatcompletion mode" does some remapping and ignores inner thoughts - streaming_interface.streaming_chat_completion_mode = chat_completion_mode - - # streaming_interface.allow_assistant_message = stream - # streaming_interface.function_call_legacy_mode = stream - - # Allow AssistantMessage is desired by client - # streaming_interface.use_assistant_message = use_assistant_message - # streaming_interface.assistant_message_tool_name = assistant_message_tool_name - # streaming_interface.assistant_message_tool_kwarg = assistant_message_tool_kwarg - - # Related to JSON buffer reader - # streaming_interface.inner_thoughts_in_kwargs = ( - # llm_config.put_inner_thoughts_in_kwargs if llm_config.put_inner_thoughts_in_kwargs is not None else False - # ) - - # Offload the synchronous message_func to a separate thread - streaming_interface.stream_start() - task = safe_create_task( - asyncio.to_thread( - self.send_messages, - actor=actor, - agent_id=agent_id, - input_messages=input_messages, - interface=streaming_interface, - metadata=metadata, - ), - label="send_messages_thread", - ) - - if stream_steps: - # return a stream - return StreamingResponse( - sse_async_generator( - streaming_interface.get_generator(), - usage_task=task, - finish_message=include_final_message, - request_start_timestamp_ns=request_start_timestamp_ns, - llm_config=llm_config, - ), - media_type="text/event-stream", - ) - - else: - # buffer the stream, then return the list - generated_stream = [] - async for message in streaming_interface.get_generator(): - assert ( - isinstance(message, LettaMessage) - or isinstance(message, LegacyLettaMessage) - or isinstance(message, MessageStreamStatus) - ), type(message) - generated_stream.append(message) - if message == MessageStreamStatus.done: - break - - # Get rid of the stream status messages - filtered_stream = [d for d in generated_stream if not isinstance(d, MessageStreamStatus)] - - # Apply message type filtering if specified - if include_return_message_types is not None: - filtered_stream = [msg for msg in filtered_stream if msg.message_type in include_return_message_types] - - usage = await task - - # By default the stream will be messages of type LettaMessage or LettaLegacyMessage - # If we want to convert these to Message, we can use the attached IDs - # NOTE: we will need to de-duplicate the Messsage IDs though (since Assistant->Inner+Func_Call) - # TODO: eventually update the interface to use `Message` and `MessageChunk` (new) inside the deque instead - return LettaResponse( - messages=filtered_stream, - stop_reason=LettaStopReason(stop_reason=StopReasonType.end_turn.value), - usage=usage, - ) - - except HTTPException: - raise - except Exception as e: - logger.exception(f"Error sending message to agent: {e}") - raise HTTPException(status_code=500, detail=f"{e}") - - @trace_method - async def send_group_message_to_agent( - self, - group_id: str, - actor: User, - input_messages: Union[List[Message], List[MessageCreate]], - stream_steps: bool, - stream_tokens: bool, - chat_completion_mode: bool = False, - # Support for AssistantMessage - use_assistant_message: bool = True, - assistant_message_tool_name: str = constants.DEFAULT_MESSAGE_TOOL, - assistant_message_tool_kwarg: str = constants.DEFAULT_MESSAGE_TOOL_KWARG, - metadata: Optional[dict] = None, - ) -> Union[StreamingResponse, LettaResponse]: - include_final_message = True - if not stream_steps and stream_tokens: - raise LettaInvalidArgumentError("stream_steps must be 'true' if stream_tokens is 'true'", argument_name="stream_steps") - - group = await self.group_manager.retrieve_group_async(group_id=group_id, actor=actor) - agent_state_id = group.manager_agent_id or (group.agent_ids[0] if len(group.agent_ids) > 0 else None) - agent_state = await self.agent_manager.get_agent_by_id_async(agent_id=agent_state_id, actor=actor) if agent_state_id else None - letta_multi_agent = load_multi_agent(group=group, agent_state=agent_state, actor=actor) - - llm_config = letta_multi_agent.agent_state.llm_config - supports_token_streaming = ["openai", "anthropic", "deepseek", "chatgpt_oauth"] - if stream_tokens and (llm_config.model_endpoint_type not in supports_token_streaming): - logger.warning( - f"Token streaming is only supported for models with type {' or '.join(supports_token_streaming)} in the model_endpoint: agent has endpoint type {llm_config.model_endpoint_type} and {llm_config.model_endpoint}. Setting stream_tokens to False." - ) - stream_tokens = False - - # Create a new interface per request - letta_multi_agent.interface = StreamingServerInterface( - use_assistant_message=use_assistant_message, - assistant_message_tool_name=assistant_message_tool_name, - assistant_message_tool_kwarg=assistant_message_tool_kwarg, - inner_thoughts_in_kwargs=( - llm_config.put_inner_thoughts_in_kwargs if llm_config.put_inner_thoughts_in_kwargs is not None else False - ), - ) - streaming_interface = letta_multi_agent.interface - if not isinstance(streaming_interface, StreamingServerInterface): - raise LettaInvalidArgumentError(f"Agent has wrong type of interface: {type(streaming_interface)}", argument_name="interface") - streaming_interface.streaming_mode = stream_tokens - streaming_interface.streaming_chat_completion_mode = chat_completion_mode - if metadata and hasattr(streaming_interface, "metadata"): - streaming_interface.metadata = metadata - - streaming_interface.stream_start() - task = safe_create_task( - asyncio.to_thread( - letta_multi_agent.step, - input_messages=input_messages, - chaining=self.chaining, - max_chaining_steps=self.max_chaining_steps, - ), - label="multi_agent_step_thread", - ) - - if stream_steps: - # return a stream - return StreamingResponse( - sse_async_generator( - streaming_interface.get_generator(), - usage_task=task, - finish_message=include_final_message, - ), - media_type="text/event-stream", - ) - - else: - # buffer the stream, then return the list - generated_stream = [] - async for message in streaming_interface.get_generator(): - assert ( - isinstance(message, LettaMessage) or isinstance(message, LegacyLettaMessage) or isinstance(message, MessageStreamStatus) - ), type(message) - generated_stream.append(message) - if message == MessageStreamStatus.done: - break - - # Get rid of the stream status messages - filtered_stream = [d for d in generated_stream if not isinstance(d, MessageStreamStatus)] - usage = await task - - # By default the stream will be messages of type LettaMessage or LettaLegacyMessage - # If we want to convert these to Message, we can use the attached IDs - # NOTE: we will need to de-duplicate the Messsage IDs though (since Assistant->Inner+Func_Call) - # TODO: eventually update the interface to use `Message` and `MessageChunk` (new) inside the deque instead - return LettaResponse( - messages=filtered_stream, - stop_reason=LettaStopReason(stop_reason=StopReasonType.end_turn.value), - usage=usage, - ) diff --git a/letta/services/streaming_service.py b/letta/services/streaming_service.py index 9ed43891..32408d47 100644 --- a/letta/services/streaming_service.py +++ b/letta/services/streaming_service.py @@ -111,8 +111,6 @@ class StreamingService: # Create a copy of agent state with the overridden llm_config agent = agent.model_copy(update={"llm_config": override_llm_config}) - agent_eligible = self._is_agent_eligible(agent) - model_compatible = self._is_model_compatible(agent) model_compatible_token_streaming = self._is_token_streaming_compatible(agent) # Attempt to acquire conversation lock if conversation_id is provided @@ -133,68 +131,40 @@ class StreamingService: run = await self._create_run(agent_id, request, run_type, actor, conversation_id=conversation_id) await redis_client.set(f"{REDIS_RUN_ID_PREFIX}:{agent_id}", run.id if run else None) - if agent_eligible and model_compatible: - # use agent loop for streaming - agent_loop = AgentLoop.load(agent_state=agent, actor=actor) + # use agent loop for streaming + agent_loop = AgentLoop.load(agent_state=agent, actor=actor) - # create the base stream with error handling - raw_stream = self._create_error_aware_stream( - agent_loop=agent_loop, - messages=request.messages, - max_steps=request.max_steps, - stream_tokens=request.stream_tokens and model_compatible_token_streaming, - run_id=run.id if run else None, - use_assistant_message=request.use_assistant_message, - request_start_timestamp_ns=request_start_timestamp_ns, - include_return_message_types=request.include_return_message_types, - actor=actor, - conversation_id=conversation_id, - client_tools=request.client_tools, - include_compaction_messages=request.include_compaction_messages, - ) + # create the base stream with error handling + raw_stream = self._create_error_aware_stream( + agent_loop=agent_loop, + messages=request.messages, + max_steps=request.max_steps, + stream_tokens=request.stream_tokens and model_compatible_token_streaming, + run_id=run.id if run else None, + use_assistant_message=request.use_assistant_message, + request_start_timestamp_ns=request_start_timestamp_ns, + include_return_message_types=request.include_return_message_types, + actor=actor, + conversation_id=conversation_id, + client_tools=request.client_tools, + include_compaction_messages=request.include_compaction_messages, + ) + - # handle background streaming if requested - if request.background and settings.track_agent_run: - if isinstance(redis_client, NoopAsyncRedisClient): - raise LettaServiceUnavailableError( - f"Background streaming requires Redis to be running. " - f"Please ensure Redis is properly configured. " - f"LETTA_REDIS_HOST: {settings.redis_host}, LETTA_REDIS_PORT: {settings.redis_port}", - service_name="redis", - ) - - # Wrap the agent loop stream with cancellation awareness for background task - background_stream = raw_stream - if settings.enable_cancellation_aware_streaming and run: - background_stream = cancellation_aware_stream_wrapper( - stream_generator=raw_stream, - run_manager=self.runs_manager, - run_id=run.id, - actor=actor, - cancellation_event=get_cancellation_event_for_run(run.id), - ) - - safe_create_task( - create_background_stream_processor( - stream_generator=background_stream, - redis_client=redis_client, - run_id=run.id, - run_manager=self.server.run_manager, - actor=actor, - conversation_id=conversation_id, - ), - label=f"background_stream_processor_{run.id}", + # handle background streaming if requested + if request.background and settings.track_agent_run: + if isinstance(redis_client, NoopAsyncRedisClient): + raise LettaServiceUnavailableError( + f"Background streaming requires Redis to be running. " + f"Please ensure Redis is properly configured. " + f"LETTA_REDIS_HOST: {settings.redis_host}, LETTA_REDIS_PORT: {settings.redis_port}", + service_name="redis", ) - raw_stream = redis_sse_stream_generator( - redis_client=redis_client, - run_id=run.id, - ) - - # wrap client stream with cancellation awareness if enabled and tracking runs - stream = raw_stream - if settings.enable_cancellation_aware_streaming and settings.track_agent_run and run and not request.background: - stream = cancellation_aware_stream_wrapper( + # Wrap the agent loop stream with cancellation awareness for background task + background_stream = raw_stream + if settings.enable_cancellation_aware_streaming and run: + background_stream = cancellation_aware_stream_wrapper( stream_generator=raw_stream, run_manager=self.runs_manager, run_id=run.id, @@ -202,29 +172,43 @@ class StreamingService: cancellation_event=get_cancellation_event_for_run(run.id), ) - # conditionally wrap with keepalive based on request parameter - if request.include_pings and settings.enable_keepalive: - stream = add_keepalive_to_stream(stream, keepalive_interval=settings.keepalive_interval, run_id=run.id) + safe_create_task( + create_background_stream_processor( + stream_generator=background_stream, + redis_client=redis_client, + run_id=run.id, + run_manager=self.server.run_manager, + actor=actor, + conversation_id=conversation_id, + ), + label=f"background_stream_processor_{run.id}", + ) - result = StreamingResponseWithStatusCode( - stream, - media_type="text/event-stream", + raw_stream = redis_sse_stream_generator( + redis_client=redis_client, + run_id=run.id, ) - else: - # fallback to non-agent-loop streaming - result = await self.server.send_message_to_agent( - agent_id=agent_id, + + # wrap client stream with cancellation awareness if enabled and tracking runs + stream = raw_stream + if settings.enable_cancellation_aware_streaming and settings.track_agent_run and run and not request.background: + stream = cancellation_aware_stream_wrapper( + stream_generator=raw_stream, + run_manager=self.runs_manager, + run_id=run.id, actor=actor, - input_messages=request.messages, - stream_steps=True, - stream_tokens=request.stream_tokens, - use_assistant_message=request.use_assistant_message, - assistant_message_tool_name=request.assistant_message_tool_name, - assistant_message_tool_kwarg=request.assistant_message_tool_kwarg, - request_start_timestamp_ns=request_start_timestamp_ns, - include_return_message_types=request.include_return_message_types, + cancellation_event=get_cancellation_event_for_run(run.id), ) + # conditionally wrap with keepalive based on request parameter + if request.include_pings and settings.enable_keepalive: + stream = add_keepalive_to_stream(stream, keepalive_interval=settings.keepalive_interval, run_id=run.id) + + result = StreamingResponseWithStatusCode( + stream, + media_type="text/event-stream", + ) + # update run status to running before returning if settings.track_agent_run and run: # refetch run since it may have been updated by another service @@ -499,30 +483,6 @@ class StreamingService: return error_aware_stream() - def _is_agent_eligible(self, agent: AgentState) -> bool: - """Check if agent is eligible for streaming.""" - return agent.multi_agent_group is None or agent.multi_agent_group.manager_type in ["sleeptime", "voice_sleeptime"] - - def _is_model_compatible(self, agent: AgentState) -> bool: - """Check if agent's model is compatible with streaming.""" - return agent.llm_config.model_endpoint_type in [ - "anthropic", - "openai", - "together", - "google_ai", - "google_vertex", - "bedrock", - "ollama", - "azure", - "xai", - "zai", - "groq", - "deepseek", - "chatgpt_oauth", - "minimax", - "openrouter", - ] - def _is_token_streaming_compatible(self, agent: AgentState) -> bool: """Check if agent's model supports token-level streaming.""" base_compatible = agent.llm_config.model_endpoint_type in [ diff --git a/tests/manual_test_multi_agent_broadcast_large.py b/tests/manual_test_multi_agent_broadcast_large.py index 7f992910..8a0f315b 100644 --- a/tests/manual_test_multi_agent_broadcast_large.py +++ b/tests/manual_test_multi_agent_broadcast_large.py @@ -69,12 +69,9 @@ def test_multi_agent_large(server, default_user, roll_dice_tool, num_workers): actor=default_user, ) - manager_agent = server.load_agent(agent_id=manager_agent_state.id, actor=default_user) - # Create N worker agents - worker_agents = [] for idx in tqdm(range(num_workers)): - worker_agent_state = server.create_agent( + server.create_agent( CreateAgent( name=f"worker-{idx}", tool_ids=[roll_dice_tool.id], @@ -86,13 +83,11 @@ def test_multi_agent_large(server, default_user, roll_dice_tool, num_workers): ), actor=default_user, ) - worker_agent = server.load_agent(agent_id=worker_agent_state.id, actor=default_user) - worker_agents.append(worker_agent) # Manager sends broadcast message broadcast_message = f"Send a message to all agents with tags {worker_tags} asking them to roll a dice for you!" server.send_messages( actor=default_user, - agent_id=manager_agent.agent_state.id, + agent_id=manager_agent_state.id, input_messages=[MessageCreate(role="user", content=broadcast_message)], ) diff --git a/tests/test_multi_agent.py b/tests/test_multi_agent.py index 28a4ff3a..931e8118 100644 --- a/tests/test_multi_agent.py +++ b/tests/test_multi_agent.py @@ -1,20 +1,14 @@ -import asyncio - import pytest from letta.config import LettaConfig from letta.schemas.agent import CreateAgent from letta.schemas.block import CreateBlock from letta.schemas.group import ( - DynamicManager, DynamicManagerUpdate, GroupCreate, GroupUpdate, ManagerType, - RoundRobinManagerUpdate, - SupervisorManager, ) -from letta.schemas.message import MessageCreate from letta.server.server import SyncServer @@ -125,30 +119,6 @@ async def manager_agent(server, default_user): yield agent_scooby -async def test_empty_group(server, default_user): - group = await server.group_manager.create_group_async( - group=GroupCreate( - description="This is a group chat between best friends all like to hang out together. In their free time they like to solve mysteries.", - agent_ids=[], - ), - actor=default_user, - ) - with pytest.raises(ValueError, match="Empty group"): - await server.send_group_message_to_agent( - group_id=group.id, - actor=default_user, - input_messages=[ - MessageCreate( - role="user", - content="what is everyone up to for the holidays?", - ), - ], - stream_steps=False, - stream_tokens=False, - ) - await server.group_manager.delete_group_async(group_id=group.id, actor=default_user) - - async def test_modify_group_pattern(server, default_user, four_participant_agents, manager_agent): group = await server.group_manager.create_group_async( group=GroupCreate( @@ -195,243 +165,3 @@ async def test_list_agent_groups(server, default_user, four_participant_agents): await server.group_manager.delete_group_async(group_id=group_a.id, actor=default_user) await server.group_manager.delete_group_async(group_id=group_b.id, actor=default_user) - - -async def test_round_robin(server, default_user, four_participant_agents): - description = ( - "This is a group chat between best friends all like to hang out together. In their free time they like to solve mysteries." - ) - group = await server.group_manager.create_group_async( - group=GroupCreate( - description=description, - agent_ids=[agent.id for agent in four_participant_agents], - ), - actor=default_user, - ) - - # verify group creation - assert group.manager_type == ManagerType.round_robin - assert group.description == description - assert group.agent_ids == [agent.id for agent in four_participant_agents] - assert group.max_turns is None - assert group.manager_agent_id is None - assert group.termination_token is None - - try: - server.group_manager.reset_messages(group_id=group.id, actor=default_user) - response = await server.send_group_message_to_agent( - group_id=group.id, - actor=default_user, - input_messages=[ - MessageCreate( - role="user", - content="what is everyone up to for the holidays?", - ), - ], - stream_steps=False, - stream_tokens=False, - ) - assert response.usage.step_count == len(group.agent_ids) - assert len(response.messages) == response.usage.step_count * 2 - for i, message in enumerate(response.messages): - assert message.message_type == "reasoning_message" if i % 2 == 0 else "assistant_message" - assert message.name == four_participant_agents[i // 2].name - - for agent_id in group.agent_ids: - agent_messages = await server.get_agent_recall( - user_id=default_user.id, - agent_id=agent_id, - group_id=group.id, - reverse=True, - return_message_object=False, - ) - assert len(agent_messages) == len(group.agent_ids) + 2 # add one for user message, one for reasoning message - - # TODO: filter this to return a clean conversation history - messages = server.group_manager.list_group_messages( - group_id=group.id, - actor=default_user, - ) - assert len(messages) == (len(group.agent_ids) + 2) * len(group.agent_ids) - - max_turns = 3 - group = await server.group_manager.modify_group_async( - group_id=group.id, - group_update=GroupUpdate( - agent_ids=[agent.id for agent in four_participant_agents][::-1], - manager_config=RoundRobinManagerUpdate( - max_turns=max_turns, - ), - ), - actor=default_user, - ) - assert group.manager_type == ManagerType.round_robin - assert group.description == description - assert group.agent_ids == [agent.id for agent in four_participant_agents][::-1] - assert group.max_turns == max_turns - assert group.manager_agent_id is None - assert group.termination_token is None - - server.group_manager.reset_messages(group_id=group.id, actor=default_user) - - response = await server.send_group_message_to_agent( - group_id=group.id, - actor=default_user, - input_messages=[ - MessageCreate( - role="user", - content="when should we plan our next adventure?", - ), - ], - stream_steps=False, - stream_tokens=False, - ) - assert response.usage.step_count == max_turns - assert len(response.messages) == max_turns * 2 - - for i, message in enumerate(response.messages): - assert message.message_type == "reasoning_message" if i % 2 == 0 else "assistant_message" - assert message.name == four_participant_agents[::-1][i // 2].name - - for i in range(len(group.agent_ids)): - agent_messages = await server.get_agent_recall( - user_id=default_user.id, - agent_id=group.agent_ids[i], - group_id=group.id, - reverse=True, - return_message_object=False, - ) - expected_message_count = max_turns + 1 if i >= max_turns else max_turns + 2 - assert len(agent_messages) == expected_message_count - - finally: - await server.group_manager.delete_group_async(group_id=group.id, actor=default_user) - - -async def test_supervisor(server, default_user, four_participant_agents): - agent_scrappy = await server.create_agent_async( - request=CreateAgent( - name="shaggy", - memory_blocks=[ - CreateBlock( - label="persona", - value="You are a puppy operations agent for Letta and you help run multi-agent group chats. Your role is to supervise the group, sending messages and aggregating the responses.", - ), - CreateBlock( - label="human", - value="", - ), - ], - model="openai/gpt-4o-mini", - embedding="openai/text-embedding-3-small", - ), - actor=default_user, - ) - - group = await server.group_manager.create_group_async( - group=GroupCreate( - description="This is a group chat between best friends all like to hang out together. In their free time they like to solve mysteries.", - agent_ids=[agent.id for agent in four_participant_agents], - manager_config=SupervisorManager( - manager_agent_id=agent_scrappy.id, - ), - ), - actor=default_user, - ) - try: - response = await server.send_group_message_to_agent( - group_id=group.id, - actor=default_user, - input_messages=[ - MessageCreate( - role="user", - content="ask everyone what they like to do for fun and then come up with an activity for everyone to do together.", - ), - ], - stream_steps=False, - stream_tokens=False, - ) - assert response.usage.step_count == 2 - assert len(response.messages) == 5 - - # verify tool call - assert response.messages[0].message_type == "reasoning_message" - assert ( - response.messages[1].message_type == "tool_call_message" - and response.messages[1].tool_call.name == "send_message_to_all_agents_in_group" - ) - assert response.messages[2].message_type == "tool_return_message" and len(eval(response.messages[2].tool_return)) == len( - four_participant_agents - ) - assert response.messages[3].message_type == "reasoning_message" - assert response.messages[4].message_type == "assistant_message" - - finally: - await server.group_manager.delete_group_async(group_id=group.id, actor=default_user) - server.agent_manager.delete_agent(agent_id=agent_scrappy.id, actor=default_user) - - -@pytest.mark.flaky(max_runs=2) -async def test_dynamic_group_chat(server, default_user, manager_agent, four_participant_agents): - description = ( - "This is a group chat between best friends all like to hang out together. In their free time they like to solve mysteries." - ) - # error on duplicate agent in participant list - with pytest.raises(ValueError, match="Duplicate agent ids"): - await server.group_manager.create_group_async( - group=GroupCreate( - description=description, - agent_ids=[agent.id for agent in four_participant_agents] + [four_participant_agents[0].id], - manager_config=DynamicManager( - manager_agent_id=manager_agent.id, - ), - ), - actor=default_user, - ) - # error on duplicate agent names - duplicate_agent_shaggy = server.create_agent( - request=CreateAgent( - name="shaggy", - model="openai/gpt-4o-mini", - embedding="openai/text-embedding-3-small", - ), - actor=default_user, - ) - with pytest.raises(ValueError, match="Duplicate agent names"): - await server.group_manager.create_group_async( - group=GroupCreate( - description=description, - agent_ids=[agent.id for agent in four_participant_agents] + [duplicate_agent_shaggy.id], - manager_config=DynamicManager( - manager_agent_id=manager_agent.id, - ), - ), - actor=default_user, - ) - server.agent_manager.delete_agent(duplicate_agent_shaggy.id, actor=default_user) - - group = await server.group_manager.create_group_async( - group=GroupCreate( - description=description, - agent_ids=[agent.id for agent in four_participant_agents], - manager_config=DynamicManager( - manager_agent_id=manager_agent.id, - ), - ), - actor=default_user, - ) - try: - response = await server.send_group_message_to_agent( - group_id=group.id, - actor=default_user, - input_messages=[ - MessageCreate(role="user", content="what is everyone up to for the holidays?"), - ], - stream_steps=False, - stream_tokens=False, - ) - assert response.usage.step_count == len(four_participant_agents) * 2 - assert len(response.messages) == response.usage.step_count * 2 - - finally: - await server.group_manager.delete_group_async(group_id=group.id, actor=default_user)