From cd28bab41aea83d944f58657205e7f40c4600e81 Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Fri, 2 Jan 2026 16:20:59 -0800 Subject: [PATCH] fix: make headless tests event-driven to reduce CI flakiness (#451) Co-authored-by: Letta --- src/tests/headless-input-format.test.ts | 269 +++++++++++------- src/tests/headless-stream-json-format.test.ts | 10 +- 2 files changed, 177 insertions(+), 102 deletions(-) diff --git a/src/tests/headless-input-format.test.ts b/src/tests/headless-input-format.test.ts index b42abe0..297a3eb 100644 --- a/src/tests/headless-input-format.test.ts +++ b/src/tests/headless-input-format.test.ts @@ -20,12 +20,12 @@ const FAST_PROMPT = /** * Helper to run bidirectional commands with stdin input. - * Sends input lines, waits for output, and returns parsed JSON lines. + * Event-driven: waits for init message before sending input, waits for result before closing. */ async function runBidirectional( inputs: string[], extraArgs: string[] = [], - waitMs = 12000, // Increased for slower CI environments (Linux ARM, Windows) + timeoutMs = 90000, // Overall timeout for the entire operation ): Promise { return new Promise((resolve, reject) => { const proc = spawn( @@ -50,66 +50,154 @@ async function runBidirectional( }, ); - let stdout = ""; - let stderr = ""; + const objects: object[] = []; + let buffer = ""; + let inputIndex = 0; + let initReceived = false; + let closing = false; + + // Count expected responses based on input types + const inputTypes = inputs.map((i) => { + try { + const parsed = JSON.parse(i); + return parsed.type; + } catch { + return "invalid"; // Invalid JSON + } + }); + const expectedUserResults = inputTypes.filter((t) => t === "user").length; + const expectedControlResponses = inputTypes.filter( + (t) => t === "control_request", + ).length; + const hasInvalidInput = inputTypes.includes("invalid"); + + let userResultsReceived = 0; + let controlResponsesReceived = 0; + + const maybeClose = () => { + if (closing) return; + + // For invalid input, close after receiving error + // For control requests only, close after all control_responses + // For user messages, close after all results + // For mixed, close when we have all expected responses + + const allUserResultsDone = + expectedUserResults === 0 || userResultsReceived >= expectedUserResults; + const allControlResponsesDone = + expectedControlResponses === 0 || + controlResponsesReceived >= expectedControlResponses; + const allInputsSent = inputIndex >= inputs.length; + + if (allInputsSent && allUserResultsDone && allControlResponsesDone) { + closing = true; + setTimeout(() => proc.stdin?.end(), 500); + } + }; + + const processLine = (line: string) => { + if (!line.trim()) return; + try { + const obj = JSON.parse(line); + objects.push(obj); + + // Check for init message - signal to start sending inputs + if (obj.type === "system" && obj.subtype === "init" && !initReceived) { + initReceived = true; + sendNextInput(); + } + + // Check for control_response + if (obj.type === "control_response") { + controlResponsesReceived++; + maybeClose(); + } + + // Check for result message + if (obj.type === "result") { + userResultsReceived++; + // If more inputs to send, send next after a brief delay + // This gives the CLI time to be ready for the next input + if (inputIndex < inputs.length) { + setTimeout(sendNextInput, 200); + } + // Always check if we should close (might have received all expected results) + maybeClose(); + } + + // Check for error message (for invalid JSON input test) + if (obj.type === "error" && hasInvalidInput) { + closing = true; + setTimeout(() => proc.stdin?.end(), 500); + } + } catch { + // Not valid JSON, ignore + } + }; + + const sendNextInput = () => { + if (inputIndex < inputs.length) { + proc.stdin?.write(`${inputs[inputIndex]}\n`); + inputIndex++; + } + }; proc.stdout?.on("data", (data) => { - stdout += data.toString(); + buffer += data.toString(); + const lines = buffer.split("\n"); + buffer = lines.pop() || ""; // Keep incomplete line in buffer + for (const line of lines) { + processLine(line); + } }); + let stderr = ""; proc.stderr?.on("data", (data) => { stderr += data.toString(); }); - // Write inputs with delays between them - let inputIndex = 0; - const writeNextInput = () => { - if (inputIndex < inputs.length) { - proc.stdin?.write(`${inputs[inputIndex]}\n`); - inputIndex++; - setTimeout(writeNextInput, 1000); // 1s between inputs - } else { - // All inputs sent, wait for processing then close - setTimeout(() => { - proc.stdin?.end(); - }, waitMs); - } - }; - - // Start writing inputs after delay for process to initialize - // CI environments are slower, need more time for bun to start - // 8s delay accounts for slow ARM/Windows CI runners - setTimeout(writeNextInput, 8000); - proc.on("close", (code) => { - // Parse line-delimited JSON - const lines = stdout - .split("\n") - .filter((line) => line.trim()) - .filter((line) => { - try { - JSON.parse(line); - return true; - } catch { - return false; - } - }) - .map((line) => JSON.parse(line)); + // Process any remaining buffer + if (buffer.trim()) { + processLine(buffer); + } - if (lines.length === 0 && code !== 0) { - reject(new Error(`Process exited with code ${code}: ${stderr}`)); + // Check if we got enough results + const gotExpectedResults = + userResultsReceived >= expectedUserResults && + controlResponsesReceived >= expectedControlResponses; + + if (objects.length === 0 && code !== 0) { + reject( + new Error( + `Process exited with code ${code}, no output received. stderr: ${stderr}`, + ), + ); + } else if (!gotExpectedResults && code !== 0) { + reject( + new Error( + `Process exited with code ${code} before all results received. ` + + `Got ${userResultsReceived}/${expectedUserResults} user results, ` + + `${controlResponsesReceived}/${expectedControlResponses} control responses. ` + + `inputIndex: ${inputIndex}, initReceived: ${initReceived}. stderr: ${stderr}`, + ), + ); } else { - resolve(lines); + resolve(objects); } }); - // Safety timeout - generous for CI environments - setTimeout( - () => { - proc.kill(); - }, - waitMs + 15000 + inputs.length * 2000, - ); + // Safety timeout + const timeout = setTimeout(() => { + proc.kill(); + reject( + new Error( + `Timeout after ${timeoutMs}ms. Received ${objects.length} objects, init: ${initReceived}, userResults: ${userResultsReceived}/${expectedUserResults}, controlResponses: ${controlResponsesReceived}/${expectedControlResponses}`, + ), + ); + }, timeoutMs); + + proc.on("close", () => clearTimeout(timeout)); }); } @@ -150,22 +238,18 @@ describe("input-format stream-json", () => { expect(initResponse?.agent_id).toBeDefined(); } }, - { timeout: 30000 }, + { timeout: 120000 }, ); test( "user message returns assistant response and result", async () => { - const objects = (await runBidirectional( - [ - JSON.stringify({ - type: "user", - message: { role: "user", content: FAST_PROMPT }, - }), - ], - [], - 10000, - )) as WireMessage[]; + const objects = (await runBidirectional([ + JSON.stringify({ + type: "user", + message: { role: "user", content: FAST_PROMPT }, + }), + ])) as WireMessage[]; // Should have init event const initEvent = objects.find( @@ -207,32 +291,28 @@ describe("input-format stream-json", () => { expect(result?.agent_id).toBeDefined(); expect(result?.duration_ms).toBeGreaterThan(0); }, - { timeout: 60000 }, + { timeout: 120000 }, ); test( "multi-turn conversation maintains context", async () => { - const objects = (await runBidirectional( - [ - JSON.stringify({ - type: "user", - message: { - role: "user", - content: "Say hello", - }, - }), - JSON.stringify({ - type: "user", - message: { - role: "user", - content: "Say goodbye", - }, - }), - ], - [], - 20000, - )) as WireMessage[]; + const objects = (await runBidirectional([ + JSON.stringify({ + type: "user", + message: { + role: "user", + content: "Say hello", + }, + }), + JSON.stringify({ + type: "user", + message: { + role: "user", + content: "Say goodbye", + }, + }), + ])) as WireMessage[]; // Should have at least two results (one per turn) const results = objects.filter( @@ -256,23 +336,19 @@ describe("input-format stream-json", () => { expect(firstResult.session_id).toBe(lastResult.session_id); } }, - { timeout: 120000 }, + { timeout: 180000 }, ); test( "interrupt control request is acknowledged", async () => { - const objects = (await runBidirectional( - [ - JSON.stringify({ - type: "control_request", - request_id: "int_1", - request: { subtype: "interrupt" }, - }), - ], - [], - 12000, // Longer wait for slow CI (ARM, Windows) - )) as WireMessage[]; + const objects = (await runBidirectional([ + JSON.stringify({ + type: "control_request", + request_id: "int_1", + request: { subtype: "interrupt" }, + }), + ])) as WireMessage[]; // Should have control_response for interrupt const controlResponse = objects.find( @@ -282,7 +358,7 @@ describe("input-format stream-json", () => { expect(controlResponse).toBeDefined(); expect(controlResponse?.response.subtype).toBe("success"); }, - { timeout: 45000 }, // Increased from 30s for slow CI + { timeout: 120000 }, ); test( @@ -296,7 +372,6 @@ describe("input-format stream-json", () => { }), ], ["--include-partial-messages"], - 10000, )) as WireMessage[]; // Should have stream_event messages (not just "message" type) @@ -330,7 +405,7 @@ describe("input-format stream-json", () => { expect(result).toBeDefined(); expect(result?.subtype).toBe("success"); }, - { timeout: 60000 }, + { timeout: 120000 }, ); test( @@ -353,7 +428,7 @@ describe("input-format stream-json", () => { expect(controlResponse).toBeDefined(); expect(controlResponse?.response.subtype).toBe("error"); }, - { timeout: 30000 }, + { timeout: 120000 }, ); test( @@ -371,6 +446,6 @@ describe("input-format stream-json", () => { expect(errorMsg).toBeDefined(); expect(errorMsg?.message).toContain("Invalid JSON"); }, - { timeout: 30000 }, + { timeout: 120000 }, ); }); diff --git a/src/tests/headless-stream-json-format.test.ts b/src/tests/headless-stream-json-format.test.ts index 37dddca..fcef111 100644 --- a/src/tests/headless-stream-json-format.test.ts +++ b/src/tests/headless-stream-json-format.test.ts @@ -105,7 +105,7 @@ describe("stream-json format", () => { expect(init.cwd).toBeDefined(); expect(init.uuid).toBe(`init-${init.agent_id}`); }, - { timeout: 60000 }, + { timeout: 120000 }, ); test( @@ -131,7 +131,7 @@ describe("stream-json format", () => { // uuid should be otid or id from the Letta SDK chunk expect(msg.uuid).toBeTruthy(); }, - { timeout: 60000 }, + { timeout: 120000 }, ); test( @@ -156,7 +156,7 @@ describe("stream-json format", () => { expect(result.uuid).toContain("result-"); expect(result.result).toBeDefined(); }, - { timeout: 60000 }, + { timeout: 120000 }, ); test( @@ -183,7 +183,7 @@ describe("stream-json format", () => { // The event should contain the original Letta SDK chunk expect("message_type" in event.event).toBe(true); }, - { timeout: 60000 }, + { timeout: 120000 }, ); test( @@ -217,6 +217,6 @@ describe("stream-json format", () => { }); expect(resultLine).toBeDefined(); }, - { timeout: 60000 }, + { timeout: 120000 }, ); });