feat: dump traces on event loop saturation (#8185)
This commit is contained in:
@@ -7,6 +7,7 @@ import asyncio
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
from collections import defaultdict
|
||||
from typing import Optional
|
||||
|
||||
from letta.log import get_logger
|
||||
@@ -128,6 +129,10 @@ class EventLoopWatchdog:
|
||||
f"Event loop saturation detected: lag={current_lag:.2f}s, tasks={task_count}, max_lag_seen={max_lag_seen:.2f}s"
|
||||
)
|
||||
|
||||
# Only dump stack traces for severe lag (> 3s) to avoid noise
|
||||
if current_lag > 3.0:
|
||||
self._dump_asyncio_tasks()
|
||||
|
||||
if time_since_heartbeat > self.timeout_threshold:
|
||||
consecutive_hangs += 1
|
||||
logger.error(
|
||||
@@ -135,8 +140,9 @@ class EventLoopWatchdog:
|
||||
f"tasks={task_count}"
|
||||
)
|
||||
|
||||
# Dump basic state
|
||||
# Dump both thread state and asyncio tasks
|
||||
self._dump_state()
|
||||
self._dump_asyncio_tasks()
|
||||
|
||||
if consecutive_hangs >= 2:
|
||||
logger.critical(f"Event loop appears frozen ({consecutive_hangs} consecutive hangs), tasks={task_count}")
|
||||
@@ -180,6 +186,75 @@ class EventLoopWatchdog:
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to dump state: {e}")
|
||||
|
||||
def _dump_asyncio_tasks(self) -> None:
    """Log a grouped summary of the unfinished asyncio tasks on ``self._loop``.

    Each unfinished task that has a stack is grouped by the most recent
    frame (the stack is searched from its last entry backwards) whose file
    path contains "letta". Tasks that are done, have no stack, or have no
    such frame are left out of the summary. Every group is logged with its
    task count, largest group first; the five largest groups additionally
    get a condensed example built from the last five frames of the first
    task seen in that group.

    This is best-effort diagnostics: a failure while inspecting one task
    skips that task, and any other failure is logged and swallowed so the
    caller is never interrupted.
    """
    project_marker = "letta/"
    packages_marker = "site-packages/"
    max_examples = 5  # groups that get an example stack
    max_frames = 5  # trailing frames shown per example

    def project_path(filename):
        """Return the part of *filename* after the first "letta/", else None."""
        idx = filename.find(project_marker)
        if idx == -1:
            return None
        return filename[idx + len(project_marker) :]

    def package_name(filename):
        """Return the top-level name under site-packages/, else None."""
        idx = filename.find(packages_marker)
        if idx == -1:
            return None
        remainder = filename[idx + len(packages_marker) :]
        top_level, slash, _ = remainder.partition("/")
        if slash:
            return top_level
        # Single-file module directly under site-packages (e.g. "six.py"):
        # use the module name rather than an anonymous placeholder.
        return remainder.rsplit(".", 1)[0] or "lib"

    try:
        loop = self._loop
        if not loop or loop.is_closed():
            return

        active_tasks = asyncio.all_tasks(loop)
        if not active_tasks:
            return

        logger.warning(f"Severe lag detected - dumping active tasks ({len(active_tasks)} total):")

        # Only the group size and one sample stack are reported, so keep
        # just those instead of holding every task's frames until the end.
        task_counts = defaultdict(int)
        sample_stacks = {}

        for task in active_tasks:
            try:
                if task.done():
                    continue
                stack = task.get_stack()
                if not stack:
                    continue

                # Group by the most recent frame that belongs to letta code
                for frame in reversed(stack):
                    filename = frame.f_code.co_filename
                    if "letta" not in filename:
                        continue
                    path = project_path(filename)
                    if path is None:
                        path = filename
                    location = f"{path}:{frame.f_lineno}:{frame.f_code.co_name}"
                    task_counts[location] += 1
                    sample_stacks.setdefault(location, stack)
                    break
            except Exception:
                # A task may change state while it is being inspected;
                # skip it rather than abort the whole dump.
                continue

        if not task_counts:
            return

        total_tasks = sum(task_counts.values())
        logger.warning(f" Letta tasks: {total_tasks} total")

        # Largest groups first; ties keep first-seen order (stable sort)
        sorted_patterns = sorted(task_counts.items(), key=lambda item: item[1], reverse=True)

        logger.warning(f" Task patterns ({len(sorted_patterns)} unique locations):")
        for i, (location, count) in enumerate(sorted_patterns, 1):
            logger.warning(f" [{i}] {count} tasks at: {location}")

            # Example stacks are limited to the largest groups to avoid noise
            if i > max_examples:
                continue

            stack_summary = []
            for frame in sample_stacks[location][-max_frames:]:
                filename = frame.f_code.co_filename
                path = project_path(filename)
                if path is not None:
                    stack_summary.append(f"{path}:{frame.f_lineno}:{frame.f_code.co_name}")
                    continue
                lib = package_name(filename)
                if lib is not None:
                    stack_summary.append(f"[{lib}].{frame.f_code.co_name}")
                # Frames outside letta and site-packages are omitted.

            if stack_summary:
                logger.warning(f" Example: {' → '.join(stack_summary)}")

    except Exception as e:
        logger.error(f"Failed to dump asyncio tasks: {e}")
|
||||
|
||||
|
||||
# Module-level slot for an EventLoopWatchdog instance; starts as None.
# NOTE(review): whatever assigns it lies outside this view — presumably a
# start/stop helper further down the module; confirm.
_global_watchdog: Optional[EventLoopWatchdog] = None
|
||||
|
||||
|
||||
Reference in New Issue
Block a user