From 57875dedd134f57411b4caba07f5750f49a7e463 Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Tue, 6 Jan 2026 16:14:24 -0800 Subject: [PATCH] fix: prevent stale processConversation calls from affecting UI state (#480) Co-authored-by: Letta --- src/cli/App.tsx | 89 ++++++++++++++++++++++++++++++---- src/cli/helpers/accumulator.ts | 14 +++++- 2 files changed, 91 insertions(+), 12 deletions(-) diff --git a/src/cli/App.tsx b/src/cli/App.tsx index 0522bef..7c14fad 100644 --- a/src/cli/App.tsx +++ b/src/cli/App.tsx @@ -1,6 +1,7 @@ // src/cli/App.tsx import { existsSync, readFileSync, writeFileSync } from "node:fs"; + import { APIUserAbortError } from "@letta-ai/letta-client/core/error"; import type { AgentState, @@ -483,6 +484,10 @@ export default function App({ // Tracks depth to allow intentional reentry while blocking parallel calls const processingConversationRef = useRef(0); + // Generation counter - incremented on each ESC interrupt. + // Allows processConversation to detect if it's been superseded. + const conversationGenerationRef = useRef(0); + // Whether an interrupt has been requested for the current stream const [interruptRequested, setInterruptRequested] = useState(false); @@ -1209,12 +1214,24 @@ export default function App({ const processConversation = useCallback( async ( initialInput: Array, - options?: { allowReentry?: boolean }, + options?: { allowReentry?: boolean; submissionGeneration?: number }, ): Promise => { // Copy so we can safely mutate for retry recovery flows const currentInput = [...initialInput]; const allowReentry = options?.allowReentry ?? false; + // Use provided generation (from onSubmit) or capture current + // This allows detecting if ESC was pressed during async work before this function was called + const myGeneration = + options?.submissionGeneration ?? conversationGenerationRef.current; + + // Check if we're already stale (ESC was pressed while we were queued in onSubmit). + // This can happen if ESC was pressed during async work before processConversation was called. + // We check early to avoid setting state (streaming, etc.) for stale conversations. + if (myGeneration !== conversationGenerationRef.current) { + return; + } + // Guard against concurrent processConversation calls // This can happen if user submits two messages in quick succession // Uses dedicated ref (not streamingRef) since streaming may be set early for UI responsiveness @@ -1238,12 +1255,19 @@ export default function App({ return; } + // Double-check we haven't become stale between entry and try block + if (myGeneration !== conversationGenerationRef.current) { + return; + } + setStreaming(true); abortControllerRef.current = new AbortController(); // Clear any stale pending tool calls from previous turns // If we're sending a new message, old pending state is no longer relevant - markIncompleteToolsAsCancelled(buffersRef.current); + // Pass false to avoid setting interrupted=true, which causes race conditions + // with concurrent processConversation calls reading the flag + markIncompleteToolsAsCancelled(buffersRef.current, false); // Reset interrupted flag since we're starting a fresh stream buffersRef.current.interrupted = false; @@ -1257,7 +1281,13 @@ export default function App({ // Check if cancelled before starting new stream if (signal?.aborted) { - setStreaming(false); + const isStaleAtAbort = + myGeneration !== conversationGenerationRef.current; + // Only set streaming=false if this is the current generation. + // If stale, a newer processConversation might be running and we shouldn't affect its UI. + if (!isStaleAtAbort) { + setStreaming(false); + } return; } @@ -1269,7 +1299,13 @@ export default function App({ // Check again after network call - user may have pressed Escape during sendMessageStream if (signal?.aborted) { - setStreaming(false); + const isStaleAtAbort = + myGeneration !== conversationGenerationRef.current; + // Only set streaming=false if this is the current generation. + // If stale, a newer processConversation might be running and we shouldn't affect its UI. + if (!isStaleAtAbort) { + setStreaming(false); + } return; } @@ -1349,6 +1385,17 @@ export default function App({ const wasAborted = !!signal?.aborted; let stopReasonToHandle = wasAborted ? "cancelled" : stopReason; + // Check if this conversation became stale while the stream was running. + // If stale, a newer processConversation is running and we shouldn't modify UI state. + const isStaleAfterDrain = + myGeneration !== conversationGenerationRef.current; + + // If this conversation is stale, exit without modifying UI state. + // A newer conversation is running and should control the UI. + if (isStaleAfterDrain) { + return; + } + // Immediate refresh after stream completes to show final state unless // the user already cancelled (handleInterrupt rendered the UI). if (!wasInterrupted) { @@ -2125,11 +2172,19 @@ export default function App({ sendDesktopNotification(); // Notify user of error refreshDerived(); } finally { + // Check if this conversation was superseded by an ESC interrupt + const isStale = myGeneration !== conversationGenerationRef.current; + abortControllerRef.current = null; - processingConversationRef.current = Math.max( - 0, - processingConversationRef.current - 1, - ); + + // Only decrement ref if this conversation is still current. + // If stale (ESC was pressed), handleInterrupt already reset ref to 0. + if (!isStale) { + processingConversationRef.current = Math.max( + 0, + processingConversationRef.current - 1, + ); + } } }, [ @@ -2198,7 +2253,9 @@ export default function App({ return; } - if (!streaming || interruptRequested) return; + if (!streaming || interruptRequested) { + return; + } // If we're in the middle of queue cancel, set flag to restore instead of auto-send if (waitingForQueueCancelRef.current) { @@ -2226,6 +2283,14 @@ export default function App({ // Set cancellation flag to prevent processConversation from starting userCancelledRef.current = true; + // Increment generation to mark any in-flight processConversation as stale. + // The stale processConversation will check this and exit quietly without + // decrementing the ref (since we reset it here). + conversationGenerationRef.current += 1; + + // Reset the processing guard so the next message can start a new conversation. + processingConversationRef.current = 0; + // Stop streaming and show error message (unless tool calls were cancelled, // since the tool result will show "Interrupted by user") setStreaming(false); @@ -2776,6 +2841,10 @@ export default function App({ if (!msg) return { submitted: false }; + // Capture the generation at submission time, BEFORE any async work. + // This allows detecting if ESC was pressed during async operations. + const submissionGeneration = conversationGenerationRef.current; + // Track user input (agent_id automatically added from telemetry.currentAgentId) telemetry.trackUserInput(msg, "user", currentModelId || "unknown"); @@ -4652,7 +4721,7 @@ DO NOT respond to these messages or otherwise consider them in your response unl content: messageContent as unknown as MessageCreate["content"], }); - await processConversation(initialInput); + await processConversation(initialInput, { submissionGeneration }); // Clean up placeholders after submission clearPlaceholdersInText(msg); diff --git a/src/cli/helpers/accumulator.ts b/src/cli/helpers/accumulator.ts index 4e843b1..3ddd939 100644 --- a/src/cli/helpers/accumulator.ts +++ b/src/cli/helpers/accumulator.ts @@ -174,11 +174,21 @@ export function markCurrentLineAsFinished(b: Buffers) { /** * Mark any incomplete tool calls as cancelled when stream is interrupted. * This prevents blinking tool calls from staying in progress state. + * @param b - The buffers object + * @param setInterruptedFlag - Whether to set the interrupted flag (default true). + * Pass false when clearing stale tool calls at stream startup to avoid race conditions + * with concurrent processConversation calls reading the flag. * @returns true if any tool calls were marked as cancelled */ -export function markIncompleteToolsAsCancelled(b: Buffers): boolean { +export function markIncompleteToolsAsCancelled( + b: Buffers, + setInterruptedFlag = true, +): boolean { // Mark buffer as interrupted to skip stale throttled refreshes - b.interrupted = true; + // (only when actually interrupting, not when clearing stale state at startup) + if (setInterruptedFlag) { + b.interrupted = true; + } let anyToolsCancelled = false; for (const [id, line] of b.byId.entries()) {