diff --git a/package.json b/package.json index 9941ba4..1336921 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@letta-ai/letta-code-sdk", - "version": "0.1.6", + "version": "0.1.7", "description": "SDK for programmatic control of Letta Code CLI", "type": "module", "main": "./dist/index.js", diff --git a/src/session.ts b/src/session.ts index 814842e..4fea98a 100644 --- a/src/session.ts +++ b/src/session.ts @@ -263,112 +263,21 @@ export class Session implements AsyncDisposable { private async runBackgroundPump(): Promise { sessionLog("pump", "background pump started"); - // Buffer for accumulating tool_call_message arguments. - // The CLI streams tool_call_message in chunks with partial `arguments`. - // We accumulate args per tool_call_id and flush the complete message when - // a different message type arrives or the pump ends. + // Tool call chunks are emitted immediately (no buffering). + // Consumers that need complete arguments should accumulate rawArguments + // across chunks sharing the same toolCallId on their side. // - // IMPORTANT: The Letta API follows the OpenAI streaming format for parallel - // tool calls -- only the FIRST chunk per tool call includes `tool_call_id`. - // Subsequent chunks identify themselves via `index` only. We maintain an - // index-to-id mapping to associate argument deltas with their parent call. - const pendingToolCalls = new Map(); + // Index-to-ID mapping: The Letta API follows the OpenAI streaming format + // for parallel tool calls -- only the first chunk per tool call includes + // tool_call_id, subsequent chunks identify themselves via index only. + // We maintain a lightweight index->id map so transformMessage() can + // resolve the ID for every chunk without buffering. const indexToToolCallId = new Map(); - const mergeToolArgs = (existing: string, incoming: string): string => { - if (!incoming) return existing; - if (!existing) return incoming; - if (incoming === existing) return existing; - - // Handle cumulative chunking where the latest chunk already includes prior text. - if (incoming.startsWith(existing)) return incoming; - if (existing.endsWith(incoming)) return existing; - - // Handle delta chunking where each chunk is an append. - return `${existing}${incoming}`; - }; - - const extractToolCall = (wireMsgAny: Record) => { - const toolCalls = wireMsgAny.tool_calls as Array> | undefined; - const toolCall = wireMsgAny.tool_call as Record | undefined; - const tc = toolCalls?.[0] || toolCall; - if (!tc) return null; - - // Resolve tool_call_id: direct field, or via index mapping from first chunk - let tcId = (tc.tool_call_id as string | undefined) ?? (tc.id as string | undefined); - const tcIndex = tc.index as number | undefined; - - // Extract args from either flat format or OpenAI nested `function` format - const fnObj = tc.function as Record | undefined; - const argsText = (tc.arguments as string | undefined) ?? (fnObj?.arguments as string | undefined) ?? ""; - const toolName = (tc.name as string | undefined) ?? (fnObj?.name as string | undefined); - - if (tcId && tcIndex !== undefined) { - // First chunk for this index: establish index -> id mapping - indexToToolCallId.set(tcIndex, tcId); - } else if (!tcId && tcIndex !== undefined) { - // Subsequent chunk: look up id by index - tcId = indexToToolCallId.get(tcIndex); - } - - if (!tcId) return null; - return { - id: tcId, - name: toolName ?? "?", - args: argsText, - }; - }; - - const flushPendingToolCalls = () => { - const patchToolCallArgs = ( - tc: Record, - args: string, - ): Record => { - const fnObj = tc.function as Record | undefined; - return { - ...tc, - // Keep top-level args up to date for flat-wire consumers. - arguments: args, - // Also patch nested OpenAI function args when present. - ...(fnObj ? { function: { ...fnObj, arguments: args } } : {}), - }; - }; - - for (const [, pending] of pendingToolCalls) { - // Patch the accumulated arguments into the wire message before transforming - const patched = { ...pending.wireMsg } as Record; - const toolCalls = patched.tool_calls as Array> | undefined; - const toolCall = patched.tool_call as Record | undefined; - if (toolCalls?.[0]) { - patched.tool_calls = [ - patchToolCallArgs(toolCalls[0], pending.accumulatedArgs), - ]; - } else if (toolCall) { - patched.tool_call = patchToolCallArgs( - toolCall, - pending.accumulatedArgs, - ); - } - const sdkMsg = this.transformMessage(patched as unknown as WireMessage); - if (sdkMsg) { - this.enqueueStreamMessage(sdkMsg); - } - } - pendingToolCalls.clear(); - indexToToolCallId.clear(); - }; - for await (const wireMsg of this.transport.messages()) { const wireMsgAny = wireMsg as unknown as Record; if (wireMsg.type === "control_request") { - // Ensure tool rows/args are visible before any runtime approval callback. - if (pendingToolCalls.size > 0) { - flushPendingToolCalls(); - } const handled = await this.handleControlRequest(wireMsg as ControlRequest); if (!handled) { sessionLog("pump", `DROPPED unsupported control_request: subtype=${(wireMsgAny.request as Record)?.subtype || "N/A"}`); @@ -393,39 +302,33 @@ export class Session implements AsyncDisposable { continue; } - // Accumulate tool_call_message arguments across streaming chunks. - // The CLI sends partial args in each chunk; we concatenate and flush - // the complete message when a different type arrives. + // For tool_call_message chunks, resolve index-based IDs before transform. + // The first chunk has both tool_call_id and index; subsequent chunks have + // only index. Patch the wire message so transformMessage() always sees an ID. const messageType = wireMsgAny.message_type as string | undefined; - if (wireMsg.type === "message" && (messageType === "tool_call_message" || messageType === "approval_request_message")) { - const toolCall = extractToolCall(wireMsgAny); - if (toolCall) { - const existing = pendingToolCalls.get(toolCall.id); - if (existing) { - // Same tool_call_id: accumulate arguments - existing.accumulatedArgs = mergeToolArgs( - existing.accumulatedArgs, - toolCall.args - ); - sessionLog("pump", `tool_call args accumulated for ${toolCall.id}: +${toolCall.args.length} chars`); - } else { - // New tool_call_id: buffer it - pendingToolCalls.set(toolCall.id, { - wireMsg, - accumulatedArgs: toolCall.args, - }); - sessionLog("pump", `tool_call buffered: ${toolCall.name} id=${toolCall.id}`); - } - continue; - } - // No tool_call_id -- fall through to normal processing - } + const toolCalls = wireMsgAny.tool_calls as Array> | undefined; + const toolCall = wireMsgAny.tool_call as Record | undefined; + const tc = toolCalls?.[0] || toolCall; + if (tc) { + const fnObj = tc.function as Record | undefined; + const tcId = (tc.tool_call_id as string | undefined) ?? (tc.id as string | undefined); + const tcIndex = tc.index as number | undefined; - // Non-tool_call message: flush any pending tool calls first. - // Skip flush for stream_event to avoid premature flush of incomplete tool calls. - if (pendingToolCalls.size > 0 && wireMsg.type !== "stream_event") { - flushPendingToolCalls(); + if (tcId && tcIndex !== undefined) { + indexToToolCallId.set(tcIndex, tcId); + } else if (!tcId && tcIndex !== undefined) { + const resolvedId = indexToToolCallId.get(tcIndex); + if (resolvedId) { + // Patch the resolved ID into the wire message for transformMessage + if (fnObj) { + tc.id = resolvedId; + } else { + tc.tool_call_id = resolvedId; + } + } + } + } } const sdkMsg = this.transformMessage(wireMsg); @@ -436,11 +339,6 @@ export class Session implements AsyncDisposable { } } - // Flush any remaining buffered tool calls on pump end - if (pendingToolCalls.size > 0) { - flushPendingToolCalls(); - } - sessionLog("pump", "background pump ended"); } @@ -999,6 +897,7 @@ export class Session implements AsyncDisposable { toolCallId, toolName, toolInput, + rawArguments: toolArgs || undefined, uuid: msg.uuid, }; } diff --git a/src/tests/session.test.ts b/src/tests/session.test.ts index c96ac18..3c99abb 100644 --- a/src/tests/session.test.ts +++ b/src/tests/session.test.ts @@ -407,6 +407,7 @@ describe("Session", () => { toolCallId: "call-approval-1", toolName: "Bash", toolInput: { command: "pwd" }, + rawArguments: JSON.stringify({ command: "pwd" }), uuid: "approval-1", }); }); @@ -427,6 +428,7 @@ describe("Session", () => { toolCallId: "call-approval-2", toolName: "Read", toolInput: { raw: "path=/tmp/foo.txt" }, + rawArguments: "path=/tmp/foo.txt", uuid: "approval-2", }); }); diff --git a/src/tests/tool-call-args-accumulation.test.ts b/src/tests/tool-call-args-accumulation.test.ts index 8b79b70..0c6ca12 100644 --- a/src/tests/tool-call-args-accumulation.test.ts +++ b/src/tests/tool-call-args-accumulation.test.ts @@ -111,25 +111,13 @@ function reasoningChunk(uuid: string, text = "done"): WireMessage { } as unknown as WireMessage; } -function canUseToolRequest(requestId: string): WireMessage { - return { - type: "control_request", - request_id: requestId, - request: { - subtype: "can_use_tool", - tool_name: "Bash", - input: { command: "echo hi" }, - }, - } as unknown as WireMessage; -} - function queuedMessages(session: Session) { return ((session as unknown as { streamQueue: unknown[] }).streamQueue ?? []) as Array>; } -describe("tool call argument accumulation", () => { - test("accumulates delta chunks and parses final tool input", async () => { +describe("tool call streaming passthrough", () => { + test("emits each chunk immediately with rawArguments", async () => { const { transport } = makeFakeTransport([ toolChunk("tc-1", '{"command":"echo', "msg-1"), toolChunk("tc-1", ' hi"}', "msg-1"), @@ -143,62 +131,46 @@ describe("tool call argument accumulation", () => { .runBackgroundPump(); const msgs = queuedMessages(session); - const toolMsg = msgs.find((m) => m.type === "tool_call"); - expect(toolMsg).toBeDefined(); - expect(toolMsg?.toolInput).toEqual({ command: "echo hi" }); + const toolMsgs = msgs.filter((m) => m.type === "tool_call"); + + // Both chunks should be emitted individually (no buffering) + expect(toolMsgs.length).toBe(2); + + // First chunk has partial args + expect(toolMsgs[0]?.toolCallId).toBe("tc-1"); + expect(toolMsgs[0]?.rawArguments).toBe('{"command":"echo'); + + // Second chunk has the continuation + expect(toolMsgs[1]?.toolCallId).toBe("tc-1"); + expect(toolMsgs[1]?.rawArguments).toBe(' hi"}'); + + // Reasoning message also present + expect(msgs.some((m) => m.type === "reasoning")).toBe(true); }); - test("flushes pending tool call before control_request callback runs", async () => { - const queueSizesSeenByCallback: number[] = []; - - const canUseTool = mock(() => { - queueSizesSeenByCallback.push( - queuedMessages(session).filter((m) => m.type === "tool_call").length - ); - return { - behavior: "allow" as const, - updatedInput: null, - updatedPermissions: [], - }; - }); - - const { transport, writes } = makeFakeTransport([ - toolChunk("tc-2", '{"command":"echo', "msg-3"), - toolChunk("tc-2", ' hi"}', "msg-3"), - canUseToolRequest("can-use-1"), + test("emits single complete chunk with parsed toolInput", async () => { + const { transport } = makeFakeTransport([ + toolChunk("tc-2", '{"command":"echo hi"}', "msg-3"), reasoningChunk("msg-4"), ]); - const session = new Session({ - agentId: "agent-test", - canUseTool, - }); + const session = new Session({ agentId: "agent-test" }); (session as unknown as { transport: FakeTransport }).transport = transport; await (session as unknown as { runBackgroundPump: () => Promise }) .runBackgroundPump(); - expect(canUseTool).toHaveBeenCalledTimes(1); - expect(queueSizesSeenByCallback[0]).toBe(1); - expect( - writes.some((w) => { - const wire = w as { - type?: string; - response?: { request_id?: string; subtype?: string }; - }; - return ( - wire.type === "control_response" && - wire.response?.request_id === "can-use-1" && - wire.response?.subtype === "success" - ); - }) - ).toBe(true); + const msgs = queuedMessages(session); + const toolMsg = msgs.find((m) => m.type === "tool_call"); + expect(toolMsg).toBeDefined(); + expect(toolMsg?.toolInput).toEqual({ command: "echo hi" }); + expect(toolMsg?.rawArguments).toBe('{"command":"echo hi"}'); }); - test("handles cumulative chunks without duplicating prior arguments", async () => { + test("resolves index-only continuation chunks to correct toolCallId", async () => { const { transport } = makeFakeTransport([ - toolChunk("tc-3", '{"command":"ec', "msg-5"), - toolChunk("tc-3", '{"command":"echo hi"}', "msg-5"), + indexedToolChunk(0, '{"command":"echo', "msg-5", { toolCallId: "tc-3" }), + indexedToolChunk(0, ' hi"}', "msg-5"), reasoningChunk("msg-6"), ]); @@ -209,15 +181,23 @@ describe("tool call argument accumulation", () => { .runBackgroundPump(); const msgs = queuedMessages(session); - const toolMsg = msgs.find((m) => m.type === "tool_call"); - expect(toolMsg).toBeDefined(); - expect(toolMsg?.toolInput).toEqual({ command: "echo hi" }); + const toolMsgs = msgs.filter((m) => m.type === "tool_call"); + + // Both chunks emitted, both resolved to same toolCallId + expect(toolMsgs.length).toBe(2); + expect(toolMsgs[0]?.toolCallId).toBe("tc-3"); + expect(toolMsgs[1]?.toolCallId).toBe("tc-3"); }); - test("accumulates index-only continuation chunks after first id-bearing chunk", async () => { + test("resolves nested OpenAI function chunks with id + index continuation", async () => { const { transport } = makeFakeTransport([ - indexedToolChunk(0, '{"command":"echo', "msg-7", { toolCallId: "tc-4" }), - indexedToolChunk(0, ' hi"}', "msg-7"), + nestedFunctionChunk(1, '{"command":"echo', "msg-7", { + toolCallId: "tc-4", + toolName: "Bash", + }), + nestedFunctionChunk(1, ' hi"}', "msg-7", { + toolName: "Bash", + }), reasoningChunk("msg-8"), ]); @@ -228,20 +208,22 @@ describe("tool call argument accumulation", () => { .runBackgroundPump(); const msgs = queuedMessages(session); - const toolMsg = msgs.find((m) => m.type === "tool_call"); - expect(toolMsg).toBeDefined(); - expect(toolMsg?.toolCallId).toBe("tc-4"); - expect(toolMsg?.toolInput).toEqual({ command: "echo hi" }); + const toolMsgs = msgs.filter((m) => m.type === "tool_call"); + + expect(toolMsgs.length).toBe(2); + expect(toolMsgs[0]?.toolCallId).toBe("tc-4"); + expect(toolMsgs[0]?.toolName).toBe("Bash"); + expect(toolMsgs[1]?.toolCallId).toBe("tc-4"); }); - test("handles nested OpenAI function chunks with tool_call_id + index continuation", async () => { + test("resolves nested OpenAI function chunks keyed by id field", async () => { const { transport } = makeFakeTransport([ - nestedFunctionChunk(1, '{"command":"echo', "msg-9", { - toolCallId: "tc-5", - toolName: "Bash", + nestedFunctionChunk(2, '{"query":"hello', "msg-9", { + toolId: "call_abc123", + toolName: "web_search", }), - nestedFunctionChunk(1, ' hi"}', "msg-9", { - toolName: "Bash", + nestedFunctionChunk(2, ' world"}', "msg-9", { + toolName: "web_search", }), reasoningChunk("msg-10"), ]); @@ -253,22 +235,22 @@ describe("tool call argument accumulation", () => { .runBackgroundPump(); const msgs = queuedMessages(session); - const toolMsg = msgs.find((m) => m.type === "tool_call"); - expect(toolMsg).toBeDefined(); - expect(toolMsg?.toolCallId).toBe("tc-5"); - expect(toolMsg?.toolName).toBe("Bash"); - expect(toolMsg?.toolInput).toEqual({ command: "echo hi" }); + const toolMsgs = msgs.filter((m) => m.type === "tool_call"); + + expect(toolMsgs.length).toBe(2); + expect(toolMsgs[0]?.toolCallId).toBe("call_abc123"); + expect(toolMsgs[0]?.toolName).toBe("web_search"); + expect(toolMsgs[1]?.toolCallId).toBe("call_abc123"); }); - test("handles nested OpenAI function chunks keyed by id + index continuation", async () => { + test("parallel tool calls emit independently with correct IDs", async () => { const { transport } = makeFakeTransport([ - nestedFunctionChunk(2, '{"query":"hello', "msg-11", { - toolId: "call_abc123", - toolName: "web_search", - }), - nestedFunctionChunk(2, ' world"}', "msg-11", { - toolName: "web_search", - }), + // Tool A: first chunk + indexedToolChunk(0, '{"command":"ls"}', "msg-11", { toolCallId: "tc-A" }), + // Tool B: first chunk + indexedToolChunk(1, '{"query":"test"}', "msg-11", { toolCallId: "tc-B", toolName: "web_search" }), + // Tool A: second chunk + indexedToolChunk(0, '', "msg-11"), reasoningChunk("msg-12"), ]); @@ -279,10 +261,12 @@ describe("tool call argument accumulation", () => { .runBackgroundPump(); const msgs = queuedMessages(session); - const toolMsg = msgs.find((m) => m.type === "tool_call"); - expect(toolMsg).toBeDefined(); - expect(toolMsg?.toolCallId).toBe("call_abc123"); - expect(toolMsg?.toolName).toBe("web_search"); - expect(toolMsg?.toolInput).toEqual({ query: "hello world" }); + const toolMsgs = msgs.filter((m) => m.type === "tool_call"); + + // 3 tool_call chunks emitted (2 for A, 1 for B) + expect(toolMsgs.length).toBe(3); + expect(toolMsgs[0]?.toolCallId).toBe("tc-A"); + expect(toolMsgs[1]?.toolCallId).toBe("tc-B"); + expect(toolMsgs[2]?.toolCallId).toBe("tc-A"); // continuation via index }); }); diff --git a/src/types.ts b/src/types.ts index b0c0b37..06b6e71 100644 --- a/src/types.ts +++ b/src/types.ts @@ -460,6 +460,8 @@ export interface SDKToolCallMessage { toolCallId: string; toolName: string; toolInput: Record; + /** Raw unparsed arguments string from the wire for consumer-side accumulation. */ + rawArguments?: string; uuid: string; }