diff --git a/src/cli/App.tsx b/src/cli/App.tsx index 9789ec2..a9f3f7b 100644 --- a/src/cli/App.tsx +++ b/src/cli/App.tsx @@ -79,6 +79,7 @@ import { import type { ApprovalContext } from "../permissions/analyzer"; import { type PermissionMode, permissionMode } from "../permissions/mode"; import { OPENAI_CODEX_PROVIDER_NAME } from "../providers/openai-codex-provider"; +import { QueueRuntime } from "../queue/queueRuntime"; import { DEFAULT_COMPLETION_PROMISE, type RalphState, @@ -273,6 +274,7 @@ import { alwaysRequiresUserInput, isTaskTool, } from "./helpers/toolNameMapping.js"; +import { getTuiBlockedReason } from "./helpers/tuiQueueAdapter"; import { useConfigurableStatusLine } from "./hooks/useConfigurableStatusLine"; import { useSuspend } from "./hooks/useSuspend/useSuspend.ts"; import { useSyncedState } from "./hooks/useSyncedState"; @@ -1662,6 +1664,53 @@ export default function App({ messageQueueRef.current = messageQueue; }, [messageQueue]); + // PRQ4: divergence check — runs after every messageQueue commit, by which time + // tuiQueueRef has already been updated (enqueue/consumeItems called synchronously + // before setMessageQueue). Warn-only, never throws. + useEffect(() => { + if ((tuiQueueRef.current?.length ?? 0) !== messageQueue.length) { + debugWarn( + "queue-lifecycle", + `drift: QueueRuntime.length=${tuiQueueRef.current?.length ?? 0} messageQueue.length=${messageQueue.length}`, + ); + } + }, [messageQueue]); + + // PRQ4: QueueRuntime mirror — parallel lifecycle tracking alongside existing queue. + // Callbacks emit to the debug log only (gated on LETTA_DEBUG=1). + // Does NOT drive submit decisions — existing messageQueue state remains authoritative. + // Lazy init: useRef(new QueueRuntime(...)) would allocate on every render + // (React ignores all but the first, but construction still runs). The ref is + // typed QueueRuntime | null; call sites use ?. so the type is enforced and a + // missed init would no-op rather than hide behind an unsafe cast. + const tuiQueueRef = useRef(null); + if (!tuiQueueRef.current) { + tuiQueueRef.current = new QueueRuntime({ + callbacks: { + onEnqueued: (item, queueLen) => + debugLog( + "queue-lifecycle", + `enqueued item_id=${item.id} kind=${item.kind} queue_len=${queueLen}`, + ), + onDequeued: (batch) => + debugLog( + "queue-lifecycle", + `dequeued batch_id=${batch.batchId} merged_count=${batch.mergedCount} queue_len_after=${batch.queueLenAfter}`, + ), + onBlocked: (reason, queueLen) => + debugLog( + "queue-lifecycle", + `blocked reason=${reason} queue_len=${queueLen}`, + ), + onCleared: (reason, clearedCount) => + debugLog( + "queue-lifecycle", + `cleared reason=${reason} cleared_count=${clearedCount}`, + ), + }, + }); + } + // Override content parts for queued submissions (to preserve part boundaries) const overrideContentPartsRef = useRef(null); @@ -1671,6 +1720,20 @@ export default function App({ // Provide a queue adder that adds to messageQueue and bumps dequeueEpoch setMessageQueueAdder((message: QueuedMessage) => { setMessageQueue((q) => [...q, message]); + // PRQ4: mirror enqueue into QueueRuntime for lifecycle tracking. + tuiQueueRef.current?.enqueue( + message.kind === "task_notification" + ? ({ + kind: "task_notification", + source: "task_notification", + text: message.text, + } as Parameters[0]) + : ({ + kind: "message", + source: "user", + content: message.text, + } as Parameters[0]), + ); setDequeueEpoch((e) => e + 1); }); return () => setMessageQueueAdder(null); @@ -1746,6 +1809,9 @@ export default function App({ const consumeQueuedMessages = useCallback((): QueuedMessage[] | null => { if (messageQueueRef.current.length === 0) return null; const messages = [...messageQueueRef.current]; + // PRQ4: items are being submitted into the current turn, so fire onDequeued + // (not onCleared) to reflect actual consumption, not an error/cancel drop. + tuiQueueRef.current?.consumeItems(messages.length); setMessageQueue([]); return messages; }, []); @@ -5040,6 +5106,7 @@ export default function App({ lastDequeuedMessageRef.current = null; } // Clear any remaining queue on error + tuiQueueRef.current?.clear("error"); // PRQ4 setMessageQueue([]); setStreaming(false); @@ -5144,6 +5211,7 @@ export default function App({ lastDequeuedMessageRef.current = null; } // Clear any remaining queue on error + tuiQueueRef.current?.clear("error"); // PRQ4 setMessageQueue([]); setStreaming(false); @@ -5175,6 +5243,7 @@ export default function App({ lastDequeuedMessageRef.current = null; } // Clear any remaining queue on error + tuiQueueRef.current?.clear("error"); // PRQ4 setMessageQueue([]); setStreaming(false); @@ -5215,6 +5284,7 @@ export default function App({ lastDequeuedMessageRef.current = null; } // Clear any remaining queue on error + tuiQueueRef.current?.clear("error"); // PRQ4 setMessageQueue([]); setStreaming(false); @@ -5290,6 +5360,8 @@ export default function App({ // Handler when user presses UP/ESC to load queue into input for editing const handleEnterQueueEditMode = useCallback(() => { + // PRQ4: items are discarded (user is editing them), not submitted. + tuiQueueRef.current?.clear("stale_generation"); setMessageQueue([]); }, []); @@ -6388,6 +6460,12 @@ export default function App({ return newQueue; }); + // PRQ4: mirror enqueue into QueueRuntime for lifecycle tracking. + tuiQueueRef.current?.enqueue({ + kind: "message", + source: "user", + content: msg, + } as Parameters[0]); return { submitted: true }; // Clears input } @@ -9735,6 +9813,9 @@ ${SYSTEM_REMINDER_CLOSE} // Store the message before clearing queue - allows restoration on error lastDequeuedMessageRef.current = concatenatedMessage; + // PRQ4: fire onDequeued before clearing state so QueueRuntime and + // messageQueue drop to 0 together (divergence check runs after commit). + tuiQueueRef.current?.consumeItems(messageQueue.length); setMessageQueue([]); // Submit the concatenated message using the normal submit flow @@ -9747,6 +9828,23 @@ ${SYSTEM_REMINDER_CLOSE} "queue", `Dequeue blocked: streaming=${streaming}, queuedOverlayAction=${!!queuedOverlayAction}, pendingApprovals=${pendingApprovals.length}, commandRunning=${commandRunning}, isExecutingTool=${isExecutingTool}, anySelectorOpen=${anySelectorOpen}, waitingForQueueCancel=${waitingForQueueCancelRef.current}, userCancelled=${userCancelledRef.current}, abortController=${!!abortControllerRef.current}`, ); + // PRQ4: emit queue_blocked on first blocked-reason transition per reason. + // tryDequeue deduplicates via lastEmittedBlockedReason — fires onBlocked + // only when the reason changes, not on every effect re-run. + const blockedReason = getTuiBlockedReason({ + streaming, + isExecutingTool, + commandRunning, + pendingApprovalsLen: pendingApprovals.length, + queuedOverlayAction: !!queuedOverlayAction, + anySelectorOpen, + waitingForQueueCancel: waitingForQueueCancelRef.current, + userCancelled: userCancelledRef.current, + abortControllerActive: !!abortControllerRef.current, + }); + if (blockedReason) { + tuiQueueRef.current?.tryDequeue(blockedReason); + } } }, [ streaming, diff --git a/src/cli/helpers/tuiQueueAdapter.ts b/src/cli/helpers/tuiQueueAdapter.ts new file mode 100644 index 0000000..4fe1fae --- /dev/null +++ b/src/cli/helpers/tuiQueueAdapter.ts @@ -0,0 +1,38 @@ +/** + * Helpers for the PRQ4 TUI QueueRuntime mirror. + * + * These are extracted as pure functions so they are independently unit-testable + * without importing React or App.tsx. + */ + +import type { QueueBlockedReason } from "../../types/protocol"; + +export type TuiQueueGatingConditions = { + streaming: boolean; + isExecutingTool: boolean; + commandRunning: boolean; + pendingApprovalsLen: number; + queuedOverlayAction: boolean; + anySelectorOpen: boolean; + waitingForQueueCancel: boolean; + userCancelled: boolean; + abortControllerActive: boolean; +}; + +/** + * Map the TUI dequeue gating conditions to a QueueBlockedReason. + * Priority order matches the plan — first match wins. + * Returns null when all conditions are clear (dequeue should proceed). + */ +export function getTuiBlockedReason( + c: TuiQueueGatingConditions, +): QueueBlockedReason | null { + if (c.waitingForQueueCancel || c.userCancelled) + return "interrupt_in_progress"; + if (c.pendingApprovalsLen > 0) return "pending_approvals"; + if (c.queuedOverlayAction || c.anySelectorOpen) return "overlay_open"; + if (c.commandRunning) return "command_running"; + if (c.streaming || c.isExecutingTool || c.abortControllerActive) + return "streaming"; + return null; +} diff --git a/src/tests/tui/tui-queue-lifecycle.test.ts b/src/tests/tui/tui-queue-lifecycle.test.ts new file mode 100644 index 0000000..3b1656c --- /dev/null +++ b/src/tests/tui/tui-queue-lifecycle.test.ts @@ -0,0 +1,239 @@ +/** + * Integration tests for PRQ4: TUI QueueRuntime mirror lifecycle events. + * + * Drives QueueRuntime directly using the same patterns as the App.tsx dual-path, + * without requiring React or a TUI instance. Each test mirrors one App.tsx code path. + * + * Invariants verified: + * - Idle submit: enqueue → consumeItems → onEnqueued + onDequeued, no onBlocked + * - Busy submit: enqueue while blocked → onBlocked once; unblock → consumeItems → onDequeued + * - Coalesced batch: N enqueues → consumeItems(N) → mergedCount=N + * - No double-blocked: tryDequeue same reason N times → onBlocked fires once + * - Priority: interrupt_in_progress emitted when streaming also active + * - Approval-append (consumeQueuedMessages pattern): consumeItems fires onDequeued, not onCleared + * - Queue edit clear (handleEnterQueueEditMode): clear("stale_generation") → onCleared + * - Error clear: clear("error") → onCleared + * - Divergence: consumeItems(undercount) leaves length mismatch + * - Blocked then cleared: both events fire, queue empty + */ + +import { describe, expect, test } from "bun:test"; +import { getTuiBlockedReason } from "../../cli/helpers/tuiQueueAdapter"; +import type { + DequeuedBatch, + QueueBlockedReason, + QueueClearedReason, + QueueItem, +} from "../../queue/queueRuntime"; +import { QueueRuntime } from "../../queue/queueRuntime"; + +// ── Helpers ─────────────────────────────────────────────────────── + +type Recorded = { + enqueued: Array<{ item: QueueItem; queueLen: number }>; + dequeued: DequeuedBatch[]; + blocked: Array<{ reason: QueueBlockedReason; queueLen: number }>; + cleared: Array<{ reason: QueueClearedReason; count: number }>; +}; + +function buildRuntime(): { q: QueueRuntime; rec: Recorded } { + const rec: Recorded = { + enqueued: [], + dequeued: [], + blocked: [], + cleared: [], + }; + const q = new QueueRuntime({ + callbacks: { + onEnqueued: (item, queueLen) => rec.enqueued.push({ item, queueLen }), + onDequeued: (batch) => rec.dequeued.push(batch), + onBlocked: (reason, queueLen) => rec.blocked.push({ reason, queueLen }), + onCleared: (reason, count) => rec.cleared.push({ reason, count }), + }, + }); + return { q, rec }; +} + +function enqueueUserMsg(q: QueueRuntime, text = "hello"): void { + q.enqueue({ + kind: "message", + source: "user", + content: text, + } as Parameters[0]); +} + +function enqueueTaskNotif(q: QueueRuntime, text = ""): void { + q.enqueue({ + kind: "task_notification", + source: "task_notification", + text, + } as Parameters[0]); +} + +// ── Tests ───────────────────────────────────────────────────────── + +describe("idle submit — single message", () => { + test("enqueued then consumeItems(1) fires onEnqueued + onDequeued, no blocked", () => { + const { q, rec } = buildRuntime(); + enqueueUserMsg(q); + expect(rec.enqueued).toHaveLength(1); + expect(rec.blocked).toHaveLength(0); + + q.consumeItems(1); + expect(rec.dequeued).toHaveLength(1); + expect(rec.dequeued.at(0)?.mergedCount).toBe(1); + expect(rec.dequeued.at(0)?.queueLenAfter).toBe(0); + expect(q.length).toBe(0); + }); +}); + +describe("busy submit — blocked on streaming", () => { + test("blocked fires on first tryDequeue; consumeItems fires dequeued", () => { + const { q, rec } = buildRuntime(); + enqueueUserMsg(q); + + const reason = getTuiBlockedReason({ + streaming: true, + isExecutingTool: false, + commandRunning: false, + pendingApprovalsLen: 0, + queuedOverlayAction: false, + anySelectorOpen: false, + waitingForQueueCancel: false, + userCancelled: false, + abortControllerActive: false, + }); + expect(reason).not.toBeNull(); + q.tryDequeue(reason as NonNullable); + expect(rec.blocked).toHaveLength(1); + expect(rec.blocked.at(0)?.reason).toBe("streaming"); + expect(rec.blocked.at(0)?.queueLen).toBe(1); + + // Stream ends: consumeItems fires dequeued + q.consumeItems(1); + expect(rec.dequeued).toHaveLength(1); + expect(rec.dequeued.at(0)?.mergedCount).toBe(1); + }); +}); + +describe("coalesced batch", () => { + test("two enqueues then consumeItems(2) → mergedCount=2", () => { + const { q, rec } = buildRuntime(); + enqueueUserMsg(q, "first"); + enqueueTaskNotif(q, ""); + q.consumeItems(2); + expect(rec.dequeued).toHaveLength(1); + expect(rec.dequeued.at(0)?.mergedCount).toBe(2); + expect(rec.dequeued.at(0)?.items.at(0)?.kind).toBe("message"); + expect(rec.dequeued.at(0)?.items.at(1)?.kind).toBe("task_notification"); + expect(rec.dequeued.at(0)?.queueLenAfter).toBe(0); + }); +}); + +describe("no double-blocked — QueueRuntime dedup", () => { + test("tryDequeue same reason 3× → onBlocked fires once", () => { + const { q, rec } = buildRuntime(); + enqueueUserMsg(q); + q.tryDequeue("streaming"); + q.tryDequeue("streaming"); + q.tryDequeue("streaming"); + expect(rec.blocked).toHaveLength(1); + }); + + test("reason change re-fires onBlocked", () => { + const { q, rec } = buildRuntime(); + enqueueUserMsg(q); + q.tryDequeue("streaming"); + q.tryDequeue("pending_approvals"); // reason changed → fires again + expect(rec.blocked).toHaveLength(2); + expect(rec.blocked.at(1)?.reason).toBe("pending_approvals"); + }); +}); + +describe("priority: interrupt_in_progress beats streaming", () => { + test("getTuiBlockedReason returns interrupt_in_progress when streaming also true", () => { + const reason = getTuiBlockedReason({ + streaming: true, + isExecutingTool: false, + commandRunning: false, + pendingApprovalsLen: 0, + queuedOverlayAction: false, + anySelectorOpen: false, + waitingForQueueCancel: false, + userCancelled: true, // interrupt_in_progress + abortControllerActive: false, + }); + const { q, rec } = buildRuntime(); + enqueueUserMsg(q); + expect(reason).not.toBeNull(); + q.tryDequeue(reason as NonNullable); + expect(rec.blocked.at(0)?.reason).toBe("interrupt_in_progress"); + }); +}); + +describe("approval-append path (consumeQueuedMessages mirror)", () => { + test("consumeItems(n) fires onDequeued — items are submitted, not dropped", () => { + const { q, rec } = buildRuntime(); + enqueueUserMsg(q, "queued during approval"); + enqueueTaskNotif(q, ""); + + // Mirror consumeQueuedMessages: messages.length = 2 + q.consumeItems(2); + expect(rec.dequeued).toHaveLength(1); + expect(rec.dequeued.at(0)?.mergedCount).toBe(2); + expect(rec.cleared).toHaveLength(0); // NOT a clear + }); +}); + +describe("queue edit clear (handleEnterQueueEditMode)", () => { + test("clear('stale_generation') fires onCleared, queue empty", () => { + const { q, rec } = buildRuntime(); + enqueueUserMsg(q, "pending message"); + enqueueUserMsg(q, "another"); + q.clear("stale_generation"); + expect(rec.cleared).toHaveLength(1); + expect(rec.cleared.at(0)?.reason).toBe("stale_generation"); + expect(rec.cleared.at(0)?.count).toBe(2); + expect(q.length).toBe(0); + expect(rec.dequeued).toHaveLength(0); // not a dequeue + }); +}); + +describe("error clear", () => { + test("clear('error') fires onCleared with correct count", () => { + const { q, rec } = buildRuntime(); + enqueueUserMsg(q, "pending"); + q.clear("error"); + expect(rec.cleared.at(0)?.reason).toBe("error"); + expect(rec.cleared.at(0)?.count).toBe(1); + expect(q.length).toBe(0); + }); + + test("clear('error') on empty queue fires with count=0", () => { + const { q, rec } = buildRuntime(); + q.clear("error"); + expect(rec.cleared.at(0)?.count).toBe(0); + }); +}); + +describe("divergence scenario — consumeItems undercount", () => { + test("consumeItems(1) on 2-item queue leaves length=1 (detectable mismatch)", () => { + const { q } = buildRuntime(); + enqueueUserMsg(q, "first"); + enqueueUserMsg(q, "second"); + q.consumeItems(1); // undercount — simulates drift + expect(q.length).toBe(1); // mismatch vs expected messageQueue.length=0 + }); +}); + +describe("blocked then cleared", () => { + test("both onBlocked and onCleared fire; queue ends empty", () => { + const { q, rec } = buildRuntime(); + enqueueUserMsg(q); + q.tryDequeue("streaming"); // fires onBlocked + q.clear("error"); // fires onCleared + expect(rec.blocked).toHaveLength(1); + expect(rec.cleared).toHaveLength(1); + expect(q.length).toBe(0); + }); +}); diff --git a/src/tests/tui/tuiQueueAdapter.test.ts b/src/tests/tui/tuiQueueAdapter.test.ts new file mode 100644 index 0000000..e8b5626 --- /dev/null +++ b/src/tests/tui/tuiQueueAdapter.test.ts @@ -0,0 +1,149 @@ +/** + * Unit tests for getTuiBlockedReason() in tuiQueueAdapter.ts. + */ + +import { describe, expect, test } from "bun:test"; +import { + getTuiBlockedReason, + type TuiQueueGatingConditions, +} from "../../cli/helpers/tuiQueueAdapter"; + +const allClear: TuiQueueGatingConditions = { + streaming: false, + isExecutingTool: false, + commandRunning: false, + pendingApprovalsLen: 0, + queuedOverlayAction: false, + anySelectorOpen: false, + waitingForQueueCancel: false, + userCancelled: false, + abortControllerActive: false, +}; + +describe("getTuiBlockedReason", () => { + test("returns null when all conditions clear", () => { + expect(getTuiBlockedReason(allClear)).toBeNull(); + }); + + test("streaming → 'streaming'", () => { + expect(getTuiBlockedReason({ ...allClear, streaming: true })).toBe( + "streaming", + ); + }); + + test("isExecutingTool → 'streaming'", () => { + expect(getTuiBlockedReason({ ...allClear, isExecutingTool: true })).toBe( + "streaming", + ); + }); + + test("abortControllerActive → 'streaming'", () => { + expect( + getTuiBlockedReason({ ...allClear, abortControllerActive: true }), + ).toBe("streaming"); + }); + + test("commandRunning → 'command_running'", () => { + expect(getTuiBlockedReason({ ...allClear, commandRunning: true })).toBe( + "command_running", + ); + }); + + test("pendingApprovalsLen > 0 → 'pending_approvals'", () => { + expect(getTuiBlockedReason({ ...allClear, pendingApprovalsLen: 3 })).toBe( + "pending_approvals", + ); + }); + + test("queuedOverlayAction → 'overlay_open'", () => { + expect( + getTuiBlockedReason({ ...allClear, queuedOverlayAction: true }), + ).toBe("overlay_open"); + }); + + test("anySelectorOpen → 'overlay_open'", () => { + expect(getTuiBlockedReason({ ...allClear, anySelectorOpen: true })).toBe( + "overlay_open", + ); + }); + + test("waitingForQueueCancel → 'interrupt_in_progress'", () => { + expect( + getTuiBlockedReason({ ...allClear, waitingForQueueCancel: true }), + ).toBe("interrupt_in_progress"); + }); + + test("userCancelled → 'interrupt_in_progress'", () => { + expect(getTuiBlockedReason({ ...allClear, userCancelled: true })).toBe( + "interrupt_in_progress", + ); + }); + + describe("priority order (first match wins)", () => { + test("interrupt_in_progress beats streaming", () => { + expect( + getTuiBlockedReason({ + ...allClear, + streaming: true, + userCancelled: true, + }), + ).toBe("interrupt_in_progress"); + }); + + test("interrupt_in_progress beats pending_approvals", () => { + expect( + getTuiBlockedReason({ + ...allClear, + pendingApprovalsLen: 2, + waitingForQueueCancel: true, + }), + ).toBe("interrupt_in_progress"); + }); + + test("pending_approvals beats overlay_open", () => { + expect( + getTuiBlockedReason({ + ...allClear, + pendingApprovalsLen: 1, + anySelectorOpen: true, + }), + ).toBe("pending_approvals"); + }); + + test("overlay_open beats command_running", () => { + expect( + getTuiBlockedReason({ + ...allClear, + commandRunning: true, + queuedOverlayAction: true, + }), + ).toBe("overlay_open"); + }); + + test("command_running beats streaming", () => { + expect( + getTuiBlockedReason({ + ...allClear, + streaming: true, + commandRunning: true, + }), + ).toBe("command_running"); + }); + + test("all conditions active → interrupt_in_progress (highest priority)", () => { + expect( + getTuiBlockedReason({ + streaming: true, + isExecutingTool: true, + commandRunning: true, + pendingApprovalsLen: 1, + queuedOverlayAction: true, + anySelectorOpen: true, + waitingForQueueCancel: true, + userCancelled: true, + abortControllerActive: true, + }), + ).toBe("interrupt_in_progress"); + }); + }); +});