chore: merge oss (#3712)
.github/scripts/model-sweep/generate_model_sweep_markdown.py (new file, vendored): 495 lines added
@@ -0,0 +1,495 @@
#!/usr/bin/env python3
"""Generate an MDX model-support report from model sweep JSON results."""

import json
import os
import sys
from collections import defaultdict
from datetime import datetime


def load_feature_mappings(config_file=None):
    """Load feature mappings from config file."""
    if config_file is None:
        # Default to feature_mappings.json in the same directory as this script
        script_dir = os.path.dirname(os.path.abspath(__file__))
        config_file = os.path.join(script_dir, "feature_mappings.json")

    try:
        with open(config_file, "r") as f:
            return json.load(f)
    except FileNotFoundError:
        print(f"Error: Could not find feature mappings config file '{config_file}'")
        sys.exit(1)
    except json.JSONDecodeError:
        print(f"Error: Invalid JSON in feature mappings config file '{config_file}'")
        sys.exit(1)


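# The mapping loaded by load_feature_mappings above is expected to be a JSON object of
# feature name -> list of test function names (see categorize_tests below). The feature
# and test names here are illustrative placeholders, not the actual contents of
# feature_mappings.json:
#
#   {
#       "Tool Calling": ["test_tool_call_basic", "test_tool_call_error"],
#       "Streaming": ["test_streaming_tokens"]
#   }
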
def get_support_status(passed_tests, feature_tests):
    """Determine support status for a feature category."""
    if not feature_tests:
        return "❓"  # Unknown - no tests for this feature

    # Filter out error tests when checking for support
    non_error_tests = [test for test in feature_tests if not test.endswith("_error")]
    error_tests = [test for test in feature_tests if test.endswith("_error")]

    # Check which non-error tests passed
    passed_non_error_tests = [test for test in non_error_tests if test in passed_tests]

    # If there are no non-error tests, only error tests, treat as unknown
    if not non_error_tests:
        return "❓"  # Only error tests available

    # Support is based only on non-error tests
    if len(passed_non_error_tests) == len(non_error_tests):
        return "✅"  # Full support
    elif len(passed_non_error_tests) == 0:
        return "❌"  # No support
    else:
        return "⚠️"  # Partial support


def categorize_tests(all_test_names, feature_mapping):
    """Categorize test names into feature buckets."""
    categorized = {feature: [] for feature in feature_mapping.keys()}

    for test_name in all_test_names:
        for feature, test_patterns in feature_mapping.items():
            if test_name in test_patterns:
                categorized[feature].append(test_name)
                break

    return categorized


def calculate_support_score(feature_support, feature_order):
    """Calculate a numeric support score for ranking models.

    Each feature's contribution is weighted by its position in the
    feature_order list (earlier features get higher weight).
    """
    score = 0
    max_features = len(feature_order)

    for feature, status in feature_support.items():
        # Get position weight (earlier features get higher weight)
        if feature in feature_order:
            position_weight = (max_features - feature_order.index(feature)) / max_features
        else:
            position_weight = 0.5  # Default weight for unmapped features

        if status == "✅":  # Full support
            score += 10 * position_weight
        elif status == "⚠️":  # Partial support - weighted by column position
            score += 5 * position_weight
        elif status == "❌":  # No support
            score += 1 * position_weight
        # Unknown (❓) gets 0 points
    return score


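# Worked example for calculate_support_score above (hypothetical feature names and
# statuses): with feature_order = ["A", "B"], feature "A" gets position_weight
# (2 - 0) / 2 = 1.0 and "B" gets (2 - 1) / 2 = 0.5. A model with full support for
# "A" (✅) and partial support for "B" (⚠️) therefore scores 10 * 1.0 + 5 * 0.5 = 12.5.
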
def calculate_provider_support_score(models_data, feature_order):
    """Calculate a provider-level support score based on all models' support scores."""
    if not models_data:
        return 0

    # Calculate the average support score across all models in the provider
    total_score = sum(model["support_score"] for model in models_data)
    return total_score / len(models_data)


def get_test_function_line_numbers(test_file_path):
    """Extract line numbers for test functions from the test file."""
    test_line_numbers = {}

    try:
        with open(test_file_path, "r") as f:
            lines = f.readlines()

        for i, line in enumerate(lines, 1):
            if "def test_" in line and line.strip().startswith("def test_"):
                # Extract function name
                func_name = line.strip().split("def ")[1].split("(")[0]
                test_line_numbers[func_name] = i
    except FileNotFoundError:
        print(f"Warning: Could not find test file at {test_file_path}")

    return test_line_numbers


def get_github_repo_info():
    """Get GitHub repository information from git remote."""
    try:
        # Try to get the GitHub repo URL from git remote
        import subprocess

        result = subprocess.run(["git", "remote", "get-url", "origin"], capture_output=True, text=True, cwd=os.path.dirname(__file__))
        if result.returncode == 0:
            remote_url = result.stdout.strip()
            # Parse GitHub URL
            if "github.com" in remote_url:
                if remote_url.startswith("https://"):
                    # https://github.com/user/repo.git -> user/repo
                    repo_path = remote_url.replace("https://github.com/", "").replace(".git", "")
                elif remote_url.startswith("git@"):
                    # git@github.com:user/repo.git -> user/repo
                    repo_path = remote_url.split(":")[1].replace(".git", "")
                else:
                    return None
                return repo_path
    except Exception:
        pass

    # Default fallback
    return "letta-ai/letta"


def generate_test_details(model_info, feature_mapping):
    """Generate detailed test results for a model."""
    details = []

    # Get test function line numbers
    script_dir = os.path.dirname(os.path.abspath(__file__))
    test_file_path = os.path.join(script_dir, "model_sweep.py")
    test_line_numbers = get_test_function_line_numbers(test_file_path)

    # Use the main branch GitHub URL
    base_github_url = "https://github.com/letta-ai/letta/blob/main/.github/scripts/model-sweep/model_sweep.py"

    for feature, tests in model_info["categorized_tests"].items():
        if not tests:
            continue

        details.append(f"### {feature}")
        details.append("")

        for test in sorted(tests):
            if test in model_info["passed_tests"]:
                status = "✅"
            elif test in model_info["failed_tests"]:
                status = "❌"
            else:
                status = "❓"

            # Create GitHub link if we have line number info
            if test in test_line_numbers:
                line_num = test_line_numbers[test]
                github_link = f"{base_github_url}#L{line_num}"
                details.append(f"- {status} [`{test}`]({github_link})")
            else:
                details.append(f"- {status} `{test}`")
        details.append("")

    return details


def calculate_column_widths(all_provider_data, feature_mapping):
    """Calculate the maximum width needed for each column across all providers."""
    widths = {"model": len("Model"), "context_window": len("Context Window"), "last_scanned": len("Last Scanned")}

    # Feature column widths
    for feature in feature_mapping.keys():
        widths[feature] = len(feature)

    # Check all model data for maximum widths
    for provider_data in all_provider_data.values():
        for model_info in provider_data:
            # Model name width (including backticks)
            model_width = len(f"`{model_info['name']}`")
            widths["model"] = max(widths["model"], model_width)

            # Context window width (with commas)
            context_width = len(f"{model_info['context_window']:,}")
            widths["context_window"] = max(widths["context_window"], context_width)

            # Last scanned width
            widths["last_scanned"] = max(widths["last_scanned"], len(str(model_info["last_scanned"])))

            # Feature support symbols are always 2 chars, so no need to check

    return widths


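# process_model_sweep_report below expects a pytest-JSON style report; only the fields
# it actually reads matter. The record sketched here is a trimmed, hypothetical example
# of one entry in data["tests"] (field values are made up):
#
#   {
#       "nodeid": "model_sweep.py::test_tool_call_basic[gpt-4o]",
#       "outcome": "passed",
#       "metadata": {
#           "time_last_scanned": "2025-01-01T00:00:00",
#           "llm_config": {
#               "provider_name": "openai",
#               "model": "gpt-4o",
#               "context_window": 128000
#           }
#       }
#   }
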
def process_model_sweep_report(input_file, output_file, config_file=None, debug=False):
    """Convert model sweep JSON data to MDX report."""

    # Load feature mappings from config file
    feature_mapping = load_feature_mappings(config_file)

    # if debug:
    #     print("DEBUG: Feature mappings loaded:")
    #     for feature, tests in feature_mapping.items():
    #         print(f"  {feature}: {tests}")
    #     print()

    # Read the JSON data
    with open(input_file, "r") as f:
        data = json.load(f)

    tests = data.get("tests", [])

    # if debug:
    #     print("DEBUG: Tests loaded:")
    #     print([test['outcome'] for test in tests if 'haiku' in test['nodeid']])

    # Calculate summary statistics
    providers = set(test["metadata"]["llm_config"]["provider_name"] for test in tests)
    models = set(test["metadata"]["llm_config"]["model"] for test in tests)
    total_tests = len(tests)

    # Start building the MDX
    mdx_lines = [
        "---",
        "title: Support Models",
        f"generated: {datetime.now().isoformat()}",
        "---",
        "",
        "# Supported Models",
        "",
        "## Overview",
        "",
        "Letta routinely runs automated scans against available providers and models. These are the results of the latest scan.",
        "",
        f"Ran {total_tests} tests against {len(models)} models across {len(providers)} providers on {datetime.now().strftime('%B %d, %Y')}",
        "",
        "",
    ]

    # Group tests by provider
    provider_groups = defaultdict(list)
    for test in tests:
        provider_name = test["metadata"]["llm_config"]["provider_name"]
        provider_groups[provider_name].append(test)

    # Process all providers first to collect model data
    all_provider_data = {}
    provider_support_scores = {}

    for provider_name in provider_groups.keys():
        provider_tests = provider_groups[provider_name]

        # Group tests by model within this provider
        model_groups = defaultdict(list)
        for test in provider_tests:
            model_name = test["metadata"]["llm_config"]["model"]
            model_groups[model_name].append(test)

        # Process all models to calculate support scores for ranking
        model_data = []
        for model_name in model_groups.keys():
            model_tests = model_groups[model_name]

            # if debug:
            #     print(f"DEBUG: Processing model '{model_name}' in provider '{provider_name}'")

            # Extract unique test names for passed and failed tests
            passed_tests = set()
            failed_tests = set()
            all_test_names = set()

            for test in model_tests:
                # Extract test name from nodeid (split on :: and [)
                test_name = test["nodeid"].split("::")[1].split("[")[0]
                all_test_names.add(test_name)

                # if debug:
                #     print(f"  Test name: {test_name}")
                #     print(f"  Outcome: {test}")
                if test["outcome"] == "passed":
                    passed_tests.add(test_name)
                elif test["outcome"] == "failed":
                    failed_tests.add(test_name)

            # if debug:
            #     print(f"  All test names found: {sorted(all_test_names)}")
            #     print(f"  Passed tests: {sorted(passed_tests)}")
            #     print(f"  Failed tests: {sorted(failed_tests)}")

            # Categorize tests into features
            categorized_tests = categorize_tests(all_test_names, feature_mapping)

            # if debug:
            #     print(f"  Categorized tests:")
            #     for feature, tests in categorized_tests.items():
            #         print(f"    {feature}: {tests}")

            # Determine support status for each feature
            feature_support = {}
            for feature_name in feature_mapping.keys():
                feature_support[feature_name] = get_support_status(passed_tests, categorized_tests[feature_name])

            # if debug:
            #     print(f"  Feature support:")
            #     for feature, status in feature_support.items():
            #         print(f"    {feature}: {status}")
            #     print()

            # Get context window and last scanned time
            context_window = model_tests[0]["metadata"]["llm_config"]["context_window"]

            # Try to get time_last_scanned from metadata, fallback to current time
            try:
                last_scanned = model_tests[0]["metadata"].get(
                    "time_last_scanned", model_tests[0]["metadata"].get("timestamp", datetime.now().isoformat())
                )
                # Format timestamp if it's a full ISO string
                if "T" in str(last_scanned):
                    last_scanned = str(last_scanned).split("T")[0]  # Just the date part
            except Exception:
                last_scanned = "Unknown"

            # Calculate support score for ranking
            feature_order = list(feature_mapping.keys())
            support_score = calculate_support_score(feature_support, feature_order)

            # Store model data for sorting
            model_data.append(
                {
                    "name": model_name,
                    "feature_support": feature_support,
                    "context_window": context_window,
                    "last_scanned": last_scanned,
                    "support_score": support_score,
                    "failed_tests": failed_tests,
                    "passed_tests": passed_tests,
                    "categorized_tests": categorized_tests,
                }
            )

        # Sort models by support score (descending) then by name (ascending)
        model_data.sort(key=lambda x: (-x["support_score"], x["name"]))

        # Store provider data
        all_provider_data[provider_name] = model_data
        provider_support_scores[provider_name] = calculate_provider_support_score(model_data, list(feature_mapping.keys()))

    # Calculate column widths for consistent formatting (add details column)
    column_widths = calculate_column_widths(all_provider_data, feature_mapping)
    column_widths["details"] = len("Details")

    # Sort providers by support score (descending) then by name (ascending)
    sorted_providers = sorted(provider_support_scores.keys(), key=lambda x: (-provider_support_scores[x], x))

    # Generate tables for all providers first
    for provider_name in sorted_providers:
        model_data = all_provider_data[provider_name]
        support_score = provider_support_scores[provider_name]

        # Create dynamic headers with proper padding and centering
        feature_names = list(feature_mapping.keys())

        # Build header row with left-aligned first column, centered others
        header_parts = [f"{'Model':<{column_widths['model']}}"]
        for feature in feature_names:
            header_parts.append(f"{feature:^{column_widths[feature]}}")
        header_parts.extend(
            [
                f"{'Context Window':^{column_widths['context_window']}}",
                f"{'Last Scanned':^{column_widths['last_scanned']}}",
                f"{'Details':^{column_widths['details']}}",
            ]
        )
        header_row = "| " + " | ".join(header_parts) + " |"

        # Build separator row with left-aligned first column, centered others
        separator_parts = [f"{'-' * column_widths['model']}"]
        for feature in feature_names:
            separator_parts.append(f":{'-' * (column_widths[feature] - 2)}:")
        separator_parts.extend(
            [
                f":{'-' * (column_widths['context_window'] - 2)}:",
                f":{'-' * (column_widths['last_scanned'] - 2)}:",
                f":{'-' * (column_widths['details'] - 2)}:",
            ]
        )
        separator_row = "|" + "|".join(separator_parts) + "|"

        # Add provider section without percentage
        mdx_lines.extend([f"## {provider_name}", "", header_row, separator_row])

        # Generate table rows for sorted models with proper padding
        for model_info in model_data:
            # Create anchor for model details
            model_anchor = model_info["name"].replace("/", "_").replace(":", "_").replace("-", "_").lower()
            details_anchor = f"{provider_name.lower().replace(' ', '_')}_{model_anchor}_details"

            # Build row with left-aligned first column, centered others
            row_parts = [f"`{model_info['name']}`".ljust(column_widths["model"])]
            for feature in feature_names:
                row_parts.append(f"{model_info['feature_support'][feature]:^{column_widths[feature]}}")
            row_parts.extend(
                [
                    f"{model_info['context_window']:,}".center(column_widths["context_window"]),
                    f"{model_info['last_scanned']}".center(column_widths["last_scanned"]),
                    f"[View](#{details_anchor})".center(column_widths["details"]),
                ]
            )
            row = "| " + " | ".join(row_parts) + " |"
            mdx_lines.append(row)

        # Add spacing between provider tables
        mdx_lines.extend(["", ""])

    # Add detailed test results section after all tables
    mdx_lines.extend(["---", "", "# Detailed Test Results", ""])

    for provider_name in sorted_providers:
        model_data = all_provider_data[provider_name]
        mdx_lines.extend([f"## {provider_name}", ""])

        for model_info in model_data:
            model_anchor = model_info["name"].replace("/", "_").replace(":", "_").replace("-", "_").lower()
            details_anchor = f"{provider_name.lower().replace(' ', '_')}_{model_anchor}_details"
            mdx_lines.append(f'<a id="{details_anchor}"></a>')
            mdx_lines.append(f"### {model_info['name']}")
            mdx_lines.append("")

            # Add test details
            test_details = generate_test_details(model_info, feature_mapping)
            mdx_lines.extend(test_details)

        # Add spacing between providers in details section
        mdx_lines.extend(["", ""])

    # Write the MDX file
    with open(output_file, "w") as f:
        f.write("\n".join(mdx_lines))

    print(f"Model sweep report saved to {output_file}")


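# Rough shape of the MDX written by process_model_sweep_report above (section names
# come from the code; exact table contents depend on the sweep results):
#
#   ---
#   title: Support Models
#   generated: <timestamp>
#   ---
#   # Supported Models
#   ## Overview
#   ## <provider>                  (one support-matrix table per provider)
#   # Detailed Test Results
#   ## <provider> / ### <model>    (per-test pass/fail lists with GitHub links)
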
def main():
    input_file = "model_sweep_report.json"
    output_file = "model_sweep_report.mdx"
    config_file = None
    debug = False

    # Allow command line arguments
    if len(sys.argv) > 1:
        # Use the file located in the same directory as this script
        script_dir = os.path.dirname(os.path.abspath(__file__))
        input_file = os.path.join(script_dir, sys.argv[1])
    if len(sys.argv) > 2:
        # Use the file located in the same directory as this script
        script_dir = os.path.dirname(os.path.abspath(__file__))
        output_file = os.path.join(script_dir, sys.argv[2])
    if len(sys.argv) > 3:
        config_file = sys.argv[3]
    if len(sys.argv) > 4 and sys.argv[4] == "--debug":
        debug = True

    try:
        process_model_sweep_report(input_file, output_file, config_file, debug)
    except FileNotFoundError:
        print(f"Error: Could not find input file '{input_file}'")
        sys.exit(1)
    except json.JSONDecodeError:
        print(f"Error: Invalid JSON in file '{input_file}'")
        sys.exit(1)
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)


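# Example invocation (file names are the script defaults; the input and output paths
# given on the command line are resolved relative to this script's directory):
#
#   python generate_model_sweep_markdown.py model_sweep_report.json model_sweep_report.mdx feature_mappings.json --debug
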
if __name__ == "__main__":
    main()