feat(protocol): add static transcript sync message types (LSS1) (#1169)

Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
Charles Packer
2026-02-26 14:52:51 -08:00
committed by GitHub
parent 728555480a
commit 7a36d27e8f
3 changed files with 367 additions and 2 deletions

View File

@@ -0,0 +1,273 @@
/**
* Tests for the static transcript sync protocol types (LSS1).
*
* Verifies structural correctness, discriminant exhaustiveness, and
* membership in WireMessage / WsProtocolEvent unions.
*/
import { describe, expect, test } from "bun:test";
import type {
QueueSnapshotMessage,
SyncCompleteMessage,
TranscriptBackfillMessage,
TranscriptSupplementMessage,
WireMessage,
} from "../../types/protocol";
import type { WsProtocolEvent } from "../../websocket/listen-client";
// ── Helpers ───────────────────────────────────────────────────────
const ENVELOPE = { session_id: "sess-1", uuid: "uuid-1" } as const;
// ── TranscriptBackfillMessage ─────────────────────────────────────
describe("TranscriptBackfillMessage", () => {
test("minimal empty backfill is structurally valid", () => {
const msg: TranscriptBackfillMessage = {
...ENVELOPE,
type: "transcript_backfill",
messages: [],
is_final: true,
};
expect(msg.type).toBe("transcript_backfill");
expect(msg.messages).toHaveLength(0);
expect(msg.is_final).toBe(true);
});
test("is_final: false marks a non-terminal chunk", () => {
const msg: TranscriptBackfillMessage = {
...ENVELOPE,
type: "transcript_backfill",
messages: [],
is_final: false,
};
expect(msg.is_final).toBe(false);
});
test("type discriminant is 'transcript_backfill'", () => {
const msg: TranscriptBackfillMessage = {
...ENVELOPE,
type: "transcript_backfill",
messages: [],
is_final: true,
};
// Narrowing works via the discriminant
if (msg.type === "transcript_backfill") {
expect(msg.is_final).toBeDefined();
}
});
});
// ── QueueSnapshotMessage ──────────────────────────────────────────
describe("QueueSnapshotMessage", () => {
test("empty snapshot is valid", () => {
const msg: QueueSnapshotMessage = {
...ENVELOPE,
type: "queue_snapshot",
items: [],
};
expect(msg.type).toBe("queue_snapshot");
expect(msg.items).toHaveLength(0);
});
test("snapshot with items preserves order and fields", () => {
const msg: QueueSnapshotMessage = {
...ENVELOPE,
type: "queue_snapshot",
items: [
{ item_id: "item-1", kind: "message", source: "user" },
{
item_id: "item-2",
kind: "task_notification",
source: "task_notification",
},
],
};
expect(msg.items).toHaveLength(2);
const [first, second] = msg.items;
expect(first?.item_id).toBe("item-1");
expect(first?.kind).toBe("message");
expect(first?.source).toBe("user");
expect(second?.kind).toBe("task_notification");
});
});
// ── SyncCompleteMessage ───────────────────────────────────────────
describe("SyncCompleteMessage", () => {
test("had_pending_turn: false for idle connect", () => {
const msg: SyncCompleteMessage = {
...ENVELOPE,
type: "sync_complete",
had_pending_turn: false,
};
expect(msg.type).toBe("sync_complete");
expect(msg.had_pending_turn).toBe(false);
});
test("had_pending_turn: true for mid-turn connect", () => {
const msg: SyncCompleteMessage = {
...ENVELOPE,
type: "sync_complete",
had_pending_turn: true,
};
expect(msg.had_pending_turn).toBe(true);
});
});
// ── TranscriptSupplementMessage ───────────────────────────────────
describe("TranscriptSupplementMessage", () => {
test("empty supplement is valid", () => {
const msg: TranscriptSupplementMessage = {
...ENVELOPE,
type: "transcript_supplement",
messages: [],
};
expect(msg.type).toBe("transcript_supplement");
expect(msg.messages).toHaveLength(0);
});
test("distinct type discriminant from transcript_backfill", () => {
const backfill: TranscriptBackfillMessage = {
...ENVELOPE,
type: "transcript_backfill",
messages: [],
is_final: true,
};
const supplement: TranscriptSupplementMessage = {
...ENVELOPE,
type: "transcript_supplement",
messages: [],
};
expect(backfill.type).not.toBe(supplement.type);
});
});
// ── Union membership ──────────────────────────────────────────────
describe("WireMessage union membership", () => {
test("TranscriptBackfillMessage is assignable to WireMessage", () => {
const msg: WireMessage = {
...ENVELOPE,
type: "transcript_backfill",
messages: [],
is_final: true,
};
expect(msg.type).toBe("transcript_backfill");
});
test("QueueSnapshotMessage is assignable to WireMessage", () => {
const msg: WireMessage = {
...ENVELOPE,
type: "queue_snapshot",
items: [],
};
expect(msg.type).toBe("queue_snapshot");
});
test("SyncCompleteMessage is assignable to WireMessage", () => {
const msg: WireMessage = {
...ENVELOPE,
type: "sync_complete",
had_pending_turn: false,
};
expect(msg.type).toBe("sync_complete");
});
test("TranscriptSupplementMessage is assignable to WireMessage", () => {
const msg: WireMessage = {
...ENVELOPE,
type: "transcript_supplement",
messages: [],
};
expect(msg.type).toBe("transcript_supplement");
});
});
describe("WsProtocolEvent union membership", () => {
test("TranscriptBackfillMessage is assignable to WsProtocolEvent", () => {
const msg: WsProtocolEvent = {
...ENVELOPE,
type: "transcript_backfill",
messages: [],
is_final: true,
};
expect(msg.type).toBe("transcript_backfill");
});
test("QueueSnapshotMessage is assignable to WsProtocolEvent", () => {
const msg: WsProtocolEvent = {
...ENVELOPE,
type: "queue_snapshot",
items: [],
};
expect(msg.type).toBe("queue_snapshot");
});
test("SyncCompleteMessage is assignable to WsProtocolEvent", () => {
const msg: WsProtocolEvent = {
...ENVELOPE,
type: "sync_complete",
had_pending_turn: false,
};
expect(msg.type).toBe("sync_complete");
});
test("TranscriptSupplementMessage is assignable to WsProtocolEvent", () => {
const msg: WsProtocolEvent = {
...ENVELOPE,
type: "transcript_supplement",
messages: [],
};
expect(msg.type).toBe("transcript_supplement");
});
});
// ── Discriminant exhaustiveness ───────────────────────────────────
describe("type discriminants are unique across all four types", () => {
test("all four sync-phase discriminants are distinct", () => {
const types = [
"transcript_backfill",
"queue_snapshot",
"sync_complete",
"transcript_supplement",
];
const unique = new Set(types);
expect(unique.size).toBe(types.length);
});
test("none conflict with existing WireMessage discriminants", () => {
// Existing discriminants: system, message, stream_event, auto_approval,
// error, retry, recovery, result, control_response, control_request,
// queue_item_enqueued, queue_batch_dequeued, queue_blocked, queue_cleared,
// queue_item_dropped
const existing = new Set([
"system",
"message",
"stream_event",
"auto_approval",
"error",
"retry",
"recovery",
"result",
"control_response",
"control_request",
"queue_item_enqueued",
"queue_batch_dequeued",
"queue_blocked",
"queue_cleared",
"queue_item_dropped",
]);
for (const t of [
"transcript_backfill",
"queue_snapshot",
"sync_complete",
"transcript_supplement",
]) {
expect(existing.has(t)).toBe(false);
}
});
});

View File

@@ -11,6 +11,7 @@
import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents";
import type {
AssistantMessage as LettaAssistantMessage,
Message as LettaMessage,
ReasoningMessage as LettaReasoningMessage,
LettaStreamingResponse,
ToolCallMessage as LettaToolCallMessage,
@@ -23,6 +24,7 @@ import type { ToolReturnMessage as LettaToolReturnMessage } from "@letta-ai/lett
// Re-export letta-client types that consumers may need
export type {
LettaStreamingResponse,
LettaMessage,
ToolCall,
StopReasonType,
MessageCreate,
@@ -622,6 +624,84 @@ export interface UserInput {
message: MessageCreate;
}
// ═══════════════════════════════════════════════════════════════
// STATIC TRANSCRIPT SYNC
// Emitted by the WS listen client when a remote consumer (SDK,
// desktop app) connects or reconnects mid-session. Together they
// allow the consumer to reconstruct the full session state without
// polling. See listen-client.ts for the emit sequence.
// ═══════════════════════════════════════════════════════════════
/**
* Emitted once during the static sync phase (before sync_complete).
* Carries committed message history for the current conversation.
*
* V1: always a single page (is_final: true). Pagination via multiple
* chunks (is_final: false on all but the last) is reserved for future use.
*/
export interface TranscriptBackfillMessage extends MessageEnvelope {
type: "transcript_backfill";
/** Committed conversation messages in chronological order. */
messages: LettaMessage[];
/**
* True when this is the only or last backfill chunk for this sync.
* Future pagination will emit multiple chunks with is_final: false
* on all but the last.
*/
is_final: boolean;
}
/**
* Emitted during the static sync phase when there are items in the
* turn queue at connect time. Gives the consumer a point-in-time
* snapshot of queue contents without requiring live queue events.
*
* Omitted entirely when the queue is empty at sync time.
*/
export interface QueueSnapshotMessage extends MessageEnvelope {
type: "queue_snapshot";
/** Items currently in the queue, in enqueue order. */
items: Array<{
item_id: string;
kind: QueueItemKind;
source: QueueItemSource;
}>;
}
/**
* Marks the end of the initial static sync phase.
* All transcript_backfill and queue_snapshot messages are guaranteed
* to precede this event. After sync_complete, the consumer receives
* live queue lifecycle events (queue_item_enqueued, etc.) and message
* stream events in real time.
*
* had_pending_turn: true means a turn was already in-flight when the
* consumer connected; message chunks for that turn will follow.
*/
export interface SyncCompleteMessage extends MessageEnvelope {
type: "sync_complete";
had_pending_turn: boolean;
}
/**
* Post-sync supplemental backfill. Emitted AFTER sync_complete when
* context (agent_id / conversation_id) was not available at connect
* time but became known from the first inbound message.
*
* Distinct from transcript_backfill (which is only emitted during the
* static phase) so clients can handle it without breaking the
* sync_complete contract. The client should replace its (empty)
* transcript with the messages provided here.
*
* Emitted at most once per connection (guarded by supplementSent flag
* in the listener runtime).
*/
export interface TranscriptSupplementMessage extends MessageEnvelope {
type: "transcript_supplement";
/** Committed conversation messages in chronological order. */
messages: LettaMessage[];
}
// ═══════════════════════════════════════════════════════════════
// UNION TYPE
// ═══════════════════════════════════════════════════════════════
@@ -640,4 +720,8 @@ export type WireMessage =
| ResultMessage
| ControlResponse
| ControlRequest // CLI → SDK control requests (e.g., can_use_tool)
| QueueLifecycleEvent;
| QueueLifecycleEvent
| TranscriptBackfillMessage
| QueueSnapshotMessage
| SyncCompleteMessage
| TranscriptSupplementMessage;

View File

@@ -44,9 +44,13 @@ import type {
MessageWire,
ResultMessage as ProtocolResultMessage,
QueueLifecycleEvent,
QueueSnapshotMessage,
RecoveryMessage,
RetryMessage,
StopReasonType,
SyncCompleteMessage,
TranscriptBackfillMessage,
TranscriptSupplementMessage,
} from "../types/protocol";
interface StartListenerOptions {
@@ -373,7 +377,11 @@ export type WsProtocolEvent =
| RetryMessage
| RecoveryMessage
| ProtocolResultMessage
| QueueLifecycleEvent;
| QueueLifecycleEvent
| TranscriptBackfillMessage
| QueueSnapshotMessage
| SyncCompleteMessage
| TranscriptSupplementMessage;
/**
* Single adapter for all outbound typed protocol events.