feat: add 409 retry, error improvements, and queue restoration (#618)

Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
Charles Packer
2026-01-21 14:57:48 -08:00
committed by GitHub
parent 802136c868
commit 6a0bcdd683
5 changed files with 281 additions and 36 deletions

View File

@@ -10,6 +10,11 @@ const APPROVAL_RECOVERY_DETAIL_FRAGMENT =
// This is the CONFLICT error - opposite of desync
const APPROVAL_PENDING_DETAIL_FRAGMENT = "cannot send a new message";
// Error when conversation is busy (another request is being processed)
// This is a 409 CONFLICT when trying to send while a run is active
const CONVERSATION_BUSY_DETAIL_FRAGMENT =
"another request is currently being processed";
type RunErrorMetadata =
| {
error_type?: string;
@@ -38,6 +43,18 @@ export function isApprovalPendingError(detail: unknown): boolean {
return detail.toLowerCase().includes(APPROVAL_PENDING_DETAIL_FRAGMENT);
}
/**
 * Returns true when an error detail string indicates the conversation is busy,
 * i.e. the server rejected the request with a 409 CONFLICT because another
 * request is still being processed for this conversation.
 *
 * Example payload:
 * { detail: "CONFLICT: Cannot send a new message: Another request is currently being processed..." }
 */
export function isConversationBusyError(detail: unknown): boolean {
  // Non-string details (objects, null, undefined) can never match.
  return (
    typeof detail === "string" &&
    detail.toLowerCase().includes(CONVERSATION_BUSY_DETAIL_FRAGMENT)
  );
}
export async function fetchRunErrorDetail(
runId: string | null | undefined,
): Promise<string | null> {

View File

@@ -33,6 +33,7 @@ import {
fetchRunErrorDetail,
isApprovalPendingError,
isApprovalStateDesyncError,
isConversationBusyError,
} from "../agent/approval-recovery";
import { prefetchAvailableModelHandles } from "../agent/available-models";
import { getResumeData } from "../agent/check-approval";
@@ -192,9 +193,17 @@ const EAGER_CANCEL = true;
// Maximum retries for transient LLM API errors (matches headless.ts)
const LLM_API_ERROR_MAX_RETRIES = 3;
// Retry config for 409 "conversation busy" errors
const CONVERSATION_BUSY_MAX_RETRIES = 1; // Only retry once, fail on 2nd 409
const CONVERSATION_BUSY_RETRY_DELAY_MS = 2500; // 2.5 seconds
// Message shown when user interrupts the stream
const INTERRUPT_MESSAGE =
"Interrupted tell the agent what to do differently. Something went wrong? Use /feedback to report the issue.";
"Interrupted tell the agent what to do differently. Something went wrong? Use /feedback to report issues.";
// Hint shown after errors to encourage feedback
const ERROR_FEEDBACK_HINT =
"Something went wrong? Use /feedback to report issues.";
// tiny helper for unique ids (avoid overwriting prior user lines)
function uid(prefix: string) {
@@ -982,6 +991,9 @@ export default function App({
// Retry counter for transient LLM API errors (ref for synchronous access in loop)
const llmApiErrorRetriesRef = useRef(0);
// Retry counter for 409 "conversation busy" errors
const conversationBusyRetriesRef = useRef(0);
// Message queue state for queueing messages during streaming
const [messageQueue, setMessageQueue] = useState<string[]>([]);
@@ -998,6 +1010,13 @@ export default function App({
// Incremented when userCancelledRef is reset while messages are queued
const [dequeueEpoch, setDequeueEpoch] = useState(0);
// Track last dequeued message for restoration on error
// If an error occurs after dequeue, we restore this to the input field (if input is empty)
const lastDequeuedMessageRef = useRef<string | null>(null);
// Restored input value - set when we need to restore a message to the input after error
const [restoredInput, setRestoredInput] = useState<string | null>(null);
// Helper to check if agent is busy (streaming, executing tool, or running command)
// Uses refs for synchronous access outside React's closure system
// biome-ignore lint/correctness/useExhaustiveDependencies: refs are stable objects, .current is read dynamically
@@ -1729,9 +1748,10 @@ export default function App({
}
processingConversationRef.current += 1;
// Reset retry counter for new conversation turns (fresh budget per user message)
// Reset retry counters for new conversation turns (fresh budget per user message)
if (!allowReentry) {
llmApiErrorRetriesRef.current = 0;
conversationBusyRetriesRef.current = 0;
}
// Track last run ID for error reporting (accessible in catch block)
@@ -1796,41 +1816,93 @@ export default function App({
{ agentId: agentIdRef.current },
);
} catch (preStreamError) {
// Extract error detail from APIError (handles both direct and nested structures)
// Direct: e.error.detail | Nested: e.error.error.detail (matches formatErrorDetails)
let errorDetail = "";
if (
preStreamError instanceof APIError &&
preStreamError.error &&
typeof preStreamError.error === "object"
) {
const errObj = preStreamError.error as Record<string, unknown>;
// Check nested structure first: e.error.error.detail
if (
errObj.error &&
typeof errObj.error === "object" &&
"detail" in errObj.error
) {
const nested = errObj.error as Record<string, unknown>;
errorDetail =
typeof nested.detail === "string" ? nested.detail : "";
}
// Fallback to direct structure: e.error.detail
if (!errorDetail && typeof errObj.detail === "string") {
errorDetail = errObj.detail;
}
}
// Final fallback: use Error.message
if (!errorDetail && preStreamError instanceof Error) {
errorDetail = preStreamError.message;
}
// Check for 409 "conversation busy" error - retry once with delay
if (
isConversationBusyError(errorDetail) &&
conversationBusyRetriesRef.current < CONVERSATION_BUSY_MAX_RETRIES
) {
conversationBusyRetriesRef.current += 1;
// Show status message
const statusId = uid("status");
buffersRef.current.byId.set(statusId, {
kind: "status",
id: statusId,
lines: ["Conversation is busy, waiting and retrying…"],
});
buffersRef.current.order.push(statusId);
refreshDerived();
// Wait with abort checking (same pattern as LLM API error retry)
let cancelled = false;
const startTime = Date.now();
while (
Date.now() - startTime <
CONVERSATION_BUSY_RETRY_DELAY_MS
) {
if (
abortControllerRef.current?.signal.aborted ||
userCancelledRef.current
) {
cancelled = true;
break;
}
await new Promise((resolve) => setTimeout(resolve, 100));
}
// Remove status message
buffersRef.current.byId.delete(statusId);
buffersRef.current.order = buffersRef.current.order.filter(
(id) => id !== statusId,
);
refreshDerived();
if (!cancelled) {
// Reset interrupted flag so retry stream chunks are processed
buffersRef.current.interrupted = false;
continue;
}
// User pressed ESC - fall through to error handling
}
// Reset conversation busy retry counter on non-busy error
conversationBusyRetriesRef.current = 0;
// Check if this is a pre-stream approval desync error
const hasApprovalInPayload = currentInput.some(
(item) => item?.type === "approval",
);
if (hasApprovalInPayload) {
// Extract error detail from APIError (handles both direct and nested structures)
// Direct: e.error.detail | Nested: e.error.error.detail (matches formatErrorDetails)
let errorDetail = "";
if (
preStreamError instanceof APIError &&
preStreamError.error &&
typeof preStreamError.error === "object"
) {
const errObj = preStreamError.error as Record<string, unknown>;
// Check nested structure first: e.error.error.detail
if (
errObj.error &&
typeof errObj.error === "object" &&
"detail" in errObj.error
) {
const nested = errObj.error as Record<string, unknown>;
errorDetail =
typeof nested.detail === "string" ? nested.detail : "";
}
// Fallback to direct structure: e.error.detail
if (!errorDetail && typeof errObj.detail === "string") {
errorDetail = errObj.detail;
}
}
// Final fallback: use Error.message
if (!errorDetail && preStreamError instanceof Error) {
errorDetail = preStreamError.message;
}
// If desync detected and retries available, recover with keep-alive prompt
if (
isApprovalStateDesyncError(errorDetail) &&
@@ -2012,6 +2084,8 @@ export default function App({
if (stopReasonToHandle === "end_turn") {
setStreaming(false);
llmApiErrorRetriesRef.current = 0; // Reset retry counter on success
conversationBusyRetriesRef.current = 0;
lastDequeuedMessageRef.current = null; // Clear - message was processed successfully
// Disable eager approval check after first successful message (LET-7101)
// Any new approvals from here on are from our own turn, not orphaned
@@ -2650,6 +2724,16 @@ export default function App({
lastFailureMessage ||
`An error occurred during agent execution\n(run_id: ${lastRunId ?? "unknown"}, stop_reason: ${stopReasonToHandle})`;
appendError(errorToShow, true);
appendError(ERROR_FEEDBACK_HINT, true);
// Restore dequeued message to input on error
if (lastDequeuedMessageRef.current) {
setRestoredInput(lastDequeuedMessageRef.current);
lastDequeuedMessageRef.current = null;
}
// Clear any remaining queue on error
setMessageQueue([]);
setStreaming(false);
sendDesktopNotification();
refreshDerived();
@@ -2761,8 +2845,9 @@ export default function App({
// User pressed ESC - fall through to error handling
}
// Reset retry counter on non-retriable error (or max retries exceeded)
// Reset retry counters on non-retriable error (or max retries exceeded)
llmApiErrorRetriesRef.current = 0;
conversationBusyRetriesRef.current = 0;
// Mark incomplete tool calls as finished to prevent stuck blinking UI
markIncompleteToolsAsCancelled(
@@ -2792,6 +2877,16 @@ export default function App({
? `Stream error: ${fallbackError}\n(run_id: ${lastRunId})`
: `Stream error: ${fallbackError}`;
appendError(errorMsg, true); // Skip telemetry - already tracked above
appendError(ERROR_FEEDBACK_HINT, true);
// Restore dequeued message to input on error
if (lastDequeuedMessageRef.current) {
setRestoredInput(lastDequeuedMessageRef.current);
lastDequeuedMessageRef.current = null;
}
// Clear any remaining queue on error
setMessageQueue([]);
setStreaming(false);
sendDesktopNotification(); // Notify user of error
refreshDerived();
@@ -2824,12 +2919,14 @@ export default function App({
agentIdRef.current,
);
appendError(errorDetails, true); // Skip telemetry - already tracked above
appendError(ERROR_FEEDBACK_HINT, true);
} else {
// No error metadata, show generic error with run info
appendError(
`An error occurred during agent execution\n(run_id: ${lastRunId}, stop_reason: ${stopReason})`,
true, // Skip telemetry - already tracked above
);
appendError(ERROR_FEEDBACK_HINT, true);
}
} catch (_e) {
// If we can't fetch error details, show generic error
@@ -2837,6 +2934,19 @@ export default function App({
`An error occurred during agent execution\n(run_id: ${lastRunId}, stop_reason: ${stopReason})\n(Unable to fetch additional error details from server)`,
true, // Skip telemetry - already tracked above
);
appendError(ERROR_FEEDBACK_HINT, true);
// Restore dequeued message to input on error
if (lastDequeuedMessageRef.current) {
setRestoredInput(lastDequeuedMessageRef.current);
lastDequeuedMessageRef.current = null;
}
// Clear any remaining queue on error
setMessageQueue([]);
setStreaming(false);
sendDesktopNotification();
refreshDerived();
return;
}
} else {
@@ -2845,8 +2955,17 @@ export default function App({
`An error occurred during agent execution\n(stop_reason: ${stopReason})`,
true, // Skip telemetry - already tracked above
);
appendError(ERROR_FEEDBACK_HINT, true);
}
// Restore dequeued message to input on error
if (lastDequeuedMessageRef.current) {
setRestoredInput(lastDequeuedMessageRef.current);
lastDequeuedMessageRef.current = null;
}
// Clear any remaining queue on error
setMessageQueue([]);
setStreaming(false);
sendDesktopNotification(); // Notify user of error
refreshDerived();
@@ -2891,6 +3010,16 @@ export default function App({
// Use comprehensive error formatting
const errorDetails = formatErrorDetails(e, agentIdRef.current);
appendError(errorDetails, true); // Skip telemetry - already tracked above with more context
appendError(ERROR_FEEDBACK_HINT, true);
// Restore dequeued message to input on error (Input component will only use if empty)
if (lastDequeuedMessageRef.current) {
setRestoredInput(lastDequeuedMessageRef.current);
lastDequeuedMessageRef.current = null;
}
// Clear any remaining queue on error
setMessageQueue([]);
setStreaming(false);
sendDesktopNotification(); // Notify user of error
refreshDerived();
@@ -6394,6 +6523,9 @@ DO NOT respond to these messages or otherwise consider them in your response unl
"queue",
`Dequeuing ${messageQueue.length} message(s): "${concatenatedMessage.slice(0, 50)}${concatenatedMessage.length > 50 ? "..." : ""}"`,
);
// Store the message before clearing queue - allows restoration on error
lastDequeuedMessageRef.current = concatenatedMessage;
setMessageQueue([]);
// Submit the concatenated message using the normal submit flow
@@ -8097,6 +8229,8 @@ Plan file path: ${planFilePath}`;
onRalphExit={handleRalphExit}
conversationId={conversationId}
onPasteError={handlePasteError}
restoredInput={restoredInput}
onRestoredInputConsumed={() => setRestoredInput(null)}
/>
</Box>

View File

@@ -140,6 +140,8 @@ export function Input({
onRalphExit,
conversationId,
onPasteError,
restoredInput,
onRestoredInputConsumed,
}: {
visible?: boolean;
streaming: boolean;
@@ -165,6 +167,8 @@ export function Input({
onRalphExit?: () => void;
conversationId?: string;
onPasteError?: (message: string) => void;
restoredInput?: string | null;
onRestoredInputConsumed?: () => void;
}) {
const [value, setValue] = useState("");
const [escapePressed, setEscapePressed] = useState(false);
@@ -191,6 +195,17 @@ export function Input({
// Bash mode state
const [isBashMode, setIsBashMode] = useState(false);
// Restore input from error (only if current value is empty)
useEffect(() => {
if (restoredInput && value === "") {
setValue(restoredInput);
onRestoredInputConsumed?.();
} else if (restoredInput && value !== "") {
// Input has content, don't clobber - just consume the restored value
onRestoredInputConsumed?.();
}
}, [restoredInput, value, onRestoredInputConsumed]);
const handleBangAtEmpty = () => {
if (isBashMode) return false;
setIsBashMode(true);

View File

@@ -88,7 +88,10 @@ export function formatErrorDetails(
runId = e.error.run_id;
}
const baseError = detail ? `${e.message}\nDetail: ${detail}` : e.message;
// When detail is available, prefer showing just the detail to avoid redundancy
// (e.message often contains the full JSON body like '409 {"detail":"CONFLICT: ..."}')
const baseError =
detail && typeof detail === "string" ? detail : e.message;
return runId && agentId
? `${baseError}\n${createAgentLink(runId, agentId, conversationId)}`
: baseError;

View File

@@ -13,6 +13,7 @@ import {
fetchRunErrorDetail,
isApprovalPendingError,
isApprovalStateDesyncError,
isConversationBusyError,
} from "./agent/approval-recovery";
import { getClient } from "./agent/client";
import { initializeLoadedSkillsFlag, setAgentContext } from "./agent/context";
@@ -59,6 +60,10 @@ import {
// caller to manually resubmit the prompt.
const LLM_API_ERROR_MAX_RETRIES = 3;
// Retry config for 409 "conversation busy" errors
const CONVERSATION_BUSY_MAX_RETRIES = 1; // Only retry once, fail on 2nd 409
const CONVERSATION_BUSY_RETRY_DELAY_MS = 2500; // 2.5 seconds
export async function handleHeadlessCommand(
argv: string[],
model?: string,
@@ -945,15 +950,83 @@ export async function handleHeadlessCommand(
// Track lastRunId outside the while loop so it's available in catch block
let lastKnownRunId: string | null = null;
let llmApiErrorRetries = 0;
let conversationBusyRetries = 0;
markMilestone("HEADLESS_FIRST_STREAM_START");
measureSinceMilestone("headless-setup-total", "HEADLESS_CLIENT_READY");
try {
while (true) {
const stream = await sendMessageStream(conversationId, currentInput, {
agentId: agent.id,
});
// Wrap sendMessageStream in try-catch to handle pre-stream errors (e.g., 409)
let stream: Awaited<ReturnType<typeof sendMessageStream>>;
try {
stream = await sendMessageStream(conversationId, currentInput, {
agentId: agent.id,
});
} catch (preStreamError) {
// Extract error detail from APIError
let errorDetail = "";
if (
preStreamError instanceof APIError &&
preStreamError.error &&
typeof preStreamError.error === "object"
) {
const errObj = preStreamError.error as Record<string, unknown>;
if (
errObj.error &&
typeof errObj.error === "object" &&
"detail" in errObj.error
) {
const nested = errObj.error as Record<string, unknown>;
errorDetail =
typeof nested.detail === "string" ? nested.detail : "";
}
if (!errorDetail && typeof errObj.detail === "string") {
errorDetail = errObj.detail;
}
}
if (!errorDetail && preStreamError instanceof Error) {
errorDetail = preStreamError.message;
}
// Check for 409 "conversation busy" error - retry once with delay
if (
isConversationBusyError(errorDetail) &&
conversationBusyRetries < CONVERSATION_BUSY_MAX_RETRIES
) {
conversationBusyRetries += 1;
// Emit retry message for stream-json mode
if (outputFormat === "stream-json") {
const retryMsg: RetryMessage = {
type: "retry",
reason: "error", // 409 conversation busy is a pre-stream error
attempt: conversationBusyRetries,
max_attempts: CONVERSATION_BUSY_MAX_RETRIES,
delay_ms: CONVERSATION_BUSY_RETRY_DELAY_MS,
session_id: sessionId,
uuid: `retry-conversation-busy-${crypto.randomUUID()}`,
};
console.log(JSON.stringify(retryMsg));
} else {
console.error(
`Conversation is busy, waiting ${CONVERSATION_BUSY_RETRY_DELAY_MS / 1000}s and retrying...`,
);
}
// Wait before retry
await new Promise((resolve) =>
setTimeout(resolve, CONVERSATION_BUSY_RETRY_DELAY_MS),
);
continue;
}
// Reset conversation busy retry counter on other errors
conversationBusyRetries = 0;
// Re-throw to outer catch for other errors
throw preStreamError;
}
// For stream-json, output each chunk as it arrives
let stopReason: StopReasonType | null = null;
@@ -1147,6 +1220,9 @@ export async function handleHeadlessCommand(
// Case 1: Turn ended normally
if (stopReason === "end_turn") {
// Reset retry counters on success
llmApiErrorRetries = 0;
conversationBusyRetries = 0;
break;
}