refactor: Unify reminder management across interactive and headless modes (#1001)
Co-authored-by: Letta <noreply@letta.com> Co-authored-by: cpacker <packercharles@gmail.com>
This commit is contained in:
273
src/headless.ts
273
src/headless.ts
@@ -33,6 +33,7 @@ import {
|
||||
toLines,
|
||||
} from "./cli/helpers/accumulator";
|
||||
import { classifyApprovals } from "./cli/helpers/approvalClassification";
|
||||
import { createContextTracker } from "./cli/helpers/contextTracker";
|
||||
import { formatErrorDetails } from "./cli/helpers/errorFormatter";
|
||||
import {
|
||||
getReflectionSettings,
|
||||
@@ -54,6 +55,14 @@ import {
|
||||
mergeQueuedTurnInput,
|
||||
type QueuedTurnInput,
|
||||
} from "./queue/turnQueueRuntime";
|
||||
import {
|
||||
buildSharedReminderParts,
|
||||
prependReminderPartsToContent,
|
||||
} from "./reminders/engine";
|
||||
import {
|
||||
createSharedReminderState,
|
||||
syncReminderStateFromContextTracker,
|
||||
} from "./reminders/state";
|
||||
import { settingsManager } from "./settings-manager";
|
||||
import {
|
||||
isHeadlessAutoAllowTool,
|
||||
@@ -94,40 +103,6 @@ const LLM_API_ERROR_MAX_RETRIES = 3;
|
||||
const CONVERSATION_BUSY_MAX_RETRIES = 1; // Only retry once, fail on 2nd 409
|
||||
const CONVERSATION_BUSY_RETRY_DELAY_MS = 2500; // 2.5 seconds
|
||||
|
||||
export function prependSkillsReminderToContent(
|
||||
content: MessageCreate["content"],
|
||||
skillsReminder: string,
|
||||
): MessageCreate["content"] {
|
||||
if (!skillsReminder) {
|
||||
return content;
|
||||
}
|
||||
|
||||
if (typeof content === "string") {
|
||||
return `${skillsReminder}\n\n${content}`;
|
||||
}
|
||||
|
||||
if (Array.isArray(content)) {
|
||||
return [
|
||||
{
|
||||
type: "text",
|
||||
text: `${skillsReminder}\n\n`,
|
||||
},
|
||||
...content,
|
||||
] as MessageCreate["content"];
|
||||
}
|
||||
|
||||
return content;
|
||||
}
|
||||
|
||||
export function shouldReinjectSkillsAfterCompaction(lines: Line[]): boolean {
|
||||
return lines.some(
|
||||
(line) =>
|
||||
line.kind === "event" &&
|
||||
line.eventType === "compaction" &&
|
||||
line.phase === "finished" &&
|
||||
(line.summary !== undefined || line.stats !== undefined),
|
||||
);
|
||||
}
|
||||
export type BidirectionalQueuedInput = QueuedTurnInput<
|
||||
MessageCreate["content"]
|
||||
>;
|
||||
@@ -1177,6 +1152,9 @@ export async function handleHeadlessCommand(
|
||||
console.log(JSON.stringify(initEvent));
|
||||
}
|
||||
|
||||
const reminderContextTracker = createContextTracker();
|
||||
const sharedReminderState = createSharedReminderState();
|
||||
|
||||
// Helper to resolve any pending approvals before sending user input
|
||||
const resolveAllPendingApprovals = async () => {
|
||||
const { getResumeData } = await import("./agent/check-approval");
|
||||
@@ -1321,16 +1299,23 @@ export async function handleHeadlessCommand(
|
||||
approvalMessages,
|
||||
{ agentId: agent.id },
|
||||
);
|
||||
if (outputFormat === "stream-json") {
|
||||
// Consume quickly but don't emit message frames to stdout
|
||||
for await (const _ of approvalStream) {
|
||||
// no-op
|
||||
}
|
||||
} else {
|
||||
await drainStreamWithResume(
|
||||
approvalStream,
|
||||
createBuffers(agent.id),
|
||||
() => {},
|
||||
const drainResult = await drainStreamWithResume(
|
||||
approvalStream,
|
||||
createBuffers(agent.id),
|
||||
() => {},
|
||||
undefined,
|
||||
undefined,
|
||||
undefined,
|
||||
reminderContextTracker,
|
||||
);
|
||||
// If the approval drain errored or was cancelled, abort rather than
|
||||
// looping back and re-fetching approvals (which would restart the cycle).
|
||||
if (
|
||||
drainResult.stopReason === "error" ||
|
||||
drainResult.stopReason === "cancelled"
|
||||
) {
|
||||
throw new Error(
|
||||
`Approval drain ended with stop reason: ${drainResult.stopReason}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -1343,7 +1328,6 @@ export async function handleHeadlessCommand(
|
||||
}
|
||||
|
||||
// Build message content with reminders
|
||||
const { permissionMode } = await import("./permissions/mode");
|
||||
const contentParts: MessageCreate["content"] = [];
|
||||
const pushPart = (text: string) => {
|
||||
if (!text) return;
|
||||
@@ -1363,59 +1347,56 @@ ${SYSTEM_REMINDER_CLOSE}
|
||||
pushPart(systemReminder);
|
||||
}
|
||||
|
||||
// Inject available skills as system-reminder (LET-7353)
|
||||
{
|
||||
const {
|
||||
discoverSkills,
|
||||
SKILLS_DIR: defaultDir,
|
||||
formatSkillsAsSystemReminder,
|
||||
} = await import("./agent/skills");
|
||||
const { getSkillsDirectory } = await import("./agent/context");
|
||||
const { join } = await import("node:path");
|
||||
try {
|
||||
const skillsDir = getSkillsDirectory() || join(process.cwd(), defaultDir);
|
||||
const { skills } = await discoverSkills(skillsDir, agent.id, {
|
||||
sources: resolvedSkillSources,
|
||||
});
|
||||
const skillsReminder = formatSkillsAsSystemReminder(skills);
|
||||
if (skillsReminder) {
|
||||
pushPart(skillsReminder);
|
||||
}
|
||||
|
||||
// Pre-load specific skills' full content (used by subagents with skills: field)
|
||||
if (preLoadSkillsRaw) {
|
||||
const { readFile: readFileAsync } = await import("node:fs/promises");
|
||||
const skillIds = preLoadSkillsRaw
|
||||
.split(",")
|
||||
.map((s) => s.trim())
|
||||
.filter(Boolean);
|
||||
const loadedContents: string[] = [];
|
||||
for (const skillId of skillIds) {
|
||||
const skill = skills.find((s) => s.id === skillId);
|
||||
if (skill?.path) {
|
||||
try {
|
||||
const content = await readFileAsync(skill.path, "utf-8");
|
||||
loadedContents.push(`<${skillId}>\n${content}\n</${skillId}>`);
|
||||
} catch {
|
||||
// Skill file not readable, skip
|
||||
}
|
||||
}
|
||||
}
|
||||
if (loadedContents.length > 0) {
|
||||
pushPart(
|
||||
`<loaded_skills>\n${loadedContents.join("\n\n")}\n</loaded_skills>`,
|
||||
);
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Skills discovery failed, skip
|
||||
}
|
||||
syncReminderStateFromContextTracker(
|
||||
sharedReminderState,
|
||||
reminderContextTracker,
|
||||
);
|
||||
const lastRunAt = (agent as { last_run_completion?: string })
|
||||
.last_run_completion;
|
||||
const { parts: sharedReminderParts } = await buildSharedReminderParts({
|
||||
mode: "headless-one-shot",
|
||||
agent: {
|
||||
id: agent.id,
|
||||
name: agent.name,
|
||||
description: agent.description,
|
||||
lastRunAt: lastRunAt ?? null,
|
||||
},
|
||||
state: sharedReminderState,
|
||||
sessionContextReminderEnabled: systemInfoReminderEnabled,
|
||||
reflectionSettings: effectiveReflectionSettings,
|
||||
skillSources: resolvedSkillSources,
|
||||
resolvePlanModeReminder: async () => {
|
||||
const { PLAN_MODE_REMINDER } = await import("./agent/promptAssets");
|
||||
return PLAN_MODE_REMINDER;
|
||||
},
|
||||
});
|
||||
for (const part of sharedReminderParts) {
|
||||
pushPart(part.text);
|
||||
}
|
||||
|
||||
// Add plan mode reminder if in plan mode (highest priority)
|
||||
if (permissionMode.getMode() === "plan") {
|
||||
const { PLAN_MODE_REMINDER } = await import("./agent/promptAssets");
|
||||
pushPart(PLAN_MODE_REMINDER);
|
||||
// Pre-load specific skills' full content (used by subagents with skills: field)
|
||||
if (preLoadSkillsRaw) {
|
||||
const { readFile: readFileAsync } = await import("node:fs/promises");
|
||||
const skillIds = preLoadSkillsRaw
|
||||
.split(",")
|
||||
.map((s) => s.trim())
|
||||
.filter(Boolean);
|
||||
const loadedContents: string[] = [];
|
||||
for (const skillId of skillIds) {
|
||||
const skillPath = sharedReminderState.skillPathById[skillId];
|
||||
if (!skillPath) continue;
|
||||
try {
|
||||
const content = await readFileAsync(skillPath, "utf-8");
|
||||
loadedContents.push(`<${skillId}>\n${content}\n</${skillId}>`);
|
||||
} catch {
|
||||
// Skill file not readable, skip
|
||||
}
|
||||
}
|
||||
if (loadedContents.length > 0) {
|
||||
pushPart(
|
||||
`<loaded_skills>\n${loadedContents.join("\n\n")}\n</loaded_skills>`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Add user prompt
|
||||
@@ -1739,6 +1720,7 @@ ${SYSTEM_REMINDER_CLOSE}
|
||||
undefined,
|
||||
undefined,
|
||||
streamJsonHook,
|
||||
reminderContextTracker,
|
||||
);
|
||||
stopReason = result.stopReason;
|
||||
approvals = result.approvals || [];
|
||||
@@ -1751,6 +1733,10 @@ ${SYSTEM_REMINDER_CLOSE}
|
||||
stream,
|
||||
buffers,
|
||||
() => {}, // No UI refresh needed in headless mode
|
||||
undefined,
|
||||
undefined,
|
||||
undefined,
|
||||
reminderContextTracker,
|
||||
);
|
||||
stopReason = result.stopReason;
|
||||
approvals = result.approvals || [];
|
||||
@@ -2271,12 +2257,8 @@ async function runBidirectionalMode(
|
||||
|
||||
// Track current operation for interrupt support
|
||||
let currentAbortController: AbortController | null = null;
|
||||
// Skills reminder lifecycle in bidirectional mode:
|
||||
// - Inject once on first user turn
|
||||
// - Reinject only after compaction completion or skills diff
|
||||
let hasInjectedSkillsReminder = false;
|
||||
let pendingSkillsReinject = false;
|
||||
let cachedSkillsReminder: string | null = null;
|
||||
const reminderContextTracker = createContextTracker();
|
||||
const sharedReminderState = createSharedReminderState();
|
||||
|
||||
// Resolve pending approvals for this conversation before retrying user input.
|
||||
const resolveAllPendingApprovals = async () => {
|
||||
@@ -2394,11 +2376,23 @@ async function runBidirectionalMode(
|
||||
approvalMessages,
|
||||
{ agentId: agent.id },
|
||||
);
|
||||
await drainStreamWithResume(
|
||||
const drainResult = await drainStreamWithResume(
|
||||
approvalStream,
|
||||
createBuffers(agent.id),
|
||||
() => {},
|
||||
undefined,
|
||||
undefined,
|
||||
undefined,
|
||||
reminderContextTracker,
|
||||
);
|
||||
if (
|
||||
drainResult.stopReason === "error" ||
|
||||
drainResult.stopReason === "cancelled"
|
||||
) {
|
||||
throw new Error(
|
||||
`Approval drain ended with stop reason: ${drainResult.stopReason}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -2795,48 +2789,33 @@ async function runBidirectionalMode(
|
||||
let sawStreamError = false; // Track if we emitted an error during streaming
|
||||
let preStreamTransientRetries = 0;
|
||||
|
||||
// Inject available skills as system-reminder for bidirectional mode (LET-7353).
|
||||
// Discover each turn so skill file changes are naturally picked up.
|
||||
let enrichedContent = userContent;
|
||||
try {
|
||||
const {
|
||||
discoverSkills: discover,
|
||||
SKILLS_DIR: defaultDir,
|
||||
formatSkillsAsSystemReminder,
|
||||
} = await import("./agent/skills");
|
||||
const { getSkillsDirectory } = await import("./agent/context");
|
||||
const { join } = await import("node:path");
|
||||
const skillsDir =
|
||||
getSkillsDirectory() || join(process.cwd(), defaultDir);
|
||||
const { skills } = await discover(skillsDir, agent.id, {
|
||||
sources: skillSources,
|
||||
});
|
||||
const latestSkillsReminder = formatSkillsAsSystemReminder(skills);
|
||||
|
||||
// Trigger reinjection when the available-skills block changed on disk.
|
||||
if (
|
||||
cachedSkillsReminder !== null &&
|
||||
latestSkillsReminder !== cachedSkillsReminder
|
||||
) {
|
||||
pendingSkillsReinject = true;
|
||||
}
|
||||
cachedSkillsReminder = latestSkillsReminder;
|
||||
|
||||
const shouldInjectSkillsReminder =
|
||||
!hasInjectedSkillsReminder || pendingSkillsReinject;
|
||||
if (shouldInjectSkillsReminder && latestSkillsReminder) {
|
||||
enrichedContent = prependSkillsReminderToContent(
|
||||
enrichedContent,
|
||||
latestSkillsReminder,
|
||||
);
|
||||
}
|
||||
if (shouldInjectSkillsReminder) {
|
||||
hasInjectedSkillsReminder = true;
|
||||
pendingSkillsReinject = false;
|
||||
}
|
||||
} catch {
|
||||
// Skills discovery failed, skip
|
||||
}
|
||||
syncReminderStateFromContextTracker(
|
||||
sharedReminderState,
|
||||
reminderContextTracker,
|
||||
);
|
||||
const lastRunAt = (agent as { last_run_completion?: string })
|
||||
.last_run_completion;
|
||||
const { parts: sharedReminderParts } = await buildSharedReminderParts({
|
||||
mode: "headless-bidirectional",
|
||||
agent: {
|
||||
id: agent.id,
|
||||
name: agent.name,
|
||||
description: agent.description,
|
||||
lastRunAt: lastRunAt ?? null,
|
||||
},
|
||||
state: sharedReminderState,
|
||||
sessionContextReminderEnabled: systemInfoReminderEnabled,
|
||||
reflectionSettings,
|
||||
skillSources,
|
||||
resolvePlanModeReminder: async () => {
|
||||
const { PLAN_MODE_REMINDER } = await import("./agent/promptAssets");
|
||||
return PLAN_MODE_REMINDER;
|
||||
},
|
||||
});
|
||||
const enrichedContent = prependReminderPartsToContent(
|
||||
userContent,
|
||||
sharedReminderParts,
|
||||
);
|
||||
|
||||
// Initial input is the user message
|
||||
let currentInput: MessageCreate[] = [
|
||||
@@ -3005,6 +2984,7 @@ async function runBidirectionalMode(
|
||||
currentAbortController?.signal,
|
||||
undefined,
|
||||
streamJsonHook,
|
||||
reminderContextTracker,
|
||||
);
|
||||
const stopReason = result.stopReason;
|
||||
lastStopReason = stopReason; // Track for result subtype
|
||||
@@ -3176,9 +3156,6 @@ async function runBidirectionalMode(
|
||||
// Emit result
|
||||
const durationMs = performance.now() - startTime;
|
||||
const lines = toLines(buffers);
|
||||
if (shouldReinjectSkillsAfterCompaction(lines)) {
|
||||
pendingSkillsReinject = true;
|
||||
}
|
||||
const reversed = [...lines].reverse();
|
||||
const lastAssistant = reversed.find(
|
||||
(line) =>
|
||||
|
||||
Reference in New Issue
Block a user