From 48ccd8f220334ad67a4fa236845326f4e162301c Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Wed, 4 Feb 2026 22:45:16 -0800 Subject: [PATCH] feat: add background task notification system (#827) Co-authored-by: Letta --- src/agent/subagents/manager.ts | 13 + src/cli/App.tsx | 297 +++++++++++---- src/cli/components/EventMessage.tsx | 14 + src/cli/components/InputRich.tsx | 12 +- src/cli/components/QueuedMessages.tsx | 19 +- src/cli/components/SubagentGroupDisplay.tsx | 14 +- src/cli/components/SubagentGroupStatic.tsx | 15 +- src/cli/components/ToolCallMessageRich.tsx | 8 + src/cli/components/UserMessageRich.tsx | 13 +- src/cli/helpers/backfill.ts | 38 +- src/cli/helpers/messageQueueBridge.ts | 63 ++++ src/cli/helpers/queuedMessageParts.ts | 44 +++ src/cli/helpers/subagentAggregation.ts | 40 +- src/cli/helpers/subagentState.ts | 3 + src/cli/helpers/taskNotifications.ts | 121 ++++++ src/headless.ts | 60 ++- src/tests/cli/queuedMessageParts.test.ts | 82 ++++ src/tests/cli/taskNotifications.test.ts | 216 +++++++++++ src/tests/tools/bash-background.test.ts | 39 ++ src/tests/tools/task-background.test.ts | 394 ++++++++++++++++++++ src/tests/tools/task-output.test.ts | 167 +++++++++ src/tools/descriptions/AskUserQuestion.md | 3 + src/tools/descriptions/Bash.md | 16 +- src/tools/descriptions/EnterPlanMode.md | 50 ++- src/tools/descriptions/ExitPlanMode.md | 13 +- src/tools/descriptions/Glob.md | 2 +- src/tools/descriptions/Read.md | 6 +- src/tools/descriptions/Task.md | 68 ++-- src/tools/descriptions/TaskOutput.md | 9 + src/tools/descriptions/TaskStop.md | 6 + src/tools/impl/Bash.ts | 24 +- src/tools/impl/BashOutput.ts | 185 ++++++++- src/tools/impl/Task.ts | 192 +++++++++- src/tools/impl/TaskOutput.ts | 30 ++ src/tools/impl/TaskStop.ts | 37 ++ src/tools/impl/process_manager.ts | 57 +++ src/tools/manager.ts | 6 +- src/tools/schemas/AskUserQuestion.json | 1 + src/tools/schemas/Bash.json | 4 +- src/tools/schemas/Grep.json | 6 +- src/tools/schemas/Task.json | 35 +- src/tools/schemas/TaskOutput.json | 24 ++ src/tools/schemas/TaskStop.json | 16 + src/tools/toolDefinitions.ts | 16 + 44 files changed, 2244 insertions(+), 234 deletions(-) create mode 100644 src/cli/helpers/messageQueueBridge.ts create mode 100644 src/cli/helpers/queuedMessageParts.ts create mode 100644 src/cli/helpers/taskNotifications.ts create mode 100644 src/tests/cli/queuedMessageParts.test.ts create mode 100644 src/tests/cli/taskNotifications.test.ts create mode 100644 src/tests/tools/task-background.test.ts create mode 100644 src/tests/tools/task-output.test.ts create mode 100644 src/tools/descriptions/TaskOutput.md create mode 100644 src/tools/descriptions/TaskStop.md create mode 100644 src/tools/impl/TaskOutput.ts create mode 100644 src/tools/impl/TaskStop.ts create mode 100644 src/tools/schemas/TaskOutput.json create mode 100644 src/tools/schemas/TaskStop.json diff --git a/src/agent/subagents/manager.ts b/src/agent/subagents/manager.ts index 6486e8b..ec938ff 100644 --- a/src/agent/subagents/manager.ts +++ b/src/agent/subagents/manager.ts @@ -433,6 +433,7 @@ function buildSubagentArgs( existingAgentId?: string, existingConversationId?: string, preloadedSkillsContent?: string, + maxTurns?: number, ): string[] { const args: string[] = []; const isDeployingExisting = Boolean( @@ -519,6 +520,11 @@ function buildSubagentArgs( args.push("--block-value", `loaded_skills=${preloadedSkillsContent}`); } + // Add max turns limit if specified + if (maxTurns !== undefined && maxTurns > 0) { + args.push("--max-turns", String(maxTurns)); + } + return args; } @@ -536,6 +542,7 @@ async function executeSubagent( signal?: AbortSignal, existingAgentId?: string, existingConversationId?: string, + maxTurns?: number, ): Promise { // Check if already aborted before starting if (signal?.aborted) { @@ -570,6 +577,7 @@ async function executeSubagent( existingAgentId, existingConversationId, preloadedSkillsContent, + maxTurns, ); // Spawn Letta Code in headless mode. @@ -678,6 +686,9 @@ async function executeSubagent( subagentId, true, // Mark as retry to prevent infinite loops signal, + undefined, // existingAgentId + undefined, // existingConversationId + maxTurns, ); } } @@ -788,6 +799,7 @@ export async function spawnSubagent( signal?: AbortSignal, existingAgentId?: string, existingConversationId?: string, + maxTurns?: number, ): Promise { const allConfigs = await getAllSubagentConfigs(); const config = allConfigs[type]; @@ -847,6 +859,7 @@ export async function spawnSubagent( signal, existingAgentId, existingConversationId, + maxTurns, ); return result; diff --git a/src/cli/App.tsx b/src/cli/App.tsx index 2d5a3ab..648d3ad 100644 --- a/src/cli/App.tsx +++ b/src/cli/App.tsx @@ -174,12 +174,21 @@ import { buildMemoryReminder, parseMemoryPreference, } from "./helpers/memoryReminder"; +import { + type QueuedMessage, + setMessageQueueAdder, +} from "./helpers/messageQueueBridge"; import { buildMessageContentFromDisplay, clearPlaceholdersInText, resolvePlaceholders, } from "./helpers/pasteRegistry"; import { generatePlanFilePath } from "./helpers/planName"; +import { + buildQueuedContentParts, + buildQueuedUserText, + getQueuedNotificationSummaries, +} from "./helpers/queuedMessageParts"; import { safeJsonParseOr } from "./helpers/safeJsonParse"; import { getDeviceType, getLocalTime } from "./helpers/sessionContext"; import { type ApprovalRequest, drainStreamWithResume } from "./helpers/stream"; @@ -191,10 +200,13 @@ import { import { clearCompletedSubagents, clearSubagentsByIds, + getSubagentByToolCallId, getSnapshot as getSubagentSnapshot, + hasActiveSubagents, interruptActiveSubagents, subscribe as subscribeToSubagents, } from "./helpers/subagentState"; +import { extractTaskNotificationsForDisplay } from "./helpers/taskNotifications"; import { getRandomPastTenseVerb, getRandomThinkingVerb, @@ -689,6 +701,17 @@ function stripSystemReminders(text: string): string { .trim(); } +function buildTextParts( + ...parts: Array +): Array<{ type: "text"; text: string }> { + const out: Array<{ type: "text"; text: string }> = []; + for (const part of parts) { + if (!part) continue; + out.push({ type: "text", text: part }); + } + return out; +} + // Items that have finished rendering and no longer change type StaticItem = | { @@ -708,7 +731,7 @@ type StaticItem = id: string; type: string; description: string; - status: "completed" | "error"; + status: "completed" | "error" | "running"; toolCount: number; totalTokens: number; agentURL: string | null; @@ -1388,15 +1411,29 @@ export default function App({ const conversationBusyRetriesRef = useRef(0); // Message queue state for queueing messages during streaming - const [messageQueue, setMessageQueue] = useState([]); + const [messageQueue, setMessageQueue] = useState([]); - const messageQueueRef = useRef([]); // For synchronous access + const messageQueueRef = useRef([]); // For synchronous access useEffect(() => { messageQueueRef.current = messageQueue; }, [messageQueue]); + // Override content parts for queued submissions (to preserve part boundaries) + const overrideContentPartsRef = useRef(null); + + // Set up message queue bridge for background tasks + // This allows non-React code (Task.ts) to add notifications to messageQueue + useEffect(() => { + // Provide a queue adder that adds to messageQueue and bumps dequeueEpoch + setMessageQueueAdder((message: QueuedMessage) => { + setMessageQueue((q) => [...q, message]); + setDequeueEpoch((e) => e + 1); + }); + return () => setMessageQueueAdder(null); + }, []); + const waitingForQueueCancelRef = useRef(false); - const queueSnapshotRef = useRef([]); + const queueSnapshotRef = useRef([]); const [restoreQueueOnCancel, setRestoreQueueOnCancel] = useState(false); const restoreQueueOnCancelRef = useRef(restoreQueueOnCancel); useEffect(() => { @@ -1433,8 +1470,28 @@ export default function App({ ); }, [isExecutingTool]); + const appendTaskNotificationEvents = useCallback( + (summaries: string[]): boolean => { + if (summaries.length === 0) return false; + for (const summary of summaries) { + const eventId = uid("event"); + buffersRef.current.byId.set(eventId, { + kind: "event", + id: eventId, + eventType: "task_notification", + eventData: {}, + phase: "finished", + summary, + }); + buffersRef.current.order.push(eventId); + } + return true; + }, + [], + ); + // Consume queued messages for appending to tool results (clears queue + timeout) - const consumeQueuedMessages = useCallback((): string[] | null => { + const consumeQueuedMessages = useCallback((): QueuedMessage[] | null => { if (messageQueueRef.current.length === 0) return null; if (queueAppendTimeoutRef.current) { clearTimeout(queueAppendTimeoutRef.current); @@ -1651,6 +1708,15 @@ export default function App({ // Handle Task tool_calls specially - track position but don't add individually // (unless there's no subagent data, in which case commit as regular tool call) if (ln.kind === "tool_call" && ln.name && isTaskTool(ln.name)) { + if (hasInProgress && ln.toolCallId) { + const subagent = getSubagentByToolCallId(ln.toolCallId); + if (subagent) { + if (firstTaskIndex === -1) { + firstTaskIndex = newlyCommitted.length; + } + continue; + } + } // Check if this specific Task tool has subagent data (will be grouped) const hasSubagentData = finishedTaskToolCalls.some( (tc) => tc.lineId === id, @@ -2860,8 +2926,11 @@ export default function App({ // Reset interrupted flag since we're starting a fresh stream buffersRef.current.interrupted = false; - // Clear completed subagents from the UI when starting a new turn - clearCompletedSubagents(); + // Clear completed subagents from the UI when starting a new turn, + // but only if no subagents are still running. + if (!hasActiveSubagents()) { + clearCompletedSubagents(); + } while (true) { // Capture the signal BEFORE any async operations @@ -3696,34 +3765,49 @@ export default function App({ } // Append queued messages if any (from 15s append mode) - const queuedMessagesToAppend = consumeQueuedMessages(); - if (queuedMessagesToAppend?.length) { - for (const msg of queuedMessagesToAppend) { - const userId = uid("user"); - buffersRef.current.byId.set(userId, { - kind: "user", - id: userId, - text: msg, - }); - buffersRef.current.order.push(userId); - } + const queuedItemsToAppend = consumeQueuedMessages(); + const queuedNotifications = queuedItemsToAppend + ? getQueuedNotificationSummaries(queuedItemsToAppend) + : []; + const hadNotifications = + appendTaskNotificationEvents(queuedNotifications); + const queuedUserText = queuedItemsToAppend + ? buildQueuedUserText(queuedItemsToAppend) + : ""; + + if (queuedUserText) { + const userId = uid("user"); + buffersRef.current.byId.set(userId, { + kind: "user", + id: userId, + text: queuedUserText, + }); + buffersRef.current.order.push(userId); + } + + if (queuedItemsToAppend && queuedItemsToAppend.length > 0) { + const queuedContentParts = + buildQueuedContentParts(queuedItemsToAppend); setThinkingMessage(getRandomThinkingVerb()); refreshDerived(); toolResultsInFlightRef.current = true; await processConversation( [ { type: "approval", approvals: allResults }, - ...queuedMessagesToAppend.map((msg) => ({ - type: "message" as const, - role: "user" as const, - content: msg as unknown as MessageCreate["content"], - })), + { + type: "message", + role: "user", + content: queuedContentParts, + }, ], { allowReentry: true }, ); toolResultsInFlightRef.current = false; return; } + if (hadNotifications || queuedUserText.length > 0) { + refreshDerived(); + } // Cancel mode - queue results and let dequeue effect handle if (waitingForQueueCancelRef.current) { @@ -4299,6 +4383,7 @@ export default function App({ needsEagerApprovalCheck, queueApprovalResults, consumeQueuedMessages, + appendTaskNotificationEvents, maybeSyncMemoryFilesystemAfterTurn, openTrajectorySegment, syncTrajectoryTokenBase, @@ -5177,6 +5262,15 @@ export default function App({ const onSubmit = useCallback( async (message?: string): Promise<{ submitted: boolean }> => { const msg = message?.trim() ?? ""; + const overrideContentParts = overrideContentPartsRef.current; + if (overrideContentParts) { + overrideContentPartsRef.current = null; + } + const { notifications: taskNotifications, cleanedText } = + extractTaskNotificationsForDisplay(msg); + const userTextForInput = cleanedText.trim(); + const isSystemOnly = + taskNotifications.length > 0 && userTextForInput.length === 0; // Handle profile load confirmation (Enter to continue) if (profileConfirmPending && !msg) { @@ -5210,14 +5304,16 @@ export default function App({ if (!msg) return { submitted: false }; // Run UserPromptSubmit hooks - can block the prompt from being processed - const isCommand = msg.startsWith("/"); - const hookResult = await runUserPromptSubmitHooks( - msg, - isCommand, - agentId, - conversationIdRef.current, - ); - if (hookResult.blocked) { + const isCommand = userTextForInput.startsWith("/"); + const hookResult = isSystemOnly + ? { blocked: false, feedback: [] as string[] } + : await runUserPromptSubmitHooks( + userTextForInput, + isCommand, + agentId, + conversationIdRef.current, + ); + if (!isSystemOnly && hookResult.blocked) { // Show feedback from hook in the transcript const feedbackId = uid("status"); const feedback = hookResult.feedback.join("\n") || "Blocked by hook"; @@ -5244,7 +5340,13 @@ export default function App({ const submissionGeneration = conversationGenerationRef.current; // Track user input (agent_id automatically added from telemetry.currentAgentId) - telemetry.trackUserInput(msg, "user", currentModelId || "unknown"); + if (!isSystemOnly && userTextForInput.length > 0) { + telemetry.trackUserInput( + userTextForInput, + "user", + currentModelId || "unknown", + ); + } // Block submission if waiting for explicit user action (approvals) // In this case, input is hidden anyway, so this shouldn't happen @@ -5275,11 +5377,15 @@ export default function App({ // so users can browse/view while the agent is working. // Changes made in these overlays will be queued until end_turn. const shouldBypassQueue = - isInteractiveCommand(msg) || isNonStateCommand(msg); + isInteractiveCommand(userTextForInput) || + isNonStateCommand(userTextForInput); if (isAgentBusy() && !shouldBypassQueue) { setMessageQueue((prev) => { - const newQueue = [...prev, msg]; + const newQueue: QueuedMessage[] = [ + ...prev, + { kind: "user", text: msg }, + ]; const isSlashCommand = msg.startsWith("/"); @@ -5806,7 +5912,7 @@ export default function App({ { type: "message", role: "user", - content: `${systemMsg}\n\n${prompt}`, + content: buildTextParts(systemMsg, prompt), }, ]); } else { @@ -7183,7 +7289,7 @@ export default function App({ { type: "message", role: "user", - content: skillMessage, + content: buildTextParts(skillMessage), }, ]); } catch (error) { @@ -7241,9 +7347,12 @@ export default function App({ ); // Build system-reminder content for memory request - const rememberMessage = userText - ? `${SYSTEM_REMINDER_OPEN}\n${REMEMBER_PROMPT}\n${SYSTEM_REMINDER_CLOSE}${userText}` + const rememberReminder = userText + ? `${SYSTEM_REMINDER_OPEN}\n${REMEMBER_PROMPT}\n${SYSTEM_REMINDER_CLOSE}` : `${SYSTEM_REMINDER_OPEN}\n${REMEMBER_PROMPT}\n\nThe user did not specify what to remember. Look at the recent conversation context to identify what they likely want you to remember, or ask them to clarify.\n${SYSTEM_REMINDER_CLOSE}`; + const rememberParts = userText + ? buildTextParts(rememberReminder, userText) + : buildTextParts(rememberReminder); // Mark command as finished before sending message buffersRef.current.byId.set(cmdId, { @@ -7263,7 +7372,7 @@ export default function App({ { type: "message", role: "user", - content: rememberMessage, + content: rememberParts, }, ]); } catch (error) { @@ -7436,7 +7545,7 @@ ${SYSTEM_REMINDER_CLOSE}`; { type: "message", role: "user", - content: initMessage, + content: buildTextParts(initMessage), }, ]); } catch (error) { @@ -7519,7 +7628,9 @@ ${SYSTEM_REMINDER_CLOSE}`; { type: "message", role: "user", - content: `${SYSTEM_REMINDER_OPEN}\n${prompt}\n${SYSTEM_REMINDER_CLOSE}`, + content: buildTextParts( + `${SYSTEM_REMINDER_OPEN}\n${prompt}\n${SYSTEM_REMINDER_CLOSE}`, + ), }, ]); } catch (error) { @@ -7567,7 +7678,8 @@ ${SYSTEM_REMINDER_CLOSE}`; } // Build message content from display value (handles placeholders for text/images) - const contentParts = buildMessageContentFromDisplay(msg); + const contentParts = + overrideContentParts ?? buildMessageContentFromDisplay(msg); // Prepend plan mode reminder if in plan mode const planModeReminder = getPlanModeReminder(); @@ -7694,33 +7806,42 @@ ${SYSTEM_REMINDER_CLOSE} lastNotifiedModeRef.current = currentMode; } - // Combine reminders with content (session context first, then session start hook, then permission mode, then plan mode, then ralph mode, then skill unload, then bash commands, then hook feedback, then memory reminder, then memfs conflicts) - const allReminders = - sessionContextReminder + - sessionStartHookFeedback + - permissionModeAlert + - planModeReminder + - ralphModeReminder + - skillUnloadReminder + - bashCommandPrefix + - userPromptSubmitHookFeedback + - memoryReminderContent + - memfsConflictReminder; + // Combine reminders with content as separate text parts. + // This preserves each reminder boundary in the API payload. + // Note: Task notifications now come through messageQueue directly (added by messageQueueBridge) + const reminderParts: Array<{ type: "text"; text: string }> = []; + const pushReminder = (text: string) => { + if (!text) return; + reminderParts.push({ type: "text", text }); + }; + pushReminder(sessionContextReminder); + pushReminder(sessionStartHookFeedback); + pushReminder(permissionModeAlert); + pushReminder(planModeReminder); + pushReminder(ralphModeReminder); + pushReminder(skillUnloadReminder); + pushReminder(bashCommandPrefix); + pushReminder(userPromptSubmitHookFeedback); + pushReminder(memoryReminderContent); + pushReminder(memfsConflictReminder); const messageContent = - allReminders && typeof contentParts === "string" - ? allReminders + contentParts - : Array.isArray(contentParts) && allReminders - ? [{ type: "text" as const, text: allReminders }, ...contentParts] - : contentParts; + reminderParts.length > 0 + ? [...reminderParts, ...contentParts] + : contentParts; + + // Append task notifications (if any) as event lines before the user message + appendTaskNotificationEvents(taskNotifications); // Append the user message to transcript IMMEDIATELY (optimistic update) const userId = uid("user"); - buffersRef.current.byId.set(userId, { - kind: "user", - id: userId, - text: msg, - }); - buffersRef.current.order.push(userId); + if (userTextForInput) { + buffersRef.current.byId.set(userId, { + kind: "user", + id: userId, + text: userTextForInput, + }); + buffersRef.current.order.push(userId); + } // Reset token counter for this turn (only count the agent's response) buffersRef.current.tokenCount = 0; @@ -8334,6 +8455,7 @@ ${SYSTEM_REMINDER_CLOSE} pendingRalphConfig, openTrajectorySegment, resetTrajectoryBases, + appendTaskNotificationEvents, ], ); @@ -8343,14 +8465,18 @@ ${SYSTEM_REMINDER_CLOSE} }, [onSubmit]); // Process queued messages when streaming ends + // Task notifications are now added directly to messageQueue via messageQueueBridge useEffect(() => { // Reference dequeueEpoch to satisfy exhaustive-deps - it's used to force // re-runs when userCancelledRef is reset (refs aren't in deps) + // Also triggers when task notifications are added to queue void dequeueEpoch; + const hasAnythingQueued = messageQueue.length > 0; + if ( !streaming && - messageQueue.length > 0 && + hasAnythingQueued && pendingApprovals.length === 0 && !commandRunning && !isExecutingTool && @@ -8360,7 +8486,12 @@ ${SYSTEM_REMINDER_CLOSE} ) { // Concatenate all queued messages into one (better UX when user types multiple // messages quickly - they get combined into one context for the agent) - const concatenatedMessage = messageQueue.join("\n"); + // Task notifications are already in the queue as XML strings + const concatenatedMessage = messageQueue + .map((item) => item.text) + .join("\n"); + const queuedContentParts = buildQueuedContentParts(messageQueue); + debugLog( "queue", `Dequeuing ${messageQueue.length} message(s): "${concatenatedMessage.slice(0, 50)}${concatenatedMessage.length > 50 ? "..." : ""}"`, @@ -8372,8 +8503,9 @@ ${SYSTEM_REMINDER_CLOSE} // Submit the concatenated message using the normal submit flow // This ensures all setup (reminders, UI updates, etc.) happens correctly + overrideContentPartsRef.current = queuedContentParts; onSubmitRef.current(concatenatedMessage); - } else if (messageQueue.length > 0) { + } else if (hasAnythingQueued) { // Log why dequeue was blocked (useful for debugging stuck queues) debugLog( "queue", @@ -8387,7 +8519,7 @@ ${SYSTEM_REMINDER_CLOSE} commandRunning, isExecutingTool, anySelectorOpen, - dequeueEpoch, // Triggered when userCancelledRef is reset while messages are queued + dequeueEpoch, // Triggered when userCancelledRef is reset OR task notifications added ]); // Helper to send all approval results when done @@ -8575,25 +8707,33 @@ ${SYSTEM_REMINDER_CLOSE} waitingForQueueCancelRef.current = false; queueSnapshotRef.current = []; } else { - const queuedMessagesToAppend = consumeQueuedMessages(); + const queuedItemsToAppend = consumeQueuedMessages(); + const queuedNotifications = queuedItemsToAppend + ? getQueuedNotificationSummaries(queuedItemsToAppend) + : []; + const hadNotifications = + appendTaskNotificationEvents(queuedNotifications); const input: Array = [ { type: "approval", approvals: allResults as ApprovalResult[] }, ]; - if (queuedMessagesToAppend?.length) { - for (const msg of queuedMessagesToAppend) { + if (queuedItemsToAppend && queuedItemsToAppend.length > 0) { + const queuedUserText = buildQueuedUserText(queuedItemsToAppend); + if (queuedUserText) { const userId = uid("user"); buffersRef.current.byId.set(userId, { kind: "user", id: userId, - text: msg, + text: queuedUserText, }); buffersRef.current.order.push(userId); - input.push({ - type: "message", - role: "user", - content: msg as unknown as MessageCreate["content"], - }); } + input.push({ + type: "message", + role: "user", + content: buildQueuedContentParts(queuedItemsToAppend), + }); + refreshDerived(); + } else if (hadNotifications) { refreshDerived(); } toolResultsInFlightRef.current = true; @@ -8627,6 +8767,7 @@ ${SYSTEM_REMINDER_CLOSE} updateStreamingOutput, queueApprovalResults, consumeQueuedMessages, + appendTaskNotificationEvents, syncTrajectoryElapsedBase, closeTrajectorySegment, openTrajectorySegment, diff --git a/src/cli/components/EventMessage.tsx b/src/cli/components/EventMessage.tsx index 6fa5276..12963e4 100644 --- a/src/cli/components/EventMessage.tsx +++ b/src/cli/components/EventMessage.tsx @@ -34,6 +34,20 @@ export const EventMessage = memo(({ line }: { line: EventLine }) => { const columns = useTerminalWidth(); const rightWidth = Math.max(0, columns - 2); + if (line.eventType === "task_notification") { + const summary = line.summary || "Agent task completed"; + return ( + + + + + + {summary} + + + ); + } + // Only handle compaction events for now if (line.eventType !== "compaction") { return ( diff --git a/src/cli/components/InputRich.tsx b/src/cli/components/InputRich.tsx index 8123239..208a90c 100644 --- a/src/cli/components/InputRich.tsx +++ b/src/cli/components/InputRich.tsx @@ -24,6 +24,7 @@ import { OPENAI_CODEX_PROVIDER_NAME } from "../../providers/openai-codex-provide import { ralphMode } from "../../ralph/mode"; import { settingsManager } from "../../settings-manager"; import { charsToTokens, formatCompact } from "../helpers/format"; +import type { QueuedMessage } from "../helpers/messageQueueBridge"; import { useTerminalWidth } from "../hooks/useTerminalWidth"; import { colors } from "./colors"; import { InputAssist } from "./InputAssist"; @@ -236,7 +237,7 @@ export function Input({ agentName?: string | null; currentModel?: string | null; currentModelProvider?: string | null; - messageQueue?: string[]; + messageQueue?: QueuedMessage[]; onEnterQueueEditMode?: () => void; onEscapeCancel?: () => void; ralphActive?: boolean; @@ -548,7 +549,14 @@ export function Input({ ) { setAtStartBoundary(false); // Clear the queue and load into input as one multi-line message - const queueText = messageQueue.join("\n"); + const queueText = messageQueue + .filter((item) => item.kind === "user") + .map((item) => item.text.trim()) + .filter((msg) => msg.length > 0) + .join("\n"); + if (!queueText) { + return; + } setValue(queueText); // Signal to App.tsx to clear the queue if (onEnterQueueEditMode) { diff --git a/src/cli/components/QueuedMessages.tsx b/src/cli/components/QueuedMessages.tsx index 286afee..2e9ef52 100644 --- a/src/cli/components/QueuedMessages.tsx +++ b/src/cli/components/QueuedMessages.tsx @@ -1,17 +1,26 @@ import { Box } from "ink"; import { memo } from "react"; +import type { QueuedMessage } from "../helpers/messageQueueBridge"; import { Text } from "./Text"; interface QueuedMessagesProps { - messages: string[]; + messages: QueuedMessage[]; } export const QueuedMessages = memo(({ messages }: QueuedMessagesProps) => { const maxDisplay = 5; + const displayMessages = messages + .filter((msg) => msg.kind === "user") + .map((msg) => msg.text.trim()) + .filter((msg) => msg.length > 0); + + if (displayMessages.length === 0) { + return null; + } return ( - {messages.slice(0, maxDisplay).map((msg, index) => ( + {displayMessages.slice(0, maxDisplay).map((msg, index) => ( {">"} @@ -22,11 +31,13 @@ export const QueuedMessages = memo(({ messages }: QueuedMessagesProps) => { ))} - {messages.length > maxDisplay && ( + {displayMessages.length > maxDisplay && ( - ...and {messages.length - maxDisplay} more + + ...and {displayMessages.length - maxDisplay} more + )} diff --git a/src/cli/components/SubagentGroupDisplay.tsx b/src/cli/components/SubagentGroupDisplay.tsx index 8306905..6314356 100644 --- a/src/cli/components/SubagentGroupDisplay.tsx +++ b/src/cli/components/SubagentGroupDisplay.tsx @@ -111,8 +111,12 @@ const AgentRow = memo( {" "} {agent.status === "error" ? ( Error + ) : isComplete ? ( + Done + ) : agent.isBackground ? ( + Running in the background ) : ( - {isComplete ? "Done" : "Running..."} + Running... )} @@ -197,6 +201,14 @@ const AgentRow = memo( + ) : agent.isBackground ? ( + <> + + {" "} + {continueChar} + + {" Running in the background"} + ) : lastTool ? ( <> diff --git a/src/cli/components/SubagentGroupStatic.tsx b/src/cli/components/SubagentGroupStatic.tsx index 8624ea9..387308d 100644 --- a/src/cli/components/SubagentGroupStatic.tsx +++ b/src/cli/components/SubagentGroupStatic.tsx @@ -28,12 +28,13 @@ export interface StaticSubagent { id: string; type: string; description: string; - status: "completed" | "error"; + status: "completed" | "error" | "running"; toolCount: number; totalTokens: number; agentURL: string | null; error?: string; model?: string; + isBackground?: boolean; } interface SubagentGroupStaticProps { @@ -91,7 +92,7 @@ const AgentRow = memo(({ agent, isLast }: AgentRowProps) => { {/* Status line */} - {agent.status === "completed" ? ( + {agent.status === "completed" && !agent.isBackground ? ( <> {" "} @@ -99,7 +100,7 @@ const AgentRow = memo(({ agent, isLast }: AgentRowProps) => { {" Done"} - ) : ( + ) : agent.status === "error" ? ( <> @@ -116,6 +117,14 @@ const AgentRow = memo(({ agent, isLast }: AgentRowProps) => { + ) : ( + <> + + {" "} + {continueChar} + + {" Running in the background"} + )} diff --git a/src/cli/components/ToolCallMessageRich.tsx b/src/cli/components/ToolCallMessageRich.tsx index 95a3f08..19443f4 100644 --- a/src/cli/components/ToolCallMessageRich.tsx +++ b/src/cli/components/ToolCallMessageRich.tsx @@ -10,6 +10,7 @@ import { parsePatchInput, parsePatchOperations, } from "../helpers/formatArgsDisplay.js"; +import { getSubagentByToolCallId } from "../helpers/subagentState.js"; import { getDisplayToolName, isFileEditTool, @@ -112,6 +113,13 @@ export const ToolCallMessage = memo( // and liveItems handles pending approvals via InlineGenericApproval) if (isTaskTool(rawName)) { const isFinished = line.phase === "finished"; + const subagent = line.toolCallId + ? getSubagentByToolCallId(line.toolCallId) + : undefined; + if (subagent) { + // Task tool calls with subagent data are handled by SubagentGroupDisplay/Static + return null; + } if (!isFinished) { // Not finished - SubagentGroupDisplay or approval UI handles this return null; diff --git a/src/cli/components/UserMessageRich.tsx b/src/cli/components/UserMessageRich.tsx index 0c8d120..aaa316f 100644 --- a/src/cli/components/UserMessageRich.tsx +++ b/src/cli/components/UserMessageRich.tsx @@ -1,6 +1,7 @@ import { memo } from "react"; import stringWidth from "string-width"; import { SYSTEM_REMINDER_CLOSE, SYSTEM_REMINDER_OPEN } from "../../constants"; +import { extractTaskNotificationsForDisplay } from "../helpers/taskNotifications"; import { useTerminalWidth } from "../hooks/useTerminalWidth"; import { colors, hexToBgAnsi, hexToFgAnsi } from "./colors"; import { Text } from "./Text"; @@ -156,6 +157,11 @@ function renderBlock( export const UserMessage = memo(({ line }: { line: UserLine }) => { const columns = useTerminalWidth(); const contentWidth = Math.max(1, columns - 2); + const cleanedText = extractTaskNotificationsForDisplay(line.text).cleanedText; + const displayText = cleanedText.trim(); + if (!displayText) { + return null; + } // Build combined ANSI code for background + optional foreground const { background, text: textColor } = colors.userMessage; @@ -164,23 +170,20 @@ export const UserMessage = memo(({ line }: { line: UserLine }) => { const colorAnsi = bgAnsi + fgAnsi; // Split into system-reminder blocks and user content blocks - const blocks = splitSystemReminderBlocks(line.text); + const blocks = splitSystemReminderBlocks(displayText); const allLines: string[] = []; for (const block of blocks) { if (!block.text.trim()) continue; - - // Add blank line between blocks (not before first) if (allLines.length > 0) { allLines.push(""); } - const blockLines = renderBlock( block.text, contentWidth, columns, - !block.isSystemReminder, // highlight user content, not system-reminder + !block.isSystemReminder, colorAnsi, ); allLines.push(...blockLines); diff --git a/src/cli/helpers/backfill.ts b/src/cli/helpers/backfill.ts index 0f10b55..753fe23 100644 --- a/src/cli/helpers/backfill.ts +++ b/src/cli/helpers/backfill.ts @@ -7,6 +7,7 @@ import type { } from "@letta-ai/letta-client/resources/agents/messages"; import { SYSTEM_REMINDER_CLOSE, SYSTEM_REMINDER_OPEN } from "../../constants"; import type { Buffers } from "./accumulator"; +import { extractTaskNotificationsForDisplay } from "./taskNotifications"; /** * Extract displayable text from tool return content. @@ -178,9 +179,28 @@ export function backfillBuffers(buffers: Buffers, history: Message[]): void { // user message - content parts may include text and image parts case "user_message": { const rawText = renderUserContentParts(msg.content); + const { notifications, cleanedText } = + extractTaskNotificationsForDisplay(rawText); + + if (notifications.length > 0) { + let notifIndex = 0; + for (const summary of notifications) { + const notifId = `${lineId}-task-${notifIndex++}`; + const exists = buffers.byId.has(notifId); + buffers.byId.set(notifId, { + kind: "event", + id: notifId, + eventType: "task_notification", + eventData: {}, + phase: "finished", + summary, + }); + if (!exists) buffers.order.push(notifId); + } + } // Check if this is a compaction summary message (old format embedded in user_message) - const compactionSummary = extractCompactionSummary(rawText); + const compactionSummary = extractCompactionSummary(cleanedText); if (compactionSummary) { // Render as a finished compaction event const exists = buffers.byId.has(lineId); @@ -196,13 +216,15 @@ export function backfillBuffers(buffers: Buffers, history: Message[]): void { break; } - const exists = buffers.byId.has(lineId); - buffers.byId.set(lineId, { - kind: "user", - id: lineId, - text: rawText, - }); - if (!exists) buffers.order.push(lineId); + if (cleanedText) { + const exists = buffers.byId.has(lineId); + buffers.byId.set(lineId, { + kind: "user", + id: lineId, + text: cleanedText, + }); + if (!exists) buffers.order.push(lineId); + } break; } diff --git a/src/cli/helpers/messageQueueBridge.ts b/src/cli/helpers/messageQueueBridge.ts new file mode 100644 index 0000000..6d96166 --- /dev/null +++ b/src/cli/helpers/messageQueueBridge.ts @@ -0,0 +1,63 @@ +/** + * Message Queue Bridge + * + * Allows non-React code (like Task.ts) to add messages to the messageQueue. + * The queue adder function is set by App.tsx on mount. + * + * This enables background tasks to queue their notification XML directly + * into messageQueue, where the existing dequeue logic handles auto-firing. + */ + +export type QueuedMessage = { + kind: "user" | "task_notification"; + text: string; +}; + +type QueueAdder = (message: QueuedMessage) => void; + +let queueAdder: QueueAdder | null = null; +const pendingMessages: QueuedMessage[] = []; +const MAX_PENDING_MESSAGES = 10; + +/** + * Set the queue adder function. Called by App.tsx on mount. + */ +export function setMessageQueueAdder(fn: QueueAdder | null): void { + queueAdder = fn; + if (queueAdder && pendingMessages.length > 0) { + for (const message of pendingMessages) { + queueAdder(message); + } + pendingMessages.length = 0; + } +} + +/** + * Add a message to the messageQueue. + * Called from Task.ts when a background task completes. + * If queue adder not set (App not mounted), message is dropped. + */ +export function addToMessageQueue(message: QueuedMessage): void { + if (queueAdder) { + queueAdder(message); + return; + } + if (pendingMessages.length >= MAX_PENDING_MESSAGES) { + pendingMessages.shift(); + } + pendingMessages.push(message); +} + +/** + * Check if the queue bridge is connected. + */ +export function isQueueBridgeConnected(): boolean { + return queueAdder !== null; +} + +/** + * Clear any pending messages (for testing). + */ +export function clearPendingMessages(): void { + pendingMessages.length = 0; +} diff --git a/src/cli/helpers/queuedMessageParts.ts b/src/cli/helpers/queuedMessageParts.ts new file mode 100644 index 0000000..9a54a08 --- /dev/null +++ b/src/cli/helpers/queuedMessageParts.ts @@ -0,0 +1,44 @@ +import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents"; +import type { QueuedMessage } from "./messageQueueBridge"; +import { buildMessageContentFromDisplay } from "./pasteRegistry"; +import { extractTaskNotificationsForDisplay } from "./taskNotifications"; + +export function getQueuedNotificationSummaries( + queued: QueuedMessage[], +): string[] { + const summaries: string[] = []; + for (const item of queued) { + if (item.kind !== "task_notification") continue; + const parsed = extractTaskNotificationsForDisplay(item.text); + summaries.push(...parsed.notifications); + } + return summaries; +} + +export function buildQueuedContentParts( + queued: QueuedMessage[], +): MessageCreate["content"] { + const parts: MessageCreate["content"] = []; + let isFirst = true; + for (const item of queued) { + if (!isFirst) { + parts.push({ type: "text", text: "\n" }); + } + isFirst = false; + if (item.kind === "task_notification") { + parts.push({ type: "text", text: item.text }); + continue; + } + const userParts = buildMessageContentFromDisplay(item.text); + parts.push(...userParts); + } + return parts; +} + +export function buildQueuedUserText(queued: QueuedMessage[]): string { + return queued + .filter((item) => item.kind === "user") + .map((item) => item.text) + .filter((text) => text.length > 0) + .join("\n"); +} diff --git a/src/cli/helpers/subagentAggregation.ts b/src/cli/helpers/subagentAggregation.ts index f122afd..8a98e79 100644 --- a/src/cli/helpers/subagentAggregation.ts +++ b/src/cli/helpers/subagentAggregation.ts @@ -5,7 +5,7 @@ import type { StaticSubagent } from "../components/SubagentGroupStatic.js"; import type { Line } from "./accumulator.js"; -import { getSubagentByToolCallId } from "./subagentState.js"; +import { getSubagentByToolCallId, getSubagents } from "./subagentState.js"; import { isTaskTool } from "./toolNameMapping.js"; /** @@ -31,16 +31,37 @@ export interface SubagentGroupItem { export function hasInProgressTaskToolCalls( order: string[], byId: Map, - emittedIds: Set, + _emittedIds: Set, ): boolean { + // If any foreground subagent is running, treat Task tools as in-progress. + // Background subagents shouldn't block grouping into the static area. + const hasForegroundRunning = getSubagents().some( + (agent) => + !agent.isBackground && + (agent.status === "pending" || agent.status === "running"), + ); + if (hasForegroundRunning) { + return true; + } + for (const id of order) { const ln = byId.get(id); if (!ln) continue; if (ln.kind === "tool_call" && isTaskTool(ln.name ?? "")) { - if (emittedIds.has(id)) continue; if (ln.phase !== "finished") { return true; } + if (ln.toolCallId) { + const subagent = getSubagentByToolCallId(ln.toolCallId); + if (subagent) { + if ( + !subagent.isBackground && + (subagent.status === "pending" || subagent.status === "running") + ) { + return true; + } + } + } } } return false; @@ -75,7 +96,13 @@ export function collectFinishedTaskToolCalls( ) { // Check if we have subagent data in the state store const subagent = getSubagentByToolCallId(ln.toolCallId); - if (subagent) { + if ( + subagent && + (subagent.status === "completed" || + subagent.status === "error" || + (subagent.isBackground && + (subagent.status === "pending" || subagent.status === "running"))) + ) { finished.push({ lineId: id, toolCallId: ln.toolCallId, @@ -103,12 +130,15 @@ export function createSubagentGroupItem( id: subagent.id, type: subagent.type, description: subagent.description, - status: subagent.status as "completed" | "error", + status: subagent.isBackground + ? "running" + : (subagent.status as "completed" | "error"), toolCount: subagent.toolCalls.length, totalTokens: subagent.totalTokens, agentURL: subagent.agentURL, error: subagent.error, model: subagent.model, + isBackground: subagent.isBackground, }); } } diff --git a/src/cli/helpers/subagentState.ts b/src/cli/helpers/subagentState.ts index 790d22a..82ce334 100644 --- a/src/cli/helpers/subagentState.ts +++ b/src/cli/helpers/subagentState.ts @@ -29,6 +29,7 @@ export interface SubagentState { model?: string; startTime: number; toolCallId?: string; // Links this subagent to its parent Task tool call + isBackground?: boolean; // True if running in background (fire-and-forget) } interface SubagentStore { @@ -106,6 +107,7 @@ export function registerSubagent( type: string, description: string, toolCallId?: string, + isBackground?: boolean, ): void { // Capitalize type for display (explore -> Explore) const displayType = type.charAt(0).toUpperCase() + type.slice(1); @@ -121,6 +123,7 @@ export function registerSubagent( durationMs: 0, startTime: Date.now(), toolCallId, + isBackground, }; store.agents.set(id, agent); diff --git a/src/cli/helpers/taskNotifications.ts b/src/cli/helpers/taskNotifications.ts new file mode 100644 index 0000000..b1dd0b8 --- /dev/null +++ b/src/cli/helpers/taskNotifications.ts @@ -0,0 +1,121 @@ +/** + * Task Notification Formatting + * + * Formats background task completion notifications as XML. + * The actual queueing is handled by messageQueueBridge.ts. + */ + +// ============================================================================ +// Types +// ============================================================================ + +export interface TaskNotification { + taskId: string; + status: "completed" | "failed"; + summary: string; + result: string; + outputFile: string; + usage?: { + totalTokens?: number; + toolUses?: number; + durationMs?: number; + }; +} + +// ============================================================================ +// XML Escaping +// ============================================================================ + +/** + * Escape special XML characters to prevent breaking the XML structure. + */ +function escapeXml(str: string): string { + return str.replace(/&/g, "&").replace(//g, ">"); +} + +function unescapeXml(str: string): string { + return str.replace(/</g, "<").replace(/>/g, ">").replace(/&/g, "&"); +} + +// ============================================================================ +// Public API +// ============================================================================ + +/** + * Format a single notification as XML string for queueing. + */ +export function formatTaskNotification(notification: TaskNotification): string { + // Escape summary and result to prevent XML injection + const escapedSummary = escapeXml(notification.summary); + const escapedResult = escapeXml(notification.result); + + const usageLines: string[] = []; + if (notification.usage?.totalTokens !== undefined) { + usageLines.push(`total_tokens: ${notification.usage.totalTokens}`); + } + if (notification.usage?.toolUses !== undefined) { + usageLines.push(`tool_uses: ${notification.usage.toolUses}`); + } + if (notification.usage?.durationMs !== undefined) { + usageLines.push(`duration_ms: ${notification.usage.durationMs}`); + } + const usageBlock = usageLines.length + ? `\n${usageLines.join("\n")}` + : ""; + + return ` +${notification.taskId} +${notification.status} +${escapedSummary} +${escapedResult}${usageBlock} + +Full transcript available at: ${notification.outputFile}`; +} + +export function extractTaskNotificationsForDisplay(message: string): { + notifications: string[]; + cleanedText: string; +} { + if (!message.includes("")) { + return { notifications: [], cleanedText: message }; + } + + const notificationRegex = + /[\s\S]*?(?:<\/task-notification>|$)(?:\s*Full transcript available at:[^\n]*\n?)?/g; + const notifications: string[] = []; + + let match: RegExpExecArray | null = notificationRegex.exec(message); + while (match !== null) { + const xml = match[0]; + const summaryMatch = xml.match(/([\s\S]*?)<\/summary>/); + const statusMatch = xml.match(/([\s\S]*?)<\/status>/); + const status = statusMatch?.[1]?.trim(); + let summary = summaryMatch?.[1]?.trim() || ""; + summary = unescapeXml(summary); + const display = summary || `Agent task ${status || "completed"}`; + notifications.push(display); + match = notificationRegex.exec(message); + } + + const cleanedText = message + .replace(notificationRegex, "") + .replace(/^\s*Full transcript available at:[^\n]*\n?/gm, "") + .replace(/\n{3,}/g, "\n\n") + .trim(); + + return { notifications, cleanedText }; +} + +/** + * Format multiple notifications as XML string. + * @deprecated Use formatTaskNotification and queue individually + */ +export function formatTaskNotifications( + notifications: TaskNotification[], +): string { + if (notifications.length === 0) { + return ""; + } + + return notifications.map(formatTaskNotification).join("\n\n"); +} diff --git a/src/headless.ts b/src/headless.ts index a14f355..1ef6dba 100644 --- a/src/headless.ts +++ b/src/headless.ts @@ -124,6 +124,7 @@ export async function handleHeadlessCommand( "no-skills": { type: "boolean" }, memfs: { type: "boolean" }, "no-memfs": { type: "boolean" }, + "max-turns": { type: "string" }, // Maximum number of agentic turns }, strict: false, allowPositionals: true, @@ -262,6 +263,20 @@ export async function handleHeadlessCommand( const memfsFlag = values.memfs as boolean | undefined; const noMemfsFlag = values["no-memfs"] as boolean | undefined; const fromAfFile = values["from-af"] as string | undefined; + const maxTurnsRaw = values["max-turns"] as string | undefined; + + // Parse and validate max-turns if provided + let maxTurns: number | undefined; + if (maxTurnsRaw !== undefined) { + const parsed = parseInt(maxTurnsRaw, 10); + if (Number.isNaN(parsed) || parsed <= 0) { + console.error( + `Error: --max-turns must be a positive integer, got: ${maxTurnsRaw}`, + ); + process.exit(1); + } + maxTurns = parsed; + } // Handle --conv {agent-id} shorthand: --conv agent-xyz → --agent agent-xyz --conv default if (specifiedConversationId?.startsWith("agent-")) { @@ -1005,7 +1020,11 @@ export async function handleHeadlessCommand( // Build message content with reminders (plan mode first, then skill unload) const { permissionMode } = await import("./permissions/mode"); const { hasLoadedSkills } = await import("./agent/context"); - let messageContent = ""; + const contentParts: MessageCreate["content"] = []; + const pushPart = (text: string) => { + if (!text) return; + contentParts.push({ type: "text", text }); + }; if (fromAgentId) { const senderAgentId = fromAgentId; @@ -1017,29 +1036,29 @@ If you need to share detailed information, include it in your response text. ${SYSTEM_REMINDER_CLOSE} `; - messageContent += systemReminder; + pushPart(systemReminder); } // Add plan mode reminder if in plan mode (highest priority) if (permissionMode.getMode() === "plan") { const { PLAN_MODE_REMINDER } = await import("./agent/promptAssets"); - messageContent += PLAN_MODE_REMINDER; + pushPart(PLAN_MODE_REMINDER); } // Add skill unload reminder if skills are loaded (using cached flag) if (hasLoadedSkills()) { const { SKILL_UNLOAD_REMINDER } = await import("./agent/promptAssets"); - messageContent += SKILL_UNLOAD_REMINDER; + pushPart(SKILL_UNLOAD_REMINDER); } // Add user prompt - messageContent += prompt; + pushPart(prompt); // Start with the user message let currentInput: Array = [ { role: "user", - content: [{ type: "text", text: messageContent }], + content: contentParts, }, ]; @@ -1047,12 +1066,35 @@ ${SYSTEM_REMINDER_CLOSE} let lastKnownRunId: string | null = null; let llmApiErrorRetries = 0; let conversationBusyRetries = 0; - markMilestone("HEADLESS_FIRST_STREAM_START"); measureSinceMilestone("headless-setup-total", "HEADLESS_CLIENT_READY"); + // Helper to check max turns limit using server-side step count from buffers + const checkMaxTurns = () => { + if (maxTurns !== undefined && buffers.usage.stepCount >= maxTurns) { + if (outputFormat === "stream-json") { + const errorMsg: ErrorMessage = { + type: "error", + message: `Maximum turns limit reached (${buffers.usage.stepCount}/${maxTurns} steps)`, + stop_reason: "max_steps", + session_id: sessionId, + uuid: `error-max-turns-${crypto.randomUUID()}`, + }; + console.log(JSON.stringify(errorMsg)); + } else { + console.error( + `Maximum turns limit reached (${buffers.usage.stepCount}/${maxTurns} steps)`, + ); + } + process.exit(1); + } + }; + try { while (true) { + // Check max turns limit before starting a new turn (uses server-side step count) + checkMaxTurns(); + // Wrap sendMessageStream in try-catch to handle pre-stream errors (e.g., 409) let stream: Awaited>; try { @@ -1283,6 +1325,10 @@ ${SYSTEM_REMINDER_CLOSE} // Track API duration for this stream sessionStats.endTurn(apiDurationMs); + + // Check max turns after each turn (server may have taken multiple steps) + checkMaxTurns(); + if (approvalPendingRecovery) { await resolveAllPendingApprovals(); continue; diff --git a/src/tests/cli/queuedMessageParts.test.ts b/src/tests/cli/queuedMessageParts.test.ts new file mode 100644 index 0000000..03da7e0 --- /dev/null +++ b/src/tests/cli/queuedMessageParts.test.ts @@ -0,0 +1,82 @@ +import { describe, expect, test } from "bun:test"; +import type { QueuedMessage } from "../../cli/helpers/messageQueueBridge"; +import { allocateImage } from "../../cli/helpers/pasteRegistry"; +import { + buildQueuedContentParts, + buildQueuedUserText, + getQueuedNotificationSummaries, +} from "../../cli/helpers/queuedMessageParts"; +import { formatTaskNotification } from "../../cli/helpers/taskNotifications"; + +describe("queuedMessageParts", () => { + test("buildQueuedUserText only concatenates user messages", () => { + const queued: QueuedMessage[] = [ + { kind: "user", text: "hello" }, + { + kind: "task_notification", + text: "Agent done", + }, + { kind: "user", text: "world" }, + ]; + + expect(buildQueuedUserText(queued)).toBe("hello\nworld"); + }); + + test("buildQueuedContentParts preserves boundaries and images", () => { + const imageId = allocateImage({ + data: "ZmFrZQ==", + mediaType: "image/png", + }); + const userText = `before [Image #${imageId}] after`; + const notificationXml = formatTaskNotification({ + taskId: "task_1", + status: "completed", + summary: 'Agent "Test" completed', + result: "Result line", + outputFile: "/tmp/task_1.log", + }); + + const queued: QueuedMessage[] = [ + { kind: "user", text: userText }, + { kind: "task_notification", text: notificationXml }, + { kind: "user", text: "second" }, + ]; + + const parts = buildQueuedContentParts(queued); + + expect(parts).toHaveLength(7); + expect(parts[0]).toEqual({ type: "text", text: "before " }); + expect(parts[1]).toEqual({ + type: "image", + source: { + type: "base64", + media_type: "image/png", + data: "ZmFrZQ==", + }, + }); + expect(parts[2]).toEqual({ type: "text", text: " after" }); + expect(parts[3]).toEqual({ type: "text", text: "\n" }); + expect(parts[4]).toEqual({ type: "text", text: notificationXml }); + expect(parts[5]).toEqual({ type: "text", text: "\n" }); + expect(parts[6]).toEqual({ type: "text", text: "second" }); + }); + + test("getQueuedNotificationSummaries extracts summaries", () => { + const notificationXml = formatTaskNotification({ + taskId: "task_2", + status: "completed", + summary: 'Agent "Explore" completed', + result: "Done", + outputFile: "/tmp/task_2.log", + }); + + const queued: QueuedMessage[] = [ + { kind: "user", text: "hi" }, + { kind: "task_notification", text: notificationXml }, + ]; + + expect(getQueuedNotificationSummaries(queued)).toEqual([ + 'Agent "Explore" completed', + ]); + }); +}); diff --git a/src/tests/cli/taskNotifications.test.ts b/src/tests/cli/taskNotifications.test.ts new file mode 100644 index 0000000..5d6283e --- /dev/null +++ b/src/tests/cli/taskNotifications.test.ts @@ -0,0 +1,216 @@ +import { beforeEach, describe, expect, test } from "bun:test"; +import { + addToMessageQueue, + clearPendingMessages, + isQueueBridgeConnected, + type QueuedMessage, + setMessageQueueAdder, +} from "../../cli/helpers/messageQueueBridge"; +import { + formatTaskNotification, + formatTaskNotifications, + type TaskNotification, +} from "../../cli/helpers/taskNotifications"; + +describe("taskNotifications", () => { + describe("formatTaskNotification", () => { + test("formats single notification correctly", () => { + const notification: TaskNotification = { + taskId: "task_1", + status: "completed", + summary: 'Agent "Find files" completed', + result: "Found 5 files in src/", + outputFile: "/tmp/task_1.log", + }; + + const formatted = formatTaskNotification(notification); + + expect(formatted).toContain(""); + expect(formatted).toContain("task_1"); + expect(formatted).toContain("completed"); + expect(formatted).toContain( + 'Agent "Find files" completed', + ); + expect(formatted).toContain("Found 5 files in src/"); + expect(formatted).toContain(""); + expect(formatted).toContain( + "Full transcript available at: /tmp/task_1.log", + ); + }); + + test("escapes XML special characters in summary", () => { + const notification: TaskNotification = { + taskId: "task_1", + status: "completed", + summary: 'Agent completed', + result: "Normal result", + outputFile: "/tmp/task_1.log", + }; + + const formatted = formatTaskNotification(notification); + + // Quotes don't need escaping in XML text content, only in attributes + expect(formatted).toContain('<script>alert("xss")</script>'); + expect(formatted).not.toContain("