fix: conversation id not found in tpuf (#8469)
* fix: conversation id not found in tpuf * add tests
This commit is contained in:
@@ -2283,6 +2283,212 @@ async def test_message_conversation_id_filtering(server, sarah_agent, default_us
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skipif(not settings.tpuf_api_key, reason="Turbopuffer API key not configured")
|
||||
async def test_query_messages_with_mixed_conversation_id_presence(enable_message_embedding, default_user):
|
||||
"""Test that querying works when the namespace schema doesn't have conversation_id.
|
||||
|
||||
This test validates the fix for the error:
|
||||
'attribute "conversation_id" not found in schema, cannot be part of include_attributes'
|
||||
|
||||
The fix changed from explicitly listing attributes (which fails when the namespace
|
||||
schema doesn't have conversation_id) to using include_attributes=True which gracefully
|
||||
returns all available attributes.
|
||||
|
||||
IMPORTANT: This test uses raw Turbopuffer API to insert messages WITHOUT
|
||||
the conversation_id schema, then queries BEFORE any new messages are added.
|
||||
This reproduces the exact production scenario where old namespaces don't have
|
||||
conversation_id in their schema.
|
||||
"""
|
||||
from turbopuffer import AsyncTurbopuffer
|
||||
|
||||
from letta.helpers.tpuf_client import TurbopufferClient
|
||||
|
||||
client = TurbopufferClient()
|
||||
agent_id = f"test-agent-{uuid.uuid4()}"
|
||||
org_id = str(uuid.uuid4())
|
||||
namespace_name = f"messages_{org_id}_dev"
|
||||
|
||||
try:
|
||||
# Insert messages using raw Turbopuffer API WITHOUT conversation_id in schema
|
||||
# This simulates a namespace that was created before conversation_id feature existed
|
||||
message_ids = [str(uuid.uuid4()), str(uuid.uuid4())]
|
||||
message_texts = [
|
||||
"Message without conversation_id about Python",
|
||||
"Another message about machine learning",
|
||||
]
|
||||
|
||||
# Generate embeddings
|
||||
embeddings = await client._generate_embeddings(message_texts, default_user)
|
||||
|
||||
# Use raw Turbopuffer API to insert WITHOUT conversation_id in schema
|
||||
async with AsyncTurbopuffer(api_key=client.api_key, region=client.region) as tpuf:
|
||||
namespace = tpuf.namespace(namespace_name)
|
||||
await namespace.write(
|
||||
upsert_columns={
|
||||
"id": message_ids,
|
||||
"vector": embeddings,
|
||||
"text": message_texts,
|
||||
"organization_id": [org_id, org_id],
|
||||
"agent_id": [agent_id, agent_id],
|
||||
"role": ["user", "assistant"],
|
||||
"created_at": [datetime.now(timezone.utc), datetime.now(timezone.utc)],
|
||||
# NOTE: No conversation_id column - schema won't have this attribute!
|
||||
},
|
||||
distance_metric="cosine_distance",
|
||||
schema={
|
||||
"text": {"type": "string", "full_text_search": True},
|
||||
# NOTE: No conversation_id in schema - this is the key!
|
||||
},
|
||||
)
|
||||
|
||||
# Wait for indexing
|
||||
await asyncio.sleep(1)
|
||||
|
||||
# CRITICAL: Query BEFORE inserting any new messages with conversation_id
|
||||
# This is when the bug manifests - the schema doesn't have conversation_id yet
|
||||
|
||||
# Test 1: Timestamp mode query
|
||||
timestamp_results = await client.query_messages_by_agent_id(
|
||||
agent_id=agent_id,
|
||||
organization_id=org_id,
|
||||
search_mode="timestamp",
|
||||
top_k=10,
|
||||
actor=default_user,
|
||||
)
|
||||
assert len(timestamp_results) == 2, f"Expected 2 messages, got {len(timestamp_results)}"
|
||||
result_ids = [msg["id"] for msg, _, _ in timestamp_results]
|
||||
for msg_id in message_ids:
|
||||
assert msg_id in result_ids
|
||||
|
||||
# Test 2: Vector search
|
||||
vector_results = await client.query_messages_by_agent_id(
|
||||
agent_id=agent_id,
|
||||
organization_id=org_id,
|
||||
actor=default_user,
|
||||
query_text="Python programming",
|
||||
search_mode="vector",
|
||||
top_k=10,
|
||||
)
|
||||
assert len(vector_results) > 0
|
||||
|
||||
# Test 3: Hybrid search
|
||||
hybrid_results = await client.query_messages_by_agent_id(
|
||||
agent_id=agent_id,
|
||||
organization_id=org_id,
|
||||
actor=default_user,
|
||||
query_text="message",
|
||||
search_mode="hybrid",
|
||||
top_k=10,
|
||||
vector_weight=0.5,
|
||||
fts_weight=0.5,
|
||||
)
|
||||
assert len(hybrid_results) > 0
|
||||
|
||||
# Test 4: FTS search
|
||||
fts_results = await client.query_messages_by_agent_id(
|
||||
agent_id=agent_id,
|
||||
organization_id=org_id,
|
||||
actor=default_user,
|
||||
query_text="Python",
|
||||
search_mode="fts",
|
||||
top_k=10,
|
||||
)
|
||||
assert len(fts_results) > 0
|
||||
assert any("Python" in msg["text"] for msg, _, _ in fts_results)
|
||||
|
||||
finally:
|
||||
# Clean up - delete the entire namespace
|
||||
try:
|
||||
async with AsyncTurbopuffer(api_key=client.api_key, region=client.region) as tpuf:
|
||||
namespace = tpuf.namespace(namespace_name)
|
||||
await namespace.delete_all()
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skipif(not settings.tpuf_api_key, reason="Turbopuffer API key not configured")
|
||||
async def test_query_messages_by_org_id_with_missing_conversation_id_schema(enable_message_embedding, default_user):
|
||||
"""Test that query_messages_by_org_id works when the namespace doesn't have conversation_id in schema.
|
||||
|
||||
This is the companion test to test_query_messages_with_mixed_conversation_id_presence,
|
||||
validating the same fix for the query_messages_by_org_id method.
|
||||
|
||||
IMPORTANT: This test queries BEFORE any messages with conversation_id are inserted,
|
||||
to reproduce the exact production scenario.
|
||||
"""
|
||||
from turbopuffer import AsyncTurbopuffer
|
||||
|
||||
from letta.helpers.tpuf_client import TurbopufferClient
|
||||
|
||||
client = TurbopufferClient()
|
||||
agent_id = f"test-agent-{uuid.uuid4()}"
|
||||
org_id = str(uuid.uuid4())
|
||||
namespace_name = f"messages_{org_id}_dev"
|
||||
|
||||
try:
|
||||
# Insert messages using raw Turbopuffer API WITHOUT conversation_id in schema
|
||||
message_ids = [str(uuid.uuid4()), str(uuid.uuid4())]
|
||||
message_texts = ["Org message about Python", "Org message about JavaScript"]
|
||||
|
||||
# Generate embeddings
|
||||
embeddings = await client._generate_embeddings(message_texts, default_user)
|
||||
|
||||
# Use raw Turbopuffer API to insert WITHOUT conversation_id in schema
|
||||
async with AsyncTurbopuffer(api_key=client.api_key, region=client.region) as tpuf:
|
||||
namespace = tpuf.namespace(namespace_name)
|
||||
await namespace.write(
|
||||
upsert_columns={
|
||||
"id": message_ids,
|
||||
"vector": embeddings,
|
||||
"text": message_texts,
|
||||
"organization_id": [org_id, org_id],
|
||||
"agent_id": [agent_id, agent_id],
|
||||
"role": ["user", "assistant"],
|
||||
"created_at": [datetime.now(timezone.utc), datetime.now(timezone.utc)],
|
||||
# NOTE: No conversation_id column
|
||||
},
|
||||
distance_metric="cosine_distance",
|
||||
schema={
|
||||
"text": {"type": "string", "full_text_search": True},
|
||||
# NOTE: No conversation_id in schema
|
||||
},
|
||||
)
|
||||
|
||||
# Wait for indexing
|
||||
await asyncio.sleep(1)
|
||||
|
||||
# CRITICAL: Query BEFORE inserting any new messages with conversation_id
|
||||
# This is when the bug manifests - schema doesn't have conversation_id
|
||||
|
||||
# Query at org level - should work even without conversation_id in schema
|
||||
org_results = await client.query_messages_by_org_id(
|
||||
organization_id=org_id,
|
||||
actor=default_user,
|
||||
query_text="message",
|
||||
search_mode="hybrid",
|
||||
top_k=10,
|
||||
vector_weight=0.5,
|
||||
fts_weight=0.5,
|
||||
)
|
||||
|
||||
# Should find both messages
|
||||
assert len(org_results) == 2, f"Expected 2 messages, got {len(org_results)}"
|
||||
result_ids = [msg["id"] for msg, _, _ in org_results]
|
||||
assert message_ids[0] in result_ids
|
||||
assert message_ids[1] in result_ids
|
||||
|
||||
finally:
|
||||
# Clean up - delete the entire namespace
|
||||
try:
|
||||
async with AsyncTurbopuffer(api_key=client.api_key, region=client.region) as tpuf:
|
||||
namespace = tpuf.namespace(namespace_name)
|
||||
await namespace.delete_all()
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_system_messages_not_embedded_during_agent_creation(server, default_user, enable_message_embedding):
|
||||
"""Test that system messages are filtered out before being passed to the embedding pipeline during agent creation"""
|
||||
|
||||
Reference in New Issue
Block a user