diff --git a/src/queue/queueRuntime.ts b/src/queue/queueRuntime.ts new file mode 100644 index 0000000..8236a3c --- /dev/null +++ b/src/queue/queueRuntime.ts @@ -0,0 +1,312 @@ +import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents"; +import type { + QueueBlockedReason, + QueueClearedReason, + QueueItemDroppedReason, + QueueItemKind, + QueueItemSource, +} from "../types/protocol"; + +// ── Item types ─────────────────────────────────────────────────── + +type QueueItemBase = { + /** Stable monotonic ID assigned on enqueue. */ + id: string; + source: QueueItemSource; + enqueuedAt: number; +}; + +export type MessageQueueItem = QueueItemBase & { + kind: "message"; + /** Full multimodal content — string or content-part array. */ + content: MessageCreate["content"]; +}; + +export type TaskNotificationQueueItem = QueueItemBase & { + kind: "task_notification"; + /** XML notification string. */ + text: string; +}; + +export type ApprovalResultQueueItem = QueueItemBase & { + kind: "approval_result"; + text: string; +}; + +export type OverlayActionQueueItem = QueueItemBase & { + kind: "overlay_action"; + text: string; +}; + +export type QueueItem = + | MessageQueueItem + | TaskNotificationQueueItem + | ApprovalResultQueueItem + | OverlayActionQueueItem; + +// ── Coalescability ─────────────────────────────────────────────── + +/** Coalescable items can be merged into a single submission batch. */ +export function isCoalescable(kind: QueueItemKind): boolean { + return kind === "message" || kind === "task_notification"; +} + +// ── Batch / callbacks ──────────────────────────────────────────── + +export interface DequeuedBatch { + batchId: string; + items: QueueItem[]; + /** + * Number of items that were merged into this batch. + * Equal to items.length for coalescable batches; 1 for barrier items. + */ + mergedCount: number; + /** Queue length after this batch was removed. */ + queueLenAfter: number; +} + +export interface QueueCallbacks { + onEnqueued?: (item: QueueItem, queueLen: number) => void; + onDequeued?: (batch: DequeuedBatch) => void; + /** + * Fired on blocked-reason state transitions (not on every check). + * Only fires when queue is non-empty. + */ + onBlocked?: (reason: QueueBlockedReason, queueLen: number) => void; + onCleared?: (reason: QueueClearedReason, clearedCount: number) => void; + /** + * Fired when an item is dropped. + * queueLen is the post-operation queue depth: + * - Soft-limit coalescable drop: one removed, one added → net unchanged. + * - Hard-ceiling rejection: item not added → current length unchanged. + */ + onDropped?: ( + item: QueueItem, + reason: QueueItemDroppedReason, + queueLen: number, + ) => void; +} + +// ── Options ────────────────────────────────────────────────────── + +export interface QueueRuntimeOptions { + /** + * Soft limit. When reached, the oldest coalescable item is dropped + * to make room for a new one. Default: 100. + */ + maxItems?: number; + /** + * Hard ceiling. When reached, enqueue is rejected entirely (returns null) + * for all item kinds and onDropped fires. Default: maxItems * 3. + */ + hardMaxItems?: number; + callbacks?: QueueCallbacks; +} + +// ── Runtime ────────────────────────────────────────────────────── + +export class QueueRuntime { + private readonly store: QueueItem[] = []; + private readonly callbacks: QueueCallbacks; + private readonly maxItems: number; + private readonly hardMaxItems: number; + private nextId = 0; + private nextBatchId = 0; + + // Blocked-reason transition tracking + private lastEmittedBlockedReason: QueueBlockedReason | null = null; + private blockedEmittedForNonEmpty = false; + + constructor(options: QueueRuntimeOptions = {}) { + const maxItems = Math.max(1, Math.floor(options.maxItems ?? 100) || 100); + const hardMaxItems = Math.max( + maxItems, + Math.floor(options.hardMaxItems ?? maxItems * 3) || maxItems * 3, + ); + this.maxItems = maxItems; + this.hardMaxItems = hardMaxItems; + this.callbacks = options.callbacks ?? {}; + } + + // ── Enqueue ──────────────────────────────────────────────────── + + /** + * Add an item to the queue. Returns the enqueued item (with assigned id + * and enqueuedAt), or null if the hard ceiling was reached. + * + * - If at soft limit and item is coalescable: drops oldest coalescable item. + * - If at soft limit and item is a barrier: allows overflow (soft limit only + * applies to coalescable items). + * - If at hard ceiling: rejects all item kinds, fires onDropped("buffer_limit"). + */ + enqueue(input: Omit): QueueItem | null { + // Hard ceiling check + if (this.store.length >= this.hardMaxItems) { + const phantom = this.makeItem(input); + this.safeCallback( + "onDropped", + phantom, + "buffer_limit", + this.store.length, + ); + return null; + } + + // Soft limit: only drop coalescable items + if (this.store.length >= this.maxItems && isCoalescable(input.kind)) { + const dropIdx = this.store.findIndex((i) => isCoalescable(i.kind)); + const dropped = + dropIdx !== -1 ? this.store.splice(dropIdx, 1)[0] : undefined; + if (dropped !== undefined) { + const item = this.makeItem(input); + this.store.push(item); + // queueLen after: same as before (one dropped, one added) + this.safeCallback( + "onDropped", + dropped, + "buffer_limit", + this.store.length, + ); + this.safeCallback("onEnqueued", item, this.store.length); + return item; + } + } + + const item = this.makeItem(input); + this.store.push(item); + this.safeCallback("onEnqueued", item, this.store.length); + + // If queue just became non-empty while blocked, blocked-epoch tracking resets + // so the next tryDequeue call can re-emit the blocked event. + if (this.store.length === 1) { + this.blockedEmittedForNonEmpty = false; + } + + return item; + } + + // ── Dequeue ──────────────────────────────────────────────────── + + /** + * Attempt to dequeue the next batch. + * + * Pass `blockedReason` (non-null) when the caller's gating conditions + * prevent submission. Pass `null` when submission is allowed. + * + * Returns null if blocked or queue is empty. + * Returns a DequeuedBatch with coalescable items (or a single barrier). + */ + tryDequeue(blockedReason: QueueBlockedReason | null): DequeuedBatch | null { + if (blockedReason !== null) { + // Only emit on transition when queue is non-empty + if (this.store.length > 0) { + const shouldEmit = + blockedReason !== this.lastEmittedBlockedReason || + !this.blockedEmittedForNonEmpty; + if (shouldEmit) { + this.lastEmittedBlockedReason = blockedReason; + this.blockedEmittedForNonEmpty = true; + this.safeCallback("onBlocked", blockedReason, this.store.length); + } + } + return null; + } + + // Unblocked — reset tracking + this.lastEmittedBlockedReason = null; + this.blockedEmittedForNonEmpty = false; + + if (this.store.length === 0) { + return null; + } + + // Drain contiguous coalescable items from head + const batch: QueueItem[] = []; + while ( + this.store.length > 0 && + isCoalescable(this.store[0]?.kind ?? "approval_result") + ) { + const item = this.store.shift(); + if (item) batch.push(item); + } + + // If head was a barrier (no coalescables found), dequeue it alone + if (batch.length === 0 && this.store.length > 0) { + const item = this.store.shift(); + if (item) batch.push(item); + } + + if (batch.length === 0) { + return null; + } + + // When queue becomes empty after dequeue, reset blocked epoch tracking + if (this.store.length === 0) { + this.blockedEmittedForNonEmpty = false; + } + + const result: DequeuedBatch = { + batchId: `batch-${++this.nextBatchId}`, + items: batch, + mergedCount: batch.length, + queueLenAfter: this.store.length, + }; + + this.safeCallback("onDequeued", result); + return result; + } + + // ── Clear ────────────────────────────────────────────────────── + + /** Remove all items and fire onCleared. */ + clear(reason: QueueClearedReason): void { + const count = this.store.length; + this.store.length = 0; + this.lastEmittedBlockedReason = null; + this.blockedEmittedForNonEmpty = false; + this.safeCallback("onCleared", reason, count); + } + + // ── Accessors ────────────────────────────────────────────────── + + get length(): number { + return this.store.length; + } + + get isEmpty(): boolean { + return this.store.length === 0; + } + + get items(): readonly QueueItem[] { + return this.store.slice(); + } + + peek(): readonly QueueItem[] { + return this.store.slice(); + } + + // ── Internals ────────────────────────────────────────────────── + + private makeItem(input: Omit): QueueItem { + return { + ...input, + id: `q-${++this.nextId}`, + enqueuedAt: performance.now(), + } as QueueItem; + } + + private safeCallback( + name: K, + ...args: Parameters> + ): void { + try { + (this.callbacks[name] as ((...a: unknown[]) => void) | undefined)?.( + ...args, + ); + } catch (err) { + if (process.env.DEBUG) { + console.error(`[QueueRuntime] callback "${name}" threw:`, err); + } + } + } +} diff --git a/src/tests/queue/queueRuntime.test.ts b/src/tests/queue/queueRuntime.test.ts new file mode 100644 index 0000000..e0a551f --- /dev/null +++ b/src/tests/queue/queueRuntime.test.ts @@ -0,0 +1,427 @@ +import { describe, expect, test } from "bun:test"; +import { + type DequeuedBatch, + type MessageQueueItem, + type QueueItem, + QueueRuntime, +} from "../../queue/queueRuntime"; + +// ── Helpers ─────────────────────────────────────────────────────── + +function makeMsg(text = "hello"): Omit { + return { kind: "message", source: "user", content: text }; +} + +function makeTask( + text = "", +): Omit< + Extract, + "id" | "enqueuedAt" +> { + return { kind: "task_notification", source: "task_notification", text }; +} + +function makeApproval(): Omit< + Extract, + "id" | "enqueuedAt" +> { + return { kind: "approval_result", source: "system", text: "{}" }; +} + +function makeOverlay(): Omit< + Extract, + "id" | "enqueuedAt" +> { + return { kind: "overlay_action", source: "system", text: "plan_mode" }; +} + +// ── Enqueue ─────────────────────────────────────────────────────── + +describe("enqueue basics", () => { + test("adds item, assigns ID, returns item, length increases", () => { + const q = new QueueRuntime(); + const item = q.enqueue(makeMsg()); + expect(item).not.toBeNull(); + expect(item?.id).toMatch(/^q-\d+$/); + expect(item?.kind).toBe("message"); + expect(q.length).toBe(1); + }); + + test("onEnqueued fires with correct item and queue length", () => { + const calls: [QueueItem, number][] = []; + const q = new QueueRuntime({ + callbacks: { onEnqueued: (item, len) => calls.push([item, len]) }, + }); + q.enqueue(makeMsg("a")); + q.enqueue(makeMsg("b")); + expect(calls).toHaveLength(2); + expect(calls.at(0)?.[1]).toBe(1); + expect(calls.at(1)?.[1]).toBe(2); + }); + + test("multimodal content preserved through round-trip", () => { + const q = new QueueRuntime(); + // Use an array of text parts to verify multipart content is preserved + const content: MessageQueueItem["content"] = [ + { type: "text" as const, text: "part one" }, + { type: "text" as const, text: "part two" }, + ]; + const input: Omit = { + kind: "message", + source: "user", + content, + }; + const item = q.enqueue(input); + expect(item).not.toBeNull(); + const batch = q.tryDequeue(null); + expect(batch).not.toBeNull(); + const dequeued = batch?.items.at(0) as MessageQueueItem; + expect(dequeued.content).toEqual(content); + }); +}); + +describe("bounded buffer — soft limit", () => { + test("drops oldest coalescable when at soft limit", () => { + const dropped: QueueItem[] = []; + const q = new QueueRuntime({ + maxItems: 2, + callbacks: { onDropped: (item) => dropped.push(item) }, + }); + const a = q.enqueue(makeMsg("a")); + expect(a).not.toBeNull(); + q.enqueue(makeMsg("b")); + q.enqueue(makeMsg("c")); // triggers drop of "a" + + expect(dropped).toHaveLength(1); + expect((dropped.at(0) as MessageQueueItem).content).toBe("a"); + const droppedItem = dropped.at(0); + expect(a?.id).toEqual(droppedItem?.id); + expect(q.length).toBe(2); + }); + + test("barrier items not dropped at soft limit", () => { + const dropped: QueueItem[] = []; + const q = new QueueRuntime({ + maxItems: 1, + callbacks: { onDropped: (item) => dropped.push(item) }, + }); + q.enqueue(makeApproval()); // fills to capacity (barrier) + q.enqueue(makeApproval()); // another barrier — soft limit exceeded, not dropped + expect(dropped).toHaveLength(0); + expect(q.length).toBe(2); + }); + + test("coalescable drop resumes when new coalescable arrives at capacity", () => { + const dropped: QueueItem[] = []; + const q = new QueueRuntime({ + maxItems: 2, + callbacks: { onDropped: (item) => dropped.push(item) }, + }); + q.enqueue(makeMsg("a")); + q.enqueue(makeMsg("b")); // full + q.enqueue(makeMsg("c")); // drops "a" + q.enqueue(makeMsg("d")); // drops "b" + expect(dropped).toHaveLength(2); + expect(q.length).toBe(2); + }); +}); + +describe("bounded buffer — hard ceiling", () => { + test("returns null when hardMaxItems reached, fires onDropped(buffer_limit)", () => { + const dropped: [QueueItem, string][] = []; + const q = new QueueRuntime({ + maxItems: 1, + hardMaxItems: 2, + callbacks: { + onDropped: (item, reason) => dropped.push([item, reason]), + }, + }); + q.enqueue(makeApproval()); // 1 + q.enqueue(makeApproval()); // 2 (soft barrier overflow) + const result = q.enqueue(makeApproval()); // hard ceiling + expect(result).toBeNull(); + expect(dropped).toHaveLength(1); + expect(dropped.at(0)?.[1]).toBe("buffer_limit"); + expect(q.length).toBe(2); // unchanged + }); + + test("hard ceiling applies to coalescable items too", () => { + // maxItems == hardMaxItems: soft drop would normally kick in for coalescable, + // but hard ceiling fires first since there's no room even after a drop. + // With hardMaxItems=2 and maxItems=2: soft limit drops oldest coalescable, + // so length stays at 2 — enqueue succeeds. To force coalescable rejection, + // use hardMaxItems=1 (maxItems clamped to 1 as well). + const dropped: string[] = []; + const q = new QueueRuntime({ + maxItems: 1, + hardMaxItems: 1, + callbacks: { onDropped: (_item, reason) => dropped.push(reason) }, + }); + q.enqueue(makeMsg("a")); // length 1 = at hard ceiling + const rejected = q.enqueue(makeMsg("b")); // hard ceiling — coalescable rejected + expect(rejected).toBeNull(); + expect(dropped).toEqual(["buffer_limit"]); + expect(q.length).toBe(1); // unchanged + }); +}); + +// ── Dequeue — coalescable ───────────────────────────────────────── + +describe("dequeue coalescable items", () => { + test("returns all contiguous coalescable items as one batch", () => { + const q = new QueueRuntime(); + q.enqueue(makeMsg("a")); + q.enqueue(makeTask("")); + q.enqueue(makeMsg("b")); + const batch = q.tryDequeue(null); + expect(batch).not.toBeNull(); + expect(batch?.items).toHaveLength(3); + expect(batch?.mergedCount).toBe(3); + }); + + test("onDequeued fires with correct batch metadata", () => { + const batches: DequeuedBatch[] = []; + const q = new QueueRuntime({ + callbacks: { onDequeued: (b) => batches.push(b) }, + }); + q.enqueue(makeMsg("a")); + q.enqueue(makeMsg("b")); + q.tryDequeue(null); + expect(batches).toHaveLength(1); + const b = batches.at(0); + expect(b?.mergedCount).toBe(2); + expect(b?.queueLenAfter).toBe(0); + expect(b?.batchId).toMatch(/^batch-\d+$/); + }); + + test("length is 0 after full dequeue", () => { + const q = new QueueRuntime(); + q.enqueue(makeMsg()); + q.enqueue(makeMsg()); + q.tryDequeue(null); + expect(q.length).toBe(0); + expect(q.isEmpty).toBe(true); + }); + + test("tryDequeue on empty queue returns null, no callback", () => { + const batches: DequeuedBatch[] = []; + const q = new QueueRuntime({ + callbacks: { onDequeued: (b) => batches.push(b) }, + }); + const result = q.tryDequeue(null); + expect(result).toBeNull(); + expect(batches).toHaveLength(0); + }); +}); + +// ── Dequeue — barrier ───────────────────────────────────────────── + +describe("dequeue barrier items", () => { + test("barrier item at head dequeued alone", () => { + const q = new QueueRuntime(); + q.enqueue(makeApproval()); + q.enqueue(makeMsg("following")); + const batch = q.tryDequeue(null); + expect(batch).not.toBeNull(); + expect(batch?.items).toHaveLength(1); + expect(batch?.items.at(0)?.kind).toBe("approval_result"); + expect(q.length).toBe(1); // message still in queue + }); + + test("coalescable items before barrier dequeued as batch, barrier stays", () => { + const q = new QueueRuntime(); + q.enqueue(makeMsg("a")); + q.enqueue(makeMsg("b")); + q.enqueue(makeOverlay()); // barrier + q.enqueue(makeMsg("c")); // after barrier + const batch = q.tryDequeue(null); + expect(batch?.items).toHaveLength(2); + expect(batch?.items.at(0)?.kind).toBe("message"); + expect(batch?.items.at(1)?.kind).toBe("message"); + expect(q.length).toBe(2); // overlay + msg still queued + }); + + test("mixed [msg, msg, overlay, msg]: first dequeue gets [msg, msg]", () => { + const q = new QueueRuntime(); + q.enqueue(makeMsg("1")); + q.enqueue(makeMsg("2")); + q.enqueue(makeOverlay()); + q.enqueue(makeMsg("3")); + + const b1 = q.tryDequeue(null); + expect(b1?.items.map((i) => i.kind)).toEqual(["message", "message"]); + + const b2 = q.tryDequeue(null); // overlay alone + expect(b2?.items).toHaveLength(1); + expect(b2?.items.at(0)?.kind).toBe("overlay_action"); + + const b3 = q.tryDequeue(null); // remaining msg + expect(b3?.items).toHaveLength(1); + expect(b3?.items.at(0)?.kind).toBe("message"); + + expect(q.isEmpty).toBe(true); + }); +}); + +// ── Blocked ─────────────────────────────────────────────────────── + +describe("blocked state", () => { + test("tryDequeue(streaming) returns null, fires onBlocked", () => { + const blocked: string[] = []; + const q = new QueueRuntime({ + callbacks: { onBlocked: (r) => blocked.push(r) }, + }); + q.enqueue(makeMsg()); + const result = q.tryDequeue("streaming"); + expect(result).toBeNull(); + expect(blocked).toEqual(["streaming"]); + }); + + test("same reason twice fires onBlocked only once", () => { + const blocked: string[] = []; + const q = new QueueRuntime({ + callbacks: { onBlocked: (r) => blocked.push(r) }, + }); + q.enqueue(makeMsg()); + q.tryDequeue("streaming"); + q.tryDequeue("streaming"); + expect(blocked).toHaveLength(1); + }); + + test("different reason fires onBlocked again", () => { + const blocked: string[] = []; + const q = new QueueRuntime({ + callbacks: { onBlocked: (r) => blocked.push(r) }, + }); + q.enqueue(makeMsg()); + q.tryDequeue("streaming"); + q.tryDequeue("pending_approvals"); + expect(blocked).toEqual(["streaming", "pending_approvals"]); + }); + + test("tryDequeue(null) after blocked resets tracking", () => { + const blocked: string[] = []; + const q = new QueueRuntime({ + callbacks: { onBlocked: (r) => blocked.push(r) }, + }); + q.enqueue(makeMsg()); + q.tryDequeue("streaming"); + q.tryDequeue(null); // unblocks, dequeues + q.enqueue(makeMsg()); + q.tryDequeue("streaming"); // should fire again + expect(blocked).toHaveLength(2); + }); + + test("blocked with empty queue does NOT fire onBlocked", () => { + const blocked: string[] = []; + const q = new QueueRuntime({ + callbacks: { onBlocked: (r) => blocked.push(r) }, + }); + q.tryDequeue("streaming"); + expect(blocked).toHaveLength(0); + }); + + test("queue empties while blocked, then refills under same reason: emits again", () => { + const blocked: string[] = []; + const q = new QueueRuntime({ + callbacks: { onBlocked: (r) => blocked.push(r) }, + }); + q.enqueue(makeMsg()); + q.tryDequeue("streaming"); // emits "streaming" + q.tryDequeue(null); // dequeues, queue empty, resets epoch + q.enqueue(makeMsg()); // refills under same block + q.tryDequeue("streaming"); // should emit "streaming" again + expect(blocked).toHaveLength(2); + expect(blocked).toEqual(["streaming", "streaming"]); + }); +}); + +// ── Clear ───────────────────────────────────────────────────────── + +describe("clear", () => { + test("removes all items and fires onCleared with count", () => { + const cleared: [string, number][] = []; + const q = new QueueRuntime({ + callbacks: { onCleared: (r, n) => cleared.push([r, n]) }, + }); + q.enqueue(makeMsg()); + q.enqueue(makeMsg()); + q.clear("error"); + expect(q.length).toBe(0); + expect(cleared).toEqual([["error", 2]]); + }); + + test("clear on empty queue fires onCleared with 0", () => { + const cleared: number[] = []; + const q = new QueueRuntime({ + callbacks: { onCleared: (_r, n) => cleared.push(n) }, + }); + q.clear("shutdown"); + expect(cleared).toEqual([0]); + }); +}); + +// ── Callback safety ─────────────────────────────────────────────── + +describe("callback safety", () => { + test("callback that throws does not corrupt queue state", () => { + const q = new QueueRuntime({ + callbacks: { + onEnqueued: () => { + throw new Error("boom"); + }, + }, + }); + const item = q.enqueue(makeMsg()); + expect(item).not.toBeNull(); + expect(q.length).toBe(1); // state intact despite callback throw + }); + + test("subsequent operations work after callback throw", () => { + let throwCount = 0; + const q = new QueueRuntime({ + callbacks: { + onDequeued: () => { + throwCount++; + throw new Error("oops"); + }, + }, + }); + q.enqueue(makeMsg("a")); + q.tryDequeue(null); // throws in callback + q.enqueue(makeMsg("b")); + const batch = q.tryDequeue(null); // should work fine + expect(batch).not.toBeNull(); + expect(batch?.items).toHaveLength(1); + expect(throwCount).toBe(2); + }); +}); + +// ── IDs ─────────────────────────────────────────────────────────── + +describe("IDs and accessors", () => { + test("IDs are monotonically increasing within a runtime instance", () => { + const q = new QueueRuntime(); + const a = q.enqueue(makeMsg()); + const b = q.enqueue(makeMsg()); + const c = q.enqueue(makeMsg()); + expect(a).not.toBeNull(); + expect(b).not.toBeNull(); + expect(c).not.toBeNull(); + const ids = [a?.id ?? "", b?.id ?? "", c?.id ?? ""].map((id) => + Number.parseInt(id.replace("q-", ""), 10), + ); + const [id0, id1, id2] = ids; + expect(id0).toBeLessThan(id1 ?? 0); + expect(id1).toBeLessThan(id2 ?? 0); + }); + + test("peek returns items without removing them", () => { + const q = new QueueRuntime(); + q.enqueue(makeMsg("a")); + q.enqueue(makeMsg("b")); + const peeked = q.peek(); + expect(peeked).toHaveLength(2); + expect(q.length).toBe(2); // unchanged + }); +});