diff --git a/src/agent/memoryGit.ts b/src/agent/memoryGit.ts index 03025e9..14f7504 100644 --- a/src/agent/memoryGit.ts +++ b/src/agent/memoryGit.ts @@ -185,7 +185,6 @@ get_fm_value() { # Skip skill SKILL.md files — they use a different frontmatter format. for file in $(git diff --cached --name-only --diff-filter=ACM | grep -E '^(memory/)?(system|reference)/.*\\.md$'); do staged=$(git show ":$file") - staged=$(printf '%s' "$staged" | tr -d '\r') # Frontmatter is required first_line=$(echo "$staged" | head -1) diff --git a/src/tests/shell-codex.test.ts b/src/tests/shell-codex.test.ts index 652ff60..f83c8eb 100644 --- a/src/tests/shell-codex.test.ts +++ b/src/tests/shell-codex.test.ts @@ -6,13 +6,6 @@ import { shell } from "../tools/impl/Shell.js"; const isWindows = process.platform === "win32"; -function getEchoCommand(...args: string[]): string[] { - if (isWindows) { - return ["cmd.exe", "/c", "echo", ...args]; - } - return ["/usr/bin/env", "echo", ...args]; -} - describe("shell codex tool", () => { let tempDir: string; @@ -25,13 +18,11 @@ describe("shell codex tool", () => { test("executes simple command with execvp-style args", async () => { const result = await shell({ - command: getEchoCommand("hello", "world"), + command: ["echo", "hello", "world"], }); - expect(result.output.replaceAll('"', "")).toBe("hello world"); - expect(result.stdout.join(" ").replaceAll('"', "")).toContain( - "hello world", - ); + expect(result.output).toBe("hello world"); + expect(result.stdout).toContain("hello world"); expect(result.stderr.length).toBe(0); }); @@ -63,10 +54,10 @@ describe("shell codex tool", () => { // This is the key test for execvp semantics - args with spaces // should NOT be split const result = await shell({ - command: getEchoCommand("hello world", "foo bar"), + command: ["echo", "hello world", "foo bar"], }); - expect(result.output.replaceAll('"', "")).toBe("hello world foo bar"); + expect(result.output).toBe("hello world foo bar"); }); test.skipIf(isWindows)("respects workdir parameter", async () => { @@ -189,7 +180,7 @@ describe("shell codex tool", () => { test("handles special characters in arguments", async () => { const result = await shell({ - command: getEchoCommand("$HOME", "$(whoami)", "`date`"), + command: ["echo", "$HOME", "$(whoami)", "`date`"], }); // Since we're using execvp-style (not shell expansion), diff --git a/src/tests/tools/glob.test.ts b/src/tests/tools/glob.test.ts index 5c8f467..421909d 100644 --- a/src/tests/tools/glob.test.ts +++ b/src/tests/tools/glob.test.ts @@ -1,6 +1,5 @@ import { afterEach, describe, expect, test } from "bun:test"; import { glob } from "../../tools/impl/Glob"; -import { executeTool, loadSpecificTools } from "../../tools/manager"; import { TestDirectory } from "../helpers/testFs"; describe("Glob tool", () => { @@ -45,60 +44,4 @@ describe("Glob tool", () => { expect(result.files).toEqual([]); }); - - test("aborts promptly when signal is already aborted", async () => { - testDir = new TestDirectory(); - testDir.createFile("a.ts", ""); - - const abortController = new AbortController(); - abortController.abort(); - - await expect( - glob({ - pattern: "**/*.ts", - path: testDir.path, - signal: abortController.signal, - }), - ).rejects.toMatchObject({ name: "AbortError" }); - }); - - test("manager passes signal through to Glob execution", async () => { - testDir = new TestDirectory(); - testDir.createFile("a.ts", ""); - - await loadSpecificTools(["Glob"]); - - const abortController = new AbortController(); - abortController.abort(); - - const result = await executeTool( - "Glob", - { pattern: "**/*.ts", path: testDir.path }, - { signal: abortController.signal }, - ); - - expect(result.status).toBe("error"); - expect(typeof result.toolReturn).toBe("string"); - expect(result.toolReturn).toContain("Interrupted by user"); - }); - - test("manager passes signal through to GlobGemini execution", async () => { - testDir = new TestDirectory(); - testDir.createFile("a.ts", ""); - - await loadSpecificTools(["GlobGemini"]); - - const abortController = new AbortController(); - abortController.abort(); - - const result = await executeTool( - "GlobGemini", - { pattern: "**/*.ts", dir_path: testDir.path }, - { signal: abortController.signal }, - ); - - expect(result.status).toBe("error"); - expect(typeof result.toolReturn).toBe("string"); - expect(result.toolReturn).toContain("Interrupted by user"); - }); }); diff --git a/src/tests/tools/grep.test.ts b/src/tests/tools/grep.test.ts index 8fa4e2a..4be9df6 100644 --- a/src/tests/tools/grep.test.ts +++ b/src/tests/tools/grep.test.ts @@ -1,6 +1,5 @@ import { afterEach, describe, expect, test } from "bun:test"; import { grep } from "../../tools/impl/Grep"; -import { executeTool, loadSpecificTools } from "../../tools/manager"; import { TestDirectory } from "../helpers/testFs"; describe("Grep tool", () => { @@ -145,61 +144,4 @@ describe("Grep tool", () => { } } }); - - test("aborts promptly when signal is already aborted", async () => { - testDir = new TestDirectory(); - testDir.createFile("test.txt", "Hello World"); - - const abortController = new AbortController(); - abortController.abort(); - - await expect( - grep({ - pattern: "World", - path: testDir.path, - output_mode: "content", - signal: abortController.signal, - }), - ).rejects.toMatchObject({ name: "AbortError" }); - }); - - test("manager passes signal through to Grep execution", async () => { - testDir = new TestDirectory(); - testDir.createFile("test.txt", "Hello World"); - - await loadSpecificTools(["Grep"]); - - const abortController = new AbortController(); - abortController.abort(); - - const result = await executeTool( - "Grep", - { pattern: "World", path: testDir.path, output_mode: "content" }, - { signal: abortController.signal }, - ); - - expect(result.status).toBe("error"); - expect(typeof result.toolReturn).toBe("string"); - expect(result.toolReturn).toContain("Interrupted by user"); - }); - - test("manager passes signal through to GrepFiles execution", async () => { - testDir = new TestDirectory(); - testDir.createFile("test.txt", "Hello World"); - - await loadSpecificTools(["GrepFiles"]); - - const abortController = new AbortController(); - abortController.abort(); - - const result = await executeTool( - "GrepFiles", - { pattern: "World", path: testDir.path }, - { signal: abortController.signal }, - ); - - expect(result.status).toBe("error"); - expect(typeof result.toolReturn).toBe("string"); - expect(result.toolReturn).toContain("Interrupted by user"); - }); }); diff --git a/src/tests/tools/search-file-content.test.ts b/src/tests/tools/search-file-content.test.ts index 729a088..2798188 100644 --- a/src/tests/tools/search-file-content.test.ts +++ b/src/tests/tools/search-file-content.test.ts @@ -1,6 +1,5 @@ import { afterEach, describe, expect, test } from "bun:test"; import { search_file_content } from "../../tools/impl/SearchFileContentGemini"; -import { executeTool, loadSpecificTools } from "../../tools/manager"; import { TestDirectory } from "../helpers/testFs"; describe("SearchFileContent tool", () => { @@ -75,40 +74,4 @@ describe("SearchFileContent tool", () => { expect(result.message).toContain("Hello World"); }); - - test("aborts promptly when signal is already aborted", async () => { - testDir = new TestDirectory(); - testDir.createFile("test.txt", "Hello World"); - - const abortController = new AbortController(); - abortController.abort(); - - await expect( - search_file_content({ - pattern: "Hello", - dir_path: testDir.path, - signal: abortController.signal, - }), - ).rejects.toMatchObject({ name: "AbortError" }); - }); - - test("manager passes signal through to SearchFileContent execution", async () => { - testDir = new TestDirectory(); - testDir.createFile("test.txt", "Hello World"); - - await loadSpecificTools(["SearchFileContent"]); - - const abortController = new AbortController(); - abortController.abort(); - - const result = await executeTool( - "SearchFileContent", - { pattern: "Hello", dir_path: testDir.path }, - { signal: abortController.signal }, - ); - - expect(result.status).toBe("error"); - expect(typeof result.toolReturn).toBe("string"); - expect(result.toolReturn).toContain("Interrupted by user"); - }); }); diff --git a/src/tests/websocket/listen-client-concurrency.test.ts b/src/tests/websocket/listen-client-concurrency.test.ts index 76e6add..a323419 100644 --- a/src/tests/websocket/listen-client-concurrency.test.ts +++ b/src/tests/websocket/listen-client-concurrency.test.ts @@ -1,5 +1,6 @@ import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"; import WebSocket from "ws"; +import type { ResumeData } from "../../agent/check-approval"; import { permissionMode } from "../../permissions/mode"; import type { MessageQueueItem, @@ -67,6 +68,13 @@ const getClientMock = mock(async () => ({ cancel: cancelConversationMock, }, })); +const getResumeDataMock = mock( + async (): Promise => ({ + pendingApproval: null, + pendingApprovals: [], + messageHistory: [], + }), +); const classifyApprovalsMock = mock(async () => ({ autoAllowed: [], autoDenied: [], @@ -195,6 +203,7 @@ describe("listen-client multi-worker concurrency", () => { drainStreamWithResumeMock.mockClear(); getClientMock.mockClear(); retrieveAgentMock.mockClear(); + getResumeDataMock.mockClear(); classifyApprovalsMock.mockClear(); executeApprovalBatchMock.mockClear(); cancelConversationMock.mockClear(); @@ -697,6 +706,11 @@ describe("listen-client multi-worker concurrency", () => { status: "success", }; + getResumeDataMock.mockResolvedValueOnce({ + pendingApproval: approval, + pendingApprovals: [approval], + messageHistory: [], + }); classifyApprovalsMock.mockResolvedValueOnce({ autoAllowed: [ { @@ -743,13 +757,7 @@ describe("listen-client multi-worker concurrency", () => { runtime, socket as unknown as WebSocket, new AbortController().signal, - { - getResumeData: async () => ({ - pendingApproval: approval, - pendingApprovals: [approval], - messageHistory: [], - }), - }, + { getResumeData: getResumeDataMock }, ); await waitFor(() => sendMessageStreamMock.mock.calls.length === 1); @@ -854,77 +862,6 @@ describe("listen-client multi-worker concurrency", () => { expect(listener.conversationRuntimes.has(runtimeA.key)).toBe(true); }); - test("stale approval response after approval-only interrupt unlatches cancelRequested and allows queue pump", async () => { - const listener = __listenClientTestUtils.createListenerRuntime(); - __listenClientTestUtils.setActiveRuntime(listener); - const runtime = __listenClientTestUtils.getOrCreateScopedRuntime( - listener, - "agent-1", - "conv-a", - ); - const socket = new MockSocket(); - - runtime.cancelRequested = true; - runtime.isProcessing = false; - runtime.loopStatus = "WAITING_ON_INPUT"; - - const queueInput = { - kind: "message", - source: "user", - content: "queued after stale approval", - clientMessageId: "cm-stale-approval", - agentId: "agent-1", - conversationId: "conv-a", - } satisfies Omit; - const item = runtime.queueRuntime.enqueue(queueInput); - if (!item) { - throw new Error("Expected queued item to be created"); - } - runtime.queuedMessagesByItemId.set( - item.id, - makeIncomingMessage("agent-1", "conv-a", "queued after stale approval"), - ); - - const scheduleQueuePumpMock = mock(() => { - __listenClientTestUtils.scheduleQueuePump( - runtime, - socket as unknown as WebSocket, - {} as never, - async () => {}, - ); - }); - - const handled = await __listenClientTestUtils.handleApprovalResponseInput( - listener, - { - runtime: { agent_id: "agent-1", conversation_id: "conv-a" }, - response: { - request_id: "perm-stale-after-approval-only-interrupt", - decision: { behavior: "allow" }, - }, - socket: socket as unknown as WebSocket, - opts: { - onStatusChange: undefined, - connectionId: "conn-1", - }, - processQueuedTurn: async () => {}, - }, - { - resolveRuntimeForApprovalRequest: () => null, - resolvePendingApprovalResolver: () => false, - getOrCreateScopedRuntime: () => runtime, - resolveRecoveredApprovalResponse: async () => false, - scheduleQueuePump: scheduleQueuePumpMock, - }, - ); - - expect(handled).toBe(false); - expect(runtime.cancelRequested).toBe(false); - expect(scheduleQueuePumpMock).toHaveBeenCalledTimes(1); - await waitFor(() => runtime.queuePumpScheduled === false); - expect(runtime.queueRuntime.length).toBe(0); - }); - test("change_device_state command holds queued input until the tracked command completes", async () => { const listener = __listenClientTestUtils.createListenerRuntime(); __listenClientTestUtils.setActiveRuntime(listener); diff --git a/src/tests/websocket/listen-client-protocol.test.ts b/src/tests/websocket/listen-client-protocol.test.ts index ef06a17..d81d3df 100644 --- a/src/tests/websocket/listen-client-protocol.test.ts +++ b/src/tests/websocket/listen-client-protocol.test.ts @@ -277,35 +277,6 @@ describe("listen-client approval resolver wiring", () => { expect(runtime.pendingApprovalResolvers.size).toBe(0); }); - test("resolving final approval response restores WAITING_ON_INPUT even while processing stays true", async () => { - const runtime = __listenClientTestUtils.createRuntime(); - const socket = new MockSocket(WebSocket.OPEN); - const requestId = "perm-processing"; - - runtime.isProcessing = true; - runtime.loopStatus = "WAITING_ON_APPROVAL" as never; - - const pending = requestApprovalOverWS( - runtime, - socket as unknown as WebSocket, - requestId, - makeControlRequest(requestId), - ); - - const resolved = resolvePendingApprovalResolver( - runtime, - makeSuccessResponse(requestId), - ); - - expect(resolved).toBe(true); - await expect(pending).resolves.toMatchObject({ - request_id: requestId, - decision: { behavior: "allow" }, - }); - expect(String(runtime.loopStatus)).toBe("WAITING_ON_INPUT"); - expect(runtime.isProcessing).toBe(true); - }); - test("ignores non-matching request_id and keeps pending resolver", async () => { const runtime = __listenClientTestUtils.createRuntime(); const socket = new MockSocket(WebSocket.OPEN); @@ -358,9 +329,8 @@ describe("listen-client approval resolver wiring", () => { await expect(second).rejects.toThrow("socket closed"); }); - test("cleanup resets loop status to WAITING_ON_INPUT even while processing stays true", async () => { + test("cleanup resets WAITING_ON_INPUT instead of restoring fake processing", async () => { const runtime = __listenClientTestUtils.createRuntime(); - runtime.isProcessing = true; runtime.loopStatus = "WAITING_ON_APPROVAL"; @@ -371,7 +341,6 @@ describe("listen-client approval resolver wiring", () => { rejectPendingApprovalResolvers(runtime, "socket closed"); expect(runtime.loopStatus as string).toBe("WAITING_ON_INPUT"); - expect(runtime.isProcessing).toBe(true); await expect(pending).rejects.toThrow("socket closed"); }); @@ -1333,59 +1302,6 @@ describe("listen-client capability-gated approval flow", () => { expect.any(Function), ); }); - - test("stale approval responses after interrupt are benign and do not mutate runtime state", async () => { - const listener = __listenClientTestUtils.createListenerRuntime(); - const targetRuntime = - __listenClientTestUtils.getOrCreateConversationRuntime( - listener, - "agent-1", - "default", - ); - targetRuntime.cancelRequested = true; - targetRuntime.loopStatus = "WAITING_ON_INPUT"; - targetRuntime.isProcessing = false; - - const socket = new MockSocket(WebSocket.OPEN); - const scheduleQueuePumpMock = mock(() => {}); - const resolveRecoveredApprovalResponseMock = mock(async () => false); - - const handled = await __listenClientTestUtils.handleApprovalResponseInput( - listener, - { - runtime: { agent_id: "agent-1", conversation_id: "default" }, - response: { - request_id: "perm-stale-after-interrupt", - decision: { behavior: "allow" }, - }, - socket: socket as unknown as WebSocket, - opts: { - onStatusChange: undefined, - connectionId: "conn-1", - }, - processQueuedTurn: async () => {}, - }, - { - resolveRuntimeForApprovalRequest: () => null, - resolvePendingApprovalResolver: () => false, - getOrCreateScopedRuntime: () => targetRuntime, - resolveRecoveredApprovalResponse: resolveRecoveredApprovalResponseMock, - scheduleQueuePump: scheduleQueuePumpMock, - }, - ); - - expect(handled).toBe(false); - expect(resolveRecoveredApprovalResponseMock).not.toHaveBeenCalled(); - expect(scheduleQueuePumpMock).toHaveBeenCalledWith( - targetRuntime, - socket, - expect.objectContaining({ connectionId: "conn-1" }), - expect.any(Function), - ); - expect(targetRuntime.cancelRequested).toBe(false); - expect(targetRuntime.loopStatus).toBe("WAITING_ON_INPUT"); - expect(targetRuntime.isProcessing).toBe(false); - }); }); describe("listen-client approval recovery batch correlation", () => { diff --git a/src/tools/impl/Glob.ts b/src/tools/impl/Glob.ts index aeb0004..d9f14f2 100644 --- a/src/tools/impl/Glob.ts +++ b/src/tools/impl/Glob.ts @@ -24,7 +24,6 @@ const rgPath = getRipgrepPath(); interface GlobArgs { pattern: string; path?: string; - signal?: AbortSignal; } interface GlobResult { @@ -60,7 +59,7 @@ function applyFileLimit(files: string[], workingDirectory: string): GlobResult { export async function glob(args: GlobArgs): Promise { validateRequiredParams(args, ["pattern"], "Glob"); - const { pattern, path: searchPath, signal } = args; + const { pattern, path: searchPath } = args; // Explicit check for undefined/empty pattern (validateRequiredParams only checks key existence) if (!pattern) { @@ -94,7 +93,6 @@ export async function glob(args: GlobArgs): Promise { const { stdout } = await execFileAsync(rgPath, rgArgs, { maxBuffer: 50 * 1024 * 1024, // 50MB buffer for large file lists cwd: userCwd, - signal, }); const files = stdout.trim().split("\n").filter(Boolean).sort(); @@ -106,16 +104,6 @@ export async function glob(args: GlobArgs): Promise { code?: string | number; }; - const isAbortError = - err.name === "AbortError" || - err.code === "ABORT_ERR" || - err.message === "The operation was aborted"; - if (isAbortError) { - throw Object.assign(new Error("The operation was aborted"), { - name: "AbortError", - }); - } - // ripgrep exits with code 1 when no files match - that's not an error if (err.code === 1 || err.code === "1") { return { files: [] }; diff --git a/src/tools/impl/GlobGemini.ts b/src/tools/impl/GlobGemini.ts index 3adb5f8..b31bba9 100644 --- a/src/tools/impl/GlobGemini.ts +++ b/src/tools/impl/GlobGemini.ts @@ -11,7 +11,6 @@ interface GlobGeminiArgs { case_sensitive?: boolean; respect_git_ignore?: boolean; respect_gemini_ignore?: boolean; - signal?: AbortSignal; } export async function glob_gemini( @@ -21,7 +20,6 @@ export async function glob_gemini( const lettaArgs = { pattern: args.pattern, path: args.dir_path, - signal: args.signal, }; const result = await lettaGlob(lettaArgs); diff --git a/src/tools/impl/Grep.ts b/src/tools/impl/Grep.ts index f740fa2..32e710c 100644 --- a/src/tools/impl/Grep.ts +++ b/src/tools/impl/Grep.ts @@ -47,7 +47,6 @@ export interface GrepArgs { head_limit?: number; offset?: number; multiline?: boolean; - signal?: AbortSignal; } interface GrepResult { @@ -72,7 +71,6 @@ export async function grep(args: GrepArgs): Promise { head_limit = 100, offset = 0, multiline, - signal, } = args; const userCwd = process.env.USER_CWD || process.cwd(); @@ -104,7 +102,6 @@ export async function grep(args: GrepArgs): Promise { const { stdout } = await execFileAsync(rgPath, rgArgs, { maxBuffer: 10 * 1024 * 1024, cwd: userCwd, - signal, }); if (output_mode === "files_with_matches") { const allFiles = stdout.trim().split("\n").filter(Boolean); @@ -181,21 +178,12 @@ export async function grep(args: GrepArgs): Promise { } catch (error) { const err = error as NodeJS.ErrnoException & { stdout?: string; - code?: string | number; }; - const code = err.code !== undefined ? String(err.code) : undefined; + const code = typeof err.code === "number" ? err.code : undefined; + const _stdout = typeof err.stdout === "string" ? err.stdout : ""; const message = typeof err.message === "string" ? err.message : "Unknown error"; - const isAbortError = - err.name === "AbortError" || - err.code === "ABORT_ERR" || - err.message === "The operation was aborted"; - if (isAbortError) { - throw Object.assign(new Error("The operation was aborted"), { - name: "AbortError", - }); - } - if (code === "1") { + if (code === 1) { if (output_mode === "files_with_matches") return { output: "No files found", files: 0 }; if (output_mode === "count") diff --git a/src/tools/impl/GrepFiles.ts b/src/tools/impl/GrepFiles.ts index c7e6846..652f863 100644 --- a/src/tools/impl/GrepFiles.ts +++ b/src/tools/impl/GrepFiles.ts @@ -6,7 +6,6 @@ interface GrepFilesArgs { include?: string; path?: string; limit?: number; - signal?: AbortSignal; } interface GrepFilesResult { @@ -27,14 +26,13 @@ export async function grep_files( ): Promise { validateRequiredParams(args, ["pattern"], "grep_files"); - const { pattern, include, path, limit = DEFAULT_LIMIT, signal } = args; + const { pattern, include, path, limit = DEFAULT_LIMIT } = args; const grepArgs: GrepArgs = { pattern, path, glob: include, output_mode: "files_with_matches", - signal, }; const result = await grep(grepArgs); diff --git a/src/tools/impl/SearchFileContentGemini.ts b/src/tools/impl/SearchFileContentGemini.ts index 2ffb2fd..f6e95c2 100644 --- a/src/tools/impl/SearchFileContentGemini.ts +++ b/src/tools/impl/SearchFileContentGemini.ts @@ -9,7 +9,6 @@ interface SearchFileContentGeminiArgs { pattern: string; dir_path?: string; include?: string; - signal?: AbortSignal; } export async function search_file_content( @@ -21,7 +20,6 @@ export async function search_file_content( path: args.dir_path, glob: args.include, output_mode: "content" as const, // Return actual matching lines, not just file paths - signal: args.signal, }; const result = await grep(lettaArgs); diff --git a/src/tools/manager.ts b/src/tools/manager.ts index 270fe27..f7173ae 100644 --- a/src/tools/manager.ts +++ b/src/tools/manager.ts @@ -46,7 +46,7 @@ const FILE_MODIFYING_TOOLS = new Set([ ]); export const TOOL_NAMES = Object.keys(TOOL_DEFINITIONS) as ToolName[]; -const SIGNAL_AWARE_TOOLS = new Set([ +const STREAMING_SHELL_TOOLS = new Set([ "Bash", "BashOutput", "TaskOutput", @@ -56,14 +56,6 @@ const SIGNAL_AWARE_TOOLS = new Set([ "Shell", "run_shell_command", "RunShellCommand", - "Glob", - "Grep", - "grep_files", - "GrepFiles", - "glob_gemini", - "GlobGemini", - "search_file_content", - "SearchFileContent", ]); // Maps internal tool names to server/model-facing tool names @@ -1332,22 +1324,13 @@ export async function executeTool( // Inject options for tools that support them without altering schemas let enhancedArgs = args; - if (SIGNAL_AWARE_TOOLS.has(internalName) && options?.signal) { - enhancedArgs = { ...enhancedArgs, signal: options.signal }; - } - - if ( - (internalName === "Bash" || - internalName === "BashOutput" || - internalName === "shell_command" || - internalName === "ShellCommand" || - internalName === "shell" || - internalName === "Shell" || - internalName === "run_shell_command" || - internalName === "RunShellCommand") && - options?.onOutput - ) { - enhancedArgs = { ...enhancedArgs, onOutput: options.onOutput }; + if (STREAMING_SHELL_TOOLS.has(internalName)) { + if (options?.signal) { + enhancedArgs = { ...enhancedArgs, signal: options.signal }; + } + if (options?.onOutput) { + enhancedArgs = { ...enhancedArgs, onOutput: options.onOutput }; + } } // Inject toolCallId and abort signal for Task tool diff --git a/src/websocket/helpers/listenerQueueAdapter.ts b/src/websocket/helpers/listenerQueueAdapter.ts index a281c60..6b382e8 100644 --- a/src/websocket/helpers/listenerQueueAdapter.ts +++ b/src/websocket/helpers/listenerQueueAdapter.ts @@ -12,14 +12,7 @@ export type ListenerQueueGatingConditions = { export function getListenerBlockedReason( c: ListenerQueueGatingConditions, ): QueueBlockedReason | null { - if ( - c.cancelRequested && - (c.isProcessing || - c.isRecoveringApprovals || - c.loopStatus !== "WAITING_ON_INPUT") - ) { - return "interrupt_in_progress"; - } + if (c.cancelRequested) return "interrupt_in_progress"; if (c.pendingApprovalsLen > 0) return "pending_approvals"; if (c.isRecoveringApprovals) return "runtime_busy"; if (c.loopStatus === "WAITING_ON_APPROVAL") return "pending_approvals"; diff --git a/src/websocket/listener/client.ts b/src/websocket/listener/client.ts index 3e8fcfd..f905fa5 100644 --- a/src/websocket/listener/client.ts +++ b/src/websocket/listener/client.ts @@ -339,18 +339,6 @@ async function handleApprovalResponseInput( params.runtime.agent_id, params.runtime.conversation_id, ); - - if (targetRuntime.cancelRequested && !targetRuntime.isProcessing) { - targetRuntime.cancelRequested = false; - deps.scheduleQueuePump( - targetRuntime, - params.socket, - params.opts as StartListenerOptions, - params.processQueuedTurn, - ); - return false; - } - if ( await deps.resolveRecoveredApprovalResponse( targetRuntime, @@ -434,9 +422,11 @@ async function handleChangeDeviceStateInput( const shouldTrackCommand = !scopedRuntime.isProcessing && resolvedDeps.getPendingControlRequestCount(listener, scope) === 0; + if (shouldTrackCommand) { resolvedDeps.setLoopStatus(scopedRuntime, "EXECUTING_COMMAND", scope); } + try { if (params.command.payload.mode) { resolvedDeps.handleModeChange( @@ -446,6 +436,7 @@ async function handleChangeDeviceStateInput( scope, ); } + if (params.command.payload.cwd) { await resolvedDeps.handleCwdChange( { @@ -958,9 +949,11 @@ async function connectWithRetry( ...scopedRuntime.activeExecutingToolCallIds, ]; } - const activeAbortController = scopedRuntime.activeAbortController; - if (activeAbortController && !activeAbortController.signal.aborted) { - activeAbortController.abort(); + if ( + scopedRuntime.activeAbortController && + !scopedRuntime.activeAbortController.signal.aborted + ) { + scopedRuntime.activeAbortController.abort(); } const recoveredApprovalState = getRecoveredApprovalStateForScope( runtime, @@ -984,10 +977,6 @@ async function connectWithRetry( }); } - if (!hasActiveTurn) { - scopedRuntime.cancelRequested = false; - } - // Backend cancel parity with TUI (App.tsx:5932-5941). // Fire-and-forget — local cancel + queued results are the primary mechanism. const cancelConversationId = scopedRuntime.conversationId; diff --git a/src/websocket/listener/queue.ts b/src/websocket/listener/queue.ts index 4699891..cbcc50b 100644 --- a/src/websocket/listener/queue.ts +++ b/src/websocket/listener/queue.ts @@ -250,6 +250,10 @@ function buildQueuedTurnMessage( }; } +export function shouldQueueInboundMessage(parsed: IncomingMessage): boolean { + return parsed.messages.some((payload) => "content" in payload); +} + export function consumeQueuedTurn(runtime: ConversationRuntime): { dequeuedBatch: DequeuedBatch; queuedTurn: IncomingMessage; @@ -275,10 +279,7 @@ export function consumeQueuedTurn(runtime: ConversationRuntime): { } } - if (!hasMessage) { - return null; - } - if (queueLen === 0) { + if (!hasMessage || queueLen === 0) { return null; } @@ -298,10 +299,6 @@ export function consumeQueuedTurn(runtime: ConversationRuntime): { }; } -export function shouldQueueInboundMessage(parsed: IncomingMessage): boolean { - return parsed.messages.some((payload) => "content" in payload); -} - function computeListenerQueueBlockedReason( runtime: ConversationRuntime, ): QueueBlockedReason | null { diff --git a/src/websocket/listener/runtime.ts b/src/websocket/listener/runtime.ts index 1dadb44..ba04eb1 100644 --- a/src/websocket/listener/runtime.ts +++ b/src/websocket/listener/runtime.ts @@ -225,9 +225,11 @@ export function clearConversationRuntimeState( runtime: ConversationRuntime, ): void { runtime.cancelRequested = true; - const activeAbortController = runtime.activeAbortController; - if (activeAbortController && !activeAbortController.signal.aborted) { - activeAbortController.abort(); + if ( + runtime.activeAbortController && + !runtime.activeAbortController.signal.aborted + ) { + runtime.activeAbortController.abort(); } runtime.pendingApprovalBatchByToolCallId.clear(); runtime.pendingInterruptedResults = null; diff --git a/src/websocket/listener/turn-approval.ts b/src/websocket/listener/turn-approval.ts index 324f02b..586a843 100644 --- a/src/websocket/listener/turn-approval.ts +++ b/src/websocket/listener/turn-approval.ts @@ -95,7 +95,6 @@ export async function handleApprovalStop(params: { currentInput: Array; pendingNormalizationInterruptedToolCallIds: string[]; turnToolContextId: string | null; - abortSignal: AbortSignal; buildSendOptions: () => Parameters< typeof sendApprovalContinuationWithRetry >[2]; @@ -113,9 +112,13 @@ export async function handleApprovalStop(params: { msgRunIds, currentInput, turnToolContextId, - abortSignal, buildSendOptions, } = params; + const abortController = runtime.activeAbortController; + + if (!abortController) { + throw new Error("Missing active abort controller during approval handling"); + } if (approvals.length === 0) { runtime.lastStopReason = "error"; @@ -280,7 +283,7 @@ export async function handleApprovalStop(params: { const executionResults = await executeApprovalBatch(decisions, undefined, { toolContextId: turnToolContextId ?? undefined, - abortSignal, + abortSignal: abortController.signal, workingDirectory: turnWorkingDirectory, }); const persistedExecutionResults = normalizeExecutionResultsForInterruptParity( @@ -328,6 +331,7 @@ export async function handleApprovalStop(params: { nextInput.push(...queuedTurn.messages); emitDequeuedUserMessage(socket, runtime, queuedTurn, dequeuedBatch); } + setLoopStatus(runtime, "SENDING_API_REQUEST", { agent_id: agentId, conversation_id: conversationId, @@ -338,7 +342,7 @@ export async function handleApprovalStop(params: { buildSendOptions(), socket, runtime, - abortSignal, + abortController.signal, ); if (!stream) { return { diff --git a/src/websocket/listener/turn.ts b/src/websocket/listener/turn.ts index 31a09ec..3530e61 100644 --- a/src/websocket/listener/turn.ts +++ b/src/websocket/listener/turn.ts @@ -122,9 +122,7 @@ export async function handleIncomingMessage( runtime.isProcessing = true; runtime.cancelRequested = false; - const turnAbortController = new AbortController(); - runtime.activeAbortController = turnAbortController; - const turnAbortSignal = turnAbortController.signal; + runtime.activeAbortController = new AbortController(); runtime.activeWorkingDirectory = turnWorkingDirectory; runtime.activeRunId = null; runtime.activeRunStartedAt = new Date().toISOString(); @@ -242,7 +240,7 @@ export async function handleIncomingMessage( buildSendOptions(), socket, runtime, - turnAbortSignal, + runtime.activeAbortController.signal, ) : await sendMessageStreamWithRetry( conversationId, @@ -250,7 +248,7 @@ export async function handleIncomingMessage( buildSendOptions(), socket, runtime, - turnAbortSignal, + runtime.activeAbortController.signal, ); if (!stream) { return; @@ -277,7 +275,7 @@ export async function handleIncomingMessage( stream as Stream, buffers, () => {}, - turnAbortSignal, + runtime.activeAbortController.signal, undefined, ({ chunk, shouldOutput, errorInfo }) => { const maybeRunId = (chunk as { run_id?: unknown }).run_id; @@ -430,7 +428,7 @@ export async function handleIncomingMessage( buildSendOptions(), socket, runtime, - turnAbortSignal, + runtime.activeAbortController.signal, ) : await sendMessageStreamWithRetry( conversationId, @@ -438,7 +436,7 @@ export async function handleIncomingMessage( buildSendOptions(), socket, runtime, - turnAbortSignal, + runtime.activeAbortController.signal, ); if (!stream) { return; @@ -494,7 +492,7 @@ export async function handleIncomingMessage( }); await new Promise((resolve) => setTimeout(resolve, delayMs)); - if (turnAbortSignal.aborted) { + if (runtime.activeAbortController.signal.aborted) { throw new Error("Cancelled by user"); } @@ -513,7 +511,7 @@ export async function handleIncomingMessage( buildSendOptions(), socket, runtime, - turnAbortSignal, + runtime.activeAbortController.signal, ) : await sendMessageStreamWithRetry( conversationId, @@ -521,7 +519,7 @@ export async function handleIncomingMessage( buildSendOptions(), socket, runtime, - turnAbortSignal, + runtime.activeAbortController.signal, ); if (!stream) { return; @@ -565,7 +563,7 @@ export async function handleIncomingMessage( }); await new Promise((resolve) => setTimeout(resolve, delayMs)); - if (turnAbortSignal.aborted) { + if (runtime.activeAbortController.signal.aborted) { throw new Error("Cancelled by user"); } @@ -584,7 +582,7 @@ export async function handleIncomingMessage( buildSendOptions(), socket, runtime, - turnAbortSignal, + runtime.activeAbortController.signal, ) : await sendMessageStreamWithRetry( conversationId, @@ -592,7 +590,7 @@ export async function handleIncomingMessage( buildSendOptions(), socket, runtime, - turnAbortSignal, + runtime.activeAbortController.signal, ); if (!stream) { return; @@ -674,7 +672,6 @@ export async function handleIncomingMessage( currentInput, pendingNormalizationInterruptedToolCallIds, turnToolContextId, - abortSignal: turnAbortSignal, buildSendOptions, }); if (approvalResult.terminated || !approvalResult.stream) {