feat: Throttle pinecone uploads (#3373)
This commit is contained in:
@@ -378,3 +378,4 @@ PINECONE_MAX_RETRY_ATTEMPTS = 3
|
||||
PINECONE_RETRY_BASE_DELAY = 1.0 # seconds
|
||||
PINECONE_RETRY_MAX_DELAY = 60.0 # seconds
|
||||
PINECONE_RETRY_BACKOFF_FACTOR = 2.0
|
||||
PINECONE_THROTTLE_DELAY = 0.75 # base delay between batches, in seconds
|
||||
|
||||
@@ -31,6 +31,7 @@ from letta.constants import (
|
||||
PINECONE_RETRY_BASE_DELAY,
|
||||
PINECONE_RETRY_MAX_DELAY,
|
||||
PINECONE_TEXT_FIELD_NAME,
|
||||
PINECONE_THROTTLE_DELAY,
|
||||
)
|
||||
from letta.log import get_logger
|
||||
from letta.schemas.user import User
|
||||
@@ -256,6 +257,13 @@ async def upsert_records_to_pinecone_index(records: List[dict], actor: User):
|
||||
logger.debug(f"[Pinecone] Upserting batch {batch_num}/{total_batches} with {len(batch)} records")
|
||||
await dense_index.upsert_records(actor.organization_id, batch)
|
||||
|
||||
# throttle between batches (except the last one)
|
||||
if batch_num < total_batches:
|
||||
jitter = random.uniform(0, PINECONE_THROTTLE_DELAY * 0.2) # additive jitter: 0 to +20% of the base delay (never shortens it)
|
||||
throttle_delay = PINECONE_THROTTLE_DELAY + jitter
|
||||
logger.debug(f"[Pinecone] Throttling for {throttle_delay:.3f}s before next batch")
|
||||
await asyncio.sleep(throttle_delay)
|
||||
|
||||
logger.info(f"[Pinecone] Successfully upserted all {len(records)} records in {total_batches} batches")
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user