diff --git a/letta/services/tool_executor/multi_agent_tool_executor.py b/letta/services/tool_executor/multi_agent_tool_executor.py index 3d90a0e1..2e6a3815 100644 --- a/letta/services/tool_executor/multi_agent_tool_executor.py +++ b/letta/services/tool_executor/multi_agent_tool_executor.py @@ -87,7 +87,7 @@ class LettaMultiAgentToolExecutor(ToolExecutor): return str(results) async def _process_agent(self, agent_state: AgentState, message: str, actor: User) -> Dict[str, Any]: - from letta.agents.letta_agent_v2 import LettaAgentV2 + from letta.agents.letta_agent_v3 import LettaAgentV3 try: runs_manager = RunManager() @@ -102,7 +102,7 @@ class LettaMultiAgentToolExecutor(ToolExecutor): actor=actor, ) - letta_agent = LettaAgentV2( + letta_agent = LettaAgentV3( agent_state=agent_state, actor=self.actor, ) diff --git a/tests/integration_test_multi_agent.py b/tests/integration_test_multi_agent.py index 57021fc8..47927eb6 100644 --- a/tests/integration_test_multi_agent.py +++ b/tests/integration_test_multi_agent.py @@ -63,6 +63,22 @@ def client(server_url: str) -> Letta: @pytest.fixture(autouse=True) def remove_stale_agents(client): + """ + Clean up ALL stale agents before each test to ensure fresh state. + This is critical because @retry_until_success decorator will retry tests + and accumulate agents if not cleaned up. + + Note: agent_id in llm_batch_items has ondelete="CASCADE" set in the migration, + so batch items should be automatically deleted. However, we catch errors + in case the migration hasn't been run yet or there are orphaned records. + """ + stale_agents = client.agents.list(limit=300) + for agent in stale_agents: + client.agents.delete(agent_id=agent.id) + + yield # Run the test + + # Also cleanup after test to prevent accumulation across test runs stale_agents = client.agents.list(limit=300) for agent in stale_agents: client.agents.delete(agent_id=agent.id) @@ -201,8 +217,12 @@ def test_send_message_to_agent(client: Letta, agent_obj: AgentState, other_agent @retry_until_success(max_attempts=5, sleep_time_seconds=2) def test_send_message_to_agents_with_tags_simple(client: Letta): - worker_tags_123 = ["worker", "user-123"] - worker_tags_456 = ["worker", "user-456"] + import uuid + + # Add unique identifier to tags to prevent interference from retry attempts + test_id = str(uuid.uuid4())[:8] + worker_tags_123 = ["worker", f"user-123-{test_id}"] + worker_tags_456 = ["worker", f"user-456-{test_id}"] secret_word = "banana" @@ -262,10 +282,14 @@ def test_send_message_to_agents_with_tags_simple(client: Letta): # Verify responses from all expected worker agents worker_agent_ids = {agent.id for agent in worker_agents_456} returned_agent_ids = set() - for json_str in tool_response: - response_obj = json.loads(json_str) + for item in tool_response: + # Handle both dict and JSON string formats + if isinstance(item, str): + response_obj = json.loads(item) + else: + response_obj = item assert response_obj["agent_id"] in worker_agent_ids - assert response_obj["response_messages"] != [""] + assert response_obj.get("response", response_obj.get("response_messages")) != [""] returned_agent_ids.add(response_obj["agent_id"]) break @@ -308,6 +332,11 @@ def test_send_message_to_agents_with_tags_simple(client: Letta): @retry_until_success(max_attempts=5, sleep_time_seconds=2) def test_send_message_to_agents_with_tags_complex_tool_use(client: Letta, roll_dice_tool): + import uuid + + # Add unique identifier to tags to prevent interference from retry attempts + test_id = str(uuid.uuid4())[:8] + # Create "manager" agent send_message_to_agents_matching_tags_tool_id = list(client.tools.list(name="send_message_to_agents_matching_tags"))[0].id manager_agent_state = client.agents.create( @@ -319,7 +348,7 @@ def test_send_message_to_agents_with_tags_complex_tool_use(client: Letta, roll_d # Create 2 worker agents worker_agents = [] - worker_tags = ["dice-rollers"] + worker_tags = ["dice-rollers", f"test-{test_id}"] for _ in range(2): worker_agent_state = client.agents.create( agent_type="letta_v1_agent", @@ -354,12 +383,16 @@ def test_send_message_to_agents_with_tags_complex_tool_use(client: Letta, roll_d worker_agent_ids = {agent.id for agent in worker_agents} returned_agent_ids = set() all_responses = [] - for json_str in tool_response: - response_obj = json.loads(json_str) + for item in tool_response: + # Handle both dict and JSON string formats + if isinstance(item, str): + response_obj = json.loads(item) + else: + response_obj = item assert response_obj["agent_id"] in worker_agent_ids - assert response_obj["response_messages"] != [""] + assert response_obj.get("response", response_obj.get("response_messages")) != [""] returned_agent_ids.add(response_obj["agent_id"]) - all_responses.extend(response_obj["response_messages"]) + all_responses.extend(response_obj.get("response", response_obj.get("response_messages", []))) break # Test that the agent can still receive messages fine