feat(listen): emit explicit approval and tool execution lifecycle events (#1381)
This commit is contained in:
@@ -330,6 +330,12 @@ describe("listen-client state_response control protocol", () => {
|
||||
expect(snapshot.control_response_capable).toBe(true);
|
||||
});
|
||||
|
||||
test("advertises tool lifecycle capability for device clients", () => {
|
||||
const runtime = __listenClientTestUtils.createRuntime();
|
||||
const snapshot = __listenClientTestUtils.buildStateResponse(runtime, 2);
|
||||
expect(snapshot.tool_lifecycle_capable).toBe(true);
|
||||
});
|
||||
|
||||
test("includes the effective working directory", () => {
|
||||
const runtime = __listenClientTestUtils.createRuntime();
|
||||
const snapshot = __listenClientTestUtils.buildStateResponse(runtime, 1);
|
||||
@@ -818,6 +824,70 @@ describe("listen-client emitToWS adapter", () => {
|
||||
// Verify it's a UUID format after the prefix
|
||||
expect(runtime.sessionId.length).toBeGreaterThan(10);
|
||||
});
|
||||
|
||||
test("emits approval lifecycle events", () => {
|
||||
const socket = new MockSocket(WebSocket.OPEN);
|
||||
const requested = {
|
||||
type: "approval_requested" as const,
|
||||
request_id: "perm-tool-1",
|
||||
tool_call_id: "tool-1",
|
||||
tool_name: "Bash",
|
||||
run_id: "run-1",
|
||||
session_id: "listen-test",
|
||||
uuid: "approval-requested-1",
|
||||
};
|
||||
const received = {
|
||||
type: "approval_received" as const,
|
||||
request_id: "perm-tool-1",
|
||||
tool_call_id: "tool-1",
|
||||
decision: "allow" as const,
|
||||
reason: "Approved via WebSocket",
|
||||
run_id: "run-1",
|
||||
session_id: "listen-test",
|
||||
uuid: "approval-received-1",
|
||||
};
|
||||
|
||||
__listenClientTestUtils.emitToWS(socket as unknown as WebSocket, requested);
|
||||
__listenClientTestUtils.emitToWS(socket as unknown as WebSocket, received);
|
||||
|
||||
expect(socket.sentPayloads).toHaveLength(2);
|
||||
const sentRequested = JSON.parse(socket.sentPayloads[0] as string);
|
||||
const sentReceived = JSON.parse(socket.sentPayloads[1] as string);
|
||||
expect(sentRequested.type).toBe("approval_requested");
|
||||
expect(sentRequested.request_id).toBe("perm-tool-1");
|
||||
expect(sentReceived.type).toBe("approval_received");
|
||||
expect(sentReceived.decision).toBe("allow");
|
||||
});
|
||||
|
||||
test("emits tool execution lifecycle events", () => {
|
||||
const socket = new MockSocket(WebSocket.OPEN);
|
||||
const started = {
|
||||
type: "tool_execution_started" as const,
|
||||
tool_call_id: "tool-2",
|
||||
run_id: "run-2",
|
||||
session_id: "listen-test",
|
||||
uuid: "tool-exec-started-2",
|
||||
};
|
||||
const finished = {
|
||||
type: "tool_execution_finished" as const,
|
||||
tool_call_id: "tool-2",
|
||||
status: "success" as const,
|
||||
run_id: "run-2",
|
||||
session_id: "listen-test",
|
||||
uuid: "tool-exec-finished-2",
|
||||
};
|
||||
|
||||
__listenClientTestUtils.emitToWS(socket as unknown as WebSocket, started);
|
||||
__listenClientTestUtils.emitToWS(socket as unknown as WebSocket, finished);
|
||||
|
||||
expect(socket.sentPayloads).toHaveLength(2);
|
||||
const sentStarted = JSON.parse(socket.sentPayloads[0] as string);
|
||||
const sentFinished = JSON.parse(socket.sentPayloads[1] as string);
|
||||
expect(sentStarted.type).toBe("tool_execution_started");
|
||||
expect(sentStarted.tool_call_id).toBe("tool-2");
|
||||
expect(sentFinished.type).toBe("tool_execution_finished");
|
||||
expect(sentFinished.status).toBe("success");
|
||||
});
|
||||
});
|
||||
|
||||
describe("listen-client post-stop approval recovery policy", () => {
|
||||
|
||||
@@ -176,6 +176,65 @@ export interface StreamEvent extends MessageEnvelope {
|
||||
event: LettaStreamingResponse;
|
||||
}
|
||||
|
||||
// ═══════════════════════════════════════════════════════════════
|
||||
// TOOL LIFECYCLE EVENTS
|
||||
// ═══════════════════════════════════════════════════════════════
|
||||
|
||||
/**
|
||||
* Informational lifecycle event emitted when the runtime asks for user approval
|
||||
* for a specific tool call.
|
||||
*
|
||||
* NOTE:
|
||||
* - `control_request` remains the canonical UI trigger for approval state.
|
||||
* - This event is telemetry/lifecycle only and should not replace
|
||||
* `control_request` in UI reducers.
|
||||
*/
|
||||
export interface ApprovalRequestedMessage extends MessageEnvelope {
|
||||
type: "approval_requested";
|
||||
request_id: string;
|
||||
tool_call_id: string;
|
||||
tool_name: string;
|
||||
run_id?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Informational lifecycle event emitted after an approval request receives
|
||||
* a decision.
|
||||
*
|
||||
* NOTE:
|
||||
* - `control_request` + `control_response` remain canonical for approval flow.
|
||||
* - This event is telemetry/lifecycle only.
|
||||
*/
|
||||
export interface ApprovalReceivedMessage extends MessageEnvelope {
|
||||
type: "approval_received";
|
||||
request_id: string;
|
||||
tool_call_id: string;
|
||||
decision: "allow" | "deny";
|
||||
reason?: string;
|
||||
run_id?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Emitted when local execution starts for a previously approved tool call.
|
||||
* This is authoritative for starting tool-running timers in device clients.
|
||||
*/
|
||||
export interface ToolExecutionStartedMessage extends MessageEnvelope {
|
||||
type: "tool_execution_started";
|
||||
tool_call_id: string;
|
||||
run_id?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Emitted when local execution finishes for a previously started tool call.
|
||||
* This is authoritative for stopping tool-running timers in device clients.
|
||||
*/
|
||||
export interface ToolExecutionFinishedMessage extends MessageEnvelope {
|
||||
type: "tool_execution_finished";
|
||||
tool_call_id: string;
|
||||
status: "success" | "error";
|
||||
run_id?: string;
|
||||
}
|
||||
|
||||
// ═══════════════════════════════════════════════════════════════
|
||||
// AUTO APPROVAL
|
||||
// ═══════════════════════════════════════════════════════════════
|
||||
@@ -778,6 +837,10 @@ export type WireMessage =
|
||||
| SystemMessage
|
||||
| ContentMessage
|
||||
| StreamEvent
|
||||
| ApprovalRequestedMessage
|
||||
| ApprovalReceivedMessage
|
||||
| ToolExecutionStartedMessage
|
||||
| ToolExecutionFinishedMessage
|
||||
| AutoApprovalMessage
|
||||
| CancelAckMessage
|
||||
| ErrorMessage
|
||||
|
||||
@@ -63,6 +63,8 @@ import { settingsManager } from "../settings-manager";
|
||||
import { isInteractiveApprovalTool } from "../tools/interactivePolicy";
|
||||
import { loadTools } from "../tools/manager";
|
||||
import type {
|
||||
ApprovalReceivedMessage,
|
||||
ApprovalRequestedMessage,
|
||||
AutoApprovalMessage,
|
||||
CancelAckMessage,
|
||||
CanUseToolResponse,
|
||||
@@ -77,6 +79,8 @@ import type {
|
||||
RetryMessage,
|
||||
StopReasonType,
|
||||
SyncCompleteMessage,
|
||||
ToolExecutionFinishedMessage,
|
||||
ToolExecutionStartedMessage,
|
||||
TranscriptBackfillMessage,
|
||||
TranscriptSupplementMessage,
|
||||
} from "../types/protocol";
|
||||
@@ -286,6 +290,7 @@ interface StateResponseMessage {
|
||||
is_processing: boolean;
|
||||
last_stop_reason: string | null;
|
||||
control_response_capable: boolean;
|
||||
tool_lifecycle_capable: boolean;
|
||||
active_run: {
|
||||
run_id: string | null;
|
||||
agent_id: string | null;
|
||||
@@ -1385,6 +1390,7 @@ function buildStateResponse(
|
||||
is_processing: runtime.isProcessing,
|
||||
last_stop_reason: runtime.lastStopReason,
|
||||
control_response_capable: true,
|
||||
tool_lifecycle_capable: true,
|
||||
active_run: {
|
||||
run_id: runtime.activeRunId,
|
||||
agent_id: runtime.activeAgentId,
|
||||
@@ -1536,6 +1542,10 @@ function sendControlMessageOverWebSocket(
|
||||
|
||||
export type WsProtocolEvent =
|
||||
| MessageWire
|
||||
| ApprovalRequestedMessage
|
||||
| ApprovalReceivedMessage
|
||||
| ToolExecutionStartedMessage
|
||||
| ToolExecutionFinishedMessage
|
||||
| AutoApprovalMessage
|
||||
| CancelAckMessage
|
||||
| ErrorMessage
|
||||
@@ -1949,6 +1959,54 @@ function emitInterruptToolReturnMessage(
|
||||
}
|
||||
}
|
||||
|
||||
function emitToolExecutionStartedEvents(
|
||||
socket: WebSocket,
|
||||
runtime: ListenerRuntime,
|
||||
params: {
|
||||
toolCallIds: string[];
|
||||
runId?: string | null;
|
||||
agentId?: string;
|
||||
conversationId?: string;
|
||||
},
|
||||
): void {
|
||||
for (const toolCallId of params.toolCallIds) {
|
||||
emitToWS(socket, {
|
||||
type: "tool_execution_started",
|
||||
tool_call_id: toolCallId,
|
||||
...(params.runId ? { run_id: params.runId } : {}),
|
||||
session_id: runtime.sessionId,
|
||||
uuid: `tool-exec-started-${toolCallId}`,
|
||||
agent_id: params.agentId,
|
||||
conversation_id: params.conversationId,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
function emitToolExecutionFinishedEvents(
|
||||
socket: WebSocket,
|
||||
runtime: ListenerRuntime,
|
||||
params: {
|
||||
approvals: ApprovalResult[] | null;
|
||||
runId?: string | null;
|
||||
agentId?: string;
|
||||
conversationId?: string;
|
||||
},
|
||||
): void {
|
||||
const toolReturns = extractInterruptToolReturns(params.approvals);
|
||||
for (const toolReturn of toolReturns) {
|
||||
emitToWS(socket, {
|
||||
type: "tool_execution_finished",
|
||||
tool_call_id: toolReturn.tool_call_id,
|
||||
status: toolReturn.status,
|
||||
...(params.runId ? { run_id: params.runId } : {}),
|
||||
session_id: runtime.sessionId,
|
||||
uuid: `tool-exec-finished-${toolReturn.tool_call_id}`,
|
||||
agent_id: params.agentId,
|
||||
conversation_id: params.conversationId,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
function getInterruptApprovalsForEmission(
|
||||
runtime: ListenerRuntime,
|
||||
params: {
|
||||
@@ -3690,6 +3748,8 @@ async function handleIncomingMessage(
|
||||
ac.parsedArgs,
|
||||
turnWorkingDirectory,
|
||||
);
|
||||
const lifecycleRunId =
|
||||
runId || runtime.activeRunId || msgRunIds[msgRunIds.length - 1];
|
||||
|
||||
const controlRequest: ControlRequest = {
|
||||
type: "control_request",
|
||||
@@ -3707,6 +3767,18 @@ async function handleIncomingMessage(
|
||||
conversation_id: conversationId,
|
||||
};
|
||||
|
||||
emitToWS(socket, {
|
||||
type: "approval_requested",
|
||||
request_id: requestId,
|
||||
tool_call_id: ac.approval.toolCallId,
|
||||
tool_name: ac.approval.toolName,
|
||||
...(lifecycleRunId ? { run_id: lifecycleRunId } : {}),
|
||||
session_id: runtime.sessionId,
|
||||
uuid: `approval-requested-${ac.approval.toolCallId}`,
|
||||
agent_id: agentId,
|
||||
conversation_id: conversationId,
|
||||
});
|
||||
|
||||
const responseBody = await requestApprovalOverWS(
|
||||
runtime,
|
||||
socket,
|
||||
@@ -3742,21 +3814,58 @@ async function handleIncomingMessage(
|
||||
agent_id: agentId,
|
||||
conversation_id: conversationId,
|
||||
} as AutoApprovalMessage);
|
||||
emitToWS(socket, {
|
||||
type: "approval_received",
|
||||
request_id: requestId,
|
||||
tool_call_id: ac.approval.toolCallId,
|
||||
decision: "allow",
|
||||
reason: "Approved via WebSocket",
|
||||
...(lifecycleRunId ? { run_id: lifecycleRunId } : {}),
|
||||
session_id: runtime.sessionId,
|
||||
uuid: `approval-received-${ac.approval.toolCallId}`,
|
||||
agent_id: agentId,
|
||||
conversation_id: conversationId,
|
||||
});
|
||||
} else {
|
||||
decisions.push({
|
||||
type: "deny",
|
||||
approval: ac.approval,
|
||||
reason: response?.message || "Denied via WebSocket",
|
||||
});
|
||||
emitToWS(socket, {
|
||||
type: "approval_received",
|
||||
request_id: requestId,
|
||||
tool_call_id: ac.approval.toolCallId,
|
||||
decision: "deny",
|
||||
reason: response?.message || "Denied via WebSocket",
|
||||
...(lifecycleRunId ? { run_id: lifecycleRunId } : {}),
|
||||
session_id: runtime.sessionId,
|
||||
uuid: `approval-received-${ac.approval.toolCallId}`,
|
||||
agent_id: agentId,
|
||||
conversation_id: conversationId,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
const denyReason =
|
||||
responseBody.subtype === "error"
|
||||
? responseBody.error
|
||||
: "Unknown error";
|
||||
decisions.push({
|
||||
type: "deny",
|
||||
approval: ac.approval,
|
||||
reason:
|
||||
responseBody.subtype === "error"
|
||||
? responseBody.error
|
||||
: "Unknown error",
|
||||
reason: denyReason,
|
||||
});
|
||||
emitToWS(socket, {
|
||||
type: "approval_received",
|
||||
request_id: requestId,
|
||||
tool_call_id: ac.approval.toolCallId,
|
||||
decision: "deny",
|
||||
reason: denyReason,
|
||||
...(lifecycleRunId ? { run_id: lifecycleRunId } : {}),
|
||||
session_id: runtime.sessionId,
|
||||
uuid: `approval-received-${ac.approval.toolCallId}`,
|
||||
agent_id: agentId,
|
||||
conversation_id: conversationId,
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -3771,6 +3880,14 @@ async function handleIncomingMessage(
|
||||
)
|
||||
.map((decision) => decision.approval.toolCallId);
|
||||
runtime.activeExecutingToolCallIds = [...lastExecutingToolCallIds];
|
||||
const executionRunId =
|
||||
runId || runtime.activeRunId || msgRunIds[msgRunIds.length - 1];
|
||||
emitToolExecutionStartedEvents(socket, runtime, {
|
||||
toolCallIds: lastExecutingToolCallIds,
|
||||
runId: executionRunId,
|
||||
agentId,
|
||||
conversationId,
|
||||
});
|
||||
|
||||
// Execute approved/denied tools
|
||||
const executionResults = await executeApprovalBatch(
|
||||
@@ -3788,6 +3905,12 @@ async function handleIncomingMessage(
|
||||
executionResults,
|
||||
lastExecutingToolCallIds,
|
||||
);
|
||||
emitToolExecutionFinishedEvents(socket, runtime, {
|
||||
approvals: persistedExecutionResults,
|
||||
runId: executionRunId,
|
||||
agentId,
|
||||
conversationId,
|
||||
});
|
||||
lastExecutionResults = persistedExecutionResults;
|
||||
// WS-first parity: publish tool-return terminal outcomes immediately on
|
||||
// normal approval execution, before continuation stream send.
|
||||
@@ -3850,6 +3973,12 @@ async function handleIncomingMessage(
|
||||
conversationId,
|
||||
});
|
||||
if (approvalsForEmission) {
|
||||
emitToolExecutionFinishedEvents(socket, runtime, {
|
||||
approvals: approvalsForEmission,
|
||||
runId: runtime.activeRunId || msgRunIds[msgRunIds.length - 1],
|
||||
agentId: agentId || "",
|
||||
conversationId,
|
||||
});
|
||||
emitInterruptToolReturnMessage(
|
||||
socket,
|
||||
runtime,
|
||||
|
||||
Reference in New Issue
Block a user