feat: ws emissions parity (#1158)

Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
Charles Packer
2026-02-25 21:52:44 -08:00
committed by GitHub
parent 2c243151c6
commit 6ddd54592b
2 changed files with 458 additions and 43 deletions

View File

@@ -376,3 +376,139 @@ describe("listen-client capability-gated approval flow", () => {
rejectPendingApprovalResolvers(runtime, "test cleanup");
});
});
describe("listen-client emitToWS adapter", () => {
test("sends event when socket is OPEN", () => {
const socket = new MockSocket(WebSocket.OPEN);
const event = {
type: "error" as const,
message: "test error",
stop_reason: "error" as const,
session_id: "listen-test",
uuid: "test-uuid",
};
__listenClientTestUtils.emitToWS(socket as unknown as WebSocket, event);
expect(socket.sentPayloads).toHaveLength(1);
const sent = JSON.parse(socket.sentPayloads[0] as string);
expect(sent.type).toBe("error");
expect(sent.message).toBe("test error");
expect(sent.session_id).toBe("listen-test");
});
test("does not send when socket is CLOSED", () => {
const socket = new MockSocket(WebSocket.CLOSED);
const event = {
type: "error" as const,
message: "test error",
stop_reason: "error" as const,
session_id: "listen-test",
uuid: "test-uuid",
};
__listenClientTestUtils.emitToWS(socket as unknown as WebSocket, event);
expect(socket.sentPayloads).toHaveLength(0);
});
test("emits RecoveryMessage with recovery_type", () => {
const socket = new MockSocket(WebSocket.OPEN);
const event: Parameters<typeof __listenClientTestUtils.emitToWS>[1] = {
type: "recovery",
recovery_type: "approval_pending",
message: "Detected pending approval conflict",
session_id: "listen-abc",
uuid: "recovery-123",
};
__listenClientTestUtils.emitToWS(socket as unknown as WebSocket, event);
expect(socket.sentPayloads).toHaveLength(1);
const sent = JSON.parse(socket.sentPayloads[0] as string);
expect(sent.type).toBe("recovery");
expect(sent.recovery_type).toBe("approval_pending");
expect(sent.session_id).toBe("listen-abc");
});
test("emits AutoApprovalMessage with tool_call shape", () => {
const socket = new MockSocket(WebSocket.OPEN);
const event = {
type: "auto_approval" as const,
tool_call: {
name: "Write",
tool_call_id: "call-123",
arguments: '{"file_path": "/test.ts"}',
},
reason: "auto-approved",
matched_rule: "auto-approved",
session_id: "listen-test",
uuid: "auto-approval-call-123",
};
__listenClientTestUtils.emitToWS(socket as unknown as WebSocket, event);
expect(socket.sentPayloads).toHaveLength(1);
const sent = JSON.parse(socket.sentPayloads[0] as string);
expect(sent.type).toBe("auto_approval");
expect(sent.tool_call.name).toBe("Write");
expect(sent.tool_call.tool_call_id).toBe("call-123");
});
test("emits RetryMessage with attempt/delay details", () => {
const socket = new MockSocket(WebSocket.OPEN);
const event = {
type: "retry" as const,
reason: "llm_api_error" as const,
attempt: 1,
max_attempts: 3,
delay_ms: 1000,
session_id: "listen-test",
uuid: "retry-123",
};
__listenClientTestUtils.emitToWS(socket as unknown as WebSocket, event);
expect(socket.sentPayloads).toHaveLength(1);
const sent = JSON.parse(socket.sentPayloads[0] as string);
expect(sent.type).toBe("retry");
expect(sent.attempt).toBe(1);
expect(sent.max_attempts).toBe(3);
expect(sent.delay_ms).toBe(1000);
});
test("emits rich ResultMessage with full metadata", () => {
const socket = new MockSocket(WebSocket.OPEN);
const event = {
type: "result" as const,
subtype: "success" as const,
agent_id: "agent-123",
conversation_id: "conv-456",
duration_ms: 1500,
duration_api_ms: 0,
num_turns: 2,
result: null,
run_ids: ["run-1", "run-2"],
usage: null,
session_id: "listen-test",
uuid: "result-123",
};
__listenClientTestUtils.emitToWS(socket as unknown as WebSocket, event);
expect(socket.sentPayloads).toHaveLength(1);
const sent = JSON.parse(socket.sentPayloads[0] as string);
expect(sent.type).toBe("result");
expect(sent.subtype).toBe("success");
expect(sent.agent_id).toBe("agent-123");
expect(sent.num_turns).toBe(2);
expect(sent.run_ids).toEqual(["run-1", "run-2"]);
});
test("runtime sessionId is stable and uses listen- prefix", () => {
const runtime = __listenClientTestUtils.createRuntime();
expect(runtime.sessionId).toMatch(/^listen-/);
// Verify it's a UUID format after the prefix
expect(runtime.sessionId.length).toBeGreaterThan(10);
});
});

View File

@@ -3,6 +3,7 @@
* Connects to Letta Cloud and receives messages to execute locally
*/
import { APIError } from "@letta-ai/letta-client/core/error";
import type { Stream } from "@letta-ai/letta-client/core/streaming";
import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents";
import type {
@@ -18,7 +19,12 @@ import {
} from "../agent/approval-execution";
import { getResumeData } from "../agent/check-approval";
import { getClient } from "../agent/client";
import { sendMessageStream } from "../agent/message";
import { getStreamToolContextId, sendMessageStream } from "../agent/message";
import {
extractConflictDetail,
getPreStreamErrorAction,
parseRetryAfterHeaderMs,
} from "../agent/turn-recovery-policy";
import { createBuffers } from "../cli/helpers/accumulator";
import { classifyApprovals } from "../cli/helpers/approvalClassification";
import { generatePlanFilePath } from "../cli/helpers/planName";
@@ -29,9 +35,16 @@ import { settingsManager } from "../settings-manager";
import { isInteractiveApprovalTool } from "../tools/interactivePolicy";
import { loadTools } from "../tools/manager";
import type {
AutoApprovalMessage,
CanUseToolResponse,
ControlRequest,
ControlResponseBody,
ErrorMessage,
MessageWire,
ResultMessage as ProtocolResultMessage,
RecoveryMessage,
RetryMessage,
StopReasonType,
} from "../types/protocol";
interface StartListenerOptions {
@@ -125,6 +138,8 @@ type ListenerRuntime = {
pendingApprovalResolvers: Map<string, PendingApprovalResolver>;
/** Latched once supportsControlResponse is seen on any message. */
controlResponseCapable: boolean;
/** Stable session ID for MessageEnvelope-based emissions (scoped to runtime lifecycle). */
sessionId: string;
};
type ApprovalSlot =
@@ -186,6 +201,7 @@ function createRuntime(): ListenerRuntime {
messageQueue: Promise.resolve(),
pendingApprovalResolvers: new Map(),
controlResponseCapable: false,
sessionId: `listen-${crypto.randomUUID()}`,
};
}
@@ -285,6 +301,119 @@ function sendControlMessageOverWebSocket(
socket.send(JSON.stringify(payload));
}
// ── Typed protocol event adapter ────────────────────────────────
type WsProtocolEvent =
| MessageWire
| AutoApprovalMessage
| ErrorMessage
| RetryMessage
| RecoveryMessage
| ProtocolResultMessage;
/**
* Single adapter for all outbound typed protocol events.
* Passthrough for now — provides a seam for future filtering/versioning/redacting.
*/
function emitToWS(socket: WebSocket, event: WsProtocolEvent): void {
if (socket.readyState === WebSocket.OPEN) {
socket.send(JSON.stringify(event));
}
}
const LLM_API_ERROR_MAX_RETRIES = 3;
/**
* Wrap sendMessageStream with pre-stream error handling (retry/recovery).
* Mirrors headless bidirectional mode's pre-stream error handling.
*/
async function sendMessageStreamWithRetry(
conversationId: string,
messages: Parameters<typeof sendMessageStream>[1],
opts: Parameters<typeof sendMessageStream>[2],
socket: WebSocket,
runtime: ListenerRuntime,
): Promise<Awaited<ReturnType<typeof sendMessageStream>>> {
let transientRetries = 0;
let conversationBusyRetries = 0;
const MAX_CONVERSATION_BUSY_RETRIES = 1;
// eslint-disable-next-line no-constant-condition
while (true) {
try {
return await sendMessageStream(conversationId, messages, opts);
} catch (preStreamError) {
const errorDetail = extractConflictDetail(preStreamError);
const action = getPreStreamErrorAction(
errorDetail,
conversationBusyRetries,
MAX_CONVERSATION_BUSY_RETRIES,
{
status:
preStreamError instanceof APIError
? preStreamError.status
: undefined,
transientRetries,
maxTransientRetries: LLM_API_ERROR_MAX_RETRIES,
},
);
if (action === "resolve_approval_pending") {
// Listener can't auto-resolve pending approvals like headless does.
// Rethrow — the cloud will resend with the approval.
throw preStreamError;
}
if (action === "retry_transient") {
const attempt = transientRetries + 1;
const retryAfterMs =
preStreamError instanceof APIError
? parseRetryAfterHeaderMs(
preStreamError.headers?.get("retry-after"),
)
: null;
const delayMs = retryAfterMs ?? 1000 * 2 ** (attempt - 1);
transientRetries = attempt;
emitToWS(socket, {
type: "retry",
reason: "llm_api_error",
attempt,
max_attempts: LLM_API_ERROR_MAX_RETRIES,
delay_ms: delayMs,
session_id: runtime.sessionId,
uuid: `retry-${crypto.randomUUID()}`,
} as RetryMessage);
await new Promise((resolve) => setTimeout(resolve, delayMs));
continue;
}
if (action === "retry_conversation_busy") {
const attempt = conversationBusyRetries + 1;
const delayMs = 2500;
conversationBusyRetries = attempt;
emitToWS(socket, {
type: "retry",
reason: "error",
attempt,
max_attempts: MAX_CONVERSATION_BUSY_RETRIES,
delay_ms: delayMs,
session_id: runtime.sessionId,
uuid: `retry-${crypto.randomUUID()}`,
} as RetryMessage);
await new Promise((resolve) => setTimeout(resolve, delayMs));
continue;
}
// rethrow unrecoverable errors
throw preStreamError;
}
}
}
export function resolvePendingApprovalResolver(
runtime: ListenerRuntime,
response: ControlResponseBody,
@@ -623,22 +752,20 @@ async function handleIncomingMessage(
) => void,
connectionId?: string,
): Promise<void> {
// Hoist identifiers and tracking state so they're available in catch for error-result
const agentId = msg.agentId;
const requestedConversationId = msg.conversationId || undefined;
const conversationId = requestedConversationId ?? "default";
const msgStartTime = performance.now();
let msgTurnCount = 0;
const msgRunIds: string[] = [];
try {
// Latch capability: once seen, always use blocking path (strict check to avoid truthy strings)
if (msg.supportsControlResponse === true) {
runtime.controlResponseCapable = true;
}
const agentId = msg.agentId;
// requestedConversationId can be:
// - undefined: no conversation (use agent endpoint)
// - null: no conversation (use agent endpoint)
// - string: specific conversation ID (use conversations endpoint)
const requestedConversationId = msg.conversationId || undefined;
// For sendMessageStream: "default" means use agent endpoint, else use conversations endpoint
const conversationId = requestedConversationId ?? "default";
if (!agentId) {
return;
}
@@ -654,6 +781,7 @@ async function handleIncomingMessage(
}
let messagesToSend: Array<MessageCreate | ApprovalCreate> = msg.messages;
let turnToolContextId: string | null = null;
const firstMessage = msg.messages[0];
const isApprovalMessage =
@@ -683,7 +811,11 @@ async function handleIncomingMessage(
resumeData.pendingApprovals,
);
const decisionResults =
decisions.length > 0 ? await executeApprovalBatch(decisions) : [];
decisions.length > 0
? await executeApprovalBatch(decisions, undefined, {
toolContextId: turnToolContextId ?? undefined,
})
: [];
const rebuiltApprovals: ApprovalResult[] = [];
let decisionResultIndex = 0;
@@ -717,33 +849,72 @@ async function handleIncomingMessage(
];
}
let stream = await sendMessageStream(conversationId, messagesToSend, {
agentId,
streamTokens: true,
background: true,
});
let stream = await sendMessageStreamWithRetry(
conversationId,
messagesToSend,
{ agentId, streamTokens: true, background: true },
socket,
runtime,
);
turnToolContextId = getStreamToolContextId(
stream as Stream<LettaStreamingResponse>,
);
let runIdSent = false;
let runId: string | undefined;
const buffers = createBuffers(agentId);
// Approval loop: continue until end_turn or error
// eslint-disable-next-line no-constant-condition
while (true) {
msgTurnCount++;
runIdSent = false;
const result = await drainStreamWithResume(
stream as Stream<LettaStreamingResponse>,
buffers,
() => {},
undefined,
undefined,
({ chunk }) => {
({ chunk, shouldOutput, errorInfo }) => {
const maybeRunId = (chunk as { run_id?: unknown }).run_id;
if (!runIdSent && typeof maybeRunId === "string") {
runIdSent = true;
sendClientMessage(socket, {
type: "run_started",
runId: maybeRunId,
if (typeof maybeRunId === "string") {
runId = maybeRunId;
if (!runIdSent) {
runIdSent = true;
msgRunIds.push(maybeRunId);
sendClientMessage(socket, {
type: "run_started",
runId: maybeRunId,
});
}
}
// Emit in-stream errors
if (errorInfo) {
emitToWS(socket, {
type: "error",
message: errorInfo.message || "Stream error",
stop_reason: (errorInfo.error_type as StopReasonType) || "error",
run_id: runId || errorInfo.run_id,
session_id: runtime.sessionId,
uuid: `error-${crypto.randomUUID()}`,
});
}
// Emit chunk as MessageWire for protocol consumers
if (shouldOutput) {
const chunkWithIds = chunk as typeof chunk & {
otid?: string;
id?: string;
};
emitToWS(socket, {
...chunk,
type: "message",
session_id: runtime.sessionId,
uuid: chunkWithIds.otid || chunkWithIds.id || crypto.randomUUID(),
} as MessageWire);
}
return undefined;
},
);
@@ -753,21 +924,64 @@ async function handleIncomingMessage(
// Case 1: Turn ended normally
if (stopReason === "end_turn") {
sendClientMessage(socket, {
type: "result",
success: true,
stopReason: "end_turn",
});
if (runtime.controlResponseCapable) {
emitToWS(socket, {
type: "result",
subtype: "success",
agent_id: agentId,
conversation_id: conversationId,
duration_ms: performance.now() - msgStartTime,
duration_api_ms: 0,
num_turns: msgTurnCount,
result: null,
run_ids: msgRunIds,
usage: null,
session_id: runtime.sessionId,
uuid: `result-${crypto.randomUUID()}`,
});
} else {
sendClientMessage(socket, {
type: "result",
success: true,
stopReason: "end_turn",
});
}
break;
}
// Case 2: Error or cancelled
if (stopReason !== "requires_approval") {
sendClientMessage(socket, {
type: "result",
success: false,
stopReason,
emitToWS(socket, {
type: "error",
message: `Unexpected stop reason: ${stopReason}`,
stop_reason: (stopReason as StopReasonType) || "error",
run_id: runId,
session_id: runtime.sessionId,
uuid: `error-${crypto.randomUUID()}`,
});
if (runtime.controlResponseCapable) {
emitToWS(socket, {
type: "result",
subtype: "error",
agent_id: agentId,
conversation_id: conversationId,
duration_ms: performance.now() - msgStartTime,
duration_api_ms: 0,
num_turns: msgTurnCount,
result: null,
run_ids: msgRunIds,
usage: null,
stop_reason: (stopReason as StopReasonType) || "error",
session_id: runtime.sessionId,
uuid: `result-${crypto.randomUUID()}`,
});
} else {
sendClientMessage(socket, {
type: "result",
success: false,
stopReason,
});
}
break;
}
@@ -812,6 +1026,25 @@ async function handleIncomingMessage(
reason: string;
};
// Emit auto-approval events for auto-allowed tools
for (const ac of autoAllowed) {
emitToWS(socket, {
type: "auto_approval",
tool_call: {
name: ac.approval.toolName,
tool_call_id: ac.approval.toolCallId,
arguments: ac.approval.toolArgs,
},
reason: ac.permission.reason || "auto-approved",
matched_rule:
"matchedRule" in ac.permission && ac.permission.matchedRule
? ac.permission.matchedRule
: "auto-approved",
session_id: runtime.sessionId,
uuid: `auto-approval-${ac.approval.toolCallId}`,
} as AutoApprovalMessage);
}
const decisions: Decision[] = [
...autoAllowed.map((ac) => ({
type: "approve" as const,
@@ -877,6 +1110,20 @@ async function handleIncomingMessage(
}
: ac.approval;
decisions.push({ type: "approve", approval: finalApproval });
// Emit auto-approval event for WS-callback-approved tool
emitToWS(socket, {
type: "auto_approval",
tool_call: {
name: finalApproval.toolName,
tool_call_id: finalApproval.toolCallId,
arguments: finalApproval.toolArgs,
},
reason: "Approved via WebSocket",
matched_rule: "canUseTool callback",
session_id: runtime.sessionId,
uuid: `auto-approval-${ac.approval.toolCallId}`,
} as AutoApprovalMessage);
} else {
decisions.push({
type: "deny",
@@ -898,10 +1145,14 @@ async function handleIncomingMessage(
}
// Execute approved/denied tools
const executionResults = await executeApprovalBatch(decisions);
const executionResults = await executeApprovalBatch(
decisions,
undefined,
{ toolContextId: turnToolContextId ?? undefined },
);
// Create fresh approval stream for next iteration
stream = await sendMessageStream(
stream = await sendMessageStreamWithRetry(
conversationId,
[
{
@@ -909,19 +1160,46 @@ async function handleIncomingMessage(
approvals: executionResults,
},
],
{
agentId,
streamTokens: true,
background: true,
},
{ agentId, streamTokens: true, background: true },
socket,
runtime,
);
turnToolContextId = getStreamToolContextId(
stream as Stream<LettaStreamingResponse>,
);
}
} catch (error) {
sendClientMessage(socket, {
type: "result",
success: false,
stopReason: "error",
const errorMessage = error instanceof Error ? error.message : String(error);
emitToWS(socket, {
type: "error",
message: errorMessage,
stop_reason: "error",
session_id: runtime.sessionId,
uuid: `error-${crypto.randomUUID()}`,
});
if (runtime.controlResponseCapable) {
emitToWS(socket, {
type: "result",
subtype: "error",
agent_id: agentId || "",
conversation_id: conversationId,
duration_ms: performance.now() - msgStartTime,
duration_api_ms: 0,
num_turns: msgTurnCount,
result: null,
run_ids: msgRunIds,
usage: null,
stop_reason: "error",
session_id: runtime.sessionId,
uuid: `result-${crypto.randomUUID()}`,
});
} else {
sendClientMessage(socket, {
type: "result",
success: false,
stopReason: "error",
});
}
if (process.env.DEBUG) {
console.error("[Listen] Error handling message:", error);
@@ -952,4 +1230,5 @@ export function stopListenerClient(): void {
export const __listenClientTestUtils = {
createRuntime,
stopRuntime,
emitToWS,
};