fix: handle possible case of desync on pending approvals (#53)
This commit is contained in:
@@ -2,6 +2,7 @@
|
|||||||
// Check for pending approvals and retrieve recent message history when resuming an agent
|
// Check for pending approvals and retrieve recent message history when resuming an agent
|
||||||
|
|
||||||
import type Letta from "@letta-ai/letta-client";
|
import type Letta from "@letta-ai/letta-client";
|
||||||
|
import type { AgentState } from "@letta-ai/letta-client/resources/agents/agents";
|
||||||
import type { LettaMessageUnion } from "@letta-ai/letta-client/resources/agents/messages";
|
import type { LettaMessageUnion } from "@letta-ai/letta-client/resources/agents/messages";
|
||||||
import type { ApprovalRequest } from "../cli/helpers/stream";
|
import type { ApprovalRequest } from "../cli/helpers/stream";
|
||||||
|
|
||||||
@@ -18,34 +19,98 @@ export interface ResumeData {
|
|||||||
* Checks for pending approvals and retrieves recent message history for backfill.
|
* Checks for pending approvals and retrieves recent message history for backfill.
|
||||||
*
|
*
|
||||||
* @param client - The Letta client
|
* @param client - The Letta client
|
||||||
* @param agentId - The agent ID
|
* @param agent - The agent state (includes in-context messages)
|
||||||
* @returns Pending approval (if any) and recent message history
|
* @returns Pending approval (if any) and recent message history
|
||||||
*/
|
*/
|
||||||
export async function getResumeData(
|
export async function getResumeData(
|
||||||
client: Letta,
|
client: Letta,
|
||||||
agentId: string,
|
agent: AgentState,
|
||||||
): Promise<ResumeData> {
|
): Promise<ResumeData> {
|
||||||
try {
|
try {
|
||||||
const messagesPage = await client.agents.messages.list(agentId);
|
const messagesPage = await client.agents.messages.list(agent.id);
|
||||||
const messages = messagesPage.items;
|
const messages = messagesPage.items;
|
||||||
if (!messages || messages.length === 0) {
|
if (!messages || messages.length === 0) {
|
||||||
return { pendingApproval: null, messageHistory: [] };
|
return { pendingApproval: null, messageHistory: [] };
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check for pending approval (last message)
|
// Compare cursor last message with in-context last message ID
|
||||||
|
// The backend uses in-context messages for CONFLICT validation, so if they're
|
||||||
|
// desynced, we need to check the in-context message for pending approvals
|
||||||
|
const cursorLastMessage = messages[messages.length - 1];
|
||||||
|
if (!cursorLastMessage) {
|
||||||
|
return { pendingApproval: null, messageHistory: [] };
|
||||||
|
}
|
||||||
|
|
||||||
|
const inContextLastMessageId =
|
||||||
|
agent.message_ids && agent.message_ids.length > 0
|
||||||
|
? agent.message_ids[agent.message_ids.length - 1]
|
||||||
|
: null;
|
||||||
|
|
||||||
|
let messageToCheck = cursorLastMessage;
|
||||||
|
|
||||||
|
// If there's a desync, find the in-context message in the cursor fetch
|
||||||
|
if (
|
||||||
|
inContextLastMessageId &&
|
||||||
|
cursorLastMessage.id !== inContextLastMessageId
|
||||||
|
) {
|
||||||
|
console.warn(
|
||||||
|
`[check-approval] Desync detected - cursor last: ${cursorLastMessage.id}, in-context last: ${inContextLastMessageId}`,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Search for the in-context message in the fetched messages
|
||||||
|
// NOTE: There might be multiple messages with the same ID (duplicates)
|
||||||
|
// We want the one with role === "approval" if it exists
|
||||||
|
const matchingMessages = messages.filter(
|
||||||
|
(msg) => msg.id === inContextLastMessageId,
|
||||||
|
);
|
||||||
|
|
||||||
|
if (matchingMessages.length > 0) {
|
||||||
|
// Prefer the approval request message if it exists (duplicates can have different types)
|
||||||
|
const approvalMessage = matchingMessages.find(
|
||||||
|
(msg) => msg.message_type === "approval_request_message",
|
||||||
|
);
|
||||||
|
const inContextMessage =
|
||||||
|
approvalMessage || matchingMessages[matchingMessages.length - 1]!;
|
||||||
|
|
||||||
|
messageToCheck = inContextMessage;
|
||||||
|
} else {
|
||||||
|
console.warn(
|
||||||
|
`[check-approval] In-context message ${inContextLastMessageId} not found in cursor fetch.\n` +
|
||||||
|
` This likely means the in-context message is older than the cursor window.\n` +
|
||||||
|
` Falling back to cursor message - approval state may be incorrect.`,
|
||||||
|
);
|
||||||
|
// Fall back to cursor message if we can't find the in-context one
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for pending approval using SDK types
|
||||||
let pendingApproval: ApprovalRequest | null = null;
|
let pendingApproval: ApprovalRequest | null = null;
|
||||||
const lastMessage = messages[messages.length - 1];
|
|
||||||
if (lastMessage?.message_type === "approval_request_message") {
|
if (messageToCheck.message_type === "approval_request_message") {
|
||||||
|
// Cast to access tool_calls with proper typing
|
||||||
|
const approvalMsg = messageToCheck as LettaMessageUnion & {
|
||||||
|
tool_calls?: Array<{
|
||||||
|
tool_call_id?: string;
|
||||||
|
name?: string;
|
||||||
|
arguments?: string;
|
||||||
|
}>;
|
||||||
|
tool_call?: {
|
||||||
|
tool_call_id?: string;
|
||||||
|
name?: string;
|
||||||
|
arguments?: string;
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
// Use tool_calls array (new) or fallback to tool_call (deprecated)
|
// Use tool_calls array (new) or fallback to tool_call (deprecated)
|
||||||
const toolCalls = Array.isArray(lastMessage.tool_calls)
|
const toolCalls = Array.isArray(approvalMsg.tool_calls)
|
||||||
? lastMessage.tool_calls
|
? approvalMsg.tool_calls
|
||||||
: lastMessage.tool_call
|
: approvalMsg.tool_call
|
||||||
? [lastMessage.tool_call]
|
? [approvalMsg.tool_call]
|
||||||
: [];
|
: [];
|
||||||
|
|
||||||
if (toolCalls.length > 0) {
|
if (toolCalls.length > 0) {
|
||||||
const toolCall = toolCalls[0];
|
const toolCall = toolCalls[0];
|
||||||
// Ensure all required fields are present (type guard for ToolCall vs ToolCallDelta)
|
// Ensure all required fields are present
|
||||||
if (toolCall?.tool_call_id && toolCall.name && toolCall.arguments) {
|
if (toolCall?.tool_call_id && toolCall.name && toolCall.arguments) {
|
||||||
pendingApproval = {
|
pendingApproval = {
|
||||||
toolCallId: toolCall.tool_call_id,
|
toolCallId: toolCall.tool_call_id,
|
||||||
@@ -56,7 +121,7 @@ export async function getResumeData(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get last N messages for backfill
|
// Get last N messages for backfill (always use cursor messages for history)
|
||||||
const historyCount = Math.min(MESSAGE_HISTORY_LIMIT, messages.length);
|
const historyCount = Math.min(MESSAGE_HISTORY_LIMIT, messages.length);
|
||||||
let messageHistory = messages.slice(-historyCount);
|
let messageHistory = messages.slice(-historyCount);
|
||||||
|
|
||||||
|
|||||||
@@ -394,12 +394,13 @@ export default function App({
|
|||||||
while (true) {
|
while (true) {
|
||||||
// Stream one turn
|
// Stream one turn
|
||||||
const stream = await sendMessageStream(agentId, currentInput);
|
const stream = await sendMessageStream(agentId, currentInput);
|
||||||
const { stopReason, approval, apiDurationMs } = await drainStream(
|
const { stopReason, approval, apiDurationMs, lastRunId } =
|
||||||
stream,
|
await drainStream(
|
||||||
buffersRef.current,
|
stream,
|
||||||
refreshDerivedThrottled,
|
buffersRef.current,
|
||||||
abortControllerRef.current.signal,
|
refreshDerivedThrottled,
|
||||||
);
|
abortControllerRef.current.signal,
|
||||||
|
);
|
||||||
|
|
||||||
// Track API duration
|
// Track API duration
|
||||||
sessionStatsRef.current.endTurn(apiDurationMs);
|
sessionStatsRef.current.endTurn(apiDurationMs);
|
||||||
@@ -520,17 +521,39 @@ export default function App({
|
|||||||
continue; // Loop continues naturally
|
continue; // Loop continues naturally
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: for error stop reasons, fetch step details
|
|
||||||
// using lastRunId to get full error message from step.errorData
|
|
||||||
// Example: client.runs.steps.list(lastRunId, { limit: 1, order: "desc" })
|
|
||||||
// Then display step.errorData.message or full error details instead of generic message
|
|
||||||
|
|
||||||
// Unexpected stop reason (error, llm_api_error, etc.)
|
// Unexpected stop reason (error, llm_api_error, etc.)
|
||||||
// Mark incomplete tool calls as finished to prevent stuck blinking UI
|
// Mark incomplete tool calls as finished to prevent stuck blinking UI
|
||||||
markIncompleteToolsAsCancelled(buffersRef.current);
|
markIncompleteToolsAsCancelled(buffersRef.current);
|
||||||
|
|
||||||
// Show stop reason (mid-stream errors should already be in buffers)
|
// Fetch error details from the run if available
|
||||||
appendError(`Unexpected stop reason: ${stopReason}`);
|
let errorDetails = `Unexpected stop reason: ${stopReason}`;
|
||||||
|
if (lastRunId) {
|
||||||
|
try {
|
||||||
|
const client = await getClient();
|
||||||
|
const run = await client.runs.retrieve(lastRunId);
|
||||||
|
|
||||||
|
// Check if run has error information in metadata
|
||||||
|
if (run.metadata?.error) {
|
||||||
|
const error = run.metadata.error as {
|
||||||
|
type?: string;
|
||||||
|
message?: string;
|
||||||
|
detail?: string;
|
||||||
|
};
|
||||||
|
const errorType = error.type ? `[${error.type}] ` : "";
|
||||||
|
const errorMessage = error.message || "An error occurred";
|
||||||
|
const errorDetail = error.detail ? `\n${error.detail}` : "";
|
||||||
|
errorDetails = `${errorType}${errorMessage}${errorDetail}`;
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
// If we can't fetch error details, let user know
|
||||||
|
appendError(
|
||||||
|
`${errorDetails}\n(Unable to fetch additional error details from server)`,
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
appendError(errorDetails);
|
||||||
|
|
||||||
setStreaming(false);
|
setStreaming(false);
|
||||||
refreshDerived();
|
refreshDerived();
|
||||||
@@ -889,9 +912,11 @@ export default function App({
|
|||||||
if (CHECK_PENDING_APPROVALS_BEFORE_SEND) {
|
if (CHECK_PENDING_APPROVALS_BEFORE_SEND) {
|
||||||
try {
|
try {
|
||||||
const client = await getClient();
|
const client = await getClient();
|
||||||
|
// Fetch fresh agent state to check for pending approvals with accurate in-context messages
|
||||||
|
const agent = await client.agents.retrieve(agentId);
|
||||||
const { pendingApproval: existingApproval } = await getResumeData(
|
const { pendingApproval: existingApproval } = await getResumeData(
|
||||||
client,
|
client,
|
||||||
agentId,
|
agent,
|
||||||
);
|
);
|
||||||
|
|
||||||
if (existingApproval) {
|
if (existingApproval) {
|
||||||
|
|||||||
@@ -146,7 +146,9 @@ export async function handleHeadlessCommand(argv: string[], model?: string) {
|
|||||||
const resolveAllPendingApprovals = async () => {
|
const resolveAllPendingApprovals = async () => {
|
||||||
const { getResumeData } = await import("./agent/check-approval");
|
const { getResumeData } = await import("./agent/check-approval");
|
||||||
while (true) {
|
while (true) {
|
||||||
const resume = await getResumeData(client, agent.id);
|
// Re-fetch agent to get latest in-context messages (source of truth for backend)
|
||||||
|
const freshAgent = await client.agents.retrieve(agent.id);
|
||||||
|
const resume = await getResumeData(client, freshAgent);
|
||||||
if (!resume.pendingApproval) break;
|
if (!resume.pendingApproval) break;
|
||||||
const { toolCallId, toolName, toolArgs } = resume.pendingApproval;
|
const { toolCallId, toolName, toolArgs } = resume.pendingApproval;
|
||||||
const parsedArgs = safeJsonParseOr<Record<string, unknown>>(
|
const parsedArgs = safeJsonParseOr<Record<string, unknown>>(
|
||||||
@@ -248,6 +250,7 @@ export async function handleHeadlessCommand(argv: string[], model?: string) {
|
|||||||
toolArgs: string;
|
toolArgs: string;
|
||||||
} | null = null;
|
} | null = null;
|
||||||
let apiDurationMs: number;
|
let apiDurationMs: number;
|
||||||
|
let lastRunId: string | null = null;
|
||||||
|
|
||||||
if (outputFormat === "stream-json") {
|
if (outputFormat === "stream-json") {
|
||||||
const startTime = performance.now();
|
const startTime = performance.now();
|
||||||
@@ -435,6 +438,8 @@ export async function handleHeadlessCommand(argv: string[], model?: string) {
|
|||||||
|
|
||||||
stopReason = lastStopReason || "error";
|
stopReason = lastStopReason || "error";
|
||||||
apiDurationMs = performance.now() - startTime;
|
apiDurationMs = performance.now() - startTime;
|
||||||
|
// Use the last run_id we saw (if any)
|
||||||
|
lastRunId = runIds.size > 0 ? Array.from(runIds).pop() || null : null;
|
||||||
|
|
||||||
// Mark final line as finished
|
// Mark final line as finished
|
||||||
const { markCurrentLineAsFinished } = await import(
|
const { markCurrentLineAsFinished } = await import(
|
||||||
@@ -451,6 +456,7 @@ export async function handleHeadlessCommand(argv: string[], model?: string) {
|
|||||||
stopReason = result.stopReason;
|
stopReason = result.stopReason;
|
||||||
approval = result.approval || null;
|
approval = result.approval || null;
|
||||||
apiDurationMs = result.apiDurationMs;
|
apiDurationMs = result.apiDurationMs;
|
||||||
|
lastRunId = result.lastRunId || null;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Track API duration for this stream
|
// Track API duration for this stream
|
||||||
@@ -556,11 +562,32 @@ export async function handleHeadlessCommand(argv: string[], model?: string) {
|
|||||||
.map((line) => ("text" in line ? line.text : ""))
|
.map((line) => ("text" in line ? line.text : ""))
|
||||||
.filter(Boolean);
|
.filter(Boolean);
|
||||||
|
|
||||||
const errorMessage =
|
let errorMessage =
|
||||||
errorMessages.length > 0
|
errorMessages.length > 0
|
||||||
? errorMessages.join("; ")
|
? errorMessages.join("; ")
|
||||||
: `Unexpected stop reason: ${stopReason}`;
|
: `Unexpected stop reason: ${stopReason}`;
|
||||||
|
|
||||||
|
// Fetch detailed error from run metadata if available
|
||||||
|
if (lastRunId && errorMessages.length === 0) {
|
||||||
|
try {
|
||||||
|
const run = await client.runs.retrieve(lastRunId);
|
||||||
|
if (run.metadata?.error) {
|
||||||
|
const error = run.metadata.error as {
|
||||||
|
type?: string;
|
||||||
|
message?: string;
|
||||||
|
detail?: string;
|
||||||
|
};
|
||||||
|
const errorType = error.type ? `[${error.type}] ` : "";
|
||||||
|
const errorMsg = error.message || "An error occurred";
|
||||||
|
const errorDetail = error.detail ? `: ${error.detail}` : "";
|
||||||
|
errorMessage = `${errorType}${errorMsg}${errorDetail}`;
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
// If we can't fetch error details, append note to error message
|
||||||
|
errorMessage = `${errorMessage}\n(Unable to fetch additional error details from server)`;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (outputFormat === "stream-json") {
|
if (outputFormat === "stream-json") {
|
||||||
// Emit error event
|
// Emit error event
|
||||||
console.log(
|
console.log(
|
||||||
|
|||||||
@@ -365,7 +365,7 @@ async function main() {
|
|||||||
// Get resume data (pending approval + message history) if resuming
|
// Get resume data (pending approval + message history) if resuming
|
||||||
if (resuming) {
|
if (resuming) {
|
||||||
setLoadingState("checking");
|
setLoadingState("checking");
|
||||||
const data = await getResumeData(client, agent.id);
|
const data = await getResumeData(client, agent);
|
||||||
setResumeData(data);
|
setResumeData(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user