diff --git a/letta/services/message_manager.py b/letta/services/message_manager.py index 6f45f2ad..e420d85c 100644 --- a/letta/services/message_manager.py +++ b/letta/services/message_manager.py @@ -394,6 +394,31 @@ class MessageManager: orm_messages.append(MessageModel(**msg_data)) return orm_messages + @enforce_types + @trace_method + async def check_run_exists_async(self, run_id: str, actor: PydanticUser) -> bool: + """Check if a run exists in the database. + + Args: + run_id: The run ID to check + actor: User performing the action + + Returns: + True if the run exists, False otherwise + """ + if not run_id: + return False + + from letta.orm.run import Run as RunModel + + async with db_registry.async_session() as session: + query = select(RunModel.id).where( + RunModel.id == run_id, + RunModel.organization_id == actor.organization_id + ) + result = await session.execute(query) + return result.scalar_one_or_none() is not None + @enforce_types @trace_method async def check_existing_message_ids(self, message_ids: List[str], actor: PydanticUser) -> Set[str]: @@ -511,6 +536,33 @@ class MessageManager: media_type=content.source.media_type, detail=content.source.detail, ) + + # Validate run_ids exist before inserting to prevent ForeignKeyViolationError + # This handles the case where a run is deleted while messages are being created + unique_run_ids = {msg.run_id for msg in messages_to_create if msg.run_id} + if unique_run_ids: + from letta.orm.run import Run as RunModel + + async with db_registry.async_session() as session: + # Check which run_ids actually exist + query = select(RunModel.id).where( + RunModel.id.in_(unique_run_ids), + RunModel.organization_id == actor.organization_id + ) + result = await session.execute(query) + existing_run_ids = set(result.scalars().all()) + + # For any non-existent run_ids, set to None and log a warning + missing_run_ids = unique_run_ids - existing_run_ids + if missing_run_ids: + logger.warning( + f"Messages reference run_id(s) that don't exist: {missing_run_ids}. " + f"Setting run_id to None for affected messages to prevent ForeignKeyViolationError." + ) + for msg in messages_to_create: + if msg.run_id in missing_run_ids: + msg.run_id = None + orm_messages = self._create_many_preprocess(messages_to_create, actor) async with db_registry.async_session() as session: created_messages = await MessageModel.batch_create_async(orm_messages, session, actor=actor, no_commit=True, no_refresh=True)