From 382acacc7e802ee29c1835313ecc87c745fef301 Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Tue, 10 Mar 2026 16:33:56 -0700 Subject: [PATCH] fix: listener queue parity pump (#1338) --- .../websocket/listenerQueueAdapter.test.ts | 43 +++ src/websocket/helpers/listenerQueueAdapter.ts | 18 + src/websocket/listen-client.ts | 322 ++++++++++++------ 3 files changed, 273 insertions(+), 110 deletions(-) create mode 100644 src/tests/websocket/listenerQueueAdapter.test.ts create mode 100644 src/websocket/helpers/listenerQueueAdapter.ts diff --git a/src/tests/websocket/listenerQueueAdapter.test.ts b/src/tests/websocket/listenerQueueAdapter.test.ts new file mode 100644 index 0000000..8fc2e2c --- /dev/null +++ b/src/tests/websocket/listenerQueueAdapter.test.ts @@ -0,0 +1,43 @@ +import { describe, expect, test } from "bun:test"; +import { getListenerBlockedReason } from "../../websocket/helpers/listenerQueueAdapter"; + +const allClear = { + isProcessing: false, + pendingApprovalsLen: 0, + cancelRequested: false, + isRecoveringApprovals: false, +} as const; + +describe("getListenerBlockedReason", () => { + test("returns null when unblocked", () => { + expect(getListenerBlockedReason(allClear)).toBeNull(); + }); + + test("prioritizes pending approvals", () => { + expect( + getListenerBlockedReason({ ...allClear, pendingApprovalsLen: 2 }), + ).toBe("pending_approvals"); + }); + + test("prioritizes interrupt over runtime busy", () => { + expect( + getListenerBlockedReason({ + ...allClear, + cancelRequested: true, + isProcessing: true, + }), + ).toBe("interrupt_in_progress"); + }); + + test("maps recoveries to runtime busy", () => { + expect( + getListenerBlockedReason({ ...allClear, isRecoveringApprovals: true }), + ).toBe("runtime_busy"); + }); + + test("maps active processing to runtime busy", () => { + expect(getListenerBlockedReason({ ...allClear, isProcessing: true })).toBe( + "runtime_busy", + ); + }); +}); diff --git a/src/websocket/helpers/listenerQueueAdapter.ts b/src/websocket/helpers/listenerQueueAdapter.ts new file mode 100644 index 0000000..9f66377 --- /dev/null +++ b/src/websocket/helpers/listenerQueueAdapter.ts @@ -0,0 +1,18 @@ +import type { QueueBlockedReason } from "../../types/protocol"; + +export type ListenerQueueGatingConditions = { + isProcessing: boolean; + pendingApprovalsLen: number; + cancelRequested: boolean; + isRecoveringApprovals: boolean; +}; + +export function getListenerBlockedReason( + c: ListenerQueueGatingConditions, +): QueueBlockedReason | null { + if (c.pendingApprovalsLen > 0) return "pending_approvals"; + if (c.cancelRequested) return "interrupt_in_progress"; + if (c.isRecoveringApprovals) return "runtime_busy"; + if (c.isProcessing) return "runtime_busy"; + return null; +} diff --git a/src/websocket/listen-client.ts b/src/websocket/listen-client.ts index ead5084..b2d5f7e 100644 --- a/src/websocket/listen-client.ts +++ b/src/websocket/listen-client.ts @@ -39,7 +39,12 @@ import { drainStreamWithResume } from "../cli/helpers/stream"; import { INTERRUPTED_BY_USER } from "../constants"; import { computeDiffPreviews } from "../helpers/diffPreview"; import { permissionMode } from "../permissions/mode"; -import { type QueueItem, QueueRuntime } from "../queue/queueRuntime"; +import { + type DequeuedBatch, + type QueueBlockedReason, + type QueueItem, + QueueRuntime, +} from "../queue/queueRuntime"; import { mergeQueuedTurnInput } from "../queue/turnQueueRuntime"; import { buildSharedReminderParts, @@ -72,6 +77,7 @@ import type { TranscriptBackfillMessage, TranscriptSupplementMessage, } from "../types/protocol"; +import { getListenerBlockedReason } from "./helpers/listenerQueueAdapter"; interface StartListenerOptions { connectionId: string; @@ -317,13 +323,13 @@ type ListenerRuntime = { cancelRequested: boolean; /** Queue lifecycle tracking — parallel tracking layer, does not affect message processing. */ queueRuntime: QueueRuntime; - /** - * Queue item IDs that were coalesced into an earlier dequeued batch. - * Their already-scheduled promise-chain callbacks should no-op. - */ - coalescedSkipQueueItemIds: Set; - /** Count of turns currently queued or in-flight in the promise chain. Incremented - * synchronously on message arrival (before .then()) to avoid async scheduling races. */ + /** Correlates queued queue item ids to original inbound frames. */ + queuedMessagesByItemId: Map; + /** True while a queue drain pass is actively running. */ + queuePumpActive: boolean; + /** Dedupes queue pump scheduling onto messageQueue chain. */ + queuePumpScheduled: boolean; + /** Queue backlog metric for state snapshot visibility. */ pendingTurns: number; /** Optional debug hook for WS event logging. */ onWsEvent?: StartListenerOptions["onWsEvent"]; @@ -546,7 +552,9 @@ function createRuntime(): ListenerRuntime { reminderState: createSharedReminderState(), bootWorkingDirectory, workingDirectoryByConversation: new Map(), - coalescedSkipQueueItemIds: new Set(), + queuedMessagesByItemId: new Map(), + queuePumpActive: false, + queuePumpScheduled: false, pendingTurns: 0, // queueRuntime assigned below — needs runtime ref in callbacks queueRuntime: null as unknown as QueueRuntime, @@ -554,6 +562,7 @@ function createRuntime(): ListenerRuntime { runtime.queueRuntime = new QueueRuntime({ callbacks: { onEnqueued: (item, queueLen) => { + runtime.pendingTurns = queueLen; if (runtime.socket?.readyState === WebSocket.OPEN) { const content = item.kind === "message" ? item.content : item.text; emitToWS(runtime.socket, { @@ -573,6 +582,7 @@ function createRuntime(): ListenerRuntime { } }, onDequeued: (batch) => { + runtime.pendingTurns = batch.queueLenAfter; if (runtime.socket?.readyState === WebSocket.OPEN) { emitToWS(runtime.socket, { type: "queue_batch_dequeued", @@ -599,6 +609,7 @@ function createRuntime(): ListenerRuntime { } }, onCleared: (reason, clearedCount, items) => { + runtime.pendingTurns = 0; if (runtime.socket?.readyState === WebSocket.OPEN) { emitToWS(runtime.socket, { type: "queue_cleared", @@ -611,6 +622,7 @@ function createRuntime(): ListenerRuntime { } }, onDropped: (item, reason, queueLen) => { + runtime.pendingTurns = queueLen; if (runtime.socket?.readyState === WebSocket.OPEN) { emitToWS(runtime.socket, { type: "queue_item_dropped", @@ -891,6 +903,158 @@ function mergeDequeuedBatchContent( }); } +function getPrimaryQueueMessageItem(items: QueueItem[]): QueueItem | null { + for (const item of items) { + if (item.kind === "message") { + return item; + } + } + return null; +} + +function buildQueuedTurnMessage( + runtime: ListenerRuntime, + batch: DequeuedBatch, +): IncomingMessage | null { + const primaryItem = getPrimaryQueueMessageItem(batch.items); + if (!primaryItem) { + for (const item of batch.items) { + runtime.queuedMessagesByItemId.delete(item.id); + } + return null; + } + + const template = runtime.queuedMessagesByItemId.get(primaryItem.id); + for (const item of batch.items) { + runtime.queuedMessagesByItemId.delete(item.id); + } + if (!template) { + return null; + } + + const mergedContent = mergeDequeuedBatchContent(batch.items); + if (mergedContent === null) { + return null; + } + + const firstMessageIndex = template.messages.findIndex( + (payload): payload is MessageCreate & { client_message_id?: string } => + "content" in payload, + ); + if (firstMessageIndex === -1) { + return null; + } + + const firstMessage = template.messages[firstMessageIndex] as MessageCreate & { + client_message_id?: string; + }; + const mergedFirstMessage = { + ...firstMessage, + content: mergedContent, + }; + const messages = template.messages.slice(); + messages[firstMessageIndex] = mergedFirstMessage; + + return { + ...template, + messages, + }; +} + +function shouldQueueInboundMessage(parsed: IncomingMessage): boolean { + return parsed.messages.some((payload) => "content" in payload); +} + +function computeListenerQueueBlockedReason( + runtime: ListenerRuntime, +): QueueBlockedReason | null { + return getListenerBlockedReason({ + isProcessing: runtime.isProcessing, + pendingApprovalsLen: runtime.pendingApprovalResolvers.size, + cancelRequested: runtime.cancelRequested, + isRecoveringApprovals: runtime.isRecoveringApprovals, + }); +} + +async function drainQueuedMessages( + runtime: ListenerRuntime, + socket: WebSocket, + opts: StartListenerOptions, +): Promise { + if (runtime.queuePumpActive) { + return; + } + + runtime.queuePumpActive = true; + try { + while (true) { + if (runtime !== activeRuntime || runtime.intentionallyClosed) { + return; + } + + const blockedReason = computeListenerQueueBlockedReason(runtime); + if (blockedReason) { + runtime.queueRuntime.tryDequeue(blockedReason); + return; + } + + const queueLen = runtime.queueRuntime.length; + if (queueLen === 0) { + return; + } + + const dequeuedBatch = runtime.queueRuntime.consumeItems(queueLen); + if (!dequeuedBatch) { + return; + } + + const queuedTurn = buildQueuedTurnMessage(runtime, dequeuedBatch); + if (!queuedTurn) { + continue; + } + + opts.onStatusChange?.("receiving", opts.connectionId); + await handleIncomingMessage( + queuedTurn, + socket, + runtime, + opts.onStatusChange, + opts.connectionId, + dequeuedBatch.batchId, + ); + opts.onStatusChange?.("idle", opts.connectionId); + } + } finally { + runtime.queuePumpActive = false; + } +} + +function scheduleQueuePump( + runtime: ListenerRuntime, + socket: WebSocket, + opts: StartListenerOptions, +): void { + if (runtime.queuePumpScheduled) { + return; + } + runtime.queuePumpScheduled = true; + runtime.messageQueue = runtime.messageQueue + .then(async () => { + runtime.queuePumpScheduled = false; + if (runtime !== activeRuntime || runtime.intentionallyClosed) { + return; + } + await drainQueuedMessages(runtime, socket, opts); + }) + .catch((error: unknown) => { + runtime.queuePumpScheduled = false; + if (process.env.DEBUG) { + console.error("[Listen] Error in queue pump:", error); + } + opts.onStatusChange?.("idle", opts.connectionId); + }); +} + function buildStateResponse( runtime: ListenerRuntime, stateSeq: number, @@ -2362,7 +2526,9 @@ async function connectWithRetry( if (runtime !== activeRuntime || runtime.intentionallyClosed) { return; } - resolvePendingApprovalResolver(runtime, parsed.response); + if (resolvePendingApprovalResolver(runtime, parsed.response)) { + scheduleQueuePump(runtime, socket, opts); + } return; } @@ -2492,6 +2658,7 @@ async function connectWithRetry( accepted: true, runId: requestedRunId, }); + scheduleQueuePump(runtime, socket, opts); return; } @@ -2547,7 +2714,6 @@ async function connectWithRetry( // Serialize recovery with normal message handling to avoid concurrent // handleIncomingMessage execution when user messages arrive concurrently. - runtime.pendingTurns++; runtime.messageQueue = runtime.messageQueue .then(async () => { try { @@ -2569,10 +2735,7 @@ async function connectWithRetry( conversation_id: runtime.activeConversationId ?? undefined, }); } finally { - runtime.pendingTurns--; - if (runtime.pendingTurns === 0) { - runtime.queueRuntime.resetBlockedState(); - } + scheduleQueuePump(runtime, socket, opts); } }) .catch((error: unknown) => { @@ -2606,108 +2769,47 @@ async function connectWithRetry( return; } - // Queue lifecycle tracking: only enqueue user MessageCreate payloads. - const firstPayload = parsed.messages.at(0); - const isUserMessage = - firstPayload !== undefined && "content" in firstPayload; - let enqueuedQueueItemId: string | null = null; - if (isUserMessage) { - const userPayload = firstPayload as MessageCreate & { - client_message_id?: string; - }; - const enqueuedItem = runtime.queueRuntime.enqueue({ - kind: "message", - source: "user", - content: userPayload.content, - clientMessageId: - userPayload.client_message_id ?? `cm-submit-${crypto.randomUUID()}`, - agentId: parsed.agentId ?? undefined, - conversationId: parsed.conversationId || "default", - } as Parameters[0]); - enqueuedQueueItemId = enqueuedItem?.id ?? null; - // Emit blocked on state transition when turns are already queued. - // pendingTurns is incremented synchronously (below) before .then(), - // so a second arrival always sees the correct count. - if (runtime.pendingTurns > 0) { - runtime.queueRuntime.tryDequeue("runtime_busy"); + if (shouldQueueInboundMessage(parsed)) { + const firstUserPayload = parsed.messages.find( + ( + payload, + ): payload is MessageCreate & { client_message_id?: string } => + "content" in payload, + ); + if (firstUserPayload) { + const enqueuedItem = runtime.queueRuntime.enqueue({ + kind: "message", + source: "user", + content: firstUserPayload.content, + clientMessageId: + firstUserPayload.client_message_id ?? + `cm-submit-${crypto.randomUUID()}`, + agentId: parsed.agentId ?? undefined, + conversationId: parsed.conversationId || "default", + } as Parameters[0]); + if (enqueuedItem) { + runtime.queuedMessagesByItemId.set(enqueuedItem.id, parsed); + } } + scheduleQueuePump(runtime, socket, opts); + return; } - // Increment synchronously before chaining to avoid scheduling races - runtime.pendingTurns++; runtime.messageQueue = runtime.messageQueue .then(async () => { if (runtime !== activeRuntime || runtime.intentionallyClosed) { - runtime.pendingTurns--; return; } - - let messageForTurn = parsed; - let dequeuedBatchId: string | null = null; - if (isUserMessage && enqueuedQueueItemId) { - if (runtime.coalescedSkipQueueItemIds.has(enqueuedQueueItemId)) { - runtime.coalescedSkipQueueItemIds.delete(enqueuedQueueItemId); - runtime.pendingTurns--; - if (runtime.pendingTurns === 0) { - runtime.queueRuntime.resetBlockedState(); - } - return; - } - - const dequeuedBatch = runtime.queueRuntime.tryDequeue(null); - if (!dequeuedBatch) { - runtime.pendingTurns--; - if (runtime.pendingTurns === 0) { - runtime.queueRuntime.resetBlockedState(); - } - return; - } - - dequeuedBatchId = dequeuedBatch.batchId; - for (const item of dequeuedBatch.items) { - if (item.id !== enqueuedQueueItemId) { - runtime.coalescedSkipQueueItemIds.add(item.id); - } - } - - const mergedContent = mergeDequeuedBatchContent( - dequeuedBatch.items, - ); - if (mergedContent !== null) { - const firstMessage = parsed.messages.at(0); - if (firstMessage && "content" in firstMessage) { - const mergedFirstMessage = { - ...firstMessage, - content: mergedContent, - }; - messageForTurn = { - ...parsed, - messages: [mergedFirstMessage, ...parsed.messages.slice(1)], - }; - } - } - } - - // onStatusChange("receiving") is inside try so that any throw - // still reaches the finally and decrements pendingTurns. - try { - opts.onStatusChange?.("receiving", opts.connectionId); - await handleIncomingMessage( - messageForTurn, - socket, - runtime, - opts.onStatusChange, - opts.connectionId, - dequeuedBatchId ?? `batch-direct-${crypto.randomUUID()}`, - ); - opts.onStatusChange?.("idle", opts.connectionId); - } finally { - runtime.pendingTurns--; - // Reset blocked state only when queue is fully drained - if (runtime.pendingTurns === 0) { - runtime.queueRuntime.resetBlockedState(); - } - } + opts.onStatusChange?.("receiving", opts.connectionId); + await handleIncomingMessage( + parsed, + socket, + runtime, + opts.onStatusChange, + opts.connectionId, + ); + opts.onStatusChange?.("idle", opts.connectionId); + scheduleQueuePump(runtime, socket, opts); }) .catch((error: unknown) => { if (process.env.DEBUG) { @@ -2731,7 +2833,7 @@ async function connectWithRetry( // Single authoritative queue_cleared emission for all close paths // (intentional and unintentional). Must fire before early returns. - runtime.coalescedSkipQueueItemIds.clear(); + runtime.queuedMessagesByItemId.clear(); runtime.queueRuntime.clear("shutdown"); if (process.env.DEBUG) {