feat: update message queueing silent cancel and clean up eager cancellation state (#184)
This commit is contained in:
159
src/cli/App.tsx
159
src/cli/App.tsx
@@ -367,6 +367,15 @@ export default function App({
|
||||
// Message queue state for queueing messages during streaming
|
||||
const [messageQueue, setMessageQueue] = useState<string[]>([]);
|
||||
|
||||
// Queue cancellation: when queue length > 1, we send cancel and wait for natural stream end
|
||||
const waitingForQueueCancelRef = useRef(false);
|
||||
const queueSnapshotRef = useRef<string[]>([]);
|
||||
const [restoreQueueOnCancel, setRestoreQueueOnCancel] = useState(false);
|
||||
const restoreQueueOnCancelRef = useRef(restoreQueueOnCancel);
|
||||
useEffect(() => {
|
||||
restoreQueueOnCancelRef.current = restoreQueueOnCancel;
|
||||
}, [restoreQueueOnCancel]);
|
||||
|
||||
// Track terminal shrink events to refresh static output (prevents wrapped leftovers)
|
||||
const columns = useTerminalWidth();
|
||||
const prevColumnsRef = useRef(columns);
|
||||
@@ -392,16 +401,13 @@ export default function App({
|
||||
// Commit immutable/finished lines into the historical log
|
||||
const commitEligibleLines = useCallback((b: Buffers) => {
|
||||
const newlyCommitted: StaticItem[] = [];
|
||||
// console.log(`[COMMIT] Checking ${b.order.length} lines for commit eligibility`);
|
||||
for (const id of b.order) {
|
||||
if (emittedIdsRef.current.has(id)) continue;
|
||||
const ln = b.byId.get(id);
|
||||
if (!ln) continue;
|
||||
// console.log(`[COMMIT] Checking ${id}: kind=${ln.kind}, phase=${(ln as any).phase}`);
|
||||
if (ln.kind === "user" || ln.kind === "error" || ln.kind === "status") {
|
||||
emittedIdsRef.current.add(id);
|
||||
newlyCommitted.push({ ...ln });
|
||||
// console.log(`[COMMIT] Committed ${id} (${ln.kind})`);
|
||||
continue;
|
||||
}
|
||||
// Commands with phase should only commit when finished
|
||||
@@ -409,20 +415,15 @@ export default function App({
|
||||
if (!ln.phase || ln.phase === "finished") {
|
||||
emittedIdsRef.current.add(id);
|
||||
newlyCommitted.push({ ...ln });
|
||||
// console.log(`[COMMIT] Committed ${id} (command, finished)`);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if ("phase" in ln && ln.phase === "finished") {
|
||||
emittedIdsRef.current.add(id);
|
||||
newlyCommitted.push({ ...ln });
|
||||
// console.log(`[COMMIT] Committed ${id} (${ln.kind}, finished)`);
|
||||
} else {
|
||||
// console.log(`[COMMIT] NOT committing ${id} (phase=${(ln as any).phase})`);
|
||||
}
|
||||
}
|
||||
if (newlyCommitted.length > 0) {
|
||||
// console.log(`[COMMIT] Total committed: ${newlyCommitted.length} items`);
|
||||
setStaticItems((prev) => [...prev, ...newlyCommitted]);
|
||||
}
|
||||
}, []);
|
||||
@@ -647,16 +648,67 @@ export default function App({
|
||||
// Case 1: Turn ended normally
|
||||
if (stopReason === "end_turn") {
|
||||
setStreaming(false);
|
||||
|
||||
// Check if we were waiting for cancel but stream finished naturally
|
||||
if (waitingForQueueCancelRef.current) {
|
||||
if (restoreQueueOnCancelRef.current) {
|
||||
// User hit ESC during queue cancel - abort the auto-send
|
||||
setRestoreQueueOnCancel(false);
|
||||
// Don't clear queue, don't send - let dequeue effect handle them one by one
|
||||
} else {
|
||||
// Auto-send concatenated message
|
||||
// Clear the queue
|
||||
setMessageQueue([]);
|
||||
|
||||
// Concatenate the snapshot
|
||||
const concatenatedMessage = queueSnapshotRef.current.join("\n");
|
||||
|
||||
if (concatenatedMessage.trim()) {
|
||||
onSubmitRef.current(concatenatedMessage);
|
||||
}
|
||||
}
|
||||
|
||||
// Reset flags
|
||||
waitingForQueueCancelRef.current = false;
|
||||
queueSnapshotRef.current = [];
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// Case 1.5: Stream was cancelled by user
|
||||
if (stopReason === "cancelled") {
|
||||
// Only show error if not using eager cancel (eager cancel already handled this)
|
||||
if (!EAGER_CANCEL) {
|
||||
appendError("Stream interrupted by user");
|
||||
}
|
||||
setStreaming(false);
|
||||
|
||||
// Check if this cancel was triggered by queue threshold
|
||||
if (waitingForQueueCancelRef.current) {
|
||||
if (restoreQueueOnCancelRef.current) {
|
||||
// User hit ESC during queue cancel - abort the auto-send
|
||||
setRestoreQueueOnCancel(false);
|
||||
// Don't clear queue, don't send - let dequeue effect handle them one by one
|
||||
} else {
|
||||
// Auto-send concatenated message
|
||||
// Clear the queue
|
||||
setMessageQueue([]);
|
||||
|
||||
// Concatenate the snapshot
|
||||
const concatenatedMessage = queueSnapshotRef.current.join("\n");
|
||||
|
||||
if (concatenatedMessage.trim()) {
|
||||
onSubmitRef.current(concatenatedMessage);
|
||||
}
|
||||
}
|
||||
|
||||
// Reset flags
|
||||
waitingForQueueCancelRef.current = false;
|
||||
queueSnapshotRef.current = [];
|
||||
} else {
|
||||
// Regular user cancellation - show error
|
||||
if (!EAGER_CANCEL) {
|
||||
appendError("Stream interrupted by user");
|
||||
}
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -954,6 +1006,12 @@ export default function App({
|
||||
|
||||
if (!streaming || interruptRequested) return;
|
||||
|
||||
// If we're in the middle of queue cancel, set flag to restore instead of auto-send
|
||||
if (waitingForQueueCancelRef.current) {
|
||||
setRestoreQueueOnCancel(true);
|
||||
// Don't reset flags - let the cancel complete naturally
|
||||
}
|
||||
|
||||
// If EAGER_CANCEL is enabled, immediately stop everything client-side first
|
||||
if (EAGER_CANCEL) {
|
||||
// Abort the stream via abort signal
|
||||
@@ -970,6 +1028,13 @@ export default function App({
|
||||
appendError("Stream interrupted by user");
|
||||
refreshDerived();
|
||||
|
||||
// Clear any pending approvals since we're cancelling
|
||||
setPendingApprovals([]);
|
||||
setApprovalContexts([]);
|
||||
setApprovalResults([]);
|
||||
setAutoHandledResults([]);
|
||||
setAutoDeniedApprovals([]);
|
||||
|
||||
// Send cancel request to backend asynchronously (fire-and-forget)
|
||||
// Don't wait for it or show errors since user already got feedback
|
||||
getClient()
|
||||
@@ -1032,7 +1097,28 @@ export default function App({
|
||||
const agentBusy = streaming || isExecutingTool || commandRunning;
|
||||
|
||||
if (agentBusy) {
|
||||
setMessageQueue((prev) => [...prev, msg]);
|
||||
setMessageQueue((prev) => {
|
||||
const newQueue = [...prev, msg];
|
||||
|
||||
// If queue grows to 2+ messages and we're not already waiting for cancel,
|
||||
// send cancel request and capture snapshot
|
||||
if (newQueue.length > 1 && !waitingForQueueCancelRef.current) {
|
||||
// Capture snapshot of queue right now
|
||||
queueSnapshotRef.current = [...newQueue];
|
||||
waitingForQueueCancelRef.current = true;
|
||||
|
||||
// Send cancel request to backend (fire-and-forget)
|
||||
getClient()
|
||||
.then((client) => client.agents.messages.cancel(agentId))
|
||||
.then(() => {})
|
||||
.catch(() => {
|
||||
// Reset flag if cancel fails
|
||||
waitingForQueueCancelRef.current = false;
|
||||
});
|
||||
}
|
||||
|
||||
return newQueue;
|
||||
});
|
||||
return { submitted: true }; // Clears input
|
||||
}
|
||||
|
||||
@@ -1971,10 +2057,33 @@ ${recentCommits}
|
||||
agent,
|
||||
);
|
||||
|
||||
// Check if user cancelled while we were fetching approval state
|
||||
if (
|
||||
userCancelledRef.current ||
|
||||
abortControllerRef.current?.signal.aborted
|
||||
) {
|
||||
// User hit ESC during the check - abort and clean up
|
||||
buffersRef.current.byId.delete(userId);
|
||||
const orderIndex = buffersRef.current.order.indexOf(userId);
|
||||
if (orderIndex !== -1) {
|
||||
buffersRef.current.order.splice(orderIndex, 1);
|
||||
}
|
||||
setStreaming(false);
|
||||
refreshDerived();
|
||||
return { submitted: false };
|
||||
}
|
||||
|
||||
if (existingApprovals && existingApprovals.length > 0) {
|
||||
// There are pending approvals - show them and DON'T send the message yet
|
||||
// The message will be restored to the input field for the user to decide
|
||||
// Note: The user message is already in the transcript (optimistic update)
|
||||
|
||||
// Remove the optimistic user message from transcript to avoid duplication
|
||||
buffersRef.current.byId.delete(userId);
|
||||
const orderIndex = buffersRef.current.order.indexOf(userId);
|
||||
if (orderIndex !== -1) {
|
||||
buffersRef.current.order.splice(orderIndex, 1);
|
||||
}
|
||||
|
||||
setStreaming(false); // Stop streaming indicator
|
||||
setPendingApprovals(existingApprovals);
|
||||
|
||||
@@ -1988,14 +2097,28 @@ ${recentCommits}
|
||||
return await analyzeToolApproval(approval.toolName, parsedArgs);
|
||||
}),
|
||||
);
|
||||
|
||||
// Check again after async approval analysis
|
||||
if (
|
||||
userCancelledRef.current ||
|
||||
abortControllerRef.current?.signal.aborted
|
||||
) {
|
||||
// User cancelled during analysis - don't show dialog
|
||||
setStreaming(false);
|
||||
refreshDerived();
|
||||
return { submitted: false };
|
||||
}
|
||||
|
||||
setApprovalContexts(contexts);
|
||||
|
||||
// Refresh to remove the message from UI
|
||||
refreshDerived();
|
||||
|
||||
// Return false = message NOT submitted, will be restored to input
|
||||
return { submitted: false };
|
||||
}
|
||||
} catch (error) {
|
||||
} catch (_error) {
|
||||
// If check fails, proceed anyway (don't block user)
|
||||
console.error("Failed to check pending approvals:", error);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2052,7 +2175,9 @@ ${recentCommits}
|
||||
messageQueue.length > 0 &&
|
||||
pendingApprovals.length === 0 &&
|
||||
!commandRunning &&
|
||||
!isExecutingTool
|
||||
!isExecutingTool &&
|
||||
!waitingForQueueCancelRef.current && // Don't dequeue while waiting for cancel
|
||||
!userCancelledRef.current // Don't dequeue if user just cancelled
|
||||
) {
|
||||
const [firstMessage, ...rest] = messageQueue;
|
||||
setMessageQueue(rest);
|
||||
|
||||
Reference in New Issue
Block a user