fix(listen): preserve interrupt error status through next-turn persistence (#1294)

This commit is contained in:
Charles Packer
2026-03-05 22:29:08 -08:00
committed by GitHub
parent cc6f754ca3
commit 52f2cc9924
6 changed files with 918 additions and 58 deletions

View File

@@ -0,0 +1,123 @@
import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents";
import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/messages";
import { INTERRUPTED_BY_USER } from "../constants";
import type { ApprovalResult } from "./approval-execution";
/** Either kind of outgoing message payload handled by the normalization helpers in this file. */
type OutgoingMessage = MessageCreate | ApprovalCreate;
/**
 * Signals used to decide which tool results must be rewritten to
 * status=error before approvals are persisted. Both fields are optional;
 * with neither set, normalization leaves every result untouched.
 */
export type ApprovalNormalizationOptions = {
/**
* Structured interrupt provenance: tool_call_ids known to have been interrupted.
* When provided, matching tool results that carry a `status` field are
* rewritten to status=error.
*/
interruptedToolCallIds?: Iterable<string>;
/**
* Temporary fallback guard for legacy drift where tool_return text is the only
* interrupt signal. Keep false by default for strict structured behavior.
*/
allowInterruptTextFallback?: boolean;
};
/**
 * Flattens a `tool_return` payload into a plain string so it can be compared
 * against a known marker text.
 *
 * - A string is returned unchanged (no trimming).
 * - An array is treated as a list of content parts: only parts shaped like
 *   `{ type: "text", text: string }` are kept, their texts are joined with
 *   newlines, and the joined result is trimmed.
 * - `null` / `undefined` become the empty string.
 * - Any other value is JSON-serialized, falling back to `String(value)` when
 *   serialization throws or produces no output.
 *
 * @param value - Raw tool_return payload of unknown shape.
 * @returns A string representation; never `undefined`.
 */
function normalizeToolReturnText(value: unknown): string {
  if (typeof value === "string") return value;

  if (Array.isArray(value)) {
    return value
      .filter(
        (part): part is { type: "text"; text: string } =>
          !!part &&
          typeof part === "object" &&
          "type" in part &&
          (part as { type?: unknown }).type === "text" &&
          "text" in part &&
          typeof (part as { text?: unknown }).text === "string",
      )
      .map((part) => part.text)
      .join("\n")
      .trim();
  }

  if (value === null || value === undefined) return "";

  try {
    // Despite its declared return type, JSON.stringify yields `undefined` at
    // runtime for functions and symbols; fall back so the declared `string`
    // contract actually holds.
    const serialized: string | undefined = JSON.stringify(value);
    return serialized ?? String(value);
  } catch {
    // e.g. BigInt values or circular structures make JSON.stringify throw.
    return String(value);
  }
}
/**
 * Produces the list of approval results to persist, rewriting interrupted
 * tool results so that they carry `status: "error"`.
 *
 * A `type: "tool"` entry counts as interrupted when either
 * - its `tool_call_id` appears in `options.interruptedToolCallIds`, or
 * - `options.allowInterruptTextFallback` is truthy and its flattened
 *   `tool_return` text equals `INTERRUPTED_BY_USER`.
 *
 * Only interrupted entries that already have a `status` property other than
 * "error" are replaced (by a shallow copy); every other entry is passed
 * through by reference. The input array itself is never mutated.
 *
 * @param approvals - Results to normalize. `null`/`undefined` yields `[]`;
 *   an empty array is returned as-is.
 * @param options - Interrupt signals; defaults to no signals.
 * @returns A new array of the same length as a non-empty input.
 */
export function normalizeApprovalResultsForPersistence(
  approvals: ApprovalResult[] | null | undefined,
  options: ApprovalNormalizationOptions = {},
): ApprovalResult[] {
  if (!approvals) return [];
  if (approvals.length === 0) return approvals;

  const knownInterruptedIds = new Set(options.interruptedToolCallIds ?? []);

  return approvals.map((entry) => {
    // Anything that is not a tool result is left untouched.
    if (
      !entry ||
      typeof entry !== "object" ||
      !("type" in entry) ||
      entry.type !== "tool"
    ) {
      return entry;
    }

    let callId = "";
    if ("tool_call_id" in entry && typeof entry.tool_call_id === "string") {
      callId = entry.tool_call_id;
    }
    const matchesStructuredId =
      callId.length > 0 && knownInterruptedIds.has(callId);

    // The text comparison is only consulted when the caller opted in.
    let matchesLegacyText = false;
    if (options.allowInterruptTextFallback) {
      const rawReturn = "tool_return" in entry ? entry.tool_return : "";
      matchesLegacyText =
        normalizeToolReturnText(rawReturn) === INTERRUPTED_BY_USER;
    }

    if (!matchesStructuredId && !matchesLegacyText) return entry;
    if (!("status" in entry) || entry.status === "error") return entry;

    return {
      ...entry,
      status: "error" as const,
    };
  });
}
/**
 * Applies approval-result normalization to every approval message in an
 * outgoing batch.
 *
 * Messages whose `type` is "approval" and that have an `approvals` property
 * are shallow-copied with their `approvals` run through
 * `normalizeApprovalResultsForPersistence`. All other messages are returned
 * by reference, in their original positions.
 *
 * @param messages - Outgoing messages; a falsy or empty value is returned as-is.
 * @param options - Interrupt signals forwarded to the per-result normalizer.
 * @returns A new array of the same length as a non-empty input.
 */
export function normalizeOutgoingApprovalMessages(
  messages: OutgoingMessage[],
  options: ApprovalNormalizationOptions = {},
): OutgoingMessage[] {
  if (!messages || messages.length === 0) return messages;

  return messages.map((message) =>
    message &&
    typeof message === "object" &&
    "type" in message &&
    message.type === "approval" &&
    "approvals" in message
      ? ({
          ...message,
          approvals: normalizeApprovalResultsForPersistence(
            message.approvals as ApprovalResult[],
            options,
          ),
        } as ApprovalCreate)
      : message,
  );
}

View File

@@ -9,10 +9,15 @@ import type {
LettaStreamingResponse, LettaStreamingResponse,
} from "@letta-ai/letta-client/resources/agents/messages"; } from "@letta-ai/letta-client/resources/agents/messages";
import { import {
type ClientTool,
captureToolExecutionContext, captureToolExecutionContext,
waitForToolsetReady, waitForToolsetReady,
} from "../tools/manager"; } from "../tools/manager";
import { isTimingsEnabled } from "../utils/timing"; import { isTimingsEnabled } from "../utils/timing";
import {
type ApprovalNormalizationOptions,
normalizeOutgoingApprovalMessages,
} from "./approval-result-normalization";
import { getClient } from "./client"; import { getClient } from "./client";
const streamRequestStartTimes = new WeakMap<object, number>(); const streamRequestStartTimes = new WeakMap<object, number>();
@@ -43,6 +48,40 @@ export function getStreamRequestContext(
return streamRequestContexts.get(stream as object); return streamRequestContexts.get(stream as object);
} }
/**
 * Options accepted by `sendMessageStream` and
 * `buildConversationMessagesCreateRequestBody`.
 */
export type SendMessageStreamOptions = {
// Sent as `stream_tokens` in the request body; treated as true when omitted.
streamTokens?: boolean;
// Sent as `background` in the request body; treated as true when omitted.
background?: boolean;
agentId?: string; // Required when conversationId is "default"
// Forwarded to normalizeOutgoingApprovalMessages when the request body is built.
approvalNormalization?: ApprovalNormalizationOptions;
};
/**
 * Assembles the request body for a streaming conversation-messages create
 * call.
 *
 * Outgoing messages are passed through `normalizeOutgoingApprovalMessages`
 * using `opts.approvalNormalization`. `stream_tokens` and `background`
 * default to true when not set in `opts`. For the "default" conversation the
 * body additionally carries `agent_id`.
 *
 * @param conversationId - Target conversation id; "default" requires `opts.agentId`.
 * @param messages - Messages to send.
 * @param opts - Streaming/background flags, agent id and normalization options.
 * @param clientTools - Client tool definitions, sent as `client_tools`.
 * @returns The request body object.
 * @throws Error when `conversationId` is "default" and `opts.agentId` is missing.
 */
export function buildConversationMessagesCreateRequestBody(
  conversationId: string,
  messages: Array<MessageCreate | ApprovalCreate>,
  opts: SendMessageStreamOptions = { streamTokens: true, background: true },
  clientTools: ClientTool[],
) {
  const targetsDefaultConversation = conversationId === "default";

  // Validate before doing any work on the messages.
  if (targetsDefaultConversation && !opts.agentId) {
    throw new Error(
      "agentId is required in opts when using default conversation",
    );
  }

  const normalizedMessages = normalizeOutgoingApprovalMessages(
    messages,
    opts.approvalNormalization,
  );
  // Only the default conversation needs the agent id in the body.
  const agentScope = targetsDefaultConversation
    ? { agent_id: opts.agentId }
    : {};

  return {
    messages: normalizedMessages,
    streaming: true,
    stream_tokens: opts.streamTokens ?? true,
    background: opts.background ?? true,
    client_tools: clientTools,
    include_compaction_messages: true,
    ...agentScope,
  };
}
/** /**
* Send a message to a conversation and return a streaming response. * Send a message to a conversation and return a streaming response.
* Uses the conversations API for all conversations. * Uses the conversations API for all conversations.
@@ -54,11 +93,7 @@ export function getStreamRequestContext(
export async function sendMessageStream( export async function sendMessageStream(
conversationId: string, conversationId: string,
messages: Array<MessageCreate | ApprovalCreate>, messages: Array<MessageCreate | ApprovalCreate>,
opts: { opts: SendMessageStreamOptions = { streamTokens: true, background: true },
streamTokens?: boolean;
background?: boolean;
agentId?: string; // Required when conversationId is "default"
} = { streamTokens: true, background: true },
// Disable SDK retries by default - state management happens outside the stream, // Disable SDK retries by default - state management happens outside the stream,
// so retries would violate idempotency and create race conditions // so retries would violate idempotency and create race conditions
requestOptions: { maxRetries?: number; signal?: AbortSignal } = { requestOptions: { maxRetries?: number; signal?: AbortSignal } = {
@@ -74,24 +109,13 @@ export async function sendMessageStream(
await waitForToolsetReady(); await waitForToolsetReady();
const { clientTools, contextId } = captureToolExecutionContext(); const { clientTools, contextId } = captureToolExecutionContext();
const isDefaultConversation = conversationId === "default";
if (isDefaultConversation && !opts.agentId) {
throw new Error(
"agentId is required in opts when using default conversation",
);
}
const resolvedConversationId = conversationId; const resolvedConversationId = conversationId;
const requestBody = buildConversationMessagesCreateRequestBody(
const requestBody = { conversationId,
messages, messages,
streaming: true, opts,
stream_tokens: opts.streamTokens ?? true, clientTools,
background: opts.background ?? true, );
client_tools: clientTools,
include_compaction_messages: true,
...(isDefaultConversation ? { agent_id: opts.agentId } : {}),
};
if (process.env.DEBUG) { if (process.env.DEBUG) {
console.log( console.log(

View File

@@ -0,0 +1,103 @@
import { describe, expect, test } from "bun:test";
import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/messages";
import type { ApprovalResult } from "../../agent/approval-execution";
import {
normalizeApprovalResultsForPersistence,
normalizeOutgoingApprovalMessages,
} from "../../agent/approval-result-normalization";
import { INTERRUPTED_BY_USER } from "../../constants";
// Unit tests for normalizeApprovalResultsForPersistence: structured-id
// matching, pass-through of unrelated results, and the opt-in text fallback.
describe("normalizeApprovalResultsForPersistence", () => {
  test("forces status=error for structured interrupted tool_call_ids", () => {
    // A "success" result whose id is listed as interrupted must flip to error.
    const [result] = normalizeApprovalResultsForPersistence(
      [
        {
          type: "tool",
          tool_call_id: "call-1",
          tool_return: "some return",
          status: "success",
        } as ApprovalResult,
      ],
      { interruptedToolCallIds: ["call-1"] },
    );

    expect(result).toMatchObject({
      type: "tool",
      tool_call_id: "call-1",
      status: "error",
    });
  });

  test("does not modify non-interrupted tool results", () => {
    // The interrupted-id list does not contain this result's id.
    const [result] = normalizeApprovalResultsForPersistence(
      [
        {
          type: "tool",
          tool_call_id: "call-2",
          tool_return: "ok",
          status: "success",
        } as ApprovalResult,
      ],
      { interruptedToolCallIds: ["other-id"] },
    );

    expect(result).toMatchObject({
      type: "tool",
      tool_call_id: "call-2",
      status: "success",
    });
  });

  test("supports legacy fallback on interrupt text when explicitly enabled", () => {
    // No structured ids here: only the content-part text marks the interrupt.
    const [result] = normalizeApprovalResultsForPersistence(
      [
        {
          type: "tool",
          tool_call_id: "call-3",
          tool_return: [{ type: "text", text: INTERRUPTED_BY_USER }],
          status: "success",
        } as ApprovalResult,
      ],
      { allowInterruptTextFallback: true },
    );

    expect(result).toMatchObject({
      type: "tool",
      tool_call_id: "call-3",
      status: "error",
    });
  });
});
// Unit test for normalizeOutgoingApprovalMessages: approval messages are
// normalized while other messages keep their content and position.
describe("normalizeOutgoingApprovalMessages", () => {
  test("normalizes approvals and preserves non-approval messages", () => {
    const approvalMessage: ApprovalCreate = {
      type: "approval",
      approvals: [
        {
          type: "tool",
          tool_call_id: "call-7",
          tool_return: "foo",
          status: "success",
        } as ApprovalResult,
      ],
    };

    const [passthrough, rewritten] = normalizeOutgoingApprovalMessages(
      [{ role: "user", content: "hello" }, approvalMessage],
      { interruptedToolCallIds: ["call-7"] },
    );

    // The plain user message is untouched.
    expect(passthrough).toMatchObject({ role: "user", content: "hello" });

    // The approval's tool result was flipped to error via its structured id.
    const rewrittenApprovals = (rewritten as ApprovalCreate).approvals ?? [];
    expect(rewrittenApprovals[0]).toMatchObject({
      type: "tool",
      tool_call_id: "call-7",
      status: "error",
    });
  });
});

View File

@@ -1,5 +1,8 @@
import { describe, expect, test } from "bun:test"; import { describe, expect, test } from "bun:test";
import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/messages";
import WebSocket from "ws"; import WebSocket from "ws";
import { buildConversationMessagesCreateRequestBody } from "../../agent/message";
import { INTERRUPTED_BY_USER } from "../../constants";
import type { ControlRequest, ControlResponseBody } from "../../types/protocol"; import type { ControlRequest, ControlResponseBody } from "../../types/protocol";
import { import {
__listenClientTestUtils, __listenClientTestUtils,
@@ -640,3 +643,159 @@ describe("listen-client post-stop approval recovery policy", () => {
expect(shouldRecover).toBe(false); expect(shouldRecover).toBe(false);
}); });
}); });
// Covers normalizeExecutionResultsForInterruptParity from the listen-client
// test utilities: the same input is run with and without runtime.cancelRequested.
// NOTE(review): the helper's implementation is not visible here; the third
// argument is presumably the list of executing tool_call_ids — confirm.
describe("listen-client interrupt persistence normalization", () => {
test("forces interrupted in-flight tool results to status=error when cancelRequested", () => {
const runtime = __listenClientTestUtils.createRuntime();
// Simulate an active cancel flow.
runtime.cancelRequested = true;
const normalized =
__listenClientTestUtils.normalizeExecutionResultsForInterruptParity(
runtime,
[
{
type: "tool",
tool_call_id: "tool-1",
tool_return: "Interrupted by user",
status: "success",
},
],
["tool-1"],
);
// Only `status` is expected to change; the other fields stay as given.
expect(normalized).toEqual([
{
type: "tool",
tool_call_id: "tool-1",
tool_return: "Interrupted by user",
status: "error",
},
]);
});
test("leaves tool status unchanged when not in cancel flow", () => {
const runtime = __listenClientTestUtils.createRuntime();
// Without a cancel request the results must come back unmodified.
runtime.cancelRequested = false;
const normalized =
__listenClientTestUtils.normalizeExecutionResultsForInterruptParity(
runtime,
[
{
type: "tool",
tool_call_id: "tool-1",
tool_return: "Interrupted by user",
status: "success",
},
],
["tool-1"],
);
expect(normalized).toEqual([
{
type: "tool",
tool_call_id: "tool-1",
tool_return: "Interrupted by user",
status: "success",
},
]);
});
});
// End-to-end check of the interrupt queue feeding the request builder:
// populate the queue, consume it, then build the next-turn request body and
// inspect the approval message it contains.
describe("listen-client interrupt persistence request body", () => {
test("post-interrupt next-turn payload keeps interrupted tool returns as status=error", () => {
const runtime = __listenClientTestUtils.createRuntime();
const consumedAgentId = "agent-1";
// "default" makes buildConversationMessagesCreateRequestBody require agentId.
const consumedConversationId = "default";
// No execution results were captured; one tool call was still executing.
__listenClientTestUtils.populateInterruptQueue(runtime, {
lastExecutionResults: null,
lastExecutingToolCallIds: ["call-running-1"],
lastNeedsUserInputToolCallIds: [],
agentId: consumedAgentId,
conversationId: consumedConversationId,
});
const consumed = __listenClientTestUtils.consumeInterruptQueue(
runtime,
consumedAgentId,
consumedConversationId,
);
expect(consumed).not.toBeNull();
// Explicit throw narrows `consumed` to non-null for the code below.
if (!consumed) {
throw new Error("Expected queued interrupt approvals to be consumed");
}
// Next turn: queued approval message first, then the new user message.
const requestBody = buildConversationMessagesCreateRequestBody(
consumedConversationId,
[
consumed.approvalMessage,
{
type: "message",
role: "user",
content: "next user message after interrupt",
},
],
{
agentId: consumedAgentId,
streamTokens: true,
background: true,
approvalNormalization: {
interruptedToolCallIds: consumed.interruptedToolCallIds,
},
},
[],
);
// The approval message is expected to remain at index 0 with an error status.
const approvalMessage = requestBody.messages[0] as ApprovalCreate;
expect(approvalMessage.type).toBe("approval");
expect(approvalMessage.approvals?.[0]).toMatchObject({
type: "tool",
tool_call_id: "call-running-1",
tool_return: INTERRUPTED_BY_USER,
status: "error",
});
});
});
// Covers normalizeToolReturnWireMessage from the listen-client test utilities:
// legacy top-level tool return fields versus the canonical tool_returns[] form.
// NOTE(review): the helper's implementation is not visible here; the comments
// below describe only what these tests assert.
describe("listen-client tool_return wire normalization", () => {
test("normalizes legacy top-level tool return fields to canonical tool_returns[]", () => {
// Input uses the legacy shape: tool_call_id/status/tool_return at top level,
// with tool_return given as a list of content parts.
const normalized = __listenClientTestUtils.normalizeToolReturnWireMessage({
message_type: "tool_return_message",
id: "message-1",
run_id: "run-1",
tool_call_id: "call-1",
status: "error",
tool_return: [{ type: "text", text: "Interrupted by user" }],
});
// Expected output nests the fields in tool_returns[] with plain-text tool_return.
expect(normalized).toEqual({
message_type: "tool_return_message",
id: "message-1",
run_id: "run-1",
tool_returns: [
{
tool_call_id: "call-1",
status: "error",
tool_return: "Interrupted by user",
},
],
});
// The legacy top-level fields must not survive normalization.
expect(normalized).not.toHaveProperty("tool_call_id");
expect(normalized).not.toHaveProperty("status");
expect(normalized).not.toHaveProperty("tool_return");
});
test("returns null for tool_return_message when no canonical status is available", () => {
// No `status` field is supplied, so the message is expected to be dropped.
const normalized = __listenClientTestUtils.normalizeToolReturnWireMessage({
message_type: "tool_return_message",
id: "message-2",
run_id: "run-2",
tool_call_id: "call-2",
tool_return: "maybe done",
});
expect(normalized).toBeNull();
});
});

View File

@@ -62,12 +62,14 @@ describe("ListenerRuntime interrupt queue fields", () => {
const runtime = createRuntime(); const runtime = createRuntime();
expect(runtime.pendingInterruptedResults).toBeNull(); expect(runtime.pendingInterruptedResults).toBeNull();
expect(runtime.pendingInterruptedContext).toBeNull(); expect(runtime.pendingInterruptedContext).toBeNull();
expect(runtime.pendingInterruptedToolCallIds).toBeNull();
expect(runtime.activeExecutingToolCallIds).toEqual([]);
expect(runtime.continuationEpoch).toBe(0); expect(runtime.continuationEpoch).toBe(0);
}); });
}); });
describe("stopRuntime teardown", () => { describe("stopRuntime teardown", () => {
test("clears pendingInterruptedResults, context, and batch map", () => { test("clears pendingInterruptedResults, context, ids, and batch map", () => {
const runtime = createRuntime(); const runtime = createRuntime();
runtime.socket = new MockSocket(WebSocket.OPEN) as unknown as WebSocket; runtime.socket = new MockSocket(WebSocket.OPEN) as unknown as WebSocket;
@@ -84,12 +86,16 @@ describe("stopRuntime teardown", () => {
conversationId: "conv-1", conversationId: "conv-1",
continuationEpoch: 0, continuationEpoch: 0,
}; };
runtime.pendingInterruptedToolCallIds = ["call-1"];
runtime.activeExecutingToolCallIds = ["call-1"];
runtime.pendingApprovalBatchByToolCallId.set("call-1", "batch-1"); runtime.pendingApprovalBatchByToolCallId.set("call-1", "batch-1");
stopRuntime(runtime, true); stopRuntime(runtime, true);
expect(runtime.pendingInterruptedResults).toBeNull(); expect(runtime.pendingInterruptedResults).toBeNull();
expect(runtime.pendingInterruptedContext).toBeNull(); expect(runtime.pendingInterruptedContext).toBeNull();
expect(runtime.pendingInterruptedToolCallIds).toBeNull();
expect(runtime.activeExecutingToolCallIds).toEqual([]);
expect(runtime.pendingApprovalBatchByToolCallId.size).toBe(0); expect(runtime.pendingApprovalBatchByToolCallId.size).toBe(0);
}); });
@@ -178,6 +184,29 @@ describe("extractInterruptToolReturns", () => {
]); ]);
}); });
test("converts multimodal tool_return content into displayable text", () => {
const results: ApprovalResult[] = [
{
type: "tool",
tool_call_id: "call-multimodal",
status: "error",
tool_return: [
{ type: "text", text: "Interrupted by user" },
{ type: "image", image_url: "https://example.com/image.png" },
],
} as ApprovalResult,
];
const mapped = extractInterruptToolReturns(results);
expect(mapped).toEqual([
{
tool_call_id: "call-multimodal",
status: "error",
tool_return: "Interrupted by user",
},
]);
});
test("emitInterruptToolReturnMessage emits deterministic per-tool terminal messages", () => { test("emitInterruptToolReturnMessage emits deterministic per-tool terminal messages", () => {
const runtime = createRuntime(); const runtime = createRuntime();
const socket = new MockSocket(WebSocket.OPEN) as unknown as WebSocket; const socket = new MockSocket(WebSocket.OPEN) as unknown as WebSocket;
@@ -208,16 +237,26 @@ describe("extractInterruptToolReturns", () => {
expect(toolReturnFrames).toHaveLength(2); expect(toolReturnFrames).toHaveLength(2);
expect(toolReturnFrames[0]).toMatchObject({ expect(toolReturnFrames[0]).toMatchObject({
run_id: "run-1", run_id: "run-1",
tool_call_id: "call-a", tool_returns: [
status: "success", { tool_call_id: "call-a", status: "success", tool_return: "704" },
tool_returns: [{ tool_call_id: "call-a", status: "success" }], ],
}); });
expect(toolReturnFrames[1]).toMatchObject({ expect(toolReturnFrames[1]).toMatchObject({
run_id: "run-1", run_id: "run-1",
tool_call_id: "call-b", tool_returns: [
status: "error", {
tool_returns: [{ tool_call_id: "call-b", status: "error" }], tool_call_id: "call-b",
status: "error",
tool_return: "User interrupted the stream",
},
],
}); });
expect(toolReturnFrames[0]).not.toHaveProperty("tool_call_id");
expect(toolReturnFrames[0]).not.toHaveProperty("status");
expect(toolReturnFrames[0]).not.toHaveProperty("tool_return");
expect(toolReturnFrames[1]).not.toHaveProperty("tool_call_id");
expect(toolReturnFrames[1]).not.toHaveProperty("status");
expect(toolReturnFrames[1]).not.toHaveProperty("tool_return");
}); });
}); });
@@ -305,13 +344,14 @@ describe("Path A: cancel during tool execution → next turn consumes actual res
// Cancel fires: populateInterruptQueue (Path A — has execution results) // Cancel fires: populateInterruptQueue (Path A — has execution results)
const populated = populateInterruptQueue(runtime, { const populated = populateInterruptQueue(runtime, {
lastExecutionResults: executionResults, lastExecutionResults: executionResults,
lastExecutingToolCallIds: [],
lastNeedsUserInputToolCallIds: ["call-1", "call-2"], lastNeedsUserInputToolCallIds: ["call-1", "call-2"],
agentId, agentId,
conversationId, conversationId,
}); });
expect(populated).toBe(true); expect(populated).toBe(true);
expect(runtime.pendingInterruptedResults).toBe(executionResults); expect(runtime.pendingInterruptedResults).toEqual(executionResults);
expect(runtime.pendingInterruptedContext).toMatchObject({ expect(runtime.pendingInterruptedContext).toMatchObject({
agentId, agentId,
conversationId, conversationId,
@@ -322,9 +362,10 @@ describe("Path A: cancel during tool execution → next turn consumes actual res
const consumed = consumeInterruptQueue(runtime, agentId, conversationId); const consumed = consumeInterruptQueue(runtime, agentId, conversationId);
expect(consumed).not.toBeNull(); expect(consumed).not.toBeNull();
expect(consumed?.type).toBe("approval"); expect(consumed?.approvalMessage.type).toBe("approval");
expect(consumed?.approvals).toBe(executionResults); expect(consumed?.approvalMessage.approvals).toEqual(executionResults);
expect(consumed?.approvals).toHaveLength(2); expect(consumed?.approvalMessage.approvals).toHaveLength(2);
expect(consumed?.interruptedToolCallIds).toEqual([]);
// Queue is atomically cleared after consumption // Queue is atomically cleared after consumption
expect(runtime.pendingInterruptedResults).toBeNull(); expect(runtime.pendingInterruptedResults).toBeNull();
@@ -342,6 +383,7 @@ describe("Path A: cancel during tool execution → next turn consumes actual res
const populated = populateInterruptQueue(runtime, { const populated = populateInterruptQueue(runtime, {
lastExecutionResults: executionResults, lastExecutionResults: executionResults,
lastExecutingToolCallIds: [],
lastNeedsUserInputToolCallIds: ["call-1"], lastNeedsUserInputToolCallIds: ["call-1"],
agentId: "agent-1", agentId: "agent-1",
conversationId: "conv-1", conversationId: "conv-1",
@@ -353,9 +395,87 @@ describe("Path A: cancel during tool execution → next turn consumes actual res
approve: true, // Path A preserves actual approval state approve: true, // Path A preserves actual approval state
}); });
}); });
test("normalizes interrupted tool results to error via structured tool_call_id", () => {
const runtime = createRuntime();
const executionResults: ApprovalResult[] = [
{
type: "tool",
tool_call_id: "call-1",
status: "success",
tool_return: "result text does not matter when ID is interrupted",
} as unknown as ApprovalResult,
];
const populated = populateInterruptQueue(runtime, {
lastExecutionResults: executionResults,
lastExecutingToolCallIds: ["call-1"],
lastNeedsUserInputToolCallIds: [],
agentId: "agent-1",
conversationId: "conv-1",
});
expect(populated).toBe(true);
expect(runtime.pendingInterruptedResults?.[0]).toMatchObject({
type: "tool",
tool_call_id: "call-1",
status: "error",
});
expect(runtime.pendingInterruptedToolCallIds).toEqual(["call-1"]);
});
test("keeps legacy text fallback for interrupted tool return normalization", () => {
const runtime = createRuntime();
const executionResults: ApprovalResult[] = [
{
type: "tool",
tool_call_id: "call-legacy",
status: "success",
tool_return: [{ type: "text", text: "Interrupted by user" }],
} as unknown as ApprovalResult,
];
const populated = populateInterruptQueue(runtime, {
lastExecutionResults: executionResults,
lastExecutingToolCallIds: [],
lastNeedsUserInputToolCallIds: [],
agentId: "agent-1",
conversationId: "conv-1",
});
expect(populated).toBe(true);
expect(runtime.pendingInterruptedResults?.[0]).toMatchObject({
type: "tool",
tool_call_id: "call-legacy",
status: "error",
});
});
}); });
describe("Path B: cancel during approval wait → next turn consumes synthesized denials", () => { describe("Path B: cancel during approval wait → next turn consumes synthesized denials", () => {
test("prefers synthesized tool-error results when execution was already in-flight", () => {
const runtime = createRuntime();
const populated = populateInterruptQueue(runtime, {
lastExecutionResults: null,
lastExecutingToolCallIds: ["call-running-1"],
lastNeedsUserInputToolCallIds: ["call-running-1"],
agentId: "agent-1",
conversationId: "conv-1",
});
expect(populated).toBe(true);
expect(runtime.pendingInterruptedResults).toEqual([
{
type: "tool",
tool_call_id: "call-running-1",
tool_return: "Interrupted by user",
status: "error",
},
]);
expect(runtime.pendingInterruptedToolCallIds).toEqual(["call-running-1"]);
});
test("full sequence: populate from batch map IDs → consume synthesized denials", () => { test("full sequence: populate from batch map IDs → consume synthesized denials", () => {
const runtime = createRuntime(); const runtime = createRuntime();
const agentId = "agent-abc"; const agentId = "agent-abc";
@@ -371,6 +491,7 @@ describe("Path B: cancel during approval wait → next turn consumes synthesized
// Cancel fires during approval wait: no execution results // Cancel fires during approval wait: no execution results
const populated = populateInterruptQueue(runtime, { const populated = populateInterruptQueue(runtime, {
lastExecutionResults: null, lastExecutionResults: null,
lastExecutingToolCallIds: [],
lastNeedsUserInputToolCallIds: [], lastNeedsUserInputToolCallIds: [],
agentId, agentId,
conversationId, conversationId,
@@ -397,7 +518,7 @@ describe("Path B: cancel during approval wait → next turn consumes synthesized
// Next user message: consume // Next user message: consume
const consumed = consumeInterruptQueue(runtime, agentId, conversationId); const consumed = consumeInterruptQueue(runtime, agentId, conversationId);
expect(consumed).not.toBeNull(); expect(consumed).not.toBeNull();
expect(consumed?.approvals).toHaveLength(2); expect(consumed?.approvalMessage.approvals).toHaveLength(2);
// Queue cleared // Queue cleared
expect(runtime.pendingInterruptedResults).toBeNull(); expect(runtime.pendingInterruptedResults).toBeNull();
@@ -409,6 +530,7 @@ describe("Path B: cancel during approval wait → next turn consumes synthesized
// No batch map entries, but we have the snapshot IDs // No batch map entries, but we have the snapshot IDs
const populated = populateInterruptQueue(runtime, { const populated = populateInterruptQueue(runtime, {
lastExecutionResults: null, lastExecutionResults: null,
lastExecutingToolCallIds: [],
lastNeedsUserInputToolCallIds: ["call-a", "call-b"], lastNeedsUserInputToolCallIds: ["call-a", "call-b"],
agentId: "agent-1", agentId: "agent-1",
conversationId: "conv-1", conversationId: "conv-1",
@@ -427,6 +549,7 @@ describe("Path B: cancel during approval wait → next turn consumes synthesized
const populated = populateInterruptQueue(runtime, { const populated = populateInterruptQueue(runtime, {
lastExecutionResults: null, lastExecutionResults: null,
lastExecutingToolCallIds: [],
lastNeedsUserInputToolCallIds: [], lastNeedsUserInputToolCallIds: [],
agentId: "agent-1", agentId: "agent-1",
conversationId: "conv-1", conversationId: "conv-1",
@@ -453,6 +576,7 @@ describe("post-cancel next turn: queue consumed exactly once (no error loop)", (
reason: "cancelled", reason: "cancelled",
}, },
], ],
lastExecutingToolCallIds: [],
lastNeedsUserInputToolCallIds: [], lastNeedsUserInputToolCallIds: [],
agentId, agentId,
conversationId: convId, conversationId: convId,
@@ -476,6 +600,7 @@ describe("post-cancel next turn: queue consumed exactly once (no error loop)", (
lastExecutionResults: [ lastExecutionResults: [
{ type: "approval", tool_call_id: "call-1", approve: true }, { type: "approval", tool_call_id: "call-1", approve: true },
], ],
lastExecutingToolCallIds: [],
lastNeedsUserInputToolCallIds: [], lastNeedsUserInputToolCallIds: [],
agentId, agentId,
conversationId: convId, conversationId: convId,
@@ -496,6 +621,7 @@ describe("idempotency: first cancel populates, second is no-op", () => {
lastExecutionResults: [ lastExecutionResults: [
{ type: "approval", tool_call_id: "call-first", approve: true }, { type: "approval", tool_call_id: "call-first", approve: true },
], ],
lastExecutingToolCallIds: [],
lastNeedsUserInputToolCallIds: [], lastNeedsUserInputToolCallIds: [],
agentId: "agent-1", agentId: "agent-1",
conversationId: "conv-1", conversationId: "conv-1",
@@ -511,6 +637,7 @@ describe("idempotency: first cancel populates, second is no-op", () => {
reason: "x", reason: "x",
}, },
], ],
lastExecutingToolCallIds: [],
lastNeedsUserInputToolCallIds: [], lastNeedsUserInputToolCallIds: [],
agentId: "agent-1", agentId: "agent-1",
conversationId: "conv-1", conversationId: "conv-1",
@@ -530,6 +657,7 @@ describe("idempotency: first cancel populates, second is no-op", () => {
lastExecutionResults: [ lastExecutionResults: [
{ type: "approval", tool_call_id: "call-1", approve: true }, { type: "approval", tool_call_id: "call-1", approve: true },
], ],
lastExecutingToolCallIds: [],
lastNeedsUserInputToolCallIds: [], lastNeedsUserInputToolCallIds: [],
agentId: "agent-1", agentId: "agent-1",
conversationId: "conv-1", conversationId: "conv-1",
@@ -543,6 +671,7 @@ describe("idempotency: first cancel populates, second is no-op", () => {
lastExecutionResults: [ lastExecutionResults: [
{ type: "approval", tool_call_id: "call-2", approve: true }, { type: "approval", tool_call_id: "call-2", approve: true },
], ],
lastExecutingToolCallIds: [],
lastNeedsUserInputToolCallIds: [], lastNeedsUserInputToolCallIds: [],
agentId: "agent-1", agentId: "agent-1",
conversationId: "conv-1", conversationId: "conv-1",
@@ -564,6 +693,7 @@ describe("epoch guard: stale context discarded on consume", () => {
lastExecutionResults: [ lastExecutionResults: [
{ type: "approval", tool_call_id: "call-1", approve: true }, { type: "approval", tool_call_id: "call-1", approve: true },
], ],
lastExecutingToolCallIds: [],
lastNeedsUserInputToolCallIds: [], lastNeedsUserInputToolCallIds: [],
agentId: "agent-1", agentId: "agent-1",
conversationId: "conv-1", conversationId: "conv-1",
@@ -587,6 +717,7 @@ describe("epoch guard: stale context discarded on consume", () => {
lastExecutionResults: [ lastExecutionResults: [
{ type: "approval", tool_call_id: "call-1", approve: true }, { type: "approval", tool_call_id: "call-1", approve: true },
], ],
lastExecutingToolCallIds: [],
lastNeedsUserInputToolCallIds: [], lastNeedsUserInputToolCallIds: [],
agentId: "agent-old", agentId: "agent-old",
conversationId: "conv-1", conversationId: "conv-1",
@@ -605,6 +736,7 @@ describe("epoch guard: stale context discarded on consume", () => {
lastExecutionResults: [ lastExecutionResults: [
{ type: "approval", tool_call_id: "call-1", approve: true }, { type: "approval", tool_call_id: "call-1", approve: true },
], ],
lastExecutingToolCallIds: [],
lastNeedsUserInputToolCallIds: [], lastNeedsUserInputToolCallIds: [],
agentId: "agent-1", agentId: "agent-1",
conversationId: "conv-old", conversationId: "conv-old",
@@ -623,6 +755,7 @@ describe("stale Path-B IDs: clearing after successful send prevents re-denial",
// Also batch map should be cleared by clearPendingApprovalBatchIds // Also batch map should be cleared by clearPendingApprovalBatchIds
const populated = populateInterruptQueue(runtime, { const populated = populateInterruptQueue(runtime, {
lastExecutionResults: null, lastExecutionResults: null,
lastExecutingToolCallIds: [],
lastNeedsUserInputToolCallIds: [], // cleared after send lastNeedsUserInputToolCallIds: [], // cleared after send
agentId: "agent-1", agentId: "agent-1",
conversationId: "conv-1", conversationId: "conv-1",
@@ -640,6 +773,7 @@ describe("stale Path-B IDs: clearing after successful send prevents re-denial",
const populated = populateInterruptQueue(runtime, { const populated = populateInterruptQueue(runtime, {
lastExecutionResults: null, lastExecutionResults: null,
lastExecutingToolCallIds: [],
lastNeedsUserInputToolCallIds: [], // cleared from previous send lastNeedsUserInputToolCallIds: [], // cleared from previous send
agentId: "agent-1", agentId: "agent-1",
conversationId: "conv-1", conversationId: "conv-1",
@@ -716,6 +850,7 @@ describe("consume clears pendingApprovalBatchByToolCallId", () => {
lastExecutionResults: [ lastExecutionResults: [
{ type: "approval", tool_call_id: "call-1", approve: true }, { type: "approval", tool_call_id: "call-1", approve: true },
], ],
lastExecutingToolCallIds: [],
lastNeedsUserInputToolCallIds: [], lastNeedsUserInputToolCallIds: [],
agentId: "agent-1", agentId: "agent-1",
conversationId: "conv-1", conversationId: "conv-1",
@@ -734,6 +869,7 @@ describe("consume clears pendingApprovalBatchByToolCallId", () => {
lastExecutionResults: [ lastExecutionResults: [
{ type: "approval", tool_call_id: "call-1", approve: true }, { type: "approval", tool_call_id: "call-1", approve: true },
], ],
lastExecutingToolCallIds: [],
lastNeedsUserInputToolCallIds: [], lastNeedsUserInputToolCallIds: [],
agentId: "agent-old", agentId: "agent-old",
conversationId: "conv-old", conversationId: "conv-old",

View File

@@ -18,6 +18,7 @@ import {
executeApprovalBatch, executeApprovalBatch,
} from "../agent/approval-execution"; } from "../agent/approval-execution";
import { fetchRunErrorDetail } from "../agent/approval-recovery"; import { fetchRunErrorDetail } from "../agent/approval-recovery";
import { normalizeApprovalResultsForPersistence } from "../agent/approval-result-normalization";
import { getResumeData } from "../agent/check-approval"; import { getResumeData } from "../agent/check-approval";
import { getClient } from "../agent/client"; import { getClient } from "../agent/client";
import { getStreamToolContextId, sendMessageStream } from "../agent/message"; import { getStreamToolContextId, sendMessageStream } from "../agent/message";
@@ -34,6 +35,7 @@ import { createBuffers } from "../cli/helpers/accumulator";
import { classifyApprovals } from "../cli/helpers/approvalClassification"; import { classifyApprovals } from "../cli/helpers/approvalClassification";
import { generatePlanFilePath } from "../cli/helpers/planName"; import { generatePlanFilePath } from "../cli/helpers/planName";
import { drainStreamWithResume } from "../cli/helpers/stream"; import { drainStreamWithResume } from "../cli/helpers/stream";
import { INTERRUPTED_BY_USER } from "../constants";
import { computeDiffPreviews } from "../helpers/diffPreview"; import { computeDiffPreviews } from "../helpers/diffPreview";
import { permissionMode } from "../permissions/mode"; import { permissionMode } from "../permissions/mode";
import { type QueueItem, QueueRuntime } from "../queue/queueRuntime"; import { type QueueItem, QueueRuntime } from "../queue/queueRuntime";
@@ -306,6 +308,16 @@ type ListenerRuntime = {
} | null; } | null;
/** Monotonic epoch for queued continuation validity checks. */ /** Monotonic epoch for queued continuation validity checks. */
continuationEpoch: number; continuationEpoch: number;
/**
* Tool call ids currently executing in the active approval loop turn.
* Used for eager cancel-time interrupt capture parity with App/headless.
*/
activeExecutingToolCallIds: string[];
/**
* Structured interrupted tool_call_ids carried with queued interrupt approvals.
* Threaded into the next send for persistence normalization.
*/
pendingInterruptedToolCallIds: string[] | null;
}; };
type ApprovalSlot = type ApprovalSlot =
@@ -382,6 +394,8 @@ function createRuntime(): ListenerRuntime {
pendingInterruptedResults: null, pendingInterruptedResults: null,
pendingInterruptedContext: null, pendingInterruptedContext: null,
continuationEpoch: 0, continuationEpoch: 0,
activeExecutingToolCallIds: [],
pendingInterruptedToolCallIds: null,
coalescedSkipQueueItemIds: new Set<string>(), coalescedSkipQueueItemIds: new Set<string>(),
pendingTurns: 0, pendingTurns: 0,
// queueRuntime assigned below — needs runtime ref in callbacks // queueRuntime assigned below — needs runtime ref in callbacks
@@ -559,6 +573,8 @@ function stopRuntime(
// Clear interrupted queue on true teardown to prevent cross-session leakage. // Clear interrupted queue on true teardown to prevent cross-session leakage.
runtime.pendingInterruptedResults = null; runtime.pendingInterruptedResults = null;
runtime.pendingInterruptedContext = null; runtime.pendingInterruptedContext = null;
runtime.pendingInterruptedToolCallIds = null;
runtime.activeExecutingToolCallIds = [];
runtime.continuationEpoch++; runtime.continuationEpoch++;
if (!runtime.socket) { if (!runtime.socket) {
@@ -907,6 +923,7 @@ function shouldAttemptPostStopApprovalRecovery(params: {
interface InterruptPopulateInput { interface InterruptPopulateInput {
lastExecutionResults: ApprovalResult[] | null; lastExecutionResults: ApprovalResult[] | null;
lastExecutingToolCallIds: string[];
lastNeedsUserInputToolCallIds: string[]; lastNeedsUserInputToolCallIds: string[];
agentId: string; agentId: string;
conversationId: string; conversationId: string;
@@ -920,10 +937,48 @@ interface InterruptToolReturn {
stderr?: string[]; stderr?: string[];
} }
/**
 * Narrows an untrusted wire value to a tool-return status literal.
 *
 * @param value - Arbitrary value read from a wire payload.
 * @returns The value itself when it is exactly `"success"` or `"error"`;
 *   `null` for anything else.
 */
function asToolReturnStatus(value: unknown): "success" | "error" | null {
  return value === "success" || value === "error" ? value : null;
}
function normalizeToolReturnValue(value: unknown): string { function normalizeToolReturnValue(value: unknown): string {
if (typeof value === "string") { if (typeof value === "string") {
return value; return value;
} }
if (Array.isArray(value)) {
const textParts = value
.filter(
(
part,
): part is {
type: string;
text: string;
} =>
!!part &&
typeof part === "object" &&
"type" in part &&
part.type === "text" &&
"text" in part &&
typeof part.text === "string",
)
.map((part) => part.text);
if (textParts.length > 0) {
return textParts.join("\n");
}
}
if (
value &&
typeof value === "object" &&
"type" in value &&
value.type === "text" &&
"text" in value &&
typeof value.text === "string"
) {
return value.text;
}
if (value === null || value === undefined) { if (value === null || value === undefined) {
return ""; return "";
} }
@@ -934,6 +989,130 @@ function normalizeToolReturnValue(value: unknown): string {
} }
} }
/**
 * Prepares approval results from an interrupted turn for the interrupt queue.
 *
 * Missing or empty input is handed back untouched. Anything else goes through
 * `normalizeApprovalResultsForPersistence` with the supplied interrupted
 * tool_call_ids and the interrupt-text fallback enabled.
 *
 * @param approvals - Results captured before the cancel, or `null`.
 * @param interruptedToolCallIds - tool_call_ids known to have been interrupted.
 * @returns The normalized results, or the original value when there was
 *   nothing to normalize.
 */
function normalizeInterruptedApprovalsForQueue(
  approvals: ApprovalResult[] | null,
  interruptedToolCallIds: string[],
): ApprovalResult[] | null {
  if (!approvals?.length) {
    return approvals;
  }

  const normalizationOptions = {
    interruptedToolCallIds,
    // Temporary fallback guard while all producers migrate to structured IDs.
    allowInterruptTextFallback: true,
  };
  return normalizeApprovalResultsForPersistence(
    approvals,
    normalizationOptions,
  );
}
/**
 * Applies interrupt normalization to freshly executed approval results, but
 * only when the runtime has a cancel request pending.
 *
 * With no cancel requested, or with no results at all, the input array is
 * returned as-is (same reference). Otherwise the results are passed through
 * `normalizeApprovalResultsForPersistence`, supplying the executing
 * tool_call_ids as the interrupted set.
 *
 * @param runtime - Listener runtime; only `cancelRequested` is read.
 * @param executionResults - Results produced by the approval batch.
 * @param executingToolCallIds - tool_call_ids that were executing this turn.
 * @returns Results to persist and emit.
 */
function normalizeExecutionResultsForInterruptParity(
  runtime: ListenerRuntime,
  executionResults: ApprovalResult[],
  executingToolCallIds: string[],
): ApprovalResult[] {
  const cancelledWithResults =
    runtime.cancelRequested && executionResults.length > 0;

  return cancelledWithResults
    ? normalizeApprovalResultsForPersistence(executionResults, {
        interruptedToolCallIds: executingToolCallIds,
      })
    : executionResults;
}
/**
 * Collects the tool returns carried by a wire payload in canonical form.
 *
 * Entries of `payload.tool_returns` take precedence: each object entry that
 * has a non-empty string `tool_call_id` and a `"success"`/`"error"` status is
 * converted. Only when that produces nothing are the payload's own top-level
 * fields tried with the same rules.
 *
 * @param payload - Raw wire message fields.
 * @returns Canonical tool returns; an empty array when no usable entry exists.
 */
function extractCanonicalToolReturnsFromWire(
  payload: Record<string, unknown>,
): InterruptToolReturn[] {
  // Keeps only the string members of an array field; non-arrays give undefined.
  const stringsOnly = (field: unknown): string[] | undefined =>
    Array.isArray(field)
      ? field.filter((entry): entry is string => typeof entry === "string")
      : undefined;

  // Converts one record (array entry or the payload itself), or rejects it.
  const toCanonical = (
    fields: Record<string, unknown>,
  ): InterruptToolReturn | null => {
    const toolCallId = fields.tool_call_id;
    const status = asToolReturnStatus(fields.status);
    if (typeof toolCallId !== "string" || toolCallId === "" || !status) {
      return null;
    }
    const stdout = stringsOnly(fields.stdout);
    const stderr = stringsOnly(fields.stderr);
    return {
      tool_call_id: toolCallId,
      status,
      tool_return: normalizeToolReturnValue(fields.tool_return),
      ...(stdout ? { stdout } : {}),
      ...(stderr ? { stderr } : {}),
    };
  };

  if (Array.isArray(payload.tool_returns)) {
    const collected: InterruptToolReturn[] = [];
    for (const entry of payload.tool_returns) {
      if (!entry || typeof entry !== "object") {
        continue;
      }
      const canonical = toCanonical(entry as Record<string, unknown>);
      if (canonical) {
        collected.push(canonical);
      }
    }
    if (collected.length > 0) {
      return collected;
    }
  }

  // Nothing usable in the array form: fall back to the top-level fields.
  const topLevel = toCanonical(payload);
  return topLevel ? [topLevel] : [];
}
/**
 * Rewrites a `tool_return_message` chunk so its results live only in a
 * canonical `tool_returns` array.
 *
 * Chunks of any other `message_type` are returned unchanged (same reference).
 *
 * @param chunk - Raw wire chunk.
 * @returns The chunk to emit, or `null` when a tool-return chunk yields no
 *   canonical tool return.
 */
function normalizeToolReturnWireMessage(
  chunk: Record<string, unknown>,
): Record<string, unknown> | null {
  const isToolReturn = chunk.message_type === "tool_return_message";
  if (!isToolReturn) {
    return chunk;
  }

  const toolReturns = extractCanonicalToolReturnsFromWire(chunk);
  if (toolReturns.length === 0) {
    return null;
  }

  // Shallow-copy, then drop the top-level per-call fields; their content is
  // carried by the canonical array instead.
  const normalized: Record<string, unknown> = { ...chunk };
  delete normalized.tool_call_id;
  delete normalized.status;
  delete normalized.tool_return;
  delete normalized.stdout;
  delete normalized.stderr;
  normalized.message_type = "tool_return_message";
  normalized.tool_returns = toolReturns;
  return normalized;
}
function extractInterruptToolReturns( function extractInterruptToolReturns(
approvals: ApprovalResult[] | null, approvals: ApprovalResult[] | null,
): InterruptToolReturn[] { ): InterruptToolReturn[] {
@@ -1030,12 +1209,15 @@ function emitInterruptToolReturnMessage(
id: `message-${crypto.randomUUID()}`, id: `message-${crypto.randomUUID()}`,
date: new Date().toISOString(), date: new Date().toISOString(),
run_id: resolvedRunId, run_id: resolvedRunId,
tool_call_id: toolReturn.tool_call_id, tool_returns: [
tool_return: toolReturn.tool_return, {
status: toolReturn.status, tool_call_id: toolReturn.tool_call_id,
...(toolReturn.stdout ? { stdout: toolReturn.stdout } : {}), status: toolReturn.status,
...(toolReturn.stderr ? { stderr: toolReturn.stderr } : {}), tool_return: toolReturn.tool_return,
tool_returns: [toolReturn], ...(toolReturn.stdout ? { stdout: toolReturn.stdout } : {}),
...(toolReturn.stderr ? { stderr: toolReturn.stderr } : {}),
},
],
session_id: runtime.sessionId, session_id: runtime.sessionId,
uuid: `${uuidPrefix}-${crypto.randomUUID()}`, uuid: `${uuidPrefix}-${crypto.randomUUID()}`,
} as unknown as MessageWire); } as unknown as MessageWire);
@@ -1092,12 +1274,38 @@ function populateInterruptQueue(
if (input.lastExecutionResults && input.lastExecutionResults.length > 0) { if (input.lastExecutionResults && input.lastExecutionResults.length > 0) {
// Path A: execution happened before cancel — queue actual results // Path A: execution happened before cancel — queue actual results
runtime.pendingInterruptedResults = input.lastExecutionResults; // Guard parity: interrupted tool returns must persist as status=error.
runtime.pendingInterruptedResults = normalizeInterruptedApprovalsForQueue(
input.lastExecutionResults,
input.lastExecutingToolCallIds,
);
runtime.pendingInterruptedContext = { runtime.pendingInterruptedContext = {
agentId: input.agentId, agentId: input.agentId,
conversationId: input.conversationId, conversationId: input.conversationId,
continuationEpoch: runtime.continuationEpoch, continuationEpoch: runtime.continuationEpoch,
}; };
runtime.pendingInterruptedToolCallIds = [...input.lastExecutingToolCallIds];
return true;
}
// Path A.5: execution was in-flight (approved tools started) but no
// terminal results were captured before cancel. Match App/headless parity by
// queuing explicit tool errors, not synthetic approval denials.
if (input.lastExecutingToolCallIds.length > 0) {
runtime.pendingInterruptedResults = input.lastExecutingToolCallIds.map(
(toolCallId) => ({
type: "tool" as const,
tool_call_id: toolCallId,
tool_return: INTERRUPTED_BY_USER,
status: "error" as const,
}),
);
runtime.pendingInterruptedContext = {
agentId: input.agentId,
conversationId: input.conversationId,
continuationEpoch: runtime.continuationEpoch,
};
runtime.pendingInterruptedToolCallIds = [...input.lastExecutingToolCallIds];
return true; return true;
} }
@@ -1120,6 +1328,7 @@ function populateInterruptQueue(
conversationId: input.conversationId, conversationId: input.conversationId,
continuationEpoch: runtime.continuationEpoch, continuationEpoch: runtime.continuationEpoch,
}; };
runtime.pendingInterruptedToolCallIds = null;
return true; return true;
} }
@@ -1144,7 +1353,10 @@ function consumeInterruptQueue(
runtime: ListenerRuntime, runtime: ListenerRuntime,
agentId: string, agentId: string,
conversationId: string, conversationId: string,
): { type: "approval"; approvals: ApprovalResult[] } | null { ): {
approvalMessage: { type: "approval"; approvals: ApprovalResult[] };
interruptedToolCallIds: string[];
} | null {
if ( if (
!runtime.pendingInterruptedResults || !runtime.pendingInterruptedResults ||
runtime.pendingInterruptedResults.length === 0 runtime.pendingInterruptedResults.length === 0
@@ -1153,7 +1365,10 @@ function consumeInterruptQueue(
} }
const ctx = runtime.pendingInterruptedContext; const ctx = runtime.pendingInterruptedContext;
let result: { type: "approval"; approvals: ApprovalResult[] } | null = null; let result: {
approvalMessage: { type: "approval"; approvals: ApprovalResult[] };
interruptedToolCallIds: string[];
} | null = null;
if ( if (
ctx && ctx &&
@@ -1162,8 +1377,13 @@ function consumeInterruptQueue(
ctx.continuationEpoch === runtime.continuationEpoch ctx.continuationEpoch === runtime.continuationEpoch
) { ) {
result = { result = {
type: "approval", approvalMessage: {
approvals: runtime.pendingInterruptedResults, type: "approval",
approvals: runtime.pendingInterruptedResults,
},
interruptedToolCallIds: runtime.pendingInterruptedToolCallIds
? [...runtime.pendingInterruptedToolCallIds]
: [],
}; };
} }
@@ -1171,6 +1391,7 @@ function consumeInterruptQueue(
// Stale results for wrong context are discarded, not retried. // Stale results for wrong context are discarded, not retried.
runtime.pendingInterruptedResults = null; runtime.pendingInterruptedResults = null;
runtime.pendingInterruptedContext = null; runtime.pendingInterruptedContext = null;
runtime.pendingInterruptedToolCallIds = null;
runtime.pendingApprovalBatchByToolCallId.clear(); runtime.pendingApprovalBatchByToolCallId.clear();
return result; return result;
@@ -1963,6 +2184,30 @@ async function connectWithRetry(
} }
runtime.cancelRequested = true; runtime.cancelRequested = true;
// Eager interrupt capture parity with App/headless:
// if tool execution is currently in-flight, queue explicit interrupted
// tool results immediately at cancel time (before async catch paths).
if (
runtime.activeExecutingToolCallIds.length > 0 &&
(!runtime.pendingInterruptedResults ||
runtime.pendingInterruptedResults.length === 0)
) {
runtime.pendingInterruptedResults =
runtime.activeExecutingToolCallIds.map((toolCallId) => ({
type: "tool",
tool_call_id: toolCallId,
tool_return: INTERRUPTED_BY_USER,
status: "error",
}));
runtime.pendingInterruptedContext = {
agentId: runtime.activeAgentId || "",
conversationId: runtime.activeConversationId || "default",
continuationEpoch: runtime.continuationEpoch,
};
runtime.pendingInterruptedToolCallIds = [
...runtime.activeExecutingToolCallIds,
];
}
if ( if (
runtime.activeAbortController && runtime.activeAbortController &&
!runtime.activeAbortController.signal.aborted !runtime.activeAbortController.signal.aborted
@@ -2288,6 +2533,7 @@ async function handleIncomingMessage(
// Track last approval-loop state for cancel-time queueing (Phase 1.2). // Track last approval-loop state for cancel-time queueing (Phase 1.2).
// Hoisted before try so the cancel catch block can access them. // Hoisted before try so the cancel catch block can access them.
let lastExecutionResults: ApprovalResult[] | null = null; let lastExecutionResults: ApprovalResult[] | null = null;
let lastExecutingToolCallIds: string[] = [];
let lastNeedsUserInputToolCallIds: string[] = []; let lastNeedsUserInputToolCallIds: string[] = [];
runtime.isProcessing = true; runtime.isProcessing = true;
@@ -2297,6 +2543,7 @@ async function handleIncomingMessage(
runtime.activeConversationId = conversationId; runtime.activeConversationId = conversationId;
runtime.activeRunId = null; runtime.activeRunId = null;
runtime.activeRunStartedAt = new Date().toISOString(); runtime.activeRunStartedAt = new Date().toISOString();
runtime.activeExecutingToolCallIds = [];
try { try {
// Latch capability: once seen, always use blocking path (strict check to avoid truthy strings) // Latch capability: once seen, always use blocking path (strict check to avoid truthy strings)
@@ -2322,6 +2569,8 @@ async function handleIncomingMessage(
let messagesToSend: Array<MessageCreate | ApprovalCreate> = []; let messagesToSend: Array<MessageCreate | ApprovalCreate> = [];
let turnToolContextId: string | null = null; let turnToolContextId: string | null = null;
let queuedInterruptedToolCallIds: string[] = [];
let shouldClearSubmittedApprovalTracking = false;
// Prepend queued interrupted results from a prior cancelled turn. // Prepend queued interrupted results from a prior cancelled turn.
const consumed = consumeInterruptQueue( const consumed = consumeInterruptQueue(
@@ -2330,7 +2579,8 @@ async function handleIncomingMessage(
conversationId, conversationId,
); );
if (consumed) { if (consumed) {
messagesToSend.push(consumed); messagesToSend.push(consumed.approvalMessage);
queuedInterruptedToolCallIds = consumed.interruptedToolCallIds;
} }
messagesToSend.push(...msg.messages); messagesToSend.push(...msg.messages);
@@ -2362,12 +2612,29 @@ async function handleIncomingMessage(
approvalMessage, approvalMessage,
resumeData.pendingApprovals, resumeData.pendingApprovals,
); );
lastExecutingToolCallIds = decisions
.filter(
(
decision,
): decision is Extract<ApprovalDecision, { type: "approve" }> =>
decision.type === "approve",
)
.map((decision) => decision.approval.toolCallId);
runtime.activeExecutingToolCallIds = [...lastExecutingToolCallIds];
const decisionResults = const decisionResults =
decisions.length > 0 decisions.length > 0
? await executeApprovalBatch(decisions, undefined, { ? await executeApprovalBatch(decisions, undefined, {
toolContextId: turnToolContextId ?? undefined, toolContextId: turnToolContextId ?? undefined,
abortSignal: runtime.activeAbortController.signal,
}) })
: []; : [];
const persistedDecisionResults =
normalizeExecutionResultsForInterruptParity(
runtime,
decisionResults,
lastExecutingToolCallIds,
);
const rebuiltApprovals: ApprovalResult[] = []; const rebuiltApprovals: ApprovalResult[] = [];
let decisionResultIndex = 0; let decisionResultIndex = 0;
@@ -2378,7 +2645,7 @@ async function handleIncomingMessage(
continue; continue;
} }
const next = decisionResults[decisionResultIndex]; const next = persistedDecisionResults[decisionResultIndex];
if (next) { if (next) {
rebuiltApprovals.push(next); rebuiltApprovals.push(next);
decisionResultIndex++; decisionResultIndex++;
@@ -2393,6 +2660,8 @@ async function handleIncomingMessage(
}); });
} }
lastExecutionResults = rebuiltApprovals;
shouldClearSubmittedApprovalTracking = true;
messagesToSend = [ messagesToSend = [
{ {
type: "approval", type: "approval",
@@ -2411,15 +2680,33 @@ async function handleIncomingMessage(
} }
let currentInput = messagesToSend; let currentInput = messagesToSend;
const sendOptions: Parameters<typeof sendMessageStream>[2] = {
agentId,
streamTokens: true,
background: true,
...(queuedInterruptedToolCallIds.length > 0
? {
approvalNormalization: {
interruptedToolCallIds: queuedInterruptedToolCallIds,
},
}
: {}),
};
let stream = await sendMessageStreamWithRetry( let stream = await sendMessageStreamWithRetry(
conversationId, conversationId,
currentInput, currentInput,
{ agentId, streamTokens: true, background: true }, sendOptions,
socket, socket,
runtime, runtime,
runtime.activeAbortController.signal, runtime.activeAbortController.signal,
); );
if (shouldClearSubmittedApprovalTracking) {
lastExecutionResults = null;
lastExecutingToolCallIds = [];
lastNeedsUserInputToolCallIds = [];
runtime.activeExecutingToolCallIds = [];
}
turnToolContextId = getStreamToolContextId( turnToolContextId = getStreamToolContextId(
stream as Stream<LettaStreamingResponse>, stream as Stream<LettaStreamingResponse>,
@@ -2477,12 +2764,18 @@ async function handleIncomingMessage(
otid?: string; otid?: string;
id?: string; id?: string;
}; };
emitToWS(socket, { const normalizedChunk = normalizeToolReturnWireMessage(
...chunk, chunk as unknown as Record<string, unknown>,
type: "message", );
session_id: runtime.sessionId, if (normalizedChunk) {
uuid: chunkWithIds.otid || chunkWithIds.id || crypto.randomUUID(), emitToWS(socket, {
} as MessageWire); ...normalizedChunk,
type: "message",
session_id: runtime.sessionId,
uuid:
chunkWithIds.otid || chunkWithIds.id || crypto.randomUUID(),
} as unknown as MessageWire);
}
} }
return undefined; return undefined;
@@ -2603,7 +2896,7 @@ async function handleIncomingMessage(
stream = await sendMessageStreamWithRetry( stream = await sendMessageStreamWithRetry(
conversationId, conversationId,
currentInput, currentInput,
{ agentId, streamTokens: true, background: true }, sendOptions,
socket, socket,
runtime, runtime,
runtime.activeAbortController.signal, runtime.activeAbortController.signal,
@@ -2875,6 +3168,16 @@ async function handleIncomingMessage(
} }
} }
// Snapshot executing tool_call_ids before execution starts so cancel can
// preserve tool-error parity even if execution aborts mid-await.
lastExecutingToolCallIds = decisions
.filter(
(decision): decision is Extract<Decision, { type: "approve" }> =>
decision.type === "approve",
)
.map((decision) => decision.approval.toolCallId);
runtime.activeExecutingToolCallIds = [...lastExecutingToolCallIds];
// Execute approved/denied tools // Execute approved/denied tools
const executionResults = await executeApprovalBatch( const executionResults = await executeApprovalBatch(
decisions, decisions,
@@ -2884,13 +3187,19 @@ async function handleIncomingMessage(
abortSignal: runtime.activeAbortController.signal, abortSignal: runtime.activeAbortController.signal,
}, },
); );
lastExecutionResults = executionResults; const persistedExecutionResults =
normalizeExecutionResultsForInterruptParity(
runtime,
executionResults,
lastExecutingToolCallIds,
);
lastExecutionResults = persistedExecutionResults;
// WS-first parity: publish tool-return terminal outcomes immediately on // WS-first parity: publish tool-return terminal outcomes immediately on
// normal approval execution, before continuation stream send. // normal approval execution, before continuation stream send.
emitInterruptToolReturnMessage( emitInterruptToolReturnMessage(
socket, socket,
runtime, runtime,
executionResults, persistedExecutionResults,
runtime.activeRunId || runtime.activeRunId ||
runId || runId ||
msgRunIds[msgRunIds.length - 1] || msgRunIds[msgRunIds.length - 1] ||
@@ -2906,13 +3215,13 @@ async function handleIncomingMessage(
currentInput = [ currentInput = [
{ {
type: "approval", type: "approval",
approvals: executionResults, approvals: persistedExecutionResults,
}, },
]; ];
stream = await sendMessageStreamWithRetry( stream = await sendMessageStreamWithRetry(
conversationId, conversationId,
currentInput, currentInput,
{ agentId, streamTokens: true, background: true }, sendOptions,
socket, socket,
runtime, runtime,
runtime.activeAbortController.signal, runtime.activeAbortController.signal,
@@ -2922,7 +3231,9 @@ async function handleIncomingMessage(
// cancel during the subsequent stream drain won't queue already-sent // cancel during the subsequent stream drain won't queue already-sent
// results (Path A) or re-deny already-resolved tool calls (Path B). // results (Path A) or re-deny already-resolved tool calls (Path B).
lastExecutionResults = null; lastExecutionResults = null;
lastExecutingToolCallIds = [];
lastNeedsUserInputToolCallIds = []; lastNeedsUserInputToolCallIds = [];
runtime.activeExecutingToolCallIds = [];
turnToolContextId = getStreamToolContextId( turnToolContextId = getStreamToolContextId(
stream as Stream<LettaStreamingResponse>, stream as Stream<LettaStreamingResponse>,
@@ -2933,6 +3244,7 @@ async function handleIncomingMessage(
// Queue interrupted tool-call resolutions for the next message turn. // Queue interrupted tool-call resolutions for the next message turn.
populateInterruptQueue(runtime, { populateInterruptQueue(runtime, {
lastExecutionResults, lastExecutionResults,
lastExecutingToolCallIds,
lastNeedsUserInputToolCallIds, lastNeedsUserInputToolCallIds,
agentId: agentId || "", agentId: agentId || "",
conversationId, conversationId,
@@ -3042,6 +3354,7 @@ async function handleIncomingMessage(
} finally { } finally {
runtime.activeAbortController = null; runtime.activeAbortController = null;
runtime.cancelRequested = false; runtime.cancelRequested = false;
runtime.activeExecutingToolCallIds = [];
} }
} }
@@ -3078,5 +3391,7 @@ export const __listenClientTestUtils = {
extractInterruptToolReturns, extractInterruptToolReturns,
emitInterruptToolReturnMessage, emitInterruptToolReturnMessage,
getInterruptApprovalsForEmission, getInterruptApprovalsForEmission,
normalizeToolReturnWireMessage,
normalizeExecutionResultsForInterruptParity,
shouldAttemptPostStopApprovalRecovery, shouldAttemptPostStopApprovalRecovery,
}; };