From 7a36d27e8fd1b98fe60794b805d2f2edf44078fa Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Thu, 26 Feb 2026 14:52:51 -0800 Subject: [PATCH] feat(protocol): add static transcript sync message types (LSS1) (#1169) Co-authored-by: Letta --- src/tests/protocol/static-sync-types.test.ts | 273 +++++++++++++++++++ src/types/protocol.ts | 86 +++++- src/websocket/listen-client.ts | 10 +- 3 files changed, 367 insertions(+), 2 deletions(-) create mode 100644 src/tests/protocol/static-sync-types.test.ts diff --git a/src/tests/protocol/static-sync-types.test.ts b/src/tests/protocol/static-sync-types.test.ts new file mode 100644 index 0000000..5d90c52 --- /dev/null +++ b/src/tests/protocol/static-sync-types.test.ts @@ -0,0 +1,273 @@ +/** + * Tests for the static transcript sync protocol types (LSS1). + * + * Verifies structural correctness, discriminant exhaustiveness, and + * membership in WireMessage / WsProtocolEvent unions. + */ + +import { describe, expect, test } from "bun:test"; +import type { + QueueSnapshotMessage, + SyncCompleteMessage, + TranscriptBackfillMessage, + TranscriptSupplementMessage, + WireMessage, +} from "../../types/protocol"; +import type { WsProtocolEvent } from "../../websocket/listen-client"; + +// ── Helpers ─────────────────────────────────────────────────────── + +const ENVELOPE = { session_id: "sess-1", uuid: "uuid-1" } as const; + +// ── TranscriptBackfillMessage ───────────────────────────────────── + +describe("TranscriptBackfillMessage", () => { + test("minimal empty backfill is structurally valid", () => { + const msg: TranscriptBackfillMessage = { + ...ENVELOPE, + type: "transcript_backfill", + messages: [], + is_final: true, + }; + expect(msg.type).toBe("transcript_backfill"); + expect(msg.messages).toHaveLength(0); + expect(msg.is_final).toBe(true); + }); + + test("is_final: false marks a non-terminal chunk", () => { + const msg: TranscriptBackfillMessage = { + ...ENVELOPE, + type: "transcript_backfill", + messages: [], + is_final: false, + }; + expect(msg.is_final).toBe(false); + }); + + test("type discriminant is 'transcript_backfill'", () => { + const msg: TranscriptBackfillMessage = { + ...ENVELOPE, + type: "transcript_backfill", + messages: [], + is_final: true, + }; + // Narrowing works via the discriminant + if (msg.type === "transcript_backfill") { + expect(msg.is_final).toBeDefined(); + } + }); +}); + +// ── QueueSnapshotMessage ────────────────────────────────────────── + +describe("QueueSnapshotMessage", () => { + test("empty snapshot is valid", () => { + const msg: QueueSnapshotMessage = { + ...ENVELOPE, + type: "queue_snapshot", + items: [], + }; + expect(msg.type).toBe("queue_snapshot"); + expect(msg.items).toHaveLength(0); + }); + + test("snapshot with items preserves order and fields", () => { + const msg: QueueSnapshotMessage = { + ...ENVELOPE, + type: "queue_snapshot", + items: [ + { item_id: "item-1", kind: "message", source: "user" }, + { + item_id: "item-2", + kind: "task_notification", + source: "task_notification", + }, + ], + }; + expect(msg.items).toHaveLength(2); + const [first, second] = msg.items; + expect(first?.item_id).toBe("item-1"); + expect(first?.kind).toBe("message"); + expect(first?.source).toBe("user"); + expect(second?.kind).toBe("task_notification"); + }); +}); + +// ── SyncCompleteMessage ─────────────────────────────────────────── + +describe("SyncCompleteMessage", () => { + test("had_pending_turn: false for idle connect", () => { + const msg: SyncCompleteMessage = { + ...ENVELOPE, + type: "sync_complete", + had_pending_turn: false, + }; + expect(msg.type).toBe("sync_complete"); + expect(msg.had_pending_turn).toBe(false); + }); + + test("had_pending_turn: true for mid-turn connect", () => { + const msg: SyncCompleteMessage = { + ...ENVELOPE, + type: "sync_complete", + had_pending_turn: true, + }; + expect(msg.had_pending_turn).toBe(true); + }); +}); + +// ── TranscriptSupplementMessage ─────────────────────────────────── + +describe("TranscriptSupplementMessage", () => { + test("empty supplement is valid", () => { + const msg: TranscriptSupplementMessage = { + ...ENVELOPE, + type: "transcript_supplement", + messages: [], + }; + expect(msg.type).toBe("transcript_supplement"); + expect(msg.messages).toHaveLength(0); + }); + + test("distinct type discriminant from transcript_backfill", () => { + const backfill: TranscriptBackfillMessage = { + ...ENVELOPE, + type: "transcript_backfill", + messages: [], + is_final: true, + }; + const supplement: TranscriptSupplementMessage = { + ...ENVELOPE, + type: "transcript_supplement", + messages: [], + }; + expect(backfill.type).not.toBe(supplement.type); + }); +}); + +// ── Union membership ────────────────────────────────────────────── + +describe("WireMessage union membership", () => { + test("TranscriptBackfillMessage is assignable to WireMessage", () => { + const msg: WireMessage = { + ...ENVELOPE, + type: "transcript_backfill", + messages: [], + is_final: true, + }; + expect(msg.type).toBe("transcript_backfill"); + }); + + test("QueueSnapshotMessage is assignable to WireMessage", () => { + const msg: WireMessage = { + ...ENVELOPE, + type: "queue_snapshot", + items: [], + }; + expect(msg.type).toBe("queue_snapshot"); + }); + + test("SyncCompleteMessage is assignable to WireMessage", () => { + const msg: WireMessage = { + ...ENVELOPE, + type: "sync_complete", + had_pending_turn: false, + }; + expect(msg.type).toBe("sync_complete"); + }); + + test("TranscriptSupplementMessage is assignable to WireMessage", () => { + const msg: WireMessage = { + ...ENVELOPE, + type: "transcript_supplement", + messages: [], + }; + expect(msg.type).toBe("transcript_supplement"); + }); +}); + +describe("WsProtocolEvent union membership", () => { + test("TranscriptBackfillMessage is assignable to WsProtocolEvent", () => { + const msg: WsProtocolEvent = { + ...ENVELOPE, + type: "transcript_backfill", + messages: [], + is_final: true, + }; + expect(msg.type).toBe("transcript_backfill"); + }); + + test("QueueSnapshotMessage is assignable to WsProtocolEvent", () => { + const msg: WsProtocolEvent = { + ...ENVELOPE, + type: "queue_snapshot", + items: [], + }; + expect(msg.type).toBe("queue_snapshot"); + }); + + test("SyncCompleteMessage is assignable to WsProtocolEvent", () => { + const msg: WsProtocolEvent = { + ...ENVELOPE, + type: "sync_complete", + had_pending_turn: false, + }; + expect(msg.type).toBe("sync_complete"); + }); + + test("TranscriptSupplementMessage is assignable to WsProtocolEvent", () => { + const msg: WsProtocolEvent = { + ...ENVELOPE, + type: "transcript_supplement", + messages: [], + }; + expect(msg.type).toBe("transcript_supplement"); + }); +}); + +// ── Discriminant exhaustiveness ─────────────────────────────────── + +describe("type discriminants are unique across all four types", () => { + test("all four sync-phase discriminants are distinct", () => { + const types = [ + "transcript_backfill", + "queue_snapshot", + "sync_complete", + "transcript_supplement", + ]; + const unique = new Set(types); + expect(unique.size).toBe(types.length); + }); + + test("none conflict with existing WireMessage discriminants", () => { + // Existing discriminants: system, message, stream_event, auto_approval, + // error, retry, recovery, result, control_response, control_request, + // queue_item_enqueued, queue_batch_dequeued, queue_blocked, queue_cleared, + // queue_item_dropped + const existing = new Set([ + "system", + "message", + "stream_event", + "auto_approval", + "error", + "retry", + "recovery", + "result", + "control_response", + "control_request", + "queue_item_enqueued", + "queue_batch_dequeued", + "queue_blocked", + "queue_cleared", + "queue_item_dropped", + ]); + for (const t of [ + "transcript_backfill", + "queue_snapshot", + "sync_complete", + "transcript_supplement", + ]) { + expect(existing.has(t)).toBe(false); + } + }); +}); diff --git a/src/types/protocol.ts b/src/types/protocol.ts index 50e2f4b..2d16b1f 100644 --- a/src/types/protocol.ts +++ b/src/types/protocol.ts @@ -11,6 +11,7 @@ import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents"; import type { AssistantMessage as LettaAssistantMessage, + Message as LettaMessage, ReasoningMessage as LettaReasoningMessage, LettaStreamingResponse, ToolCallMessage as LettaToolCallMessage, @@ -23,6 +24,7 @@ import type { ToolReturnMessage as LettaToolReturnMessage } from "@letta-ai/lett // Re-export letta-client types that consumers may need export type { LettaStreamingResponse, + LettaMessage, ToolCall, StopReasonType, MessageCreate, @@ -622,6 +624,84 @@ export interface UserInput { message: MessageCreate; } +// ═══════════════════════════════════════════════════════════════ +// STATIC TRANSCRIPT SYNC +// Emitted by the WS listen client when a remote consumer (SDK, +// desktop app) connects or reconnects mid-session. Together they +// allow the consumer to reconstruct the full session state without +// polling. See listen-client.ts for the emit sequence. +// ═══════════════════════════════════════════════════════════════ + +/** + * Emitted once during the static sync phase (before sync_complete). + * Carries committed message history for the current conversation. + * + * V1: always a single page (is_final: true). Pagination via multiple + * chunks (is_final: false on all but the last) is reserved for future use. + */ +export interface TranscriptBackfillMessage extends MessageEnvelope { + type: "transcript_backfill"; + /** Committed conversation messages in chronological order. */ + messages: LettaMessage[]; + /** + * True when this is the only or last backfill chunk for this sync. + * Future pagination will emit multiple chunks with is_final: false + * on all but the last. + */ + is_final: boolean; +} + +/** + * Emitted during the static sync phase when there are items in the + * turn queue at connect time. Gives the consumer a point-in-time + * snapshot of queue contents without requiring live queue events. + * + * Omitted entirely when the queue is empty at sync time. + */ +export interface QueueSnapshotMessage extends MessageEnvelope { + type: "queue_snapshot"; + /** Items currently in the queue, in enqueue order. */ + items: Array<{ + item_id: string; + kind: QueueItemKind; + source: QueueItemSource; + }>; +} + +/** + * Marks the end of the initial static sync phase. + * All transcript_backfill and queue_snapshot messages are guaranteed + * to precede this event. After sync_complete, the consumer receives + * live queue lifecycle events (queue_item_enqueued, etc.) and message + * stream events in real time. + * + * had_pending_turn: true means a turn was already in-flight when the + * consumer connected; message chunks for that turn will follow. + */ +export interface SyncCompleteMessage extends MessageEnvelope { + type: "sync_complete"; + had_pending_turn: boolean; +} + +/** + * Post-sync supplemental backfill. Emitted AFTER sync_complete when + * context (agent_id / conversation_id) was not available at connect + * time but became known from the first inbound message. + * + * Distinct from transcript_backfill (which is only emitted during the + * static phase) so clients can handle it without breaking the + * sync_complete contract. The client should replace its (empty) + * transcript with the messages provided here. + * + * Emitted at most once per connection (guarded by supplementSent flag + * in the listener runtime). + */ +export interface TranscriptSupplementMessage extends MessageEnvelope { + type: "transcript_supplement"; + /** Committed conversation messages in chronological order. */ + messages: LettaMessage[]; +} + // ═══════════════════════════════════════════════════════════════ // UNION TYPE // ═══════════════════════════════════════════════════════════════ @@ -640,4 +720,8 @@ export type WireMessage = | ResultMessage | ControlResponse | ControlRequest // CLI → SDK control requests (e.g., can_use_tool) - | QueueLifecycleEvent; + | QueueLifecycleEvent + | TranscriptBackfillMessage + | QueueSnapshotMessage + | SyncCompleteMessage + | TranscriptSupplementMessage; diff --git a/src/websocket/listen-client.ts b/src/websocket/listen-client.ts index 37721a0..8a3928e 100644 --- a/src/websocket/listen-client.ts +++ b/src/websocket/listen-client.ts @@ -44,9 +44,13 @@ import type { MessageWire, ResultMessage as ProtocolResultMessage, QueueLifecycleEvent, + QueueSnapshotMessage, RecoveryMessage, RetryMessage, StopReasonType, + SyncCompleteMessage, + TranscriptBackfillMessage, + TranscriptSupplementMessage, } from "../types/protocol"; interface StartListenerOptions { @@ -373,7 +377,11 @@ export type WsProtocolEvent = | RetryMessage | RecoveryMessage | ProtocolResultMessage - | QueueLifecycleEvent; + | QueueLifecycleEvent + | TranscriptBackfillMessage + | QueueSnapshotMessage + | SyncCompleteMessage + | TranscriptSupplementMessage; /** * Single adapter for all outbound typed protocol events.