fix: enable--yolo mode with output stream-json (#38)
This commit is contained in:
282
src/headless.ts
282
src/headless.ts
@@ -4,6 +4,7 @@ import type {
|
||||
MessageCreate,
|
||||
} from "@letta-ai/letta-client/resources/agents/agents";
|
||||
import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/messages";
|
||||
import type { StopReasonType } from "@letta-ai/letta-client/resources/runs/runs";
|
||||
import { getClient } from "./agent/client";
|
||||
import { createAgent } from "./agent/create";
|
||||
import { sendMessageStream } from "./agent/message";
|
||||
@@ -135,7 +136,93 @@ export async function handleHeadlessCommand(argv: string[]) {
|
||||
console.log(JSON.stringify(initEvent));
|
||||
}
|
||||
|
||||
// Send message and process stream loop
|
||||
// Helper to resolve any pending approvals before sending user input
|
||||
const resolveAllPendingApprovals = async () => {
|
||||
const { getResumeData } = await import("./agent/check-approval");
|
||||
while (true) {
|
||||
const resume = await getResumeData(client, agent.id);
|
||||
if (!resume.pendingApproval) break;
|
||||
const { toolCallId, toolName, toolArgs } = resume.pendingApproval;
|
||||
const parsedArgs = safeJsonParseOr<Record<string, unknown>>(
|
||||
toolArgs || "{}",
|
||||
{},
|
||||
);
|
||||
const permission = await checkToolPermission(toolName, parsedArgs);
|
||||
let approvalInput: ApprovalCreate;
|
||||
if (permission.decision === "deny" || permission.decision === "ask") {
|
||||
const denyReason =
|
||||
permission.decision === "ask"
|
||||
? "Tool requires approval (headless mode)"
|
||||
: `Permission denied: ${permission.matchedRule || permission.reason}`;
|
||||
approvalInput = {
|
||||
type: "approval",
|
||||
approval_request_id: toolCallId,
|
||||
approve: false,
|
||||
reason: denyReason,
|
||||
};
|
||||
} else {
|
||||
// Verify required args present; if missing, deny so the model retries with args
|
||||
const { getToolSchema } = await import("./tools/manager");
|
||||
const schema = getToolSchema(toolName);
|
||||
const required =
|
||||
(schema?.input_schema?.required as string[] | undefined) || [];
|
||||
const missing = required.filter(
|
||||
(key) =>
|
||||
!(key in parsedArgs) || String(parsedArgs[key] ?? "").length === 0,
|
||||
);
|
||||
if (missing.length > 0) {
|
||||
approvalInput = {
|
||||
type: "approval",
|
||||
approval_request_id: toolCallId,
|
||||
approve: false,
|
||||
reason: `Missing required parameter${missing.length > 1 ? "s" : ""}: ${missing.join(", ")}`,
|
||||
};
|
||||
} else {
|
||||
const toolResult = await executeTool(toolName, parsedArgs);
|
||||
// Emit auto_approval event for stream-json for visibility
|
||||
if (outputFormat === "stream-json") {
|
||||
console.log(
|
||||
JSON.stringify({
|
||||
type: "auto_approval",
|
||||
tool_name: toolName,
|
||||
tool_call_id: toolCallId,
|
||||
reason: permission.reason,
|
||||
matched_rule: permission.matchedRule,
|
||||
}),
|
||||
);
|
||||
}
|
||||
approvalInput = {
|
||||
type: "approval",
|
||||
approvals: [
|
||||
{
|
||||
type: "tool",
|
||||
tool_call_id: toolCallId,
|
||||
tool_return: toolResult.toolReturn,
|
||||
status: toolResult.status,
|
||||
stdout: toolResult.stdout,
|
||||
stderr: toolResult.stderr,
|
||||
},
|
||||
],
|
||||
};
|
||||
}
|
||||
}
|
||||
// Send the approval to clear the pending state; drain the stream without output
|
||||
const approvalStream = await sendMessageStream(agent.id, [approvalInput]);
|
||||
if (outputFormat === "stream-json") {
|
||||
// Consume quickly but don't emit message frames to stdout
|
||||
for await (const _ of approvalStream) {
|
||||
// no-op
|
||||
}
|
||||
} else {
|
||||
await drainStream(approvalStream, createBuffers(), () => {});
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Clear any pending approvals before starting a new turn
|
||||
await resolveAllPendingApprovals();
|
||||
|
||||
// Start with the user message
|
||||
let currentInput: Array<MessageCreate | ApprovalCreate> = [
|
||||
{
|
||||
role: "user",
|
||||
@@ -148,7 +235,7 @@ export async function handleHeadlessCommand(argv: string[]) {
|
||||
const stream = await sendMessageStream(agent.id, currentInput);
|
||||
|
||||
// For stream-json, output each chunk as it arrives
|
||||
let stopReason: Letta.StopReasonType;
|
||||
let stopReason: StopReasonType;
|
||||
let approval: {
|
||||
toolCallId: string;
|
||||
toolName: string;
|
||||
@@ -158,47 +245,152 @@ export async function handleHeadlessCommand(argv: string[]) {
|
||||
|
||||
if (outputFormat === "stream-json") {
|
||||
const startTime = performance.now();
|
||||
let lastStopReason: Letta.StopReasonType | null = null;
|
||||
let lastStopReason: StopReasonType | null = null;
|
||||
|
||||
// Track approval requests across streamed chunks
|
||||
const approvalRequests = new Map<
|
||||
string,
|
||||
{ toolName: string; args: string }
|
||||
>();
|
||||
const autoApprovalEmitted = new Set<string>();
|
||||
let lastApprovalId: string | null = null;
|
||||
|
||||
for await (const chunk of stream) {
|
||||
// Output chunk as message event
|
||||
console.log(
|
||||
JSON.stringify({
|
||||
type: "message",
|
||||
...chunk,
|
||||
}),
|
||||
);
|
||||
// Detect server conflict due to pending approval; handle it and retry
|
||||
const errObj = (chunk as unknown as { error?: { detail?: string } })
|
||||
.error;
|
||||
if (errObj?.detail?.includes("Cannot send a new message")) {
|
||||
// Don't emit this error; clear approvals and retry outer loop
|
||||
await resolveAllPendingApprovals();
|
||||
// Reset state and restart turn
|
||||
lastStopReason = "error" as StopReasonType;
|
||||
break;
|
||||
}
|
||||
if (
|
||||
errObj?.detail?.includes(
|
||||
"No tool call is currently awaiting approval",
|
||||
)
|
||||
) {
|
||||
// Server isn't ready for an approval yet; let the stream continue until it is
|
||||
// Suppress the error frame from output
|
||||
continue;
|
||||
}
|
||||
// Check if we should skip outputting approval requests in bypass mode
|
||||
const isApprovalRequest =
|
||||
chunk.message_type === "approval_request_message";
|
||||
let shouldOutputChunk = true;
|
||||
|
||||
// Track approval requests
|
||||
if (isApprovalRequest) {
|
||||
const chunkWithTools = chunk as typeof chunk & {
|
||||
tool_call?: {
|
||||
tool_call_id?: string;
|
||||
name?: string;
|
||||
arguments?: string;
|
||||
};
|
||||
tool_calls?: Array<{
|
||||
tool_call_id?: string;
|
||||
name?: string;
|
||||
arguments?: string;
|
||||
}>;
|
||||
};
|
||||
|
||||
const toolCalls = Array.isArray(chunkWithTools.tool_calls)
|
||||
? chunkWithTools.tool_calls
|
||||
: chunkWithTools.tool_call
|
||||
? [chunkWithTools.tool_call]
|
||||
: [];
|
||||
|
||||
for (const toolCall of toolCalls) {
|
||||
if (toolCall?.tool_call_id && toolCall?.name) {
|
||||
const id = toolCall.tool_call_id;
|
||||
lastApprovalId = id;
|
||||
|
||||
// Prefer the most complete args we have seen so far; concatenate deltas
|
||||
const prev = approvalRequests.get(id);
|
||||
const base = prev && prev.args !== "{}" ? prev.args : "";
|
||||
const incomingArgs =
|
||||
toolCall.arguments && toolCall.arguments.trim().length > 0
|
||||
? `${base}${toolCall.arguments}`
|
||||
: base || "{}";
|
||||
|
||||
approvalRequests.set(id, {
|
||||
toolName: toolCall.name,
|
||||
args: incomingArgs,
|
||||
});
|
||||
|
||||
// Keep an up-to-date approval object for downstream handling
|
||||
approval = {
|
||||
toolCallId: id,
|
||||
toolName: toolCall.name,
|
||||
toolArgs: incomingArgs,
|
||||
};
|
||||
|
||||
// Check if this approval will be auto-approved. Dedup per tool_call_id
|
||||
if (!autoApprovalEmitted.has(id)) {
|
||||
const parsedArgs = safeJsonParseOr<Record<
|
||||
string,
|
||||
unknown
|
||||
> | null>(incomingArgs || "{}", null);
|
||||
const permission = await checkToolPermission(
|
||||
toolCall.name,
|
||||
parsedArgs || {},
|
||||
);
|
||||
if (permission.decision === "allow" && parsedArgs) {
|
||||
// Only emit auto_approval if we already have all required params
|
||||
const { getToolSchema } = await import("./tools/manager");
|
||||
const schema = getToolSchema(toolCall.name);
|
||||
const required =
|
||||
(schema?.input_schema?.required as
|
||||
| string[]
|
||||
| undefined) || [];
|
||||
const missing = required.filter(
|
||||
(key) =>
|
||||
!(key in parsedArgs) ||
|
||||
String(
|
||||
(parsedArgs as Record<string, unknown>)[key] ?? "",
|
||||
).length === 0,
|
||||
);
|
||||
if (missing.length === 0) {
|
||||
shouldOutputChunk = false;
|
||||
console.log(
|
||||
JSON.stringify({
|
||||
type: "auto_approval",
|
||||
tool_name: toolCall.name,
|
||||
tool_call_id: id,
|
||||
reason: permission.reason,
|
||||
matched_rule: permission.matchedRule,
|
||||
}),
|
||||
);
|
||||
autoApprovalEmitted.add(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Output chunk as message event (unless filtered)
|
||||
if (shouldOutputChunk) {
|
||||
console.log(
|
||||
JSON.stringify({
|
||||
type: "message",
|
||||
...chunk,
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
// Still accumulate for approval tracking
|
||||
const { onChunk } = await import("./cli/helpers/accumulator");
|
||||
onChunk(buffers, chunk);
|
||||
|
||||
// Track stop reason and approval
|
||||
if (chunk.messageType === "stop_reason") {
|
||||
lastStopReason = chunk.stopReason;
|
||||
}
|
||||
|
||||
// Track approval requests
|
||||
if (chunk.messageType === "approval_request_message") {
|
||||
const chunkWithToolCall = chunk as typeof chunk & {
|
||||
toolCall?: {
|
||||
toolCallId?: string;
|
||||
name?: string;
|
||||
arguments?: string;
|
||||
};
|
||||
};
|
||||
const toolCall = chunkWithToolCall.toolCall;
|
||||
if (toolCall?.toolCallId && toolCall?.name) {
|
||||
approval = {
|
||||
toolCallId: toolCall.toolCallId,
|
||||
toolName: toolCall.name,
|
||||
toolArgs: toolCall.arguments || "{}",
|
||||
};
|
||||
}
|
||||
// Track stop reason
|
||||
if (chunk.message_type === "stop_reason") {
|
||||
lastStopReason = chunk.stop_reason;
|
||||
}
|
||||
}
|
||||
|
||||
stopReason = lastStopReason || Letta.StopReasonType.Error;
|
||||
stopReason = lastStopReason || "error";
|
||||
apiDurationMs = performance.now() - startTime;
|
||||
|
||||
// Mark final line as finished
|
||||
@@ -269,7 +461,29 @@ export async function handleHeadlessCommand(argv: string[]) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Permission is "allow" - auto-execute tool and continue loop
|
||||
// Permission is "allow" - verify we have required arguments before executing
|
||||
const { getToolSchema } = await import("./tools/manager");
|
||||
const schema = getToolSchema(toolName);
|
||||
const required =
|
||||
(schema?.input_schema?.required as string[] | undefined) || [];
|
||||
const missing = required.filter(
|
||||
(key) =>
|
||||
!(key in parsedArgs) || String(parsedArgs[key] ?? "").length === 0,
|
||||
);
|
||||
if (missing.length > 0) {
|
||||
// Auto-deny with a clear reason so the model can retry with arguments
|
||||
currentInput = [
|
||||
{
|
||||
type: "approval",
|
||||
approval_request_id: toolCallId,
|
||||
approve: false,
|
||||
reason: `Missing required parameter${missing.length > 1 ? "s" : ""}: ${missing.join(", ")}`,
|
||||
},
|
||||
];
|
||||
continue;
|
||||
}
|
||||
|
||||
// Execute tool and continue loop
|
||||
const toolResult = await executeTool(toolName, parsedArgs);
|
||||
|
||||
currentInput = [
|
||||
|
||||
Reference in New Issue
Block a user