feat(listen): add protocol_v2, move ws server to v2 (#1387)

Co-authored-by: Shubham Naik <shub@letta.com>
Co-authored-by: Letta Code <noreply@letta.com>
This commit is contained in:
Charles Packer
2026-03-16 14:46:56 -07:00
committed by GitHub
parent 8ecf39798c
commit 3edaf91ee4
12 changed files with 4215 additions and 2106 deletions

View File

@@ -4,12 +4,16 @@ import type { AgentState } from "@letta-ai/letta-client/resources/agents/agents"
import type { Message } from "@letta-ai/letta-client/resources/agents/messages";
import { getResumeData } from "../../agent/check-approval";
function makeAgent(overrides: Partial<AgentState> = {}): AgentState {
type ResumeAgentState = AgentState & {
in_context_message_ids?: string[] | null;
};
function makeAgent(overrides: Partial<ResumeAgentState> = {}): AgentState {
return {
id: "agent-test",
message_ids: ["msg-last"],
...overrides,
} as AgentState;
} as ResumeAgentState;
}
function makeApprovalMessage(id = "msg-last"): Message {
@@ -74,7 +78,9 @@ describe("getResumeData", () => {
const conversationsList = mock(async () => ({
getPaginatedItems: () => [],
}));
const agentsList = mock(async () => ({ items: [] }));
const agentsList = mock(async () => ({
getPaginatedItems: () => [makeApprovalMessage()],
}));
const messagesRetrieve = mock(async () => [makeApprovalMessage()]);
const client = {
@@ -88,7 +94,10 @@ describe("getResumeData", () => {
const resume = await getResumeData(
client,
makeAgent({ message_ids: ["msg-last"] }),
makeAgent({
message_ids: ["msg-last"],
in_context_message_ids: ["msg-last"],
}),
"default",
{ includeMessageHistory: false },
);
@@ -99,6 +108,60 @@ describe("getResumeData", () => {
expect(resume.messageHistory).toEqual([]);
});
// Checks that resuming the "default" conversation retrieves the message named in
// in_context_message_ids ("msg-live") rather than the one in message_ids ("msg-stale"),
// and that the agent message list endpoint is not consulted in that case.
test("default conversation resume uses in-context ids instead of stale agent.message_ids", async () => {
// Stub for client.agents.messages.list; asserted below to be called zero times.
const agentsList = mock(async () => ({
getPaginatedItems: () => [makeApprovalMessage("msg-default-latest")],
}));
// Stub for client.messages.retrieve; resolves to a single approval message.
const messagesRetrieve = mock(async () => [
makeApprovalMessage("msg-live"),
]);
// Minimal client exposing only the two stubbed endpoints; the double cast
// sidesteps the full Letta client type.
const client = {
agents: { messages: { list: agentsList } },
messages: { retrieve: messagesRetrieve },
} as unknown as Letta;
// message_ids and in_context_message_ids deliberately disagree so the
// assertions can tell which one was used for the lookup.
const resume = await getResumeData(
client,
makeAgent({
message_ids: ["msg-stale"],
in_context_message_ids: ["msg-live"],
}),
"default",
{ includeMessageHistory: false },
);
// Exactly one retrieve, keyed by the in-context id.
expect(messagesRetrieve).toHaveBeenCalledWith("msg-live");
expect(messagesRetrieve).toHaveBeenCalledTimes(1);
expect(agentsList).toHaveBeenCalledTimes(0);
expect(resume.pendingApprovals).toHaveLength(1);
// NOTE(review): "tool-1" is presumably the tool call id that makeApprovalMessage
// builds by default; the helper body is not visible here, so confirm.
expect(resume.pendingApprovals[0]?.toolCallId).toBe("tool-1");
});
// Checks the fallback path: when the agent's in_context_message_ids is empty,
// getResumeData is expected to skip messages.retrieve and call
// agents.messages.list once instead.
test("default conversation falls back to default conversation stream when in-context ids are unavailable", async () => {
// Stub for client.agents.messages.list; its paginated items hold one approval message.
const agentsList = mock(async () => ({
getPaginatedItems: () => [makeApprovalMessage("msg-default-latest")],
}));
// Stub for client.messages.retrieve; asserted below to be called zero times,
// so the user message it would return is never used.
const messagesRetrieve = mock(async () => [makeUserMessage("msg-stale")]);
// Minimal client exposing only the two stubbed endpoints; the double cast
// sidesteps the full Letta client type.
const client = {
agents: { messages: { list: agentsList } },
messages: { retrieve: messagesRetrieve },
} as unknown as Letta;
// Only in_context_message_ids is overridden (to an empty array); every other
// field comes from makeAgent's defaults.
const resume = await getResumeData(
client,
makeAgent({ in_context_message_ids: [] }),
"default",
{ includeMessageHistory: false },
);
expect(messagesRetrieve).toHaveBeenCalledTimes(0);
expect(agentsList).toHaveBeenCalledTimes(1);
expect(resume.pendingApprovals).toHaveLength(1);
// NOTE(review): "tool-1" is presumably the tool call id that makeApprovalMessage
// builds by default; the helper body is not visible here, so confirm.
expect(resume.pendingApprovals[0]?.toolCallId).toBe("tool-1");
});
test("default behavior keeps backfill enabled when options are omitted", async () => {
const conversationsRetrieve = mock(async () => ({
in_context_message_ids: ["msg-last"],
@@ -119,7 +182,11 @@ describe("getResumeData", () => {
messages: { retrieve: messagesRetrieve },
} as unknown as Letta;
const resume = await getResumeData(client, makeAgent(), "default");
const resume = await getResumeData(
client,
makeAgent({ in_context_message_ids: ["msg-last"] }),
"default",
);
expect(messagesRetrieve).toHaveBeenCalledTimes(1);
expect(agentsList).toHaveBeenCalledTimes(1);

View File

@@ -2,7 +2,7 @@
* Tests for the static transcript sync protocol types (LSS1).
*
* Verifies structural correctness, discriminant exhaustiveness, and
* membership in WireMessage / WsProtocolEvent unions.
* membership in the legacy WireMessage union.
*/
import { describe, expect, test } from "bun:test";
@@ -13,7 +13,6 @@ import type {
TranscriptSupplementMessage,
WireMessage,
} from "../../types/protocol";
import type { WsProtocolEvent } from "../../websocket/listen-client";
// ── Helpers ───────────────────────────────────────────────────────
@@ -186,45 +185,6 @@ describe("WireMessage union membership", () => {
});
});
// Assignability checks for the four transcript-sync message shapes against
// WsProtocolEvent. Each test annotates an object literal as WsProtocolEvent, so
// the compiler does the real checking; the runtime expect only confirms the
// `type` discriminant that was just written.
// NOTE(review): the hunk header above (-186,45 +185,6) and the WsProtocolEvent
// import dropped earlier in this file suggest this whole describe block is among
// the lines removed by the commit. Confirm against the raw diff.
describe("WsProtocolEvent union membership", () => {
test("TranscriptBackfillMessage is assignable to WsProtocolEvent", () => {
// ENVELOPE is a shared fixture defined outside this hunk; it is spread into every message below.
const msg: WsProtocolEvent = {
...ENVELOPE,
type: "transcript_backfill",
messages: [],
is_final: true,
};
expect(msg.type).toBe("transcript_backfill");
});
test("QueueSnapshotMessage is assignable to WsProtocolEvent", () => {
const msg: WsProtocolEvent = {
...ENVELOPE,
type: "queue_snapshot",
items: [],
};
expect(msg.type).toBe("queue_snapshot");
});
test("SyncCompleteMessage is assignable to WsProtocolEvent", () => {
const msg: WsProtocolEvent = {
...ENVELOPE,
type: "sync_complete",
had_pending_turn: false,
};
expect(msg.type).toBe("sync_complete");
});
test("TranscriptSupplementMessage is assignable to WsProtocolEvent", () => {
const msg: WsProtocolEvent = {
...ENVELOPE,
type: "transcript_supplement",
messages: [],
};
expect(msg.type).toBe("transcript_supplement");
});
});
// ── Discriminant exhaustiveness ───────────────────────────────────
describe("type discriminants are unique across all four types", () => {

File diff suppressed because it is too large Load Diff

View File

@@ -210,6 +210,8 @@ describe("extractInterruptToolReturns", () => {
test("emitInterruptToolReturnMessage emits deterministic per-tool terminal messages", () => {
const runtime = createRuntime();
const socket = new MockSocket(WebSocket.OPEN) as unknown as WebSocket;
runtime.activeAgentId = "agent-1";
runtime.activeConversationId = "default";
const approvals: ApprovalResult[] = [
{
type: "tool",
@@ -231,32 +233,45 @@ describe("extractInterruptToolReturns", () => {
JSON.parse(raw),
);
const toolReturnFrames = parsed.filter(
(payload) => payload.message_type === "tool_return_message",
(payload) =>
payload.type === "stream_delta" &&
payload.delta?.message_type === "tool_return_message",
);
expect(toolReturnFrames).toHaveLength(2);
expect(toolReturnFrames[0]).toMatchObject({
run_id: "run-1",
delta: {
run_id: "run-1",
tool_returns: [
{ tool_call_id: "call-a", status: "success", tool_return: "704" },
],
},
});
expect(toolReturnFrames[1]).toMatchObject({
delta: {
run_id: "run-1",
tool_returns: [
{
tool_call_id: "call-b",
status: "error",
tool_return: "User interrupted the stream",
},
],
},
});
expect(toolReturnFrames[0].delta).toMatchObject({
tool_returns: [
{ tool_call_id: "call-a", status: "success", tool_return: "704" },
],
});
expect(toolReturnFrames[1]).toMatchObject({
run_id: "run-1",
tool_returns: [
{
tool_call_id: "call-b",
status: "error",
tool_return: "User interrupted the stream",
},
],
});
expect(toolReturnFrames[0]).not.toHaveProperty("tool_call_id");
expect(toolReturnFrames[0]).not.toHaveProperty("status");
expect(toolReturnFrames[0]).not.toHaveProperty("tool_return");
expect(toolReturnFrames[1]).not.toHaveProperty("tool_call_id");
expect(toolReturnFrames[1]).not.toHaveProperty("status");
expect(toolReturnFrames[1]).not.toHaveProperty("tool_return");
expect(toolReturnFrames[0].delta.tool_call_id).toBe("call-a");
expect(toolReturnFrames[0].delta.status).toBe("success");
expect(toolReturnFrames[0].delta.tool_return).toBe("704");
expect(toolReturnFrames[1].delta.tool_call_id).toBe("call-b");
expect(toolReturnFrames[1].delta.status).toBe("error");
expect(toolReturnFrames[1].delta.tool_return).toBe(
"User interrupted the stream",
);
});
});
@@ -791,9 +806,8 @@ describe("stale Path-B IDs: clearing after successful send prevents re-denial",
describe("cancel-induced stop reason reclassification", () => {
/**
* Mirrors the effectiveStopReason computation from the Case 3 stream path.
* Both the legacy (sendClientMessage) and modern (emitToWS) branches now
* use effectiveStopReason — this test verifies the reclassification logic
* that both branches depend on.
* Both the legacy and canonical listener branches use effectiveStopReason.
* This test verifies the reclassification logic those branches depend on.
*/
function computeEffectiveStopReason(
cancelRequested: boolean,

View File

@@ -7,10 +7,9 @@
* - Single message: enqueued → dequeued, no blocked, real queue_len
* - Two rapid synchronous arrivals: second gets blocked(runtime_busy)
* because pendingTurns is incremented before the .then() chain
* - Connection close: queue_cleared("shutdown") emitted once
* - Per-turn error: no queue_cleared — queue continues for remaining turns
* - Connection close: queue clear still happens once in QueueRuntime
* - Per-turn error: no queue clear — queue continues for remaining turns
* - ApprovalCreate payloads (no `content` field) are not enqueued
* - QueueLifecycleEvent is assignable to WsProtocolEvent (type-level)
*/
import { describe, expect, test } from "bun:test";
@@ -23,17 +22,6 @@ import type {
QueueItem,
} from "../../queue/queueRuntime";
import { QueueRuntime } from "../../queue/queueRuntime";
import type { QueueLifecycleEvent } from "../../types/protocol";
import type { WsProtocolEvent } from "../../websocket/listen-client";
// ── Type-level assertion: QueueLifecycleEvent ⊆ WsProtocolEvent ──
// Imports the real WsProtocolEvent from listen-client. If QueueLifecycleEvent
// is ever removed from that union, this assertion fails at compile time.
type _AssertAssignable = QueueLifecycleEvent extends WsProtocolEvent
? true
: never;
const _typeCheck: _AssertAssignable = true;
void _typeCheck; // suppress unused warning
// ── Helpers ───────────────────────────────────────────────────────
@@ -264,7 +252,7 @@ describe("ApprovalCreate payloads", () => {
});
describe("connection close", () => {
test("clear(shutdown) emits queue_cleared exactly once for intentional close", () => {
test("clear(shutdown) reports a single clear callback for intentional close", () => {
const { q, rec } = buildRuntime();
q.clear("shutdown");
expect(rec.cleared).toHaveLength(1);
@@ -302,13 +290,13 @@ describe("per-turn error — no queue_cleared", () => {
// First turn: simulate error — finally still runs
simulateTurnStart(q, turns, arrival1, skipIds);
simulateTurnEnd(q, turns); // error path still hits finally
expect(rec.cleared).toHaveLength(0); // no queue_cleared
expect(rec.cleared).toHaveLength(0); // no queue clear
// Second callback no-ops; first turn already consumed coalesced batch.
simulateTurnStart(q, turns, arrival2, skipIds);
expect(rec.dequeued).toHaveLength(1);
simulateTurnEnd(q, turns);
expect(turns.value).toBe(0);
expect(rec.cleared).toHaveLength(0); // still no queue_cleared
expect(rec.cleared).toHaveLength(0); // still no queue clear
});
});