diff --git a/letta/otel/metric_registry.py b/letta/otel/metric_registry.py index 60ece713..5ac41051 100644 --- a/letta/otel/metric_registry.py +++ b/letta/otel/metric_registry.py @@ -120,3 +120,29 @@ class MetricRegistry: unit="1", ), ) + + # (includes endpoint_path, method, status_code) + @property + def endpoint_e2e_ms_histogram(self) -> Histogram: + return self._get_or_create_metric( + "hist_endpoint_e2e_ms", + partial( + self._meter.create_histogram, + name="hist_endpoint_e2e_ms", + description="Histogram for endpoint e2e time (ms)", + unit="ms", + ), + ) + + # (includes endpoint_path, method, status_code) + @property + def endpoint_request_counter(self) -> Counter: + return self._get_or_create_metric( + "count_endpoint_requests", + partial( + self._meter.create_counter, + name="count_endpoint_requests", + description="Counts the number of endpoint requests", + unit="1", + ), + ) diff --git a/letta/otel/metrics.py b/letta/otel/metrics.py index 8e11f291..239a6430 100644 --- a/letta/otel/metrics.py +++ b/letta/otel/metrics.py @@ -1,12 +1,17 @@ -from fastapi import FastAPI, Request +import re +import time +from typing import List, Optional + +from fastapi import Depends, FastAPI, Request from opentelemetry import metrics from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter from opentelemetry.metrics import NoOpMeter from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader +from letta.helpers.datetime_helpers import ns_to_ms from letta.log import get_logger -from letta.otel.context import add_ctx_attribute +from letta.otel.context import add_ctx_attribute, get_ctx_attributes from letta.otel.resource import get_resource, is_pytest_environment logger = get_logger(__name__) @@ -14,26 +19,85 @@ logger = get_logger(__name__) _meter: metrics.Meter = NoOpMeter("noop") _is_metrics_initialized: bool = False +# Endpoints to include in endpoint metrics tracking (opt-in) vs tracing.py opt-out +_included_v1_endpoints_regex: List[str] = [ + "^POST /v1/agents/(?P[^/]+)/messages$", + "^POST /v1/agents/(?P[^/]+)/messages/stream$", + "^POST /v1/agents/(?P[^/]+)/messages/async$", +] + +# Header attributes to set context with +header_attributes = { + "x-organization-id": "organization.id", + "x-project-id": "project.id", + "x-base-template-id": "base_template.id", + "x-template-id": "template.id", + "x-agent-id": "agent.id", +} + async def _otel_metric_middleware(request: Request, call_next): if not _is_metrics_initialized: return await call_next(request) - header_attributes = { - "x-organization-id": "organization.id", - "x-project-id": "project.id", - "x-base-template-id": "base_template.id", - "x-template-id": "template.id", - "x-agent-id": "agent.id", - } - try: - for header_key, otel_key in header_attributes.items(): - header_value = request.headers.get(header_key) - if header_value: - add_ctx_attribute(otel_key, header_value) + for header_key, otel_key in header_attributes.items(): + header_value = request.headers.get(header_key) + if header_value: + add_ctx_attribute(otel_key, header_value) + + # Opt-in check for latency / error tracking + endpoint_path = f"{request.method} {request.url.path}" + should_track_endpoint_metrics = any(re.match(regex, endpoint_path) for regex in _included_v1_endpoints_regex) + + if not should_track_endpoint_metrics: return await call_next(request) - except Exception: + + # --- Opt-in endpoint metrics --- + start_perf_counter_ns = time.perf_counter_ns() + response = None + status_code = 500 # reasonable default + + try: + response = await call_next(request) + status_code = response.status_code + return response + except Exception as e: + # Determine status code from exception + status_code = getattr(e, "status_code", 500) raise + finally: + end_to_end_ms = ns_to_ms(time.perf_counter_ns() - start_perf_counter_ns) + _record_endpoint_metrics( + request=request, + latency_ms=end_to_end_ms, + status_code=status_code, + ) + + +def _record_endpoint_metrics( + request: Request, + latency_ms: float, + status_code: int, +): + """Record endpoint latency and request count metrics.""" + try: + # Get the route pattern for better endpoint naming + route = request.scope.get("route") + endpoint_name = route.path if route and hasattr(route, "path") else "unknown" + + attrs = { + "endpoint_path": endpoint_name, + "method": request.method, + "status_code": status_code, + **get_ctx_attributes(), + } + from letta.otel.metric_registry import MetricRegistry + + MetricRegistry().endpoint_e2e_ms_histogram.record(latency_ms, attributes=attrs) + MetricRegistry().endpoint_request_counter.add(1, attributes=attrs) + + except Exception as e: + logger.warning(f"Failed to record endpoint metrics: {e}") def setup_metrics(