diff --git a/letta/monitoring/event_loop_watchdog.py b/letta/monitoring/event_loop_watchdog.py index 18879da6..77e8fa9d 100644 --- a/letta/monitoring/event_loop_watchdog.py +++ b/letta/monitoring/event_loop_watchdog.py @@ -7,6 +7,7 @@ import asyncio import threading import time import traceback +from collections import defaultdict from typing import Optional from letta.log import get_logger @@ -128,6 +129,10 @@ class EventLoopWatchdog: f"Event loop saturation detected: lag={current_lag:.2f}s, tasks={task_count}, max_lag_seen={max_lag_seen:.2f}s" ) + # Only dump stack traces for severe lag (> 3s) to avoid noise + if current_lag > 3.0: + self._dump_asyncio_tasks() + if time_since_heartbeat > self.timeout_threshold: consecutive_hangs += 1 logger.error( @@ -135,8 +140,9 @@ class EventLoopWatchdog: f"tasks={task_count}" ) - # Dump basic state + # Dump both thread state and asyncio tasks self._dump_state() + self._dump_asyncio_tasks() if consecutive_hangs >= 2: logger.critical(f"Event loop appears frozen ({consecutive_hangs} consecutive hangs), tasks={task_count}") @@ -180,6 +186,75 @@ class EventLoopWatchdog: except Exception as e: logger.error(f"Failed to dump state: {e}") + def _dump_asyncio_tasks(self): + """Dump asyncio task stack traces to diagnose event loop saturation.""" + try: + if not self._loop or self._loop.is_closed(): + return + + active_tasks = asyncio.all_tasks(self._loop) + if not active_tasks: + return + + logger.warning(f"Severe lag detected - dumping active tasks ({len(active_tasks)} total):") + + # Collect task data in single pass + tasks_by_location = defaultdict(list) + + for task in active_tasks: + try: + if task.done(): + continue + stack = task.get_stack() + if not stack: + continue + + # Find top letta frame for grouping + for frame in reversed(stack): + if "letta" in frame.f_code.co_filename: + idx = frame.f_code.co_filename.find("letta/") + path = frame.f_code.co_filename[idx + 6 :] if idx != -1 else frame.f_code.co_filename + location = f"{path}:{frame.f_lineno}:{frame.f_code.co_name}" + tasks_by_location[location].append((task, stack)) + break + except Exception: + continue + + if not tasks_by_location: + return + + total_tasks = sum(len(tasks) for tasks in tasks_by_location.values()) + logger.warning(f" Letta tasks: {total_tasks} total") + + # Show all patterns, but only examples for top 5 + sorted_patterns = sorted(tasks_by_location.items(), key=lambda x: len(x[1]), reverse=True) + num_patterns = len(sorted_patterns) + + logger.warning(f" Task patterns ({num_patterns} unique locations):") + for i, (location, tasks) in enumerate(sorted_patterns, 1): + logger.warning(f" [{i}] {len(tasks)} tasks at: {location}") + + # Only show example stack for top 5 patterns + if i <= 5: + _, sample_stack = tasks[0] + stack_summary = [] + for frame in sample_stack[-5:]: + filename = frame.f_code.co_filename + letta_idx = filename.find("letta/") + if letta_idx != -1: + stack_summary.append(f"{filename[letta_idx + 6 :]}:{frame.f_lineno}:{frame.f_code.co_name}") + else: + pkg_idx = filename.find("site-packages/") + if pkg_idx != -1: + lib_end = filename.find("/", pkg_idx + 14) + lib = filename[pkg_idx + 14 : lib_end] if lib_end != -1 else "lib" + stack_summary.append(f"[{lib}].{frame.f_code.co_name}") + if stack_summary: + logger.warning(f" Example: {' → '.join(stack_summary)}") + + except Exception as e: + logger.error(f"Failed to dump asyncio tasks: {e}") + _global_watchdog: Optional[EventLoopWatchdog] = None