diff --git a/.skills/llm-provider-usage-statistics/SKILL.md b/.skills/llm-provider-usage-statistics/SKILL.md new file mode 100644 index 00000000..ff06467c --- /dev/null +++ b/.skills/llm-provider-usage-statistics/SKILL.md @@ -0,0 +1,43 @@ +--- +name: llm-provider-usage-statistics +description: Reference guide for token counting and prefix caching across LLM providers (OpenAI, Anthropic, Gemini). Use when debugging token counts or optimizing prefix caching. +--- + +# LLM Provider Usage Statistics + +Reference documentation for how different LLM providers report token usage. + +## Quick Reference: Token Counting Semantics + +| Provider | `input_tokens` meaning | Cache tokens | Must add cache to get total? | +|----------|------------------------|--------------|------------------------------| +| OpenAI | TOTAL (includes cached) | `cached_tokens` is subset | No | +| Anthropic | NON-cached only | `cache_read_input_tokens` + `cache_creation_input_tokens` | **Yes** | +| Gemini | TOTAL (includes cached) | `cached_content_token_count` is subset | No | + +**Critical difference:** Anthropic's `input_tokens` excludes cached tokens, so you must add them: +``` +total_input = input_tokens + cache_read_input_tokens + cache_creation_input_tokens +``` + +## Quick Reference: Prefix Caching + +| Provider | Min tokens | How to enable | TTL | +|----------|-----------|---------------|-----| +| OpenAI | 1,024 | Automatic | ~5-10 min | +| Anthropic | 1,024 | Requires `cache_control` breakpoints | 5 min | +| Gemini 2.0+ | 1,024 | Automatic (implicit) | Variable | + +## Quick Reference: Reasoning/Thinking Tokens + +| Provider | Field name | Models | +|----------|-----------|--------| +| OpenAI | `reasoning_tokens` | o1, o3 models | +| Anthropic | N/A | (thinking is in content blocks, not usage) | +| Gemini | `thoughts_token_count` | Gemini 2.0 with thinking enabled | + +## Provider Reference Files + +- **OpenAI:** [references/openai.md](references/openai.md) - Chat Completions vs Responses 
API, reasoning models, cached_tokens +- **Anthropic:** [references/anthropic.md](references/anthropic.md) - cache_control setup, beta headers, cache token fields +- **Gemini:** [references/gemini.md](references/gemini.md) - implicit caching, thinking tokens, usage_metadata fields diff --git a/.skills/llm-provider-usage-statistics/references/anthropic.md b/.skills/llm-provider-usage-statistics/references/anthropic.md new file mode 100644 index 00000000..7dfdd6fb --- /dev/null +++ b/.skills/llm-provider-usage-statistics/references/anthropic.md @@ -0,0 +1,83 @@ +# Anthropic Usage Statistics + +## Response Format + +``` +response.usage.input_tokens # NON-cached input tokens only +response.usage.output_tokens # Output tokens +response.usage.cache_read_input_tokens # Tokens read from cache +response.usage.cache_creation_input_tokens # Tokens written to cache +``` + +## Critical: Token Calculation + +**Anthropic's `input_tokens` is NOT the total.** To get total input tokens: + +```python +total_input = input_tokens + cache_read_input_tokens + cache_creation_input_tokens +``` + +This is different from OpenAI/Gemini where `prompt_tokens` is already the total. 
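The cross-provider difference above can be sketched as a small normalizer. This helper is hypothetical (not part of the Letta codebase); the field names are the providers' actual usage keys:

```python
def total_input_tokens(usage: dict, provider: str) -> int:
    """Return the true total input token count for a response's usage block.

    Anthropic's `input_tokens` excludes cached tokens, so cache reads and
    writes must be added back; OpenAI and Gemini already report the total.
    """
    if provider == "anthropic":
        return (
            (usage.get("input_tokens") or 0)
            + (usage.get("cache_read_input_tokens") or 0)
            + (usage.get("cache_creation_input_tokens") or 0)
        )
    # OpenAI reports `prompt_tokens`, Gemini `prompt_token_count`; both are totals,
    # and their cache fields are subsets that must NOT be added on top.
    return usage.get("prompt_tokens") or usage.get("prompt_token_count") or 0
```

Treating Anthropic like OpenAI here silently under-counts input by the entire cached prefix, which is usually most of the prompt.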
+
+## Prefix Caching (Prompt Caching)
+
+**Requirements:**
+- Minimum 1,024 tokens for Claude 3.5 Sonnet and Claude 3 Opus
+- Minimum 2,048 tokens for Claude 3 Haiku / 3.5 Haiku
+- Requires explicit `cache_control` breakpoints in messages
+- TTL: 5 minutes
+
+**How to enable:**
+Add `cache_control` to message content:
+```python
+{
+    "role": "user",
+    "content": [
+        {
+            "type": "text",
+            "text": "...",
+            "cache_control": {"type": "ephemeral"}
+        }
+    ]
+}
+```
+
+**Beta header** (required only on older API versions; prompt caching is now generally available):
+```python
+betas = ["prompt-caching-2024-07-31"]
+```
+
+## Cache Behavior
+
+- `cache_creation_input_tokens`: Tokens that were cached on this request (cache write)
+- `cache_read_input_tokens`: Tokens that were read from existing cache (cache hit)
+- On first request: expect `cache_creation_input_tokens > 0`
+- On subsequent requests with same prefix: expect `cache_read_input_tokens > 0`
+
+## Streaming
+
+In streaming mode, usage is reported in two events:
+
+1. **`message_start`**: Initial usage (may have cache info)
+   ```python
+   event.message.usage.input_tokens
+   event.message.usage.output_tokens
+   event.message.usage.cache_read_input_tokens
+   event.message.usage.cache_creation_input_tokens
+   ```
+
+2. **`message_delta`**: Cumulative output tokens
+   ```python
+   event.usage.output_tokens  # This is CUMULATIVE, not incremental
+   ```
+
+**Important:** Per Anthropic docs, `message_delta` token counts are cumulative, so assign (don't accumulate).
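The assign-don't-accumulate rule can be sketched as a small fold over the stream. The event dicts below are simplified stand-ins for the SDK's typed stream events:

```python
def track_stream_usage(events) -> dict:
    """Fold Anthropic stream events into a usage dict.

    `message_delta` reports a cumulative output token count, so each
    delta OVERWRITES the running value instead of being added to it.
    """
    usage = {"input_tokens": 0, "output_tokens": 0}
    for event in events:
        if event["type"] == "message_start":
            usage["input_tokens"] = event["usage"].get("input_tokens", 0)
        elif event["type"] == "message_delta":
            usage["output_tokens"] = event["usage"]["output_tokens"]  # assign, not +=
    return usage
```

Accumulating with `+=` instead would roughly double-count output tokens on every delta.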
+ +## Letta Implementation + +- **Client:** `letta/llm_api/anthropic_client.py` +- **Streaming interfaces:** + - `letta/interfaces/anthropic_streaming_interface.py` + - `letta/interfaces/anthropic_parallel_tool_call_streaming_interface.py` (tracks cache tokens) +- **Extract method:** `AnthropicClient.extract_usage_statistics()` +- **Cache control:** `_add_cache_control_to_system_message()`, `_add_cache_control_to_messages()` diff --git a/.skills/llm-provider-usage-statistics/references/gemini.md b/.skills/llm-provider-usage-statistics/references/gemini.md new file mode 100644 index 00000000..2df708f5 --- /dev/null +++ b/.skills/llm-provider-usage-statistics/references/gemini.md @@ -0,0 +1,81 @@ +# Gemini Usage Statistics + +## Response Format + +Gemini returns usage in `usage_metadata`: + +``` +response.usage_metadata.prompt_token_count # Total input tokens +response.usage_metadata.candidates_token_count # Output tokens +response.usage_metadata.total_token_count # Sum +response.usage_metadata.cached_content_token_count # Tokens from cache (optional) +response.usage_metadata.thoughts_token_count # Reasoning tokens (optional) +``` + +## Token Counting + +- `prompt_token_count` is the TOTAL (includes cached) +- `cached_content_token_count` is a subset (when present) +- Similar to OpenAI's semantics + +## Implicit Caching (Gemini 2.0+) + +**Requirements:** +- Minimum 1,024 tokens +- Automatic (no opt-in required) +- Available on Gemini 2.0 Flash and later models + +**Behavior:** +- Caching is probabilistic and server-side +- `cached_content_token_count` may or may not be present +- When present, indicates tokens that were served from cache + +**Note:** Unlike Anthropic, Gemini doesn't have explicit cache_control. Caching is implicit and managed by Google's infrastructure. 
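Because `prompt_token_count` already includes the cached portion, cache savings are computed by subtraction, never addition. A sketch (the helper is hypothetical; the field names match `usage_metadata`):

```python
def gemini_cache_breakdown(prompt_token_count: int, cached_content_token_count=None) -> dict:
    """Split a Gemini prompt count into cached vs freshly-processed tokens."""
    cached = cached_content_token_count or 0  # field may be absent → treat as 0
    return {
        "total_input": prompt_token_count,  # already includes the cached subset
        "cached": cached,
        "non_cached": prompt_token_count - cached,
    }
```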
+ +## Reasoning/Thinking Tokens + +For models with extended thinking (like Gemini 2.0 with thinking enabled): +- `thoughts_token_count` reports tokens used for reasoning +- These are similar to OpenAI's `reasoning_tokens` + +**Enabling thinking:** +```python +generation_config = { + "thinking_config": { + "thinking_budget": 1024 # Max thinking tokens + } +} +``` + +## Streaming + +In streaming mode: +- `usage_metadata` is typically in the **final chunk** +- Same fields as non-streaming +- May not be present in intermediate chunks + +**Important:** `stream_async()` returns an async generator (not awaitable): +```python +# Correct: +stream = client.stream_async(request_data, llm_config) +async for chunk in stream: + ... + +# Incorrect (will error): +stream = await client.stream_async(...) # TypeError! +``` + +## APIs + +Gemini has two APIs: +- **Google AI (google_ai):** Uses `google.genai` SDK +- **Vertex AI (google_vertex):** Uses same SDK with different auth + +Both share the same response format. 
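Since the cache and thinking fields are optional, extraction is typically defensive. A sketch against a duck-typed `usage_metadata` object (attribute names are from the SDK; the helper itself is hypothetical and keeps `None` for unreported fields, matching the "None means not reported" convention):

```python
def extract_gemini_usage(usage_metadata) -> dict:
    """Normalize Gemini usage_metadata into a flat dict of token counts."""
    return {
        "prompt_tokens": getattr(usage_metadata, "prompt_token_count", 0) or 0,
        "completion_tokens": getattr(usage_metadata, "candidates_token_count", 0) or 0,
        "total_tokens": getattr(usage_metadata, "total_token_count", 0) or 0,
        # Optional fields: preserve None so callers can distinguish
        # "not reported" from "reported as 0".
        "cached_input_tokens": getattr(usage_metadata, "cached_content_token_count", None),
        "reasoning_tokens": getattr(usage_metadata, "thoughts_token_count", None),
    }
```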
+ +## Letta Implementation + +- **Client:** `letta/llm_api/google_vertex_client.py` (handles both google_ai and google_vertex) +- **Streaming interface:** `letta/interfaces/gemini_streaming_interface.py` +- **Extract method:** `GoogleVertexClient.extract_usage_statistics()` +- Response is a `GenerateContentResponse` object with `.usage_metadata` attribute diff --git a/.skills/llm-provider-usage-statistics/references/openai.md b/.skills/llm-provider-usage-statistics/references/openai.md new file mode 100644 index 00000000..25913791 --- /dev/null +++ b/.skills/llm-provider-usage-statistics/references/openai.md @@ -0,0 +1,61 @@ +# OpenAI Usage Statistics + +## APIs and Response Formats + +OpenAI has two APIs with different response structures: + +### Chat Completions API +``` +response.usage.prompt_tokens # Total input tokens (includes cached) +response.usage.completion_tokens # Output tokens +response.usage.total_tokens # Sum +response.usage.prompt_tokens_details.cached_tokens # Subset that was cached +response.usage.completion_tokens_details.reasoning_tokens # For o1/o3 models +``` + +### Responses API (newer) +``` +response.usage.input_tokens # Total input tokens +response.usage.output_tokens # Output tokens +response.usage.total_tokens # Sum +response.usage.input_tokens_details.cached_tokens # Subset that was cached +response.usage.output_tokens_details.reasoning_tokens # For reasoning models +``` + +## Prefix Caching + +**Requirements:** +- Minimum 1,024 tokens in the prefix +- Automatic (no opt-in required) +- Cached in 128-token increments +- TTL: approximately 5-10 minutes of inactivity + +**Supported models:** GPT-4o, GPT-4o-mini, o1, o1-mini, o3-mini + +**Cache behavior:** +- `cached_tokens` will be a multiple of 128 +- Cache hit means those tokens were not re-processed +- Cost: cached tokens are cheaper than non-cached + +## Reasoning Models (o1, o3) + +For reasoning models, additional tokens are used for "thinking": +- `reasoning_tokens` in 
`completion_tokens_details` +- These are output tokens used for internal reasoning +- Not visible in the response content + +## Streaming + +In streaming mode, usage is reported in the **final chunk** when `stream_options.include_usage=True`: +```python +request_data["stream_options"] = {"include_usage": True} +``` + +The final chunk will have `chunk.usage` with the same structure as non-streaming. + +## Letta Implementation + +- **Client:** `letta/llm_api/openai_client.py` +- **Streaming interface:** `letta/interfaces/openai_streaming_interface.py` +- **Extract method:** `OpenAIClient.extract_usage_statistics()` +- Uses OpenAI SDK's pydantic models (`ChatCompletion`) for type-safe parsing diff --git a/letta/adapters/letta_llm_stream_adapter.py b/letta/adapters/letta_llm_stream_adapter.py index 46659618..5a37aebe 100644 --- a/letta/adapters/letta_llm_stream_adapter.py +++ b/letta/adapters/letta_llm_stream_adapter.py @@ -116,64 +116,9 @@ class LettaLLMStreamAdapter(LettaLLMAdapter): # Extract reasoning content from the interface self.reasoning_content = self.interface.get_reasoning_content() - # Extract usage statistics - # Some providers don't provide usage in streaming, use fallback if needed - if hasattr(self.interface, "input_tokens") and hasattr(self.interface, "output_tokens"): - # Handle cases where tokens might not be set (e.g., LMStudio) - input_tokens = self.interface.input_tokens - output_tokens = self.interface.output_tokens - - # Fallback to estimated values if not provided - if not input_tokens and hasattr(self.interface, "fallback_input_tokens"): - input_tokens = self.interface.fallback_input_tokens - if not output_tokens and hasattr(self.interface, "fallback_output_tokens"): - output_tokens = self.interface.fallback_output_tokens - - # Extract cache token data (OpenAI/Gemini use cached_tokens, Anthropic uses cache_read_tokens) - # None means provider didn't report, 0 means provider reported 0 - cached_input_tokens = None - if hasattr(self.interface, 
"cached_tokens") and self.interface.cached_tokens is not None: - cached_input_tokens = self.interface.cached_tokens - elif hasattr(self.interface, "cache_read_tokens") and self.interface.cache_read_tokens is not None: - cached_input_tokens = self.interface.cache_read_tokens - - # Extract cache write tokens (Anthropic only) - cache_write_tokens = None - if hasattr(self.interface, "cache_creation_tokens") and self.interface.cache_creation_tokens is not None: - cache_write_tokens = self.interface.cache_creation_tokens - - # Extract reasoning tokens (OpenAI o1/o3 models use reasoning_tokens, Gemini uses thinking_tokens) - reasoning_tokens = None - if hasattr(self.interface, "reasoning_tokens") and self.interface.reasoning_tokens is not None: - reasoning_tokens = self.interface.reasoning_tokens - elif hasattr(self.interface, "thinking_tokens") and self.interface.thinking_tokens is not None: - reasoning_tokens = self.interface.thinking_tokens - - # Calculate actual total input tokens - # - # ANTHROPIC: input_tokens is NON-cached only, must add cache tokens - # Total = input_tokens + cache_read_input_tokens + cache_creation_input_tokens - # - # OPENAI/GEMINI: input_tokens is already TOTAL - # cached_tokens is a subset, NOT additive - is_anthropic = hasattr(self.interface, "cache_read_tokens") or hasattr(self.interface, "cache_creation_tokens") - if is_anthropic: - actual_input_tokens = (input_tokens or 0) + (cached_input_tokens or 0) + (cache_write_tokens or 0) - else: - actual_input_tokens = input_tokens or 0 - - self.usage = LettaUsageStatistics( - step_count=1, - completion_tokens=output_tokens or 0, - prompt_tokens=actual_input_tokens, - total_tokens=actual_input_tokens + (output_tokens or 0), - cached_input_tokens=cached_input_tokens, - cache_write_tokens=cache_write_tokens, - reasoning_tokens=reasoning_tokens, - ) - else: - # Default usage statistics if not available - self.usage = LettaUsageStatistics(step_count=1, completion_tokens=0, prompt_tokens=0, 
total_tokens=0) + # Extract usage statistics from the streaming interface + self.usage = self.interface.get_usage_statistics() + self.usage.step_count = 1 # Store any additional data from the interface self.message_id = self.interface.letta_message_id diff --git a/letta/adapters/simple_llm_stream_adapter.py b/letta/adapters/simple_llm_stream_adapter.py index ef1a6c26..3d033e88 100644 --- a/letta/adapters/simple_llm_stream_adapter.py +++ b/letta/adapters/simple_llm_stream_adapter.py @@ -14,7 +14,6 @@ from letta.schemas.enums import ProviderType from letta.schemas.letta_message import LettaMessage from letta.schemas.letta_message_content import LettaMessageContentUnion from letta.schemas.provider_trace import ProviderTrace -from letta.schemas.usage import LettaUsageStatistics from letta.schemas.user import User from letta.server.rest_api.streaming_response import get_cancellation_event_for_run from letta.settings import settings @@ -164,68 +163,10 @@ class SimpleLLMStreamAdapter(LettaLLMStreamAdapter): # Extract all content parts self.content: List[LettaMessageContentUnion] = self.interface.get_content() - # Extract usage statistics - # Some providers don't provide usage in streaming, use fallback if needed - if hasattr(self.interface, "input_tokens") and hasattr(self.interface, "output_tokens"): - # Handle cases where tokens might not be set (e.g., LMStudio) - input_tokens = self.interface.input_tokens - output_tokens = self.interface.output_tokens - - # Fallback to estimated values if not provided - if not input_tokens and hasattr(self.interface, "fallback_input_tokens"): - input_tokens = self.interface.fallback_input_tokens - if not output_tokens and hasattr(self.interface, "fallback_output_tokens"): - output_tokens = self.interface.fallback_output_tokens - - # Extract cache token data (OpenAI/Gemini use cached_tokens) - # None means provider didn't report, 0 means provider reported 0 - cached_input_tokens = None - if hasattr(self.interface, "cached_tokens") and 
self.interface.cached_tokens is not None: - cached_input_tokens = self.interface.cached_tokens - # Anthropic uses cache_read_tokens for cache hits - elif hasattr(self.interface, "cache_read_tokens") and self.interface.cache_read_tokens is not None: - cached_input_tokens = self.interface.cache_read_tokens - - # Extract cache write tokens (Anthropic only) - # None means provider didn't report, 0 means provider reported 0 - cache_write_tokens = None - if hasattr(self.interface, "cache_creation_tokens") and self.interface.cache_creation_tokens is not None: - cache_write_tokens = self.interface.cache_creation_tokens - - # Extract reasoning tokens (OpenAI o1/o3 models use reasoning_tokens, Gemini uses thinking_tokens) - # None means provider didn't report, 0 means provider reported 0 - reasoning_tokens = None - if hasattr(self.interface, "reasoning_tokens") and self.interface.reasoning_tokens is not None: - reasoning_tokens = self.interface.reasoning_tokens - elif hasattr(self.interface, "thinking_tokens") and self.interface.thinking_tokens is not None: - reasoning_tokens = self.interface.thinking_tokens - - # Calculate actual total input tokens for context window limit checks (summarization trigger). - # - # ANTHROPIC: input_tokens is NON-cached only, must add cache tokens - # Total = input_tokens + cache_read_input_tokens + cache_creation_input_tokens - # - # OPENAI/GEMINI: input_tokens (prompt_tokens/prompt_token_count) is already TOTAL - # cached_tokens is a subset, NOT additive - # Total = input_tokens (don't add cached_tokens or it double-counts!) 
- is_anthropic = hasattr(self.interface, "cache_read_tokens") or hasattr(self.interface, "cache_creation_tokens") - if is_anthropic: - actual_input_tokens = (input_tokens or 0) + (cached_input_tokens or 0) + (cache_write_tokens or 0) - else: - actual_input_tokens = input_tokens or 0 - - self.usage = LettaUsageStatistics( - step_count=1, - completion_tokens=output_tokens or 0, - prompt_tokens=actual_input_tokens, - total_tokens=actual_input_tokens + (output_tokens or 0), - cached_input_tokens=cached_input_tokens, - cache_write_tokens=cache_write_tokens, - reasoning_tokens=reasoning_tokens, - ) - else: - # Default usage statistics if not available - self.usage = LettaUsageStatistics(step_count=1, completion_tokens=0, prompt_tokens=0, total_tokens=0) + # Extract usage statistics from the interface + # Each interface implements get_usage_statistics() with provider-specific logic + self.usage = self.interface.get_usage_statistics() + self.usage.step_count = 1 # Store any additional data from the interface self.message_id = self.interface.letta_message_id diff --git a/letta/interfaces/anthropic_parallel_tool_call_streaming_interface.py b/letta/interfaces/anthropic_parallel_tool_call_streaming_interface.py index 0df80176..0c13a727 100644 --- a/letta/interfaces/anthropic_parallel_tool_call_streaming_interface.py +++ b/letta/interfaces/anthropic_parallel_tool_call_streaming_interface.py @@ -146,6 +146,26 @@ class SimpleAnthropicStreamingInterface: return tool_calls[0] return None + def get_usage_statistics(self) -> "LettaUsageStatistics": + """Extract usage statistics from accumulated streaming data. + + Returns: + LettaUsageStatistics with token counts from the stream. 
+ """ + from letta.schemas.usage import LettaUsageStatistics + + # Anthropic: input_tokens is NON-cached only, must add cache tokens for total + actual_input_tokens = (self.input_tokens or 0) + (self.cache_read_tokens or 0) + (self.cache_creation_tokens or 0) + + return LettaUsageStatistics( + prompt_tokens=actual_input_tokens, + completion_tokens=self.output_tokens or 0, + total_tokens=actual_input_tokens + (self.output_tokens or 0), + cached_input_tokens=self.cache_read_tokens if self.cache_read_tokens else None, + cache_write_tokens=self.cache_creation_tokens if self.cache_creation_tokens else None, + reasoning_tokens=None, # Anthropic doesn't report reasoning tokens separately + ) + def get_reasoning_content(self) -> list[TextContent | ReasoningContent | RedactedReasoningContent]: def _process_group( group: list[ReasoningMessage | HiddenReasoningMessage | AssistantMessage], diff --git a/letta/interfaces/anthropic_streaming_interface.py b/letta/interfaces/anthropic_streaming_interface.py index 8c2be4c6..fa1fdefa 100644 --- a/letta/interfaces/anthropic_streaming_interface.py +++ b/letta/interfaces/anthropic_streaming_interface.py @@ -128,6 +128,25 @@ class AnthropicStreamingInterface: arguments = str(json.dumps(tool_input, indent=2)) return ToolCall(id=self.tool_call_id, function=FunctionCall(arguments=arguments, name=self.tool_call_name)) + def get_usage_statistics(self) -> "LettaUsageStatistics": + """Extract usage statistics from accumulated streaming data. + + Returns: + LettaUsageStatistics with token counts from the stream. 
+ """ + from letta.schemas.usage import LettaUsageStatistics + + # Anthropic: input_tokens is NON-cached only in streaming + # This interface doesn't track cache tokens, so we just use the raw values + return LettaUsageStatistics( + prompt_tokens=self.input_tokens or 0, + completion_tokens=self.output_tokens or 0, + total_tokens=(self.input_tokens or 0) + (self.output_tokens or 0), + cached_input_tokens=None, # This interface doesn't track cache tokens + cache_write_tokens=None, + reasoning_tokens=None, + ) + def _check_inner_thoughts_complete(self, combined_args: str) -> bool: """ Check if inner thoughts are complete in the current tool call arguments @@ -637,6 +656,25 @@ class SimpleAnthropicStreamingInterface: arguments = str(json.dumps(tool_input, indent=2)) return ToolCall(id=self.tool_call_id, function=FunctionCall(arguments=arguments, name=self.tool_call_name)) + def get_usage_statistics(self) -> "LettaUsageStatistics": + """Extract usage statistics from accumulated streaming data. + + Returns: + LettaUsageStatistics with token counts from the stream. 
+ """ + from letta.schemas.usage import LettaUsageStatistics + + # Anthropic: input_tokens is NON-cached only in streaming + # This interface doesn't track cache tokens, so we just use the raw values + return LettaUsageStatistics( + prompt_tokens=self.input_tokens or 0, + completion_tokens=self.output_tokens or 0, + total_tokens=(self.input_tokens or 0) + (self.output_tokens or 0), + cached_input_tokens=None, # This interface doesn't track cache tokens + cache_write_tokens=None, + reasoning_tokens=None, + ) + def get_reasoning_content(self) -> list[TextContent | ReasoningContent | RedactedReasoningContent]: def _process_group( group: list[ReasoningMessage | HiddenReasoningMessage | AssistantMessage], diff --git a/letta/interfaces/gemini_streaming_interface.py b/letta/interfaces/gemini_streaming_interface.py index 629da143..9656977c 100644 --- a/letta/interfaces/gemini_streaming_interface.py +++ b/letta/interfaces/gemini_streaming_interface.py @@ -122,6 +122,27 @@ class SimpleGeminiStreamingInterface: """Return all finalized tool calls collected during this message (parallel supported).""" return list(self.collected_tool_calls) + def get_usage_statistics(self) -> "LettaUsageStatistics": + """Extract usage statistics from accumulated streaming data. + + Returns: + LettaUsageStatistics with token counts from the stream. + + Note: + Gemini uses `thinking_tokens` instead of `reasoning_tokens` (OpenAI o1/o3). 
+ """ + from letta.schemas.usage import LettaUsageStatistics + + return LettaUsageStatistics( + prompt_tokens=self.input_tokens or 0, + completion_tokens=self.output_tokens or 0, + total_tokens=(self.input_tokens or 0) + (self.output_tokens or 0), + # Gemini: input_tokens is already total, cached_tokens is a subset (not additive) + cached_input_tokens=self.cached_tokens, + cache_write_tokens=None, # Gemini doesn't report cache write tokens + reasoning_tokens=self.thinking_tokens, # Gemini uses thinking_tokens + ) + async def process( self, stream: AsyncIterator[GenerateContentResponse], diff --git a/letta/interfaces/openai_streaming_interface.py b/letta/interfaces/openai_streaming_interface.py index 7a78d813..ca3602df 100644 --- a/letta/interfaces/openai_streaming_interface.py +++ b/letta/interfaces/openai_streaming_interface.py @@ -194,6 +194,28 @@ class OpenAIStreamingInterface: function=FunctionCall(arguments=self._get_current_function_arguments(), name=function_name), ) + def get_usage_statistics(self) -> "LettaUsageStatistics": + """Extract usage statistics from accumulated streaming data. + + Returns: + LettaUsageStatistics with token counts from the stream. 
+ """ + from letta.schemas.usage import LettaUsageStatistics + + # Use actual tokens if available, otherwise fall back to estimated + input_tokens = self.input_tokens if self.input_tokens else self.fallback_input_tokens + output_tokens = self.output_tokens if self.output_tokens else self.fallback_output_tokens + + return LettaUsageStatistics( + prompt_tokens=input_tokens or 0, + completion_tokens=output_tokens or 0, + total_tokens=(input_tokens or 0) + (output_tokens or 0), + # OpenAI: input_tokens is already total, cached_tokens is a subset (not additive) + cached_input_tokens=None, # This interface doesn't track cache tokens + cache_write_tokens=None, + reasoning_tokens=None, # This interface doesn't track reasoning tokens + ) + async def process( self, stream: AsyncStream[ChatCompletionChunk], @@ -672,6 +694,28 @@ class SimpleOpenAIStreamingInterface: raise ValueError("No tool calls available") return calls[0] + def get_usage_statistics(self) -> "LettaUsageStatistics": + """Extract usage statistics from accumulated streaming data. + + Returns: + LettaUsageStatistics with token counts from the stream. 
+ """ + from letta.schemas.usage import LettaUsageStatistics + + # Use actual tokens if available, otherwise fall back to estimated + input_tokens = self.input_tokens if self.input_tokens else self.fallback_input_tokens + output_tokens = self.output_tokens if self.output_tokens else self.fallback_output_tokens + + return LettaUsageStatistics( + prompt_tokens=input_tokens or 0, + completion_tokens=output_tokens or 0, + total_tokens=(input_tokens or 0) + (output_tokens or 0), + # OpenAI: input_tokens is already total, cached_tokens is a subset (not additive) + cached_input_tokens=self.cached_tokens, + cache_write_tokens=None, # OpenAI doesn't have cache write tokens + reasoning_tokens=self.reasoning_tokens, + ) + async def process( self, stream: AsyncStream[ChatCompletionChunk], @@ -1080,6 +1124,24 @@ class SimpleOpenAIResponsesStreamingInterface: raise ValueError("No tool calls available") return calls[0] + def get_usage_statistics(self) -> "LettaUsageStatistics": + """Extract usage statistics from accumulated streaming data. + + Returns: + LettaUsageStatistics with token counts from the stream. 
+ """ + from letta.schemas.usage import LettaUsageStatistics + + return LettaUsageStatistics( + prompt_tokens=self.input_tokens or 0, + completion_tokens=self.output_tokens or 0, + total_tokens=(self.input_tokens or 0) + (self.output_tokens or 0), + # OpenAI Responses API: input_tokens is already total + cached_input_tokens=self.cached_tokens, + cache_write_tokens=None, # OpenAI doesn't have cache write tokens + reasoning_tokens=self.reasoning_tokens, + ) + async def process( self, stream: AsyncStream[ResponseStreamEvent], diff --git a/letta/llm_api/anthropic_client.py b/letta/llm_api/anthropic_client.py index dc76a248..8acd65dd 100644 --- a/letta/llm_api/anthropic_client.py +++ b/letta/llm_api/anthropic_client.py @@ -48,6 +48,7 @@ from letta.schemas.openai.chat_completion_response import ( UsageStatistics, ) from letta.schemas.response_format import JsonSchemaResponseFormat +from letta.schemas.usage import LettaUsageStatistics from letta.settings import model_settings DUMMY_FIRST_USER_MESSAGE = "User initializing bootup sequence." 
@@ -988,6 +989,35 @@ class AnthropicClient(LLMClientBase): return super().handle_llm_error(e) + def extract_usage_statistics(self, response_data: dict | None, llm_config: LLMConfig) -> LettaUsageStatistics: + """Extract usage statistics from Anthropic response and return as LettaUsageStatistics.""" + if not response_data: + return LettaUsageStatistics() + + response = AnthropicMessage(**response_data) + prompt_tokens = response.usage.input_tokens + completion_tokens = response.usage.output_tokens + + # Extract cache data if available (None means not reported, 0 means reported as 0) + cache_read_tokens = None + cache_creation_tokens = None + if hasattr(response.usage, "cache_read_input_tokens"): + cache_read_tokens = response.usage.cache_read_input_tokens + if hasattr(response.usage, "cache_creation_input_tokens"): + cache_creation_tokens = response.usage.cache_creation_input_tokens + + # Per Anthropic docs: "Total input tokens in a request is the summation of + # input_tokens, cache_creation_input_tokens, and cache_read_input_tokens." 
+ actual_input_tokens = prompt_tokens + (cache_read_tokens or 0) + (cache_creation_tokens or 0) + + return LettaUsageStatistics( + prompt_tokens=actual_input_tokens, + completion_tokens=completion_tokens, + total_tokens=actual_input_tokens + completion_tokens, + cached_input_tokens=cache_read_tokens, + cache_write_tokens=cache_creation_tokens, + ) + # TODO: Input messages doesn't get used here # TODO: Clean up this interface @trace_method @@ -1032,10 +1062,13 @@ class AnthropicClient(LLMClientBase): } """ response = AnthropicMessage(**response_data) - prompt_tokens = response.usage.input_tokens - completion_tokens = response.usage.output_tokens finish_reason = remap_finish_reason(str(response.stop_reason)) + # Extract usage via centralized method + from letta.schemas.enums import ProviderType + + usage_stats = self.extract_usage_statistics(response_data, llm_config).to_usage(ProviderType.anthropic) + content = None reasoning_content = None reasoning_content_signature = None @@ -1100,35 +1133,12 @@ class AnthropicClient(LLMClientBase): ), ) - # Build prompt tokens details with cache data if available - prompt_tokens_details = None - cache_read_tokens = 0 - cache_creation_tokens = 0 - if hasattr(response.usage, "cache_read_input_tokens") or hasattr(response.usage, "cache_creation_input_tokens"): - from letta.schemas.openai.chat_completion_response import UsageStatisticsPromptTokenDetails - - cache_read_tokens = getattr(response.usage, "cache_read_input_tokens", 0) or 0 - cache_creation_tokens = getattr(response.usage, "cache_creation_input_tokens", 0) or 0 - prompt_tokens_details = UsageStatisticsPromptTokenDetails( - cache_read_tokens=cache_read_tokens, - cache_creation_tokens=cache_creation_tokens, - ) - - # Per Anthropic docs: "Total input tokens in a request is the summation of - # input_tokens, cache_creation_input_tokens, and cache_read_input_tokens." 
- actual_input_tokens = prompt_tokens + cache_read_tokens + cache_creation_tokens - chat_completion_response = ChatCompletionResponse( id=response.id, choices=[choice], created=get_utc_time_int(), model=response.model, - usage=UsageStatistics( - prompt_tokens=actual_input_tokens, - completion_tokens=completion_tokens, - total_tokens=actual_input_tokens + completion_tokens, - prompt_tokens_details=prompt_tokens_details, - ), + usage=usage_stats, ) if llm_config.put_inner_thoughts_in_kwargs: chat_completion_response = unpack_all_inner_thoughts_from_kwargs( diff --git a/letta/llm_api/chatgpt_oauth_client.py b/letta/llm_api/chatgpt_oauth_client.py index 7bf991d9..86854c06 100644 --- a/letta/llm_api/chatgpt_oauth_client.py +++ b/letta/llm_api/chatgpt_oauth_client.py @@ -54,6 +54,7 @@ from letta.schemas.openai.chat_completion_response import ( UsageStatistics, ) from letta.schemas.providers.chatgpt_oauth import ChatGPTOAuthCredentials, ChatGPTOAuthProvider +from letta.schemas.usage import LettaUsageStatistics logger = get_logger(__name__) @@ -511,6 +512,25 @@ class ChatGPTOAuthClient(LLMClientBase): # Response should already be in ChatCompletion format after transformation return ChatCompletionResponse(**response_data) + def extract_usage_statistics(self, response_data: dict | None, llm_config: LLMConfig) -> LettaUsageStatistics: + """Extract usage statistics from ChatGPT OAuth response and return as LettaUsageStatistics.""" + if not response_data: + return LettaUsageStatistics() + + usage = response_data.get("usage") + if not usage: + return LettaUsageStatistics() + + prompt_tokens = usage.get("prompt_tokens") or 0 + completion_tokens = usage.get("completion_tokens") or 0 + total_tokens = usage.get("total_tokens") or (prompt_tokens + completion_tokens) + + return LettaUsageStatistics( + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + total_tokens=total_tokens, + ) + @trace_method async def stream_async( self, diff --git 
a/letta/llm_api/google_vertex_client.py b/letta/llm_api/google_vertex_client.py index 0bd13ed0..b5bac794 100644 --- a/letta/llm_api/google_vertex_client.py +++ b/letta/llm_api/google_vertex_client.py @@ -39,6 +39,7 @@ from letta.schemas.llm_config import LLMConfig from letta.schemas.message import Message as PydanticMessage from letta.schemas.openai.chat_completion_request import Tool, Tool as OpenAITool from letta.schemas.openai.chat_completion_response import ChatCompletionResponse, Choice, FunctionCall, Message, ToolCall, UsageStatistics +from letta.schemas.usage import LettaUsageStatistics from letta.settings import model_settings, settings from letta.utils import get_tool_call_id @@ -415,6 +416,34 @@ class GoogleVertexClient(LLMClientBase): return request_data + def extract_usage_statistics(self, response_data: dict | None, llm_config: LLMConfig) -> LettaUsageStatistics: + """Extract usage statistics from Gemini response and return as LettaUsageStatistics.""" + if not response_data: + return LettaUsageStatistics() + + response = GenerateContentResponse(**response_data) + if not response.usage_metadata: + return LettaUsageStatistics() + + cached_tokens = None + if ( + hasattr(response.usage_metadata, "cached_content_token_count") + and response.usage_metadata.cached_content_token_count is not None + ): + cached_tokens = response.usage_metadata.cached_content_token_count + + reasoning_tokens = None + if hasattr(response.usage_metadata, "thoughts_token_count") and response.usage_metadata.thoughts_token_count is not None: + reasoning_tokens = response.usage_metadata.thoughts_token_count + + return LettaUsageStatistics( + prompt_tokens=response.usage_metadata.prompt_token_count or 0, + completion_tokens=response.usage_metadata.candidates_token_count or 0, + total_tokens=response.usage_metadata.total_token_count or 0, + cached_input_tokens=cached_tokens, + reasoning_tokens=reasoning_tokens, + ) + @trace_method async def convert_response_to_chat_completion( self, @@ 
-642,36 +671,10 @@ class GoogleVertexClient(LLMClientBase): # "totalTokenCount": 36 # } if response.usage_metadata: - # Extract cache token data if available (Gemini uses cached_content_token_count) - # Use `is not None` to capture 0 values (meaning "provider reported 0 cached tokens") - prompt_tokens_details = None - if ( - hasattr(response.usage_metadata, "cached_content_token_count") - and response.usage_metadata.cached_content_token_count is not None - ): - from letta.schemas.openai.chat_completion_response import UsageStatisticsPromptTokenDetails + # Extract usage via centralized method + from letta.schemas.enums import ProviderType - prompt_tokens_details = UsageStatisticsPromptTokenDetails( - cached_tokens=response.usage_metadata.cached_content_token_count, - ) - - # Extract thinking/reasoning token data if available (Gemini uses thoughts_token_count) - # Use `is not None` to capture 0 values (meaning "provider reported 0 reasoning tokens") - completion_tokens_details = None - if hasattr(response.usage_metadata, "thoughts_token_count") and response.usage_metadata.thoughts_token_count is not None: - from letta.schemas.openai.chat_completion_response import UsageStatisticsCompletionTokenDetails - - completion_tokens_details = UsageStatisticsCompletionTokenDetails( - reasoning_tokens=response.usage_metadata.thoughts_token_count, - ) - - usage = UsageStatistics( - prompt_tokens=response.usage_metadata.prompt_token_count, - completion_tokens=response.usage_metadata.candidates_token_count, - total_tokens=response.usage_metadata.total_token_count, - prompt_tokens_details=prompt_tokens_details, - completion_tokens_details=completion_tokens_details, - ) + usage = self.extract_usage_statistics(response_data, llm_config).to_usage(ProviderType.google_ai) else: # Count it ourselves using the Gemini token counting API assert input_messages is not None, "Didn't get UsageMetadata from the API response, so input_messages is required" diff --git 
a/letta/llm_api/llm_client_base.py b/letta/llm_api/llm_client_base.py index 3491e093..754a19e8 100644 --- a/letta/llm_api/llm_client_base.py +++ b/letta/llm_api/llm_client_base.py @@ -15,6 +15,7 @@ from letta.schemas.llm_config import LLMConfig from letta.schemas.message import Message from letta.schemas.openai.chat_completion_response import ChatCompletionResponse from letta.schemas.provider_trace import ProviderTrace +from letta.schemas.usage import LettaUsageStatistics from letta.services.telemetry_manager import TelemetryManager from letta.settings import settings @@ -73,6 +74,10 @@ class LLMClientBase: self._telemetry_compaction_settings = compaction_settings self._telemetry_llm_config = llm_config + def extract_usage_statistics(self, response_data: Optional[dict], llm_config: LLMConfig) -> LettaUsageStatistics: + """Provider-specific usage parsing hook (override in subclasses). Returns LettaUsageStatistics.""" + return LettaUsageStatistics() + async def request_async_with_telemetry(self, request_data: dict, llm_config: LLMConfig) -> dict: """Wrapper around request_async that logs telemetry for all requests including errors. 
diff --git a/letta/llm_api/openai_client.py b/letta/llm_api/openai_client.py index 87324451..240078e7 100644 --- a/letta/llm_api/openai_client.py +++ b/letta/llm_api/openai_client.py @@ -60,6 +60,7 @@ from letta.schemas.openai.chat_completion_response import ( ) from letta.schemas.openai.responses_request import ResponsesRequest from letta.schemas.response_format import JsonSchemaResponseFormat +from letta.schemas.usage import LettaUsageStatistics from letta.settings import model_settings logger = get_logger(__name__) @@ -591,6 +592,66 @@ class OpenAIClient(LLMClientBase): def is_reasoning_model(self, llm_config: LLMConfig) -> bool: return is_openai_reasoning_model(llm_config.model) + def extract_usage_statistics(self, response_data: dict | None, llm_config: LLMConfig) -> LettaUsageStatistics: + """Extract usage statistics from OpenAI response and return as LettaUsageStatistics.""" + if not response_data: + return LettaUsageStatistics() + + # Handle Responses API format (used by reasoning models like o1/o3) + if response_data.get("object") == "response": + usage = response_data.get("usage", {}) or {} + prompt_tokens = usage.get("input_tokens") or 0 + completion_tokens = usage.get("output_tokens") or 0 + total_tokens = usage.get("total_tokens") or (prompt_tokens + completion_tokens) + + input_details = usage.get("input_tokens_details", {}) or {} + cached_tokens = input_details.get("cached_tokens") + + output_details = usage.get("output_tokens_details", {}) or {} + reasoning_tokens = output_details.get("reasoning_tokens") + + return LettaUsageStatistics( + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + total_tokens=total_tokens, + cached_input_tokens=cached_tokens, + reasoning_tokens=reasoning_tokens, + ) + + # Handle standard Chat Completions API format using pydantic models + from openai.types.chat import ChatCompletion + + try: + completion = ChatCompletion.model_validate(response_data) + except Exception: + return LettaUsageStatistics() + + 
if not completion.usage: + return LettaUsageStatistics() + + usage = completion.usage + prompt_tokens = usage.prompt_tokens or 0 + completion_tokens = usage.completion_tokens or 0 + total_tokens = usage.total_tokens or (prompt_tokens + completion_tokens) + + # Extract cached tokens from prompt_tokens_details + cached_tokens = None + if usage.prompt_tokens_details: + cached_tokens = usage.prompt_tokens_details.cached_tokens + + # Extract reasoning tokens from completion_tokens_details + reasoning_tokens = None + if usage.completion_tokens_details: + reasoning_tokens = usage.completion_tokens_details.reasoning_tokens + + return LettaUsageStatistics( + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + total_tokens=total_tokens, + cached_input_tokens=cached_tokens, + reasoning_tokens=reasoning_tokens, + ) + @trace_method async def convert_response_to_chat_completion( self, @@ -607,30 +668,10 @@ class OpenAIClient(LLMClientBase): # See example payload in tests/integration_test_send_message_v2.py model = response_data.get("model") - # Extract usage - usage = response_data.get("usage", {}) or {} - prompt_tokens = usage.get("input_tokens") or 0 - completion_tokens = usage.get("output_tokens") or 0 - total_tokens = usage.get("total_tokens") or (prompt_tokens + completion_tokens) + # Extract usage via centralized method + from letta.schemas.enums import ProviderType - # Extract detailed token breakdowns (Responses API uses input_tokens_details/output_tokens_details) - prompt_tokens_details = None - input_details = usage.get("input_tokens_details", {}) or {} - if input_details.get("cached_tokens"): - from letta.schemas.openai.chat_completion_response import UsageStatisticsPromptTokenDetails - - prompt_tokens_details = UsageStatisticsPromptTokenDetails( - cached_tokens=input_details.get("cached_tokens") or 0, - ) - - completion_tokens_details = None - output_details = usage.get("output_tokens_details", {}) or {} - if output_details.get("reasoning_tokens"): 
- from letta.schemas.openai.chat_completion_response import UsageStatisticsCompletionTokenDetails - - completion_tokens_details = UsageStatisticsCompletionTokenDetails( - reasoning_tokens=output_details.get("reasoning_tokens") or 0, - ) + usage_stats = self.extract_usage_statistics(response_data, llm_config).to_usage(ProviderType.openai) # Extract assistant message text from the outputs list outputs = response_data.get("output") or [] @@ -698,13 +739,7 @@ class OpenAIClient(LLMClientBase): choices=[choice], created=int(response_data.get("created_at") or 0), model=model or (llm_config.model if hasattr(llm_config, "model") else None), - usage=UsageStatistics( - prompt_tokens=prompt_tokens, - completion_tokens=completion_tokens, - total_tokens=total_tokens, - prompt_tokens_details=prompt_tokens_details, - completion_tokens_details=completion_tokens_details, - ), + usage=usage_stats, ) return chat_completion_response diff --git a/letta/schemas/usage.py b/letta/schemas/usage.py index da015e77..d2f5191d 100644 --- a/letta/schemas/usage.py +++ b/letta/schemas/usage.py @@ -126,3 +126,53 @@ class LettaUsageStatistics(BaseModel): reasoning_tokens: Optional[int] = Field( None, description="The number of reasoning/thinking tokens generated. None if not reported by provider." ) + + def to_usage(self, provider_type: Optional["ProviderType"] = None) -> "UsageStatistics": + """Convert to UsageStatistics (OpenAI-compatible format). + + Args: + provider_type: ProviderType enum indicating which provider format to use. + Used to determine which cache field to populate. + + Returns: + UsageStatistics object with nested prompt/completion token details. 
+ """ + from letta.schemas.enums import ProviderType + from letta.schemas.openai.chat_completion_response import ( + UsageStatistics, + UsageStatisticsCompletionTokenDetails, + UsageStatisticsPromptTokenDetails, + ) + + # Providers that use Anthropic-style cache fields (cache_read_tokens, cache_creation_tokens) + anthropic_style_providers = {ProviderType.anthropic, ProviderType.bedrock} + + # Build prompt_tokens_details if we have cache data + prompt_tokens_details = None + if self.cached_input_tokens is not None or self.cache_write_tokens is not None: + if provider_type in anthropic_style_providers: + # Anthropic uses cache_read_tokens and cache_creation_tokens + prompt_tokens_details = UsageStatisticsPromptTokenDetails( + cache_read_tokens=self.cached_input_tokens, + cache_creation_tokens=self.cache_write_tokens, + ) + else: + # OpenAI/Gemini use cached_tokens + prompt_tokens_details = UsageStatisticsPromptTokenDetails( + cached_tokens=self.cached_input_tokens, + ) + + # Build completion_tokens_details if we have reasoning tokens + completion_tokens_details = None + if self.reasoning_tokens is not None: + completion_tokens_details = UsageStatisticsCompletionTokenDetails( + reasoning_tokens=self.reasoning_tokens, + ) + + return UsageStatistics( + prompt_tokens=self.prompt_tokens, + completion_tokens=self.completion_tokens, + total_tokens=self.total_tokens, + prompt_tokens_details=prompt_tokens_details, + completion_tokens_details=completion_tokens_details, + ) diff --git a/tests/test_usage_parsing.py b/tests/test_usage_parsing.py new file mode 100644 index 00000000..0b9dc1c2 --- /dev/null +++ b/tests/test_usage_parsing.py @@ -0,0 +1,473 @@ +""" +Tests for usage statistics parsing through the production adapter path. + +These tests verify that SimpleLLMRequestAdapter correctly extracts usage statistics +from LLM responses, including: +1. Basic usage (prompt_tokens, completion_tokens, total_tokens) +2. 
Cache-related fields (cached_input_tokens, cache_write_tokens) +3. Reasoning tokens (for models that support it) + +This tests the actual production code path: + SimpleLLMRequestAdapter.invoke_llm() + → llm_client.request_async_with_telemetry() + → llm_client.convert_response_to_chat_completion() + → adapter extracts from chat_completions_response.usage + → normalize_cache_tokens() / normalize_reasoning_tokens() +""" + +import os + +import pytest + +from letta.adapters.simple_llm_request_adapter import SimpleLLMRequestAdapter +from letta.errors import LLMAuthenticationError +from letta.llm_api.anthropic_client import AnthropicClient +from letta.llm_api.google_ai_client import GoogleAIClient +from letta.llm_api.openai_client import OpenAIClient +from letta.schemas.enums import AgentType, MessageRole +from letta.schemas.letta_message_content import TextContent +from letta.schemas.llm_config import LLMConfig +from letta.schemas.message import Message +from letta.settings import model_settings + + +def _has_openai_credentials() -> bool: + return bool(model_settings.openai_api_key or os.environ.get("OPENAI_API_KEY")) + + +def _has_anthropic_credentials() -> bool: + return bool(model_settings.anthropic_api_key or os.environ.get("ANTHROPIC_API_KEY")) + + +def _has_gemini_credentials() -> bool: + return bool(model_settings.gemini_api_key or os.environ.get("GEMINI_API_KEY")) + + +def _build_simple_messages(user_content: str) -> list[Message]: + """Build a minimal message list for testing.""" + return [ + Message( + role=MessageRole.user, + content=[TextContent(text=user_content)], + ) + ] + + +# Large system prompt to exceed caching thresholds (>1024 tokens) +LARGE_SYSTEM_PROMPT = """You are an advanced AI assistant with extensive knowledge across multiple domains. 
+ +# Core Capabilities + +## Technical Knowledge +- Software Engineering: Expert in Python, JavaScript, TypeScript, Go, Rust, and many other languages +- System Design: Deep understanding of distributed systems, microservices, and cloud architecture +- DevOps: Proficient in Docker, Kubernetes, CI/CD pipelines, and infrastructure as code +- Databases: Experience with SQL (PostgreSQL, MySQL) and NoSQL (MongoDB, Redis, Cassandra) databases +- Machine Learning: Knowledge of neural networks, transformers, and modern ML frameworks + +## Problem Solving Approach +When tackling problems, you follow a structured methodology: +1. Understand the requirements thoroughly +2. Break down complex problems into manageable components +3. Consider multiple solution approaches +4. Evaluate trade-offs between different options +5. Implement solutions with clean, maintainable code +6. Test thoroughly and iterate based on feedback + +## Communication Style +- Clear and concise explanations +- Use examples and analogies when helpful +- Adapt technical depth to the audience +- Ask clarifying questions when requirements are ambiguous +- Provide context and rationale for recommendations + +# Domain Expertise + +## Web Development +You have deep knowledge of: +- Frontend: React, Vue, Angular, Next.js, modern CSS frameworks +- Backend: Node.js, Express, FastAPI, Django, Flask +- API Design: REST, GraphQL, gRPC +- Authentication: OAuth, JWT, session management +- Performance: Caching strategies, CDNs, lazy loading + +## Data Engineering +You understand: +- ETL pipelines and data transformation +- Data warehousing concepts (Snowflake, BigQuery, Redshift) +- Stream processing (Kafka, Kinesis) +- Data modeling and schema design +- Data quality and validation + +## Cloud Platforms +You're familiar with: +- AWS: EC2, S3, Lambda, RDS, DynamoDB, CloudFormation +- GCP: Compute Engine, Cloud Storage, Cloud Functions, BigQuery +- Azure: Virtual Machines, Blob Storage, Azure Functions +- Serverless 
architectures and best practices +- Cost optimization strategies + +## Security +You consider: +- Common vulnerabilities (OWASP Top 10) +- Secure coding practices +- Encryption and key management +- Access control and authorization patterns +- Security audit and compliance requirements + +# Interaction Principles + +## Helpfulness +- Provide actionable guidance +- Share relevant resources and documentation +- Offer multiple approaches when appropriate +- Point out potential pitfalls and edge cases +- Follow up to ensure understanding + +## Accuracy +- Acknowledge limitations and uncertainties +- Distinguish between facts and opinions +- Cite sources when making specific claims +- Correct mistakes promptly when identified +- Stay current with latest developments + +## Respect +- Value diverse perspectives and approaches +- Maintain professional boundaries +- Protect user privacy and confidentiality +- Avoid assumptions about user background +- Be patient with varying skill levels + +Remember: Your goal is to empower users to solve problems and learn, not just to provide answers.""" + + +@pytest.mark.asyncio +async def test_openai_usage_via_adapter(): + """Test OpenAI usage extraction through SimpleLLMRequestAdapter. + + This tests the actual production code path used by letta_agent_v3. 
+ """ + if not _has_openai_credentials(): + pytest.skip("OpenAI credentials not configured") + + client = OpenAIClient() + llm_config = LLMConfig.default_config("gpt-4o-mini") + + adapter = SimpleLLMRequestAdapter( + llm_client=client, + llm_config=llm_config, + ) + + messages = _build_simple_messages("Say hello in exactly 5 words.") + request_data = client.build_request_data(AgentType.letta_v1_agent, messages, llm_config) + + # Call through the adapter (production path) + try: + async for _ in adapter.invoke_llm( + request_data=request_data, + messages=messages, + tools=[], + use_assistant_message=False, + ): + pass + except LLMAuthenticationError: + pytest.skip("OpenAI credentials invalid") + + # Verify usage was extracted + assert adapter.usage is not None, "adapter.usage should not be None" + assert adapter.usage.prompt_tokens > 0, f"prompt_tokens should be > 0, got {adapter.usage.prompt_tokens}" + assert adapter.usage.completion_tokens > 0, f"completion_tokens should be > 0, got {adapter.usage.completion_tokens}" + assert adapter.usage.total_tokens > 0, f"total_tokens should be > 0, got {adapter.usage.total_tokens}" + assert adapter.usage.step_count == 1, f"step_count should be 1, got {adapter.usage.step_count}" + + print(f"OpenAI usage: prompt={adapter.usage.prompt_tokens}, completion={adapter.usage.completion_tokens}") + print(f"OpenAI cache: cached_input={adapter.usage.cached_input_tokens}, cache_write={adapter.usage.cache_write_tokens}") + print(f"OpenAI reasoning: {adapter.usage.reasoning_tokens}") + + +@pytest.mark.asyncio +async def test_anthropic_usage_via_adapter(): + """Test Anthropic usage extraction through SimpleLLMRequestAdapter. + + This tests the actual production code path used by letta_agent_v3. + + Note: Anthropic's input_tokens is NON-cached only. The adapter should + compute total prompt_tokens = input_tokens + cache_read + cache_creation. 
+ """ + if not _has_anthropic_credentials(): + pytest.skip("Anthropic credentials not configured") + + client = AnthropicClient() + llm_config = LLMConfig( + model="claude-3-5-haiku-20241022", + model_endpoint_type="anthropic", + model_endpoint="https://api.anthropic.com/v1", + context_window=200000, + max_tokens=256, + ) + + adapter = SimpleLLMRequestAdapter( + llm_client=client, + llm_config=llm_config, + ) + + # Anthropic requires a system message first + messages = [ + Message(role=MessageRole.system, content=[TextContent(text="You are a helpful assistant.")]), + Message(role=MessageRole.user, content=[TextContent(text="Say hello in exactly 5 words.")]), + ] + request_data = client.build_request_data(AgentType.letta_v1_agent, messages, llm_config, tools=[]) + + # Call through the adapter (production path) + try: + async for _ in adapter.invoke_llm( + request_data=request_data, + messages=messages, + tools=[], + use_assistant_message=False, + ): + pass + except LLMAuthenticationError: + pytest.skip("Anthropic credentials invalid") + + # Verify usage was extracted + assert adapter.usage is not None, "adapter.usage should not be None" + assert adapter.usage.prompt_tokens > 0, f"prompt_tokens should be > 0, got {adapter.usage.prompt_tokens}" + assert adapter.usage.completion_tokens > 0, f"completion_tokens should be > 0, got {adapter.usage.completion_tokens}" + assert adapter.usage.total_tokens > 0, f"total_tokens should be > 0, got {adapter.usage.total_tokens}" + assert adapter.usage.step_count == 1, f"step_count should be 1, got {adapter.usage.step_count}" + + print(f"Anthropic usage: prompt={adapter.usage.prompt_tokens}, completion={adapter.usage.completion_tokens}") + print(f"Anthropic cache: cached_input={adapter.usage.cached_input_tokens}, cache_write={adapter.usage.cache_write_tokens}") + + +@pytest.mark.asyncio +async def test_gemini_usage_via_adapter(): + """Test Gemini usage extraction through SimpleLLMRequestAdapter. 
+ + This tests the actual production code path used by letta_agent_v3. + """ + if not _has_gemini_credentials(): + pytest.skip("Gemini credentials not configured") + + client = GoogleAIClient() + llm_config = LLMConfig( + model="gemini-2.0-flash", + model_endpoint_type="google_ai", + model_endpoint="https://generativelanguage.googleapis.com", + context_window=1048576, + max_tokens=256, + ) + + adapter = SimpleLLMRequestAdapter( + llm_client=client, + llm_config=llm_config, + ) + + messages = _build_simple_messages("Say hello in exactly 5 words.") + request_data = client.build_request_data(AgentType.letta_v1_agent, messages, llm_config, tools=[]) + + # Call through the adapter (production path) + try: + async for _ in adapter.invoke_llm( + request_data=request_data, + messages=messages, + tools=[], + use_assistant_message=False, + ): + pass + except LLMAuthenticationError: + pytest.skip("Gemini credentials invalid") + + # Verify usage was extracted + assert adapter.usage is not None, "adapter.usage should not be None" + assert adapter.usage.prompt_tokens > 0, f"prompt_tokens should be > 0, got {adapter.usage.prompt_tokens}" + assert adapter.usage.completion_tokens > 0, f"completion_tokens should be > 0, got {adapter.usage.completion_tokens}" + assert adapter.usage.total_tokens > 0, f"total_tokens should be > 0, got {adapter.usage.total_tokens}" + assert adapter.usage.step_count == 1, f"step_count should be 1, got {adapter.usage.step_count}" + + print(f"Gemini usage: prompt={adapter.usage.prompt_tokens}, completion={adapter.usage.completion_tokens}") + print(f"Gemini cache: cached_input={adapter.usage.cached_input_tokens}") + print(f"Gemini reasoning: {adapter.usage.reasoning_tokens}") + + +@pytest.mark.asyncio +async def test_openai_prefix_caching_via_adapter(): + """Test OpenAI prefix caching through SimpleLLMRequestAdapter. + + Makes two requests with the same large system prompt to verify + cached_input_tokens is populated on the second request. 
+ + Note: Prefix caching is probabilistic and depends on server-side state. + """ + if not _has_openai_credentials(): + pytest.skip("OpenAI credentials not configured") + + client = OpenAIClient() + llm_config = LLMConfig.default_config("gpt-4o-mini") + + # First request - should populate the cache + adapter1 = SimpleLLMRequestAdapter(llm_client=client, llm_config=llm_config) + messages1 = [ + Message(role=MessageRole.system, content=[TextContent(text=LARGE_SYSTEM_PROMPT)]), + Message(role=MessageRole.user, content=[TextContent(text="What is 2+2?")]), + ] + request_data1 = client.build_request_data(AgentType.letta_v1_agent, messages1, llm_config) + + try: + async for _ in adapter1.invoke_llm(request_data=request_data1, messages=messages1, tools=[], use_assistant_message=False): + pass + except LLMAuthenticationError: + pytest.skip("OpenAI credentials invalid") + + print(f"Request 1 - prompt={adapter1.usage.prompt_tokens}, cached={adapter1.usage.cached_input_tokens}") + + # Second request - same system prompt, should hit cache + adapter2 = SimpleLLMRequestAdapter(llm_client=client, llm_config=llm_config) + messages2 = [ + Message(role=MessageRole.system, content=[TextContent(text=LARGE_SYSTEM_PROMPT)]), + Message(role=MessageRole.user, content=[TextContent(text="What is 3+3?")]), + ] + request_data2 = client.build_request_data(AgentType.letta_v1_agent, messages2, llm_config) + + async for _ in adapter2.invoke_llm(request_data=request_data2, messages=messages2, tools=[], use_assistant_message=False): + pass + + print(f"Request 2 - prompt={adapter2.usage.prompt_tokens}, cached={adapter2.usage.cached_input_tokens}") + + # Verify basic usage + assert adapter2.usage.prompt_tokens > 0 + assert adapter2.usage.completion_tokens > 0 + + # Note: We can't guarantee cache hit, but if it happened, cached_input_tokens should be > 0 + if adapter2.usage.cached_input_tokens and adapter2.usage.cached_input_tokens > 0: + print(f"SUCCESS: OpenAI cache hit! 
cached_input_tokens={adapter2.usage.cached_input_tokens}") + else: + print("INFO: No cache hit (cache may not have been populated yet)") + + +@pytest.mark.asyncio +async def test_anthropic_prefix_caching_via_adapter(): + """Test Anthropic prefix caching through SimpleLLMRequestAdapter. + + Makes two requests with the same large system prompt using cache_control + to verify cache tokens are populated. + + Note: Anthropic requires explicit cache_control breakpoints. + """ + if not _has_anthropic_credentials(): + pytest.skip("Anthropic credentials not configured") + + client = AnthropicClient() + llm_config = LLMConfig( + model="claude-3-5-haiku-20241022", + model_endpoint_type="anthropic", + model_endpoint="https://api.anthropic.com/v1", + context_window=200000, + max_tokens=256, + ) + + # First request + adapter1 = SimpleLLMRequestAdapter(llm_client=client, llm_config=llm_config) + messages1 = [ + Message(role=MessageRole.system, content=[TextContent(text=LARGE_SYSTEM_PROMPT)]), + Message(role=MessageRole.user, content=[TextContent(text="What is 2+2?")]), + ] + request_data1 = client.build_request_data(AgentType.letta_v1_agent, messages1, llm_config, tools=[]) + + try: + async for _ in adapter1.invoke_llm(request_data=request_data1, messages=messages1, tools=[], use_assistant_message=False): + pass + except LLMAuthenticationError: + pytest.skip("Anthropic credentials invalid") + + print( + f"Request 1 - prompt={adapter1.usage.prompt_tokens}, cached={adapter1.usage.cached_input_tokens}, cache_write={adapter1.usage.cache_write_tokens}" + ) + + # Second request + adapter2 = SimpleLLMRequestAdapter(llm_client=client, llm_config=llm_config) + messages2 = [ + Message(role=MessageRole.system, content=[TextContent(text=LARGE_SYSTEM_PROMPT)]), + Message(role=MessageRole.user, content=[TextContent(text="What is 3+3?")]), + ] + request_data2 = client.build_request_data(AgentType.letta_v1_agent, messages2, llm_config, tools=[]) + + async for _ in 
adapter2.invoke_llm(request_data=request_data2, messages=messages2, tools=[], use_assistant_message=False): + pass + + print( + f"Request 2 - prompt={adapter2.usage.prompt_tokens}, cached={adapter2.usage.cached_input_tokens}, cache_write={adapter2.usage.cache_write_tokens}" + ) + + # Verify basic usage + assert adapter2.usage.prompt_tokens > 0 + assert adapter2.usage.completion_tokens > 0 + + # Check for cache activity + if adapter2.usage.cached_input_tokens and adapter2.usage.cached_input_tokens > 0: + print(f"SUCCESS: Anthropic cache hit! cached_input_tokens={adapter2.usage.cached_input_tokens}") + elif adapter2.usage.cache_write_tokens and adapter2.usage.cache_write_tokens > 0: + print(f"INFO: Anthropic cache write! cache_write_tokens={adapter2.usage.cache_write_tokens}") + else: + print("INFO: No cache activity detected") + + +@pytest.mark.asyncio +async def test_gemini_prefix_caching_via_adapter(): + """Test Gemini prefix caching through SimpleLLMRequestAdapter. + + Makes two requests with the same large system prompt to verify + cached_input_tokens is populated. + + Note: Gemini 2.0+ has implicit caching. 
+ """ + if not _has_gemini_credentials(): + pytest.skip("Gemini credentials not configured") + + client = GoogleAIClient() + llm_config = LLMConfig( + model="gemini-2.0-flash", + model_endpoint_type="google_ai", + model_endpoint="https://generativelanguage.googleapis.com", + context_window=1048576, + max_tokens=256, + ) + + # First request + adapter1 = SimpleLLMRequestAdapter(llm_client=client, llm_config=llm_config) + messages1 = [ + Message(role=MessageRole.system, content=[TextContent(text=LARGE_SYSTEM_PROMPT)]), + Message(role=MessageRole.user, content=[TextContent(text="What is 2+2?")]), + ] + request_data1 = client.build_request_data(AgentType.letta_v1_agent, messages1, llm_config, tools=[]) + + try: + async for _ in adapter1.invoke_llm(request_data=request_data1, messages=messages1, tools=[], use_assistant_message=False): + pass + except LLMAuthenticationError: + pytest.skip("Gemini credentials invalid") + + print(f"Request 1 - prompt={adapter1.usage.prompt_tokens}, cached={adapter1.usage.cached_input_tokens}") + + # Second request + adapter2 = SimpleLLMRequestAdapter(llm_client=client, llm_config=llm_config) + messages2 = [ + Message(role=MessageRole.system, content=[TextContent(text=LARGE_SYSTEM_PROMPT)]), + Message(role=MessageRole.user, content=[TextContent(text="What is 3+3?")]), + ] + request_data2 = client.build_request_data(AgentType.letta_v1_agent, messages2, llm_config, tools=[]) + + async for _ in adapter2.invoke_llm(request_data=request_data2, messages=messages2, tools=[], use_assistant_message=False): + pass + + print(f"Request 2 - prompt={adapter2.usage.prompt_tokens}, cached={adapter2.usage.cached_input_tokens}") + + # Verify basic usage + assert adapter2.usage.prompt_tokens > 0 + assert adapter2.usage.completion_tokens > 0 + + if adapter2.usage.cached_input_tokens and adapter2.usage.cached_input_tokens > 0: + print(f"SUCCESS: Gemini cache hit! 
cached_input_tokens={adapter2.usage.cached_input_tokens}") + else: + print("INFO: No cache hit detected")
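
The diff above centralizes provider-specific usage parsing behind `extract_usage_statistics` / `to_usage`. As a standalone illustration of the token arithmetic that centralization relies on — Anthropic's `input_tokens` excludes cached tokens, while OpenAI and Gemini report totals — here is a minimal sketch. The function name and the plain-dict usage payloads are hypothetical stand-ins, not the real Letta classes or provider SDK objects:

```python
# Illustrative sketch of the cross-provider input-token arithmetic the diff
# centralizes. Field names mirror the diff; this is not the real Letta code.

def normalize_input_tokens(provider: str, usage: dict) -> int:
    """Return TOTAL input tokens regardless of provider semantics."""
    if provider == "anthropic":
        # Anthropic's input_tokens is NON-cached only, so cache reads and
        # cache writes must be added back to get the true total.
        return (
            (usage.get("input_tokens") or 0)
            + (usage.get("cache_read_input_tokens") or 0)
            + (usage.get("cache_creation_input_tokens") or 0)
        )
    # OpenAI/Gemini already report the total; cached tokens are a subset
    # (cached_tokens / cached_content_token_count) and must NOT be added.
    return usage.get("prompt_tokens") or usage.get("prompt_token_count") or 0


# Anthropic: 100 non-cached + 1024 read from cache -> 1124 total
assert normalize_input_tokens("anthropic", {"input_tokens": 100, "cache_read_input_tokens": 1024}) == 1124
# OpenAI: prompt_tokens is already the total, even with 1024 cached
assert normalize_input_tokens("openai", {"prompt_tokens": 1124, "cached_tokens": 1024}) == 1124
```

This is the asymmetry the tests above probe: naively summing cache fields on top of `prompt_tokens` would double-count for OpenAI/Gemini, while omitting them would under-count for Anthropic.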