Revert "feat(umi): add subagent id to stream delta and subagent snapshot to WS event"
This reverts commit bda874b3cd.
This commit is contained in:
@@ -12,7 +12,6 @@ import { createInterface } from "node:readline";
|
||||
import { buildChatUrl } from "../../cli/helpers/appUrls";
|
||||
import {
|
||||
addToolCall,
|
||||
emitStreamEvent,
|
||||
updateSubagent,
|
||||
} from "../../cli/helpers/subagentState.js";
|
||||
import {
|
||||
@@ -395,10 +394,6 @@ function processStreamEvent(
|
||||
case "message":
|
||||
if (event.message_type === "approval_request_message") {
|
||||
handleApprovalRequestEvent(event, state);
|
||||
} else {
|
||||
// Forward non-approval message events for WS streaming to the web UI.
|
||||
// Approval requests are internal to the subagent's permission flow.
|
||||
emitStreamEvent(subagentId, event);
|
||||
}
|
||||
break;
|
||||
|
||||
|
||||
@@ -365,54 +365,3 @@ export function getSnapshot(): {
|
||||
} {
|
||||
return cachedSnapshot;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Stream Event Forwarding
|
||||
// ============================================================================
|
||||
|
||||
/**
|
||||
* A raw message-type event from the subagent's stdout (headless format).
|
||||
* Shape: { type: "message", message_type: string, ...LettaStreamingResponse fields }
|
||||
*/
|
||||
export interface SubagentStreamEvent {
|
||||
type: "message";
|
||||
message_type: string;
|
||||
[key: string]: unknown;
|
||||
}
|
||||
|
||||
/**
|
||||
* Callback for forwarding raw subagent stream events to the WS layer.
|
||||
* The event is the parsed JSON line from the subagent's stdout.
|
||||
*/
|
||||
export type SubagentStreamEventListener = (
|
||||
subagentId: string,
|
||||
event: SubagentStreamEvent,
|
||||
) => void;
|
||||
|
||||
const streamEventListeners = new Set<SubagentStreamEventListener>();
|
||||
|
||||
/**
|
||||
* Subscribe to raw subagent stream events (for WS forwarding).
|
||||
* Returns an unsubscribe function.
|
||||
*/
|
||||
export function subscribeToStreamEvents(
|
||||
listener: SubagentStreamEventListener,
|
||||
): () => void {
|
||||
streamEventListeners.add(listener);
|
||||
return () => {
|
||||
streamEventListeners.delete(listener);
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Emit a raw stream event from a subagent. Called from processStreamEvent
|
||||
* in manager.ts for message-type events that should be forwarded to the web UI.
|
||||
*/
|
||||
export function emitStreamEvent(
|
||||
subagentId: string,
|
||||
event: SubagentStreamEvent,
|
||||
): void {
|
||||
for (const listener of streamEventListeners) {
|
||||
listener(subagentId, event);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -279,39 +279,6 @@ export type StreamDelta =
|
||||
export interface StreamDeltaMessage extends RuntimeEnvelope {
|
||||
type: "stream_delta";
|
||||
delta: StreamDelta;
|
||||
subagent_id?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Subagent state snapshot.
|
||||
* Emitted via `update_subagent_state` on every subagent mutation.
|
||||
*/
|
||||
export interface SubagentSnapshotToolCall {
|
||||
id: string;
|
||||
name: string;
|
||||
args: string;
|
||||
}
|
||||
|
||||
export interface SubagentSnapshot {
|
||||
subagent_id: string;
|
||||
subagent_type: string;
|
||||
description: string;
|
||||
status: "pending" | "running" | "completed" | "error";
|
||||
agent_url: string | null;
|
||||
model?: string;
|
||||
is_background?: boolean;
|
||||
silent?: boolean;
|
||||
tool_call_id?: string;
|
||||
start_time: number;
|
||||
tool_calls: SubagentSnapshotToolCall[];
|
||||
total_tokens: number;
|
||||
duration_ms: number;
|
||||
error?: string;
|
||||
}
|
||||
|
||||
export interface SubagentStateUpdateMessage extends RuntimeEnvelope {
|
||||
type: "update_subagent_state";
|
||||
subagents: SubagentSnapshot[];
|
||||
}
|
||||
|
||||
export interface ApprovalResponseAllowDecision {
|
||||
@@ -430,7 +397,6 @@ export type WsProtocolMessage =
|
||||
| DeviceStatusUpdateMessage
|
||||
| LoopStatusUpdateMessage
|
||||
| QueueUpdateMessage
|
||||
| StreamDeltaMessage
|
||||
| SubagentStateUpdateMessage;
|
||||
| StreamDeltaMessage;
|
||||
|
||||
export type { StopReasonType };
|
||||
|
||||
@@ -10,10 +10,6 @@ import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/mes
|
||||
import WebSocket from "ws";
|
||||
import { getClient } from "../../agent/client";
|
||||
import { generatePlanFilePath } from "../../cli/helpers/planName";
|
||||
import {
|
||||
subscribe as subscribeToSubagentState,
|
||||
subscribeToStreamEvents as subscribeToSubagentStreamEvents,
|
||||
} from "../../cli/helpers/subagentState";
|
||||
import { INTERRUPTED_BY_USER } from "../../constants";
|
||||
import { type DequeuedBatch, QueueRuntime } from "../../queue/queueRuntime";
|
||||
import { createSharedReminderState } from "../../reminders/state";
|
||||
@@ -75,8 +71,6 @@ import {
|
||||
emitLoopStatusUpdate,
|
||||
emitRetryDelta,
|
||||
emitStateSync,
|
||||
emitStreamDelta,
|
||||
emitSubagentStateIfOpen,
|
||||
scheduleQueueEmit,
|
||||
setLoopStatus,
|
||||
} from "./protocol-outbound";
|
||||
@@ -711,32 +705,6 @@ async function connectWithRetry(
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe to subagent state changes and emit snapshots over WS.
|
||||
// Store the unsubscribe function on the runtime for cleanup on close.
|
||||
runtime._unsubscribeSubagentState?.();
|
||||
runtime._unsubscribeSubagentState = subscribeToSubagentState(() => {
|
||||
emitSubagentStateIfOpen(runtime);
|
||||
});
|
||||
|
||||
// Subscribe to subagent stream events and forward as tagged stream_delta.
|
||||
// Events are raw JSON lines from the subagent's stdout (headless format):
|
||||
// { type: "message", message_type: "tool_call_message", ...LettaStreamingResponse fields }
|
||||
// These are already MessageDelta-shaped (type:"message" + LettaStreamingResponse).
|
||||
runtime._unsubscribeSubagentStreamEvents?.();
|
||||
runtime._unsubscribeSubagentStreamEvents =
|
||||
subscribeToSubagentStreamEvents((subagentId, event) => {
|
||||
if (socket.readyState !== WebSocket.OPEN) return;
|
||||
// The event has { type: "message", message_type, ...LettaStreamingResponse }
|
||||
// plus extra headless fields (session_id, uuid) that pass through harmlessly.
|
||||
emitStreamDelta(
|
||||
socket,
|
||||
runtime,
|
||||
event as unknown as import("../../types/protocol_v2").StreamDelta,
|
||||
undefined, // scope: falls back to listener's default agent/conversation
|
||||
subagentId,
|
||||
);
|
||||
});
|
||||
|
||||
runtime.heartbeatInterval = setInterval(() => {
|
||||
if (socket.readyState === WebSocket.OPEN) {
|
||||
socket.send(JSON.stringify({ type: "ping" }));
|
||||
@@ -1075,10 +1043,6 @@ async function connectWithRetry(
|
||||
|
||||
clearRuntimeTimers(runtime);
|
||||
killAllTerminals();
|
||||
runtime._unsubscribeSubagentState?.();
|
||||
runtime._unsubscribeSubagentState = undefined;
|
||||
runtime._unsubscribeSubagentStreamEvents?.();
|
||||
runtime._unsubscribeSubagentStreamEvents = undefined;
|
||||
runtime.socket = null;
|
||||
for (const conversationRuntime of runtime.conversationRuntimes.values()) {
|
||||
rejectPendingApprovalResolvers(
|
||||
|
||||
@@ -18,11 +18,8 @@ import type {
|
||||
StopReasonType,
|
||||
StreamDelta,
|
||||
StreamDeltaMessage,
|
||||
SubagentSnapshot,
|
||||
SubagentStateUpdateMessage,
|
||||
WsProtocolMessage,
|
||||
} from "../../types/protocol_v2";
|
||||
import { getSubagents } from "../../cli/helpers/subagentState";
|
||||
import { SYSTEM_REMINDER_RE } from "./constants";
|
||||
import { getConversationWorkingDirectory } from "./cwd";
|
||||
import { getConversationPermissionModeState } from "./permissionMode";
|
||||
@@ -453,63 +450,6 @@ export function emitStateSync(
|
||||
emitDeviceStatusUpdate(socket, runtime, scope);
|
||||
emitLoopStatusUpdate(socket, runtime, scope);
|
||||
emitQueueUpdate(socket, runtime, scope);
|
||||
emitSubagentStateUpdate(socket, runtime, scope);
|
||||
}
|
||||
|
||||
// ─────────────────────────────────────────────
|
||||
// Subagent state
|
||||
// ─────────────────────────────────────────────
|
||||
|
||||
export function buildSubagentSnapshot(): SubagentSnapshot[] {
|
||||
return getSubagents()
|
||||
.filter((a) => !a.silent)
|
||||
.map((a) => ({
|
||||
subagent_id: a.id,
|
||||
subagent_type: a.type,
|
||||
description: a.description,
|
||||
status: a.status,
|
||||
agent_url: a.agentURL,
|
||||
model: a.model,
|
||||
is_background: a.isBackground,
|
||||
silent: a.silent,
|
||||
tool_call_id: a.toolCallId,
|
||||
start_time: a.startTime,
|
||||
tool_calls: a.toolCalls,
|
||||
total_tokens: a.totalTokens,
|
||||
duration_ms: a.durationMs,
|
||||
error: a.error,
|
||||
}));
|
||||
}
|
||||
|
||||
export function emitSubagentStateUpdate(
|
||||
socket: WebSocket,
|
||||
runtime: RuntimeCarrier,
|
||||
scope?: {
|
||||
agent_id?: string | null;
|
||||
conversation_id?: string | null;
|
||||
},
|
||||
): void {
|
||||
const message: Omit<
|
||||
SubagentStateUpdateMessage,
|
||||
"runtime" | "event_seq" | "emitted_at" | "idempotency_key"
|
||||
> = {
|
||||
type: "update_subagent_state",
|
||||
subagents: buildSubagentSnapshot(),
|
||||
};
|
||||
emitProtocolV2Message(socket, runtime, message, scope);
|
||||
}
|
||||
|
||||
export function emitSubagentStateIfOpen(
|
||||
runtime: RuntimeCarrier,
|
||||
scope?: {
|
||||
agent_id?: string | null;
|
||||
conversation_id?: string | null;
|
||||
},
|
||||
): void {
|
||||
const listener = getListenerRuntime(runtime);
|
||||
if (listener?.socket?.readyState === WebSocket.OPEN) {
|
||||
emitSubagentStateUpdate(listener.socket, runtime, scope);
|
||||
}
|
||||
}
|
||||
|
||||
export function scheduleQueueEmit(
|
||||
@@ -665,7 +605,6 @@ export function emitStreamDelta(
|
||||
agent_id?: string | null;
|
||||
conversation_id?: string | null;
|
||||
},
|
||||
subagentId?: string,
|
||||
): void {
|
||||
const message: Omit<
|
||||
StreamDeltaMessage,
|
||||
@@ -673,7 +612,6 @@ export function emitStreamDelta(
|
||||
> = {
|
||||
type: "stream_delta",
|
||||
delta,
|
||||
...(subagentId ? { subagent_id: subagentId } : {}),
|
||||
};
|
||||
emitProtocolV2Message(socket, runtime, message, scope);
|
||||
}
|
||||
|
||||
@@ -159,10 +159,6 @@ export type ListenerRuntime = {
|
||||
conversationRuntimes: Map<string, ConversationRuntime>;
|
||||
approvalRuntimeKeyByRequestId: Map<string, string>;
|
||||
lastEmittedStatus: "idle" | "receiving" | "processing" | null;
|
||||
/** Unsubscribe from subagent state store (set on socket open, cleared on close). */
|
||||
_unsubscribeSubagentState?: (() => void) | undefined;
|
||||
/** Unsubscribe from subagent stream events (set on socket open, cleared on close). */
|
||||
_unsubscribeSubagentStreamEvents?: (() => void) | undefined;
|
||||
};
|
||||
|
||||
export interface InterruptPopulateInput {
|
||||
|
||||
Reference in New Issue
Block a user