fix: optimistic cancellation [LET-6273] (#166)

This commit is contained in:
Christina Tong
2025-12-08 23:46:18 -08:00
committed by GitHub
parent 38b532e8ba
commit e8bd2e6537
2 changed files with 85 additions and 19 deletions

View File

@@ -1,13 +1,15 @@
// src/cli/App.tsx
import { existsSync, readFileSync } from "node:fs";
import { APIError } from "@letta-ai/letta-client/core/error";
import { APIError, APIUserAbortError } from "@letta-ai/letta-client/core/error";
import type { Stream } from "@letta-ai/letta-client/core/streaming";
import type {
AgentState,
MessageCreate,
} from "@letta-ai/letta-client/resources/agents/agents";
import type {
ApprovalCreate,
LettaStreamingResponse,
Message,
} from "@letta-ai/letta-client/resources/agents/messages";
import type { LlmConfig } from "@letta-ai/letta-client/resources/models/models";
@@ -360,6 +362,9 @@ export default function App({
// AbortController for stream cancellation
const abortControllerRef = useRef<AbortController | null>(null);
// Track if user wants to cancel (persists across state updates)
const userCancelledRef = useRef(false);
// Track terminal shrink events to refresh static output (prevents wrapped leftovers)
const columns = useTerminalWidth();
const prevColumnsRef = useRef(columns);
@@ -602,6 +607,12 @@ export default function App({
let lastKnownRunId: string | null = null;
try {
// Check if user hit escape before we started
if (userCancelledRef.current) {
userCancelledRef.current = false; // Reset for next time
return;
}
setStreaming(true);
abortControllerRef.current = new AbortController();
@@ -610,6 +621,12 @@ export default function App({
markIncompleteToolsAsCancelled(buffersRef.current);
while (true) {
// Check if cancelled before starting new stream
if (abortControllerRef.current?.signal.aborted) {
setStreaming(false);
return;
}
// Stream one turn
const stream = await sendMessageStream(agentId, currentInput);
const { stopReason, approval, approvals, apiDurationMs, lastRunId } =
@@ -640,7 +657,10 @@ export default function App({
// Case 1.5: Stream was cancelled by user
if (stopReason === "cancelled") {
appendError("Stream interrupted by user");
// Only show error if not using eager cancel (eager cancel already handled this)
if (!EAGER_CANCEL) {
appendError("Stream interrupted by user");
}
setStreaming(false);
return;
}
@@ -874,6 +894,14 @@ export default function App({
// Mark incomplete tool calls as cancelled to prevent stuck blinking UI
markIncompleteToolsAsCancelled(buffersRef.current);
// If using eager cancel and this is an abort error, silently ignore it
// The user already got "Stream interrupted by user" feedback from handleInterrupt
if (EAGER_CANCEL && e instanceof APIUserAbortError) {
setStreaming(false);
refreshDerived();
return;
}
// Build error message with run_id for debugging
const runIdSuffix = lastKnownRunId
? `\n(run_id: ${lastKnownRunId}, stop_reason: error)`
@@ -921,24 +949,53 @@ export default function App({
if (!streaming || interruptRequested) return;
setInterruptRequested(true);
try {
const client = await getClient();
// Send cancel request to backend
const _cancelResult = await client.agents.messages.cancel(agentId);
// console.error("cancelResult", JSON.stringify(cancelResult, null, 2));
// If EAGER_CANCEL is enabled, immediately abort the stream client-side
// This provides instant feedback without waiting for backend to acknowledge
if (EAGER_CANCEL && abortControllerRef.current) {
// If EAGER_CANCEL is enabled, immediately stop everything client-side first
if (EAGER_CANCEL) {
// Abort the stream via abort signal
if (abortControllerRef.current) {
abortControllerRef.current.abort();
}
} catch (e) {
appendError(`Failed to interrupt stream: ${String(e)}`);
setInterruptRequested(false);
// Set cancellation flag to prevent processConversation from starting
userCancelledRef.current = true;
// Stop streaming and show error message
setStreaming(false);
markIncompleteToolsAsCancelled(buffersRef.current);
appendError("Stream interrupted by user");
refreshDerived();
// Send cancel request to backend asynchronously (fire-and-forget)
// Don't wait for it or show errors since user already got feedback
getClient()
.then((client) => client.agents.messages.cancel(agentId))
.catch(() => {
// Silently ignore - cancellation already happened client-side
});
return;
} else {
setInterruptRequested(true);
try {
const client = await getClient();
await client.agents.messages.cancel(agentId);
if (abortControllerRef.current) {
abortControllerRef.current.abort();
}
} catch (e) {
appendError(`Failed to interrupt stream: ${String(e)}`);
setInterruptRequested(false);
}
}
}, [agentId, streaming, interruptRequested, appendError, isExecutingTool]);
}, [
agentId,
streaming,
interruptRequested,
appendError,
isExecutingTool,
refreshDerived,
]);
// Reset interrupt flag when streaming ends
useEffect(() => {
@@ -2876,7 +2933,9 @@ Plan file path: ${planFilePath}`;
!systemPromptSelectorOpen &&
!agentSelectorOpen
}
streaming={streaming}
streaming={
streaming && !abortControllerRef.current?.signal.aborted
}
commandRunning={commandRunning}
tokenCount={tokenCount}
thinkingMessage={thinkingMessage}

View File

@@ -53,7 +53,6 @@ export async function drainStream(
// Check if stream was aborted
if (abortSignal?.aborted) {
stopReason = "cancelled";
// Mark incomplete tool calls as cancelled to prevent stuck blinking UI
markIncompleteToolsAsCancelled(buffers);
queueMicrotask(refresh);
break;
@@ -130,6 +129,14 @@ export async function drainStream(
onChunk(buffers, chunk);
queueMicrotask(refresh);
// Check abort signal again after processing chunk (for eager cancellation)
if (abortSignal?.aborted) {
stopReason = "cancelled";
markIncompleteToolsAsCancelled(buffers);
queueMicrotask(refresh);
break;
}
if (chunk.message_type === "stop_reason") {
stopReason = chunk.stop_reason;
// Continue reading stream to get usage_statistics that may come after