From abb325f32dad3058ad2bcef2a527cfcfec67241a Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 2 Jan 2026 10:58:24 -0500 Subject: [PATCH] fix: prevent ForeignKeyViolationError on messages table (#8202) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add validation in `create_many_messages_async` to check if run_ids exist before attempting to insert messages. If a run has been deleted (e.g., during cancellation or cleanup) while messages are still being created, the run_id is set to NULL instead of causing a ForeignKeyViolationError. This handles the race condition where: 1. A run is created and workflow is started 2. The run is deleted before the workflow's message creation completes 3. The message insert fails with FK violation Fixes #8201, #8119, #8091, #8025 🤖 Generated with [Letta Code](https://letta.com) Co-authored-by: letta-code <248085862+letta-code@users.noreply.github.com> Co-authored-by: Letta --- letta/services/message_manager.py | 52 +++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/letta/services/message_manager.py b/letta/services/message_manager.py index 6f45f2ad..e420d85c 100644 --- a/letta/services/message_manager.py +++ b/letta/services/message_manager.py @@ -394,6 +394,31 @@ class MessageManager: orm_messages.append(MessageModel(**msg_data)) return orm_messages + @enforce_types + @trace_method + async def check_run_exists_async(self, run_id: str, actor: PydanticUser) -> bool: + """Check if a run exists in the database. + + Args: + run_id: The run ID to check + actor: User performing the action + + Returns: + True if the run exists, False otherwise + """ + if not run_id: + return False + + from letta.orm.run import Run as RunModel + + async with db_registry.async_session() as session: + query = select(RunModel.id).where( + RunModel.id == run_id, + RunModel.organization_id == actor.organization_id + ) + result = await session.execute(query) + return result.scalar_one_or_none() is not None + @enforce_types @trace_method async def check_existing_message_ids(self, message_ids: List[str], actor: PydanticUser) -> Set[str]: @@ -511,6 +536,33 @@ class MessageManager: media_type=content.source.media_type, detail=content.source.detail, ) + + # Validate run_ids exist before inserting to prevent ForeignKeyViolationError + # This handles the case where a run is deleted while messages are being created + unique_run_ids = {msg.run_id for msg in messages_to_create if msg.run_id} + if unique_run_ids: + from letta.orm.run import Run as RunModel + + async with db_registry.async_session() as session: + # Check which run_ids actually exist + query = select(RunModel.id).where( + RunModel.id.in_(unique_run_ids), + RunModel.organization_id == actor.organization_id + ) + result = await session.execute(query) + existing_run_ids = set(result.scalars().all()) + + # For any non-existent run_ids, set to None and log a warning + missing_run_ids = unique_run_ids - existing_run_ids + if missing_run_ids: + logger.warning( + f"Messages reference run_id(s) that don't exist: {missing_run_ids}. " + f"Setting run_id to None for affected messages to prevent ForeignKeyViolationError." + ) + for msg in messages_to_create: + if msg.run_id in missing_run_ids: + msg.run_id = None + orm_messages = self._create_many_preprocess(messages_to_create, actor) async with db_registry.async_session() as session: created_messages = await MessageModel.batch_create_async(orm_messages, session, actor=actor, no_commit=True, no_refresh=True)