diff --git a/src/cli/subcommands/listen.tsx b/src/cli/subcommands/listen.tsx index 61d59c3..d74793f 100644 --- a/src/cli/subcommands/listen.tsx +++ b/src/cli/subcommands/listen.tsx @@ -36,6 +36,26 @@ function PromptEnvName(props: { ); } +function formatTimestamp(): string { + const now = new Date(); + const h = String(now.getHours()).padStart(2, "0"); + const m = String(now.getMinutes()).padStart(2, "0"); + const s = String(now.getSeconds()).padStart(2, "0"); + const ms = String(now.getMilliseconds()).padStart(3, "0"); + return `${h}:${m}:${s}.${ms}`; +} + +function debugWsLogger( + direction: "send" | "recv", + label: "client" | "protocol" | "control" | "lifecycle", + event: unknown, +): void { + const arrow = direction === "send" ? "\u2192 send" : "\u2190 recv"; + const tag = label === "client" ? "" : ` (${label})`; + const json = JSON.stringify(event); + console.log(`[${formatTimestamp()}] ${arrow}${tag} ${json}`); +} + export async function runListenSubcommand(argv: string[]): Promise { // Parse arguments const { values } = parseArgs({ @@ -43,13 +63,16 @@ export async function runListenSubcommand(argv: string[]): Promise { options: { envName: { type: "string" }, help: { type: "boolean", short: "h" }, + debug: { type: "boolean" }, }, allowPositionals: false, }); + const debugMode = !!values.debug; + // Show help if (values.help) { - console.log("Usage: letta listen [--env-name ]\n"); + console.log("Usage: letta remote [--env-name ] [--debug]\n"); console.log( "Register this letta-code instance to receive messages from Letta Cloud.\n", ); @@ -57,12 +80,16 @@ export async function runListenSubcommand(argv: string[]): Promise { console.log( " --env-name Friendly name for this environment (uses hostname if not provided)", ); + console.log( + " --debug Plain-text mode: log all WebSocket events instead of interactive UI", + ); console.log(" -h, --help Show this help message\n"); console.log("Examples:"); console.log( - " letta listen # Uses hostname as default", + " letta remote # Uses hostname as default", ); - console.log(' letta listen --env-name "work-laptop"\n'); + console.log(' letta remote --env-name "work-laptop"'); + console.log(" letta remote --debug # Log all WS events\n"); console.log( "Once connected, this instance will listen for incoming messages from cloud agents.", ); @@ -89,6 +116,10 @@ export async function runListenSubcommand(argv: string[]): Promise { if (savedName) { // Reuse saved name connectionName = savedName; + } else if (debugMode) { + // In debug mode, default to hostname without prompting + connectionName = hostname(); + settingsManager.setListenerEnvName(connectionName); } else { // No saved name - prompt user connectionName = await new Promise((resolve) => { @@ -125,6 +156,12 @@ export async function runListenSubcommand(argv: string[]): Promise { const serverUrl = getServerUrl(); const registerUrl = `${serverUrl}/v1/environments/register`; + if (debugMode) { + console.log(`[${formatTimestamp()}] Registering with ${registerUrl}`); + console.log(`[${formatTimestamp()}] deviceId: ${deviceId}`); + console.log(`[${formatTimestamp()}] connectionName: ${connectionName}`); + } + const registerResponse = await fetch(registerUrl, { method: "POST", headers: { @@ -149,62 +186,103 @@ export async function runListenSubcommand(argv: string[]): Promise { wsUrl: string; }; - // Clear screen and render Ink UI - console.clear(); - - let updateStatusCallback: - | ((status: "idle" | "receiving" | "processing") => void) - | null = null; - let updateRetryStatusCallback: - | ((attempt: number, nextRetryIn: number) => void) - | null = null; - let clearRetryStatusCallback: (() => void) | null = null; - - const { unmount } = render( - { - updateStatusCallback = callbacks.updateStatus; - updateRetryStatusCallback = callbacks.updateRetryStatus; - clearRetryStatusCallback = callbacks.clearRetryStatus; - }} - />, - ); + if (debugMode) { + console.log(`[${formatTimestamp()}] Registered successfully`); + console.log(`[${formatTimestamp()}] connectionId: ${connectionId}`); + console.log(`[${formatTimestamp()}] wsUrl: ${wsUrl}`); + console.log(`[${formatTimestamp()}] Connecting WebSocket...`); + console.log(""); + } // Import and start WebSocket client const { startListenerClient } = await import( "../../websocket/listen-client" ); - await startListenerClient({ - connectionId, - wsUrl, - deviceId, - connectionName, - onStatusChange: (status) => { - clearRetryStatusCallback?.(); - updateStatusCallback?.(status); - }, - onConnected: () => { - clearRetryStatusCallback?.(); - updateStatusCallback?.("idle"); - }, - onRetrying: (attempt, _maxAttempts, nextRetryIn) => { - updateRetryStatusCallback?.(attempt, nextRetryIn); - }, - onDisconnected: () => { - unmount(); - console.log("\n✗ Listener disconnected"); - console.log("Connection to Letta Cloud was lost.\n"); - process.exit(1); - }, - onError: (error: Error) => { - unmount(); - console.error(`\n✗ Listener error: ${error.message}\n`); - process.exit(1); - }, - }); + if (debugMode) { + // Debug mode: plain-text event logging, no Ink UI + await startListenerClient({ + connectionId, + wsUrl, + deviceId, + connectionName, + onWsEvent: debugWsLogger, + onStatusChange: (status) => { + console.log(`[${formatTimestamp()}] status: ${status}`); + }, + onConnected: () => { + console.log( + `[${formatTimestamp()}] Connected. Awaiting instructions.`, + ); + console.log(""); + }, + onRetrying: (attempt, _maxAttempts, nextRetryIn) => { + console.log( + `[${formatTimestamp()}] Reconnecting (attempt ${attempt}, retry in ${Math.round(nextRetryIn / 1000)}s)`, + ); + }, + onDisconnected: () => { + console.log(`[${formatTimestamp()}] Disconnected.`); + process.exit(1); + }, + onError: (error: Error) => { + console.error(`[${formatTimestamp()}] Error: ${error.message}`); + process.exit(1); + }, + }); + } else { + // Normal mode: interactive Ink UI + console.clear(); + + let updateStatusCallback: + | ((status: "idle" | "receiving" | "processing") => void) + | null = null; + let updateRetryStatusCallback: + | ((attempt: number, nextRetryIn: number) => void) + | null = null; + let clearRetryStatusCallback: (() => void) | null = null; + + const { unmount } = render( + { + updateStatusCallback = callbacks.updateStatus; + updateRetryStatusCallback = callbacks.updateRetryStatus; + clearRetryStatusCallback = callbacks.clearRetryStatus; + }} + />, + ); + + await startListenerClient({ + connectionId, + wsUrl, + deviceId, + connectionName, + onStatusChange: (status) => { + clearRetryStatusCallback?.(); + updateStatusCallback?.(status); + }, + onConnected: () => { + clearRetryStatusCallback?.(); + updateStatusCallback?.("idle"); + }, + onRetrying: (attempt, _maxAttempts, nextRetryIn) => { + updateRetryStatusCallback?.(attempt, nextRetryIn); + }, + onDisconnected: () => { + unmount(); + console.log("\n\u2717 Listener disconnected"); + console.log("Connection to Letta Cloud was lost.\n"); + process.exit(1); + }, + onError: (error: Error) => { + unmount(); + console.error(`\n\u2717 Listener error: ${error.message}\n`); + process.exit(1); + }, + }); + } // Keep process alive return new Promise(() => { diff --git a/src/websocket/listen-client.ts b/src/websocket/listen-client.ts index 286be14..831c082 100644 --- a/src/websocket/listen-client.ts +++ b/src/websocket/listen-client.ts @@ -72,6 +72,12 @@ interface StartListenerOptions { nextRetryIn: number, connectionId: string, ) => void; + /** Debug hook: called for every WS frame sent or received. */ + onWsEvent?: ( + direction: "send" | "recv", + label: "client" | "protocol" | "control" | "lifecycle", + event: unknown, + ) => void; } interface PingMessage { @@ -177,6 +183,8 @@ type ListenerRuntime = { /** Count of turns currently queued or in-flight in the promise chain. Incremented * synchronously on message arrival (before .then()) to avoid async scheduling races. */ pendingTurns: number; + /** Optional debug hook for WS event logging. */ + onWsEvent?: StartListenerOptions["onWsEvent"]; }; type ApprovalSlot = @@ -383,8 +391,22 @@ export function parseServerMessage( } } +/** Fire onWsEvent without risking transport disruption. */ +function safeEmitWsEvent( + direction: "send" | "recv", + label: "client" | "protocol" | "control" | "lifecycle", + event: unknown, +): void { + try { + activeRuntime?.onWsEvent?.(direction, label, event); + } catch { + // Debug hook must never break transport flow. + } +} + function sendClientMessage(socket: WebSocket, payload: ClientMessage): void { if (socket.readyState === WebSocket.OPEN) { + safeEmitWsEvent("send", "client", payload); socket.send(JSON.stringify(payload)); } } @@ -395,6 +417,7 @@ function sendControlMessageOverWebSocket( ): void { // Central hook for protocol-only outbound WS messages so future // filtering/mutation can be added without touching approval flow. + safeEmitWsEvent("send", "control", payload); socket.send(JSON.stringify(payload)); } @@ -419,6 +442,7 @@ export type WsProtocolEvent = */ function emitToWS(socket: WebSocket, event: WsProtocolEvent): void { if (socket.readyState === WebSocket.OPEN) { + safeEmitWsEvent("send", "protocol", event); socket.send(JSON.stringify(event)); } } @@ -661,6 +685,7 @@ export async function startListenerClient( } const runtime = createRuntime(); + runtime.onWsEvent = opts.onWsEvent; activeRuntime = runtime; await connectWithRetry(runtime, opts); @@ -737,6 +762,7 @@ async function connectWithRetry( return; } + safeEmitWsEvent("recv", "lifecycle", { type: "_ws_open" }); runtime.hasSuccessfulConnection = true; opts.onConnected(opts.connectionId); @@ -753,7 +779,17 @@ async function connectWithRetry( }); socket.on("message", (data: WebSocket.RawData) => { + const raw = data.toString(); const parsed = parseServerMessage(data); + if (parsed) { + safeEmitWsEvent("recv", "client", parsed); + } else { + // Log unparseable frames so protocol drift is visible in debug mode + safeEmitWsEvent("recv", "lifecycle", { + type: "_ws_unparseable", + raw, + }); + } if (process.env.DEBUG) { console.log( `[Listen] Received message: ${JSON.stringify(parsed, null, 2)}`, @@ -877,6 +913,12 @@ async function connectWithRetry( return; } + safeEmitWsEvent("recv", "lifecycle", { + type: "_ws_close", + code, + reason: reason.toString(), + }); + // Single authoritative queue_cleared emission for all close paths // (intentional and unintentional). Must fire before early returns. runtime.queueRuntime.clear("shutdown"); @@ -925,6 +967,10 @@ async function connectWithRetry( }); socket.on("error", (error: Error) => { + safeEmitWsEvent("recv", "lifecycle", { + type: "_ws_error", + message: error.message, + }); if (process.env.DEBUG) { console.error("[Listen] WebSocket error:", error); }