Files
letta-code/src/headless.ts
Charles Packer d0dd53a472 feat: queue headless integration (#1162)
Co-authored-by: Letta <noreply@letta.com>
2026-02-26 11:01:48 -08:00

3543 lines
118 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import { randomUUID } from "node:crypto";
import type { Letta } from "@letta-ai/letta-client";
import { APIError } from "@letta-ai/letta-client/core/error";
import type {
AgentState,
MessageCreate,
} from "@letta-ai/letta-client/resources/agents/agents";
import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/messages";
import type { StopReasonType } from "@letta-ai/letta-client/resources/runs/runs";
import type { ApprovalResult } from "./agent/approval-execution";
import {
extractConflictDetail,
fetchRunErrorDetail,
getPreStreamErrorAction,
isApprovalPendingError,
isEmptyResponseRetryable,
isInvalidToolCallIdsError,
parseRetryAfterHeaderMs,
shouldRetryRunMetadataError,
} from "./agent/approval-recovery";
import { handleBootstrapSessionState } from "./agent/bootstrapHandler";
import { getClient } from "./agent/client";
import { setAgentContext, setConversationId } from "./agent/context";
import { createAgent } from "./agent/create";
import { handleListMessages } from "./agent/listMessagesHandler";
import { ISOLATED_BLOCK_LABELS } from "./agent/memory";
import { getStreamToolContextId, sendMessageStream } from "./agent/message";
import {
getModelPresetUpdateForAgent,
getModelUpdateArgs,
getResumeRefreshArgs,
resolveModel,
} from "./agent/model";
import { updateAgentLLMConfig, updateAgentSystemPrompt } from "./agent/modify";
import { resolveSkillSourcesSelection } from "./agent/skillSources";
import type { SkillSource } from "./agent/skills";
import { SessionStats } from "./agent/stats";
import type { ParsedCliArgs } from "./cli/args";
import {
normalizeConversationShorthandFlags,
parseCsvListFlag,
parseJsonArrayFlag,
parsePositiveIntFlag,
resolveImportFlagAlias,
} from "./cli/flagUtils";
import {
createBuffers,
type Line,
markIncompleteToolsAsCancelled,
toLines,
} from "./cli/helpers/accumulator";
import { classifyApprovals } from "./cli/helpers/approvalClassification";
import { createContextTracker } from "./cli/helpers/contextTracker";
import { formatErrorDetails } from "./cli/helpers/errorFormatter";
import {
getReflectionSettings,
type ReflectionBehavior,
type ReflectionSettings,
type ReflectionTrigger,
reflectionSettingsToLegacyMode,
} from "./cli/helpers/memoryReminder";
import {
type QueuedMessage,
setMessageQueueAdder,
} from "./cli/helpers/messageQueueBridge";
import {
type DrainStreamHook,
drainStreamWithResume,
} from "./cli/helpers/stream";
import {
validateConversationDefaultRequiresAgent,
validateFlagConflicts,
validateRegistryHandleOrThrow,
} from "./cli/startupFlagValidation";
import { SYSTEM_REMINDER_CLOSE, SYSTEM_REMINDER_OPEN } from "./constants";
import { computeDiffPreviews } from "./helpers/diffPreview";
import { QueueRuntime } from "./queue/queueRuntime";
import {
mergeQueuedTurnInput,
type QueuedTurnInput,
} from "./queue/turnQueueRuntime";
import {
buildSharedReminderParts,
prependReminderPartsToContent,
} from "./reminders/engine";
import {
createSharedReminderState,
syncReminderStateFromContextTracker,
} from "./reminders/state";
import { settingsManager } from "./settings-manager";
import {
isHeadlessAutoAllowTool,
isInteractiveApprovalTool,
} from "./tools/interactivePolicy";
import {
type ExternalToolDefinition,
registerExternalTools,
setExternalToolExecutor,
} from "./tools/manager";
import type {
AutoApprovalMessage,
BootstrapSessionStateRequest,
CanUseToolControlRequest,
CanUseToolResponse,
ControlRequest,
ControlResponse,
ErrorMessage,
ListMessagesControlRequest,
MessageWire,
QueueLifecycleEvent,
RecoveryMessage,
ResultMessage,
RetryMessage,
StreamEvent,
SystemInitMessage,
} from "./types/protocol";
import {
markMilestone,
measureSinceMilestone,
reportAllMilestones,
} from "./utils/timing";
// Maximum number of times to retry a turn when the backend
// reports an `llm_api_error` stop reason. This helps smooth
// over transient LLM/backend issues without requiring the
// caller to manually resubmit the prompt.
const LLM_API_ERROR_MAX_RETRIES = 3;
// Retry config for empty response errors (Opus 4.6 SADs)
// Retry 1: same input. Retry 2: with system reminder nudge.
const EMPTY_RESPONSE_MAX_RETRIES = 2;
// Retry config for 409 "conversation busy" errors
const CONVERSATION_BUSY_MAX_RETRIES = 1; // Only retry once, fail on 2nd 409
const CONVERSATION_BUSY_RETRY_DELAY_MS = 2500; // 2.5 seconds
export type BidirectionalQueuedInput = QueuedTurnInput<
MessageCreate["content"]
>;
export function mergeBidirectionalQueuedInput(
queued: BidirectionalQueuedInput[],
): MessageCreate["content"] | null {
return mergeQueuedTurnInput(queued, {
normalizeUserContent: (content) => content,
});
}
type ReflectionOverrides = {
trigger?: ReflectionTrigger;
behavior?: ReflectionBehavior;
stepCount?: number;
};
function parseReflectionOverrides(
values: ParsedCliArgs["values"],
): ReflectionOverrides {
const triggerRaw = values["reflection-trigger"];
const behaviorRaw = values["reflection-behavior"];
const stepCountRaw = values["reflection-step-count"];
if (!triggerRaw && !behaviorRaw && !stepCountRaw) {
return {};
}
const overrides: ReflectionOverrides = {};
if (triggerRaw !== undefined) {
if (
triggerRaw !== "off" &&
triggerRaw !== "step-count" &&
triggerRaw !== "compaction-event"
) {
throw new Error(
`Invalid --reflection-trigger "${triggerRaw}". Valid values: off, step-count, compaction-event`,
);
}
overrides.trigger = triggerRaw;
}
if (behaviorRaw !== undefined) {
if (behaviorRaw !== "reminder" && behaviorRaw !== "auto-launch") {
throw new Error(
`Invalid --reflection-behavior "${behaviorRaw}". Valid values: reminder, auto-launch`,
);
}
overrides.behavior = behaviorRaw;
}
if (stepCountRaw !== undefined) {
try {
overrides.stepCount = parsePositiveIntFlag({
rawValue: stepCountRaw,
flagName: "reflection-step-count",
});
} catch {
throw new Error(
`Invalid --reflection-step-count "${stepCountRaw}". Expected a positive integer.`,
);
}
}
return overrides;
}
function hasReflectionOverrides(overrides: ReflectionOverrides): boolean {
return (
overrides.trigger !== undefined ||
overrides.behavior !== undefined ||
overrides.stepCount !== undefined
);
}
async function applyReflectionOverrides(
agentId: string,
overrides: ReflectionOverrides,
): Promise<ReflectionSettings> {
const current = getReflectionSettings();
const merged: ReflectionSettings = {
trigger: overrides.trigger ?? current.trigger,
behavior: overrides.behavior ?? current.behavior,
stepCount: overrides.stepCount ?? current.stepCount,
};
if (!hasReflectionOverrides(overrides)) {
return merged;
}
const memfsEnabled = settingsManager.isMemfsEnabled(agentId);
if (!memfsEnabled && merged.trigger === "compaction-event") {
throw new Error(
"--reflection-trigger compaction-event requires memfs enabled for this agent.",
);
}
if (
!memfsEnabled &&
merged.trigger !== "off" &&
merged.behavior === "auto-launch"
) {
throw new Error(
"--reflection-behavior auto-launch requires memfs enabled for this agent.",
);
}
try {
settingsManager.getLocalProjectSettings();
} catch {
await settingsManager.loadLocalProjectSettings();
}
const legacyMode = reflectionSettingsToLegacyMode(merged);
settingsManager.updateLocalProjectSettings({
memoryReminderInterval: legacyMode,
reflectionTrigger: merged.trigger,
reflectionBehavior: merged.behavior,
reflectionStepCount: merged.stepCount,
});
settingsManager.updateSettings({
memoryReminderInterval: legacyMode,
reflectionTrigger: merged.trigger,
reflectionBehavior: merged.behavior,
reflectionStepCount: merged.stepCount,
});
return merged;
}
export async function handleHeadlessCommand(
parsedArgs: ParsedCliArgs,
model?: string,
skillsDirectoryOverride?: string,
skillSourcesOverride?: SkillSource[],
systemInfoReminderEnabledOverride?: boolean,
) {
const { values, positionals } = parsedArgs;
// Set tool filter if provided (controls which tools are loaded)
if (values.tools !== undefined) {
const { toolFilter } = await import("./tools/filter");
toolFilter.setEnabledTools(values.tools);
}
// Set permission mode if provided (or via --yolo alias)
const permissionModeValue = values["permission-mode"];
const yoloMode = values.yolo;
if (yoloMode || permissionModeValue) {
const { permissionMode } = await import("./permissions/mode");
if (yoloMode) {
permissionMode.setMode("bypassPermissions");
} else if (permissionModeValue) {
const validModes = [
"default",
"acceptEdits",
"bypassPermissions",
"plan",
];
if (validModes.includes(permissionModeValue)) {
permissionMode.setMode(
permissionModeValue as
| "default"
| "acceptEdits"
| "bypassPermissions"
| "plan",
);
}
}
}
// Set CLI permission overrides if provided (inherited from parent agent)
if (values.allowedTools || values.disallowedTools) {
const { cliPermissions } = await import("./permissions/cli");
if (values.allowedTools) {
cliPermissions.setAllowedTools(values.allowedTools);
}
if (values.disallowedTools) {
cliPermissions.setDisallowedTools(values.disallowedTools);
}
}
// Check for input-format early - if stream-json, we don't need a prompt
const inputFormat = values["input-format"];
const isBidirectionalMode = inputFormat === "stream-json";
// If headless output is being piped and the downstream closes early (e.g.
// `| head`), Node will throw EPIPE on stdout writes. Treat this as a normal
// termination rather than crashing with a stack trace.
//
// Note: this must be registered before any `console.log` in headless mode.
process.stdout.on("error", (err: unknown) => {
const code =
typeof err === "object" && err !== null && "code" in err
? (err as { code?: unknown }).code
: undefined;
if (code === "EPIPE") {
process.exit(0);
}
// Re-throw unknown stdout errors so they surface during tests/debugging.
throw err;
});
// Get prompt from either positional args or stdin (unless in bidirectional mode)
let prompt = positionals.slice(2).join(" ");
// If no prompt provided as args, try reading from stdin (unless in bidirectional mode)
if (!prompt && !isBidirectionalMode) {
// Check if stdin is available (piped input)
if (!process.stdin.isTTY) {
const chunks: Buffer[] = [];
for await (const chunk of process.stdin) {
chunks.push(chunk);
}
prompt = Buffer.concat(chunks).toString("utf-8").trim();
}
}
if (!prompt && !isBidirectionalMode) {
console.error("Error: No prompt provided");
process.exit(1);
}
const client = await getClient();
markMilestone("HEADLESS_CLIENT_READY");
// Check for --resume flag (interactive only)
if (values.resume) {
console.error(
"Error: --resume is for interactive mode only (opens conversation selector).\n" +
"In headless mode, use:\n" +
" --continue Resume the last session (agent + conversation)\n" +
" --conversation <id> Resume a specific conversation by ID",
);
process.exit(1);
}
// --new: Create a new conversation (for concurrent sessions)
let forceNewConversation = values.new ?? false;
const fromAgentId = values["from-agent"];
// Resolve agent (same logic as interactive mode)
let agent: AgentState | null = null;
let specifiedAgentId = values.agent;
const specifiedAgentName = values.name;
let specifiedConversationId = values.conversation;
const shouldContinue = values.continue;
const forceNew = values["new-agent"];
const systemPromptPreset = values.system;
const systemCustom = values["system-custom"];
const systemAppend = values["system-append"];
const embeddingModel = values.embedding;
const memoryBlocksJson = values["memory-blocks"];
const blockValueArgs = values["block-value"];
const initBlocksRaw = values["init-blocks"];
const baseToolsRaw = values["base-tools"];
const skillsDirectory = values.skills ?? skillsDirectoryOverride;
const noSkillsFlag = values["no-skills"];
const noBundledSkillsFlag = values["no-bundled-skills"];
const skillSourcesRaw = values["skill-sources"];
const memfsFlag = values.memfs;
const noMemfsFlag = values["no-memfs"];
// Startup policy for the git-backed memory pull on session init.
// "blocking" (default): await the pull before proceeding.
// "background": fire the pull async, emit init without waiting.
// "skip": skip the pull entirely this session.
const memfsStartupRaw = values["memfs-startup"];
const memfsStartupPolicy: "blocking" | "background" | "skip" =
memfsStartupRaw === "background" || memfsStartupRaw === "skip"
? memfsStartupRaw
: "blocking";
const requestedMemoryPromptMode: "memfs" | "standard" | undefined = memfsFlag
? "memfs"
: noMemfsFlag
? "standard"
: undefined;
const shouldAutoEnableMemfsForNewAgent = !memfsFlag && !noMemfsFlag;
const fromAfFile = resolveImportFlagAlias({
importFlagValue: values.import,
fromAfFlagValue: values["from-af"],
});
const preLoadSkillsRaw = values["pre-load-skills"];
const systemInfoReminderEnabled =
systemInfoReminderEnabledOverride ?? !values["no-system-info-reminder"];
const reflectionOverrides = (() => {
try {
return parseReflectionOverrides(values);
} catch (error) {
console.error(
error instanceof Error ? `Error: ${error.message}` : String(error),
);
process.exit(1);
}
})();
const maxTurnsRaw = values["max-turns"];
const tagsRaw = values.tags;
const resolvedSkillSources = (() => {
if (skillSourcesOverride) {
return skillSourcesOverride;
}
try {
return resolveSkillSourcesSelection({
skillSourcesRaw,
noSkills: noSkillsFlag,
noBundledSkills: noBundledSkillsFlag,
});
} catch (error) {
console.error(
error instanceof Error ? `Error: ${error.message}` : String(error),
);
process.exit(1);
}
})();
const tags = parseCsvListFlag(tagsRaw);
// Parse and validate max-turns if provided
let maxTurns: number | undefined;
try {
maxTurns = parsePositiveIntFlag({
rawValue: maxTurnsRaw,
flagName: "max-turns",
});
} catch (error) {
console.error(
`Error: ${error instanceof Error ? error.message : String(error)}`,
);
process.exit(1);
}
if (preLoadSkillsRaw && resolvedSkillSources.length === 0) {
console.error(
"Error: --pre-load-skills cannot be used when all skill sources are disabled.",
);
process.exit(1);
}
try {
const normalized = normalizeConversationShorthandFlags({
specifiedConversationId,
specifiedAgentId,
});
specifiedConversationId = normalized.specifiedConversationId ?? undefined;
specifiedAgentId = normalized.specifiedAgentId ?? undefined;
} catch (error) {
console.error(
error instanceof Error ? `Error: ${error.message}` : String(error),
);
process.exit(1);
}
// Validate --conv default requires --agent (unless --new-agent will create one)
try {
validateConversationDefaultRequiresAgent({
specifiedConversationId,
specifiedAgentId,
forceNew,
});
} catch (error) {
console.error(
error instanceof Error ? `Error: ${error.message}` : String(error),
);
console.error("Usage: letta --agent agent-xyz --conv default");
console.error(" or: letta --conv agent-xyz (shorthand)");
process.exit(1);
}
if (fromAgentId) {
if (!specifiedAgentId && !specifiedConversationId) {
console.error(
"Error: --from-agent requires --agent <id> or --conversation <id>.",
);
process.exit(1);
}
if (shouldContinue) {
console.error("Error: --from-agent cannot be used with --continue");
process.exit(1);
}
if (forceNew) {
console.error("Error: --from-agent cannot be used with --new-agent");
process.exit(1);
}
if (!specifiedConversationId && !forceNewConversation) {
forceNewConversation = true;
}
}
// Validate shared mutual-exclusion rules for startup flags.
try {
validateFlagConflicts({
guard: specifiedConversationId && specifiedConversationId !== "default",
checks: [
{
when: specifiedAgentId,
message: "--conversation cannot be used with --agent",
},
{
when: specifiedAgentName,
message: "--conversation cannot be used with --name",
},
{
when: forceNew,
message: "--conversation cannot be used with --new-agent",
},
{
when: fromAfFile,
message: "--conversation cannot be used with --import",
},
{
when: shouldContinue,
message: "--conversation cannot be used with --continue",
},
],
});
validateFlagConflicts({
guard: forceNewConversation,
checks: [
{
when: shouldContinue,
message: "--new cannot be used with --continue",
},
{
when: specifiedConversationId,
message: "--new cannot be used with --conversation",
},
],
});
} catch (error) {
console.error(
error instanceof Error ? `Error: ${error.message}` : String(error),
);
process.exit(1);
}
// Validate --import flag (also accepts legacy --from-af)
// Detect if it's a registry handle (e.g., @author/name) or a local file path
let isRegistryImport = false;
if (fromAfFile) {
try {
validateFlagConflicts({
guard: fromAfFile,
checks: [
{
when: specifiedAgentId,
message: "--import cannot be used with --agent",
},
{
when: specifiedAgentName,
message: "--import cannot be used with --name",
},
{
when: shouldContinue,
message: "--import cannot be used with --continue",
},
{
when: forceNew,
message: "--import cannot be used with --new-agent",
},
],
});
} catch (error) {
console.error(
error instanceof Error ? `Error: ${error.message}` : String(error),
);
process.exit(1);
}
// Check if this looks like a registry handle (@author/name)
if (fromAfFile.startsWith("@")) {
// Definitely a registry handle
isRegistryImport = true;
// Validate handle format
try {
validateRegistryHandleOrThrow(fromAfFile);
} catch {
console.error(
`Error: Invalid registry handle "${fromAfFile}". Use format: letta --import @author/agentname`,
);
process.exit(1);
}
}
}
// Validate --name flag
if (specifiedAgentName) {
if (specifiedAgentId) {
console.error("Error: --name cannot be used with --agent");
process.exit(1);
}
if (forceNew) {
console.error("Error: --name cannot be used with --new-agent");
process.exit(1);
}
}
if (initBlocksRaw && !forceNew) {
console.error(
"Error: --init-blocks can only be used together with --new to control initial memory blocks.",
);
process.exit(1);
}
const initBlocks = parseCsvListFlag(initBlocksRaw);
if (baseToolsRaw && !forceNew) {
console.error(
"Error: --base-tools can only be used together with --new to control initial base tools.",
);
process.exit(1);
}
const baseTools = parseCsvListFlag(baseToolsRaw);
// Validate system prompt options (--system and --system-custom are mutually exclusive)
if (systemPromptPreset && systemCustom) {
console.error(
"Error: --system and --system-custom are mutually exclusive. Use one or the other.",
);
process.exit(1);
}
// Parse memory blocks JSON if provided
// Supports two formats:
// - CreateBlock: { label: string, value: string, description?: string }
// - BlockReference: { blockId: string }
let memoryBlocks:
| Array<
| { label: string; value: string; description?: string }
| { blockId: string }
>
| undefined;
if (memoryBlocksJson !== undefined) {
if (!forceNew) {
console.error(
"Error: --memory-blocks can only be used together with --new to provide initial memory blocks.",
);
process.exit(1);
}
try {
memoryBlocks = parseJsonArrayFlag(memoryBlocksJson, "memory-blocks") as
| Array<{ label: string; value: string; description?: string }>
| Array<{ blockId: string }>;
// Validate each block has required fields
for (const block of memoryBlocks) {
const hasBlockId =
"blockId" in block && typeof block.blockId === "string";
const hasLabelValue =
"label" in block &&
"value" in block &&
typeof block.label === "string" &&
typeof block.value === "string";
if (!hasBlockId && !hasLabelValue) {
throw new Error(
"Each memory block must have either 'blockId' (string) or 'label' and 'value' (strings)",
);
}
}
} catch (error) {
console.error(
`Error: ${error instanceof Error ? error.message : String(error)}`,
);
process.exit(1);
}
}
// Parse --block-value args (format: label=value)
let blockValues: Record<string, string> | undefined;
if (blockValueArgs && blockValueArgs.length > 0) {
if (!forceNew) {
console.error(
"Error: --block-value can only be used together with --new to set block values.",
);
process.exit(1);
}
blockValues = {};
for (const arg of blockValueArgs) {
const eqIndex = arg.indexOf("=");
if (eqIndex === -1) {
console.error(
`Error: Invalid --block-value format "${arg}". Expected format: label=value`,
);
process.exit(1);
}
const label = arg.slice(0, eqIndex);
const value = arg.slice(eqIndex + 1);
blockValues[label] = value;
}
}
// Priority 0: --conversation derives agent from conversation ID.
// "default" is a virtual agent-scoped conversation (not a retrievable conv-*).
// It requires --agent and should not hit conversations.retrieve().
if (specifiedConversationId && specifiedConversationId !== "default") {
try {
const conversation = await client.conversations.retrieve(
specifiedConversationId,
);
agent = await client.agents.retrieve(conversation.agent_id);
} catch (_error) {
console.error(`Conversation ${specifiedConversationId} not found`);
process.exit(1);
}
}
// Priority 1: Import from AgentFile template (local file or registry)
if (!agent && fromAfFile) {
let result: { agent: AgentState; skills?: string[] };
if (isRegistryImport) {
// Import from letta-ai/agent-file registry
const { importAgentFromRegistry } = await import("./agent/import");
result = await importAgentFromRegistry({
handle: fromAfFile,
modelOverride: model,
stripMessages: true,
stripSkills: false,
});
} else {
// Import from local file
const { importAgentFromFile } = await import("./agent/import");
result = await importAgentFromFile({
filePath: fromAfFile,
modelOverride: model,
stripMessages: true,
stripSkills: false,
});
}
agent = result.agent;
// Display extracted skills summary
if (result.skills && result.skills.length > 0) {
const { getAgentSkillsDir } = await import("./agent/skills");
const skillsDir = getAgentSkillsDir(agent.id);
console.log(
`📦 Extracted ${result.skills.length} skill${result.skills.length === 1 ? "" : "s"} to ${skillsDir}: ${result.skills.join(", ")}`,
);
}
}
// Priority 2: Try to use --agent specified ID
if (!agent && specifiedAgentId) {
try {
agent = await client.agents.retrieve(specifiedAgentId);
} catch (_error) {
console.error(`Agent ${specifiedAgentId} not found`);
process.exit(1);
}
}
// Priority 3: Check if --new flag was passed (skip all resume logic)
if (!agent && forceNew) {
const updateArgs = getModelUpdateArgs(model);
const createOptions = {
model,
embeddingModel,
updateArgs,
skillsDirectory,
parallelToolCalls: true,
systemPromptPreset,
systemPromptCustom: systemCustom,
systemPromptAppend: systemAppend,
memoryPromptMode: requestedMemoryPromptMode,
initBlocks,
baseTools,
memoryBlocks,
blockValues,
tags,
};
const result = await createAgent(createOptions);
agent = result.agent;
// Enable memfs by default on Letta Cloud for new agents when no explicit memfs flags are provided.
if (shouldAutoEnableMemfsForNewAgent) {
const { enableMemfsIfCloud } = await import("./agent/memoryFilesystem");
await enableMemfsIfCloud(agent.id);
}
}
// Priority 4: Try to resume from project settings (.letta/settings.local.json)
if (!agent) {
await settingsManager.loadLocalProjectSettings();
const localAgentId = settingsManager.getLocalLastAgentId(process.cwd());
if (localAgentId) {
try {
agent = await client.agents.retrieve(localAgentId);
} catch (_error) {
// Local LRU agent doesn't exist - log and continue
console.error(`Unable to locate agent ${localAgentId} in .letta/`);
}
}
}
// Priority 5: Try to reuse global LRU (covers directory-switching case)
// Do NOT restore global conversation — use default (project-scoped conversations)
if (!agent) {
const globalAgentId = settingsManager.getGlobalLastAgentId();
if (globalAgentId) {
try {
agent = await client.agents.retrieve(globalAgentId);
} catch (_error) {
// Global LRU agent doesn't exist
}
}
}
// Priority 6: --continue with no agent found → error
if (!agent && shouldContinue) {
console.error("No recent session found in .letta/ or ~/.letta.");
console.error("Run 'letta' to get started.");
process.exit(1);
}
// Priority 7: Fresh user with no LRU - create default agent
if (!agent) {
const { ensureDefaultAgents } = await import("./agent/defaults");
const defaultAgent = await ensureDefaultAgents(client);
if (defaultAgent) {
agent = defaultAgent;
}
}
// All paths should have resolved to an agent by now
if (!agent) {
console.error("No agent found. Use --new-agent to create a new agent.");
process.exit(1);
}
markMilestone("HEADLESS_AGENT_RESOLVED");
// Check if we're resuming an existing agent (not creating a new one)
const isResumingAgent = !!(
specifiedAgentId ||
shouldContinue ||
(!forceNew && !fromAfFile)
);
// If resuming, always refresh model settings from presets to keep
// preset-derived fields in sync, then apply optional command-line
// overrides (model/system prompt).
if (isResumingAgent) {
if (model) {
const modelHandle = resolveModel(model);
if (typeof modelHandle !== "string") {
console.error(`Error: Invalid model "${model}"`);
process.exit(1);
}
// Always apply model update - different model IDs can share the same
// handle but have different settings (e.g., gpt-5.2-medium vs gpt-5.2-xhigh)
const updateArgs = getModelUpdateArgs(model);
agent = await updateAgentLLMConfig(agent.id, modelHandle, updateArgs);
} else {
const presetRefresh = getModelPresetUpdateForAgent(agent);
if (presetRefresh) {
const { updateArgs: resumeRefreshUpdateArgs, needsUpdate } =
getResumeRefreshArgs(presetRefresh.updateArgs, agent);
if (needsUpdate) {
agent = await updateAgentLLMConfig(
agent.id,
presetRefresh.modelHandle,
resumeRefreshUpdateArgs,
);
}
}
}
if (systemPromptPreset) {
const result = await updateAgentSystemPrompt(
agent.id,
systemPromptPreset,
);
if (!result.success || !result.agent) {
console.error(`Failed to update system prompt: ${result.message}`);
process.exit(1);
}
agent = result.agent;
}
}
// Determine which conversation to use
let conversationId: string;
let effectiveReflectionSettings: ReflectionSettings;
const isSubagent = process.env.LETTA_CODE_AGENT_ROLE === "subagent";
// Apply memfs flags and auto-enable from server tag when local settings are missing.
// Respects memfsStartupPolicy:
// "blocking" (default) await the pull; exit on conflict.
// "background" fire pull async; session init proceeds immediately.
// "skip" skip the pull this session.
if (memfsStartupPolicy === "skip") {
// Run enable/disable logic but skip the git pull.
try {
const { applyMemfsFlags } = await import("./agent/memoryFilesystem");
await applyMemfsFlags(agent.id, memfsFlag, noMemfsFlag, {
pullOnExistingRepo: false,
agentTags: agent.tags,
});
} catch (error) {
console.error(
`Memory flags failed: ${error instanceof Error ? error.message : String(error)}`,
);
process.exit(1);
}
} else if (memfsStartupPolicy === "background") {
// Fire pull async; don't block session initialisation.
const { applyMemfsFlags } = await import("./agent/memoryFilesystem");
applyMemfsFlags(agent.id, memfsFlag, noMemfsFlag, {
pullOnExistingRepo: true,
agentTags: agent.tags,
}).catch((error) => {
// Log to stderr only — the session is already live.
console.error(
`[memfs background pull] ${error instanceof Error ? error.message : String(error)}`,
);
});
} else {
// "blocking" — original behaviour.
try {
const { applyMemfsFlags } = await import("./agent/memoryFilesystem");
const memfsResult = await applyMemfsFlags(
agent.id,
memfsFlag,
noMemfsFlag,
{ pullOnExistingRepo: true, agentTags: agent.tags },
);
if (memfsResult.pullSummary?.includes("CONFLICT")) {
console.error(
"Memory has merge conflicts. Run in interactive mode to resolve.",
);
process.exit(1);
}
} catch (error) {
console.error(
`Memory git sync failed: ${error instanceof Error ? error.message : String(error)}`,
);
process.exit(1);
}
}
try {
effectiveReflectionSettings = await applyReflectionOverrides(
agent.id,
reflectionOverrides,
);
} catch (error) {
console.error(
`Failed to apply sleeptime settings: ${error instanceof Error ? error.message : String(error)}`,
);
process.exit(1);
}
// Determine which blocks to isolate for the conversation
const isolatedBlockLabels: string[] =
initBlocks === undefined
? [...ISOLATED_BLOCK_LABELS]
: ISOLATED_BLOCK_LABELS.filter((label) =>
initBlocks.includes(label as string),
);
if (specifiedConversationId) {
if (specifiedConversationId === "default") {
// "default" is the agent's primary message history (no explicit conversation)
// Don't validate - just use it directly
conversationId = "default";
} else {
// User specified an explicit conversation to resume - validate it exists
try {
await client.conversations.retrieve(specifiedConversationId);
conversationId = specifiedConversationId;
} catch {
console.error(
`Error: Conversation ${specifiedConversationId} not found`,
);
process.exit(1);
}
}
} else if (shouldContinue) {
// Try to resume the last conversation for this agent
await settingsManager.loadLocalProjectSettings();
const lastSession =
settingsManager.getLocalLastSession(process.cwd()) ??
settingsManager.getGlobalLastSession();
if (lastSession && lastSession.agentId === agent.id) {
if (lastSession.conversationId === "default") {
// "default" is always valid - just use it directly
conversationId = "default";
} else {
// Verify the conversation still exists
try {
await client.conversations.retrieve(lastSession.conversationId);
conversationId = lastSession.conversationId;
} catch {
// Conversation no longer exists - error with helpful message
console.error(
`Attempting to resume conversation ${lastSession.conversationId}, but conversation was not found.`,
);
console.error(
"Resume the default conversation with 'letta -p ...', view recent conversations with 'letta --resume', or start a new conversation with 'letta -p ... --new'.",
);
process.exit(1);
}
}
} else {
// No matching session - error with helpful message
console.error("No previous session found for this agent to resume.");
console.error(
"Resume the default conversation with 'letta -p ...', or start a new conversation with 'letta -p ... --new'.",
);
process.exit(1);
}
} else if (forceNewConversation) {
// --new flag: create a new conversation (for concurrent sessions)
const conversation = await client.conversations.create({
agent_id: agent.id,
isolated_block_labels: isolatedBlockLabels,
});
conversationId = conversation.id;
} else if (isSubagent) {
// Freshly created subagents have no concurrency risk — use the default
// conversation so it's easy to inspect in the ADE.
conversationId = "default";
} else {
// Default for headless: always create a new conversation to avoid
// 409 "conversation busy" races (e.g., parent agent calling letta -p).
// Use --conv default to explicitly target the agent's
// primary conversation.
const conversation = await client.conversations.create({
agent_id: agent.id,
isolated_block_labels: isolatedBlockLabels,
});
conversationId = conversation.id;
}
markMilestone("HEADLESS_CONVERSATION_READY");
// Set conversation ID in context for tools (e.g., Skill tool) to access
setConversationId(conversationId);
// Save session (agent + conversation) to both project and global settings
// Skip for subagents - they shouldn't pollute the LRU settings
if (!isSubagent) {
await settingsManager.loadLocalProjectSettings();
settingsManager.setLocalLastSession(
{ agentId: agent.id, conversationId },
process.cwd(),
);
settingsManager.setGlobalLastSession({
agentId: agent.id,
conversationId,
});
}
// Set agent context for tools that need it (e.g., Skill tool, Task tool)
setAgentContext(agent.id, skillsDirectory, resolvedSkillSources);
// Validate output format
const outputFormat = values["output-format"] || "text";
const includePartialMessages = Boolean(values["include-partial-messages"]);
if (!["text", "json", "stream-json"].includes(outputFormat)) {
console.error(
`Error: Invalid output format "${outputFormat}". Valid formats: text, json, stream-json`,
);
process.exit(1);
}
if (inputFormat && inputFormat !== "stream-json") {
console.error(
`Error: Invalid input format "${inputFormat}". Valid formats: stream-json`,
);
process.exit(1);
}
const { getClientToolsFromRegistry } = await import("./tools/manager");
const loadedToolNames = getClientToolsFromRegistry().map((t) => t.name);
const availableTools =
loadedToolNames.length > 0
? loadedToolNames
: agent.tools?.map((t) => t.name).filter((n): n is string => !!n) || [];
// If input-format is stream-json, use bidirectional mode
if (isBidirectionalMode) {
await runBidirectionalMode(
agent,
conversationId,
client,
outputFormat,
includePartialMessages,
availableTools,
resolvedSkillSources,
systemInfoReminderEnabled,
effectiveReflectionSettings,
);
return;
}
// Create buffers to accumulate stream (pass agent.id for server-side tool hooks)
const buffers = createBuffers(agent.id);
// Initialize session stats
const sessionStats = new SessionStats();
// Use agent.id as session_id for all stream-json messages
const sessionId = agent.id;
// Output init event for stream-json format
if (outputFormat === "stream-json") {
const initEvent: SystemInitMessage = {
type: "system",
subtype: "init",
session_id: sessionId,
agent_id: agent.id,
conversation_id: conversationId,
model: agent.llm_config?.model ?? "",
tools: availableTools,
cwd: process.cwd(),
mcp_servers: [],
permission_mode: "",
slash_commands: [],
memfs_enabled: settingsManager.isMemfsEnabled(agent.id),
skill_sources: resolvedSkillSources,
system_info_reminder_enabled: systemInfoReminderEnabled,
reflection_trigger: effectiveReflectionSettings.trigger,
reflection_behavior: effectiveReflectionSettings.behavior,
reflection_step_count: effectiveReflectionSettings.stepCount,
uuid: `init-${agent.id}`,
};
console.log(JSON.stringify(initEvent));
}
const reminderContextTracker = createContextTracker();
const sharedReminderState = createSharedReminderState();
// Helper to resolve any pending approvals before sending user input
const resolveAllPendingApprovals = async () => {
const { getResumeData } = await import("./agent/check-approval");
while (true) {
// Re-fetch agent to get latest in-context messages (source of truth for backend)
const freshAgent = await client.agents.retrieve(agent.id);
let resume: Awaited<ReturnType<typeof getResumeData>>;
try {
resume = await getResumeData(client, freshAgent, conversationId);
} catch (error) {
// Treat 404/422 as "no approvals" - stale message/conversation state
if (
error instanceof APIError &&
(error.status === 404 || error.status === 422)
) {
break;
}
throw error;
}
// Use plural field for parallel tool calls
const pendingApprovals = resume.pendingApprovals || [];
if (pendingApprovals.length === 0) break;
// Phase 1: Collect decisions for all approvals
type Decision =
| {
type: "approve";
approval: {
toolCallId: string;
toolName: string;
toolArgs: string;
};
reason: string;
matchedRule: string;
}
| {
type: "deny";
approval: {
toolCallId: string;
toolName: string;
toolArgs: string;
};
reason: string;
};
const { autoAllowed, autoDenied } = await classifyApprovals(
pendingApprovals,
{
alwaysRequiresUserInput: isInteractiveApprovalTool,
treatAskAsDeny: true,
denyReasonForAsk: "Tool requires approval (headless mode)",
requireArgsForAutoApprove: true,
missingNameReason: "Tool call incomplete - missing name",
},
);
const decisions: Decision[] = [
...autoAllowed.map((ac) => ({
type: "approve" as const,
approval: ac.approval,
reason: ac.permission.reason || "Allowed by permission rule",
matchedRule:
"matchedRule" in ac.permission && ac.permission.matchedRule
? ac.permission.matchedRule
: "auto-approved",
})),
...autoDenied.map((ac) => {
const fallback =
"matchedRule" in ac.permission && ac.permission.matchedRule
? `Permission denied: ${ac.permission.matchedRule}`
: ac.permission.reason
? `Permission denied: ${ac.permission.reason}`
: "Permission denied: Unknown reason";
return {
type: "deny" as const,
approval: ac.approval,
reason: ac.denyReason ?? fallback,
};
}),
];
// Phase 2: Execute approved tools and format results using shared function
const { executeApprovalBatch } = await import(
"./agent/approval-execution"
);
// Emit auto_approval events for stream-json format
if (outputFormat === "stream-json") {
for (const decision of decisions) {
if (decision.type === "approve") {
const autoApprovalMsg: AutoApprovalMessage = {
type: "auto_approval",
tool_call: {
name: decision.approval.toolName,
tool_call_id: decision.approval.toolCallId,
arguments: decision.approval.toolArgs,
},
reason: decision.reason,
matched_rule: decision.matchedRule,
session_id: sessionId,
uuid: `auto-approval-${decision.approval.toolCallId}`,
};
console.log(JSON.stringify(autoApprovalMsg));
}
}
}
const executedResults = await executeApprovalBatch(decisions);
// Send all results in one batch
const approvalInput: ApprovalCreate = {
type: "approval",
approvals: executedResults as ApprovalResult[],
};
// Inject queued skill content as user message parts (LET-7353)
const approvalMessages: Array<
| import("@letta-ai/letta-client/resources/agents/agents").MessageCreate
| import("@letta-ai/letta-client/resources/agents/messages").ApprovalCreate
> = [approvalInput];
{
const { consumeQueuedSkillContent } = await import(
"./tools/impl/skillContentRegistry"
);
const skillContents = consumeQueuedSkillContent();
if (skillContents.length > 0) {
approvalMessages.push({
role: "user" as const,
content: skillContents.map((sc) => ({
type: "text" as const,
text: sc.content,
})),
});
}
}
// Send the approval to clear the pending state; drain the stream without output
const approvalStream = await sendMessageStream(
conversationId,
approvalMessages,
{ agentId: agent.id },
);
const drainResult = await drainStreamWithResume(
approvalStream,
createBuffers(agent.id),
() => {},
undefined,
undefined,
undefined,
reminderContextTracker,
);
// If the approval drain errored or was cancelled, abort rather than
// looping back and re-fetching approvals (which would restart the cycle).
if (
drainResult.stopReason === "error" ||
drainResult.stopReason === "cancelled"
) {
throw new Error(
`Approval drain ended with stop reason: ${drainResult.stopReason}`,
);
}
}
};
// Clear any pending approvals before starting a new turn - ONLY when resuming (LET-7101)
// For new agents/conversations, lazy recovery handles any edge cases
if (isResumingAgent) {
await resolveAllPendingApprovals();
}
// Build message content with reminders
const contentParts: MessageCreate["content"] = [];
const pushPart = (text: string) => {
if (!text) return;
contentParts.push({ type: "text", text });
};
if (fromAgentId) {
const senderAgentId = fromAgentId;
const senderAgent = await client.agents.retrieve(senderAgentId);
const systemReminder = `${SYSTEM_REMINDER_OPEN}
This message is from "${senderAgent.name}" (agent ID: ${senderAgentId}), an agent currently running inside the Letta Code CLI (docs.letta.com/letta-code).
The sender will only see the final message you generate (not tool calls or reasoning).
If you need to share detailed information, include it in your response text.
${SYSTEM_REMINDER_CLOSE}
`;
pushPart(systemReminder);
}
syncReminderStateFromContextTracker(
sharedReminderState,
reminderContextTracker,
);
const lastRunAt = (agent as { last_run_completion?: string })
.last_run_completion;
const { parts: sharedReminderParts } = await buildSharedReminderParts({
mode: isSubagent ? "subagent" : "headless-one-shot",
agent: {
id: agent.id,
name: agent.name,
description: agent.description,
lastRunAt: lastRunAt ?? null,
},
state: sharedReminderState,
sessionContextReminderEnabled: systemInfoReminderEnabled,
reflectionSettings: effectiveReflectionSettings,
skillSources: resolvedSkillSources,
resolvePlanModeReminder: async () => {
const { PLAN_MODE_REMINDER } = await import("./agent/promptAssets");
return PLAN_MODE_REMINDER;
},
});
for (const part of sharedReminderParts) {
pushPart(part.text);
}
// Pre-load specific skills' full content (used by subagents with skills: field)
if (preLoadSkillsRaw) {
const { readFile: readFileAsync } = await import("node:fs/promises");
const skillIds = preLoadSkillsRaw
.split(",")
.map((s) => s.trim())
.filter(Boolean);
const loadedContents: string[] = [];
for (const skillId of skillIds) {
const skillPath = sharedReminderState.skillPathById[skillId];
if (!skillPath) continue;
try {
const content = await readFileAsync(skillPath, "utf-8");
loadedContents.push(`<${skillId}>\n${content}\n</${skillId}>`);
} catch {
// Skill file not readable, skip
}
}
if (loadedContents.length > 0) {
pushPart(
`<loaded_skills>\n${loadedContents.join("\n\n")}\n</loaded_skills>`,
);
}
}
// Add user prompt
pushPart(prompt);
// Start with the user message
let currentInput: Array<MessageCreate | ApprovalCreate> = [
{
role: "user",
content: contentParts,
},
];
// Track lastRunId outside the while loop so it's available in catch block
let lastKnownRunId: string | null = null;
let llmApiErrorRetries = 0;
let emptyResponseRetries = 0;
let conversationBusyRetries = 0;
markMilestone("HEADLESS_FIRST_STREAM_START");
measureSinceMilestone("headless-setup-total", "HEADLESS_CLIENT_READY");
// Helper to check max turns limit using server-side step count from buffers
const checkMaxTurns = () => {
if (maxTurns !== undefined && buffers.usage.stepCount >= maxTurns) {
if (outputFormat === "stream-json") {
const errorMsg: ErrorMessage = {
type: "error",
message: `Maximum turns limit reached (${buffers.usage.stepCount}/${maxTurns} steps)`,
stop_reason: "max_steps",
session_id: sessionId,
uuid: `error-max-turns-${randomUUID()}`,
};
console.log(JSON.stringify(errorMsg));
} else {
console.error(
`Maximum turns limit reached (${buffers.usage.stepCount}/${maxTurns} steps)`,
);
}
process.exit(1);
}
};
try {
while (true) {
// Check max turns limit before starting a new turn (uses server-side step count)
checkMaxTurns();
// Inject queued skill content as user message parts (LET-7353)
{
const { consumeQueuedSkillContent } = await import(
"./tools/impl/skillContentRegistry"
);
const skillContents = consumeQueuedSkillContent();
if (skillContents.length > 0) {
currentInput = [
...currentInput,
{
role: "user" as const,
content: skillContents.map((sc) => ({
type: "text" as const,
text: sc.content,
})),
},
];
}
}
// Wrap sendMessageStream in try-catch to handle pre-stream errors (e.g., 409)
let stream: Awaited<ReturnType<typeof sendMessageStream>>;
let turnToolContextId: string | null = null;
try {
stream = await sendMessageStream(conversationId, currentInput, {
agentId: agent.id,
});
turnToolContextId = getStreamToolContextId(stream);
} catch (preStreamError) {
// Extract error detail using shared helper (handles nested/direct/message shapes)
const errorDetail = extractConflictDetail(preStreamError);
const preStreamAction = getPreStreamErrorAction(
errorDetail,
conversationBusyRetries,
CONVERSATION_BUSY_MAX_RETRIES,
{
status:
preStreamError instanceof APIError
? preStreamError.status
: undefined,
transientRetries: llmApiErrorRetries,
maxTransientRetries: LLM_API_ERROR_MAX_RETRIES,
},
);
// Check for pending approval blocking new messages - resolve and retry.
// This is distinct from "conversation busy" and needs approval resolution,
// not just a timed delay.
if (preStreamAction === "resolve_approval_pending") {
if (outputFormat === "stream-json") {
const recoveryMsg: RecoveryMessage = {
type: "recovery",
recovery_type: "approval_pending",
message:
"Detected pending approval conflict on send; resolving before retry",
session_id: sessionId,
uuid: `recovery-pre-stream-${randomUUID()}`,
};
console.log(JSON.stringify(recoveryMsg));
} else {
console.error(
"Pending approval detected, resolving before retry...",
);
}
await resolveAllPendingApprovals();
continue;
}
// Check for 409 "conversation busy" error - retry once with delay
if (preStreamAction === "retry_conversation_busy") {
conversationBusyRetries += 1;
// Emit retry message for stream-json mode
if (outputFormat === "stream-json") {
const retryMsg: RetryMessage = {
type: "retry",
reason: "error", // 409 conversation busy is a pre-stream error
attempt: conversationBusyRetries,
max_attempts: CONVERSATION_BUSY_MAX_RETRIES,
delay_ms: CONVERSATION_BUSY_RETRY_DELAY_MS,
session_id: sessionId,
uuid: `retry-conversation-busy-${randomUUID()}`,
};
console.log(JSON.stringify(retryMsg));
} else {
console.error(
`Conversation is busy, waiting ${CONVERSATION_BUSY_RETRY_DELAY_MS / 1000}s and retrying...`,
);
}
// Wait before retry
await new Promise((resolve) =>
setTimeout(resolve, CONVERSATION_BUSY_RETRY_DELAY_MS),
);
continue;
}
if (preStreamAction === "retry_transient") {
const attempt = llmApiErrorRetries + 1;
const retryAfterMs =
preStreamError instanceof APIError
? parseRetryAfterHeaderMs(
preStreamError.headers?.get("retry-after"),
)
: null;
const delayMs = retryAfterMs ?? 1000 * 2 ** (attempt - 1);
llmApiErrorRetries = attempt;
if (outputFormat === "stream-json") {
const retryMsg: RetryMessage = {
type: "retry",
reason: "llm_api_error",
attempt,
max_attempts: LLM_API_ERROR_MAX_RETRIES,
delay_ms: delayMs,
session_id: sessionId,
uuid: `retry-pre-stream-${randomUUID()}`,
};
console.log(JSON.stringify(retryMsg));
} else {
const delaySeconds = Math.round(delayMs / 1000);
console.error(
`Transient API error before streaming (attempt ${attempt} of ${LLM_API_ERROR_MAX_RETRIES}), retrying in ${delaySeconds}s...`,
);
}
await new Promise((resolve) => setTimeout(resolve, delayMs));
conversationBusyRetries = 0;
continue;
}
// Reset conversation busy retry counter on other errors
conversationBusyRetries = 0;
// Re-throw to outer catch for other errors
throw preStreamError;
}
// For stream-json, output each chunk as it arrives
let stopReason: StopReasonType | null = null;
let approvals: Array<{
toolCallId: string;
toolName: string;
toolArgs: string;
}> = [];
let apiDurationMs: number;
let lastRunId: string | null = null;
let approvalPendingRecovery = false;
if (outputFormat === "stream-json") {
// Track approval requests across streamed chunks
const autoApprovalEmitted = new Set<string>();
const streamJsonHook: DrainStreamHook = async ({
chunk,
shouldOutput,
errorInfo,
updatedApproval,
}) => {
let shouldOutputChunk = shouldOutput;
if (errorInfo && shouldOutput) {
const errorEvent: ErrorMessage = {
type: "error",
message: errorInfo.message,
stop_reason: "error",
run_id: errorInfo.run_id,
session_id: sessionId,
uuid: randomUUID(),
...(errorInfo.error_type &&
errorInfo.run_id && {
api_error: {
message_type: "error_message",
message: errorInfo.message,
error_type: errorInfo.error_type,
detail: errorInfo.detail,
run_id: errorInfo.run_id,
},
}),
};
console.log(JSON.stringify(errorEvent));
shouldOutputChunk = false;
}
// Detect server conflict due to pending approval; handle it and retry
// Check both detail and message fields since error formats vary
if (
isApprovalPendingError(errorInfo?.detail) ||
isApprovalPendingError(errorInfo?.message)
) {
const recoveryRunId = errorInfo?.run_id;
const recoveryMsg: RecoveryMessage = {
type: "recovery",
recovery_type: "approval_pending",
message:
"Detected pending approval conflict; auto-denying stale approval and retrying",
run_id: recoveryRunId ?? undefined,
session_id: sessionId,
uuid: `recovery-${recoveryRunId || randomUUID()}`,
};
console.log(JSON.stringify(recoveryMsg));
approvalPendingRecovery = true;
return { stopReason: "error", shouldAccumulate: true };
}
// Check if this approval will be auto-approved. Dedup per tool_call_id
if (
updatedApproval &&
!autoApprovalEmitted.has(updatedApproval.toolCallId)
) {
const { autoAllowed } = await classifyApprovals([updatedApproval], {
alwaysRequiresUserInput: isInteractiveApprovalTool,
requireArgsForAutoApprove: true,
missingNameReason: "Tool call incomplete - missing name",
});
const [approval] = autoAllowed;
if (approval) {
const permission = approval.permission;
shouldOutputChunk = false;
const autoApprovalMsg: AutoApprovalMessage = {
type: "auto_approval",
tool_call: {
name: approval.approval.toolName,
tool_call_id: approval.approval.toolCallId,
arguments: approval.approval.toolArgs || "{}",
},
reason: permission.reason || "Allowed by permission rule",
matched_rule:
"matchedRule" in permission && permission.matchedRule
? permission.matchedRule
: "auto-approved",
session_id: sessionId,
uuid: `auto-approval-${approval.approval.toolCallId}`,
};
console.log(JSON.stringify(autoApprovalMsg));
autoApprovalEmitted.add(approval.approval.toolCallId);
}
}
if (shouldOutputChunk) {
const chunkWithIds = chunk as typeof chunk & {
otid?: string;
id?: string;
};
const uuid = chunkWithIds.otid || chunkWithIds.id;
if (includePartialMessages) {
const streamEvent: StreamEvent = {
type: "stream_event",
event: chunk,
session_id: sessionId,
uuid: uuid || randomUUID(),
};
console.log(JSON.stringify(streamEvent));
} else {
const msg: MessageWire = {
type: "message",
...chunk,
session_id: sessionId,
uuid: uuid || randomUUID(),
};
console.log(JSON.stringify(msg));
}
}
return { shouldOutput: shouldOutputChunk, shouldAccumulate: true };
};
const result = await drainStreamWithResume(
stream,
buffers,
() => {},
undefined,
undefined,
streamJsonHook,
reminderContextTracker,
);
stopReason = result.stopReason;
approvals = result.approvals || [];
apiDurationMs = result.apiDurationMs;
lastRunId = result.lastRunId || null;
if (lastRunId) lastKnownRunId = lastRunId;
} else {
// Normal mode: use drainStreamWithResume
const result = await drainStreamWithResume(
stream,
buffers,
() => {}, // No UI refresh needed in headless mode
undefined,
undefined,
undefined,
reminderContextTracker,
);
stopReason = result.stopReason;
approvals = result.approvals || [];
apiDurationMs = result.apiDurationMs;
lastRunId = result.lastRunId || null;
if (lastRunId) lastKnownRunId = lastRunId;
}
// Track API duration for this stream
sessionStats.endTurn(apiDurationMs);
// Check max turns after each turn (server may have taken multiple steps)
checkMaxTurns();
if (approvalPendingRecovery) {
await resolveAllPendingApprovals();
continue;
}
// Case 1: Turn ended normally
if (stopReason === "end_turn") {
// Reset retry counters on success
llmApiErrorRetries = 0;
emptyResponseRetries = 0;
conversationBusyRetries = 0;
break;
}
// Case 2: Requires approval - batch process all approvals
if (stopReason === "requires_approval") {
if (approvals.length === 0) {
console.error("Unexpected empty approvals array");
process.exit(1);
}
// Phase 1: Collect decisions for all approvals
type Decision =
| {
type: "approve";
approval: {
toolCallId: string;
toolName: string;
toolArgs: string;
};
}
| {
type: "deny";
approval: {
toolCallId: string;
toolName: string;
toolArgs: string;
};
reason: string;
};
const { autoAllowed, autoDenied, needsUserInput } =
await classifyApprovals(approvals, {
alwaysRequiresUserInput: isInteractiveApprovalTool,
requireArgsForAutoApprove: true,
missingNameReason: "Tool call incomplete - missing name",
});
const decisions: Decision[] = [
...autoAllowed.map((ac) => ({
type: "approve" as const,
approval: ac.approval,
})),
...needsUserInput.map((ac) => {
// One-shot headless mode has no control channel for interactive
// approvals. Match Claude behavior by auto-allowing EnterPlanMode
// while denying tools that need runtime user responses.
if (isHeadlessAutoAllowTool(ac.approval.toolName)) {
return {
type: "approve" as const,
approval: ac.approval,
};
}
return {
type: "deny" as const,
approval: ac.approval,
reason: "Tool requires approval (headless mode)",
};
}),
...autoDenied.map((ac) => {
const fallback =
"matchedRule" in ac.permission && ac.permission.matchedRule
? `Permission denied: ${ac.permission.matchedRule}`
: ac.permission.reason
? `Permission denied: ${ac.permission.reason}`
: "Permission denied: Unknown reason";
return {
type: "deny" as const,
approval: ac.approval,
reason: ac.denyReason ?? fallback,
};
}),
];
// Phase 2: Execute all approved tools and format results using shared function
const { executeApprovalBatch } = await import(
"./agent/approval-execution"
);
const executedResults = await executeApprovalBatch(
decisions,
undefined,
{
toolContextId: turnToolContextId ?? undefined,
},
);
// Send all results in one batch
currentInput = [
{
type: "approval",
approvals: executedResults as ApprovalResult[],
},
];
continue;
}
// Cache latest error text for this turn
let latestErrorText: string | null = null;
const linesForTurn = toLines(buffers);
for (let i = linesForTurn.length - 1; i >= 0; i -= 1) {
const line = linesForTurn[i];
if (
line?.kind === "error" &&
"text" in line &&
typeof line.text === "string"
) {
latestErrorText = line.text;
break;
}
}
// Fetch run error detail for invalid tool call ID detection
const detailFromRun = await fetchRunErrorDetail(lastRunId);
// Case 3: Transient LLM API error - retry with exponential backoff up to a limit
if (stopReason === "llm_api_error") {
if (llmApiErrorRetries < LLM_API_ERROR_MAX_RETRIES) {
const attempt = llmApiErrorRetries + 1;
const baseDelayMs = 1000;
const delayMs = baseDelayMs * 2 ** (attempt - 1);
llmApiErrorRetries = attempt;
if (outputFormat === "stream-json") {
const retryMsg: RetryMessage = {
type: "retry",
reason: "llm_api_error",
attempt,
max_attempts: LLM_API_ERROR_MAX_RETRIES,
delay_ms: delayMs,
run_id: lastRunId ?? undefined,
session_id: sessionId,
uuid: `retry-${lastRunId || randomUUID()}`,
};
console.log(JSON.stringify(retryMsg));
} else {
const delaySeconds = Math.round(delayMs / 1000);
console.error(
`LLM API error encountered (attempt ${attempt} of ${LLM_API_ERROR_MAX_RETRIES}), retrying in ${delaySeconds}s...`,
);
}
// Exponential backoff before retrying the same input
await new Promise((resolve) => setTimeout(resolve, delayMs));
continue;
}
}
// "Invalid tool call IDs" means server HAS pending approvals but with different IDs.
// Fetch the actual pending approvals and process them before retrying.
const invalidIdsDetected =
isInvalidToolCallIdsError(detailFromRun) ||
isInvalidToolCallIdsError(latestErrorText);
if (invalidIdsDetected) {
if (outputFormat === "stream-json") {
const recoveryMsg: RecoveryMessage = {
type: "recovery",
recovery_type: "invalid_tool_call_ids",
message:
"Tool call ID mismatch; fetching actual pending approvals and resyncing",
run_id: lastRunId ?? undefined,
session_id: sessionId,
uuid: `recovery-${lastRunId || randomUUID()}`,
};
console.log(JSON.stringify(recoveryMsg));
} else {
console.error(
"Tool call ID mismatch; fetching actual pending approvals...",
);
}
try {
// Fetch and process actual pending approvals from server
await resolveAllPendingApprovals();
// After processing, continue to next iteration (fresh state)
continue;
} catch {
// If fetch fails, exit with error
if (outputFormat === "stream-json") {
const errorMsg: ErrorMessage = {
type: "error",
message: "Failed to fetch pending approvals for resync",
stop_reason: stopReason,
run_id: lastRunId ?? undefined,
session_id: sessionId,
uuid: `error-${lastRunId || randomUUID()}`,
};
console.log(JSON.stringify(errorMsg));
} else {
console.error("Failed to fetch pending approvals for resync");
}
process.exit(1);
}
}
// Unexpected stop reason (error, llm_api_error, etc.)
// Before failing, check run metadata to see if this is a retriable error
// This handles cases where the backend sends a generic error stop_reason but the
// underlying cause is a transient LLM/network issue that should be retried
// Early exit for stop reasons that should never be retried
const nonRetriableReasons: StopReasonType[] = [
"cancelled",
"requires_approval",
"max_steps",
"max_tokens_exceeded",
"context_window_overflow_in_system_prompt",
"end_turn",
"tool_rule",
"no_tool_call",
];
if (nonRetriableReasons.includes(stopReason)) {
// Fall through to error display
} else if (lastRunId && llmApiErrorRetries < LLM_API_ERROR_MAX_RETRIES) {
try {
const run = await client.runs.retrieve(lastRunId);
const metaError = run.metadata?.error as
| {
error_type?: string;
message?: string;
detail?: string;
// Handle nested error structure (error.error) that can occur in some edge cases
error?: { error_type?: string; detail?: string };
}
| undefined;
// Check for llm_error at top level or nested (handles error.error nesting)
const errorType =
metaError?.error_type ?? metaError?.error?.error_type;
const detail = metaError?.detail ?? metaError?.error?.detail ?? "";
// Special handling for empty response errors (Opus 4.6 SADs)
// Empty LLM response retry (e.g. Opus 4.6 occasionally returns no content).
// Retry 1: same input unchanged. Retry 2: append system reminder nudging the model.
if (
isEmptyResponseRetryable(
errorType,
detail,
emptyResponseRetries,
EMPTY_RESPONSE_MAX_RETRIES,
)
) {
const attempt = emptyResponseRetries + 1;
const delayMs = 500 * attempt;
emptyResponseRetries = attempt;
// Only append a nudge on the last attempt
if (attempt >= EMPTY_RESPONSE_MAX_RETRIES) {
const nudgeMessage: MessageCreate = {
role: "system",
content: `<system-reminder>The previous response was empty. Please provide a response with either text content or a tool call.</system-reminder>`,
};
currentInput = [...currentInput, nudgeMessage];
}
if (outputFormat === "stream-json") {
const retryMsg: RetryMessage = {
type: "retry",
reason: "llm_api_error",
attempt,
max_attempts: EMPTY_RESPONSE_MAX_RETRIES,
delay_ms: delayMs,
run_id: lastRunId ?? undefined,
session_id: sessionId,
uuid: `retry-empty-${lastRunId || randomUUID()}`,
};
console.log(JSON.stringify(retryMsg));
} else {
console.error(
`Empty LLM response, retrying (attempt ${attempt} of ${EMPTY_RESPONSE_MAX_RETRIES})...`,
);
}
await new Promise((resolve) => setTimeout(resolve, delayMs));
continue;
}
if (shouldRetryRunMetadataError(errorType, detail)) {
const attempt = llmApiErrorRetries + 1;
const baseDelayMs = 1000;
const delayMs = baseDelayMs * 2 ** (attempt - 1);
llmApiErrorRetries = attempt;
if (outputFormat === "stream-json") {
const retryMsg: RetryMessage = {
type: "retry",
reason: "llm_api_error",
attempt,
max_attempts: LLM_API_ERROR_MAX_RETRIES,
delay_ms: delayMs,
run_id: lastRunId ?? undefined,
session_id: sessionId,
uuid: `retry-${lastRunId || randomUUID()}`,
};
console.log(JSON.stringify(retryMsg));
} else {
const delaySeconds = Math.round(delayMs / 1000);
console.error(
`LLM API error encountered (attempt ${attempt} of ${LLM_API_ERROR_MAX_RETRIES}), retrying in ${delaySeconds}s...`,
);
}
await new Promise((resolve) => setTimeout(resolve, delayMs));
continue;
}
} catch (_e) {
// If we can't fetch run metadata, fall through to normal error handling
}
}
// Mark incomplete tool calls as cancelled to prevent stuck state
markIncompleteToolsAsCancelled(buffers, true, "stream_error");
// Extract error details from buffers if available
const errorLines = toLines(buffers).filter(
(line) => line.kind === "error",
);
const errorMessages = errorLines
.map((line) => ("text" in line ? line.text : ""))
.filter(Boolean);
let errorMessage =
errorMessages.length > 0
? errorMessages.join("; ")
: `Unexpected stop reason: ${stopReason}`;
// Fetch detailed error from run metadata if available (same as TUI mode)
if (lastRunId && errorMessages.length === 0) {
try {
const run = await client.runs.retrieve(lastRunId);
if (run.metadata?.error) {
const errorData = run.metadata.error as {
type?: string;
message?: string;
detail?: string;
};
// Construct error object that formatErrorDetails can parse
const errorObject = {
error: {
error: errorData,
run_id: lastRunId,
},
};
errorMessage = formatErrorDetails(errorObject, agent.id);
}
} catch (_e) {
// If we can't fetch error details, append note to error message
errorMessage = `${errorMessage}\n(Unable to fetch additional error details from server)`;
}
}
if (outputFormat === "stream-json") {
// Emit error event
const errorMsg: ErrorMessage = {
type: "error",
message: errorMessage,
stop_reason: stopReason,
run_id: lastRunId ?? undefined,
session_id: sessionId,
uuid: `error-${lastRunId || randomUUID()}`,
};
console.log(JSON.stringify(errorMsg));
} else {
console.error(`Error: ${errorMessage}`);
}
process.exit(1);
}
} catch (error) {
// Mark incomplete tool calls as cancelled
markIncompleteToolsAsCancelled(buffers, true, "stream_error");
// Use comprehensive error formatting (same as TUI mode)
const errorDetails = formatErrorDetails(error, agent.id);
if (outputFormat === "stream-json") {
const errorMsg: ErrorMessage = {
type: "error",
message: errorDetails,
stop_reason: "error",
run_id: lastKnownRunId ?? undefined,
session_id: sessionId,
uuid: `error-${lastKnownRunId || randomUUID()}`,
};
console.log(JSON.stringify(errorMsg));
} else {
console.error(`Error: ${errorDetails}`);
}
process.exit(1);
}
// Update stats with final usage data from buffers
sessionStats.updateUsageFromBuffers(buffers);
// Extract final result from transcript, with sensible fallbacks
const lines = toLines(buffers);
const reversed = [...lines].reverse();
const lastAssistant = reversed.find(
(line) =>
line.kind === "assistant" &&
"text" in line &&
typeof line.text === "string" &&
line.text.trim().length > 0,
) as Extract<Line, { kind: "assistant" }> | undefined;
const lastReasoning = reversed.find(
(line) =>
line.kind === "reasoning" &&
"text" in line &&
typeof line.text === "string" &&
line.text.trim().length > 0,
) as Extract<Line, { kind: "reasoning" }> | undefined;
const lastToolResult = reversed.find(
(line) =>
line.kind === "tool_call" &&
"resultText" in line &&
typeof (line as Extract<Line, { kind: "tool_call" }>).resultText ===
"string" &&
((line as Extract<Line, { kind: "tool_call" }>).resultText ?? "").trim()
.length > 0,
) as Extract<Line, { kind: "tool_call" }> | undefined;
const resultText =
lastAssistant?.text ||
lastReasoning?.text ||
lastToolResult?.resultText ||
"No assistant response found";
const stats = sessionStats.getSnapshot();
const usage = {
prompt_tokens: stats.usage.promptTokens,
completion_tokens: stats.usage.completionTokens,
total_tokens: stats.usage.totalTokens,
step_count: stats.usage.stepCount,
cached_input_tokens: stats.usage.cachedInputTokens,
cache_write_tokens: stats.usage.cacheWriteTokens,
reasoning_tokens: stats.usage.reasoningTokens,
...(stats.usage.contextTokens !== undefined && {
context_tokens: stats.usage.contextTokens,
}),
};
// Output based on format
if (outputFormat === "json") {
const output = {
type: "result",
subtype: "success",
is_error: false,
duration_ms: Math.round(stats.totalWallMs),
duration_api_ms: Math.round(stats.totalApiMs),
num_turns: stats.usage.stepCount,
result: resultText,
agent_id: agent.id,
conversation_id: conversationId,
usage,
};
console.log(JSON.stringify(output, null, 2));
} else if (outputFormat === "stream-json") {
// Output final result event
// Collect all run_ids from buffers
const allRunIds = new Set<string>();
for (const line of toLines(buffers)) {
// Extract run_id from any line that might have it
// This is a fallback in case we missed any during streaming
if ("run_id" in line && typeof line.run_id === "string") {
allRunIds.add(line.run_id);
}
}
// Use the last run_id as the result uuid if available, otherwise derive from agent_id
const resultUuid =
allRunIds.size > 0
? `result-${Array.from(allRunIds).pop()}`
: `result-${agent.id}`;
const resultEvent: ResultMessage = {
type: "result",
subtype: "success",
session_id: sessionId,
duration_ms: Math.round(stats.totalWallMs),
duration_api_ms: Math.round(stats.totalApiMs),
num_turns: stats.usage.stepCount,
result: resultText,
agent_id: agent.id,
conversation_id: conversationId,
run_ids: Array.from(allRunIds),
usage,
uuid: resultUuid,
};
console.log(JSON.stringify(resultEvent));
} else {
// text format (default)
if (!resultText || resultText === "No assistant response found") {
console.error("No assistant response found");
process.exit(1);
}
console.log(resultText);
}
// Report all milestones at the end for latency audit
markMilestone("HEADLESS_COMPLETE");
reportAllMilestones();
}
/**
* Bidirectional mode for SDK communication.
* Reads JSON messages from stdin, processes them, and outputs responses.
* Stays alive until stdin closes.
*/
async function runBidirectionalMode(
agent: AgentState,
conversationId: string,
client: Letta,
_outputFormat: string,
includePartialMessages: boolean,
availableTools: string[],
skillSources: SkillSource[],
systemInfoReminderEnabled: boolean,
reflectionSettings: ReflectionSettings,
): Promise<void> {
const sessionId = agent.id;
const readline = await import("node:readline");
// Emit init event
const initEvent = {
type: "system",
subtype: "init",
session_id: sessionId,
agent_id: agent.id,
conversation_id: conversationId,
model: agent.llm_config?.model,
tools: availableTools,
cwd: process.cwd(),
memfs_enabled: settingsManager.isMemfsEnabled(agent.id),
skill_sources: skillSources,
system_info_reminder_enabled: systemInfoReminderEnabled,
reflection_trigger: reflectionSettings.trigger,
reflection_behavior: reflectionSettings.behavior,
reflection_step_count: reflectionSettings.stepCount,
uuid: `init-${agent.id}`,
};
console.log(JSON.stringify(initEvent));
// Track current operation for interrupt support
let currentAbortController: AbortController | null = null;
const reminderContextTracker = createContextTracker();
const sharedReminderState = createSharedReminderState();
const isSubagent = process.env.LETTA_CODE_AGENT_ROLE === "subagent";
// Resolve pending approvals for this conversation before retrying user input.
const resolveAllPendingApprovals = async () => {
const { getResumeData } = await import("./agent/check-approval");
while (true) {
// Re-fetch agent to get latest in-context messages (source of truth for backend)
const freshAgent = await client.agents.retrieve(agent.id);
let resume: Awaited<ReturnType<typeof getResumeData>>;
try {
resume = await getResumeData(client, freshAgent, conversationId);
} catch (error) {
// Treat 404/422 as "no approvals" - stale message/conversation state
if (
error instanceof APIError &&
(error.status === 404 || error.status === 422)
) {
break;
}
throw error;
}
const pendingApprovals = resume.pendingApprovals || [];
if (pendingApprovals.length === 0) break;
type Decision =
| {
type: "approve";
approval: {
toolCallId: string;
toolName: string;
toolArgs: string;
};
reason: string;
matchedRule: string;
}
| {
type: "deny";
approval: {
toolCallId: string;
toolName: string;
toolArgs: string;
};
reason: string;
};
const { autoAllowed, autoDenied } = await classifyApprovals(
pendingApprovals,
{
treatAskAsDeny: true,
denyReasonForAsk: "Tool requires approval (headless mode)",
requireArgsForAutoApprove: true,
missingNameReason: "Tool call incomplete - missing name",
},
);
const decisions: Decision[] = [
...autoAllowed.map((ac) => ({
type: "approve" as const,
approval: ac.approval,
reason: ac.permission.reason || "Allowed by permission rule",
matchedRule:
"matchedRule" in ac.permission && ac.permission.matchedRule
? ac.permission.matchedRule
: "auto-approved",
})),
...autoDenied.map((ac) => {
const fallback =
"matchedRule" in ac.permission && ac.permission.matchedRule
? `Permission denied: ${ac.permission.matchedRule}`
: ac.permission.reason
? `Permission denied: ${ac.permission.reason}`
: "Permission denied: Unknown reason";
return {
type: "deny" as const,
approval: ac.approval,
reason: ac.denyReason ?? fallback,
};
}),
];
const { executeApprovalBatch } = await import(
"./agent/approval-execution"
);
const executedResults = await executeApprovalBatch(decisions);
const approvalInput: ApprovalCreate = {
type: "approval",
approvals: executedResults as ApprovalResult[],
};
const approvalMessages: Array<
| import("@letta-ai/letta-client/resources/agents/agents").MessageCreate
| import("@letta-ai/letta-client/resources/agents/messages").ApprovalCreate
> = [approvalInput];
{
const { consumeQueuedSkillContent } = await import(
"./tools/impl/skillContentRegistry"
);
const skillContents = consumeQueuedSkillContent();
if (skillContents.length > 0) {
approvalMessages.push({
role: "user" as const,
content: skillContents.map((sc) => ({
type: "text" as const,
text: sc.content,
})),
});
}
}
const approvalStream = await sendMessageStream(
conversationId,
approvalMessages,
{ agentId: agent.id },
);
const drainResult = await drainStreamWithResume(
approvalStream,
createBuffers(agent.id),
() => {},
undefined,
undefined,
undefined,
reminderContextTracker,
);
if (
drainResult.stopReason === "error" ||
drainResult.stopReason === "cancelled"
) {
throw new Error(
`Approval drain ended with stop reason: ${drainResult.stopReason}`,
);
}
}
};
// Create readline interface for stdin
const rl = readline.createInterface({
input: process.stdin,
terminal: false,
});
// Create async iterator and line queue for permission callbacks
const lineQueue: string[] = [];
let lineResolver: ((line: string | null) => void) | null = null;
// ── Queue lifecycle tracking (stream-json only) ────────────────
// Bidirectional mode always runs under stream-json input format, so queue
// events are always emitted here. emitQueueEvent is a no-op guard retained
// for clarity and future-proofing against non-stream-json callers.
const emitQueueEvent = (e: QueueLifecycleEvent): void => {
console.log(JSON.stringify(e));
};
let turnInProgress = false;
const msgQueueRuntime = new QueueRuntime({
callbacks: {
onEnqueued: (item, queueLen) =>
emitQueueEvent({
type: "queue_item_enqueued",
item_id: item.id,
source: item.source,
kind: item.kind,
queue_len: queueLen,
session_id: sessionId,
uuid: `q-enq-${item.id}`,
}),
onDequeued: (batch) =>
emitQueueEvent({
type: "queue_batch_dequeued",
batch_id: batch.batchId,
item_ids: batch.items.map((i) => i.id),
merged_count: batch.mergedCount,
queue_len_after: batch.queueLenAfter,
session_id: sessionId,
uuid: `q-deq-${batch.batchId}`,
}),
onCleared: (reason, clearedCount) =>
emitQueueEvent({
type: "queue_cleared",
reason,
cleared_count: clearedCount,
session_id: sessionId,
uuid: `q-clr-${randomUUID()}`,
}),
},
});
/**
* Parses a raw JSON line and returns the queue item payload if it is a
* user message or task_notification. Returns null for control lines
* (control_request, control_response, etc.) and malformed JSON.
*/
function parseUserLine(raw: string): {
kind: "message" | "task_notification";
content: string;
} | null {
if (!raw.trim()) return null;
try {
const parsed: {
type?: string;
message?: { content?: string };
_queuedKind?: string;
} = JSON.parse(raw);
if (parsed.type !== "user" || parsed.message?.content === undefined)
return null;
const kind =
parsed._queuedKind === "task_notification"
? "task_notification"
: "message";
return { kind, content: parsed.message.content };
} catch {
return null;
}
}
/**
* Emit queue_blocked on the FIRST user/task line arrival during an active
* turn. Does NOT enqueue to msgQueueRuntime — that happens later, at the
* coalescing loop where consumption is certain (avoids orphaned items from
* the external-tool wait loop which drops non-matching lines silently).
*/
let blockedEmittedThisTurn = false;
function maybeNotifyBlocked(raw: string): void {
if (!turnInProgress || blockedEmittedThisTurn) return;
if (!parseUserLine(raw)) return;
blockedEmittedThisTurn = true;
// queue_len: count user/task items currently in lineQueue (best-effort)
const queueLen = lineQueue.filter((l) => parseUserLine(l) !== null).length;
emitQueueEvent({
type: "queue_blocked",
reason: "runtime_busy",
queue_len: Math.max(1, queueLen),
session_id: sessionId,
uuid: `q-blk-${randomUUID()}`,
});
}
/** Enqueue a BidirectionalQueuedInput into msgQueueRuntime for lifecycle tracking. */
function enqueueForTracking(input: BidirectionalQueuedInput): void {
if (input.kind === "task_notification") {
msgQueueRuntime.enqueue({
kind: "task_notification",
source: "task_notification",
text: input.text,
} as Parameters<typeof msgQueueRuntime.enqueue>[0]);
} else {
msgQueueRuntime.enqueue({
kind: "message",
source: "user",
content: input.content,
} as Parameters<typeof msgQueueRuntime.enqueue>[0]);
}
}
const serializeQueuedMessageAsUserLine = (queuedMessage: QueuedMessage) =>
JSON.stringify({
type: "user",
message: {
role: "user",
content: queuedMessage.text,
},
_queuedKind: queuedMessage.kind,
});
// Connect Task/subagent background notifications to the same queueing path
// used by user input so bidirectional mode inherits TUI-style queue behavior.
setMessageQueueAdder((queuedMessage) => {
const syntheticUserLine = serializeQueuedMessageAsUserLine(queuedMessage);
maybeNotifyBlocked(syntheticUserLine);
if (lineResolver) {
const resolve = lineResolver;
lineResolver = null;
resolve(syntheticUserLine);
return;
}
lineQueue.push(syntheticUserLine);
});
// Feed lines into queue or resolver
rl.on("line", (line) => {
maybeNotifyBlocked(line);
if (lineResolver) {
const resolve = lineResolver;
lineResolver = null;
resolve(line);
} else {
lineQueue.push(line);
}
});
rl.on("close", () => {
setMessageQueueAdder(null);
msgQueueRuntime.clear("shutdown");
if (lineResolver) {
const resolve = lineResolver;
lineResolver = null;
resolve(null);
}
});
// Helper to get next line (from queue or wait)
async function getNextLine(): Promise<string | null> {
if (lineQueue.length > 0) {
return lineQueue.shift() ?? null;
}
return new Promise<string | null>((resolve) => {
lineResolver = resolve;
});
}
// Helper to send permission request and wait for response
// Uses Claude SDK's control_request/control_response format for compatibility
async function requestPermission(
toolCallId: string,
toolName: string,
toolInput: Record<string, unknown>,
): Promise<{
decision: "allow" | "deny";
reason?: string;
updatedInput?: Record<string, unknown> | null;
}> {
const requestId = `perm-${toolCallId}`;
// Compute diff previews for file-modifying tools
const diffs = await computeDiffPreviews(toolName, toolInput);
// Build can_use_tool control request (Claude SDK format)
const canUseToolRequest: CanUseToolControlRequest = {
subtype: "can_use_tool",
tool_name: toolName,
input: toolInput,
tool_call_id: toolCallId, // Letta-specific
permission_suggestions: [], // TODO: not implemented
blocked_path: null, // TODO: not implemented
...(diffs.length > 0 ? { diffs } : {}),
};
const controlRequest: ControlRequest = {
type: "control_request",
request_id: requestId,
request: canUseToolRequest,
};
console.log(JSON.stringify(controlRequest));
const deferredLines: string[] = [];
// Wait for control_response
let result: {
decision: "allow" | "deny";
reason?: string;
updatedInput?: Record<string, unknown> | null;
} | null = null;
while (result === null) {
const line = await getNextLine();
if (line === null) {
result = { decision: "deny", reason: "stdin closed" };
break;
}
if (!line.trim()) continue;
try {
const msg = JSON.parse(line);
if (
msg.type === "control_response" &&
msg.response?.request_id === requestId
) {
// Parse the can_use_tool response
const response = msg.response?.response as
| CanUseToolResponse
| undefined;
if (!response) {
result = { decision: "deny", reason: "Invalid response format" };
break;
}
if (response.behavior === "allow") {
result = {
decision: "allow",
updatedInput: response.updatedInput,
};
} else {
result = {
decision: "deny",
reason: response.message,
// TODO: handle interrupt flag
};
}
break;
}
// Defer other messages for the main loop without re-reading them.
deferredLines.push(line);
} catch {
// Defer parse errors so the main loop can surface them.
deferredLines.push(line);
}
}
if (deferredLines.length > 0) {
lineQueue.unshift(...deferredLines);
}
return result;
}
// Main processing loop
while (true) {
const line = await getNextLine();
if (line === null) break; // stdin closed
if (!line.trim()) continue;
let message: {
type: string;
message?: { role: string; content: MessageCreate["content"] };
request_id?: string;
request?: { subtype: string };
session_id?: string;
};
try {
message = JSON.parse(line);
} catch {
const errorMsg: ErrorMessage = {
type: "error",
message: "Invalid JSON input",
stop_reason: "error",
session_id: sessionId,
uuid: randomUUID(),
};
console.log(JSON.stringify(errorMsg));
continue;
}
// Handle control requests
if (message.type === "control_request") {
const subtype = message.request?.subtype;
const requestId = message.request_id;
if (subtype === "initialize") {
// Return session info
const initResponse: ControlResponse = {
type: "control_response",
response: {
subtype: "success",
request_id: requestId ?? "",
response: {
agent_id: agent.id,
model: agent.llm_config?.model,
tools: availableTools,
memfs_enabled: settingsManager.isMemfsEnabled(agent.id),
skill_sources: skillSources,
system_info_reminder_enabled: systemInfoReminderEnabled,
reflection_trigger: reflectionSettings.trigger,
reflection_behavior: reflectionSettings.behavior,
reflection_step_count: reflectionSettings.stepCount,
},
},
session_id: sessionId,
uuid: randomUUID(),
};
console.log(JSON.stringify(initResponse));
} else if (subtype === "interrupt") {
// Abort current operation if any
if (currentAbortController !== null) {
(currentAbortController as AbortController).abort();
currentAbortController = null;
}
const interruptResponse: ControlResponse = {
type: "control_response",
response: {
subtype: "success",
request_id: requestId ?? "",
},
session_id: sessionId,
uuid: randomUUID(),
};
console.log(JSON.stringify(interruptResponse));
} else if (subtype === "register_external_tools") {
// Register external tools from SDK
const toolsRequest = message.request as {
tools?: ExternalToolDefinition[];
};
const tools = toolsRequest.tools ?? [];
registerExternalTools(tools);
// Set up the external tool executor to send requests back to SDK
setExternalToolExecutor(async (toolCallId, toolName, input) => {
// Send execute_external_tool request to SDK
const execRequest: ControlRequest = {
type: "control_request",
request_id: `ext-${toolCallId}`,
request: {
subtype: "execute_external_tool",
tool_call_id: toolCallId,
tool_name: toolName,
input,
} as unknown as CanUseToolControlRequest, // Type cast for compatibility
};
console.log(JSON.stringify(execRequest));
// Wait for external_tool_result response
while (true) {
const line = await getNextLine();
if (line === null) {
return {
content: [{ type: "text", text: "stdin closed" }],
isError: true,
};
}
if (!line.trim()) continue;
try {
const msg = JSON.parse(line);
if (
msg.type === "control_response" &&
msg.response?.subtype === "external_tool_result" &&
msg.response?.tool_call_id === toolCallId
) {
return {
content: msg.response.content ?? [{ type: "text", text: "" }],
isError: msg.response.is_error ?? false,
};
}
} catch {
// Ignore parse errors, keep waiting
}
}
});
const registerResponse: ControlResponse = {
type: "control_response",
response: {
subtype: "success",
request_id: requestId ?? "",
response: { registered: tools.length },
},
session_id: sessionId,
uuid: randomUUID(),
};
console.log(JSON.stringify(registerResponse));
} else if (subtype === "bootstrap_session_state") {
const bootstrapReq = message.request as BootstrapSessionStateRequest;
const { getResumeData } = await import("./agent/check-approval");
let hasPendingApproval = false;
try {
// Re-fetch for parity with approval checks elsewhere in headless mode.
const freshAgent = await client.agents.retrieve(agent.id);
const resume = await getResumeData(
client,
freshAgent,
conversationId,
{
includeMessageHistory: false,
},
);
hasPendingApproval = (resume.pendingApprovals?.length ?? 0) > 0;
} catch (error) {
// Keep bootstrap non-fatal if approval probe fails on stale resources.
if (
!(error instanceof APIError) ||
(error.status !== 404 && error.status !== 422)
) {
console.warn(
`[bootstrap] pending-approval probe failed: ${error instanceof Error ? error.message : String(error)}`,
);
}
}
const bootstrapResp = await handleBootstrapSessionState({
bootstrapReq,
sessionContext: {
agentId: agent.id,
conversationId,
model: agent.llm_config?.model,
tools: availableTools,
memfsEnabled: settingsManager.isMemfsEnabled(agent.id),
sessionId,
},
requestId: requestId ?? "",
client,
hasPendingApproval,
});
console.log(JSON.stringify(bootstrapResp));
} else if (subtype === "list_messages") {
const listReq = message.request as ListMessagesControlRequest;
const listResp = await handleListMessages({
listReq,
sessionConversationId: conversationId,
sessionAgentId: agent.id,
sessionId,
requestId: requestId ?? "",
client,
});
console.log(JSON.stringify(listResp));
} else {
const errorResponse: ControlResponse = {
type: "control_response",
response: {
subtype: "error",
request_id: requestId ?? "",
error: `Unknown control request subtype: ${subtype}`,
},
session_id: sessionId,
uuid: randomUUID(),
};
console.log(JSON.stringify(errorResponse));
}
continue;
}
// Handle user messages
if (message.type === "user" && message.message?.content !== undefined) {
const queuedInputs: BidirectionalQueuedInput[] = [
{
kind: "user",
content: message.message.content,
},
];
// Batch any already-buffered user lines into the same turn, mirroring
// TUI queue dequeue behavior (single coalesced submit when idle).
while (lineQueue.length > 0) {
const candidate = lineQueue[0];
if (!candidate?.trim()) {
lineQueue.shift();
continue;
}
let parsedCandidate: {
type?: string;
message?: { content?: MessageCreate["content"] };
_queuedKind?: QueuedMessage["kind"];
};
try {
parsedCandidate = JSON.parse(candidate);
} catch {
// Leave malformed lines for the main loop to surface as parse errors.
break;
}
if (
parsedCandidate.type === "user" &&
parsedCandidate.message?.content !== undefined
) {
lineQueue.shift();
if (parsedCandidate._queuedKind === "task_notification") {
const notificationText =
typeof parsedCandidate.message.content === "string"
? parsedCandidate.message.content
: parsedCandidate.message.content
.reduce((texts: string[], part) => {
if (
part.type === "text" &&
"text" in part &&
typeof part.text === "string"
) {
texts.push(part.text);
}
return texts;
}, [])
.join("");
queuedInputs.push({
kind: "task_notification",
text: notificationText,
});
} else {
queuedInputs.push({
kind: "user",
content: parsedCandidate.message.content,
});
}
continue;
}
// Stop coalescing when the queue head is not a user-input line.
// The outer loop must process control/error/system lines in-order.
break;
}
// Enqueue consumed items into msgQueueRuntime for lifecycle tracking.
// Done here (not at arrival) to avoid orphaned items from the external-
// tool wait loop, which consumes non-matching lines via getNextLine()
// without deferring them back to lineQueue.
for (const input of queuedInputs) {
enqueueForTracking(input);
}
// Signal dequeue for exactly the items we just enqueued. consumeItems(n)
// bypasses QueueRuntime's internal coalescing policy so the count matches
// what the coalescing loop actually yielded.
msgQueueRuntime.consumeItems(queuedInputs.length);
const userContent = mergeBidirectionalQueuedInput(queuedInputs);
if (userContent === null) {
continue;
}
// Create abort controller for this operation
currentAbortController = new AbortController();
turnInProgress = true;
try {
const buffers = createBuffers(agent.id);
const startTime = performance.now();
let numTurns = 0;
let lastStopReason: StopReasonType | null = null; // Track for result subtype
let sawStreamError = false; // Track if we emitted an error during streaming
let preStreamTransientRetries = 0;
syncReminderStateFromContextTracker(
sharedReminderState,
reminderContextTracker,
);
const lastRunAt = (agent as { last_run_completion?: string })
.last_run_completion;
const { parts: sharedReminderParts } = await buildSharedReminderParts({
mode: isSubagent ? "subagent" : "headless-bidirectional",
agent: {
id: agent.id,
name: agent.name,
description: agent.description,
lastRunAt: lastRunAt ?? null,
},
state: sharedReminderState,
sessionContextReminderEnabled: systemInfoReminderEnabled,
reflectionSettings,
skillSources,
resolvePlanModeReminder: async () => {
const { PLAN_MODE_REMINDER } = await import("./agent/promptAssets");
return PLAN_MODE_REMINDER;
},
});
const enrichedContent = prependReminderPartsToContent(
userContent,
sharedReminderParts,
);
// Initial input is the user message
let currentInput: MessageCreate[] = [
{ role: "user", content: enrichedContent },
];
// Approval handling loop - continue until end_turn or error
while (true) {
numTurns++;
// Check if aborted
if (currentAbortController?.signal.aborted) {
break;
}
// Inject queued skill content as user message parts (LET-7353)
{
const { consumeQueuedSkillContent } = await import(
"./tools/impl/skillContentRegistry"
);
const skillContents = consumeQueuedSkillContent();
if (skillContents.length > 0) {
currentInput = [
...currentInput,
{
role: "user" as const,
content: skillContents.map((sc) => ({
type: "text" as const,
text: sc.content,
})),
},
];
}
}
// Send message to agent.
// Wrap in try-catch to handle pre-stream 409 approval-pending errors.
let stream: Awaited<ReturnType<typeof sendMessageStream>>;
let turnToolContextId: string | null = null;
try {
stream = await sendMessageStream(conversationId, currentInput, {
agentId: agent.id,
});
turnToolContextId = getStreamToolContextId(stream);
} catch (preStreamError) {
// Extract error detail using shared helper (handles nested/direct/message shapes)
const errorDetail = extractConflictDetail(preStreamError);
// Route through shared pre-stream conflict classifier (parity with main loop + TUI)
// Bidir mode has no conversation-busy retry budget, so pass 0/0 to disable busy-retry.
const preStreamAction = getPreStreamErrorAction(errorDetail, 0, 0, {
status:
preStreamError instanceof APIError
? preStreamError.status
: undefined,
transientRetries: preStreamTransientRetries,
maxTransientRetries: LLM_API_ERROR_MAX_RETRIES,
});
if (preStreamAction === "resolve_approval_pending") {
const recoveryMsg: RecoveryMessage = {
type: "recovery",
recovery_type: "approval_pending",
message:
"Detected pending approval conflict on send; resolving before retry",
session_id: sessionId,
uuid: `recovery-bidir-${randomUUID()}`,
};
console.log(JSON.stringify(recoveryMsg));
await resolveAllPendingApprovals();
continue;
}
if (preStreamAction === "retry_transient") {
const attempt = preStreamTransientRetries + 1;
const retryAfterMs =
preStreamError instanceof APIError
? parseRetryAfterHeaderMs(
preStreamError.headers?.get("retry-after"),
)
: null;
const delayMs = retryAfterMs ?? 1000 * 2 ** (attempt - 1);
preStreamTransientRetries = attempt;
const retryMsg: RetryMessage = {
type: "retry",
reason: "llm_api_error",
attempt,
max_attempts: LLM_API_ERROR_MAX_RETRIES,
delay_ms: delayMs,
session_id: sessionId,
uuid: `retry-bidir-${randomUUID()}`,
};
console.log(JSON.stringify(retryMsg));
await new Promise((resolve) => setTimeout(resolve, delayMs));
continue;
}
throw preStreamError;
}
preStreamTransientRetries = 0;
const streamJsonHook: DrainStreamHook = ({
chunk,
shouldOutput,
errorInfo,
}) => {
// Handle in-stream errors (emit ErrorMessage with full details)
if (errorInfo && shouldOutput) {
sawStreamError = true; // Track that we saw an error (affects result subtype)
const errorEvent: ErrorMessage = {
type: "error",
message: errorInfo.message,
stop_reason: "error",
run_id: errorInfo.run_id,
session_id: sessionId,
uuid: randomUUID(),
...(errorInfo.error_type &&
errorInfo.run_id && {
api_error: {
message_type: "error_message",
message: errorInfo.message,
error_type: errorInfo.error_type,
detail: errorInfo.detail,
run_id: errorInfo.run_id,
},
}),
};
console.log(JSON.stringify(errorEvent));
return { shouldAccumulate: true };
}
if (!shouldOutput) {
return { shouldAccumulate: true };
}
const chunkWithIds = chunk as typeof chunk & {
otid?: string;
id?: string;
};
const uuid = chunkWithIds.otid || chunkWithIds.id;
if (includePartialMessages) {
const streamEvent: StreamEvent = {
type: "stream_event",
event: chunk,
session_id: sessionId,
uuid: uuid || randomUUID(),
};
console.log(JSON.stringify(streamEvent));
} else {
const msg: MessageWire = {
type: "message",
...chunk,
session_id: sessionId,
uuid: uuid || randomUUID(),
};
console.log(JSON.stringify(msg));
}
return { shouldAccumulate: true };
};
const result = await drainStreamWithResume(
stream,
buffers,
() => {},
currentAbortController?.signal,
undefined,
streamJsonHook,
reminderContextTracker,
);
const stopReason = result.stopReason;
lastStopReason = stopReason; // Track for result subtype
const approvals = result.approvals || [];
// Case 1: Turn ended normally - break out of loop
if (stopReason === "end_turn") {
break;
}
// Case 2: Aborted - break out of loop
if (
currentAbortController?.signal.aborted ||
stopReason === "cancelled"
) {
break;
}
// Case 3: Requires approval - process approvals and continue
if (stopReason === "requires_approval") {
if (approvals.length === 0) {
// Anomalous state: requires_approval but no approvals
// Treat as error rather than false-positive success
lastStopReason = "error";
break;
}
// Check permissions and collect decisions
type Decision =
| {
type: "approve";
approval: {
toolCallId: string;
toolName: string;
toolArgs: string;
};
matchedRule: string;
}
| {
type: "deny";
approval: {
toolCallId: string;
toolName: string;
toolArgs: string;
};
reason: string;
};
const { autoAllowed, autoDenied, needsUserInput } =
await classifyApprovals(approvals, {
alwaysRequiresUserInput: isInteractiveApprovalTool,
requireArgsForAutoApprove: true,
missingNameReason: "Tool call incomplete - missing name",
});
const decisions: Decision[] = [
...autoAllowed.map((ac) => ({
type: "approve" as const,
approval: ac.approval,
matchedRule:
"matchedRule" in ac.permission && ac.permission.matchedRule
? ac.permission.matchedRule
: "auto-approved",
})),
...autoDenied.map((ac) => {
const fallback =
"matchedRule" in ac.permission && ac.permission.matchedRule
? `Permission denied: ${ac.permission.matchedRule}`
: ac.permission.reason
? `Permission denied: ${ac.permission.reason}`
: "Permission denied: Unknown reason";
return {
type: "deny" as const,
approval: ac.approval,
reason: ac.denyReason ?? fallback,
};
}),
];
for (const approvalItem of autoAllowed) {
const permission = approvalItem.permission;
const autoApprovalMsg: AutoApprovalMessage = {
type: "auto_approval",
tool_call: {
name: approvalItem.approval.toolName,
tool_call_id: approvalItem.approval.toolCallId,
arguments: approvalItem.approval.toolArgs,
},
reason: permission.reason || "auto-approved",
matched_rule:
"matchedRule" in permission && permission.matchedRule
? permission.matchedRule
: "auto-approved",
session_id: sessionId,
uuid: `auto-approval-${approvalItem.approval.toolCallId}`,
};
console.log(JSON.stringify(autoApprovalMsg));
}
for (const ac of needsUserInput) {
// permission.decision === "ask" - request permission from SDK
const permResponse = await requestPermission(
ac.approval.toolCallId,
ac.approval.toolName,
ac.parsedArgs,
);
if (permResponse.decision === "allow") {
// If provided updatedInput (e.g., for AskUserQuestion with answers),
// update the approval's toolArgs to use it
const finalApproval = permResponse.updatedInput
? {
...ac.approval,
toolArgs: JSON.stringify(permResponse.updatedInput),
}
: ac.approval;
decisions.push({
type: "approve",
approval: finalApproval,
matchedRule: "SDK callback approved",
});
// Emit auto_approval event for SDK-approved tool
const autoApprovalMsg: AutoApprovalMessage = {
type: "auto_approval",
tool_call: {
name: finalApproval.toolName,
tool_call_id: finalApproval.toolCallId,
arguments: finalApproval.toolArgs,
},
reason: permResponse.reason || "SDK callback approved",
matched_rule: "canUseTool callback",
session_id: sessionId,
uuid: `auto-approval-${ac.approval.toolCallId}`,
};
console.log(JSON.stringify(autoApprovalMsg));
} else {
decisions.push({
type: "deny",
approval: ac.approval,
reason: permResponse.reason || "Denied by SDK callback",
});
}
}
// Execute approved tools
const { executeApprovalBatch } = await import(
"./agent/approval-execution"
);
const executedResults = await executeApprovalBatch(
decisions,
undefined,
{ toolContextId: turnToolContextId ?? undefined },
);
// Send approval results back to continue
currentInput = [
{
type: "approval",
approvals: executedResults,
} as unknown as MessageCreate,
];
// Continue the loop to process the next stream
continue;
}
// Other stop reasons - break
break;
}
// Emit result
const durationMs = performance.now() - startTime;
const lines = toLines(buffers);
const reversed = [...lines].reverse();
const lastAssistant = reversed.find(
(line) =>
line.kind === "assistant" &&
"text" in line &&
typeof line.text === "string" &&
line.text.trim().length > 0,
) as Extract<Line, { kind: "assistant" }> | undefined;
const lastReasoning = reversed.find(
(line) =>
line.kind === "reasoning" &&
"text" in line &&
typeof line.text === "string" &&
line.text.trim().length > 0,
) as Extract<Line, { kind: "reasoning" }> | undefined;
const lastToolResult = reversed.find(
(line) =>
line.kind === "tool_call" &&
"resultText" in line &&
typeof (line as Extract<Line, { kind: "tool_call" }>).resultText ===
"string" &&
(
(line as Extract<Line, { kind: "tool_call" }>).resultText ?? ""
).trim().length > 0,
) as Extract<Line, { kind: "tool_call" }> | undefined;
const resultText =
lastAssistant?.text ||
lastReasoning?.text ||
lastToolResult?.resultText ||
"";
// Determine result subtype based on how the turn ended
const isAborted = currentAbortController?.signal.aborted;
// isError if: (1) stop reason indicates error, OR (2) we emitted an error during streaming
const isError =
sawStreamError ||
(lastStopReason &&
lastStopReason !== "end_turn" &&
lastStopReason !== "requires_approval");
const subtype: ResultMessage["subtype"] = isAborted
? "interrupted"
: isError
? "error"
: "success";
const resultMsg: ResultMessage = {
type: "result",
subtype,
session_id: sessionId,
duration_ms: Math.round(durationMs),
duration_api_ms: 0, // Not tracked in bidirectional mode
num_turns: numTurns,
result: resultText,
agent_id: agent.id,
conversation_id: conversationId,
run_ids: [],
usage: null,
uuid: `result-${agent.id}-${Date.now()}`,
// Include stop_reason only when subtype is "error" (not "interrupted")
...(subtype === "error" && {
stop_reason:
lastStopReason && lastStopReason !== "end_turn"
? lastStopReason
: "error", // Use "error" if sawStreamError but lastStopReason was end_turn
}),
};
console.log(JSON.stringify(resultMsg));
} catch (error) {
// Use formatErrorDetails for comprehensive error formatting (same as one-shot mode)
const errorDetails = formatErrorDetails(error, agent.id);
const errorMsg: ErrorMessage = {
type: "error",
message: errorDetails,
stop_reason: "error",
session_id: sessionId,
uuid: randomUUID(),
};
console.log(JSON.stringify(errorMsg));
// Also emit a result message with subtype: "error" so SDK knows the turn failed
const errorResultMsg: ResultMessage = {
type: "result",
subtype: "error",
session_id: sessionId,
duration_ms: 0,
duration_api_ms: 0,
num_turns: 0,
result: null,
agent_id: agent.id,
conversation_id: conversationId,
run_ids: [],
usage: null,
uuid: `result-error-${agent.id}-${Date.now()}`,
stop_reason: "error",
};
console.log(JSON.stringify(errorResultMsg));
} finally {
turnInProgress = false;
blockedEmittedThisTurn = false;
currentAbortController = null;
}
continue;
}
// Unknown message type
const errorMsg: ErrorMessage = {
type: "error",
message: `Unknown message type: ${message.type}`,
stop_reason: "error",
session_id: sessionId,
uuid: randomUUID(),
};
console.log(JSON.stringify(errorMsg));
}
// Stdin closed, exit gracefully
setMessageQueueAdder(null);
process.exit(0);
}