feat: ws sync (#1222)

This commit is contained in:
Charles Packer
2026-02-28 10:42:20 -08:00
committed by GitHub
parent 27d145d0ea
commit 269e381551
4 changed files with 492 additions and 18 deletions

View File

@@ -62,7 +62,9 @@ export async function sendMessageStream(
} = { streamTokens: true, background: true },
// Disable SDK retries by default - state management happens outside the stream,
// so retries would violate idempotency and create race conditions
requestOptions: { maxRetries?: number } = { maxRetries: 0 },
requestOptions: { maxRetries?: number; signal?: AbortSignal } = {
maxRetries: 0,
},
): Promise<Stream<LettaStreamingResponse>> {
const requestStartTime = isTimingsEnabled() ? performance.now() : undefined;
const requestStartedAtMs = Date.now();

View File

@@ -328,7 +328,7 @@ export class QueueRuntime {
return {
...input,
id: `q-${++this.nextId}`,
enqueuedAt: performance.now(),
enqueuedAt: Date.now(),
} as QueueItem;
}

View File

@@ -73,6 +73,8 @@ export type SystemPromptConfig = string | SystemPromptPresetConfig;
export interface MessageEnvelope {
session_id: string;
uuid: string;
/** Monotonic per-session event sequence. Optional for backward compatibility. */
event_seq?: number;
}
// ═══════════════════════════════════════════════════════════════
@@ -222,6 +224,17 @@ export interface RecoveryMessage extends MessageEnvelope {
run_id?: string;
}
/**
* Acknowledges a cancel request received over the device websocket control path.
*/
export interface CancelAckMessage extends MessageEnvelope {
type: "cancel_ack";
request_id: string;
accepted: boolean;
run_id?: string | null;
reason?: string;
}
// ═══════════════════════════════════════════════════════════════
// RESULT
// ═══════════════════════════════════════════════════════════════
@@ -297,9 +310,16 @@ export type QueueItemKind =
*/
export interface QueueItemEnqueuedEvent extends MessageEnvelope {
type: "queue_item_enqueued";
/** Stable queue item identifier. Preferred field. */
id?: string;
/** @deprecated Use `id`. */
item_id: string;
source: QueueItemSource;
kind: QueueItemKind;
/** Full queue item content; renderers may truncate for display. */
content?: MessageCreate["content"] | string;
/** ISO8601 UTC enqueue timestamp. */
enqueued_at?: string;
queue_len: number;
}
@@ -372,6 +392,9 @@ export type QueueItemDroppedReason = "buffer_limit" | "stale_generation";
*/
export interface QueueItemDroppedEvent extends MessageEnvelope {
type: "queue_item_dropped";
/** Stable queue item identifier. Preferred field. */
id?: string;
/** @deprecated Use `id`. */
item_id: string;
reason: QueueItemDroppedReason;
queue_len: number;
@@ -662,6 +685,9 @@ export interface QueueSnapshotMessage extends MessageEnvelope {
type: "queue_snapshot";
/** Items currently in the queue, in enqueue order. */
items: Array<{
/** Stable queue item identifier. Preferred field. */
id?: string;
/** @deprecated Use `id`. */
item_id: string;
kind: QueueItemKind;
source: QueueItemSource;
@@ -714,6 +740,7 @@ export type WireMessage =
| ContentMessage
| StreamEvent
| AutoApprovalMessage
| CancelAckMessage
| ErrorMessage
| RetryMessage
| RecoveryMessage

View File

@@ -31,12 +31,13 @@ import { generatePlanFilePath } from "../cli/helpers/planName";
import { drainStreamWithResume } from "../cli/helpers/stream";
import { computeDiffPreviews } from "../helpers/diffPreview";
import { permissionMode } from "../permissions/mode";
import { QueueRuntime } from "../queue/queueRuntime";
import { type QueueItem, QueueRuntime } from "../queue/queueRuntime";
import { settingsManager } from "../settings-manager";
import { isInteractiveApprovalTool } from "../tools/interactivePolicy";
import { loadTools } from "../tools/manager";
import type {
AutoApprovalMessage,
CancelAckMessage,
CanUseToolResponse,
ControlRequest,
ControlResponseBody,
@@ -108,11 +109,15 @@ interface ResultMessage {
type: "result";
success: boolean;
stopReason?: string;
event_seq?: number;
session_id?: string;
}
interface RunStartedMessage {
type: "run_started";
runId: string;
event_seq?: number;
session_id?: string;
}
interface ModeChangeMessage {
@@ -130,12 +135,24 @@ interface ModeChangedMessage {
mode: "default" | "acceptEdits" | "plan" | "bypassPermissions";
success: boolean;
error?: string;
event_seq?: number;
session_id?: string;
}
interface GetStatusMessage {
type: "get_status";
}
interface GetStateMessage {
type: "get_state";
}
interface CancelRunMessage {
type: "cancel_run";
request_id?: string;
run_id?: string | null;
}
interface RecoverPendingApprovalsMessage {
type: "recover_pending_approvals";
agentId?: string;
@@ -147,6 +164,43 @@ interface StatusResponseMessage {
currentMode: "default" | "acceptEdits" | "plan" | "bypassPermissions";
lastStopReason: string | null;
isProcessing: boolean;
event_seq?: number;
session_id?: string;
}
interface StateResponseMessage {
type: "state_response";
schema_version: 1;
session_id: string;
snapshot_id: string;
generated_at: string;
state_seq: number;
mode: "default" | "acceptEdits" | "plan" | "bypassPermissions";
is_processing: boolean;
last_stop_reason: string | null;
control_response_capable: boolean;
active_run: {
run_id: string | null;
agent_id: string | null;
conversation_id: string | null;
started_at: string | null;
};
pending_control_requests: Array<{
request_id: string;
request: ControlRequest["request"];
}>;
queue: {
queue_len: number;
pending_turns: number;
items: Array<{
id: string;
kind: string;
source: string;
content: unknown;
enqueued_at: string;
}>;
};
event_seq?: number;
}
type ServerMessage =
@@ -155,6 +209,8 @@ type ServerMessage =
| IncomingMessage
| ModeChangeMessage
| GetStatusMessage
| GetStateMessage
| CancelRunMessage
| RecoverPendingApprovalsMessage
| WsControlResponse;
type ClientMessage =
@@ -162,11 +218,13 @@ type ClientMessage =
| ResultMessage
| RunStartedMessage
| ModeChangedMessage
| StatusResponseMessage;
| StatusResponseMessage
| StateResponseMessage;
type PendingApprovalResolver = {
resolve: (response: ControlResponseBody) => void;
reject: (reason: Error) => void;
controlRequest?: ControlRequest;
};
type ListenerRuntime = {
@@ -181,10 +239,21 @@ type ListenerRuntime = {
controlResponseCapable: boolean;
/** Stable session ID for MessageEnvelope-based emissions (scoped to runtime lifecycle). */
sessionId: string;
/** Monotonic event sequence for all outbound status/protocol events. */
eventSeqCounter: number;
/** Last stop reason from completed run */
lastStopReason: string | null;
/** Whether currently processing a message */
isProcessing: boolean;
/** Active run metadata for reconnect snapshot state. */
activeAgentId: string | null;
activeConversationId: string | null;
activeRunId: string | null;
activeRunStartedAt: string | null;
/** Abort controller for the currently active message turn. */
activeAbortController: AbortController | null;
/** True when a cancel_run request has been issued for the active turn. */
cancelRequested: boolean;
/** Queue lifecycle tracking — parallel tracking layer, does not affect message processing. */
queueRuntime: QueueRuntime;
/** Count of turns currently queued or in-flight in the promise chain. Incremented
@@ -256,8 +325,15 @@ function createRuntime(): ListenerRuntime {
pendingApprovalResolvers: new Map(),
controlResponseCapable: false,
sessionId: `listen-${crypto.randomUUID()}`,
eventSeqCounter: 0,
lastStopReason: null,
isProcessing: false,
activeAgentId: null,
activeConversationId: null,
activeRunId: null,
activeRunStartedAt: null,
activeAbortController: null,
cancelRequested: false,
isRecoveringApprovals: false,
pendingTurns: 0,
// queueRuntime assigned below — needs runtime ref in callbacks
@@ -267,11 +343,15 @@ function createRuntime(): ListenerRuntime {
callbacks: {
onEnqueued: (item, queueLen) => {
if (runtime.socket?.readyState === WebSocket.OPEN) {
const content = item.kind === "message" ? item.content : item.text;
emitToWS(runtime.socket, {
type: "queue_item_enqueued",
id: item.id,
item_id: item.id,
source: item.source,
kind: item.kind,
content,
enqueued_at: new Date(item.enqueuedAt).toISOString(),
queue_len: queueLen,
session_id: runtime.sessionId,
uuid: `q-enq-${item.id}`,
@@ -313,6 +393,19 @@ function createRuntime(): ListenerRuntime {
});
}
},
onDropped: (item, reason, queueLen) => {
if (runtime.socket?.readyState === WebSocket.OPEN) {
emitToWS(runtime.socket, {
type: "queue_item_dropped",
id: item.id,
item_id: item.id,
reason,
queue_len: queueLen,
session_id: runtime.sessionId,
uuid: `q-drp-${item.id}`,
});
}
},
},
});
return runtime;
@@ -329,11 +422,26 @@ function clearRuntimeTimers(runtime: ListenerRuntime): void {
}
}
function clearActiveRunState(runtime: ListenerRuntime): void {
runtime.activeAgentId = null;
runtime.activeConversationId = null;
runtime.activeRunId = null;
runtime.activeRunStartedAt = null;
runtime.activeAbortController = null;
}
function stopRuntime(
runtime: ListenerRuntime,
suppressCallbacks: boolean,
): void {
runtime.intentionallyClosed = true;
runtime.cancelRequested = true;
if (
runtime.activeAbortController &&
!runtime.activeAbortController.signal.aborted
) {
runtime.activeAbortController.abort();
}
clearRuntimeTimers(runtime);
rejectPendingApprovalResolvers(runtime, "Listener runtime stopped");
@@ -386,6 +494,8 @@ export function parseServerMessage(
parsed.type === "message" ||
parsed.type === "mode_change" ||
parsed.type === "get_status" ||
parsed.type === "get_state" ||
parsed.type === "cancel_run" ||
parsed.type === "recover_pending_approvals"
) {
return parsed as ServerMessage;
@@ -415,21 +525,155 @@ function safeEmitWsEvent(
}
}
function sendClientMessage(socket: WebSocket, payload: ClientMessage): void {
function nextEventSeq(runtime: ListenerRuntime | null): number | null {
if (!runtime) {
return null;
}
runtime.eventSeqCounter += 1;
return runtime.eventSeqCounter;
}
function getQueueItemContent(item: QueueItem): unknown {
return item.kind === "message" ? item.content : item.text;
}
function buildStateResponse(
runtime: ListenerRuntime,
stateSeq: number,
): StateResponseMessage {
const queueItems = runtime.queueRuntime.items.map((item) => ({
id: item.id,
kind: item.kind,
source: item.source,
content: getQueueItemContent(item),
enqueued_at: new Date(item.enqueuedAt).toISOString(),
}));
const pendingControlRequests = Array.from(
runtime.pendingApprovalResolvers.entries(),
).flatMap(([requestId, pending]) => {
if (!pending.controlRequest) {
return [];
}
return [
{
request_id: requestId,
request: pending.controlRequest.request,
},
];
});
return {
type: "state_response",
schema_version: 1,
session_id: runtime.sessionId,
snapshot_id: `snapshot-${crypto.randomUUID()}`,
generated_at: new Date().toISOString(),
state_seq: stateSeq,
event_seq: stateSeq,
mode: permissionMode.getMode(),
is_processing: runtime.isProcessing,
last_stop_reason: runtime.lastStopReason,
control_response_capable: runtime.controlResponseCapable,
active_run: {
run_id: runtime.activeRunId,
agent_id: runtime.activeAgentId,
conversation_id: runtime.activeConversationId,
started_at: runtime.activeRunStartedAt,
},
pending_control_requests: pendingControlRequests,
queue: {
queue_len: runtime.queueRuntime.length,
pending_turns: runtime.pendingTurns,
items: queueItems,
},
};
}
function sendStateSnapshot(socket: WebSocket, runtime: ListenerRuntime): void {
const stateSeq = nextEventSeq(runtime);
if (stateSeq === null) {
return;
}
const stateResponse = buildStateResponse(runtime, stateSeq);
sendClientMessage(socket, stateResponse, runtime);
}
function emitCancelAck(
socket: WebSocket,
runtime: ListenerRuntime,
params: {
requestId: string;
accepted: boolean;
reason?: string;
runId?: string | null;
},
): void {
emitToWS(socket, {
type: "cancel_ack",
request_id: params.requestId,
accepted: params.accepted,
reason: params.reason,
run_id: params.runId ?? runtime.activeRunId,
session_id: runtime.sessionId,
uuid: `cancel-ack-${params.requestId}`,
} as CancelAckMessage);
}
function sendClientMessage(
socket: WebSocket,
payload: ClientMessage,
runtime: ListenerRuntime | null = activeRuntime,
): void {
if (socket.readyState === WebSocket.OPEN) {
safeEmitWsEvent("send", "client", payload);
socket.send(JSON.stringify(payload));
let outbound = payload as unknown as Record<string, unknown>;
if (payload.type !== "ping") {
const hasEventSeq = typeof outbound.event_seq === "number";
if (!hasEventSeq) {
const eventSeq = nextEventSeq(runtime);
if (eventSeq !== null) {
outbound = {
...outbound,
event_seq: eventSeq,
session_id:
typeof outbound.session_id === "string"
? outbound.session_id
: runtime?.sessionId,
};
}
} else if (
typeof outbound.session_id !== "string" &&
runtime?.sessionId
) {
outbound = {
...outbound,
session_id: runtime.sessionId,
};
}
}
safeEmitWsEvent("send", "client", outbound);
socket.send(JSON.stringify(outbound));
}
}
function sendControlMessageOverWebSocket(
socket: WebSocket,
payload: ControlRequest,
runtime: ListenerRuntime | null = activeRuntime,
): void {
// Central hook for protocol-only outbound WS messages so future
// filtering/mutation can be added without touching approval flow.
safeEmitWsEvent("send", "control", payload);
socket.send(JSON.stringify(payload));
const eventSeq = nextEventSeq(runtime);
const outbound =
eventSeq === null
? payload
: {
...payload,
event_seq: eventSeq,
session_id: runtime?.sessionId,
};
safeEmitWsEvent("send", "control", outbound);
socket.send(JSON.stringify(outbound));
}
// ── Typed protocol event adapter ────────────────────────────────
@@ -437,6 +681,7 @@ function sendControlMessageOverWebSocket(
export type WsProtocolEvent =
| MessageWire
| AutoApprovalMessage
| CancelAckMessage
| ErrorMessage
| RetryMessage
| RecoveryMessage
@@ -453,8 +698,22 @@ export type WsProtocolEvent =
*/
function emitToWS(socket: WebSocket, event: WsProtocolEvent): void {
if (socket.readyState === WebSocket.OPEN) {
safeEmitWsEvent("send", "protocol", event);
socket.send(JSON.stringify(event));
const runtime = activeRuntime;
const eventSeq = nextEventSeq(runtime);
const eventRecord = event as unknown as Record<string, unknown>;
const outbound =
eventSeq === null
? eventRecord
: {
...eventRecord,
event_seq: eventSeq,
session_id:
typeof eventRecord.session_id === "string"
? eventRecord.session_id
: runtime?.sessionId,
};
safeEmitWsEvent("send", "protocol", outbound);
socket.send(JSON.stringify(outbound));
}
}
@@ -470,6 +729,7 @@ async function sendMessageStreamWithRetry(
opts: Parameters<typeof sendMessageStream>[2],
socket: WebSocket,
runtime: ListenerRuntime,
abortSignal?: AbortSignal,
): Promise<Awaited<ReturnType<typeof sendMessageStream>>> {
let transientRetries = 0;
let conversationBusyRetries = 0;
@@ -477,9 +737,24 @@ async function sendMessageStreamWithRetry(
// eslint-disable-next-line no-constant-condition
while (true) {
if (abortSignal?.aborted) {
throw new Error("Cancelled by user");
}
try {
return await sendMessageStream(conversationId, messages, opts);
return await sendMessageStream(
conversationId,
messages,
opts,
abortSignal
? { maxRetries: 0, signal: abortSignal }
: { maxRetries: 0 },
);
} catch (preStreamError) {
if (abortSignal?.aborted) {
throw new Error("Cancelled by user");
}
const errorDetail = extractConflictDetail(preStreamError);
const action = getPreStreamErrorAction(
errorDetail,
@@ -523,6 +798,9 @@ async function sendMessageStreamWithRetry(
} as RetryMessage);
await new Promise((resolve) => setTimeout(resolve, delayMs));
if (abortSignal?.aborted) {
throw new Error("Cancelled by user");
}
continue;
}
@@ -542,6 +820,9 @@ async function sendMessageStreamWithRetry(
} as RetryMessage);
await new Promise((resolve) => setTimeout(resolve, delayMs));
if (abortSignal?.aborted) {
throw new Error("Cancelled by user");
}
continue;
}
@@ -591,7 +872,11 @@ export function requestApprovalOverWS(
}
return new Promise<ControlResponseBody>((resolve, reject) => {
runtime.pendingApprovalResolvers.set(requestId, { resolve, reject });
runtime.pendingApprovalResolvers.set(requestId, {
resolve,
reject,
controlRequest,
});
try {
sendControlMessageOverWebSocket(socket, controlRequest);
} catch (error) {
@@ -1061,6 +1346,79 @@ async function connectWithRetry(
return;
}
if (parsed.type === "cancel_run") {
if (runtime !== activeRuntime || runtime.intentionallyClosed) {
return;
}
const requestId =
typeof parsed.request_id === "string" && parsed.request_id.length > 0
? parsed.request_id
: `cancel-${crypto.randomUUID()}`;
const requestedRunId =
typeof parsed.run_id === "string" ? parsed.run_id : runtime.activeRunId;
const hasPendingApprovals = runtime.pendingApprovalResolvers.size > 0;
const hasActiveTurn = runtime.isProcessing;
if (!hasActiveTurn && !hasPendingApprovals) {
emitCancelAck(socket, runtime, {
requestId,
accepted: false,
reason: "no_active_turn",
runId: requestedRunId,
});
return;
}
runtime.cancelRequested = true;
if (
runtime.activeAbortController &&
!runtime.activeAbortController.signal.aborted
) {
runtime.activeAbortController.abort();
}
if (hasPendingApprovals) {
rejectPendingApprovalResolvers(runtime, "Cancelled by user");
}
emitCancelAck(socket, runtime, {
requestId,
accepted: true,
runId: requestedRunId,
});
return;
}
if (parsed.type === "get_state") {
if (runtime !== activeRuntime || runtime.intentionallyClosed) {
return;
}
// If we're blocked on an approval callback, don't queue behind the
// pending turn; respond immediately so refreshed clients can render the
// approval card needed to unblock execution.
if (runtime.pendingApprovalResolvers.size > 0) {
sendStateSnapshot(socket, runtime);
return;
}
// Serialize snapshot generation with the same message queue used for
// message processing so reconnect snapshots cannot race in-flight turns.
runtime.messageQueue = runtime.messageQueue
.then(async () => {
if (runtime !== activeRuntime || runtime.intentionallyClosed) {
return;
}
sendStateSnapshot(socket, runtime);
})
.catch((error: unknown) => {
if (process.env.DEBUG) {
console.error("[Listen] Error handling queued get_state:", error);
}
});
return;
}
if (parsed.type === "recover_pending_approvals") {
if (runtime !== activeRuntime || runtime.intentionallyClosed) {
return;
@@ -1265,6 +1623,12 @@ async function handleIncomingMessage(
const msgRunIds: string[] = [];
runtime.isProcessing = true;
runtime.cancelRequested = false;
runtime.activeAbortController = new AbortController();
runtime.activeAgentId = agentId ?? null;
runtime.activeConversationId = conversationId;
runtime.activeRunId = null;
runtime.activeRunStartedAt = new Date().toISOString();
try {
// Latch capability: once seen, always use blocking path (strict check to avoid truthy strings)
@@ -1274,6 +1638,7 @@ async function handleIncomingMessage(
if (!agentId) {
runtime.isProcessing = false;
clearActiveRunState(runtime);
return;
}
@@ -1362,6 +1727,7 @@ async function handleIncomingMessage(
{ agentId, streamTokens: true, background: true },
socket,
runtime,
runtime.activeAbortController.signal,
);
turnToolContextId = getStreamToolContextId(
@@ -1380,12 +1746,15 @@ async function handleIncomingMessage(
stream as Stream<LettaStreamingResponse>,
buffers,
() => {},
undefined,
runtime.activeAbortController.signal,
undefined,
({ chunk, shouldOutput, errorInfo }) => {
const maybeRunId = (chunk as { run_id?: unknown }).run_id;
if (typeof maybeRunId === "string") {
runId = maybeRunId;
if (runtime.activeRunId !== maybeRunId) {
runtime.activeRunId = maybeRunId;
}
if (!runIdSent) {
runIdSent = true;
msgRunIds.push(maybeRunId);
@@ -1433,6 +1802,7 @@ async function handleIncomingMessage(
if (stopReason === "end_turn") {
runtime.lastStopReason = "end_turn";
runtime.isProcessing = false;
clearActiveRunState(runtime);
if (runtime.controlResponseCapable) {
emitToWS(socket, {
@@ -1459,10 +1829,43 @@ async function handleIncomingMessage(
break;
}
// Case 2: Error or cancelled
// Case 2: Explicit cancellation
if (stopReason === "cancelled") {
runtime.lastStopReason = "cancelled";
runtime.isProcessing = false;
clearActiveRunState(runtime);
if (runtime.controlResponseCapable) {
emitToWS(socket, {
type: "result",
subtype: "interrupted",
agent_id: agentId,
conversation_id: conversationId,
duration_ms: performance.now() - msgStartTime,
duration_api_ms: 0,
num_turns: msgTurnCount,
result: null,
run_ids: msgRunIds,
usage: null,
stop_reason: "cancelled",
session_id: runtime.sessionId,
uuid: `result-${crypto.randomUUID()}`,
});
} else {
sendClientMessage(socket, {
type: "result",
success: false,
stopReason: "cancelled",
});
}
break;
}
// Case 3: Error
if (stopReason !== "requires_approval") {
runtime.lastStopReason = stopReason;
runtime.isProcessing = false;
clearActiveRunState(runtime);
emitToWS(socket, {
type: "error",
@@ -1498,11 +1901,12 @@ async function handleIncomingMessage(
break;
}
// Case 3: Requires approval - classify and handle based on permission mode
// Case 4: Requires approval - classify and handle based on permission mode
if (approvals.length === 0) {
// Unexpected: requires_approval but no approvals
runtime.lastStopReason = "error";
runtime.isProcessing = false;
clearActiveRunState(runtime);
sendClientMessage(socket, {
type: "result",
@@ -1575,10 +1979,12 @@ async function handleIncomingMessage(
// Handle tools that need user input
if (needsUserInput.length > 0) {
runtime.lastStopReason = "requires_approval";
if (!runtime.controlResponseCapable) {
// Legacy path: break out, let cloud re-enter with ApprovalCreate
runtime.lastStopReason = "requires_approval";
runtime.isProcessing = false;
clearActiveRunState(runtime);
sendClientMessage(socket, {
type: "result",
@@ -1667,7 +2073,10 @@ async function handleIncomingMessage(
const executionResults = await executeApprovalBatch(
decisions,
undefined,
{ toolContextId: turnToolContextId ?? undefined },
{
toolContextId: turnToolContextId ?? undefined,
abortSignal: runtime.activeAbortController.signal,
},
);
// Create fresh approval stream for next iteration
@@ -1682,14 +2091,47 @@ async function handleIncomingMessage(
{ agentId, streamTokens: true, background: true },
socket,
runtime,
runtime.activeAbortController.signal,
);
turnToolContextId = getStreamToolContextId(
stream as Stream<LettaStreamingResponse>,
);
}
} catch (error) {
if (runtime.cancelRequested) {
runtime.lastStopReason = "cancelled";
runtime.isProcessing = false;
clearActiveRunState(runtime);
if (runtime.controlResponseCapable) {
emitToWS(socket, {
type: "result",
subtype: "interrupted",
agent_id: agentId || "",
conversation_id: conversationId,
duration_ms: performance.now() - msgStartTime,
duration_api_ms: 0,
num_turns: msgTurnCount,
result: null,
run_ids: msgRunIds,
usage: null,
stop_reason: "cancelled",
session_id: runtime.sessionId,
uuid: `result-${crypto.randomUUID()}`,
});
} else {
sendClientMessage(socket, {
type: "result",
success: false,
stopReason: "cancelled",
});
}
return;
}
runtime.lastStopReason = "error";
runtime.isProcessing = false;
clearActiveRunState(runtime);
const errorMessage = error instanceof Error ? error.message : String(error);
emitToWS(socket, {
@@ -1726,6 +2168,9 @@ async function handleIncomingMessage(
if (process.env.DEBUG) {
console.error("[Listen] Error handling message:", error);
}
} finally {
runtime.activeAbortController = null;
runtime.cancelRequested = false;
}
}