fix: properly stop streams on ESC interrupt (#457)
Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
@@ -2054,13 +2054,37 @@ export default function App({
|
||||
}, []);
|
||||
|
||||
const handleInterrupt = useCallback(async () => {
|
||||
// If we're executing client-side tools, abort them locally instead of hitting the backend
|
||||
// Don't show "Stream interrupted" banner - the tool result will show "Interrupted by user"
|
||||
// If we're executing client-side tools, abort them AND the main stream
|
||||
if (isExecutingTool && toolAbortControllerRef.current) {
|
||||
toolAbortControllerRef.current.abort();
|
||||
|
||||
// ALSO abort the main stream - don't leave it running
|
||||
buffersRef.current.abortGeneration =
|
||||
(buffersRef.current.abortGeneration || 0) + 1;
|
||||
const toolsCancelled = markIncompleteToolsAsCancelled(buffersRef.current);
|
||||
|
||||
// Show interrupt feedback (yellow message if no tools were cancelled)
|
||||
if (!toolsCancelled) {
|
||||
appendError("Stream interrupted by user", true);
|
||||
}
|
||||
|
||||
if (abortControllerRef.current) {
|
||||
abortControllerRef.current.abort();
|
||||
abortControllerRef.current = null;
|
||||
}
|
||||
|
||||
userCancelledRef.current = true; // Prevent dequeue
|
||||
setStreaming(false);
|
||||
setIsExecutingTool(false);
|
||||
refreshDerived();
|
||||
|
||||
// Delay flag reset to ensure React has flushed state updates before dequeue can fire.
|
||||
// Use setTimeout(50) instead of setTimeout(0) - the longer delay ensures React's
|
||||
// batched state updates have been fully processed before we allow the dequeue effect.
|
||||
setTimeout(() => {
|
||||
userCancelledRef.current = false;
|
||||
}, 50);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -2077,7 +2101,13 @@ export default function App({
|
||||
// Prevent multiple handleInterrupt calls while state updates are pending
|
||||
setInterruptRequested(true);
|
||||
|
||||
// Abort the stream via abort signal
|
||||
// Set interrupted flag FIRST, before abort() triggers any async work.
|
||||
// This ensures onChunk and other guards see interrupted=true immediately.
|
||||
buffersRef.current.abortGeneration =
|
||||
(buffersRef.current.abortGeneration || 0) + 1;
|
||||
const toolsCancelled = markIncompleteToolsAsCancelled(buffersRef.current);
|
||||
|
||||
// NOW abort the stream - interrupted flag is already set
|
||||
if (abortControllerRef.current) {
|
||||
abortControllerRef.current.abort();
|
||||
abortControllerRef.current = null; // Clear ref so isAgentBusy() returns false
|
||||
@@ -2089,7 +2119,6 @@ export default function App({
|
||||
// Stop streaming and show error message (unless tool calls were cancelled,
|
||||
// since the tool result will show "Interrupted by user")
|
||||
setStreaming(false);
|
||||
const toolsCancelled = markIncompleteToolsAsCancelled(buffersRef.current);
|
||||
if (!toolsCancelled) {
|
||||
appendError("Stream interrupted by user", true);
|
||||
}
|
||||
@@ -2111,13 +2140,13 @@ export default function App({
|
||||
});
|
||||
|
||||
// Reset cancellation flags after cleanup is complete.
|
||||
// This allows the dequeue effect to process any queued messages.
|
||||
// We use setTimeout to ensure React state updates (setStreaming, etc.)
|
||||
// have been processed before the dequeue effect runs.
|
||||
// Use setTimeout(50) instead of setTimeout(0) to ensure React has fully processed
|
||||
// the streaming=false state before we allow the dequeue effect to start a new conversation.
|
||||
// This prevents the "Maximum update depth exceeded" infinite render loop.
|
||||
setTimeout(() => {
|
||||
userCancelledRef.current = false;
|
||||
setInterruptRequested(false);
|
||||
}, 0);
|
||||
}, 50);
|
||||
|
||||
return;
|
||||
} else {
|
||||
|
||||
@@ -73,6 +73,7 @@ export type Buffers = {
|
||||
pendingRefresh?: boolean; // Track throttled refresh state
|
||||
interrupted?: boolean; // Track if stream was interrupted by user (skip stale refreshes)
|
||||
commitGeneration?: number; // Incremented when resuming from error to invalidate pending refreshes
|
||||
abortGeneration?: number; // Incremented on each interrupt to detect cancellation across async boundaries
|
||||
usage: {
|
||||
promptTokens: number;
|
||||
completionTokens: number;
|
||||
@@ -92,6 +93,7 @@ export function createBuffers(): Buffers {
|
||||
toolCallIdToLineId: new Map(),
|
||||
lastOtid: null,
|
||||
commitGeneration: 0,
|
||||
abortGeneration: 0,
|
||||
usage: {
|
||||
promptTokens: 0,
|
||||
completionTokens: 0,
|
||||
|
||||
@@ -56,12 +56,22 @@ export async function drainStream(
|
||||
// Track if we triggered abort via our listener (for eager cancellation)
|
||||
let abortedViaListener = false;
|
||||
|
||||
// Capture the abort generation at stream start to detect if handleInterrupt ran
|
||||
const startAbortGen = buffers.abortGeneration || 0;
|
||||
|
||||
// Set up abort listener to propagate our signal to SDK's stream controller
|
||||
// This immediately cancels the HTTP request instead of waiting for next chunk
|
||||
const abortHandler = () => {
|
||||
abortedViaListener = true;
|
||||
// Abort the SDK's stream controller to cancel the underlying HTTP request
|
||||
if (stream.controller && !stream.controller.signal.aborted) {
|
||||
if (!stream.controller) {
|
||||
debugWarn(
|
||||
"drainStream",
|
||||
"stream.controller is undefined - cannot abort HTTP request",
|
||||
);
|
||||
return;
|
||||
}
|
||||
if (!stream.controller.signal.aborted) {
|
||||
stream.controller.abort();
|
||||
}
|
||||
};
|
||||
@@ -80,6 +90,15 @@ export async function drainStream(
|
||||
for await (const chunk of stream) {
|
||||
// console.log("chunk", chunk);
|
||||
|
||||
// Check if abort generation changed (handleInterrupt ran while we were waiting)
|
||||
// This catches cases where the abort signal might not propagate correctly
|
||||
if ((buffers.abortGeneration || 0) !== startAbortGen) {
|
||||
stopReason = "cancelled";
|
||||
// Don't call markIncompleteToolsAsCancelled - handleInterrupt already did
|
||||
queueMicrotask(refresh);
|
||||
break;
|
||||
}
|
||||
|
||||
// Check if stream was aborted
|
||||
if (abortSignal?.aborted) {
|
||||
stopReason = "cancelled";
|
||||
@@ -321,11 +340,14 @@ export async function drainStreamWithResume(
|
||||
);
|
||||
|
||||
// If stream ended without proper stop_reason and we have resume info, try once to reconnect
|
||||
// Only resume if we have an abortSignal AND it's not aborted (explicit check prevents
|
||||
// undefined abortSignal from accidentally allowing resume after user cancellation)
|
||||
if (
|
||||
result.stopReason === "error" &&
|
||||
result.lastRunId &&
|
||||
result.lastSeqId !== null &&
|
||||
!abortSignal?.aborted
|
||||
abortSignal &&
|
||||
!abortSignal.aborted
|
||||
) {
|
||||
// Preserve the original error in case resume fails
|
||||
const originalFallbackError = result.fallbackError;
|
||||
|
||||
Reference in New Issue
Block a user