refactor(cli): unify turn recovery policy between TUI and headless (#950)
Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
@@ -1,21 +1,31 @@
|
||||
import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents";
|
||||
import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/messages";
|
||||
/**
|
||||
* Approval recovery helpers.
|
||||
*
|
||||
* Pure policy logic lives in `./turn-recovery-policy.ts` and is re-exported
|
||||
* here for backward compatibility. This module keeps only the async/side-effect
|
||||
* helper (`fetchRunErrorDetail`) that requires network access.
|
||||
*/
|
||||
|
||||
import { getClient } from "./client";
|
||||
|
||||
// Error when approval tool call IDs don't match what server expects
|
||||
// Format: "Invalid tool call IDs: Expected [...], got [...]"
|
||||
// This is a specific subtype of desync - server HAS approvals but with different IDs
|
||||
const INVALID_TOOL_CALL_IDS_FRAGMENT = "invalid tool call ids";
|
||||
export type {
|
||||
PendingApprovalInfo,
|
||||
PreStreamConflictKind,
|
||||
PreStreamErrorAction,
|
||||
} from "./turn-recovery-policy";
|
||||
// ── Re-export pure policy helpers (single source of truth) ──────────
|
||||
export {
|
||||
classifyPreStreamConflict,
|
||||
extractConflictDetail,
|
||||
getPreStreamErrorAction,
|
||||
isApprovalPendingError,
|
||||
isConversationBusyError,
|
||||
isInvalidToolCallIdsError,
|
||||
rebuildInputWithFreshDenials,
|
||||
shouldAttemptApprovalRecovery,
|
||||
} from "./turn-recovery-policy";
|
||||
|
||||
// Error when trying to SEND message but server has pending approval waiting.
|
||||
// Use an approval-specific fragment to avoid matching conversation-busy errors,
|
||||
// which may also include "cannot send a new message".
|
||||
const APPROVAL_PENDING_DETAIL_FRAGMENT = "waiting for approval";
|
||||
|
||||
// Error when conversation is busy (another request is being processed)
|
||||
// This is a 409 CONFLICT when trying to send while a run is active
|
||||
const CONVERSATION_BUSY_DETAIL_FRAGMENT =
|
||||
"another request is currently being processed";
|
||||
// ── Async helpers (network side effects — stay here) ────────────────
|
||||
|
||||
type RunErrorMetadata =
|
||||
| {
|
||||
@@ -27,92 +37,6 @@ type RunErrorMetadata =
|
||||
| undefined
|
||||
| null;
|
||||
|
||||
/**
|
||||
* Check if error specifically indicates tool call ID mismatch.
|
||||
* This is a subtype of desync where the server HAS pending approvals,
|
||||
* but they have different IDs than what the client sent.
|
||||
*
|
||||
* Unlike "no tool call is currently awaiting approval" (server has nothing),
|
||||
* this error means we need to FETCH the actual pending approvals to resync.
|
||||
*
|
||||
* Error format:
|
||||
* { detail: "Invalid tool call IDs: Expected ['tc_abc'], got ['tc_xyz']" }
|
||||
*/
|
||||
export function isInvalidToolCallIdsError(detail: unknown): boolean {
|
||||
if (typeof detail !== "string") return false;
|
||||
return detail.toLowerCase().includes(INVALID_TOOL_CALL_IDS_FRAGMENT);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if error indicates there's a pending approval blocking new messages.
|
||||
* This is the CONFLICT error from the backend when trying to send a user message
|
||||
* while the agent is waiting for approval on a tool call.
|
||||
*
|
||||
* Error format:
|
||||
* { detail: "CONFLICT: Cannot send a new message: The agent is waiting for approval..." }
|
||||
*/
|
||||
export function isApprovalPendingError(detail: unknown): boolean {
|
||||
if (typeof detail !== "string") return false;
|
||||
return detail.toLowerCase().includes(APPROVAL_PENDING_DETAIL_FRAGMENT);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if error indicates the conversation is busy (another request is being processed).
|
||||
* This is a 409 CONFLICT when trying to send a message while a run is still active.
|
||||
*
|
||||
* Error format:
|
||||
* { detail: "CONFLICT: Cannot send a new message: Another request is currently being processed..." }
|
||||
*/
|
||||
export function isConversationBusyError(detail: unknown): boolean {
|
||||
if (typeof detail !== "string") return false;
|
||||
return detail.toLowerCase().includes(CONVERSATION_BUSY_DETAIL_FRAGMENT);
|
||||
}
|
||||
|
||||
export type PreStreamConflictKind =
|
||||
| "approval_pending"
|
||||
| "conversation_busy"
|
||||
| null;
|
||||
|
||||
export type PreStreamErrorAction =
|
||||
| "resolve_approval_pending"
|
||||
| "retry_conversation_busy"
|
||||
| "rethrow";
|
||||
|
||||
/**
|
||||
* Classify pre-stream 409 conflict details so callers can route recovery logic.
|
||||
*/
|
||||
export function classifyPreStreamConflict(
|
||||
detail: unknown,
|
||||
): PreStreamConflictKind {
|
||||
if (isApprovalPendingError(detail)) return "approval_pending";
|
||||
if (isConversationBusyError(detail)) return "conversation_busy";
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine pre-stream recovery action for one-shot headless sends.
|
||||
*/
|
||||
export function getPreStreamErrorAction(
|
||||
detail: unknown,
|
||||
conversationBusyRetries: number,
|
||||
maxConversationBusyRetries: number,
|
||||
): PreStreamErrorAction {
|
||||
const kind = classifyPreStreamConflict(detail);
|
||||
|
||||
if (kind === "approval_pending") {
|
||||
return "resolve_approval_pending";
|
||||
}
|
||||
|
||||
if (
|
||||
kind === "conversation_busy" &&
|
||||
conversationBusyRetries < maxConversationBusyRetries
|
||||
) {
|
||||
return "retry_conversation_busy";
|
||||
}
|
||||
|
||||
return "rethrow";
|
||||
}
|
||||
|
||||
export async function fetchRunErrorDetail(
|
||||
runId: string | null | undefined,
|
||||
): Promise<string | null> {
|
||||
@@ -133,70 +57,3 @@ export async function fetchRunErrorDetail(
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract error detail string from a pre-stream APIError's nested body.
|
||||
*
|
||||
* Handles the common SDK error shapes:
|
||||
* - Nested: `e.error.error.detail` → `e.error.error.message`
|
||||
* - Direct: `e.error.detail` → `e.error.message`
|
||||
* - Error: `e.message`
|
||||
*
|
||||
* Checks `detail` first (specific) then `message` (generic) at each level.
|
||||
*/
|
||||
export function extractConflictDetail(error: unknown): string {
|
||||
if (error && typeof error === "object" && "error" in error) {
|
||||
const errObj = (error as Record<string, unknown>).error;
|
||||
if (errObj && typeof errObj === "object") {
|
||||
const outer = errObj as Record<string, unknown>;
|
||||
// Nested: e.error.error.detail → e.error.error.message
|
||||
if (outer.error && typeof outer.error === "object") {
|
||||
const nested = outer.error as Record<string, unknown>;
|
||||
if (typeof nested.detail === "string") return nested.detail;
|
||||
if (typeof nested.message === "string") return nested.message;
|
||||
}
|
||||
// Direct: e.error.detail → e.error.message
|
||||
if (typeof outer.detail === "string") return outer.detail;
|
||||
if (typeof outer.message === "string") return outer.message;
|
||||
}
|
||||
}
|
||||
if (error instanceof Error) return error.message;
|
||||
return "";
|
||||
}
|
||||
|
||||
interface PendingApprovalInfo {
|
||||
toolCallId: string;
|
||||
toolName: string;
|
||||
toolArgs: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Strip stale approval payloads from the message input array and optionally
|
||||
* prepend fresh denial results for the actual pending approvals from the server.
|
||||
*
|
||||
* Used during approval-conflict recovery: the original payload may contain
|
||||
* queued approvals from an interrupt that the backend already rejected. This
|
||||
* replaces them with denials for the real pending approvals.
|
||||
*/
|
||||
export function rebuildInputWithFreshDenials(
|
||||
currentInput: Array<MessageCreate | ApprovalCreate>,
|
||||
serverApprovals: PendingApprovalInfo[],
|
||||
denialReason: string,
|
||||
): Array<MessageCreate | ApprovalCreate> {
|
||||
const stripped = currentInput.filter((item) => item?.type !== "approval");
|
||||
|
||||
if (serverApprovals.length > 0) {
|
||||
const denials: ApprovalCreate = {
|
||||
type: "approval",
|
||||
approvals: serverApprovals.map((a) => ({
|
||||
type: "approval" as const,
|
||||
tool_call_id: a.toolCallId,
|
||||
approve: false,
|
||||
reason: denialReason,
|
||||
})),
|
||||
};
|
||||
return [denials, ...stripped];
|
||||
}
|
||||
|
||||
return stripped;
|
||||
}
|
||||
|
||||
161
src/agent/turn-recovery-policy.ts
Normal file
161
src/agent/turn-recovery-policy.ts
Normal file
@@ -0,0 +1,161 @@
|
||||
/**
|
||||
* Pure, framework-agnostic policy helpers for turn-level recovery.
|
||||
*
|
||||
* Both TUI (App.tsx) and headless (headless.ts) consume these helpers
|
||||
* so that identical conflict inputs always produce the same recovery
|
||||
* action. No network calls, no React, no stream-json output.
|
||||
*/
|
||||
|
||||
import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents";
|
||||
import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/messages";
|
||||
|
||||
// ── Error fragment constants ────────────────────────────────────────
|
||||
|
||||
const INVALID_TOOL_CALL_IDS_FRAGMENT = "invalid tool call ids";
|
||||
const APPROVAL_PENDING_DETAIL_FRAGMENT = "waiting for approval";
|
||||
const CONVERSATION_BUSY_DETAIL_FRAGMENT =
|
||||
"another request is currently being processed";
|
||||
|
||||
// ── Classifiers ─────────────────────────────────────────────────────
|
||||
|
||||
/** Tool call IDs don't match what the server expects. */
|
||||
export function isInvalidToolCallIdsError(detail: unknown): boolean {
|
||||
if (typeof detail !== "string") return false;
|
||||
return detail.toLowerCase().includes(INVALID_TOOL_CALL_IDS_FRAGMENT);
|
||||
}
|
||||
|
||||
/** Backend has a pending approval blocking new messages. */
|
||||
export function isApprovalPendingError(detail: unknown): boolean {
|
||||
if (typeof detail !== "string") return false;
|
||||
return detail.toLowerCase().includes(APPROVAL_PENDING_DETAIL_FRAGMENT);
|
||||
}
|
||||
|
||||
/** Conversation is busy (another request is being processed). */
|
||||
export function isConversationBusyError(detail: unknown): boolean {
|
||||
if (typeof detail !== "string") return false;
|
||||
return detail.toLowerCase().includes(CONVERSATION_BUSY_DETAIL_FRAGMENT);
|
||||
}
|
||||
|
||||
// ── Pre-stream conflict routing ─────────────────────────────────────
|
||||
|
||||
export type PreStreamConflictKind =
|
||||
| "approval_pending"
|
||||
| "conversation_busy"
|
||||
| null;
|
||||
|
||||
export type PreStreamErrorAction =
|
||||
| "resolve_approval_pending"
|
||||
| "retry_conversation_busy"
|
||||
| "rethrow";
|
||||
|
||||
/** Classify a pre-stream 409 conflict detail string. */
|
||||
export function classifyPreStreamConflict(
|
||||
detail: unknown,
|
||||
): PreStreamConflictKind {
|
||||
if (isApprovalPendingError(detail)) return "approval_pending";
|
||||
if (isConversationBusyError(detail)) return "conversation_busy";
|
||||
return null;
|
||||
}
|
||||
|
||||
/** Determine the recovery action for a pre-stream 409 error. */
|
||||
export function getPreStreamErrorAction(
|
||||
detail: unknown,
|
||||
conversationBusyRetries: number,
|
||||
maxConversationBusyRetries: number,
|
||||
): PreStreamErrorAction {
|
||||
const kind = classifyPreStreamConflict(detail);
|
||||
|
||||
if (kind === "approval_pending") {
|
||||
return "resolve_approval_pending";
|
||||
}
|
||||
|
||||
if (
|
||||
kind === "conversation_busy" &&
|
||||
conversationBusyRetries < maxConversationBusyRetries
|
||||
) {
|
||||
return "retry_conversation_busy";
|
||||
}
|
||||
|
||||
return "rethrow";
|
||||
}
|
||||
|
||||
// ── Error text extraction ───────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Extract error detail string from a pre-stream APIError's nested body.
|
||||
*
|
||||
* Handles the common SDK error shapes:
|
||||
* - Nested: `e.error.error.detail` → `e.error.error.message`
|
||||
* - Direct: `e.error.detail` → `e.error.message`
|
||||
* - Error: `e.message`
|
||||
*
|
||||
* Checks `detail` first (specific) then `message` (generic) at each level.
|
||||
*/
|
||||
export function extractConflictDetail(error: unknown): string {
|
||||
if (error && typeof error === "object" && "error" in error) {
|
||||
const errObj = (error as Record<string, unknown>).error;
|
||||
if (errObj && typeof errObj === "object") {
|
||||
const outer = errObj as Record<string, unknown>;
|
||||
// Nested: e.error.error.detail → e.error.error.message
|
||||
if (outer.error && typeof outer.error === "object") {
|
||||
const nested = outer.error as Record<string, unknown>;
|
||||
if (typeof nested.detail === "string") return nested.detail;
|
||||
if (typeof nested.message === "string") return nested.message;
|
||||
}
|
||||
// Direct: e.error.detail → e.error.message
|
||||
if (typeof outer.detail === "string") return outer.detail;
|
||||
if (typeof outer.message === "string") return outer.message;
|
||||
}
|
||||
}
|
||||
if (error instanceof Error) return error.message;
|
||||
return "";
|
||||
}
|
||||
|
||||
// ── Approval payload rebuild ────────────────────────────────────────
|
||||
|
||||
export interface PendingApprovalInfo {
|
||||
toolCallId: string;
|
||||
toolName: string;
|
||||
toolArgs: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Strip stale approval payloads from the message input array and optionally
|
||||
* prepend fresh denial results for the actual pending approvals from the server.
|
||||
*/
|
||||
export function rebuildInputWithFreshDenials(
|
||||
currentInput: Array<MessageCreate | ApprovalCreate>,
|
||||
serverApprovals: PendingApprovalInfo[],
|
||||
denialReason: string,
|
||||
): Array<MessageCreate | ApprovalCreate> {
|
||||
const stripped = currentInput.filter((item) => item?.type !== "approval");
|
||||
|
||||
if (serverApprovals.length > 0) {
|
||||
const denials: ApprovalCreate = {
|
||||
type: "approval",
|
||||
approvals: serverApprovals.map((a) => ({
|
||||
type: "approval" as const,
|
||||
tool_call_id: a.toolCallId,
|
||||
approve: false,
|
||||
reason: denialReason,
|
||||
})),
|
||||
};
|
||||
return [denials, ...stripped];
|
||||
}
|
||||
|
||||
return stripped;
|
||||
}
|
||||
|
||||
// ── Retry gating ────────────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Decide whether an approval-pending recovery attempt should proceed.
|
||||
* Centralizes the retry-budget check used by both TUI and headless.
|
||||
*/
|
||||
export function shouldAttemptApprovalRecovery(opts: {
|
||||
approvalPendingDetected: boolean;
|
||||
retries: number;
|
||||
maxRetries: number;
|
||||
}): boolean {
|
||||
return opts.approvalPendingDetected && opts.retries < opts.maxRetries;
|
||||
}
|
||||
@@ -35,6 +35,7 @@ import {
|
||||
isApprovalPendingError,
|
||||
isInvalidToolCallIdsError,
|
||||
rebuildInputWithFreshDenials,
|
||||
shouldAttemptApprovalRecovery,
|
||||
} from "../agent/approval-recovery";
|
||||
import { prefetchAvailableModelHandles } from "../agent/available-models";
|
||||
import { getResumeData } from "../agent/check-approval";
|
||||
@@ -3157,8 +3158,12 @@ export default function App({
|
||||
// Shares llmApiErrorRetriesRef budget with LLM transient-error retries (max 3 per turn).
|
||||
// Resets on each processConversation entry and on success.
|
||||
if (
|
||||
preStreamAction === "resolve_approval_pending" &&
|
||||
llmApiErrorRetriesRef.current < LLM_API_ERROR_MAX_RETRIES
|
||||
shouldAttemptApprovalRecovery({
|
||||
approvalPendingDetected:
|
||||
preStreamAction === "resolve_approval_pending",
|
||||
retries: llmApiErrorRetriesRef.current,
|
||||
maxRetries: LLM_API_ERROR_MAX_RETRIES,
|
||||
})
|
||||
) {
|
||||
llmApiErrorRetriesRef.current += 1;
|
||||
try {
|
||||
@@ -4315,8 +4320,11 @@ export default function App({
|
||||
isApprovalPendingError(latestErrorText);
|
||||
|
||||
if (
|
||||
approvalPendingDetected &&
|
||||
llmApiErrorRetriesRef.current < LLM_API_ERROR_MAX_RETRIES
|
||||
shouldAttemptApprovalRecovery({
|
||||
approvalPendingDetected,
|
||||
retries: llmApiErrorRetriesRef.current,
|
||||
maxRetries: LLM_API_ERROR_MAX_RETRIES,
|
||||
})
|
||||
) {
|
||||
llmApiErrorRetriesRef.current += 1;
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/mes
|
||||
import type { StopReasonType } from "@letta-ai/letta-client/resources/runs/runs";
|
||||
import type { ApprovalResult } from "./agent/approval-execution";
|
||||
import {
|
||||
extractConflictDetail,
|
||||
fetchRunErrorDetail,
|
||||
getPreStreamErrorAction,
|
||||
isApprovalPendingError,
|
||||
@@ -1188,30 +1189,8 @@ ${SYSTEM_REMINDER_CLOSE}
|
||||
agentId: agent.id,
|
||||
});
|
||||
} catch (preStreamError) {
|
||||
// Extract error detail from APIError
|
||||
let errorDetail = "";
|
||||
if (
|
||||
preStreamError instanceof APIError &&
|
||||
preStreamError.error &&
|
||||
typeof preStreamError.error === "object"
|
||||
) {
|
||||
const errObj = preStreamError.error as Record<string, unknown>;
|
||||
if (
|
||||
errObj.error &&
|
||||
typeof errObj.error === "object" &&
|
||||
"detail" in errObj.error
|
||||
) {
|
||||
const nested = errObj.error as Record<string, unknown>;
|
||||
errorDetail =
|
||||
typeof nested.detail === "string" ? nested.detail : "";
|
||||
}
|
||||
if (!errorDetail && typeof errObj.detail === "string") {
|
||||
errorDetail = errObj.detail;
|
||||
}
|
||||
}
|
||||
if (!errorDetail && preStreamError instanceof Error) {
|
||||
errorDetail = preStreamError.message;
|
||||
}
|
||||
// Extract error detail using shared helper (handles nested/direct/message shapes)
|
||||
const errorDetail = extractConflictDetail(preStreamError);
|
||||
|
||||
const preStreamAction = getPreStreamErrorAction(
|
||||
errorDetail,
|
||||
@@ -2444,31 +2423,14 @@ async function runBidirectionalMode(
|
||||
agentId: agent.id,
|
||||
});
|
||||
} catch (preStreamError) {
|
||||
let errorDetail = "";
|
||||
if (
|
||||
preStreamError instanceof APIError &&
|
||||
preStreamError.error &&
|
||||
typeof preStreamError.error === "object"
|
||||
) {
|
||||
const errObj = preStreamError.error as Record<string, unknown>;
|
||||
if (
|
||||
errObj.error &&
|
||||
typeof errObj.error === "object" &&
|
||||
"detail" in errObj.error
|
||||
) {
|
||||
const nested = errObj.error as Record<string, unknown>;
|
||||
errorDetail =
|
||||
typeof nested.detail === "string" ? nested.detail : "";
|
||||
}
|
||||
if (!errorDetail && typeof errObj.detail === "string") {
|
||||
errorDetail = errObj.detail;
|
||||
}
|
||||
}
|
||||
if (!errorDetail && preStreamError instanceof Error) {
|
||||
errorDetail = preStreamError.message;
|
||||
}
|
||||
// Extract error detail using shared helper (handles nested/direct/message shapes)
|
||||
const errorDetail = extractConflictDetail(preStreamError);
|
||||
|
||||
if (isApprovalPendingError(errorDetail)) {
|
||||
// Route through shared pre-stream conflict classifier (parity with main loop + TUI)
|
||||
// Bidir mode has no conversation-busy retry budget, so pass 0/0 to disable busy-retry.
|
||||
const preStreamAction = getPreStreamErrorAction(errorDetail, 0, 0);
|
||||
|
||||
if (preStreamAction === "resolve_approval_pending") {
|
||||
const recoveryMsg: RecoveryMessage = {
|
||||
type: "recovery",
|
||||
recovery_type: "approval_pending",
|
||||
|
||||
@@ -21,7 +21,7 @@ describe("approval recovery wiring", () => {
|
||||
|
||||
expect(segment).toContain("extractConflictDetail(preStreamError)");
|
||||
expect(segment).toContain("getPreStreamErrorAction(");
|
||||
expect(segment).toContain('preStreamAction === "resolve_approval_pending"');
|
||||
expect(segment).toContain("shouldAttemptApprovalRecovery(");
|
||||
expect(segment).toContain("rebuildInputWithFreshDenials(");
|
||||
});
|
||||
|
||||
@@ -39,7 +39,7 @@ describe("approval recovery wiring", () => {
|
||||
|
||||
const segment = source.slice(start, end);
|
||||
|
||||
expect(segment).toContain("approvalPendingDetected &&");
|
||||
expect(segment).toContain("shouldAttemptApprovalRecovery(");
|
||||
expect(segment).not.toContain("!hasApprovalInPayload &&");
|
||||
});
|
||||
|
||||
|
||||
68
src/tests/headless/approval-recovery-wiring.test.ts
Normal file
68
src/tests/headless/approval-recovery-wiring.test.ts
Normal file
@@ -0,0 +1,68 @@
|
||||
import { describe, expect, test } from "bun:test";
|
||||
import { readFileSync } from "node:fs";
|
||||
import { fileURLToPath } from "node:url";
|
||||
|
||||
describe("headless approval recovery wiring", () => {
|
||||
const headlessPath = fileURLToPath(
|
||||
new URL("../../headless.ts", import.meta.url),
|
||||
);
|
||||
const source = readFileSync(headlessPath, "utf-8");
|
||||
|
||||
test("main loop pre-stream catch uses extractConflictDetail (not inline extraction)", () => {
|
||||
// Find the first pre-stream catch block (main headless loop)
|
||||
const start = source.indexOf("} catch (preStreamError) {");
|
||||
expect(start).toBeGreaterThan(-1);
|
||||
|
||||
// Get the catch block up to the next significant landmark
|
||||
const end = source.indexOf(
|
||||
"// Check for pending approval blocking new messages",
|
||||
start,
|
||||
);
|
||||
expect(end).toBeGreaterThan(start);
|
||||
|
||||
const segment = source.slice(start, end);
|
||||
|
||||
// Should use shared extractConflictDetail, NOT inline APIError parsing
|
||||
expect(segment).toContain("extractConflictDetail(preStreamError)");
|
||||
expect(segment).not.toContain("let errorDetail = ");
|
||||
});
|
||||
|
||||
test("bidirectional loop pre-stream catch uses shared extraction and router (not inline)", () => {
|
||||
// Find the second pre-stream catch block (bidirectional mode)
|
||||
const firstCatch = source.indexOf("} catch (preStreamError) {");
|
||||
const secondCatch = source.indexOf(
|
||||
"} catch (preStreamError) {",
|
||||
firstCatch + 1,
|
||||
);
|
||||
expect(secondCatch).toBeGreaterThan(firstCatch);
|
||||
|
||||
// Get segment up to the throw
|
||||
const throwSite = source.indexOf("throw preStreamError;", secondCatch);
|
||||
expect(throwSite).toBeGreaterThan(secondCatch);
|
||||
|
||||
const segment = source.slice(secondCatch, throwSite);
|
||||
|
||||
// Should use shared extractConflictDetail, NOT inline APIError parsing
|
||||
expect(segment).toContain("extractConflictDetail(preStreamError)");
|
||||
expect(segment).not.toContain("let errorDetail = ");
|
||||
// Should use shared router, NOT bespoke isApprovalPendingError check
|
||||
expect(segment).toContain("getPreStreamErrorAction(");
|
||||
expect(segment).toContain('preStreamAction === "resolve_approval_pending"');
|
||||
});
|
||||
|
||||
test("main loop pre-stream uses getPreStreamErrorAction router", () => {
|
||||
const start = source.indexOf("} catch (preStreamError) {");
|
||||
const end = source.indexOf("throw preStreamError;", start);
|
||||
expect(end).toBeGreaterThan(start);
|
||||
|
||||
const segment = source.slice(start, end);
|
||||
expect(segment).toContain("getPreStreamErrorAction(");
|
||||
});
|
||||
|
||||
test("imports extractConflictDetail from approval-recovery", () => {
|
||||
expect(source).toContain("extractConflictDetail");
|
||||
// Verify it's imported, not locally defined
|
||||
const importBlock = source.slice(0, source.indexOf("export "));
|
||||
expect(importBlock).toContain("extractConflictDetail");
|
||||
});
|
||||
});
|
||||
327
src/tests/turn-recovery-policy.test.ts
Normal file
327
src/tests/turn-recovery-policy.test.ts
Normal file
@@ -0,0 +1,327 @@
|
||||
import { describe, expect, test } from "bun:test";
|
||||
import {
|
||||
classifyPreStreamConflict,
|
||||
extractConflictDetail,
|
||||
getPreStreamErrorAction,
|
||||
isApprovalPendingError,
|
||||
isConversationBusyError,
|
||||
isInvalidToolCallIdsError,
|
||||
rebuildInputWithFreshDenials,
|
||||
shouldAttemptApprovalRecovery,
|
||||
} from "../agent/turn-recovery-policy";
|
||||
|
||||
// ── Classifier parity ───────────────────────────────────────────────
|
||||
|
||||
describe("isApprovalPendingError", () => {
|
||||
test("detects real CONFLICT error", () => {
|
||||
expect(
|
||||
isApprovalPendingError(
|
||||
"CONFLICT: Cannot send a new message: The agent is waiting for approval on a tool call.",
|
||||
),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
test("case insensitive", () => {
|
||||
expect(isApprovalPendingError("WAITING FOR APPROVAL")).toBe(true);
|
||||
});
|
||||
|
||||
test("does not match conversation-busy", () => {
|
||||
expect(
|
||||
isApprovalPendingError(
|
||||
"CONFLICT: Another request is currently being processed",
|
||||
),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
test("rejects non-string", () => {
|
||||
expect(isApprovalPendingError(42)).toBe(false);
|
||||
expect(isApprovalPendingError(null)).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("isConversationBusyError", () => {
|
||||
test("detects real busy error", () => {
|
||||
expect(
|
||||
isConversationBusyError(
|
||||
"CONFLICT: Cannot send a new message: Another request is currently being processed for this conversation.",
|
||||
),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
test("rejects approval-pending", () => {
|
||||
expect(isConversationBusyError("The agent is waiting for approval")).toBe(
|
||||
false,
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("isInvalidToolCallIdsError", () => {
|
||||
test("detects ID mismatch", () => {
|
||||
expect(
|
||||
isInvalidToolCallIdsError(
|
||||
"Invalid tool call IDs: Expected ['tc_abc'], got ['tc_xyz']",
|
||||
),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
test("rejects unrelated", () => {
|
||||
expect(isInvalidToolCallIdsError("Connection refused")).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
// ── Pre-stream conflict routing ─────────────────────────────────────
|
||||
|
||||
describe("classifyPreStreamConflict", () => {
|
||||
test("approval pending", () => {
|
||||
expect(
|
||||
classifyPreStreamConflict("waiting for approval on a tool call"),
|
||||
).toBe("approval_pending");
|
||||
});
|
||||
|
||||
test("conversation busy", () => {
|
||||
expect(
|
||||
classifyPreStreamConflict("another request is currently being processed"),
|
||||
).toBe("conversation_busy");
|
||||
});
|
||||
|
||||
test("unknown", () => {
|
||||
expect(classifyPreStreamConflict("Connection refused")).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
describe("getPreStreamErrorAction", () => {
|
||||
test("approval pending → resolve", () => {
|
||||
expect(getPreStreamErrorAction("waiting for approval", 0, 3)).toBe(
|
||||
"resolve_approval_pending",
|
||||
);
|
||||
});
|
||||
|
||||
test("conversation busy with budget → retry", () => {
|
||||
expect(
|
||||
getPreStreamErrorAction(
|
||||
"another request is currently being processed",
|
||||
0,
|
||||
3,
|
||||
),
|
||||
).toBe("retry_conversation_busy");
|
||||
});
|
||||
|
||||
test("conversation busy, budget exhausted → rethrow", () => {
|
||||
expect(
|
||||
getPreStreamErrorAction(
|
||||
"another request is currently being processed",
|
||||
3,
|
||||
3,
|
||||
),
|
||||
).toBe("rethrow");
|
||||
});
|
||||
|
||||
test("unknown error → rethrow", () => {
|
||||
expect(getPreStreamErrorAction("Connection refused", 0, 3)).toBe("rethrow");
|
||||
});
|
||||
|
||||
// Parity: TUI and headless both pass the same (detail, retries, max) triple
|
||||
// to this function — verifying the action is deterministic from those inputs.
|
||||
test("same inputs always produce same action (determinism)", () => {
|
||||
const detail =
|
||||
"CONFLICT: Cannot send a new message: The agent is waiting for approval on a tool call.";
|
||||
const a = getPreStreamErrorAction(detail, 1, 3);
|
||||
const b = getPreStreamErrorAction(detail, 1, 3);
|
||||
expect(a).toBe(b);
|
||||
expect(a).toBe("resolve_approval_pending");
|
||||
});
|
||||
});
|
||||
|
||||
// ── Error text extraction ───────────────────────────────────────────
|
||||
|
||||
describe("extractConflictDetail", () => {
|
||||
test("nested: e.error.error.detail", () => {
|
||||
const err = {
|
||||
error: {
|
||||
error: {
|
||||
detail: "CONFLICT: waiting for approval",
|
||||
message: "generic",
|
||||
},
|
||||
},
|
||||
};
|
||||
expect(extractConflictDetail(err)).toBe("CONFLICT: waiting for approval");
|
||||
});
|
||||
|
||||
test("nested: falls back to e.error.error.message", () => {
|
||||
const err = { error: { error: { message: "fallback msg" } } };
|
||||
expect(extractConflictDetail(err)).toBe("fallback msg");
|
||||
});
|
||||
|
||||
test("flat: e.error.detail", () => {
|
||||
const err = {
|
||||
error: { detail: "another request is currently being processed" },
|
||||
};
|
||||
expect(extractConflictDetail(err)).toBe(
|
||||
"another request is currently being processed",
|
||||
);
|
||||
});
|
||||
|
||||
test("flat: e.error.message", () => {
|
||||
const err = { error: { message: "some error" } };
|
||||
expect(extractConflictDetail(err)).toBe("some error");
|
||||
});
|
||||
|
||||
test("Error instance", () => {
|
||||
expect(extractConflictDetail(new Error("boom"))).toBe("boom");
|
||||
});
|
||||
|
||||
test("non-error returns empty string", () => {
|
||||
expect(extractConflictDetail(null)).toBe("");
|
||||
expect(extractConflictDetail(42)).toBe("");
|
||||
expect(extractConflictDetail("string")).toBe("");
|
||||
});
|
||||
|
||||
// Parity: same APIError shape from headless and TUI → same extracted text
|
||||
test("end-to-end: extraction feeds into classifier correctly", () => {
|
||||
const sdkError = {
|
||||
error: {
|
||||
error: {
|
||||
message_type: "error_message",
|
||||
error_type: "internal_error",
|
||||
message: "An unknown error occurred with the LLM streaming request.",
|
||||
detail:
|
||||
"CONFLICT: Cannot send a new message: The agent is waiting for approval on a tool call.",
|
||||
},
|
||||
run_id: "run-abc",
|
||||
},
|
||||
};
|
||||
const detail = extractConflictDetail(sdkError);
|
||||
expect(isApprovalPendingError(detail)).toBe(true);
|
||||
expect(isConversationBusyError(detail)).toBe(false);
|
||||
expect(getPreStreamErrorAction(detail, 0, 3)).toBe(
|
||||
"resolve_approval_pending",
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
// ── Stale approval payload rewrite ──────────────────────────────────
|
||||
|
||||
describe("rebuildInputWithFreshDenials", () => {
|
||||
const userMsg = {
|
||||
type: "message" as const,
|
||||
role: "user" as const,
|
||||
content: "hello",
|
||||
};
|
||||
|
||||
test("strips stale + prepends fresh denials", () => {
|
||||
const input = [
|
||||
{
|
||||
type: "approval" as const,
|
||||
approvals: [
|
||||
{
|
||||
type: "tool" as const,
|
||||
tool_call_id: "stale",
|
||||
tool_return: "Interrupted",
|
||||
status: "error" as const,
|
||||
},
|
||||
],
|
||||
},
|
||||
userMsg,
|
||||
];
|
||||
const result = rebuildInputWithFreshDenials(
|
||||
input,
|
||||
[{ toolCallId: "real", toolName: "Read", toolArgs: "{}" }],
|
||||
"denied",
|
||||
);
|
||||
expect(result).toHaveLength(2);
|
||||
expect(result[0]?.type).toBe("approval");
|
||||
expect(result[1]?.type).toBe("message");
|
||||
});
|
||||
|
||||
test("no server approvals → strips only", () => {
|
||||
const input = [
|
||||
{ type: "approval" as const, approvals: [] as never[] },
|
||||
userMsg,
|
||||
];
|
||||
const result = rebuildInputWithFreshDenials(input, [], "");
|
||||
expect(result).toHaveLength(1);
|
||||
expect(result[0]?.type).toBe("message");
|
||||
});
|
||||
|
||||
test("no stale approvals → prepends fresh", () => {
|
||||
const result = rebuildInputWithFreshDenials(
|
||||
[userMsg],
|
||||
[{ toolCallId: "new", toolName: "Bash", toolArgs: "{}" }],
|
||||
"auto-denied",
|
||||
);
|
||||
expect(result).toHaveLength(2);
|
||||
expect(result[0]?.type).toBe("approval");
|
||||
expect(result[1]?.type).toBe("message");
|
||||
});
|
||||
});
|
||||
|
||||
// ── Retry gating ────────────────────────────────────────────────────
|
||||
|
||||
describe("shouldAttemptApprovalRecovery", () => {
|
||||
test("true when detected and under budget", () => {
|
||||
expect(
|
||||
shouldAttemptApprovalRecovery({
|
||||
approvalPendingDetected: true,
|
||||
retries: 0,
|
||||
maxRetries: 3,
|
||||
}),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
test("true at boundary (retries < max)", () => {
|
||||
expect(
|
||||
shouldAttemptApprovalRecovery({
|
||||
approvalPendingDetected: true,
|
||||
retries: 2,
|
||||
maxRetries: 3,
|
||||
}),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
test("false when budget exhausted (retries === max)", () => {
|
||||
expect(
|
||||
shouldAttemptApprovalRecovery({
|
||||
approvalPendingDetected: true,
|
||||
retries: 3,
|
||||
maxRetries: 3,
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
test("false when over budget", () => {
|
||||
expect(
|
||||
shouldAttemptApprovalRecovery({
|
||||
approvalPendingDetected: true,
|
||||
retries: 5,
|
||||
maxRetries: 3,
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
test("false when not detected", () => {
|
||||
expect(
|
||||
shouldAttemptApprovalRecovery({
|
||||
approvalPendingDetected: false,
|
||||
retries: 0,
|
||||
maxRetries: 3,
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
// Parity: TUI uses llmApiErrorRetriesRef.current < LLM_API_ERROR_MAX_RETRIES
|
||||
// headless uses llmApiErrorRetries < LLM_API_ERROR_MAX_RETRIES
|
||||
// Both should produce the same result for the same inputs.
|
||||
test("parity: same inputs → same decision regardless of caller", () => {
|
||||
const tuiResult = shouldAttemptApprovalRecovery({
|
||||
approvalPendingDetected: true,
|
||||
retries: 1,
|
||||
maxRetries: 3,
|
||||
});
|
||||
const headlessResult = shouldAttemptApprovalRecovery({
|
||||
approvalPendingDetected: true,
|
||||
retries: 1,
|
||||
maxRetries: 3,
|
||||
});
|
||||
expect(tuiResult).toBe(headlessResult);
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user