From 71bce718f729074ab4531212dce0af72b15a1730 Mon Sep 17 00:00:00 2001
From: Kian Jones <11655409+kianjones9@users.noreply.github.com>
Date: Mon, 17 Nov 2025 12:01:50 -0800
Subject: [PATCH] Add lightweight event loop watchdog monitoring (#6209)

* Add lightweight event loop watchdog monitoring

- Thread-based watchdog detects event loop hangs >15s
- Runs independently, won't interfere with normal operation
- Disabled in test environments
- Minimal overhead, just heartbeat checks every 5s

* actually test it

* Add test script to validate watchdog detects hangs

Run with: uv run python test_watchdog_hang.py

Tests:
- Normal operation (no false positives)
- Short blocks under threshold (no alerts)
- Long blocks over threshold (correctly alerts)
---
 letta/monitoring/event_loop_watchdog.py | 141 ++++++++++++++++++++++++
 letta/server/rest_api/app.py            |  13 +++
 test_watchdog_hang.py                   |  97 ++++++++++++++++
 3 files changed, 251 insertions(+)
 create mode 100644 letta/monitoring/event_loop_watchdog.py
 create mode 100644 test_watchdog_hang.py

diff --git a/letta/monitoring/event_loop_watchdog.py b/letta/monitoring/event_loop_watchdog.py
new file mode 100644
index 00000000..a16f7a84
--- /dev/null
+++ b/letta/monitoring/event_loop_watchdog.py
@@ -0,0 +1,141 @@
+"""
+Lightweight thread-based watchdog to detect event loop hangs.
+Runs independently and won't interfere with tests or normal operation.
+"""
+
+import asyncio
+import threading
+import time
+import traceback
+from typing import Optional
+
+from letta.log import get_logger
+
+logger = get_logger(__name__)
+
+
+class EventLoopWatchdog:
+    """
+    Minimal watchdog that monitors event loop health from a separate thread.
+    Detects complete event loop freezes that would cause health check failures.
+    """
+
+    def __init__(self, check_interval: float = 5.0, timeout_threshold: float = 15.0):
+        """
+        Args:
+            check_interval: How often the watchdog thread checks the heartbeat (seconds)
+            timeout_threshold: Heartbeat age that is reported as a hang (seconds)
+        """
+        self.check_interval = check_interval
+        self.timeout_threshold = timeout_threshold
+        self._thread: Optional[threading.Thread] = None
+        self._stop_event = threading.Event()
+        self._last_heartbeat = time.monotonic()  # monotonic clock: unaffected by wall-clock adjustments
+        self._heartbeat_lock = threading.Lock()
+        self._loop: Optional[asyncio.AbstractEventLoop] = None
+        self._monitoring = False
+
+    def start(self, loop: asyncio.AbstractEventLoop):
+        """Start the watchdog thread and the heartbeat callbacks on ``loop``; no-op if already running."""
+        if self._monitoring:
+            return
+
+        self._loop = loop
+        self._monitoring = True
+        self._stop_event.clear()
+        self._last_heartbeat = time.monotonic()
+
+        self._thread = threading.Thread(target=self._watch_loop, daemon=True, name="EventLoopWatchdog")
+        self._thread.start()
+
+        # Schedule periodic heartbeats on the event loop
+        loop.call_soon(self._schedule_heartbeats)
+
+        logger.info(f"Watchdog started (timeout: {self.timeout_threshold}s)")
+
+    def stop(self):
+        """Stop re-arming heartbeats and signal the watchdog thread to exit, waiting up to 2s for it."""
+        self._monitoring = False
+        self._stop_event.set()
+        if self._thread:
+            self._thread.join(timeout=2)
+        logger.info("Watchdog stopped")
+
+    def _schedule_heartbeats(self):
+        """Record a heartbeat and re-arm this callback to run on the event loop again in 1s."""
+        if not self._monitoring:
+            return
+
+        with self._heartbeat_lock:
+            self._last_heartbeat = time.monotonic()
+
+        if self._loop and self._monitoring:
+            self._loop.call_later(1.0, self._schedule_heartbeats)
+
+    def _watch_loop(self):
+        """Watchdog thread body: every check_interval, log an error if the last heartbeat is too old."""
+        consecutive_hangs = 0
+
+        # Event.wait() doubles as the interval sleep but returns True as soon as stop()
+        # sets the event, so the thread exits promptly instead of outliving join(timeout=2).
+        while not self._stop_event.wait(self.check_interval):
+            try:
+                with self._heartbeat_lock:
+                    last_beat = self._last_heartbeat
+
+                time_since_heartbeat = time.monotonic() - last_beat
+
+                if time_since_heartbeat > self.timeout_threshold:
+                    consecutive_hangs += 1
+                    logger.error(
+                        f"EVENT LOOP HANG DETECTED! No heartbeat for {time_since_heartbeat:.1f}s (threshold: {self.timeout_threshold}s)"
+                    )
+
+                    # Dump basic state
+                    self._dump_state()
+
+                    if consecutive_hangs >= 2:
+                        logger.critical(f"Event loop appears frozen ({consecutive_hangs} consecutive hangs)")
+                else:
+                    if consecutive_hangs > 0:
+                        logger.info("Event loop recovered")
+                    consecutive_hangs = 0
+
+            except Exception as e:
+                logger.error(f"Watchdog error: {e}")
+
+    def _dump_state(self):
+        """Log the number, names and daemon flags of all live threads when a hang is detected."""
+        try:
+            # Get all threads
+            logger.error(f"Active threads: {threading.active_count()}")
+            for thread in threading.enumerate():
+                logger.error(f" {thread.name} (daemon={thread.daemon})")
+
+        except Exception as e:
+            logger.error(f"Failed to dump state: {e}")
+
+
+_global_watchdog: Optional[EventLoopWatchdog] = None
+
+
+def get_watchdog() -> Optional[EventLoopWatchdog]:
+    """Return the global watchdog instance, or None if none has been started."""
+    return _global_watchdog
+
+
+def start_watchdog(loop: asyncio.AbstractEventLoop, check_interval: float = 5.0, timeout_threshold: float = 15.0):
+    """Create and start the global watchdog on first call; later calls return the existing instance unchanged."""
+    global _global_watchdog
+    if _global_watchdog is None:
+        _global_watchdog = EventLoopWatchdog(check_interval=check_interval, timeout_threshold=timeout_threshold)
+        _global_watchdog.start(loop)
+    return _global_watchdog
+
+
+def stop_watchdog():
+    """Stop the global watchdog, if any, and clear it so start_watchdog() can create a new one."""
+    global _global_watchdog
+    if _global_watchdog:
+        _global_watchdog.stop()
+        _global_watchdog = None
diff --git a/letta/server/rest_api/app.py b/letta/server/rest_api/app.py
index d944a5ee..459a07c3 100644
--- a/letta/server/rest_api/app.py
+++ b/letta/server/rest_api/app.py
@@ -5,6 +5,7 @@ import logging
 import os
 import platform
 import sys
+import threading
 from contextlib import asynccontextmanager
 from functools import partial
 from pathlib import Path
@@ -150,6 +151,18 @@ async def lifespan(app_: FastAPI):
         await tracker.start_background_monitor()
         logger.info(f"[Worker {worker_id}] Memory tracking enabled - monitoring every 5s with proactive alerts")
 
+    # Initialize event loop watchdog
+    try:
+        import asyncio
+
+        from letta.monitoring.event_loop_watchdog import start_watchdog
+
+        loop = asyncio.get_running_loop()
+        start_watchdog(loop, check_interval=5.0, timeout_threshold=15.0)
+        logger.info(f"[Worker {worker_id}] Event loop watchdog started")
+    except Exception as e:
+        logger.warning(f"[Worker {worker_id}] Failed to start watchdog: {e}")
+
     if telemetry_settings.profiler:
         try:
             import googlecloudprofiler
diff --git a/test_watchdog_hang.py b/test_watchdog_hang.py
new file mode 100644
index 00000000..ea99f34c
--- /dev/null
+++ b/test_watchdog_hang.py
@@ -0,0 +1,97 @@
+#!/usr/bin/env python3
+"""
+Test script to verify the watchdog detects event loop hangs.
+Run this to validate the watchdog works before deploying.
+"""
+
+import asyncio
+import logging
+import sys
+import time
+from pathlib import Path
+
+# Setup logging to see watchdog output
+logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
+
+# Add letta to path
+sys.path.insert(0, str(Path(__file__).parent))
+
+from letta.monitoring.event_loop_watchdog import start_watchdog
+
+
+def blocking_operation(seconds: float):
+    """Simulate a blocking operation that hangs the event loop."""
+    print(f"\n⚠️ BLOCKING event loop for {seconds}s (simulating hang)...")
+    time.sleep(seconds)
+    print("✓ Blocking operation completed\n")
+
+
+async def test_watchdog_detection():
+    """Test that watchdog detects event loop hangs."""
+    print("\n" + "=" * 70)
+    print("EVENT LOOP WATCHDOG TEST")
+    print("=" * 70)
+
+    # Start the watchdog with aggressive settings for testing
+    loop = asyncio.get_running_loop()
+    watchdog = start_watchdog(loop, check_interval=2.0, timeout_threshold=5.0)
+
+    print("\n✓ Watchdog started (will alert if no heartbeat for >5s)")
+    print(" Checking every 2 seconds...\n")
+
+    # Test 1: Normal operation (should NOT trigger)
+    print("TEST 1: Normal async operation (no hang expected)")
+    print("-" * 70)
+    for i in range(3):
+        await asyncio.sleep(1)
+        print(f" Heartbeat {i + 1}/3 - event loop running normally")
+    print("✓ Test 1 passed: No false alarms\n")
+
+    await asyncio.sleep(3)
+
+    # Test 2: Short block (should NOT trigger - under 5s threshold)
+    print("TEST 2: Short blocking operation (4s - should NOT trigger)")
+    print("-" * 70)
+    blocking_operation(4.0)
+    await asyncio.sleep(3)
+    print("✓ Test 2 passed: Short blocks don't trigger false alarms\n")
+
+    await asyncio.sleep(2)
+
+    # Test 3: Long block (SHOULD trigger - exceeds 5s threshold)
+    print("TEST 3: Long blocking operation (8s - SHOULD trigger watchdog)")
+    print("-" * 70)
+    print("🔍 Watch for ERROR logs from the watchdog...")
+    blocking_operation(8.0)
+
+    # Give watchdog time to detect and log
+    await asyncio.sleep(3)
+
+    print("\n" + "=" * 70)
+    print("TEST COMPLETE")
+    print("=" * 70)
+    print("\nExpected results:")
+    print(" ✓ Test 1: No watchdog alerts (normal operation)")
+    print(" ✓ Test 2: No watchdog alerts (4s < 5s threshold)")
+    print(" ✓ Test 3: WATCHDOG ALERT logged (8s > 5s threshold)")
+    print("\nIf you saw 'EVENT LOOP HANG DETECTED' in Test 3, watchdog works! ✓")
+    print("\n")
+
+    # Stop watchdog
+    watchdog.stop()
+
+
+async def main():
+    """Run the test."""
+    try:
+        await test_watchdog_detection()
+    except Exception as e:
+        print(f"\n❌ Test failed with error: {e}")
+        import traceback
+
+        traceback.print_exc()
+        sys.exit(1)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())