From 2fff691f5bf593453de8fc9703939acfa5bd0c5c Mon Sep 17 00:00:00 2001 From: Cameron Date: Mon, 9 Feb 2026 17:37:09 -0800 Subject: [PATCH] feat: add DEBUG_SDK diagnostic logging for session and transport layers (#29) Co-authored-by: letta-code <248085862+letta-code@users.noreply.github.com> Co-authored-by: Letta --- src/session.ts | 48 ++++++++++++++++++++++++++++++ src/transport.ts | 77 +++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 114 insertions(+), 11 deletions(-) diff --git a/src/session.ts b/src/session.ts index d5a0290..7709b8a 100644 --- a/src/session.ts +++ b/src/session.ts @@ -22,6 +22,11 @@ import type { } from "./types.js"; +// All logging gated behind DEBUG_SDK env var +function sessionLog(tag: string, ...args: unknown[]) { + if (process.env.DEBUG_SDK) console.error(`[SDK-Session] [${tag}]`, ...args); +} + export class Session implements AsyncDisposable { private transport: SubprocessTransport; private _agentId: string | null = null; @@ -29,6 +34,7 @@ export class Session implements AsyncDisposable { private _conversationId: string | null = null; private initialized = false; + constructor( private options: InternalSessionOptions = {} ) { @@ -44,7 +50,9 @@ export class Session implements AsyncDisposable { throw new Error("Session already initialized"); } + sessionLog("init", "connecting transport..."); await this.transport.connect(); + sessionLog("init", "transport connected, sending initialize request"); // Send initialize control request await this.transport.write({ @@ -54,7 +62,9 @@ export class Session implements AsyncDisposable { }); // Wait for init message + sessionLog("init", "waiting for init message from CLI..."); for await (const msg of this.transport.messages()) { + sessionLog("init", `received wire message: type=${msg.type}`); if (msg.type === "system" && "subtype" in msg && msg.subtype === "init") { const initMsg = msg as WireMessage & { agent_id: string; @@ -68,6 +78,8 @@ export class Session implements AsyncDisposable { this._conversationId = initMsg.conversation_id; this.initialized = true; + sessionLog("init", `initialized: agent=${initMsg.agent_id} conversation=${initMsg.conversation_id} model=${initMsg.model} tools=${initMsg.tools?.length || 0}`); + return { type: "init", agentId: initMsg.agent_id, @@ -79,6 +91,7 @@ export class Session implements AsyncDisposable { } } + sessionLog("init", "ERROR: transport closed before init message received"); throw new Error("Failed to initialize session - no init message received"); } @@ -100,23 +113,37 @@ export class Session implements AsyncDisposable { */ async send(message: SendMessage): Promise { if (!this.initialized) { + sessionLog("send", "auto-initializing (not yet initialized)"); await this.initialize(); } + const preview = typeof message === "string" + ? message.slice(0, 100) + : Array.isArray(message) ? `[multimodal: ${message.length} parts]` : String(message).slice(0, 100); + sessionLog("send", `sending message: ${preview}${typeof message === "string" && message.length > 100 ? "..." : ""}`); + await this.transport.write({ type: "user", message: { role: "user", content: message }, }); + sessionLog("send", "message written to transport"); } /** * Stream messages from the agent */ async *stream(): AsyncGenerator { + const streamStart = Date.now(); + let yieldCount = 0; + let dropCount = 0; + let gotResult = false; + sessionLog("stream", `starting stream (agent=${this._agentId}, conversation=${this._conversationId})`); + for await (const wireMsg of this.transport.messages()) { // Handle CLI → SDK control requests (e.g., can_use_tool) if (wireMsg.type === "control_request") { const controlReq = wireMsg as ControlRequest; + sessionLog("stream", `control_request: subtype=${controlReq.request.subtype} tool=${(controlReq.request as CanUseToolControlRequest).tool_name || "N/A"}`); if (controlReq.request.subtype === "can_use_tool") { await this.handleCanUseTool( controlReq.request_id, @@ -128,14 +155,27 @@ export class Session implements AsyncDisposable { const sdkMsg = this.transformMessage(wireMsg); if (sdkMsg) { + yieldCount++; + sessionLog("stream", `yield #${yieldCount}: type=${sdkMsg.type}${sdkMsg.type === "result" ? ` success=${(sdkMsg as SDKResultMessage).success} error=${(sdkMsg as SDKResultMessage).error || "none"}` : ""}`); yield sdkMsg; // Stop on result message if (sdkMsg.type === "result") { + gotResult = true; break; } + } else { + dropCount++; + const wireMsgAny = wireMsg as unknown as Record; + sessionLog("stream", `DROPPED wire message #${dropCount}: type=${wireMsg.type} message_type=${wireMsgAny.message_type || "N/A"} subtype=${wireMsgAny.subtype || "N/A"}`); } } + + const elapsed = Date.now() - streamStart; + sessionLog("stream", `stream ended: duration=${elapsed}ms yielded=${yieldCount} dropped=${dropCount} gotResult=${gotResult}`); + if (!gotResult) { + sessionLog("stream", `WARNING: stream ended WITHOUT a result message -- transport may have closed unexpectedly`); + } } /** @@ -147,8 +187,11 @@ export class Session implements AsyncDisposable { ): Promise { let response: CanUseToolResponse; + sessionLog("canUseTool", `tool=${req.tool_name} mode=${this.options.permissionMode || "default"} requestId=${requestId}`); + // If bypassPermissions mode, auto-allow all tools if (this.options.permissionMode === "bypassPermissions") { + sessionLog("canUseTool", `AUTO-ALLOW ${req.tool_name} (bypassPermissions)`); response = { behavior: "allow", updatedInput: null, @@ -187,6 +230,8 @@ export class Session implements AsyncDisposable { } // Send control_response (Claude SDK compatible format) + const responseBehavior = "behavior" in response ? response.behavior : "unknown"; + sessionLog("canUseTool", `responding: requestId=${requestId} behavior=${responseBehavior}`); await this.transport.write({ type: "control_response", response: { @@ -195,12 +240,14 @@ export class Session implements AsyncDisposable { response, }, }); + sessionLog("canUseTool", `response sent for ${req.tool_name}`); } /** * Abort the current operation (interrupt without closing the session) */ async abort(): Promise { + sessionLog("abort", `aborting session (agent=${this._agentId})`); await this.transport.write({ type: "control_request", request_id: `interrupt-${Date.now()}`, @@ -212,6 +259,7 @@ export class Session implements AsyncDisposable { * Close the session */ close(): void { + sessionLog("close", `closing session (agent=${this._agentId}, conversation=${this._conversationId})`); this.transport.close(); } diff --git a/src/transport.ts b/src/transport.ts index 3bdefa1..d3f3949 100644 --- a/src/transport.ts +++ b/src/transport.ts @@ -8,13 +8,20 @@ import { spawn, type ChildProcess } from "node:child_process"; import { createInterface, type Interface } from "node:readline"; import type { InternalSessionOptions, WireMessage } from "./types.js"; +// All logging gated behind DEBUG_SDK env var +function sdkLog(tag: string, ...args: unknown[]) { + if (process.env.DEBUG_SDK) console.error(`[SDK-Transport] [${tag}]`, ...args); +} + export class SubprocessTransport { private process: ChildProcess | null = null; private stdout: Interface | null = null; private messageQueue: WireMessage[] = []; - private messageResolvers: Array<(msg: WireMessage) => void> = []; + private messageResolvers: Array<(msg: WireMessage | null) => void> = []; private closed = false; private agentId?: string; + private wireMessageCount = 0; + private lastMessageAt = 0; constructor( private options: InternalSessionOptions = {} @@ -28,10 +35,10 @@ export class SubprocessTransport { // Find the CLI - use the installed letta-code package const cliPath = await this.findCli(); - if (process.env.DEBUG) { - console.log("[letta-code-sdk] Using CLI:", cliPath); - console.log("[letta-code-sdk] Args:", args.join(" ")); - } + sdkLog("connect", `CLI: ${cliPath}`); + sdkLog("connect", `args: ${args.join(" ")}`); + sdkLog("connect", `cwd: ${this.options.cwd || process.cwd()}`); + sdkLog("connect", `permissionMode: ${this.options.permissionMode || "default"}`); this.process = spawn("node", [cliPath, ...args], { cwd: this.options.cwd || process.cwd(), @@ -39,6 +46,9 @@ export class SubprocessTransport { env: { ...process.env }, }); + const pid = this.process.pid; + sdkLog("connect", `CLI process spawned, pid=${pid}`); + if (!this.process.stdout || !this.process.stdin) { throw new Error("Failed to create subprocess pipes"); } @@ -55,7 +65,8 @@ export class SubprocessTransport { const msg = JSON.parse(line) as WireMessage; this.handleMessage(msg); } catch { - // Ignore non-JSON lines (stderr leakage, etc.) + // Non-JSON line from CLI stdout - could be important debug info + sdkLog("stdout", `[non-JSON] ${line.slice(0, 500)}`); } }); @@ -70,11 +81,24 @@ export class SubprocessTransport { } // Handle process exit - this.process.on("close", (code) => { - this.closed = true; + // + // BUG FIX: When the CLI subprocess exits while read() has a pending + // resolver waiting for the next message, that resolver would never fire. + // The messages() async generator would be stuck in `await this.read()` + // forever, causing session.stream() to hang, which deadlocks the + // caller's processing mutex. Resolving pending readers with null on + // process exit lets messages() break out of its loop cleanly. + this.process.on("close", (code, signal) => { if (code !== 0 && code !== null) { console.error(`[letta-code-sdk] CLI process exited with code ${code}`); } + sdkLog("close", `CLI process exited: pid=${pid} code=${code} signal=${signal} wireMessages=${this.wireMessageCount} msSinceLastMsg=${this.lastMessageAt ? Date.now() - this.lastMessageAt : 0} pendingResolvers=${this.messageResolvers.length} queueLen=${this.messageQueue.length}`); + this.closed = true; + // Flush pending readers so they don't hang forever (see comment above) + for (const resolve of this.messageResolvers) { + resolve(null); + } + this.messageResolvers = []; }); this.process.on("error", (err) => { @@ -88,8 +112,12 @@ export class SubprocessTransport { */ async write(data: object): Promise { if (!this.process?.stdin || this.closed) { - throw new Error("Transport not connected"); + const err = new Error(`Transport not connected (closed=${this.closed}, pid=${this.process?.pid}, stdin=${!!this.process?.stdin})`); + sdkLog("write", err.message); + throw err; } + const payload = data as Record; + sdkLog("write", `type=${payload.type} subtype=${(payload.request as Record)?.subtype || (payload.response as Record)?.subtype || "N/A"}`); this.process.stdin.write(JSON.stringify(data) + "\n"); } @@ -104,10 +132,12 @@ export class SubprocessTransport { // If closed, no more messages if (this.closed) { + sdkLog("read", `returning null (closed), total wireMessages=${this.wireMessageCount}`); return null; } // Wait for next message + sdkLog("read", `waiting for next message (resolvers=${this.messageResolvers.length + 1}, queue=${this.messageQueue.length})`); return new Promise((resolve) => { this.messageResolvers.push(resolve); }); @@ -119,7 +149,10 @@ export class SubprocessTransport { async *messages(): AsyncGenerator { while (true) { const msg = await this.read(); - if (msg === null) break; + if (msg === null) { + sdkLog("messages", `iterator ending (closed=${this.closed}, wireMessages=${this.wireMessageCount})`); + break; + } yield msg; } } @@ -128,6 +161,7 @@ export class SubprocessTransport { * Close the transport */ close(): void { + sdkLog("close", `explicit close called (wireMessages=${this.wireMessageCount}, pendingResolvers=${this.messageResolvers.length}, pid=${this.process?.pid})`); if (this.process) { this.process.stdin?.end(); this.process.kill(); @@ -137,7 +171,7 @@ export class SubprocessTransport { // Resolve any pending readers with null for (const resolve of this.messageResolvers) { - resolve(null as unknown as WireMessage); + resolve(null); } this.messageResolvers = []; } @@ -147,9 +181,30 @@ export class SubprocessTransport { } private handleMessage(msg: WireMessage): void { + this.wireMessageCount++; + this.lastMessageAt = Date.now(); + + // Compact log of every wire message for traceability + const wirePayload = msg as unknown as Record; + const msgType = wirePayload.message_type || wirePayload.subtype || ""; + sdkLog("wire", `#${this.wireMessageCount} type=${msg.type} ${msgType ? `msg_type=${msgType}` : ""} resolvers=${this.messageResolvers.length} queue=${this.messageQueue.length}`); + + // Always log critical message types (result, errors, approval) + if (msg.type === "result") { + const result = wirePayload as unknown as { subtype?: string; result?: string; duration_ms?: number; stop_reason?: string }; + sdkLog("wire", `RESULT: subtype=${result.subtype} stop_reason=${result.stop_reason || "N/A"} duration=${result.duration_ms}ms resultLen=${result.result?.length || 0}`); + } + // Track agent_id from init message if (msg.type === "system" && "subtype" in msg && msg.subtype === "init") { this.agentId = (msg as unknown as { agent_id: string }).agent_id; + sdkLog("wire", `INIT: agent_id=${this.agentId}`); + } + + // Log control requests (approval flow) + if (msg.type === "control_request") { + const req = wirePayload as unknown as { request_id?: string; request?: { subtype?: string; tool_name?: string } }; + sdkLog("wire", `CONTROL_REQUEST: id=${req.request_id} subtype=${req.request?.subtype} tool=${req.request?.tool_name || "N/A"}`); } // If someone is waiting for a message, give it to them