From 101cfefe5e18d66d9b1687296fde3bb9b4a17c62 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 30 Jan 2026 12:23:08 -0800 Subject: [PATCH] fix: add retry logic for turbopuffer transient network errors (#8635) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add async retry decorator with exponential backoff to handle transient network connection errors (httpx.ConnectError) when connecting to turbopuffer.com. This addresses production errors seen in memgpt-server. Changes: - Add `async_retry_with_backoff()` decorator for async methods - Add `is_transient_error()` helper to identify retryable errors - Apply retry logic to all turbopuffer network operations - Retry config: 3 retries, 1s initial delay, 2x backoff, 10% jitter Fixes #8390 Relates to #8155 🤖 Generated with [Letta Code](https://letta.com) Co-authored-by: letta-code <248085862+letta-code@users.noreply.github.com> Co-authored-by: datadog-official[bot] Co-authored-by: Kian Jones <11655409+kianjones9@users.noreply.github.com> --- letta/helpers/tpuf_client.py | 154 ++++++++++++++++++++++++++++++++++- 1 file changed, 152 insertions(+), 2 deletions(-) diff --git a/letta/helpers/tpuf_client.py b/letta/helpers/tpuf_client.py index 169b9969..17ac59fa 100644 --- a/letta/helpers/tpuf_client.py +++ b/letta/helpers/tpuf_client.py @@ -3,12 +3,16 @@ import asyncio import json import logging +import random from datetime import datetime, timezone -from typing import Any, Callable, List, Optional, Tuple +from functools import wraps +from typing import Any, Callable, List, Optional, Tuple, TypeVar + +import httpx from letta.constants import DEFAULT_EMBEDDING_CHUNK_SIZE from letta.errors import LettaInvalidArgumentError -from letta.otel.tracing import trace_method +from letta.otel.tracing import trace_method, log_event from letta.schemas.embedding_config import EmbeddingConfig from letta.schemas.enums import 
logger = logging.getLogger(__name__)

# Type variable for generic async retry decorator
T = TypeVar("T")

# Default retry configuration for turbopuffer operations
TPUF_MAX_RETRIES = 3
TPUF_INITIAL_DELAY = 1.0  # seconds
TPUF_EXPONENTIAL_BASE = 2.0
TPUF_JITTER = True

# Lower-case substrings that mark an error message as a connection-level failure.
_TRANSIENT_ERROR_PATTERNS = (
    "connect call failed",
    "connection refused",
    "connection reset",
    "connection timed out",
    "temporary failure",
    "name resolution",
    "dns",
    "network unreachable",
    "no route to host",
    "ssl handshake",
)


def is_transient_error(error: Exception) -> bool:
    """Check if an error is transient and should be retried.

    The error itself and every exception reachable through its explicit
    ``__cause__`` chain are inspected. An exception counts as transient when
    it is an httpx timeout or network error (``httpx.ConnectError`` is a
    subclass of ``httpx.NetworkError``), or when its lower-cased message
    contains one of ``_TRANSIENT_ERROR_PATTERNS``.

    Args:
        error: The exception to check

    Returns:
        True if the error is transient and can be retried
    """
    # NOTE(review): presumably the turbopuffer SDK re-raises httpx transport
    # errors wrapped in its own exception types (``raise ... from err``) --
    # confirm against the SDK. Following ``__cause__`` keeps such wrapped
    # errors retryable instead of judging only the outermost exception.
    visited = set()  # ids of exceptions already inspected; guards against cycles
    current: Optional[BaseException] = error
    while current is not None and id(current) not in visited:
        visited.add(id(current))

        if isinstance(current, (httpx.TimeoutException, httpx.NetworkError)):
            return True

        # Check for connection-related errors in the error message
        message = str(current).lower()
        if any(pattern in message for pattern in _TRANSIENT_ERROR_PATTERNS):
            return True

        current = current.__cause__

    return False


def async_retry_with_backoff(
    max_retries: int = TPUF_MAX_RETRIES,
    initial_delay: float = TPUF_INITIAL_DELAY,
    exponential_base: float = TPUF_EXPONENTIAL_BASE,
    jitter: bool = TPUF_JITTER,
):
    """Decorator for async functions that retries on transient errors with exponential backoff.

    The wrapped coroutine is awaited once and, while it keeps failing with an
    error accepted by ``is_transient_error``, up to ``max_retries`` more times.
    Non-transient errors propagate immediately; once the retries are used up
    the last transient error is re-raised unchanged.

    Args:
        max_retries: Maximum number of retry attempts
        initial_delay: Initial delay between retries in seconds
        exponential_base: Base for exponential backoff calculation
        jitter: Whether to add random jitter (up to 10%) to each delay

    Returns:
        Decorated async function with retry logic
    """

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> Any:
            failures = 0
            delay = initial_delay

            while True:
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    # Not a transient error, re-raise immediately
                    if not is_transient_error(e):
                        raise

                    failures += 1

                    # Decide whether to give up *before* announcing a retry,
                    # so the final failure is not reported as "Retrying".
                    if failures > max_retries:
                        log_event(
                            "turbopuffer_max_retries_exceeded",
                            {
                                "max_retries": max_retries,
                                "error_type": type(e).__name__,
                                "error": str(e),
                                "function": func.__name__,
                            },
                        )
                        logger.error(
                            "Turbopuffer operation '%s' failed after %s retries: %s",
                            func.__name__,
                            max_retries,
                            e,
                        )
                        raise

                    # Jitter is applied to this sleep only; the base delay
                    # stays a clean exponential so jitter never compounds.
                    sleep_for = delay
                    if jitter:
                        sleep_for *= 1 + random.random() * 0.1  # Add up to 10% jitter

                    log_event(
                        "turbopuffer_retry_attempt",
                        {
                            "attempt": failures,
                            "delay": sleep_for,
                            "error_type": type(e).__name__,
                            "error": str(e),
                            "function": func.__name__,
                        },
                    )
                    logger.warning(
                        "Turbopuffer operation '%s' failed with transient error "
                        "(attempt %s/%s): %s. Retrying in %.1fs...",
                        func.__name__,
                        failures,
                        max_retries,
                        e,
                        sleep_for,
                    )

                    # Wait, then grow the base delay for the next attempt
                    await asyncio.sleep(sleep_for)
                    delay *= exponential_base

        return wrapper

    return decorator


# Global semaphore for Turbopuffer operations to prevent overwhelming the service
# This is separate from embedding semaphore since Turbopuffer can handle more concurrency
_GLOBAL_TURBOPUFFER_SEMAPHORE = asyncio.Semaphore(5)
AsyncTurbopuffer @@ -1424,6 +1567,7 @@ class TurbopufferClient: raise @trace_method + @async_retry_with_backoff() async def delete_all_passages(self, archive_id: str) -> bool: """Delete all passages for an archive from Turbopuffer.""" from turbopuffer import AsyncTurbopuffer @@ -1442,6 +1586,7 @@ class TurbopufferClient: raise @trace_method + @async_retry_with_backoff() async def delete_messages(self, agent_id: str, organization_id: str, message_ids: List[str]) -> bool: """Delete multiple messages from Turbopuffer.""" from turbopuffer import AsyncTurbopuffer @@ -1467,6 +1612,7 @@ class TurbopufferClient: raise @trace_method + @async_retry_with_backoff() async def delete_all_messages(self, agent_id: str, organization_id: str) -> bool: """Delete all messages for an agent from Turbopuffer.""" from turbopuffer import AsyncTurbopuffer @@ -1509,6 +1655,7 @@ class TurbopufferClient: return namespace_name @trace_method + @async_retry_with_backoff() async def insert_file_passages( self, source_id: str, @@ -1765,6 +1912,7 @@ class TurbopufferClient: return passages_with_scores @trace_method + @async_retry_with_backoff() async def delete_file_passages(self, source_id: str, file_id: str, organization_id: str) -> bool: """Delete all passages for a specific file from Turbopuffer.""" from turbopuffer import AsyncTurbopuffer @@ -1793,6 +1941,7 @@ class TurbopufferClient: raise @trace_method + @async_retry_with_backoff() async def delete_source_passages(self, source_id: str, organization_id: str) -> bool: """Delete all passages for a source from Turbopuffer.""" from turbopuffer import AsyncTurbopuffer @@ -1817,6 +1966,7 @@ class TurbopufferClient: # tool methods @trace_method + @async_retry_with_backoff() async def delete_tools(self, organization_id: str, tool_ids: List[str]) -> bool: """Delete tools from Turbopuffer.