From e3dbb44fc92e2d5564672d8ebb06d7f39c6fef13 Mon Sep 17 00:00:00 2001 From: Kian Jones <11655409+kianjones9@users.noreply.github.com> Date: Wed, 11 Feb 2026 12:36:37 -0800 Subject: [PATCH] feat(telem): support reading from clickhouse traces (#9431) draft --- letta/services/clickhouse_provider_traces.py | 39 +++++++++---------- .../provider_trace_backends/clickhouse.py | 12 ++---- letta/services/telemetry_manager.py | 2 +- 3 files changed, 23 insertions(+), 30 deletions(-) diff --git a/letta/services/clickhouse_provider_traces.py b/letta/services/clickhouse_provider_traces.py index 52ed24b0..5a86dc5e 100644 --- a/letta/services/clickhouse_provider_traces.py +++ b/letta/services/clickhouse_provider_traces.py @@ -41,10 +41,10 @@ def _parse_clickhouse_endpoint(endpoint: str) -> tuple[str, int, bool]: @dataclass(frozen=True) class ClickhouseProviderTraceRow: created_at: Any - trace_id: str + id: str step_id: str - request_data: str | None - response_data: str | None + request_json: str | None + response_json: str | None @singleton @@ -87,15 +87,15 @@ class ClickhouseProviderTraceReader: client = self._get_client() query = """ SELECT - Timestamp AS created_at, - TraceId AS trace_id, - SpanAttributes['parameter.step_id'] AS step_id, - SpanAttributes['request_data'] AS request_data, - SpanAttributes['response_data'] AS response_data - FROM llm_provider_traces - WHERE SpanAttributes['parameter.step_id'] = %(step_id)s - AND position(SpanAttributes['parameter.actor'], %(org_match)s) > 0 - ORDER BY Timestamp DESC + created_at, + id, + step_id, + request_json, + response_json + FROM llm_traces + WHERE step_id = %(step_id)s + AND organization_id = %(organization_id)s + ORDER BY created_at DESC LIMIT 1 """ @@ -103,7 +103,7 @@ class ClickhouseProviderTraceReader: query, parameters={ "step_id": step_id, - "org_match": f"organization_id='{organization_id}'", + "organization_id": organization_id, }, ) @@ -111,13 +111,12 @@ class ClickhouseProviderTraceReader: return None row = result.result_rows[0] - # Order matches SELECT above return ClickhouseProviderTraceRow( created_at=row[0], - trace_id=row[1], + id=row[1], step_id=row[2], - request_data=row[3], - response_data=row[4], + request_json=row[3], + response_json=row[4], ) async def get_provider_trace_by_step_id_async(self, *, step_id: str, organization_id: str) -> ProviderTrace | None: @@ -126,9 +125,9 @@ class ClickhouseProviderTraceReader: return None return ProviderTrace( - id=f"provider_trace-{row.trace_id}", + id=f"provider_trace-{row.id}", step_id=row.step_id, - request_json=_parse_json_maybe(row.request_data), - response_json=_parse_json_maybe(row.response_data), + request_json=_parse_json_maybe(row.request_json), + response_json=_parse_json_maybe(row.response_json), created_at=row.created_at, ) diff --git a/letta/services/provider_trace_backends/clickhouse.py b/letta/services/provider_trace_backends/clickhouse.py index 3f5f842e..77c8cf80 100644 --- a/letta/services/provider_trace_backends/clickhouse.py +++ b/letta/services/provider_trace_backends/clickhouse.py @@ -1,7 +1,6 @@ """ClickHouse provider trace backend. -Writes traces to the llm_traces table with denormalized columns for cost analytics. -Reads from the OTEL traces table (will eventually cut over to llm_traces). +Writes and reads from the llm_traces table with denormalized columns for cost analytics. """ import json @@ -22,12 +21,7 @@ logger = get_logger(__name__) class ClickhouseProviderTraceBackend(ProviderTraceBackendClient): - """ - ClickHouse backend for provider traces. - - - Writes go to llm_traces table (denormalized for cost analytics) - - Reads come from OTEL traces table (will cut over to llm_traces later) - """ + """ClickHouse backend for provider traces (reads and writes from llm_traces table).""" def __init__(self): self._reader = ClickhouseProviderTraceReader() @@ -71,7 +65,7 @@ class ClickhouseProviderTraceBackend(ProviderTraceBackendClient): step_id: str, actor: User, ) -> ProviderTrace | None: - """Read from OTEL traces table (will cut over to llm_traces later).""" + """Read provider trace from llm_traces table by step_id.""" return await self._reader.get_provider_trace_by_step_id_async( step_id=step_id, organization_id=actor.organization_id, diff --git a/letta/services/telemetry_manager.py b/letta/services/telemetry_manager.py index 94d6ef40..c7291bfd 100644 --- a/letta/services/telemetry_manager.py +++ b/letta/services/telemetry_manager.py @@ -20,7 +20,7 @@ class TelemetryManager: Supports multiple backends for dual-write scenarios (e.g., migration). Configure via LETTA_TELEMETRY_PROVIDER_TRACE_BACKEND (comma-separated): - postgres: Store in PostgreSQL (default) - - clickhouse: Store in ClickHouse (writes to llm_traces table, reads from OTEL traces) + - clickhouse: Store in ClickHouse (reads and writes from llm_traces table) - socket: Store via Unix socket to external sidecar Example: LETTA_TELEMETRY_PROVIDER_TRACE_BACKEND=postgres,clickhouse