fix: batch poll scheduler (#2366)
This commit is contained in:
@@ -4,6 +4,7 @@ from typing import Optional
|
||||
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
from apscheduler.triggers.interval import IntervalTrigger
|
||||
from sqlalchemy import text
|
||||
|
||||
from letta.jobs.llm_batch_job_polling import poll_running_llm_batches
|
||||
from letta.log import get_logger
|
||||
@@ -38,7 +39,8 @@ async def _try_acquire_lock_and_start_scheduler(server: SyncServer) -> bool:
|
||||
raw_conn = await session.connection()
|
||||
|
||||
# Try to acquire the advisory lock
|
||||
result = await session.execute(f"SELECT pg_try_advisory_lock(CAST({ADVISORY_LOCK_KEY} AS bigint))")
|
||||
sql = text("SELECT pg_try_advisory_lock(CAST(:lock_key AS bigint))")
|
||||
result = await session.execute(sql, {"lock_key": ADVISORY_LOCK_KEY})
|
||||
acquired_lock = result.scalar_one()
|
||||
|
||||
if not acquired_lock:
|
||||
@@ -103,14 +105,14 @@ async def _try_acquire_lock_and_start_scheduler(server: SyncServer) -> bool:
|
||||
# Clean up temporary resources if lock wasn't acquired or error occurred before storing
|
||||
if cur:
|
||||
try:
|
||||
cur.close()
|
||||
except:
|
||||
pass
|
||||
await cur.close()
|
||||
except Exception as e:
|
||||
logger.warning(f"Error closing cursor: {e}")
|
||||
if raw_conn:
|
||||
try:
|
||||
raw_conn.close()
|
||||
except:
|
||||
pass
|
||||
await raw_conn.close()
|
||||
except Exception as e:
|
||||
logger.warning(f"Error closing connection: {e}")
|
||||
|
||||
|
||||
async def _background_lock_retry_loop(server: SyncServer):
|
||||
@@ -158,7 +160,9 @@ async def _release_advisory_lock():
|
||||
try:
|
||||
if not lock_conn.closed:
|
||||
if not lock_cur.closed:
|
||||
lock_cur.execute("SELECT pg_advisory_unlock(CAST(%s AS bigint))", (ADVISORY_LOCK_KEY,))
|
||||
# Use SQLAlchemy text() for raw SQL
|
||||
unlock_sql = text("SELECT pg_advisory_unlock(CAST(:lock_key AS bigint))")
|
||||
lock_cur.execute(unlock_sql, {"lock_key": ADVISORY_LOCK_KEY})
|
||||
lock_cur.fetchone() # Consume result
|
||||
lock_conn.commit()
|
||||
logger.info(f"Executed pg_advisory_unlock for lock {ADVISORY_LOCK_KEY}")
|
||||
@@ -172,12 +176,12 @@ async def _release_advisory_lock():
|
||||
# Ensure resources are closed regardless of unlock success
|
||||
try:
|
||||
if lock_cur and not lock_cur.closed:
|
||||
lock_cur.close()
|
||||
await lock_cur.close()
|
||||
except Exception as e:
|
||||
logger.error(f"Error closing advisory lock cursor: {e}", exc_info=True)
|
||||
try:
|
||||
if lock_conn and not lock_conn.closed:
|
||||
lock_conn.close()
|
||||
await lock_conn.close()
|
||||
logger.info("Closed database connection that held advisory lock.")
|
||||
except Exception as e:
|
||||
logger.error(f"Error closing advisory lock connection: {e}", exc_info=True)
|
||||
|
||||
Reference in New Issue
Block a user