From 870c5955d96eba9cdfc8396cec2b2bffa26bc011 Mon Sep 17 00:00:00 2001 From: cthomas Date: Mon, 12 Jan 2026 13:36:42 -0800 Subject: [PATCH] fix: wrap tpuf operations in thread pool (#8615) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: wrap turbopuffer vector writes in thread pool Turbopuffer library does CPU-intensive base64 encoding of vectors synchronously in async functions (_async_transform_recursive → b64encode_vector), blocking the event loop during file uploads. Solution: Created _run_turbopuffer_write_in_thread() helper that runs turbopuffer writes in an isolated event loop within a worker thread. Applied to all vector write operations: - insert_tools() - insert_archival_memories() - insert_messages() - insert_file_passages() This prevents pybase64.b64encode_as_string() from blocking the main event loop during vector encoding. 🐾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta * fix: wrap all turbopuffer operations in thread pool Extended the thread pool wrapping to ALL turbopuffer write operations, including delete operations, for complete isolation from the main event loop. All turbopuffer namespace.write() calls now run in isolated event loops within worker threads, preventing any potential CPU work from blocking. 🐾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta --------- Co-authored-by: Letta --- letta/helpers/tpuf_client.py | 271 +++++++++++++++++++++++------------ 1 file changed, 177 insertions(+), 94 deletions(-) diff --git a/letta/helpers/tpuf_client.py b/letta/helpers/tpuf_client.py index 992cfd33..a550f18d 100644 --- a/letta/helpers/tpuf_client.py +++ b/letta/helpers/tpuf_client.py @@ -20,6 +20,52 @@ logger = logging.getLogger(__name__) _GLOBAL_TURBOPUFFER_SEMAPHORE = asyncio.Semaphore(5) +def _run_turbopuffer_write_in_thread( + api_key: str, + region: str, + namespace_name: str, + upsert_columns: dict = None, + deletes: list = None, + delete_by_filter: tuple = None, + distance_metric: str = "cosine_distance", + schema: dict = None, +): + """ + Sync wrapper to run turbopuffer write in isolated event loop. + + Turbopuffer's async write() does CPU-intensive base64 encoding of vectors + synchronously in async functions, blocking the event loop. Running it in + a thread pool with an isolated event loop prevents blocking. + """ + from turbopuffer import AsyncTurbopuffer + + # Create new event loop for this worker thread + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + + async def do_write(): + async with AsyncTurbopuffer(api_key=api_key, region=region) as client: + namespace = client.namespace(namespace_name) + + # Build write kwargs + kwargs = {"distance_metric": distance_metric} + if upsert_columns: + kwargs["upsert_columns"] = upsert_columns + if deletes: + kwargs["deletes"] = deletes + if delete_by_filter: + kwargs["delete_by_filter"] = delete_by_filter + if schema: + kwargs["schema"] = schema + + return await namespace.write(**kwargs) + + return loop.run_until_complete(do_write()) + finally: + loop.close() + + def should_use_tpuf() -> bool: # We need OpenAI since we default to their embedding model return bool(settings.use_tpuf) and bool(settings.tpuf_api_key) and bool(model_settings.openai_api_key) @@ -246,16 +292,20 @@ class TurbopufferClient: } try: + # Use global semaphore to limit concurrent Turbopuffer writes async with _GLOBAL_TURBOPUFFER_SEMAPHORE: - async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client: - namespace = client.namespace(namespace_name) - await namespace.write( - upsert_columns=upsert_columns, - distance_metric="cosine_distance", - schema={"text": {"type": "string", "full_text_search": True}}, - ) - logger.info(f"Successfully inserted {len(ids)} tools to Turbopuffer") - return True + # Run in thread pool to prevent CPU-intensive base64 encoding from blocking event loop + await asyncio.to_thread( + _run_turbopuffer_write_in_thread, + api_key=self.api_key, + region=self.region, + namespace_name=namespace_name, + upsert_columns=upsert_columns, + distance_metric="cosine_distance", + schema={"text": {"type": "string", "full_text_search": True}}, + ) + logger.info(f"Successfully inserted {len(ids)} tools to Turbopuffer") + return True except Exception as e: logger.error(f"Failed to insert tools to Turbopuffer: {e}") @@ -367,19 +417,20 @@ class TurbopufferClient: } try: - # use global semaphore to limit concurrent Turbopuffer writes + # Use global semaphore to limit concurrent Turbopuffer writes async with _GLOBAL_TURBOPUFFER_SEMAPHORE: - # Use AsyncTurbopuffer as a context manager for proper resource cleanup - async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client: - namespace = client.namespace(namespace_name) - # turbopuffer recommends column-based writes for performance - await namespace.write( - upsert_columns=upsert_columns, - distance_metric="cosine_distance", - schema={"text": {"type": "string", "full_text_search": True}}, - ) - logger.info(f"Successfully inserted {len(ids)} passages to Turbopuffer for archive {archive_id}") - return passages + # Run in thread pool to prevent CPU-intensive base64 encoding from blocking event loop + await asyncio.to_thread( + _run_turbopuffer_write_in_thread, + api_key=self.api_key, + region=self.region, + namespace_name=namespace_name, + upsert_columns=upsert_columns, + distance_metric="cosine_distance", + schema={"text": {"type": "string", "full_text_search": True}}, + ) + logger.info(f"Successfully inserted {len(ids)} passages to Turbopuffer for archive {archive_id}") + return passages except Exception as e: logger.error(f"Failed to insert passages to Turbopuffer: {e}") @@ -508,22 +559,23 @@ class TurbopufferClient: upsert_columns["template_id"] = template_ids_list try: - # use global semaphore to limit concurrent Turbopuffer writes + # Use global semaphore to limit concurrent Turbopuffer writes async with _GLOBAL_TURBOPUFFER_SEMAPHORE: - # Use AsyncTurbopuffer as a context manager for proper resource cleanup - async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client: - namespace = client.namespace(namespace_name) - # turbopuffer recommends column-based writes for performance - await namespace.write( - upsert_columns=upsert_columns, - distance_metric="cosine_distance", - schema={ - "text": {"type": "string", "full_text_search": True}, - "conversation_id": {"type": "string"}, - }, - ) - logger.info(f"Successfully inserted {len(ids)} messages to Turbopuffer for agent {agent_id}") - return True + # Run in thread pool to prevent CPU-intensive base64 encoding from blocking event loop + await asyncio.to_thread( + _run_turbopuffer_write_in_thread, + api_key=self.api_key, + region=self.region, + namespace_name=namespace_name, + upsert_columns=upsert_columns, + distance_metric="cosine_distance", + schema={ + "text": {"type": "string", "full_text_search": True}, + "conversation_id": {"type": "string"}, + }, + ) + logger.info(f"Successfully inserted {len(ids)} messages to Turbopuffer for agent {agent_id}") + return True except Exception as e: logger.error(f"Failed to insert messages to Turbopuffer: {e}") @@ -1308,12 +1360,16 @@ class TurbopufferClient: namespace_name = await self._get_archive_namespace_name(archive_id) try: - async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client: - namespace = client.namespace(namespace_name) - # Use write API with deletes parameter as per Turbopuffer docs - await namespace.write(deletes=[passage_id]) - logger.info(f"Successfully deleted passage {passage_id} from Turbopuffer archive {archive_id}") - return True + # Run in thread pool for consistency (deletes are lightweight but use same wrapper) + await asyncio.to_thread( + _run_turbopuffer_write_in_thread, + api_key=self.api_key, + region=self.region, + namespace_name=namespace_name, + deletes=[passage_id], + ) + logger.info(f"Successfully deleted passage {passage_id} from Turbopuffer archive {archive_id}") + return True except Exception as e: logger.error(f"Failed to delete passage from Turbopuffer: {e}") raise @@ -1329,12 +1385,16 @@ class TurbopufferClient: namespace_name = await self._get_archive_namespace_name(archive_id) try: - async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client: - namespace = client.namespace(namespace_name) - # Use write API with deletes parameter as per Turbopuffer docs - await namespace.write(deletes=passage_ids) - logger.info(f"Successfully deleted {len(passage_ids)} passages from Turbopuffer archive {archive_id}") - return True + # Run in thread pool for consistency + await asyncio.to_thread( + _run_turbopuffer_write_in_thread, + api_key=self.api_key, + region=self.region, + namespace_name=namespace_name, + deletes=passage_ids, + ) + logger.info(f"Successfully deleted {len(passage_ids)} passages from Turbopuffer archive {archive_id}") + return True except Exception as e: logger.error(f"Failed to delete passages from Turbopuffer: {e}") raise @@ -1368,12 +1428,16 @@ class TurbopufferClient: namespace_name = await self._get_message_namespace_name(organization_id) try: - async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client: - namespace = client.namespace(namespace_name) - # Use write API with deletes parameter as per Turbopuffer docs - await namespace.write(deletes=message_ids) - logger.info(f"Successfully deleted {len(message_ids)} messages from Turbopuffer for agent {agent_id}") - return True + # Run in thread pool for consistency + await asyncio.to_thread( + _run_turbopuffer_write_in_thread, + api_key=self.api_key, + region=self.region, + namespace_name=namespace_name, + deletes=message_ids, + ) + logger.info(f"Successfully deleted {len(message_ids)} messages from Turbopuffer for agent {agent_id}") + return True except Exception as e: logger.error(f"Failed to delete messages from Turbopuffer: {e}") raise @@ -1386,13 +1450,16 @@ class TurbopufferClient: namespace_name = await self._get_message_namespace_name(organization_id) try: - async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client: - namespace = client.namespace(namespace_name) - # Use delete_by_filter to only delete messages for this agent - # since namespace is now org-scoped - result = await namespace.write(delete_by_filter=("agent_id", "Eq", agent_id)) - logger.info(f"Successfully deleted all messages for agent {agent_id} (deleted {result.rows_affected} rows)") - return True + # Run in thread pool for consistency + result = await asyncio.to_thread( + _run_turbopuffer_write_in_thread, + api_key=self.api_key, + region=self.region, + namespace_name=namespace_name, + delete_by_filter=("agent_id", "Eq", agent_id), + ) + logger.info(f"Successfully deleted all messages for agent {agent_id} (deleted {result.rows_affected if result else 0} rows)") + return True except Exception as e: logger.error(f"Failed to delete all messages from Turbopuffer: {e}") raise @@ -1511,19 +1578,20 @@ class TurbopufferClient: } try: - # use global semaphore to limit concurrent Turbopuffer writes + # Use global semaphore to limit concurrent Turbopuffer writes async with _GLOBAL_TURBOPUFFER_SEMAPHORE: - # use AsyncTurbopuffer as a context manager for proper resource cleanup - async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client: - namespace = client.namespace(namespace_name) - # turbopuffer recommends column-based writes for performance - await namespace.write( - upsert_columns=upsert_columns, - distance_metric="cosine_distance", - schema={"text": {"type": "string", "full_text_search": True}}, - ) - logger.info(f"Successfully inserted {len(ids)} file passages to Turbopuffer for source {source_id}, file {file_id}") - return passages + # Run in thread pool to prevent CPU-intensive base64 encoding from blocking event loop + await asyncio.to_thread( + _run_turbopuffer_write_in_thread, + api_key=self.api_key, + region=self.region, + namespace_name=namespace_name, + upsert_columns=upsert_columns, + distance_metric="cosine_distance", + schema={"text": {"type": "string", "full_text_search": True}}, + ) + logger.info(f"Successfully inserted {len(ids)} file passages to Turbopuffer for source {source_id}, file {file_id}") + return passages except Exception as e: logger.error(f"Failed to insert file passages to Turbopuffer: {e}") @@ -1680,16 +1748,22 @@ class TurbopufferClient: namespace_name = await self._get_file_passages_namespace_name(organization_id) try: - async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client: - namespace = client.namespace(namespace_name) - # use delete_by_filter to only delete passages for this file - # need to filter by both source_id and file_id - filter_expr = ("And", [("source_id", "Eq", source_id), ("file_id", "Eq", file_id)]) - result = await namespace.write(delete_by_filter=filter_expr) - logger.info( - f"Successfully deleted passages for file {file_id} from source {source_id} (deleted {result.rows_affected} rows)" - ) - return True + # use delete_by_filter to only delete passages for this file + # need to filter by both source_id and file_id + filter_expr = ("And", [("source_id", "Eq", source_id), ("file_id", "Eq", file_id)]) + + # Run in thread pool for consistency + result = await asyncio.to_thread( + _run_turbopuffer_write_in_thread, + api_key=self.api_key, + region=self.region, + namespace_name=namespace_name, + delete_by_filter=filter_expr, + ) + logger.info( + f"Successfully deleted passages for file {file_id} from source {source_id} (deleted {result.rows_affected if result else 0} rows)" + ) + return True except Exception as e: logger.error(f"Failed to delete file passages from Turbopuffer: {e}") raise @@ -1702,12 +1776,16 @@ class TurbopufferClient: namespace_name = await self._get_file_passages_namespace_name(organization_id) try: - async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client: - namespace = client.namespace(namespace_name) - # delete all passages for this source - result = await namespace.write(delete_by_filter=("source_id", "Eq", source_id)) - logger.info(f"Successfully deleted all passages for source {source_id} (deleted {result.rows_affected} rows)") - return True + # Run in thread pool for consistency + result = await asyncio.to_thread( + _run_turbopuffer_write_in_thread, + api_key=self.api_key, + region=self.region, + namespace_name=namespace_name, + delete_by_filter=("source_id", "Eq", source_id), + ) + logger.info(f"Successfully deleted all passages for source {source_id} (deleted {result.rows_affected if result else 0} rows)") + return True except Exception as e: logger.error(f"Failed to delete source passages from Turbopuffer: {e}") raise @@ -1733,11 +1811,16 @@ class TurbopufferClient: namespace_name = await self._get_tool_namespace_name(organization_id) try: - async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client: - namespace = client.namespace(namespace_name) - await namespace.write(deletes=tool_ids) - logger.info(f"Successfully deleted {len(tool_ids)} tools from Turbopuffer") - return True + # Run in thread pool for consistency + await asyncio.to_thread( + _run_turbopuffer_write_in_thread, + api_key=self.api_key, + region=self.region, + namespace_name=namespace_name, + deletes=tool_ids, + ) + logger.info(f"Successfully deleted {len(tool_ids)} tools from Turbopuffer") + return True except Exception as e: logger.error(f"Failed to delete tools from Turbopuffer: {e}") raise