feat: add background task notification system (#827)

Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
Charles Packer
2026-02-04 22:45:16 -08:00
committed by GitHub
parent 84e9a6d744
commit 48ccd8f220
44 changed files with 2244 additions and 234 deletions

View File

@@ -1,6 +1,11 @@
import { spawn } from "node:child_process";
import { INTERRUPTED_BY_USER } from "../../constants";
import { backgroundProcesses, getNextBashId } from "./process_manager.js";
import {
appendToOutputFile,
backgroundProcesses,
createBackgroundOutputFile,
getNextBashId,
} from "./process_manager.js";
import { getShellEnv } from "./shellEnv.js";
import { buildShellLaunchers } from "./shellLaunchers.js";
import { spawnWithLauncher } from "./shellRunner.js";
@@ -168,6 +173,7 @@ export async function bash(args: BashArgs): Promise<BashResult> {
if (run_in_background) {
const bashId = getNextBashId();
const outputFile = createBackgroundOutputFile(bashId);
const launcher = getBackgroundLauncher(command);
const [executable, ...launcherArgs] = launcher;
if (!executable) {
@@ -190,26 +196,35 @@ export async function bash(args: BashArgs): Promise<BashResult> {
exitCode: null,
lastReadIndex: { stdout: 0, stderr: 0 },
startTime: new Date(),
outputFile,
});
const bgProcess = backgroundProcesses.get(bashId);
if (!bgProcess) {
throw new Error("Failed to track background process state");
}
childProcess.stdout?.on("data", (data: Buffer) => {
const lines = data.toString().split("\n").filter(Boolean);
const text = data.toString();
const lines = text.split("\n").filter(Boolean);
bgProcess.stdout.push(...lines);
// Also write to output file
appendToOutputFile(outputFile, text);
});
childProcess.stderr?.on("data", (data: Buffer) => {
const lines = data.toString().split("\n").filter(Boolean);
const text = data.toString();
const lines = text.split("\n").filter(Boolean);
bgProcess.stderr.push(...lines);
// Also write to output file (prefixed with [stderr])
appendToOutputFile(outputFile, `[stderr] ${text}`);
});
childProcess.on("exit", (code: number | null) => {
bgProcess.status = code === 0 ? "completed" : "failed";
bgProcess.exitCode = code;
appendToOutputFile(outputFile, `\n[exit code: ${code}]\n`);
});
childProcess.on("error", (err: Error) => {
bgProcess.status = "failed";
bgProcess.stderr.push(err.message);
appendToOutputFile(outputFile, `\n[error] ${err.message}\n`);
});
if (timeout && timeout > 0) {
setTimeout(() => {
@@ -217,6 +232,7 @@ export async function bash(args: BashArgs): Promise<BashResult> {
childProcess.kill("SIGTERM");
bgProcess.status = "failed";
bgProcess.stderr.push(`Command timed out after ${timeout}ms`);
appendToOutputFile(outputFile, `\n[timeout after ${timeout}ms]\n`);
}
}, timeout);
}
@@ -224,7 +240,7 @@ export async function bash(args: BashArgs): Promise<BashResult> {
content: [
{
type: "text",
text: `Command running in background with ID: ${bashId}`,
text: `Command running in background with ID: ${bashId}\nOutput file: ${outputFile}`,
},
],
status: "success",

View File

@@ -1,27 +1,82 @@
import { backgroundProcesses } from "./process_manager.js";
import { backgroundProcesses, backgroundTasks } from "./process_manager.js";
import { LIMITS, truncateByChars } from "./truncation.js";
import { validateRequiredParams } from "./validation.js";
interface BashOutputArgs {
shell_id: string;
interface GetTaskOutputArgs {
task_id: string;
block?: boolean;
timeout?: number;
filter?: string;
}
interface BashOutputResult {
interface GetTaskOutputResult {
message: string;
status?: "running" | "completed" | "failed";
}
export async function bash_output(
args: BashOutputArgs,
): Promise<BashOutputResult> {
validateRequiredParams(args, ["shell_id"], "BashOutput");
const { shell_id, filter } = args;
const proc = backgroundProcesses.get(shell_id);
if (!proc)
return { message: `No background process found with ID: ${shell_id}` };
const stdout = proc.stdout.join("\n");
const stderr = proc.stderr.join("\n");
/**
* Core implementation for retrieving task/process output.
* Used by both BashOutput (legacy) and TaskOutput (new).
* Checks both backgroundProcesses (Bash) and backgroundTasks (Task).
*/
export async function getTaskOutput(
args: GetTaskOutputArgs,
): Promise<GetTaskOutputResult> {
const { task_id, block = false, timeout = 30000, filter } = args;
// Check backgroundProcesses first (for Bash background commands)
const proc = backgroundProcesses.get(task_id);
if (proc) {
return getProcessOutput(task_id, proc, block, timeout, filter);
}
// Check backgroundTasks (for Task background subagents)
const task = backgroundTasks.get(task_id);
if (task) {
return getBackgroundTaskOutput(task_id, task, block, timeout, filter);
}
return { message: `No background process found with ID: ${task_id}` };
}
/**
* Get output from a background Bash process.
*/
async function getProcessOutput(
task_id: string,
proc: typeof backgroundProcesses extends Map<string, infer V> ? V : never,
block: boolean,
timeout: number,
filter?: string,
): Promise<GetTaskOutputResult> {
// If blocking, wait for process to complete (or timeout)
if (block && proc.status === "running") {
const startTime = Date.now();
await new Promise<void>((resolve) => {
const checkInterval = setInterval(() => {
const currentProc = backgroundProcesses.get(task_id);
if (!currentProc || currentProc.status !== "running") {
clearInterval(checkInterval);
resolve();
} else if (Date.now() - startTime >= timeout) {
clearInterval(checkInterval);
resolve();
}
}, 100); // Check every 100ms
});
}
// Re-fetch in case status changed while waiting
const currentProc = backgroundProcesses.get(task_id);
if (!currentProc) {
return { message: `Process ${task_id} no longer exists` };
}
const stdout = currentProc.stdout.join("\n");
const stderr = currentProc.stderr.join("\n");
let text = stdout;
if (stderr) text = text ? `${text}\n${stderr}` : stderr;
if (filter) {
text = text
.split("\n")
@@ -31,13 +86,107 @@ export async function bash_output(
const userCwd = process.env.USER_CWD || process.cwd();
// Apply character limit to prevent excessive token usage (same as Bash)
// Apply character limit to prevent excessive token usage
const { content: truncatedOutput } = truncateByChars(
text || "(no output yet)",
LIMITS.BASH_OUTPUT_CHARS,
"BashOutput",
{ workingDirectory: userCwd, toolName: "BashOutput" },
"TaskOutput",
{ workingDirectory: userCwd, toolName: "TaskOutput" },
);
return { message: truncatedOutput };
return {
message: truncatedOutput,
status: currentProc.status,
};
}
/**
* Get output from a background Task (subagent).
*/
async function getBackgroundTaskOutput(
task_id: string,
task: typeof backgroundTasks extends Map<string, infer V> ? V : never,
block: boolean,
timeout: number,
filter?: string,
): Promise<GetTaskOutputResult> {
// If blocking, wait for task to complete (or timeout)
if (block && task.status === "running") {
const startTime = Date.now();
await new Promise<void>((resolve) => {
const checkInterval = setInterval(() => {
const currentTask = backgroundTasks.get(task_id);
if (!currentTask || currentTask.status !== "running") {
clearInterval(checkInterval);
resolve();
} else if (Date.now() - startTime >= timeout) {
clearInterval(checkInterval);
resolve();
}
}, 100); // Check every 100ms
});
}
// Re-fetch in case status changed while waiting
const currentTask = backgroundTasks.get(task_id);
if (!currentTask) {
return { message: `Task ${task_id} no longer exists` };
}
let text = currentTask.output.join("\n");
if (currentTask.error) {
text = text
? `${text}\n[error] ${currentTask.error}`
: `[error] ${currentTask.error}`;
}
if (filter) {
text = text
.split("\n")
.filter((line) => line.includes(filter))
.join("\n");
}
const userCwd = process.env.USER_CWD || process.cwd();
// Apply character limit to prevent excessive token usage
const { content: truncatedOutput } = truncateByChars(
text || "(no output yet)",
LIMITS.TASK_OUTPUT_CHARS,
"TaskOutput",
{ workingDirectory: userCwd, toolName: "TaskOutput" },
);
return {
message: truncatedOutput,
status: currentTask.status,
};
}
// Legacy BashOutput interface
interface BashOutputArgs {
shell_id: string;
filter?: string;
}
interface BashOutputResult {
message: string;
}
/**
* Legacy BashOutput function - wraps getTaskOutput with non-blocking behavior.
*/
export async function bash_output(
args: BashOutputArgs,
): Promise<BashOutputResult> {
validateRequiredParams(args, ["shell_id"], "BashOutput");
const { shell_id, filter } = args;
const result = await getTaskOutput({
task_id: shell_id,
block: false, // BashOutput is always non-blocking (legacy behavior)
filter,
});
return { message: result.message };
}

View File

@@ -11,12 +11,22 @@ import {
getAllSubagentConfigs,
} from "../../agent/subagents";
import { spawnSubagent } from "../../agent/subagents/manager";
import { addToMessageQueue } from "../../cli/helpers/messageQueueBridge.js";
import {
completeSubagent,
generateSubagentId,
getSnapshot as getSubagentSnapshot,
registerSubagent,
} from "../../cli/helpers/subagentState.js";
import { formatTaskNotification } from "../../cli/helpers/taskNotifications.js";
import { runSubagentStopHooks } from "../../hooks";
import {
appendToOutputFile,
type BackgroundTask,
backgroundTasks,
createBackgroundOutputFile,
getNextTaskId,
} from "./process_manager.js";
import { LIMITS, truncateByChars } from "./truncation.js";
import { validateRequiredParams } from "./validation";
@@ -28,6 +38,8 @@ interface TaskArgs {
model?: string;
agent_id?: string; // Deploy an existing agent instead of creating new
conversation_id?: string; // Resume from an existing conversation
run_in_background?: boolean; // Run the task in background
max_turns?: number; // Maximum number of agentic turns
toolCallId?: string; // Injected by executeTool for linking subagent to parent tool call
signal?: AbortSignal; // Injected by executeTool for interruption handling
}
@@ -108,7 +120,184 @@ export async function task(args: TaskArgs): Promise<string> {
// Register subagent with state store for UI display
const subagentId = generateSubagentId();
registerSubagent(subagentId, subagent_type, description, toolCallId);
const isBackground = args.run_in_background ?? false;
registerSubagent(
subagentId,
subagent_type,
description,
toolCallId,
isBackground,
);
// Handle background execution
if (isBackground) {
const taskId = getNextTaskId();
const outputFile = createBackgroundOutputFile(taskId);
// Create abort controller for potential cancellation
const abortController = new AbortController();
// Register background task
const bgTask: BackgroundTask = {
description,
subagentType: subagent_type,
subagentId,
status: "running",
output: [],
startTime: new Date(),
outputFile,
abortController,
};
backgroundTasks.set(taskId, bgTask);
// Write initial status to output file
appendToOutputFile(
outputFile,
`[Task started: ${description}]\n[subagent_type: ${subagent_type}]\n\n`,
);
// Fire-and-forget: run subagent without awaiting
spawnSubagent(
subagent_type,
prompt,
model,
subagentId,
abortController.signal,
args.agent_id,
args.conversation_id,
args.max_turns,
)
.then((result) => {
// Update background task state
bgTask.status = result.success ? "completed" : "failed";
if (result.error) {
bgTask.error = result.error;
}
// Build output header
const header = [
`subagent_type=${subagent_type}`,
result.agentId ? `agent_id=${result.agentId}` : undefined,
result.conversationId
? `conversation_id=${result.conversationId}`
: undefined,
]
.filter(Boolean)
.join(" ");
// Write result to output file
if (result.success) {
appendToOutputFile(outputFile, `${header}\n\n${result.report}\n`);
bgTask.output.push(result.report || "");
} else {
appendToOutputFile(
outputFile,
`[error] ${result.error || "Subagent execution failed"}\n`,
);
}
appendToOutputFile(
outputFile,
`\n[Task ${result.success ? "completed" : "failed"}]\n`,
);
// Mark subagent as completed in state store
completeSubagent(subagentId, {
success: result.success,
error: result.error,
totalTokens: result.totalTokens,
});
const subagentSnapshot = getSubagentSnapshot();
const toolUses = subagentSnapshot.agents.find(
(agent) => agent.id === subagentId,
)?.toolCalls.length;
const durationMs = Math.max(0, Date.now() - bgTask.startTime.getTime());
// Build and truncate the result (same as foreground path)
const fullResult = result.success
? `${header}\n\n${result.report || ""}`
: result.error || "Subagent execution failed";
const userCwd = process.env.USER_CWD || process.cwd();
const { content: truncatedResult } = truncateByChars(
fullResult,
LIMITS.TASK_OUTPUT_CHARS,
"Task",
{ workingDirectory: userCwd, toolName: "Task" },
);
// Format and queue notification for auto-firing when idle
const notificationXml = formatTaskNotification({
taskId,
status: result.success ? "completed" : "failed",
summary: `Agent "${description}" ${result.success ? "completed" : "failed"}`,
result: truncatedResult,
outputFile,
usage: {
totalTokens: result.totalTokens,
toolUses,
durationMs,
},
});
addToMessageQueue({ kind: "task_notification", text: notificationXml });
// Run SubagentStop hooks (fire-and-forget)
runSubagentStopHooks(
subagent_type,
subagentId,
result.success,
result.error,
result.agentId,
result.conversationId,
).catch(() => {
// Silently ignore hook errors
});
})
.catch((error) => {
const errorMessage =
error instanceof Error ? error.message : String(error);
bgTask.status = "failed";
bgTask.error = errorMessage;
appendToOutputFile(outputFile, `[error] ${errorMessage}\n`);
// Mark subagent as completed with error
completeSubagent(subagentId, { success: false, error: errorMessage });
const subagentSnapshot = getSubagentSnapshot();
const toolUses = subagentSnapshot.agents.find(
(agent) => agent.id === subagentId,
)?.toolCalls.length;
const durationMs = Math.max(0, Date.now() - bgTask.startTime.getTime());
// Format and queue notification for auto-firing when idle
const notificationXml = formatTaskNotification({
taskId,
status: "failed",
summary: `Agent "${description}" failed`,
result: errorMessage,
outputFile,
usage: {
toolUses,
durationMs,
},
});
addToMessageQueue({ kind: "task_notification", text: notificationXml });
// Run SubagentStop hooks for error case
runSubagentStopHooks(
subagent_type,
subagentId,
false,
errorMessage,
args.agent_id,
args.conversation_id,
).catch(() => {
// Silently ignore hook errors
});
});
// Return immediately with task ID and output file
return `Task running in background with ID: ${taskId}\nOutput file: ${outputFile}`;
}
try {
const result = await spawnSubagent(
@@ -119,6 +308,7 @@ export async function task(args: TaskArgs): Promise<string> {
signal,
args.agent_id,
args.conversation_id,
args.max_turns,
);
// Mark subagent as completed in state store

View File

@@ -0,0 +1,30 @@
import { getTaskOutput } from "./BashOutput.js";
import { validateRequiredParams } from "./validation.js";
interface TaskOutputArgs {
task_id: string;
block?: boolean;
timeout?: number;
}
interface TaskOutputResult {
message: string;
status?: "running" | "completed" | "failed";
}
/**
* TaskOutput - retrieves output from a running or completed background task.
* Supports blocking (wait for completion) and timeout.
*/
export async function task_output(
args: TaskOutputArgs,
): Promise<TaskOutputResult> {
validateRequiredParams(args, ["task_id"], "TaskOutput");
const { task_id, block = true, timeout = 30000 } = args;
return getTaskOutput({
task_id,
block,
timeout,
});
}

View File

@@ -0,0 +1,37 @@
import { kill_bash } from "./KillBash.js";
import { backgroundTasks } from "./process_manager.js";
import { validateRequiredParams } from "./validation.js";
interface TaskStopArgs {
task_id?: string;
shell_id?: string; // deprecated, for backwards compatibility
}
interface TaskStopResult {
killed: boolean;
}
export async function task_stop(args: TaskStopArgs): Promise<TaskStopResult> {
// Support both task_id and deprecated shell_id
let id = args.task_id ?? args.shell_id;
if (!id) {
validateRequiredParams(args, ["task_id"], "TaskStop");
id = ""; // unreachable, validateRequiredParams throws
}
// Check if this is a background Task (subagent)
const task = backgroundTasks.get(id);
if (task) {
if (task.status === "running" && task.abortController) {
task.abortController.abort();
task.status = "failed";
task.error = "Aborted by user";
return { killed: true };
}
// Task exists but isn't running or doesn't have abort controller
return { killed: false };
}
// Fall back to killing a Bash background process
return kill_bash({ shell_id: id });
}

View File

@@ -7,8 +7,65 @@ export interface BackgroundProcess {
exitCode: number | null;
lastReadIndex: { stdout: number; stderr: number };
startTime?: Date;
outputFile?: string; // File path for persistent output
}
export interface BackgroundTask {
description: string;
subagentType: string;
subagentId: string;
status: "running" | "completed" | "failed";
output: string[];
error?: string;
startTime: Date;
outputFile: string;
abortController?: AbortController;
}
export const backgroundProcesses = new Map<string, BackgroundProcess>();
export const backgroundTasks = new Map<string, BackgroundTask>();
let bashIdCounter = 1;
export const getNextBashId = () => `bash_${bashIdCounter++}`;
let taskIdCounter = 1;
export const getNextTaskId = () => `task_${taskIdCounter++}`;
/**
* Get a temp directory for background task output files.
* Uses LETTA_SCRATCHPAD if set, otherwise falls back to os.tmpdir().
*/
export function getBackgroundOutputDir(): string {
const scratchpad = process.env.LETTA_SCRATCHPAD;
if (scratchpad) {
return scratchpad;
}
// Fall back to system temp with a letta-specific subdirectory
const os = require("node:os");
const path = require("node:path");
return path.join(os.tmpdir(), "letta-background");
}
/**
* Create a unique output file path for a background process/task.
*/
export function createBackgroundOutputFile(id: string): string {
const fs = require("node:fs");
const path = require("node:path");
const dir = getBackgroundOutputDir();
// Ensure directory exists
fs.mkdirSync(dir, { recursive: true });
const filePath = path.join(dir, `${id}.log`);
// Create empty file
fs.writeFileSync(filePath, "");
return filePath;
}
/**
* Append content to a background output file.
*/
export function appendToOutputFile(filePath: string, content: string): void {
const fs = require("node:fs");
fs.appendFileSync(filePath, content);
}