From 269e3815510ee733dcd5fab380ddb62ac1090a99 Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Sat, 28 Feb 2026 10:42:20 -0800 Subject: [PATCH] feat: ws sync (#1222) --- src/agent/message.ts | 4 +- src/queue/queueRuntime.ts | 2 +- src/types/protocol.ts | 27 ++ src/websocket/listen-client.ts | 477 +++++++++++++++++++++++++++++++-- 4 files changed, 492 insertions(+), 18 deletions(-) diff --git a/src/agent/message.ts b/src/agent/message.ts index 87666af..b9dc2fd 100644 --- a/src/agent/message.ts +++ b/src/agent/message.ts @@ -62,7 +62,9 @@ export async function sendMessageStream( } = { streamTokens: true, background: true }, // Disable SDK retries by default - state management happens outside the stream, // so retries would violate idempotency and create race conditions - requestOptions: { maxRetries?: number } = { maxRetries: 0 }, + requestOptions: { maxRetries?: number; signal?: AbortSignal } = { + maxRetries: 0, + }, ): Promise> { const requestStartTime = isTimingsEnabled() ? performance.now() : undefined; const requestStartedAtMs = Date.now(); diff --git a/src/queue/queueRuntime.ts b/src/queue/queueRuntime.ts index cd3f8e4..350b56c 100644 --- a/src/queue/queueRuntime.ts +++ b/src/queue/queueRuntime.ts @@ -328,7 +328,7 @@ export class QueueRuntime { return { ...input, id: `q-${++this.nextId}`, - enqueuedAt: performance.now(), + enqueuedAt: Date.now(), } as QueueItem; } diff --git a/src/types/protocol.ts b/src/types/protocol.ts index 2d16b1f..33cdab3 100644 --- a/src/types/protocol.ts +++ b/src/types/protocol.ts @@ -73,6 +73,8 @@ export type SystemPromptConfig = string | SystemPromptPresetConfig; export interface MessageEnvelope { session_id: string; uuid: string; + /** Monotonic per-session event sequence. Optional for backward compatibility. */ + event_seq?: number; } // ═══════════════════════════════════════════════════════════════ @@ -222,6 +224,17 @@ export interface RecoveryMessage extends MessageEnvelope { run_id?: string; } +/** + * Acknowledges a cancel request received over the device websocket control path. + */ +export interface CancelAckMessage extends MessageEnvelope { + type: "cancel_ack"; + request_id: string; + accepted: boolean; + run_id?: string | null; + reason?: string; +} + // ═══════════════════════════════════════════════════════════════ // RESULT // ═══════════════════════════════════════════════════════════════ @@ -297,9 +310,16 @@ export type QueueItemKind = */ export interface QueueItemEnqueuedEvent extends MessageEnvelope { type: "queue_item_enqueued"; + /** Stable queue item identifier. Preferred field. */ + id?: string; + /** @deprecated Use `id`. */ item_id: string; source: QueueItemSource; kind: QueueItemKind; + /** Full queue item content; renderers may truncate for display. */ + content?: MessageCreate["content"] | string; + /** ISO8601 UTC enqueue timestamp. */ + enqueued_at?: string; queue_len: number; } @@ -372,6 +392,9 @@ export type QueueItemDroppedReason = "buffer_limit" | "stale_generation"; */ export interface QueueItemDroppedEvent extends MessageEnvelope { type: "queue_item_dropped"; + /** Stable queue item identifier. Preferred field. */ + id?: string; + /** @deprecated Use `id`. */ item_id: string; reason: QueueItemDroppedReason; queue_len: number; @@ -662,6 +685,9 @@ export interface QueueSnapshotMessage extends MessageEnvelope { type: "queue_snapshot"; /** Items currently in the queue, in enqueue order. */ items: Array<{ + /** Stable queue item identifier. Preferred field. */ + id?: string; + /** @deprecated Use `id`. */ item_id: string; kind: QueueItemKind; source: QueueItemSource; @@ -714,6 +740,7 @@ export type WireMessage = | ContentMessage | StreamEvent | AutoApprovalMessage + | CancelAckMessage | ErrorMessage | RetryMessage | RecoveryMessage diff --git a/src/websocket/listen-client.ts b/src/websocket/listen-client.ts index 0a60e05..ab2d072 100644 --- a/src/websocket/listen-client.ts +++ b/src/websocket/listen-client.ts @@ -31,12 +31,13 @@ import { generatePlanFilePath } from "../cli/helpers/planName"; import { drainStreamWithResume } from "../cli/helpers/stream"; import { computeDiffPreviews } from "../helpers/diffPreview"; import { permissionMode } from "../permissions/mode"; -import { QueueRuntime } from "../queue/queueRuntime"; +import { type QueueItem, QueueRuntime } from "../queue/queueRuntime"; import { settingsManager } from "../settings-manager"; import { isInteractiveApprovalTool } from "../tools/interactivePolicy"; import { loadTools } from "../tools/manager"; import type { AutoApprovalMessage, + CancelAckMessage, CanUseToolResponse, ControlRequest, ControlResponseBody, @@ -108,11 +109,15 @@ interface ResultMessage { type: "result"; success: boolean; stopReason?: string; + event_seq?: number; + session_id?: string; } interface RunStartedMessage { type: "run_started"; runId: string; + event_seq?: number; + session_id?: string; } interface ModeChangeMessage { @@ -130,12 +135,24 @@ interface ModeChangedMessage { mode: "default" | "acceptEdits" | "plan" | "bypassPermissions"; success: boolean; error?: string; + event_seq?: number; + session_id?: string; } interface GetStatusMessage { type: "get_status"; } +interface GetStateMessage { + type: "get_state"; +} + +interface CancelRunMessage { + type: "cancel_run"; + request_id?: string; + run_id?: string | null; +} + interface RecoverPendingApprovalsMessage { type: "recover_pending_approvals"; agentId?: string; @@ -147,6 +164,43 @@ interface StatusResponseMessage { currentMode: "default" | "acceptEdits" | "plan" | "bypassPermissions"; lastStopReason: string | null; isProcessing: boolean; + event_seq?: number; + session_id?: string; +} + +interface StateResponseMessage { + type: "state_response"; + schema_version: 1; + session_id: string; + snapshot_id: string; + generated_at: string; + state_seq: number; + mode: "default" | "acceptEdits" | "plan" | "bypassPermissions"; + is_processing: boolean; + last_stop_reason: string | null; + control_response_capable: boolean; + active_run: { + run_id: string | null; + agent_id: string | null; + conversation_id: string | null; + started_at: string | null; + }; + pending_control_requests: Array<{ + request_id: string; + request: ControlRequest["request"]; + }>; + queue: { + queue_len: number; + pending_turns: number; + items: Array<{ + id: string; + kind: string; + source: string; + content: unknown; + enqueued_at: string; + }>; + }; + event_seq?: number; } type ServerMessage = @@ -155,6 +209,8 @@ type ServerMessage = | IncomingMessage | ModeChangeMessage | GetStatusMessage + | GetStateMessage + | CancelRunMessage | RecoverPendingApprovalsMessage | WsControlResponse; type ClientMessage = @@ -162,11 +218,13 @@ type ClientMessage = | ResultMessage | RunStartedMessage | ModeChangedMessage - | StatusResponseMessage; + | StatusResponseMessage + | StateResponseMessage; type PendingApprovalResolver = { resolve: (response: ControlResponseBody) => void; reject: (reason: Error) => void; + controlRequest?: ControlRequest; }; type ListenerRuntime = { @@ -181,10 +239,21 @@ type ListenerRuntime = { controlResponseCapable: boolean; /** Stable session ID for MessageEnvelope-based emissions (scoped to runtime lifecycle). */ sessionId: string; + /** Monotonic event sequence for all outbound status/protocol events. */ + eventSeqCounter: number; /** Last stop reason from completed run */ lastStopReason: string | null; /** Whether currently processing a message */ isProcessing: boolean; + /** Active run metadata for reconnect snapshot state. */ + activeAgentId: string | null; + activeConversationId: string | null; + activeRunId: string | null; + activeRunStartedAt: string | null; + /** Abort controller for the currently active message turn. */ + activeAbortController: AbortController | null; + /** True when a cancel_run request has been issued for the active turn. */ + cancelRequested: boolean; /** Queue lifecycle tracking — parallel tracking layer, does not affect message processing. */ queueRuntime: QueueRuntime; /** Count of turns currently queued or in-flight in the promise chain. Incremented @@ -256,8 +325,15 @@ function createRuntime(): ListenerRuntime { pendingApprovalResolvers: new Map(), controlResponseCapable: false, sessionId: `listen-${crypto.randomUUID()}`, + eventSeqCounter: 0, lastStopReason: null, isProcessing: false, + activeAgentId: null, + activeConversationId: null, + activeRunId: null, + activeRunStartedAt: null, + activeAbortController: null, + cancelRequested: false, isRecoveringApprovals: false, pendingTurns: 0, // queueRuntime assigned below — needs runtime ref in callbacks @@ -267,11 +343,15 @@ function createRuntime(): ListenerRuntime { callbacks: { onEnqueued: (item, queueLen) => { if (runtime.socket?.readyState === WebSocket.OPEN) { + const content = item.kind === "message" ? item.content : item.text; emitToWS(runtime.socket, { type: "queue_item_enqueued", + id: item.id, item_id: item.id, source: item.source, kind: item.kind, + content, + enqueued_at: new Date(item.enqueuedAt).toISOString(), queue_len: queueLen, session_id: runtime.sessionId, uuid: `q-enq-${item.id}`, @@ -313,6 +393,19 @@ function createRuntime(): ListenerRuntime { }); } }, + onDropped: (item, reason, queueLen) => { + if (runtime.socket?.readyState === WebSocket.OPEN) { + emitToWS(runtime.socket, { + type: "queue_item_dropped", + id: item.id, + item_id: item.id, + reason, + queue_len: queueLen, + session_id: runtime.sessionId, + uuid: `q-drp-${item.id}`, + }); + } + }, }, }); return runtime; @@ -329,11 +422,26 @@ function clearRuntimeTimers(runtime: ListenerRuntime): void { } } +function clearActiveRunState(runtime: ListenerRuntime): void { + runtime.activeAgentId = null; + runtime.activeConversationId = null; + runtime.activeRunId = null; + runtime.activeRunStartedAt = null; + runtime.activeAbortController = null; +} + function stopRuntime( runtime: ListenerRuntime, suppressCallbacks: boolean, ): void { runtime.intentionallyClosed = true; + runtime.cancelRequested = true; + if ( + runtime.activeAbortController && + !runtime.activeAbortController.signal.aborted + ) { + runtime.activeAbortController.abort(); + } clearRuntimeTimers(runtime); rejectPendingApprovalResolvers(runtime, "Listener runtime stopped"); @@ -386,6 +494,8 @@ export function parseServerMessage( parsed.type === "message" || parsed.type === "mode_change" || parsed.type === "get_status" || + parsed.type === "get_state" || + parsed.type === "cancel_run" || parsed.type === "recover_pending_approvals" ) { return parsed as ServerMessage; @@ -415,21 +525,155 @@ function safeEmitWsEvent( } } -function sendClientMessage(socket: WebSocket, payload: ClientMessage): void { +function nextEventSeq(runtime: ListenerRuntime | null): number | null { + if (!runtime) { + return null; + } + runtime.eventSeqCounter += 1; + return runtime.eventSeqCounter; +} + +function getQueueItemContent(item: QueueItem): unknown { + return item.kind === "message" ? item.content : item.text; +} + +function buildStateResponse( + runtime: ListenerRuntime, + stateSeq: number, +): StateResponseMessage { + const queueItems = runtime.queueRuntime.items.map((item) => ({ + id: item.id, + kind: item.kind, + source: item.source, + content: getQueueItemContent(item), + enqueued_at: new Date(item.enqueuedAt).toISOString(), + })); + + const pendingControlRequests = Array.from( + runtime.pendingApprovalResolvers.entries(), + ).flatMap(([requestId, pending]) => { + if (!pending.controlRequest) { + return []; + } + return [ + { + request_id: requestId, + request: pending.controlRequest.request, + }, + ]; + }); + + return { + type: "state_response", + schema_version: 1, + session_id: runtime.sessionId, + snapshot_id: `snapshot-${crypto.randomUUID()}`, + generated_at: new Date().toISOString(), + state_seq: stateSeq, + event_seq: stateSeq, + mode: permissionMode.getMode(), + is_processing: runtime.isProcessing, + last_stop_reason: runtime.lastStopReason, + control_response_capable: runtime.controlResponseCapable, + active_run: { + run_id: runtime.activeRunId, + agent_id: runtime.activeAgentId, + conversation_id: runtime.activeConversationId, + started_at: runtime.activeRunStartedAt, + }, + pending_control_requests: pendingControlRequests, + queue: { + queue_len: runtime.queueRuntime.length, + pending_turns: runtime.pendingTurns, + items: queueItems, + }, + }; +} + +function sendStateSnapshot(socket: WebSocket, runtime: ListenerRuntime): void { + const stateSeq = nextEventSeq(runtime); + if (stateSeq === null) { + return; + } + const stateResponse = buildStateResponse(runtime, stateSeq); + sendClientMessage(socket, stateResponse, runtime); +} + +function emitCancelAck( + socket: WebSocket, + runtime: ListenerRuntime, + params: { + requestId: string; + accepted: boolean; + reason?: string; + runId?: string | null; + }, +): void { + emitToWS(socket, { + type: "cancel_ack", + request_id: params.requestId, + accepted: params.accepted, + reason: params.reason, + run_id: params.runId ?? runtime.activeRunId, + session_id: runtime.sessionId, + uuid: `cancel-ack-${params.requestId}`, + } as CancelAckMessage); +} + +function sendClientMessage( + socket: WebSocket, + payload: ClientMessage, + runtime: ListenerRuntime | null = activeRuntime, +): void { if (socket.readyState === WebSocket.OPEN) { - safeEmitWsEvent("send", "client", payload); - socket.send(JSON.stringify(payload)); + let outbound = payload as unknown as Record; + if (payload.type !== "ping") { + const hasEventSeq = typeof outbound.event_seq === "number"; + if (!hasEventSeq) { + const eventSeq = nextEventSeq(runtime); + if (eventSeq !== null) { + outbound = { + ...outbound, + event_seq: eventSeq, + session_id: + typeof outbound.session_id === "string" + ? outbound.session_id + : runtime?.sessionId, + }; + } + } else if ( + typeof outbound.session_id !== "string" && + runtime?.sessionId + ) { + outbound = { + ...outbound, + session_id: runtime.sessionId, + }; + } + } + safeEmitWsEvent("send", "client", outbound); + socket.send(JSON.stringify(outbound)); } } function sendControlMessageOverWebSocket( socket: WebSocket, payload: ControlRequest, + runtime: ListenerRuntime | null = activeRuntime, ): void { // Central hook for protocol-only outbound WS messages so future // filtering/mutation can be added without touching approval flow. - safeEmitWsEvent("send", "control", payload); - socket.send(JSON.stringify(payload)); + const eventSeq = nextEventSeq(runtime); + const outbound = + eventSeq === null + ? payload + : { + ...payload, + event_seq: eventSeq, + session_id: runtime?.sessionId, + }; + safeEmitWsEvent("send", "control", outbound); + socket.send(JSON.stringify(outbound)); } // ── Typed protocol event adapter ──────────────────────────────── @@ -437,6 +681,7 @@ function sendControlMessageOverWebSocket( export type WsProtocolEvent = | MessageWire | AutoApprovalMessage + | CancelAckMessage | ErrorMessage | RetryMessage | RecoveryMessage @@ -453,8 +698,22 @@ export type WsProtocolEvent = */ function emitToWS(socket: WebSocket, event: WsProtocolEvent): void { if (socket.readyState === WebSocket.OPEN) { - safeEmitWsEvent("send", "protocol", event); - socket.send(JSON.stringify(event)); + const runtime = activeRuntime; + const eventSeq = nextEventSeq(runtime); + const eventRecord = event as unknown as Record; + const outbound = + eventSeq === null + ? eventRecord + : { + ...eventRecord, + event_seq: eventSeq, + session_id: + typeof eventRecord.session_id === "string" + ? eventRecord.session_id + : runtime?.sessionId, + }; + safeEmitWsEvent("send", "protocol", outbound); + socket.send(JSON.stringify(outbound)); } } @@ -470,6 +729,7 @@ async function sendMessageStreamWithRetry( opts: Parameters[2], socket: WebSocket, runtime: ListenerRuntime, + abortSignal?: AbortSignal, ): Promise>> { let transientRetries = 0; let conversationBusyRetries = 0; @@ -477,9 +737,24 @@ async function sendMessageStreamWithRetry( // eslint-disable-next-line no-constant-condition while (true) { + if (abortSignal?.aborted) { + throw new Error("Cancelled by user"); + } + try { - return await sendMessageStream(conversationId, messages, opts); + return await sendMessageStream( + conversationId, + messages, + opts, + abortSignal + ? { maxRetries: 0, signal: abortSignal } + : { maxRetries: 0 }, + ); } catch (preStreamError) { + if (abortSignal?.aborted) { + throw new Error("Cancelled by user"); + } + const errorDetail = extractConflictDetail(preStreamError); const action = getPreStreamErrorAction( errorDetail, @@ -523,6 +798,9 @@ async function sendMessageStreamWithRetry( } as RetryMessage); await new Promise((resolve) => setTimeout(resolve, delayMs)); + if (abortSignal?.aborted) { + throw new Error("Cancelled by user"); + } continue; } @@ -542,6 +820,9 @@ async function sendMessageStreamWithRetry( } as RetryMessage); await new Promise((resolve) => setTimeout(resolve, delayMs)); + if (abortSignal?.aborted) { + throw new Error("Cancelled by user"); + } continue; } @@ -591,7 +872,11 @@ export function requestApprovalOverWS( } return new Promise((resolve, reject) => { - runtime.pendingApprovalResolvers.set(requestId, { resolve, reject }); + runtime.pendingApprovalResolvers.set(requestId, { + resolve, + reject, + controlRequest, + }); try { sendControlMessageOverWebSocket(socket, controlRequest); } catch (error) { @@ -1061,6 +1346,79 @@ async function connectWithRetry( return; } + if (parsed.type === "cancel_run") { + if (runtime !== activeRuntime || runtime.intentionallyClosed) { + return; + } + + const requestId = + typeof parsed.request_id === "string" && parsed.request_id.length > 0 + ? parsed.request_id + : `cancel-${crypto.randomUUID()}`; + const requestedRunId = + typeof parsed.run_id === "string" ? parsed.run_id : runtime.activeRunId; + const hasPendingApprovals = runtime.pendingApprovalResolvers.size > 0; + const hasActiveTurn = runtime.isProcessing; + + if (!hasActiveTurn && !hasPendingApprovals) { + emitCancelAck(socket, runtime, { + requestId, + accepted: false, + reason: "no_active_turn", + runId: requestedRunId, + }); + return; + } + + runtime.cancelRequested = true; + if ( + runtime.activeAbortController && + !runtime.activeAbortController.signal.aborted + ) { + runtime.activeAbortController.abort(); + } + if (hasPendingApprovals) { + rejectPendingApprovalResolvers(runtime, "Cancelled by user"); + } + emitCancelAck(socket, runtime, { + requestId, + accepted: true, + runId: requestedRunId, + }); + return; + } + + if (parsed.type === "get_state") { + if (runtime !== activeRuntime || runtime.intentionallyClosed) { + return; + } + + // If we're blocked on an approval callback, don't queue behind the + // pending turn; respond immediately so refreshed clients can render the + // approval card needed to unblock execution. + if (runtime.pendingApprovalResolvers.size > 0) { + sendStateSnapshot(socket, runtime); + return; + } + + // Serialize snapshot generation with the same message queue used for + // message processing so reconnect snapshots cannot race in-flight turns. + runtime.messageQueue = runtime.messageQueue + .then(async () => { + if (runtime !== activeRuntime || runtime.intentionallyClosed) { + return; + } + + sendStateSnapshot(socket, runtime); + }) + .catch((error: unknown) => { + if (process.env.DEBUG) { + console.error("[Listen] Error handling queued get_state:", error); + } + }); + return; + } + if (parsed.type === "recover_pending_approvals") { if (runtime !== activeRuntime || runtime.intentionallyClosed) { return; @@ -1265,6 +1623,12 @@ async function handleIncomingMessage( const msgRunIds: string[] = []; runtime.isProcessing = true; + runtime.cancelRequested = false; + runtime.activeAbortController = new AbortController(); + runtime.activeAgentId = agentId ?? null; + runtime.activeConversationId = conversationId; + runtime.activeRunId = null; + runtime.activeRunStartedAt = new Date().toISOString(); try { // Latch capability: once seen, always use blocking path (strict check to avoid truthy strings) @@ -1274,6 +1638,7 @@ async function handleIncomingMessage( if (!agentId) { runtime.isProcessing = false; + clearActiveRunState(runtime); return; } @@ -1362,6 +1727,7 @@ async function handleIncomingMessage( { agentId, streamTokens: true, background: true }, socket, runtime, + runtime.activeAbortController.signal, ); turnToolContextId = getStreamToolContextId( @@ -1380,12 +1746,15 @@ async function handleIncomingMessage( stream as Stream, buffers, () => {}, - undefined, + runtime.activeAbortController.signal, undefined, ({ chunk, shouldOutput, errorInfo }) => { const maybeRunId = (chunk as { run_id?: unknown }).run_id; if (typeof maybeRunId === "string") { runId = maybeRunId; + if (runtime.activeRunId !== maybeRunId) { + runtime.activeRunId = maybeRunId; + } if (!runIdSent) { runIdSent = true; msgRunIds.push(maybeRunId); @@ -1433,6 +1802,7 @@ async function handleIncomingMessage( if (stopReason === "end_turn") { runtime.lastStopReason = "end_turn"; runtime.isProcessing = false; + clearActiveRunState(runtime); if (runtime.controlResponseCapable) { emitToWS(socket, { @@ -1459,10 +1829,43 @@ async function handleIncomingMessage( break; } - // Case 2: Error or cancelled + // Case 2: Explicit cancellation + if (stopReason === "cancelled") { + runtime.lastStopReason = "cancelled"; + runtime.isProcessing = false; + clearActiveRunState(runtime); + + if (runtime.controlResponseCapable) { + emitToWS(socket, { + type: "result", + subtype: "interrupted", + agent_id: agentId, + conversation_id: conversationId, + duration_ms: performance.now() - msgStartTime, + duration_api_ms: 0, + num_turns: msgTurnCount, + result: null, + run_ids: msgRunIds, + usage: null, + stop_reason: "cancelled", + session_id: runtime.sessionId, + uuid: `result-${crypto.randomUUID()}`, + }); + } else { + sendClientMessage(socket, { + type: "result", + success: false, + stopReason: "cancelled", + }); + } + break; + } + + // Case 3: Error if (stopReason !== "requires_approval") { runtime.lastStopReason = stopReason; runtime.isProcessing = false; + clearActiveRunState(runtime); emitToWS(socket, { type: "error", @@ -1498,11 +1901,12 @@ async function handleIncomingMessage( break; } - // Case 3: Requires approval - classify and handle based on permission mode + // Case 4: Requires approval - classify and handle based on permission mode if (approvals.length === 0) { // Unexpected: requires_approval but no approvals runtime.lastStopReason = "error"; runtime.isProcessing = false; + clearActiveRunState(runtime); sendClientMessage(socket, { type: "result", @@ -1575,10 +1979,12 @@ async function handleIncomingMessage( // Handle tools that need user input if (needsUserInput.length > 0) { + runtime.lastStopReason = "requires_approval"; + if (!runtime.controlResponseCapable) { // Legacy path: break out, let cloud re-enter with ApprovalCreate - runtime.lastStopReason = "requires_approval"; runtime.isProcessing = false; + clearActiveRunState(runtime); sendClientMessage(socket, { type: "result", @@ -1667,7 +2073,10 @@ async function handleIncomingMessage( const executionResults = await executeApprovalBatch( decisions, undefined, - { toolContextId: turnToolContextId ?? undefined }, + { + toolContextId: turnToolContextId ?? undefined, + abortSignal: runtime.activeAbortController.signal, + }, ); // Create fresh approval stream for next iteration @@ -1682,14 +2091,47 @@ async function handleIncomingMessage( { agentId, streamTokens: true, background: true }, socket, runtime, + runtime.activeAbortController.signal, ); turnToolContextId = getStreamToolContextId( stream as Stream, ); } } catch (error) { + if (runtime.cancelRequested) { + runtime.lastStopReason = "cancelled"; + runtime.isProcessing = false; + clearActiveRunState(runtime); + + if (runtime.controlResponseCapable) { + emitToWS(socket, { + type: "result", + subtype: "interrupted", + agent_id: agentId || "", + conversation_id: conversationId, + duration_ms: performance.now() - msgStartTime, + duration_api_ms: 0, + num_turns: msgTurnCount, + result: null, + run_ids: msgRunIds, + usage: null, + stop_reason: "cancelled", + session_id: runtime.sessionId, + uuid: `result-${crypto.randomUUID()}`, + }); + } else { + sendClientMessage(socket, { + type: "result", + success: false, + stopReason: "cancelled", + }); + } + return; + } + runtime.lastStopReason = "error"; runtime.isProcessing = false; + clearActiveRunState(runtime); const errorMessage = error instanceof Error ? error.message : String(error); emitToWS(socket, { @@ -1726,6 +2168,9 @@ async function handleIncomingMessage( if (process.env.DEBUG) { console.error("[Listen] Error handling message:", error); } + } finally { + runtime.activeAbortController = null; + runtime.cancelRequested = false; } }