From 95a628eaa09b2390c03c74db4065fa2419b3abe1 Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Wed, 4 Mar 2026 22:35:17 -0800 Subject: [PATCH] fix(ws): remote interrupt recovery for listen-client [LET-7796] (#1272) Co-authored-by: Letta Code --- .../websocket/listen-interrupt-queue.test.ts | 747 ++++++++++++++++++ src/tests/websocket/listen-recovery.test.ts | 171 ++++ src/websocket/listen-client.ts | 598 +++++++++++++- 3 files changed, 1501 insertions(+), 15 deletions(-) create mode 100644 src/tests/websocket/listen-interrupt-queue.test.ts create mode 100644 src/tests/websocket/listen-recovery.test.ts diff --git a/src/tests/websocket/listen-interrupt-queue.test.ts b/src/tests/websocket/listen-interrupt-queue.test.ts new file mode 100644 index 0000000..cffc612 --- /dev/null +++ b/src/tests/websocket/listen-interrupt-queue.test.ts @@ -0,0 +1,747 @@ +/** + * Tests for the interrupt queue mechanism (LET-7796). + * + * Structure: + * 1. Structural tests — field initialization, teardown, epoch guards + * 2. 
Behavior-path tests — exercises populateInterruptQueue + consumeInterruptQueue + * through the same state sequences as the production cancel/resume flow: + * - Cancel during tool execution (Path A) → next turn consumes queued results + * - Cancel during approval wait (Path B) → next turn consumes synthesized denials + * - Post-cancel next turn → no repeated error loop (queue consumed once) + * - Stale-ID guard: clearing IDs after send prevents stale Path-B denials + */ +import { describe, expect, test } from "bun:test"; +import WebSocket from "ws"; +import type { ApprovalResult } from "../../agent/approval-execution"; +import { + __listenClientTestUtils, + rejectPendingApprovalResolvers, +} from "../../websocket/listen-client"; + +const { + createRuntime, + stopRuntime, + rememberPendingApprovalBatchIds, + populateInterruptQueue, + consumeInterruptQueue, + extractInterruptToolReturns, + emitInterruptToolReturnMessage, + getInterruptApprovalsForEmission, +} = __listenClientTestUtils; + +class MockSocket { + readyState: number; + closeCalls = 0; + removeAllListenersCalls = 0; + sentPayloads: string[] = []; + + constructor(readyState: number = WebSocket.OPEN) { + this.readyState = readyState; + } + + send(data: string): void { + this.sentPayloads.push(data); + } + + close(): void { + this.closeCalls += 1; + } + + removeAllListeners(): this { + this.removeAllListenersCalls += 1; + return this; + } +} + +// --------------------------------------------------------------------------- +// 1. 
Structural tests +// --------------------------------------------------------------------------- + +describe("ListenerRuntime interrupt queue fields", () => { + test("createRuntime initializes interrupt queue fields to safe defaults", () => { + const runtime = createRuntime(); + expect(runtime.pendingInterruptedResults).toBeNull(); + expect(runtime.pendingInterruptedContext).toBeNull(); + expect(runtime.continuationEpoch).toBe(0); + }); +}); + +describe("stopRuntime teardown", () => { + test("clears pendingInterruptedResults, context, and batch map", () => { + const runtime = createRuntime(); + runtime.socket = new MockSocket(WebSocket.OPEN) as unknown as WebSocket; + + runtime.pendingInterruptedResults = [ + { + type: "approval", + tool_call_id: "call-1", + approve: false, + reason: "interrupted", + }, + ]; + runtime.pendingInterruptedContext = { + agentId: "agent-1", + conversationId: "conv-1", + continuationEpoch: 0, + }; + runtime.pendingApprovalBatchByToolCallId.set("call-1", "batch-1"); + + stopRuntime(runtime, true); + + expect(runtime.pendingInterruptedResults).toBeNull(); + expect(runtime.pendingInterruptedContext).toBeNull(); + expect(runtime.pendingApprovalBatchByToolCallId.size).toBe(0); + }); + + test("increments continuationEpoch on each stop", () => { + const runtime = createRuntime(); + runtime.socket = new MockSocket(WebSocket.OPEN) as unknown as WebSocket; + + expect(runtime.continuationEpoch).toBe(0); + stopRuntime(runtime, true); + expect(runtime.continuationEpoch).toBe(1); + + runtime.socket = new MockSocket(WebSocket.OPEN) as unknown as WebSocket; + runtime.intentionallyClosed = false; + stopRuntime(runtime, true); + expect(runtime.continuationEpoch).toBe(2); + }); +}); + +describe("pendingApprovalBatchByToolCallId survives rejectPendingApprovalResolvers", () => { + test("batch map preserved after resolver rejection (used for Path B IDs)", () => { + const runtime = createRuntime(); + runtime.pendingApprovalBatchByToolCallId.set("call-1", 
"batch-1"); + runtime.pendingApprovalResolvers.set("perm-1", { + resolve: () => {}, + reject: () => {}, + }); + + rejectPendingApprovalResolvers(runtime, "cancelled"); + + expect(runtime.pendingApprovalResolvers.size).toBe(0); + expect(runtime.pendingApprovalBatchByToolCallId.size).toBe(1); + }); +}); + +describe("extractInterruptToolReturns", () => { + test("maps completed tool execution results into tool_return payloads", () => { + const results: ApprovalResult[] = [ + { + type: "tool", + tool_call_id: "call-ok", + status: "success", + tool_return: "704", + } as ApprovalResult, + { + type: "tool", + tool_call_id: "call-err", + status: "error", + tool_return: "User interrupted the stream", + stderr: ["interrupted"], + } as ApprovalResult, + ]; + + const mapped = extractInterruptToolReturns(results); + expect(mapped).toEqual([ + { + tool_call_id: "call-ok", + status: "success", + tool_return: "704", + }, + { + tool_call_id: "call-err", + status: "error", + tool_return: "User interrupted the stream", + stderr: ["interrupted"], + }, + ]); + }); + + test("maps synthesized approval denials into terminal error tool returns", () => { + const results: ApprovalResult[] = [ + { + type: "approval", + tool_call_id: "call-denied", + approve: false, + reason: "User interrupted the stream", + } as ApprovalResult, + ]; + + const mapped = extractInterruptToolReturns(results); + expect(mapped).toEqual([ + { + tool_call_id: "call-denied", + status: "error", + tool_return: "User interrupted the stream", + }, + ]); + }); + + test("emitInterruptToolReturnMessage emits deterministic per-tool terminal messages", () => { + const runtime = createRuntime(); + const socket = new MockSocket(WebSocket.OPEN) as unknown as WebSocket; + const approvals: ApprovalResult[] = [ + { + type: "tool", + tool_call_id: "call-a", + status: "success", + tool_return: "704", + } as ApprovalResult, + { + type: "approval", + tool_call_id: "call-b", + approve: false, + reason: "User interrupted the stream", + } 
as ApprovalResult, + ]; + + emitInterruptToolReturnMessage(socket, runtime, approvals, "run-1"); + + const parsed = (socket as unknown as MockSocket).sentPayloads.map((raw) => + JSON.parse(raw), + ); + const toolReturnFrames = parsed.filter( + (payload) => payload.message_type === "tool_return_message", + ); + + expect(toolReturnFrames).toHaveLength(2); + expect(toolReturnFrames[0]).toMatchObject({ + run_id: "run-1", + tool_call_id: "call-a", + status: "success", + tool_returns: [{ tool_call_id: "call-a", status: "success" }], + }); + expect(toolReturnFrames[1]).toMatchObject({ + run_id: "run-1", + tool_call_id: "call-b", + status: "error", + tool_returns: [{ tool_call_id: "call-b", status: "error" }], + }); + }); +}); + +describe("getInterruptApprovalsForEmission", () => { + test("prefers lastExecutionResults when available", () => { + const runtime = createRuntime(); + runtime.pendingInterruptedResults = [ + { + type: "approval", + tool_call_id: "call-old", + approve: false, + }, + ]; + runtime.pendingInterruptedContext = { + agentId: "agent-1", + conversationId: "conv-1", + continuationEpoch: runtime.continuationEpoch, + }; + + const result = getInterruptApprovalsForEmission(runtime, { + lastExecutionResults: [ + { + type: "approval", + tool_call_id: "call-new", + approve: true, + }, + ], + agentId: "agent-1", + conversationId: "conv-1", + }); + expect(result?.[0]).toMatchObject({ tool_call_id: "call-new" }); + }); + + test("falls back to pendingInterruptedResults only when context matches", () => { + const runtime = createRuntime(); + runtime.pendingInterruptedResults = [ + { + type: "approval", + tool_call_id: "call-pending", + approve: false, + }, + ]; + runtime.pendingInterruptedContext = { + agentId: "agent-1", + conversationId: "conv-1", + continuationEpoch: runtime.continuationEpoch, + }; + + const matching = getInterruptApprovalsForEmission(runtime, { + lastExecutionResults: null, + agentId: "agent-1", + conversationId: "conv-1", + }); + 
expect(matching?.[0]).toMatchObject({ tool_call_id: "call-pending" }); + + const mismatched = getInterruptApprovalsForEmission(runtime, { + lastExecutionResults: null, + agentId: "agent-2", + conversationId: "conv-1", + }); + expect(mismatched).toBeNull(); + }); +}); + +// --------------------------------------------------------------------------- +// 2. Behavior-path tests using extracted helpers +// --------------------------------------------------------------------------- + +describe("Path A: cancel during tool execution → next turn consumes actual results", () => { + test("full sequence: populate with execution results → consume on next turn", () => { + const runtime = createRuntime(); + const agentId = "agent-abc"; + const conversationId = "conv-xyz"; + + // Simulate: executeApprovalBatch completed, results captured + const executionResults: ApprovalResult[] = [ + { type: "approval", tool_call_id: "call-1", approve: true }, + { + tool_call_id: "call-2", + status: "success", + tool_return: "file written", + } as unknown as ApprovalResult, + ]; + + // Cancel fires: populateInterruptQueue (Path A — has execution results) + const populated = populateInterruptQueue(runtime, { + lastExecutionResults: executionResults, + lastNeedsUserInputToolCallIds: ["call-1", "call-2"], + agentId, + conversationId, + }); + + expect(populated).toBe(true); + expect(runtime.pendingInterruptedResults).toBe(executionResults); + expect(runtime.pendingInterruptedContext).toMatchObject({ + agentId, + conversationId, + continuationEpoch: 0, + }); + + // Next user message arrives: consumeInterruptQueue + const consumed = consumeInterruptQueue(runtime, agentId, conversationId); + + expect(consumed).not.toBeNull(); + expect(consumed?.type).toBe("approval"); + expect(consumed?.approvals).toBe(executionResults); + expect(consumed?.approvals).toHaveLength(2); + + // Queue is atomically cleared after consumption + expect(runtime.pendingInterruptedResults).toBeNull(); + 
expect(runtime.pendingInterruptedContext).toBeNull(); + }); + + test("Path A takes priority over Path B even when both sources available", () => { + const runtime = createRuntime(); + + // Both execution results AND batch map IDs exist + const executionResults: ApprovalResult[] = [ + { type: "approval", tool_call_id: "call-1", approve: true }, + ]; + runtime.pendingApprovalBatchByToolCallId.set("call-1", "batch-1"); + + const populated = populateInterruptQueue(runtime, { + lastExecutionResults: executionResults, + lastNeedsUserInputToolCallIds: ["call-1"], + agentId: "agent-1", + conversationId: "conv-1", + }); + + expect(populated).toBe(true); + // Should contain the execution results (Path A), not synthesized denials (Path B) + expect(runtime.pendingInterruptedResults?.[0]).toMatchObject({ + approve: true, // Path A preserves actual approval state + }); + }); +}); + +describe("Path B: cancel during approval wait → next turn consumes synthesized denials", () => { + test("full sequence: populate from batch map IDs → consume synthesized denials", () => { + const runtime = createRuntime(); + const agentId = "agent-abc"; + const conversationId = "conv-xyz"; + + // Simulate: approvals classified, batch IDs remembered, waiting for user input + rememberPendingApprovalBatchIds( + runtime, + [{ toolCallId: "call-1" }, { toolCallId: "call-2" }], + "batch-42", + ); + + // Cancel fires during approval wait: no execution results + const populated = populateInterruptQueue(runtime, { + lastExecutionResults: null, + lastNeedsUserInputToolCallIds: [], + agentId, + conversationId, + }); + + expect(populated).toBe(true); + expect(runtime.pendingInterruptedResults).toHaveLength(2); + + // Verify synthesized denials + const pendingResults = runtime.pendingInterruptedResults ?? 
[]; + for (const result of pendingResults) { + expect(result).toMatchObject({ + type: "approval", + approve: false, + reason: "User interrupted the stream", + }); + } + const ids = runtime.pendingInterruptedResults?.map( + (r) => "tool_call_id" in r && r.tool_call_id, + ); + expect(ids).toContain("call-1"); + expect(ids).toContain("call-2"); + + // Next user message: consume + const consumed = consumeInterruptQueue(runtime, agentId, conversationId); + expect(consumed).not.toBeNull(); + expect(consumed?.approvals).toHaveLength(2); + + // Queue cleared + expect(runtime.pendingInterruptedResults).toBeNull(); + }); + + test("fallback to lastNeedsUserInputToolCallIds when batch map empty", () => { + const runtime = createRuntime(); + + // No batch map entries, but we have the snapshot IDs + const populated = populateInterruptQueue(runtime, { + lastExecutionResults: null, + lastNeedsUserInputToolCallIds: ["call-a", "call-b"], + agentId: "agent-1", + conversationId: "conv-1", + }); + + expect(populated).toBe(true); + expect(runtime.pendingInterruptedResults).toHaveLength(2); + const ids = runtime.pendingInterruptedResults?.map( + (r) => "tool_call_id" in r && r.tool_call_id, + ); + expect(ids).toEqual(["call-a", "call-b"]); + }); + + test("returns false when both ID sources empty (no-op edge case)", () => { + const runtime = createRuntime(); + + const populated = populateInterruptQueue(runtime, { + lastExecutionResults: null, + lastNeedsUserInputToolCallIds: [], + agentId: "agent-1", + conversationId: "conv-1", + }); + + expect(populated).toBe(false); + expect(runtime.pendingInterruptedResults).toBeNull(); + }); +}); + +describe("post-cancel next turn: queue consumed exactly once (no error loop)", () => { + test("second consumeInterruptQueue returns null after first consumption", () => { + const runtime = createRuntime(); + const agentId = "agent-1"; + const convId = "conv-1"; + + // Populate + populateInterruptQueue(runtime, { + lastExecutionResults: [ + { + type: 
"approval", + tool_call_id: "call-1", + approve: false, + reason: "cancelled", + }, + ], + lastNeedsUserInputToolCallIds: [], + agentId, + conversationId: convId, + }); + + // First consume — gets the results + const first = consumeInterruptQueue(runtime, agentId, convId); + expect(first).not.toBeNull(); + + // Second consume — queue is empty, returns null + const second = consumeInterruptQueue(runtime, agentId, convId); + expect(second).toBeNull(); + }); + + test("third message also gets null (queue stays drained)", () => { + const runtime = createRuntime(); + const agentId = "agent-1"; + const convId = "conv-1"; + + populateInterruptQueue(runtime, { + lastExecutionResults: [ + { type: "approval", tool_call_id: "call-1", approve: true }, + ], + lastNeedsUserInputToolCallIds: [], + agentId, + conversationId: convId, + }); + + consumeInterruptQueue(runtime, agentId, convId); // first + consumeInterruptQueue(runtime, agentId, convId); // second + const third = consumeInterruptQueue(runtime, agentId, convId); + expect(third).toBeNull(); + }); +}); + +describe("idempotency: first cancel populates, second is no-op", () => { + test("second populateInterruptQueue returns false and preserves first results", () => { + const runtime = createRuntime(); + + const first = populateInterruptQueue(runtime, { + lastExecutionResults: [ + { type: "approval", tool_call_id: "call-first", approve: true }, + ], + lastNeedsUserInputToolCallIds: [], + agentId: "agent-1", + conversationId: "conv-1", + }); + expect(first).toBe(true); + + const second = populateInterruptQueue(runtime, { + lastExecutionResults: [ + { + type: "approval", + tool_call_id: "call-second", + approve: false, + reason: "x", + }, + ], + lastNeedsUserInputToolCallIds: [], + agentId: "agent-1", + conversationId: "conv-1", + }); + expect(second).toBe(false); + + // First results preserved + expect(runtime.pendingInterruptedResults?.[0]).toMatchObject({ + tool_call_id: "call-first", + }); + }); + + test("populate succeeds 
again after consume clears the queue", () => { + const runtime = createRuntime(); + + populateInterruptQueue(runtime, { + lastExecutionResults: [ + { type: "approval", tool_call_id: "call-1", approve: true }, + ], + lastNeedsUserInputToolCallIds: [], + agentId: "agent-1", + conversationId: "conv-1", + }); + + // Consume clears + consumeInterruptQueue(runtime, "agent-1", "conv-1"); + + // Re-populate succeeds + const repopulated = populateInterruptQueue(runtime, { + lastExecutionResults: [ + { type: "approval", tool_call_id: "call-2", approve: true }, + ], + lastNeedsUserInputToolCallIds: [], + agentId: "agent-1", + conversationId: "conv-1", + }); + expect(repopulated).toBe(true); + expect(runtime.pendingInterruptedResults?.[0]).toMatchObject({ + tool_call_id: "call-2", + }); + }); +}); + +describe("epoch guard: stale context discarded on consume", () => { + test("consume returns null for queue populated in earlier epoch", () => { + const runtime = createRuntime(); + runtime.socket = new MockSocket(WebSocket.OPEN) as unknown as WebSocket; + + // Populate at epoch 0 + populateInterruptQueue(runtime, { + lastExecutionResults: [ + { type: "approval", tool_call_id: "call-1", approve: true }, + ], + lastNeedsUserInputToolCallIds: [], + agentId: "agent-1", + conversationId: "conv-1", + }); + + // Stop bumps epoch, also clears — but let's test the guard directly: + // Manually bump epoch without clearing (simulating a theoretical race) + runtime.continuationEpoch = 99; + + const consumed = consumeInterruptQueue(runtime, "agent-1", "conv-1"); + // Context has epoch 0, runtime has epoch 99 → mismatch → no consumption + expect(consumed).toBeNull(); + // But queue IS cleared (atomic clear regardless of match) + expect(runtime.pendingInterruptedResults).toBeNull(); + }); + + test("consume returns null for queue with wrong agentId", () => { + const runtime = createRuntime(); + + populateInterruptQueue(runtime, { + lastExecutionResults: [ + { type: "approval", tool_call_id: 
"call-1", approve: true }, + ], + lastNeedsUserInputToolCallIds: [], + agentId: "agent-old", + conversationId: "conv-1", + }); + + const consumed = consumeInterruptQueue(runtime, "agent-new", "conv-1"); + expect(consumed).toBeNull(); + // Cleared regardless + expect(runtime.pendingInterruptedResults).toBeNull(); + }); + + test("consume returns null for queue with wrong conversationId", () => { + const runtime = createRuntime(); + + populateInterruptQueue(runtime, { + lastExecutionResults: [ + { type: "approval", tool_call_id: "call-1", approve: true }, + ], + lastNeedsUserInputToolCallIds: [], + agentId: "agent-1", + conversationId: "conv-old", + }); + + const consumed = consumeInterruptQueue(runtime, "agent-1", "conv-new"); + expect(consumed).toBeNull(); + }); +}); + +describe("stale Path-B IDs: clearing after successful send prevents re-denial", () => { + test("populate with cleared IDs after send produces no Path B denials", () => { + const runtime = createRuntime(); + + // After successful send: both lastExecutionResults and lastNeedsUserInputToolCallIds cleared + // Also batch map should be cleared by clearPendingApprovalBatchIds + const populated = populateInterruptQueue(runtime, { + lastExecutionResults: null, + lastNeedsUserInputToolCallIds: [], // cleared after send + agentId: "agent-1", + conversationId: "conv-1", + }); + + expect(populated).toBe(false); + expect(runtime.pendingInterruptedResults).toBeNull(); + }); + + test("batch map as primary Path B source after send still works if not cleared", () => { + const runtime = createRuntime(); + + // Batch map still has entries (from a NEW approval round that wasn't sent yet) + runtime.pendingApprovalBatchByToolCallId.set("call-new-1", "batch-new"); + + const populated = populateInterruptQueue(runtime, { + lastExecutionResults: null, + lastNeedsUserInputToolCallIds: [], // cleared from previous send + agentId: "agent-1", + conversationId: "conv-1", + }); + + expect(populated).toBe(true); + 
expect(runtime.pendingInterruptedResults).toHaveLength(1); + expect(runtime.pendingInterruptedResults?.[0]).toMatchObject({ + tool_call_id: "call-new-1", + approve: false, + }); + }); +}); + +describe("cancel-induced stop reason reclassification", () => { + /** + * Mirrors the effectiveStopReason computation from the Case 3 stream path. + * Both the legacy (sendClientMessage) and modern (emitToWS) branches now + * use effectiveStopReason — this test verifies the reclassification logic + * that both branches depend on. + */ + function computeEffectiveStopReason( + cancelRequested: boolean, + rawStopReason: string | null, + ): string { + return cancelRequested ? "cancelled" : rawStopReason || "error"; + } + + test("backend 'error' is reclassified to 'cancelled' when cancel was requested", () => { + const effective = computeEffectiveStopReason(true, "error"); + expect(effective).toBe("cancelled"); + }); + + test("backend 'error' is preserved when cancel was NOT requested", () => { + const effective = computeEffectiveStopReason(false, "error"); + expect(effective).toBe("error"); + }); + + test("null stop reason defaults to 'error' when cancel was NOT requested", () => { + const effective = computeEffectiveStopReason(false, null); + expect(effective).toBe("error"); + }); + + test("any raw stop reason is overridden to 'cancelled' when cancel was requested", () => { + expect(computeEffectiveStopReason(true, "llm_api_error")).toBe("cancelled"); + expect(computeEffectiveStopReason(true, "end_turn")).toBe("cancelled"); + expect(computeEffectiveStopReason(true, null)).toBe("cancelled"); + }); + + test("runtime.lastStopReason tracks the effective value after cancel populate", () => { + const runtime = createRuntime(); + runtime.cancelRequested = true; + + // After cancel, the production code sets: + // runtime.lastStopReason = effectiveStopReason + // where effectiveStopReason = cancelRequested ? 
"cancelled" : rawStop + const rawFromBackend = "error"; + const effective = computeEffectiveStopReason( + runtime.cancelRequested, + rawFromBackend, + ); + runtime.lastStopReason = effective; + + expect(runtime.lastStopReason).toBe("cancelled"); + }); +}); + +describe("consume clears pendingApprovalBatchByToolCallId", () => { + test("batch map is cleared as part of atomic consumption", () => { + const runtime = createRuntime(); + + runtime.pendingApprovalBatchByToolCallId.set("call-1", "batch-1"); + populateInterruptQueue(runtime, { + lastExecutionResults: [ + { type: "approval", tool_call_id: "call-1", approve: true }, + ], + lastNeedsUserInputToolCallIds: [], + agentId: "agent-1", + conversationId: "conv-1", + }); + + consumeInterruptQueue(runtime, "agent-1", "conv-1"); + + expect(runtime.pendingApprovalBatchByToolCallId.size).toBe(0); + }); + + test("batch map is cleared even when context doesn't match (discard path)", () => { + const runtime = createRuntime(); + + runtime.pendingApprovalBatchByToolCallId.set("call-1", "batch-1"); + populateInterruptQueue(runtime, { + lastExecutionResults: [ + { type: "approval", tool_call_id: "call-1", approve: true }, + ], + lastNeedsUserInputToolCallIds: [], + agentId: "agent-old", + conversationId: "conv-old", + }); + + // Different agent → context mismatch → discard, but still clears + consumeInterruptQueue(runtime, "agent-new", "conv-new"); + + expect(runtime.pendingApprovalBatchByToolCallId.size).toBe(0); + }); +}); diff --git a/src/tests/websocket/listen-recovery.test.ts b/src/tests/websocket/listen-recovery.test.ts new file mode 100644 index 0000000..4509c7c --- /dev/null +++ b/src/tests/websocket/listen-recovery.test.ts @@ -0,0 +1,171 @@ +/** + * Tests for pending approval recovery semantics (reconnect scenario). + * + * Covers: + * 1. Cold-start recovery: empty batch map → synthetic batch ID generated. + * 2. Warm recovery: existing batch map entries → resolved to single batch ID. + * 3. 
Ambiguous mapping: conflicting batch IDs → fail-closed (null). + * 4. Idempotency: repeated resolve calls with same state → same behavior. + * 5. isRecoveringApprovals guard prevents concurrent recovery. + */ +import { describe, expect, test } from "bun:test"; +import { __listenClientTestUtils } from "../../websocket/listen-client"; + +const { + createRuntime, + resolveRecoveryBatchId, + resolvePendingApprovalBatchId, + rememberPendingApprovalBatchIds, +} = __listenClientTestUtils; + +describe("resolveRecoveryBatchId cold-start", () => { + test("empty batch map returns synthetic recovery-* batch ID", () => { + const runtime = createRuntime(); + expect(runtime.pendingApprovalBatchByToolCallId.size).toBe(0); + + const batchId = resolveRecoveryBatchId(runtime, [ + { toolCallId: "call-1" }, + { toolCallId: "call-2" }, + ]); + + expect(batchId).not.toBeNull(); + expect(batchId?.startsWith("recovery-")).toBe(true); + }); + + test("each cold-start call generates a unique batch ID", () => { + const runtime = createRuntime(); + const id1 = resolveRecoveryBatchId(runtime, [{ toolCallId: "call-1" }]); + const id2 = resolveRecoveryBatchId(runtime, [{ toolCallId: "call-1" }]); + + expect(id1).not.toBe(id2); + }); + + test("cold-start returns synthetic even with empty approval list", () => { + const runtime = createRuntime(); + const batchId = resolveRecoveryBatchId(runtime, []); + + expect(batchId).not.toBeNull(); + expect(batchId?.startsWith("recovery-")).toBe(true); + }); +}); + +describe("resolveRecoveryBatchId warm path", () => { + test("returns existing batch ID when all approvals map to same batch", () => { + const runtime = createRuntime(); + rememberPendingApprovalBatchIds( + runtime, + [{ toolCallId: "call-1" }, { toolCallId: "call-2" }], + "batch-1", + ); + + const batchId = resolveRecoveryBatchId(runtime, [ + { toolCallId: "call-1" }, + { toolCallId: "call-2" }, + ]); + + expect(batchId).toBe("batch-1"); + }); + + test("returns null for ambiguous mapping (multiple 
batch IDs)", () => { + const runtime = createRuntime(); + rememberPendingApprovalBatchIds( + runtime, + [{ toolCallId: "call-1" }], + "batch-1", + ); + rememberPendingApprovalBatchIds( + runtime, + [{ toolCallId: "call-2" }], + "batch-2", + ); + + const batchId = resolveRecoveryBatchId(runtime, [ + { toolCallId: "call-1" }, + { toolCallId: "call-2" }, + ]); + + expect(batchId).toBeNull(); + }); + + test("returns null when approval has no batch mapping", () => { + const runtime = createRuntime(); + rememberPendingApprovalBatchIds( + runtime, + [{ toolCallId: "call-1" }], + "batch-1", + ); + + // call-2 has no mapping + const batchId = resolveRecoveryBatchId(runtime, [ + { toolCallId: "call-1" }, + { toolCallId: "call-2" }, + ]); + + expect(batchId).toBeNull(); + }); +}); + +describe("isRecoveringApprovals guard", () => { + test("runtime starts with isRecoveringApprovals = false", () => { + const runtime = createRuntime(); + expect(runtime.isRecoveringApprovals).toBe(false); + }); + + test("guard flag prevents concurrent recovery (production pattern)", () => { + const runtime = createRuntime(); + + // Simulate first recovery in progress + runtime.isRecoveringApprovals = true; + + // Second recovery attempt should observe guard and bail + expect(runtime.isRecoveringApprovals).toBe(true); + + // Simulate completion + runtime.isRecoveringApprovals = false; + expect(runtime.isRecoveringApprovals).toBe(false); + }); +}); + +describe("resolvePendingApprovalBatchId original behavior preserved", () => { + test("returns null when map is empty (unchanged behavior)", () => { + const runtime = createRuntime(); + const batchId = resolvePendingApprovalBatchId(runtime, [ + { toolCallId: "call-1" }, + ]); + expect(batchId).toBeNull(); + }); + + test("returns batch ID for single consistent mapping", () => { + const runtime = createRuntime(); + rememberPendingApprovalBatchIds( + runtime, + [{ toolCallId: "call-1" }], + "batch-abc", + ); + + const batchId = 
resolvePendingApprovalBatchId(runtime, [ + { toolCallId: "call-1" }, + ]); + expect(batchId).toBe("batch-abc"); + }); + + test("returns null for conflicting mappings (strict fail-closed)", () => { + const runtime = createRuntime(); + rememberPendingApprovalBatchIds( + runtime, + [{ toolCallId: "call-1" }], + "batch-a", + ); + rememberPendingApprovalBatchIds( + runtime, + [{ toolCallId: "call-2" }], + "batch-b", + ); + + const batchId = resolvePendingApprovalBatchId(runtime, [ + { toolCallId: "call-1" }, + { toolCallId: "call-2" }, + ]); + expect(batchId).toBeNull(); + }); +}); diff --git a/src/websocket/listen-client.ts b/src/websocket/listen-client.ts index 0974557..8801339 100644 --- a/src/websocket/listen-client.ts +++ b/src/websocket/listen-client.ts @@ -17,6 +17,7 @@ import { type ApprovalResult, executeApprovalBatch, } from "../agent/approval-execution"; +import { fetchRunErrorDetail } from "../agent/approval-recovery"; import { getResumeData } from "../agent/check-approval"; import { getClient } from "../agent/client"; import { getStreamToolContextId, sendMessageStream } from "../agent/message"; @@ -278,6 +279,16 @@ type ListenerRuntime = { * Used to preserve run attachment continuity across approval recovery. */ pendingApprovalBatchByToolCallId: Map; + /** Queued interrupted tool-call resolutions from a cancelled turn. Prepended to the next user message. */ + pendingInterruptedResults: Array | null; + /** Context for pendingInterruptedResults — prevents replay into wrong conversation. */ + pendingInterruptedContext: { + agentId: string; + conversationId: string; + continuationEpoch: number; + } | null; + /** Monotonic epoch for queued continuation validity checks. 
*/ + continuationEpoch: number; }; type ApprovalSlot = @@ -351,6 +362,9 @@ function createRuntime(): ListenerRuntime { cancelRequested: false, isRecoveringApprovals: false, pendingApprovalBatchByToolCallId: new Map(), + pendingInterruptedResults: null, + pendingInterruptedContext: null, + continuationEpoch: 0, coalescedSkipQueueItemIds: new Set(), pendingTurns: 0, // queueRuntime assigned below — needs runtime ref in callbacks @@ -484,6 +498,22 @@ function resolvePendingApprovalBatchId( return batchIds.values().next().value ?? null; } +/** + * Resolve the batch ID for pending approval recovery. + * Cold start (empty map): returns a synthetic batch ID. + * Warm (map has entries): delegates to resolvePendingApprovalBatchId, + * returning null for ambiguous/conflicting mappings (fail-closed). + */ +function resolveRecoveryBatchId( + runtime: ListenerRuntime, + pendingApprovals: Array<{ toolCallId: string }>, +): string | null { + if (runtime.pendingApprovalBatchByToolCallId.size === 0) { + return `recovery-${crypto.randomUUID()}`; + } + return resolvePendingApprovalBatchId(runtime, pendingApprovals); +} + function clearPendingApprovalBatchIds( runtime: ListenerRuntime, approvals: Array<{ toolCallId: string }>, @@ -509,6 +539,11 @@ function stopRuntime( rejectPendingApprovalResolvers(runtime, "Listener runtime stopped"); runtime.pendingApprovalBatchByToolCallId.clear(); + // Clear interrupted queue on true teardown to prevent cross-session leakage. + runtime.pendingInterruptedResults = null; + runtime.pendingInterruptedContext = null; + runtime.continuationEpoch++; + if (!runtime.socket) { return; } @@ -815,6 +850,358 @@ function emitToWS(socket: WebSocket, event: WsProtocolEvent): void { } const LLM_API_ERROR_MAX_RETRIES = 3; +const MAX_PRE_STREAM_RECOVERY = 2; + +// --------------------------------------------------------------------------- +// Interrupt queue helpers — extracted for testability. 
// These are the ONLY places that read/write pendingInterruptedResults.
// ---------------------------------------------------------------------------

/** Snapshot of turn state handed to `populateInterruptQueue` when a cancel fires. */
interface InterruptPopulateInput {
  /** Results captured from the last approval-batch execution, or null if none ran. */
  lastExecutionResults: ApprovalResult[] | null;
  /** Tool call IDs from the last approval round that needed user input. */
  lastNeedsUserInputToolCallIds: string[];
  agentId: string;
  conversationId: string;
}

/** Wire-level shape of a single terminal tool result emitted after an interrupt. */
interface InterruptToolReturn {
  tool_call_id: string;
  status: "success" | "error";
  tool_return: string;
  stdout?: string[];
  stderr?: string[];
}

/**
 * Coerce an arbitrary tool return value into a string.
 *
 * - Strings are returned unchanged.
 * - `null` / `undefined` become the empty string.
 * - Everything else is JSON-serialized; if serialization throws (e.g. circular
 *   structures, BigInt) or produces no output, `String(value)` is used instead.
 *
 * @param value - Raw tool return value of unknown shape.
 * @returns A string representation; never `undefined`.
 */
function normalizeToolReturnValue(value: unknown): string {
  if (typeof value === "string") {
    return value;
  }
  if (value === null || value === undefined) {
    return "";
  }
  try {
    // JSON.stringify returns undefined (without throwing) for functions and
    // symbols, despite its declared `string` return type. Fall back to String()
    // so the declared return type holds and the field survives serialization.
    const serialized: string | undefined = JSON.stringify(value);
    return serialized ?? String(value);
  } catch {
    return String(value);
  }
}

/**
 * Convert approval/execution results into terminal tool-return payloads.
 *
 * - `type: "tool"` entries keep their `tool_return` (normalized to a string) and
 *   are marked `"success"` only when `status === "success"`; any other status is
 *   reported as `"error"`. `stdout` / `stderr` are copied only when they are
 *   arrays, keeping just their string entries.
 * - `type: "approval"` entries always become `"error"` returns whose text is the
 *   entry's `reason`, or "User interrupted the stream" when no string reason is set.
 * - Entries that are not objects, have any other `type`, or lack a non-empty
 *   string `tool_call_id` are dropped.
 *
 * @param approvals - Results to convert; `null` or empty yields an empty array.
 * @returns One payload per convertible entry, in input order.
 */
function extractInterruptToolReturns(
  approvals: ApprovalResult[] | null,
): InterruptToolReturn[] {
  if (!approvals || approvals.length === 0) {
    return [];
  }

  return approvals.flatMap((approval): InterruptToolReturn[] => {
    if (!approval || typeof approval !== "object") {
      return [];
    }

    if ("type" in approval && approval.type === "tool") {
      const toolCallId =
        "tool_call_id" in approval && typeof approval.tool_call_id === "string"
          ? approval.tool_call_id
          : null;
      if (!toolCallId) {
        return [];
      }
      const status =
        "status" in approval && approval.status === "success"
          ? "success"
          : "error";
      const stdout =
        "stdout" in approval && Array.isArray(approval.stdout)
          ? approval.stdout.filter(
              (entry): entry is string => typeof entry === "string",
            )
          : undefined;
      const stderr =
        "stderr" in approval && Array.isArray(approval.stderr)
          ? approval.stderr.filter(
              (entry): entry is string => typeof entry === "string",
            )
          : undefined;

      return [
        {
          tool_call_id: toolCallId,
          status,
          tool_return:
            "tool_return" in approval
              ? normalizeToolReturnValue(approval.tool_return)
              : "",
          // Omit the keys entirely (rather than emitting undefined) when absent.
          ...(stdout ? { stdout } : {}),
          ...(stderr ? { stderr } : {}),
        },
      ];
    }

    if ("type" in approval && approval.type === "approval") {
      const toolCallId =
        "tool_call_id" in approval && typeof approval.tool_call_id === "string"
          ? approval.tool_call_id
          : null;
      if (!toolCallId) {
        return [];
      }
      const reason =
        "reason" in approval && typeof approval.reason === "string"
          ? approval.reason
          : "User interrupted the stream";
      return [
        {
          tool_call_id: toolCallId,
          status: "error",
          tool_return: reason,
        },
      ];
    }

    return [];
  });
}

/**
 * Emit one `tool_return_message` frame per interrupted tool call.
 *
 * Does nothing when `approvals` yields no tool returns. Each frame carries the
 * tool return both as top-level fields and as a single-element `tool_returns`
 * array, plus a fresh message `id` and `uuid`.
 *
 * @param socket - Socket passed through to `emitToWS`.
 * @param runtime - Supplies `sessionId` and the fallback `activeRunId`.
 * @param approvals - Results to convert via `extractInterruptToolReturns`.
 * @param runId - Run to attribute the frames to; falls back to
 *   `runtime.activeRunId`, then `undefined`.
 * @param uuidPrefix - Prefix for each frame's `uuid`.
 */
function emitInterruptToolReturnMessage(
  socket: WebSocket,
  runtime: ListenerRuntime,
  approvals: ApprovalResult[] | null,
  runId?: string | null,
  uuidPrefix: string = "interrupt-tool-return",
): void {
  const toolReturns = extractInterruptToolReturns(approvals);
  if (toolReturns.length === 0) {
    return;
  }

  const resolvedRunId = runId ?? runtime.activeRunId ?? undefined;
  for (const toolReturn of toolReturns) {
    // NOTE(review): the double cast bypasses type checking of this payload
    // against MessageWire — confirm the wire type covers these fields.
    emitToWS(socket, {
      type: "message",
      message_type: "tool_return_message",
      id: `message-${crypto.randomUUID()}`,
      date: new Date().toISOString(),
      run_id: resolvedRunId,
      tool_call_id: toolReturn.tool_call_id,
      tool_return: toolReturn.tool_return,
      status: toolReturn.status,
      ...(toolReturn.stdout ? { stdout: toolReturn.stdout } : {}),
      ...(toolReturn.stderr ? { stderr: toolReturn.stderr } : {}),
      tool_returns: [toolReturn],
      session_id: runtime.sessionId,
      uuid: `${uuidPrefix}-${crypto.randomUUID()}`,
    } as unknown as MessageWire);
  }
}

/**
 * Pick the approval results to report for an interrupted turn.
 *
 * Non-empty `params.lastExecutionResults` always win. Otherwise the queued
 * `runtime.pendingInterruptedResults` are returned, but only when the queued
 * context matches the given agent, conversation, and the runtime's current
 * `continuationEpoch`. This function only reads the queue; it does not clear it.
 *
 * @returns The results to emit, or `null` when nothing applicable is available.
 */
function getInterruptApprovalsForEmission(
  runtime: ListenerRuntime,
  params: {
    lastExecutionResults: ApprovalResult[] | null;
    agentId: string;
    conversationId: string;
  },
): ApprovalResult[] | null {
  if (params.lastExecutionResults && params.lastExecutionResults.length > 0) {
    return params.lastExecutionResults;
  }
  const context = runtime.pendingInterruptedContext;
  if (
    !context ||
    context.agentId !== params.agentId ||
    context.conversationId !== params.conversationId ||
    context.continuationEpoch !== runtime.continuationEpoch
  ) {
    return null;
  }
  if (
    !runtime.pendingInterruptedResults ||
    runtime.pendingInterruptedResults.length === 0
  ) {
    return null;
  }
  return runtime.pendingInterruptedResults;
}

/**
 * Populate the interrupt queue on the runtime after a cancel.
 * Returns true if the queue was populated, false if skipped (idempotent).
 *
 * Path A: execution completed before cancel → queue actual results.
 * Path B: no execution yet → synthesize denial results from stable ID sources.
 */
function populateInterruptQueue(
  runtime: ListenerRuntime,
  input: InterruptPopulateInput,
): boolean {
  // Idempotency: preserve first cancel's results if already populated.
+ const shouldPopulate = + !runtime.pendingInterruptedResults || + runtime.pendingInterruptedResults.length === 0 || + !runtime.pendingInterruptedContext; + + if (!shouldPopulate) return false; + + if (input.lastExecutionResults && input.lastExecutionResults.length > 0) { + // Path A: execution happened before cancel — queue actual results + runtime.pendingInterruptedResults = input.lastExecutionResults; + runtime.pendingInterruptedContext = { + agentId: input.agentId, + conversationId: input.conversationId, + continuationEpoch: runtime.continuationEpoch, + }; + return true; + } + + // Path B: no execution — synthesize denial results from stable ID sources. + const batchToolCallIds = [...runtime.pendingApprovalBatchByToolCallId.keys()]; + const pendingIds = + batchToolCallIds.length > 0 + ? batchToolCallIds + : input.lastNeedsUserInputToolCallIds; + + if (pendingIds.length > 0) { + runtime.pendingInterruptedResults = pendingIds.map((toolCallId) => ({ + type: "approval" as const, + tool_call_id: toolCallId, + approve: false, + reason: "User interrupted the stream", + })); + runtime.pendingInterruptedContext = { + agentId: input.agentId, + conversationId: input.conversationId, + continuationEpoch: runtime.continuationEpoch, + }; + return true; + } + + if (process.env.DEBUG) { + console.warn( + "[Listen] Cancel during approval loop but no tool_call_ids available " + + "for interrupted queue — next turn may hit pre-stream conflict. " + + `batchMap=${runtime.pendingApprovalBatchByToolCallId.size}, ` + + `lastNeedsUserInput=${input.lastNeedsUserInputToolCallIds.length}`, + ); + } + return false; +} + +/** + * Consume queued interrupted results and return an ApprovalCreate to prepend, + * or null if nothing to consume. Always clears the queue atomically. + * + * This is the SOLE consumption point — called at the top of handleIncomingMessage. 
+ */ +function consumeInterruptQueue( + runtime: ListenerRuntime, + agentId: string, + conversationId: string, +): { type: "approval"; approvals: ApprovalResult[] } | null { + if ( + !runtime.pendingInterruptedResults || + runtime.pendingInterruptedResults.length === 0 + ) { + return null; + } + + const ctx = runtime.pendingInterruptedContext; + let result: { type: "approval"; approvals: ApprovalResult[] } | null = null; + + if ( + ctx && + ctx.agentId === agentId && + ctx.conversationId === conversationId && + ctx.continuationEpoch === runtime.continuationEpoch + ) { + result = { + type: "approval", + approvals: runtime.pendingInterruptedResults, + }; + } + + // Atomic clear — always, regardless of context match. + // Stale results for wrong context are discarded, not retried. + runtime.pendingInterruptedResults = null; + runtime.pendingInterruptedContext = null; + runtime.pendingApprovalBatchByToolCallId.clear(); + + return result; +} + +/** + * Attempt to resolve stale pending approvals by fetching them from the backend + * and auto-denying. This is the Phase 3 bounded recovery mechanism — it does NOT + * touch pendingInterruptedResults (that's exclusively owned by handleIncomingMessage). + */ +async function resolveStaleApprovals( + runtime: ListenerRuntime, + abortSignal: AbortSignal, +): Promise { + if (!runtime.activeAgentId) return; + + const client = await getClient(); + let agent: Awaited>; + try { + agent = await client.agents.retrieve(runtime.activeAgentId); + } catch (err) { + // 404 = agent deleted, 422 = invalid ID — both mean nothing to recover + if (err instanceof APIError && (err.status === 404 || err.status === 422)) { + return; + } + throw err; + } + const requestedConversationId = + runtime.activeConversationId && runtime.activeConversationId !== "default" + ? 
runtime.activeConversationId + : undefined; + + let resumeData: Awaited>; + try { + resumeData = await getResumeData(client, agent, requestedConversationId, { + includeMessageHistory: false, + }); + } catch (err) { + // getResumeData rethrows 404/422 for conversations — treat as no approvals + if (err instanceof APIError && (err.status === 404 || err.status === 422)) { + return; + } + throw err; + } + + const pendingApprovals = resumeData.pendingApprovals || []; + if (pendingApprovals.length === 0) return; + if (abortSignal.aborted) throw new Error("Cancelled"); + + const denialResults: ApprovalResult[] = pendingApprovals.map((approval) => ({ + type: "approval" as const, + tool_call_id: approval.toolCallId, + approve: false, + reason: "Auto-denied during pre-stream approval recovery", + })); + + const recoveryConversationId = runtime.activeConversationId || "default"; + const recoveryStream = await sendMessageStream( + recoveryConversationId, + [{ type: "approval", approvals: denialResults }], + { + agentId: runtime.activeAgentId, + streamTokens: true, + background: true, + }, + { maxRetries: 0, signal: abortSignal }, + ); + + const drainResult = await drainStreamWithResume( + recoveryStream as Stream, + createBuffers(runtime.activeAgentId), + () => {}, + abortSignal, + ); + + if (drainResult.stopReason === "error") { + throw new Error("Pre-stream approval recovery drain ended with error"); + } +} /** * Wrap sendMessageStream with pre-stream error handling (retry/recovery). @@ -830,6 +1217,7 @@ async function sendMessageStreamWithRetry( ): Promise>> { let transientRetries = 0; let conversationBusyRetries = 0; + let preStreamRecoveryAttempts = 0; const MAX_CONVERSATION_BUSY_RETRIES = 1; // eslint-disable-next-line no-constant-condition @@ -868,9 +1256,31 @@ async function sendMessageStreamWithRetry( ); if (action === "resolve_approval_pending") { - // Listener can't auto-resolve pending approvals like headless does. 
- // Rethrow — the cloud will resend with the approval. - throw preStreamError; + // Abort check first — don't let recovery mask a user cancel + if (abortSignal?.aborted) throw new Error("Cancelled by user"); + + // Attempt bounded recovery: fetch pending approvals and auto-deny them. + // This does NOT touch pendingInterruptedResults (sole owner: handleIncomingMessage). + if ( + abortSignal && + preStreamRecoveryAttempts < MAX_PRE_STREAM_RECOVERY + ) { + preStreamRecoveryAttempts++; + try { + await resolveStaleApprovals(runtime, abortSignal); + continue; // Retry send after resolving + } catch (_recoveryError) { + if (abortSignal.aborted) throw new Error("Cancelled by user"); + // Recovery failed — fall through to structured error + } + } + + // Unrecoverable — emit structured error instead of blind rethrow + const detail = await fetchRunErrorDetail(runtime.activeRunId); + throw new Error( + detail || + `Pre-stream approval conflict (resolve_approval_pending) after ${preStreamRecoveryAttempts} recovery attempts`, + ); } if (action === "retry_transient") { @@ -1071,6 +1481,17 @@ async function recoverPendingApprovals( socket: WebSocket, msg: RecoverPendingApprovalsMessage, ): Promise { + console.debug( + "[listener] recover_pending_approvals received", + JSON.stringify({ + agentId: msg.agentId, + conversationId: msg.conversationId ?? 
null, + isProcessing: runtime.isProcessing, + isRecovering: runtime.isRecoveringApprovals, + batchMapSize: runtime.pendingApprovalBatchByToolCallId.size, + }), + ); + if (runtime.isProcessing || runtime.isRecoveringApprovals) { return; } @@ -1108,15 +1529,12 @@ async function recoverPendingApprovals( return; } - const recoveryBatchId = resolvePendingApprovalBatchId( - runtime, - pendingApprovals, - ); + const recoveryBatchId = resolveRecoveryBatchId(runtime, pendingApprovals); if (!recoveryBatchId) { emitToWS(socket, { type: "error", message: - "Unable to recover pending approvals without originating batch correlation", + "Unable to recover pending approvals: ambiguous batch correlation", stop_reason: "error", session_id: runtime.sessionId, uuid: `error-${crypto.randomUUID()}`, @@ -1189,6 +1607,11 @@ async function recoverPendingApprovals( return; } + // Reflect approval-wait state in runtime snapshot while control + // requests are pending, so state_response queries see + // requires_approval even during the WS round-trip. + runtime.lastStopReason = "requires_approval"; + for (const ac of needsUserInput) { const requestId = `perm-${ac.approval.toolCallId}`; const diffs = await computeDiffPreviews( @@ -1501,6 +1924,25 @@ async function connectWithRetry( if (hasPendingApprovals) { rejectPendingApprovalResolvers(runtime, "Cancelled by user"); } + + // Backend cancel parity with TUI (App.tsx:5932-5941). + // Fire-and-forget — local cancel + queued results are the primary mechanism. + const cancelConversationId = runtime.activeConversationId; + const cancelAgentId = runtime.activeAgentId; + if (cancelAgentId) { + getClient() + .then((client) => { + const cancelId = + cancelConversationId === "default" || !cancelConversationId + ? 
cancelAgentId + : cancelConversationId; + return client.conversations.cancel(cancelId); + }) + .catch(() => { + // Fire-and-forget + }); + } + emitCancelAck(socket, runtime, { requestId, accepted: true, @@ -1794,6 +2236,11 @@ async function handleIncomingMessage( let msgTurnCount = 0; const msgRunIds: string[] = []; + // Track last approval-loop state for cancel-time queueing (Phase 1.2). + // Hoisted before try so the cancel catch block can access them. + let lastExecutionResults: ApprovalResult[] | null = null; + let lastNeedsUserInputToolCallIds: string[] = []; + runtime.isProcessing = true; runtime.cancelRequested = false; runtime.activeAbortController = new AbortController(); @@ -1824,9 +2271,21 @@ async function handleIncomingMessage( onStatusChange?.("processing", connectionId); } - let messagesToSend: Array = msg.messages; + let messagesToSend: Array = []; let turnToolContextId: string | null = null; + // Prepend queued interrupted results from a prior cancelled turn. + const consumed = consumeInterruptQueue( + runtime, + agentId || "", + conversationId, + ); + if (consumed) { + messagesToSend.push(consumed); + } + + messagesToSend.push(...msg.messages); + const firstMessage = msg.messages[0]; const isApprovalMessage = firstMessage && @@ -1891,6 +2350,15 @@ async function handleIncomingMessage( approvals: rebuiltApprovals, }, ]; + // Emit terminal tool outcomes immediately so WS consumers can close + // tool-call UI state without waiting for follow-up hydration. + emitInterruptToolReturnMessage( + socket, + runtime, + rebuiltApprovals, + runtime.activeRunId ?? undefined, + "tool-return", + ); } let stream = await sendMessageStreamWithRetry( @@ -2034,16 +2502,62 @@ async function handleIncomingMessage( break; } - // Case 3: Error + // Case 3: Error (or cancel-induced error) if (stopReason !== "requires_approval") { - runtime.lastStopReason = stopReason; + // Cancel-induced errors should be treated as cancellation, not error. 
+ // This handles the race where cancel fires during stream drain and the + // backend returns "error" instead of "cancelled". + // We're already inside `stopReason !== "requires_approval"`, so this + // is a true non-approval stop. If cancel was requested, treat as cancelled. + const effectiveStopReason: StopReasonType = runtime.cancelRequested + ? "cancelled" + : (stopReason as StopReasonType) || "error"; + + // If effective stop reason is cancelled, route through cancelled semantics (Case 2). + if (effectiveStopReason === "cancelled") { + runtime.lastStopReason = "cancelled"; + runtime.isProcessing = false; + clearActiveRunState(runtime); + + if (runtime.controlResponseCapable) { + emitToWS(socket, { + type: "result", + subtype: "interrupted", + agent_id: agentId, + conversation_id: conversationId, + duration_ms: performance.now() - msgStartTime, + duration_api_ms: 0, + num_turns: msgTurnCount, + result: null, + run_ids: msgRunIds, + usage: null, + stop_reason: "cancelled", + session_id: runtime.sessionId, + uuid: `result-${crypto.randomUUID()}`, + }); + } else { + sendClientMessage(socket, { + type: "result", + success: false, + stopReason: "cancelled", + }); + } + break; + } + + runtime.lastStopReason = effectiveStopReason; runtime.isProcessing = false; clearActiveRunState(runtime); + // Try to fetch richer error detail from the run metadata + const errorDetail = await fetchRunErrorDetail(runId).catch(() => null); + const errorMessage = + errorDetail || `Unexpected stop reason: ${stopReason}`; + emitToWS(socket, { type: "error", - message: `Unexpected stop reason: ${stopReason}`, - stop_reason: (stopReason as StopReasonType) || "error", + message: errorMessage, + stop_reason: effectiveStopReason, run_id: runId, session_id: runtime.sessionId, uuid: `error-${crypto.randomUUID()}`, @@ -2060,7 +2574,7 @@ async function handleIncomingMessage( result: null, run_ids: msgRunIds, usage: null, - stop_reason: (stopReason as StopReasonType) || "error", + stop_reason: 
effectiveStopReason, session_id: runtime.sessionId, uuid: `result-${crypto.randomUUID()}`, }); @@ -2068,7 +2582,7 @@ async function handleIncomingMessage( sendClientMessage(socket, { type: "result", success: false, - stopReason, + stopReason: effectiveStopReason, }); } break; @@ -2103,6 +2617,13 @@ async function handleIncomingMessage( requireArgsForAutoApprove: true, }); + // Snapshot all tool_call_ids before entering approval wait so cancel can + // synthesize denial results even after pendingApprovalResolvers is cleared. + lastNeedsUserInputToolCallIds = needsUserInput.map( + (ac) => ac.approval.toolCallId, + ); + lastExecutionResults = null; + // Build decisions list (before needsUserInput gate so both paths accumulate here) type Decision = | { @@ -2255,6 +2776,19 @@ async function handleIncomingMessage( abortSignal: runtime.activeAbortController.signal, }, ); + lastExecutionResults = executionResults; + // WS-first parity: publish tool-return terminal outcomes immediately on + // normal approval execution, before continuation stream send. + emitInterruptToolReturnMessage( + socket, + runtime, + executionResults, + runtime.activeRunId || + runId || + msgRunIds[msgRunIds.length - 1] || + undefined, + "tool-return", + ); clearPendingApprovalBatchIds( runtime, decisions.map((decision) => decision.approval), @@ -2274,12 +2808,40 @@ async function handleIncomingMessage( runtime, runtime.activeAbortController.signal, ); + + // Results were successfully submitted to the backend — clear both so a + // cancel during the subsequent stream drain won't queue already-sent + // results (Path A) or re-deny already-resolved tool calls (Path B). + lastExecutionResults = null; + lastNeedsUserInputToolCallIds = []; + turnToolContextId = getStreamToolContextId( stream as Stream, ); } } catch (error) { if (runtime.cancelRequested) { + // Queue interrupted tool-call resolutions for the next message turn. 
+ populateInterruptQueue(runtime, { + lastExecutionResults, + lastNeedsUserInputToolCallIds, + agentId: agentId || "", + conversationId, + }); + const approvalsForEmission = getInterruptApprovalsForEmission(runtime, { + lastExecutionResults, + agentId: agentId || "", + conversationId, + }); + if (approvalsForEmission) { + emitInterruptToolReturnMessage( + socket, + runtime, + approvalsForEmission, + runtime.activeRunId || msgRunIds[msgRunIds.length - 1] || undefined, + ); + } + runtime.lastStopReason = "cancelled"; runtime.isProcessing = false; clearActiveRunState(runtime); @@ -2381,5 +2943,11 @@ export const __listenClientTestUtils = { emitToWS, rememberPendingApprovalBatchIds, resolvePendingApprovalBatchId, + resolveRecoveryBatchId, clearPendingApprovalBatchIds, + populateInterruptQueue, + consumeInterruptQueue, + extractInterruptToolReturns, + emitInterruptToolReturnMessage, + getInterruptApprovalsForEmission, };