From 4af6465226b6a1c19785db5638cd184665c3eb45 Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Sat, 29 Nov 2025 13:45:15 -0800 Subject: [PATCH] feat(core+web): store raw usage data on streams (and visualize properly in ADE) (#6452) * feat(core): store raw usage data on streams * fix(web): various fixes to deal w/ hardcoding against openai --- letta/adapters/simple_llm_stream_adapter.py | 5 ++++- ...c_parallel_tool_call_streaming_interface.py | 10 ++++++++++ letta/interfaces/gemini_streaming_interface.py | 17 +++++++++++++++++ letta/interfaces/openai_streaming_interface.py | 18 ++++++++++++++++++ 4 files changed, 49 insertions(+), 1 deletion(-) diff --git a/letta/adapters/simple_llm_stream_adapter.py b/letta/adapters/simple_llm_stream_adapter.py index 4fb8cc8e..71a34cff 100644 --- a/letta/adapters/simple_llm_stream_adapter.py +++ b/letta/adapters/simple_llm_stream_adapter.py @@ -234,7 +234,10 @@ class SimpleLLMStreamAdapter(LettaLLMStreamAdapter): # "stop_reason": "", # "stop_sequence": None, "type": "message", - "usage": { + # Use raw_usage if available for transparent provider trace logging, else fallback + "usage": self.interface.raw_usage + if hasattr(self.interface, "raw_usage") and self.interface.raw_usage + else { "input_tokens": self.usage.prompt_tokens, "output_tokens": self.usage.completion_tokens, }, diff --git a/letta/interfaces/anthropic_parallel_tool_call_streaming_interface.py b/letta/interfaces/anthropic_parallel_tool_call_streaming_interface.py index 37346cd1..e35d8425 100644 --- a/letta/interfaces/anthropic_parallel_tool_call_streaming_interface.py +++ b/letta/interfaces/anthropic_parallel_tool_call_streaming_interface.py @@ -98,6 +98,9 @@ class SimpleAnthropicStreamingInterface: self.cache_read_tokens = 0 self.cache_creation_tokens = 0 + # Raw usage from provider (for transparent logging in provider trace) + self.raw_usage: dict | None = None + # reasoning object trackers self.reasoning_messages = [] @@ -474,6 +477,13 @@ class SimpleAnthropicStreamingInterface: if hasattr(usage, "cache_creation_input_tokens") and usage.cache_creation_input_tokens: self.cache_creation_tokens += usage.cache_creation_input_tokens + # Store raw usage for transparent provider trace logging + try: + self.raw_usage = usage.model_dump(exclude_none=True) + except Exception as e: + logger.error(f"Failed to capture raw_usage from Anthropic: {e}") + self.raw_usage = None + elif isinstance(event, BetaRawMessageDeltaEvent): self.output_tokens += event.usage.output_tokens diff --git a/letta/interfaces/gemini_streaming_interface.py b/letta/interfaces/gemini_streaming_interface.py index e731cd5c..15b84c50 100644 --- a/letta/interfaces/gemini_streaming_interface.py +++ b/letta/interfaces/gemini_streaming_interface.py @@ -86,6 +86,9 @@ class SimpleGeminiStreamingInterface: # None means "not reported by provider", 0 means "provider reported 0" self.thinking_tokens: int | None = None + # Raw usage from provider (for transparent logging in provider trace) + self.raw_usage: dict | None = None + def get_content(self) -> List[ReasoningContent | TextContent | ToolCallContent]: """This is (unusually) in chunked format, instead of merged""" for content in self.content_parts: @@ -191,6 +194,20 @@ class SimpleGeminiStreamingInterface: # Use `is not None` to capture 0 values (meaning "provider reported 0 reasoning tokens") if hasattr(usage_metadata, "thoughts_token_count") and usage_metadata.thoughts_token_count is not None: self.thinking_tokens = usage_metadata.thoughts_token_count + # Store raw usage for 
transparent provider trace logging + try: + self.raw_usage = ( + usage_metadata.to_json_dict() + if hasattr(usage_metadata, "to_json_dict") + else { + "prompt_token_count": usage_metadata.prompt_token_count, + "candidates_token_count": usage_metadata.candidates_token_count, + "total_token_count": usage_metadata.total_token_count, + } + ) + except Exception as e: + logger.error(f"Failed to capture raw_usage from Gemini: {e}") + self.raw_usage = None if not event.candidates or len(event.candidates) == 0: return diff --git a/letta/interfaces/openai_streaming_interface.py b/letta/interfaces/openai_streaming_interface.py index dc097f1c..06677351 100644 --- a/letta/interfaces/openai_streaming_interface.py +++ b/letta/interfaces/openai_streaming_interface.py @@ -542,6 +542,9 @@ class SimpleOpenAIStreamingInterface: self.cached_tokens: int | None = None self.reasoning_tokens: int | None = None + # Raw usage from provider (for transparent logging in provider trace) + self.raw_usage: dict | None = None + # Fallback token counters (using tiktoken cl200k-base) self.fallback_input_tokens = 0 self.fallback_output_tokens = 0 @@ -707,6 +710,12 @@ class SimpleOpenAIStreamingInterface: if chunk.usage: self.input_tokens += chunk.usage.prompt_tokens self.output_tokens += chunk.usage.completion_tokens + # Store raw usage for transparent provider trace logging + try: + self.raw_usage = chunk.usage.model_dump(exclude_none=True) + except Exception as e: + logger.error(f"Failed to capture raw_usage from OpenAI chat completion chunk: {e}") + self.raw_usage = None # Capture cache token details (OpenAI) # Use `is not None` to capture 0 values (meaning "provider reported 0 cached tokens") if hasattr(chunk.usage, "prompt_tokens_details") and chunk.usage.prompt_tokens_details: @@ -876,6 +885,9 @@ class SimpleOpenAIResponsesStreamingInterface: self.cached_tokens: int | None = None self.reasoning_tokens: int | None = None + # Raw usage from provider (for transparent logging in provider trace) + self.raw_usage: dict | None = None + # -------- Mapping helpers (no broad try/except) -------- def _record_tool_mapping(self, event: object, item: object) -> tuple[str | None, str | None, int | None, str | None]: """Record call_id/name mapping for this tool-call using output_index and item.id if present. @@ -1300,6 +1312,12 @@ class SimpleOpenAIResponsesStreamingInterface: self.input_tokens = event.response.usage.input_tokens self.output_tokens = event.response.usage.output_tokens self.message_id = event.response.id + # Store raw usage for transparent provider trace logging + try: + self.raw_usage = event.response.usage.model_dump(exclude_none=True) + except Exception as e: + logger.error(f"Failed to capture raw_usage from OpenAI Responses API: {e}") + self.raw_usage = None # Capture cache token details (Responses API uses input_tokens_details) # Use `is not None` to capture 0 values (meaning "provider reported 0 cached tokens") if hasattr(event.response.usage, "input_tokens_details") and event.response.usage.input_tokens_details:
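
A note for readers skimming the interface hunks: all three providers follow the same capture pattern, which is easier to see stripped of provider specifics. The sketch below is illustrative only; ExampleStreamingInterface and the module-level logger are stand-ins, not names from this patch.

    import logging

    logger = logging.getLogger(__name__)


    class ExampleStreamingInterface:
        """Illustrative stand-in: normalized counters and the provider's raw usage, side by side."""

        def __init__(self) -> None:
            self.input_tokens = 0
            self.output_tokens = 0
            # None means "provider has not reported usage (or capture failed)"
            self.raw_usage: dict | None = None

        def on_usage(self, usage) -> None:
            # Normalized counters stay authoritative for billing/limits across providers
            self.input_tokens += getattr(usage, "input_tokens", 0) or 0
            self.output_tokens += getattr(usage, "output_tokens", 0) or 0
            # Raw capture is best-effort: a provider SDK change must never break the stream
            try:
                self.raw_usage = usage.model_dump(exclude_none=True)
            except Exception as e:
                logger.error(f"Failed to capture raw_usage: {e}")
                self.raw_usage = None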
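The adapter hunk is the consuming half of the pattern: prefer the raw provider dict when it is present and non-empty, otherwise fall back to the normalized counts. A minimal sketch of that selection as a standalone helper; usage_for_trace is a hypothetical name, not something this patch defines.

    def usage_for_trace(interface: object, prompt_tokens: int, completion_tokens: int) -> dict:
        """Return the usage dict to log in the provider trace.

        getattr(..., None) keeps older interfaces that never grew a raw_usage
        attribute working unchanged, and an empty dict also falls through to
        the normalized counts.
        """
        raw = getattr(interface, "raw_usage", None)
        if raw:
            return raw
        return {
            "input_tokens": prompt_tokens,
            "output_tokens": completion_tokens,
        }

The raw dicts stay provider-shaped: prompt_tokens/completion_tokens from OpenAI chat completion chunks, input_tokens/output_tokens (plus cache fields) from Anthropic, and prompt_token_count/candidates_token_count/total_token_count from the Gemini fallback above.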
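Because the stored dicts keep each provider's own key names, anything rendering them (the web/ADE changes referenced in the second commit bullet, which are not part of this diff) cannot hardcode OpenAI's keys. A hedged sketch of tolerant extraction, written here only to illustrate the shape of the problem rather than the actual web-side code:

    def extract_token_counts(raw_usage: dict) -> tuple[int | None, int | None]:
        """Pull input/output token counts out of a provider-shaped usage dict.

        Tries OpenAI, Anthropic, and Gemini key names in turn; returns None
        for anything the provider did not report.
        """
        input_keys = ("prompt_tokens", "input_tokens", "prompt_token_count")
        output_keys = ("completion_tokens", "output_tokens", "candidates_token_count")
        input_tokens = next((raw_usage[k] for k in input_keys if k in raw_usage), None)
        output_tokens = next((raw_usage[k] for k in output_keys if k in raw_usage), None)
        return input_tokens, output_tokens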