diff --git a/src/agent/subagents/manager.ts b/src/agent/subagents/manager.ts index 568edb6..9832773 100644 --- a/src/agent/subagents/manager.ts +++ b/src/agent/subagents/manager.ts @@ -12,6 +12,7 @@ import { createInterface } from "node:readline"; import { buildChatUrl } from "../../cli/helpers/appUrls"; import { addToolCall, + emitStreamEvent, updateSubagent, } from "../../cli/helpers/subagentState.js"; import { @@ -394,6 +395,10 @@ function processStreamEvent( case "message": if (event.message_type === "approval_request_message") { handleApprovalRequestEvent(event, state); + } else { + // Forward non-approval message events for WS streaming to the web UI. + // Approval requests are internal to the subagent's permission flow. + emitStreamEvent(subagentId, event); } break; diff --git a/src/cli/helpers/subagentState.ts b/src/cli/helpers/subagentState.ts index 6922c17..085c7f0 100644 --- a/src/cli/helpers/subagentState.ts +++ b/src/cli/helpers/subagentState.ts @@ -365,3 +365,54 @@ export function getSnapshot(): { } { return cachedSnapshot; } + +// ============================================================================ +// Stream Event Forwarding +// ============================================================================ + +/** + * A raw message-type event from the subagent's stdout (headless format). + * Shape: { type: "message", message_type: string, ...LettaStreamingResponse fields } + */ +export interface SubagentStreamEvent { + type: "message"; + message_type: string; + [key: string]: unknown; +} + +/** + * Callback for forwarding raw subagent stream events to the WS layer. + * The event is the parsed JSON line from the subagent's stdout. + */ +export type SubagentStreamEventListener = ( + subagentId: string, + event: SubagentStreamEvent, +) => void; + +const streamEventListeners = new Set(); + +/** + * Subscribe to raw subagent stream events (for WS forwarding). + * Returns an unsubscribe function. + */ +export function subscribeToStreamEvents( + listener: SubagentStreamEventListener, +): () => void { + streamEventListeners.add(listener); + return () => { + streamEventListeners.delete(listener); + }; +} + +/** + * Emit a raw stream event from a subagent. Called from processStreamEvent + * in manager.ts for message-type events that should be forwarded to the web UI. + */ +export function emitStreamEvent( + subagentId: string, + event: SubagentStreamEvent, +): void { + for (const listener of streamEventListeners) { + listener(subagentId, event); + } +} diff --git a/src/types/protocol_v2.ts b/src/types/protocol_v2.ts index 3950258..60b0c56 100644 --- a/src/types/protocol_v2.ts +++ b/src/types/protocol_v2.ts @@ -279,6 +279,39 @@ export type StreamDelta = export interface StreamDeltaMessage extends RuntimeEnvelope { type: "stream_delta"; delta: StreamDelta; + subagent_id?: string; +} + +/** + * Subagent state snapshot. + * Emitted via `update_subagent_state` on every subagent mutation. + */ +export interface SubagentSnapshotToolCall { + id: string; + name: string; + args: string; +} + +export interface SubagentSnapshot { + subagent_id: string; + subagent_type: string; + description: string; + status: "pending" | "running" | "completed" | "error"; + agent_url: string | null; + model?: string; + is_background?: boolean; + silent?: boolean; + tool_call_id?: string; + start_time: number; + tool_calls: SubagentSnapshotToolCall[]; + total_tokens: number; + duration_ms: number; + error?: string; +} + +export interface SubagentStateUpdateMessage extends RuntimeEnvelope { + type: "update_subagent_state"; + subagents: SubagentSnapshot[]; } export interface ApprovalResponseAllowDecision { @@ -397,6 +430,7 @@ export type WsProtocolMessage = | DeviceStatusUpdateMessage | LoopStatusUpdateMessage | QueueUpdateMessage - | StreamDeltaMessage; + | StreamDeltaMessage + | SubagentStateUpdateMessage; export type { StopReasonType }; diff --git a/src/websocket/listener/client.ts b/src/websocket/listener/client.ts index 742105a..b93d427 100644 --- a/src/websocket/listener/client.ts +++ b/src/websocket/listener/client.ts @@ -10,6 +10,10 @@ import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/mes import WebSocket from "ws"; import { getClient } from "../../agent/client"; import { generatePlanFilePath } from "../../cli/helpers/planName"; +import { + subscribe as subscribeToSubagentState, + subscribeToStreamEvents as subscribeToSubagentStreamEvents, +} from "../../cli/helpers/subagentState"; import { INTERRUPTED_BY_USER } from "../../constants"; import { type DequeuedBatch, QueueRuntime } from "../../queue/queueRuntime"; import { createSharedReminderState } from "../../reminders/state"; @@ -71,6 +75,8 @@ import { emitLoopStatusUpdate, emitRetryDelta, emitStateSync, + emitStreamDelta, + emitSubagentStateIfOpen, scheduleQueueEmit, setLoopStatus, } from "./protocol-outbound"; @@ -705,6 +711,32 @@ async function connectWithRetry( } } + // Subscribe to subagent state changes and emit snapshots over WS. + // Store the unsubscribe function on the runtime for cleanup on close. + runtime._unsubscribeSubagentState?.(); + runtime._unsubscribeSubagentState = subscribeToSubagentState(() => { + emitSubagentStateIfOpen(runtime); + }); + + // Subscribe to subagent stream events and forward as tagged stream_delta. + // Events are raw JSON lines from the subagent's stdout (headless format): + // { type: "message", message_type: "tool_call_message", ...LettaStreamingResponse fields } + // These are already MessageDelta-shaped (type:"message" + LettaStreamingResponse). + runtime._unsubscribeSubagentStreamEvents?.(); + runtime._unsubscribeSubagentStreamEvents = + subscribeToSubagentStreamEvents((subagentId, event) => { + if (socket.readyState !== WebSocket.OPEN) return; + // The event has { type: "message", message_type, ...LettaStreamingResponse } + // plus extra headless fields (session_id, uuid) that pass through harmlessly. + emitStreamDelta( + socket, + runtime, + event as unknown as import("../../types/protocol_v2").StreamDelta, + undefined, // scope: falls back to listener's default agent/conversation + subagentId, + ); + }); + runtime.heartbeatInterval = setInterval(() => { if (socket.readyState === WebSocket.OPEN) { socket.send(JSON.stringify({ type: "ping" })); @@ -1043,6 +1075,10 @@ async function connectWithRetry( clearRuntimeTimers(runtime); killAllTerminals(); + runtime._unsubscribeSubagentState?.(); + runtime._unsubscribeSubagentState = undefined; + runtime._unsubscribeSubagentStreamEvents?.(); + runtime._unsubscribeSubagentStreamEvents = undefined; runtime.socket = null; for (const conversationRuntime of runtime.conversationRuntimes.values()) { rejectPendingApprovalResolvers( diff --git a/src/websocket/listener/protocol-outbound.ts b/src/websocket/listener/protocol-outbound.ts index db1ab9a..6864579 100644 --- a/src/websocket/listener/protocol-outbound.ts +++ b/src/websocket/listener/protocol-outbound.ts @@ -18,8 +18,11 @@ import type { StopReasonType, StreamDelta, StreamDeltaMessage, + SubagentSnapshot, + SubagentStateUpdateMessage, WsProtocolMessage, } from "../../types/protocol_v2"; +import { getSubagents } from "../../cli/helpers/subagentState"; import { SYSTEM_REMINDER_RE } from "./constants"; import { getConversationWorkingDirectory } from "./cwd"; import { getConversationPermissionModeState } from "./permissionMode"; @@ -450,6 +453,63 @@ export function emitStateSync( emitDeviceStatusUpdate(socket, runtime, scope); emitLoopStatusUpdate(socket, runtime, scope); emitQueueUpdate(socket, runtime, scope); + emitSubagentStateUpdate(socket, runtime, scope); +} + +// ───────────────────────────────────────────── +// Subagent state +// ───────────────────────────────────────────── + +export function buildSubagentSnapshot(): SubagentSnapshot[] { + return getSubagents() + .filter((a) => !a.silent) + .map((a) => ({ + subagent_id: a.id, + subagent_type: a.type, + description: a.description, + status: a.status, + agent_url: a.agentURL, + model: a.model, + is_background: a.isBackground, + silent: a.silent, + tool_call_id: a.toolCallId, + start_time: a.startTime, + tool_calls: a.toolCalls, + total_tokens: a.totalTokens, + duration_ms: a.durationMs, + error: a.error, + })); +} + +export function emitSubagentStateUpdate( + socket: WebSocket, + runtime: RuntimeCarrier, + scope?: { + agent_id?: string | null; + conversation_id?: string | null; + }, +): void { + const message: Omit< + SubagentStateUpdateMessage, + "runtime" | "event_seq" | "emitted_at" | "idempotency_key" + > = { + type: "update_subagent_state", + subagents: buildSubagentSnapshot(), + }; + emitProtocolV2Message(socket, runtime, message, scope); +} + +export function emitSubagentStateIfOpen( + runtime: RuntimeCarrier, + scope?: { + agent_id?: string | null; + conversation_id?: string | null; + }, +): void { + const listener = getListenerRuntime(runtime); + if (listener?.socket?.readyState === WebSocket.OPEN) { + emitSubagentStateUpdate(listener.socket, runtime, scope); + } } export function scheduleQueueEmit( @@ -605,6 +665,7 @@ export function emitStreamDelta( agent_id?: string | null; conversation_id?: string | null; }, + subagentId?: string, ): void { const message: Omit< StreamDeltaMessage, @@ -612,6 +673,7 @@ export function emitStreamDelta( > = { type: "stream_delta", delta, + ...(subagentId ? { subagent_id: subagentId } : {}), }; emitProtocolV2Message(socket, runtime, message, scope); } diff --git a/src/websocket/listener/types.ts b/src/websocket/listener/types.ts index 2876331..76f3253 100644 --- a/src/websocket/listener/types.ts +++ b/src/websocket/listener/types.ts @@ -159,6 +159,10 @@ export type ListenerRuntime = { conversationRuntimes: Map; approvalRuntimeKeyByRequestId: Map; lastEmittedStatus: "idle" | "receiving" | "processing" | null; + /** Unsubscribe from subagent state store (set on socket open, cleared on close). */ + _unsubscribeSubagentState?: (() => void) | undefined; + /** Unsubscribe from subagent stream events (set on socket open, cleared on close). */ + _unsubscribeSubagentStreamEvents?: (() => void) | undefined; }; export interface InterruptPopulateInput {