From a92e868ee651c76fb8d93f4bb102aa40094de31b Mon Sep 17 00:00:00 2001
From: Kian Jones <11655409+kianjones9@users.noreply.github.com>
Date: Fri, 16 Jan 2026 22:23:48 -0800
Subject: [PATCH] feat: centralize telemetry logging at LLM client level (#8815)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* feat: centralize telemetry logging at LLM client level

Moves telemetry logging from individual adapters to LLMClientBase:
- Add TelemetryStreamWrapper for streaming telemetry on stream close
- Add request_async_with_telemetry() for non-streaming requests
- Add stream_async_with_telemetry() for streaming requests
- Add set_telemetry_context() to configure agent_id, run_id, step_id

Updates adapters and agents to use the new pattern:
- LettaLLMAdapter now accepts agent_id/run_id in its constructor
- Adapters call set_telemetry_context() before LLM requests
- Removes duplicate telemetry logging from adapters
- Enriches traces with agent_id, run_id, and call_type metadata

🐙 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta

* fix: accumulate streaming response content for telemetry

TelemetryStreamWrapper now extracts the actual response data from chunks:
- Content text (concatenated from deltas)
- Tool calls (id, name, arguments)
- Model name, finish reason, usage stats

🐙 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta

* refactor: move streaming telemetry to caller (option 3)

- Remove the TelemetryStreamWrapper class
- Add log_provider_trace_async() helper to LLMClientBase
- stream_async_with_telemetry() now just returns the raw stream
- Callers log telemetry after processing, with rich interface data

Updated callers:
- summarizer.py: logs content + usage after stream processing
- letta_agent.py: logs tool_call, reasoning, model, usage

🐙 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta

* fix: pass agent_id and run_id to parent adapter class

LettaLLMStreamAdapter was not passing agent_id/run_id to its parent
class, causing "unexpected keyword argument" errors.

🐙 Generated with [Letta Code](https://letta.com)

Co-Authored-By: Letta

---------

Co-authored-by: Letta
---
 letta/adapters/letta_llm_adapter.py          | 10 +-
 letta/adapters/letta_llm_stream_adapter.py   |  5 +-
 letta/adapters/simple_llm_request_adapter.py | 11 ++-
 letta/agents/ephemeral_summary_agent.py      |  9 +-
 letta/agents/letta_agent.py                  | 50 ++++++++--
 letta/agents/letta_agent_v2.py               |  7 +-
 letta/agents/letta_agent_v3.py               |  7 +-
 letta/llm_api/llm_client_base.py             | 97 ++++++++++++++++++++
 letta/server/rest_api/routers/v1/tools.py    |  8 +-
 letta/services/summarizer/summarizer.py      | 31 ++++++-
 10 files changed, 216 insertions(+), 19 deletions(-)
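For review, a minimal sketch of the end-to-end call pattern this patch introduces. Names come from the diffs below; the client, config, request_data, and accumulating streaming interface at the call site are assumed to already exist.

    # Non-streaming path: set the context once, then the wrapper logs the
    # provider trace for both success and error outcomes.
    llm_client.set_telemetry_context(
        telemetry_manager=telemetry_manager,  # assumed: an existing TelemetryManager
        agent_id=agent_state.id,
        run_id=run_id,
        call_type="agent_step",
    )
    response_data = await llm_client.request_async_with_telemetry(request_data, llm_config)

    # Streaming path: the wrapper returns the raw stream; the caller consumes it
    # through a streaming interface, then logs the accumulated response itself.
    stream = await llm_client.stream_async_with_telemetry(request_data, llm_config)
    async for _chunk in interface.process(stream):  # assumed: interface accumulates content
        pass
    text = "".join(part.text for part in interface.get_content())
    await llm_client.log_provider_trace_async(request_data=request_data, response_json={"content": text})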
""" - def __init__(self, llm_client: LLMClientBase, llm_config: LLMConfig) -> None: + def __init__( + self, + llm_client: LLMClientBase, + llm_config: LLMConfig, + agent_id: str | None = None, + run_id: str | None = None, + ) -> None: self.llm_client: LLMClientBase = llm_client self.llm_config: LLMConfig = llm_config + self.agent_id: str | None = agent_id + self.run_id: str | None = run_id self.message_id: str | None = None self.request_data: dict | None = None self.response_data: dict | None = None diff --git a/letta/adapters/letta_llm_stream_adapter.py b/letta/adapters/letta_llm_stream_adapter.py index 7f96f914..3612ce95 100644 --- a/letta/adapters/letta_llm_stream_adapter.py +++ b/letta/adapters/letta_llm_stream_adapter.py @@ -26,9 +26,8 @@ class LettaLLMStreamAdapter(LettaLLMAdapter): specific streaming formats. """ - def __init__(self, llm_client: LLMClientBase, llm_config: LLMConfig, run_id: str | None = None) -> None: - super().__init__(llm_client, llm_config) - self.run_id = run_id + def __init__(self, llm_client: LLMClientBase, llm_config: LLMConfig, agent_id: str | None = None, run_id: str | None = None) -> None: + super().__init__(llm_client, llm_config, agent_id=agent_id, run_id=run_id) self.interface: OpenAIStreamingInterface | AnthropicStreamingInterface | None = None async def invoke_llm( diff --git a/letta/adapters/simple_llm_request_adapter.py b/letta/adapters/simple_llm_request_adapter.py index e053c2a9..d795c8a1 100644 --- a/letta/adapters/simple_llm_request_adapter.py +++ b/letta/adapters/simple_llm_request_adapter.py @@ -38,9 +38,16 @@ class SimpleLLMRequestAdapter(LettaLLMRequestAdapter): # Store request data self.request_data = request_data - # Make the blocking LLM request + # Set telemetry context and make the blocking LLM request + self.llm_client.set_telemetry_context( + telemetry_manager=self.telemetry_manager, + step_id=step_id, + agent_id=self.agent_id, + run_id=self.run_id, + call_type="agent_step", + ) try: - self.response_data = await self.llm_client.request_async(request_data, self.llm_config) + self.response_data = await self.llm_client.request_async_with_telemetry(request_data, self.llm_config) except Exception as e: raise self.llm_client.handle_llm_error(e) diff --git a/letta/agents/ephemeral_summary_agent.py b/letta/agents/ephemeral_summary_agent.py index fc97611d..49af71e8 100644 --- a/letta/agents/ephemeral_summary_agent.py +++ b/letta/agents/ephemeral_summary_agent.py @@ -86,7 +86,14 @@ class EphemeralSummaryAgent(BaseAgent): ) request_data = llm_client.build_request_data(agent_state.agent_type, messages, agent_state.llm_config, tools=[]) - response_data = await llm_client.request_async(request_data, agent_state.llm_config) + from letta.services.telemetry_manager import TelemetryManager + + llm_client.set_telemetry_context( + telemetry_manager=TelemetryManager(), + agent_id=self.agent_id, + call_type="summarization", + ) + response_data = await llm_client.request_async_with_telemetry(request_data, agent_state.llm_config) response = await llm_client.convert_response_to_chat_completion(response_data, messages, agent_state.llm_config) summary = response.choices[0].message.content.strip() diff --git a/letta/agents/letta_agent.py b/letta/agents/letta_agent.py index 1e5b85d6..e5ed1062 100644 --- a/letta/agents/letta_agent.py +++ b/letta/agents/letta_agent.py @@ -414,7 +414,9 @@ class LettaAgent(BaseAgent): provider_trace=ProviderTrace( request_json=request_data, response_json=response_data, - step_id=step_id, # Use original step_id for telemetry + 
diff --git a/letta/agents/letta_agent.py b/letta/agents/letta_agent.py
index 1e5b85d6..e5ed1062 100644
--- a/letta/agents/letta_agent.py
+++ b/letta/agents/letta_agent.py
@@ -414,7 +414,9 @@ class LettaAgent(BaseAgent):
                 provider_trace=ProviderTrace(
                     request_json=request_data,
                     response_json=response_data,
-                    step_id=step_id,  # Use original step_id for telemetry
+                    step_id=step_id,
+                    agent_id=self.agent_id,
+                    run_id=self.current_run_id,
                 ),
             )
             step_progression = StepProgression.LOGGED_TRACE
@@ -759,7 +761,9 @@ class LettaAgent(BaseAgent):
                 provider_trace=ProviderTrace(
                     request_json=request_data,
                     response_json=response_data,
-                    step_id=step_id,  # Use original step_id for telemetry
+                    step_id=step_id,
+                    agent_id=self.agent_id,
+                    run_id=self.current_run_id,
                 ),
             )
             step_progression = StepProgression.LOGGED_TRACE
@@ -1117,6 +1121,22 @@ class LettaAgent(BaseAgent):
                     stop_reason = LettaStopReason(stop_reason=StopReasonType.invalid_tool_call.value)
                     raise e
                 reasoning_content = interface.get_reasoning_content()
+
+                # Log provider trace telemetry after stream processing
+                await llm_client.log_provider_trace_async(
+                    request_data=request_data,
+                    response_json={
+                        "content": {
+                            "tool_call": tool_call.model_dump() if tool_call else None,
+                            "reasoning": [c.model_dump() for c in reasoning_content] if reasoning_content else [],
+                        },
+                        "model": getattr(interface, "model", None),
+                        "usage": {
+                            "input_tokens": interface.input_tokens,
+                            "output_tokens": interface.output_tokens,
+                        },
+                    },
+                )
                 persisted_messages, should_continue, stop_reason = await self._handle_ai_response(
                     tool_call,
                     valid_tool_names,
@@ -1208,7 +1228,9 @@ class LettaAgent(BaseAgent):
                             "output_tokens": usage.completion_tokens,
                         },
                     },
-                    step_id=step_id,  # Use original step_id for telemetry
+                    step_id=step_id,
+                    agent_id=self.agent_id,
+                    run_id=self.current_run_id,
                 ),
             )
             step_progression = StepProgression.LOGGED_TRACE
@@ -1430,8 +1452,14 @@ class LettaAgent(BaseAgent):
         log_event("agent.stream_no_tokens.llm_request.created")

         async with AsyncTimer() as timer:
-            # Attempt LLM request
-            response = await llm_client.request_async(request_data, agent_state.llm_config)
+            # Attempt LLM request with telemetry
+            llm_client.set_telemetry_context(
+                telemetry_manager=self.telemetry_manager,
+                agent_id=self.agent_id,
+                run_id=self.current_run_id,
+                call_type="agent_step",
+            )
+            response = await llm_client.request_async_with_telemetry(request_data, agent_state.llm_config)

         # Track LLM request time
         step_metrics.llm_request_ns = int(timer.elapsed_ns)
@@ -1492,10 +1520,18 @@ class LettaAgent(BaseAgent):
             attributes={"request_start_to_provider_request_start_ns": ns_to_ms(request_start_to_provider_request_start_ns)},
         )

-        # Attempt LLM request
+        # Set telemetry context before streaming
+        llm_client.set_telemetry_context(
+            telemetry_manager=self.telemetry_manager,
+            agent_id=self.agent_id,
+            run_id=self.current_run_id,
+            call_type="agent_step",
+        )
+
+        # Attempt LLM request with telemetry wrapper
         return (
             request_data,
-            await llm_client.stream_async(request_data, agent_state.llm_config),
+            await llm_client.stream_async_with_telemetry(request_data, agent_state.llm_config),
             current_in_context_messages,
             new_in_context_messages,
             valid_tool_names,
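The streaming hunks above implement "option 3" from the commit message: stream_async_with_telemetry() hands back the raw stream, and the caller logs once it knows what the stream actually contained. Condensed, every streaming call site now has roughly this shape (payload keys follow the hunk above; tool_call and the interface object come from the surrounding stream-processing code):

    stream = await llm_client.stream_async_with_telemetry(request_data, agent_state.llm_config)
    async for _chunk in interface.process(stream):
        pass  # the interface accumulates content, tool calls, and token counts

    await llm_client.log_provider_trace_async(
        request_data=request_data,
        response_json={
            "content": {"tool_call": tool_call.model_dump() if tool_call else None},
            "model": getattr(interface, "model", None),
            "usage": {"input_tokens": interface.input_tokens, "output_tokens": interface.output_tokens},
        },
    )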
diff --git a/letta/agents/letta_agent_v2.py b/letta/agents/letta_agent_v2.py
index 4cc4a368..83d3d0bb 100644
--- a/letta/agents/letta_agent_v2.py
+++ b/letta/agents/letta_agent_v2.py
@@ -205,7 +205,9 @@ class LettaAgentV2(BaseAgentV2):
             response = self._step(
                 messages=in_context_messages + self.response_messages,
                 input_messages_to_persist=input_messages_to_persist,
-                llm_adapter=LettaLLMRequestAdapter(llm_client=self.llm_client, llm_config=self.agent_state.llm_config),
+                llm_adapter=LettaLLMRequestAdapter(
+                    llm_client=self.llm_client, llm_config=self.agent_state.llm_config, agent_id=self.agent_state.id, run_id=run_id
+                ),
                 run_id=run_id,
                 use_assistant_message=use_assistant_message,
                 include_return_message_types=include_return_message_types,
@@ -286,12 +288,15 @@ class LettaAgentV2(BaseAgentV2):
             llm_adapter = LettaLLMStreamAdapter(
                 llm_client=self.llm_client,
                 llm_config=self.agent_state.llm_config,
+                agent_id=self.agent_state.id,
                 run_id=run_id,
             )
         else:
             llm_adapter = LettaLLMRequestAdapter(
                 llm_client=self.llm_client,
                 llm_config=self.agent_state.llm_config,
+                agent_id=self.agent_state.id,
+                run_id=run_id,
             )

         try:
diff --git a/letta/agents/letta_agent_v3.py b/letta/agents/letta_agent_v3.py
index 19ac2720..458d53d0 100644
--- a/letta/agents/letta_agent_v3.py
+++ b/letta/agents/letta_agent_v3.py
@@ -167,7 +167,9 @@ class LettaAgentV3(LettaAgentV2):
                 messages=list(self.in_context_messages + input_messages_to_persist),
                 input_messages_to_persist=input_messages_to_persist,
                 # TODO need to support non-streaming adapter too
-                llm_adapter=SimpleLLMRequestAdapter(llm_client=self.llm_client, llm_config=self.agent_state.llm_config),
+                llm_adapter=SimpleLLMRequestAdapter(
+                    llm_client=self.llm_client, llm_config=self.agent_state.llm_config, agent_id=self.agent_state.id, run_id=run_id
+                ),
                 run_id=run_id,
                 # use_assistant_message=use_assistant_message,
                 include_return_message_types=include_return_message_types,
@@ -307,12 +309,15 @@ class LettaAgentV3(LettaAgentV2):
             llm_adapter = SimpleLLMStreamAdapter(
                 llm_client=self.llm_client,
                 llm_config=self.agent_state.llm_config,
+                agent_id=self.agent_state.id,
                 run_id=run_id,
             )
         else:
             llm_adapter = SimpleLLMRequestAdapter(
                 llm_client=self.llm_client,
                 llm_config=self.agent_state.llm_config,
+                agent_id=self.agent_state.id,
+                run_id=run_id,
             )

         try:
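Both the streaming and blocking branches in v2 and v3 now thread the same identity, so traces are attributed identically regardless of transport. The shared shape of those call sites, condensed into one illustrative helper that is not part of this patch (imports elided; class names taken from the diffs above):

    # Illustrative only: both adapter classes now accept the same identity
    # kwargs, so adapter selection reduces to picking the class.
    def _make_adapter(self, streaming: bool, run_id: str | None):
        adapter_cls = LettaLLMStreamAdapter if streaming else LettaLLMRequestAdapter
        return adapter_cls(
            llm_client=self.llm_client,
            llm_config=self.agent_state.llm_config,
            agent_id=self.agent_state.id,
            run_id=run_id,
        )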
+ """ + from letta.log import get_logger + + logger = get_logger(__name__) + response_data = None + error_msg = None + try: + response_data = await self.request_async(request_data, llm_config) + return response_data + except Exception as e: + error_msg = str(e) + raise + finally: + if self._telemetry_manager and settings.track_provider_trace: + if self.actor is None: + logger.warning(f"Skipping telemetry: actor is None (call_type={self._telemetry_call_type})") + else: + try: + pydantic_actor = self.actor.to_pydantic() if hasattr(self.actor, "to_pydantic") else self.actor + await self._telemetry_manager.create_provider_trace_async( + actor=pydantic_actor, + provider_trace=ProviderTrace( + request_json=request_data, + response_json=response_data if response_data else {"error": error_msg}, + step_id=self._telemetry_step_id, + agent_id=self._telemetry_agent_id, + run_id=self._telemetry_run_id, + call_type=self._telemetry_call_type, + ), + ) + except Exception as e: + logger.warning(f"Failed to log telemetry: {e}") + + async def stream_async_with_telemetry(self, request_data: dict, llm_config: LLMConfig): + """Returns raw stream. Caller should log telemetry after processing via log_provider_trace_async(). + + Call set_telemetry_context() first to set agent_id, run_id, etc. + After consuming the stream, call log_provider_trace_async() with the response data. + """ + return await self.stream_async(request_data, llm_config) + + async def log_provider_trace_async(self, request_data: dict, response_json: dict) -> None: + """Log provider trace telemetry. Call after processing LLM response. + + Uses telemetry context set via set_telemetry_context(). + """ + from letta.log import get_logger + + logger = get_logger(__name__) + + if not self._telemetry_manager or not settings.track_provider_trace: + return + + if self.actor is None: + logger.warning(f"Skipping telemetry: actor is None (call_type={self._telemetry_call_type})") + return + + try: + pydantic_actor = self.actor.to_pydantic() if hasattr(self.actor, "to_pydantic") else self.actor + await self._telemetry_manager.create_provider_trace_async( + actor=pydantic_actor, + provider_trace=ProviderTrace( + request_json=request_data, + response_json=response_json, + step_id=self._telemetry_step_id, + agent_id=self._telemetry_agent_id, + run_id=self._telemetry_run_id, + call_type=self._telemetry_call_type, + ), + ) + except Exception as e: + logger.warning(f"Failed to log telemetry: {e}") @trace_method async def send_llm_request( diff --git a/letta/server/rest_api/routers/v1/tools.py b/letta/server/rest_api/routers/v1/tools.py index acff6d84..ac6de569 100644 --- a/letta/server/rest_api/routers/v1/tools.py +++ b/letta/server/rest_api/routers/v1/tools.py @@ -952,7 +952,13 @@ async def generate_tool_from_prompt( llm_config, tools=[tool], ) - response_data = await llm_client.request_async(request_data, llm_config) + from letta.services.telemetry_manager import TelemetryManager + + llm_client.set_telemetry_context( + telemetry_manager=TelemetryManager(), + call_type="tool_generation", + ) + response_data = await llm_client.request_async_with_telemetry(request_data, llm_config) response = await llm_client.convert_response_to_chat_completion(response_data, input_messages, llm_config) # Validate that we got a tool call response diff --git a/letta/services/summarizer/summarizer.py b/letta/services/summarizer/summarizer.py index 78a97a91..5de1ec48 100644 --- a/letta/services/summarizer/summarizer.py +++ b/letta/services/summarizer/summarizer.py @@ -426,11 +426,15 @@ 
diff --git a/letta/server/rest_api/routers/v1/tools.py b/letta/server/rest_api/routers/v1/tools.py
index acff6d84..ac6de569 100644
--- a/letta/server/rest_api/routers/v1/tools.py
+++ b/letta/server/rest_api/routers/v1/tools.py
@@ -952,7 +952,13 @@ async def generate_tool_from_prompt(
         llm_config,
         tools=[tool],
     )
-    response_data = await llm_client.request_async(request_data, llm_config)
+    from letta.services.telemetry_manager import TelemetryManager
+
+    llm_client.set_telemetry_context(
+        telemetry_manager=TelemetryManager(),
+        call_type="tool_generation",
+    )
+    response_data = await llm_client.request_async_with_telemetry(request_data, llm_config)
     response = await llm_client.convert_response_to_chat_completion(response_data, input_messages, llm_config)

     # Validate that we got a tool call response
diff --git a/letta/services/summarizer/summarizer.py b/letta/services/summarizer/summarizer.py
index 78a97a91..5de1ec48 100644
--- a/letta/services/summarizer/summarizer.py
+++ b/letta/services/summarizer/summarizer.py
@@ -426,11 +426,15 @@ async def simple_summary(
     actor: User,
     include_ack: bool = True,
     prompt: str | None = None,
+    telemetry_manager: "TelemetryManager | None" = None,
+    agent_id: str | None = None,
+    run_id: str | None = None,
 ) -> str:
     """Generate a simple summary from a list of messages.

     Intentionally kept functional due to the simplicity of the prompt.
     """
+    from letta.services.telemetry_manager import TelemetryManager

     # Create an LLMClient from the config
     llm_client = LLMClient.create(
@@ -440,6 +444,15 @@ async def simple_summary(
     )
     assert llm_client is not None

+    # Always set telemetry context - create TelemetryManager if not provided
+    tm = telemetry_manager or TelemetryManager()
+    llm_client.set_telemetry_context(
+        telemetry_manager=tm,
+        agent_id=agent_id,
+        run_id=run_id,
+        call_type="summarization",
+    )
+
     # Prepare the messages payload to send to the LLM
     system_prompt = prompt or gpt_summarize.SYSTEM
     # Build the initial transcript without clamping to preserve fidelity
@@ -494,13 +507,27 @@ async def simple_summary(
         )

         # AnthropicClient.stream_async sets request_data["stream"] = True internally.
-        stream = await llm_client.stream_async(req_data, summarizer_llm_config)
+        stream = await llm_client.stream_async_with_telemetry(req_data, summarizer_llm_config)
         async for _chunk in interface.process(stream):
             # We don't emit anything; we just want the fully-accumulated content.
             pass

         content_parts = interface.get_content()
         text = "".join(part.text for part in content_parts if isinstance(part, TextContent)).strip()
+
+        # Log telemetry after stream processing
+        await llm_client.log_provider_trace_async(
+            request_data=req_data,
+            response_json={
+                "content": text,
+                "model": summarizer_llm_config.model,
+                "usage": {
+                    "input_tokens": getattr(interface, "input_tokens", None),
+                    "output_tokens": getattr(interface, "output_tokens", None),
+                },
+            },
+        )
+
         if not text:
             logger.warning("No content returned from summarizer (streaming path)")
             raise Exception("Summary failed to generate")
@@ -512,7 +539,7 @@ async def simple_summary(
             summarizer_llm_config.model_endpoint_type,
             summarizer_llm_config.model,
         )
-        response_data = await llm_client.request_async(req_data, summarizer_llm_config)
+        response_data = await llm_client.request_async_with_telemetry(req_data, summarizer_llm_config)
         response = await llm_client.convert_response_to_chat_completion(
             response_data,
             req_messages_obj,