fix: expand chatgpt oauth retry classification (#980)

This commit is contained in:
Charles Packer
2026-02-16 14:36:15 -08:00
committed by GitHub
parent f2f59e4591
commit 5435f44c45
10 changed files with 430 additions and 49 deletions

View File

@@ -12,6 +12,7 @@ export type {
PendingApprovalInfo,
PreStreamConflictKind,
PreStreamErrorAction,
PreStreamErrorOptions,
} from "./turn-recovery-policy";
// ── Re-export pure policy helpers (single source of truth) ──────────
export {
@@ -21,8 +22,13 @@ export {
isApprovalPendingError,
isConversationBusyError,
isInvalidToolCallIdsError,
isNonRetryableProviderErrorDetail,
isRetryableProviderErrorDetail,
parseRetryAfterHeaderMs,
rebuildInputWithFreshDenials,
shouldAttemptApprovalRecovery,
shouldRetryPreStreamTransientError,
shouldRetryRunMetadataError,
} from "./turn-recovery-policy";
// ── Async helpers (network side effects — stay here) ────────────────

View File

@@ -15,6 +15,39 @@ const INVALID_TOOL_CALL_IDS_FRAGMENT = "invalid tool call ids";
const APPROVAL_PENDING_DETAIL_FRAGMENT = "waiting for approval";
const CONVERSATION_BUSY_DETAIL_FRAGMENT =
"another request is currently being processed";
// Substrings identifying transient provider/network failures that are usually
// safe to retry. Matching is case-sensitive on purpose: each entry mirrors the
// exact message format emitted by a backend error path (handle_llm_error() and
// the per-provider streaming clients) — TODO confirm against backend sources.
const RETRYABLE_PROVIDER_DETAIL_PATTERNS = [
  "Anthropic API error",
  "OpenAI API error",
  "Google Vertex API error",
  "ChatGPT API error",
  "ChatGPT server error",
  "Connection error during Anthropic streaming",
  "Connection error during streaming",
  "upstream connect error",
  "connection termination",
  "peer closed connection",
  "incomplete chunked read",
  "Network error",
  "Connection error",
  "Request timed out",
  "overloaded",
  "api_error",
];
// Lowercase substrings identifying permanent auth/validation failures.
// Compared against a lowercased copy of the detail string, so this list is
// effectively case-insensitive.
const NON_RETRYABLE_PROVIDER_DETAIL_PATTERNS = [
  "invalid api key",
  "incorrect api key",
  "authentication error",
  "unauthorized",
  "permission denied",
  "forbidden",
  "invalid_request_error",
  "invalid model",
  "model_not_found",
  "context_length_exceeded",
  "invalid_encrypted_content",
];
// Non-retryable 4xx status codes embedded in a detail string:
// 400-408, 410-428, 430-449, and 451. Two codes are deliberately excluded:
// 409 (conflict — routed through classifyPreStreamConflict instead) and
// 429 (rate limit — retryable). The previous `2\d` alternative matched 429,
// which forced every caller to compensate with an explicit
// RETRYABLE_429_PATTERN guard; `2[0-8]` keeps the classifier honest on its own.
const NON_RETRYABLE_4XX_PATTERN = /Error code:\s*4(0[0-8]|1\d|2[0-8]|3\d|4\d|51)/i;
// Rate-limit (HTTP 429) signals — always considered retryable.
const RETRYABLE_429_PATTERN = /Error code:\s*429|rate limit|too many requests/i;
// ── Classifiers ─────────────────────────────────────────────────────
@@ -36,6 +69,75 @@ export function isConversationBusyError(detail: unknown): boolean {
return detail.toLowerCase().includes(CONVERSATION_BUSY_DETAIL_FRAGMENT);
}
/**
 * Returns true when `detail` is a string containing one of the known
 * transient provider/network failure phrases (usually safe to retry).
 * Matching is case-sensitive, mirroring the backend message formats.
 */
export function isRetryableProviderErrorDetail(detail: unknown): boolean {
  if (typeof detail !== "string") return false;
  for (const fragment of RETRYABLE_PROVIDER_DETAIL_PATTERNS) {
    if (detail.includes(fragment)) return true;
  }
  return false;
}
/**
 * Returns true for non-transient (auth/validation style) provider error
 * details that a retry cannot fix.
 */
export function isNonRetryableProviderErrorDetail(detail: unknown): boolean {
  if (typeof detail !== "string") return false;
  // Explicit non-retryable HTTP 4xx code embedded in the detail text.
  if (NON_RETRYABLE_4XX_PATTERN.test(detail)) return true;
  // Case-insensitive substring match against known permanent-failure phrases.
  const lowered = detail.toLowerCase();
  return NON_RETRYABLE_PROVIDER_DETAIL_PATTERNS.some((phrase) =>
    lowered.includes(phrase),
  );
}
/**
 * Retry decision for the run-metadata fallback classification path.
 * Precedence: a rate-limit (429) signal beats the non-retryable patterns,
 * a non-retryable pattern beats everything else, and an explicit
 * "llm_error" type is otherwise trusted as retryable.
 */
export function shouldRetryRunMetadataError(
  errorType: unknown,
  detail: unknown,
): boolean {
  const isRateLimited =
    typeof detail === "string" && RETRYABLE_429_PATTERN.test(detail);
  // Non-retryable details short-circuit unless the 429 signal overrides them.
  if (!isRateLimited && isNonRetryableProviderErrorDetail(detail)) {
    return false;
  }
  if (errorType === "llm_error") return true;
  return isRateLimited || isRetryableProviderErrorDetail(detail);
}
/**
 * Retry decision for pre-stream send failures (before any chunks are
 * yielded). A known HTTP status takes precedence; otherwise the detail
 * text is classified via the shared pattern helpers.
 */
export function shouldRetryPreStreamTransientError(opts: {
  status: number | undefined;
  detail: unknown;
}): boolean {
  const { status, detail } = opts;
  if (status !== undefined) {
    // 429 (rate limit) and all 5xx are transient; any other 4xx is a
    // client error that retrying cannot fix.
    if (status === 429 || status >= 500) return true;
    if (status >= 400) return false;
  }
  // No decisive status — fall back to classifying the detail string.
  if (typeof detail === "string" && RETRYABLE_429_PATTERN.test(detail)) {
    return true;
  }
  if (isNonRetryableProviderErrorDetail(detail)) return false;
  return isRetryableProviderErrorDetail(detail);
}
/**
 * Parse a Retry-After header value into a delay in milliseconds.
 *
 * Supports both forms allowed by HTTP (RFC 9110 §10.2.3):
 *   - delta-seconds, e.g. "2"                → 2000
 *   - HTTP-date, e.g. "Wed, 21 Oct 2026 ..." → ms until that instant
 *
 * @param retryAfterValue Raw header value, or null/undefined when absent.
 * @returns Delay in ms (never negative; past dates clamp to 0), or null
 *          when the value is missing or unparseable.
 */
export function parseRetryAfterHeaderMs(
  retryAfterValue: string | null | undefined,
): number | null {
  if (!retryAfterValue) return null;
  // Trim first: Number("  ") coerces to 0, which would turn a
  // whitespace-only header into an immediate (0 ms) retry instead of null.
  const value = retryAfterValue.trim();
  if (value === "") return null;
  const seconds = Number(value);
  if (Number.isFinite(seconds) && seconds >= 0) {
    return Math.round(seconds * 1000);
  }
  // Not a non-negative number — try the HTTP-date form.
  const retryAtMs = Date.parse(value);
  if (Number.isNaN(retryAtMs)) return null;
  const delayMs = retryAtMs - Date.now();
  // Dates in the past mean "retry now", not a negative delay.
  return delayMs > 0 ? delayMs : 0;
}
// ── Pre-stream conflict routing ─────────────────────────────────────
export type PreStreamConflictKind =
@@ -46,8 +148,15 @@ export type PreStreamConflictKind =
/** Action the pre-stream error router instructs the caller to take. */
export type PreStreamErrorAction =
  | "resolve_approval_pending"
  | "retry_conversation_busy"
  | "retry_transient"
  | "rethrow";
/** Optional inputs that enable transient-retry classification in the router. */
export interface PreStreamErrorOptions {
  // HTTP status of the failed request, when known (e.g. APIError.status).
  status?: number;
  // Transient retries already consumed from the caller's budget.
  transientRetries?: number;
  // Caller's transient retry budget; unset/0 disables "retry_transient".
  maxTransientRetries?: number;
}
/** Classify a pre-stream 409 conflict detail string. */
export function classifyPreStreamConflict(
detail: unknown,
@@ -62,6 +171,7 @@ export function getPreStreamErrorAction(
detail: unknown,
conversationBusyRetries: number,
maxConversationBusyRetries: number,
opts?: PreStreamErrorOptions,
): PreStreamErrorAction {
const kind = classifyPreStreamConflict(detail);
@@ -76,6 +186,14 @@ export function getPreStreamErrorAction(
return "retry_conversation_busy";
}
if (
opts &&
shouldRetryPreStreamTransientError({ status: opts.status, detail }) &&
(opts.transientRetries ?? 0) < (opts.maxTransientRetries ?? 0)
) {
return "retry_transient";
}
return "rethrow";
}

View File

@@ -34,8 +34,10 @@ import {
getPreStreamErrorAction,
isApprovalPendingError,
isInvalidToolCallIdsError,
parseRetryAfterHeaderMs,
rebuildInputWithFreshDenials,
shouldAttemptApprovalRecovery,
shouldRetryRunMetadataError,
} from "../agent/approval-recovery";
import { prefetchAvailableModelHandles } from "../agent/available-models";
import { getResumeData } from "../agent/check-approval";
@@ -482,29 +484,7 @@ async function isRetriableError(
const errorType = metaError?.error_type ?? metaError?.error?.error_type;
const detail = metaError?.detail ?? metaError?.error?.detail ?? "";
// Don't retry 4xx client errors (validation, auth, malformed requests)
// These are not transient and won't succeed on retry
const is4xxError = /Error code: 4\d{2}/.test(detail);
if (errorType === "llm_error" && !is4xxError) return true;
// Fallback: detect LLM provider errors from detail even if misclassified
// This handles edge cases where streaming errors weren't properly converted to LLMError
// Patterns are derived from handle_llm_error() message formats in the backend
const llmProviderPatterns = [
"Anthropic API error", // anthropic_client.py:759
"OpenAI API error", // openai_client.py:1034
"ChatGPT API error", // chatgpt_oauth_client.py - upstream connect errors
"Google Vertex API error", // google_vertex_client.py:848
"overloaded", // anthropic_client.py:753 - used for LLMProviderOverloaded
"api_error", // Anthropic SDK error type field
"Network error", // Transient network failures during streaming
"Connection error during", // Peer disconnections, incomplete chunked reads (Anthropic, ChatGPT streaming)
];
if (
llmProviderPatterns.some((pattern) => detail.includes(pattern)) &&
!is4xxError
) {
if (shouldRetryRunMetadataError(errorType, detail)) {
return true;
}
@@ -3156,6 +3136,14 @@ export default function App({
errorDetail,
conversationBusyRetriesRef.current,
CONVERSATION_BUSY_MAX_RETRIES,
{
status:
preStreamError instanceof APIError
? preStreamError.status
: undefined,
transientRetries: llmApiErrorRetriesRef.current,
maxTransientRetries: LLM_API_ERROR_MAX_RETRIES,
},
);
// Resolve stale approval conflict: fetch real pending approvals, auto-deny, retry.
@@ -3238,6 +3226,54 @@ export default function App({
// User pressed ESC - fall through to error handling
}
// Retry pre-stream transient errors (429/5xx/network) with shared LLM retry budget
if (preStreamAction === "retry_transient") {
llmApiErrorRetriesRef.current += 1;
const attempt = llmApiErrorRetriesRef.current;
const retryAfterMs =
preStreamError instanceof APIError
? parseRetryAfterHeaderMs(
preStreamError.headers?.get("retry-after"),
)
: null;
const delayMs = retryAfterMs ?? 1000 * 2 ** (attempt - 1);
const statusId = uid("status");
buffersRef.current.byId.set(statusId, {
kind: "status",
id: statusId,
lines: [getRetryStatusMessage(errorDetail)],
});
buffersRef.current.order.push(statusId);
refreshDerived();
let cancelled = false;
const startTime = Date.now();
while (Date.now() - startTime < delayMs) {
if (
abortControllerRef.current?.signal.aborted ||
userCancelledRef.current
) {
cancelled = true;
break;
}
await new Promise((resolve) => setTimeout(resolve, 100));
}
buffersRef.current.byId.delete(statusId);
buffersRef.current.order = buffersRef.current.order.filter(
(id) => id !== statusId,
);
refreshDerived();
if (!cancelled) {
buffersRef.current.interrupted = false;
conversationBusyRetriesRef.current = 0;
continue;
}
// User pressed ESC - fall through to error handling
}
// Reset conversation busy retry counter on non-busy error
conversationBusyRetriesRef.current = 0;

View File

@@ -448,6 +448,22 @@ export function getRetryStatusMessage(
if (errorDetail.includes("Anthropic API is overloaded"))
return "Anthropic API is overloaded, retrying...";
if (
errorDetail.includes("ChatGPT API error") ||
errorDetail.includes("ChatGPT server error") ||
errorDetail.includes("upstream connect error")
) {
return "OpenAI ChatGPT backend connection failed, retrying...";
}
if (
errorDetail.includes("Connection error during streaming") ||
errorDetail.includes("incomplete chunked read") ||
errorDetail.includes("connection termination")
) {
return "OpenAI ChatGPT streaming connection dropped, retrying...";
}
if (errorDetail.includes("OpenAI API error"))
return "OpenAI API error, retrying...";
return DEFAULT_RETRY_MESSAGE;
}

View File

@@ -226,8 +226,9 @@ export async function drainStream(
fallbackError = errorMessage;
}
// Set error stop reason so drainStreamWithResume can try to reconnect
stopReason = "error";
// Preserve a stop reason already parsed from stream chunks (e.g. llm_api_error)
// and only fall back to generic "error" when none is available.
stopReason = streamProcessor.stopReason || "error";
markIncompleteToolsAsCancelled(buffers, true, "stream_error");
queueMicrotask(refresh);
} finally {

View File

@@ -14,6 +14,8 @@ import {
getPreStreamErrorAction,
isApprovalPendingError,
isInvalidToolCallIdsError,
parseRetryAfterHeaderMs,
shouldRetryRunMetadataError,
} from "./agent/approval-recovery";
import { getClient } from "./agent/client";
import { setAgentContext, setConversationId } from "./agent/context";
@@ -1235,6 +1237,14 @@ ${SYSTEM_REMINDER_CLOSE}
errorDetail,
conversationBusyRetries,
CONVERSATION_BUSY_MAX_RETRIES,
{
status:
preStreamError instanceof APIError
? preStreamError.status
: undefined,
transientRetries: llmApiErrorRetries,
maxTransientRetries: LLM_API_ERROR_MAX_RETRIES,
},
);
// Check for pending approval blocking new messages - resolve and retry.
@@ -1290,6 +1300,41 @@ ${SYSTEM_REMINDER_CLOSE}
continue;
}
if (preStreamAction === "retry_transient") {
const attempt = llmApiErrorRetries + 1;
const retryAfterMs =
preStreamError instanceof APIError
? parseRetryAfterHeaderMs(
preStreamError.headers?.get("retry-after"),
)
: null;
const delayMs = retryAfterMs ?? 1000 * 2 ** (attempt - 1);
llmApiErrorRetries = attempt;
if (outputFormat === "stream-json") {
const retryMsg: RetryMessage = {
type: "retry",
reason: "llm_api_error",
attempt,
max_attempts: LLM_API_ERROR_MAX_RETRIES,
delay_ms: delayMs,
session_id: sessionId,
uuid: `retry-pre-stream-${crypto.randomUUID()}`,
};
console.log(JSON.stringify(retryMsg));
} else {
const delaySeconds = Math.round(delayMs / 1000);
console.error(
`Transient API error before streaming (attempt ${attempt} of ${LLM_API_ERROR_MAX_RETRIES}), retrying in ${delaySeconds}s...`,
);
}
await new Promise((resolve) => setTimeout(resolve, delayMs));
conversationBusyRetries = 0;
continue;
}
// Reset conversation busy retry counter on other errors
conversationBusyRetries = 0;
@@ -1696,31 +1741,9 @@ ${SYSTEM_REMINDER_CLOSE}
const errorType =
metaError?.error_type ?? metaError?.error?.error_type;
// Fallback: detect LLM provider errors from detail even if misclassified
// Patterns are derived from handle_llm_error() message formats in the backend
const detail = metaError?.detail ?? metaError?.error?.detail ?? "";
// Don't retry 4xx client errors (validation, auth, malformed requests)
// These are not transient and won't succeed on retry
const is4xxError = /Error code: 4\d{2}/.test(detail);
const llmProviderPatterns = [
"Anthropic API error", // anthropic_client.py:759
"OpenAI API error", // openai_client.py:1034
"Google Vertex API error", // google_vertex_client.py:848
"overloaded", // anthropic_client.py:753 - used for LLMProviderOverloaded
"api_error", // Anthropic SDK error type field
"Network error", // Transient network failures during streaming
"Connection error during Anthropic streaming", // Peer disconnections, incomplete chunked reads
];
const isLlmErrorFromDetail = llmProviderPatterns.some((pattern) =>
detail.includes(pattern),
);
if (
(errorType === "llm_error" || isLlmErrorFromDetail) &&
!is4xxError
) {
if (shouldRetryRunMetadataError(errorType, detail)) {
const attempt = llmApiErrorRetries + 1;
const baseDelayMs = 1000;
const delayMs = baseDelayMs * 2 ** (attempt - 1);
@@ -2397,6 +2420,7 @@ async function runBidirectionalMode(
let numTurns = 0;
let lastStopReason: StopReasonType | null = null; // Track for result subtype
let sawStreamError = false; // Track if we emitted an error during streaming
let preStreamTransientRetries = 0;
// Inject available skills as system-reminder for bidirectional mode (LET-7353)
let enrichedContent = userContent;
@@ -2468,7 +2492,14 @@ async function runBidirectionalMode(
// Route through shared pre-stream conflict classifier (parity with main loop + TUI)
// Bidir mode has no conversation-busy retry budget, so pass 0/0 to disable busy-retry.
const preStreamAction = getPreStreamErrorAction(errorDetail, 0, 0);
const preStreamAction = getPreStreamErrorAction(errorDetail, 0, 0, {
status:
preStreamError instanceof APIError
? preStreamError.status
: undefined,
transientRetries: preStreamTransientRetries,
maxTransientRetries: LLM_API_ERROR_MAX_RETRIES,
});
if (preStreamAction === "resolve_approval_pending") {
const recoveryMsg: RecoveryMessage = {
@@ -2484,8 +2515,35 @@ async function runBidirectionalMode(
continue;
}
if (preStreamAction === "retry_transient") {
const attempt = preStreamTransientRetries + 1;
const retryAfterMs =
preStreamError instanceof APIError
? parseRetryAfterHeaderMs(
preStreamError.headers?.get("retry-after"),
)
: null;
const delayMs = retryAfterMs ?? 1000 * 2 ** (attempt - 1);
preStreamTransientRetries = attempt;
const retryMsg: RetryMessage = {
type: "retry",
reason: "llm_api_error",
attempt,
max_attempts: LLM_API_ERROR_MAX_RETRIES,
delay_ms: delayMs,
session_id: sessionId,
uuid: `retry-bidir-${crypto.randomUUID()}`,
};
console.log(JSON.stringify(retryMsg));
await new Promise((resolve) => setTimeout(resolve, delayMs));
continue;
}
throw preStreamError;
}
preStreamTransientRetries = 0;
const streamJsonHook: DrainStreamHook = ({
chunk,
shouldOutput,

View File

@@ -23,6 +23,7 @@ describe("approval recovery wiring", () => {
expect(segment).toContain("getPreStreamErrorAction(");
expect(segment).toContain("shouldAttemptApprovalRecovery(");
expect(segment).toContain("rebuildInputWithFreshDenials(");
expect(segment).toContain('preStreamAction === "retry_transient"');
});
test("lazy recovery is not gated by hasApprovalInPayload", () => {

View File

@@ -0,0 +1,41 @@
import { describe, expect, test } from "bun:test";
import { readFileSync } from "node:fs";
import { fileURLToPath } from "node:url";
import type { Stream } from "@letta-ai/letta-client/core/streaming";
import type { LettaStreamingResponse } from "@letta-ai/letta-client/resources/agents/messages";
import { createBuffers } from "../../cli/helpers/accumulator";
import { drainStream } from "../../cli/helpers/stream";
describe("drainStream stop reason wiring", () => {
  // Static source check: guards against a regression that would reset
  // stopReason to a bare "error" in drainStream's catch path instead of
  // preserving a stop reason already parsed from stream chunks.
  test("catch path preserves streamProcessor.stopReason before falling back to error", () => {
    const streamPath = fileURLToPath(
      new URL("../../cli/helpers/stream.ts", import.meta.url),
    );
    const source = readFileSync(streamPath, "utf-8");
    expect(source).toContain(
      'stopReason = streamProcessor.stopReason || "error"',
    );
  });
  // Behavioral check: a stream that reports stop_reason "llm_api_error"
  // and then throws mid-iteration must surface "llm_api_error", not the
  // generic "error" fallback.
  test("preserves llm_api_error when stream throws after stop_reason chunk", async () => {
    const fakeStream = {
      controller: new AbortController(),
      // Yields one stop_reason chunk, then simulates a peer disconnect.
      async *[Symbol.asyncIterator]() {
        yield {
          message_type: "stop_reason",
          stop_reason: "llm_api_error",
        } as LettaStreamingResponse;
        throw new Error("peer closed connection");
      },
    } as unknown as Stream<LettaStreamingResponse>;
    const result = await drainStream(
      fakeStream,
      createBuffers("agent-test"),
      () => {},
    );
    expect(result.stopReason).toBe("llm_api_error");
  });
});

View File

@@ -48,6 +48,7 @@ describe("headless approval recovery wiring", () => {
// Should use shared router, NOT bespoke isApprovalPendingError check
expect(segment).toContain("getPreStreamErrorAction(");
expect(segment).toContain('preStreamAction === "resolve_approval_pending"');
expect(segment).toContain('preStreamAction === "retry_transient"');
});
test("main loop pre-stream uses getPreStreamErrorAction router", () => {

View File

@@ -6,8 +6,13 @@ import {
isApprovalPendingError,
isConversationBusyError,
isInvalidToolCallIdsError,
isNonRetryableProviderErrorDetail,
isRetryableProviderErrorDetail,
parseRetryAfterHeaderMs,
rebuildInputWithFreshDenials,
shouldAttemptApprovalRecovery,
shouldRetryPreStreamTransientError,
shouldRetryRunMetadataError,
} from "../agent/turn-recovery-policy";
// ── Classifier parity ───────────────────────────────────────────────
@@ -120,6 +125,30 @@ describe("getPreStreamErrorAction", () => {
expect(getPreStreamErrorAction("Connection refused", 0, 3)).toBe("rethrow");
});
test("transient 5xx with retry budget → retry_transient", () => {
expect(
getPreStreamErrorAction(
"ChatGPT server error: upstream connect error",
0,
1,
{
status: 502,
transientRetries: 0,
maxTransientRetries: 3,
},
),
).toBe("retry_transient");
});
test("transient retry budget exhausted → rethrow", () => {
expect(
getPreStreamErrorAction("Connection error during streaming", 0, 1, {
transientRetries: 3,
maxTransientRetries: 3,
}),
).toBe("rethrow");
});
// Parity: TUI and headless both pass the same (detail, retries, max) triple
// to this function — verifying the action is deterministic from those inputs.
test("same inputs always produce same action (determinism)", () => {
@@ -132,6 +161,80 @@ describe("getPreStreamErrorAction", () => {
});
});
describe("provider detail retry helpers", () => {
  // Transient ChatGPT/network failure phrases must classify as retryable.
  test("detects retryable ChatGPT transient patterns", () => {
    expect(
      isRetryableProviderErrorDetail(
        "ChatGPT server error: upstream connect error or disconnect/reset before headers",
      ),
    ).toBe(true);
    expect(
      isRetryableProviderErrorDetail(
        "Connection error during streaming: incomplete chunked read",
      ),
    ).toBe(true);
  });
  // Auth failures — by phrase or by embedded 4xx code — must not be retried.
  test("detects non-retryable auth patterns", () => {
    expect(
      isNonRetryableProviderErrorDetail("OpenAI API error: invalid API key"),
    ).toBe(true);
    expect(isNonRetryableProviderErrorDetail("Error code: 401")).toBe(true);
  });
  // An explicit llm_error type allows retry, but a non-retryable detail
  // (e.g. invalid_request_error) overrides it.
  test("run metadata retry classification respects llm_error + non-retryable", () => {
    expect(
      shouldRetryRunMetadataError(
        "llm_error",
        "ChatGPT server error: upstream connect error",
      ),
    ).toBe(true);
    expect(
      shouldRetryRunMetadataError(
        "llm_error",
        "OpenAI API error: invalid_request_error",
      ),
    ).toBe(false);
  });
  // HTTP status dominates when present; detail text decides otherwise.
  test("pre-stream transient classifier handles status and detail", () => {
    expect(
      shouldRetryPreStreamTransientError({
        status: 503,
        detail: "server error",
      }),
    ).toBe(true);
    expect(
      shouldRetryPreStreamTransientError({
        status: 429,
        detail: "rate limited",
      }),
    ).toBe(true);
    expect(
      shouldRetryPreStreamTransientError({
        status: 401,
        detail: "unauthorized",
      }),
    ).toBe(false);
    expect(
      shouldRetryPreStreamTransientError({
        status: undefined,
        detail: "Connection error during streaming",
      }),
    ).toBe(true);
  });
});
describe("parseRetryAfterHeaderMs", () => {
  // Delta-seconds form: "2" → 2000 ms.
  test("parses delta seconds", () => {
    expect(parseRetryAfterHeaderMs("2")).toBe(2000);
  });
  // Values that are neither numeric nor an HTTP-date yield null.
  test("returns null for invalid header", () => {
    expect(parseRetryAfterHeaderMs("not-a-date")).toBeNull();
  });
});
// ── Error text extraction ───────────────────────────────────────────
describe("extractConflictDetail", () => {