fix: add robustness for connection errors for approvals (#189)
Co-authored-by: Letta <noreply@letta.com>
Co-authored-by: Charles Packer <packercharles@gmail.com>
This commit is contained in:
@@ -18,11 +18,16 @@ export async function sendMessageStream(
|
||||
background?: boolean;
|
||||
// add more later: includePings, request timeouts, etc.
|
||||
} = { streamTokens: true, background: true },
|
||||
requestOptions: { maxRetries?: number } = { maxRetries: 0 },
|
||||
): Promise<Stream<LettaStreamingResponse>> {
|
||||
const client = await getClient();
|
||||
return client.agents.messages.stream(agentId, {
|
||||
messages: messages,
|
||||
stream_tokens: opts.streamTokens ?? true,
|
||||
background: opts.background ?? true,
|
||||
});
|
||||
return client.agents.messages.stream(
|
||||
agentId,
|
||||
{
|
||||
messages: messages,
|
||||
stream_tokens: opts.streamTokens ?? true,
|
||||
background: opts.background ?? true,
|
||||
},
|
||||
requestOptions,
|
||||
);
|
||||
}
|
||||
|
||||
86
src/agent/recover.ts
Normal file
86
src/agent/recover.ts
Normal file
@@ -0,0 +1,86 @@
|
||||
import type Letta from "@letta-ai/letta-client";
|
||||
import type { AgentState } from "@letta-ai/letta-client/resources/agents/agents";
|
||||
|
||||
import type { createBuffers } from "../cli/helpers/accumulator";
|
||||
import type { ApprovalRequest, DrainResult } from "../cli/helpers/stream";
|
||||
import { drainStreamWithResume } from "../cli/helpers/stream";
|
||||
import { getResumeData } from "./check-approval";
|
||||
|
||||
export async function resyncPendingApprovals(
|
||||
client: Letta,
|
||||
agent: AgentState,
|
||||
): Promise<ApprovalRequest[]> {
|
||||
const { pendingApprovals } = await getResumeData(client, agent);
|
||||
return pendingApprovals ?? [];
|
||||
}
|
||||
|
||||
export async function findNewestActiveBackgroundRunId(
|
||||
client: Letta,
|
||||
agentId: string,
|
||||
): Promise<string | null> {
|
||||
const runsPage = await client.runs.list({
|
||||
active: true,
|
||||
agent_id: agentId,
|
||||
background: true,
|
||||
limit: 10,
|
||||
});
|
||||
|
||||
const runs = runsPage.items ?? [];
|
||||
if (runs.length === 0) return null;
|
||||
|
||||
// Prefer the most recently created run.
|
||||
runs.sort((a, b) => {
|
||||
const aTs =
|
||||
Date.parse((a as { created_at?: string }).created_at ?? "") || 0;
|
||||
const bTs =
|
||||
Date.parse((b as { created_at?: string }).created_at ?? "") || 0;
|
||||
return bTs - aTs;
|
||||
});
|
||||
|
||||
return runs[0]?.id ?? null;
|
||||
}
|
||||
|
||||
// Outcome of a stale-approval recovery attempt:
//  - "pending_approval": the server still has approvals pending — re-present
//    them to the user.
//  - "relatched": reattached to an active run's stream and drained it.
//  - "noop": nothing pending and no active run found to resume.
export type StaleApprovalRecovery =
  | { kind: "pending_approval"; approvals: ApprovalRequest[] }
  | { kind: "relatched"; result: DrainResult }
  | { kind: "noop" };
|
||||
|
||||
export async function recoverFromStaleApproval(
|
||||
client: Letta,
|
||||
agentId: string,
|
||||
buffers: ReturnType<typeof createBuffers>,
|
||||
refresh: () => void,
|
||||
abortSignal: AbortSignal | undefined,
|
||||
opts: {
|
||||
lastKnownRunId?: string | null;
|
||||
lastKnownSeqId?: number | null;
|
||||
} = {},
|
||||
): Promise<StaleApprovalRecovery> {
|
||||
const agent = await client.agents.retrieve(agentId);
|
||||
const approvals = await resyncPendingApprovals(client, agent);
|
||||
if (approvals.length > 0) {
|
||||
return { kind: "pending_approval", approvals };
|
||||
}
|
||||
|
||||
const runId =
|
||||
opts.lastKnownRunId ??
|
||||
(await findNewestActiveBackgroundRunId(client, agentId));
|
||||
if (!runId) return { kind: "noop" };
|
||||
|
||||
const stream = await client.runs.messages.stream(
|
||||
runId,
|
||||
{
|
||||
starting_after: opts.lastKnownSeqId ?? undefined,
|
||||
batch_size: 1000,
|
||||
},
|
||||
{ maxRetries: 0 },
|
||||
);
|
||||
|
||||
const result = await drainStreamWithResume(
|
||||
stream,
|
||||
buffers,
|
||||
refresh,
|
||||
abortSignal,
|
||||
);
|
||||
return { kind: "relatched", result };
|
||||
}
|
||||
111
src/cli/App.tsx
111
src/cli/App.tsx
@@ -1,7 +1,7 @@
|
||||
// src/cli/App.tsx
|
||||
|
||||
import { existsSync, readFileSync, writeFileSync } from "node:fs";
|
||||
import { APIUserAbortError } from "@letta-ai/letta-client/core/error";
|
||||
import { APIError, APIUserAbortError } from "@letta-ai/letta-client/core/error";
|
||||
import type {
|
||||
AgentState,
|
||||
MessageCreate,
|
||||
@@ -20,6 +20,10 @@ import { getClient } from "../agent/client";
|
||||
import { getCurrentAgentId, setCurrentAgentId } from "../agent/context";
|
||||
import type { AgentProvenance } from "../agent/create";
|
||||
import { sendMessageStream } from "../agent/message";
|
||||
import {
|
||||
recoverFromStaleApproval,
|
||||
resyncPendingApprovals,
|
||||
} from "../agent/recover";
|
||||
import { SessionStats } from "../agent/stats";
|
||||
import type { ApprovalContext } from "../permissions/analyzer";
|
||||
import { permissionMode } from "../permissions/mode";
|
||||
@@ -834,6 +838,9 @@ export default function App({
|
||||
initialInput: Array<MessageCreate | ApprovalCreate>,
|
||||
): Promise<void> => {
|
||||
const currentInput = initialInput;
|
||||
// Track lastRunId outside the while loop so it's available in catch block
|
||||
let lastKnownRunId: string | null = null;
|
||||
let lastKnownSeqId: number | null = null;
|
||||
|
||||
try {
|
||||
// Check if user hit escape before we started
|
||||
@@ -917,14 +924,28 @@ export default function App({
|
||||
}
|
||||
};
|
||||
|
||||
const { stopReason, approval, approvals, apiDurationMs, lastRunId } =
|
||||
await drainStreamWithResume(
|
||||
stream,
|
||||
buffersRef.current,
|
||||
refreshDerivedThrottled,
|
||||
abortControllerRef.current?.signal,
|
||||
syncAgentState,
|
||||
);
|
||||
const {
|
||||
stopReason,
|
||||
approval,
|
||||
approvals,
|
||||
apiDurationMs,
|
||||
lastRunId,
|
||||
lastSeqId,
|
||||
} = await drainStreamWithResume(
|
||||
stream,
|
||||
buffersRef.current,
|
||||
refreshDerivedThrottled,
|
||||
abortControllerRef.current?.signal,
|
||||
syncAgentState,
|
||||
);
|
||||
|
||||
// Update lastKnownRunId for error handling in catch block
|
||||
if (lastRunId) {
|
||||
lastKnownRunId = lastRunId;
|
||||
}
|
||||
if (lastSeqId !== null && lastSeqId !== undefined) {
|
||||
lastKnownSeqId = lastSeqId;
|
||||
}
|
||||
|
||||
// Track API duration
|
||||
sessionStatsRef.current.endTurn(apiDurationMs);
|
||||
@@ -1412,6 +1433,76 @@ export default function App({
|
||||
return;
|
||||
}
|
||||
} catch (e) {
|
||||
// Stale approval recovery:
|
||||
// If the approval submission succeeded server-side but the client observed a stale state
|
||||
// (or a retried request), the backend returns a 400 ValueError saying no approvals are pending.
|
||||
// In that case, resync + relatch to the active run stream instead of erroring.
|
||||
if (
|
||||
e instanceof APIError &&
|
||||
e.status === 400 &&
|
||||
typeof currentInput?.[0] === "object" &&
|
||||
currentInput[0] &&
|
||||
"type" in currentInput[0] &&
|
||||
currentInput[0].type === "approval"
|
||||
) {
|
||||
const detail =
|
||||
(e.error as Record<string, unknown>)?.detail ??
|
||||
(e.error as Record<string, Record<string, unknown>>)?.error
|
||||
?.detail ??
|
||||
e.message ??
|
||||
"";
|
||||
if (
|
||||
typeof detail === "string" &&
|
||||
detail.includes(
|
||||
"Cannot process approval response: No tool call is currently awaiting approval",
|
||||
)
|
||||
) {
|
||||
try {
|
||||
const client = await getClient();
|
||||
const recovery = await recoverFromStaleApproval(
|
||||
client,
|
||||
agentId,
|
||||
buffersRef.current,
|
||||
refreshDerivedThrottled,
|
||||
abortControllerRef.current?.signal,
|
||||
{ lastKnownRunId, lastKnownSeqId },
|
||||
);
|
||||
|
||||
let approvalsToShow: ApprovalRequest[] = [];
|
||||
if (recovery.kind === "pending_approval") {
|
||||
approvalsToShow = recovery.approvals;
|
||||
} else {
|
||||
// After relatching, re-check in case a new approval request was produced.
|
||||
const agent = await client.agents.retrieve(agentId);
|
||||
approvalsToShow = await resyncPendingApprovals(client, agent);
|
||||
}
|
||||
|
||||
if (approvalsToShow.length > 0) {
|
||||
setPendingApprovals(approvalsToShow);
|
||||
const contexts = await Promise.all(
|
||||
approvalsToShow.map(async (approvalItem) => {
|
||||
const parsedArgs = safeJsonParseOr<Record<string, unknown>>(
|
||||
approvalItem.toolArgs,
|
||||
{},
|
||||
);
|
||||
return await analyzeToolApproval(
|
||||
approvalItem.toolName,
|
||||
parsedArgs,
|
||||
);
|
||||
}),
|
||||
);
|
||||
setApprovalContexts(contexts);
|
||||
}
|
||||
|
||||
setStreaming(false);
|
||||
refreshDerived();
|
||||
return;
|
||||
} catch (_recoveryError) {
|
||||
// Fall through to normal error handling if recovery fails.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Mark incomplete tool calls as cancelled to prevent stuck blinking UI
|
||||
markIncompleteToolsAsCancelled(buffersRef.current);
|
||||
|
||||
@@ -1451,11 +1542,13 @@ export default function App({
|
||||
abortControllerRef.current = null;
|
||||
}
|
||||
},
|
||||
|
||||
[
|
||||
appendError,
|
||||
refreshDerived,
|
||||
refreshDerivedThrottled,
|
||||
setStreaming,
|
||||
agentId,
|
||||
currentModelId,
|
||||
],
|
||||
);
|
||||
|
||||
@@ -17,7 +17,7 @@ export type ApprovalRequest = {
|
||||
toolArgs: string;
|
||||
};
|
||||
|
||||
type DrainResult = {
|
||||
export type DrainResult = {
|
||||
stopReason: StopReasonType;
|
||||
lastRunId?: string | null;
|
||||
lastSeqId?: number | null;
|
||||
@@ -263,10 +263,14 @@ export async function drainStreamWithResume(
|
||||
try {
|
||||
const client = await getClient();
|
||||
// Resume from Redis where we left off
|
||||
const resumeStream = await client.runs.messages.stream(result.lastRunId, {
|
||||
starting_after: result.lastSeqId,
|
||||
batch_size: 1000, // Fetch buffered chunks quickly
|
||||
});
|
||||
const resumeStream = await client.runs.messages.stream(
|
||||
result.lastRunId,
|
||||
{
|
||||
starting_after: result.lastSeqId,
|
||||
batch_size: 1000, // Fetch buffered chunks quickly
|
||||
},
|
||||
{ maxRetries: 0 },
|
||||
);
|
||||
|
||||
// Continue draining from where we left off
|
||||
// Note: Don't pass onFirstMessage again - already called in initial drain
|
||||
|
||||
Reference in New Issue
Block a user