fix: handle DBAPIError wrapping asyncpg DeadlockDetectedError (#9355)
SQLAlchemy wraps asyncpg's DeadlockDetectedError in a DBAPIError, which was falling through to the generic 500 handler. Now detected at both the ORM level (_handle_dbapi_error) and FastAPI handler level, returning 409 with Retry-After header. Datadog: https://us5.datadoghq.com/error-tracking/issue/2f1dc54c-dab6-11f0-a828-da7ad0900000 🐾 Generated with [Letta Code](https://letta.com) Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
@@ -20,3 +20,11 @@ class DatabaseTimeoutError(Exception):
|
|||||||
def __init__(self, message="Database operation timed out", original_exception=None):
|
def __init__(self, message="Database operation timed out", original_exception=None):
|
||||||
super().__init__(message)
|
super().__init__(message)
|
||||||
self.original_exception = original_exception
|
self.original_exception = original_exception
|
||||||
|
|
||||||
|
|
||||||
|
class DatabaseDeadlockError(Exception):
|
||||||
|
"""Custom exception for database deadlock errors (PostgreSQL error code 40P01)."""
|
||||||
|
|
||||||
|
def __init__(self, message="A database deadlock was detected", original_exception=None):
|
||||||
|
super().__init__(message)
|
||||||
|
self.original_exception = original_exception
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ from functools import wraps
|
|||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
from typing import TYPE_CHECKING, List, Literal, Optional, Tuple, Union
|
from typing import TYPE_CHECKING, List, Literal, Optional, Tuple, Union
|
||||||
|
|
||||||
from asyncpg.exceptions import QueryCanceledError
|
from asyncpg.exceptions import DeadlockDetectedError, QueryCanceledError
|
||||||
from sqlalchemy import Sequence, String, and_, delete, func, or_, select
|
from sqlalchemy import Sequence, String, and_, delete, func, or_, select
|
||||||
from sqlalchemy.exc import DBAPIError, IntegrityError, TimeoutError
|
from sqlalchemy.exc import DBAPIError, IntegrityError, TimeoutError
|
||||||
from sqlalchemy.ext.asyncio import AsyncSession
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
@@ -16,7 +16,13 @@ from sqlalchemy.orm.interfaces import ORMOption
|
|||||||
from letta.errors import ConcurrentUpdateError
|
from letta.errors import ConcurrentUpdateError
|
||||||
from letta.log import get_logger
|
from letta.log import get_logger
|
||||||
from letta.orm.base import Base, CommonSqlalchemyMetaMixins
|
from letta.orm.base import Base, CommonSqlalchemyMetaMixins
|
||||||
from letta.orm.errors import DatabaseTimeoutError, ForeignKeyConstraintViolationError, NoResultFound, UniqueConstraintViolationError
|
from letta.orm.errors import (
|
||||||
|
DatabaseDeadlockError,
|
||||||
|
DatabaseTimeoutError,
|
||||||
|
ForeignKeyConstraintViolationError,
|
||||||
|
NoResultFound,
|
||||||
|
UniqueConstraintViolationError,
|
||||||
|
)
|
||||||
from letta.settings import DatabaseChoice
|
from letta.settings import DatabaseChoice
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
@@ -810,6 +816,10 @@ class SqlalchemyBase(CommonSqlalchemyMetaMixins, Base):
|
|||||||
logger.error(f"Query canceled (statement timeout) for {cls.__name__}: {e}")
|
logger.error(f"Query canceled (statement timeout) for {cls.__name__}: {e}")
|
||||||
raise DatabaseTimeoutError(message=f"Query canceled due to statement timeout for {cls.__name__}.", original_exception=e) from e
|
raise DatabaseTimeoutError(message=f"Query canceled due to statement timeout for {cls.__name__}.", original_exception=e) from e
|
||||||
|
|
||||||
|
if isinstance(orig, DeadlockDetectedError):
|
||||||
|
logger.error(f"Deadlock detected for {cls.__name__}: {e}")
|
||||||
|
raise DatabaseDeadlockError(message=f"A database deadlock was detected for {cls.__name__}.", original_exception=e) from e
|
||||||
|
|
||||||
# Handle SQLite-specific errors
|
# Handle SQLite-specific errors
|
||||||
if "UNIQUE constraint failed" in error_message:
|
if "UNIQUE constraint failed" in error_message:
|
||||||
raise UniqueConstraintViolationError(
|
raise UniqueConstraintViolationError(
|
||||||
@@ -844,6 +854,11 @@ class SqlalchemyBase(CommonSqlalchemyMetaMixins, Base):
|
|||||||
f"A foreign key constraint was violated for {cls.__name__}. Check your input for missing or invalid references: {e}"
|
f"A foreign key constraint was violated for {cls.__name__}. Check your input for missing or invalid references: {e}"
|
||||||
) from e
|
) from e
|
||||||
|
|
||||||
|
# Handle deadlock detected
|
||||||
|
if error_code == "40P01":
|
||||||
|
logger.error(f"Deadlock detected for {cls.__name__}: {e}")
|
||||||
|
raise DatabaseDeadlockError(message=f"A database deadlock was detected for {cls.__name__}.", original_exception=e) from e
|
||||||
|
|
||||||
# Re-raise for other unhandled DBAPI errors
|
# Re-raise for other unhandled DBAPI errors
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|||||||
@@ -20,7 +20,7 @@ from fastapi import FastAPI, Request
|
|||||||
from fastapi.exceptions import RequestValidationError
|
from fastapi.exceptions import RequestValidationError
|
||||||
from fastapi.responses import JSONResponse, ORJSONResponse
|
from fastapi.responses import JSONResponse, ORJSONResponse
|
||||||
from marshmallow import ValidationError
|
from marshmallow import ValidationError
|
||||||
from sqlalchemy.exc import IntegrityError, OperationalError
|
from sqlalchemy.exc import DBAPIError, IntegrityError, OperationalError
|
||||||
from starlette.middleware.cors import CORSMiddleware
|
from starlette.middleware.cors import CORSMiddleware
|
||||||
|
|
||||||
from letta.__init__ import __version__ as letta_version
|
from letta.__init__ import __version__ as letta_version
|
||||||
@@ -59,7 +59,13 @@ from letta.errors import (
|
|||||||
from letta.helpers.pinecone_utils import get_pinecone_indices, should_use_pinecone, upsert_pinecone_indices
|
from letta.helpers.pinecone_utils import get_pinecone_indices, should_use_pinecone, upsert_pinecone_indices
|
||||||
from letta.jobs.scheduler import start_scheduler_with_leader_election
|
from letta.jobs.scheduler import start_scheduler_with_leader_election
|
||||||
from letta.log import get_logger
|
from letta.log import get_logger
|
||||||
from letta.orm.errors import DatabaseTimeoutError, ForeignKeyConstraintViolationError, NoResultFound, UniqueConstraintViolationError
|
from letta.orm.errors import (
|
||||||
|
DatabaseDeadlockError,
|
||||||
|
DatabaseTimeoutError,
|
||||||
|
ForeignKeyConstraintViolationError,
|
||||||
|
NoResultFound,
|
||||||
|
UniqueConstraintViolationError,
|
||||||
|
)
|
||||||
from letta.otel.tracing import get_trace_id
|
from letta.otel.tracing import get_trace_id
|
||||||
from letta.schemas.letta_message import create_letta_error_message_schema, create_letta_message_union_schema
|
from letta.schemas.letta_message import create_letta_error_message_schema, create_letta_message_union_schema
|
||||||
from letta.schemas.letta_message_content import (
|
from letta.schemas.letta_message_content import (
|
||||||
@@ -547,6 +553,35 @@ def create_application() -> "FastAPI":
|
|||||||
app.add_exception_handler(LettaServiceUnavailableError, _error_handler_503)
|
app.add_exception_handler(LettaServiceUnavailableError, _error_handler_503)
|
||||||
app.add_exception_handler(LLMProviderOverloaded, _error_handler_503)
|
app.add_exception_handler(LLMProviderOverloaded, _error_handler_503)
|
||||||
|
|
||||||
|
@app.exception_handler(DatabaseDeadlockError)
|
||||||
|
async def database_deadlock_error_handler(request: Request, exc: DatabaseDeadlockError):
|
||||||
|
logger.error(f"Deadlock detected: {exc}. Original exception: {exc.original_exception}")
|
||||||
|
return JSONResponse(
|
||||||
|
status_code=409,
|
||||||
|
content={"detail": "A database deadlock was detected. Please retry your request."},
|
||||||
|
headers={"Retry-After": "1"},
|
||||||
|
)
|
||||||
|
|
||||||
|
@app.exception_handler(DBAPIError)
|
||||||
|
async def dbapi_error_handler(request: Request, exc: DBAPIError):
|
||||||
|
from asyncpg.exceptions import DeadlockDetectedError
|
||||||
|
|
||||||
|
if isinstance(exc.orig, DeadlockDetectedError):
|
||||||
|
logger.error(f"Deadlock detected (DBAPIError wrapper): {exc}")
|
||||||
|
return JSONResponse(
|
||||||
|
status_code=409,
|
||||||
|
content={"detail": "A database deadlock was detected. Please retry your request."},
|
||||||
|
headers={"Retry-After": "1"},
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.error(f"Unhandled DBAPIError: {exc}", exc_info=True)
|
||||||
|
if SENTRY_ENABLED:
|
||||||
|
sentry_sdk.capture_exception(exc)
|
||||||
|
return JSONResponse(
|
||||||
|
status_code=500,
|
||||||
|
content={"detail": "A database error occurred."},
|
||||||
|
)
|
||||||
|
|
||||||
@app.exception_handler(IncompatibleAgentType)
|
@app.exception_handler(IncompatibleAgentType)
|
||||||
async def handle_incompatible_agent_type(request: Request, exc: IncompatibleAgentType):
|
async def handle_incompatible_agent_type(request: Request, exc: IncompatibleAgentType):
|
||||||
logger.error("Incompatible agent types. Expected: %s, Actual: %s", exc.expected_type, exc.actual_type)
|
logger.error("Incompatible agent types. Expected: %s, Actual: %s", exc.expected_type, exc.actual_type)
|
||||||
|
|||||||
Reference in New Issue
Block a user