From dba4cc9ea05e26fd113cc8707b89430500a71196 Mon Sep 17 00:00:00 2001
From: Matthew Zhou
Date: Wed, 28 May 2025 10:06:16 -0700
Subject: [PATCH] feat: Add TTFT latency from provider in traces (#2481)

---
 letta/agents/letta_agent.py                   | 11 +++++++++-
 .../anthropic_streaming_interface.py          | 20 ++++++++++++++++---
 .../interfaces/openai_streaming_interface.py  | 19 ++++++++++++++++--
 3 files changed, 44 insertions(+), 6 deletions(-)

diff --git a/letta/agents/letta_agent.py b/letta/agents/letta_agent.py
index 062e800f..ed60a238 100644
--- a/letta/agents/letta_agent.py
+++ b/letta/agents/letta_agent.py
@@ -319,6 +319,7 @@ class LettaAgent(BaseAgent):
         ttft_span = tracer.start_span("time_to_first_token", start_time=request_start_timestamp_ns)
         ttft_span.set_attributes({f"llm_config.{k}": v for k, v in agent_state.llm_config.model_dump().items() if v is not None})
 
+        provider_request_start_timestamp_ns = None
         for _ in range(max_steps):
             step_id = generate_step_id()
             in_context_messages = await self._rebuild_memory_async(
@@ -338,6 +339,12 @@
             log_event("agent.stream.llm_request.created")  # [2^]
 
             try:
+                if first_chunk and ttft_span is not None:
+                    provider_request_start_timestamp_ns = get_utc_timestamp_ns()
+                    provider_req_start_ns = provider_request_start_timestamp_ns - request_start_timestamp_ns
+                    ttft_span.add_event(
+                        name="provider_req_start_ns", attributes={"provider_req_start_ms": provider_req_start_ns // 1_000_000}
+                    )
                 stream = await llm_client.stream_async(request_data, agent_state.llm_config)
             except Exception as e:
                 raise llm_client.handle_llm_error(e)
@@ -358,7 +365,9 @@
             else:
                 raise ValueError(f"Streaming not supported for {agent_state.llm_config}")
 
-            async for chunk in interface.process(stream):
+            async for chunk in interface.process(
+                stream, ttft_span=ttft_span, provider_request_start_timestamp_ns=provider_request_start_timestamp_ns
+            ):
                 # Measure time to first token
                 if first_chunk and ttft_span is not None:
                     now = get_utc_timestamp_ns()
diff --git a/letta/interfaces/anthropic_streaming_interface.py b/letta/interfaces/anthropic_streaming_interface.py
index 6fb0ae34..70cb249f 100644
--- a/letta/interfaces/anthropic_streaming_interface.py
+++ b/letta/interfaces/anthropic_streaming_interface.py
@@ -1,7 +1,7 @@
 import json
 from datetime import datetime, timezone
 from enum import Enum
-from typing import AsyncGenerator, List, Union
+from typing import AsyncGenerator, List, Optional, Union
 
 from anthropic import AsyncStream
 from anthropic.types.beta import (
@@ -23,6 +23,7 @@
 )
 
 from letta.constants import DEFAULT_MESSAGE_TOOL, DEFAULT_MESSAGE_TOOL_KWARG
+from letta.helpers.datetime_helpers import get_utc_timestamp_ns
 from letta.local_llm.constants import INNER_THOUGHTS_KWARG
 from letta.log import get_logger
 from letta.schemas.letta_message import (
@@ -116,13 +117,26 @@
             logger.error("Error checking inner thoughts: %s", e)
             raise
 
-    @trace_method
-    async def process(self, stream: AsyncStream[BetaRawMessageStreamEvent]) -> AsyncGenerator[LettaMessage, None]:
+    async def process(
+        self,
+        stream: AsyncStream[BetaRawMessageStreamEvent],
+        ttft_span: Optional["Span"] = None,
+        provider_request_start_timestamp_ns: Optional[int] = None,
+    ) -> AsyncGenerator[LettaMessage, None]:
         prev_message_type = None
         message_index = 0
+        first_chunk = True
         try:
             async with stream:
                 async for event in stream:
+                    if first_chunk and ttft_span is not None and provider_request_start_timestamp_ns is not None:
+                        now = get_utc_timestamp_ns()
+                        ttft_ns = now - provider_request_start_timestamp_ns
+                        ttft_span.add_event(
+                            name="anthropic_time_to_first_token_ms", attributes={"anthropic_time_to_first_token_ms": ttft_ns // 1_000_000}
+                        )
+                        first_chunk = False
+
                     # TODO: Support BetaThinkingBlock, BetaRedactedThinkingBlock
                     if isinstance(event, BetaRawContentBlockStartEvent):
                         content = event.content_block
diff --git a/letta/interfaces/openai_streaming_interface.py b/letta/interfaces/openai_streaming_interface.py
index 3edde5ec..13c31087 100644
--- a/letta/interfaces/openai_streaming_interface.py
+++ b/letta/interfaces/openai_streaming_interface.py
@@ -5,6 +5,7 @@ from openai import AsyncStream
 from openai.types.chat.chat_completion_chunk import ChatCompletionChunk
 
 from letta.constants import DEFAULT_MESSAGE_TOOL, DEFAULT_MESSAGE_TOOL_KWARG
+from letta.helpers.datetime_helpers import get_utc_timestamp_ns
 from letta.schemas.letta_message import AssistantMessage, LettaMessage, ReasoningMessage, ToolCallDelta, ToolCallMessage
 from letta.schemas.letta_message_content import TextContent
 from letta.schemas.message import Message
@@ -65,16 +66,30 @@
                 function=FunctionCall(arguments=self.current_function_arguments, name=function_name),
             )
 
-    @trace_method
-    async def process(self, stream: AsyncStream[ChatCompletionChunk]) -> AsyncGenerator[LettaMessage, None]:
+    async def process(
+        self,
+        stream: AsyncStream[ChatCompletionChunk],
+        ttft_span: Optional["Span"] = None,
+        provider_request_start_timestamp_ns: Optional[int] = None,
+    ) -> AsyncGenerator[LettaMessage, None]:
         """
         Iterates over the OpenAI stream, yielding SSE events.
         It also collects tokens and detects if a tool call is triggered.
         """
+        first_chunk = True
+
         async with stream:
             prev_message_type = None
             message_index = 0
             async for chunk in stream:
+                if first_chunk and ttft_span is not None and provider_request_start_timestamp_ns is not None:
+                    now = get_utc_timestamp_ns()
+                    ttft_ns = now - provider_request_start_timestamp_ns
+                    ttft_span.add_event(
+                        name="openai_time_to_first_token_ms", attributes={"openai_time_to_first_token_ms": ttft_ns // 1_000_000}
+                    )
+                    first_chunk = False
+
                 if not self.model or not self.message_id:
                     self.model = chunk.model
                     self.message_id = chunk.id