fix(resume): harden transcript backfill (#1082)
This commit is contained in:
@@ -8,8 +8,39 @@ import type { Message } from "@letta-ai/letta-client/resources/agents/messages";
|
||||
import type { ApprovalRequest } from "../cli/helpers/stream";
|
||||
import { debugWarn } from "../utils/debug";
|
||||
|
||||
// Number of recent messages to backfill when resuming a session
|
||||
const MESSAGE_HISTORY_LIMIT = 15;
|
||||
// Backfill should feel like "the last turn(s)", not "the last N raw messages".
|
||||
// Tool-heavy turns can generate many tool_call/tool_return messages that would
|
||||
// otherwise push the most recent assistant/user messages out of the window.
|
||||
const BACKFILL_PRIMARY_MESSAGE_LIMIT = 12; // cap on "primary" messages kept: user/assistant/event/summary (reasoning also counts outside primaryOnly mode)
|
||||
const BACKFILL_MAX_RENDERABLE_MESSAGES = 80; // safety cap
|
||||
|
||||
// Note: We intentionally do not include tool-call / tool-return chatter in the
|
||||
// resume backfill. Pending approvals are handled via `pendingApprovals` and
|
||||
// shown separately in the UI. Including tool logs here makes resume feel like a
|
||||
// corrupted replay when the last "turn" was tool-heavy.
|
||||
|
||||
// Stop fetching once we have enough actual conversational anchors.
|
||||
// Reasoning can be extremely tool-step heavy, so anchor on user/assistant.
|
||||
const BACKFILL_ANCHOR_MESSAGE_LIMIT = 6;
|
||||
|
||||
// We fetch more than we render so tool-heavy turns don't push the last
|
||||
// user-visible assistant message out of the backfill window.
|
||||
const BACKFILL_PAGE_LIMIT = 200;
|
||||
const BACKFILL_MAX_PAGES = 25; // 5k messages max
|
||||
const BACKFILL_MIN_ASSISTANT = 1;
|
||||
|
||||
/**
 * Reports whether a message type counts as "primary" conversation content
 * for the resume backfill: user, assistant, event, or summary messages.
 *
 * Reasoning and tool/approval message types are not primary here.
 *
 * @param messageType - The `message_type` of a message; may be undefined.
 * @returns true only for the four primary message types.
 */
function isPrimaryMessageType(messageType: string | undefined): boolean {
  switch (messageType) {
    case "user_message":
    case "assistant_message":
    case "event_message":
    case "summary_message":
      return true;
    default:
      return false;
  }
}
|
||||
|
||||
/**
 * Reports whether a message type is a conversational "anchor", i.e. a user
 * or assistant message.
 *
 * @param messageType - The `message_type` of a message; may be undefined.
 * @returns true for "user_message" and "assistant_message", false otherwise.
 */
function isAnchorMessageType(messageType: string | undefined): boolean {
  if (messageType === "user_message") {
    return true;
  }
  return messageType === "assistant_message";
}
|
||||
|
||||
/**
|
||||
* Check if message backfilling is enabled via LETTA_BACKFILL env var.
|
||||
@@ -89,9 +120,104 @@ export function extractApprovals(messageToCheck: Message): {
|
||||
* Prepare message history for backfill, trimming orphaned tool returns.
|
||||
* Messages should already be in chronological order (oldest first).
|
||||
*/
|
||||
function prepareMessageHistory(messages: Message[]): Message[] {
|
||||
const historyCount = Math.min(MESSAGE_HISTORY_LIMIT, messages.length);
|
||||
let messageHistory = messages.slice(-historyCount);
|
||||
// Exported for tests: resume UX depends on strict message-type filtering.
|
||||
export function prepareMessageHistory(
|
||||
messages: Message[],
|
||||
opts?: { primaryOnly?: boolean },
|
||||
): Message[] {
|
||||
const isRenderable = (msg: Message): boolean => {
|
||||
const t = msg.message_type;
|
||||
if (
|
||||
t === "user_message" ||
|
||||
t === "assistant_message" ||
|
||||
t === "reasoning_message" ||
|
||||
t === "tool_call_message" ||
|
||||
t === "tool_return_message" ||
|
||||
t === "approval_request_message" ||
|
||||
t === "approval_response_message"
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
// Newer servers may include extra message types (event/summary) that we render in backfill.
|
||||
const ts = t as string | undefined;
|
||||
return ts === "event_message" || ts === "summary_message";
|
||||
};
|
||||
|
||||
const renderable = messages.filter(isRenderable);
|
||||
if (opts?.primaryOnly) {
|
||||
// Resume view should prioritize the actual conversation (user/assistant + events).
|
||||
// Reasoning can be extremely tool-step heavy and will crowd out assistant messages.
|
||||
const convo = renderable.filter((m) =>
|
||||
isPrimaryMessageType(m.message_type),
|
||||
);
|
||||
let trimmed = convo.slice(-BACKFILL_PRIMARY_MESSAGE_LIMIT);
|
||||
|
||||
// Hardening: if the last N items are all user/system-y content, ensure we
|
||||
// still include the most recent assistant message when one exists.
|
||||
const hasAssistant = trimmed.some(
|
||||
(m) => m.message_type === "assistant_message",
|
||||
);
|
||||
if (!hasAssistant) {
|
||||
const lastAssistantIndex = convo
|
||||
.map((m) => m.message_type)
|
||||
.lastIndexOf("assistant_message");
|
||||
if (lastAssistantIndex >= 0) {
|
||||
const start = Math.max(
|
||||
0,
|
||||
lastAssistantIndex - (BACKFILL_PRIMARY_MESSAGE_LIMIT - 1),
|
||||
);
|
||||
trimmed = convo.slice(start, start + BACKFILL_PRIMARY_MESSAGE_LIMIT);
|
||||
}
|
||||
}
|
||||
if (trimmed.length > 0) return trimmed;
|
||||
|
||||
// If we have no user/assistant/event/summary (rare), fall back to reasoning.
|
||||
// If reasoning is also absent, return an empty history; tool chatter is
|
||||
// deliberately never used as a fallback here.
|
||||
const reasoning = renderable.filter(
|
||||
(m) => m.message_type === "reasoning_message",
|
||||
);
|
||||
if (reasoning.length > 0) {
|
||||
return reasoning.slice(-BACKFILL_PRIMARY_MESSAGE_LIMIT);
|
||||
}
|
||||
// Nothing conversational or reasoning-like to show: return an empty history.
|
||||
// Do not fall back to tool chatter.
|
||||
return [];
|
||||
}
|
||||
|
||||
// Walk backwards until we've captured enough "primary" messages to anchor
|
||||
// the replay (user/assistant/reasoning + high-level events), but include tool
|
||||
// messages in-between so the last turn still makes sense.
|
||||
const isPrimary = (msg: Message): boolean => {
|
||||
const t = msg.message_type;
|
||||
return (
|
||||
t === "user_message" ||
|
||||
t === "assistant_message" ||
|
||||
t === "reasoning_message" ||
|
||||
(t as string | undefined) === "event_message" ||
|
||||
(t as string | undefined) === "summary_message"
|
||||
);
|
||||
};
|
||||
|
||||
let primaryCount = 0;
|
||||
let startIndex = Math.max(0, renderable.length - 1);
|
||||
for (let i = renderable.length - 1; i >= 0; i -= 1) {
|
||||
const msg = renderable[i];
|
||||
if (!msg) continue;
|
||||
if (isPrimary(msg)) {
|
||||
primaryCount += 1;
|
||||
if (primaryCount >= BACKFILL_PRIMARY_MESSAGE_LIMIT) {
|
||||
startIndex = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
startIndex = i;
|
||||
}
|
||||
|
||||
let messageHistory = renderable.slice(startIndex);
|
||||
if (messageHistory.length > BACKFILL_MAX_RENDERABLE_MESSAGES) {
|
||||
messageHistory = messageHistory.slice(-BACKFILL_MAX_RENDERABLE_MESSAGES);
|
||||
}
|
||||
|
||||
// Skip if starts with orphaned tool_return (incomplete turn)
|
||||
if (messageHistory[0]?.message_type === "tool_return_message") {
|
||||
@@ -107,13 +233,82 @@ function prepareMessageHistory(messages: Message[]): Message[] {
|
||||
*/
|
||||
function sortChronological(messages: Message[]): Message[] {
  // Sort a copy so the caller's array is never mutated.
  return [...messages].sort((a, b) => {
    // All message types *should* have 'date', but be defensive: a missing
    // date is treated as epoch 0, and an unparseable date (NaN timestamp)
    // sorts before every valid one so the comparator never returns NaN.
    const ta = a.date ? new Date(a.date).getTime() : 0;
    const tb = b.date ? new Date(b.date).getTime() : 0;
    if (!Number.isFinite(ta) && !Number.isFinite(tb)) return 0;
    if (!Number.isFinite(ta)) return -1;
    if (!Number.isFinite(tb)) return 1;
    return ta - tb;
  });
}
|
||||
|
||||
/**
 * Pages backwards (newest first) through a conversation's messages and
 * collects enough of them to render a useful resume backfill.
 *
 * Fetching stops when any of the following holds:
 * - at least BACKFILL_MIN_ASSISTANT assistant messages and
 *   BACKFILL_ANCHOR_MESSAGE_LIMIT anchor (user/assistant) messages were seen;
 * - the server returned an empty or short page;
 * - BACKFILL_MAX_PAGES pages were fetched;
 * - pagination stalled (no usable cursor, an unchanged cursor, or a page
 *   that contributed no new messages).
 *
 * @param client - Letta client used to list conversation messages.
 * @param conversationId - Conversation whose messages are fetched.
 * @returns The collected messages, deduplicated and sorted oldest first.
 */
async function fetchConversationBackfillMessages(
  client: Letta,
  conversationId: string,
): Promise<Message[]> {
  const collected: Message[] = [];
  // Messages can have multiple variants with the same id (e.g. approval_request + reasoning).
  // Dedupe using a key that preserves distinct variants while still preventing
  // overlap across pagination pages.
  const seen = new Set<string>();

  let cursorBefore: string | null = null;
  let assistantCount = 0;
  let anchorCount = 0;

  for (let pageIndex = 0; pageIndex < BACKFILL_MAX_PAGES; pageIndex += 1) {
    const page = await client.conversations.messages.list(conversationId, {
      limit: BACKFILL_PAGE_LIMIT,
      order: "desc",
      ...(cursorBefore ? { before: cursorBefore } : {}),
    });
    const items = page.getPaginatedItems();
    if (items.length === 0) break;

    let addedThisPage = 0;
    for (const m of items) {
      if (!m?.id) continue;

      // Prefer otid when available (it is unique across variants). Otherwise,
      // include message_type to avoid dropping variants that share ids.
      const key =
        "otid" in m && (m as { otid?: unknown }).otid
          ? `otid:${String((m as { otid?: unknown }).otid)}`
          : `id:${m.id}:${m.message_type ?? ""}`;

      if (seen.has(key)) continue;
      seen.add(key);
      collected.push(m);
      addedThisPage += 1;

      if (m.message_type === "assistant_message") assistantCount += 1;
      if (isAnchorMessageType(m.message_type)) anchorCount += 1;
    }

    // Stop once we can confidently show a good recent slice.
    if (
      assistantCount >= BACKFILL_MIN_ASSISTANT &&
      anchorCount >= BACKFILL_ANCHOR_MESSAGE_LIMIT
    ) {
      break;
    }

    // If the server returned fewer than requested, we're likely at the end.
    if (items.length < BACKFILL_PAGE_LIMIT) break;

    // items are newest->oldest; use the last item as our "before" cursor.
    const nextCursor: string | null = items[items.length - 1]?.id ?? null;

    // Hardening: without a usable, advancing cursor the next request would
    // return the same (or the newest) page again, so every remaining
    // iteration would be a wasted round-trip yielding only duplicates.
    if (
      addedThisPage === 0 ||
      nextCursor === null ||
      nextCursor === cursorBefore
    ) {
      break;
    }
    cursorBefore = nextCursor;
  }

  if (assistantCount < BACKFILL_MIN_ASSISTANT) {
    debugWarn(
      "check-approval",
      `Backfill scan found 0 assistant messages in last ${collected.length} messages (tool-heavy conversation?)`,
    );
  }

  return sortChronological(collected);
}
|
||||
|
||||
/**
|
||||
* Gets data needed to resume an agent session.
|
||||
* Checks for pending approvals and retrieves recent message history for backfill.
|
||||
@@ -157,17 +352,16 @@ export async function getResumeData(
|
||||
);
|
||||
if (isBackfillEnabled()) {
|
||||
try {
|
||||
const backfill = await client.conversations.messages.list(
|
||||
const backfill = await fetchConversationBackfillMessages(
|
||||
client,
|
||||
conversationId,
|
||||
{
|
||||
limit: MESSAGE_HISTORY_LIMIT,
|
||||
order: "desc",
|
||||
},
|
||||
);
|
||||
return {
|
||||
pendingApproval: null,
|
||||
pendingApprovals: [],
|
||||
messageHistory: sortChronological(backfill.getPaginatedItems()),
|
||||
messageHistory: prepareMessageHistory(backfill, {
|
||||
primaryOnly: true,
|
||||
}),
|
||||
};
|
||||
} catch (backfillError) {
|
||||
debugWarn(
|
||||
@@ -195,14 +389,10 @@ export async function getResumeData(
|
||||
// Wrapped in try/catch so backfill failures don't crash the CLI
|
||||
if (isBackfillEnabled()) {
|
||||
try {
|
||||
const backfillPage = await client.conversations.messages.list(
|
||||
messages = await fetchConversationBackfillMessages(
|
||||
client,
|
||||
conversationId,
|
||||
{
|
||||
limit: MESSAGE_HISTORY_LIMIT,
|
||||
order: "desc",
|
||||
},
|
||||
);
|
||||
messages = sortChronological(backfillPage.getPaginatedItems());
|
||||
} catch (backfillError) {
|
||||
debugWarn(
|
||||
"check-approval",
|
||||
@@ -234,7 +424,9 @@ export async function getResumeData(
|
||||
return {
|
||||
pendingApproval,
|
||||
pendingApprovals,
|
||||
messageHistory: prepareMessageHistory(messages),
|
||||
messageHistory: prepareMessageHistory(messages, {
|
||||
primaryOnly: true,
|
||||
}),
|
||||
};
|
||||
}
|
||||
} else {
|
||||
@@ -247,7 +439,7 @@ export async function getResumeData(
|
||||
return {
|
||||
pendingApproval: null,
|
||||
pendingApprovals: [],
|
||||
messageHistory: prepareMessageHistory(messages),
|
||||
messageHistory: prepareMessageHistory(messages, { primaryOnly: true }),
|
||||
};
|
||||
} else {
|
||||
// Use agent messages API for "default" conversation or when no conversation ID
|
||||
@@ -282,7 +474,7 @@ export async function getResumeData(
|
||||
if (isBackfillEnabled()) {
|
||||
try {
|
||||
const messagesPage = await client.agents.messages.list(agent.id, {
|
||||
limit: MESSAGE_HISTORY_LIMIT,
|
||||
limit: BACKFILL_PAGE_LIMIT,
|
||||
order: "desc",
|
||||
conversation_id: "default", // Key: filter to default conversation only
|
||||
});
|
||||
@@ -290,7 +482,7 @@ export async function getResumeData(
|
||||
|
||||
if (process.env.DEBUG) {
|
||||
console.log(
|
||||
`[DEBUG] agents.messages.list(conversation_id=default) returned ${messagesPage.items.length} messages`,
|
||||
`[DEBUG] agents.messages.list(conversation_id=default) returned ${messages.length} messages`,
|
||||
);
|
||||
}
|
||||
} catch (backfillError) {
|
||||
@@ -322,7 +514,9 @@ export async function getResumeData(
|
||||
return {
|
||||
pendingApproval,
|
||||
pendingApprovals,
|
||||
messageHistory: prepareMessageHistory(messages),
|
||||
messageHistory: prepareMessageHistory(messages, {
|
||||
primaryOnly: true,
|
||||
}),
|
||||
};
|
||||
}
|
||||
} else {
|
||||
@@ -335,7 +529,7 @@ export async function getResumeData(
|
||||
return {
|
||||
pendingApproval: null,
|
||||
pendingApprovals: [],
|
||||
messageHistory: prepareMessageHistory(messages),
|
||||
messageHistory: prepareMessageHistory(messages, { primaryOnly: true }),
|
||||
};
|
||||
}
|
||||
} catch (error) {
|
||||
|
||||
105
src/tests/cli/prepareMessageHistory.test.ts
Normal file
105
src/tests/cli/prepareMessageHistory.test.ts
Normal file
@@ -0,0 +1,105 @@
|
||||
import { describe, expect, test } from "bun:test";
|
||||
import type { Message } from "@letta-ai/letta-client/resources/agents/messages";
|
||||
import { prepareMessageHistory } from "../../agent/check-approval";
|
||||
|
||||
/**
 * Test helper: builds a minimal Message-shaped object.
 *
 * @param type - Value stored as `message_type`.
 * @param id - Value stored as `id`.
 * @param dateMs - Epoch milliseconds, stored as an ISO-8601 `date` string.
 * @param extra - Optional extra fields; these override the defaults on key clashes.
 * @returns The assembled object, cast to Message.
 */
function msg(
  type: string,
  id: string,
  dateMs: number,
  extra?: Record<string, unknown>,
): Message {
  const isoDate = new Date(dateMs).toISOString();
  const fields: Record<string, unknown> = {
    id,
    message_type: type,
    date: isoDate,
  };
  // Applied last so caller-supplied fields win, as with a trailing spread.
  Object.assign(fields, extra ?? {});
  return fields as unknown as Message;
}
|
||||
|
||||
// Behavioural tests for prepareMessageHistory: strict type filtering in
// primaryOnly mode, its fallbacks, and orphaned tool_return trimming.
describe("prepareMessageHistory", () => {
  const BASE_MS = 1_700_000_000_000;

  // Builds messages from [type, id] pairs, dated BASE_MS + 1, + 2, ... in order.
  const timeline = (entries: Array<[string, string]>): Message[] =>
    entries.map(([type, id], index) => msg(type, id, BASE_MS + index + 1));

  const typesOf = (history: Message[]) => history.map((m) => m.message_type);

  test("primaryOnly returns only primary message types", () => {
    const history = timeline([
      ["user_message", "u1"],
      ["tool_call_message", "tc1"],
      ["approval_request_message", "ar1"],
      ["tool_return_message", "tr1"],
      ["assistant_message", "a1"],
      ["reasoning_message", "r1"],
      ["approval_response_message", "ap1"],
      ["event_message", "e1"],
      ["summary_message", "s1"],
    ]);

    const result = prepareMessageHistory(history, { primaryOnly: true });

    expect(typesOf(result)).toEqual([
      "user_message",
      "assistant_message",
      "event_message",
      "summary_message",
    ]);
  });

  test("primaryOnly includes most recent assistant even if last N primary messages lack it", () => {
    // One old assistant message followed by a long run of user messages.
    const history: Message[] = [msg("assistant_message", "a1", BASE_MS + 1)];
    for (let n = 0; n < 30; n += 1) {
      history.push(msg("user_message", `u${n}`, BASE_MS + 10 + n));
    }

    const result = prepareMessageHistory(history, { primaryOnly: true });

    const sawAssistant = result.some(
      (m) => m.message_type === "assistant_message",
    );
    expect(sawAssistant).toBe(true);

    const allowedTypes = new Set<string>([
      "user_message",
      "assistant_message",
      "event_message",
      "summary_message",
    ]);
    const onlyPrimary = result.every((m) =>
      allowedTypes.has(m.message_type as string),
    );
    expect(onlyPrimary).toBe(true);
  });

  test("primaryOnly falls back to reasoning when no primary messages exist", () => {
    const history = timeline([
      ["tool_return_message", "tr1"],
      ["reasoning_message", "r1"],
      ["tool_return_message", "tr2"],
      ["reasoning_message", "r2"],
    ]);

    const result = prepareMessageHistory(history, { primaryOnly: true });

    expect(typesOf(result)).toEqual(["reasoning_message", "reasoning_message"]);
  });

  test("primaryOnly returns [] when no primary or reasoning messages exist", () => {
    const history = timeline([
      ["tool_return_message", "tr1"],
      ["approval_request_message", "ar1"],
      ["approval_response_message", "ap1"],
    ]);

    const result = prepareMessageHistory(history, { primaryOnly: true });

    expect(result).toEqual([]);
  });

  test("non-primaryOnly skips orphaned leading tool_return_message", () => {
    const history = timeline([
      ["tool_return_message", "tr1"],
      ["assistant_message", "a1"],
    ]);

    const result = prepareMessageHistory(history);

    expect(result[0]?.message_type).toBe("assistant_message");
  });
});
|
||||
Reference in New Issue
Block a user