feat: add Terminal-Bench weekly regression workflow [LET-7791] (#1232)

Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
Devansh Jain
2026-03-13 14:26:38 -07:00
committed by GitHub
parent 1712651047
commit 656895d312
7 changed files with 821 additions and 0 deletions

View File

@@ -0,0 +1,99 @@
# Scheduled Terminal-Bench regression runs for Letta Code.
# Runs the task subset in benchmarks/terminal_bench/regression-tasks.txt
# against each model in the matrix, then aggregates results into a report.
name: Terminal-Bench Regression

on:
  schedule:
    - cron: "0 8 * * 1,4" # Monday + Thursday 8am UTC
  workflow_dispatch:
    inputs:
      # NOTE(review): this input is never referenced below — the matrix
      # always runs both default models. Confirm whether `model` was meant
      # to override/filter the matrix, or remove the input.
      model:
        description: "Override model (blank = run both defaults)"
        default: ""
      concurrency:
        description: "Max concurrent tasks"
        default: "4"

jobs:
  # One run per model; results uploaded as per-model artifacts.
  regression:
    runs-on: ubuntu-latest
    timeout-minutes: 180
    strategy:
      fail-fast: false # let each model finish even if the other fails
      matrix:
        model: [sonnet-4.6-low, gpt-5-minimal]
    steps:
      - name: Checkout
        uses: actions/checkout@v6
      - name: Setup Python + uv
        uses: astral-sh/setup-uv@v6
      - name: Install Harbor
        run: uv pip install --system "harbor>=0.1.45" "litellm>=1.0.0"
      # Write ~/.modal.toml so harbor can launch Modal sandboxes.
      - name: Configure Modal
        env:
          MODAL_TOKEN_ID: ${{ secrets.MODAL_TOKEN_ID }}
          MODAL_TOKEN_SECRET: ${{ secrets.MODAL_TOKEN_SECRET }}
        run: |
          printf '[letta]\ntoken_id = "%s"\ntoken_secret = "%s"\nactive = true\nenvironment = "terminal-bench"\nimage_builder_version = "2025.06"\n' \
            "$MODAL_TOKEN_ID" "$MODAL_TOKEN_SECRET" > ~/.modal.toml
      - name: Run regression tasks
        env:
          LETTA_API_KEY: ${{ secrets.LETTA_API_KEY }}
          ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
        run: |
          # Build --task-name flags from regression-tasks.txt
          # (comment lines and blank lines are skipped).
          TASK_FLAGS=""
          while IFS= read -r task; do
            [[ "$task" =~ ^#.*$ || -z "$task" ]] && continue
            TASK_FLAGS="$TASK_FLAGS --task-name $task"
          done < benchmarks/terminal_bench/regression-tasks.txt
          # inputs.concurrency is empty on scheduled runs; fall back to 4.
          harbor run \
            --dataset terminal-bench@2.0 \
            --agent-import-path benchmarks.terminal_bench.letta_code_agent:LettaCode \
            --model "${{ matrix.model }}" \
            --env modal \
            --n-concurrent ${{ inputs.concurrency || '4' }} \
            --job-name "regression-${{ matrix.model }}-$(date +%Y%m%d)" \
            $TASK_FLAGS
      - name: Upload results artifact
        if: always() # keep results even when some tasks fail
        uses: actions/upload-artifact@v4
        with:
          name: tb-results-${{ matrix.model }}
          path: jobs/
  # Aggregates all per-model artifacts and updates the tracking issue.
  report:
    needs: regression
    if: always()
    runs-on: ubuntu-latest
    permissions:
      issues: write # report.py creates/comments on an issue via gh
      contents: read
    steps:
      - name: Checkout
        uses: actions/checkout@v6
      # Downloads every artifact into results/<artifact-name>/.
      - name: Download all result artifacts
        uses: actions/download-artifact@v4
        with:
          path: results/
      - name: Setup Python + uv
        uses: astral-sh/setup-uv@v6
      - name: Generate report and update GitHub Issue
        env:
          GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          GITHUB_REPOSITORY: ${{ github.repository }}
          GITHUB_RUN_ID: ${{ github.run_id }}
          GITHUB_SERVER_URL: ${{ github.server_url }}
        run: |
          python benchmarks/terminal_bench/report.py \
            --results-dir results/ \
            --baseline benchmarks/terminal_bench/baseline.json \
            --repo "${{ github.repository }}"

View File

View File

@@ -0,0 +1 @@
{}

View File

@@ -0,0 +1,25 @@
#!/bin/bash
# Jinja2-templated install script: builds the letta-code CLI from source
# inside a Terminal-Bench task container and exposes it as `letta`.
set -euo pipefail
apt-get update
apt-get install -y curl git unzip
# Install Node.js (required to run the letta CLI)
curl -fsSL https://deb.nodesource.com/setup_20.x | bash -
apt-get install -y nodejs
# Install Bun (required to build letta-code from source)
curl -fsSL https://bun.sh/install | bash
export BUN_INSTALL="$HOME/.bun"
export PATH="$BUN_INSTALL/bin:$PATH"
# Build letta-code from source at a pinned branch/ref
# Jinja2 substitution: prefer `branch`, then `commit`, else 'main'.
LETTA_CODE_REPO="https://github.com/letta-ai/letta-code.git"
LETTA_CODE_REF="{{ branch | default(commit | default('main')) }}"
git clone "$LETTA_CODE_REPO" /tmp/letta-code
cd /tmp/letta-code
git checkout "$LETTA_CODE_REF"
bun install
bun run build
# Make the locally built package available as a global `letta` command.
npm link

View File

@@ -0,0 +1,419 @@
import asyncio
import json
import logging
import os
import shlex
import tempfile
import urllib.request
from datetime import datetime
from pathlib import Path
from harbor.agents.installed.base import BaseInstalledAgent, ExecInput
from harbor.environments.base import BaseEnvironment
from harbor.models.agent.context import AgentContext
from litellm import ModelResponse, Usage, completion_cost
from litellm.types.utils import CompletionTokensDetailsWrapper, PromptTokensDetailsWrapper
logger = logging.getLogger(__name__)

# Keys tried (in order) when extracting agent ID from Letta settings JSON.
_SETTINGS_AGENT_ID_KEYS = ("agent_id", "default_agent_id", "lastAgent", "last_agent")

# Provider keywords used to select the right system prompt for the CLI.
# A model name containing any listed keyword maps to that --system value.
_PROVIDER_SYSTEM_MAP = {
    "source-codex": ("gpt", "o1-", "o3-"),
    "source-gemini": ("gemini",),
}

# System prompt used when no provider keyword matches the model name.
_DEFAULT_SYSTEM = "source-claude"
class LettaCode(BaseInstalledAgent):
    """Run Letta Code CLI inside a harbor environment.

    The agent is installed into the task container via a Jinja2 script
    template, then driven by ``run()`` which uploads the instruction,
    executes the CLI, and harvests logs, the agent ID, and token usage.
    """

    def __init__(self, *args, **kwargs):
        # Pop letta_code_model before passing to super (which doesn't expect it).
        self._letta_code_model: str | None = kwargs.pop("letta_code_model", None)
        super().__init__(*args, **kwargs)

    @staticmethod
    def name() -> str:
        # Identifier harbor uses to select this agent.
        return "letta-code"

    @property
    def _install_agent_template_path(self) -> Path:
        # Template rendered by harbor to install the CLI in the container.
        return Path(__file__).parent / "install-letta-code.sh.j2"

    def create_run_agent_commands(self, instruction: str) -> list[ExecInput]:
        # Unused — we override run() directly — but required by the ABC.
        return []

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _extract_agent_id_from_events(events_text: str) -> str | None:
        """Scan JSONL *text* for the first ``agent-*`` id.

        Lines that are not JSON objects (or fail to parse) are skipped;
        both ``agent_id`` and ``session_id`` keys are checked.
        """
        for line in events_text.splitlines():
            line = line.strip()
            if not line.startswith("{"):
                continue
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue
            for key in ("agent_id", "session_id"):
                aid = event.get(key)
                if isinstance(aid, str) and aid.startswith("agent-"):
                    return aid
        return None

    @staticmethod
    def _extract_agent_id_from_settings(settings_text: str) -> str | None:
        """Parse Letta ``settings.local.json`` content and return an agent id.

        Returns None for empty/unparseable input. Tolerates leading junk
        before the first ``{`` (e.g. shell noise captured by ``cat``).
        """
        if not settings_text.strip():
            return None
        try:
            json_start = settings_text.find("{")
            cleaned = settings_text[json_start:] if json_start != -1 else settings_text
            obj = json.loads(cleaned)
            if not isinstance(obj, dict):
                return None
            # Known keys first, in priority order.
            for key in _SETTINGS_AGENT_ID_KEYS:
                val = obj.get(key)
                if val:
                    return val
            # Fallback: first value that looks like an agent id.
            for val in obj.values():
                if isinstance(val, str) and val.startswith("agent-"):
                    return val
        except Exception:
            pass
        return None

    @staticmethod
    def _build_model_flags(model_name: str) -> str:
        """Return CLI flags for ``--model`` and ``--system``.

        The trailing space is significant: run() concatenates ``-p``
        directly after this string when building the CLI command.
        """
        if not model_name:
            return ""
        flags = f"--model {shlex.quote(model_name)} "
        lower = model_name.lower()
        system = _DEFAULT_SYSTEM
        # First provider whose keyword appears in the model name wins.
        for sys_name, keywords in _PROVIDER_SYSTEM_MAP.items():
            if any(kw in lower for kw in keywords):
                system = sys_name
                break
        flags += f"--system {system} "
        return flags

    def _find_events_text(self) -> str:
        """Return events JSONL content from the local logs directory.

        Reads the lexicographically first ``*.events.jsonl`` file (file
        names embed a timestamp, so this is the oldest run). Returns ""
        when no events file exists.
        """
        logs_dir = Path(self.logs_dir)
        events_files = sorted(logs_dir.glob("*.events.jsonl"))
        if not events_files:
            return ""
        return events_files[0].read_text()

    # ------------------------------------------------------------------
    # Usage / cost tracking
    # ------------------------------------------------------------------
    @staticmethod
    def _extract_usage_from_events(events_text: str) -> dict[str, int]:
        """Extract token usage from Letta Code stream-json events.

        Checks two formats:
        1. ``message_type == "usage_statistics"`` events (Letta streaming API)
        2. Last event with ``type == "result"`` containing a ``usage`` field
        """
        totals: dict[str, int] = {
            "prompt_tokens": 0,
            "completion_tokens": 0,
            "cached_input_tokens": 0,
            "cache_write_tokens": 0,
            "reasoning_tokens": 0,
        }
        parsed_events: list[dict] = []
        found_usage_stats = False
        for line in events_text.splitlines():
            line = line.strip()
            if not line.startswith("{"):
                continue
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue
            parsed_events.append(event)
            if event.get("message_type") == "usage_statistics":
                found_usage_stats = True
                # "or 0" guards against explicit JSON null values.
                for key in totals:
                    totals[key] += event.get(key) or 0
                # NOTE(review): if an event ever carries both a top-level
                # cached_input_tokens and prompt_tokens_details.cached_tokens,
                # both are summed — confirm events emit only one form.
                details = event.get("prompt_tokens_details") or {}
                totals["cached_input_tokens"] += details.get("cached_tokens") or 0
                details = event.get("completion_tokens_details") or {}
                totals["reasoning_tokens"] += details.get("reasoning_tokens") or 0
        # Fallback: last result event
        if not found_usage_stats and parsed_events:
            last = parsed_events[-1]
            if last.get("type") == "result" and "usage" in last:
                usage = last["usage"]
                for key in totals:
                    totals[key] += usage.get(key) or 0
        return totals

    @staticmethod
    def _calculate_cost(model_name: str, usage: dict[str, int]) -> float:
        """Calculate cost in USD using litellm's pricing data.

        Returns 0.0 when the model is unknown, usage is empty, or litellm
        has no pricing entry for the model.
        """
        prompt_tokens = usage.get("prompt_tokens", 0)
        completion_tokens = usage.get("completion_tokens", 0)
        if not model_name or (prompt_tokens == 0 and completion_tokens == 0):
            return 0.0
        # Build a synthetic ModelResponse so completion_cost() can price it.
        resp = ModelResponse()
        resp.usage = Usage(
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=prompt_tokens + completion_tokens,
            prompt_tokens_details=PromptTokensDetailsWrapper(
                cached_tokens=usage.get("cached_input_tokens", 0),
                cache_creation_tokens=usage.get("cache_write_tokens", 0),
            ),
            completion_tokens_details=CompletionTokensDetailsWrapper(
                reasoning_tokens=usage.get("reasoning_tokens", 0),
            ),
        )
        try:
            return float(completion_cost(completion_response=resp, model=model_name))
        except Exception:
            # Pricing lookup is best-effort; never fail the run over it.
            logger.debug(f"Could not calculate cost for model {model_name}", exc_info=True)
            return 0.0

    def _populate_usage(self, events_text: str, context: AgentContext) -> None:
        """Extract usage from events and populate context + write usage.json."""
        # Cost is computed against the litellm model handle, not the CLI id.
        model_name = self.model_name or os.environ.get("LETTA_MODEL", "").strip()
        usage = self._extract_usage_from_events(events_text)
        cost = self._calculate_cost(model_name, usage)
        # Zero counts are recorded as None (harbor treats None as "unknown").
        context.n_input_tokens = usage["prompt_tokens"] or None
        context.n_output_tokens = usage["completion_tokens"] or None
        context.cost_usd = cost if cost > 0 else None
        # Write usage.json to the task directory (parent of agent logs)
        usage_data: dict = {
            "prompt_tokens": usage["prompt_tokens"],
            "completion_tokens": usage["completion_tokens"],
            "total_tokens": usage["prompt_tokens"] + usage["completion_tokens"],
            "cost_usd": round(cost, 6),
        }
        # Optional detail counters are included only when non-zero.
        for key in ("cached_input_tokens", "cache_write_tokens", "reasoning_tokens"):
            if usage.get(key, 0) > 0:
                usage_data[key] = usage[key]
        try:
            usage_path = Path(self.logs_dir).parent / "usage.json"
            usage_path.parent.mkdir(parents=True, exist_ok=True)
            with open(usage_path, "w") as f:
                json.dump(usage_data, f, indent=2)
        except Exception as e:
            logger.warning(f"Failed to save usage.json: {e}")

    # ------------------------------------------------------------------
    # Harbor lifecycle hooks
    # ------------------------------------------------------------------
    def populate_context_post_run(self, context: AgentContext) -> None:
        """Populate agent context from downloaded logs (e.g. after timeout).

        Harbor calls this when ``context.is_empty()`` returns True, which
        happens when ``run()`` is cancelled by a timeout before it can
        populate the context itself. Harbor's ``_maybe_download_logs``
        copies the container's ``/logs/agent/`` directory to
        ``self.logs_dir`` first, so event files should be available here.
        """
        events_text = self._find_events_text()
        if not events_text.strip():
            return
        # Recover the agent id from events so it is not lost on timeout.
        agent_id = self._extract_agent_id_from_events(events_text)
        if agent_id:
            (Path(self.logs_dir) / "letta_agent_id_recovered.txt").write_text(agent_id)
        try:
            self._populate_usage(events_text, context)
        except Exception as e:
            logger.warning(f"Failed to extract usage in populate_context_post_run: {e}")

    async def setup(self, environment: BaseEnvironment) -> None:
        """Install the letta CLI inside the task container."""
        # Base class renders and runs the install template.
        await super().setup(environment)

    async def run(
        self,
        instruction: str,
        environment: BaseEnvironment,
        context: AgentContext,
    ) -> None:
        """Invoke letta CLI inside the environment with the given instruction.

        Steps: build env vars, upload the instruction file, generate and
        upload a run script, execute it, then harvest agent id, agent
        export, and token usage. A run failure is re-raised only after
        best-effort log/usage collection.
        """
        # --- environment variables ----------------------------------------
        agent_env: dict[str, str] = {}
        for key in ("LETTA_API_KEY", "LETTA_BASE_URL", "OPENAI_API_KEY", "ANTHROPIC_API_KEY"):
            if key in os.environ:
                agent_env[key] = os.environ[key]
        # Prefer Letta Code model id (bundles reasoning config) over raw handle.
        # self.model_name (litellm handle) is still used for cost calculation.
        cli_model = self._letta_code_model or self.model_name or os.environ.get("LETTA_MODEL", "").strip()
        if cli_model:
            agent_env["LETTA_MODEL"] = cli_model
        # --- build full instruction with prompt prefix ----------------------
        prompt_prefix = (
            "Complete the task. Do NOT ask clarification questions, you have "
            "enough information to complete the task. Make sure to finish the "
            "task to the best of your ability and do not stop at an intermediate step."
        )
        full_instruction = f"{prompt_prefix}\n\n{instruction}"
        # --- upload instruction -------------------------------------------
        escaped_instruction = shlex.quote(full_instruction)
        with tempfile.NamedTemporaryFile(mode="w", delete=False) as tmpf:
            tmpf.write(full_instruction)
            local_instr_path = tmpf.name
        try:
            await environment.exec("bash -lc 'mkdir -p /installed-agent'", timeout_sec=None)
            await environment.upload_file(local_instr_path, "/installed-agent/instruction.txt")
        finally:
            # Always remove the local temp file, even if the upload failed.
            try:
                Path(local_instr_path).unlink(missing_ok=True)  # type: ignore[arg-type]
            except Exception:
                pass
        # --- build run script ---------------------------------------------
        ts = datetime.now().strftime("%Y-%m-%d__%H-%M-%S")
        base = f"/logs/agent/{ts}"
        model_flag = self._build_model_flags(cli_model)
        # tee keeps events streaming to the log file while the CLI runs.
        run_script = (
            "#!/usr/bin/env bash\n"
            "set -eo pipefail\n"
            "source ~/.bashrc >/dev/null 2>&1 || true\n"
            "mkdir -p /logs/agent\n"
            f"letta --new-agent --conv default --no-skills {model_flag}-p {escaped_instruction} "
            f"--permission-mode bypassPermissions --output-format stream-json "
            f"2>'{base}.stderr.log' | tee '{base}.events.jsonl'\n"
        )
        logs_dir = Path(self.logs_dir)
        logs_dir.mkdir(parents=True, exist_ok=True)
        run_script_path = logs_dir / "run_script.sh"
        run_script_path.write_text(run_script)
        # --- execute ------------------------------------------------------
        result = None
        run_error: Exception | None = None

        async def _capture_settings_after_delay() -> None:
            """Snapshot agent ID from settings shortly after the agent starts.

            This is a safety net for timeouts: if run() is cancelled before
            reaching the post-run log collection, we still have the agent ID.
            """
            try:
                await asyncio.sleep(1.0)
                out = await environment.exec(
                    "bash -lc 'cat .letta/settings.local.json 2>/dev/null || true'",
                    timeout_sec=None,
                )
                mid_agent_id = self._extract_agent_id_from_settings(out.stdout or "")
                if mid_agent_id:
                    (logs_dir / f"letta_agent_id_{ts}.txt").write_text(mid_agent_id)
            except Exception:
                pass

        try:
            await environment.exec("bash -lc 'mkdir -p /installed-agent'", timeout_sec=None)
            tmp_script_path = "/installed-agent/run-letta.sh"
            await environment.upload_file(str(run_script_path), tmp_script_path)
            await environment.exec(f"bash -lc 'chmod +x {tmp_script_path}'", timeout_sec=None)
            # NOTE(review): the task handle is not retained; per the asyncio
            # docs a fire-and-forget task may be garbage-collected before it
            # completes — consider storing the reference.
            asyncio.create_task(_capture_settings_after_delay())
            result = await environment.exec(
                f"bash -lc 'bash {tmp_script_path}'",
                env=agent_env or None,
                timeout_sec=None,
            )
        except Exception as e:
            # Defer the error so log/usage collection below still runs.
            run_error = e
        # --- extract agent id & export -------------------------------------
        # Harbor already downloads /logs/agent/{ts}.* to self.logs_dir,
        # so we only need to fetch the events in-memory for agent ID extraction.
        agent_id: str | None = None
        events_text: str = ""
        try:
            events_text = await self._download_file(environment, f"{base}.events.jsonl")
            settings_text = await self._download_file(environment, ".letta/settings.local.json")
            agent_id = self._extract_agent_id_from_settings(settings_text)
            if not agent_id:
                agent_id = self._extract_agent_id_from_events(events_text)
            if agent_id:
                (logs_dir / f"letta_agent_id_{ts}.txt").write_text(agent_id)
            # Only export when the run itself succeeded.
            if agent_id and run_error is None:
                self._export_agent(agent_id, logs_dir, ts)
        except Exception:
            pass
        # --- usage / cost -------------------------------------------------
        try:
            self._populate_usage(events_text, context)
        except Exception as e:
            logger.warning(f"Failed to extract/save usage: {e}")
        # --- populate context ---------------------------------------------
        context.metadata = {
            **(context.metadata or {}),
            "letta_return_code": getattr(result, "return_code", None),
            "letta_logs_ts": ts,
        }
        if run_error is not None:
            raise run_error

    # ------------------------------------------------------------------
    # Private I/O helpers
    # ------------------------------------------------------------------
    @staticmethod
    async def _download_file(environment: BaseEnvironment, remote_path: str) -> str:
        """Cat a file from the environment, returning '' on failure."""
        try:
            out = await environment.exec(
                f"bash -lc 'cat \"{remote_path}\" 2>/dev/null || true'",
                timeout_sec=None,
            )
            return out.stdout or ""
        except Exception:
            return ""

    @staticmethod
    def _export_agent(agent_id: str, logs_dir: Path, ts: str) -> None:
        """Download the ``.af`` agent export (best-effort)."""
        # NOTE(review): no Authorization header is sent — confirm the export
        # endpoint is reachable without LETTA_API_KEY, otherwise this always
        # fails silently via the broad except below.
        try:
            base_url = os.environ.get("LETTA_BASE_URL", "https://api.letta.com").rstrip("/")
            export_url = f"{base_url}/v1/agents/{agent_id}/export"
            req = urllib.request.Request(export_url, method="GET")
            with urllib.request.urlopen(req, timeout=30) as resp:
                agent_bytes = resp.read()
            (logs_dir / f"letta_agent_export_{ts}.af").write_bytes(agent_bytes)
        except Exception:
            pass

View File

@@ -0,0 +1,6 @@
# Terminal-Bench regression task subset for Letta Code
# These tasks are run on a schedule to detect regressions.
# Criteria: fast (<10 min), diverse capabilities, deterministic.
# Adjust based on known Letta Code pass rates.
cancel-async-tasks

View File

@@ -0,0 +1,271 @@
#!/usr/bin/env python3
"""Parse Harbor job results and report regressions via GitHub Issue.
Usage:
python report.py --results-dir results/ --baseline baseline.json --repo owner/repo
Expects Harbor job output structure under results-dir:
results/
tb-results-<model>/
jobs/
<job-name>/
result.json
<task-name>/
result.json # trial result with reward
verifier/
reward.txt # 0.0 or 1.0
"""
import argparse
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
def parse_job_results(results_dir: Path) -> dict[str, dict[str, bool]]:
    """Parse Harbor job results into {model: {task: passed}}.

    Walks each artifact directory under *results_dir* (named
    ``tb-results-<model>``), then each job directory, then each trial
    (task) directory. A trial passes when its reward is >= 1.0, read from
    (in order): ``verifier/reward.txt``, then ``result.json`` (``reward``
    or ``score`` key).

    Args:
        results_dir: Directory containing downloaded artifacts.

    Returns:
        Mapping of model name -> {task name -> passed}. Empty when
        *results_dir* does not exist or holds no parseable trials.
    """
    # Guard: iterdir() on a missing path raises; report "no results" instead.
    if not results_dir.is_dir():
        return {}
    model_results: dict[str, dict[str, bool]] = {}
    prefix = "tb-results-"
    for artifact_dir in sorted(results_dir.iterdir()):
        if not artifact_dir.is_dir():
            continue
        # Artifact name: tb-results-<model>; fall back to the raw name.
        dir_name = artifact_dir.name
        model = dir_name[len(prefix):] if dir_name.startswith(prefix) else dir_name
        tasks: dict[str, bool] = {}
        # Harbor puts job directories under jobs/; artifacts may be flat.
        jobs_dir = artifact_dir / "jobs"
        if not jobs_dir.exists():
            jobs_dir = artifact_dir
        for job_dir in sorted(jobs_dir.iterdir()):
            if not job_dir.is_dir():
                continue
            # Each subdirectory of the job is a trial (task); plain files
            # like config.json are skipped by the is_dir() check.
            for trial_dir in sorted(job_dir.iterdir()):
                if not trial_dir.is_dir():
                    continue
                task_name = trial_dir.name
                # Preferred source: verifier/reward.txt (plain float text).
                reward_file = trial_dir / "verifier" / "reward.txt"
                if reward_file.exists():
                    try:
                        reward = float(reward_file.read_text().strip())
                        tasks[task_name] = reward >= 1.0
                        continue
                    except (ValueError, OSError):
                        pass
                # Fall back to result.json.
                result_file = trial_dir / "result.json"
                if result_file.exists():
                    try:
                        result = json.loads(result_file.read_text())
                        reward = result.get("reward", result.get("score", 0))
                        # TypeError covers a null/non-numeric reward value,
                        # which previously crashed the whole parse.
                        tasks[task_name] = float(reward) >= 1.0
                    except (json.JSONDecodeError, ValueError, TypeError, OSError):
                        tasks[task_name] = False
        if tasks:
            model_results[model] = tasks
    return model_results
def compute_pass_rate(tasks: dict[str, bool]) -> float:
    """Return the fraction of passing tasks; 0.0 for an empty mapping."""
    total = len(tasks)
    if total == 0:
        return 0.0
    # Booleans sum as 0/1, so this counts the True entries.
    return sum(tasks.values()) / total
def load_baseline(baseline_path: Path) -> dict:
    """Load baseline.json, returning empty dict if missing or empty."""
    if not baseline_path.exists():
        return {}
    try:
        parsed = json.loads(baseline_path.read_text())
    except (json.JSONDecodeError, OSError):
        return {}
    # Non-dict content and the empty stub {} both mean "no baseline yet".
    if isinstance(parsed, dict) and parsed:
        return parsed
    return {}
def build_report(
    model_results: dict[str, dict[str, bool]],
    baseline: dict,
) -> tuple[str, bool]:
    """Build a markdown report and determine if there's a regression.

    A regression is flagged when a model's pass rate drops more than 10
    points below baseline, or when any individually-baselined passing
    task is now failing.

    Returns (markdown_body, has_regression).
    """
    stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    out: list[str] = [f"## Terminal-Bench Regression Report — {stamp}", ""]
    regression_found = False
    for model_name in sorted(model_results):
        task_map = model_results[model_name]
        n_passed = sum(1 for ok in task_map.values() if ok)
        n_total = len(task_map)
        rate = (n_passed / n_total) if n_total else 0.0
        # Compare against the recorded baseline for this model, if any.
        base_entry = baseline.get(model_name, {})
        base_rate = base_entry.get("pass_rate")
        base_tasks = base_entry.get("tasks", {})
        suffix = ""
        if base_rate is not None:
            delta = rate - base_rate
            if delta < -0.10:
                regression_found = True
                suffix = f" | **{delta:+.0%} from baseline** :red_circle:"
            elif delta < 0:
                suffix = f" | {delta:+.0%} from baseline :warning:"
            elif delta > 0:
                suffix = f" | {delta:+.0%} from baseline :white_check_mark:"
        out.append(f"### `{model_name}` — {n_passed}/{n_total} ({rate:.0%}){suffix}")
        out.extend(["", "| Task | Result | Baseline |", "|------|--------|----------|"])
        for task_name in sorted(task_map):
            ok_now = task_map[task_name]
            now_cell = ":white_check_mark:" if ok_now else ":x:"
            was_ok = base_tasks.get(task_name)
            if was_ok is None:
                base_cell = ""
            else:
                base_cell = ":white_check_mark:" if was_ok else ":x:"
            # Flag per-task regressions: was passing, now failing.
            marker = ""
            if was_ok is True and not ok_now:
                marker = " **REGRESSION**"
                regression_found = True
            out.append(f"| {task_name} | {now_cell}{marker} | {base_cell} |")
        out.append("")
    if not model_results:
        out.extend(["No results found. Check workflow logs.", ""])
    # Link back to the workflow run when the Actions env vars are present.
    server = os.environ.get("GITHUB_SERVER_URL", "https://github.com")
    repo = os.environ.get("GITHUB_REPOSITORY", "")
    run_id = os.environ.get("GITHUB_RUN_ID", "")
    if repo and run_id:
        out.extend([f"[Workflow run]({server}/{repo}/actions/runs/{run_id})", ""])
    return "\n".join(out), regression_found
def update_github_issue(repo: str, title: str, body: str) -> None:
    """Create or update a GitHub Issue with the given title and body.

    Uses `gh` CLI which must be authenticated via GH_TOKEN env var.
    Comments on an existing open issue with a matching title, otherwise
    creates a new issue (retrying without the label if labeling fails).
    """
    # Look for an existing open issue with this title.
    search = subprocess.run(
        [
            "gh", "issue", "list",
            "--repo", repo,
            "--search", f'"{title}" in:title',
            "--state", "open",
            "--json", "number",
            "--limit", "1",
        ],
        capture_output=True,
        text=True,
    )
    issue_number = None
    if search.returncode == 0 and search.stdout.strip():
        try:
            found = json.loads(search.stdout)
            if found:
                issue_number = found[0]["number"]
        except (json.JSONDecodeError, KeyError, IndexError):
            pass
    if issue_number:
        # Append the report as a comment on the existing issue.
        subprocess.run(
            ["gh", "issue", "comment", str(issue_number), "--repo", repo, "--body", body],
            check=True,
        )
        print(f"Updated issue #{issue_number}")
        return
    # No existing issue — create one, preferring the "benchmark" label.
    created = subprocess.run(
        ["gh", "issue", "create", "--repo", repo, "--title", title, "--body", body, "--label", "benchmark"],
        capture_output=True,
        text=True,
    )
    if created.returncode == 0:
        print(f"Created issue: {created.stdout.strip()}")
    else:
        # Label might not exist — retry without it
        subprocess.run(
            ["gh", "issue", "create", "--repo", repo, "--title", title, "--body", body],
            check=True,
        )
def main() -> None:
    """CLI entry point: parse results, build report, update issue, set exit code."""
    parser = argparse.ArgumentParser(description="Report Terminal-Bench regression results")
    parser.add_argument("--results-dir", required=True, type=Path, help="Directory with downloaded artifacts")
    parser.add_argument("--baseline", required=True, type=Path, help="Path to baseline.json")
    parser.add_argument("--repo", required=True, help="GitHub repo (owner/repo)")
    args = parser.parse_args()

    model_results = parse_job_results(args.results_dir)
    baseline = load_baseline(args.baseline)

    # No parseable results is itself a failure: dump the tree for debugging.
    if not model_results:
        print("WARNING: No results parsed from artifacts.")
        print(f"Contents of {args.results_dir}:")
        for p in sorted(args.results_dir.rglob("*")):
            print(f"  {p}")
        sys.exit(1)

    report_body, regressed = build_report(model_results, baseline)
    # Always echo the full report into the workflow log.
    print(report_body)

    if os.environ.get("GH_TOKEN"):
        update_github_issue(
            repo=args.repo,
            title="Terminal-Bench Regression Tracker",
            body=report_body,
        )
    else:
        print("GH_TOKEN not set — skipping GitHub Issue update")

    # Fail the workflow when a regression was detected.
    if regressed:
        print("\n:red_circle: REGRESSION DETECTED — failing workflow")
        sys.exit(1)
    print("\nNo regressions detected.")


if __name__ == "__main__":
    main()