fix: canonicalize assistant and reasoning stream ids (#926)

This commit is contained in:
Charles Packer
2026-02-11 23:59:26 -08:00
committed by GitHub
parent 9630da190a
commit 8f784b78ff
3 changed files with 259 additions and 2 deletions

View File

@@ -238,6 +238,12 @@ export type Buffers = {
pendingToolByRun: Map<string, string>; // temporary id per run until real id
toolCallIdToLineId: Map<string, string>;
lastOtid: string | null; // Track the last otid to detect transitions
// Alias maps to keep assistant deltas on one line when streams mix id/otid.
assistantCanonicalByMessageId: Map<string, string>;
assistantCanonicalByOtid: Map<string, string>;
// Alias maps to keep reasoning deltas on one line when streams mix id/otid.
reasoningCanonicalByMessageId: Map<string, string>;
reasoningCanonicalByOtid: Map<string, string>;
pendingRefresh?: boolean; // Track throttled refresh state
interrupted?: boolean; // Track if stream was interrupted by user (skip stale refreshes)
commitGeneration?: number; // Incremented when resuming from error to invalidate pending refreshes
@@ -273,6 +279,10 @@ export function createBuffers(agentId?: string): Buffers {
pendingToolByRun: new Map(),
toolCallIdToLineId: new Map(),
lastOtid: null,
assistantCanonicalByMessageId: new Map(),
assistantCanonicalByOtid: new Map(),
reasoningCanonicalByMessageId: new Map(),
reasoningCanonicalByOtid: new Map(),
commitGeneration: 0,
abortGeneration: 0,
usage: {
@@ -441,6 +451,107 @@ function extractTextPart(v: unknown): string {
return "";
}
function resolveAssistantLineId(
b: Buffers,
chunk: LettaStreamingResponse & { id?: string; otid?: string },
): string | undefined {
const messageId = typeof chunk.id === "string" ? chunk.id : undefined;
const otid = typeof chunk.otid === "string" ? chunk.otid : undefined;
const canonicalFromMessageId = messageId
? b.assistantCanonicalByMessageId.get(messageId)
: undefined;
const canonicalFromOtid = otid
? b.assistantCanonicalByOtid.get(otid)
: undefined;
let canonical =
canonicalFromMessageId || canonicalFromOtid || messageId || otid;
if (!canonical) return undefined;
// If both aliases exist but disagree, prefer the one that already has a line.
if (
canonicalFromMessageId &&
canonicalFromOtid &&
canonicalFromMessageId !== canonicalFromOtid
) {
const messageLineExists = b.byId.has(canonicalFromMessageId);
const otidLineExists = b.byId.has(canonicalFromOtid);
if (messageLineExists && !otidLineExists) {
canonical = canonicalFromMessageId;
} else if (otidLineExists && !messageLineExists) {
canonical = canonicalFromOtid;
} else {
canonical = canonicalFromMessageId;
}
debugLog(
"accumulator",
`Assistant id/otid alias conflict resolved to ${canonical}`,
);
}
if (messageId) {
b.assistantCanonicalByMessageId.set(messageId, canonical);
}
if (otid) {
b.assistantCanonicalByOtid.set(otid, canonical);
}
return canonical;
}
function resolveReasoningLineId(
b: Buffers,
chunk: LettaStreamingResponse & { id?: string; otid?: string },
): string | undefined {
const messageId = typeof chunk.id === "string" ? chunk.id : undefined;
const otid = typeof chunk.otid === "string" ? chunk.otid : undefined;
const canonicalFromMessageId = messageId
? b.reasoningCanonicalByMessageId.get(messageId)
: undefined;
const canonicalFromOtid = otid
? b.reasoningCanonicalByOtid.get(otid)
: undefined;
let canonical =
canonicalFromMessageId || canonicalFromOtid || messageId || otid;
if (!canonical) return undefined;
if (
canonicalFromMessageId &&
canonicalFromOtid &&
canonicalFromMessageId !== canonicalFromOtid
) {
const messageLineExists = b.byId.has(canonicalFromMessageId);
const otidLineExists = b.byId.has(canonicalFromOtid);
if (messageLineExists && !otidLineExists) {
canonical = canonicalFromMessageId;
} else if (otidLineExists && !messageLineExists) {
canonical = canonicalFromOtid;
} else {
canonical = canonicalFromMessageId;
}
debugLog(
"accumulator",
`Reasoning id/otid alias conflict resolved to ${canonical}`,
);
}
if (messageId) {
b.reasoningCanonicalByMessageId.set(messageId, canonical);
}
if (otid) {
b.reasoningCanonicalByOtid.set(otid, canonical);
}
return canonical;
}
/**
* Attempts to split content at a paragraph boundary for aggressive static promotion.
* If split found, creates a committed line for "before" and updates original with "after".
@@ -518,7 +629,11 @@ export function onChunk(
switch (chunk.message_type) {
case "reasoning_message": {
const id = chunk.otid;
const chunkWithIds = chunk as LettaStreamingResponse & {
id?: string;
otid?: string;
};
const id = resolveReasoningLineId(b, chunkWithIds);
// console.log(`[REASONING] Received chunk with otid=${id}, delta="${chunk.reasoning?.substring(0, 50)}..."`);
if (!id) {
// console.log(`[REASONING] No otid, breaking`);
@@ -550,7 +665,13 @@ export function onChunk(
}
case "assistant_message": {
const id = chunk.otid;
const chunkWithIds = chunk as LettaStreamingResponse & {
id?: string;
otid?: string;
};
// Resolve to a stable line id across mixed streams where some chunks
// have only id, only otid, or both.
const id = resolveAssistantLineId(b, chunkWithIds);
if (!id) break;
// Handle otid transition (mark previous line as finished)

View File

@@ -167,6 +167,10 @@ export function backfillBuffers(buffers: Buffers, history: Message[]): void {
buffers.toolCallIdToLineId.clear();
buffers.pendingToolByRun.clear();
buffers.lastOtid = null;
buffers.assistantCanonicalByMessageId.clear();
buffers.assistantCanonicalByOtid.clear();
buffers.reasoningCanonicalByMessageId.clear();
buffers.reasoningCanonicalByOtid.clear();
// Note: we don't reset tokenCount here (it resets per-turn in onSubmit)
// Iterate over the history and add the messages to the buffers

View File

@@ -105,4 +105,136 @@ describe("accumulator usage statistics", () => {
expect(tracker.pendingSkillsReinject).toBe(true);
expect(tracker.pendingReflectionTrigger).toBe(true);
});
test("accumulates assistant messages when otid is missing but id is present", () => {
const buffers = createBuffers();
onChunk(buffers, {
message_type: "assistant_message",
id: "assistant-fallback-1",
content: [{ type: "text", text: "Hello " }],
} as unknown as LettaStreamingResponse);
onChunk(buffers, {
message_type: "assistant_message",
id: "assistant-fallback-1",
content: [{ type: "text", text: "world" }],
} as unknown as LettaStreamingResponse);
const line = buffers.byId.get("assistant-fallback-1");
expect(line?.kind).toBe("assistant");
expect(line && "text" in line ? line.text : "").toBe("Hello world");
});
test("keeps one assistant line when stream transitions id -> both -> otid", () => {
const buffers = createBuffers();
onChunk(buffers, {
message_type: "assistant_message",
id: "assistant-msg-1",
content: [{ type: "text", text: "Hello " }],
} as unknown as LettaStreamingResponse);
onChunk(buffers, {
message_type: "assistant_message",
id: "assistant-msg-1",
otid: "assistant-otid-1",
content: [{ type: "text", text: "from " }],
} as unknown as LettaStreamingResponse);
onChunk(buffers, {
message_type: "assistant_message",
otid: "assistant-otid-1",
content: [{ type: "text", text: "stream" }],
} as unknown as LettaStreamingResponse);
const line = buffers.byId.get("assistant-msg-1");
expect(line?.kind).toBe("assistant");
expect(line && "text" in line ? line.text : "").toBe("Hello from stream");
expect(buffers.byId.get("assistant-otid-1")).toBeUndefined();
});
test("keeps one assistant line when stream transitions otid -> both -> id", () => {
const buffers = createBuffers();
onChunk(buffers, {
message_type: "assistant_message",
otid: "assistant-otid-2",
content: [{ type: "text", text: "Hello " }],
} as unknown as LettaStreamingResponse);
onChunk(buffers, {
message_type: "assistant_message",
id: "assistant-msg-2",
otid: "assistant-otid-2",
content: [{ type: "text", text: "from " }],
} as unknown as LettaStreamingResponse);
onChunk(buffers, {
message_type: "assistant_message",
id: "assistant-msg-2",
content: [{ type: "text", text: "stream" }],
} as unknown as LettaStreamingResponse);
const line = buffers.byId.get("assistant-otid-2");
expect(line?.kind).toBe("assistant");
expect(line && "text" in line ? line.text : "").toBe("Hello from stream");
expect(buffers.byId.get("assistant-msg-2")).toBeUndefined();
});
test("keeps one reasoning line when stream transitions id -> both -> otid", () => {
const buffers = createBuffers();
onChunk(buffers, {
message_type: "reasoning_message",
id: "reasoning-msg-1",
reasoning: "Think ",
} as unknown as LettaStreamingResponse);
onChunk(buffers, {
message_type: "reasoning_message",
id: "reasoning-msg-1",
otid: "reasoning-otid-1",
reasoning: "through ",
} as unknown as LettaStreamingResponse);
onChunk(buffers, {
message_type: "reasoning_message",
otid: "reasoning-otid-1",
reasoning: "it",
} as unknown as LettaStreamingResponse);
const line = buffers.byId.get("reasoning-msg-1");
expect(line?.kind).toBe("reasoning");
expect(line && "text" in line ? line.text : "").toBe("Think through it");
expect(buffers.byId.get("reasoning-otid-1")).toBeUndefined();
});
test("keeps one reasoning line when stream transitions otid -> both -> id", () => {
const buffers = createBuffers();
onChunk(buffers, {
message_type: "reasoning_message",
otid: "reasoning-otid-2",
reasoning: "Think ",
} as unknown as LettaStreamingResponse);
onChunk(buffers, {
message_type: "reasoning_message",
id: "reasoning-msg-2",
otid: "reasoning-otid-2",
reasoning: "through ",
} as unknown as LettaStreamingResponse);
onChunk(buffers, {
message_type: "reasoning_message",
id: "reasoning-msg-2",
reasoning: "it",
} as unknown as LettaStreamingResponse);
const line = buffers.byId.get("reasoning-otid-2");
expect(line?.kind).toBe("reasoning");
expect(line && "text" in line ? line.text : "").toBe("Think through it");
expect(buffers.byId.get("reasoning-msg-2")).toBeUndefined();
});
});