feat: Add TTFT latency from provider in traces (#2481)
This commit is contained in:
@@ -319,6 +319,7 @@ class LettaAgent(BaseAgent):
|
||||
ttft_span = tracer.start_span("time_to_first_token", start_time=request_start_timestamp_ns)
|
||||
ttft_span.set_attributes({f"llm_config.{k}": v for k, v in agent_state.llm_config.model_dump().items() if v is not None})
|
||||
|
||||
provider_request_start_timestamp_ns = None
|
||||
for _ in range(max_steps):
|
||||
step_id = generate_step_id()
|
||||
in_context_messages = await self._rebuild_memory_async(
|
||||
@@ -338,6 +339,12 @@ class LettaAgent(BaseAgent):
|
||||
log_event("agent.stream.llm_request.created") # [2^]
|
||||
|
||||
try:
|
||||
if first_chunk and ttft_span is not None:
|
||||
provider_request_start_timestamp_ns = get_utc_timestamp_ns()
|
||||
provider_req_start_ns = provider_request_start_timestamp_ns - request_start_timestamp_ns
|
||||
ttft_span.add_event(
|
||||
name="provider_req_start_ns", attributes={"provider_req_start_ms": provider_req_start_ns // 1_000_000}
|
||||
)
|
||||
stream = await llm_client.stream_async(request_data, agent_state.llm_config)
|
||||
except Exception as e:
|
||||
raise llm_client.handle_llm_error(e)
|
||||
@@ -358,7 +365,9 @@ class LettaAgent(BaseAgent):
|
||||
else:
|
||||
raise ValueError(f"Streaming not supported for {agent_state.llm_config}")
|
||||
|
||||
async for chunk in interface.process(stream):
|
||||
async for chunk in interface.process(
|
||||
stream, ttft_span=ttft_span, provider_request_start_timestamp_ns=provider_request_start_timestamp_ns
|
||||
):
|
||||
# Measure time to first token
|
||||
if first_chunk and ttft_span is not None:
|
||||
now = get_utc_timestamp_ns()
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
from enum import Enum
|
||||
from typing import AsyncGenerator, List, Union
|
||||
from typing import AsyncGenerator, List, Optional, Union
|
||||
|
||||
from anthropic import AsyncStream
|
||||
from anthropic.types.beta import (
|
||||
@@ -23,6 +23,7 @@ from anthropic.types.beta import (
|
||||
)
|
||||
|
||||
from letta.constants import DEFAULT_MESSAGE_TOOL, DEFAULT_MESSAGE_TOOL_KWARG
|
||||
from letta.helpers.datetime_helpers import get_utc_timestamp_ns
|
||||
from letta.local_llm.constants import INNER_THOUGHTS_KWARG
|
||||
from letta.log import get_logger
|
||||
from letta.schemas.letta_message import (
|
||||
@@ -116,13 +117,26 @@ class AnthropicStreamingInterface:
|
||||
logger.error("Error checking inner thoughts: %s", e)
|
||||
raise
|
||||
|
||||
@trace_method
|
||||
async def process(self, stream: AsyncStream[BetaRawMessageStreamEvent]) -> AsyncGenerator[LettaMessage, None]:
|
||||
async def process(
|
||||
self,
|
||||
stream: AsyncStream[BetaRawMessageStreamEvent],
|
||||
ttft_span: Optional["Span"] = None,
|
||||
provider_request_start_timestamp_ns: Optional[int] = None,
|
||||
) -> AsyncGenerator[LettaMessage, None]:
|
||||
prev_message_type = None
|
||||
message_index = 0
|
||||
first_chunk = True
|
||||
try:
|
||||
async with stream:
|
||||
async for event in stream:
|
||||
if first_chunk and ttft_span is not None and provider_request_start_timestamp_ns is not None:
|
||||
now = get_utc_timestamp_ns()
|
||||
ttft_ns = now - provider_request_start_timestamp_ns
|
||||
ttft_span.add_event(
|
||||
name="anthropic_time_to_first_token_ms", attributes={"anthropic_time_to_first_token_ms": ttft_ns // 1_000_000}
|
||||
)
|
||||
first_chunk = False
|
||||
|
||||
# TODO: Support BetaThinkingBlock, BetaRedactedThinkingBlock
|
||||
if isinstance(event, BetaRawContentBlockStartEvent):
|
||||
content = event.content_block
|
||||
|
||||
@@ -5,6 +5,7 @@ from openai import AsyncStream
|
||||
from openai.types.chat.chat_completion_chunk import ChatCompletionChunk
|
||||
|
||||
from letta.constants import DEFAULT_MESSAGE_TOOL, DEFAULT_MESSAGE_TOOL_KWARG
|
||||
from letta.helpers.datetime_helpers import get_utc_timestamp_ns
|
||||
from letta.schemas.letta_message import AssistantMessage, LettaMessage, ReasoningMessage, ToolCallDelta, ToolCallMessage
|
||||
from letta.schemas.letta_message_content import TextContent
|
||||
from letta.schemas.message import Message
|
||||
@@ -65,16 +66,30 @@ class OpenAIStreamingInterface:
|
||||
function=FunctionCall(arguments=self.current_function_arguments, name=function_name),
|
||||
)
|
||||
|
||||
@trace_method
|
||||
async def process(self, stream: AsyncStream[ChatCompletionChunk]) -> AsyncGenerator[LettaMessage, None]:
|
||||
async def process(
|
||||
self,
|
||||
stream: AsyncStream[ChatCompletionChunk],
|
||||
ttft_span: Optional["Span"] = None,
|
||||
provider_request_start_timestamp_ns: Optional[int] = None,
|
||||
) -> AsyncGenerator[LettaMessage, None]:
|
||||
"""
|
||||
Iterates over the OpenAI stream, yielding SSE events.
|
||||
It also collects tokens and detects if a tool call is triggered.
|
||||
"""
|
||||
first_chunk = True
|
||||
|
||||
async with stream:
|
||||
prev_message_type = None
|
||||
message_index = 0
|
||||
async for chunk in stream:
|
||||
if first_chunk and ttft_span is not None and provider_request_start_timestamp_ns is not None:
|
||||
now = get_utc_timestamp_ns()
|
||||
ttft_ns = now - provider_request_start_timestamp_ns
|
||||
ttft_span.add_event(
|
||||
name="openai_time_to_first_token_ms", attributes={"openai_time_to_first_token_ms": ttft_ns // 1_000_000}
|
||||
)
|
||||
first_chunk = False
|
||||
|
||||
if not self.model or not self.message_id:
|
||||
self.model = chunk.model
|
||||
self.message_id = chunk.id
|
||||
|
||||
Reference in New Issue
Block a user