feat: otel endpoint tracking for messages (#2715)
This commit is contained in:
@@ -120,3 +120,29 @@ class MetricRegistry:
|
||||
unit="1",
|
||||
),
|
||||
)
|
||||
|
||||
# (includes endpoint_path, method, status_code)
|
||||
@property
def endpoint_e2e_ms_histogram(self) -> Histogram:
    """Histogram metric for endpoint end-to-end time, in milliseconds.

    The instrument is obtained through ``self._get_or_create_metric`` under
    the key ``hist_endpoint_e2e_ms``; the factory handed to it calls
    ``self._meter.create_histogram`` with the same name.
    """
    metric_name = "hist_endpoint_e2e_ms"
    # Factory is passed unevaluated; _get_or_create_metric decides whether to call it.
    build_histogram = partial(
        self._meter.create_histogram,
        name=metric_name,
        description="Histogram for endpoint e2e time (ms)",
        unit="ms",
    )
    return self._get_or_create_metric(metric_name, build_histogram)
|
||||
|
||||
# (includes endpoint_path, method, status_code)
|
||||
@property
def endpoint_request_counter(self) -> Counter:
    """Counter metric for the number of endpoint requests.

    The instrument is obtained through ``self._get_or_create_metric`` under
    the key ``count_endpoint_requests``; the factory handed to it calls
    ``self._meter.create_counter`` with the same name.
    """
    metric_name = "count_endpoint_requests"
    # Factory is passed unevaluated; _get_or_create_metric decides whether to call it.
    build_counter = partial(
        self._meter.create_counter,
        name=metric_name,
        description="Counts the number of endpoint requests",
        unit="1",
    )
    return self._get_or_create_metric(metric_name, build_counter)
|
||||
|
||||
@@ -1,12 +1,17 @@
|
||||
from fastapi import FastAPI, Request
|
||||
import re
|
||||
import time
|
||||
from typing import List, Optional
|
||||
|
||||
from fastapi import Depends, FastAPI, Request
|
||||
from opentelemetry import metrics
|
||||
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
|
||||
from opentelemetry.metrics import NoOpMeter
|
||||
from opentelemetry.sdk.metrics import MeterProvider
|
||||
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
|
||||
|
||||
from letta.helpers.datetime_helpers import ns_to_ms
|
||||
from letta.log import get_logger
|
||||
from letta.otel.context import add_ctx_attribute
|
||||
from letta.otel.context import add_ctx_attribute, get_ctx_attributes
|
||||
from letta.otel.resource import get_resource, is_pytest_environment
|
||||
|
||||
logger = get_logger(__name__)
|
||||
@@ -14,26 +19,85 @@ logger = get_logger(__name__)
|
||||
# Module-wide meter; initialised to a no-op meter named "noop".
# NOTE(review): presumably replaced during metrics setup — confirm in setup_metrics.
_meter: metrics.Meter = NoOpMeter("noop")
# While this is False, _otel_metric_middleware forwards requests untouched.
_is_metrics_initialized: bool = False

# Endpoint metrics are opt-in: a request is tracked only when its
# "<METHOD> <path>" string matches one of these patterns
# (tracing.py, by contrast, is opt-out).
_included_v1_endpoints_regex: List[str] = [
    "^POST /v1/agents/(?P<agent_id>[^/]+)/messages$",
    "^POST /v1/agents/(?P<agent_id>[^/]+)/messages/stream$",
    "^POST /v1/agents/(?P<agent_id>[^/]+)/messages/async$",
]

# Request header name -> context attribute key it is stored under.
header_attributes = {
    "x-organization-id": "organization.id",
    "x-project-id": "project.id",
    "x-base-template-id": "base_template.id",
    "x-template-id": "template.id",
    "x-agent-id": "agent.id",
}
|
||||
|
||||
|
||||
async def _otel_metric_middleware(request: Request, call_next):
    """Attach header-derived context attributes and record opt-in endpoint metrics.

    Flow:
      1. If metrics are not initialised, forward the request unchanged.
      2. Copy each non-empty header listed in the module-level
         ``header_attributes`` mapping into the context via ``add_ctx_attribute``.
      3. If ``"<METHOD> <path>"`` matches none of ``_included_v1_endpoints_regex``,
         forward the request without timing it.
      4. Otherwise time the downstream call and always hand the latency and
         status code to ``_record_endpoint_metrics``, on success or failure.

    Args:
        request: The incoming request.
        call_next: Awaitable callable that runs the rest of the stack and
            returns the response.

    Returns:
        The response produced by ``call_next``.

    Raises:
        Any exception raised by ``call_next`` is re-raised unchanged after the
        metrics have been recorded.
    """
    if not _is_metrics_initialized:
        return await call_next(request)

    # Single pass over the module-level mapping; empty or missing headers are skipped.
    for header_key, otel_key in header_attributes.items():
        header_value = request.headers.get(header_key)
        if header_value:
            add_ctx_attribute(otel_key, header_value)

    # Opt-in check for latency / error tracking
    endpoint_path = f"{request.method} {request.url.path}"
    should_track_endpoint_metrics = any(re.match(regex, endpoint_path) for regex in _included_v1_endpoints_regex)

    if not should_track_endpoint_metrics:
        return await call_next(request)

    # --- Opt-in endpoint metrics ---
    start_perf_counter_ns = time.perf_counter_ns()
    # Reported if the downstream call exits without a response or a status-bearing exception.
    status_code = 500

    try:
        response = await call_next(request)
        status_code = response.status_code
        return response
    except Exception as e:
        # Use the exception's own status code when it carries one.
        status_code = getattr(e, "status_code", 500)
        raise
    finally:
        # Runs on both the return and the raise path, so every tracked request is measured.
        end_to_end_ms = ns_to_ms(time.perf_counter_ns() - start_perf_counter_ns)
        _record_endpoint_metrics(
            request=request,
            latency_ms=end_to_end_ms,
            status_code=status_code,
        )
|
||||
|
||||
|
||||
def _record_endpoint_metrics(
    request: Request,
    latency_ms: float,
    status_code: int,
):
    """Record one latency observation and one request count for an endpoint.

    Attributes sent with both measurements are ``endpoint_path`` (the matched
    route's ``path``, or ``"unknown"`` when no usable route is in the request
    scope), ``method`` and ``status_code``, merged with whatever
    ``get_ctx_attributes()`` returns. Context attributes are merged last, so
    they win on a key collision.

    Any exception raised while recording is logged as a warning and swallowed.

    Args:
        request: Request whose scope and method describe the endpoint.
        latency_ms: End-to-end latency to record, in milliseconds.
        status_code: Status code to attach to the measurements.
    """
    try:
        # Prefer the route template over the concrete URL for the endpoint name.
        matched_route = request.scope.get("route")
        if matched_route and hasattr(matched_route, "path"):
            endpoint_name = matched_route.path
        else:
            endpoint_name = "unknown"

        base_attrs = dict(
            endpoint_path=endpoint_name,
            method=request.method,
            status_code=status_code,
        )
        attrs = {**base_attrs, **get_ctx_attributes()}

        # NOTE(review): function-scope import, presumably to avoid an import cycle — confirm.
        from letta.otel.metric_registry import MetricRegistry

        MetricRegistry().endpoint_e2e_ms_histogram.record(latency_ms, attributes=attrs)
        MetricRegistry().endpoint_request_counter.add(1, attributes=attrs)
    except Exception as e:
        logger.warning(f"Failed to record endpoint metrics: {e}")
|
||||
|
||||
|
||||
def setup_metrics(
|
||||
|
||||
Reference in New Issue
Block a user