diff --git a/letta/adapters/letta_llm_request_adapter.py b/letta/adapters/letta_llm_request_adapter.py index 015e8045..fdcf050b 100644 --- a/letta/adapters/letta_llm_request_adapter.py +++ b/letta/adapters/letta_llm_request_adapter.py @@ -1,9 +1,8 @@ -import json from typing import AsyncGenerator from letta.adapters.letta_llm_adapter import LettaLLMAdapter from letta.helpers.datetime_helpers import get_utc_timestamp_ns -from letta.otel.tracing import log_attributes, log_event, trace_method +from letta.otel.tracing import log_attributes, log_event, safe_json_dumps, trace_method from letta.schemas.letta_message import LettaMessage from letta.schemas.letta_message_content import OmittedReasoningContent, ReasoningContent, TextContent from letta.schemas.provider_trace import ProviderTraceCreate @@ -106,8 +105,8 @@ class LettaLLMRequestAdapter(LettaLLMAdapter): log_attributes( { - "request_data": json.dumps(self.request_data), - "response_data": json.dumps(self.response_data), + "request_data": safe_json_dumps(self.request_data), + "response_data": safe_json_dumps(self.response_data), } ) diff --git a/letta/adapters/letta_llm_stream_adapter.py b/letta/adapters/letta_llm_stream_adapter.py index 286c6f4a..4ad4bf92 100644 --- a/letta/adapters/letta_llm_stream_adapter.py +++ b/letta/adapters/letta_llm_stream_adapter.py @@ -1,4 +1,3 @@ -import json from typing import AsyncGenerator from letta.adapters.letta_llm_adapter import LettaLLMAdapter @@ -6,7 +5,7 @@ from letta.helpers.datetime_helpers import get_utc_timestamp_ns from letta.interfaces.anthropic_streaming_interface import AnthropicStreamingInterface from letta.interfaces.openai_streaming_interface import OpenAIStreamingInterface from letta.llm_api.llm_client_base import LLMClientBase -from letta.otel.tracing import log_attributes, trace_method +from letta.otel.tracing import log_attributes, safe_json_dumps, trace_method from letta.schemas.enums import ProviderType from letta.schemas.letta_message import LettaMessage from letta.schemas.llm_config import LLMConfig @@ -174,10 +173,13 @@ class LettaLLMStreamAdapter(LettaLLMAdapter): }, } + # Store response data for future reference + self.response_data = response_json + log_attributes( { - "request_data": json.dumps(self.request_data), - "response_data": json.dumps(self.response_data), + "request_data": safe_json_dumps(self.request_data), + "response_data": safe_json_dumps(response_json), } ) diff --git a/letta/adapters/simple_llm_stream_adapter.py b/letta/adapters/simple_llm_stream_adapter.py index 8e7550ff..3089f94c 100644 --- a/letta/adapters/simple_llm_stream_adapter.py +++ b/letta/adapters/simple_llm_stream_adapter.py @@ -6,7 +6,7 @@ from letta.helpers.datetime_helpers import get_utc_timestamp_ns from letta.interfaces.anthropic_parallel_tool_call_streaming_interface import SimpleAnthropicStreamingInterface from letta.interfaces.gemini_streaming_interface import SimpleGeminiStreamingInterface from letta.interfaces.openai_streaming_interface import SimpleOpenAIResponsesStreamingInterface, SimpleOpenAIStreamingInterface -from letta.otel.tracing import log_attributes, trace_method +from letta.otel.tracing import log_attributes, safe_json_dumps, trace_method from letta.schemas.enums import ProviderType from letta.schemas.letta_message import LettaMessage from letta.schemas.letta_message_content import LettaMessageContentUnion @@ -257,8 +257,8 @@ class SimpleLLMStreamAdapter(LettaLLMStreamAdapter): log_attributes( { - "request_data": json.dumps(self.request_data), - "response_data": json.dumps(response_json), + "request_data": safe_json_dumps(self.request_data), + "response_data": safe_json_dumps(response_json), } ) diff --git a/letta/otel/tracing.py b/letta/otel/tracing.py index cae7850d..3a2238ed 100644 --- a/letta/otel/tracing.py +++ b/letta/otel/tracing.py @@ -1,6 +1,7 @@ import asyncio import inspect import itertools +import json import re import time import traceback @@ -417,6 +418,46 @@ def trace_method(func): return async_wrapper if inspect.iscoroutinefunction(func) else sync_wrapper +def safe_json_dumps(data) -> str: + """ + Safely serialize data to JSON, handling edge cases like byte arrays. + + Used primarily for OTEL tracing to prevent serialization errors from + breaking the streaming flow when logging request/response data. + + Args: + data: Data to serialize (dict, bytes, str, etc.) + + Returns: + JSON string representation, or error message if serialization fails + """ + try: + # Handle byte arrays (e.g., from Gemini) + if isinstance(data, bytes): + try: + # Try to decode as UTF-8 first + decoded = data.decode("utf-8") + # Try to parse as JSON + try: + parsed = json.loads(decoded) + return json.dumps(parsed) + except json.JSONDecodeError: + # If not JSON, return the decoded string + return json.dumps({"raw_text": decoded}) + except UnicodeDecodeError: + # If decode fails, return base64 representation + import base64 + + return json.dumps({"base64": base64.b64encode(data).decode("ascii")}) + + # Normal case: try direct serialization + return json.dumps(data) + except Exception as e: + # Last resort: return error message + logger.warning(f"Failed to serialize data to JSON: {e}", exc_info=True) + return json.dumps({"error": f"Serialization failed: {str(e)}", "type": str(type(data))}) + + def log_attributes(attributes: Dict[str, Any]) -> None: current_span = trace.get_current_span() if current_span: diff --git a/letta/services/tool_manager.py b/letta/services/tool_manager.py index 059a8137..c32910ec 100644 --- a/letta/services/tool_manager.py +++ b/letta/services/tool_manager.py @@ -227,7 +227,8 @@ class ToolManager: ) # check if the tool name already exists - current_tool = await self.get_tool_by_name_async(tool_name=pydantic_tool.name, actor=actor) + with tracer.start_as_current_span("get_tool_by_name_async"): + current_tool = await self.get_tool_by_name_async(tool_name=pydantic_tool.name, actor=actor) if current_tool: # Put to dict and remove fields that should not be reset with tracer.start_as_current_span("pydantic_tool.model_dump"): @@ -245,21 +246,24 @@ class ToolManager: updated_tool_type = update_data.get("tool_type") with tracer.start_as_current_span("ToolUpdate_initialization"): tool_update = ToolUpdate(**update_data) - tool = await self.update_tool_by_id_async( - current_tool.id, - tool_update, - actor, - updated_tool_type=updated_tool_type, - modal_sandbox_enabled=modal_sandbox_enabled, - ) + with tracer.start_as_current_span("update_tool_by_id_async"): + tool = await self.update_tool_by_id_async( + current_tool.id, + tool_update, + actor, + updated_tool_type=updated_tool_type, + modal_sandbox_enabled=modal_sandbox_enabled, + ) else: printd( f"`create_or_update_tool` was called with user_id={actor.id}, organization_id={actor.organization_id}, name={pydantic_tool.name}, but found existing tool with nothing to update." ) - tool = await self.get_tool_by_id_async(current_tool.id, actor=actor) + with tracer.start_as_current_span("get_tool_by_id_async"): + tool = await self.get_tool_by_id_async(current_tool.id, actor=actor) return tool - return await self.create_tool_async(pydantic_tool, actor=actor, modal_sandbox_enabled=modal_sandbox_enabled) + with tracer.start_as_current_span("create_tool_async"): + return await self.create_tool_async(pydantic_tool, actor=actor, modal_sandbox_enabled=modal_sandbox_enabled) @enforce_types async def create_mcp_server(