fix(listener): harden interrupt completion and approval recovery (#1492)

Co-authored-by: Letta Code <noreply@letta.com>
This commit is contained in:
Charles Packer
2026-03-23 17:25:17 -07:00
committed by GitHub
parent 5f30588b7a
commit b8d6e199e4
10 changed files with 236 additions and 61 deletions

View File

@@ -185,6 +185,7 @@ get_fm_value() {
# Skip skill SKILL.md files — they use a different frontmatter format. # Skip skill SKILL.md files — they use a different frontmatter format.
for file in $(git diff --cached --name-only --diff-filter=ACM | grep -E '^(memory/)?(system|reference)/.*\\.md$'); do for file in $(git diff --cached --name-only --diff-filter=ACM | grep -E '^(memory/)?(system|reference)/.*\\.md$'); do
staged=$(git show ":$file") staged=$(git show ":$file")
staged=$(printf '%s' "$staged" | tr -d '\r')
# Frontmatter is required # Frontmatter is required
first_line=$(echo "$staged" | head -1) first_line=$(echo "$staged" | head -1)

View File

@@ -6,6 +6,13 @@ import { shell } from "../tools/impl/Shell.js";
const isWindows = process.platform === "win32"; const isWindows = process.platform === "win32";
function getEchoCommand(...args: string[]): string[] {
if (isWindows) {
return ["cmd.exe", "/c", "echo", ...args];
}
return ["/usr/bin/env", "echo", ...args];
}
describe("shell codex tool", () => { describe("shell codex tool", () => {
let tempDir: string; let tempDir: string;
@@ -18,11 +25,13 @@ describe("shell codex tool", () => {
test("executes simple command with execvp-style args", async () => { test("executes simple command with execvp-style args", async () => {
const result = await shell({ const result = await shell({
command: ["echo", "hello", "world"], command: getEchoCommand("hello", "world"),
}); });
expect(result.output).toBe("hello world"); expect(result.output.replaceAll('"', "")).toBe("hello world");
expect(result.stdout).toContain("hello world"); expect(result.stdout.join(" ").replaceAll('"', "")).toContain(
"hello world",
);
expect(result.stderr.length).toBe(0); expect(result.stderr.length).toBe(0);
}); });
@@ -54,10 +63,10 @@ describe("shell codex tool", () => {
// This is the key test for execvp semantics - args with spaces // This is the key test for execvp semantics - args with spaces
// should NOT be split // should NOT be split
const result = await shell({ const result = await shell({
command: ["echo", "hello world", "foo bar"], command: getEchoCommand("hello world", "foo bar"),
}); });
expect(result.output).toBe("hello world foo bar"); expect(result.output.replaceAll('"', "")).toBe("hello world foo bar");
}); });
test.skipIf(isWindows)("respects workdir parameter", async () => { test.skipIf(isWindows)("respects workdir parameter", async () => {
@@ -180,7 +189,7 @@ describe("shell codex tool", () => {
test("handles special characters in arguments", async () => { test("handles special characters in arguments", async () => {
const result = await shell({ const result = await shell({
command: ["echo", "$HOME", "$(whoami)", "`date`"], command: getEchoCommand("$HOME", "$(whoami)", "`date`"),
}); });
// Since we're using execvp-style (not shell expansion), // Since we're using execvp-style (not shell expansion),

View File

@@ -1,6 +1,5 @@
import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"; import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test";
import WebSocket from "ws"; import WebSocket from "ws";
import type { ResumeData } from "../../agent/check-approval";
import { permissionMode } from "../../permissions/mode"; import { permissionMode } from "../../permissions/mode";
import type { import type {
MessageQueueItem, MessageQueueItem,
@@ -68,13 +67,6 @@ const getClientMock = mock(async () => ({
cancel: cancelConversationMock, cancel: cancelConversationMock,
}, },
})); }));
const getResumeDataMock = mock(
async (): Promise<ResumeData> => ({
pendingApproval: null,
pendingApprovals: [],
messageHistory: [],
}),
);
const classifyApprovalsMock = mock(async () => ({ const classifyApprovalsMock = mock(async () => ({
autoAllowed: [], autoAllowed: [],
autoDenied: [], autoDenied: [],
@@ -203,7 +195,6 @@ describe("listen-client multi-worker concurrency", () => {
drainStreamWithResumeMock.mockClear(); drainStreamWithResumeMock.mockClear();
getClientMock.mockClear(); getClientMock.mockClear();
retrieveAgentMock.mockClear(); retrieveAgentMock.mockClear();
getResumeDataMock.mockClear();
classifyApprovalsMock.mockClear(); classifyApprovalsMock.mockClear();
executeApprovalBatchMock.mockClear(); executeApprovalBatchMock.mockClear();
cancelConversationMock.mockClear(); cancelConversationMock.mockClear();
@@ -706,11 +697,6 @@ describe("listen-client multi-worker concurrency", () => {
status: "success", status: "success",
}; };
getResumeDataMock.mockResolvedValueOnce({
pendingApproval: approval,
pendingApprovals: [approval],
messageHistory: [],
});
classifyApprovalsMock.mockResolvedValueOnce({ classifyApprovalsMock.mockResolvedValueOnce({
autoAllowed: [ autoAllowed: [
{ {
@@ -757,7 +743,13 @@ describe("listen-client multi-worker concurrency", () => {
runtime, runtime,
socket as unknown as WebSocket, socket as unknown as WebSocket,
new AbortController().signal, new AbortController().signal,
{ getResumeData: getResumeDataMock }, {
getResumeData: async () => ({
pendingApproval: approval,
pendingApprovals: [approval],
messageHistory: [],
}),
},
); );
await waitFor(() => sendMessageStreamMock.mock.calls.length === 1); await waitFor(() => sendMessageStreamMock.mock.calls.length === 1);
@@ -862,6 +854,77 @@ describe("listen-client multi-worker concurrency", () => {
expect(listener.conversationRuntimes.has(runtimeA.key)).toBe(true); expect(listener.conversationRuntimes.has(runtimeA.key)).toBe(true);
}); });
test("stale approval response after approval-only interrupt unlatches cancelRequested and allows queue pump", async () => {
const listener = __listenClientTestUtils.createListenerRuntime();
__listenClientTestUtils.setActiveRuntime(listener);
const runtime = __listenClientTestUtils.getOrCreateScopedRuntime(
listener,
"agent-1",
"conv-a",
);
const socket = new MockSocket();
runtime.cancelRequested = true;
runtime.isProcessing = false;
runtime.loopStatus = "WAITING_ON_INPUT";
const queueInput = {
kind: "message",
source: "user",
content: "queued after stale approval",
clientMessageId: "cm-stale-approval",
agentId: "agent-1",
conversationId: "conv-a",
} satisfies Omit<MessageQueueItem, "id" | "enqueuedAt">;
const item = runtime.queueRuntime.enqueue(queueInput);
if (!item) {
throw new Error("Expected queued item to be created");
}
runtime.queuedMessagesByItemId.set(
item.id,
makeIncomingMessage("agent-1", "conv-a", "queued after stale approval"),
);
const scheduleQueuePumpMock = mock(() => {
__listenClientTestUtils.scheduleQueuePump(
runtime,
socket as unknown as WebSocket,
{} as never,
async () => {},
);
});
const handled = await __listenClientTestUtils.handleApprovalResponseInput(
listener,
{
runtime: { agent_id: "agent-1", conversation_id: "conv-a" },
response: {
request_id: "perm-stale-after-approval-only-interrupt",
decision: { behavior: "allow" },
},
socket: socket as unknown as WebSocket,
opts: {
onStatusChange: undefined,
connectionId: "conn-1",
},
processQueuedTurn: async () => {},
},
{
resolveRuntimeForApprovalRequest: () => null,
resolvePendingApprovalResolver: () => false,
getOrCreateScopedRuntime: () => runtime,
resolveRecoveredApprovalResponse: async () => false,
scheduleQueuePump: scheduleQueuePumpMock,
},
);
expect(handled).toBe(false);
expect(runtime.cancelRequested).toBe(false);
expect(scheduleQueuePumpMock).toHaveBeenCalledTimes(1);
await waitFor(() => runtime.queuePumpScheduled === false);
expect(runtime.queueRuntime.length).toBe(0);
});
test("change_device_state command holds queued input until the tracked command completes", async () => { test("change_device_state command holds queued input until the tracked command completes", async () => {
const listener = __listenClientTestUtils.createListenerRuntime(); const listener = __listenClientTestUtils.createListenerRuntime();
__listenClientTestUtils.setActiveRuntime(listener); __listenClientTestUtils.setActiveRuntime(listener);

View File

@@ -277,6 +277,35 @@ describe("listen-client approval resolver wiring", () => {
expect(runtime.pendingApprovalResolvers.size).toBe(0); expect(runtime.pendingApprovalResolvers.size).toBe(0);
}); });
test("resolving final approval response restores WAITING_ON_INPUT even while processing stays true", async () => {
const runtime = __listenClientTestUtils.createRuntime();
const socket = new MockSocket(WebSocket.OPEN);
const requestId = "perm-processing";
runtime.isProcessing = true;
runtime.loopStatus = "WAITING_ON_APPROVAL" as never;
const pending = requestApprovalOverWS(
runtime,
socket as unknown as WebSocket,
requestId,
makeControlRequest(requestId),
);
const resolved = resolvePendingApprovalResolver(
runtime,
makeSuccessResponse(requestId),
);
expect(resolved).toBe(true);
await expect(pending).resolves.toMatchObject({
request_id: requestId,
decision: { behavior: "allow" },
});
expect(String(runtime.loopStatus)).toBe("WAITING_ON_INPUT");
expect(runtime.isProcessing).toBe(true);
});
test("ignores non-matching request_id and keeps pending resolver", async () => { test("ignores non-matching request_id and keeps pending resolver", async () => {
const runtime = __listenClientTestUtils.createRuntime(); const runtime = __listenClientTestUtils.createRuntime();
const socket = new MockSocket(WebSocket.OPEN); const socket = new MockSocket(WebSocket.OPEN);
@@ -329,8 +358,9 @@ describe("listen-client approval resolver wiring", () => {
await expect(second).rejects.toThrow("socket closed"); await expect(second).rejects.toThrow("socket closed");
}); });
test("cleanup resets WAITING_ON_INPUT instead of restoring fake processing", async () => { test("cleanup resets loop status to WAITING_ON_INPUT even while processing stays true", async () => {
const runtime = __listenClientTestUtils.createRuntime(); const runtime = __listenClientTestUtils.createRuntime();
runtime.isProcessing = true; runtime.isProcessing = true;
runtime.loopStatus = "WAITING_ON_APPROVAL"; runtime.loopStatus = "WAITING_ON_APPROVAL";
@@ -341,6 +371,7 @@ describe("listen-client approval resolver wiring", () => {
rejectPendingApprovalResolvers(runtime, "socket closed"); rejectPendingApprovalResolvers(runtime, "socket closed");
expect(runtime.loopStatus as string).toBe("WAITING_ON_INPUT"); expect(runtime.loopStatus as string).toBe("WAITING_ON_INPUT");
expect(runtime.isProcessing).toBe(true);
await expect(pending).rejects.toThrow("socket closed"); await expect(pending).rejects.toThrow("socket closed");
}); });
@@ -1302,6 +1333,59 @@ describe("listen-client capability-gated approval flow", () => {
expect.any(Function), expect.any(Function),
); );
}); });
test("stale approval responses after interrupt are benign and do not mutate runtime state", async () => {
const listener = __listenClientTestUtils.createListenerRuntime();
const targetRuntime =
__listenClientTestUtils.getOrCreateConversationRuntime(
listener,
"agent-1",
"default",
);
targetRuntime.cancelRequested = true;
targetRuntime.loopStatus = "WAITING_ON_INPUT";
targetRuntime.isProcessing = false;
const socket = new MockSocket(WebSocket.OPEN);
const scheduleQueuePumpMock = mock(() => {});
const resolveRecoveredApprovalResponseMock = mock(async () => false);
const handled = await __listenClientTestUtils.handleApprovalResponseInput(
listener,
{
runtime: { agent_id: "agent-1", conversation_id: "default" },
response: {
request_id: "perm-stale-after-interrupt",
decision: { behavior: "allow" },
},
socket: socket as unknown as WebSocket,
opts: {
onStatusChange: undefined,
connectionId: "conn-1",
},
processQueuedTurn: async () => {},
},
{
resolveRuntimeForApprovalRequest: () => null,
resolvePendingApprovalResolver: () => false,
getOrCreateScopedRuntime: () => targetRuntime,
resolveRecoveredApprovalResponse: resolveRecoveredApprovalResponseMock,
scheduleQueuePump: scheduleQueuePumpMock,
},
);
expect(handled).toBe(false);
expect(resolveRecoveredApprovalResponseMock).not.toHaveBeenCalled();
expect(scheduleQueuePumpMock).toHaveBeenCalledWith(
targetRuntime,
socket,
expect.objectContaining({ connectionId: "conn-1" }),
expect.any(Function),
);
expect(targetRuntime.cancelRequested).toBe(false);
expect(targetRuntime.loopStatus).toBe("WAITING_ON_INPUT");
expect(targetRuntime.isProcessing).toBe(false);
});
}); });
describe("listen-client approval recovery batch correlation", () => { describe("listen-client approval recovery batch correlation", () => {

View File

@@ -12,7 +12,14 @@ export type ListenerQueueGatingConditions = {
export function getListenerBlockedReason( export function getListenerBlockedReason(
c: ListenerQueueGatingConditions, c: ListenerQueueGatingConditions,
): QueueBlockedReason | null { ): QueueBlockedReason | null {
if (c.cancelRequested) return "interrupt_in_progress"; if (
c.cancelRequested &&
(c.isProcessing ||
c.isRecoveringApprovals ||
c.loopStatus !== "WAITING_ON_INPUT")
) {
return "interrupt_in_progress";
}
if (c.pendingApprovalsLen > 0) return "pending_approvals"; if (c.pendingApprovalsLen > 0) return "pending_approvals";
if (c.isRecoveringApprovals) return "runtime_busy"; if (c.isRecoveringApprovals) return "runtime_busy";
if (c.loopStatus === "WAITING_ON_APPROVAL") return "pending_approvals"; if (c.loopStatus === "WAITING_ON_APPROVAL") return "pending_approvals";

View File

@@ -339,6 +339,18 @@ async function handleApprovalResponseInput(
params.runtime.agent_id, params.runtime.agent_id,
params.runtime.conversation_id, params.runtime.conversation_id,
); );
if (targetRuntime.cancelRequested && !targetRuntime.isProcessing) {
targetRuntime.cancelRequested = false;
deps.scheduleQueuePump(
targetRuntime,
params.socket,
params.opts as StartListenerOptions,
params.processQueuedTurn,
);
return false;
}
if ( if (
await deps.resolveRecoveredApprovalResponse( await deps.resolveRecoveredApprovalResponse(
targetRuntime, targetRuntime,
@@ -422,11 +434,9 @@ async function handleChangeDeviceStateInput(
const shouldTrackCommand = const shouldTrackCommand =
!scopedRuntime.isProcessing && !scopedRuntime.isProcessing &&
resolvedDeps.getPendingControlRequestCount(listener, scope) === 0; resolvedDeps.getPendingControlRequestCount(listener, scope) === 0;
if (shouldTrackCommand) { if (shouldTrackCommand) {
resolvedDeps.setLoopStatus(scopedRuntime, "EXECUTING_COMMAND", scope); resolvedDeps.setLoopStatus(scopedRuntime, "EXECUTING_COMMAND", scope);
} }
try { try {
if (params.command.payload.mode) { if (params.command.payload.mode) {
resolvedDeps.handleModeChange( resolvedDeps.handleModeChange(
@@ -436,7 +446,6 @@ async function handleChangeDeviceStateInput(
scope, scope,
); );
} }
if (params.command.payload.cwd) { if (params.command.payload.cwd) {
await resolvedDeps.handleCwdChange( await resolvedDeps.handleCwdChange(
{ {
@@ -949,11 +958,9 @@ async function connectWithRetry(
...scopedRuntime.activeExecutingToolCallIds, ...scopedRuntime.activeExecutingToolCallIds,
]; ];
} }
if ( const activeAbortController = scopedRuntime.activeAbortController;
scopedRuntime.activeAbortController && if (activeAbortController && !activeAbortController.signal.aborted) {
!scopedRuntime.activeAbortController.signal.aborted activeAbortController.abort();
) {
scopedRuntime.activeAbortController.abort();
} }
const recoveredApprovalState = getRecoveredApprovalStateForScope( const recoveredApprovalState = getRecoveredApprovalStateForScope(
runtime, runtime,
@@ -977,6 +984,10 @@ async function connectWithRetry(
}); });
} }
if (!hasActiveTurn) {
scopedRuntime.cancelRequested = false;
}
// Backend cancel parity with TUI (App.tsx:5932-5941). // Backend cancel parity with TUI (App.tsx:5932-5941).
// Fire-and-forget — local cancel + queued results are the primary mechanism. // Fire-and-forget — local cancel + queued results are the primary mechanism.
const cancelConversationId = scopedRuntime.conversationId; const cancelConversationId = scopedRuntime.conversationId;

View File

@@ -250,10 +250,6 @@ function buildQueuedTurnMessage(
}; };
} }
export function shouldQueueInboundMessage(parsed: IncomingMessage): boolean {
return parsed.messages.some((payload) => "content" in payload);
}
export function consumeQueuedTurn(runtime: ConversationRuntime): { export function consumeQueuedTurn(runtime: ConversationRuntime): {
dequeuedBatch: DequeuedBatch; dequeuedBatch: DequeuedBatch;
queuedTurn: IncomingMessage; queuedTurn: IncomingMessage;
@@ -279,7 +275,10 @@ export function consumeQueuedTurn(runtime: ConversationRuntime): {
} }
} }
if (!hasMessage || queueLen === 0) { if (!hasMessage) {
return null;
}
if (queueLen === 0) {
return null; return null;
} }
@@ -299,6 +298,10 @@ export function consumeQueuedTurn(runtime: ConversationRuntime): {
}; };
} }
export function shouldQueueInboundMessage(parsed: IncomingMessage): boolean {
return parsed.messages.some((payload) => "content" in payload);
}
function computeListenerQueueBlockedReason( function computeListenerQueueBlockedReason(
runtime: ConversationRuntime, runtime: ConversationRuntime,
): QueueBlockedReason | null { ): QueueBlockedReason | null {

View File

@@ -225,11 +225,9 @@ export function clearConversationRuntimeState(
runtime: ConversationRuntime, runtime: ConversationRuntime,
): void { ): void {
runtime.cancelRequested = true; runtime.cancelRequested = true;
if ( const activeAbortController = runtime.activeAbortController;
runtime.activeAbortController && if (activeAbortController && !activeAbortController.signal.aborted) {
!runtime.activeAbortController.signal.aborted activeAbortController.abort();
) {
runtime.activeAbortController.abort();
} }
runtime.pendingApprovalBatchByToolCallId.clear(); runtime.pendingApprovalBatchByToolCallId.clear();
runtime.pendingInterruptedResults = null; runtime.pendingInterruptedResults = null;

View File

@@ -95,6 +95,7 @@ export async function handleApprovalStop(params: {
currentInput: Array<MessageCreate | ApprovalCreate>; currentInput: Array<MessageCreate | ApprovalCreate>;
pendingNormalizationInterruptedToolCallIds: string[]; pendingNormalizationInterruptedToolCallIds: string[];
turnToolContextId: string | null; turnToolContextId: string | null;
abortSignal: AbortSignal;
buildSendOptions: () => Parameters< buildSendOptions: () => Parameters<
typeof sendApprovalContinuationWithRetry typeof sendApprovalContinuationWithRetry
>[2]; >[2];
@@ -112,13 +113,9 @@ export async function handleApprovalStop(params: {
msgRunIds, msgRunIds,
currentInput, currentInput,
turnToolContextId, turnToolContextId,
abortSignal,
buildSendOptions, buildSendOptions,
} = params; } = params;
const abortController = runtime.activeAbortController;
if (!abortController) {
throw new Error("Missing active abort controller during approval handling");
}
if (approvals.length === 0) { if (approvals.length === 0) {
runtime.lastStopReason = "error"; runtime.lastStopReason = "error";
@@ -283,7 +280,7 @@ export async function handleApprovalStop(params: {
const executionResults = await executeApprovalBatch(decisions, undefined, { const executionResults = await executeApprovalBatch(decisions, undefined, {
toolContextId: turnToolContextId ?? undefined, toolContextId: turnToolContextId ?? undefined,
abortSignal: abortController.signal, abortSignal,
workingDirectory: turnWorkingDirectory, workingDirectory: turnWorkingDirectory,
}); });
const persistedExecutionResults = normalizeExecutionResultsForInterruptParity( const persistedExecutionResults = normalizeExecutionResultsForInterruptParity(
@@ -331,7 +328,6 @@ export async function handleApprovalStop(params: {
nextInput.push(...queuedTurn.messages); nextInput.push(...queuedTurn.messages);
emitDequeuedUserMessage(socket, runtime, queuedTurn, dequeuedBatch); emitDequeuedUserMessage(socket, runtime, queuedTurn, dequeuedBatch);
} }
setLoopStatus(runtime, "SENDING_API_REQUEST", { setLoopStatus(runtime, "SENDING_API_REQUEST", {
agent_id: agentId, agent_id: agentId,
conversation_id: conversationId, conversation_id: conversationId,
@@ -342,7 +338,7 @@ export async function handleApprovalStop(params: {
buildSendOptions(), buildSendOptions(),
socket, socket,
runtime, runtime,
abortController.signal, abortSignal,
); );
if (!stream) { if (!stream) {
return { return {

View File

@@ -122,7 +122,9 @@ export async function handleIncomingMessage(
runtime.isProcessing = true; runtime.isProcessing = true;
runtime.cancelRequested = false; runtime.cancelRequested = false;
runtime.activeAbortController = new AbortController(); const turnAbortController = new AbortController();
runtime.activeAbortController = turnAbortController;
const turnAbortSignal = turnAbortController.signal;
runtime.activeWorkingDirectory = turnWorkingDirectory; runtime.activeWorkingDirectory = turnWorkingDirectory;
runtime.activeRunId = null; runtime.activeRunId = null;
runtime.activeRunStartedAt = new Date().toISOString(); runtime.activeRunStartedAt = new Date().toISOString();
@@ -240,7 +242,7 @@ export async function handleIncomingMessage(
buildSendOptions(), buildSendOptions(),
socket, socket,
runtime, runtime,
runtime.activeAbortController.signal, turnAbortSignal,
) )
: await sendMessageStreamWithRetry( : await sendMessageStreamWithRetry(
conversationId, conversationId,
@@ -248,7 +250,7 @@ export async function handleIncomingMessage(
buildSendOptions(), buildSendOptions(),
socket, socket,
runtime, runtime,
runtime.activeAbortController.signal, turnAbortSignal,
); );
if (!stream) { if (!stream) {
return; return;
@@ -275,7 +277,7 @@ export async function handleIncomingMessage(
stream as Stream<LettaStreamingResponse>, stream as Stream<LettaStreamingResponse>,
buffers, buffers,
() => {}, () => {},
runtime.activeAbortController.signal, turnAbortSignal,
undefined, undefined,
({ chunk, shouldOutput, errorInfo }) => { ({ chunk, shouldOutput, errorInfo }) => {
const maybeRunId = (chunk as { run_id?: unknown }).run_id; const maybeRunId = (chunk as { run_id?: unknown }).run_id;
@@ -428,7 +430,7 @@ export async function handleIncomingMessage(
buildSendOptions(), buildSendOptions(),
socket, socket,
runtime, runtime,
runtime.activeAbortController.signal, turnAbortSignal,
) )
: await sendMessageStreamWithRetry( : await sendMessageStreamWithRetry(
conversationId, conversationId,
@@ -436,7 +438,7 @@ export async function handleIncomingMessage(
buildSendOptions(), buildSendOptions(),
socket, socket,
runtime, runtime,
runtime.activeAbortController.signal, turnAbortSignal,
); );
if (!stream) { if (!stream) {
return; return;
@@ -492,7 +494,7 @@ export async function handleIncomingMessage(
}); });
await new Promise((resolve) => setTimeout(resolve, delayMs)); await new Promise((resolve) => setTimeout(resolve, delayMs));
if (runtime.activeAbortController.signal.aborted) { if (turnAbortSignal.aborted) {
throw new Error("Cancelled by user"); throw new Error("Cancelled by user");
} }
@@ -511,7 +513,7 @@ export async function handleIncomingMessage(
buildSendOptions(), buildSendOptions(),
socket, socket,
runtime, runtime,
runtime.activeAbortController.signal, turnAbortSignal,
) )
: await sendMessageStreamWithRetry( : await sendMessageStreamWithRetry(
conversationId, conversationId,
@@ -519,7 +521,7 @@ export async function handleIncomingMessage(
buildSendOptions(), buildSendOptions(),
socket, socket,
runtime, runtime,
runtime.activeAbortController.signal, turnAbortSignal,
); );
if (!stream) { if (!stream) {
return; return;
@@ -563,7 +565,7 @@ export async function handleIncomingMessage(
}); });
await new Promise((resolve) => setTimeout(resolve, delayMs)); await new Promise((resolve) => setTimeout(resolve, delayMs));
if (runtime.activeAbortController.signal.aborted) { if (turnAbortSignal.aborted) {
throw new Error("Cancelled by user"); throw new Error("Cancelled by user");
} }
@@ -582,7 +584,7 @@ export async function handleIncomingMessage(
buildSendOptions(), buildSendOptions(),
socket, socket,
runtime, runtime,
runtime.activeAbortController.signal, turnAbortSignal,
) )
: await sendMessageStreamWithRetry( : await sendMessageStreamWithRetry(
conversationId, conversationId,
@@ -590,7 +592,7 @@ export async function handleIncomingMessage(
buildSendOptions(), buildSendOptions(),
socket, socket,
runtime, runtime,
runtime.activeAbortController.signal, turnAbortSignal,
); );
if (!stream) { if (!stream) {
return; return;
@@ -672,6 +674,7 @@ export async function handleIncomingMessage(
currentInput, currentInput,
pendingNormalizationInterruptedToolCallIds, pendingNormalizationInterruptedToolCallIds,
turnToolContextId, turnToolContextId,
abortSignal: turnAbortSignal,
buildSendOptions, buildSendOptions,
}); });
if (approvalResult.terminated || !approvalResult.stream) { if (approvalResult.terminated || !approvalResult.stream) {