diff --git a/letta/helpers/tpuf_client.py b/letta/helpers/tpuf_client.py
index 992cfd33..a550f18d 100644
--- a/letta/helpers/tpuf_client.py
+++ b/letta/helpers/tpuf_client.py
@@ -20,6 +20,52 @@
 logger = logging.getLogger(__name__)
 
 _GLOBAL_TURBOPUFFER_SEMAPHORE = asyncio.Semaphore(5)
+def _run_turbopuffer_write_in_thread(
+    api_key: str,
+    region: str,
+    namespace_name: str,
+    upsert_columns: dict | None = None,
+    deletes: list | None = None,
+    delete_by_filter: tuple | None = None,
+    distance_metric: str = "cosine_distance",
+    schema: dict | None = None,
+):
+    """
+    Sync wrapper to run turbopuffer write in isolated event loop.
+
+    Turbopuffer's async write() does CPU-intensive base64 encoding of vectors
+    synchronously in async functions, blocking the event loop. Running it in
+    a thread pool with an isolated event loop prevents blocking.
+    """
+    from turbopuffer import AsyncTurbopuffer
+
+    # Create new event loop for this worker thread
+    loop = asyncio.new_event_loop()
+    asyncio.set_event_loop(loop)
+    try:
+
+        async def do_write():
+            async with AsyncTurbopuffer(api_key=api_key, region=region) as client:
+                namespace = client.namespace(namespace_name)
+
+                # Build write kwargs
+                kwargs = {"distance_metric": distance_metric}
+                if upsert_columns:
+                    kwargs["upsert_columns"] = upsert_columns
+                if deletes:
+                    kwargs["deletes"] = deletes
+                if delete_by_filter:
+                    kwargs["delete_by_filter"] = delete_by_filter
+                if schema:
+                    kwargs["schema"] = schema
+
+                return await namespace.write(**kwargs)
+
+        return loop.run_until_complete(do_write())
+    finally:
+        loop.close()
+
+
 def should_use_tpuf() -> bool:
     # We need OpenAI since we default to their embedding model
     return bool(settings.use_tpuf) and bool(settings.tpuf_api_key) and bool(model_settings.openai_api_key)
@@ -246,16 +292,20 @@ class TurbopufferClient:
         }
 
         try:
+            # Use global semaphore to limit concurrent Turbopuffer writes
             async with _GLOBAL_TURBOPUFFER_SEMAPHORE:
-                async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
-                    namespace = client.namespace(namespace_name)
-                    await namespace.write(
-                        upsert_columns=upsert_columns,
-                        distance_metric="cosine_distance",
-                        schema={"text": {"type": "string", "full_text_search": True}},
-                    )
-                logger.info(f"Successfully inserted {len(ids)} tools to Turbopuffer")
-                return True
+                # Run in thread pool to prevent CPU-intensive base64 encoding from blocking event loop
+                await asyncio.to_thread(
+                    _run_turbopuffer_write_in_thread,
+                    api_key=self.api_key,
+                    region=self.region,
+                    namespace_name=namespace_name,
+                    upsert_columns=upsert_columns,
+                    distance_metric="cosine_distance",
+                    schema={"text": {"type": "string", "full_text_search": True}},
+                )
+                logger.info(f"Successfully inserted {len(ids)} tools to Turbopuffer")
+                return True
 
         except Exception as e:
             logger.error(f"Failed to insert tools to Turbopuffer: {e}")
@@ -367,19 +417,20 @@ class TurbopufferClient:
         }
 
         try:
-            # use global semaphore to limit concurrent Turbopuffer writes
+            # Use global semaphore to limit concurrent Turbopuffer writes
             async with _GLOBAL_TURBOPUFFER_SEMAPHORE:
-                # Use AsyncTurbopuffer as a context manager for proper resource cleanup
-                async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
-                    namespace = client.namespace(namespace_name)
-                    # turbopuffer recommends column-based writes for performance
-                    await namespace.write(
-                        upsert_columns=upsert_columns,
-                        distance_metric="cosine_distance",
-                        schema={"text": {"type": "string", "full_text_search": True}},
-                    )
-                logger.info(f"Successfully inserted {len(ids)} passages to Turbopuffer for archive {archive_id}")
-                return passages
+                # Run in thread pool to prevent CPU-intensive base64 encoding from blocking event loop
+                await asyncio.to_thread(
+                    _run_turbopuffer_write_in_thread,
+                    api_key=self.api_key,
+                    region=self.region,
+                    namespace_name=namespace_name,
+                    upsert_columns=upsert_columns,
+                    distance_metric="cosine_distance",
+                    schema={"text": {"type": "string", "full_text_search": True}},
+                )
+                logger.info(f"Successfully inserted {len(ids)} passages to Turbopuffer for archive {archive_id}")
+                return passages
 
         except Exception as e:
             logger.error(f"Failed to insert passages to Turbopuffer: {e}")
@@ -508,22 +559,23 @@ class TurbopufferClient:
             upsert_columns["template_id"] = template_ids_list
 
         try:
-            # use global semaphore to limit concurrent Turbopuffer writes
+            # Use global semaphore to limit concurrent Turbopuffer writes
             async with _GLOBAL_TURBOPUFFER_SEMAPHORE:
-                # Use AsyncTurbopuffer as a context manager for proper resource cleanup
-                async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
-                    namespace = client.namespace(namespace_name)
-                    # turbopuffer recommends column-based writes for performance
-                    await namespace.write(
-                        upsert_columns=upsert_columns,
-                        distance_metric="cosine_distance",
-                        schema={
-                            "text": {"type": "string", "full_text_search": True},
-                            "conversation_id": {"type": "string"},
-                        },
-                    )
-                logger.info(f"Successfully inserted {len(ids)} messages to Turbopuffer for agent {agent_id}")
-                return True
+                # Run in thread pool to prevent CPU-intensive base64 encoding from blocking event loop
+                await asyncio.to_thread(
+                    _run_turbopuffer_write_in_thread,
+                    api_key=self.api_key,
+                    region=self.region,
+                    namespace_name=namespace_name,
+                    upsert_columns=upsert_columns,
+                    distance_metric="cosine_distance",
+                    schema={
+                        "text": {"type": "string", "full_text_search": True},
+                        "conversation_id": {"type": "string"},
+                    },
+                )
+                logger.info(f"Successfully inserted {len(ids)} messages to Turbopuffer for agent {agent_id}")
+                return True
 
         except Exception as e:
             logger.error(f"Failed to insert messages to Turbopuffer: {e}")
@@ -1308,12 +1360,16 @@ class TurbopufferClient:
         namespace_name = await self._get_archive_namespace_name(archive_id)
 
         try:
-            async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
-                namespace = client.namespace(namespace_name)
-                # Use write API with deletes parameter as per Turbopuffer docs
-                await namespace.write(deletes=[passage_id])
-                logger.info(f"Successfully deleted passage {passage_id} from Turbopuffer archive {archive_id}")
-                return True
+            # Run in thread pool for consistency (deletes are lightweight but use same wrapper)
+            await asyncio.to_thread(
+                _run_turbopuffer_write_in_thread,
+                api_key=self.api_key,
+                region=self.region,
+                namespace_name=namespace_name,
+                deletes=[passage_id],
+            )
+            logger.info(f"Successfully deleted passage {passage_id} from Turbopuffer archive {archive_id}")
+            return True
         except Exception as e:
             logger.error(f"Failed to delete passage from Turbopuffer: {e}")
             raise
@@ -1329,12 +1385,16 @@ class TurbopufferClient:
         namespace_name = await self._get_archive_namespace_name(archive_id)
 
         try:
-            async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
-                namespace = client.namespace(namespace_name)
-                # Use write API with deletes parameter as per Turbopuffer docs
-                await namespace.write(deletes=passage_ids)
-                logger.info(f"Successfully deleted {len(passage_ids)} passages from Turbopuffer archive {archive_id}")
-                return True
+            # Run in thread pool for consistency
+            await asyncio.to_thread(
+                _run_turbopuffer_write_in_thread,
+                api_key=self.api_key,
+                region=self.region,
+                namespace_name=namespace_name,
+                deletes=passage_ids,
+            )
+            logger.info(f"Successfully deleted {len(passage_ids)} passages from Turbopuffer archive {archive_id}")
+            return True
         except Exception as e:
             logger.error(f"Failed to delete passages from Turbopuffer: {e}")
             raise
@@ -1368,12 +1428,16 @@ class TurbopufferClient:
         namespace_name = await self._get_message_namespace_name(organization_id)
 
         try:
-            async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
-                namespace = client.namespace(namespace_name)
-                # Use write API with deletes parameter as per Turbopuffer docs
-                await namespace.write(deletes=message_ids)
-                logger.info(f"Successfully deleted {len(message_ids)} messages from Turbopuffer for agent {agent_id}")
-                return True
+            # Run in thread pool for consistency
+            await asyncio.to_thread(
+                _run_turbopuffer_write_in_thread,
+                api_key=self.api_key,
+                region=self.region,
+                namespace_name=namespace_name,
+                deletes=message_ids,
+            )
+            logger.info(f"Successfully deleted {len(message_ids)} messages from Turbopuffer for agent {agent_id}")
+            return True
         except Exception as e:
             logger.error(f"Failed to delete messages from Turbopuffer: {e}")
             raise
@@ -1386,13 +1450,16 @@ class TurbopufferClient:
         namespace_name = await self._get_message_namespace_name(organization_id)
 
         try:
-            async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
-                namespace = client.namespace(namespace_name)
-                # Use delete_by_filter to only delete messages for this agent
-                # since namespace is now org-scoped
-                result = await namespace.write(delete_by_filter=("agent_id", "Eq", agent_id))
-                logger.info(f"Successfully deleted all messages for agent {agent_id} (deleted {result.rows_affected} rows)")
-                return True
+            # Run in thread pool for consistency
+            result = await asyncio.to_thread(
+                _run_turbopuffer_write_in_thread,
+                api_key=self.api_key,
+                region=self.region,
+                namespace_name=namespace_name,
+                delete_by_filter=("agent_id", "Eq", agent_id),
+            )
+            logger.info(f"Successfully deleted all messages for agent {agent_id} (deleted {result.rows_affected if result else 0} rows)")
+            return True
         except Exception as e:
             logger.error(f"Failed to delete all messages from Turbopuffer: {e}")
             raise
@@ -1511,19 +1578,20 @@ class TurbopufferClient:
         }
 
         try:
-            # use global semaphore to limit concurrent Turbopuffer writes
+            # Use global semaphore to limit concurrent Turbopuffer writes
             async with _GLOBAL_TURBOPUFFER_SEMAPHORE:
-                # use AsyncTurbopuffer as a context manager for proper resource cleanup
-                async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
-                    namespace = client.namespace(namespace_name)
-                    # turbopuffer recommends column-based writes for performance
-                    await namespace.write(
-                        upsert_columns=upsert_columns,
-                        distance_metric="cosine_distance",
-                        schema={"text": {"type": "string", "full_text_search": True}},
-                    )
-                logger.info(f"Successfully inserted {len(ids)} file passages to Turbopuffer for source {source_id}, file {file_id}")
-                return passages
+                # Run in thread pool to prevent CPU-intensive base64 encoding from blocking event loop
+                await asyncio.to_thread(
+                    _run_turbopuffer_write_in_thread,
+                    api_key=self.api_key,
+                    region=self.region,
+                    namespace_name=namespace_name,
+                    upsert_columns=upsert_columns,
+                    distance_metric="cosine_distance",
+                    schema={"text": {"type": "string", "full_text_search": True}},
+                )
+                logger.info(f"Successfully inserted {len(ids)} file passages to Turbopuffer for source {source_id}, file {file_id}")
+                return passages
 
         except Exception as e:
             logger.error(f"Failed to insert file passages to Turbopuffer: {e}")
@@ -1680,16 +1748,22 @@ class TurbopufferClient:
         namespace_name = await self._get_file_passages_namespace_name(organization_id)
 
         try:
-            async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
-                namespace = client.namespace(namespace_name)
-                # use delete_by_filter to only delete passages for this file
-                # need to filter by both source_id and file_id
-                filter_expr = ("And", [("source_id", "Eq", source_id), ("file_id", "Eq", file_id)])
-                result = await namespace.write(delete_by_filter=filter_expr)
-                logger.info(
-                    f"Successfully deleted passages for file {file_id} from source {source_id} (deleted {result.rows_affected} rows)"
-                )
-                return True
+            # use delete_by_filter to only delete passages for this file
+            # need to filter by both source_id and file_id
+            filter_expr = ("And", [("source_id", "Eq", source_id), ("file_id", "Eq", file_id)])
+
+            # Run in thread pool for consistency
+            result = await asyncio.to_thread(
+                _run_turbopuffer_write_in_thread,
+                api_key=self.api_key,
+                region=self.region,
+                namespace_name=namespace_name,
+                delete_by_filter=filter_expr,
+            )
+            logger.info(
+                f"Successfully deleted passages for file {file_id} from source {source_id} (deleted {result.rows_affected if result else 0} rows)"
+            )
+            return True
         except Exception as e:
             logger.error(f"Failed to delete file passages from Turbopuffer: {e}")
             raise
@@ -1702,12 +1776,16 @@ class TurbopufferClient:
         namespace_name = await self._get_file_passages_namespace_name(organization_id)
 
         try:
-            async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
-                namespace = client.namespace(namespace_name)
-                # delete all passages for this source
-                result = await namespace.write(delete_by_filter=("source_id", "Eq", source_id))
-                logger.info(f"Successfully deleted all passages for source {source_id} (deleted {result.rows_affected} rows)")
-                return True
+            # Run in thread pool for consistency
+            result = await asyncio.to_thread(
+                _run_turbopuffer_write_in_thread,
+                api_key=self.api_key,
+                region=self.region,
+                namespace_name=namespace_name,
+                delete_by_filter=("source_id", "Eq", source_id),
+            )
+            logger.info(f"Successfully deleted all passages for source {source_id} (deleted {result.rows_affected if result else 0} rows)")
+            return True
         except Exception as e:
             logger.error(f"Failed to delete source passages from Turbopuffer: {e}")
             raise
@@ -1733,11 +1811,16 @@ class TurbopufferClient:
         namespace_name = await self._get_tool_namespace_name(organization_id)
 
         try:
-            async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
-                namespace = client.namespace(namespace_name)
-                await namespace.write(deletes=tool_ids)
-                logger.info(f"Successfully deleted {len(tool_ids)} tools from Turbopuffer")
-                return True
+            # Run in thread pool for consistency
+            await asyncio.to_thread(
+                _run_turbopuffer_write_in_thread,
+                api_key=self.api_key,
+                region=self.region,
+                namespace_name=namespace_name,
+                deletes=tool_ids,
+            )
+            logger.info(f"Successfully deleted {len(tool_ids)} tools from Turbopuffer")
+            return True
         except Exception as e:
             logger.error(f"Failed to delete tools from Turbopuffer: {e}")
             raise