From 3e9202c7b32014647b5229ce68949c13fe1d874c Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Sun, 26 Oct 2025 17:30:58 -0700 Subject: [PATCH] fix: add esc to cancel (#10) --- src/cli/App.tsx | 120 ++++++++++++++++++++++++++++--- src/cli/components/InputRich.tsx | 62 ++++++++++------ src/cli/helpers/accumulator.ts | 20 ++++++ src/cli/helpers/stream.ts | 10 +++ src/tools/manager.ts | 4 +- 5 files changed, 186 insertions(+), 30 deletions(-) diff --git a/src/cli/App.tsx b/src/cli/App.tsx index b354d93..e9a729f 100644 --- a/src/cli/App.tsx +++ b/src/cli/App.tsx @@ -3,6 +3,8 @@ import { Letta } from "@letta-ai/letta-client"; import { Box, Static } from "ink"; import { useCallback, useEffect, useMemo, useRef, useState } from "react"; +import { getResumeData } from "../agent/check-approval"; +import { getClient } from "../agent/client"; import { sendMessageStream } from "../agent/message"; import { SessionStats } from "../agent/stats"; import type { ApprovalContext } from "../permissions/analyzer"; @@ -50,6 +52,11 @@ import { useTerminalWidth } from "./hooks/useTerminalWidth"; const CLEAR_SCREEN_AND_HOME = "\u001B[2J\u001B[H"; +// Feature flag: Check for pending approvals before sending messages +// This prevents infinite thinking state when there's an orphaned approval +// Can be disabled if the latency check adds too much overhead +const CHECK_PENDING_APPROVALS_BEFORE_SEND = true; + // tiny helper for unique ids (avoid overwriting prior user lines) function uid(prefix: string) { return `${prefix}-${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 8)}`; @@ -98,6 +105,9 @@ export default function App({ // Whether a stream is in flight (disables input) const [streaming, setStreaming] = useState(false); + // Whether an interrupt has been requested for the current stream + const [interruptRequested, setInterruptRequested] = useState(false); + // Whether a command is running (disables input but no streaming UI) const [commandRunning, setCommandRunning] = useState(false); @@ -145,6 +155,9 @@ export default function App({ // Guard to append welcome snapshot only once const welcomeCommittedRef = useRef(false); + // AbortController for stream cancellation + const abortControllerRef = useRef(null); + // Track terminal shrink events to refresh static output (prevents wrapped leftovers) const columns = useTerminalWidth(); const prevColumnsRef = useRef(columns); @@ -357,6 +370,7 @@ export default function App({ try { setStreaming(true); + abortControllerRef.current = new AbortController(); while (true) { // Stream one turn @@ -365,6 +379,7 @@ export default function App({ stream, buffersRef.current, refreshDerivedThrottled, + abortControllerRef.current.signal, ); // Track API duration @@ -380,6 +395,13 @@ export default function App({ return; } + // Case 1.5: Stream was cancelled by user + if (stopReason === "cancelled") { + appendError("Stream interrupted by user"); + setStreaming(false); + return; + } + // Case 2: Requires approval if (stopReason === Letta.StopReasonType.RequiresApproval) { if (!approval) { @@ -487,6 +509,8 @@ export default function App({ } catch (e) { appendError(String(e)); setStreaming(false); + } finally { + abortControllerRef.current = null; } }, [agentId, appendError, refreshDerived, refreshDerivedThrottled], @@ -500,17 +524,55 @@ export default function App({ }, 100); }, []); + const handleInterrupt = useCallback(async () => { + if (!streaming || interruptRequested) return; + + setInterruptRequested(true); + try { + const client = getClient(); + + // Send cancel request to backend + await client.agents.messages.cancel(agentId); + + // WORKAROUND: Also abort the stream immediately since backend cancellation is buggy + // TODO: Once backend is fixed, comment out the immediate abort below and uncomment the timeout version + if (abortControllerRef.current) { + abortControllerRef.current.abort(); + } + + // FUTURE: Use this timeout-based abort once backend properly sends "cancelled" stop reason + // This gives the backend 5 seconds to gracefully close the stream before forcing abort + // const abortTimeout = setTimeout(() => { + // if (abortControllerRef.current) { + // abortControllerRef.current.abort(); + // } + // }, 5000); + // + // // The timeout will be cleared in processConversation's finally block when stream ends + } catch (e) { + appendError(`Failed to interrupt stream: ${String(e)}`); + setInterruptRequested(false); + } + }, [agentId, streaming, interruptRequested, appendError]); + + // Reset interrupt flag when streaming ends + useEffect(() => { + if (!streaming) { + setInterruptRequested(false); + } + }, [streaming]); + const onSubmit = useCallback( - async (message?: string) => { + async (message?: string): Promise<{ submitted: boolean }> => { const msg = message?.trim() ?? ""; - if (!msg || streaming || commandRunning) return; + if (!msg || streaming || commandRunning) return { submitted: false }; // Handle commands (messages starting with "/") if (msg.startsWith("/")) { // Special handling for /model command - opens selector if (msg.trim() === "/model") { setModelSelectorOpen(true); - return; + return { submitted: true }; } // Special handling for /agent command - show agent link @@ -527,13 +589,13 @@ export default function App({ }); buffersRef.current.order.push(cmdId); refreshDerived(); - return; + return { submitted: true }; } // Special handling for /exit command - show stats and exit if (msg.trim() === "/exit") { handleExit(); - return; + return { submitted: true }; } // Special handling for /stream command - toggle and save @@ -587,7 +649,7 @@ export default function App({ // Unlock input setCommandRunning(false); } - return; + return { submitted: true }; } // Immediately add command to transcript with "running" phase @@ -634,7 +696,7 @@ export default function App({ // Unlock input setCommandRunning(false); } - return; // Don't send commands to Letta agent + return { submitted: true }; // Don't send commands to Letta agent } // Build message content from display value (handles placeholders for text/images) @@ -652,7 +714,7 @@ export default function App({ ] : contentParts; - // Append the user message to transcript (keep placeholders as-is for display) + // Append the user message to transcript IMMEDIATELY (optimistic update) const userId = uid("user"); buffersRef.current.byId.set(userId, { kind: "user", @@ -665,8 +727,46 @@ export default function App({ buffersRef.current.tokenCount = 0; // Rotate to a new thinking message for this turn setThinkingMessage(getRandomThinkingMessage()); + // Show streaming state immediately for responsiveness + setStreaming(true); refreshDerived(); + // Check for pending approvals before sending message + if (CHECK_PENDING_APPROVALS_BEFORE_SEND) { + try { + const client = getClient(); + const { pendingApproval: existingApproval } = await getResumeData( + client, + agentId, + ); + + if (existingApproval) { + // There's a pending approval - show it and DON'T send the message yet + // The message will be restored to the input field for the user to decide + // Note: The user message is already in the transcript (optimistic update) + setStreaming(false); // Stop streaming indicator + setPendingApproval(existingApproval); + + // Analyze approval context + const parsedArgs = safeJsonParseOr>( + existingApproval.toolArgs, + {}, + ); + const context = await analyzeToolApproval( + existingApproval.toolName, + parsedArgs, + ); + setApprovalContext(context); + + // Return false = message NOT submitted, will be restored to input + return { submitted: false }; + } + } catch (error) { + // If check fails, proceed anyway (don't block user) + console.error("Failed to check pending approvals:", error); + } + } + // Start the conversation loop await processConversation([ { @@ -677,6 +777,8 @@ export default function App({ // Clean up placeholders after submission clearPlaceholdersInText(msg); + + return { submitted: true }; }, [ streaming, @@ -1114,6 +1216,8 @@ export default function App({ permissionMode={uiPermissionMode} onPermissionModeChange={setUiPermissionMode} onExit={handleExit} + onInterrupt={handleInterrupt} + interruptRequested={interruptRequested} /> {/* Model Selector - conditionally mounted as overlay */} diff --git a/src/cli/components/InputRich.tsx b/src/cli/components/InputRich.tsx index 7d44c4e..6af407a 100644 --- a/src/cli/components/InputRich.tsx +++ b/src/cli/components/InputRich.tsx @@ -12,7 +12,7 @@ import { PasteAwareTextInput } from "./PasteAwareTextInput"; import { ShimmerText } from "./ShimmerText"; // Type assertion for ink-spinner compatibility -const Spinner = SpinnerLib as ComponentType; +const Spinner = SpinnerLib as ComponentType<{ type?: string }>; // Only show token count when it exceeds this threshold const COUNTER_VISIBLE_THRESHOLD = 1000; @@ -27,16 +27,20 @@ export function Input({ permissionMode: externalMode, onPermissionModeChange, onExit, + onInterrupt, + interruptRequested = false, }: { visible?: boolean; streaming: boolean; commandRunning?: boolean; tokenCount: number; thinkingMessage: string; - onSubmit: (message?: string) => void; + onSubmit: (message?: string) => Promise<{ submitted: boolean }>; permissionMode?: PermissionMode; onPermissionModeChange?: (mode: PermissionMode) => void; onExit?: () => void; + onInterrupt?: () => void; + interruptRequested?: boolean; }) { const [value, setValue] = useState(""); const [escapePressed, setEscapePressed] = useState(false); @@ -62,22 +66,30 @@ export function Input({ const columns = useTerminalWidth(); const contentWidth = Math.max(0, columns - 2); - // Handle escape key for double-escape-to-clear + // Handle escape key for interrupt (when streaming) or double-escape-to-clear (when not) useInput((_input, key) => { - if (key.escape && value) { - // Only work when input is non-empty - if (escapePressed) { - // Second escape - clear input - setValue(""); - setEscapePressed(false); - if (escapeTimerRef.current) clearTimeout(escapeTimerRef.current); - } else { - // First escape - start 1-second timer - setEscapePressed(true); - if (escapeTimerRef.current) clearTimeout(escapeTimerRef.current); - escapeTimerRef.current = setTimeout(() => { + if (key.escape) { + // When streaming, use Esc to interrupt + if (streaming && onInterrupt && !interruptRequested) { + onInterrupt(); + return; + } + + // When input is non-empty, use double-escape to clear + if (value) { + if (escapePressed) { + // Second escape - clear input + setValue(""); setEscapePressed(false); - }, 1000); + if (escapeTimerRef.current) clearTimeout(escapeTimerRef.current); + } else { + // First escape - start 1-second timer + setEscapePressed(true); + if (escapeTimerRef.current) clearTimeout(escapeTimerRef.current); + escapeTimerRef.current = setTimeout(() => { + setEscapePressed(false); + }, 1000); + } } } }); @@ -159,12 +171,17 @@ export function Input({ return () => clearInterval(id); }, [streaming, thinkingMessage]); - const handleSubmit = () => { + const handleSubmit = async () => { if (streaming || commandRunning) { return; } - onSubmit(value); - setValue(""); + const previousValue = value; + setValue(""); // Clear immediately for responsiveness + const result = await onSubmit(previousValue); + // If message was NOT submitted (e.g. pending approval), restore it + if (!result.submitted) { + setValue(previousValue); + } }; // Get display name and color for permission mode @@ -212,7 +229,12 @@ export function Input({ message={thinkingMessage} shimmerOffset={shimmerOffset} /> - {shouldShowTokenCount && ({tokenCount} โ†‘)} + + {" ("} + {interruptRequested ? "interrupting" : "esc to interrupt"} + {shouldShowTokenCount && ` ยท ${tokenCount} โ†‘`} + {")"} + )} diff --git a/src/cli/helpers/accumulator.ts b/src/cli/helpers/accumulator.ts index 8e2f7f2..584a4ff 100644 --- a/src/cli/helpers/accumulator.ts +++ b/src/cli/helpers/accumulator.ts @@ -150,6 +150,26 @@ export function markCurrentLineAsFinished(b: Buffers) { } } +/** + * Mark any incomplete tool calls as cancelled when stream is interrupted. + * This prevents blinking tool calls from staying in progress state. + */ +export function markIncompleteToolsAsCancelled(b: Buffers) { + for (const [id, line] of b.byId.entries()) { + if (line.kind === "tool_call" && line.phase !== "finished") { + const updatedLine = { + ...line, + phase: "finished" as const, + resultOk: false, + resultText: "Interrupted by user", + }; + b.byId.set(id, updatedLine); + } + } + // Also mark any streaming assistant/reasoning lines as finished + markCurrentLineAsFinished(b); +} + type ToolCallLine = Extract; // Flatten common SDK "parts" โ†’ text diff --git a/src/cli/helpers/stream.ts b/src/cli/helpers/stream.ts index 73efacf..e59a9c4 100644 --- a/src/cli/helpers/stream.ts +++ b/src/cli/helpers/stream.ts @@ -2,6 +2,7 @@ import { Letta } from "@letta-ai/letta-client"; import { type createBuffers, markCurrentLineAsFinished, + markIncompleteToolsAsCancelled, onChunk, } from "./accumulator"; @@ -23,6 +24,7 @@ export async function drainStream( stream: AsyncIterable, buffers: ReturnType, refresh: () => void, + abortSignal?: AbortSignal, ): Promise { const startTime = performance.now(); @@ -36,6 +38,14 @@ export async function drainStream( let lastSeqId: number | null = null; for await (const chunk of stream) { + // Check if stream was aborted + if (abortSignal?.aborted) { + stopReason = "cancelled" as Letta.StopReasonType; + // Mark incomplete tool calls as cancelled to prevent stuck blinking UI + markIncompleteToolsAsCancelled(buffers); + queueMicrotask(refresh); + break; + } // Store the runId and seqId to re-connect if stream is interrupted if ("runId" in chunk && "seqId" in chunk && chunk.runId && chunk.seqId) { lastRunId = chunk.runId; diff --git a/src/tools/manager.ts b/src/tools/manager.ts index 3f01bc1..c3742b2 100644 --- a/src/tools/manager.ts +++ b/src/tools/manager.ts @@ -98,7 +98,7 @@ function generatePythonStub( * @returns Tool permissions object with requiresApproval flag */ export function getToolPermissions(toolName: string) { - return TOOL_PERMISSIONS[toolName] || { requiresApproval: false }; + return TOOL_PERMISSIONS[toolName as ToolName] || { requiresApproval: false }; } /** @@ -108,7 +108,7 @@ export function getToolPermissions(toolName: string) { * @deprecated Use checkToolPermission instead for full permission system support */ export function requiresApproval(toolName: string): boolean { - return TOOL_PERMISSIONS[toolName]?.requiresApproval ?? false; + return TOOL_PERMISSIONS[toolName as ToolName]?.requiresApproval ?? false; } /**