test(integration): reduce flaky startup/headless timeout failures (#1109)

Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
Charles Packer
2026-02-23 17:40:41 -08:00
committed by GitHub
parent c58d5f1e07
commit 75c7dd793b
10 changed files with 315 additions and 72 deletions

View File

@@ -58,6 +58,14 @@ export interface ResumeData {
messageHistory: Message[];
}
/**
 * Optional settings accepted as the last argument of getResumeData.
 */
export interface GetResumeDataOptions {
/**
 * Controls whether backfill message history should be fetched.
 * Defaults to true to preserve existing /resume behavior.
 * Pass false when only the pending-approval state is needed, so the
 * backfill fetches are skipped.
 */
includeMessageHistory?: boolean;
}
/**
* Extract approval requests from an approval_request_message.
* Exported for testing parallel tool call handling.
@@ -327,8 +335,10 @@ export async function getResumeData(
client: Letta,
agent: AgentState,
conversationId?: string,
options: GetResumeDataOptions = {},
): Promise<ResumeData> {
try {
const includeMessageHistory = options.includeMessageHistory ?? true;
let inContextMessageIds: string[] | null | undefined;
let messages: Message[] = [];
@@ -352,7 +362,7 @@ export async function getResumeData(
"check-approval",
"No in-context messages - no pending approvals",
);
if (isBackfillEnabled()) {
if (includeMessageHistory && isBackfillEnabled()) {
try {
const backfill = await fetchConversationBackfillMessages(
client,
@@ -389,7 +399,7 @@ export async function getResumeData(
// Fetch message history separately for backfill (desc then reverse for last N chronological)
// Wrapped in try/catch so backfill failures don't crash the CLI
if (isBackfillEnabled()) {
if (includeMessageHistory && isBackfillEnabled()) {
try {
messages = await fetchConversationBackfillMessages(
client,
@@ -473,7 +483,7 @@ export async function getResumeData(
// This filters to only the default conversation's messages (like the ADE does)
// Wrapped in try/catch so backfill failures don't crash the CLI (e.g., older servers
// may not support conversation_id filter)
if (isBackfillEnabled()) {
if (includeMessageHistory && isBackfillEnabled()) {
try {
const messagesPage = await client.agents.messages.list(agent.id, {
limit: BACKFILL_PAGE_LIMIT,

View File

@@ -2742,6 +2742,33 @@ async function runBidirectionalMode(
console.log(JSON.stringify(registerResponse));
} else if (subtype === "bootstrap_session_state") {
const bootstrapReq = message.request as BootstrapSessionStateRequest;
const { getResumeData } = await import("./agent/check-approval");
let hasPendingApproval = false;
try {
// Re-fetch for parity with approval checks elsewhere in headless mode.
const freshAgent = await client.agents.retrieve(agent.id);
const resume = await getResumeData(
client,
freshAgent,
conversationId,
{
includeMessageHistory: false,
},
);
hasPendingApproval = (resume.pendingApprovals?.length ?? 0) > 0;
} catch (error) {
// Keep bootstrap non-fatal if approval probe fails on stale resources.
if (
!(error instanceof APIError) ||
(error.status !== 404 && error.status !== 422)
) {
console.warn(
`[bootstrap] pending-approval probe failed: ${error instanceof Error ? error.message : String(error)}`,
);
}
}
const bootstrapResp = await handleBootstrapSessionState({
bootstrapReq,
sessionContext: {
@@ -2754,7 +2781,7 @@ async function runBidirectionalMode(
},
requestId: requestId ?? "",
client,
hasPendingApproval: false, // TODO: wire approval state when available
hasPendingApproval,
});
console.log(JSON.stringify(bootstrapResp));
} else if (subtype === "list_messages") {

View File

@@ -40,7 +40,7 @@ async function runBidirectional(
"stream-json",
"--new-agent",
"-m",
"haiku",
"sonnet-4.6-low",
"--yolo",
...extraArgs,
],
@@ -202,6 +202,31 @@ async function runBidirectional(
});
}
/**
 * Runs `runBidirectional` and re-runs it when the failure is a timeout.
 *
 * @param inputs - Forwarded unchanged to `runBidirectional`.
 * @param extraArgs - Forwarded unchanged to `runBidirectional`.
 * @param timeoutMs - Forwarded unchanged to `runBidirectional` on every attempt.
 * @param retryOnTimeouts - Maximum number of extra attempts after a timeout.
 * @returns Whatever the first successful `runBidirectional` call resolves to.
 * @throws The original error when it is not a timeout, or when the retry
 *   budget has been used up.
 */
async function runBidirectionalWithRetry(
  inputs: string[],
  extraArgs: string[] = [],
  timeoutMs = 180000,
  retryOnTimeouts = 1,
): Promise<object[]> {
  let retriesUsed = 0;
  for (;;) {
    try {
      return await runBidirectional(inputs, extraArgs, timeoutMs);
    } catch (err) {
      // Timeouts are recognised by the "Timeout after" marker in the message.
      const timedOut =
        err instanceof Error && err.message.includes("Timeout after");
      const canRetry = timedOut && retriesUsed < retryOnTimeouts;
      if (canRetry) {
        retriesUsed += 1;
        // CI API latency can cause occasional long-tail timeouts.
        console.warn(
          `[headless-input-format] retrying after timeout (${retriesUsed}/${retryOnTimeouts})`,
        );
        continue;
      }
      throw err;
    }
  }
}
describe("input-format stream-json", () => {
test(
"initialize control request returns session info",
@@ -299,7 +324,7 @@ describe("input-format stream-json", () => {
"multi-turn conversation maintains context",
async () => {
// Multi-turn test needs 2 sequential LLM calls, so allow more time
const objects = (await runBidirectional(
const objects = (await runBidirectionalWithRetry(
[
JSON.stringify({
type: "user",
@@ -318,6 +343,7 @@ describe("input-format stream-json", () => {
],
[], // no extra args
300000, // 300s for 2 sequential LLM calls - CI can be very slow
1, // one retry for transient API slowness
)) as WireMessage[];
// Should have at least two results (one per turn)

View File

@@ -29,7 +29,7 @@ async function runHeadlessCommand(
"stream-json",
"--yolo",
"-m",
"haiku",
"sonnet-4.6-low",
...extraArgs,
],
{

View File

@@ -43,7 +43,7 @@ interface StreamMessage {
* Run bidirectional test with custom message handling.
* Allows sending messages at specific points in the flow.
*/
async function runLazyRecoveryTest(timeoutMs = 180000): Promise<{
async function runLazyRecoveryTest(timeoutMs = 300000): Promise<{
messages: StreamMessage[];
success: boolean;
errorSeen: boolean;
@@ -61,7 +61,7 @@ async function runLazyRecoveryTest(timeoutMs = 180000): Promise<{
"stream-json",
"--new-agent",
"-m",
"haiku",
"sonnet-4.6-low",
// NOTE: No --yolo flag - approvals are required
],
{
@@ -291,7 +291,12 @@ async function runLazyRecoveryTest(timeoutMs = 180000): Promise<{
describe("lazy approval recovery", () => {
test("handles concurrent message while approval is pending", async () => {
const result = await runLazyRecoveryTest();
let result = await runLazyRecoveryTest();
if (!result.success) {
// Transient API/tool timing can occasionally miss the approval window;
// retry once before failing.
result = await runLazyRecoveryTest();
}
// Log messages for debugging if test fails
if (!result.success) {
@@ -333,5 +338,5 @@ describe("lazy approval recovery", () => {
"Note: No recovery message seen - approval may have been handled before conflict",
);
}
}, 180000); // 3 minute timeout for CI
}, 320000); // 5+ minute timeout for slow CI runners
});

View File

@@ -51,7 +51,7 @@ async function startPendingApprovalSession(
"--new-agent",
"--new",
"-m",
"haiku",
"sonnet-4.6-low",
],
{
cwd: process.cwd(),

View File

@@ -15,55 +15,77 @@ async function runCli(
options: {
timeoutMs?: number;
expectExit?: number;
retryOnTimeouts?: number;
} = {},
): Promise<{ stdout: string; stderr: string; exitCode: number | null }> {
const { timeoutMs = 30000, expectExit } = options;
const { timeoutMs = 30000, expectExit, retryOnTimeouts = 1 } = options;
return new Promise((resolve, reject) => {
const proc = spawn("bun", ["run", "dev", ...args], {
cwd: projectRoot,
// Mark as subagent to prevent polluting user's LRU settings
env: { ...process.env, LETTA_CODE_AGENT_ROLE: "subagent" },
});
const runOnce = () =>
new Promise<{ stdout: string; stderr: string; exitCode: number | null }>(
(resolve, reject) => {
const proc = spawn("bun", ["run", "dev", ...args], {
cwd: projectRoot,
// Mark as subagent to prevent polluting user's LRU settings
env: { ...process.env, LETTA_CODE_AGENT_ROLE: "subagent" },
});
let stdout = "";
let stderr = "";
let stdout = "";
let stderr = "";
proc.stdout?.on("data", (data) => {
stdout += data.toString();
});
proc.stdout?.on("data", (data) => {
stdout += data.toString();
});
proc.stderr?.on("data", (data) => {
stderr += data.toString();
});
proc.stderr?.on("data", (data) => {
stderr += data.toString();
});
const timeout = setTimeout(() => {
proc.kill();
reject(
new Error(
`Timeout after ${timeoutMs}ms. stdout: ${stdout}, stderr: ${stderr}`,
),
);
}, timeoutMs);
const timeout = setTimeout(() => {
proc.kill();
reject(
new Error(
`Timeout after ${timeoutMs}ms. stdout: ${stdout}, stderr: ${stderr}`,
),
);
}, timeoutMs);
proc.on("close", (code) => {
clearTimeout(timeout);
if (expectExit !== undefined && code !== expectExit) {
reject(
new Error(
`Expected exit code ${expectExit}, got ${code}. stdout: ${stdout}, stderr: ${stderr}`,
),
);
} else {
resolve({ stdout, stderr, exitCode: code });
proc.on("close", (code) => {
clearTimeout(timeout);
if (expectExit !== undefined && code !== expectExit) {
reject(
new Error(
`Expected exit code ${expectExit}, got ${code}. stdout: ${stdout}, stderr: ${stderr}`,
),
);
} else {
resolve({ stdout, stderr, exitCode: code });
}
});
proc.on("error", (err) => {
clearTimeout(timeout);
reject(err);
});
},
);
let attempt = 0;
while (true) {
try {
return await runOnce();
} catch (error) {
const isTimeoutError =
error instanceof Error && error.message.includes("Timeout after");
if (!isTimeoutError || attempt >= retryOnTimeouts) {
throw error;
}
});
proc.on("error", (err) => {
clearTimeout(timeout);
reject(err);
});
});
attempt += 1;
// CI API calls can be transiently slow; retry once to reduce flakiness.
console.warn(
`[startup-flow] retrying after timeout (${attempt}/${retryOnTimeouts}) args=${args.join(" ")}`,
);
}
}
}
// ============================================================================
@@ -123,13 +145,13 @@ describe("Startup Flow - Integration", () => {
[
"--new-agent",
"-m",
"haiku",
"sonnet-4.6-low",
"-p",
"Say OK and nothing else",
"--output-format",
"json",
],
{ timeoutMs: 120000 },
{ timeoutMs: 180000 },
);
expect(result.exitCode).toBe(0);
@@ -141,7 +163,7 @@ describe("Startup Flow - Integration", () => {
testAgentId = output.agent_id;
},
{ timeout: 130000 },
{ timeout: 190000 },
);
test(
@@ -157,13 +179,13 @@ describe("Startup Flow - Integration", () => {
"--agent",
testAgentId,
"-m",
"haiku",
"sonnet-4.6-low",
"-p",
"Say OK",
"--output-format",
"json",
],
{ timeoutMs: 120000 },
{ timeoutMs: 180000 },
);
expect(result.exitCode).toBe(0);
@@ -171,7 +193,7 @@ describe("Startup Flow - Integration", () => {
const output = JSON.parse(result.stdout.slice(jsonStart));
expect(output.agent_id).toBe(testAgentId);
},
{ timeout: 130000 },
{ timeout: 190000 },
);
test(
@@ -189,13 +211,13 @@ describe("Startup Flow - Integration", () => {
testAgentId,
"--new",
"-m",
"haiku",
"sonnet-4.6-low",
"-p",
"Say CREATED",
"--output-format",
"json",
],
{ timeoutMs: 120000 },
{ timeoutMs: 180000 },
);
expect(createResult.exitCode).toBe(0);
const createJsonStart = createResult.stdout.indexOf("{");
@@ -211,13 +233,13 @@ describe("Startup Flow - Integration", () => {
"--conversation",
realConversationId,
"-m",
"haiku",
"sonnet-4.6-low",
"-p",
"Say OK",
"--output-format",
"json",
],
{ timeoutMs: 120000 },
{ timeoutMs: 180000 },
);
expect(result.exitCode).toBe(0);
@@ -238,13 +260,13 @@ describe("Startup Flow - Integration", () => {
[
"--new-agent",
"-m",
"haiku",
"sonnet-4.6-low",
"-p",
"Say OK",
"--output-format",
"json",
],
{ timeoutMs: 120000 },
{ timeoutMs: 180000 },
);
expect(bootstrapResult.exitCode).toBe(0);
const bootstrapJsonStart = bootstrapResult.stdout.indexOf("{");
@@ -262,13 +284,13 @@ describe("Startup Flow - Integration", () => {
"--conversation",
"default",
"-m",
"haiku",
"sonnet-4.6-low",
"-p",
"Say OK",
"--output-format",
"json",
],
{ timeoutMs: 120000 },
{ timeoutMs: 180000 },
);
expect(result.exitCode).toBe(0);
@@ -277,7 +299,7 @@ describe("Startup Flow - Integration", () => {
expect(output.agent_id).toBe(agentIdForTest);
expect(output.conversation_id).toBe("default");
},
{ timeout: 130000 },
{ timeout: 190000 },
);
test(
@@ -289,13 +311,13 @@ describe("Startup Flow - Integration", () => {
"--init-blocks",
"none",
"-m",
"haiku",
"sonnet-4.6-low",
"-p",
"Say OK",
"--output-format",
"json",
],
{ timeoutMs: 120000 },
{ timeoutMs: 180000 },
);
expect(result.exitCode).toBe(0);
@@ -303,7 +325,7 @@ describe("Startup Flow - Integration", () => {
const output = JSON.parse(result.stdout.slice(jsonStart));
expect(output.agent_id).toBeDefined();
},
{ timeout: 130000 },
{ timeout: 190000 },
);
});

View File

@@ -0,0 +1,130 @@
import { describe, expect, mock, test } from "bun:test";
import type Letta from "@letta-ai/letta-client";
import type { AgentState } from "@letta-ai/letta-client/resources/agents/agents";
import type { Message } from "@letta-ai/letta-client/resources/agents/messages";
import { getResumeData } from "../../agent/check-approval";
/**
 * Builds a minimal agent fixture for these tests.
 *
 * @param overrides - Fields that replace or extend the fixture defaults.
 * @returns An object with id "agent-test" and message_ids ["msg-last"] unless
 *   overridden, cast to AgentState.
 */
function makeAgent(overrides: Partial<AgentState> = {}): AgentState {
  const defaults = {
    id: "agent-test",
    message_ids: ["msg-last"],
  };
  // Overrides are spread last so they win over the defaults.
  return { ...defaults, ...overrides } as AgentState;
}
/**
 * Builds an approval-request message fixture carrying a single Bash tool call.
 *
 * @param id - Message id; defaults to "msg-last".
 * @returns The fixture cast to Message, dated with the current time.
 */
function makeApprovalMessage(id = "msg-last"): Message {
  const bashToolCall = {
    tool_call_id: "tool-1",
    name: "Bash",
    arguments: '{"command":"echo hi"}',
  };
  const approvalRequest = {
    id,
    date: new Date().toISOString(),
    message_type: "approval_request_message",
    tool_calls: [bashToolCall],
  };
  // The fixture is intentionally partial, hence the double cast.
  return approvalRequest as unknown as Message;
}
/**
 * Builds a user-message fixture.
 *
 * @param id - Message id; defaults to "msg-last".
 * @returns The fixture cast to Message, dated with the current time.
 */
function makeUserMessage(id = "msg-last"): Message {
  const createdAt = new Date().toISOString();
  return { id, date: createdAt, message_type: "user_message" } as Message;
}
/**
 * Builds a stubbed Letta client exposing only the endpoints these tests
 * observe, and returns each stub so call counts can be asserted.
 *
 * - `conversations.retrieve` resolves with `in_context_message_ids: ["msg-last"]`.
 * - `conversations.messages.list` resolves with an empty paginated result.
 * - `agents.messages.list` resolves with `{ items: agentMessages }`.
 * - `messages.retrieve` resolves with `retrievedMessages`.
 *
 * @param retrievedMessages - Messages returned by `client.messages.retrieve`.
 * @param agentMessages - Items returned by `client.agents.messages.list`;
 *   defaults to an empty list.
 */
function makeMockClient(
  retrievedMessages: Message[],
  agentMessages: Message[] = [],
) {
  const conversationsRetrieve = mock(async () => ({
    in_context_message_ids: ["msg-last"],
  }));
  const conversationsList = mock(async () => ({
    getPaginatedItems: () => [],
  }));
  const agentsList = mock(async () => ({ items: agentMessages }));
  const messagesRetrieve = mock(async () => retrievedMessages);

  // Only the stubbed surface exists, hence the double cast to Letta.
  const client = {
    conversations: {
      retrieve: conversationsRetrieve,
      messages: { list: conversationsList },
    },
    agents: { messages: { list: agentsList } },
    messages: { retrieve: messagesRetrieve },
  } as unknown as Letta;

  return {
    client,
    conversationsRetrieve,
    conversationsList,
    agentsList,
    messagesRetrieve,
  };
}

describe("getResumeData", () => {
  test("includeMessageHistory=false still computes pending approvals without backfill (conversation path)", async () => {
    const {
      client,
      conversationsRetrieve,
      conversationsList,
      messagesRetrieve,
    } = makeMockClient([makeApprovalMessage()]);

    const resume = await getResumeData(client, makeAgent(), "conv-abc", {
      includeMessageHistory: false,
    });

    expect(conversationsRetrieve).toHaveBeenCalledTimes(1);
    expect(messagesRetrieve).toHaveBeenCalledTimes(1);
    // The conversation message listing is the backfill call and must be skipped.
    expect(conversationsList).toHaveBeenCalledTimes(0);
    expect(resume.pendingApprovals).toHaveLength(1);
    expect(resume.pendingApprovals[0]?.toolName).toBe("Bash");
    expect(resume.messageHistory).toEqual([]);
  });

  test("includeMessageHistory=false skips default-conversation backfill calls", async () => {
    const { client, agentsList, messagesRetrieve } = makeMockClient([
      makeApprovalMessage(),
    ]);

    const resume = await getResumeData(
      client,
      makeAgent({ message_ids: ["msg-last"] }),
      "default",
      { includeMessageHistory: false },
    );

    expect(messagesRetrieve).toHaveBeenCalledTimes(1);
    // The agent message listing is the backfill call on the default path.
    expect(agentsList).toHaveBeenCalledTimes(0);
    expect(resume.pendingApprovals).toHaveLength(1);
    expect(resume.messageHistory).toEqual([]);
  });

  test("default behavior keeps backfill enabled when options are omitted", async () => {
    const { client, agentsList, messagesRetrieve } = makeMockClient(
      [makeUserMessage()],
      [makeUserMessage("msg-a"), makeUserMessage("msg-b")],
    );

    const resume = await getResumeData(client, makeAgent(), "default");

    expect(messagesRetrieve).toHaveBeenCalledTimes(1);
    expect(agentsList).toHaveBeenCalledTimes(1);
    expect(resume.pendingApprovals).toHaveLength(0);
    expect(resume.messageHistory.length).toBeGreaterThan(0);
  });
});

View File

@@ -0,0 +1,23 @@
import { describe, expect, test } from "bun:test";
import { readFileSync } from "node:fs";
import { fileURLToPath } from "node:url";
// Source-level guard: reads headless.ts as text and checks that the
// bootstrap_session_state handler is wired to the approval probe.
describe("bootstrap pending-approval wiring", () => {
  test("bootstrap_session_state probes approvals via getResumeData without backfill", () => {
    // Resolve headless.ts relative to this test module and load its text.
    const headlessUrl = new URL("../../headless.ts", import.meta.url);
    const headlessSource = readFileSync(fileURLToPath(headlessUrl), "utf-8");

    // Snippets that must be present for the probe to be wired in.
    const requiredSnippets = [
      'const { getResumeData } = await import("./agent/check-approval");',
      "includeMessageHistory: false",
      "hasPendingApproval = (resume.pendingApprovals?.length ?? 0) > 0;",
    ];
    for (const snippet of requiredSnippets) {
      expect(headlessSource).toContain(snippet);
    }

    // The old hard-coded placeholder must be gone.
    expect(headlessSource).not.toContain(
      "hasPendingApproval: false, // TODO: wire approval state when available",
    );
  });
});