From d592ec3135be568d50b529fa8c65ab749348d6f0 Mon Sep 17 00:00:00 2001 From: Kian Jones <11655409+kianjones9@users.noreply.github.com> Date: Fri, 6 Feb 2026 16:31:14 -0800 Subject: [PATCH] fix: handle DBAPIError wrapping asyncpg DeadlockDetectedError (#9355) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit SQLAlchemy wraps asyncpg's DeadlockDetectedError in a DBAPIError, which was falling through to the generic 500 handler. Now detected at both the ORM level (_handle_dbapi_error) and FastAPI handler level, returning 409 with Retry-After header. Datadog: https://us5.datadoghq.com/error-tracking/issue/2f1dc54c-dab6-11f0-a828-da7ad0900000 🐾 Generated with [Letta Code](https://letta.com) Co-authored-by: Letta --- letta/orm/errors.py | 8 ++++++++ letta/orm/sqlalchemy_base.py | 19 ++++++++++++++++-- letta/server/rest_api/app.py | 39 ++++++++++++++++++++++++++++++++++-- 3 files changed, 62 insertions(+), 4 deletions(-) diff --git a/letta/orm/errors.py b/letta/orm/errors.py index a574e74c..59b7b400 100644 --- a/letta/orm/errors.py +++ b/letta/orm/errors.py @@ -20,3 +20,11 @@ class DatabaseTimeoutError(Exception): def __init__(self, message="Database operation timed out", original_exception=None): super().__init__(message) self.original_exception = original_exception + + +class DatabaseDeadlockError(Exception): + """Custom exception for database deadlock errors (PostgreSQL error code 40P01).""" + + def __init__(self, message="A database deadlock was detected", original_exception=None): + super().__init__(message) + self.original_exception = original_exception diff --git a/letta/orm/sqlalchemy_base.py b/letta/orm/sqlalchemy_base.py index c012c54a..04700312 100644 --- a/letta/orm/sqlalchemy_base.py +++ b/letta/orm/sqlalchemy_base.py @@ -5,7 +5,7 @@ from functools import wraps from pprint import pformat from typing import TYPE_CHECKING, List, Literal, Optional, Tuple, Union -from asyncpg.exceptions import QueryCanceledError +from asyncpg.exceptions import DeadlockDetectedError, QueryCanceledError from sqlalchemy import Sequence, String, and_, delete, func, or_, select from sqlalchemy.exc import DBAPIError, IntegrityError, TimeoutError from sqlalchemy.ext.asyncio import AsyncSession @@ -16,7 +16,13 @@ from sqlalchemy.orm.interfaces import ORMOption from letta.errors import ConcurrentUpdateError from letta.log import get_logger from letta.orm.base import Base, CommonSqlalchemyMetaMixins -from letta.orm.errors import DatabaseTimeoutError, ForeignKeyConstraintViolationError, NoResultFound, UniqueConstraintViolationError +from letta.orm.errors import ( + DatabaseDeadlockError, + DatabaseTimeoutError, + ForeignKeyConstraintViolationError, + NoResultFound, + UniqueConstraintViolationError, +) from letta.settings import DatabaseChoice if TYPE_CHECKING: @@ -810,6 +816,10 @@ class SqlalchemyBase(CommonSqlalchemyMetaMixins, Base): logger.error(f"Query canceled (statement timeout) for {cls.__name__}: {e}") raise DatabaseTimeoutError(message=f"Query canceled due to statement timeout for {cls.__name__}.", original_exception=e) from e + if isinstance(orig, DeadlockDetectedError): + logger.error(f"Deadlock detected for {cls.__name__}: {e}") + raise DatabaseDeadlockError(message=f"A database deadlock was detected for {cls.__name__}.", original_exception=e) from e + # Handle SQLite-specific errors if "UNIQUE constraint failed" in error_message: raise UniqueConstraintViolationError( @@ -844,6 +854,11 @@ class SqlalchemyBase(CommonSqlalchemyMetaMixins, Base): f"A foreign key constraint was violated for {cls.__name__}. Check your input for missing or invalid references: {e}" ) from e + # Handle deadlock detected + if error_code == "40P01": + logger.error(f"Deadlock detected for {cls.__name__}: {e}") + raise DatabaseDeadlockError(message=f"A database deadlock was detected for {cls.__name__}.", original_exception=e) from e + # Re-raise for other unhandled DBAPI errors raise diff --git a/letta/server/rest_api/app.py b/letta/server/rest_api/app.py index b58d083e..91d8905f 100644 --- a/letta/server/rest_api/app.py +++ b/letta/server/rest_api/app.py @@ -20,7 +20,7 @@ from fastapi import FastAPI, Request from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse, ORJSONResponse from marshmallow import ValidationError -from sqlalchemy.exc import IntegrityError, OperationalError +from sqlalchemy.exc import DBAPIError, IntegrityError, OperationalError from starlette.middleware.cors import CORSMiddleware from letta.__init__ import __version__ as letta_version @@ -59,7 +59,13 @@ from letta.errors import ( from letta.helpers.pinecone_utils import get_pinecone_indices, should_use_pinecone, upsert_pinecone_indices from letta.jobs.scheduler import start_scheduler_with_leader_election from letta.log import get_logger -from letta.orm.errors import DatabaseTimeoutError, ForeignKeyConstraintViolationError, NoResultFound, UniqueConstraintViolationError +from letta.orm.errors import ( + DatabaseDeadlockError, + DatabaseTimeoutError, + ForeignKeyConstraintViolationError, + NoResultFound, + UniqueConstraintViolationError, +) from letta.otel.tracing import get_trace_id from letta.schemas.letta_message import create_letta_error_message_schema, create_letta_message_union_schema from letta.schemas.letta_message_content import ( @@ -547,6 +553,35 @@ def create_application() -> "FastAPI": app.add_exception_handler(LettaServiceUnavailableError, _error_handler_503) app.add_exception_handler(LLMProviderOverloaded, _error_handler_503) + @app.exception_handler(DatabaseDeadlockError) + async def database_deadlock_error_handler(request: Request, exc: DatabaseDeadlockError): + logger.error(f"Deadlock detected: {exc}. Original exception: {exc.original_exception}") + return JSONResponse( + status_code=409, + content={"detail": "A database deadlock was detected. Please retry your request."}, + headers={"Retry-After": "1"}, + ) + + @app.exception_handler(DBAPIError) + async def dbapi_error_handler(request: Request, exc: DBAPIError): + from asyncpg.exceptions import DeadlockDetectedError + + if isinstance(exc.orig, DeadlockDetectedError): + logger.error(f"Deadlock detected (DBAPIError wrapper): {exc}") + return JSONResponse( + status_code=409, + content={"detail": "A database deadlock was detected. Please retry your request."}, + headers={"Retry-After": "1"}, + ) + + logger.error(f"Unhandled DBAPIError: {exc}", exc_info=True) + if SENTRY_ENABLED: + sentry_sdk.capture_exception(exc) + return JSONResponse( + status_code=500, + content={"detail": "A database error occurred."}, + ) + @app.exception_handler(IncompatibleAgentType) async def handle_incompatible_agent_type(request: Request, exc: IncompatibleAgentType): logger.error("Incompatible agent types. Expected: %s, Actual: %s", exc.expected_type, exc.actual_type)