diff --git a/letta/jobs/scheduler.py b/letta/jobs/scheduler.py index 7a6b105f..80999a5d 100644 --- a/letta/jobs/scheduler.py +++ b/letta/jobs/scheduler.py @@ -4,6 +4,7 @@ from typing import Optional from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.interval import IntervalTrigger +from sqlalchemy import text from letta.jobs.llm_batch_job_polling import poll_running_llm_batches from letta.log import get_logger @@ -38,7 +39,8 @@ async def _try_acquire_lock_and_start_scheduler(server: SyncServer) -> bool: raw_conn = await session.connection() # Try to acquire the advisory lock - result = await session.execute(f"SELECT pg_try_advisory_lock(CAST({ADVISORY_LOCK_KEY} AS bigint))") + sql = text("SELECT pg_try_advisory_lock(CAST(:lock_key AS bigint))") + result = await session.execute(sql, {"lock_key": ADVISORY_LOCK_KEY}) acquired_lock = result.scalar_one() if not acquired_lock: @@ -103,14 +105,14 @@ async def _try_acquire_lock_and_start_scheduler(server: SyncServer) -> bool: # Clean up temporary resources if lock wasn't acquired or error occurred before storing if cur: try: - cur.close() - except: - pass + await cur.close() + except Exception as e: + logger.warning(f"Error closing cursor: {e}") if raw_conn: try: - raw_conn.close() - except: - pass + await raw_conn.close() + except Exception as e: + logger.warning(f"Error closing connection: {e}") async def _background_lock_retry_loop(server: SyncServer): @@ -158,7 +160,9 @@ async def _release_advisory_lock(): try: if not lock_conn.closed: if not lock_cur.closed: - lock_cur.execute("SELECT pg_advisory_unlock(CAST(%s AS bigint))", (ADVISORY_LOCK_KEY,)) + # Use SQLAlchemy text() for raw SQL + unlock_sql = text("SELECT pg_advisory_unlock(CAST(:lock_key AS bigint))") + lock_cur.execute(unlock_sql, {"lock_key": ADVISORY_LOCK_KEY}) lock_cur.fetchone() # Consume result lock_conn.commit() logger.info(f"Executed pg_advisory_unlock for lock {ADVISORY_LOCK_KEY}") @@ -172,12 +176,12 @@ async def _release_advisory_lock(): # Ensure resources are closed regardless of unlock success try: if lock_cur and not lock_cur.closed: - lock_cur.close() + await lock_cur.close() except Exception as e: logger.error(f"Error closing advisory lock cursor: {e}", exc_info=True) try: if lock_conn and not lock_conn.closed: - lock_conn.close() + await lock_conn.close() logger.info("Closed database connection that held advisory lock.") except Exception as e: logger.error(f"Error closing advisory lock connection: {e}", exc_info=True)