improve streaming resilience for remote mode (#1171)
This commit is contained in:
@@ -60,6 +60,7 @@ interface StartListenerOptions {
|
||||
connectionName: string;
|
||||
onConnected: (connectionId: string) => void;
|
||||
onDisconnected: () => void;
|
||||
onNeedsReregister?: () => void;
|
||||
onError: (error: Error) => void;
|
||||
onStatusChange?: (
|
||||
status: "idle" | "receiving" | "processing",
|
||||
@@ -81,6 +82,13 @@ interface PongMessage {
|
||||
type: "pong";
|
||||
}
|
||||
|
||||
interface StatusMessage {
|
||||
type: "status";
|
||||
currentMode: "default" | "acceptEdits" | "plan" | "bypassPermissions";
|
||||
lastStopReason: string | null;
|
||||
isProcessing: boolean;
|
||||
}
|
||||
|
||||
interface IncomingMessage {
|
||||
type: "message";
|
||||
agentId?: string;
|
||||
@@ -118,16 +126,30 @@ interface ModeChangedMessage {
|
||||
error?: string;
|
||||
}
|
||||
|
||||
interface GetStatusMessage {
|
||||
type: "get_status";
|
||||
}
|
||||
|
||||
interface StatusResponseMessage {
|
||||
type: "status_response";
|
||||
currentMode: "default" | "acceptEdits" | "plan" | "bypassPermissions";
|
||||
lastStopReason: string | null;
|
||||
isProcessing: boolean;
|
||||
}
|
||||
|
||||
type ServerMessage =
|
||||
| PongMessage
|
||||
| StatusMessage
|
||||
| IncomingMessage
|
||||
| ModeChangeMessage
|
||||
| GetStatusMessage
|
||||
| WsControlResponse;
|
||||
type ClientMessage =
|
||||
| PingMessage
|
||||
| ResultMessage
|
||||
| RunStartedMessage
|
||||
| ModeChangedMessage;
|
||||
| ModeChangedMessage
|
||||
| StatusResponseMessage;
|
||||
|
||||
type PendingApprovalResolver = {
|
||||
resolve: (response: ControlResponseBody) => void;
|
||||
@@ -146,6 +168,10 @@ type ListenerRuntime = {
|
||||
controlResponseCapable: boolean;
|
||||
/** Stable session ID for MessageEnvelope-based emissions (scoped to runtime lifecycle). */
|
||||
sessionId: string;
|
||||
/** Last stop reason from completed run */
|
||||
lastStopReason: string | null;
|
||||
/** Whether currently processing a message */
|
||||
isProcessing: boolean;
|
||||
/** Queue lifecycle tracking — parallel tracking layer, does not affect message processing. */
|
||||
queueRuntime: QueueRuntime;
|
||||
/** Count of turns currently queued or in-flight in the promise chain. Incremented
|
||||
@@ -213,6 +239,8 @@ function createRuntime(): ListenerRuntime {
|
||||
pendingApprovalResolvers: new Map(),
|
||||
controlResponseCapable: false,
|
||||
sessionId: `listen-${crypto.randomUUID()}`,
|
||||
lastStopReason: null,
|
||||
isProcessing: false,
|
||||
pendingTurns: 0,
|
||||
// queueRuntime assigned below — needs runtime ref in callbacks
|
||||
queueRuntime: null as unknown as QueueRuntime,
|
||||
@@ -336,8 +364,10 @@ export function parseServerMessage(
|
||||
const parsed = JSON.parse(raw) as { type?: string; response?: unknown };
|
||||
if (
|
||||
parsed.type === "pong" ||
|
||||
parsed.type === "status" ||
|
||||
parsed.type === "message" ||
|
||||
parsed.type === "mode_change"
|
||||
parsed.type === "mode_change" ||
|
||||
parsed.type === "get_status"
|
||||
) {
|
||||
return parsed as ServerMessage;
|
||||
}
|
||||
@@ -724,6 +754,12 @@ async function connectWithRetry(
|
||||
|
||||
socket.on("message", (data: WebSocket.RawData) => {
|
||||
const parsed = parseServerMessage(data);
|
||||
if (process.env.DEBUG) {
|
||||
console.log(
|
||||
`[Listen] Received message: ${JSON.stringify(parsed, null, 2)}`,
|
||||
);
|
||||
}
|
||||
|
||||
if (!parsed) {
|
||||
return;
|
||||
}
|
||||
@@ -736,12 +772,41 @@ async function connectWithRetry(
|
||||
return;
|
||||
}
|
||||
|
||||
// Handle status updates from cloud (response to ping)
|
||||
if (parsed.type === "status") {
|
||||
if (runtime !== activeRuntime || runtime.intentionallyClosed) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Update runtime state from cloud's view
|
||||
// Only update lastStopReason if we're not currently processing
|
||||
if (!runtime.isProcessing && parsed.lastStopReason !== undefined) {
|
||||
runtime.lastStopReason = parsed.lastStopReason;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Handle mode change messages immediately (not queued)
|
||||
if (parsed.type === "mode_change") {
|
||||
handleModeChange(parsed, socket);
|
||||
return;
|
||||
}
|
||||
|
||||
// Handle status request from cloud (immediate response)
|
||||
if (parsed.type === "get_status") {
|
||||
if (runtime !== activeRuntime || runtime.intentionallyClosed) {
|
||||
return;
|
||||
}
|
||||
|
||||
sendClientMessage(socket, {
|
||||
type: "status_response",
|
||||
currentMode: permissionMode.getMode(),
|
||||
lastStopReason: runtime.lastStopReason,
|
||||
isProcessing: runtime.isProcessing,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// Handle incoming messages (queued for sequential processing)
|
||||
if (parsed.type === "message") {
|
||||
// Queue lifecycle tracking: only enqueue if first payload is a
|
||||
@@ -831,6 +896,20 @@ async function connectWithRetry(
|
||||
return;
|
||||
}
|
||||
|
||||
// 1008: Environment not found - need to re-register
|
||||
if (code === 1008) {
|
||||
if (process.env.DEBUG) {
|
||||
console.log("[Listen] Environment not found, re-registering...");
|
||||
}
|
||||
// Stop retry loop and signal that we need to re-register
|
||||
if (opts.onNeedsReregister) {
|
||||
opts.onNeedsReregister();
|
||||
} else {
|
||||
opts.onDisconnected();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// If we had connected before, restart backoff from zero for this outage window.
|
||||
const nextAttempt = runtime.hasSuccessfulConnection ? 0 : attempt + 1;
|
||||
const nextStartTime = runtime.hasSuccessfulConnection
|
||||
@@ -874,6 +953,8 @@ async function handleIncomingMessage(
|
||||
let msgTurnCount = 0;
|
||||
const msgRunIds: string[] = [];
|
||||
|
||||
runtime.isProcessing = true;
|
||||
|
||||
try {
|
||||
// Latch capability: once seen, always use blocking path (strict check to avoid truthy strings)
|
||||
if (msg.supportsControlResponse === true) {
|
||||
@@ -881,6 +962,7 @@ async function handleIncomingMessage(
|
||||
}
|
||||
|
||||
if (!agentId) {
|
||||
runtime.isProcessing = false;
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1038,6 +1120,9 @@ async function handleIncomingMessage(
|
||||
|
||||
// Case 1: Turn ended normally
|
||||
if (stopReason === "end_turn") {
|
||||
runtime.lastStopReason = "end_turn";
|
||||
runtime.isProcessing = false;
|
||||
|
||||
if (runtime.controlResponseCapable) {
|
||||
emitToWS(socket, {
|
||||
type: "result",
|
||||
@@ -1065,6 +1150,9 @@ async function handleIncomingMessage(
|
||||
|
||||
// Case 2: Error or cancelled
|
||||
if (stopReason !== "requires_approval") {
|
||||
runtime.lastStopReason = stopReason;
|
||||
runtime.isProcessing = false;
|
||||
|
||||
emitToWS(socket, {
|
||||
type: "error",
|
||||
message: `Unexpected stop reason: ${stopReason}`,
|
||||
@@ -1102,6 +1190,9 @@ async function handleIncomingMessage(
|
||||
// Case 3: Requires approval - classify and handle based on permission mode
|
||||
if (approvals.length === 0) {
|
||||
// Unexpected: requires_approval but no approvals
|
||||
runtime.lastStopReason = "error";
|
||||
runtime.isProcessing = false;
|
||||
|
||||
sendClientMessage(socket, {
|
||||
type: "result",
|
||||
success: false,
|
||||
@@ -1175,6 +1266,9 @@ async function handleIncomingMessage(
|
||||
if (needsUserInput.length > 0) {
|
||||
if (!runtime.controlResponseCapable) {
|
||||
// Legacy path: break out, let cloud re-enter with ApprovalCreate
|
||||
runtime.lastStopReason = "requires_approval";
|
||||
runtime.isProcessing = false;
|
||||
|
||||
sendClientMessage(socket, {
|
||||
type: "result",
|
||||
success: false,
|
||||
@@ -1283,6 +1377,9 @@ async function handleIncomingMessage(
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
runtime.lastStopReason = "error";
|
||||
runtime.isProcessing = false;
|
||||
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
emitToWS(socket, {
|
||||
type: "error",
|
||||
|
||||
Reference in New Issue
Block a user