diff --git a/src/cli/components/ToolCallMessageRich.tsx b/src/cli/components/ToolCallMessageRich.tsx index 1dd55c6..e00f137 100644 --- a/src/cli/components/ToolCallMessageRich.tsx +++ b/src/cli/components/ToolCallMessageRich.tsx @@ -21,7 +21,7 @@ import { isPatchTool, isPlanTool, isSearchTool, - isShellTool, + isShellOutputTool, isTaskTool, isTodoTool, } from "../helpers/toolNameMapping.js"; @@ -192,7 +192,12 @@ export const ToolCallMessage = memo( const truncatedDisplay = needsTruncation ? `${normalizedDisplay.slice(0, maxArgsChars - 1)}…` : normalizedDisplay; - args = `(${truncatedDisplay})`; + if (rawName.toLowerCase() === "taskoutput") { + const separator = truncatedDisplay.startsWith("(") ? "" : " "; + args = separator + truncatedDisplay; + } else { + args = `(${truncatedDisplay})`; + } } } @@ -850,16 +855,21 @@ export const ToolCallMessage = memo( {/* Streaming output for shell tools during execution */} - {isShellTool(rawName) && line.phase === "running" && line.streaming && ( - - )} + {isShellOutputTool(rawName) && + line.phase === "running" && + line.streaming && ( + + )} {/* Collapsed output for shell tools after completion */} - {isShellTool(rawName) && + {isShellOutputTool(rawName) && line.phase === "finished" && line.resultText && line.resultOk !== false && ( - + )} {/* Tool result for non-shell tools or shell tool errors */} @@ -869,7 +879,7 @@ export const ToolCallMessage = memo( // - Shell tool with error (show error message) // - Shell tool in streaming/ready phase (show default "Running..." etc) const showDefaultResult = - !isShellTool(rawName) || + !isShellOutputTool(rawName) || (line.phase === "finished" && line.resultOk === false) || (line.phase !== "running" && line.phase !== "finished"); return showDefaultResult ? getResultElement() : null; diff --git a/src/cli/helpers/accumulator.ts b/src/cli/helpers/accumulator.ts index f005253..93f056b 100644 --- a/src/cli/helpers/accumulator.ts +++ b/src/cli/helpers/accumulator.ts @@ -16,7 +16,7 @@ import { extractCompactionSummary } from "./backfill"; import type { ContextTracker } from "./contextTracker"; import { MAX_CONTEXT_HISTORY } from "./contextTracker"; import { findLastSafeSplitPoint } from "./markdownSplit"; -import { isShellTool } from "./toolNameMapping"; +import { isShellOutputTool } from "./toolNameMapping"; type CompactionSummaryMessageChunk = { message_type: "summary_message"; @@ -1207,7 +1207,7 @@ export function setToolCallsRunning(b: Buffers, toolCallIds: string[]): void { const line = b.byId.get(lineId); if (line && line.kind === "tool_call") { const shouldSeedStreaming = - line.name && isShellTool(line.name) && !line.streaming; + line.name && isShellOutputTool(line.name) && !line.streaming; b.byId.set(lineId, { ...line, phase: "running", diff --git a/src/cli/helpers/formatArgsDisplay.ts b/src/cli/helpers/formatArgsDisplay.ts index a112425..79f6996 100644 --- a/src/cli/helpers/formatArgsDisplay.ts +++ b/src/cli/helpers/formatArgsDisplay.ts @@ -272,6 +272,14 @@ export function formatArgsDisplay( return { display, parsed }; } + // TaskOutput: show task id with optional non-blocking marker + if (toolName.toLowerCase() === "taskoutput" && parsed.task_id) { + const taskId = String(parsed.task_id); + const isNonBlocking = parsed.block === false; + display = isNonBlocking ? `(non-blocking) ${taskId}` : taskId; + return { display, parsed }; + } + // Shell/Bash tools: show just the command if (isShellTool(toolName) && parsed.command) { // Handle both string and array command formats diff --git a/src/cli/helpers/toolNameMapping.ts b/src/cli/helpers/toolNameMapping.ts index 82f0fce..29769ac 100644 --- a/src/cli/helpers/toolNameMapping.ts +++ b/src/cli/helpers/toolNameMapping.ts @@ -68,9 +68,8 @@ export function getDisplayToolName(rawName: string): string { if (rawName === "Replace" || rawName === "replace") return "Update"; if (rawName === "WriteFile" || rawName === "write_file") return "Write"; if (rawName === "KillBash") return "Kill Bash"; - if (rawName === "BashOutput" || rawName === "TaskOutput") { - return "Shell Output"; - } + if (rawName === "BashOutput") return "Shell Output"; + if (rawName === "TaskOutput") return "Task Output"; if (rawName === "MultiEdit") return "Update"; // No mapping found, return as-is @@ -215,6 +214,15 @@ export function isShellTool(name: string): boolean { ); } +/** + * Checks if a tool should use shell-style streaming output rendering. + * Includes shell command tools plus TaskOutput/BashOutput pollers. + */ +export function isShellOutputTool(name: string): boolean { + const n = name.toLowerCase(); + return isShellTool(name) || n === "taskoutput" || n === "bashoutput"; +} + /** * Checks if a tool is a search/grep tool */ diff --git a/src/tests/cli/toolNameMapping.test.ts b/src/tests/cli/toolNameMapping.test.ts index 014462a..9827d70 100644 --- a/src/tests/cli/toolNameMapping.test.ts +++ b/src/tests/cli/toolNameMapping.test.ts @@ -1,5 +1,9 @@ import { describe, expect, test } from "bun:test"; -import { isMemoryTool } from "../../cli/helpers/toolNameMapping"; +import { + getDisplayToolName, + isMemoryTool, + isShellOutputTool, +} from "../../cli/helpers/toolNameMapping"; describe("toolNameMapping.isMemoryTool", () => { test("recognizes all supported memory tool names", () => { @@ -15,3 +19,16 @@ describe("toolNameMapping.isMemoryTool", () => { expect(isMemoryTool("web_search")).toBe(false); }); }); + +describe("toolNameMapping task output mappings", () => { + test("uses distinct display labels for shell output and task output", () => { + expect(getDisplayToolName("BashOutput")).toBe("Shell Output"); + expect(getDisplayToolName("TaskOutput")).toBe("Task Output"); + }); + + test("treats TaskOutput as shell-style output for streaming UI", () => { + expect(isShellOutputTool("TaskOutput")).toBe(true); + expect(isShellOutputTool("BashOutput")).toBe(true); + expect(isShellOutputTool("Task")).toBe(false); + }); +}); diff --git a/src/tests/tools/task-output.test.ts b/src/tests/tools/task-output.test.ts index f0f01eb..875c2a0 100644 --- a/src/tests/tools/task-output.test.ts +++ b/src/tests/tools/task-output.test.ts @@ -31,6 +31,7 @@ describe.skipIf(isWindows)("TaskOutput and TaskStop", () => { // Should return in less than 500ms (not waiting for 2s sleep) expect(elapsed).toBeLessThan(500); expect(result.status).toBe("running"); + expect(result.message).toContain("Task is still running"); // Cleanup await task_stop({ task_id: taskId }); @@ -60,6 +61,31 @@ describe.skipIf(isWindows)("TaskOutput and TaskStop", () => { expect(result.status).toBe("completed"); }); + test("TaskOutput with block=true streams output chunks", async () => { + const startResult = await bash({ + command: "sleep 0.2 && echo 'first' && sleep 0.2 && echo 'second'", + description: "Streaming process", + run_in_background: true, + }); + + const match = startResult.content[0]?.text.match(/bash_(\d+)/); + expect(match).toBeDefined(); + const taskId = `bash_${match?.[1]}`; + + const outputChunks: string[] = []; + const result = await task_output({ + task_id: taskId, + block: true, + timeout: 5000, + onOutput: (chunk) => outputChunks.push(chunk), + }); + + const streamed = outputChunks.join(""); + expect(streamed).toContain("first"); + expect(streamed).toContain("second"); + expect(result.status).toBe("completed"); + }); + test("TaskOutput respects timeout when blocking", async () => { // Start a long-running process const startResult = await bash({ diff --git a/src/tools/impl/BashOutput.ts b/src/tools/impl/BashOutput.ts index 7d4938e..7812c7e 100644 --- a/src/tools/impl/BashOutput.ts +++ b/src/tools/impl/BashOutput.ts @@ -7,6 +7,8 @@ interface GetTaskOutputArgs { block?: boolean; timeout?: number; filter?: string; + onOutput?: (chunk: string, stream: "stdout" | "stderr") => void; + runningMessageWhenNonBlocking?: boolean; } interface GetTaskOutputResult { @@ -14,6 +16,74 @@ interface GetTaskOutputResult { status?: "running" | "completed" | "failed"; } +const POLL_INTERVAL_MS = 100; + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +function emitNewProcessOutput( + proc: typeof backgroundProcesses extends Map ? V : never, + onOutput: (chunk: string, stream: "stdout" | "stderr") => void, + indexes: { stdout: number; stderr: number }, + filter?: string, +): { stdout: number; stderr: number } { + const next = { ...indexes }; + + if (proc.stdout.length > next.stdout) { + const newStdoutLines = proc.stdout.slice(next.stdout); + const filtered = filter + ? newStdoutLines.filter((line) => line.includes(filter)) + : newStdoutLines; + if (filtered.length > 0) { + onOutput(`${filtered.join("\n")}\n`, "stdout"); + } + next.stdout = proc.stdout.length; + } + + if (proc.stderr.length > next.stderr) { + const newStderrLines = proc.stderr.slice(next.stderr); + const filtered = filter + ? newStderrLines.filter((line) => line.includes(filter)) + : newStderrLines; + if (filtered.length > 0) { + onOutput(`${filtered.join("\n")}\n`, "stderr"); + } + next.stderr = proc.stderr.length; + } + + return next; +} + +function emitNewBackgroundTaskOutput( + task: typeof backgroundTasks extends Map ? V : never, + onOutput: (chunk: string, stream: "stdout" | "stderr") => void, + cursor: { outputIndex: number; emittedError?: string }, + filter?: string, +): { outputIndex: number; emittedError?: string } { + const next = { ...cursor }; + + if (task.output.length > next.outputIndex) { + const newOutputLines = task.output.slice(next.outputIndex); + const filtered = filter + ? newOutputLines.filter((line) => line.includes(filter)) + : newOutputLines; + if (filtered.length > 0) { + onOutput(`${filtered.join("\n")}\n`, "stdout"); + } + next.outputIndex = task.output.length; + } + + if (task.error && task.error !== next.emittedError) { + if (!filter || task.error.includes(filter)) { + onOutput(`[error] ${task.error}\n`, "stderr"); + } + next.emittedError = task.error; + } + + return next; +} + /** * Core implementation for retrieving task/process output. * Used by both BashOutput (legacy) and TaskOutput (new). @@ -22,18 +92,41 @@ interface GetTaskOutputResult { export async function getTaskOutput( args: GetTaskOutputArgs, ): Promise { - const { task_id, block = false, timeout = 30000, filter } = args; + const { + task_id, + block = false, + timeout = 30000, + filter, + onOutput, + runningMessageWhenNonBlocking = false, + } = args; // Check backgroundProcesses first (for Bash background commands) const proc = backgroundProcesses.get(task_id); if (proc) { - return getProcessOutput(task_id, proc, block, timeout, filter); + return getProcessOutput( + task_id, + proc, + block, + timeout, + filter, + onOutput, + runningMessageWhenNonBlocking, + ); } // Check backgroundTasks (for Task background subagents) const task = backgroundTasks.get(task_id); if (task) { - return getBackgroundTaskOutput(task_id, task, block, timeout, filter); + return getBackgroundTaskOutput( + task_id, + task, + block, + timeout, + filter, + onOutput, + runningMessageWhenNonBlocking, + ); } return { message: `No background process found with ID: ${task_id}` }; @@ -48,22 +141,37 @@ async function getProcessOutput( block: boolean, timeout: number, filter?: string, + onOutput?: (chunk: string, stream: "stdout" | "stderr") => void, + runningMessageWhenNonBlocking?: boolean, ): Promise { - // If blocking, wait for process to complete (or timeout) + // If blocking, wait for process to complete (or timeout) while streaming deltas. if (block && proc.status === "running") { const startTime = Date.now(); - await new Promise((resolve) => { - const checkInterval = setInterval(() => { - const currentProc = backgroundProcesses.get(task_id); - if (!currentProc || currentProc.status !== "running") { - clearInterval(checkInterval); - resolve(); - } else if (Date.now() - startTime >= timeout) { - clearInterval(checkInterval); - resolve(); - } - }, 100); // Check every 100ms - }); + let cursor = { stdout: 0, stderr: 0 }; + + if (onOutput) { + cursor = emitNewProcessOutput(proc, onOutput, cursor, filter); + } + + while (Date.now() - startTime < timeout) { + const currentProc = backgroundProcesses.get(task_id); + if (!currentProc) break; + + if (onOutput) { + cursor = emitNewProcessOutput(currentProc, onOutput, cursor, filter); + } + + if (currentProc.status !== "running") { + break; + } + + await sleep(POLL_INTERVAL_MS); + } + + const finalProc = backgroundProcesses.get(task_id); + if (finalProc && onOutput) { + emitNewProcessOutput(finalProc, onOutput, cursor, filter); + } } // Re-fetch in case status changed while waiting @@ -72,6 +180,14 @@ async function getProcessOutput( return { message: `Process ${task_id} no longer exists` }; } + if ( + !block && + runningMessageWhenNonBlocking && + currentProc.status === "running" + ) { + return { message: "Task is still running...", status: "running" }; + } + const stdout = currentProc.stdout.join("\n"); const stderr = currentProc.stderr.join("\n"); let text = stdout; @@ -109,22 +225,44 @@ async function getBackgroundTaskOutput( block: boolean, timeout: number, filter?: string, + onOutput?: (chunk: string, stream: "stdout" | "stderr") => void, + runningMessageWhenNonBlocking?: boolean, ): Promise { - // If blocking, wait for task to complete (or timeout) + // If blocking, wait for task to complete (or timeout) while streaming deltas. if (block && task.status === "running") { const startTime = Date.now(); - await new Promise((resolve) => { - const checkInterval = setInterval(() => { - const currentTask = backgroundTasks.get(task_id); - if (!currentTask || currentTask.status !== "running") { - clearInterval(checkInterval); - resolve(); - } else if (Date.now() - startTime >= timeout) { - clearInterval(checkInterval); - resolve(); - } - }, 100); // Check every 100ms - }); + let cursor: { outputIndex: number; emittedError?: string } = { + outputIndex: 0, + }; + + if (onOutput) { + cursor = emitNewBackgroundTaskOutput(task, onOutput, cursor, filter); + } + + while (Date.now() - startTime < timeout) { + const currentTask = backgroundTasks.get(task_id); + if (!currentTask) break; + + if (onOutput) { + cursor = emitNewBackgroundTaskOutput( + currentTask, + onOutput, + cursor, + filter, + ); + } + + if (currentTask.status !== "running") { + break; + } + + await sleep(POLL_INTERVAL_MS); + } + + const finalTask = backgroundTasks.get(task_id); + if (finalTask && onOutput) { + emitNewBackgroundTaskOutput(finalTask, onOutput, cursor, filter); + } } // Re-fetch in case status changed while waiting @@ -133,6 +271,14 @@ async function getBackgroundTaskOutput( return { message: `Task ${task_id} no longer exists` }; } + if ( + !block && + runningMessageWhenNonBlocking && + currentTask.status === "running" + ) { + return { message: "Task is still running...", status: "running" }; + } + let text = currentTask.output.join("\n"); if (currentTask.error) { text = text diff --git a/src/tools/impl/TaskOutput.ts b/src/tools/impl/TaskOutput.ts index 498fc81..905fd25 100644 --- a/src/tools/impl/TaskOutput.ts +++ b/src/tools/impl/TaskOutput.ts @@ -5,6 +5,7 @@ interface TaskOutputArgs { task_id: string; block?: boolean; timeout?: number; + onOutput?: (chunk: string, stream: "stdout" | "stderr") => void; } interface TaskOutputResult { @@ -20,11 +21,13 @@ export async function task_output( args: TaskOutputArgs, ): Promise { validateRequiredParams(args, ["task_id"], "TaskOutput"); - const { task_id, block = true, timeout = 30000 } = args; + const { task_id, block = true, timeout = 30000, onOutput } = args; return getTaskOutput({ task_id, block, timeout, + onOutput, + runningMessageWhenNonBlocking: true, }); } diff --git a/src/tools/manager.ts b/src/tools/manager.ts index 7d149cb..52d4812 100644 --- a/src/tools/manager.ts +++ b/src/tools/manager.ts @@ -15,6 +15,8 @@ import { TOOL_DEFINITIONS, type ToolName } from "./toolDefinitions"; export const TOOL_NAMES = Object.keys(TOOL_DEFINITIONS) as ToolName[]; const STREAMING_SHELL_TOOLS = new Set([ "Bash", + "BashOutput", + "TaskOutput", "shell_command", "ShellCommand", "shell", @@ -110,6 +112,8 @@ export const OPENAI_PASCAL_TOOLS: ToolName[] = [ "EnterPlanMode", "ExitPlanMode", "Task", + "TaskOutput", + "TaskStop", "Skill", // Standard Codex tools "ShellCommand",