feat: Add telemetry logging around agent / multi-agent broadcasting (#971)
This commit is contained in:
@@ -61,6 +61,7 @@ from letta.utils import (
|
||||
get_utc_time,
|
||||
json_dumps,
|
||||
json_loads,
|
||||
log_telemetry,
|
||||
parse_json,
|
||||
printd,
|
||||
validate_function_response,
|
||||
@@ -306,7 +307,7 @@ class Agent(BaseAgent):
|
||||
last_function_failed: bool = False,
|
||||
) -> ChatCompletionResponse:
|
||||
"""Get response from LLM API with robust retry mechanism."""
|
||||
|
||||
log_telemetry(self.logger, "_get_ai_reply start")
|
||||
allowed_tool_names = self.tool_rules_solver.get_allowed_tool_names(last_function_response=self.last_function_response)
|
||||
agent_state_tool_jsons = [t.json_schema for t in self.agent_state.tools]
|
||||
|
||||
@@ -337,6 +338,7 @@ class Agent(BaseAgent):
|
||||
|
||||
for attempt in range(1, empty_response_retry_limit + 1):
|
||||
try:
|
||||
log_telemetry(self.logger, "_get_ai_reply create start")
|
||||
response = create(
|
||||
llm_config=self.agent_state.llm_config,
|
||||
messages=message_sequence,
|
||||
@@ -349,6 +351,7 @@ class Agent(BaseAgent):
|
||||
stream=stream,
|
||||
stream_interface=self.interface,
|
||||
)
|
||||
log_telemetry(self.logger, "_get_ai_reply create finish")
|
||||
|
||||
# These bottom two are retryable
|
||||
if len(response.choices) == 0 or response.choices[0] is None:
|
||||
@@ -360,12 +363,13 @@ class Agent(BaseAgent):
|
||||
raise RuntimeError("Finish reason was length (maximum context length)")
|
||||
else:
|
||||
raise ValueError(f"Bad finish reason from API: {response.choices[0].finish_reason}")
|
||||
|
||||
log_telemetry(self.logger, "_handle_ai_response finish")
|
||||
return response
|
||||
|
||||
except ValueError as ve:
|
||||
if attempt >= empty_response_retry_limit:
|
||||
warnings.warn(f"Retry limit reached. Final error: {ve}")
|
||||
log_telemetry(self.logger, "_handle_ai_response finish ValueError")
|
||||
raise Exception(f"Retries exhausted and no valid response received. Final error: {ve}")
|
||||
else:
|
||||
delay = min(backoff_factor * (2 ** (attempt - 1)), max_delay)
|
||||
@@ -374,8 +378,10 @@ class Agent(BaseAgent):
|
||||
|
||||
except Exception as e:
|
||||
# For non-retryable errors, exit immediately
|
||||
log_telemetry(self.logger, "_handle_ai_response finish generic Exception")
|
||||
raise e
|
||||
|
||||
log_telemetry(self.logger, "_handle_ai_response finish catch-all exception")
|
||||
raise Exception("Retries exhausted and no valid response received.")
|
||||
|
||||
def _handle_ai_response(
|
||||
@@ -388,7 +394,7 @@ class Agent(BaseAgent):
|
||||
response_message_id: Optional[str] = None,
|
||||
) -> Tuple[List[Message], bool, bool]:
|
||||
"""Handles parsing and function execution"""
|
||||
|
||||
log_telemetry(self.logger, "_handle_ai_response start")
|
||||
# Hacky failsafe for now to make sure we didn't implement the streaming Message ID creation incorrectly
|
||||
if response_message_id is not None:
|
||||
assert response_message_id.startswith("message-"), response_message_id
|
||||
@@ -506,7 +512,13 @@ class Agent(BaseAgent):
|
||||
self.interface.function_message(f"Running {function_name}({function_args})", msg_obj=messages[-1])
|
||||
try:
|
||||
# handle tool execution (sandbox) and state updates
|
||||
log_telemetry(
|
||||
self.logger, "_handle_ai_response execute tool start", function_name=function_name, function_args=function_args
|
||||
)
|
||||
function_response, sandbox_run_result = self.execute_tool_and_persist_state(function_name, function_args, target_letta_tool)
|
||||
log_telemetry(
|
||||
self.logger, "_handle_ai_response execute tool finish", function_name=function_name, function_args=function_args
|
||||
)
|
||||
|
||||
if sandbox_run_result and sandbox_run_result.status == "error":
|
||||
messages = self._handle_function_error_response(
|
||||
@@ -597,6 +609,7 @@ class Agent(BaseAgent):
|
||||
elif self.tool_rules_solver.is_terminal_tool(function_name):
|
||||
heartbeat_request = False
|
||||
|
||||
log_telemetry(self.logger, "_handle_ai_response finish")
|
||||
return messages, heartbeat_request, function_failed
|
||||
|
||||
def step(
|
||||
|
||||
@@ -17,6 +17,7 @@ from letta.schemas.message import Message, MessageCreate
|
||||
from letta.schemas.user import User
|
||||
from letta.server.rest_api.utils import get_letta_server
|
||||
from letta.settings import settings
|
||||
from letta.utils import log_telemetry
|
||||
|
||||
|
||||
# TODO: This is kind of hacky, as this is used to search up the action later on composio's side
|
||||
@@ -341,10 +342,16 @@ async def async_send_message_with_retries(
|
||||
timeout: int,
|
||||
logging_prefix: Optional[str] = None,
|
||||
) -> str:
|
||||
|
||||
logging_prefix = logging_prefix or "[async_send_message_with_retries]"
|
||||
log_telemetry(sender_agent.logger, f"async_send_message_with_retries start", target_agent_id=target_agent_id)
|
||||
|
||||
for attempt in range(1, max_retries + 1):
|
||||
try:
|
||||
log_telemetry(
|
||||
sender_agent.logger,
|
||||
f"async_send_message_with_retries -> asyncio wait for send_message_to_agent_no_stream start",
|
||||
target_agent_id=target_agent_id,
|
||||
)
|
||||
response = await asyncio.wait_for(
|
||||
send_message_to_agent_no_stream(
|
||||
server=server,
|
||||
@@ -354,15 +361,24 @@ async def async_send_message_with_retries(
|
||||
),
|
||||
timeout=timeout,
|
||||
)
|
||||
log_telemetry(
|
||||
sender_agent.logger,
|
||||
f"async_send_message_with_retries -> asyncio wait for send_message_to_agent_no_stream finish",
|
||||
target_agent_id=target_agent_id,
|
||||
)
|
||||
|
||||
# Then parse out the assistant message
|
||||
assistant_message = parse_letta_response_for_assistant_message(target_agent_id, response)
|
||||
if assistant_message:
|
||||
sender_agent.logger.info(f"{logging_prefix} - {assistant_message}")
|
||||
log_telemetry(
|
||||
sender_agent.logger, f"async_send_message_with_retries finish with assistant message", target_agent_id=target_agent_id
|
||||
)
|
||||
return assistant_message
|
||||
else:
|
||||
msg = f"(No response from agent {target_agent_id})"
|
||||
sender_agent.logger.info(f"{logging_prefix} - {msg}")
|
||||
log_telemetry(sender_agent.logger, f"async_send_message_with_retries finish no response", target_agent_id=target_agent_id)
|
||||
return msg
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
@@ -380,6 +396,12 @@ async def async_send_message_with_retries(
|
||||
await asyncio.sleep(backoff)
|
||||
else:
|
||||
sender_agent.logger.error(f"{logging_prefix} - Fatal error: {error_msg}")
|
||||
log_telemetry(
|
||||
sender_agent.logger,
|
||||
f"async_send_message_with_retries finish fatal error",
|
||||
target_agent_id=target_agent_id,
|
||||
error_msg=error_msg,
|
||||
)
|
||||
raise Exception(error_msg)
|
||||
|
||||
|
||||
@@ -468,6 +490,7 @@ def fire_and_forget_send_to_agent(
|
||||
|
||||
|
||||
async def _send_message_to_agents_matching_all_tags_async(sender_agent: "Agent", message: str, tags: List[str]) -> List[str]:
|
||||
log_telemetry(sender_agent.logger, "_send_message_to_agents_matching_all_tags_async start", message=message, tags=tags)
|
||||
server = get_letta_server()
|
||||
|
||||
augmented_message = (
|
||||
@@ -477,7 +500,9 @@ async def _send_message_to_agents_matching_all_tags_async(sender_agent: "Agent",
|
||||
)
|
||||
|
||||
# Retrieve up to 100 matching agents
|
||||
log_telemetry(sender_agent.logger, "_send_message_to_agents_matching_all_tags_async listing agents start", message=message, tags=tags)
|
||||
matching_agents = server.agent_manager.list_agents(actor=sender_agent.user, tags=tags, match_all_tags=True, limit=100)
|
||||
log_telemetry(sender_agent.logger, "_send_message_to_agents_matching_all_tags_async listing agents finish", message=message, tags=tags)
|
||||
|
||||
# Create a system message
|
||||
messages = [MessageCreate(role=MessageRole.system, content=augmented_message, name=sender_agent.agent_state.name)]
|
||||
@@ -504,4 +529,6 @@ async def _send_message_to_agents_matching_all_tags_async(sender_agent: "Agent",
|
||||
final.append(str(r))
|
||||
else:
|
||||
final.append(r)
|
||||
|
||||
log_telemetry(sender_agent.logger, "_send_message_to_agents_matching_all_tags_async finish", message=message, tags=tags)
|
||||
return final
|
||||
|
||||
@@ -151,6 +151,9 @@ class Settings(BaseSettings):
|
||||
multi_agent_send_message_timeout: int = 20 * 60
|
||||
multi_agent_concurrent_sends: int = 15
|
||||
|
||||
# telemetry logging
|
||||
verbose_telemetry_logging: bool = False
|
||||
|
||||
@property
|
||||
def letta_pg_uri(self) -> str:
|
||||
if self.pg_uri:
|
||||
|
||||
@@ -16,6 +16,7 @@ import uuid
|
||||
from contextlib import contextmanager
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from functools import wraps
|
||||
from logging import Logger
|
||||
from typing import Any, Coroutine, List, Union, _GenericAlias, get_args, get_origin, get_type_hints
|
||||
from urllib.parse import urljoin, urlparse
|
||||
|
||||
@@ -1150,3 +1151,19 @@ def run_async_task(coro: Coroutine[Any, Any, Any]) -> Any:
|
||||
except RuntimeError:
|
||||
# If no event loop is running, create a new one
|
||||
return asyncio.run(coro)
|
||||
|
||||
|
||||
def log_telemetry(logger: Logger, event: str, **kwargs):
|
||||
"""
|
||||
Logs telemetry events with a timestamp.
|
||||
|
||||
:param logger: A logger
|
||||
:param event: A string describing the event.
|
||||
:param kwargs: Additional key-value pairs for logging metadata.
|
||||
"""
|
||||
from letta.settings import settings
|
||||
|
||||
if settings.verbose_telemetry_logging:
|
||||
timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S,%f UTC") # More readable timestamp
|
||||
extra_data = " | ".join(f"{key}={value}" for key, value in kwargs.items() if value is not None)
|
||||
logger.info(f"[{timestamp}] EVENT: {event} | {extra_data}")
|
||||
|
||||
@@ -8,7 +8,6 @@ load_dotenv()
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
from letta_client import CreateBlock
|
||||
|
||||
Reference in New Issue
Block a user