fix: resolve CONFLICT error after interrupt during tool execution (#949)
Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
@@ -1,3 +1,5 @@
|
||||
import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents";
|
||||
import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/messages";
|
||||
import { getClient } from "./client";
|
||||
|
||||
// Error when approval tool call IDs don't match what server expects
|
||||
@@ -131,3 +133,70 @@ export async function fetchRunErrorDetail(
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract error detail string from a pre-stream APIError's nested body.
|
||||
*
|
||||
* Handles the common SDK error shapes:
|
||||
* - Nested: `e.error.error.detail` → `e.error.error.message`
|
||||
* - Direct: `e.error.detail` → `e.error.message`
|
||||
* - Error: `e.message`
|
||||
*
|
||||
* Checks `detail` first (specific) then `message` (generic) at each level.
|
||||
*/
|
||||
export function extractConflictDetail(error: unknown): string {
|
||||
if (error && typeof error === "object" && "error" in error) {
|
||||
const errObj = (error as Record<string, unknown>).error;
|
||||
if (errObj && typeof errObj === "object") {
|
||||
const outer = errObj as Record<string, unknown>;
|
||||
// Nested: e.error.error.detail → e.error.error.message
|
||||
if (outer.error && typeof outer.error === "object") {
|
||||
const nested = outer.error as Record<string, unknown>;
|
||||
if (typeof nested.detail === "string") return nested.detail;
|
||||
if (typeof nested.message === "string") return nested.message;
|
||||
}
|
||||
// Direct: e.error.detail → e.error.message
|
||||
if (typeof outer.detail === "string") return outer.detail;
|
||||
if (typeof outer.message === "string") return outer.message;
|
||||
}
|
||||
}
|
||||
if (error instanceof Error) return error.message;
|
||||
return "";
|
||||
}
|
||||
|
||||
interface PendingApprovalInfo {
|
||||
toolCallId: string;
|
||||
toolName: string;
|
||||
toolArgs: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Strip stale approval payloads from the message input array and optionally
|
||||
* prepend fresh denial results for the actual pending approvals from the server.
|
||||
*
|
||||
* Used during approval-conflict recovery: the original payload may contain
|
||||
* queued approvals from an interrupt that the backend already rejected. This
|
||||
* replaces them with denials for the real pending approvals.
|
||||
*/
|
||||
export function rebuildInputWithFreshDenials(
|
||||
currentInput: Array<MessageCreate | ApprovalCreate>,
|
||||
serverApprovals: PendingApprovalInfo[],
|
||||
denialReason: string,
|
||||
): Array<MessageCreate | ApprovalCreate> {
|
||||
const stripped = currentInput.filter((item) => item?.type !== "approval");
|
||||
|
||||
if (serverApprovals.length > 0) {
|
||||
const denials: ApprovalCreate = {
|
||||
type: "approval",
|
||||
approvals: serverApprovals.map((a) => ({
|
||||
type: "approval" as const,
|
||||
tool_call_id: a.toolCallId,
|
||||
approve: false,
|
||||
reason: denialReason,
|
||||
})),
|
||||
};
|
||||
return [denials, ...stripped];
|
||||
}
|
||||
|
||||
return stripped;
|
||||
}
|
||||
|
||||
119
src/cli/App.tsx
119
src/cli/App.tsx
@@ -29,10 +29,12 @@ import {
|
||||
getDisplayableToolReturn,
|
||||
} from "../agent/approval-execution";
|
||||
import {
|
||||
extractConflictDetail,
|
||||
fetchRunErrorDetail,
|
||||
getPreStreamErrorAction,
|
||||
isApprovalPendingError,
|
||||
isConversationBusyError,
|
||||
isInvalidToolCallIdsError,
|
||||
rebuildInputWithFreshDenials,
|
||||
} from "../agent/approval-recovery";
|
||||
import { prefetchAvailableModelHandles } from "../agent/available-models";
|
||||
import { getResumeData } from "../agent/check-approval";
|
||||
@@ -3141,40 +3143,48 @@ export default function App({
|
||||
{ agentId: agentIdRef.current },
|
||||
);
|
||||
} catch (preStreamError) {
|
||||
// Extract error detail from APIError (handles both direct and nested structures)
|
||||
// Direct: e.error.detail | Nested: e.error.error.detail (matches formatErrorDetails)
|
||||
let errorDetail = "";
|
||||
// Extract error detail using shared helper (handles nested/direct/message shapes)
|
||||
const errorDetail = extractConflictDetail(preStreamError);
|
||||
|
||||
// Route through shared pre-stream conflict classifier (parity with headless.ts)
|
||||
const preStreamAction = getPreStreamErrorAction(
|
||||
errorDetail,
|
||||
conversationBusyRetriesRef.current,
|
||||
CONVERSATION_BUSY_MAX_RETRIES,
|
||||
);
|
||||
|
||||
// Resolve stale approval conflict: fetch real pending approvals, auto-deny, retry.
|
||||
// Shares llmApiErrorRetriesRef budget with LLM transient-error retries (max 3 per turn).
|
||||
// Resets on each processConversation entry and on success.
|
||||
if (
|
||||
preStreamError instanceof APIError &&
|
||||
preStreamError.error &&
|
||||
typeof preStreamError.error === "object"
|
||||
preStreamAction === "resolve_approval_pending" &&
|
||||
llmApiErrorRetriesRef.current < LLM_API_ERROR_MAX_RETRIES
|
||||
) {
|
||||
const errObj = preStreamError.error as Record<string, unknown>;
|
||||
// Check nested structure first: e.error.error.detail
|
||||
if (
|
||||
errObj.error &&
|
||||
typeof errObj.error === "object" &&
|
||||
"detail" in errObj.error
|
||||
) {
|
||||
const nested = errObj.error as Record<string, unknown>;
|
||||
errorDetail =
|
||||
typeof nested.detail === "string" ? nested.detail : "";
|
||||
llmApiErrorRetriesRef.current += 1;
|
||||
try {
|
||||
const client = await getClient();
|
||||
const agent = await client.agents.retrieve(agentIdRef.current);
|
||||
const { pendingApprovals: existingApprovals } =
|
||||
await getResumeData(client, agent, conversationIdRef.current);
|
||||
currentInput = rebuildInputWithFreshDenials(
|
||||
currentInput,
|
||||
existingApprovals ?? [],
|
||||
"Auto-denied: stale approval from interrupted session",
|
||||
);
|
||||
} catch {
|
||||
// Fetch failed — strip stale payload and retry plain message
|
||||
currentInput = rebuildInputWithFreshDenials(
|
||||
currentInput,
|
||||
[],
|
||||
"",
|
||||
);
|
||||
}
|
||||
// Fallback to direct structure: e.error.detail
|
||||
if (!errorDetail && typeof errObj.detail === "string") {
|
||||
errorDetail = errObj.detail;
|
||||
}
|
||||
}
|
||||
// Final fallback: use Error.message
|
||||
if (!errorDetail && preStreamError instanceof Error) {
|
||||
errorDetail = preStreamError.message;
|
||||
buffersRef.current.interrupted = false;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check for 409 "conversation busy" error - retry once with delay
|
||||
if (
|
||||
isConversationBusyError(errorDetail) &&
|
||||
conversationBusyRetriesRef.current < CONVERSATION_BUSY_MAX_RETRIES
|
||||
) {
|
||||
// Check for 409 "conversation busy" error - retry with exponential backoff
|
||||
if (preStreamAction === "retry_conversation_busy") {
|
||||
conversationBusyRetriesRef.current += 1;
|
||||
const retryDelayMs =
|
||||
CONVERSATION_BUSY_RETRY_BASE_DELAY_MS *
|
||||
@@ -4296,14 +4306,15 @@ export default function App({
|
||||
}
|
||||
}
|
||||
|
||||
// Check for approval pending error (sent user message while approval waiting)
|
||||
// This is the lazy recovery path for when needsEagerApprovalCheck is false
|
||||
// Check for approval pending error (sent user message while approval waiting).
|
||||
// This is the lazy recovery path: fetch real pending approvals, auto-deny, retry.
|
||||
// Works regardless of hasApprovalInPayload — stale queued approvals from an
|
||||
// interrupt may have been rejected by the backend.
|
||||
const approvalPendingDetected =
|
||||
isApprovalPendingError(detailFromRun) ||
|
||||
isApprovalPendingError(latestErrorText);
|
||||
|
||||
if (
|
||||
!hasApprovalInPayload &&
|
||||
approvalPendingDetected &&
|
||||
llmApiErrorRetriesRef.current < LLM_API_ERROR_MAX_RETRIES
|
||||
) {
|
||||
@@ -4315,28 +4326,14 @@ export default function App({
|
||||
const agent = await client.agents.retrieve(agentIdRef.current);
|
||||
const { pendingApprovals: existingApprovals } =
|
||||
await getResumeData(client, agent, conversationIdRef.current);
|
||||
|
||||
if (existingApprovals && existingApprovals.length > 0) {
|
||||
// Create denial results for all stale approvals
|
||||
// Use the same format as handleCancelApprovals (lines 6390-6395)
|
||||
const denialResults = existingApprovals.map((approval) => ({
|
||||
type: "approval" as const,
|
||||
tool_call_id: approval.toolCallId,
|
||||
approve: false,
|
||||
reason:
|
||||
"Auto-denied: stale approval from interrupted session",
|
||||
}));
|
||||
|
||||
// Prepend approval denials to the current input (keeps user message)
|
||||
const approvalPayload: ApprovalCreate = {
|
||||
type: "approval",
|
||||
approvals: denialResults,
|
||||
};
|
||||
currentInput.unshift(approvalPayload);
|
||||
}
|
||||
// else: No approvals found - server state may have cleared, just retry
|
||||
currentInput = rebuildInputWithFreshDenials(
|
||||
currentInput,
|
||||
existingApprovals ?? [],
|
||||
"Auto-denied: stale approval from interrupted session",
|
||||
);
|
||||
} catch {
|
||||
// If we can't fetch approvals, just retry the original message
|
||||
// Fetch failed — strip stale payload and retry plain message
|
||||
currentInput = rebuildInputWithFreshDenials(currentInput, [], "");
|
||||
}
|
||||
|
||||
// Reset interrupted flag so retry stream chunks are processed
|
||||
@@ -4770,6 +4767,20 @@ export default function App({
|
||||
toolResultsInFlightRef.current = false;
|
||||
refreshDerived();
|
||||
|
||||
// Send cancel request to backend (fire-and-forget).
|
||||
// Without this, the backend stays in requires_approval state after tool interrupt,
|
||||
// causing CONFLICT on the next user message.
|
||||
getClient()
|
||||
.then((client) => {
|
||||
if (conversationIdRef.current === "default") {
|
||||
return client.agents.messages.cancel(agentIdRef.current);
|
||||
}
|
||||
return client.conversations.cancel(conversationIdRef.current);
|
||||
})
|
||||
.catch(() => {
|
||||
// Silently ignore - cancellation already happened client-side
|
||||
});
|
||||
|
||||
// Delay flag reset to ensure React has flushed state updates before dequeue can fire.
|
||||
// Use setTimeout(50) instead of setTimeout(0) - the longer delay ensures React's
|
||||
// batched state updates have been fully processed before we allow the dequeue effect.
|
||||
|
||||
@@ -2,10 +2,12 @@ import { describe, expect, test } from "bun:test";
|
||||
import type { Message } from "@letta-ai/letta-client/resources/agents/messages";
|
||||
import {
|
||||
classifyPreStreamConflict,
|
||||
extractConflictDetail,
|
||||
getPreStreamErrorAction,
|
||||
isApprovalPendingError,
|
||||
isConversationBusyError,
|
||||
isInvalidToolCallIdsError,
|
||||
rebuildInputWithFreshDenials,
|
||||
} from "../agent/approval-recovery";
|
||||
import { extractApprovals } from "../agent/check-approval";
|
||||
|
||||
@@ -350,6 +352,208 @@ describe("extractApprovals", () => {
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
* Tests for extractConflictDetail - shared error-text extraction from APIError shapes.
|
||||
* Replaces duplicated inline extraction in App.tsx and headless.ts.
|
||||
*/
|
||||
describe("extractConflictDetail", () => {
|
||||
test("extracts detail from nested error shape (e.error.error.detail)", () => {
|
||||
// This is the exact error shape from the interrupt→CONFLICT screenshot
|
||||
const error = {
|
||||
error: {
|
||||
error: {
|
||||
message_type: "error_message",
|
||||
error_type: "internal_error",
|
||||
message: "An unknown error occurred with the LLM streaming request.",
|
||||
detail:
|
||||
"CONFLICT: Cannot send a new message: The agent is waiting for approval on a tool call. Please approve or deny the pending request before continuing.",
|
||||
},
|
||||
run_id: "run-d056a979-dd6e-481c-86c1-9ca1da8f618c",
|
||||
},
|
||||
};
|
||||
const detail = extractConflictDetail(error);
|
||||
expect(detail).toBe(
|
||||
"CONFLICT: Cannot send a new message: The agent is waiting for approval on a tool call. Please approve or deny the pending request before continuing.",
|
||||
);
|
||||
// Extracted detail should route correctly through the classifier
|
||||
expect(isApprovalPendingError(detail)).toBe(true);
|
||||
});
|
||||
|
||||
test("falls back to nested message when detail is missing", () => {
|
||||
const error = {
|
||||
error: {
|
||||
error: {
|
||||
message: "An unknown error with approval waiting for approval",
|
||||
},
|
||||
},
|
||||
};
|
||||
expect(extractConflictDetail(error)).toBe(
|
||||
"An unknown error with approval waiting for approval",
|
||||
);
|
||||
});
|
||||
|
||||
test("extracts from flat shape (e.error.detail)", () => {
|
||||
const error = {
|
||||
error: {
|
||||
detail:
|
||||
"CONFLICT: Cannot send a new message: Another request is currently being processed for this conversation.",
|
||||
},
|
||||
};
|
||||
const detail = extractConflictDetail(error);
|
||||
expect(isConversationBusyError(detail)).toBe(true);
|
||||
});
|
||||
|
||||
test("extracts from flat shape (e.error.message) when detail is missing", () => {
|
||||
const error = {
|
||||
error: { message: "Agent is waiting for approval on tool call" },
|
||||
};
|
||||
const detail = extractConflictDetail(error);
|
||||
expect(detail).toBe("Agent is waiting for approval on tool call");
|
||||
});
|
||||
|
||||
test("falls back to Error.message", () => {
|
||||
const error = new Error("Connection refused");
|
||||
expect(extractConflictDetail(error)).toBe("Connection refused");
|
||||
});
|
||||
|
||||
test("returns empty string for non-error input", () => {
|
||||
expect(extractConflictDetail(null)).toBe("");
|
||||
expect(extractConflictDetail("string")).toBe("");
|
||||
expect(extractConflictDetail(42)).toBe("");
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
* Tests for rebuildInputWithFreshDenials - strips stale approval payloads
|
||||
* from the currentInput array and optionally prepends fresh denial results.
|
||||
*/
|
||||
describe("rebuildInputWithFreshDenials", () => {
|
||||
test("strips stale approval and prepends fresh denials", () => {
|
||||
const input = [
|
||||
{
|
||||
type: "approval" as const,
|
||||
approvals: [
|
||||
{
|
||||
type: "tool" as const,
|
||||
tool_call_id: "stale-id",
|
||||
tool_return: "Interrupted by user",
|
||||
status: "error" as const,
|
||||
},
|
||||
],
|
||||
},
|
||||
{ type: "message" as const, role: "user" as const, content: "hello" },
|
||||
];
|
||||
const serverApprovals = [
|
||||
{ toolCallId: "real-id", toolName: "Read", toolArgs: "{}" },
|
||||
];
|
||||
|
||||
const result = rebuildInputWithFreshDenials(
|
||||
input,
|
||||
serverApprovals,
|
||||
"Auto-denied",
|
||||
);
|
||||
|
||||
// Stale approval stripped, fresh denial prepended, message preserved
|
||||
expect(result).toHaveLength(2);
|
||||
expect(result[0]!.type).toBe("approval");
|
||||
const approvalPayload = result[0] as {
|
||||
type: "approval";
|
||||
approvals: Array<{
|
||||
tool_call_id: string;
|
||||
approve: boolean;
|
||||
reason: string;
|
||||
}>;
|
||||
};
|
||||
expect(approvalPayload.approvals[0]!.tool_call_id).toBe("real-id");
|
||||
expect(approvalPayload.approvals[0]!.approve).toBe(false);
|
||||
expect(approvalPayload.approvals[0]!.reason).toBe("Auto-denied");
|
||||
expect(result[1]!.type).toBe("message");
|
||||
});
|
||||
|
||||
test("strips stale approval with no server approvals (clean retry)", () => {
|
||||
const input = [
|
||||
{ type: "approval" as const, approvals: [] },
|
||||
{ type: "message" as const, role: "user" as const, content: "hello" },
|
||||
];
|
||||
|
||||
const result = rebuildInputWithFreshDenials(input, [], "");
|
||||
|
||||
// Only message remains
|
||||
expect(result).toHaveLength(1);
|
||||
expect(result[0]!.type).toBe("message");
|
||||
});
|
||||
|
||||
test("prepends fresh denials when no stale approvals to strip", () => {
|
||||
const input = [
|
||||
{ type: "message" as const, role: "user" as const, content: "hello" },
|
||||
];
|
||||
const serverApprovals = [
|
||||
{ toolCallId: "real-id", toolName: "Read", toolArgs: "{}" },
|
||||
];
|
||||
|
||||
const result = rebuildInputWithFreshDenials(
|
||||
input,
|
||||
serverApprovals,
|
||||
"Auto-denied",
|
||||
);
|
||||
|
||||
// Fresh denial prepended, message preserved
|
||||
expect(result).toHaveLength(2);
|
||||
expect(result[0]!.type).toBe("approval");
|
||||
expect(result[1]!.type).toBe("message");
|
||||
});
|
||||
|
||||
test("handles multiple stale approvals and multiple server approvals", () => {
|
||||
const input = [
|
||||
{
|
||||
type: "approval" as const,
|
||||
approvals: [
|
||||
{
|
||||
type: "approval" as const,
|
||||
tool_call_id: "old-1",
|
||||
approve: false,
|
||||
reason: "stale",
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
type: "approval" as const,
|
||||
approvals: [
|
||||
{
|
||||
type: "approval" as const,
|
||||
tool_call_id: "old-2",
|
||||
approve: false,
|
||||
reason: "stale",
|
||||
},
|
||||
],
|
||||
},
|
||||
{ type: "message" as const, role: "user" as const, content: "hello" },
|
||||
];
|
||||
const serverApprovals = [
|
||||
{ toolCallId: "new-1", toolName: "Bash", toolArgs: "{}" },
|
||||
{ toolCallId: "new-2", toolName: "Write", toolArgs: "{}" },
|
||||
];
|
||||
|
||||
const result = rebuildInputWithFreshDenials(
|
||||
input,
|
||||
serverApprovals,
|
||||
"denied",
|
||||
);
|
||||
|
||||
// Both stale approvals stripped, one fresh denial payload prepended, message kept
|
||||
expect(result).toHaveLength(2);
|
||||
expect(result[0]!.type).toBe("approval");
|
||||
const approvalPayload = result[0] as {
|
||||
type: "approval";
|
||||
approvals: Array<{ tool_call_id: string }>;
|
||||
};
|
||||
expect(approvalPayload.approvals).toHaveLength(2);
|
||||
expect(approvalPayload.approvals[0]!.tool_call_id).toBe("new-1");
|
||||
expect(approvalPayload.approvals[1]!.tool_call_id).toBe("new-2");
|
||||
expect(result[1]!.type).toBe("message");
|
||||
});
|
||||
});
|
||||
|
||||
/**
|
||||
* Note: Full integration testing of lazy approval recovery requires:
|
||||
* 1. Starting CLI without --yolo
|
||||
|
||||
64
src/tests/cli/approval-recovery-wiring.test.ts
Normal file
64
src/tests/cli/approval-recovery-wiring.test.ts
Normal file
@@ -0,0 +1,64 @@
|
||||
import { describe, expect, test } from "bun:test";
|
||||
import { readFileSync } from "node:fs";
|
||||
import { fileURLToPath } from "node:url";
|
||||
|
||||
describe("approval recovery wiring", () => {
|
||||
test("pre-stream catch uses shared recovery router and stale input rebuild", () => {
|
||||
const appPath = fileURLToPath(
|
||||
new URL("../../cli/App.tsx", import.meta.url),
|
||||
);
|
||||
const source = readFileSync(appPath, "utf-8");
|
||||
|
||||
const start = source.indexOf("} catch (preStreamError) {");
|
||||
const end = source.indexOf(
|
||||
"// Check again after network call - user may have pressed Escape during sendMessageStream",
|
||||
);
|
||||
|
||||
expect(start).toBeGreaterThan(-1);
|
||||
expect(end).toBeGreaterThan(start);
|
||||
|
||||
const segment = source.slice(start, end);
|
||||
|
||||
expect(segment).toContain("extractConflictDetail(preStreamError)");
|
||||
expect(segment).toContain("getPreStreamErrorAction(");
|
||||
expect(segment).toContain('preStreamAction === "resolve_approval_pending"');
|
||||
expect(segment).toContain("rebuildInputWithFreshDenials(");
|
||||
});
|
||||
|
||||
test("lazy recovery is not gated by hasApprovalInPayload", () => {
|
||||
const appPath = fileURLToPath(
|
||||
new URL("../../cli/App.tsx", import.meta.url),
|
||||
);
|
||||
const source = readFileSync(appPath, "utf-8");
|
||||
|
||||
const start = source.indexOf("const approvalPendingDetected =");
|
||||
const end = source.indexOf("// Check if this is a retriable error");
|
||||
|
||||
expect(start).toBeGreaterThan(-1);
|
||||
expect(end).toBeGreaterThan(start);
|
||||
|
||||
const segment = source.slice(start, end);
|
||||
|
||||
expect(segment).toContain("approvalPendingDetected &&");
|
||||
expect(segment).not.toContain("!hasApprovalInPayload &&");
|
||||
});
|
||||
|
||||
test("tool interrupt branch includes backend cancel call before early return", () => {
|
||||
const appPath = fileURLToPath(
|
||||
new URL("../../cli/App.tsx", import.meta.url),
|
||||
);
|
||||
const source = readFileSync(appPath, "utf-8");
|
||||
|
||||
const start = source.indexOf("if (\n isExecutingTool");
|
||||
const end = source.indexOf("if (!streaming || interruptRequested)");
|
||||
|
||||
expect(start).toBeGreaterThan(-1);
|
||||
expect(end).toBeGreaterThan(start);
|
||||
|
||||
const segment = source.slice(start, end);
|
||||
|
||||
expect(segment).toContain("getClient()");
|
||||
expect(segment).toContain("client.agents.messages.cancel");
|
||||
expect(segment).toContain("client.conversations.cancel");
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user