From c95c4a472216cbda69f14596fffa7a8d7b4d7054 Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Thu, 5 Mar 2026 20:42:56 -0800 Subject: [PATCH] revert: "fix(listen): normalize interrupt tool returns for websocket parity" (#1291) --- src/agent/approval-result-normalization.ts | 123 ---------- src/agent/message.ts | 11 +- .../approval-result-normalization.test.ts | 103 -------- .../websocket/listen-client-protocol.test.ts | 41 ---- .../websocket/listen-interrupt-queue.test.ts | 143 +---------- src/websocket/listen-client.ts | 223 +----------------- 6 files changed, 22 insertions(+), 622 deletions(-) delete mode 100644 src/agent/approval-result-normalization.ts delete mode 100644 src/tests/agent/approval-result-normalization.test.ts diff --git a/src/agent/approval-result-normalization.ts b/src/agent/approval-result-normalization.ts deleted file mode 100644 index 52a612a..0000000 --- a/src/agent/approval-result-normalization.ts +++ /dev/null @@ -1,123 +0,0 @@ -import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents"; -import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/messages"; -import { INTERRUPTED_BY_USER } from "../constants"; -import type { ApprovalResult } from "./approval-execution"; - -type OutgoingMessage = MessageCreate | ApprovalCreate; - -export type ApprovalNormalizationOptions = { - /** - * Structured interrupt provenance: tool_call_ids known to have been interrupted. - * When provided, these IDs are forced to persist as status=error. - */ - interruptedToolCallIds?: Iterable; - /** - * Temporary fallback guard for legacy drift where tool_return text is the only - * interrupt signal. Keep false by default for strict structured behavior. - */ - allowInterruptTextFallback?: boolean; -}; - -function normalizeToolReturnText(value: unknown): string { - if (typeof value === "string") return value; - - if (Array.isArray(value)) { - const text = value - .filter( - (part): part is { type: "text"; text: string } => - !!part && - typeof part === "object" && - "type" in part && - (part as { type?: unknown }).type === "text" && - "text" in part && - typeof (part as { text?: unknown }).text === "string", - ) - .map((part) => part.text) - .join("\n") - .trim(); - return text; - } - - if (value === null || value === undefined) return ""; - - try { - return JSON.stringify(value); - } catch { - return String(value); - } -} - -export function normalizeApprovalResultsForPersistence( - approvals: ApprovalResult[] | null | undefined, - options: ApprovalNormalizationOptions = {}, -): ApprovalResult[] { - if (!approvals || approvals.length === 0) return approvals ?? []; - - const interruptedSet = new Set(options.interruptedToolCallIds ?? []); - - return approvals.map((approval) => { - if ( - !approval || - typeof approval !== "object" || - !("type" in approval) || - approval.type !== "tool" - ) { - return approval; - } - - const toolCallId = - "tool_call_id" in approval && typeof approval.tool_call_id === "string" - ? approval.tool_call_id - : ""; - - const interruptedByStructuredId = - toolCallId.length > 0 && interruptedSet.has(toolCallId); - const interruptedByLegacyText = options.allowInterruptTextFallback - ? normalizeToolReturnText( - "tool_return" in approval ? approval.tool_return : "", - ) === INTERRUPTED_BY_USER - : false; - - if ( - (interruptedByStructuredId || interruptedByLegacyText) && - "status" in approval && - approval.status !== "error" - ) { - return { - ...approval, - status: "error" as const, - }; - } - - return approval; - }); -} - -export function normalizeOutgoingApprovalMessages( - messages: OutgoingMessage[], - options: ApprovalNormalizationOptions = {}, -): OutgoingMessage[] { - if (!messages || messages.length === 0) return messages; - - return messages.map((message) => { - if ( - !message || - typeof message !== "object" || - !("type" in message) || - message.type !== "approval" || - !("approvals" in message) - ) { - return message; - } - - const normalizedApprovals = normalizeApprovalResultsForPersistence( - message.approvals as ApprovalResult[], - options, - ); - - return { - ...message, - approvals: normalizedApprovals, - } as ApprovalCreate; - }); -} diff --git a/src/agent/message.ts b/src/agent/message.ts index 2f722a8..e6e22b9 100644 --- a/src/agent/message.ts +++ b/src/agent/message.ts @@ -13,10 +13,6 @@ import { waitForToolsetReady, } from "../tools/manager"; import { isTimingsEnabled } from "../utils/timing"; -import { - type ApprovalNormalizationOptions, - normalizeOutgoingApprovalMessages, -} from "./approval-result-normalization"; import { getClient } from "./client"; const streamRequestStartTimes = new WeakMap(); @@ -62,7 +58,6 @@ export async function sendMessageStream( streamTokens?: boolean; background?: boolean; agentId?: string; // Required when conversationId is "default" - approvalNormalization?: ApprovalNormalizationOptions; } = { streamTokens: true, background: true }, // Disable SDK retries by default - state management happens outside the stream, // so retries would violate idempotency and create race conditions @@ -87,13 +82,9 @@ export async function sendMessageStream( } const resolvedConversationId = conversationId; - const normalizedMessages = normalizeOutgoingApprovalMessages( - messages, - opts.approvalNormalization, - ); const requestBody = { - messages: normalizedMessages, + messages, streaming: true, stream_tokens: opts.streamTokens ?? true, background: opts.background ?? true, diff --git a/src/tests/agent/approval-result-normalization.test.ts b/src/tests/agent/approval-result-normalization.test.ts deleted file mode 100644 index 1093ffe..0000000 --- a/src/tests/agent/approval-result-normalization.test.ts +++ /dev/null @@ -1,103 +0,0 @@ -import { describe, expect, test } from "bun:test"; -import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/messages"; -import type { ApprovalResult } from "../../agent/approval-execution"; -import { - normalizeApprovalResultsForPersistence, - normalizeOutgoingApprovalMessages, -} from "../../agent/approval-result-normalization"; -import { INTERRUPTED_BY_USER } from "../../constants"; - -describe("normalizeApprovalResultsForPersistence", () => { - test("forces status=error for structured interrupted tool_call_ids", () => { - const approvals: ApprovalResult[] = [ - { - type: "tool", - tool_call_id: "call-1", - tool_return: "some return", - status: "success", - } as ApprovalResult, - ]; - - const normalized = normalizeApprovalResultsForPersistence(approvals, { - interruptedToolCallIds: ["call-1"], - }); - - expect(normalized[0]).toMatchObject({ - type: "tool", - tool_call_id: "call-1", - status: "error", - }); - }); - - test("does not modify non-interrupted tool results", () => { - const approvals: ApprovalResult[] = [ - { - type: "tool", - tool_call_id: "call-2", - tool_return: "ok", - status: "success", - } as ApprovalResult, - ]; - - const normalized = normalizeApprovalResultsForPersistence(approvals, { - interruptedToolCallIds: ["other-id"], - }); - - expect(normalized[0]).toMatchObject({ - type: "tool", - tool_call_id: "call-2", - status: "success", - }); - }); - - test("supports legacy fallback on interrupt text when explicitly enabled", () => { - const approvals: ApprovalResult[] = [ - { - type: "tool", - tool_call_id: "call-3", - tool_return: [{ type: "text", text: INTERRUPTED_BY_USER }], - status: "success", - } as ApprovalResult, - ]; - - const normalized = normalizeApprovalResultsForPersistence(approvals, { - allowInterruptTextFallback: true, - }); - - expect(normalized[0]).toMatchObject({ - type: "tool", - tool_call_id: "call-3", - status: "error", - }); - }); -}); - -describe("normalizeOutgoingApprovalMessages", () => { - test("normalizes approvals and preserves non-approval messages", () => { - const approvalMessage: ApprovalCreate = { - type: "approval", - approvals: [ - { - type: "tool", - tool_call_id: "call-7", - tool_return: "foo", - status: "success", - } as ApprovalResult, - ], - }; - - const messages = normalizeOutgoingApprovalMessages( - [{ role: "user", content: "hello" }, approvalMessage], - { interruptedToolCallIds: ["call-7"] }, - ); - - expect(messages[0]).toMatchObject({ role: "user", content: "hello" }); - const normalizedApproval = messages[1] as ApprovalCreate; - const approvals = normalizedApproval.approvals ?? []; - expect(approvals[0]).toMatchObject({ - type: "tool", - tool_call_id: "call-7", - status: "error", - }); - }); -}); diff --git a/src/tests/websocket/listen-client-protocol.test.ts b/src/tests/websocket/listen-client-protocol.test.ts index beef0b9..2677373 100644 --- a/src/tests/websocket/listen-client-protocol.test.ts +++ b/src/tests/websocket/listen-client-protocol.test.ts @@ -640,44 +640,3 @@ describe("listen-client post-stop approval recovery policy", () => { expect(shouldRecover).toBe(false); }); }); - -describe("listen-client tool_return wire normalization", () => { - test("normalizes legacy top-level tool return fields to canonical tool_returns[]", () => { - const normalized = __listenClientTestUtils.normalizeToolReturnWireMessage({ - message_type: "tool_return_message", - id: "message-1", - run_id: "run-1", - tool_call_id: "call-1", - status: "error", - tool_return: [{ type: "text", text: "Interrupted by user" }], - }); - - expect(normalized).toEqual({ - message_type: "tool_return_message", - id: "message-1", - run_id: "run-1", - tool_returns: [ - { - tool_call_id: "call-1", - status: "error", - tool_return: "Interrupted by user", - }, - ], - }); - expect(normalized).not.toHaveProperty("tool_call_id"); - expect(normalized).not.toHaveProperty("status"); - expect(normalized).not.toHaveProperty("tool_return"); - }); - - test("returns null for tool_return_message when no canonical status is available", () => { - const normalized = __listenClientTestUtils.normalizeToolReturnWireMessage({ - message_type: "tool_return_message", - id: "message-2", - run_id: "run-2", - tool_call_id: "call-2", - tool_return: "maybe done", - }); - - expect(normalized).toBeNull(); - }); -}); diff --git a/src/tests/websocket/listen-interrupt-queue.test.ts b/src/tests/websocket/listen-interrupt-queue.test.ts index 4b3fb6a..cffc612 100644 --- a/src/tests/websocket/listen-interrupt-queue.test.ts +++ b/src/tests/websocket/listen-interrupt-queue.test.ts @@ -178,29 +178,6 @@ describe("extractInterruptToolReturns", () => { ]); }); - test("converts multimodal tool_return content into displayable text", () => { - const results: ApprovalResult[] = [ - { - type: "tool", - tool_call_id: "call-multimodal", - status: "error", - tool_return: [ - { type: "text", text: "Interrupted by user" }, - { type: "image", image_url: "https://example.com/image.png" }, - ], - } as ApprovalResult, - ]; - - const mapped = extractInterruptToolReturns(results); - expect(mapped).toEqual([ - { - tool_call_id: "call-multimodal", - status: "error", - tool_return: "Interrupted by user", - }, - ]); - }); - test("emitInterruptToolReturnMessage emits deterministic per-tool terminal messages", () => { const runtime = createRuntime(); const socket = new MockSocket(WebSocket.OPEN) as unknown as WebSocket; @@ -231,26 +208,16 @@ describe("extractInterruptToolReturns", () => { expect(toolReturnFrames).toHaveLength(2); expect(toolReturnFrames[0]).toMatchObject({ run_id: "run-1", - tool_returns: [ - { tool_call_id: "call-a", status: "success", tool_return: "704" }, - ], + tool_call_id: "call-a", + status: "success", + tool_returns: [{ tool_call_id: "call-a", status: "success" }], }); expect(toolReturnFrames[1]).toMatchObject({ run_id: "run-1", - tool_returns: [ - { - tool_call_id: "call-b", - status: "error", - tool_return: "User interrupted the stream", - }, - ], + tool_call_id: "call-b", + status: "error", + tool_returns: [{ tool_call_id: "call-b", status: "error" }], }); - expect(toolReturnFrames[0]).not.toHaveProperty("tool_call_id"); - expect(toolReturnFrames[0]).not.toHaveProperty("status"); - expect(toolReturnFrames[0]).not.toHaveProperty("tool_return"); - expect(toolReturnFrames[1]).not.toHaveProperty("tool_call_id"); - expect(toolReturnFrames[1]).not.toHaveProperty("status"); - expect(toolReturnFrames[1]).not.toHaveProperty("tool_return"); }); }); @@ -338,14 +305,13 @@ describe("Path A: cancel during tool execution → next turn consumes actual res // Cancel fires: populateInterruptQueue (Path A — has execution results) const populated = populateInterruptQueue(runtime, { lastExecutionResults: executionResults, - lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: ["call-1", "call-2"], agentId, conversationId, }); expect(populated).toBe(true); - expect(runtime.pendingInterruptedResults).toEqual(executionResults); + expect(runtime.pendingInterruptedResults).toBe(executionResults); expect(runtime.pendingInterruptedContext).toMatchObject({ agentId, conversationId, @@ -357,7 +323,7 @@ describe("Path A: cancel during tool execution → next turn consumes actual res expect(consumed).not.toBeNull(); expect(consumed?.type).toBe("approval"); - expect(consumed?.approvals).toEqual(executionResults); + expect(consumed?.approvals).toBe(executionResults); expect(consumed?.approvals).toHaveLength(2); // Queue is atomically cleared after consumption @@ -376,7 +342,6 @@ describe("Path A: cancel during tool execution → next turn consumes actual res const populated = populateInterruptQueue(runtime, { lastExecutionResults: executionResults, - lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: ["call-1"], agentId: "agent-1", conversationId: "conv-1", @@ -388,85 +353,9 @@ describe("Path A: cancel during tool execution → next turn consumes actual res approve: true, // Path A preserves actual approval state }); }); - - test("normalizes interrupted tool results to error via structured tool_call_id", () => { - const runtime = createRuntime(); - const executionResults: ApprovalResult[] = [ - { - type: "tool", - tool_call_id: "call-1", - status: "success", - tool_return: "result text does not matter when ID is interrupted", - } as unknown as ApprovalResult, - ]; - - const populated = populateInterruptQueue(runtime, { - lastExecutionResults: executionResults, - lastExecutingToolCallIds: ["call-1"], - lastNeedsUserInputToolCallIds: [], - agentId: "agent-1", - conversationId: "conv-1", - }); - - expect(populated).toBe(true); - expect(runtime.pendingInterruptedResults?.[0]).toMatchObject({ - type: "tool", - tool_call_id: "call-1", - status: "error", - }); - }); - - test("keeps legacy text fallback for interrupted tool return normalization", () => { - const runtime = createRuntime(); - const executionResults: ApprovalResult[] = [ - { - type: "tool", - tool_call_id: "call-legacy", - status: "success", - tool_return: [{ type: "text", text: "Interrupted by user" }], - } as unknown as ApprovalResult, - ]; - - const populated = populateInterruptQueue(runtime, { - lastExecutionResults: executionResults, - lastExecutingToolCallIds: [], - lastNeedsUserInputToolCallIds: [], - agentId: "agent-1", - conversationId: "conv-1", - }); - - expect(populated).toBe(true); - expect(runtime.pendingInterruptedResults?.[0]).toMatchObject({ - type: "tool", - tool_call_id: "call-legacy", - status: "error", - }); - }); }); describe("Path B: cancel during approval wait → next turn consumes synthesized denials", () => { - test("prefers synthesized tool-error results when execution was already in-flight", () => { - const runtime = createRuntime(); - - const populated = populateInterruptQueue(runtime, { - lastExecutionResults: null, - lastExecutingToolCallIds: ["call-running-1"], - lastNeedsUserInputToolCallIds: ["call-running-1"], - agentId: "agent-1", - conversationId: "conv-1", - }); - - expect(populated).toBe(true); - expect(runtime.pendingInterruptedResults).toEqual([ - { - type: "tool", - tool_call_id: "call-running-1", - tool_return: "Interrupted by user", - status: "error", - }, - ]); - }); - test("full sequence: populate from batch map IDs → consume synthesized denials", () => { const runtime = createRuntime(); const agentId = "agent-abc"; @@ -482,7 +371,6 @@ describe("Path B: cancel during approval wait → next turn consumes synthesized // Cancel fires during approval wait: no execution results const populated = populateInterruptQueue(runtime, { lastExecutionResults: null, - lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: [], agentId, conversationId, @@ -521,7 +409,6 @@ describe("Path B: cancel during approval wait → next turn consumes synthesized // No batch map entries, but we have the snapshot IDs const populated = populateInterruptQueue(runtime, { lastExecutionResults: null, - lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: ["call-a", "call-b"], agentId: "agent-1", conversationId: "conv-1", @@ -540,7 +427,6 @@ describe("Path B: cancel during approval wait → next turn consumes synthesized const populated = populateInterruptQueue(runtime, { lastExecutionResults: null, - lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: [], agentId: "agent-1", conversationId: "conv-1", @@ -567,7 +453,6 @@ describe("post-cancel next turn: queue consumed exactly once (no error loop)", ( reason: "cancelled", }, ], - lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: [], agentId, conversationId: convId, @@ -591,7 +476,6 @@ describe("post-cancel next turn: queue consumed exactly once (no error loop)", ( lastExecutionResults: [ { type: "approval", tool_call_id: "call-1", approve: true }, ], - lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: [], agentId, conversationId: convId, @@ -612,7 +496,6 @@ describe("idempotency: first cancel populates, second is no-op", () => { lastExecutionResults: [ { type: "approval", tool_call_id: "call-first", approve: true }, ], - lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: [], agentId: "agent-1", conversationId: "conv-1", @@ -628,7 +511,6 @@ describe("idempotency: first cancel populates, second is no-op", () => { reason: "x", }, ], - lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: [], agentId: "agent-1", conversationId: "conv-1", @@ -648,7 +530,6 @@ describe("idempotency: first cancel populates, second is no-op", () => { lastExecutionResults: [ { type: "approval", tool_call_id: "call-1", approve: true }, ], - lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: [], agentId: "agent-1", conversationId: "conv-1", @@ -662,7 +543,6 @@ describe("idempotency: first cancel populates, second is no-op", () => { lastExecutionResults: [ { type: "approval", tool_call_id: "call-2", approve: true }, ], - lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: [], agentId: "agent-1", conversationId: "conv-1", @@ -684,7 +564,6 @@ describe("epoch guard: stale context discarded on consume", () => { lastExecutionResults: [ { type: "approval", tool_call_id: "call-1", approve: true }, ], - lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: [], agentId: "agent-1", conversationId: "conv-1", @@ -708,7 +587,6 @@ describe("epoch guard: stale context discarded on consume", () => { lastExecutionResults: [ { type: "approval", tool_call_id: "call-1", approve: true }, ], - lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: [], agentId: "agent-old", conversationId: "conv-1", @@ -727,7 +605,6 @@ describe("epoch guard: stale context discarded on consume", () => { lastExecutionResults: [ { type: "approval", tool_call_id: "call-1", approve: true }, ], - lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: [], agentId: "agent-1", conversationId: "conv-old", @@ -746,7 +623,6 @@ describe("stale Path-B IDs: clearing after successful send prevents re-denial", // Also batch map should be cleared by clearPendingApprovalBatchIds const populated = populateInterruptQueue(runtime, { lastExecutionResults: null, - lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: [], // cleared after send agentId: "agent-1", conversationId: "conv-1", @@ -764,7 +640,6 @@ describe("stale Path-B IDs: clearing after successful send prevents re-denial", const populated = populateInterruptQueue(runtime, { lastExecutionResults: null, - lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: [], // cleared from previous send agentId: "agent-1", conversationId: "conv-1", @@ -841,7 +716,6 @@ describe("consume clears pendingApprovalBatchByToolCallId", () => { lastExecutionResults: [ { type: "approval", tool_call_id: "call-1", approve: true }, ], - lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: [], agentId: "agent-1", conversationId: "conv-1", @@ -860,7 +734,6 @@ describe("consume clears pendingApprovalBatchByToolCallId", () => { lastExecutionResults: [ { type: "approval", tool_call_id: "call-1", approve: true }, ], - lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: [], agentId: "agent-old", conversationId: "conv-old", diff --git a/src/websocket/listen-client.ts b/src/websocket/listen-client.ts index 6bd1311..774e079 100644 --- a/src/websocket/listen-client.ts +++ b/src/websocket/listen-client.ts @@ -18,7 +18,6 @@ import { executeApprovalBatch, } from "../agent/approval-execution"; import { fetchRunErrorDetail } from "../agent/approval-recovery"; -import { normalizeApprovalResultsForPersistence } from "../agent/approval-result-normalization"; import { getResumeData } from "../agent/check-approval"; import { getClient } from "../agent/client"; import { getStreamToolContextId, sendMessageStream } from "../agent/message"; @@ -35,7 +34,6 @@ import { createBuffers } from "../cli/helpers/accumulator"; import { classifyApprovals } from "../cli/helpers/approvalClassification"; import { generatePlanFilePath } from "../cli/helpers/planName"; import { drainStreamWithResume } from "../cli/helpers/stream"; -import { INTERRUPTED_BY_USER } from "../constants"; import { computeDiffPreviews } from "../helpers/diffPreview"; import { permissionMode } from "../permissions/mode"; import { type QueueItem, QueueRuntime } from "../queue/queueRuntime"; @@ -909,7 +907,6 @@ function shouldAttemptPostStopApprovalRecovery(params: { interface InterruptPopulateInput { lastExecutionResults: ApprovalResult[] | null; - lastExecutingToolCallIds: string[]; lastNeedsUserInputToolCallIds: string[]; agentId: string; conversationId: string; @@ -923,48 +920,10 @@ interface InterruptToolReturn { stderr?: string[]; } -function asToolReturnStatus(value: unknown): "success" | "error" | null { - if (value === "success" || value === "error") { - return value; - } - return null; -} - function normalizeToolReturnValue(value: unknown): string { if (typeof value === "string") { return value; } - if (Array.isArray(value)) { - const textParts = value - .filter( - ( - part, - ): part is { - type: string; - text: string; - } => - !!part && - typeof part === "object" && - "type" in part && - part.type === "text" && - "text" in part && - typeof part.text === "string", - ) - .map((part) => part.text); - if (textParts.length > 0) { - return textParts.join("\n"); - } - } - if ( - value && - typeof value === "object" && - "type" in value && - value.type === "text" && - "text" in value && - typeof value.text === "string" - ) { - return value.text; - } if (value === null || value === undefined) { return ""; } @@ -975,116 +934,6 @@ function normalizeToolReturnValue(value: unknown): string { } } -function normalizeInterruptedApprovalsForQueue( - approvals: ApprovalResult[] | null, - interruptedToolCallIds: string[], -): ApprovalResult[] | null { - if (!approvals || approvals.length === 0) { - return approvals; - } - - return normalizeApprovalResultsForPersistence(approvals, { - interruptedToolCallIds, - // Temporary fallback guard while all producers migrate to structured IDs. - allowInterruptTextFallback: true, - }); -} - -function extractCanonicalToolReturnsFromWire( - payload: Record, -): InterruptToolReturn[] { - const fromArray: InterruptToolReturn[] = []; - const toolReturnsValue = payload.tool_returns; - if (Array.isArray(toolReturnsValue)) { - for (const raw of toolReturnsValue) { - if (!raw || typeof raw !== "object") { - continue; - } - const rec = raw as Record; - const toolCallId = - typeof rec.tool_call_id === "string" ? rec.tool_call_id : null; - const status = asToolReturnStatus(rec.status); - if (!toolCallId || !status) { - continue; - } - const stdout = Array.isArray(rec.stdout) - ? rec.stdout.filter( - (entry): entry is string => typeof entry === "string", - ) - : undefined; - const stderr = Array.isArray(rec.stderr) - ? rec.stderr.filter( - (entry): entry is string => typeof entry === "string", - ) - : undefined; - fromArray.push({ - tool_call_id: toolCallId, - status, - tool_return: normalizeToolReturnValue(rec.tool_return), - ...(stdout ? { stdout } : {}), - ...(stderr ? { stderr } : {}), - }); - } - } - if (fromArray.length > 0) { - return fromArray; - } - - const topLevelToolCallId = - typeof payload.tool_call_id === "string" ? payload.tool_call_id : null; - const topLevelStatus = asToolReturnStatus(payload.status); - if (!topLevelToolCallId || !topLevelStatus) { - return []; - } - const stdout = Array.isArray(payload.stdout) - ? payload.stdout.filter( - (entry): entry is string => typeof entry === "string", - ) - : undefined; - const stderr = Array.isArray(payload.stderr) - ? payload.stderr.filter( - (entry): entry is string => typeof entry === "string", - ) - : undefined; - return [ - { - tool_call_id: topLevelToolCallId, - status: topLevelStatus, - tool_return: normalizeToolReturnValue(payload.tool_return), - ...(stdout ? { stdout } : {}), - ...(stderr ? { stderr } : {}), - }, - ]; -} - -function normalizeToolReturnWireMessage( - chunk: Record, -): Record | null { - if (chunk.message_type !== "tool_return_message") { - return chunk; - } - - const canonicalToolReturns = extractCanonicalToolReturnsFromWire(chunk); - if (canonicalToolReturns.length === 0) { - return null; - } - - const { - tool_call_id: _toolCallId, - status: _status, - tool_return: _toolReturn, - stdout: _stdout, - stderr: _stderr, - ...rest - } = chunk; - - return { - ...rest, - message_type: "tool_return_message", - tool_returns: canonicalToolReturns, - }; -} - function extractInterruptToolReturns( approvals: ApprovalResult[] | null, ): InterruptToolReturn[] { @@ -1181,15 +1030,12 @@ function emitInterruptToolReturnMessage( id: `message-${crypto.randomUUID()}`, date: new Date().toISOString(), run_id: resolvedRunId, - tool_returns: [ - { - tool_call_id: toolReturn.tool_call_id, - status: toolReturn.status, - tool_return: toolReturn.tool_return, - ...(toolReturn.stdout ? { stdout: toolReturn.stdout } : {}), - ...(toolReturn.stderr ? { stderr: toolReturn.stderr } : {}), - }, - ], + tool_call_id: toolReturn.tool_call_id, + tool_return: toolReturn.tool_return, + status: toolReturn.status, + ...(toolReturn.stdout ? { stdout: toolReturn.stdout } : {}), + ...(toolReturn.stderr ? { stderr: toolReturn.stderr } : {}), + tool_returns: [toolReturn], session_id: runtime.sessionId, uuid: `${uuidPrefix}-${crypto.randomUUID()}`, } as unknown as MessageWire); @@ -1246,31 +1092,7 @@ function populateInterruptQueue( if (input.lastExecutionResults && input.lastExecutionResults.length > 0) { // Path A: execution happened before cancel — queue actual results - // Guard parity: interrupted tool returns must persist as status=error. - runtime.pendingInterruptedResults = normalizeInterruptedApprovalsForQueue( - input.lastExecutionResults, - input.lastExecutingToolCallIds, - ); - runtime.pendingInterruptedContext = { - agentId: input.agentId, - conversationId: input.conversationId, - continuationEpoch: runtime.continuationEpoch, - }; - return true; - } - - // Path A.5: execution was in-flight (approved tools started) but no - // terminal results were captured before cancel. Match App/headless parity by - // queuing explicit tool errors, not synthetic approval denials. - if (input.lastExecutingToolCallIds.length > 0) { - runtime.pendingInterruptedResults = input.lastExecutingToolCallIds.map( - (toolCallId) => ({ - type: "tool" as const, - tool_call_id: toolCallId, - tool_return: INTERRUPTED_BY_USER, - status: "error" as const, - }), - ); + runtime.pendingInterruptedResults = input.lastExecutionResults; runtime.pendingInterruptedContext = { agentId: input.agentId, conversationId: input.conversationId, @@ -2466,7 +2288,6 @@ async function handleIncomingMessage( // Track last approval-loop state for cancel-time queueing (Phase 1.2). // Hoisted before try so the cancel catch block can access them. let lastExecutionResults: ApprovalResult[] | null = null; - let lastExecutingToolCallIds: string[] = []; let lastNeedsUserInputToolCallIds: string[] = []; runtime.isProcessing = true; @@ -2656,18 +2477,12 @@ async function handleIncomingMessage( otid?: string; id?: string; }; - const normalizedChunk = normalizeToolReturnWireMessage( - chunk as unknown as Record, - ); - if (normalizedChunk) { - emitToWS(socket, { - ...normalizedChunk, - type: "message", - session_id: runtime.sessionId, - uuid: - chunkWithIds.otid || chunkWithIds.id || crypto.randomUUID(), - } as unknown as MessageWire); - } + emitToWS(socket, { + ...chunk, + type: "message", + session_id: runtime.sessionId, + uuid: chunkWithIds.otid || chunkWithIds.id || crypto.randomUUID(), + } as MessageWire); } return undefined; @@ -3060,15 +2875,6 @@ async function handleIncomingMessage( } } - // Snapshot executing tool_call_ids before execution starts so cancel can - // preserve tool-error parity even if execution aborts mid-await. - lastExecutingToolCallIds = decisions - .filter( - (decision): decision is Extract => - decision.type === "approve", - ) - .map((decision) => decision.approval.toolCallId); - // Execute approved/denied tools const executionResults = await executeApprovalBatch( decisions, @@ -3116,7 +2922,6 @@ async function handleIncomingMessage( // cancel during the subsequent stream drain won't queue already-sent // results (Path A) or re-deny already-resolved tool calls (Path B). lastExecutionResults = null; - lastExecutingToolCallIds = []; lastNeedsUserInputToolCallIds = []; turnToolContextId = getStreamToolContextId( @@ -3128,7 +2933,6 @@ async function handleIncomingMessage( // Queue interrupted tool-call resolutions for the next message turn. populateInterruptQueue(runtime, { lastExecutionResults, - lastExecutingToolCallIds, lastNeedsUserInputToolCallIds, agentId: agentId || "", conversationId, @@ -3274,6 +3078,5 @@ export const __listenClientTestUtils = { extractInterruptToolReturns, emitInterruptToolReturnMessage, getInterruptApprovalsForEmission, - normalizeToolReturnWireMessage, shouldAttemptPostStopApprovalRecovery, };