refactor: remove dual approval paths from listen mode (#1305)

This commit is contained in:
Charles Packer
2026-03-09 12:36:42 -07:00
committed by GitHub
parent cbcce1d4f5
commit 28039dcb43
2 changed files with 120 additions and 392 deletions

View File

@@ -241,23 +241,11 @@ describe("listen-client requestApprovalOverWS", () => {
});
});
describe("listen-client controlResponseCapable latch", () => {
test("runtime initializes with controlResponseCapable = false", () => {
describe("listen-client state_response control protocol", () => {
test("always advertises control_response capability", () => {
const runtime = __listenClientTestUtils.createRuntime();
expect(runtime.controlResponseCapable).toBe(false);
});
test("latch stays true after being set once", () => {
const runtime = __listenClientTestUtils.createRuntime();
expect(runtime.controlResponseCapable).toBe(false);
runtime.controlResponseCapable = true;
expect(runtime.controlResponseCapable).toBe(true);
// Simulates second message without the flag — latch should persist
// (actual latching happens in handleIncomingMessage, but the runtime
// field itself should hold the value)
expect(runtime.controlResponseCapable).toBe(true);
const snapshot = __listenClientTestUtils.buildStateResponse(runtime, 1);
expect(snapshot.control_response_capable).toBe(true);
});
});

View File

@@ -9,11 +9,9 @@ import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agen
import type {
ApprovalCreate,
LettaStreamingResponse,
ToolReturn,
} from "@letta-ai/letta-client/resources/agents/messages";
import WebSocket from "ws";
import {
type ApprovalDecision,
type ApprovalResult,
executeApprovalBatch,
} from "../agent/approval-execution";
@@ -111,16 +109,6 @@ interface IncomingMessage {
messages: Array<
(MessageCreate & { client_message_id?: string }) | ApprovalCreate
>;
/** Cloud sets this when it supports can_use_tool / control_response protocol. */
supportsControlResponse?: boolean;
}
/**
 * Simple client-to-server `result` frame carrying a boolean outcome.
 *
 * In the visible call sites it is sent through `sendClientMessage` on the
 * branch taken when `runtime.controlResponseCapable` is false, with only
 * `type`, `success` and `stopReason` populated.
 */
interface ResultMessage {
type: "result";
/** True when the turn finished normally; false for the cancelled / error / requires_approval call sites. */
success: boolean;
/** Why the turn stopped (call sites pass e.g. "end_turn", "cancelled", "error", "requires_approval"). */
stopReason?: string;
// NOTE(review): event_seq and session_id are not set by any visible call
// site; presumably stamped elsewhere before sending — confirm.
event_seq?: number;
session_id?: string;
}
interface RunStartedMessage {
@@ -245,7 +233,6 @@ type ServerMessage =
| WsControlResponse;
type ClientMessage =
| PingMessage
| ResultMessage
| RunStartedMessage
| RunRequestErrorMessage
| ModeChangedMessage
@@ -266,8 +253,6 @@ type ListenerRuntime = {
hasSuccessfulConnection: boolean;
messageQueue: Promise<void>;
pendingApprovalResolvers: Map<string, PendingApprovalResolver>;
/** Latched once supportsControlResponse is seen on any message. */
controlResponseCapable: boolean;
/** Stable session ID for MessageEnvelope-based emissions (scoped to runtime lifecycle). */
sessionId: string;
/** Monotonic event sequence for all outbound status/protocol events. */
@@ -326,10 +311,6 @@ type ListenerRuntime = {
pendingInterruptedToolCallIds: string[] | null;
};
/**
 * One position in the ordered list of approvals built by
 * buildApprovalExecutionPlan. Slots keep the original ordering of the incoming
 * approvals so the final approval list can be rebuilt after execution.
 */
type ApprovalSlot =
// Already-final result (a pass-through tool return or a synthesized error)
// that is copied into the rebuilt list as-is.
| { type: "result"; value: ApprovalResult }
// Placeholder for an approve/deny decision; it is filled with the next
// executed decision result, consumed in order.
| { type: "decision" };
// Listen mode supports one active connection per process.
let activeRuntime: ListenerRuntime | null = null;
@@ -384,7 +365,6 @@ function createRuntime(): ListenerRuntime {
hasSuccessfulConnection: false,
messageQueue: Promise.resolve(),
pendingApprovalResolvers: new Map(),
controlResponseCapable: false,
sessionId: `listen-${crypto.randomUUID()}`,
eventSeqCounter: 0,
lastStopReason: null,
@@ -745,7 +725,7 @@ function buildStateResponse(
mode: permissionMode.getMode(),
is_processing: runtime.isProcessing,
last_stop_reason: runtime.lastStopReason,
control_response_capable: runtime.controlResponseCapable,
control_response_capable: true,
active_run: {
run_id: runtime.activeRunId,
agent_id: runtime.activeAgentId,
@@ -792,6 +772,36 @@ function emitCancelAck(
} as CancelAckMessage);
}
/**
 * Emits the terminal `result` frame for a turn by handing it to `emitToWS`.
 *
 * The frame always reports `duration_api_ms` as 0 and `result` / `usage` as
 * null. `stop_reason` is included only when `params.stopReason` is truthy.
 * Each frame gets a fresh `result-<uuid>` identifier and carries the
 * runtime's session id.
 *
 * @param socket  Websocket the frame is emitted on.
 * @param runtime Listener runtime; only `sessionId` is read here.
 * @param params  Turn outcome (subtype, ids, duration, turn count, run ids,
 *                optional stop reason).
 */
function emitTurnResult(
  socket: WebSocket,
  runtime: ListenerRuntime,
  params: {
    subtype: ProtocolResultMessage["subtype"];
    agentId: string;
    conversationId: string;
    durationMs: number;
    numTurns: number;
    runIds: string[];
    stopReason?: StopReasonType;
  },
): void {
  const {
    subtype,
    agentId,
    conversationId,
    durationMs,
    numTurns,
    runIds,
    stopReason,
  } = params;

  // Omit the key entirely (rather than sending undefined) when no stop
  // reason was supplied.
  const stopReasonField = stopReason ? { stop_reason: stopReason } : {};

  // Key order is kept stable so the serialized frame is unchanged.
  emitToWS(socket, {
    type: "result",
    subtype,
    agent_id: agentId,
    conversation_id: conversationId,
    duration_ms: durationMs,
    duration_api_ms: 0,
    num_turns: numTurns,
    result: null,
    run_ids: runIds,
    usage: null,
    ...stopReasonField,
    session_id: runtime.sessionId,
    uuid: `result-${crypto.randomUUID()}`,
  });
}
function sendClientMessage(
socket: WebSocket,
payload: ClientMessage,
@@ -1701,89 +1711,6 @@ export function requestApprovalOverWS(
});
}
/**
 * Splits an incoming approval message into an ordered list of slots plus the
 * list of decisions that still have to be executed.
 *
 * For every entry of `approvalMessage.approvals` (in order) exactly one slot
 * is produced:
 *  - `type === "tool"` entries pass through unchanged as a result slot;
 *  - entries that are neither "tool" nor "approval" become an error result
 *    slot with `tool_call_id: "unknown"`;
 *  - approved entries with a matching pending approval yield an "approve"
 *    decision and a decision slot; without a match, an error result slot;
 *  - denied entries always yield a "deny" decision and a decision slot,
 *    falling back to a default reason when none (or an empty one) is given.
 *
 * @param approvalMessage  The approval payload whose `approvals` are planned.
 * @param pendingApprovals Pending tool calls, looked up by `toolCallId`.
 * @returns The ordered slots and the decisions to execute.
 */
function buildApprovalExecutionPlan(
  approvalMessage: ApprovalCreate,
  pendingApprovals: Array<{
    toolCallId: string;
    toolName: string;
    toolArgs: string;
  }>,
): {
  slots: ApprovalSlot[];
  decisions: ApprovalDecision[];
} {
  // Index pending approvals by tool call id; a later duplicate overrides an
  // earlier one.
  const pendingLookup = new Map<string, (typeof pendingApprovals)[number]>();
  for (const entry of pendingApprovals) {
    pendingLookup.set(entry.toolCallId, entry);
  }

  // Builds a result slot that reports an error for the given tool call.
  const errorSlot = (toolCallId: string, message: string): ApprovalSlot => ({
    type: "result",
    value: {
      type: "tool",
      tool_call_id: toolCallId,
      tool_return: message,
      status: "error",
    },
  });

  const slots: ApprovalSlot[] = [];
  const decisions: ApprovalDecision[] = [];

  for (const item of approvalMessage.approvals ?? []) {
    if (item.type === "tool") {
      // Pre-computed tool return: forward untouched.
      slots.push({ type: "result", value: item as ToolReturn });
    } else if (item.type !== "approval") {
      slots.push(errorSlot("unknown", "Error: Unsupported approval payload"));
    } else {
      const match = pendingLookup.get(item.tool_call_id);

      if (!item.approve) {
        // Denials are recorded even when no pending approval matches.
        const hasReason =
          typeof item.reason === "string" && item.reason.length > 0;
        decisions.push({
          type: "deny",
          approval: {
            toolCallId: item.tool_call_id,
            toolName: match?.toolName ?? "",
            toolArgs: match?.toolArgs ?? "{}",
          },
          reason: hasReason ? item.reason : "Tool execution denied",
        });
        slots.push({ type: "decision" });
      } else if (!match) {
        slots.push(
          errorSlot(item.tool_call_id, "Error: Pending approval not found"),
        );
      } else {
        decisions.push({
          type: "approve",
          approval: {
            toolCallId: match.toolCallId,
            toolName: match.toolName,
            // Empty args are normalized to an empty JSON object.
            toolArgs: match.toolArgs || "{}",
          },
        });
        slots.push({ type: "decision" });
      }
    }
  }

  return { slots, decisions };
}
async function recoverPendingApprovals(
runtime: ListenerRuntime,
socket: WebSocket,
@@ -1910,11 +1837,6 @@ async function recoverPendingApprovals(
];
if (needsUserInput.length > 0) {
if (!runtime.controlResponseCapable) {
runtime.lastStopReason = "requires_approval";
return;
}
// Reflect approval-wait state in runtime snapshot while control
// requests are pending, so state_response queries see
// requires_approval even during the WS round-trip.
@@ -2015,7 +1937,6 @@ async function recoverPendingApprovals(
approvals: executionResults,
},
],
supportsControlResponse: runtime.controlResponseCapable,
},
socket,
runtime,
@@ -2319,9 +2240,6 @@ async function connectWithRetry(
return;
}
// Recovery requests are only sent by the modern cloud listener protocol.
runtime.controlResponseCapable = true;
// Serialize recovery with normal message handling to avoid concurrent
// handleIncomingMessage execution when user messages arrive concurrently.
runtime.pendingTurns++;
@@ -2363,9 +2281,23 @@ async function connectWithRetry(
// Handle incoming messages (queued for sequential processing)
if (parsed.type === "message") {
// Queue lifecycle tracking: only enqueue if first payload is a
// MessageCreate (has `content`). ApprovalCreate payloads (legacy
// approval path) do not represent user-initiated messages.
const hasApprovalPayload = parsed.messages.some(
(payload): payload is ApprovalCreate =>
"type" in payload && payload.type === "approval",
);
if (hasApprovalPayload) {
emitToWS(socket, {
type: "error",
message:
"Protocol violation: device websocket no longer accepts approval payloads inside message frames. Send control_response instead.",
stop_reason: "error",
session_id: runtime.sessionId,
uuid: `error-${crypto.randomUUID()}`,
});
return;
}
// Queue lifecycle tracking: only enqueue user MessageCreate payloads.
const firstPayload = parsed.messages.at(0);
const isUserMessage =
firstPayload !== undefined && "content" in firstPayload;
@@ -2585,11 +2517,6 @@ async function handleIncomingMessage(
runtime.activeExecutingToolCallIds = [];
try {
// Latch capability: once seen, always use blocking path (strict check to avoid truthy strings)
if (msg.supportsControlResponse === true) {
runtime.controlResponseCapable = true;
}
if (!agentId) {
runtime.isProcessing = false;
clearActiveRunState(runtime);
@@ -2609,7 +2536,6 @@ async function handleIncomingMessage(
let messagesToSend: Array<MessageCreate | ApprovalCreate> = [];
let turnToolContextId: string | null = null;
let queuedInterruptedToolCallIds: string[] = [];
let shouldClearSubmittedApprovalTracking = false;
// Prepend queued interrupted results from a prior cancelled turn.
const consumed = consumeInterruptQueue(
@@ -2624,100 +2550,6 @@ async function handleIncomingMessage(
messagesToSend.push(...msg.messages);
const firstMessage = msg.messages[0];
const isApprovalMessage =
firstMessage &&
"type" in firstMessage &&
firstMessage.type === "approval" &&
"approvals" in firstMessage;
if (isApprovalMessage) {
if (runtime.controlResponseCapable && process.env.DEBUG) {
console.warn(
"[Listen] Protocol violation: controlResponseCapable is latched but received legacy ApprovalCreate message. " +
"The cloud should send control_response instead. This may cause the current turn to stall.",
);
}
const approvalMessage = firstMessage as ApprovalCreate;
const client = await getClient();
const agent = await client.agents.retrieve(agentId);
const resumeData = await getResumeData(
client,
agent,
requestedConversationId,
);
const { slots, decisions } = buildApprovalExecutionPlan(
approvalMessage,
resumeData.pendingApprovals,
);
lastExecutingToolCallIds = decisions
.filter(
(
decision,
): decision is Extract<ApprovalDecision, { type: "approve" }> =>
decision.type === "approve",
)
.map((decision) => decision.approval.toolCallId);
runtime.activeExecutingToolCallIds = [...lastExecutingToolCallIds];
const decisionResults =
decisions.length > 0
? await executeApprovalBatch(decisions, undefined, {
toolContextId: turnToolContextId ?? undefined,
abortSignal: runtime.activeAbortController.signal,
})
: [];
const persistedDecisionResults =
normalizeExecutionResultsForInterruptParity(
runtime,
decisionResults,
lastExecutingToolCallIds,
);
const rebuiltApprovals: ApprovalResult[] = [];
let decisionResultIndex = 0;
for (const slot of slots) {
if (slot.type === "result") {
rebuiltApprovals.push(slot.value);
continue;
}
const next = persistedDecisionResults[decisionResultIndex];
if (next) {
rebuiltApprovals.push(next);
decisionResultIndex++;
continue;
}
rebuiltApprovals.push({
type: "tool",
tool_call_id: "unknown",
tool_return: "Error: Missing approval execution result",
status: "error",
});
}
lastExecutionResults = rebuiltApprovals;
shouldClearSubmittedApprovalTracking = true;
messagesToSend = [
{
type: "approval",
approvals: rebuiltApprovals,
},
];
// Emit terminal tool outcomes immediately so WS consumers can close
// tool-call UI state without waiting for follow-up hydration.
emitInterruptToolReturnMessage(
socket,
runtime,
rebuiltApprovals,
runtime.activeRunId ?? undefined,
"tool-return",
);
}
let currentInput = messagesToSend;
const sendOptions: Parameters<typeof sendMessageStream>[2] = {
agentId,
@@ -2740,12 +2572,6 @@ async function handleIncomingMessage(
runtime,
runtime.activeAbortController.signal,
);
if (shouldClearSubmittedApprovalTracking) {
lastExecutionResults = null;
lastExecutingToolCallIds = [];
lastNeedsUserInputToolCallIds = [];
runtime.activeExecutingToolCallIds = [];
}
turnToolContextId = getStreamToolContextId(
stream as Stream<LettaStreamingResponse>,
@@ -2830,28 +2656,14 @@ async function handleIncomingMessage(
runtime.isProcessing = false;
clearActiveRunState(runtime);
if (runtime.controlResponseCapable) {
emitToWS(socket, {
type: "result",
subtype: "success",
agent_id: agentId,
conversation_id: conversationId,
duration_ms: performance.now() - msgStartTime,
duration_api_ms: 0,
num_turns: msgTurnCount,
result: null,
run_ids: msgRunIds,
usage: null,
session_id: runtime.sessionId,
uuid: `result-${crypto.randomUUID()}`,
});
} else {
sendClientMessage(socket, {
type: "result",
success: true,
stopReason: "end_turn",
});
}
emitTurnResult(socket, runtime, {
subtype: "success",
agentId,
conversationId,
durationMs: performance.now() - msgStartTime,
numTurns: msgTurnCount,
runIds: msgRunIds,
});
break;
}
@@ -2861,29 +2673,15 @@ async function handleIncomingMessage(
runtime.isProcessing = false;
clearActiveRunState(runtime);
if (runtime.controlResponseCapable) {
emitToWS(socket, {
type: "result",
subtype: "interrupted",
agent_id: agentId,
conversation_id: conversationId,
duration_ms: performance.now() - msgStartTime,
duration_api_ms: 0,
num_turns: msgTurnCount,
result: null,
run_ids: msgRunIds,
usage: null,
stop_reason: "cancelled",
session_id: runtime.sessionId,
uuid: `result-${crypto.randomUUID()}`,
});
} else {
sendClientMessage(socket, {
type: "result",
success: false,
stopReason: "cancelled",
});
}
emitTurnResult(socket, runtime, {
subtype: "interrupted",
agentId,
conversationId,
durationMs: performance.now() - msgStartTime,
numTurns: msgTurnCount,
runIds: msgRunIds,
stopReason: "cancelled",
});
break;
}
@@ -2961,29 +2759,15 @@ async function handleIncomingMessage(
runtime.isProcessing = false;
clearActiveRunState(runtime);
if (runtime.controlResponseCapable) {
emitToWS(socket, {
type: "result",
subtype: "interrupted",
agent_id: agentId,
conversation_id: conversationId,
duration_ms: performance.now() - msgStartTime,
duration_api_ms: 0,
num_turns: msgTurnCount,
result: null,
run_ids: msgRunIds,
usage: null,
stop_reason: "cancelled",
session_id: runtime.sessionId,
uuid: `result-${crypto.randomUUID()}`,
});
} else {
sendClientMessage(socket, {
type: "result",
success: false,
stopReason: "cancelled",
});
}
emitTurnResult(socket, runtime, {
subtype: "interrupted",
agentId,
conversationId,
durationMs: performance.now() - msgStartTime,
numTurns: msgTurnCount,
runIds: msgRunIds,
stopReason: "cancelled",
});
break;
}
@@ -3002,29 +2786,15 @@ async function handleIncomingMessage(
session_id: runtime.sessionId,
uuid: `error-${crypto.randomUUID()}`,
});
if (runtime.controlResponseCapable) {
emitToWS(socket, {
type: "result",
subtype: "error",
agent_id: agentId,
conversation_id: conversationId,
duration_ms: performance.now() - msgStartTime,
duration_api_ms: 0,
num_turns: msgTurnCount,
result: null,
run_ids: msgRunIds,
usage: null,
stop_reason: effectiveStopReason,
session_id: runtime.sessionId,
uuid: `result-${crypto.randomUUID()}`,
});
} else {
sendClientMessage(socket, {
type: "result",
success: false,
stopReason: effectiveStopReason,
});
}
emitTurnResult(socket, runtime, {
subtype: "error",
agentId,
conversationId,
durationMs: performance.now() - msgStartTime,
numTurns: msgTurnCount,
runIds: msgRunIds,
stopReason: effectiveStopReason,
});
break;
}
@@ -3035,9 +2805,20 @@ async function handleIncomingMessage(
runtime.isProcessing = false;
clearActiveRunState(runtime);
sendClientMessage(socket, {
type: "result",
success: false,
emitToWS(socket, {
type: "error",
message: "requires_approval stop returned no approvals",
stop_reason: "error",
session_id: runtime.sessionId,
uuid: `error-${crypto.randomUUID()}`,
});
emitTurnResult(socket, runtime, {
subtype: "error",
agentId,
conversationId,
durationMs: performance.now() - msgStartTime,
numTurns: msgTurnCount,
runIds: msgRunIds,
stopReason: "error",
});
break;
@@ -3119,20 +2900,7 @@ async function handleIncomingMessage(
if (needsUserInput.length > 0) {
runtime.lastStopReason = "requires_approval";
if (!runtime.controlResponseCapable) {
// Legacy path: break out, let cloud re-enter with ApprovalCreate
runtime.isProcessing = false;
clearActiveRunState(runtime);
sendClientMessage(socket, {
type: "result",
success: false,
stopReason: "requires_approval",
});
break;
}
// New path: blocking-in-loop via WS control protocol
// Block in-loop via the control protocol for all device approvals.
for (const ac of needsUserInput) {
const requestId = `perm-${ac.approval.toolCallId}`;
const diffs = await computeDiffPreviews(
@@ -3306,29 +3074,15 @@ async function handleIncomingMessage(
runtime.isProcessing = false;
clearActiveRunState(runtime);
if (runtime.controlResponseCapable) {
emitToWS(socket, {
type: "result",
subtype: "interrupted",
agent_id: agentId || "",
conversation_id: conversationId,
duration_ms: performance.now() - msgStartTime,
duration_api_ms: 0,
num_turns: msgTurnCount,
result: null,
run_ids: msgRunIds,
usage: null,
stop_reason: "cancelled",
session_id: runtime.sessionId,
uuid: `result-${crypto.randomUUID()}`,
});
} else {
sendClientMessage(socket, {
type: "result",
success: false,
stopReason: "cancelled",
});
}
emitTurnResult(socket, runtime, {
subtype: "interrupted",
agentId: agentId || "",
conversationId,
durationMs: performance.now() - msgStartTime,
numTurns: msgTurnCount,
runIds: msgRunIds,
stopReason: "cancelled",
});
return;
}
@@ -3363,29 +3117,15 @@ async function handleIncomingMessage(
session_id: runtime.sessionId,
uuid: `error-${crypto.randomUUID()}`,
});
if (runtime.controlResponseCapable) {
emitToWS(socket, {
type: "result",
subtype: "error",
agent_id: agentId || "",
conversation_id: conversationId,
duration_ms: performance.now() - msgStartTime,
duration_api_ms: 0,
num_turns: msgTurnCount,
result: null,
run_ids: msgRunIds,
usage: null,
stop_reason: "error",
session_id: runtime.sessionId,
uuid: `result-${crypto.randomUUID()}`,
});
} else {
sendClientMessage(socket, {
type: "result",
success: false,
stopReason: "error",
});
}
emitTurnResult(socket, runtime, {
subtype: "error",
agentId: agentId || "",
conversationId,
durationMs: performance.now() - msgStartTime,
numTurns: msgTurnCount,
runIds: msgRunIds,
stopReason: "error",
});
if (process.env.DEBUG) {
console.error("[Listen] Error handling message:", error);