From 80f6e97ca91b223f075230a853fef5ef96de782c Mon Sep 17 00:00:00 2001 From: Andy Li <55300002+cliandy@users.noreply.github.com> Date: Thu, 5 Jun 2025 17:20:14 -0700 Subject: [PATCH] feat: otel metrics and expanded collecting (#2647) (passed tests in last run) --- letta/agent.py | 2 +- letta/agents/letta_agent.py | 61 ++++++--- letta/agents/letta_agent_batch.py | 2 +- letta/agents/voice_sleeptime_agent.py | 14 +- letta/groups/sleeptime_multi_agent_v2.py | 2 +- letta/helpers/datetime_helpers.py | 50 ++++++- .../noop_helper.py => helpers/singleton.py} | 5 + .../anthropic_streaming_interface.py | 4 +- .../interfaces/openai_streaming_interface.py | 4 +- letta/llm_api/anthropic.py | 2 +- letta/llm_api/anthropic_client.py | 2 +- letta/llm_api/google_vertex_client.py | 2 +- letta/llm_api/llm_api_tools.py | 2 +- letta/llm_api/llm_client_base.py | 2 +- letta/llm_api/openai.py | 2 +- letta/llm_api/openai_client.py | 2 +- letta/local_llm/chat_completion_proxy.py | 2 +- letta/memory.py | 2 +- letta/otel/__init__.py | 0 letta/otel/context.py | 26 ++++ letta/otel/events.py | 0 letta/otel/metric_registry.py | 122 ++++++++++++++++++ letta/otel/metrics.py | 66 ++++++++++ letta/otel/resource.py | 26 ++++ letta/{ => otel}/tracing.py | 121 +++++++---------- letta/server/db.py | 2 +- letta/server/rest_api/app.py | 12 +- letta/server/rest_api/routers/v1/agents.py | 8 +- letta/server/rest_api/utils.py | 12 +- letta/server/server.py | 2 +- letta/services/agent_manager.py | 2 +- letta/services/block_manager.py | 2 +- letta/services/files_agents_manager.py | 2 +- letta/services/group_manager.py | 2 +- .../services/helpers/agent_manager_helper.py | 2 +- letta/services/identity_manager.py | 2 +- letta/services/job_manager.py | 2 +- letta/services/llm_batch_manager.py | 2 +- letta/services/message_manager.py | 2 +- letta/services/organization_manager.py | 2 +- letta/services/passage_manager.py | 2 +- letta/services/per_agent_lock_manager.py | 2 +- letta/services/provider_manager.py | 2 +- letta/services/sandbox_config_manager.py | 2 +- letta/services/source_manager.py | 2 +- letta/services/step_manager.py | 4 +- letta/services/summarizer/summarizer.py | 2 +- letta/services/telemetry_manager.py | 2 +- .../tool_executor/builtin_tool_executor.py | 2 +- .../tool_executor/composio_tool_executor.py | 2 +- .../tool_executor/mcp_tool_executor.py | 2 +- .../tool_executor/tool_execution_manager.py | 30 ++++- .../tool_executor/tool_execution_sandbox.py | 2 +- letta/services/tool_executor/tool_executor.py | 2 +- letta/services/tool_manager.py | 2 +- letta/services/tool_sandbox/e2b_sandbox.py | 2 +- letta/services/tool_sandbox/local_sandbox.py | 2 +- letta/services/user_manager.py | 2 +- letta/settings.py | 1 + .../otel-collector-config-clickhouse-dev.yaml | 14 +- 60 files changed, 499 insertions(+), 161 deletions(-) rename letta/{services/helpers/noop_helper.py => helpers/singleton.py} (73%) create mode 100644 letta/otel/__init__.py create mode 100644 letta/otel/context.py create mode 100644 letta/otel/events.py create mode 100644 letta/otel/metric_registry.py create mode 100644 letta/otel/metrics.py create mode 100644 letta/otel/resource.py rename letta/{ => otel}/tracing.py (66%) diff --git a/letta/agent.py b/letta/agent.py index 6571f736..f81e0f69 100644 --- a/letta/agent.py +++ b/letta/agent.py @@ -41,6 +41,7 @@ from letta.log import get_logger from letta.memory import summarize_messages from letta.orm import User from letta.orm.enums import ToolType +from letta.otel.tracing import log_event, trace_method from letta.schemas.agent import AgentState, AgentStepResponse, UpdateAgent, get_prompt_template_for_agent_type from letta.schemas.block import BlockUpdate from letta.schemas.embedding_config import EmbeddingConfig @@ -72,7 +73,6 @@ from letta.services.tool_manager import ToolManager from letta.settings import settings, summarizer_settings from letta.streaming_interface import StreamingRefreshCLIInterface from letta.system import get_heartbeat, get_token_limit_warning, package_function_response, package_summarize_message, package_user_message -from letta.tracing import log_event, trace_method from letta.utils import count_tokens, get_friendly_error_msg, get_tool_call_id, log_telemetry, parse_json, validate_function_response logger = get_logger(__name__) diff --git a/letta/agents/letta_agent.py b/letta/agents/letta_agent.py index 59acf654..b259373f 100644 --- a/letta/agents/letta_agent.py +++ b/letta/agents/letta_agent.py @@ -16,7 +16,7 @@ from letta.agents.helpers import ( ) from letta.errors import ContextWindowExceededError from letta.helpers import ToolRulesSolver -from letta.helpers.datetime_helpers import get_utc_timestamp_ns +from letta.helpers.datetime_helpers import AsyncTimer, get_utc_timestamp_ns, ns_to_ms from letta.helpers.tool_execution_helper import enable_strict_mode from letta.interfaces.anthropic_streaming_interface import AnthropicStreamingInterface from letta.interfaces.openai_streaming_interface import OpenAIStreamingInterface @@ -25,6 +25,9 @@ from letta.llm_api.llm_client_base import LLMClientBase from letta.local_llm.constants import INNER_THOUGHTS_KWARG from letta.log import get_logger from letta.orm.enums import ToolType +from letta.otel.context import get_ctx_attributes +from letta.otel.metric_registry import MetricRegistry +from letta.otel.tracing import log_event, trace_method, tracer from letta.schemas.agent import AgentState from letta.schemas.enums import MessageRole, MessageStreamStatus from letta.schemas.letta_message_content import OmittedReasoningContent, ReasoningContent, RedactedReasoningContent, TextContent @@ -48,7 +51,7 @@ from letta.services.telemetry_manager import NoopTelemetryManager, TelemetryMana from letta.services.tool_executor.tool_execution_manager import ToolExecutionManager from letta.settings import model_settings from letta.system import package_function_response -from letta.tracing import log_event, trace_method, tracer +from letta.types import JsonDict from letta.utils import log_telemetry, validate_function_response logger = get_logger(__name__) @@ -178,7 +181,7 @@ class LettaAgent(BaseAgent): # log llm request time now = get_utc_timestamp_ns() llm_request_ns = now - step_start - agent_step_span.add_event(name="llm_request_ms", attributes={"duration_ms": llm_request_ns // 1_000_000}) + agent_step_span.add_event(name="llm_request_ms", attributes={"duration_ms": ns_to_ms(llm_request_ns)}) response = llm_client.convert_response_to_chat_completion(response_data, in_context_messages, agent_state.llm_config) @@ -210,7 +213,7 @@ class LettaAgent(BaseAgent): # log LLM request time now = get_utc_timestamp_ns() llm_request_ns = now - step_start - agent_step_span.add_event(name="llm_request_ms", attributes={"duration_ms": llm_request_ns // 1_000_000}) + agent_step_span.add_event(name="llm_request_ms", attributes={"duration_ms": ns_to_ms(llm_request_ns)}) persisted_messages, should_continue = await self._handle_ai_response( tool_call, @@ -227,7 +230,7 @@ class LettaAgent(BaseAgent): # log step time now = get_utc_timestamp_ns() step_ns = now - step_start - agent_step_span.add_event(name="step_ms", attributes={"duration_ms": step_ns // 1_000_000}) + agent_step_span.add_event(name="step_ms", attributes={"duration_ms": ns_to_ms(step_ns)}) agent_step_span.end() # Log LLM Trace @@ -267,7 +270,7 @@ class LettaAgent(BaseAgent): if request_start_timestamp_ns: now = get_utc_timestamp_ns() request_ns = now - request_start_timestamp_ns - request_span.add_event(name="letta_request_ms", attributes={"duration_ms": request_ns // 1_000_000}) + request_span.add_event(name="letta_request_ms", attributes={"duration_ms": ns_to_ms(request_ns)}) request_span.end() # Return back usage @@ -321,7 +324,7 @@ class LettaAgent(BaseAgent): # log LLM request time now = get_utc_timestamp_ns() llm_request_ns = now - step_start - agent_step_span.add_event(name="llm_request_ms", attributes={"duration_ms": llm_request_ns // 1_000_000}) + agent_step_span.add_event(name="llm_request_ms", attributes={"duration_ms": ns_to_ms(llm_request_ns)}) # TODO: add run_id usage.step_count += 1 @@ -363,7 +366,7 @@ class LettaAgent(BaseAgent): # log step time now = get_utc_timestamp_ns() step_ns = now - step_start - agent_step_span.add_event(name="step_ms", attributes={"duration_ms": step_ns // 1_000_000}) + agent_step_span.add_event(name="step_ms", attributes={"duration_ms": ns_to_ms(step_ns)}) agent_step_span.end() # Log LLM Trace @@ -384,7 +387,7 @@ class LettaAgent(BaseAgent): if request_start_timestamp_ns: now = get_utc_timestamp_ns() request_ns = now - request_start_timestamp_ns - request_span.add_event(name="request_ms", attributes={"duration_ms": request_ns // 1_000_000}) + request_span.add_event(name="request_ms", attributes={"duration_ms": ns_to_ms(request_ns)}) request_span.end() # Extend the in context message ids @@ -480,7 +483,7 @@ class LettaAgent(BaseAgent): if first_chunk and request_span is not None: now = get_utc_timestamp_ns() ttft_ns = now - request_start_timestamp_ns - request_span.add_event(name="time_to_first_token_ms", attributes={"ttft_ms": ttft_ns // 1_000_000}) + request_span.add_event(name="time_to_first_token_ms", attributes={"ttft_ms": ns_to_ms(ttft_ns)}) first_chunk = False yield f"data: {chunk.model_dump_json()}\n\n" @@ -490,6 +493,9 @@ class LettaAgent(BaseAgent): usage.completion_tokens += interface.output_tokens usage.prompt_tokens += interface.input_tokens usage.total_tokens += interface.input_tokens + interface.output_tokens + MetricRegistry().message_output_tokens.record( + interface.output_tokens, dict(get_ctx_attributes(), **{"model.name": agent_state.llm_config.model}) + ) # Persist input messages if not already # Special strategy to lower TTFT @@ -500,7 +506,7 @@ class LettaAgent(BaseAgent): # log LLM request time now = get_utc_timestamp_ns() llm_request_ns = now - step_start - agent_step_span.add_event(name="llm_request_ms", attributes={"duration_ms": llm_request_ns // 1_000_000}) + agent_step_span.add_event(name="llm_request_ms", attributes={"duration_ms": ns_to_ms(llm_request_ns)}) # Process resulting stream content tool_call = interface.get_tool_call_object() @@ -525,7 +531,7 @@ class LettaAgent(BaseAgent): # log total step time now = get_utc_timestamp_ns() step_ns = now - step_start - agent_step_span.add_event(name="step_ms", attributes={"duration_ms": step_ns // 1_000_000}) + agent_step_span.add_event(name="step_ms", attributes={"duration_ms": ns_to_ms(step_ns)}) agent_step_span.end() # TODO (cliandy): the stream POST request span has ended at this point, we should tie this to the stream @@ -576,7 +582,7 @@ class LettaAgent(BaseAgent): if request_start_timestamp_ns: now = get_utc_timestamp_ns() request_ns = now - request_start_timestamp_ns - request_span.add_event(name="letta_request_ms", attributes={"duration_ms": request_ns // 1_000_000}) + request_span.add_event(name="letta_request_ms", attributes={"duration_ms": ns_to_ms(request_ns)}) request_span.end() # TODO: Also yield out a letta usage stats SSE @@ -603,10 +609,16 @@ class LettaAgent(BaseAgent): ) log_event("agent.stream_no_tokens.llm_request.created") + async with AsyncTimer() as timer: + response = await llm_client.request_async(request_data, agent_state.llm_config) + MetricRegistry().llm_execution_time_ms_histogram.record( + timer.elapsed_ms, + dict(get_ctx_attributes(), **{"model.name": agent_state.llm_config.model}), + ) # Attempt LLM request return ( request_data, - await llm_client.request_async(request_data, agent_state.llm_config), + response, current_in_context_messages, new_in_context_messages, ) @@ -653,9 +665,7 @@ class LettaAgent(BaseAgent): if first_chunk and ttft_span is not None: provider_request_start_timestamp_ns = get_utc_timestamp_ns() provider_req_start_ns = provider_request_start_timestamp_ns - request_start_timestamp_ns - ttft_span.add_event( - name="provider_req_start_ns", attributes={"provider_req_start_ms": provider_req_start_ns // 1_000_000} - ) + ttft_span.add_event(name="provider_req_start_ns", attributes={"provider_req_start_ms": ns_to_ms(provider_req_start_ns)}) # Attempt LLM request return ( @@ -861,6 +871,7 @@ class LettaAgent(BaseAgent): tool_args=tool_args, agent_state=agent_state, agent_step_span=agent_step_span, + step_id=step_id, ) log_telemetry( self.logger, "_handle_ai_response execute tool finish", tool_execution_result=tool_execution_result, tool_call_id=tool_call_id @@ -938,10 +949,15 @@ class LettaAgent(BaseAgent): @trace_method async def _execute_tool( - self, tool_name: str, tool_args: dict, agent_state: AgentState, agent_step_span: Optional["Span"] = None + self, + tool_name: str, + tool_args: JsonDict, + agent_state: AgentState, + agent_step_span: Optional["Span"] = None, + step_id: str | None = None, ) -> "ToolExecutionResult": """ - Executes a tool and returns (result, success_flag). + Executes a tool and returns the ToolExecutionResult. """ from letta.schemas.tool_execution_result import ToolExecutionResult @@ -973,7 +989,10 @@ class LettaAgent(BaseAgent): # TODO: Integrate sandbox result log_event(name=f"start_{tool_name}_execution", attributes=tool_args) tool_execution_result = await tool_execution_manager.execute_tool_async( - function_name=tool_name, function_args=tool_args, tool=target_tool + function_name=tool_name, + function_args=tool_args, + tool=target_tool, + step_id=step_id, ) if agent_step_span: end_time = get_utc_timestamp_ns() @@ -981,7 +1000,7 @@ class LettaAgent(BaseAgent): name="tool_execution_completed", attributes={ "tool_name": target_tool.name, - "duration_ms": (end_time - start_time) // 1_000_000, + "duration_ms": ns_to_ms((end_time - start_time)), "success": tool_execution_result.success_flag, "tool_type": target_tool.tool_type, "tool_id": target_tool.id, diff --git a/letta/agents/letta_agent_batch.py b/letta/agents/letta_agent_batch.py index dd1c71d0..f0810533 100644 --- a/letta/agents/letta_agent_batch.py +++ b/letta/agents/letta_agent_batch.py @@ -16,6 +16,7 @@ from letta.llm_api.llm_client import LLMClient from letta.local_llm.constants import INNER_THOUGHTS_KWARG from letta.log import get_logger from letta.orm.enums import ToolType +from letta.otel.tracing import log_event, trace_method from letta.schemas.agent import AgentState, AgentStepState from letta.schemas.enums import AgentStepStatus, JobStatus, MessageStreamStatus, ProviderType from letta.schemas.job import JobUpdate @@ -39,7 +40,6 @@ from letta.services.passage_manager import PassageManager from letta.services.sandbox_config_manager import SandboxConfigManager from letta.services.tool_executor.tool_execution_manager import ToolExecutionManager from letta.settings import tool_settings -from letta.tracing import log_event, trace_method logger = get_logger(__name__) diff --git a/letta/agents/voice_sleeptime_agent.py b/letta/agents/voice_sleeptime_agent.py index f0e11d1e..c8ebeb98 100644 --- a/letta/agents/voice_sleeptime_agent.py +++ b/letta/agents/voice_sleeptime_agent.py @@ -3,6 +3,7 @@ from typing import AsyncGenerator, List, Optional, Tuple, Union from letta.agents.helpers import _create_letta_response, serialize_message_history from letta.agents.letta_agent import LettaAgent from letta.orm.enums import ToolType +from letta.otel.tracing import trace_method from letta.schemas.agent import AgentState from letta.schemas.block import BlockUpdate from letta.schemas.enums import MessageStreamStatus @@ -17,7 +18,7 @@ from letta.services.message_manager import MessageManager from letta.services.passage_manager import PassageManager from letta.services.summarizer.enums import SummarizationMode from letta.services.summarizer.summarizer import Summarizer -from letta.tracing import trace_method +from letta.types import JsonDict class VoiceSleeptimeAgent(LettaAgent): @@ -89,9 +90,16 @@ class VoiceSleeptimeAgent(LettaAgent): ) @trace_method - async def _execute_tool(self, tool_name: str, tool_args: dict, agent_state: AgentState, agent_step_span: Optional["Span"] = None): + async def _execute_tool( + self, + tool_name: str, + tool_args: JsonDict, + agent_state: AgentState, + agent_step_span: Optional["Span"] = None, + step_id: str | None = None, + ) -> "ToolExecutionResult": """ - Executes a tool and returns (result, success_flag). + Executes a tool and returns the ToolExecutionResult """ from letta.schemas.tool_execution_result import ToolExecutionResult diff --git a/letta/groups/sleeptime_multi_agent_v2.py b/letta/groups/sleeptime_multi_agent_v2.py index 7d46ae43..587a8ef8 100644 --- a/letta/groups/sleeptime_multi_agent_v2.py +++ b/letta/groups/sleeptime_multi_agent_v2.py @@ -5,6 +5,7 @@ from typing import AsyncGenerator, List, Optional from letta.agents.base_agent import BaseAgent from letta.agents.letta_agent import LettaAgent from letta.groups.helpers import stringify_message +from letta.otel.tracing import trace_method from letta.schemas.enums import JobStatus from letta.schemas.group import Group, ManagerType from letta.schemas.job import JobUpdate @@ -21,7 +22,6 @@ from letta.services.message_manager import MessageManager from letta.services.passage_manager import PassageManager from letta.services.step_manager import NoopStepManager, StepManager from letta.services.telemetry_manager import NoopTelemetryManager, TelemetryManager -from letta.tracing import trace_method class SleeptimeMultiAgentV2(BaseAgent): diff --git a/letta/helpers/datetime_helpers.py b/letta/helpers/datetime_helpers.py index 60d7d12a..9c5cd825 100644 --- a/letta/helpers/datetime_helpers.py +++ b/letta/helpers/datetime_helpers.py @@ -1,7 +1,9 @@ import re import time -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta +from datetime import timezone as dt_timezone from time import strftime +from typing import Callable import pytz @@ -66,7 +68,7 @@ def get_local_time(timezone=None): def get_utc_time() -> datetime: """Get the current UTC time""" # return datetime.now(pytz.utc) - return datetime.now(timezone.utc) + return datetime.now(dt_timezone.utc) def get_utc_time_int() -> int: @@ -78,9 +80,13 @@ def get_utc_timestamp_ns() -> int: return int(time.time_ns()) +def ns_to_ms(ns: int) -> int: + return ns // 1_000_000 + + def timestamp_to_datetime(timestamp_seconds: int) -> datetime: """Convert Unix timestamp in seconds to UTC datetime object""" - return datetime.fromtimestamp(timestamp_seconds, tz=timezone.utc) + return datetime.fromtimestamp(timestamp_seconds, tz=dt_timezone.utc) def format_datetime(dt): @@ -105,3 +111,41 @@ def extract_date_from_timestamp(timestamp): def is_utc_datetime(dt: datetime) -> bool: return dt.tzinfo is not None and dt.tzinfo.utcoffset(dt) == timedelta(0) + + +class AsyncTimer: + """An async context manager for timing async code execution. + + Takes in an optional callback_func to call on exit with arguments + taking in the elapsed_ms and exc if present. + + Do not use the start and end times outside of this function as they are relative. + """ + + def __init__(self, callback_func: Callable | None = None): + self._start_time_ns = None + self._end_time_ns = None + self.elapsed_ns = None + self.callback_func = callback_func + + async def __aenter__(self): + self._start_time_ns = time.perf_counter_ns() + return self + + async def __aexit__(self, exc_type, exc, tb): + self._end_time_ns = time.perf_counter_ns() + self.elapsed_ns = self._end_time_ns - self._start_time_ns + if self.callback_func: + from asyncio import iscoroutinefunction + + if iscoroutinefunction(self.callback_func): + await self.callback_func(self.elapsed_ms, exc) + else: + self.callback_func(self.elapsed_ms, exc) + return False + + @property + def elapsed_ms(self): + if self.elapsed_ns is not None: + return ns_to_ms(self.elapsed_ns) + return None diff --git a/letta/services/helpers/noop_helper.py b/letta/helpers/singleton.py similarity index 73% rename from letta/services/helpers/noop_helper.py rename to letta/helpers/singleton.py index 7f32e628..1d382457 100644 --- a/letta/services/helpers/noop_helper.py +++ b/letta/helpers/singleton.py @@ -1,7 +1,12 @@ +# TODO (cliandy): consolidate with decorators later +from functools import wraps + + def singleton(cls): """Decorator to make a class a Singleton class.""" instances = {} + @wraps(cls) def get_instance(*args, **kwargs): if cls not in instances: instances[cls] = cls(*args, **kwargs) diff --git a/letta/interfaces/anthropic_streaming_interface.py b/letta/interfaces/anthropic_streaming_interface.py index 16622093..65489166 100644 --- a/letta/interfaces/anthropic_streaming_interface.py +++ b/letta/interfaces/anthropic_streaming_interface.py @@ -23,7 +23,7 @@ from anthropic.types.beta import ( ) from letta.constants import DEFAULT_MESSAGE_TOOL, DEFAULT_MESSAGE_TOOL_KWARG -from letta.helpers.datetime_helpers import get_utc_timestamp_ns +from letta.helpers.datetime_helpers import get_utc_timestamp_ns, ns_to_ms from letta.local_llm.constants import INNER_THOUGHTS_KWARG from letta.log import get_logger from letta.schemas.letta_message import ( @@ -131,7 +131,7 @@ class AnthropicStreamingInterface: now = get_utc_timestamp_ns() ttft_ns = now - provider_request_start_timestamp_ns ttft_span.add_event( - name="anthropic_time_to_first_token_ms", attributes={"anthropic_time_to_first_token_ms": ttft_ns // 1_000_000} + name="anthropic_time_to_first_token_ms", attributes={"anthropic_time_to_first_token_ms": ns_to_ms(ttft_ns)} ) first_chunk = False diff --git a/letta/interfaces/openai_streaming_interface.py b/letta/interfaces/openai_streaming_interface.py index 33ac6051..36c583c6 100644 --- a/letta/interfaces/openai_streaming_interface.py +++ b/letta/interfaces/openai_streaming_interface.py @@ -5,7 +5,7 @@ from openai import AsyncStream from openai.types.chat.chat_completion_chunk import ChatCompletionChunk from letta.constants import DEFAULT_MESSAGE_TOOL, DEFAULT_MESSAGE_TOOL_KWARG -from letta.helpers.datetime_helpers import get_utc_timestamp_ns +from letta.helpers.datetime_helpers import get_utc_timestamp_ns, ns_to_ms from letta.schemas.letta_message import AssistantMessage, LettaMessage, ReasoningMessage, ToolCallDelta, ToolCallMessage from letta.schemas.letta_message_content import TextContent from letta.schemas.message import Message @@ -85,7 +85,7 @@ class OpenAIStreamingInterface: now = get_utc_timestamp_ns() ttft_ns = now - provider_request_start_timestamp_ns ttft_span.add_event( - name="openai_time_to_first_token_ms", attributes={"openai_time_to_first_token_ms": ttft_ns // 1_000_000} + name="openai_time_to_first_token_ms", attributes={"openai_time_to_first_token_ms": ns_to_ms(ttft_ns)} ) first_chunk = False diff --git a/letta/llm_api/anthropic.py b/letta/llm_api/anthropic.py index fadc652d..7d9cdf73 100644 --- a/letta/llm_api/anthropic.py +++ b/letta/llm_api/anthropic.py @@ -26,6 +26,7 @@ from letta.llm_api.helpers import add_inner_thoughts_to_functions from letta.local_llm.constants import INNER_THOUGHTS_KWARG, INNER_THOUGHTS_KWARG_DESCRIPTION from letta.local_llm.utils import num_tokens_from_functions, num_tokens_from_messages from letta.log import get_logger +from letta.otel.tracing import log_event from letta.schemas.enums import ProviderCategory from letta.schemas.message import Message as _Message from letta.schemas.message import MessageRole as _MessageRole @@ -45,7 +46,6 @@ from letta.services.provider_manager import ProviderManager from letta.services.user_manager import UserManager from letta.settings import model_settings from letta.streaming_interface import AgentChunkStreamingInterface, AgentRefreshStreamingInterface -from letta.tracing import log_event logger = get_logger(__name__) diff --git a/letta/llm_api/anthropic_client.py b/letta/llm_api/anthropic_client.py index 3a60199a..121955d7 100644 --- a/letta/llm_api/anthropic_client.py +++ b/letta/llm_api/anthropic_client.py @@ -27,6 +27,7 @@ from letta.llm_api.helpers import add_inner_thoughts_to_functions, unpack_all_in from letta.llm_api.llm_client_base import LLMClientBase from letta.local_llm.constants import INNER_THOUGHTS_KWARG, INNER_THOUGHTS_KWARG_DESCRIPTION from letta.log import get_logger +from letta.otel.tracing import trace_method from letta.schemas.enums import ProviderCategory from letta.schemas.llm_config import LLMConfig from letta.schemas.message import Message as PydanticMessage @@ -36,7 +37,6 @@ from letta.schemas.openai.chat_completion_response import Message as ChoiceMessa from letta.schemas.openai.chat_completion_response import ToolCall, UsageStatistics from letta.services.provider_manager import ProviderManager from letta.settings import model_settings -from letta.tracing import trace_method DUMMY_FIRST_USER_MESSAGE = "User initializing bootup sequence." diff --git a/letta/llm_api/google_vertex_client.py b/letta/llm_api/google_vertex_client.py index 60e800a6..25b1e00f 100644 --- a/letta/llm_api/google_vertex_client.py +++ b/letta/llm_api/google_vertex_client.py @@ -12,12 +12,12 @@ from letta.llm_api.llm_client_base import LLMClientBase from letta.local_llm.json_parser import clean_json_string_extra_backslash from letta.local_llm.utils import count_tokens from letta.log import get_logger +from letta.otel.tracing import trace_method from letta.schemas.llm_config import LLMConfig from letta.schemas.message import Message as PydanticMessage from letta.schemas.openai.chat_completion_request import Tool from letta.schemas.openai.chat_completion_response import ChatCompletionResponse, Choice, FunctionCall, Message, ToolCall, UsageStatistics from letta.settings import model_settings, settings -from letta.tracing import trace_method from letta.utils import get_tool_call_id logger = get_logger(__name__) diff --git a/letta/llm_api/llm_api_tools.py b/letta/llm_api/llm_api_tools.py index a1af262f..62d3bf3b 100644 --- a/letta/llm_api/llm_api_tools.py +++ b/letta/llm_api/llm_api_tools.py @@ -26,6 +26,7 @@ from letta.local_llm.chat_completion_proxy import get_chat_completion from letta.local_llm.constants import INNER_THOUGHTS_KWARG, INNER_THOUGHTS_KWARG_DESCRIPTION from letta.local_llm.utils import num_tokens_from_functions, num_tokens_from_messages from letta.orm.user import User +from letta.otel.tracing import log_event, trace_method from letta.schemas.enums import ProviderCategory from letta.schemas.llm_config import LLMConfig from letta.schemas.message import Message @@ -35,7 +36,6 @@ from letta.schemas.provider_trace import ProviderTraceCreate from letta.services.telemetry_manager import TelemetryManager from letta.settings import ModelSettings from letta.streaming_interface import AgentChunkStreamingInterface, AgentRefreshStreamingInterface -from letta.tracing import log_event, trace_method LLM_API_PROVIDER_OPTIONS = ["openai", "azure", "anthropic", "google_ai", "cohere", "local", "groq", "deepseek"] diff --git a/letta/llm_api/llm_client_base.py b/letta/llm_api/llm_client_base.py index e88b6081..bd4054f4 100644 --- a/letta/llm_api/llm_client_base.py +++ b/letta/llm_api/llm_client_base.py @@ -6,13 +6,13 @@ from openai import AsyncStream, Stream from openai.types.chat.chat_completion_chunk import ChatCompletionChunk from letta.errors import LLMError +from letta.otel.tracing import log_event, trace_method from letta.schemas.embedding_config import EmbeddingConfig from letta.schemas.llm_config import LLMConfig from letta.schemas.message import Message from letta.schemas.openai.chat_completion_response import ChatCompletionResponse from letta.schemas.provider_trace import ProviderTraceCreate from letta.services.telemetry_manager import TelemetryManager -from letta.tracing import log_event, trace_method if TYPE_CHECKING: from letta.orm import User diff --git a/letta/llm_api/openai.py b/letta/llm_api/openai.py index fe3b77cc..7372b1d0 100644 --- a/letta/llm_api/openai.py +++ b/letta/llm_api/openai.py @@ -19,6 +19,7 @@ from letta.llm_api.openai_client import ( from letta.local_llm.constants import INNER_THOUGHTS_KWARG, INNER_THOUGHTS_KWARG_DESCRIPTION, INNER_THOUGHTS_KWARG_DESCRIPTION_GO_FIRST from letta.local_llm.utils import num_tokens_from_functions, num_tokens_from_messages from letta.log import get_logger +from letta.otel.tracing import log_event from letta.schemas.llm_config import LLMConfig from letta.schemas.message import Message as _Message from letta.schemas.message import MessageRole as _MessageRole @@ -36,7 +37,6 @@ from letta.schemas.openai.chat_completion_response import ( ) from letta.schemas.openai.embedding_response import EmbeddingResponse from letta.streaming_interface import AgentChunkStreamingInterface, AgentRefreshStreamingInterface -from letta.tracing import log_event from letta.utils import get_tool_call_id, smart_urljoin logger = get_logger(__name__) diff --git a/letta/llm_api/openai_client.py b/letta/llm_api/openai_client.py index 7be16835..737466e8 100644 --- a/letta/llm_api/openai_client.py +++ b/letta/llm_api/openai_client.py @@ -23,6 +23,7 @@ from letta.llm_api.helpers import add_inner_thoughts_to_functions, convert_to_st from letta.llm_api.llm_client_base import LLMClientBase from letta.local_llm.constants import INNER_THOUGHTS_KWARG, INNER_THOUGHTS_KWARG_DESCRIPTION, INNER_THOUGHTS_KWARG_DESCRIPTION_GO_FIRST from letta.log import get_logger +from letta.otel.tracing import trace_method from letta.schemas.embedding_config import EmbeddingConfig from letta.schemas.enums import ProviderCategory, ProviderType from letta.schemas.llm_config import LLMConfig @@ -34,7 +35,6 @@ from letta.schemas.openai.chat_completion_request import Tool as OpenAITool from letta.schemas.openai.chat_completion_request import ToolFunctionChoice, cast_message_to_subtype from letta.schemas.openai.chat_completion_response import ChatCompletionResponse from letta.settings import model_settings -from letta.tracing import trace_method logger = get_logger(__name__) diff --git a/letta/local_llm/chat_completion_proxy.py b/letta/local_llm/chat_completion_proxy.py index 35db97ed..214e0487 100644 --- a/letta/local_llm/chat_completion_proxy.py +++ b/letta/local_llm/chat_completion_proxy.py @@ -20,9 +20,9 @@ from letta.local_llm.utils import count_tokens, get_available_wrappers from letta.local_llm.vllm.api import get_vllm_completion from letta.local_llm.webui.api import get_webui_completion from letta.local_llm.webui.legacy_api import get_webui_completion as get_webui_completion_legacy +from letta.otel.tracing import log_event from letta.prompts.gpt_summarize import SYSTEM as SUMMARIZE_SYSTEM_MESSAGE from letta.schemas.openai.chat_completion_response import ChatCompletionResponse, Choice, Message, ToolCall, UsageStatistics -from letta.tracing import log_event from letta.utils import get_tool_call_id has_shown_warning = False diff --git a/letta/memory.py b/letta/memory.py index 818f45ca..c6a03c14 100644 --- a/letta/memory.py +++ b/letta/memory.py @@ -3,6 +3,7 @@ from typing import TYPE_CHECKING, Callable, Dict, List from letta.constants import MESSAGE_SUMMARY_REQUEST_ACK from letta.llm_api.llm_api_tools import create from letta.llm_api.llm_client import LLMClient +from letta.otel.tracing import trace_method from letta.prompts.gpt_summarize import SYSTEM as SUMMARY_PROMPT_SYSTEM from letta.schemas.agent import AgentState from letta.schemas.enums import MessageRole @@ -10,7 +11,6 @@ from letta.schemas.letta_message_content import TextContent from letta.schemas.memory import Memory from letta.schemas.message import Message from letta.settings import summarizer_settings -from letta.tracing import trace_method from letta.utils import count_tokens, printd if TYPE_CHECKING: diff --git a/letta/otel/__init__.py b/letta/otel/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/letta/otel/context.py b/letta/otel/context.py new file mode 100644 index 00000000..4c83b98c --- /dev/null +++ b/letta/otel/context.py @@ -0,0 +1,26 @@ +from contextvars import ContextVar +from typing import Any, Dict + +# Create context var at module level (outside middleware) +request_attributes: ContextVar[Dict[str, Any]] = ContextVar("request_attributes", default={}) + + +# Helper functions +def set_ctx_attributes(attrs: Dict[str, Any]): + """Set attributes in current context""" + current = request_attributes.get() + new_attrs = {**current, **attrs} + request_attributes.set(new_attrs) + + +def add_ctx_attribute(key: str, value: Any): + """Add single attribute to current context""" + current = request_attributes.get() + new_attrs = {**current, key: value} + request_attributes.set(new_attrs) + + +def get_ctx_attributes() -> Dict[str, Any]: + """Get all attributes from current context""" + print(request_attributes.get()) + return request_attributes.get() diff --git a/letta/otel/events.py b/letta/otel/events.py new file mode 100644 index 00000000..e69de29b diff --git a/letta/otel/metric_registry.py b/letta/otel/metric_registry.py new file mode 100644 index 00000000..60ece713 --- /dev/null +++ b/letta/otel/metric_registry.py @@ -0,0 +1,122 @@ +from dataclasses import dataclass, field +from functools import partial + +from opentelemetry import metrics +from opentelemetry.metrics import Counter, Histogram + +from letta.helpers.singleton import singleton +from letta.otel.metrics import get_letta_meter + + +@singleton +@dataclass(frozen=True) +class MetricRegistry: + """Registry of all application metrics + + Metrics are composed of the following: + - name + - description + - unit: UCUM unit of the metric (i.e. 'By' for bytes, 'ms' for milliseconds, '1' for count + - bucket_bounds (list[float] | None): the explicit bucket bounds for histogram metrics + + and instruments are of types Counter, Histogram, and Gauge + + The relationship between the various models is as follows: + project_id -N:1-> base_template_id -N:1-> template_id -N:1-> agent_id + agent_id -1:1+-> model_name + agent_id -1:N -> tool_name + """ + + Instrument = Counter | Histogram + _metrics: dict[str, Instrument] = field(default_factory=dict, init=False) + _meter: metrics.Meter = field(init=False) + + def __post_init__(self): + object.__setattr__(self, "_meter", get_letta_meter()) + + def _get_or_create_metric(self, name: str, factory): + """Lazy initialization of metrics.""" + if name not in self._metrics: + self._metrics[name] = factory() + return self._metrics[name] + + # (includes base attributes: project, template_base, template, agent) + @property + def user_message_counter(self) -> Counter: + return self._get_or_create_metric( + "count_user_message", + partial( + self._meter.create_counter, + name="count_user_message", + description="Counts the number of messages sent by the user", + unit="1", + ), + ) + + # (includes tool_name, tool_execution_success, & step_id on failure) + @property + def tool_execution_counter(self) -> Counter: + return self._get_or_create_metric( + "count_tool_execution", + partial(self._meter.create_counter, name="count_tool_execution", description="Counts the number of tools executed.", unit="1"), + ) + + # project_id + model + @property + def ttft_ms_histogram(self) -> Histogram: + return self._get_or_create_metric( + "hist_ttft_ms", + partial(self._meter.create_histogram, name="hist_ttft_ms", description="Histogram for the Time to First Token (ms)", unit="ms"), + ) + + # (includes model name) + @property + def llm_execution_time_ms_histogram(self) -> Histogram: + return self._get_or_create_metric( + "hist_llm_execution_time_ms", + partial( + self._meter.create_histogram, + name="hist_llm_execution_time_ms", + description="Histogram for LLM execution time (ms)", + unit="ms", + ), + ) + + # (includes tool name) + @property + def tool_execution_time_ms_histogram(self) -> Histogram: + return self._get_or_create_metric( + "hist_tool_execution_time_ms", + partial( + self._meter.create_histogram, + name="hist_tool_execution_time_ms", + description="Histogram for tool execution time (ms)", + unit="ms", + ), + ) + + # TODO (cliandy): instrument this + @property + def message_cost(self) -> Histogram: + return self._get_or_create_metric( + "hist_message_cost_usd", + partial( + self._meter.create_histogram, + name="hist_message_cost_usd", + description="Histogram for cost of messages (usd) per step", + unit="usd", + ), + ) + + # (includes model name) + @property + def message_output_tokens(self) -> Histogram: + return self._get_or_create_metric( + "hist_message_output_tokens", + partial( + self._meter.create_histogram, + name="hist_message_output_tokens", + description="Histogram for output tokens generated by LLM per step", + unit="1", + ), + ) diff --git a/letta/otel/metrics.py b/letta/otel/metrics.py new file mode 100644 index 00000000..8e11f291 --- /dev/null +++ b/letta/otel/metrics.py @@ -0,0 +1,66 @@ +from fastapi import FastAPI, Request +from opentelemetry import metrics +from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter +from opentelemetry.metrics import NoOpMeter +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader + +from letta.log import get_logger +from letta.otel.context import add_ctx_attribute +from letta.otel.resource import get_resource, is_pytest_environment + +logger = get_logger(__name__) + +_meter: metrics.Meter = NoOpMeter("noop") +_is_metrics_initialized: bool = False + + +async def _otel_metric_middleware(request: Request, call_next): + if not _is_metrics_initialized: + return await call_next(request) + + header_attributes = { + "x-organization-id": "organization.id", + "x-project-id": "project.id", + "x-base-template-id": "base_template.id", + "x-template-id": "template.id", + "x-agent-id": "agent.id", + } + try: + for header_key, otel_key in header_attributes.items(): + header_value = request.headers.get(header_key) + if header_value: + add_ctx_attribute(otel_key, header_value) + return await call_next(request) + except Exception: + raise + + +def setup_metrics( + endpoint: str, + app: FastAPI | None = None, + service_name: str = "memgpt-server", +) -> None: + if is_pytest_environment(): + return + assert endpoint + + global _is_metrics_initialized, _meter + + otlp_metric_exporter = OTLPMetricExporter(endpoint=endpoint) + metric_reader = PeriodicExportingMetricReader(exporter=otlp_metric_exporter) + meter_provider = MeterProvider(resource=get_resource(service_name), metric_readers=[metric_reader]) + metrics.set_meter_provider(meter_provider) + _meter = metrics.get_meter(__name__) + + if app: + app.middleware("http")(_otel_metric_middleware) + + _is_metrics_initialized = True + + +def get_letta_meter() -> metrics.Meter | None: + """Returns the global letta meter if metrics are initialized.""" + if not _is_metrics_initialized or isinstance(_meter, NoOpMeter): + logger.warning("Metrics are not initialized or meter is not available.") + return _meter diff --git a/letta/otel/resource.py b/letta/otel/resource.py new file mode 100644 index 00000000..d11e3104 --- /dev/null +++ b/letta/otel/resource.py @@ -0,0 +1,26 @@ +import os +import sys +import uuid + +from opentelemetry.sdk.resources import Resource + +from letta import __version__ as letta_version + +_resources = {} + + +def get_resource(service_name: str) -> Resource: + _env = os.getenv("LETTA_ENVIRONMENT") + if service_name not in _resources: + resource_dict = { + "service.name": service_name, + "letta.version": letta_version, + } + if _env != "PRODUCTION": + resource_dict["device.id"] = uuid.getnode() # MAC address as unique device identifier, + _resources[(service_name, _env)] = Resource.create(resource_dict) + return _resources[(service_name, _env)] + + +def is_pytest_environment(): + return "pytest" in sys.modules diff --git a/letta/tracing.py b/letta/otel/tracing.py similarity index 66% rename from letta/tracing.py rename to letta/otel/tracing.py index de3e4c5e..05cd485a 100644 --- a/letta/tracing.py +++ b/letta/otel/tracing.py @@ -1,6 +1,5 @@ import inspect import re -import sys import time from functools import wraps from typing import Any, Dict, List, Optional @@ -11,15 +10,18 @@ from fastapi.responses import JSONResponse from opentelemetry import trace from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.instrumentation.requests import RequestsInstrumentor -from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.trace import Status, StatusCode -from letta import __version__ as letta_version +from letta.log import get_logger +from letta.otel.resource import get_resource, is_pytest_environment +from letta.settings import settings +logger = get_logger(__name__) # TODO: set up logger config for this tracer = trace.get_tracer(__name__) _is_tracing_initialized = False + _excluded_v1_endpoints_regex: List[str] = [ # "^GET /v1/agents/(?P[^/]+)/messages$", # "^GET /v1/agents/(?P[^/]+)/context$", @@ -30,11 +32,7 @@ _excluded_v1_endpoints_regex: List[str] = [ ] -def is_pytest_environment(): - return "pytest" in sys.modules - - -async def trace_request_middleware(request: Request, call_next): +async def _trace_request_middleware(request: Request, call_next): if not _is_tracing_initialized: return await call_next(request) initial_span_name = f"{request.method} {request.url.path}" @@ -56,7 +54,7 @@ async def trace_request_middleware(request: Request, call_next): raise -async def update_trace_attributes(request: Request): +async def _update_trace_attributes(request: Request): """Dependency to update trace attributes after FastAPI has processed the request""" if not _is_tracing_initialized: return @@ -78,35 +76,19 @@ async def update_trace_attributes(request: Request): for key, value in request.path_params.items(): span.set_attribute(f"http.{key}", value) - # Add user ID if available - user_id = request.headers.get("user_id") - if user_id: - span.set_attribute("user.id", user_id) - - # Add organization_id if available - organization_id = request.headers.get("x-organization-id") - if organization_id: - span.set_attribute("organization.id", organization_id) - - # Add project_id if available - project_id = request.headers.get("x-project-id") - if project_id: - span.set_attribute("project.id", project_id) - - # Add agent_id if available - agent_id = request.headers.get("x-agent-id") - if agent_id: - span.set_attribute("agent.id", agent_id) - - # Add template_id if available - template_id = request.headers.get("x-template-id") - if template_id: - span.set_attribute("template.id", template_id) - - # Add base_template_id if available - base_template_id = request.headers.get("x-base-template-id") - if base_template_id: - span.set_attribute("base_template.id", base_template_id) + # Add the following headers to span if available + header_attributes = { + "user_id": "user.id", + "x-organization-id": "organization.id", + "x-project-id": "project.id", + "x-agent-id": "agent.id", + "x-template-id": "template.id", + "x-base-template-id": "base_template.id", + } + for header_key, span_key in header_attributes.items(): + header_value = request.headers.get(header_key) + if header_value: + span.set_attribute(span_key, header_value) # Add request body if available try: @@ -117,7 +99,7 @@ async def update_trace_attributes(request: Request): pass -async def trace_error_handler(_request: Request, exc: Exception) -> JSONResponse: +async def _trace_error_handler(_request: Request, exc: Exception) -> JSONResponse: status_code = getattr(exc, "status_code", 500) error_msg = str(exc) @@ -142,49 +124,44 @@ def setup_tracing( ) -> None: if is_pytest_environment(): return + assert endpoint global _is_tracing_initialized - provider = TracerProvider(resource=Resource.create({"service.name": service_name})) - import uuid + tracer_provider = TracerProvider(resource=get_resource(service_name)) + tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint=endpoint))) + _is_tracing_initialized = True + trace.set_tracer_provider(tracer_provider) - provider = TracerProvider( - resource=Resource.create( - { - "service.name": service_name, - "device.id": uuid.getnode(), # MAC address as unique device identifier, - "letta.version": letta_version, - } - ) - ) - if endpoint: - provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint=endpoint))) - _is_tracing_initialized = True - trace.set_tracer_provider(provider) + # Instrumentors (e.g., RequestsInstrumentor) + def requests_callback(span: trace.Span, _: Any, response: Any) -> None: + if hasattr(response, "status_code"): + span.set_status(Status(StatusCode.OK if response.status_code < 400 else StatusCode.ERROR)) - def requests_callback(span: trace.Span, _: Any, response: Any) -> None: - if hasattr(response, "status_code"): - span.set_status(Status(StatusCode.OK if response.status_code < 400 else StatusCode.ERROR)) + RequestsInstrumentor().instrument(response_hook=requests_callback) - RequestsInstrumentor().instrument(response_hook=requests_callback) + if settings.sqlalchemy_tracing: + from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor - if app: - # Add middleware first - app.middleware("http")(trace_request_middleware) + SQLAlchemyInstrumentor().instrument() - # Add dependency to v1 routes - from letta.server.rest_api.routers.v1 import ROUTERS as v1_routes + if app: + # Add middleware first + app.middleware("http")(_trace_request_middleware) - for router in v1_routes: - for route in router.routes: - full_path = ((next(iter(route.methods)) + " ") if route.methods else "") + "/v1" + route.path - if not any(re.match(regex, full_path) for regex in _excluded_v1_endpoints_regex): - route.dependencies.append(Depends(update_trace_attributes)) + # Add dependency to v1 routes + from letta.server.rest_api.routers.v1 import ROUTERS as V1_ROUTES - # Register exception handlers - app.exception_handler(HTTPException)(trace_error_handler) - app.exception_handler(RequestValidationError)(trace_error_handler) - app.exception_handler(Exception)(trace_error_handler) + for router in V1_ROUTES: + for route in router.routes: + full_path = ((next(iter(route.methods)) + " ") if route.methods else "") + "/v1" + route.path + if not any(re.match(regex, full_path) for regex in _excluded_v1_endpoints_regex): + route.dependencies.append(Depends(_update_trace_attributes)) + + # Register exception handlers for tracing + app.exception_handler(HTTPException)(_trace_error_handler) + app.exception_handler(RequestValidationError)(_trace_error_handler) + app.exception_handler(Exception)(_trace_error_handler) def trace_method(func): diff --git a/letta/server/db.py b/letta/server/db.py index f50b0988..cf423c59 100644 --- a/letta/server/db.py +++ b/letta/server/db.py @@ -13,8 +13,8 @@ from sqlalchemy.orm import sessionmaker from letta.config import LettaConfig from letta.log import get_logger +from letta.otel.tracing import trace_method from letta.settings import settings -from letta.tracing import trace_method logger = get_logger(__name__) diff --git a/letta/server/rest_api/app.py b/letta/server/rest_api/app.py index 944fc2bd..d7fecc74 100644 --- a/letta/server/rest_api/app.py +++ b/letta/server/rest_api/app.py @@ -256,13 +256,15 @@ def create_application() -> "FastAPI": print(f"▶ Using OTLP tracing with endpoint: {otlp_endpoint}") env_name_suffix = os.getenv("ENV_NAME") service_name = f"letta-server-{env_name_suffix.lower()}" if env_name_suffix else "letta-server" - from letta.tracing import setup_tracing + from letta.otel.metrics import setup_metrics + from letta.otel.tracing import setup_tracing setup_tracing( endpoint=otlp_endpoint, app=app, service_name=service_name, ) + setup_metrics(endpoint=otlp_endpoint, app=app, service_name=service_name) for route in v1_routes: app.include_router(route, prefix=API_PREFIX) @@ -339,14 +341,14 @@ def start_server( target="letta.server.rest_api.app:app", # factory=True, interface="asgi", - address=host or "localhost", + address=host or "127.0.0.1", # Note granian address must be an ip address port=port or REST_DEFAULT_PORT, workers=settings.uvicorn_workers, # threads= reload=reload or settings.uvicorn_reload, reload_ignore_patterns=["openapi_letta.json"], reload_ignore_worker_failure=True, - reload_tick=100, + reload_tick=4000, # set to 4s to prevent crashing on weird state # log_level="info" ssl_keyfile="certs/localhost-key.pem", ssl_cert="certs/localhost.pem", @@ -380,14 +382,14 @@ def start_server( target="letta.server.rest_api.app:app", # factory=True, interface="asgi", - address=host or "localhost", + address=host or "127.0.0.1", # Note granian address must be an ip address port=port or REST_DEFAULT_PORT, workers=settings.uvicorn_workers, # threads= reload=reload or settings.uvicorn_reload, reload_ignore_patterns=["openapi_letta.json"], reload_ignore_worker_failure=True, - reload_tick=100, + reload_tick=4000, # set to 4s to prevent crashing on weird state # log_level="info" ).serve() else: diff --git a/letta/server/rest_api/routers/v1/agents.py b/letta/server/rest_api/routers/v1/agents.py index fc1dbfca..5d1873c0 100644 --- a/letta/server/rest_api/routers/v1/agents.py +++ b/letta/server/rest_api/routers/v1/agents.py @@ -17,6 +17,8 @@ from letta.groups.sleeptime_multi_agent_v2 import SleeptimeMultiAgentV2 from letta.helpers.datetime_helpers import get_utc_timestamp_ns from letta.log import get_logger from letta.orm.errors import NoResultFound +from letta.otel.context import get_ctx_attributes +from letta.otel.metric_registry import MetricRegistry from letta.schemas.agent import AgentState, AgentType, CreateAgent, UpdateAgent from letta.schemas.block import Block, BlockUpdate from letta.schemas.group import Group @@ -663,6 +665,8 @@ async def send_message( Process a user message and return the agent's response. This endpoint accepts a message from a user and processes it through the agent. """ + MetricRegistry().user_message_counter.add(1, get_ctx_attributes()) + actor = await server.user_manager.get_actor_or_default_async(actor_id=actor_id) request_start_timestamp_ns = get_utc_timestamp_ns() # TODO: This is redundant, remove soon @@ -741,7 +745,8 @@ async def send_message_streaming( This endpoint accepts a message from a user and processes it through the agent. It will stream the steps of the response always, and stream the tokens if 'stream_tokens' is set to True. """ - request_start_timestamp_ns = get_utc_timestamp_ns() + MetricRegistry().user_message_counter.add(1, get_ctx_attributes()) + actor = await server.user_manager.get_actor_or_default_async(actor_id=actor_id) # TODO: This is redundant, remove soon agent = await server.agent_manager.get_agent_by_id_async(agent_id, actor, include_relationships=["multi_agent_group"]) @@ -877,6 +882,7 @@ async def send_message_async( Asynchronously process a user message and return a run object. The actual processing happens in the background, and the status can be checked using the run ID. """ + MetricRegistry().user_message_counter.add(1, get_ctx_attributes()) actor = await server.user_manager.get_actor_or_default_async(actor_id=actor_id) # Create a new job diff --git a/letta/server/rest_api/utils.py b/letta/server/rest_api/utils.py index cd00af35..f5ba0cbd 100644 --- a/letta/server/rest_api/utils.py +++ b/letta/server/rest_api/utils.py @@ -15,9 +15,12 @@ from pydantic import BaseModel from letta.constants import DEFAULT_MESSAGE_TOOL, DEFAULT_MESSAGE_TOOL_KWARG, FUNC_FAILED_HEARTBEAT_MESSAGE, REQ_HEARTBEAT_MESSAGE from letta.errors import ContextWindowExceededError, RateLimitExceededError -from letta.helpers.datetime_helpers import get_utc_time, get_utc_timestamp_ns +from letta.helpers.datetime_helpers import get_utc_time, get_utc_timestamp_ns, ns_to_ms from letta.helpers.message_helper import convert_message_creates_to_messages from letta.log import get_logger +from letta.otel.context import get_ctx_attributes +from letta.otel.metric_registry import MetricRegistry +from letta.otel.tracing import tracer from letta.schemas.enums import MessageRole from letta.schemas.letta_message_content import OmittedReasoningContent, ReasoningContent, RedactedReasoningContent, TextContent from letta.schemas.llm_config import LLMConfig @@ -27,7 +30,6 @@ from letta.schemas.usage import LettaUsageStatistics from letta.schemas.user import User from letta.server.rest_api.interface import StreamingServerInterface from letta.system import get_heartbeat, package_function_response -from letta.tracing import tracer if TYPE_CHECKING: from letta.server.server import SyncServer @@ -81,8 +83,12 @@ async def sse_async_generator( if first_chunk and ttft_span is not None: now = get_utc_timestamp_ns() ttft_ns = now - request_start_timestamp_ns - ttft_span.add_event(name="time_to_first_token_ms", attributes={"ttft_ms": ttft_ns // 1_000_000}) + ttft_span.add_event(name="time_to_first_token_ms", attributes={"ttft_ms": ns_to_ms(ttft_ns)}) ttft_span.end() + metric_attributes = get_ctx_attributes() + if llm_config: + metric_attributes["model.name"] = llm_config.model + MetricRegistry().ttft_ms_histogram.record(ns_to_ms(ttft_ns), metric_attributes) first_chunk = False # yield f"data: {json.dumps(chunk)}\n\n" diff --git a/letta/server/server.py b/letta/server/server.py index 347a9735..8ee45339 100644 --- a/letta/server/server.py +++ b/letta/server/server.py @@ -34,6 +34,7 @@ from letta.interface import AgentInterface # abstract from letta.interface import CLIInterface # for printing to terminal from letta.log import get_logger from letta.orm.errors import NoResultFound +from letta.otel.tracing import log_event, trace_method from letta.prompts.gpt_system import get_system_text from letta.schemas.agent import AgentState, AgentType, CreateAgent, UpdateAgent from letta.schemas.block import Block, BlockUpdate, CreateBlock @@ -101,7 +102,6 @@ from letta.services.tool_executor.tool_execution_manager import ToolExecutionMan from letta.services.tool_manager import ToolManager from letta.services.user_manager import UserManager from letta.settings import model_settings, settings, tool_settings -from letta.tracing import log_event, trace_method from letta.utils import get_friendly_error_msg, get_persona_text, make_key config = LettaConfig.load() diff --git a/letta/services/agent_manager.py b/letta/services/agent_manager.py index 2f6c5612..0ceb9f58 100644 --- a/letta/services/agent_manager.py +++ b/letta/services/agent_manager.py @@ -37,6 +37,7 @@ from letta.orm.errors import NoResultFound from letta.orm.sandbox_config import AgentEnvironmentVariable from letta.orm.sandbox_config import AgentEnvironmentVariable as AgentEnvironmentVariableModel from letta.orm.sqlalchemy_base import AccessType +from letta.otel.tracing import trace_method from letta.schemas.agent import AgentState as PydanticAgentState from letta.schemas.agent import AgentType, CreateAgent, UpdateAgent, get_prompt_template_for_agent_type from letta.schemas.block import DEFAULT_BLOCKS @@ -86,7 +87,6 @@ from letta.services.message_manager import MessageManager from letta.services.passage_manager import PassageManager from letta.services.source_manager import SourceManager from letta.services.tool_manager import ToolManager -from letta.tracing import trace_method from letta.utils import enforce_types, united_diff logger = get_logger(__name__) diff --git a/letta/services/block_manager.py b/letta/services/block_manager.py index ce731125..12156cd4 100644 --- a/letta/services/block_manager.py +++ b/letta/services/block_manager.py @@ -9,12 +9,12 @@ from letta.orm.block import Block as BlockModel from letta.orm.block_history import BlockHistory from letta.orm.enums import ActorType from letta.orm.errors import NoResultFound +from letta.otel.tracing import trace_method from letta.schemas.agent import AgentState as PydanticAgentState from letta.schemas.block import Block as PydanticBlock from letta.schemas.block import BlockUpdate from letta.schemas.user import User as PydanticUser from letta.server.db import db_registry -from letta.tracing import trace_method from letta.utils import enforce_types logger = get_logger(__name__) diff --git a/letta/services/files_agents_manager.py b/letta/services/files_agents_manager.py index cb174864..a9bb7a6d 100644 --- a/letta/services/files_agents_manager.py +++ b/letta/services/files_agents_manager.py @@ -5,11 +5,11 @@ from sqlalchemy import and_, func, select, update from letta.orm.errors import NoResultFound from letta.orm.files_agents import FileAgent as FileAgentModel +from letta.otel.tracing import trace_method from letta.schemas.block import Block as PydanticBlock from letta.schemas.file import FileAgent as PydanticFileAgent from letta.schemas.user import User as PydanticUser from letta.server.db import db_registry -from letta.tracing import trace_method from letta.utils import enforce_types diff --git a/letta/services/group_manager.py b/letta/services/group_manager.py index 1884e5d0..57f532fb 100644 --- a/letta/services/group_manager.py +++ b/letta/services/group_manager.py @@ -7,13 +7,13 @@ from letta.orm.agent import Agent as AgentModel from letta.orm.errors import NoResultFound from letta.orm.group import Group as GroupModel from letta.orm.message import Message as MessageModel +from letta.otel.tracing import trace_method from letta.schemas.group import Group as PydanticGroup from letta.schemas.group import GroupCreate, GroupUpdate, ManagerType from letta.schemas.letta_message import LettaMessage from letta.schemas.message import Message as PydanticMessage from letta.schemas.user import User as PydanticUser from letta.server.db import db_registry -from letta.tracing import trace_method from letta.utils import enforce_types diff --git a/letta/services/helpers/agent_manager_helper.py b/letta/services/helpers/agent_manager_helper.py index 3513a89b..7d83b1ab 100644 --- a/letta/services/helpers/agent_manager_helper.py +++ b/letta/services/helpers/agent_manager_helper.py @@ -16,6 +16,7 @@ from letta.orm.agents_tags import AgentsTags from letta.orm.errors import NoResultFound from letta.orm.identity import Identity from letta.orm.sqlite_functions import adapt_array +from letta.otel.tracing import trace_method from letta.prompts import gpt_system from letta.schemas.agent import AgentState, AgentType from letta.schemas.embedding_config import EmbeddingConfig @@ -27,7 +28,6 @@ from letta.schemas.tool_rule import ToolRule from letta.schemas.user import User from letta.settings import settings from letta.system import get_initial_boot_messages, get_login_event, package_function_response -from letta.tracing import trace_method # Static methods diff --git a/letta/services/identity_manager.py b/letta/services/identity_manager.py index f91854fd..abf637ba 100644 --- a/letta/services/identity_manager.py +++ b/letta/services/identity_manager.py @@ -7,11 +7,11 @@ from sqlalchemy.exc import NoResultFound from letta.orm.agent import Agent as AgentModel from letta.orm.block import Block as BlockModel from letta.orm.identity import Identity as IdentityModel +from letta.otel.tracing import trace_method from letta.schemas.identity import Identity as PydanticIdentity from letta.schemas.identity import IdentityCreate, IdentityProperty, IdentityType, IdentityUpdate, IdentityUpsert from letta.schemas.user import User as PydanticUser from letta.server.db import db_registry -from letta.tracing import trace_method from letta.utils import enforce_types diff --git a/letta/services/job_manager.py b/letta/services/job_manager.py index 11361b35..541d7514 100644 --- a/letta/services/job_manager.py +++ b/letta/services/job_manager.py @@ -14,6 +14,7 @@ from letta.orm.message import Message as MessageModel from letta.orm.sqlalchemy_base import AccessType from letta.orm.step import Step from letta.orm.step import Step as StepModel +from letta.otel.tracing import trace_method from letta.schemas.enums import JobStatus, MessageRole from letta.schemas.job import BatchJob as PydanticBatchJob from letta.schemas.job import Job as PydanticJob @@ -25,7 +26,6 @@ from letta.schemas.step import Step as PydanticStep from letta.schemas.usage import LettaUsageStatistics from letta.schemas.user import User as PydanticUser from letta.server.db import db_registry -from letta.tracing import trace_method from letta.utils import enforce_types diff --git a/letta/services/llm_batch_manager.py b/letta/services/llm_batch_manager.py index eeff0673..4700d146 100644 --- a/letta/services/llm_batch_manager.py +++ b/letta/services/llm_batch_manager.py @@ -9,6 +9,7 @@ from letta.log import get_logger from letta.orm import Message as MessageModel from letta.orm.llm_batch_items import LLMBatchItem from letta.orm.llm_batch_job import LLMBatchJob +from letta.otel.tracing import trace_method from letta.schemas.agent import AgentStepState from letta.schemas.enums import AgentStepStatus, JobStatus, ProviderType from letta.schemas.llm_batch_job import LLMBatchItem as PydanticLLMBatchItem @@ -17,7 +18,6 @@ from letta.schemas.llm_config import LLMConfig from letta.schemas.message import Message as PydanticMessage from letta.schemas.user import User as PydanticUser from letta.server.db import db_registry -from letta.tracing import trace_method from letta.utils import enforce_types logger = get_logger(__name__) diff --git a/letta/services/message_manager.py b/letta/services/message_manager.py index 01763689..45a6fc6f 100644 --- a/letta/services/message_manager.py +++ b/letta/services/message_manager.py @@ -7,13 +7,13 @@ from letta.log import get_logger from letta.orm.agent import Agent as AgentModel from letta.orm.errors import NoResultFound from letta.orm.message import Message as MessageModel +from letta.otel.tracing import trace_method from letta.schemas.enums import MessageRole from letta.schemas.letta_message import LettaMessageUpdateUnion from letta.schemas.message import Message as PydanticMessage from letta.schemas.message import MessageUpdate from letta.schemas.user import User as PydanticUser from letta.server.db import db_registry -from letta.tracing import trace_method from letta.utils import enforce_types logger = get_logger(__name__) diff --git a/letta/services/organization_manager.py b/letta/services/organization_manager.py index 08f8f70a..23fe92c8 100644 --- a/letta/services/organization_manager.py +++ b/letta/services/organization_manager.py @@ -2,10 +2,10 @@ from typing import List, Optional from letta.orm.errors import NoResultFound from letta.orm.organization import Organization as OrganizationModel +from letta.otel.tracing import trace_method from letta.schemas.organization import Organization as PydanticOrganization from letta.schemas.organization import OrganizationUpdate from letta.server.db import db_registry -from letta.tracing import trace_method from letta.utils import enforce_types diff --git a/letta/services/passage_manager.py b/letta/services/passage_manager.py index 99f7d6c4..b60e2e1b 100644 --- a/letta/services/passage_manager.py +++ b/letta/services/passage_manager.py @@ -11,11 +11,11 @@ from letta.constants import MAX_EMBEDDING_DIM from letta.embeddings import embedding_model, parse_and_chunk_text from letta.orm.errors import NoResultFound from letta.orm.passage import AgentPassage, SourcePassage +from letta.otel.tracing import trace_method from letta.schemas.agent import AgentState from letta.schemas.passage import Passage as PydanticPassage from letta.schemas.user import User as PydanticUser from letta.server.db import db_registry -from letta.tracing import trace_method from letta.utils import enforce_types diff --git a/letta/services/per_agent_lock_manager.py b/letta/services/per_agent_lock_manager.py index e8e2a0a4..aff76a1f 100644 --- a/letta/services/per_agent_lock_manager.py +++ b/letta/services/per_agent_lock_manager.py @@ -1,7 +1,7 @@ import threading from collections import defaultdict -from letta.tracing import trace_method +from letta.otel.tracing import trace_method class PerAgentLockManager: diff --git a/letta/services/provider_manager.py b/letta/services/provider_manager.py index 0e9cf948..bdb7bd24 100644 --- a/letta/services/provider_manager.py +++ b/letta/services/provider_manager.py @@ -1,12 +1,12 @@ from typing import List, Optional, Union from letta.orm.provider import Provider as ProviderModel +from letta.otel.tracing import trace_method from letta.schemas.enums import ProviderCategory, ProviderType from letta.schemas.providers import Provider as PydanticProvider from letta.schemas.providers import ProviderCheck, ProviderCreate, ProviderUpdate from letta.schemas.user import User as PydanticUser from letta.server.db import db_registry -from letta.tracing import trace_method from letta.utils import enforce_types diff --git a/letta/services/sandbox_config_manager.py b/letta/services/sandbox_config_manager.py index 211f6975..d9a8bffb 100644 --- a/letta/services/sandbox_config_manager.py +++ b/letta/services/sandbox_config_manager.py @@ -5,6 +5,7 @@ from letta.log import get_logger from letta.orm.errors import NoResultFound from letta.orm.sandbox_config import SandboxConfig as SandboxConfigModel from letta.orm.sandbox_config import SandboxEnvironmentVariable as SandboxEnvVarModel +from letta.otel.tracing import trace_method from letta.schemas.environment_variables import SandboxEnvironmentVariable as PydanticEnvVar from letta.schemas.environment_variables import SandboxEnvironmentVariableCreate, SandboxEnvironmentVariableUpdate from letta.schemas.sandbox_config import LocalSandboxConfig @@ -12,7 +13,6 @@ from letta.schemas.sandbox_config import SandboxConfig as PydanticSandboxConfig from letta.schemas.sandbox_config import SandboxConfigCreate, SandboxConfigUpdate, SandboxType from letta.schemas.user import User as PydanticUser from letta.server.db import db_registry -from letta.tracing import trace_method from letta.utils import enforce_types, printd logger = get_logger(__name__) diff --git a/letta/services/source_manager.py b/letta/services/source_manager.py index 099441b5..83131383 100644 --- a/letta/services/source_manager.py +++ b/letta/services/source_manager.py @@ -10,13 +10,13 @@ from letta.orm.file import FileContent as FileContentModel from letta.orm.file import FileMetadata as FileMetadataModel from letta.orm.source import Source as SourceModel from letta.orm.sqlalchemy_base import AccessType +from letta.otel.tracing import trace_method from letta.schemas.agent import AgentState as PydanticAgentState from letta.schemas.file import FileMetadata as PydanticFileMetadata from letta.schemas.source import Source as PydanticSource from letta.schemas.source import SourceUpdate from letta.schemas.user import User as PydanticUser from letta.server.db import db_registry -from letta.tracing import trace_method from letta.utils import enforce_types, printd diff --git a/letta/services/step_manager.py b/letta/services/step_manager.py index ea30e2d5..9401af48 100644 --- a/letta/services/step_manager.py +++ b/letta/services/step_manager.py @@ -5,16 +5,16 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import Session +from letta.helpers.singleton import singleton from letta.orm.errors import NoResultFound from letta.orm.job import Job as JobModel from letta.orm.sqlalchemy_base import AccessType from letta.orm.step import Step as StepModel +from letta.otel.tracing import get_trace_id, trace_method from letta.schemas.openai.chat_completion_response import UsageStatistics from letta.schemas.step import Step as PydanticStep from letta.schemas.user import User as PydanticUser from letta.server.db import db_registry -from letta.services.helpers.noop_helper import singleton -from letta.tracing import get_trace_id, trace_method from letta.utils import enforce_types diff --git a/letta/services/summarizer/summarizer.py b/letta/services/summarizer/summarizer.py index 3fc23dbf..fa6d18d5 100644 --- a/letta/services/summarizer/summarizer.py +++ b/letta/services/summarizer/summarizer.py @@ -6,11 +6,11 @@ from typing import List, Optional, Tuple, Union from letta.agents.ephemeral_summary_agent import EphemeralSummaryAgent from letta.constants import DEFAULT_MESSAGE_TOOL, DEFAULT_MESSAGE_TOOL_KWARG from letta.log import get_logger +from letta.otel.tracing import trace_method from letta.schemas.enums import MessageRole from letta.schemas.letta_message_content import TextContent from letta.schemas.message import Message, MessageCreate from letta.services.summarizer.enums import SummarizationMode -from letta.tracing import trace_method logger = get_logger(__name__) diff --git a/letta/services/telemetry_manager.py b/letta/services/telemetry_manager.py index 2dc14ca9..213bdb09 100644 --- a/letta/services/telemetry_manager.py +++ b/letta/services/telemetry_manager.py @@ -1,11 +1,11 @@ from letta.helpers.json_helpers import json_dumps, json_loads +from letta.helpers.singleton import singleton from letta.orm.provider_trace import ProviderTrace as ProviderTraceModel from letta.schemas.provider_trace import ProviderTrace as PydanticProviderTrace from letta.schemas.provider_trace import ProviderTraceCreate from letta.schemas.step import Step as PydanticStep from letta.schemas.user import User as PydanticUser from letta.server.db import db_registry -from letta.services.helpers.noop_helper import singleton from letta.utils import enforce_types diff --git a/letta/services/tool_executor/builtin_tool_executor.py b/letta/services/tool_executor/builtin_tool_executor.py index e1993e19..cc320a47 100644 --- a/letta/services/tool_executor/builtin_tool_executor.py +++ b/letta/services/tool_executor/builtin_tool_executor.py @@ -3,6 +3,7 @@ from textwrap import shorten from typing import Any, Dict, Literal, Optional from letta.constants import WEB_SEARCH_CLIP_CONTENT, WEB_SEARCH_INCLUDE_SCORE, WEB_SEARCH_SEPARATOR +from letta.otel.tracing import trace_method from letta.schemas.agent import AgentState from letta.schemas.sandbox_config import SandboxConfig from letta.schemas.tool import Tool @@ -10,7 +11,6 @@ from letta.schemas.tool_execution_result import ToolExecutionResult from letta.schemas.user import User from letta.services.tool_executor.tool_executor_base import ToolExecutor from letta.settings import tool_settings -from letta.tracing import trace_method class LettaBuiltinToolExecutor(ToolExecutor): diff --git a/letta/services/tool_executor/composio_tool_executor.py b/letta/services/tool_executor/composio_tool_executor.py index 4f4dddb8..8053c521 100644 --- a/letta/services/tool_executor/composio_tool_executor.py +++ b/letta/services/tool_executor/composio_tool_executor.py @@ -3,13 +3,13 @@ from typing import Any, Dict, Optional from letta.constants import COMPOSIO_ENTITY_ENV_VAR_KEY from letta.functions.composio_helpers import execute_composio_action_async, generate_composio_action_from_func_name from letta.helpers.composio_helpers import get_composio_api_key_async +from letta.otel.tracing import trace_method from letta.schemas.agent import AgentState from letta.schemas.sandbox_config import SandboxConfig from letta.schemas.tool import Tool from letta.schemas.tool_execution_result import ToolExecutionResult from letta.schemas.user import User from letta.services.tool_executor.tool_executor_base import ToolExecutor -from letta.tracing import trace_method class ExternalComposioToolExecutor(ToolExecutor): diff --git a/letta/services/tool_executor/mcp_tool_executor.py b/letta/services/tool_executor/mcp_tool_executor.py index eaef6ea3..1640c145 100644 --- a/letta/services/tool_executor/mcp_tool_executor.py +++ b/letta/services/tool_executor/mcp_tool_executor.py @@ -1,6 +1,7 @@ from typing import Any, Dict, Optional from letta.constants import MCP_TOOL_TAG_NAME_PREFIX +from letta.otel.tracing import trace_method from letta.schemas.agent import AgentState from letta.schemas.sandbox_config import SandboxConfig from letta.schemas.tool import Tool @@ -8,7 +9,6 @@ from letta.schemas.tool_execution_result import ToolExecutionResult from letta.schemas.user import User from letta.services.mcp_manager import MCPManager from letta.services.tool_executor.tool_executor_base import ToolExecutor -from letta.tracing import trace_method class ExternalMCPToolExecutor(ToolExecutor): diff --git a/letta/services/tool_executor/tool_execution_manager.py b/letta/services/tool_executor/tool_execution_manager.py index 085019ee..2b4dad6a 100644 --- a/letta/services/tool_executor/tool_execution_manager.py +++ b/letta/services/tool_executor/tool_execution_manager.py @@ -2,8 +2,12 @@ import traceback from typing import Any, Dict, Optional, Type from letta.constants import FUNCTION_RETURN_VALUE_TRUNCATED +from letta.helpers.datetime_helpers import AsyncTimer from letta.log import get_logger from letta.orm.enums import ToolType +from letta.otel.context import get_ctx_attributes +from letta.otel.metric_registry import MetricRegistry +from letta.otel.tracing import trace_method from letta.schemas.agent import AgentState from letta.schemas.sandbox_config import SandboxConfig from letta.schemas.tool import Tool @@ -21,7 +25,6 @@ from letta.services.tool_executor.mcp_tool_executor import ExternalMCPToolExecut from letta.services.tool_executor.multi_agent_tool_executor import LettaMultiAgentToolExecutor from letta.services.tool_executor.tool_executor import SandboxToolExecutor from letta.services.tool_executor.tool_executor_base import ToolExecutor -from letta.tracing import trace_method from letta.utils import get_friendly_error_msg @@ -85,10 +88,13 @@ class ToolExecutionManager: self.sandbox_env_vars = sandbox_env_vars @trace_method - async def execute_tool_async(self, function_name: str, function_args: dict, tool: Tool) -> ToolExecutionResult: + async def execute_tool_async( + self, function_name: str, function_args: dict, tool: Tool, step_id: str | None = None + ) -> ToolExecutionResult: """ Execute a tool asynchronously and persist any state changes. """ + status = "error" # set as default for tracking purposes try: executor = ToolExecutorFactory.get_executor( tool.tool_type, @@ -98,9 +104,17 @@ class ToolExecutionManager: passage_manager=self.passage_manager, actor=self.actor, ) - result = await executor.execute( - function_name, function_args, tool, self.actor, self.agent_state, self.sandbox_config, self.sandbox_env_vars - ) + + def _metrics_callback(exec_time_ms: int, exc): + return MetricRegistry().tool_execution_time_ms_histogram.record( + exec_time_ms, dict(get_ctx_attributes(), **{"tool.name": tool.name}) + ) + + async with AsyncTimer(callback_func=_metrics_callback): + result = await executor.execute( + function_name, function_args, tool, self.actor, self.agent_state, self.sandbox_config, self.sandbox_env_vars + ) + status = result.status # trim result return_str = str(result.func_return) @@ -110,6 +124,7 @@ class ToolExecutionManager: return result except Exception as e: + status = "error" self.logger.error(f"Error executing tool {function_name}: {str(e)}") error_message = get_friendly_error_msg( function_name=function_name, @@ -121,3 +136,8 @@ class ToolExecutionManager: func_return=error_message, stderr=[traceback.format_exc()], ) + finally: + metric_attrs = {"tool.name": tool.name, "tool.execution_success": status == "success"} + if status == "error" and step_id: + metric_attrs["step.id"] = step_id + MetricRegistry().tool_execution_counter.add(1, dict(get_ctx_attributes(), **metric_attrs)) diff --git a/letta/services/tool_executor/tool_execution_sandbox.py b/letta/services/tool_executor/tool_execution_sandbox.py index 4d60de8f..e41089ea 100644 --- a/letta/services/tool_executor/tool_execution_sandbox.py +++ b/letta/services/tool_executor/tool_execution_sandbox.py @@ -11,6 +11,7 @@ from typing import Any, Dict, Optional from letta.functions.helpers import generate_model_from_args_json_schema from letta.log import get_logger +from letta.otel.tracing import log_event, trace_method from letta.schemas.agent import AgentState from letta.schemas.sandbox_config import SandboxConfig, SandboxType from letta.schemas.tool import Tool @@ -27,7 +28,6 @@ from letta.services.organization_manager import OrganizationManager from letta.services.sandbox_config_manager import SandboxConfigManager from letta.services.tool_manager import ToolManager from letta.settings import tool_settings -from letta.tracing import log_event, trace_method from letta.utils import get_friendly_error_msg logger = get_logger(__name__) diff --git a/letta/services/tool_executor/tool_executor.py b/letta/services/tool_executor/tool_executor.py index f34e7d01..b57d87cb 100644 --- a/letta/services/tool_executor/tool_executor.py +++ b/letta/services/tool_executor/tool_executor.py @@ -3,6 +3,7 @@ from typing import Any, Dict, Optional from letta.functions.ast_parsers import coerce_dict_args_by_annotations, get_function_annotations_from_source from letta.log import get_logger +from letta.otel.tracing import trace_method from letta.schemas.agent import AgentState from letta.schemas.sandbox_config import SandboxConfig from letta.schemas.tool import Tool @@ -13,7 +14,6 @@ from letta.services.tool_executor.tool_executor_base import ToolExecutor from letta.services.tool_sandbox.e2b_sandbox import AsyncToolSandboxE2B from letta.services.tool_sandbox.local_sandbox import AsyncToolSandboxLocal from letta.settings import tool_settings -from letta.tracing import trace_method from letta.types import JsonDict from letta.utils import get_friendly_error_msg diff --git a/letta/services/tool_manager.py b/letta/services/tool_manager.py index 491e4bca..78daed8a 100644 --- a/letta/services/tool_manager.py +++ b/letta/services/tool_manager.py @@ -24,12 +24,12 @@ from letta.orm.enums import ToolType # TODO: Remove this once we translate all of these to the ORM from letta.orm.errors import NoResultFound from letta.orm.tool import Tool as ToolModel +from letta.otel.tracing import trace_method from letta.schemas.tool import Tool as PydanticTool from letta.schemas.tool import ToolCreate, ToolUpdate from letta.schemas.user import User as PydanticUser from letta.server.db import db_registry from letta.services.mcp.types import SSEServerConfig, StdioServerConfig -from letta.tracing import trace_method from letta.utils import enforce_types, printd logger = get_logger(__name__) diff --git a/letta/services/tool_sandbox/e2b_sandbox.py b/letta/services/tool_sandbox/e2b_sandbox.py index ea07f1f1..fb44fc94 100644 --- a/letta/services/tool_sandbox/e2b_sandbox.py +++ b/letta/services/tool_sandbox/e2b_sandbox.py @@ -3,13 +3,13 @@ from typing import TYPE_CHECKING, Any, Dict, Optional from e2b_code_interpreter import AsyncSandbox from letta.log import get_logger +from letta.otel.tracing import log_event, trace_method from letta.schemas.agent import AgentState from letta.schemas.sandbox_config import SandboxConfig, SandboxType from letta.schemas.tool import Tool from letta.schemas.tool_execution_result import ToolExecutionResult from letta.services.helpers.tool_parser_helper import parse_stdout_best_effort from letta.services.tool_sandbox.base import AsyncToolSandboxBase -from letta.tracing import log_event, trace_method from letta.types import JsonDict from letta.utils import get_friendly_error_msg diff --git a/letta/services/tool_sandbox/local_sandbox.py b/letta/services/tool_sandbox/local_sandbox.py index 8fe34870..08529b6f 100644 --- a/letta/services/tool_sandbox/local_sandbox.py +++ b/letta/services/tool_sandbox/local_sandbox.py @@ -8,6 +8,7 @@ from typing import Any, Dict, Optional from pydantic.config import JsonDict +from letta.otel.tracing import log_event, trace_method from letta.schemas.agent import AgentState from letta.schemas.sandbox_config import SandboxConfig, SandboxType from letta.schemas.tool import Tool @@ -20,7 +21,6 @@ from letta.services.helpers.tool_execution_helper import ( from letta.services.helpers.tool_parser_helper import parse_stdout_best_effort from letta.services.tool_sandbox.base import AsyncToolSandboxBase from letta.settings import tool_settings -from letta.tracing import log_event, trace_method from letta.utils import get_friendly_error_msg, parse_stderr_error_msg diff --git a/letta/services/user_manager.py b/letta/services/user_manager.py index c48006ec..eab42743 100644 --- a/letta/services/user_manager.py +++ b/letta/services/user_manager.py @@ -5,11 +5,11 @@ from sqlalchemy import select, text from letta.orm.errors import NoResultFound from letta.orm.organization import Organization as OrganizationModel from letta.orm.user import User as UserModel +from letta.otel.tracing import trace_method from letta.schemas.user import User as PydanticUser from letta.schemas.user import UserUpdate from letta.server.db import db_registry from letta.services.organization_manager import OrganizationManager -from letta.tracing import trace_method from letta.utils import enforce_types diff --git a/letta/settings.py b/letta/settings.py index 0982c827..eaa033a4 100644 --- a/letta/settings.py +++ b/letta/settings.py @@ -210,6 +210,7 @@ class Settings(BaseSettings): use_uvloop: bool = False use_granian: bool = False + sqlalchemy_tracing: bool = False # event loop parallelism event_loop_threadpool_max_workers: int = 43 diff --git a/otel/otel-collector-config-clickhouse-dev.yaml b/otel/otel-collector-config-clickhouse-dev.yaml index 1c9c0cca..f4a7c374 100644 --- a/otel/otel-collector-config-clickhouse-dev.yaml +++ b/otel/otel-collector-config-clickhouse-dev.yaml @@ -12,12 +12,18 @@ processors: send_batch_size: 1024 exporters: - file: + file/traces: path: ${HOME}/.letta/logs/traces.json rotation: max_megabytes: 100 max_days: 7 max_backups: 5 + file/metrics: + path: ${HOME}/.letta/logs/metrics.json + rotation: + max_megabytes: 100 + max_days: 7 + max_backups: 5 clickhouse: endpoint: ${CLICKHOUSE_ENDPOINT} database: ${CLICKHOUSE_DATABASE} @@ -40,4 +46,8 @@ service: traces: receivers: [otlp] processors: [batch] - exporters: [file, clickhouse] + exporters: [file/traces, clickhouse] + metrics: + receivers: [otlp] + processors: [batch] + exporters: [file/metrics, clickhouse]