From 6a0bcdd6839e6ae9d2872d33921db59385b745cb Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Wed, 21 Jan 2026 14:57:48 -0800 Subject: [PATCH] feat: add 409 retry, error improvements, and queue restoration (#618) Co-authored-by: Letta --- src/agent/approval-recovery.ts | 17 +++ src/cli/App.tsx | 198 +++++++++++++++++++++++++----- src/cli/components/InputRich.tsx | 15 +++ src/cli/helpers/errorFormatter.ts | 5 +- src/headless.ts | 82 ++++++++++++- 5 files changed, 281 insertions(+), 36 deletions(-) diff --git a/src/agent/approval-recovery.ts b/src/agent/approval-recovery.ts index f420a96..10681ac 100644 --- a/src/agent/approval-recovery.ts +++ b/src/agent/approval-recovery.ts @@ -10,6 +10,11 @@ const APPROVAL_RECOVERY_DETAIL_FRAGMENT = // This is the CONFLICT error - opposite of desync const APPROVAL_PENDING_DETAIL_FRAGMENT = "cannot send a new message"; +// Error when conversation is busy (another request is being processed) +// This is a 409 CONFLICT when trying to send while a run is active +const CONVERSATION_BUSY_DETAIL_FRAGMENT = + "another request is currently being processed"; + type RunErrorMetadata = | { error_type?: string; @@ -38,6 +43,18 @@ export function isApprovalPendingError(detail: unknown): boolean { return detail.toLowerCase().includes(APPROVAL_PENDING_DETAIL_FRAGMENT); } +/** + * Check if error indicates the conversation is busy (another request is being processed). + * This is a 409 CONFLICT when trying to send a message while a run is still active. + * + * Error format: + * { detail: "CONFLICT: Cannot send a new message: Another request is currently being processed..." } + */ +export function isConversationBusyError(detail: unknown): boolean { + if (typeof detail !== "string") return false; + return detail.toLowerCase().includes(CONVERSATION_BUSY_DETAIL_FRAGMENT); +} + export async function fetchRunErrorDetail( runId: string | null | undefined, ): Promise { diff --git a/src/cli/App.tsx b/src/cli/App.tsx index 91c6f96..286c446 100644 --- a/src/cli/App.tsx +++ b/src/cli/App.tsx @@ -33,6 +33,7 @@ import { fetchRunErrorDetail, isApprovalPendingError, isApprovalStateDesyncError, + isConversationBusyError, } from "../agent/approval-recovery"; import { prefetchAvailableModelHandles } from "../agent/available-models"; import { getResumeData } from "../agent/check-approval"; @@ -192,9 +193,17 @@ const EAGER_CANCEL = true; // Maximum retries for transient LLM API errors (matches headless.ts) const LLM_API_ERROR_MAX_RETRIES = 3; +// Retry config for 409 "conversation busy" errors +const CONVERSATION_BUSY_MAX_RETRIES = 1; // Only retry once, fail on 2nd 409 +const CONVERSATION_BUSY_RETRY_DELAY_MS = 2500; // 2.5 seconds + // Message shown when user interrupts the stream const INTERRUPT_MESSAGE = - "Interrupted – tell the agent what to do differently. Something went wrong? Use /feedback to report the issue."; + "Interrupted – tell the agent what to do differently. Something went wrong? Use /feedback to report issues."; + +// Hint shown after errors to encourage feedback +const ERROR_FEEDBACK_HINT = + "Something went wrong? Use /feedback to report issues."; // tiny helper for unique ids (avoid overwriting prior user lines) function uid(prefix: string) { @@ -982,6 +991,9 @@ export default function App({ // Retry counter for transient LLM API errors (ref for synchronous access in loop) const llmApiErrorRetriesRef = useRef(0); + // Retry counter for 409 "conversation busy" errors + const conversationBusyRetriesRef = useRef(0); + // Message queue state for queueing messages during streaming const [messageQueue, setMessageQueue] = useState([]); @@ -998,6 +1010,13 @@ export default function App({ // Incremented when userCancelledRef is reset while messages are queued const [dequeueEpoch, setDequeueEpoch] = useState(0); + // Track last dequeued message for restoration on error + // If an error occurs after dequeue, we restore this to the input field (if input is empty) + const lastDequeuedMessageRef = useRef(null); + + // Restored input value - set when we need to restore a message to the input after error + const [restoredInput, setRestoredInput] = useState(null); + // Helper to check if agent is busy (streaming, executing tool, or running command) // Uses refs for synchronous access outside React's closure system // biome-ignore lint/correctness/useExhaustiveDependencies: refs are stable objects, .current is read dynamically @@ -1729,9 +1748,10 @@ export default function App({ } processingConversationRef.current += 1; - // Reset retry counter for new conversation turns (fresh budget per user message) + // Reset retry counters for new conversation turns (fresh budget per user message) if (!allowReentry) { llmApiErrorRetriesRef.current = 0; + conversationBusyRetriesRef.current = 0; } // Track last run ID for error reporting (accessible in catch block) @@ -1796,41 +1816,93 @@ export default function App({ { agentId: agentIdRef.current }, ); } catch (preStreamError) { + // Extract error detail from APIError (handles both direct and nested structures) + // Direct: e.error.detail | Nested: e.error.error.detail (matches formatErrorDetails) + let errorDetail = ""; + if ( + preStreamError instanceof APIError && + preStreamError.error && + typeof preStreamError.error === "object" + ) { + const errObj = preStreamError.error as Record; + // Check nested structure first: e.error.error.detail + if ( + errObj.error && + typeof errObj.error === "object" && + "detail" in errObj.error + ) { + const nested = errObj.error as Record; + errorDetail = + typeof nested.detail === "string" ? nested.detail : ""; + } + // Fallback to direct structure: e.error.detail + if (!errorDetail && typeof errObj.detail === "string") { + errorDetail = errObj.detail; + } + } + // Final fallback: use Error.message + if (!errorDetail && preStreamError instanceof Error) { + errorDetail = preStreamError.message; + } + + // Check for 409 "conversation busy" error - retry once with delay + if ( + isConversationBusyError(errorDetail) && + conversationBusyRetriesRef.current < CONVERSATION_BUSY_MAX_RETRIES + ) { + conversationBusyRetriesRef.current += 1; + + // Show status message + const statusId = uid("status"); + buffersRef.current.byId.set(statusId, { + kind: "status", + id: statusId, + lines: ["Conversation is busy, waiting and retrying…"], + }); + buffersRef.current.order.push(statusId); + refreshDerived(); + + // Wait with abort checking (same pattern as LLM API error retry) + let cancelled = false; + const startTime = Date.now(); + while ( + Date.now() - startTime < + CONVERSATION_BUSY_RETRY_DELAY_MS + ) { + if ( + abortControllerRef.current?.signal.aborted || + userCancelledRef.current + ) { + cancelled = true; + break; + } + await new Promise((resolve) => setTimeout(resolve, 100)); + } + + // Remove status message + buffersRef.current.byId.delete(statusId); + buffersRef.current.order = buffersRef.current.order.filter( + (id) => id !== statusId, + ); + refreshDerived(); + + if (!cancelled) { + // Reset interrupted flag so retry stream chunks are processed + buffersRef.current.interrupted = false; + continue; + } + // User pressed ESC - fall through to error handling + } + + // Reset conversation busy retry counter on non-busy error + conversationBusyRetriesRef.current = 0; + // Check if this is a pre-stream approval desync error const hasApprovalInPayload = currentInput.some( (item) => item?.type === "approval", ); if (hasApprovalInPayload) { - // Extract error detail from APIError (handles both direct and nested structures) - // Direct: e.error.detail | Nested: e.error.error.detail (matches formatErrorDetails) - let errorDetail = ""; - if ( - preStreamError instanceof APIError && - preStreamError.error && - typeof preStreamError.error === "object" - ) { - const errObj = preStreamError.error as Record; - // Check nested structure first: e.error.error.detail - if ( - errObj.error && - typeof errObj.error === "object" && - "detail" in errObj.error - ) { - const nested = errObj.error as Record; - errorDetail = - typeof nested.detail === "string" ? nested.detail : ""; - } - // Fallback to direct structure: e.error.detail - if (!errorDetail && typeof errObj.detail === "string") { - errorDetail = errObj.detail; - } - } - // Final fallback: use Error.message - if (!errorDetail && preStreamError instanceof Error) { - errorDetail = preStreamError.message; - } - // If desync detected and retries available, recover with keep-alive prompt if ( isApprovalStateDesyncError(errorDetail) && @@ -2012,6 +2084,8 @@ export default function App({ if (stopReasonToHandle === "end_turn") { setStreaming(false); llmApiErrorRetriesRef.current = 0; // Reset retry counter on success + conversationBusyRetriesRef.current = 0; + lastDequeuedMessageRef.current = null; // Clear - message was processed successfully // Disable eager approval check after first successful message (LET-7101) // Any new approvals from here on are from our own turn, not orphaned @@ -2650,6 +2724,16 @@ export default function App({ lastFailureMessage || `An error occurred during agent execution\n(run_id: ${lastRunId ?? "unknown"}, stop_reason: ${stopReasonToHandle})`; appendError(errorToShow, true); + appendError(ERROR_FEEDBACK_HINT, true); + + // Restore dequeued message to input on error + if (lastDequeuedMessageRef.current) { + setRestoredInput(lastDequeuedMessageRef.current); + lastDequeuedMessageRef.current = null; + } + // Clear any remaining queue on error + setMessageQueue([]); + setStreaming(false); sendDesktopNotification(); refreshDerived(); @@ -2761,8 +2845,9 @@ export default function App({ // User pressed ESC - fall through to error handling } - // Reset retry counter on non-retriable error (or max retries exceeded) + // Reset retry counters on non-retriable error (or max retries exceeded) llmApiErrorRetriesRef.current = 0; + conversationBusyRetriesRef.current = 0; // Mark incomplete tool calls as finished to prevent stuck blinking UI markIncompleteToolsAsCancelled( @@ -2792,6 +2877,16 @@ export default function App({ ? `Stream error: ${fallbackError}\n(run_id: ${lastRunId})` : `Stream error: ${fallbackError}`; appendError(errorMsg, true); // Skip telemetry - already tracked above + appendError(ERROR_FEEDBACK_HINT, true); + + // Restore dequeued message to input on error + if (lastDequeuedMessageRef.current) { + setRestoredInput(lastDequeuedMessageRef.current); + lastDequeuedMessageRef.current = null; + } + // Clear any remaining queue on error + setMessageQueue([]); + setStreaming(false); sendDesktopNotification(); // Notify user of error refreshDerived(); @@ -2824,12 +2919,14 @@ export default function App({ agentIdRef.current, ); appendError(errorDetails, true); // Skip telemetry - already tracked above + appendError(ERROR_FEEDBACK_HINT, true); } else { // No error metadata, show generic error with run info appendError( `An error occurred during agent execution\n(run_id: ${lastRunId}, stop_reason: ${stopReason})`, true, // Skip telemetry - already tracked above ); + appendError(ERROR_FEEDBACK_HINT, true); } } catch (_e) { // If we can't fetch error details, show generic error @@ -2837,6 +2934,19 @@ export default function App({ `An error occurred during agent execution\n(run_id: ${lastRunId}, stop_reason: ${stopReason})\n(Unable to fetch additional error details from server)`, true, // Skip telemetry - already tracked above ); + appendError(ERROR_FEEDBACK_HINT, true); + + // Restore dequeued message to input on error + if (lastDequeuedMessageRef.current) { + setRestoredInput(lastDequeuedMessageRef.current); + lastDequeuedMessageRef.current = null; + } + // Clear any remaining queue on error + setMessageQueue([]); + + setStreaming(false); + sendDesktopNotification(); + refreshDerived(); return; } } else { @@ -2845,8 +2955,17 @@ export default function App({ `An error occurred during agent execution\n(stop_reason: ${stopReason})`, true, // Skip telemetry - already tracked above ); + appendError(ERROR_FEEDBACK_HINT, true); } + // Restore dequeued message to input on error + if (lastDequeuedMessageRef.current) { + setRestoredInput(lastDequeuedMessageRef.current); + lastDequeuedMessageRef.current = null; + } + // Clear any remaining queue on error + setMessageQueue([]); + setStreaming(false); sendDesktopNotification(); // Notify user of error refreshDerived(); @@ -2891,6 +3010,16 @@ export default function App({ // Use comprehensive error formatting const errorDetails = formatErrorDetails(e, agentIdRef.current); appendError(errorDetails, true); // Skip telemetry - already tracked above with more context + appendError(ERROR_FEEDBACK_HINT, true); + + // Restore dequeued message to input on error (Input component will only use if empty) + if (lastDequeuedMessageRef.current) { + setRestoredInput(lastDequeuedMessageRef.current); + lastDequeuedMessageRef.current = null; + } + // Clear any remaining queue on error + setMessageQueue([]); + setStreaming(false); sendDesktopNotification(); // Notify user of error refreshDerived(); @@ -6394,6 +6523,9 @@ DO NOT respond to these messages or otherwise consider them in your response unl "queue", `Dequeuing ${messageQueue.length} message(s): "${concatenatedMessage.slice(0, 50)}${concatenatedMessage.length > 50 ? "..." : ""}"`, ); + + // Store the message before clearing queue - allows restoration on error + lastDequeuedMessageRef.current = concatenatedMessage; setMessageQueue([]); // Submit the concatenated message using the normal submit flow @@ -8097,6 +8229,8 @@ Plan file path: ${planFilePath}`; onRalphExit={handleRalphExit} conversationId={conversationId} onPasteError={handlePasteError} + restoredInput={restoredInput} + onRestoredInputConsumed={() => setRestoredInput(null)} /> diff --git a/src/cli/components/InputRich.tsx b/src/cli/components/InputRich.tsx index ee22881..591be64 100644 --- a/src/cli/components/InputRich.tsx +++ b/src/cli/components/InputRich.tsx @@ -140,6 +140,8 @@ export function Input({ onRalphExit, conversationId, onPasteError, + restoredInput, + onRestoredInputConsumed, }: { visible?: boolean; streaming: boolean; @@ -165,6 +167,8 @@ export function Input({ onRalphExit?: () => void; conversationId?: string; onPasteError?: (message: string) => void; + restoredInput?: string | null; + onRestoredInputConsumed?: () => void; }) { const [value, setValue] = useState(""); const [escapePressed, setEscapePressed] = useState(false); @@ -191,6 +195,17 @@ export function Input({ // Bash mode state const [isBashMode, setIsBashMode] = useState(false); + // Restore input from error (only if current value is empty) + useEffect(() => { + if (restoredInput && value === "") { + setValue(restoredInput); + onRestoredInputConsumed?.(); + } else if (restoredInput && value !== "") { + // Input has content, don't clobber - just consume the restored value + onRestoredInputConsumed?.(); + } + }, [restoredInput, value, onRestoredInputConsumed]); + const handleBangAtEmpty = () => { if (isBashMode) return false; setIsBashMode(true); diff --git a/src/cli/helpers/errorFormatter.ts b/src/cli/helpers/errorFormatter.ts index 89164fa..fbc13c8 100644 --- a/src/cli/helpers/errorFormatter.ts +++ b/src/cli/helpers/errorFormatter.ts @@ -88,7 +88,10 @@ export function formatErrorDetails( runId = e.error.run_id; } - const baseError = detail ? `${e.message}\nDetail: ${detail}` : e.message; + // When detail is available, prefer showing just the detail to avoid redundancy + // (e.message often contains the full JSON body like '409 {"detail":"CONFLICT: ..."}') + const baseError = + detail && typeof detail === "string" ? detail : e.message; return runId && agentId ? `${baseError}\n${createAgentLink(runId, agentId, conversationId)}` : baseError; diff --git a/src/headless.ts b/src/headless.ts index 5961076..f71c1e0 100644 --- a/src/headless.ts +++ b/src/headless.ts @@ -13,6 +13,7 @@ import { fetchRunErrorDetail, isApprovalPendingError, isApprovalStateDesyncError, + isConversationBusyError, } from "./agent/approval-recovery"; import { getClient } from "./agent/client"; import { initializeLoadedSkillsFlag, setAgentContext } from "./agent/context"; @@ -59,6 +60,10 @@ import { // caller to manually resubmit the prompt. const LLM_API_ERROR_MAX_RETRIES = 3; +// Retry config for 409 "conversation busy" errors +const CONVERSATION_BUSY_MAX_RETRIES = 1; // Only retry once, fail on 2nd 409 +const CONVERSATION_BUSY_RETRY_DELAY_MS = 2500; // 2.5 seconds + export async function handleHeadlessCommand( argv: string[], model?: string, @@ -945,15 +950,83 @@ export async function handleHeadlessCommand( // Track lastRunId outside the while loop so it's available in catch block let lastKnownRunId: string | null = null; let llmApiErrorRetries = 0; + let conversationBusyRetries = 0; markMilestone("HEADLESS_FIRST_STREAM_START"); measureSinceMilestone("headless-setup-total", "HEADLESS_CLIENT_READY"); try { while (true) { - const stream = await sendMessageStream(conversationId, currentInput, { - agentId: agent.id, - }); + // Wrap sendMessageStream in try-catch to handle pre-stream errors (e.g., 409) + let stream: Awaited>; + try { + stream = await sendMessageStream(conversationId, currentInput, { + agentId: agent.id, + }); + } catch (preStreamError) { + // Extract error detail from APIError + let errorDetail = ""; + if ( + preStreamError instanceof APIError && + preStreamError.error && + typeof preStreamError.error === "object" + ) { + const errObj = preStreamError.error as Record; + if ( + errObj.error && + typeof errObj.error === "object" && + "detail" in errObj.error + ) { + const nested = errObj.error as Record; + errorDetail = + typeof nested.detail === "string" ? nested.detail : ""; + } + if (!errorDetail && typeof errObj.detail === "string") { + errorDetail = errObj.detail; + } + } + if (!errorDetail && preStreamError instanceof Error) { + errorDetail = preStreamError.message; + } + + // Check for 409 "conversation busy" error - retry once with delay + if ( + isConversationBusyError(errorDetail) && + conversationBusyRetries < CONVERSATION_BUSY_MAX_RETRIES + ) { + conversationBusyRetries += 1; + + // Emit retry message for stream-json mode + if (outputFormat === "stream-json") { + const retryMsg: RetryMessage = { + type: "retry", + reason: "error", // 409 conversation busy is a pre-stream error + attempt: conversationBusyRetries, + max_attempts: CONVERSATION_BUSY_MAX_RETRIES, + delay_ms: CONVERSATION_BUSY_RETRY_DELAY_MS, + session_id: sessionId, + uuid: `retry-conversation-busy-${crypto.randomUUID()}`, + }; + console.log(JSON.stringify(retryMsg)); + } else { + console.error( + `Conversation is busy, waiting ${CONVERSATION_BUSY_RETRY_DELAY_MS / 1000}s and retrying...`, + ); + } + + // Wait before retry + await new Promise((resolve) => + setTimeout(resolve, CONVERSATION_BUSY_RETRY_DELAY_MS), + ); + continue; + } + + // Reset conversation busy retry counter on other errors + conversationBusyRetries = 0; + + // Re-throw to outer catch for other errors + throw preStreamError; + } // For stream-json, output each chunk as it arrives let stopReason: StopReasonType | null = null; @@ -1147,6 +1220,9 @@ export async function handleHeadlessCommand( // Case 1: Turn ended normally if (stopReason === "end_turn") { + // Reset retry counters on success + llmApiErrorRetries = 0; + conversationBusyRetries = 0; break; }