From c3e82a4c89bbbbd7614898c13db443a24c7ce734 Mon Sep 17 00:00:00 2001 From: Christina Tong Date: Thu, 11 Dec 2025 20:06:20 -0800 Subject: [PATCH] feat: update message queueing silent cancel and clean up eager cancellation state (#184) --- src/cli/App.tsx | 159 ++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 142 insertions(+), 17 deletions(-) diff --git a/src/cli/App.tsx b/src/cli/App.tsx index 76ac42c..2c31e47 100644 --- a/src/cli/App.tsx +++ b/src/cli/App.tsx @@ -367,6 +367,15 @@ export default function App({ // Message queue state for queueing messages during streaming const [messageQueue, setMessageQueue] = useState([]); + // Queue cancellation: when queue length > 1, we send cancel and wait for natural stream end + const waitingForQueueCancelRef = useRef(false); + const queueSnapshotRef = useRef([]); + const [restoreQueueOnCancel, setRestoreQueueOnCancel] = useState(false); + const restoreQueueOnCancelRef = useRef(restoreQueueOnCancel); + useEffect(() => { + restoreQueueOnCancelRef.current = restoreQueueOnCancel; + }, [restoreQueueOnCancel]); + // Track terminal shrink events to refresh static output (prevents wrapped leftovers) const columns = useTerminalWidth(); const prevColumnsRef = useRef(columns); @@ -392,16 +401,13 @@ export default function App({ // Commit immutable/finished lines into the historical log const commitEligibleLines = useCallback((b: Buffers) => { const newlyCommitted: StaticItem[] = []; - // console.log(`[COMMIT] Checking ${b.order.length} lines for commit eligibility`); for (const id of b.order) { if (emittedIdsRef.current.has(id)) continue; const ln = b.byId.get(id); if (!ln) continue; - // console.log(`[COMMIT] Checking ${id}: kind=${ln.kind}, phase=${(ln as any).phase}`); if (ln.kind === "user" || ln.kind === "error" || ln.kind === "status") { emittedIdsRef.current.add(id); newlyCommitted.push({ ...ln }); - // console.log(`[COMMIT] Committed ${id} (${ln.kind})`); continue; } // Commands with phase should only commit when finished @@ -409,20 +415,15 @@ export default function App({ if (!ln.phase || ln.phase === "finished") { emittedIdsRef.current.add(id); newlyCommitted.push({ ...ln }); - // console.log(`[COMMIT] Committed ${id} (command, finished)`); } continue; } if ("phase" in ln && ln.phase === "finished") { emittedIdsRef.current.add(id); newlyCommitted.push({ ...ln }); - // console.log(`[COMMIT] Committed ${id} (${ln.kind}, finished)`); - } else { - // console.log(`[COMMIT] NOT committing ${id} (phase=${(ln as any).phase})`); } } if (newlyCommitted.length > 0) { - // console.log(`[COMMIT] Total committed: ${newlyCommitted.length} items`); setStaticItems((prev) => [...prev, ...newlyCommitted]); } }, []); @@ -647,16 +648,67 @@ export default function App({ // Case 1: Turn ended normally if (stopReason === "end_turn") { setStreaming(false); + + // Check if we were waiting for cancel but stream finished naturally + if (waitingForQueueCancelRef.current) { + if (restoreQueueOnCancelRef.current) { + // User hit ESC during queue cancel - abort the auto-send + setRestoreQueueOnCancel(false); + // Don't clear queue, don't send - let dequeue effect handle them one by one + } else { + // Auto-send concatenated message + // Clear the queue + setMessageQueue([]); + + // Concatenate the snapshot + const concatenatedMessage = queueSnapshotRef.current.join("\n"); + + if (concatenatedMessage.trim()) { + onSubmitRef.current(concatenatedMessage); + } + } + + // Reset flags + waitingForQueueCancelRef.current = false; + queueSnapshotRef.current = []; + } + return; } // Case 1.5: Stream was cancelled by user if (stopReason === "cancelled") { - // Only show error if not using eager cancel (eager cancel already handled this) - if (!EAGER_CANCEL) { - appendError("Stream interrupted by user"); - } setStreaming(false); + + // Check if this cancel was triggered by queue threshold + if (waitingForQueueCancelRef.current) { + if (restoreQueueOnCancelRef.current) { + // User hit ESC during queue cancel - abort the auto-send + setRestoreQueueOnCancel(false); + // Don't clear queue, don't send - let dequeue effect handle them one by one + } else { + // Auto-send concatenated message + // Clear the queue + setMessageQueue([]); + + // Concatenate the snapshot + const concatenatedMessage = queueSnapshotRef.current.join("\n"); + + if (concatenatedMessage.trim()) { + onSubmitRef.current(concatenatedMessage); + } + } + + // Reset flags + waitingForQueueCancelRef.current = false; + queueSnapshotRef.current = []; + } else { + // Regular user cancellation - show error + if (!EAGER_CANCEL) { + appendError("Stream interrupted by user"); + } + } + return; } @@ -954,6 +1006,12 @@ export default function App({ if (!streaming || interruptRequested) return; + // If we're in the middle of queue cancel, set flag to restore instead of auto-send + if (waitingForQueueCancelRef.current) { + setRestoreQueueOnCancel(true); + // Don't reset flags - let the cancel complete naturally + } + // If EAGER_CANCEL is enabled, immediately stop everything client-side first if (EAGER_CANCEL) { // Abort the stream via abort signal @@ -970,6 +1028,13 @@ export default function App({ appendError("Stream interrupted by user"); refreshDerived(); + // Clear any pending approvals since we're cancelling + setPendingApprovals([]); + setApprovalContexts([]); + setApprovalResults([]); + setAutoHandledResults([]); + setAutoDeniedApprovals([]); + // Send cancel request to backend asynchronously (fire-and-forget) // Don't wait for it or show errors since user already got feedback getClient() @@ -1032,7 +1097,28 @@ export default function App({ const agentBusy = streaming || isExecutingTool || commandRunning; if (agentBusy) { - setMessageQueue((prev) => [...prev, msg]); + setMessageQueue((prev) => { + const newQueue = [...prev, msg]; + + // If queue grows to 2+ messages and we're not already waiting for cancel, + // send cancel request and capture snapshot + if (newQueue.length > 1 && !waitingForQueueCancelRef.current) { + // Capture snapshot of queue right now + queueSnapshotRef.current = [...newQueue]; + waitingForQueueCancelRef.current = true; + + // Send cancel request to backend (fire-and-forget) + getClient() + .then((client) => client.agents.messages.cancel(agentId)) + .then(() => {}) + .catch(() => { + // Reset flag if cancel fails + waitingForQueueCancelRef.current = false; + }); + } + + return newQueue; + }); return { submitted: true }; // Clears input } @@ -1971,10 +2057,33 @@ ${recentCommits} agent, ); + // Check if user cancelled while we were fetching approval state + if ( + userCancelledRef.current || + abortControllerRef.current?.signal.aborted + ) { + // User hit ESC during the check - abort and clean up + buffersRef.current.byId.delete(userId); + const orderIndex = buffersRef.current.order.indexOf(userId); + if (orderIndex !== -1) { + buffersRef.current.order.splice(orderIndex, 1); + } + setStreaming(false); + refreshDerived(); + return { submitted: false }; + } + if (existingApprovals && existingApprovals.length > 0) { // There are pending approvals - show them and DON'T send the message yet // The message will be restored to the input field for the user to decide - // Note: The user message is already in the transcript (optimistic update) + + // Remove the optimistic user message from transcript to avoid duplication + buffersRef.current.byId.delete(userId); + const orderIndex = buffersRef.current.order.indexOf(userId); + if (orderIndex !== -1) { + buffersRef.current.order.splice(orderIndex, 1); + } + setStreaming(false); // Stop streaming indicator setPendingApprovals(existingApprovals); @@ -1988,14 +2097,28 @@ ${recentCommits} return await analyzeToolApproval(approval.toolName, parsedArgs); }), ); + + // Check again after async approval analysis + if ( + userCancelledRef.current || + abortControllerRef.current?.signal.aborted + ) { + // User cancelled during analysis - don't show dialog + setStreaming(false); + refreshDerived(); + return { submitted: false }; + } + setApprovalContexts(contexts); + // Refresh to remove the message from UI + refreshDerived(); + // Return false = message NOT submitted, will be restored to input return { submitted: false }; } - } catch (error) { + } catch (_error) { // If check fails, proceed anyway (don't block user) - console.error("Failed to check pending approvals:", error); } } @@ -2052,7 +2175,9 @@ ${recentCommits} messageQueue.length > 0 && pendingApprovals.length === 0 && !commandRunning && - !isExecutingTool + !isExecutingTool && + !waitingForQueueCancelRef.current && // Don't dequeue while waiting for cancel + !userCancelledRef.current // Don't dequeue if user just cancelled ) { const [firstMessage, ...rest] = messageQueue; setMessageQueue(rest);