feat: add --debug flag for plain-text logging (#1215)
Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
@@ -36,6 +36,26 @@ function PromptEnvName(props: {
|
||||
);
|
||||
}
|
||||
|
||||
function formatTimestamp(): string {
|
||||
const now = new Date();
|
||||
const h = String(now.getHours()).padStart(2, "0");
|
||||
const m = String(now.getMinutes()).padStart(2, "0");
|
||||
const s = String(now.getSeconds()).padStart(2, "0");
|
||||
const ms = String(now.getMilliseconds()).padStart(3, "0");
|
||||
return `${h}:${m}:${s}.${ms}`;
|
||||
}
|
||||
|
||||
function debugWsLogger(
|
||||
direction: "send" | "recv",
|
||||
label: "client" | "protocol" | "control" | "lifecycle",
|
||||
event: unknown,
|
||||
): void {
|
||||
const arrow = direction === "send" ? "\u2192 send" : "\u2190 recv";
|
||||
const tag = label === "client" ? "" : ` (${label})`;
|
||||
const json = JSON.stringify(event);
|
||||
console.log(`[${formatTimestamp()}] ${arrow}${tag} ${json}`);
|
||||
}
|
||||
|
||||
export async function runListenSubcommand(argv: string[]): Promise<number> {
|
||||
// Parse arguments
|
||||
const { values } = parseArgs({
|
||||
@@ -43,13 +63,16 @@ export async function runListenSubcommand(argv: string[]): Promise<number> {
|
||||
options: {
|
||||
envName: { type: "string" },
|
||||
help: { type: "boolean", short: "h" },
|
||||
debug: { type: "boolean" },
|
||||
},
|
||||
allowPositionals: false,
|
||||
});
|
||||
|
||||
const debugMode = !!values.debug;
|
||||
|
||||
// Show help
|
||||
if (values.help) {
|
||||
console.log("Usage: letta listen [--env-name <name>]\n");
|
||||
console.log("Usage: letta remote [--env-name <name>] [--debug]\n");
|
||||
console.log(
|
||||
"Register this letta-code instance to receive messages from Letta Cloud.\n",
|
||||
);
|
||||
@@ -57,12 +80,16 @@ export async function runListenSubcommand(argv: string[]): Promise<number> {
|
||||
console.log(
|
||||
" --env-name <name> Friendly name for this environment (uses hostname if not provided)",
|
||||
);
|
||||
console.log(
|
||||
" --debug Plain-text mode: log all WebSocket events instead of interactive UI",
|
||||
);
|
||||
console.log(" -h, --help Show this help message\n");
|
||||
console.log("Examples:");
|
||||
console.log(
|
||||
" letta listen # Uses hostname as default",
|
||||
" letta remote # Uses hostname as default",
|
||||
);
|
||||
console.log(' letta listen --env-name "work-laptop"\n');
|
||||
console.log(' letta remote --env-name "work-laptop"');
|
||||
console.log(" letta remote --debug # Log all WS events\n");
|
||||
console.log(
|
||||
"Once connected, this instance will listen for incoming messages from cloud agents.",
|
||||
);
|
||||
@@ -89,6 +116,10 @@ export async function runListenSubcommand(argv: string[]): Promise<number> {
|
||||
if (savedName) {
|
||||
// Reuse saved name
|
||||
connectionName = savedName;
|
||||
} else if (debugMode) {
|
||||
// In debug mode, default to hostname without prompting
|
||||
connectionName = hostname();
|
||||
settingsManager.setListenerEnvName(connectionName);
|
||||
} else {
|
||||
// No saved name - prompt user
|
||||
connectionName = await new Promise<string>((resolve) => {
|
||||
@@ -125,6 +156,12 @@ export async function runListenSubcommand(argv: string[]): Promise<number> {
|
||||
const serverUrl = getServerUrl();
|
||||
const registerUrl = `${serverUrl}/v1/environments/register`;
|
||||
|
||||
if (debugMode) {
|
||||
console.log(`[${formatTimestamp()}] Registering with ${registerUrl}`);
|
||||
console.log(`[${formatTimestamp()}] deviceId: ${deviceId}`);
|
||||
console.log(`[${formatTimestamp()}] connectionName: ${connectionName}`);
|
||||
}
|
||||
|
||||
const registerResponse = await fetch(registerUrl, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
@@ -149,62 +186,103 @@ export async function runListenSubcommand(argv: string[]): Promise<number> {
|
||||
wsUrl: string;
|
||||
};
|
||||
|
||||
// Clear screen and render Ink UI
|
||||
console.clear();
|
||||
|
||||
let updateStatusCallback:
|
||||
| ((status: "idle" | "receiving" | "processing") => void)
|
||||
| null = null;
|
||||
let updateRetryStatusCallback:
|
||||
| ((attempt: number, nextRetryIn: number) => void)
|
||||
| null = null;
|
||||
let clearRetryStatusCallback: (() => void) | null = null;
|
||||
|
||||
const { unmount } = render(
|
||||
<ListenerStatusUI
|
||||
connectionId={connectionId}
|
||||
envName={connectionName}
|
||||
onReady={(callbacks) => {
|
||||
updateStatusCallback = callbacks.updateStatus;
|
||||
updateRetryStatusCallback = callbacks.updateRetryStatus;
|
||||
clearRetryStatusCallback = callbacks.clearRetryStatus;
|
||||
}}
|
||||
/>,
|
||||
);
|
||||
if (debugMode) {
|
||||
console.log(`[${formatTimestamp()}] Registered successfully`);
|
||||
console.log(`[${formatTimestamp()}] connectionId: ${connectionId}`);
|
||||
console.log(`[${formatTimestamp()}] wsUrl: ${wsUrl}`);
|
||||
console.log(`[${formatTimestamp()}] Connecting WebSocket...`);
|
||||
console.log("");
|
||||
}
|
||||
|
||||
// Import and start WebSocket client
|
||||
const { startListenerClient } = await import(
|
||||
"../../websocket/listen-client"
|
||||
);
|
||||
|
||||
await startListenerClient({
|
||||
connectionId,
|
||||
wsUrl,
|
||||
deviceId,
|
||||
connectionName,
|
||||
onStatusChange: (status) => {
|
||||
clearRetryStatusCallback?.();
|
||||
updateStatusCallback?.(status);
|
||||
},
|
||||
onConnected: () => {
|
||||
clearRetryStatusCallback?.();
|
||||
updateStatusCallback?.("idle");
|
||||
},
|
||||
onRetrying: (attempt, _maxAttempts, nextRetryIn) => {
|
||||
updateRetryStatusCallback?.(attempt, nextRetryIn);
|
||||
},
|
||||
onDisconnected: () => {
|
||||
unmount();
|
||||
console.log("\n✗ Listener disconnected");
|
||||
console.log("Connection to Letta Cloud was lost.\n");
|
||||
process.exit(1);
|
||||
},
|
||||
onError: (error: Error) => {
|
||||
unmount();
|
||||
console.error(`\n✗ Listener error: ${error.message}\n`);
|
||||
process.exit(1);
|
||||
},
|
||||
});
|
||||
if (debugMode) {
|
||||
// Debug mode: plain-text event logging, no Ink UI
|
||||
await startListenerClient({
|
||||
connectionId,
|
||||
wsUrl,
|
||||
deviceId,
|
||||
connectionName,
|
||||
onWsEvent: debugWsLogger,
|
||||
onStatusChange: (status) => {
|
||||
console.log(`[${formatTimestamp()}] status: ${status}`);
|
||||
},
|
||||
onConnected: () => {
|
||||
console.log(
|
||||
`[${formatTimestamp()}] Connected. Awaiting instructions.`,
|
||||
);
|
||||
console.log("");
|
||||
},
|
||||
onRetrying: (attempt, _maxAttempts, nextRetryIn) => {
|
||||
console.log(
|
||||
`[${formatTimestamp()}] Reconnecting (attempt ${attempt}, retry in ${Math.round(nextRetryIn / 1000)}s)`,
|
||||
);
|
||||
},
|
||||
onDisconnected: () => {
|
||||
console.log(`[${formatTimestamp()}] Disconnected.`);
|
||||
process.exit(1);
|
||||
},
|
||||
onError: (error: Error) => {
|
||||
console.error(`[${formatTimestamp()}] Error: ${error.message}`);
|
||||
process.exit(1);
|
||||
},
|
||||
});
|
||||
} else {
|
||||
// Normal mode: interactive Ink UI
|
||||
console.clear();
|
||||
|
||||
let updateStatusCallback:
|
||||
| ((status: "idle" | "receiving" | "processing") => void)
|
||||
| null = null;
|
||||
let updateRetryStatusCallback:
|
||||
| ((attempt: number, nextRetryIn: number) => void)
|
||||
| null = null;
|
||||
let clearRetryStatusCallback: (() => void) | null = null;
|
||||
|
||||
const { unmount } = render(
|
||||
<ListenerStatusUI
|
||||
connectionId={connectionId}
|
||||
envName={connectionName}
|
||||
onReady={(callbacks) => {
|
||||
updateStatusCallback = callbacks.updateStatus;
|
||||
updateRetryStatusCallback = callbacks.updateRetryStatus;
|
||||
clearRetryStatusCallback = callbacks.clearRetryStatus;
|
||||
}}
|
||||
/>,
|
||||
);
|
||||
|
||||
await startListenerClient({
|
||||
connectionId,
|
||||
wsUrl,
|
||||
deviceId,
|
||||
connectionName,
|
||||
onStatusChange: (status) => {
|
||||
clearRetryStatusCallback?.();
|
||||
updateStatusCallback?.(status);
|
||||
},
|
||||
onConnected: () => {
|
||||
clearRetryStatusCallback?.();
|
||||
updateStatusCallback?.("idle");
|
||||
},
|
||||
onRetrying: (attempt, _maxAttempts, nextRetryIn) => {
|
||||
updateRetryStatusCallback?.(attempt, nextRetryIn);
|
||||
},
|
||||
onDisconnected: () => {
|
||||
unmount();
|
||||
console.log("\n\u2717 Listener disconnected");
|
||||
console.log("Connection to Letta Cloud was lost.\n");
|
||||
process.exit(1);
|
||||
},
|
||||
onError: (error: Error) => {
|
||||
unmount();
|
||||
console.error(`\n\u2717 Listener error: ${error.message}\n`);
|
||||
process.exit(1);
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
// Keep process alive
|
||||
return new Promise<number>(() => {
|
||||
|
||||
@@ -72,6 +72,12 @@ interface StartListenerOptions {
|
||||
nextRetryIn: number,
|
||||
connectionId: string,
|
||||
) => void;
|
||||
/** Debug hook: called for every WS frame sent or received. */
|
||||
onWsEvent?: (
|
||||
direction: "send" | "recv",
|
||||
label: "client" | "protocol" | "control" | "lifecycle",
|
||||
event: unknown,
|
||||
) => void;
|
||||
}
|
||||
|
||||
interface PingMessage {
|
||||
@@ -177,6 +183,8 @@ type ListenerRuntime = {
|
||||
/** Count of turns currently queued or in-flight in the promise chain. Incremented
|
||||
* synchronously on message arrival (before .then()) to avoid async scheduling races. */
|
||||
pendingTurns: number;
|
||||
/** Optional debug hook for WS event logging. */
|
||||
onWsEvent?: StartListenerOptions["onWsEvent"];
|
||||
};
|
||||
|
||||
type ApprovalSlot =
|
||||
@@ -383,8 +391,22 @@ export function parseServerMessage(
|
||||
}
|
||||
}
|
||||
|
||||
/** Fire onWsEvent without risking transport disruption. */
|
||||
function safeEmitWsEvent(
|
||||
direction: "send" | "recv",
|
||||
label: "client" | "protocol" | "control" | "lifecycle",
|
||||
event: unknown,
|
||||
): void {
|
||||
try {
|
||||
activeRuntime?.onWsEvent?.(direction, label, event);
|
||||
} catch {
|
||||
// Debug hook must never break transport flow.
|
||||
}
|
||||
}
|
||||
|
||||
function sendClientMessage(socket: WebSocket, payload: ClientMessage): void {
|
||||
if (socket.readyState === WebSocket.OPEN) {
|
||||
safeEmitWsEvent("send", "client", payload);
|
||||
socket.send(JSON.stringify(payload));
|
||||
}
|
||||
}
|
||||
@@ -395,6 +417,7 @@ function sendControlMessageOverWebSocket(
|
||||
): void {
|
||||
// Central hook for protocol-only outbound WS messages so future
|
||||
// filtering/mutation can be added without touching approval flow.
|
||||
safeEmitWsEvent("send", "control", payload);
|
||||
socket.send(JSON.stringify(payload));
|
||||
}
|
||||
|
||||
@@ -419,6 +442,7 @@ export type WsProtocolEvent =
|
||||
*/
|
||||
function emitToWS(socket: WebSocket, event: WsProtocolEvent): void {
|
||||
if (socket.readyState === WebSocket.OPEN) {
|
||||
safeEmitWsEvent("send", "protocol", event);
|
||||
socket.send(JSON.stringify(event));
|
||||
}
|
||||
}
|
||||
@@ -661,6 +685,7 @@ export async function startListenerClient(
|
||||
}
|
||||
|
||||
const runtime = createRuntime();
|
||||
runtime.onWsEvent = opts.onWsEvent;
|
||||
activeRuntime = runtime;
|
||||
|
||||
await connectWithRetry(runtime, opts);
|
||||
@@ -737,6 +762,7 @@ async function connectWithRetry(
|
||||
return;
|
||||
}
|
||||
|
||||
safeEmitWsEvent("recv", "lifecycle", { type: "_ws_open" });
|
||||
runtime.hasSuccessfulConnection = true;
|
||||
opts.onConnected(opts.connectionId);
|
||||
|
||||
@@ -753,7 +779,17 @@ async function connectWithRetry(
|
||||
});
|
||||
|
||||
socket.on("message", (data: WebSocket.RawData) => {
|
||||
const raw = data.toString();
|
||||
const parsed = parseServerMessage(data);
|
||||
if (parsed) {
|
||||
safeEmitWsEvent("recv", "client", parsed);
|
||||
} else {
|
||||
// Log unparseable frames so protocol drift is visible in debug mode
|
||||
safeEmitWsEvent("recv", "lifecycle", {
|
||||
type: "_ws_unparseable",
|
||||
raw,
|
||||
});
|
||||
}
|
||||
if (process.env.DEBUG) {
|
||||
console.log(
|
||||
`[Listen] Received message: ${JSON.stringify(parsed, null, 2)}`,
|
||||
@@ -877,6 +913,12 @@ async function connectWithRetry(
|
||||
return;
|
||||
}
|
||||
|
||||
safeEmitWsEvent("recv", "lifecycle", {
|
||||
type: "_ws_close",
|
||||
code,
|
||||
reason: reason.toString(),
|
||||
});
|
||||
|
||||
// Single authoritative queue_cleared emission for all close paths
|
||||
// (intentional and unintentional). Must fire before early returns.
|
||||
runtime.queueRuntime.clear("shutdown");
|
||||
@@ -925,6 +967,10 @@ async function connectWithRetry(
|
||||
});
|
||||
|
||||
socket.on("error", (error: Error) => {
|
||||
safeEmitWsEvent("recv", "lifecycle", {
|
||||
type: "_ws_error",
|
||||
message: error.message,
|
||||
});
|
||||
if (process.env.DEBUG) {
|
||||
console.error("[Listen] WebSocket error:", error);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user