fix: remove tool call accumulation from SDK, emit chunks as passthrough (#61)

This commit is contained in:
Cameron
2026-02-24 15:58:07 -08:00
committed by GitHub
parent 972bab33c2
commit 1333c5836c
5 changed files with 113 additions and 226 deletions

View File

@@ -1,6 +1,6 @@
{
"name": "@letta-ai/letta-code-sdk",
"version": "0.1.6",
"version": "0.1.7",
"description": "SDK for programmatic control of Letta Code CLI",
"type": "module",
"main": "./dist/index.js",

View File

@@ -263,112 +263,21 @@ export class Session implements AsyncDisposable {
private async runBackgroundPump(): Promise<void> {
sessionLog("pump", "background pump started");
// Buffer for accumulating tool_call_message arguments.
// The CLI streams tool_call_message in chunks with partial `arguments`.
// We accumulate args per tool_call_id and flush the complete message when
// a different message type arrives or the pump ends.
// Tool call chunks are emitted immediately (no buffering).
// Consumers that need complete arguments should accumulate rawArguments
// across chunks sharing the same toolCallId on their side.
//
// IMPORTANT: The Letta API follows the OpenAI streaming format for parallel
// tool calls -- only the FIRST chunk per tool call includes `tool_call_id`.
// Subsequent chunks identify themselves via `index` only. We maintain an
// index-to-id mapping to associate argument deltas with their parent call.
const pendingToolCalls = new Map<string, {
wireMsg: WireMessage;
accumulatedArgs: string;
}>();
// Index-to-ID mapping: The Letta API follows the OpenAI streaming format
// for parallel tool calls -- only the first chunk per tool call includes
// tool_call_id, subsequent chunks identify themselves via index only.
// We maintain a lightweight index->id map so transformMessage() can
// resolve the ID for every chunk without buffering.
const indexToToolCallId = new Map<number, string>();
const mergeToolArgs = (existing: string, incoming: string): string => {
if (!incoming) return existing;
if (!existing) return incoming;
if (incoming === existing) return existing;
// Handle cumulative chunking where the latest chunk already includes prior text.
if (incoming.startsWith(existing)) return incoming;
if (existing.endsWith(incoming)) return existing;
// Handle delta chunking where each chunk is an append.
return `${existing}${incoming}`;
};
const extractToolCall = (wireMsgAny: Record<string, unknown>) => {
const toolCalls = wireMsgAny.tool_calls as Array<Record<string, unknown>> | undefined;
const toolCall = wireMsgAny.tool_call as Record<string, unknown> | undefined;
const tc = toolCalls?.[0] || toolCall;
if (!tc) return null;
// Resolve tool_call_id: direct field, or via index mapping from first chunk
let tcId = (tc.tool_call_id as string | undefined) ?? (tc.id as string | undefined);
const tcIndex = tc.index as number | undefined;
// Extract args from either flat format or OpenAI nested `function` format
const fnObj = tc.function as Record<string, unknown> | undefined;
const argsText = (tc.arguments as string | undefined) ?? (fnObj?.arguments as string | undefined) ?? "";
const toolName = (tc.name as string | undefined) ?? (fnObj?.name as string | undefined);
if (tcId && tcIndex !== undefined) {
// First chunk for this index: establish index -> id mapping
indexToToolCallId.set(tcIndex, tcId);
} else if (!tcId && tcIndex !== undefined) {
// Subsequent chunk: look up id by index
tcId = indexToToolCallId.get(tcIndex);
}
if (!tcId) return null;
return {
id: tcId,
name: toolName ?? "?",
args: argsText,
};
};
const flushPendingToolCalls = () => {
const patchToolCallArgs = (
tc: Record<string, unknown>,
args: string,
): Record<string, unknown> => {
const fnObj = tc.function as Record<string, unknown> | undefined;
return {
...tc,
// Keep top-level args up to date for flat-wire consumers.
arguments: args,
// Also patch nested OpenAI function args when present.
...(fnObj ? { function: { ...fnObj, arguments: args } } : {}),
};
};
for (const [, pending] of pendingToolCalls) {
// Patch the accumulated arguments into the wire message before transforming
const patched = { ...pending.wireMsg } as Record<string, unknown>;
const toolCalls = patched.tool_calls as Array<Record<string, unknown>> | undefined;
const toolCall = patched.tool_call as Record<string, unknown> | undefined;
if (toolCalls?.[0]) {
patched.tool_calls = [
patchToolCallArgs(toolCalls[0], pending.accumulatedArgs),
];
} else if (toolCall) {
patched.tool_call = patchToolCallArgs(
toolCall,
pending.accumulatedArgs,
);
}
const sdkMsg = this.transformMessage(patched as unknown as WireMessage);
if (sdkMsg) {
this.enqueueStreamMessage(sdkMsg);
}
}
pendingToolCalls.clear();
indexToToolCallId.clear();
};
for await (const wireMsg of this.transport.messages()) {
const wireMsgAny = wireMsg as unknown as Record<string, unknown>;
if (wireMsg.type === "control_request") {
// Ensure tool rows/args are visible before any runtime approval callback.
if (pendingToolCalls.size > 0) {
flushPendingToolCalls();
}
const handled = await this.handleControlRequest(wireMsg as ControlRequest);
if (!handled) {
sessionLog("pump", `DROPPED unsupported control_request: subtype=${(wireMsgAny.request as Record<string, unknown>)?.subtype || "N/A"}`);
@@ -393,39 +302,33 @@ export class Session implements AsyncDisposable {
continue;
}
// Accumulate tool_call_message arguments across streaming chunks.
// The CLI sends partial args in each chunk; we concatenate and flush
// the complete message when a different type arrives.
// For tool_call_message chunks, resolve index-based IDs before transform.
// The first chunk has both tool_call_id and index; subsequent chunks have
// only index. Patch the wire message so transformMessage() always sees an ID.
const messageType = wireMsgAny.message_type as string | undefined;
if (wireMsg.type === "message" && (messageType === "tool_call_message" || messageType === "approval_request_message")) {
const toolCall = extractToolCall(wireMsgAny);
if (toolCall) {
const existing = pendingToolCalls.get(toolCall.id);
if (existing) {
// Same tool_call_id: accumulate arguments
existing.accumulatedArgs = mergeToolArgs(
existing.accumulatedArgs,
toolCall.args
);
sessionLog("pump", `tool_call args accumulated for ${toolCall.id}: +${toolCall.args.length} chars`);
} else {
// New tool_call_id: buffer it
pendingToolCalls.set(toolCall.id, {
wireMsg,
accumulatedArgs: toolCall.args,
});
sessionLog("pump", `tool_call buffered: ${toolCall.name} id=${toolCall.id}`);
}
continue;
}
// No tool_call_id -- fall through to normal processing
}
const toolCalls = wireMsgAny.tool_calls as Array<Record<string, unknown>> | undefined;
const toolCall = wireMsgAny.tool_call as Record<string, unknown> | undefined;
const tc = toolCalls?.[0] || toolCall;
if (tc) {
const fnObj = tc.function as Record<string, unknown> | undefined;
const tcId = (tc.tool_call_id as string | undefined) ?? (tc.id as string | undefined);
const tcIndex = tc.index as number | undefined;
// Non-tool_call message: flush any pending tool calls first.
// Skip flush for stream_event to avoid premature flush of incomplete tool calls.
if (pendingToolCalls.size > 0 && wireMsg.type !== "stream_event") {
flushPendingToolCalls();
if (tcId && tcIndex !== undefined) {
indexToToolCallId.set(tcIndex, tcId);
} else if (!tcId && tcIndex !== undefined) {
const resolvedId = indexToToolCallId.get(tcIndex);
if (resolvedId) {
// Patch the resolved ID into the wire message for transformMessage
if (fnObj) {
tc.id = resolvedId;
} else {
tc.tool_call_id = resolvedId;
}
}
}
}
}
const sdkMsg = this.transformMessage(wireMsg);
@@ -436,11 +339,6 @@ export class Session implements AsyncDisposable {
}
}
// Flush any remaining buffered tool calls on pump end
if (pendingToolCalls.size > 0) {
flushPendingToolCalls();
}
sessionLog("pump", "background pump ended");
}
@@ -999,6 +897,7 @@ export class Session implements AsyncDisposable {
toolCallId,
toolName,
toolInput,
rawArguments: toolArgs || undefined,
uuid: msg.uuid,
};
}

View File

@@ -407,6 +407,7 @@ describe("Session", () => {
toolCallId: "call-approval-1",
toolName: "Bash",
toolInput: { command: "pwd" },
rawArguments: JSON.stringify({ command: "pwd" }),
uuid: "approval-1",
});
});
@@ -427,6 +428,7 @@ describe("Session", () => {
toolCallId: "call-approval-2",
toolName: "Read",
toolInput: { raw: "path=/tmp/foo.txt" },
rawArguments: "path=/tmp/foo.txt",
uuid: "approval-2",
});
});

View File

@@ -111,25 +111,13 @@ function reasoningChunk(uuid: string, text = "done"): WireMessage {
} as unknown as WireMessage;
}
function canUseToolRequest(requestId: string): WireMessage {
return {
type: "control_request",
request_id: requestId,
request: {
subtype: "can_use_tool",
tool_name: "Bash",
input: { command: "echo hi" },
},
} as unknown as WireMessage;
}
function queuedMessages(session: Session) {
return ((session as unknown as { streamQueue: unknown[] }).streamQueue ??
[]) as Array<Record<string, unknown>>;
}
describe("tool call argument accumulation", () => {
test("accumulates delta chunks and parses final tool input", async () => {
describe("tool call streaming passthrough", () => {
test("emits each chunk immediately with rawArguments", async () => {
const { transport } = makeFakeTransport([
toolChunk("tc-1", '{"command":"echo', "msg-1"),
toolChunk("tc-1", ' hi"}', "msg-1"),
@@ -143,62 +131,46 @@ describe("tool call argument accumulation", () => {
.runBackgroundPump();
const msgs = queuedMessages(session);
const toolMsg = msgs.find((m) => m.type === "tool_call");
expect(toolMsg).toBeDefined();
expect(toolMsg?.toolInput).toEqual({ command: "echo hi" });
const toolMsgs = msgs.filter((m) => m.type === "tool_call");
// Both chunks should be emitted individually (no buffering)
expect(toolMsgs.length).toBe(2);
// First chunk has partial args
expect(toolMsgs[0]?.toolCallId).toBe("tc-1");
expect(toolMsgs[0]?.rawArguments).toBe('{"command":"echo');
// Second chunk has the continuation
expect(toolMsgs[1]?.toolCallId).toBe("tc-1");
expect(toolMsgs[1]?.rawArguments).toBe(' hi"}');
// Reasoning message also present
expect(msgs.some((m) => m.type === "reasoning")).toBe(true);
});
test("flushes pending tool call before control_request callback runs", async () => {
const queueSizesSeenByCallback: number[] = [];
const canUseTool = mock(() => {
queueSizesSeenByCallback.push(
queuedMessages(session).filter((m) => m.type === "tool_call").length
);
return {
behavior: "allow" as const,
updatedInput: null,
updatedPermissions: [],
};
});
const { transport, writes } = makeFakeTransport([
toolChunk("tc-2", '{"command":"echo', "msg-3"),
toolChunk("tc-2", ' hi"}', "msg-3"),
canUseToolRequest("can-use-1"),
test("emits single complete chunk with parsed toolInput", async () => {
const { transport } = makeFakeTransport([
toolChunk("tc-2", '{"command":"echo hi"}', "msg-3"),
reasoningChunk("msg-4"),
]);
const session = new Session({
agentId: "agent-test",
canUseTool,
});
const session = new Session({ agentId: "agent-test" });
(session as unknown as { transport: FakeTransport }).transport = transport;
await (session as unknown as { runBackgroundPump: () => Promise<void> })
.runBackgroundPump();
expect(canUseTool).toHaveBeenCalledTimes(1);
expect(queueSizesSeenByCallback[0]).toBe(1);
expect(
writes.some((w) => {
const wire = w as {
type?: string;
response?: { request_id?: string; subtype?: string };
};
return (
wire.type === "control_response" &&
wire.response?.request_id === "can-use-1" &&
wire.response?.subtype === "success"
);
})
).toBe(true);
const msgs = queuedMessages(session);
const toolMsg = msgs.find((m) => m.type === "tool_call");
expect(toolMsg).toBeDefined();
expect(toolMsg?.toolInput).toEqual({ command: "echo hi" });
expect(toolMsg?.rawArguments).toBe('{"command":"echo hi"}');
});
test("handles cumulative chunks without duplicating prior arguments", async () => {
test("resolves index-only continuation chunks to correct toolCallId", async () => {
const { transport } = makeFakeTransport([
toolChunk("tc-3", '{"command":"ec', "msg-5"),
toolChunk("tc-3", '{"command":"echo hi"}', "msg-5"),
indexedToolChunk(0, '{"command":"echo', "msg-5", { toolCallId: "tc-3" }),
indexedToolChunk(0, ' hi"}', "msg-5"),
reasoningChunk("msg-6"),
]);
@@ -209,15 +181,23 @@ describe("tool call argument accumulation", () => {
.runBackgroundPump();
const msgs = queuedMessages(session);
const toolMsg = msgs.find((m) => m.type === "tool_call");
expect(toolMsg).toBeDefined();
expect(toolMsg?.toolInput).toEqual({ command: "echo hi" });
const toolMsgs = msgs.filter((m) => m.type === "tool_call");
// Both chunks emitted, both resolved to same toolCallId
expect(toolMsgs.length).toBe(2);
expect(toolMsgs[0]?.toolCallId).toBe("tc-3");
expect(toolMsgs[1]?.toolCallId).toBe("tc-3");
});
test("accumulates index-only continuation chunks after first id-bearing chunk", async () => {
test("resolves nested OpenAI function chunks with id + index continuation", async () => {
const { transport } = makeFakeTransport([
indexedToolChunk(0, '{"command":"echo', "msg-7", { toolCallId: "tc-4" }),
indexedToolChunk(0, ' hi"}', "msg-7"),
nestedFunctionChunk(1, '{"command":"echo', "msg-7", {
toolCallId: "tc-4",
toolName: "Bash",
}),
nestedFunctionChunk(1, ' hi"}', "msg-7", {
toolName: "Bash",
}),
reasoningChunk("msg-8"),
]);
@@ -228,20 +208,22 @@ describe("tool call argument accumulation", () => {
.runBackgroundPump();
const msgs = queuedMessages(session);
const toolMsg = msgs.find((m) => m.type === "tool_call");
expect(toolMsg).toBeDefined();
expect(toolMsg?.toolCallId).toBe("tc-4");
expect(toolMsg?.toolInput).toEqual({ command: "echo hi" });
const toolMsgs = msgs.filter((m) => m.type === "tool_call");
expect(toolMsgs.length).toBe(2);
expect(toolMsgs[0]?.toolCallId).toBe("tc-4");
expect(toolMsgs[0]?.toolName).toBe("Bash");
expect(toolMsgs[1]?.toolCallId).toBe("tc-4");
});
test("handles nested OpenAI function chunks with tool_call_id + index continuation", async () => {
test("resolves nested OpenAI function chunks keyed by id field", async () => {
const { transport } = makeFakeTransport([
nestedFunctionChunk(1, '{"command":"echo', "msg-9", {
toolCallId: "tc-5",
toolName: "Bash",
nestedFunctionChunk(2, '{"query":"hello', "msg-9", {
toolId: "call_abc123",
toolName: "web_search",
}),
nestedFunctionChunk(1, ' hi"}', "msg-9", {
toolName: "Bash",
nestedFunctionChunk(2, ' world"}', "msg-9", {
toolName: "web_search",
}),
reasoningChunk("msg-10"),
]);
@@ -253,22 +235,22 @@ describe("tool call argument accumulation", () => {
.runBackgroundPump();
const msgs = queuedMessages(session);
const toolMsg = msgs.find((m) => m.type === "tool_call");
expect(toolMsg).toBeDefined();
expect(toolMsg?.toolCallId).toBe("tc-5");
expect(toolMsg?.toolName).toBe("Bash");
expect(toolMsg?.toolInput).toEqual({ command: "echo hi" });
const toolMsgs = msgs.filter((m) => m.type === "tool_call");
expect(toolMsgs.length).toBe(2);
expect(toolMsgs[0]?.toolCallId).toBe("call_abc123");
expect(toolMsgs[0]?.toolName).toBe("web_search");
expect(toolMsgs[1]?.toolCallId).toBe("call_abc123");
});
test("handles nested OpenAI function chunks keyed by id + index continuation", async () => {
test("parallel tool calls emit independently with correct IDs", async () => {
const { transport } = makeFakeTransport([
nestedFunctionChunk(2, '{"query":"hello', "msg-11", {
toolId: "call_abc123",
toolName: "web_search",
}),
nestedFunctionChunk(2, ' world"}', "msg-11", {
toolName: "web_search",
}),
// Tool A: first chunk
indexedToolChunk(0, '{"command":"ls"}', "msg-11", { toolCallId: "tc-A" }),
// Tool B: first chunk
indexedToolChunk(1, '{"query":"test"}', "msg-11", { toolCallId: "tc-B", toolName: "web_search" }),
// Tool A: second chunk
indexedToolChunk(0, '', "msg-11"),
reasoningChunk("msg-12"),
]);
@@ -279,10 +261,12 @@ describe("tool call argument accumulation", () => {
.runBackgroundPump();
const msgs = queuedMessages(session);
const toolMsg = msgs.find((m) => m.type === "tool_call");
expect(toolMsg).toBeDefined();
expect(toolMsg?.toolCallId).toBe("call_abc123");
expect(toolMsg?.toolName).toBe("web_search");
expect(toolMsg?.toolInput).toEqual({ query: "hello world" });
const toolMsgs = msgs.filter((m) => m.type === "tool_call");
// 3 tool_call chunks emitted (2 for A, 1 for B)
expect(toolMsgs.length).toBe(3);
expect(toolMsgs[0]?.toolCallId).toBe("tc-A");
expect(toolMsgs[1]?.toolCallId).toBe("tc-B");
expect(toolMsgs[2]?.toolCallId).toBe("tc-A"); // continuation via index
});
});

View File

@@ -460,6 +460,8 @@ export interface SDKToolCallMessage {
toolCallId: string;
toolName: string;
toolInput: Record<string, unknown>;
/** Raw unparsed arguments string from the wire for consumer-side accumulation. */
rawArguments?: string;
uuid: string;
}