fix: wrap tpuf operations in thread pool (#8615)

* fix: wrap turbopuffer vector writes in thread pool

Turbopuffer library does CPU-intensive base64 encoding of vectors
synchronously in async functions (_async_transform_recursive →
b64encode_vector), blocking the event loop during file uploads.

Solution: Created _run_turbopuffer_write_in_thread() helper that runs
turbopuffer writes in an isolated event loop within a worker thread.

Applied to all vector write operations:
- insert_tools()
- insert_archival_memories()
- insert_messages()
- insert_file_passages()

This prevents pybase64.b64encode_as_string() from blocking the main
event loop during vector encoding.

🐾 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* fix: wrap all turbopuffer operations in thread pool

Extended the thread pool wrapping to ALL turbopuffer write operations,
including delete operations, for complete isolation from the main event loop.

All turbopuffer namespace.write() calls now run in isolated event loops
within worker threads, preventing any potential CPU work from blocking.

🐾 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

---------

Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
cthomas
2026-01-12 13:36:42 -08:00
committed by Sarah Wooders
parent c05f3cec0b
commit 870c5955d9

View File

@@ -20,6 +20,52 @@ logger = logging.getLogger(__name__)
_GLOBAL_TURBOPUFFER_SEMAPHORE = asyncio.Semaphore(5)
def _run_turbopuffer_write_in_thread(
    api_key: str,
    region: str,
    namespace_name: str,
    upsert_columns: dict = None,
    deletes: list = None,
    delete_by_filter: tuple = None,
    distance_metric: str = "cosine_distance",
    schema: dict = None,
):
    """Run a Turbopuffer namespace.write() to completion in an isolated event loop.

    Turbopuffer's async write() performs CPU-intensive base64 encoding of
    vectors synchronously inside async code, which would block the caller's
    event loop. This sync wrapper is intended to be invoked via
    ``asyncio.to_thread`` so the encoding happens in a worker thread running
    its own short-lived event loop.

    Args:
        api_key: Turbopuffer API key.
        region: Turbopuffer region identifier.
        namespace_name: Target namespace to write into.
        upsert_columns: Optional column-oriented rows to upsert.
        deletes: Optional list of document ids to delete.
        delete_by_filter: Optional filter expression tuple for bulk deletes.
        distance_metric: Vector distance metric (default ``"cosine_distance"``).
        schema: Optional schema definition for the namespace.

    Returns:
        The write result returned by Turbopuffer (callers read
        ``rows_affected`` off it for delete-by-filter operations).
    """
    # Imported lazily so importing this module never requires turbopuffer.
    from turbopuffer import AsyncTurbopuffer

    async def do_write():
        async with AsyncTurbopuffer(api_key=api_key, region=region) as client:
            namespace = client.namespace(namespace_name)
            # Only forward the write arguments that were actually supplied.
            kwargs = {"distance_metric": distance_metric}
            if upsert_columns:
                kwargs["upsert_columns"] = upsert_columns
            if deletes:
                kwargs["deletes"] = deletes
            if delete_by_filter:
                kwargs["delete_by_filter"] = delete_by_filter
            if schema:
                kwargs["schema"] = schema
            return await namespace.write(**kwargs)

    # asyncio.run() creates, runs, and fully tears down a fresh event loop
    # (including shutting down async generators). The previous manual
    # new_event_loop()/set_event_loop()/close() sequence left a *closed* loop
    # installed as the worker thread's current event loop and skipped
    # async-generator shutdown.
    return asyncio.run(do_write())
def should_use_tpuf() -> bool:
    """Return True when Turbopuffer is enabled and fully configured.

    Turbopuffer defaults to OpenAI's embedding model, so an OpenAI API key is
    required in addition to the Turbopuffer flag and key.
    """
    required = (
        settings.use_tpuf,
        settings.tpuf_api_key,
        model_settings.openai_api_key,
    )
    return all(bool(value) for value in required)
@@ -246,16 +292,20 @@ class TurbopufferClient:
}
try:
# Use global semaphore to limit concurrent Turbopuffer writes
async with _GLOBAL_TURBOPUFFER_SEMAPHORE:
async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
namespace = client.namespace(namespace_name)
await namespace.write(
upsert_columns=upsert_columns,
distance_metric="cosine_distance",
schema={"text": {"type": "string", "full_text_search": True}},
)
logger.info(f"Successfully inserted {len(ids)} tools to Turbopuffer")
return True
# Run in thread pool to prevent CPU-intensive base64 encoding from blocking event loop
await asyncio.to_thread(
_run_turbopuffer_write_in_thread,
api_key=self.api_key,
region=self.region,
namespace_name=namespace_name,
upsert_columns=upsert_columns,
distance_metric="cosine_distance",
schema={"text": {"type": "string", "full_text_search": True}},
)
logger.info(f"Successfully inserted {len(ids)} tools to Turbopuffer")
return True
except Exception as e:
logger.error(f"Failed to insert tools to Turbopuffer: {e}")
@@ -367,19 +417,20 @@ class TurbopufferClient:
}
try:
# use global semaphore to limit concurrent Turbopuffer writes
# Use global semaphore to limit concurrent Turbopuffer writes
async with _GLOBAL_TURBOPUFFER_SEMAPHORE:
# Use AsyncTurbopuffer as a context manager for proper resource cleanup
async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
namespace = client.namespace(namespace_name)
# turbopuffer recommends column-based writes for performance
await namespace.write(
upsert_columns=upsert_columns,
distance_metric="cosine_distance",
schema={"text": {"type": "string", "full_text_search": True}},
)
logger.info(f"Successfully inserted {len(ids)} passages to Turbopuffer for archive {archive_id}")
return passages
# Run in thread pool to prevent CPU-intensive base64 encoding from blocking event loop
await asyncio.to_thread(
_run_turbopuffer_write_in_thread,
api_key=self.api_key,
region=self.region,
namespace_name=namespace_name,
upsert_columns=upsert_columns,
distance_metric="cosine_distance",
schema={"text": {"type": "string", "full_text_search": True}},
)
logger.info(f"Successfully inserted {len(ids)} passages to Turbopuffer for archive {archive_id}")
return passages
except Exception as e:
logger.error(f"Failed to insert passages to Turbopuffer: {e}")
@@ -508,22 +559,23 @@ class TurbopufferClient:
upsert_columns["template_id"] = template_ids_list
try:
# use global semaphore to limit concurrent Turbopuffer writes
# Use global semaphore to limit concurrent Turbopuffer writes
async with _GLOBAL_TURBOPUFFER_SEMAPHORE:
# Use AsyncTurbopuffer as a context manager for proper resource cleanup
async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
namespace = client.namespace(namespace_name)
# turbopuffer recommends column-based writes for performance
await namespace.write(
upsert_columns=upsert_columns,
distance_metric="cosine_distance",
schema={
"text": {"type": "string", "full_text_search": True},
"conversation_id": {"type": "string"},
},
)
logger.info(f"Successfully inserted {len(ids)} messages to Turbopuffer for agent {agent_id}")
return True
# Run in thread pool to prevent CPU-intensive base64 encoding from blocking event loop
await asyncio.to_thread(
_run_turbopuffer_write_in_thread,
api_key=self.api_key,
region=self.region,
namespace_name=namespace_name,
upsert_columns=upsert_columns,
distance_metric="cosine_distance",
schema={
"text": {"type": "string", "full_text_search": True},
"conversation_id": {"type": "string"},
},
)
logger.info(f"Successfully inserted {len(ids)} messages to Turbopuffer for agent {agent_id}")
return True
except Exception as e:
logger.error(f"Failed to insert messages to Turbopuffer: {e}")
@@ -1308,12 +1360,16 @@ class TurbopufferClient:
namespace_name = await self._get_archive_namespace_name(archive_id)
try:
async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
namespace = client.namespace(namespace_name)
# Use write API with deletes parameter as per Turbopuffer docs
await namespace.write(deletes=[passage_id])
logger.info(f"Successfully deleted passage {passage_id} from Turbopuffer archive {archive_id}")
return True
# Run in thread pool for consistency (deletes are lightweight but use same wrapper)
await asyncio.to_thread(
_run_turbopuffer_write_in_thread,
api_key=self.api_key,
region=self.region,
namespace_name=namespace_name,
deletes=[passage_id],
)
logger.info(f"Successfully deleted passage {passage_id} from Turbopuffer archive {archive_id}")
return True
except Exception as e:
logger.error(f"Failed to delete passage from Turbopuffer: {e}")
raise
@@ -1329,12 +1385,16 @@ class TurbopufferClient:
namespace_name = await self._get_archive_namespace_name(archive_id)
try:
async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
namespace = client.namespace(namespace_name)
# Use write API with deletes parameter as per Turbopuffer docs
await namespace.write(deletes=passage_ids)
logger.info(f"Successfully deleted {len(passage_ids)} passages from Turbopuffer archive {archive_id}")
return True
# Run in thread pool for consistency
await asyncio.to_thread(
_run_turbopuffer_write_in_thread,
api_key=self.api_key,
region=self.region,
namespace_name=namespace_name,
deletes=passage_ids,
)
logger.info(f"Successfully deleted {len(passage_ids)} passages from Turbopuffer archive {archive_id}")
return True
except Exception as e:
logger.error(f"Failed to delete passages from Turbopuffer: {e}")
raise
@@ -1368,12 +1428,16 @@ class TurbopufferClient:
namespace_name = await self._get_message_namespace_name(organization_id)
try:
async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
namespace = client.namespace(namespace_name)
# Use write API with deletes parameter as per Turbopuffer docs
await namespace.write(deletes=message_ids)
logger.info(f"Successfully deleted {len(message_ids)} messages from Turbopuffer for agent {agent_id}")
return True
# Run in thread pool for consistency
await asyncio.to_thread(
_run_turbopuffer_write_in_thread,
api_key=self.api_key,
region=self.region,
namespace_name=namespace_name,
deletes=message_ids,
)
logger.info(f"Successfully deleted {len(message_ids)} messages from Turbopuffer for agent {agent_id}")
return True
except Exception as e:
logger.error(f"Failed to delete messages from Turbopuffer: {e}")
raise
@@ -1386,13 +1450,16 @@ class TurbopufferClient:
namespace_name = await self._get_message_namespace_name(organization_id)
try:
async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
namespace = client.namespace(namespace_name)
# Use delete_by_filter to only delete messages for this agent
# since namespace is now org-scoped
result = await namespace.write(delete_by_filter=("agent_id", "Eq", agent_id))
logger.info(f"Successfully deleted all messages for agent {agent_id} (deleted {result.rows_affected} rows)")
return True
# Run in thread pool for consistency
result = await asyncio.to_thread(
_run_turbopuffer_write_in_thread,
api_key=self.api_key,
region=self.region,
namespace_name=namespace_name,
delete_by_filter=("agent_id", "Eq", agent_id),
)
logger.info(f"Successfully deleted all messages for agent {agent_id} (deleted {result.rows_affected if result else 0} rows)")
return True
except Exception as e:
logger.error(f"Failed to delete all messages from Turbopuffer: {e}")
raise
@@ -1511,19 +1578,20 @@ class TurbopufferClient:
}
try:
# use global semaphore to limit concurrent Turbopuffer writes
# Use global semaphore to limit concurrent Turbopuffer writes
async with _GLOBAL_TURBOPUFFER_SEMAPHORE:
# use AsyncTurbopuffer as a context manager for proper resource cleanup
async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
namespace = client.namespace(namespace_name)
# turbopuffer recommends column-based writes for performance
await namespace.write(
upsert_columns=upsert_columns,
distance_metric="cosine_distance",
schema={"text": {"type": "string", "full_text_search": True}},
)
logger.info(f"Successfully inserted {len(ids)} file passages to Turbopuffer for source {source_id}, file {file_id}")
return passages
# Run in thread pool to prevent CPU-intensive base64 encoding from blocking event loop
await asyncio.to_thread(
_run_turbopuffer_write_in_thread,
api_key=self.api_key,
region=self.region,
namespace_name=namespace_name,
upsert_columns=upsert_columns,
distance_metric="cosine_distance",
schema={"text": {"type": "string", "full_text_search": True}},
)
logger.info(f"Successfully inserted {len(ids)} file passages to Turbopuffer for source {source_id}, file {file_id}")
return passages
except Exception as e:
logger.error(f"Failed to insert file passages to Turbopuffer: {e}")
@@ -1680,16 +1748,22 @@ class TurbopufferClient:
namespace_name = await self._get_file_passages_namespace_name(organization_id)
try:
async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
namespace = client.namespace(namespace_name)
# use delete_by_filter to only delete passages for this file
# need to filter by both source_id and file_id
filter_expr = ("And", [("source_id", "Eq", source_id), ("file_id", "Eq", file_id)])
result = await namespace.write(delete_by_filter=filter_expr)
logger.info(
f"Successfully deleted passages for file {file_id} from source {source_id} (deleted {result.rows_affected} rows)"
)
return True
# use delete_by_filter to only delete passages for this file
# need to filter by both source_id and file_id
filter_expr = ("And", [("source_id", "Eq", source_id), ("file_id", "Eq", file_id)])
# Run in thread pool for consistency
result = await asyncio.to_thread(
_run_turbopuffer_write_in_thread,
api_key=self.api_key,
region=self.region,
namespace_name=namespace_name,
delete_by_filter=filter_expr,
)
logger.info(
f"Successfully deleted passages for file {file_id} from source {source_id} (deleted {result.rows_affected if result else 0} rows)"
)
return True
except Exception as e:
logger.error(f"Failed to delete file passages from Turbopuffer: {e}")
raise
@@ -1702,12 +1776,16 @@ class TurbopufferClient:
namespace_name = await self._get_file_passages_namespace_name(organization_id)
try:
async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
namespace = client.namespace(namespace_name)
# delete all passages for this source
result = await namespace.write(delete_by_filter=("source_id", "Eq", source_id))
logger.info(f"Successfully deleted all passages for source {source_id} (deleted {result.rows_affected} rows)")
return True
# Run in thread pool for consistency
result = await asyncio.to_thread(
_run_turbopuffer_write_in_thread,
api_key=self.api_key,
region=self.region,
namespace_name=namespace_name,
delete_by_filter=("source_id", "Eq", source_id),
)
logger.info(f"Successfully deleted all passages for source {source_id} (deleted {result.rows_affected if result else 0} rows)")
return True
except Exception as e:
logger.error(f"Failed to delete source passages from Turbopuffer: {e}")
raise
@@ -1733,11 +1811,16 @@ class TurbopufferClient:
namespace_name = await self._get_tool_namespace_name(organization_id)
try:
async with AsyncTurbopuffer(api_key=self.api_key, region=self.region) as client:
namespace = client.namespace(namespace_name)
await namespace.write(deletes=tool_ids)
logger.info(f"Successfully deleted {len(tool_ids)} tools from Turbopuffer")
return True
# Run in thread pool for consistency
await asyncio.to_thread(
_run_turbopuffer_write_in_thread,
api_key=self.api_key,
region=self.region,
namespace_name=namespace_name,
deletes=tool_ids,
)
logger.info(f"Successfully deleted {len(tool_ids)} tools from Turbopuffer")
return True
except Exception as e:
logger.error(f"Failed to delete tools from Turbopuffer: {e}")
raise