From f2485daef791ba537ae49b60f39d3fdf747940f2 Mon Sep 17 00:00:00 2001
From: cthomas
Date: Fri, 5 Sep 2025 14:28:20 -0700
Subject: [PATCH] feat: add provider trace persistence in llm adapters (#4442)

* feat: add provider trace persistence in llm adapters

* hook up to new agent loop
---
 letta/adapters/letta_llm_adapter.py         | 27 ++++++++++-
 letta/adapters/letta_llm_request_adapter.py | 36 +++++++++++++-
 letta/adapters/letta_llm_stream_adapter.py  | 53 ++++++++++++++++++++-
 letta/agents/letta_agent_v2.py              |  2 +
 4 files changed, 114 insertions(+), 4 deletions(-)

diff --git a/letta/adapters/letta_llm_adapter.py b/letta/adapters/letta_llm_adapter.py
index 8bd4c5b2..022eb9fd 100644
--- a/letta/adapters/letta_llm_adapter.py
+++ b/letta/adapters/letta_llm_adapter.py
@@ -7,6 +7,8 @@ from letta.schemas.letta_message_content import ReasoningContent, RedactedReason
 from letta.schemas.llm_config import LLMConfig
 from letta.schemas.openai.chat_completion_response import ChatCompletionResponse, ToolCall
 from letta.schemas.usage import LettaUsageStatistics
+from letta.schemas.user import User
+from letta.services.telemetry_manager import TelemetryManager
 
 
 class LettaLLMAdapter(ABC):
@@ -18,7 +20,7 @@ class LettaLLMAdapter(ABC):
     through a consistent API.
     """
 
-    def __init__(self, llm_client: LLMClientBase, llm_config: LLMConfig):
+    def __init__(self, llm_client: LLMClientBase, llm_config: LLMConfig) -> None:
         self.llm_client: LLMClientBase = llm_client
         self.llm_config: LLMConfig = llm_config
         self.message_id: str | None = None
@@ -28,6 +30,7 @@ class LettaLLMAdapter(ABC):
         self.reasoning_content: list[TextContent | ReasoningContent | RedactedReasoningContent] | None = None
         self.tool_call: ToolCall | None = None
         self.usage: LettaUsageStatistics = LettaUsageStatistics()
+        self.telemetry_manager: TelemetryManager = TelemetryManager()
 
     @abstractmethod
     async def invoke_llm(
@@ -36,7 +39,9 @@ class LettaLLMAdapter(ABC):
         messages: list,
         tools: list,
         use_assistant_message: bool,
-    ) -> AsyncGenerator[LettaMessage, None]:
+        step_id: str | None = None,
+        actor: User | None = None,
+    ) -> AsyncGenerator[LettaMessage | None, None]:
         """
         Execute the LLM call and yield results as they become available.
 
@@ -45,6 +50,8 @@
             messages: The messages in context for the request
             tools: The tools available for the LLM to use
             use_assistant_message: If true, use assistant messages when streaming response
+            step_id: The step ID associated with this request. If provided, logs request and response data.
+            actor: The optional actor associated with this request for logging purposes.
 
         Yields:
             LettaMessage: Chunks of data for streaming adapters, or None for blocking adapters
@@ -51,5 +58,21 @@
         """
         raise NotImplementedError
 
     def supports_token_streaming(self) -> bool:
+        """
+        Check if the adapter supports token-level streaming.
+
+        Returns:
+            bool: True if the adapter can stream back tokens as they are generated, False otherwise
+        """
         return False
+
+    def log_provider_trace(self, step_id: str | None, actor: User | None) -> None:
+        """
+        Log provider trace data for telemetry purposes.
+
+        Args:
+            step_id: The step ID associated with this request for logging purposes
+            actor: The user associated with this request for logging purposes
+        """
+        raise NotImplementedError
diff --git a/letta/adapters/letta_llm_request_adapter.py b/letta/adapters/letta_llm_request_adapter.py
index b34dac39..3fd0803a 100644
--- a/letta/adapters/letta_llm_request_adapter.py
+++ b/letta/adapters/letta_llm_request_adapter.py
@@ -1,8 +1,12 @@
+import asyncio
 from typing import AsyncGenerator
 
 from letta.adapters.letta_llm_adapter import LettaLLMAdapter
 from letta.schemas.letta_message import LettaMessage
 from letta.schemas.letta_message_content import OmittedReasoningContent, ReasoningContent, TextContent
+from letta.schemas.provider_trace import ProviderTraceCreate
+from letta.schemas.user import User
+from letta.settings import settings
 
 
 class LettaLLMRequestAdapter(LettaLLMAdapter):
@@ -20,7 +24,9 @@
         messages: list,
         tools: list,
         use_assistant_message: bool,
-    ) -> AsyncGenerator[LettaMessage, None]:
+        step_id: str | None = None,
+        actor: User | None = None,
+    ) -> AsyncGenerator[LettaMessage | None, None]:
         """
         Execute a blocking LLM request and yield the response.
 
@@ -70,5 +76,33 @@
         self.usage.prompt_tokens = self.chat_completions_response.usage.prompt_tokens
         self.usage.total_tokens = self.chat_completions_response.usage.total_tokens
 
+        self.log_provider_trace(step_id=step_id, actor=actor)
+
         yield None
         return
+
+    def log_provider_trace(self, step_id: str | None, actor: User | None) -> None:
+        """
+        Log provider trace data for telemetry purposes in a fire-and-forget manner.
+
+        Creates an async task to log the request/response data without blocking
+        the main execution flow. The task runs in the background.
+
+        Args:
+            step_id: The step ID associated with this request for logging purposes
+            actor: The user associated with this request for logging purposes
+        """
+        if step_id is None or actor is None or not settings.track_provider_trace:
+            return
+
+        asyncio.create_task(
+            self.telemetry_manager.create_provider_trace_async(
+                actor=actor,
+                provider_trace_create=ProviderTraceCreate(
+                    request_json=self.request_data,
+                    response_json=self.response_data,
+                    step_id=step_id,  # Use original step_id for telemetry
+                    organization_id=actor.organization_id,
+                ),
+            )
+        )
diff --git a/letta/adapters/letta_llm_stream_adapter.py b/letta/adapters/letta_llm_stream_adapter.py
index 69db429c..d7323aa6 100644
--- a/letta/adapters/letta_llm_stream_adapter.py
+++ b/letta/adapters/letta_llm_stream_adapter.py
@@ -1,3 +1,4 @@
+import asyncio
 from typing import AsyncGenerator
 
 from letta.adapters.letta_llm_adapter import LettaLLMAdapter
@@ -7,7 +8,10 @@ from letta.llm_api.llm_client_base import LLMClientBase
 from letta.schemas.enums import ProviderType
 from letta.schemas.letta_message import LettaMessage
 from letta.schemas.llm_config import LLMConfig
+from letta.schemas.provider_trace import ProviderTraceCreate
 from letta.schemas.usage import LettaUsageStatistics
+from letta.schemas.user import User
+from letta.settings import settings
 
 
 class LettaLLMStreamAdapter(LettaLLMAdapter):
@@ -20,7 +24,7 @@
     specific streaming formats.
""" - def __init__(self, llm_client: LLMClientBase, llm_config: LLMConfig): + def __init__(self, llm_client: LLMClientBase, llm_config: LLMConfig) -> None: super().__init__(llm_client, llm_config) self.interface: OpenAIStreamingInterface | AnthropicStreamingInterface | None = None @@ -30,6 +34,8 @@ class LettaLLMStreamAdapter(LettaLLMAdapter): messages: list, tools: list, use_assistant_message: bool, + step_id: str | None = None, + actor: User | None = None, ) -> AsyncGenerator[LettaMessage, None]: """ Execute a streaming LLM request and yield tokens/chunks as they arrive. @@ -109,5 +115,50 @@ class LettaLLMStreamAdapter(LettaLLMAdapter): # Store any additional data from the interface self.message_id = self.interface.letta_message_id + # Log request and response data + self.log_provider_trace(step_id=step_id, actor=actor) + def supports_token_streaming(self) -> bool: return True + + def log_provider_trace(self, step_id: str | None, actor: User | None) -> None: + """ + Log provider trace data for telemetry purposes in a fire-and-forget manner. + + Creates an async task to log the request/response data without blocking + the main execution flow. For streaming adapters, this includes the final + tool call and reasoning content collected during streaming. + + Args: + step_id: The step ID associated with this request for logging purposes + actor: The user associated with this request for logging purposes + """ + if step_id is None or actor is None or not settings.track_provider_trace: + return + + asyncio.create_task( + self.telemetry_manager.create_provider_trace_async( + actor=actor, + provider_trace_create=ProviderTraceCreate( + request_json=self.request_data, + response_json={ + "content": { + "tool_call": self.tool_call.model_dump_json(), + "reasoning": [content.model_dump_json() for content in self.reasoning_content], + }, + "id": self.interface.message_id, + "model": self.interface.model, + "role": "assistant", + # "stop_reason": "", + # "stop_sequence": None, + "type": "message", + "usage": { + "input_tokens": self.usage.prompt_tokens, + "output_tokens": self.usage.completion_tokens, + }, + }, + step_id=step_id, # Use original step_id for telemetry + organization_id=actor.organization_id, + ), + ) + ) diff --git a/letta/agents/letta_agent_v2.py b/letta/agents/letta_agent_v2.py index 180eb790..3ffcfd22 100644 --- a/letta/agents/letta_agent_v2.py +++ b/letta/agents/letta_agent_v2.py @@ -419,6 +419,8 @@ class LettaAgentV2(BaseAgentV2): messages=messages, tools=valid_tools, use_assistant_message=use_assistant_message, + step_id=step_id, + actor=self.actor, ) async for chunk in invocation: if llm_adapter.supports_token_streaming():