diff --git a/src/agent/check-approval.ts b/src/agent/check-approval.ts index 311ca85..6f27f99 100644 --- a/src/agent/check-approval.ts +++ b/src/agent/check-approval.ts @@ -8,8 +8,39 @@ import type { Message } from "@letta-ai/letta-client/resources/agents/messages"; import type { ApprovalRequest } from "../cli/helpers/stream"; import { debugWarn } from "../utils/debug"; -// Number of recent messages to backfill when resuming a session -const MESSAGE_HISTORY_LIMIT = 15; +// Backfill should feel like "the last turn(s)", not "the last N raw messages". +// Tool-heavy turns can generate many tool_call/tool_return messages that would +// otherwise push the most recent assistant/user messages out of the window. +const BACKFILL_PRIMARY_MESSAGE_LIMIT = 12; // user/assistant/reasoning/event/summary +const BACKFILL_MAX_RENDERABLE_MESSAGES = 80; // safety cap + +// Note: We intentionally do not include tool-call / tool-return chatter in the +// resume backfill. Pending approvals are handled via `pendingApprovals` and +// shown separately in the UI. Including tool logs here makes resume feel like a +// corrupted replay when the last "turn" was tool-heavy. + +// Stop fetching once we have enough actual conversational anchors. +// Reasoning can be extremely tool-step heavy, so anchor on user/assistant. +const BACKFILL_ANCHOR_MESSAGE_LIMIT = 6; + +// We fetch more than we render so tool-heavy turns don't push the last +// user-visible assistant message out of the backfill window. 
+const BACKFILL_PAGE_LIMIT = 200; +const BACKFILL_MAX_PAGES = 25; // 5k messages max +const BACKFILL_MIN_ASSISTANT = 1; + +function isPrimaryMessageType(messageType: string | undefined): boolean { + return ( + messageType === "user_message" || + messageType === "assistant_message" || + messageType === "event_message" || + messageType === "summary_message" + ); +} + +function isAnchorMessageType(messageType: string | undefined): boolean { + return messageType === "user_message" || messageType === "assistant_message"; +} /** * Check if message backfilling is enabled via LETTA_BACKFILL env var. @@ -89,9 +120,104 @@ export function extractApprovals(messageToCheck: Message): { * Prepare message history for backfill, trimming orphaned tool returns. * Messages should already be in chronological order (oldest first). */ -function prepareMessageHistory(messages: Message[]): Message[] { - const historyCount = Math.min(MESSAGE_HISTORY_LIMIT, messages.length); - let messageHistory = messages.slice(-historyCount); +// Exported for tests: resume UX depends on strict message-type filtering. +export function prepareMessageHistory( + messages: Message[], + opts?: { primaryOnly?: boolean }, +): Message[] { + const isRenderable = (msg: Message): boolean => { + const t = msg.message_type; + if ( + t === "user_message" || + t === "assistant_message" || + t === "reasoning_message" || + t === "tool_call_message" || + t === "tool_return_message" || + t === "approval_request_message" || + t === "approval_response_message" + ) { + return true; + } + // Newer servers may include extra message types (event/summary) that we render in backfill. + const ts = t as string | undefined; + return ts === "event_message" || ts === "summary_message"; + }; + + const renderable = messages.filter(isRenderable); + if (opts?.primaryOnly) { + // Resume view should prioritize the actual conversation (user/assistant + events). 
+ // Reasoning can be extremely tool-step heavy and will crowd out assistant messages. + const convo = renderable.filter((m) => + isPrimaryMessageType(m.message_type), + ); + let trimmed = convo.slice(-BACKFILL_PRIMARY_MESSAGE_LIMIT); + + // Hardening: if the last N items are all user/system-y content, ensure we + // still include the most recent assistant message when one exists. + const hasAssistant = trimmed.some( + (m) => m.message_type === "assistant_message", + ); + if (!hasAssistant) { + const lastAssistantIndex = convo + .map((m) => m.message_type) + .lastIndexOf("assistant_message"); + if (lastAssistantIndex >= 0) { + const start = Math.max( + 0, + lastAssistantIndex - (BACKFILL_PRIMARY_MESSAGE_LIMIT - 1), + ); + trimmed = convo.slice(start, start + BACKFILL_PRIMARY_MESSAGE_LIMIT); + } + } + if (trimmed.length > 0) return trimmed; + + // If we have no user/assistant/event/summary (rare), fall back to reasoning. + // If reasoning is also absent we return nothing (below) rather than + // replaying tool chatter, so resume stays clean. + const reasoning = renderable.filter( + (m) => m.message_type === "reasoning_message", + ); + if (reasoning.length > 0) { + return reasoning.slice(-BACKFILL_PRIMARY_MESSAGE_LIMIT); + } + // Last resort: nothing qualified for backfill — return empty. + // Do not fall back to tool chatter. + return []; + } + + // Walk backwards until we've captured enough "primary" messages to anchor + // the replay (user/assistant/reasoning + high-level events), but include tool + // messages in-between so the last turn still makes sense. 
+ const isPrimary = (msg: Message): boolean => { + const t = msg.message_type; + return ( + t === "user_message" || + t === "assistant_message" || + t === "reasoning_message" || + (t as string | undefined) === "event_message" || + (t as string | undefined) === "summary_message" + ); + }; + + let primaryCount = 0; + let startIndex = Math.max(0, renderable.length - 1); + for (let i = renderable.length - 1; i >= 0; i -= 1) { + const msg = renderable[i]; + if (!msg) continue; + if (isPrimary(msg)) { + primaryCount += 1; + if (primaryCount >= BACKFILL_PRIMARY_MESSAGE_LIMIT) { + startIndex = i; + break; + } + } + startIndex = i; + } + + let messageHistory = renderable.slice(startIndex); + if (messageHistory.length > BACKFILL_MAX_RENDERABLE_MESSAGES) { + messageHistory = messageHistory.slice(-BACKFILL_MAX_RENDERABLE_MESSAGES); + } // Skip if starts with orphaned tool_return (incomplete turn) if (messageHistory[0]?.message_type === "tool_return_message") { @@ -107,13 +233,82 @@ */ function sortChronological(messages: Message[]): Message[] { return [...messages].sort((a, b) => { - // All message types have 'date' field - const dateA = a.date ?? ""; - const dateB = b.date ?? ""; - return new Date(dateA).getTime() - new Date(dateB).getTime(); + // All message types *should* have 'date', but be defensive. + const ta = a.date ? new Date(a.date).getTime() : 0; + const tb = b.date ? new Date(b.date).getTime() : 0; + if (!Number.isFinite(ta) && !Number.isFinite(tb)) return 0; + if (!Number.isFinite(ta)) return -1; + if (!Number.isFinite(tb)) return 1; + return ta - tb; }); } +async function fetchConversationBackfillMessages( + client: Letta, + conversationId: string, +): Promise<Message[]> { + const collected: Message[] = []; + // Messages can have multiple variants with the same id (e.g. approval_request + reasoning). 
+ // Dedupe using a key that preserves distinct variants while still preventing + // overlap across pagination pages. + const seen = new Set(); + + let cursorBefore: string | null = null; + let assistantCount = 0; + let anchorCount = 0; + + for (let pageIndex = 0; pageIndex < BACKFILL_MAX_PAGES; pageIndex += 1) { + const page = await client.conversations.messages.list(conversationId, { + limit: BACKFILL_PAGE_LIMIT, + order: "desc", + ...(cursorBefore ? { before: cursorBefore } : {}), + }); + const items = page.getPaginatedItems(); + if (items.length === 0) break; + + // items are newest->oldest; use the last item as our "before" cursor. + cursorBefore = items[items.length - 1]?.id ?? null; + + for (const m of items) { + if (!m?.id) continue; + + // Prefer otid when available (it is unique across variants). Otherwise, + // include message_type to avoid dropping variants that share ids. + const key = + "otid" in m && (m as { otid?: unknown }).otid + ? `otid:${String((m as { otid?: unknown }).otid)}` + : `id:${m.id}:${m.message_type ?? ""}`; + + if (seen.has(key)) continue; + seen.add(key); + collected.push(m); + + if (m.message_type === "assistant_message") assistantCount += 1; + if (isAnchorMessageType(m.message_type)) anchorCount += 1; + } + + // Stop once we can confidently show a good recent slice. + if ( + assistantCount >= BACKFILL_MIN_ASSISTANT && + anchorCount >= BACKFILL_ANCHOR_MESSAGE_LIMIT + ) { + break; + } + + // If the server returned fewer than requested, we're likely at the end. + if (items.length < BACKFILL_PAGE_LIMIT) break; + } + + if (assistantCount < BACKFILL_MIN_ASSISTANT) { + debugWarn( + "check-approval", + `Backfill scan found 0 assistant messages in last ${collected.length} messages (tool-heavy conversation?)`, + ); + } + + return sortChronological(collected); +} + /** * Gets data needed to resume an agent session. * Checks for pending approvals and retrieves recent message history for backfill. 
@@ -157,17 +352,16 @@ export async function getResumeData( ); if (isBackfillEnabled()) { try { - const backfill = await client.conversations.messages.list( + const backfill = await fetchConversationBackfillMessages( + client, conversationId, - { - limit: MESSAGE_HISTORY_LIMIT, - order: "desc", - }, ); return { pendingApproval: null, pendingApprovals: [], - messageHistory: sortChronological(backfill.getPaginatedItems()), + messageHistory: prepareMessageHistory(backfill, { + primaryOnly: true, + }), }; } catch (backfillError) { debugWarn( @@ -195,14 +389,10 @@ export async function getResumeData( // Wrapped in try/catch so backfill failures don't crash the CLI if (isBackfillEnabled()) { try { - const backfillPage = await client.conversations.messages.list( + messages = await fetchConversationBackfillMessages( + client, conversationId, - { - limit: MESSAGE_HISTORY_LIMIT, - order: "desc", - }, ); - messages = sortChronological(backfillPage.getPaginatedItems()); } catch (backfillError) { debugWarn( "check-approval", @@ -234,7 +424,9 @@ export async function getResumeData( return { pendingApproval, pendingApprovals, - messageHistory: prepareMessageHistory(messages), + messageHistory: prepareMessageHistory(messages, { + primaryOnly: true, + }), }; } } else { @@ -247,7 +439,7 @@ export async function getResumeData( return { pendingApproval: null, pendingApprovals: [], - messageHistory: prepareMessageHistory(messages), + messageHistory: prepareMessageHistory(messages, { primaryOnly: true }), }; } else { // Use agent messages API for "default" conversation or when no conversation ID @@ -282,7 +474,7 @@ export async function getResumeData( if (isBackfillEnabled()) { try { const messagesPage = await client.agents.messages.list(agent.id, { - limit: MESSAGE_HISTORY_LIMIT, + limit: BACKFILL_PAGE_LIMIT, order: "desc", conversation_id: "default", // Key: filter to default conversation only }); @@ -290,7 +482,7 @@ export async function getResumeData( if (process.env.DEBUG) { 
console.log( - `[DEBUG] agents.messages.list(conversation_id=default) returned ${messagesPage.items.length} messages`, + `[DEBUG] agents.messages.list(conversation_id=default) returned ${messages.length} messages`, ); } } catch (backfillError) { @@ -322,7 +514,9 @@ return { pendingApproval, pendingApprovals, - messageHistory: prepareMessageHistory(messages), + messageHistory: prepareMessageHistory(messages, { + primaryOnly: true, + }), }; } } else { @@ -335,7 +529,7 @@ return { pendingApproval: null, pendingApprovals: [], - messageHistory: prepareMessageHistory(messages), + messageHistory: prepareMessageHistory(messages, { primaryOnly: true }), }; } } catch (error) { diff --git a/src/tests/cli/prepareMessageHistory.test.ts b/src/tests/cli/prepareMessageHistory.test.ts new file mode 100644 index 0000000..d5d6f7c --- /dev/null +++ b/src/tests/cli/prepareMessageHistory.test.ts @@ -0,0 +1,105 @@ +import { describe, expect, test } from "bun:test"; +import type { Message } from "@letta-ai/letta-client/resources/agents/messages"; +import { prepareMessageHistory } from "../../agent/check-approval"; + +function msg( + type: string, + id: string, + dateMs: number, + extra?: Record<string, unknown>, +): Message { + return { + id, + message_type: type, + date: new Date(dateMs).toISOString(), + ...(extra ?? 
{}), + } as unknown as Message; +} + +describe("prepareMessageHistory", () => { + test("primaryOnly returns only primary message types", () => { + const base = 1_700_000_000_000; + const messages: Message[] = [ + msg("user_message", "u1", base + 1), + msg("tool_call_message", "tc1", base + 2), + msg("approval_request_message", "ar1", base + 3), + msg("tool_return_message", "tr1", base + 4), + msg("assistant_message", "a1", base + 5), + msg("reasoning_message", "r1", base + 6), + msg("approval_response_message", "ap1", base + 7), + msg("event_message", "e1", base + 8), + msg("summary_message", "s1", base + 9), + ]; + + const out = prepareMessageHistory(messages, { primaryOnly: true }); + expect(out.map((m) => m.message_type)).toEqual([ + "user_message", + "assistant_message", + "event_message", + "summary_message", + ]); + }); + + test("primaryOnly includes most recent assistant even if last N primary messages lack it", () => { + const base = 1_700_000_000_000; + const messages: Message[] = []; + + // An older assistant message, then many user/event messages. 
+ messages.push(msg("assistant_message", "a1", base + 1)); + for (let i = 0; i < 30; i += 1) { + messages.push(msg("user_message", `u${i}`, base + 10 + i)); + } + + const out = prepareMessageHistory(messages, { primaryOnly: true }); + expect(out.some((m) => m.message_type === "assistant_message")).toBe(true); + expect( + out.every((m) => + [ + "user_message", + "assistant_message", + "event_message", + "summary_message", + ].includes(m.message_type as string), + ), + ).toBe(true); + }); + + test("primaryOnly falls back to reasoning when no primary messages exist", () => { + const base = 1_700_000_000_000; + const messages: Message[] = [ + msg("tool_return_message", "tr1", base + 1), + msg("reasoning_message", "r1", base + 2), + msg("tool_return_message", "tr2", base + 3), + msg("reasoning_message", "r2", base + 4), + ]; + + const out = prepareMessageHistory(messages, { primaryOnly: true }); + expect(out.map((m) => m.message_type)).toEqual([ + "reasoning_message", + "reasoning_message", + ]); + }); + + test("primaryOnly returns [] when no primary or reasoning messages exist", () => { + const base = 1_700_000_000_000; + const messages: Message[] = [ + msg("tool_return_message", "tr1", base + 1), + msg("approval_request_message", "ar1", base + 2), + msg("approval_response_message", "ap1", base + 3), + ]; + + const out = prepareMessageHistory(messages, { primaryOnly: true }); + expect(out).toEqual([]); + }); + + test("non-primaryOnly skips orphaned leading tool_return_message", () => { + const base = 1_700_000_000_000; + const messages: Message[] = [ + msg("tool_return_message", "tr1", base + 1), + msg("assistant_message", "a1", base + 2), + ]; + + const out = prepareMessageHistory(messages); + expect(out[0]?.message_type).toBe("assistant_message"); + }); +});