From 8f1640b2ef1dfffb5515f5df5c8232d1dcee2bc4 Mon Sep 17 00:00:00 2001 From: Matthew Zhou Date: Mon, 7 Jul 2025 12:18:39 -0700 Subject: [PATCH] feat: Add batching to record upsert (#3196) --- letta/constants.py | 1 + letta/helpers/pinecone_utils.py | 14 ++++++++++++-- letta/server/rest_api/routers/v1/sources.py | 4 +++- 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/letta/constants.py b/letta/constants.py index bd8d2c49..5b7725bb 100644 --- a/letta/constants.py +++ b/letta/constants.py @@ -371,3 +371,4 @@ PINECONE_TEXT_FIELD_NAME = "chunk_text" PINECONE_METRIC = "cosine" PINECONE_CLOUD = "aws" PINECONE_REGION = "us-east-1" +PINECONE_MAX_BATCH_SIZE = 96 diff --git a/letta/helpers/pinecone_utils.py b/letta/helpers/pinecone_utils.py index d7cd1d4a..17209946 100644 --- a/letta/helpers/pinecone_utils.py +++ b/letta/helpers/pinecone_utils.py @@ -1,8 +1,16 @@ +import asyncio from typing import Any, Dict, List from pinecone import PineconeAsyncio -from letta.constants import PINECONE_CLOUD, PINECONE_EMBEDDING_MODEL, PINECONE_METRIC, PINECONE_REGION, PINECONE_TEXT_FIELD_NAME +from letta.constants import ( + PINECONE_CLOUD, + PINECONE_EMBEDDING_MODEL, + PINECONE_MAX_BATCH_SIZE, + PINECONE_METRIC, + PINECONE_REGION, + PINECONE_TEXT_FIELD_NAME, +) from letta.log import get_logger from letta.schemas.user import User from letta.settings import settings @@ -90,7 +98,9 @@ async def upsert_records_to_pinecone_index(records: List[dict], actor: User): async with PineconeAsyncio(api_key=settings.pinecone_api_key) as pc: description = await pc.describe_index(name=settings.pinecone_source_index) async with pc.IndexAsyncio(host=description.index.host) as dense_index: - await dense_index.upsert_records(actor.organization_id, records) + # Process records in batches to avoid exceeding Pinecone limits + batches = [records[i : i + PINECONE_MAX_BATCH_SIZE] for i in range(0, len(records), PINECONE_MAX_BATCH_SIZE)] + await asyncio.gather(*[dense_index.upsert_records(actor.organization_id, batch) for batch in batches]) async def search_pinecone_index(query: str, limit: int, filter: Dict[str, Any], actor: User) -> Dict[str, Any]: diff --git a/letta/server/rest_api/routers/v1/sources.py b/letta/server/rest_api/routers/v1/sources.py index 3e715e66..273d8918 100644 --- a/letta/server/rest_api/routers/v1/sources.py +++ b/letta/server/rest_api/routers/v1/sources.py @@ -343,7 +343,9 @@ async def get_file_metadata( if should_use_pinecone() and not file_metadata.is_processing_terminal(): ids = await list_pinecone_index_for_files(file_id=file_id, actor=actor, limit=file_metadata.total_chunks) - logger.info(f"Embedded chunks {len(ids)}/{file_metadata.total_chunks} for {file_id} in organization {actor.organization_id}") + logger.info( + f"Embedded chunks {len(ids)}/{file_metadata.total_chunks} for {file_id} ({file_metadata.file_name}) in organization {actor.organization_id}" + ) if len(ids) != file_metadata.chunks_embedded or len(ids) == file_metadata.total_chunks: if len(ids) != file_metadata.total_chunks: