diff --git a/letta/constants.py b/letta/constants.py index 05711ab3..210319ab 100644 --- a/letta/constants.py +++ b/letta/constants.py @@ -378,3 +378,4 @@ PINECONE_MAX_RETRY_ATTEMPTS = 3 PINECONE_RETRY_BASE_DELAY = 1.0 # seconds PINECONE_RETRY_MAX_DELAY = 60.0 # seconds PINECONE_RETRY_BACKOFF_FACTOR = 2.0 +PINECONE_THROTTLE_DELAY = 0.75 # seconds base delay between batches diff --git a/letta/helpers/pinecone_utils.py b/letta/helpers/pinecone_utils.py index 7caa5280..e014061b 100644 --- a/letta/helpers/pinecone_utils.py +++ b/letta/helpers/pinecone_utils.py @@ -31,6 +31,7 @@ from letta.constants import ( PINECONE_RETRY_BASE_DELAY, PINECONE_RETRY_MAX_DELAY, PINECONE_TEXT_FIELD_NAME, + PINECONE_THROTTLE_DELAY, ) from letta.log import get_logger from letta.schemas.user import User @@ -256,6 +257,13 @@ async def upsert_records_to_pinecone_index(records: List[dict], actor: User): logger.debug(f"[Pinecone] Upserting batch {batch_num}/{total_batches} with {len(batch)} records") await dense_index.upsert_records(actor.organization_id, batch) + # throttle between batches (except the last one) + if batch_num < total_batches: + jitter = random.uniform(0, PINECONE_THROTTLE_DELAY * 0.2) # ±20% jitter + throttle_delay = PINECONE_THROTTLE_DELAY + jitter + logger.debug(f"[Pinecone] Throttling for {throttle_delay:.3f}s before next batch") + await asyncio.sleep(throttle_delay) + logger.info(f"[Pinecone] Successfully upserted all {len(records)} records in {total_batches} batches")