From 71e0a8aab951aed7e2f29ac2815c28c6e89677d5 Mon Sep 17 00:00:00 2001 From: Kian Jones <11655409+kianjones9@users.noreply.github.com> Date: Sun, 8 Feb 2026 21:25:51 -0800 Subject: [PATCH] fix(core): use INSERT ON CONFLICT DO NOTHING for provider model sync (#9342) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(core): use INSERT ON CONFLICT DO NOTHING for provider model sync Replaces try/except around model.create_async() with pg_insert() .on_conflict_do_nothing() to prevent UniqueViolationError from being raised at the asyncpg driver level during concurrent model syncs. The previous approach caught the exception in Python but ddtrace still captured it at the driver level, causing Datadog error tracking noise. Fixes Datadog issue d8dec148-d535-11f0-95eb-da7ad0900000 🐾 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta * cleaner impl * fix --------- Co-authored-by: Letta Co-authored-by: Ari Webb --- letta/orm/sqlalchemy_base.py | 24 +++++++++++++++++++-- letta/services/provider_manager.py | 34 +++++++++--------------------- 2 files changed, 32 insertions(+), 26 deletions(-) diff --git a/letta/orm/sqlalchemy_base.py b/letta/orm/sqlalchemy_base.py index 500fdd19..1ae32249 100644 --- a/letta/orm/sqlalchemy_base.py +++ b/letta/orm/sqlalchemy_base.py @@ -8,6 +8,7 @@ from typing import TYPE_CHECKING, List, Literal, Optional, Tuple, Union from asyncpg.exceptions import DeadlockDetectedError, QueryCanceledError from sqlalchemy import Sequence, String, and_, delete, func, or_, select +from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.exc import DBAPIError, IntegrityError, TimeoutError from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import Mapped, Session, mapped_column @@ -544,12 +545,31 @@ class SqlalchemyBase(CommonSqlalchemyMetaMixins, Base): actor: Optional["User"] = None, no_commit: bool = False, no_refresh: bool = False, - ) -> "SqlalchemyBase": - """Async version of create function""" + ignore_conflicts: bool = False, + ) -> Optional["SqlalchemyBase"]: + """Async version of create function + + Args: + ignore_conflicts: If True, uses INSERT ... ON CONFLICT DO NOTHING and returns + None if a conflict occurred (no exception raised). + """ logger.debug(f"Creating {self.__class__.__name__} with ID: {self.id} with actor={actor}") if actor: self._set_created_and_updated_by_fields(actor.id) + + if ignore_conflicts: + values = { + col.name: getattr(self, col.key) + for col in self.__table__.columns + if not (getattr(self, col.key) is None and col.server_default is not None) + } + stmt = pg_insert(self.__table__).values(**values).on_conflict_do_nothing() + result = await db_session.execute(stmt) + if not no_commit: + await db_session.commit() + return self if result.rowcount > 0 else None + for attempt in range(_DEADLOCK_MAX_RETRIES): try: db_session.add(self) diff --git a/letta/services/provider_manager.py b/letta/services/provider_manager.py index 52b46495..2f66d140 100644 --- a/letta/services/provider_manager.py +++ b/letta/services/provider_manager.py @@ -723,19 +723,12 @@ class ProviderManager: f"org_id={pydantic_model.organization_id}" ) - # Convert to ORM model = ProviderModelORM(**pydantic_model.model_dump(to_orm=True)) - try: - await model.create_async(session) - logger.info(f" ✓ Successfully created LLM model {llm_config.handle} with ID {model.id}") - except Exception as e: - logger.info(f" ✗ Failed to create LLM model {llm_config.handle}: {e}") - # Log the full error details - import traceback - - logger.info(f" Full traceback: {traceback.format_exc()}") - # Roll back the session to clear the failed transaction - await session.rollback() + result = await model.create_async(session, ignore_conflicts=True) + if result: + logger.info(f" ✓ Successfully created LLM model {llm_config.handle}") + else: + logger.info(f" LLM model {llm_config.handle} already exists (concurrent insert), skipping") else: # Check if max_context_window or model_endpoint_type needs to be updated existing_model = existing[0] @@ -813,19 +806,12 @@ class ProviderManager: f"org_id={pydantic_model.organization_id}" ) - # Convert to ORM model = ProviderModelORM(**pydantic_model.model_dump(to_orm=True)) - try: - await model.create_async(session) - logger.info(f" ✓ Successfully created embedding model {embedding_config.handle} with ID {model.id}") - except Exception as e: - logger.error(f" ✗ Failed to create embedding model {embedding_config.handle}: {e}") - # Log the full error details - import traceback - - logger.error(f" Full traceback: {traceback.format_exc()}") - # Roll back the session to clear the failed transaction - await session.rollback() + result = await model.create_async(session, ignore_conflicts=True) + if result: + logger.info(f" ✓ Successfully created embedding model {embedding_config.handle}") + else: + logger.info(f" Embedding model {embedding_config.handle} already exists (concurrent insert), skipping") else: # Check if model_endpoint_type needs to be updated existing_model = existing[0]