From b6394fb3a573deb1a087c3d6d928d6ad3f01a04e Mon Sep 17 00:00:00 2001 From: Cameron Date: Tue, 24 Feb 2026 16:16:53 -0800 Subject: [PATCH] feat: client-side tool call accumulation in dedupedStream (#389) --- package-lock.json | 16 ++++++------- src/core/bot.ts | 61 +++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 62 insertions(+), 15 deletions(-) diff --git a/package-lock.json b/package-lock.json index 78340ba..577eca7 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12,7 +12,7 @@ "@clack/prompts": "^0.11.0", "@hapi/boom": "^10.0.1", "@letta-ai/letta-client": "^1.7.8", - "@letta-ai/letta-code-sdk": "^0.1.6", + "@letta-ai/letta-code-sdk": "^0.1.8", "@types/express": "^5.0.6", "@types/node": "^25.0.10", "@types/node-schedule": "^2.1.8", @@ -1266,9 +1266,9 @@ "license": "Apache-2.0" }, "node_modules/@letta-ai/letta-code": { - "version": "0.16.8", - "resolved": "https://registry.npmjs.org/@letta-ai/letta-code/-/letta-code-0.16.8.tgz", - "integrity": "sha512-++L6O6qFiehLG6UZFHWpvLh4fgR2FNIDgOj/eRJu6LlBzajkErGVN3nrkJpNe37AEwsLKCWhXrhNIyHQZEBvQw==", + "version": "0.16.9", + "resolved": "https://registry.npmjs.org/@letta-ai/letta-code/-/letta-code-0.16.9.tgz", + "integrity": "sha512-Rsw0guXuMYxBESrLfz0ZgtP/vRlGsjIOkbThCy9LDPGYxgqhtVK4nDwrYsSbvSVXV9LWMpVvjLSRuVHg2/xDhQ==", "hasInstallScript": true, "license": "Apache-2.0", "dependencies": { @@ -1290,12 +1290,12 @@ } }, "node_modules/@letta-ai/letta-code-sdk": { - "version": "0.1.6", - "resolved": "https://registry.npmjs.org/@letta-ai/letta-code-sdk/-/letta-code-sdk-0.1.6.tgz", - "integrity": "sha512-sTs7ptc34dzwQ2zDP4HH7h7SA+RMfVVjYiMG+kYXGW5I+kzwVZH3ocW5YnGzcq3KncRHGfsGPWYXLYUK80V/FA==", + "version": "0.1.8", + "resolved": "https://registry.npmjs.org/@letta-ai/letta-code-sdk/-/letta-code-sdk-0.1.8.tgz", + "integrity": "sha512-/y6yFEmwdW3MC303LK4rIQu7ZPgMhkijUmshRZ2ZhLUtyBxqEw0G6EY60Gcf66wEzU6y5YfL+yCukIieKJgekQ==", "license": "Apache-2.0", "dependencies": { - "@letta-ai/letta-code": "0.16.8" + "@letta-ai/letta-code": "0.16.9" } }, "node_modules/@letta-ai/letta-code/node_modules/balanced-match": { diff --git a/src/core/bot.ts b/src/core/bot.ts index 5bdf5c9..f0766ef 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -1122,23 +1122,68 @@ export class LettaBot implements AgentSession { // Persist conversation ID immediately after successful send, before streaming. this.persistSessionState(session, convKey); - // Return session and a deduplicated stream generator - const seenToolCallIds = new Set(); + // Return session and a stream generator that buffers tool_call chunks and + // flushes them with fully accumulated arguments on the next type boundary. + // This ensures display messages always have complete args (channels can't + // edit messages after sending). + const pendingToolCalls = new Map(); const self = this; const capturedConvKey = convKey; // Capture for closure + /** Merge tool argument strings, handling both delta and cumulative chunking. */ + function mergeToolArgs(existing: string, incoming: string): string { + if (!incoming) return existing; + if (!existing) return incoming; + if (incoming === existing) return existing; + // Cumulative: latest chunk includes all prior text + if (incoming.startsWith(existing)) return incoming; + if (existing.endsWith(incoming)) return existing; + // Delta: each chunk is an append + return `${existing}${incoming}`; + } + + function* flushPending(): Generator { + for (const [, pending] of pendingToolCalls) { + if (!pending.accumulatedArgs) { + // No rawArguments accumulated (old SDK or single complete chunk) -- + // preserve the original toolInput from the first chunk as-is. + yield pending.msg; + continue; + } + let toolInput: Record = {}; + try { toolInput = JSON.parse(pending.accumulatedArgs); } + catch { toolInput = { raw: pending.accumulatedArgs }; } + yield { ...pending.msg, toolInput }; + } + pendingToolCalls.clear(); + } + async function* dedupedStream(): AsyncGenerator { for await (const raw of session.stream()) { const msg = raw as StreamMsg; - // Deduplicate tool_call chunks (server streams token-by-token) if (msg.type === 'tool_call') { const id = msg.toolCallId; - if (id && seenToolCallIds.has(id)) continue; - if (id) seenToolCallIds.add(id); + if (!id) { yield msg; continue; } + + const incoming = (msg as StreamMsg & { rawArguments?: string }).rawArguments || ''; + const existing = pendingToolCalls.get(id); + if (existing) { + existing.accumulatedArgs = mergeToolArgs(existing.accumulatedArgs, incoming); + } else { + pendingToolCalls.set(id, { msg, accumulatedArgs: incoming }); + } + continue; // buffer, don't yield yet + } + + // Flush pending tool calls on semantic type boundary (not stream_event) + if (pendingToolCalls.size > 0 && msg.type !== 'stream_event') { + yield* flushPending(); } if (msg.type === 'result') { + // Flush any remaining before result + yield* flushPending(); self.persistSessionState(session, capturedConvKey); } @@ -1148,6 +1193,9 @@ export class LettaBot implements AgentSession { break; } } + + // Flush remaining at generator end (shouldn't normally happen) + yield* flushPending(); } return { session, stream: dedupedStream }; @@ -1570,7 +1618,6 @@ export class LettaBot implements AgentSession { let lastErrorDetail: { message: string; stopReason: string; apiError?: Record } | null = null; let retryInfo: { attempt: number; maxAttempts: number; reason: string } | null = null; let reasoningBuffer = ''; - // Tool call displays fire immediately on arrival (SDK now accumulates args). const msgTypeCounts: Record = {}; const parseAndHandleDirectives = async () => { @@ -1679,7 +1726,7 @@ export class LettaBot implements AgentSession { const tcId = streamMsg.toolCallId?.slice(0, 12) || '?'; log.info(`>>> TOOL CALL: ${tcName} (id: ${tcId})`); sawNonAssistantSinceLastUuid = true; - // Display tool call immediately (args are now populated by SDK accumulation fix) + // Display tool call (args are fully accumulated by dedupedStream buffer-and-flush) if (this.config.display?.showToolCalls && !suppressDelivery) { try { const text = this.formatToolCallDisplay(streamMsg);