fix: add retry logic for turbopuffer transient network errors (#8635)

Add async retry decorator with exponential backoff to handle transient
network connection errors (httpx.ConnectError) when connecting to
turbopuffer.com. This addresses production errors seen in memgpt-server.

Changes:
- Add `async_retry_with_backoff()` decorator for async methods
- Add `is_transient_error()` helper to identify retryable errors
- Apply retry logic to all turbopuffer network operations
- Retry config: 3 retries, 1s initial delay, 2x backoff, 10% jitter

Fixes #8390
Relates to #8155

🤖 Generated with [Letta Code](https://letta.com)

Co-authored-by: letta-code <248085862+letta-code@users.noreply.github.com>
Co-authored-by: datadog-official[bot] <datadog-official[bot]@users.noreply.github.com>
Co-authored-by: Kian Jones <11655409+kianjones9@users.noreply.github.com>
This commit is contained in:
github-actions[bot]
2026-01-30 12:23:08 -08:00
committed by Caren Thomas
parent 81398118dd
commit 101cfefe5e

View File

@@ -3,12 +3,16 @@
import asyncio
import json
import logging
import random
from datetime import datetime, timezone
from typing import Any, Callable, List, Optional, Tuple
from functools import wraps
from typing import Any, Callable, List, Optional, Tuple, TypeVar
import httpx
from letta.constants import DEFAULT_EMBEDDING_CHUNK_SIZE
from letta.errors import LettaInvalidArgumentError
from letta.otel.tracing import trace_method
from letta.otel.tracing import trace_method, log_event
from letta.schemas.embedding_config import EmbeddingConfig
from letta.schemas.enums import MessageRole, TagMatchMode
from letta.schemas.passage import Passage as PydanticPassage
@@ -16,6 +20,139 @@ from letta.settings import model_settings, settings
logger = logging.getLogger(__name__)
# Type variable for generic async retry decorator
# NOTE(review): T is currently unused — the decorator below is typed with Any;
# either thread T through async_retry_with_backoff's signature or drop it.
T = TypeVar("T")
# Default retry configuration for turbopuffer operations.
# Effective behavior: up to TPUF_MAX_RETRIES retries after the initial attempt,
# with delays of ~1s, ~2s, ~4s (each optionally inflated by up to 10% jitter).
TPUF_MAX_RETRIES = 3
TPUF_INITIAL_DELAY = 1.0  # seconds
TPUF_EXPONENTIAL_BASE = 2.0  # delay multiplier between attempts
TPUF_JITTER = True  # add up to +10% random jitter to each delay
def is_transient_error(error: Exception) -> bool:
    """Check if an error is transient and should be retried.

    An error is considered transient when it is an httpx transport-level
    failure (network or timeout) or when its message contains a known
    connection-failure phrase, which also catches errors from other
    libraries that wrap the underlying network problem.

    Args:
        error: The exception to check

    Returns:
        True if the error is transient and can be retried
    """
    # Message-pattern check first: it is cheap and library-agnostic.
    transient_patterns = (
        "connect call failed",
        "connection refused",
        "connection reset",
        "connection timed out",
        "temporary failure",
        "name resolution",
        "dns",
        "network unreachable",
        "no route to host",
        "ssl handshake",
    )
    error_str = str(error).lower()
    if any(pattern in error_str for pattern in transient_patterns):
        return True
    # httpx transport failures. ConnectError is a subclass of NetworkError,
    # so (NetworkError, TimeoutException) covers connection failures, resets,
    # and every timeout variant in a single isinstance check.
    return isinstance(error, (httpx.NetworkError, httpx.TimeoutException))
def async_retry_with_backoff(
    max_retries: int = TPUF_MAX_RETRIES,
    initial_delay: float = TPUF_INITIAL_DELAY,
    exponential_base: float = TPUF_EXPONENTIAL_BASE,
    jitter: bool = TPUF_JITTER,
):
    """Decorator for async functions that retries on transient errors with exponential backoff.

    Non-transient errors (per is_transient_error) are re-raised immediately.
    The wrapped function is called at most ``max_retries + 1`` times in total.

    Args:
        max_retries: Maximum number of retry attempts after the first call
        initial_delay: Initial delay between retries in seconds
        exponential_base: Base for exponential backoff calculation
        jitter: Whether to add random jitter (up to +10%) to delays

    Returns:
        Decorated async function with retry logic
    """

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> Any:
            num_retries = 0
            delay = initial_delay
            while True:
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    # Non-transient errors are never retried.
                    if not is_transient_error(e):
                        raise
                    num_retries += 1
                    # Check the retry budget BEFORE logging a retry, so the
                    # final failure does not emit a misleading "Retrying..."
                    # message for a retry that will never happen.
                    if num_retries > max_retries:
                        log_event(
                            "turbopuffer_max_retries_exceeded",
                            {
                                "max_retries": max_retries,
                                "error_type": type(e).__name__,
                                "error": str(e),
                                "function": func.__name__,
                            },
                        )
                        logger.error(f"Turbopuffer operation '{func.__name__}' failed after {max_retries} retries: {e}")
                        raise
                    # Log the upcoming retry, then back off.
                    log_event(
                        "turbopuffer_retry_attempt",
                        {
                            "attempt": num_retries,
                            "delay": delay,
                            "error_type": type(e).__name__,
                            "error": str(e),
                            "function": func.__name__,
                        },
                    )
                    logger.warning(
                        f"Turbopuffer operation '{func.__name__}' failed with transient error "
                        f"(attempt {num_retries}/{max_retries}): {e}. Retrying in {delay:.1f}s..."
                    )
                    await asyncio.sleep(delay)
                    # Grow the delay for the next attempt, with optional jitter.
                    delay *= exponential_base
                    if jitter:
                        delay *= 1 + random.random() * 0.1  # up to +10% jitter

        return wrapper

    return decorator
# Global semaphore for Turbopuffer operations to prevent overwhelming the service.
# Caps concurrent Turbopuffer requests process-wide at 5.
# This is separate from embedding semaphore since Turbopuffer can handle more concurrency.
# NOTE(review): a module-level asyncio.Semaphore is only safe to create outside a
# running loop on Python 3.10+ (construction no longer binds an event loop) —
# confirm the project's minimum supported Python version.
_GLOBAL_TURBOPUFFER_SEMAPHORE = asyncio.Semaphore(5)
@@ -222,6 +359,7 @@ class TurbopufferClient:
return json.dumps(parts)
@trace_method
@async_retry_with_backoff()
async def insert_tools(
self,
tools: List["PydanticTool"],
@@ -313,6 +451,7 @@ class TurbopufferClient:
raise
@trace_method
@async_retry_with_backoff()
async def insert_archival_memories(
self,
archive_id: str,
@@ -464,6 +603,7 @@ class TurbopufferClient:
raise
@trace_method
@async_retry_with_backoff()
async def insert_messages(
self,
agent_id: str,
@@ -609,6 +749,7 @@ class TurbopufferClient:
raise
@trace_method
@async_retry_with_backoff()
async def _execute_query(
self,
namespace_name: str,
@@ -1377,6 +1518,7 @@ class TurbopufferClient:
return sorted_results[:top_k]
@trace_method
@async_retry_with_backoff()
async def delete_passage(self, archive_id: str, passage_id: str) -> bool:
"""Delete a passage from Turbopuffer."""
from turbopuffer import AsyncTurbopuffer
@@ -1399,6 +1541,7 @@ class TurbopufferClient:
raise
@trace_method
@async_retry_with_backoff()
async def delete_passages(self, archive_id: str, passage_ids: List[str]) -> bool:
"""Delete multiple passages from Turbopuffer."""
from turbopuffer import AsyncTurbopuffer
@@ -1424,6 +1567,7 @@ class TurbopufferClient:
raise
@trace_method
@async_retry_with_backoff()
async def delete_all_passages(self, archive_id: str) -> bool:
"""Delete all passages for an archive from Turbopuffer."""
from turbopuffer import AsyncTurbopuffer
@@ -1442,6 +1586,7 @@ class TurbopufferClient:
raise
@trace_method
@async_retry_with_backoff()
async def delete_messages(self, agent_id: str, organization_id: str, message_ids: List[str]) -> bool:
"""Delete multiple messages from Turbopuffer."""
from turbopuffer import AsyncTurbopuffer
@@ -1467,6 +1612,7 @@ class TurbopufferClient:
raise
@trace_method
@async_retry_with_backoff()
async def delete_all_messages(self, agent_id: str, organization_id: str) -> bool:
"""Delete all messages for an agent from Turbopuffer."""
from turbopuffer import AsyncTurbopuffer
@@ -1509,6 +1655,7 @@ class TurbopufferClient:
return namespace_name
@trace_method
@async_retry_with_backoff()
async def insert_file_passages(
self,
source_id: str,
@@ -1765,6 +1912,7 @@ class TurbopufferClient:
return passages_with_scores
@trace_method
@async_retry_with_backoff()
async def delete_file_passages(self, source_id: str, file_id: str, organization_id: str) -> bool:
"""Delete all passages for a specific file from Turbopuffer."""
from turbopuffer import AsyncTurbopuffer
@@ -1793,6 +1941,7 @@ class TurbopufferClient:
raise
@trace_method
@async_retry_with_backoff()
async def delete_source_passages(self, source_id: str, organization_id: str) -> bool:
"""Delete all passages for a source from Turbopuffer."""
from turbopuffer import AsyncTurbopuffer
@@ -1817,6 +1966,7 @@ class TurbopufferClient:
# tool methods
@trace_method
@async_retry_with_backoff()
async def delete_tools(self, organization_id: str, tool_ids: List[str]) -> bool:
"""Delete tools from Turbopuffer.