feat: align headless bidirectional queueing with TUI runtime (#990)

This commit is contained in:
Charles Packer
2026-02-17 11:38:29 -08:00
committed by GitHub
parent dda410943f
commit 7cc8729e57
3 changed files with 216 additions and 1 deletions

View File

@@ -15,6 +15,8 @@ export type QueuedMessage = {
type QueueAdder = (message: QueuedMessage) => void;
// Global bridge is intentionally single-consumer. Each process runs either
// one TUI App instance or one headless bidirectional loop.
let queueAdder: QueueAdder | null = null;
const pendingMessages: QueuedMessage[] = [];
const MAX_PENDING_MESSAGES = 10;

View File

@@ -41,6 +41,10 @@ import {
type ReflectionTrigger,
reflectionSettingsToLegacyMode,
} from "./cli/helpers/memoryReminder";
import {
type QueuedMessage,
setMessageQueueAdder,
} from "./cli/helpers/messageQueueBridge";
import {
type DrainStreamHook,
drainStreamWithResume,
@@ -121,6 +125,54 @@ export function shouldReinjectSkillsAfterCompaction(lines: Line[]): boolean {
);
}
type MessageContentParts = Exclude<MessageCreate["content"], string>;
export type BidirectionalQueuedInput =
| {
kind: "user";
content: MessageCreate["content"];
}
| {
kind: "task_notification";
text: string;
};
export function mergeBidirectionalQueuedInput(
queued: BidirectionalQueuedInput[],
): MessageCreate["content"] | null {
if (queued.length === 0) {
return null;
}
const mergedParts: MessageContentParts = [];
let isFirst = true;
for (const item of queued) {
if (!isFirst) {
mergedParts.push({ type: "text", text: "\n" });
}
isFirst = false;
if (item.kind === "task_notification") {
mergedParts.push({ type: "text", text: item.text });
continue;
}
if (typeof item.content === "string") {
mergedParts.push({ type: "text", text: item.content });
continue;
}
mergedParts.push(...item.content);
}
if (mergedParts.length === 0) {
return null;
}
return mergedParts as MessageCreate["content"];
}
type ReflectionOverrides = {
trigger?: ReflectionTrigger;
behavior?: ReflectionBehavior;
@@ -2393,6 +2445,29 @@ async function runBidirectionalMode(
const lineQueue: string[] = [];
let lineResolver: ((line: string | null) => void) | null = null;
const serializeQueuedMessageAsUserLine = (queuedMessage: QueuedMessage) =>
JSON.stringify({
type: "user",
message: {
role: "user",
content: queuedMessage.text,
},
_queuedKind: queuedMessage.kind,
});
// Connect Task/subagent background notifications to the same queueing path
// used by user input so bidirectional mode inherits TUI-style queue behavior.
setMessageQueueAdder((queuedMessage) => {
const syntheticUserLine = serializeQueuedMessageAsUserLine(queuedMessage);
if (lineResolver) {
const resolve = lineResolver;
lineResolver = null;
resolve(syntheticUserLine);
return;
}
lineQueue.push(syntheticUserLine);
});
// Feed lines into queue or resolver
rl.on("line", (line) => {
if (lineResolver) {
@@ -2405,6 +2480,7 @@ async function runBidirectionalMode(
});
rl.on("close", () => {
setMessageQueueAdder(null);
if (lineResolver) {
const resolve = lineResolver;
lineResolver = null;
@@ -2669,7 +2745,77 @@ async function runBidirectionalMode(
// Handle user messages
if (message.type === "user" && message.message?.content !== undefined) {
const userContent = message.message.content;
const queuedInputs: BidirectionalQueuedInput[] = [
{
kind: "user",
content: message.message.content,
},
];
// Batch any already-buffered user lines into the same turn, mirroring
// TUI queue dequeue behavior (single coalesced submit when idle).
while (lineQueue.length > 0) {
const candidate = lineQueue[0];
if (!candidate?.trim()) {
lineQueue.shift();
continue;
}
let parsedCandidate: {
type?: string;
message?: { content?: MessageCreate["content"] };
_queuedKind?: QueuedMessage["kind"];
};
try {
parsedCandidate = JSON.parse(candidate);
} catch {
// Leave malformed lines for the main loop to surface as parse errors.
break;
}
if (
parsedCandidate.type === "user" &&
parsedCandidate.message?.content !== undefined
) {
lineQueue.shift();
if (parsedCandidate._queuedKind === "task_notification") {
const notificationText =
typeof parsedCandidate.message.content === "string"
? parsedCandidate.message.content
: parsedCandidate.message.content
.reduce((texts: string[], part) => {
if (
part.type === "text" &&
"text" in part &&
typeof part.text === "string"
) {
texts.push(part.text);
}
return texts;
}, [])
.join("");
queuedInputs.push({
kind: "task_notification",
text: notificationText,
});
} else {
queuedInputs.push({
kind: "user",
content: parsedCandidate.message.content,
});
}
continue;
}
// Stop coalescing when the queue head is not a user-input line.
// The outer loop must process control/error/system lines in-order.
break;
}
const userContent = mergeBidirectionalQueuedInput(queuedInputs);
if (userContent === null) {
continue;
}
// Create abort controller for this operation
currentAbortController = new AbortController();
@@ -3180,5 +3326,6 @@ async function runBidirectionalMode(
}
// Stdin closed, exit gracefully
setMessageQueueAdder(null);
process.exit(0);
}

View File

@@ -0,0 +1,66 @@
import { describe, expect, test } from "bun:test";
import { readFileSync } from "node:fs";
import { fileURLToPath } from "node:url";
import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents";
import {
type BidirectionalQueuedInput,
mergeBidirectionalQueuedInput,
} from "../../headless";
describe("headless bidirectional queue merging", () => {
test("merges queued user and task notification inputs into one content payload", () => {
const queued: BidirectionalQueuedInput[] = [
{ kind: "user", content: "first user message" },
{
kind: "task_notification",
text: "<task-notification><summary>done</summary></task-notification>",
},
{ kind: "user", content: "second user message" },
];
const merged = mergeBidirectionalQueuedInput(queued);
expect(Array.isArray(merged)).toBe(true);
if (!Array.isArray(merged)) return;
const textParts = merged.flatMap((part) =>
part.type === "text" ? [part.text] : [],
);
expect(textParts.join("")).toContain("first user message");
expect(textParts.join("")).toContain("<task-notification>");
expect(textParts.join("")).toContain("second user message");
});
test("preserves multimodal user content parts", () => {
const multimodal = [
{ type: "text", text: "describe image" },
{
type: "image",
source: { type: "base64", media_type: "image/png", data: "abc" },
},
] as unknown as Exclude<MessageCreate["content"], string>;
const queued: BidirectionalQueuedInput[] = [
{ kind: "user", content: multimodal },
];
const merged = mergeBidirectionalQueuedInput(queued);
expect(Array.isArray(merged)).toBe(true);
if (!Array.isArray(merged)) return;
expect(merged[0]).toEqual(multimodal[0]);
expect(merged[1]).toEqual(multimodal[1]);
});
});
describe("headless bidirectional queue wiring", () => {
test("registers and clears messageQueueBridge adder in bidirectional mode", () => {
const headlessPath = fileURLToPath(
new URL("../../headless.ts", import.meta.url),
);
const source = readFileSync(headlessPath, "utf-8");
expect(source).toContain("setMessageQueueAdder((queuedMessage) =>");
expect(source).toContain("serializeQueuedMessageAsUserLine");
expect(source).toContain("_queuedKind");
expect(source).toContain("setMessageQueueAdder(null)");
});
});