letta-server/letta/services/llm_trace_reader.py
Sarah Wooders 4096b30cd7 feat: log LLM traces to clickhouse (#9111)
* feat: add non-streaming option for conversation messages

- Add ConversationMessageRequest with stream=True default (backwards compatible)
- stream=true (default): SSE streaming via StreamingService
- stream=false: JSON response via AgentLoop.load().step()

🤖 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* chore: regenerate API schema for ConversationMessageRequest

* feat: add direct ClickHouse storage for raw LLM traces

    Adds ability to store raw LLM request/response payloads directly in ClickHouse,
    bypassing OTEL span attribute size limits. This enables debugging and analytics
    on large LLM payloads (>10MB system prompts, large tool schemas, etc.).

    New files:
    - letta/schemas/llm_raw_trace.py: Pydantic schema with ClickHouse row helper
    - letta/services/llm_raw_trace_writer.py: Async batching writer (fire-and-forget)
    - letta/services/llm_raw_trace_reader.py: Reader with query methods
    - scripts/sql/clickhouse/llm_raw_traces.ddl: Production table DDL
    - scripts/sql/clickhouse/llm_raw_traces_local.ddl: Local dev DDL
    - apps/core/clickhouse-init.sql: Local dev initialization

    Modified:
    - letta/settings.py: Added 4 settings (store_llm_raw_traces, ttl, batch_size, flush_interval)
    - letta/llm_api/llm_client_base.py: Integration into request_async_with_telemetry
    - compose.yaml: Added ClickHouse service for local dev
    - justfile: Added clickhouse, clickhouse-cli, clickhouse-traces commands

    Feature disabled by default (LETTA_STORE_LLM_RAW_TRACES=false).
    Uses ZSTD(3) compression for 10-30x reduction on JSON payloads.

    🤖 Generated with [Letta Code](https://letta.com)

    Co-Authored-By: Letta <noreply@letta.com>

* fix: address code review feedback for LLM raw traces

Fixes based on code review feedback:

1. Fix ClickHouse endpoint parsing - default to secure=False for raw host:port
   inputs (was defaulting to HTTPS which breaks local dev)

2. Make raw trace writes truly fire-and-forget - use asyncio.create_task()
   instead of awaiting, so JSON serialization doesn't block request path

3. Add bounded queue (maxsize=10000) - prevents unbounded memory growth
   under load. Drops traces with warning if queue is full.

4. Fix deprecated asyncio usage - get_running_loop() instead of get_event_loop()

5. Add org_id fallback - use _telemetry_org_id if actor doesn't have it

6. Remove unused imports - json import in reader
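
Items 2 and 3 above can be sketched roughly as follows. This is a minimal illustration and not the writer's actual code: `fire_and_forget`, `try_enqueue`, `demo`, and the tiny `maxsize` are hypothetical names and values chosen for the example.

```python
import asyncio
import logging

logger = logging.getLogger("trace_queue_sketch")

# Keep references to in-flight tasks so they are not garbage-collected early.
_background_tasks: set = set()


def fire_and_forget(coro) -> asyncio.Task:
    """Schedule a coroutine on the running loop without awaiting it."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task


def try_enqueue(queue: asyncio.Queue, trace: dict) -> bool:
    """Buffer a trace without blocking; drop it with a warning if the queue is full."""
    try:
        queue.put_nowait(trace)
        return True
    except asyncio.QueueFull:
        logger.warning("Trace queue full; dropping trace")
        return False


async def demo(maxsize: int = 2, n_traces: int = 3) -> tuple:
    """Enqueue more traces than the queue can hold and report what happened."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)

    async def write(trace: dict) -> bool:
        return try_enqueue(queue, trace)

    tasks = [fire_and_forget(write({"n": i})) for i in range(n_traces)]
    # A real request handler would return here; the demo gathers only to inspect results.
    results = await asyncio.gather(*tasks)
    return queue.qsize(), list(results)
```

Running `asyncio.run(demo())` fills the two-slot queue and drops the third trace, which is the trade-off the bounded queue makes: memory stays capped at the cost of losing traces under load.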

🤖 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* fix: add missing asyncio import and simplify JSON serialization

- Add missing 'import asyncio' that was causing 'name asyncio is not defined' error
- Remove unnecessary clean_double_escapes() function - the JSON is stored correctly,
  the clickhouse-client CLI was just adding extra escaping when displaying
- Update just clickhouse-trace to use Python client for correct JSON output

🤖 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* test: add clickhouse raw trace integration test

* test: simplify clickhouse trace assertions

* refactor: centralize usage parsing and stream error traces

Use per-client usage helpers for raw trace extraction and ensure streaming errors log requests with error metadata.

👾 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* test: exercise provider usage parsing live

Make live OpenAI/Anthropic/Gemini requests with credential gating and validate Anthropic cache usage mapping when present.

👾 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* test: fix usage parsing tests to pass

- Use GoogleAIClient with GEMINI_API_KEY instead of GoogleVertexClient
- Update model to gemini-2.0-flash (1.5-flash deprecated in v1beta)
- Add tools=[] for Gemini/Anthropic build_request_data

👾 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* refactor: extract_usage_statistics returns LettaUsageStatistics

Standardize on LettaUsageStatistics as the canonical usage format returned by client helpers. Inline UsageStatistics construction for ChatCompletionResponse where needed.

👾 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* feat: add is_byok and llm_config_json columns to ClickHouse traces

Extend llm_raw_traces table with:
- is_byok (UInt8): Track BYOK vs base provider usage for billing analytics
- llm_config_json (String, ZSTD): Store full LLM config for debugging and analysis

This enables queries like:
- BYOK usage breakdown by provider/model
- Config parameter analysis (temperature, max_tokens, etc.)
- Debugging specific request configurations

* feat: add tests for error traces, llm_config_json, and cache tokens

- Update llm_raw_trace_reader.py to query new columns (is_byok,
  cached_input_tokens, cache_write_tokens, reasoning_tokens, llm_config_json)
- Add test_error_trace_stored_in_clickhouse to verify error fields
- Add test_cache_tokens_stored_for_anthropic to verify cache token storage
- Update existing tests to verify llm_config_json is stored correctly
- Make llm_config required in log_provider_trace_async()
- Simplify provider extraction to use provider_name directly

🐾 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* ci: add ClickHouse integration tests to CI pipeline

- Add use-clickhouse option to reusable-test-workflow.yml
- Add ClickHouse service container with otel database
- Add schema initialization step using clickhouse-init.sql
- Add ClickHouse env vars (CLICKHOUSE_ENDPOINT, etc.)
- Add separate clickhouse-integration-tests job running
  integration_test_clickhouse_llm_raw_traces.py

🐾 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* refactor: simplify provider and org_id extraction in raw trace writer

- Use model_endpoint_type.value for provider (not provider_name)
- Simplify org_id to just self.actor.organization_id (actor is always pydantic)

🐾 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* refactor: simplify LLMRawTraceWriter with _enabled flag

- Check ClickHouse env vars once at init, set _enabled flag
- Early return in write_async/flush_async if not enabled
- Remove ValueError raises (never used)
- Simplify _get_client (no validation needed since already checked)

🐾 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* fix: add LLMRawTraceWriter shutdown to FastAPI lifespan

Properly flush pending traces on graceful shutdown via lifespan
instead of relying only on atexit handler.

🐾 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* feat: add agent_tags column to ClickHouse traces

Store agent tags as Array(String) for filtering/analytics by tag.

🐾 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* cleanup

* fix(ci): fix ClickHouse schema initialization in CI

- Create database separately before loading SQL file
- Remove CREATE DATABASE from SQL file (handled in CI step)
- Add verification step to confirm table was created
- Use -sf flag for curl to fail on HTTP errors

🐾 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* refactor: simplify LLM trace writer with ClickHouse async_insert

- Use ClickHouse async_insert for server-side batching instead of manual queue/flush loop
- Sync cloud DDL schema with clickhouse-init.sql (add missing columns)
- Remove redundant llm_raw_traces_local.ddl
- Remove unused batch_size/flush_interval settings
- Update tests for simplified writer

Key changes:
- async_insert=1, wait_for_async_insert=1 for reliable server-side batching
- Simple per-trace retry with exponential backoff (max 3 retries)
- ~150 lines removed from writer
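
The per-trace retry with exponential backoff could look something like the sketch below. It is an assumption-laden illustration: `insert_with_retry`, `make_flaky`, and `base_delay_s` are hypothetical, and "max 3 retries" is read here as one initial attempt plus up to three retries. The callable stands in for whatever performs the ClickHouse insert.

```python
import asyncio
from typing import Awaitable, Callable


async def insert_with_retry(
    insert_once: Callable[[], Awaitable[None]],
    max_retries: int = 3,
    base_delay_s: float = 0.5,
) -> int:
    """Call insert_once, retrying failures with exponentially growing delays.

    Returns the number of attempts used. Re-raises the last error if every
    attempt fails.
    """
    total_attempts = max_retries + 1  # assumption: initial try plus retries
    for attempt in range(1, total_attempts + 1):
        try:
            await insert_once()
            return attempt
        except Exception:
            if attempt == total_attempts:
                raise
            # Delay doubles each time: base, 2*base, 4*base, ...
            await asyncio.sleep(base_delay_s * 2 ** (attempt - 1))
    raise RuntimeError("max_retries must be >= 0")


def make_flaky(failures: int):
    """Build a fake insert that fails `failures` times before succeeding."""
    state = {"calls": 0}

    async def insert_once() -> None:
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ConnectionError("simulated transient failure")

    return insert_once, state
```

With `make_flaky(2)` the insert succeeds on the third attempt; a fake that always fails is tried four times before the error propagates.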

🤖 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* refactor: consolidate ClickHouse direct writes into TelemetryManager backend

- Add clickhouse_direct backend to provider_trace_backends
- Remove duplicate ClickHouse write logic from llm_client_base.py
- Configure via LETTA_TELEMETRY_PROVIDER_TRACE_BACKEND=postgres,clickhouse_direct

The clickhouse_direct backend:
- Converts ProviderTrace to LLMRawTrace
- Extracts usage stats from response JSON
- Writes via LLMRawTraceWriter with async_insert

🤖 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* refactor: address PR review comments and fix llm_config bug

Review comment fixes:
- Rename clickhouse_direct -> clickhouse_analytics (clearer purpose)
- Remove ClickHouse from OSS compose.yaml, create separate compose.clickhouse.yaml
- Delete redundant scripts/test_llm_raw_traces.py (use pytest tests)
- Remove unused llm_raw_traces_ttl_days setting (TTL handled in DDL)
- Fix socket description leak in telemetry_manager docstring
- Add cloud-only comment to clickhouse-init.sql
- Update justfile to use separate compose file

Bug fix:
- Fix llm_config not being passed to ProviderTrace in telemetry
- Now correctly populates provider, model, is_byok for all LLM calls
- Affects both request_async_with_telemetry and log_provider_trace_async

DDL optimizations:
- Add secondary indexes (bloom_filter for agent_id, model, step_id)
- Add minmax indexes for is_byok, is_error
- Change model and error_type to LowCardinality for faster GROUP BY

🤖 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* refactor: rename llm_raw_traces -> llm_traces

Address review feedback that "raw" is misleading since we denormalize fields.

Renames:
- Table: llm_raw_traces -> llm_traces
- Schema: LLMRawTrace -> LLMTrace
- Files: llm_raw_trace_{reader,writer}.py -> llm_trace_{reader,writer}.py
- Setting: store_llm_raw_traces -> store_llm_traces

🤖 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* fix: update workflow references to llm_traces

Missed renaming table name in CI workflow files.

🤖 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* fix: update clickhouse_direct -> clickhouse_analytics in docstring

🤖 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* chore: remove inaccurate OTEL size limit comments

The 4MB limit is our own truncation logic, not an OTEL protocol limit.
The real benefit is denormalized columns for analytics queries.

🤖 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* chore: remove local ClickHouse dev setup (cloud-only feature)

- Delete clickhouse-init.sql and compose.clickhouse.yaml
- Remove local clickhouse just commands
- Update CI to use cloud DDL with MergeTree for testing

clickhouse_analytics is a cloud-only feature. For local dev, use postgres backend.

🤖 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* fix: restore compose.yaml to match main

🤖 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* refactor: merge clickhouse_analytics into clickhouse backend

Per review feedback - having two separate backends was confusing.

Now the clickhouse backend:
- Writes to llm_traces table (denormalized for cost analytics)
- Reads from OTEL traces table (will cut over to llm_traces later)

Config: LETTA_TELEMETRY_PROVIDER_TRACE_BACKEND=postgres,clickhouse

🤖 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* fix: correct path to DDL file in CI workflow

🤖 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* chore: add provider index to DDL for faster filtering

🤖 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* fix: configure telemetry backend in clickhouse tests

Tests need to set telemetry_settings.provider_trace_backends to include
'clickhouse', otherwise traces are routed to default postgres backend.

🤖 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* fix: set provider_trace_backend field, not property

provider_trace_backends is a computed property, need to set the
underlying provider_trace_backend string field instead.
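
The field-versus-property distinction can be illustrated with a small stand-in class. `TelemetrySettingsSketch` is hypothetical, and the comma-splitting logic is an assumption based on the comma-separated config value shown elsewhere in this commit message; the real settings class may differ.

```python
class TelemetrySettingsSketch:
    """Hypothetical stand-in for the telemetry settings object."""

    def __init__(self, provider_trace_backend: str = "postgres") -> None:
        # Underlying string field, e.g. "postgres,clickhouse"
        self.provider_trace_backend = provider_trace_backend

    @property
    def provider_trace_backends(self) -> list:
        """Computed view of the string field; read-only because it has no setter."""
        return [b.strip() for b in self.provider_trace_backend.split(",") if b.strip()]


def enable_clickhouse(settings_obj: TelemetrySettingsSketch) -> list:
    """Configure backends the way a test would: by writing the string field."""
    settings_obj.provider_trace_backend = "postgres,clickhouse"
    return settings_obj.provider_trace_backends
```

Assigning to `provider_trace_backends` on this sketch raises `AttributeError`, while assigning to `provider_trace_backend` changes what the property returns.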

🤖 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* fix: error trace test and error_type extraction

- Add TelemetryManager to error trace test so traces get written
- Fix error_type extraction to check top-level before nested error dict
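
A rough sketch of the "top-level before nested" lookup order is shown below. The key names (`error_type`, `error`, `type`) and the function name are assumptions for illustration; the actual response shape is not shown in this commit message.

```python
from typing import Any, Dict, Optional


def extract_error_type(response: Dict[str, Any]) -> Optional[str]:
    """Return the error type, preferring a top-level key over a nested error dict."""
    # 1. Top-level key wins when present and non-empty (hypothetical key name).
    top_level = response.get("error_type")
    if top_level:
        return str(top_level)

    # 2. Otherwise fall back to a nested error object (hypothetical shape).
    nested = response.get("error")
    if isinstance(nested, dict) and nested.get("type"):
        return str(nested["type"])

    return None
```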

🤖 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* fix: use provider_trace.id for trace correlation across backends

- Pass provider_trace.id to LLMTrace instead of auto-generating
- Log warning if ID is missing (shouldn't happen, helps debug)
- Fallback to new UUID only if not set

🤖 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* fix: trace ID correlation and concurrency issues

- Strip "provider_trace-" prefix from ID for UUID storage in ClickHouse
- Add asyncio.Lock to serialize writes (clickhouse_connect not thread-safe)
- Fix Anthropic prompt_tokens to include cached tokens for cost analytics
- Log warning if provider_trace.id is missing
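
The ID handling and write serialization described above might be sketched like this. The names `to_clickhouse_uuid`, `SerializedWriter`, and `demo` are hypothetical, and the `asyncio.sleep(0)` call is only a stand-in for the real insert.

```python
import asyncio
import logging
import uuid
from typing import Optional

logger = logging.getLogger("trace_id_sketch")

PREFIX = "provider_trace-"


def to_clickhouse_uuid(provider_trace_id: Optional[str]) -> str:
    """Strip the prefix so the remainder can be stored as a UUID.

    Falls back to a fresh UUID (with a warning) only when no ID was provided.
    """
    if not provider_trace_id:
        logger.warning("provider_trace.id is missing; generating a new UUID")
        return str(uuid.uuid4())
    if provider_trace_id.startswith(PREFIX):
        return provider_trace_id[len(PREFIX):]
    return provider_trace_id


class SerializedWriter:
    """Allow only one write at a time by guarding it with an asyncio.Lock."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self.active = 0
        self.max_active = 0

    async def write(self, row: dict) -> None:
        async with self._lock:
            self.active += 1
            self.max_active = max(self.max_active, self.active)
            await asyncio.sleep(0)  # stand-in for the actual insert call
            self.active -= 1


async def demo(n_writes: int = 5) -> int:
    """Issue concurrent writes and report the peak number running at once."""
    writer = SerializedWriter()
    await asyncio.gather(*(writer.write({"n": i}) for i in range(n_writes)))
    return writer.max_active
```

Even with five concurrent callers, `demo()` reports a peak of one active write, which is the property the lock is meant to guarantee for a client that is not safe to share.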

🤖 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

---------

Co-authored-by: Letta <noreply@letta.com>
Co-authored-by: Caren Thomas <carenthomas@gmail.com>
2026-02-24 10:52:06 -08:00


"""ClickHouse reader for LLM analytics traces.
Reads LLM traces from ClickHouse for debugging, analytics, and auditing.
"""

from __future__ import annotations

import asyncio
from dataclasses import dataclass
from datetime import datetime
from typing import Any, List, Optional
from urllib.parse import urlparse

from letta.helpers.singleton import singleton
from letta.log import get_logger
from letta.schemas.llm_trace import LLMTrace
from letta.settings import settings

logger = get_logger(__name__)


def _parse_clickhouse_endpoint(endpoint: str) -> tuple[str, int, bool]:
    """Return (host, port, secure) for clickhouse_connect.get_client.

    Supports:
    - http://host:port -> (host, port, False)
    - https://host:port -> (host, port, True)
    - host:port -> (host, port, False)  # Default to insecure for local dev
    - host -> (host, 8123, False)  # Default HTTP port, insecure
    """
    parsed = urlparse(endpoint)
    if parsed.scheme in ("http", "https"):
        host = parsed.hostname or ""
        port = parsed.port or (8443 if parsed.scheme == "https" else 8123)
        secure = parsed.scheme == "https"
        return host, port, secure

    # Fallback: accept raw hostname (possibly with :port)
    # Default to insecure (HTTP) for local development
    if ":" in endpoint:
        host, port_str = endpoint.rsplit(":", 1)
        return host, int(port_str), False
    return endpoint, 8123, False


@dataclass(frozen=True)
class LLMTraceRow:
    """Raw row from ClickHouse query."""

    id: str
    organization_id: str
    project_id: str
    agent_id: str
    agent_tags: List[str]
    run_id: str
    step_id: str
    trace_id: str
    call_type: str
    provider: str
    model: str
    is_byok: bool
    request_size_bytes: int
    response_size_bytes: int
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
    cached_input_tokens: Optional[int]
    cache_write_tokens: Optional[int]
    reasoning_tokens: Optional[int]
    latency_ms: int
    is_error: bool
    error_type: str
    error_message: str
    request_json: str
    response_json: str
    llm_config_json: str
    created_at: datetime


@singleton
class LLMTraceReader:
    """
    ClickHouse reader for LLM traces.

    Provides query methods for debugging, analytics, and auditing.

    Usage:
        reader = LLMTraceReader()
        trace = await reader.get_by_step_id_async(step_id="step-xxx", organization_id="org-xxx")
        traces = await reader.list_by_agent_async(agent_id="agent-xxx", organization_id="org-xxx")
    """

    def __init__(self):
        self._client = None

    def _get_client(self):
        """Initialize ClickHouse client on first use (lazy loading)."""
        if self._client is not None:
            return self._client

        # Imported lazily so the module loads even when clickhouse_connect is not installed
        import clickhouse_connect

        if not settings.clickhouse_endpoint:
            raise ValueError("CLICKHOUSE_ENDPOINT is required")

        host, port, secure = _parse_clickhouse_endpoint(settings.clickhouse_endpoint)
        if not host:
            raise ValueError("Invalid CLICKHOUSE_ENDPOINT")

        database = settings.clickhouse_database or "otel"
        username = settings.clickhouse_username or "default"
        password = settings.clickhouse_password
        if not password:
            raise ValueError("CLICKHOUSE_PASSWORD is required")

        self._client = clickhouse_connect.get_client(
            host=host,
            port=port,
            username=username,
            password=password,
            database=database,
            secure=secure,
            verify=True,
        )
        return self._client

    def _row_to_trace(self, row: tuple) -> LLMTrace:
        """Convert a ClickHouse row tuple to LLMTrace."""
        return LLMTrace(
            id=row[0],
            organization_id=row[1],
            project_id=row[2] or None,
            agent_id=row[3] or None,
            agent_tags=list(row[4]) if row[4] else [],
            run_id=row[5] or None,
            step_id=row[6] or None,
            trace_id=row[7] or None,
            call_type=row[8],
            provider=row[9],
            model=row[10],
            is_byok=bool(row[11]),
            request_size_bytes=row[12],
            response_size_bytes=row[13],
            prompt_tokens=row[14],
            completion_tokens=row[15],
            total_tokens=row[16],
            cached_input_tokens=row[17],
            cache_write_tokens=row[18],
            reasoning_tokens=row[19],
            latency_ms=row[20],
            is_error=bool(row[21]),
            error_type=row[22] or None,
            error_message=row[23] or None,
            request_json=row[24],
            response_json=row[25],
            llm_config_json=row[26] or "",
            created_at=row[27],
        )

    def _query_sync(self, query: str, parameters: dict[str, Any]) -> List[tuple]:
        """Execute a query synchronously."""
        client = self._get_client()
        result = client.query(query, parameters=parameters)
        return result.result_rows if result else []

    # -------------------------------------------------------------------------
    # Query Methods
    # -------------------------------------------------------------------------

    async def get_by_step_id_async(
        self,
        step_id: str,
        organization_id: str,
    ) -> Optional[LLMTrace]:
        """
        Get the most recent trace for a step.

        Args:
            step_id: The step ID to look up
            organization_id: Organization ID for access control

        Returns:
            LLMTrace if found, None otherwise
        """
        query = """
            SELECT
                id, organization_id, project_id, agent_id, agent_tags, run_id, step_id, trace_id,
                call_type, provider, model, is_byok,
                request_size_bytes, response_size_bytes,
                prompt_tokens, completion_tokens, total_tokens,
                cached_input_tokens, cache_write_tokens, reasoning_tokens,
                latency_ms,
                is_error, error_type, error_message,
                request_json, response_json, llm_config_json,
                created_at
            FROM llm_traces
            WHERE step_id = %(step_id)s
              AND organization_id = %(organization_id)s
            ORDER BY created_at DESC
            LIMIT 1
        """
        # The ClickHouse client is blocking, so run the query in a worker thread
        rows = await asyncio.to_thread(
            self._query_sync,
            query,
            {"step_id": step_id, "organization_id": organization_id},
        )
        if not rows:
            return None
        return self._row_to_trace(rows[0])

    async def get_by_id_async(
        self,
        trace_id: str,
        organization_id: str,
    ) -> Optional[LLMTrace]:
        """
        Get a trace by its ID.

        Args:
            trace_id: The trace's primary ID (UUID); matched against the `id` column,
                not the `trace_id` column
            organization_id: Organization ID for access control

        Returns:
            LLMTrace if found, None otherwise
        """
        query = """
            SELECT
                id, organization_id, project_id, agent_id, agent_tags, run_id, step_id, trace_id,
                call_type, provider, model, is_byok,
                request_size_bytes, response_size_bytes,
                prompt_tokens, completion_tokens, total_tokens,
                cached_input_tokens, cache_write_tokens, reasoning_tokens,
                latency_ms,
                is_error, error_type, error_message,
                request_json, response_json, llm_config_json,
                created_at
            FROM llm_traces
            WHERE id = %(trace_id)s
              AND organization_id = %(organization_id)s
            LIMIT 1
        """
        rows = await asyncio.to_thread(
            self._query_sync,
            query,
            {"trace_id": trace_id, "organization_id": organization_id},
        )
        if not rows:
            return None
        return self._row_to_trace(rows[0])

    async def list_by_agent_async(
        self,
        agent_id: str,
        organization_id: str,
        limit: int = 100,
        offset: int = 0,
        call_type: Optional[str] = None,
        is_error: Optional[bool] = None,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
    ) -> List[LLMTrace]:
        """
        List traces for an agent with optional filters.

        Args:
            agent_id: Agent ID to filter by
            organization_id: Organization ID for access control
            limit: Maximum number of results (default 100)
            offset: Offset for pagination
            call_type: Filter by call type ('agent_step', 'summarization')
            is_error: Filter by error status
            start_date: Filter by created_at >= start_date
            end_date: Filter by created_at <= end_date

        Returns:
            List of LLMTrace objects
        """
        # Only fixed condition strings are joined into the SQL; values go through bound parameters
        conditions = [
            "agent_id = %(agent_id)s",
            "organization_id = %(organization_id)s",
        ]
        params: dict[str, Any] = {
            "agent_id": agent_id,
            "organization_id": organization_id,
            "limit": limit,
            "offset": offset,
        }

        if call_type:
            conditions.append("call_type = %(call_type)s")
            params["call_type"] = call_type
        if is_error is not None:
            conditions.append("is_error = %(is_error)s")
            params["is_error"] = 1 if is_error else 0
        if start_date:
            conditions.append("created_at >= %(start_date)s")
            params["start_date"] = start_date
        if end_date:
            conditions.append("created_at <= %(end_date)s")
            params["end_date"] = end_date

        where_clause = " AND ".join(conditions)
        query = f"""
            SELECT
                id, organization_id, project_id, agent_id, agent_tags, run_id, step_id, trace_id,
                call_type, provider, model, is_byok,
                request_size_bytes, response_size_bytes,
                prompt_tokens, completion_tokens, total_tokens,
                cached_input_tokens, cache_write_tokens, reasoning_tokens,
                latency_ms,
                is_error, error_type, error_message,
                request_json, response_json, llm_config_json,
                created_at
            FROM llm_traces
            WHERE {where_clause}
            ORDER BY created_at DESC
            LIMIT %(limit)s OFFSET %(offset)s
        """
        rows = await asyncio.to_thread(self._query_sync, query, params)
        return [self._row_to_trace(row) for row in rows]

    async def get_usage_stats_async(
        self,
        organization_id: str,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        group_by: str = "model",  # 'model', 'agent_id', 'call_type', 'provider'
    ) -> List[dict[str, Any]]:
        """
        Get aggregated usage statistics.

        Args:
            organization_id: Organization ID for access control
            start_date: Filter by created_at >= start_date
            end_date: Filter by created_at <= end_date
            group_by: Field to group by ('model', 'agent_id', 'call_type', 'provider')

        Returns:
            List of aggregated stats dicts

        Raises:
            ValueError: If group_by is not one of the allowed fields
        """
        # group_by is interpolated into the SQL text, so restrict it to an allowlist
        valid_group_by = {"model", "agent_id", "call_type", "provider"}
        if group_by not in valid_group_by:
            raise ValueError(f"group_by must be one of {valid_group_by}")

        conditions = ["organization_id = %(organization_id)s"]
        params: dict[str, Any] = {"organization_id": organization_id}

        if start_date:
            conditions.append("created_at >= %(start_date)s")
            params["start_date"] = start_date
        if end_date:
            conditions.append("created_at <= %(end_date)s")
            params["end_date"] = end_date

        where_clause = " AND ".join(conditions)
        query = f"""
            SELECT
                {group_by},
                count() as request_count,
                sum(total_tokens) as total_tokens,
                sum(prompt_tokens) as prompt_tokens,
                sum(completion_tokens) as completion_tokens,
                avg(latency_ms) as avg_latency_ms,
                sum(request_size_bytes) as total_request_bytes,
                sum(response_size_bytes) as total_response_bytes,
                countIf(is_error = 1) as error_count
            FROM llm_traces
            WHERE {where_clause}
            GROUP BY {group_by}
            ORDER BY total_tokens DESC
        """
        rows = await asyncio.to_thread(self._query_sync, query, params)
        return [
            {
                group_by: row[0],
                "request_count": row[1],
                "total_tokens": row[2],
                "prompt_tokens": row[3],
                "completion_tokens": row[4],
                "avg_latency_ms": row[5],
                "total_request_bytes": row[6],
                "total_response_bytes": row[7],
                "error_count": row[8],
            }
            for row in rows
        ]

    async def find_large_requests_async(
        self,
        organization_id: str,
        min_size_bytes: int = 1_000_000,  # 1MB default
        limit: int = 100,
    ) -> List[LLMTrace]:
        """
        Find traces with large request payloads (for debugging).

        Args:
            organization_id: Organization ID for access control
            min_size_bytes: Minimum request size in bytes (default 1MB)
            limit: Maximum number of results

        Returns:
            List of LLMTrace objects with large requests
        """
        query = """
            SELECT
                id, organization_id, project_id, agent_id, agent_tags, run_id, step_id, trace_id,
                call_type, provider, model, is_byok,
                request_size_bytes, response_size_bytes,
                prompt_tokens, completion_tokens, total_tokens,
                cached_input_tokens, cache_write_tokens, reasoning_tokens,
                latency_ms,
                is_error, error_type, error_message,
                request_json, response_json, llm_config_json,
                created_at
            FROM llm_traces
            WHERE organization_id = %(organization_id)s
              AND request_size_bytes >= %(min_size_bytes)s
            ORDER BY request_size_bytes DESC
            LIMIT %(limit)s
        """
        rows = await asyncio.to_thread(
            self._query_sync,
            query,
            {
                "organization_id": organization_id,
                "min_size_bytes": min_size_bytes,
                "limit": limit,
            },
        )
        return [self._row_to_trace(row) for row in rows]


# Module-level instance for easy access
_reader_instance: Optional[LLMTraceReader] = None


def get_llm_trace_reader() -> LLMTraceReader:
    """Get the singleton LLMTraceReader instance."""
    global _reader_instance
    if _reader_instance is None:
        _reader_instance = LLMTraceReader()
    return _reader_instance