import { randomUUID } from "node:crypto"; import type { Letta } from "@letta-ai/letta-client"; import { APIError } from "@letta-ai/letta-client/core/error"; import type { AgentState, MessageCreate, } from "@letta-ai/letta-client/resources/agents/agents"; import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/messages"; import type { StopReasonType } from "@letta-ai/letta-client/resources/runs/runs"; import type { ApprovalDecision, ApprovalResult, } from "./agent/approval-execution"; import { extractConflictDetail, fetchRunErrorDetail, getPreStreamErrorAction, getRetryDelayMs, isApprovalPendingError, isEmptyResponseRetryable, isInvalidToolCallIdsError, parseRetryAfterHeaderMs, shouldRetryRunMetadataError, } from "./agent/approval-recovery"; import { handleBootstrapSessionState } from "./agent/bootstrapHandler"; import { getClient } from "./agent/client"; import { buildClientSkillsPayload } from "./agent/clientSkills"; import { setAgentContext, setConversationId } from "./agent/context"; import { createAgent } from "./agent/create"; import { handleListMessages } from "./agent/listMessagesHandler"; import { ISOLATED_BLOCK_LABELS } from "./agent/memory"; import { getStreamToolContextId, sendMessageStream } from "./agent/message"; import { getModelPresetUpdateForAgent, getModelUpdateArgs, getResumeRefreshArgs, resolveModel, } from "./agent/model"; import { updateAgentLLMConfig, updateAgentSystemPrompt } from "./agent/modify"; import { resolveSkillSourcesSelection } from "./agent/skillSources"; import type { SkillSource } from "./agent/skills"; import { SessionStats } from "./agent/stats"; import type { ParsedCliArgs } from "./cli/args"; import { normalizeConversationShorthandFlags, parseCsvListFlag, parseJsonArrayFlag, parsePositiveIntFlag, resolveImportFlagAlias, } from "./cli/flagUtils"; import { createBuffers, type Line, markIncompleteToolsAsCancelled, toLines, } from "./cli/helpers/accumulator"; import { classifyApprovals } from "./cli/helpers/approvalClassification"; import { createContextTracker } from "./cli/helpers/contextTracker"; import { formatErrorDetails } from "./cli/helpers/errorFormatter"; import { getReflectionSettings, type ReflectionSettings, type ReflectionTrigger, reflectionSettingsToLegacyMode, } from "./cli/helpers/memoryReminder"; import { type QueuedMessage, setMessageQueueAdder, } from "./cli/helpers/messageQueueBridge"; import { type DrainStreamHook, drainStreamWithResume, } from "./cli/helpers/stream"; import { validateConversationDefaultRequiresAgent, validateFlagConflicts, validateRegistryHandleOrThrow, } from "./cli/startupFlagValidation"; import { SYSTEM_REMINDER_CLOSE, SYSTEM_REMINDER_OPEN } from "./constants"; import { computeDiffPreviews } from "./helpers/diffPreview"; import { QueueRuntime } from "./queue/queueRuntime"; import { mergeQueuedTurnInput, type QueuedTurnInput, } from "./queue/turnQueueRuntime"; import { buildSharedReminderParts, prependReminderPartsToContent, } from "./reminders/engine"; import { createSharedReminderState, syncReminderStateFromContextTracker, } from "./reminders/state"; import { settingsManager } from "./settings-manager"; import { isHeadlessAutoAllowTool, isInteractiveApprovalTool, } from "./tools/interactivePolicy"; import { type ExternalToolDefinition, registerExternalTools, setExternalToolExecutor, } from "./tools/manager"; import { clearPersistedClientToolRules } from "./tools/toolset"; import type { AutoApprovalMessage, BootstrapSessionStateRequest, CanUseToolControlRequest, CanUseToolResponse, ControlRequest, ControlResponse, ErrorMessage, ListMessagesControlRequest, MessageWire, QueueLifecycleEvent, RecoverPendingApprovalsControlRequest, RecoveryMessage, ResultMessage, RetryMessage, StreamEvent, SystemInitMessage, } from "./types/protocol"; import { debugLog, debugWarn, isDebugEnabled } from "./utils/debug"; import { markMilestone, measureSinceMilestone, reportAllMilestones, } from "./utils/timing"; // Maximum number of times to retry a turn when the backend // reports an `llm_api_error` stop reason. This helps smooth // over transient LLM/backend issues without requiring the // caller to manually resubmit the prompt. const LLM_API_ERROR_MAX_RETRIES = 3; // Retry config for empty response errors (Opus 4.6 SADs) // Retry 1: same input. Retry 2: with system reminder nudge. const EMPTY_RESPONSE_MAX_RETRIES = 2; // Retry config for 409 "conversation busy" errors (exponential backoff) const CONVERSATION_BUSY_MAX_RETRIES = 3; // 10s -> 20s -> 40s export type BidirectionalQueuedInput = QueuedTurnInput< MessageCreate["content"] >; export function mergeBidirectionalQueuedInput( queued: BidirectionalQueuedInput[], ): MessageCreate["content"] | null { return mergeQueuedTurnInput(queued, { normalizeUserContent: (content) => content, }); } type ReflectionOverrides = { trigger?: ReflectionTrigger; deprecatedBehaviorRaw?: string; stepCount?: number; }; function parseReflectionOverrides( values: ParsedCliArgs["values"], ): ReflectionOverrides { const triggerRaw = values["reflection-trigger"]; const behaviorRaw = values["reflection-behavior"]; const stepCountRaw = values["reflection-step-count"]; if (!triggerRaw && !behaviorRaw && !stepCountRaw) { return {}; } const overrides: ReflectionOverrides = {}; if (triggerRaw !== undefined) { if ( triggerRaw !== "off" && triggerRaw !== "step-count" && triggerRaw !== "compaction-event" ) { throw new Error( `Invalid --reflection-trigger "${triggerRaw}". Valid values: off, step-count, compaction-event`, ); } overrides.trigger = triggerRaw; } if (behaviorRaw !== undefined) { if (behaviorRaw !== "reminder" && behaviorRaw !== "auto-launch") { throw new Error( `Invalid --reflection-behavior "${behaviorRaw}". Valid values: reminder, auto-launch`, ); } overrides.deprecatedBehaviorRaw = behaviorRaw; } if (stepCountRaw !== undefined) { try { overrides.stepCount = parsePositiveIntFlag({ rawValue: stepCountRaw, flagName: "reflection-step-count", }); } catch { throw new Error( `Invalid --reflection-step-count "${stepCountRaw}". Expected a positive integer.`, ); } } return overrides; } function hasReflectionOverrides(overrides: ReflectionOverrides): boolean { return ( overrides.trigger !== undefined || overrides.deprecatedBehaviorRaw !== undefined || overrides.stepCount !== undefined ); } async function applyReflectionOverrides( agentId: string, overrides: ReflectionOverrides, ): Promise { const current = getReflectionSettings(); const merged: ReflectionSettings = { trigger: overrides.trigger ?? current.trigger, stepCount: overrides.stepCount ?? current.stepCount, }; if (!hasReflectionOverrides(overrides)) { return merged; } if (overrides.deprecatedBehaviorRaw !== undefined) { console.warn( "Warning: --reflection-behavior is deprecated and ignored. Reflection now always auto-launches subagents.", ); } const memfsEnabled = settingsManager.isMemfsEnabled(agentId); if (!memfsEnabled && merged.trigger === "compaction-event") { throw new Error( "--reflection-trigger compaction-event requires memfs enabled for this agent.", ); } try { settingsManager.getLocalProjectSettings(); } catch { await settingsManager.loadLocalProjectSettings(); } const legacyMode = reflectionSettingsToLegacyMode(merged); settingsManager.updateLocalProjectSettings({ memoryReminderInterval: legacyMode, reflectionTrigger: merged.trigger, reflectionStepCount: merged.stepCount, }); settingsManager.updateSettings({ memoryReminderInterval: legacyMode, reflectionTrigger: merged.trigger, reflectionStepCount: merged.stepCount, }); return merged; } export async function handleHeadlessCommand( parsedArgs: ParsedCliArgs, model?: string, skillsDirectoryOverride?: string, skillSourcesOverride?: SkillSource[], systemInfoReminderEnabledOverride?: boolean, ) { const { values, positionals } = parsedArgs; // Set tool filter if provided (controls which tools are loaded) if (values.tools !== undefined) { const { toolFilter } = await import("./tools/filter"); toolFilter.setEnabledTools(values.tools); } // Set permission mode if provided (or via --yolo alias) const permissionModeValue = values["permission-mode"]; const yoloMode = values.yolo; if (yoloMode || permissionModeValue) { const { permissionMode } = await import("./permissions/mode"); if (yoloMode) { permissionMode.setMode("bypassPermissions"); } else if (permissionModeValue) { const validModes = [ "default", "acceptEdits", "bypassPermissions", "plan", ]; if (validModes.includes(permissionModeValue)) { permissionMode.setMode( permissionModeValue as | "default" | "acceptEdits" | "bypassPermissions" | "plan", ); } } } // Set CLI permission overrides if provided (inherited from parent agent) if (values.allowedTools || values.disallowedTools) { const { cliPermissions } = await import("./permissions/cli"); if (values.allowedTools) { cliPermissions.setAllowedTools(values.allowedTools); } if (values.disallowedTools) { cliPermissions.setDisallowedTools(values.disallowedTools); } } // Check for input-format early - if stream-json, we don't need a prompt const inputFormat = values["input-format"]; const isBidirectionalMode = inputFormat === "stream-json"; // If headless output is being piped and the downstream closes early (e.g. // `| head`), Node will throw EPIPE on stdout writes. Treat this as a normal // termination rather than crashing with a stack trace. // // Note: this must be registered before any `console.log` in headless mode. process.stdout.on("error", (err: unknown) => { const code = typeof err === "object" && err !== null && "code" in err ? (err as { code?: unknown }).code : undefined; if (code === "EPIPE") { process.exit(0); } // Re-throw unknown stdout errors so they surface during tests/debugging. throw err; }); // Get prompt from either positional args or stdin (unless in bidirectional mode) let prompt = positionals.slice(2).join(" "); // If no prompt provided as args, try reading from stdin (unless in bidirectional mode) if (!prompt && !isBidirectionalMode) { // Check if stdin is available (piped input) if (!process.stdin.isTTY) { const chunks: Buffer[] = []; for await (const chunk of process.stdin) { chunks.push(chunk); } prompt = Buffer.concat(chunks).toString("utf-8").trim(); } } if (!prompt && !isBidirectionalMode) { console.error("Error: No prompt provided"); process.exit(1); } const client = await getClient(); markMilestone("HEADLESS_CLIENT_READY"); // Check for --resume flag (interactive only) if (values.resume) { console.error( "Error: --resume is for interactive mode only (opens conversation selector).\n" + "In headless mode, use:\n" + " --conversation Resume a specific conversation by ID", ); process.exit(1); } // --new: Create a new conversation (for concurrent sessions) let forceNewConversation = values.new ?? false; const fromAgentId = values["from-agent"]; // Resolve agent (same logic as interactive mode) let agent: AgentState | null = null; let specifiedAgentId = values.agent; const specifiedAgentName = values.name; let specifiedConversationId = values.conversation; const forceNew = values["new-agent"]; const systemPromptPreset = values.system; const systemCustom = values["system-custom"]; const embeddingModel = values.embedding; const memoryBlocksJson = values["memory-blocks"]; const blockValueArgs = values["block-value"]; const initBlocksRaw = values["init-blocks"]; const baseToolsRaw = values["base-tools"]; const skillsDirectory = values.skills ?? skillsDirectoryOverride; const noSkillsFlag = values["no-skills"]; const noBundledSkillsFlag = values["no-bundled-skills"]; const skillSourcesRaw = values["skill-sources"]; const memfsFlag = values.memfs; const noMemfsFlag = values["no-memfs"]; // Startup policy for the git-backed memory pull on session init. // "blocking" (default): await the pull before proceeding. // "background": fire the pull async, emit init without waiting. // "skip": skip the pull entirely this session. const memfsStartupRaw = values["memfs-startup"]; const memfsStartupPolicy: "blocking" | "background" | "skip" = memfsStartupRaw === "background" || memfsStartupRaw === "skip" ? memfsStartupRaw : "blocking"; const requestedMemoryPromptMode: "memfs" | "standard" | undefined = memfsFlag ? "memfs" : noMemfsFlag ? "standard" : undefined; const shouldAutoEnableMemfsForNewAgent = !memfsFlag && !noMemfsFlag; const fromAfFile = resolveImportFlagAlias({ importFlagValue: values.import, fromAfFlagValue: values["from-af"], }); const preLoadSkillsRaw = values["pre-load-skills"]; const systemInfoReminderEnabled = systemInfoReminderEnabledOverride ?? !values["no-system-info-reminder"]; const reflectionOverrides = (() => { try { return parseReflectionOverrides(values); } catch (error) { console.error( error instanceof Error ? `Error: ${error.message}` : String(error), ); process.exit(1); } })(); const maxTurnsRaw = values["max-turns"]; const tagsRaw = values.tags; const resolvedSkillSources = (() => { if (skillSourcesOverride) { return skillSourcesOverride; } try { return resolveSkillSourcesSelection({ skillSourcesRaw, noSkills: noSkillsFlag, noBundledSkills: noBundledSkillsFlag, }); } catch (error) { console.error( error instanceof Error ? `Error: ${error.message}` : String(error), ); process.exit(1); } })(); const tags = parseCsvListFlag(tagsRaw); // Parse and validate max-turns if provided let maxTurns: number | undefined; try { maxTurns = parsePositiveIntFlag({ rawValue: maxTurnsRaw, flagName: "max-turns", }); } catch (error) { console.error( `Error: ${error instanceof Error ? error.message : String(error)}`, ); process.exit(1); } if (preLoadSkillsRaw && resolvedSkillSources.length === 0) { console.error( "Error: --pre-load-skills cannot be used when all skill sources are disabled.", ); process.exit(1); } try { const normalized = normalizeConversationShorthandFlags({ specifiedConversationId, specifiedAgentId, }); specifiedConversationId = normalized.specifiedConversationId ?? undefined; specifiedAgentId = normalized.specifiedAgentId ?? undefined; } catch (error) { console.error( error instanceof Error ? `Error: ${error.message}` : String(error), ); process.exit(1); } // Validate --conv default requires --agent (unless --new-agent will create one) try { validateConversationDefaultRequiresAgent({ specifiedConversationId, specifiedAgentId, forceNew, }); } catch (error) { console.error( error instanceof Error ? `Error: ${error.message}` : String(error), ); console.error("Usage: letta --agent agent-xyz --conv default"); console.error(" or: letta --conv agent-xyz (shorthand)"); process.exit(1); } if (fromAgentId) { if (!specifiedAgentId && !specifiedConversationId) { console.error( "Error: --from-agent requires --agent or --conversation .", ); process.exit(1); } if (forceNew) { console.error("Error: --from-agent cannot be used with --new-agent"); process.exit(1); } if (!specifiedConversationId && !forceNewConversation) { forceNewConversation = true; } } // Validate shared mutual-exclusion rules for startup flags. try { validateFlagConflicts({ guard: specifiedConversationId && specifiedConversationId !== "default", checks: [ { when: specifiedAgentId, message: "--conversation cannot be used with --agent", }, { when: specifiedAgentName, message: "--conversation cannot be used with --name", }, { when: forceNew, message: "--conversation cannot be used with --new-agent", }, { when: fromAfFile, message: "--conversation cannot be used with --import", }, ], }); validateFlagConflicts({ guard: forceNewConversation, checks: [ { when: specifiedConversationId, message: "--new cannot be used with --conversation", }, ], }); } catch (error) { console.error( error instanceof Error ? `Error: ${error.message}` : String(error), ); process.exit(1); } // Validate --import flag (also accepts legacy --from-af) // Detect if it's a registry handle (e.g., @author/name) or a local file path let isRegistryImport = false; if (fromAfFile) { try { validateFlagConflicts({ guard: fromAfFile, checks: [ { when: specifiedAgentId, message: "--import cannot be used with --agent", }, { when: specifiedAgentName, message: "--import cannot be used with --name", }, { when: forceNew, message: "--import cannot be used with --new-agent", }, ], }); } catch (error) { console.error( error instanceof Error ? `Error: ${error.message}` : String(error), ); process.exit(1); } // Check if this looks like a registry handle (@author/name) if (fromAfFile.startsWith("@")) { // Definitely a registry handle isRegistryImport = true; // Validate handle format try { validateRegistryHandleOrThrow(fromAfFile); } catch { console.error( `Error: Invalid registry handle "${fromAfFile}". Use format: letta --import @author/agentname`, ); process.exit(1); } } } // Validate --name flag if (specifiedAgentName) { if (specifiedAgentId) { console.error("Error: --name cannot be used with --agent"); process.exit(1); } if (forceNew) { console.error("Error: --name cannot be used with --new-agent"); process.exit(1); } } if (initBlocksRaw && !forceNew) { console.error( "Error: --init-blocks can only be used together with --new to control initial memory blocks.", ); process.exit(1); } const initBlocks = parseCsvListFlag(initBlocksRaw); if (baseToolsRaw && !forceNew) { console.error( "Error: --base-tools can only be used together with --new to control initial base tools.", ); process.exit(1); } const baseTools = parseCsvListFlag(baseToolsRaw); // Validate system prompt options (--system and --system-custom are mutually exclusive) if (systemPromptPreset && systemCustom) { console.error( "Error: --system and --system-custom are mutually exclusive. Use one or the other.", ); process.exit(1); } // Parse memory blocks JSON if provided // Supports two formats: // - CreateBlock: { label: string, value: string, description?: string } // - BlockReference: { blockId: string } let memoryBlocks: | Array< | { label: string; value: string; description?: string } | { blockId: string } > | undefined; if (memoryBlocksJson !== undefined) { if (!forceNew) { console.error( "Error: --memory-blocks can only be used together with --new to provide initial memory blocks.", ); process.exit(1); } try { memoryBlocks = parseJsonArrayFlag(memoryBlocksJson, "memory-blocks") as | Array<{ label: string; value: string; description?: string }> | Array<{ blockId: string }>; // Validate each block has required fields for (const block of memoryBlocks) { const hasBlockId = "blockId" in block && typeof block.blockId === "string"; const hasLabelValue = "label" in block && "value" in block && typeof block.label === "string" && typeof block.value === "string"; if (!hasBlockId && !hasLabelValue) { throw new Error( "Each memory block must have either 'blockId' (string) or 'label' and 'value' (strings)", ); } } } catch (error) { console.error( `Error: ${error instanceof Error ? error.message : String(error)}`, ); process.exit(1); } } // Parse --block-value args (format: label=value) let blockValues: Record | undefined; if (blockValueArgs && blockValueArgs.length > 0) { if (!forceNew) { console.error( "Error: --block-value can only be used together with --new to set block values.", ); process.exit(1); } blockValues = {}; for (const arg of blockValueArgs) { const eqIndex = arg.indexOf("="); if (eqIndex === -1) { console.error( `Error: Invalid --block-value format "${arg}". Expected format: label=value`, ); process.exit(1); } const label = arg.slice(0, eqIndex); const value = arg.slice(eqIndex + 1); blockValues[label] = value; } } // Priority 0: --conversation derives agent from conversation ID. // "default" is a virtual agent-scoped conversation (not a retrievable conv-*). // It requires --agent and should not hit conversations.retrieve(). if (specifiedConversationId && specifiedConversationId !== "default") { try { debugLog( "conversations", `retrieve(${specifiedConversationId}) [headless convβ†’agent lookup]`, ); const conversation = await client.conversations.retrieve( specifiedConversationId, ); agent = await client.agents.retrieve(conversation.agent_id); } catch (_error) { console.error(`Conversation ${specifiedConversationId} not found`); process.exit(1); } } // Priority 1: Import from AgentFile template (local file or registry) if (!agent && fromAfFile) { let result: { agent: AgentState; skills?: string[] }; if (isRegistryImport) { // Import from letta-ai/agent-file registry const { importAgentFromRegistry } = await import("./agent/import"); result = await importAgentFromRegistry({ handle: fromAfFile, modelOverride: model, stripMessages: true, stripSkills: false, }); } else { // Import from local file const { importAgentFromFile } = await import("./agent/import"); result = await importAgentFromFile({ filePath: fromAfFile, modelOverride: model, stripMessages: true, stripSkills: false, }); } agent = result.agent; // Mark imported agents as "custom" to prevent legacy auto-migration // from overwriting their system prompt on resume. if (settingsManager.isReady) { settingsManager.setSystemPromptPreset(agent.id, "custom"); } // Display extracted skills summary if (result.skills && result.skills.length > 0) { const { getAgentSkillsDir } = await import("./agent/skills"); const skillsDir = getAgentSkillsDir(agent.id); console.log( `πŸ“¦ Extracted ${result.skills.length} skill${result.skills.length === 1 ? "" : "s"} to ${skillsDir}: ${result.skills.join(", ")}`, ); } } // Priority 2: Try to use --agent specified ID if (!agent && specifiedAgentId) { try { agent = await client.agents.retrieve(specifiedAgentId); } catch (_error) { console.error(`Agent ${specifiedAgentId} not found`); process.exit(1); } } // Priority 3: Check if --new flag was passed (skip all resume logic) if (!agent && forceNew) { const updateArgs = getModelUpdateArgs(model); // Pre-determine memfs mode so the agent is created with the correct prompt. const { isLettaCloud, enableMemfsIfCloud } = await import( "./agent/memoryFilesystem" ); const willAutoEnableMemfs = shouldAutoEnableMemfsForNewAgent && (await isLettaCloud()); const effectiveMemoryMode = requestedMemoryPromptMode ?? (willAutoEnableMemfs ? "memfs" : undefined); const createOptions = { model, embeddingModel, updateArgs, skillsDirectory, parallelToolCalls: true, systemPromptPreset, systemPromptCustom: systemCustom, memoryPromptMode: effectiveMemoryMode, initBlocks, baseTools, memoryBlocks, blockValues, tags, }; const result = await createAgent(createOptions); agent = result.agent; // Enable memfs on Letta Cloud (tags, repo clone, tool detach). if (willAutoEnableMemfs) { await enableMemfsIfCloud(agent.id); } } // Priority 4: Try to resume from project settings (.letta/settings.local.json) if (!agent) { await settingsManager.loadLocalProjectSettings(); const localAgentId = settingsManager.getLocalLastAgentId(process.cwd()); if (localAgentId) { try { agent = await client.agents.retrieve(localAgentId); } catch (_error) { // Local LRU agent doesn't exist - log and continue console.error(`Unable to locate agent ${localAgentId} in .letta/`); } } } // Priority 5: Try to reuse global LRU (covers directory-switching case) // Do NOT restore global conversation β€” use default (project-scoped conversations) if (!agent) { const globalAgentId = settingsManager.getGlobalLastAgentId(); if (globalAgentId) { try { agent = await client.agents.retrieve(globalAgentId); } catch (_error) { // Global LRU agent doesn't exist } } } // Priority 6: Fresh user with no LRU - create default agent if (!agent) { const { ensureDefaultAgents } = await import("./agent/defaults"); const defaultAgent = await ensureDefaultAgents(client, { preferredModel: model, }); if (defaultAgent) { agent = defaultAgent; } } // All paths should have resolved to an agent by now if (!agent) { console.error("No agent found. Use --new-agent to create a new agent."); process.exit(1); } markMilestone("HEADLESS_AGENT_RESOLVED"); // Check if we're resuming an existing agent (not creating a new one) const isResumingAgent = !!(specifiedAgentId || (!forceNew && !fromAfFile)); // If resuming, always refresh model settings from presets to keep // preset-derived fields in sync, then apply optional command-line // overrides (model/system prompt). if (isResumingAgent) { if (model) { const modelHandle = resolveModel(model); if (typeof modelHandle !== "string") { console.error(`Error: Invalid model "${model}"`); process.exit(1); } // Always apply model update - different model IDs can share the same // handle but have different settings (e.g., gpt-5.2-medium vs gpt-5.2-xhigh) const updateArgs = getModelUpdateArgs(model); agent = await updateAgentLLMConfig(agent.id, modelHandle, updateArgs); } else { const presetRefresh = getModelPresetUpdateForAgent(agent); if (presetRefresh) { const { updateArgs: resumeRefreshUpdateArgs, needsUpdate } = getResumeRefreshArgs(presetRefresh.updateArgs, agent); if (needsUpdate) { agent = await updateAgentLLMConfig( agent.id, presetRefresh.modelHandle, resumeRefreshUpdateArgs, { preserveContextWindow: true }, ); } } } } // Determine which conversation to use let conversationId: string; let effectiveReflectionSettings: ReflectionSettings; const isSubagent = process.env.LETTA_CODE_AGENT_ROLE === "subagent"; // Captured so prompt logic below can await it when needed. let memfsBgPromise: Promise | undefined; // Apply memfs flags and auto-enable from server tag when local settings are missing. // Respects memfsStartupPolicy: // "blocking" (default) – await the pull; exit on conflict. // "background" – fire pull async; session init proceeds immediately. // "skip" – skip the pull this session. if (memfsStartupPolicy === "skip") { // Run enable/disable logic but skip the git pull. try { const { applyMemfsFlags } = await import("./agent/memoryFilesystem"); await applyMemfsFlags(agent.id, memfsFlag, noMemfsFlag, { pullOnExistingRepo: false, agentTags: agent.tags, skipPromptUpdate: forceNew, }); } catch (error) { console.error( `Memory flags failed: ${error instanceof Error ? error.message : String(error)}`, ); process.exit(1); } } else if (memfsStartupPolicy === "background") { // Fire pull async; don't block session initialisation. const { applyMemfsFlags } = await import("./agent/memoryFilesystem"); memfsBgPromise = applyMemfsFlags(agent.id, memfsFlag, noMemfsFlag, { pullOnExistingRepo: true, agentTags: agent.tags, skipPromptUpdate: forceNew, }).catch((error) => { // Log to stderr only β€” the session is already live. console.error( `[memfs background pull] ${error instanceof Error ? error.message : String(error)}`, ); }); } else { // "blocking" β€” original behaviour. try { const { applyMemfsFlags } = await import("./agent/memoryFilesystem"); const memfsResult = await applyMemfsFlags( agent.id, memfsFlag, noMemfsFlag, { pullOnExistingRepo: true, agentTags: agent.tags, skipPromptUpdate: forceNew, }, ); if (memfsResult.pullSummary?.includes("CONFLICT")) { console.error( "Memory has merge conflicts. Run in interactive mode to resolve.", ); process.exit(1); } } catch (error) { console.error( `Memory git sync failed: ${error instanceof Error ? error.message : String(error)}`, ); process.exit(1); } } // Ensure background memfs sync settles before prompt logic reads isMemfsEnabled(). if (memfsBgPromise && isResumingAgent) { await memfsBgPromise; } // Apply --system flag after memfs sync so isMemfsEnabled() is up to date. if (isResumingAgent && systemPromptPreset) { const result = await updateAgentSystemPrompt(agent.id, systemPromptPreset); if (!result.success || !result.agent) { console.error(`Failed to update system prompt: ${result.message}`); process.exit(1); } agent = result.agent; } // Auto-heal system prompt drift (rebuild from stored recipe). // Runs after memfs sync so isMemfsEnabled() reflects the final state. if (isResumingAgent && !systemPromptPreset) { let storedPreset = settingsManager.getSystemPromptPreset(agent.id); // Adopt legacy agents (created before recipe tracking) as "custom" // so their prompts are left untouched by auto-heal. if ( !storedPreset && (agent.tags?.includes("origin:letta-code") || agent.tags?.includes("origin:lettabot")) && !agent.tags?.includes("role:subagent") ) { storedPreset = "custom"; settingsManager.setSystemPromptPreset(agent.id, storedPreset); } if (storedPreset && storedPreset !== "custom") { const { buildSystemPrompt: rebuildPrompt, isKnownPreset: isKnown } = await import("./agent/promptAssets"); if (isKnown(storedPreset)) { const memoryMode = settingsManager.isMemfsEnabled(agent.id) ? "memfs" : "standard"; const expected = rebuildPrompt(storedPreset, memoryMode); if (agent.system !== expected) { const client = await getClient(); await client.agents.update(agent.id, { system: expected }); agent = await client.agents.retrieve(agent.id); } } else { settingsManager.clearSystemPromptPreset(agent.id); } } } const startupAgentId = agent.id; void clearPersistedClientToolRules(startupAgentId) .then((cleanup) => { if (cleanup) { const count = cleanup.removedToolNames.length; const names = cleanup.removedToolNames.join(", "); debugLog( "headless startup", `Cleared ${count} persisted client tool rule${count === 1 ? "" : "s"} for ${startupAgentId}${count > 0 ? `: ${names}` : ""}`, ); return; } debugLog( "headless startup", `No persisted client tool rules to clear for ${startupAgentId}`, ); }) .catch((error) => { debugWarn( "headless startup", `Failed to clear persisted client tool rules for ${startupAgentId}: ${error instanceof Error ? error.message : String(error)}`, ); }); try { effectiveReflectionSettings = await applyReflectionOverrides( agent.id, reflectionOverrides, ); } catch (error) { console.error( `Failed to apply sleeptime settings: ${error instanceof Error ? error.message : String(error)}`, ); process.exit(1); } // Determine which blocks to isolate for the conversation const isolatedBlockLabels: string[] = initBlocks === undefined ? [...ISOLATED_BLOCK_LABELS] : ISOLATED_BLOCK_LABELS.filter((label) => initBlocks.includes(label as string), ); if (specifiedConversationId) { if (specifiedConversationId === "default") { // "default" is the agent's primary message history (no explicit conversation) // Don't validate - just use it directly conversationId = "default"; } else { // User specified an explicit conversation to resume - validate it exists try { debugLog( "conversations", `retrieve(${specifiedConversationId}) [headless --conv validate]`, ); await client.conversations.retrieve(specifiedConversationId); conversationId = specifiedConversationId; } catch { console.error( `Error: Conversation ${specifiedConversationId} not found`, ); process.exit(1); } } } else if (forceNewConversation) { // --new flag: create a new conversation (for concurrent sessions) const conversation = await client.conversations.create({ agent_id: agent.id, isolated_block_labels: isolatedBlockLabels, }); conversationId = conversation.id; } else if (isSubagent) { // Freshly created subagents have no concurrency risk β€” use the default // conversation so it's easy to inspect in the ADE. conversationId = "default"; } else { // Default for headless: always create a new conversation to avoid // 409 "conversation busy" races (e.g., parent agent calling letta -p). // Use --conv default to explicitly target the agent's // primary conversation. const conversation = await client.conversations.create({ agent_id: agent.id, isolated_block_labels: isolatedBlockLabels, }); conversationId = conversation.id; } markMilestone("HEADLESS_CONVERSATION_READY"); // Set conversation ID in context for tools (e.g., Skill tool) to access setConversationId(conversationId); // Save session (agent + conversation) to both project and global settings // Skip for subagents - they shouldn't pollute the LRU settings if (!isSubagent) { await settingsManager.loadLocalProjectSettings(); settingsManager.setLocalLastSession( { agentId: agent.id, conversationId }, process.cwd(), ); settingsManager.setGlobalLastSession({ agentId: agent.id, conversationId, }); } // Set agent context for tools that need it (e.g., Skill tool, Task tool) setAgentContext(agent.id, skillsDirectory, resolvedSkillSources); // Validate output format const outputFormat = values["output-format"] || "text"; const includePartialMessages = Boolean(values["include-partial-messages"]); if (!["text", "json", "stream-json"].includes(outputFormat)) { console.error( `Error: Invalid output format "${outputFormat}". Valid formats: text, json, stream-json`, ); process.exit(1); } if (inputFormat && inputFormat !== "stream-json") { console.error( `Error: Invalid input format "${inputFormat}". Valid formats: stream-json`, ); process.exit(1); } const { getClientToolsFromRegistry } = await import("./tools/manager"); const loadedToolNames = getClientToolsFromRegistry().map((t) => t.name); const availableTools = loadedToolNames.length > 0 ? loadedToolNames : agent.tools?.map((t) => t.name).filter((n): n is string => !!n) || []; // If input-format is stream-json, use bidirectional mode if (isBidirectionalMode) { await runBidirectionalMode( agent, conversationId, client, outputFormat, includePartialMessages, availableTools, resolvedSkillSources, systemInfoReminderEnabled, effectiveReflectionSettings, ); return; } // Create buffers to accumulate stream (pass agent.id for server-side tool hooks) const buffers = createBuffers(agent.id); // Initialize session stats const sessionStats = new SessionStats(); // Use agent.id as session_id for all stream-json messages const sessionId = agent.id; // Output init event for stream-json format if (outputFormat === "stream-json") { const initEvent: SystemInitMessage = { type: "system", subtype: "init", session_id: sessionId, agent_id: agent.id, conversation_id: conversationId, model: agent.llm_config?.model ?? "", tools: availableTools, cwd: process.cwd(), mcp_servers: [], permission_mode: "", slash_commands: [], memfs_enabled: settingsManager.isMemfsEnabled(agent.id), skill_sources: resolvedSkillSources, system_info_reminder_enabled: systemInfoReminderEnabled, reflection_trigger: effectiveReflectionSettings.trigger, reflection_step_count: effectiveReflectionSettings.stepCount, uuid: `init-${agent.id}`, }; console.log(JSON.stringify(initEvent)); } const reminderContextTracker = createContextTracker(); const sharedReminderState = createSharedReminderState(); // Helper to resolve any pending approvals before sending user input const resolveAllPendingApprovals = async () => { const { getResumeData } = await import("./agent/check-approval"); while (true) { // Re-fetch agent to get latest in-context messages (source of truth for backend) const freshAgent = await client.agents.retrieve(agent.id); let resume: Awaited>; try { resume = await getResumeData(client, freshAgent, conversationId); } catch (error) { // Treat 404/422 as "no approvals" - stale message/conversation state if ( error instanceof APIError && (error.status === 404 || error.status === 422) ) { break; } throw error; } // Use plural field for parallel tool calls const pendingApprovals = resume.pendingApprovals || []; if (pendingApprovals.length === 0) break; // Phase 1: Collect decisions for all approvals type Decision = | { type: "approve"; approval: { toolCallId: string; toolName: string; toolArgs: string; }; reason: string; matchedRule: string; } | { type: "deny"; approval: { toolCallId: string; toolName: string; toolArgs: string; }; reason: string; }; const { autoAllowed, autoDenied } = await classifyApprovals( pendingApprovals, { alwaysRequiresUserInput: isInteractiveApprovalTool, treatAskAsDeny: true, denyReasonForAsk: "Tool requires approval (headless mode)", requireArgsForAutoApprove: true, missingNameReason: "Tool call incomplete - missing name", }, ); const decisions: Decision[] = [ ...autoAllowed.map((ac) => ({ type: "approve" as const, approval: ac.approval, reason: ac.permission.reason || "Allowed by permission rule", matchedRule: "matchedRule" in ac.permission && ac.permission.matchedRule ? ac.permission.matchedRule : "auto-approved", })), ...autoDenied.map((ac) => { const fallback = "matchedRule" in ac.permission && ac.permission.matchedRule ? `Permission denied: ${ac.permission.matchedRule}` : ac.permission.reason ? `Permission denied: ${ac.permission.reason}` : "Permission denied: Unknown reason"; return { type: "deny" as const, approval: ac.approval, reason: ac.denyReason ?? fallback, }; }), ]; // Phase 2: Execute approved tools and format results using shared function const { executeApprovalBatch } = await import( "./agent/approval-execution" ); // Emit auto_approval events for stream-json format if (outputFormat === "stream-json") { for (const decision of decisions) { if (decision.type === "approve") { const autoApprovalMsg: AutoApprovalMessage = { type: "auto_approval", tool_call: { name: decision.approval.toolName, tool_call_id: decision.approval.toolCallId, arguments: decision.approval.toolArgs, }, reason: decision.reason, matched_rule: decision.matchedRule, session_id: sessionId, uuid: `auto-approval-${decision.approval.toolCallId}`, }; console.log(JSON.stringify(autoApprovalMsg)); } } } const executedResults = await executeApprovalBatch(decisions); // Send all results in one batch const approvalInput: ApprovalCreate = { type: "approval", approvals: executedResults as ApprovalResult[], otid: randomUUID(), }; // Inject queued skill content as user message parts (LET-7353) const approvalMessages: Array< | import("@letta-ai/letta-client/resources/agents/agents").MessageCreate | import("@letta-ai/letta-client/resources/agents/messages").ApprovalCreate > = [approvalInput]; { const { consumeQueuedSkillContent } = await import( "./tools/impl/skillContentRegistry" ); const skillContents = consumeQueuedSkillContent(); if (skillContents.length > 0) { approvalMessages.push({ role: "user" as const, content: skillContents.map((sc) => ({ type: "text" as const, text: sc.content, })), otid: randomUUID(), }); } } // Send the approval to clear the pending state; drain the stream without output const approvalStream = await sendMessageStream( conversationId, approvalMessages, { agentId: agent.id }, ); const drainResult = await drainStreamWithResume( approvalStream, createBuffers(agent.id), () => {}, undefined, undefined, undefined, reminderContextTracker, ); // If the approval drain errored or was cancelled, abort rather than // looping back and re-fetching approvals (which would restart the cycle). if ( drainResult.stopReason === "error" || drainResult.stopReason === "cancelled" ) { throw new Error( `Approval drain ended with stop reason: ${drainResult.stopReason}`, ); } } }; // Clear any pending approvals before starting a new turn - ONLY when resuming (LET-7101) // For new agents/conversations, lazy recovery handles any edge cases if (isResumingAgent) { await resolveAllPendingApprovals(); } // Build message content with reminders const contentParts: MessageCreate["content"] = []; const pushPart = (text: string) => { if (!text) return; contentParts.push({ type: "text", text }); }; if (fromAgentId) { const senderAgentId = fromAgentId; const senderAgent = await client.agents.retrieve(senderAgentId); const systemReminder = `${SYSTEM_REMINDER_OPEN} This message is from "${senderAgent.name}" (agent ID: ${senderAgentId}), an agent currently running inside the Letta Code CLI (docs.letta.com/letta-code). The sender will only see the final message you generate (not tool calls or reasoning). If you need to share detailed information, include it in your response text. ${SYSTEM_REMINDER_CLOSE} `; pushPart(systemReminder); } syncReminderStateFromContextTracker( sharedReminderState, reminderContextTracker, ); const lastRunAt = (agent as { last_run_completion?: string }) .last_run_completion; const { parts: sharedReminderParts } = await buildSharedReminderParts({ mode: isSubagent ? "subagent" : "headless-one-shot", agent: { id: agent.id, name: agent.name, description: agent.description, lastRunAt: lastRunAt ?? null, }, state: sharedReminderState, sessionContextReminderEnabled: systemInfoReminderEnabled, reflectionSettings: effectiveReflectionSettings, skillSources: resolvedSkillSources, resolvePlanModeReminder: async () => { const { PLAN_MODE_REMINDER } = await import("./agent/promptAssets"); return PLAN_MODE_REMINDER; }, }); for (const part of sharedReminderParts) { pushPart(part.text); } // Pre-load specific skills' full content (used by subagents with skills: field) if (preLoadSkillsRaw) { const { readFile: readFileAsync } = await import("node:fs/promises"); const { skillPathById } = await buildClientSkillsPayload({ agentId: agent.id, skillSources: resolvedSkillSources, logger: (message) => { if (isDebugEnabled()) { console.warn(`[DEBUG] ${message}`); } }, }); const skillIds = preLoadSkillsRaw .split(",") .map((s) => s.trim()) .filter(Boolean); const loadedContents: string[] = []; for (const skillId of skillIds) { const skillPath = skillPathById[skillId]; if (!skillPath) continue; try { const content = await readFileAsync(skillPath, "utf-8"); loadedContents.push(`<${skillId}>\n${content}\n`); } catch { // Skill file not readable, skip } } if (loadedContents.length > 0) { pushPart( `\n${loadedContents.join("\n\n")}\n`, ); } } // Add user prompt pushPart(prompt); // Start with the user message let currentInput: Array = [ { role: "user", content: contentParts, otid: randomUUID(), }, ]; const refreshCurrentInputOtids = () => { // Terminal stop-reason retries are NEW requests and must not reuse OTIDs. currentInput = currentInput.map((item) => ({ ...item, otid: randomUUID(), })); }; // Track lastRunId outside the while loop so it's available in catch block let lastKnownRunId: string | null = null; let llmApiErrorRetries = 0; let emptyResponseRetries = 0; let conversationBusyRetries = 0; markMilestone("HEADLESS_FIRST_STREAM_START"); measureSinceMilestone("headless-setup-total", "HEADLESS_CLIENT_READY"); // Helper to check max turns limit using server-side step count from buffers const checkMaxTurns = () => { if (maxTurns !== undefined && buffers.usage.stepCount >= maxTurns) { if (outputFormat === "stream-json") { const errorMsg: ErrorMessage = { type: "error", message: `Maximum turns limit reached (${buffers.usage.stepCount}/${maxTurns} steps)`, stop_reason: "max_steps", session_id: sessionId, uuid: `error-max-turns-${randomUUID()}`, }; console.log(JSON.stringify(errorMsg)); } else { console.error( `Maximum turns limit reached (${buffers.usage.stepCount}/${maxTurns} steps)`, ); } process.exit(1); } }; try { while (true) { // Check max turns limit before starting a new turn (uses server-side step count) checkMaxTurns(); // Inject queued skill content as user message parts (LET-7353) { const { consumeQueuedSkillContent } = await import( "./tools/impl/skillContentRegistry" ); const skillContents = consumeQueuedSkillContent(); if (skillContents.length > 0) { currentInput = [ ...currentInput, { role: "user" as const, content: skillContents.map((sc) => ({ type: "text" as const, text: sc.content, })), otid: randomUUID(), }, ]; } } // Wrap sendMessageStream in try-catch to handle pre-stream errors (e.g., 409) let stream: Awaited>; let turnToolContextId: string | null = null; try { stream = await sendMessageStream(conversationId, currentInput, { agentId: agent.id, }); turnToolContextId = getStreamToolContextId(stream); } catch (preStreamError) { // Extract error detail using shared helper (handles nested/direct/message shapes) const errorDetail = extractConflictDetail(preStreamError); const preStreamAction = getPreStreamErrorAction( errorDetail, conversationBusyRetries, CONVERSATION_BUSY_MAX_RETRIES, { status: preStreamError instanceof APIError ? preStreamError.status : undefined, transientRetries: llmApiErrorRetries, maxTransientRetries: LLM_API_ERROR_MAX_RETRIES, }, ); // Check for pending approval blocking new messages - resolve and retry. // This is distinct from "conversation busy" and needs approval resolution, // not just a timed delay. if (preStreamAction === "resolve_approval_pending") { if (outputFormat === "stream-json") { const recoveryMsg: RecoveryMessage = { type: "recovery", recovery_type: "approval_pending", message: "Detected pending approval conflict on send; resolving before retry", session_id: sessionId, uuid: `recovery-pre-stream-${randomUUID()}`, }; console.log(JSON.stringify(recoveryMsg)); } else { console.error( "Pending approval detected, resolving before retry...", ); } await resolveAllPendingApprovals(); continue; } // Check for 409 "conversation busy" - wait and retry. // Stream resume is not attempted here: without OTID validation we // cannot confirm the in-flight run belongs to this request (e.g. two // terminals on the same agent). App.tsx handles resume with proper // context via discoverFallbackRunIdWithTimeout. if (preStreamAction === "retry_conversation_busy") { conversationBusyRetries += 1; const retryDelayMs = getRetryDelayMs({ category: "conversation_busy", attempt: conversationBusyRetries, }); if (outputFormat === "stream-json") { const retryMsg: RetryMessage = { type: "retry", reason: "error", attempt: conversationBusyRetries, max_attempts: CONVERSATION_BUSY_MAX_RETRIES, delay_ms: retryDelayMs, session_id: sessionId, uuid: `retry-conversation-busy-${randomUUID()}`, }; console.log(JSON.stringify(retryMsg)); } else { console.error( `Conversation is busy, waiting ${Math.round(retryDelayMs / 1000)}s and retrying...`, ); } await new Promise((resolve) => setTimeout(resolve, retryDelayMs)); continue; } if (preStreamAction === "retry_transient") { const attempt = llmApiErrorRetries + 1; const retryAfterMs = preStreamError instanceof APIError ? parseRetryAfterHeaderMs( preStreamError.headers?.get("retry-after"), ) : null; const delayMs = getRetryDelayMs({ category: "transient_provider", attempt, detail: errorDetail, retryAfterMs, }); llmApiErrorRetries = attempt; if (outputFormat === "stream-json") { const retryMsg: RetryMessage = { type: "retry", reason: "llm_api_error", attempt, max_attempts: LLM_API_ERROR_MAX_RETRIES, delay_ms: delayMs, session_id: sessionId, uuid: `retry-pre-stream-${randomUUID()}`, }; console.log(JSON.stringify(retryMsg)); } else { const delaySeconds = Math.round(delayMs / 1000); console.error( `Transient API error before streaming (attempt ${attempt} of ${LLM_API_ERROR_MAX_RETRIES}), retrying in ${delaySeconds}s...`, ); } await new Promise((resolve) => setTimeout(resolve, delayMs)); conversationBusyRetries = 0; continue; } // Reset conversation busy retry counter on other errors conversationBusyRetries = 0; // Re-throw to outer catch for other errors throw preStreamError; } // For stream-json, output each chunk as it arrives let stopReason: StopReasonType | null = null; let approvals: Array<{ toolCallId: string; toolName: string; toolArgs: string; }> = []; let apiDurationMs: number; let lastRunId: string | null = null; let approvalPendingRecovery = false; if (outputFormat === "stream-json") { // Track approval requests across streamed chunks const autoApprovalEmitted = new Set(); const streamJsonHook: DrainStreamHook = async ({ chunk, shouldOutput, errorInfo, updatedApproval, }) => { let shouldOutputChunk = shouldOutput; if (errorInfo && shouldOutput) { const errorEvent: ErrorMessage = { type: "error", message: errorInfo.message, stop_reason: "error", run_id: errorInfo.run_id, session_id: sessionId, uuid: randomUUID(), ...(errorInfo.error_type && errorInfo.run_id && { api_error: { message_type: "error_message", message: errorInfo.message, error_type: errorInfo.error_type, detail: errorInfo.detail, run_id: errorInfo.run_id, }, }), }; console.log(JSON.stringify(errorEvent)); shouldOutputChunk = false; } // Detect server conflict due to pending approval; handle it and retry // Check both detail and message fields since error formats vary if ( isApprovalPendingError(errorInfo?.detail) || isApprovalPendingError(errorInfo?.message) ) { const recoveryRunId = errorInfo?.run_id; const recoveryMsg: RecoveryMessage = { type: "recovery", recovery_type: "approval_pending", message: "Detected pending approval conflict; auto-denying stale approval and retrying", run_id: recoveryRunId ?? undefined, session_id: sessionId, uuid: `recovery-${recoveryRunId || randomUUID()}`, }; console.log(JSON.stringify(recoveryMsg)); approvalPendingRecovery = true; return { stopReason: "error", shouldAccumulate: true }; } // Check if this approval will be auto-approved. Dedup per tool_call_id if ( updatedApproval && !autoApprovalEmitted.has(updatedApproval.toolCallId) ) { const { autoAllowed } = await classifyApprovals([updatedApproval], { alwaysRequiresUserInput: isInteractiveApprovalTool, requireArgsForAutoApprove: true, missingNameReason: "Tool call incomplete - missing name", }); const [approval] = autoAllowed; if (approval) { const permission = approval.permission; shouldOutputChunk = false; const autoApprovalMsg: AutoApprovalMessage = { type: "auto_approval", tool_call: { name: approval.approval.toolName, tool_call_id: approval.approval.toolCallId, arguments: approval.approval.toolArgs || "{}", }, reason: permission.reason || "Allowed by permission rule", matched_rule: "matchedRule" in permission && permission.matchedRule ? permission.matchedRule : "auto-approved", session_id: sessionId, uuid: `auto-approval-${approval.approval.toolCallId}`, }; console.log(JSON.stringify(autoApprovalMsg)); autoApprovalEmitted.add(approval.approval.toolCallId); } } if (shouldOutputChunk) { const chunkWithIds = chunk as typeof chunk & { otid?: string; id?: string; }; const uuid = chunkWithIds.otid || chunkWithIds.id; if (includePartialMessages) { const streamEvent: StreamEvent = { type: "stream_event", event: chunk, session_id: sessionId, uuid: uuid || randomUUID(), }; console.log(JSON.stringify(streamEvent)); } else { const msg: MessageWire = { type: "message", ...chunk, session_id: sessionId, uuid: uuid || randomUUID(), }; console.log(JSON.stringify(msg)); } } return { shouldOutput: shouldOutputChunk, shouldAccumulate: true }; }; const result = await drainStreamWithResume( stream, buffers, () => {}, undefined, undefined, streamJsonHook, reminderContextTracker, ); stopReason = result.stopReason; approvals = result.approvals || []; apiDurationMs = result.apiDurationMs; lastRunId = result.lastRunId || null; if (lastRunId) lastKnownRunId = lastRunId; } else { // Normal mode: use drainStreamWithResume const result = await drainStreamWithResume( stream, buffers, () => {}, // No UI refresh needed in headless mode undefined, undefined, undefined, reminderContextTracker, ); stopReason = result.stopReason; approvals = result.approvals || []; apiDurationMs = result.apiDurationMs; lastRunId = result.lastRunId || null; if (lastRunId) lastKnownRunId = lastRunId; } // Track API duration for this stream sessionStats.endTurn(apiDurationMs); // Check max turns after each turn (server may have taken multiple steps) checkMaxTurns(); if (approvalPendingRecovery) { await resolveAllPendingApprovals(); continue; } // Case 1: Turn ended normally if (stopReason === "end_turn") { // Reset retry counters on success llmApiErrorRetries = 0; emptyResponseRetries = 0; conversationBusyRetries = 0; break; } // Case 2: Requires approval - batch process all approvals if (stopReason === "requires_approval") { if (approvals.length === 0) { console.error("Unexpected empty approvals array"); process.exit(1); } // Phase 1: Collect decisions for all approvals type Decision = | { type: "approve"; approval: { toolCallId: string; toolName: string; toolArgs: string; }; } | { type: "deny"; approval: { toolCallId: string; toolName: string; toolArgs: string; }; reason: string; }; const { autoAllowed, autoDenied, needsUserInput } = await classifyApprovals(approvals, { alwaysRequiresUserInput: isInteractiveApprovalTool, requireArgsForAutoApprove: true, missingNameReason: "Tool call incomplete - missing name", }); const decisions: Decision[] = [ ...autoAllowed.map((ac) => ({ type: "approve" as const, approval: ac.approval, })), ...needsUserInput.map((ac) => { // One-shot headless mode has no control channel for interactive // approvals. Auto-allow plan-mode entry/exit tools, while denying // tools that need runtime user responses. if (isHeadlessAutoAllowTool(ac.approval.toolName)) { return { type: "approve" as const, approval: ac.approval, }; } return { type: "deny" as const, approval: ac.approval, reason: "Tool requires approval (headless mode)", }; }), ...autoDenied.map((ac) => { const fallback = "matchedRule" in ac.permission && ac.permission.matchedRule ? `Permission denied: ${ac.permission.matchedRule}` : ac.permission.reason ? `Permission denied: ${ac.permission.reason}` : "Permission denied: Unknown reason"; return { type: "deny" as const, approval: ac.approval, reason: ac.denyReason ?? fallback, }; }), ]; // Phase 2: Execute all approved tools and format results using shared function const { executeApprovalBatch } = await import( "./agent/approval-execution" ); const executedResults = await executeApprovalBatch( decisions, undefined, { toolContextId: turnToolContextId ?? undefined, }, ); // Send all results in one batch const approvalInputWithOtid = { type: "approval" as const, approvals: executedResults as ApprovalResult[], otid: randomUUID(), }; currentInput = [approvalInputWithOtid]; continue; } // Cache latest error text for this turn let latestErrorText: string | null = null; const linesForTurn = toLines(buffers); for (let i = linesForTurn.length - 1; i >= 0; i -= 1) { const line = linesForTurn[i]; if ( line?.kind === "error" && "text" in line && typeof line.text === "string" ) { latestErrorText = line.text; break; } } // Fetch run error detail for invalid tool call ID detection const detailFromRun = await fetchRunErrorDetail(lastRunId); // Case 3: Transient LLM API error - retry with exponential backoff up to a limit if (stopReason === "llm_api_error") { if (llmApiErrorRetries < LLM_API_ERROR_MAX_RETRIES) { const attempt = llmApiErrorRetries + 1; const delayMs = getRetryDelayMs({ category: "transient_provider", attempt, detail: detailFromRun, }); llmApiErrorRetries = attempt; if (outputFormat === "stream-json") { const retryMsg: RetryMessage = { type: "retry", reason: "llm_api_error", attempt, max_attempts: LLM_API_ERROR_MAX_RETRIES, delay_ms: delayMs, run_id: lastRunId ?? undefined, session_id: sessionId, uuid: `retry-${lastRunId || randomUUID()}`, }; console.log(JSON.stringify(retryMsg)); } else { const delaySeconds = Math.round(delayMs / 1000); console.error( `LLM API error encountered (attempt ${attempt} of ${LLM_API_ERROR_MAX_RETRIES}), retrying in ${delaySeconds}s...`, ); } // Exponential backoff before retrying the same input await new Promise((resolve) => setTimeout(resolve, delayMs)); // Post-stream retry creates a new run/request. refreshCurrentInputOtids(); continue; } } // "Invalid tool call IDs" means server HAS pending approvals but with different IDs. // Fetch the actual pending approvals and process them before retrying. const invalidIdsDetected = isInvalidToolCallIdsError(detailFromRun) || isInvalidToolCallIdsError(latestErrorText); if (invalidIdsDetected) { if (outputFormat === "stream-json") { const recoveryMsg: RecoveryMessage = { type: "recovery", recovery_type: "invalid_tool_call_ids", message: "Tool call ID mismatch; fetching actual pending approvals and resyncing", run_id: lastRunId ?? undefined, session_id: sessionId, uuid: `recovery-${lastRunId || randomUUID()}`, }; console.log(JSON.stringify(recoveryMsg)); } else { console.error( "Tool call ID mismatch; fetching actual pending approvals...", ); } try { // Fetch and process actual pending approvals from server await resolveAllPendingApprovals(); // After processing, continue to next iteration (fresh state) continue; } catch { // If fetch fails, exit with error if (outputFormat === "stream-json") { const errorMsg: ErrorMessage = { type: "error", message: "Failed to fetch pending approvals for resync", stop_reason: stopReason, run_id: lastRunId ?? undefined, session_id: sessionId, uuid: `error-${lastRunId || randomUUID()}`, }; console.log(JSON.stringify(errorMsg)); } else { console.error("Failed to fetch pending approvals for resync"); } process.exit(1); } } // Unexpected stop reason (error, llm_api_error, etc.) // Before failing, check run metadata to see if this is a retriable error // This handles cases where the backend sends a generic error stop_reason but the // underlying cause is a transient LLM/network issue that should be retried // Early exit for stop reasons that should never be retried const nonRetriableReasons: StopReasonType[] = [ "cancelled", "requires_approval", "max_steps", "max_tokens_exceeded", "context_window_overflow_in_system_prompt", "end_turn", "tool_rule", "no_tool_call", ]; if (nonRetriableReasons.includes(stopReason)) { // Fall through to error display } else if (lastRunId && llmApiErrorRetries < LLM_API_ERROR_MAX_RETRIES) { try { const run = await client.runs.retrieve(lastRunId); const metaError = run.metadata?.error as | { error_type?: string; message?: string; detail?: string; // Handle nested error structure (error.error) that can occur in some edge cases error?: { error_type?: string; detail?: string }; } | undefined; // Check for llm_error at top level or nested (handles error.error nesting) const errorType = metaError?.error_type ?? metaError?.error?.error_type; const detail = metaError?.detail ?? metaError?.error?.detail ?? ""; // Special handling for empty response errors (Opus 4.6 SADs) // Empty LLM response retry (e.g. Opus 4.6 occasionally returns no content). // Retry 1: same input unchanged. Retry 2: append system reminder nudging the model. if ( isEmptyResponseRetryable( errorType, detail, emptyResponseRetries, EMPTY_RESPONSE_MAX_RETRIES, ) ) { const attempt = emptyResponseRetries + 1; const delayMs = getRetryDelayMs({ category: "empty_response", attempt, }); emptyResponseRetries = attempt; // Only append a nudge on the last attempt if (attempt >= EMPTY_RESPONSE_MAX_RETRIES) { const nudgeMessage: MessageCreate = { role: "system", content: `The previous response was empty. Please provide a response with either text content or a tool call.`, otid: randomUUID(), }; currentInput = [...currentInput, nudgeMessage]; } if (outputFormat === "stream-json") { const retryMsg: RetryMessage = { type: "retry", reason: "llm_api_error", attempt, max_attempts: EMPTY_RESPONSE_MAX_RETRIES, delay_ms: delayMs, run_id: lastRunId ?? undefined, session_id: sessionId, uuid: `retry-empty-${lastRunId || randomUUID()}`, }; console.log(JSON.stringify(retryMsg)); } else { console.error( `Empty LLM response, retrying (attempt ${attempt} of ${EMPTY_RESPONSE_MAX_RETRIES})...`, ); } await new Promise((resolve) => setTimeout(resolve, delayMs)); // Empty-response retry creates a new run/request. refreshCurrentInputOtids(); continue; } if (shouldRetryRunMetadataError(errorType, detail)) { const attempt = llmApiErrorRetries + 1; const delayMs = getRetryDelayMs({ category: "transient_provider", attempt, detail, }); llmApiErrorRetries = attempt; if (outputFormat === "stream-json") { const retryMsg: RetryMessage = { type: "retry", reason: "llm_api_error", attempt, max_attempts: LLM_API_ERROR_MAX_RETRIES, delay_ms: delayMs, run_id: lastRunId ?? undefined, session_id: sessionId, uuid: `retry-${lastRunId || randomUUID()}`, }; console.log(JSON.stringify(retryMsg)); } else { const delaySeconds = Math.round(delayMs / 1000); console.error( `LLM API error encountered (attempt ${attempt} of ${LLM_API_ERROR_MAX_RETRIES}), retrying in ${delaySeconds}s...`, ); } await new Promise((resolve) => setTimeout(resolve, delayMs)); // Post-stream retry creates a new run/request. refreshCurrentInputOtids(); continue; } } catch (_e) { // If we can't fetch run metadata, fall through to normal error handling } } // Mark incomplete tool calls as cancelled to prevent stuck state markIncompleteToolsAsCancelled(buffers, true, "stream_error"); // Extract error details from buffers if available const errorLines = toLines(buffers).filter( (line) => line.kind === "error", ); const errorMessages = errorLines .map((line) => ("text" in line ? line.text : "")) .filter(Boolean); let errorMessage = errorMessages.length > 0 ? errorMessages.join("; ") : `Unexpected stop reason: ${stopReason}`; // Fetch detailed error from run metadata if available (same as TUI mode) if (lastRunId && errorMessages.length === 0) { try { const run = await client.runs.retrieve(lastRunId); if (run.metadata?.error) { const errorData = run.metadata.error as { type?: string; message?: string; detail?: string; }; // Construct error object that formatErrorDetails can parse const errorObject = { error: { error: errorData, run_id: lastRunId, }, }; errorMessage = formatErrorDetails(errorObject, agent.id); } } catch (_e) { // If we can't fetch error details, append note to error message errorMessage = `${errorMessage}\n(Unable to fetch additional error details from server)`; } } if (outputFormat === "stream-json") { // Emit error event const errorMsg: ErrorMessage = { type: "error", message: errorMessage, stop_reason: stopReason, run_id: lastRunId ?? undefined, session_id: sessionId, uuid: `error-${lastRunId || randomUUID()}`, }; console.log(JSON.stringify(errorMsg)); } else { console.error(`Error: ${errorMessage}`); } process.exit(1); } } catch (error) { // Mark incomplete tool calls as cancelled markIncompleteToolsAsCancelled(buffers, true, "stream_error"); // Use comprehensive error formatting (same as TUI mode) const errorDetails = formatErrorDetails(error, agent.id); if (outputFormat === "stream-json") { const errorMsg: ErrorMessage = { type: "error", message: errorDetails, stop_reason: "error", run_id: lastKnownRunId ?? undefined, session_id: sessionId, uuid: `error-${lastKnownRunId || randomUUID()}`, }; console.log(JSON.stringify(errorMsg)); } else { console.error(`Error: ${errorDetails}`); } process.exit(1); } // Update stats with final usage data from buffers sessionStats.updateUsageFromBuffers(buffers); // Extract final result from transcript, with sensible fallbacks const lines = toLines(buffers); const reversed = [...lines].reverse(); const lastAssistant = reversed.find( (line) => line.kind === "assistant" && "text" in line && typeof line.text === "string" && line.text.trim().length > 0, ) as Extract | undefined; const lastReasoning = reversed.find( (line) => line.kind === "reasoning" && "text" in line && typeof line.text === "string" && line.text.trim().length > 0, ) as Extract | undefined; const lastToolResult = reversed.find( (line) => line.kind === "tool_call" && "resultText" in line && typeof (line as Extract).resultText === "string" && ((line as Extract).resultText ?? "").trim() .length > 0, ) as Extract | undefined; const resultText = lastAssistant?.text || lastReasoning?.text || lastToolResult?.resultText || "No assistant response found"; const stats = sessionStats.getSnapshot(); const usage = { prompt_tokens: stats.usage.promptTokens, completion_tokens: stats.usage.completionTokens, total_tokens: stats.usage.totalTokens, step_count: stats.usage.stepCount, cached_input_tokens: stats.usage.cachedInputTokens, cache_write_tokens: stats.usage.cacheWriteTokens, reasoning_tokens: stats.usage.reasoningTokens, ...(stats.usage.contextTokens !== undefined && { context_tokens: stats.usage.contextTokens, }), }; // Output based on format if (outputFormat === "json") { const output = { type: "result", subtype: "success", is_error: false, duration_ms: Math.round(stats.totalWallMs), duration_api_ms: Math.round(stats.totalApiMs), num_turns: stats.usage.stepCount, result: resultText, agent_id: agent.id, conversation_id: conversationId, usage, }; console.log(JSON.stringify(output, null, 2)); } else if (outputFormat === "stream-json") { // Output final result event // Collect all run_ids from buffers const allRunIds = new Set(); for (const line of toLines(buffers)) { // Extract run_id from any line that might have it // This is a fallback in case we missed any during streaming if ("run_id" in line && typeof line.run_id === "string") { allRunIds.add(line.run_id); } } // Use the last run_id as the result uuid if available, otherwise derive from agent_id const resultUuid = allRunIds.size > 0 ? `result-${Array.from(allRunIds).pop()}` : `result-${agent.id}`; const resultEvent: ResultMessage = { type: "result", subtype: "success", session_id: sessionId, duration_ms: Math.round(stats.totalWallMs), duration_api_ms: Math.round(stats.totalApiMs), num_turns: stats.usage.stepCount, result: resultText, agent_id: agent.id, conversation_id: conversationId, run_ids: Array.from(allRunIds), usage, uuid: resultUuid, }; console.log(JSON.stringify(resultEvent)); } else { // text format (default) if (!resultText || resultText === "No assistant response found") { console.error("No assistant response found"); process.exit(1); } console.log(resultText); } // Report all milestones at the end for latency audit markMilestone("HEADLESS_COMPLETE"); reportAllMilestones(); } /** * Extract plain text from a MessageCreate content value (string or parts array). * Used to build a synthetic user Line for the reflection transcript. */ function extractUserTextFromContent( content: MessageCreate["content"], ): string { if (typeof content === "string") return content; if (Array.isArray(content)) { return content .filter( (p): p is { type: "text"; text: string } => typeof p === "object" && p !== null && "type" in p && (p as { type: unknown }).type === "text" && "text" in p && typeof (p as { text: unknown }).text === "string", ) .map((p) => p.text) .join("\n"); } return ""; } /** * Bidirectional mode for SDK communication. * Reads JSON messages from stdin, processes them, and outputs responses. * Stays alive until stdin closes. */ async function runBidirectionalMode( agent: AgentState, conversationId: string, client: Letta, _outputFormat: string, includePartialMessages: boolean, availableTools: string[], skillSources: SkillSource[], systemInfoReminderEnabled: boolean, reflectionSettings: ReflectionSettings, ): Promise { const sessionId = agent.id; const readline = await import("node:readline"); // Emit init event const initEvent = { type: "system", subtype: "init", session_id: sessionId, agent_id: agent.id, conversation_id: conversationId, model: agent.llm_config?.model, tools: availableTools, cwd: process.cwd(), memfs_enabled: settingsManager.isMemfsEnabled(agent.id), skill_sources: skillSources, system_info_reminder_enabled: systemInfoReminderEnabled, reflection_trigger: reflectionSettings.trigger, reflection_step_count: reflectionSettings.stepCount, uuid: `init-${agent.id}`, }; console.log(JSON.stringify(initEvent)); // Track current operation for interrupt support let currentAbortController: AbortController | null = null; const reminderContextTracker = createContextTracker(); const sharedReminderState = createSharedReminderState(); const isSubagent = process.env.LETTA_CODE_AGENT_ROLE === "subagent"; // Session-level reflection state β€” mirrors the React refs used in App.tsx const recompileByConversation = new Map>(); const recompileQueuedByConversation = new Set(); /** * Launch a reflection subagent in the background, mirroring App.tsx's * maybeLaunchReflectionSubagent. Defined once per session since agentId * and conversationId are stable in bidirectional mode. */ const maybeLaunchReflectionSubagent = async ( _triggerSource: "step-count" | "compaction-event", ): Promise => { if (!settingsManager.isMemfsEnabled(agent.id)) return false; const { getSnapshot } = await import("./cli/helpers/subagentState"); const snapshot = getSnapshot(); const hasActive = snapshot.agents.some( (a) => a.type.toLowerCase() === "reflection" && (a.status === "pending" || a.status === "running"), ); if (hasActive) { debugLog( "memory", `Skipping auto reflection launch (${_triggerSource}) because one is already active`, ); return false; } try { const { buildAutoReflectionPayload, finalizeAutoReflectionPayload, buildParentMemorySnapshot, buildReflectionSubagentPrompt, } = await import("./cli/helpers/reflectionTranscript"); const { getMemoryFilesystemRoot } = await import( "./agent/memoryFilesystem" ); const autoPayload = await buildAutoReflectionPayload( agent.id, conversationId, ); if (!autoPayload) { debugLog( "memory", `Skipping auto reflection launch (${_triggerSource}) because transcript has no new content`, ); return false; } const memoryDir = getMemoryFilesystemRoot(agent.id); const parentMemory = await buildParentMemorySnapshot(memoryDir); const reflectionPrompt = buildReflectionSubagentPrompt({ transcriptPath: autoPayload.payloadPath, memoryDir, cwd: process.cwd(), parentMemory, }); const { spawnBackgroundSubagentTask } = await import("./tools/impl/Task"); spawnBackgroundSubagentTask({ subagentType: "reflection", prompt: reflectionPrompt, description: "Reflect on recent conversations", silentCompletion: true, onComplete: async ({ success, error }) => { await finalizeAutoReflectionPayload( agent.id, conversationId, autoPayload.payloadPath, autoPayload.endSnapshotLine, success, ); const { handleMemorySubagentCompletion } = await import( "./cli/helpers/memorySubagentCompletion" ); const msg = await handleMemorySubagentCompletion( { agentId: agent.id, conversationId, subagentType: "reflection", success, error, }, { recompileByConversation, recompileQueuedByConversation, logRecompileFailure: (m) => debugWarn("memory", m), }, ); // Emit notification to stdout for SDK consumers to optionally handle console.log( JSON.stringify({ type: "system", subtype: "task_notification", session_id: sessionId, text: msg, }), ); }, }); debugLog( "memory", `Auto-launched reflection subagent (${_triggerSource})`, ); return true; } catch (launchError) { debugWarn( "memory", `Failed to auto-launch reflection subagent (${_triggerSource}): ${ launchError instanceof Error ? launchError.message : String(launchError) }`, ); return false; } }; // Resolve pending approvals for this conversation before retrying user input. const resolveAllPendingApprovals = async () => { const { getResumeData } = await import("./agent/check-approval"); while (true) { // Re-fetch agent to get latest in-context messages (source of truth for backend) const freshAgent = await client.agents.retrieve(agent.id); let resume: Awaited>; try { resume = await getResumeData(client, freshAgent, conversationId); } catch (error) { // Treat 404/422 as "no approvals" - stale message/conversation state if ( error instanceof APIError && (error.status === 404 || error.status === 422) ) { break; } throw error; } const pendingApprovals = resume.pendingApprovals || []; if (pendingApprovals.length === 0) break; type Decision = | { type: "approve"; approval: { toolCallId: string; toolName: string; toolArgs: string; }; reason: string; matchedRule: string; } | { type: "deny"; approval: { toolCallId: string; toolName: string; toolArgs: string; }; reason: string; }; const { autoAllowed, autoDenied } = await classifyApprovals( pendingApprovals, { treatAskAsDeny: true, denyReasonForAsk: "Tool requires approval (headless mode)", requireArgsForAutoApprove: true, missingNameReason: "Tool call incomplete - missing name", }, ); const decisions: Decision[] = [ ...autoAllowed.map((ac) => ({ type: "approve" as const, approval: ac.approval, reason: ac.permission.reason || "Allowed by permission rule", matchedRule: "matchedRule" in ac.permission && ac.permission.matchedRule ? ac.permission.matchedRule : "auto-approved", })), ...autoDenied.map((ac) => { const fallback = "matchedRule" in ac.permission && ac.permission.matchedRule ? `Permission denied: ${ac.permission.matchedRule}` : ac.permission.reason ? `Permission denied: ${ac.permission.reason}` : "Permission denied: Unknown reason"; return { type: "deny" as const, approval: ac.approval, reason: ac.denyReason ?? fallback, }; }), ]; const { executeApprovalBatch } = await import( "./agent/approval-execution" ); const executedResults = await executeApprovalBatch(decisions); const approvalInput: ApprovalCreate = { type: "approval", approvals: executedResults as ApprovalResult[], otid: randomUUID(), }; const approvalMessages: Array< | import("@letta-ai/letta-client/resources/agents/agents").MessageCreate | import("@letta-ai/letta-client/resources/agents/messages").ApprovalCreate > = [approvalInput]; { const { consumeQueuedSkillContent } = await import( "./tools/impl/skillContentRegistry" ); const skillContents = consumeQueuedSkillContent(); if (skillContents.length > 0) { approvalMessages.push({ role: "user" as const, content: skillContents.map((sc) => ({ type: "text" as const, text: sc.content, })), otid: randomUUID(), }); } } const approvalStream = await sendMessageStream( conversationId, approvalMessages, { agentId: agent.id }, ); const drainResult = await drainStreamWithResume( approvalStream, createBuffers(agent.id), () => {}, undefined, undefined, undefined, reminderContextTracker, ); if ( drainResult.stopReason === "error" || drainResult.stopReason === "cancelled" ) { throw new Error( `Approval drain ended with stop reason: ${drainResult.stopReason}`, ); } } }; // Create readline interface for stdin const rl = readline.createInterface({ input: process.stdin, terminal: false, }); // Create async iterator and line queue for permission callbacks const lineQueue: string[] = []; let lineResolver: ((line: string | null) => void) | null = null; // ── Queue lifecycle tracking (stream-json only) ──────────────── // Bidirectional mode always runs under stream-json input format, so queue // events are always emitted here. emitQueueEvent is a no-op guard retained // for clarity and future-proofing against non-stream-json callers. const emitQueueEvent = (e: QueueLifecycleEvent): void => { console.log(JSON.stringify(e)); }; let turnInProgress = false; const msgQueueRuntime = new QueueRuntime({ callbacks: { onEnqueued: (item, queueLen) => emitQueueEvent({ type: "queue_item_enqueued", item_id: item.id, client_message_id: item.clientMessageId ?? `cm-${item.id}`, source: item.source, kind: item.kind, queue_len: queueLen, session_id: sessionId, uuid: `q-enq-${item.id}`, }), onDequeued: (batch) => emitQueueEvent({ type: "queue_batch_dequeued", batch_id: batch.batchId, item_ids: batch.items.map((i) => i.id), merged_count: batch.mergedCount, queue_len_after: batch.queueLenAfter, session_id: sessionId, uuid: `q-deq-${batch.batchId}`, }), onCleared: (reason, clearedCount) => emitQueueEvent({ type: "queue_cleared", reason, cleared_count: clearedCount, session_id: sessionId, uuid: `q-clr-${randomUUID()}`, }), }, }); /** * Parses a raw JSON line and returns the queue item payload if it is a * user message or task_notification. Returns null for control lines * (control_request, control_response, etc.) and malformed JSON. */ function parseUserLine(raw: string): { kind: "message" | "task_notification"; content: string; } | null { if (!raw.trim()) return null; try { const parsed: { type?: string; message?: { content?: string }; _queuedKind?: string; } = JSON.parse(raw); if (parsed.type !== "user" || parsed.message?.content === undefined) return null; const kind = parsed._queuedKind === "task_notification" ? "task_notification" : "message"; return { kind, content: parsed.message.content }; } catch { return null; } } /** * Emit queue_blocked on the FIRST user/task line arrival during an active * turn. Does NOT enqueue to msgQueueRuntime β€” that happens later, at the * coalescing loop where consumption is certain (avoids orphaned items from * the external-tool wait loop which drops non-matching lines silently). */ let blockedEmittedThisTurn = false; function maybeNotifyBlocked(raw: string): void { if (!turnInProgress || blockedEmittedThisTurn) return; if (!parseUserLine(raw)) return; blockedEmittedThisTurn = true; // queue_len: count user/task items currently in lineQueue (best-effort) const queueLen = lineQueue.filter((l) => parseUserLine(l) !== null).length; emitQueueEvent({ type: "queue_blocked", reason: "runtime_busy", queue_len: Math.max(1, queueLen), session_id: sessionId, uuid: `q-blk-${randomUUID()}`, }); } /** Enqueue a BidirectionalQueuedInput into msgQueueRuntime for lifecycle tracking. */ function enqueueForTracking(input: BidirectionalQueuedInput): void { if (input.kind === "task_notification") { msgQueueRuntime.enqueue({ kind: "task_notification", source: "task_notification", text: input.text, } as Parameters[0]); } else { msgQueueRuntime.enqueue({ kind: "message", source: "user", content: input.content, } as Parameters[0]); } } const serializeQueuedMessageAsUserLine = (queuedMessage: QueuedMessage) => JSON.stringify({ type: "user", message: { role: "user", content: queuedMessage.text, }, _queuedKind: queuedMessage.kind, }); // Connect Task/subagent background notifications to the same queueing path // used by user input so bidirectional mode inherits TUI-style queue behavior. setMessageQueueAdder((queuedMessage) => { const syntheticUserLine = serializeQueuedMessageAsUserLine(queuedMessage); maybeNotifyBlocked(syntheticUserLine); if (lineResolver) { const resolve = lineResolver; lineResolver = null; resolve(syntheticUserLine); return; } lineQueue.push(syntheticUserLine); }); // Feed lines into queue or resolver rl.on("line", (line) => { maybeNotifyBlocked(line); if (lineResolver) { const resolve = lineResolver; lineResolver = null; resolve(line); } else { lineQueue.push(line); } }); rl.on("close", () => { setMessageQueueAdder(null); msgQueueRuntime.clear("shutdown"); if (lineResolver) { const resolve = lineResolver; lineResolver = null; resolve(null); } }); // Helper to get next line (from queue or wait) async function getNextLine(): Promise { if (lineQueue.length > 0) { return lineQueue.shift() ?? null; } return new Promise((resolve) => { lineResolver = resolve; }); } // Helper to send permission request and wait for response // Uses Claude SDK's control_request/control_response format for compatibility async function requestPermission( toolCallId: string, toolName: string, toolInput: Record, ): Promise<{ decision: "allow" | "deny"; reason?: string; updatedInput?: Record | null; }> { const requestId = `perm-${toolCallId}`; // Compute diff previews for file-modifying tools const diffs = await computeDiffPreviews(toolName, toolInput); // Build can_use_tool control request (Claude SDK format) const canUseToolRequest: CanUseToolControlRequest = { subtype: "can_use_tool", tool_name: toolName, input: toolInput, tool_call_id: toolCallId, // Letta-specific permission_suggestions: [], // TODO: not implemented blocked_path: null, // TODO: not implemented ...(diffs.length > 0 ? { diffs } : {}), }; const controlRequest: ControlRequest = { type: "control_request", request_id: requestId, request: canUseToolRequest, }; console.log(JSON.stringify(controlRequest)); const deferredLines: string[] = []; // Wait for control_response let result: { decision: "allow" | "deny"; reason?: string; updatedInput?: Record | null; } | null = null; while (result === null) { const line = await getNextLine(); if (line === null) { result = { decision: "deny", reason: "stdin closed" }; break; } if (!line.trim()) continue; try { const msg = JSON.parse(line); if ( msg.type === "control_response" && msg.response?.request_id === requestId ) { // Parse the can_use_tool response const response = msg.response?.response as | CanUseToolResponse | undefined; if (!response) { result = { decision: "deny", reason: "Invalid response format" }; break; } if (response.behavior === "allow") { result = { decision: "allow", updatedInput: response.updatedInput, }; } else { result = { decision: "deny", reason: response.message, // TODO: handle interrupt flag }; } break; } // Defer other messages for the main loop without re-reading them. deferredLines.push(line); } catch { // Defer parse errors so the main loop can surface them. deferredLines.push(line); } } if (deferredLines.length > 0) { lineQueue.unshift(...deferredLines); } return result; } async function recoverPendingApprovalsFromControlRequest( request: RecoverPendingApprovalsControlRequest, ): Promise<{ recovered: boolean; pending_approval: boolean; approvals_processed: number; }> { const targetAgentId = request.agent_id ?? agent.id; const targetConversationId = request.conversation_id ?? conversationId; if (targetAgentId !== agent.id) { throw new Error( `recover_pending_approvals agent mismatch: ${targetAgentId} != ${agent.id}`, ); } const { getResumeData } = await import("./agent/check-approval"); const { executeApprovalBatch } = await import("./agent/approval-execution"); let approvalsProcessed = 0; const MAX_RECOVERY_PASSES = 8; for (let pass = 0; pass < MAX_RECOVERY_PASSES; pass += 1) { const freshAgent = await client.agents.retrieve(agent.id); let resume: Awaited>; try { resume = await getResumeData(client, freshAgent, targetConversationId, { includeMessageHistory: false, }); } catch (error) { if ( error instanceof APIError && (error.status === 404 || error.status === 422) ) { return { recovered: true, pending_approval: false, approvals_processed: approvalsProcessed, }; } throw error; } const pendingApprovals = resume.pendingApprovals || []; if (pendingApprovals.length === 0) { return { recovered: true, pending_approval: false, approvals_processed: approvalsProcessed, }; } const { autoAllowed, autoDenied, needsUserInput } = await classifyApprovals(pendingApprovals, { alwaysRequiresUserInput: isInteractiveApprovalTool, requireArgsForAutoApprove: true, missingNameReason: "Tool call incomplete - missing name", }); const decisions: ApprovalDecision[] = [ ...autoAllowed.map((ac) => ({ type: "approve" as const, approval: ac.approval, })), ...autoDenied.map((ac) => ({ type: "deny" as const, approval: ac.approval, reason: ac.denyReason || ac.permission.reason || "Permission denied", })), ]; // In headless recovery mode, auto-deny approvals that would require user // input. Calling requestPermission() here would block waiting for a // response that will never come, causing a timeout. for (const ac of needsUserInput) { decisions.push({ type: "deny", approval: ac.approval, reason: ac.denyReason || "Auto-denied during recovery - tool requires interactive approval", }); } if (decisions.length === 0) { return { recovered: false, pending_approval: true, approvals_processed: approvalsProcessed, }; } const executedResults = await executeApprovalBatch(decisions); approvalsProcessed += executedResults.length; const approvalInput: ApprovalCreate = { type: "approval", approvals: executedResults as ApprovalResult[], otid: randomUUID(), }; const approvalStream = await sendMessageStream( targetConversationId, [approvalInput], { agentId: agent.id }, ); const drainResult = await drainStreamWithResume( approvalStream, createBuffers(agent.id), () => {}, undefined, undefined, undefined, reminderContextTracker, ); if (drainResult.stopReason === "error") { throw new Error( drainResult.fallbackError || "recover_pending_approvals failed while applying approvals", ); } } return { recovered: false, pending_approval: true, approvals_processed: approvalsProcessed, }; } // Main processing loop while (true) { const line = await getNextLine(); if (line === null) break; // stdin closed if (!line.trim()) continue; let message: { type: string; message?: { role: string; content: MessageCreate["content"] }; request_id?: string; request?: { subtype: string }; session_id?: string; }; try { message = JSON.parse(line); } catch { const errorMsg: ErrorMessage = { type: "error", message: "Invalid JSON input", stop_reason: "error", session_id: sessionId, uuid: randomUUID(), }; console.log(JSON.stringify(errorMsg)); continue; } // Handle control requests if (message.type === "control_request") { const subtype = message.request?.subtype; const requestId = message.request_id; if (subtype === "initialize") { // Return session info const initResponse: ControlResponse = { type: "control_response", response: { subtype: "success", request_id: requestId ?? "", response: { agent_id: agent.id, model: agent.llm_config?.model, tools: availableTools, memfs_enabled: settingsManager.isMemfsEnabled(agent.id), skill_sources: skillSources, system_info_reminder_enabled: systemInfoReminderEnabled, reflection_trigger: reflectionSettings.trigger, reflection_step_count: reflectionSettings.stepCount, }, }, session_id: sessionId, uuid: randomUUID(), }; console.log(JSON.stringify(initResponse)); } else if (subtype === "interrupt") { // Abort current operation if any if (currentAbortController !== null) { (currentAbortController as AbortController).abort(); currentAbortController = null; } const interruptResponse: ControlResponse = { type: "control_response", response: { subtype: "success", request_id: requestId ?? "", }, session_id: sessionId, uuid: randomUUID(), }; console.log(JSON.stringify(interruptResponse)); } else if (subtype === "register_external_tools") { // Register external tools from SDK const toolsRequest = message.request as { tools?: ExternalToolDefinition[]; }; const tools = toolsRequest.tools ?? []; registerExternalTools(tools); // Set up the external tool executor to send requests back to SDK setExternalToolExecutor(async (toolCallId, toolName, input) => { // Send execute_external_tool request to SDK const execRequest: ControlRequest = { type: "control_request", request_id: `ext-${toolCallId}`, request: { subtype: "execute_external_tool", tool_call_id: toolCallId, tool_name: toolName, input, } as unknown as CanUseToolControlRequest, // Type cast for compatibility }; console.log(JSON.stringify(execRequest)); // Wait for external_tool_result response while (true) { const line = await getNextLine(); if (line === null) { return { content: [{ type: "text", text: "stdin closed" }], isError: true, }; } if (!line.trim()) continue; try { const msg = JSON.parse(line); if ( msg.type === "control_response" && msg.response?.subtype === "external_tool_result" && msg.response?.tool_call_id === toolCallId ) { return { content: msg.response.content ?? [{ type: "text", text: "" }], isError: msg.response.is_error ?? false, }; } } catch { // Ignore parse errors, keep waiting } } }); const registerResponse: ControlResponse = { type: "control_response", response: { subtype: "success", request_id: requestId ?? "", response: { registered: tools.length }, }, session_id: sessionId, uuid: randomUUID(), }; console.log(JSON.stringify(registerResponse)); } else if (subtype === "bootstrap_session_state") { const bootstrapReq = message.request as BootstrapSessionStateRequest; const { getResumeData } = await import("./agent/check-approval"); let hasPendingApproval = false; try { // Re-fetch for parity with approval checks elsewhere in headless mode. const freshAgent = await client.agents.retrieve(agent.id); const resume = await getResumeData( client, freshAgent, conversationId, { includeMessageHistory: false, }, ); hasPendingApproval = (resume.pendingApprovals?.length ?? 0) > 0; } catch (error) { // Keep bootstrap non-fatal if approval probe fails on stale resources. if ( !(error instanceof APIError) || (error.status !== 404 && error.status !== 422) ) { console.warn( `[bootstrap] pending-approval probe failed: ${error instanceof Error ? error.message : String(error)}`, ); } } const bootstrapResp = await handleBootstrapSessionState({ bootstrapReq, sessionContext: { agentId: agent.id, conversationId, model: agent.llm_config?.model, tools: availableTools, memfsEnabled: settingsManager.isMemfsEnabled(agent.id), sessionId, }, requestId: requestId ?? "", client, hasPendingApproval, }); console.log(JSON.stringify(bootstrapResp)); } else if (subtype === "list_messages") { const listReq = message.request as ListMessagesControlRequest; const listResp = await handleListMessages({ listReq, sessionConversationId: conversationId, sessionAgentId: agent.id, sessionId, requestId: requestId ?? "", client, }); console.log(JSON.stringify(listResp)); } else if (subtype === "recover_pending_approvals") { const recoverReq = message.request as RecoverPendingApprovalsControlRequest; try { const recovery = await recoverPendingApprovalsFromControlRequest(recoverReq); const recoveryResponse: ControlResponse = { type: "control_response", response: { subtype: "success", request_id: requestId ?? "", response: recovery, }, session_id: sessionId, uuid: randomUUID(), }; console.log(JSON.stringify(recoveryResponse)); } catch (error) { const recoveryError: ControlResponse = { type: "control_response", response: { subtype: "error", request_id: requestId ?? "", error: error instanceof Error ? error.message : String(error), }, session_id: sessionId, uuid: randomUUID(), }; console.log(JSON.stringify(recoveryError)); } } else { const errorResponse: ControlResponse = { type: "control_response", response: { subtype: "error", request_id: requestId ?? "", error: `Unknown control request subtype: ${subtype}`, }, session_id: sessionId, uuid: randomUUID(), }; console.log(JSON.stringify(errorResponse)); } continue; } // Handle user messages if (message.type === "user" && message.message?.content !== undefined) { const queuedInputs: BidirectionalQueuedInput[] = [ { kind: "user", content: message.message.content, }, ]; // Batch any already-buffered user lines into the same turn, mirroring // TUI queue dequeue behavior (single coalesced submit when idle). while (lineQueue.length > 0) { const candidate = lineQueue[0]; if (!candidate?.trim()) { lineQueue.shift(); continue; } let parsedCandidate: { type?: string; message?: { content?: MessageCreate["content"] }; _queuedKind?: QueuedMessage["kind"]; }; try { parsedCandidate = JSON.parse(candidate); } catch { // Leave malformed lines for the main loop to surface as parse errors. break; } if ( parsedCandidate.type === "user" && parsedCandidate.message?.content !== undefined ) { lineQueue.shift(); if (parsedCandidate._queuedKind === "task_notification") { const notificationText = typeof parsedCandidate.message.content === "string" ? parsedCandidate.message.content : parsedCandidate.message.content .reduce((texts: string[], part) => { if ( part.type === "text" && "text" in part && typeof part.text === "string" ) { texts.push(part.text); } return texts; }, []) .join(""); queuedInputs.push({ kind: "task_notification", text: notificationText, }); } else { queuedInputs.push({ kind: "user", content: parsedCandidate.message.content, }); } continue; } // Stop coalescing when the queue head is not a user-input line. // The outer loop must process control/error/system lines in-order. break; } // Enqueue consumed items into msgQueueRuntime for lifecycle tracking. // Done here (not at arrival) to avoid orphaned items from the external- // tool wait loop, which consumes non-matching lines via getNextLine() // without deferring them back to lineQueue. for (const input of queuedInputs) { enqueueForTracking(input); } // Signal dequeue for exactly the items we just enqueued. consumeItems(n) // bypasses QueueRuntime's internal coalescing policy so the count matches // what the coalescing loop actually yielded. msgQueueRuntime.consumeItems(queuedInputs.length); const userContent = mergeBidirectionalQueuedInput(queuedInputs); if (userContent === null) { continue; } // Create abort controller for this operation currentAbortController = new AbortController(); turnInProgress = true; try { const buffers = createBuffers(agent.id); const startTime = performance.now(); let numTurns = 0; let lastStopReason: StopReasonType | null = null; // Track for result subtype let sawStreamError = false; // Track if we emitted an error during streaming let preStreamTransientRetries = 0; syncReminderStateFromContextTracker( sharedReminderState, reminderContextTracker, ); const lastRunAt = (agent as { last_run_completion?: string }) .last_run_completion; const { parts: sharedReminderParts } = await buildSharedReminderParts({ mode: isSubagent ? "subagent" : "headless-bidirectional", agent: { id: agent.id, name: agent.name, description: agent.description, lastRunAt: lastRunAt ?? null, }, state: sharedReminderState, sessionContextReminderEnabled: systemInfoReminderEnabled, reflectionSettings, skillSources, resolvePlanModeReminder: async () => { const { PLAN_MODE_REMINDER } = await import("./agent/promptAssets"); return PLAN_MODE_REMINDER; }, maybeLaunchReflectionSubagent, }); const enrichedContent = prependReminderPartsToContent( userContent, sharedReminderParts, ); // Initial input is the user message let currentInput: MessageCreate[] = [ { role: "user", content: enrichedContent }, ]; // Approval handling loop - continue until end_turn or error while (true) { numTurns++; // Check if aborted if (currentAbortController?.signal.aborted) { break; } // Inject queued skill content as user message parts (LET-7353) { const { consumeQueuedSkillContent } = await import( "./tools/impl/skillContentRegistry" ); const skillContents = consumeQueuedSkillContent(); if (skillContents.length > 0) { currentInput = [ ...currentInput, { role: "user" as const, content: skillContents.map((sc) => ({ type: "text" as const, text: sc.content, })), }, ]; } } // Send message to agent. // Wrap in try-catch to handle pre-stream 409 approval-pending errors. let stream: Awaited>; let turnToolContextId: string | null = null; try { stream = await sendMessageStream(conversationId, currentInput, { agentId: agent.id, }); turnToolContextId = getStreamToolContextId(stream); } catch (preStreamError) { // Extract error detail using shared helper (handles nested/direct/message shapes) const errorDetail = extractConflictDetail(preStreamError); // Route through shared pre-stream conflict classifier (parity with main loop + TUI) // Bidir mode has no conversation-busy retry budget, so pass 0/0 to disable busy-retry. const preStreamAction = getPreStreamErrorAction(errorDetail, 0, 0, { status: preStreamError instanceof APIError ? preStreamError.status : undefined, transientRetries: preStreamTransientRetries, maxTransientRetries: LLM_API_ERROR_MAX_RETRIES, }); if (preStreamAction === "resolve_approval_pending") { const recoveryMsg: RecoveryMessage = { type: "recovery", recovery_type: "approval_pending", message: "Detected pending approval conflict on send; resolving before retry", session_id: sessionId, uuid: `recovery-bidir-${randomUUID()}`, }; console.log(JSON.stringify(recoveryMsg)); await resolveAllPendingApprovals(); continue; } if (preStreamAction === "retry_transient") { const attempt = preStreamTransientRetries + 1; const retryAfterMs = preStreamError instanceof APIError ? parseRetryAfterHeaderMs( preStreamError.headers?.get("retry-after"), ) : null; const delayMs = getRetryDelayMs({ category: "transient_provider", attempt, detail: errorDetail, retryAfterMs, }); preStreamTransientRetries = attempt; const retryMsg: RetryMessage = { type: "retry", reason: "llm_api_error", attempt, max_attempts: LLM_API_ERROR_MAX_RETRIES, delay_ms: delayMs, session_id: sessionId, uuid: `retry-bidir-${randomUUID()}`, }; console.log(JSON.stringify(retryMsg)); await new Promise((resolve) => setTimeout(resolve, delayMs)); continue; } throw preStreamError; } preStreamTransientRetries = 0; const streamJsonHook: DrainStreamHook = ({ chunk, shouldOutput, errorInfo, }) => { // Handle in-stream errors (emit ErrorMessage with full details) if (errorInfo && shouldOutput) { sawStreamError = true; // Track that we saw an error (affects result subtype) const errorEvent: ErrorMessage = { type: "error", message: errorInfo.message, stop_reason: "error", run_id: errorInfo.run_id, session_id: sessionId, uuid: randomUUID(), ...(errorInfo.error_type && errorInfo.run_id && { api_error: { message_type: "error_message", message: errorInfo.message, error_type: errorInfo.error_type, detail: errorInfo.detail, run_id: errorInfo.run_id, }, }), }; console.log(JSON.stringify(errorEvent)); return { shouldAccumulate: true }; } if (!shouldOutput) { return { shouldAccumulate: true }; } const chunkWithIds = chunk as typeof chunk & { otid?: string; id?: string; }; const uuid = chunkWithIds.otid || chunkWithIds.id; if (includePartialMessages) { const streamEvent: StreamEvent = { type: "stream_event", event: chunk, session_id: sessionId, uuid: uuid || randomUUID(), }; console.log(JSON.stringify(streamEvent)); } else { const msg: MessageWire = { type: "message", ...chunk, session_id: sessionId, uuid: uuid || randomUUID(), }; console.log(JSON.stringify(msg)); } return { shouldAccumulate: true }; }; const result = await drainStreamWithResume( stream, buffers, () => {}, currentAbortController?.signal, undefined, streamJsonHook, reminderContextTracker, ); const stopReason = result.stopReason; lastStopReason = stopReason; // Track for result subtype const approvals = result.approvals || []; // Case 1: Turn ended normally - break out of loop if (stopReason === "end_turn") { break; } // Case 2: Aborted - break out of loop if ( currentAbortController?.signal.aborted || stopReason === "cancelled" ) { break; } // Case 3: Requires approval - process approvals and continue if (stopReason === "requires_approval") { if (approvals.length === 0) { // Anomalous state: requires_approval but no approvals // Treat as error rather than false-positive success lastStopReason = "error"; break; } // Check permissions and collect decisions type Decision = | { type: "approve"; approval: { toolCallId: string; toolName: string; toolArgs: string; }; matchedRule: string; } | { type: "deny"; approval: { toolCallId: string; toolName: string; toolArgs: string; }; reason: string; }; const { autoAllowed, autoDenied, needsUserInput } = await classifyApprovals(approvals, { alwaysRequiresUserInput: isInteractiveApprovalTool, requireArgsForAutoApprove: true, missingNameReason: "Tool call incomplete - missing name", }); const decisions: Decision[] = [ ...autoAllowed.map((ac) => ({ type: "approve" as const, approval: ac.approval, matchedRule: "matchedRule" in ac.permission && ac.permission.matchedRule ? ac.permission.matchedRule : "auto-approved", })), ...autoDenied.map((ac) => { const fallback = "matchedRule" in ac.permission && ac.permission.matchedRule ? `Permission denied: ${ac.permission.matchedRule}` : ac.permission.reason ? `Permission denied: ${ac.permission.reason}` : "Permission denied: Unknown reason"; return { type: "deny" as const, approval: ac.approval, reason: ac.denyReason ?? fallback, }; }), ]; for (const approvalItem of autoAllowed) { const permission = approvalItem.permission; const autoApprovalMsg: AutoApprovalMessage = { type: "auto_approval", tool_call: { name: approvalItem.approval.toolName, tool_call_id: approvalItem.approval.toolCallId, arguments: approvalItem.approval.toolArgs, }, reason: permission.reason || "auto-approved", matched_rule: "matchedRule" in permission && permission.matchedRule ? permission.matchedRule : "auto-approved", session_id: sessionId, uuid: `auto-approval-${approvalItem.approval.toolCallId}`, }; console.log(JSON.stringify(autoApprovalMsg)); } for (const ac of needsUserInput) { // permission.decision === "ask" - request permission from SDK const permResponse = await requestPermission( ac.approval.toolCallId, ac.approval.toolName, ac.parsedArgs, ); if (permResponse.decision === "allow") { // If provided updatedInput (e.g., for AskUserQuestion with answers), // update the approval's toolArgs to use it const finalApproval = permResponse.updatedInput ? { ...ac.approval, toolArgs: JSON.stringify(permResponse.updatedInput), } : ac.approval; decisions.push({ type: "approve", approval: finalApproval, matchedRule: "SDK callback approved", }); // Emit auto_approval event for SDK-approved tool const autoApprovalMsg: AutoApprovalMessage = { type: "auto_approval", tool_call: { name: finalApproval.toolName, tool_call_id: finalApproval.toolCallId, arguments: finalApproval.toolArgs, }, reason: permResponse.reason || "SDK callback approved", matched_rule: "canUseTool callback", session_id: sessionId, uuid: `auto-approval-${ac.approval.toolCallId}`, }; console.log(JSON.stringify(autoApprovalMsg)); } else { decisions.push({ type: "deny", approval: ac.approval, reason: permResponse.reason || "Denied by SDK callback", }); } } // Execute approved tools const { executeApprovalBatch } = await import( "./agent/approval-execution" ); const executedResults = await executeApprovalBatch( decisions, undefined, { toolContextId: turnToolContextId ?? undefined }, ); // Send approval results back to continue const approvalInputWithOtid = { type: "approval" as const, approvals: executedResults, otid: randomUUID(), }; currentInput = [approvalInputWithOtid as unknown as MessageCreate]; // Continue the loop to process the next stream continue; } // Other stop reasons - break break; } // Emit result const durationMs = performance.now() - startTime; const lines = toLines(buffers); // Append transcript delta for reflection β€” always write, even on // interrupted/error turns, so short user exchanges are captured. if (settingsManager.isMemfsEnabled(agent.id)) { try { const { appendTranscriptDeltaJsonl } = await import( "./cli/helpers/reflectionTranscript" ); const userText = extractUserTextFromContent(userContent); const userLine: Line = { kind: "user", id: randomUUID(), text: userText, }; await appendTranscriptDeltaJsonl(agent.id, conversationId, [ userLine, ...lines, ]); } catch (transcriptErr) { debugWarn( "memory", `Failed to append transcript delta: ${ transcriptErr instanceof Error ? transcriptErr.message : String(transcriptErr) }`, ); } } const reversed = [...lines].reverse(); const lastAssistant = reversed.find( (line) => line.kind === "assistant" && "text" in line && typeof line.text === "string" && line.text.trim().length > 0, ) as Extract | undefined; const lastReasoning = reversed.find( (line) => line.kind === "reasoning" && "text" in line && typeof line.text === "string" && line.text.trim().length > 0, ) as Extract | undefined; const lastToolResult = reversed.find( (line) => line.kind === "tool_call" && "resultText" in line && typeof (line as Extract).resultText === "string" && ( (line as Extract).resultText ?? "" ).trim().length > 0, ) as Extract | undefined; const resultText = lastAssistant?.text || lastReasoning?.text || lastToolResult?.resultText || ""; // Determine result subtype based on how the turn ended const isAborted = currentAbortController?.signal.aborted; // isError if: (1) stop reason indicates error, OR (2) we emitted an error during streaming const isError = sawStreamError || (lastStopReason && lastStopReason !== "end_turn" && lastStopReason !== "requires_approval"); const subtype: ResultMessage["subtype"] = isAborted ? "interrupted" : isError ? "error" : "success"; const resultMsg: ResultMessage = { type: "result", subtype, session_id: sessionId, duration_ms: Math.round(durationMs), duration_api_ms: 0, // Not tracked in bidirectional mode num_turns: numTurns, result: resultText, agent_id: agent.id, conversation_id: conversationId, run_ids: [], usage: null, uuid: `result-${agent.id}-${Date.now()}`, // Include stop_reason only when subtype is "error" (not "interrupted") ...(subtype === "error" && { stop_reason: lastStopReason && lastStopReason !== "end_turn" ? lastStopReason : "error", // Use "error" if sawStreamError but lastStopReason was end_turn }), }; console.log(JSON.stringify(resultMsg)); } catch (error) { // Use formatErrorDetails for comprehensive error formatting (same as one-shot mode) const errorDetails = formatErrorDetails(error, agent.id); const errorMsg: ErrorMessage = { type: "error", message: errorDetails, stop_reason: "error", session_id: sessionId, uuid: randomUUID(), }; console.log(JSON.stringify(errorMsg)); // Also emit a result message with subtype: "error" so SDK knows the turn failed const errorResultMsg: ResultMessage = { type: "result", subtype: "error", session_id: sessionId, duration_ms: 0, duration_api_ms: 0, num_turns: 0, result: null, agent_id: agent.id, conversation_id: conversationId, run_ids: [], usage: null, uuid: `result-error-${agent.id}-${Date.now()}`, stop_reason: "error", }; console.log(JSON.stringify(errorResultMsg)); } finally { turnInProgress = false; blockedEmittedThisTurn = false; currentAbortController = null; } continue; } // Unknown message type const errorMsg: ErrorMessage = { type: "error", message: `Unknown message type: ${message.type}`, stop_reason: "error", session_id: sessionId, uuid: randomUUID(), }; console.log(JSON.stringify(errorMsg)); } // Stdin closed, exit gracefully setMessageQueueAdder(null); process.exit(0); }