feat(protocol): add queue lifecycle event types (#1160)

Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
Charles Packer
2026-02-25 22:17:51 -08:00
committed by GitHub
parent 6ddd54592b
commit b06de55102
2 changed files with 403 additions and 1 deletions

View File

@@ -0,0 +1,270 @@
import { describe, expect, test } from "bun:test";
import type {
QueueBatchDequeuedEvent,
QueueBlockedEvent,
QueueBlockedReason,
QueueClearedEvent,
QueueClearedReason,
QueueItemDroppedEvent,
QueueItemDroppedReason,
QueueItemEnqueuedEvent,
QueueItemKind,
QueueItemSource,
QueueLifecycleEvent,
WireMessage,
} from "../../types/protocol";
/**
* Wire-shape tests for queue lifecycle protocol events.
* These verify that each event type has the expected fields and
* discriminant values, ensuring the protocol contract is stable.
*/
describe("QueueItemEnqueuedEvent wire shape", () => {
test("has required fields with correct types", () => {
const event: QueueItemEnqueuedEvent = {
type: "queue_item_enqueued",
item_id: "item-1",
source: "user",
kind: "message",
queue_len: 1,
session_id: "session-abc",
uuid: "uuid-123",
};
expect(event.type).toBe("queue_item_enqueued");
expect(event.item_id).toBe("item-1");
expect(event.source).toBe("user");
expect(event.kind).toBe("message");
expect(event.queue_len).toBe(1);
expect(event.session_id).toBe("session-abc");
expect(event.uuid).toBe("uuid-123");
});
test("source covers all item origins", () => {
const sources: Record<QueueItemSource, true> = {
user: true,
task_notification: true,
subagent: true,
system: true,
} satisfies Record<QueueItemSource, true>;
expect(Object.keys(sources)).toHaveLength(4);
});
test("kind covers all content types", () => {
const kinds: Record<QueueItemKind, true> = {
message: true,
task_notification: true,
approval_result: true,
overlay_action: true,
} satisfies Record<QueueItemKind, true>;
expect(Object.keys(kinds)).toHaveLength(4);
});
});
describe("QueueBatchDequeuedEvent wire shape", () => {
test("has required fields with correct types", () => {
const event: QueueBatchDequeuedEvent = {
type: "queue_batch_dequeued",
batch_id: "batch-1",
item_ids: ["item-1", "item-2"],
merged_count: 2,
queue_len_after: 0,
session_id: "session-abc",
uuid: "uuid-456",
};
expect(event.type).toBe("queue_batch_dequeued");
expect(event.batch_id).toBe("batch-1");
expect(event.item_ids).toEqual(["item-1", "item-2"]);
expect(event.merged_count).toBe(2);
expect(event.queue_len_after).toBe(0);
});
test("single-item batch has merged_count 1", () => {
const event: QueueBatchDequeuedEvent = {
type: "queue_batch_dequeued",
batch_id: "batch-2",
item_ids: ["item-1"],
merged_count: 1,
queue_len_after: 3,
session_id: "s",
uuid: "u",
};
expect(event.merged_count).toBe(1);
expect(event.item_ids).toHaveLength(1);
});
});
describe("QueueBlockedEvent wire shape", () => {
test("has required fields with correct types", () => {
const event: QueueBlockedEvent = {
type: "queue_blocked",
reason: "streaming",
queue_len: 2,
session_id: "session-abc",
uuid: "uuid-789",
};
expect(event.type).toBe("queue_blocked");
expect(event.reason).toBe("streaming");
expect(event.queue_len).toBe(2);
});
test("reason covers all blocked states", () => {
const reasons: Record<QueueBlockedReason, true> = {
streaming: true,
pending_approvals: true,
overlay_open: true,
command_running: true,
interrupt_in_progress: true,
runtime_busy: true,
} satisfies Record<QueueBlockedReason, true>;
expect(Object.keys(reasons)).toHaveLength(6);
});
});
describe("QueueClearedEvent wire shape", () => {
test("has required fields with correct types", () => {
const event: QueueClearedEvent = {
type: "queue_cleared",
reason: "processed",
cleared_count: 3,
session_id: "session-abc",
uuid: "uuid-012",
};
expect(event.type).toBe("queue_cleared");
expect(event.reason).toBe("processed");
expect(event.cleared_count).toBe(3);
});
test("reason covers all terminal conditions", () => {
const reasons: Record<QueueClearedReason, true> = {
processed: true,
error: true,
cancelled: true,
shutdown: true,
stale_generation: true,
} satisfies Record<QueueClearedReason, true>;
expect(Object.keys(reasons)).toHaveLength(5);
});
});
describe("QueueItemDroppedEvent wire shape", () => {
test("has required fields with correct types", () => {
const event: QueueItemDroppedEvent = {
type: "queue_item_dropped",
item_id: "item-99",
reason: "buffer_limit",
queue_len: 10,
session_id: "session-abc",
uuid: "uuid-345",
};
expect(event.type).toBe("queue_item_dropped");
expect(event.item_id).toBe("item-99");
expect(event.reason).toBe("buffer_limit");
expect(event.queue_len).toBe(10);
});
test("reason covers all drop causes", () => {
const reasons: Record<QueueItemDroppedReason, true> = {
buffer_limit: true,
stale_generation: true,
} satisfies Record<QueueItemDroppedReason, true>;
expect(Object.keys(reasons)).toHaveLength(2);
});
});
describe("QueueLifecycleEvent union", () => {
test("discriminates on type field", () => {
const events: QueueLifecycleEvent[] = [
{
type: "queue_item_enqueued",
item_id: "i1",
source: "user",
kind: "message",
queue_len: 1,
session_id: "s",
uuid: "u",
},
{
type: "queue_batch_dequeued",
batch_id: "b1",
item_ids: ["i1"],
merged_count: 1,
queue_len_after: 0,
session_id: "s",
uuid: "u",
},
{
type: "queue_blocked",
reason: "streaming",
queue_len: 1,
session_id: "s",
uuid: "u",
},
{
type: "queue_cleared",
reason: "processed",
cleared_count: 1,
session_id: "s",
uuid: "u",
},
{
type: "queue_item_dropped",
item_id: "i2",
reason: "buffer_limit",
queue_len: 0,
session_id: "s",
uuid: "u",
},
];
const types = events.map((e) => e.type);
expect(types).toEqual([
"queue_item_enqueued",
"queue_batch_dequeued",
"queue_blocked",
"queue_cleared",
"queue_item_dropped",
]);
});
test("all events serialize to valid JSON with envelope", () => {
const event: QueueLifecycleEvent = {
type: "queue_item_enqueued",
item_id: "i1",
source: "task_notification",
kind: "task_notification",
queue_len: 2,
session_id: "listen-abc123",
uuid: "enqueue-i1",
};
const json = JSON.stringify(event);
const parsed = JSON.parse(json);
expect(parsed.type).toBe("queue_item_enqueued");
expect(parsed.session_id).toBe("listen-abc123");
expect(parsed.uuid).toBe("enqueue-i1");
});
test("QueueLifecycleEvent is assignable to WireMessage", () => {
// Compile-time check: if QueueLifecycleEvent is removed from WireMessage,
// this assignment fails and the test won't compile.
const event: QueueLifecycleEvent = {
type: "queue_item_enqueued",
item_id: "i1",
source: "user",
kind: "message",
queue_len: 1,
session_id: "s",
uuid: "u",
};
const wire: WireMessage = event;
expect(wire.type).toBe("queue_item_enqueued");
});
});

View File

@@ -254,6 +254,137 @@ export interface ResultMessage extends MessageEnvelope {
stop_reason?: StopReasonType;
}
// ═══════════════════════════════════════════════════════════════
// QUEUE LIFECYCLE
// Events emitted by the shared queue runtime. Each describes a
// discrete state transition in the turn queue. Consumers (TUI,
// headless bidir JSON, WS listen) emit these through their
// respective output channels.
// ═══════════════════════════════════════════════════════════════
/**
* Source that produced the queue item.
* - user: Submitted via Enter in TUI or stdin in headless
* - task_notification: Background subagent completion
* - subagent: Direct subagent result
* - system: Approval results, overlay actions, system reminders
*/
export type QueueItemSource =
| "user"
| "task_notification"
| "subagent"
| "system";
/**
* Kind of content carried by the queue item.
* - message: User or system text to send to the agent
* - task_notification: Background task completed notification
* - approval_result: Tool approval/denial result
* - overlay_action: Plan mode, AskUserQuestion, etc.
*/
export type QueueItemKind =
| "message"
| "task_notification"
| "approval_result"
| "overlay_action";
/**
* Emitted synchronously when an item enters the queue.
* A queue item is a discrete, submitted unit of work (post-Enter for user
* messages, or a delivered notification/result for system sources).
*/
export interface QueueItemEnqueuedEvent extends MessageEnvelope {
type: "queue_item_enqueued";
item_id: string;
source: QueueItemSource;
kind: QueueItemKind;
queue_len: number;
}
/**
* Emitted exactly once when the runtime dequeues a batch for submission.
* Contiguous coalescable items (user + task messages) are merged into one batch.
*/
export interface QueueBatchDequeuedEvent extends MessageEnvelope {
type: "queue_batch_dequeued";
batch_id: string;
item_ids: string[];
merged_count: number;
queue_len_after: number;
}
/**
* Why the queue cannot dequeue right now.
* - streaming: Agent turn is actively streaming
* - pending_approvals: Waiting for HITL approval decisions
* - overlay_open: Plan mode, AskUserQuestion, or other overlay is active
* - command_running: Slash command is executing
* - interrupt_in_progress: User interrupt (Esc) is being processed
* - runtime_busy: Generic busy state (e.g., listen-client turn in flight)
*/
export type QueueBlockedReason =
| "streaming"
| "pending_approvals"
| "overlay_open"
| "command_running"
| "interrupt_in_progress"
| "runtime_busy";
/**
* Emitted only on blocked-reason state transitions (not on every dequeue
* check while blocked). The runtime tracks lastEmittedBlockedReason and
* fires this only when the reason changes or transitions from unblocked.
*/
export interface QueueBlockedEvent extends MessageEnvelope {
type: "queue_blocked";
reason: QueueBlockedReason;
queue_len: number;
}
/**
* Why the queue was cleared.
*/
export type QueueClearedReason =
| "processed"
| "error"
| "cancelled"
| "shutdown"
| "stale_generation";
/**
* Emitted when the queue is flushed due to a terminal condition.
*/
export interface QueueClearedEvent extends MessageEnvelope {
type: "queue_cleared";
reason: QueueClearedReason;
cleared_count: number;
}
/**
* Why an item was dropped without processing.
*/
export type QueueItemDroppedReason = "buffer_limit" | "stale_generation";
/**
* Emitted when an item is dropped from the queue without being processed.
*/
export interface QueueItemDroppedEvent extends MessageEnvelope {
type: "queue_item_dropped";
item_id: string;
reason: QueueItemDroppedReason;
queue_len: number;
}
/**
* Union of all queue lifecycle events.
*/
export type QueueLifecycleEvent =
| QueueItemEnqueuedEvent
| QueueBatchDequeuedEvent
| QueueBlockedEvent
| QueueClearedEvent
| QueueItemDroppedEvent;
// ═══════════════════════════════════════════════════════════════
// CONTROL PROTOCOL
// Bidirectional: SDK → CLI and CLI → SDK both use control_request/response
@@ -508,4 +639,5 @@ export type WireMessage =
| RecoveryMessage
| ResultMessage
| ControlResponse
| ControlRequest; // CLI → SDK control requests (e.g., can_use_tool)
| ControlRequest // CLI → SDK control requests (e.g., can_use_tool)
| QueueLifecycleEvent;