From cdca1a564f336b97dbed4be369d9b7edcfa2c3d7 Mon Sep 17 00:00:00 2001
From: Ari Webb
Date: Fri, 9 Jan 2026 11:18:02 -0800
Subject: [PATCH] fix: conversation id not found in tpuf (#8469)

* fix: conversation id not found in tpuf

* add tests
---
 letta/helpers/tpuf_client.py          |   4 +-
 tests/integration_test_turbopuffer.py | 206 ++++++++++++++++++++++++++
 2 files changed, 208 insertions(+), 2 deletions(-)

diff --git a/letta/helpers/tpuf_client.py b/letta/helpers/tpuf_client.py
index c5e7f925..992cfd33 100644
--- a/letta/helpers/tpuf_client.py
+++ b/letta/helpers/tpuf_client.py
@@ -932,7 +932,7 @@ class TurbopufferClient:
                 query_embedding=query_embedding,
                 query_text=query_text,
                 top_k=top_k,
-                include_attributes=["text", "organization_id", "agent_id", "role", "created_at", "conversation_id"],
+                include_attributes=True,
                 filters=final_filter,
                 vector_weight=vector_weight,
                 fts_weight=fts_weight,
@@ -1097,7 +1097,7 @@ class TurbopufferClient:
                 query_embedding=query_embedding,
                 query_text=query_text,
                 top_k=top_k,
-                include_attributes=["text", "organization_id", "agent_id", "role", "created_at", "conversation_id"],
+                include_attributes=True,
                 filters=final_filter,
                 vector_weight=vector_weight,
                 fts_weight=fts_weight,
diff --git a/tests/integration_test_turbopuffer.py b/tests/integration_test_turbopuffer.py
index 031d67db..31fa21be 100644
--- a/tests/integration_test_turbopuffer.py
+++ b/tests/integration_test_turbopuffer.py
@@ -2283,6 +2283,212 @@ async def test_message_conversation_id_filtering(server, sarah_agent, default_us
     )
 
 
+@pytest.mark.asyncio
+@pytest.mark.skipif(not settings.tpuf_api_key, reason="Turbopuffer API key not configured")
+async def test_query_messages_with_mixed_conversation_id_presence(enable_message_embedding, default_user):
+    """Test that querying works when the namespace schema doesn't have conversation_id.
+
+    This test validates the fix for the error:
+    'attribute "conversation_id" not found in schema, cannot be part of include_attributes'
+
+    The fix changed from explicitly listing attributes (which fails when the namespace
+    schema doesn't have conversation_id) to using include_attributes=True which gracefully
+    returns all available attributes.
+
+    IMPORTANT: This test uses raw Turbopuffer API to insert messages WITHOUT
+    the conversation_id schema, then queries BEFORE any new messages are added.
+    This reproduces the exact production scenario where old namespaces don't have
+    conversation_id in their schema.
+    """
+    from turbopuffer import AsyncTurbopuffer
+
+    from letta.helpers.tpuf_client import TurbopufferClient
+
+    client = TurbopufferClient()
+    agent_id = f"test-agent-{uuid.uuid4()}"
+    org_id = str(uuid.uuid4())
+    namespace_name = f"messages_{org_id}_dev"
+
+    try:
+        # Insert messages using raw Turbopuffer API WITHOUT conversation_id in schema
+        # This simulates a namespace that was created before conversation_id feature existed
+        message_ids = [str(uuid.uuid4()), str(uuid.uuid4())]
+        message_texts = [
+            "Message without conversation_id about Python",
+            "Another message about machine learning",
+        ]
+
+        # Generate embeddings
+        embeddings = await client._generate_embeddings(message_texts, default_user)
+
+        # Use raw Turbopuffer API to insert WITHOUT conversation_id in schema
+        async with AsyncTurbopuffer(api_key=client.api_key, region=client.region) as tpuf:
+            namespace = tpuf.namespace(namespace_name)
+            await namespace.write(
+                upsert_columns={
+                    "id": message_ids,
+                    "vector": embeddings,
+                    "text": message_texts,
+                    "organization_id": [org_id, org_id],
+                    "agent_id": [agent_id, agent_id],
+                    "role": ["user", "assistant"],
+                    "created_at": [datetime.now(timezone.utc), datetime.now(timezone.utc)],
+                    # NOTE: No conversation_id column - schema won't have this attribute!
+                },
+                distance_metric="cosine_distance",
+                schema={
+                    "text": {"type": "string", "full_text_search": True},
+                    # NOTE: No conversation_id in schema - this is the key!
+                },
+            )
+
+        # Wait for indexing
+        await asyncio.sleep(1)
+
+        # CRITICAL: Query BEFORE inserting any new messages with conversation_id
+        # This is when the bug manifests - the schema doesn't have conversation_id yet
+
+        # Test 1: Timestamp mode query
+        timestamp_results = await client.query_messages_by_agent_id(
+            agent_id=agent_id,
+            organization_id=org_id,
+            search_mode="timestamp",
+            top_k=10,
+            actor=default_user,
+        )
+        assert len(timestamp_results) == 2, f"Expected 2 messages, got {len(timestamp_results)}"
+        result_ids = [msg["id"] for msg, _, _ in timestamp_results]
+        for msg_id in message_ids:
+            assert msg_id in result_ids
+
+        # Test 2: Vector search
+        vector_results = await client.query_messages_by_agent_id(
+            agent_id=agent_id,
+            organization_id=org_id,
+            actor=default_user,
+            query_text="Python programming",
+            search_mode="vector",
+            top_k=10,
+        )
+        assert len(vector_results) > 0
+
+        # Test 3: Hybrid search
+        hybrid_results = await client.query_messages_by_agent_id(
+            agent_id=agent_id,
+            organization_id=org_id,
+            actor=default_user,
+            query_text="message",
+            search_mode="hybrid",
+            top_k=10,
+            vector_weight=0.5,
+            fts_weight=0.5,
+        )
+        assert len(hybrid_results) > 0
+
+        # Test 4: FTS search
+        fts_results = await client.query_messages_by_agent_id(
+            agent_id=agent_id,
+            organization_id=org_id,
+            actor=default_user,
+            query_text="Python",
+            search_mode="fts",
+            top_k=10,
+        )
+        assert len(fts_results) > 0
+        assert any("Python" in msg["text"] for msg, _, _ in fts_results)
+
+    finally:
+        # Clean up - delete the entire namespace
+        try:
+            async with AsyncTurbopuffer(api_key=client.api_key, region=client.region) as tpuf:
+                namespace = tpuf.namespace(namespace_name)
+                await namespace.delete_all()
+        except Exception:
+            pass  # best-effort cleanup; Exception (not bare except) so cancellation/interrupts still propagate
+
+
+@pytest.mark.asyncio
+@pytest.mark.skipif(not settings.tpuf_api_key, reason="Turbopuffer API key not configured")
+async def test_query_messages_by_org_id_with_missing_conversation_id_schema(enable_message_embedding, default_user):
+    """Test that query_messages_by_org_id works when the namespace doesn't have conversation_id in schema.
+
+    This is the companion test to test_query_messages_with_mixed_conversation_id_presence,
+    validating the same fix for the query_messages_by_org_id method.
+
+    IMPORTANT: This test queries BEFORE any messages with conversation_id are inserted,
+    to reproduce the exact production scenario.
+    """
+    from turbopuffer import AsyncTurbopuffer
+
+    from letta.helpers.tpuf_client import TurbopufferClient
+
+    client = TurbopufferClient()
+    agent_id = f"test-agent-{uuid.uuid4()}"
+    org_id = str(uuid.uuid4())
+    namespace_name = f"messages_{org_id}_dev"
+
+    try:
+        # Insert messages using raw Turbopuffer API WITHOUT conversation_id in schema
+        message_ids = [str(uuid.uuid4()), str(uuid.uuid4())]
+        message_texts = ["Org message about Python", "Org message about JavaScript"]
+
+        # Generate embeddings
+        embeddings = await client._generate_embeddings(message_texts, default_user)
+
+        # Use raw Turbopuffer API to insert WITHOUT conversation_id in schema
+        async with AsyncTurbopuffer(api_key=client.api_key, region=client.region) as tpuf:
+            namespace = tpuf.namespace(namespace_name)
+            await namespace.write(
+                upsert_columns={
+                    "id": message_ids,
+                    "vector": embeddings,
+                    "text": message_texts,
+                    "organization_id": [org_id, org_id],
+                    "agent_id": [agent_id, agent_id],
+                    "role": ["user", "assistant"],
+                    "created_at": [datetime.now(timezone.utc), datetime.now(timezone.utc)],
+                    # NOTE: No conversation_id column
+                },
+                distance_metric="cosine_distance",
+                schema={
+                    "text": {"type": "string", "full_text_search": True},
+                    # NOTE: No conversation_id in schema
+                },
+            )
+
+        # Wait for indexing
+        await asyncio.sleep(1)
+
+        # CRITICAL: Query BEFORE inserting any new messages with conversation_id
+        # This is when the bug manifests - schema doesn't have conversation_id
+
+        # Query at org level - should work even without conversation_id in schema
+        org_results = await client.query_messages_by_org_id(
+            organization_id=org_id,
+            actor=default_user,
+            query_text="message",
+            search_mode="hybrid",
+            top_k=10,
+            vector_weight=0.5,
+            fts_weight=0.5,
+        )
+
+        # Should find both messages
+        assert len(org_results) == 2, f"Expected 2 messages, got {len(org_results)}"
+        result_ids = [msg["id"] for msg, _, _ in org_results]
+        assert message_ids[0] in result_ids
+        assert message_ids[1] in result_ids
+
+    finally:
+        # Clean up - delete the entire namespace
+        try:
+            async with AsyncTurbopuffer(api_key=client.api_key, region=client.region) as tpuf:
+                namespace = tpuf.namespace(namespace_name)
+                await namespace.delete_all()
+        except Exception:
+            pass  # best-effort cleanup; Exception (not bare except) so cancellation/interrupts still propagate
+
+
 @pytest.mark.asyncio
 async def test_system_messages_not_embedded_during_agent_creation(server, default_user, enable_message_embedding):
     """Test that system messages are filtered out before being passed to the embedding pipeline during agent creation"""