diff --git a/src/cli/commands/listen.ts b/src/cli/commands/listen.ts index c224582..9e3bdf2 100644 --- a/src/cli/commands/listen.ts +++ b/src/cli/commands/listen.ts @@ -265,88 +265,153 @@ export async function handleListen( "../../websocket/listen-client" ); - await startListenerClient({ - connectionId, - wsUrl, - deviceId, - connectionName, - onStatusChange: (status, connId) => { - const statusText = - status === "receiving" - ? "Receiving message" - : status === "processing" - ? "Processing message" - : "Awaiting instructions"; + // Helper to start client with given connection details + const startClient = async ( + connId: string, + wsUrlValue: string, + ): Promise => { + await startListenerClient({ + connectionId: connId, + wsUrl: wsUrlValue, + deviceId, + connectionName, + onStatusChange: (status, id) => { + const statusText = + status === "receiving" + ? "Receiving message" + : status === "processing" + ? "Processing message" + : "Awaiting instructions"; - const url = buildConnectionUrl(connId); - const urlText = url ? `\n\nConnect to this environment:\n${url}` : ""; + const url = buildConnectionUrl(id); + const urlText = url ? `\n\nConnect to this environment:\n${url}` : ""; - updateCommandResult( - ctx.buffersRef, - ctx.refreshDerived, - cmdId, - msg, - `Environment initialized: ${connectionName}\n${statusText}${urlText}`, - true, - "finished", - ); - }, - onRetrying: (attempt, _maxAttempts, nextRetryIn, connId) => { - const url = buildConnectionUrl(connId); - const urlText = url ? `\n\nConnect to this environment:\n${url}` : ""; + updateCommandResult( + ctx.buffersRef, + ctx.refreshDerived, + cmdId, + msg, + `Environment initialized: ${connectionName}\n${statusText}${urlText}`, + true, + "finished", + ); + }, + onRetrying: (attempt, _maxAttempts, nextRetryIn, id) => { + const url = buildConnectionUrl(id); + const urlText = url ? `\n\nConnect to this environment:\n${url}` : ""; - updateCommandResult( - ctx.buffersRef, - ctx.refreshDerived, - cmdId, - msg, - `Environment initialized: ${connectionName}\n` + - `Reconnecting to Letta Cloud...\n` + - `Attempt ${attempt}, retrying in ${Math.round(nextRetryIn / 1000)}s${urlText}`, - true, - "running", - ); - }, - onConnected: (connId) => { - const url = buildConnectionUrl(connId); - const urlText = url ? `\n\nConnect to this environment:\n${url}` : ""; + updateCommandResult( + ctx.buffersRef, + ctx.refreshDerived, + cmdId, + msg, + `Environment initialized: ${connectionName}\n` + + `Reconnecting to Letta Cloud...\n` + + `Attempt ${attempt}, retrying in ${Math.round(nextRetryIn / 1000)}s${urlText}`, + true, + "running", + ); + }, + onConnected: (id) => { + const url = buildConnectionUrl(id); + const urlText = url ? `\n\nConnect to this environment:\n${url}` : ""; - updateCommandResult( - ctx.buffersRef, - ctx.refreshDerived, - cmdId, - msg, - `Environment initialized: ${connectionName}\nAwaiting instructions${urlText}`, - true, - "finished", - ); - ctx.setCommandRunning(false); - }, - onDisconnected: () => { - updateCommandResult( - ctx.buffersRef, - ctx.refreshDerived, - cmdId, - msg, - `✗ Listener disconnected\n\n` + `Connection to Letta Cloud was lost.`, - false, - "finished", - ); - ctx.setCommandRunning(false); - }, - onError: (error: Error) => { - updateCommandResult( - ctx.buffersRef, - ctx.refreshDerived, - cmdId, - msg, - `✗ Listener error: ${getErrorMessage(error)}`, - false, - "finished", - ); - ctx.setCommandRunning(false); - }, - }); + updateCommandResult( + ctx.buffersRef, + ctx.refreshDerived, + cmdId, + msg, + `Environment initialized: ${connectionName}\nAwaiting instructions${urlText}`, + true, + "finished", + ); + ctx.setCommandRunning(false); + }, + onNeedsReregister: async () => { + updateCommandResult( + ctx.buffersRef, + ctx.refreshDerived, + cmdId, + msg, + `Environment expired, re-registering "${connectionName}"...`, + true, + "running", + ); + + try { + // Re-register to get new connectionId + const reregisterResponse = await fetch(registerUrl, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${apiKey}`, + "X-Letta-Source": "letta-code", + }, + body: JSON.stringify({ + deviceId, + connectionName, + }), + }); + + if (!reregisterResponse.ok) { + const error = (await reregisterResponse.json()) as { + message?: string; + }; + throw new Error(error.message || "Re-registration failed"); + } + + const reregisterData = (await reregisterResponse.json()) as { + connectionId: string; + wsUrl: string; + }; + + // Restart client with new connectionId + await startClient( + reregisterData.connectionId, + reregisterData.wsUrl, + ); + } catch (error) { + updateCommandResult( + ctx.buffersRef, + ctx.refreshDerived, + cmdId, + msg, + `✗ Re-registration failed: ${getErrorMessage(error)}`, + false, + "finished", + ); + ctx.setCommandRunning(false); + } + }, + onDisconnected: () => { + updateCommandResult( + ctx.buffersRef, + ctx.refreshDerived, + cmdId, + msg, + `✗ Listener disconnected\n\n` + + `Connection to Letta Cloud was lost.`, + false, + "finished", + ); + ctx.setCommandRunning(false); + }, + onError: (error: Error) => { + updateCommandResult( + ctx.buffersRef, + ctx.refreshDerived, + cmdId, + msg, + `✗ Listener error: ${getErrorMessage(error)}`, + false, + "finished", + ); + ctx.setCommandRunning(false); + }, + }); + }; + + await startClient(connectionId, wsUrl); } catch (error) { updateCommandResult( ctx.buffersRef, diff --git a/src/websocket/listen-client.ts b/src/websocket/listen-client.ts index 8a3928e..286be14 100644 --- a/src/websocket/listen-client.ts +++ b/src/websocket/listen-client.ts @@ -60,6 +60,7 @@ interface StartListenerOptions { connectionName: string; onConnected: (connectionId: string) => void; onDisconnected: () => void; + onNeedsReregister?: () => void; onError: (error: Error) => void; onStatusChange?: ( status: "idle" | "receiving" | "processing", @@ -81,6 +82,13 @@ interface PongMessage { type: "pong"; } +interface StatusMessage { + type: "status"; + currentMode: "default" | "acceptEdits" | "plan" | "bypassPermissions"; + lastStopReason: string | null; + isProcessing: boolean; +} + interface IncomingMessage { type: "message"; agentId?: string; @@ -118,16 +126,30 @@ interface ModeChangedMessage { error?: string; } +interface GetStatusMessage { + type: "get_status"; +} + +interface StatusResponseMessage { + type: "status_response"; + currentMode: "default" | "acceptEdits" | "plan" | "bypassPermissions"; + lastStopReason: string | null; + isProcessing: boolean; +} + type ServerMessage = | PongMessage + | StatusMessage | IncomingMessage | ModeChangeMessage + | GetStatusMessage | WsControlResponse; type ClientMessage = | PingMessage | ResultMessage | RunStartedMessage - | ModeChangedMessage; + | ModeChangedMessage + | StatusResponseMessage; type PendingApprovalResolver = { resolve: (response: ControlResponseBody) => void; @@ -146,6 +168,10 @@ type ListenerRuntime = { controlResponseCapable: boolean; /** Stable session ID for MessageEnvelope-based emissions (scoped to runtime lifecycle). */ sessionId: string; + /** Last stop reason from completed run */ + lastStopReason: string | null; + /** Whether currently processing a message */ + isProcessing: boolean; /** Queue lifecycle tracking — parallel tracking layer, does not affect message processing. */ queueRuntime: QueueRuntime; /** Count of turns currently queued or in-flight in the promise chain. Incremented @@ -213,6 +239,8 @@ function createRuntime(): ListenerRuntime { pendingApprovalResolvers: new Map(), controlResponseCapable: false, sessionId: `listen-${crypto.randomUUID()}`, + lastStopReason: null, + isProcessing: false, pendingTurns: 0, // queueRuntime assigned below — needs runtime ref in callbacks queueRuntime: null as unknown as QueueRuntime, @@ -336,8 +364,10 @@ export function parseServerMessage( const parsed = JSON.parse(raw) as { type?: string; response?: unknown }; if ( parsed.type === "pong" || + parsed.type === "status" || parsed.type === "message" || - parsed.type === "mode_change" + parsed.type === "mode_change" || + parsed.type === "get_status" ) { return parsed as ServerMessage; } @@ -724,6 +754,12 @@ async function connectWithRetry( socket.on("message", (data: WebSocket.RawData) => { const parsed = parseServerMessage(data); + if (process.env.DEBUG) { + console.log( + `[Listen] Received message: ${JSON.stringify(parsed, null, 2)}`, + ); + } + if (!parsed) { return; } @@ -736,12 +772,41 @@ async function connectWithRetry( return; } + // Handle status updates from cloud (response to ping) + if (parsed.type === "status") { + if (runtime !== activeRuntime || runtime.intentionallyClosed) { + return; + } + + // Update runtime state from cloud's view + // Only update lastStopReason if we're not currently processing + if (!runtime.isProcessing && parsed.lastStopReason !== undefined) { + runtime.lastStopReason = parsed.lastStopReason; + } + return; + } + // Handle mode change messages immediately (not queued) if (parsed.type === "mode_change") { handleModeChange(parsed, socket); return; } + // Handle status request from cloud (immediate response) + if (parsed.type === "get_status") { + if (runtime !== activeRuntime || runtime.intentionallyClosed) { + return; + } + + sendClientMessage(socket, { + type: "status_response", + currentMode: permissionMode.getMode(), + lastStopReason: runtime.lastStopReason, + isProcessing: runtime.isProcessing, + }); + return; + } + // Handle incoming messages (queued for sequential processing) if (parsed.type === "message") { // Queue lifecycle tracking: only enqueue if first payload is a @@ -831,6 +896,20 @@ async function connectWithRetry( return; } + // 1008: Environment not found - need to re-register + if (code === 1008) { + if (process.env.DEBUG) { + console.log("[Listen] Environment not found, re-registering..."); + } + // Stop retry loop and signal that we need to re-register + if (opts.onNeedsReregister) { + opts.onNeedsReregister(); + } else { + opts.onDisconnected(); + } + return; + } + // If we had connected before, restart backoff from zero for this outage window. const nextAttempt = runtime.hasSuccessfulConnection ? 0 : attempt + 1; const nextStartTime = runtime.hasSuccessfulConnection @@ -874,6 +953,8 @@ async function handleIncomingMessage( let msgTurnCount = 0; const msgRunIds: string[] = []; + runtime.isProcessing = true; + try { // Latch capability: once seen, always use blocking path (strict check to avoid truthy strings) if (msg.supportsControlResponse === true) { @@ -881,6 +962,7 @@ async function handleIncomingMessage( } if (!agentId) { + runtime.isProcessing = false; return; } @@ -1038,6 +1120,9 @@ async function handleIncomingMessage( // Case 1: Turn ended normally if (stopReason === "end_turn") { + runtime.lastStopReason = "end_turn"; + runtime.isProcessing = false; + if (runtime.controlResponseCapable) { emitToWS(socket, { type: "result", @@ -1065,6 +1150,9 @@ async function handleIncomingMessage( // Case 2: Error or cancelled if (stopReason !== "requires_approval") { + runtime.lastStopReason = stopReason; + runtime.isProcessing = false; + emitToWS(socket, { type: "error", message: `Unexpected stop reason: ${stopReason}`, @@ -1102,6 +1190,9 @@ async function handleIncomingMessage( // Case 3: Requires approval - classify and handle based on permission mode if (approvals.length === 0) { // Unexpected: requires_approval but no approvals + runtime.lastStopReason = "error"; + runtime.isProcessing = false; + sendClientMessage(socket, { type: "result", success: false, @@ -1175,6 +1266,9 @@ async function handleIncomingMessage( if (needsUserInput.length > 0) { if (!runtime.controlResponseCapable) { // Legacy path: break out, let cloud re-enter with ApprovalCreate + runtime.lastStopReason = "requires_approval"; + runtime.isProcessing = false; + sendClientMessage(socket, { type: "result", success: false, @@ -1283,6 +1377,9 @@ async function handleIncomingMessage( ); } } catch (error) { + runtime.lastStopReason = "error"; + runtime.isProcessing = false; + const errorMessage = error instanceof Error ? error.message : String(error); emitToWS(socket, { type: "error",