feat: Add --input-format stream-json for bidirectional SDK communication (#444)

Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
Charles Packer
2026-01-01 01:00:45 -08:00
committed by GitHub
parent 4bf102fb27
commit 47e4734ba1
4 changed files with 620 additions and 8 deletions

View File

@@ -1,4 +1,5 @@
import { parseArgs } from "node:util";
import type { Letta } from "@letta-ai/letta-client";
import type {
AgentState,
MessageCreate,
@@ -55,6 +56,7 @@ export async function handleHeadlessCommand(
toolset: { type: "string" },
prompt: { type: "boolean", short: "p" },
"output-format": { type: "string" },
"input-format": { type: "string" },
"include-partial-messages": { type: "boolean" },
// Additional flags from index.ts that need to be filtered out
help: { type: "boolean", short: "h" },
@@ -83,11 +85,15 @@ export async function handleHeadlessCommand(
toolFilter.setEnabledTools(values.tools as string);
}
// Get prompt from either positional args or stdin
// Check for input-format early - if stream-json, we don't need a prompt
const inputFormat = values["input-format"] as string | undefined;
const isBidirectionalMode = inputFormat === "stream-json";
// Get prompt from either positional args or stdin (unless in bidirectional mode)
let prompt = positionals.slice(2).join(" ");
// If no prompt provided as args, try reading from stdin
if (!prompt) {
// If no prompt provided as args, try reading from stdin (unless in bidirectional mode)
if (!prompt && !isBidirectionalMode) {
// Check if stdin is available (piped input)
if (!process.stdin.isTTY) {
const chunks: Buffer[] = [];
@@ -98,7 +104,7 @@ export async function handleHeadlessCommand(
}
}
if (!prompt) {
if (!prompt && !isBidirectionalMode) {
console.error("Error: No prompt provided");
process.exit(1);
}
@@ -399,6 +405,23 @@ export async function handleHeadlessCommand(
);
process.exit(1);
}
if (inputFormat && inputFormat !== "stream-json") {
console.error(
`Error: Invalid input format "${inputFormat}". Valid formats: stream-json`,
);
process.exit(1);
}
// If input-format is stream-json, use bidirectional mode
if (isBidirectionalMode) {
await runBidirectionalMode(
agent,
client,
outputFormat,
includePartialMessages,
);
return;
}
// Create buffers to accumulate stream
const buffers = createBuffers();
@@ -1255,3 +1278,235 @@ export async function handleHeadlessCommand(
console.log(resultText);
}
}
/**
* Bidirectional mode for SDK communication.
* Reads JSON messages from stdin, processes them, and outputs responses.
* Stays alive until stdin closes.
*/
async function runBidirectionalMode(
agent: AgentState,
_client: Letta,
_outputFormat: string,
includePartialMessages: boolean,
): Promise<void> {
const sessionId = agent.id;
const readline = await import("node:readline");
// Emit init event
const initEvent = {
type: "system",
subtype: "init",
session_id: sessionId,
agent_id: agent.id,
model: agent.llm_config?.model,
tools: agent.tools?.map((t) => t.name) || [],
cwd: process.cwd(),
uuid: `init-${agent.id}`,
};
console.log(JSON.stringify(initEvent));
// Track current operation for interrupt support
let currentAbortController: AbortController | null = null;
// Create readline interface for stdin
const rl = readline.createInterface({
input: process.stdin,
terminal: false,
});
// Process lines as they arrive using async iterator
for await (const line of rl) {
if (!line.trim()) continue;
let message: {
type: string;
message?: { role: string; content: string };
request_id?: string;
request?: { subtype: string };
session_id?: string;
};
try {
message = JSON.parse(line);
} catch {
console.log(
JSON.stringify({
type: "error",
message: "Invalid JSON input",
session_id: sessionId,
uuid: crypto.randomUUID(),
}),
);
continue;
}
// Handle control requests
if (message.type === "control_request") {
const subtype = message.request?.subtype;
const requestId = message.request_id;
if (subtype === "initialize") {
// Return session info
console.log(
JSON.stringify({
type: "control_response",
response: {
subtype: "success",
request_id: requestId,
response: {
agent_id: agent.id,
model: agent.llm_config?.model,
tools: agent.tools?.map((t) => t.name) || [],
},
},
session_id: sessionId,
uuid: crypto.randomUUID(),
}),
);
} else if (subtype === "interrupt") {
// Abort current operation if any
if (currentAbortController !== null) {
(currentAbortController as AbortController).abort();
currentAbortController = null;
}
console.log(
JSON.stringify({
type: "control_response",
response: {
subtype: "success",
request_id: requestId,
},
session_id: sessionId,
uuid: crypto.randomUUID(),
}),
);
} else {
console.log(
JSON.stringify({
type: "control_response",
response: {
subtype: "error",
request_id: requestId,
message: `Unknown control request subtype: ${subtype}`,
},
session_id: sessionId,
uuid: crypto.randomUUID(),
}),
);
}
continue;
}
// Handle user messages
if (message.type === "user" && message.message?.content) {
const userContent = message.message.content;
// Create abort controller for this operation
currentAbortController = new AbortController();
try {
// Send message to agent
const stream = await sendMessageStream(agent.id, [
{ role: "user", content: userContent },
]);
const buffers = createBuffers();
const startTime = performance.now();
// Process stream
for await (const chunk of stream) {
// Check if aborted
if (currentAbortController?.signal.aborted) {
break;
}
// Output chunk
const chunkWithIds = chunk as typeof chunk & {
otid?: string;
id?: string;
};
const uuid = chunkWithIds.otid || chunkWithIds.id;
if (includePartialMessages) {
console.log(
JSON.stringify({
type: "stream_event",
event: chunk,
session_id: sessionId,
uuid,
}),
);
} else {
console.log(
JSON.stringify({
type: "message",
...chunk,
session_id: sessionId,
uuid,
}),
);
}
// Accumulate for result
const { onChunk } = await import("./cli/helpers/accumulator");
onChunk(buffers, chunk);
}
// Emit result
const durationMs = performance.now() - startTime;
const lines = toLines(buffers);
const reversed = [...lines].reverse();
const lastAssistant = reversed.find(
(line) =>
line.kind === "assistant" &&
"text" in line &&
typeof line.text === "string" &&
line.text.trim().length > 0,
) as Extract<Line, { kind: "assistant" }> | undefined;
const resultText = lastAssistant?.text || "";
console.log(
JSON.stringify({
type: "result",
subtype: currentAbortController?.signal.aborted
? "interrupted"
: "success",
is_error: false,
session_id: sessionId,
duration_ms: Math.round(durationMs),
result: resultText,
agent_id: agent.id,
uuid: `result-${agent.id}-${Date.now()}`,
}),
);
} catch (error) {
console.log(
JSON.stringify({
type: "error",
message:
error instanceof Error ? error.message : "Unknown error occurred",
session_id: sessionId,
uuid: crypto.randomUUID(),
}),
);
} finally {
currentAbortController = null;
}
continue;
}
// Unknown message type
console.log(
JSON.stringify({
type: "error",
message: `Unknown message type: ${message.type}`,
session_id: sessionId,
uuid: crypto.randomUUID(),
}),
);
}
// Stdin closed, exit gracefully
process.exit(0);
}