From 5f30588b7aee8841b46ba846bc3e1cbac742b4ed Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Mon, 23 Mar 2026 16:54:25 -0700 Subject: [PATCH 1/2] fix(tools): add Glob cancellation plumbing (#1494) Co-authored-by: Letta Code --- src/tests/tools/glob.test.ts | 57 ++++++++++++++++++++ src/tests/tools/grep.test.ts | 58 +++++++++++++++++++++ src/tests/tools/search-file-content.test.ts | 37 +++++++++++++ src/tools/impl/Glob.ts | 14 ++++- src/tools/impl/GlobGemini.ts | 2 + src/tools/impl/Grep.ts | 18 +++++-- src/tools/impl/GrepFiles.ts | 4 +- src/tools/impl/SearchFileContentGemini.ts | 2 + src/tools/manager.ts | 33 +++++++++--- 9 files changed, 212 insertions(+), 13 deletions(-) diff --git a/src/tests/tools/glob.test.ts b/src/tests/tools/glob.test.ts index 421909d..5c8f467 100644 --- a/src/tests/tools/glob.test.ts +++ b/src/tests/tools/glob.test.ts @@ -1,5 +1,6 @@ import { afterEach, describe, expect, test } from "bun:test"; import { glob } from "../../tools/impl/Glob"; +import { executeTool, loadSpecificTools } from "../../tools/manager"; import { TestDirectory } from "../helpers/testFs"; describe("Glob tool", () => { @@ -44,4 +45,60 @@ describe("Glob tool", () => { expect(result.files).toEqual([]); }); + + test("aborts promptly when signal is already aborted", async () => { + testDir = new TestDirectory(); + testDir.createFile("a.ts", ""); + + const abortController = new AbortController(); + abortController.abort(); + + await expect( + glob({ + pattern: "**/*.ts", + path: testDir.path, + signal: abortController.signal, + }), + ).rejects.toMatchObject({ name: "AbortError" }); + }); + + test("manager passes signal through to Glob execution", async () => { + testDir = new TestDirectory(); + testDir.createFile("a.ts", ""); + + await loadSpecificTools(["Glob"]); + + const abortController = new AbortController(); + abortController.abort(); + + const result = await executeTool( + "Glob", + { pattern: "**/*.ts", path: testDir.path }, + { signal: abortController.signal }, + ); + + expect(result.status).toBe("error"); + expect(typeof result.toolReturn).toBe("string"); + expect(result.toolReturn).toContain("Interrupted by user"); + }); + + test("manager passes signal through to GlobGemini execution", async () => { + testDir = new TestDirectory(); + testDir.createFile("a.ts", ""); + + await loadSpecificTools(["GlobGemini"]); + + const abortController = new AbortController(); + abortController.abort(); + + const result = await executeTool( + "GlobGemini", + { pattern: "**/*.ts", dir_path: testDir.path }, + { signal: abortController.signal }, + ); + + expect(result.status).toBe("error"); + expect(typeof result.toolReturn).toBe("string"); + expect(result.toolReturn).toContain("Interrupted by user"); + }); }); diff --git a/src/tests/tools/grep.test.ts b/src/tests/tools/grep.test.ts index 4be9df6..8fa4e2a 100644 --- a/src/tests/tools/grep.test.ts +++ b/src/tests/tools/grep.test.ts @@ -1,5 +1,6 @@ import { afterEach, describe, expect, test } from "bun:test"; import { grep } from "../../tools/impl/Grep"; +import { executeTool, loadSpecificTools } from "../../tools/manager"; import { TestDirectory } from "../helpers/testFs"; describe("Grep tool", () => { @@ -144,4 +145,61 @@ describe("Grep tool", () => { } } }); + + test("aborts promptly when signal is already aborted", async () => { + testDir = new TestDirectory(); + testDir.createFile("test.txt", "Hello World"); + + const abortController = new AbortController(); + abortController.abort(); + + await expect( + grep({ + pattern: "World", + path: testDir.path, + output_mode: "content", + signal: abortController.signal, + }), + ).rejects.toMatchObject({ name: "AbortError" }); + }); + + test("manager passes signal through to Grep execution", async () => { + testDir = new TestDirectory(); + testDir.createFile("test.txt", "Hello World"); + + await loadSpecificTools(["Grep"]); + + const abortController = new AbortController(); + abortController.abort(); + + const result = await executeTool( + "Grep", + { pattern: "World", path: testDir.path, output_mode: "content" }, + { signal: abortController.signal }, + ); + + expect(result.status).toBe("error"); + expect(typeof result.toolReturn).toBe("string"); + expect(result.toolReturn).toContain("Interrupted by user"); + }); + + test("manager passes signal through to GrepFiles execution", async () => { + testDir = new TestDirectory(); + testDir.createFile("test.txt", "Hello World"); + + await loadSpecificTools(["GrepFiles"]); + + const abortController = new AbortController(); + abortController.abort(); + + const result = await executeTool( + "GrepFiles", + { pattern: "World", path: testDir.path }, + { signal: abortController.signal }, + ); + + expect(result.status).toBe("error"); + expect(typeof result.toolReturn).toBe("string"); + expect(result.toolReturn).toContain("Interrupted by user"); + }); }); diff --git a/src/tests/tools/search-file-content.test.ts b/src/tests/tools/search-file-content.test.ts index 2798188..729a088 100644 --- a/src/tests/tools/search-file-content.test.ts +++ b/src/tests/tools/search-file-content.test.ts @@ -1,5 +1,6 @@ import { afterEach, describe, expect, test } from "bun:test"; import { search_file_content } from "../../tools/impl/SearchFileContentGemini"; +import { executeTool, loadSpecificTools } from "../../tools/manager"; import { TestDirectory } from "../helpers/testFs"; describe("SearchFileContent tool", () => { @@ -74,4 +75,40 @@ describe("SearchFileContent tool", () => { expect(result.message).toContain("Hello World"); }); + + test("aborts promptly when signal is already aborted", async () => { + testDir = new TestDirectory(); + testDir.createFile("test.txt", "Hello World"); + + const abortController = new AbortController(); + abortController.abort(); + + await expect( + search_file_content({ + pattern: "Hello", + dir_path: testDir.path, + signal: abortController.signal, + }), + ).rejects.toMatchObject({ name: "AbortError" }); + }); + + test("manager passes signal through to SearchFileContent execution", async () => { + testDir = new TestDirectory(); + testDir.createFile("test.txt", "Hello World"); + + await loadSpecificTools(["SearchFileContent"]); + + const abortController = new AbortController(); + abortController.abort(); + + const result = await executeTool( + "SearchFileContent", + { pattern: "Hello", dir_path: testDir.path }, + { signal: abortController.signal }, + ); + + expect(result.status).toBe("error"); + expect(typeof result.toolReturn).toBe("string"); + expect(result.toolReturn).toContain("Interrupted by user"); + }); }); diff --git a/src/tools/impl/Glob.ts b/src/tools/impl/Glob.ts index d9f14f2..aeb0004 100644 --- a/src/tools/impl/Glob.ts +++ b/src/tools/impl/Glob.ts @@ -24,6 +24,7 @@ const rgPath = getRipgrepPath(); interface GlobArgs { pattern: string; path?: string; + signal?: AbortSignal; } interface GlobResult { @@ -59,7 +60,7 @@ function applyFileLimit(files: string[], workingDirectory: string): GlobResult { export async function glob(args: GlobArgs): Promise { validateRequiredParams(args, ["pattern"], "Glob"); - const { pattern, path: searchPath } = args; + const { pattern, path: searchPath, signal } = args; // Explicit check for undefined/empty pattern (validateRequiredParams only checks key existence) if (!pattern) { @@ -93,6 +94,7 @@ export async function glob(args: GlobArgs): Promise { const { stdout } = await execFileAsync(rgPath, rgArgs, { maxBuffer: 50 * 1024 * 1024, // 50MB buffer for large file lists cwd: userCwd, + signal, }); const files = stdout.trim().split("\n").filter(Boolean).sort(); @@ -104,6 +106,16 @@ export async function glob(args: GlobArgs): Promise { code?: string | number; }; + const isAbortError = + err.name === "AbortError" || + err.code === "ABORT_ERR" || + err.message === "The operation was aborted"; + if (isAbortError) { + throw Object.assign(new Error("The operation was aborted"), { + name: "AbortError", + }); + } + // ripgrep exits with code 1 when no files match - that's not an error if (err.code === 1 || err.code === "1") { return { files: [] }; diff --git a/src/tools/impl/GlobGemini.ts b/src/tools/impl/GlobGemini.ts index b31bba9..3adb5f8 100644 --- a/src/tools/impl/GlobGemini.ts +++ b/src/tools/impl/GlobGemini.ts @@ -11,6 +11,7 @@ interface GlobGeminiArgs { case_sensitive?: boolean; respect_git_ignore?: boolean; respect_gemini_ignore?: boolean; + signal?: AbortSignal; } export async function glob_gemini( @@ -20,6 +21,7 @@ export async function glob_gemini( const lettaArgs = { pattern: args.pattern, path: args.dir_path, + signal: args.signal, }; const result = await lettaGlob(lettaArgs); diff --git a/src/tools/impl/Grep.ts b/src/tools/impl/Grep.ts index 32e710c..f740fa2 100644 --- a/src/tools/impl/Grep.ts +++ b/src/tools/impl/Grep.ts @@ -47,6 +47,7 @@ export interface GrepArgs { head_limit?: number; offset?: number; multiline?: boolean; + signal?: AbortSignal; } interface GrepResult { @@ -71,6 +72,7 @@ export async function grep(args: GrepArgs): Promise { head_limit = 100, offset = 0, multiline, + signal, } = args; const userCwd = process.env.USER_CWD || process.cwd(); @@ -102,6 +104,7 @@ export async function grep(args: GrepArgs): Promise { const { stdout } = await execFileAsync(rgPath, rgArgs, { maxBuffer: 10 * 1024 * 1024, cwd: userCwd, + signal, }); if (output_mode === "files_with_matches") { const allFiles = stdout.trim().split("\n").filter(Boolean); @@ -178,12 +181,21 @@ export async function grep(args: GrepArgs): Promise { } catch (error) { const err = error as NodeJS.ErrnoException & { stdout?: string; + code?: string | number; }; - const code = typeof err.code === "number" ? err.code : undefined; - const _stdout = typeof err.stdout === "string" ? err.stdout : ""; + const code = err.code !== undefined ? String(err.code) : undefined; const message = typeof err.message === "string" ? err.message : "Unknown error"; - if (code === 1) { + const isAbortError = + err.name === "AbortError" || + err.code === "ABORT_ERR" || + err.message === "The operation was aborted"; + if (isAbortError) { + throw Object.assign(new Error("The operation was aborted"), { + name: "AbortError", + }); + } + if (code === "1") { if (output_mode === "files_with_matches") return { output: "No files found", files: 0 }; if (output_mode === "count") diff --git a/src/tools/impl/GrepFiles.ts b/src/tools/impl/GrepFiles.ts index 652f863..c7e6846 100644 --- a/src/tools/impl/GrepFiles.ts +++ b/src/tools/impl/GrepFiles.ts @@ -6,6 +6,7 @@ interface GrepFilesArgs { include?: string; path?: string; limit?: number; + signal?: AbortSignal; } interface GrepFilesResult { @@ -26,13 +27,14 @@ export async function grep_files( ): Promise { validateRequiredParams(args, ["pattern"], "grep_files"); - const { pattern, include, path, limit = DEFAULT_LIMIT } = args; + const { pattern, include, path, limit = DEFAULT_LIMIT, signal } = args; const grepArgs: GrepArgs = { pattern, path, glob: include, output_mode: "files_with_matches", + signal, }; const result = await grep(grepArgs); diff --git a/src/tools/impl/SearchFileContentGemini.ts b/src/tools/impl/SearchFileContentGemini.ts index f6e95c2..2ffb2fd 100644 --- a/src/tools/impl/SearchFileContentGemini.ts +++ b/src/tools/impl/SearchFileContentGemini.ts @@ -9,6 +9,7 @@ interface SearchFileContentGeminiArgs { pattern: string; dir_path?: string; include?: string; + signal?: AbortSignal; } export async function search_file_content( @@ -20,6 +21,7 @@ export async function search_file_content( path: args.dir_path, glob: args.include, output_mode: "content" as const, // Return actual matching lines, not just file paths + signal: args.signal, }; const result = await grep(lettaArgs); diff --git a/src/tools/manager.ts b/src/tools/manager.ts index f7173ae..270fe27 100644 --- a/src/tools/manager.ts +++ b/src/tools/manager.ts @@ -46,7 +46,7 @@ const FILE_MODIFYING_TOOLS = new Set([ ]); export const TOOL_NAMES = Object.keys(TOOL_DEFINITIONS) as ToolName[]; -const STREAMING_SHELL_TOOLS = new Set([ +const SIGNAL_AWARE_TOOLS = new Set([ "Bash", "BashOutput", "TaskOutput", @@ -56,6 +56,14 @@ const STREAMING_SHELL_TOOLS = new Set([ "Shell", "run_shell_command", "RunShellCommand", + "Glob", + "Grep", + "grep_files", + "GrepFiles", + "glob_gemini", + "GlobGemini", + "search_file_content", + "SearchFileContent", ]); // Maps internal tool names to server/model-facing tool names @@ -1324,13 +1332,22 @@ export async function executeTool( // Inject options for tools that support them without altering schemas let enhancedArgs = args; - if (STREAMING_SHELL_TOOLS.has(internalName)) { - if (options?.signal) { - enhancedArgs = { ...enhancedArgs, signal: options.signal }; - } - if (options?.onOutput) { - enhancedArgs = { ...enhancedArgs, onOutput: options.onOutput }; - } + if (SIGNAL_AWARE_TOOLS.has(internalName) && options?.signal) { + enhancedArgs = { ...enhancedArgs, signal: options.signal }; + } + + if ( + (internalName === "Bash" || + internalName === "BashOutput" || + internalName === "shell_command" || + internalName === "ShellCommand" || + internalName === "shell" || + internalName === "Shell" || + internalName === "run_shell_command" || + internalName === "RunShellCommand") && + options?.onOutput + ) { + enhancedArgs = { ...enhancedArgs, onOutput: options.onOutput }; } // Inject toolCallId and abort signal for Task tool From b8d6e199e4a1d62c05fb5d5c7f4dde67e71bc109 Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Mon, 23 Mar 2026 17:25:17 -0700 Subject: [PATCH 2/2] fix(listener): harden interrupt completion and approval recovery (#1492) Co-authored-by: Letta Code --- src/agent/memoryGit.ts | 1 + src/tests/shell-codex.test.ts | 21 +++-- .../listen-client-concurrency.test.ts | 93 ++++++++++++++++--- .../websocket/listen-client-protocol.test.ts | 86 ++++++++++++++++- src/websocket/helpers/listenerQueueAdapter.ts | 9 +- src/websocket/listener/client.ts | 27 ++++-- src/websocket/listener/queue.ts | 13 ++- src/websocket/listener/runtime.ts | 8 +- src/websocket/listener/turn-approval.ts | 12 +-- src/websocket/listener/turn.ts | 27 +++--- 10 files changed, 236 insertions(+), 61 deletions(-) diff --git a/src/agent/memoryGit.ts b/src/agent/memoryGit.ts index 14f7504..03025e9 100644 --- a/src/agent/memoryGit.ts +++ b/src/agent/memoryGit.ts @@ -185,6 +185,7 @@ get_fm_value() { # Skip skill SKILL.md files — they use a different frontmatter format. for file in $(git diff --cached --name-only --diff-filter=ACM | grep -E '^(memory/)?(system|reference)/.*\\.md$'); do staged=$(git show ":$file") + staged=$(printf '%s' "$staged" | tr -d '\r') # Frontmatter is required first_line=$(echo "$staged" | head -1) diff --git a/src/tests/shell-codex.test.ts b/src/tests/shell-codex.test.ts index f83c8eb..652ff60 100644 --- a/src/tests/shell-codex.test.ts +++ b/src/tests/shell-codex.test.ts @@ -6,6 +6,13 @@ import { shell } from "../tools/impl/Shell.js"; const isWindows = process.platform === "win32"; +function getEchoCommand(...args: string[]): string[] { + if (isWindows) { + return ["cmd.exe", "/c", "echo", ...args]; + } + return ["/usr/bin/env", "echo", ...args]; +} + describe("shell codex tool", () => { let tempDir: string; @@ -18,11 +25,13 @@ describe("shell codex tool", () => { test("executes simple command with execvp-style args", async () => { const result = await shell({ - command: ["echo", "hello", "world"], + command: getEchoCommand("hello", "world"), }); - expect(result.output).toBe("hello world"); - expect(result.stdout).toContain("hello world"); + expect(result.output.replaceAll('"', "")).toBe("hello world"); + expect(result.stdout.join(" ").replaceAll('"', "")).toContain( + "hello world", + ); expect(result.stderr.length).toBe(0); }); @@ -54,10 +63,10 @@ describe("shell codex tool", () => { // This is the key test for execvp semantics - args with spaces // should NOT be split const result = await shell({ - command: ["echo", "hello world", "foo bar"], + command: getEchoCommand("hello world", "foo bar"), }); - expect(result.output).toBe("hello world foo bar"); + expect(result.output.replaceAll('"', "")).toBe("hello world foo bar"); }); test.skipIf(isWindows)("respects workdir parameter", async () => { @@ -180,7 +189,7 @@ describe("shell codex tool", () => { test("handles special characters in arguments", async () => { const result = await shell({ - command: ["echo", "$HOME", "$(whoami)", "`date`"], + command: getEchoCommand("$HOME", "$(whoami)", "`date`"), }); // Since we're using execvp-style (not shell expansion), diff --git a/src/tests/websocket/listen-client-concurrency.test.ts b/src/tests/websocket/listen-client-concurrency.test.ts index a323419..76e6add 100644 --- a/src/tests/websocket/listen-client-concurrency.test.ts +++ b/src/tests/websocket/listen-client-concurrency.test.ts @@ -1,6 +1,5 @@ import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"; import WebSocket from "ws"; -import type { ResumeData } from "../../agent/check-approval"; import { permissionMode } from "../../permissions/mode"; import type { MessageQueueItem, @@ -68,13 +67,6 @@ const getClientMock = mock(async () => ({ cancel: cancelConversationMock, }, })); -const getResumeDataMock = mock( - async (): Promise => ({ - pendingApproval: null, - pendingApprovals: [], - messageHistory: [], - }), -); const classifyApprovalsMock = mock(async () => ({ autoAllowed: [], autoDenied: [], @@ -203,7 +195,6 @@ describe("listen-client multi-worker concurrency", () => { drainStreamWithResumeMock.mockClear(); getClientMock.mockClear(); retrieveAgentMock.mockClear(); - getResumeDataMock.mockClear(); classifyApprovalsMock.mockClear(); executeApprovalBatchMock.mockClear(); cancelConversationMock.mockClear(); @@ -706,11 +697,6 @@ describe("listen-client multi-worker concurrency", () => { status: "success", }; - getResumeDataMock.mockResolvedValueOnce({ - pendingApproval: approval, - pendingApprovals: [approval], - messageHistory: [], - }); classifyApprovalsMock.mockResolvedValueOnce({ autoAllowed: [ { @@ -757,7 +743,13 @@ describe("listen-client multi-worker concurrency", () => { runtime, socket as unknown as WebSocket, new AbortController().signal, - { getResumeData: getResumeDataMock }, + { + getResumeData: async () => ({ + pendingApproval: approval, + pendingApprovals: [approval], + messageHistory: [], + }), + }, ); await waitFor(() => sendMessageStreamMock.mock.calls.length === 1); @@ -862,6 +854,77 @@ describe("listen-client multi-worker concurrency", () => { expect(listener.conversationRuntimes.has(runtimeA.key)).toBe(true); }); + test("stale approval response after approval-only interrupt unlatches cancelRequested and allows queue pump", async () => { + const listener = __listenClientTestUtils.createListenerRuntime(); + __listenClientTestUtils.setActiveRuntime(listener); + const runtime = __listenClientTestUtils.getOrCreateScopedRuntime( + listener, + "agent-1", + "conv-a", + ); + const socket = new MockSocket(); + + runtime.cancelRequested = true; + runtime.isProcessing = false; + runtime.loopStatus = "WAITING_ON_INPUT"; + + const queueInput = { + kind: "message", + source: "user", + content: "queued after stale approval", + clientMessageId: "cm-stale-approval", + agentId: "agent-1", + conversationId: "conv-a", + } satisfies Omit; + const item = runtime.queueRuntime.enqueue(queueInput); + if (!item) { + throw new Error("Expected queued item to be created"); + } + runtime.queuedMessagesByItemId.set( + item.id, + makeIncomingMessage("agent-1", "conv-a", "queued after stale approval"), + ); + + const scheduleQueuePumpMock = mock(() => { + __listenClientTestUtils.scheduleQueuePump( + runtime, + socket as unknown as WebSocket, + {} as never, + async () => {}, + ); + }); + + const handled = await __listenClientTestUtils.handleApprovalResponseInput( + listener, + { + runtime: { agent_id: "agent-1", conversation_id: "conv-a" }, + response: { + request_id: "perm-stale-after-approval-only-interrupt", + decision: { behavior: "allow" }, + }, + socket: socket as unknown as WebSocket, + opts: { + onStatusChange: undefined, + connectionId: "conn-1", + }, + processQueuedTurn: async () => {}, + }, + { + resolveRuntimeForApprovalRequest: () => null, + resolvePendingApprovalResolver: () => false, + getOrCreateScopedRuntime: () => runtime, + resolveRecoveredApprovalResponse: async () => false, + scheduleQueuePump: scheduleQueuePumpMock, + }, + ); + + expect(handled).toBe(false); + expect(runtime.cancelRequested).toBe(false); + expect(scheduleQueuePumpMock).toHaveBeenCalledTimes(1); + await waitFor(() => runtime.queuePumpScheduled === false); + expect(runtime.queueRuntime.length).toBe(0); + }); + test("change_device_state command holds queued input until the tracked command completes", async () => { const listener = __listenClientTestUtils.createListenerRuntime(); __listenClientTestUtils.setActiveRuntime(listener); diff --git a/src/tests/websocket/listen-client-protocol.test.ts b/src/tests/websocket/listen-client-protocol.test.ts index d81d3df..ef06a17 100644 --- a/src/tests/websocket/listen-client-protocol.test.ts +++ b/src/tests/websocket/listen-client-protocol.test.ts @@ -277,6 +277,35 @@ describe("listen-client approval resolver wiring", () => { expect(runtime.pendingApprovalResolvers.size).toBe(0); }); + test("resolving final approval response restores WAITING_ON_INPUT even while processing stays true", async () => { + const runtime = __listenClientTestUtils.createRuntime(); + const socket = new MockSocket(WebSocket.OPEN); + const requestId = "perm-processing"; + + runtime.isProcessing = true; + runtime.loopStatus = "WAITING_ON_APPROVAL" as never; + + const pending = requestApprovalOverWS( + runtime, + socket as unknown as WebSocket, + requestId, + makeControlRequest(requestId), + ); + + const resolved = resolvePendingApprovalResolver( + runtime, + makeSuccessResponse(requestId), + ); + + expect(resolved).toBe(true); + await expect(pending).resolves.toMatchObject({ + request_id: requestId, + decision: { behavior: "allow" }, + }); + expect(String(runtime.loopStatus)).toBe("WAITING_ON_INPUT"); + expect(runtime.isProcessing).toBe(true); + }); + test("ignores non-matching request_id and keeps pending resolver", async () => { const runtime = __listenClientTestUtils.createRuntime(); const socket = new MockSocket(WebSocket.OPEN); @@ -329,8 +358,9 @@ describe("listen-client approval resolver wiring", () => { await expect(second).rejects.toThrow("socket closed"); }); - test("cleanup resets WAITING_ON_INPUT instead of restoring fake processing", async () => { + test("cleanup resets loop status to WAITING_ON_INPUT even while processing stays true", async () => { const runtime = __listenClientTestUtils.createRuntime(); + runtime.isProcessing = true; runtime.loopStatus = "WAITING_ON_APPROVAL"; @@ -341,6 +371,7 @@ describe("listen-client approval resolver wiring", () => { rejectPendingApprovalResolvers(runtime, "socket closed"); expect(runtime.loopStatus as string).toBe("WAITING_ON_INPUT"); + expect(runtime.isProcessing).toBe(true); await expect(pending).rejects.toThrow("socket closed"); }); @@ -1302,6 +1333,59 @@ describe("listen-client capability-gated approval flow", () => { expect.any(Function), ); }); + + test("stale approval responses after interrupt are benign and do not mutate runtime state", async () => { + const listener = __listenClientTestUtils.createListenerRuntime(); + const targetRuntime = + __listenClientTestUtils.getOrCreateConversationRuntime( + listener, + "agent-1", + "default", + ); + targetRuntime.cancelRequested = true; + targetRuntime.loopStatus = "WAITING_ON_INPUT"; + targetRuntime.isProcessing = false; + + const socket = new MockSocket(WebSocket.OPEN); + const scheduleQueuePumpMock = mock(() => {}); + const resolveRecoveredApprovalResponseMock = mock(async () => false); + + const handled = await __listenClientTestUtils.handleApprovalResponseInput( + listener, + { + runtime: { agent_id: "agent-1", conversation_id: "default" }, + response: { + request_id: "perm-stale-after-interrupt", + decision: { behavior: "allow" }, + }, + socket: socket as unknown as WebSocket, + opts: { + onStatusChange: undefined, + connectionId: "conn-1", + }, + processQueuedTurn: async () => {}, + }, + { + resolveRuntimeForApprovalRequest: () => null, + resolvePendingApprovalResolver: () => false, + getOrCreateScopedRuntime: () => targetRuntime, + resolveRecoveredApprovalResponse: resolveRecoveredApprovalResponseMock, + scheduleQueuePump: scheduleQueuePumpMock, + }, + ); + + expect(handled).toBe(false); + expect(resolveRecoveredApprovalResponseMock).not.toHaveBeenCalled(); + expect(scheduleQueuePumpMock).toHaveBeenCalledWith( + targetRuntime, + socket, + expect.objectContaining({ connectionId: "conn-1" }), + expect.any(Function), + ); + expect(targetRuntime.cancelRequested).toBe(false); + expect(targetRuntime.loopStatus).toBe("WAITING_ON_INPUT"); + expect(targetRuntime.isProcessing).toBe(false); + }); }); describe("listen-client approval recovery batch correlation", () => { diff --git a/src/websocket/helpers/listenerQueueAdapter.ts b/src/websocket/helpers/listenerQueueAdapter.ts index 6b382e8..a281c60 100644 --- a/src/websocket/helpers/listenerQueueAdapter.ts +++ b/src/websocket/helpers/listenerQueueAdapter.ts @@ -12,7 +12,14 @@ export type ListenerQueueGatingConditions = { export function getListenerBlockedReason( c: ListenerQueueGatingConditions, ): QueueBlockedReason | null { - if (c.cancelRequested) return "interrupt_in_progress"; + if ( + c.cancelRequested && + (c.isProcessing || + c.isRecoveringApprovals || + c.loopStatus !== "WAITING_ON_INPUT") + ) { + return "interrupt_in_progress"; + } if (c.pendingApprovalsLen > 0) return "pending_approvals"; if (c.isRecoveringApprovals) return "runtime_busy"; if (c.loopStatus === "WAITING_ON_APPROVAL") return "pending_approvals"; diff --git a/src/websocket/listener/client.ts b/src/websocket/listener/client.ts index f905fa5..3e8fcfd 100644 --- a/src/websocket/listener/client.ts +++ b/src/websocket/listener/client.ts @@ -339,6 +339,18 @@ async function handleApprovalResponseInput( params.runtime.agent_id, params.runtime.conversation_id, ); + + if (targetRuntime.cancelRequested && !targetRuntime.isProcessing) { + targetRuntime.cancelRequested = false; + deps.scheduleQueuePump( + targetRuntime, + params.socket, + params.opts as StartListenerOptions, + params.processQueuedTurn, + ); + return false; + } + if ( await deps.resolveRecoveredApprovalResponse( targetRuntime, @@ -422,11 +434,9 @@ async function handleChangeDeviceStateInput( const shouldTrackCommand = !scopedRuntime.isProcessing && resolvedDeps.getPendingControlRequestCount(listener, scope) === 0; - if (shouldTrackCommand) { resolvedDeps.setLoopStatus(scopedRuntime, "EXECUTING_COMMAND", scope); } - try { if (params.command.payload.mode) { resolvedDeps.handleModeChange( @@ -436,7 +446,6 @@ async function handleChangeDeviceStateInput( scope, ); } - if (params.command.payload.cwd) { await resolvedDeps.handleCwdChange( { @@ -949,11 +958,9 @@ async function connectWithRetry( ...scopedRuntime.activeExecutingToolCallIds, ]; } - if ( - scopedRuntime.activeAbortController && - !scopedRuntime.activeAbortController.signal.aborted - ) { - scopedRuntime.activeAbortController.abort(); + const activeAbortController = scopedRuntime.activeAbortController; + if (activeAbortController && !activeAbortController.signal.aborted) { + activeAbortController.abort(); } const recoveredApprovalState = getRecoveredApprovalStateForScope( runtime, @@ -977,6 +984,10 @@ async function connectWithRetry( }); } + if (!hasActiveTurn) { + scopedRuntime.cancelRequested = false; + } + // Backend cancel parity with TUI (App.tsx:5932-5941). // Fire-and-forget — local cancel + queued results are the primary mechanism. const cancelConversationId = scopedRuntime.conversationId; diff --git a/src/websocket/listener/queue.ts b/src/websocket/listener/queue.ts index cbcc50b..4699891 100644 --- a/src/websocket/listener/queue.ts +++ b/src/websocket/listener/queue.ts @@ -250,10 +250,6 @@ function buildQueuedTurnMessage( }; } -export function shouldQueueInboundMessage(parsed: IncomingMessage): boolean { - return parsed.messages.some((payload) => "content" in payload); -} - export function consumeQueuedTurn(runtime: ConversationRuntime): { dequeuedBatch: DequeuedBatch; queuedTurn: IncomingMessage; @@ -279,7 +275,10 @@ export function consumeQueuedTurn(runtime: ConversationRuntime): { } } - if (!hasMessage || queueLen === 0) { + if (!hasMessage) { + return null; + } + if (queueLen === 0) { return null; } @@ -299,6 +298,10 @@ export function consumeQueuedTurn(runtime: ConversationRuntime): { }; } +export function shouldQueueInboundMessage(parsed: IncomingMessage): boolean { + return parsed.messages.some((payload) => "content" in payload); +} + function computeListenerQueueBlockedReason( runtime: ConversationRuntime, ): QueueBlockedReason | null { diff --git a/src/websocket/listener/runtime.ts b/src/websocket/listener/runtime.ts index ba04eb1..1dadb44 100644 --- a/src/websocket/listener/runtime.ts +++ b/src/websocket/listener/runtime.ts @@ -225,11 +225,9 @@ export function clearConversationRuntimeState( runtime: ConversationRuntime, ): void { runtime.cancelRequested = true; - if ( - runtime.activeAbortController && - !runtime.activeAbortController.signal.aborted - ) { - runtime.activeAbortController.abort(); + const activeAbortController = runtime.activeAbortController; + if (activeAbortController && !activeAbortController.signal.aborted) { + activeAbortController.abort(); } runtime.pendingApprovalBatchByToolCallId.clear(); runtime.pendingInterruptedResults = null; diff --git a/src/websocket/listener/turn-approval.ts b/src/websocket/listener/turn-approval.ts index 586a843..324f02b 100644 --- a/src/websocket/listener/turn-approval.ts +++ b/src/websocket/listener/turn-approval.ts @@ -95,6 +95,7 @@ export async function handleApprovalStop(params: { currentInput: Array; pendingNormalizationInterruptedToolCallIds: string[]; turnToolContextId: string | null; + abortSignal: AbortSignal; buildSendOptions: () => Parameters< typeof sendApprovalContinuationWithRetry >[2]; @@ -112,13 +113,9 @@ export async function handleApprovalStop(params: { msgRunIds, currentInput, turnToolContextId, + abortSignal, buildSendOptions, } = params; - const abortController = runtime.activeAbortController; - - if (!abortController) { - throw new Error("Missing active abort controller during approval handling"); - } if (approvals.length === 0) { runtime.lastStopReason = "error"; @@ -283,7 +280,7 @@ export async function handleApprovalStop(params: { const executionResults = await executeApprovalBatch(decisions, undefined, { toolContextId: turnToolContextId ?? undefined, - abortSignal: abortController.signal, + abortSignal, workingDirectory: turnWorkingDirectory, }); const persistedExecutionResults = normalizeExecutionResultsForInterruptParity( @@ -331,7 +328,6 @@ export async function handleApprovalStop(params: { nextInput.push(...queuedTurn.messages); emitDequeuedUserMessage(socket, runtime, queuedTurn, dequeuedBatch); } - setLoopStatus(runtime, "SENDING_API_REQUEST", { agent_id: agentId, conversation_id: conversationId, @@ -342,7 +338,7 @@ export async function handleApprovalStop(params: { buildSendOptions(), socket, runtime, - abortController.signal, + abortSignal, ); if (!stream) { return { diff --git a/src/websocket/listener/turn.ts b/src/websocket/listener/turn.ts index 3530e61..31a09ec 100644 --- a/src/websocket/listener/turn.ts +++ b/src/websocket/listener/turn.ts @@ -122,7 +122,9 @@ export async function handleIncomingMessage( runtime.isProcessing = true; runtime.cancelRequested = false; - runtime.activeAbortController = new AbortController(); + const turnAbortController = new AbortController(); + runtime.activeAbortController = turnAbortController; + const turnAbortSignal = turnAbortController.signal; runtime.activeWorkingDirectory = turnWorkingDirectory; runtime.activeRunId = null; runtime.activeRunStartedAt = new Date().toISOString(); @@ -240,7 +242,7 @@ export async function handleIncomingMessage( buildSendOptions(), socket, runtime, - runtime.activeAbortController.signal, + turnAbortSignal, ) : await sendMessageStreamWithRetry( conversationId, @@ -248,7 +250,7 @@ export async function handleIncomingMessage( buildSendOptions(), socket, runtime, - runtime.activeAbortController.signal, + turnAbortSignal, ); if (!stream) { return; @@ -275,7 +277,7 @@ export async function handleIncomingMessage( stream as Stream, buffers, () => {}, - runtime.activeAbortController.signal, + turnAbortSignal, undefined, ({ chunk, shouldOutput, errorInfo }) => { const maybeRunId = (chunk as { run_id?: unknown }).run_id; @@ -428,7 +430,7 @@ export async function handleIncomingMessage( buildSendOptions(), socket, runtime, - runtime.activeAbortController.signal, + turnAbortSignal, ) : await sendMessageStreamWithRetry( conversationId, @@ -436,7 +438,7 @@ export async function handleIncomingMessage( buildSendOptions(), socket, runtime, - runtime.activeAbortController.signal, + turnAbortSignal, ); if (!stream) { return; @@ -492,7 +494,7 @@ export async function handleIncomingMessage( }); await new Promise((resolve) => setTimeout(resolve, delayMs)); - if (runtime.activeAbortController.signal.aborted) { + if (turnAbortSignal.aborted) { throw new Error("Cancelled by user"); } @@ -511,7 +513,7 @@ export async function handleIncomingMessage( buildSendOptions(), socket, runtime, - runtime.activeAbortController.signal, + turnAbortSignal, ) : await sendMessageStreamWithRetry( conversationId, @@ -519,7 +521,7 @@ export async function handleIncomingMessage( buildSendOptions(), socket, runtime, - runtime.activeAbortController.signal, + turnAbortSignal, ); if (!stream) { return; @@ -563,7 +565,7 @@ export async function handleIncomingMessage( }); await new Promise((resolve) => setTimeout(resolve, delayMs)); - if (runtime.activeAbortController.signal.aborted) { + if (turnAbortSignal.aborted) { throw new Error("Cancelled by user"); } @@ -582,7 +584,7 @@ export async function handleIncomingMessage( buildSendOptions(), socket, runtime, - runtime.activeAbortController.signal, + turnAbortSignal, ) : await sendMessageStreamWithRetry( conversationId, @@ -590,7 +592,7 @@ export async function handleIncomingMessage( buildSendOptions(), socket, runtime, - runtime.activeAbortController.signal, + turnAbortSignal, ); if (!stream) { return; @@ -672,6 +674,7 @@ export async function handleIncomingMessage( currentInput, pendingNormalizationInterruptedToolCallIds, turnToolContextId, + abortSignal: turnAbortSignal, buildSendOptions, }); if (approvalResult.terminated || !approvalResult.stream) {