Shub/let 5962 add performance/duration search to runs [LET-5962] (#5850)

* feat: add performance/duration search to list internal runs

* chore: add tests

* chore: fix ui

* feat: support UI for this

* chore: update tests

* chore: update types

---------

Co-authored-by: Shubham Naik <shub@memgpt.ai>
Shubham Naik
2025-10-30 14:49:09 -07:00
committed by Caren Thomas
parent d109bab587
commit 95816b9b28
4 changed files with 458 additions and 28 deletions


@@ -10518,7 +10518,7 @@
"in": "query",
"required": false,
"schema": {
"const": "created_at",
"enum": ["created_at", "duration"],
"type": "string",
"description": "Field to sort by",
"default": "created_at",
@@ -10569,6 +10569,99 @@
"title": "Project Id"
},
"description": "Filter runs by project ID."
},
{
"name": "duration_percentile",
"in": "query",
"required": false,
"schema": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"description": "Filter runs by duration percentile (1-100). Returns runs slower than this percentile.",
"title": "Duration Percentile"
},
"description": "Filter runs by duration percentile (1-100). Returns runs slower than this percentile."
},
{
"name": "duration_value",
"in": "query",
"required": false,
"schema": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"description": "Duration value in nanoseconds for filtering. Must be used with duration_operator.",
"title": "Duration Value"
},
"description": "Duration value in nanoseconds for filtering. Must be used with duration_operator."
},
{
"name": "duration_operator",
"in": "query",
"required": false,
"schema": {
"anyOf": [
{
"enum": ["gt", "lt", "eq"],
"type": "string"
},
{
"type": "null"
}
],
"description": "Comparison operator for duration filter: 'gt' (greater than), 'lt' (less than), 'eq' (equals).",
"title": "Duration Operator"
},
"description": "Comparison operator for duration filter: 'gt' (greater than), 'lt' (less than), 'eq' (equals)."
},
{
"name": "start_date",
"in": "query",
"required": false,
"schema": {
"anyOf": [
{
"type": "string",
"format": "date-time"
},
{
"type": "null"
}
],
"description": "Filter runs created on or after this date (ISO 8601 format).",
"title": "Start Date"
},
"description": "Filter runs created on or after this date (ISO 8601 format)."
},
{
"name": "end_date",
"in": "query",
"required": false,
"schema": {
"anyOf": [
{
"type": "string",
"format": "date-time"
},
{
"type": "null"
}
],
"description": "Filter runs created on or before this date (ISO 8601 format).",
"title": "End Date"
},
"description": "Filter runs created on or before this date (ISO 8601 format)."
}
],
"responses": {


@@ -1,3 +1,4 @@
from datetime import datetime
from typing import List, Literal, Optional

from fastapi import APIRouter, Depends, Query
@@ -55,7 +56,7 @@ async def list_runs(
order: Literal["asc", "desc"] = Query(
"desc", description="Sort order for runs by creation time. 'asc' for oldest first, 'desc' for newest first"
),
order_by: Literal["created_at"] = Query("created_at", description="Field to sort by"),
order_by: Literal["created_at", "duration"] = Query("created_at", description="Field to sort by"),
active: bool = Query(False, description="Filter for active runs."),
ascending: bool = Query(
False,
@@ -63,6 +64,17 @@ async def list_runs(
deprecated=True,
),
project_id: Optional[str] = Query(None, description="Filter runs by project ID."),
duration_percentile: Optional[int] = Query(
None, description="Filter runs by duration percentile (1-100). Returns runs slower than this percentile."
),
duration_value: Optional[int] = Query(
None, description="Duration value in nanoseconds for filtering. Must be used with duration_operator."
),
duration_operator: Optional[Literal["gt", "lt", "eq"]] = Query(
None, description="Comparison operator for duration filter: 'gt' (greater than), 'lt' (less than), 'eq' (equals)."
),
start_date: Optional[datetime] = Query(None, description="Filter runs created on or after this date (ISO 8601 format)."),
end_date: Optional[datetime] = Query(None, description="Filter runs created on or before this date (ISO 8601 format)."),
headers: HeaderParams = Depends(get_headers),
):
"""
@@ -90,6 +102,11 @@ async def list_runs(
    # Convert string statuses to RunStatus enum
    parsed_statuses = convert_statuses_to_enum(statuses)

    # Create duration filter dict if both parameters provided
    duration_filter = None
    if duration_value is not None and duration_operator is not None:
        duration_filter = {"value": duration_value, "operator": duration_operator}

    runs = await runs_manager.list_runs(
        actor=actor,
        agent_ids=agent_ids,
@@ -105,5 +122,10 @@ async def list_runs(
        step_count_operator=step_count_operator,
        tools_used=tools_used,
        project_id=project_id,
        order_by=order_by,
        duration_percentile=duration_percentile,
        duration_filter=duration_filter,
        start_date=start_date,
        end_date=end_date,
    )
    return runs
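
Note the pairing rule above: duration_value and duration_operator only take effect together; if either is None, no duration filter is passed down. Restated as a self-contained sketch (the helper name is hypothetical; the logic mirrors the router code):

from typing import Literal, Optional

def build_duration_filter(
    value: Optional[int],
    operator: Optional[Literal["gt", "lt", "eq"]],
) -> Optional[dict]:
    # Both halves must be present, mirroring the router's check.
    if value is not None and operator is not None:
        return {"value": value, "operator": operator}
    return None

assert build_duration_filter(200_000_000, "gt") == {"value": 200_000_000, "operator": "gt"}
assert build_duration_filter(200_000_000, None) is None  # operator missing: filter silently dropped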


@@ -115,12 +115,22 @@ class RunManager:
        step_count_operator: ComparisonOperator = ComparisonOperator.EQ,
        tools_used: Optional[List[str]] = None,
        project_id: Optional[str] = None,
        order_by: Literal["created_at", "duration"] = "created_at",
        duration_percentile: Optional[int] = None,
        duration_filter: Optional[dict] = None,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
    ) -> List[PydanticRun]:
        """List runs with filtering options."""
        async with db_registry.async_session() as session:
            from sqlalchemy import func, or_, select

            # Always join with run_metrics to get duration data
            query = (
                select(RunModel, RunMetricsModel.run_ns)
                .outerjoin(RunMetricsModel, RunModel.id == RunMetricsModel.id)
                .filter(RunModel.organization_id == actor.organization_id)
            )

            # Filter by project_id if provided
            if project_id:
@@ -148,41 +158,107 @@ class RunManager:
            if template_family:
                query = query.filter(RunModel.base_template_id == template_family)

            # Filter by date range
            if start_date:
                query = query.filter(RunModel.created_at >= start_date)
            if end_date:
                query = query.filter(RunModel.created_at <= end_date)

            # Filter by step_count with the specified operator
            if step_count is not None:
                if step_count_operator == ComparisonOperator.EQ:
                    query = query.filter(RunMetricsModel.num_steps == step_count)
                elif step_count_operator == ComparisonOperator.GTE:
                    query = query.filter(RunMetricsModel.num_steps >= step_count)
                elif step_count_operator == ComparisonOperator.LTE:
                    query = query.filter(RunMetricsModel.num_steps <= step_count)

            # Filter by tools used ids
            if tools_used:
                from sqlalchemy import String, cast as sa_cast, type_coerce
                from sqlalchemy.dialects.postgresql import ARRAY, JSONB

                # Use ?| operator to check if any tool_id exists in the array (OR logic)
                jsonb_tools = sa_cast(RunMetricsModel.tools_used, JSONB)
                tools_array = type_coerce(tools_used, ARRAY(String))
                query = query.filter(jsonb_tools.op("?|")(tools_array))

            # Ensure run_ns is not null when working with duration
            if order_by == "duration" or duration_percentile is not None or duration_filter is not None:
                query = query.filter(RunMetricsModel.run_ns.isnot(None))
            # Apply duration filter if requested
            if duration_filter is not None:
                duration_value = duration_filter.get("value") if isinstance(duration_filter, dict) else duration_filter.value
                duration_operator = duration_filter.get("operator") if isinstance(duration_filter, dict) else duration_filter.operator
                if duration_operator == "gt":
                    query = query.filter(RunMetricsModel.run_ns > duration_value)
                elif duration_operator == "lt":
                    query = query.filter(RunMetricsModel.run_ns < duration_value)
                elif duration_operator == "eq":
                    query = query.filter(RunMetricsModel.run_ns == duration_value)

            # Apply duration percentile filter if requested
            if duration_percentile is not None:
                # Calculate the percentile threshold
                percentile_query = (
                    select(func.percentile_cont(duration_percentile / 100.0).within_group(RunMetricsModel.run_ns))
                    .select_from(RunMetricsModel)
                    .join(RunModel, RunModel.id == RunMetricsModel.id)
                    .filter(RunModel.organization_id == actor.organization_id)
                    .filter(RunMetricsModel.run_ns.isnot(None))
                )

                # Apply same filters to percentile calculation
                if project_id:
                    percentile_query = percentile_query.filter(RunModel.project_id == project_id)
                if agent_ids:
                    percentile_query = percentile_query.filter(RunModel.agent_id.in_(agent_ids))
                if statuses:
                    percentile_query = percentile_query.filter(RunModel.status.in_(statuses))

                # Execute percentile query
                percentile_result = await session.execute(percentile_query)
                percentile_threshold = percentile_result.scalar()

                # Filter by percentile threshold (runs slower than the percentile)
                if percentile_threshold is not None:
                    query = query.filter(RunMetricsModel.run_ns >= percentile_threshold)

            # Apply sorting based on order_by
            if order_by == "duration":
                # Sort by duration
                if ascending:
                    query = query.order_by(RunMetricsModel.run_ns.asc())
                else:
                    query = query.order_by(RunMetricsModel.run_ns.desc())
            else:
                # Apply pagination for created_at ordering
                from letta.services.helpers.run_manager_helper import _apply_pagination_async

                query = await _apply_pagination_async(query, before, after, session, ascending=ascending)

            # Apply limit
            if limit:
                query = query.limit(limit)
            result = await session.execute(query)
            rows = result.all()

            # Populate total_duration_ns from run_metrics.run_ns
            pydantic_runs = []
            for row in rows:
                run_model = row[0]
                run_ns = row[1]
                pydantic_run = run_model.to_pydantic()
                if run_ns is not None:
                    pydantic_run.total_duration_ns = run_ns
                pydantic_runs.append(pydantic_run)
            return pydantic_runs
    @enforce_types
    @raise_on_invalid_id(param_name="run_id", expected_prefix=PrimitiveType.RUN)
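
The percentile threshold above relies on PostgreSQL's percentile_cont aggregate, which linearly interpolates between the two nearest ranked values. A pure-Python sketch of the same computation, for intuition only (illustrative values; the production path stays in SQL):

def percentile_cont(values: list, fraction: float):
    """Continuous percentile with linear interpolation, matching
    PostgreSQL's percentile_cont(fraction) WITHIN GROUP (ORDER BY ...)."""
    if not values:
        return None
    xs = sorted(values)
    pos = fraction * (len(xs) - 1)  # fractional rank
    lo = int(pos)
    hi = min(lo + 1, len(xs) - 1)
    return xs[lo] + (pos - lo) * (xs[hi] - xs[lo])

# Illustrative durations in nanoseconds (not real data):
durations_ns = [50_000_000, 100_000_000, 150_000_000, 200_000_000, 250_000_000]
threshold = percentile_cont(durations_ns, 80 / 100.0)  # ≈ 210_000_000
# list_runs keeps runs with run_ns >= threshold, i.e. the slow tail:
slow_runs = [d for d in durations_ns if d >= threshold]  # [250_000_000]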


@@ -1775,3 +1775,242 @@ async def test_list_runs_with_no_status_filter_returns_all(server: SyncServer, s
    assert RunStatus.completed in statuses_found
    assert RunStatus.failed in statuses_found
    assert RunStatus.cancelled in statuses_found


# ======================================================================================================================
# RunManager Tests - Duration Filtering
# ======================================================================================================================
@pytest.mark.asyncio
async def test_list_runs_by_duration_gt(server: SyncServer, sarah_agent, default_user):
    """Test listing runs filtered by duration greater than a threshold."""
    import asyncio

    # Create runs with different durations
    runs_data = []

    # Fast run (< 100ms)
    run_fast = await server.run_manager.create_run(
        pydantic_run=PydanticRun(agent_id=sarah_agent.id, metadata={"speed": "fast"}),
        actor=default_user,
    )
    await asyncio.sleep(0.05)  # 50ms
    await server.run_manager.update_run_by_id_async(
        run_fast.id, RunUpdate(status=RunStatus.completed, stop_reason=StopReasonType.end_turn), actor=default_user
    )
    runs_data.append(run_fast)

    # Medium run (~150ms)
    run_medium = await server.run_manager.create_run(
        pydantic_run=PydanticRun(agent_id=sarah_agent.id, metadata={"speed": "medium"}),
        actor=default_user,
    )
    await asyncio.sleep(0.15)  # 150ms
    await server.run_manager.update_run_by_id_async(
        run_medium.id, RunUpdate(status=RunStatus.completed, stop_reason=StopReasonType.end_turn), actor=default_user
    )
    runs_data.append(run_medium)

    # Slow run (~250ms)
    run_slow = await server.run_manager.create_run(
        pydantic_run=PydanticRun(agent_id=sarah_agent.id, metadata={"speed": "slow"}),
        actor=default_user,
    )
    await asyncio.sleep(0.25)  # 250ms
    await server.run_manager.update_run_by_id_async(
        run_slow.id, RunUpdate(status=RunStatus.completed, stop_reason=StopReasonType.end_turn), actor=default_user
    )
    runs_data.append(run_slow)

    # Filter runs with duration > 100ms (100,000,000 ns)
    filtered_runs = await server.run_manager.list_runs(
        actor=default_user,
        agent_id=sarah_agent.id,
        duration_filter={"value": 100_000_000, "operator": "gt"},
    )

    # Should return medium and slow runs
    assert len(filtered_runs) >= 2
    run_ids = {run.id for run in filtered_runs}
    assert run_medium.id in run_ids
    assert run_slow.id in run_ids
@pytest.mark.asyncio
async def test_list_runs_by_duration_lt(server: SyncServer, sarah_agent, default_user):
    """Test listing runs filtered by duration less than a threshold."""
    import asyncio

    # Create runs with different durations
    # Fast run
    run_fast = await server.run_manager.create_run(
        pydantic_run=PydanticRun(agent_id=sarah_agent.id, metadata={"speed": "fast"}),
        actor=default_user,
    )
    await asyncio.sleep(0.05)  # 50ms
    await server.run_manager.update_run_by_id_async(
        run_fast.id, RunUpdate(status=RunStatus.completed, stop_reason=StopReasonType.end_turn), actor=default_user
    )

    # Slow run
    run_slow = await server.run_manager.create_run(
        pydantic_run=PydanticRun(agent_id=sarah_agent.id, metadata={"speed": "slow"}),
        actor=default_user,
    )
    await asyncio.sleep(0.30)  # 300ms
    await server.run_manager.update_run_by_id_async(
        run_slow.id, RunUpdate(status=RunStatus.completed, stop_reason=StopReasonType.end_turn), actor=default_user
    )

    # Get actual durations to set a threshold between them
    fast_metrics = await server.run_manager.get_run_metrics_async(run_id=run_fast.id, actor=default_user)
    slow_metrics = await server.run_manager.get_run_metrics_async(run_id=run_slow.id, actor=default_user)

    # Set threshold between the two durations
    threshold = (fast_metrics.run_ns + slow_metrics.run_ns) // 2

    # Filter runs with duration < threshold
    filtered_runs = await server.run_manager.list_runs(
        actor=default_user,
        agent_id=sarah_agent.id,
        duration_filter={"value": threshold, "operator": "lt"},
    )

    # Should return only the fast run
    assert len(filtered_runs) >= 1
    assert run_fast.id in [run.id for run in filtered_runs]

    # Verify slow run is not included
    assert run_slow.id not in [run.id for run in filtered_runs]
@pytest.mark.asyncio
async def test_list_runs_by_duration_percentile(server: SyncServer, sarah_agent, default_user):
    """Test listing runs filtered by duration percentile."""
    import asyncio

    # Create runs with varied durations
    run_ids = []
    durations_ms = [50, 100, 150, 200, 250, 300, 350, 400, 450, 500]

    for i, duration_ms in enumerate(durations_ms):
        run = await server.run_manager.create_run(
            pydantic_run=PydanticRun(agent_id=sarah_agent.id, metadata={"index": i}),
            actor=default_user,
        )
        await asyncio.sleep(duration_ms / 1000.0)  # Convert to seconds
        await server.run_manager.update_run_by_id_async(
            run.id, RunUpdate(status=RunStatus.completed, stop_reason=StopReasonType.end_turn), actor=default_user
        )
        run_ids.append(run.id)

    # Filter runs in the top 20% (80th percentile)
    # This should return approximately the slowest 20% of runs
    filtered_runs = await server.run_manager.list_runs(
        actor=default_user,
        agent_id=sarah_agent.id,
        duration_percentile=80,
    )

    # Should return at least 2 runs (approximately 20% of 10)
    assert len(filtered_runs) >= 2

    # Verify the slowest run is definitely included
    filtered_ids = {run.id for run in filtered_runs}
    assert run_ids[-1] in filtered_ids  # Slowest run (500ms)

    # Verify that filtered runs are among the slower runs
    # At least two should be from the slowest 3
    slowest_3_ids = set(run_ids[-3:])
    assert len(filtered_ids & slowest_3_ids) >= 2, "Expected at least 2 of the slowest 3 runs"
@pytest.mark.asyncio
async def test_list_runs_by_duration_with_order_by(server: SyncServer, sarah_agent, default_user):
    """Test listing runs filtered by duration with different order_by options."""
    import asyncio

    # Create runs with different durations
    runs = []
    for i, duration_ms in enumerate([100, 200, 300]):
        run = await server.run_manager.create_run(
            pydantic_run=PydanticRun(agent_id=sarah_agent.id, metadata={"index": i}),
            actor=default_user,
        )
        await asyncio.sleep(duration_ms / 1000.0)
        await server.run_manager.update_run_by_id_async(
            run.id, RunUpdate(status=RunStatus.completed, stop_reason=StopReasonType.end_turn), actor=default_user
        )
        runs.append(run)

    # Test order_by="duration" with ascending order
    filtered_runs_asc = await server.run_manager.list_runs(
        actor=default_user,
        agent_id=sarah_agent.id,
        order_by="duration",
        ascending=True,
    )

    # Should be ordered from fastest to slowest
    assert len(filtered_runs_asc) >= 3

    # Get metrics to verify ordering
    metrics_asc = []
    for run in filtered_runs_asc[:3]:
        metrics = await server.run_manager.get_run_metrics_async(run_id=run.id, actor=default_user)
        metrics_asc.append(metrics.run_ns)

    # Verify ascending order
    assert metrics_asc[0] <= metrics_asc[1] <= metrics_asc[2]

    # Test order_by="duration" with descending order (default)
    filtered_runs_desc = await server.run_manager.list_runs(
        actor=default_user,
        agent_id=sarah_agent.id,
        order_by="duration",
        ascending=False,
    )

    # Should be ordered from slowest to fastest
    assert len(filtered_runs_desc) >= 3

    # Get metrics to verify ordering
    metrics_desc = []
    for run in filtered_runs_desc[:3]:
        metrics = await server.run_manager.get_run_metrics_async(run_id=run.id, actor=default_user)
        metrics_desc.append(metrics.run_ns)

    # Verify descending order
    assert metrics_desc[0] >= metrics_desc[1] >= metrics_desc[2]
@pytest.mark.asyncio
async def test_list_runs_combined_duration_filter_and_percentile(server: SyncServer, sarah_agent, default_user):
    """Test combining duration filter with percentile filter."""
    import asyncio

    # Create runs with varied durations
    runs = []
    for i, duration_ms in enumerate([50, 100, 150, 200, 250, 300, 350, 400]):
        run = await server.run_manager.create_run(
            pydantic_run=PydanticRun(agent_id=sarah_agent.id, metadata={"index": i}),
            actor=default_user,
        )
        await asyncio.sleep(duration_ms / 1000.0)
        await server.run_manager.update_run_by_id_async(
            run.id, RunUpdate(status=RunStatus.completed, stop_reason=StopReasonType.end_turn), actor=default_user
        )
        runs.append(run)

    # Filter runs that are:
    # 1. In the top 50% slowest (duration_percentile=50)
    # 2. AND greater than 200ms (duration_filter > 200_000_000 ns)
    filtered_runs = await server.run_manager.list_runs(
        actor=default_user,
        agent_id=sarah_agent.id,
        duration_percentile=50,
        duration_filter={"value": 200_000_000, "operator": "gt"},
    )

    # Should return runs that satisfy both conditions
    assert len(filtered_runs) >= 2

    # Verify all returned runs meet both criteria
    for run in filtered_runs:
        metrics = await server.run_manager.get_run_metrics_async(run_id=run.id, actor=default_user)
        # Should be greater than 200ms
        assert metrics.run_ns > 200_000_000
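
These tests seed run durations with wall-clock sleeps and assert against thresholds expressed in nanoseconds, so the unit conversion is worth pinning down. A quick reference (a sketch; the constants below are the ones the tests above use):

# run_ns is measured in nanoseconds throughout.
MS_NS = 1_000_000  # 1 millisecond = 1,000,000 ns

assert 100 * MS_NS == 100_000_000  # the duration_gt test's 100 ms threshold
assert 200 * MS_NS == 200_000_000  # the combined test's 200 ms threshold

# Sleep-based timing jitters, which is why the assertions use ">=" counts
# and set membership rather than exact result sizes.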