fix: listener queue parity pump (#1338)

This commit is contained in:
Charles Packer
2026-03-10 16:33:56 -07:00
committed by GitHub
parent a69fbfe877
commit 382acacc7e
3 changed files with 273 additions and 110 deletions

View File

@@ -0,0 +1,43 @@
import { describe, expect, test } from "bun:test";
import { getListenerBlockedReason } from "../../websocket/helpers/listenerQueueAdapter";
const allClear = {
isProcessing: false,
pendingApprovalsLen: 0,
cancelRequested: false,
isRecoveringApprovals: false,
} as const;
describe("getListenerBlockedReason", () => {
test("returns null when unblocked", () => {
expect(getListenerBlockedReason(allClear)).toBeNull();
});
test("prioritizes pending approvals", () => {
expect(
getListenerBlockedReason({ ...allClear, pendingApprovalsLen: 2 }),
).toBe("pending_approvals");
});
test("prioritizes interrupt over runtime busy", () => {
expect(
getListenerBlockedReason({
...allClear,
cancelRequested: true,
isProcessing: true,
}),
).toBe("interrupt_in_progress");
});
test("maps recoveries to runtime busy", () => {
expect(
getListenerBlockedReason({ ...allClear, isRecoveringApprovals: true }),
).toBe("runtime_busy");
});
test("maps active processing to runtime busy", () => {
expect(getListenerBlockedReason({ ...allClear, isProcessing: true })).toBe(
"runtime_busy",
);
});
});

View File

@@ -0,0 +1,18 @@
import type { QueueBlockedReason } from "../../types/protocol";
export type ListenerQueueGatingConditions = {
isProcessing: boolean;
pendingApprovalsLen: number;
cancelRequested: boolean;
isRecoveringApprovals: boolean;
};
export function getListenerBlockedReason(
c: ListenerQueueGatingConditions,
): QueueBlockedReason | null {
if (c.pendingApprovalsLen > 0) return "pending_approvals";
if (c.cancelRequested) return "interrupt_in_progress";
if (c.isRecoveringApprovals) return "runtime_busy";
if (c.isProcessing) return "runtime_busy";
return null;
}

View File

@@ -39,7 +39,12 @@ import { drainStreamWithResume } from "../cli/helpers/stream";
import { INTERRUPTED_BY_USER } from "../constants";
import { computeDiffPreviews } from "../helpers/diffPreview";
import { permissionMode } from "../permissions/mode";
import { type QueueItem, QueueRuntime } from "../queue/queueRuntime";
import {
type DequeuedBatch,
type QueueBlockedReason,
type QueueItem,
QueueRuntime,
} from "../queue/queueRuntime";
import { mergeQueuedTurnInput } from "../queue/turnQueueRuntime";
import {
buildSharedReminderParts,
@@ -72,6 +77,7 @@ import type {
TranscriptBackfillMessage,
TranscriptSupplementMessage,
} from "../types/protocol";
import { getListenerBlockedReason } from "./helpers/listenerQueueAdapter";
interface StartListenerOptions {
connectionId: string;
@@ -317,13 +323,13 @@ type ListenerRuntime = {
cancelRequested: boolean;
/** Queue lifecycle tracking — parallel tracking layer, does not affect message processing. */
queueRuntime: QueueRuntime;
/**
* Queue item IDs that were coalesced into an earlier dequeued batch.
* Their already-scheduled promise-chain callbacks should no-op.
*/
coalescedSkipQueueItemIds: Set<string>;
/** Count of turns currently queued or in-flight in the promise chain. Incremented
* synchronously on message arrival (before .then()) to avoid async scheduling races. */
/** Correlates queued queue item ids to original inbound frames. */
queuedMessagesByItemId: Map<string, IncomingMessage>;
/** True while a queue drain pass is actively running. */
queuePumpActive: boolean;
/** Dedupes queue pump scheduling onto messageQueue chain. */
queuePumpScheduled: boolean;
/** Queue backlog metric for state snapshot visibility. */
pendingTurns: number;
/** Optional debug hook for WS event logging. */
onWsEvent?: StartListenerOptions["onWsEvent"];
@@ -546,7 +552,9 @@ function createRuntime(): ListenerRuntime {
reminderState: createSharedReminderState(),
bootWorkingDirectory,
workingDirectoryByConversation: new Map<string, string>(),
coalescedSkipQueueItemIds: new Set<string>(),
queuedMessagesByItemId: new Map<string, IncomingMessage>(),
queuePumpActive: false,
queuePumpScheduled: false,
pendingTurns: 0,
// queueRuntime assigned below — needs runtime ref in callbacks
queueRuntime: null as unknown as QueueRuntime,
@@ -554,6 +562,7 @@ function createRuntime(): ListenerRuntime {
runtime.queueRuntime = new QueueRuntime({
callbacks: {
onEnqueued: (item, queueLen) => {
runtime.pendingTurns = queueLen;
if (runtime.socket?.readyState === WebSocket.OPEN) {
const content = item.kind === "message" ? item.content : item.text;
emitToWS(runtime.socket, {
@@ -573,6 +582,7 @@ function createRuntime(): ListenerRuntime {
}
},
onDequeued: (batch) => {
runtime.pendingTurns = batch.queueLenAfter;
if (runtime.socket?.readyState === WebSocket.OPEN) {
emitToWS(runtime.socket, {
type: "queue_batch_dequeued",
@@ -599,6 +609,7 @@ function createRuntime(): ListenerRuntime {
}
},
onCleared: (reason, clearedCount, items) => {
runtime.pendingTurns = 0;
if (runtime.socket?.readyState === WebSocket.OPEN) {
emitToWS(runtime.socket, {
type: "queue_cleared",
@@ -611,6 +622,7 @@ function createRuntime(): ListenerRuntime {
}
},
onDropped: (item, reason, queueLen) => {
runtime.pendingTurns = queueLen;
if (runtime.socket?.readyState === WebSocket.OPEN) {
emitToWS(runtime.socket, {
type: "queue_item_dropped",
@@ -891,6 +903,158 @@ function mergeDequeuedBatchContent(
});
}
function getPrimaryQueueMessageItem(items: QueueItem[]): QueueItem | null {
for (const item of items) {
if (item.kind === "message") {
return item;
}
}
return null;
}
function buildQueuedTurnMessage(
runtime: ListenerRuntime,
batch: DequeuedBatch,
): IncomingMessage | null {
const primaryItem = getPrimaryQueueMessageItem(batch.items);
if (!primaryItem) {
for (const item of batch.items) {
runtime.queuedMessagesByItemId.delete(item.id);
}
return null;
}
const template = runtime.queuedMessagesByItemId.get(primaryItem.id);
for (const item of batch.items) {
runtime.queuedMessagesByItemId.delete(item.id);
}
if (!template) {
return null;
}
const mergedContent = mergeDequeuedBatchContent(batch.items);
if (mergedContent === null) {
return null;
}
const firstMessageIndex = template.messages.findIndex(
(payload): payload is MessageCreate & { client_message_id?: string } =>
"content" in payload,
);
if (firstMessageIndex === -1) {
return null;
}
const firstMessage = template.messages[firstMessageIndex] as MessageCreate & {
client_message_id?: string;
};
const mergedFirstMessage = {
...firstMessage,
content: mergedContent,
};
const messages = template.messages.slice();
messages[firstMessageIndex] = mergedFirstMessage;
return {
...template,
messages,
};
}
function shouldQueueInboundMessage(parsed: IncomingMessage): boolean {
return parsed.messages.some((payload) => "content" in payload);
}
function computeListenerQueueBlockedReason(
runtime: ListenerRuntime,
): QueueBlockedReason | null {
return getListenerBlockedReason({
isProcessing: runtime.isProcessing,
pendingApprovalsLen: runtime.pendingApprovalResolvers.size,
cancelRequested: runtime.cancelRequested,
isRecoveringApprovals: runtime.isRecoveringApprovals,
});
}
async function drainQueuedMessages(
runtime: ListenerRuntime,
socket: WebSocket,
opts: StartListenerOptions,
): Promise<void> {
if (runtime.queuePumpActive) {
return;
}
runtime.queuePumpActive = true;
try {
while (true) {
if (runtime !== activeRuntime || runtime.intentionallyClosed) {
return;
}
const blockedReason = computeListenerQueueBlockedReason(runtime);
if (blockedReason) {
runtime.queueRuntime.tryDequeue(blockedReason);
return;
}
const queueLen = runtime.queueRuntime.length;
if (queueLen === 0) {
return;
}
const dequeuedBatch = runtime.queueRuntime.consumeItems(queueLen);
if (!dequeuedBatch) {
return;
}
const queuedTurn = buildQueuedTurnMessage(runtime, dequeuedBatch);
if (!queuedTurn) {
continue;
}
opts.onStatusChange?.("receiving", opts.connectionId);
await handleIncomingMessage(
queuedTurn,
socket,
runtime,
opts.onStatusChange,
opts.connectionId,
dequeuedBatch.batchId,
);
opts.onStatusChange?.("idle", opts.connectionId);
}
} finally {
runtime.queuePumpActive = false;
}
}
function scheduleQueuePump(
runtime: ListenerRuntime,
socket: WebSocket,
opts: StartListenerOptions,
): void {
if (runtime.queuePumpScheduled) {
return;
}
runtime.queuePumpScheduled = true;
runtime.messageQueue = runtime.messageQueue
.then(async () => {
runtime.queuePumpScheduled = false;
if (runtime !== activeRuntime || runtime.intentionallyClosed) {
return;
}
await drainQueuedMessages(runtime, socket, opts);
})
.catch((error: unknown) => {
runtime.queuePumpScheduled = false;
if (process.env.DEBUG) {
console.error("[Listen] Error in queue pump:", error);
}
opts.onStatusChange?.("idle", opts.connectionId);
});
}
function buildStateResponse(
runtime: ListenerRuntime,
stateSeq: number,
@@ -2362,7 +2526,9 @@ async function connectWithRetry(
if (runtime !== activeRuntime || runtime.intentionallyClosed) {
return;
}
resolvePendingApprovalResolver(runtime, parsed.response);
if (resolvePendingApprovalResolver(runtime, parsed.response)) {
scheduleQueuePump(runtime, socket, opts);
}
return;
}
@@ -2492,6 +2658,7 @@ async function connectWithRetry(
accepted: true,
runId: requestedRunId,
});
scheduleQueuePump(runtime, socket, opts);
return;
}
@@ -2547,7 +2714,6 @@ async function connectWithRetry(
// Serialize recovery with normal message handling to avoid concurrent
// handleIncomingMessage execution when user messages arrive concurrently.
runtime.pendingTurns++;
runtime.messageQueue = runtime.messageQueue
.then(async () => {
try {
@@ -2569,10 +2735,7 @@ async function connectWithRetry(
conversation_id: runtime.activeConversationId ?? undefined,
});
} finally {
runtime.pendingTurns--;
if (runtime.pendingTurns === 0) {
runtime.queueRuntime.resetBlockedState();
}
scheduleQueuePump(runtime, socket, opts);
}
})
.catch((error: unknown) => {
@@ -2606,108 +2769,47 @@ async function connectWithRetry(
return;
}
// Queue lifecycle tracking: only enqueue user MessageCreate payloads.
const firstPayload = parsed.messages.at(0);
const isUserMessage =
firstPayload !== undefined && "content" in firstPayload;
let enqueuedQueueItemId: string | null = null;
if (isUserMessage) {
const userPayload = firstPayload as MessageCreate & {
client_message_id?: string;
};
const enqueuedItem = runtime.queueRuntime.enqueue({
kind: "message",
source: "user",
content: userPayload.content,
clientMessageId:
userPayload.client_message_id ?? `cm-submit-${crypto.randomUUID()}`,
agentId: parsed.agentId ?? undefined,
conversationId: parsed.conversationId || "default",
} as Parameters<typeof runtime.queueRuntime.enqueue>[0]);
enqueuedQueueItemId = enqueuedItem?.id ?? null;
// Emit blocked on state transition when turns are already queued.
// pendingTurns is incremented synchronously (below) before .then(),
// so a second arrival always sees the correct count.
if (runtime.pendingTurns > 0) {
runtime.queueRuntime.tryDequeue("runtime_busy");
if (shouldQueueInboundMessage(parsed)) {
const firstUserPayload = parsed.messages.find(
(
payload,
): payload is MessageCreate & { client_message_id?: string } =>
"content" in payload,
);
if (firstUserPayload) {
const enqueuedItem = runtime.queueRuntime.enqueue({
kind: "message",
source: "user",
content: firstUserPayload.content,
clientMessageId:
firstUserPayload.client_message_id ??
`cm-submit-${crypto.randomUUID()}`,
agentId: parsed.agentId ?? undefined,
conversationId: parsed.conversationId || "default",
} as Parameters<typeof runtime.queueRuntime.enqueue>[0]);
if (enqueuedItem) {
runtime.queuedMessagesByItemId.set(enqueuedItem.id, parsed);
}
}
scheduleQueuePump(runtime, socket, opts);
return;
}
// Increment synchronously before chaining to avoid scheduling races
runtime.pendingTurns++;
runtime.messageQueue = runtime.messageQueue
.then(async () => {
if (runtime !== activeRuntime || runtime.intentionallyClosed) {
runtime.pendingTurns--;
return;
}
let messageForTurn = parsed;
let dequeuedBatchId: string | null = null;
if (isUserMessage && enqueuedQueueItemId) {
if (runtime.coalescedSkipQueueItemIds.has(enqueuedQueueItemId)) {
runtime.coalescedSkipQueueItemIds.delete(enqueuedQueueItemId);
runtime.pendingTurns--;
if (runtime.pendingTurns === 0) {
runtime.queueRuntime.resetBlockedState();
}
return;
}
const dequeuedBatch = runtime.queueRuntime.tryDequeue(null);
if (!dequeuedBatch) {
runtime.pendingTurns--;
if (runtime.pendingTurns === 0) {
runtime.queueRuntime.resetBlockedState();
}
return;
}
dequeuedBatchId = dequeuedBatch.batchId;
for (const item of dequeuedBatch.items) {
if (item.id !== enqueuedQueueItemId) {
runtime.coalescedSkipQueueItemIds.add(item.id);
}
}
const mergedContent = mergeDequeuedBatchContent(
dequeuedBatch.items,
);
if (mergedContent !== null) {
const firstMessage = parsed.messages.at(0);
if (firstMessage && "content" in firstMessage) {
const mergedFirstMessage = {
...firstMessage,
content: mergedContent,
};
messageForTurn = {
...parsed,
messages: [mergedFirstMessage, ...parsed.messages.slice(1)],
};
}
}
}
// onStatusChange("receiving") is inside try so that any throw
// still reaches the finally and decrements pendingTurns.
try {
opts.onStatusChange?.("receiving", opts.connectionId);
await handleIncomingMessage(
messageForTurn,
socket,
runtime,
opts.onStatusChange,
opts.connectionId,
dequeuedBatchId ?? `batch-direct-${crypto.randomUUID()}`,
);
opts.onStatusChange?.("idle", opts.connectionId);
} finally {
runtime.pendingTurns--;
// Reset blocked state only when queue is fully drained
if (runtime.pendingTurns === 0) {
runtime.queueRuntime.resetBlockedState();
}
}
opts.onStatusChange?.("receiving", opts.connectionId);
await handleIncomingMessage(
parsed,
socket,
runtime,
opts.onStatusChange,
opts.connectionId,
);
opts.onStatusChange?.("idle", opts.connectionId);
scheduleQueuePump(runtime, socket, opts);
})
.catch((error: unknown) => {
if (process.env.DEBUG) {
@@ -2731,7 +2833,7 @@ async function connectWithRetry(
// Single authoritative queue_cleared emission for all close paths
// (intentional and unintentional). Must fire before early returns.
runtime.coalescedSkipQueueItemIds.clear();
runtime.queuedMessagesByItemId.clear();
runtime.queueRuntime.clear("shutdown");
if (process.env.DEBUG) {