fix: prevent stale processConversation calls from affecting UI state (#480)

Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
Charles Packer
2026-01-06 16:14:24 -08:00
committed by GitHub
parent 90018b6fd9
commit 57875dedd1
2 changed files with 91 additions and 12 deletions

View File

@@ -1,6 +1,7 @@
// src/cli/App.tsx
import { existsSync, readFileSync, writeFileSync } from "node:fs";
import { APIUserAbortError } from "@letta-ai/letta-client/core/error";
import type {
AgentState,
@@ -483,6 +484,10 @@ export default function App({
// Tracks depth to allow intentional reentry while blocking parallel calls
const processingConversationRef = useRef(0);
// Generation counter - incremented on each ESC interrupt.
// Allows processConversation to detect if it's been superseded.
const conversationGenerationRef = useRef(0);
// Whether an interrupt has been requested for the current stream
const [interruptRequested, setInterruptRequested] = useState(false);
@@ -1209,12 +1214,24 @@ export default function App({
const processConversation = useCallback(
async (
initialInput: Array<MessageCreate | ApprovalCreate>,
options?: { allowReentry?: boolean },
options?: { allowReentry?: boolean; submissionGeneration?: number },
): Promise<void> => {
// Copy so we can safely mutate for retry recovery flows
const currentInput = [...initialInput];
const allowReentry = options?.allowReentry ?? false;
// Use provided generation (from onSubmit) or capture current
// This allows detecting if ESC was pressed during async work before this function was called
const myGeneration =
options?.submissionGeneration ?? conversationGenerationRef.current;
// Check if we're already stale (ESC was pressed while we were queued in onSubmit).
// This can happen if ESC was pressed during async work before processConversation was called.
// We check early to avoid setting state (streaming, etc.) for stale conversations.
if (myGeneration !== conversationGenerationRef.current) {
return;
}
// Guard against concurrent processConversation calls
// This can happen if the user submits two messages in quick succession
// Uses dedicated ref (not streamingRef) since streaming may be set early for UI responsiveness
@@ -1238,12 +1255,19 @@ export default function App({
return;
}
// Double-check we haven't become stale between entry and try block
if (myGeneration !== conversationGenerationRef.current) {
return;
}
setStreaming(true);
abortControllerRef.current = new AbortController();
// Clear any stale pending tool calls from previous turns
// If we're sending a new message, old pending state is no longer relevant
markIncompleteToolsAsCancelled(buffersRef.current);
// Pass false to avoid setting interrupted=true, which causes race conditions
// with concurrent processConversation calls reading the flag
markIncompleteToolsAsCancelled(buffersRef.current, false);
// Reset interrupted flag since we're starting a fresh stream
buffersRef.current.interrupted = false;
@@ -1257,7 +1281,13 @@ export default function App({
// Check if cancelled before starting new stream
if (signal?.aborted) {
setStreaming(false);
const isStaleAtAbort =
myGeneration !== conversationGenerationRef.current;
// Only set streaming=false if this is the current generation.
// If stale, a newer processConversation might be running and we shouldn't affect its UI.
if (!isStaleAtAbort) {
setStreaming(false);
}
return;
}
@@ -1269,7 +1299,13 @@ export default function App({
// Check again after network call - user may have pressed Escape during sendMessageStream
if (signal?.aborted) {
setStreaming(false);
const isStaleAtAbort =
myGeneration !== conversationGenerationRef.current;
// Only set streaming=false if this is the current generation.
// If stale, a newer processConversation might be running and we shouldn't affect its UI.
if (!isStaleAtAbort) {
setStreaming(false);
}
return;
}
@@ -1349,6 +1385,17 @@ export default function App({
const wasAborted = !!signal?.aborted;
let stopReasonToHandle = wasAborted ? "cancelled" : stopReason;
// Check if this conversation became stale while the stream was running.
// If stale, a newer processConversation is running and we shouldn't modify UI state.
const isStaleAfterDrain =
myGeneration !== conversationGenerationRef.current;
// If this conversation is stale, exit without modifying UI state.
// A newer conversation is running and should control the UI.
if (isStaleAfterDrain) {
return;
}
// Immediate refresh after stream completes to show final state unless
// the user already cancelled (handleInterrupt rendered the UI).
if (!wasInterrupted) {
@@ -2125,11 +2172,19 @@ export default function App({
sendDesktopNotification(); // Notify user of error
refreshDerived();
} finally {
// Check if this conversation was superseded by an ESC interrupt
const isStale = myGeneration !== conversationGenerationRef.current;
abortControllerRef.current = null;
processingConversationRef.current = Math.max(
0,
processingConversationRef.current - 1,
);
// Only decrement ref if this conversation is still current.
// If stale (ESC was pressed), handleInterrupt already reset ref to 0.
if (!isStale) {
processingConversationRef.current = Math.max(
0,
processingConversationRef.current - 1,
);
}
}
},
[
@@ -2198,7 +2253,9 @@ export default function App({
return;
}
if (!streaming || interruptRequested) return;
if (!streaming || interruptRequested) {
return;
}
// If we're in the middle of queue cancel, set flag to restore instead of auto-send
if (waitingForQueueCancelRef.current) {
@@ -2226,6 +2283,14 @@ export default function App({
// Set cancellation flag to prevent processConversation from starting
userCancelledRef.current = true;
// Increment generation to mark any in-flight processConversation as stale.
// The stale processConversation will check this and exit quietly without
// decrementing the ref (since we reset it here).
conversationGenerationRef.current += 1;
// Reset the processing guard so the next message can start a new conversation.
processingConversationRef.current = 0;
// Stop streaming and show error message (unless tool calls were cancelled,
// since the tool result will show "Interrupted by user")
setStreaming(false);
@@ -2776,6 +2841,10 @@ export default function App({
if (!msg) return { submitted: false };
// Capture the generation at submission time, BEFORE any async work.
// This allows detecting if ESC was pressed during async operations.
const submissionGeneration = conversationGenerationRef.current;
// Track user input (agent_id automatically added from telemetry.currentAgentId)
telemetry.trackUserInput(msg, "user", currentModelId || "unknown");
@@ -4652,7 +4721,7 @@ DO NOT respond to these messages or otherwise consider them in your response unl
content: messageContent as unknown as MessageCreate["content"],
});
await processConversation(initialInput);
await processConversation(initialInput, { submissionGeneration });
// Clean up placeholders after submission
clearPlaceholdersInText(msg);

View File

@@ -174,11 +174,21 @@ export function markCurrentLineAsFinished(b: Buffers) {
/**
* Mark any incomplete tool calls as cancelled when stream is interrupted.
 * This prevents blinking tool calls from staying in an in-progress state.
* @param b - The buffers object
* @param setInterruptedFlag - Whether to set the interrupted flag (default true).
* Pass false when clearing stale tool calls at stream startup to avoid race conditions
* with concurrent processConversation calls reading the flag.
* @returns true if any tool calls were marked as cancelled
*/
export function markIncompleteToolsAsCancelled(b: Buffers): boolean {
export function markIncompleteToolsAsCancelled(
b: Buffers,
setInterruptedFlag = true,
): boolean {
// Mark buffer as interrupted to skip stale throttled refreshes
b.interrupted = true;
// (only when actually interrupting, not when clearing stale state at startup)
if (setInterruptedFlag) {
b.interrupted = true;
}
let anyToolsCancelled = false;
for (const [id, line] of b.byId.entries()) {