feat: LET-3090 add sqlalchemy debug info to traces (#3401)
This commit is contained in:
548
letta/otel/sqlalchemy_instrumentation.py
Normal file
548
letta/otel/sqlalchemy_instrumentation.py
Normal file
@@ -0,0 +1,548 @@
|
||||
import asyncio
|
||||
import threading
|
||||
import traceback
|
||||
from contextlib import contextmanager
|
||||
from functools import wraps
|
||||
from typing import Any, Callable, Dict, List, Optional
|
||||
|
||||
from opentelemetry import trace
|
||||
from opentelemetry.trace import Status, StatusCode
|
||||
from sqlalchemy import Engine, event
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy.orm.loading import load_on_ident, load_on_pk_identity
|
||||
from sqlalchemy.orm.strategies import ImmediateLoader, JoinedLoader, LazyLoader, SelectInLoader, SubqueryLoader
|
||||
|
||||
# Module-wide tunables; mutated in place by configure_instrumentation()
# and setup_sqlalchemy_sync_instrumentation(config_overrides=...).
_config = {
    "enabled": True,  # master on/off switch for span creation
    "sql_truncate_length": 1000,  # max characters of SQL attached to a span
    "monitor_joined_loading": True,  # also wrap joined eager-load methods
    "log_instrumentation_errors": True,  # print setup/teardown failures
}

# Bookkeeping needed by teardown: registered listeners and patched originals.
_instrumentation_state = {
    "engine_listeners": [],  # (engine, event_name, fn) triples for event.remove()
    "session_listeners": [],  # (Session, event_name, fn) triples for event.remove()
    "original_methods": {},  # "Owner.method" -> original callable, for restore
    "active": False,  # True between setup and teardown
}

# Thread-local scratch space.
# NOTE(review): nothing visible in this module reads or writes _context --
# it appears unused; confirm before removing.
_context = threading.local()
|
||||
|
||||
|
||||
def _get_tracer():
    """Return the OpenTelemetry tracer for SQLAlchemy instrumentation.

    Uses a fixed instrumentation scope name/version so every span emitted
    by this module is grouped under the same tracer.
    """
    return trace.get_tracer("sqlalchemy_sync_instrumentation", "1.0.0")
|
||||
|
||||
|
||||
def _is_event_loop_running() -> bool:
|
||||
"""Check if an asyncio event loop is running in the current thread."""
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
return loop.is_running()
|
||||
except RuntimeError:
|
||||
return False
|
||||
|
||||
|
||||
def _is_main_thread() -> bool:
|
||||
"""Check if we're running on the main thread."""
|
||||
return threading.current_thread() is threading.main_thread()
|
||||
|
||||
|
||||
def _truncate_sql(sql: str, max_length: int = 1000) -> str:
|
||||
"""Truncate SQL statement to specified length."""
|
||||
if len(sql) <= max_length:
|
||||
return sql
|
||||
return sql[: max_length - 3] + "..."
|
||||
|
||||
|
||||
def _create_sync_db_span(
    operation_type: str,
    sql_statement: Optional[str] = None,
    loader_type: Optional[str] = None,
    relationship_key: Optional[str] = None,
    is_joined: bool = False,
    additional_attrs: Optional[Dict[str, Any]] = None,
) -> Any:
    """Start an OpenTelemetry span describing a synchronous DB operation.

    Spans are produced only while an asyncio event loop is running in the
    current thread -- that is the case where synchronous SQLAlchemy work is
    potentially blocking the loop and therefore worth tracing.

    Args:
        operation_type: Kind of operation (e.g. ``cursor_execute``).
        sql_statement: SQL text; truncated before being attached.
        loader_type: SQLAlchemy loader name (selectin, joined, lazy, ...).
        relationship_key: Relationship attribute name, when applicable.
        is_joined: True when the span comes from joined eager loading.
        additional_attrs: Extra attributes to set on the span.

    Returns:
        A started span, or ``None`` when instrumentation is disabled or no
        event loop is running in this thread.
    """
    # Skip entirely when disabled, or when there is no loop to block.
    if not _config["enabled"]:
        return None
    if not _is_event_loop_running():
        return None

    span = _get_tracer().start_span("db_operation")
    span.set_attribute("db.operation.type", operation_type)

    if sql_statement:
        truncated = _truncate_sql(sql_statement, _config["sql_truncate_length"])
        span.set_attribute("db.statement", truncated)

    if loader_type:
        span.set_attribute("sqlalchemy.loader.type", loader_type)
        span.set_attribute("sqlalchemy.loader.is_joined", is_joined)

    if relationship_key:
        span.set_attribute("sqlalchemy.relationship.key", relationship_key)

    for attr_name, attr_value in (additional_attrs or {}).items():
        span.set_attribute(attr_name, attr_value)

    return span
|
||||
|
||||
|
||||
def _instrument_engine_events(engine: Engine) -> None:
    """Attach cursor-level event listeners to *engine*.

    Opens a span when a DBAPI cursor begins executing (while an event loop
    is running) and closes it with OK or ERROR status when execution
    finishes or fails.  ``AsyncEngine`` instances are unwrapped to their
    underlying sync engine, since SQLAlchemy events must be registered on
    the sync side.

    Args:
        engine: A sync ``Engine`` or an ``AsyncEngine`` to instrument.
    """
    from sqlalchemy.ext.asyncio import AsyncEngine

    if isinstance(engine, AsyncEngine):
        engine = engine.sync_engine

    def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
        """Open a span just before the DBAPI cursor executes."""
        if not _config["enabled"]:
            return

        # Stash the span on the execution context so the paired
        # after/error hooks can find and close it.
        context._sync_instrumentation_span = _create_sync_db_span(
            operation_type="cursor_execute",
            sql_statement=statement,
            additional_attrs={
                "db.executemany": executemany,
                "db.connection.info": str(conn.info),
            },
        )

    def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
        """Close the span opened in before_cursor_execute with OK status."""
        if not _config["enabled"]:
            return

        span = getattr(context, "_sync_instrumentation_span", None)
        if span:
            span.set_status(Status(StatusCode.OK))
            span.end()
            context._sync_instrumentation_span = None

    def handle_cursor_error(exception_context):
        """Close the in-flight span with ERROR status on a DBAPI failure.

        BUG FIX: SQLAlchemy invokes ``handle_error`` listeners with a single
        ``ExceptionContext`` argument, not the six-argument cursor-event
        signature the original listener declared -- the original therefore
        raised a TypeError whenever a database error occurred.
        """
        if not _config["enabled"]:
            return

        # execution_context may be None for connection-level failures.
        context = getattr(exception_context, "execution_context", None)
        if context is None:
            return
        span = getattr(context, "_sync_instrumentation_span", None)
        if span:
            span.set_status(Status(StatusCode.ERROR, "Database operation failed"))
            span.end()
            context._sync_instrumentation_span = None

    # Register engine events.
    event.listen(engine, "before_cursor_execute", before_cursor_execute)
    event.listen(engine, "after_cursor_execute", after_cursor_execute)
    event.listen(engine, "handle_error", handle_cursor_error)

    # Remember (target, event, fn) triples so teardown can event.remove() them.
    _instrumentation_state["engine_listeners"].extend(
        [
            (engine, "before_cursor_execute", before_cursor_execute),
            (engine, "after_cursor_execute", after_cursor_execute),
            (engine, "handle_error", handle_cursor_error),
        ]
    )
|
||||
|
||||
|
||||
def _instrument_loader_strategies() -> None:
    """Instrument SQLAlchemy loader strategies to detect lazy loading.

    Monkey-patches per-strategy load methods on SQLAlchemy's internal loader
    classes so that each invocation emits a span via ``_create_sync_db_span``.
    Original callables are stashed in
    ``_instrumentation_state["original_methods"]`` keyed as
    ``"<ClassName>.<method>"`` so teardown can restore them.
    """

    def create_loader_wrapper(loader_class: type, loader_type: str, is_joined: bool = False):
        """Create a wrapper for loader strategy methods."""

        def wrapper(original_method: Callable):
            @wraps(original_method)
            def instrumented_method(self, *args, **kwargs):
                # Extract relationship information if available; prefer
                # parent_property.key over self.key when both exist.
                relationship_key = getattr(self, "key", None)
                if hasattr(self, "parent_property"):
                    relationship_key = getattr(self.parent_property, "key", relationship_key)

                # span is None when instrumentation is disabled or no
                # event loop is running in this thread.
                span = _create_sync_db_span(
                    operation_type="loader_strategy",
                    loader_type=loader_type,
                    relationship_key=relationship_key,
                    is_joined=is_joined,
                    additional_attrs={
                        "sqlalchemy.loader.class": loader_class.__name__,
                        "sqlalchemy.loader.method": original_method.__name__,
                    },
                )

                try:
                    result = original_method(self, *args, **kwargs)
                    if span:
                        span.set_status(Status(StatusCode.OK))
                    return result
                except Exception as e:
                    if span:
                        span.set_status(Status(StatusCode.ERROR, str(e)))
                    raise
                finally:
                    if span:
                        span.end()

            return instrumented_method

        return wrapper

    # Instrument different loader strategies
    loaders_to_instrument = [
        (SelectInLoader, "selectin", False),
        (JoinedLoader, "joined", True),
        (LazyLoader, "lazy", False),
        (SubqueryLoader, "subquery", False),
        (ImmediateLoader, "immediate", False),
    ]

    for loader_class, loader_type, is_joined in loaders_to_instrument:
        # Skip if monitoring joined loading is disabled
        if is_joined and not _config["monitor_joined_loading"]:
            continue

        wrapper = create_loader_wrapper(loader_class, loader_type, is_joined)

        # Instrument key methods.
        # NOTE(review): method availability varies across SQLAlchemy
        # versions; the hasattr guards below silently skip missing names,
        # so some strategies may end up uninstrumented -- confirm against
        # the pinned SQLAlchemy version.
        methods_to_instrument = ["_load_for_path", "load_for_path"]

        for method_name in methods_to_instrument:
            if hasattr(loader_class, method_name):
                original_method = getattr(loader_class, method_name)
                key = f"{loader_class.__name__}.{method_name}"

                # Store original method for cleanup
                _instrumentation_state["original_methods"][key] = original_method

                # Apply wrapper
                setattr(loader_class, method_name, wrapper(original_method))

    # Instrument additional joined loading specific methods
    if _config["monitor_joined_loading"]:
        joined_methods = [
            (JoinedLoader, "_create_eager_join"),
            (JoinedLoader, "_generate_cache_key"),
        ]

        wrapper = create_loader_wrapper(JoinedLoader, "joined", True)

        for loader_class, method_name in joined_methods:
            if hasattr(loader_class, method_name):
                original_method = getattr(loader_class, method_name)
                key = f"{loader_class.__name__}.{method_name}"

                _instrumentation_state["original_methods"][key] = original_method
                setattr(loader_class, method_name, wrapper(original_method))
|
||||
|
||||
|
||||
def _instrument_loading_functions() -> None:
    """Wrap module-level ORM loading helpers with span-producing proxies.

    Patches ``sqlalchemy.orm.loading.load_on_ident`` and
    ``load_on_pk_identity`` in place; the originals are stored under
    ``"loading.<name>"`` keys so teardown can restore them.
    """

    def make_traced(func_name: str, original_func: Callable) -> Callable:
        """Build a drop-in replacement for *original_func* that emits a span."""

        @wraps(original_func)
        def traced(*args, **kwargs):
            span = _create_sync_db_span(
                operation_type="loading_function",
                additional_attrs={
                    "sqlalchemy.loading.function": func_name,
                },
            )
            try:
                value = original_func(*args, **kwargs)
            except Exception as exc:
                if span:
                    span.set_status(Status(StatusCode.ERROR, str(exc)))
                raise
            else:
                if span:
                    span.set_status(Status(StatusCode.OK))
                return value
            finally:
                # span is None when instrumentation is disabled.
                if span:
                    span.end()

        return traced

    import sqlalchemy.orm.loading as loading_module

    for module, func_name, original_func in (
        (loading_module, "load_on_ident", load_on_ident),
        (loading_module, "load_on_pk_identity", load_on_pk_identity),
    ):
        # Keep the untouched function so teardown can restore it.
        _instrumentation_state["original_methods"][f"loading.{func_name}"] = original_func
        setattr(module, func_name, make_traced(func_name, original_func))
|
||||
|
||||
|
||||
def _instrument_session_operations() -> None:
    """Register Session flush events that wrap each flush in a span.

    A span is opened in ``before_flush`` and closed in ``after_flush``;
    ``after_flush_postexec`` acts as a safety net in case the span survived
    ``after_flush``.  Listener triples are recorded for teardown.
    """

    def _close_flush_span(session) -> None:
        # Shared by both post-flush hooks. Ends the span at most once:
        # the attribute is cleared immediately after ending it.
        span = getattr(session, "_sync_instrumentation_flush_span", None)
        if span:
            span.set_status(Status(StatusCode.OK))
            span.end()
            session._sync_instrumentation_flush_span = None

    def before_flush(session, flush_context, instances):
        """Open a span capturing pending unit-of-work sizes."""
        if not _config["enabled"]:
            return

        # Stored on the session so the after-flush hooks can close it.
        session._sync_instrumentation_flush_span = _create_sync_db_span(
            operation_type="session_flush",
            additional_attrs={
                "sqlalchemy.session.new_count": len(session.new),
                "sqlalchemy.session.dirty_count": len(session.dirty),
                "sqlalchemy.session.deleted_count": len(session.deleted),
            },
        )

    def after_flush(session, flush_context):
        """Close the flush span once the flush completes."""
        if not _config["enabled"]:
            return
        _close_flush_span(session)

    def after_flush_postexec(session, flush_context):
        """Safety net: close the span if it survived after_flush."""
        if not _config["enabled"]:
            return
        _close_flush_span(session)

    for event_name, listener in (
        ("before_flush", before_flush),
        ("after_flush", after_flush),
        ("after_flush_postexec", after_flush_postexec),
    ):
        event.listen(Session, event_name, listener)
        # Remember the registration so teardown can event.remove() it.
        _instrumentation_state["session_listeners"].append((Session, event_name, listener))
|
||||
|
||||
|
||||
def setup_sqlalchemy_sync_instrumentation(
    engines: Optional[List[Engine]] = None,
    config_overrides: Optional[Dict[str, Any]] = None,
    lazy_loading_only: bool = True,
) -> None:
    """
    Set up SQLAlchemy synchronous operation instrumentation.

    Installs loader-strategy, loading-function, and session-flush hooks,
    then attaches cursor-level listeners to each engine.  Idempotent: a
    second call while active is a no-op.

    Args:
        engines: List of SQLAlchemy engines to instrument. If None, will attempt
            to discover engines automatically from the Letta db registry.
        config_overrides: Dictionary of configuration overrides.
        lazy_loading_only: If True, only instrument lazy loading operations.

    Raises:
        Exception: Re-raised after logging if any setup step fails.
    """
    if _instrumentation_state["active"]:
        return  # Already active

    try:
        # Apply configuration overrides
        if config_overrides:
            _config.update(config_overrides)

        # Joined eager loads are planned deliberately; only lazy loads signal
        # accidental sync work, so skip joined monitoring in this mode.
        if lazy_loading_only:
            _config.update(
                {
                    "monitor_joined_loading": False,  # Don't monitor joined loading
                }
            )

        # Discover engines if not provided
        if engines is None:
            engines = []
            # Try to find engines from the database registry; the private
            # attrs may not exist on older registry versions.
            try:
                from letta.server.db import db_registry

                if hasattr(db_registry, "_async_engines"):
                    engines.extend(db_registry._async_engines.values())
                if hasattr(db_registry, "_sync_engines"):
                    engines.extend(db_registry._sync_engines.values())
            except ImportError:
                pass

        # Instrument loader strategies (focus on lazy loading if specified)
        _instrument_loader_strategies()

        # Instrument loading functions
        _instrument_loading_functions()

        # Instrument session operations
        _instrument_session_operations()

        # Instrument engines last: async-engine unwrapping may raise, and the
        # instrumentation above should survive a partial failure here.
        for engine in engines:
            try:
                _instrument_engine_events(engine)
            except Exception as e:
                if _config["log_instrumentation_errors"]:
                    print(f"Error instrumenting engine {engine}: {e}")
                # Continue with other engines

        _instrumentation_state["active"] = True

    except Exception as e:
        if _config["log_instrumentation_errors"]:
            print(f"Error setting up SQLAlchemy instrumentation: {e}")
            # FIX: use the module-level `traceback` import (see file top)
            # instead of the original's redundant local re-import that
            # shadowed it.
            traceback.print_exc()
        raise
|
||||
|
||||
|
||||
def teardown_sqlalchemy_sync_instrumentation() -> None:
    """Tear down SQLAlchemy synchronous operation instrumentation.

    Removes all registered engine/session listeners, restores every patched
    loader method and loading function, and clears the bookkeeping state.
    No-op when instrumentation is not active.

    Raises:
        Exception: Re-raised after logging if any teardown step fails.
    """
    if not _instrumentation_state["active"]:
        return  # Not active

    try:
        # Detach every listener registered during setup.
        for target, event_name, listener in _instrumentation_state["engine_listeners"]:
            event.remove(target, event_name, listener)
        for target, event_name, listener in _instrumentation_state["session_listeners"]:
            event.remove(target, event_name, listener)

        # Map class names back to the loader classes we may have patched.
        known_loaders = {
            cls.__name__: cls
            for cls in (SelectInLoader, JoinedLoader, LazyLoader, SubqueryLoader, ImmediateLoader)
        }

        # Restore originals; keys are "loading.<func>" or "<Class>.<method>".
        for key, original_method in _instrumentation_state["original_methods"].items():
            if "." not in key:
                continue
            owner_name, method_name = key.rsplit(".", 1)

            if key.startswith("loading."):
                # Restore a patched module-level loading function.
                import sqlalchemy.orm.loading as loading_module

                setattr(loading_module, method_name, original_method)
            else:
                # Restore a patched loader-class method.
                loader_cls = known_loaders.get(owner_name)
                if loader_cls is not None:
                    setattr(loader_cls, method_name, original_method)

        # Reset bookkeeping.
        _instrumentation_state["engine_listeners"].clear()
        _instrumentation_state["session_listeners"].clear()
        _instrumentation_state["original_methods"].clear()
        _instrumentation_state["active"] = False

    except Exception as e:
        if _config["log_instrumentation_errors"]:
            print(f"Error tearing down SQLAlchemy instrumentation: {e}")
            traceback.print_exc()
        raise
|
||||
|
||||
|
||||
def configure_instrumentation(**kwargs) -> None:
    """
    Configure SQLAlchemy synchronous operation instrumentation.

    Merges the given options into the module-wide config in place, so the
    change is visible to already-installed hooks immediately.

    Args:
        **kwargs: Configuration options to update.
    """
    for option, value in kwargs.items():
        _config[option] = value
|
||||
|
||||
|
||||
def get_instrumentation_config() -> Dict[str, Any]:
    """Return a snapshot (shallow copy) of the current configuration.

    Mutating the returned dict does not affect the live config.
    """
    return dict(_config)
|
||||
|
||||
|
||||
def is_instrumentation_active() -> bool:
    """Check if instrumentation is currently active (set up and not torn down)."""
    return _instrumentation_state["active"]
|
||||
|
||||
|
||||
# Context manager for temporary instrumentation
|
||||
@contextmanager
def temporary_instrumentation(**config_overrides):
    """
    Context manager for temporary SQLAlchemy instrumentation.

    Sets up instrumentation on entry and tears it down on exit -- but only
    when it was not already active, in which case both entry and exit are
    no-ops (the overrides are NOT applied then).

    Args:
        **config_overrides: Configuration overrides for the instrumentation.
    """
    needs_setup = not _instrumentation_state["active"]

    if needs_setup:
        setup_sqlalchemy_sync_instrumentation(config_overrides=config_overrides)

    try:
        yield
    finally:
        if needs_setup:
            teardown_sqlalchemy_sync_instrumentation()
|
||||
|
||||
|
||||
# FastAPI integration helper
|
||||
def setup_fastapi_instrumentation(app):
    """
    Set up SQLAlchemy instrumentation for FastAPI application.

    Registers startup/shutdown handlers that enable and disable the
    instrumentation with default settings.

    Args:
        app: FastAPI application instance

    NOTE(review): ``@app.on_event`` is deprecated in recent FastAPI
    releases in favor of lifespan handlers -- confirm the pinned FastAPI
    version still supports it before relying on this helper.
    """

    @app.on_event("startup")
    async def startup_instrumentation():
        # Enable instrumentation with default configuration at app start.
        setup_sqlalchemy_sync_instrumentation()

    @app.on_event("shutdown")
    async def shutdown_instrumentation():
        # Remove listeners and restore patched methods at app shutdown.
        teardown_sqlalchemy_sync_instrumentation()
|
||||
124
letta/otel/sqlalchemy_instrumentation_integration.py
Normal file
124
letta/otel/sqlalchemy_instrumentation_integration.py
Normal file
@@ -0,0 +1,124 @@
|
||||
"""
|
||||
Integration module for SQLAlchemy synchronous operation instrumentation.
|
||||
|
||||
This module provides easy integration with the existing Letta application,
|
||||
including automatic discovery of database engines and integration with
|
||||
the existing OpenTelemetry setup.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from letta.otel.sqlalchemy_instrumentation import (
|
||||
configure_instrumentation,
|
||||
get_instrumentation_config,
|
||||
is_instrumentation_active,
|
||||
setup_sqlalchemy_sync_instrumentation,
|
||||
teardown_sqlalchemy_sync_instrumentation,
|
||||
)
|
||||
from letta.server.db import db_registry
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def setup_letta_db_instrumentation(
    enable_joined_monitoring: bool = True,
    sql_truncate_length: int = 1000,
    additional_config: Optional[Dict[str, Any]] = None,
) -> None:
    """
    Set up SQLAlchemy instrumentation for Letta application.

    Discovers engines from ``db_registry`` and activates the custom
    sync-operation instrumentation with the given configuration.  No-op if
    instrumentation is already active or no engines can be found.

    Args:
        enable_joined_monitoring: Whether to monitor joined loading operations
        sql_truncate_length: Maximum length of SQL statements in traces
        additional_config: Additional configuration options

    Raises:
        Exception: Propagates any failure from the underlying setup call.
    """
    if is_instrumentation_active():
        logger.info("SQLAlchemy instrumentation already active")
        return

    # Build configuration
    config = {
        "enabled": True,
        "monitor_joined_loading": enable_joined_monitoring,
        "sql_truncate_length": sql_truncate_length,
        "log_instrumentation_errors": True,
    }

    if additional_config:
        config.update(additional_config)

    # Get engines from db_registry; the private attrs may not exist on
    # older registry versions, hence the hasattr checks.
    engines = []
    try:
        if hasattr(db_registry, "_async_engines"):
            engines.extend(db_registry._async_engines.values())
        if hasattr(db_registry, "_sync_engines"):
            engines.extend(db_registry._sync_engines.values())
    except Exception as e:
        logger.warning("Could not discover engines from db_registry: %s", e)

    if not engines:
        logger.warning("No SQLAlchemy engines found for instrumentation")
        return

    try:
        setup_sqlalchemy_sync_instrumentation(
            engines=engines,
            config_overrides=config,
        )
        # Lazy %-style args (instead of the original f-strings) avoid
        # building the message when the log level is disabled.
        logger.info("SQLAlchemy instrumentation setup complete for %d engines", len(engines))

        # Log configuration
        logger.info("Instrumentation configuration:")
        for key, value in get_instrumentation_config().items():
            logger.info("  %s: %s", key, value)

    except Exception as e:
        logger.error("Failed to setup SQLAlchemy instrumentation: %s", e)
        raise
|
||||
|
||||
|
||||
def teardown_letta_db_instrumentation() -> None:
    """Tear down SQLAlchemy instrumentation for the Letta application.

    No-op (with an info log) when instrumentation is not currently active.

    Raises:
        Exception: Propagates any failure from the underlying teardown call.
    """
    if not is_instrumentation_active():
        logger.info("SQLAlchemy instrumentation not active")
        return

    try:
        teardown_sqlalchemy_sync_instrumentation()
        logger.info("SQLAlchemy instrumentation teardown complete")
    except Exception as exc:
        logger.error(f"Failed to teardown SQLAlchemy instrumentation: {exc}")
        raise
|
||||
|
||||
|
||||
def configure_letta_db_instrumentation(**kwargs) -> None:
    """
    Configure SQLAlchemy instrumentation for Letta application.

    Thin wrapper over ``configure_instrumentation`` that also logs the
    applied options.

    Args:
        **kwargs: Configuration options to update
    """
    configure_instrumentation(**kwargs)
    logger.info(f"SQLAlchemy instrumentation configuration updated: {kwargs}")
|
||||
|
||||
|
||||
# FastAPI integration
|
||||
def setup_fastapi_db_instrumentation(app, **config_kwargs):
    """
    Set up SQLAlchemy instrumentation for FastAPI application.

    Registers startup/shutdown handlers that call the Letta-specific setup
    and teardown helpers.

    Args:
        app: FastAPI application instance
        **config_kwargs: Configuration options for instrumentation

    NOTE(review): ``@app.on_event`` is deprecated in recent FastAPI
    releases in favor of lifespan handlers -- confirm the pinned FastAPI
    version still supports it before relying on this helper.
    """

    @app.on_event("startup")
    async def startup_db_instrumentation():
        # Configure and enable instrumentation when the app starts.
        setup_letta_db_instrumentation(**config_kwargs)

    @app.on_event("shutdown")
    async def shutdown_db_instrumentation():
        # Remove listeners and restore patched methods at app shutdown.
        teardown_letta_db_instrumentation()
|
||||
@@ -143,7 +143,43 @@ def setup_tracing(
|
||||
if settings.sqlalchemy_tracing:
|
||||
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
|
||||
|
||||
SQLAlchemyInstrumentor().instrument()
|
||||
from letta.server.db import db_registry
|
||||
|
||||
# For OpenTelemetry SQLAlchemy instrumentation, we need to use the sync_engine
|
||||
async_engine = db_registry.get_async_engine()
|
||||
if async_engine:
|
||||
# Access the sync_engine attribute safely
|
||||
try:
|
||||
SQLAlchemyInstrumentor().instrument(
|
||||
engine=async_engine.sync_engine,
|
||||
enable_commenter=True,
|
||||
commenter_options={},
|
||||
enable_attribute_commenter=True,
|
||||
)
|
||||
except Exception:
|
||||
# Fall back to instrumenting without specifying an engine
|
||||
# This will still capture some SQL operations
|
||||
SQLAlchemyInstrumentor().instrument(
|
||||
enable_commenter=True,
|
||||
commenter_options={},
|
||||
enable_attribute_commenter=True,
|
||||
)
|
||||
else:
|
||||
# If no async engine is available, instrument without an engine
|
||||
SQLAlchemyInstrumentor().instrument(
|
||||
enable_commenter=True,
|
||||
commenter_options={},
|
||||
enable_attribute_commenter=True,
|
||||
)
|
||||
|
||||
# Additionally set up our custom instrumentation
|
||||
try:
|
||||
from letta.otel.sqlalchemy_instrumentation_integration import setup_letta_db_instrumentation
|
||||
|
||||
setup_letta_db_instrumentation(enable_joined_monitoring=True)
|
||||
except Exception as e:
|
||||
# Log but continue if our custom instrumentation fails
|
||||
logger.warning(f"Failed to setup Letta DB instrumentation: {e}")
|
||||
|
||||
if app:
|
||||
# Add middleware first
|
||||
|
||||
@@ -259,6 +259,11 @@ class DatabaseRegistry:
|
||||
self.initialize_sync()
|
||||
return self._engines.get(name)
|
||||
|
||||
def get_async_engine(self, name: str = "default") -> Engine:
    """Return the named async engine, initializing the async registry first.

    Returns ``None`` when no engine is registered under *name*.

    NOTE(review): the return annotation says ``Engine`` but the value comes
    from ``_async_engines`` (likely an ``AsyncEngine``) and may be ``None``
    -- confirm against the registry's typing; the original docstring
    ("Get a database engine by name") was copy-pasted from the sync getter.
    """
    self.initialize_async()
    return self._async_engines.get(name)
|
||||
|
||||
def get_session_factory(self, name: str = "default") -> sessionmaker:
|
||||
"""Get a session factory by name."""
|
||||
self.initialize_sync()
|
||||
|
||||
@@ -157,6 +157,17 @@ async def lifespan(app_: FastAPI):
|
||||
logger.info(f"[Worker {worker_id}] Scheduler shutdown completed")
|
||||
except Exception as e:
|
||||
logger.error(f"[Worker {worker_id}] Scheduler shutdown failed: {e}", exc_info=True)
|
||||
|
||||
# Cleanup SQLAlchemy instrumentation
|
||||
if not settings.disable_tracing and settings.sqlalchemy_tracing:
|
||||
try:
|
||||
from letta.otel.sqlalchemy_instrumentation_integration import teardown_letta_db_instrumentation
|
||||
|
||||
teardown_letta_db_instrumentation()
|
||||
logger.info(f"[Worker {worker_id}] SQLAlchemy instrumentation shutdown completed")
|
||||
except Exception as e:
|
||||
logger.warning(f"[Worker {worker_id}] SQLAlchemy instrumentation shutdown failed: {e}")
|
||||
|
||||
logger.info(f"[Worker {worker_id}] Lifespan shutdown completed")
|
||||
|
||||
|
||||
@@ -314,6 +325,20 @@ def create_application() -> "FastAPI":
|
||||
)
|
||||
setup_metrics(endpoint=otlp_endpoint, app=app, service_name=service_name)
|
||||
|
||||
# Set up SQLAlchemy synchronous operation instrumentation
|
||||
if settings.sqlalchemy_tracing:
|
||||
from letta.otel.sqlalchemy_instrumentation_integration import setup_letta_db_instrumentation
|
||||
|
||||
try:
|
||||
setup_letta_db_instrumentation(
|
||||
enable_joined_monitoring=True, # Monitor joined loading operations
|
||||
sql_truncate_length=1500, # Longer SQL statements for debugging
|
||||
)
|
||||
print("▶ SQLAlchemy synchronous operation instrumentation enabled")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to setup SQLAlchemy instrumentation: {e}")
|
||||
# Don't fail startup if instrumentation fails
|
||||
|
||||
for route in v1_routes:
|
||||
app.include_router(route, prefix=API_PREFIX)
|
||||
# this gives undocumented routes for "latest" and bare api calls.
|
||||
|
||||
26
poetry.lock
generated
26
poetry.lock
generated
@@ -1,4 +1,4 @@
|
||||
# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand.
|
||||
# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.
|
||||
|
||||
[[package]]
|
||||
name = "aioboto3"
|
||||
@@ -4573,6 +4573,28 @@ opentelemetry-util-http = "0.51b0"
|
||||
[package.extras]
|
||||
instruments = ["requests (>=2.0,<3.0)"]
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry-instrumentation-sqlalchemy"
|
||||
version = "0.51b0"
|
||||
description = "OpenTelemetry SQLAlchemy instrumentation"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "opentelemetry_instrumentation_sqlalchemy-0.51b0-py3-none-any.whl", hash = "sha256:5ff4816228b496aef1511149e2b17a25e0faacec4d5eb65bf18a9964af40f1af"},
|
||||
{file = "opentelemetry_instrumentation_sqlalchemy-0.51b0.tar.gz", hash = "sha256:dbfe95b69006017f903dda194606be458d54789e6b3419d37161fb8861bb98a5"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
opentelemetry-api = ">=1.12,<2.0"
|
||||
opentelemetry-instrumentation = "0.51b0"
|
||||
opentelemetry-semantic-conventions = "0.51b0"
|
||||
packaging = ">=21.0"
|
||||
wrapt = ">=1.11.2"
|
||||
|
||||
[package.extras]
|
||||
instruments = ["sqlalchemy (>=1.0.0,<2.1.0)"]
|
||||
|
||||
[[package]]
|
||||
name = "opentelemetry-proto"
|
||||
version = "1.30.0"
|
||||
@@ -8107,4 +8129,4 @@ tests = ["wikipedia"]
|
||||
[metadata]
|
||||
lock-version = "2.1"
|
||||
python-versions = "<3.14,>=3.10"
|
||||
content-hash = "78dd3924afcef62f87fc4e03a33da6fbb1dc0f4f5fee435fef9ae02018414fd5"
|
||||
content-hash = "03cb9e2fb78259e20c53617103da09ff9366315a72181d8306b10d73eb08a42d"
|
||||
|
||||
@@ -77,6 +77,7 @@ openai = "^1.60.0"
|
||||
opentelemetry-api = "1.30.0"
|
||||
opentelemetry-sdk = "1.30.0"
|
||||
opentelemetry-instrumentation-requests = "0.51b0"
|
||||
opentelemetry-instrumentation-sqlalchemy = "0.51b0"
|
||||
opentelemetry-exporter-otlp = "1.30.0"
|
||||
google-genai = {version = "^1.15.0", optional = true}
|
||||
faker = "^36.1.0"
|
||||
|
||||
Reference in New Issue
Block a user