feat: asyncify batch scheduler locking code (#3212)

This commit is contained in:
cthomas
2025-07-07 16:18:20 -07:00
committed by GitHub
parent a2e1efe6bc
commit b84a192b46

View File

@@ -4,10 +4,11 @@ from typing import Optional
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy import text
from letta.jobs.llm_batch_job_polling import poll_running_llm_batches
from letta.log import get_logger
from letta.server.db import db_context
from letta.server.db import db_registry
from letta.server.server import SyncServer
from letta.settings import settings
@@ -16,68 +17,54 @@ scheduler = AsyncIOScheduler()
logger = get_logger(__name__)
ADVISORY_LOCK_KEY = 0x12345678ABCDEF00
_advisory_lock_conn = None # Holds the raw DB connection if leader
_advisory_lock_cur = None # Holds the cursor for the lock connection if leader
_advisory_lock_session = None # Holds the async session if leader
_lock_retry_task: Optional[asyncio.Task] = None # Background task handle for non-leaders
_is_scheduler_leader = False # Flag indicating if this instance runs the scheduler
async def _try_acquire_lock_and_start_scheduler(server: SyncServer) -> bool:
"""Attempts to acquire lock, starts scheduler if successful."""
global _advisory_lock_conn, _advisory_lock_cur, _is_scheduler_leader, scheduler
global _advisory_lock_session, _is_scheduler_leader, scheduler
if _is_scheduler_leader:
return True # Already leading
raw_conn = None
cur = None
lock_session = None
acquired_lock = False
try:
# Use a temporary connection context for the attempt initially
with db_context() as session:
async with db_registry.async_session() as session:
engine = session.get_bind()
engine_name = engine.name
logger.info(f"Database engine type: {engine_name}")
if engine_name != "postgresql":
logger.warning(f"Advisory locks not supported for {engine_name} database. Starting scheduler without leader election.")
acquired_lock = True # For SQLite, assume we can start the scheduler
acquired_lock = True
else:
# Get raw connection - MUST be kept open if lock is acquired
raw_conn = engine.raw_connection()
cur = raw_conn.cursor()
cur.execute("SELECT pg_try_advisory_lock(CAST(%s AS bigint))", (ADVISORY_LOCK_KEY,))
acquired_lock = cur.fetchone()[0]
lock_session = db_registry.get_async_session_factory()()
result = await lock_session.execute(
text("SELECT pg_try_advisory_lock(CAST(:lock_key AS bigint))"), {"lock_key": ADVISORY_LOCK_KEY}
)
acquired_lock = result.scalar()
if not acquired_lock:
if cur:
cur.close()
if raw_conn:
raw_conn.close()
if lock_session:
await lock_session.close()
logger.info("Scheduler lock held by another instance.")
return False
# --- Lock Acquired ---
if engine_name == "postgresql":
logger.info("Acquired PostgreSQL advisory lock.")
_advisory_lock_conn = raw_conn # Keep connection for lock duration
_advisory_lock_cur = cur # Keep cursor for lock duration
raw_conn = None # Prevent closing in finally block
cur = None # Prevent closing in finally block
_advisory_lock_session = lock_session
lock_session = None
else:
logger.info("Starting scheduler for non-PostgreSQL database.")
# For SQLite, we don't need to keep the connection open
if cur:
cur.close()
if raw_conn:
raw_conn.close()
raw_conn = None
cur = None
if lock_session:
await lock_session.close()
lock_session = None
trigger = IntervalTrigger(
seconds=settings.poll_running_llm_batches_interval_seconds,
jitter=10, # Jitter for the job execution
jitter=10,
)
scheduler.add_job(
poll_running_llm_batches,
@@ -91,7 +78,7 @@ async def _try_acquire_lock_and_start_scheduler(server: SyncServer) -> bool:
if not scheduler.running:
scheduler.start()
elif scheduler.state == 2: # PAUSED
elif scheduler.state == 2:
scheduler.resume()
_is_scheduler_leader = True
@@ -99,38 +86,27 @@ async def _try_acquire_lock_and_start_scheduler(server: SyncServer) -> bool:
except Exception as e:
logger.error(f"Error during lock acquisition/scheduler start: {e}", exc_info=True)
if acquired_lock: # If lock was acquired before error, try to release
if acquired_lock:
logger.warning("Attempting to release lock due to error during startup.")
try:
# Use the cursor/connection we were about to store
_advisory_lock_cur = cur
_advisory_lock_conn = raw_conn
await _release_advisory_lock() # Attempt cleanup
_advisory_lock_session = lock_session
await _release_advisory_lock()
except Exception as unlock_err:
logger.error(f"Failed to release lock during error handling: {unlock_err}", exc_info=True)
finally:
# Ensure globals are cleared after failed attempt
_advisory_lock_cur = None
_advisory_lock_conn = None
_advisory_lock_session = None
_is_scheduler_leader = False
# Ensure scheduler is stopped if we failed partially
if scheduler.running:
try:
scheduler.shutdown(wait=False)
except:
pass # Best effort
pass
return False
finally:
# Clean up temporary resources if lock wasn't acquired or error occurred before storing
if cur:
if lock_session:
try:
cur.close()
except:
pass
if raw_conn:
try:
raw_conn.close()
await lock_session.close()
except:
pass
@@ -141,63 +117,50 @@ async def _background_lock_retry_loop(server: SyncServer):
logger.info("Starting background task to periodically check for scheduler lock.")
while True:
if _is_scheduler_leader: # Should be cancelled first, but safety check
if _is_scheduler_leader:
break
try:
wait_time = settings.poll_lock_retry_interval_seconds
await asyncio.sleep(wait_time)
# Re-check state before attempting lock
if _is_scheduler_leader or _lock_retry_task is None:
break # Stop if became leader or task was cancelled
break
acquired = await _try_acquire_lock_and_start_scheduler(server)
if acquired:
logger.info("Background task acquired lock and started scheduler.")
_lock_retry_task = None # Clear self handle
break # Exit loop, we are now the leader
_lock_retry_task = None
break
except asyncio.CancelledError:
logger.info("Background lock retry task cancelled.")
break
except Exception as e:
logger.error(f"Error in background lock retry loop: {e}", exc_info=True)
# Avoid tight loop on persistent errors
await asyncio.sleep(settings.poll_lock_retry_interval_seconds)
async def _release_advisory_lock():
    """Release the PostgreSQL advisory lock using the stored async session.

    Reads and clears the module-global `_advisory_lock_session` (set by
    `_try_acquire_lock_and_start_scheduler` when this instance became leader),
    executes `pg_advisory_unlock` on that same session — advisory locks are
    session-scoped, so the unlock MUST run on the session that acquired the
    lock — and then closes the session. All failures are logged and swallowed:
    this runs during shutdown/error cleanup, where raising would mask the
    original problem.
    """
    global _advisory_lock_session

    # Clear the global immediately so concurrent callers cannot double-release.
    lock_session = _advisory_lock_session
    _advisory_lock_session = None

    if lock_session is not None:
        logger.info(f"Attempting to release PostgreSQL advisory lock {ADVISORY_LOCK_KEY}")
        try:
            await lock_session.execute(text("SELECT pg_advisory_unlock(CAST(:lock_key AS bigint))"), {"lock_key": ADVISORY_LOCK_KEY})
            logger.info(f"Executed pg_advisory_unlock for lock {ADVISORY_LOCK_KEY}")
        except Exception as e:
            logger.error(f"Error executing pg_advisory_unlock: {e}", exc_info=True)
        finally:
            # Close the session even if the unlock failed; closing the session
            # ends the DB session, which releases any advisory locks it held.
            try:
                if lock_session:
                    await lock_session.close()
                    logger.info("Closed database session that held advisory lock.")
            except Exception as e:
                logger.error(f"Error closing advisory lock session: {e}", exc_info=True)
    else:
        # No session stored: either we were never leader, or the backend is not
        # PostgreSQL (SQLite path never acquires an advisory lock).
        logger.info("No PostgreSQL advisory lock to release (likely using SQLite or non-PostgreSQL database).")
@@ -220,7 +183,6 @@ async def start_scheduler_with_leader_election(server: SyncServer):
acquired_immediately = await _try_acquire_lock_and_start_scheduler(server)
if not acquired_immediately and _lock_retry_task is None:
# Failed initial attempt, start background retry task
loop = asyncio.get_running_loop()
_lock_retry_task = loop.create_task(_background_lock_retry_loop(server))
@@ -232,48 +194,40 @@ async def shutdown_scheduler_and_release_lock():
"""
global _is_scheduler_leader, _lock_retry_task, scheduler
# 1. Cancel retry task if running (for non-leaders)
if _lock_retry_task is not None:
logger.info("Shutting down: Cancelling background lock retry task.")
current_task = _lock_retry_task
_lock_retry_task = None # Clear handle first
_lock_retry_task = None
current_task.cancel()
try:
await current_task # Wait for cancellation
await current_task
except asyncio.CancelledError:
logger.info("Background lock retry task successfully cancelled.")
except Exception as e:
logger.warning(f"Exception waiting for cancelled retry task: {e}", exc_info=True)
# 2. Shutdown scheduler and release lock if we were the leader
if _is_scheduler_leader:
logger.info("Shutting down: Leader instance stopping scheduler and releasing lock.")
if scheduler.running:
try:
# Force synchronous shutdown to prevent callback scheduling
scheduler.shutdown(wait=True)
# wait for any internal cleanup to complete
await asyncio.sleep(0.1)
logger.info("APScheduler shut down.")
except Exception as e:
# Handle SchedulerNotRunningError and other shutdown exceptions
logger.warning(f"Exception during APScheduler shutdown: {e}")
if "not running" not in str(e).lower():
logger.error(f"Unexpected error shutting down APScheduler: {e}", exc_info=True)
await _release_advisory_lock()
_is_scheduler_leader = False # Update state after cleanup
_is_scheduler_leader = False
else:
logger.info("Shutting down: Non-leader instance.")
# Final cleanup check for scheduler state (belt and suspenders)
# This should rarely be needed if shutdown logic above worked correctly
try:
if scheduler.running:
logger.warning("Scheduler still running after shutdown logic completed? Forcing shutdown.")
scheduler.shutdown(wait=False)
except Exception as e:
# Catch SchedulerNotRunningError and other shutdown exceptions
logger.debug(f"Expected exception during final scheduler cleanup: {e}")