diff --git a/fern/openapi.json b/fern/openapi.json index e4442c78..f7c35898 100644 --- a/fern/openapi.json +++ b/fern/openapi.json @@ -9003,7 +9003,7 @@ "get": { "tags": ["conversations"], "summary": "List Conversation Messages", - "description": "List all messages in a conversation.\n\nReturns LettaMessage objects (UserMessage, AssistantMessage, etc.) for all\nmessages in the conversation, with support for cursor-based pagination.", + "description": "List all messages in a conversation.\n\nReturns LettaMessage objects (UserMessage, AssistantMessage, etc.) for all\nmessages in the conversation, with support for cursor-based pagination.\n\nIf conversation_id is an agent ID (starts with \"agent-\"), returns messages\nfrom the agent's default conversation (no conversation isolation).", "operationId": "list_conversation_messages", "parameters": [ { @@ -9238,7 +9238,7 @@ "post": { "tags": ["conversations"], "summary": "Retrieve Conversation Stream", - "description": "Resume the stream for the most recent active run in a conversation.\n\nThis endpoint allows you to reconnect to an active background stream\nfor a conversation, enabling recovery from network interruptions.", + "description": "Resume the stream for the most recent active run in a conversation.\n\nThis endpoint allows you to reconnect to an active background stream\nfor a conversation, enabling recovery from network interruptions.\n\nIf conversation_id is an agent ID (starts with \"agent-\"), retrieves the\nstream for the agent's most recent active run.", "operationId": "retrieve_conversation_stream", "parameters": [ { @@ -9342,7 +9342,7 @@ "post": { "tags": ["conversations"], "summary": "Cancel Conversation", - "description": "Cancel runs associated with a conversation.\n\nNote: To cancel active runs, Redis is required.", + "description": "Cancel runs associated with a conversation.\n\nNote: To cancel active runs, Redis is required.\n\nIf conversation_id is an agent ID (starts with \"agent-\"), cancels runs\nfor the agent's default conversation.", "operationId": "cancel_conversation", "parameters": [ { @@ -9395,7 +9395,7 @@ "post": { "tags": ["conversations"], "summary": "Compact Conversation", - "description": "Compact (summarize) a conversation's message history.\n\nThis endpoint summarizes the in-context messages for a specific conversation,\nreducing the message count while preserving important context.", + "description": "Compact (summarize) a conversation's message history.\n\nThis endpoint summarizes the in-context messages for a specific conversation,\nreducing the message count while preserving important context.\n\nIf conversation_id is an agent ID (starts with \"agent-\"), compacts the\nagent's default conversation messages.", "operationId": "compact_conversation", "parameters": [ { diff --git a/letta/server/rest_api/routers/v1/conversations.py b/letta/server/rest_api/routers/v1/conversations.py index 67a7b1a6..e4865a80 100644 --- a/letta/server/rest_api/routers/v1/conversations.py +++ b/letta/server/rest_api/routers/v1/conversations.py @@ -173,8 +173,27 @@ async def list_conversation_messages( Returns LettaMessage objects (UserMessage, AssistantMessage, etc.) for all messages in the conversation, with support for cursor-based pagination. + + If conversation_id is an agent ID (starts with "agent-"), returns messages + from the agent's default conversation (no conversation isolation). """ actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) + + # Agent-direct mode: list agent's default conversation messages + if conversation_id.startswith("agent-"): + return await server.get_agent_recall_async( + agent_id=conversation_id, + after=after, + before=before, + limit=limit, + group_id=group_id, + conversation_id=None, # Default conversation (no isolation) + reverse=(order == "desc"), + return_message_object=False, + include_err=include_err, + actor=actor, + ) + return await conversation_manager.list_conversation_messages( conversation_id=conversation_id, actor=actor, @@ -468,18 +487,32 @@ async def retrieve_conversation_stream( This endpoint allows you to reconnect to an active background stream for a conversation, enabling recovery from network interruptions. + + If conversation_id is an agent ID (starts with "agent-"), retrieves the + stream for the agent's most recent active run. """ actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) runs_manager = RunManager() - # Find the most recent active run for this conversation - active_runs = await runs_manager.list_runs( - actor=actor, - conversation_id=conversation_id, - statuses=[RunStatus.created, RunStatus.running], - limit=1, - ascending=False, - ) + # Find the most recent active run + if conversation_id.startswith("agent-"): + # Agent-direct mode: find runs by agent_id + active_runs = await runs_manager.list_runs( + actor=actor, + agent_id=conversation_id, + statuses=[RunStatus.created, RunStatus.running], + limit=1, + ascending=False, + ) + else: + # Normal mode: find runs by conversation_id + active_runs = await runs_manager.list_runs( + actor=actor, + conversation_id=conversation_id, + statuses=[RunStatus.created, RunStatus.running], + limit=1, + ascending=False, + ) if not active_runs: raise LettaInvalidArgumentError("No active runs found for this conversation.") @@ -542,26 +575,43 @@ async def cancel_conversation( Cancel runs associated with a conversation. Note: To cancel active runs, Redis is required. + + If conversation_id is an agent ID (starts with "agent-"), cancels runs + for the agent's default conversation. """ actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) if not settings.track_agent_run: raise HTTPException(status_code=400, detail="Agent run tracking is disabled") - # Verify conversation exists and get agent_id - conversation = await conversation_manager.get_conversation_by_id( - conversation_id=conversation_id, - actor=actor, - ) + # Agent-direct mode: use agent_id directly, skip conversation lookup + if conversation_id.startswith("agent-"): + agent_id = conversation_id + # Find active runs for this agent (default conversation has conversation_id=None) + runs = await server.run_manager.list_runs( + actor=actor, + agent_id=agent_id, + statuses=[RunStatus.created, RunStatus.running], + ascending=False, + limit=100, + ) + else: + # Verify conversation exists and get agent_id + conversation = await conversation_manager.get_conversation_by_id( + conversation_id=conversation_id, + actor=actor, + ) + agent_id = conversation.agent_id + + # Find active runs for this conversation + runs = await server.run_manager.list_runs( + actor=actor, + statuses=[RunStatus.created, RunStatus.running], + ascending=False, + conversation_id=conversation_id, + limit=100, + ) - # Find active runs for this conversation - runs = await server.run_manager.list_runs( - actor=actor, - statuses=[RunStatus.created, RunStatus.running], - ascending=False, - conversation_id=conversation_id, - limit=100, - ) run_ids = [run.id for run in runs] if not run_ids: @@ -578,7 +628,7 @@ async def cancel_conversation( except Exception as e: logger.error(f"Failed to cancel Lettuce run {run_id}: {e}") - await server.run_manager.cancel_run(actor=actor, agent_id=conversation.agent_id, run_id=run_id) + await server.run_manager.cancel_run(actor=actor, agent_id=agent_id, run_id=run_id) except Exception as e: results[run_id] = "failed" logger.error(f"Failed to cancel run {run_id}: {str(e)}") @@ -614,23 +664,36 @@ async def compact_conversation( This endpoint summarizes the in-context messages for a specific conversation, reducing the message count while preserving important context. + + If conversation_id is an agent ID (starts with "agent-"), compacts the + agent's default conversation messages. """ actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) - # Get the conversation to find the agent_id - conversation = await conversation_manager.get_conversation_by_id( - conversation_id=conversation_id, - actor=actor, - ) + # Agent-direct mode: compact agent's default conversation + if conversation_id.startswith("agent-"): + agent_id = conversation_id + agent = await server.agent_manager.get_agent_by_id_async(agent_id, actor, include_relationships=["multi_agent_group"]) + in_context_messages = await server.message_manager.get_messages_by_ids_async(message_ids=agent.message_ids, actor=actor) + agent_loop = LettaAgentV3(agent_state=agent, actor=actor) + else: + # Get the conversation to find the agent_id + conversation = await conversation_manager.get_conversation_by_id( + conversation_id=conversation_id, + actor=actor, + ) - # Get the agent state - agent = await server.agent_manager.get_agent_by_id_async(conversation.agent_id, actor, include_relationships=["multi_agent_group"]) + # Get the agent state + agent = await server.agent_manager.get_agent_by_id_async(conversation.agent_id, actor, include_relationships=["multi_agent_group"]) - # Get in-context messages for this conversation - in_context_messages = await conversation_manager.get_messages_for_conversation( - conversation_id=conversation_id, - actor=actor, - ) + # Get in-context messages for this conversation + in_context_messages = await conversation_manager.get_messages_for_conversation( + conversation_id=conversation_id, + actor=actor, + ) + + # Create agent loop with conversation context + agent_loop = LettaAgentV3(agent_state=agent, actor=actor, conversation_id=conversation_id) if not in_context_messages: raise HTTPException( @@ -638,9 +701,6 @@ async def compact_conversation( detail="No in-context messages found for this conversation.", ) - # Create agent loop with conversation context - agent_loop = LettaAgentV3(agent_state=agent, actor=actor, conversation_id=conversation_id) - compaction_settings = request.compaction_settings if request else None num_messages_before = len(in_context_messages) diff --git a/tests/integration_test_conversations_sdk.py b/tests/integration_test_conversations_sdk.py index a4405bfd..f1c7f47c 100644 --- a/tests/integration_test_conversations_sdk.py +++ b/tests/integration_test_conversations_sdk.py @@ -675,6 +675,56 @@ class TestConversationsSDK: ) assert len(messages) > 0, "Should be able to send message after concurrent requests complete" + def test_agent_direct_list_messages(self, client: Letta, agent): + """Test listing messages using agent ID as conversation_id.""" + # First send a message via agent-direct mode + list( + client.conversations.messages.create( + conversation_id=agent.id, + messages=[{"role": "user", "content": "Test message for listing"}], + ) + ) + + # List messages using agent ID + messages_page = client.conversations.messages.list(conversation_id=agent.id) + messages = list(messages_page) + + # Should have messages (at least system + user + assistant) + assert len(messages) >= 3, f"Expected at least 3 messages, got {len(messages)}" + + # Verify we can find our test message + user_messages = [m for m in messages if hasattr(m, "message_type") and m.message_type == "user_message"] + assert any("Test message for listing" in str(m.content) for m in user_messages), "Should find our test message" + + def test_agent_direct_cancel(self, client: Letta, agent): + """Test canceling runs using agent ID as conversation_id.""" + from letta.settings import settings + + # Skip if run tracking is disabled + if not settings.track_agent_run: + pytest.skip("Run tracking disabled - skipping cancel test") + + # Start a background request that we can cancel + try: + # Send a message in background mode + stream = client.conversations.messages.create( + conversation_id=agent.id, + messages=[{"role": "user", "content": "Background message to cancel"}], + background=True, + ) + # Consume a bit of the stream to ensure it started + next(iter(stream), None) + + # Cancel using agent ID + result = client.conversations.cancel(conversation_id=agent.id) + + # Should return results (may be empty if run already completed) + assert isinstance(result, dict), "Cancel should return a dict of results" + except Exception as e: + # If no active runs, that's okay - the run may have completed quickly + if "No active runs" not in str(e): + raise + class TestConversationDelete: """Tests for the conversation delete endpoint."""