feat: Add batching to record upsert (#3196)
This commit is contained in:
@@ -371,3 +371,4 @@ PINECONE_TEXT_FIELD_NAME = "chunk_text"
|
||||
PINECONE_METRIC = "cosine"
|
||||
PINECONE_CLOUD = "aws"
|
||||
PINECONE_REGION = "us-east-1"
|
||||
PINECONE_MAX_BATCH_SIZE = 96
|
||||
|
||||
@@ -1,8 +1,16 @@
|
||||
import asyncio
|
||||
from typing import Any, Dict, List
|
||||
|
||||
from pinecone import PineconeAsyncio
|
||||
|
||||
from letta.constants import PINECONE_CLOUD, PINECONE_EMBEDDING_MODEL, PINECONE_METRIC, PINECONE_REGION, PINECONE_TEXT_FIELD_NAME
|
||||
from letta.constants import (
|
||||
PINECONE_CLOUD,
|
||||
PINECONE_EMBEDDING_MODEL,
|
||||
PINECONE_MAX_BATCH_SIZE,
|
||||
PINECONE_METRIC,
|
||||
PINECONE_REGION,
|
||||
PINECONE_TEXT_FIELD_NAME,
|
||||
)
|
||||
from letta.log import get_logger
|
||||
from letta.schemas.user import User
|
||||
from letta.settings import settings
|
||||
@@ -90,7 +98,9 @@ async def upsert_records_to_pinecone_index(records: List[dict], actor: User):
|
||||
async with PineconeAsyncio(api_key=settings.pinecone_api_key) as pc:
|
||||
description = await pc.describe_index(name=settings.pinecone_source_index)
|
||||
async with pc.IndexAsyncio(host=description.index.host) as dense_index:
|
||||
await dense_index.upsert_records(actor.organization_id, records)
|
||||
# Process records in batches to avoid exceeding Pinecone limits
|
||||
batches = [records[i : i + PINECONE_MAX_BATCH_SIZE] for i in range(0, len(records), PINECONE_MAX_BATCH_SIZE)]
|
||||
await asyncio.gather(*[dense_index.upsert_records(actor.organization_id, batch) for batch in batches])
|
||||
|
||||
|
||||
async def search_pinecone_index(query: str, limit: int, filter: Dict[str, Any], actor: User) -> Dict[str, Any]:
|
||||
|
||||
@@ -343,7 +343,9 @@ async def get_file_metadata(
|
||||
|
||||
if should_use_pinecone() and not file_metadata.is_processing_terminal():
|
||||
ids = await list_pinecone_index_for_files(file_id=file_id, actor=actor, limit=file_metadata.total_chunks)
|
||||
logger.info(f"Embedded chunks {len(ids)}/{file_metadata.total_chunks} for {file_id} in organization {actor.organization_id}")
|
||||
logger.info(
|
||||
f"Embedded chunks {len(ids)}/{file_metadata.total_chunks} for {file_id} ({file_metadata.file_name}) in organization {actor.organization_id}"
|
||||
)
|
||||
|
||||
if len(ids) != file_metadata.chunks_embedded or len(ids) == file_metadata.total_chunks:
|
||||
if len(ids) != file_metadata.total_chunks:
|
||||
|
||||
Reference in New Issue
Block a user