From c0ecdb16d09a101aa58e758d59ba0c567e9693c9 Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Thu, 5 Mar 2026 17:33:46 -0800 Subject: [PATCH] fix(listen): recover stale approval conflicts after stop-reason errors (#1285) --- .../websocket/listen-client-protocol.test.ts | 55 +++++++++ src/websocket/listen-client.ts | 115 ++++++++++++++++-- 2 files changed, 161 insertions(+), 9 deletions(-) diff --git a/src/tests/websocket/listen-client-protocol.test.ts b/src/tests/websocket/listen-client-protocol.test.ts index a022cf4..2677373 100644 --- a/src/tests/websocket/listen-client-protocol.test.ts +++ b/src/tests/websocket/listen-client-protocol.test.ts @@ -585,3 +585,58 @@ describe("listen-client emitToWS adapter", () => { expect(runtime.sessionId.length).toBeGreaterThan(10); }); }); + +describe("listen-client post-stop approval recovery policy", () => { + test("retries when run detail indicates invalid tool call IDs", () => { + const shouldRecover = + __listenClientTestUtils.shouldAttemptPostStopApprovalRecovery({ + stopReason: "error", + runIdsSeen: 1, + retries: 0, + runErrorDetail: + "Invalid tool call IDs: expected [toolu_abc], got [toolu_def]", + latestErrorText: null, + }); + + expect(shouldRecover).toBe(true); + }); + + test("retries when run detail indicates approval pending", () => { + const shouldRecover = + __listenClientTestUtils.shouldAttemptPostStopApprovalRecovery({ + stopReason: "error", + runIdsSeen: 1, + retries: 0, + runErrorDetail: "Conversation is waiting for approval", + latestErrorText: null, + }); + + expect(shouldRecover).toBe(true); + }); + + test("retries on generic no-run error heuristic", () => { + const shouldRecover = + __listenClientTestUtils.shouldAttemptPostStopApprovalRecovery({ + stopReason: "error", + runIdsSeen: 0, + retries: 0, + runErrorDetail: null, + latestErrorText: null, + }); + + expect(shouldRecover).toBe(true); + }); + + test("does not retry once retry budget is exhausted", () => { + const shouldRecover = + __listenClientTestUtils.shouldAttemptPostStopApprovalRecovery({ + stopReason: "error", + runIdsSeen: 0, + retries: 2, + runErrorDetail: null, + latestErrorText: null, + }); + + expect(shouldRecover).toBe(false); + }); +}); diff --git a/src/websocket/listen-client.ts b/src/websocket/listen-client.ts index bce59a8..774e079 100644 --- a/src/websocket/listen-client.ts +++ b/src/websocket/listen-client.ts @@ -24,7 +24,11 @@ import { getStreamToolContextId, sendMessageStream } from "../agent/message"; import { extractConflictDetail, getPreStreamErrorAction, + isApprovalPendingError, + isInvalidToolCallIdsError, parseRetryAfterHeaderMs, + rebuildInputWithFreshDenials, + shouldAttemptApprovalRecovery, } from "../agent/turn-recovery-policy"; import { createBuffers } from "../cli/helpers/accumulator"; import { classifyApprovals } from "../cli/helpers/approvalClassification"; @@ -864,6 +868,37 @@ function emitToWS(socket: WebSocket, event: WsProtocolEvent): void { const LLM_API_ERROR_MAX_RETRIES = 3; const MAX_PRE_STREAM_RECOVERY = 2; +const MAX_POST_STOP_APPROVAL_RECOVERY = 2; + +function shouldAttemptPostStopApprovalRecovery(params: { + stopReason: string | null | undefined; + runIdsSeen: number; + retries: number; + runErrorDetail: string | null; + latestErrorText: string | null; +}): boolean { + const invalidToolCallIdsDetected = + isInvalidToolCallIdsError(params.runErrorDetail) || + isInvalidToolCallIdsError(params.latestErrorText); + const approvalPendingDetected = + isApprovalPendingError(params.runErrorDetail) || + isApprovalPendingError(params.latestErrorText); + + // Heuristic fallback: + // If the stream stops with generic "error" before any run_id was emitted, + // this is frequently a stale approval conflict after reconnect/interrupt. + const genericNoRunError = + params.stopReason === "error" && params.runIdsSeen === 0; + + return shouldAttemptApprovalRecovery({ + approvalPendingDetected: + invalidToolCallIdsDetected || + approvalPendingDetected || + genericNoRunError, + retries: params.retries, + maxRetries: MAX_POST_STOP_APPROVAL_RECOVERY, + }); +} // --------------------------------------------------------------------------- // Interrupt queue helpers — extracted for testability. @@ -2248,6 +2283,7 @@ async function handleIncomingMessage( const msgStartTime = performance.now(); let msgTurnCount = 0; const msgRunIds: string[] = []; + let postStopApprovalRecoveryRetries = 0; // Track last approval-loop state for cancel-time queueing (Phase 1.2). // Hoisted before try so the cancel catch block can access them. @@ -2374,9 +2410,11 @@ async function handleIncomingMessage( ); } + let currentInput = messagesToSend; + let stream = await sendMessageStreamWithRetry( conversationId, - messagesToSend, + currentInput, { agentId, streamTokens: true, background: true }, socket, runtime, @@ -2395,6 +2433,7 @@ async function handleIncomingMessage( while (true) { msgTurnCount++; runIdSent = false; + let latestErrorText: string | null = null; const result = await drainStreamWithResume( stream as Stream, buffers, @@ -2421,6 +2460,7 @@ async function handleIncomingMessage( // Emit in-stream errors if (errorInfo) { + latestErrorText = errorInfo.message || latestErrorText; emitToWS(socket, { type: "error", message: errorInfo.message || "Stream error", @@ -2517,6 +2557,63 @@ async function handleIncomingMessage( // Case 3: Error (or cancel-induced error) if (stopReason !== "requires_approval") { + const errorDetail = await fetchRunErrorDetail( + runId || runtime.activeRunId || msgRunIds[msgRunIds.length - 1], + ).catch(() => null); + + if ( + !runtime.cancelRequested && + shouldAttemptPostStopApprovalRecovery({ + stopReason, + runIdsSeen: msgRunIds.length, + retries: postStopApprovalRecoveryRetries, + runErrorDetail: errorDetail, + latestErrorText, + }) + ) { + postStopApprovalRecoveryRetries += 1; + emitToWS(socket, { + type: "recovery", + recovery_type: "approval_pending", + message: + "Recovering from stale approval conflict after interrupted/reconnected turn", + run_id: runId || msgRunIds[msgRunIds.length - 1] || undefined, + session_id: runtime.sessionId, + uuid: `recovery-${crypto.randomUUID()}`, + } as RecoveryMessage); + + try { + const client = await getClient(); + const agent = await client.agents.retrieve(agentId || ""); + const { pendingApprovals: existingApprovals } = await getResumeData( + client, + agent, + requestedConversationId, + ); + currentInput = rebuildInputWithFreshDenials( + currentInput, + existingApprovals ?? [], + "Auto-denied: stale approval from interrupted session", + ); + } catch { + // Fetch failed — strip stale approval payload and retry plain message + currentInput = rebuildInputWithFreshDenials(currentInput, [], ""); + } + + stream = await sendMessageStreamWithRetry( + conversationId, + currentInput, + { agentId, streamTokens: true, background: true }, + socket, + runtime, + runtime.activeAbortController.signal, + ); + turnToolContextId = getStreamToolContextId( + stream as Stream, + ); + continue; + } + // Cancel-induced errors should be treated as cancellation, not error. // This handles the race where cancel fires during stream drain and the // backend returns "error" instead of "cancelled". @@ -2562,8 +2659,6 @@ async function handleIncomingMessage( runtime.isProcessing = false; clearActiveRunState(runtime); - // Try to fetch richer error detail from the run metadata - const errorDetail = await fetchRunErrorDetail(runId).catch(() => null); const errorMessage = errorDetail || `Unexpected stop reason: ${stopReason}`; @@ -2808,14 +2903,15 @@ async function handleIncomingMessage( ); // Create fresh approval stream for next iteration + currentInput = [ + { + type: "approval", + approvals: executionResults, + }, + ]; stream = await sendMessageStreamWithRetry( conversationId, - [ - { - type: "approval", - approvals: executionResults, - }, - ], + currentInput, { agentId, streamTokens: true, background: true }, socket, runtime, @@ -2982,4 +3078,5 @@ export const __listenClientTestUtils = { extractInterruptToolReturns, emitInterruptToolReturnMessage, getInterruptApprovalsForEmission, + shouldAttemptPostStopApprovalRecovery, };