diff --git a/letta/jobs/scheduler.py b/letta/jobs/scheduler.py index b453aea9..056167fa 100644 --- a/letta/jobs/scheduler.py +++ b/letta/jobs/scheduler.py @@ -4,10 +4,11 @@ from typing import Optional from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.interval import IntervalTrigger +from sqlalchemy import text from letta.jobs.llm_batch_job_polling import poll_running_llm_batches from letta.log import get_logger -from letta.server.db import db_context +from letta.server.db import db_registry from letta.server.server import SyncServer from letta.settings import settings @@ -16,68 +17,54 @@ scheduler = AsyncIOScheduler() logger = get_logger(__name__) ADVISORY_LOCK_KEY = 0x12345678ABCDEF00 -_advisory_lock_conn = None # Holds the raw DB connection if leader -_advisory_lock_cur = None # Holds the cursor for the lock connection if leader +_advisory_lock_session = None # Holds the async session if leader _lock_retry_task: Optional[asyncio.Task] = None # Background task handle for non-leaders _is_scheduler_leader = False # Flag indicating if this instance runs the scheduler async def _try_acquire_lock_and_start_scheduler(server: SyncServer) -> bool: """Attempts to acquire lock, starts scheduler if successful.""" - global _advisory_lock_conn, _advisory_lock_cur, _is_scheduler_leader, scheduler + global _advisory_lock_session, _is_scheduler_leader, scheduler if _is_scheduler_leader: return True # Already leading - raw_conn = None - cur = None + lock_session = None acquired_lock = False try: - # Use a temporary connection context for the attempt initially - with db_context() as session: + async with db_registry.async_session() as session: engine = session.get_bind() engine_name = engine.name logger.info(f"Database engine type: {engine_name}") - if engine_name != "postgresql": logger.warning(f"Advisory locks not supported for {engine_name} database. Starting scheduler without leader election.") - acquired_lock = True # For SQLite, assume we can start the scheduler + acquired_lock = True else: - # Get raw connection - MUST be kept open if lock is acquired - raw_conn = engine.raw_connection() - cur = raw_conn.cursor() - - cur.execute("SELECT pg_try_advisory_lock(CAST(%s AS bigint))", (ADVISORY_LOCK_KEY,)) - acquired_lock = cur.fetchone()[0] + lock_session = db_registry.get_async_session_factory()() + result = await lock_session.execute( + text("SELECT pg_try_advisory_lock(CAST(:lock_key AS bigint))"), {"lock_key": ADVISORY_LOCK_KEY} + ) + acquired_lock = result.scalar() if not acquired_lock: - if cur: - cur.close() - if raw_conn: - raw_conn.close() + if lock_session: + await lock_session.close() logger.info("Scheduler lock held by another instance.") return False - # --- Lock Acquired --- if engine_name == "postgresql": logger.info("Acquired PostgreSQL advisory lock.") - _advisory_lock_conn = raw_conn # Keep connection for lock duration - _advisory_lock_cur = cur # Keep cursor for lock duration - raw_conn = None # Prevent closing in finally block - cur = None # Prevent closing in finally block + _advisory_lock_session = lock_session + lock_session = None else: logger.info("Starting scheduler for non-PostgreSQL database.") - # For SQLite, we don't need to keep the connection open - if cur: - cur.close() - if raw_conn: - raw_conn.close() - raw_conn = None - cur = None + if lock_session: + await lock_session.close() + lock_session = None trigger = IntervalTrigger( seconds=settings.poll_running_llm_batches_interval_seconds, - jitter=10, # Jitter for the job execution + jitter=10, ) scheduler.add_job( poll_running_llm_batches, @@ -91,7 +78,7 @@ async def _try_acquire_lock_and_start_scheduler(server: SyncServer) -> bool: if not scheduler.running: scheduler.start() - elif scheduler.state == 2: # PAUSED + elif scheduler.state == 2: scheduler.resume() _is_scheduler_leader = True @@ -99,38 +86,27 @@ async def _try_acquire_lock_and_start_scheduler(server: SyncServer) -> bool: except Exception as e: logger.error(f"Error during lock acquisition/scheduler start: {e}", exc_info=True) - if acquired_lock: # If lock was acquired before error, try to release + if acquired_lock: logger.warning("Attempting to release lock due to error during startup.") try: - # Use the cursor/connection we were about to store - _advisory_lock_cur = cur - _advisory_lock_conn = raw_conn - await _release_advisory_lock() # Attempt cleanup + _advisory_lock_session = lock_session + await _release_advisory_lock() except Exception as unlock_err: logger.error(f"Failed to release lock during error handling: {unlock_err}", exc_info=True) finally: - # Ensure globals are cleared after failed attempt - _advisory_lock_cur = None - _advisory_lock_conn = None + _advisory_lock_session = None _is_scheduler_leader = False - # Ensure scheduler is stopped if we failed partially if scheduler.running: try: scheduler.shutdown(wait=False) except: - pass # Best effort + pass return False finally: - # Clean up temporary resources if lock wasn't acquired or error occurred before storing - if cur: + if lock_session: try: - cur.close() - except: - pass - if raw_conn: - try: - raw_conn.close() + await lock_session.close() except: pass @@ -141,63 +117,50 @@ async def _background_lock_retry_loop(server: SyncServer): logger.info("Starting background task to periodically check for scheduler lock.") while True: - if _is_scheduler_leader: # Should be cancelled first, but safety check + if _is_scheduler_leader: break try: wait_time = settings.poll_lock_retry_interval_seconds await asyncio.sleep(wait_time) - # Re-check state before attempting lock if _is_scheduler_leader or _lock_retry_task is None: - break # Stop if became leader or task was cancelled + break acquired = await _try_acquire_lock_and_start_scheduler(server) if acquired: logger.info("Background task acquired lock and started scheduler.") - _lock_retry_task = None # Clear self handle - break # Exit loop, we are now the leader + _lock_retry_task = None + break except asyncio.CancelledError: logger.info("Background lock retry task cancelled.") break except Exception as e: logger.error(f"Error in background lock retry loop: {e}", exc_info=True) - # Avoid tight loop on persistent errors await asyncio.sleep(settings.poll_lock_retry_interval_seconds) async def _release_advisory_lock(): - """Releases the advisory lock using the stored connection.""" - global _advisory_lock_conn, _advisory_lock_cur + """Releases the advisory lock using the stored session.""" + global _advisory_lock_session - lock_cur = _advisory_lock_cur - lock_conn = _advisory_lock_conn - _advisory_lock_cur = None # Clear global immediately - _advisory_lock_conn = None # Clear global immediately + lock_session = _advisory_lock_session + _advisory_lock_session = None - if lock_cur is not None and lock_conn is not None: + if lock_session is not None: logger.info(f"Attempting to release PostgreSQL advisory lock {ADVISORY_LOCK_KEY}") try: - # Try to execute unlock - connection/cursor validity is checked by attempting the operation - lock_cur.execute("SELECT pg_advisory_unlock(CAST(%s AS bigint))", (ADVISORY_LOCK_KEY,)) - lock_cur.fetchone() # Consume result - lock_conn.commit() + await lock_session.execute(text("SELECT pg_advisory_unlock(CAST(:lock_key AS bigint))"), {"lock_key": ADVISORY_LOCK_KEY}) logger.info(f"Executed pg_advisory_unlock for lock {ADVISORY_LOCK_KEY}") except Exception as e: logger.error(f"Error executing pg_advisory_unlock: {e}", exc_info=True) finally: - # Ensure resources are closed regardless of unlock success try: - if lock_cur: - lock_cur.close() + if lock_session: + await lock_session.close() + logger.info("Closed database session that held advisory lock.") except Exception as e: - logger.error(f"Error closing advisory lock cursor: {e}", exc_info=True) - try: - if lock_conn: - lock_conn.close() - logger.info("Closed database connection that held advisory lock.") - except Exception as e: - logger.error(f"Error closing advisory lock connection: {e}", exc_info=True) + logger.error(f"Error closing advisory lock session: {e}", exc_info=True) else: logger.info("No PostgreSQL advisory lock to release (likely using SQLite or non-PostgreSQL database).") @@ -220,7 +183,6 @@ async def start_scheduler_with_leader_election(server: SyncServer): acquired_immediately = await _try_acquire_lock_and_start_scheduler(server) if not acquired_immediately and _lock_retry_task is None: - # Failed initial attempt, start background retry task loop = asyncio.get_running_loop() _lock_retry_task = loop.create_task(_background_lock_retry_loop(server)) @@ -232,48 +194,40 @@ async def shutdown_scheduler_and_release_lock(): """ global _is_scheduler_leader, _lock_retry_task, scheduler - # 1. Cancel retry task if running (for non-leaders) if _lock_retry_task is not None: logger.info("Shutting down: Cancelling background lock retry task.") current_task = _lock_retry_task - _lock_retry_task = None # Clear handle first + _lock_retry_task = None current_task.cancel() try: - await current_task # Wait for cancellation + await current_task except asyncio.CancelledError: logger.info("Background lock retry task successfully cancelled.") except Exception as e: logger.warning(f"Exception waiting for cancelled retry task: {e}", exc_info=True) - # 2. Shutdown scheduler and release lock if we were the leader if _is_scheduler_leader: logger.info("Shutting down: Leader instance stopping scheduler and releasing lock.") if scheduler.running: try: - # Force synchronous shutdown to prevent callback scheduling scheduler.shutdown(wait=True) - # wait for any internal cleanup to complete await asyncio.sleep(0.1) logger.info("APScheduler shut down.") except Exception as e: - # Handle SchedulerNotRunningError and other shutdown exceptions logger.warning(f"Exception during APScheduler shutdown: {e}") if "not running" not in str(e).lower(): logger.error(f"Unexpected error shutting down APScheduler: {e}", exc_info=True) await _release_advisory_lock() - _is_scheduler_leader = False # Update state after cleanup + _is_scheduler_leader = False else: logger.info("Shutting down: Non-leader instance.") - # Final cleanup check for scheduler state (belt and suspenders) - # This should rarely be needed if shutdown logic above worked correctly try: if scheduler.running: logger.warning("Scheduler still running after shutdown logic completed? Forcing shutdown.") scheduler.shutdown(wait=False) except Exception as e: - # Catch SchedulerNotRunningError and other shutdown exceptions logger.debug(f"Expected exception during final scheduler cleanup: {e}")