fix(listen): recover stale approval conflicts after stop-reason errors (#1285)
This commit is contained in:
@@ -585,3 +585,58 @@ describe("listen-client emitToWS adapter", () => {
|
||||
expect(runtime.sessionId.length).toBeGreaterThan(10);
|
||||
});
|
||||
});
|
||||
|
||||
describe("listen-client post-stop approval recovery policy", () => {
  // Table-driven scenarios: each row is one input handed to the policy helper
  // together with the decision it is expected to return.
  const scenarios = [
    {
      title: "retries when run detail indicates invalid tool call IDs",
      input: {
        stopReason: "error",
        runIdsSeen: 1,
        retries: 0,
        runErrorDetail:
          "Invalid tool call IDs: expected [toolu_abc], got [toolu_def]",
        latestErrorText: null,
      },
      expected: true,
    },
    {
      title: "retries when run detail indicates approval pending",
      input: {
        stopReason: "error",
        runIdsSeen: 1,
        retries: 0,
        runErrorDetail: "Conversation is waiting for approval",
        latestErrorText: null,
      },
      expected: true,
    },
    {
      // No run id was ever observed and no error text is available.
      title: "retries on generic no-run error heuristic",
      input: {
        stopReason: "error",
        runIdsSeen: 0,
        retries: 0,
        runErrorDetail: null,
        latestErrorText: null,
      },
      expected: true,
    },
    {
      // Same as the generic case, but two retries have already been used.
      title: "does not retry once retry budget is exhausted",
      input: {
        stopReason: "error",
        runIdsSeen: 0,
        retries: 2,
        runErrorDetail: null,
        latestErrorText: null,
      },
      expected: false,
    },
  ];

  for (const { title, input, expected } of scenarios) {
    test(title, () => {
      const decision =
        __listenClientTestUtils.shouldAttemptPostStopApprovalRecovery(input);

      expect(decision).toBe(expected);
    });
  }
});
|
||||
|
||||
@@ -24,7 +24,11 @@ import { getStreamToolContextId, sendMessageStream } from "../agent/message";
|
||||
import {
|
||||
extractConflictDetail,
|
||||
getPreStreamErrorAction,
|
||||
isApprovalPendingError,
|
||||
isInvalidToolCallIdsError,
|
||||
parseRetryAfterHeaderMs,
|
||||
rebuildInputWithFreshDenials,
|
||||
shouldAttemptApprovalRecovery,
|
||||
} from "../agent/turn-recovery-policy";
|
||||
import { createBuffers } from "../cli/helpers/accumulator";
|
||||
import { classifyApprovals } from "../cli/helpers/approvalClassification";
|
||||
@@ -864,6 +868,37 @@ function emitToWS(socket: WebSocket, event: WsProtocolEvent): void {
|
||||
|
||||
// NOTE(review): presumably the retry cap for LLM API errors; its usage is not
// visible in this excerpt — confirm against the retry loop that reads it.
const LLM_API_ERROR_MAX_RETRIES = 3;
|
||||
// NOTE(review): presumably the cap on pre-stream recovery attempts; its usage
// is not visible in this excerpt — confirm.
const MAX_PRE_STREAM_RECOVERY = 2;
|
||||
// Passed as `maxRetries` to shouldAttemptApprovalRecovery() by
// shouldAttemptPostStopApprovalRecovery().
const MAX_POST_STOP_APPROVAL_RECOVERY = 2;
|
||||
|
||||
/**
 * Decides whether a stream that has already stopped should be retried through
 * the approval-recovery path.
 *
 * A conflict is suspected when any of the following holds:
 *  - `isInvalidToolCallIdsError` matches the run error detail or the latest
 *    in-stream error text;
 *  - `isApprovalPendingError` matches either of those same texts;
 *  - the stop reason is exactly "error" and no run id was seen at all.
 *
 * The final decision, including the retry budget check against
 * `MAX_POST_STOP_APPROVAL_RECOVERY`, is delegated to
 * `shouldAttemptApprovalRecovery`.
 *
 * @param params.stopReason Stop reason reported for the finished stream.
 * @param params.runIdsSeen Number of run ids observed for the current message.
 * @param params.retries Post-stop recovery attempts already made.
 * @param params.runErrorDetail Error detail fetched for the run, if any.
 * @param params.latestErrorText Most recent in-stream error message, if any.
 * @returns Whatever `shouldAttemptApprovalRecovery` returns for these inputs.
 */
function shouldAttemptPostStopApprovalRecovery(params: {
  stopReason: string | null | undefined;
  runIdsSeen: number;
  retries: number;
  runErrorDetail: string | null;
  latestErrorText: string | null;
}): boolean {
  const { stopReason, runIdsSeen, retries, runErrorDetail, latestErrorText } =
    params;

  // Run detail is checked before the in-stream text, matching the order in
  // which the two sources are consulted for each detector.
  const errorTexts = [runErrorDetail, latestErrorText];

  const sawInvalidToolCallIds = errorTexts.some((text) =>
    isInvalidToolCallIdsError(text),
  );
  const sawApprovalPending = errorTexts.some((text) =>
    isApprovalPendingError(text),
  );

  // Heuristic fallback: a bare "error" stop before any run_id was emitted is
  // frequently a stale approval conflict after reconnect/interrupt.
  const stoppedBeforeAnyRun = stopReason === "error" && runIdsSeen === 0;

  const conflictSuspected =
    sawInvalidToolCallIds || sawApprovalPending || stoppedBeforeAnyRun;

  return shouldAttemptApprovalRecovery({
    approvalPendingDetected: conflictSuspected,
    retries,
    maxRetries: MAX_POST_STOP_APPROVAL_RECOVERY,
  });
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Interrupt queue helpers — extracted for testability.
|
||||
@@ -2248,6 +2283,7 @@ async function handleIncomingMessage(
|
||||
const msgStartTime = performance.now();
|
||||
let msgTurnCount = 0;
|
||||
const msgRunIds: string[] = [];
|
||||
let postStopApprovalRecoveryRetries = 0;
|
||||
|
||||
// Track last approval-loop state for cancel-time queueing (Phase 1.2).
|
||||
// Hoisted before try so the cancel catch block can access them.
|
||||
@@ -2374,9 +2410,11 @@ async function handleIncomingMessage(
|
||||
);
|
||||
}
|
||||
|
||||
let currentInput = messagesToSend;
|
||||
|
||||
let stream = await sendMessageStreamWithRetry(
|
||||
conversationId,
|
||||
messagesToSend,
|
||||
currentInput,
|
||||
{ agentId, streamTokens: true, background: true },
|
||||
socket,
|
||||
runtime,
|
||||
@@ -2395,6 +2433,7 @@ async function handleIncomingMessage(
|
||||
while (true) {
|
||||
msgTurnCount++;
|
||||
runIdSent = false;
|
||||
let latestErrorText: string | null = null;
|
||||
const result = await drainStreamWithResume(
|
||||
stream as Stream<LettaStreamingResponse>,
|
||||
buffers,
|
||||
@@ -2421,6 +2460,7 @@ async function handleIncomingMessage(
|
||||
|
||||
// Emit in-stream errors
|
||||
if (errorInfo) {
|
||||
latestErrorText = errorInfo.message || latestErrorText;
|
||||
emitToWS(socket, {
|
||||
type: "error",
|
||||
message: errorInfo.message || "Stream error",
|
||||
@@ -2517,6 +2557,63 @@ async function handleIncomingMessage(
|
||||
|
||||
// Case 3: Error (or cancel-induced error)
|
||||
if (stopReason !== "requires_approval") {
|
||||
const errorDetail = await fetchRunErrorDetail(
|
||||
runId || runtime.activeRunId || msgRunIds[msgRunIds.length - 1],
|
||||
).catch(() => null);
|
||||
|
||||
if (
|
||||
!runtime.cancelRequested &&
|
||||
shouldAttemptPostStopApprovalRecovery({
|
||||
stopReason,
|
||||
runIdsSeen: msgRunIds.length,
|
||||
retries: postStopApprovalRecoveryRetries,
|
||||
runErrorDetail: errorDetail,
|
||||
latestErrorText,
|
||||
})
|
||||
) {
|
||||
postStopApprovalRecoveryRetries += 1;
|
||||
emitToWS(socket, {
|
||||
type: "recovery",
|
||||
recovery_type: "approval_pending",
|
||||
message:
|
||||
"Recovering from stale approval conflict after interrupted/reconnected turn",
|
||||
run_id: runId || msgRunIds[msgRunIds.length - 1] || undefined,
|
||||
session_id: runtime.sessionId,
|
||||
uuid: `recovery-${crypto.randomUUID()}`,
|
||||
} as RecoveryMessage);
|
||||
|
||||
try {
|
||||
const client = await getClient();
|
||||
const agent = await client.agents.retrieve(agentId || "");
|
||||
const { pendingApprovals: existingApprovals } = await getResumeData(
|
||||
client,
|
||||
agent,
|
||||
requestedConversationId,
|
||||
);
|
||||
currentInput = rebuildInputWithFreshDenials(
|
||||
currentInput,
|
||||
existingApprovals ?? [],
|
||||
"Auto-denied: stale approval from interrupted session",
|
||||
);
|
||||
} catch {
|
||||
// Fetch failed — strip stale approval payload and retry plain message
|
||||
currentInput = rebuildInputWithFreshDenials(currentInput, [], "");
|
||||
}
|
||||
|
||||
stream = await sendMessageStreamWithRetry(
|
||||
conversationId,
|
||||
currentInput,
|
||||
{ agentId, streamTokens: true, background: true },
|
||||
socket,
|
||||
runtime,
|
||||
runtime.activeAbortController.signal,
|
||||
);
|
||||
turnToolContextId = getStreamToolContextId(
|
||||
stream as Stream<LettaStreamingResponse>,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Cancel-induced errors should be treated as cancellation, not error.
|
||||
// This handles the race where cancel fires during stream drain and the
|
||||
// backend returns "error" instead of "cancelled".
|
||||
@@ -2562,8 +2659,6 @@ async function handleIncomingMessage(
|
||||
runtime.isProcessing = false;
|
||||
clearActiveRunState(runtime);
|
||||
|
||||
// Try to fetch richer error detail from the run metadata
|
||||
const errorDetail = await fetchRunErrorDetail(runId).catch(() => null);
|
||||
const errorMessage =
|
||||
errorDetail || `Unexpected stop reason: ${stopReason}`;
|
||||
|
||||
@@ -2808,14 +2903,15 @@ async function handleIncomingMessage(
|
||||
);
|
||||
|
||||
// Create fresh approval stream for next iteration
|
||||
currentInput = [
|
||||
{
|
||||
type: "approval",
|
||||
approvals: executionResults,
|
||||
},
|
||||
];
|
||||
stream = await sendMessageStreamWithRetry(
|
||||
conversationId,
|
||||
[
|
||||
{
|
||||
type: "approval",
|
||||
approvals: executionResults,
|
||||
},
|
||||
],
|
||||
currentInput,
|
||||
{ agentId, streamTokens: true, background: true },
|
||||
socket,
|
||||
runtime,
|
||||
@@ -2982,4 +3078,5 @@ export const __listenClientTestUtils = {
|
||||
extractInterruptToolReturns,
|
||||
emitInterruptToolReturnMessage,
|
||||
getInterruptApprovalsForEmission,
|
||||
shouldAttemptPostStopApprovalRecovery,
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user