diff --git a/src/tests/protocol/queue-lifecycle-types.test.ts b/src/tests/protocol/queue-lifecycle-types.test.ts new file mode 100644 index 0000000..349c7ec --- /dev/null +++ b/src/tests/protocol/queue-lifecycle-types.test.ts @@ -0,0 +1,270 @@ +import { describe, expect, test } from "bun:test"; +import type { + QueueBatchDequeuedEvent, + QueueBlockedEvent, + QueueBlockedReason, + QueueClearedEvent, + QueueClearedReason, + QueueItemDroppedEvent, + QueueItemDroppedReason, + QueueItemEnqueuedEvent, + QueueItemKind, + QueueItemSource, + QueueLifecycleEvent, + WireMessage, +} from "../../types/protocol"; + +/** + * Wire-shape tests for queue lifecycle protocol events. + * These verify that each event type has the expected fields and + * discriminant values, ensuring the protocol contract is stable. + */ + +describe("QueueItemEnqueuedEvent wire shape", () => { + test("has required fields with correct types", () => { + const event: QueueItemEnqueuedEvent = { + type: "queue_item_enqueued", + item_id: "item-1", + source: "user", + kind: "message", + queue_len: 1, + session_id: "session-abc", + uuid: "uuid-123", + }; + + expect(event.type).toBe("queue_item_enqueued"); + expect(event.item_id).toBe("item-1"); + expect(event.source).toBe("user"); + expect(event.kind).toBe("message"); + expect(event.queue_len).toBe(1); + expect(event.session_id).toBe("session-abc"); + expect(event.uuid).toBe("uuid-123"); + }); + + test("source covers all item origins", () => { + const sources: Record = { + user: true, + task_notification: true, + subagent: true, + system: true, + } satisfies Record; + expect(Object.keys(sources)).toHaveLength(4); + }); + + test("kind covers all content types", () => { + const kinds: Record = { + message: true, + task_notification: true, + approval_result: true, + overlay_action: true, + } satisfies Record; + expect(Object.keys(kinds)).toHaveLength(4); + }); +}); + +describe("QueueBatchDequeuedEvent wire shape", () => { + test("has required fields with correct types", () => { + const event: QueueBatchDequeuedEvent = { + type: "queue_batch_dequeued", + batch_id: "batch-1", + item_ids: ["item-1", "item-2"], + merged_count: 2, + queue_len_after: 0, + session_id: "session-abc", + uuid: "uuid-456", + }; + + expect(event.type).toBe("queue_batch_dequeued"); + expect(event.batch_id).toBe("batch-1"); + expect(event.item_ids).toEqual(["item-1", "item-2"]); + expect(event.merged_count).toBe(2); + expect(event.queue_len_after).toBe(0); + }); + + test("single-item batch has merged_count 1", () => { + const event: QueueBatchDequeuedEvent = { + type: "queue_batch_dequeued", + batch_id: "batch-2", + item_ids: ["item-1"], + merged_count: 1, + queue_len_after: 3, + session_id: "s", + uuid: "u", + }; + + expect(event.merged_count).toBe(1); + expect(event.item_ids).toHaveLength(1); + }); +}); + +describe("QueueBlockedEvent wire shape", () => { + test("has required fields with correct types", () => { + const event: QueueBlockedEvent = { + type: "queue_blocked", + reason: "streaming", + queue_len: 2, + session_id: "session-abc", + uuid: "uuid-789", + }; + + expect(event.type).toBe("queue_blocked"); + expect(event.reason).toBe("streaming"); + expect(event.queue_len).toBe(2); + }); + + test("reason covers all blocked states", () => { + const reasons: Record = { + streaming: true, + pending_approvals: true, + overlay_open: true, + command_running: true, + interrupt_in_progress: true, + runtime_busy: true, + } satisfies Record; + expect(Object.keys(reasons)).toHaveLength(6); + }); +}); + +describe("QueueClearedEvent wire shape", () => { + test("has required fields with correct types", () => { + const event: QueueClearedEvent = { + type: "queue_cleared", + reason: "processed", + cleared_count: 3, + session_id: "session-abc", + uuid: "uuid-012", + }; + + expect(event.type).toBe("queue_cleared"); + expect(event.reason).toBe("processed"); + expect(event.cleared_count).toBe(3); + }); + + test("reason covers all terminal conditions", () => { + const reasons: Record = { + processed: true, + error: true, + cancelled: true, + shutdown: true, + stale_generation: true, + } satisfies Record; + expect(Object.keys(reasons)).toHaveLength(5); + }); +}); + +describe("QueueItemDroppedEvent wire shape", () => { + test("has required fields with correct types", () => { + const event: QueueItemDroppedEvent = { + type: "queue_item_dropped", + item_id: "item-99", + reason: "buffer_limit", + queue_len: 10, + session_id: "session-abc", + uuid: "uuid-345", + }; + + expect(event.type).toBe("queue_item_dropped"); + expect(event.item_id).toBe("item-99"); + expect(event.reason).toBe("buffer_limit"); + expect(event.queue_len).toBe(10); + }); + + test("reason covers all drop causes", () => { + const reasons: Record = { + buffer_limit: true, + stale_generation: true, + } satisfies Record; + expect(Object.keys(reasons)).toHaveLength(2); + }); +}); + +describe("QueueLifecycleEvent union", () => { + test("discriminates on type field", () => { + const events: QueueLifecycleEvent[] = [ + { + type: "queue_item_enqueued", + item_id: "i1", + source: "user", + kind: "message", + queue_len: 1, + session_id: "s", + uuid: "u", + }, + { + type: "queue_batch_dequeued", + batch_id: "b1", + item_ids: ["i1"], + merged_count: 1, + queue_len_after: 0, + session_id: "s", + uuid: "u", + }, + { + type: "queue_blocked", + reason: "streaming", + queue_len: 1, + session_id: "s", + uuid: "u", + }, + { + type: "queue_cleared", + reason: "processed", + cleared_count: 1, + session_id: "s", + uuid: "u", + }, + { + type: "queue_item_dropped", + item_id: "i2", + reason: "buffer_limit", + queue_len: 0, + session_id: "s", + uuid: "u", + }, + ]; + + const types = events.map((e) => e.type); + expect(types).toEqual([ + "queue_item_enqueued", + "queue_batch_dequeued", + "queue_blocked", + "queue_cleared", + "queue_item_dropped", + ]); + }); + + test("all events serialize to valid JSON with envelope", () => { + const event: QueueLifecycleEvent = { + type: "queue_item_enqueued", + item_id: "i1", + source: "task_notification", + kind: "task_notification", + queue_len: 2, + session_id: "listen-abc123", + uuid: "enqueue-i1", + }; + + const json = JSON.stringify(event); + const parsed = JSON.parse(json); + + expect(parsed.type).toBe("queue_item_enqueued"); + expect(parsed.session_id).toBe("listen-abc123"); + expect(parsed.uuid).toBe("enqueue-i1"); + }); + + test("QueueLifecycleEvent is assignable to WireMessage", () => { + // Compile-time check: if QueueLifecycleEvent is removed from WireMessage, + // this assignment fails and the test won't compile. + const event: QueueLifecycleEvent = { + type: "queue_item_enqueued", + item_id: "i1", + source: "user", + kind: "message", + queue_len: 1, + session_id: "s", + uuid: "u", + }; + const wire: WireMessage = event; + expect(wire.type).toBe("queue_item_enqueued"); + }); +}); diff --git a/src/types/protocol.ts b/src/types/protocol.ts index 29930a0..50e2f4b 100644 --- a/src/types/protocol.ts +++ b/src/types/protocol.ts @@ -254,6 +254,137 @@ export interface ResultMessage extends MessageEnvelope { stop_reason?: StopReasonType; } +// ═══════════════════════════════════════════════════════════════ +// QUEUE LIFECYCLE +// Events emitted by the shared queue runtime. Each describes a +// discrete state transition in the turn queue. Consumers (TUI, +// headless bidir JSON, WS listen) emit these through their +// respective output channels. +// ═══════════════════════════════════════════════════════════════ + +/** + * Source that produced the queue item. + * - user: Submitted via Enter in TUI or stdin in headless + * - task_notification: Background subagent completion + * - subagent: Direct subagent result + * - system: Approval results, overlay actions, system reminders + */ +export type QueueItemSource = + | "user" + | "task_notification" + | "subagent" + | "system"; + +/** + * Kind of content carried by the queue item. + * - message: User or system text to send to the agent + * - task_notification: Background task completed notification + * - approval_result: Tool approval/denial result + * - overlay_action: Plan mode, AskUserQuestion, etc. + */ +export type QueueItemKind = + | "message" + | "task_notification" + | "approval_result" + | "overlay_action"; + +/** + * Emitted synchronously when an item enters the queue. + * A queue item is a discrete, submitted unit of work (post-Enter for user + * messages, or a delivered notification/result for system sources). + */ +export interface QueueItemEnqueuedEvent extends MessageEnvelope { + type: "queue_item_enqueued"; + item_id: string; + source: QueueItemSource; + kind: QueueItemKind; + queue_len: number; +} + +/** + * Emitted exactly once when the runtime dequeues a batch for submission. + * Contiguous coalescable items (user + task messages) are merged into one batch. + */ +export interface QueueBatchDequeuedEvent extends MessageEnvelope { + type: "queue_batch_dequeued"; + batch_id: string; + item_ids: string[]; + merged_count: number; + queue_len_after: number; +} + +/** + * Why the queue cannot dequeue right now. + * - streaming: Agent turn is actively streaming + * - pending_approvals: Waiting for HITL approval decisions + * - overlay_open: Plan mode, AskUserQuestion, or other overlay is active + * - command_running: Slash command is executing + * - interrupt_in_progress: User interrupt (Esc) is being processed + * - runtime_busy: Generic busy state (e.g., listen-client turn in flight) + */ +export type QueueBlockedReason = + | "streaming" + | "pending_approvals" + | "overlay_open" + | "command_running" + | "interrupt_in_progress" + | "runtime_busy"; + +/** + * Emitted only on blocked-reason state transitions (not on every dequeue + * check while blocked). The runtime tracks lastEmittedBlockedReason and + * fires this only when the reason changes or transitions from unblocked. + */ +export interface QueueBlockedEvent extends MessageEnvelope { + type: "queue_blocked"; + reason: QueueBlockedReason; + queue_len: number; +} + +/** + * Why the queue was cleared. + */ +export type QueueClearedReason = + | "processed" + | "error" + | "cancelled" + | "shutdown" + | "stale_generation"; + +/** + * Emitted when the queue is flushed due to a terminal condition. + */ +export interface QueueClearedEvent extends MessageEnvelope { + type: "queue_cleared"; + reason: QueueClearedReason; + cleared_count: number; +} + +/** + * Why an item was dropped without processing. + */ +export type QueueItemDroppedReason = "buffer_limit" | "stale_generation"; + +/** + * Emitted when an item is dropped from the queue without being processed. + */ +export interface QueueItemDroppedEvent extends MessageEnvelope { + type: "queue_item_dropped"; + item_id: string; + reason: QueueItemDroppedReason; + queue_len: number; +} + +/** + * Union of all queue lifecycle events. + */ +export type QueueLifecycleEvent = + | QueueItemEnqueuedEvent + | QueueBatchDequeuedEvent + | QueueBlockedEvent + | QueueClearedEvent + | QueueItemDroppedEvent; + // ═══════════════════════════════════════════════════════════════ // CONTROL PROTOCOL // Bidirectional: SDK → CLI and CLI → SDK both use control_request/response @@ -508,4 +639,5 @@ export type WireMessage = | RecoveryMessage | ResultMessage | ControlResponse - | ControlRequest; // CLI → SDK control requests (e.g., can_use_tool) + | ControlRequest // CLI → SDK control requests (e.g., can_use_tool) + | QueueLifecycleEvent;