feat(queue): QueueRuntime TUI cutover — remove messageQueue array as source of truth (#1168)

Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
Charles Packer
2026-02-26 16:57:42 -08:00
committed by GitHub
parent 6967f9dcc9
commit 462d14edce
4 changed files with 394 additions and 99 deletions

View File

@@ -79,7 +79,11 @@ import {
import type { ApprovalContext } from "../permissions/analyzer";
import { type PermissionMode, permissionMode } from "../permissions/mode";
import { OPENAI_CODEX_PROVIDER_NAME } from "../providers/openai-codex-provider";
import { QueueRuntime } from "../queue/queueRuntime";
import {
type MessageQueueItem,
QueueRuntime,
type TaskNotificationQueueItem,
} from "../queue/queueRuntime";
import {
DEFAULT_COMPLETION_PROMISE,
type RalphState,
@@ -227,9 +231,11 @@ import {
} from "./helpers/pasteRegistry";
import { generatePlanFilePath } from "./helpers/planName";
import {
buildContentFromQueueBatch,
buildQueuedContentParts,
buildQueuedUserText,
getQueuedNotificationSummaries,
toQueuedMsg,
} from "./helpers/queuedMessageParts";
import { resolveReasoningTabToggleCommand } from "./helpers/reasoningTabToggle";
import { safeJsonParseOr } from "./helpers/safeJsonParse";
@@ -1666,56 +1672,46 @@ export default function App({
const conversationBusyRetriesRef = useRef(0);
// Message queue state for queueing messages during streaming
const [messageQueue, setMessageQueue] = useState<QueuedMessage[]>([]);
const [queueDisplay, setQueueDisplay] = useState<QueuedMessage[]>([]);
const messageQueueRef = useRef<QueuedMessage[]>([]); // For synchronous access
useEffect(() => {
messageQueueRef.current = messageQueue;
}, [messageQueue]);
// PRQ4: divergence check — runs after every messageQueue commit, by which time
// tuiQueueRef has already been updated (enqueue/consumeItems called synchronously
// before setMessageQueue). Warn-only, never throws.
useEffect(() => {
if ((tuiQueueRef.current?.length ?? 0) !== messageQueue.length) {
debugWarn(
"queue-lifecycle",
`drift: QueueRuntime.length=${tuiQueueRef.current?.length ?? 0} messageQueue.length=${messageQueue.length}`,
);
}
}, [messageQueue]);
// PRQ4: QueueRuntime mirror — parallel lifecycle tracking alongside existing queue.
// Callbacks emit to the debug log only (gated on LETTA_DEBUG=1).
// Does NOT drive submit decisions — existing messageQueue state remains authoritative.
// Lazy init: useRef(new QueueRuntime(...)) would allocate on every render
// (React ignores all but the first, but construction still runs). The ref is
// typed QueueRuntime | null; call sites use ?. so the type is enforced and a
// missed init would no-op rather than hide behind an unsafe cast.
// QueueRuntime — authoritative queue. maxItems: Infinity disables drop limits
// to match the previous unbounded array semantics. queueDisplay is a derived
// UI state maintained by the onEnqueued/onDequeued/onCleared callbacks.
// Lazy init pattern; typed QueueRuntime | null with ?. at all call sites.
const tuiQueueRef = useRef<QueueRuntime | null>(null);
if (!tuiQueueRef.current) {
tuiQueueRef.current = new QueueRuntime({
maxItems: Infinity,
callbacks: {
onEnqueued: (item, queueLen) =>
onEnqueued: (item, queueLen) => {
debugLog(
"queue-lifecycle",
`enqueued item_id=${item.id} kind=${item.kind} queue_len=${queueLen}`,
),
onDequeued: (batch) =>
);
// queueDisplay is the single source for UI — updated only here.
if (item.kind === "message" || item.kind === "task_notification") {
setQueueDisplay((prev) => [...prev, toQueuedMsg(item)]);
}
},
onDequeued: (batch) => {
debugLog(
"queue-lifecycle",
`dequeued batch_id=${batch.batchId} merged_count=${batch.mergedCount} queue_len_after=${batch.queueLenAfter}`,
),
);
setQueueDisplay((prev) => prev.slice(batch.mergedCount));
},
onBlocked: (reason, queueLen) =>
debugLog(
"queue-lifecycle",
`blocked reason=${reason} queue_len=${queueLen}`,
),
onCleared: (reason, clearedCount) =>
onCleared: (_reason, _clearedCount) => {
debugLog(
"queue-lifecycle",
`cleared reason=${reason} cleared_count=${clearedCount}`,
),
`cleared reason=${_reason} cleared_count=${_clearedCount}`,
);
setQueueDisplay([]);
},
},
});
}
@@ -1724,12 +1720,10 @@ export default function App({
const overrideContentPartsRef = useRef<MessageCreate["content"] | null>(null);
// Set up message queue bridge for background tasks
// This allows non-React code (Task.ts) to add notifications to messageQueue
// This allows non-React code (Task.ts) to add notifications to queueDisplay
useEffect(() => {
// Provide a queue adder that adds to messageQueue and bumps dequeueEpoch
// Enqueue via QueueRuntime — onEnqueued callback updates queueDisplay.
setMessageQueueAdder((message: QueuedMessage) => {
setMessageQueue((q) => [...q, message]);
// PRQ4: mirror enqueue into QueueRuntime for lifecycle tracking.
tuiQueueRef.current?.enqueue(
message.kind === "task_notification"
? ({
@@ -1814,15 +1808,20 @@ export default function App({
[],
);
// Consume queued messages for appending to tool results (clears queue)
// Consume queued messages for appending to tool results (clears queue).
// consumeItems fires onDequeued → setQueueDisplay(prev => prev.slice(n))
// so no direct setQueueDisplay call is needed here.
const consumeQueuedMessages = useCallback((): QueuedMessage[] | null => {
if (messageQueueRef.current.length === 0) return null;
const messages = [...messageQueueRef.current];
// PRQ4: items are being submitted into the current turn, so fire onDequeued
// (not onCleared) to reflect actual consumption, not an error/cancel drop.
tuiQueueRef.current?.consumeItems(messages.length);
setMessageQueue([]);
return messages;
const len = tuiQueueRef.current?.length ?? 0;
if (len === 0) return null;
const batch = tuiQueueRef.current?.consumeItems(len);
if (!batch) return null;
return batch.items
.filter(
(item): item is MessageQueueItem | TaskNotificationQueueItem =>
item.kind === "message" || item.kind === "task_notification",
)
.map(toQueuedMsg);
}, []);
// Helper to wrap async handlers that need to close overlay and lock input
@@ -5115,8 +5114,7 @@ export default function App({
lastDequeuedMessageRef.current = null;
}
// Clear any remaining queue on error
tuiQueueRef.current?.clear("error"); // PRQ4
setMessageQueue([]);
tuiQueueRef.current?.clear("error");
setStreaming(false);
sendDesktopNotification("Stream error", "error"); // Notify user of error
@@ -5220,8 +5218,7 @@ export default function App({
lastDequeuedMessageRef.current = null;
}
// Clear any remaining queue on error
tuiQueueRef.current?.clear("error"); // PRQ4
setMessageQueue([]);
tuiQueueRef.current?.clear("error");
setStreaming(false);
sendDesktopNotification();
@@ -5252,8 +5249,7 @@ export default function App({
lastDequeuedMessageRef.current = null;
}
// Clear any remaining queue on error
tuiQueueRef.current?.clear("error"); // PRQ4
setMessageQueue([]);
tuiQueueRef.current?.clear("error");
setStreaming(false);
sendDesktopNotification("Execution error", "error"); // Notify user of error
@@ -5293,8 +5289,7 @@ export default function App({
lastDequeuedMessageRef.current = null;
}
// Clear any remaining queue on error
tuiQueueRef.current?.clear("error"); // PRQ4
setMessageQueue([]);
tuiQueueRef.current?.clear("error");
setStreaming(false);
sendDesktopNotification("Processing error", "error"); // Notify user of error
@@ -5311,7 +5306,7 @@ export default function App({
// won't re-run on its own — bump dequeueEpoch to force re-evaluation.
// Only bump for normal completions — if stale (ESC was pressed), the user
// cancelled and queued messages should NOT be auto-submitted.
if (!isStale && messageQueueRef.current.length > 0) {
if (!isStale && (tuiQueueRef.current?.length ?? 0) > 0) {
setDequeueEpoch((e) => e + 1);
}
@@ -5369,9 +5364,7 @@ export default function App({
// Handler when user presses UP/ESC to load queue into input for editing
const handleEnterQueueEditMode = useCallback(() => {
// PRQ4: items are discarded (user is editing them), not submitted.
tuiQueueRef.current?.clear("stale_generation");
setMessageQueue([]);
}, []);
// Handle paste errors (e.g., image too large)
@@ -6434,10 +6427,10 @@ export default function App({
// If there are queued messages and agent is not busy, bump epoch to trigger
// dequeue effect. Without this, the effect won't re-run because refs aren't
// in its deps array (only state values are).
if (!isAgentBusy() && messageQueue.length > 0) {
if (!isAgentBusy() && (tuiQueueRef.current?.length ?? 0) > 0) {
debugLog(
"queue",
`Bumping dequeueEpoch: userCancelledRef was reset, ${messageQueue.length} message(s) queued, agent not busy`,
`Bumping dequeueEpoch: userCancelledRef was reset, ${tuiQueueRef.current?.length ?? 0} message(s) queued, agent not busy`,
);
setDequeueEpoch((e) => e + 1);
}
@@ -6459,22 +6452,13 @@ export default function App({
isNonStateCommand(userTextForInput);
if (isAgentBusy() && !shouldBypassQueue) {
setMessageQueue((prev) => {
const newQueue: QueuedMessage[] = [
...prev,
{ kind: "user", text: msg },
];
// Regular messages: queue and wait for tool completion
return newQueue;
});
// PRQ4: mirror enqueue into QueueRuntime for lifecycle tracking.
// Enqueue via QueueRuntime — onEnqueued callback updates queueDisplay.
tuiQueueRef.current?.enqueue({
kind: "message",
source: "user",
content: msg,
} as Parameters<typeof tuiQueueRef.current.enqueue>[0]);
setDequeueEpoch((e) => e + 1);
return { submitted: true }; // Clears input
}
@@ -9091,7 +9075,7 @@ ${SYSTEM_REMINDER_CLOSE}
// Combine reminders with content as separate text parts.
// This preserves each reminder boundary in the API payload.
// Note: Task notifications now come through messageQueue directly (added by messageQueueBridge)
// Note: Task notifications now come through queueDisplay directly (added by messageQueueBridge)
const reminderParts: Array<{ type: "text"; text: string }> = [];
const pushReminder = (text: string) => {
if (!text) return;
@@ -9831,15 +9815,16 @@ ${SYSTEM_REMINDER_CLOSE}
onSubmitRef.current = onSubmit;
}, [onSubmit]);
// Process queued messages when streaming ends
// Task notifications are now added directly to messageQueue via messageQueueBridge
// Process queued messages when streaming ends.
// QueueRuntime is authoritative: consumeItems drives the dequeue and fires
// onDequeued → setQueueDisplay(prev => prev.slice(n)) to update the UI.
// dequeueEpoch is the sole re-trigger: bumped on every enqueue, turn
// completion (abortControllerRef clears), and cancel-reset.
useEffect(() => {
// Reference dequeueEpoch to satisfy exhaustive-deps - it's used to force
// re-runs when userCancelledRef is reset (refs aren't in deps)
// Also triggers when task notifications are added to queue
void dequeueEpoch;
void dequeueEpoch; // explicit dep to satisfy exhaustive-deps lint
const hasAnythingQueued = messageQueue.length > 0;
const queueLen = tuiQueueRef.current?.length ?? 0;
const hasAnythingQueued = queueLen > 0;
if (
!streaming &&
@@ -9853,28 +9838,33 @@ ${SYSTEM_REMINDER_CLOSE}
!userCancelledRef.current && // Don't dequeue if user just cancelled
!abortControllerRef.current // Don't dequeue while processConversation is still active
) {
// Concatenate all queued messages into one (better UX when user types multiple
// messages quickly - they get combined into one context for the agent)
// Task notifications are already in the queue as XML strings
const concatenatedMessage = messageQueue
.map((item) => item.text)
// consumeItems(n) fires onDequeued → setQueueDisplay(prev => prev.slice(n)).
const batch = tuiQueueRef.current?.consumeItems(queueLen);
if (!batch) return;
// Build concatenated text for lastDequeuedMessageRef (error restoration).
const concatenatedMessage = batch.items
.map((item) => {
if (item.kind === "task_notification") return item.text;
if (item.kind === "message") {
return typeof item.content === "string" ? item.content : "";
}
return "";
})
.filter((t) => t.length > 0)
.join("\n");
const queuedContentParts = buildQueuedContentParts(messageQueue);
const queuedContentParts = buildContentFromQueueBatch(batch);
debugLog(
"queue",
`Dequeuing ${messageQueue.length} message(s): "${concatenatedMessage.slice(0, 50)}${concatenatedMessage.length > 50 ? "..." : ""}"`,
`Dequeuing ${batch.mergedCount} message(s): "${concatenatedMessage.slice(0, 50)}${concatenatedMessage.length > 50 ? "..." : ""}"`,
);
// Store the message before clearing queue - allows restoration on error
// Store before submit — allows restoration on error (ESC path).
lastDequeuedMessageRef.current = concatenatedMessage;
// PRQ4: fire onDequeued before clearing state so QueueRuntime and
// messageQueue drop to 0 together (divergence check runs after commit).
tuiQueueRef.current?.consumeItems(messageQueue.length);
setMessageQueue([]);
// Submit the concatenated message using the normal submit flow
// This ensures all setup (reminders, UI updates, etc.) happens correctly
// Submit via normal flow — overrideContentPartsRef carries rich content parts.
overrideContentPartsRef.current = queuedContentParts;
onSubmitRef.current(concatenatedMessage);
} else if (hasAnythingQueued) {
@@ -9883,9 +9873,7 @@ ${SYSTEM_REMINDER_CLOSE}
"queue",
`Dequeue blocked: streaming=${streaming}, queuedOverlayAction=${!!queuedOverlayAction}, pendingApprovals=${pendingApprovals.length}, commandRunning=${commandRunning}, isExecutingTool=${isExecutingTool}, anySelectorOpen=${anySelectorOpen}, waitingForQueueCancel=${waitingForQueueCancelRef.current}, userCancelled=${userCancelledRef.current}, abortController=${!!abortControllerRef.current}`,
);
// PRQ4: emit queue_blocked on first blocked-reason transition per reason.
// tryDequeue deduplicates via lastEmittedBlockedReason — fires onBlocked
// only when the reason changes, not on every effect re-run.
// Emit queue_blocked on blocked-reason transitions only (dedup via tryDequeue).
const blockedReason = getTuiBlockedReason({
streaming,
isExecutingTool,
@@ -9903,13 +9891,12 @@ ${SYSTEM_REMINDER_CLOSE}
}
}, [
streaming,
messageQueue,
pendingApprovals,
commandRunning,
isExecutingTool,
anySelectorOpen,
queuedOverlayAction,
dequeueEpoch, // Triggered when userCancelledRef is reset OR task notifications added
dequeueEpoch, // Triggered on every enqueue, turn completion, and cancel-reset
]);
// Helper to send all approval results when done
@@ -12474,7 +12461,7 @@ Plan file path: ${planFilePath}`;
currentModel={currentModelDisplay}
currentModelProvider={currentModelProvider}
currentReasoningEffort={currentReasoningEffort}
messageQueue={messageQueue}
messageQueue={queueDisplay}
onEnterQueueEditMode={handleEnterQueueEditMode}
onEscapeCancel={
profileConfirmPending ? handleProfileEscapeCancel : undefined

View File

@@ -1,4 +1,9 @@
import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents";
import type {
DequeuedBatch,
MessageQueueItem,
TaskNotificationQueueItem,
} from "../../queue/queueRuntime";
import { mergeQueuedTurnInput } from "../../queue/turnQueueRuntime";
import type { QueuedMessage } from "./messageQueueBridge";
import { buildMessageContentFromDisplay } from "./pasteRegistry";
@@ -42,3 +47,70 @@ export function buildQueuedUserText(queued: QueuedMessage[]): string {
.filter((text) => text.length > 0)
.join("\n");
}
/**
* Convert a QueueItem (message or task_notification) to the QueuedMessage
* shape used by the TUI display state and callers of consumeQueuedMessages.
*
* In the TUI, MessageQueueItem.content is always a plain string (the display
* text from the input field). The fallback array-flatten path handles any
* future case where content arrives as content parts.
*/
export function toQueuedMsg(
item: MessageQueueItem | TaskNotificationQueueItem,
): QueuedMessage {
if (item.kind === "task_notification") {
return { kind: "task_notification", text: item.text };
}
const text =
typeof item.content === "string"
? item.content
: item.content
.filter((p): p is { type: "text"; text: string } => p.type === "text")
.map((p) => p.text)
.join("");
return { kind: "user", text };
}
/**
* Build merged MessageCreate content from a DequeuedBatch.
*
* Produces identical output to buildQueuedContentParts() for equivalent
* inputs — this is enforced by the golden parity test. The difference is
* that the input is QueueItem[] (from QueueRuntime) instead of QueuedMessage[].
*
* Only message and task_notification items contribute to the content batch;
* barrier items (approval_result, overlay_action) are skipped.
*/
export function buildContentFromQueueBatch(
batch: DequeuedBatch,
): MessageCreate["content"] {
const queueInput = batch.items
.filter(
(item): item is MessageQueueItem | TaskNotificationQueueItem =>
item.kind === "message" || item.kind === "task_notification",
)
.map((item) =>
item.kind === "task_notification"
? ({ kind: "task_notification", text: item.text } as const)
: ({
kind: "user",
content: item.content,
} as const),
);
const merged = mergeQueuedTurnInput(queueInput, {
// For string content (common TUI case), apply paste-registry resolution
// exactly as buildQueuedContentParts does. For already-normalized content
// parts, pass through unchanged.
normalizeUserContent: (content) =>
typeof content === "string"
? buildMessageContentFromDisplay(content)
: content,
});
if (merged === null) {
return [];
}
return merged;
}