feat: add approval loop and improved conversationId handling for list… (#1057)
Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
@@ -20,7 +20,10 @@ import { getResumeData } from "../agent/check-approval";
|
||||
import { getClient } from "../agent/client";
|
||||
import { sendMessageStream } from "../agent/message";
|
||||
import { createBuffers } from "../cli/helpers/accumulator";
|
||||
import { classifyApprovals } from "../cli/helpers/approvalClassification";
|
||||
import { generatePlanFilePath } from "../cli/helpers/planName";
|
||||
import { drainStreamWithResume } from "../cli/helpers/stream";
|
||||
import { permissionMode } from "../permissions/mode";
|
||||
import { settingsManager } from "../settings-manager";
|
||||
import { loadTools } from "../tools/manager";
|
||||
|
||||
@@ -70,8 +73,24 @@ interface RunStartedMessage {
|
||||
runId: string;
|
||||
}
|
||||
|
||||
type ServerMessage = PongMessage | IncomingMessage;
|
||||
type ClientMessage = PingMessage | ResultMessage | RunStartedMessage;
|
||||
/**
 * Permission modes that can be requested or reported over the listener
 * socket. Shared by the request and acknowledgment messages so the two
 * cannot drift apart.
 */
type ListenerPermissionMode =
  | "default"
  | "acceptEdits"
  | "plan"
  | "bypassPermissions";

/**
 * Incoming request asking this listener to switch its permission mode.
 */
interface ModeChangeMessage {
  type: "mode_change";
  /** Mode the listener is asked to switch to. */
  mode: ListenerPermissionMode;
}

/**
 * Outgoing acknowledgment describing the outcome of a mode change, also
 * usable to report the current mode.
 */
interface ModeChangedMessage {
  type: "mode_changed";
  /** Mode the acknowledgment refers to. */
  mode: ListenerPermissionMode;
  /** Whether the mode change was applied. */
  success: boolean;
  /** Human-readable failure reason; supplied with failure acknowledgments. */
  error?: string;
}
|
||||
|
||||
/** Every message shape accepted from the remote end of the socket. */
type ServerMessage =
  | PongMessage
  | IncomingMessage
  | ModeChangeMessage;

/** Every message shape this listener sends over the socket. */
type ClientMessage = PingMessage | ResultMessage | RunStartedMessage | ModeChangedMessage;
|
||||
|
||||
type ListenerRuntime = {
|
||||
socket: WebSocket | null;
|
||||
@@ -89,6 +108,44 @@ type ApprovalSlot =
|
||||
// Listen mode supports one active connection per process.
|
||||
let activeRuntime: ListenerRuntime | null = null;
|
||||
|
||||
/**
 * Apply a permission-mode change requested over the socket and acknowledge it.
 *
 * The requested mode is set on `permissionMode`. When the new mode is "plan"
 * and no plan file path is set yet, a fresh path is generated and stored.
 * A `mode_changed` message is then sent back: `success: true` when everything
 * above completed, or `success: false` with an error description when any
 * step threw.
 *
 * @param msg - The `mode_change` request carrying the target mode.
 * @param socket - Socket the acknowledgment is written to.
 */
function handleModeChange(msg: ModeChangeMessage, socket: WebSocket): void {
  const requestedMode = msg.mode;

  try {
    permissionMode.setMode(requestedMode);

    // Plan mode needs a plan file; only create one if none is set already.
    const planFileMissing =
      requestedMode === "plan" && !permissionMode.getPlanFilePath();
    if (planFileMissing) {
      permissionMode.setPlanFilePath(generatePlanFilePath());
    }

    // Positive acknowledgment.
    sendClientMessage(socket, {
      type: "mode_changed",
      mode: requestedMode,
      success: true,
    });

    if (process.env.DEBUG) {
      console.log(`[Listen] Mode changed to: ${requestedMode}`);
    }
  } catch (cause) {
    // Negative acknowledgment carrying the best available failure text.
    const failureText =
      cause instanceof Error ? cause.message : "Mode change failed";

    sendClientMessage(socket, {
      type: "mode_changed",
      mode: requestedMode,
      success: false,
      error: failureText,
    });

    if (process.env.DEBUG) {
      console.error("[Listen] Mode change failed:", cause);
    }
  }
}
|
||||
|
||||
const MAX_RETRY_DURATION_MS = 5 * 60 * 1000; // 5 minutes
|
||||
const INITIAL_RETRY_DELAY_MS = 1000; // 1 second
|
||||
const MAX_RETRY_DELAY_MS = 30000; // 30 seconds
|
||||
@@ -146,7 +203,11 @@ function parseServerMessage(data: WebSocket.RawData): ServerMessage | null {
|
||||
try {
|
||||
const raw = typeof data === "string" ? data : data.toString();
|
||||
const parsed = JSON.parse(raw) as { type?: string };
|
||||
if (parsed.type === "pong" || parsed.type === "message") {
|
||||
if (
|
||||
parsed.type === "pong" ||
|
||||
parsed.type === "message" ||
|
||||
parsed.type === "mode_change"
|
||||
) {
|
||||
return parsed as ServerMessage;
|
||||
}
|
||||
return null;
|
||||
@@ -338,6 +399,13 @@ async function connectWithRetry(
|
||||
runtime.hasSuccessfulConnection = true;
|
||||
opts.onConnected();
|
||||
|
||||
// Send current mode state to cloud for UI sync
|
||||
sendClientMessage(socket, {
|
||||
type: "mode_changed",
|
||||
mode: permissionMode.getMode(),
|
||||
success: true,
|
||||
});
|
||||
|
||||
runtime.heartbeatInterval = setInterval(() => {
|
||||
sendClientMessage(socket, { type: "ping" });
|
||||
}, 30000);
|
||||
@@ -345,31 +413,40 @@ async function connectWithRetry(
|
||||
|
||||
socket.on("message", (data: WebSocket.RawData) => {
|
||||
const parsed = parseServerMessage(data);
|
||||
if (!parsed || parsed.type !== "message") {
|
||||
if (!parsed) {
|
||||
return;
|
||||
}
|
||||
|
||||
runtime.messageQueue = runtime.messageQueue
|
||||
.then(async () => {
|
||||
if (runtime !== activeRuntime || runtime.intentionallyClosed) {
|
||||
return;
|
||||
}
|
||||
// Handle mode change messages immediately (not queued)
|
||||
if (parsed.type === "mode_change") {
|
||||
handleModeChange(parsed, socket);
|
||||
return;
|
||||
}
|
||||
|
||||
opts.onStatusChange?.("receiving", opts.connectionId);
|
||||
await handleIncomingMessage(
|
||||
parsed,
|
||||
socket,
|
||||
opts.onStatusChange,
|
||||
opts.connectionId,
|
||||
);
|
||||
opts.onStatusChange?.("idle", opts.connectionId);
|
||||
})
|
||||
.catch((error: unknown) => {
|
||||
if (process.env.DEBUG) {
|
||||
console.error("[Listen] Error handling queued message:", error);
|
||||
}
|
||||
opts.onStatusChange?.("idle", opts.connectionId);
|
||||
});
|
||||
// Handle incoming messages (queued for sequential processing)
|
||||
if (parsed.type === "message") {
|
||||
runtime.messageQueue = runtime.messageQueue
|
||||
.then(async () => {
|
||||
if (runtime !== activeRuntime || runtime.intentionallyClosed) {
|
||||
return;
|
||||
}
|
||||
|
||||
opts.onStatusChange?.("receiving", opts.connectionId);
|
||||
await handleIncomingMessage(
|
||||
parsed,
|
||||
socket,
|
||||
opts.onStatusChange,
|
||||
opts.connectionId,
|
||||
);
|
||||
opts.onStatusChange?.("idle", opts.connectionId);
|
||||
})
|
||||
.catch((error: unknown) => {
|
||||
if (process.env.DEBUG) {
|
||||
console.error("[Listen] Error handling queued message:", error);
|
||||
}
|
||||
opts.onStatusChange?.("idle", opts.connectionId);
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
socket.on("close", (code: number, reason: Buffer) => {
|
||||
@@ -427,13 +504,25 @@ async function handleIncomingMessage(
|
||||
): Promise<void> {
|
||||
try {
|
||||
const agentId = msg.agentId;
|
||||
const requestedConversationId = msg.conversationId;
|
||||
// requestedConversationId can be:
|
||||
// - undefined: no conversation (use agent endpoint)
|
||||
// - null: no conversation (use agent endpoint)
|
||||
// - string: specific conversation ID (use conversations endpoint)
|
||||
const requestedConversationId = msg.conversationId || undefined;
|
||||
|
||||
// For sendMessageStream: "default" means use agent endpoint, else use conversations endpoint
|
||||
const conversationId = requestedConversationId ?? "default";
|
||||
|
||||
if (!agentId) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (process.env.DEBUG) {
|
||||
console.log(
|
||||
`[Listen] Handling message: agentId=${agentId}, requestedConversationId=${requestedConversationId}, conversationId=${conversationId}`,
|
||||
);
|
||||
}
|
||||
|
||||
if (connectionId) {
|
||||
onStatusChange?.("processing", connectionId);
|
||||
}
|
||||
@@ -503,38 +592,150 @@ async function handleIncomingMessage(
|
||||
});
|
||||
|
||||
let runIdSent = false;
|
||||
|
||||
const buffers = createBuffers(agentId);
|
||||
const result = await drainStreamWithResume(
|
||||
stream as Stream<LettaStreamingResponse>,
|
||||
buffers,
|
||||
() => {},
|
||||
undefined,
|
||||
undefined,
|
||||
({ chunk }) => {
|
||||
const maybeRunId = (chunk as { run_id?: unknown }).run_id;
|
||||
if (!runIdSent && typeof maybeRunId === "string") {
|
||||
runIdSent = true;
|
||||
sendClientMessage(socket, {
|
||||
type: "run_started",
|
||||
runId: maybeRunId,
|
||||
});
|
||||
}
|
||||
return undefined;
|
||||
},
|
||||
);
|
||||
|
||||
sendClientMessage(socket, {
|
||||
type: "result",
|
||||
success: result.stopReason === "end_turn",
|
||||
stopReason: result.stopReason,
|
||||
});
|
||||
} catch {
|
||||
// Approval loop: continue until end_turn or error
|
||||
// eslint-disable-next-line no-constant-condition
|
||||
while (true) {
|
||||
const result = await drainStreamWithResume(
|
||||
stream as Stream<LettaStreamingResponse>,
|
||||
buffers,
|
||||
() => {},
|
||||
undefined,
|
||||
undefined,
|
||||
({ chunk }) => {
|
||||
const maybeRunId = (chunk as { run_id?: unknown }).run_id;
|
||||
if (!runIdSent && typeof maybeRunId === "string") {
|
||||
runIdSent = true;
|
||||
sendClientMessage(socket, {
|
||||
type: "run_started",
|
||||
runId: maybeRunId,
|
||||
});
|
||||
}
|
||||
return undefined;
|
||||
},
|
||||
);
|
||||
|
||||
const stopReason = result.stopReason;
|
||||
const approvals = result.approvals || [];
|
||||
|
||||
// Case 1: Turn ended normally
|
||||
if (stopReason === "end_turn") {
|
||||
sendClientMessage(socket, {
|
||||
type: "result",
|
||||
success: true,
|
||||
stopReason: "end_turn",
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
||||
// Case 2: Error or cancelled
|
||||
if (stopReason !== "requires_approval") {
|
||||
sendClientMessage(socket, {
|
||||
type: "result",
|
||||
success: false,
|
||||
stopReason,
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
||||
// Case 3: Requires approval - classify and handle based on permission mode
|
||||
if (approvals.length === 0) {
|
||||
// Unexpected: requires_approval but no approvals
|
||||
sendClientMessage(socket, {
|
||||
type: "result",
|
||||
success: false,
|
||||
stopReason: "error",
|
||||
});
|
||||
break;
|
||||
}
|
||||
|
||||
// Classify approvals (auto-allow, auto-deny, needs user input)
|
||||
// Don't treat "ask" as deny - cloud UI can handle approvals
|
||||
const { autoAllowed, autoDenied, needsUserInput } =
|
||||
await classifyApprovals(approvals, {
|
||||
treatAskAsDeny: false, // Let cloud UI handle approvals
|
||||
requireArgsForAutoApprove: true,
|
||||
});
|
||||
|
||||
// If there are approvals that need user input, pause execution
|
||||
// Cloud UI will see pending approvals via /v1/runs/:runId/stream from core
|
||||
// and show approval dialog. When user approves, cloud sends approval message
|
||||
// back to this device, which resumes execution.
|
||||
if (needsUserInput.length > 0) {
|
||||
sendClientMessage(socket, {
|
||||
type: "result",
|
||||
success: false,
|
||||
stopReason: "requires_approval",
|
||||
});
|
||||
break; // Exit loop - cloud will send approval message when user approves
|
||||
}
|
||||
|
||||
// Only auto-allowed and auto-denied tools remain
|
||||
// Build decisions list
|
||||
type Decision =
|
||||
| {
|
||||
type: "approve";
|
||||
approval: {
|
||||
toolCallId: string;
|
||||
toolName: string;
|
||||
toolArgs: string;
|
||||
};
|
||||
}
|
||||
| {
|
||||
type: "deny";
|
||||
approval: {
|
||||
toolCallId: string;
|
||||
toolName: string;
|
||||
toolArgs: string;
|
||||
};
|
||||
reason: string;
|
||||
};
|
||||
|
||||
const decisions: Decision[] = [
|
||||
...autoAllowed.map((ac) => ({
|
||||
type: "approve" as const,
|
||||
approval: ac.approval,
|
||||
})),
|
||||
...autoDenied.map((ac) => ({
|
||||
type: "deny" as const,
|
||||
approval: ac.approval,
|
||||
reason: ac.denyReason || ac.permission.reason || "Permission denied",
|
||||
})),
|
||||
];
|
||||
|
||||
// Execute approved/denied tools
|
||||
const executionResults = await executeApprovalBatch(decisions);
|
||||
|
||||
// Send approval message back to agent to continue execution
|
||||
const approvalStream = await sendMessageStream(
|
||||
conversationId,
|
||||
[
|
||||
{
|
||||
type: "approval",
|
||||
approvals: executionResults,
|
||||
},
|
||||
],
|
||||
{
|
||||
agentId,
|
||||
streamTokens: true,
|
||||
background: true,
|
||||
},
|
||||
);
|
||||
|
||||
// Replace stream with approval stream for next iteration
|
||||
Object.assign(stream, approvalStream);
|
||||
}
|
||||
} catch (error) {
|
||||
sendClientMessage(socket, {
|
||||
type: "result",
|
||||
success: false,
|
||||
stopReason: "error",
|
||||
});
|
||||
|
||||
if (process.env.DEBUG) {
|
||||
console.error("[Listen] Error handling message:", error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user