fix(ws): remote interrupt recovery for listen-client [LET-7796] (#1272)

Co-authored-by: Letta Code <noreply@letta.com>
This commit is contained in:
Charles Packer
2026-03-04 22:35:17 -08:00
committed by GitHub
parent 0092e6ab76
commit 95a628eaa0
3 changed files with 1501 additions and 15 deletions

View File

@@ -17,6 +17,7 @@ import {
type ApprovalResult,
executeApprovalBatch,
} from "../agent/approval-execution";
import { fetchRunErrorDetail } from "../agent/approval-recovery";
import { getResumeData } from "../agent/check-approval";
import { getClient } from "../agent/client";
import { getStreamToolContextId, sendMessageStream } from "../agent/message";
@@ -278,6 +279,16 @@ type ListenerRuntime = {
* Used to preserve run attachment continuity across approval recovery.
*/
pendingApprovalBatchByToolCallId: Map<string, string>;
/** Queued interrupted tool-call resolutions from a cancelled turn. Prepended to the next user message. */
pendingInterruptedResults: Array<ApprovalResult> | null;
/** Context for pendingInterruptedResults — prevents replay into wrong conversation. */
pendingInterruptedContext: {
agentId: string;
conversationId: string;
continuationEpoch: number;
} | null;
/** Monotonic epoch for queued continuation validity checks. */
continuationEpoch: number;
};
type ApprovalSlot =
@@ -351,6 +362,9 @@ function createRuntime(): ListenerRuntime {
cancelRequested: false,
isRecoveringApprovals: false,
pendingApprovalBatchByToolCallId: new Map<string, string>(),
pendingInterruptedResults: null,
pendingInterruptedContext: null,
continuationEpoch: 0,
coalescedSkipQueueItemIds: new Set<string>(),
pendingTurns: 0,
// queueRuntime assigned below — needs runtime ref in callbacks
@@ -484,6 +498,22 @@ function resolvePendingApprovalBatchId(
return batchIds.values().next().value ?? null;
}
/**
 * Pick the batch ID to use when recovering pending approvals.
 *
 * - No tracked tool-call → batch mappings (cold start): mint a fresh
 *   synthetic `recovery-<uuid>` ID.
 * - Otherwise: defer to resolvePendingApprovalBatchId, whose `null` result
 *   is passed straight through so the caller can fail closed.
 *
 * @param runtime Listener runtime holding the tool-call → batch map.
 * @param pendingApprovals Approvals awaiting recovery (only `toolCallId` is used).
 * @returns A batch ID, or `null` when the tracked mappings cannot be resolved.
 */
function resolveRecoveryBatchId(
  runtime: ListenerRuntime,
  pendingApprovals: Array<{ toolCallId: string }>,
): string | null {
  const hasTrackedBatches = runtime.pendingApprovalBatchByToolCallId.size !== 0;
  return hasTrackedBatches
    ? resolvePendingApprovalBatchId(runtime, pendingApprovals)
    : `recovery-${crypto.randomUUID()}`;
}
function clearPendingApprovalBatchIds(
runtime: ListenerRuntime,
approvals: Array<{ toolCallId: string }>,
@@ -509,6 +539,11 @@ function stopRuntime(
rejectPendingApprovalResolvers(runtime, "Listener runtime stopped");
runtime.pendingApprovalBatchByToolCallId.clear();
// Clear interrupted queue on true teardown to prevent cross-session leakage.
runtime.pendingInterruptedResults = null;
runtime.pendingInterruptedContext = null;
runtime.continuationEpoch++;
if (!runtime.socket) {
return;
}
@@ -815,6 +850,358 @@ function emitToWS(socket: WebSocket, event: WsProtocolEvent): void {
}
// NOTE(review): presumably the retry cap for LLM API errors — its use is outside this view; confirm.
const LLM_API_ERROR_MAX_RETRIES = 3;
// Upper bound on stale-approval recovery attempts made by
// sendMessageStreamWithRetry when a send hits resolve_approval_pending.
const MAX_PRE_STREAM_RECOVERY = 2;
// ---------------------------------------------------------------------------
// Interrupt queue helpers — extracted for testability.
// Apart from runtime creation and teardown (createRuntime / stopRuntime),
// these are the ONLY places that read/write pendingInterruptedResults.
// ---------------------------------------------------------------------------
/** Input to populateInterruptQueue describing the approval-loop state at cancel time. */
interface InterruptPopulateInput {
/** Results of an already-executed approval batch; when non-empty they are queued as-is. */
lastExecutionResults: ApprovalResult[] | null;
/** Fallback tool-call IDs for synthesized denials, used only when the runtime's batch map has no keys. */
lastNeedsUserInputToolCallIds: string[];
/** Agent ID recorded in the queued context. */
agentId: string;
/** Conversation ID recorded in the queued context. */
conversationId: string;
}
/** Flattened tool outcome produced by extractInterruptToolReturns and emitted as a tool_return_message. */
interface InterruptToolReturn {
/** ID of the tool call this outcome belongs to. */
tool_call_id: string;
/** "success" only when the source result reported success; everything else is "error". */
status: "success" | "error";
/** Stringified tool output, or the denial reason for approval-type results. */
tool_return: string;
/** String entries of the source result's stdout array, when it had one. */
stdout?: string[];
/** String entries of the source result's stderr array, when it had one. */
stderr?: string[];
}
/**
 * Coerce an arbitrary tool return payload into a string.
 *
 * - Strings pass through unchanged.
 * - `null` / `undefined` become the empty string.
 * - Everything else is JSON-encoded, falling back to `String(value)` when
 *   JSON encoding is not possible.
 *
 * @param value Raw tool return value of unknown shape.
 * @returns A string representation; never `undefined`.
 */
function normalizeToolReturnValue(value: unknown): string {
  if (typeof value === "string") {
    return value;
  }
  if (value === null || value === undefined) {
    return "";
  }
  try {
    // JSON.stringify does not throw for functions and symbols — it returns
    // undefined, which would violate the declared string return type.
    return JSON.stringify(value) ?? String(value);
  } catch {
    // Thrown for e.g. circular structures and BigInt values.
    return String(value);
  }
}
/**
 * Flatten approval results into tool-return records.
 *
 * - `type: "tool"` entries keep their status ("success" only when reported
 *   as such), stringified tool_return, and string-only stdout/stderr.
 * - `type: "approval"` entries become an "error" record whose tool_return is
 *   the entry's reason (or a default interruption message).
 * - Entries without a non-empty string tool_call_id, or of any other type,
 *   are dropped.
 *
 * @param approvals Approval results to convert; `null` or empty yields `[]`.
 * @returns One record per convertible entry, in input order.
 */
function extractInterruptToolReturns(
  approvals: ApprovalResult[] | null,
): InterruptToolReturn[] {
  const collected: InterruptToolReturn[] = [];
  if (!approvals) {
    return collected;
  }

  for (const approval of approvals) {
    if (!approval || typeof approval !== "object") {
      continue;
    }

    if ("type" in approval && approval.type === "tool") {
      const callId =
        "tool_call_id" in approval && typeof approval.tool_call_id === "string"
          ? approval.tool_call_id
          : null;
      if (!callId) {
        continue;
      }

      const outcome: InterruptToolReturn["status"] =
        "status" in approval && approval.status === "success"
          ? "success"
          : "error";
      const outLines =
        "stdout" in approval && Array.isArray(approval.stdout)
          ? approval.stdout.filter(
              (line): line is string => typeof line === "string",
            )
          : undefined;
      const errLines =
        "stderr" in approval && Array.isArray(approval.stderr)
          ? approval.stderr.filter(
              (line): line is string => typeof line === "string",
            )
          : undefined;

      const record: InterruptToolReturn = {
        tool_call_id: callId,
        status: outcome,
        tool_return:
          "tool_return" in approval
            ? normalizeToolReturnValue(approval.tool_return)
            : "",
      };
      // Only attach the stream fields when the source carried an array.
      if (outLines) {
        record.stdout = outLines;
      }
      if (errLines) {
        record.stderr = errLines;
      }
      collected.push(record);
      continue;
    }

    if ("type" in approval && approval.type === "approval") {
      const callId =
        "tool_call_id" in approval && typeof approval.tool_call_id === "string"
          ? approval.tool_call_id
          : null;
      if (!callId) {
        continue;
      }

      const denialReason =
        "reason" in approval && typeof approval.reason === "string"
          ? approval.reason
          : "User interrupted the stream";
      collected.push({
        tool_call_id: callId,
        status: "error",
        tool_return: denialReason,
      });
    }
  }

  return collected;
}
/**
 * Emit one `tool_return_message` WS event per tool outcome derived from
 * `approvals`. Does nothing when no outcomes can be extracted.
 *
 * @param socket WebSocket to emit on.
 * @param runtime Listener runtime; supplies the session ID and the fallback run ID.
 * @param approvals Approval results to convert via extractInterruptToolReturns.
 * @param runId Run ID to stamp on each message; falls back to `runtime.activeRunId`.
 * @param uuidPrefix Prefix for each message's `uuid` field.
 */
function emitInterruptToolReturnMessage(
  socket: WebSocket,
  runtime: ListenerRuntime,
  approvals: ApprovalResult[] | null,
  runId?: string | null,
  uuidPrefix: string = "interrupt-tool-return",
): void {
  const outcomes = extractInterruptToolReturns(approvals);
  if (outcomes.length === 0) {
    return;
  }

  const stampedRunId = runId ?? runtime.activeRunId ?? undefined;
  outcomes.forEach((outcome) => {
    const { tool_call_id, tool_return, status, stdout, stderr } = outcome;
    // Top-level fields mirror the single entry in tool_returns.
    const payload = {
      type: "message",
      message_type: "tool_return_message",
      id: `message-${crypto.randomUUID()}`,
      date: new Date().toISOString(),
      run_id: stampedRunId,
      tool_call_id,
      tool_return,
      status,
      ...(stdout ? { stdout } : {}),
      ...(stderr ? { stderr } : {}),
      tool_returns: [outcome],
      session_id: runtime.sessionId,
      uuid: `${uuidPrefix}-${crypto.randomUUID()}`,
    };
    emitToWS(socket, payload as unknown as MessageWire);
  });
}
/**
 * Choose which approval results to emit as tool returns after a cancel.
 *
 * Non-empty `params.lastExecutionResults` win. Otherwise the runtime's queued
 * interrupted results are used, but only when the queued context matches the
 * given agent, conversation, and the runtime's current continuation epoch.
 *
 * @param runtime Listener runtime holding the interrupt queue and epoch.
 * @param params Cancel-time execution results plus the agent/conversation in scope.
 * @returns The results to emit, or `null` when there is nothing valid to emit.
 */
function getInterruptApprovalsForEmission(
  runtime: ListenerRuntime,
  params: {
    lastExecutionResults: ApprovalResult[] | null;
    agentId: string;
    conversationId: string;
  },
): ApprovalResult[] | null {
  const executed = params.lastExecutionResults;
  if (executed && executed.length > 0) {
    return executed;
  }

  const queuedContext = runtime.pendingInterruptedContext;
  if (!queuedContext) {
    return null;
  }

  const sameTarget =
    queuedContext.agentId === params.agentId &&
    queuedContext.conversationId === params.conversationId;
  const sameEpoch =
    queuedContext.continuationEpoch === runtime.continuationEpoch;
  if (!sameTarget || !sameEpoch) {
    return null;
  }

  const queued = runtime.pendingInterruptedResults;
  return queued && queued.length > 0 ? queued : null;
}
/**
 * Fill the runtime's interrupt queue after a cancel.
 *
 * Skips (returns false) when a non-empty queue with a context is already
 * present, so the first cancel's results are preserved.
 *
 * Path A: results from an executed batch exist → queue them unchanged.
 * Path B: no executed results → synthesize one denial per pending tool call,
 *         taking IDs from the runtime's batch map, or from
 *         `input.lastNeedsUserInputToolCallIds` when the map has no keys.
 *
 * @param runtime Listener runtime whose queue and context are written.
 * @param input Cancel-time approval-loop state.
 * @returns true if the queue was populated, false otherwise.
 */
function populateInterruptQueue(
  runtime: ListenerRuntime,
  input: InterruptPopulateInput,
): boolean {
  const alreadyQueued =
    !!runtime.pendingInterruptedResults &&
    runtime.pendingInterruptedResults.length > 0 &&
    !!runtime.pendingInterruptedContext;
  if (alreadyQueued) {
    return false;
  }

  // Both paths stamp the same context shape at the current epoch.
  const buildContext = () => ({
    agentId: input.agentId,
    conversationId: input.conversationId,
    continuationEpoch: runtime.continuationEpoch,
  });

  // Path A: tools ran before the cancel landed.
  const executed = input.lastExecutionResults;
  if (executed && executed.length > 0) {
    runtime.pendingInterruptedResults = executed;
    runtime.pendingInterruptedContext = buildContext();
    return true;
  }

  // Path B: nothing ran; deny whatever was still awaiting a decision.
  const trackedIds = Array.from(
    runtime.pendingApprovalBatchByToolCallId.keys(),
  );
  const idsToDeny =
    trackedIds.length > 0 ? trackedIds : input.lastNeedsUserInputToolCallIds;

  if (idsToDeny.length === 0) {
    if (process.env.DEBUG) {
      console.warn(
        "[Listen] Cancel during approval loop but no tool_call_ids available " +
          "for interrupted queue — next turn may hit pre-stream conflict. " +
          `batchMap=${runtime.pendingApprovalBatchByToolCallId.size}, ` +
          `lastNeedsUserInput=${input.lastNeedsUserInputToolCallIds.length}`,
      );
    }
    return false;
  }

  runtime.pendingInterruptedResults = idsToDeny.map((toolCallId) => ({
    type: "approval" as const,
    tool_call_id: toolCallId,
    approve: false,
    reason: "User interrupted the stream",
  }));
  runtime.pendingInterruptedContext = buildContext();
  return true;
}
/**
 * Take the queued interrupted results off the runtime.
 *
 * When the queue is null or empty, returns null and changes nothing.
 * Otherwise the queue, its context, and the tool-call → batch map are all
 * cleared, and the queued results are returned only if their context matches
 * the given agent, conversation, and the runtime's current continuation
 * epoch. Mismatched (stale) results are discarded rather than retried.
 *
 * @param runtime Listener runtime holding the interrupt queue.
 * @param agentId Agent the next message is for.
 * @param conversationId Conversation the next message is for.
 * @returns An approval payload to prepend, or null if nothing applies.
 */
function consumeInterruptQueue(
  runtime: ListenerRuntime,
  agentId: string,
  conversationId: string,
): { type: "approval"; approvals: ApprovalResult[] } | null {
  const queued = runtime.pendingInterruptedResults;
  if (!queued || queued.length === 0) {
    return null;
  }
  const queuedContext = runtime.pendingInterruptedContext;

  // Clear unconditionally before deciding whether the snapshot is usable.
  runtime.pendingInterruptedResults = null;
  runtime.pendingInterruptedContext = null;
  runtime.pendingApprovalBatchByToolCallId.clear();

  const belongsToThisTurn =
    !!queuedContext &&
    queuedContext.agentId === agentId &&
    queuedContext.conversationId === conversationId &&
    queuedContext.continuationEpoch === runtime.continuationEpoch;
  if (!belongsToThisTurn) {
    return null;
  }
  return { type: "approval", approvals: queued };
}
/**
 * Attempt to resolve stale pending approvals by fetching them from the backend
 * and auto-denying them. This is the bounded pre-stream recovery mechanism — it
 * does NOT read or write pendingInterruptedResults.
 *
 * The active agent and conversation IDs are captured once on entry. The
 * runtime is mutable and this function awaits several network calls, so
 * re-reading the fields later could send denials to a different agent or
 * conversation than the one whose approvals were fetched.
 *
 * Returns without doing anything when there is no active agent, when the
 * agent or conversation lookup fails with 404/422, or when the backend
 * reports no pending approvals.
 *
 * @param runtime Listener runtime supplying the active agent/conversation.
 * @param abortSignal Signal forwarded to the recovery send and stream drain.
 * @throws Error("Cancelled") if the signal is aborted before denials are sent.
 * @throws Error if draining the recovery stream ends with stop reason "error".
 * @throws Any other error from the agent or resume-data lookup, rethrown as-is.
 */
async function resolveStaleApprovals(
  runtime: ListenerRuntime,
  abortSignal: AbortSignal,
): Promise<void> {
  // Snapshot before the first await so every step targets the same run.
  const agentId = runtime.activeAgentId;
  const activeConversationId = runtime.activeConversationId;
  if (!agentId) return;

  // 404 = resource deleted, 422 = invalid ID — both mean nothing to recover.
  const isMissingOrInvalid = (err: unknown): boolean =>
    err instanceof APIError && (err.status === 404 || err.status === 422);

  const client = await getClient();

  let agent: Awaited<ReturnType<typeof client.agents.retrieve>>;
  try {
    agent = await client.agents.retrieve(agentId);
  } catch (err) {
    if (isMissingOrInvalid(err)) {
      return;
    }
    throw err;
  }

  // The "default" conversation is requested by omitting the conversation ID.
  const requestedConversationId =
    activeConversationId && activeConversationId !== "default"
      ? activeConversationId
      : undefined;

  let resumeData: Awaited<ReturnType<typeof getResumeData>>;
  try {
    resumeData = await getResumeData(client, agent, requestedConversationId, {
      includeMessageHistory: false,
    });
  } catch (err) {
    // getResumeData rethrows 404/422 for conversations — treat as no approvals.
    if (isMissingOrInvalid(err)) {
      return;
    }
    throw err;
  }

  const pendingApprovals = resumeData.pendingApprovals || [];
  if (pendingApprovals.length === 0) return;

  // Don't submit denials on behalf of a turn the user already cancelled.
  if (abortSignal.aborted) throw new Error("Cancelled");

  const denialResults: ApprovalResult[] = pendingApprovals.map((approval) => ({
    type: "approval" as const,
    tool_call_id: approval.toolCallId,
    approve: false,
    reason: "Auto-denied during pre-stream approval recovery",
  }));

  const recoveryConversationId = activeConversationId || "default";
  const recoveryStream = await sendMessageStream(
    recoveryConversationId,
    [{ type: "approval", approvals: denialResults }],
    {
      agentId,
      streamTokens: true,
      background: true,
    },
    { maxRetries: 0, signal: abortSignal },
  );

  // Drain the stream to its stop reason; the no-op callback ignores updates.
  const drainResult = await drainStreamWithResume(
    recoveryStream as Stream<LettaStreamingResponse>,
    createBuffers(agentId),
    () => {},
    abortSignal,
  );

  if (drainResult.stopReason === "error") {
    throw new Error("Pre-stream approval recovery drain ended with error");
  }
}
/**
* Wrap sendMessageStream with pre-stream error handling (retry/recovery).
@@ -830,6 +1217,7 @@ async function sendMessageStreamWithRetry(
): Promise<Awaited<ReturnType<typeof sendMessageStream>>> {
let transientRetries = 0;
let conversationBusyRetries = 0;
let preStreamRecoveryAttempts = 0;
const MAX_CONVERSATION_BUSY_RETRIES = 1;
// eslint-disable-next-line no-constant-condition
@@ -868,9 +1256,31 @@ async function sendMessageStreamWithRetry(
);
if (action === "resolve_approval_pending") {
// Listener can't auto-resolve pending approvals like headless does.
// Rethrow — the cloud will resend with the approval.
throw preStreamError;
// Abort check first — don't let recovery mask a user cancel
if (abortSignal?.aborted) throw new Error("Cancelled by user");
// Attempt bounded recovery: fetch pending approvals and auto-deny them.
// This does NOT touch pendingInterruptedResults (sole owner: handleIncomingMessage).
if (
abortSignal &&
preStreamRecoveryAttempts < MAX_PRE_STREAM_RECOVERY
) {
preStreamRecoveryAttempts++;
try {
await resolveStaleApprovals(runtime, abortSignal);
continue; // Retry send after resolving
} catch (_recoveryError) {
if (abortSignal.aborted) throw new Error("Cancelled by user");
// Recovery failed — fall through to structured error
}
}
// Unrecoverable — emit structured error instead of blind rethrow
const detail = await fetchRunErrorDetail(runtime.activeRunId);
throw new Error(
detail ||
`Pre-stream approval conflict (resolve_approval_pending) after ${preStreamRecoveryAttempts} recovery attempts`,
);
}
if (action === "retry_transient") {
@@ -1071,6 +1481,17 @@ async function recoverPendingApprovals(
socket: WebSocket,
msg: RecoverPendingApprovalsMessage,
): Promise<void> {
console.debug(
"[listener] recover_pending_approvals received",
JSON.stringify({
agentId: msg.agentId,
conversationId: msg.conversationId ?? null,
isProcessing: runtime.isProcessing,
isRecovering: runtime.isRecoveringApprovals,
batchMapSize: runtime.pendingApprovalBatchByToolCallId.size,
}),
);
if (runtime.isProcessing || runtime.isRecoveringApprovals) {
return;
}
@@ -1108,15 +1529,12 @@ async function recoverPendingApprovals(
return;
}
const recoveryBatchId = resolvePendingApprovalBatchId(
runtime,
pendingApprovals,
);
const recoveryBatchId = resolveRecoveryBatchId(runtime, pendingApprovals);
if (!recoveryBatchId) {
emitToWS(socket, {
type: "error",
message:
"Unable to recover pending approvals without originating batch correlation",
"Unable to recover pending approvals: ambiguous batch correlation",
stop_reason: "error",
session_id: runtime.sessionId,
uuid: `error-${crypto.randomUUID()}`,
@@ -1189,6 +1607,11 @@ async function recoverPendingApprovals(
return;
}
// Reflect approval-wait state in runtime snapshot while control
// requests are pending, so state_response queries see
// requires_approval even during the WS round-trip.
runtime.lastStopReason = "requires_approval";
for (const ac of needsUserInput) {
const requestId = `perm-${ac.approval.toolCallId}`;
const diffs = await computeDiffPreviews(
@@ -1501,6 +1924,25 @@ async function connectWithRetry(
if (hasPendingApprovals) {
rejectPendingApprovalResolvers(runtime, "Cancelled by user");
}
// Backend cancel parity with TUI (App.tsx:5932-5941).
// Fire-and-forget — local cancel + queued results are the primary mechanism.
const cancelConversationId = runtime.activeConversationId;
const cancelAgentId = runtime.activeAgentId;
if (cancelAgentId) {
getClient()
.then((client) => {
const cancelId =
cancelConversationId === "default" || !cancelConversationId
? cancelAgentId
: cancelConversationId;
return client.conversations.cancel(cancelId);
})
.catch(() => {
// Fire-and-forget
});
}
emitCancelAck(socket, runtime, {
requestId,
accepted: true,
@@ -1794,6 +2236,11 @@ async function handleIncomingMessage(
let msgTurnCount = 0;
const msgRunIds: string[] = [];
// Track last approval-loop state for cancel-time queueing (Phase 1.2).
// Hoisted before try so the cancel catch block can access them.
let lastExecutionResults: ApprovalResult[] | null = null;
let lastNeedsUserInputToolCallIds: string[] = [];
runtime.isProcessing = true;
runtime.cancelRequested = false;
runtime.activeAbortController = new AbortController();
@@ -1824,9 +2271,21 @@ async function handleIncomingMessage(
onStatusChange?.("processing", connectionId);
}
let messagesToSend: Array<MessageCreate | ApprovalCreate> = msg.messages;
let messagesToSend: Array<MessageCreate | ApprovalCreate> = [];
let turnToolContextId: string | null = null;
// Prepend queued interrupted results from a prior cancelled turn.
const consumed = consumeInterruptQueue(
runtime,
agentId || "",
conversationId,
);
if (consumed) {
messagesToSend.push(consumed);
}
messagesToSend.push(...msg.messages);
const firstMessage = msg.messages[0];
const isApprovalMessage =
firstMessage &&
@@ -1891,6 +2350,15 @@ async function handleIncomingMessage(
approvals: rebuiltApprovals,
},
];
// Emit terminal tool outcomes immediately so WS consumers can close
// tool-call UI state without waiting for follow-up hydration.
emitInterruptToolReturnMessage(
socket,
runtime,
rebuiltApprovals,
runtime.activeRunId ?? undefined,
"tool-return",
);
}
let stream = await sendMessageStreamWithRetry(
@@ -2034,16 +2502,62 @@ async function handleIncomingMessage(
break;
}
// Case 3: Error
// Case 3: Error (or cancel-induced error)
if (stopReason !== "requires_approval") {
runtime.lastStopReason = stopReason;
// Cancel-induced errors should be treated as cancellation, not error.
// This handles the race where cancel fires during stream drain and the
// backend returns "error" instead of "cancelled".
// We're already inside `stopReason !== "requires_approval"`, so this
// is a true non-approval stop. If cancel was requested, treat as cancelled.
const effectiveStopReason: StopReasonType = runtime.cancelRequested
? "cancelled"
: (stopReason as StopReasonType) || "error";
// If effective stop reason is cancelled, route through cancelled semantics (Case 2).
if (effectiveStopReason === "cancelled") {
runtime.lastStopReason = "cancelled";
runtime.isProcessing = false;
clearActiveRunState(runtime);
if (runtime.controlResponseCapable) {
emitToWS(socket, {
type: "result",
subtype: "interrupted",
agent_id: agentId,
conversation_id: conversationId,
duration_ms: performance.now() - msgStartTime,
duration_api_ms: 0,
num_turns: msgTurnCount,
result: null,
run_ids: msgRunIds,
usage: null,
stop_reason: "cancelled",
session_id: runtime.sessionId,
uuid: `result-${crypto.randomUUID()}`,
});
} else {
sendClientMessage(socket, {
type: "result",
success: false,
stopReason: "cancelled",
});
}
break;
}
runtime.lastStopReason = effectiveStopReason;
runtime.isProcessing = false;
clearActiveRunState(runtime);
// Try to fetch richer error detail from the run metadata
const errorDetail = await fetchRunErrorDetail(runId).catch(() => null);
const errorMessage =
errorDetail || `Unexpected stop reason: ${stopReason}`;
emitToWS(socket, {
type: "error",
message: `Unexpected stop reason: ${stopReason}`,
stop_reason: (stopReason as StopReasonType) || "error",
message: errorMessage,
stop_reason: effectiveStopReason,
run_id: runId,
session_id: runtime.sessionId,
uuid: `error-${crypto.randomUUID()}`,
@@ -2060,7 +2574,7 @@ async function handleIncomingMessage(
result: null,
run_ids: msgRunIds,
usage: null,
stop_reason: (stopReason as StopReasonType) || "error",
stop_reason: effectiveStopReason,
session_id: runtime.sessionId,
uuid: `result-${crypto.randomUUID()}`,
});
@@ -2068,7 +2582,7 @@ async function handleIncomingMessage(
sendClientMessage(socket, {
type: "result",
success: false,
stopReason,
stopReason: effectiveStopReason,
});
}
break;
@@ -2103,6 +2617,13 @@ async function handleIncomingMessage(
requireArgsForAutoApprove: true,
});
// Snapshot all tool_call_ids before entering approval wait so cancel can
// synthesize denial results even after pendingApprovalResolvers is cleared.
lastNeedsUserInputToolCallIds = needsUserInput.map(
(ac) => ac.approval.toolCallId,
);
lastExecutionResults = null;
// Build decisions list (before needsUserInput gate so both paths accumulate here)
type Decision =
| {
@@ -2255,6 +2776,19 @@ async function handleIncomingMessage(
abortSignal: runtime.activeAbortController.signal,
},
);
lastExecutionResults = executionResults;
// WS-first parity: publish tool-return terminal outcomes immediately on
// normal approval execution, before continuation stream send.
emitInterruptToolReturnMessage(
socket,
runtime,
executionResults,
runtime.activeRunId ||
runId ||
msgRunIds[msgRunIds.length - 1] ||
undefined,
"tool-return",
);
clearPendingApprovalBatchIds(
runtime,
decisions.map((decision) => decision.approval),
@@ -2274,12 +2808,40 @@ async function handleIncomingMessage(
runtime,
runtime.activeAbortController.signal,
);
// Results were successfully submitted to the backend — clear both so a
// cancel during the subsequent stream drain won't queue already-sent
// results (Path A) or re-deny already-resolved tool calls (Path B).
lastExecutionResults = null;
lastNeedsUserInputToolCallIds = [];
turnToolContextId = getStreamToolContextId(
stream as Stream<LettaStreamingResponse>,
);
}
} catch (error) {
if (runtime.cancelRequested) {
// Queue interrupted tool-call resolutions for the next message turn.
populateInterruptQueue(runtime, {
lastExecutionResults,
lastNeedsUserInputToolCallIds,
agentId: agentId || "",
conversationId,
});
const approvalsForEmission = getInterruptApprovalsForEmission(runtime, {
lastExecutionResults,
agentId: agentId || "",
conversationId,
});
if (approvalsForEmission) {
emitInterruptToolReturnMessage(
socket,
runtime,
approvalsForEmission,
runtime.activeRunId || msgRunIds[msgRunIds.length - 1] || undefined,
);
}
runtime.lastStopReason = "cancelled";
runtime.isProcessing = false;
clearActiveRunState(runtime);
@@ -2381,5 +2943,11 @@ export const __listenClientTestUtils = {
emitToWS,
rememberPendingApprovalBatchIds,
resolvePendingApprovalBatchId,
resolveRecoveryBatchId,
clearPendingApprovalBatchIds,
populateInterruptQueue,
consumeInterruptQueue,
extractInterruptToolReturns,
emitInterruptToolReturnMessage,
getInterruptApprovalsForEmission,
};