draft with links

This commit is contained in:
Kian Jones
2025-06-20 16:11:46 -07:00
parent 683ec13d28
commit 02909e4d0a
10 changed files with 13262 additions and 9 deletions

@@ -0,0 +1,482 @@
#!/usr/bin/env python3
import json
import sys
import os
from collections import defaultdict
from datetime import datetime
def load_feature_mappings(config_file=None):
    """Load feature mappings from config file."""
    if config_file is None:
        # Default to feature_mappings.json in the same directory as this script
        script_dir = os.path.dirname(os.path.abspath(__file__))
        config_file = os.path.join(script_dir, 'feature_mappings.json')
    try:
        with open(config_file, 'r') as f:
            return json.load(f)
    except FileNotFoundError:
        print(f"Error: Could not find feature mappings config file '{config_file}'")
        sys.exit(1)
    except json.JSONDecodeError:
        print(f"Error: Invalid JSON in feature mappings config file '{config_file}'")
        sys.exit(1)
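# Sketch of the feature_mappings.json shape loaded by load_feature_mappings() above
# (feature and test names here are hypothetical; the real file ships next to this
# script): each key becomes a feature column in the report and maps to the list of
# test function names that exercise it, e.g.
#   {
#       "Basic Chat": ["test_send_message", "test_send_message_error"],
#       "Tool Calling": ["test_tool_call"]
#   }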
def get_support_status(passed_tests, feature_tests):
    """Determine support status for a feature category."""
    if not feature_tests:
        return "❓"  # Unknown - no tests for this feature
    # Filter out error tests when checking for support
    non_error_tests = [test for test in feature_tests if not test.endswith('_error')]
    error_tests = [test for test in feature_tests if test.endswith('_error')]
    # Check which non-error tests passed
    passed_non_error_tests = [test for test in non_error_tests if test in passed_tests]
    # If there are no non-error tests, only error tests, treat as unknown
    if not non_error_tests:
        return "❓"  # Only error tests available
    # Support is based only on non-error tests
    if len(passed_non_error_tests) == len(non_error_tests):
        return "✅"  # Full support
    elif len(passed_non_error_tests) == 0:
        return "❌"  # No support
    else:
        return "⚠️"  # Partial support
def categorize_tests(all_test_names, feature_mapping):
    """Categorize test names into feature buckets."""
    categorized = {feature: [] for feature in feature_mapping.keys()}
    for test_name in all_test_names:
        for feature, test_patterns in feature_mapping.items():
            if test_name in test_patterns:
                categorized[feature].append(test_name)
                break
    return categorized
def calculate_support_score(feature_support, feature_order):
    """Calculate a numeric support score for ranking models.
    For partial support, the score is weighted by the position of the feature
    in the feature_order list (earlier features get higher weight).
    """
    score = 0
    max_features = len(feature_order)
    for feature, status in feature_support.items():
        # Get position weight (earlier features get higher weight)
        if feature in feature_order:
            position_weight = (max_features - feature_order.index(feature)) / max_features
        else:
            position_weight = 0.5  # Default weight for unmapped features
        if status == "✅":  # Full support
            score += 10 * position_weight
        elif status == "⚠️":  # Partial support - weighted by column position
            score += 5 * position_weight
        elif status == "❌":  # No support
            score += 1 * position_weight
        # Unknown (❓) gets 0 points
    return score
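# Worked example for calculate_support_score() above (hypothetical features): with
# feature_order = ["A", "B"], "A" gets position weight (2 - 0) / 2 = 1.0 and "B"
# gets (2 - 1) / 2 = 0.5, so full support on "A" plus partial support on "B"
# scores 10 * 1.0 + 5 * 0.5 = 12.5.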
def calculate_provider_support_score(models_data, feature_order):
    """Calculate a provider-level support score based on all models' support scores."""
    if not models_data:
        return 0
    # Calculate the average support score across all models in the provider
    total_score = sum(model['support_score'] for model in models_data)
    return total_score / len(models_data)
def get_test_function_line_numbers(test_file_path):
    """Extract line numbers for test functions from the test file."""
    test_line_numbers = {}
    try:
        with open(test_file_path, 'r') as f:
            lines = f.readlines()
            for i, line in enumerate(lines, 1):
                if 'def test_' in line and line.strip().startswith('def test_'):
                    # Extract function name
                    func_name = line.strip().split('def ')[1].split('(')[0]
                    test_line_numbers[func_name] = i
    except FileNotFoundError:
        print(f"Warning: Could not find test file at {test_file_path}")
    return test_line_numbers
def get_github_repo_info():
    """Get GitHub repository information from git remote."""
    try:
        # Try to get the GitHub repo URL from git remote
        import subprocess
        result = subprocess.run(['git', 'remote', 'get-url', 'origin'],
                                capture_output=True, text=True, cwd=os.path.dirname(__file__))
        if result.returncode == 0:
            remote_url = result.stdout.strip()
            # Parse GitHub URL
            if 'github.com' in remote_url:
                if remote_url.startswith('https://'):
                    # https://github.com/user/repo.git -> user/repo
                    repo_path = remote_url.replace('https://github.com/', '').replace('.git', '')
                elif remote_url.startswith('git@'):
                    # git@github.com:user/repo.git -> user/repo
                    repo_path = remote_url.split(':')[1].replace('.git', '')
                else:
                    return None
                return repo_path
    except:
        pass
    # Default fallback
    return "letta-ai/letta"
def generate_test_details(model_info, feature_mapping):
    """Generate detailed test results for a model."""
    details = []
    # Get test function line numbers
    script_dir = os.path.dirname(os.path.abspath(__file__))
    test_file_path = os.path.join(script_dir, 'model_sweep.py')
    test_line_numbers = get_test_function_line_numbers(test_file_path)
    # Get GitHub repo info
    repo_path = get_github_repo_info()
    # Get current commit hash for links
    try:
        import subprocess
        result = subprocess.run(['git', 'rev-parse', 'HEAD'],
                                capture_output=True, text=True, cwd=script_dir)
        commit_hash = result.stdout.strip() if result.returncode == 0 else 'main'
    except:
        commit_hash = 'main'
    for feature, tests in model_info['categorized_tests'].items():
        if not tests:
            continue
        details.append(f"### {feature}")
        details.append("")
        for test in sorted(tests):
            if test in model_info['passed_tests']:
                status = "✅"
            elif test in model_info['failed_tests']:
                status = "❌"
            else:
                status = "❓"
            # Create GitHub link if we have line number info
            if test in test_line_numbers and repo_path:
                line_num = test_line_numbers[test]
                github_link = f"https://github.com/{repo_path}/blob/{commit_hash}/.github/scripts/model-sweep/model_sweep.py#L{line_num}"
                details.append(f"- {status} [`{test}`]({github_link})")
            else:
                details.append(f"- {status} `{test}`")
        details.append("")
    return details
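# Example of the per-model detail block generate_test_details() above emits
# (feature name, test name, line number, and commit placeholder are hypothetical;
# the repo path shown is the default fallback):
#   ### Tool Calling
#
#   - ✅ [`test_tool_call`](https://github.com/letta-ai/letta/blob/<commit>/.github/scripts/model-sweep/model_sweep.py#L123)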
def calculate_column_widths(all_provider_data, feature_mapping):
    """Calculate the maximum width needed for each column across all providers."""
    widths = {
        'model': len('Model'),
        'context_window': len('Context Window'),
        'last_scanned': len('Last Scanned')
    }
    # Feature column widths
    for feature in feature_mapping.keys():
        widths[feature] = len(feature)
    # Check all model data for maximum widths
    for provider_data in all_provider_data.values():
        for model_info in provider_data:
            # Model name width (including backticks)
            model_width = len(f"`{model_info['name']}`")
            widths['model'] = max(widths['model'], model_width)
            # Context window width (with commas)
            context_width = len(f"{model_info['context_window']:,}")
            widths['context_window'] = max(widths['context_window'], context_width)
            # Last scanned width
            widths['last_scanned'] = max(widths['last_scanned'], len(str(model_info['last_scanned'])))
            # Feature support symbols are always 2 chars, so no need to check
    return widths
def process_model_sweep_report(input_file, output_file, config_file=None, debug=False):
    """Convert model sweep JSON data to MDX report."""
    # Load feature mappings from config file
    feature_mapping = load_feature_mappings(config_file)
    # if debug:
    #     print("DEBUG: Feature mappings loaded:")
    #     for feature, tests in feature_mapping.items():
    #         print(f" {feature}: {tests}")
    #     print()
    # Read the JSON data
    with open(input_file, 'r') as f:
        data = json.load(f)
    tests = data.get('tests', [])
    # if debug:
    #     print("DEBUG: Tests loaded:")
    #     print([test['outcome'] for test in tests if 'haiku' in test['nodeid']])
    # Calculate summary statistics
    providers = set(test['metadata']['llm_config']['provider_name'] for test in tests)
    models = set(test['metadata']['llm_config']['model'] for test in tests)
    total_tests = len(tests)
    # Start building the MDX
    mdx_lines = [
        "---",
        "title: Supported Models",
        f"generated: {datetime.now().isoformat()}",
        "---",
        "",
        "# Supported Models",
        "",
        "## Overview",
        "",
        "Letta routinely runs automated scans against available providers and models. These are the results of the latest scan.",
        "",
        f"Ran {total_tests} tests against {len(models)} models across {len(providers)} providers on {datetime.now().strftime('%B %d, %Y')}",
        "",
        ""
    ]
    # Group tests by provider
    provider_groups = defaultdict(list)
    for test in tests:
        provider_name = test['metadata']['llm_config']['provider_name']
        provider_groups[provider_name].append(test)
    # Process all providers first to collect model data
    all_provider_data = {}
    provider_support_scores = {}
    for provider_name in provider_groups.keys():
        provider_tests = provider_groups[provider_name]
        # Group tests by model within this provider
        model_groups = defaultdict(list)
        for test in provider_tests:
            model_name = test['metadata']['llm_config']['model']
            model_groups[model_name].append(test)
        # Process all models to calculate support scores for ranking
        model_data = []
        for model_name in model_groups.keys():
            model_tests = model_groups[model_name]
            # if debug:
            #     print(f"DEBUG: Processing model '{model_name}' in provider '{provider_name}'")
            # Extract unique test names for passed and failed tests
            passed_tests = set()
            failed_tests = set()
            all_test_names = set()
            for test in model_tests:
                # Extract test name from nodeid (split on :: and [)
                test_name = test['nodeid'].split('::')[1].split('[')[0]
                all_test_names.add(test_name)
                # if debug:
                #     print(f" Test name: {test_name}")
                #     print(f" Outcome: {test}")
                if test['outcome'] == 'passed':
                    passed_tests.add(test_name)
                elif test['outcome'] == 'failed':
                    failed_tests.add(test_name)
            # if debug:
            #     print(f" All test names found: {sorted(all_test_names)}")
            #     print(f" Passed tests: {sorted(passed_tests)}")
            #     print(f" Failed tests: {sorted(failed_tests)}")
            # Categorize tests into features
            categorized_tests = categorize_tests(all_test_names, feature_mapping)
            # if debug:
            #     print(f" Categorized tests:")
            #     for feature, tests in categorized_tests.items():
            #         print(f" {feature}: {tests}")
            # Determine support status for each feature
            feature_support = {}
            for feature_name in feature_mapping.keys():
                feature_support[feature_name] = get_support_status(passed_tests, categorized_tests[feature_name])
            # if debug:
            #     print(f" Feature support:")
            #     for feature, status in feature_support.items():
            #         print(f" {feature}: {status}")
            #     print()
            # Get context window and last scanned time
            context_window = model_tests[0]['metadata']['llm_config']['context_window']
            # Try to get time_last_scanned from metadata, fallback to current time
            try:
                last_scanned = model_tests[0]['metadata'].get('time_last_scanned',
                                                              model_tests[0]['metadata'].get('timestamp',
                                                                                             datetime.now().isoformat()))
                # Format timestamp if it's a full ISO string
                if 'T' in str(last_scanned):
                    last_scanned = str(last_scanned).split('T')[0]  # Just the date part
            except:
                last_scanned = "Unknown"
            # Calculate support score for ranking
            feature_order = list(feature_mapping.keys())
            support_score = calculate_support_score(feature_support, feature_order)
            # Store model data for sorting
            model_data.append({
                'name': model_name,
                'feature_support': feature_support,
                'context_window': context_window,
                'last_scanned': last_scanned,
                'support_score': support_score,
                'failed_tests': failed_tests,
                'passed_tests': passed_tests,
                'categorized_tests': categorized_tests
            })
        # Sort models by support score (descending) then by name (ascending)
        model_data.sort(key=lambda x: (-x['support_score'], x['name']))
        # Store provider data
        all_provider_data[provider_name] = model_data
        provider_support_scores[provider_name] = calculate_provider_support_score(model_data, list(feature_mapping.keys()))
    # Calculate column widths for consistent formatting
    column_widths = calculate_column_widths(all_provider_data, feature_mapping)
    # Sort providers by support score (descending) then by name (ascending)
    sorted_providers = sorted(
        provider_support_scores.keys(),
        key=lambda x: (-provider_support_scores[x], x)
    )
    # Generate output for each provider
    for provider_name in sorted_providers:
        model_data = all_provider_data[provider_name]
        support_score = provider_support_scores[provider_name]
        # Create dynamic headers with proper padding and centering
        feature_names = list(feature_mapping.keys())
        # Build header row with left-aligned first column, centered others
        header_parts = [f"{'Model':<{column_widths['model']}}"]
        for feature in feature_names:
            header_parts.append(f"{feature:^{column_widths[feature]}}")
        header_parts.extend([
            f"{'Context Window':^{column_widths['context_window']}}",
            f"{'Last Scanned':^{column_widths['last_scanned']}}"
        ])
        header_row = "| " + " | ".join(header_parts) + " |"
        # Build separator row with left-aligned first column, centered others
        separator_parts = [f"{'-' * column_widths['model']}"]
        for feature in feature_names:
            separator_parts.append(f":{'-' * (column_widths[feature] - 2)}:")
        separator_parts.extend([
            f":{'-' * (column_widths['context_window'] - 2)}:",
            f":{'-' * (column_widths['last_scanned'] - 2)}:"
        ])
        separator_row = "|" + "|".join(separator_parts) + "|"
        # Add provider section without percentage
        mdx_lines.extend([
            f"## {provider_name}",
            "",
            header_row,
            separator_row
        ])
        # Generate table rows for sorted models with proper padding
        for model_info in model_data:
            # Create anchor for model details
            model_anchor = model_info['name'].replace('/', '_').replace(':', '_').replace('-', '_').lower()
            # Build row with left-aligned first column, centered others
            row_parts = [f"[`{model_info['name']}`](#{provider_name.lower().replace(' ', '_')}_{model_anchor}_details)".ljust(column_widths['model'] + 20)]  # Add extra space for link syntax
            for feature in feature_names:
                row_parts.append(f"{model_info['feature_support'][feature]:^{column_widths[feature]}}")
            row_parts.extend([
                f"{model_info['context_window']:,}".center(column_widths['context_window']),
                f"{model_info['last_scanned']}".center(column_widths['last_scanned'])
            ])
            row = "| " + " | ".join(row_parts) + " |"
            mdx_lines.append(row)
        # Add detailed test results for each model
        mdx_lines.extend(["", "### Detailed Test Results", ""])
        for model_info in model_data:
            model_anchor = model_info['name'].replace('/', '_').replace(':', '_').replace('-', '_').lower()
            mdx_lines.append(f"#### {model_info['name']} {{#{provider_name.lower().replace(' ', '_')}_{model_anchor}_details}}")
            mdx_lines.append("")
            # Add test details
            test_details = generate_test_details(model_info, feature_mapping)
            mdx_lines.extend(test_details)
        # Add spacing between providers
        mdx_lines.extend(["", "---", ""])
    # Write the MDX file
    with open(output_file, 'w') as f:
        f.write('\n'.join(mdx_lines))
    print(f"Model sweep report saved to {output_file}")
def main():
    input_file = "model_sweep_report.json"
    output_file = "model_sweep_report.mdx"
    config_file = None
    debug = False
    # Allow command line arguments
    if len(sys.argv) > 1:
        # Use the file located in the same directory as this script
        script_dir = os.path.dirname(os.path.abspath(__file__))
        input_file = os.path.join(script_dir, sys.argv[1])
    if len(sys.argv) > 2:
        # Use the file located in the same directory as this script
        script_dir = os.path.dirname(os.path.abspath(__file__))
        output_file = os.path.join(script_dir, sys.argv[2])
    if len(sys.argv) > 3:
        config_file = sys.argv[3]
    if len(sys.argv) > 4 and sys.argv[4] == "--debug":
        debug = True
    try:
        process_model_sweep_report(input_file, output_file, config_file, debug)
    except FileNotFoundError:
        print(f"Error: Could not find input file '{input_file}'")
        sys.exit(1)
    except json.JSONDecodeError:
        print(f"Error: Invalid JSON in file '{input_file}'")
        sys.exit(1)
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()
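# Usage sketch (the script's own filename isn't shown in this diff, so the name below
# is a placeholder): all arguments are positional and optional; the input and output
# paths are resolved relative to the script's directory, while the config path is
# used as given.
#   python3 <this-script>.py model_sweep_report.json model_sweep_report.mdx feature_mappings.json --debug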