fix: add esc to cancel (#10)

This commit is contained in:
Charles Packer
2025-10-26 17:30:58 -07:00
committed by GitHub
parent 66e77c2035
commit 3e9202c7b3
5 changed files with 186 additions and 30 deletions

View File

@@ -3,6 +3,8 @@
import { Letta } from "@letta-ai/letta-client";
import { Box, Static } from "ink";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { getResumeData } from "../agent/check-approval";
import { getClient } from "../agent/client";
import { sendMessageStream } from "../agent/message";
import { SessionStats } from "../agent/stats";
import type { ApprovalContext } from "../permissions/analyzer";
@@ -50,6 +52,11 @@ import { useTerminalWidth } from "./hooks/useTerminalWidth";
const CLEAR_SCREEN_AND_HOME = "\u001B[2J\u001B[H";
// Feature flag: Check for pending approvals before sending messages
// This prevents infinite thinking state when there's an orphaned approval
// Can be disabled if the latency check adds too much overhead
const CHECK_PENDING_APPROVALS_BEFORE_SEND = true;
// tiny helper for unique ids (avoid overwriting prior user lines)
function uid(prefix: string) {
return `${prefix}-${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 8)}`;
@@ -98,6 +105,9 @@ export default function App({
// Whether a stream is in flight (disables input)
const [streaming, setStreaming] = useState(false);
// Whether an interrupt has been requested for the current stream
const [interruptRequested, setInterruptRequested] = useState(false);
// Whether a command is running (disables input but no streaming UI)
const [commandRunning, setCommandRunning] = useState(false);
@@ -145,6 +155,9 @@ export default function App({
// Guard to append welcome snapshot only once
const welcomeCommittedRef = useRef(false);
// AbortController for stream cancellation
const abortControllerRef = useRef<AbortController | null>(null);
// Track terminal shrink events to refresh static output (prevents wrapped leftovers)
const columns = useTerminalWidth();
const prevColumnsRef = useRef(columns);
@@ -357,6 +370,7 @@ export default function App({
try {
setStreaming(true);
abortControllerRef.current = new AbortController();
while (true) {
// Stream one turn
@@ -365,6 +379,7 @@ export default function App({
stream,
buffersRef.current,
refreshDerivedThrottled,
abortControllerRef.current.signal,
);
// Track API duration
@@ -380,6 +395,13 @@ export default function App({
return;
}
// Case 1.5: Stream was cancelled by user
if (stopReason === "cancelled") {
appendError("Stream interrupted by user");
setStreaming(false);
return;
}
// Case 2: Requires approval
if (stopReason === Letta.StopReasonType.RequiresApproval) {
if (!approval) {
@@ -487,6 +509,8 @@ export default function App({
} catch (e) {
appendError(String(e));
setStreaming(false);
} finally {
abortControllerRef.current = null;
}
},
[agentId, appendError, refreshDerived, refreshDerivedThrottled],
@@ -500,17 +524,55 @@ export default function App({
}, 100);
}, []);
const handleInterrupt = useCallback(async () => {
if (!streaming || interruptRequested) return;
setInterruptRequested(true);
try {
const client = getClient();
// Send cancel request to backend
await client.agents.messages.cancel(agentId);
// WORKAROUND: Also abort the stream immediately since backend cancellation is buggy
// TODO: Once backend is fixed, comment out the immediate abort below and uncomment the timeout version
if (abortControllerRef.current) {
abortControllerRef.current.abort();
}
// FUTURE: Use this timeout-based abort once backend properly sends "cancelled" stop reason
// This gives the backend 5 seconds to gracefully close the stream before forcing abort
// const abortTimeout = setTimeout(() => {
// if (abortControllerRef.current) {
// abortControllerRef.current.abort();
// }
// }, 5000);
//
// // The timeout will be cleared in processConversation's finally block when stream ends
} catch (e) {
appendError(`Failed to interrupt stream: ${String(e)}`);
setInterruptRequested(false);
}
}, [agentId, streaming, interruptRequested, appendError]);
// Once the stream is over, clear the interrupt-requested flag so the
// next stream can be interrupted again.
useEffect(() => {
if (streaming) return;
setInterruptRequested(false);
}, [streaming]);
const onSubmit = useCallback(
async (message?: string) => {
async (message?: string): Promise<{ submitted: boolean }> => {
const msg = message?.trim() ?? "";
if (!msg || streaming || commandRunning) return;
if (!msg || streaming || commandRunning) return { submitted: false };
// Handle commands (messages starting with "/")
if (msg.startsWith("/")) {
// Special handling for /model command - opens selector
if (msg.trim() === "/model") {
setModelSelectorOpen(true);
return;
return { submitted: true };
}
// Special handling for /agent command - show agent link
@@ -527,13 +589,13 @@ export default function App({
});
buffersRef.current.order.push(cmdId);
refreshDerived();
return;
return { submitted: true };
}
// Special handling for /exit command - show stats and exit
if (msg.trim() === "/exit") {
handleExit();
return;
return { submitted: true };
}
// Special handling for /stream command - toggle and save
@@ -587,7 +649,7 @@ export default function App({
// Unlock input
setCommandRunning(false);
}
return;
return { submitted: true };
}
// Immediately add command to transcript with "running" phase
@@ -634,7 +696,7 @@ export default function App({
// Unlock input
setCommandRunning(false);
}
return; // Don't send commands to Letta agent
return { submitted: true }; // Don't send commands to Letta agent
}
// Build message content from display value (handles placeholders for text/images)
@@ -652,7 +714,7 @@ export default function App({
]
: contentParts;
// Append the user message to transcript (keep placeholders as-is for display)
// Append the user message to transcript IMMEDIATELY (optimistic update)
const userId = uid("user");
buffersRef.current.byId.set(userId, {
kind: "user",
@@ -665,8 +727,46 @@ export default function App({
buffersRef.current.tokenCount = 0;
// Rotate to a new thinking message for this turn
setThinkingMessage(getRandomThinkingMessage());
// Show streaming state immediately for responsiveness
setStreaming(true);
refreshDerived();
// Check for pending approvals before sending message
if (CHECK_PENDING_APPROVALS_BEFORE_SEND) {
try {
const client = getClient();
const { pendingApproval: existingApproval } = await getResumeData(
client,
agentId,
);
if (existingApproval) {
// There's a pending approval - show it and DON'T send the message yet
// The message will be restored to the input field for the user to decide
// Note: The user message is already in the transcript (optimistic update)
setStreaming(false); // Stop streaming indicator
setPendingApproval(existingApproval);
// Analyze approval context
const parsedArgs = safeJsonParseOr<Record<string, unknown>>(
existingApproval.toolArgs,
{},
);
const context = await analyzeToolApproval(
existingApproval.toolName,
parsedArgs,
);
setApprovalContext(context);
// Return false = message NOT submitted, will be restored to input
return { submitted: false };
}
} catch (error) {
// If check fails, proceed anyway (don't block user)
console.error("Failed to check pending approvals:", error);
}
}
// Start the conversation loop
await processConversation([
{
@@ -677,6 +777,8 @@ export default function App({
// Clean up placeholders after submission
clearPlaceholdersInText(msg);
return { submitted: true };
},
[
streaming,
@@ -1114,6 +1216,8 @@ export default function App({
permissionMode={uiPermissionMode}
onPermissionModeChange={setUiPermissionMode}
onExit={handleExit}
onInterrupt={handleInterrupt}
interruptRequested={interruptRequested}
/>
{/* Model Selector - conditionally mounted as overlay */}

View File

@@ -12,7 +12,7 @@ import { PasteAwareTextInput } from "./PasteAwareTextInput";
import { ShimmerText } from "./ShimmerText";
// Type assertion for ink-spinner compatibility
const Spinner = SpinnerLib as ComponentType;
const Spinner = SpinnerLib as ComponentType<{ type?: string }>;
// Only show token count when it exceeds this threshold
const COUNTER_VISIBLE_THRESHOLD = 1000;
@@ -27,16 +27,20 @@ export function Input({
permissionMode: externalMode,
onPermissionModeChange,
onExit,
onInterrupt,
interruptRequested = false,
}: {
visible?: boolean;
streaming: boolean;
commandRunning?: boolean;
tokenCount: number;
thinkingMessage: string;
onSubmit: (message?: string) => void;
onSubmit: (message?: string) => Promise<{ submitted: boolean }>;
permissionMode?: PermissionMode;
onPermissionModeChange?: (mode: PermissionMode) => void;
onExit?: () => void;
onInterrupt?: () => void;
interruptRequested?: boolean;
}) {
const [value, setValue] = useState("");
const [escapePressed, setEscapePressed] = useState(false);
@@ -62,22 +66,30 @@ export function Input({
const columns = useTerminalWidth();
const contentWidth = Math.max(0, columns - 2);
// Handle escape key for double-escape-to-clear
// Handle escape key for interrupt (when streaming) or double-escape-to-clear (when not)
useInput((_input, key) => {
if (key.escape && value) {
// Only work when input is non-empty
if (escapePressed) {
// Second escape - clear input
setValue("");
setEscapePressed(false);
if (escapeTimerRef.current) clearTimeout(escapeTimerRef.current);
} else {
// First escape - start 1-second timer
setEscapePressed(true);
if (escapeTimerRef.current) clearTimeout(escapeTimerRef.current);
escapeTimerRef.current = setTimeout(() => {
if (key.escape) {
// When streaming, use Esc to interrupt
if (streaming && onInterrupt && !interruptRequested) {
onInterrupt();
return;
}
// When input is non-empty, use double-escape to clear
if (value) {
if (escapePressed) {
// Second escape - clear input
setValue("");
setEscapePressed(false);
}, 1000);
if (escapeTimerRef.current) clearTimeout(escapeTimerRef.current);
} else {
// First escape - start 1-second timer
setEscapePressed(true);
if (escapeTimerRef.current) clearTimeout(escapeTimerRef.current);
escapeTimerRef.current = setTimeout(() => {
setEscapePressed(false);
}, 1000);
}
}
}
});
@@ -159,12 +171,17 @@ export function Input({
return () => clearInterval(id);
}, [streaming, thinkingMessage]);
const handleSubmit = () => {
const handleSubmit = async () => {
if (streaming || commandRunning) {
return;
}
onSubmit(value);
setValue("");
const previousValue = value;
setValue(""); // Clear immediately for responsiveness
const result = await onSubmit(previousValue);
// If message was NOT submitted (e.g. pending approval), restore it
if (!result.submitted) {
setValue(previousValue);
}
};
// Get display name and color for permission mode
@@ -212,7 +229,12 @@ export function Input({
message={thinkingMessage}
shimmerOffset={shimmerOffset}
/>
{shouldShowTokenCount && <Text dimColor> ({tokenCount} )</Text>}
<Text dimColor>
{" ("}
{interruptRequested ? "interrupting" : "esc to interrupt"}
{shouldShowTokenCount && ` · ${tokenCount}`}
{")"}
</Text>
</Box>
</Box>
)}

View File

@@ -150,6 +150,26 @@ export function markCurrentLineAsFinished(b: Buffers) {
}
}
/**
 * When the stream is interrupted, flag every tool call that has not yet
 * finished as failed ("Interrupted by user") so the UI stops rendering it
 * in a blinking in-progress state, then close out any text lines that were
 * still streaming.
 */
export function markIncompleteToolsAsCancelled(b: Buffers) {
for (const [id, line] of b.byId.entries()) {
// Skip everything except tool calls that are still in flight.
if (line.kind !== "tool_call" || line.phase === "finished") continue;
b.byId.set(id, {
...line,
phase: "finished" as const,
resultOk: false,
resultText: "Interrupted by user",
});
}
// Also mark any streaming assistant/reasoning lines as finished
markCurrentLineAsFinished(b);
}
type ToolCallLine = Extract<Line, { kind: "tool_call" }>;
// Flatten common SDK "parts" → text

View File

@@ -2,6 +2,7 @@ import { Letta } from "@letta-ai/letta-client";
import {
type createBuffers,
markCurrentLineAsFinished,
markIncompleteToolsAsCancelled,
onChunk,
} from "./accumulator";
@@ -23,6 +24,7 @@ export async function drainStream(
stream: AsyncIterable<Letta.LettaStreamingResponse>,
buffers: ReturnType<typeof createBuffers>,
refresh: () => void,
abortSignal?: AbortSignal,
): Promise<DrainResult> {
const startTime = performance.now();
@@ -36,6 +38,14 @@ export async function drainStream(
let lastSeqId: number | null = null;
for await (const chunk of stream) {
// Check if stream was aborted
if (abortSignal?.aborted) {
stopReason = "cancelled" as Letta.StopReasonType;
// Mark incomplete tool calls as cancelled to prevent stuck blinking UI
markIncompleteToolsAsCancelled(buffers);
queueMicrotask(refresh);
break;
}
// Store the runId and seqId to re-connect if stream is interrupted
if ("runId" in chunk && "seqId" in chunk && chunk.runId && chunk.seqId) {
lastRunId = chunk.runId;

View File

@@ -98,7 +98,7 @@ function generatePythonStub(
* @returns Tool permissions object with requiresApproval flag
*/
export function getToolPermissions(toolName: string) {
return TOOL_PERMISSIONS[toolName] || { requiresApproval: false };
return TOOL_PERMISSIONS[toolName as ToolName] || { requiresApproval: false };
}
/**
@@ -108,7 +108,7 @@ export function getToolPermissions(toolName: string) {
* @deprecated Use checkToolPermission instead for full permission system support
*/
export function requiresApproval(toolName: string): boolean {
return TOOL_PERMISSIONS[toolName]?.requiresApproval ?? false;
return TOOL_PERMISSIONS[toolName as ToolName]?.requiresApproval ?? false;
}
/**