diff --git a/fern/openapi.json b/fern/openapi.json
index 424c1fd3..6f62b8e2 100644
--- a/fern/openapi.json
+++ b/fern/openapi.json
@@ -10518,7 +10518,7 @@
           "in": "query",
           "required": false,
           "schema": {
-            "const": "created_at",
+            "enum": ["created_at", "duration"],
             "type": "string",
             "description": "Field to sort by",
             "default": "created_at",
@@ -10569,6 +10569,99 @@
             "title": "Project Id"
           },
           "description": "Filter runs by project ID."
+        },
+        {
+          "name": "duration_percentile",
+          "in": "query",
+          "required": false,
+          "schema": {
+            "anyOf": [
+              {
+                "type": "integer"
+              },
+              {
+                "type": "null"
+              }
+            ],
+            "description": "Filter runs by duration percentile (1-100). Returns runs slower than this percentile.",
+            "title": "Duration Percentile"
+          },
+          "description": "Filter runs by duration percentile (1-100). Returns runs slower than this percentile."
+        },
+        {
+          "name": "duration_value",
+          "in": "query",
+          "required": false,
+          "schema": {
+            "anyOf": [
+              {
+                "type": "integer"
+              },
+              {
+                "type": "null"
+              }
+            ],
+            "description": "Duration value in nanoseconds for filtering. Must be used with duration_operator.",
+            "title": "Duration Value"
+          },
+          "description": "Duration value in nanoseconds for filtering. Must be used with duration_operator."
+        },
+        {
+          "name": "duration_operator",
+          "in": "query",
+          "required": false,
+          "schema": {
+            "anyOf": [
+              {
+                "enum": ["gt", "lt", "eq"],
+                "type": "string"
+              },
+              {
+                "type": "null"
+              }
+            ],
+            "description": "Comparison operator for duration filter: 'gt' (greater than), 'lt' (less than), 'eq' (equals).",
+            "title": "Duration Operator"
+          },
+          "description": "Comparison operator for duration filter: 'gt' (greater than), 'lt' (less than), 'eq' (equals)."
+        },
+        {
+          "name": "start_date",
+          "in": "query",
+          "required": false,
+          "schema": {
+            "anyOf": [
+              {
+                "type": "string",
+                "format": "date-time"
+              },
+              {
+                "type": "null"
+              }
+            ],
+            "description": "Filter runs created on or after this date (ISO 8601 format).",
+            "title": "Start Date"
+          },
+          "description": "Filter runs created on or after this date (ISO 8601 format)."
+        },
+        {
+          "name": "end_date",
+          "in": "query",
+          "required": false,
+          "schema": {
+            "anyOf": [
+              {
+                "type": "string",
+                "format": "date-time"
+              },
+              {
+                "type": "null"
+              }
+            ],
+            "description": "Filter runs created on or before this date (ISO 8601 format).",
+            "title": "End Date"
+          },
+          "description": "Filter runs created on or before this date (ISO 8601 format)."
         }
       ],
       "responses": {
diff --git a/letta/server/rest_api/routers/v1/internal_runs.py b/letta/server/rest_api/routers/v1/internal_runs.py
index 7c8d8525..1e6df2d6 100644
--- a/letta/server/rest_api/routers/v1/internal_runs.py
+++ b/letta/server/rest_api/routers/v1/internal_runs.py
@@ -1,3 +1,4 @@
+from datetime import datetime
 from typing import List, Literal, Optional

 from fastapi import APIRouter, Depends, Query
@@ -55,7 +56,7 @@ async def list_runs(
     order: Literal["asc", "desc"] = Query(
         "desc", description="Sort order for runs by creation time. 'asc' for oldest first, 'desc' for newest first"
     ),
-    order_by: Literal["created_at"] = Query("created_at", description="Field to sort by"),
+    order_by: Literal["created_at", "duration"] = Query("created_at", description="Field to sort by"),
     active: bool = Query(False, description="Filter for active runs."),
     ascending: bool = Query(
         False,
@@ -63,6 +64,17 @@
         deprecated=True,
     ),
     project_id: Optional[str] = Query(None, description="Filter runs by project ID."),
+    duration_percentile: Optional[int] = Query(
+        None, description="Filter runs by duration percentile (1-100). Returns runs slower than this percentile."
+    ),
+    duration_value: Optional[int] = Query(
+        None, description="Duration value in nanoseconds for filtering. Must be used with duration_operator."
+    ),
+    duration_operator: Optional[Literal["gt", "lt", "eq"]] = Query(
+        None, description="Comparison operator for duration filter: 'gt' (greater than), 'lt' (less than), 'eq' (equals)."
+    ),
+    start_date: Optional[datetime] = Query(None, description="Filter runs created on or after this date (ISO 8601 format)."),
+    end_date: Optional[datetime] = Query(None, description="Filter runs created on or before this date (ISO 8601 format)."),
     headers: HeaderParams = Depends(get_headers),
 ):
     """
@@ -90,6 +102,11 @@
     # Convert string statuses to RunStatus enum
     parsed_statuses = convert_statuses_to_enum(statuses)

+    # Create duration filter dict if both parameters provided
+    duration_filter = None
+    if duration_value is not None and duration_operator is not None:
+        duration_filter = {"value": duration_value, "operator": duration_operator}
+
     runs = await runs_manager.list_runs(
         actor=actor,
         agent_ids=agent_ids,
@@ -105,5 +122,10 @@
         step_count_operator=step_count_operator,
         tools_used=tools_used,
         project_id=project_id,
+        order_by=order_by,
+        duration_percentile=duration_percentile,
+        duration_filter=duration_filter,
+        start_date=start_date,
+        end_date=end_date,
     )
     return runs
diff --git a/letta/services/run_manager.py b/letta/services/run_manager.py
index 7a6b324e..5691d1ad 100644
--- a/letta/services/run_manager.py
+++ b/letta/services/run_manager.py
@@ -115,12 +115,22 @@ class RunManager:
         step_count_operator: ComparisonOperator = ComparisonOperator.EQ,
         tools_used: Optional[List[str]] = None,
         project_id: Optional[str] = None,
+        order_by: Literal["created_at", "duration"] = "created_at",
+        duration_percentile: Optional[int] = None,
+        duration_filter: Optional[dict] = None,
+        start_date: Optional[datetime] = None,
+        end_date: Optional[datetime] = None,
     ) -> List[PydanticRun]:
         """List runs with filtering options."""
         async with db_registry.async_session() as session:
-            from sqlalchemy import or_, select
+            from sqlalchemy import func, or_, select

-            query = select(RunModel).filter(RunModel.organization_id == actor.organization_id)
+            # Always join with run_metrics to get duration data
+            query = (
+                select(RunModel, RunMetricsModel.run_ns)
+                .outerjoin(RunMetricsModel, RunModel.id == RunMetricsModel.id)
+                .filter(RunModel.organization_id == actor.organization_id)
+            )

             # Filter by project_id if provided
             if project_id:
@@ -148,41 +158,107 @@
             if template_family:
                 query = query.filter(RunModel.base_template_id == template_family)

-            # Filter by step_count and/or tools_used - join with run_metrics
-            if step_count is not None or tools_used:
-                query = query.join(RunMetricsModel, RunModel.id == RunMetricsModel.id)
+            # Filter by date range
+            if start_date:
+                query = query.filter(RunModel.created_at >= start_date)
+            if end_date:
+                query = query.filter(RunModel.created_at <= end_date)

-                # Filter by step_count with the specified operator
-                if step_count is not None:
-                    if step_count_operator == ComparisonOperator.EQ:
-                        query = query.filter(RunMetricsModel.num_steps == step_count)
-                    elif step_count_operator == ComparisonOperator.GTE:
-                        query = query.filter(RunMetricsModel.num_steps >= step_count)
-                    elif step_count_operator == ComparisonOperator.LTE:
-                        query = query.filter(RunMetricsModel.num_steps <= step_count)
+            # Filter by step_count with the specified operator
+            if step_count is not None:
+                if step_count_operator == ComparisonOperator.EQ:
+                    query = query.filter(RunMetricsModel.num_steps == step_count)
+                elif step_count_operator == ComparisonOperator.GTE:
+                    query = query.filter(RunMetricsModel.num_steps >= step_count)
+                elif step_count_operator == ComparisonOperator.LTE:
+                    query = query.filter(RunMetricsModel.num_steps <= step_count)

-                # Filter by tools used ids
-                if tools_used:
-                    from sqlalchemy import String, cast as sa_cast, type_coerce
-                    from sqlalchemy.dialects.postgresql import ARRAY, JSONB
+            # Filter by tools used ids
+            if tools_used:
+                from sqlalchemy import String, cast as sa_cast, type_coerce
+                from sqlalchemy.dialects.postgresql import ARRAY, JSONB

-                    # Use ?| operator to check if any tool_id exists in the array (OR logic)
-                    jsonb_tools = sa_cast(RunMetricsModel.tools_used, JSONB)
-                    tools_array = type_coerce(tools_used, ARRAY(String))
-                    query = query.filter(jsonb_tools.op("?|")(tools_array))
+                # Use ?| operator to check if any tool_id exists in the array (OR logic)
+                jsonb_tools = sa_cast(RunMetricsModel.tools_used, JSONB)
+                tools_array = type_coerce(tools_used, ARRAY(String))
+                query = query.filter(jsonb_tools.op("?|")(tools_array))

-            # Apply pagination
-            from letta.services.helpers.run_manager_helper import _apply_pagination_async
+            # Ensure run_ns is not null when working with duration
+            if order_by == "duration" or duration_percentile is not None or duration_filter is not None:
+                query = query.filter(RunMetricsModel.run_ns.isnot(None))

-            query = await _apply_pagination_async(query, before, after, session, ascending=ascending)
+            # Apply duration filter if requested
+            if duration_filter is not None:
+                duration_value = duration_filter.get("value") if isinstance(duration_filter, dict) else duration_filter.value
+                duration_operator = duration_filter.get("operator") if isinstance(duration_filter, dict) else duration_filter.operator
+
+                if duration_operator == "gt":
+                    query = query.filter(RunMetricsModel.run_ns > duration_value)
+                elif duration_operator == "lt":
+                    query = query.filter(RunMetricsModel.run_ns < duration_value)
+                elif duration_operator == "eq":
+                    query = query.filter(RunMetricsModel.run_ns == duration_value)
+
+            # Apply duration percentile filter if requested
+            if duration_percentile is not None:
+                # Calculate the percentile threshold
+                percentile_query = (
+                    select(func.percentile_cont(duration_percentile / 100.0).within_group(RunMetricsModel.run_ns))
+                    .select_from(RunMetricsModel)
+                    .join(RunModel, RunModel.id == RunMetricsModel.id)
+                    .filter(RunModel.organization_id == actor.organization_id)
+                    .filter(RunMetricsModel.run_ns.isnot(None))
+                )
+
+                # Apply same filters to percentile calculation
+                if project_id:
+                    percentile_query = percentile_query.filter(RunModel.project_id == project_id)
+                if agent_ids:
+                    percentile_query = percentile_query.filter(RunModel.agent_id.in_(agent_ids))
+                if statuses:
+                    percentile_query = percentile_query.filter(RunModel.status.in_(statuses))
+
+                # Execute percentile query
+                percentile_result = await session.execute(percentile_query)
+                percentile_threshold = percentile_result.scalar()
+
+                # Filter by percentile threshold (runs slower than the percentile)
+                if percentile_threshold is not None:
+                    query = query.filter(RunMetricsModel.run_ns >= percentile_threshold)
+
+            # Apply sorting based on order_by
+            if order_by == "duration":
+                # Sort by duration
+                if ascending:
+                    query = query.order_by(RunMetricsModel.run_ns.asc())
+                else:
+                    query = query.order_by(RunMetricsModel.run_ns.desc())
+            else:
+                # Apply pagination for created_at ordering
+                from letta.services.helpers.run_manager_helper import _apply_pagination_async
+
+                query = await _apply_pagination_async(query, before, after, session, ascending=ascending)

             # Apply limit
             if limit:
                 query = query.limit(limit)

             result = await session.execute(query)
-            runs = result.scalars().all()
-            return [run.to_pydantic() for run in runs]
+            rows = result.all()
+
+            # Populate total_duration_ns from run_metrics.run_ns
+            pydantic_runs = []
+            for row in rows:
+                run_model = row[0]
+                run_ns = row[1]
+
+                pydantic_run = run_model.to_pydantic()
+                if run_ns is not None:
+                    pydantic_run.total_duration_ns = run_ns
+
+                pydantic_runs.append(pydantic_run)
+
+            return pydantic_runs

     @enforce_types
     @raise_on_invalid_id(param_name="run_id", expected_prefix=PrimitiveType.RUN)
diff --git a/tests/managers/test_run_manager.py b/tests/managers/test_run_manager.py
index 9856ee1f..2ebb644c 100644
--- a/tests/managers/test_run_manager.py
+++ b/tests/managers/test_run_manager.py
@@ -1775,3 +1775,242 @@ async def test_list_runs_with_no_status_filter_returns_all(server: SyncServer, s
     assert RunStatus.completed in statuses_found
     assert RunStatus.failed in statuses_found
     assert RunStatus.cancelled in statuses_found
+
+
+# ======================================================================================================================
+# RunManager Tests - Duration Filtering
+# ======================================================================================================================
+
+
+@pytest.mark.asyncio
+async def test_list_runs_by_duration_gt(server: SyncServer, sarah_agent, default_user):
+    """Test listing runs filtered by duration greater than a threshold."""
+    import asyncio
+
+    # Create runs with different durations
+    runs_data = []
+
+    # Fast run (< 100ms)
+    run_fast = await server.run_manager.create_run(
+        pydantic_run=PydanticRun(agent_id=sarah_agent.id, metadata={"speed": "fast"}),
+        actor=default_user,
+    )
+    await asyncio.sleep(0.05)  # 50ms
+    await server.run_manager.update_run_by_id_async(
+        run_fast.id, RunUpdate(status=RunStatus.completed, stop_reason=StopReasonType.end_turn), actor=default_user
+    )
+    runs_data.append(run_fast)
+
+    # Medium run (~150ms)
+    run_medium = await server.run_manager.create_run(
+        pydantic_run=PydanticRun(agent_id=sarah_agent.id, metadata={"speed": "medium"}),
+        actor=default_user,
+    )
+    await asyncio.sleep(0.15)  # 150ms
+    await server.run_manager.update_run_by_id_async(
+        run_medium.id, RunUpdate(status=RunStatus.completed, stop_reason=StopReasonType.end_turn), actor=default_user
+    )
+    runs_data.append(run_medium)
+
+    # Slow run (~250ms)
+    run_slow = await server.run_manager.create_run(
+        pydantic_run=PydanticRun(agent_id=sarah_agent.id, metadata={"speed": "slow"}),
+        actor=default_user,
+    )
+    await asyncio.sleep(0.25)  # 250ms
+    await server.run_manager.update_run_by_id_async(
+        run_slow.id, RunUpdate(status=RunStatus.completed, stop_reason=StopReasonType.end_turn), actor=default_user
+    )
+    runs_data.append(run_slow)
+
+    # Filter runs with duration > 100ms (100,000,000 ns)
+    filtered_runs = await server.run_manager.list_runs(
+        actor=default_user,
+        agent_id=sarah_agent.id,
+        duration_filter={"value": 100_000_000, "operator": "gt"},
+    )
+
+    # Should return medium and slow runs
+    assert len(filtered_runs) >= 2
+    run_ids = {run.id for run in filtered_runs}
+    assert run_medium.id in run_ids
+    assert run_slow.id in run_ids
+
+
+@pytest.mark.asyncio
+async def test_list_runs_by_duration_lt(server: SyncServer, sarah_agent, default_user):
+    """Test listing runs filtered by duration less than a threshold."""
+    import asyncio
+
+    # Create runs with different durations
+    # Fast run
+    run_fast = await server.run_manager.create_run(
+        pydantic_run=PydanticRun(agent_id=sarah_agent.id, metadata={"speed": "fast"}),
+        actor=default_user,
+    )
+    await asyncio.sleep(0.05)  # 50ms
+    await server.run_manager.update_run_by_id_async(
+        run_fast.id, RunUpdate(status=RunStatus.completed, stop_reason=StopReasonType.end_turn), actor=default_user
+    )
+
+    # Slow run
+    run_slow = await server.run_manager.create_run(
+        pydantic_run=PydanticRun(agent_id=sarah_agent.id, metadata={"speed": "slow"}),
+        actor=default_user,
+    )
+    await asyncio.sleep(0.30)  # 300ms
+    await server.run_manager.update_run_by_id_async(
+        run_slow.id, RunUpdate(status=RunStatus.completed, stop_reason=StopReasonType.end_turn), actor=default_user
+    )
+
+    # Get actual durations to set a threshold between them
+    fast_metrics = await server.run_manager.get_run_metrics_async(run_id=run_fast.id, actor=default_user)
+    slow_metrics = await server.run_manager.get_run_metrics_async(run_id=run_slow.id, actor=default_user)
+
+    # Set threshold between the two durations
+    threshold = (fast_metrics.run_ns + slow_metrics.run_ns) // 2
+
+    # Filter runs with duration < threshold
+    filtered_runs = await server.run_manager.list_runs(
+        actor=default_user,
+        agent_id=sarah_agent.id,
+        duration_filter={"value": threshold, "operator": "lt"},
+    )
+
+    # Should return only the fast run
+    assert len(filtered_runs) >= 1
+    assert run_fast.id in [run.id for run in filtered_runs]
+    # Verify slow run is not included
+    assert run_slow.id not in [run.id for run in filtered_runs]
+
+
+@pytest.mark.asyncio
+async def test_list_runs_by_duration_percentile(server: SyncServer, sarah_agent, default_user):
+    """Test listing runs filtered by duration percentile."""
+    import asyncio
+
+    # Create runs with varied durations
+    run_ids = []
+    durations_ms = [50, 100, 150, 200, 250, 300, 350, 400, 450, 500]
+
+    for i, duration_ms in enumerate(durations_ms):
+        run = await server.run_manager.create_run(
+            pydantic_run=PydanticRun(agent_id=sarah_agent.id, metadata={"index": i}),
+            actor=default_user,
+        )
+        await asyncio.sleep(duration_ms / 1000.0)  # Convert to seconds
+        await server.run_manager.update_run_by_id_async(
+            run.id, RunUpdate(status=RunStatus.completed, stop_reason=StopReasonType.end_turn), actor=default_user
+        )
+        run_ids.append(run.id)
+
+    # Filter runs in top 20% (80th percentile)
+    # This should return approximately the slowest 20% of runs
+    filtered_runs = await server.run_manager.list_runs(
+        actor=default_user,
+        agent_id=sarah_agent.id,
+        duration_percentile=80,
+    )
+
+    # Should return at least 2 runs (approximately 20% of 10)
+    assert len(filtered_runs) >= 2
+    # Verify the slowest run is definitely included
+    filtered_ids = {run.id for run in filtered_runs}
+    assert run_ids[-1] in filtered_ids  # Slowest run (500ms)
+
+    # Verify that filtered runs are among the slower runs
+    # At least two should be from the slowest 3
+    slowest_3_ids = set(run_ids[-3:])
+    assert len(filtered_ids & slowest_3_ids) >= 2, "Expected at least 2 of the slowest 3 runs"
+
+
+@pytest.mark.asyncio
+async def test_list_runs_by_duration_with_order_by(server: SyncServer, sarah_agent, default_user):
+    """Test listing runs ordered by duration, both ascending and descending."""
+    import asyncio
+
+    # Create runs with different durations
+    runs = []
+    for i, duration_ms in enumerate([100, 200, 300]):
+        run = await server.run_manager.create_run(
+            pydantic_run=PydanticRun(agent_id=sarah_agent.id, metadata={"index": i}),
+            actor=default_user,
+        )
+        await asyncio.sleep(duration_ms / 1000.0)
+        await server.run_manager.update_run_by_id_async(
+            run.id, RunUpdate(status=RunStatus.completed, stop_reason=StopReasonType.end_turn), actor=default_user
+        )
+        runs.append(run)
+
+    # Test order_by="duration" with ascending order
+    filtered_runs_asc = await server.run_manager.list_runs(
+        actor=default_user,
+        agent_id=sarah_agent.id,
+        order_by="duration",
+        ascending=True,
+    )
+
+    # Should be ordered from fastest to slowest
+    assert len(filtered_runs_asc) >= 3
+    # Get metrics to verify ordering
+    metrics_asc = []
+    for run in filtered_runs_asc[:3]:
+        metrics = await server.run_manager.get_run_metrics_async(run_id=run.id, actor=default_user)
+        metrics_asc.append(metrics.run_ns)
+    # Verify ascending order
+    assert metrics_asc[0] <= metrics_asc[1] <= metrics_asc[2]
+
+    # Test order_by="duration" with descending order (default)
+    filtered_runs_desc = await server.run_manager.list_runs(
+        actor=default_user,
+        agent_id=sarah_agent.id,
+        order_by="duration",
+        ascending=False,
+    )
+
+    # Should be ordered from slowest to fastest
+    assert len(filtered_runs_desc) >= 3
+    # Get metrics to verify ordering
+    metrics_desc = []
+    for run in filtered_runs_desc[:3]:
+        metrics = await server.run_manager.get_run_metrics_async(run_id=run.id, actor=default_user)
+        metrics_desc.append(metrics.run_ns)
+    # Verify descending order
+    assert metrics_desc[0] >= metrics_desc[1] >= metrics_desc[2]
+
+
+@pytest.mark.asyncio
+async def test_list_runs_combined_duration_filter_and_percentile(server: SyncServer, sarah_agent, default_user):
+    """Test combining duration filter with percentile filter."""
+    import asyncio
+
+    # Create runs with varied durations
+    runs = []
+    for i, duration_ms in enumerate([50, 100, 150, 200, 250, 300, 350, 400]):
+        run = await server.run_manager.create_run(
+            pydantic_run=PydanticRun(agent_id=sarah_agent.id, metadata={"index": i}),
+            actor=default_user,
+        )
+        await asyncio.sleep(duration_ms / 1000.0)
+        await server.run_manager.update_run_by_id_async(
+            run.id, RunUpdate(status=RunStatus.completed, stop_reason=StopReasonType.end_turn), actor=default_user
+        )
+        runs.append(run)
+
+    # Filter runs that are:
+    # 1. In the slowest 50% (duration_percentile=50)
+    # 2. AND greater than 200ms (duration_filter > 200_000_000 ns)
+    filtered_runs = await server.run_manager.list_runs(
+        actor=default_user,
+        agent_id=sarah_agent.id,
+        duration_percentile=50,
+        duration_filter={"value": 200_000_000, "operator": "gt"},
+    )
+
+    # Should return runs that satisfy both conditions
+    assert len(filtered_runs) >= 2
+    # Verify all returned runs meet both criteria
+    for run in filtered_runs:
+        metrics = await server.run_manager.get_run_metrics_async(run_id=run.id, actor=default_user)
+        # Should be greater than 200ms
+        assert metrics.run_ns > 200_000_000
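
Notes on the duration filtering change:

All five new parameters are ordinary query-string fields, so they compose freely with
the existing run filters. A minimal client sketch; the base URL, the /v1/_internal/runs
path for this internal router, and the unauthenticated call are assumptions for
illustration, not taken from the patch:

    import httpx

    # Runs slower than 2 seconds (durations are nanoseconds end to end),
    # created within a one-day window, ordered by duration.
    params = {
        "duration_value": 2_000_000_000,
        "duration_operator": "gt",
        "start_date": "2025-01-01T00:00:00Z",
        "end_date": "2025-01-02T00:00:00Z",
        "order_by": "duration",
    }
    response = httpx.get("http://localhost:8283/v1/_internal/runs", params=params)
    response.raise_for_status()
    for run in response.json():
        print(run["id"], run.get("total_duration_ns"))

With order_by="duration" the manager sorts slowest first unless ascending=True, and
cursor pagination (before/after) is only applied on the created_at path.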
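
Durations are expressed in nanoseconds throughout because run_metrics.run_ns is the
backing column; the tests write 100_000_000 for 100ms. Hypothetical helpers (not in the
patch) that keep such call sites readable:

    NS_PER_MS = 1_000_000
    NS_PER_S = 1_000_000_000

    def ms_to_ns(milliseconds: float) -> int:
        """Convert milliseconds to the nanosecond unit expected by duration_value."""
        return int(milliseconds * NS_PER_MS)

    assert ms_to_ns(100) == 100_000_000
    assert ms_to_ns(2000) == 2 * NS_PER_S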
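
list_runs accepts duration_filter either as a plain dict ({"value": ..., "operator": ...})
or as any object exposing .value and .operator attributes, hence the repeated isinstance
branching. A typed wrapper would centralize that normalization; this is a sketch of one
possible shape, not code from the patch:

    from dataclasses import dataclass
    from typing import Literal, Optional, Union

    @dataclass(frozen=True)
    class DurationFilter:
        value: int  # nanoseconds
        operator: Literal["gt", "lt", "eq"]

    def normalize_duration_filter(raw: Union[dict, DurationFilter, None]) -> Optional[DurationFilter]:
        """Accept the dict form used by the router or an attribute-style object."""
        if raw is None:
            return None
        if isinstance(raw, dict):
            return DurationFilter(value=raw["value"], operator=raw["operator"])
        return raw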
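
func.percentile_cont(p).within_group(RunMetricsModel.run_ns) compiles to the ordered-set
aggregate PERCENTILE_CONT(p) WITHIN GROUP (ORDER BY run_ns). That aggregate exists in
PostgreSQL but not in SQLite, so the percentile path needs a Postgres-backed session; it
also interpolates between neighboring values, so the computed threshold need not equal
any observed duration. A standalone sketch of the threshold computation, assuming the
same RunModel/RunMetricsModel mappings that run_manager.py already imports:

    from sqlalchemy import func, select
    from sqlalchemy.ext.asyncio import AsyncSession

    async def duration_threshold_ns(session: AsyncSession, organization_id: str, percentile: int):
        """Return the run_ns value at the given percentile (1-100), or None if there are no rows."""
        stmt = (
            select(func.percentile_cont(percentile / 100.0).within_group(RunMetricsModel.run_ns))
            .select_from(RunMetricsModel)
            .join(RunModel, RunModel.id == RunMetricsModel.id)
            .filter(RunModel.organization_id == organization_id)
            .filter(RunMetricsModel.run_ns.isnot(None))
        )
        return (await session.execute(stmt)).scalar()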
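
Because the statement now selects a pair, select(RunModel, RunMetricsModel.run_ns), each
result row is a Row tuple rather than a bare ORM object, which is why the manager switches
from result.scalars().all() to result.all() with row[0]/row[1]. The same loop written with
tuple unpacking, behavior unchanged:

    result = await session.execute(query)

    pydantic_runs = []
    for run_model, run_ns in result.all():
        pydantic_run = run_model.to_pydantic()
        # run_ns is None for runs without a run_metrics row (outer join)
        if run_ns is not None:
            pydantic_run.total_duration_ns = run_ns
        pydantic_runs.append(pydantic_run)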
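
The new tests derive durations from asyncio.sleep, which only guarantees a lower bound;
a "50ms" run can legitimately measure 60ms if the event loop resumes late. Assertions on
measured run_ns therefore compare against the floor, never an exact value. A hypothetical
helper that makes the convention explicit:

    NS_PER_MS = 1_000_000

    def assert_slept_at_least(actual_ns: int, slept_ms: int) -> None:
        """Assert a measured duration is no shorter than the time the test slept for."""
        assert actual_ns >= slept_ms * NS_PER_MS, f"expected >= {slept_ms}ms, measured {actual_ns / NS_PER_MS:.1f}ms"

    # e.g. assert_slept_at_least(fast_metrics.run_ns, 50)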