fix: prevent ForeignKeyViolationError on messages table (#8202)
Add validation in `create_many_messages_async` to check if run_ids exist before attempting to insert messages. If a run has been deleted (e.g., during cancellation or cleanup) while messages are still being created, the run_id is set to NULL instead of causing a ForeignKeyViolationError. This handles the race condition where: 1. A run is created and workflow is started 2. The run is deleted before the workflow's message creation completes 3. The message insert fails with FK violation Fixes #8201, #8119, #8091, #8025 🤖 Generated with [Letta Code](https://letta.com) Co-authored-by: letta-code <248085862+letta-code@users.noreply.github.com> Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
committed by
Caren Thomas
parent
8f4074d2fc
commit
abb325f32d
@@ -394,6 +394,31 @@ class MessageManager:
|
||||
orm_messages.append(MessageModel(**msg_data))
|
||||
return orm_messages
|
||||
|
||||
@enforce_types
|
||||
@trace_method
|
||||
async def check_run_exists_async(self, run_id: str, actor: PydanticUser) -> bool:
|
||||
"""Check if a run exists in the database.
|
||||
|
||||
Args:
|
||||
run_id: The run ID to check
|
||||
actor: User performing the action
|
||||
|
||||
Returns:
|
||||
True if the run exists, False otherwise
|
||||
"""
|
||||
if not run_id:
|
||||
return False
|
||||
|
||||
from letta.orm.run import Run as RunModel
|
||||
|
||||
async with db_registry.async_session() as session:
|
||||
query = select(RunModel.id).where(
|
||||
RunModel.id == run_id,
|
||||
RunModel.organization_id == actor.organization_id
|
||||
)
|
||||
result = await session.execute(query)
|
||||
return result.scalar_one_or_none() is not None
|
||||
|
||||
@enforce_types
|
||||
@trace_method
|
||||
async def check_existing_message_ids(self, message_ids: List[str], actor: PydanticUser) -> Set[str]:
|
||||
@@ -511,6 +536,33 @@ class MessageManager:
|
||||
media_type=content.source.media_type,
|
||||
detail=content.source.detail,
|
||||
)
|
||||
|
||||
# Validate run_ids exist before inserting to prevent ForeignKeyViolationError
|
||||
# This handles the case where a run is deleted while messages are being created
|
||||
unique_run_ids = {msg.run_id for msg in messages_to_create if msg.run_id}
|
||||
if unique_run_ids:
|
||||
from letta.orm.run import Run as RunModel
|
||||
|
||||
async with db_registry.async_session() as session:
|
||||
# Check which run_ids actually exist
|
||||
query = select(RunModel.id).where(
|
||||
RunModel.id.in_(unique_run_ids),
|
||||
RunModel.organization_id == actor.organization_id
|
||||
)
|
||||
result = await session.execute(query)
|
||||
existing_run_ids = set(result.scalars().all())
|
||||
|
||||
# For any non-existent run_ids, set to None and log a warning
|
||||
missing_run_ids = unique_run_ids - existing_run_ids
|
||||
if missing_run_ids:
|
||||
logger.warning(
|
||||
f"Messages reference run_id(s) that don't exist: {missing_run_ids}. "
|
||||
f"Setting run_id to None for affected messages to prevent ForeignKeyViolationError."
|
||||
)
|
||||
for msg in messages_to_create:
|
||||
if msg.run_id in missing_run_ids:
|
||||
msg.run_id = None
|
||||
|
||||
orm_messages = self._create_many_preprocess(messages_to_create, actor)
|
||||
async with db_registry.async_session() as session:
|
||||
created_messages = await MessageModel.batch_create_async(orm_messages, session, actor=actor, no_commit=True, no_refresh=True)
|
||||
|
||||
Reference in New Issue
Block a user