From 924ae8e8bcbc14f4613d58ed71e624ebb29ad24a Mon Sep 17 00:00:00 2001 From: Shubham Naik Date: Fri, 20 Feb 2026 12:30:20 -0800 Subject: [PATCH] =?UTF-8?q?feat:=20add=20approval=20loop=20and=20improved?= =?UTF-8?q?=20conversationId=20handling=20for=20list=E2=80=A6=20(#1057)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Letta --- src/websocket/listen-client.ts | 301 +++++++++++++++++++++++++++------ 1 file changed, 251 insertions(+), 50 deletions(-) diff --git a/src/websocket/listen-client.ts b/src/websocket/listen-client.ts index 3700fc0..8701244 100644 --- a/src/websocket/listen-client.ts +++ b/src/websocket/listen-client.ts @@ -20,7 +20,10 @@ import { getResumeData } from "../agent/check-approval"; import { getClient } from "../agent/client"; import { sendMessageStream } from "../agent/message"; import { createBuffers } from "../cli/helpers/accumulator"; +import { classifyApprovals } from "../cli/helpers/approvalClassification"; +import { generatePlanFilePath } from "../cli/helpers/planName"; import { drainStreamWithResume } from "../cli/helpers/stream"; +import { permissionMode } from "../permissions/mode"; import { settingsManager } from "../settings-manager"; import { loadTools } from "../tools/manager"; @@ -70,8 +73,24 @@ interface RunStartedMessage { runId: string; } -type ServerMessage = PongMessage | IncomingMessage; -type ClientMessage = PingMessage | ResultMessage | RunStartedMessage; +interface ModeChangeMessage { + type: "mode_change"; + mode: "default" | "acceptEdits" | "plan" | "bypassPermissions"; +} + +interface ModeChangedMessage { + type: "mode_changed"; + mode: "default" | "acceptEdits" | "plan" | "bypassPermissions"; + success: boolean; + error?: string; +} + +type ServerMessage = PongMessage | IncomingMessage | ModeChangeMessage; +type ClientMessage = + | PingMessage + | ResultMessage + | RunStartedMessage + | ModeChangedMessage; type ListenerRuntime = { socket: WebSocket | null; @@ -89,6 +108,44 @@ type ApprovalSlot = // Listen mode supports one active connection per process. let activeRuntime: ListenerRuntime | null = null; +/** + * Handle mode change request from cloud + */ +function handleModeChange(msg: ModeChangeMessage, socket: WebSocket): void { + try { + permissionMode.setMode(msg.mode); + + // If entering plan mode, generate and set plan file path + if (msg.mode === "plan" && !permissionMode.getPlanFilePath()) { + const planFilePath = generatePlanFilePath(); + permissionMode.setPlanFilePath(planFilePath); + } + + // Send success acknowledgment + sendClientMessage(socket, { + type: "mode_changed", + mode: msg.mode, + success: true, + }); + + if (process.env.DEBUG) { + console.log(`[Listen] Mode changed to: ${msg.mode}`); + } + } catch (error) { + // Send failure acknowledgment + sendClientMessage(socket, { + type: "mode_changed", + mode: msg.mode, + success: false, + error: error instanceof Error ? error.message : "Mode change failed", + }); + + if (process.env.DEBUG) { + console.error("[Listen] Mode change failed:", error); + } + } +} + const MAX_RETRY_DURATION_MS = 5 * 60 * 1000; // 5 minutes const INITIAL_RETRY_DELAY_MS = 1000; // 1 second const MAX_RETRY_DELAY_MS = 30000; // 30 seconds @@ -146,7 +203,11 @@ function parseServerMessage(data: WebSocket.RawData): ServerMessage | null { try { const raw = typeof data === "string" ? data : data.toString(); const parsed = JSON.parse(raw) as { type?: string }; - if (parsed.type === "pong" || parsed.type === "message") { + if ( + parsed.type === "pong" || + parsed.type === "message" || + parsed.type === "mode_change" + ) { return parsed as ServerMessage; } return null; @@ -338,6 +399,13 @@ async function connectWithRetry( runtime.hasSuccessfulConnection = true; opts.onConnected(); + // Send current mode state to cloud for UI sync + sendClientMessage(socket, { + type: "mode_changed", + mode: permissionMode.getMode(), + success: true, + }); + runtime.heartbeatInterval = setInterval(() => { sendClientMessage(socket, { type: "ping" }); }, 30000); @@ -345,31 +413,40 @@ async function connectWithRetry( socket.on("message", (data: WebSocket.RawData) => { const parsed = parseServerMessage(data); - if (!parsed || parsed.type !== "message") { + if (!parsed) { return; } - runtime.messageQueue = runtime.messageQueue - .then(async () => { - if (runtime !== activeRuntime || runtime.intentionallyClosed) { - return; - } + // Handle mode change messages immediately (not queued) + if (parsed.type === "mode_change") { + handleModeChange(parsed, socket); + return; + } - opts.onStatusChange?.("receiving", opts.connectionId); - await handleIncomingMessage( - parsed, - socket, - opts.onStatusChange, - opts.connectionId, - ); - opts.onStatusChange?.("idle", opts.connectionId); - }) - .catch((error: unknown) => { - if (process.env.DEBUG) { - console.error("[Listen] Error handling queued message:", error); - } - opts.onStatusChange?.("idle", opts.connectionId); - }); + // Handle incoming messages (queued for sequential processing) + if (parsed.type === "message") { + runtime.messageQueue = runtime.messageQueue + .then(async () => { + if (runtime !== activeRuntime || runtime.intentionallyClosed) { + return; + } + + opts.onStatusChange?.("receiving", opts.connectionId); + await handleIncomingMessage( + parsed, + socket, + opts.onStatusChange, + opts.connectionId, + ); + opts.onStatusChange?.("idle", opts.connectionId); + }) + .catch((error: unknown) => { + if (process.env.DEBUG) { + console.error("[Listen] Error handling queued message:", error); + } + opts.onStatusChange?.("idle", opts.connectionId); + }); + } }); socket.on("close", (code: number, reason: Buffer) => { @@ -427,13 +504,25 @@ async function handleIncomingMessage( ): Promise { try { const agentId = msg.agentId; - const requestedConversationId = msg.conversationId; + // requestedConversationId can be: + // - undefined: no conversation (use agent endpoint) + // - null: no conversation (use agent endpoint) + // - string: specific conversation ID (use conversations endpoint) + const requestedConversationId = msg.conversationId || undefined; + + // For sendMessageStream: "default" means use agent endpoint, else use conversations endpoint const conversationId = requestedConversationId ?? "default"; if (!agentId) { return; } + if (process.env.DEBUG) { + console.log( + `[Listen] Handling message: agentId=${agentId}, requestedConversationId=${requestedConversationId}, conversationId=${conversationId}`, + ); + } + if (connectionId) { onStatusChange?.("processing", connectionId); } @@ -503,38 +592,150 @@ async function handleIncomingMessage( }); let runIdSent = false; - const buffers = createBuffers(agentId); - const result = await drainStreamWithResume( - stream as Stream, - buffers, - () => {}, - undefined, - undefined, - ({ chunk }) => { - const maybeRunId = (chunk as { run_id?: unknown }).run_id; - if (!runIdSent && typeof maybeRunId === "string") { - runIdSent = true; - sendClientMessage(socket, { - type: "run_started", - runId: maybeRunId, - }); - } - return undefined; - }, - ); - sendClientMessage(socket, { - type: "result", - success: result.stopReason === "end_turn", - stopReason: result.stopReason, - }); - } catch { + // Approval loop: continue until end_turn or error + // eslint-disable-next-line no-constant-condition + while (true) { + const result = await drainStreamWithResume( + stream as Stream, + buffers, + () => {}, + undefined, + undefined, + ({ chunk }) => { + const maybeRunId = (chunk as { run_id?: unknown }).run_id; + if (!runIdSent && typeof maybeRunId === "string") { + runIdSent = true; + sendClientMessage(socket, { + type: "run_started", + runId: maybeRunId, + }); + } + return undefined; + }, + ); + + const stopReason = result.stopReason; + const approvals = result.approvals || []; + + // Case 1: Turn ended normally + if (stopReason === "end_turn") { + sendClientMessage(socket, { + type: "result", + success: true, + stopReason: "end_turn", + }); + break; + } + + // Case 2: Error or cancelled + if (stopReason !== "requires_approval") { + sendClientMessage(socket, { + type: "result", + success: false, + stopReason, + }); + break; + } + + // Case 3: Requires approval - classify and handle based on permission mode + if (approvals.length === 0) { + // Unexpected: requires_approval but no approvals + sendClientMessage(socket, { + type: "result", + success: false, + stopReason: "error", + }); + break; + } + + // Classify approvals (auto-allow, auto-deny, needs user input) + // Don't treat "ask" as deny - cloud UI can handle approvals + const { autoAllowed, autoDenied, needsUserInput } = + await classifyApprovals(approvals, { + treatAskAsDeny: false, // Let cloud UI handle approvals + requireArgsForAutoApprove: true, + }); + + // If there are approvals that need user input, pause execution + // Cloud UI will see pending approvals via /v1/runs/:runId/stream from core + // and show approval dialog. When user approves, cloud sends approval message + // back to this device, which resumes execution. + if (needsUserInput.length > 0) { + sendClientMessage(socket, { + type: "result", + success: false, + stopReason: "requires_approval", + }); + break; // Exit loop - cloud will send approval message when user approves + } + + // Only auto-allowed and auto-denied tools remain + // Build decisions list + type Decision = + | { + type: "approve"; + approval: { + toolCallId: string; + toolName: string; + toolArgs: string; + }; + } + | { + type: "deny"; + approval: { + toolCallId: string; + toolName: string; + toolArgs: string; + }; + reason: string; + }; + + const decisions: Decision[] = [ + ...autoAllowed.map((ac) => ({ + type: "approve" as const, + approval: ac.approval, + })), + ...autoDenied.map((ac) => ({ + type: "deny" as const, + approval: ac.approval, + reason: ac.denyReason || ac.permission.reason || "Permission denied", + })), + ]; + + // Execute approved/denied tools + const executionResults = await executeApprovalBatch(decisions); + + // Send approval message back to agent to continue execution + const approvalStream = await sendMessageStream( + conversationId, + [ + { + type: "approval", + approvals: executionResults, + }, + ], + { + agentId, + streamTokens: true, + background: true, + }, + ); + + // Replace stream with approval stream for next iteration + Object.assign(stream, approvalStream); + } + } catch (error) { sendClientMessage(socket, { type: "result", success: false, stopReason: "error", }); + + if (process.env.DEBUG) { + console.error("[Listen] Error handling message:", error); + } } }