fix: handle pre-stream approval desync errors with keep-alive recovery (#491)
Co-authored-by: Letta <noreply@letta.com>
This commit is contained in:
112
src/cli/App.tsx
112
src/cli/App.tsx
@@ -2,7 +2,7 @@
|
||||
|
||||
import { existsSync, readFileSync, writeFileSync } from "node:fs";
|
||||
|
||||
import { APIUserAbortError } from "@letta-ai/letta-client/core/error";
|
||||
import { APIError, APIUserAbortError } from "@letta-ai/letta-client/core/error";
|
||||
import type {
|
||||
AgentState,
|
||||
MessageCreate,
|
||||
@@ -1488,10 +1488,112 @@ export default function App({
|
||||
}
|
||||
|
||||
// Stream one turn - use ref to always get the latest agentId
|
||||
const stream = await sendMessageStream(
|
||||
agentIdRef.current,
|
||||
currentInput,
|
||||
);
|
||||
// Wrap in try-catch to handle pre-stream desync errors (when sendMessageStream
|
||||
// throws before streaming begins, e.g., retry after LLM error when backend
|
||||
// already cleared the approval)
|
||||
let stream: Awaited<ReturnType<typeof sendMessageStream>>;
|
||||
try {
|
||||
stream = await sendMessageStream(agentIdRef.current, currentInput);
|
||||
} catch (preStreamError) {
|
||||
// Check if this is a pre-stream approval desync error
|
||||
const hasApprovalInPayload = currentInput.some(
|
||||
(item) => item?.type === "approval",
|
||||
);
|
||||
|
||||
if (hasApprovalInPayload) {
|
||||
// Extract error detail from APIError (handles both direct and nested structures)
|
||||
// Direct: e.error.detail | Nested: e.error.error.detail (matches formatErrorDetails)
|
||||
let errorDetail = "";
|
||||
if (
|
||||
preStreamError instanceof APIError &&
|
||||
preStreamError.error &&
|
||||
typeof preStreamError.error === "object"
|
||||
) {
|
||||
const errObj = preStreamError.error as Record<string, unknown>;
|
||||
// Check nested structure first: e.error.error.detail
|
||||
if (
|
||||
errObj.error &&
|
||||
typeof errObj.error === "object" &&
|
||||
"detail" in errObj.error
|
||||
) {
|
||||
const nested = errObj.error as Record<string, unknown>;
|
||||
errorDetail =
|
||||
typeof nested.detail === "string" ? nested.detail : "";
|
||||
}
|
||||
// Fallback to direct structure: e.error.detail
|
||||
if (!errorDetail && typeof errObj.detail === "string") {
|
||||
errorDetail = errObj.detail;
|
||||
}
|
||||
}
|
||||
// Final fallback: use Error.message
|
||||
if (!errorDetail && preStreamError instanceof Error) {
|
||||
errorDetail = preStreamError.message;
|
||||
}
|
||||
|
||||
// If desync detected and retries available, recover with keep-alive prompt
|
||||
if (
|
||||
isApprovalStateDesyncError(errorDetail) &&
|
||||
llmApiErrorRetriesRef.current < LLM_API_ERROR_MAX_RETRIES
|
||||
) {
|
||||
llmApiErrorRetriesRef.current += 1;
|
||||
|
||||
// Show transient status (matches post-stream desync handler UX)
|
||||
const statusId = uid("status");
|
||||
buffersRef.current.byId.set(statusId, {
|
||||
kind: "status",
|
||||
id: statusId,
|
||||
lines: [
|
||||
"Approval state desynced; resending keep-alive recovery prompt...",
|
||||
],
|
||||
});
|
||||
buffersRef.current.order.push(statusId);
|
||||
refreshDerived();
|
||||
|
||||
// Swap payload to recovery message (or strip stale approvals)
|
||||
const isApprovalOnlyPayload =
|
||||
hasApprovalInPayload && currentInput.length === 1;
|
||||
if (isApprovalOnlyPayload) {
|
||||
currentInput.splice(
|
||||
0,
|
||||
currentInput.length,
|
||||
buildApprovalRecoveryMessage(),
|
||||
);
|
||||
} else {
|
||||
// Mixed payload: strip stale approvals, keep user message
|
||||
const messageItems = currentInput.filter(
|
||||
(item) => item?.type !== "approval",
|
||||
);
|
||||
if (messageItems.length > 0) {
|
||||
currentInput.splice(
|
||||
0,
|
||||
currentInput.length,
|
||||
...messageItems,
|
||||
);
|
||||
} else {
|
||||
currentInput.splice(
|
||||
0,
|
||||
currentInput.length,
|
||||
buildApprovalRecoveryMessage(),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Remove transient status before retry
|
||||
buffersRef.current.byId.delete(statusId);
|
||||
buffersRef.current.order = buffersRef.current.order.filter(
|
||||
(id) => id !== statusId,
|
||||
);
|
||||
refreshDerived();
|
||||
|
||||
// Reset interrupted flag so retry stream chunks are processed
|
||||
buffersRef.current.interrupted = false;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Not a recoverable desync - re-throw to outer catch
|
||||
throw preStreamError;
|
||||
}
|
||||
|
||||
// Check again after network call - user may have pressed Escape during sendMessageStream
|
||||
if (signal?.aborted) {
|
||||
|
||||
Reference in New Issue
Block a user