From 6ddd54592b593634108357dc4b72e29639cf6054 Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Wed, 25 Feb 2026 21:52:44 -0800 Subject: [PATCH] feat: ws emissions parity (#1158) Co-authored-by: Letta --- .../websocket/listen-client-protocol.test.ts | 136 +++++++ src/websocket/listen-client.ts | 365 +++++++++++++++--- 2 files changed, 458 insertions(+), 43 deletions(-) diff --git a/src/tests/websocket/listen-client-protocol.test.ts b/src/tests/websocket/listen-client-protocol.test.ts index 74a7842..6114c98 100644 --- a/src/tests/websocket/listen-client-protocol.test.ts +++ b/src/tests/websocket/listen-client-protocol.test.ts @@ -376,3 +376,139 @@ describe("listen-client capability-gated approval flow", () => { rejectPendingApprovalResolvers(runtime, "test cleanup"); }); }); + +describe("listen-client emitToWS adapter", () => { + test("sends event when socket is OPEN", () => { + const socket = new MockSocket(WebSocket.OPEN); + const event = { + type: "error" as const, + message: "test error", + stop_reason: "error" as const, + session_id: "listen-test", + uuid: "test-uuid", + }; + + __listenClientTestUtils.emitToWS(socket as unknown as WebSocket, event); + + expect(socket.sentPayloads).toHaveLength(1); + const sent = JSON.parse(socket.sentPayloads[0] as string); + expect(sent.type).toBe("error"); + expect(sent.message).toBe("test error"); + expect(sent.session_id).toBe("listen-test"); + }); + + test("does not send when socket is CLOSED", () => { + const socket = new MockSocket(WebSocket.CLOSED); + const event = { + type: "error" as const, + message: "test error", + stop_reason: "error" as const, + session_id: "listen-test", + uuid: "test-uuid", + }; + + __listenClientTestUtils.emitToWS(socket as unknown as WebSocket, event); + + expect(socket.sentPayloads).toHaveLength(0); + }); + + test("emits RecoveryMessage with recovery_type", () => { + const socket = new MockSocket(WebSocket.OPEN); + const event: Parameters[1] = { + type: "recovery", + recovery_type: "approval_pending", + message: "Detected pending approval conflict", + session_id: "listen-abc", + uuid: "recovery-123", + }; + + __listenClientTestUtils.emitToWS(socket as unknown as WebSocket, event); + + expect(socket.sentPayloads).toHaveLength(1); + const sent = JSON.parse(socket.sentPayloads[0] as string); + expect(sent.type).toBe("recovery"); + expect(sent.recovery_type).toBe("approval_pending"); + expect(sent.session_id).toBe("listen-abc"); + }); + + test("emits AutoApprovalMessage with tool_call shape", () => { + const socket = new MockSocket(WebSocket.OPEN); + const event = { + type: "auto_approval" as const, + tool_call: { + name: "Write", + tool_call_id: "call-123", + arguments: '{"file_path": "/test.ts"}', + }, + reason: "auto-approved", + matched_rule: "auto-approved", + session_id: "listen-test", + uuid: "auto-approval-call-123", + }; + + __listenClientTestUtils.emitToWS(socket as unknown as WebSocket, event); + + expect(socket.sentPayloads).toHaveLength(1); + const sent = JSON.parse(socket.sentPayloads[0] as string); + expect(sent.type).toBe("auto_approval"); + expect(sent.tool_call.name).toBe("Write"); + expect(sent.tool_call.tool_call_id).toBe("call-123"); + }); + + test("emits RetryMessage with attempt/delay details", () => { + const socket = new MockSocket(WebSocket.OPEN); + const event = { + type: "retry" as const, + reason: "llm_api_error" as const, + attempt: 1, + max_attempts: 3, + delay_ms: 1000, + session_id: "listen-test", + uuid: "retry-123", + }; + + __listenClientTestUtils.emitToWS(socket as unknown as WebSocket, event); + + expect(socket.sentPayloads).toHaveLength(1); + const sent = JSON.parse(socket.sentPayloads[0] as string); + expect(sent.type).toBe("retry"); + expect(sent.attempt).toBe(1); + expect(sent.max_attempts).toBe(3); + expect(sent.delay_ms).toBe(1000); + }); + + test("emits rich ResultMessage with full metadata", () => { + const socket = new MockSocket(WebSocket.OPEN); + const event = { + type: "result" as const, + subtype: "success" as const, + agent_id: "agent-123", + conversation_id: "conv-456", + duration_ms: 1500, + duration_api_ms: 0, + num_turns: 2, + result: null, + run_ids: ["run-1", "run-2"], + usage: null, + session_id: "listen-test", + uuid: "result-123", + }; + + __listenClientTestUtils.emitToWS(socket as unknown as WebSocket, event); + + expect(socket.sentPayloads).toHaveLength(1); + const sent = JSON.parse(socket.sentPayloads[0] as string); + expect(sent.type).toBe("result"); + expect(sent.subtype).toBe("success"); + expect(sent.agent_id).toBe("agent-123"); + expect(sent.num_turns).toBe(2); + expect(sent.run_ids).toEqual(["run-1", "run-2"]); + }); + + test("runtime sessionId is stable and uses listen- prefix", () => { + const runtime = __listenClientTestUtils.createRuntime(); + expect(runtime.sessionId).toMatch(/^listen-/); + // Verify it's a UUID format after the prefix + expect(runtime.sessionId.length).toBeGreaterThan(10); + }); +}); diff --git a/src/websocket/listen-client.ts b/src/websocket/listen-client.ts index 84842a3..fb90034 100644 --- a/src/websocket/listen-client.ts +++ b/src/websocket/listen-client.ts @@ -3,6 +3,7 @@ * Connects to Letta Cloud and receives messages to execute locally */ +import { APIError } from "@letta-ai/letta-client/core/error"; import type { Stream } from "@letta-ai/letta-client/core/streaming"; import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents"; import type { @@ -18,7 +19,12 @@ import { } from "../agent/approval-execution"; import { getResumeData } from "../agent/check-approval"; import { getClient } from "../agent/client"; -import { sendMessageStream } from "../agent/message"; +import { getStreamToolContextId, sendMessageStream } from "../agent/message"; +import { + extractConflictDetail, + getPreStreamErrorAction, + parseRetryAfterHeaderMs, +} from "../agent/turn-recovery-policy"; import { createBuffers } from "../cli/helpers/accumulator"; import { classifyApprovals } from "../cli/helpers/approvalClassification"; import { generatePlanFilePath } from "../cli/helpers/planName"; @@ -29,9 +35,16 @@ import { settingsManager } from "../settings-manager"; import { isInteractiveApprovalTool } from "../tools/interactivePolicy"; import { loadTools } from "../tools/manager"; import type { + AutoApprovalMessage, CanUseToolResponse, ControlRequest, ControlResponseBody, + ErrorMessage, + MessageWire, + ResultMessage as ProtocolResultMessage, + RecoveryMessage, + RetryMessage, + StopReasonType, } from "../types/protocol"; interface StartListenerOptions { @@ -125,6 +138,8 @@ type ListenerRuntime = { pendingApprovalResolvers: Map; /** Latched once supportsControlResponse is seen on any message. */ controlResponseCapable: boolean; + /** Stable session ID for MessageEnvelope-based emissions (scoped to runtime lifecycle). */ + sessionId: string; }; type ApprovalSlot = @@ -186,6 +201,7 @@ function createRuntime(): ListenerRuntime { messageQueue: Promise.resolve(), pendingApprovalResolvers: new Map(), controlResponseCapable: false, + sessionId: `listen-${crypto.randomUUID()}`, }; } @@ -285,6 +301,119 @@ function sendControlMessageOverWebSocket( socket.send(JSON.stringify(payload)); } +// ── Typed protocol event adapter ──────────────────────────────── + +type WsProtocolEvent = + | MessageWire + | AutoApprovalMessage + | ErrorMessage + | RetryMessage + | RecoveryMessage + | ProtocolResultMessage; + +/** + * Single adapter for all outbound typed protocol events. + * Passthrough for now — provides a seam for future filtering/versioning/redacting. + */ +function emitToWS(socket: WebSocket, event: WsProtocolEvent): void { + if (socket.readyState === WebSocket.OPEN) { + socket.send(JSON.stringify(event)); + } +} + +const LLM_API_ERROR_MAX_RETRIES = 3; + +/** + * Wrap sendMessageStream with pre-stream error handling (retry/recovery). + * Mirrors headless bidirectional mode's pre-stream error handling. + */ +async function sendMessageStreamWithRetry( + conversationId: string, + messages: Parameters[1], + opts: Parameters[2], + socket: WebSocket, + runtime: ListenerRuntime, +): Promise>> { + let transientRetries = 0; + let conversationBusyRetries = 0; + const MAX_CONVERSATION_BUSY_RETRIES = 1; + + // eslint-disable-next-line no-constant-condition + while (true) { + try { + return await sendMessageStream(conversationId, messages, opts); + } catch (preStreamError) { + const errorDetail = extractConflictDetail(preStreamError); + const action = getPreStreamErrorAction( + errorDetail, + conversationBusyRetries, + MAX_CONVERSATION_BUSY_RETRIES, + { + status: + preStreamError instanceof APIError + ? preStreamError.status + : undefined, + transientRetries, + maxTransientRetries: LLM_API_ERROR_MAX_RETRIES, + }, + ); + + if (action === "resolve_approval_pending") { + // Listener can't auto-resolve pending approvals like headless does. + // Rethrow — the cloud will resend with the approval. + throw preStreamError; + } + + if (action === "retry_transient") { + const attempt = transientRetries + 1; + const retryAfterMs = + preStreamError instanceof APIError + ? parseRetryAfterHeaderMs( + preStreamError.headers?.get("retry-after"), + ) + : null; + const delayMs = retryAfterMs ?? 1000 * 2 ** (attempt - 1); + transientRetries = attempt; + + emitToWS(socket, { + type: "retry", + reason: "llm_api_error", + attempt, + max_attempts: LLM_API_ERROR_MAX_RETRIES, + delay_ms: delayMs, + session_id: runtime.sessionId, + uuid: `retry-${crypto.randomUUID()}`, + } as RetryMessage); + + await new Promise((resolve) => setTimeout(resolve, delayMs)); + continue; + } + + if (action === "retry_conversation_busy") { + const attempt = conversationBusyRetries + 1; + const delayMs = 2500; + conversationBusyRetries = attempt; + + emitToWS(socket, { + type: "retry", + reason: "error", + attempt, + max_attempts: MAX_CONVERSATION_BUSY_RETRIES, + delay_ms: delayMs, + session_id: runtime.sessionId, + uuid: `retry-${crypto.randomUUID()}`, + } as RetryMessage); + + await new Promise((resolve) => setTimeout(resolve, delayMs)); + continue; + } + + // rethrow unrecoverable errors + throw preStreamError; + } + } +} + export function resolvePendingApprovalResolver( runtime: ListenerRuntime, response: ControlResponseBody, @@ -623,22 +752,20 @@ async function handleIncomingMessage( ) => void, connectionId?: string, ): Promise { + // Hoist identifiers and tracking state so they're available in catch for error-result + const agentId = msg.agentId; + const requestedConversationId = msg.conversationId || undefined; + const conversationId = requestedConversationId ?? "default"; + const msgStartTime = performance.now(); + let msgTurnCount = 0; + const msgRunIds: string[] = []; + try { // Latch capability: once seen, always use blocking path (strict check to avoid truthy strings) if (msg.supportsControlResponse === true) { runtime.controlResponseCapable = true; } - const agentId = msg.agentId; - // requestedConversationId can be: - // - undefined: no conversation (use agent endpoint) - // - null: no conversation (use agent endpoint) - // - string: specific conversation ID (use conversations endpoint) - const requestedConversationId = msg.conversationId || undefined; - - // For sendMessageStream: "default" means use agent endpoint, else use conversations endpoint - const conversationId = requestedConversationId ?? "default"; - if (!agentId) { return; } @@ -654,6 +781,7 @@ async function handleIncomingMessage( } let messagesToSend: Array = msg.messages; + let turnToolContextId: string | null = null; const firstMessage = msg.messages[0]; const isApprovalMessage = @@ -683,7 +811,11 @@ async function handleIncomingMessage( resumeData.pendingApprovals, ); const decisionResults = - decisions.length > 0 ? await executeApprovalBatch(decisions) : []; + decisions.length > 0 + ? await executeApprovalBatch(decisions, undefined, { + toolContextId: turnToolContextId ?? undefined, + }) + : []; const rebuiltApprovals: ApprovalResult[] = []; let decisionResultIndex = 0; @@ -717,33 +849,72 @@ async function handleIncomingMessage( ]; } - let stream = await sendMessageStream(conversationId, messagesToSend, { - agentId, - streamTokens: true, - background: true, - }); + let stream = await sendMessageStreamWithRetry( + conversationId, + messagesToSend, + { agentId, streamTokens: true, background: true }, + socket, + runtime, + ); + turnToolContextId = getStreamToolContextId( + stream as Stream, + ); let runIdSent = false; + let runId: string | undefined; const buffers = createBuffers(agentId); // Approval loop: continue until end_turn or error // eslint-disable-next-line no-constant-condition while (true) { + msgTurnCount++; + runIdSent = false; const result = await drainStreamWithResume( stream as Stream, buffers, () => {}, undefined, undefined, - ({ chunk }) => { + ({ chunk, shouldOutput, errorInfo }) => { const maybeRunId = (chunk as { run_id?: unknown }).run_id; - if (!runIdSent && typeof maybeRunId === "string") { - runIdSent = true; - sendClientMessage(socket, { - type: "run_started", - runId: maybeRunId, + if (typeof maybeRunId === "string") { + runId = maybeRunId; + if (!runIdSent) { + runIdSent = true; + msgRunIds.push(maybeRunId); + sendClientMessage(socket, { + type: "run_started", + runId: maybeRunId, + }); + } + } + + // Emit in-stream errors + if (errorInfo) { + emitToWS(socket, { + type: "error", + message: errorInfo.message || "Stream error", + stop_reason: (errorInfo.error_type as StopReasonType) || "error", + run_id: runId || errorInfo.run_id, + session_id: runtime.sessionId, + uuid: `error-${crypto.randomUUID()}`, }); } + + // Emit chunk as MessageWire for protocol consumers + if (shouldOutput) { + const chunkWithIds = chunk as typeof chunk & { + otid?: string; + id?: string; + }; + emitToWS(socket, { + ...chunk, + type: "message", + session_id: runtime.sessionId, + uuid: chunkWithIds.otid || chunkWithIds.id || crypto.randomUUID(), + } as MessageWire); + } + return undefined; }, ); @@ -753,21 +924,64 @@ async function handleIncomingMessage( // Case 1: Turn ended normally if (stopReason === "end_turn") { - sendClientMessage(socket, { - type: "result", - success: true, - stopReason: "end_turn", - }); + if (runtime.controlResponseCapable) { + emitToWS(socket, { + type: "result", + subtype: "success", + agent_id: agentId, + conversation_id: conversationId, + duration_ms: performance.now() - msgStartTime, + duration_api_ms: 0, + num_turns: msgTurnCount, + result: null, + run_ids: msgRunIds, + usage: null, + session_id: runtime.sessionId, + uuid: `result-${crypto.randomUUID()}`, + }); + } else { + sendClientMessage(socket, { + type: "result", + success: true, + stopReason: "end_turn", + }); + } break; } // Case 2: Error or cancelled if (stopReason !== "requires_approval") { - sendClientMessage(socket, { - type: "result", - success: false, - stopReason, + emitToWS(socket, { + type: "error", + message: `Unexpected stop reason: ${stopReason}`, + stop_reason: (stopReason as StopReasonType) || "error", + run_id: runId, + session_id: runtime.sessionId, + uuid: `error-${crypto.randomUUID()}`, }); + if (runtime.controlResponseCapable) { + emitToWS(socket, { + type: "result", + subtype: "error", + agent_id: agentId, + conversation_id: conversationId, + duration_ms: performance.now() - msgStartTime, + duration_api_ms: 0, + num_turns: msgTurnCount, + result: null, + run_ids: msgRunIds, + usage: null, + stop_reason: (stopReason as StopReasonType) || "error", + session_id: runtime.sessionId, + uuid: `result-${crypto.randomUUID()}`, + }); + } else { + sendClientMessage(socket, { + type: "result", + success: false, + stopReason, + }); + } break; } @@ -812,6 +1026,25 @@ async function handleIncomingMessage( reason: string; }; + // Emit auto-approval events for auto-allowed tools + for (const ac of autoAllowed) { + emitToWS(socket, { + type: "auto_approval", + tool_call: { + name: ac.approval.toolName, + tool_call_id: ac.approval.toolCallId, + arguments: ac.approval.toolArgs, + }, + reason: ac.permission.reason || "auto-approved", + matched_rule: + "matchedRule" in ac.permission && ac.permission.matchedRule + ? ac.permission.matchedRule + : "auto-approved", + session_id: runtime.sessionId, + uuid: `auto-approval-${ac.approval.toolCallId}`, + } as AutoApprovalMessage); + } + const decisions: Decision[] = [ ...autoAllowed.map((ac) => ({ type: "approve" as const, @@ -877,6 +1110,20 @@ async function handleIncomingMessage( } : ac.approval; decisions.push({ type: "approve", approval: finalApproval }); + + // Emit auto-approval event for WS-callback-approved tool + emitToWS(socket, { + type: "auto_approval", + tool_call: { + name: finalApproval.toolName, + tool_call_id: finalApproval.toolCallId, + arguments: finalApproval.toolArgs, + }, + reason: "Approved via WebSocket", + matched_rule: "canUseTool callback", + session_id: runtime.sessionId, + uuid: `auto-approval-${ac.approval.toolCallId}`, + } as AutoApprovalMessage); } else { decisions.push({ type: "deny", @@ -898,10 +1145,14 @@ async function handleIncomingMessage( } // Execute approved/denied tools - const executionResults = await executeApprovalBatch(decisions); + const executionResults = await executeApprovalBatch( + decisions, + undefined, + { toolContextId: turnToolContextId ?? undefined }, + ); // Create fresh approval stream for next iteration - stream = await sendMessageStream( + stream = await sendMessageStreamWithRetry( conversationId, [ { @@ -909,19 +1160,46 @@ async function handleIncomingMessage( approvals: executionResults, }, ], - { - agentId, - streamTokens: true, - background: true, - }, + { agentId, streamTokens: true, background: true }, + socket, + runtime, + ); + turnToolContextId = getStreamToolContextId( + stream as Stream, ); } } catch (error) { - sendClientMessage(socket, { - type: "result", - success: false, - stopReason: "error", + const errorMessage = error instanceof Error ? error.message : String(error); + emitToWS(socket, { + type: "error", + message: errorMessage, + stop_reason: "error", + session_id: runtime.sessionId, + uuid: `error-${crypto.randomUUID()}`, }); + if (runtime.controlResponseCapable) { + emitToWS(socket, { + type: "result", + subtype: "error", + agent_id: agentId || "", + conversation_id: conversationId, + duration_ms: performance.now() - msgStartTime, + duration_api_ms: 0, + num_turns: msgTurnCount, + result: null, + run_ids: msgRunIds, + usage: null, + stop_reason: "error", + session_id: runtime.sessionId, + uuid: `result-${crypto.randomUUID()}`, + }); + } else { + sendClientMessage(socket, { + type: "result", + success: false, + stopReason: "error", + }); + } if (process.env.DEBUG) { console.error("[Listen] Error handling message:", error); @@ -952,4 +1230,5 @@ export function stopListenerClient(): void { export const __listenClientTestUtils = { createRuntime, stopRuntime, + emitToWS, };