From 6d859174c2654dedcf1c7556dc98c8a0376c914b Mon Sep 17 00:00:00 2001 From: Ari Webb Date: Fri, 9 Jan 2026 17:34:01 -0800 Subject: [PATCH] feat: make conversations throw http busy to stop race condition [LET-6842] (#8411) * feat: make conversations throw http busy to stop race condition * use redis lock instead * move acquire lock into redis client, integration tests, move lock release into run manager * fix tests, bug * conditional import * remove else * better release * run ci * final reordering lock * update tests * wrong naming of lock holder token --- letta/constants.py | 4 + letta/data_sources/redis_client.py | 71 +++++++++- letta/errors.py | 16 +++ letta/server/rest_api/app.py | 2 + letta/server/rest_api/redis_stream_manager.py | 10 +- letta/services/run_manager.py | 21 ++- letta/services/streaming_service.py | 21 ++- tests/integration_test_conversations_sdk.py | 127 ++++++++++++++++++ 8 files changed, 265 insertions(+), 7 deletions(-) diff --git a/letta/constants.py b/letta/constants.py index 48e160a7..2eb39d09 100644 --- a/letta/constants.py +++ b/letta/constants.py @@ -452,6 +452,10 @@ REDIS_SET_DEFAULT_VAL = "None" REDIS_DEFAULT_CACHE_PREFIX = "letta_cache" REDIS_RUN_ID_PREFIX = "agent:send_message:run_id" +# Conversation lock constants +CONVERSATION_LOCK_PREFIX = "conversation:lock:" +CONVERSATION_LOCK_TTL_SECONDS = 300 # 5 minutes + # TODO: This is temporary, eventually use token-based eviction # File based controls DEFAULT_MAX_FILES_OPEN = 5 diff --git a/letta/data_sources/redis_client.py b/letta/data_sources/redis_client.py index be149ab2..8c4f5fec 100644 --- a/letta/data_sources/redis_client.py +++ b/letta/data_sources/redis_client.py @@ -2,17 +2,20 @@ import asyncio from functools import wraps from typing import Any, Dict, List, Optional, Set, Union -from letta.constants import REDIS_EXCLUDE, REDIS_INCLUDE, REDIS_SET_DEFAULT_VAL +from letta.constants import CONVERSATION_LOCK_PREFIX, CONVERSATION_LOCK_TTL_SECONDS, REDIS_EXCLUDE, REDIS_INCLUDE, REDIS_SET_DEFAULT_VAL +from letta.errors import ConversationBusyError from letta.log import get_logger from letta.settings import settings try: from redis import RedisError from redis.asyncio import ConnectionPool, Redis + from redis.asyncio.lock import Lock except ImportError: RedisError = None Redis = None ConnectionPool = None + Lock = None logger = get_logger(__name__) @@ -171,6 +174,62 @@ class AsyncRedisClient: client = await self.get_client() return await client.delete(*keys) + async def acquire_conversation_lock( + self, + conversation_id: str, + token: str, + ) -> Optional["Lock"]: + """ + Acquire a distributed lock for a conversation. + + Args: + conversation_id: The ID for the conversation + token: Unique identifier for the lock holder (for debugging/tracing) + + Returns: + Lock object if acquired, raises ConversationBusyError if in use + """ + if Lock is None: + return None + client = await self.get_client() + lock_key = f"{CONVERSATION_LOCK_PREFIX}{conversation_id}" + lock = Lock( + client, + lock_key, + timeout=CONVERSATION_LOCK_TTL_SECONDS, + blocking=False, + thread_local=False, # We manage token explicitly + raise_on_release_error=False, # We handle release errors ourselves + ) + + if await lock.acquire(token=token): + return lock + + lock_holder_token = await client.get(lock_key) + raise ConversationBusyError( + conversation_id=conversation_id, + lock_holder_token=lock_holder_token, + ) + + async def release_conversation_lock(self, conversation_id: str) -> bool: + """ + Release a conversation lock by conversation_id. + + Args: + conversation_id: The conversation ID to release the lock for + + Returns: + True if lock was released, False if release failed + """ + try: + client = await self.get_client() + lock_key = f"{CONVERSATION_LOCK_PREFIX}{conversation_id}" + await client.delete(lock_key) + return True + except Exception as e: + logger.warning(f"Failed to release conversation lock for conversation {conversation_id}: {e}") + return False + @with_retry() async def exists(self, *keys: str) -> int: """Check if keys exist.""" @@ -395,6 +454,16 @@ class NoopAsyncRedisClient(AsyncRedisClient): async def delete(self, *keys: str) -> int: return 0 + async def acquire_conversation_lock( + self, + conversation_id: str, + token: str, + ) -> Optional["Lock"]: + return None + + async def release_conversation_lock(self, conversation_id: str) -> bool: + return False + async def check_inclusion_and_exclusion(self, member: str, group: str) -> bool: return False diff --git a/letta/errors.py b/letta/errors.py index edc23d84..22fea4f6 100644 --- a/letta/errors.py +++ b/letta/errors.py @@ -73,6 +73,22 @@ class ConcurrentUpdateError(LettaError): super().__init__(message=message, code=ErrorCode.CONFLICT, details=details) +class ConversationBusyError(LettaError): + """Error raised when attempting to send a message while another request is already processing for the same conversation.""" + + def __init__(self, conversation_id: str, lock_holder_token: Optional[str] = None): + self.conversation_id = conversation_id + self.lock_holder_token = lock_holder_token + message = "Cannot send a new message: Another request is currently being processed for this conversation. Please wait for the current request to complete." + code = ErrorCode.CONFLICT + details = { + "error_code": "CONVERSATION_BUSY", + "conversation_id": conversation_id, + "lock_holder_token": lock_holder_token, + } + super().__init__(message=message, code=code, details=details) + + class LettaToolCreateError(LettaError): """Error raised when a tool cannot be created.""" diff --git a/letta/server/rest_api/app.py b/letta/server/rest_api/app.py index b0612472..a2406e11 100644 --- a/letta/server/rest_api/app.py +++ b/letta/server/rest_api/app.py @@ -33,6 +33,7 @@ from letta.errors import ( AgentNotFoundForExportError, BedrockPermissionError, ConcurrentUpdateError, + ConversationBusyError, EmbeddingConfigRequiredError, HandleNotFoundError, LettaAgentNotFoundError, @@ -496,6 +497,7 @@ def create_application() -> "FastAPI": app.add_exception_handler(UniqueConstraintViolationError, _error_handler_409) app.add_exception_handler(IntegrityError, _error_handler_409) app.add_exception_handler(ConcurrentUpdateError, _error_handler_409) + app.add_exception_handler(ConversationBusyError, _error_handler_409) app.add_exception_handler(PendingApprovalError, _error_handler_409) app.add_exception_handler(NoActiveRunsToCancelError, _error_handler_409) diff --git a/letta/server/rest_api/redis_stream_manager.py b/letta/server/rest_api/redis_stream_manager.py index 9fae3710..0b56c4c6 100644 --- a/letta/server/rest_api/redis_stream_manager.py +++ b/letta/server/rest_api/redis_stream_manager.py @@ -202,6 +202,7 @@ async def create_background_stream_processor( writer: Optional[RedisSSEStreamWriter] = None, run_manager: Optional[RunManager] = None, actor: Optional[User] = None, + conversation_id: Optional[str] = None, ) -> None: """ Process a stream in the background and store chunks to Redis. @@ -216,6 +217,7 @@ async def create_background_stream_processor( writer: Optional pre-configured writer (creates new if not provided) run_manager: Optional run manager for updating run status actor: Optional actor for run status updates + conversation_id: Optional conversation ID for releasing lock on terminal states """ stop_reason = None saw_done = False @@ -342,6 +344,7 @@ async def create_background_stream_processor( run_id=run_id, update=RunUpdate(status=RunStatus.failed, stop_reason=StopReasonType.error.value, metadata={"error": str(e)}), actor=actor, + conversation_id=conversation_id, ) finally: if should_stop_writer: @@ -384,7 +387,12 @@ async def create_background_stream_processor( if run_status == RunStatus.failed and error_metadata is not None: update_kwargs["metadata"] = error_metadata - await run_manager.update_run_by_id_async(run_id=run_id, update=RunUpdate(**update_kwargs), actor=actor) + await run_manager.update_run_by_id_async( + run_id=run_id, + update=RunUpdate(**update_kwargs), + actor=actor, + conversation_id=conversation_id, + ) # Belt-and-suspenders: always append a terminal [DONE] chunk to ensure clients terminate # Even if a previous chunk set `complete`, an extra [DONE] is harmless and ensures SDKs that diff --git a/letta/services/run_manager.py b/letta/services/run_manager.py index bea5ca3f..d600b525 100644 --- a/letta/services/run_manager.py +++ b/letta/services/run_manager.py @@ -5,6 +5,7 @@ from typing import List, Literal, Optional from httpx import AsyncClient +from letta.data_sources.redis_client import get_redis_client from letta.errors import LettaInvalidArgumentError from letta.helpers.datetime_helpers import get_utc_time from letta.log import get_logger @@ -318,7 +319,12 @@ class RunManager: @raise_on_invalid_id(param_name="run_id", expected_prefix=PrimitiveType.RUN) @trace_method async def update_run_by_id_async( - self, run_id: str, update: RunUpdate, actor: PydanticUser, refresh_result_messages: bool = True + self, + run_id: str, + update: RunUpdate, + actor: PydanticUser, + refresh_result_messages: bool = True, + conversation_id: Optional[str] = None, ) -> PydanticRun: """Update a run using a RunUpdate object.""" async with db_registry.async_session() as session: @@ -382,6 +388,14 @@ class RunManager: # context manager now handles commits # await session.commit() + # Release conversation lock if conversation_id was provided + if is_terminal_update and conversation_id: + try: + redis_client = await get_redis_client() + await redis_client.release_conversation_lock(conversation_id) + except Exception as lock_error: + logger.warning(f"Failed to release conversation lock for conversation {conversation_id}: {lock_error}") + # Update agent's last_stop_reason when run completes # Do this after run update is committed to database if is_terminal_update and update.stop_reason: @@ -639,7 +653,10 @@ class RunManager: # cancel the run # NOTE: this should update the agent's last stop reason to cancelled run = await self.update_run_by_id_async( - run_id=run_id, update=RunUpdate(status=RunStatus.cancelled, stop_reason=StopReasonType.cancelled), actor=actor + run_id=run_id, + update=RunUpdate(status=RunStatus.cancelled, stop_reason=StopReasonType.cancelled), + actor=actor, + conversation_id=run.conversation_id, ) # cleanup the agent's state diff --git a/letta/services/streaming_service.py b/letta/services/streaming_service.py index 41593e3e..826ec6c0 100644 --- a/letta/services/streaming_service.py +++ b/letta/services/streaming_service.py @@ -102,14 +102,24 @@ class StreamingService: model_compatible = self._is_model_compatible(agent) model_compatible_token_streaming = self._is_token_streaming_compatible(agent) + # Attempt to acquire conversation lock if conversation_id is provided + # This prevents concurrent message processing for the same conversation + # Skip locking if Redis is not available (graceful degradation) + if conversation_id and not isinstance(redis_client, NoopAsyncRedisClient): + await redis_client.acquire_conversation_lock( + conversation_id=conversation_id, + token=str(uuid4()), + ) + # create run if tracking is enabled run = None run_update_metadata = None - if settings.track_agent_run: - run = await self._create_run(agent_id, request, run_type, actor, conversation_id=conversation_id) - await redis_client.set(f"{REDIS_RUN_ID_PREFIX}:{agent_id}", run.id if run else None) try: + if settings.track_agent_run: + run = await self._create_run(agent_id, request, run_type, actor, conversation_id=conversation_id) + await redis_client.set(f"{REDIS_RUN_ID_PREFIX}:{agent_id}", run.id if run else None) + if agent_eligible and model_compatible: # use agent loop for streaming agent_loop = AgentLoop.load(agent_state=agent, actor=actor) @@ -156,6 +166,7 @@ class StreamingService: run_id=run.id, run_manager=self.server.run_manager, actor=actor, + conversation_id=conversation_id, ), label=f"background_stream_processor_{run.id}", ) @@ -218,6 +229,7 @@ class StreamingService: if settings.track_agent_run and run: await self.server.run_manager.update_run_by_id_async( run_id=run.id, + conversation_id=conversation_id, update=RunUpdate(status=run_status, metadata=run_update_metadata), actor=actor, ) @@ -446,6 +458,7 @@ class StreamingService: stop_reason_value = stop_reason.stop_reason if stop_reason else StopReasonType.error.value await self.runs_manager.update_run_by_id_async( run_id=run_id, + conversation_id=conversation_id, update=RunUpdate(status=run_status, stop_reason=stop_reason_value, metadata=error_data), actor=actor, ) @@ -507,6 +520,7 @@ class StreamingService: actor: User, error: Optional[str] = None, stop_reason: Optional[str] = None, + conversation_id: Optional[str] = None, ): """Update the status of a run.""" if not self.runs_manager: @@ -522,6 +536,7 @@ class StreamingService: run_id=run_id, update=update, actor=actor, + conversation_id=conversation_id, ) diff --git a/tests/integration_test_conversations_sdk.py b/tests/integration_test_conversations_sdk.py index c512880f..6284b705 100644 --- a/tests/integration_test_conversations_sdk.py +++ b/tests/integration_test_conversations_sdk.py @@ -3,6 +3,7 @@ Integration tests for the Conversations API using the SDK. """ import uuid +from time import sleep import pytest from letta_client import Letta @@ -269,3 +270,129 @@ class TestConversationsSDK: # Should return 400 because no active run exists (run is completed) assert "No active runs found" in str(exc_info.value) + + def test_conversation_lock_released_after_completion(self, client: Letta, agent): + """Test that lock is released after request completes by sending sequential messages.""" + from letta.settings import settings + + # Skip if Redis is not configured + if settings.redis_host is None or settings.redis_port is None: + pytest.skip("Redis not configured - skipping conversation lock test") + + conversation = client.conversations.create(agent_id=agent.id) + + # Send first message (should acquire and release lock) + messages1 = list( + client.conversations.messages.create( + conversation_id=conversation.id, + messages=[{"role": "user", "content": "Hello"}], + ) + ) + assert len(messages1) > 0 + + # Send second message - should succeed if lock was released + messages2 = list( + client.conversations.messages.create( + conversation_id=conversation.id, + messages=[{"role": "user", "content": "Hello again"}], + ) + ) + assert len(messages2) > 0 + + def test_conversation_lock_released_on_error(self, client: Letta, agent): + """Test that lock is released even when the run encounters an error. + + This test sends a message that triggers an error during streaming (by causing + a context window exceeded error with a very long message), then verifies the + lock is properly released by successfully sending another message. + """ + from letta.settings import settings + + # Skip if Redis is not configured + if settings.redis_host is None or settings.redis_port is None: + pytest.skip("Redis not configured - skipping conversation lock test") + + conversation = client.conversations.create(agent_id=agent.id) + + # Try to send a message that will cause an error during processing + # We use an extremely long message to trigger a context window error + very_long_message = "Hello " * 100000 # Very long message to exceed context window + + try: + list( + client.conversations.messages.create( + conversation_id=conversation.id, + messages=[{"role": "user", "content": very_long_message}], + ) + ) + except Exception: + pass # Expected to fail due to context window exceeded + + # Send another message - should succeed if lock was released after error + messages = list( + client.conversations.messages.create( + conversation_id=conversation.id, + messages=[{"role": "user", "content": "Hello after error"}], + ) + ) + assert len(messages) > 0, "Lock should be released even after run error" + + def test_concurrent_messages_to_same_conversation(self, client: Letta, agent): + """Test that concurrent messages to the same conversation are properly serialized. + + One request should succeed and one should get a 409 CONVERSATION_BUSY error. + After both return, a subsequent message should succeed. + """ + import concurrent.futures + + from letta_client import ConflictError + + from letta.settings import settings + + # Skip if Redis is not configured + if settings.redis_host is None or settings.redis_port is None: + pytest.skip("Redis not configured - skipping conversation lock test") + + conversation = client.conversations.create(agent_id=agent.id) + + results = {"success": 0, "conflict": 0, "other_error": 0} + + def send_message(msg: str): + try: + messages = list( + client.conversations.messages.create( + conversation_id=conversation.id, + messages=[{"role": "user", "content": msg}], + ) + ) + return ("success", messages) + except ConflictError: + return ("conflict", None) + except Exception as e: + return ("other_error", str(e)) + + # Fire off two messages concurrently + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + future1 = executor.submit(send_message, "Message 1") + future2 = executor.submit(send_message, "Message 2") + + result1 = future1.result() + result2 = future2.result() + + # Count results + for result_type, _ in [result1, result2]: + results[result_type] += 1 + + # One should succeed and one should get conflict + assert results["success"] == 1, f"Expected 1 success, got {results['success']}" + assert results["conflict"] == 1, f"Expected 1 conflict, got {results['conflict']}" + assert results["other_error"] == 0, f"Unexpected errors: {results['other_error']}" + + # Now send another message - should succeed since lock is released + messages = list( + client.conversations.messages.create( + conversation_id=conversation.id, + messages=[{"role": "user", "content": "Message after concurrent requests"}], + ) + ) + assert len(messages) > 0, "Should be able to send message after concurrent requests complete"