feat: add semaphore to limit embeddings creation (#6261)

This commit is contained in:
cthomas
2025-11-19 13:53:31 -08:00
committed by Caren Thomas
parent ccafc6bef4
commit 6f810d95d8
5 changed files with 119 additions and 49 deletions

View File

@@ -1,5 +1,6 @@
"""Turbopuffer utilities for archival memory storage."""
import asyncio
import logging
from datetime import datetime, timezone
from typing import Any, Callable, List, Optional, Tuple
@@ -13,6 +14,10 @@ from letta.settings import model_settings, settings
logger = logging.getLogger(__name__)
# Global semaphore for Turbopuffer operations to prevent overwhelming the service.
# At most 5 holders may run at once; the visible users are the insert/write paths in this module.
# This is separate from the embedding semaphore since Turbopuffer can handle more concurrency.
# NOTE(review): this object is created at import time and shared module-wide. asyncio
# synchronization primitives are tied to a single event loop, so confirm that every
# caller runs on the same loop (e.g. no per-test or per-thread loops).
_GLOBAL_TURBOPUFFER_SEMAPHORE = asyncio.Semaphore(5)
def should_use_tpuf() -> bool:
# We need OpenAI since we default to their embedding model
@@ -205,17 +210,19 @@ class TurbopufferClient:
}
try:
# Use AsyncTurbopuffer as a context manager for proper resource cleanup
async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
namespace = client.namespace(namespace_name)
# turbopuffer recommends column-based writes for performance
await namespace.write(
upsert_columns=upsert_columns,
distance_metric="cosine_distance",
schema={"text": {"type": "string", "full_text_search": True}},
)
logger.info(f"Successfully inserted {len(ids)} passages to Turbopuffer for archive {archive_id}")
return passages
# use global semaphore to limit concurrent Turbopuffer writes
async with _GLOBAL_TURBOPUFFER_SEMAPHORE:
# Use AsyncTurbopuffer as a context manager for proper resource cleanup
async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
namespace = client.namespace(namespace_name)
# turbopuffer recommends column-based writes for performance
await namespace.write(
upsert_columns=upsert_columns,
distance_metric="cosine_distance",
schema={"text": {"type": "string", "full_text_search": True}},
)
logger.info(f"Successfully inserted {len(ids)} passages to Turbopuffer for archive {archive_id}")
return passages
except Exception as e:
logger.error(f"Failed to insert passages to Turbopuffer: {e}")
@@ -333,17 +340,19 @@ class TurbopufferClient:
upsert_columns["template_id"] = template_ids
try:
# Use AsyncTurbopuffer as a context manager for proper resource cleanup
async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
namespace = client.namespace(namespace_name)
# turbopuffer recommends column-based writes for performance
await namespace.write(
upsert_columns=upsert_columns,
distance_metric="cosine_distance",
schema={"text": {"type": "string", "full_text_search": True}},
)
logger.info(f"Successfully inserted {len(ids)} messages to Turbopuffer for agent {agent_id}")
return True
# use global semaphore to limit concurrent Turbopuffer writes
async with _GLOBAL_TURBOPUFFER_SEMAPHORE:
# Use AsyncTurbopuffer as a context manager for proper resource cleanup
async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
namespace = client.namespace(namespace_name)
# turbopuffer recommends column-based writes for performance
await namespace.write(
upsert_columns=upsert_columns,
distance_metric="cosine_distance",
schema={"text": {"type": "string", "full_text_search": True}},
)
logger.info(f"Successfully inserted {len(ids)} messages to Turbopuffer for agent {agent_id}")
return True
except Exception as e:
logger.error(f"Failed to insert messages to Turbopuffer: {e}")
@@ -1259,17 +1268,19 @@ class TurbopufferClient:
}
try:
# use AsyncTurbopuffer as a context manager for proper resource cleanup
async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
namespace = client.namespace(namespace_name)
# turbopuffer recommends column-based writes for performance
await namespace.write(
upsert_columns=upsert_columns,
distance_metric="cosine_distance",
schema={"text": {"type": "string", "full_text_search": True}},
)
logger.info(f"Successfully inserted {len(ids)} file passages to Turbopuffer for source {source_id}, file {file_id}")
return passages
# use global semaphore to limit concurrent Turbopuffer writes
async with _GLOBAL_TURBOPUFFER_SEMAPHORE:
# use AsyncTurbopuffer as a context manager for proper resource cleanup
async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
namespace = client.namespace(namespace_name)
# turbopuffer recommends column-based writes for performance
await namespace.write(
upsert_columns=upsert_columns,
distance_metric="cosine_distance",
schema={"text": {"type": "string", "full_text_search": True}},
)
logger.info(f"Successfully inserted {len(ids)} file passages to Turbopuffer for source {source_id}, file {file_id}")
return passages
except Exception as e:
logger.error(f"Failed to insert file passages to Turbopuffer: {e}")

View File

@@ -1,5 +1,6 @@
import asyncio
import os
import time
from typing import List, Optional
import openai
@@ -748,7 +749,8 @@ class OpenAIClient(LLMClientBase):
if not inputs:
return []
logger.info(f"request_embeddings called with {len(inputs)} inputs, model={embedding_config.embedding_model}")
request_start = time.time()
logger.info(f"DIAGNOSTIC: request_embeddings called with {len(inputs)} inputs, model={embedding_config.embedding_model}")
# Validate inputs - OpenAI rejects empty strings or non-string values
# See: https://community.openai.com/t/embedding-api-change-input-is-invalid/707490/7
@@ -794,7 +796,7 @@ class OpenAIClient(LLMClientBase):
continue
logger.info(
f"Creating embedding task: start_idx={start_idx}, batch_size={len(chunk_inputs)}, "
f"DIAGNOSTIC: Creating embedding task: start_idx={start_idx}, batch_size={len(chunk_inputs)}, "
f"first_input_len={len(chunk_inputs[0]) if chunk_inputs else 0}, "
f"model={embedding_config.embedding_model}"
)
@@ -806,7 +808,15 @@ class OpenAIClient(LLMClientBase):
logger.warning("All chunks were empty, skipping embedding request")
break
gather_start = time.time()
logger.info(f"DIAGNOSTIC: Awaiting {len(tasks)} embedding API calls...")
task_results = await asyncio.gather(*tasks, return_exceptions=True)
gather_duration = time.time() - gather_start
if gather_duration > 1.0:
logger.warning(f"DIAGNOSTIC: SLOW embedding API gather took {gather_duration:.2f}s for {len(tasks)} tasks")
else:
logger.info(f"DIAGNOSTIC: Embedding API gather completed in {gather_duration:.2f}s")
failed_chunks = []
for (start_idx, chunk_inputs, current_batch_size), result in zip(task_metadata, task_results):
@@ -849,6 +859,14 @@ class OpenAIClient(LLMClientBase):
chunks_to_process = failed_chunks
total_duration = time.time() - request_start
if total_duration > 2.0:
logger.error(f"DIAGNOSTIC: BLOCKING DETECTED - request_embeddings took {total_duration:.2f}s for {len(inputs)} inputs")
elif total_duration > 1.0:
logger.warning(f"DIAGNOSTIC: Slow request_embeddings took {total_duration:.2f}s for {len(inputs)} inputs")
else:
logger.info(f"DIAGNOSTIC: request_embeddings completed in {total_duration:.2f}s")
return results
@trace_method

View File

@@ -1,4 +1,5 @@
import asyncio
import time
from typing import List, Optional, Tuple, cast
from letta.llm_api.llm_client import LLMClient
@@ -14,6 +15,10 @@ from letta.settings import model_settings
logger = get_logger(__name__)
# Global semaphore shared across ALL embedding operations to prevent overwhelming the OpenAI API.
# It allows at most 3 concurrent holders, so batches from multiple files being processed
# at the same time all draw from this one limit.
# The "max_concurrent_global" value reported through log_event is a separate hard-coded
# literal; keep it in sync with the limit below when changing either.
# NOTE(review): this object is created at import time and shared module-wide. asyncio
# synchronization primitives are tied to a single event loop, so confirm that every
# caller runs on the same loop (e.g. no per-test or per-thread loops).
_GLOBAL_EMBEDDING_SEMAPHORE = asyncio.Semaphore(3)
class OpenAIEmbedder(BaseEmbedder):
"""OpenAI-based embedding generation"""
@@ -134,6 +139,7 @@ class OpenAIEmbedder(BaseEmbedder):
chunk_indices = [i for i, _ in valid_chunks]
chunks_to_embed = [chunk for _, chunk in valid_chunks]
embedding_start = time.time()
logger.info(f"Generating embeddings for {len(chunks_to_embed)} chunks using {self.embedding_config.embedding_model}")
log_event(
"embedder.generation_started",
@@ -163,20 +169,23 @@ class OpenAIEmbedder(BaseEmbedder):
{"total_batches": len(batches), "batch_size": self.embedding_config.batch_size, "total_chunks": len(chunks_to_embed)},
)
# Use global semaphore to limit concurrent embedding requests across ALL file processing
# This prevents rate limiting even when processing multiple files simultaneously
async def process(batch: List[str], indices: List[int]):
try:
return await self._embed_batch(batch, indices)
except Exception as e:
logger.error("Failed to embed batch of size %s: %s", len(batch), e)
log_event("embedder.batch_failed", {"batch_size": len(batch), "error": str(e), "error_type": type(e).__name__})
raise
async with _GLOBAL_EMBEDDING_SEMAPHORE:
try:
return await self._embed_batch(batch, indices)
except Exception as e:
logger.error("Failed to embed batch of size %s: %s", len(batch), e)
log_event("embedder.batch_failed", {"batch_size": len(batch), "error": str(e), "error_type": type(e).__name__})
raise
# Execute all batches concurrently with semaphore control
# Execute all batches with global semaphore control to limit concurrency
tasks = [process(batch, indices) for batch, indices in zip(batches, batch_indices)]
log_event(
"embedder.concurrent_processing_started",
{"concurrent_tasks": len(tasks)},
{"concurrent_tasks": len(tasks), "max_concurrent_global": 3},
)
results = await asyncio.gather(*tasks)
log_event("embedder.concurrent_processing_completed", {"batches_processed": len(results)})
@@ -202,9 +211,16 @@ class OpenAIEmbedder(BaseEmbedder):
)
passages.append(passage)
logger.info(f"Successfully generated {len(passages)} embeddings")
embedding_duration = time.time() - embedding_start
logger.info(f"Successfully generated {len(passages)} embeddings (took {embedding_duration:.2f}s)")
log_event(
"embedder.generation_completed",
{"passages_created": len(passages), "total_chunks_processed": len(chunks_to_embed), "file_id": file_id, "source_id": source_id},
{
"passages_created": len(passages),
"total_chunks_processed": len(chunks_to_embed),
"file_id": file_id,
"source_id": source_id,
"duration_seconds": embedding_duration,
},
)
return passages

View File

@@ -1,3 +1,4 @@
import time
from typing import List, Optional
from letta.helpers.tpuf_client import TurbopufferClient
@@ -65,6 +66,7 @@ class TurbopufferEmbedder(BaseEmbedder):
try:
# insert passages to Turbopuffer - it will handle embedding generation internally
embedding_start = time.time()
passages = await self.tpuf_client.insert_file_passages(
source_id=source_id,
file_id=file_id,
@@ -72,8 +74,9 @@ class TurbopufferEmbedder(BaseEmbedder):
organization_id=actor.organization_id,
actor=actor,
)
embedding_duration = time.time() - embedding_start
logger.info(f"Successfully generated and stored {len(passages)} passages in Turbopuffer")
logger.info(f"Successfully generated and stored {len(passages)} passages in Turbopuffer (took {embedding_duration:.2f}s)")
log_event(
"turbopuffer_embedder.generation_completed",
{
@@ -81,6 +84,7 @@ class TurbopufferEmbedder(BaseEmbedder):
"total_chunks_processed": len(valid_chunks),
"file_id": file_id,
"source_id": source_id,
"duration_seconds": embedding_duration,
},
)
return passages

View File

@@ -1,3 +1,5 @@
import asyncio
import time
from typing import List
from mistralai import OCRPageObject, OCRResponse, OCRUsageInfo
@@ -55,7 +57,14 @@ class FileProcessor:
try:
all_chunks = []
for page in ocr_response.pages:
chunks = text_chunker.chunk_text(page)
# Run CPU-intensive chunking in thread pool to avoid blocking event loop
chunking_start = time.time()
chunks = await asyncio.to_thread(text_chunker.chunk_text, page)
chunking_duration = time.time() - chunking_start
if chunking_duration > 0.5:
logger.warning(f"Slow chunking operation for {filename}: {chunking_duration:.2f}s")
if not chunks:
log_event(
"file_processor.chunking_failed",
@@ -97,7 +106,14 @@ class FileProcessor:
all_chunks = []
for page in ocr_response.pages:
chunks = text_chunker.default_chunk_text(page)
# Run CPU-intensive default chunking in thread pool to avoid blocking event loop
chunking_start = time.time()
chunks = await asyncio.to_thread(text_chunker.default_chunk_text, page)
chunking_duration = time.time() - chunking_start
if chunking_duration > 0.5:
logger.warning(f"Slow default chunking operation for {filename}: {chunking_duration:.2f}s")
if not chunks:
log_event(
"file_processor.default_chunking_failed",
@@ -308,6 +324,7 @@ class FileProcessor:
return []
content = file_metadata.content
processing_start = time.time()
try:
# Create OCR response from existing content
ocr_response = self._create_ocr_response_from_content(content)
@@ -345,7 +362,10 @@ class FileProcessor:
file_id=file_metadata.id, actor=self.actor, total_chunks=len(all_passages), chunks_embedded=0
)
logger.info(f"Successfully processed imported file {filename}: {len(all_passages)} passages")
processing_duration = time.time() - processing_start
logger.info(
f"Successfully processed imported file {filename}: {len(all_passages)} passages (total time: {processing_duration:.2f}s)"
)
log_event(
"file_processor.import_processing_completed",
{
@@ -353,6 +373,7 @@ class FileProcessor:
"file_id": str(file_metadata.id),
"total_passages": len(all_passages),
"status": FileProcessingStatus.COMPLETED.value,
"total_duration_seconds": processing_duration,
},
)