From 0e8f84ca5359ec1686b4ca572b20d9b25d1e24ca Mon Sep 17 00:00:00 2001 From: christinatong01 Date: Thu, 19 Mar 2026 17:24:56 -0700 Subject: [PATCH] Revert "feat(umi): add subagent id to stream delta and subagent snapshot to WS event" This reverts commit bda874b3cd14d5f23e4d8c6182fa1057d487fac5. --- src/agent/subagents/manager.ts | 5 -- src/cli/helpers/subagentState.ts | 51 ----------------- src/types/protocol_v2.ts | 36 +----------- src/websocket/listener/client.ts | 36 ------------ src/websocket/listener/protocol-outbound.ts | 62 --------------------- src/websocket/listener/types.ts | 4 -- 6 files changed, 1 insertion(+), 193 deletions(-) diff --git a/src/agent/subagents/manager.ts b/src/agent/subagents/manager.ts index 9832773..568edb6 100644 --- a/src/agent/subagents/manager.ts +++ b/src/agent/subagents/manager.ts @@ -12,7 +12,6 @@ import { createInterface } from "node:readline"; import { buildChatUrl } from "../../cli/helpers/appUrls"; import { addToolCall, - emitStreamEvent, updateSubagent, } from "../../cli/helpers/subagentState.js"; import { @@ -395,10 +394,6 @@ function processStreamEvent( case "message": if (event.message_type === "approval_request_message") { handleApprovalRequestEvent(event, state); - } else { - // Forward non-approval message events for WS streaming to the web UI. - // Approval requests are internal to the subagent's permission flow. - emitStreamEvent(subagentId, event); } break; diff --git a/src/cli/helpers/subagentState.ts b/src/cli/helpers/subagentState.ts index 085c7f0..6922c17 100644 --- a/src/cli/helpers/subagentState.ts +++ b/src/cli/helpers/subagentState.ts @@ -365,54 +365,3 @@ export function getSnapshot(): { } { return cachedSnapshot; } - -// ============================================================================ -// Stream Event Forwarding -// ============================================================================ - -/** - * A raw message-type event from the subagent's stdout (headless format). - * Shape: { type: "message", message_type: string, ...LettaStreamingResponse fields } - */ -export interface SubagentStreamEvent { - type: "message"; - message_type: string; - [key: string]: unknown; -} - -/** - * Callback for forwarding raw subagent stream events to the WS layer. - * The event is the parsed JSON line from the subagent's stdout. - */ -export type SubagentStreamEventListener = ( - subagentId: string, - event: SubagentStreamEvent, -) => void; - -const streamEventListeners = new Set(); - -/** - * Subscribe to raw subagent stream events (for WS forwarding). - * Returns an unsubscribe function. - */ -export function subscribeToStreamEvents( - listener: SubagentStreamEventListener, -): () => void { - streamEventListeners.add(listener); - return () => { - streamEventListeners.delete(listener); - }; -} - -/** - * Emit a raw stream event from a subagent. Called from processStreamEvent - * in manager.ts for message-type events that should be forwarded to the web UI. - */ -export function emitStreamEvent( - subagentId: string, - event: SubagentStreamEvent, -): void { - for (const listener of streamEventListeners) { - listener(subagentId, event); - } -} diff --git a/src/types/protocol_v2.ts b/src/types/protocol_v2.ts index 60b0c56..3950258 100644 --- a/src/types/protocol_v2.ts +++ b/src/types/protocol_v2.ts @@ -279,39 +279,6 @@ export type StreamDelta = export interface StreamDeltaMessage extends RuntimeEnvelope { type: "stream_delta"; delta: StreamDelta; - subagent_id?: string; -} - -/** - * Subagent state snapshot. - * Emitted via `update_subagent_state` on every subagent mutation. - */ -export interface SubagentSnapshotToolCall { - id: string; - name: string; - args: string; -} - -export interface SubagentSnapshot { - subagent_id: string; - subagent_type: string; - description: string; - status: "pending" | "running" | "completed" | "error"; - agent_url: string | null; - model?: string; - is_background?: boolean; - silent?: boolean; - tool_call_id?: string; - start_time: number; - tool_calls: SubagentSnapshotToolCall[]; - total_tokens: number; - duration_ms: number; - error?: string; -} - -export interface SubagentStateUpdateMessage extends RuntimeEnvelope { - type: "update_subagent_state"; - subagents: SubagentSnapshot[]; } export interface ApprovalResponseAllowDecision { @@ -430,7 +397,6 @@ export type WsProtocolMessage = | DeviceStatusUpdateMessage | LoopStatusUpdateMessage | QueueUpdateMessage - | StreamDeltaMessage - | SubagentStateUpdateMessage; + | StreamDeltaMessage; export type { StopReasonType }; diff --git a/src/websocket/listener/client.ts b/src/websocket/listener/client.ts index b93d427..742105a 100644 --- a/src/websocket/listener/client.ts +++ b/src/websocket/listener/client.ts @@ -10,10 +10,6 @@ import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/mes import WebSocket from "ws"; import { getClient } from "../../agent/client"; import { generatePlanFilePath } from "../../cli/helpers/planName"; -import { - subscribe as subscribeToSubagentState, - subscribeToStreamEvents as subscribeToSubagentStreamEvents, -} from "../../cli/helpers/subagentState"; import { INTERRUPTED_BY_USER } from "../../constants"; import { type DequeuedBatch, QueueRuntime } from "../../queue/queueRuntime"; import { createSharedReminderState } from "../../reminders/state"; @@ -75,8 +71,6 @@ import { emitLoopStatusUpdate, emitRetryDelta, emitStateSync, - emitStreamDelta, - emitSubagentStateIfOpen, scheduleQueueEmit, setLoopStatus, } from "./protocol-outbound"; @@ -711,32 +705,6 @@ async function connectWithRetry( } } - // Subscribe to subagent state changes and emit snapshots over WS. - // Store the unsubscribe function on the runtime for cleanup on close. - runtime._unsubscribeSubagentState?.(); - runtime._unsubscribeSubagentState = subscribeToSubagentState(() => { - emitSubagentStateIfOpen(runtime); - }); - - // Subscribe to subagent stream events and forward as tagged stream_delta. - // Events are raw JSON lines from the subagent's stdout (headless format): - // { type: "message", message_type: "tool_call_message", ...LettaStreamingResponse fields } - // These are already MessageDelta-shaped (type:"message" + LettaStreamingResponse). - runtime._unsubscribeSubagentStreamEvents?.(); - runtime._unsubscribeSubagentStreamEvents = - subscribeToSubagentStreamEvents((subagentId, event) => { - if (socket.readyState !== WebSocket.OPEN) return; - // The event has { type: "message", message_type, ...LettaStreamingResponse } - // plus extra headless fields (session_id, uuid) that pass through harmlessly. - emitStreamDelta( - socket, - runtime, - event as unknown as import("../../types/protocol_v2").StreamDelta, - undefined, // scope: falls back to listener's default agent/conversation - subagentId, - ); - }); - runtime.heartbeatInterval = setInterval(() => { if (socket.readyState === WebSocket.OPEN) { socket.send(JSON.stringify({ type: "ping" })); @@ -1075,10 +1043,6 @@ async function connectWithRetry( clearRuntimeTimers(runtime); killAllTerminals(); - runtime._unsubscribeSubagentState?.(); - runtime._unsubscribeSubagentState = undefined; - runtime._unsubscribeSubagentStreamEvents?.(); - runtime._unsubscribeSubagentStreamEvents = undefined; runtime.socket = null; for (const conversationRuntime of runtime.conversationRuntimes.values()) { rejectPendingApprovalResolvers( diff --git a/src/websocket/listener/protocol-outbound.ts b/src/websocket/listener/protocol-outbound.ts index 6864579..db1ab9a 100644 --- a/src/websocket/listener/protocol-outbound.ts +++ b/src/websocket/listener/protocol-outbound.ts @@ -18,11 +18,8 @@ import type { StopReasonType, StreamDelta, StreamDeltaMessage, - SubagentSnapshot, - SubagentStateUpdateMessage, WsProtocolMessage, } from "../../types/protocol_v2"; -import { getSubagents } from "../../cli/helpers/subagentState"; import { SYSTEM_REMINDER_RE } from "./constants"; import { getConversationWorkingDirectory } from "./cwd"; import { getConversationPermissionModeState } from "./permissionMode"; @@ -453,63 +450,6 @@ export function emitStateSync( emitDeviceStatusUpdate(socket, runtime, scope); emitLoopStatusUpdate(socket, runtime, scope); emitQueueUpdate(socket, runtime, scope); - emitSubagentStateUpdate(socket, runtime, scope); -} - -// ───────────────────────────────────────────── -// Subagent state -// ───────────────────────────────────────────── - -export function buildSubagentSnapshot(): SubagentSnapshot[] { - return getSubagents() - .filter((a) => !a.silent) - .map((a) => ({ - subagent_id: a.id, - subagent_type: a.type, - description: a.description, - status: a.status, - agent_url: a.agentURL, - model: a.model, - is_background: a.isBackground, - silent: a.silent, - tool_call_id: a.toolCallId, - start_time: a.startTime, - tool_calls: a.toolCalls, - total_tokens: a.totalTokens, - duration_ms: a.durationMs, - error: a.error, - })); -} - -export function emitSubagentStateUpdate( - socket: WebSocket, - runtime: RuntimeCarrier, - scope?: { - agent_id?: string | null; - conversation_id?: string | null; - }, -): void { - const message: Omit< - SubagentStateUpdateMessage, - "runtime" | "event_seq" | "emitted_at" | "idempotency_key" - > = { - type: "update_subagent_state", - subagents: buildSubagentSnapshot(), - }; - emitProtocolV2Message(socket, runtime, message, scope); -} - -export function emitSubagentStateIfOpen( - runtime: RuntimeCarrier, - scope?: { - agent_id?: string | null; - conversation_id?: string | null; - }, -): void { - const listener = getListenerRuntime(runtime); - if (listener?.socket?.readyState === WebSocket.OPEN) { - emitSubagentStateUpdate(listener.socket, runtime, scope); - } } export function scheduleQueueEmit( @@ -665,7 +605,6 @@ export function emitStreamDelta( agent_id?: string | null; conversation_id?: string | null; }, - subagentId?: string, ): void { const message: Omit< StreamDeltaMessage, @@ -673,7 +612,6 @@ export function emitStreamDelta( > = { type: "stream_delta", delta, - ...(subagentId ? { subagent_id: subagentId } : {}), }; emitProtocolV2Message(socket, runtime, message, scope); } diff --git a/src/websocket/listener/types.ts b/src/websocket/listener/types.ts index 76f3253..2876331 100644 --- a/src/websocket/listener/types.ts +++ b/src/websocket/listener/types.ts @@ -159,10 +159,6 @@ export type ListenerRuntime = { conversationRuntimes: Map; approvalRuntimeKeyByRequestId: Map; lastEmittedStatus: "idle" | "receiving" | "processing" | null; - /** Unsubscribe from subagent state store (set on socket open, cleared on close). */ - _unsubscribeSubagentState?: (() => void) | undefined; - /** Unsubscribe from subagent stream events (set on socket open, cleared on close). */ - _unsubscribeSubagentStreamEvents?: (() => void) | undefined; }; export interface InterruptPopulateInput {