From 6fbd5bcdd01fa26d79829eceb2131dd6cecb81a3 Mon Sep 17 00:00:00 2001 From: cthomas Date: Sun, 20 Jul 2025 13:59:07 -0700 Subject: [PATCH] feat: add connection release info to tracing (#3432) --- letta/server/db.py | 47 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/letta/server/db.py b/letta/server/db.py index 30e6a723..11365522 100644 --- a/letta/server/db.py +++ b/letta/server/db.py @@ -1,9 +1,11 @@ import os import threading +import time import uuid from contextlib import asynccontextmanager, contextmanager from typing import Any, AsyncGenerator, Generator +from opentelemetry import trace from rich.console import Console from rich.panel import Panel from rich.text import Text @@ -45,6 +47,37 @@ def enable_sqlite_foreign_keys(dbapi_connection, connection_record): cursor.close() +def on_connect(dbapi_connection, connection_record): + cursor = dbapi_connection.cursor() + cursor.execute("SELECT pg_backend_pid()") + pid = cursor.fetchone()[0] + connection_record.info["pid"] = pid + connection_record.info["connect_spawn_time_ms"] = time.perf_counter() * 1000 + cursor.close() + + +def on_close(dbapi_connection, connection_record): + connection_record.info.get("pid") + (time.perf_counter() * 1000) - connection_record.info.get("connect_spawn_time_ms") + # print(f"Connection closed: {pid}, duration: {duration:.6f}s") + + +def on_checkout(dbapi_connection, connection_record, connection_proxy): + connection_record.info.get("pid") + connection_record.info["connect_checkout_time_ms"] = time.perf_counter() * 1000 + + +def on_checkin(dbapi_connection, connection_record): + pid = connection_record.info.get("pid") + duration = (time.perf_counter() * 1000) - connection_record.info.get("connect_checkout_time_ms") + + tracer = trace.get_tracer("letta.db.connection") + with tracer.start_as_current_span("connect_release") as span: + span.set_attribute("db.connection.pid", pid) + span.set_attribute("db.connection.duration_ms", duration) + span.set_attribute("db.connection.operation", "checkin") + + @contextmanager def db_error_handler(): """Context manager for handling database errors""" @@ -116,6 +149,13 @@ class DatabaseRegistry: Base.metadata.create_all(bind=engine) self._engines["default"] = engine + # Set up connection monitoring + if settings.sqlalchemy_tracing and settings.database_engine is DatabaseChoice.POSTGRES: + event.listen(engine, "connect", on_connect) + event.listen(engine, "close", on_close) + event.listen(engine, "checkout", on_checkout) + event.listen(engine, "checkin", on_checkin) + self._setup_pool_monitoring(engine, "default") # Create session factory @@ -157,6 +197,13 @@ class DatabaseRegistry: # Create async session factory self._async_engines["default"] = async_engine + # Set up connection monitoring for async engine + if settings.sqlalchemy_tracing and settings.database_engine is DatabaseChoice.POSTGRES: + event.listen(async_engine.sync_engine, "connect", on_connect) + event.listen(async_engine.sync_engine, "close", on_close) + event.listen(async_engine.sync_engine, "checkout", on_checkout) + event.listen(async_engine.sync_engine, "checkin", on_checkin) + self._setup_pool_monitoring(async_engine, "default_async") self._async_session_factories["default"] = async_sessionmaker(