feat: add /v1/runs/{run_id}/trace endpoint for OTEL traces (#8682)
* feat: add /v1/runs/{run_id}/trace endpoint for OTEL traces
- Add new endpoint to retrieve filtered OTEL spans for a run
- Filter to only return UI-relevant spans (agent_step, tool executions, root span, TTFT)
- Skip Postgres writes when ClickHouse is enabled for provider traces
- Add USE_CLICKHOUSE_FOR_PROVIDER_TRACES env var to helm/justfile
- Move typecheck CI to self-hosted runners
🤖 Generated with [Letta Code](https://letta.com)
Co-Authored-By: Letta <noreply@letta.com>
* fix: add missing clickhouse_provider_traces.py
The telemetry_manager.py imports ClickhouseProviderTraceReader from
this module, but the file was not included when splitting the PR.
🤖 Generated with [Letta Code](https://letta.com)
Co-Authored-By: Letta <noreply@letta.com>
* autogen
* fix: add trace.retrieve to stainless.yml for SDK generation
Adds the runs.trace.retrieve method mapping so Stainless generates
the useRunsServiceRetrieveTraceForRun hook.
🤖 Generated with [Letta Code](https://letta.com)
Co-Authored-By: Letta <noreply@letta.com>
---------
Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
committed by
Sarah Wooders
parent
9d1ad00dd6
commit
3eae81cf62
@@ -1,5 +1,5 @@
|
||||
from datetime import timedelta
|
||||
from typing import Annotated, List, Literal, Optional
|
||||
from typing import Annotated, Any, List, Literal, Optional
|
||||
|
||||
from fastapi import APIRouter, Body, Depends, HTTPException, Query
|
||||
from pydantic import Field
|
||||
@@ -23,6 +23,7 @@ from letta.server.rest_api.streaming_response import (
|
||||
cancellation_aware_stream_wrapper,
|
||||
)
|
||||
from letta.server.server import SyncServer
|
||||
from letta.services.clickhouse_otel_traces import ClickhouseOtelTracesReader
|
||||
from letta.services.run_manager import RunManager
|
||||
from letta.settings import settings
|
||||
|
||||
@@ -264,6 +265,49 @@ async def list_steps_for_run(
|
||||
)
|
||||
|
||||
|
||||
@router.get(
|
||||
"/{run_id}/trace",
|
||||
response_model=List[dict[str, Any]],
|
||||
operation_id="retrieve_trace_for_run",
|
||||
)
|
||||
async def retrieve_trace_for_run(
|
||||
run_id: str,
|
||||
server: "SyncServer" = Depends(get_letta_server),
|
||||
headers: HeaderParams = Depends(get_headers),
|
||||
limit: int = Query(1000, description="Maximum number of spans to return", ge=1, le=5000),
|
||||
):
|
||||
"""
|
||||
Retrieve OTEL trace spans for a run.
|
||||
|
||||
Returns a filtered set of spans relevant for observability:
|
||||
- agent_step: Individual agent reasoning steps
|
||||
- tool executions: Tool call spans
|
||||
- Root span: The top-level request span
|
||||
- time_to_first_token: TTFT measurement span
|
||||
|
||||
Requires ClickHouse to be configured for trace storage.
|
||||
"""
|
||||
# OTEL traces are only available when ClickHouse is configured
|
||||
if not settings.use_clickhouse_for_provider_traces:
|
||||
raise HTTPException(
|
||||
status_code=501,
|
||||
detail="OTEL traces require ClickHouse. Set use_clickhouse_for_provider_traces=true and configure ClickHouse connection.",
|
||||
)
|
||||
|
||||
actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id)
|
||||
runs_manager = RunManager()
|
||||
|
||||
# We assume trace_id is stable across all steps in a run, but individual step rows may
|
||||
# lack trace_id (e.g. older data). Grab a few and pick the first populated value.
|
||||
steps = await runs_manager.get_run_steps(run_id=run_id, actor=actor, limit=25)
|
||||
trace_id = next((s.trace_id for s in steps if s.trace_id), None)
|
||||
if not trace_id:
|
||||
return []
|
||||
|
||||
# Only return spans relevant to the trace viewer UI (agent_step, tool executions, root span, TTFT)
|
||||
return await ClickhouseOtelTracesReader().get_traces_by_trace_id_async(trace_id=trace_id, limit=limit, filter_ui_spans=True)
|
||||
|
||||
|
||||
@router.delete("/{run_id}", response_model=None, operation_id="delete_run")
|
||||
async def delete_run(
|
||||
run_id: str,
|
||||
|
||||
102
letta/services/clickhouse_otel_traces.py
Normal file
102
letta/services/clickhouse_otel_traces.py
Normal file
@@ -0,0 +1,102 @@
|
||||
import asyncio
|
||||
from typing import Any
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from letta.helpers.singleton import singleton
|
||||
from letta.settings import settings
|
||||
|
||||
|
||||
def _parse_clickhouse_endpoint(endpoint: str) -> tuple[str, int, bool]:
|
||||
parsed = urlparse(endpoint)
|
||||
|
||||
if parsed.scheme in ("http", "https"):
|
||||
host = parsed.hostname or ""
|
||||
port = parsed.port or (8443 if parsed.scheme == "https" else 8123)
|
||||
secure = parsed.scheme == "https"
|
||||
return host, port, secure
|
||||
|
||||
# Fallback: accept raw hostname (possibly with :port)
|
||||
if ":" in endpoint:
|
||||
host, port_str = endpoint.rsplit(":", 1)
|
||||
return host, int(port_str), True
|
||||
|
||||
return endpoint, 8443, True
|
||||
|
||||
|
||||
@singleton
|
||||
class ClickhouseOtelTracesReader:
|
||||
def __init__(self):
|
||||
self._client = None
|
||||
|
||||
def _get_client(self):
|
||||
if self._client is not None:
|
||||
return self._client
|
||||
|
||||
import clickhouse_connect
|
||||
|
||||
if not settings.clickhouse_endpoint:
|
||||
raise ValueError("CLICKHOUSE_ENDPOINT is required")
|
||||
|
||||
host, port, secure = _parse_clickhouse_endpoint(settings.clickhouse_endpoint)
|
||||
if not host:
|
||||
raise ValueError("Invalid CLICKHOUSE_ENDPOINT")
|
||||
|
||||
database = settings.clickhouse_database or "otel"
|
||||
username = settings.clickhouse_username or "default"
|
||||
password = settings.clickhouse_password
|
||||
if not password:
|
||||
raise ValueError("CLICKHOUSE_PASSWORD is required")
|
||||
|
||||
self._client = clickhouse_connect.get_client(
|
||||
host=host,
|
||||
port=port,
|
||||
username=username,
|
||||
password=password,
|
||||
database=database,
|
||||
secure=secure,
|
||||
verify=True,
|
||||
)
|
||||
return self._client
|
||||
|
||||
def _get_traces_by_trace_id_sync(self, trace_id: str, limit: int, filter_ui_spans: bool = False) -> list[dict[str, Any]]:
|
||||
client = self._get_client()
|
||||
|
||||
if filter_ui_spans:
|
||||
# Only return spans used by the trace viewer UI:
|
||||
# - agent_step: step events
|
||||
# - *._execute_tool: tool execution details
|
||||
# - root spans (no parent): request info
|
||||
# - time_to_first_token: TTFT measurement
|
||||
query = """
|
||||
SELECT *
|
||||
FROM otel_traces
|
||||
WHERE TraceId = %(trace_id)s
|
||||
AND (
|
||||
SpanName = 'agent_step'
|
||||
OR SpanName LIKE '%%._execute_tool'
|
||||
OR ParentSpanId = ''
|
||||
OR SpanName = 'time_to_first_token'
|
||||
)
|
||||
ORDER BY Timestamp ASC
|
||||
LIMIT %(limit)s
|
||||
"""
|
||||
else:
|
||||
query = """
|
||||
SELECT *
|
||||
FROM otel_traces
|
||||
WHERE TraceId = %(trace_id)s
|
||||
ORDER BY Timestamp ASC
|
||||
LIMIT %(limit)s
|
||||
"""
|
||||
|
||||
result = client.query(query, parameters={"trace_id": trace_id, "limit": limit})
|
||||
if not result or not result.result_rows:
|
||||
return []
|
||||
|
||||
cols = list(result.column_names)
|
||||
return [dict(zip(cols, row)) for row in result.result_rows]
|
||||
|
||||
async def get_traces_by_trace_id_async(
|
||||
self, *, trace_id: str, limit: int = 1000, filter_ui_spans: bool = False
|
||||
) -> list[dict[str, Any]]:
|
||||
return await asyncio.to_thread(self._get_traces_by_trace_id_sync, trace_id, limit, filter_ui_spans)
|
||||
134
letta/services/clickhouse_provider_traces.py
Normal file
134
letta/services/clickhouse_provider_traces.py
Normal file
@@ -0,0 +1,134 @@
|
||||
import asyncio
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from letta.helpers.singleton import singleton
|
||||
from letta.schemas.provider_trace import ProviderTrace
|
||||
from letta.settings import settings
|
||||
|
||||
|
||||
def _parse_json_maybe(value: str | None) -> dict[str, Any]:
|
||||
if not value:
|
||||
return {}
|
||||
try:
|
||||
parsed = json.loads(value)
|
||||
return parsed if isinstance(parsed, dict) else {"_value": parsed}
|
||||
except Exception:
|
||||
# Preserve the raw payload if parsing fails (e.g. non-JSON string)
|
||||
return {"_raw": value}
|
||||
|
||||
|
||||
def _parse_clickhouse_endpoint(endpoint: str) -> tuple[str, int, bool]:
|
||||
"""Return (host, port, secure) for clickhouse_connect.get_client."""
|
||||
parsed = urlparse(endpoint)
|
||||
|
||||
if parsed.scheme in ("http", "https"):
|
||||
host = parsed.hostname or ""
|
||||
port = parsed.port or (8443 if parsed.scheme == "https" else 8123)
|
||||
secure = parsed.scheme == "https"
|
||||
return host, port, secure
|
||||
|
||||
# Fallback: accept raw hostname (possibly with :port)
|
||||
if ":" in endpoint:
|
||||
host, port_str = endpoint.rsplit(":", 1)
|
||||
return host, int(port_str), True
|
||||
|
||||
return endpoint, 8443, True
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ClickhouseProviderTraceRow:
|
||||
created_at: Any
|
||||
trace_id: str
|
||||
step_id: str
|
||||
request_data: str | None
|
||||
response_data: str | None
|
||||
|
||||
|
||||
@singleton
|
||||
class ClickhouseProviderTraceReader:
|
||||
def __init__(self):
|
||||
self._client = None
|
||||
|
||||
def _get_client(self):
|
||||
if self._client is not None:
|
||||
return self._client
|
||||
|
||||
# Import lazily so OSS users who never enable this flag don't pay import cost.
|
||||
import clickhouse_connect
|
||||
|
||||
if not settings.clickhouse_endpoint:
|
||||
raise ValueError("CLICKHOUSE_ENDPOINT is required")
|
||||
|
||||
host, port, secure = _parse_clickhouse_endpoint(settings.clickhouse_endpoint)
|
||||
if not host:
|
||||
raise ValueError("Invalid CLICKHOUSE_ENDPOINT")
|
||||
|
||||
database = settings.clickhouse_database or "otel"
|
||||
username = settings.clickhouse_username or "default"
|
||||
password = settings.clickhouse_password
|
||||
if not password:
|
||||
raise ValueError("CLICKHOUSE_PASSWORD is required")
|
||||
|
||||
self._client = clickhouse_connect.get_client(
|
||||
host=host,
|
||||
port=port,
|
||||
username=username,
|
||||
password=password,
|
||||
database=database,
|
||||
secure=secure,
|
||||
verify=True,
|
||||
)
|
||||
return self._client
|
||||
|
||||
def _query_latest_row_for_step_id_sync(self, step_id: str, organization_id: str) -> ClickhouseProviderTraceRow | None:
|
||||
client = self._get_client()
|
||||
query = """
|
||||
SELECT
|
||||
Timestamp AS created_at,
|
||||
TraceId AS trace_id,
|
||||
SpanAttributes['parameter.step_id'] AS step_id,
|
||||
SpanAttributes['request_data'] AS request_data,
|
||||
SpanAttributes['response_data'] AS response_data
|
||||
FROM llm_provider_traces
|
||||
WHERE SpanAttributes['parameter.step_id'] = %(step_id)s
|
||||
AND position(SpanAttributes['parameter.actor'], %(org_match)s) > 0
|
||||
ORDER BY Timestamp DESC
|
||||
LIMIT 1
|
||||
"""
|
||||
|
||||
result = client.query(
|
||||
query,
|
||||
parameters={
|
||||
"step_id": step_id,
|
||||
"org_match": f"organization_id='{organization_id}'",
|
||||
},
|
||||
)
|
||||
|
||||
if not result or not result.result_rows:
|
||||
return None
|
||||
|
||||
row = result.result_rows[0]
|
||||
# Order matches SELECT above
|
||||
return ClickhouseProviderTraceRow(
|
||||
created_at=row[0],
|
||||
trace_id=row[1],
|
||||
step_id=row[2],
|
||||
request_data=row[3],
|
||||
response_data=row[4],
|
||||
)
|
||||
|
||||
async def get_provider_trace_by_step_id_async(self, *, step_id: str, organization_id: str) -> ProviderTrace | None:
|
||||
row = await asyncio.to_thread(self._query_latest_row_for_step_id_sync, step_id, organization_id)
|
||||
if row is None:
|
||||
return None
|
||||
|
||||
return ProviderTrace(
|
||||
id=f"provider_trace-{row.trace_id}",
|
||||
step_id=row.step_id,
|
||||
request_json=_parse_json_maybe(row.request_data),
|
||||
response_json=_parse_json_maybe(row.response_data),
|
||||
created_at=row.created_at,
|
||||
)
|
||||
@@ -6,6 +6,8 @@ from letta.schemas.provider_trace import ProviderTrace as PydanticProviderTrace,
|
||||
from letta.schemas.step import Step as PydanticStep
|
||||
from letta.schemas.user import User as PydanticUser
|
||||
from letta.server.db import db_registry
|
||||
from letta.services.clickhouse_provider_traces import ClickhouseProviderTraceReader
|
||||
from letta.settings import settings
|
||||
from letta.utils import enforce_types
|
||||
|
||||
|
||||
@@ -16,7 +18,15 @@ class TelemetryManager:
|
||||
self,
|
||||
step_id: str,
|
||||
actor: PydanticUser,
|
||||
) -> PydanticProviderTrace:
|
||||
) -> PydanticProviderTrace | None:
|
||||
# When ClickHouse is enabled, read only from ClickHouse (no Postgres fallback)
|
||||
if settings.use_clickhouse_for_provider_traces:
|
||||
return await ClickhouseProviderTraceReader().get_provider_trace_by_step_id_async(
|
||||
step_id=step_id,
|
||||
organization_id=actor.organization_id,
|
||||
)
|
||||
|
||||
# Postgres storage backend
|
||||
async with db_registry.async_session() as session:
|
||||
provider_trace = await ProviderTraceModel.read_async(db_session=session, step_id=step_id, actor=actor)
|
||||
return provider_trace.to_pydantic()
|
||||
@@ -24,6 +34,15 @@ class TelemetryManager:
|
||||
@enforce_types
|
||||
@trace_method
|
||||
async def create_provider_trace_async(self, actor: PydanticUser, provider_trace_create: ProviderTraceCreate) -> PydanticProviderTrace:
|
||||
# When ClickHouse is enabled, skip Postgres writes - data flows via OTEL instrumentation
|
||||
if settings.use_clickhouse_for_provider_traces:
|
||||
return PydanticProviderTrace(
|
||||
id=f"provider_trace-{provider_trace_create.step_id}",
|
||||
step_id=provider_trace_create.step_id,
|
||||
request_json=provider_trace_create.request_json or {},
|
||||
response_json=provider_trace_create.response_json or {},
|
||||
)
|
||||
|
||||
async with db_registry.async_session() as session:
|
||||
provider_trace = ProviderTraceModel(**provider_trace_create.model_dump())
|
||||
provider_trace.organization_id = actor.organization_id
|
||||
@@ -36,8 +55,6 @@ class TelemetryManager:
|
||||
provider_trace.response_json = json_loads(response_json_str)
|
||||
await provider_trace.create_async(session, actor=actor, no_commit=True, no_refresh=True)
|
||||
pydantic_provider_trace = provider_trace.to_pydantic()
|
||||
# context manager now handles commits
|
||||
# await session.commit()
|
||||
return pydantic_provider_trace
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user