feat: add DEBUG_SDK diagnostic logging for session and transport layers (#29)

Co-authored-by: letta-code <248085862+letta-code@users.noreply.github.com>
Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
Cameron
2026-02-09 17:37:09 -08:00
committed by GitHub
parent c23bf8ee0d
commit 2fff691f5b
2 changed files with 114 additions and 11 deletions

View File

@@ -22,6 +22,11 @@ import type {
} from "./types.js";
// All logging gated behind DEBUG_SDK env var
function sessionLog(tag: string, ...args: unknown[]) {
if (process.env.DEBUG_SDK) console.error(`[SDK-Session] [${tag}]`, ...args);
}
export class Session implements AsyncDisposable {
private transport: SubprocessTransport;
private _agentId: string | null = null;
@@ -29,6 +34,7 @@ export class Session implements AsyncDisposable {
private _conversationId: string | null = null;
private initialized = false;
constructor(
private options: InternalSessionOptions = {}
) {
@@ -44,7 +50,9 @@ export class Session implements AsyncDisposable {
throw new Error("Session already initialized");
}
sessionLog("init", "connecting transport...");
await this.transport.connect();
sessionLog("init", "transport connected, sending initialize request");
// Send initialize control request
await this.transport.write({
@@ -54,7 +62,9 @@ export class Session implements AsyncDisposable {
});
// Wait for init message
sessionLog("init", "waiting for init message from CLI...");
for await (const msg of this.transport.messages()) {
sessionLog("init", `received wire message: type=${msg.type}`);
if (msg.type === "system" && "subtype" in msg && msg.subtype === "init") {
const initMsg = msg as WireMessage & {
agent_id: string;
@@ -68,6 +78,8 @@ export class Session implements AsyncDisposable {
this._conversationId = initMsg.conversation_id;
this.initialized = true;
sessionLog("init", `initialized: agent=${initMsg.agent_id} conversation=${initMsg.conversation_id} model=${initMsg.model} tools=${initMsg.tools?.length || 0}`);
return {
type: "init",
agentId: initMsg.agent_id,
@@ -79,6 +91,7 @@ export class Session implements AsyncDisposable {
}
}
sessionLog("init", "ERROR: transport closed before init message received");
throw new Error("Failed to initialize session - no init message received");
}
@@ -100,23 +113,37 @@ export class Session implements AsyncDisposable {
*/
async send(message: SendMessage): Promise<void> {
if (!this.initialized) {
sessionLog("send", "auto-initializing (not yet initialized)");
await this.initialize();
}
const preview = typeof message === "string"
? message.slice(0, 100)
: Array.isArray(message) ? `[multimodal: ${message.length} parts]` : String(message).slice(0, 100);
sessionLog("send", `sending message: ${preview}${typeof message === "string" && message.length > 100 ? "..." : ""}`);
await this.transport.write({
type: "user",
message: { role: "user", content: message },
});
sessionLog("send", "message written to transport");
}
/**
* Stream messages from the agent
*/
async *stream(): AsyncGenerator<SDKMessage> {
const streamStart = Date.now();
let yieldCount = 0;
let dropCount = 0;
let gotResult = false;
sessionLog("stream", `starting stream (agent=${this._agentId}, conversation=${this._conversationId})`);
for await (const wireMsg of this.transport.messages()) {
// Handle CLI → SDK control requests (e.g., can_use_tool)
if (wireMsg.type === "control_request") {
const controlReq = wireMsg as ControlRequest;
sessionLog("stream", `control_request: subtype=${controlReq.request.subtype} tool=${(controlReq.request as CanUseToolControlRequest).tool_name || "N/A"}`);
if (controlReq.request.subtype === "can_use_tool") {
await this.handleCanUseTool(
controlReq.request_id,
@@ -128,14 +155,27 @@ export class Session implements AsyncDisposable {
const sdkMsg = this.transformMessage(wireMsg);
if (sdkMsg) {
yieldCount++;
sessionLog("stream", `yield #${yieldCount}: type=${sdkMsg.type}${sdkMsg.type === "result" ? ` success=${(sdkMsg as SDKResultMessage).success} error=${(sdkMsg as SDKResultMessage).error || "none"}` : ""}`);
yield sdkMsg;
// Stop on result message
if (sdkMsg.type === "result") {
gotResult = true;
break;
}
} else {
dropCount++;
const wireMsgAny = wireMsg as unknown as Record<string, unknown>;
sessionLog("stream", `DROPPED wire message #${dropCount}: type=${wireMsg.type} message_type=${wireMsgAny.message_type || "N/A"} subtype=${wireMsgAny.subtype || "N/A"}`);
}
}
const elapsed = Date.now() - streamStart;
sessionLog("stream", `stream ended: duration=${elapsed}ms yielded=${yieldCount} dropped=${dropCount} gotResult=${gotResult}`);
if (!gotResult) {
sessionLog("stream", `WARNING: stream ended WITHOUT a result message -- transport may have closed unexpectedly`);
}
}
/**
@@ -147,8 +187,11 @@ export class Session implements AsyncDisposable {
): Promise<void> {
let response: CanUseToolResponse;
sessionLog("canUseTool", `tool=${req.tool_name} mode=${this.options.permissionMode || "default"} requestId=${requestId}`);
// If bypassPermissions mode, auto-allow all tools
if (this.options.permissionMode === "bypassPermissions") {
sessionLog("canUseTool", `AUTO-ALLOW ${req.tool_name} (bypassPermissions)`);
response = {
behavior: "allow",
updatedInput: null,
@@ -187,6 +230,8 @@ export class Session implements AsyncDisposable {
}
// Send control_response (Claude SDK compatible format)
const responseBehavior = "behavior" in response ? response.behavior : "unknown";
sessionLog("canUseTool", `responding: requestId=${requestId} behavior=${responseBehavior}`);
await this.transport.write({
type: "control_response",
response: {
@@ -195,12 +240,14 @@ export class Session implements AsyncDisposable {
response,
},
});
sessionLog("canUseTool", `response sent for ${req.tool_name}`);
}
/**
* Abort the current operation (interrupt without closing the session)
*/
async abort(): Promise<void> {
sessionLog("abort", `aborting session (agent=${this._agentId})`);
await this.transport.write({
type: "control_request",
request_id: `interrupt-${Date.now()}`,
@@ -212,6 +259,7 @@ export class Session implements AsyncDisposable {
* Close the session
*/
close(): void {
sessionLog("close", `closing session (agent=${this._agentId}, conversation=${this._conversationId})`);
this.transport.close();
}

View File

@@ -8,13 +8,20 @@ import { spawn, type ChildProcess } from "node:child_process";
import { createInterface, type Interface } from "node:readline";
import type { InternalSessionOptions, WireMessage } from "./types.js";
// All logging gated behind DEBUG_SDK env var
function sdkLog(tag: string, ...args: unknown[]) {
if (process.env.DEBUG_SDK) console.error(`[SDK-Transport] [${tag}]`, ...args);
}
export class SubprocessTransport {
private process: ChildProcess | null = null;
private stdout: Interface | null = null;
private messageQueue: WireMessage[] = [];
private messageResolvers: Array<(msg: WireMessage) => void> = [];
private messageResolvers: Array<(msg: WireMessage | null) => void> = [];
private closed = false;
private agentId?: string;
private wireMessageCount = 0;
private lastMessageAt = 0;
constructor(
private options: InternalSessionOptions = {}
@@ -28,10 +35,10 @@ export class SubprocessTransport {
// Find the CLI - use the installed letta-code package
const cliPath = await this.findCli();
if (process.env.DEBUG) {
console.log("[letta-code-sdk] Using CLI:", cliPath);
console.log("[letta-code-sdk] Args:", args.join(" "));
}
sdkLog("connect", `CLI: ${cliPath}`);
sdkLog("connect", `args: ${args.join(" ")}`);
sdkLog("connect", `cwd: ${this.options.cwd || process.cwd()}`);
sdkLog("connect", `permissionMode: ${this.options.permissionMode || "default"}`);
this.process = spawn("node", [cliPath, ...args], {
cwd: this.options.cwd || process.cwd(),
@@ -39,6 +46,9 @@ export class SubprocessTransport {
env: { ...process.env },
});
const pid = this.process.pid;
sdkLog("connect", `CLI process spawned, pid=${pid}`);
if (!this.process.stdout || !this.process.stdin) {
throw new Error("Failed to create subprocess pipes");
}
@@ -55,7 +65,8 @@ export class SubprocessTransport {
const msg = JSON.parse(line) as WireMessage;
this.handleMessage(msg);
} catch {
// Ignore non-JSON lines (stderr leakage, etc.)
// Non-JSON line from CLI stdout - could be important debug info
sdkLog("stdout", `[non-JSON] ${line.slice(0, 500)}`);
}
});
@@ -70,11 +81,24 @@ export class SubprocessTransport {
}
// Handle process exit
this.process.on("close", (code) => {
this.closed = true;
//
// BUG FIX: When the CLI subprocess exits while read() has a pending
// resolver waiting for the next message, that resolver would never fire.
// The messages() async generator would be stuck in `await this.read()`
// forever, causing session.stream() to hang, which deadlocks the
// caller's processing mutex. Resolving pending readers with null on
// process exit lets messages() break out of its loop cleanly.
this.process.on("close", (code, signal) => {
if (code !== 0 && code !== null) {
console.error(`[letta-code-sdk] CLI process exited with code ${code}`);
}
sdkLog("close", `CLI process exited: pid=${pid} code=${code} signal=${signal} wireMessages=${this.wireMessageCount} msSinceLastMsg=${this.lastMessageAt ? Date.now() - this.lastMessageAt : 0} pendingResolvers=${this.messageResolvers.length} queueLen=${this.messageQueue.length}`);
this.closed = true;
// Flush pending readers so they don't hang forever (see comment above)
for (const resolve of this.messageResolvers) {
resolve(null);
}
this.messageResolvers = [];
});
this.process.on("error", (err) => {
@@ -88,8 +112,12 @@ export class SubprocessTransport {
*/
async write(data: object): Promise<void> {
if (!this.process?.stdin || this.closed) {
throw new Error("Transport not connected");
const err = new Error(`Transport not connected (closed=${this.closed}, pid=${this.process?.pid}, stdin=${!!this.process?.stdin})`);
sdkLog("write", err.message);
throw err;
}
const payload = data as Record<string, unknown>;
sdkLog("write", `type=${payload.type} subtype=${(payload.request as Record<string, unknown>)?.subtype || (payload.response as Record<string, unknown>)?.subtype || "N/A"}`);
this.process.stdin.write(JSON.stringify(data) + "\n");
}
@@ -104,10 +132,12 @@ export class SubprocessTransport {
// If closed, no more messages
if (this.closed) {
sdkLog("read", `returning null (closed), total wireMessages=${this.wireMessageCount}`);
return null;
}
// Wait for next message
sdkLog("read", `waiting for next message (resolvers=${this.messageResolvers.length + 1}, queue=${this.messageQueue.length})`);
return new Promise((resolve) => {
this.messageResolvers.push(resolve);
});
@@ -119,7 +149,10 @@ export class SubprocessTransport {
async *messages(): AsyncGenerator<WireMessage> {
while (true) {
const msg = await this.read();
if (msg === null) break;
if (msg === null) {
sdkLog("messages", `iterator ending (closed=${this.closed}, wireMessages=${this.wireMessageCount})`);
break;
}
yield msg;
}
}
@@ -128,6 +161,7 @@ export class SubprocessTransport {
* Close the transport
*/
close(): void {
sdkLog("close", `explicit close called (wireMessages=${this.wireMessageCount}, pendingResolvers=${this.messageResolvers.length}, pid=${this.process?.pid})`);
if (this.process) {
this.process.stdin?.end();
this.process.kill();
@@ -137,7 +171,7 @@ export class SubprocessTransport {
// Resolve any pending readers with null
for (const resolve of this.messageResolvers) {
resolve(null as unknown as WireMessage);
resolve(null);
}
this.messageResolvers = [];
}
@@ -147,9 +181,30 @@ export class SubprocessTransport {
}
private handleMessage(msg: WireMessage): void {
this.wireMessageCount++;
this.lastMessageAt = Date.now();
// Compact log of every wire message for traceability
const wirePayload = msg as unknown as Record<string, unknown>;
const msgType = wirePayload.message_type || wirePayload.subtype || "";
sdkLog("wire", `#${this.wireMessageCount} type=${msg.type} ${msgType ? `msg_type=${msgType}` : ""} resolvers=${this.messageResolvers.length} queue=${this.messageQueue.length}`);
// Always log critical message types (result, errors, approval)
if (msg.type === "result") {
const result = wirePayload as unknown as { subtype?: string; result?: string; duration_ms?: number; stop_reason?: string };
sdkLog("wire", `RESULT: subtype=${result.subtype} stop_reason=${result.stop_reason || "N/A"} duration=${result.duration_ms}ms resultLen=${result.result?.length || 0}`);
}
// Track agent_id from init message
if (msg.type === "system" && "subtype" in msg && msg.subtype === "init") {
this.agentId = (msg as unknown as { agent_id: string }).agent_id;
sdkLog("wire", `INIT: agent_id=${this.agentId}`);
}
// Log control requests (approval flow)
if (msg.type === "control_request") {
const req = wirePayload as unknown as { request_id?: string; request?: { subtype?: string; tool_name?: string } };
sdkLog("wire", `CONTROL_REQUEST: id=${req.request_id} subtype=${req.request?.subtype} tool=${req.request?.tool_name || "N/A"}`);
}
// If someone is waiting for a message, give it to them