Files
letta-server/letta/agents/base_agent_v2.py
cthomas 416ffc7cd7 Add billing context to LLM telemetry traces (#9745)
* feat: add billing context to LLM telemetry traces

Add billing metadata (plan type, cost source, customer ID) to LLM traces in ClickHouse for cost analytics and attribution.

**Data Flow:**
- Cloud-API: Extract billing info from subscription in rate limiting, set x-billing-* headers
- Core: Parse headers into BillingContext object via dependencies
- Adapters: Flow billing_context through all LLM adapters (blocking & streaming)
- Agent: Pass billing_context to step() and stream() methods
- ClickHouse: Store in billing_plan_type, billing_cost_source, billing_customer_id columns

**Changes:**
- Add BillingContext schema to provider_trace.py
- Add billing columns to llm_traces ClickHouse table DDL
- Update getCustomerSubscription to fetch stripeCustomerId from organization_billing_details
- Propagate billing_context through agent step flow, adapters, and streaming service
- Update ProviderTrace and LLMTrace to include billing metadata
- Regenerate SDK with autogen

**Production Deployment:**
Requires env vars: LETTA_PROVIDER_TRACE_BACKEND=clickhouse, LETTA_STORE_LLM_TRACES=true, CLICKHOUSE_*

🐾 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta <noreply@letta.com>

* fix: add billing_context parameter to agent step methods

- Add billing_context to BaseAgent and BaseAgentV2 abstract methods
- Update LettaAgent, LettaAgentV2, LettaAgentV3 step methods
- Update multi-agent groups: SleeptimeMultiAgentV2, V3, V4
- Fix test_utils.py to include billing header parameters
- Import BillingContext in all affected files

* fix: add billing_context to stream methods

- Add billing_context parameter to BaseAgentV2.stream()
- Add billing_context parameter to LettaAgentV2.stream()
- LettaAgentV3.stream() already has it from previous commit

* fix: exclude billing headers from OpenAPI spec

Mark billing headers as internal (include_in_schema=False) so they don't appear in the public API.
These are internal headers between cloud-api and core, not part of the public SDK.

Regenerated SDK with stage-api - removes 10,650 lines of bloat that was causing OOM during Next.js build.

* refactor: return billing context from handleUnifiedRateLimiting instead of mutating req

Instead of passing req into handleUnifiedRateLimiting and mutating headers inside it:
- Return billing context fields (billingPlanType, billingCostSource, billingCustomerId) from handleUnifiedRateLimiting
- Set headers in handleMessageRateLimiting (middleware layer) after getting the result
- This fixes step-orchestrator compatibility since it doesn't have a real Express req object

* chore: remove extra gencode

* p

---------

Co-authored-by: Letta <noreply@letta.com>
2026-03-03 18:34:13 -08:00

95 lines
3.8 KiB
Python

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, AsyncGenerator
from letta.constants import DEFAULT_MAX_STEPS
from letta.log import get_logger
from letta.schemas.agent import AgentState
from letta.schemas.enums import MessageStreamStatus
from letta.schemas.letta_message import LegacyLettaMessage, LettaMessage, MessageType
from letta.schemas.letta_response import LettaResponse
from letta.schemas.message import MessageCreate
from letta.schemas.user import User
if TYPE_CHECKING:
from letta.schemas.letta_request import ClientToolSchema
from letta.schemas.provider_trace import BillingContext
class BaseAgentV2(ABC):
"""
Abstract base class for the main agent execution loop for letta agents, handling
message management, llm api request, tool execution, and context tracking.
"""
def __init__(self, agent_state: AgentState, actor: User):
self.agent_state = agent_state
self.actor = actor
self.logger = get_logger(agent_state.id)
@property
def agent_id(self) -> str:
"""Return the agent ID for backward compatibility with code expecting self.agent_id."""
return self.agent_state.id
@abstractmethod
async def build_request(
self,
input_messages: list[MessageCreate],
) -> dict:
"""
Execute the agent loop in dry_run mode, returning just the generated request
payload sent to the underlying llm provider.
"""
raise NotImplementedError
@abstractmethod
async def step(
self,
input_messages: list[MessageCreate],
max_steps: int = DEFAULT_MAX_STEPS,
run_id: str | None = None,
use_assistant_message: bool = True,
include_return_message_types: list[MessageType] | None = None,
request_start_timestamp_ns: int | None = None,
client_tools: list["ClientToolSchema"] | None = None,
include_compaction_messages: bool = False, # Not used in V2, but accepted for API compatibility
billing_context: "BillingContext | None" = None,
) -> LettaResponse:
"""
Execute the agent loop in blocking mode, returning all messages at once.
Args:
client_tools: Optional list of client-side tools. When called, execution pauses
for client to provide tool returns.
include_compaction_messages: Not used in V2, but accepted for API compatibility.
"""
raise NotImplementedError
@abstractmethod
async def stream(
self,
input_messages: list[MessageCreate],
max_steps: int = DEFAULT_MAX_STEPS,
stream_tokens: bool = False,
run_id: str | None = None,
use_assistant_message: bool = True,
include_return_message_types: list[MessageType] | None = None,
request_start_timestamp_ns: int | None = None,
conversation_id: str | None = None,
client_tools: list["ClientToolSchema"] | None = None,
include_compaction_messages: bool = False, # Not used in V2, but accepted for API compatibility
billing_context: "BillingContext | None" = None,
) -> AsyncGenerator[LettaMessage | LegacyLettaMessage | MessageStreamStatus, None]:
"""
Execute the agent loop in streaming mode, yielding chunks as they become available.
If stream_tokens is True, individual tokens are streamed as they arrive from the LLM,
providing the lowest latency experience, otherwise each complete step (reasoning +
tool call + tool return) is yielded as it completes.
Args:
client_tools: Optional list of client-side tools. When called, execution pauses
for client to provide tool returns.
include_compaction_messages: Not used in V2, but accepted for API compatibility.
"""
raise NotImplementedError