diff --git a/src/websocket/listen-client.ts b/src/websocket/listen-client.ts index 831c082..0a60e05 100644 --- a/src/websocket/listen-client.ts +++ b/src/websocket/listen-client.ts @@ -136,6 +136,12 @@ interface GetStatusMessage { type: "get_status"; } +interface RecoverPendingApprovalsMessage { + type: "recover_pending_approvals"; + agentId?: string; + conversationId?: string; +} + interface StatusResponseMessage { type: "status_response"; currentMode: "default" | "acceptEdits" | "plan" | "bypassPermissions"; @@ -149,6 +155,7 @@ type ServerMessage = | IncomingMessage | ModeChangeMessage | GetStatusMessage + | RecoverPendingApprovalsMessage | WsControlResponse; type ClientMessage = | PingMessage @@ -185,6 +192,8 @@ type ListenerRuntime = { pendingTurns: number; /** Optional debug hook for WS event logging. */ onWsEvent?: StartListenerOptions["onWsEvent"]; + /** Prevent duplicate concurrent pending-approval recovery passes. */ + isRecoveringApprovals: boolean; }; type ApprovalSlot = @@ -249,6 +258,7 @@ function createRuntime(): ListenerRuntime { sessionId: `listen-${crypto.randomUUID()}`, lastStopReason: null, isProcessing: false, + isRecoveringApprovals: false, pendingTurns: 0, // queueRuntime assigned below — needs runtime ref in callbacks queueRuntime: null as unknown as QueueRuntime, @@ -375,7 +385,8 @@ export function parseServerMessage( parsed.type === "status" || parsed.type === "message" || parsed.type === "mode_change" || - parsed.type === "get_status" + parsed.type === "get_status" || + parsed.type === "recover_pending_approvals" ) { return parsed as ServerMessage; } @@ -673,6 +684,213 @@ function buildApprovalExecutionPlan( return { slots, decisions }; } +async function recoverPendingApprovals( + runtime: ListenerRuntime, + socket: WebSocket, + msg: RecoverPendingApprovalsMessage, +): Promise { + if (runtime.isProcessing || runtime.isRecoveringApprovals) { + return; + } + + runtime.isRecoveringApprovals = true; + try { + const agentId = msg.agentId; + if (!agentId) { + return; + } + + const requestedConversationId = msg.conversationId || undefined; + const conversationId = requestedConversationId ?? "default"; + + const client = await getClient(); + const agent = await client.agents.retrieve(agentId); + + let resumeData: Awaited>; + try { + resumeData = await getResumeData(client, agent, requestedConversationId, { + includeMessageHistory: false, + }); + } catch (error) { + if ( + error instanceof APIError && + (error.status === 404 || error.status === 422) + ) { + return; + } + throw error; + } + + const pendingApprovals = resumeData.pendingApprovals || []; + if (pendingApprovals.length === 0) { + return; + } + + type Decision = + | { + type: "approve"; + approval: { + toolCallId: string; + toolName: string; + toolArgs: string; + }; + } + | { + type: "deny"; + approval: { + toolCallId: string; + toolName: string; + toolArgs: string; + }; + reason: string; + }; + + const { autoAllowed, autoDenied, needsUserInput } = await classifyApprovals( + pendingApprovals, + { + alwaysRequiresUserInput: isInteractiveApprovalTool, + treatAskAsDeny: false, + requireArgsForAutoApprove: true, + }, + ); + + for (const ac of autoAllowed) { + emitToWS(socket, { + type: "auto_approval", + tool_call: { + name: ac.approval.toolName, + tool_call_id: ac.approval.toolCallId, + arguments: ac.approval.toolArgs, + }, + reason: ac.permission.reason || "auto-approved", + matched_rule: + "matchedRule" in ac.permission && ac.permission.matchedRule + ? ac.permission.matchedRule + : "auto-approved", + session_id: runtime.sessionId, + uuid: `auto-approval-${ac.approval.toolCallId}`, + } as AutoApprovalMessage); + } + + const decisions: Decision[] = [ + ...autoAllowed.map((ac) => ({ + type: "approve" as const, + approval: ac.approval, + })), + ...autoDenied.map((ac) => ({ + type: "deny" as const, + approval: ac.approval, + reason: ac.denyReason || ac.permission.reason || "Permission denied", + })), + ]; + + if (needsUserInput.length > 0) { + if (!runtime.controlResponseCapable) { + runtime.lastStopReason = "requires_approval"; + return; + } + + for (const ac of needsUserInput) { + const requestId = `perm-${ac.approval.toolCallId}`; + const diffs = await computeDiffPreviews( + ac.approval.toolName, + ac.parsedArgs, + ); + + const controlRequest: ControlRequest = { + type: "control_request", + request_id: requestId, + request: { + subtype: "can_use_tool", + tool_name: ac.approval.toolName, + input: ac.parsedArgs, + tool_call_id: ac.approval.toolCallId, + permission_suggestions: [], + blocked_path: null, + ...(diffs.length > 0 ? { diffs } : {}), + }, + }; + + const responseBody = await requestApprovalOverWS( + runtime, + socket, + requestId, + controlRequest, + ); + + if (responseBody.subtype === "success") { + const response = responseBody.response as + | CanUseToolResponse + | undefined; + if (response?.behavior === "allow") { + const finalApproval = response.updatedInput + ? { + ...ac.approval, + toolArgs: JSON.stringify(response.updatedInput), + } + : ac.approval; + decisions.push({ type: "approve", approval: finalApproval }); + + emitToWS(socket, { + type: "auto_approval", + tool_call: { + name: finalApproval.toolName, + tool_call_id: finalApproval.toolCallId, + arguments: finalApproval.toolArgs, + }, + reason: "Approved via WebSocket", + matched_rule: "canUseTool callback", + session_id: runtime.sessionId, + uuid: `auto-approval-${ac.approval.toolCallId}`, + } as AutoApprovalMessage); + } else { + decisions.push({ + type: "deny", + approval: ac.approval, + reason: response?.message || "Denied via WebSocket", + }); + } + } else { + decisions.push({ + type: "deny", + approval: ac.approval, + reason: + responseBody.subtype === "error" + ? responseBody.error + : "Unknown error", + }); + } + } + } + + if (decisions.length === 0) { + runtime.lastStopReason = "requires_approval"; + return; + } + + const executionResults = await executeApprovalBatch(decisions); + + await handleIncomingMessage( + { + type: "message", + agentId, + conversationId, + messages: [ + { + type: "approval", + approvals: executionResults, + }, + ], + supportsControlResponse: runtime.controlResponseCapable, + }, + socket, + runtime, + ); + } finally { + runtime.isRecoveringApprovals = false; + } +} + /** * Start the listener WebSocket client with automatic retry. */ @@ -843,6 +1061,53 @@ async function connectWithRetry( return; } + if (parsed.type === "recover_pending_approvals") { + if (runtime !== activeRuntime || runtime.intentionallyClosed) { + return; + } + + // Recovery requests are only sent by the modern cloud listener protocol. + runtime.controlResponseCapable = true; + + // Serialize recovery with normal message handling to avoid concurrent + // handleIncomingMessage execution when user messages arrive concurrently. + runtime.pendingTurns++; + runtime.messageQueue = runtime.messageQueue + .then(async () => { + try { + if (runtime !== activeRuntime || runtime.intentionallyClosed) { + return; + } + + await recoverPendingApprovals(runtime, socket, parsed); + } catch (error) { + const errorMessage = + error instanceof Error ? error.message : String(error); + emitToWS(socket, { + type: "error", + message: `Pending approval recovery failed: ${errorMessage}`, + stop_reason: "error", + session_id: runtime.sessionId, + uuid: `error-${crypto.randomUUID()}`, + }); + } finally { + runtime.pendingTurns--; + if (runtime.pendingTurns === 0) { + runtime.queueRuntime.resetBlockedState(); + } + } + }) + .catch((error: unknown) => { + if (process.env.DEBUG) { + console.error( + "[Listen] Error handling queued pending approval recovery:", + error, + ); + } + }); + return; + } + // Handle incoming messages (queued for sequential processing) if (parsed.type === "message") { // Queue lifecycle tracking: only enqueue if first payload is a