fix(listen): normalize interrupt tool returns for websocket parity (#1289)
This commit is contained in:
123
src/agent/approval-result-normalization.ts
Normal file
123
src/agent/approval-result-normalization.ts
Normal file
@@ -0,0 +1,123 @@
|
||||
import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents";
|
||||
import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/messages";
|
||||
import { INTERRUPTED_BY_USER } from "../constants";
|
||||
import type { ApprovalResult } from "./approval-execution";
|
||||
|
||||
type OutgoingMessage = MessageCreate | ApprovalCreate;
|
||||
|
||||
export type ApprovalNormalizationOptions = {
|
||||
/**
|
||||
* Structured interrupt provenance: tool_call_ids known to have been interrupted.
|
||||
* When provided, these IDs are forced to persist as status=error.
|
||||
*/
|
||||
interruptedToolCallIds?: Iterable<string>;
|
||||
/**
|
||||
* Temporary fallback guard for legacy drift where tool_return text is the only
|
||||
* interrupt signal. Keep false by default for strict structured behavior.
|
||||
*/
|
||||
allowInterruptTextFallback?: boolean;
|
||||
};
|
||||
|
||||
function normalizeToolReturnText(value: unknown): string {
|
||||
if (typeof value === "string") return value;
|
||||
|
||||
if (Array.isArray(value)) {
|
||||
const text = value
|
||||
.filter(
|
||||
(part): part is { type: "text"; text: string } =>
|
||||
!!part &&
|
||||
typeof part === "object" &&
|
||||
"type" in part &&
|
||||
(part as { type?: unknown }).type === "text" &&
|
||||
"text" in part &&
|
||||
typeof (part as { text?: unknown }).text === "string",
|
||||
)
|
||||
.map((part) => part.text)
|
||||
.join("\n")
|
||||
.trim();
|
||||
return text;
|
||||
}
|
||||
|
||||
if (value === null || value === undefined) return "";
|
||||
|
||||
try {
|
||||
return JSON.stringify(value);
|
||||
} catch {
|
||||
return String(value);
|
||||
}
|
||||
}
|
||||
|
||||
export function normalizeApprovalResultsForPersistence(
|
||||
approvals: ApprovalResult[] | null | undefined,
|
||||
options: ApprovalNormalizationOptions = {},
|
||||
): ApprovalResult[] {
|
||||
if (!approvals || approvals.length === 0) return approvals ?? [];
|
||||
|
||||
const interruptedSet = new Set(options.interruptedToolCallIds ?? []);
|
||||
|
||||
return approvals.map((approval) => {
|
||||
if (
|
||||
!approval ||
|
||||
typeof approval !== "object" ||
|
||||
!("type" in approval) ||
|
||||
approval.type !== "tool"
|
||||
) {
|
||||
return approval;
|
||||
}
|
||||
|
||||
const toolCallId =
|
||||
"tool_call_id" in approval && typeof approval.tool_call_id === "string"
|
||||
? approval.tool_call_id
|
||||
: "";
|
||||
|
||||
const interruptedByStructuredId =
|
||||
toolCallId.length > 0 && interruptedSet.has(toolCallId);
|
||||
const interruptedByLegacyText = options.allowInterruptTextFallback
|
||||
? normalizeToolReturnText(
|
||||
"tool_return" in approval ? approval.tool_return : "",
|
||||
) === INTERRUPTED_BY_USER
|
||||
: false;
|
||||
|
||||
if (
|
||||
(interruptedByStructuredId || interruptedByLegacyText) &&
|
||||
"status" in approval &&
|
||||
approval.status !== "error"
|
||||
) {
|
||||
return {
|
||||
...approval,
|
||||
status: "error" as const,
|
||||
};
|
||||
}
|
||||
|
||||
return approval;
|
||||
});
|
||||
}
|
||||
|
||||
export function normalizeOutgoingApprovalMessages(
|
||||
messages: OutgoingMessage[],
|
||||
options: ApprovalNormalizationOptions = {},
|
||||
): OutgoingMessage[] {
|
||||
if (!messages || messages.length === 0) return messages;
|
||||
|
||||
return messages.map((message) => {
|
||||
if (
|
||||
!message ||
|
||||
typeof message !== "object" ||
|
||||
!("type" in message) ||
|
||||
message.type !== "approval" ||
|
||||
!("approvals" in message)
|
||||
) {
|
||||
return message;
|
||||
}
|
||||
|
||||
const normalizedApprovals = normalizeApprovalResultsForPersistence(
|
||||
message.approvals as ApprovalResult[],
|
||||
options,
|
||||
);
|
||||
|
||||
return {
|
||||
...message,
|
||||
approvals: normalizedApprovals,
|
||||
} as ApprovalCreate;
|
||||
});
|
||||
}
|
||||
@@ -13,6 +13,10 @@ import {
|
||||
waitForToolsetReady,
|
||||
} from "../tools/manager";
|
||||
import { isTimingsEnabled } from "../utils/timing";
|
||||
import {
|
||||
type ApprovalNormalizationOptions,
|
||||
normalizeOutgoingApprovalMessages,
|
||||
} from "./approval-result-normalization";
|
||||
import { getClient } from "./client";
|
||||
|
||||
const streamRequestStartTimes = new WeakMap<object, number>();
|
||||
@@ -58,6 +62,7 @@ export async function sendMessageStream(
|
||||
streamTokens?: boolean;
|
||||
background?: boolean;
|
||||
agentId?: string; // Required when conversationId is "default"
|
||||
approvalNormalization?: ApprovalNormalizationOptions;
|
||||
} = { streamTokens: true, background: true },
|
||||
// Disable SDK retries by default - state management happens outside the stream,
|
||||
// so retries would violate idempotency and create race conditions
|
||||
@@ -82,9 +87,13 @@ export async function sendMessageStream(
|
||||
}
|
||||
|
||||
const resolvedConversationId = conversationId;
|
||||
const normalizedMessages = normalizeOutgoingApprovalMessages(
|
||||
messages,
|
||||
opts.approvalNormalization,
|
||||
);
|
||||
|
||||
const requestBody = {
|
||||
messages,
|
||||
messages: normalizedMessages,
|
||||
streaming: true,
|
||||
stream_tokens: opts.streamTokens ?? true,
|
||||
background: opts.background ?? true,
|
||||
|
||||
103
src/tests/agent/approval-result-normalization.test.ts
Normal file
103
src/tests/agent/approval-result-normalization.test.ts
Normal file
@@ -0,0 +1,103 @@
|
||||
import { describe, expect, test } from "bun:test";
|
||||
import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/messages";
|
||||
import type { ApprovalResult } from "../../agent/approval-execution";
|
||||
import {
|
||||
normalizeApprovalResultsForPersistence,
|
||||
normalizeOutgoingApprovalMessages,
|
||||
} from "../../agent/approval-result-normalization";
|
||||
import { INTERRUPTED_BY_USER } from "../../constants";
|
||||
|
||||
describe("normalizeApprovalResultsForPersistence", () => {
|
||||
test("forces status=error for structured interrupted tool_call_ids", () => {
|
||||
const approvals: ApprovalResult[] = [
|
||||
{
|
||||
type: "tool",
|
||||
tool_call_id: "call-1",
|
||||
tool_return: "some return",
|
||||
status: "success",
|
||||
} as ApprovalResult,
|
||||
];
|
||||
|
||||
const normalized = normalizeApprovalResultsForPersistence(approvals, {
|
||||
interruptedToolCallIds: ["call-1"],
|
||||
});
|
||||
|
||||
expect(normalized[0]).toMatchObject({
|
||||
type: "tool",
|
||||
tool_call_id: "call-1",
|
||||
status: "error",
|
||||
});
|
||||
});
|
||||
|
||||
test("does not modify non-interrupted tool results", () => {
|
||||
const approvals: ApprovalResult[] = [
|
||||
{
|
||||
type: "tool",
|
||||
tool_call_id: "call-2",
|
||||
tool_return: "ok",
|
||||
status: "success",
|
||||
} as ApprovalResult,
|
||||
];
|
||||
|
||||
const normalized = normalizeApprovalResultsForPersistence(approvals, {
|
||||
interruptedToolCallIds: ["other-id"],
|
||||
});
|
||||
|
||||
expect(normalized[0]).toMatchObject({
|
||||
type: "tool",
|
||||
tool_call_id: "call-2",
|
||||
status: "success",
|
||||
});
|
||||
});
|
||||
|
||||
test("supports legacy fallback on interrupt text when explicitly enabled", () => {
|
||||
const approvals: ApprovalResult[] = [
|
||||
{
|
||||
type: "tool",
|
||||
tool_call_id: "call-3",
|
||||
tool_return: [{ type: "text", text: INTERRUPTED_BY_USER }],
|
||||
status: "success",
|
||||
} as ApprovalResult,
|
||||
];
|
||||
|
||||
const normalized = normalizeApprovalResultsForPersistence(approvals, {
|
||||
allowInterruptTextFallback: true,
|
||||
});
|
||||
|
||||
expect(normalized[0]).toMatchObject({
|
||||
type: "tool",
|
||||
tool_call_id: "call-3",
|
||||
status: "error",
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("normalizeOutgoingApprovalMessages", () => {
|
||||
test("normalizes approvals and preserves non-approval messages", () => {
|
||||
const approvalMessage: ApprovalCreate = {
|
||||
type: "approval",
|
||||
approvals: [
|
||||
{
|
||||
type: "tool",
|
||||
tool_call_id: "call-7",
|
||||
tool_return: "foo",
|
||||
status: "success",
|
||||
} as ApprovalResult,
|
||||
],
|
||||
};
|
||||
|
||||
const messages = normalizeOutgoingApprovalMessages(
|
||||
[{ role: "user", content: "hello" }, approvalMessage],
|
||||
{ interruptedToolCallIds: ["call-7"] },
|
||||
);
|
||||
|
||||
expect(messages[0]).toMatchObject({ role: "user", content: "hello" });
|
||||
const normalizedApproval = messages[1] as ApprovalCreate;
|
||||
const approvals = normalizedApproval.approvals ?? [];
|
||||
expect(approvals[0]).toMatchObject({
|
||||
type: "tool",
|
||||
tool_call_id: "call-7",
|
||||
status: "error",
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -640,3 +640,44 @@ describe("listen-client post-stop approval recovery policy", () => {
|
||||
expect(shouldRecover).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("listen-client tool_return wire normalization", () => {
|
||||
test("normalizes legacy top-level tool return fields to canonical tool_returns[]", () => {
|
||||
const normalized = __listenClientTestUtils.normalizeToolReturnWireMessage({
|
||||
message_type: "tool_return_message",
|
||||
id: "message-1",
|
||||
run_id: "run-1",
|
||||
tool_call_id: "call-1",
|
||||
status: "error",
|
||||
tool_return: [{ type: "text", text: "Interrupted by user" }],
|
||||
});
|
||||
|
||||
expect(normalized).toEqual({
|
||||
message_type: "tool_return_message",
|
||||
id: "message-1",
|
||||
run_id: "run-1",
|
||||
tool_returns: [
|
||||
{
|
||||
tool_call_id: "call-1",
|
||||
status: "error",
|
||||
tool_return: "Interrupted by user",
|
||||
},
|
||||
],
|
||||
});
|
||||
expect(normalized).not.toHaveProperty("tool_call_id");
|
||||
expect(normalized).not.toHaveProperty("status");
|
||||
expect(normalized).not.toHaveProperty("tool_return");
|
||||
});
|
||||
|
||||
test("returns null for tool_return_message when no canonical status is available", () => {
|
||||
const normalized = __listenClientTestUtils.normalizeToolReturnWireMessage({
|
||||
message_type: "tool_return_message",
|
||||
id: "message-2",
|
||||
run_id: "run-2",
|
||||
tool_call_id: "call-2",
|
||||
tool_return: "maybe done",
|
||||
});
|
||||
|
||||
expect(normalized).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -178,6 +178,29 @@ describe("extractInterruptToolReturns", () => {
|
||||
]);
|
||||
});
|
||||
|
||||
test("converts multimodal tool_return content into displayable text", () => {
|
||||
const results: ApprovalResult[] = [
|
||||
{
|
||||
type: "tool",
|
||||
tool_call_id: "call-multimodal",
|
||||
status: "error",
|
||||
tool_return: [
|
||||
{ type: "text", text: "Interrupted by user" },
|
||||
{ type: "image", image_url: "https://example.com/image.png" },
|
||||
],
|
||||
} as ApprovalResult,
|
||||
];
|
||||
|
||||
const mapped = extractInterruptToolReturns(results);
|
||||
expect(mapped).toEqual([
|
||||
{
|
||||
tool_call_id: "call-multimodal",
|
||||
status: "error",
|
||||
tool_return: "Interrupted by user",
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
test("emitInterruptToolReturnMessage emits deterministic per-tool terminal messages", () => {
|
||||
const runtime = createRuntime();
|
||||
const socket = new MockSocket(WebSocket.OPEN) as unknown as WebSocket;
|
||||
@@ -208,16 +231,26 @@ describe("extractInterruptToolReturns", () => {
|
||||
expect(toolReturnFrames).toHaveLength(2);
|
||||
expect(toolReturnFrames[0]).toMatchObject({
|
||||
run_id: "run-1",
|
||||
tool_call_id: "call-a",
|
||||
status: "success",
|
||||
tool_returns: [{ tool_call_id: "call-a", status: "success" }],
|
||||
tool_returns: [
|
||||
{ tool_call_id: "call-a", status: "success", tool_return: "704" },
|
||||
],
|
||||
});
|
||||
expect(toolReturnFrames[1]).toMatchObject({
|
||||
run_id: "run-1",
|
||||
tool_returns: [
|
||||
{
|
||||
tool_call_id: "call-b",
|
||||
status: "error",
|
||||
tool_returns: [{ tool_call_id: "call-b", status: "error" }],
|
||||
tool_return: "User interrupted the stream",
|
||||
},
|
||||
],
|
||||
});
|
||||
expect(toolReturnFrames[0]).not.toHaveProperty("tool_call_id");
|
||||
expect(toolReturnFrames[0]).not.toHaveProperty("status");
|
||||
expect(toolReturnFrames[0]).not.toHaveProperty("tool_return");
|
||||
expect(toolReturnFrames[1]).not.toHaveProperty("tool_call_id");
|
||||
expect(toolReturnFrames[1]).not.toHaveProperty("status");
|
||||
expect(toolReturnFrames[1]).not.toHaveProperty("tool_return");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -305,13 +338,14 @@ describe("Path A: cancel during tool execution → next turn consumes actual res
|
||||
// Cancel fires: populateInterruptQueue (Path A — has execution results)
|
||||
const populated = populateInterruptQueue(runtime, {
|
||||
lastExecutionResults: executionResults,
|
||||
lastExecutingToolCallIds: [],
|
||||
lastNeedsUserInputToolCallIds: ["call-1", "call-2"],
|
||||
agentId,
|
||||
conversationId,
|
||||
});
|
||||
|
||||
expect(populated).toBe(true);
|
||||
expect(runtime.pendingInterruptedResults).toBe(executionResults);
|
||||
expect(runtime.pendingInterruptedResults).toEqual(executionResults);
|
||||
expect(runtime.pendingInterruptedContext).toMatchObject({
|
||||
agentId,
|
||||
conversationId,
|
||||
@@ -323,7 +357,7 @@ describe("Path A: cancel during tool execution → next turn consumes actual res
|
||||
|
||||
expect(consumed).not.toBeNull();
|
||||
expect(consumed?.type).toBe("approval");
|
||||
expect(consumed?.approvals).toBe(executionResults);
|
||||
expect(consumed?.approvals).toEqual(executionResults);
|
||||
expect(consumed?.approvals).toHaveLength(2);
|
||||
|
||||
// Queue is atomically cleared after consumption
|
||||
@@ -342,6 +376,7 @@ describe("Path A: cancel during tool execution → next turn consumes actual res
|
||||
|
||||
const populated = populateInterruptQueue(runtime, {
|
||||
lastExecutionResults: executionResults,
|
||||
lastExecutingToolCallIds: [],
|
||||
lastNeedsUserInputToolCallIds: ["call-1"],
|
||||
agentId: "agent-1",
|
||||
conversationId: "conv-1",
|
||||
@@ -353,9 +388,85 @@ describe("Path A: cancel during tool execution → next turn consumes actual res
|
||||
approve: true, // Path A preserves actual approval state
|
||||
});
|
||||
});
|
||||
|
||||
test("normalizes interrupted tool results to error via structured tool_call_id", () => {
|
||||
const runtime = createRuntime();
|
||||
const executionResults: ApprovalResult[] = [
|
||||
{
|
||||
type: "tool",
|
||||
tool_call_id: "call-1",
|
||||
status: "success",
|
||||
tool_return: "result text does not matter when ID is interrupted",
|
||||
} as unknown as ApprovalResult,
|
||||
];
|
||||
|
||||
const populated = populateInterruptQueue(runtime, {
|
||||
lastExecutionResults: executionResults,
|
||||
lastExecutingToolCallIds: ["call-1"],
|
||||
lastNeedsUserInputToolCallIds: [],
|
||||
agentId: "agent-1",
|
||||
conversationId: "conv-1",
|
||||
});
|
||||
|
||||
expect(populated).toBe(true);
|
||||
expect(runtime.pendingInterruptedResults?.[0]).toMatchObject({
|
||||
type: "tool",
|
||||
tool_call_id: "call-1",
|
||||
status: "error",
|
||||
});
|
||||
});
|
||||
|
||||
test("keeps legacy text fallback for interrupted tool return normalization", () => {
|
||||
const runtime = createRuntime();
|
||||
const executionResults: ApprovalResult[] = [
|
||||
{
|
||||
type: "tool",
|
||||
tool_call_id: "call-legacy",
|
||||
status: "success",
|
||||
tool_return: [{ type: "text", text: "Interrupted by user" }],
|
||||
} as unknown as ApprovalResult,
|
||||
];
|
||||
|
||||
const populated = populateInterruptQueue(runtime, {
|
||||
lastExecutionResults: executionResults,
|
||||
lastExecutingToolCallIds: [],
|
||||
lastNeedsUserInputToolCallIds: [],
|
||||
agentId: "agent-1",
|
||||
conversationId: "conv-1",
|
||||
});
|
||||
|
||||
expect(populated).toBe(true);
|
||||
expect(runtime.pendingInterruptedResults?.[0]).toMatchObject({
|
||||
type: "tool",
|
||||
tool_call_id: "call-legacy",
|
||||
status: "error",
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("Path B: cancel during approval wait → next turn consumes synthesized denials", () => {
|
||||
test("prefers synthesized tool-error results when execution was already in-flight", () => {
|
||||
const runtime = createRuntime();
|
||||
|
||||
const populated = populateInterruptQueue(runtime, {
|
||||
lastExecutionResults: null,
|
||||
lastExecutingToolCallIds: ["call-running-1"],
|
||||
lastNeedsUserInputToolCallIds: ["call-running-1"],
|
||||
agentId: "agent-1",
|
||||
conversationId: "conv-1",
|
||||
});
|
||||
|
||||
expect(populated).toBe(true);
|
||||
expect(runtime.pendingInterruptedResults).toEqual([
|
||||
{
|
||||
type: "tool",
|
||||
tool_call_id: "call-running-1",
|
||||
tool_return: "Interrupted by user",
|
||||
status: "error",
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
test("full sequence: populate from batch map IDs → consume synthesized denials", () => {
|
||||
const runtime = createRuntime();
|
||||
const agentId = "agent-abc";
|
||||
@@ -371,6 +482,7 @@ describe("Path B: cancel during approval wait → next turn consumes synthesized
|
||||
// Cancel fires during approval wait: no execution results
|
||||
const populated = populateInterruptQueue(runtime, {
|
||||
lastExecutionResults: null,
|
||||
lastExecutingToolCallIds: [],
|
||||
lastNeedsUserInputToolCallIds: [],
|
||||
agentId,
|
||||
conversationId,
|
||||
@@ -409,6 +521,7 @@ describe("Path B: cancel during approval wait → next turn consumes synthesized
|
||||
// No batch map entries, but we have the snapshot IDs
|
||||
const populated = populateInterruptQueue(runtime, {
|
||||
lastExecutionResults: null,
|
||||
lastExecutingToolCallIds: [],
|
||||
lastNeedsUserInputToolCallIds: ["call-a", "call-b"],
|
||||
agentId: "agent-1",
|
||||
conversationId: "conv-1",
|
||||
@@ -427,6 +540,7 @@ describe("Path B: cancel during approval wait → next turn consumes synthesized
|
||||
|
||||
const populated = populateInterruptQueue(runtime, {
|
||||
lastExecutionResults: null,
|
||||
lastExecutingToolCallIds: [],
|
||||
lastNeedsUserInputToolCallIds: [],
|
||||
agentId: "agent-1",
|
||||
conversationId: "conv-1",
|
||||
@@ -453,6 +567,7 @@ describe("post-cancel next turn: queue consumed exactly once (no error loop)", (
|
||||
reason: "cancelled",
|
||||
},
|
||||
],
|
||||
lastExecutingToolCallIds: [],
|
||||
lastNeedsUserInputToolCallIds: [],
|
||||
agentId,
|
||||
conversationId: convId,
|
||||
@@ -476,6 +591,7 @@ describe("post-cancel next turn: queue consumed exactly once (no error loop)", (
|
||||
lastExecutionResults: [
|
||||
{ type: "approval", tool_call_id: "call-1", approve: true },
|
||||
],
|
||||
lastExecutingToolCallIds: [],
|
||||
lastNeedsUserInputToolCallIds: [],
|
||||
agentId,
|
||||
conversationId: convId,
|
||||
@@ -496,6 +612,7 @@ describe("idempotency: first cancel populates, second is no-op", () => {
|
||||
lastExecutionResults: [
|
||||
{ type: "approval", tool_call_id: "call-first", approve: true },
|
||||
],
|
||||
lastExecutingToolCallIds: [],
|
||||
lastNeedsUserInputToolCallIds: [],
|
||||
agentId: "agent-1",
|
||||
conversationId: "conv-1",
|
||||
@@ -511,6 +628,7 @@ describe("idempotency: first cancel populates, second is no-op", () => {
|
||||
reason: "x",
|
||||
},
|
||||
],
|
||||
lastExecutingToolCallIds: [],
|
||||
lastNeedsUserInputToolCallIds: [],
|
||||
agentId: "agent-1",
|
||||
conversationId: "conv-1",
|
||||
@@ -530,6 +648,7 @@ describe("idempotency: first cancel populates, second is no-op", () => {
|
||||
lastExecutionResults: [
|
||||
{ type: "approval", tool_call_id: "call-1", approve: true },
|
||||
],
|
||||
lastExecutingToolCallIds: [],
|
||||
lastNeedsUserInputToolCallIds: [],
|
||||
agentId: "agent-1",
|
||||
conversationId: "conv-1",
|
||||
@@ -543,6 +662,7 @@ describe("idempotency: first cancel populates, second is no-op", () => {
|
||||
lastExecutionResults: [
|
||||
{ type: "approval", tool_call_id: "call-2", approve: true },
|
||||
],
|
||||
lastExecutingToolCallIds: [],
|
||||
lastNeedsUserInputToolCallIds: [],
|
||||
agentId: "agent-1",
|
||||
conversationId: "conv-1",
|
||||
@@ -564,6 +684,7 @@ describe("epoch guard: stale context discarded on consume", () => {
|
||||
lastExecutionResults: [
|
||||
{ type: "approval", tool_call_id: "call-1", approve: true },
|
||||
],
|
||||
lastExecutingToolCallIds: [],
|
||||
lastNeedsUserInputToolCallIds: [],
|
||||
agentId: "agent-1",
|
||||
conversationId: "conv-1",
|
||||
@@ -587,6 +708,7 @@ describe("epoch guard: stale context discarded on consume", () => {
|
||||
lastExecutionResults: [
|
||||
{ type: "approval", tool_call_id: "call-1", approve: true },
|
||||
],
|
||||
lastExecutingToolCallIds: [],
|
||||
lastNeedsUserInputToolCallIds: [],
|
||||
agentId: "agent-old",
|
||||
conversationId: "conv-1",
|
||||
@@ -605,6 +727,7 @@ describe("epoch guard: stale context discarded on consume", () => {
|
||||
lastExecutionResults: [
|
||||
{ type: "approval", tool_call_id: "call-1", approve: true },
|
||||
],
|
||||
lastExecutingToolCallIds: [],
|
||||
lastNeedsUserInputToolCallIds: [],
|
||||
agentId: "agent-1",
|
||||
conversationId: "conv-old",
|
||||
@@ -623,6 +746,7 @@ describe("stale Path-B IDs: clearing after successful send prevents re-denial",
|
||||
// Also batch map should be cleared by clearPendingApprovalBatchIds
|
||||
const populated = populateInterruptQueue(runtime, {
|
||||
lastExecutionResults: null,
|
||||
lastExecutingToolCallIds: [],
|
||||
lastNeedsUserInputToolCallIds: [], // cleared after send
|
||||
agentId: "agent-1",
|
||||
conversationId: "conv-1",
|
||||
@@ -640,6 +764,7 @@ describe("stale Path-B IDs: clearing after successful send prevents re-denial",
|
||||
|
||||
const populated = populateInterruptQueue(runtime, {
|
||||
lastExecutionResults: null,
|
||||
lastExecutingToolCallIds: [],
|
||||
lastNeedsUserInputToolCallIds: [], // cleared from previous send
|
||||
agentId: "agent-1",
|
||||
conversationId: "conv-1",
|
||||
@@ -716,6 +841,7 @@ describe("consume clears pendingApprovalBatchByToolCallId", () => {
|
||||
lastExecutionResults: [
|
||||
{ type: "approval", tool_call_id: "call-1", approve: true },
|
||||
],
|
||||
lastExecutingToolCallIds: [],
|
||||
lastNeedsUserInputToolCallIds: [],
|
||||
agentId: "agent-1",
|
||||
conversationId: "conv-1",
|
||||
@@ -734,6 +860,7 @@ describe("consume clears pendingApprovalBatchByToolCallId", () => {
|
||||
lastExecutionResults: [
|
||||
{ type: "approval", tool_call_id: "call-1", approve: true },
|
||||
],
|
||||
lastExecutingToolCallIds: [],
|
||||
lastNeedsUserInputToolCallIds: [],
|
||||
agentId: "agent-old",
|
||||
conversationId: "conv-old",
|
||||
|
||||
@@ -18,6 +18,7 @@ import {
|
||||
executeApprovalBatch,
|
||||
} from "../agent/approval-execution";
|
||||
import { fetchRunErrorDetail } from "../agent/approval-recovery";
|
||||
import { normalizeApprovalResultsForPersistence } from "../agent/approval-result-normalization";
|
||||
import { getResumeData } from "../agent/check-approval";
|
||||
import { getClient } from "../agent/client";
|
||||
import { getStreamToolContextId, sendMessageStream } from "../agent/message";
|
||||
@@ -34,6 +35,7 @@ import { createBuffers } from "../cli/helpers/accumulator";
|
||||
import { classifyApprovals } from "../cli/helpers/approvalClassification";
|
||||
import { generatePlanFilePath } from "../cli/helpers/planName";
|
||||
import { drainStreamWithResume } from "../cli/helpers/stream";
|
||||
import { INTERRUPTED_BY_USER } from "../constants";
|
||||
import { computeDiffPreviews } from "../helpers/diffPreview";
|
||||
import { permissionMode } from "../permissions/mode";
|
||||
import { type QueueItem, QueueRuntime } from "../queue/queueRuntime";
|
||||
@@ -907,6 +909,7 @@ function shouldAttemptPostStopApprovalRecovery(params: {
|
||||
|
||||
interface InterruptPopulateInput {
|
||||
lastExecutionResults: ApprovalResult[] | null;
|
||||
lastExecutingToolCallIds: string[];
|
||||
lastNeedsUserInputToolCallIds: string[];
|
||||
agentId: string;
|
||||
conversationId: string;
|
||||
@@ -920,10 +923,48 @@ interface InterruptToolReturn {
|
||||
stderr?: string[];
|
||||
}
|
||||
|
||||
function asToolReturnStatus(value: unknown): "success" | "error" | null {
|
||||
if (value === "success" || value === "error") {
|
||||
return value;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function normalizeToolReturnValue(value: unknown): string {
|
||||
if (typeof value === "string") {
|
||||
return value;
|
||||
}
|
||||
if (Array.isArray(value)) {
|
||||
const textParts = value
|
||||
.filter(
|
||||
(
|
||||
part,
|
||||
): part is {
|
||||
type: string;
|
||||
text: string;
|
||||
} =>
|
||||
!!part &&
|
||||
typeof part === "object" &&
|
||||
"type" in part &&
|
||||
part.type === "text" &&
|
||||
"text" in part &&
|
||||
typeof part.text === "string",
|
||||
)
|
||||
.map((part) => part.text);
|
||||
if (textParts.length > 0) {
|
||||
return textParts.join("\n");
|
||||
}
|
||||
}
|
||||
if (
|
||||
value &&
|
||||
typeof value === "object" &&
|
||||
"type" in value &&
|
||||
value.type === "text" &&
|
||||
"text" in value &&
|
||||
typeof value.text === "string"
|
||||
) {
|
||||
return value.text;
|
||||
}
|
||||
if (value === null || value === undefined) {
|
||||
return "";
|
||||
}
|
||||
@@ -934,6 +975,116 @@ function normalizeToolReturnValue(value: unknown): string {
|
||||
}
|
||||
}
|
||||
|
||||
function normalizeInterruptedApprovalsForQueue(
|
||||
approvals: ApprovalResult[] | null,
|
||||
interruptedToolCallIds: string[],
|
||||
): ApprovalResult[] | null {
|
||||
if (!approvals || approvals.length === 0) {
|
||||
return approvals;
|
||||
}
|
||||
|
||||
return normalizeApprovalResultsForPersistence(approvals, {
|
||||
interruptedToolCallIds,
|
||||
// Temporary fallback guard while all producers migrate to structured IDs.
|
||||
allowInterruptTextFallback: true,
|
||||
});
|
||||
}
|
||||
|
||||
function extractCanonicalToolReturnsFromWire(
|
||||
payload: Record<string, unknown>,
|
||||
): InterruptToolReturn[] {
|
||||
const fromArray: InterruptToolReturn[] = [];
|
||||
const toolReturnsValue = payload.tool_returns;
|
||||
if (Array.isArray(toolReturnsValue)) {
|
||||
for (const raw of toolReturnsValue) {
|
||||
if (!raw || typeof raw !== "object") {
|
||||
continue;
|
||||
}
|
||||
const rec = raw as Record<string, unknown>;
|
||||
const toolCallId =
|
||||
typeof rec.tool_call_id === "string" ? rec.tool_call_id : null;
|
||||
const status = asToolReturnStatus(rec.status);
|
||||
if (!toolCallId || !status) {
|
||||
continue;
|
||||
}
|
||||
const stdout = Array.isArray(rec.stdout)
|
||||
? rec.stdout.filter(
|
||||
(entry): entry is string => typeof entry === "string",
|
||||
)
|
||||
: undefined;
|
||||
const stderr = Array.isArray(rec.stderr)
|
||||
? rec.stderr.filter(
|
||||
(entry): entry is string => typeof entry === "string",
|
||||
)
|
||||
: undefined;
|
||||
fromArray.push({
|
||||
tool_call_id: toolCallId,
|
||||
status,
|
||||
tool_return: normalizeToolReturnValue(rec.tool_return),
|
||||
...(stdout ? { stdout } : {}),
|
||||
...(stderr ? { stderr } : {}),
|
||||
});
|
||||
}
|
||||
}
|
||||
if (fromArray.length > 0) {
|
||||
return fromArray;
|
||||
}
|
||||
|
||||
const topLevelToolCallId =
|
||||
typeof payload.tool_call_id === "string" ? payload.tool_call_id : null;
|
||||
const topLevelStatus = asToolReturnStatus(payload.status);
|
||||
if (!topLevelToolCallId || !topLevelStatus) {
|
||||
return [];
|
||||
}
|
||||
const stdout = Array.isArray(payload.stdout)
|
||||
? payload.stdout.filter(
|
||||
(entry): entry is string => typeof entry === "string",
|
||||
)
|
||||
: undefined;
|
||||
const stderr = Array.isArray(payload.stderr)
|
||||
? payload.stderr.filter(
|
||||
(entry): entry is string => typeof entry === "string",
|
||||
)
|
||||
: undefined;
|
||||
return [
|
||||
{
|
||||
tool_call_id: topLevelToolCallId,
|
||||
status: topLevelStatus,
|
||||
tool_return: normalizeToolReturnValue(payload.tool_return),
|
||||
...(stdout ? { stdout } : {}),
|
||||
...(stderr ? { stderr } : {}),
|
||||
},
|
||||
];
|
||||
}
|
||||
|
||||
function normalizeToolReturnWireMessage(
|
||||
chunk: Record<string, unknown>,
|
||||
): Record<string, unknown> | null {
|
||||
if (chunk.message_type !== "tool_return_message") {
|
||||
return chunk;
|
||||
}
|
||||
|
||||
const canonicalToolReturns = extractCanonicalToolReturnsFromWire(chunk);
|
||||
if (canonicalToolReturns.length === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const {
|
||||
tool_call_id: _toolCallId,
|
||||
status: _status,
|
||||
tool_return: _toolReturn,
|
||||
stdout: _stdout,
|
||||
stderr: _stderr,
|
||||
...rest
|
||||
} = chunk;
|
||||
|
||||
return {
|
||||
...rest,
|
||||
message_type: "tool_return_message",
|
||||
tool_returns: canonicalToolReturns,
|
||||
};
|
||||
}
|
||||
|
||||
function extractInterruptToolReturns(
|
||||
approvals: ApprovalResult[] | null,
|
||||
): InterruptToolReturn[] {
|
||||
@@ -1030,12 +1181,15 @@ function emitInterruptToolReturnMessage(
|
||||
id: `message-${crypto.randomUUID()}`,
|
||||
date: new Date().toISOString(),
|
||||
run_id: resolvedRunId,
|
||||
tool_returns: [
|
||||
{
|
||||
tool_call_id: toolReturn.tool_call_id,
|
||||
tool_return: toolReturn.tool_return,
|
||||
status: toolReturn.status,
|
||||
tool_return: toolReturn.tool_return,
|
||||
...(toolReturn.stdout ? { stdout: toolReturn.stdout } : {}),
|
||||
...(toolReturn.stderr ? { stderr: toolReturn.stderr } : {}),
|
||||
tool_returns: [toolReturn],
|
||||
},
|
||||
],
|
||||
session_id: runtime.sessionId,
|
||||
uuid: `${uuidPrefix}-${crypto.randomUUID()}`,
|
||||
} as unknown as MessageWire);
|
||||
@@ -1092,7 +1246,31 @@ function populateInterruptQueue(
|
||||
|
||||
if (input.lastExecutionResults && input.lastExecutionResults.length > 0) {
|
||||
// Path A: execution happened before cancel — queue actual results
|
||||
runtime.pendingInterruptedResults = input.lastExecutionResults;
|
||||
// Guard parity: interrupted tool returns must persist as status=error.
|
||||
runtime.pendingInterruptedResults = normalizeInterruptedApprovalsForQueue(
|
||||
input.lastExecutionResults,
|
||||
input.lastExecutingToolCallIds,
|
||||
);
|
||||
runtime.pendingInterruptedContext = {
|
||||
agentId: input.agentId,
|
||||
conversationId: input.conversationId,
|
||||
continuationEpoch: runtime.continuationEpoch,
|
||||
};
|
||||
return true;
|
||||
}
|
||||
|
||||
// Path A.5: execution was in-flight (approved tools started) but no
|
||||
// terminal results were captured before cancel. Match App/headless parity by
|
||||
// queuing explicit tool errors, not synthetic approval denials.
|
||||
if (input.lastExecutingToolCallIds.length > 0) {
|
||||
runtime.pendingInterruptedResults = input.lastExecutingToolCallIds.map(
|
||||
(toolCallId) => ({
|
||||
type: "tool" as const,
|
||||
tool_call_id: toolCallId,
|
||||
tool_return: INTERRUPTED_BY_USER,
|
||||
status: "error" as const,
|
||||
}),
|
||||
);
|
||||
runtime.pendingInterruptedContext = {
|
||||
agentId: input.agentId,
|
||||
conversationId: input.conversationId,
|
||||
@@ -2288,6 +2466,7 @@ async function handleIncomingMessage(
|
||||
// Track last approval-loop state for cancel-time queueing (Phase 1.2).
|
||||
// Hoisted before try so the cancel catch block can access them.
|
||||
let lastExecutionResults: ApprovalResult[] | null = null;
|
||||
let lastExecutingToolCallIds: string[] = [];
|
||||
let lastNeedsUserInputToolCallIds: string[] = [];
|
||||
|
||||
runtime.isProcessing = true;
|
||||
@@ -2477,12 +2656,18 @@ async function handleIncomingMessage(
|
||||
otid?: string;
|
||||
id?: string;
|
||||
};
|
||||
const normalizedChunk = normalizeToolReturnWireMessage(
|
||||
chunk as unknown as Record<string, unknown>,
|
||||
);
|
||||
if (normalizedChunk) {
|
||||
emitToWS(socket, {
|
||||
...chunk,
|
||||
...normalizedChunk,
|
||||
type: "message",
|
||||
session_id: runtime.sessionId,
|
||||
uuid: chunkWithIds.otid || chunkWithIds.id || crypto.randomUUID(),
|
||||
} as MessageWire);
|
||||
uuid:
|
||||
chunkWithIds.otid || chunkWithIds.id || crypto.randomUUID(),
|
||||
} as unknown as MessageWire);
|
||||
}
|
||||
}
|
||||
|
||||
return undefined;
|
||||
@@ -2875,6 +3060,15 @@ async function handleIncomingMessage(
|
||||
}
|
||||
}
|
||||
|
||||
// Snapshot executing tool_call_ids before execution starts so cancel can
|
||||
// preserve tool-error parity even if execution aborts mid-await.
|
||||
lastExecutingToolCallIds = decisions
|
||||
.filter(
|
||||
(decision): decision is Extract<Decision, { type: "approve" }> =>
|
||||
decision.type === "approve",
|
||||
)
|
||||
.map((decision) => decision.approval.toolCallId);
|
||||
|
||||
// Execute approved/denied tools
|
||||
const executionResults = await executeApprovalBatch(
|
||||
decisions,
|
||||
@@ -2922,6 +3116,7 @@ async function handleIncomingMessage(
|
||||
// cancel during the subsequent stream drain won't queue already-sent
|
||||
// results (Path A) or re-deny already-resolved tool calls (Path B).
|
||||
lastExecutionResults = null;
|
||||
lastExecutingToolCallIds = [];
|
||||
lastNeedsUserInputToolCallIds = [];
|
||||
|
||||
turnToolContextId = getStreamToolContextId(
|
||||
@@ -2933,6 +3128,7 @@ async function handleIncomingMessage(
|
||||
// Queue interrupted tool-call resolutions for the next message turn.
|
||||
populateInterruptQueue(runtime, {
|
||||
lastExecutionResults,
|
||||
lastExecutingToolCallIds,
|
||||
lastNeedsUserInputToolCallIds,
|
||||
agentId: agentId || "",
|
||||
conversationId,
|
||||
@@ -3078,5 +3274,6 @@ export const __listenClientTestUtils = {
|
||||
extractInterruptToolReturns,
|
||||
emitInterruptToolReturnMessage,
|
||||
getInterruptApprovalsForEmission,
|
||||
normalizeToolReturnWireMessage,
|
||||
shouldAttemptPostStopApprovalRecovery,
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user