revert: "fix: add robustness for connection errors for approvals" (#345)

This commit is contained in:
Charles Packer
2025-12-22 14:31:21 -08:00
committed by GitHub
parent cf558b868d
commit 008233f295
4 changed files with 19 additions and 207 deletions

View File

@@ -18,16 +18,11 @@ export async function sendMessageStream(
background?: boolean;
// add more later: includePings, request timeouts, etc.
} = { streamTokens: true, background: true },
requestOptions: { maxRetries?: number } = { maxRetries: 0 },
): Promise<Stream<LettaStreamingResponse>> {
const client = await getClient();
return client.agents.messages.stream(
agentId,
{
messages: messages,
stream_tokens: opts.streamTokens ?? true,
background: opts.background ?? true,
},
requestOptions,
);
return client.agents.messages.stream(agentId, {
messages: messages,
stream_tokens: opts.streamTokens ?? true,
background: opts.background ?? true,
});
}

View File

@@ -1,86 +0,0 @@
import type Letta from "@letta-ai/letta-client";
import type { AgentState } from "@letta-ai/letta-client/resources/agents/agents";
import type { createBuffers } from "../cli/helpers/accumulator";
import type { ApprovalRequest, DrainResult } from "../cli/helpers/stream";
import { drainStreamWithResume } from "../cli/helpers/stream";
import { getResumeData } from "./check-approval";
export async function resyncPendingApprovals(
client: Letta,
agent: AgentState,
): Promise<ApprovalRequest[]> {
const { pendingApprovals } = await getResumeData(client, agent);
return pendingApprovals ?? [];
}
/**
 * Find the most recently created active background run for an agent.
 *
 * Lists up to 10 active background runs and returns the id of the one with
 * the greatest `created_at` timestamp, or `null` when no active run exists.
 * Runs with a missing or unparseable timestamp are treated as timestamp 0;
 * on ties the earliest-listed run wins (same result as a stable descending
 * sort taking the first element).
 */
export async function findNewestActiveBackgroundRunId(
  client: Letta,
  agentId: string,
): Promise<string | null> {
  const page = await client.runs.list({
    active: true,
    agent_id: agentId,
    background: true,
    limit: 10,
  });
  const candidates = page.items ?? [];
  if (candidates.length === 0) return null;
  // Date.parse yields NaN for invalid/missing input; `|| 0` collapses that
  // (and a literal epoch 0) to 0 so such runs rank last.
  const timestampOf = (run: unknown): number =>
    Date.parse((run as { created_at?: string }).created_at ?? "") || 0;
  // Single max-scan instead of sorting: strict `>` keeps the first-listed
  // run on equal timestamps, matching the original stable-sort behavior.
  let newest = candidates[0];
  for (const run of candidates.slice(1)) {
    if (timestampOf(run) > timestampOf(newest)) newest = run;
  }
  return newest?.id ?? null;
}
// Outcome of a stale-approval recovery attempt:
// - "pending_approval": approvals are still waiting server-side; caller should surface them.
// - "relatched": we re-attached to an active run's message stream and drained it.
// - "noop": nothing pending and no active background run could be found.
export type StaleApprovalRecovery =
  | { kind: "pending_approval"; approvals: ApprovalRequest[] }
  | { kind: "relatched"; result: DrainResult }
  | { kind: "noop" };

/**
 * Attempt to recover after the client observed a stale approval state.
 *
 * Strategy, in order:
 * 1. Re-fetch the agent and resync pending approvals; if any exist, report them.
 * 2. Otherwise, find a run to re-attach to — the last run id the caller saw,
 *    falling back to the newest active background run for the agent.
 * 3. Re-open that run's message stream just past the last seen sequence id and
 *    drain it into the provided buffers.
 *
 * @param client       Letta API client.
 * @param agentId      Agent whose state is being recovered.
 * @param buffers      Accumulator buffers the drained stream is written into.
 * @param refresh      Callback invoked to refresh derived UI state while draining.
 * @param abortSignal  Optional signal to cancel the drain.
 * @param opts         Last known run/sequence ids observed before the failure.
 * @returns A discriminated {@link StaleApprovalRecovery} describing what happened.
 */
export async function recoverFromStaleApproval(
  client: Letta,
  agentId: string,
  buffers: ReturnType<typeof createBuffers>,
  refresh: () => void,
  abortSignal: AbortSignal | undefined,
  opts: {
    lastKnownRunId?: string | null;
    lastKnownSeqId?: number | null;
  } = {},
): Promise<StaleApprovalRecovery> {
  // Pending approvals take priority: if the server still has some, no
  // relatching is needed.
  const agent = await client.agents.retrieve(agentId);
  const approvals = await resyncPendingApprovals(client, agent);
  if (approvals.length > 0) {
    return { kind: "pending_approval", approvals };
  }
  // Prefer the run id the caller last observed; otherwise look up the newest
  // active background run. Without any run there is nothing to re-attach to.
  const runId =
    opts.lastKnownRunId ??
    (await findNewestActiveBackgroundRunId(client, agentId));
  if (!runId) return { kind: "noop" };
  // Resume just past the last seen sequence id (undefined = from the start).
  // NOTE(review): maxRetries is pinned to 0 — presumably retry handling is
  // deliberately left to the caller here; confirm against client defaults.
  const stream = await client.runs.messages.stream(
    runId,
    {
      starting_after: opts.lastKnownSeqId ?? undefined,
      batch_size: 1000,
    },
    { maxRetries: 0 },
  );
  // Drain the re-attached stream into the UI buffers.
  const result = await drainStreamWithResume(
    stream,
    buffers,
    refresh,
    abortSignal,
  );
  return { kind: "relatched", result };
}

View File

@@ -1,7 +1,7 @@
// src/cli/App.tsx
import { existsSync, readFileSync, writeFileSync } from "node:fs";
import { APIError, APIUserAbortError } from "@letta-ai/letta-client/core/error";
import { APIUserAbortError } from "@letta-ai/letta-client/core/error";
import type {
AgentState,
MessageCreate,
@@ -20,10 +20,6 @@ import { getClient } from "../agent/client";
import { getCurrentAgentId, setCurrentAgentId } from "../agent/context";
import type { AgentProvenance } from "../agent/create";
import { sendMessageStream } from "../agent/message";
import {
recoverFromStaleApproval,
resyncPendingApprovals,
} from "../agent/recover";
import { SessionStats } from "../agent/stats";
import type { ApprovalContext } from "../permissions/analyzer";
import { permissionMode } from "../permissions/mode";
@@ -845,9 +841,6 @@ export default function App({
initialInput: Array<MessageCreate | ApprovalCreate>,
): Promise<void> => {
const currentInput = initialInput;
// Track lastRunId outside the while loop so it's available in catch block
let lastKnownRunId: string | null = null;
let lastKnownSeqId: number | null = null;
try {
// Check if user hit escape before we started
@@ -931,28 +924,14 @@ export default function App({
}
};
const {
stopReason,
approval,
approvals,
apiDurationMs,
lastRunId,
lastSeqId,
} = await drainStreamWithResume(
stream,
buffersRef.current,
refreshDerivedThrottled,
abortControllerRef.current?.signal,
syncAgentState,
);
// Update lastKnownRunId for error handling in catch block
if (lastRunId) {
lastKnownRunId = lastRunId;
}
if (lastSeqId !== null && lastSeqId !== undefined) {
lastKnownSeqId = lastSeqId;
}
const { stopReason, approval, approvals, apiDurationMs, lastRunId } =
await drainStreamWithResume(
stream,
buffersRef.current,
refreshDerivedThrottled,
abortControllerRef.current?.signal,
syncAgentState,
);
// Track API duration
sessionStatsRef.current.endTurn(apiDurationMs);
@@ -1440,76 +1419,6 @@ export default function App({
return;
}
} catch (e) {
// Stale approval recovery:
// If the approval submission succeeded server-side but the client observed a stale state
// (or a retried request), the backend returns a 400 ValueError saying no approvals are pending.
// In that case, resync + relatch to the active run stream instead of erroring.
if (
e instanceof APIError &&
e.status === 400 &&
typeof currentInput?.[0] === "object" &&
currentInput[0] &&
"type" in currentInput[0] &&
currentInput[0].type === "approval"
) {
const detail =
(e.error as Record<string, unknown>)?.detail ??
(e.error as Record<string, Record<string, unknown>>)?.error
?.detail ??
e.message ??
"";
if (
typeof detail === "string" &&
detail.includes(
"Cannot process approval response: No tool call is currently awaiting approval",
)
) {
try {
const client = await getClient();
const recovery = await recoverFromStaleApproval(
client,
agentId,
buffersRef.current,
refreshDerivedThrottled,
abortControllerRef.current?.signal,
{ lastKnownRunId, lastKnownSeqId },
);
let approvalsToShow: ApprovalRequest[] = [];
if (recovery.kind === "pending_approval") {
approvalsToShow = recovery.approvals;
} else {
// After relatching, re-check in case a new approval request was produced.
const agent = await client.agents.retrieve(agentId);
approvalsToShow = await resyncPendingApprovals(client, agent);
}
if (approvalsToShow.length > 0) {
setPendingApprovals(approvalsToShow);
const contexts = await Promise.all(
approvalsToShow.map(async (approvalItem) => {
const parsedArgs = safeJsonParseOr<Record<string, unknown>>(
approvalItem.toolArgs,
{},
);
return await analyzeToolApproval(
approvalItem.toolName,
parsedArgs,
);
}),
);
setApprovalContexts(contexts);
}
setStreaming(false);
refreshDerived();
return;
} catch (_recoveryError) {
// Fall through to normal error handling if recovery fails.
}
}
}
// Mark incomplete tool calls as cancelled to prevent stuck blinking UI
markIncompleteToolsAsCancelled(buffersRef.current);
@@ -1549,13 +1458,11 @@ export default function App({
abortControllerRef.current = null;
}
},
[
appendError,
refreshDerived,
refreshDerivedThrottled,
setStreaming,
agentId,
currentModelId,
],
);

View File

@@ -17,7 +17,7 @@ export type ApprovalRequest = {
toolArgs: string;
};
export type DrainResult = {
type DrainResult = {
stopReason: StopReasonType;
lastRunId?: string | null;
lastSeqId?: number | null;
@@ -263,14 +263,10 @@ export async function drainStreamWithResume(
try {
const client = await getClient();
// Resume from Redis where we left off
const resumeStream = await client.runs.messages.stream(
result.lastRunId,
{
starting_after: result.lastSeqId,
batch_size: 1000, // Fetch buffered chunks quickly
},
{ maxRetries: 0 },
);
const resumeStream = await client.runs.messages.stream(result.lastRunId, {
starting_after: result.lastSeqId,
batch_size: 1000, // Fetch buffered chunks quickly
});
// Continue draining from where we left off
// Note: Don't pass onFirstMessage again - already called in initial drain