diff --git a/src/headless.ts b/src/headless.ts index ea2b62e..1808a99 100644 --- a/src/headless.ts +++ b/src/headless.ts @@ -74,6 +74,7 @@ import { } from "./cli/startupFlagValidation"; import { SYSTEM_REMINDER_CLOSE, SYSTEM_REMINDER_OPEN } from "./constants"; import { computeDiffPreviews } from "./helpers/diffPreview"; +import { QueueRuntime } from "./queue/queueRuntime"; import { mergeQueuedTurnInput, type QueuedTurnInput, @@ -106,6 +107,7 @@ import type { ErrorMessage, ListMessagesControlRequest, MessageWire, + QueueLifecycleEvent, RecoveryMessage, ResultMessage, RetryMessage, @@ -2478,6 +2480,116 @@ async function runBidirectionalMode( const lineQueue: string[] = []; let lineResolver: ((line: string | null) => void) | null = null; + // ── Queue lifecycle tracking (stream-json only) ──────────────── + // Bidirectional mode always runs under stream-json input format, so queue + // events are always emitted here. emitQueueEvent is a no-op guard retained + // for clarity and future-proofing against non-stream-json callers. + const emitQueueEvent = (e: QueueLifecycleEvent): void => { + console.log(JSON.stringify(e)); + }; + + let turnInProgress = false; + + const msgQueueRuntime = new QueueRuntime({ + callbacks: { + onEnqueued: (item, queueLen) => + emitQueueEvent({ + type: "queue_item_enqueued", + item_id: item.id, + source: item.source, + kind: item.kind, + queue_len: queueLen, + session_id: sessionId, + uuid: `q-enq-${item.id}`, + }), + onDequeued: (batch) => + emitQueueEvent({ + type: "queue_batch_dequeued", + batch_id: batch.batchId, + item_ids: batch.items.map((i) => i.id), + merged_count: batch.mergedCount, + queue_len_after: batch.queueLenAfter, + session_id: sessionId, + uuid: `q-deq-${batch.batchId}`, + }), + onCleared: (reason, clearedCount) => + emitQueueEvent({ + type: "queue_cleared", + reason, + cleared_count: clearedCount, + session_id: sessionId, + uuid: `q-clr-${randomUUID()}`, + }), + }, + }); + + /** + * Parses a raw JSON line and returns the queue item payload if it is a + * user message or task_notification. Returns null for control lines + * (control_request, control_response, etc.) and malformed JSON. + */ + function parseUserLine(raw: string): { + kind: "message" | "task_notification"; + content: string; + } | null { + if (!raw.trim()) return null; + try { + const parsed: { + type?: string; + message?: { content?: string }; + _queuedKind?: string; + } = JSON.parse(raw); + if (parsed.type !== "user" || parsed.message?.content === undefined) + return null; + const kind = + parsed._queuedKind === "task_notification" + ? "task_notification" + : "message"; + return { kind, content: parsed.message.content }; + } catch { + return null; + } + } + + /** + * Emit queue_blocked on the FIRST user/task line arrival during an active + * turn. Does NOT enqueue to msgQueueRuntime — that happens later, at the + * coalescing loop where consumption is certain (avoids orphaned items from + * the external-tool wait loop which drops non-matching lines silently). + */ + let blockedEmittedThisTurn = false; + function maybeNotifyBlocked(raw: string): void { + if (!turnInProgress || blockedEmittedThisTurn) return; + if (!parseUserLine(raw)) return; + blockedEmittedThisTurn = true; + // queue_len: count user/task items currently in lineQueue (best-effort) + const queueLen = lineQueue.filter((l) => parseUserLine(l) !== null).length; + emitQueueEvent({ + type: "queue_blocked", + reason: "runtime_busy", + queue_len: Math.max(1, queueLen), + session_id: sessionId, + uuid: `q-blk-${randomUUID()}`, + }); + } + + /** Enqueue a BidirectionalQueuedInput into msgQueueRuntime for lifecycle tracking. */ + function enqueueForTracking(input: BidirectionalQueuedInput): void { + if (input.kind === "task_notification") { + msgQueueRuntime.enqueue({ + kind: "task_notification", + source: "task_notification", + text: input.text, + } as Parameters[0]); + } else { + msgQueueRuntime.enqueue({ + kind: "message", + source: "user", + content: input.content, + } as Parameters[0]); + } + } + const serializeQueuedMessageAsUserLine = (queuedMessage: QueuedMessage) => JSON.stringify({ type: "user", @@ -2492,6 +2604,7 @@ async function runBidirectionalMode( // used by user input so bidirectional mode inherits TUI-style queue behavior. setMessageQueueAdder((queuedMessage) => { const syntheticUserLine = serializeQueuedMessageAsUserLine(queuedMessage); + maybeNotifyBlocked(syntheticUserLine); if (lineResolver) { const resolve = lineResolver; lineResolver = null; @@ -2503,6 +2616,7 @@ async function runBidirectionalMode( // Feed lines into queue or resolver rl.on("line", (line) => { + maybeNotifyBlocked(line); if (lineResolver) { const resolve = lineResolver; lineResolver = null; @@ -2514,6 +2628,7 @@ async function runBidirectionalMode( rl.on("close", () => { setMessageQueueAdder(null); + msgQueueRuntime.clear("shutdown"); if (lineResolver) { const resolve = lineResolver; lineResolver = null; @@ -2904,6 +3019,18 @@ async function runBidirectionalMode( break; } + // Enqueue consumed items into msgQueueRuntime for lifecycle tracking. + // Done here (not at arrival) to avoid orphaned items from the external- + // tool wait loop, which consumes non-matching lines via getNextLine() + // without deferring them back to lineQueue. + for (const input of queuedInputs) { + enqueueForTracking(input); + } + // Signal dequeue for exactly the items we just enqueued. consumeItems(n) + // bypasses QueueRuntime's internal coalescing policy so the count matches + // what the coalescing loop actually yielded. + msgQueueRuntime.consumeItems(queuedInputs.length); + const userContent = mergeBidirectionalQueuedInput(queuedInputs); if (userContent === null) { continue; @@ -2912,6 +3039,7 @@ async function runBidirectionalMode( // Create abort controller for this operation currentAbortController = new AbortController(); + turnInProgress = true; try { const buffers = createBuffers(agent.id); const startTime = performance.now(); @@ -3390,6 +3518,8 @@ async function runBidirectionalMode( }; console.log(JSON.stringify(errorResultMsg)); } finally { + turnInProgress = false; + blockedEmittedThisTurn = false; currentAbortController = null; } continue; diff --git a/src/queue/queueRuntime.ts b/src/queue/queueRuntime.ts index 8236a3c..cd3f8e4 100644 --- a/src/queue/queueRuntime.ts +++ b/src/queue/queueRuntime.ts @@ -7,6 +7,8 @@ import type { QueueItemSource, } from "../types/protocol"; +export type { QueueBlockedReason, QueueClearedReason, QueueItemKind }; + // ── Item types ─────────────────────────────────────────────────── type QueueItemBase = { @@ -256,6 +258,41 @@ export class QueueRuntime { return result; } + /** + * Caller-controlled dequeue: removes exactly the first `n` items (or all + * available if fewer exist) without applying the coalescable/barrier policy. + * Used when the caller has already decided how many items to consume (e.g. + * headless coalescing loop, listen one-message-per-turn). + * Returns null if queue is empty or n <= 0. + */ + consumeItems(n: number): DequeuedBatch | null { + if (this.store.length === 0 || n <= 0) return null; + const count = Math.min(n, this.store.length); + const batch = this.store.splice(0, count); + if (this.store.length === 0) { + this.blockedEmittedForNonEmpty = false; + } + const result: DequeuedBatch = { + batchId: `batch-${++this.nextBatchId}`, + items: batch, + mergedCount: count, + queueLenAfter: this.store.length, + }; + this.safeCallback("onDequeued", result); + return result; + } + + /** + * Reset blocked-reason tracking after a turn completes (unblocked transition). + * Call when the consumer becomes idle so the next arrival can re-emit + * onBlocked correctly. Should only be called when the queue is actually + * idle (i.e. pendingTurns === 0 in listen, turnInProgress === false in headless). + */ + resetBlockedState(): void { + this.lastEmittedBlockedReason = null; + this.blockedEmittedForNonEmpty = false; + } + // ── Clear ────────────────────────────────────────────────────── /** Remove all items and fire onCleared. */ diff --git a/src/tests/headless/queue-lifecycle-events.test.ts b/src/tests/headless/queue-lifecycle-events.test.ts new file mode 100644 index 0000000..90a26a2 --- /dev/null +++ b/src/tests/headless/queue-lifecycle-events.test.ts @@ -0,0 +1,305 @@ +/** + * Tests for PRQ3: queue lifecycle event emission in headless bidirectional mode. + * + * Invariants verified: + * - parseUserLine: correctly classifies lines + * - blocked events fire directly at arrival time (not via QueueRuntime), + * once per turn, on first user/task arrival while turnInProgress + * - enqueued + dequeued events fire together at coalescing-loop time + * (not at arrival), eliminating orphans from external-tool drop + * - external-tool drop scenario: blocked fires at arrival, no enqueued + * event for the dropped item + * - exit paths emit queue_cleared + * - control lines produce no events + */ + +import { describe, expect, test } from "bun:test"; +import type { BidirectionalQueuedInput } from "../../headless"; +import type { + DequeuedBatch, + QueueBlockedReason, + QueueClearedReason, + QueueItem, +} from "../../queue/queueRuntime"; +import { QueueRuntime } from "../../queue/queueRuntime"; + +// ── Helpers mirroring production logic ─────────────────────────── + +type ParsedLine = + | { kind: "message"; content: string } + | { kind: "task_notification"; content: string } + | null; + +function parseUserLine(raw: string): ParsedLine { + if (!raw.trim()) return null; + try { + const parsed: { + type?: string; + message?: { content?: string }; + _queuedKind?: string; + } = JSON.parse(raw); + if (parsed.type !== "user" || parsed.message?.content === undefined) + return null; + const kind = + parsed._queuedKind === "task_notification" + ? "task_notification" + : "message"; + return { kind, content: parsed.message.content }; + } catch { + return null; + } +} + +function makeUserLine(content: string): string { + return JSON.stringify({ type: "user", message: { content } }); +} + +function makeTaskLine(text: string): string { + return JSON.stringify({ + type: "user", + message: { content: text }, + _queuedKind: "task_notification", + }); +} + +function makeControlLine(requestId = "req-1"): string { + return JSON.stringify({ + type: "control_response", + response: { subtype: "decision", request_id: requestId, decision: "allow" }, + }); +} + +// ── Shared queue builder ────────────────────────────────────────── + +type Recorded = { + enqueued: Array<{ item: QueueItem; queueLen: number }>; + dequeued: DequeuedBatch[]; + blocked: Array<{ reason: QueueBlockedReason; queueLen: number }>; + cleared: Array<{ reason: QueueClearedReason; count: number }>; +}; + +function buildRuntime(): { q: QueueRuntime; rec: Recorded } { + const rec: Recorded = { + enqueued: [], + dequeued: [], + blocked: [], + cleared: [], + }; + const q = new QueueRuntime({ + callbacks: { + onEnqueued: (item, queueLen) => rec.enqueued.push({ item, queueLen }), + onDequeued: (batch) => rec.dequeued.push(batch), + onBlocked: (reason, queueLen) => rec.blocked.push({ reason, queueLen }), + onCleared: (reason, count) => rec.cleared.push({ reason, count }), + }, + }); + return { q, rec }; +} + +/** Mirrors enqueueForTracking() from headless. */ +function enqueueForTracking( + q: QueueRuntime, + input: BidirectionalQueuedInput, +): void { + if (input.kind === "task_notification") { + q.enqueue({ + kind: "task_notification", + source: "task_notification", + text: input.text, + } as Parameters[0]); + } else { + q.enqueue({ + kind: "message", + source: "user", + content: input.content, + } as Parameters[0]); + } +} + +/** Mirrors maybeNotifyBlocked(): emits queue_blocked directly on first busy arrival. */ +type BlockedState = { emitted: boolean }; + +function maybeNotifyBlocked( + raw: string, + turnInProgress: boolean, + state: BlockedState, + blocked: Array<{ reason: string; queueLen: number }>, + lineQueue: string[], +): void { + if (!turnInProgress || state.emitted) return; + if (!parseUserLine(raw)) return; + state.emitted = true; + const queueLen = Math.max( + 1, + lineQueue.filter((l) => parseUserLine(l) !== null).length, + ); + blocked.push({ reason: "runtime_busy", queueLen }); +} + +// ── Tests ───────────────────────────────────────────────────────── + +describe("parseUserLine", () => { + test("returns null for control_response", () => { + expect(parseUserLine(makeControlLine())).toBeNull(); + }); + test("returns null for empty/whitespace", () => { + expect(parseUserLine("")).toBeNull(); + expect(parseUserLine(" ")).toBeNull(); + }); + test("returns null for malformed JSON", () => { + expect(parseUserLine("{not json")).toBeNull(); + }); + test("returns message for user line", () => { + const r = parseUserLine(makeUserLine("hello")); + expect(r?.kind).toBe("message"); + expect(r?.content).toBe("hello"); + }); + test("returns task_notification for task line", () => { + const r = parseUserLine(makeTaskLine("")); + expect(r?.kind).toBe("task_notification"); + expect(r?.content).toBe(""); + }); +}); + +describe("idle path — enqueued + dequeued fire together at coalescing time", () => { + test("no enqueued at arrival, enqueued+dequeued together in coalescing loop", () => { + const { q, rec } = buildRuntime(); + // Simulate: line arrives while idle → no enqueue at arrival + const lineQueue: string[] = []; + const blocked: Array<{ reason: string; queueLen: number }> = []; + const bstate: BlockedState = { emitted: false }; + const raw = makeUserLine("hello"); + lineQueue.push(raw); + maybeNotifyBlocked(raw, false /* idle */, bstate, blocked, lineQueue); + expect(rec.enqueued).toHaveLength(0); // not yet + expect(blocked).toHaveLength(0); // idle: no blocked + + // Coalescing loop consumes the item + const input: BidirectionalQueuedInput = { kind: "user", content: "hello" }; + enqueueForTracking(q, input); + q.consumeItems(1); + + expect(rec.enqueued).toHaveLength(1); + expect(rec.dequeued).toHaveLength(1); + expect(rec.dequeued.at(0)?.mergedCount).toBe(1); + expect(rec.dequeued.at(0)?.queueLenAfter).toBe(0); + }); +}); + +describe("busy path — blocked fires at arrival, enqueued+dequeued at next turn", () => { + test("blocked fires on first user arrival during turn; enqueued fires at coalescing", () => { + const { q, rec } = buildRuntime(); + const lineQueue: string[] = []; + const blocked: Array<{ reason: string; queueLen: number }> = []; + const bstate: BlockedState = { emitted: false }; + + // Turn 1 in progress + const raw = makeUserLine("msg-during-turn"); + lineQueue.push(raw); + maybeNotifyBlocked(raw, true /* busy */, bstate, blocked, lineQueue); + expect(blocked).toHaveLength(1); + expect(blocked.at(0)?.reason).toBe("runtime_busy"); + expect(rec.enqueued).toHaveLength(0); // NOT enqueued yet at arrival + + // Second arrival — no new blocked (dedup) + const raw2 = makeUserLine("msg2"); + lineQueue.push(raw2); + maybeNotifyBlocked(raw2, true, bstate, blocked, lineQueue); + expect(blocked).toHaveLength(1); // still 1 + + // Turn ends, bstate resets + bstate.emitted = false; + + // Turn 2 coalescing loop consumes both + for (const input of [ + { kind: "user" as const, content: "msg-during-turn" }, + { kind: "user" as const, content: "msg2" }, + ]) { + enqueueForTracking(q, input); + } + q.consumeItems(2); + + expect(rec.enqueued).toHaveLength(2); + expect(rec.dequeued).toHaveLength(1); + expect(rec.dequeued.at(0)?.mergedCount).toBe(2); + expect(rec.dequeued.at(0)?.queueLenAfter).toBe(0); + }); +}); + +describe("external-tool drop scenario — no orphaned items", () => { + test("blocked fires at arrival, dropped line never enters QueueRuntime", () => { + const { q, rec } = buildRuntime(); + const lineQueue: string[] = []; + const blocked: Array<{ reason: string; queueLen: number }> = []; + const bstate: BlockedState = { emitted: false }; + + // User line arrives during turn (external-tool wait in progress) + const raw = makeUserLine("user-msg-during-ext-tool"); + lineQueue.push(raw); + maybeNotifyBlocked(raw, true, bstate, blocked, lineQueue); + expect(blocked).toHaveLength(1); // blocked fires on arrival + + // External-tool wait loop DROPS the line (not deferred back, just consumed) + lineQueue.shift(); // simulates getNextLine() consuming without deferring + + // QueueRuntime should have NO items (arrival never enqueued) + expect(q.length).toBe(0); + expect(rec.enqueued).toHaveLength(0); + + // consumeItems(0) — nothing was in the coalescing loop (no user items) + const result = q.consumeItems(0); + expect(result).toBeNull(); + expect(rec.dequeued).toHaveLength(0); // no dequeue event + }); +}); + +describe("control line barrier", () => { + test("control line produces no events", () => { + const { q, rec } = buildRuntime(); + const lineQueue: string[] = []; + const blocked: Array<{ reason: string; queueLen: number }> = []; + const bstate: BlockedState = { emitted: false }; + + maybeNotifyBlocked(makeControlLine(), true, bstate, blocked, lineQueue); + expect(blocked).toHaveLength(0); + expect(rec.enqueued).toHaveLength(0); + expect(q.length).toBe(0); + }); +}); + +describe("coalesced batch — task + user", () => { + test("enqueueForTracking + consumeItems(2) fires correct batch", () => { + const { q, rec } = buildRuntime(); + const inputs: BidirectionalQueuedInput[] = [ + { kind: "task_notification", text: "" }, + { kind: "user", content: "follow-up" }, + ]; + for (const input of inputs) enqueueForTracking(q, input); + q.consumeItems(2); + expect(rec.dequeued.at(0)?.mergedCount).toBe(2); + expect(rec.dequeued.at(0)?.items.at(0)?.kind).toBe("task_notification"); + expect(rec.dequeued.at(0)?.items.at(1)?.kind).toBe("message"); + }); +}); + +describe("exit paths", () => { + test("clear(shutdown) emits queue_cleared and drains", () => { + const { q, rec } = buildRuntime(); + enqueueForTracking(q, { kind: "user", content: "pending" }); + q.clear("shutdown"); + expect(rec.cleared.at(0)?.reason).toBe("shutdown"); + expect(rec.cleared.at(0)?.count).toBe(1); + expect(q.length).toBe(0); + }); + test("clear(error) emits queue_cleared", () => { + const { q, rec } = buildRuntime(); + enqueueForTracking(q, { kind: "user", content: "pending" }); + q.clear("error"); + expect(rec.cleared.at(0)?.reason).toBe("error"); + }); + test("clear on empty queue fires with count=0", () => { + const { q, rec } = buildRuntime(); + q.clear("shutdown"); + expect(rec.cleared.at(0)?.count).toBe(0); + }); +}); diff --git a/src/tests/queue/queueRuntime.test.ts b/src/tests/queue/queueRuntime.test.ts index e0a551f..bbd29db 100644 --- a/src/tests/queue/queueRuntime.test.ts +++ b/src/tests/queue/queueRuntime.test.ts @@ -425,3 +425,96 @@ describe("IDs and accessors", () => { expect(q.length).toBe(2); // unchanged }); }); + +// ── consumeItems ────────────────────────────────────────────────── + +describe("consumeItems", () => { + test("removes exactly n items and fires onDequeued with correct batch", () => { + const batches: DequeuedBatch[] = []; + const q = new QueueRuntime({ + callbacks: { onDequeued: (b) => batches.push(b) }, + }); + q.enqueue(makeMsg("a")); + q.enqueue(makeMsg("b")); + q.enqueue(makeMsg("c")); + const result = q.consumeItems(2); + expect(result).not.toBeNull(); + expect(result?.mergedCount).toBe(2); + expect(result?.queueLenAfter).toBe(1); + expect(q.length).toBe(1); + expect(batches).toHaveLength(1); + expect(batches.at(0)?.mergedCount).toBe(2); + }); + + test("consumes all when n exceeds queue length", () => { + const q = new QueueRuntime(); + q.enqueue(makeMsg("a")); + const result = q.consumeItems(100); + expect(result?.mergedCount).toBe(1); + expect(q.length).toBe(0); + }); + + test("returns null on empty queue", () => { + const q = new QueueRuntime(); + expect(q.consumeItems(1)).toBeNull(); + }); + + test("returns null for n <= 0", () => { + const q = new QueueRuntime(); + q.enqueue(makeMsg()); + expect(q.consumeItems(0)).toBeNull(); + expect(q.length).toBe(1); // unchanged + }); + + test("respects barrier items — does not skip them", () => { + // consumeItems(2) with [approval_result, message] should consume both + // since it ignores the barrier policy + const q = new QueueRuntime(); + q.enqueue(makeApproval()); + q.enqueue(makeMsg("a")); + const result = q.consumeItems(2); + expect(result?.mergedCount).toBe(2); + expect(q.length).toBe(0); + }); + + test("resets blockedEmittedForNonEmpty when queue drains", () => { + const blocked: string[] = []; + const q = new QueueRuntime({ + callbacks: { onBlocked: (r) => blocked.push(r) }, + }); + q.enqueue(makeMsg("a")); + q.tryDequeue("runtime_busy"); // emits blocked + q.consumeItems(1); // drains queue + q.enqueue(makeMsg("b")); + q.tryDequeue("runtime_busy"); // should emit blocked again (epoch reset on drain) + expect(blocked).toHaveLength(2); + }); +}); + +// ── resetBlockedState ───────────────────────────────────────────── + +describe("resetBlockedState", () => { + test("allows same reason to re-emit onBlocked after reset", () => { + const blocked: string[] = []; + const q = new QueueRuntime({ + callbacks: { onBlocked: (r) => blocked.push(r) }, + }); + q.enqueue(makeMsg()); + q.tryDequeue("runtime_busy"); // first blocked emit + q.tryDequeue("runtime_busy"); // same reason — no second emit + q.resetBlockedState(); + q.tryDequeue("runtime_busy"); // after reset — emits again + expect(blocked).toHaveLength(2); + expect(blocked).toEqual(["runtime_busy", "runtime_busy"]); + }); + + test("does not fire onBlocked on empty queue after reset", () => { + const blocked: string[] = []; + const q = new QueueRuntime({ + callbacks: { onBlocked: (r) => blocked.push(r) }, + }); + q.resetBlockedState(); // no-op on empty + q.tryDequeue("runtime_busy"); + expect(blocked).toHaveLength(0); + }); +});