feat: queue tui integration (#1164)

Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
Charles Packer
2026-02-26 11:34:37 -08:00
committed by GitHub
parent 908820c44a
commit 18fa693c9b
4 changed files with 524 additions and 0 deletions

View File

@@ -79,6 +79,7 @@ import {
import type { ApprovalContext } from "../permissions/analyzer"; import type { ApprovalContext } from "../permissions/analyzer";
import { type PermissionMode, permissionMode } from "../permissions/mode"; import { type PermissionMode, permissionMode } from "../permissions/mode";
import { OPENAI_CODEX_PROVIDER_NAME } from "../providers/openai-codex-provider"; import { OPENAI_CODEX_PROVIDER_NAME } from "../providers/openai-codex-provider";
import { QueueRuntime } from "../queue/queueRuntime";
import { import {
DEFAULT_COMPLETION_PROMISE, DEFAULT_COMPLETION_PROMISE,
type RalphState, type RalphState,
@@ -273,6 +274,7 @@ import {
alwaysRequiresUserInput, alwaysRequiresUserInput,
isTaskTool, isTaskTool,
} from "./helpers/toolNameMapping.js"; } from "./helpers/toolNameMapping.js";
import { getTuiBlockedReason } from "./helpers/tuiQueueAdapter";
import { useConfigurableStatusLine } from "./hooks/useConfigurableStatusLine"; import { useConfigurableStatusLine } from "./hooks/useConfigurableStatusLine";
import { useSuspend } from "./hooks/useSuspend/useSuspend.ts"; import { useSuspend } from "./hooks/useSuspend/useSuspend.ts";
import { useSyncedState } from "./hooks/useSyncedState"; import { useSyncedState } from "./hooks/useSyncedState";
@@ -1662,6 +1664,53 @@ export default function App({
messageQueueRef.current = messageQueue; messageQueueRef.current = messageQueue;
}, [messageQueue]); }, [messageQueue]);
// PRQ4: divergence check — runs after every messageQueue commit, by which time
// tuiQueueRef has already been updated (enqueue/consumeItems called synchronously
// before setMessageQueue). Warn-only, never throws.
useEffect(() => {
if ((tuiQueueRef.current?.length ?? 0) !== messageQueue.length) {
debugWarn(
"queue-lifecycle",
`drift: QueueRuntime.length=${tuiQueueRef.current?.length ?? 0} messageQueue.length=${messageQueue.length}`,
);
}
}, [messageQueue]);
// PRQ4: QueueRuntime mirror — parallel lifecycle tracking alongside existing queue.
// Callbacks emit to the debug log only (gated on LETTA_DEBUG=1).
// Does NOT drive submit decisions — existing messageQueue state remains authoritative.
// Lazy init: useRef(new QueueRuntime(...)) would allocate on every render
// (React ignores all but the first, but construction still runs). The ref is
// typed QueueRuntime | null; call sites use ?. so the type is enforced and a
// missed init would no-op rather than hide behind an unsafe cast.
const tuiQueueRef = useRef<QueueRuntime | null>(null);
if (!tuiQueueRef.current) {
tuiQueueRef.current = new QueueRuntime({
callbacks: {
onEnqueued: (item, queueLen) =>
debugLog(
"queue-lifecycle",
`enqueued item_id=${item.id} kind=${item.kind} queue_len=${queueLen}`,
),
onDequeued: (batch) =>
debugLog(
"queue-lifecycle",
`dequeued batch_id=${batch.batchId} merged_count=${batch.mergedCount} queue_len_after=${batch.queueLenAfter}`,
),
onBlocked: (reason, queueLen) =>
debugLog(
"queue-lifecycle",
`blocked reason=${reason} queue_len=${queueLen}`,
),
onCleared: (reason, clearedCount) =>
debugLog(
"queue-lifecycle",
`cleared reason=${reason} cleared_count=${clearedCount}`,
),
},
});
}
// Override content parts for queued submissions (to preserve part boundaries) // Override content parts for queued submissions (to preserve part boundaries)
const overrideContentPartsRef = useRef<MessageCreate["content"] | null>(null); const overrideContentPartsRef = useRef<MessageCreate["content"] | null>(null);
@@ -1671,6 +1720,20 @@ export default function App({
// Provide a queue adder that adds to messageQueue and bumps dequeueEpoch // Provide a queue adder that adds to messageQueue and bumps dequeueEpoch
setMessageQueueAdder((message: QueuedMessage) => { setMessageQueueAdder((message: QueuedMessage) => {
setMessageQueue((q) => [...q, message]); setMessageQueue((q) => [...q, message]);
// PRQ4: mirror enqueue into QueueRuntime for lifecycle tracking.
tuiQueueRef.current?.enqueue(
message.kind === "task_notification"
? ({
kind: "task_notification",
source: "task_notification",
text: message.text,
} as Parameters<typeof tuiQueueRef.current.enqueue>[0])
: ({
kind: "message",
source: "user",
content: message.text,
} as Parameters<typeof tuiQueueRef.current.enqueue>[0]),
);
setDequeueEpoch((e) => e + 1); setDequeueEpoch((e) => e + 1);
}); });
return () => setMessageQueueAdder(null); return () => setMessageQueueAdder(null);
@@ -1746,6 +1809,9 @@ export default function App({
const consumeQueuedMessages = useCallback((): QueuedMessage[] | null => { const consumeQueuedMessages = useCallback((): QueuedMessage[] | null => {
if (messageQueueRef.current.length === 0) return null; if (messageQueueRef.current.length === 0) return null;
const messages = [...messageQueueRef.current]; const messages = [...messageQueueRef.current];
// PRQ4: items are being submitted into the current turn, so fire onDequeued
// (not onCleared) to reflect actual consumption, not an error/cancel drop.
tuiQueueRef.current?.consumeItems(messages.length);
setMessageQueue([]); setMessageQueue([]);
return messages; return messages;
}, []); }, []);
@@ -5040,6 +5106,7 @@ export default function App({
lastDequeuedMessageRef.current = null; lastDequeuedMessageRef.current = null;
} }
// Clear any remaining queue on error // Clear any remaining queue on error
tuiQueueRef.current?.clear("error"); // PRQ4
setMessageQueue([]); setMessageQueue([]);
setStreaming(false); setStreaming(false);
@@ -5144,6 +5211,7 @@ export default function App({
lastDequeuedMessageRef.current = null; lastDequeuedMessageRef.current = null;
} }
// Clear any remaining queue on error // Clear any remaining queue on error
tuiQueueRef.current?.clear("error"); // PRQ4
setMessageQueue([]); setMessageQueue([]);
setStreaming(false); setStreaming(false);
@@ -5175,6 +5243,7 @@ export default function App({
lastDequeuedMessageRef.current = null; lastDequeuedMessageRef.current = null;
} }
// Clear any remaining queue on error // Clear any remaining queue on error
tuiQueueRef.current?.clear("error"); // PRQ4
setMessageQueue([]); setMessageQueue([]);
setStreaming(false); setStreaming(false);
@@ -5215,6 +5284,7 @@ export default function App({
lastDequeuedMessageRef.current = null; lastDequeuedMessageRef.current = null;
} }
// Clear any remaining queue on error // Clear any remaining queue on error
tuiQueueRef.current?.clear("error"); // PRQ4
setMessageQueue([]); setMessageQueue([]);
setStreaming(false); setStreaming(false);
@@ -5290,6 +5360,8 @@ export default function App({
// Handler when user presses UP/ESC to load queue into input for editing // Handler when user presses UP/ESC to load queue into input for editing
const handleEnterQueueEditMode = useCallback(() => { const handleEnterQueueEditMode = useCallback(() => {
// PRQ4: items are discarded (user is editing them), not submitted.
tuiQueueRef.current?.clear("stale_generation");
setMessageQueue([]); setMessageQueue([]);
}, []); }, []);
@@ -6388,6 +6460,12 @@ export default function App({
return newQueue; return newQueue;
}); });
// PRQ4: mirror enqueue into QueueRuntime for lifecycle tracking.
tuiQueueRef.current?.enqueue({
kind: "message",
source: "user",
content: msg,
} as Parameters<typeof tuiQueueRef.current.enqueue>[0]);
return { submitted: true }; // Clears input return { submitted: true }; // Clears input
} }
@@ -9735,6 +9813,9 @@ ${SYSTEM_REMINDER_CLOSE}
// Store the message before clearing queue - allows restoration on error // Store the message before clearing queue - allows restoration on error
lastDequeuedMessageRef.current = concatenatedMessage; lastDequeuedMessageRef.current = concatenatedMessage;
// PRQ4: fire onDequeued before clearing state so QueueRuntime and
// messageQueue drop to 0 together (divergence check runs after commit).
tuiQueueRef.current?.consumeItems(messageQueue.length);
setMessageQueue([]); setMessageQueue([]);
// Submit the concatenated message using the normal submit flow // Submit the concatenated message using the normal submit flow
@@ -9747,6 +9828,23 @@ ${SYSTEM_REMINDER_CLOSE}
"queue", "queue",
`Dequeue blocked: streaming=${streaming}, queuedOverlayAction=${!!queuedOverlayAction}, pendingApprovals=${pendingApprovals.length}, commandRunning=${commandRunning}, isExecutingTool=${isExecutingTool}, anySelectorOpen=${anySelectorOpen}, waitingForQueueCancel=${waitingForQueueCancelRef.current}, userCancelled=${userCancelledRef.current}, abortController=${!!abortControllerRef.current}`, `Dequeue blocked: streaming=${streaming}, queuedOverlayAction=${!!queuedOverlayAction}, pendingApprovals=${pendingApprovals.length}, commandRunning=${commandRunning}, isExecutingTool=${isExecutingTool}, anySelectorOpen=${anySelectorOpen}, waitingForQueueCancel=${waitingForQueueCancelRef.current}, userCancelled=${userCancelledRef.current}, abortController=${!!abortControllerRef.current}`,
); );
// PRQ4: emit queue_blocked on first blocked-reason transition per reason.
// tryDequeue deduplicates via lastEmittedBlockedReason — fires onBlocked
// only when the reason changes, not on every effect re-run.
const blockedReason = getTuiBlockedReason({
streaming,
isExecutingTool,
commandRunning,
pendingApprovalsLen: pendingApprovals.length,
queuedOverlayAction: !!queuedOverlayAction,
anySelectorOpen,
waitingForQueueCancel: waitingForQueueCancelRef.current,
userCancelled: userCancelledRef.current,
abortControllerActive: !!abortControllerRef.current,
});
if (blockedReason) {
tuiQueueRef.current?.tryDequeue(blockedReason);
}
} }
}, [ }, [
streaming, streaming,

View File

@@ -0,0 +1,38 @@
/**
* Helpers for the PRQ4 TUI QueueRuntime mirror.
*
* These are extracted as pure functions so they are independently unit-testable
* without importing React or App.tsx.
*/
import type { QueueBlockedReason } from "../../types/protocol";
export type TuiQueueGatingConditions = {
streaming: boolean;
isExecutingTool: boolean;
commandRunning: boolean;
pendingApprovalsLen: number;
queuedOverlayAction: boolean;
anySelectorOpen: boolean;
waitingForQueueCancel: boolean;
userCancelled: boolean;
abortControllerActive: boolean;
};
/**
* Map the TUI dequeue gating conditions to a QueueBlockedReason.
* Priority order matches the plan — first match wins.
* Returns null when all conditions are clear (dequeue should proceed).
*/
export function getTuiBlockedReason(
c: TuiQueueGatingConditions,
): QueueBlockedReason | null {
if (c.waitingForQueueCancel || c.userCancelled)
return "interrupt_in_progress";
if (c.pendingApprovalsLen > 0) return "pending_approvals";
if (c.queuedOverlayAction || c.anySelectorOpen) return "overlay_open";
if (c.commandRunning) return "command_running";
if (c.streaming || c.isExecutingTool || c.abortControllerActive)
return "streaming";
return null;
}

View File

@@ -0,0 +1,239 @@
/**
* Integration tests for PRQ4: TUI QueueRuntime mirror lifecycle events.
*
* Drives QueueRuntime directly using the same patterns as the App.tsx dual-path,
* without requiring React or a TUI instance. Each test mirrors one App.tsx code path.
*
* Invariants verified:
* - Idle submit: enqueue → consumeItems → onEnqueued + onDequeued, no onBlocked
* - Busy submit: enqueue while blocked → onBlocked once; unblock → consumeItems → onDequeued
* - Coalesced batch: N enqueues → consumeItems(N) → mergedCount=N
* - No double-blocked: tryDequeue same reason N times → onBlocked fires once
* - Priority: interrupt_in_progress emitted when streaming also active
* - Approval-append (consumeQueuedMessages pattern): consumeItems fires onDequeued, not onCleared
* - Queue edit clear (handleEnterQueueEditMode): clear("stale_generation") → onCleared
* - Error clear: clear("error") → onCleared
* - Divergence: consumeItems(undercount) leaves length mismatch
* - Blocked then cleared: both events fire, queue empty
*/
import { describe, expect, test } from "bun:test";
import { getTuiBlockedReason } from "../../cli/helpers/tuiQueueAdapter";
import type {
DequeuedBatch,
QueueBlockedReason,
QueueClearedReason,
QueueItem,
} from "../../queue/queueRuntime";
import { QueueRuntime } from "../../queue/queueRuntime";
// ── Helpers ───────────────────────────────────────────────────────
type Recorded = {
enqueued: Array<{ item: QueueItem; queueLen: number }>;
dequeued: DequeuedBatch[];
blocked: Array<{ reason: QueueBlockedReason; queueLen: number }>;
cleared: Array<{ reason: QueueClearedReason; count: number }>;
};
function buildRuntime(): { q: QueueRuntime; rec: Recorded } {
const rec: Recorded = {
enqueued: [],
dequeued: [],
blocked: [],
cleared: [],
};
const q = new QueueRuntime({
callbacks: {
onEnqueued: (item, queueLen) => rec.enqueued.push({ item, queueLen }),
onDequeued: (batch) => rec.dequeued.push(batch),
onBlocked: (reason, queueLen) => rec.blocked.push({ reason, queueLen }),
onCleared: (reason, count) => rec.cleared.push({ reason, count }),
},
});
return { q, rec };
}
function enqueueUserMsg(q: QueueRuntime, text = "hello"): void {
q.enqueue({
kind: "message",
source: "user",
content: text,
} as Parameters<typeof q.enqueue>[0]);
}
function enqueueTaskNotif(q: QueueRuntime, text = "<notif/>"): void {
q.enqueue({
kind: "task_notification",
source: "task_notification",
text,
} as Parameters<typeof q.enqueue>[0]);
}
// ── Tests ─────────────────────────────────────────────────────────
describe("idle submit — single message", () => {
test("enqueued then consumeItems(1) fires onEnqueued + onDequeued, no blocked", () => {
const { q, rec } = buildRuntime();
enqueueUserMsg(q);
expect(rec.enqueued).toHaveLength(1);
expect(rec.blocked).toHaveLength(0);
q.consumeItems(1);
expect(rec.dequeued).toHaveLength(1);
expect(rec.dequeued.at(0)?.mergedCount).toBe(1);
expect(rec.dequeued.at(0)?.queueLenAfter).toBe(0);
expect(q.length).toBe(0);
});
});
describe("busy submit — blocked on streaming", () => {
test("blocked fires on first tryDequeue; consumeItems fires dequeued", () => {
const { q, rec } = buildRuntime();
enqueueUserMsg(q);
const reason = getTuiBlockedReason({
streaming: true,
isExecutingTool: false,
commandRunning: false,
pendingApprovalsLen: 0,
queuedOverlayAction: false,
anySelectorOpen: false,
waitingForQueueCancel: false,
userCancelled: false,
abortControllerActive: false,
});
expect(reason).not.toBeNull();
q.tryDequeue(reason as NonNullable<typeof reason>);
expect(rec.blocked).toHaveLength(1);
expect(rec.blocked.at(0)?.reason).toBe("streaming");
expect(rec.blocked.at(0)?.queueLen).toBe(1);
// Stream ends: consumeItems fires dequeued
q.consumeItems(1);
expect(rec.dequeued).toHaveLength(1);
expect(rec.dequeued.at(0)?.mergedCount).toBe(1);
});
});
describe("coalesced batch", () => {
test("two enqueues then consumeItems(2) → mergedCount=2", () => {
const { q, rec } = buildRuntime();
enqueueUserMsg(q, "first");
enqueueTaskNotif(q, "<task/>");
q.consumeItems(2);
expect(rec.dequeued).toHaveLength(1);
expect(rec.dequeued.at(0)?.mergedCount).toBe(2);
expect(rec.dequeued.at(0)?.items.at(0)?.kind).toBe("message");
expect(rec.dequeued.at(0)?.items.at(1)?.kind).toBe("task_notification");
expect(rec.dequeued.at(0)?.queueLenAfter).toBe(0);
});
});
describe("no double-blocked — QueueRuntime dedup", () => {
test("tryDequeue same reason 3× → onBlocked fires once", () => {
const { q, rec } = buildRuntime();
enqueueUserMsg(q);
q.tryDequeue("streaming");
q.tryDequeue("streaming");
q.tryDequeue("streaming");
expect(rec.blocked).toHaveLength(1);
});
test("reason change re-fires onBlocked", () => {
const { q, rec } = buildRuntime();
enqueueUserMsg(q);
q.tryDequeue("streaming");
q.tryDequeue("pending_approvals"); // reason changed → fires again
expect(rec.blocked).toHaveLength(2);
expect(rec.blocked.at(1)?.reason).toBe("pending_approvals");
});
});
describe("priority: interrupt_in_progress beats streaming", () => {
test("getTuiBlockedReason returns interrupt_in_progress when streaming also true", () => {
const reason = getTuiBlockedReason({
streaming: true,
isExecutingTool: false,
commandRunning: false,
pendingApprovalsLen: 0,
queuedOverlayAction: false,
anySelectorOpen: false,
waitingForQueueCancel: false,
userCancelled: true, // interrupt_in_progress
abortControllerActive: false,
});
const { q, rec } = buildRuntime();
enqueueUserMsg(q);
expect(reason).not.toBeNull();
q.tryDequeue(reason as NonNullable<typeof reason>);
expect(rec.blocked.at(0)?.reason).toBe("interrupt_in_progress");
});
});
describe("approval-append path (consumeQueuedMessages mirror)", () => {
test("consumeItems(n) fires onDequeued — items are submitted, not dropped", () => {
const { q, rec } = buildRuntime();
enqueueUserMsg(q, "queued during approval");
enqueueTaskNotif(q, "<notif/>");
// Mirror consumeQueuedMessages: messages.length = 2
q.consumeItems(2);
expect(rec.dequeued).toHaveLength(1);
expect(rec.dequeued.at(0)?.mergedCount).toBe(2);
expect(rec.cleared).toHaveLength(0); // NOT a clear
});
});
describe("queue edit clear (handleEnterQueueEditMode)", () => {
test("clear('stale_generation') fires onCleared, queue empty", () => {
const { q, rec } = buildRuntime();
enqueueUserMsg(q, "pending message");
enqueueUserMsg(q, "another");
q.clear("stale_generation");
expect(rec.cleared).toHaveLength(1);
expect(rec.cleared.at(0)?.reason).toBe("stale_generation");
expect(rec.cleared.at(0)?.count).toBe(2);
expect(q.length).toBe(0);
expect(rec.dequeued).toHaveLength(0); // not a dequeue
});
});
describe("error clear", () => {
test("clear('error') fires onCleared with correct count", () => {
const { q, rec } = buildRuntime();
enqueueUserMsg(q, "pending");
q.clear("error");
expect(rec.cleared.at(0)?.reason).toBe("error");
expect(rec.cleared.at(0)?.count).toBe(1);
expect(q.length).toBe(0);
});
test("clear('error') on empty queue fires with count=0", () => {
const { q, rec } = buildRuntime();
q.clear("error");
expect(rec.cleared.at(0)?.count).toBe(0);
});
});
describe("divergence scenario — consumeItems undercount", () => {
test("consumeItems(1) on 2-item queue leaves length=1 (detectable mismatch)", () => {
const { q } = buildRuntime();
enqueueUserMsg(q, "first");
enqueueUserMsg(q, "second");
q.consumeItems(1); // undercount — simulates drift
expect(q.length).toBe(1); // mismatch vs expected messageQueue.length=0
});
});
describe("blocked then cleared", () => {
test("both onBlocked and onCleared fire; queue ends empty", () => {
const { q, rec } = buildRuntime();
enqueueUserMsg(q);
q.tryDequeue("streaming"); // fires onBlocked
q.clear("error"); // fires onCleared
expect(rec.blocked).toHaveLength(1);
expect(rec.cleared).toHaveLength(1);
expect(q.length).toBe(0);
});
});

View File

@@ -0,0 +1,149 @@
/**
* Unit tests for getTuiBlockedReason() in tuiQueueAdapter.ts.
*/
import { describe, expect, test } from "bun:test";
import {
getTuiBlockedReason,
type TuiQueueGatingConditions,
} from "../../cli/helpers/tuiQueueAdapter";
const allClear: TuiQueueGatingConditions = {
streaming: false,
isExecutingTool: false,
commandRunning: false,
pendingApprovalsLen: 0,
queuedOverlayAction: false,
anySelectorOpen: false,
waitingForQueueCancel: false,
userCancelled: false,
abortControllerActive: false,
};
describe("getTuiBlockedReason", () => {
test("returns null when all conditions clear", () => {
expect(getTuiBlockedReason(allClear)).toBeNull();
});
test("streaming → 'streaming'", () => {
expect(getTuiBlockedReason({ ...allClear, streaming: true })).toBe(
"streaming",
);
});
test("isExecutingTool → 'streaming'", () => {
expect(getTuiBlockedReason({ ...allClear, isExecutingTool: true })).toBe(
"streaming",
);
});
test("abortControllerActive → 'streaming'", () => {
expect(
getTuiBlockedReason({ ...allClear, abortControllerActive: true }),
).toBe("streaming");
});
test("commandRunning → 'command_running'", () => {
expect(getTuiBlockedReason({ ...allClear, commandRunning: true })).toBe(
"command_running",
);
});
test("pendingApprovalsLen > 0 → 'pending_approvals'", () => {
expect(getTuiBlockedReason({ ...allClear, pendingApprovalsLen: 3 })).toBe(
"pending_approvals",
);
});
test("queuedOverlayAction → 'overlay_open'", () => {
expect(
getTuiBlockedReason({ ...allClear, queuedOverlayAction: true }),
).toBe("overlay_open");
});
test("anySelectorOpen → 'overlay_open'", () => {
expect(getTuiBlockedReason({ ...allClear, anySelectorOpen: true })).toBe(
"overlay_open",
);
});
test("waitingForQueueCancel → 'interrupt_in_progress'", () => {
expect(
getTuiBlockedReason({ ...allClear, waitingForQueueCancel: true }),
).toBe("interrupt_in_progress");
});
test("userCancelled → 'interrupt_in_progress'", () => {
expect(getTuiBlockedReason({ ...allClear, userCancelled: true })).toBe(
"interrupt_in_progress",
);
});
describe("priority order (first match wins)", () => {
test("interrupt_in_progress beats streaming", () => {
expect(
getTuiBlockedReason({
...allClear,
streaming: true,
userCancelled: true,
}),
).toBe("interrupt_in_progress");
});
test("interrupt_in_progress beats pending_approvals", () => {
expect(
getTuiBlockedReason({
...allClear,
pendingApprovalsLen: 2,
waitingForQueueCancel: true,
}),
).toBe("interrupt_in_progress");
});
test("pending_approvals beats overlay_open", () => {
expect(
getTuiBlockedReason({
...allClear,
pendingApprovalsLen: 1,
anySelectorOpen: true,
}),
).toBe("pending_approvals");
});
test("overlay_open beats command_running", () => {
expect(
getTuiBlockedReason({
...allClear,
commandRunning: true,
queuedOverlayAction: true,
}),
).toBe("overlay_open");
});
test("command_running beats streaming", () => {
expect(
getTuiBlockedReason({
...allClear,
streaming: true,
commandRunning: true,
}),
).toBe("command_running");
});
test("all conditions active → interrupt_in_progress (highest priority)", () => {
expect(
getTuiBlockedReason({
streaming: true,
isExecutingTool: true,
commandRunning: true,
pendingApprovalsLen: 1,
queuedOverlayAction: true,
anySelectorOpen: true,
waitingForQueueCancel: true,
userCancelled: true,
abortControllerActive: true,
}),
).toBe("interrupt_in_progress");
});
});
});