From 2f9c600d7ff42187be8e90a39040223a6905b6d2 Mon Sep 17 00:00:00 2001 From: Matthew Zhou Date: Wed, 12 Feb 2025 13:27:13 -0800 Subject: [PATCH] feat: Add telemetry logging around agent / multi-agent broadcasting (#971) --- letta/agent.py | 19 ++++++++++++++++--- letta/functions/helpers.py | 29 ++++++++++++++++++++++++++++- letta/settings.py | 3 +++ letta/utils.py | 17 +++++++++++++++++ tests/test_google_embeddings.py | 1 - 5 files changed, 64 insertions(+), 5 deletions(-) diff --git a/letta/agent.py b/letta/agent.py index 9a8ce758..bad40be3 100644 --- a/letta/agent.py +++ b/letta/agent.py @@ -61,6 +61,7 @@ from letta.utils import ( get_utc_time, json_dumps, json_loads, + log_telemetry, parse_json, printd, validate_function_response, @@ -306,7 +307,7 @@ class Agent(BaseAgent): last_function_failed: bool = False, ) -> ChatCompletionResponse: """Get response from LLM API with robust retry mechanism.""" - + log_telemetry(self.logger, "_get_ai_reply start") allowed_tool_names = self.tool_rules_solver.get_allowed_tool_names(last_function_response=self.last_function_response) agent_state_tool_jsons = [t.json_schema for t in self.agent_state.tools] @@ -337,6 +338,7 @@ class Agent(BaseAgent): for attempt in range(1, empty_response_retry_limit + 1): try: + log_telemetry(self.logger, "_get_ai_reply create start") response = create( llm_config=self.agent_state.llm_config, messages=message_sequence, @@ -349,6 +351,7 @@ class Agent(BaseAgent): stream=stream, stream_interface=self.interface, ) + log_telemetry(self.logger, "_get_ai_reply create finish") # These bottom two are retryable if len(response.choices) == 0 or response.choices[0] is None: @@ -360,12 +363,13 @@ class Agent(BaseAgent): raise RuntimeError("Finish reason was length (maximum context length)") else: raise ValueError(f"Bad finish reason from API: {response.choices[0].finish_reason}") - + log_telemetry(self.logger, "_handle_ai_response finish") return response except ValueError as ve: if attempt >= empty_response_retry_limit: warnings.warn(f"Retry limit reached. Final error: {ve}") + log_telemetry(self.logger, "_handle_ai_response finish ValueError") raise Exception(f"Retries exhausted and no valid response received. Final error: {ve}") else: delay = min(backoff_factor * (2 ** (attempt - 1)), max_delay) @@ -374,8 +378,10 @@ class Agent(BaseAgent): except Exception as e: # For non-retryable errors, exit immediately + log_telemetry(self.logger, "_handle_ai_response finish generic Exception") raise e + log_telemetry(self.logger, "_handle_ai_response finish catch-all exception") raise Exception("Retries exhausted and no valid response received.") def _handle_ai_response( @@ -388,7 +394,7 @@ class Agent(BaseAgent): response_message_id: Optional[str] = None, ) -> Tuple[List[Message], bool, bool]: """Handles parsing and function execution""" - + log_telemetry(self.logger, "_handle_ai_response start") # Hacky failsafe for now to make sure we didn't implement the streaming Message ID creation incorrectly if response_message_id is not None: assert response_message_id.startswith("message-"), response_message_id @@ -506,7 +512,13 @@ class Agent(BaseAgent): self.interface.function_message(f"Running {function_name}({function_args})", msg_obj=messages[-1]) try: # handle tool execution (sandbox) and state updates + log_telemetry( + self.logger, "_handle_ai_response execute tool start", function_name=function_name, function_args=function_args + ) function_response, sandbox_run_result = self.execute_tool_and_persist_state(function_name, function_args, target_letta_tool) + log_telemetry( + self.logger, "_handle_ai_response execute tool finish", function_name=function_name, function_args=function_args + ) if sandbox_run_result and sandbox_run_result.status == "error": messages = self._handle_function_error_response( @@ -597,6 +609,7 @@ class Agent(BaseAgent): elif self.tool_rules_solver.is_terminal_tool(function_name): heartbeat_request = False + log_telemetry(self.logger, "_handle_ai_response finish") return messages, heartbeat_request, function_failed def step( diff --git a/letta/functions/helpers.py b/letta/functions/helpers.py index 92b75e49..ef42b4c9 100644 --- a/letta/functions/helpers.py +++ b/letta/functions/helpers.py @@ -17,6 +17,7 @@ from letta.schemas.message import Message, MessageCreate from letta.schemas.user import User from letta.server.rest_api.utils import get_letta_server from letta.settings import settings +from letta.utils import log_telemetry # TODO: This is kind of hacky, as this is used to search up the action later on composio's side @@ -341,10 +342,16 @@ async def async_send_message_with_retries( timeout: int, logging_prefix: Optional[str] = None, ) -> str: - logging_prefix = logging_prefix or "[async_send_message_with_retries]" + log_telemetry(sender_agent.logger, f"async_send_message_with_retries start", target_agent_id=target_agent_id) + for attempt in range(1, max_retries + 1): try: + log_telemetry( + sender_agent.logger, + f"async_send_message_with_retries -> asyncio wait for send_message_to_agent_no_stream start", + target_agent_id=target_agent_id, + ) response = await asyncio.wait_for( send_message_to_agent_no_stream( server=server, @@ -354,15 +361,24 @@ async def async_send_message_with_retries( ), timeout=timeout, ) + log_telemetry( + sender_agent.logger, + f"async_send_message_with_retries -> asyncio wait for send_message_to_agent_no_stream finish", + target_agent_id=target_agent_id, + ) # Then parse out the assistant message assistant_message = parse_letta_response_for_assistant_message(target_agent_id, response) if assistant_message: sender_agent.logger.info(f"{logging_prefix} - {assistant_message}") + log_telemetry( + sender_agent.logger, f"async_send_message_with_retries finish with assistant message", target_agent_id=target_agent_id + ) return assistant_message else: msg = f"(No response from agent {target_agent_id})" sender_agent.logger.info(f"{logging_prefix} - {msg}") + log_telemetry(sender_agent.logger, f"async_send_message_with_retries finish no response", target_agent_id=target_agent_id) return msg except asyncio.TimeoutError: @@ -380,6 +396,12 @@ async def async_send_message_with_retries( await asyncio.sleep(backoff) else: sender_agent.logger.error(f"{logging_prefix} - Fatal error: {error_msg}") + log_telemetry( + sender_agent.logger, + f"async_send_message_with_retries finish fatal error", + target_agent_id=target_agent_id, + error_msg=error_msg, + ) raise Exception(error_msg) @@ -468,6 +490,7 @@ def fire_and_forget_send_to_agent( async def _send_message_to_agents_matching_all_tags_async(sender_agent: "Agent", message: str, tags: List[str]) -> List[str]: + log_telemetry(sender_agent.logger, "_send_message_to_agents_matching_all_tags_async start", message=message, tags=tags) server = get_letta_server() augmented_message = ( @@ -477,7 +500,9 @@ async def _send_message_to_agents_matching_all_tags_async(sender_agent: "Agent", ) # Retrieve up to 100 matching agents + log_telemetry(sender_agent.logger, "_send_message_to_agents_matching_all_tags_async listing agents start", message=message, tags=tags) matching_agents = server.agent_manager.list_agents(actor=sender_agent.user, tags=tags, match_all_tags=True, limit=100) + log_telemetry(sender_agent.logger, "_send_message_to_agents_matching_all_tags_async listing agents finish", message=message, tags=tags) # Create a system message messages = [MessageCreate(role=MessageRole.system, content=augmented_message, name=sender_agent.agent_state.name)] @@ -504,4 +529,6 @@ async def _send_message_to_agents_matching_all_tags_async(sender_agent: "Agent", final.append(str(r)) else: final.append(r) + + log_telemetry(sender_agent.logger, "_send_message_to_agents_matching_all_tags_async finish", message=message, tags=tags) return final diff --git a/letta/settings.py b/letta/settings.py index 667f7242..4fda300c 100644 --- a/letta/settings.py +++ b/letta/settings.py @@ -151,6 +151,9 @@ class Settings(BaseSettings): multi_agent_send_message_timeout: int = 20 * 60 multi_agent_concurrent_sends: int = 15 + # telemetry logging + verbose_telemetry_logging: bool = False + @property def letta_pg_uri(self) -> str: if self.pg_uri: diff --git a/letta/utils.py b/letta/utils.py index 171391e3..d0893bab 100644 --- a/letta/utils.py +++ b/letta/utils.py @@ -16,6 +16,7 @@ import uuid from contextlib import contextmanager from datetime import datetime, timedelta, timezone from functools import wraps +from logging import Logger from typing import Any, Coroutine, List, Union, _GenericAlias, get_args, get_origin, get_type_hints from urllib.parse import urljoin, urlparse @@ -1150,3 +1151,19 @@ def run_async_task(coro: Coroutine[Any, Any, Any]) -> Any: except RuntimeError: # If no event loop is running, create a new one return asyncio.run(coro) + + +def log_telemetry(logger: Logger, event: str, **kwargs): + """ + Logs telemetry events with a timestamp. + + :param logger: A logger + :param event: A string describing the event. + :param kwargs: Additional key-value pairs for logging metadata. + """ + from letta.settings import settings + + if settings.verbose_telemetry_logging: + timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S,%f UTC") # More readable timestamp + extra_data = " | ".join(f"{key}={value}" for key, value in kwargs.items() if value is not None) + logger.info(f"[{timestamp}] EVENT: {event} | {extra_data}") diff --git a/tests/test_google_embeddings.py b/tests/test_google_embeddings.py index 71570ff0..dcaad596 100644 --- a/tests/test_google_embeddings.py +++ b/tests/test_google_embeddings.py @@ -8,7 +8,6 @@ load_dotenv() import os import threading import time -import uuid import pytest from letta_client import CreateBlock