From 39a537a9a58eb7fb33269c4b030fbb427a8209f2 Mon Sep 17 00:00:00 2001 From: cthomas Date: Thu, 26 Feb 2026 16:19:39 -0800 Subject: [PATCH] feat: add default convo support to conversations endpoint (#9706) * feat: add default convo support to conversations endpoint * api sync --- fern/openapi.json | 90 ++++++++------ .../rest_api/routers/v1/conversations.py | 117 ++++++++++++++++++ letta/services/streaming_service.py | 21 ++-- letta/validators.py | 31 +++-- tests/integration_test_conversations_sdk.py | 111 ++++++++++++++++- 5 files changed, 308 insertions(+), 62 deletions(-) diff --git a/fern/openapi.json b/fern/openapi.json index 1abe55a2..e4442c78 100644 --- a/fern/openapi.json +++ b/fern/openapi.json @@ -8856,16 +8856,17 @@ "schema": { "type": "string", "minLength": 1, - "maxLength": 41, - "pattern": "^(default|conv-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12})$", - "description": "The conversation identifier. Either the special value 'default' or an ID in the format 'conv-'", + "maxLength": 42, + "pattern": "^(default|conv-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}|agent-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12})$", + "description": "The conversation identifier. Can be a conversation ID ('conv-'), an agent ID ('agent-') for agent-direct messaging, or 'default'.", "examples": [ "default", - "conv-123e4567-e89b-42d3-8456-426614174000" + "conv-123e4567-e89b-42d3-8456-426614174000", + "agent-123e4567-e89b-42d3-8456-426614174000" ], "title": "Conversation Id" }, - "description": "The conversation identifier. Either the special value 'default' or an ID in the format 'conv-'" + "description": "The conversation identifier. Can be a conversation ID ('conv-'), an agent ID ('agent-') for agent-direct messaging, or 'default'." } ], "responses": { @@ -8904,16 +8905,17 @@ "schema": { "type": "string", "minLength": 1, - "maxLength": 41, - "pattern": "^(default|conv-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12})$", - "description": "The conversation identifier. Either the special value 'default' or an ID in the format 'conv-'", + "maxLength": 42, + "pattern": "^(default|conv-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}|agent-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12})$", + "description": "The conversation identifier. Can be a conversation ID ('conv-'), an agent ID ('agent-') for agent-direct messaging, or 'default'.", "examples": [ "default", - "conv-123e4567-e89b-42d3-8456-426614174000" + "conv-123e4567-e89b-42d3-8456-426614174000", + "agent-123e4567-e89b-42d3-8456-426614174000" ], "title": "Conversation Id" }, - "description": "The conversation identifier. Either the special value 'default' or an ID in the format 'conv-'" + "description": "The conversation identifier. Can be a conversation ID ('conv-'), an agent ID ('agent-') for agent-direct messaging, or 'default'." } ], "requestBody": { @@ -8962,16 +8964,17 @@ "schema": { "type": "string", "minLength": 1, - "maxLength": 41, - "pattern": "^(default|conv-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12})$", - "description": "The conversation identifier. Either the special value 'default' or an ID in the format 'conv-'", + "maxLength": 42, + "pattern": "^(default|conv-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}|agent-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12})$", + "description": "The conversation identifier. Can be a conversation ID ('conv-'), an agent ID ('agent-') for agent-direct messaging, or 'default'.", "examples": [ "default", - "conv-123e4567-e89b-42d3-8456-426614174000" + "conv-123e4567-e89b-42d3-8456-426614174000", + "agent-123e4567-e89b-42d3-8456-426614174000" ], "title": "Conversation Id" }, - "description": "The conversation identifier. Either the special value 'default' or an ID in the format 'conv-'" + "description": "The conversation identifier. Can be a conversation ID ('conv-'), an agent ID ('agent-') for agent-direct messaging, or 'default'." } ], "responses": { @@ -9010,16 +9013,17 @@ "schema": { "type": "string", "minLength": 1, - "maxLength": 41, - "pattern": "^(default|conv-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12})$", - "description": "The conversation identifier. Either the special value 'default' or an ID in the format 'conv-'", + "maxLength": 42, + "pattern": "^(default|conv-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}|agent-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12})$", + "description": "The conversation identifier. Can be a conversation ID ('conv-'), an agent ID ('agent-') for agent-direct messaging, or 'default'.", "examples": [ "default", - "conv-123e4567-e89b-42d3-8456-426614174000" + "conv-123e4567-e89b-42d3-8456-426614174000", + "agent-123e4567-e89b-42d3-8456-426614174000" ], "title": "Conversation Id" }, - "description": "The conversation identifier. Either the special value 'default' or an ID in the format 'conv-'" + "description": "The conversation identifier. Can be a conversation ID ('conv-'), an agent ID ('agent-') for agent-direct messaging, or 'default'." }, { "name": "before", @@ -9169,7 +9173,7 @@ "post": { "tags": ["conversations"], "summary": "Send Conversation Message", - "description": "Send a message to a conversation and get a response.\n\nThis endpoint sends a message to an existing conversation.\nBy default (streaming=true), returns a streaming response (Server-Sent Events).\nSet streaming=false to get a complete JSON response.", + "description": "Send a message to a conversation and get a response.\n\nThis endpoint sends a message to an existing conversation.\nBy default (streaming=true), returns a streaming response (Server-Sent Events).\nSet streaming=false to get a complete JSON response.\n\nIf conversation_id is an agent ID (starts with \"agent-\"), routes to agent-direct\nmode with locking but without conversation-specific features.", "operationId": "send_conversation_message", "parameters": [ { @@ -9179,16 +9183,17 @@ "schema": { "type": "string", "minLength": 1, - "maxLength": 41, - "pattern": "^(default|conv-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12})$", - "description": "The conversation identifier. Either the special value 'default' or an ID in the format 'conv-'", + "maxLength": 42, + "pattern": "^(default|conv-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}|agent-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12})$", + "description": "The conversation identifier. Can be a conversation ID ('conv-'), an agent ID ('agent-') for agent-direct messaging, or 'default'.", "examples": [ "default", - "conv-123e4567-e89b-42d3-8456-426614174000" + "conv-123e4567-e89b-42d3-8456-426614174000", + "agent-123e4567-e89b-42d3-8456-426614174000" ], "title": "Conversation Id" }, - "description": "The conversation identifier. Either the special value 'default' or an ID in the format 'conv-'" + "description": "The conversation identifier. Can be a conversation ID ('conv-'), an agent ID ('agent-') for agent-direct messaging, or 'default'." } ], "requestBody": { @@ -9243,16 +9248,17 @@ "schema": { "type": "string", "minLength": 1, - "maxLength": 41, - "pattern": "^(default|conv-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12})$", - "description": "The conversation identifier. Either the special value 'default' or an ID in the format 'conv-'", + "maxLength": 42, + "pattern": "^(default|conv-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}|agent-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12})$", + "description": "The conversation identifier. Can be a conversation ID ('conv-'), an agent ID ('agent-') for agent-direct messaging, or 'default'.", "examples": [ "default", - "conv-123e4567-e89b-42d3-8456-426614174000" + "conv-123e4567-e89b-42d3-8456-426614174000", + "agent-123e4567-e89b-42d3-8456-426614174000" ], "title": "Conversation Id" }, - "description": "The conversation identifier. Either the special value 'default' or an ID in the format 'conv-'" + "description": "The conversation identifier. Can be a conversation ID ('conv-'), an agent ID ('agent-') for agent-direct messaging, or 'default'." } ], "requestBody": { @@ -9346,16 +9352,17 @@ "schema": { "type": "string", "minLength": 1, - "maxLength": 41, - "pattern": "^(default|conv-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12})$", - "description": "The conversation identifier. Either the special value 'default' or an ID in the format 'conv-'", + "maxLength": 42, + "pattern": "^(default|conv-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}|agent-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12})$", + "description": "The conversation identifier. Can be a conversation ID ('conv-'), an agent ID ('agent-') for agent-direct messaging, or 'default'.", "examples": [ "default", - "conv-123e4567-e89b-42d3-8456-426614174000" + "conv-123e4567-e89b-42d3-8456-426614174000", + "agent-123e4567-e89b-42d3-8456-426614174000" ], "title": "Conversation Id" }, - "description": "The conversation identifier. Either the special value 'default' or an ID in the format 'conv-'" + "description": "The conversation identifier. Can be a conversation ID ('conv-'), an agent ID ('agent-') for agent-direct messaging, or 'default'." } ], "responses": { @@ -9398,16 +9405,17 @@ "schema": { "type": "string", "minLength": 1, - "maxLength": 41, - "pattern": "^(default|conv-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12})$", - "description": "The conversation identifier. Either the special value 'default' or an ID in the format 'conv-'", + "maxLength": 42, + "pattern": "^(default|conv-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}|agent-[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12})$", + "description": "The conversation identifier. Can be a conversation ID ('conv-'), an agent ID ('agent-') for agent-direct messaging, or 'default'.", "examples": [ "default", - "conv-123e4567-e89b-42d3-8456-426614174000" + "conv-123e4567-e89b-42d3-8456-426614174000", + "agent-123e4567-e89b-42d3-8456-426614174000" ], "title": "Conversation Id" }, - "description": "The conversation identifier. Either the special value 'default' or an ID in the format 'conv-'" + "description": "The conversation identifier. Can be a conversation ID ('conv-'), an agent ID ('agent-') for agent-direct messaging, or 'default'." } ], "requestBody": { diff --git a/letta/server/rest_api/routers/v1/conversations.py b/letta/server/rest_api/routers/v1/conversations.py index 141fba52..67a7b1a6 100644 --- a/letta/server/rest_api/routers/v1/conversations.py +++ b/letta/server/rest_api/routers/v1/conversations.py @@ -1,5 +1,6 @@ from datetime import timedelta from typing import Annotated, List, Literal, Optional +from uuid import uuid4 from fastapi import APIRouter, Body, Depends, HTTPException, Query, status from pydantic import BaseModel, Field @@ -186,6 +187,105 @@ async def list_conversation_messages( ) +async def _send_agent_direct_message( + agent_id: str, + request: ConversationMessageRequest, + server: SyncServer, + actor, +) -> StreamingResponse | LettaResponse: + """ + Handle agent-direct messaging with locking but without conversation features. + + This is used when the conversation_id in the URL is actually an agent ID, + providing a unified endpoint while maintaining agent-level locking. + """ + redis_client = await get_redis_client() + + # Streaming mode (default) + if request.streaming: + streaming_request = LettaStreamingRequest( + messages=request.messages, + streaming=True, + stream_tokens=request.stream_tokens, + include_pings=request.include_pings, + background=request.background, + max_steps=request.max_steps, + use_assistant_message=request.use_assistant_message, + assistant_message_tool_name=request.assistant_message_tool_name, + assistant_message_tool_kwarg=request.assistant_message_tool_kwarg, + include_return_message_types=request.include_return_message_types, + override_model=request.override_model, + client_tools=request.client_tools, + ) + streaming_service = StreamingService(server) + run, result = await streaming_service.create_agent_stream( + agent_id=agent_id, + actor=actor, + request=streaming_request, + run_type="send_message", + conversation_id=None, + should_lock=True, + ) + return result + + # Non-streaming mode with locking + agent = await server.agent_manager.get_agent_by_id_async( + agent_id, + actor, + include_relationships=["memory", "multi_agent_group", "sources", "tool_exec_environment_variables", "tools", "tags"], + ) + + # Handle model override if specified in the request + if request.override_model: + override_llm_config = await server.get_llm_config_from_handle_async( + actor=actor, + handle=request.override_model, + ) + agent = agent.model_copy(update={"llm_config": override_llm_config}) + + # Acquire lock using agent_id as lock key + if not isinstance(redis_client, NoopAsyncRedisClient): + await redis_client.acquire_conversation_lock( + conversation_id=agent_id, + token=str(uuid4()), + ) + + try: + # Create a run for execution tracking + run = None + if settings.track_agent_run: + runs_manager = RunManager() + run = await runs_manager.create_run( + pydantic_run=PydanticRun( + agent_id=agent_id, + background=False, + metadata={ + "run_type": "send_message", + }, + request_config=LettaRequestConfig.from_letta_request(request), + ), + actor=actor, + ) + + # Set run_id in Redis for cancellation support + await redis_client.set(f"{REDIS_RUN_ID_PREFIX}:{agent_id}", run.id if run else None) + + agent_loop = AgentLoop.load(agent_state=agent, actor=actor) + return await agent_loop.step( + request.messages, + max_steps=request.max_steps, + run_id=run.id if run else None, + use_assistant_message=request.use_assistant_message, + include_return_message_types=request.include_return_message_types, + client_tools=request.client_tools, + conversation_id=None, + include_compaction_messages=request.include_compaction_messages, + ) + finally: + # Release lock + await redis_client.release_conversation_lock(agent_id) + + @router.post( "/{conversation_id}/messages", response_model=LettaResponse, @@ -212,12 +312,29 @@ async def send_conversation_message( This endpoint sends a message to an existing conversation. By default (streaming=true), returns a streaming response (Server-Sent Events). Set streaming=false to get a complete JSON response. + + If conversation_id is an agent ID (starts with "agent-"), routes to agent-direct + mode with locking but without conversation-specific features. """ actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) if not request.messages or len(request.messages) == 0: raise HTTPException(status_code=422, detail="Messages must not be empty") + # Detect agent-direct mode: conversation_id is actually an agent ID + is_agent_direct = conversation_id.startswith("agent-") + + if is_agent_direct: + # Agent-direct mode: use agent ID, enable locking, skip conversation features + agent_id = conversation_id + return await _send_agent_direct_message( + agent_id=agent_id, + request=request, + server=server, + actor=actor, + ) + + # Normal conversation mode conversation = await conversation_manager.get_conversation_by_id( conversation_id=conversation_id, actor=actor, diff --git a/letta/services/streaming_service.py b/letta/services/streaming_service.py index 22b9e888..64fdd346 100644 --- a/letta/services/streaming_service.py +++ b/letta/services/streaming_service.py @@ -77,6 +77,7 @@ class StreamingService: request: LettaStreamingRequest, run_type: str = "streaming", conversation_id: Optional[str] = None, + should_lock: bool = False, ) -> tuple[Optional[PydanticRun], Union[StreamingResponse, LettaResponse]]: """ Create a streaming response for an agent. @@ -87,6 +88,7 @@ class StreamingService: request: The LettaStreamingRequest containing all request parameters run_type: Type of run for tracking conversation_id: Optional conversation ID for conversation-scoped messaging + should_lock: If True and conversation_id is None, use agent_id as lock key Returns: Tuple of (run object or None, streaming response) @@ -131,12 +133,15 @@ class StreamingService: model_compatible_token_streaming = self._is_token_streaming_compatible(agent) - # Attempt to acquire conversation lock if conversation_id is provided - # This prevents concurrent message processing for the same conversation + # Determine lock key: use conversation_id if provided, else agent_id if should_lock + lock_key = conversation_id if conversation_id else (agent_id if should_lock else None) + + # Attempt to acquire lock if lock_key is set + # This prevents concurrent message processing for the same conversation/agent # Skip locking if Redis is not available (graceful degradation) - if conversation_id and not isinstance(redis_client, NoopAsyncRedisClient): + if lock_key and not isinstance(redis_client, NoopAsyncRedisClient): await redis_client.acquire_conversation_lock( - conversation_id=conversation_id, + conversation_id=lock_key, token=str(uuid4()), ) @@ -164,6 +169,7 @@ class StreamingService: include_return_message_types=request.include_return_message_types, actor=actor, conversation_id=conversation_id, + lock_key=lock_key, # For lock release (may differ from conversation_id) client_tools=request.client_tools, include_compaction_messages=request.include_compaction_messages, ) @@ -196,7 +202,7 @@ class StreamingService: run_id=run.id, run_manager=self.server.run_manager, actor=actor, - conversation_id=conversation_id, + conversation_id=lock_key, # Use lock_key for lock release ), label=f"background_stream_processor_{run.id}", ) @@ -252,7 +258,7 @@ class StreamingService: if settings.track_agent_run and run and run_status: await self.server.run_manager.update_run_by_id_async( run_id=run.id, - conversation_id=conversation_id, + conversation_id=lock_key, # Use lock_key for lock release update=RunUpdate(status=run_status, metadata=run_update_metadata), actor=actor, ) @@ -327,6 +333,7 @@ class StreamingService: include_return_message_types: Optional[list[MessageType]], actor: User, conversation_id: Optional[str] = None, + lock_key: Optional[str] = None, client_tools: Optional[list[ClientToolSchema]] = None, include_compaction_messages: bool = False, ) -> AsyncIterator: @@ -507,7 +514,7 @@ class StreamingService: stop_reason_value = stop_reason.stop_reason if stop_reason else StopReasonType.error.value await self.runs_manager.update_run_by_id_async( run_id=run_id, - conversation_id=conversation_id, + conversation_id=lock_key, # Use lock_key for lock release update=RunUpdate(status=run_status, stop_reason=stop_reason_value, metadata=error_data), actor=actor, ) diff --git a/letta/validators.py b/letta/validators.py index 4e8552c5..a6fa3f7e 100644 --- a/letta/validators.py +++ b/letta/validators.py @@ -45,27 +45,36 @@ PATH_VALIDATORS = {primitive_type.value: _create_path_validator_factory(primitiv def _create_conversation_id_or_default_path_validator_factory(): - """Conversation IDs accept the usual primitive format or the special value 'default'.""" + """Conversation IDs accept the usual primitive format, 'default', or an agent ID.""" - primitive = PrimitiveType.CONVERSATION.value - prefix_pattern = PRIMITIVE_ID_PATTERNS[primitive].pattern - # Make the full regex accept either the primitive ID format or 'default'. - # `prefix_pattern` already contains the ^...$ anchors. - conversation_or_default_pattern = f"^(default|{prefix_pattern[1:-1]})$" + conversation_primitive = PrimitiveType.CONVERSATION.value + agent_primitive = PrimitiveType.AGENT.value + conversation_pattern = PRIMITIVE_ID_PATTERNS[conversation_primitive].pattern + agent_pattern = PRIMITIVE_ID_PATTERNS[agent_primitive].pattern + # Make the full regex accept: conversation ID, agent ID, or 'default'. + # Patterns already contain ^...$ anchors, so strip them for the alternation. + conversation_or_agent_or_default_pattern = f"^(default|{conversation_pattern[1:-1]}|{agent_pattern[1:-1]})$" def factory(): return Path( - description=(f"The conversation identifier. Either the special value 'default' or an ID in the format '{primitive}-'"), - pattern=conversation_or_default_pattern, - examples=["default", f"{primitive}-123e4567-e89b-42d3-8456-426614174000"], + description=( + f"The conversation identifier. Can be a conversation ID ('{conversation_primitive}-'), " + f"an agent ID ('{agent_primitive}-') for agent-direct messaging, or 'default'." + ), + pattern=conversation_or_agent_or_default_pattern, + examples=[ + "default", + f"{conversation_primitive}-123e4567-e89b-42d3-8456-426614174000", + f"{agent_primitive}-123e4567-e89b-42d3-8456-426614174000", + ], min_length=1, - max_length=len(primitive) + 1 + 36, + max_length=max(len(conversation_primitive), len(agent_primitive)) + 1 + 36, ) return factory -# Override conversation ID path validation to also allow the special value 'default'. +# Override conversation ID path validation to also allow 'default' and agent IDs. PATH_VALIDATORS[PrimitiveType.CONVERSATION.value] = _create_conversation_id_or_default_path_validator_factory() diff --git a/tests/integration_test_conversations_sdk.py b/tests/integration_test_conversations_sdk.py index 45462d71..a4405bfd 100644 --- a/tests/integration_test_conversations_sdk.py +++ b/tests/integration_test_conversations_sdk.py @@ -568,6 +568,113 @@ class TestConversationsSDK: # Should not contain the cursor message assert first_message_id not in [m.id for m in messages_after] + def test_agent_direct_messaging_via_conversations_endpoint(self, client: Letta, agent): + """Test sending messages using agent ID as conversation_id (agent-direct mode). + + This allows clients to use a unified endpoint pattern without managing conversation IDs. + """ + # Send a message using the agent ID directly as conversation_id + # This should route to agent-direct mode with locking + messages = list( + client.conversations.messages.create( + conversation_id=agent.id, # Using agent ID instead of conversation ID + messages=[{"role": "user", "content": "Hello via agent-direct mode!"}], + ) + ) + + # Verify we got a response + assert len(messages) > 0, "Should receive response messages" + + # Verify we got an assistant message in the response + assistant_messages = [m for m in messages if hasattr(m, "message_type") and m.message_type == "assistant_message"] + assert len(assistant_messages) > 0, "Should receive at least one assistant message" + + def test_agent_direct_messaging_with_locking(self, client: Letta, agent): + """Test that agent-direct mode properly acquires and releases locks. + + Sequential requests should both succeed if locks are properly released. + """ + from letta.settings import settings + + # Skip if Redis is not configured + if settings.redis_host is None or settings.redis_port is None: + pytest.skip("Redis not configured - skipping agent-direct lock test") + + # Send first message via agent-direct mode + messages1 = list( + client.conversations.messages.create( + conversation_id=agent.id, + messages=[{"role": "user", "content": "First message"}], + ) + ) + assert len(messages1) > 0, "First message should succeed" + + # Send second message - should succeed if lock was released + messages2 = list( + client.conversations.messages.create( + conversation_id=agent.id, + messages=[{"role": "user", "content": "Second message"}], + ) + ) + assert len(messages2) > 0, "Second message should succeed after lock released" + + def test_agent_direct_concurrent_requests_blocked(self, client: Letta, agent): + """Test that concurrent requests to agent-direct mode are properly serialized. + + One request should succeed and one should get a 409 CONVERSATION_BUSY error. + """ + import concurrent.futures + + from letta_client import ConflictError + + from letta.settings import settings + + # Skip if Redis is not configured + if settings.redis_host is None or settings.redis_port is None: + pytest.skip("Redis not configured - skipping agent-direct lock test") + + results = {"success": 0, "conflict": 0, "other_error": 0} + + def send_message(msg: str): + try: + messages = list( + client.conversations.messages.create( + conversation_id=agent.id, # Agent-direct mode + messages=[{"role": "user", "content": msg}], + ) + ) + return ("success", messages) + except ConflictError: + return ("conflict", None) + except Exception as e: + return ("other_error", str(e)) + + # Fire off two messages concurrently + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + future1 = executor.submit(send_message, "Concurrent message 1") + future2 = executor.submit(send_message, "Concurrent message 2") + + result1 = future1.result() + result2 = future2.result() + + # Count results + for result_type, _ in [result1, result2]: + results[result_type] += 1 + + # One should succeed and one should get conflict + assert results["success"] == 1, f"Expected 1 success, got {results['success']}" + assert results["conflict"] == 1, f"Expected 1 conflict, got {results['conflict']}" + assert results["other_error"] == 0, f"Unexpected errors: {results['other_error']}" + + # Now send another message - should succeed since lock is released + messages = list( + client.conversations.messages.create( + conversation_id=agent.id, + messages=[{"role": "user", "content": "Message after concurrent requests"}], + ) + ) + assert len(messages) > 0, "Should be able to send message after concurrent requests complete" + class TestConversationDelete: """Tests for the conversation delete endpoint.""" @@ -902,9 +1009,7 @@ class TestConversationSystemMessageRecompilation: order="asc", ) old_system_content = conv1_messages_after_update[0].content - assert unique_marker not in old_system_content, ( - "Old conversation system message should NOT contain the updated memory value" - ) + assert unique_marker not in old_system_content, "Old conversation system message should NOT contain the updated memory value" # Step 4: Create a new conversation conv2 = client.conversations.create(agent_id=agent.id)