fix: update stream-json output format for SDK compatibility (#442)

Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
Charles Packer
2025-12-31 19:19:35 -08:00
committed by GitHub
parent 125c208e89
commit 725a2a7cad
4 changed files with 254 additions and 7 deletions

View File

@@ -71,6 +71,8 @@ jobs:
uses: t1m0thyj/unlock-keyring@v1 uses: t1m0thyj/unlock-keyring@v1
- name: Run tests (extended timeout) - name: Run tests (extended timeout)
env:
LETTA_API_KEY: ${{ secrets.LETTA_API_KEY }}
run: bun test --timeout 15000 run: bun test --timeout 15000
- name: Build bundle - name: Build bundle

View File

@@ -55,6 +55,7 @@ export async function handleHeadlessCommand(
toolset: { type: "string" }, toolset: { type: "string" },
prompt: { type: "boolean", short: "p" }, prompt: { type: "boolean", short: "p" },
"output-format": { type: "string" }, "output-format": { type: "string" },
"include-partial-messages": { type: "boolean" },
// Additional flags from index.ts that need to be filtered out // Additional flags from index.ts that need to be filtered out
help: { type: "boolean", short: "h" }, help: { type: "boolean", short: "h" },
version: { type: "boolean", short: "v" }, version: { type: "boolean", short: "v" },
@@ -391,6 +392,7 @@ export async function handleHeadlessCommand(
// Validate output format // Validate output format
const outputFormat = const outputFormat =
(values["output-format"] as string | undefined) || "text"; (values["output-format"] as string | undefined) || "text";
const includePartialMessages = Boolean(values["include-partial-messages"]);
if (!["text", "json", "stream-json"].includes(outputFormat)) { if (!["text", "json", "stream-json"].includes(outputFormat)) {
console.error( console.error(
`Error: Invalid output format "${outputFormat}". Valid formats: text, json, stream-json`, `Error: Invalid output format "${outputFormat}". Valid formats: text, json, stream-json`,
@@ -404,13 +406,20 @@ export async function handleHeadlessCommand(
// Initialize session stats // Initialize session stats
const sessionStats = new SessionStats(); const sessionStats = new SessionStats();
// Use agent.id as session_id for all stream-json messages
const sessionId = agent.id;
// Output init event for stream-json format // Output init event for stream-json format
if (outputFormat === "stream-json") { if (outputFormat === "stream-json") {
const initEvent = { const initEvent = {
type: "init", type: "system",
subtype: "init",
session_id: sessionId,
agent_id: agent.id, agent_id: agent.id,
model: agent.llm_config?.model, model: agent.llm_config?.model,
tools: agent.tools?.map((t) => t.name) || [], tools: agent.tools?.map((t) => t.name) || [],
cwd: process.cwd(),
uuid: `init-${agent.id}`,
}; };
console.log(JSON.stringify(initEvent)); console.log(JSON.stringify(initEvent));
} }
@@ -509,6 +518,8 @@ export async function handleHeadlessCommand(
tool_name: decision.approval.toolName, tool_name: decision.approval.toolName,
tool_call_id: decision.approval.toolCallId, tool_call_id: decision.approval.toolCallId,
tool_args: decision.approval.toolArgs, tool_args: decision.approval.toolArgs,
session_id: sessionId,
uuid: `auto-approval-${decision.approval.toolCallId}`,
}), }),
); );
} }
@@ -624,6 +635,8 @@ export async function handleHeadlessCommand(
type: "error", type: "error",
message: fullErrorText, message: fullErrorText,
detail: errorDetail, detail: errorDetail,
session_id: sessionId,
uuid: crypto.randomUUID(),
}), }),
); );
@@ -758,6 +771,8 @@ export async function handleHeadlessCommand(
tool_args: incomingArgs, tool_args: incomingArgs,
reason: permission.reason, reason: permission.reason,
matched_rule: permission.matchedRule, matched_rule: permission.matchedRule,
session_id: sessionId,
uuid: `auto-approval-${id}`,
}), }),
); );
autoApprovalEmitted.add(id); autoApprovalEmitted.add(id);
@@ -769,12 +784,34 @@ export async function handleHeadlessCommand(
// Output chunk as message event (unless filtered) // Output chunk as message event (unless filtered)
if (shouldOutputChunk) { if (shouldOutputChunk) {
console.log( // Use existing otid or id from the Letta SDK chunk
JSON.stringify({ const chunkWithIds = chunk as typeof chunk & {
type: "message", otid?: string;
...chunk, id?: string;
}), };
); const uuid = chunkWithIds.otid || chunkWithIds.id;
if (includePartialMessages) {
// Emit as stream_event wrapper (like Claude Code with --include-partial-messages)
console.log(
JSON.stringify({
type: "stream_event",
event: chunk,
session_id: sessionId,
uuid,
}),
);
} else {
// Emit as regular message (default)
console.log(
JSON.stringify({
type: "message",
...chunk,
session_id: sessionId,
uuid,
}),
);
}
} }
// Still accumulate for approval tracking // Still accumulate for approval tracking
@@ -939,6 +976,8 @@ export async function handleHeadlessCommand(
max_attempts: LLM_API_ERROR_MAX_RETRIES, max_attempts: LLM_API_ERROR_MAX_RETRIES,
delay_ms: delayMs, delay_ms: delayMs,
run_id: lastRunId, run_id: lastRunId,
session_id: sessionId,
uuid: `retry-${lastRunId || crypto.randomUUID()}`,
}), }),
); );
} else { } else {
@@ -1011,6 +1050,8 @@ export async function handleHeadlessCommand(
max_attempts: LLM_API_ERROR_MAX_RETRIES, max_attempts: LLM_API_ERROR_MAX_RETRIES,
delay_ms: delayMs, delay_ms: delayMs,
run_id: lastRunId, run_id: lastRunId,
session_id: sessionId,
uuid: `retry-${lastRunId || crypto.randomUUID()}`,
}), }),
); );
} else { } else {
@@ -1077,6 +1118,8 @@ export async function handleHeadlessCommand(
message: errorMessage, message: errorMessage,
stop_reason: stopReason, stop_reason: stopReason,
run_id: lastRunId, run_id: lastRunId,
session_id: sessionId,
uuid: `error-${lastRunId || crypto.randomUUID()}`,
}), }),
); );
} else { } else {
@@ -1097,6 +1140,8 @@ export async function handleHeadlessCommand(
type: "error", type: "error",
message: errorDetails, message: errorDetails,
run_id: lastKnownRunId, run_id: lastKnownRunId,
session_id: sessionId,
uuid: `error-${lastKnownRunId || crypto.randomUUID()}`,
}), }),
); );
} else { } else {
@@ -1177,10 +1222,16 @@ export async function handleHeadlessCommand(
} }
} }
// Use the last run_id as the result uuid if available, otherwise derive from agent_id
const resultUuid =
allRunIds.size > 0
? `result-${Array.from(allRunIds).pop()}`
: `result-${agent.id}`;
const resultEvent = { const resultEvent = {
type: "result", type: "result",
subtype: "success", subtype: "success",
is_error: false, is_error: false,
session_id: sessionId,
duration_ms: Math.round(stats.totalWallMs), duration_ms: Math.round(stats.totalWallMs),
duration_api_ms: Math.round(stats.totalApiMs), duration_api_ms: Math.round(stats.totalApiMs),
num_turns: stats.usage.stepCount, num_turns: stats.usage.stepCount,
@@ -1192,6 +1243,7 @@ export async function handleHeadlessCommand(
completion_tokens: stats.usage.completionTokens, completion_tokens: stats.usage.completionTokens,
total_tokens: stats.usage.totalTokens, total_tokens: stats.usage.totalTokens,
}, },
uuid: resultUuid,
}; };
console.log(JSON.stringify(resultEvent)); console.log(JSON.stringify(resultEvent));
} else { } else {

View File

@@ -57,6 +57,8 @@ OPTIONS
-p, --prompt Headless prompt mode -p, --prompt Headless prompt mode
--output-format <fmt> Output format for headless mode (text, json, stream-json) --output-format <fmt> Output format for headless mode (text, json, stream-json)
Default: text Default: text
--include-partial-messages
Emit stream_event wrappers for each chunk (stream-json only)
--skills <path> Custom path to skills directory (default: .skills in current directory) --skills <path> Custom path to skills directory (default: .skills in current directory)
--sleeptime Enable sleeptime memory management (only for new agents) --sleeptime Enable sleeptime memory management (only for new agents)
--from-af <path> Create agent from an AgentFile (.af) template --from-af <path> Create agent from an AgentFile (.af) template
@@ -337,6 +339,7 @@ async function main(): Promise<void> {
"permission-mode": { type: "string" }, "permission-mode": { type: "string" },
yolo: { type: "boolean" }, yolo: { type: "boolean" },
"output-format": { type: "string" }, "output-format": { type: "string" },
"include-partial-messages": { type: "boolean" },
skills: { type: "string" }, skills: { type: "string" },
link: { type: "boolean" }, link: { type: "boolean" },
unlink: { type: "boolean" }, unlink: { type: "boolean" },

View File

@@ -0,0 +1,190 @@
import { describe, expect, test } from "bun:test";
import { spawn } from "node:child_process";
/**
* Tests for stream-json output format.
* These verify the message structure matches the SDK-compatible format.
*/
async function runHeadlessCommand(
prompt: string,
extraArgs: string[] = [],
): Promise<string[]> {
return new Promise((resolve, reject) => {
const proc = spawn(
"bun",
[
"run",
"dev",
"--new",
"-p",
prompt,
"--output-format",
"stream-json",
"--yolo",
"-m",
"haiku",
...extraArgs,
],
{
cwd: process.cwd(),
env: { ...process.env },
},
);
let stdout = "";
let stderr = "";
proc.stdout.on("data", (data) => {
stdout += data.toString();
});
proc.stderr.on("data", (data) => {
stderr += data.toString();
});
proc.on("close", (code) => {
if (code !== 0 && !stdout.includes('"type":"result"')) {
reject(new Error(`Process exited with code ${code}: ${stderr}`));
} else {
// Parse line-delimited JSON
const lines = stdout
.split("\n")
.filter((line) => line.trim())
.filter((line) => {
try {
JSON.parse(line);
return true;
} catch {
return false;
}
});
resolve(lines);
}
});
});
}
// Prescriptive prompt to ensure single-step response without tool use
const FAST_PROMPT =
"This is a test. Do not call any tools. Just respond with the word OK and nothing else.";
describe("stream-json format", () => {
test(
"init message has type 'system' with subtype 'init'",
async () => {
const lines = await runHeadlessCommand(FAST_PROMPT);
const initLine = lines.find((line) => {
const obj = JSON.parse(line);
return obj.type === "system" && obj.subtype === "init";
});
expect(initLine).toBeDefined();
const init = JSON.parse(initLine!);
expect(init.type).toBe("system");
expect(init.subtype).toBe("init");
expect(init.agent_id).toBeDefined();
expect(init.session_id).toBe(init.agent_id); // session_id should equal agent_id
expect(init.model).toBeDefined();
expect(init.tools).toBeInstanceOf(Array);
expect(init.cwd).toBeDefined();
expect(init.uuid).toBe(`init-${init.agent_id}`);
},
{ timeout: 60000 },
);
test(
"messages have session_id and uuid",
async () => {
const lines = await runHeadlessCommand(FAST_PROMPT);
// Find a message line
const messageLine = lines.find((line) => {
const obj = JSON.parse(line);
return obj.type === "message";
});
expect(messageLine).toBeDefined();
const msg = JSON.parse(messageLine!);
expect(msg.session_id).toBeDefined();
expect(msg.uuid).toBeDefined();
// uuid should be otid or id from the Letta SDK chunk
expect(msg.uuid).toBeTruthy();
},
{ timeout: 60000 },
);
test(
"result message has correct format",
async () => {
const lines = await runHeadlessCommand(FAST_PROMPT);
const resultLine = lines.find((line) => {
const obj = JSON.parse(line);
return obj.type === "result";
});
expect(resultLine).toBeDefined();
const result = JSON.parse(resultLine!);
expect(result.type).toBe("result");
expect(result.subtype).toBe("success");
expect(result.session_id).toBeDefined();
expect(result.agent_id).toBeDefined();
expect(result.session_id).toBe(result.agent_id);
expect(result.duration_ms).toBeGreaterThan(0);
expect(result.uuid).toContain("result-");
expect(result.result).toBeDefined();
},
{ timeout: 60000 },
);
test(
"--include-partial-messages wraps chunks in stream_event",
async () => {
const lines = await runHeadlessCommand(FAST_PROMPT, [
"--include-partial-messages",
]);
// Find a stream_event line
const streamEventLine = lines.find((line) => {
const obj = JSON.parse(line);
return obj.type === "stream_event";
});
expect(streamEventLine).toBeDefined();
const event = JSON.parse(streamEventLine!);
expect(event.type).toBe("stream_event");
expect(event.event).toBeDefined();
expect(event.session_id).toBeDefined();
expect(event.uuid).toBeDefined();
// The event should contain the original Letta SDK chunk
expect(event.event.message_type).toBeDefined();
},
{ timeout: 60000 },
);
test(
"without --include-partial-messages, messages are type 'message'",
async () => {
const lines = await runHeadlessCommand(FAST_PROMPT);
// Should have message lines, not stream_event
const messageLine = lines.find((line) => {
const obj = JSON.parse(line);
return obj.type === "message";
});
const streamEventLine = lines.find((line) => {
const obj = JSON.parse(line);
return obj.type === "stream_event";
});
expect(messageLine).toBeDefined();
expect(streamEventLine).toBeUndefined();
},
{ timeout: 60000 },
);
});