fix: make headless tests event-driven to reduce CI flakiness (#451)

Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
Charles Packer
2026-01-02 16:20:59 -08:00
committed by GitHub
parent 6587237fa6
commit cd28bab41a
2 changed files with 177 additions and 102 deletions

View File

@@ -20,12 +20,12 @@ const FAST_PROMPT =
/**
* Helper to run bidirectional commands with stdin input.
* Sends input lines, waits for output, and returns parsed JSON lines.
* Event-driven: waits for init message before sending input, waits for result before closing.
*/
async function runBidirectional(
inputs: string[],
extraArgs: string[] = [],
waitMs = 12000, // Increased for slower CI environments (Linux ARM, Windows)
timeoutMs = 90000, // Overall timeout for the entire operation
): Promise<object[]> {
return new Promise((resolve, reject) => {
const proc = spawn(
@@ -50,66 +50,154 @@ async function runBidirectional(
},
);
let stdout = "";
let stderr = "";
const objects: object[] = [];
let buffer = "";
let inputIndex = 0;
let initReceived = false;
let closing = false;
// Count expected responses based on input types
const inputTypes = inputs.map((i) => {
try {
const parsed = JSON.parse(i);
return parsed.type;
} catch {
return "invalid"; // Invalid JSON
}
});
const expectedUserResults = inputTypes.filter((t) => t === "user").length;
const expectedControlResponses = inputTypes.filter(
(t) => t === "control_request",
).length;
const hasInvalidInput = inputTypes.includes("invalid");
let userResultsReceived = 0;
let controlResponsesReceived = 0;
const maybeClose = () => {
if (closing) return;
// For invalid input, close after receiving error
// For control requests only, close after all control_responses
// For user messages, close after all results
// For mixed, close when we have all expected responses
const allUserResultsDone =
expectedUserResults === 0 || userResultsReceived >= expectedUserResults;
const allControlResponsesDone =
expectedControlResponses === 0 ||
controlResponsesReceived >= expectedControlResponses;
const allInputsSent = inputIndex >= inputs.length;
if (allInputsSent && allUserResultsDone && allControlResponsesDone) {
closing = true;
setTimeout(() => proc.stdin?.end(), 500);
}
};
const processLine = (line: string) => {
if (!line.trim()) return;
try {
const obj = JSON.parse(line);
objects.push(obj);
// Check for init message - signal to start sending inputs
if (obj.type === "system" && obj.subtype === "init" && !initReceived) {
initReceived = true;
sendNextInput();
}
// Check for control_response
if (obj.type === "control_response") {
controlResponsesReceived++;
maybeClose();
}
// Check for result message
if (obj.type === "result") {
userResultsReceived++;
// If more inputs to send, send next after a brief delay
// This gives the CLI time to be ready for the next input
if (inputIndex < inputs.length) {
setTimeout(sendNextInput, 200);
}
// Always check if we should close (might have received all expected results)
maybeClose();
}
// Check for error message (for invalid JSON input test)
if (obj.type === "error" && hasInvalidInput) {
closing = true;
setTimeout(() => proc.stdin?.end(), 500);
}
} catch {
// Not valid JSON, ignore
}
};
const sendNextInput = () => {
if (inputIndex < inputs.length) {
proc.stdin?.write(`${inputs[inputIndex]}\n`);
inputIndex++;
}
};
proc.stdout?.on("data", (data) => {
stdout += data.toString();
buffer += data.toString();
const lines = buffer.split("\n");
buffer = lines.pop() || ""; // Keep incomplete line in buffer
for (const line of lines) {
processLine(line);
}
});
let stderr = "";
proc.stderr?.on("data", (data) => {
stderr += data.toString();
});
// Write inputs with delays between them
let inputIndex = 0;
const writeNextInput = () => {
if (inputIndex < inputs.length) {
proc.stdin?.write(`${inputs[inputIndex]}\n`);
inputIndex++;
setTimeout(writeNextInput, 1000); // 1s between inputs
} else {
// All inputs sent, wait for processing then close
setTimeout(() => {
proc.stdin?.end();
}, waitMs);
}
};
// Start writing inputs after delay for process to initialize
// CI environments are slower, need more time for bun to start
// 8s delay accounts for slow ARM/Windows CI runners
setTimeout(writeNextInput, 8000);
proc.on("close", (code) => {
// Parse line-delimited JSON
const lines = stdout
.split("\n")
.filter((line) => line.trim())
.filter((line) => {
try {
JSON.parse(line);
return true;
} catch {
return false;
}
})
.map((line) => JSON.parse(line));
// Process any remaining buffer
if (buffer.trim()) {
processLine(buffer);
}
if (lines.length === 0 && code !== 0) {
reject(new Error(`Process exited with code ${code}: ${stderr}`));
// Check if we got enough results
const gotExpectedResults =
userResultsReceived >= expectedUserResults &&
controlResponsesReceived >= expectedControlResponses;
if (objects.length === 0 && code !== 0) {
reject(
new Error(
`Process exited with code ${code}, no output received. stderr: ${stderr}`,
),
);
} else if (!gotExpectedResults && code !== 0) {
reject(
new Error(
`Process exited with code ${code} before all results received. ` +
`Got ${userResultsReceived}/${expectedUserResults} user results, ` +
`${controlResponsesReceived}/${expectedControlResponses} control responses. ` +
`inputIndex: ${inputIndex}, initReceived: ${initReceived}. stderr: ${stderr}`,
),
);
} else {
resolve(lines);
resolve(objects);
}
});
// Safety timeout - generous for CI environments
setTimeout(
() => {
proc.kill();
},
waitMs + 15000 + inputs.length * 2000,
);
// Safety timeout
const timeout = setTimeout(() => {
proc.kill();
reject(
new Error(
`Timeout after ${timeoutMs}ms. Received ${objects.length} objects, init: ${initReceived}, userResults: ${userResultsReceived}/${expectedUserResults}, controlResponses: ${controlResponsesReceived}/${expectedControlResponses}`,
),
);
}, timeoutMs);
proc.on("close", () => clearTimeout(timeout));
});
}
@@ -150,22 +238,18 @@ describe("input-format stream-json", () => {
expect(initResponse?.agent_id).toBeDefined();
}
},
{ timeout: 30000 },
{ timeout: 120000 },
);
test(
"user message returns assistant response and result",
async () => {
const objects = (await runBidirectional(
[
JSON.stringify({
type: "user",
message: { role: "user", content: FAST_PROMPT },
}),
],
[],
10000,
)) as WireMessage[];
const objects = (await runBidirectional([
JSON.stringify({
type: "user",
message: { role: "user", content: FAST_PROMPT },
}),
])) as WireMessage[];
// Should have init event
const initEvent = objects.find(
@@ -207,32 +291,28 @@ describe("input-format stream-json", () => {
expect(result?.agent_id).toBeDefined();
expect(result?.duration_ms).toBeGreaterThan(0);
},
{ timeout: 60000 },
{ timeout: 120000 },
);
test(
"multi-turn conversation maintains context",
async () => {
const objects = (await runBidirectional(
[
JSON.stringify({
type: "user",
message: {
role: "user",
content: "Say hello",
},
}),
JSON.stringify({
type: "user",
message: {
role: "user",
content: "Say goodbye",
},
}),
],
[],
20000,
)) as WireMessage[];
const objects = (await runBidirectional([
JSON.stringify({
type: "user",
message: {
role: "user",
content: "Say hello",
},
}),
JSON.stringify({
type: "user",
message: {
role: "user",
content: "Say goodbye",
},
}),
])) as WireMessage[];
// Should have at least two results (one per turn)
const results = objects.filter(
@@ -256,23 +336,19 @@ describe("input-format stream-json", () => {
expect(firstResult.session_id).toBe(lastResult.session_id);
}
},
{ timeout: 120000 },
{ timeout: 180000 },
);
test(
"interrupt control request is acknowledged",
async () => {
const objects = (await runBidirectional(
[
JSON.stringify({
type: "control_request",
request_id: "int_1",
request: { subtype: "interrupt" },
}),
],
[],
12000, // Longer wait for slow CI (ARM, Windows)
)) as WireMessage[];
const objects = (await runBidirectional([
JSON.stringify({
type: "control_request",
request_id: "int_1",
request: { subtype: "interrupt" },
}),
])) as WireMessage[];
// Should have control_response for interrupt
const controlResponse = objects.find(
@@ -282,7 +358,7 @@ describe("input-format stream-json", () => {
expect(controlResponse).toBeDefined();
expect(controlResponse?.response.subtype).toBe("success");
},
{ timeout: 45000 }, // Increased from 30s for slow CI
{ timeout: 120000 },
);
test(
@@ -296,7 +372,6 @@ describe("input-format stream-json", () => {
}),
],
["--include-partial-messages"],
10000,
)) as WireMessage[];
// Should have stream_event messages (not just "message" type)
@@ -330,7 +405,7 @@ describe("input-format stream-json", () => {
expect(result).toBeDefined();
expect(result?.subtype).toBe("success");
},
{ timeout: 60000 },
{ timeout: 120000 },
);
test(
@@ -353,7 +428,7 @@ describe("input-format stream-json", () => {
expect(controlResponse).toBeDefined();
expect(controlResponse?.response.subtype).toBe("error");
},
{ timeout: 30000 },
{ timeout: 120000 },
);
test(
@@ -371,6 +446,6 @@ describe("input-format stream-json", () => {
expect(errorMsg).toBeDefined();
expect(errorMsg?.message).toContain("Invalid JSON");
},
{ timeout: 30000 },
{ timeout: 120000 },
);
});

View File

@@ -105,7 +105,7 @@ describe("stream-json format", () => {
expect(init.cwd).toBeDefined();
expect(init.uuid).toBe(`init-${init.agent_id}`);
},
{ timeout: 60000 },
{ timeout: 120000 },
);
test(
@@ -131,7 +131,7 @@ describe("stream-json format", () => {
// uuid should be otid or id from the Letta SDK chunk
expect(msg.uuid).toBeTruthy();
},
{ timeout: 60000 },
{ timeout: 120000 },
);
test(
@@ -156,7 +156,7 @@ describe("stream-json format", () => {
expect(result.uuid).toContain("result-");
expect(result.result).toBeDefined();
},
{ timeout: 60000 },
{ timeout: 120000 },
);
test(
@@ -183,7 +183,7 @@ describe("stream-json format", () => {
// The event should contain the original Letta SDK chunk
expect("message_type" in event.event).toBe(true);
},
{ timeout: 60000 },
{ timeout: 120000 },
);
test(
@@ -217,6 +217,6 @@ describe("stream-json format", () => {
});
expect(resultLine).toBeDefined();
},
{ timeout: 60000 },
{ timeout: 120000 },
);
});