Files
letta-code/benchmarks/terminal_bench/letta_code_agent.py
2026-03-13 14:26:38 -07:00

420 lines
17 KiB
Python

import asyncio
import json
import logging
import os
import shlex
import tempfile
import urllib.request
from datetime import datetime
from pathlib import Path
from harbor.agents.installed.base import BaseInstalledAgent, ExecInput
from harbor.environments.base import BaseEnvironment
from harbor.models.agent.context import AgentContext
from litellm import ModelResponse, Usage, completion_cost
from litellm.types.utils import CompletionTokensDetailsWrapper, PromptTokensDetailsWrapper
logger = logging.getLogger(__name__)

# Settings-file keys probed, first match wins, when recovering an agent ID
# from Letta's settings JSON (see _extract_agent_id_from_settings).
_SETTINGS_AGENT_ID_KEYS = ("agent_id", "default_agent_id", "lastAgent", "last_agent")

# CLI system prompt -> substrings of the model name that select it
# (matched case-insensitively by _build_model_flags).
_PROVIDER_SYSTEM_MAP = {
    "source-codex": ("gpt", "o1-", "o3-"),
    "source-gemini": ("gemini",),
}

# System prompt used when no provider keyword matches the model name.
_DEFAULT_SYSTEM = "source-claude"
class LettaCode(BaseInstalledAgent):
    """Run Letta Code CLI inside a harbor environment.

    Lifecycle: ``setup()`` installs the CLI in the task container via the
    jinja install template, ``run()`` executes the CLI against the task
    instruction and harvests logs / agent ID / token usage, and
    ``populate_context_post_run()`` recovers usage from downloaded logs when
    ``run()`` was cancelled (e.g. by a harness timeout) before it could
    populate the context itself.
    """

    def __init__(self, *args, **kwargs):
        # Pop letta_code_model before passing to super (which doesn't expect it).
        self._letta_code_model: str | None = kwargs.pop("letta_code_model", None)
        super().__init__(*args, **kwargs)

    @staticmethod
    def name() -> str:
        return "letta-code"

    @property
    def _install_agent_template_path(self) -> Path:
        # Shell template rendered by harbor to install the CLI in the container.
        return Path(__file__).parent / "install-letta-code.sh.j2"

    def create_run_agent_commands(self, instruction: str) -> list[ExecInput]:
        # Unused — we override run() directly — but required by the ABC.
        return []

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _extract_agent_id_from_events(events_text: str) -> str | None:
        """Scan JSONL *text* for the first ``agent-*`` id.

        Non-JSON and unparseable lines are skipped; both ``agent_id`` and
        ``session_id`` fields are checked on each event.
        """
        for line in events_text.splitlines():
            line = line.strip()
            if not line.startswith("{"):
                continue
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue
            for key in ("agent_id", "session_id"):
                aid = event.get(key)
                if isinstance(aid, str) and aid.startswith("agent-"):
                    return aid
        return None

    @staticmethod
    def _extract_agent_id_from_settings(settings_text: str) -> str | None:
        """Parse Letta ``settings.local.json`` content and return an agent id.

        Tolerates leading noise before the JSON object (e.g. shell output
        captured by ``cat``) by skipping to the first ``{``. Returns None on
        any failure — this is a best-effort recovery path by design.
        """
        if not settings_text.strip():
            return None
        try:
            json_start = settings_text.find("{")
            cleaned = settings_text[json_start:] if json_start != -1 else settings_text
            obj = json.loads(cleaned)
            if not isinstance(obj, dict):
                return None
            for key in _SETTINGS_AGENT_ID_KEYS:
                val = obj.get(key)
                # Bug fix: only accept string values. A truthy non-string
                # (nested dict, number) would violate the str | None contract.
                if isinstance(val, str) and val:
                    return val
            # Fallback: first value that looks like an agent id.
            for val in obj.values():
                if isinstance(val, str) and val.startswith("agent-"):
                    return val
        except Exception:
            pass
        return None

    @staticmethod
    def _build_model_flags(model_name: str) -> str:
        """Return CLI flags for ``--model`` and ``--system``.

        The system prompt is chosen by case-insensitive substring match of
        the model name against _PROVIDER_SYSTEM_MAP; defaults to
        _DEFAULT_SYSTEM. Returns '' when no model name is given. The result
        always ends with a trailing space so callers can concatenate flags.
        """
        if not model_name:
            return ""
        flags = f"--model {shlex.quote(model_name)} "
        lower = model_name.lower()
        system = _DEFAULT_SYSTEM
        for sys_name, keywords in _PROVIDER_SYSTEM_MAP.items():
            if any(kw in lower for kw in keywords):
                system = sys_name
                break
        flags += f"--system {system} "
        return flags

    def _find_events_text(self) -> str:
        """Return events JSONL content from the local logs directory.

        Files are named ``{timestamp}.events.jsonl``, so sorted()[0] picks
        the lexicographically first (oldest) run. Returns '' when none exist.
        """
        logs_dir = Path(self.logs_dir)
        events_files = sorted(logs_dir.glob("*.events.jsonl"))
        if not events_files:
            return ""
        return events_files[0].read_text()

    # ------------------------------------------------------------------
    # Usage / cost tracking
    # ------------------------------------------------------------------
    @staticmethod
    def _extract_usage_from_events(events_text: str) -> dict[str, int]:
        """Extract token usage from Letta Code stream-json events.

        Checks two formats:
        1. ``message_type == "usage_statistics"`` events (Letta streaming API)
        2. Last event with ``type == "result"`` containing a ``usage`` field

        Returns a dict with prompt/completion/cached/cache-write/reasoning
        token counts (all keys always present, 0 when absent).
        """
        totals: dict[str, int] = {
            "prompt_tokens": 0,
            "completion_tokens": 0,
            "cached_input_tokens": 0,
            "cache_write_tokens": 0,
            "reasoning_tokens": 0,
        }
        # Only the final event matters for the fallback, so track it instead
        # of materializing every parsed event (events files can be large).
        last_event: dict | None = None
        found_usage_stats = False
        for line in events_text.splitlines():
            line = line.strip()
            if not line.startswith("{"):
                continue
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue
            last_event = event
            if event.get("message_type") == "usage_statistics":
                found_usage_stats = True
                for key in totals:
                    totals[key] += event.get(key) or 0
                details = event.get("prompt_tokens_details") or {}
                totals["cached_input_tokens"] += details.get("cached_tokens") or 0
                details = event.get("completion_tokens_details") or {}
                totals["reasoning_tokens"] += details.get("reasoning_tokens") or 0
        # Fallback: last result event
        if not found_usage_stats and last_event is not None:
            if last_event.get("type") == "result" and "usage" in last_event:
                usage = last_event["usage"]
                for key in totals:
                    totals[key] += usage.get(key) or 0
        return totals

    @staticmethod
    def _calculate_cost(model_name: str, usage: dict[str, int]) -> float:
        """Calculate cost in USD using litellm's pricing data.

        Returns 0.0 when the model is unknown, there was no token usage, or
        litellm cannot price the model (logged at debug level).
        """
        prompt_tokens = usage.get("prompt_tokens", 0)
        completion_tokens = usage.get("completion_tokens", 0)
        if not model_name or (prompt_tokens == 0 and completion_tokens == 0):
            return 0.0
        resp = ModelResponse()
        resp.usage = Usage(
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=prompt_tokens + completion_tokens,
            prompt_tokens_details=PromptTokensDetailsWrapper(
                cached_tokens=usage.get("cached_input_tokens", 0),
                cache_creation_tokens=usage.get("cache_write_tokens", 0),
            ),
            completion_tokens_details=CompletionTokensDetailsWrapper(
                reasoning_tokens=usage.get("reasoning_tokens", 0),
            ),
        )
        try:
            return float(completion_cost(completion_response=resp, model=model_name))
        except Exception:
            logger.debug(f"Could not calculate cost for model {model_name}", exc_info=True)
            return 0.0

    def _populate_usage(self, events_text: str, context: AgentContext) -> None:
        """Extract usage from events and populate context + write usage.json.

        Sets ``n_input_tokens`` / ``n_output_tokens`` / ``cost_usd`` on the
        context (None for zero values) and best-effort writes usage.json to
        the task directory (parent of the agent logs directory).
        """
        model_name = self.model_name or os.environ.get("LETTA_MODEL", "").strip()
        usage = self._extract_usage_from_events(events_text)
        cost = self._calculate_cost(model_name, usage)
        context.n_input_tokens = usage["prompt_tokens"] or None
        context.n_output_tokens = usage["completion_tokens"] or None
        context.cost_usd = cost if cost > 0 else None
        # Write usage.json to the task directory (parent of agent logs)
        usage_data: dict = {
            "prompt_tokens": usage["prompt_tokens"],
            "completion_tokens": usage["completion_tokens"],
            "total_tokens": usage["prompt_tokens"] + usage["completion_tokens"],
            "cost_usd": round(cost, 6),
        }
        # Optional counters are only emitted when non-zero to keep the file small.
        for key in ("cached_input_tokens", "cache_write_tokens", "reasoning_tokens"):
            if usage.get(key, 0) > 0:
                usage_data[key] = usage[key]
        try:
            usage_path = Path(self.logs_dir).parent / "usage.json"
            usage_path.parent.mkdir(parents=True, exist_ok=True)
            with open(usage_path, "w") as f:
                json.dump(usage_data, f, indent=2)
        except Exception as e:
            logger.warning(f"Failed to save usage.json: {e}")

    # ------------------------------------------------------------------
    # Harbor lifecycle hooks
    # ------------------------------------------------------------------
    def populate_context_post_run(self, context: AgentContext) -> None:
        """Populate agent context from downloaded logs (e.g. after timeout).

        Harbor calls this when ``context.is_empty()`` returns True, which
        happens when ``run()`` is cancelled by a timeout before it can
        populate the context itself. Harbor's ``_maybe_download_logs``
        copies the container's ``/logs/agent/`` directory to
        ``self.logs_dir`` first, so event files should be available here.
        """
        events_text = self._find_events_text()
        if not events_text.strip():
            return
        agent_id = self._extract_agent_id_from_events(events_text)
        if agent_id:
            (Path(self.logs_dir) / "letta_agent_id_recovered.txt").write_text(agent_id)
        try:
            self._populate_usage(events_text, context)
        except Exception as e:
            logger.warning(f"Failed to extract usage in populate_context_post_run: {e}")

    async def setup(self, environment: BaseEnvironment) -> None:
        """Install the letta CLI inside the task container."""
        await super().setup(environment)

    async def run(
        self,
        instruction: str,
        environment: BaseEnvironment,
        context: AgentContext,
    ) -> None:
        """Invoke letta CLI inside the environment with the given instruction.

        Steps: export provider env vars, upload the instruction, build and
        upload a run script, execute it, then harvest the agent ID, the
        ``.af`` export, and token usage. Any execution error is captured and
        re-raised at the end so log/usage collection always runs first.
        """
        # --- environment variables ----------------------------------------
        agent_env: dict[str, str] = {}
        for key in ("LETTA_API_KEY", "LETTA_BASE_URL", "OPENAI_API_KEY", "ANTHROPIC_API_KEY"):
            if key in os.environ:
                agent_env[key] = os.environ[key]
        # Prefer Letta Code model id (bundles reasoning config) over raw handle.
        # self.model_name (litellm handle) is still used for cost calculation.
        cli_model = self._letta_code_model or self.model_name or os.environ.get("LETTA_MODEL", "").strip()
        if cli_model:
            agent_env["LETTA_MODEL"] = cli_model
        # --- build full instruction with prompt prefix ----------------------
        prompt_prefix = (
            "Complete the task. Do NOT ask clarification questions, you have "
            "enough information to complete the task. Make sure to finish the "
            "task to the best of your ability and do not stop at an intermediate step."
        )
        full_instruction = f"{prompt_prefix}\n\n{instruction}"
        # --- upload instruction -------------------------------------------
        escaped_instruction = shlex.quote(full_instruction)
        with tempfile.NamedTemporaryFile(mode="w", delete=False) as tmpf:
            tmpf.write(full_instruction)
            local_instr_path = tmpf.name
        try:
            await environment.exec("bash -lc 'mkdir -p /installed-agent'", timeout_sec=None)
            await environment.upload_file(local_instr_path, "/installed-agent/instruction.txt")
        finally:
            try:
                Path(local_instr_path).unlink(missing_ok=True)  # type: ignore[arg-type]
            except Exception:
                pass
        # --- build run script ---------------------------------------------
        ts = datetime.now().strftime("%Y-%m-%d__%H-%M-%S")
        base = f"/logs/agent/{ts}"
        model_flag = self._build_model_flags(cli_model)
        run_script = (
            "#!/usr/bin/env bash\n"
            "set -eo pipefail\n"
            "source ~/.bashrc >/dev/null 2>&1 || true\n"
            "mkdir -p /logs/agent\n"
            f"letta --new-agent --conv default --no-skills {model_flag}-p {escaped_instruction} "
            f"--permission-mode bypassPermissions --output-format stream-json "
            f"2>'{base}.stderr.log' | tee '{base}.events.jsonl'\n"
        )
        logs_dir = Path(self.logs_dir)
        logs_dir.mkdir(parents=True, exist_ok=True)
        run_script_path = logs_dir / "run_script.sh"
        run_script_path.write_text(run_script)
        # --- execute ------------------------------------------------------
        result = None
        run_error: Exception | None = None

        async def _capture_settings_after_delay() -> None:
            """Snapshot agent ID from settings shortly after the agent starts.

            This is a safety net for timeouts: if run() is cancelled before
            reaching the post-run log collection, we still have the agent ID.
            """
            try:
                await asyncio.sleep(1.0)
                out = await environment.exec(
                    "bash -lc 'cat .letta/settings.local.json 2>/dev/null || true'",
                    timeout_sec=None,
                )
                mid_agent_id = self._extract_agent_id_from_settings(out.stdout or "")
                if mid_agent_id:
                    (logs_dir / f"letta_agent_id_{ts}.txt").write_text(mid_agent_id)
            except Exception:
                pass

        settings_task: asyncio.Task | None = None
        try:
            await environment.exec("bash -lc 'mkdir -p /installed-agent'", timeout_sec=None)
            tmp_script_path = "/installed-agent/run-letta.sh"
            await environment.upload_file(str(run_script_path), tmp_script_path)
            await environment.exec(f"bash -lc 'chmod +x {tmp_script_path}'", timeout_sec=None)
            # Bug fix: keep a reference to the task — the result of a bare
            # asyncio.create_task() may be garbage-collected before it runs
            # (per asyncio docs) — and reap it below so no pending task
            # leaks past run().
            settings_task = asyncio.create_task(_capture_settings_after_delay())
            result = await environment.exec(
                f"bash -lc 'bash {tmp_script_path}'",
                env=agent_env or None,
                timeout_sec=None,
            )
        except Exception as e:
            run_error = e
        finally:
            if settings_task is not None and not settings_task.done():
                try:
                    # The snapshot does ~1s of work; give it a bounded window
                    # to finish, otherwise cancel it.
                    await asyncio.wait_for(settings_task, timeout=5.0)
                except Exception:
                    settings_task.cancel()
        # --- extract agent id & export -------------------------------------
        # Harbor already downloads /logs/agent/{ts}.* to self.logs_dir,
        # so we only need to fetch the events in-memory for agent ID extraction.
        agent_id: str | None = None
        events_text: str = ""
        try:
            events_text = await self._download_file(environment, f"{base}.events.jsonl")
            settings_text = await self._download_file(environment, ".letta/settings.local.json")
            agent_id = self._extract_agent_id_from_settings(settings_text)
            if not agent_id:
                agent_id = self._extract_agent_id_from_events(events_text)
            if agent_id:
                (logs_dir / f"letta_agent_id_{ts}.txt").write_text(agent_id)
            if agent_id and run_error is None:
                self._export_agent(agent_id, logs_dir, ts)
        except Exception:
            pass
        # --- usage / cost -------------------------------------------------
        try:
            self._populate_usage(events_text, context)
        except Exception as e:
            logger.warning(f"Failed to extract/save usage: {e}")
        # --- populate context ---------------------------------------------
        context.metadata = {
            **(context.metadata or {}),
            "letta_return_code": getattr(result, "return_code", None),
            "letta_logs_ts": ts,
        }
        if run_error is not None:
            raise run_error

    # ------------------------------------------------------------------
    # Private I/O helpers
    # ------------------------------------------------------------------
    @staticmethod
    async def _download_file(environment: BaseEnvironment, remote_path: str) -> str:
        """Cat a file from the environment, returning '' on failure."""
        try:
            # Bug fix: quote the path (and the whole inner command) with
            # shlex so quotes/spaces in remote_path can't break the shell
            # command or inject extra commands.
            inner = f"cat {shlex.quote(remote_path)} 2>/dev/null || true"
            out = await environment.exec(
                f"bash -lc {shlex.quote(inner)}",
                timeout_sec=None,
            )
            return out.stdout or ""
        except Exception:
            return ""

    @staticmethod
    def _export_agent(agent_id: str, logs_dir: Path, ts: str) -> None:
        """Download the ``.af`` agent export (best-effort)."""
        try:
            base_url = os.environ.get("LETTA_BASE_URL", "https://api.letta.com").rstrip("/")
            export_url = f"{base_url}/v1/agents/{agent_id}/export"
            # Attach the API key when available — the hosted endpoint
            # rejects unauthenticated requests (still best-effort either way).
            headers: dict[str, str] = {}
            api_key = os.environ.get("LETTA_API_KEY", "").strip()
            if api_key:
                headers["Authorization"] = f"Bearer {api_key}"
            req = urllib.request.Request(export_url, headers=headers, method="GET")
            with urllib.request.urlopen(req, timeout=30) as resp:
                agent_bytes = resp.read()
            (logs_dir / f"letta_agent_export_{ts}.af").write_bytes(agent_bytes)
        except Exception:
            pass