From 6f810d95d895874a88cde72c99a7f41892a0cc63 Mon Sep 17 00:00:00 2001
From: cthomas
Date: Wed, 19 Nov 2025 13:53:31 -0800
Subject: [PATCH] feat: add semaphore to limit embeddings creation (#6261)

---
 letta/helpers/tpuf_client.py                  | 77 +++++++++++--------
 letta/llm_api/openai_client.py                | 22 +++++-
 .../embedder/openai_embedder.py               | 36 ++++++---
 .../embedder/turbopuffer_embedder.py          |  6 +-
 .../services/file_processor/file_processor.py | 27 ++++++-
 5 files changed, 119 insertions(+), 49 deletions(-)

diff --git a/letta/helpers/tpuf_client.py b/letta/helpers/tpuf_client.py
index f19a3aeb..52c4c62d 100644
--- a/letta/helpers/tpuf_client.py
+++ b/letta/helpers/tpuf_client.py
@@ -1,5 +1,6 @@
 """Turbopuffer utilities for archival memory storage."""
 
+import asyncio
 import logging
 from datetime import datetime, timezone
 from typing import Any, Callable, List, Optional, Tuple
@@ -13,6 +14,10 @@ from letta.settings import model_settings, settings
 
 logger = logging.getLogger(__name__)
 
+# Global semaphore for Turbopuffer operations to prevent overwhelming the service.
+# This is separate from the embedding semaphore, since Turbopuffer can handle more concurrency.
+_GLOBAL_TURBOPUFFER_SEMAPHORE = asyncio.Semaphore(5)
+
 
 def should_use_tpuf() -> bool:
     # We need OpenAI since we default to their embedding model
@@ -205,17 +210,19 @@
         }
 
         try:
-            # Use AsyncTurbopuffer as a context manager for proper resource cleanup
-            async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
-                namespace = client.namespace(namespace_name)
-                # turbopuffer recommends column-based writes for performance
-                await namespace.write(
-                    upsert_columns=upsert_columns,
-                    distance_metric="cosine_distance",
-                    schema={"text": {"type": "string", "full_text_search": True}},
-                )
-                logger.info(f"Successfully inserted {len(ids)} passages to Turbopuffer for archive {archive_id}")
-                return passages
+            # use global semaphore to limit concurrent Turbopuffer writes
+            async with _GLOBAL_TURBOPUFFER_SEMAPHORE:
+                # Use AsyncTurbopuffer as a context manager for proper resource cleanup
+                async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
+                    namespace = client.namespace(namespace_name)
+                    # turbopuffer recommends column-based writes for performance
+                    await namespace.write(
+                        upsert_columns=upsert_columns,
+                        distance_metric="cosine_distance",
+                        schema={"text": {"type": "string", "full_text_search": True}},
+                    )
+                    logger.info(f"Successfully inserted {len(ids)} passages to Turbopuffer for archive {archive_id}")
+                    return passages
 
         except Exception as e:
             logger.error(f"Failed to insert passages to Turbopuffer: {e}")
@@ -333,17 +340,19 @@
         upsert_columns["template_id"] = template_ids
 
         try:
-            # Use AsyncTurbopuffer as a context manager for proper resource cleanup
-            async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
-                namespace = client.namespace(namespace_name)
-                # turbopuffer recommends column-based writes for performance
-                await namespace.write(
-                    upsert_columns=upsert_columns,
-                    distance_metric="cosine_distance",
-                    schema={"text": {"type": "string", "full_text_search": True}},
-                )
-                logger.info(f"Successfully inserted {len(ids)} messages to Turbopuffer for agent {agent_id}")
-                return True
+            # use global semaphore to limit concurrent Turbopuffer writes
+            async with _GLOBAL_TURBOPUFFER_SEMAPHORE:
+                # Use AsyncTurbopuffer as a context manager for proper resource cleanup
+                async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
+                    namespace = client.namespace(namespace_name)
+                    # turbopuffer recommends column-based writes for performance
+                    await namespace.write(
+                        upsert_columns=upsert_columns,
+                        distance_metric="cosine_distance",
+                        schema={"text": {"type": "string", "full_text_search": True}},
+                    )
+                    logger.info(f"Successfully inserted {len(ids)} messages to Turbopuffer for agent {agent_id}")
+                    return True
 
         except Exception as e:
             logger.error(f"Failed to insert messages to Turbopuffer: {e}")
@@ -1259,17 +1268,19 @@
         }
 
         try:
-            # use AsyncTurbopuffer as a context manager for proper resource cleanup
-            async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
-                namespace = client.namespace(namespace_name)
-                # turbopuffer recommends column-based writes for performance
-                await namespace.write(
-                    upsert_columns=upsert_columns,
-                    distance_metric="cosine_distance",
-                    schema={"text": {"type": "string", "full_text_search": True}},
-                )
-                logger.info(f"Successfully inserted {len(ids)} file passages to Turbopuffer for source {source_id}, file {file_id}")
-                return passages
+            # use global semaphore to limit concurrent Turbopuffer writes
+            async with _GLOBAL_TURBOPUFFER_SEMAPHORE:
+                # use AsyncTurbopuffer as a context manager for proper resource cleanup
+                async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
+                    namespace = client.namespace(namespace_name)
+                    # turbopuffer recommends column-based writes for performance
+                    await namespace.write(
+                        upsert_columns=upsert_columns,
+                        distance_metric="cosine_distance",
+                        schema={"text": {"type": "string", "full_text_search": True}},
+                    )
+                    logger.info(f"Successfully inserted {len(ids)} file passages to Turbopuffer for source {source_id}, file {file_id}")
+                    return passages
 
         except Exception as e:
             logger.error(f"Failed to insert file passages to Turbopuffer: {e}")
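
The pattern repeated in all three hunks above (a module-level asyncio.Semaphore gating an async context manager) can be exercised in isolation. A minimal sketch, with a stubbed write standing in for the real AsyncTurbopuffer call; all names below are illustrative, not part of the patch:

import asyncio

_SEM = asyncio.Semaphore(5)  # mirrors _GLOBAL_TURBOPUFFER_SEMAPHORE

async def write_batch(batch_id: int) -> None:
    # acquire a global slot before opening the (stubbed) client
    async with _SEM:
        await asyncio.sleep(0.1)  # stand-in for namespace.write(...)
        print(f"batch {batch_id} written")

async def main() -> None:
    # 20 concurrent callers, but at most 5 writes in flight at once
    await asyncio.gather(*(write_batch(i) for i in range(20)))

asyncio.run(main())

Note that the semaphore wraps the entire client context, so connection setup and teardown also count against the five-writer limit.
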
diff --git a/letta/llm_api/openai_client.py b/letta/llm_api/openai_client.py
index 232eb6d0..930e29b0 100644
--- a/letta/llm_api/openai_client.py
+++ b/letta/llm_api/openai_client.py
@@ -1,5 +1,6 @@
 import asyncio
 import os
+import time
 from typing import List, Optional
 
 import openai
@@ -748,7 +749,8 @@ class OpenAIClient(LLMClientBase):
         if not inputs:
             return []
 
-        logger.info(f"request_embeddings called with {len(inputs)} inputs, model={embedding_config.embedding_model}")
+        request_start = time.time()
+        logger.info(f"DIAGNOSTIC: request_embeddings called with {len(inputs)} inputs, model={embedding_config.embedding_model}")
 
         # Validate inputs - OpenAI rejects empty strings or non-string values
         # See: https://community.openai.com/t/embedding-api-change-input-is-invalid/707490/7
@@ -794,7 +796,7 @@ class OpenAIClient(LLMClientBase):
                     continue
 
                 logger.info(
-                    f"Creating embedding task: start_idx={start_idx}, batch_size={len(chunk_inputs)}, "
+                    f"DIAGNOSTIC: Creating embedding task: start_idx={start_idx}, batch_size={len(chunk_inputs)}, "
                     f"first_input_len={len(chunk_inputs[0]) if chunk_inputs else 0}, "
                     f"model={embedding_config.embedding_model}"
                 )
@@ -806,7 +808,15 @@ class OpenAIClient(LLMClientBase):
                 logger.warning("All chunks were empty, skipping embedding request")
                 break
 
+            gather_start = time.time()
+            logger.info(f"DIAGNOSTIC: Awaiting {len(tasks)} embedding API calls...")
             task_results = await asyncio.gather(*tasks, return_exceptions=True)
+            gather_duration = time.time() - gather_start
+
+            if gather_duration > 1.0:
+                logger.warning(f"DIAGNOSTIC: SLOW embedding API gather took {gather_duration:.2f}s for {len(tasks)} tasks")
+            else:
+                logger.info(f"DIAGNOSTIC: Embedding API gather completed in {gather_duration:.2f}s")
 
             failed_chunks = []
             for (start_idx, chunk_inputs, current_batch_size), result in zip(task_metadata, task_results):
@@ -849,6 +859,14 @@ class OpenAIClient(LLMClientBase):
 
             chunks_to_process = failed_chunks
 
+        total_duration = time.time() - request_start
+        if total_duration > 2.0:
+            logger.error(f"DIAGNOSTIC: BLOCKING DETECTED - request_embeddings took {total_duration:.2f}s for {len(inputs)} inputs")
+        elif total_duration > 1.0:
+            logger.warning(f"DIAGNOSTIC: Slow request_embeddings took {total_duration:.2f}s for {len(inputs)} inputs")
+        else:
+            logger.info(f"DIAGNOSTIC: request_embeddings completed in {total_duration:.2f}s")
+
         return results
 
     @trace_method
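
The value of the DIAGNOSTIC timing above is that a single wall-clock measurement around asyncio.gather separates slow upstream responses from an event loop that is being starved. A minimal sketch of the same instrumentation, with a stubbed coroutine in place of the real embeddings request (names are illustrative):

import asyncio
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def fake_embedding_call(i: int) -> list:
    await asyncio.sleep(0.2)  # stand-in for one embeddings API request
    return [0.0] * 8

async def gather_with_diagnostics(n_tasks: int) -> None:
    gather_start = time.time()
    await asyncio.gather(*(fake_embedding_call(i) for i in range(n_tasks)), return_exceptions=True)
    gather_duration = time.time() - gather_start
    # same threshold idea as the patch: for I/O-bound tasks running
    # concurrently, wall time far above one call's latency is a red flag
    if gather_duration > 1.0:
        logger.warning("SLOW gather: %.2fs for %d tasks", gather_duration, n_tasks)
    else:
        logger.info("gather completed in %.2fs", gather_duration)

asyncio.run(gather_with_diagnostics(16))

Since the sixteen stubbed calls run concurrently, the expected wall time is roughly 0.2s rather than 3.2s; a result near the serial total is exactly what the BLOCKING DETECTED branch is meant to surface.
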
diff --git a/letta/services/file_processor/embedder/openai_embedder.py b/letta/services/file_processor/embedder/openai_embedder.py
index dcb67866..4f979e1f 100644
--- a/letta/services/file_processor/embedder/openai_embedder.py
+++ b/letta/services/file_processor/embedder/openai_embedder.py
@@ -1,4 +1,5 @@
 import asyncio
+import time
 from typing import List, Optional, Tuple, cast
 
 from letta.llm_api.llm_client import LLMClient
@@ -14,6 +15,10 @@ from letta.settings import model_settings
 
 logger = get_logger(__name__)
 
+# Global semaphore shared across ALL embedding operations to prevent overwhelming the OpenAI API.
+# This ensures that even when processing multiple files concurrently, we don't exceed rate limits.
+_GLOBAL_EMBEDDING_SEMAPHORE = asyncio.Semaphore(3)
+
 
 class OpenAIEmbedder(BaseEmbedder):
     """OpenAI-based embedding generation"""
@@ -134,6 +139,7 @@
         chunk_indices = [i for i, _ in valid_chunks]
         chunks_to_embed = [chunk for _, chunk in valid_chunks]
 
+        embedding_start = time.time()
         logger.info(f"Generating embeddings for {len(chunks_to_embed)} chunks using {self.embedding_config.embedding_model}")
         log_event(
             "embedder.generation_started",
@@ -163,20 +169,23 @@
             {"total_batches": len(batches), "batch_size": self.embedding_config.batch_size, "total_chunks": len(chunks_to_embed)},
         )
 
+        # Use the global semaphore to limit concurrent embedding requests across ALL file processing.
+        # This prevents rate-limit errors even when processing multiple files simultaneously.
        async def process(batch: List[str], indices: List[int]):
-            try:
-                return await self._embed_batch(batch, indices)
-            except Exception as e:
-                logger.error("Failed to embed batch of size %s: %s", len(batch), e)
-                log_event("embedder.batch_failed", {"batch_size": len(batch), "error": str(e), "error_type": type(e).__name__})
-                raise
+            async with _GLOBAL_EMBEDDING_SEMAPHORE:
+                try:
+                    return await self._embed_batch(batch, indices)
+                except Exception as e:
+                    logger.error("Failed to embed batch of size %s: %s", len(batch), e)
+                    log_event("embedder.batch_failed", {"batch_size": len(batch), "error": str(e), "error_type": type(e).__name__})
+                    raise
 
-        # Execute all batches concurrently with semaphore control
+        # Execute all batches with global semaphore control to limit concurrency
         tasks = [process(batch, indices) for batch, indices in zip(batches, batch_indices)]
         log_event(
             "embedder.concurrent_processing_started",
-            {"concurrent_tasks": len(tasks)},
+            {"concurrent_tasks": len(tasks), "max_concurrent_global": 3},
         )
         results = await asyncio.gather(*tasks)
         log_event("embedder.concurrent_processing_completed", {"batches_processed": len(results)})
@@ -202,9 +211,16 @@
             )
             passages.append(passage)
 
-        logger.info(f"Successfully generated {len(passages)} embeddings")
+        embedding_duration = time.time() - embedding_start
+        logger.info(f"Successfully generated {len(passages)} embeddings (took {embedding_duration:.2f}s)")
         log_event(
             "embedder.generation_completed",
-            {"passages_created": len(passages), "total_chunks_processed": len(chunks_to_embed), "file_id": file_id, "source_id": source_id},
+            {
+                "passages_created": len(passages),
+                "total_chunks_processed": len(chunks_to_embed),
+                "file_id": file_id,
+                "source_id": source_id,
+                "duration_seconds": embedding_duration,
+            },
         )
         return passages
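
Because _GLOBAL_EMBEDDING_SEMAPHORE lives at module scope rather than on the OpenAIEmbedder instance, every file being processed competes for the same three slots. A minimal sketch of that sharing, assuming two hypothetical files that each fan out several batches (all names illustrative):

import asyncio

_GLOBAL_SEMAPHORE = asyncio.Semaphore(3)  # shared by all embedder instances

async def embed_batch(file_name: str, batch_no: int) -> None:
    # every batch from every file passes through the same gate
    async with _GLOBAL_SEMAPHORE:
        await asyncio.sleep(0.1)  # stand-in for one embeddings API call

async def process_file(file_name: str) -> None:
    # each file fans out its batches concurrently, but total in-flight
    # requests across all files never exceed three
    await asyncio.gather(*(embed_batch(file_name, b) for b in range(5)))

async def main() -> None:
    await asyncio.gather(process_file("a.pdf"), process_file("b.pdf"))

asyncio.run(main())

One design consequence worth noting: the hardcoded 3 in the max_concurrent_global log field must be kept in sync with the Semaphore(3) literal by hand; deriving both from a single constant would avoid drift.
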
diff --git a/letta/services/file_processor/embedder/turbopuffer_embedder.py b/letta/services/file_processor/embedder/turbopuffer_embedder.py
index 1b4617ce..f3b663cb 100644
--- a/letta/services/file_processor/embedder/turbopuffer_embedder.py
+++ b/letta/services/file_processor/embedder/turbopuffer_embedder.py
@@ -1,3 +1,4 @@
+import time
 from typing import List, Optional
 
 from letta.helpers.tpuf_client import TurbopufferClient
@@ -65,6 +66,7 @@
 
         try:
             # insert passages to Turbopuffer - it will handle embedding generation internally
+            embedding_start = time.time()
             passages = await self.tpuf_client.insert_file_passages(
                 source_id=source_id,
                 file_id=file_id,
@@ -72,8 +74,9 @@
                 organization_id=actor.organization_id,
                 actor=actor,
             )
+            embedding_duration = time.time() - embedding_start
 
-            logger.info(f"Successfully generated and stored {len(passages)} passages in Turbopuffer")
+            logger.info(f"Successfully generated and stored {len(passages)} passages in Turbopuffer (took {embedding_duration:.2f}s)")
             log_event(
                 "turbopuffer_embedder.generation_completed",
                 {
                     "passages_created": len(passages),
                     "total_chunks_processed": len(valid_chunks),
                     "file_id": file_id,
                     "source_id": source_id,
+                    "duration_seconds": embedding_duration,
                 },
             )
             return passages
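
The start/duration bookkeeping here, and again in file_processor.py below, follows the same shape: record time.time() before the awaited call, subtract after, attach the result to the log event. If the pattern keeps spreading, it could be factored into a small helper; a sketch under that assumption (the timed() helper is hypothetical, not part of this patch):

import time
from contextlib import contextmanager

@contextmanager
def timed(metrics: dict):
    # records wall-clock duration into `metrics` on exit, even if the body raises
    start = time.time()
    try:
        yield
    finally:
        metrics["duration_seconds"] = time.time() - start

metrics = {}
with timed(metrics):
    time.sleep(0.05)  # stand-in for the awaited insert_file_passages(...) call
print(metrics)  # {'duration_seconds': ~0.05}

A synchronous with block like this works around await expressions too, since the context manager only runs code on entry and exit.
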
diff --git a/letta/services/file_processor/file_processor.py b/letta/services/file_processor/file_processor.py
index 83260c06..ed4e0c5b 100644
--- a/letta/services/file_processor/file_processor.py
+++ b/letta/services/file_processor/file_processor.py
@@ -1,3 +1,5 @@
+import asyncio
+import time
 from typing import List
 
 from mistralai import OCRPageObject, OCRResponse, OCRUsageInfo
@@ -55,7 +57,14 @@
         try:
             all_chunks = []
             for page in ocr_response.pages:
-                chunks = text_chunker.chunk_text(page)
+                # Run CPU-intensive chunking in a thread pool to avoid blocking the event loop
+                chunking_start = time.time()
+                chunks = await asyncio.to_thread(text_chunker.chunk_text, page)
+                chunking_duration = time.time() - chunking_start
+
+                if chunking_duration > 0.5:
+                    logger.warning(f"Slow chunking operation for {filename}: {chunking_duration:.2f}s")
+
                 if not chunks:
                     log_event(
                         "file_processor.chunking_failed",
@@ -97,7 +106,14 @@
 
         all_chunks = []
         for page in ocr_response.pages:
-            chunks = text_chunker.default_chunk_text(page)
+            # Run CPU-intensive default chunking in a thread pool to avoid blocking the event loop
+            chunking_start = time.time()
+            chunks = await asyncio.to_thread(text_chunker.default_chunk_text, page)
+            chunking_duration = time.time() - chunking_start
+
+            if chunking_duration > 0.5:
+                logger.warning(f"Slow default chunking operation for {filename}: {chunking_duration:.2f}s")
+
             if not chunks:
                 log_event(
                     "file_processor.default_chunking_failed",
@@ -308,6 +324,7 @@
             return []
 
         content = file_metadata.content
+        processing_start = time.time()
 
         try:
             # Create OCR response from existing content
             ocr_response = self._create_ocr_response_from_content(content)
@@ -345,7 +362,10 @@
                 file_id=file_metadata.id, actor=self.actor, total_chunks=len(all_passages), chunks_embedded=0
             )
 
-            logger.info(f"Successfully processed imported file {filename}: {len(all_passages)} passages")
+            processing_duration = time.time() - processing_start
+            logger.info(
+                f"Successfully processed imported file {filename}: {len(all_passages)} passages (total time: {processing_duration:.2f}s)"
+            )
             log_event(
                 "file_processor.import_processing_completed",
                 {
@@ -353,6 +373,7 @@
                     "filename": filename,
                     "file_id": str(file_metadata.id),
                     "total_passages": len(all_passages),
                     "status": FileProcessingStatus.COMPLETED.value,
+                    "total_duration_seconds": processing_duration,
                 },
             )
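
asyncio.to_thread is what keeps the chunking change from stalling the event loop: the CPU-bound function runs on a worker thread while other coroutines keep being scheduled. A self-contained sketch with a deliberately expensive stand-in for text_chunker.chunk_text (all names are illustrative):

import asyncio
import time

def cpu_heavy_chunking(text: str) -> list:
    # stand-in for chunk_text: pure-CPU work that would otherwise
    # block the event loop for its full duration
    deadline = time.time() + 0.5
    while time.time() < deadline:
        pass
    return text.split()

async def heartbeat() -> None:
    for _ in range(5):
        await asyncio.sleep(0.1)
        print("event loop is still responsive")

async def main() -> None:
    chunks, _ = await asyncio.gather(
        asyncio.to_thread(cpu_heavy_chunking, "some page text"),
        heartbeat(),
    )
    print(chunks)

asyncio.run(main())

One caveat: a pure-Python busy loop still contends for the GIL, so the worker thread and the event loop share the interpreter in small time slices (see sys.getswitchinterval()); the loop stays responsive, but true parallelism requires chunking code that releases the GIL in native extensions, or a process pool.
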