diff --git a/src/headless.ts b/src/headless.ts index d5c6711..f16cec3 100644 --- a/src/headless.ts +++ b/src/headless.ts @@ -4,6 +4,7 @@ import type { MessageCreate, } from "@letta-ai/letta-client/resources/agents/agents"; import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/messages"; +import type { StopReasonType } from "@letta-ai/letta-client/resources/runs/runs"; import { getClient } from "./agent/client"; import { createAgent } from "./agent/create"; import { sendMessageStream } from "./agent/message"; @@ -135,7 +136,93 @@ export async function handleHeadlessCommand(argv: string[]) { console.log(JSON.stringify(initEvent)); } - // Send message and process stream loop + // Helper to resolve any pending approvals before sending user input + const resolveAllPendingApprovals = async () => { + const { getResumeData } = await import("./agent/check-approval"); + while (true) { + const resume = await getResumeData(client, agent.id); + if (!resume.pendingApproval) break; + const { toolCallId, toolName, toolArgs } = resume.pendingApproval; + const parsedArgs = safeJsonParseOr>( + toolArgs || "{}", + {}, + ); + const permission = await checkToolPermission(toolName, parsedArgs); + let approvalInput: ApprovalCreate; + if (permission.decision === "deny" || permission.decision === "ask") { + const denyReason = + permission.decision === "ask" + ? "Tool requires approval (headless mode)" + : `Permission denied: ${permission.matchedRule || permission.reason}`; + approvalInput = { + type: "approval", + approval_request_id: toolCallId, + approve: false, + reason: denyReason, + }; + } else { + // Verify required args present; if missing, deny so the model retries with args + const { getToolSchema } = await import("./tools/manager"); + const schema = getToolSchema(toolName); + const required = + (schema?.input_schema?.required as string[] | undefined) || []; + const missing = required.filter( + (key) => + !(key in parsedArgs) || String(parsedArgs[key] ?? "").length === 0, + ); + if (missing.length > 0) { + approvalInput = { + type: "approval", + approval_request_id: toolCallId, + approve: false, + reason: `Missing required parameter${missing.length > 1 ? "s" : ""}: ${missing.join(", ")}`, + }; + } else { + const toolResult = await executeTool(toolName, parsedArgs); + // Emit auto_approval event for stream-json for visibility + if (outputFormat === "stream-json") { + console.log( + JSON.stringify({ + type: "auto_approval", + tool_name: toolName, + tool_call_id: toolCallId, + reason: permission.reason, + matched_rule: permission.matchedRule, + }), + ); + } + approvalInput = { + type: "approval", + approvals: [ + { + type: "tool", + tool_call_id: toolCallId, + tool_return: toolResult.toolReturn, + status: toolResult.status, + stdout: toolResult.stdout, + stderr: toolResult.stderr, + }, + ], + }; + } + } + // Send the approval to clear the pending state; drain the stream without output + const approvalStream = await sendMessageStream(agent.id, [approvalInput]); + if (outputFormat === "stream-json") { + // Consume quickly but don't emit message frames to stdout + for await (const _ of approvalStream) { + // no-op + } + } else { + await drainStream(approvalStream, createBuffers(), () => {}); + } + } + }; + + // Clear any pending approvals before starting a new turn + await resolveAllPendingApprovals(); + + // Start with the user message let currentInput: Array = [ { role: "user", @@ -148,7 +235,7 @@ export async function handleHeadlessCommand(argv: string[]) { const stream = await sendMessageStream(agent.id, currentInput); // For stream-json, output each chunk as it arrives - let stopReason: Letta.StopReasonType; + let stopReason: StopReasonType; let approval: { toolCallId: string; toolName: string; @@ -158,47 +245,152 @@ export async function handleHeadlessCommand(argv: string[]) { if (outputFormat === "stream-json") { const startTime = performance.now(); - let lastStopReason: Letta.StopReasonType | null = null; + let lastStopReason: StopReasonType | null = null; + + // Track approval requests across streamed chunks + const approvalRequests = new Map< + string, + { toolName: string; args: string } + >(); + const autoApprovalEmitted = new Set(); + let lastApprovalId: string | null = null; for await (const chunk of stream) { - // Output chunk as message event - console.log( - JSON.stringify({ - type: "message", - ...chunk, - }), - ); + // Detect server conflict due to pending approval; handle it and retry + const errObj = (chunk as unknown as { error?: { detail?: string } }) + .error; + if (errObj?.detail?.includes("Cannot send a new message")) { + // Don't emit this error; clear approvals and retry outer loop + await resolveAllPendingApprovals(); + // Reset state and restart turn + lastStopReason = "error" as StopReasonType; + break; + } + if ( + errObj?.detail?.includes( + "No tool call is currently awaiting approval", + ) + ) { + // Server isn't ready for an approval yet; let the stream continue until it is + // Suppress the error frame from output + continue; + } + // Check if we should skip outputting approval requests in bypass mode + const isApprovalRequest = + chunk.message_type === "approval_request_message"; + let shouldOutputChunk = true; + + // Track approval requests + if (isApprovalRequest) { + const chunkWithTools = chunk as typeof chunk & { + tool_call?: { + tool_call_id?: string; + name?: string; + arguments?: string; + }; + tool_calls?: Array<{ + tool_call_id?: string; + name?: string; + arguments?: string; + }>; + }; + + const toolCalls = Array.isArray(chunkWithTools.tool_calls) + ? chunkWithTools.tool_calls + : chunkWithTools.tool_call + ? [chunkWithTools.tool_call] + : []; + + for (const toolCall of toolCalls) { + if (toolCall?.tool_call_id && toolCall?.name) { + const id = toolCall.tool_call_id; + lastApprovalId = id; + + // Prefer the most complete args we have seen so far; concatenate deltas + const prev = approvalRequests.get(id); + const base = prev && prev.args !== "{}" ? prev.args : ""; + const incomingArgs = + toolCall.arguments && toolCall.arguments.trim().length > 0 + ? `${base}${toolCall.arguments}` + : base || "{}"; + + approvalRequests.set(id, { + toolName: toolCall.name, + args: incomingArgs, + }); + + // Keep an up-to-date approval object for downstream handling + approval = { + toolCallId: id, + toolName: toolCall.name, + toolArgs: incomingArgs, + }; + + // Check if this approval will be auto-approved. Dedup per tool_call_id + if (!autoApprovalEmitted.has(id)) { + const parsedArgs = safeJsonParseOr | null>(incomingArgs || "{}", null); + const permission = await checkToolPermission( + toolCall.name, + parsedArgs || {}, + ); + if (permission.decision === "allow" && parsedArgs) { + // Only emit auto_approval if we already have all required params + const { getToolSchema } = await import("./tools/manager"); + const schema = getToolSchema(toolCall.name); + const required = + (schema?.input_schema?.required as + | string[] + | undefined) || []; + const missing = required.filter( + (key) => + !(key in parsedArgs) || + String( + (parsedArgs as Record)[key] ?? "", + ).length === 0, + ); + if (missing.length === 0) { + shouldOutputChunk = false; + console.log( + JSON.stringify({ + type: "auto_approval", + tool_name: toolCall.name, + tool_call_id: id, + reason: permission.reason, + matched_rule: permission.matchedRule, + }), + ); + autoApprovalEmitted.add(id); + } + } + } + } + } + } + + // Output chunk as message event (unless filtered) + if (shouldOutputChunk) { + console.log( + JSON.stringify({ + type: "message", + ...chunk, + }), + ); + } // Still accumulate for approval tracking const { onChunk } = await import("./cli/helpers/accumulator"); onChunk(buffers, chunk); - // Track stop reason and approval - if (chunk.messageType === "stop_reason") { - lastStopReason = chunk.stopReason; - } - - // Track approval requests - if (chunk.messageType === "approval_request_message") { - const chunkWithToolCall = chunk as typeof chunk & { - toolCall?: { - toolCallId?: string; - name?: string; - arguments?: string; - }; - }; - const toolCall = chunkWithToolCall.toolCall; - if (toolCall?.toolCallId && toolCall?.name) { - approval = { - toolCallId: toolCall.toolCallId, - toolName: toolCall.name, - toolArgs: toolCall.arguments || "{}", - }; - } + // Track stop reason + if (chunk.message_type === "stop_reason") { + lastStopReason = chunk.stop_reason; } } - stopReason = lastStopReason || Letta.StopReasonType.Error; + stopReason = lastStopReason || "error"; apiDurationMs = performance.now() - startTime; // Mark final line as finished @@ -269,7 +461,29 @@ export async function handleHeadlessCommand(argv: string[]) { continue; } - // Permission is "allow" - auto-execute tool and continue loop + // Permission is "allow" - verify we have required arguments before executing + const { getToolSchema } = await import("./tools/manager"); + const schema = getToolSchema(toolName); + const required = + (schema?.input_schema?.required as string[] | undefined) || []; + const missing = required.filter( + (key) => + !(key in parsedArgs) || String(parsedArgs[key] ?? "").length === 0, + ); + if (missing.length > 0) { + // Auto-deny with a clear reason so the model can retry with arguments + currentInput = [ + { + type: "approval", + approval_request_id: toolCallId, + approve: false, + reason: `Missing required parameter${missing.length > 1 ? "s" : ""}: ${missing.join(", ")}`, + }, + ]; + continue; + } + + // Execute tool and continue loop const toolResult = await executeTool(toolName, parsedArgs); currentInput = [