From 720fc9c75893affa3950e6b2e2ed20957fe18b40 Mon Sep 17 00:00:00 2001 From: Kian Jones <11655409+kianjones9@users.noreply.github.com> Date: Fri, 30 Jan 2026 15:16:12 -0800 Subject: [PATCH] feat: add statement_timeout to SQLAlchemy OTEL spans (#9220) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Queries Postgres for statement_timeout on connection checkout and adds it as db.statement_timeout attribute on cursor execution spans. 🤖 Generated with [Letta Code](https://letta.com) Co-authored-by: Letta --- letta/otel/sqlalchemy_instrumentation.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/letta/otel/sqlalchemy_instrumentation.py b/letta/otel/sqlalchemy_instrumentation.py index 9305d6aa..53242d88 100644 --- a/letta/otel/sqlalchemy_instrumentation.py +++ b/letta/otel/sqlalchemy_instrumentation.py @@ -120,11 +120,24 @@ def _instrument_engine_events(engine: Engine) -> None: if isinstance(engine, AsyncEngine): engine = engine.sync_engine + def checkout(dbapi_conn, connection_record, connection_proxy): + """Query and cache statement_timeout on connection checkout.""" + try: + cursor = dbapi_conn.cursor() + cursor.execute("SHOW statement_timeout") + result = cursor.fetchone() + connection_record.info["statement_timeout"] = result[0] if result else None + cursor.close() + except Exception: + connection_record.info["statement_timeout"] = None + def before_cursor_execute(conn, cursor, statement, parameters, context, executemany): """Track cursor execution start.""" if not _config["enabled"]: return + statement_timeout = conn.info.get("statement_timeout") + # Store context for the after event context._sync_instrumentation_span = _create_sync_db_span( operation_type="cursor_execute", @@ -132,6 +145,7 @@ def _instrument_engine_events(engine: Engine) -> None: additional_attrs={ "db.executemany": executemany, "db.connection.info": str(conn.info), + "db.statement_timeout": statement_timeout, }, ) @@ -163,6 +177,7 @@ def _instrument_engine_events(engine: Engine) -> None: context._sync_instrumentation_span = None # Register engine events + event.listen(engine.pool, "checkout", checkout) event.listen(engine, "before_cursor_execute", before_cursor_execute) event.listen(engine, "after_cursor_execute", after_cursor_execute) event.listen(engine, "handle_error", handle_cursor_error) @@ -170,6 +185,7 @@ def _instrument_engine_events(engine: Engine) -> None: # Store listeners for cleanup _instrumentation_state["engine_listeners"].extend( [ + (engine.pool, "checkout", checkout), (engine, "before_cursor_execute", before_cursor_execute), (engine, "after_cursor_execute", after_cursor_execute), (engine, "handle_error", handle_cursor_error),