feat: propagate conversation_id on all WebSocket events (#1328)

Co-authored-by: Letta Code <noreply@letta.com>
Co-authored-by: cpacker <packercharles@gmail.com>
This commit is contained in:
Shubham Naik
2026-03-10 15:28:05 -07:00
committed by GitHub
parent 95c0d8aefa
commit b12744a408
5 changed files with 238 additions and 4 deletions

View File

@@ -16,6 +16,10 @@ type QueueItemBase = {
id: string;
/** Optional client-side message correlation ID from submit payloads. */
clientMessageId?: string;
/** Optional agent scope for listener-mode attribution. */
agentId?: string;
/** Optional conversation scope for listener-mode attribution. */
conversationId?: string;
source: QueueItemSource;
enqueuedAt: number;
};
@@ -55,6 +59,13 @@ export function isCoalescable(kind: QueueItemKind): boolean {
return kind === "message" || kind === "task_notification";
}
function hasSameScope(a: QueueItem, b: QueueItem): boolean {
return (
(a.agentId ?? null) === (b.agentId ?? null) &&
(a.conversationId ?? null) === (b.conversationId ?? null)
);
}
// ── Batch / callbacks ────────────────────────────────────────────
export interface DequeuedBatch {
@@ -77,7 +88,11 @@ export interface QueueCallbacks {
* Only fires when queue is non-empty.
*/
onBlocked?: (reason: QueueBlockedReason, queueLen: number) => void;
onCleared?: (reason: QueueClearedReason, clearedCount: number) => void;
onCleared?: (
reason: QueueClearedReason,
clearedCount: number,
items: QueueItem[],
) => void;
/**
* Fired when an item is dropped.
* queueLen is the post-operation queue depth:
@@ -226,9 +241,12 @@ export class QueueRuntime {
// Drain contiguous coalescable items from head
const batch: QueueItem[] = [];
const first = this.store[0];
while (
first !== undefined &&
this.store.length > 0 &&
isCoalescable(this.store[0]?.kind ?? "approval_result")
isCoalescable(this.store[0]?.kind ?? "approval_result") &&
hasSameScope(first, this.store[0] as QueueItem)
) {
const item = this.store.shift();
if (item) batch.push(item);
@@ -300,10 +318,11 @@ export class QueueRuntime {
/** Remove all items and fire onCleared. */
clear(reason: QueueClearedReason): void {
const count = this.store.length;
const clearedItems = this.store.slice();
this.store.length = 0;
this.lastEmittedBlockedReason = null;
this.blockedEmittedForNonEmpty = false;
this.safeCallback("onCleared", reason, count);
this.safeCallback("onCleared", reason, count, clearedItems);
}
// ── Accessors ──────────────────────────────────────────────────

View File

@@ -12,6 +12,20 @@ function makeMsg(text = "hello"): Omit<MessageQueueItem, "id" | "enqueuedAt"> {
return { kind: "message", source: "user", content: text };
}
function makeScopedMsg(params: {
text?: string;
agentId?: string;
conversationId?: string;
}): Omit<MessageQueueItem, "id" | "enqueuedAt"> {
return {
kind: "message",
source: "user",
content: params.text ?? "hello",
agentId: params.agentId,
conversationId: params.conversationId,
};
}
function makeTask(
text = "<notification/>",
): Omit<
@@ -194,6 +208,32 @@ describe("dequeue coalescable items", () => {
expect(b?.batchId).toMatch(/^batch-\d+$/);
});
test("does not coalesce adjacent items from different conversation scopes", () => {
const q = new QueueRuntime();
q.enqueue(
makeScopedMsg({
text: "a",
agentId: "agent-1",
conversationId: "default",
}),
);
q.enqueue(
makeScopedMsg({
text: "b",
agentId: "agent-2",
conversationId: "default",
}),
);
const first = q.tryDequeue(null);
expect(first?.items).toHaveLength(1);
expect((first?.items[0] as MessageQueueItem).content).toBe("a");
const second = q.tryDequeue(null);
expect(second?.items).toHaveLength(1);
expect((second?.items[0] as MessageQueueItem).content).toBe("b");
});
test("length is 0 after full dequeue", () => {
const q = new QueueRuntime();
q.enqueue(makeMsg());

View File

@@ -6,6 +6,7 @@ import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/mes
import WebSocket from "ws";
import { buildConversationMessagesCreateRequestBody } from "../../agent/message";
import { INTERRUPTED_BY_USER } from "../../constants";
import type { MessageQueueItem } from "../../queue/queueRuntime";
import type { ControlRequest, ControlResponseBody } from "../../types/protocol";
import {
__listenClientTestUtils,
@@ -244,6 +245,84 @@ describe("listen-client requestApprovalOverWS", () => {
});
});
describe("listen-client conversation-scoped protocol events", () => {
test("queue lifecycle events carry agent_id and conversation_id from the queued item", () => {
const runtime = __listenClientTestUtils.createRuntime();
const socket = new MockSocket(WebSocket.OPEN);
runtime.socket = socket as unknown as WebSocket;
const input: Omit<MessageQueueItem, "id" | "enqueuedAt"> = {
kind: "message",
source: "user",
content: "hello",
clientMessageId: "cm-queue-1",
agentId: "agent-default",
conversationId: "default",
};
const item = runtime.queueRuntime.enqueue(input);
expect(item).not.toBeNull();
runtime.queueRuntime.tryDequeue("runtime_busy");
const enqueued = JSON.parse(socket.sentPayloads[0] as string);
expect(enqueued.type).toBe("queue_item_enqueued");
expect(enqueued.agent_id).toBe("agent-default");
expect(enqueued.conversation_id).toBe("default");
const blocked = JSON.parse(socket.sentPayloads[1] as string);
expect(blocked.type).toBe("queue_blocked");
expect(blocked.agent_id).toBe("agent-default");
expect(blocked.conversation_id).toBe("default");
});
test("cancel_ack includes agent_id and conversation_id", () => {
const runtime = __listenClientTestUtils.createRuntime();
const socket = new MockSocket(WebSocket.OPEN);
runtime.activeAgentId = "agent-123";
runtime.activeConversationId = "default";
runtime.activeRunId = "run-123";
__listenClientTestUtils.emitCancelAck(
socket as unknown as WebSocket,
runtime,
{
requestId: "cancel-1",
accepted: true,
},
);
const sent = JSON.parse(socket.sentPayloads[0] as string);
expect(sent.type).toBe("cancel_ack");
expect(sent.agent_id).toBe("agent-123");
expect(sent.conversation_id).toBe("default");
expect(sent.run_id).toBe("run-123");
});
test("queue_batch_dequeued keeps the batch scope", () => {
const runtime = __listenClientTestUtils.createRuntime();
const socket = new MockSocket(WebSocket.OPEN);
runtime.socket = socket as unknown as WebSocket;
const input: Omit<MessageQueueItem, "id" | "enqueuedAt"> = {
kind: "message",
source: "user",
content: "hello",
clientMessageId: "cm-queue-2",
agentId: "agent-xyz",
conversationId: "conv-xyz",
};
runtime.queueRuntime.enqueue(input);
runtime.queueRuntime.tryDequeue(null);
const dequeued = JSON.parse(socket.sentPayloads[1] as string);
expect(dequeued.type).toBe("queue_batch_dequeued");
expect(dequeued.agent_id).toBe("agent-xyz");
expect(dequeued.conversation_id).toBe("conv-xyz");
});
});
describe("listen-client state_response control protocol", () => {
test("always advertises control_response capability", () => {
const runtime = __listenClientTestUtils.createRuntime();

View File

@@ -75,6 +75,10 @@ export interface MessageEnvelope {
uuid: string;
/** Monotonic per-session event sequence. Optional for backward compatibility. */
event_seq?: number;
/** Agent that triggered this event. Used with default conversation scoping. */
agent_id?: string;
/** Conversation that triggered this event. Used for conversation-scoped filtering. */
conversation_id?: string;
}
// ═══════════════════════════════════════════════════════════════
@@ -162,6 +166,8 @@ export type MessageWire = {
type: "message";
session_id: string;
uuid: string;
agent_id?: string;
conversation_id?: string;
} & LettaStreamingResponse;
// ═══════════════════════════════════════════════════════════════
@@ -422,6 +428,10 @@ export interface ControlRequest {
type: "control_request";
request_id: string;
request: ControlRequestBody;
/** Agent that triggered this control request. */
agent_id?: string;
/** Conversation that triggered this control request. */
conversation_id?: string;
}
// SDK → CLI request subtypes

View File

@@ -120,6 +120,8 @@ interface RunStartedMessage {
batch_id: string;
event_seq?: number;
session_id?: string;
agent_id?: string;
conversation_id?: string;
}
interface RunRequestErrorMessage {
@@ -132,6 +134,8 @@ interface RunRequestErrorMessage {
batch_id?: string;
event_seq?: number;
session_id?: string;
agent_id?: string;
conversation_id?: string;
}
interface ModeChangeMessage {
@@ -471,6 +475,35 @@ const MAX_RETRY_DURATION_MS = 5 * 60 * 1000; // 5 minutes
const INITIAL_RETRY_DELAY_MS = 1000; // 1 second
const MAX_RETRY_DELAY_MS = 30000; // 30 seconds
function getQueueItemScope(item?: QueueItem | null): {
agent_id?: string;
conversation_id?: string;
} {
if (!item) {
return {};
}
return {
agent_id: item.agentId,
conversation_id: item.conversationId,
};
}
function getQueueItemsScope(items: QueueItem[]): {
agent_id?: string;
conversation_id?: string;
} {
const first = items[0];
if (!first) {
return {};
}
const sameScope = items.every(
(item) =>
(item.agentId ?? null) === (first.agentId ?? null) &&
(item.conversationId ?? null) === (first.conversationId ?? null),
);
return sameScope ? getQueueItemScope(first) : {};
}
function createRuntime(): ListenerRuntime {
const bootWorkingDirectory = process.env.USER_CWD || process.cwd();
const runtime: ListenerRuntime = {
@@ -523,6 +556,7 @@ function createRuntime(): ListenerRuntime {
queue_len: queueLen,
session_id: runtime.sessionId,
uuid: `q-enq-${item.id}`,
...getQueueItemScope(item),
});
}
},
@@ -536,6 +570,7 @@ function createRuntime(): ListenerRuntime {
queue_len_after: batch.queueLenAfter,
session_id: runtime.sessionId,
uuid: `q-deq-${batch.batchId}`,
...getQueueItemsScope(batch.items),
});
}
},
@@ -547,10 +582,11 @@ function createRuntime(): ListenerRuntime {
queue_len: queueLen,
session_id: runtime.sessionId,
uuid: `q-blk-${crypto.randomUUID()}`,
...getQueueItemScope(runtime.queueRuntime.items[0]),
});
}
},
onCleared: (reason, clearedCount) => {
onCleared: (reason, clearedCount, items) => {
if (runtime.socket?.readyState === WebSocket.OPEN) {
emitToWS(runtime.socket, {
type: "queue_cleared",
@@ -558,6 +594,7 @@ function createRuntime(): ListenerRuntime {
cleared_count: clearedCount,
session_id: runtime.sessionId,
uuid: `q-clr-${crypto.randomUUID()}`,
...getQueueItemsScope(items),
});
}
},
@@ -571,6 +608,7 @@ function createRuntime(): ListenerRuntime {
queue_len: queueLen,
session_id: runtime.sessionId,
uuid: `q-drp-${item.id}`,
...getQueueItemScope(item),
});
}
},
@@ -942,6 +980,8 @@ function emitCancelAck(
accepted: boolean;
reason?: string;
runId?: string | null;
agentId?: string | null;
conversationId?: string | null;
},
): void {
emitToWS(socket, {
@@ -950,6 +990,9 @@ function emitCancelAck(
accepted: params.accepted,
reason: params.reason,
run_id: params.runId ?? runtime.activeRunId,
agent_id: params.agentId ?? runtime.activeAgentId ?? undefined,
conversation_id:
params.conversationId ?? runtime.activeConversationId ?? undefined,
session_id: runtime.sessionId,
uuid: `cancel-ack-${params.requestId}`,
} as CancelAckMessage);
@@ -1441,6 +1484,7 @@ function emitInterruptToolReturnMessage(
id: `message-${crypto.randomUUID()}`,
date: new Date().toISOString(),
run_id: resolvedRunId,
agent_id: runtime.activeAgentId ?? undefined,
tool_returns: [
{
tool_call_id: toolReturn.tool_call_id,
@@ -1452,6 +1496,7 @@ function emitInterruptToolReturnMessage(
],
session_id: runtime.sessionId,
uuid: `${uuidPrefix}-${crypto.randomUUID()}`,
conversation_id: runtime.activeConversationId ?? undefined,
} as unknown as MessageWire);
}
}
@@ -1815,6 +1860,8 @@ async function sendMessageStreamWithRetry(
delay_ms: delayMs,
session_id: runtime.sessionId,
uuid: `retry-${crypto.randomUUID()}`,
agent_id: runtime.activeAgentId ?? undefined,
conversation_id: conversationId,
} as RetryMessage);
await new Promise((resolve) => setTimeout(resolve, delayMs));
@@ -1840,6 +1887,8 @@ async function sendMessageStreamWithRetry(
delay_ms: delayMs,
session_id: runtime.sessionId,
uuid: `retry-${crypto.randomUUID()}`,
agent_id: runtime.activeAgentId ?? undefined,
conversation_id: conversationId,
} as RetryMessage);
await new Promise((resolve) => setTimeout(resolve, delayMs));
@@ -1982,6 +2031,8 @@ async function recoverPendingApprovals(
stop_reason: "error",
session_id: runtime.sessionId,
uuid: `error-${crypto.randomUUID()}`,
agent_id: agentId,
conversation_id: conversationId,
});
runtime.lastStopReason = "requires_approval";
return;
@@ -2031,6 +2082,8 @@ async function recoverPendingApprovals(
: "auto-approved",
session_id: runtime.sessionId,
uuid: `auto-approval-${ac.approval.toolCallId}`,
agent_id: agentId,
conversation_id: conversationId,
} as AutoApprovalMessage);
}
@@ -2072,6 +2125,8 @@ async function recoverPendingApprovals(
blocked_path: null,
...(diffs.length > 0 ? { diffs } : {}),
},
agent_id: agentId,
conversation_id: conversationId,
};
const responseBody = await requestApprovalOverWS(
@@ -2105,6 +2160,8 @@ async function recoverPendingApprovals(
matched_rule: "canUseTool callback",
session_id: runtime.sessionId,
uuid: `auto-approval-${ac.approval.toolCallId}`,
agent_id: agentId,
conversation_id: conversationId,
} as AutoApprovalMessage);
} else {
decisions.push({
@@ -2496,6 +2553,8 @@ async function connectWithRetry(
stop_reason: "error",
session_id: runtime.sessionId,
uuid: `error-${crypto.randomUUID()}`,
agent_id: runtime.activeAgentId ?? undefined,
conversation_id: runtime.activeConversationId ?? undefined,
});
} finally {
runtime.pendingTurns--;
@@ -2529,6 +2588,8 @@ async function connectWithRetry(
stop_reason: "error",
session_id: runtime.sessionId,
uuid: `error-${crypto.randomUUID()}`,
agent_id: runtime.activeAgentId ?? undefined,
conversation_id: runtime.activeConversationId ?? undefined,
});
return;
}
@@ -2548,6 +2609,8 @@ async function connectWithRetry(
content: userPayload.content,
clientMessageId:
userPayload.client_message_id ?? `cm-submit-${crypto.randomUUID()}`,
agentId: parsed.agentId ?? undefined,
conversationId: parsed.conversationId || "default",
} as Parameters<typeof runtime.queueRuntime.enqueue>[0]);
enqueuedQueueItemId = enqueuedItem?.id ?? null;
// Emit blocked on state transition when turns are already queued.
@@ -2850,6 +2913,8 @@ async function handleIncomingMessage(
type: "run_started",
runId: maybeRunId,
batch_id: dequeuedBatchId,
agent_id: agentId,
conversation_id: conversationId,
});
}
}
@@ -2864,6 +2929,8 @@ async function handleIncomingMessage(
run_id: runId || errorInfo.run_id,
session_id: runtime.sessionId,
uuid: `error-${crypto.randomUUID()}`,
agent_id: agentId,
conversation_id: conversationId,
});
}
@@ -2883,6 +2950,8 @@ async function handleIncomingMessage(
session_id: runtime.sessionId,
uuid:
chunkWithIds.otid || chunkWithIds.id || crypto.randomUUID(),
agent_id: agentId,
conversation_id: conversationId,
} as unknown as MessageWire);
}
}
@@ -2954,6 +3023,8 @@ async function handleIncomingMessage(
run_id: runId || msgRunIds[msgRunIds.length - 1] || undefined,
session_id: runtime.sessionId,
uuid: `recovery-${crypto.randomUUID()}`,
agent_id: agentId,
conversation_id: conversationId,
} as RecoveryMessage);
try {
@@ -3029,6 +3100,8 @@ async function handleIncomingMessage(
run_id: runId,
session_id: runtime.sessionId,
uuid: `error-${crypto.randomUUID()}`,
agent_id: agentId,
conversation_id: conversationId,
});
emitTurnResult(socket, runtime, {
subtype: "error",
@@ -3055,6 +3128,8 @@ async function handleIncomingMessage(
stop_reason: "error",
session_id: runtime.sessionId,
uuid: `error-${crypto.randomUUID()}`,
agent_id: agentId,
conversation_id: conversationId,
});
emitTurnResult(socket, runtime, {
subtype: "error",
@@ -3126,6 +3201,8 @@ async function handleIncomingMessage(
: "auto-approved",
session_id: runtime.sessionId,
uuid: `auto-approval-${ac.approval.toolCallId}`,
agent_id: agentId,
conversation_id: conversationId,
} as AutoApprovalMessage);
}
@@ -3166,6 +3243,8 @@ async function handleIncomingMessage(
blocked_path: null,
...(diffs.length > 0 ? { diffs } : {}),
},
agent_id: agentId,
conversation_id: conversationId,
};
const responseBody = await requestApprovalOverWS(
@@ -3200,6 +3279,8 @@ async function handleIncomingMessage(
matched_rule: "canUseTool callback",
session_id: runtime.sessionId,
uuid: `auto-approval-${ac.approval.toolCallId}`,
agent_id: agentId,
conversation_id: conversationId,
} as AutoApprovalMessage);
} else {
decisions.push({
@@ -3353,6 +3434,8 @@ async function handleIncomingMessage(
type: "run_request_error",
error: errorPayload,
batch_id: dequeuedBatchId,
agent_id: agentId,
conversation_id: conversationId,
});
}
@@ -3363,6 +3446,8 @@ async function handleIncomingMessage(
stop_reason: "error",
session_id: runtime.sessionId,
uuid: `error-${crypto.randomUUID()}`,
agent_id: agentId || undefined,
conversation_id: conversationId,
});
emitTurnResult(socket, runtime, {
subtype: "error",
@@ -3410,6 +3495,7 @@ export const __listenClientTestUtils = {
buildStateResponse,
handleCwdChange,
emitToWS,
emitCancelAck,
getConversationWorkingDirectory,
rememberPendingApprovalBatchIds,
resolvePendingApprovalBatchId,