fix: resolve queue-cancel hang and stuck queue issues (#617)
Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
283
src/cli/App.tsx
283
src/cli/App.tsx
@@ -63,6 +63,7 @@ import {
|
||||
savePermissionRule,
|
||||
type ToolExecutionResult,
|
||||
} from "../tools/manager";
|
||||
import { debugLog } from "../utils/debug";
|
||||
import {
|
||||
handleMcpAdd,
|
||||
handleMcpUsage,
|
||||
@@ -993,6 +994,10 @@ export default function App({
|
||||
restoreQueueOnCancelRef.current = restoreQueueOnCancel;
|
||||
}, [restoreQueueOnCancel]);
|
||||
|
||||
// Epoch counter to force dequeue effect re-run when refs change but state doesn't
|
||||
// Incremented when userCancelledRef is reset while messages are queued
|
||||
const [dequeueEpoch, setDequeueEpoch] = useState(0);
|
||||
|
||||
// Helper to check if agent is busy (streaming, executing tool, or running command)
|
||||
// Uses refs for synchronous access outside React's closure system
|
||||
// biome-ignore lint/correctness/useExhaustiveDependencies: refs are stable objects, .current is read dynamically
|
||||
@@ -2022,24 +2027,18 @@ export default function App({
|
||||
|
||||
// Check if we were waiting for cancel but stream finished naturally
|
||||
if (waitingForQueueCancelRef.current) {
|
||||
// Queue-cancel completed - let dequeue effect handle the messages
|
||||
// We don't call onSubmit here because isAgentBusy() would return true
|
||||
// (abortControllerRef is still set until finally block), causing re-queue
|
||||
debugLog(
|
||||
"queue",
|
||||
"Queue-cancel completed (end_turn): messages will be processed by dequeue effect",
|
||||
);
|
||||
if (restoreQueueOnCancelRef.current) {
|
||||
// User hit ESC during queue cancel - abort the auto-send
|
||||
setRestoreQueueOnCancel(false);
|
||||
// Don't clear queue, don't send - let dequeue effect handle them one by one
|
||||
} else {
|
||||
// Auto-send concatenated message
|
||||
// Clear the queue
|
||||
setMessageQueue([]);
|
||||
|
||||
// Concatenate the snapshot
|
||||
const concatenatedMessage = queueSnapshotRef.current.join("\n");
|
||||
|
||||
if (concatenatedMessage.trim()) {
|
||||
onSubmitRef.current(concatenatedMessage);
|
||||
}
|
||||
}
|
||||
|
||||
// Reset flags
|
||||
// Reset flags - dequeue effect will fire when streaming=false commits
|
||||
waitingForQueueCancelRef.current = false;
|
||||
queueSnapshotRef.current = [];
|
||||
}
|
||||
@@ -2061,24 +2060,18 @@ export default function App({
|
||||
|
||||
// Check if this cancel was triggered by queue threshold
|
||||
if (waitingForQueueCancelRef.current) {
|
||||
// Queue-cancel completed - let dequeue effect handle the messages
|
||||
// We don't call onSubmit here because isAgentBusy() would return true
|
||||
// (abortControllerRef is still set until finally block), causing re-queue
|
||||
debugLog(
|
||||
"queue",
|
||||
"Queue-cancel completed (cancelled): messages will be processed by dequeue effect",
|
||||
);
|
||||
if (restoreQueueOnCancelRef.current) {
|
||||
// User hit ESC during queue cancel - abort the auto-send
|
||||
setRestoreQueueOnCancel(false);
|
||||
// Don't clear queue, don't send - let dequeue effect handle them one by one
|
||||
} else {
|
||||
// Auto-send concatenated message
|
||||
// Clear the queue
|
||||
setMessageQueue([]);
|
||||
|
||||
// Concatenate the snapshot
|
||||
const concatenatedMessage = queueSnapshotRef.current.join("\n");
|
||||
|
||||
if (concatenatedMessage.trim()) {
|
||||
onSubmitRef.current(concatenatedMessage);
|
||||
}
|
||||
}
|
||||
|
||||
// Reset flags
|
||||
// Reset flags - dequeue effect will fire when streaming=false commits
|
||||
waitingForQueueCancelRef.current = false;
|
||||
queueSnapshotRef.current = [];
|
||||
} else {
|
||||
@@ -2133,48 +2126,40 @@ export default function App({
|
||||
// If in quietCancel mode (user queued messages), auto-reject all approvals
|
||||
// and send denials + queued messages together
|
||||
if (waitingForQueueCancelRef.current) {
|
||||
// Create denial results for all approvals
|
||||
const denialResults = approvalsToProcess.map((approvalItem) => ({
|
||||
type: "approval" as const,
|
||||
tool_call_id: approvalItem.toolCallId,
|
||||
approve: false,
|
||||
reason: "User cancelled - new message queued",
|
||||
}));
|
||||
|
||||
// Update buffers to show tools as cancelled
|
||||
for (const approvalItem of approvalsToProcess) {
|
||||
onChunk(buffersRef.current, {
|
||||
message_type: "tool_return_message",
|
||||
id: "dummy",
|
||||
date: new Date().toISOString(),
|
||||
tool_call_id: approvalItem.toolCallId,
|
||||
tool_return: "Cancelled - user sent new message",
|
||||
status: "error",
|
||||
});
|
||||
}
|
||||
refreshDerived();
|
||||
|
||||
// Queue denial results - dequeue effect will pick them up via onSubmit
|
||||
queueApprovalResults(denialResults);
|
||||
|
||||
debugLog(
|
||||
"queue",
|
||||
`Queue-cancel completed (requires_approval): ${denialResults.length} denial(s) queued, messages will be processed by dequeue effect`,
|
||||
);
|
||||
|
||||
if (restoreQueueOnCancelRef.current) {
|
||||
// User hit ESC during queue cancel - abort the auto-send
|
||||
setRestoreQueueOnCancel(false);
|
||||
// Don't clear queue, don't send - let dequeue effect handle them one by one
|
||||
} else {
|
||||
// Create denial results for all approvals
|
||||
const denialResults = approvalsToProcess.map(
|
||||
(approvalItem) => ({
|
||||
type: "approval" as const,
|
||||
tool_call_id: approvalItem.toolCallId,
|
||||
approve: false,
|
||||
reason: "User cancelled - new message queued",
|
||||
}),
|
||||
);
|
||||
|
||||
// Update buffers to show tools as cancelled
|
||||
for (const approvalItem of approvalsToProcess) {
|
||||
onChunk(buffersRef.current, {
|
||||
message_type: "tool_return_message",
|
||||
id: "dummy",
|
||||
date: new Date().toISOString(),
|
||||
tool_call_id: approvalItem.toolCallId,
|
||||
tool_return: "Cancelled - user sent new message",
|
||||
status: "error",
|
||||
});
|
||||
}
|
||||
refreshDerived();
|
||||
|
||||
// Queue denial results to be sent with the queued message
|
||||
queueApprovalResults(denialResults);
|
||||
|
||||
// Get queued messages and clear queue
|
||||
const concatenatedMessage = queueSnapshotRef.current.join("\n");
|
||||
setMessageQueue([]);
|
||||
|
||||
// Send via onSubmit which will combine queuedApprovalResults + message
|
||||
if (concatenatedMessage.trim()) {
|
||||
onSubmitRef.current(concatenatedMessage);
|
||||
}
|
||||
}
|
||||
|
||||
// Reset flags
|
||||
// Reset flags - dequeue effect will fire when streaming=false commits
|
||||
waitingForQueueCancelRef.current = false;
|
||||
queueSnapshotRef.current = [];
|
||||
setStreaming(false);
|
||||
@@ -2455,27 +2440,21 @@ export default function App({
|
||||
|
||||
// Check if user queued messages during auto-allowed tool execution
|
||||
if (waitingForQueueCancelRef.current) {
|
||||
if (restoreQueueOnCancelRef.current) {
|
||||
// User hit ESC during queue cancel - abort the auto-send
|
||||
setRestoreQueueOnCancel(false);
|
||||
} else {
|
||||
// Queue results to be sent with the queued message
|
||||
if (allResults.length > 0) {
|
||||
queueApprovalResults(allResults, autoAllowedMetadata);
|
||||
}
|
||||
|
||||
// Get queued messages and clear queue
|
||||
const concatenatedMessage =
|
||||
queueSnapshotRef.current.join("\n");
|
||||
setMessageQueue([]);
|
||||
|
||||
// Send via onSubmit
|
||||
if (concatenatedMessage.trim()) {
|
||||
onSubmitRef.current(concatenatedMessage);
|
||||
}
|
||||
// Queue results - dequeue effect will pick them up via onSubmit
|
||||
if (allResults.length > 0) {
|
||||
queueApprovalResults(allResults, autoAllowedMetadata);
|
||||
}
|
||||
|
||||
// Reset flags
|
||||
debugLog(
|
||||
"queue",
|
||||
`Queue-cancel completed (auto-allowed): ${allResults.length} result(s) queued, messages will be processed by dequeue effect`,
|
||||
);
|
||||
|
||||
if (restoreQueueOnCancelRef.current) {
|
||||
setRestoreQueueOnCancel(false);
|
||||
}
|
||||
|
||||
// Reset flags - dequeue effect will fire when streaming=false commits
|
||||
waitingForQueueCancelRef.current = false;
|
||||
queueSnapshotRef.current = [];
|
||||
setStreaming(false);
|
||||
@@ -2502,49 +2481,43 @@ export default function App({
|
||||
|
||||
// Check again if user queued messages during auto-allowed tool execution
|
||||
if (waitingForQueueCancelRef.current) {
|
||||
if (restoreQueueOnCancelRef.current) {
|
||||
// User hit ESC during queue cancel - abort the auto-send
|
||||
setRestoreQueueOnCancel(false);
|
||||
} else {
|
||||
// Create denial results for tools that need user input
|
||||
const denialResults = needsUserInput.map((ac) => ({
|
||||
type: "approval" as const,
|
||||
// Create denial results for tools that need user input
|
||||
const denialResults = needsUserInput.map((ac) => ({
|
||||
type: "approval" as const,
|
||||
tool_call_id: ac.approval.toolCallId,
|
||||
approve: false,
|
||||
reason: "User cancelled - new message queued",
|
||||
}));
|
||||
|
||||
// Update buffers to show tools as cancelled
|
||||
for (const ac of needsUserInput) {
|
||||
onChunk(buffersRef.current, {
|
||||
message_type: "tool_return_message",
|
||||
id: "dummy",
|
||||
date: new Date().toISOString(),
|
||||
tool_call_id: ac.approval.toolCallId,
|
||||
approve: false,
|
||||
reason: "User cancelled - new message queued",
|
||||
}));
|
||||
tool_return: "Cancelled - user sent new message",
|
||||
status: "error",
|
||||
});
|
||||
}
|
||||
refreshDerived();
|
||||
|
||||
// Update buffers to show tools as cancelled
|
||||
for (const ac of needsUserInput) {
|
||||
onChunk(buffersRef.current, {
|
||||
message_type: "tool_return_message",
|
||||
id: "dummy",
|
||||
date: new Date().toISOString(),
|
||||
tool_call_id: ac.approval.toolCallId,
|
||||
tool_return: "Cancelled - user sent new message",
|
||||
status: "error",
|
||||
});
|
||||
}
|
||||
refreshDerived();
|
||||
|
||||
// Combine with auto-handled results and queue for sending
|
||||
const queuedResults = [...allResults, ...denialResults];
|
||||
if (queuedResults.length > 0) {
|
||||
queueApprovalResults(queuedResults, autoAllowedMetadata);
|
||||
}
|
||||
|
||||
// Get queued messages and clear queue
|
||||
const concatenatedMessage =
|
||||
queueSnapshotRef.current.join("\n");
|
||||
setMessageQueue([]);
|
||||
|
||||
// Send via onSubmit
|
||||
if (concatenatedMessage.trim()) {
|
||||
onSubmitRef.current(concatenatedMessage);
|
||||
}
|
||||
// Combine with auto-handled results and queue for sending
|
||||
const queuedResults = [...allResults, ...denialResults];
|
||||
if (queuedResults.length > 0) {
|
||||
queueApprovalResults(queuedResults, autoAllowedMetadata);
|
||||
}
|
||||
|
||||
// Reset flags
|
||||
debugLog(
|
||||
"queue",
|
||||
`Queue-cancel completed (auto-allowed+approvals): ${queuedResults.length} result(s) queued, messages will be processed by dequeue effect`,
|
||||
);
|
||||
|
||||
if (restoreQueueOnCancelRef.current) {
|
||||
setRestoreQueueOnCancel(false);
|
||||
}
|
||||
|
||||
// Reset flags - dequeue effect will fire when streaming=false commits
|
||||
waitingForQueueCancelRef.current = false;
|
||||
queueSnapshotRef.current = [];
|
||||
setStreaming(false);
|
||||
@@ -3820,6 +3793,17 @@ export default function App({
|
||||
// userCancelledRef.current, so we must clear it here to prevent blocking.
|
||||
userCancelledRef.current = false;
|
||||
|
||||
// If there are queued messages and agent is not busy, bump epoch to trigger
|
||||
// dequeue effect. Without this, the effect won't re-run because refs aren't
|
||||
// in its deps array (only state values are).
|
||||
if (!isAgentBusy() && messageQueue.length > 0) {
|
||||
debugLog(
|
||||
"queue",
|
||||
`Bumping dequeueEpoch: userCancelledRef was reset, ${messageQueue.length} message(s) queued, agent not busy`,
|
||||
);
|
||||
setDequeueEpoch((e) => e + 1);
|
||||
}
|
||||
|
||||
if (isAgentBusy()) {
|
||||
setMessageQueue((prev) => {
|
||||
const newQueue = [...prev, msg];
|
||||
@@ -3835,6 +3819,10 @@ export default function App({
|
||||
) {
|
||||
waitingForQueueCancelRef.current = true;
|
||||
queueSnapshotRef.current = [...newQueue];
|
||||
debugLog(
|
||||
"queue",
|
||||
`Initiating queue-cancel: queueing "${msg.slice(0, 50)}${msg.length > 50 ? "..." : ""}", sending cancel to server`,
|
||||
);
|
||||
|
||||
// Abort client-side tool execution if in progress
|
||||
// This makes tool interruption visible immediately instead of waiting for completion
|
||||
@@ -3856,6 +3844,28 @@ export default function App({
|
||||
// Reset flag if cancel fails
|
||||
waitingForQueueCancelRef.current = false;
|
||||
});
|
||||
|
||||
// Timeout fallback: if server cancel is slow or fails, abort client-side
|
||||
// after 3 seconds to prevent "Thinking..." from hanging forever
|
||||
setTimeout(() => {
|
||||
// Only abort if we're still waiting AND stream is still active
|
||||
// (If stream ended naturally or user pressed ESC, these will be false/null)
|
||||
if (
|
||||
waitingForQueueCancelRef.current &&
|
||||
abortControllerRef.current
|
||||
) {
|
||||
debugLog(
|
||||
"queue",
|
||||
"Timeout fallback: aborting stream after 3s (server cancel was slow/failed)",
|
||||
);
|
||||
abortControllerRef.current.abort();
|
||||
|
||||
// Reset flags here because the abort may cause early returns
|
||||
// in processConversation that skip the completion handlers
|
||||
waitingForQueueCancelRef.current = false;
|
||||
queueSnapshotRef.current = [];
|
||||
}
|
||||
}, 3000);
|
||||
}
|
||||
|
||||
return newQueue;
|
||||
@@ -6363,6 +6373,10 @@ DO NOT respond to these messages or otherwise consider them in your response unl
|
||||
|
||||
// Process queued messages when streaming ends
|
||||
useEffect(() => {
|
||||
// Reference dequeueEpoch to satisfy exhaustive-deps - it's used to force
|
||||
// re-runs when userCancelledRef is reset (refs aren't in deps)
|
||||
void dequeueEpoch;
|
||||
|
||||
if (
|
||||
!streaming &&
|
||||
messageQueue.length > 0 &&
|
||||
@@ -6373,12 +6387,24 @@ DO NOT respond to these messages or otherwise consider them in your response unl
|
||||
!waitingForQueueCancelRef.current && // Don't dequeue while waiting for cancel
|
||||
!userCancelledRef.current // Don't dequeue if user just cancelled
|
||||
) {
|
||||
const [firstMessage, ...rest] = messageQueue;
|
||||
setMessageQueue(rest);
|
||||
// Concatenate all queued messages into one (better UX when user types multiple
|
||||
// messages quickly - they get combined into one context for the agent)
|
||||
const concatenatedMessage = messageQueue.join("\n");
|
||||
debugLog(
|
||||
"queue",
|
||||
`Dequeuing ${messageQueue.length} message(s): "${concatenatedMessage.slice(0, 50)}${concatenatedMessage.length > 50 ? "..." : ""}"`,
|
||||
);
|
||||
setMessageQueue([]);
|
||||
|
||||
// Submit the first message using the normal submit flow
|
||||
// Submit the concatenated message using the normal submit flow
|
||||
// This ensures all setup (reminders, UI updates, etc.) happens correctly
|
||||
onSubmitRef.current(firstMessage);
|
||||
onSubmitRef.current(concatenatedMessage);
|
||||
} else if (messageQueue.length > 0) {
|
||||
// Log why dequeue was blocked (useful for debugging stuck queues)
|
||||
debugLog(
|
||||
"queue",
|
||||
`Dequeue blocked: streaming=${streaming}, pendingApprovals=${pendingApprovals.length}, commandRunning=${commandRunning}, isExecutingTool=${isExecutingTool}, anySelectorOpen=${anySelectorOpen}, waitingForQueueCancel=${waitingForQueueCancelRef.current}, userCancelled=${userCancelledRef.current}`,
|
||||
);
|
||||
}
|
||||
}, [
|
||||
streaming,
|
||||
@@ -6387,6 +6413,7 @@ DO NOT respond to these messages or otherwise consider them in your response unl
|
||||
commandRunning,
|
||||
isExecutingTool,
|
||||
anySelectorOpen,
|
||||
dequeueEpoch, // Triggered when userCancelledRef is reset while messages are queued
|
||||
]);
|
||||
|
||||
// Helper to send all approval results when done
|
||||
|
||||
Reference in New Issue
Block a user