From 5165d60881718ea8e7b94e2a5455b67ea85b4d73 Mon Sep 17 00:00:00 2001 From: Kian Jones <11655409+kianjones9@users.noreply.github.com> Date: Tue, 2 Dec 2025 17:17:40 -0800 Subject: [PATCH] feat: add a new span and log the provider request and response data objects (#6492) add a new span and log the provider request and response data objects --- letta/adapters/letta_llm_request_adapter.py | 35 ++++++--- letta/adapters/letta_llm_stream_adapter.py | 66 ++++++++++------- letta/adapters/simple_llm_stream_adapter.py | 80 ++++++++++++--------- 3 files changed, 111 insertions(+), 70 deletions(-) diff --git a/letta/adapters/letta_llm_request_adapter.py b/letta/adapters/letta_llm_request_adapter.py index eb7b606c..e2a3c88c 100644 --- a/letta/adapters/letta_llm_request_adapter.py +++ b/letta/adapters/letta_llm_request_adapter.py @@ -2,6 +2,7 @@ from typing import AsyncGenerator from letta.adapters.letta_llm_adapter import LettaLLMAdapter from letta.helpers.datetime_helpers import get_utc_timestamp_ns +from letta.otel.tracing import log_attributes, log_event, trace_method from letta.schemas.letta_message import LettaMessage from letta.schemas.letta_message_content import OmittedReasoningContent, ReasoningContent, TextContent from letta.schemas.provider_trace import ProviderTraceCreate @@ -86,6 +87,7 @@ class LettaLLMRequestAdapter(LettaLLMAdapter): yield None return + @trace_method def log_provider_trace(self, step_id: str | None, actor: User | None) -> None: """ Log provider trace data for telemetry purposes in a fire-and-forget manner. @@ -97,17 +99,28 @@ class LettaLLMRequestAdapter(LettaLLMAdapter): step_id: The step ID associated with this request for logging purposes actor: The user associated with this request for logging purposes """ - if step_id is None or actor is None or not settings.track_provider_trace: + + if step_id is None or actor is None: return - safe_create_task( - self.telemetry_manager.create_provider_trace_async( - actor=actor, - provider_trace_create=ProviderTraceCreate( - request_json=self.request_data, - response_json=self.response_data, - step_id=step_id, # Use original step_id for telemetry - ), - ), - label="create_provider_trace", + log_attributes( + { + "step_id": step_id, + "actor": actor, + "request_data": self.request_data, + "response_data": self.response_data, + } ) + + if settings.track_provider_trace: + safe_create_task( + self.telemetry_manager.create_provider_trace_async( + actor=actor, + provider_trace_create=ProviderTraceCreate( + request_json=self.request_data, + response_json=self.response_data, + step_id=step_id, # Use original step_id for telemetry + ), + ), + label="create_provider_trace", + ) diff --git a/letta/adapters/letta_llm_stream_adapter.py b/letta/adapters/letta_llm_stream_adapter.py index 1ae25f21..a97ad56b 100644 --- a/letta/adapters/letta_llm_stream_adapter.py +++ b/letta/adapters/letta_llm_stream_adapter.py @@ -5,6 +5,7 @@ from letta.helpers.datetime_helpers import get_utc_timestamp_ns from letta.interfaces.anthropic_streaming_interface import AnthropicStreamingInterface from letta.interfaces.openai_streaming_interface import OpenAIStreamingInterface from letta.llm_api.llm_client_base import LLMClientBase +from letta.otel.tracing import log_attributes, trace_method from letta.schemas.enums import ProviderType from letta.schemas.letta_message import LettaMessage from letta.schemas.llm_config import LLMConfig @@ -139,6 +140,7 @@ class LettaLLMStreamAdapter(LettaLLMAdapter): def supports_token_streaming(self) -> bool: return True + @trace_method def log_provider_trace(self, step_id: str | None, actor: User | None) -> None: """ Log provider trace data for telemetry purposes in a fire-and-forget manner. @@ -151,32 +153,44 @@ class LettaLLMStreamAdapter(LettaLLMAdapter): step_id: The step ID associated with this request for logging purposes actor: The user associated with this request for logging purposes """ - if step_id is None or actor is None or not settings.track_provider_trace: + if step_id is None or actor is None: return - safe_create_task( - self.telemetry_manager.create_provider_trace_async( - actor=actor, - provider_trace_create=ProviderTraceCreate( - request_json=self.request_data, - response_json={ - "content": { - "tool_call": self.tool_call.model_dump_json() if self.tool_call else None, - "reasoning": [content.model_dump_json() for content in self.reasoning_content], - }, - "id": self.interface.message_id, - "model": self.interface.model, - "role": "assistant", - # "stop_reason": "", - # "stop_sequence": None, - "type": "message", - "usage": { - "input_tokens": self.usage.prompt_tokens, - "output_tokens": self.usage.completion_tokens, - }, - }, - step_id=step_id, # Use original step_id for telemetry - ), - ), - label="create_provider_trace", + response_json = { + "content": { + "tool_call": self.tool_call.model_dump_json() if self.tool_call else None, + "reasoning": [content.model_dump_json() for content in self.reasoning_content], + }, + "id": self.interface.message_id, + "model": self.interface.model, + "role": "assistant", + # "stop_reason": "", + # "stop_sequence": None, + "type": "message", + "usage": { + "input_tokens": self.usage.prompt_tokens, + "output_tokens": self.usage.completion_tokens, + }, + } + + log_attributes( + { + "step_id": step_id, + "actor": actor, + "request_data": self.request_data, + "response_data": response_json, + } ) + + if settings.track_provider_trace: + safe_create_task( + self.telemetry_manager.create_provider_trace_async( + actor=actor, + provider_trace_create=ProviderTraceCreate( + request_json=self.request_data, + response_json=response_json, + step_id=step_id, # Use original step_id for telemetry + ), + ), + label="create_provider_trace", + ) diff --git a/letta/adapters/simple_llm_stream_adapter.py b/letta/adapters/simple_llm_stream_adapter.py index b4155fb3..91ce21fa 100644 --- a/letta/adapters/simple_llm_stream_adapter.py +++ b/letta/adapters/simple_llm_stream_adapter.py @@ -5,6 +5,7 @@ from letta.helpers.datetime_helpers import get_utc_timestamp_ns from letta.interfaces.anthropic_parallel_tool_call_streaming_interface import SimpleAnthropicStreamingInterface from letta.interfaces.gemini_streaming_interface import SimpleGeminiStreamingInterface from letta.interfaces.openai_streaming_interface import SimpleOpenAIResponsesStreamingInterface, SimpleOpenAIStreamingInterface +from letta.otel.tracing import log_attributes, trace_method from letta.schemas.enums import ProviderType from letta.schemas.letta_message import LettaMessage from letta.schemas.letta_message_content import LettaMessageContentUnion @@ -214,6 +215,7 @@ class SimpleLLMStreamAdapter(LettaLLMStreamAdapter): # Log request and response data self.log_provider_trace(step_id=step_id, actor=actor) + @trace_method def log_provider_trace(self, step_id: str | None, actor: User | None) -> None: """ Log provider trace data for telemetry purposes in a fire-and-forget manner. @@ -226,39 +228,51 @@ class SimpleLLMStreamAdapter(LettaLLMStreamAdapter): step_id: The step ID associated with this request for logging purposes actor: The user associated with this request for logging purposes """ - if step_id is None or actor is None or not settings.track_provider_trace: + if step_id is None or actor is None: return - safe_create_task( - self.telemetry_manager.create_provider_trace_async( - actor=actor, - provider_trace_create=ProviderTraceCreate( - request_json=self.request_data, - response_json={ - "content": { - "tool_call": self.tool_call.model_dump_json() if self.tool_call else None, - # "reasoning": [content.model_dump_json() for content in self.reasoning_content], - # NOTE: different - # TODO potentially split this into both content and reasoning? - "content": [content.model_dump_json() for content in self.content], - }, - "id": self.interface.message_id, - "model": self.interface.model, - "role": "assistant", - # "stop_reason": "", - # "stop_sequence": None, - "type": "message", - # Use raw_usage if available for transparent provider trace logging, else fallback - "usage": self.interface.raw_usage - if hasattr(self.interface, "raw_usage") and self.interface.raw_usage - else { - "input_tokens": self.usage.prompt_tokens, - "output_tokens": self.usage.completion_tokens, - }, - }, - step_id=step_id, # Use original step_id for telemetry - organization_id=actor.organization_id, - ), - ), - label="create_provider_trace", + response_json = { + "content": { + "tool_call": self.tool_call.model_dump_json() if self.tool_call else None, + # "reasoning": [content.model_dump_json() for content in self.reasoning_content], + # NOTE: different + # TODO potentially split this into both content and reasoning? + "content": [content.model_dump_json() for content in self.content], + }, + "id": self.interface.message_id, + "model": self.interface.model, + "role": "assistant", + # "stop_reason": "", + # "stop_sequence": None, + "type": "message", + # Use raw_usage if available for transparent provider trace logging, else fallback + "usage": self.interface.raw_usage + if hasattr(self.interface, "raw_usage") and self.interface.raw_usage + else { + "input_tokens": self.usage.prompt_tokens, + "output_tokens": self.usage.completion_tokens, + }, + } + + log_attributes( + { + "step_id": step_id, + "actor": actor, + "request_data": self.request_data, + "response_data": response_json, + } ) + + if settings.track_provider_trace: + safe_create_task( + self.telemetry_manager.create_provider_trace_async( + actor=actor, + provider_trace_create=ProviderTraceCreate( + request_json=self.request_data, + response_json=response_json, + step_id=step_id, # Use original step_id for telemetry + organization_id=actor.organization_id, + ), + ), + label="create_provider_trace", + )