// Relies on the module's existing imports: MessageCreate, ApprovalCreate,
// ApprovalResult (types) and INTERRUPTED_BY_USER (constant).

type OutgoingMessage = MessageCreate | ApprovalCreate;

/** Options controlling how approval results are normalized before sending. */
export type ApprovalNormalizationOptions = {
  /**
   * Structured interrupt provenance: tool_call_ids known to have been interrupted.
   * When provided, these IDs are forced to persist as status=error.
   */
  interruptedToolCallIds?: Iterable<string>;
  /**
   * Temporary fallback guard for legacy drift where tool_return text is the only
   * interrupt signal. Keep false by default for strict structured behavior.
   */
  allowInterruptTextFallback?: boolean;
};

/** Type guard for a `{ type: "text", text: string }` content part. */
function isTextPart(part: unknown): part is { type: "text"; text: string } {
  return (
    typeof part === "object" &&
    part !== null &&
    (part as { type?: unknown }).type === "text" &&
    typeof (part as { text?: unknown }).text === "string"
  );
}

/**
 * Flattens a tool_return payload into plain text.
 *
 * - strings are returned unchanged
 * - arrays keep only their text parts, joined with "\n" and trimmed
 *   (an array without text parts yields "")
 * - null / undefined yield ""
 * - anything else is JSON-serialized, falling back to `String(value)`
 *
 * @param value Raw tool_return value of unknown shape.
 * @returns Always a string.
 */
function normalizeToolReturnText(value: unknown): string {
  if (typeof value === "string") return value;
  if (value === null || value === undefined) return "";

  if (Array.isArray(value)) {
    return value
      .filter(isTextPart)
      .map((part) => part.text)
      .join("\n")
      .trim();
  }

  try {
    // JSON.stringify yields `undefined` (not a string) for functions and
    // symbols, and throws for BigInt / circular structures; cover both so the
    // declared `string` return type actually holds.
    return JSON.stringify(value) ?? String(value);
  } catch {
    return String(value);
  }
}

/**
 * Returns a copy of `approvals` in which every interrupted tool result is
 * forced to `status: "error"`. Input objects are never mutated; entries that
 * need no change are returned by reference.
 *
 * A result counts as interrupted when its `tool_call_id` is listed in
 * `options.interruptedToolCallIds`, or, only if
 * `options.allowInterruptTextFallback` is set, when its flattened
 * `tool_return` text equals `INTERRUPTED_BY_USER` (surrounding whitespace is
 * ignored for both string and array payloads).
 *
 * @param approvals Results to normalize; null/undefined yields `[]`.
 * @param options Interrupt provenance and fallback switch.
 * @returns The normalized results.
 */
export function normalizeApprovalResultsForPersistence(
  approvals: ApprovalResult[] | null | undefined,
  options: ApprovalNormalizationOptions = {},
): ApprovalResult[] {
  if (!approvals || approvals.length === 0) return approvals ?? [];

  const interruptedIds = new Set<string>(options.interruptedToolCallIds ?? []);
  const useTextFallback = Boolean(options.allowInterruptTextFallback);

  return approvals.map((approval) => {
    if (
      !approval ||
      typeof approval !== "object" ||
      !("type" in approval) ||
      approval.type !== "tool"
    ) {
      // Only tool results carry a status; everything else passes through.
      return approval;
    }

    const toolCallId =
      "tool_call_id" in approval && typeof approval.tool_call_id === "string"
        ? approval.tool_call_id
        : "";

    const interruptedByStructuredId =
      toolCallId.length > 0 && interruptedIds.has(toolCallId);
    // Trim here so string payloads are compared the same way as array
    // payloads, which normalizeToolReturnText already trims.
    const interruptedByLegacyText =
      useTextFallback &&
      normalizeToolReturnText(
        "tool_return" in approval ? approval.tool_return : "",
      ).trim() === INTERRUPTED_BY_USER;

    if (
      (interruptedByStructuredId || interruptedByLegacyText) &&
      "status" in approval &&
      approval.status !== "error"
    ) {
      return {
        ...approval,
        status: "error" as const,
      };
    }

    return approval;
  });
}

/**
 * Applies {@link normalizeApprovalResultsForPersistence} to every approval
 * message in an outgoing batch. Non-approval messages, and approval messages
 * whose `approvals` field is not an array, are returned by reference so the
 * payload is not altered beyond the status normalization.
 *
 * @param messages Outgoing messages; an empty or missing list is returned as is.
 * @param options Interrupt provenance and fallback switch.
 * @returns A new array with normalized approval messages.
 */
export function normalizeOutgoingApprovalMessages(
  messages: OutgoingMessage[],
  options: ApprovalNormalizationOptions = {},
): OutgoingMessage[] {
  if (!messages || messages.length === 0) return messages;

  // Materialize the IDs once: a one-shot iterable (e.g. a generator) would
  // otherwise be exhausted after the first approval message.
  const sharedOptions: ApprovalNormalizationOptions =
    options.interruptedToolCallIds
      ? {
          ...options,
          interruptedToolCallIds: new Set<string>(
            options.interruptedToolCallIds,
          ),
        }
      : options;

  return messages.map((message) => {
    if (
      !message ||
      typeof message !== "object" ||
      !("type" in message) ||
      message.type !== "approval" ||
      !("approvals" in message)
    ) {
      return message;
    }

    // Do not invent `approvals: []` on a message that carried none.
    if (!Array.isArray(message.approvals)) {
      return message;
    }

    const normalizedApprovals = normalizeApprovalResultsForPersistence(
      message.approvals as ApprovalResult[],
      sharedOptions,
    );

    return {
      ...message,
      approvals: normalizedApprovals,
    } as ApprovalCreate;
  });
}
sendMessageStream( streamTokens?: boolean; background?: boolean; agentId?: string; // Required when conversationId is "default" + approvalNormalization?: ApprovalNormalizationOptions; } = { streamTokens: true, background: true }, // Disable SDK retries by default - state management happens outside the stream, // so retries would violate idempotency and create race conditions @@ -82,9 +87,13 @@ export async function sendMessageStream( } const resolvedConversationId = conversationId; + const normalizedMessages = normalizeOutgoingApprovalMessages( + messages, + opts.approvalNormalization, + ); const requestBody = { - messages, + messages: normalizedMessages, streaming: true, stream_tokens: opts.streamTokens ?? true, background: opts.background ?? true, diff --git a/src/tests/agent/approval-result-normalization.test.ts b/src/tests/agent/approval-result-normalization.test.ts new file mode 100644 index 0000000..1093ffe --- /dev/null +++ b/src/tests/agent/approval-result-normalization.test.ts @@ -0,0 +1,103 @@ +import { describe, expect, test } from "bun:test"; +import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/messages"; +import type { ApprovalResult } from "../../agent/approval-execution"; +import { + normalizeApprovalResultsForPersistence, + normalizeOutgoingApprovalMessages, +} from "../../agent/approval-result-normalization"; +import { INTERRUPTED_BY_USER } from "../../constants"; + +describe("normalizeApprovalResultsForPersistence", () => { + test("forces status=error for structured interrupted tool_call_ids", () => { + const approvals: ApprovalResult[] = [ + { + type: "tool", + tool_call_id: "call-1", + tool_return: "some return", + status: "success", + } as ApprovalResult, + ]; + + const normalized = normalizeApprovalResultsForPersistence(approvals, { + interruptedToolCallIds: ["call-1"], + }); + + expect(normalized[0]).toMatchObject({ + type: "tool", + tool_call_id: "call-1", + status: "error", + }); + }); + + test("does not modify 
non-interrupted tool results", () => { + const approvals: ApprovalResult[] = [ + { + type: "tool", + tool_call_id: "call-2", + tool_return: "ok", + status: "success", + } as ApprovalResult, + ]; + + const normalized = normalizeApprovalResultsForPersistence(approvals, { + interruptedToolCallIds: ["other-id"], + }); + + expect(normalized[0]).toMatchObject({ + type: "tool", + tool_call_id: "call-2", + status: "success", + }); + }); + + test("supports legacy fallback on interrupt text when explicitly enabled", () => { + const approvals: ApprovalResult[] = [ + { + type: "tool", + tool_call_id: "call-3", + tool_return: [{ type: "text", text: INTERRUPTED_BY_USER }], + status: "success", + } as ApprovalResult, + ]; + + const normalized = normalizeApprovalResultsForPersistence(approvals, { + allowInterruptTextFallback: true, + }); + + expect(normalized[0]).toMatchObject({ + type: "tool", + tool_call_id: "call-3", + status: "error", + }); + }); +}); + +describe("normalizeOutgoingApprovalMessages", () => { + test("normalizes approvals and preserves non-approval messages", () => { + const approvalMessage: ApprovalCreate = { + type: "approval", + approvals: [ + { + type: "tool", + tool_call_id: "call-7", + tool_return: "foo", + status: "success", + } as ApprovalResult, + ], + }; + + const messages = normalizeOutgoingApprovalMessages( + [{ role: "user", content: "hello" }, approvalMessage], + { interruptedToolCallIds: ["call-7"] }, + ); + + expect(messages[0]).toMatchObject({ role: "user", content: "hello" }); + const normalizedApproval = messages[1] as ApprovalCreate; + const approvals = normalizedApproval.approvals ?? 
[]; + expect(approvals[0]).toMatchObject({ + type: "tool", + tool_call_id: "call-7", + status: "error", + }); + }); +}); diff --git a/src/tests/websocket/listen-client-protocol.test.ts b/src/tests/websocket/listen-client-protocol.test.ts index 2677373..beef0b9 100644 --- a/src/tests/websocket/listen-client-protocol.test.ts +++ b/src/tests/websocket/listen-client-protocol.test.ts @@ -640,3 +640,44 @@ describe("listen-client post-stop approval recovery policy", () => { expect(shouldRecover).toBe(false); }); }); + +describe("listen-client tool_return wire normalization", () => { + test("normalizes legacy top-level tool return fields to canonical tool_returns[]", () => { + const normalized = __listenClientTestUtils.normalizeToolReturnWireMessage({ + message_type: "tool_return_message", + id: "message-1", + run_id: "run-1", + tool_call_id: "call-1", + status: "error", + tool_return: [{ type: "text", text: "Interrupted by user" }], + }); + + expect(normalized).toEqual({ + message_type: "tool_return_message", + id: "message-1", + run_id: "run-1", + tool_returns: [ + { + tool_call_id: "call-1", + status: "error", + tool_return: "Interrupted by user", + }, + ], + }); + expect(normalized).not.toHaveProperty("tool_call_id"); + expect(normalized).not.toHaveProperty("status"); + expect(normalized).not.toHaveProperty("tool_return"); + }); + + test("returns null for tool_return_message when no canonical status is available", () => { + const normalized = __listenClientTestUtils.normalizeToolReturnWireMessage({ + message_type: "tool_return_message", + id: "message-2", + run_id: "run-2", + tool_call_id: "call-2", + tool_return: "maybe done", + }); + + expect(normalized).toBeNull(); + }); +}); diff --git a/src/tests/websocket/listen-interrupt-queue.test.ts b/src/tests/websocket/listen-interrupt-queue.test.ts index cffc612..4b3fb6a 100644 --- a/src/tests/websocket/listen-interrupt-queue.test.ts +++ b/src/tests/websocket/listen-interrupt-queue.test.ts @@ -178,6 +178,29 @@ 
describe("extractInterruptToolReturns", () => { ]); }); + test("converts multimodal tool_return content into displayable text", () => { + const results: ApprovalResult[] = [ + { + type: "tool", + tool_call_id: "call-multimodal", + status: "error", + tool_return: [ + { type: "text", text: "Interrupted by user" }, + { type: "image", image_url: "https://example.com/image.png" }, + ], + } as ApprovalResult, + ]; + + const mapped = extractInterruptToolReturns(results); + expect(mapped).toEqual([ + { + tool_call_id: "call-multimodal", + status: "error", + tool_return: "Interrupted by user", + }, + ]); + }); + test("emitInterruptToolReturnMessage emits deterministic per-tool terminal messages", () => { const runtime = createRuntime(); const socket = new MockSocket(WebSocket.OPEN) as unknown as WebSocket; @@ -208,16 +231,26 @@ describe("extractInterruptToolReturns", () => { expect(toolReturnFrames).toHaveLength(2); expect(toolReturnFrames[0]).toMatchObject({ run_id: "run-1", - tool_call_id: "call-a", - status: "success", - tool_returns: [{ tool_call_id: "call-a", status: "success" }], + tool_returns: [ + { tool_call_id: "call-a", status: "success", tool_return: "704" }, + ], }); expect(toolReturnFrames[1]).toMatchObject({ run_id: "run-1", - tool_call_id: "call-b", - status: "error", - tool_returns: [{ tool_call_id: "call-b", status: "error" }], + tool_returns: [ + { + tool_call_id: "call-b", + status: "error", + tool_return: "User interrupted the stream", + }, + ], }); + expect(toolReturnFrames[0]).not.toHaveProperty("tool_call_id"); + expect(toolReturnFrames[0]).not.toHaveProperty("status"); + expect(toolReturnFrames[0]).not.toHaveProperty("tool_return"); + expect(toolReturnFrames[1]).not.toHaveProperty("tool_call_id"); + expect(toolReturnFrames[1]).not.toHaveProperty("status"); + expect(toolReturnFrames[1]).not.toHaveProperty("tool_return"); }); }); @@ -305,13 +338,14 @@ describe("Path A: cancel during tool execution → next turn consumes actual res // Cancel fires: 
populateInterruptQueue (Path A — has execution results) const populated = populateInterruptQueue(runtime, { lastExecutionResults: executionResults, + lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: ["call-1", "call-2"], agentId, conversationId, }); expect(populated).toBe(true); - expect(runtime.pendingInterruptedResults).toBe(executionResults); + expect(runtime.pendingInterruptedResults).toEqual(executionResults); expect(runtime.pendingInterruptedContext).toMatchObject({ agentId, conversationId, @@ -323,7 +357,7 @@ describe("Path A: cancel during tool execution → next turn consumes actual res expect(consumed).not.toBeNull(); expect(consumed?.type).toBe("approval"); - expect(consumed?.approvals).toBe(executionResults); + expect(consumed?.approvals).toEqual(executionResults); expect(consumed?.approvals).toHaveLength(2); // Queue is atomically cleared after consumption @@ -342,6 +376,7 @@ describe("Path A: cancel during tool execution → next turn consumes actual res const populated = populateInterruptQueue(runtime, { lastExecutionResults: executionResults, + lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: ["call-1"], agentId: "agent-1", conversationId: "conv-1", @@ -353,9 +388,85 @@ describe("Path A: cancel during tool execution → next turn consumes actual res approve: true, // Path A preserves actual approval state }); }); + + test("normalizes interrupted tool results to error via structured tool_call_id", () => { + const runtime = createRuntime(); + const executionResults: ApprovalResult[] = [ + { + type: "tool", + tool_call_id: "call-1", + status: "success", + tool_return: "result text does not matter when ID is interrupted", + } as unknown as ApprovalResult, + ]; + + const populated = populateInterruptQueue(runtime, { + lastExecutionResults: executionResults, + lastExecutingToolCallIds: ["call-1"], + lastNeedsUserInputToolCallIds: [], + agentId: "agent-1", + conversationId: "conv-1", + }); + + expect(populated).toBe(true); + 
expect(runtime.pendingInterruptedResults?.[0]).toMatchObject({ + type: "tool", + tool_call_id: "call-1", + status: "error", + }); + }); + + test("keeps legacy text fallback for interrupted tool return normalization", () => { + const runtime = createRuntime(); + const executionResults: ApprovalResult[] = [ + { + type: "tool", + tool_call_id: "call-legacy", + status: "success", + tool_return: [{ type: "text", text: "Interrupted by user" }], + } as unknown as ApprovalResult, + ]; + + const populated = populateInterruptQueue(runtime, { + lastExecutionResults: executionResults, + lastExecutingToolCallIds: [], + lastNeedsUserInputToolCallIds: [], + agentId: "agent-1", + conversationId: "conv-1", + }); + + expect(populated).toBe(true); + expect(runtime.pendingInterruptedResults?.[0]).toMatchObject({ + type: "tool", + tool_call_id: "call-legacy", + status: "error", + }); + }); }); describe("Path B: cancel during approval wait → next turn consumes synthesized denials", () => { + test("prefers synthesized tool-error results when execution was already in-flight", () => { + const runtime = createRuntime(); + + const populated = populateInterruptQueue(runtime, { + lastExecutionResults: null, + lastExecutingToolCallIds: ["call-running-1"], + lastNeedsUserInputToolCallIds: ["call-running-1"], + agentId: "agent-1", + conversationId: "conv-1", + }); + + expect(populated).toBe(true); + expect(runtime.pendingInterruptedResults).toEqual([ + { + type: "tool", + tool_call_id: "call-running-1", + tool_return: "Interrupted by user", + status: "error", + }, + ]); + }); + test("full sequence: populate from batch map IDs → consume synthesized denials", () => { const runtime = createRuntime(); const agentId = "agent-abc"; @@ -371,6 +482,7 @@ describe("Path B: cancel during approval wait → next turn consumes synthesized // Cancel fires during approval wait: no execution results const populated = populateInterruptQueue(runtime, { lastExecutionResults: null, + lastExecutingToolCallIds: [], 
lastNeedsUserInputToolCallIds: [], agentId, conversationId, @@ -409,6 +521,7 @@ describe("Path B: cancel during approval wait → next turn consumes synthesized // No batch map entries, but we have the snapshot IDs const populated = populateInterruptQueue(runtime, { lastExecutionResults: null, + lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: ["call-a", "call-b"], agentId: "agent-1", conversationId: "conv-1", @@ -427,6 +540,7 @@ describe("Path B: cancel during approval wait → next turn consumes synthesized const populated = populateInterruptQueue(runtime, { lastExecutionResults: null, + lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: [], agentId: "agent-1", conversationId: "conv-1", @@ -453,6 +567,7 @@ describe("post-cancel next turn: queue consumed exactly once (no error loop)", ( reason: "cancelled", }, ], + lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: [], agentId, conversationId: convId, @@ -476,6 +591,7 @@ describe("post-cancel next turn: queue consumed exactly once (no error loop)", ( lastExecutionResults: [ { type: "approval", tool_call_id: "call-1", approve: true }, ], + lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: [], agentId, conversationId: convId, @@ -496,6 +612,7 @@ describe("idempotency: first cancel populates, second is no-op", () => { lastExecutionResults: [ { type: "approval", tool_call_id: "call-first", approve: true }, ], + lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: [], agentId: "agent-1", conversationId: "conv-1", @@ -511,6 +628,7 @@ describe("idempotency: first cancel populates, second is no-op", () => { reason: "x", }, ], + lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: [], agentId: "agent-1", conversationId: "conv-1", @@ -530,6 +648,7 @@ describe("idempotency: first cancel populates, second is no-op", () => { lastExecutionResults: [ { type: "approval", tool_call_id: "call-1", approve: true }, ], + lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: 
[], agentId: "agent-1", conversationId: "conv-1", @@ -543,6 +662,7 @@ describe("idempotency: first cancel populates, second is no-op", () => { lastExecutionResults: [ { type: "approval", tool_call_id: "call-2", approve: true }, ], + lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: [], agentId: "agent-1", conversationId: "conv-1", @@ -564,6 +684,7 @@ describe("epoch guard: stale context discarded on consume", () => { lastExecutionResults: [ { type: "approval", tool_call_id: "call-1", approve: true }, ], + lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: [], agentId: "agent-1", conversationId: "conv-1", @@ -587,6 +708,7 @@ describe("epoch guard: stale context discarded on consume", () => { lastExecutionResults: [ { type: "approval", tool_call_id: "call-1", approve: true }, ], + lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: [], agentId: "agent-old", conversationId: "conv-1", @@ -605,6 +727,7 @@ describe("epoch guard: stale context discarded on consume", () => { lastExecutionResults: [ { type: "approval", tool_call_id: "call-1", approve: true }, ], + lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: [], agentId: "agent-1", conversationId: "conv-old", @@ -623,6 +746,7 @@ describe("stale Path-B IDs: clearing after successful send prevents re-denial", // Also batch map should be cleared by clearPendingApprovalBatchIds const populated = populateInterruptQueue(runtime, { lastExecutionResults: null, + lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: [], // cleared after send agentId: "agent-1", conversationId: "conv-1", @@ -640,6 +764,7 @@ describe("stale Path-B IDs: clearing after successful send prevents re-denial", const populated = populateInterruptQueue(runtime, { lastExecutionResults: null, + lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: [], // cleared from previous send agentId: "agent-1", conversationId: "conv-1", @@ -716,6 +841,7 @@ describe("consume clears 
pendingApprovalBatchByToolCallId", () => { lastExecutionResults: [ { type: "approval", tool_call_id: "call-1", approve: true }, ], + lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: [], agentId: "agent-1", conversationId: "conv-1", @@ -734,6 +860,7 @@ describe("consume clears pendingApprovalBatchByToolCallId", () => { lastExecutionResults: [ { type: "approval", tool_call_id: "call-1", approve: true }, ], + lastExecutingToolCallIds: [], lastNeedsUserInputToolCallIds: [], agentId: "agent-old", conversationId: "conv-old", diff --git a/src/websocket/listen-client.ts b/src/websocket/listen-client.ts index 774e079..6bd1311 100644 --- a/src/websocket/listen-client.ts +++ b/src/websocket/listen-client.ts @@ -18,6 +18,7 @@ import { executeApprovalBatch, } from "../agent/approval-execution"; import { fetchRunErrorDetail } from "../agent/approval-recovery"; +import { normalizeApprovalResultsForPersistence } from "../agent/approval-result-normalization"; import { getResumeData } from "../agent/check-approval"; import { getClient } from "../agent/client"; import { getStreamToolContextId, sendMessageStream } from "../agent/message"; @@ -34,6 +35,7 @@ import { createBuffers } from "../cli/helpers/accumulator"; import { classifyApprovals } from "../cli/helpers/approvalClassification"; import { generatePlanFilePath } from "../cli/helpers/planName"; import { drainStreamWithResume } from "../cli/helpers/stream"; +import { INTERRUPTED_BY_USER } from "../constants"; import { computeDiffPreviews } from "../helpers/diffPreview"; import { permissionMode } from "../permissions/mode"; import { type QueueItem, QueueRuntime } from "../queue/queueRuntime"; @@ -907,6 +909,7 @@ function shouldAttemptPostStopApprovalRecovery(params: { interface InterruptPopulateInput { lastExecutionResults: ApprovalResult[] | null; + lastExecutingToolCallIds: string[]; lastNeedsUserInputToolCallIds: string[]; agentId: string; conversationId: string; @@ -920,10 +923,48 @@ interface InterruptToolReturn 
// Relies on names declared elsewhere in listen-client.ts: ApprovalResult,
// InterruptToolReturn and normalizeApprovalResultsForPersistence.

/** Narrows an arbitrary value to a tool-return status, or null if it is neither. */
function asToolReturnStatus(value: unknown): "success" | "error" | null {
  return value === "success" || value === "error" ? value : null;
}

/** Type guard for a `{ type: "text", text: string }` content part. */
function isTextContentPart(
  part: unknown,
): part is { type: "text"; text: string } {
  return (
    typeof part === "object" &&
    part !== null &&
    (part as { type?: unknown }).type === "text" &&
    typeof (part as { text?: unknown }).text === "string"
  );
}

/** Keeps only the string entries of an array; non-arrays yield undefined. */
function stringEntries(value: unknown): string[] | undefined {
  return Array.isArray(value)
    ? value.filter((entry): entry is string => typeof entry === "string")
    : undefined;
}

/**
 * Converts a tool_return payload into displayable text.
 *
 * - strings are returned unchanged
 * - arrays with at least one text part yield those parts joined by "\n"
 * - a single `{ type: "text", text }` object yields its text
 * - null / undefined yield ""
 * - anything else (including arrays without text parts) is JSON-serialized,
 *   falling back to `String(value)`
 *
 * @param value Raw tool_return value of unknown shape.
 * @returns Always a string.
 */
function normalizeToolReturnValue(value: unknown): string {
  if (typeof value === "string") {
    return value;
  }
  if (Array.isArray(value)) {
    const textParts = value.filter(isTextContentPart).map((part) => part.text);
    if (textParts.length > 0) {
      return textParts.join("\n");
    }
  }
  if (isTextContentPart(value)) {
    return value.text;
  }
  if (value === null || value === undefined) {
    return "";
  }
  try {
    // JSON.stringify yields `undefined` for functions/symbols; keep the
    // declared string contract by falling back to String().
    return JSON.stringify(value) ?? String(value);
  } catch {
    return String(value);
  }
}

/**
 * Normalizes results queued at cancel time: IDs in `interruptedToolCallIds`
 * are forced to status=error, with the legacy interrupt-text fallback enabled.
 *
 * @param approvals Results captured before the cancel; null/empty is returned as is.
 * @param interruptedToolCallIds tool_call_ids to treat as interrupted.
 */
function normalizeInterruptedApprovalsForQueue(
  approvals: ApprovalResult[] | null,
  interruptedToolCallIds: string[],
): ApprovalResult[] | null {
  if (!approvals || approvals.length === 0) {
    return approvals;
  }

  return normalizeApprovalResultsForPersistence(approvals, {
    interruptedToolCallIds,
    // Temporary fallback guard while all producers migrate to structured IDs.
    allowInterruptTextFallback: true,
  });
}

/**
 * Builds one canonical tool return from a record carrying `tool_call_id`,
 * `status`, `tool_return` and optional `stdout` / `stderr`.
 *
 * @returns null when the record lacks a non-empty string id or a valid status.
 */
function toInterruptToolReturn(
  record: Record<string, unknown>,
): InterruptToolReturn | null {
  const toolCallId =
    typeof record.tool_call_id === "string" ? record.tool_call_id : null;
  const status = asToolReturnStatus(record.status);
  if (!toolCallId || !status) {
    return null;
  }
  const stdout = stringEntries(record.stdout);
  const stderr = stringEntries(record.stderr);
  return {
    tool_call_id: toolCallId,
    status,
    tool_return: normalizeToolReturnValue(record.tool_return),
    ...(stdout ? { stdout } : {}),
    ...(stderr ? { stderr } : {}),
  };
}

/**
 * Extracts canonical tool returns from a wire payload. Valid entries of
 * `payload.tool_returns` win; if there are none, the legacy top-level
 * `tool_call_id` / `status` / `tool_return` fields are used instead.
 *
 * @returns The canonical entries, or `[]` when neither source is usable.
 */
function extractCanonicalToolReturnsFromWire(
  payload: Record<string, unknown>,
): InterruptToolReturn[] {
  if (Array.isArray(payload.tool_returns)) {
    const fromArray = payload.tool_returns.flatMap((raw) => {
      if (!raw || typeof raw !== "object") {
        return [];
      }
      const parsed = toInterruptToolReturn(raw as Record<string, unknown>);
      return parsed ? [parsed] : [];
    });
    if (fromArray.length > 0) {
      return fromArray;
    }
  }

  const topLevel = toInterruptToolReturn(payload);
  return topLevel ? [topLevel] : [];
}

/**
 * Rewrites a `tool_return_message` chunk so its results live only in
 * `tool_returns[]`; the top-level `tool_call_id`, `status`, `tool_return`,
 * `stdout` and `stderr` fields are removed. Other message types are returned
 * unchanged (same reference).
 *
 * @returns The normalized chunk, or null when no canonical tool return could
 *   be extracted from a `tool_return_message`.
 */
function normalizeToolReturnWireMessage(
  chunk: Record<string, unknown>,
): Record<string, unknown> | null {
  if (chunk.message_type !== "tool_return_message") {
    return chunk;
  }

  const canonicalToolReturns = extractCanonicalToolReturnsFromWire(chunk);
  if (canonicalToolReturns.length === 0) {
    return null;
  }

  const {
    tool_call_id: _toolCallId,
    status: _status,
    tool_return: _toolReturn,
    stdout: _stdout,
    stderr: _stderr,
    ...rest
  } = chunk;

  return {
    ...rest,
    message_type: "tool_return_message",
    tool_returns: canonicalToolReturns,
  };
}
+ runtime.pendingInterruptedResults = normalizeInterruptedApprovalsForQueue( + input.lastExecutionResults, + input.lastExecutingToolCallIds, + ); + runtime.pendingInterruptedContext = { + agentId: input.agentId, + conversationId: input.conversationId, + continuationEpoch: runtime.continuationEpoch, + }; + return true; + } + + // Path A.5: execution was in-flight (approved tools started) but no + // terminal results were captured before cancel. Match App/headless parity by + // queuing explicit tool errors, not synthetic approval denials. + if (input.lastExecutingToolCallIds.length > 0) { + runtime.pendingInterruptedResults = input.lastExecutingToolCallIds.map( + (toolCallId) => ({ + type: "tool" as const, + tool_call_id: toolCallId, + tool_return: INTERRUPTED_BY_USER, + status: "error" as const, + }), + ); runtime.pendingInterruptedContext = { agentId: input.agentId, conversationId: input.conversationId, @@ -2288,6 +2466,7 @@ async function handleIncomingMessage( // Track last approval-loop state for cancel-time queueing (Phase 1.2). // Hoisted before try so the cancel catch block can access them. 
let lastExecutionResults: ApprovalResult[] | null = null; + let lastExecutingToolCallIds: string[] = []; let lastNeedsUserInputToolCallIds: string[] = []; runtime.isProcessing = true; @@ -2477,12 +2656,18 @@ async function handleIncomingMessage( otid?: string; id?: string; }; - emitToWS(socket, { - ...chunk, - type: "message", - session_id: runtime.sessionId, - uuid: chunkWithIds.otid || chunkWithIds.id || crypto.randomUUID(), - } as MessageWire); + const normalizedChunk = normalizeToolReturnWireMessage( + chunk as unknown as Record, + ); + if (normalizedChunk) { + emitToWS(socket, { + ...normalizedChunk, + type: "message", + session_id: runtime.sessionId, + uuid: + chunkWithIds.otid || chunkWithIds.id || crypto.randomUUID(), + } as unknown as MessageWire); + } } return undefined; @@ -2875,6 +3060,15 @@ async function handleIncomingMessage( } } + // Snapshot executing tool_call_ids before execution starts so cancel can + // preserve tool-error parity even if execution aborts mid-await. + lastExecutingToolCallIds = decisions + .filter( + (decision): decision is Extract => + decision.type === "approve", + ) + .map((decision) => decision.approval.toolCallId); + // Execute approved/denied tools const executionResults = await executeApprovalBatch( decisions, @@ -2922,6 +3116,7 @@ async function handleIncomingMessage( // cancel during the subsequent stream drain won't queue already-sent // results (Path A) or re-deny already-resolved tool calls (Path B). lastExecutionResults = null; + lastExecutingToolCallIds = []; lastNeedsUserInputToolCallIds = []; turnToolContextId = getStreamToolContextId( @@ -2933,6 +3128,7 @@ async function handleIncomingMessage( // Queue interrupted tool-call resolutions for the next message turn. 
populateInterruptQueue(runtime, { lastExecutionResults, + lastExecutingToolCallIds, lastNeedsUserInputToolCallIds, agentId: agentId || "", conversationId, @@ -3078,5 +3274,6 @@ export const __listenClientTestUtils = { extractInterruptToolReturns, emitInterruptToolReturnMessage, getInterruptApprovalsForEmission, + normalizeToolReturnWireMessage, shouldAttemptPostStopApprovalRecovery, };