3814 lines
126 KiB
TypeScript
3814 lines
126 KiB
TypeScript
import { randomUUID } from "node:crypto";
|
||
import type { Letta } from "@letta-ai/letta-client";
|
||
import { APIError } from "@letta-ai/letta-client/core/error";
|
||
import type {
|
||
AgentState,
|
||
MessageCreate,
|
||
} from "@letta-ai/letta-client/resources/agents/agents";
|
||
import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/messages";
|
||
import type { StopReasonType } from "@letta-ai/letta-client/resources/runs/runs";
|
||
import type {
|
||
ApprovalDecision,
|
||
ApprovalResult,
|
||
} from "./agent/approval-execution";
|
||
import {
|
||
extractConflictDetail,
|
||
fetchRunErrorDetail,
|
||
getPreStreamErrorAction,
|
||
getRetryDelayMs,
|
||
isApprovalPendingError,
|
||
isEmptyResponseRetryable,
|
||
isInvalidToolCallIdsError,
|
||
parseRetryAfterHeaderMs,
|
||
shouldRetryRunMetadataError,
|
||
} from "./agent/approval-recovery";
|
||
import { handleBootstrapSessionState } from "./agent/bootstrapHandler";
|
||
import { getClient } from "./agent/client";
|
||
import { buildClientSkillsPayload } from "./agent/clientSkills";
|
||
import { setAgentContext, setConversationId } from "./agent/context";
|
||
import { createAgent } from "./agent/create";
|
||
import { handleListMessages } from "./agent/listMessagesHandler";
|
||
import { ISOLATED_BLOCK_LABELS } from "./agent/memory";
|
||
import { getStreamToolContextId, sendMessageStream } from "./agent/message";
|
||
import {
|
||
getModelPresetUpdateForAgent,
|
||
getModelUpdateArgs,
|
||
getResumeRefreshArgs,
|
||
resolveModel,
|
||
} from "./agent/model";
|
||
import { updateAgentLLMConfig, updateAgentSystemPrompt } from "./agent/modify";
|
||
import { resolveSkillSourcesSelection } from "./agent/skillSources";
|
||
import type { SkillSource } from "./agent/skills";
|
||
import { SessionStats } from "./agent/stats";
|
||
import type { ParsedCliArgs } from "./cli/args";
|
||
import {
|
||
normalizeConversationShorthandFlags,
|
||
parseCsvListFlag,
|
||
parseJsonArrayFlag,
|
||
parsePositiveIntFlag,
|
||
resolveImportFlagAlias,
|
||
} from "./cli/flagUtils";
|
||
import {
|
||
createBuffers,
|
||
type Line,
|
||
markIncompleteToolsAsCancelled,
|
||
toLines,
|
||
} from "./cli/helpers/accumulator";
|
||
import { classifyApprovals } from "./cli/helpers/approvalClassification";
|
||
import { createContextTracker } from "./cli/helpers/contextTracker";
|
||
import { formatErrorDetails } from "./cli/helpers/errorFormatter";
|
||
import {
|
||
getReflectionSettings,
|
||
type ReflectionSettings,
|
||
type ReflectionTrigger,
|
||
reflectionSettingsToLegacyMode,
|
||
} from "./cli/helpers/memoryReminder";
|
||
import {
|
||
type QueuedMessage,
|
||
setMessageQueueAdder,
|
||
} from "./cli/helpers/messageQueueBridge";
|
||
import {
|
||
type DrainStreamHook,
|
||
drainStreamWithResume,
|
||
} from "./cli/helpers/stream";
|
||
import {
|
||
validateConversationDefaultRequiresAgent,
|
||
validateFlagConflicts,
|
||
validateRegistryHandleOrThrow,
|
||
} from "./cli/startupFlagValidation";
|
||
import { SYSTEM_REMINDER_CLOSE, SYSTEM_REMINDER_OPEN } from "./constants";
|
||
import { computeDiffPreviews } from "./helpers/diffPreview";
|
||
import { QueueRuntime } from "./queue/queueRuntime";
|
||
import {
|
||
mergeQueuedTurnInput,
|
||
type QueuedTurnInput,
|
||
} from "./queue/turnQueueRuntime";
|
||
import {
|
||
buildSharedReminderParts,
|
||
prependReminderPartsToContent,
|
||
} from "./reminders/engine";
|
||
import {
|
||
createSharedReminderState,
|
||
syncReminderStateFromContextTracker,
|
||
} from "./reminders/state";
|
||
import { settingsManager } from "./settings-manager";
|
||
import {
|
||
isHeadlessAutoAllowTool,
|
||
isInteractiveApprovalTool,
|
||
} from "./tools/interactivePolicy";
|
||
import {
|
||
type ExternalToolDefinition,
|
||
registerExternalTools,
|
||
setExternalToolExecutor,
|
||
} from "./tools/manager";
|
||
import type {
|
||
AutoApprovalMessage,
|
||
BootstrapSessionStateRequest,
|
||
CanUseToolControlRequest,
|
||
CanUseToolResponse,
|
||
ControlRequest,
|
||
ControlResponse,
|
||
ErrorMessage,
|
||
ListMessagesControlRequest,
|
||
MessageWire,
|
||
QueueLifecycleEvent,
|
||
RecoverPendingApprovalsControlRequest,
|
||
RecoveryMessage,
|
||
ResultMessage,
|
||
RetryMessage,
|
||
StreamEvent,
|
||
SystemInitMessage,
|
||
} from "./types/protocol";
|
||
import { debugLog } from "./utils/debug";
|
||
import {
|
||
markMilestone,
|
||
measureSinceMilestone,
|
||
reportAllMilestones,
|
||
} from "./utils/timing";
|
||
|
||
// Maximum number of times to retry a turn when the backend
|
||
// reports an `llm_api_error` stop reason. This helps smooth
|
||
// over transient LLM/backend issues without requiring the
|
||
// caller to manually resubmit the prompt.
|
||
const LLM_API_ERROR_MAX_RETRIES = 3;
|
||
|
||
// Retry config for empty response errors (Opus 4.6 SADs)
|
||
// Retry 1: same input. Retry 2: with system reminder nudge.
|
||
const EMPTY_RESPONSE_MAX_RETRIES = 2;
|
||
|
||
// Retry config for 409 "conversation busy" errors (exponential backoff)
|
||
const CONVERSATION_BUSY_MAX_RETRIES = 3; // 10s -> 20s -> 40s
|
||
|
||
export type BidirectionalQueuedInput = QueuedTurnInput<
|
||
MessageCreate["content"]
|
||
>;
|
||
|
||
export function mergeBidirectionalQueuedInput(
|
||
queued: BidirectionalQueuedInput[],
|
||
): MessageCreate["content"] | null {
|
||
return mergeQueuedTurnInput(queued, {
|
||
normalizeUserContent: (content) => content,
|
||
});
|
||
}
|
||
|
||
type ReflectionOverrides = {
|
||
trigger?: ReflectionTrigger;
|
||
deprecatedBehaviorRaw?: string;
|
||
stepCount?: number;
|
||
};
|
||
|
||
function parseReflectionOverrides(
|
||
values: ParsedCliArgs["values"],
|
||
): ReflectionOverrides {
|
||
const triggerRaw = values["reflection-trigger"];
|
||
const behaviorRaw = values["reflection-behavior"];
|
||
const stepCountRaw = values["reflection-step-count"];
|
||
|
||
if (!triggerRaw && !behaviorRaw && !stepCountRaw) {
|
||
return {};
|
||
}
|
||
|
||
const overrides: ReflectionOverrides = {};
|
||
|
||
if (triggerRaw !== undefined) {
|
||
if (
|
||
triggerRaw !== "off" &&
|
||
triggerRaw !== "step-count" &&
|
||
triggerRaw !== "compaction-event"
|
||
) {
|
||
throw new Error(
|
||
`Invalid --reflection-trigger "${triggerRaw}". Valid values: off, step-count, compaction-event`,
|
||
);
|
||
}
|
||
overrides.trigger = triggerRaw;
|
||
}
|
||
|
||
if (behaviorRaw !== undefined) {
|
||
if (behaviorRaw !== "reminder" && behaviorRaw !== "auto-launch") {
|
||
throw new Error(
|
||
`Invalid --reflection-behavior "${behaviorRaw}". Valid values: reminder, auto-launch`,
|
||
);
|
||
}
|
||
overrides.deprecatedBehaviorRaw = behaviorRaw;
|
||
}
|
||
|
||
if (stepCountRaw !== undefined) {
|
||
try {
|
||
overrides.stepCount = parsePositiveIntFlag({
|
||
rawValue: stepCountRaw,
|
||
flagName: "reflection-step-count",
|
||
});
|
||
} catch {
|
||
throw new Error(
|
||
`Invalid --reflection-step-count "${stepCountRaw}". Expected a positive integer.`,
|
||
);
|
||
}
|
||
}
|
||
|
||
return overrides;
|
||
}
|
||
|
||
function hasReflectionOverrides(overrides: ReflectionOverrides): boolean {
|
||
return (
|
||
overrides.trigger !== undefined ||
|
||
overrides.deprecatedBehaviorRaw !== undefined ||
|
||
overrides.stepCount !== undefined
|
||
);
|
||
}
|
||
|
||
async function applyReflectionOverrides(
|
||
agentId: string,
|
||
overrides: ReflectionOverrides,
|
||
): Promise<ReflectionSettings> {
|
||
const current = getReflectionSettings();
|
||
const merged: ReflectionSettings = {
|
||
trigger: overrides.trigger ?? current.trigger,
|
||
stepCount: overrides.stepCount ?? current.stepCount,
|
||
};
|
||
|
||
if (!hasReflectionOverrides(overrides)) {
|
||
return merged;
|
||
}
|
||
|
||
if (overrides.deprecatedBehaviorRaw !== undefined) {
|
||
console.warn(
|
||
"Warning: --reflection-behavior is deprecated and ignored. Reflection now always auto-launches subagents.",
|
||
);
|
||
}
|
||
|
||
const memfsEnabled = settingsManager.isMemfsEnabled(agentId);
|
||
if (!memfsEnabled && merged.trigger === "compaction-event") {
|
||
throw new Error(
|
||
"--reflection-trigger compaction-event requires memfs enabled for this agent.",
|
||
);
|
||
}
|
||
|
||
try {
|
||
settingsManager.getLocalProjectSettings();
|
||
} catch {
|
||
await settingsManager.loadLocalProjectSettings();
|
||
}
|
||
|
||
const legacyMode = reflectionSettingsToLegacyMode(merged);
|
||
settingsManager.updateLocalProjectSettings({
|
||
memoryReminderInterval: legacyMode,
|
||
reflectionTrigger: merged.trigger,
|
||
reflectionStepCount: merged.stepCount,
|
||
});
|
||
settingsManager.updateSettings({
|
||
memoryReminderInterval: legacyMode,
|
||
reflectionTrigger: merged.trigger,
|
||
reflectionStepCount: merged.stepCount,
|
||
});
|
||
|
||
return merged;
|
||
}
|
||
|
||
export async function handleHeadlessCommand(
|
||
parsedArgs: ParsedCliArgs,
|
||
model?: string,
|
||
skillsDirectoryOverride?: string,
|
||
skillSourcesOverride?: SkillSource[],
|
||
systemInfoReminderEnabledOverride?: boolean,
|
||
) {
|
||
const { values, positionals } = parsedArgs;
|
||
|
||
// Set tool filter if provided (controls which tools are loaded)
|
||
if (values.tools !== undefined) {
|
||
const { toolFilter } = await import("./tools/filter");
|
||
toolFilter.setEnabledTools(values.tools);
|
||
}
|
||
// Set permission mode if provided (or via --yolo alias)
|
||
const permissionModeValue = values["permission-mode"];
|
||
const yoloMode = values.yolo;
|
||
if (yoloMode || permissionModeValue) {
|
||
const { permissionMode } = await import("./permissions/mode");
|
||
if (yoloMode) {
|
||
permissionMode.setMode("bypassPermissions");
|
||
} else if (permissionModeValue) {
|
||
const validModes = [
|
||
"default",
|
||
"acceptEdits",
|
||
"bypassPermissions",
|
||
"plan",
|
||
];
|
||
if (validModes.includes(permissionModeValue)) {
|
||
permissionMode.setMode(
|
||
permissionModeValue as
|
||
| "default"
|
||
| "acceptEdits"
|
||
| "bypassPermissions"
|
||
| "plan",
|
||
);
|
||
}
|
||
}
|
||
}
|
||
|
||
// Set CLI permission overrides if provided (inherited from parent agent)
|
||
if (values.allowedTools || values.disallowedTools) {
|
||
const { cliPermissions } = await import("./permissions/cli");
|
||
if (values.allowedTools) {
|
||
cliPermissions.setAllowedTools(values.allowedTools);
|
||
}
|
||
if (values.disallowedTools) {
|
||
cliPermissions.setDisallowedTools(values.disallowedTools);
|
||
}
|
||
}
|
||
|
||
// Check for input-format early - if stream-json, we don't need a prompt
|
||
const inputFormat = values["input-format"];
|
||
const isBidirectionalMode = inputFormat === "stream-json";
|
||
|
||
// If headless output is being piped and the downstream closes early (e.g.
|
||
// `| head`), Node will throw EPIPE on stdout writes. Treat this as a normal
|
||
// termination rather than crashing with a stack trace.
|
||
//
|
||
// Note: this must be registered before any `console.log` in headless mode.
|
||
process.stdout.on("error", (err: unknown) => {
|
||
const code =
|
||
typeof err === "object" && err !== null && "code" in err
|
||
? (err as { code?: unknown }).code
|
||
: undefined;
|
||
|
||
if (code === "EPIPE") {
|
||
process.exit(0);
|
||
}
|
||
|
||
// Re-throw unknown stdout errors so they surface during tests/debugging.
|
||
throw err;
|
||
});
|
||
|
||
// Get prompt from either positional args or stdin (unless in bidirectional mode)
|
||
let prompt = positionals.slice(2).join(" ");
|
||
|
||
// If no prompt provided as args, try reading from stdin (unless in bidirectional mode)
|
||
if (!prompt && !isBidirectionalMode) {
|
||
// Check if stdin is available (piped input)
|
||
if (!process.stdin.isTTY) {
|
||
const chunks: Buffer[] = [];
|
||
for await (const chunk of process.stdin) {
|
||
chunks.push(chunk);
|
||
}
|
||
prompt = Buffer.concat(chunks).toString("utf-8").trim();
|
||
}
|
||
}
|
||
|
||
if (!prompt && !isBidirectionalMode) {
|
||
console.error("Error: No prompt provided");
|
||
process.exit(1);
|
||
}
|
||
|
||
const client = await getClient();
|
||
markMilestone("HEADLESS_CLIENT_READY");
|
||
|
||
// Check for --resume flag (interactive only)
|
||
if (values.resume) {
|
||
console.error(
|
||
"Error: --resume is for interactive mode only (opens conversation selector).\n" +
|
||
"In headless mode, use:\n" +
|
||
" --continue Resume the last session (agent + conversation)\n" +
|
||
" --conversation <id> Resume a specific conversation by ID",
|
||
);
|
||
process.exit(1);
|
||
}
|
||
|
||
// --new: Create a new conversation (for concurrent sessions)
|
||
let forceNewConversation = values.new ?? false;
|
||
const fromAgentId = values["from-agent"];
|
||
|
||
// Resolve agent (same logic as interactive mode)
|
||
let agent: AgentState | null = null;
|
||
let specifiedAgentId = values.agent;
|
||
const specifiedAgentName = values.name;
|
||
let specifiedConversationId = values.conversation;
|
||
const shouldContinue = values.continue;
|
||
const forceNew = values["new-agent"];
|
||
const systemPromptPreset = values.system;
|
||
const systemCustom = values["system-custom"];
|
||
const embeddingModel = values.embedding;
|
||
const memoryBlocksJson = values["memory-blocks"];
|
||
const blockValueArgs = values["block-value"];
|
||
const initBlocksRaw = values["init-blocks"];
|
||
const baseToolsRaw = values["base-tools"];
|
||
const skillsDirectory = values.skills ?? skillsDirectoryOverride;
|
||
const noSkillsFlag = values["no-skills"];
|
||
const noBundledSkillsFlag = values["no-bundled-skills"];
|
||
const skillSourcesRaw = values["skill-sources"];
|
||
const memfsFlag = values.memfs;
|
||
const noMemfsFlag = values["no-memfs"];
|
||
// Startup policy for the git-backed memory pull on session init.
|
||
// "blocking" (default): await the pull before proceeding.
|
||
// "background": fire the pull async, emit init without waiting.
|
||
// "skip": skip the pull entirely this session.
|
||
const memfsStartupRaw = values["memfs-startup"];
|
||
const memfsStartupPolicy: "blocking" | "background" | "skip" =
|
||
memfsStartupRaw === "background" || memfsStartupRaw === "skip"
|
||
? memfsStartupRaw
|
||
: "blocking";
|
||
const requestedMemoryPromptMode: "memfs" | "standard" | undefined = memfsFlag
|
||
? "memfs"
|
||
: noMemfsFlag
|
||
? "standard"
|
||
: undefined;
|
||
const shouldAutoEnableMemfsForNewAgent = !memfsFlag && !noMemfsFlag;
|
||
const fromAfFile = resolveImportFlagAlias({
|
||
importFlagValue: values.import,
|
||
fromAfFlagValue: values["from-af"],
|
||
});
|
||
const preLoadSkillsRaw = values["pre-load-skills"];
|
||
const systemInfoReminderEnabled =
|
||
systemInfoReminderEnabledOverride ?? !values["no-system-info-reminder"];
|
||
const reflectionOverrides = (() => {
|
||
try {
|
||
return parseReflectionOverrides(values);
|
||
} catch (error) {
|
||
console.error(
|
||
error instanceof Error ? `Error: ${error.message}` : String(error),
|
||
);
|
||
process.exit(1);
|
||
}
|
||
})();
|
||
const maxTurnsRaw = values["max-turns"];
|
||
const tagsRaw = values.tags;
|
||
const resolvedSkillSources = (() => {
|
||
if (skillSourcesOverride) {
|
||
return skillSourcesOverride;
|
||
}
|
||
try {
|
||
return resolveSkillSourcesSelection({
|
||
skillSourcesRaw,
|
||
noSkills: noSkillsFlag,
|
||
noBundledSkills: noBundledSkillsFlag,
|
||
});
|
||
} catch (error) {
|
||
console.error(
|
||
error instanceof Error ? `Error: ${error.message}` : String(error),
|
||
);
|
||
process.exit(1);
|
||
}
|
||
})();
|
||
|
||
const tags = parseCsvListFlag(tagsRaw);
|
||
|
||
// Parse and validate max-turns if provided
|
||
let maxTurns: number | undefined;
|
||
try {
|
||
maxTurns = parsePositiveIntFlag({
|
||
rawValue: maxTurnsRaw,
|
||
flagName: "max-turns",
|
||
});
|
||
} catch (error) {
|
||
console.error(
|
||
`Error: ${error instanceof Error ? error.message : String(error)}`,
|
||
);
|
||
process.exit(1);
|
||
}
|
||
|
||
if (preLoadSkillsRaw && resolvedSkillSources.length === 0) {
|
||
console.error(
|
||
"Error: --pre-load-skills cannot be used when all skill sources are disabled.",
|
||
);
|
||
process.exit(1);
|
||
}
|
||
|
||
try {
|
||
const normalized = normalizeConversationShorthandFlags({
|
||
specifiedConversationId,
|
||
specifiedAgentId,
|
||
});
|
||
specifiedConversationId = normalized.specifiedConversationId ?? undefined;
|
||
specifiedAgentId = normalized.specifiedAgentId ?? undefined;
|
||
} catch (error) {
|
||
console.error(
|
||
error instanceof Error ? `Error: ${error.message}` : String(error),
|
||
);
|
||
process.exit(1);
|
||
}
|
||
|
||
// Validate --conv default requires --agent (unless --new-agent will create one)
|
||
try {
|
||
validateConversationDefaultRequiresAgent({
|
||
specifiedConversationId,
|
||
specifiedAgentId,
|
||
forceNew,
|
||
});
|
||
} catch (error) {
|
||
console.error(
|
||
error instanceof Error ? `Error: ${error.message}` : String(error),
|
||
);
|
||
console.error("Usage: letta --agent agent-xyz --conv default");
|
||
console.error(" or: letta --conv agent-xyz (shorthand)");
|
||
process.exit(1);
|
||
}
|
||
|
||
if (fromAgentId) {
|
||
if (!specifiedAgentId && !specifiedConversationId) {
|
||
console.error(
|
||
"Error: --from-agent requires --agent <id> or --conversation <id>.",
|
||
);
|
||
process.exit(1);
|
||
}
|
||
if (shouldContinue) {
|
||
console.error("Error: --from-agent cannot be used with --continue");
|
||
process.exit(1);
|
||
}
|
||
if (forceNew) {
|
||
console.error("Error: --from-agent cannot be used with --new-agent");
|
||
process.exit(1);
|
||
}
|
||
if (!specifiedConversationId && !forceNewConversation) {
|
||
forceNewConversation = true;
|
||
}
|
||
}
|
||
|
||
// Validate shared mutual-exclusion rules for startup flags.
|
||
try {
|
||
validateFlagConflicts({
|
||
guard: specifiedConversationId && specifiedConversationId !== "default",
|
||
checks: [
|
||
{
|
||
when: specifiedAgentId,
|
||
message: "--conversation cannot be used with --agent",
|
||
},
|
||
{
|
||
when: specifiedAgentName,
|
||
message: "--conversation cannot be used with --name",
|
||
},
|
||
{
|
||
when: forceNew,
|
||
message: "--conversation cannot be used with --new-agent",
|
||
},
|
||
{
|
||
when: fromAfFile,
|
||
message: "--conversation cannot be used with --import",
|
||
},
|
||
{
|
||
when: shouldContinue,
|
||
message: "--conversation cannot be used with --continue",
|
||
},
|
||
],
|
||
});
|
||
|
||
validateFlagConflicts({
|
||
guard: forceNewConversation,
|
||
checks: [
|
||
{
|
||
when: shouldContinue,
|
||
message: "--new cannot be used with --continue",
|
||
},
|
||
{
|
||
when: specifiedConversationId,
|
||
message: "--new cannot be used with --conversation",
|
||
},
|
||
],
|
||
});
|
||
} catch (error) {
|
||
console.error(
|
||
error instanceof Error ? `Error: ${error.message}` : String(error),
|
||
);
|
||
process.exit(1);
|
||
}
|
||
|
||
// Validate --import flag (also accepts legacy --from-af)
|
||
// Detect if it's a registry handle (e.g., @author/name) or a local file path
|
||
let isRegistryImport = false;
|
||
if (fromAfFile) {
|
||
try {
|
||
validateFlagConflicts({
|
||
guard: fromAfFile,
|
||
checks: [
|
||
{
|
||
when: specifiedAgentId,
|
||
message: "--import cannot be used with --agent",
|
||
},
|
||
{
|
||
when: specifiedAgentName,
|
||
message: "--import cannot be used with --name",
|
||
},
|
||
{
|
||
when: shouldContinue,
|
||
message: "--import cannot be used with --continue",
|
||
},
|
||
{
|
||
when: forceNew,
|
||
message: "--import cannot be used with --new-agent",
|
||
},
|
||
],
|
||
});
|
||
} catch (error) {
|
||
console.error(
|
||
error instanceof Error ? `Error: ${error.message}` : String(error),
|
||
);
|
||
process.exit(1);
|
||
}
|
||
|
||
// Check if this looks like a registry handle (@author/name)
|
||
if (fromAfFile.startsWith("@")) {
|
||
// Definitely a registry handle
|
||
isRegistryImport = true;
|
||
// Validate handle format
|
||
try {
|
||
validateRegistryHandleOrThrow(fromAfFile);
|
||
} catch {
|
||
console.error(
|
||
`Error: Invalid registry handle "${fromAfFile}". Use format: letta --import @author/agentname`,
|
||
);
|
||
process.exit(1);
|
||
}
|
||
}
|
||
}
|
||
|
||
// Validate --name flag
|
||
if (specifiedAgentName) {
|
||
if (specifiedAgentId) {
|
||
console.error("Error: --name cannot be used with --agent");
|
||
process.exit(1);
|
||
}
|
||
if (forceNew) {
|
||
console.error("Error: --name cannot be used with --new-agent");
|
||
process.exit(1);
|
||
}
|
||
}
|
||
|
||
if (initBlocksRaw && !forceNew) {
|
||
console.error(
|
||
"Error: --init-blocks can only be used together with --new to control initial memory blocks.",
|
||
);
|
||
process.exit(1);
|
||
}
|
||
|
||
const initBlocks = parseCsvListFlag(initBlocksRaw);
|
||
|
||
if (baseToolsRaw && !forceNew) {
|
||
console.error(
|
||
"Error: --base-tools can only be used together with --new to control initial base tools.",
|
||
);
|
||
process.exit(1);
|
||
}
|
||
|
||
const baseTools = parseCsvListFlag(baseToolsRaw);
|
||
|
||
// Validate system prompt options (--system and --system-custom are mutually exclusive)
|
||
if (systemPromptPreset && systemCustom) {
|
||
console.error(
|
||
"Error: --system and --system-custom are mutually exclusive. Use one or the other.",
|
||
);
|
||
process.exit(1);
|
||
}
|
||
|
||
// Parse memory blocks JSON if provided
|
||
// Supports two formats:
|
||
// - CreateBlock: { label: string, value: string, description?: string }
|
||
// - BlockReference: { blockId: string }
|
||
let memoryBlocks:
|
||
| Array<
|
||
| { label: string; value: string; description?: string }
|
||
| { blockId: string }
|
||
>
|
||
| undefined;
|
||
if (memoryBlocksJson !== undefined) {
|
||
if (!forceNew) {
|
||
console.error(
|
||
"Error: --memory-blocks can only be used together with --new to provide initial memory blocks.",
|
||
);
|
||
process.exit(1);
|
||
}
|
||
try {
|
||
memoryBlocks = parseJsonArrayFlag(memoryBlocksJson, "memory-blocks") as
|
||
| Array<{ label: string; value: string; description?: string }>
|
||
| Array<{ blockId: string }>;
|
||
// Validate each block has required fields
|
||
for (const block of memoryBlocks) {
|
||
const hasBlockId =
|
||
"blockId" in block && typeof block.blockId === "string";
|
||
const hasLabelValue =
|
||
"label" in block &&
|
||
"value" in block &&
|
||
typeof block.label === "string" &&
|
||
typeof block.value === "string";
|
||
|
||
if (!hasBlockId && !hasLabelValue) {
|
||
throw new Error(
|
||
"Each memory block must have either 'blockId' (string) or 'label' and 'value' (strings)",
|
||
);
|
||
}
|
||
}
|
||
} catch (error) {
|
||
console.error(
|
||
`Error: ${error instanceof Error ? error.message : String(error)}`,
|
||
);
|
||
process.exit(1);
|
||
}
|
||
}
|
||
|
||
// Parse --block-value args (format: label=value)
|
||
let blockValues: Record<string, string> | undefined;
|
||
if (blockValueArgs && blockValueArgs.length > 0) {
|
||
if (!forceNew) {
|
||
console.error(
|
||
"Error: --block-value can only be used together with --new to set block values.",
|
||
);
|
||
process.exit(1);
|
||
}
|
||
blockValues = {};
|
||
for (const arg of blockValueArgs) {
|
||
const eqIndex = arg.indexOf("=");
|
||
if (eqIndex === -1) {
|
||
console.error(
|
||
`Error: Invalid --block-value format "${arg}". Expected format: label=value`,
|
||
);
|
||
process.exit(1);
|
||
}
|
||
const label = arg.slice(0, eqIndex);
|
||
const value = arg.slice(eqIndex + 1);
|
||
blockValues[label] = value;
|
||
}
|
||
}
|
||
|
||
// Priority 0: --conversation derives agent from conversation ID.
|
||
// "default" is a virtual agent-scoped conversation (not a retrievable conv-*).
|
||
// It requires --agent and should not hit conversations.retrieve().
|
||
if (specifiedConversationId && specifiedConversationId !== "default") {
|
||
try {
|
||
debugLog(
|
||
"conversations",
|
||
`retrieve(${specifiedConversationId}) [headless conv→agent lookup]`,
|
||
);
|
||
const conversation = await client.conversations.retrieve(
|
||
specifiedConversationId,
|
||
);
|
||
agent = await client.agents.retrieve(conversation.agent_id);
|
||
} catch (_error) {
|
||
console.error(`Conversation ${specifiedConversationId} not found`);
|
||
process.exit(1);
|
||
}
|
||
}
|
||
|
||
// Priority 1: Import from AgentFile template (local file or registry)
|
||
if (!agent && fromAfFile) {
|
||
let result: { agent: AgentState; skills?: string[] };
|
||
|
||
if (isRegistryImport) {
|
||
// Import from letta-ai/agent-file registry
|
||
const { importAgentFromRegistry } = await import("./agent/import");
|
||
result = await importAgentFromRegistry({
|
||
handle: fromAfFile,
|
||
modelOverride: model,
|
||
stripMessages: true,
|
||
stripSkills: false,
|
||
});
|
||
} else {
|
||
// Import from local file
|
||
const { importAgentFromFile } = await import("./agent/import");
|
||
result = await importAgentFromFile({
|
||
filePath: fromAfFile,
|
||
modelOverride: model,
|
||
stripMessages: true,
|
||
stripSkills: false,
|
||
});
|
||
}
|
||
|
||
agent = result.agent;
|
||
|
||
// Mark imported agents as "custom" to prevent legacy auto-migration
|
||
// from overwriting their system prompt on resume.
|
||
if (settingsManager.isReady) {
|
||
settingsManager.setSystemPromptPreset(agent.id, "custom");
|
||
}
|
||
|
||
// Display extracted skills summary
|
||
if (result.skills && result.skills.length > 0) {
|
||
const { getAgentSkillsDir } = await import("./agent/skills");
|
||
const skillsDir = getAgentSkillsDir(agent.id);
|
||
console.log(
|
||
`📦 Extracted ${result.skills.length} skill${result.skills.length === 1 ? "" : "s"} to ${skillsDir}: ${result.skills.join(", ")}`,
|
||
);
|
||
}
|
||
}
|
||
|
||
// Priority 2: Try to use --agent specified ID
|
||
if (!agent && specifiedAgentId) {
|
||
try {
|
||
agent = await client.agents.retrieve(specifiedAgentId);
|
||
} catch (_error) {
|
||
console.error(`Agent ${specifiedAgentId} not found`);
|
||
process.exit(1);
|
||
}
|
||
}
|
||
|
||
// Priority 3: Check if --new flag was passed (skip all resume logic)
|
||
if (!agent && forceNew) {
|
||
const updateArgs = getModelUpdateArgs(model);
|
||
// Pre-determine memfs mode so the agent is created with the correct prompt.
|
||
const { isLettaCloud, enableMemfsIfCloud } = await import(
|
||
"./agent/memoryFilesystem"
|
||
);
|
||
const willAutoEnableMemfs =
|
||
shouldAutoEnableMemfsForNewAgent && (await isLettaCloud());
|
||
const effectiveMemoryMode =
|
||
requestedMemoryPromptMode ?? (willAutoEnableMemfs ? "memfs" : undefined);
|
||
|
||
const createOptions = {
|
||
model,
|
||
embeddingModel,
|
||
updateArgs,
|
||
skillsDirectory,
|
||
parallelToolCalls: true,
|
||
systemPromptPreset,
|
||
systemPromptCustom: systemCustom,
|
||
memoryPromptMode: effectiveMemoryMode,
|
||
initBlocks,
|
||
baseTools,
|
||
memoryBlocks,
|
||
blockValues,
|
||
tags,
|
||
};
|
||
const result = await createAgent(createOptions);
|
||
agent = result.agent;
|
||
|
||
// Enable memfs on Letta Cloud (tags, repo clone, tool detach).
|
||
if (willAutoEnableMemfs) {
|
||
await enableMemfsIfCloud(agent.id);
|
||
}
|
||
}
|
||
|
||
// Priority 4: Try to resume from project settings (.letta/settings.local.json)
|
||
if (!agent) {
|
||
await settingsManager.loadLocalProjectSettings();
|
||
const localAgentId = settingsManager.getLocalLastAgentId(process.cwd());
|
||
if (localAgentId) {
|
||
try {
|
||
agent = await client.agents.retrieve(localAgentId);
|
||
} catch (_error) {
|
||
// Local LRU agent doesn't exist - log and continue
|
||
console.error(`Unable to locate agent ${localAgentId} in .letta/`);
|
||
}
|
||
}
|
||
}
|
||
|
||
// Priority 5: Try to reuse global LRU (covers directory-switching case)
|
||
// Do NOT restore global conversation — use default (project-scoped conversations)
|
||
if (!agent) {
|
||
const globalAgentId = settingsManager.getGlobalLastAgentId();
|
||
if (globalAgentId) {
|
||
try {
|
||
agent = await client.agents.retrieve(globalAgentId);
|
||
} catch (_error) {
|
||
// Global LRU agent doesn't exist
|
||
}
|
||
}
|
||
}
|
||
|
||
// Priority 6: --continue with no agent found → error
|
||
if (!agent && shouldContinue) {
|
||
console.error("No recent session found in .letta/ or ~/.letta.");
|
||
console.error("Run 'letta' to get started.");
|
||
process.exit(1);
|
||
}
|
||
|
||
// Priority 7: Fresh user with no LRU - create default agent
|
||
if (!agent) {
|
||
const { ensureDefaultAgents } = await import("./agent/defaults");
|
||
const defaultAgent = await ensureDefaultAgents(client);
|
||
if (defaultAgent) {
|
||
agent = defaultAgent;
|
||
}
|
||
}
|
||
|
||
// All paths should have resolved to an agent by now
|
||
if (!agent) {
|
||
console.error("No agent found. Use --new-agent to create a new agent.");
|
||
process.exit(1);
|
||
}
|
||
markMilestone("HEADLESS_AGENT_RESOLVED");
|
||
|
||
// Check if we're resuming an existing agent (not creating a new one)
|
||
const isResumingAgent = !!(
|
||
specifiedAgentId ||
|
||
shouldContinue ||
|
||
(!forceNew && !fromAfFile)
|
||
);
|
||
|
||
// If resuming, always refresh model settings from presets to keep
|
||
// preset-derived fields in sync, then apply optional command-line
|
||
// overrides (model/system prompt).
|
||
if (isResumingAgent) {
|
||
if (model) {
|
||
const modelHandle = resolveModel(model);
|
||
if (typeof modelHandle !== "string") {
|
||
console.error(`Error: Invalid model "${model}"`);
|
||
process.exit(1);
|
||
}
|
||
|
||
// Always apply model update - different model IDs can share the same
|
||
// handle but have different settings (e.g., gpt-5.2-medium vs gpt-5.2-xhigh)
|
||
const updateArgs = getModelUpdateArgs(model);
|
||
agent = await updateAgentLLMConfig(agent.id, modelHandle, updateArgs);
|
||
} else {
|
||
const presetRefresh = getModelPresetUpdateForAgent(agent);
|
||
if (presetRefresh) {
|
||
const { updateArgs: resumeRefreshUpdateArgs, needsUpdate } =
|
||
getResumeRefreshArgs(presetRefresh.updateArgs, agent);
|
||
|
||
if (needsUpdate) {
|
||
agent = await updateAgentLLMConfig(
|
||
agent.id,
|
||
presetRefresh.modelHandle,
|
||
resumeRefreshUpdateArgs,
|
||
{ preserveContextWindow: true },
|
||
);
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// Determine which conversation to use
|
||
let conversationId: string;
|
||
let effectiveReflectionSettings: ReflectionSettings;
|
||
|
||
const isSubagent = process.env.LETTA_CODE_AGENT_ROLE === "subagent";
|
||
|
||
// Captured so prompt logic below can await it when needed.
|
||
let memfsBgPromise: Promise<unknown> | undefined;
|
||
|
||
// Apply memfs flags and auto-enable from server tag when local settings are missing.
|
||
// Respects memfsStartupPolicy:
|
||
// "blocking" (default) – await the pull; exit on conflict.
|
||
// "background" – fire pull async; session init proceeds immediately.
|
||
// "skip" – skip the pull this session.
|
||
if (memfsStartupPolicy === "skip") {
|
||
// Run enable/disable logic but skip the git pull.
|
||
try {
|
||
const { applyMemfsFlags } = await import("./agent/memoryFilesystem");
|
||
await applyMemfsFlags(agent.id, memfsFlag, noMemfsFlag, {
|
||
pullOnExistingRepo: false,
|
||
agentTags: agent.tags,
|
||
skipPromptUpdate: forceNew,
|
||
});
|
||
} catch (error) {
|
||
console.error(
|
||
`Memory flags failed: ${error instanceof Error ? error.message : String(error)}`,
|
||
);
|
||
process.exit(1);
|
||
}
|
||
} else if (memfsStartupPolicy === "background") {
|
||
// Fire pull async; don't block session initialisation.
|
||
const { applyMemfsFlags } = await import("./agent/memoryFilesystem");
|
||
memfsBgPromise = applyMemfsFlags(agent.id, memfsFlag, noMemfsFlag, {
|
||
pullOnExistingRepo: true,
|
||
agentTags: agent.tags,
|
||
skipPromptUpdate: forceNew,
|
||
}).catch((error) => {
|
||
// Log to stderr only — the session is already live.
|
||
console.error(
|
||
`[memfs background pull] ${error instanceof Error ? error.message : String(error)}`,
|
||
);
|
||
});
|
||
} else {
|
||
// "blocking" — original behaviour.
|
||
try {
|
||
const { applyMemfsFlags } = await import("./agent/memoryFilesystem");
|
||
const memfsResult = await applyMemfsFlags(
|
||
agent.id,
|
||
memfsFlag,
|
||
noMemfsFlag,
|
||
{
|
||
pullOnExistingRepo: true,
|
||
agentTags: agent.tags,
|
||
skipPromptUpdate: forceNew,
|
||
},
|
||
);
|
||
if (memfsResult.pullSummary?.includes("CONFLICT")) {
|
||
console.error(
|
||
"Memory has merge conflicts. Run in interactive mode to resolve.",
|
||
);
|
||
process.exit(1);
|
||
}
|
||
} catch (error) {
|
||
console.error(
|
||
`Memory git sync failed: ${error instanceof Error ? error.message : String(error)}`,
|
||
);
|
||
process.exit(1);
|
||
}
|
||
}
|
||
|
||
// Ensure background memfs sync settles before prompt logic reads isMemfsEnabled().
|
||
if (memfsBgPromise && isResumingAgent) {
|
||
await memfsBgPromise;
|
||
}
|
||
|
||
// Apply --system flag after memfs sync so isMemfsEnabled() is up to date.
|
||
if (isResumingAgent && systemPromptPreset) {
|
||
const result = await updateAgentSystemPrompt(agent.id, systemPromptPreset);
|
||
if (!result.success || !result.agent) {
|
||
console.error(`Failed to update system prompt: ${result.message}`);
|
||
process.exit(1);
|
||
}
|
||
agent = result.agent;
|
||
}
|
||
|
||
// Auto-heal system prompt drift (rebuild from stored recipe).
|
||
// Runs after memfs sync so isMemfsEnabled() reflects the final state.
|
||
if (isResumingAgent && !systemPromptPreset) {
|
||
let storedPreset = settingsManager.getSystemPromptPreset(agent.id);
|
||
|
||
// Adopt legacy agents (created before recipe tracking) as "custom"
|
||
// so their prompts are left untouched by auto-heal.
|
||
if (
|
||
!storedPreset &&
|
||
agent.tags?.includes("origin:letta-code") &&
|
||
!agent.tags?.includes("role:subagent")
|
||
) {
|
||
storedPreset = "custom";
|
||
settingsManager.setSystemPromptPreset(agent.id, storedPreset);
|
||
}
|
||
|
||
if (storedPreset && storedPreset !== "custom") {
|
||
const { buildSystemPrompt: rebuildPrompt, isKnownPreset: isKnown } =
|
||
await import("./agent/promptAssets");
|
||
if (isKnown(storedPreset)) {
|
||
const memoryMode = settingsManager.isMemfsEnabled(agent.id)
|
||
? "memfs"
|
||
: "standard";
|
||
const expected = rebuildPrompt(storedPreset, memoryMode);
|
||
if (agent.system !== expected) {
|
||
const client = await getClient();
|
||
await client.agents.update(agent.id, { system: expected });
|
||
agent = await client.agents.retrieve(agent.id);
|
||
}
|
||
} else {
|
||
settingsManager.clearSystemPromptPreset(agent.id);
|
||
}
|
||
}
|
||
}
|
||
|
||
try {
|
||
effectiveReflectionSettings = await applyReflectionOverrides(
|
||
agent.id,
|
||
reflectionOverrides,
|
||
);
|
||
} catch (error) {
|
||
console.error(
|
||
`Failed to apply sleeptime settings: ${error instanceof Error ? error.message : String(error)}`,
|
||
);
|
||
process.exit(1);
|
||
}
|
||
|
||
// Determine which blocks to isolate for the conversation
|
||
const isolatedBlockLabels: string[] =
|
||
initBlocks === undefined
|
||
? [...ISOLATED_BLOCK_LABELS]
|
||
: ISOLATED_BLOCK_LABELS.filter((label) =>
|
||
initBlocks.includes(label as string),
|
||
);
|
||
|
||
if (specifiedConversationId) {
|
||
if (specifiedConversationId === "default") {
|
||
// "default" is the agent's primary message history (no explicit conversation)
|
||
// Don't validate - just use it directly
|
||
conversationId = "default";
|
||
} else {
|
||
// User specified an explicit conversation to resume - validate it exists
|
||
try {
|
||
debugLog(
|
||
"conversations",
|
||
`retrieve(${specifiedConversationId}) [headless --conv validate]`,
|
||
);
|
||
await client.conversations.retrieve(specifiedConversationId);
|
||
conversationId = specifiedConversationId;
|
||
} catch {
|
||
console.error(
|
||
`Error: Conversation ${specifiedConversationId} not found`,
|
||
);
|
||
process.exit(1);
|
||
}
|
||
}
|
||
} else if (shouldContinue) {
|
||
// Try to resume the last conversation for this agent
|
||
await settingsManager.loadLocalProjectSettings();
|
||
const lastSession =
|
||
settingsManager.getLocalLastSession(process.cwd()) ??
|
||
settingsManager.getGlobalLastSession();
|
||
|
||
if (lastSession && lastSession.agentId === agent.id) {
|
||
if (lastSession.conversationId === "default") {
|
||
// "default" is always valid - just use it directly
|
||
conversationId = "default";
|
||
} else {
|
||
// Verify the conversation still exists
|
||
try {
|
||
debugLog(
|
||
"conversations",
|
||
`retrieve(${lastSession.conversationId}) [headless lastSession resume]`,
|
||
);
|
||
await client.conversations.retrieve(lastSession.conversationId);
|
||
conversationId = lastSession.conversationId;
|
||
} catch {
|
||
// Conversation no longer exists - error with helpful message
|
||
console.error(
|
||
`Attempting to resume conversation ${lastSession.conversationId}, but conversation was not found.`,
|
||
);
|
||
console.error(
|
||
"Resume the default conversation with 'letta -p ...', view recent conversations with 'letta --resume', or start a new conversation with 'letta -p ... --new'.",
|
||
);
|
||
process.exit(1);
|
||
}
|
||
}
|
||
} else {
|
||
// No matching session - error with helpful message
|
||
console.error("No previous session found for this agent to resume.");
|
||
console.error(
|
||
"Resume the default conversation with 'letta -p ...', or start a new conversation with 'letta -p ... --new'.",
|
||
);
|
||
process.exit(1);
|
||
}
|
||
} else if (forceNewConversation) {
|
||
// --new flag: create a new conversation (for concurrent sessions)
|
||
const conversation = await client.conversations.create({
|
||
agent_id: agent.id,
|
||
isolated_block_labels: isolatedBlockLabels,
|
||
});
|
||
conversationId = conversation.id;
|
||
} else if (isSubagent) {
|
||
// Freshly created subagents have no concurrency risk — use the default
|
||
// conversation so it's easy to inspect in the ADE.
|
||
conversationId = "default";
|
||
} else {
|
||
// Default for headless: always create a new conversation to avoid
|
||
// 409 "conversation busy" races (e.g., parent agent calling letta -p).
|
||
// Use --conv default to explicitly target the agent's
|
||
// primary conversation.
|
||
const conversation = await client.conversations.create({
|
||
agent_id: agent.id,
|
||
isolated_block_labels: isolatedBlockLabels,
|
||
});
|
||
conversationId = conversation.id;
|
||
}
|
||
markMilestone("HEADLESS_CONVERSATION_READY");
|
||
|
||
// Set conversation ID in context for tools (e.g., Skill tool) to access
|
||
setConversationId(conversationId);
|
||
|
||
// Save session (agent + conversation) to both project and global settings
|
||
// Skip for subagents - they shouldn't pollute the LRU settings
|
||
if (!isSubagent) {
|
||
await settingsManager.loadLocalProjectSettings();
|
||
settingsManager.setLocalLastSession(
|
||
{ agentId: agent.id, conversationId },
|
||
process.cwd(),
|
||
);
|
||
settingsManager.setGlobalLastSession({
|
||
agentId: agent.id,
|
||
conversationId,
|
||
});
|
||
}
|
||
|
||
// Set agent context for tools that need it (e.g., Skill tool, Task tool)
|
||
setAgentContext(agent.id, skillsDirectory, resolvedSkillSources);
|
||
|
||
// Validate output format
|
||
const outputFormat = values["output-format"] || "text";
|
||
const includePartialMessages = Boolean(values["include-partial-messages"]);
|
||
if (!["text", "json", "stream-json"].includes(outputFormat)) {
|
||
console.error(
|
||
`Error: Invalid output format "${outputFormat}". Valid formats: text, json, stream-json`,
|
||
);
|
||
process.exit(1);
|
||
}
|
||
if (inputFormat && inputFormat !== "stream-json") {
|
||
console.error(
|
||
`Error: Invalid input format "${inputFormat}". Valid formats: stream-json`,
|
||
);
|
||
process.exit(1);
|
||
}
|
||
|
||
const { getClientToolsFromRegistry } = await import("./tools/manager");
|
||
const loadedToolNames = getClientToolsFromRegistry().map((t) => t.name);
|
||
const availableTools =
|
||
loadedToolNames.length > 0
|
||
? loadedToolNames
|
||
: agent.tools?.map((t) => t.name).filter((n): n is string => !!n) || [];
|
||
|
||
// If input-format is stream-json, use bidirectional mode
|
||
if (isBidirectionalMode) {
|
||
await runBidirectionalMode(
|
||
agent,
|
||
conversationId,
|
||
client,
|
||
outputFormat,
|
||
includePartialMessages,
|
||
availableTools,
|
||
resolvedSkillSources,
|
||
systemInfoReminderEnabled,
|
||
effectiveReflectionSettings,
|
||
);
|
||
return;
|
||
}
|
||
|
||
// Create buffers to accumulate stream (pass agent.id for server-side tool hooks)
|
||
const buffers = createBuffers(agent.id);
|
||
|
||
// Initialize session stats
|
||
const sessionStats = new SessionStats();
|
||
|
||
// Use agent.id as session_id for all stream-json messages
|
||
const sessionId = agent.id;
|
||
|
||
// Output init event for stream-json format
|
||
if (outputFormat === "stream-json") {
|
||
const initEvent: SystemInitMessage = {
|
||
type: "system",
|
||
subtype: "init",
|
||
session_id: sessionId,
|
||
agent_id: agent.id,
|
||
conversation_id: conversationId,
|
||
model: agent.llm_config?.model ?? "",
|
||
tools: availableTools,
|
||
cwd: process.cwd(),
|
||
mcp_servers: [],
|
||
permission_mode: "",
|
||
slash_commands: [],
|
||
memfs_enabled: settingsManager.isMemfsEnabled(agent.id),
|
||
skill_sources: resolvedSkillSources,
|
||
system_info_reminder_enabled: systemInfoReminderEnabled,
|
||
reflection_trigger: effectiveReflectionSettings.trigger,
|
||
reflection_step_count: effectiveReflectionSettings.stepCount,
|
||
uuid: `init-${agent.id}`,
|
||
};
|
||
console.log(JSON.stringify(initEvent));
|
||
}
|
||
|
||
const reminderContextTracker = createContextTracker();
|
||
const sharedReminderState = createSharedReminderState();
|
||
|
||
// Helper to resolve any pending approvals before sending user input
|
||
const resolveAllPendingApprovals = async () => {
|
||
const { getResumeData } = await import("./agent/check-approval");
|
||
while (true) {
|
||
// Re-fetch agent to get latest in-context messages (source of truth for backend)
|
||
const freshAgent = await client.agents.retrieve(agent.id);
|
||
|
||
let resume: Awaited<ReturnType<typeof getResumeData>>;
|
||
try {
|
||
resume = await getResumeData(client, freshAgent, conversationId);
|
||
} catch (error) {
|
||
// Treat 404/422 as "no approvals" - stale message/conversation state
|
||
if (
|
||
error instanceof APIError &&
|
||
(error.status === 404 || error.status === 422)
|
||
) {
|
||
break;
|
||
}
|
||
throw error;
|
||
}
|
||
|
||
// Use plural field for parallel tool calls
|
||
const pendingApprovals = resume.pendingApprovals || [];
|
||
if (pendingApprovals.length === 0) break;
|
||
|
||
// Phase 1: Collect decisions for all approvals
|
||
type Decision =
|
||
| {
|
||
type: "approve";
|
||
approval: {
|
||
toolCallId: string;
|
||
toolName: string;
|
||
toolArgs: string;
|
||
};
|
||
reason: string;
|
||
matchedRule: string;
|
||
}
|
||
| {
|
||
type: "deny";
|
||
approval: {
|
||
toolCallId: string;
|
||
toolName: string;
|
||
toolArgs: string;
|
||
};
|
||
reason: string;
|
||
};
|
||
|
||
const { autoAllowed, autoDenied } = await classifyApprovals(
|
||
pendingApprovals,
|
||
{
|
||
alwaysRequiresUserInput: isInteractiveApprovalTool,
|
||
treatAskAsDeny: true,
|
||
denyReasonForAsk: "Tool requires approval (headless mode)",
|
||
requireArgsForAutoApprove: true,
|
||
missingNameReason: "Tool call incomplete - missing name",
|
||
},
|
||
);
|
||
|
||
const decisions: Decision[] = [
|
||
...autoAllowed.map((ac) => ({
|
||
type: "approve" as const,
|
||
approval: ac.approval,
|
||
reason: ac.permission.reason || "Allowed by permission rule",
|
||
matchedRule:
|
||
"matchedRule" in ac.permission && ac.permission.matchedRule
|
||
? ac.permission.matchedRule
|
||
: "auto-approved",
|
||
})),
|
||
...autoDenied.map((ac) => {
|
||
const fallback =
|
||
"matchedRule" in ac.permission && ac.permission.matchedRule
|
||
? `Permission denied: ${ac.permission.matchedRule}`
|
||
: ac.permission.reason
|
||
? `Permission denied: ${ac.permission.reason}`
|
||
: "Permission denied: Unknown reason";
|
||
return {
|
||
type: "deny" as const,
|
||
approval: ac.approval,
|
||
reason: ac.denyReason ?? fallback,
|
||
};
|
||
}),
|
||
];
|
||
|
||
// Phase 2: Execute approved tools and format results using shared function
|
||
const { executeApprovalBatch } = await import(
|
||
"./agent/approval-execution"
|
||
);
|
||
|
||
// Emit auto_approval events for stream-json format
|
||
if (outputFormat === "stream-json") {
|
||
for (const decision of decisions) {
|
||
if (decision.type === "approve") {
|
||
const autoApprovalMsg: AutoApprovalMessage = {
|
||
type: "auto_approval",
|
||
tool_call: {
|
||
name: decision.approval.toolName,
|
||
tool_call_id: decision.approval.toolCallId,
|
||
arguments: decision.approval.toolArgs,
|
||
},
|
||
reason: decision.reason,
|
||
matched_rule: decision.matchedRule,
|
||
session_id: sessionId,
|
||
uuid: `auto-approval-${decision.approval.toolCallId}`,
|
||
};
|
||
console.log(JSON.stringify(autoApprovalMsg));
|
||
}
|
||
}
|
||
}
|
||
|
||
const executedResults = await executeApprovalBatch(decisions);
|
||
|
||
// Send all results in one batch
|
||
const approvalInput: ApprovalCreate = {
|
||
type: "approval",
|
||
approvals: executedResults as ApprovalResult[],
|
||
};
|
||
|
||
// Inject queued skill content as user message parts (LET-7353)
|
||
const approvalMessages: Array<
|
||
| import("@letta-ai/letta-client/resources/agents/agents").MessageCreate
|
||
| import("@letta-ai/letta-client/resources/agents/messages").ApprovalCreate
|
||
> = [approvalInput];
|
||
{
|
||
const { consumeQueuedSkillContent } = await import(
|
||
"./tools/impl/skillContentRegistry"
|
||
);
|
||
const skillContents = consumeQueuedSkillContent();
|
||
if (skillContents.length > 0) {
|
||
approvalMessages.push({
|
||
role: "user" as const,
|
||
content: skillContents.map((sc) => ({
|
||
type: "text" as const,
|
||
text: sc.content,
|
||
})),
|
||
});
|
||
}
|
||
}
|
||
|
||
// Send the approval to clear the pending state; drain the stream without output
|
||
const approvalStream = await sendMessageStream(
|
||
conversationId,
|
||
approvalMessages,
|
||
{ agentId: agent.id },
|
||
);
|
||
const drainResult = await drainStreamWithResume(
|
||
approvalStream,
|
||
createBuffers(agent.id),
|
||
() => {},
|
||
undefined,
|
||
undefined,
|
||
undefined,
|
||
reminderContextTracker,
|
||
);
|
||
// If the approval drain errored or was cancelled, abort rather than
|
||
// looping back and re-fetching approvals (which would restart the cycle).
|
||
if (
|
||
drainResult.stopReason === "error" ||
|
||
drainResult.stopReason === "cancelled"
|
||
) {
|
||
throw new Error(
|
||
`Approval drain ended with stop reason: ${drainResult.stopReason}`,
|
||
);
|
||
}
|
||
}
|
||
};
|
||
|
||
// Clear any pending approvals before starting a new turn - ONLY when resuming (LET-7101)
|
||
// For new agents/conversations, lazy recovery handles any edge cases
|
||
if (isResumingAgent) {
|
||
await resolveAllPendingApprovals();
|
||
}
|
||
|
||
// Build message content with reminders
|
||
const contentParts: MessageCreate["content"] = [];
|
||
const pushPart = (text: string) => {
|
||
if (!text) return;
|
||
contentParts.push({ type: "text", text });
|
||
};
|
||
|
||
if (fromAgentId) {
|
||
const senderAgentId = fromAgentId;
|
||
const senderAgent = await client.agents.retrieve(senderAgentId);
|
||
const systemReminder = `${SYSTEM_REMINDER_OPEN}
|
||
This message is from "${senderAgent.name}" (agent ID: ${senderAgentId}), an agent currently running inside the Letta Code CLI (docs.letta.com/letta-code).
|
||
The sender will only see the final message you generate (not tool calls or reasoning).
|
||
If you need to share detailed information, include it in your response text.
|
||
${SYSTEM_REMINDER_CLOSE}
|
||
|
||
`;
|
||
pushPart(systemReminder);
|
||
}
|
||
|
||
syncReminderStateFromContextTracker(
|
||
sharedReminderState,
|
||
reminderContextTracker,
|
||
);
|
||
const lastRunAt = (agent as { last_run_completion?: string })
|
||
.last_run_completion;
|
||
const { parts: sharedReminderParts } = await buildSharedReminderParts({
|
||
mode: isSubagent ? "subagent" : "headless-one-shot",
|
||
agent: {
|
||
id: agent.id,
|
||
name: agent.name,
|
||
description: agent.description,
|
||
lastRunAt: lastRunAt ?? null,
|
||
},
|
||
state: sharedReminderState,
|
||
sessionContextReminderEnabled: systemInfoReminderEnabled,
|
||
reflectionSettings: effectiveReflectionSettings,
|
||
skillSources: resolvedSkillSources,
|
||
resolvePlanModeReminder: async () => {
|
||
const { PLAN_MODE_REMINDER } = await import("./agent/promptAssets");
|
||
return PLAN_MODE_REMINDER;
|
||
},
|
||
});
|
||
for (const part of sharedReminderParts) {
|
||
pushPart(part.text);
|
||
}
|
||
|
||
// Pre-load specific skills' full content (used by subagents with skills: field)
|
||
if (preLoadSkillsRaw) {
|
||
const { readFile: readFileAsync } = await import("node:fs/promises");
|
||
const { skillPathById } = await buildClientSkillsPayload({
|
||
agentId: agent.id,
|
||
skillSources: resolvedSkillSources,
|
||
logger: (message) => {
|
||
if (process.env.DEBUG) {
|
||
console.warn(`[DEBUG] ${message}`);
|
||
}
|
||
},
|
||
});
|
||
const skillIds = preLoadSkillsRaw
|
||
.split(",")
|
||
.map((s) => s.trim())
|
||
.filter(Boolean);
|
||
const loadedContents: string[] = [];
|
||
for (const skillId of skillIds) {
|
||
const skillPath = skillPathById[skillId];
|
||
if (!skillPath) continue;
|
||
try {
|
||
const content = await readFileAsync(skillPath, "utf-8");
|
||
loadedContents.push(`<${skillId}>\n${content}\n</${skillId}>`);
|
||
} catch {
|
||
// Skill file not readable, skip
|
||
}
|
||
}
|
||
if (loadedContents.length > 0) {
|
||
pushPart(
|
||
`<loaded_skills>\n${loadedContents.join("\n\n")}\n</loaded_skills>`,
|
||
);
|
||
}
|
||
}
|
||
|
||
// Add user prompt
|
||
pushPart(prompt);
|
||
|
||
// Start with the user message
|
||
let currentInput: Array<MessageCreate | ApprovalCreate> = [
|
||
{
|
||
role: "user",
|
||
content: contentParts,
|
||
},
|
||
];
|
||
|
||
// Track lastRunId outside the while loop so it's available in catch block
|
||
let lastKnownRunId: string | null = null;
|
||
let llmApiErrorRetries = 0;
|
||
let emptyResponseRetries = 0;
|
||
let conversationBusyRetries = 0;
|
||
markMilestone("HEADLESS_FIRST_STREAM_START");
|
||
measureSinceMilestone("headless-setup-total", "HEADLESS_CLIENT_READY");
|
||
|
||
// Helper to check max turns limit using server-side step count from buffers
|
||
const checkMaxTurns = () => {
|
||
if (maxTurns !== undefined && buffers.usage.stepCount >= maxTurns) {
|
||
if (outputFormat === "stream-json") {
|
||
const errorMsg: ErrorMessage = {
|
||
type: "error",
|
||
message: `Maximum turns limit reached (${buffers.usage.stepCount}/${maxTurns} steps)`,
|
||
stop_reason: "max_steps",
|
||
session_id: sessionId,
|
||
uuid: `error-max-turns-${randomUUID()}`,
|
||
};
|
||
console.log(JSON.stringify(errorMsg));
|
||
} else {
|
||
console.error(
|
||
`Maximum turns limit reached (${buffers.usage.stepCount}/${maxTurns} steps)`,
|
||
);
|
||
}
|
||
process.exit(1);
|
||
}
|
||
};
|
||
|
||
try {
|
||
while (true) {
|
||
// Check max turns limit before starting a new turn (uses server-side step count)
|
||
checkMaxTurns();
|
||
|
||
// Inject queued skill content as user message parts (LET-7353)
|
||
{
|
||
const { consumeQueuedSkillContent } = await import(
|
||
"./tools/impl/skillContentRegistry"
|
||
);
|
||
const skillContents = consumeQueuedSkillContent();
|
||
if (skillContents.length > 0) {
|
||
currentInput = [
|
||
...currentInput,
|
||
{
|
||
role: "user" as const,
|
||
content: skillContents.map((sc) => ({
|
||
type: "text" as const,
|
||
text: sc.content,
|
||
})),
|
||
},
|
||
];
|
||
}
|
||
}
|
||
|
||
// Wrap sendMessageStream in try-catch to handle pre-stream errors (e.g., 409)
|
||
let stream: Awaited<ReturnType<typeof sendMessageStream>>;
|
||
let turnToolContextId: string | null = null;
|
||
try {
|
||
stream = await sendMessageStream(conversationId, currentInput, {
|
||
agentId: agent.id,
|
||
});
|
||
turnToolContextId = getStreamToolContextId(stream);
|
||
} catch (preStreamError) {
|
||
// Extract error detail using shared helper (handles nested/direct/message shapes)
|
||
const errorDetail = extractConflictDetail(preStreamError);
|
||
|
||
const preStreamAction = getPreStreamErrorAction(
|
||
errorDetail,
|
||
conversationBusyRetries,
|
||
CONVERSATION_BUSY_MAX_RETRIES,
|
||
{
|
||
status:
|
||
preStreamError instanceof APIError
|
||
? preStreamError.status
|
||
: undefined,
|
||
transientRetries: llmApiErrorRetries,
|
||
maxTransientRetries: LLM_API_ERROR_MAX_RETRIES,
|
||
},
|
||
);
|
||
|
||
// Check for pending approval blocking new messages - resolve and retry.
|
||
// This is distinct from "conversation busy" and needs approval resolution,
|
||
// not just a timed delay.
|
||
if (preStreamAction === "resolve_approval_pending") {
|
||
if (outputFormat === "stream-json") {
|
||
const recoveryMsg: RecoveryMessage = {
|
||
type: "recovery",
|
||
recovery_type: "approval_pending",
|
||
message:
|
||
"Detected pending approval conflict on send; resolving before retry",
|
||
session_id: sessionId,
|
||
uuid: `recovery-pre-stream-${randomUUID()}`,
|
||
};
|
||
console.log(JSON.stringify(recoveryMsg));
|
||
} else {
|
||
console.error(
|
||
"Pending approval detected, resolving before retry...",
|
||
);
|
||
}
|
||
|
||
await resolveAllPendingApprovals();
|
||
continue;
|
||
}
|
||
|
||
// Check for 409 "conversation busy" error - retry once with delay
|
||
if (preStreamAction === "retry_conversation_busy") {
|
||
conversationBusyRetries += 1;
|
||
const retryDelayMs = getRetryDelayMs({
|
||
category: "conversation_busy",
|
||
attempt: conversationBusyRetries,
|
||
});
|
||
|
||
// Emit retry message for stream-json mode
|
||
if (outputFormat === "stream-json") {
|
||
const retryMsg: RetryMessage = {
|
||
type: "retry",
|
||
reason: "error", // 409 conversation busy is a pre-stream error
|
||
attempt: conversationBusyRetries,
|
||
max_attempts: CONVERSATION_BUSY_MAX_RETRIES,
|
||
delay_ms: retryDelayMs,
|
||
session_id: sessionId,
|
||
uuid: `retry-conversation-busy-${randomUUID()}`,
|
||
};
|
||
console.log(JSON.stringify(retryMsg));
|
||
} else {
|
||
console.error(
|
||
`Conversation is busy, waiting ${Math.round(retryDelayMs / 1000)}s and retrying...`,
|
||
);
|
||
}
|
||
|
||
// Wait before retry
|
||
await new Promise((resolve) => setTimeout(resolve, retryDelayMs));
|
||
continue;
|
||
}
|
||
|
||
if (preStreamAction === "retry_transient") {
|
||
const attempt = llmApiErrorRetries + 1;
|
||
const retryAfterMs =
|
||
preStreamError instanceof APIError
|
||
? parseRetryAfterHeaderMs(
|
||
preStreamError.headers?.get("retry-after"),
|
||
)
|
||
: null;
|
||
const delayMs = getRetryDelayMs({
|
||
category: "transient_provider",
|
||
attempt,
|
||
detail: errorDetail,
|
||
retryAfterMs,
|
||
});
|
||
|
||
llmApiErrorRetries = attempt;
|
||
|
||
if (outputFormat === "stream-json") {
|
||
const retryMsg: RetryMessage = {
|
||
type: "retry",
|
||
reason: "llm_api_error",
|
||
attempt,
|
||
max_attempts: LLM_API_ERROR_MAX_RETRIES,
|
||
delay_ms: delayMs,
|
||
session_id: sessionId,
|
||
uuid: `retry-pre-stream-${randomUUID()}`,
|
||
};
|
||
console.log(JSON.stringify(retryMsg));
|
||
} else {
|
||
const delaySeconds = Math.round(delayMs / 1000);
|
||
console.error(
|
||
`Transient API error before streaming (attempt ${attempt} of ${LLM_API_ERROR_MAX_RETRIES}), retrying in ${delaySeconds}s...`,
|
||
);
|
||
}
|
||
|
||
await new Promise((resolve) => setTimeout(resolve, delayMs));
|
||
conversationBusyRetries = 0;
|
||
continue;
|
||
}
|
||
|
||
// Reset conversation busy retry counter on other errors
|
||
conversationBusyRetries = 0;
|
||
|
||
// Re-throw to outer catch for other errors
|
||
throw preStreamError;
|
||
}
|
||
|
||
// For stream-json, output each chunk as it arrives
|
||
let stopReason: StopReasonType | null = null;
|
||
let approvals: Array<{
|
||
toolCallId: string;
|
||
toolName: string;
|
||
toolArgs: string;
|
||
}> = [];
|
||
let apiDurationMs: number;
|
||
let lastRunId: string | null = null;
|
||
let approvalPendingRecovery = false;
|
||
|
||
if (outputFormat === "stream-json") {
|
||
// Track approval requests across streamed chunks
|
||
const autoApprovalEmitted = new Set<string>();
|
||
|
||
const streamJsonHook: DrainStreamHook = async ({
|
||
chunk,
|
||
shouldOutput,
|
||
errorInfo,
|
||
updatedApproval,
|
||
}) => {
|
||
let shouldOutputChunk = shouldOutput;
|
||
|
||
if (errorInfo && shouldOutput) {
|
||
const errorEvent: ErrorMessage = {
|
||
type: "error",
|
||
message: errorInfo.message,
|
||
stop_reason: "error",
|
||
run_id: errorInfo.run_id,
|
||
session_id: sessionId,
|
||
uuid: randomUUID(),
|
||
...(errorInfo.error_type &&
|
||
errorInfo.run_id && {
|
||
api_error: {
|
||
message_type: "error_message",
|
||
message: errorInfo.message,
|
||
error_type: errorInfo.error_type,
|
||
detail: errorInfo.detail,
|
||
run_id: errorInfo.run_id,
|
||
},
|
||
}),
|
||
};
|
||
console.log(JSON.stringify(errorEvent));
|
||
shouldOutputChunk = false;
|
||
}
|
||
|
||
// Detect server conflict due to pending approval; handle it and retry
|
||
// Check both detail and message fields since error formats vary
|
||
if (
|
||
isApprovalPendingError(errorInfo?.detail) ||
|
||
isApprovalPendingError(errorInfo?.message)
|
||
) {
|
||
const recoveryRunId = errorInfo?.run_id;
|
||
const recoveryMsg: RecoveryMessage = {
|
||
type: "recovery",
|
||
recovery_type: "approval_pending",
|
||
message:
|
||
"Detected pending approval conflict; auto-denying stale approval and retrying",
|
||
run_id: recoveryRunId ?? undefined,
|
||
session_id: sessionId,
|
||
uuid: `recovery-${recoveryRunId || randomUUID()}`,
|
||
};
|
||
console.log(JSON.stringify(recoveryMsg));
|
||
approvalPendingRecovery = true;
|
||
return { stopReason: "error", shouldAccumulate: true };
|
||
}
|
||
|
||
// Check if this approval will be auto-approved. Dedup per tool_call_id
|
||
if (
|
||
updatedApproval &&
|
||
!autoApprovalEmitted.has(updatedApproval.toolCallId)
|
||
) {
|
||
const { autoAllowed } = await classifyApprovals([updatedApproval], {
|
||
alwaysRequiresUserInput: isInteractiveApprovalTool,
|
||
requireArgsForAutoApprove: true,
|
||
missingNameReason: "Tool call incomplete - missing name",
|
||
});
|
||
|
||
const [approval] = autoAllowed;
|
||
if (approval) {
|
||
const permission = approval.permission;
|
||
shouldOutputChunk = false;
|
||
const autoApprovalMsg: AutoApprovalMessage = {
|
||
type: "auto_approval",
|
||
tool_call: {
|
||
name: approval.approval.toolName,
|
||
tool_call_id: approval.approval.toolCallId,
|
||
arguments: approval.approval.toolArgs || "{}",
|
||
},
|
||
reason: permission.reason || "Allowed by permission rule",
|
||
matched_rule:
|
||
"matchedRule" in permission && permission.matchedRule
|
||
? permission.matchedRule
|
||
: "auto-approved",
|
||
session_id: sessionId,
|
||
uuid: `auto-approval-${approval.approval.toolCallId}`,
|
||
};
|
||
console.log(JSON.stringify(autoApprovalMsg));
|
||
autoApprovalEmitted.add(approval.approval.toolCallId);
|
||
}
|
||
}
|
||
|
||
if (shouldOutputChunk) {
|
||
const chunkWithIds = chunk as typeof chunk & {
|
||
otid?: string;
|
||
id?: string;
|
||
};
|
||
const uuid = chunkWithIds.otid || chunkWithIds.id;
|
||
|
||
if (includePartialMessages) {
|
||
const streamEvent: StreamEvent = {
|
||
type: "stream_event",
|
||
event: chunk,
|
||
session_id: sessionId,
|
||
uuid: uuid || randomUUID(),
|
||
};
|
||
console.log(JSON.stringify(streamEvent));
|
||
} else {
|
||
const msg: MessageWire = {
|
||
type: "message",
|
||
...chunk,
|
||
session_id: sessionId,
|
||
uuid: uuid || randomUUID(),
|
||
};
|
||
console.log(JSON.stringify(msg));
|
||
}
|
||
}
|
||
|
||
return { shouldOutput: shouldOutputChunk, shouldAccumulate: true };
|
||
};
|
||
|
||
const result = await drainStreamWithResume(
|
||
stream,
|
||
buffers,
|
||
() => {},
|
||
undefined,
|
||
undefined,
|
||
streamJsonHook,
|
||
reminderContextTracker,
|
||
);
|
||
stopReason = result.stopReason;
|
||
approvals = result.approvals || [];
|
||
apiDurationMs = result.apiDurationMs;
|
||
lastRunId = result.lastRunId || null;
|
||
if (lastRunId) lastKnownRunId = lastRunId;
|
||
} else {
|
||
// Normal mode: use drainStreamWithResume
|
||
const result = await drainStreamWithResume(
|
||
stream,
|
||
buffers,
|
||
() => {}, // No UI refresh needed in headless mode
|
||
undefined,
|
||
undefined,
|
||
undefined,
|
||
reminderContextTracker,
|
||
);
|
||
stopReason = result.stopReason;
|
||
approvals = result.approvals || [];
|
||
apiDurationMs = result.apiDurationMs;
|
||
lastRunId = result.lastRunId || null;
|
||
if (lastRunId) lastKnownRunId = lastRunId;
|
||
}
|
||
|
||
// Track API duration for this stream
|
||
sessionStats.endTurn(apiDurationMs);
|
||
|
||
// Check max turns after each turn (server may have taken multiple steps)
|
||
checkMaxTurns();
|
||
|
||
if (approvalPendingRecovery) {
|
||
await resolveAllPendingApprovals();
|
||
continue;
|
||
}
|
||
|
||
// Case 1: Turn ended normally
|
||
if (stopReason === "end_turn") {
|
||
// Reset retry counters on success
|
||
llmApiErrorRetries = 0;
|
||
emptyResponseRetries = 0;
|
||
conversationBusyRetries = 0;
|
||
break;
|
||
}
|
||
|
||
// Case 2: Requires approval - batch process all approvals
|
||
if (stopReason === "requires_approval") {
|
||
if (approvals.length === 0) {
|
||
console.error("Unexpected empty approvals array");
|
||
process.exit(1);
|
||
}
|
||
|
||
// Phase 1: Collect decisions for all approvals
|
||
type Decision =
|
||
| {
|
||
type: "approve";
|
||
approval: {
|
||
toolCallId: string;
|
||
toolName: string;
|
||
toolArgs: string;
|
||
};
|
||
}
|
||
| {
|
||
type: "deny";
|
||
approval: {
|
||
toolCallId: string;
|
||
toolName: string;
|
||
toolArgs: string;
|
||
};
|
||
reason: string;
|
||
};
|
||
|
||
const { autoAllowed, autoDenied, needsUserInput } =
|
||
await classifyApprovals(approvals, {
|
||
alwaysRequiresUserInput: isInteractiveApprovalTool,
|
||
requireArgsForAutoApprove: true,
|
||
missingNameReason: "Tool call incomplete - missing name",
|
||
});
|
||
|
||
const decisions: Decision[] = [
|
||
...autoAllowed.map((ac) => ({
|
||
type: "approve" as const,
|
||
approval: ac.approval,
|
||
})),
|
||
...needsUserInput.map((ac) => {
|
||
// One-shot headless mode has no control channel for interactive
|
||
// approvals. Match Claude behavior by auto-allowing EnterPlanMode
|
||
// while denying tools that need runtime user responses.
|
||
if (isHeadlessAutoAllowTool(ac.approval.toolName)) {
|
||
return {
|
||
type: "approve" as const,
|
||
approval: ac.approval,
|
||
};
|
||
}
|
||
return {
|
||
type: "deny" as const,
|
||
approval: ac.approval,
|
||
reason: "Tool requires approval (headless mode)",
|
||
};
|
||
}),
|
||
...autoDenied.map((ac) => {
|
||
const fallback =
|
||
"matchedRule" in ac.permission && ac.permission.matchedRule
|
||
? `Permission denied: ${ac.permission.matchedRule}`
|
||
: ac.permission.reason
|
||
? `Permission denied: ${ac.permission.reason}`
|
||
: "Permission denied: Unknown reason";
|
||
return {
|
||
type: "deny" as const,
|
||
approval: ac.approval,
|
||
reason: ac.denyReason ?? fallback,
|
||
};
|
||
}),
|
||
];
|
||
|
||
// Phase 2: Execute all approved tools and format results using shared function
|
||
const { executeApprovalBatch } = await import(
|
||
"./agent/approval-execution"
|
||
);
|
||
const executedResults = await executeApprovalBatch(
|
||
decisions,
|
||
undefined,
|
||
{
|
||
toolContextId: turnToolContextId ?? undefined,
|
||
},
|
||
);
|
||
|
||
// Send all results in one batch
|
||
currentInput = [
|
||
{
|
||
type: "approval",
|
||
approvals: executedResults as ApprovalResult[],
|
||
},
|
||
];
|
||
continue;
|
||
}
|
||
|
||
// Cache latest error text for this turn
|
||
let latestErrorText: string | null = null;
|
||
const linesForTurn = toLines(buffers);
|
||
for (let i = linesForTurn.length - 1; i >= 0; i -= 1) {
|
||
const line = linesForTurn[i];
|
||
if (
|
||
line?.kind === "error" &&
|
||
"text" in line &&
|
||
typeof line.text === "string"
|
||
) {
|
||
latestErrorText = line.text;
|
||
break;
|
||
}
|
||
}
|
||
|
||
// Fetch run error detail for invalid tool call ID detection
|
||
const detailFromRun = await fetchRunErrorDetail(lastRunId);
|
||
|
||
// Case 3: Transient LLM API error - retry with exponential backoff up to a limit
|
||
if (stopReason === "llm_api_error") {
|
||
if (llmApiErrorRetries < LLM_API_ERROR_MAX_RETRIES) {
|
||
const attempt = llmApiErrorRetries + 1;
|
||
const delayMs = getRetryDelayMs({
|
||
category: "transient_provider",
|
||
attempt,
|
||
detail: detailFromRun,
|
||
});
|
||
|
||
llmApiErrorRetries = attempt;
|
||
|
||
if (outputFormat === "stream-json") {
|
||
const retryMsg: RetryMessage = {
|
||
type: "retry",
|
||
reason: "llm_api_error",
|
||
attempt,
|
||
max_attempts: LLM_API_ERROR_MAX_RETRIES,
|
||
delay_ms: delayMs,
|
||
run_id: lastRunId ?? undefined,
|
||
session_id: sessionId,
|
||
uuid: `retry-${lastRunId || randomUUID()}`,
|
||
};
|
||
console.log(JSON.stringify(retryMsg));
|
||
} else {
|
||
const delaySeconds = Math.round(delayMs / 1000);
|
||
console.error(
|
||
`LLM API error encountered (attempt ${attempt} of ${LLM_API_ERROR_MAX_RETRIES}), retrying in ${delaySeconds}s...`,
|
||
);
|
||
}
|
||
|
||
// Exponential backoff before retrying the same input
|
||
await new Promise((resolve) => setTimeout(resolve, delayMs));
|
||
|
||
continue;
|
||
}
|
||
}
|
||
|
||
// "Invalid tool call IDs" means server HAS pending approvals but with different IDs.
|
||
// Fetch the actual pending approvals and process them before retrying.
|
||
const invalidIdsDetected =
|
||
isInvalidToolCallIdsError(detailFromRun) ||
|
||
isInvalidToolCallIdsError(latestErrorText);
|
||
|
||
if (invalidIdsDetected) {
|
||
if (outputFormat === "stream-json") {
|
||
const recoveryMsg: RecoveryMessage = {
|
||
type: "recovery",
|
||
recovery_type: "invalid_tool_call_ids",
|
||
message:
|
||
"Tool call ID mismatch; fetching actual pending approvals and resyncing",
|
||
run_id: lastRunId ?? undefined,
|
||
session_id: sessionId,
|
||
uuid: `recovery-${lastRunId || randomUUID()}`,
|
||
};
|
||
console.log(JSON.stringify(recoveryMsg));
|
||
} else {
|
||
console.error(
|
||
"Tool call ID mismatch; fetching actual pending approvals...",
|
||
);
|
||
}
|
||
|
||
try {
|
||
// Fetch and process actual pending approvals from server
|
||
await resolveAllPendingApprovals();
|
||
// After processing, continue to next iteration (fresh state)
|
||
continue;
|
||
} catch {
|
||
// If fetch fails, exit with error
|
||
if (outputFormat === "stream-json") {
|
||
const errorMsg: ErrorMessage = {
|
||
type: "error",
|
||
message: "Failed to fetch pending approvals for resync",
|
||
stop_reason: stopReason,
|
||
run_id: lastRunId ?? undefined,
|
||
session_id: sessionId,
|
||
uuid: `error-${lastRunId || randomUUID()}`,
|
||
};
|
||
console.log(JSON.stringify(errorMsg));
|
||
} else {
|
||
console.error("Failed to fetch pending approvals for resync");
|
||
}
|
||
process.exit(1);
|
||
}
|
||
}
|
||
|
||
// Unexpected stop reason (error, llm_api_error, etc.)
|
||
// Before failing, check run metadata to see if this is a retriable error
|
||
// This handles cases where the backend sends a generic error stop_reason but the
|
||
// underlying cause is a transient LLM/network issue that should be retried
|
||
|
||
// Early exit for stop reasons that should never be retried
|
||
const nonRetriableReasons: StopReasonType[] = [
|
||
"cancelled",
|
||
"requires_approval",
|
||
"max_steps",
|
||
"max_tokens_exceeded",
|
||
"context_window_overflow_in_system_prompt",
|
||
"end_turn",
|
||
"tool_rule",
|
||
"no_tool_call",
|
||
];
|
||
if (nonRetriableReasons.includes(stopReason)) {
|
||
// Fall through to error display
|
||
} else if (lastRunId && llmApiErrorRetries < LLM_API_ERROR_MAX_RETRIES) {
|
||
try {
|
||
const run = await client.runs.retrieve(lastRunId);
|
||
const metaError = run.metadata?.error as
|
||
| {
|
||
error_type?: string;
|
||
message?: string;
|
||
detail?: string;
|
||
// Handle nested error structure (error.error) that can occur in some edge cases
|
||
error?: { error_type?: string; detail?: string };
|
||
}
|
||
| undefined;
|
||
|
||
// Check for llm_error at top level or nested (handles error.error nesting)
|
||
const errorType =
|
||
metaError?.error_type ?? metaError?.error?.error_type;
|
||
|
||
const detail = metaError?.detail ?? metaError?.error?.detail ?? "";
|
||
|
||
// Special handling for empty response errors (Opus 4.6 SADs)
|
||
// Empty LLM response retry (e.g. Opus 4.6 occasionally returns no content).
|
||
// Retry 1: same input unchanged. Retry 2: append system reminder nudging the model.
|
||
if (
|
||
isEmptyResponseRetryable(
|
||
errorType,
|
||
detail,
|
||
emptyResponseRetries,
|
||
EMPTY_RESPONSE_MAX_RETRIES,
|
||
)
|
||
) {
|
||
const attempt = emptyResponseRetries + 1;
|
||
const delayMs = getRetryDelayMs({
|
||
category: "empty_response",
|
||
attempt,
|
||
});
|
||
|
||
emptyResponseRetries = attempt;
|
||
|
||
// Only append a nudge on the last attempt
|
||
if (attempt >= EMPTY_RESPONSE_MAX_RETRIES) {
|
||
const nudgeMessage: MessageCreate = {
|
||
role: "system",
|
||
content: `<system-reminder>The previous response was empty. Please provide a response with either text content or a tool call.</system-reminder>`,
|
||
};
|
||
currentInput = [...currentInput, nudgeMessage];
|
||
}
|
||
|
||
if (outputFormat === "stream-json") {
|
||
const retryMsg: RetryMessage = {
|
||
type: "retry",
|
||
reason: "llm_api_error",
|
||
attempt,
|
||
max_attempts: EMPTY_RESPONSE_MAX_RETRIES,
|
||
delay_ms: delayMs,
|
||
run_id: lastRunId ?? undefined,
|
||
session_id: sessionId,
|
||
uuid: `retry-empty-${lastRunId || randomUUID()}`,
|
||
};
|
||
console.log(JSON.stringify(retryMsg));
|
||
} else {
|
||
console.error(
|
||
`Empty LLM response, retrying (attempt ${attempt} of ${EMPTY_RESPONSE_MAX_RETRIES})...`,
|
||
);
|
||
}
|
||
|
||
await new Promise((resolve) => setTimeout(resolve, delayMs));
|
||
continue;
|
||
}
|
||
|
||
if (shouldRetryRunMetadataError(errorType, detail)) {
|
||
const attempt = llmApiErrorRetries + 1;
|
||
const delayMs = getRetryDelayMs({
|
||
category: "transient_provider",
|
||
attempt,
|
||
detail,
|
||
});
|
||
|
||
llmApiErrorRetries = attempt;
|
||
|
||
if (outputFormat === "stream-json") {
|
||
const retryMsg: RetryMessage = {
|
||
type: "retry",
|
||
reason: "llm_api_error",
|
||
attempt,
|
||
max_attempts: LLM_API_ERROR_MAX_RETRIES,
|
||
delay_ms: delayMs,
|
||
run_id: lastRunId ?? undefined,
|
||
session_id: sessionId,
|
||
uuid: `retry-${lastRunId || randomUUID()}`,
|
||
};
|
||
console.log(JSON.stringify(retryMsg));
|
||
} else {
|
||
const delaySeconds = Math.round(delayMs / 1000);
|
||
console.error(
|
||
`LLM API error encountered (attempt ${attempt} of ${LLM_API_ERROR_MAX_RETRIES}), retrying in ${delaySeconds}s...`,
|
||
);
|
||
}
|
||
|
||
await new Promise((resolve) => setTimeout(resolve, delayMs));
|
||
continue;
|
||
}
|
||
} catch (_e) {
|
||
// If we can't fetch run metadata, fall through to normal error handling
|
||
}
|
||
}
|
||
|
||
// Mark incomplete tool calls as cancelled to prevent stuck state
|
||
markIncompleteToolsAsCancelled(buffers, true, "stream_error");
|
||
|
||
// Extract error details from buffers if available
|
||
const errorLines = toLines(buffers).filter(
|
||
(line) => line.kind === "error",
|
||
);
|
||
const errorMessages = errorLines
|
||
.map((line) => ("text" in line ? line.text : ""))
|
||
.filter(Boolean);
|
||
|
||
let errorMessage =
|
||
errorMessages.length > 0
|
||
? errorMessages.join("; ")
|
||
: `Unexpected stop reason: ${stopReason}`;
|
||
|
||
// Fetch detailed error from run metadata if available (same as TUI mode)
|
||
if (lastRunId && errorMessages.length === 0) {
|
||
try {
|
||
const run = await client.runs.retrieve(lastRunId);
|
||
if (run.metadata?.error) {
|
||
const errorData = run.metadata.error as {
|
||
type?: string;
|
||
message?: string;
|
||
detail?: string;
|
||
};
|
||
// Construct error object that formatErrorDetails can parse
|
||
const errorObject = {
|
||
error: {
|
||
error: errorData,
|
||
run_id: lastRunId,
|
||
},
|
||
};
|
||
errorMessage = formatErrorDetails(errorObject, agent.id);
|
||
}
|
||
} catch (_e) {
|
||
// If we can't fetch error details, append note to error message
|
||
errorMessage = `${errorMessage}\n(Unable to fetch additional error details from server)`;
|
||
}
|
||
}
|
||
|
||
if (outputFormat === "stream-json") {
|
||
// Emit error event
|
||
const errorMsg: ErrorMessage = {
|
||
type: "error",
|
||
message: errorMessage,
|
||
stop_reason: stopReason,
|
||
run_id: lastRunId ?? undefined,
|
||
session_id: sessionId,
|
||
uuid: `error-${lastRunId || randomUUID()}`,
|
||
};
|
||
console.log(JSON.stringify(errorMsg));
|
||
} else {
|
||
console.error(`Error: ${errorMessage}`);
|
||
}
|
||
process.exit(1);
|
||
}
|
||
} catch (error) {
|
||
// Mark incomplete tool calls as cancelled
|
||
markIncompleteToolsAsCancelled(buffers, true, "stream_error");
|
||
|
||
// Use comprehensive error formatting (same as TUI mode)
|
||
const errorDetails = formatErrorDetails(error, agent.id);
|
||
|
||
if (outputFormat === "stream-json") {
|
||
const errorMsg: ErrorMessage = {
|
||
type: "error",
|
||
message: errorDetails,
|
||
stop_reason: "error",
|
||
run_id: lastKnownRunId ?? undefined,
|
||
session_id: sessionId,
|
||
uuid: `error-${lastKnownRunId || randomUUID()}`,
|
||
};
|
||
console.log(JSON.stringify(errorMsg));
|
||
} else {
|
||
console.error(`Error: ${errorDetails}`);
|
||
}
|
||
process.exit(1);
|
||
}
|
||
|
||
// Update stats with final usage data from buffers
|
||
sessionStats.updateUsageFromBuffers(buffers);
|
||
|
||
// Extract final result from transcript, with sensible fallbacks
|
||
const lines = toLines(buffers);
|
||
const reversed = [...lines].reverse();
|
||
|
||
const lastAssistant = reversed.find(
|
||
(line) =>
|
||
line.kind === "assistant" &&
|
||
"text" in line &&
|
||
typeof line.text === "string" &&
|
||
line.text.trim().length > 0,
|
||
) as Extract<Line, { kind: "assistant" }> | undefined;
|
||
|
||
const lastReasoning = reversed.find(
|
||
(line) =>
|
||
line.kind === "reasoning" &&
|
||
"text" in line &&
|
||
typeof line.text === "string" &&
|
||
line.text.trim().length > 0,
|
||
) as Extract<Line, { kind: "reasoning" }> | undefined;
|
||
|
||
const lastToolResult = reversed.find(
|
||
(line) =>
|
||
line.kind === "tool_call" &&
|
||
"resultText" in line &&
|
||
typeof (line as Extract<Line, { kind: "tool_call" }>).resultText ===
|
||
"string" &&
|
||
((line as Extract<Line, { kind: "tool_call" }>).resultText ?? "").trim()
|
||
.length > 0,
|
||
) as Extract<Line, { kind: "tool_call" }> | undefined;
|
||
|
||
const resultText =
|
||
lastAssistant?.text ||
|
||
lastReasoning?.text ||
|
||
lastToolResult?.resultText ||
|
||
"No assistant response found";
|
||
|
||
const stats = sessionStats.getSnapshot();
|
||
const usage = {
|
||
prompt_tokens: stats.usage.promptTokens,
|
||
completion_tokens: stats.usage.completionTokens,
|
||
total_tokens: stats.usage.totalTokens,
|
||
step_count: stats.usage.stepCount,
|
||
cached_input_tokens: stats.usage.cachedInputTokens,
|
||
cache_write_tokens: stats.usage.cacheWriteTokens,
|
||
reasoning_tokens: stats.usage.reasoningTokens,
|
||
...(stats.usage.contextTokens !== undefined && {
|
||
context_tokens: stats.usage.contextTokens,
|
||
}),
|
||
};
|
||
|
||
// Output based on format
|
||
if (outputFormat === "json") {
|
||
const output = {
|
||
type: "result",
|
||
subtype: "success",
|
||
is_error: false,
|
||
duration_ms: Math.round(stats.totalWallMs),
|
||
duration_api_ms: Math.round(stats.totalApiMs),
|
||
num_turns: stats.usage.stepCount,
|
||
result: resultText,
|
||
agent_id: agent.id,
|
||
conversation_id: conversationId,
|
||
usage,
|
||
};
|
||
console.log(JSON.stringify(output, null, 2));
|
||
} else if (outputFormat === "stream-json") {
|
||
// Output final result event
|
||
// Collect all run_ids from buffers
|
||
const allRunIds = new Set<string>();
|
||
for (const line of toLines(buffers)) {
|
||
// Extract run_id from any line that might have it
|
||
// This is a fallback in case we missed any during streaming
|
||
if ("run_id" in line && typeof line.run_id === "string") {
|
||
allRunIds.add(line.run_id);
|
||
}
|
||
}
|
||
|
||
// Use the last run_id as the result uuid if available, otherwise derive from agent_id
|
||
const resultUuid =
|
||
allRunIds.size > 0
|
||
? `result-${Array.from(allRunIds).pop()}`
|
||
: `result-${agent.id}`;
|
||
const resultEvent: ResultMessage = {
|
||
type: "result",
|
||
subtype: "success",
|
||
session_id: sessionId,
|
||
duration_ms: Math.round(stats.totalWallMs),
|
||
duration_api_ms: Math.round(stats.totalApiMs),
|
||
num_turns: stats.usage.stepCount,
|
||
result: resultText,
|
||
agent_id: agent.id,
|
||
conversation_id: conversationId,
|
||
run_ids: Array.from(allRunIds),
|
||
usage,
|
||
uuid: resultUuid,
|
||
};
|
||
console.log(JSON.stringify(resultEvent));
|
||
} else {
|
||
// text format (default)
|
||
if (!resultText || resultText === "No assistant response found") {
|
||
console.error("No assistant response found");
|
||
process.exit(1);
|
||
}
|
||
console.log(resultText);
|
||
}
|
||
|
||
// Report all milestones at the end for latency audit
|
||
markMilestone("HEADLESS_COMPLETE");
|
||
reportAllMilestones();
|
||
}
|
||
|
||
/**
|
||
* Bidirectional mode for SDK communication.
|
||
* Reads JSON messages from stdin, processes them, and outputs responses.
|
||
* Stays alive until stdin closes.
|
||
*/
|
||
async function runBidirectionalMode(
|
||
agent: AgentState,
|
||
conversationId: string,
|
||
client: Letta,
|
||
_outputFormat: string,
|
||
includePartialMessages: boolean,
|
||
availableTools: string[],
|
||
skillSources: SkillSource[],
|
||
systemInfoReminderEnabled: boolean,
|
||
reflectionSettings: ReflectionSettings,
|
||
): Promise<void> {
|
||
const sessionId = agent.id;
|
||
const readline = await import("node:readline");
|
||
|
||
// Emit init event
|
||
const initEvent = {
|
||
type: "system",
|
||
subtype: "init",
|
||
session_id: sessionId,
|
||
agent_id: agent.id,
|
||
conversation_id: conversationId,
|
||
model: agent.llm_config?.model,
|
||
tools: availableTools,
|
||
cwd: process.cwd(),
|
||
memfs_enabled: settingsManager.isMemfsEnabled(agent.id),
|
||
skill_sources: skillSources,
|
||
system_info_reminder_enabled: systemInfoReminderEnabled,
|
||
reflection_trigger: reflectionSettings.trigger,
|
||
reflection_step_count: reflectionSettings.stepCount,
|
||
uuid: `init-${agent.id}`,
|
||
};
|
||
console.log(JSON.stringify(initEvent));
|
||
|
||
// Track current operation for interrupt support
|
||
let currentAbortController: AbortController | null = null;
|
||
const reminderContextTracker = createContextTracker();
|
||
const sharedReminderState = createSharedReminderState();
|
||
const isSubagent = process.env.LETTA_CODE_AGENT_ROLE === "subagent";
|
||
|
||
// Resolve pending approvals for this conversation before retrying user input.
|
||
const resolveAllPendingApprovals = async () => {
|
||
const { getResumeData } = await import("./agent/check-approval");
|
||
while (true) {
|
||
// Re-fetch agent to get latest in-context messages (source of truth for backend)
|
||
const freshAgent = await client.agents.retrieve(agent.id);
|
||
|
||
let resume: Awaited<ReturnType<typeof getResumeData>>;
|
||
try {
|
||
resume = await getResumeData(client, freshAgent, conversationId);
|
||
} catch (error) {
|
||
// Treat 404/422 as "no approvals" - stale message/conversation state
|
||
if (
|
||
error instanceof APIError &&
|
||
(error.status === 404 || error.status === 422)
|
||
) {
|
||
break;
|
||
}
|
||
throw error;
|
||
}
|
||
|
||
const pendingApprovals = resume.pendingApprovals || [];
|
||
if (pendingApprovals.length === 0) break;
|
||
|
||
type Decision =
|
||
| {
|
||
type: "approve";
|
||
approval: {
|
||
toolCallId: string;
|
||
toolName: string;
|
||
toolArgs: string;
|
||
};
|
||
reason: string;
|
||
matchedRule: string;
|
||
}
|
||
| {
|
||
type: "deny";
|
||
approval: {
|
||
toolCallId: string;
|
||
toolName: string;
|
||
toolArgs: string;
|
||
};
|
||
reason: string;
|
||
};
|
||
|
||
const { autoAllowed, autoDenied } = await classifyApprovals(
|
||
pendingApprovals,
|
||
{
|
||
treatAskAsDeny: true,
|
||
denyReasonForAsk: "Tool requires approval (headless mode)",
|
||
requireArgsForAutoApprove: true,
|
||
missingNameReason: "Tool call incomplete - missing name",
|
||
},
|
||
);
|
||
|
||
const decisions: Decision[] = [
|
||
...autoAllowed.map((ac) => ({
|
||
type: "approve" as const,
|
||
approval: ac.approval,
|
||
reason: ac.permission.reason || "Allowed by permission rule",
|
||
matchedRule:
|
||
"matchedRule" in ac.permission && ac.permission.matchedRule
|
||
? ac.permission.matchedRule
|
||
: "auto-approved",
|
||
})),
|
||
...autoDenied.map((ac) => {
|
||
const fallback =
|
||
"matchedRule" in ac.permission && ac.permission.matchedRule
|
||
? `Permission denied: ${ac.permission.matchedRule}`
|
||
: ac.permission.reason
|
||
? `Permission denied: ${ac.permission.reason}`
|
||
: "Permission denied: Unknown reason";
|
||
return {
|
||
type: "deny" as const,
|
||
approval: ac.approval,
|
||
reason: ac.denyReason ?? fallback,
|
||
};
|
||
}),
|
||
];
|
||
|
||
const { executeApprovalBatch } = await import(
|
||
"./agent/approval-execution"
|
||
);
|
||
const executedResults = await executeApprovalBatch(decisions);
|
||
|
||
const approvalInput: ApprovalCreate = {
|
||
type: "approval",
|
||
approvals: executedResults as ApprovalResult[],
|
||
};
|
||
|
||
const approvalMessages: Array<
|
||
| import("@letta-ai/letta-client/resources/agents/agents").MessageCreate
|
||
| import("@letta-ai/letta-client/resources/agents/messages").ApprovalCreate
|
||
> = [approvalInput];
|
||
|
||
{
|
||
const { consumeQueuedSkillContent } = await import(
|
||
"./tools/impl/skillContentRegistry"
|
||
);
|
||
const skillContents = consumeQueuedSkillContent();
|
||
if (skillContents.length > 0) {
|
||
approvalMessages.push({
|
||
role: "user" as const,
|
||
content: skillContents.map((sc) => ({
|
||
type: "text" as const,
|
||
text: sc.content,
|
||
})),
|
||
});
|
||
}
|
||
}
|
||
|
||
const approvalStream = await sendMessageStream(
|
||
conversationId,
|
||
approvalMessages,
|
||
{ agentId: agent.id },
|
||
);
|
||
const drainResult = await drainStreamWithResume(
|
||
approvalStream,
|
||
createBuffers(agent.id),
|
||
() => {},
|
||
undefined,
|
||
undefined,
|
||
undefined,
|
||
reminderContextTracker,
|
||
);
|
||
if (
|
||
drainResult.stopReason === "error" ||
|
||
drainResult.stopReason === "cancelled"
|
||
) {
|
||
throw new Error(
|
||
`Approval drain ended with stop reason: ${drainResult.stopReason}`,
|
||
);
|
||
}
|
||
}
|
||
};
|
||
|
||
// Create readline interface for stdin
|
||
const rl = readline.createInterface({
|
||
input: process.stdin,
|
||
terminal: false,
|
||
});
|
||
|
||
// Create async iterator and line queue for permission callbacks
|
||
const lineQueue: string[] = [];
|
||
let lineResolver: ((line: string | null) => void) | null = null;
|
||
|
||
// ── Queue lifecycle tracking (stream-json only) ────────────────
|
||
// Bidirectional mode always runs under stream-json input format, so queue
|
||
// events are always emitted here. emitQueueEvent is a no-op guard retained
|
||
// for clarity and future-proofing against non-stream-json callers.
|
||
const emitQueueEvent = (e: QueueLifecycleEvent): void => {
|
||
console.log(JSON.stringify(e));
|
||
};
|
||
|
||
let turnInProgress = false;
|
||
|
||
const msgQueueRuntime = new QueueRuntime({
|
||
callbacks: {
|
||
onEnqueued: (item, queueLen) =>
|
||
emitQueueEvent({
|
||
type: "queue_item_enqueued",
|
||
item_id: item.id,
|
||
client_message_id: item.clientMessageId ?? `cm-${item.id}`,
|
||
source: item.source,
|
||
kind: item.kind,
|
||
queue_len: queueLen,
|
||
session_id: sessionId,
|
||
uuid: `q-enq-${item.id}`,
|
||
}),
|
||
onDequeued: (batch) =>
|
||
emitQueueEvent({
|
||
type: "queue_batch_dequeued",
|
||
batch_id: batch.batchId,
|
||
item_ids: batch.items.map((i) => i.id),
|
||
merged_count: batch.mergedCount,
|
||
queue_len_after: batch.queueLenAfter,
|
||
session_id: sessionId,
|
||
uuid: `q-deq-${batch.batchId}`,
|
||
}),
|
||
onCleared: (reason, clearedCount) =>
|
||
emitQueueEvent({
|
||
type: "queue_cleared",
|
||
reason,
|
||
cleared_count: clearedCount,
|
||
session_id: sessionId,
|
||
uuid: `q-clr-${randomUUID()}`,
|
||
}),
|
||
},
|
||
});
|
||
|
||
/**
|
||
* Parses a raw JSON line and returns the queue item payload if it is a
|
||
* user message or task_notification. Returns null for control lines
|
||
* (control_request, control_response, etc.) and malformed JSON.
|
||
*/
|
||
function parseUserLine(raw: string): {
|
||
kind: "message" | "task_notification";
|
||
content: string;
|
||
} | null {
|
||
if (!raw.trim()) return null;
|
||
try {
|
||
const parsed: {
|
||
type?: string;
|
||
message?: { content?: string };
|
||
_queuedKind?: string;
|
||
} = JSON.parse(raw);
|
||
if (parsed.type !== "user" || parsed.message?.content === undefined)
|
||
return null;
|
||
const kind =
|
||
parsed._queuedKind === "task_notification"
|
||
? "task_notification"
|
||
: "message";
|
||
return { kind, content: parsed.message.content };
|
||
} catch {
|
||
return null;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* Emit queue_blocked on the FIRST user/task line arrival during an active
|
||
* turn. Does NOT enqueue to msgQueueRuntime — that happens later, at the
|
||
* coalescing loop where consumption is certain (avoids orphaned items from
|
||
* the external-tool wait loop which drops non-matching lines silently).
|
||
*/
|
||
let blockedEmittedThisTurn = false;
|
||
function maybeNotifyBlocked(raw: string): void {
|
||
if (!turnInProgress || blockedEmittedThisTurn) return;
|
||
if (!parseUserLine(raw)) return;
|
||
blockedEmittedThisTurn = true;
|
||
// queue_len: count user/task items currently in lineQueue (best-effort)
|
||
const queueLen = lineQueue.filter((l) => parseUserLine(l) !== null).length;
|
||
emitQueueEvent({
|
||
type: "queue_blocked",
|
||
reason: "runtime_busy",
|
||
queue_len: Math.max(1, queueLen),
|
||
session_id: sessionId,
|
||
uuid: `q-blk-${randomUUID()}`,
|
||
});
|
||
}
|
||
|
||
/** Enqueue a BidirectionalQueuedInput into msgQueueRuntime for lifecycle tracking. */
|
||
function enqueueForTracking(input: BidirectionalQueuedInput): void {
|
||
if (input.kind === "task_notification") {
|
||
msgQueueRuntime.enqueue({
|
||
kind: "task_notification",
|
||
source: "task_notification",
|
||
text: input.text,
|
||
} as Parameters<typeof msgQueueRuntime.enqueue>[0]);
|
||
} else {
|
||
msgQueueRuntime.enqueue({
|
||
kind: "message",
|
||
source: "user",
|
||
content: input.content,
|
||
} as Parameters<typeof msgQueueRuntime.enqueue>[0]);
|
||
}
|
||
}
|
||
|
||
const serializeQueuedMessageAsUserLine = (queuedMessage: QueuedMessage) =>
|
||
JSON.stringify({
|
||
type: "user",
|
||
message: {
|
||
role: "user",
|
||
content: queuedMessage.text,
|
||
},
|
||
_queuedKind: queuedMessage.kind,
|
||
});
|
||
|
||
// Connect Task/subagent background notifications to the same queueing path
|
||
// used by user input so bidirectional mode inherits TUI-style queue behavior.
|
||
setMessageQueueAdder((queuedMessage) => {
|
||
const syntheticUserLine = serializeQueuedMessageAsUserLine(queuedMessage);
|
||
maybeNotifyBlocked(syntheticUserLine);
|
||
if (lineResolver) {
|
||
const resolve = lineResolver;
|
||
lineResolver = null;
|
||
resolve(syntheticUserLine);
|
||
return;
|
||
}
|
||
lineQueue.push(syntheticUserLine);
|
||
});
|
||
|
||
// Feed lines into queue or resolver
|
||
rl.on("line", (line) => {
|
||
maybeNotifyBlocked(line);
|
||
if (lineResolver) {
|
||
const resolve = lineResolver;
|
||
lineResolver = null;
|
||
resolve(line);
|
||
} else {
|
||
lineQueue.push(line);
|
||
}
|
||
});
|
||
|
||
rl.on("close", () => {
|
||
setMessageQueueAdder(null);
|
||
msgQueueRuntime.clear("shutdown");
|
||
if (lineResolver) {
|
||
const resolve = lineResolver;
|
||
lineResolver = null;
|
||
resolve(null);
|
||
}
|
||
});
|
||
|
||
// Helper to get next line (from queue or wait)
|
||
async function getNextLine(): Promise<string | null> {
|
||
if (lineQueue.length > 0) {
|
||
return lineQueue.shift() ?? null;
|
||
}
|
||
return new Promise<string | null>((resolve) => {
|
||
lineResolver = resolve;
|
||
});
|
||
}
|
||
|
||
// Helper to send permission request and wait for response
|
||
// Uses Claude SDK's control_request/control_response format for compatibility
|
||
async function requestPermission(
|
||
toolCallId: string,
|
||
toolName: string,
|
||
toolInput: Record<string, unknown>,
|
||
): Promise<{
|
||
decision: "allow" | "deny";
|
||
reason?: string;
|
||
updatedInput?: Record<string, unknown> | null;
|
||
}> {
|
||
const requestId = `perm-${toolCallId}`;
|
||
|
||
// Compute diff previews for file-modifying tools
|
||
const diffs = await computeDiffPreviews(toolName, toolInput);
|
||
|
||
// Build can_use_tool control request (Claude SDK format)
|
||
const canUseToolRequest: CanUseToolControlRequest = {
|
||
subtype: "can_use_tool",
|
||
tool_name: toolName,
|
||
input: toolInput,
|
||
tool_call_id: toolCallId, // Letta-specific
|
||
permission_suggestions: [], // TODO: not implemented
|
||
blocked_path: null, // TODO: not implemented
|
||
...(diffs.length > 0 ? { diffs } : {}),
|
||
};
|
||
|
||
const controlRequest: ControlRequest = {
|
||
type: "control_request",
|
||
request_id: requestId,
|
||
request: canUseToolRequest,
|
||
};
|
||
|
||
console.log(JSON.stringify(controlRequest));
|
||
|
||
const deferredLines: string[] = [];
|
||
|
||
// Wait for control_response
|
||
let result: {
|
||
decision: "allow" | "deny";
|
||
reason?: string;
|
||
updatedInput?: Record<string, unknown> | null;
|
||
} | null = null;
|
||
|
||
while (result === null) {
|
||
const line = await getNextLine();
|
||
if (line === null) {
|
||
result = { decision: "deny", reason: "stdin closed" };
|
||
break;
|
||
}
|
||
if (!line.trim()) continue;
|
||
|
||
try {
|
||
const msg = JSON.parse(line);
|
||
if (
|
||
msg.type === "control_response" &&
|
||
msg.response?.request_id === requestId
|
||
) {
|
||
// Parse the can_use_tool response
|
||
const response = msg.response?.response as
|
||
| CanUseToolResponse
|
||
| undefined;
|
||
if (!response) {
|
||
result = { decision: "deny", reason: "Invalid response format" };
|
||
break;
|
||
}
|
||
|
||
if (response.behavior === "allow") {
|
||
result = {
|
||
decision: "allow",
|
||
updatedInput: response.updatedInput,
|
||
};
|
||
} else {
|
||
result = {
|
||
decision: "deny",
|
||
reason: response.message,
|
||
// TODO: handle interrupt flag
|
||
};
|
||
}
|
||
break;
|
||
}
|
||
|
||
// Defer other messages for the main loop without re-reading them.
|
||
deferredLines.push(line);
|
||
} catch {
|
||
// Defer parse errors so the main loop can surface them.
|
||
deferredLines.push(line);
|
||
}
|
||
}
|
||
|
||
if (deferredLines.length > 0) {
|
||
lineQueue.unshift(...deferredLines);
|
||
}
|
||
|
||
return result;
|
||
}
|
||
|
||
async function recoverPendingApprovalsFromControlRequest(
|
||
request: RecoverPendingApprovalsControlRequest,
|
||
): Promise<{
|
||
recovered: boolean;
|
||
pending_approval: boolean;
|
||
approvals_processed: number;
|
||
}> {
|
||
const targetAgentId = request.agent_id ?? agent.id;
|
||
const targetConversationId = request.conversation_id ?? conversationId;
|
||
|
||
if (targetAgentId !== agent.id) {
|
||
throw new Error(
|
||
`recover_pending_approvals agent mismatch: ${targetAgentId} != ${agent.id}`,
|
||
);
|
||
}
|
||
|
||
const { getResumeData } = await import("./agent/check-approval");
|
||
const { executeApprovalBatch } = await import("./agent/approval-execution");
|
||
|
||
let approvalsProcessed = 0;
|
||
const MAX_RECOVERY_PASSES = 8;
|
||
|
||
for (let pass = 0; pass < MAX_RECOVERY_PASSES; pass += 1) {
|
||
const freshAgent = await client.agents.retrieve(agent.id);
|
||
|
||
let resume: Awaited<ReturnType<typeof getResumeData>>;
|
||
try {
|
||
resume = await getResumeData(client, freshAgent, targetConversationId, {
|
||
includeMessageHistory: false,
|
||
});
|
||
} catch (error) {
|
||
if (
|
||
error instanceof APIError &&
|
||
(error.status === 404 || error.status === 422)
|
||
) {
|
||
return {
|
||
recovered: true,
|
||
pending_approval: false,
|
||
approvals_processed: approvalsProcessed,
|
||
};
|
||
}
|
||
throw error;
|
||
}
|
||
|
||
const pendingApprovals = resume.pendingApprovals || [];
|
||
if (pendingApprovals.length === 0) {
|
||
return {
|
||
recovered: true,
|
||
pending_approval: false,
|
||
approvals_processed: approvalsProcessed,
|
||
};
|
||
}
|
||
|
||
const { autoAllowed, autoDenied, needsUserInput } =
|
||
await classifyApprovals(pendingApprovals, {
|
||
alwaysRequiresUserInput: isInteractiveApprovalTool,
|
||
requireArgsForAutoApprove: true,
|
||
missingNameReason: "Tool call incomplete - missing name",
|
||
});
|
||
|
||
const decisions: ApprovalDecision[] = [
|
||
...autoAllowed.map((ac) => ({
|
||
type: "approve" as const,
|
||
approval: ac.approval,
|
||
})),
|
||
...autoDenied.map((ac) => ({
|
||
type: "deny" as const,
|
||
approval: ac.approval,
|
||
reason: ac.denyReason || ac.permission.reason || "Permission denied",
|
||
})),
|
||
];
|
||
|
||
for (const ac of needsUserInput) {
|
||
const permResponse = await requestPermission(
|
||
ac.approval.toolCallId,
|
||
ac.approval.toolName,
|
||
ac.parsedArgs,
|
||
);
|
||
|
||
if (permResponse.decision === "allow") {
|
||
const finalApproval = permResponse.updatedInput
|
||
? {
|
||
...ac.approval,
|
||
toolArgs: JSON.stringify(permResponse.updatedInput),
|
||
}
|
||
: ac.approval;
|
||
decisions.push({ type: "approve", approval: finalApproval });
|
||
} else {
|
||
decisions.push({
|
||
type: "deny",
|
||
approval: ac.approval,
|
||
reason: permResponse.reason || "Denied by SDK callback",
|
||
});
|
||
}
|
||
}
|
||
|
||
if (decisions.length === 0) {
|
||
return {
|
||
recovered: false,
|
||
pending_approval: true,
|
||
approvals_processed: approvalsProcessed,
|
||
};
|
||
}
|
||
|
||
const executedResults = await executeApprovalBatch(decisions);
|
||
approvalsProcessed += executedResults.length;
|
||
|
||
const approvalInput: ApprovalCreate = {
|
||
type: "approval",
|
||
approvals: executedResults as ApprovalResult[],
|
||
};
|
||
const approvalStream = await sendMessageStream(
|
||
targetConversationId,
|
||
[approvalInput],
|
||
{ agentId: agent.id },
|
||
);
|
||
|
||
const drainResult = await drainStreamWithResume(
|
||
approvalStream,
|
||
createBuffers(agent.id),
|
||
() => {},
|
||
undefined,
|
||
undefined,
|
||
undefined,
|
||
reminderContextTracker,
|
||
);
|
||
|
||
if (drainResult.stopReason === "error") {
|
||
throw new Error(
|
||
drainResult.fallbackError ||
|
||
"recover_pending_approvals failed while applying approvals",
|
||
);
|
||
}
|
||
}
|
||
|
||
return {
|
||
recovered: false,
|
||
pending_approval: true,
|
||
approvals_processed: approvalsProcessed,
|
||
};
|
||
}
|
||
|
||
// Main processing loop
|
||
while (true) {
|
||
const line = await getNextLine();
|
||
if (line === null) break; // stdin closed
|
||
if (!line.trim()) continue;
|
||
|
||
let message: {
|
||
type: string;
|
||
message?: { role: string; content: MessageCreate["content"] };
|
||
request_id?: string;
|
||
request?: { subtype: string };
|
||
session_id?: string;
|
||
};
|
||
|
||
try {
|
||
message = JSON.parse(line);
|
||
} catch {
|
||
const errorMsg: ErrorMessage = {
|
||
type: "error",
|
||
message: "Invalid JSON input",
|
||
stop_reason: "error",
|
||
session_id: sessionId,
|
||
uuid: randomUUID(),
|
||
};
|
||
console.log(JSON.stringify(errorMsg));
|
||
continue;
|
||
}
|
||
|
||
// Handle control requests
|
||
if (message.type === "control_request") {
|
||
const subtype = message.request?.subtype;
|
||
const requestId = message.request_id;
|
||
|
||
if (subtype === "initialize") {
|
||
// Return session info
|
||
const initResponse: ControlResponse = {
|
||
type: "control_response",
|
||
response: {
|
||
subtype: "success",
|
||
request_id: requestId ?? "",
|
||
response: {
|
||
agent_id: agent.id,
|
||
model: agent.llm_config?.model,
|
||
tools: availableTools,
|
||
memfs_enabled: settingsManager.isMemfsEnabled(agent.id),
|
||
skill_sources: skillSources,
|
||
system_info_reminder_enabled: systemInfoReminderEnabled,
|
||
reflection_trigger: reflectionSettings.trigger,
|
||
reflection_step_count: reflectionSettings.stepCount,
|
||
},
|
||
},
|
||
session_id: sessionId,
|
||
uuid: randomUUID(),
|
||
};
|
||
console.log(JSON.stringify(initResponse));
|
||
} else if (subtype === "interrupt") {
|
||
// Abort current operation if any
|
||
if (currentAbortController !== null) {
|
||
(currentAbortController as AbortController).abort();
|
||
currentAbortController = null;
|
||
}
|
||
const interruptResponse: ControlResponse = {
|
||
type: "control_response",
|
||
response: {
|
||
subtype: "success",
|
||
request_id: requestId ?? "",
|
||
},
|
||
session_id: sessionId,
|
||
uuid: randomUUID(),
|
||
};
|
||
console.log(JSON.stringify(interruptResponse));
|
||
} else if (subtype === "register_external_tools") {
|
||
// Register external tools from SDK
|
||
const toolsRequest = message.request as {
|
||
tools?: ExternalToolDefinition[];
|
||
};
|
||
const tools = toolsRequest.tools ?? [];
|
||
|
||
registerExternalTools(tools);
|
||
|
||
// Set up the external tool executor to send requests back to SDK
|
||
setExternalToolExecutor(async (toolCallId, toolName, input) => {
|
||
// Send execute_external_tool request to SDK
|
||
const execRequest: ControlRequest = {
|
||
type: "control_request",
|
||
request_id: `ext-${toolCallId}`,
|
||
request: {
|
||
subtype: "execute_external_tool",
|
||
tool_call_id: toolCallId,
|
||
tool_name: toolName,
|
||
input,
|
||
} as unknown as CanUseToolControlRequest, // Type cast for compatibility
|
||
};
|
||
console.log(JSON.stringify(execRequest));
|
||
|
||
// Wait for external_tool_result response
|
||
while (true) {
|
||
const line = await getNextLine();
|
||
if (line === null) {
|
||
return {
|
||
content: [{ type: "text", text: "stdin closed" }],
|
||
isError: true,
|
||
};
|
||
}
|
||
if (!line.trim()) continue;
|
||
|
||
try {
|
||
const msg = JSON.parse(line);
|
||
if (
|
||
msg.type === "control_response" &&
|
||
msg.response?.subtype === "external_tool_result" &&
|
||
msg.response?.tool_call_id === toolCallId
|
||
) {
|
||
return {
|
||
content: msg.response.content ?? [{ type: "text", text: "" }],
|
||
isError: msg.response.is_error ?? false,
|
||
};
|
||
}
|
||
} catch {
|
||
// Ignore parse errors, keep waiting
|
||
}
|
||
}
|
||
});
|
||
|
||
const registerResponse: ControlResponse = {
|
||
type: "control_response",
|
||
response: {
|
||
subtype: "success",
|
||
request_id: requestId ?? "",
|
||
response: { registered: tools.length },
|
||
},
|
||
session_id: sessionId,
|
||
uuid: randomUUID(),
|
||
};
|
||
console.log(JSON.stringify(registerResponse));
|
||
} else if (subtype === "bootstrap_session_state") {
|
||
const bootstrapReq = message.request as BootstrapSessionStateRequest;
|
||
const { getResumeData } = await import("./agent/check-approval");
|
||
let hasPendingApproval = false;
|
||
|
||
try {
|
||
// Re-fetch for parity with approval checks elsewhere in headless mode.
|
||
const freshAgent = await client.agents.retrieve(agent.id);
|
||
const resume = await getResumeData(
|
||
client,
|
||
freshAgent,
|
||
conversationId,
|
||
{
|
||
includeMessageHistory: false,
|
||
},
|
||
);
|
||
hasPendingApproval = (resume.pendingApprovals?.length ?? 0) > 0;
|
||
} catch (error) {
|
||
// Keep bootstrap non-fatal if approval probe fails on stale resources.
|
||
if (
|
||
!(error instanceof APIError) ||
|
||
(error.status !== 404 && error.status !== 422)
|
||
) {
|
||
console.warn(
|
||
`[bootstrap] pending-approval probe failed: ${error instanceof Error ? error.message : String(error)}`,
|
||
);
|
||
}
|
||
}
|
||
|
||
const bootstrapResp = await handleBootstrapSessionState({
|
||
bootstrapReq,
|
||
sessionContext: {
|
||
agentId: agent.id,
|
||
conversationId,
|
||
model: agent.llm_config?.model,
|
||
tools: availableTools,
|
||
memfsEnabled: settingsManager.isMemfsEnabled(agent.id),
|
||
sessionId,
|
||
},
|
||
requestId: requestId ?? "",
|
||
client,
|
||
hasPendingApproval,
|
||
});
|
||
console.log(JSON.stringify(bootstrapResp));
|
||
} else if (subtype === "list_messages") {
|
||
const listReq = message.request as ListMessagesControlRequest;
|
||
const listResp = await handleListMessages({
|
||
listReq,
|
||
sessionConversationId: conversationId,
|
||
sessionAgentId: agent.id,
|
||
sessionId,
|
||
requestId: requestId ?? "",
|
||
client,
|
||
});
|
||
console.log(JSON.stringify(listResp));
|
||
} else if (subtype === "recover_pending_approvals") {
|
||
const recoverReq =
|
||
message.request as RecoverPendingApprovalsControlRequest;
|
||
try {
|
||
const recovery =
|
||
await recoverPendingApprovalsFromControlRequest(recoverReq);
|
||
const recoveryResponse: ControlResponse = {
|
||
type: "control_response",
|
||
response: {
|
||
subtype: "success",
|
||
request_id: requestId ?? "",
|
||
response: recovery,
|
||
},
|
||
session_id: sessionId,
|
||
uuid: randomUUID(),
|
||
};
|
||
console.log(JSON.stringify(recoveryResponse));
|
||
} catch (error) {
|
||
const recoveryError: ControlResponse = {
|
||
type: "control_response",
|
||
response: {
|
||
subtype: "error",
|
||
request_id: requestId ?? "",
|
||
error: error instanceof Error ? error.message : String(error),
|
||
},
|
||
session_id: sessionId,
|
||
uuid: randomUUID(),
|
||
};
|
||
console.log(JSON.stringify(recoveryError));
|
||
}
|
||
} else {
|
||
const errorResponse: ControlResponse = {
|
||
type: "control_response",
|
||
response: {
|
||
subtype: "error",
|
||
request_id: requestId ?? "",
|
||
error: `Unknown control request subtype: ${subtype}`,
|
||
},
|
||
session_id: sessionId,
|
||
uuid: randomUUID(),
|
||
};
|
||
console.log(JSON.stringify(errorResponse));
|
||
}
|
||
continue;
|
||
}
|
||
|
||
// Handle user messages
|
||
if (message.type === "user" && message.message?.content !== undefined) {
|
||
const queuedInputs: BidirectionalQueuedInput[] = [
|
||
{
|
||
kind: "user",
|
||
content: message.message.content,
|
||
},
|
||
];
|
||
|
||
// Batch any already-buffered user lines into the same turn, mirroring
|
||
// TUI queue dequeue behavior (single coalesced submit when idle).
|
||
while (lineQueue.length > 0) {
|
||
const candidate = lineQueue[0];
|
||
if (!candidate?.trim()) {
|
||
lineQueue.shift();
|
||
continue;
|
||
}
|
||
|
||
let parsedCandidate: {
|
||
type?: string;
|
||
message?: { content?: MessageCreate["content"] };
|
||
_queuedKind?: QueuedMessage["kind"];
|
||
};
|
||
try {
|
||
parsedCandidate = JSON.parse(candidate);
|
||
} catch {
|
||
// Leave malformed lines for the main loop to surface as parse errors.
|
||
break;
|
||
}
|
||
|
||
if (
|
||
parsedCandidate.type === "user" &&
|
||
parsedCandidate.message?.content !== undefined
|
||
) {
|
||
lineQueue.shift();
|
||
if (parsedCandidate._queuedKind === "task_notification") {
|
||
const notificationText =
|
||
typeof parsedCandidate.message.content === "string"
|
||
? parsedCandidate.message.content
|
||
: parsedCandidate.message.content
|
||
.reduce((texts: string[], part) => {
|
||
if (
|
||
part.type === "text" &&
|
||
"text" in part &&
|
||
typeof part.text === "string"
|
||
) {
|
||
texts.push(part.text);
|
||
}
|
||
return texts;
|
||
}, [])
|
||
.join("");
|
||
queuedInputs.push({
|
||
kind: "task_notification",
|
||
text: notificationText,
|
||
});
|
||
} else {
|
||
queuedInputs.push({
|
||
kind: "user",
|
||
content: parsedCandidate.message.content,
|
||
});
|
||
}
|
||
continue;
|
||
}
|
||
|
||
// Stop coalescing when the queue head is not a user-input line.
|
||
// The outer loop must process control/error/system lines in-order.
|
||
break;
|
||
}
|
||
|
||
// Enqueue consumed items into msgQueueRuntime for lifecycle tracking.
|
||
// Done here (not at arrival) to avoid orphaned items from the external-
|
||
// tool wait loop, which consumes non-matching lines via getNextLine()
|
||
// without deferring them back to lineQueue.
|
||
for (const input of queuedInputs) {
|
||
enqueueForTracking(input);
|
||
}
|
||
// Signal dequeue for exactly the items we just enqueued. consumeItems(n)
|
||
// bypasses QueueRuntime's internal coalescing policy so the count matches
|
||
// what the coalescing loop actually yielded.
|
||
msgQueueRuntime.consumeItems(queuedInputs.length);
|
||
|
||
const userContent = mergeBidirectionalQueuedInput(queuedInputs);
|
||
if (userContent === null) {
|
||
continue;
|
||
}
|
||
|
||
// Create abort controller for this operation
|
||
currentAbortController = new AbortController();
|
||
|
||
turnInProgress = true;
|
||
try {
|
||
const buffers = createBuffers(agent.id);
|
||
const startTime = performance.now();
|
||
let numTurns = 0;
|
||
let lastStopReason: StopReasonType | null = null; // Track for result subtype
|
||
let sawStreamError = false; // Track if we emitted an error during streaming
|
||
let preStreamTransientRetries = 0;
|
||
|
||
syncReminderStateFromContextTracker(
|
||
sharedReminderState,
|
||
reminderContextTracker,
|
||
);
|
||
const lastRunAt = (agent as { last_run_completion?: string })
|
||
.last_run_completion;
|
||
const { parts: sharedReminderParts } = await buildSharedReminderParts({
|
||
mode: isSubagent ? "subagent" : "headless-bidirectional",
|
||
agent: {
|
||
id: agent.id,
|
||
name: agent.name,
|
||
description: agent.description,
|
||
lastRunAt: lastRunAt ?? null,
|
||
},
|
||
state: sharedReminderState,
|
||
sessionContextReminderEnabled: systemInfoReminderEnabled,
|
||
reflectionSettings,
|
||
skillSources,
|
||
resolvePlanModeReminder: async () => {
|
||
const { PLAN_MODE_REMINDER } = await import("./agent/promptAssets");
|
||
return PLAN_MODE_REMINDER;
|
||
},
|
||
});
|
||
const enrichedContent = prependReminderPartsToContent(
|
||
userContent,
|
||
sharedReminderParts,
|
||
);
|
||
|
||
// Initial input is the user message
|
||
let currentInput: MessageCreate[] = [
|
||
{ role: "user", content: enrichedContent },
|
||
];
|
||
|
||
// Approval handling loop - continue until end_turn or error
|
||
while (true) {
|
||
numTurns++;
|
||
|
||
// Check if aborted
|
||
if (currentAbortController?.signal.aborted) {
|
||
break;
|
||
}
|
||
|
||
// Inject queued skill content as user message parts (LET-7353)
|
||
{
|
||
const { consumeQueuedSkillContent } = await import(
|
||
"./tools/impl/skillContentRegistry"
|
||
);
|
||
const skillContents = consumeQueuedSkillContent();
|
||
if (skillContents.length > 0) {
|
||
currentInput = [
|
||
...currentInput,
|
||
{
|
||
role: "user" as const,
|
||
content: skillContents.map((sc) => ({
|
||
type: "text" as const,
|
||
text: sc.content,
|
||
})),
|
||
},
|
||
];
|
||
}
|
||
}
|
||
|
||
// Send message to agent.
|
||
// Wrap in try-catch to handle pre-stream 409 approval-pending errors.
|
||
let stream: Awaited<ReturnType<typeof sendMessageStream>>;
|
||
let turnToolContextId: string | null = null;
|
||
try {
|
||
stream = await sendMessageStream(conversationId, currentInput, {
|
||
agentId: agent.id,
|
||
});
|
||
turnToolContextId = getStreamToolContextId(stream);
|
||
} catch (preStreamError) {
|
||
// Extract error detail using shared helper (handles nested/direct/message shapes)
|
||
const errorDetail = extractConflictDetail(preStreamError);
|
||
|
||
// Route through shared pre-stream conflict classifier (parity with main loop + TUI)
|
||
// Bidir mode has no conversation-busy retry budget, so pass 0/0 to disable busy-retry.
|
||
const preStreamAction = getPreStreamErrorAction(errorDetail, 0, 0, {
|
||
status:
|
||
preStreamError instanceof APIError
|
||
? preStreamError.status
|
||
: undefined,
|
||
transientRetries: preStreamTransientRetries,
|
||
maxTransientRetries: LLM_API_ERROR_MAX_RETRIES,
|
||
});
|
||
|
||
if (preStreamAction === "resolve_approval_pending") {
|
||
const recoveryMsg: RecoveryMessage = {
|
||
type: "recovery",
|
||
recovery_type: "approval_pending",
|
||
message:
|
||
"Detected pending approval conflict on send; resolving before retry",
|
||
session_id: sessionId,
|
||
uuid: `recovery-bidir-${randomUUID()}`,
|
||
};
|
||
console.log(JSON.stringify(recoveryMsg));
|
||
await resolveAllPendingApprovals();
|
||
continue;
|
||
}
|
||
|
||
if (preStreamAction === "retry_transient") {
|
||
const attempt = preStreamTransientRetries + 1;
|
||
const retryAfterMs =
|
||
preStreamError instanceof APIError
|
||
? parseRetryAfterHeaderMs(
|
||
preStreamError.headers?.get("retry-after"),
|
||
)
|
||
: null;
|
||
const delayMs = getRetryDelayMs({
|
||
category: "transient_provider",
|
||
attempt,
|
||
detail: errorDetail,
|
||
retryAfterMs,
|
||
});
|
||
preStreamTransientRetries = attempt;
|
||
|
||
const retryMsg: RetryMessage = {
|
||
type: "retry",
|
||
reason: "llm_api_error",
|
||
attempt,
|
||
max_attempts: LLM_API_ERROR_MAX_RETRIES,
|
||
delay_ms: delayMs,
|
||
session_id: sessionId,
|
||
uuid: `retry-bidir-${randomUUID()}`,
|
||
};
|
||
console.log(JSON.stringify(retryMsg));
|
||
|
||
await new Promise((resolve) => setTimeout(resolve, delayMs));
|
||
continue;
|
||
}
|
||
|
||
throw preStreamError;
|
||
}
|
||
preStreamTransientRetries = 0;
|
||
const streamJsonHook: DrainStreamHook = ({
|
||
chunk,
|
||
shouldOutput,
|
||
errorInfo,
|
||
}) => {
|
||
// Handle in-stream errors (emit ErrorMessage with full details)
|
||
if (errorInfo && shouldOutput) {
|
||
sawStreamError = true; // Track that we saw an error (affects result subtype)
|
||
const errorEvent: ErrorMessage = {
|
||
type: "error",
|
||
message: errorInfo.message,
|
||
stop_reason: "error",
|
||
run_id: errorInfo.run_id,
|
||
session_id: sessionId,
|
||
uuid: randomUUID(),
|
||
...(errorInfo.error_type &&
|
||
errorInfo.run_id && {
|
||
api_error: {
|
||
message_type: "error_message",
|
||
message: errorInfo.message,
|
||
error_type: errorInfo.error_type,
|
||
detail: errorInfo.detail,
|
||
run_id: errorInfo.run_id,
|
||
},
|
||
}),
|
||
};
|
||
console.log(JSON.stringify(errorEvent));
|
||
return { shouldAccumulate: true };
|
||
}
|
||
|
||
if (!shouldOutput) {
|
||
return { shouldAccumulate: true };
|
||
}
|
||
|
||
const chunkWithIds = chunk as typeof chunk & {
|
||
otid?: string;
|
||
id?: string;
|
||
};
|
||
const uuid = chunkWithIds.otid || chunkWithIds.id;
|
||
|
||
if (includePartialMessages) {
|
||
const streamEvent: StreamEvent = {
|
||
type: "stream_event",
|
||
event: chunk,
|
||
session_id: sessionId,
|
||
uuid: uuid || randomUUID(),
|
||
};
|
||
console.log(JSON.stringify(streamEvent));
|
||
} else {
|
||
const msg: MessageWire = {
|
||
type: "message",
|
||
...chunk,
|
||
session_id: sessionId,
|
||
uuid: uuid || randomUUID(),
|
||
};
|
||
console.log(JSON.stringify(msg));
|
||
}
|
||
|
||
return { shouldAccumulate: true };
|
||
};
|
||
|
||
const result = await drainStreamWithResume(
|
||
stream,
|
||
buffers,
|
||
() => {},
|
||
currentAbortController?.signal,
|
||
undefined,
|
||
streamJsonHook,
|
||
reminderContextTracker,
|
||
);
|
||
const stopReason = result.stopReason;
|
||
lastStopReason = stopReason; // Track for result subtype
|
||
const approvals = result.approvals || [];
|
||
|
||
// Case 1: Turn ended normally - break out of loop
|
||
if (stopReason === "end_turn") {
|
||
break;
|
||
}
|
||
|
||
// Case 2: Aborted - break out of loop
|
||
if (
|
||
currentAbortController?.signal.aborted ||
|
||
stopReason === "cancelled"
|
||
) {
|
||
break;
|
||
}
|
||
|
||
// Case 3: Requires approval - process approvals and continue
|
||
if (stopReason === "requires_approval") {
|
||
if (approvals.length === 0) {
|
||
// Anomalous state: requires_approval but no approvals
|
||
// Treat as error rather than false-positive success
|
||
lastStopReason = "error";
|
||
break;
|
||
}
|
||
|
||
// Check permissions and collect decisions
|
||
type Decision =
|
||
| {
|
||
type: "approve";
|
||
approval: {
|
||
toolCallId: string;
|
||
toolName: string;
|
||
toolArgs: string;
|
||
};
|
||
matchedRule: string;
|
||
}
|
||
| {
|
||
type: "deny";
|
||
approval: {
|
||
toolCallId: string;
|
||
toolName: string;
|
||
toolArgs: string;
|
||
};
|
||
reason: string;
|
||
};
|
||
|
||
const { autoAllowed, autoDenied, needsUserInput } =
|
||
await classifyApprovals(approvals, {
|
||
alwaysRequiresUserInput: isInteractiveApprovalTool,
|
||
requireArgsForAutoApprove: true,
|
||
missingNameReason: "Tool call incomplete - missing name",
|
||
});
|
||
|
||
const decisions: Decision[] = [
|
||
...autoAllowed.map((ac) => ({
|
||
type: "approve" as const,
|
||
approval: ac.approval,
|
||
matchedRule:
|
||
"matchedRule" in ac.permission && ac.permission.matchedRule
|
||
? ac.permission.matchedRule
|
||
: "auto-approved",
|
||
})),
|
||
...autoDenied.map((ac) => {
|
||
const fallback =
|
||
"matchedRule" in ac.permission && ac.permission.matchedRule
|
||
? `Permission denied: ${ac.permission.matchedRule}`
|
||
: ac.permission.reason
|
||
? `Permission denied: ${ac.permission.reason}`
|
||
: "Permission denied: Unknown reason";
|
||
return {
|
||
type: "deny" as const,
|
||
approval: ac.approval,
|
||
reason: ac.denyReason ?? fallback,
|
||
};
|
||
}),
|
||
];
|
||
|
||
for (const approvalItem of autoAllowed) {
|
||
const permission = approvalItem.permission;
|
||
const autoApprovalMsg: AutoApprovalMessage = {
|
||
type: "auto_approval",
|
||
tool_call: {
|
||
name: approvalItem.approval.toolName,
|
||
tool_call_id: approvalItem.approval.toolCallId,
|
||
arguments: approvalItem.approval.toolArgs,
|
||
},
|
||
reason: permission.reason || "auto-approved",
|
||
matched_rule:
|
||
"matchedRule" in permission && permission.matchedRule
|
||
? permission.matchedRule
|
||
: "auto-approved",
|
||
session_id: sessionId,
|
||
uuid: `auto-approval-${approvalItem.approval.toolCallId}`,
|
||
};
|
||
console.log(JSON.stringify(autoApprovalMsg));
|
||
}
|
||
|
||
for (const ac of needsUserInput) {
|
||
// permission.decision === "ask" - request permission from SDK
|
||
const permResponse = await requestPermission(
|
||
ac.approval.toolCallId,
|
||
ac.approval.toolName,
|
||
ac.parsedArgs,
|
||
);
|
||
|
||
if (permResponse.decision === "allow") {
|
||
// If provided updatedInput (e.g., for AskUserQuestion with answers),
|
||
// update the approval's toolArgs to use it
|
||
const finalApproval = permResponse.updatedInput
|
||
? {
|
||
...ac.approval,
|
||
toolArgs: JSON.stringify(permResponse.updatedInput),
|
||
}
|
||
: ac.approval;
|
||
|
||
decisions.push({
|
||
type: "approve",
|
||
approval: finalApproval,
|
||
matchedRule: "SDK callback approved",
|
||
});
|
||
|
||
// Emit auto_approval event for SDK-approved tool
|
||
const autoApprovalMsg: AutoApprovalMessage = {
|
||
type: "auto_approval",
|
||
tool_call: {
|
||
name: finalApproval.toolName,
|
||
tool_call_id: finalApproval.toolCallId,
|
||
arguments: finalApproval.toolArgs,
|
||
},
|
||
reason: permResponse.reason || "SDK callback approved",
|
||
matched_rule: "canUseTool callback",
|
||
session_id: sessionId,
|
||
uuid: `auto-approval-${ac.approval.toolCallId}`,
|
||
};
|
||
console.log(JSON.stringify(autoApprovalMsg));
|
||
} else {
|
||
decisions.push({
|
||
type: "deny",
|
||
approval: ac.approval,
|
||
reason: permResponse.reason || "Denied by SDK callback",
|
||
});
|
||
}
|
||
}
|
||
|
||
// Execute approved tools
|
||
const { executeApprovalBatch } = await import(
|
||
"./agent/approval-execution"
|
||
);
|
||
const executedResults = await executeApprovalBatch(
|
||
decisions,
|
||
undefined,
|
||
{ toolContextId: turnToolContextId ?? undefined },
|
||
);
|
||
|
||
// Send approval results back to continue
|
||
currentInput = [
|
||
{
|
||
type: "approval",
|
||
approvals: executedResults,
|
||
} as unknown as MessageCreate,
|
||
];
|
||
|
||
// Continue the loop to process the next stream
|
||
continue;
|
||
}
|
||
|
||
// Other stop reasons - break
|
||
break;
|
||
}
|
||
|
||
// Emit result
|
||
const durationMs = performance.now() - startTime;
|
||
const lines = toLines(buffers);
|
||
const reversed = [...lines].reverse();
|
||
const lastAssistant = reversed.find(
|
||
(line) =>
|
||
line.kind === "assistant" &&
|
||
"text" in line &&
|
||
typeof line.text === "string" &&
|
||
line.text.trim().length > 0,
|
||
) as Extract<Line, { kind: "assistant" }> | undefined;
|
||
const lastReasoning = reversed.find(
|
||
(line) =>
|
||
line.kind === "reasoning" &&
|
||
"text" in line &&
|
||
typeof line.text === "string" &&
|
||
line.text.trim().length > 0,
|
||
) as Extract<Line, { kind: "reasoning" }> | undefined;
|
||
const lastToolResult = reversed.find(
|
||
(line) =>
|
||
line.kind === "tool_call" &&
|
||
"resultText" in line &&
|
||
typeof (line as Extract<Line, { kind: "tool_call" }>).resultText ===
|
||
"string" &&
|
||
(
|
||
(line as Extract<Line, { kind: "tool_call" }>).resultText ?? ""
|
||
).trim().length > 0,
|
||
) as Extract<Line, { kind: "tool_call" }> | undefined;
|
||
const resultText =
|
||
lastAssistant?.text ||
|
||
lastReasoning?.text ||
|
||
lastToolResult?.resultText ||
|
||
"";
|
||
|
||
// Determine result subtype based on how the turn ended
|
||
const isAborted = currentAbortController?.signal.aborted;
|
||
// isError if: (1) stop reason indicates error, OR (2) we emitted an error during streaming
|
||
const isError =
|
||
sawStreamError ||
|
||
(lastStopReason &&
|
||
lastStopReason !== "end_turn" &&
|
||
lastStopReason !== "requires_approval");
|
||
const subtype: ResultMessage["subtype"] = isAborted
|
||
? "interrupted"
|
||
: isError
|
||
? "error"
|
||
: "success";
|
||
|
||
const resultMsg: ResultMessage = {
|
||
type: "result",
|
||
subtype,
|
||
session_id: sessionId,
|
||
duration_ms: Math.round(durationMs),
|
||
duration_api_ms: 0, // Not tracked in bidirectional mode
|
||
num_turns: numTurns,
|
||
result: resultText,
|
||
agent_id: agent.id,
|
||
conversation_id: conversationId,
|
||
run_ids: [],
|
||
usage: null,
|
||
uuid: `result-${agent.id}-${Date.now()}`,
|
||
// Include stop_reason only when subtype is "error" (not "interrupted")
|
||
...(subtype === "error" && {
|
||
stop_reason:
|
||
lastStopReason && lastStopReason !== "end_turn"
|
||
? lastStopReason
|
||
: "error", // Use "error" if sawStreamError but lastStopReason was end_turn
|
||
}),
|
||
};
|
||
console.log(JSON.stringify(resultMsg));
|
||
} catch (error) {
|
||
// Use formatErrorDetails for comprehensive error formatting (same as one-shot mode)
|
||
const errorDetails = formatErrorDetails(error, agent.id);
|
||
const errorMsg: ErrorMessage = {
|
||
type: "error",
|
||
message: errorDetails,
|
||
stop_reason: "error",
|
||
session_id: sessionId,
|
||
uuid: randomUUID(),
|
||
};
|
||
console.log(JSON.stringify(errorMsg));
|
||
|
||
// Also emit a result message with subtype: "error" so SDK knows the turn failed
|
||
const errorResultMsg: ResultMessage = {
|
||
type: "result",
|
||
subtype: "error",
|
||
session_id: sessionId,
|
||
duration_ms: 0,
|
||
duration_api_ms: 0,
|
||
num_turns: 0,
|
||
result: null,
|
||
agent_id: agent.id,
|
||
conversation_id: conversationId,
|
||
run_ids: [],
|
||
usage: null,
|
||
uuid: `result-error-${agent.id}-${Date.now()}`,
|
||
stop_reason: "error",
|
||
};
|
||
console.log(JSON.stringify(errorResultMsg));
|
||
} finally {
|
||
turnInProgress = false;
|
||
blockedEmittedThisTurn = false;
|
||
currentAbortController = null;
|
||
}
|
||
continue;
|
||
}
|
||
|
||
// Unknown message type
|
||
const errorMsg: ErrorMessage = {
|
||
type: "error",
|
||
message: `Unknown message type: ${message.type}`,
|
||
stop_reason: "error",
|
||
session_id: sessionId,
|
||
uuid: randomUUID(),
|
||
};
|
||
console.log(JSON.stringify(errorMsg));
|
||
}
|
||
|
||
// Stdin closed, exit gracefully
|
||
setMessageQueueAdder(null);
|
||
process.exit(0);
|
||
}
|