feat: queue listen integration (#1163)

Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
Charles Packer
2026-02-26 11:02:01 -08:00
committed by GitHub
parent d0dd53a472
commit 908820c44a
2 changed files with 394 additions and 12 deletions

View File

@@ -0,0 +1,276 @@
/**
* Integration-level tests for PRQ5: queue lifecycle event emission in
* listen-client.ts.
*
* These tests drive QueueRuntime directly, mirroring the wiring pattern in
* listen-client to verify:
* - Single message: enqueued → dequeued, no blocked, real queue_len
* - Two rapid synchronous arrivals: second gets blocked(runtime_busy)
* because pendingTurns is incremented before the .then() chain
* - Connection close: queue_cleared("shutdown") emitted once
* - Per-turn error: no queue_cleared — queue continues for remaining turns
* - ApprovalCreate payloads (no `content` field) are not enqueued
* - QueueLifecycleEvent is assignable to WsProtocolEvent (type-level)
*/
import { describe, expect, test } from "bun:test";
import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents";
import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/messages";
import type {
DequeuedBatch,
QueueBlockedReason,
QueueClearedReason,
QueueItem,
} from "../../queue/queueRuntime";
import { QueueRuntime } from "../../queue/queueRuntime";
import type { QueueLifecycleEvent } from "../../types/protocol";
import type { WsProtocolEvent } from "../../websocket/listen-client";
// ── Type-level assertion: QueueLifecycleEvent ⊆ WsProtocolEvent ──
// Imports the real WsProtocolEvent from listen-client. If QueueLifecycleEvent
// is ever removed from that union, this assertion fails at compile time.
type _AssertAssignable = QueueLifecycleEvent extends WsProtocolEvent
? true
: never;
const _typeCheck: _AssertAssignable = true;
void _typeCheck; // suppress unused warning
// ── Helpers ───────────────────────────────────────────────────────
type Recorded = {
enqueued: Array<{ item: QueueItem; queueLen: number }>;
dequeued: DequeuedBatch[];
blocked: Array<{ reason: QueueBlockedReason; queueLen: number }>;
cleared: Array<{ reason: QueueClearedReason; count: number }>;
};
function buildRuntime(): { q: QueueRuntime; rec: Recorded } {
const rec: Recorded = {
enqueued: [],
dequeued: [],
blocked: [],
cleared: [],
};
const q = new QueueRuntime({
callbacks: {
onEnqueued: (item, queueLen) => rec.enqueued.push({ item, queueLen }),
onDequeued: (batch) => rec.dequeued.push(batch),
onBlocked: (reason, queueLen) => rec.blocked.push({ reason, queueLen }),
onCleared: (reason, count) => rec.cleared.push({ reason, count }),
},
});
return { q, rec };
}
/** Mirrors listen-client message arrival logic for a MessageCreate payload. */
function simulateMessageArrival(
q: QueueRuntime,
pendingTurnsRef: { value: number },
payload: MessageCreate | ApprovalCreate,
): boolean {
const isUserMessage = "content" in payload;
if (isUserMessage) {
q.enqueue({
kind: "message",
source: "user",
content: (payload as MessageCreate).content,
} as Parameters<typeof q.enqueue>[0]);
if (pendingTurnsRef.value > 0) {
q.tryDequeue("runtime_busy");
}
}
pendingTurnsRef.value++; // synchronous before .then()
return isUserMessage;
}
/** Mirrors the start of the .then() chain callback. */
function simulateTurnStart(
q: QueueRuntime,
_pendingTurnsRef: { value: number },
isUserMessage: boolean,
): void {
if (isUserMessage) q.consumeItems(1);
}
/** Mirrors the finally block. */
function simulateTurnEnd(
q: QueueRuntime,
pendingTurnsRef: { value: number },
): void {
pendingTurnsRef.value--;
if (pendingTurnsRef.value === 0) q.resetBlockedState();
}
function makeMessageCreate(text = "hello"): MessageCreate {
return { role: "user", content: text } as unknown as MessageCreate;
}
function makeApprovalCreate(): ApprovalCreate {
// ApprovalCreate does NOT have a `content` field — used for legacy approval path
return { type: "approval", approvals: [] } as unknown as ApprovalCreate;
}
// ── Tests ─────────────────────────────────────────────────────────
describe("single message — idle path", () => {
test("enqueued → dequeued, no blocked, real queue_len values", () => {
const { q, rec } = buildRuntime();
const turns = { value: 0 };
const isUser = simulateMessageArrival(q, turns, makeMessageCreate());
expect(rec.enqueued).toHaveLength(1);
expect(rec.enqueued.at(0)?.queueLen).toBe(1);
expect(rec.blocked).toHaveLength(0);
simulateTurnStart(q, turns, isUser);
expect(rec.dequeued).toHaveLength(1);
expect(rec.dequeued.at(0)?.mergedCount).toBe(1);
expect(rec.dequeued.at(0)?.queueLenAfter).toBe(0);
simulateTurnEnd(q, turns);
expect(turns.value).toBe(0);
expect(q.length).toBe(0);
});
});
describe("two rapid messages — busy path", () => {
test("second arrival gets blocked(runtime_busy) due to sync pendingTurns", () => {
const { q, rec } = buildRuntime();
const turns = { value: 0 };
// First message arrives
const isUser1 = simulateMessageArrival(
q,
turns,
makeMessageCreate("first"),
);
expect(turns.value).toBe(1); // synchronously incremented
expect(rec.blocked).toHaveLength(0); // was 0 at arrival
// Second message arrives BEFORE first turn's .then() runs
const isUser2 = simulateMessageArrival(
q,
turns,
makeMessageCreate("second"),
);
expect(turns.value).toBe(2);
expect(rec.blocked).toHaveLength(1);
expect(rec.blocked.at(0)?.reason).toBe("runtime_busy");
expect(rec.blocked.at(0)?.queueLen).toBe(2); // both enqueued
// First turn runs
simulateTurnStart(q, turns, isUser1);
expect(rec.dequeued).toHaveLength(1);
expect(rec.dequeued.at(0)?.mergedCount).toBe(1);
simulateTurnEnd(q, turns);
expect(turns.value).toBe(1); // second still pending
// Second turn runs
simulateTurnStart(q, turns, isUser2);
expect(rec.dequeued).toHaveLength(2);
expect(rec.dequeued.at(1)?.mergedCount).toBe(1);
expect(rec.dequeued.at(1)?.queueLenAfter).toBe(0);
simulateTurnEnd(q, turns);
expect(turns.value).toBe(0);
});
test("blocked fires only once for same reason; resets when fully drained", () => {
const { q, rec } = buildRuntime();
const turns = { value: 0 };
simulateMessageArrival(q, turns, makeMessageCreate("a"));
simulateMessageArrival(q, turns, makeMessageCreate("b")); // blocked
simulateMessageArrival(q, turns, makeMessageCreate("c")); // same reason — no extra blocked
expect(rec.blocked).toHaveLength(1);
// Drain all three
for (let i = 0; i < 3; i++) {
simulateTurnStart(q, turns, true);
simulateTurnEnd(q, turns);
}
expect(turns.value).toBe(0);
// New arrival after full drain — should be idle (no blocked)
simulateMessageArrival(q, turns, makeMessageCreate("d"));
expect(rec.blocked).toHaveLength(1); // still just the original one
});
});
describe("pendingTurns safety — always decremented", () => {
test("pendingTurns decrements even when simulateTurnStart would throw", () => {
// Mirrors the production fix: onStatusChange("receiving") moved inside try
// so the finally always fires. Here we verify that the turn-end path
// (finally equivalent) always restores pendingTurns to 0.
const { q } = buildRuntime();
const turns = { value: 0 };
simulateMessageArrival(q, turns, makeMessageCreate("msg"));
expect(turns.value).toBe(1);
// Simulate: consumeItems fires, then an error before handleIncomingMessage
q.consumeItems(1);
// finally fires (error path)
simulateTurnEnd(q, turns);
expect(turns.value).toBe(0); // not leaked
expect(q.length).toBe(0);
});
});
describe("ApprovalCreate payloads", () => {
test("ApprovalCreate is not enqueued (no content field)", () => {
const { q, rec } = buildRuntime();
const turns = { value: 0 };
const isUser = simulateMessageArrival(q, turns, makeApprovalCreate());
expect(isUser).toBe(false);
expect(rec.enqueued).toHaveLength(0);
expect(turns.value).toBe(1); // pendingTurns still increments
// No consumeItems called in .then()
simulateTurnStart(q, turns, isUser);
expect(rec.dequeued).toHaveLength(0);
simulateTurnEnd(q, turns);
expect(turns.value).toBe(0);
});
});
describe("connection close", () => {
test("clear(shutdown) emits queue_cleared exactly once for intentional close", () => {
const { q, rec } = buildRuntime();
q.clear("shutdown");
expect(rec.cleared).toHaveLength(1);
expect(rec.cleared.at(0)?.reason).toBe("shutdown");
expect(rec.cleared.at(0)?.count).toBe(0);
});
test("clear(shutdown) emits with correct count when items are pending", () => {
const { q, rec } = buildRuntime();
const turns = { value: 0 };
simulateMessageArrival(q, turns, makeMessageCreate("pending"));
q.clear("shutdown"); // connection closed before turn ran
expect(rec.cleared.at(0)?.count).toBe(1);
expect(q.length).toBe(0);
});
});
describe("per-turn error — no queue_cleared", () => {
test("turn error only decrements pendingTurns; remaining turns still dequeue", () => {
const { q, rec } = buildRuntime();
const turns = { value: 0 };
simulateMessageArrival(q, turns, makeMessageCreate("first"));
simulateMessageArrival(q, turns, makeMessageCreate("second"));
// First turn: simulate error — finally still runs
simulateTurnStart(q, turns, true);
simulateTurnEnd(q, turns); // error path still hits finally
expect(rec.cleared).toHaveLength(0); // no queue_cleared
// Second turn still runs
simulateTurnStart(q, turns, true);
expect(rec.dequeued).toHaveLength(2);
simulateTurnEnd(q, turns);
expect(turns.value).toBe(0);
expect(rec.cleared).toHaveLength(0); // still no queue_cleared
});
});

View File

@@ -31,6 +31,7 @@ import { generatePlanFilePath } from "../cli/helpers/planName";
import { drainStreamWithResume } from "../cli/helpers/stream";
import { computeDiffPreviews } from "../helpers/diffPreview";
import { permissionMode } from "../permissions/mode";
import { QueueRuntime } from "../queue/queueRuntime";
import { settingsManager } from "../settings-manager";
import { isInteractiveApprovalTool } from "../tools/interactivePolicy";
import { loadTools } from "../tools/manager";
@@ -42,6 +43,7 @@ import type {
ErrorMessage,
MessageWire,
ResultMessage as ProtocolResultMessage,
QueueLifecycleEvent,
RecoveryMessage,
RetryMessage,
StopReasonType,
@@ -140,6 +142,11 @@ type ListenerRuntime = {
controlResponseCapable: boolean;
/** Stable session ID for MessageEnvelope-based emissions (scoped to runtime lifecycle). */
sessionId: string;
/** Queue lifecycle tracking — parallel tracking layer, does not affect message processing. */
queueRuntime: QueueRuntime;
/** Count of turns currently queued or in-flight in the promise chain. Incremented
* synchronously on message arrival (before .then()) to avoid async scheduling races. */
pendingTurns: number;
};
type ApprovalSlot =
@@ -192,7 +199,7 @@ const INITIAL_RETRY_DELAY_MS = 1000; // 1 second
const MAX_RETRY_DELAY_MS = 30000; // 30 seconds
function createRuntime(): ListenerRuntime {
return {
const runtime: ListenerRuntime = {
socket: null,
heartbeatInterval: null,
reconnectTimeout: null,
@@ -202,7 +209,63 @@ function createRuntime(): ListenerRuntime {
pendingApprovalResolvers: new Map(),
controlResponseCapable: false,
sessionId: `listen-${crypto.randomUUID()}`,
pendingTurns: 0,
// queueRuntime assigned below — needs runtime ref in callbacks
queueRuntime: null as unknown as QueueRuntime,
};
runtime.queueRuntime = new QueueRuntime({
callbacks: {
onEnqueued: (item, queueLen) => {
if (runtime.socket?.readyState === WebSocket.OPEN) {
emitToWS(runtime.socket, {
type: "queue_item_enqueued",
item_id: item.id,
source: item.source,
kind: item.kind,
queue_len: queueLen,
session_id: runtime.sessionId,
uuid: `q-enq-${item.id}`,
});
}
},
onDequeued: (batch) => {
if (runtime.socket?.readyState === WebSocket.OPEN) {
emitToWS(runtime.socket, {
type: "queue_batch_dequeued",
batch_id: batch.batchId,
item_ids: batch.items.map((i) => i.id),
merged_count: batch.mergedCount,
queue_len_after: batch.queueLenAfter,
session_id: runtime.sessionId,
uuid: `q-deq-${batch.batchId}`,
});
}
},
onBlocked: (reason, queueLen) => {
if (runtime.socket?.readyState === WebSocket.OPEN) {
emitToWS(runtime.socket, {
type: "queue_blocked",
reason,
queue_len: queueLen,
session_id: runtime.sessionId,
uuid: `q-blk-${crypto.randomUUID()}`,
});
}
},
onCleared: (reason, clearedCount) => {
if (runtime.socket?.readyState === WebSocket.OPEN) {
emitToWS(runtime.socket, {
type: "queue_cleared",
reason,
cleared_count: clearedCount,
session_id: runtime.sessionId,
uuid: `q-clr-${crypto.randomUUID()}`,
});
}
},
},
});
return runtime;
}
function clearRuntimeTimers(runtime: ListenerRuntime): void {
@@ -303,13 +366,14 @@ function sendControlMessageOverWebSocket(
// ── Typed protocol event adapter ────────────────────────────────
type WsProtocolEvent =
export type WsProtocolEvent =
| MessageWire
| AutoApprovalMessage
| ErrorMessage
| RetryMessage
| RecoveryMessage
| ProtocolResultMessage;
| ProtocolResultMessage
| QueueLifecycleEvent;
/**
* Single adapter for all outbound typed protocol events.
@@ -672,21 +736,59 @@ async function connectWithRetry(
// Handle incoming messages (queued for sequential processing)
if (parsed.type === "message") {
// Queue lifecycle tracking: only enqueue if first payload is a
// MessageCreate (has `content`). ApprovalCreate payloads (legacy
// approval path) do not represent user-initiated messages.
const firstPayload = parsed.messages.at(0);
const isUserMessage =
firstPayload !== undefined && "content" in firstPayload;
if (isUserMessage) {
runtime.queueRuntime.enqueue({
kind: "message",
source: "user",
content: (firstPayload as MessageCreate).content,
} as Parameters<typeof runtime.queueRuntime.enqueue>[0]);
// Emit blocked on state transition when turns are already queued.
// pendingTurns is incremented synchronously (below) before .then(),
// so a second arrival always sees the correct count.
if (runtime.pendingTurns > 0) {
runtime.queueRuntime.tryDequeue("runtime_busy");
}
}
// Increment synchronously before chaining to avoid scheduling races
runtime.pendingTurns++;
runtime.messageQueue = runtime.messageQueue
.then(async () => {
if (runtime !== activeRuntime || runtime.intentionallyClosed) {
runtime.pendingTurns--;
return;
}
opts.onStatusChange?.("receiving", opts.connectionId);
await handleIncomingMessage(
parsed,
socket,
runtime,
opts.onStatusChange,
opts.connectionId,
);
opts.onStatusChange?.("idle", opts.connectionId);
// Signal dequeue for exactly this one turn (one message per chain cb)
if (isUserMessage) {
runtime.queueRuntime.consumeItems(1);
}
// onStatusChange("receiving") is inside try so that any throw
// still reaches the finally and decrements pendingTurns.
try {
opts.onStatusChange?.("receiving", opts.connectionId);
await handleIncomingMessage(
parsed,
socket,
runtime,
opts.onStatusChange,
opts.connectionId,
);
opts.onStatusChange?.("idle", opts.connectionId);
} finally {
runtime.pendingTurns--;
// Reset blocked state only when queue is fully drained
if (runtime.pendingTurns === 0) {
runtime.queueRuntime.resetBlockedState();
}
}
})
.catch((error: unknown) => {
if (process.env.DEBUG) {
@@ -702,6 +804,10 @@ async function connectWithRetry(
return;
}
// Single authoritative queue_cleared emission for all close paths
// (intentional and unintentional). Must fire before early returns.
runtime.queueRuntime.clear("shutdown");
if (process.env.DEBUG) {
console.log(
`[Listen] WebSocket disconnected (code: ${code}, reason: ${reason.toString()})`,