feat: queue headless integration (#1162)

Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
Charles Packer
2026-02-26 11:01:48 -08:00
committed by GitHub
parent b593460cec
commit d0dd53a472
4 changed files with 565 additions and 0 deletions

View File

@@ -74,6 +74,7 @@ import {
} from "./cli/startupFlagValidation";
import { SYSTEM_REMINDER_CLOSE, SYSTEM_REMINDER_OPEN } from "./constants";
import { computeDiffPreviews } from "./helpers/diffPreview";
import { QueueRuntime } from "./queue/queueRuntime";
import {
mergeQueuedTurnInput,
type QueuedTurnInput,
@@ -106,6 +107,7 @@ import type {
ErrorMessage,
ListMessagesControlRequest,
MessageWire,
QueueLifecycleEvent,
RecoveryMessage,
ResultMessage,
RetryMessage,
@@ -2478,6 +2480,116 @@ async function runBidirectionalMode(
const lineQueue: string[] = [];
let lineResolver: ((line: string | null) => void) | null = null;
// ── Queue lifecycle tracking (stream-json only) ────────────────
// Bidirectional mode always runs under stream-json input format, so queue
// events are always emitted here. emitQueueEvent is a no-op guard retained
// for clarity and future-proofing against non-stream-json callers.
const emitQueueEvent = (e: QueueLifecycleEvent): void => {
console.log(JSON.stringify(e));
};
let turnInProgress = false;
const msgQueueRuntime = new QueueRuntime({
callbacks: {
onEnqueued: (item, queueLen) =>
emitQueueEvent({
type: "queue_item_enqueued",
item_id: item.id,
source: item.source,
kind: item.kind,
queue_len: queueLen,
session_id: sessionId,
uuid: `q-enq-${item.id}`,
}),
onDequeued: (batch) =>
emitQueueEvent({
type: "queue_batch_dequeued",
batch_id: batch.batchId,
item_ids: batch.items.map((i) => i.id),
merged_count: batch.mergedCount,
queue_len_after: batch.queueLenAfter,
session_id: sessionId,
uuid: `q-deq-${batch.batchId}`,
}),
onCleared: (reason, clearedCount) =>
emitQueueEvent({
type: "queue_cleared",
reason,
cleared_count: clearedCount,
session_id: sessionId,
uuid: `q-clr-${randomUUID()}`,
}),
},
});
/**
* Parses a raw JSON line and returns the queue item payload if it is a
* user message or task_notification. Returns null for control lines
* (control_request, control_response, etc.) and malformed JSON.
*/
function parseUserLine(raw: string): {
kind: "message" | "task_notification";
content: string;
} | null {
if (!raw.trim()) return null;
try {
const parsed: {
type?: string;
message?: { content?: string };
_queuedKind?: string;
} = JSON.parse(raw);
if (parsed.type !== "user" || parsed.message?.content === undefined)
return null;
const kind =
parsed._queuedKind === "task_notification"
? "task_notification"
: "message";
return { kind, content: parsed.message.content };
} catch {
return null;
}
}
/**
* Emit queue_blocked on the FIRST user/task line arrival during an active
* turn. Does NOT enqueue to msgQueueRuntime — that happens later, at the
* coalescing loop where consumption is certain (avoids orphaned items from
* the external-tool wait loop which drops non-matching lines silently).
*/
let blockedEmittedThisTurn = false;
function maybeNotifyBlocked(raw: string): void {
if (!turnInProgress || blockedEmittedThisTurn) return;
if (!parseUserLine(raw)) return;
blockedEmittedThisTurn = true;
// queue_len: count user/task items currently in lineQueue (best-effort)
const queueLen = lineQueue.filter((l) => parseUserLine(l) !== null).length;
emitQueueEvent({
type: "queue_blocked",
reason: "runtime_busy",
queue_len: Math.max(1, queueLen),
session_id: sessionId,
uuid: `q-blk-${randomUUID()}`,
});
}
/** Enqueue a BidirectionalQueuedInput into msgQueueRuntime for lifecycle tracking. */
function enqueueForTracking(input: BidirectionalQueuedInput): void {
if (input.kind === "task_notification") {
msgQueueRuntime.enqueue({
kind: "task_notification",
source: "task_notification",
text: input.text,
} as Parameters<typeof msgQueueRuntime.enqueue>[0]);
} else {
msgQueueRuntime.enqueue({
kind: "message",
source: "user",
content: input.content,
} as Parameters<typeof msgQueueRuntime.enqueue>[0]);
}
}
const serializeQueuedMessageAsUserLine = (queuedMessage: QueuedMessage) =>
JSON.stringify({
type: "user",
@@ -2492,6 +2604,7 @@ async function runBidirectionalMode(
// used by user input so bidirectional mode inherits TUI-style queue behavior.
setMessageQueueAdder((queuedMessage) => {
const syntheticUserLine = serializeQueuedMessageAsUserLine(queuedMessage);
maybeNotifyBlocked(syntheticUserLine);
if (lineResolver) {
const resolve = lineResolver;
lineResolver = null;
@@ -2503,6 +2616,7 @@ async function runBidirectionalMode(
// Feed lines into queue or resolver
rl.on("line", (line) => {
maybeNotifyBlocked(line);
if (lineResolver) {
const resolve = lineResolver;
lineResolver = null;
@@ -2514,6 +2628,7 @@ async function runBidirectionalMode(
rl.on("close", () => {
setMessageQueueAdder(null);
msgQueueRuntime.clear("shutdown");
if (lineResolver) {
const resolve = lineResolver;
lineResolver = null;
@@ -2904,6 +3019,18 @@ async function runBidirectionalMode(
break;
}
// Enqueue consumed items into msgQueueRuntime for lifecycle tracking.
// Done here (not at arrival) to avoid orphaned items from the external-
// tool wait loop, which consumes non-matching lines via getNextLine()
// without deferring them back to lineQueue.
for (const input of queuedInputs) {
enqueueForTracking(input);
}
// Signal dequeue for exactly the items we just enqueued. consumeItems(n)
// bypasses QueueRuntime's internal coalescing policy so the count matches
// what the coalescing loop actually yielded.
msgQueueRuntime.consumeItems(queuedInputs.length);
const userContent = mergeBidirectionalQueuedInput(queuedInputs);
if (userContent === null) {
continue;
@@ -2912,6 +3039,7 @@ async function runBidirectionalMode(
// Create abort controller for this operation
currentAbortController = new AbortController();
turnInProgress = true;
try {
const buffers = createBuffers(agent.id);
const startTime = performance.now();
@@ -3390,6 +3518,8 @@ async function runBidirectionalMode(
};
console.log(JSON.stringify(errorResultMsg));
} finally {
turnInProgress = false;
blockedEmittedThisTurn = false;
currentAbortController = null;
}
continue;

View File

@@ -7,6 +7,8 @@ import type {
QueueItemSource,
} from "../types/protocol";
export type { QueueBlockedReason, QueueClearedReason, QueueItemKind };
// ── Item types ───────────────────────────────────────────────────
type QueueItemBase = {
@@ -256,6 +258,41 @@ export class QueueRuntime {
return result;
}
/**
* Caller-controlled dequeue: removes exactly the first `n` items (or all
* available if fewer exist) without applying the coalescable/barrier policy.
* Used when the caller has already decided how many items to consume (e.g.
* headless coalescing loop, listen one-message-per-turn).
* Returns null if queue is empty or n <= 0.
*/
consumeItems(n: number): DequeuedBatch | null {
if (this.store.length === 0 || n <= 0) return null;
const count = Math.min(n, this.store.length);
const batch = this.store.splice(0, count);
if (this.store.length === 0) {
this.blockedEmittedForNonEmpty = false;
}
const result: DequeuedBatch = {
batchId: `batch-${++this.nextBatchId}`,
items: batch,
mergedCount: count,
queueLenAfter: this.store.length,
};
this.safeCallback("onDequeued", result);
return result;
}
/**
* Reset blocked-reason tracking after a turn completes (unblocked transition).
* Call when the consumer becomes idle so the next arrival can re-emit
* onBlocked correctly. Should only be called when the queue is actually
* idle (i.e. pendingTurns === 0 in listen, turnInProgress === false in headless).
*/
resetBlockedState(): void {
this.lastEmittedBlockedReason = null;
this.blockedEmittedForNonEmpty = false;
}
// ── Clear ──────────────────────────────────────────────────────
/** Remove all items and fire onCleared. */

View File

@@ -0,0 +1,305 @@
/**
* Tests for PRQ3: queue lifecycle event emission in headless bidirectional mode.
*
* Invariants verified:
* - parseUserLine: correctly classifies lines
* - blocked events fire directly at arrival time (not via QueueRuntime),
* once per turn, on first user/task arrival while turnInProgress
* - enqueued + dequeued events fire together at coalescing-loop time
* (not at arrival), eliminating orphans from external-tool drop
* - external-tool drop scenario: blocked fires at arrival, no enqueued
* event for the dropped item
* - exit paths emit queue_cleared
* - control lines produce no events
*/
import { describe, expect, test } from "bun:test";
import type { BidirectionalQueuedInput } from "../../headless";
import type {
DequeuedBatch,
QueueBlockedReason,
QueueClearedReason,
QueueItem,
} from "../../queue/queueRuntime";
import { QueueRuntime } from "../../queue/queueRuntime";
// ── Helpers mirroring production logic ───────────────────────────
type ParsedLine =
| { kind: "message"; content: string }
| { kind: "task_notification"; content: string }
| null;
function parseUserLine(raw: string): ParsedLine {
if (!raw.trim()) return null;
try {
const parsed: {
type?: string;
message?: { content?: string };
_queuedKind?: string;
} = JSON.parse(raw);
if (parsed.type !== "user" || parsed.message?.content === undefined)
return null;
const kind =
parsed._queuedKind === "task_notification"
? "task_notification"
: "message";
return { kind, content: parsed.message.content };
} catch {
return null;
}
}
function makeUserLine(content: string): string {
return JSON.stringify({ type: "user", message: { content } });
}
function makeTaskLine(text: string): string {
return JSON.stringify({
type: "user",
message: { content: text },
_queuedKind: "task_notification",
});
}
function makeControlLine(requestId = "req-1"): string {
return JSON.stringify({
type: "control_response",
response: { subtype: "decision", request_id: requestId, decision: "allow" },
});
}
// ── Shared queue builder ──────────────────────────────────────────
type Recorded = {
enqueued: Array<{ item: QueueItem; queueLen: number }>;
dequeued: DequeuedBatch[];
blocked: Array<{ reason: QueueBlockedReason; queueLen: number }>;
cleared: Array<{ reason: QueueClearedReason; count: number }>;
};
function buildRuntime(): { q: QueueRuntime; rec: Recorded } {
const rec: Recorded = {
enqueued: [],
dequeued: [],
blocked: [],
cleared: [],
};
const q = new QueueRuntime({
callbacks: {
onEnqueued: (item, queueLen) => rec.enqueued.push({ item, queueLen }),
onDequeued: (batch) => rec.dequeued.push(batch),
onBlocked: (reason, queueLen) => rec.blocked.push({ reason, queueLen }),
onCleared: (reason, count) => rec.cleared.push({ reason, count }),
},
});
return { q, rec };
}
/** Mirrors enqueueForTracking() from headless. */
function enqueueForTracking(
q: QueueRuntime,
input: BidirectionalQueuedInput,
): void {
if (input.kind === "task_notification") {
q.enqueue({
kind: "task_notification",
source: "task_notification",
text: input.text,
} as Parameters<typeof q.enqueue>[0]);
} else {
q.enqueue({
kind: "message",
source: "user",
content: input.content,
} as Parameters<typeof q.enqueue>[0]);
}
}
/** Mirrors maybeNotifyBlocked(): emits queue_blocked directly on first busy arrival. */
type BlockedState = { emitted: boolean };
function maybeNotifyBlocked(
raw: string,
turnInProgress: boolean,
state: BlockedState,
blocked: Array<{ reason: string; queueLen: number }>,
lineQueue: string[],
): void {
if (!turnInProgress || state.emitted) return;
if (!parseUserLine(raw)) return;
state.emitted = true;
const queueLen = Math.max(
1,
lineQueue.filter((l) => parseUserLine(l) !== null).length,
);
blocked.push({ reason: "runtime_busy", queueLen });
}
// ── Tests ─────────────────────────────────────────────────────────
describe("parseUserLine", () => {
test("returns null for control_response", () => {
expect(parseUserLine(makeControlLine())).toBeNull();
});
test("returns null for empty/whitespace", () => {
expect(parseUserLine("")).toBeNull();
expect(parseUserLine(" ")).toBeNull();
});
test("returns null for malformed JSON", () => {
expect(parseUserLine("{not json")).toBeNull();
});
test("returns message for user line", () => {
const r = parseUserLine(makeUserLine("hello"));
expect(r?.kind).toBe("message");
expect(r?.content).toBe("hello");
});
test("returns task_notification for task line", () => {
const r = parseUserLine(makeTaskLine("<notif/>"));
expect(r?.kind).toBe("task_notification");
expect(r?.content).toBe("<notif/>");
});
});
describe("idle path — enqueued + dequeued fire together at coalescing time", () => {
test("no enqueued at arrival, enqueued+dequeued together in coalescing loop", () => {
const { q, rec } = buildRuntime();
// Simulate: line arrives while idle → no enqueue at arrival
const lineQueue: string[] = [];
const blocked: Array<{ reason: string; queueLen: number }> = [];
const bstate: BlockedState = { emitted: false };
const raw = makeUserLine("hello");
lineQueue.push(raw);
maybeNotifyBlocked(raw, false /* idle */, bstate, blocked, lineQueue);
expect(rec.enqueued).toHaveLength(0); // not yet
expect(blocked).toHaveLength(0); // idle: no blocked
// Coalescing loop consumes the item
const input: BidirectionalQueuedInput = { kind: "user", content: "hello" };
enqueueForTracking(q, input);
q.consumeItems(1);
expect(rec.enqueued).toHaveLength(1);
expect(rec.dequeued).toHaveLength(1);
expect(rec.dequeued.at(0)?.mergedCount).toBe(1);
expect(rec.dequeued.at(0)?.queueLenAfter).toBe(0);
});
});
describe("busy path — blocked fires at arrival, enqueued+dequeued at next turn", () => {
test("blocked fires on first user arrival during turn; enqueued fires at coalescing", () => {
const { q, rec } = buildRuntime();
const lineQueue: string[] = [];
const blocked: Array<{ reason: string; queueLen: number }> = [];
const bstate: BlockedState = { emitted: false };
// Turn 1 in progress
const raw = makeUserLine("msg-during-turn");
lineQueue.push(raw);
maybeNotifyBlocked(raw, true /* busy */, bstate, blocked, lineQueue);
expect(blocked).toHaveLength(1);
expect(blocked.at(0)?.reason).toBe("runtime_busy");
expect(rec.enqueued).toHaveLength(0); // NOT enqueued yet at arrival
// Second arrival — no new blocked (dedup)
const raw2 = makeUserLine("msg2");
lineQueue.push(raw2);
maybeNotifyBlocked(raw2, true, bstate, blocked, lineQueue);
expect(blocked).toHaveLength(1); // still 1
// Turn ends, bstate resets
bstate.emitted = false;
// Turn 2 coalescing loop consumes both
for (const input of [
{ kind: "user" as const, content: "msg-during-turn" },
{ kind: "user" as const, content: "msg2" },
]) {
enqueueForTracking(q, input);
}
q.consumeItems(2);
expect(rec.enqueued).toHaveLength(2);
expect(rec.dequeued).toHaveLength(1);
expect(rec.dequeued.at(0)?.mergedCount).toBe(2);
expect(rec.dequeued.at(0)?.queueLenAfter).toBe(0);
});
});
describe("external-tool drop scenario — no orphaned items", () => {
test("blocked fires at arrival, dropped line never enters QueueRuntime", () => {
const { q, rec } = buildRuntime();
const lineQueue: string[] = [];
const blocked: Array<{ reason: string; queueLen: number }> = [];
const bstate: BlockedState = { emitted: false };
// User line arrives during turn (external-tool wait in progress)
const raw = makeUserLine("user-msg-during-ext-tool");
lineQueue.push(raw);
maybeNotifyBlocked(raw, true, bstate, blocked, lineQueue);
expect(blocked).toHaveLength(1); // blocked fires on arrival
// External-tool wait loop DROPS the line (not deferred back, just consumed)
lineQueue.shift(); // simulates getNextLine() consuming without deferring
// QueueRuntime should have NO items (arrival never enqueued)
expect(q.length).toBe(0);
expect(rec.enqueued).toHaveLength(0);
// consumeItems(0) — nothing was in the coalescing loop (no user items)
const result = q.consumeItems(0);
expect(result).toBeNull();
expect(rec.dequeued).toHaveLength(0); // no dequeue event
});
});
describe("control line barrier", () => {
test("control line produces no events", () => {
const { q, rec } = buildRuntime();
const lineQueue: string[] = [];
const blocked: Array<{ reason: string; queueLen: number }> = [];
const bstate: BlockedState = { emitted: false };
maybeNotifyBlocked(makeControlLine(), true, bstate, blocked, lineQueue);
expect(blocked).toHaveLength(0);
expect(rec.enqueued).toHaveLength(0);
expect(q.length).toBe(0);
});
});
describe("coalesced batch — task + user", () => {
test("enqueueForTracking + consumeItems(2) fires correct batch", () => {
const { q, rec } = buildRuntime();
const inputs: BidirectionalQueuedInput[] = [
{ kind: "task_notification", text: "<notif/>" },
{ kind: "user", content: "follow-up" },
];
for (const input of inputs) enqueueForTracking(q, input);
q.consumeItems(2);
expect(rec.dequeued.at(0)?.mergedCount).toBe(2);
expect(rec.dequeued.at(0)?.items.at(0)?.kind).toBe("task_notification");
expect(rec.dequeued.at(0)?.items.at(1)?.kind).toBe("message");
});
});
describe("exit paths", () => {
test("clear(shutdown) emits queue_cleared and drains", () => {
const { q, rec } = buildRuntime();
enqueueForTracking(q, { kind: "user", content: "pending" });
q.clear("shutdown");
expect(rec.cleared.at(0)?.reason).toBe("shutdown");
expect(rec.cleared.at(0)?.count).toBe(1);
expect(q.length).toBe(0);
});
test("clear(error) emits queue_cleared", () => {
const { q, rec } = buildRuntime();
enqueueForTracking(q, { kind: "user", content: "pending" });
q.clear("error");
expect(rec.cleared.at(0)?.reason).toBe("error");
});
test("clear on empty queue fires with count=0", () => {
const { q, rec } = buildRuntime();
q.clear("shutdown");
expect(rec.cleared.at(0)?.count).toBe(0);
});
});

View File

@@ -425,3 +425,96 @@ describe("IDs and accessors", () => {
expect(q.length).toBe(2); // unchanged
});
});
// ── consumeItems ──────────────────────────────────────────────────
describe("consumeItems", () => {
test("removes exactly n items and fires onDequeued with correct batch", () => {
const batches: DequeuedBatch[] = [];
const q = new QueueRuntime({
callbacks: { onDequeued: (b) => batches.push(b) },
});
q.enqueue(makeMsg("a"));
q.enqueue(makeMsg("b"));
q.enqueue(makeMsg("c"));
const result = q.consumeItems(2);
expect(result).not.toBeNull();
expect(result?.mergedCount).toBe(2);
expect(result?.queueLenAfter).toBe(1);
expect(q.length).toBe(1);
expect(batches).toHaveLength(1);
expect(batches.at(0)?.mergedCount).toBe(2);
});
test("consumes all when n exceeds queue length", () => {
const q = new QueueRuntime();
q.enqueue(makeMsg("a"));
const result = q.consumeItems(100);
expect(result?.mergedCount).toBe(1);
expect(q.length).toBe(0);
});
test("returns null on empty queue", () => {
const q = new QueueRuntime();
expect(q.consumeItems(1)).toBeNull();
});
test("returns null for n <= 0", () => {
const q = new QueueRuntime();
q.enqueue(makeMsg());
expect(q.consumeItems(0)).toBeNull();
expect(q.length).toBe(1); // unchanged
});
test("respects barrier items — does not skip them", () => {
// consumeItems(2) with [approval_result, message] should consume both
// since it ignores the barrier policy
const q = new QueueRuntime();
q.enqueue(makeApproval());
q.enqueue(makeMsg("a"));
const result = q.consumeItems(2);
expect(result?.mergedCount).toBe(2);
expect(q.length).toBe(0);
});
test("resets blockedEmittedForNonEmpty when queue drains", () => {
const blocked: string[] = [];
const q = new QueueRuntime({
callbacks: { onBlocked: (r) => blocked.push(r) },
});
q.enqueue(makeMsg("a"));
q.tryDequeue("runtime_busy"); // emits blocked
q.consumeItems(1); // drains queue
q.enqueue(makeMsg("b"));
q.tryDequeue("runtime_busy"); // should emit blocked again (epoch reset on drain)
expect(blocked).toHaveLength(2);
});
});
// ── resetBlockedState ─────────────────────────────────────────────
describe("resetBlockedState", () => {
test("allows same reason to re-emit onBlocked after reset", () => {
const blocked: string[] = [];
const q = new QueueRuntime({
callbacks: { onBlocked: (r) => blocked.push(r) },
});
q.enqueue(makeMsg());
q.tryDequeue("runtime_busy"); // first blocked emit
q.tryDequeue("runtime_busy"); // same reason — no second emit
q.resetBlockedState();
q.tryDequeue("runtime_busy"); // after reset — emits again
expect(blocked).toHaveLength(2);
expect(blocked).toEqual(["runtime_busy", "runtime_busy"]);
});
test("does not fire onBlocked on empty queue after reset", () => {
const blocked: string[] = [];
const q = new QueueRuntime({
callbacks: { onBlocked: (r) => blocked.push(r) },
});
q.resetBlockedState(); // no-op on empty
q.tryDequeue("runtime_busy");
expect(blocked).toHaveLength(0);
});
});