427 lines
17 KiB
Python
427 lines
17 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import shlex
|
|
import tempfile
|
|
import urllib.request
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
from harbor.agents.installed.base import BaseInstalledAgent, ExecInput
|
|
from harbor.environments.base import BaseEnvironment
|
|
from harbor.models.agent.context import AgentContext
|
|
|
|
from litellm import ModelResponse, Usage, completion_cost
|
|
from litellm.types.utils import CompletionTokensDetailsWrapper, PromptTokensDetailsWrapper
|
|
|
|
# Module-level logger; handlers/levels are configured by the host application.
logger = logging.getLogger(__name__)

# Keys tried (in order) when extracting agent ID from Letta settings JSON.
_SETTINGS_AGENT_ID_KEYS = ("agent_id", "default_agent_id", "lastAgent", "last_agent")

# Provider keywords used to select the right system prompt for the CLI.
# A model name containing any keyword (case-insensitive) maps to that --system value.
_PROVIDER_SYSTEM_MAP = {
    "source-codex": ("gpt", "o1-", "o3-"),
    "source-gemini": ("gemini",),
}
# Fallback --system value when no provider keyword matches the model name.
_DEFAULT_SYSTEM = "source-claude"

# Map Letta Code model handles to litellm model names for cost calculation.
# Handles missing from this map are passed through to litellm unchanged.
_LITELLM_MODEL_MAP: dict[str, str] = {
    "sonnet-4.6-xhigh": "anthropic/claude-sonnet-4-6",
    "gpt-5.3-codex-xhigh": "openai/gpt-5.3-codex",
}
|
|
|
|
|
|
class LettaCode(BaseInstalledAgent):
|
|
"""Run Letta Code CLI inside a harbor environment."""
|
|
|
|
def __init__(self, *args, **kwargs):
    """Store the optional ``letta_code_model`` kwarg, then defer to the base.

    ``letta_code_model`` is specific to this agent; it must be removed
    from ``kwargs`` before calling ``super().__init__`` because the base
    class does not accept it.
    """
    model = kwargs.pop("letta_code_model", None)
    self._letta_code_model: str | None = model
    super().__init__(*args, **kwargs)
|
|
|
|
@staticmethod
|
|
def name() -> str:
|
|
return "letta-code"
|
|
|
|
@property
|
|
def _install_agent_template_path(self) -> Path:
|
|
return Path(__file__).parent / "install-letta-code.sh.j2"
|
|
|
|
def create_run_agent_commands(self, instruction: str) -> list[ExecInput]:
    """Return no commands; required by the ABC but unused.

    This agent overrides :meth:`run` directly, so the command-list
    entry point never executes.
    """
    return []
|
|
|
|
# ------------------------------------------------------------------
|
|
# Helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
@staticmethod
|
|
def _extract_agent_id_from_events(events_text: str) -> str | None:
|
|
"""Scan JSONL *text* for the first ``agent-*`` id."""
|
|
for line in events_text.splitlines():
|
|
line = line.strip()
|
|
if not line.startswith("{"):
|
|
continue
|
|
try:
|
|
event = json.loads(line)
|
|
except json.JSONDecodeError:
|
|
continue
|
|
for key in ("agent_id", "session_id"):
|
|
aid = event.get(key)
|
|
if isinstance(aid, str) and aid.startswith("agent-"):
|
|
return aid
|
|
return None
|
|
|
|
@staticmethod
def _extract_agent_id_from_settings(settings_text: str) -> str | None:
    """Parse Letta ``settings.local.json`` content and return an agent id.

    The text may carry leading noise before the JSON object (e.g. shell
    banners from ``cat``), so parsing starts at the first ``{``.

    Args:
        settings_text: Raw file content; may be empty or malformed.

    Returns:
        The agent id string, or ``None`` when the text is empty, not
        valid JSON, not an object, or contains no usable id.
    """
    if not settings_text.strip():
        return None
    try:
        json_start = settings_text.find("{")
        cleaned = settings_text[json_start:] if json_start != -1 else settings_text
        obj = json.loads(cleaned)
        if not isinstance(obj, dict):
            return None
        for key in _SETTINGS_AGENT_ID_KEYS:
            val = obj.get(key)
            # Bug fix: accept only non-empty strings. Callers pass the
            # result to Path.write_text(), so returning a dict/number
            # stored under one of these keys would raise later.
            if isinstance(val, str) and val:
                return val
        # Fallback: first value that looks like an agent id.
        for val in obj.values():
            if isinstance(val, str) and val.startswith("agent-"):
                return val
    except Exception:
        # Best-effort parser: malformed settings must never break a run.
        pass
    return None
|
|
|
|
@staticmethod
def _build_model_flags(model_name: str) -> str:
    """Return CLI flags for ``--model`` and ``--system``.

    An empty model name yields an empty string. Otherwise the model is
    shell-quoted and paired with the system prompt selected by keyword
    match against ``_PROVIDER_SYSTEM_MAP`` (falling back to
    ``_DEFAULT_SYSTEM``). The result always ends with a trailing space.
    """
    if not model_name:
        return ""
    lowered = model_name.lower()
    chosen_system = next(
        (
            sys_name
            for sys_name, keywords in _PROVIDER_SYSTEM_MAP.items()
            if any(kw in lowered for kw in keywords)
        ),
        _DEFAULT_SYSTEM,
    )
    return f"--model {shlex.quote(model_name)} --system {chosen_system} "
|
|
|
|
def _find_events_text(self) -> str:
|
|
"""Return events JSONL content from the local logs directory."""
|
|
logs_dir = Path(self.logs_dir)
|
|
events_files = sorted(logs_dir.glob("*.events.jsonl"))
|
|
if not events_files:
|
|
return ""
|
|
return events_files[0].read_text()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Usage / cost tracking
|
|
# ------------------------------------------------------------------
|
|
|
|
@staticmethod
|
|
def _extract_usage_from_events(events_text: str) -> dict[str, int]:
|
|
"""Extract token usage from Letta Code stream-json events.
|
|
|
|
Checks two formats:
|
|
1. ``message_type == "usage_statistics"`` events (Letta streaming API)
|
|
2. Last event with ``type == "result"`` containing a ``usage`` field
|
|
"""
|
|
totals: dict[str, int] = {
|
|
"prompt_tokens": 0,
|
|
"completion_tokens": 0,
|
|
"cached_input_tokens": 0,
|
|
"cache_write_tokens": 0,
|
|
"reasoning_tokens": 0,
|
|
}
|
|
parsed_events: list[dict] = []
|
|
found_usage_stats = False
|
|
|
|
for line in events_text.splitlines():
|
|
line = line.strip()
|
|
if not line.startswith("{"):
|
|
continue
|
|
try:
|
|
event = json.loads(line)
|
|
except json.JSONDecodeError:
|
|
continue
|
|
parsed_events.append(event)
|
|
|
|
if event.get("message_type") == "usage_statistics":
|
|
found_usage_stats = True
|
|
for key in totals:
|
|
totals[key] += event.get(key) or 0
|
|
details = event.get("prompt_tokens_details") or {}
|
|
totals["cached_input_tokens"] += details.get("cached_tokens") or 0
|
|
details = event.get("completion_tokens_details") or {}
|
|
totals["reasoning_tokens"] += details.get("reasoning_tokens") or 0
|
|
|
|
# Fallback: last result event
|
|
if not found_usage_stats and parsed_events:
|
|
last = parsed_events[-1]
|
|
if last.get("type") == "result" and "usage" in last:
|
|
usage = last["usage"]
|
|
for key in totals:
|
|
totals[key] += usage.get(key) or 0
|
|
|
|
return totals
|
|
|
|
@staticmethod
def _calculate_cost(model_name: str, usage: dict[str, int]) -> float:
    """Calculate cost in USD using litellm's pricing data.

    Args:
        model_name: litellm model identifier used for the price lookup.
        usage: Token totals as produced by ``_extract_usage_from_events``.

    Returns:
        Estimated cost in USD, or ``0.0`` when the model name is empty,
        no tokens were counted, or litellm cannot price the model.
    """
    prompt_tokens = usage.get("prompt_tokens", 0)
    completion_tokens = usage.get("completion_tokens", 0)
    if not model_name or (prompt_tokens == 0 and completion_tokens == 0):
        return 0.0
    # Build a synthetic ModelResponse so litellm's completion_cost() can
    # price it, including cache-read/write and reasoning token details.
    resp = ModelResponse()
    resp.usage = Usage(
        prompt_tokens=prompt_tokens,
        completion_tokens=completion_tokens,
        total_tokens=prompt_tokens + completion_tokens,
        prompt_tokens_details=PromptTokensDetailsWrapper(
            cached_tokens=usage.get("cached_input_tokens", 0),
            cache_creation_tokens=usage.get("cache_write_tokens", 0),
        ),
        completion_tokens_details=CompletionTokensDetailsWrapper(
            reasoning_tokens=usage.get("reasoning_tokens", 0),
        ),
    )
    try:
        return float(completion_cost(completion_response=resp, model=model_name))
    except Exception:
        # Pricing failures (e.g. model unknown to litellm) are not fatal;
        # cost simply reports as zero and the details go to debug logs.
        logger.debug(f"Could not calculate cost for model {model_name}", exc_info=True)
        return 0.0
|
|
|
|
def _populate_usage(self, events_text: str, context: AgentContext) -> None:
    """Extract usage from events and populate context + write usage.json.

    Args:
        events_text: Raw JSONL text from the CLI's stream-json output.
        context: Harbor agent context whose token/cost fields are set in
            place (``None`` when a value is zero/unavailable).
    """
    # self.model_name is the litellm handle; fall back to the env var the
    # CLI itself reads so cost lookup can still work when it is unset.
    raw_model = self.model_name or os.environ.get("LETTA_MODEL", "").strip()
    # Translate Letta Code handles to litellm names; unknown handles pass
    # through unchanged and may simply fail the price lookup (cost 0.0).
    litellm_model = _LITELLM_MODEL_MAP.get(raw_model, raw_model)
    usage = self._extract_usage_from_events(events_text)
    cost = self._calculate_cost(litellm_model, usage)

    # Zero is reported as None so consumers can tell "no data" apart
    # from a real measurement.
    context.n_input_tokens = usage["prompt_tokens"] or None
    context.n_output_tokens = usage["completion_tokens"] or None
    context.cost_usd = cost if cost > 0 else None

    # Write usage.json to the task directory (parent of agent logs)
    usage_data: dict = {
        "prompt_tokens": usage["prompt_tokens"],
        "completion_tokens": usage["completion_tokens"],
        "total_tokens": usage["prompt_tokens"] + usage["completion_tokens"],
        "cost_usd": round(cost, 6),
    }
    # Optional detail counters are emitted only when non-zero to keep the
    # file compact.
    for key in ("cached_input_tokens", "cache_write_tokens", "reasoning_tokens"):
        if usage.get(key, 0) > 0:
            usage_data[key] = usage[key]

    try:
        usage_path = Path(self.logs_dir).parent / "usage.json"
        usage_path.parent.mkdir(parents=True, exist_ok=True)
        with open(usage_path, "w") as f:
            json.dump(usage_data, f, indent=2)
    except Exception as e:
        # Best-effort: usage accounting must never fail the agent run.
        logger.warning(f"Failed to save usage.json: {e}")
|
|
|
|
# ------------------------------------------------------------------
|
|
# Harbor lifecycle hooks
|
|
# ------------------------------------------------------------------
|
|
|
|
def populate_context_post_run(self, context: AgentContext) -> None:
    """Populate agent context from downloaded logs (e.g. after timeout).

    Harbor calls this when ``context.is_empty()`` returns True, which
    happens when ``run()`` is cancelled by a timeout before it can
    populate the context itself. Harbor's ``_maybe_download_logs``
    copies the container's ``/logs/agent/`` directory to
    ``self.logs_dir`` first, so event files should be available here.
    """
    events_text = self._find_events_text()
    if not events_text.strip():
        # Nothing was downloaded — the run died before producing events.
        return

    # Persist any agent id recoverable from the event stream so the agent
    # can still be inspected/exported manually after a timeout.
    agent_id = self._extract_agent_id_from_events(events_text)
    if agent_id:
        (Path(self.logs_dir) / "letta_agent_id_recovered.txt").write_text(agent_id)

    try:
        self._populate_usage(events_text, context)
    except Exception as e:
        # Usage extraction is best-effort here; never raise out of a
        # post-timeout recovery path.
        logger.warning(f"Failed to extract usage in populate_context_post_run: {e}")
|
|
|
|
async def setup(self, environment: BaseEnvironment) -> None:
    """Install the letta CLI inside the task container.

    Delegates entirely to the base class installer (which presumably
    renders/runs the template from ``_install_agent_template_path`` —
    behavior defined in ``BaseInstalledAgent``, not visible here).
    """
    await super().setup(environment)
|
|
|
|
async def run(
    self,
    instruction: str,
    environment: BaseEnvironment,
    context: AgentContext,
) -> None:
    """Invoke letta CLI inside the environment with the given instruction.

    Flow: forward provider credentials, upload the instruction, build and
    upload a run script that streams JSONL events to ``/logs/agent``,
    execute it, then recover the agent id and token usage from the logs.

    Raises:
        Exception: re-raises whatever ``environment.exec`` raised while
            running the agent, after best-effort log/usage collection.
    """

    # --- environment variables ----------------------------------------
    # Forward only the credentials/endpoints the CLI understands.
    agent_env: dict[str, str] = {}
    for key in ("LETTA_API_KEY", "LETTA_BASE_URL", "OPENAI_API_KEY", "ANTHROPIC_API_KEY"):
        if key in os.environ:
            agent_env[key] = os.environ[key]

    # Prefer Letta Code model id (bundles reasoning config) over raw handle.
    # self.model_name (litellm handle) is still used for cost calculation.
    cli_model = self._letta_code_model or self.model_name or os.environ.get("LETTA_MODEL", "").strip()
    if cli_model:
        agent_env["LETTA_MODEL"] = cli_model

    # --- build full instruction with prompt prefix ----------------------
    prompt_prefix = (
        "Complete the task. Do NOT ask clarification questions, you have "
        "enough information to complete the task. Make sure to finish the "
        "task to the best of your ability and do not stop at an intermediate step."
    )
    full_instruction = f"{prompt_prefix}\n\n{instruction}"

    # --- upload instruction -------------------------------------------
    # The instruction is passed inline (shell-quoted) to the CLI below;
    # the uploaded copy serves as a debugging/reproduction artifact.
    escaped_instruction = shlex.quote(full_instruction)
    with tempfile.NamedTemporaryFile(mode="w", delete=False) as tmpf:
        tmpf.write(full_instruction)
        local_instr_path = tmpf.name
    try:
        await environment.exec("bash -lc 'mkdir -p /installed-agent'", timeout_sec=None)
        await environment.upload_file(local_instr_path, "/installed-agent/instruction.txt")
    finally:
        # Always remove the local temp file, even if the upload failed.
        try:
            Path(local_instr_path).unlink(missing_ok=True)  # type: ignore[arg-type]
        except Exception:
            pass

    # --- build run script ---------------------------------------------
    ts = datetime.now().strftime("%Y-%m-%d__%H-%M-%S")
    base = f"/logs/agent/{ts}"
    model_flag = self._build_model_flags(cli_model)

    run_script = (
        "#!/usr/bin/env bash\n"
        "set -eo pipefail\n"
        "source ~/.bashrc >/dev/null 2>&1 || true\n"
        "mkdir -p /logs/agent\n"
        f"letta --new-agent --conv default --no-skills {model_flag}-p {escaped_instruction} "
        f"--permission-mode bypassPermissions --output-format stream-json "
        f"2>'{base}.stderr.log' | tee '{base}.events.jsonl'\n"
    )

    logs_dir = Path(self.logs_dir)
    logs_dir.mkdir(parents=True, exist_ok=True)
    run_script_path = logs_dir / "run_script.sh"
    run_script_path.write_text(run_script)

    # --- execute ------------------------------------------------------
    result = None
    run_error: Exception | None = None

    async def _capture_settings_after_delay() -> None:
        """Snapshot agent ID from settings shortly after the agent starts.

        This is a safety net for timeouts: if run() is cancelled before
        reaching the post-run log collection, we still have the agent ID.
        """
        try:
            await asyncio.sleep(1.0)
            out = await environment.exec(
                "bash -lc 'cat .letta/settings.local.json 2>/dev/null || true'",
                timeout_sec=None,
            )
            mid_agent_id = self._extract_agent_id_from_settings(out.stdout or "")
            if mid_agent_id:
                (logs_dir / f"letta_agent_id_{ts}.txt").write_text(mid_agent_id)
        except Exception:
            pass

    settings_task: asyncio.Task | None = None
    try:
        await environment.exec("bash -lc 'mkdir -p /installed-agent'", timeout_sec=None)
        tmp_script_path = "/installed-agent/run-letta.sh"
        await environment.upload_file(str(run_script_path), tmp_script_path)
        await environment.exec(f"bash -lc 'chmod +x {tmp_script_path}'", timeout_sec=None)

        # Bug fix: keep a strong reference to the background task — the
        # result of a bare create_task() may be garbage-collected before
        # it runs (see asyncio docs on task references).
        settings_task = asyncio.create_task(_capture_settings_after_delay())

        result = await environment.exec(
            f"bash -lc 'bash {tmp_script_path}'",
            env=agent_env or None,
            timeout_sec=None,
        )
    except Exception as e:
        run_error = e
    finally:
        # Don't leave the snapshot task running once the main exec has
        # returned (or failed): the environment may be torn down next,
        # and its safety-net purpose is already served.
        if settings_task is not None and not settings_task.done():
            settings_task.cancel()

    # --- extract agent id & export -------------------------------------
    # Harbor already downloads /logs/agent/{ts}.* to self.logs_dir,
    # so we only need to fetch the events in-memory for agent ID extraction.
    agent_id: str | None = None
    events_text: str = ""
    try:
        events_text = await self._download_file(environment, f"{base}.events.jsonl")

        settings_text = await self._download_file(environment, ".letta/settings.local.json")
        agent_id = self._extract_agent_id_from_settings(settings_text)

        if not agent_id:
            agent_id = self._extract_agent_id_from_events(events_text)

        if agent_id:
            (logs_dir / f"letta_agent_id_{ts}.txt").write_text(agent_id)

        if agent_id and run_error is None:
            self._export_agent(agent_id, logs_dir, ts)
    except Exception:
        # Best-effort recovery: never let it shadow run_error below.
        pass

    # --- usage / cost -------------------------------------------------
    try:
        self._populate_usage(events_text, context)
    except Exception as e:
        logger.warning(f"Failed to extract/save usage: {e}")

    # --- populate context ---------------------------------------------
    context.metadata = {
        **(context.metadata or {}),
        "letta_return_code": getattr(result, "return_code", None),
        "letta_logs_ts": ts,
    }

    if run_error is not None:
        raise run_error
|
|
|
|
# ------------------------------------------------------------------
|
|
# Private I/O helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
@staticmethod
async def _download_file(environment: BaseEnvironment, remote_path: str) -> str:
    """Cat a file from the environment, returning '' on failure.

    Args:
        environment: Target execution environment.
        remote_path: Path inside the container to read.

    Returns:
        The file's stdout content, or ``""`` if the file is missing or
        the exec itself fails.
    """
    try:
        # Bug fix: shlex.quote the path instead of hand-rolled double
        # quotes inside a single-quoted bash string, which broke on
        # paths containing quote characters.
        inner = f"cat {shlex.quote(remote_path)} 2>/dev/null || true"
        out = await environment.exec(
            f"bash -lc {shlex.quote(inner)}",
            timeout_sec=None,
        )
        return out.stdout or ""
    except Exception:
        return ""
|
|
|
|
@staticmethod
def _export_agent(agent_id: str, logs_dir: Path, ts: str) -> None:
    """Download the ``.af`` agent export (best-effort).

    Args:
        agent_id: The ``agent-*`` id to export.
        logs_dir: Local directory where the ``.af`` file is written.
        ts: Timestamp suffix used in the output filename.
    """
    try:
        base_url = os.environ.get("LETTA_BASE_URL", "https://api.letta.com").rstrip("/")
        export_url = f"{base_url}/v1/agents/{agent_id}/export"
        # Bug fix: the original sent no credentials, so exports from the
        # hosted API would be rejected. Reuse the same key the CLI ran
        # with. (Bearer scheme assumed per common Letta usage — confirm
        # against the Letta API reference.)
        headers: dict[str, str] = {}
        api_key = os.environ.get("LETTA_API_KEY")
        if api_key:
            headers["Authorization"] = f"Bearer {api_key}"
        req = urllib.request.Request(export_url, headers=headers, method="GET")
        with urllib.request.urlopen(req, timeout=30) as resp:
            agent_bytes = resp.read()
        (logs_dir / f"letta_agent_export_{ts}.af").write_bytes(agent_bytes)
    except Exception:
        # Export is auxiliary debugging output; never fail the run on it.
        pass
|