fix: concurrent block update rollback [LET-6695] (#8133)
fix: concurrent block update rollback
This commit is contained in:
@@ -64,6 +64,15 @@ class NoActiveRunsToCancelError(LettaError):
|
||||
super().__init__(message=message, code=ErrorCode.CONFLICT, details=details)
|
||||
|
||||
|
||||
class ConcurrentUpdateError(LettaError):
|
||||
"""Error raised when a resource was updated by another transaction (optimistic locking conflict)."""
|
||||
|
||||
def __init__(self, resource_type: str, resource_id: str):
|
||||
message = f"{resource_type} with id '{resource_id}' was updated by another transaction. Please retry your request."
|
||||
details = {"error_code": "CONCURRENT_UPDATE", "resource_type": resource_type, "resource_id": resource_id}
|
||||
super().__init__(message=message, code=ErrorCode.CONFLICT, details=details)
|
||||
|
||||
|
||||
class LettaToolCreateError(LettaError):
|
||||
"""Error raised when a tool cannot be created."""
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ from sqlalchemy.orm import Mapped, Session, mapped_column
|
||||
from sqlalchemy.orm.exc import StaleDataError
|
||||
from sqlalchemy.orm.interfaces import ORMOption
|
||||
|
||||
from letta.errors import ConcurrentUpdateError
|
||||
from letta.log import get_logger
|
||||
from letta.orm.base import Base, CommonSqlalchemyMetaMixins
|
||||
from letta.orm.errors import DatabaseTimeoutError, ForeignKeyConstraintViolationError, NoResultFound, UniqueConstraintViolationError
|
||||
@@ -619,6 +620,11 @@ class SqlalchemyBase(CommonSqlalchemyMetaMixins, Base):
|
||||
if actor:
|
||||
self._set_created_and_updated_by_fields(actor.id)
|
||||
self.set_updated_at()
|
||||
|
||||
# Capture id before try block to avoid accessing expired attributes after rollback
|
||||
object_id = self.id
|
||||
class_name = self.__class__.__name__
|
||||
|
||||
try:
|
||||
db_session.add(self)
|
||||
if no_commit:
|
||||
@@ -631,10 +637,17 @@ class SqlalchemyBase(CommonSqlalchemyMetaMixins, Base):
|
||||
return self
|
||||
except StaleDataError as e:
|
||||
# This can occur when using optimistic locking (version_id_col) and:
|
||||
# 1. The row doesn't exist (0 rows matched)
|
||||
# 2. The version has changed (concurrent update)
|
||||
# We convert this to NoResultFound to return a proper 404 error
|
||||
raise NoResultFound(f"{self.__class__.__name__} with id '{self.id}' not found or was updated by another transaction") from e
|
||||
# 1. The row doesn't exist (0 rows matched) - return 404
|
||||
# 2. The version has changed (concurrent update) - return 409
|
||||
|
||||
# Check if the row still exists to distinguish between the two cases
|
||||
result = await db_session.execute(select(self.__class__).where(self.__class__.id == object_id))
|
||||
if result.scalar_one_or_none() is None:
|
||||
# Row was deleted - return 404
|
||||
raise NoResultFound(f"{class_name} with id '{object_id}' not found") from e
|
||||
|
||||
# Row exists but version changed (concurrent update) - return 409
|
||||
raise ConcurrentUpdateError(resource_type=class_name, resource_id=object_id) from e
|
||||
except (DBAPIError, IntegrityError) as e:
|
||||
self._handle_dbapi_error(e)
|
||||
|
||||
|
||||
@@ -32,6 +32,7 @@ from letta.errors import (
|
||||
AgentFileImportError,
|
||||
AgentNotFoundForExportError,
|
||||
BedrockPermissionError,
|
||||
ConcurrentUpdateError,
|
||||
EmbeddingConfigRequiredError,
|
||||
HandleNotFoundError,
|
||||
LettaAgentNotFoundError,
|
||||
@@ -493,6 +494,7 @@ def create_application() -> "FastAPI":
|
||||
app.add_exception_handler(ForeignKeyConstraintViolationError, _error_handler_409)
|
||||
app.add_exception_handler(UniqueConstraintViolationError, _error_handler_409)
|
||||
app.add_exception_handler(IntegrityError, _error_handler_409)
|
||||
app.add_exception_handler(ConcurrentUpdateError, _error_handler_409)
|
||||
app.add_exception_handler(PendingApprovalError, _error_handler_409)
|
||||
app.add_exception_handler(NoActiveRunsToCancelError, _error_handler_409)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user