feat: streaming output for bash commands (#516)

Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
Charles Packer
2026-01-11 15:49:38 -08:00
committed by GitHub
parent c07a5edd88
commit 163e34b04b
10 changed files with 439 additions and 17 deletions

View File

@@ -160,7 +160,14 @@ export type ApprovalResult = ToolReturn | ApprovalReturn;
async function executeSingleDecision(
decision: ApprovalDecision,
onChunk?: (chunk: ToolReturnMessage) => void,
options?: { abortSignal?: AbortSignal },
options?: {
abortSignal?: AbortSignal;
onStreamingOutput?: (
toolCallId: string,
chunk: string,
isStderr?: boolean,
) => void;
},
): Promise<ApprovalResult> {
// If aborted, record an interrupted result
if (options?.abortSignal?.aborted) {
@@ -216,6 +223,14 @@ async function executeSingleDecision(
{
signal: options?.abortSignal,
toolCallId: decision.approval.toolCallId,
onOutput: options?.onStreamingOutput
? (chunk, stream) =>
options.onStreamingOutput?.(
decision.approval.toolCallId,
chunk,
stream === "stderr",
)
: undefined,
},
);
@@ -312,7 +327,14 @@ async function executeSingleDecision(
export async function executeApprovalBatch(
decisions: ApprovalDecision[],
onChunk?: (chunk: ToolReturnMessage) => void,
options?: { abortSignal?: AbortSignal },
options?: {
abortSignal?: AbortSignal;
onStreamingOutput?: (
toolCallId: string,
chunk: string,
isStderr?: boolean,
) => void;
},
): Promise<ApprovalResult[]> {
// Pre-allocate results array to maintain original order
const results: (ApprovalResult | null)[] = new Array(decisions.length).fill(
@@ -400,7 +422,14 @@ export async function executeApprovalBatch(
export async function executeAutoAllowedTools(
autoAllowed: Array<{ approval: ApprovalRequest }>,
onChunk: (chunk: ToolReturnMessage) => void,
options?: { abortSignal?: AbortSignal },
options?: {
abortSignal?: AbortSignal;
onStreamingOutput?: (
toolCallId: string,
chunk: string,
isStderr?: boolean,
) => void;
},
): Promise<AutoAllowedResult[]> {
const decisions: ApprovalDecision[] = autoAllowed.map((ac) => ({
type: "approve" as const,

View File

@@ -117,6 +117,7 @@ import { UserMessage } from "./components/UserMessageRich";
import { WelcomeScreen } from "./components/WelcomeScreen";
import { AnimationProvider } from "./contexts/AnimationContext";
import {
appendStreamingOutput,
type Buffers,
createBuffers,
type Line,
@@ -1097,6 +1098,60 @@ export default function App({
commitEligibleLines(b);
}, [commitEligibleLines]);
// Trailing-edge debounce for bash streaming output (100ms = max 10 updates/sec)
// Unlike refreshDerivedThrottled, this REPLACES pending updates to always show latest state
const streamingRefreshTimeoutRef = useRef<ReturnType<
typeof setTimeout
> | null>(null);
const refreshDerivedStreaming = useCallback(() => {
// Cancel any pending refresh - we want the LATEST state
if (streamingRefreshTimeoutRef.current) {
clearTimeout(streamingRefreshTimeoutRef.current);
}
streamingRefreshTimeoutRef.current = setTimeout(() => {
streamingRefreshTimeoutRef.current = null;
if (!buffersRef.current.interrupted) {
refreshDerived();
}
}, 100);
}, [refreshDerived]);
// Cleanup streaming refresh on unmount
useEffect(() => {
return () => {
if (streamingRefreshTimeoutRef.current) {
clearTimeout(streamingRefreshTimeoutRef.current);
}
};
}, []);
// Helper to update streaming output for bash/shell tools
const updateStreamingOutput = useCallback(
(toolCallId: string, chunk: string, isStderr = false) => {
const lineId = buffersRef.current.toolCallIdToLineId.get(toolCallId);
if (!lineId) return;
const entry = buffersRef.current.byId.get(lineId);
if (!entry || entry.kind !== "tool_call") return;
// Immutable update with tail buffer
const newStreaming = appendStreamingOutput(
entry.streaming,
chunk,
entry.streaming?.startTime || Date.now(),
isStderr,
);
buffersRef.current.byId.set(lineId, {
...entry,
streaming: newStreaming,
});
refreshDerivedStreaming();
},
[refreshDerivedStreaming],
);
// Throttled version for streaming updates (~60fps max)
const refreshDerivedThrottled = useCallback(() => {
// Use a ref to track pending refresh
@@ -2075,6 +2130,10 @@ export default function App({
const autoAllowedResults = await executeAutoAllowedTools(
autoAllowed,
(chunk) => onChunk(buffersRef.current, chunk),
{
abortSignal: signal,
onStreamingOutput: updateStreamingOutput,
},
);
// Create denial results for auto-denied tools and update buffers
@@ -2564,6 +2623,7 @@ export default function App({
refreshDerivedThrottled,
setStreaming,
currentModelId,
updateStreamingOutput,
],
);
@@ -2965,14 +3025,22 @@ export default function App({
const handleBashSubmit = useCallback(
async (command: string) => {
const cmdId = uid("bash");
const startTime = Date.now();
// Add running bash_command line
// Add running bash_command line with streaming state
buffersRef.current.byId.set(cmdId, {
kind: "bash_command",
id: cmdId,
input: command,
output: "",
phase: "running",
streaming: {
tailLines: [],
partialLine: "",
partialIsStderr: false,
totalLineCount: 0,
startTime,
},
});
buffersRef.current.order.push(cmdId);
refreshDerived();
@@ -2986,13 +3054,29 @@ export default function App({
cwd: process.cwd(),
env: getShellEnv(),
timeout: 30000, // 30 second timeout
onOutput: (chunk, stream) => {
const entry = buffersRef.current.byId.get(cmdId);
if (entry && entry.kind === "bash_command") {
const newStreaming = appendStreamingOutput(
entry.streaming,
chunk,
startTime,
stream === "stderr",
);
buffersRef.current.byId.set(cmdId, {
...entry,
streaming: newStreaming,
});
refreshDerivedStreaming();
}
},
});
// Combine stdout and stderr for output
const output = (result.stdout + result.stderr).trim();
const success = result.exitCode === 0;
// Update line with output
// Update line with output, clear streaming state
buffersRef.current.byId.set(cmdId, {
kind: "bash_command",
id: cmdId,
@@ -3000,6 +3084,7 @@ export default function App({
output: output || (success ? "" : `Exit code: ${result.exitCode}`),
phase: "finished",
success,
streaming: undefined,
});
// Cache for next user message
@@ -3023,6 +3108,7 @@ export default function App({
output: errOutput,
phase: "finished",
success: false,
streaming: undefined,
});
// Still cache for next user message (even failures are visible to agent)
@@ -3031,7 +3117,7 @@ export default function App({
refreshDerived();
},
[refreshDerived],
[refreshDerived, refreshDerivedStreaming],
);
/**
@@ -3138,6 +3224,7 @@ export default function App({
const autoAllowedResults = await executeAutoAllowedTools(
autoAllowed,
(chunk) => onChunk(buffersRef.current, chunk),
{ onStreamingOutput: updateStreamingOutput },
);
// Map to ApprovalResult format (ToolReturn)
allResults.push(
@@ -3187,7 +3274,7 @@ export default function App({
// If check fails, proceed anyway (don't block user)
return { blocked: false };
}
}, [agentId, processConversation, refreshDerived]);
}, [agentId, processConversation, refreshDerived, updateStreamingOutput]);
// biome-ignore lint/correctness/useExhaustiveDependencies: refs read .current dynamically, complex callback with intentional deps
const onSubmit = useCallback(
@@ -5019,6 +5106,7 @@ DO NOT respond to these messages or otherwise consider them in your response unl
const autoAllowedResults = await executeAutoAllowedTools(
autoAllowed,
(chunk) => onChunk(buffersRef.current, chunk),
{ onStreamingOutput: updateStreamingOutput },
);
// Create denial results for auto-denied and update UI
@@ -5175,6 +5263,7 @@ DO NOT respond to these messages or otherwise consider them in your response unl
const autoAllowedWithResults = await executeAutoAllowedTools(
autoAllowed,
(chunk) => onChunk(buffersRef.current, chunk),
{ onStreamingOutput: updateStreamingOutput },
);
// Create denial reasons for auto-denied and update UI
@@ -5380,7 +5469,10 @@ DO NOT respond to these messages or otherwise consider them in your response unl
// Flush UI so completed tools show up while the batch continues
refreshDerived();
},
{ abortSignal: approvalAbortController.signal },
{
abortSignal: approvalAbortController.signal,
onStreamingOutput: updateStreamingOutput,
},
);
// Combine with auto-handled and auto-denied results using snapshots
@@ -5473,6 +5565,7 @@ DO NOT respond to these messages or otherwise consider them in your response unl
refreshDerived,
appendError,
setStreaming,
updateStreamingOutput,
],
);
@@ -5652,6 +5745,7 @@ DO NOT respond to these messages or otherwise consider them in your response unl
onChunk(buffersRef.current, chunk);
refreshDerived();
},
{ onStreamingOutput: updateStreamingOutput },
);
// Combine with auto-handled and auto-denied results (from initial check)
@@ -5704,6 +5798,7 @@ DO NOT respond to these messages or otherwise consider them in your response unl
refreshDerived,
isExecutingTool,
setStreaming,
updateStreamingOutput,
],
);

View File

@@ -1,9 +1,12 @@
import { Box, Text } from "ink";
import { memo } from "react";
import type { StreamingState } from "../helpers/accumulator";
import { useTerminalWidth } from "../hooks/useTerminalWidth";
import { BlinkDot } from "./BlinkDot.js";
import { CollapsedOutputDisplay } from "./CollapsedOutputDisplay";
import { colors } from "./colors.js";
import { MarkdownDisplay } from "./MarkdownDisplay.js";
import { StreamingOutputDisplay } from "./StreamingOutputDisplay";
type BashCommandLine = {
kind: "bash_command";
@@ -12,6 +15,7 @@ type BashCommandLine = {
output: string;
phase?: "running" | "finished";
success?: boolean;
streaming?: StreamingState;
};
/**
@@ -54,8 +58,18 @@ export const BashCommandMessage = memo(
</Box>
</Box>
{/* Command output (if present) */}
{line.output && (
{/* Streaming output during execution */}
{line.phase === "running" && line.streaming && (
<StreamingOutputDisplay streaming={line.streaming} />
)}
{/* Collapsed output after completion */}
{line.phase === "finished" && line.output && (
<CollapsedOutputDisplay output={line.output} />
)}
{/* Fallback: show output when phase is undefined (legacy bash commands before streaming) */}
{!line.phase && line.output && (
<Box flexDirection="row">
<Box width={5} flexShrink={0}>
<Text>{" ⎿ "}</Text>

View File

@@ -0,0 +1,57 @@
import { Box, Text } from "ink";
import { memo } from "react";
// Number of output lines shown before the remainder is hidden behind "+N lines".
const COLLAPSED_LINES = 3;

interface CollapsedOutputDisplayProps {
  output: string; // Full output from completion
}

/**
 * Display component for bash output after completion.
 * Shows first 3 lines with count of hidden lines.
 * Note: expand/collapse (ctrl+o) is deferred to a future PR.
 */
export const CollapsedOutputDisplay = memo(
  ({ output }: CollapsedOutputDisplayProps) => {
    // Keep empty lines for accurate display (don't filter them out)
    const lines = output.split("\n");
    // Remove trailing empty line from final newline
    if (lines.length > 0 && lines[lines.length - 1] === "") {
      lines.pop();
    }
    // Entirely empty output renders nothing
    if (lines.length === 0) {
      return null;
    }
    const visibleLines = lines.slice(0, COLLAPSED_LINES);
    const hiddenCount = Math.max(0, lines.length - COLLAPSED_LINES);
    return (
      <Box flexDirection="column">
        {/* L-bracket on first line - matches ToolCallMessageRich format " ⎿ " */}
        <Box>
          <Text>{" ⎿ "}</Text>
          <Text>{visibleLines[0]}</Text>
        </Box>
        {/* Remaining visible lines with indent (5 spaces to align with content after bracket) */}
        {visibleLines.slice(1).map((line, i) => (
          // biome-ignore lint/suspicious/noArrayIndexKey: Lines are positional output, stable order within render
          <Text key={i}>
            {" "}
            {line}
          </Text>
        ))}
        {/* Hidden count hint */}
        {hiddenCount > 0 && (
          <Text dimColor>
            {" "} +{hiddenCount} lines
          </Text>
        )}
      </Box>
    );
  },
);
CollapsedOutputDisplay.displayName = "CollapsedOutputDisplay";

View File

@@ -0,0 +1,67 @@
import { Box, Text } from "ink";
import { memo, useEffect, useState } from "react";
import type { StreamingState } from "../helpers/accumulator";
interface StreamingOutputDisplayProps {
  streaming: StreamingState;
}

/**
 * Display component for streaming bash output during execution.
 * Shows a rolling window of the last 5 lines with elapsed time.
 */
export const StreamingOutputDisplay = memo(
  ({ streaming }: StreamingOutputDisplayProps) => {
    // Force re-render every second for elapsed timer
    const [, forceUpdate] = useState(0);
    useEffect(() => {
      const interval = setInterval(() => forceUpdate((n) => n + 1), 1000);
      return () => clearInterval(interval);
    }, []);

    // Whole seconds since the command started (shown alongside "+N more")
    const elapsed = Math.floor((Date.now() - streaming.startTime) / 1000);
    const { tailLines, totalLineCount } = streaming;
    // Lines that have already scrolled out of the rolling tail window
    const hiddenCount = Math.max(0, totalLineCount - tailLines.length);

    // No output yet - don't show anything
    const firstLine = tailLines[0];
    if (!firstLine) {
      return null;
    }
    return (
      <Box flexDirection="column">
        {/* L-bracket on first line - matches ToolCallMessageRich format " ⎿ " */}
        <Box>
          <Text dimColor>{" ⎿ "}</Text>
          <Text
            dimColor={!firstLine.isStderr}
            color={firstLine.isStderr ? "red" : undefined}
          >
            {firstLine.text}
          </Text>
        </Box>
        {/* Remaining lines with indent (5 spaces to align with content after bracket) */}
        {tailLines.slice(1).map((line, i) => (
          <Text
            // biome-ignore lint/suspicious/noArrayIndexKey: Lines are positional output, stable order within render
            key={i}
            dimColor={!line.isStderr}
            color={line.isStderr ? "red" : undefined}
          >
            {" "}
            {line.text}
          </Text>
        ))}
        {/* Hidden count + elapsed time */}
        {hiddenCount > 0 && (
          <Text dimColor>
            {" "} +{hiddenCount} more lines ({elapsed}s)
          </Text>
        )}
      </Box>
    );
  },
);
StreamingOutputDisplay.displayName = "StreamingOutputDisplay";

View File

@@ -29,9 +29,11 @@ function isQuestionTool(name: string): boolean {
return name === "AskUserQuestion";
}
import type { StreamingState } from "../helpers/accumulator";
import { useTerminalWidth } from "../hooks/useTerminalWidth";
import { AdvancedDiffRenderer } from "./AdvancedDiffRenderer";
import { BlinkDot } from "./BlinkDot.js";
import { CollapsedOutputDisplay } from "./CollapsedOutputDisplay";
import { colors } from "./colors.js";
import {
EditRenderer,
@@ -41,6 +43,7 @@ import {
import { MarkdownDisplay } from "./MarkdownDisplay.js";
import { MemoryDiffRenderer } from "./MemoryDiffRenderer.js";
import { PlanRenderer } from "./PlanRenderer.js";
import { StreamingOutputDisplay } from "./StreamingOutputDisplay";
import { TodoRenderer } from "./TodoRenderer.js";
type ToolCallLine = {
@@ -52,8 +55,25 @@ type ToolCallLine = {
resultText?: string;
resultOk?: boolean;
phase: "streaming" | "ready" | "running" | "finished";
streaming?: StreamingState;
};
/**
 * True when the named tool is a shell/bash variant whose live output
 * should be streamed into the transcript (matched case-sensitively).
 */
function isShellTool(name: string): boolean {
  const shellToolNames = new Set([
    "Bash",
    "Shell",
    "shell",
    "shell_command",
    "run_shell_command",
    "RunShellCommand",
    "ShellCommand",
  ]);
  return shellToolNames.has(name);
}
/**
* ToolCallMessageRich - Rich formatting version with old layout logic
* This preserves the exact wrapping and spacing logic from the old codebase
@@ -680,8 +700,31 @@ export const ToolCallMessage = memo(
</Box>
</Box>
{/* Tool result (if present) */}
{getResultElement()}
{/* Streaming output for shell tools during execution */}
{isShellTool(rawName) && line.phase === "running" && line.streaming && (
<StreamingOutputDisplay streaming={line.streaming} />
)}
{/* Collapsed output for shell tools after completion */}
{isShellTool(rawName) &&
line.phase === "finished" &&
line.resultText &&
line.resultOk !== false && (
<CollapsedOutputDisplay output={line.resultText} />
)}
{/* Tool result for non-shell tools or shell tool errors */}
{(() => {
// Show default result element when:
// - Not a shell tool (always show result)
// - Shell tool with error (show error message)
// - Shell tool in streaming/ready phase (show default "Running..." etc)
const showDefaultResult =
!isShellTool(rawName) ||
(line.phase === "finished" && line.resultOk === false) ||
(line.phase !== "running" && line.phase !== "finished");
return showDefaultResult ? getResultElement() : null;
})()}
</Box>
);
},

View File

@@ -7,6 +7,93 @@
import type { LettaStreamingResponse } from "@letta-ai/letta-client/resources/agents/messages";
import { INTERRUPTED_BY_USER } from "../../constants";
// Streaming output tuning constants
const MAX_TAIL_LINES = 5;
const MAX_BUFFER_SIZE = 100_000; // 100KB cap on partial-line + chunk accumulation

/**
 * One complete line of streamed output, tagged with its source stream.
 */
export interface StreamingLine {
  text: string;
  isStderr: boolean;
}

/**
 * Rolling-window state for live bash/shell output during execution.
 */
export interface StreamingState {
  tailLines: StreamingLine[]; // most recent complete lines (rolling display)
  partialLine: string; // trailing incomplete line being accumulated
  partialIsStderr: boolean; // stream of the trailing incomplete line
  totalLineCount: number; // complete lines seen so far (for "+N more" count)
  startTime: number; // epoch ms, for elapsed-time display
}

/**
 * Fold a chunk of output into the streaming state, returning a new state.
 *
 * Keeps at most MAX_TAIL_LINES complete lines, carries any trailing
 * incomplete segment forward as the new partial line, and caps the working
 * buffer at MAX_BUFFER_SIZE — truncating at a newline boundary so no
 * corrupted half-line is ever displayed.
 */
export function appendStreamingOutput(
  state: StreamingState | undefined,
  chunk: string,
  startTime: number,
  isStderr = false,
): StreamingState {
  const prev: StreamingState = state ?? {
    tailLines: [],
    partialLine: "",
    partialIsStderr: false,
    totalLineCount: 0,
    startTime,
  };

  const tail = [...prev.tailLines];
  let lineCount = prev.totalLineCount;
  let pending = prev.partialLine;

  // A stream switch (stdout <-> stderr) closes out the pending partial line
  // so that its stderr flag stays accurate.
  if (pending && isStderr !== prev.partialIsStderr) {
    tail.push({ text: pending, isStderr: prev.partialIsStderr });
    lineCount += 1;
    pending = "";
  }

  // Accumulate, then enforce the size cap at a newline boundary.
  let buffer = pending + chunk;
  if (buffer.length > MAX_BUFFER_SIZE) {
    const kept = buffer.slice(-MAX_BUFFER_SIZE);
    const nl = kept.indexOf("\n");
    buffer = nl === -1 ? kept : kept.slice(nl + 1);
  }

  // Everything before the final segment is a complete line; the final
  // (possibly empty) segment rides along as the new partial line.
  const pieces = buffer.split("\n");
  const carry = pieces.pop() ?? "";
  const completed: StreamingLine[] = pieces.map((text) => ({
    text,
    isStderr,
  }));

  return {
    tailLines: [...tail, ...completed].slice(-MAX_TAIL_LINES),
    partialLine: carry,
    partialIsStderr: isStderr,
    totalLineCount: lineCount + pieces.length,
    startTime: prev.startTime,
  };
}
// One line per transcript row. Tool calls evolve in-place.
// For tool call returns, merge into the tool call matching the toolCallId
export type Line =
@@ -36,6 +123,8 @@ export type Line =
resultOk?: boolean;
// state that's useful for rendering
phase: "streaming" | "ready" | "running" | "finished";
// streaming output state (for shell tools during execution)
streaming?: StreamingState;
}
| { kind: "error"; id: string; text: string }
| {
@@ -54,6 +143,8 @@ export type Line =
output: string;
phase?: "running" | "finished";
success?: boolean;
// streaming output state (during execution)
streaming?: StreamingState;
}
| {
kind: "status";

View File

@@ -37,6 +37,7 @@ function spawnWithLauncher(
env: NodeJS.ProcessEnv;
timeout: number;
signal?: AbortSignal;
onOutput?: (chunk: string, stream: "stdout" | "stderr") => void;
},
): Promise<{ stdout: string; stderr: string; exitCode: number | null }> {
return new Promise((resolve, reject) => {
@@ -71,10 +72,12 @@ function spawnWithLauncher(
childProcess.stdout?.on("data", (chunk: Buffer) => {
stdoutChunks.push(chunk);
options.onOutput?.(chunk.toString("utf8"), "stdout");
});
childProcess.stderr?.on("data", (chunk: Buffer) => {
stderrChunks.push(chunk);
options.onOutput?.(chunk.toString("utf8"), "stderr");
});
childProcess.on("error", (err) => {
@@ -137,6 +140,7 @@ export async function spawnCommand(
env: NodeJS.ProcessEnv;
timeout: number;
signal?: AbortSignal;
onOutput?: (chunk: string, stream: "stdout" | "stderr") => void;
},
): Promise<{ stdout: string; stderr: string; exitCode: number | null }> {
// On Unix (Linux/macOS), use simple bash -c approach (original behavior)
@@ -200,6 +204,7 @@ interface BashArgs {
description?: string;
run_in_background?: boolean;
signal?: AbortSignal;
onOutput?: (chunk: string, stream: "stdout" | "stderr") => void;
}
interface BashResult {
@@ -218,6 +223,7 @@ export async function bash(args: BashArgs): Promise<BashResult> {
description: _description,
run_in_background = false,
signal,
onOutput,
} = args;
const userCwd = process.env.USER_CWD || process.cwd();
@@ -314,6 +320,7 @@ export async function bash(args: BashArgs): Promise<BashResult> {
env: getShellEnv(),
timeout: effectiveTimeout,
signal,
onOutput,
});
let output = stdout;

View File

@@ -87,5 +87,15 @@ export function getShellEnv(): NodeJS.ProcessEnv {
: nodeModulesDir;
}
// Disable interactive pagers (fixes git log, man, etc. hanging)
env.PAGER = "cat";
env.GIT_PAGER = "cat";
env.MANPAGER = "cat";
// Ensure TERM is set for proper color support
if (!env.TERM) {
env.TERM = "xterm-256color";
}
return env;
}

View File

@@ -686,13 +686,17 @@ function flattenToolResponse(result: unknown): string {
*
* @param name - The name of the tool to execute
* @param args - Arguments object to pass to the tool
* @param options - Optional execution options (abort signal, tool call ID)
* @param options - Optional execution options (abort signal, tool call ID, streaming callback)
* @returns Promise with the tool's execution result including status and optional stdout/stderr
*/
export async function executeTool(
name: string,
args: ToolArgs,
options?: { signal?: AbortSignal; toolCallId?: string },
options?: {
signal?: AbortSignal;
toolCallId?: string;
onOutput?: (chunk: string, stream: "stdout" | "stderr") => void;
},
): Promise<ToolExecutionResult> {
const internalName = resolveInternalToolName(name);
if (!internalName) {
@@ -716,9 +720,14 @@ export async function executeTool(
// Inject options for tools that support them without altering schemas
let enhancedArgs = args;
// Inject abort signal for Bash tool
if (internalName === "Bash" && options?.signal) {
enhancedArgs = { ...enhancedArgs, signal: options.signal };
// Inject abort signal and streaming callback for Bash tool
if (internalName === "Bash") {
if (options?.signal) {
enhancedArgs = { ...enhancedArgs, signal: options.signal };
}
if (options?.onOutput) {
enhancedArgs = { ...enhancedArgs, onOutput: options.onOutput };
}
}
// Inject toolCallId and abort signal for Task tool