From 163e34b04bd8ed87fd2d9a3df338084f6211e881 Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Sun, 11 Jan 2026 15:49:38 -0800 Subject: [PATCH] feat: streaming output for bash commands (#516) Co-authored-by: Letta --- src/agent/approval-execution.ts | 35 +++++- src/cli/App.tsx | 105 +++++++++++++++++- src/cli/components/BashCommandMessage.tsx | 18 ++- src/cli/components/CollapsedOutputDisplay.tsx | 57 ++++++++++ src/cli/components/StreamingOutputDisplay.tsx | 67 +++++++++++ src/cli/components/ToolCallMessageRich.tsx | 47 +++++++- src/cli/helpers/accumulator.ts | 91 +++++++++++++++ src/tools/impl/Bash.ts | 7 ++ src/tools/impl/shellEnv.ts | 10 ++ src/tools/manager.ts | 19 +++- 10 files changed, 439 insertions(+), 17 deletions(-) create mode 100644 src/cli/components/CollapsedOutputDisplay.tsx create mode 100644 src/cli/components/StreamingOutputDisplay.tsx diff --git a/src/agent/approval-execution.ts b/src/agent/approval-execution.ts index 5fdaa7c..ea132b8 100644 --- a/src/agent/approval-execution.ts +++ b/src/agent/approval-execution.ts @@ -160,7 +160,14 @@ export type ApprovalResult = ToolReturn | ApprovalReturn; async function executeSingleDecision( decision: ApprovalDecision, onChunk?: (chunk: ToolReturnMessage) => void, - options?: { abortSignal?: AbortSignal }, + options?: { + abortSignal?: AbortSignal; + onStreamingOutput?: ( + toolCallId: string, + chunk: string, + isStderr?: boolean, + ) => void; + }, ): Promise { // If aborted, record an interrupted result if (options?.abortSignal?.aborted) { @@ -216,6 +223,14 @@ async function executeSingleDecision( { signal: options?.abortSignal, toolCallId: decision.approval.toolCallId, + onOutput: options?.onStreamingOutput + ? (chunk, stream) => + options.onStreamingOutput?.( + decision.approval.toolCallId, + chunk, + stream === "stderr", + ) + : undefined, }, ); @@ -312,7 +327,14 @@ async function executeSingleDecision( export async function executeApprovalBatch( decisions: ApprovalDecision[], onChunk?: (chunk: ToolReturnMessage) => void, - options?: { abortSignal?: AbortSignal }, + options?: { + abortSignal?: AbortSignal; + onStreamingOutput?: ( + toolCallId: string, + chunk: string, + isStderr?: boolean, + ) => void; + }, ): Promise { // Pre-allocate results array to maintain original order const results: (ApprovalResult | null)[] = new Array(decisions.length).fill( @@ -400,7 +422,14 @@ export async function executeApprovalBatch( export async function executeAutoAllowedTools( autoAllowed: Array<{ approval: ApprovalRequest }>, onChunk: (chunk: ToolReturnMessage) => void, - options?: { abortSignal?: AbortSignal }, + options?: { + abortSignal?: AbortSignal; + onStreamingOutput?: ( + toolCallId: string, + chunk: string, + isStderr?: boolean, + ) => void; + }, ): Promise { const decisions: ApprovalDecision[] = autoAllowed.map((ac) => ({ type: "approve" as const, diff --git a/src/cli/App.tsx b/src/cli/App.tsx index 512642d..9a8f5c9 100644 --- a/src/cli/App.tsx +++ b/src/cli/App.tsx @@ -117,6 +117,7 @@ import { UserMessage } from "./components/UserMessageRich"; import { WelcomeScreen } from "./components/WelcomeScreen"; import { AnimationProvider } from "./contexts/AnimationContext"; import { + appendStreamingOutput, type Buffers, createBuffers, type Line, @@ -1097,6 +1098,60 @@ export default function App({ commitEligibleLines(b); }, [commitEligibleLines]); + // Trailing-edge debounce for bash streaming output (100ms = max 10 updates/sec) + // Unlike refreshDerivedThrottled, this REPLACES pending updates to always show latest state + const streamingRefreshTimeoutRef = useRef | null>(null); + const refreshDerivedStreaming = useCallback(() => { + // Cancel any pending refresh - we want the LATEST state + if (streamingRefreshTimeoutRef.current) { + clearTimeout(streamingRefreshTimeoutRef.current); + } + streamingRefreshTimeoutRef.current = setTimeout(() => { + streamingRefreshTimeoutRef.current = null; + if (!buffersRef.current.interrupted) { + refreshDerived(); + } + }, 100); + }, [refreshDerived]); + + // Cleanup streaming refresh on unmount + useEffect(() => { + return () => { + if (streamingRefreshTimeoutRef.current) { + clearTimeout(streamingRefreshTimeoutRef.current); + } + }; + }, []); + + // Helper to update streaming output for bash/shell tools + const updateStreamingOutput = useCallback( + (toolCallId: string, chunk: string, isStderr = false) => { + const lineId = buffersRef.current.toolCallIdToLineId.get(toolCallId); + if (!lineId) return; + + const entry = buffersRef.current.byId.get(lineId); + if (!entry || entry.kind !== "tool_call") return; + + // Immutable update with tail buffer + const newStreaming = appendStreamingOutput( + entry.streaming, + chunk, + entry.streaming?.startTime || Date.now(), + isStderr, + ); + + buffersRef.current.byId.set(lineId, { + ...entry, + streaming: newStreaming, + }); + + refreshDerivedStreaming(); + }, + [refreshDerivedStreaming], + ); + // Throttled version for streaming updates (~60fps max) const refreshDerivedThrottled = useCallback(() => { // Use a ref to track pending refresh @@ -2075,6 +2130,10 @@ export default function App({ const autoAllowedResults = await executeAutoAllowedTools( autoAllowed, (chunk) => onChunk(buffersRef.current, chunk), + { + abortSignal: signal, + onStreamingOutput: updateStreamingOutput, + }, ); // Create denial results for auto-denied tools and update buffers @@ -2564,6 +2623,7 @@ export default function App({ refreshDerivedThrottled, setStreaming, currentModelId, + updateStreamingOutput, ], ); @@ -2965,14 +3025,22 @@ export default function App({ const handleBashSubmit = useCallback( async (command: string) => { const cmdId = uid("bash"); + const startTime = Date.now(); - // Add running bash_command line + // Add running bash_command line with streaming state buffersRef.current.byId.set(cmdId, { kind: "bash_command", id: cmdId, input: command, output: "", phase: "running", + streaming: { + tailLines: [], + partialLine: "", + partialIsStderr: false, + totalLineCount: 0, + startTime, + }, }); buffersRef.current.order.push(cmdId); refreshDerived(); @@ -2986,13 +3054,29 @@ export default function App({ cwd: process.cwd(), env: getShellEnv(), timeout: 30000, // 30 second timeout + onOutput: (chunk, stream) => { + const entry = buffersRef.current.byId.get(cmdId); + if (entry && entry.kind === "bash_command") { + const newStreaming = appendStreamingOutput( + entry.streaming, + chunk, + startTime, + stream === "stderr", + ); + buffersRef.current.byId.set(cmdId, { + ...entry, + streaming: newStreaming, + }); + refreshDerivedStreaming(); + } + }, }); // Combine stdout and stderr for output const output = (result.stdout + result.stderr).trim(); const success = result.exitCode === 0; - // Update line with output + // Update line with output, clear streaming state buffersRef.current.byId.set(cmdId, { kind: "bash_command", id: cmdId, @@ -3000,6 +3084,7 @@ export default function App({ output: output || (success ? "" : `Exit code: ${result.exitCode}`), phase: "finished", success, + streaming: undefined, }); // Cache for next user message @@ -3023,6 +3108,7 @@ export default function App({ output: errOutput, phase: "finished", success: false, + streaming: undefined, }); // Still cache for next user message (even failures are visible to agent) @@ -3031,7 +3117,7 @@ export default function App({ refreshDerived(); }, - [refreshDerived], + [refreshDerived, refreshDerivedStreaming], ); /** @@ -3138,6 +3224,7 @@ export default function App({ const autoAllowedResults = await executeAutoAllowedTools( autoAllowed, (chunk) => onChunk(buffersRef.current, chunk), + { onStreamingOutput: updateStreamingOutput }, ); // Map to ApprovalResult format (ToolReturn) allResults.push( @@ -3187,7 +3274,7 @@ export default function App({ // If check fails, proceed anyway (don't block user) return { blocked: false }; } - }, [agentId, processConversation, refreshDerived]); + }, [agentId, processConversation, refreshDerived, updateStreamingOutput]); // biome-ignore lint/correctness/useExhaustiveDependencies: refs read .current dynamically, complex callback with intentional deps const onSubmit = useCallback( @@ -5019,6 +5106,7 @@ DO NOT respond to these messages or otherwise consider them in your response unl const autoAllowedResults = await executeAutoAllowedTools( autoAllowed, (chunk) => onChunk(buffersRef.current, chunk), + { onStreamingOutput: updateStreamingOutput }, ); // Create denial results for auto-denied and update UI @@ -5175,6 +5263,7 @@ DO NOT respond to these messages or otherwise consider them in your response unl const autoAllowedWithResults = await executeAutoAllowedTools( autoAllowed, (chunk) => onChunk(buffersRef.current, chunk), + { onStreamingOutput: updateStreamingOutput }, ); // Create denial reasons for auto-denied and update UI @@ -5380,7 +5469,10 @@ DO NOT respond to these messages or otherwise consider them in your response unl // Flush UI so completed tools show up while the batch continues refreshDerived(); }, - { abortSignal: approvalAbortController.signal }, + { + abortSignal: approvalAbortController.signal, + onStreamingOutput: updateStreamingOutput, + }, ); // Combine with auto-handled and auto-denied results using snapshots @@ -5473,6 +5565,7 @@ DO NOT respond to these messages or otherwise consider them in your response unl refreshDerived, appendError, setStreaming, + updateStreamingOutput, ], ); @@ -5652,6 +5745,7 @@ DO NOT respond to these messages or otherwise consider them in your response unl onChunk(buffersRef.current, chunk); refreshDerived(); }, + { onStreamingOutput: updateStreamingOutput }, ); // Combine with auto-handled and auto-denied results (from initial check) @@ -5704,6 +5798,7 @@ DO NOT respond to these messages or otherwise consider them in your response unl refreshDerived, isExecutingTool, setStreaming, + updateStreamingOutput, ], ); diff --git a/src/cli/components/BashCommandMessage.tsx b/src/cli/components/BashCommandMessage.tsx index e4ff9a7..a173eb0 100644 --- a/src/cli/components/BashCommandMessage.tsx +++ b/src/cli/components/BashCommandMessage.tsx @@ -1,9 +1,12 @@ import { Box, Text } from "ink"; import { memo } from "react"; +import type { StreamingState } from "../helpers/accumulator"; import { useTerminalWidth } from "../hooks/useTerminalWidth"; import { BlinkDot } from "./BlinkDot.js"; +import { CollapsedOutputDisplay } from "./CollapsedOutputDisplay"; import { colors } from "./colors.js"; import { MarkdownDisplay } from "./MarkdownDisplay.js"; +import { StreamingOutputDisplay } from "./StreamingOutputDisplay"; type BashCommandLine = { kind: "bash_command"; @@ -12,6 +15,7 @@ type BashCommandLine = { output: string; phase?: "running" | "finished"; success?: boolean; + streaming?: StreamingState; }; /** @@ -54,8 +58,18 @@ export const BashCommandMessage = memo( - {/* Command output (if present) */} - {line.output && ( + {/* Streaming output during execution */} + {line.phase === "running" && line.streaming && ( + + )} + + {/* Collapsed output after completion */} + {line.phase === "finished" && line.output && ( + + )} + + {/* Fallback: show output when phase is undefined (legacy bash commands before streaming) */} + {!line.phase && line.output && ( {" ⎿ "} diff --git a/src/cli/components/CollapsedOutputDisplay.tsx b/src/cli/components/CollapsedOutputDisplay.tsx new file mode 100644 index 0000000..9e1eb1c --- /dev/null +++ b/src/cli/components/CollapsedOutputDisplay.tsx @@ -0,0 +1,57 @@ +import { Box, Text } from "ink"; +import { memo } from "react"; + +const COLLAPSED_LINES = 3; + +interface CollapsedOutputDisplayProps { + output: string; // Full output from completion +} + +/** + * Display component for bash output after completion. + * Shows first 3 lines with count of hidden lines. + * Note: expand/collapse (ctrl+o) is deferred to a future PR. + */ +export const CollapsedOutputDisplay = memo( + ({ output }: CollapsedOutputDisplayProps) => { + // Keep empty lines for accurate display (don't filter them out) + const lines = output.split("\n"); + // Remove trailing empty line from final newline + if (lines.length > 0 && lines[lines.length - 1] === "") { + lines.pop(); + } + + if (lines.length === 0) { + return null; + } + + const visibleLines = lines.slice(0, COLLAPSED_LINES); + const hiddenCount = Math.max(0, lines.length - COLLAPSED_LINES); + + return ( + + {/* L-bracket on first line - matches ToolCallMessageRich format " ⎿ " */} + + {" ⎿ "} + {visibleLines[0]} + + {/* Remaining visible lines with indent (5 spaces to align with content after bracket) */} + {visibleLines.slice(1).map((line, i) => ( + // biome-ignore lint/suspicious/noArrayIndexKey: Lines are positional output, stable order within render + + {" "} + {line} + + ))} + {/* Hidden count hint */} + {hiddenCount > 0 && ( + + {" "}… +{hiddenCount} lines + + )} + + ); + }, +); + +CollapsedOutputDisplay.displayName = "CollapsedOutputDisplay"; diff --git a/src/cli/components/StreamingOutputDisplay.tsx b/src/cli/components/StreamingOutputDisplay.tsx new file mode 100644 index 0000000..3ec5daf --- /dev/null +++ b/src/cli/components/StreamingOutputDisplay.tsx @@ -0,0 +1,67 @@ +import { Box, Text } from "ink"; +import { memo, useEffect, useState } from "react"; +import type { StreamingState } from "../helpers/accumulator"; + +interface StreamingOutputDisplayProps { + streaming: StreamingState; +} + +/** + * Display component for streaming bash output during execution. + * Shows a rolling window of the last 5 lines with elapsed time. + */ +export const StreamingOutputDisplay = memo( + ({ streaming }: StreamingOutputDisplayProps) => { + // Force re-render every second for elapsed timer + const [, forceUpdate] = useState(0); + useEffect(() => { + const interval = setInterval(() => forceUpdate((n) => n + 1), 1000); + return () => clearInterval(interval); + }, []); + + const elapsed = Math.floor((Date.now() - streaming.startTime) / 1000); + const { tailLines, totalLineCount } = streaming; + const hiddenCount = Math.max(0, totalLineCount - tailLines.length); + + // No output yet - don't show anything + const firstLine = tailLines[0]; + if (!firstLine) { + return null; + } + + return ( + + {/* L-bracket on first line - matches ToolCallMessageRich format " ⎿ " */} + + {" ⎿ "} + + {firstLine.text} + + + {/* Remaining lines with indent (5 spaces to align with content after bracket) */} + {tailLines.slice(1).map((line, i) => ( + + {" "} + {line.text} + + ))} + {/* Hidden count + elapsed time */} + {hiddenCount > 0 && ( + + {" "}… +{hiddenCount} more lines ({elapsed}s) + + )} + + ); + }, +); + +StreamingOutputDisplay.displayName = "StreamingOutputDisplay"; diff --git a/src/cli/components/ToolCallMessageRich.tsx b/src/cli/components/ToolCallMessageRich.tsx index 241ca83..0912841 100644 --- a/src/cli/components/ToolCallMessageRich.tsx +++ b/src/cli/components/ToolCallMessageRich.tsx @@ -29,9 +29,11 @@ function isQuestionTool(name: string): boolean { return name === "AskUserQuestion"; } +import type { StreamingState } from "../helpers/accumulator"; import { useTerminalWidth } from "../hooks/useTerminalWidth"; import { AdvancedDiffRenderer } from "./AdvancedDiffRenderer"; import { BlinkDot } from "./BlinkDot.js"; +import { CollapsedOutputDisplay } from "./CollapsedOutputDisplay"; import { colors } from "./colors.js"; import { EditRenderer, @@ -41,6 +43,7 @@ import { import { MarkdownDisplay } from "./MarkdownDisplay.js"; import { MemoryDiffRenderer } from "./MemoryDiffRenderer.js"; import { PlanRenderer } from "./PlanRenderer.js"; +import { StreamingOutputDisplay } from "./StreamingOutputDisplay"; import { TodoRenderer } from "./TodoRenderer.js"; type ToolCallLine = { @@ -52,8 +55,25 @@ type ToolCallLine = { resultText?: string; resultOk?: boolean; phase: "streaming" | "ready" | "running" | "finished"; + streaming?: StreamingState; }; +/** + * Check if tool is a shell/bash tool that supports streaming output + */ +function isShellTool(name: string): boolean { + const shellTools = [ + "Bash", + "Shell", + "shell", + "shell_command", + "run_shell_command", + "RunShellCommand", + "ShellCommand", + ]; + return shellTools.includes(name); +} + /** * ToolCallMessageRich - Rich formatting version with old layout logic * This preserves the exact wrapping and spacing logic from the old codebase @@ -680,8 +700,31 @@ export const ToolCallMessage = memo( - {/* Tool result (if present) */} - {getResultElement()} + {/* Streaming output for shell tools during execution */} + {isShellTool(rawName) && line.phase === "running" && line.streaming && ( + + )} + + {/* Collapsed output for shell tools after completion */} + {isShellTool(rawName) && + line.phase === "finished" && + line.resultText && + line.resultOk !== false && ( + + )} + + {/* Tool result for non-shell tools or shell tool errors */} + {(() => { + // Show default result element when: + // - Not a shell tool (always show result) + // - Shell tool with error (show error message) + // - Shell tool in streaming/ready phase (show default "Running..." etc) + const showDefaultResult = + !isShellTool(rawName) || + (line.phase === "finished" && line.resultOk === false) || + (line.phase !== "running" && line.phase !== "finished"); + return showDefaultResult ? getResultElement() : null; + })()} ); }, diff --git a/src/cli/helpers/accumulator.ts b/src/cli/helpers/accumulator.ts index ac09a01..012be6a 100644 --- a/src/cli/helpers/accumulator.ts +++ b/src/cli/helpers/accumulator.ts @@ -7,6 +7,93 @@ import type { LettaStreamingResponse } from "@letta-ai/letta-client/resources/agents/messages"; import { INTERRUPTED_BY_USER } from "../../constants"; +// Constants for streaming output +const MAX_TAIL_LINES = 5; +const MAX_BUFFER_SIZE = 100_000; // 100KB + +/** + * A line of streaming output with its source (stdout or stderr). + */ +export interface StreamingLine { + text: string; + isStderr: boolean; +} + +/** + * Streaming state for bash/shell tools. + * Tracks a rolling window of output during execution. + */ +export interface StreamingState { + tailLines: StreamingLine[]; // Last 5 complete lines (for rolling display) + partialLine: string; // Incomplete line being accumulated + partialIsStderr: boolean; // Whether partial line is from stderr + totalLineCount: number; // Total lines seen (for "+N more" count) + startTime: number; // For elapsed time display +} + +/** + * Append a chunk of output to the streaming state. + * Maintains a tail buffer of the last N lines and handles partial line accumulation. + */ +export function appendStreamingOutput( + state: StreamingState | undefined, + chunk: string, + startTime: number, + isStderr = false, +): StreamingState { + const current = state || { + tailLines: [], + partialLine: "", + partialIsStderr: false, + totalLineCount: 0, + startTime, + }; + + let tailLines = [...current.tailLines]; + let totalLineCount = current.totalLineCount; + let partialLine = current.partialLine; + let partialIsStderr = current.partialIsStderr; + + // If stream type changed and we have a partial, flush it as a complete line + if (partialLine && isStderr !== partialIsStderr) { + tailLines.push({ text: partialLine, isStderr: partialIsStderr }); + totalLineCount++; + partialLine = ""; + } + + // Append chunk to partial line + let buffer = partialLine + chunk; + + // Size limit check - slice at line boundary to avoid corrupted lines + if (buffer.length > MAX_BUFFER_SIZE) { + const truncated = buffer.slice(-MAX_BUFFER_SIZE); + const firstNewline = truncated.indexOf("\n"); + buffer = firstNewline >= 0 ? truncated.slice(firstNewline + 1) : truncated; + } + + // Split into complete lines + remainder + const lines = buffer.split("\n"); + const newPartialLine = lines.pop() || ""; // Last element is incomplete + + // Convert string lines to StreamingLine objects with current stream's stderr flag + const newLines: StreamingLine[] = lines.map((text) => ({ + text, + isStderr, + })); + + // Update tail with new complete lines (keep empty lines for accurate display) + const allLines = [...tailLines, ...newLines]; + const finalTailLines = allLines.slice(-MAX_TAIL_LINES); + + return { + tailLines: finalTailLines, + partialLine: newPartialLine, + partialIsStderr: isStderr, + totalLineCount: totalLineCount + lines.length, + startTime: current.startTime, + }; +} + // One line per transcript row. Tool calls evolve in-place. // For tool call returns, merge into the tool call matching the toolCallId export type Line = @@ -36,6 +123,8 @@ export type Line = resultOk?: boolean; // state that's useful for rendering phase: "streaming" | "ready" | "running" | "finished"; + // streaming output state (for shell tools during execution) + streaming?: StreamingState; } | { kind: "error"; id: string; text: string } | { @@ -54,6 +143,8 @@ export type Line = output: string; phase?: "running" | "finished"; success?: boolean; + // streaming output state (during execution) + streaming?: StreamingState; } | { kind: "status"; diff --git a/src/tools/impl/Bash.ts b/src/tools/impl/Bash.ts index 94a576b..8dd25a7 100644 --- a/src/tools/impl/Bash.ts +++ b/src/tools/impl/Bash.ts @@ -37,6 +37,7 @@ function spawnWithLauncher( env: NodeJS.ProcessEnv; timeout: number; signal?: AbortSignal; + onOutput?: (chunk: string, stream: "stdout" | "stderr") => void; }, ): Promise<{ stdout: string; stderr: string; exitCode: number | null }> { return new Promise((resolve, reject) => { @@ -71,10 +72,12 @@ function spawnWithLauncher( childProcess.stdout?.on("data", (chunk: Buffer) => { stdoutChunks.push(chunk); + options.onOutput?.(chunk.toString("utf8"), "stdout"); }); childProcess.stderr?.on("data", (chunk: Buffer) => { stderrChunks.push(chunk); + options.onOutput?.(chunk.toString("utf8"), "stderr"); }); childProcess.on("error", (err) => { @@ -137,6 +140,7 @@ export async function spawnCommand( env: NodeJS.ProcessEnv; timeout: number; signal?: AbortSignal; + onOutput?: (chunk: string, stream: "stdout" | "stderr") => void; }, ): Promise<{ stdout: string; stderr: string; exitCode: number | null }> { // On Unix (Linux/macOS), use simple bash -c approach (original behavior) @@ -200,6 +204,7 @@ interface BashArgs { description?: string; run_in_background?: boolean; signal?: AbortSignal; + onOutput?: (chunk: string, stream: "stdout" | "stderr") => void; } interface BashResult { @@ -218,6 +223,7 @@ export async function bash(args: BashArgs): Promise { description: _description, run_in_background = false, signal, + onOutput, } = args; const userCwd = process.env.USER_CWD || process.cwd(); @@ -314,6 +320,7 @@ export async function bash(args: BashArgs): Promise { env: getShellEnv(), timeout: effectiveTimeout, signal, + onOutput, }); let output = stdout; diff --git a/src/tools/impl/shellEnv.ts b/src/tools/impl/shellEnv.ts index a1156d3..ccefa81 100644 --- a/src/tools/impl/shellEnv.ts +++ b/src/tools/impl/shellEnv.ts @@ -87,5 +87,15 @@ export function getShellEnv(): NodeJS.ProcessEnv { : nodeModulesDir; } + // Disable interactive pagers (fixes git log, man, etc. hanging) + env.PAGER = "cat"; + env.GIT_PAGER = "cat"; + env.MANPAGER = "cat"; + + // Ensure TERM is set for proper color support + if (!env.TERM) { + env.TERM = "xterm-256color"; + } + return env; } diff --git a/src/tools/manager.ts b/src/tools/manager.ts index fb124cb..16817bb 100644 --- a/src/tools/manager.ts +++ b/src/tools/manager.ts @@ -686,13 +686,17 @@ function flattenToolResponse(result: unknown): string { * * @param name - The name of the tool to execute * @param args - Arguments object to pass to the tool - * @param options - Optional execution options (abort signal, tool call ID) + * @param options - Optional execution options (abort signal, tool call ID, streaming callback) * @returns Promise with the tool's execution result including status and optional stdout/stderr */ export async function executeTool( name: string, args: ToolArgs, - options?: { signal?: AbortSignal; toolCallId?: string }, + options?: { + signal?: AbortSignal; + toolCallId?: string; + onOutput?: (chunk: string, stream: "stdout" | "stderr") => void; + }, ): Promise { const internalName = resolveInternalToolName(name); if (!internalName) { @@ -716,9 +720,14 @@ export async function executeTool( // Inject options for tools that support them without altering schemas let enhancedArgs = args; - // Inject abort signal for Bash tool - if (internalName === "Bash" && options?.signal) { - enhancedArgs = { ...enhancedArgs, signal: options.signal }; + // Inject abort signal and streaming callback for Bash tool + if (internalName === "Bash") { + if (options?.signal) { + enhancedArgs = { ...enhancedArgs, signal: options.signal }; + } + if (options?.onOutput) { + enhancedArgs = { ...enhancedArgs, onOutput: options.onOutput }; + } } // Inject toolCallId and abort signal for Task tool