feat: add typed wire format for stream-json protocol (#445)

Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
Charles Packer
2026-01-01 20:05:41 -08:00
committed by GitHub
parent 47e4734ba1
commit 397560ef00
7 changed files with 618 additions and 256 deletions

View File

@@ -4,7 +4,10 @@ import type {
AgentState,
MessageCreate,
} from "@letta-ai/letta-client/resources/agents/agents";
import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/messages";
import type {
ApprovalCreate,
LettaStreamingResponse,
} from "@letta-ai/letta-client/resources/agents/messages";
import type { StopReasonType } from "@letta-ai/letta-client/resources/runs/runs";
import type { ApprovalResult } from "./agent/approval-execution";
import { getClient } from "./agent/client";
@@ -28,6 +31,16 @@ import {
forceUpsertTools,
isToolsNotFoundError,
} from "./tools/manager";
import type {
AutoApprovalMessage,
ControlResponse,
ErrorMessage,
MessageWire,
ResultMessage,
RetryMessage,
StreamEvent,
SystemInitMessage,
} from "./types/wire";
// Maximum number of times to retry a turn when the backend
// reports an `llm_api_error` stop reason. This helps smooth
@@ -434,14 +447,18 @@ export async function handleHeadlessCommand(
// Output init event for stream-json format
if (outputFormat === "stream-json") {
const initEvent = {
const initEvent: SystemInitMessage = {
type: "system",
subtype: "init",
session_id: sessionId,
agent_id: agent.id,
model: agent.llm_config?.model,
tools: agent.tools?.map((t) => t.name) || [],
model: agent.llm_config?.model ?? "",
tools:
agent.tools?.map((t) => t.name).filter((n): n is string => !!n) || [],
cwd: process.cwd(),
mcp_servers: [],
permission_mode: "",
slash_commands: [],
uuid: `init-${agent.id}`,
};
console.log(JSON.stringify(initEvent));
@@ -468,6 +485,8 @@ export async function handleHeadlessCommand(
toolName: string;
toolArgs: string;
};
reason: string;
matchedRule: string;
}
| {
type: "deny";
@@ -523,6 +542,8 @@ export async function handleHeadlessCommand(
decisions.push({
type: "approve",
approval: currentApproval,
reason: permission.reason || "Allowed by permission rule",
matchedRule: permission.matchedRule || "auto-approved",
});
}
@@ -535,16 +556,19 @@ export async function handleHeadlessCommand(
if (outputFormat === "stream-json") {
for (const decision of decisions) {
if (decision.type === "approve") {
console.log(
JSON.stringify({
type: "auto_approval",
tool_name: decision.approval.toolName,
const autoApprovalMsg: AutoApprovalMessage = {
type: "auto_approval",
tool_call: {
name: decision.approval.toolName,
tool_call_id: decision.approval.toolCallId,
tool_args: decision.approval.toolArgs,
session_id: sessionId,
uuid: `auto-approval-${decision.approval.toolCallId}`,
}),
);
arguments: decision.approval.toolArgs,
},
reason: decision.reason,
matched_rule: decision.matchedRule,
session_id: sessionId,
uuid: `auto-approval-${decision.approval.toolCallId}`,
};
console.log(JSON.stringify(autoApprovalMsg));
}
}
}
@@ -640,28 +664,54 @@ export async function handleHeadlessCommand(
runIds.add(chunk.run_id);
}
// Detect mid-stream errors (errors without message_type)
// Detect mid-stream errors
// Case 1: LettaErrorMessage from the API (has message_type: "error_message")
if (
"message_type" in chunk &&
chunk.message_type === "error_message"
) {
// This is a LettaErrorMessage - nest it in our wire format
const apiError = chunk as LettaStreamingResponse.LettaErrorMessage;
const errorEvent: ErrorMessage = {
type: "error",
message: apiError.message,
stop_reason: "error",
run_id: apiError.run_id,
api_error: apiError,
session_id: sessionId,
uuid: crypto.randomUUID(),
};
console.log(JSON.stringify(errorEvent));
// Still accumulate for tracking
const { onChunk: accumulatorOnChunk } = await import(
"./cli/helpers/accumulator"
);
accumulatorOnChunk(buffers, chunk);
continue;
}
// Case 2: Generic error object without message_type
const chunkWithError = chunk as typeof chunk & {
error?: { message?: string; detail?: string };
};
if (chunkWithError.error && !chunk.message_type) {
if (chunkWithError.error && !("message_type" in chunk)) {
// Emit as error event
const errorMsg =
const errorText =
chunkWithError.error.message || "An error occurred";
const errorDetail = chunkWithError.error.detail || "";
const fullErrorText = errorDetail
? `${errorMsg}: ${errorDetail}`
: errorMsg;
? `${errorText}: ${errorDetail}`
: errorText;
console.log(
JSON.stringify({
type: "error",
message: fullErrorText,
detail: errorDetail,
session_id: sessionId,
uuid: crypto.randomUUID(),
}),
);
const errorEvent: ErrorMessage = {
type: "error",
message: fullErrorText,
stop_reason: "error",
session_id: sessionId,
uuid: crypto.randomUUID(),
};
console.log(JSON.stringify(errorEvent));
// Still accumulate for tracking
const { onChunk: accumulatorOnChunk } = await import(
@@ -786,18 +836,19 @@ export async function handleHeadlessCommand(
);
if (missing.length === 0) {
shouldOutputChunk = false;
console.log(
JSON.stringify({
type: "auto_approval",
tool_name: nextName,
const autoApprovalMsg: AutoApprovalMessage = {
type: "auto_approval",
tool_call: {
name: nextName,
tool_call_id: id,
tool_args: incomingArgs,
reason: permission.reason,
matched_rule: permission.matchedRule,
session_id: sessionId,
uuid: `auto-approval-${id}`,
}),
);
arguments: incomingArgs || "{}",
},
reason: permission.reason || "Allowed by permission rule",
matched_rule: permission.matchedRule || "auto-approved",
session_id: sessionId,
uuid: `auto-approval-${id}`,
};
console.log(JSON.stringify(autoApprovalMsg));
autoApprovalEmitted.add(id);
}
}
@@ -816,24 +867,22 @@ export async function handleHeadlessCommand(
if (includePartialMessages) {
// Emit as stream_event wrapper (like Claude Code with --include-partial-messages)
console.log(
JSON.stringify({
type: "stream_event",
event: chunk,
session_id: sessionId,
uuid,
}),
);
const streamEvent: StreamEvent = {
type: "stream_event",
event: chunk,
session_id: sessionId,
uuid: uuid || crypto.randomUUID(),
};
console.log(JSON.stringify(streamEvent));
} else {
// Emit as regular message (default)
console.log(
JSON.stringify({
type: "message",
...chunk,
session_id: sessionId,
uuid,
}),
);
const msg: MessageWire = {
type: "message",
...chunk,
session_id: sessionId,
uuid: uuid || crypto.randomUUID(),
};
console.log(JSON.stringify(msg));
}
}
@@ -991,18 +1040,17 @@ export async function handleHeadlessCommand(
llmApiErrorRetries = attempt;
if (outputFormat === "stream-json") {
console.log(
JSON.stringify({
type: "retry",
reason: "llm_api_error",
attempt,
max_attempts: LLM_API_ERROR_MAX_RETRIES,
delay_ms: delayMs,
run_id: lastRunId,
session_id: sessionId,
uuid: `retry-${lastRunId || crypto.randomUUID()}`,
}),
);
const retryMsg: RetryMessage = {
type: "retry",
reason: "llm_api_error",
attempt,
max_attempts: LLM_API_ERROR_MAX_RETRIES,
delay_ms: delayMs,
run_id: lastRunId ?? undefined,
session_id: sessionId,
uuid: `retry-${lastRunId || crypto.randomUUID()}`,
};
console.log(JSON.stringify(retryMsg));
} else {
const delaySeconds = Math.round(delayMs / 1000);
console.error(
@@ -1065,18 +1113,17 @@ export async function handleHeadlessCommand(
llmApiErrorRetries = attempt;
if (outputFormat === "stream-json") {
console.log(
JSON.stringify({
type: "retry",
reason: "llm_api_error",
attempt,
max_attempts: LLM_API_ERROR_MAX_RETRIES,
delay_ms: delayMs,
run_id: lastRunId,
session_id: sessionId,
uuid: `retry-${lastRunId || crypto.randomUUID()}`,
}),
);
const retryMsg: RetryMessage = {
type: "retry",
reason: "llm_api_error",
attempt,
max_attempts: LLM_API_ERROR_MAX_RETRIES,
delay_ms: delayMs,
run_id: lastRunId ?? undefined,
session_id: sessionId,
uuid: `retry-${lastRunId || crypto.randomUUID()}`,
};
console.log(JSON.stringify(retryMsg));
} else {
const delaySeconds = Math.round(delayMs / 1000);
console.error(
@@ -1135,16 +1182,15 @@ export async function handleHeadlessCommand(
if (outputFormat === "stream-json") {
// Emit error event
console.log(
JSON.stringify({
type: "error",
message: errorMessage,
stop_reason: stopReason,
run_id: lastRunId,
session_id: sessionId,
uuid: `error-${lastRunId || crypto.randomUUID()}`,
}),
);
const errorMsg: ErrorMessage = {
type: "error",
message: errorMessage,
stop_reason: stopReason,
run_id: lastRunId ?? undefined,
session_id: sessionId,
uuid: `error-${lastRunId || crypto.randomUUID()}`,
};
console.log(JSON.stringify(errorMsg));
} else {
console.error(`Error: ${errorMessage}`);
}
@@ -1158,15 +1204,15 @@ export async function handleHeadlessCommand(
const errorDetails = formatErrorDetails(error, agent.id);
if (outputFormat === "stream-json") {
console.log(
JSON.stringify({
type: "error",
message: errorDetails,
run_id: lastKnownRunId,
session_id: sessionId,
uuid: `error-${lastKnownRunId || crypto.randomUUID()}`,
}),
);
const errorMsg: ErrorMessage = {
type: "error",
message: errorDetails,
stop_reason: "error",
run_id: lastKnownRunId ?? undefined,
session_id: sessionId,
uuid: `error-${lastKnownRunId || crypto.randomUUID()}`,
};
console.log(JSON.stringify(errorMsg));
} else {
console.error(`Error: ${errorDetails}`);
}
@@ -1250,10 +1296,9 @@ export async function handleHeadlessCommand(
allRunIds.size > 0
? `result-${Array.from(allRunIds).pop()}`
: `result-${agent.id}`;
const resultEvent = {
const resultEvent: ResultMessage = {
type: "result",
subtype: "success",
is_error: false,
session_id: sessionId,
duration_ms: Math.round(stats.totalWallMs),
duration_api_ms: Math.round(stats.totalApiMs),
@@ -1330,14 +1375,14 @@ async function runBidirectionalMode(
try {
message = JSON.parse(line);
} catch {
console.log(
JSON.stringify({
type: "error",
message: "Invalid JSON input",
session_id: sessionId,
uuid: crypto.randomUUID(),
}),
);
const errorMsg: ErrorMessage = {
type: "error",
message: "Invalid JSON input",
stop_reason: "error",
session_id: sessionId,
uuid: crypto.randomUUID(),
};
console.log(JSON.stringify(errorMsg));
continue;
}
@@ -1348,52 +1393,49 @@ async function runBidirectionalMode(
if (subtype === "initialize") {
// Return session info
console.log(
JSON.stringify({
type: "control_response",
const initResponse: ControlResponse = {
type: "control_response",
response: {
subtype: "success",
request_id: requestId ?? "",
response: {
subtype: "success",
request_id: requestId,
response: {
agent_id: agent.id,
model: agent.llm_config?.model,
tools: agent.tools?.map((t) => t.name) || [],
},
agent_id: agent.id,
model: agent.llm_config?.model,
tools: agent.tools?.map((t) => t.name) || [],
},
session_id: sessionId,
uuid: crypto.randomUUID(),
}),
);
},
session_id: sessionId,
uuid: crypto.randomUUID(),
};
console.log(JSON.stringify(initResponse));
} else if (subtype === "interrupt") {
// Abort current operation if any
if (currentAbortController !== null) {
(currentAbortController as AbortController).abort();
currentAbortController = null;
}
console.log(
JSON.stringify({
type: "control_response",
response: {
subtype: "success",
request_id: requestId,
},
session_id: sessionId,
uuid: crypto.randomUUID(),
}),
);
const interruptResponse: ControlResponse = {
type: "control_response",
response: {
subtype: "success",
request_id: requestId ?? "",
},
session_id: sessionId,
uuid: crypto.randomUUID(),
};
console.log(JSON.stringify(interruptResponse));
} else {
console.log(
JSON.stringify({
type: "control_response",
response: {
subtype: "error",
request_id: requestId,
message: `Unknown control request subtype: ${subtype}`,
},
session_id: sessionId,
uuid: crypto.randomUUID(),
}),
);
const errorResponse: ControlResponse = {
type: "control_response",
response: {
subtype: "error",
request_id: requestId ?? "",
error: `Unknown control request subtype: ${subtype}`,
},
session_id: sessionId,
uuid: crypto.randomUUID(),
};
console.log(JSON.stringify(errorResponse));
}
continue;
}
@@ -1429,23 +1471,21 @@ async function runBidirectionalMode(
const uuid = chunkWithIds.otid || chunkWithIds.id;
if (includePartialMessages) {
console.log(
JSON.stringify({
type: "stream_event",
event: chunk,
session_id: sessionId,
uuid,
}),
);
const streamEvent: StreamEvent = {
type: "stream_event",
event: chunk,
session_id: sessionId,
uuid: uuid || crypto.randomUUID(),
};
console.log(JSON.stringify(streamEvent));
} else {
console.log(
JSON.stringify({
type: "message",
...chunk,
session_id: sessionId,
uuid,
}),
);
const msg: MessageWire = {
type: "message",
...chunk,
session_id: sessionId,
uuid: uuid || crypto.randomUUID(),
};
console.log(JSON.stringify(msg));
}
// Accumulate for result
@@ -1466,30 +1506,32 @@ async function runBidirectionalMode(
) as Extract<Line, { kind: "assistant" }> | undefined;
const resultText = lastAssistant?.text || "";
console.log(
JSON.stringify({
type: "result",
subtype: currentAbortController?.signal.aborted
? "interrupted"
: "success",
is_error: false,
session_id: sessionId,
duration_ms: Math.round(durationMs),
result: resultText,
agent_id: agent.id,
uuid: `result-${agent.id}-${Date.now()}`,
}),
);
const resultMsg: ResultMessage = {
type: "result",
subtype: currentAbortController?.signal.aborted
? "interrupted"
: "success",
session_id: sessionId,
duration_ms: Math.round(durationMs),
duration_api_ms: 0, // Not tracked in bidirectional mode
num_turns: 1,
result: resultText,
agent_id: agent.id,
run_ids: [],
usage: null,
uuid: `result-${agent.id}-${Date.now()}`,
};
console.log(JSON.stringify(resultMsg));
} catch (error) {
console.log(
JSON.stringify({
type: "error",
message:
error instanceof Error ? error.message : "Unknown error occurred",
session_id: sessionId,
uuid: crypto.randomUUID(),
}),
);
const errorMsg: ErrorMessage = {
type: "error",
message:
error instanceof Error ? error.message : "Unknown error occurred",
stop_reason: "error",
session_id: sessionId,
uuid: crypto.randomUUID(),
};
console.log(JSON.stringify(errorMsg));
} finally {
currentAbortController = null;
}
@@ -1497,14 +1539,14 @@ async function runBidirectionalMode(
}
// Unknown message type
console.log(
JSON.stringify({
type: "error",
message: `Unknown message type: ${message.type}`,
session_id: sessionId,
uuid: crypto.randomUUID(),
}),
);
const errorMsg: ErrorMessage = {
type: "error",
message: `Unknown message type: ${message.type}`,
stop_reason: "error",
session_id: sessionId,
uuid: crypto.randomUUID(),
};
console.log(JSON.stringify(errorMsg));
}
// Stdin closed, exit gracefully