From e8bd2e653712a99714b7b697fab984e8678fcd47 Mon Sep 17 00:00:00 2001 From: Christina Tong Date: Mon, 8 Dec 2025 23:46:18 -0800 Subject: [PATCH] fix: optimistic cancellation [LET-6273] (#166) --- src/cli/App.tsx | 95 +++++++++++++++++++++++++++++++-------- src/cli/helpers/stream.ts | 9 +++- 2 files changed, 85 insertions(+), 19 deletions(-) diff --git a/src/cli/App.tsx b/src/cli/App.tsx index d5b64a2..8f2f6f9 100644 --- a/src/cli/App.tsx +++ b/src/cli/App.tsx @@ -1,13 +1,15 @@ // src/cli/App.tsx import { existsSync, readFileSync } from "node:fs"; -import { APIError } from "@letta-ai/letta-client/core/error"; +import { APIError, APIUserAbortError } from "@letta-ai/letta-client/core/error"; +import type { Stream } from "@letta-ai/letta-client/core/streaming"; import type { AgentState, MessageCreate, } from "@letta-ai/letta-client/resources/agents/agents"; import type { ApprovalCreate, + LettaStreamingResponse, Message, } from "@letta-ai/letta-client/resources/agents/messages"; import type { LlmConfig } from "@letta-ai/letta-client/resources/models/models"; @@ -360,6 +362,9 @@ export default function App({ // AbortController for stream cancellation const abortControllerRef = useRef(null); + // Track if user wants to cancel (persists across state updates) + const userCancelledRef = useRef(false); + // Track terminal shrink events to refresh static output (prevents wrapped leftovers) const columns = useTerminalWidth(); const prevColumnsRef = useRef(columns); @@ -602,6 +607,12 @@ export default function App({ let lastKnownRunId: string | null = null; try { + // Check if user hit escape before we started + if (userCancelledRef.current) { + userCancelledRef.current = false; // Reset for next time + return; + } + setStreaming(true); abortControllerRef.current = new AbortController(); @@ -610,6 +621,12 @@ export default function App({ markIncompleteToolsAsCancelled(buffersRef.current); while (true) { + // Check if cancelled before starting new stream + if (abortControllerRef.current?.signal.aborted) { + setStreaming(false); + return; + } + // Stream one turn const stream = await sendMessageStream(agentId, currentInput); const { stopReason, approval, approvals, apiDurationMs, lastRunId } = @@ -640,7 +657,10 @@ export default function App({ // Case 1.5: Stream was cancelled by user if (stopReason === "cancelled") { - appendError("Stream interrupted by user"); + // Only show error if not using eager cancel (eager cancel already handled this) + if (!EAGER_CANCEL) { + appendError("Stream interrupted by user"); + } setStreaming(false); return; } @@ -874,6 +894,14 @@ export default function App({ // Mark incomplete tool calls as cancelled to prevent stuck blinking UI markIncompleteToolsAsCancelled(buffersRef.current); + // If using eager cancel and this is an abort error, silently ignore it + // The user already got "Stream interrupted by user" feedback from handleInterrupt + if (EAGER_CANCEL && e instanceof APIUserAbortError) { + setStreaming(false); + refreshDerived(); + return; + } + // Build error message with run_id for debugging const runIdSuffix = lastKnownRunId ? `\n(run_id: ${lastKnownRunId}, stop_reason: error)` @@ -921,24 +949,53 @@ export default function App({ if (!streaming || interruptRequested) return; - setInterruptRequested(true); - try { - const client = await getClient(); - - // Send cancel request to backend - const _cancelResult = await client.agents.messages.cancel(agentId); - // console.error("cancelResult", JSON.stringify(cancelResult, null, 2)); - - // If EAGER_CANCEL is enabled, immediately abort the stream client-side - // This provides instant feedback without waiting for backend to acknowledge - if (EAGER_CANCEL && abortControllerRef.current) { + // If EAGER_CANCEL is enabled, immediately stop everything client-side first + if (EAGER_CANCEL) { + // Abort the stream via abort signal + if (abortControllerRef.current) { abortControllerRef.current.abort(); } - } catch (e) { - appendError(`Failed to interrupt stream: ${String(e)}`); - setInterruptRequested(false); + + // Set cancellation flag to prevent processConversation from starting + userCancelledRef.current = true; + + // Stop streaming and show error message + setStreaming(false); + markIncompleteToolsAsCancelled(buffersRef.current); + appendError("Stream interrupted by user"); + refreshDerived(); + + // Send cancel request to backend asynchronously (fire-and-forget) + // Don't wait for it or show errors since user already got feedback + getClient() + .then((client) => client.agents.messages.cancel(agentId)) + .catch(() => { + // Silently ignore - cancellation already happened client-side + }); + + return; + } else { + setInterruptRequested(true); + try { + const client = await getClient(); + await client.agents.messages.cancel(agentId); + + if (abortControllerRef.current) { + abortControllerRef.current.abort(); + } + } catch (e) { + appendError(`Failed to interrupt stream: ${String(e)}`); + setInterruptRequested(false); + } } - }, [agentId, streaming, interruptRequested, appendError, isExecutingTool]); + }, [ + agentId, + streaming, + interruptRequested, + appendError, + isExecutingTool, + refreshDerived, + ]); // Reset interrupt flag when streaming ends useEffect(() => { @@ -2876,7 +2933,9 @@ Plan file path: ${planFilePath}`; !systemPromptSelectorOpen && !agentSelectorOpen } - streaming={streaming} + streaming={ + streaming && !abortControllerRef.current?.signal.aborted + } commandRunning={commandRunning} tokenCount={tokenCount} thinkingMessage={thinkingMessage} diff --git a/src/cli/helpers/stream.ts b/src/cli/helpers/stream.ts index 67d4bd7..68f98fa 100644 --- a/src/cli/helpers/stream.ts +++ b/src/cli/helpers/stream.ts @@ -53,7 +53,6 @@ export async function drainStream( // Check if stream was aborted if (abortSignal?.aborted) { stopReason = "cancelled"; - // Mark incomplete tool calls as cancelled to prevent stuck blinking UI markIncompleteToolsAsCancelled(buffers); queueMicrotask(refresh); break; @@ -130,6 +129,14 @@ export async function drainStream( onChunk(buffers, chunk); queueMicrotask(refresh); + // Check abort signal again after processing chunk (for eager cancellation) + if (abortSignal?.aborted) { + stopReason = "cancelled"; + markIncompleteToolsAsCancelled(buffers); + queueMicrotask(refresh); + break; + } + if (chunk.message_type === "stop_reason") { stopReason = chunk.stop_reason; // Continue reading stream to get usage_statistics that may come after