From 462d14edce38daf386d255e41c761cea71b0b04e Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Thu, 26 Feb 2026 16:57:42 -0800 Subject: [PATCH] =?UTF-8?q?feat(queue):=20QueueRuntime=20TUI=20cutover=20?= =?UTF-8?q?=E2=80=94=20remove=20messageQueue=20array=20as=20source=20of=20?= =?UTF-8?q?truth=20(#1168)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Letta --- src/cli/App.tsx | 181 +++++++------- src/cli/helpers/queuedMessageParts.ts | 72 ++++++ src/tests/cli/queue-ordering-wiring.test.ts | 6 +- .../tui/tui-queue-coalescing-parity.test.ts | 234 ++++++++++++++++++ 4 files changed, 394 insertions(+), 99 deletions(-) create mode 100644 src/tests/tui/tui-queue-coalescing-parity.test.ts diff --git a/src/cli/App.tsx b/src/cli/App.tsx index 6602693..21509a5 100644 --- a/src/cli/App.tsx +++ b/src/cli/App.tsx @@ -79,7 +79,11 @@ import { import type { ApprovalContext } from "../permissions/analyzer"; import { type PermissionMode, permissionMode } from "../permissions/mode"; import { OPENAI_CODEX_PROVIDER_NAME } from "../providers/openai-codex-provider"; -import { QueueRuntime } from "../queue/queueRuntime"; +import { + type MessageQueueItem, + QueueRuntime, + type TaskNotificationQueueItem, +} from "../queue/queueRuntime"; import { DEFAULT_COMPLETION_PROMISE, type RalphState, @@ -227,9 +231,11 @@ import { } from "./helpers/pasteRegistry"; import { generatePlanFilePath } from "./helpers/planName"; import { + buildContentFromQueueBatch, buildQueuedContentParts, buildQueuedUserText, getQueuedNotificationSummaries, + toQueuedMsg, } from "./helpers/queuedMessageParts"; import { resolveReasoningTabToggleCommand } from "./helpers/reasoningTabToggle"; import { safeJsonParseOr } from "./helpers/safeJsonParse"; @@ -1666,56 +1672,46 @@ export default function App({ const conversationBusyRetriesRef = useRef(0); // Message queue state for queueing messages during streaming - const [messageQueue, setMessageQueue] = useState([]); + const [queueDisplay, setQueueDisplay] = useState([]); - const messageQueueRef = useRef([]); // For synchronous access - useEffect(() => { - messageQueueRef.current = messageQueue; - }, [messageQueue]); - - // PRQ4: divergence check — runs after every messageQueue commit, by which time - // tuiQueueRef has already been updated (enqueue/consumeItems called synchronously - // before setMessageQueue). Warn-only, never throws. - useEffect(() => { - if ((tuiQueueRef.current?.length ?? 0) !== messageQueue.length) { - debugWarn( - "queue-lifecycle", - `drift: QueueRuntime.length=${tuiQueueRef.current?.length ?? 0} messageQueue.length=${messageQueue.length}`, - ); - } - }, [messageQueue]); - - // PRQ4: QueueRuntime mirror — parallel lifecycle tracking alongside existing queue. - // Callbacks emit to the debug log only (gated on LETTA_DEBUG=1). - // Does NOT drive submit decisions — existing messageQueue state remains authoritative. - // Lazy init: useRef(new QueueRuntime(...)) would allocate on every render - // (React ignores all but the first, but construction still runs). The ref is - // typed QueueRuntime | null; call sites use ?. so the type is enforced and a - // missed init would no-op rather than hide behind an unsafe cast. + // QueueRuntime — authoritative queue. maxItems: Infinity disables drop limits + // to match the previous unbounded array semantics. queueDisplay is a derived + // UI state maintained by the onEnqueued/onDequeued/onCleared callbacks. + // Lazy init pattern; typed QueueRuntime | null with ?. at all call sites. const tuiQueueRef = useRef(null); if (!tuiQueueRef.current) { tuiQueueRef.current = new QueueRuntime({ + maxItems: Infinity, callbacks: { - onEnqueued: (item, queueLen) => + onEnqueued: (item, queueLen) => { debugLog( "queue-lifecycle", `enqueued item_id=${item.id} kind=${item.kind} queue_len=${queueLen}`, - ), - onDequeued: (batch) => + ); + // queueDisplay is the single source for UI — updated only here. + if (item.kind === "message" || item.kind === "task_notification") { + setQueueDisplay((prev) => [...prev, toQueuedMsg(item)]); + } + }, + onDequeued: (batch) => { debugLog( "queue-lifecycle", `dequeued batch_id=${batch.batchId} merged_count=${batch.mergedCount} queue_len_after=${batch.queueLenAfter}`, - ), + ); + setQueueDisplay((prev) => prev.slice(batch.mergedCount)); + }, onBlocked: (reason, queueLen) => debugLog( "queue-lifecycle", `blocked reason=${reason} queue_len=${queueLen}`, ), - onCleared: (reason, clearedCount) => + onCleared: (_reason, _clearedCount) => { debugLog( "queue-lifecycle", - `cleared reason=${reason} cleared_count=${clearedCount}`, - ), + `cleared reason=${_reason} cleared_count=${_clearedCount}`, + ); + setQueueDisplay([]); + }, }, }); } @@ -1724,12 +1720,10 @@ export default function App({ const overrideContentPartsRef = useRef(null); // Set up message queue bridge for background tasks - // This allows non-React code (Task.ts) to add notifications to messageQueue + // This allows non-React code (Task.ts) to add notifications to queueDisplay useEffect(() => { - // Provide a queue adder that adds to messageQueue and bumps dequeueEpoch + // Enqueue via QueueRuntime — onEnqueued callback updates queueDisplay. setMessageQueueAdder((message: QueuedMessage) => { - setMessageQueue((q) => [...q, message]); - // PRQ4: mirror enqueue into QueueRuntime for lifecycle tracking. tuiQueueRef.current?.enqueue( message.kind === "task_notification" ? ({ @@ -1814,15 +1808,20 @@ export default function App({ [], ); - // Consume queued messages for appending to tool results (clears queue) + // Consume queued messages for appending to tool results (clears queue). + // consumeItems fires onDequeued → setQueueDisplay(prev => prev.slice(n)) + // so no direct setQueueDisplay call is needed here. const consumeQueuedMessages = useCallback((): QueuedMessage[] | null => { - if (messageQueueRef.current.length === 0) return null; - const messages = [...messageQueueRef.current]; - // PRQ4: items are being submitted into the current turn, so fire onDequeued - // (not onCleared) to reflect actual consumption, not an error/cancel drop. - tuiQueueRef.current?.consumeItems(messages.length); - setMessageQueue([]); - return messages; + const len = tuiQueueRef.current?.length ?? 0; + if (len === 0) return null; + const batch = tuiQueueRef.current?.consumeItems(len); + if (!batch) return null; + return batch.items + .filter( + (item): item is MessageQueueItem | TaskNotificationQueueItem => + item.kind === "message" || item.kind === "task_notification", + ) + .map(toQueuedMsg); }, []); // Helper to wrap async handlers that need to close overlay and lock input @@ -5115,8 +5114,7 @@ export default function App({ lastDequeuedMessageRef.current = null; } // Clear any remaining queue on error - tuiQueueRef.current?.clear("error"); // PRQ4 - setMessageQueue([]); + tuiQueueRef.current?.clear("error"); setStreaming(false); sendDesktopNotification("Stream error", "error"); // Notify user of error @@ -5220,8 +5218,7 @@ export default function App({ lastDequeuedMessageRef.current = null; } // Clear any remaining queue on error - tuiQueueRef.current?.clear("error"); // PRQ4 - setMessageQueue([]); + tuiQueueRef.current?.clear("error"); setStreaming(false); sendDesktopNotification(); @@ -5252,8 +5249,7 @@ export default function App({ lastDequeuedMessageRef.current = null; } // Clear any remaining queue on error - tuiQueueRef.current?.clear("error"); // PRQ4 - setMessageQueue([]); + tuiQueueRef.current?.clear("error"); setStreaming(false); sendDesktopNotification("Execution error", "error"); // Notify user of error @@ -5293,8 +5289,7 @@ export default function App({ lastDequeuedMessageRef.current = null; } // Clear any remaining queue on error - tuiQueueRef.current?.clear("error"); // PRQ4 - setMessageQueue([]); + tuiQueueRef.current?.clear("error"); setStreaming(false); sendDesktopNotification("Processing error", "error"); // Notify user of error @@ -5311,7 +5306,7 @@ export default function App({ // won't re-run on its own — bump dequeueEpoch to force re-evaluation. // Only bump for normal completions — if stale (ESC was pressed), the user // cancelled and queued messages should NOT be auto-submitted. - if (!isStale && messageQueueRef.current.length > 0) { + if (!isStale && (tuiQueueRef.current?.length ?? 0) > 0) { setDequeueEpoch((e) => e + 1); } @@ -5369,9 +5364,7 @@ export default function App({ // Handler when user presses UP/ESC to load queue into input for editing const handleEnterQueueEditMode = useCallback(() => { - // PRQ4: items are discarded (user is editing them), not submitted. tuiQueueRef.current?.clear("stale_generation"); - setMessageQueue([]); }, []); // Handle paste errors (e.g., image too large) @@ -6434,10 +6427,10 @@ export default function App({ // If there are queued messages and agent is not busy, bump epoch to trigger // dequeue effect. Without this, the effect won't re-run because refs aren't // in its deps array (only state values are). - if (!isAgentBusy() && messageQueue.length > 0) { + if (!isAgentBusy() && (tuiQueueRef.current?.length ?? 0) > 0) { debugLog( "queue", - `Bumping dequeueEpoch: userCancelledRef was reset, ${messageQueue.length} message(s) queued, agent not busy`, + `Bumping dequeueEpoch: userCancelledRef was reset, ${tuiQueueRef.current?.length ?? 0} message(s) queued, agent not busy`, ); setDequeueEpoch((e) => e + 1); } @@ -6459,22 +6452,13 @@ export default function App({ isNonStateCommand(userTextForInput); if (isAgentBusy() && !shouldBypassQueue) { - setMessageQueue((prev) => { - const newQueue: QueuedMessage[] = [ - ...prev, - { kind: "user", text: msg }, - ]; - - // Regular messages: queue and wait for tool completion - - return newQueue; - }); - // PRQ4: mirror enqueue into QueueRuntime for lifecycle tracking. + // Enqueue via QueueRuntime — onEnqueued callback updates queueDisplay. tuiQueueRef.current?.enqueue({ kind: "message", source: "user", content: msg, } as Parameters[0]); + setDequeueEpoch((e) => e + 1); return { submitted: true }; // Clears input } @@ -9091,7 +9075,7 @@ ${SYSTEM_REMINDER_CLOSE} // Combine reminders with content as separate text parts. // This preserves each reminder boundary in the API payload. - // Note: Task notifications now come through messageQueue directly (added by messageQueueBridge) + // Note: Task notifications now come through queueDisplay directly (added by messageQueueBridge) const reminderParts: Array<{ type: "text"; text: string }> = []; const pushReminder = (text: string) => { if (!text) return; @@ -9831,15 +9815,16 @@ ${SYSTEM_REMINDER_CLOSE} onSubmitRef.current = onSubmit; }, [onSubmit]); - // Process queued messages when streaming ends - // Task notifications are now added directly to messageQueue via messageQueueBridge + // Process queued messages when streaming ends. + // QueueRuntime is authoritative: consumeItems drives the dequeue and fires + // onDequeued → setQueueDisplay(prev => prev.slice(n)) to update the UI. + // dequeueEpoch is the sole re-trigger: bumped on every enqueue, turn + // completion (abortControllerRef clears), and cancel-reset. useEffect(() => { - // Reference dequeueEpoch to satisfy exhaustive-deps - it's used to force - // re-runs when userCancelledRef is reset (refs aren't in deps) - // Also triggers when task notifications are added to queue - void dequeueEpoch; + void dequeueEpoch; // explicit dep to satisfy exhaustive-deps lint - const hasAnythingQueued = messageQueue.length > 0; + const queueLen = tuiQueueRef.current?.length ?? 0; + const hasAnythingQueued = queueLen > 0; if ( !streaming && @@ -9853,28 +9838,33 @@ ${SYSTEM_REMINDER_CLOSE} !userCancelledRef.current && // Don't dequeue if user just cancelled !abortControllerRef.current // Don't dequeue while processConversation is still active ) { - // Concatenate all queued messages into one (better UX when user types multiple - // messages quickly - they get combined into one context for the agent) - // Task notifications are already in the queue as XML strings - const concatenatedMessage = messageQueue - .map((item) => item.text) + // consumeItems(n) fires onDequeued → setQueueDisplay(prev => prev.slice(n)). + const batch = tuiQueueRef.current?.consumeItems(queueLen); + if (!batch) return; + + // Build concatenated text for lastDequeuedMessageRef (error restoration). + const concatenatedMessage = batch.items + .map((item) => { + if (item.kind === "task_notification") return item.text; + if (item.kind === "message") { + return typeof item.content === "string" ? item.content : ""; + } + return ""; + }) + .filter((t) => t.length > 0) .join("\n"); - const queuedContentParts = buildQueuedContentParts(messageQueue); + + const queuedContentParts = buildContentFromQueueBatch(batch); debugLog( "queue", - `Dequeuing ${messageQueue.length} message(s): "${concatenatedMessage.slice(0, 50)}${concatenatedMessage.length > 50 ? "..." : ""}"`, + `Dequeuing ${batch.mergedCount} message(s): "${concatenatedMessage.slice(0, 50)}${concatenatedMessage.length > 50 ? "..." : ""}"`, ); - // Store the message before clearing queue - allows restoration on error + // Store before submit — allows restoration on error (ESC path). lastDequeuedMessageRef.current = concatenatedMessage; - // PRQ4: fire onDequeued before clearing state so QueueRuntime and - // messageQueue drop to 0 together (divergence check runs after commit). - tuiQueueRef.current?.consumeItems(messageQueue.length); - setMessageQueue([]); - // Submit the concatenated message using the normal submit flow - // This ensures all setup (reminders, UI updates, etc.) happens correctly + // Submit via normal flow — overrideContentPartsRef carries rich content parts. overrideContentPartsRef.current = queuedContentParts; onSubmitRef.current(concatenatedMessage); } else if (hasAnythingQueued) { @@ -9883,9 +9873,7 @@ ${SYSTEM_REMINDER_CLOSE} "queue", `Dequeue blocked: streaming=${streaming}, queuedOverlayAction=${!!queuedOverlayAction}, pendingApprovals=${pendingApprovals.length}, commandRunning=${commandRunning}, isExecutingTool=${isExecutingTool}, anySelectorOpen=${anySelectorOpen}, waitingForQueueCancel=${waitingForQueueCancelRef.current}, userCancelled=${userCancelledRef.current}, abortController=${!!abortControllerRef.current}`, ); - // PRQ4: emit queue_blocked on first blocked-reason transition per reason. - // tryDequeue deduplicates via lastEmittedBlockedReason — fires onBlocked - // only when the reason changes, not on every effect re-run. + // Emit queue_blocked on blocked-reason transitions only (dedup via tryDequeue). const blockedReason = getTuiBlockedReason({ streaming, isExecutingTool, @@ -9903,13 +9891,12 @@ ${SYSTEM_REMINDER_CLOSE} } }, [ streaming, - messageQueue, pendingApprovals, commandRunning, isExecutingTool, anySelectorOpen, queuedOverlayAction, - dequeueEpoch, // Triggered when userCancelledRef is reset OR task notifications added + dequeueEpoch, // Triggered on every enqueue, turn completion, and cancel-reset ]); // Helper to send all approval results when done @@ -12474,7 +12461,7 @@ Plan file path: ${planFilePath}`; currentModel={currentModelDisplay} currentModelProvider={currentModelProvider} currentReasoningEffort={currentReasoningEffort} - messageQueue={messageQueue} + messageQueue={queueDisplay} onEnterQueueEditMode={handleEnterQueueEditMode} onEscapeCancel={ profileConfirmPending ? handleProfileEscapeCancel : undefined diff --git a/src/cli/helpers/queuedMessageParts.ts b/src/cli/helpers/queuedMessageParts.ts index 2ddf3f2..b989ff5 100644 --- a/src/cli/helpers/queuedMessageParts.ts +++ b/src/cli/helpers/queuedMessageParts.ts @@ -1,4 +1,9 @@ import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents"; +import type { + DequeuedBatch, + MessageQueueItem, + TaskNotificationQueueItem, +} from "../../queue/queueRuntime"; import { mergeQueuedTurnInput } from "../../queue/turnQueueRuntime"; import type { QueuedMessage } from "./messageQueueBridge"; import { buildMessageContentFromDisplay } from "./pasteRegistry"; @@ -42,3 +47,70 @@ export function buildQueuedUserText(queued: QueuedMessage[]): string { .filter((text) => text.length > 0) .join("\n"); } + +/** + * Convert a QueueItem (message or task_notification) to the QueuedMessage + * shape used by the TUI display state and callers of consumeQueuedMessages. + * + * In the TUI, MessageQueueItem.content is always a plain string (the display + * text from the input field). The fallback array-flatten path handles any + * future case where content arrives as content parts. + */ +export function toQueuedMsg( + item: MessageQueueItem | TaskNotificationQueueItem, +): QueuedMessage { + if (item.kind === "task_notification") { + return { kind: "task_notification", text: item.text }; + } + const text = + typeof item.content === "string" + ? item.content + : item.content + .filter((p): p is { type: "text"; text: string } => p.type === "text") + .map((p) => p.text) + .join(""); + return { kind: "user", text }; +} + +/** + * Build merged MessageCreate content from a DequeuedBatch. + * + * Produces identical output to buildQueuedContentParts() for equivalent + * inputs — this is enforced by the golden parity test. The difference is + * that the input is QueueItem[] (from QueueRuntime) instead of QueuedMessage[]. + * + * Only message and task_notification items contribute to the content batch; + * barrier items (approval_result, overlay_action) are skipped. + */ +export function buildContentFromQueueBatch( + batch: DequeuedBatch, +): MessageCreate["content"] { + const queueInput = batch.items + .filter( + (item): item is MessageQueueItem | TaskNotificationQueueItem => + item.kind === "message" || item.kind === "task_notification", + ) + .map((item) => + item.kind === "task_notification" + ? ({ kind: "task_notification", text: item.text } as const) + : ({ + kind: "user", + content: item.content, + } as const), + ); + + const merged = mergeQueuedTurnInput(queueInput, { + // For string content (common TUI case), apply paste-registry resolution + // exactly as buildQueuedContentParts does. For already-normalized content + // parts, pass through unchanged. + normalizeUserContent: (content) => + typeof content === "string" + ? buildMessageContentFromDisplay(content) + : content, + }); + + if (merged === null) { + return []; + } + return merged; +} diff --git a/src/tests/cli/queue-ordering-wiring.test.ts b/src/tests/cli/queue-ordering-wiring.test.ts index 6c653d3..d71765f 100644 --- a/src/tests/cli/queue-ordering-wiring.test.ts +++ b/src/tests/cli/queue-ordering-wiring.test.ts @@ -11,7 +11,7 @@ describe("queue ordering wiring", () => { test("dequeue effect keeps all sensitive safety gates", () => { const source = readAppSource(); const start = source.indexOf( - "// Process queued messages when streaming ends", + "// Process queued messages when streaming ends.", ); const end = source.indexOf( "// Helper to send all approval results when done", @@ -30,7 +30,9 @@ describe("queue ordering wiring", () => { expect(segment).toContain("!userCancelledRef.current"); expect(segment).toContain("!abortControllerRef.current"); expect(segment).toContain("queuedOverlayAction="); - expect(segment).toContain("setMessageQueue([]);"); + // Queue is now drained via QueueRuntime.consumeItems; setQueueDisplay is + // updated automatically via the onDequeued callback — no direct setState here. + expect(segment).toContain("tuiQueueRef.current?.consumeItems(queueLen)"); expect(segment).toContain("onSubmitRef.current(concatenatedMessage);"); expect(segment).toContain("queuedOverlayAction,"); }); diff --git a/src/tests/tui/tui-queue-coalescing-parity.test.ts b/src/tests/tui/tui-queue-coalescing-parity.test.ts new file mode 100644 index 0000000..817b5e8 --- /dev/null +++ b/src/tests/tui/tui-queue-coalescing-parity.test.ts @@ -0,0 +1,234 @@ +/** + * Golden parity test: buildContentFromQueueBatch (new QueueRuntime path) must + * produce identical output to buildQueuedContentParts (current messageQueue path) + * for the same logical input. + * + * This test must pass before any state is swapped in the PRQ4 cutover. + * If these two functions ever diverge, the cutover has introduced a regression. + */ + +import { describe, expect, test } from "bun:test"; +import type { QueuedMessage } from "../../cli/helpers/messageQueueBridge"; +import { + buildContentFromQueueBatch, + buildQueuedContentParts, +} from "../../cli/helpers/queuedMessageParts"; +import { QueueRuntime } from "../../queue/queueRuntime"; + +// ── Helpers ─────────────────────────────────────────────────────── + +/** Build a DequeuedBatch from a list of (kind, text) pairs via QueueRuntime. */ +function makeBatch( + items: Array<{ kind: "user" | "task_notification"; text: string }>, +) { + const q = new QueueRuntime({ maxItems: Infinity }); + for (const item of items) { + if (item.kind === "task_notification") { + q.enqueue({ + kind: "task_notification", + source: "task_notification", + text: item.text, + } as Parameters[0]); + } else { + q.enqueue({ + kind: "message", + source: "user", + content: item.text, + } as Parameters[0]); + } + } + const batch = q.consumeItems(items.length); + if (!batch) throw new Error("consumeItems returned null for non-empty queue"); + return batch; +} + +/** Build the QueuedMessage[] equivalent for the old path. */ +function makeQueued( + items: Array<{ kind: "user" | "task_notification"; text: string }>, +): QueuedMessage[] { + return items.map((item) => ({ + kind: item.kind, + text: item.text, + })); +} + +// ── Fixtures ────────────────────────────────────────────────────── + +const SINGLE_USER = [{ kind: "user" as const, text: "hello world" }]; + +const SINGLE_NOTIF = [ + { + kind: "task_notification" as const, + text: "done", + }, +]; + +const USER_THEN_NOTIF = [ + { kind: "user" as const, text: "first message" }, + { + kind: "task_notification" as const, + text: "bg task done", + }, +]; + +const NOTIF_THEN_USER = [ + { + kind: "task_notification" as const, + text: "prelude", + }, + { kind: "user" as const, text: "follow-up" }, +]; + +const THREE_ITEMS = [ + { kind: "user" as const, text: "msg one" }, + { + kind: "task_notification" as const, + text: "mid notif", + }, + { kind: "user" as const, text: "msg three" }, +]; + +const MULTILINE_USER = [ + { kind: "user" as const, text: "line one\nline two\nline three" }, +]; + +// Intentionally unused — documents the empty-batch case tested inline below +const _EMPTY: Array<{ kind: "user" | "task_notification"; text: string }> = []; + +// ── Tests ───────────────────────────────────────────────────────── + +describe("buildContentFromQueueBatch parity with buildQueuedContentParts", () => { + test("single user message", () => { + const batch = makeBatch(SINGLE_USER); + const queued = makeQueued(SINGLE_USER); + expect(buildContentFromQueueBatch(batch)).toEqual( + buildQueuedContentParts(queued), + ); + }); + + test("single task_notification", () => { + const batch = makeBatch(SINGLE_NOTIF); + const queued = makeQueued(SINGLE_NOTIF); + expect(buildContentFromQueueBatch(batch)).toEqual( + buildQueuedContentParts(queued), + ); + }); + + test("user then task_notification (coalesced batch)", () => { + const batch = makeBatch(USER_THEN_NOTIF); + const queued = makeQueued(USER_THEN_NOTIF); + expect(buildContentFromQueueBatch(batch)).toEqual( + buildQueuedContentParts(queued), + ); + }); + + test("task_notification then user (reverse order)", () => { + const batch = makeBatch(NOTIF_THEN_USER); + const queued = makeQueued(NOTIF_THEN_USER); + expect(buildContentFromQueueBatch(batch)).toEqual( + buildQueuedContentParts(queued), + ); + }); + + test("three items: user + notif + user", () => { + const batch = makeBatch(THREE_ITEMS); + const queued = makeQueued(THREE_ITEMS); + expect(buildContentFromQueueBatch(batch)).toEqual( + buildQueuedContentParts(queued), + ); + }); + + test("multiline user message", () => { + const batch = makeBatch(MULTILINE_USER); + const queued = makeQueued(MULTILINE_USER); + expect(buildContentFromQueueBatch(batch)).toEqual( + buildQueuedContentParts(queued), + ); + }); + + test("empty batch returns []", () => { + // Empty queue: consumeItems returns null, so test the null→[] path directly + const q = new QueueRuntime({ maxItems: Infinity }); + q.enqueue({ kind: "message", source: "user", content: "x" } as Parameters< + typeof q.enqueue + >[0]); + const batch = q.consumeItems(1); + if (!batch) throw new Error("expected non-null batch"); + // Override items to empty to test the null-merged → [] return + const emptyBatch = { ...batch, items: [] }; + expect(buildContentFromQueueBatch(emptyBatch)).toEqual( + buildQueuedContentParts([]), + ); + }); + + test("output is non-empty array for non-empty input", () => { + const batch = makeBatch(SINGLE_USER); + const result = buildContentFromQueueBatch(batch); + expect(Array.isArray(result)).toBe(true); + expect((result as unknown[]).length).toBeGreaterThan(0); + }); + + test("separator \\n between items matches old path", () => { + const batch = makeBatch(USER_THEN_NOTIF); + const queued = makeQueued(USER_THEN_NOTIF); + const newResult = buildContentFromQueueBatch(batch); + const oldResult = buildQueuedContentParts(queued); + // Both should have a text separator part between the two items + expect(newResult).toEqual(oldResult); + // Verify separator is present (text part with \n between items) + const parts = newResult as Array<{ type: string; text?: string }>; + const sepIdx = parts.findIndex((p) => p.type === "text" && p.text === "\n"); + expect(sepIdx).toBeGreaterThan(0); + }); +}); + +describe("toQueuedMsg", () => { + // Imported lazily here to keep test readable + test("user message with string content round-trips to QueuedMessage", async () => { + const { toQueuedMsg } = await import( + "../../cli/helpers/queuedMessageParts" + ); + const item = { + id: "item-1", + kind: "message" as const, + source: "user" as const, + content: "hello", + enqueuedAt: 0, + }; + expect(toQueuedMsg(item)).toEqual({ kind: "user", text: "hello" }); + }); + + test("task_notification round-trips to QueuedMessage", async () => { + const { toQueuedMsg } = await import( + "../../cli/helpers/queuedMessageParts" + ); + const item = { + id: "item-2", + kind: "task_notification" as const, + source: "task_notification" as const, + text: "done", + enqueuedAt: 0, + }; + expect(toQueuedMsg(item)).toEqual({ + kind: "task_notification", + text: "done", + }); + }); + + test("user message with content parts extracts text parts", async () => { + const { toQueuedMsg } = await import( + "../../cli/helpers/queuedMessageParts" + ); + const item = { + id: "item-3", + kind: "message" as const, + source: "user" as const, + content: [ + { type: "text" as const, text: "hello " }, + { type: "text" as const, text: "world" }, + ], + enqueuedAt: 0, + }; + expect(toQueuedMsg(item)).toEqual({ kind: "user", text: "hello world" }); + }); +});