diff --git a/src/cli/App.tsx b/src/cli/App.tsx index fc7f070..aadc065 100644 --- a/src/cli/App.tsx +++ b/src/cli/App.tsx @@ -159,6 +159,7 @@ import { setToolCallsRunning, toLines, } from "./helpers/accumulator"; +import { classifyApprovals } from "./helpers/approvalClassification"; import { backfillBuffers } from "./helpers/backfill"; import { type AdvancedDiffSuccess, @@ -3478,65 +3479,13 @@ export default function App({ } // Check permissions for all approvals (including fancy UI tools) - const approvalResults = await Promise.all( - approvalsToProcess.map(async (approvalItem) => { - // Check if approval is incomplete (missing name) - // Note: toolArgs can be empty string for tools with no arguments (e.g., EnterPlanMode) - if (!approvalItem.toolName) { - return { - approval: approvalItem, - permission: { - decision: "deny" as const, - reason: - "Tool call incomplete - missing name or arguments", - }, - context: null, - }; - } - - const parsedArgs = safeJsonParseOr>( - approvalItem.toolArgs, - {}, - ); - const permission = await checkToolPermission( - approvalItem.toolName, - parsedArgs, - ); - const context = await analyzeToolApproval( - approvalItem.toolName, - parsedArgs, - ); - return { approval: approvalItem, permission, context }; - }), - ); - - // Categorize approvals by permission decision - // Fancy UI tools should always go through their dialog, even if auto-allowed - const needsUserInput: typeof approvalResults = []; - const autoDenied: typeof approvalResults = []; - const autoAllowed: typeof approvalResults = []; - - for (const ac of approvalResults) { - const { approval, permission } = ac; - let decision = permission.decision; - - // Some tools always need user input regardless of yolo mode - if ( - alwaysRequiresUserInput(approval.toolName) && - decision === "allow" - ) { - decision = "ask"; - } - - if (decision === "ask") { - needsUserInput.push(ac); - } else if (decision === "deny") { - autoDenied.push(ac); - } else { - // decision === "allow" - autoAllowed.push(ac); - } - } + const { needsUserInput, autoAllowed, autoDenied } = + await classifyApprovals(approvalsToProcess, { + getContext: analyzeToolApproval, + alwaysRequiresUserInput, + missingNameReason: + "Tool call incomplete - missing name or arguments", + }); // Precompute diffs for file edit tools before execution (both auto-allowed and needs-user-input) // This is needed for inline approval UI to show diffs, and for post-approval rendering @@ -5054,58 +5003,12 @@ export default function App({ } // There are pending approvals - check permissions (respects yolo mode) - const approvalResults = await Promise.all( - existingApprovals.map(async (approvalItem) => { - if (!approvalItem.toolName) { - return { - approval: approvalItem, - permission: { - decision: "deny" as const, - reason: "Tool call incomplete - missing name", - }, - context: null, - }; - } - const parsedArgs = safeJsonParseOr>( - approvalItem.toolArgs, - {}, - ); - const permission = await checkToolPermission( - approvalItem.toolName, - parsedArgs, - ); - const context = await analyzeToolApproval( - approvalItem.toolName, - parsedArgs, - ); - return { approval: approvalItem, permission, context }; - }), - ); - - // Categorize by permission decision - const needsUserInput: typeof approvalResults = []; - const autoAllowed: typeof approvalResults = []; - const autoDenied: typeof approvalResults = []; - - for (const ac of approvalResults) { - const { approval, permission } = ac; - let decision = permission.decision; - - if ( - alwaysRequiresUserInput(approval.toolName) && - decision === "allow" - ) { - decision = "ask"; - } - - if (decision === "ask") { - needsUserInput.push(ac); - } else if (decision === "deny") { - autoDenied.push(ac); - } else { - autoAllowed.push(ac); - } - } + const { needsUserInput, autoAllowed, autoDenied } = + await classifyApprovals(existingApprovals, { + getContext: analyzeToolApproval, + alwaysRequiresUserInput, + missingNameReason: "Tool call incomplete - missing name", + }); // If any approvals need user input, show dialog if (needsUserInput.length > 0) { @@ -7680,33 +7583,12 @@ ${SYSTEM_REMINDER_CLOSE} if (existingApprovals && existingApprovals.length > 0) { // There are pending approvals - check permissions first (respects yolo mode) - const approvalResults = await Promise.all( - existingApprovals.map(async (approvalItem) => { - if (!approvalItem.toolName) { - return { - approval: approvalItem, - permission: { - decision: "deny" as const, - reason: "Tool call incomplete - missing name", - }, - context: null, - }; - } - const parsedArgs = safeJsonParseOr>( - approvalItem.toolArgs, - {}, - ); - const permission = await checkToolPermission( - approvalItem.toolName, - parsedArgs, - ); - const context = await analyzeToolApproval( - approvalItem.toolName, - parsedArgs, - ); - return { approval: approvalItem, permission, context }; - }), - ); + const { needsUserInput, autoAllowed, autoDenied } = + await classifyApprovals(existingApprovals, { + getContext: analyzeToolApproval, + alwaysRequiresUserInput, + missingNameReason: "Tool call incomplete - missing name", + }); // Check if user cancelled during permission check if ( @@ -7723,32 +7605,6 @@ ${SYSTEM_REMINDER_CLOSE} return { submitted: false }; } - // Categorize by permission decision - const needsUserInput: typeof approvalResults = []; - const autoAllowed: typeof approvalResults = []; - const autoDenied: typeof approvalResults = []; - - for (const ac of approvalResults) { - const { approval, permission } = ac; - let decision = permission.decision; - - // Some tools always need user input regardless of yolo mode - if ( - alwaysRequiresUserInput(approval.toolName) && - decision === "allow" - ) { - decision = "ask"; - } - - if (decision === "ask") { - needsUserInput.push(ac); - } else if (decision === "deny") { - autoDenied.push(ac); - } else { - autoAllowed.push(ac); - } - } - // If all approvals can be auto-handled (yolo mode), process them immediately if (needsUserInput.length === 0) { // Precompute diffs for file edit tools before execution (both auto-allowed and needs-user-input) diff --git a/src/cli/helpers/approvalClassification.ts b/src/cli/helpers/approvalClassification.ts new file mode 100644 index 0000000..0404e75 --- /dev/null +++ b/src/cli/helpers/approvalClassification.ts @@ -0,0 +1,136 @@ +import type { ApprovalContext } from "../../permissions/analyzer"; +import { checkToolPermission, getToolSchema } from "../../tools/manager"; +import { safeJsonParseOr } from "./safeJsonParse"; +import type { ApprovalRequest } from "./streamProcessor"; + +type ToolPermission = Awaited>; + +export type ClassifiedApproval = { + approval: ApprovalRequest; + permission: ToolPermission; + context: TContext | null; + parsedArgs: Record; + missingRequiredArgs?: string[]; + denyReason?: string; +}; + +export type ApprovalClassification = { + needsUserInput: ClassifiedApproval[]; + autoAllowed: ClassifiedApproval[]; + autoDenied: ClassifiedApproval[]; +}; + +export type ClassifyApprovalsOptions = { + getContext?: ( + toolName: string, + parsedArgs: Record, + ) => Promise; + alwaysRequiresUserInput?: (toolName: string) => boolean; + treatAskAsDeny?: boolean; + denyReasonForAsk?: string; + missingNameReason?: string; + requireArgsForAutoApprove?: boolean; + missingArgsReason?: (missing: string[]) => string; +}; + +export async function getMissingRequiredArgs( + toolName: string, + parsedArgs: Record, +): Promise { + const schema = getToolSchema(toolName); + const required = + (schema?.input_schema?.required as string[] | undefined) || []; + return required.filter( + (key) => !(key in parsedArgs) || parsedArgs[key] == null, + ); +} + +export async function classifyApprovals( + approvals: ApprovalRequest[], + opts: ClassifyApprovalsOptions = {}, +): Promise> { + const needsUserInput: ClassifiedApproval[] = []; + const autoAllowed: ClassifiedApproval[] = []; + const autoDenied: ClassifiedApproval[] = []; + const denyReasonForAsk = + opts.denyReasonForAsk ?? "Tool requires approval (headless mode)"; + const missingNameReason = + opts.missingNameReason ?? "Tool call incomplete - missing name"; + + for (const approval of approvals) { + const toolName = approval.toolName; + if (!toolName) { + autoDenied.push({ + approval, + permission: { decision: "deny", reason: missingNameReason }, + context: null, + parsedArgs: {}, + denyReason: missingNameReason, + }); + continue; + } + + const parsedArgs = safeJsonParseOr>( + approval.toolArgs || "{}", + {}, + ); + const permission = await checkToolPermission(toolName, parsedArgs); + const context = opts.getContext + ? await opts.getContext(toolName, parsedArgs) + : null; + let decision = permission.decision; + + if (opts.alwaysRequiresUserInput?.(toolName) && decision === "allow") { + decision = "ask"; + } + + if (decision === "ask" && opts.treatAskAsDeny) { + autoDenied.push({ + approval, + permission, + context, + parsedArgs, + denyReason: denyReasonForAsk, + }); + continue; + } + + if (decision === "allow" && opts.requireArgsForAutoApprove) { + const missingRequiredArgs = await getMissingRequiredArgs( + toolName, + parsedArgs, + ); + if (missingRequiredArgs.length > 0) { + const denyReason = opts.missingArgsReason + ? opts.missingArgsReason(missingRequiredArgs) + : `Missing required parameter${missingRequiredArgs.length > 1 ? "s" : ""}: ${missingRequiredArgs.join(", ")}`; + autoDenied.push({ + approval, + permission, + context, + parsedArgs, + missingRequiredArgs, + denyReason, + }); + continue; + } + } + + const entry: ClassifiedApproval = { + approval, + permission, + context, + parsedArgs, + }; + + if (decision === "ask") { + needsUserInput.push(entry); + } else if (decision === "deny") { + autoDenied.push(entry); + } else { + autoAllowed.push(entry); + } + } + + return { needsUserInput, autoAllowed, autoDenied }; +} diff --git a/src/cli/helpers/stream.ts b/src/cli/helpers/stream.ts index e0498f1..0b0dbe0 100644 --- a/src/cli/helpers/stream.ts +++ b/src/cli/helpers/stream.ts @@ -13,6 +13,7 @@ import { markIncompleteToolsAsCancelled, onChunk, } from "./accumulator"; +import type { ErrorInfo } from "./streamProcessor"; import { StreamProcessor } from "./streamProcessor"; export type ApprovalRequest = { @@ -21,6 +22,27 @@ export type ApprovalRequest = { toolArgs: string; }; +export type DrainStreamHookContext = { + chunk: LettaStreamingResponse; + shouldOutput: boolean; + errorInfo?: ErrorInfo; + updatedApproval?: ApprovalRequest; + streamProcessor: StreamProcessor; +}; + +export type DrainStreamHookResult = { + shouldOutput?: boolean; + shouldAccumulate?: boolean; + stopReason?: StopReasonType; +}; + +export type DrainStreamHook = ( + ctx: DrainStreamHookContext, +) => + | DrainStreamHookResult + | undefined + | Promise; + type DrainResult = { stopReason: StopReasonType; lastRunId?: string | null; @@ -37,6 +59,7 @@ export async function drainStream( refresh: () => void, abortSignal?: AbortSignal, onFirstMessage?: () => void, + onChunkProcessed?: DrainStreamHook, ): Promise { const startTime = performance.now(); @@ -130,7 +153,8 @@ export async function drainStream( logTiming(`TTFT: ${formatDuration(ttft)} (from POST to first content)`); } - const { shouldOutput } = streamProcessor.processChunk(chunk); + const { shouldOutput, errorInfo, updatedApproval } = + streamProcessor.processChunk(chunk); // Check abort signal before processing - don't add data after interrupt if (abortSignal?.aborted) { @@ -140,10 +164,40 @@ export async function drainStream( break; } - if (shouldOutput) { + let shouldOutputChunk = shouldOutput; + let shouldAccumulate = shouldOutput; + + if (onChunkProcessed) { + const hookResult = await onChunkProcessed({ + chunk, + shouldOutput: shouldOutputChunk, + errorInfo, + updatedApproval, + streamProcessor, + }); + if (hookResult?.shouldOutput !== undefined) { + shouldOutputChunk = hookResult.shouldOutput; + } + if (hookResult?.shouldAccumulate !== undefined) { + shouldAccumulate = hookResult.shouldAccumulate; + } else { + shouldAccumulate = shouldOutputChunk; + } + if (hookResult?.stopReason) { + stopReason = hookResult.stopReason; + } + } else { + shouldAccumulate = shouldOutputChunk; + } + + if (shouldAccumulate) { onChunk(buffers, chunk); queueMicrotask(refresh); } + + if (stopReason) { + break; + } } } catch (e) { // Handle stream errors (e.g., JSON parse errors from SDK, network issues) @@ -270,6 +324,7 @@ export async function drainStream( * @param refresh - Callback to refresh UI * @param abortSignal - Optional abort signal for cancellation * @param onFirstMessage - Optional callback to invoke on first message chunk + * @param onChunkProcessed - Optional hook to observe/override per-chunk behavior * @returns Result with stop_reason, approval info, and timing */ export async function drainStreamWithResume( @@ -278,6 +333,7 @@ export async function drainStreamWithResume( refresh: () => void, abortSignal?: AbortSignal, onFirstMessage?: () => void, + onChunkProcessed?: DrainStreamHook, ): Promise { const overallStartTime = performance.now(); @@ -288,6 +344,7 @@ export async function drainStreamWithResume( refresh, abortSignal, onFirstMessage, + onChunkProcessed, ); // If stream ended without proper stop_reason and we have resume info, try once to reconnect @@ -333,6 +390,8 @@ export async function drainStreamWithResume( buffers, refresh, abortSignal, + undefined, + onChunkProcessed, ); // Use the resume result (should have proper stop_reason now) diff --git a/src/headless.ts b/src/headless.ts index 07070de..08c35c3 100644 --- a/src/headless.ts +++ b/src/headless.ts @@ -37,13 +37,14 @@ import { markIncompleteToolsAsCancelled, toLines, } from "./cli/helpers/accumulator"; +import { classifyApprovals } from "./cli/helpers/approvalClassification"; import { formatErrorDetails } from "./cli/helpers/errorFormatter"; -import { safeJsonParseOr } from "./cli/helpers/safeJsonParse"; -import { drainStreamWithResume } from "./cli/helpers/stream"; -import { StreamProcessor } from "./cli/helpers/streamProcessor"; +import { + type DrainStreamHook, + drainStreamWithResume, +} from "./cli/helpers/stream"; import { SYSTEM_REMINDER_CLOSE, SYSTEM_REMINDER_OPEN } from "./constants"; import { settingsManager } from "./settings-manager"; -import { checkToolPermission } from "./tools/manager"; import type { AutoApprovalMessage, CanUseToolControlRequest, @@ -896,54 +897,40 @@ export async function handleHeadlessCommand( reason: string; }; - const decisions: Decision[] = []; + const { autoAllowed, autoDenied } = await classifyApprovals( + pendingApprovals, + { + treatAskAsDeny: true, + denyReasonForAsk: "Tool requires approval (headless mode)", + requireArgsForAutoApprove: true, + missingNameReason: "Tool call incomplete - missing name", + }, + ); - for (const currentApproval of pendingApprovals) { - const { toolName, toolArgs } = currentApproval; - const parsedArgs = safeJsonParseOr>( - toolArgs || "{}", - {}, - ); - const permission = await checkToolPermission(toolName, parsedArgs); - - if (permission.decision === "deny" || permission.decision === "ask") { - const denyReason = - permission.decision === "ask" - ? "Tool requires approval (headless mode)" - : `Permission denied: ${permission.matchedRule || permission.reason}`; - decisions.push({ - type: "deny", - approval: currentApproval, - reason: denyReason, - }); - continue; - } - - // Verify required args present; if missing, deny so the model retries with args - const { getToolSchema } = await import("./tools/manager"); - const schema = getToolSchema(toolName); - const required = - (schema?.input_schema?.required as string[] | undefined) || []; - const missing = required.filter( - (key) => !(key in parsedArgs) || parsedArgs[key] == null, - ); - if (missing.length > 0) { - decisions.push({ - type: "deny", - approval: currentApproval, - reason: `Missing required parameter${missing.length > 1 ? "s" : ""}: ${missing.join(", ")}`, - }); - continue; - } - - // Approve for execution - decisions.push({ - type: "approve", - approval: currentApproval, - reason: permission.reason || "Allowed by permission rule", - matchedRule: permission.matchedRule || "auto-approved", - }); - } + const decisions: Decision[] = [ + ...autoAllowed.map((ac) => ({ + type: "approve" as const, + approval: ac.approval, + reason: ac.permission.reason || "Allowed by permission rule", + matchedRule: + "matchedRule" in ac.permission && ac.permission.matchedRule + ? ac.permission.matchedRule + : "auto-approved", + })), + ...autoDenied.map((ac) => { + const fallback = + "matchedRule" in ac.permission && ac.permission.matchedRule + ? `Permission denied: ${ac.permission.matchedRule}` + : ac.permission.reason + ? `Permission denied: ${ac.permission.reason}` + : "Permission denied: Unknown reason"; + return { + type: "deny" as const, + approval: ac.approval, + reason: ac.denyReason ?? fallback, + }; + }), + ]; // Phase 2: Execute approved tools and format results using shared function const { executeApprovalBatch } = await import( @@ -1137,20 +1124,20 @@ ${SYSTEM_REMINDER_CLOSE} }> = []; let apiDurationMs: number; let lastRunId: string | null = null; + let approvalPendingRecovery = false; if (outputFormat === "stream-json") { - const startTime = performance.now(); - // Track approval requests across streamed chunks const autoApprovalEmitted = new Set(); - const streamProcessor = new StreamProcessor(); + const streamJsonHook: DrainStreamHook = async ({ + chunk, + shouldOutput, + errorInfo, + updatedApproval, + }) => { + let shouldOutputChunk = shouldOutput; - for await (const chunk of stream) { - const { shouldOutput, errorInfo, updatedApproval } = - streamProcessor.processChunk(chunk); - - // Detect mid-stream errors if (errorInfo && shouldOutput) { const errorEvent: ErrorMessage = { type: "error", @@ -1171,13 +1158,7 @@ ${SYSTEM_REMINDER_CLOSE} }), }; console.log(JSON.stringify(errorEvent)); - - // Still accumulate for tracking - const { onChunk: accumulatorOnChunk } = await import( - "./cli/helpers/accumulator" - ); - accumulatorOnChunk(buffers, chunk); - continue; + shouldOutputChunk = false; } // Detect server conflict due to pending approval; handle it and retry @@ -1186,77 +1167,56 @@ ${SYSTEM_REMINDER_CLOSE} isApprovalPendingError(errorInfo?.detail) || isApprovalPendingError(errorInfo?.message) ) { - // Emit recovery message for stream-json mode (enables testing) - if (outputFormat === "stream-json") { - const recoveryMsg: RecoveryMessage = { - type: "recovery", - recovery_type: "approval_pending", - message: - "Detected pending approval conflict; auto-denying stale approval and retrying", - run_id: lastRunId ?? undefined, - session_id: sessionId, - uuid: `recovery-${lastRunId || crypto.randomUUID()}`, - }; - console.log(JSON.stringify(recoveryMsg)); - } - // Clear approvals and retry outer loop - await resolveAllPendingApprovals(); - // Reset state and restart turn - stopReason = "error" as StopReasonType; - break; + const recoveryRunId = errorInfo?.run_id; + const recoveryMsg: RecoveryMessage = { + type: "recovery", + recovery_type: "approval_pending", + message: + "Detected pending approval conflict; auto-denying stale approval and retrying", + run_id: recoveryRunId ?? undefined, + session_id: sessionId, + uuid: `recovery-${recoveryRunId || crypto.randomUUID()}`, + }; + console.log(JSON.stringify(recoveryMsg)); + approvalPendingRecovery = true; + return { stopReason: "error", shouldAccumulate: true }; } - // Check if we should skip outputting approval requests in bypass mode - let shouldOutputChunk = shouldOutput; - // Check if this approval will be auto-approved. Dedup per tool_call_id if ( updatedApproval && - !autoApprovalEmitted.has(updatedApproval.toolCallId) && - updatedApproval.toolName + !autoApprovalEmitted.has(updatedApproval.toolCallId) ) { - const parsedArgs = safeJsonParseOr | null>( - updatedApproval.toolArgs || "{}", - null, - ); - const permission = await checkToolPermission( - updatedApproval.toolName, - parsedArgs || {}, - ); - if (permission.decision === "allow" && parsedArgs) { - // Only emit auto_approval if we already have all required params - const { getToolSchema } = await import("./tools/manager"); - const schema = getToolSchema(updatedApproval.toolName); - const required = - (schema?.input_schema?.required as string[] | undefined) || []; - const missing = required.filter( - (key) => - !(key in parsedArgs) || - (parsedArgs as Record)[key] == null, - ); - if (missing.length === 0) { - shouldOutputChunk = false; - const autoApprovalMsg: AutoApprovalMessage = { - type: "auto_approval", - tool_call: { - name: updatedApproval.toolName, - tool_call_id: updatedApproval.toolCallId, - arguments: updatedApproval.toolArgs || "{}", - }, - reason: permission.reason || "Allowed by permission rule", - matched_rule: permission.matchedRule || "auto-approved", - session_id: sessionId, - uuid: `auto-approval-${updatedApproval.toolCallId}`, - }; - console.log(JSON.stringify(autoApprovalMsg)); - autoApprovalEmitted.add(updatedApproval.toolCallId); - } + const { autoAllowed } = await classifyApprovals([updatedApproval], { + requireArgsForAutoApprove: true, + missingNameReason: "Tool call incomplete - missing name", + }); + + const [approval] = autoAllowed; + if (approval) { + const permission = approval.permission; + shouldOutputChunk = false; + const autoApprovalMsg: AutoApprovalMessage = { + type: "auto_approval", + tool_call: { + name: approval.approval.toolName, + tool_call_id: approval.approval.toolCallId, + arguments: approval.approval.toolArgs || "{}", + }, + reason: permission.reason || "Allowed by permission rule", + matched_rule: + "matchedRule" in permission && permission.matchedRule + ? permission.matchedRule + : "auto-approved", + session_id: sessionId, + uuid: `auto-approval-${approval.approval.toolCallId}`, + }; + console.log(JSON.stringify(autoApprovalMsg)); + autoApprovalEmitted.add(approval.approval.toolCallId); } } - // Output chunk as message event (unless filtered) if (shouldOutputChunk) { - // Use existing otid or id from the Letta SDK chunk const chunkWithIds = chunk as typeof chunk & { otid?: string; id?: string; @@ -1264,7 +1224,6 @@ ${SYSTEM_REMINDER_CLOSE} const uuid = chunkWithIds.otid || chunkWithIds.id; if (includePartialMessages) { - // Emit as stream_event wrapper (like Claude Code with --include-partial-messages) const streamEvent: StreamEvent = { type: "stream_event", event: chunk, @@ -1273,7 +1232,6 @@ ${SYSTEM_REMINDER_CLOSE} }; console.log(JSON.stringify(streamEvent)); } else { - // Emit as regular message (default) const msg: MessageWire = { type: "message", ...chunk, @@ -1284,23 +1242,22 @@ ${SYSTEM_REMINDER_CLOSE} } } - // Still accumulate for approval tracking - const { onChunk } = await import("./cli/helpers/accumulator"); - onChunk(buffers, chunk); - } + return { shouldOutput: shouldOutputChunk, shouldAccumulate: true }; + }; - stopReason = stopReason || streamProcessor.stopReason || "error"; - apiDurationMs = performance.now() - startTime; - approvals = streamProcessor.getApprovals(); - // Use the last run_id we saw (if any) - lastRunId = streamProcessor.lastRunId; - if (lastRunId) lastKnownRunId = lastRunId; - - // Mark final line as finished - const { markCurrentLineAsFinished } = await import( - "./cli/helpers/accumulator" + const result = await drainStreamWithResume( + stream, + buffers, + () => {}, + undefined, + undefined, + streamJsonHook, ); - markCurrentLineAsFinished(buffers); + stopReason = result.stopReason; + approvals = result.approvals || []; + apiDurationMs = result.apiDurationMs; + lastRunId = result.lastRunId || null; + if (lastRunId) lastKnownRunId = lastRunId; } else { // Normal mode: use drainStreamWithResume const result = await drainStreamWithResume( @@ -1317,6 +1274,10 @@ ${SYSTEM_REMINDER_CLOSE} // Track API duration for this stream sessionStats.endTurn(apiDurationMs); + if (approvalPendingRecovery) { + await resolveAllPendingApprovals(); + continue; + } // Case 1: Turn ended normally if (stopReason === "end_turn") { @@ -1353,63 +1314,32 @@ ${SYSTEM_REMINDER_CLOSE} reason: string; }; - const decisions: Decision[] = []; + const { autoAllowed, autoDenied } = await classifyApprovals(approvals, { + treatAskAsDeny: true, + denyReasonForAsk: "Tool requires approval (headless mode)", + requireArgsForAutoApprove: true, + missingNameReason: "Tool call incomplete - missing name", + }); - for (const currentApproval of approvals) { - const { toolName, toolArgs } = currentApproval; - - // Check permission using existing permission system - const parsedArgs = safeJsonParseOr>( - toolArgs, - {}, - ); - const permission = await checkToolPermission(toolName, parsedArgs); - - // Handle deny decision - if (permission.decision === "deny") { - const denyReason = `Permission denied: ${permission.matchedRule || permission.reason}`; - decisions.push({ - type: "deny", - approval: currentApproval, - reason: denyReason, - }); - continue; - } - - // Handle ask decision - in headless mode, auto-deny - if (permission.decision === "ask") { - decisions.push({ - type: "deny", - approval: currentApproval, - reason: "Tool requires approval (headless mode)", - }); - continue; - } - - // Permission is "allow" - verify we have required arguments before executing - const { getToolSchema } = await import("./tools/manager"); - const schema = getToolSchema(toolName); - const required = - (schema?.input_schema?.required as string[] | undefined) || []; - const missing = required.filter( - (key) => !(key in parsedArgs) || parsedArgs[key] == null, - ); - if (missing.length > 0) { - // Auto-deny with a clear reason so the model can retry with arguments - decisions.push({ - type: "deny", - approval: currentApproval, - reason: `Missing required parameter${missing.length > 1 ? "s" : ""}: ${missing.join(", ")}`, - }); - continue; - } - - // Approve this tool for execution - decisions.push({ - type: "approve", - approval: currentApproval, - }); - } + const decisions: Decision[] = [ + ...autoAllowed.map((ac) => ({ + type: "approve" as const, + approval: ac.approval, + })), + ...autoDenied.map((ac) => { + const fallback = + "matchedRule" in ac.permission && ac.permission.matchedRule + ? `Permission denied: ${ac.permission.matchedRule}` + : ac.permission.reason + ? `Permission denied: ${ac.permission.reason}` + : "Permission denied: Unknown reason"; + return { + type: "deny" as const, + approval: ac.approval, + reason: ac.denyReason ?? fallback, + }; + }), + ]; // Phase 2: Execute all approved tools and format results using shared function const { executeApprovalBatch } = await import( @@ -2069,53 +1999,48 @@ async function runBidirectionalMode( const stream = await sendMessageStream(conversationId, currentInput, { agentId: agent.id, }); - - const streamProcessor = new StreamProcessor(); - - // Process stream - for await (const chunk of stream) { - // Check if aborted - if (currentAbortController?.signal.aborted) { - break; + const streamJsonHook: DrainStreamHook = ({ chunk, shouldOutput }) => { + if (!shouldOutput) { + return { shouldAccumulate: true }; } - // Process chunk through StreamProcessor - const { shouldOutput } = streamProcessor.processChunk(chunk); + const chunkWithIds = chunk as typeof chunk & { + otid?: string; + id?: string; + }; + const uuid = chunkWithIds.otid || chunkWithIds.id; - // Output chunk if not suppressed - if (shouldOutput) { - const chunkWithIds = chunk as typeof chunk & { - otid?: string; - id?: string; + if (includePartialMessages) { + const streamEvent: StreamEvent = { + type: "stream_event", + event: chunk, + session_id: sessionId, + uuid: uuid || crypto.randomUUID(), }; - const uuid = chunkWithIds.otid || chunkWithIds.id; - - if (includePartialMessages) { - const streamEvent: StreamEvent = { - type: "stream_event", - event: chunk, - session_id: sessionId, - uuid: uuid || crypto.randomUUID(), - }; - console.log(JSON.stringify(streamEvent)); - } else { - const msg: MessageWire = { - type: "message", - ...chunk, - session_id: sessionId, - uuid: uuid || crypto.randomUUID(), - }; - console.log(JSON.stringify(msg)); - } + console.log(JSON.stringify(streamEvent)); + } else { + const msg: MessageWire = { + type: "message", + ...chunk, + session_id: sessionId, + uuid: uuid || crypto.randomUUID(), + }; + console.log(JSON.stringify(msg)); } - // Accumulate for result - const { onChunk } = await import("./cli/helpers/accumulator"); - onChunk(buffers, chunk); - } + return { shouldAccumulate: true }; + }; - // Get stop reason from processor - const stopReason = streamProcessor.stopReason || "error"; + const result = await drainStreamWithResume( + stream, + buffers, + () => {}, + currentAbortController?.signal, + undefined, + streamJsonHook, + ); + const stopReason = result.stopReason; + const approvals = result.approvals || []; // Case 1: Turn ended normally - break out of loop if (stopReason === "end_turn") { @@ -2123,14 +2048,15 @@ async function runBidirectionalMode( } // Case 2: Aborted - break out of loop - if (currentAbortController?.signal.aborted) { + if ( + currentAbortController?.signal.aborted || + stopReason === "cancelled" + ) { break; } // Case 3: Requires approval - process approvals and continue if (stopReason === "requires_approval") { - const approvals = streamProcessor.getApprovals(); - if (approvals.length === 0) { // No approvals to process - break break; @@ -2157,91 +2083,100 @@ async function runBidirectionalMode( reason: string; }; - const decisions: Decision[] = []; + const { autoAllowed, autoDenied, needsUserInput } = + await classifyApprovals(approvals, { + requireArgsForAutoApprove: true, + missingNameReason: "Tool call incomplete - missing name", + }); - for (const approval of approvals) { - const parsedArgs = safeJsonParseOr>( - approval.toolArgs, - {}, - ); - const permission = await checkToolPermission( - approval.toolName, - parsedArgs, + const decisions: Decision[] = [ + ...autoAllowed.map((ac) => ({ + type: "approve" as const, + approval: ac.approval, + matchedRule: + "matchedRule" in ac.permission && ac.permission.matchedRule + ? ac.permission.matchedRule + : "auto-approved", + })), + ...autoDenied.map((ac) => { + const fallback = + "matchedRule" in ac.permission && ac.permission.matchedRule + ? `Permission denied: ${ac.permission.matchedRule}` + : ac.permission.reason + ? `Permission denied: ${ac.permission.reason}` + : "Permission denied: Unknown reason"; + return { + type: "deny" as const, + approval: ac.approval, + reason: ac.denyReason ?? fallback, + }; + }), + ]; + + for (const approvalItem of autoAllowed) { + const permission = approvalItem.permission; + const autoApprovalMsg: AutoApprovalMessage = { + type: "auto_approval", + tool_call: { + name: approvalItem.approval.toolName, + tool_call_id: approvalItem.approval.toolCallId, + arguments: approvalItem.approval.toolArgs, + }, + reason: permission.reason || "auto-approved", + matched_rule: + "matchedRule" in permission && permission.matchedRule + ? permission.matchedRule + : "auto-approved", + session_id: sessionId, + uuid: `auto-approval-${approvalItem.approval.toolCallId}`, + }; + console.log(JSON.stringify(autoApprovalMsg)); + } + + for (const ac of needsUserInput) { + // permission.decision === "ask" - request permission from SDK + const permResponse = await requestPermission( + ac.approval.toolCallId, + ac.approval.toolName, + ac.parsedArgs, ); - if (permission.decision === "allow") { + if (permResponse.decision === "allow") { + // If provided updatedInput (e.g., for AskUserQuestion with answers), + // update the approval's toolArgs to use it + const finalApproval = permResponse.updatedInput + ? { + ...ac.approval, + toolArgs: JSON.stringify(permResponse.updatedInput), + } + : ac.approval; + decisions.push({ type: "approve", - approval, - matchedRule: permission.matchedRule || "auto-approved", + approval: finalApproval, + matchedRule: "SDK callback approved", }); - // Emit auto_approval event + // Emit auto_approval event for SDK-approved tool const autoApprovalMsg: AutoApprovalMessage = { type: "auto_approval", tool_call: { - name: approval.toolName, - tool_call_id: approval.toolCallId, - arguments: approval.toolArgs, + name: finalApproval.toolName, + tool_call_id: finalApproval.toolCallId, + arguments: finalApproval.toolArgs, }, - reason: permission.reason || "auto-approved", - matched_rule: permission.matchedRule || "auto-approved", + reason: permResponse.reason || "SDK callback approved", + matched_rule: "canUseTool callback", session_id: sessionId, - uuid: `auto-approval-${approval.toolCallId}`, + uuid: `auto-approval-${ac.approval.toolCallId}`, }; console.log(JSON.stringify(autoApprovalMsg)); - } else if (permission.decision === "deny") { - // Explicitly denied by permission rules + } else { decisions.push({ type: "deny", - approval, - reason: `Permission denied: ${permission.matchedRule || permission.reason}`, + approval: ac.approval, + reason: permResponse.reason || "Denied by SDK callback", }); - } else { - // permission.decision === "ask" - request permission from SDK - const permResponse = await requestPermission( - approval.toolCallId, - approval.toolName, - parsedArgs, - ); - - if (permResponse.decision === "allow") { - // If provided updatedInput (e.g., for AskUserQuestion with answers), - // update the approval's toolArgs to use it - const finalApproval = permResponse.updatedInput - ? { - ...approval, - toolArgs: JSON.stringify(permResponse.updatedInput), - } - : approval; - - decisions.push({ - type: "approve", - approval: finalApproval, - matchedRule: "SDK callback approved", - }); - - // Emit auto_approval event for SDK-approved tool - const autoApprovalMsg: AutoApprovalMessage = { - type: "auto_approval", - tool_call: { - name: finalApproval.toolName, - tool_call_id: finalApproval.toolCallId, - arguments: finalApproval.toolArgs, - }, - reason: permResponse.reason || "SDK callback approved", - matched_rule: "canUseTool callback", - session_id: sessionId, - uuid: `auto-approval-${approval.toolCallId}`, - }; - console.log(JSON.stringify(autoApprovalMsg)); - } else { - decisions.push({ - type: "deny", - approval, - reason: permResponse.reason || "Denied by SDK callback", - }); - } } }