From 2bb4caffc38f094f96a2becd9244fe2285b45cce Mon Sep 17 00:00:00 2001 From: Kian Jones <11655409+kianjones9@users.noreply.github.com> Date: Wed, 21 Jan 2026 15:50:51 -0800 Subject: [PATCH] fix: remove unused embedding generation (#9013) * remove unused embedding generation * prevent double embed * fix embedding dimension comparison and valueerror --- letta/helpers/tpuf_client.py | 30 +++++++++++++++++++++++++++--- letta/services/agent_manager.py | 9 --------- letta/services/archive_manager.py | 3 ++- letta/services/passage_manager.py | 9 +++++---- 4 files changed, 34 insertions(+), 17 deletions(-) diff --git a/letta/helpers/tpuf_client.py b/letta/helpers/tpuf_client.py index a550f18d..169b9969 100644 --- a/letta/helpers/tpuf_client.py +++ b/letta/helpers/tpuf_client.py @@ -7,6 +7,7 @@ from datetime import datetime, timezone from typing import Any, Callable, List, Optional, Tuple from letta.constants import DEFAULT_EMBEDDING_CHUNK_SIZE +from letta.errors import LettaInvalidArgumentError from letta.otel.tracing import trace_method from letta.schemas.embedding_config import EmbeddingConfig from letta.schemas.enums import MessageRole, TagMatchMode @@ -321,6 +322,7 @@ class TurbopufferClient: actor: "PydanticUser", tags: Optional[List[str]] = None, created_at: Optional[datetime] = None, + embeddings: Optional[List[List[float]]] = None, ) -> List[PydanticPassage]: """Insert passages into Turbopuffer. @@ -332,6 +334,7 @@ class TurbopufferClient: actor: User actor for embedding generation tags: Optional list of tags to attach to all passages created_at: Optional timestamp for retroactive entries (defaults to current UTC time) + embeddings: Optional pre-computed embeddings (must match 1:1 with text_chunks). If provided, skips embedding generation. Returns: List of PydanticPassage objects that were inserted @@ -345,9 +348,30 @@ class TurbopufferClient: logger.warning("All text chunks were empty, skipping insertion") return [] - # generate embeddings using the default config filtered_texts = [text for _, text in filtered_chunks] - embeddings = await self._generate_embeddings(filtered_texts, actor) + + # use provided embeddings only if dimensions match TPUF's expected dimension + use_provided_embeddings = False + if embeddings is not None: + if len(embeddings) != len(text_chunks): + raise LettaInvalidArgumentError( + f"embeddings length ({len(embeddings)}) must match text_chunks length ({len(text_chunks)})", + argument_name="embeddings", + ) + # check if first non-empty embedding has correct dimensions + filtered_indices = [i for i, _ in filtered_chunks] + sample_embedding = embeddings[filtered_indices[0]] if filtered_indices else None + if sample_embedding is not None and len(sample_embedding) == self.default_embedding_config.embedding_dim: + use_provided_embeddings = True + filtered_embeddings = [embeddings[i] for i, _ in filtered_chunks] + else: + logger.debug( + f"Embedding dimension mismatch (got {len(sample_embedding) if sample_embedding else 'None'}, " + f"expected {self.default_embedding_config.embedding_dim}), regenerating embeddings" + ) + + if not use_provided_embeddings: + filtered_embeddings = await self._generate_embeddings(filtered_texts, actor) namespace_name = await self._get_archive_namespace_name(archive_id) @@ -379,7 +403,7 @@ class TurbopufferClient: tags_arrays = [] # Store tags as arrays passages = [] - for (original_idx, text), embedding in zip(filtered_chunks, embeddings): + for (original_idx, text), embedding in zip(filtered_chunks, filtered_embeddings): passage_id = passage_ids[original_idx] # append to columns diff --git a/letta/services/agent_manager.py b/letta/services/agent_manager.py index 20341478..986c80c7 100644 --- a/letta/services/agent_manager.py +++ b/letta/services/agent_manager.py @@ -2321,15 +2321,6 @@ class AgentManager: # Use Turbopuffer for vector search if archive is configured for TPUF if archive.vector_db_provider == VectorDBProvider.TPUF: from letta.helpers.tpuf_client import TurbopufferClient - from letta.llm_api.llm_client import LLMClient - - # Generate embedding for query - embedding_client = LLMClient.create( - provider_type=embedding_config.embedding_endpoint_type, - actor=actor, - ) - embeddings = await embedding_client.request_embeddings([query_text], embedding_config) - query_embedding = embeddings[0] # Query Turbopuffer - use hybrid search when text is available tpuf_client = TurbopufferClient() diff --git a/letta/services/archive_manager.py b/letta/services/archive_manager.py index b982ca4d..5265e2c8 100644 --- a/letta/services/archive_manager.py +++ b/letta/services/archive_manager.py @@ -345,13 +345,14 @@ class ArchiveManager: tpuf_client = TurbopufferClient() - # Insert to Turbopuffer with the same ID as SQL + # Insert to Turbopuffer with the same ID as SQL, reusing existing embedding await tpuf_client.insert_archival_memories( archive_id=archive.id, text_chunks=[created_passage.text], passage_ids=[created_passage.id], organization_id=actor.organization_id, actor=actor, + embeddings=[created_passage.embedding], ) logger.info(f"Uploaded passage {created_passage.id} to Turbopuffer for archive {archive_id}") except Exception as e: diff --git a/letta/services/passage_manager.py b/letta/services/passage_manager.py index 99781b55..f2d7e48a 100644 --- a/letta/services/passage_manager.py +++ b/letta/services/passage_manager.py @@ -525,20 +525,21 @@ class PassageManager: tpuf_client = TurbopufferClient() - # Extract IDs and texts from the created passages + # Extract IDs, texts, and embeddings from the created passages passage_ids = [p.id for p in passages] passage_texts = [p.text for p in passages] + passage_embeddings = [p.embedding for p in passages] - # Insert to Turbopuffer with the same IDs as SQL - # TurbopufferClient will generate embeddings internally using default config + # Insert to Turbopuffer with the same IDs as SQL, reusing existing embeddings await tpuf_client.insert_archival_memories( archive_id=archive.id, text_chunks=passage_texts, - passage_ids=passage_ids, # Use same IDs as SQL + passage_ids=passage_ids, organization_id=actor.organization_id, actor=actor, tags=tags, created_at=passages[0].created_at if passages else None, + embeddings=passage_embeddings, ) except Exception as e: logger.error(f"Failed to insert passages to Turbopuffer: {e}")