fix: address high query latency with step id index on messages (#6068)
* add step id index on messages * simple index * fix sources padding * fix import * fix passage manager * fix ci * I think fixed * disable turbopuffer for managers to avoid failing passages tests
This commit is contained in:
@@ -0,0 +1,36 @@
|
||||
"""add_index_on_messages_step_id
|
||||
|
||||
Revision ID: d798609d65ff
|
||||
Revises: 89fd4648866b
|
||||
Create Date: 2025-11-07 15:43:59.446292
|
||||
|
||||
"""
|
||||
|
||||
from typing import Sequence, Union
|
||||
|
||||
import sqlalchemy as sa
|
||||
|
||||
from alembic import op
|
||||
from letta.settings import settings
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = "d798609d65ff"
|
||||
down_revision: Union[str, None] = "89fd4648866b"
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
    """Add an index on ``messages.step_id`` to speed up step-based lookups.

    Runs only against Postgres; SQLite deployments are skipped because the
    index is managed there by the ORM / is not needed.
    """
    # Presence of a Postgres URI is the signal that we are not on SQLite.
    if settings.letta_pg_uri_no_default:
        # if_not_exists makes the migration idempotent on re-runs.
        op.create_index("idx_messages_step_id", "messages", ["step_id"], if_not_exists=True)
|
||||
|
||||
|
||||
def downgrade() -> None:
    """Drop the ``messages.step_id`` index added by this revision.

    Mirrors :func:`upgrade`: a no-op on SQLite, guarded drop on Postgres.
    """
    if settings.letta_pg_uri_no_default:
        # if_exists keeps the downgrade safe even if the index was never created.
        op.drop_index("idx_messages_step_id", table_name="messages", if_exists=True)
|
||||
@@ -23,6 +23,7 @@ from letta.schemas.passage import Passage as PydanticPassage
|
||||
from letta.schemas.user import User as PydanticUser
|
||||
from letta.server.db import db_registry
|
||||
from letta.services.archive_manager import ArchiveManager
|
||||
from letta.settings import settings
|
||||
from letta.utils import enforce_types
|
||||
|
||||
logger = get_logger(__name__)
|
||||
@@ -152,10 +153,23 @@ class PassageManager:
|
||||
if tags:
|
||||
tags = list(set(tags))
|
||||
|
||||
# Pad embeddings to MAX_EMBEDDING_DIM for pgvector (only when using Postgres as vector DB)
|
||||
embedding = data["embedding"]
|
||||
if embedding:
|
||||
import numpy as np
|
||||
|
||||
from letta.helpers.tpuf_client import should_use_tpuf
|
||||
|
||||
# Always pad when writing to Postgres vector DB (don't pad for Turbopuffer/Pinecone)
|
||||
if not should_use_tpuf():
|
||||
np_embedding = np.array(embedding)
|
||||
if np_embedding.shape[0] != MAX_EMBEDDING_DIM:
|
||||
embedding = np.pad(np_embedding, (0, MAX_EMBEDDING_DIM - np_embedding.shape[0]), mode="constant").tolist()
|
||||
|
||||
common_fields = {
|
||||
"id": data.get("id"),
|
||||
"text": data["text"],
|
||||
"embedding": data["embedding"],
|
||||
"embedding": embedding,
|
||||
"embedding_config": data["embedding_config"],
|
||||
"organization_id": data["organization_id"],
|
||||
"metadata_": data.get("metadata", {}),
|
||||
@@ -200,10 +214,23 @@ class PassageManager:
|
||||
if tags:
|
||||
tags = list(set(tags))
|
||||
|
||||
# Pad embeddings to MAX_EMBEDDING_DIM for pgvector (only when using Postgres as vector DB)
|
||||
embedding = data["embedding"]
|
||||
if embedding:
|
||||
import numpy as np
|
||||
|
||||
from letta.helpers.tpuf_client import should_use_tpuf
|
||||
|
||||
# Always pad when writing to Postgres vector DB (don't pad for Turbopuffer/Pinecone)
|
||||
if not should_use_tpuf():
|
||||
np_embedding = np.array(embedding)
|
||||
if np_embedding.shape[0] != MAX_EMBEDDING_DIM:
|
||||
embedding = np.pad(np_embedding, (0, MAX_EMBEDDING_DIM - np_embedding.shape[0]), mode="constant").tolist()
|
||||
|
||||
common_fields = {
|
||||
"id": data.get("id"),
|
||||
"text": data["text"],
|
||||
"embedding": data["embedding"],
|
||||
"embedding": embedding,
|
||||
"embedding_config": data["embedding_config"],
|
||||
"organization_id": data["organization_id"],
|
||||
"metadata_": data.get("metadata", {}),
|
||||
@@ -285,10 +312,24 @@ class PassageManager:
|
||||
raise ValueError("Archival passage cannot have source_id")
|
||||
|
||||
data = p.model_dump(to_orm=True)
|
||||
|
||||
# Pad embeddings to MAX_EMBEDDING_DIM for pgvector (only when using Postgres as vector DB)
|
||||
embedding = data["embedding"]
|
||||
if embedding:
|
||||
import numpy as np
|
||||
|
||||
from letta.helpers.tpuf_client import should_use_tpuf
|
||||
|
||||
# Always pad when writing to Postgres vector DB (don't pad for Turbopuffer/Pinecone)
|
||||
if not should_use_tpuf():
|
||||
np_embedding = np.array(embedding)
|
||||
if np_embedding.shape[0] != MAX_EMBEDDING_DIM:
|
||||
embedding = np.pad(np_embedding, (0, MAX_EMBEDDING_DIM - np_embedding.shape[0]), mode="constant").tolist()
|
||||
|
||||
common_fields = {
|
||||
"id": data.get("id"),
|
||||
"text": data["text"],
|
||||
"embedding": data["embedding"],
|
||||
"embedding": embedding,
|
||||
"embedding_config": data["embedding_config"],
|
||||
"organization_id": data["organization_id"],
|
||||
"metadata_": data.get("metadata", {}),
|
||||
@@ -325,10 +366,24 @@ class PassageManager:
|
||||
raise ValueError("Source passage cannot have archive_id")
|
||||
|
||||
data = p.model_dump(to_orm=True)
|
||||
|
||||
# Pad embeddings to MAX_EMBEDDING_DIM for pgvector (always pad when writing to Postgres)
|
||||
embedding = data["embedding"]
|
||||
if embedding:
|
||||
import numpy as np
|
||||
|
||||
from letta.helpers.tpuf_client import should_use_tpuf
|
||||
|
||||
# Always pad when writing to Postgres vector DB (don't pad for Turbopuffer/Pinecone)
|
||||
if not should_use_tpuf():
|
||||
np_embedding = np.array(embedding)
|
||||
if np_embedding.shape[0] != MAX_EMBEDDING_DIM:
|
||||
embedding = np.pad(np_embedding, (0, MAX_EMBEDDING_DIM - np_embedding.shape[0]), mode="constant").tolist()
|
||||
|
||||
common_fields = {
|
||||
"id": data.get("id"),
|
||||
"text": data["text"],
|
||||
"embedding": data["embedding"],
|
||||
"embedding": embedding,
|
||||
"embedding_config": data["embedding_config"],
|
||||
"organization_id": data["organization_id"],
|
||||
"metadata_": data.get("metadata", {}),
|
||||
@@ -549,6 +604,21 @@ class PassageManager:
|
||||
# Update the tags on the passage object
|
||||
setattr(curr_passage, "tags", new_tags)
|
||||
|
||||
# Pad embeddings if needed (only when using Postgres as vector DB)
|
||||
if "embedding" in update_data and update_data["embedding"]:
|
||||
import numpy as np
|
||||
|
||||
from letta.helpers.tpuf_client import should_use_tpuf
|
||||
|
||||
# Always pad when writing to Postgres vector DB (don't pad for Turbopuffer/Pinecone)
|
||||
if not should_use_tpuf():
|
||||
embedding = update_data["embedding"]
|
||||
np_embedding = np.array(embedding)
|
||||
if np_embedding.shape[0] != MAX_EMBEDDING_DIM:
|
||||
update_data["embedding"] = np.pad(
|
||||
np_embedding, (0, MAX_EMBEDDING_DIM - np_embedding.shape[0]), mode="constant"
|
||||
).tolist()
|
||||
|
||||
# Update other fields
|
||||
for key, value in update_data.items():
|
||||
setattr(curr_passage, key, value)
|
||||
@@ -578,6 +648,22 @@ class PassageManager:
|
||||
|
||||
# Update the database record with values from the provided record
|
||||
update_data = passage.model_dump(to_orm=True, exclude_unset=True, exclude_none=True)
|
||||
|
||||
# Pad embeddings if needed (only when using Postgres as vector DB)
|
||||
if "embedding" in update_data and update_data["embedding"]:
|
||||
import numpy as np
|
||||
|
||||
from letta.helpers.tpuf_client import should_use_tpuf
|
||||
|
||||
# Always pad when writing to Postgres vector DB (don't pad for Turbopuffer/Pinecone)
|
||||
if not should_use_tpuf():
|
||||
embedding = update_data["embedding"]
|
||||
np_embedding = np.array(embedding)
|
||||
if np_embedding.shape[0] != MAX_EMBEDDING_DIM:
|
||||
update_data["embedding"] = np.pad(
|
||||
np_embedding, (0, MAX_EMBEDDING_DIM - np_embedding.shape[0]), mode="constant"
|
||||
).tolist()
|
||||
|
||||
for key, value in update_data.items():
|
||||
setattr(curr_passage, key, value)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user