From 3eae81cf62e46c65758332a73fde1f5c2660e3cf Mon Sep 17 00:00:00 2001 From: Kian Jones <11655409+kianjones9@users.noreply.github.com> Date: Wed, 14 Jan 2026 16:14:31 -0800 Subject: [PATCH] feat: add /v1/runs/{run_id}/trace endpoint for OTEL traces (#8682) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: add /v1/runs/{run_id}/trace endpoint for OTEL traces - Add new endpoint to retrieve filtered OTEL spans for a run - Filter to only return UI-relevant spans (agent_step, tool executions, root span, TTFT) - Skip Postgres writes when ClickHouse is enabled for provider traces - Add USE_CLICKHOUSE_FOR_PROVIDER_TRACES env var to helm/justfile - Move typecheck CI to self-hosted runners 🤖 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta * fix: add missing clickhouse_provider_traces.py The telemetry_manager.py imports ClickhouseProviderTraceReader from this module, but the file was not included when splitting the PR. 🤖 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta * autogen * fix: add trace.retrieve to stainless.yml for SDK generation Adds the runs.trace.retrieve method mapping so Stainless generates the useRunsServiceRetrieveTraceForRun hook. 🤖 Generated with [Letta Code](https://letta.com) Co-Authored-By: Letta --------- Co-authored-by: Letta --- letta/server/rest_api/routers/v1/runs.py | 46 ++++++- letta/services/clickhouse_otel_traces.py | 102 ++++++++++++++ letta/services/clickhouse_provider_traces.py | 134 +++++++++++++++++++ letta/services/telemetry_manager.py | 23 +++- 4 files changed, 301 insertions(+), 4 deletions(-) create mode 100644 letta/services/clickhouse_otel_traces.py create mode 100644 letta/services/clickhouse_provider_traces.py diff --git a/letta/server/rest_api/routers/v1/runs.py b/letta/server/rest_api/routers/v1/runs.py index 3bccc1f9..90de376b 100644 --- a/letta/server/rest_api/routers/v1/runs.py +++ b/letta/server/rest_api/routers/v1/runs.py @@ -1,5 +1,5 @@ from datetime import timedelta -from typing import Annotated, List, Literal, Optional +from typing import Annotated, Any, List, Literal, Optional from fastapi import APIRouter, Body, Depends, HTTPException, Query from pydantic import Field @@ -23,6 +23,7 @@ from letta.server.rest_api.streaming_response import ( cancellation_aware_stream_wrapper, ) from letta.server.server import SyncServer +from letta.services.clickhouse_otel_traces import ClickhouseOtelTracesReader from letta.services.run_manager import RunManager from letta.settings import settings @@ -264,6 +265,49 @@ async def list_steps_for_run( ) +@router.get( + "/{run_id}/trace", + response_model=List[dict[str, Any]], + operation_id="retrieve_trace_for_run", +) +async def retrieve_trace_for_run( + run_id: str, + server: "SyncServer" = Depends(get_letta_server), + headers: HeaderParams = Depends(get_headers), + limit: int = Query(1000, description="Maximum number of spans to return", ge=1, le=5000), +): + """ + Retrieve OTEL trace spans for a run. + + Returns a filtered set of spans relevant for observability: + - agent_step: Individual agent reasoning steps + - tool executions: Tool call spans + - Root span: The top-level request span + - time_to_first_token: TTFT measurement span + + Requires ClickHouse to be configured for trace storage. + """ + # OTEL traces are only available when ClickHouse is configured + if not settings.use_clickhouse_for_provider_traces: + raise HTTPException( + status_code=501, + detail="OTEL traces require ClickHouse. Set use_clickhouse_for_provider_traces=true and configure ClickHouse connection.", + ) + + actor = await server.user_manager.get_actor_or_default_async(actor_id=headers.actor_id) + runs_manager = RunManager() + + # We assume trace_id is stable across all steps in a run, but individual step rows may + # lack trace_id (e.g. older data). Grab a few and pick the first populated value. + steps = await runs_manager.get_run_steps(run_id=run_id, actor=actor, limit=25) + trace_id = next((s.trace_id for s in steps if s.trace_id), None) + if not trace_id: + return [] + + # Only return spans relevant to the trace viewer UI (agent_step, tool executions, root span, TTFT) + return await ClickhouseOtelTracesReader().get_traces_by_trace_id_async(trace_id=trace_id, limit=limit, filter_ui_spans=True) + + @router.delete("/{run_id}", response_model=None, operation_id="delete_run") async def delete_run( run_id: str, diff --git a/letta/services/clickhouse_otel_traces.py b/letta/services/clickhouse_otel_traces.py new file mode 100644 index 00000000..c7a48e8d --- /dev/null +++ b/letta/services/clickhouse_otel_traces.py @@ -0,0 +1,102 @@ +import asyncio +from typing import Any +from urllib.parse import urlparse + +from letta.helpers.singleton import singleton +from letta.settings import settings + + +def _parse_clickhouse_endpoint(endpoint: str) -> tuple[str, int, bool]: + parsed = urlparse(endpoint) + + if parsed.scheme in ("http", "https"): + host = parsed.hostname or "" + port = parsed.port or (8443 if parsed.scheme == "https" else 8123) + secure = parsed.scheme == "https" + return host, port, secure + + # Fallback: accept raw hostname (possibly with :port) + if ":" in endpoint: + host, port_str = endpoint.rsplit(":", 1) + return host, int(port_str), True + + return endpoint, 8443, True + + +@singleton +class ClickhouseOtelTracesReader: + def __init__(self): + self._client = None + + def _get_client(self): + if self._client is not None: + return self._client + + import clickhouse_connect + + if not settings.clickhouse_endpoint: + raise ValueError("CLICKHOUSE_ENDPOINT is required") + + host, port, secure = _parse_clickhouse_endpoint(settings.clickhouse_endpoint) + if not host: + raise ValueError("Invalid CLICKHOUSE_ENDPOINT") + + database = settings.clickhouse_database or "otel" + username = settings.clickhouse_username or "default" + password = settings.clickhouse_password + if not password: + raise ValueError("CLICKHOUSE_PASSWORD is required") + + self._client = clickhouse_connect.get_client( + host=host, + port=port, + username=username, + password=password, + database=database, + secure=secure, + verify=True, + ) + return self._client + + def _get_traces_by_trace_id_sync(self, trace_id: str, limit: int, filter_ui_spans: bool = False) -> list[dict[str, Any]]: + client = self._get_client() + + if filter_ui_spans: + # Only return spans used by the trace viewer UI: + # - agent_step: step events + # - *._execute_tool: tool execution details + # - root spans (no parent): request info + # - time_to_first_token: TTFT measurement + query = """ + SELECT * + FROM otel_traces + WHERE TraceId = %(trace_id)s + AND ( + SpanName = 'agent_step' + OR SpanName LIKE '%%._execute_tool' + OR ParentSpanId = '' + OR SpanName = 'time_to_first_token' + ) + ORDER BY Timestamp ASC + LIMIT %(limit)s + """ + else: + query = """ + SELECT * + FROM otel_traces + WHERE TraceId = %(trace_id)s + ORDER BY Timestamp ASC + LIMIT %(limit)s + """ + + result = client.query(query, parameters={"trace_id": trace_id, "limit": limit}) + if not result or not result.result_rows: + return [] + + cols = list(result.column_names) + return [dict(zip(cols, row)) for row in result.result_rows] + + async def get_traces_by_trace_id_async( + self, *, trace_id: str, limit: int = 1000, filter_ui_spans: bool = False + ) -> list[dict[str, Any]]: + return await asyncio.to_thread(self._get_traces_by_trace_id_sync, trace_id, limit, filter_ui_spans) diff --git a/letta/services/clickhouse_provider_traces.py b/letta/services/clickhouse_provider_traces.py new file mode 100644 index 00000000..52ed24b0 --- /dev/null +++ b/letta/services/clickhouse_provider_traces.py @@ -0,0 +1,134 @@ +import asyncio +import json +from dataclasses import dataclass +from typing import Any +from urllib.parse import urlparse + +from letta.helpers.singleton import singleton +from letta.schemas.provider_trace import ProviderTrace +from letta.settings import settings + + +def _parse_json_maybe(value: str | None) -> dict[str, Any]: + if not value: + return {} + try: + parsed = json.loads(value) + return parsed if isinstance(parsed, dict) else {"_value": parsed} + except Exception: + # Preserve the raw payload if parsing fails (e.g. non-JSON string) + return {"_raw": value} + + +def _parse_clickhouse_endpoint(endpoint: str) -> tuple[str, int, bool]: + """Return (host, port, secure) for clickhouse_connect.get_client.""" + parsed = urlparse(endpoint) + + if parsed.scheme in ("http", "https"): + host = parsed.hostname or "" + port = parsed.port or (8443 if parsed.scheme == "https" else 8123) + secure = parsed.scheme == "https" + return host, port, secure + + # Fallback: accept raw hostname (possibly with :port) + if ":" in endpoint: + host, port_str = endpoint.rsplit(":", 1) + return host, int(port_str), True + + return endpoint, 8443, True + + +@dataclass(frozen=True) +class ClickhouseProviderTraceRow: + created_at: Any + trace_id: str + step_id: str + request_data: str | None + response_data: str | None + + +@singleton +class ClickhouseProviderTraceReader: + def __init__(self): + self._client = None + + def _get_client(self): + if self._client is not None: + return self._client + + # Import lazily so OSS users who never enable this flag don't pay import cost. + import clickhouse_connect + + if not settings.clickhouse_endpoint: + raise ValueError("CLICKHOUSE_ENDPOINT is required") + + host, port, secure = _parse_clickhouse_endpoint(settings.clickhouse_endpoint) + if not host: + raise ValueError("Invalid CLICKHOUSE_ENDPOINT") + + database = settings.clickhouse_database or "otel" + username = settings.clickhouse_username or "default" + password = settings.clickhouse_password + if not password: + raise ValueError("CLICKHOUSE_PASSWORD is required") + + self._client = clickhouse_connect.get_client( + host=host, + port=port, + username=username, + password=password, + database=database, + secure=secure, + verify=True, + ) + return self._client + + def _query_latest_row_for_step_id_sync(self, step_id: str, organization_id: str) -> ClickhouseProviderTraceRow | None: + client = self._get_client() + query = """ + SELECT + Timestamp AS created_at, + TraceId AS trace_id, + SpanAttributes['parameter.step_id'] AS step_id, + SpanAttributes['request_data'] AS request_data, + SpanAttributes['response_data'] AS response_data + FROM llm_provider_traces + WHERE SpanAttributes['parameter.step_id'] = %(step_id)s + AND position(SpanAttributes['parameter.actor'], %(org_match)s) > 0 + ORDER BY Timestamp DESC + LIMIT 1 + """ + + result = client.query( + query, + parameters={ + "step_id": step_id, + "org_match": f"organization_id='{organization_id}'", + }, + ) + + if not result or not result.result_rows: + return None + + row = result.result_rows[0] + # Order matches SELECT above + return ClickhouseProviderTraceRow( + created_at=row[0], + trace_id=row[1], + step_id=row[2], + request_data=row[3], + response_data=row[4], + ) + + async def get_provider_trace_by_step_id_async(self, *, step_id: str, organization_id: str) -> ProviderTrace | None: + row = await asyncio.to_thread(self._query_latest_row_for_step_id_sync, step_id, organization_id) + if row is None: + return None + + return ProviderTrace( + id=f"provider_trace-{row.trace_id}", + step_id=row.step_id, + request_json=_parse_json_maybe(row.request_data), + response_json=_parse_json_maybe(row.response_data), + created_at=row.created_at, + ) diff --git a/letta/services/telemetry_manager.py b/letta/services/telemetry_manager.py index e7b7b5df..cd1bb47d 100644 --- a/letta/services/telemetry_manager.py +++ b/letta/services/telemetry_manager.py @@ -6,6 +6,8 @@ from letta.schemas.provider_trace import ProviderTrace as PydanticProviderTrace, from letta.schemas.step import Step as PydanticStep from letta.schemas.user import User as PydanticUser from letta.server.db import db_registry +from letta.services.clickhouse_provider_traces import ClickhouseProviderTraceReader +from letta.settings import settings from letta.utils import enforce_types @@ -16,7 +18,15 @@ class TelemetryManager: self, step_id: str, actor: PydanticUser, - ) -> PydanticProviderTrace: + ) -> PydanticProviderTrace | None: + # When ClickHouse is enabled, read only from ClickHouse (no Postgres fallback) + if settings.use_clickhouse_for_provider_traces: + return await ClickhouseProviderTraceReader().get_provider_trace_by_step_id_async( + step_id=step_id, + organization_id=actor.organization_id, + ) + + # Postgres storage backend async with db_registry.async_session() as session: provider_trace = await ProviderTraceModel.read_async(db_session=session, step_id=step_id, actor=actor) return provider_trace.to_pydantic() @@ -24,6 +34,15 @@ class TelemetryManager: @enforce_types @trace_method async def create_provider_trace_async(self, actor: PydanticUser, provider_trace_create: ProviderTraceCreate) -> PydanticProviderTrace: + # When ClickHouse is enabled, skip Postgres writes - data flows via OTEL instrumentation + if settings.use_clickhouse_for_provider_traces: + return PydanticProviderTrace( + id=f"provider_trace-{provider_trace_create.step_id}", + step_id=provider_trace_create.step_id, + request_json=provider_trace_create.request_json or {}, + response_json=provider_trace_create.response_json or {}, + ) + async with db_registry.async_session() as session: provider_trace = ProviderTraceModel(**provider_trace_create.model_dump()) provider_trace.organization_id = actor.organization_id @@ -36,8 +55,6 @@ class TelemetryManager: provider_trace.response_json = json_loads(response_json_str) await provider_trace.create_async(session, actor=actor, no_commit=True, no_refresh=True) pydantic_provider_trace = provider_trace.to_pydantic() - # context manager now handles commits - # await session.commit() return pydantic_provider_trace