From b8d6e199e4a1d62c05fb5d5c7f4dde67e71bc109 Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Mon, 23 Mar 2026 17:25:17 -0700 Subject: [PATCH] fix(listener): harden interrupt completion and approval recovery (#1492) Co-authored-by: Letta Code --- src/agent/memoryGit.ts | 1 + src/tests/shell-codex.test.ts | 21 +++-- .../listen-client-concurrency.test.ts | 93 ++++++++++++++++--- .../websocket/listen-client-protocol.test.ts | 86 ++++++++++++++++- src/websocket/helpers/listenerQueueAdapter.ts | 9 +- src/websocket/listener/client.ts | 27 ++++-- src/websocket/listener/queue.ts | 13 ++- src/websocket/listener/runtime.ts | 8 +- src/websocket/listener/turn-approval.ts | 12 +-- src/websocket/listener/turn.ts | 27 +++--- 10 files changed, 236 insertions(+), 61 deletions(-) diff --git a/src/agent/memoryGit.ts b/src/agent/memoryGit.ts index 14f7504..03025e9 100644 --- a/src/agent/memoryGit.ts +++ b/src/agent/memoryGit.ts @@ -185,6 +185,7 @@ get_fm_value() { # Skip skill SKILL.md files — they use a different frontmatter format. for file in $(git diff --cached --name-only --diff-filter=ACM | grep -E '^(memory/)?(system|reference)/.*\\.md$'); do staged=$(git show ":$file") + staged=$(printf '%s' "$staged" | tr -d '\r') # Frontmatter is required first_line=$(echo "$staged" | head -1) diff --git a/src/tests/shell-codex.test.ts b/src/tests/shell-codex.test.ts index f83c8eb..652ff60 100644 --- a/src/tests/shell-codex.test.ts +++ b/src/tests/shell-codex.test.ts @@ -6,6 +6,13 @@ import { shell } from "../tools/impl/Shell.js"; const isWindows = process.platform === "win32"; +function getEchoCommand(...args: string[]): string[] { + if (isWindows) { + return ["cmd.exe", "/c", "echo", ...args]; + } + return ["/usr/bin/env", "echo", ...args]; +} + describe("shell codex tool", () => { let tempDir: string; @@ -18,11 +25,13 @@ describe("shell codex tool", () => { test("executes simple command with execvp-style args", async () => { const result = await shell({ - command: ["echo", "hello", "world"], + command: getEchoCommand("hello", "world"), }); - expect(result.output).toBe("hello world"); - expect(result.stdout).toContain("hello world"); + expect(result.output.replaceAll('"', "")).toBe("hello world"); + expect(result.stdout.join(" ").replaceAll('"', "")).toContain( + "hello world", + ); expect(result.stderr.length).toBe(0); }); @@ -54,10 +63,10 @@ describe("shell codex tool", () => { // This is the key test for execvp semantics - args with spaces // should NOT be split const result = await shell({ - command: ["echo", "hello world", "foo bar"], + command: getEchoCommand("hello world", "foo bar"), }); - expect(result.output).toBe("hello world foo bar"); + expect(result.output.replaceAll('"', "")).toBe("hello world foo bar"); }); test.skipIf(isWindows)("respects workdir parameter", async () => { @@ -180,7 +189,7 @@ describe("shell codex tool", () => { test("handles special characters in arguments", async () => { const result = await shell({ - command: ["echo", "$HOME", "$(whoami)", "`date`"], + command: getEchoCommand("$HOME", "$(whoami)", "`date`"), }); // Since we're using execvp-style (not shell expansion), diff --git a/src/tests/websocket/listen-client-concurrency.test.ts b/src/tests/websocket/listen-client-concurrency.test.ts index a323419..76e6add 100644 --- a/src/tests/websocket/listen-client-concurrency.test.ts +++ b/src/tests/websocket/listen-client-concurrency.test.ts @@ -1,6 +1,5 @@ import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"; import WebSocket from "ws"; -import type { ResumeData } from "../../agent/check-approval"; import { permissionMode } from "../../permissions/mode"; import type { MessageQueueItem, @@ -68,13 +67,6 @@ const getClientMock = mock(async () => ({ cancel: cancelConversationMock, }, })); -const getResumeDataMock = mock( - async (): Promise => ({ - pendingApproval: null, - pendingApprovals: [], - messageHistory: [], - }), -); const classifyApprovalsMock = mock(async () => ({ autoAllowed: [], autoDenied: [], @@ -203,7 +195,6 @@ describe("listen-client multi-worker concurrency", () => { drainStreamWithResumeMock.mockClear(); getClientMock.mockClear(); retrieveAgentMock.mockClear(); - getResumeDataMock.mockClear(); classifyApprovalsMock.mockClear(); executeApprovalBatchMock.mockClear(); cancelConversationMock.mockClear(); @@ -706,11 +697,6 @@ describe("listen-client multi-worker concurrency", () => { status: "success", }; - getResumeDataMock.mockResolvedValueOnce({ - pendingApproval: approval, - pendingApprovals: [approval], - messageHistory: [], - }); classifyApprovalsMock.mockResolvedValueOnce({ autoAllowed: [ { @@ -757,7 +743,13 @@ describe("listen-client multi-worker concurrency", () => { runtime, socket as unknown as WebSocket, new AbortController().signal, - { getResumeData: getResumeDataMock }, + { + getResumeData: async () => ({ + pendingApproval: approval, + pendingApprovals: [approval], + messageHistory: [], + }), + }, ); await waitFor(() => sendMessageStreamMock.mock.calls.length === 1); @@ -862,6 +854,77 @@ describe("listen-client multi-worker concurrency", () => { expect(listener.conversationRuntimes.has(runtimeA.key)).toBe(true); }); + test("stale approval response after approval-only interrupt unlatches cancelRequested and allows queue pump", async () => { + const listener = __listenClientTestUtils.createListenerRuntime(); + __listenClientTestUtils.setActiveRuntime(listener); + const runtime = __listenClientTestUtils.getOrCreateScopedRuntime( + listener, + "agent-1", + "conv-a", + ); + const socket = new MockSocket(); + + runtime.cancelRequested = true; + runtime.isProcessing = false; + runtime.loopStatus = "WAITING_ON_INPUT"; + + const queueInput = { + kind: "message", + source: "user", + content: "queued after stale approval", + clientMessageId: "cm-stale-approval", + agentId: "agent-1", + conversationId: "conv-a", + } satisfies Omit; + const item = runtime.queueRuntime.enqueue(queueInput); + if (!item) { + throw new Error("Expected queued item to be created"); + } + runtime.queuedMessagesByItemId.set( + item.id, + makeIncomingMessage("agent-1", "conv-a", "queued after stale approval"), + ); + + const scheduleQueuePumpMock = mock(() => { + __listenClientTestUtils.scheduleQueuePump( + runtime, + socket as unknown as WebSocket, + {} as never, + async () => {}, + ); + }); + + const handled = await __listenClientTestUtils.handleApprovalResponseInput( + listener, + { + runtime: { agent_id: "agent-1", conversation_id: "conv-a" }, + response: { + request_id: "perm-stale-after-approval-only-interrupt", + decision: { behavior: "allow" }, + }, + socket: socket as unknown as WebSocket, + opts: { + onStatusChange: undefined, + connectionId: "conn-1", + }, + processQueuedTurn: async () => {}, + }, + { + resolveRuntimeForApprovalRequest: () => null, + resolvePendingApprovalResolver: () => false, + getOrCreateScopedRuntime: () => runtime, + resolveRecoveredApprovalResponse: async () => false, + scheduleQueuePump: scheduleQueuePumpMock, + }, + ); + + expect(handled).toBe(false); + expect(runtime.cancelRequested).toBe(false); + expect(scheduleQueuePumpMock).toHaveBeenCalledTimes(1); + await waitFor(() => runtime.queuePumpScheduled === false); + expect(runtime.queueRuntime.length).toBe(0); + }); + test("change_device_state command holds queued input until the tracked command completes", async () => { const listener = __listenClientTestUtils.createListenerRuntime(); __listenClientTestUtils.setActiveRuntime(listener); diff --git a/src/tests/websocket/listen-client-protocol.test.ts b/src/tests/websocket/listen-client-protocol.test.ts index d81d3df..ef06a17 100644 --- a/src/tests/websocket/listen-client-protocol.test.ts +++ b/src/tests/websocket/listen-client-protocol.test.ts @@ -277,6 +277,35 @@ describe("listen-client approval resolver wiring", () => { expect(runtime.pendingApprovalResolvers.size).toBe(0); }); + test("resolving final approval response restores WAITING_ON_INPUT even while processing stays true", async () => { + const runtime = __listenClientTestUtils.createRuntime(); + const socket = new MockSocket(WebSocket.OPEN); + const requestId = "perm-processing"; + + runtime.isProcessing = true; + runtime.loopStatus = "WAITING_ON_APPROVAL" as never; + + const pending = requestApprovalOverWS( + runtime, + socket as unknown as WebSocket, + requestId, + makeControlRequest(requestId), + ); + + const resolved = resolvePendingApprovalResolver( + runtime, + makeSuccessResponse(requestId), + ); + + expect(resolved).toBe(true); + await expect(pending).resolves.toMatchObject({ + request_id: requestId, + decision: { behavior: "allow" }, + }); + expect(String(runtime.loopStatus)).toBe("WAITING_ON_INPUT"); + expect(runtime.isProcessing).toBe(true); + }); + test("ignores non-matching request_id and keeps pending resolver", async () => { const runtime = __listenClientTestUtils.createRuntime(); const socket = new MockSocket(WebSocket.OPEN); @@ -329,8 +358,9 @@ describe("listen-client approval resolver wiring", () => { await expect(second).rejects.toThrow("socket closed"); }); - test("cleanup resets WAITING_ON_INPUT instead of restoring fake processing", async () => { + test("cleanup resets loop status to WAITING_ON_INPUT even while processing stays true", async () => { const runtime = __listenClientTestUtils.createRuntime(); + runtime.isProcessing = true; runtime.loopStatus = "WAITING_ON_APPROVAL"; @@ -341,6 +371,7 @@ describe("listen-client approval resolver wiring", () => { rejectPendingApprovalResolvers(runtime, "socket closed"); expect(runtime.loopStatus as string).toBe("WAITING_ON_INPUT"); + expect(runtime.isProcessing).toBe(true); await expect(pending).rejects.toThrow("socket closed"); }); @@ -1302,6 +1333,59 @@ describe("listen-client capability-gated approval flow", () => { expect.any(Function), ); }); + + test("stale approval responses after interrupt are benign and do not mutate runtime state", async () => { + const listener = __listenClientTestUtils.createListenerRuntime(); + const targetRuntime = + __listenClientTestUtils.getOrCreateConversationRuntime( + listener, + "agent-1", + "default", + ); + targetRuntime.cancelRequested = true; + targetRuntime.loopStatus = "WAITING_ON_INPUT"; + targetRuntime.isProcessing = false; + + const socket = new MockSocket(WebSocket.OPEN); + const scheduleQueuePumpMock = mock(() => {}); + const resolveRecoveredApprovalResponseMock = mock(async () => false); + + const handled = await __listenClientTestUtils.handleApprovalResponseInput( + listener, + { + runtime: { agent_id: "agent-1", conversation_id: "default" }, + response: { + request_id: "perm-stale-after-interrupt", + decision: { behavior: "allow" }, + }, + socket: socket as unknown as WebSocket, + opts: { + onStatusChange: undefined, + connectionId: "conn-1", + }, + processQueuedTurn: async () => {}, + }, + { + resolveRuntimeForApprovalRequest: () => null, + resolvePendingApprovalResolver: () => false, + getOrCreateScopedRuntime: () => targetRuntime, + resolveRecoveredApprovalResponse: resolveRecoveredApprovalResponseMock, + scheduleQueuePump: scheduleQueuePumpMock, + }, + ); + + expect(handled).toBe(false); + expect(resolveRecoveredApprovalResponseMock).not.toHaveBeenCalled(); + expect(scheduleQueuePumpMock).toHaveBeenCalledWith( + targetRuntime, + socket, + expect.objectContaining({ connectionId: "conn-1" }), + expect.any(Function), + ); + expect(targetRuntime.cancelRequested).toBe(false); + expect(targetRuntime.loopStatus).toBe("WAITING_ON_INPUT"); + expect(targetRuntime.isProcessing).toBe(false); + }); }); describe("listen-client approval recovery batch correlation", () => { diff --git a/src/websocket/helpers/listenerQueueAdapter.ts b/src/websocket/helpers/listenerQueueAdapter.ts index 6b382e8..a281c60 100644 --- a/src/websocket/helpers/listenerQueueAdapter.ts +++ b/src/websocket/helpers/listenerQueueAdapter.ts @@ -12,7 +12,14 @@ export type ListenerQueueGatingConditions = { export function getListenerBlockedReason( c: ListenerQueueGatingConditions, ): QueueBlockedReason | null { - if (c.cancelRequested) return "interrupt_in_progress"; + if ( + c.cancelRequested && + (c.isProcessing || + c.isRecoveringApprovals || + c.loopStatus !== "WAITING_ON_INPUT") + ) { + return "interrupt_in_progress"; + } if (c.pendingApprovalsLen > 0) return "pending_approvals"; if (c.isRecoveringApprovals) return "runtime_busy"; if (c.loopStatus === "WAITING_ON_APPROVAL") return "pending_approvals"; diff --git a/src/websocket/listener/client.ts b/src/websocket/listener/client.ts index f905fa5..3e8fcfd 100644 --- a/src/websocket/listener/client.ts +++ b/src/websocket/listener/client.ts @@ -339,6 +339,18 @@ async function handleApprovalResponseInput( params.runtime.agent_id, params.runtime.conversation_id, ); + + if (targetRuntime.cancelRequested && !targetRuntime.isProcessing) { + targetRuntime.cancelRequested = false; + deps.scheduleQueuePump( + targetRuntime, + params.socket, + params.opts as StartListenerOptions, + params.processQueuedTurn, + ); + return false; + } + if ( await deps.resolveRecoveredApprovalResponse( targetRuntime, @@ -422,11 +434,9 @@ async function handleChangeDeviceStateInput( const shouldTrackCommand = !scopedRuntime.isProcessing && resolvedDeps.getPendingControlRequestCount(listener, scope) === 0; - if (shouldTrackCommand) { resolvedDeps.setLoopStatus(scopedRuntime, "EXECUTING_COMMAND", scope); } - try { if (params.command.payload.mode) { resolvedDeps.handleModeChange( @@ -436,7 +446,6 @@ async function handleChangeDeviceStateInput( scope, ); } - if (params.command.payload.cwd) { await resolvedDeps.handleCwdChange( { @@ -949,11 +958,9 @@ async function connectWithRetry( ...scopedRuntime.activeExecutingToolCallIds, ]; } - if ( - scopedRuntime.activeAbortController && - !scopedRuntime.activeAbortController.signal.aborted - ) { - scopedRuntime.activeAbortController.abort(); + const activeAbortController = scopedRuntime.activeAbortController; + if (activeAbortController && !activeAbortController.signal.aborted) { + activeAbortController.abort(); } const recoveredApprovalState = getRecoveredApprovalStateForScope( runtime, @@ -977,6 +984,10 @@ async function connectWithRetry( }); } + if (!hasActiveTurn) { + scopedRuntime.cancelRequested = false; + } + // Backend cancel parity with TUI (App.tsx:5932-5941). // Fire-and-forget — local cancel + queued results are the primary mechanism. const cancelConversationId = scopedRuntime.conversationId; diff --git a/src/websocket/listener/queue.ts b/src/websocket/listener/queue.ts index cbcc50b..4699891 100644 --- a/src/websocket/listener/queue.ts +++ b/src/websocket/listener/queue.ts @@ -250,10 +250,6 @@ function buildQueuedTurnMessage( }; } -export function shouldQueueInboundMessage(parsed: IncomingMessage): boolean { - return parsed.messages.some((payload) => "content" in payload); -} - export function consumeQueuedTurn(runtime: ConversationRuntime): { dequeuedBatch: DequeuedBatch; queuedTurn: IncomingMessage; @@ -279,7 +275,10 @@ export function consumeQueuedTurn(runtime: ConversationRuntime): { } } - if (!hasMessage || queueLen === 0) { + if (!hasMessage) { + return null; + } + if (queueLen === 0) { return null; } @@ -299,6 +298,10 @@ export function consumeQueuedTurn(runtime: ConversationRuntime): { }; } +export function shouldQueueInboundMessage(parsed: IncomingMessage): boolean { + return parsed.messages.some((payload) => "content" in payload); +} + function computeListenerQueueBlockedReason( runtime: ConversationRuntime, ): QueueBlockedReason | null { diff --git a/src/websocket/listener/runtime.ts b/src/websocket/listener/runtime.ts index ba04eb1..1dadb44 100644 --- a/src/websocket/listener/runtime.ts +++ b/src/websocket/listener/runtime.ts @@ -225,11 +225,9 @@ export function clearConversationRuntimeState( runtime: ConversationRuntime, ): void { runtime.cancelRequested = true; - if ( - runtime.activeAbortController && - !runtime.activeAbortController.signal.aborted - ) { - runtime.activeAbortController.abort(); + const activeAbortController = runtime.activeAbortController; + if (activeAbortController && !activeAbortController.signal.aborted) { + activeAbortController.abort(); } runtime.pendingApprovalBatchByToolCallId.clear(); runtime.pendingInterruptedResults = null; diff --git a/src/websocket/listener/turn-approval.ts b/src/websocket/listener/turn-approval.ts index 586a843..324f02b 100644 --- a/src/websocket/listener/turn-approval.ts +++ b/src/websocket/listener/turn-approval.ts @@ -95,6 +95,7 @@ export async function handleApprovalStop(params: { currentInput: Array; pendingNormalizationInterruptedToolCallIds: string[]; turnToolContextId: string | null; + abortSignal: AbortSignal; buildSendOptions: () => Parameters< typeof sendApprovalContinuationWithRetry >[2]; @@ -112,13 +113,9 @@ export async function handleApprovalStop(params: { msgRunIds, currentInput, turnToolContextId, + abortSignal, buildSendOptions, } = params; - const abortController = runtime.activeAbortController; - - if (!abortController) { - throw new Error("Missing active abort controller during approval handling"); - } if (approvals.length === 0) { runtime.lastStopReason = "error"; @@ -283,7 +280,7 @@ export async function handleApprovalStop(params: { const executionResults = await executeApprovalBatch(decisions, undefined, { toolContextId: turnToolContextId ?? undefined, - abortSignal: abortController.signal, + abortSignal, workingDirectory: turnWorkingDirectory, }); const persistedExecutionResults = normalizeExecutionResultsForInterruptParity( @@ -331,7 +328,6 @@ export async function handleApprovalStop(params: { nextInput.push(...queuedTurn.messages); emitDequeuedUserMessage(socket, runtime, queuedTurn, dequeuedBatch); } - setLoopStatus(runtime, "SENDING_API_REQUEST", { agent_id: agentId, conversation_id: conversationId, @@ -342,7 +338,7 @@ export async function handleApprovalStop(params: { buildSendOptions(), socket, runtime, - abortController.signal, + abortSignal, ); if (!stream) { return { diff --git a/src/websocket/listener/turn.ts b/src/websocket/listener/turn.ts index 3530e61..31a09ec 100644 --- a/src/websocket/listener/turn.ts +++ b/src/websocket/listener/turn.ts @@ -122,7 +122,9 @@ export async function handleIncomingMessage( runtime.isProcessing = true; runtime.cancelRequested = false; - runtime.activeAbortController = new AbortController(); + const turnAbortController = new AbortController(); + runtime.activeAbortController = turnAbortController; + const turnAbortSignal = turnAbortController.signal; runtime.activeWorkingDirectory = turnWorkingDirectory; runtime.activeRunId = null; runtime.activeRunStartedAt = new Date().toISOString(); @@ -240,7 +242,7 @@ export async function handleIncomingMessage( buildSendOptions(), socket, runtime, - runtime.activeAbortController.signal, + turnAbortSignal, ) : await sendMessageStreamWithRetry( conversationId, @@ -248,7 +250,7 @@ export async function handleIncomingMessage( buildSendOptions(), socket, runtime, - runtime.activeAbortController.signal, + turnAbortSignal, ); if (!stream) { return; @@ -275,7 +277,7 @@ export async function handleIncomingMessage( stream as Stream, buffers, () => {}, - runtime.activeAbortController.signal, + turnAbortSignal, undefined, ({ chunk, shouldOutput, errorInfo }) => { const maybeRunId = (chunk as { run_id?: unknown }).run_id; @@ -428,7 +430,7 @@ export async function handleIncomingMessage( buildSendOptions(), socket, runtime, - runtime.activeAbortController.signal, + turnAbortSignal, ) : await sendMessageStreamWithRetry( conversationId, @@ -436,7 +438,7 @@ export async function handleIncomingMessage( buildSendOptions(), socket, runtime, - runtime.activeAbortController.signal, + turnAbortSignal, ); if (!stream) { return; @@ -492,7 +494,7 @@ export async function handleIncomingMessage( }); await new Promise((resolve) => setTimeout(resolve, delayMs)); - if (runtime.activeAbortController.signal.aborted) { + if (turnAbortSignal.aborted) { throw new Error("Cancelled by user"); } @@ -511,7 +513,7 @@ export async function handleIncomingMessage( buildSendOptions(), socket, runtime, - runtime.activeAbortController.signal, + turnAbortSignal, ) : await sendMessageStreamWithRetry( conversationId, @@ -519,7 +521,7 @@ export async function handleIncomingMessage( buildSendOptions(), socket, runtime, - runtime.activeAbortController.signal, + turnAbortSignal, ); if (!stream) { return; @@ -563,7 +565,7 @@ export async function handleIncomingMessage( }); await new Promise((resolve) => setTimeout(resolve, delayMs)); - if (runtime.activeAbortController.signal.aborted) { + if (turnAbortSignal.aborted) { throw new Error("Cancelled by user"); } @@ -582,7 +584,7 @@ export async function handleIncomingMessage( buildSendOptions(), socket, runtime, - runtime.activeAbortController.signal, + turnAbortSignal, ) : await sendMessageStreamWithRetry( conversationId, @@ -590,7 +592,7 @@ export async function handleIncomingMessage( buildSendOptions(), socket, runtime, - runtime.activeAbortController.signal, + turnAbortSignal, ); if (!stream) { return; @@ -672,6 +674,7 @@ export async function handleIncomingMessage( currentInput, pendingNormalizationInterruptedToolCallIds, turnToolContextId, + abortSignal: turnAbortSignal, buildSendOptions, }); if (approvalResult.terminated || !approvalResult.stream) {