Files
letta-server/tests/test_temporal_metrics_local.py
Kian Jones 25d54dd896 chore: enable F821, F401, W293 (#9503)
* auto fixes

* auto fix pt2 and transitive deps and undefined var checking locals()

* manual fixes (ignored or letta-code fixed)

* fix circular import
2026-02-24 10:55:08 -08:00

133 lines
4.2 KiB
Python

"""
Local test for temporal metrics.
Run with: uv run pytest tests/test_temporal_metrics_local.py -v -s
"""
import os
from unittest.mock import patch
import pytest
from letta.agents.temporal.metrics import (
ActivityMetrics,
TemporalMetrics,
WorkflowMetrics,
)
@pytest.fixture(autouse=True)
def setup_metrics():
"""Setup metrics for testing."""
# Force re-initialization
TemporalMetrics._initialized = False
# Enable metrics for testing
os.environ["DD_METRICS_ENABLED"] = "true"
os.environ["DD_AGENT_HOST"] = "localhost"
os.environ["DD_DOGSTATSD_PORT"] = "8125"
os.environ["DD_ENV"] = "local-test"
os.environ["DD_SERVICE"] = "letta-temporal-test"
yield
# Cleanup
TemporalMetrics._initialized = False
@pytest.mark.asyncio
async def test_metrics_initialization():
"""Test that metrics initialize correctly."""
TemporalMetrics.initialize()
assert TemporalMetrics._initialized is True
print(f"\n✓ Metrics initialized: enabled={TemporalMetrics.is_enabled()}")
@pytest.mark.asyncio
async def test_workflow_metrics():
"""Test workflow metrics recording."""
with patch("letta.agents.temporal.metrics.statsd") as mock_statsd:
TemporalMetrics._initialized = False
TemporalMetrics._enabled = True
TemporalMetrics._initialized = True
# Record workflow metrics
WorkflowMetrics.record_workflow_start(workflow_type="TemporalAgentWorkflow", workflow_id="test-workflow-123")
WorkflowMetrics.record_workflow_success(
workflow_type="TemporalAgentWorkflow",
workflow_id="test-workflow-123",
duration_ns=1_000_000_000, # 1 second
)
WorkflowMetrics.record_workflow_usage(
workflow_type="TemporalAgentWorkflow",
step_count=5,
completion_tokens=100,
prompt_tokens=50,
total_tokens=150,
)
# Verify metrics were called
assert mock_statsd.increment.called
assert mock_statsd.histogram.called
assert mock_statsd.gauge.called
print("\n✓ Workflow metrics recorded successfully")
print(f" - increment called {mock_statsd.increment.call_count} times")
print(f" - histogram called {mock_statsd.histogram.call_count} times")
print(f" - gauge called {mock_statsd.gauge.call_count} times")
@pytest.mark.asyncio
async def test_activity_metrics():
"""Test activity metrics recording."""
with patch("letta.agents.temporal.metrics.statsd") as mock_statsd:
TemporalMetrics._initialized = False
TemporalMetrics._enabled = True
TemporalMetrics._initialized = True
# Record activity metrics
ActivityMetrics.record_activity_start("llm_request")
ActivityMetrics.record_activity_success("llm_request", duration_ms=500.0)
# Verify metrics were called
assert mock_statsd.increment.called
assert mock_statsd.histogram.called
print("\n✓ Activity metrics recorded successfully")
print(f" - increment called {mock_statsd.increment.call_count} times")
@pytest.mark.asyncio
async def test_metrics_with_real_dogstatsd():
"""
Test metrics with real DogStatsD connection (requires Datadog agent running).
This test will skip if the agent is not available.
"""
import socket
# Check if DogStatsD is listening
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
sock.connect(("localhost", 8125))
dogstatsd_available = True
sock.close()
except Exception:
dogstatsd_available = False
if not dogstatsd_available:
pytest.skip("DogStatsD not available on localhost:8125")
# Force re-initialization with real connection
TemporalMetrics._initialized = False
TemporalMetrics.initialize()
# Send test metrics
TemporalMetrics.increment("temporal.test.counter", value=1, tags=["test:true"])
TemporalMetrics.gauge("temporal.test.gauge", value=42.0, tags=["test:true"])
TemporalMetrics.histogram("temporal.test.histogram", value=100.0, tags=["test:true"])
print("\n✓ Real metrics sent to DogStatsD at localhost:8125")
print(" Check your Datadog UI for metrics with prefix 'temporal.test.*'")