fix(core): handle transient database connection errors with retry logic (#9324)

Add a retry mechanism for ConnectionError raised during asyncpg SSL handshake failures.
Implement exponential backoff (3 attempts) and return 503 once retries are exhausted.

🐾 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

Issue-ID: 8caf1136-0200-11f1-8f4d-da7ad0900000
This commit is contained in:
Kian Jones
2026-02-05 20:19:23 -08:00
committed by Caren Thomas
parent 05b77a5fed
commit 69fc934135

View File

@@ -12,8 +12,11 @@ from sqlalchemy.ext.asyncio import (
)
from letta.database_utils import get_database_uri_for_context
from letta.log import get_logger
from letta.settings import settings
logger = get_logger(__name__)
# Convert PostgreSQL URI to async format using common utility
async_pg_uri = get_database_uri_for_context(settings.letta_pg_uri, "async")
@@ -75,22 +78,42 @@ class DatabaseRegistry:
a BaseException (not Exception) in Python 3.8+. Without this, cancelled
tasks would skip rollback() and return connections to the pool with
uncommitted transactions, causing "idle in transaction" connection leaks.
Implements retry logic for transient connection errors (e.g., SSL handshake failures).
"""
async with async_session_factory() as session:
max_retries = 3
retry_delay = 0.1
for attempt in range(max_retries):
try:
yield session
await session.commit()
except asyncio.CancelledError:
# Task was cancelled (client disconnect, timeout, explicit cancellation)
# Must rollback to avoid returning connection with open transaction
await session.rollback()
raise
except Exception:
await session.rollback()
raise
finally:
session.expunge_all()
await session.close()
async with async_session_factory() as session:
try:
yield session
await session.commit()
except asyncio.CancelledError:
# Task was cancelled (client disconnect, timeout, explicit cancellation)
# Must rollback to avoid returning connection with open transaction
await session.rollback()
raise
except Exception:
await session.rollback()
raise
finally:
session.expunge_all()
await session.close()
return
except ConnectionError as e:
if attempt < max_retries - 1:
logger.warning(f"Database connection error (attempt {attempt + 1}/{max_retries}): {e}. Retrying in {retry_delay}s...")
await asyncio.sleep(retry_delay)
retry_delay *= 2
else:
logger.error(f"Database connection failed after {max_retries} attempts: {e}")
from letta.errors import LettaServiceUnavailableError
raise LettaServiceUnavailableError(
"Database connection temporarily unavailable. Please retry your request.", service_name="database"
) from e
# Create singleton instance to match existing interface