feat: make conversations throw http busy to stop race condition [LET-6842] (#8411)
* feat: make conversations throw http busy to stop race condition * use redis lock instead * move acquire lock into redis client, integration tests, move lock release into run manager * fix tests, bug * conditional import * remove else * better release * run ci * final reordering lock * update tests * wrong naming of lock holder token
This commit is contained in:
@@ -3,6 +3,7 @@ Integration tests for the Conversations API using the SDK.
|
||||
"""
|
||||
|
||||
import uuid
|
||||
from time import sleep
|
||||
|
||||
import pytest
|
||||
from letta_client import Letta
|
||||
@@ -269,3 +270,129 @@ class TestConversationsSDK:
|
||||
|
||||
# Should return 400 because no active run exists (run is completed)
|
||||
assert "No active runs found" in str(exc_info.value)
|
||||
|
||||
def test_conversation_lock_released_after_completion(self, client: Letta, agent):
    """Verify the conversation lock is freed once a request finishes.

    Two messages are sent back-to-back on the same conversation. The second
    one can only go through if the first request let go of the lock.
    """
    from letta.settings import settings

    # These lock tests only make sense when a Redis host and port are configured
    redis_configured = settings.redis_host is not None and settings.redis_port is not None
    if not redis_configured:
        pytest.skip("Redis not configured - skipping conversation lock test")

    conversation = client.conversations.create(agent_id=agent.id)

    # First request: expected to take the lock and give it back when done
    first_stream = client.conversations.messages.create(
        conversation_id=conversation.id,
        messages=[{"role": "user", "content": "Hello"}],
    )
    first_reply = list(first_stream)
    assert len(first_reply) > 0

    # Second request: only succeeds if the first one released the lock
    second_stream = client.conversations.messages.create(
        conversation_id=conversation.id,
        messages=[{"role": "user", "content": "Hello again"}],
    )
    second_reply = list(second_stream)
    assert len(second_reply) > 0
|
||||
|
||||
def test_conversation_lock_released_on_error(self, client: Letta, agent):
    """Verify the conversation lock is freed even if the run errors out.

    An oversized message is sent first, with the intent of triggering a
    context-window-exceeded failure while the response is streaming. A normal
    follow-up message is then sent; it can only succeed if the failed request
    released the lock.
    """
    from letta.settings import settings

    # These lock tests only make sense when a Redis host and port are configured
    redis_configured = settings.redis_host is not None and settings.redis_port is not None
    if not redis_configured:
        pytest.skip("Redis not configured - skipping conversation lock test")

    conversation = client.conversations.create(agent_id=agent.id)

    # Build a payload large enough that it should blow past the context window
    oversized_content = "Hello " * 100000
    oversized_payload = [{"role": "user", "content": oversized_content}]

    try:
        failing_stream = client.conversations.messages.create(
            conversation_id=conversation.id,
            messages=oversized_payload,
        )
        list(failing_stream)
    except Exception:
        # Deliberately ignored: this request is expected to fail
        # (context window exceeded); only the follow-up matters.
        pass

    # Follow-up request: succeeds only if the lock was released after the error
    follow_up_stream = client.conversations.messages.create(
        conversation_id=conversation.id,
        messages=[{"role": "user", "content": "Hello after error"}],
    )
    follow_up = list(follow_up_stream)
    assert len(follow_up) > 0, "Lock should be released even after run error"
|
||||
|
||||
def test_concurrent_messages_to_same_conversation(self, client: Letta, agent):
    """Test that concurrent messages to the same conversation are properly serialized.

    One request should succeed and one should get a 409 CONVERSATION_BUSY error.
    After both return, a subsequent message should succeed.
    """
    import concurrent.futures

    from letta_client import ConflictError

    from letta.settings import settings

    # Skip if Redis is not configured
    if settings.redis_host is None or settings.redis_port is None:
        pytest.skip("Redis not configured - skipping conversation lock test")

    conversation = client.conversations.create(agent_id=agent.id)

    def send_message(msg: str):
        """Send one message and classify the outcome as a ``(kind, payload)`` pair.

        ``kind`` is "success" (payload: the returned messages), "conflict"
        (payload: None) or "other_error" (payload: a description of the error).
        """
        try:
            messages = list(
                client.conversations.messages.create(
                    conversation_id=conversation.id,
                    messages=[{"role": "user", "content": msg}],
                )
            )
            return ("success", messages)
        except ConflictError:
            return ("conflict", None)
        except Exception as e:
            # Keep the exception type alongside the message so that a failure
            # with an empty str(e) is still diagnosable.
            return ("other_error", f"{type(e).__name__}: {e}")

    # Fire off two messages concurrently
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(send_message, text) for text in ("Message 1", "Message 2")]

    outcomes = [future.result() for future in futures]

    # Count results, keeping the details of anything unexpected
    results = {"success": 0, "conflict": 0, "other_error": 0}
    unexpected_errors = []
    for result_type, payload in outcomes:
        results[result_type] += 1
        if result_type == "other_error":
            unexpected_errors.append(payload)

    # Check for unexpected errors first so the real cause is reported, rather
    # than a bare "Expected 1 success" mismatch that hides it.
    assert not unexpected_errors, f"Unexpected errors: {unexpected_errors}"

    # One should succeed and one should get conflict
    assert results["success"] == 1, f"Expected 1 success, got {results['success']}"
    assert results["conflict"] == 1, f"Expected 1 conflict, got {results['conflict']}"

    # Now send another message - should succeed since lock is released
    messages = list(
        client.conversations.messages.create(
            conversation_id=conversation.id,
            messages=[{"role": "user", "content": "Message after concurrent requests"}],
        )
    )
    assert len(messages) > 0, "Should be able to send message after concurrent requests complete"
|
||||
|
||||
Reference in New Issue
Block a user