Files
letta-server/tests/integration_test_conversations_sdk.py
github-actions[bot] 0020f4b866 feat: recompile system message on new conversation creation (#9508)
* feat: recompile system message on new conversation creation

When a new conversation is created, the system prompt is now recompiled
with the latest memory block values and metadata instead of starting
with no messages. This ensures each conversation captures the current
agent state at creation time.

- Add _initialize_conversation_system_message to ConversationManager
- Compile fresh system message using PromptGenerator during conversation creation
- Add integration tests for the full workflow (modify memory → new conversation
  gets updated system message)
- Update existing test expectations for non-empty conversation messages

Fixes #9507

Co-authored-by: Sarah Wooders <sarahwooders@users.noreply.github.com>

* refactor: deduplicate system message compilation into ConversationManager

Consolidate the duplicate system message compilation logic into a single
shared method `compile_and_save_system_message_for_conversation` on
ConversationManager. This method accepts optional pre-loaded agent_state
and message_manager to avoid redundant DB loads when callers already have
them.

- Renamed _initialize_conversation_system_message → compile_and_save_system_message_for_conversation (public, reusable)
- Added optional agent_state and message_manager params
- Replaced 40-line duplicate in helpers.py with a 7-line call to the shared method
- Method returns the persisted system message for caller use

Co-authored-by: Sarah Wooders <sarahwooders@users.noreply.github.com>

---------

Co-authored-by: letta-code <248085862+letta-code@users.noreply.github.com>
Co-authored-by: Sarah Wooders <sarahwooders@users.noreply.github.com>
2026-03-03 18:34:01 -08:00

968 lines
40 KiB
Python

"""
Integration tests for the Conversations API using the SDK.
"""
import uuid
import pytest
import requests
from letta_client import Letta
@pytest.fixture
def client(server_url: str) -> Letta:
    """Build the SDK client used by every test in this module.

    Args:
        server_url: Base URL handed to the client. It is not defined in this
            file; presumably it comes from a shared conftest fixture.

    Returns:
        A ``Letta`` client constructed with ``base_url=server_url``.
    """
    sdk_client = Letta(base_url=server_url)
    return sdk_client
@pytest.fixture
def agent(client: Letta):
    """Yield a freshly created agent and delete it after the test finishes.

    The agent gets a random name suffix so repeated or parallel runs do not
    collide, and starts with a ``human`` and a ``persona`` memory block.
    """
    name_suffix = uuid.uuid4().hex[:8]
    initial_blocks = [
        {"label": "human", "value": "Test user"},
        {"label": "persona", "value": "You are a helpful assistant."},
    ]
    created_agent = client.agents.create(
        name=f"test_conversations_{name_suffix}",
        model="openai/gpt-4o-mini",
        embedding="openai/text-embedding-3-small",
        memory_blocks=initial_blocks,
    )

    yield created_agent

    # Teardown: everything after the yield runs once the test is done.
    client.agents.delete(agent_id=created_agent.id)
class TestConversationsSDK:
    """Test conversations using the SDK client."""

    @staticmethod
    def _send_user_message(client: Letta, conversation_id: str, text: str) -> list:
        """Send one user message to a conversation and drain the response stream.

        Args:
            client: SDK client to send through.
            conversation_id: Target conversation.
            text: Content of the user message.

        Returns:
            Every item yielded by the response stream, as a list.
        """
        stream = client.conversations.messages.create(
            conversation_id=conversation_id,
            messages=[{"role": "user", "content": text}],
        )
        return list(stream)

    @staticmethod
    def _skip_without_redis() -> None:
        """Skip the calling test unless both a Redis host and port are configured."""
        from letta.settings import settings

        if settings.redis_host is None or settings.redis_port is None:
            pytest.skip("Redis not configured - skipping conversation lock test")

    def test_create_conversation(self, client: Letta, agent):
        """Test creating a conversation for an agent."""
        conversation = client.conversations.create(agent_id=agent.id)

        assert conversation.id is not None
        assert conversation.id.startswith("conv-")
        assert conversation.agent_id == agent.id

    def test_list_conversations(self, client: Letta, agent):
        """Test listing conversations for an agent."""
        conv1 = client.conversations.create(agent_id=agent.id)
        conv2 = client.conversations.create(agent_id=agent.id)

        conversations = client.conversations.list(agent_id=agent.id)

        assert len(conversations) >= 2
        conv_ids = [c.id for c in conversations]
        assert conv1.id in conv_ids
        assert conv2.id in conv_ids

    def test_retrieve_conversation(self, client: Letta, agent):
        """Test retrieving a specific conversation before and after a message is sent."""
        created = client.conversations.create(agent_id=agent.id)

        # Right after creation the conversation holds exactly one message:
        # the system message compiled at creation time.
        retrieved = client.conversations.retrieve(conversation_id=created.id)
        assert retrieved.id == created.id
        assert retrieved.agent_id == created.agent_id
        assert len(retrieved.in_context_message_ids) == 1
        assert retrieved.in_context_message_ids[0].startswith("message-")

        self._send_user_message(client, created.id, "Hello!")

        # After one exchange: system + user + assistant at minimum.
        retrieved_with_messages = client.conversations.retrieve(conversation_id=created.id)
        assert len(retrieved_with_messages.in_context_message_ids) >= 3
        for msg_id in retrieved_with_messages.in_context_message_ids:
            assert isinstance(msg_id, str)
            assert msg_id.startswith("message-")

        # Request ascending order explicitly so the positional checks below
        # do not depend on the endpoint's default ordering.
        messages = client.conversations.messages.list(conversation_id=created.id, order="asc")
        assert len(messages) >= 3  # system + user + assistant
        assert messages[0].message_type == "system_message", f"First message should be system_message, got {messages[0].message_type}"
        assert messages[1].message_type == "user_message", f"Second message should be user_message, got {messages[1].message_type}"

    def test_send_message_to_conversation(self, client: Letta, agent):
        """Test sending a message to a conversation."""
        conversation = client.conversations.create(agent_id=agent.id)

        messages = self._send_user_message(client, conversation.id, "Hello, how are you?")

        assert len(messages) > 0
        # Stream items without a message_type attribute are ignored here.
        message_types = [m.message_type for m in messages if hasattr(m, "message_type")]
        assert "assistant_message" in message_types

    def test_list_conversation_messages(self, client: Letta, agent):
        """Test listing messages from a conversation, including after a follow-up."""
        conversation = client.conversations.create(agent_id=agent.id)
        self._send_user_message(client, conversation.id, "Say 'test response' back to me.")

        messages = client.conversations.messages.list(conversation_id=conversation.id)
        assert len(messages) >= 2  # At least user + assistant
        message_types = [m.message_type for m in messages]
        assert "user_message" in message_types
        assert "assistant_message" in message_types

        # A second exchange must add to the history, not replace it.
        first_message_count = len(messages)
        self._send_user_message(client, conversation.id, "This is a follow-up message.")

        updated_messages = client.conversations.messages.list(conversation_id=conversation.id)
        # At least 2 more: user + assistant
        assert len(updated_messages) >= first_message_count + 2

    def test_conversation_isolation(self, client: Letta, agent):
        """Test that conversations are isolated from each other."""
        conv1 = client.conversations.create(agent_id=agent.id)
        conv2 = client.conversations.create(agent_id=agent.id)

        self._send_user_message(client, conv1.id, "Remember the word: APPLE")
        self._send_user_message(client, conv2.id, "Remember the word: BANANA")

        # Ascending order so that index 0 is the oldest message (the system message).
        conv1_messages = client.conversations.messages.list(conversation_id=conv1.id, order="asc")
        conv2_messages = client.conversations.messages.list(conversation_id=conv2.id, order="asc")

        conv1_content = " ".join([m.content for m in conv1_messages if hasattr(m, "content") and m.content])
        conv2_content = " ".join([m.content for m in conv2_messages if hasattr(m, "content") and m.content])

        assert "APPLE" in conv1_content
        assert "BANANA" in conv2_content
        # Each conversation must contain only its own word. These are two
        # separate asserts: joining them with `or` would let a leak in one
        # direction go unnoticed.
        assert "BANANA" not in conv1_content
        assert "APPLE" not in conv2_content

        # Ask each conversation what it remembers; the answers must differ.
        recall_prompt = "What word did I ask you to remember? Reply with just the word."
        conv1_recall = self._send_user_message(client, conv1.id, recall_prompt)
        conv2_recall = self._send_user_message(client, conv2.id, recall_prompt)

        conv1_response = " ".join([m.content for m in conv1_recall if hasattr(m, "message_type") and m.message_type == "assistant_message"])
        conv2_response = " ".join([m.content for m in conv2_recall if hasattr(m, "message_type") and m.message_type == "assistant_message"])

        assert "APPLE" in conv1_response.upper(), f"Conv1 should remember APPLE, got: {conv1_response}"
        assert "BANANA" in conv2_response.upper(), f"Conv2 should remember BANANA, got: {conv2_response}"

        # Each conversation has its own system message.
        conv1_system_id = conv1_messages[0].id
        conv2_system_id = conv2_messages[0].id
        assert conv1_system_id != conv2_system_id, "System messages should have different IDs for different conversations"

    def test_conversation_messages_pagination(self, client: Letta, agent):
        """Test pagination when listing conversation messages."""
        conversation = client.conversations.create(agent_id=agent.id)
        for i in range(3):
            self._send_user_message(client, conversation.id, f"Message number {i}")

        messages = client.conversations.messages.list(
            conversation_id=conversation.id,
            limit=2,
        )

        # The limit must be respected. The history is longer than the limit,
        # so an empty page would also be wrong.
        assert 0 < len(messages) <= 2

    def test_retrieve_conversation_stream_no_active_run(self, client: Letta, agent):
        """Test that retrieve_conversation_stream returns error when no active run exists."""
        from letta_client import BadRequestError

        conversation = client.conversations.create(agent_id=agent.id)

        with pytest.raises(BadRequestError) as exc_info:
            stream = client.conversations.messages.stream(conversation_id=conversation.id)
            list(stream)  # Consume the stream to trigger the error

        # Checked outside the `with` block: code after the raising call inside it never runs.
        assert "No active runs found" in str(exc_info.value)

    def test_retrieve_conversation_stream_after_completed_run(self, client: Letta, agent):
        """Test that retrieve_conversation_stream returns error when run is completed."""
        from letta_client import BadRequestError

        conversation = client.conversations.create(agent_id=agent.id)
        # This creates a run that completes before the stream is requested.
        self._send_user_message(client, conversation.id, "Hello")

        with pytest.raises(BadRequestError) as exc_info:
            stream = client.conversations.messages.stream(conversation_id=conversation.id)
            list(stream)  # Consume the stream to trigger the error

        # Checked outside the `with` block: code after the raising call inside it never runs.
        assert "No active runs found" in str(exc_info.value)

    def test_conversation_lock_released_after_completion(self, client: Letta, agent):
        """Test that lock is released after request completes by sending sequential messages."""
        self._skip_without_redis()
        conversation = client.conversations.create(agent_id=agent.id)

        messages1 = self._send_user_message(client, conversation.id, "Hello")
        assert len(messages1) > 0

        # The second send can only succeed if the first released the lock.
        messages2 = self._send_user_message(client, conversation.id, "Hello again")
        assert len(messages2) > 0

    def test_conversation_lock_released_on_error(self, client: Letta, agent):
        """Test that lock is released even when the run encounters an error.

        This test sends a message that triggers an error during streaming (by causing
        a context window exceeded error with a very long message), then verifies the
        lock is properly released by successfully sending another message.
        """
        self._skip_without_redis()
        conversation = client.conversations.create(agent_id=agent.id)

        very_long_message = "Hello " * 100000  # Very long message to exceed context window
        try:
            self._send_user_message(client, conversation.id, very_long_message)
        except Exception:
            # Deliberately best-effort: the oversized request is expected to
            # fail, and this test only cares about what happens afterwards.
            pass

        messages = self._send_user_message(client, conversation.id, "Hello after error")
        assert len(messages) > 0, "Lock should be released even after run error"

    def test_concurrent_messages_to_same_conversation(self, client: Letta, agent):
        """Test that concurrent messages to the same conversation are properly serialized.

        One request should succeed and one should get a 409 CONVERSATION_BUSY error.
        After both return, a subsequent message should succeed.
        """
        import concurrent.futures

        from letta_client import ConflictError

        self._skip_without_redis()
        conversation = client.conversations.create(agent_id=agent.id)

        def send_message(msg: str):
            """Return an (outcome, detail) pair instead of raising inside the worker thread."""
            try:
                messages = self._send_user_message(client, conversation.id, msg)
                return ("success", messages)
            except ConflictError:
                return ("conflict", None)
            except Exception as e:
                return ("other_error", str(e))

        # Fire off two messages concurrently
        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
            future1 = executor.submit(send_message, "Message 1")
            future2 = executor.submit(send_message, "Message 2")
            outcomes = [future1.result(), future2.result()]

        # Report unexpected failures first, with their text, so the real cause
        # is not hidden behind a "wrong success count" message.
        unexpected = [detail for outcome, detail in outcomes if outcome == "other_error"]
        assert not unexpected, f"Unexpected errors: {unexpected}"

        success_count = sum(1 for outcome, _ in outcomes if outcome == "success")
        conflict_count = sum(1 for outcome, _ in outcomes if outcome == "conflict")
        assert success_count == 1, f"Expected 1 success, got {success_count}"
        assert conflict_count == 1, f"Expected 1 conflict, got {conflict_count}"

        # With both requests finished the lock must be free again.
        messages = self._send_user_message(client, conversation.id, "Message after concurrent requests")
        assert len(messages) > 0, "Should be able to send message after concurrent requests complete"

    def test_list_conversation_messages_order_asc(self, client: Letta, agent):
        """Test listing messages in ascending order (oldest first)."""
        conversation = client.conversations.create(agent_id=agent.id)
        self._send_user_message(client, conversation.id, "First message")
        self._send_user_message(client, conversation.id, "Second message")

        messages_asc = client.conversations.messages.list(
            conversation_id=conversation.id,
            order="asc",
        )

        # Oldest first, so the system message leads.
        assert messages_asc[0].message_type == "system_message"

        user_messages = [m for m in messages_asc if m.message_type == "user_message"]
        assert len(user_messages) >= 2
        assert "First" in user_messages[0].content

    def test_list_conversation_messages_order_desc(self, client: Letta, agent):
        """Test listing messages in descending order (newest first)."""
        conversation = client.conversations.create(agent_id=agent.id)
        self._send_user_message(client, conversation.id, "First message")
        self._send_user_message(client, conversation.id, "Second message")

        messages_desc = client.conversations.messages.list(
            conversation_id=conversation.id,
            order="desc",
        )

        user_messages = [m for m in messages_desc if m.message_type == "user_message"]
        assert len(user_messages) >= 2
        # Newest first, so the first user message is the second one sent.
        assert "Second" in user_messages[0].content

    def test_list_conversation_messages_order_affects_pagination(self, client: Letta, agent):
        """Test that order parameter affects pagination correctly."""
        conversation = client.conversations.create(agent_id=agent.id)
        for i in range(3):
            self._send_user_message(client, conversation.id, f"Message {i}")

        messages_desc = client.conversations.messages.list(
            conversation_id=conversation.id,
            order="desc",
            limit=5,
        )
        messages_asc = client.conversations.messages.list(
            conversation_id=conversation.id,
            order="asc",
            limit=5,
        )

        # The two pages start from opposite ends of the history.
        assert messages_desc[0].id != messages_asc[0].id

    def test_list_conversation_messages_with_before_cursor(self, client: Letta, agent):
        """Test pagination with before cursor."""
        conversation = client.conversations.create(agent_id=agent.id)
        self._send_user_message(client, conversation.id, "First message")
        self._send_user_message(client, conversation.id, "Second message")

        all_messages = client.conversations.messages.list(
            conversation_id=conversation.id,
            order="asc",
        )
        # system + 2 user + 2 assistant would be 5; only 4 are required here.
        assert len(all_messages) >= 4

        last_message_id = all_messages[-1].id
        messages_before = client.conversations.messages.list(
            conversation_id=conversation.id,
            order="asc",
            before=last_message_id,
        )

        # The cursor is exclusive: the page is shorter and omits the cursor message.
        assert len(messages_before) < len(all_messages)
        assert last_message_id not in [m.id for m in messages_before]

    def test_list_conversation_messages_with_after_cursor(self, client: Letta, agent):
        """Test pagination with after cursor."""
        conversation = client.conversations.create(agent_id=agent.id)
        self._send_user_message(client, conversation.id, "First message")
        self._send_user_message(client, conversation.id, "Second message")

        all_messages = client.conversations.messages.list(
            conversation_id=conversation.id,
            order="asc",
        )
        assert len(all_messages) >= 4

        first_message_id = all_messages[0].id
        messages_after = client.conversations.messages.list(
            conversation_id=conversation.id,
            order="asc",
            after=first_message_id,
        )

        # The cursor is exclusive: the page is shorter and omits the cursor message.
        assert len(messages_after) < len(all_messages)
        assert first_message_id not in [m.id for m in messages_after]
class TestConversationDelete:
    """Tests for the conversation delete endpoint."""

    @staticmethod
    def _conversation_url(server_url: str, conversation_id: str) -> str:
        """Return the REST URL of a single conversation resource."""
        return f"{server_url}/v1/conversations/{conversation_id}"

    def test_delete_conversation(self, client: Letta, agent, server_url: str):
        """Test soft deleting a conversation."""
        conversation = client.conversations.create(agent_id=agent.id)
        assert conversation.id is not None
        url = self._conversation_url(server_url, conversation.id)

        # Delete goes through the raw REST endpoint rather than the SDK.
        delete_resp = requests.delete(url)
        assert delete_resp.status_code == 200, f"Expected 200, got {delete_resp.status_code}: {delete_resp.text}"

        # Once deleted, the conversation can no longer be fetched.
        get_resp = requests.get(url)
        assert get_resp.status_code == 404, f"Expected 404 for deleted conversation, got {get_resp.status_code}"

    def test_delete_conversation_removes_from_list(self, client: Letta, agent, server_url: str):
        """Test that deleted conversations don't appear in list."""
        conv1 = client.conversations.create(agent_id=agent.id)
        conv2 = client.conversations.create(agent_id=agent.id)

        ids_before = [c.id for c in client.conversations.list(agent_id=agent.id)]
        assert conv1.id in ids_before
        assert conv2.id in ids_before

        delete_resp = requests.delete(self._conversation_url(server_url, conv1.id))
        assert delete_resp.status_code == 200

        ids_after = [c.id for c in client.conversations.list(agent_id=agent.id)]
        assert conv1.id not in ids_after, "Deleted conversation should not appear in list"
        assert conv2.id in ids_after, "Non-deleted conversation should still appear"

    def test_delete_conversation_not_found(self, client: Letta, agent, server_url: str):
        """Test that deleting a non-existent conversation returns 404."""
        fake_id = "conv-00000000-0000-0000-0000-000000000000"

        delete_resp = requests.delete(self._conversation_url(server_url, fake_id))

        assert delete_resp.status_code == 404

    def test_delete_conversation_double_delete(self, client: Letta, agent, server_url: str):
        """Test that deleting an already-deleted conversation returns 404."""
        conversation = client.conversations.create(agent_id=agent.id)
        url = self._conversation_url(server_url, conversation.id)

        first_resp = requests.delete(url)
        assert first_resp.status_code == 200

        second_resp = requests.delete(url)
        assert second_resp.status_code == 404, "Double delete should return 404"

    def test_update_deleted_conversation_fails(self, client: Letta, agent, server_url: str):
        """Test that updating a deleted conversation returns 404."""
        conversation = client.conversations.create(agent_id=agent.id)
        url = self._conversation_url(server_url, conversation.id)

        delete_resp = requests.delete(url)
        assert delete_resp.status_code == 200

        patch_resp = requests.patch(url, json={"summary": "Updated summary"})
        assert patch_resp.status_code == 404, "Updating deleted conversation should return 404"
class TestConversationCompact:
    """Tests for the conversation compact (summarization) endpoint."""

    @staticmethod
    def _send_user_message(client: Letta, conversation_id: str, text: str) -> None:
        """Send one user message and drain the response stream."""
        stream = client.conversations.messages.create(
            conversation_id=conversation_id,
            messages=[{"role": "user", "content": text}],
        )
        list(stream)

    @staticmethod
    def _post_compact(server_url: str, conversation_id: str, payload: dict):
        """POST ``payload`` to the compact endpoint of a conversation and return the response."""
        return requests.post(
            f"{server_url}/v1/conversations/{conversation_id}/compact",
            json=payload,
        )

    def test_compact_conversation_basic(self, client: Letta, agent, server_url: str):
        """Test basic conversation compaction via the REST endpoint."""
        conversation = client.conversations.create(agent_id=agent.id)
        # Build up enough history to be worth summarizing.
        for i in range(5):
            self._send_user_message(client, conversation.id, f"Message {i}: Tell me about topic {i}.")

        initial_messages = client.conversations.messages.list(
            conversation_id=conversation.id,
            order="asc",
        )
        initial_count = len(initial_messages)
        assert initial_count >= 10  # At least 5 user + 5 assistant messages

        response = self._post_compact(server_url, conversation.id, {})
        assert response.status_code == 200, f"Expected 200, got {response.status_code}: {response.text}"

        # The response reports the summary and the before/after message counts.
        result = response.json()
        assert "summary" in result
        assert "num_messages_before" in result
        assert "num_messages_after" in result
        assert isinstance(result["summary"], str)
        assert len(result["summary"]) > 0
        assert result["num_messages_before"] > result["num_messages_after"]

        # The listing itself must shrink as well.
        compacted_messages = client.conversations.messages.list(
            conversation_id=conversation.id,
            order="asc",
        )
        assert len(compacted_messages) < initial_count

    def test_compact_conversation_creates_summary_role_message(self, client: Letta, agent, server_url: str):
        """Test that compaction creates a summary message with role='summary'."""
        conversation = client.conversations.create(agent_id=agent.id)
        for i in range(5):
            self._send_user_message(client, conversation.id, f"Message {i}: Tell me about topic {i}.")

        # 'all' mode is requested so that a single summary is produced.
        payload = {"compaction_settings": {"mode": "all"}}
        response = self._post_compact(server_url, conversation.id, payload)
        assert response.status_code == 200, f"Expected 200, got {response.status_code}: {response.text}"

        compacted_messages = client.conversations.messages.list(
            conversation_id=conversation.id,
            order="asc",
        )

        # Expected shape after 'all' mode: system message + one summary message.
        summary_messages = [msg for msg in compacted_messages if msg.role == "summary"]
        assert len(summary_messages) == 1, (
            f"Expected exactly 1 summary message after compaction, found {len(summary_messages)}. "
            f"Message roles: {[msg.role for msg in compacted_messages]}"
        )

    def test_compact_conversation_with_settings(self, client: Letta, agent, server_url: str):
        """Test conversation compaction with custom compaction settings."""
        conversation = client.conversations.create(agent_id=agent.id)
        for i in range(5):
            self._send_user_message(client, conversation.id, f"Remember fact {i}: The number {i} is important.")

        payload = {"compaction_settings": {"mode": "all"}}
        response = self._post_compact(server_url, conversation.id, payload)
        assert response.status_code == 200, f"Expected 200, got {response.status_code}: {response.text}"

        result = response.json()
        assert result["num_messages_before"] > result["num_messages_after"]

    def test_compact_conversation_preserves_conversation_isolation(self, client: Letta, agent, server_url: str):
        """Test that compacting one conversation doesn't affect another."""
        conv1 = client.conversations.create(agent_id=agent.id)
        conv2 = client.conversations.create(agent_id=agent.id)

        # Interleave the sends so both conversations grow side by side.
        for i in range(5):
            self._send_user_message(client, conv1.id, f"Conv1 message {i}")
            self._send_user_message(client, conv2.id, f"Conv2 message {i}")

        conv1_initial = len(client.conversations.messages.list(conversation_id=conv1.id))
        conv2_initial = len(client.conversations.messages.list(conversation_id=conv2.id))

        # Only conv1 is compacted.
        response = self._post_compact(server_url, conv1.id, {})
        assert response.status_code == 200

        conv1_after = len(client.conversations.messages.list(conversation_id=conv1.id))
        assert conv1_after < conv1_initial

        conv2_after = len(client.conversations.messages.list(conversation_id=conv2.id))
        assert conv2_after == conv2_initial

    def test_compact_conversation_empty_fails(self, client: Letta, agent, server_url: str):
        """Test that compacting a conversation with no user messages returns 400."""
        # No messages are sent after creation.
        conversation = client.conversations.create(agent_id=agent.id)

        response = self._post_compact(server_url, conversation.id, {})

        # NOTE(review): the original rationale was "no in-context messages",
        # but other tests in this file assert that a new conversation already
        # has one system message. Confirm why the server still answers 400.
        assert response.status_code == 400

    def test_compact_conversation_invalid_id(self, client: Letta, agent, server_url: str):
        """Test that compacting with invalid conversation ID returns 404."""
        fake_id = "conv-00000000-0000-0000-0000-000000000000"

        response = self._post_compact(server_url, fake_id, {})

        assert response.status_code == 404
class TestConversationSystemMessageRecompilation:
    """Tests that verify the system message is recompiled with latest memory state on new conversation creation."""

    @staticmethod
    def _create_agent(client: Letta, name_prefix: str, human_value: str):
        """Create an agent with a random name suffix and the given ``human`` block value."""
        return client.agents.create(
            name=f"{name_prefix}_{uuid.uuid4().hex[:8]}",
            model="openai/gpt-4o-mini",
            embedding="openai/text-embedding-3-small",
            memory_blocks=[
                {"label": "human", "value": human_value},
                {"label": "persona", "value": "You are a helpful assistant."},
            ],
        )

    @staticmethod
    def _list_messages_oldest_first(client: Letta, conversation_id: str):
        """List a conversation's messages in ascending (oldest-first) order."""
        return client.conversations.messages.list(
            conversation_id=conversation_id,
            order="asc",
        )

    def test_new_conversation_recompiles_system_message_with_updated_memory(self, client: Letta, server_url: str):
        """Test the full workflow:

        1. Agent is created
        2. Send message to agent (through a conversation)
        3. Modify the memory block -> check system message is NOT updated with the modified value
        4. Create a new conversation
        5. Check new conversation system message DOES have the modified value
        """
        unique_marker = f"UNIQUE_MARKER_{uuid.uuid4().hex[:8]}"

        # Step 1: agent with known memory blocks
        agent = self._create_agent(client, "test_sys_msg_recompile", "The user is a test user.")

        try:
            # Step 2: first conversation, with one exchange in it
            conv1 = client.conversations.create(agent_id=agent.id)
            first_stream = client.conversations.messages.create(
                conversation_id=conv1.id,
                messages=[{"role": "user", "content": "Hello, just a quick test."}],
            )
            list(first_stream)

            conv1_messages = self._list_messages_oldest_first(client, conv1.id)
            assert len(conv1_messages) >= 3  # system + user + assistant
            assert conv1_messages[0].message_type == "system_message"

            original_system_content = conv1_messages[0].content
            assert unique_marker not in original_system_content, "Marker should not be in original system message"

            # Step 3: write the marker into the "human" block and confirm it stuck
            client.agents.blocks.update(
                agent_id=agent.id,
                block_label="human",
                value=f"The user is a test user. {unique_marker}",
            )
            updated_block = client.agents.blocks.retrieve(agent_id=agent.id, block_label="human")
            assert unique_marker in updated_block.value

            # The existing conversation keeps the system message it already had.
            conv1_messages_after_update = self._list_messages_oldest_first(client, conv1.id)
            old_system_content = conv1_messages_after_update[0].content
            assert unique_marker not in old_system_content, (
                "Old conversation system message should NOT contain the updated memory value"
            )

            # Step 4: second conversation, created after the memory change
            conv2 = client.conversations.create(agent_id=agent.id)

            # Step 5: its system message must reflect the updated block
            conv2_retrieved = client.conversations.retrieve(conversation_id=conv2.id)
            assert len(conv2_retrieved.in_context_message_ids) == 1, (
                f"New conversation should have exactly 1 system message, got {len(conv2_retrieved.in_context_message_ids)}"
            )

            conv2_messages = self._list_messages_oldest_first(client, conv2.id)
            assert len(conv2_messages) >= 1
            assert conv2_messages[0].message_type == "system_message"

            new_system_content = conv2_messages[0].content
            assert unique_marker in new_system_content, (
                f"New conversation system message should contain the updated memory value '{unique_marker}', "
                f"but system message content did not include it"
            )
        finally:
            # Delete the agent whether the assertions passed or not.
            client.agents.delete(agent_id=agent.id)

    def test_conversation_creation_initializes_system_message(self, client: Letta, server_url: str):
        """Test that creating a conversation immediately initializes it with a system message."""
        agent = self._create_agent(client, "test_conv_init", "Test user for system message init.")

        try:
            # No messages are sent; only creation is exercised.
            conversation = client.conversations.create(agent_id=agent.id)

            retrieved = client.conversations.retrieve(conversation_id=conversation.id)
            assert len(retrieved.in_context_message_ids) == 1, (
                f"Expected 1 system message after conversation creation, got {len(retrieved.in_context_message_ids)}"
            )

            # The single message is the system message and carries the memory block text.
            messages = self._list_messages_oldest_first(client, conversation.id)
            assert len(messages) == 1
            assert messages[0].message_type == "system_message"
            assert "Test user for system message init." in messages[0].content
        finally:
            # Delete the agent whether the assertions passed or not.
            client.agents.delete(agent_id=agent.id)