diff --git a/src/tests/websocket/listen-queue-events.test.ts b/src/tests/websocket/listen-queue-events.test.ts new file mode 100644 index 0000000..d082397 --- /dev/null +++ b/src/tests/websocket/listen-queue-events.test.ts @@ -0,0 +1,276 @@ +/** + * Integration-level tests for PRQ5: queue lifecycle event emission in + * listen-client.ts. + * + * These tests drive QueueRuntime directly, mirroring the wiring pattern in + * listen-client to verify: + * - Single message: enqueued → dequeued, no blocked, real queue_len + * - Two rapid synchronous arrivals: second gets blocked(runtime_busy) + * because pendingTurns is incremented before the .then() chain + * - Connection close: queue_cleared("shutdown") emitted once + * - Per-turn error: no queue_cleared — queue continues for remaining turns + * - ApprovalCreate payloads (no `content` field) are not enqueued + * - QueueLifecycleEvent is assignable to WsProtocolEvent (type-level) + */ + +import { describe, expect, test } from "bun:test"; +import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents"; +import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/messages"; +import type { + DequeuedBatch, + QueueBlockedReason, + QueueClearedReason, + QueueItem, +} from "../../queue/queueRuntime"; +import { QueueRuntime } from "../../queue/queueRuntime"; +import type { QueueLifecycleEvent } from "../../types/protocol"; +import type { WsProtocolEvent } from "../../websocket/listen-client"; + +// ── Type-level assertion: QueueLifecycleEvent ⊆ WsProtocolEvent ── +// Imports the real WsProtocolEvent from listen-client. If QueueLifecycleEvent +// is ever removed from that union, this assertion fails at compile time. +type _AssertAssignable = QueueLifecycleEvent extends WsProtocolEvent + ? true + : never; +const _typeCheck: _AssertAssignable = true; +void _typeCheck; // suppress unused warning + +// ── Helpers ─────────────────────────────────────────────────────── + +type Recorded = { + enqueued: Array<{ item: QueueItem; queueLen: number }>; + dequeued: DequeuedBatch[]; + blocked: Array<{ reason: QueueBlockedReason; queueLen: number }>; + cleared: Array<{ reason: QueueClearedReason; count: number }>; +}; + +function buildRuntime(): { q: QueueRuntime; rec: Recorded } { + const rec: Recorded = { + enqueued: [], + dequeued: [], + blocked: [], + cleared: [], + }; + const q = new QueueRuntime({ + callbacks: { + onEnqueued: (item, queueLen) => rec.enqueued.push({ item, queueLen }), + onDequeued: (batch) => rec.dequeued.push(batch), + onBlocked: (reason, queueLen) => rec.blocked.push({ reason, queueLen }), + onCleared: (reason, count) => rec.cleared.push({ reason, count }), + }, + }); + return { q, rec }; +} + +/** Mirrors listen-client message arrival logic for a MessageCreate payload. */ +function simulateMessageArrival( + q: QueueRuntime, + pendingTurnsRef: { value: number }, + payload: MessageCreate | ApprovalCreate, +): boolean { + const isUserMessage = "content" in payload; + if (isUserMessage) { + q.enqueue({ + kind: "message", + source: "user", + content: (payload as MessageCreate).content, + } as Parameters[0]); + if (pendingTurnsRef.value > 0) { + q.tryDequeue("runtime_busy"); + } + } + pendingTurnsRef.value++; // synchronous before .then() + return isUserMessage; +} + +/** Mirrors the start of the .then() chain callback. */ +function simulateTurnStart( + q: QueueRuntime, + _pendingTurnsRef: { value: number }, + isUserMessage: boolean, +): void { + if (isUserMessage) q.consumeItems(1); +} + +/** Mirrors the finally block. */ +function simulateTurnEnd( + q: QueueRuntime, + pendingTurnsRef: { value: number }, +): void { + pendingTurnsRef.value--; + if (pendingTurnsRef.value === 0) q.resetBlockedState(); +} + +function makeMessageCreate(text = "hello"): MessageCreate { + return { role: "user", content: text } as unknown as MessageCreate; +} + +function makeApprovalCreate(): ApprovalCreate { + // ApprovalCreate does NOT have a `content` field — used for legacy approval path + return { type: "approval", approvals: [] } as unknown as ApprovalCreate; +} + +// ── Tests ───────────────────────────────────────────────────────── + +describe("single message — idle path", () => { + test("enqueued → dequeued, no blocked, real queue_len values", () => { + const { q, rec } = buildRuntime(); + const turns = { value: 0 }; + + const isUser = simulateMessageArrival(q, turns, makeMessageCreate()); + expect(rec.enqueued).toHaveLength(1); + expect(rec.enqueued.at(0)?.queueLen).toBe(1); + expect(rec.blocked).toHaveLength(0); + + simulateTurnStart(q, turns, isUser); + expect(rec.dequeued).toHaveLength(1); + expect(rec.dequeued.at(0)?.mergedCount).toBe(1); + expect(rec.dequeued.at(0)?.queueLenAfter).toBe(0); + + simulateTurnEnd(q, turns); + expect(turns.value).toBe(0); + expect(q.length).toBe(0); + }); +}); + +describe("two rapid messages — busy path", () => { + test("second arrival gets blocked(runtime_busy) due to sync pendingTurns", () => { + const { q, rec } = buildRuntime(); + const turns = { value: 0 }; + + // First message arrives + const isUser1 = simulateMessageArrival( + q, + turns, + makeMessageCreate("first"), + ); + expect(turns.value).toBe(1); // synchronously incremented + expect(rec.blocked).toHaveLength(0); // was 0 at arrival + + // Second message arrives BEFORE first turn's .then() runs + const isUser2 = simulateMessageArrival( + q, + turns, + makeMessageCreate("second"), + ); + expect(turns.value).toBe(2); + expect(rec.blocked).toHaveLength(1); + expect(rec.blocked.at(0)?.reason).toBe("runtime_busy"); + expect(rec.blocked.at(0)?.queueLen).toBe(2); // both enqueued + + // First turn runs + simulateTurnStart(q, turns, isUser1); + expect(rec.dequeued).toHaveLength(1); + expect(rec.dequeued.at(0)?.mergedCount).toBe(1); + simulateTurnEnd(q, turns); + expect(turns.value).toBe(1); // second still pending + + // Second turn runs + simulateTurnStart(q, turns, isUser2); + expect(rec.dequeued).toHaveLength(2); + expect(rec.dequeued.at(1)?.mergedCount).toBe(1); + expect(rec.dequeued.at(1)?.queueLenAfter).toBe(0); + simulateTurnEnd(q, turns); + expect(turns.value).toBe(0); + }); + + test("blocked fires only once for same reason; resets when fully drained", () => { + const { q, rec } = buildRuntime(); + const turns = { value: 0 }; + + simulateMessageArrival(q, turns, makeMessageCreate("a")); + simulateMessageArrival(q, turns, makeMessageCreate("b")); // blocked + simulateMessageArrival(q, turns, makeMessageCreate("c")); // same reason — no extra blocked + expect(rec.blocked).toHaveLength(1); + + // Drain all three + for (let i = 0; i < 3; i++) { + simulateTurnStart(q, turns, true); + simulateTurnEnd(q, turns); + } + expect(turns.value).toBe(0); + + // New arrival after full drain — should be idle (no blocked) + simulateMessageArrival(q, turns, makeMessageCreate("d")); + expect(rec.blocked).toHaveLength(1); // still just the original one + }); +}); + +describe("pendingTurns safety — always decremented", () => { + test("pendingTurns decrements even when simulateTurnStart would throw", () => { + // Mirrors the production fix: onStatusChange("receiving") moved inside try + // so the finally always fires. Here we verify that the turn-end path + // (finally equivalent) always restores pendingTurns to 0. + const { q } = buildRuntime(); + const turns = { value: 0 }; + + simulateMessageArrival(q, turns, makeMessageCreate("msg")); + expect(turns.value).toBe(1); + + // Simulate: consumeItems fires, then an error before handleIncomingMessage + q.consumeItems(1); + // finally fires (error path) + simulateTurnEnd(q, turns); + expect(turns.value).toBe(0); // not leaked + expect(q.length).toBe(0); + }); +}); + +describe("ApprovalCreate payloads", () => { + test("ApprovalCreate is not enqueued (no content field)", () => { + const { q, rec } = buildRuntime(); + const turns = { value: 0 }; + + const isUser = simulateMessageArrival(q, turns, makeApprovalCreate()); + expect(isUser).toBe(false); + expect(rec.enqueued).toHaveLength(0); + expect(turns.value).toBe(1); // pendingTurns still increments + + // No consumeItems called in .then() + simulateTurnStart(q, turns, isUser); + expect(rec.dequeued).toHaveLength(0); + simulateTurnEnd(q, turns); + expect(turns.value).toBe(0); + }); +}); + +describe("connection close", () => { + test("clear(shutdown) emits queue_cleared exactly once for intentional close", () => { + const { q, rec } = buildRuntime(); + q.clear("shutdown"); + expect(rec.cleared).toHaveLength(1); + expect(rec.cleared.at(0)?.reason).toBe("shutdown"); + expect(rec.cleared.at(0)?.count).toBe(0); + }); + + test("clear(shutdown) emits with correct count when items are pending", () => { + const { q, rec } = buildRuntime(); + const turns = { value: 0 }; + simulateMessageArrival(q, turns, makeMessageCreate("pending")); + q.clear("shutdown"); // connection closed before turn ran + expect(rec.cleared.at(0)?.count).toBe(1); + expect(q.length).toBe(0); + }); +}); + +describe("per-turn error — no queue_cleared", () => { + test("turn error only decrements pendingTurns; remaining turns still dequeue", () => { + const { q, rec } = buildRuntime(); + const turns = { value: 0 }; + + simulateMessageArrival(q, turns, makeMessageCreate("first")); + simulateMessageArrival(q, turns, makeMessageCreate("second")); + + // First turn: simulate error — finally still runs + simulateTurnStart(q, turns, true); + simulateTurnEnd(q, turns); // error path still hits finally + expect(rec.cleared).toHaveLength(0); // no queue_cleared + + // Second turn still runs + simulateTurnStart(q, turns, true); + expect(rec.dequeued).toHaveLength(2); + simulateTurnEnd(q, turns); + expect(turns.value).toBe(0); + expect(rec.cleared).toHaveLength(0); // still no queue_cleared + }); +}); diff --git a/src/websocket/listen-client.ts b/src/websocket/listen-client.ts index fb90034..37721a0 100644 --- a/src/websocket/listen-client.ts +++ b/src/websocket/listen-client.ts @@ -31,6 +31,7 @@ import { generatePlanFilePath } from "../cli/helpers/planName"; import { drainStreamWithResume } from "../cli/helpers/stream"; import { computeDiffPreviews } from "../helpers/diffPreview"; import { permissionMode } from "../permissions/mode"; +import { QueueRuntime } from "../queue/queueRuntime"; import { settingsManager } from "../settings-manager"; import { isInteractiveApprovalTool } from "../tools/interactivePolicy"; import { loadTools } from "../tools/manager"; @@ -42,6 +43,7 @@ import type { ErrorMessage, MessageWire, ResultMessage as ProtocolResultMessage, + QueueLifecycleEvent, RecoveryMessage, RetryMessage, StopReasonType, @@ -140,6 +142,11 @@ type ListenerRuntime = { controlResponseCapable: boolean; /** Stable session ID for MessageEnvelope-based emissions (scoped to runtime lifecycle). */ sessionId: string; + /** Queue lifecycle tracking — parallel tracking layer, does not affect message processing. */ + queueRuntime: QueueRuntime; + /** Count of turns currently queued or in-flight in the promise chain. Incremented + * synchronously on message arrival (before .then()) to avoid async scheduling races. */ + pendingTurns: number; }; type ApprovalSlot = @@ -192,7 +199,7 @@ const INITIAL_RETRY_DELAY_MS = 1000; // 1 second const MAX_RETRY_DELAY_MS = 30000; // 30 seconds function createRuntime(): ListenerRuntime { - return { + const runtime: ListenerRuntime = { socket: null, heartbeatInterval: null, reconnectTimeout: null, @@ -202,7 +209,63 @@ function createRuntime(): ListenerRuntime { pendingApprovalResolvers: new Map(), controlResponseCapable: false, sessionId: `listen-${crypto.randomUUID()}`, + pendingTurns: 0, + // queueRuntime assigned below — needs runtime ref in callbacks + queueRuntime: null as unknown as QueueRuntime, }; + runtime.queueRuntime = new QueueRuntime({ + callbacks: { + onEnqueued: (item, queueLen) => { + if (runtime.socket?.readyState === WebSocket.OPEN) { + emitToWS(runtime.socket, { + type: "queue_item_enqueued", + item_id: item.id, + source: item.source, + kind: item.kind, + queue_len: queueLen, + session_id: runtime.sessionId, + uuid: `q-enq-${item.id}`, + }); + } + }, + onDequeued: (batch) => { + if (runtime.socket?.readyState === WebSocket.OPEN) { + emitToWS(runtime.socket, { + type: "queue_batch_dequeued", + batch_id: batch.batchId, + item_ids: batch.items.map((i) => i.id), + merged_count: batch.mergedCount, + queue_len_after: batch.queueLenAfter, + session_id: runtime.sessionId, + uuid: `q-deq-${batch.batchId}`, + }); + } + }, + onBlocked: (reason, queueLen) => { + if (runtime.socket?.readyState === WebSocket.OPEN) { + emitToWS(runtime.socket, { + type: "queue_blocked", + reason, + queue_len: queueLen, + session_id: runtime.sessionId, + uuid: `q-blk-${crypto.randomUUID()}`, + }); + } + }, + onCleared: (reason, clearedCount) => { + if (runtime.socket?.readyState === WebSocket.OPEN) { + emitToWS(runtime.socket, { + type: "queue_cleared", + reason, + cleared_count: clearedCount, + session_id: runtime.sessionId, + uuid: `q-clr-${crypto.randomUUID()}`, + }); + } + }, + }, + }); + return runtime; } function clearRuntimeTimers(runtime: ListenerRuntime): void { @@ -303,13 +366,14 @@ function sendControlMessageOverWebSocket( // ── Typed protocol event adapter ──────────────────────────────── -type WsProtocolEvent = +export type WsProtocolEvent = | MessageWire | AutoApprovalMessage | ErrorMessage | RetryMessage | RecoveryMessage - | ProtocolResultMessage; + | ProtocolResultMessage + | QueueLifecycleEvent; /** * Single adapter for all outbound typed protocol events. @@ -672,21 +736,59 @@ async function connectWithRetry( // Handle incoming messages (queued for sequential processing) if (parsed.type === "message") { + // Queue lifecycle tracking: only enqueue if first payload is a + // MessageCreate (has `content`). ApprovalCreate payloads (legacy + // approval path) do not represent user-initiated messages. + const firstPayload = parsed.messages.at(0); + const isUserMessage = + firstPayload !== undefined && "content" in firstPayload; + if (isUserMessage) { + runtime.queueRuntime.enqueue({ + kind: "message", + source: "user", + content: (firstPayload as MessageCreate).content, + } as Parameters[0]); + // Emit blocked on state transition when turns are already queued. + // pendingTurns is incremented synchronously (below) before .then(), + // so a second arrival always sees the correct count. + if (runtime.pendingTurns > 0) { + runtime.queueRuntime.tryDequeue("runtime_busy"); + } + } + // Increment synchronously before chaining to avoid scheduling races + runtime.pendingTurns++; + runtime.messageQueue = runtime.messageQueue .then(async () => { if (runtime !== activeRuntime || runtime.intentionallyClosed) { + runtime.pendingTurns--; return; } - opts.onStatusChange?.("receiving", opts.connectionId); - await handleIncomingMessage( - parsed, - socket, - runtime, - opts.onStatusChange, - opts.connectionId, - ); - opts.onStatusChange?.("idle", opts.connectionId); + // Signal dequeue for exactly this one turn (one message per chain cb) + if (isUserMessage) { + runtime.queueRuntime.consumeItems(1); + } + + // onStatusChange("receiving") is inside try so that any throw + // still reaches the finally and decrements pendingTurns. + try { + opts.onStatusChange?.("receiving", opts.connectionId); + await handleIncomingMessage( + parsed, + socket, + runtime, + opts.onStatusChange, + opts.connectionId, + ); + opts.onStatusChange?.("idle", opts.connectionId); + } finally { + runtime.pendingTurns--; + // Reset blocked state only when queue is fully drained + if (runtime.pendingTurns === 0) { + runtime.queueRuntime.resetBlockedState(); + } + } }) .catch((error: unknown) => { if (process.env.DEBUG) { @@ -702,6 +804,10 @@ async function connectWithRetry( return; } + // Single authoritative queue_cleared emission for all close paths + // (intentional and unintentional). Must fire before early returns. + runtime.queueRuntime.clear("shutdown"); + if (process.env.DEBUG) { console.log( `[Listen] WebSocket disconnected (code: ${code}, reason: ${reason.toString()})`,