feat(listen): recover pending approvals via websocket control (#1219)
This commit is contained in:
@@ -136,6 +136,12 @@ interface GetStatusMessage {
|
|||||||
type: "get_status";
|
type: "get_status";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
interface RecoverPendingApprovalsMessage {
|
||||||
|
type: "recover_pending_approvals";
|
||||||
|
agentId?: string;
|
||||||
|
conversationId?: string;
|
||||||
|
}
|
||||||
|
|
||||||
interface StatusResponseMessage {
|
interface StatusResponseMessage {
|
||||||
type: "status_response";
|
type: "status_response";
|
||||||
currentMode: "default" | "acceptEdits" | "plan" | "bypassPermissions";
|
currentMode: "default" | "acceptEdits" | "plan" | "bypassPermissions";
|
||||||
@@ -149,6 +155,7 @@ type ServerMessage =
|
|||||||
| IncomingMessage
|
| IncomingMessage
|
||||||
| ModeChangeMessage
|
| ModeChangeMessage
|
||||||
| GetStatusMessage
|
| GetStatusMessage
|
||||||
|
| RecoverPendingApprovalsMessage
|
||||||
| WsControlResponse;
|
| WsControlResponse;
|
||||||
type ClientMessage =
|
type ClientMessage =
|
||||||
| PingMessage
|
| PingMessage
|
||||||
@@ -185,6 +192,8 @@ type ListenerRuntime = {
|
|||||||
pendingTurns: number;
|
pendingTurns: number;
|
||||||
/** Optional debug hook for WS event logging. */
|
/** Optional debug hook for WS event logging. */
|
||||||
onWsEvent?: StartListenerOptions["onWsEvent"];
|
onWsEvent?: StartListenerOptions["onWsEvent"];
|
||||||
|
/** Prevent duplicate concurrent pending-approval recovery passes. */
|
||||||
|
isRecoveringApprovals: boolean;
|
||||||
};
|
};
|
||||||
|
|
||||||
type ApprovalSlot =
|
type ApprovalSlot =
|
||||||
@@ -249,6 +258,7 @@ function createRuntime(): ListenerRuntime {
|
|||||||
sessionId: `listen-${crypto.randomUUID()}`,
|
sessionId: `listen-${crypto.randomUUID()}`,
|
||||||
lastStopReason: null,
|
lastStopReason: null,
|
||||||
isProcessing: false,
|
isProcessing: false,
|
||||||
|
isRecoveringApprovals: false,
|
||||||
pendingTurns: 0,
|
pendingTurns: 0,
|
||||||
// queueRuntime assigned below — needs runtime ref in callbacks
|
// queueRuntime assigned below — needs runtime ref in callbacks
|
||||||
queueRuntime: null as unknown as QueueRuntime,
|
queueRuntime: null as unknown as QueueRuntime,
|
||||||
@@ -375,7 +385,8 @@ export function parseServerMessage(
|
|||||||
parsed.type === "status" ||
|
parsed.type === "status" ||
|
||||||
parsed.type === "message" ||
|
parsed.type === "message" ||
|
||||||
parsed.type === "mode_change" ||
|
parsed.type === "mode_change" ||
|
||||||
parsed.type === "get_status"
|
parsed.type === "get_status" ||
|
||||||
|
parsed.type === "recover_pending_approvals"
|
||||||
) {
|
) {
|
||||||
return parsed as ServerMessage;
|
return parsed as ServerMessage;
|
||||||
}
|
}
|
||||||
@@ -673,6 +684,213 @@ function buildApprovalExecutionPlan(
|
|||||||
return { slots, decisions };
|
return { slots, decisions };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function recoverPendingApprovals(
|
||||||
|
runtime: ListenerRuntime,
|
||||||
|
socket: WebSocket,
|
||||||
|
msg: RecoverPendingApprovalsMessage,
|
||||||
|
): Promise<void> {
|
||||||
|
if (runtime.isProcessing || runtime.isRecoveringApprovals) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
runtime.isRecoveringApprovals = true;
|
||||||
|
try {
|
||||||
|
const agentId = msg.agentId;
|
||||||
|
if (!agentId) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const requestedConversationId = msg.conversationId || undefined;
|
||||||
|
const conversationId = requestedConversationId ?? "default";
|
||||||
|
|
||||||
|
const client = await getClient();
|
||||||
|
const agent = await client.agents.retrieve(agentId);
|
||||||
|
|
||||||
|
let resumeData: Awaited<ReturnType<typeof getResumeData>>;
|
||||||
|
try {
|
||||||
|
resumeData = await getResumeData(client, agent, requestedConversationId, {
|
||||||
|
includeMessageHistory: false,
|
||||||
|
});
|
||||||
|
} catch (error) {
|
||||||
|
if (
|
||||||
|
error instanceof APIError &&
|
||||||
|
(error.status === 404 || error.status === 422)
|
||||||
|
) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
|
||||||
|
const pendingApprovals = resumeData.pendingApprovals || [];
|
||||||
|
if (pendingApprovals.length === 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
type Decision =
|
||||||
|
| {
|
||||||
|
type: "approve";
|
||||||
|
approval: {
|
||||||
|
toolCallId: string;
|
||||||
|
toolName: string;
|
||||||
|
toolArgs: string;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
| {
|
||||||
|
type: "deny";
|
||||||
|
approval: {
|
||||||
|
toolCallId: string;
|
||||||
|
toolName: string;
|
||||||
|
toolArgs: string;
|
||||||
|
};
|
||||||
|
reason: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
const { autoAllowed, autoDenied, needsUserInput } = await classifyApprovals(
|
||||||
|
pendingApprovals,
|
||||||
|
{
|
||||||
|
alwaysRequiresUserInput: isInteractiveApprovalTool,
|
||||||
|
treatAskAsDeny: false,
|
||||||
|
requireArgsForAutoApprove: true,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
for (const ac of autoAllowed) {
|
||||||
|
emitToWS(socket, {
|
||||||
|
type: "auto_approval",
|
||||||
|
tool_call: {
|
||||||
|
name: ac.approval.toolName,
|
||||||
|
tool_call_id: ac.approval.toolCallId,
|
||||||
|
arguments: ac.approval.toolArgs,
|
||||||
|
},
|
||||||
|
reason: ac.permission.reason || "auto-approved",
|
||||||
|
matched_rule:
|
||||||
|
"matchedRule" in ac.permission && ac.permission.matchedRule
|
||||||
|
? ac.permission.matchedRule
|
||||||
|
: "auto-approved",
|
||||||
|
session_id: runtime.sessionId,
|
||||||
|
uuid: `auto-approval-${ac.approval.toolCallId}`,
|
||||||
|
} as AutoApprovalMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
const decisions: Decision[] = [
|
||||||
|
...autoAllowed.map((ac) => ({
|
||||||
|
type: "approve" as const,
|
||||||
|
approval: ac.approval,
|
||||||
|
})),
|
||||||
|
...autoDenied.map((ac) => ({
|
||||||
|
type: "deny" as const,
|
||||||
|
approval: ac.approval,
|
||||||
|
reason: ac.denyReason || ac.permission.reason || "Permission denied",
|
||||||
|
})),
|
||||||
|
];
|
||||||
|
|
||||||
|
if (needsUserInput.length > 0) {
|
||||||
|
if (!runtime.controlResponseCapable) {
|
||||||
|
runtime.lastStopReason = "requires_approval";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const ac of needsUserInput) {
|
||||||
|
const requestId = `perm-${ac.approval.toolCallId}`;
|
||||||
|
const diffs = await computeDiffPreviews(
|
||||||
|
ac.approval.toolName,
|
||||||
|
ac.parsedArgs,
|
||||||
|
);
|
||||||
|
|
||||||
|
const controlRequest: ControlRequest = {
|
||||||
|
type: "control_request",
|
||||||
|
request_id: requestId,
|
||||||
|
request: {
|
||||||
|
subtype: "can_use_tool",
|
||||||
|
tool_name: ac.approval.toolName,
|
||||||
|
input: ac.parsedArgs,
|
||||||
|
tool_call_id: ac.approval.toolCallId,
|
||||||
|
permission_suggestions: [],
|
||||||
|
blocked_path: null,
|
||||||
|
...(diffs.length > 0 ? { diffs } : {}),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
const responseBody = await requestApprovalOverWS(
|
||||||
|
runtime,
|
||||||
|
socket,
|
||||||
|
requestId,
|
||||||
|
controlRequest,
|
||||||
|
);
|
||||||
|
|
||||||
|
if (responseBody.subtype === "success") {
|
||||||
|
const response = responseBody.response as
|
||||||
|
| CanUseToolResponse
|
||||||
|
| undefined;
|
||||||
|
if (response?.behavior === "allow") {
|
||||||
|
const finalApproval = response.updatedInput
|
||||||
|
? {
|
||||||
|
...ac.approval,
|
||||||
|
toolArgs: JSON.stringify(response.updatedInput),
|
||||||
|
}
|
||||||
|
: ac.approval;
|
||||||
|
decisions.push({ type: "approve", approval: finalApproval });
|
||||||
|
|
||||||
|
emitToWS(socket, {
|
||||||
|
type: "auto_approval",
|
||||||
|
tool_call: {
|
||||||
|
name: finalApproval.toolName,
|
||||||
|
tool_call_id: finalApproval.toolCallId,
|
||||||
|
arguments: finalApproval.toolArgs,
|
||||||
|
},
|
||||||
|
reason: "Approved via WebSocket",
|
||||||
|
matched_rule: "canUseTool callback",
|
||||||
|
session_id: runtime.sessionId,
|
||||||
|
uuid: `auto-approval-${ac.approval.toolCallId}`,
|
||||||
|
} as AutoApprovalMessage);
|
||||||
|
} else {
|
||||||
|
decisions.push({
|
||||||
|
type: "deny",
|
||||||
|
approval: ac.approval,
|
||||||
|
reason: response?.message || "Denied via WebSocket",
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
decisions.push({
|
||||||
|
type: "deny",
|
||||||
|
approval: ac.approval,
|
||||||
|
reason:
|
||||||
|
responseBody.subtype === "error"
|
||||||
|
? responseBody.error
|
||||||
|
: "Unknown error",
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (decisions.length === 0) {
|
||||||
|
runtime.lastStopReason = "requires_approval";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const executionResults = await executeApprovalBatch(decisions);
|
||||||
|
|
||||||
|
await handleIncomingMessage(
|
||||||
|
{
|
||||||
|
type: "message",
|
||||||
|
agentId,
|
||||||
|
conversationId,
|
||||||
|
messages: [
|
||||||
|
{
|
||||||
|
type: "approval",
|
||||||
|
approvals: executionResults,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
supportsControlResponse: runtime.controlResponseCapable,
|
||||||
|
},
|
||||||
|
socket,
|
||||||
|
runtime,
|
||||||
|
);
|
||||||
|
} finally {
|
||||||
|
runtime.isRecoveringApprovals = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Start the listener WebSocket client with automatic retry.
|
* Start the listener WebSocket client with automatic retry.
|
||||||
*/
|
*/
|
||||||
@@ -843,6 +1061,53 @@ async function connectWithRetry(
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (parsed.type === "recover_pending_approvals") {
|
||||||
|
if (runtime !== activeRuntime || runtime.intentionallyClosed) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Recovery requests are only sent by the modern cloud listener protocol.
|
||||||
|
runtime.controlResponseCapable = true;
|
||||||
|
|
||||||
|
// Serialize recovery with normal message handling to avoid concurrent
|
||||||
|
// handleIncomingMessage execution when user messages arrive concurrently.
|
||||||
|
runtime.pendingTurns++;
|
||||||
|
runtime.messageQueue = runtime.messageQueue
|
||||||
|
.then(async () => {
|
||||||
|
try {
|
||||||
|
if (runtime !== activeRuntime || runtime.intentionallyClosed) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
await recoverPendingApprovals(runtime, socket, parsed);
|
||||||
|
} catch (error) {
|
||||||
|
const errorMessage =
|
||||||
|
error instanceof Error ? error.message : String(error);
|
||||||
|
emitToWS(socket, {
|
||||||
|
type: "error",
|
||||||
|
message: `Pending approval recovery failed: ${errorMessage}`,
|
||||||
|
stop_reason: "error",
|
||||||
|
session_id: runtime.sessionId,
|
||||||
|
uuid: `error-${crypto.randomUUID()}`,
|
||||||
|
});
|
||||||
|
} finally {
|
||||||
|
runtime.pendingTurns--;
|
||||||
|
if (runtime.pendingTurns === 0) {
|
||||||
|
runtime.queueRuntime.resetBlockedState();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.catch((error: unknown) => {
|
||||||
|
if (process.env.DEBUG) {
|
||||||
|
console.error(
|
||||||
|
"[Listen] Error handling queued pending approval recovery:",
|
||||||
|
error,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Handle incoming messages (queued for sequential processing)
|
// Handle incoming messages (queued for sequential processing)
|
||||||
if (parsed.type === "message") {
|
if (parsed.type === "message") {
|
||||||
// Queue lifecycle tracking: only enqueue if first payload is a
|
// Queue lifecycle tracking: only enqueue if first payload is a
|
||||||
|
|||||||
Reference in New Issue
Block a user