/** Streaming chunk viewed through the two identifiers used for line aliasing. */
type LineIdChunk = LettaStreamingResponse & { id?: string; otid?: string };

/**
 * Shared resolver behind resolveAssistantLineId and resolveReasoningLineId.
 *
 * Maps a chunk's message id and/or otid onto a single canonical id, and
 * records that choice under every identifier the chunk carries, so later
 * chunks that expose only `id`, only `otid`, or both resolve to the same value.
 *
 * Resolution order: existing message-id alias, existing otid alias, the raw
 * message id, the raw otid. Truthiness checks are deliberate: non-string and
 * empty-string identifiers are treated as absent.
 *
 * NOTE(review): a fresh chunk that carries both identifiers is keyed by its
 * message id, whereas line ids were previously taken from `chunk.otid` —
 * confirm nothing else still looks lines up by otid.
 *
 * @param b - Buffers; only `b.byId` is read, to see which alias already has a line.
 * @param byMessageId - Alias map keyed by message id (updated in place).
 * @param byOtid - Alias map keyed by otid (updated in place).
 * @param label - Prefix of the debug message emitted on an alias conflict.
 * @param chunk - Incoming chunk; `id` and `otid` are both optional.
 * @returns The canonical id, or undefined when the chunk has no usable identifier.
 */
function resolveCanonicalLineId(
  b: Buffers,
  byMessageId: Map<string, string>,
  byOtid: Map<string, string>,
  label: "Assistant" | "Reasoning",
  chunk: LineIdChunk,
): string | undefined {
  const messageId = typeof chunk.id === "string" ? chunk.id : undefined;
  const otid = typeof chunk.otid === "string" ? chunk.otid : undefined;

  const fromMessageId = messageId ? byMessageId.get(messageId) : undefined;
  const fromOtid = otid ? byOtid.get(otid) : undefined;

  let canonical = fromMessageId || fromOtid || messageId || otid;
  if (!canonical) return undefined;

  if (fromMessageId && fromOtid && fromMessageId !== fromOtid) {
    // Both identifiers are already aliased, but to different canonicals.
    // Follow the otid alias only when it alone already has a line in byId;
    // every other combination (message-only, both, neither) keeps the
    // message-id alias.
    const onlyOtidHasLine =
      b.byId.has(fromOtid) && !b.byId.has(fromMessageId);
    canonical = onlyOtidHasLine ? fromOtid : fromMessageId;

    debugLog(
      "accumulator",
      `${label} id/otid alias conflict resolved to ${canonical}`,
    );
  }

  // Register (or re-point) every identifier seen on this chunk.
  if (messageId) {
    byMessageId.set(messageId, canonical);
  }
  if (otid) {
    byOtid.set(otid, canonical);
  }

  return canonical;
}

/**
 * Resolves the canonical line id for an assistant_message chunk using the
 * assistant alias maps on `b`.
 *
 * @returns The canonical id, or undefined when the chunk has neither id nor otid.
 */
function resolveAssistantLineId(
  b: Buffers,
  chunk: LettaStreamingResponse & { id?: string; otid?: string },
): string | undefined {
  return resolveCanonicalLineId(
    b,
    b.assistantCanonicalByMessageId,
    b.assistantCanonicalByOtid,
    "Assistant",
    chunk,
  );
}

/**
 * Resolves the canonical line id for a reasoning_message chunk using the
 * reasoning alias maps on `b` (kept separate from the assistant maps).
 *
 * @returns The canonical id, or undefined when the chunk has neither id nor otid.
 */
function resolveReasoningLineId(
  b: Buffers,
  chunk: LettaStreamingResponse & { id?: string; otid?: string },
): string | undefined {
  return resolveCanonicalLineId(
    b,
    b.reasoningCanonicalByMessageId,
    b.reasoningCanonicalByOtid,
    "Reasoning",
    chunk,
  );
}
// Fallback: chunks that never carry an otid are keyed by their message id.
test("accumulates assistant messages when otid is missing but id is present", () => {
  const buffers = createBuffers();

  for (const text of ["Hello ", "world"]) {
    onChunk(buffers, {
      message_type: "assistant_message",
      id: "assistant-fallback-1",
      content: [{ type: "text", text }],
    } as unknown as LettaStreamingResponse);
  }

  const line = buffers.byId.get("assistant-fallback-1");
  expect(line?.kind).toBe("assistant");
  const text = line && "text" in line ? line.text : "";
  expect(text).toBe("Hello world");
});

// The first chunk fixes the line id; later chunks alias onto it.
test("keeps one assistant line when stream transitions id -> both -> otid", () => {
  const buffers = createBuffers();
  const chunks = [
    {
      message_type: "assistant_message",
      id: "assistant-msg-1",
      content: [{ type: "text", text: "Hello " }],
    },
    {
      message_type: "assistant_message",
      id: "assistant-msg-1",
      otid: "assistant-otid-1",
      content: [{ type: "text", text: "from " }],
    },
    {
      message_type: "assistant_message",
      otid: "assistant-otid-1",
      content: [{ type: "text", text: "stream" }],
    },
  ];

  for (const chunk of chunks) {
    onChunk(buffers, chunk as unknown as LettaStreamingResponse);
  }

  const line = buffers.byId.get("assistant-msg-1");
  expect(line?.kind).toBe("assistant");
  const text = line && "text" in line ? line.text : "";
  expect(text).toBe("Hello from stream");
  expect(buffers.byId.get("assistant-otid-1")).toBeUndefined();
});

// Mirror case: the stream starts with the otid and ends with only the id.
test("keeps one assistant line when stream transitions otid -> both -> id", () => {
  const buffers = createBuffers();
  const chunks = [
    {
      message_type: "assistant_message",
      otid: "assistant-otid-2",
      content: [{ type: "text", text: "Hello " }],
    },
    {
      message_type: "assistant_message",
      id: "assistant-msg-2",
      otid: "assistant-otid-2",
      content: [{ type: "text", text: "from " }],
    },
    {
      message_type: "assistant_message",
      id: "assistant-msg-2",
      content: [{ type: "text", text: "stream" }],
    },
  ];

  for (const chunk of chunks) {
    onChunk(buffers, chunk as unknown as LettaStreamingResponse);
  }

  const line = buffers.byId.get("assistant-otid-2");
  expect(line?.kind).toBe("assistant");
  const text = line && "text" in line ? line.text : "";
  expect(text).toBe("Hello from stream");
  expect(buffers.byId.get("assistant-msg-2")).toBeUndefined();
});

// Same aliasing contract for reasoning deltas, message id first.
test("keeps one reasoning line when stream transitions id -> both -> otid", () => {
  const buffers = createBuffers();
  const chunks = [
    {
      message_type: "reasoning_message",
      id: "reasoning-msg-1",
      reasoning: "Think ",
    },
    {
      message_type: "reasoning_message",
      id: "reasoning-msg-1",
      otid: "reasoning-otid-1",
      reasoning: "through ",
    },
    {
      message_type: "reasoning_message",
      otid: "reasoning-otid-1",
      reasoning: "it",
    },
  ];

  for (const chunk of chunks) {
    onChunk(buffers, chunk as unknown as LettaStreamingResponse);
  }

  const line = buffers.byId.get("reasoning-msg-1");
  expect(line?.kind).toBe("reasoning");
  const text = line && "text" in line ? line.text : "";
  expect(text).toBe("Think through it");
  expect(buffers.byId.get("reasoning-otid-1")).toBeUndefined();
});

// Reasoning deltas, otid first.
test("keeps one reasoning line when stream transitions otid -> both -> id", () => {
  const buffers = createBuffers();
  const chunks = [
    {
      message_type: "reasoning_message",
      otid: "reasoning-otid-2",
      reasoning: "Think ",
    },
    {
      message_type: "reasoning_message",
      id: "reasoning-msg-2",
      otid: "reasoning-otid-2",
      reasoning: "through ",
    },
    {
      message_type: "reasoning_message",
      id: "reasoning-msg-2",
      reasoning: "it",
    },
  ];

  for (const chunk of chunks) {
    onChunk(buffers, chunk as unknown as LettaStreamingResponse);
  }

  const line = buffers.byId.get("reasoning-otid-2");
  expect(line?.kind).toBe("reasoning");
  const text = line && "text" in line ? line.text : "";
  expect(text).toBe("Think through it");
  expect(buffers.byId.get("reasoning-msg-2")).toBeUndefined();
});