diff --git a/alembic/versions/d798609d65ff_add_index_on_messages_step_id.py b/alembic/versions/d798609d65ff_add_index_on_messages_step_id.py
new file mode 100644
index 00000000..a289a62a
--- /dev/null
+++ b/alembic/versions/d798609d65ff_add_index_on_messages_step_id.py
@@ -0,0 +1,36 @@
+"""add_index_on_messages_step_id
+
+Revision ID: d798609d65ff
+Revises: 89fd4648866b
+Create Date: 2025-11-07 15:43:59.446292
+
+"""
+
+from typing import Sequence, Union
+
+import sqlalchemy as sa
+
+from alembic import op
+from letta.settings import settings
+
+# revision identifiers, used by Alembic.
+revision: str = "d798609d65ff"
+down_revision: Union[str, None] = "89fd4648866b"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    # Skip this migration for SQLite
+    if not settings.letta_pg_uri_no_default:
+        return
+
+    op.create_index("idx_messages_step_id", "messages", ["step_id"], if_not_exists=True)
+
+
+def downgrade() -> None:
+    # Skip this migration for SQLite
+    if not settings.letta_pg_uri_no_default:
+        return
+
+    op.drop_index("idx_messages_step_id", table_name="messages", if_exists=True)
diff --git a/letta/services/passage_manager.py b/letta/services/passage_manager.py
index 1003ba8a..42f78d8a 100644
--- a/letta/services/passage_manager.py
+++ b/letta/services/passage_manager.py
@@ -23,6 +23,7 @@ from letta.schemas.passage import Passage as PydanticPassage
 from letta.schemas.user import User as PydanticUser
 from letta.server.db import db_registry
 from letta.services.archive_manager import ArchiveManager
+from letta.settings import settings
 from letta.utils import enforce_types
 
 logger = get_logger(__name__)
@@ -152,10 +153,23 @@ class PassageManager:
         if tags:
             tags = list(set(tags))
 
+        # Pad embeddings to MAX_EMBEDDING_DIM for pgvector (only when using Postgres as vector DB)
+        embedding = data["embedding"]
+        if embedding:
+            import numpy as np
+
+            from letta.helpers.tpuf_client import should_use_tpuf
+
+            # Always pad when writing to Postgres vector DB (don't pad for Turbopuffer/Pinecone)
+            if not should_use_tpuf():
+                np_embedding = np.array(embedding)
+                if np_embedding.shape[0] != MAX_EMBEDDING_DIM:
+                    embedding = np.pad(np_embedding, (0, MAX_EMBEDDING_DIM - np_embedding.shape[0]), mode="constant").tolist()
+
         common_fields = {
             "id": data.get("id"),
             "text": data["text"],
-            "embedding": data["embedding"],
+            "embedding": embedding,
             "embedding_config": data["embedding_config"],
             "organization_id": data["organization_id"],
             "metadata_": data.get("metadata", {}),
@@ -200,10 +214,23 @@ class PassageManager:
         if tags:
             tags = list(set(tags))
 
+        # Pad embeddings to MAX_EMBEDDING_DIM for pgvector (only when using Postgres as vector DB)
+        embedding = data["embedding"]
+        if embedding:
+            import numpy as np
+
+            from letta.helpers.tpuf_client import should_use_tpuf
+
+            # Always pad when writing to Postgres vector DB (don't pad for Turbopuffer/Pinecone)
+            if not should_use_tpuf():
+                np_embedding = np.array(embedding)
+                if np_embedding.shape[0] != MAX_EMBEDDING_DIM:
+                    embedding = np.pad(np_embedding, (0, MAX_EMBEDDING_DIM - np_embedding.shape[0]), mode="constant").tolist()
+
         common_fields = {
             "id": data.get("id"),
             "text": data["text"],
-            "embedding": data["embedding"],
+            "embedding": embedding,
             "embedding_config": data["embedding_config"],
             "organization_id": data["organization_id"],
             "metadata_": data.get("metadata", {}),
@@ -285,10 +312,24 @@ class PassageManager:
             raise ValueError("Archival passage cannot have source_id")
 
         data = p.model_dump(to_orm=True)
+
+        # Pad embeddings to MAX_EMBEDDING_DIM for pgvector (only when using Postgres as vector DB)
+        embedding = data["embedding"]
+        if embedding:
+            import numpy as np
+
+            from letta.helpers.tpuf_client import should_use_tpuf
+
+            # Always pad when writing to Postgres vector DB (don't pad for Turbopuffer/Pinecone)
+            if not should_use_tpuf():
+                np_embedding = np.array(embedding)
+                if np_embedding.shape[0] != MAX_EMBEDDING_DIM:
+                    embedding = np.pad(np_embedding, (0, MAX_EMBEDDING_DIM - np_embedding.shape[0]), mode="constant").tolist()
+
         common_fields = {
             "id": data.get("id"),
             "text": data["text"],
-            "embedding": data["embedding"],
+            "embedding": embedding,
             "embedding_config": data["embedding_config"],
             "organization_id": data["organization_id"],
             "metadata_": data.get("metadata", {}),
@@ -325,10 +366,24 @@ class PassageManager:
             raise ValueError("Source passage cannot have archive_id")
 
         data = p.model_dump(to_orm=True)
+
+        # Pad embeddings to MAX_EMBEDDING_DIM for pgvector (only when using Postgres as vector DB)
+        embedding = data["embedding"]
+        if embedding:
+            import numpy as np
+
+            from letta.helpers.tpuf_client import should_use_tpuf
+
+            # Always pad when writing to Postgres vector DB (don't pad for Turbopuffer/Pinecone)
+            if not should_use_tpuf():
+                np_embedding = np.array(embedding)
+                if np_embedding.shape[0] != MAX_EMBEDDING_DIM:
+                    embedding = np.pad(np_embedding, (0, MAX_EMBEDDING_DIM - np_embedding.shape[0]), mode="constant").tolist()
+
         common_fields = {
             "id": data.get("id"),
             "text": data["text"],
-            "embedding": data["embedding"],
+            "embedding": embedding,
             "embedding_config": data["embedding_config"],
             "organization_id": data["organization_id"],
             "metadata_": data.get("metadata", {}),
@@ -549,6 +604,21 @@ class PassageManager:
             # Update the tags on the passage object
             setattr(curr_passage, "tags", new_tags)
 
+        # Pad embeddings if needed (only when using Postgres as vector DB)
+        if "embedding" in update_data and update_data["embedding"]:
+            import numpy as np
+
+            from letta.helpers.tpuf_client import should_use_tpuf
+
+            # Always pad when writing to Postgres vector DB (don't pad for Turbopuffer/Pinecone)
+            if not should_use_tpuf():
+                embedding = update_data["embedding"]
+                np_embedding = np.array(embedding)
+                if np_embedding.shape[0] != MAX_EMBEDDING_DIM:
+                    update_data["embedding"] = np.pad(
+                        np_embedding, (0, MAX_EMBEDDING_DIM - np_embedding.shape[0]), mode="constant"
+                    ).tolist()
+
         # Update other fields
         for key, value in update_data.items():
             setattr(curr_passage, key, value)
@@ -578,6 +648,22 @@ class PassageManager:
 
         # Update the database record with values from the provided record
         update_data = passage.model_dump(to_orm=True, exclude_unset=True, exclude_none=True)
+
+        # Pad embeddings if needed (only when using Postgres as vector DB)
+        if "embedding" in update_data and update_data["embedding"]:
+            import numpy as np
+
+            from letta.helpers.tpuf_client import should_use_tpuf
+
+            # Always pad when writing to Postgres vector DB (don't pad for Turbopuffer/Pinecone)
+            if not should_use_tpuf():
+                embedding = update_data["embedding"]
+                np_embedding = np.array(embedding)
+                if np_embedding.shape[0] != MAX_EMBEDDING_DIM:
+                    update_data["embedding"] = np.pad(
+                        np_embedding, (0, MAX_EMBEDDING_DIM - np_embedding.shape[0]), mode="constant"
+                    ).tolist()
+
         for key, value in update_data.items():
             setattr(curr_passage, key, value)
 