diff --git a/src/agent/message.ts b/src/agent/message.ts
index 47f5d7e..477a4be 100644
--- a/src/agent/message.ts
+++ b/src/agent/message.ts
@@ -18,11 +18,16 @@ export async function sendMessageStream(
     background?: boolean;
     // add more later: includePings, request timeouts, etc.
   } = { streamTokens: true, background: true },
+  requestOptions: { maxRetries?: number } = { maxRetries: 0 },
 ): Promise> {
   const client = await getClient();
-  return client.agents.messages.stream(agentId, {
-    messages: messages,
-    stream_tokens: opts.streamTokens ?? true,
-    background: opts.background ?? true,
-  });
+  return client.agents.messages.stream(
+    agentId,
+    {
+      messages: messages,
+      stream_tokens: opts.streamTokens ?? true,
+      background: opts.background ?? true,
+    },
+    requestOptions,
+  );
 }
diff --git a/src/agent/recover.ts b/src/agent/recover.ts
new file mode 100644
index 0000000..0ad23d5
--- /dev/null
+++ b/src/agent/recover.ts
@@ -0,0 +1,121 @@
+import type Letta from "@letta-ai/letta-client";
+import type { AgentState } from "@letta-ai/letta-client/resources/agents/agents";
+
+import type { createBuffers } from "../cli/helpers/accumulator";
+import type { ApprovalRequest, DrainResult } from "../cli/helpers/stream";
+import { drainStreamWithResume } from "../cli/helpers/stream";
+import { getResumeData } from "./check-approval";
+
+/**
+ * Reads the agent's resume data and returns its pending approval requests.
+ *
+ * @returns The pending approvals, or an empty array when none are reported.
+ */
+export async function resyncPendingApprovals(
+  client: Letta,
+  agent: AgentState,
+): Promise<ApprovalRequest[]> {
+  const { pendingApprovals } = await getResumeData(client, agent);
+  return pendingApprovals ?? [];
+}
+
+/**
+ * Lists the agent's active background runs (first page, at most 10) and picks
+ * the one with the latest `created_at`.
+ *
+ * @returns That run's id, or `null` when no active background run is listed.
+ */
+export async function findNewestActiveBackgroundRunId(
+  client: Letta,
+  agentId: string,
+): Promise<string | null> {
+  const runsPage = await client.runs.list({
+    active: true,
+    agent_id: agentId,
+    background: true,
+    limit: 10,
+  });
+
+  const runs = runsPage.items ?? [];
+
+  // Single pass instead of an in-place sort: `runsPage.items` is not mutated
+  // and each timestamp is parsed once. Strict `>` keeps the first-listed run
+  // on ties, matching what a stable descending sort would put first.
+  let newest: (typeof runs)[number] | undefined;
+  let newestTs = Number.NEGATIVE_INFINITY;
+  for (const run of runs) {
+    // Missing or unparseable timestamps rank as 0.
+    const ts =
+      Date.parse((run as { created_at?: string }).created_at ?? "") || 0;
+    if (ts > newestTs) {
+      newest = run;
+      newestTs = ts;
+    }
+  }
+
+  return newest?.id ?? null;
+}
+
+/** Outcome of {@link recoverFromStaleApproval}. */
+export type StaleApprovalRecovery =
+  | { kind: "pending_approval"; approvals: ApprovalRequest[] }
+  | { kind: "relatched"; result: DrainResult }
+  | { kind: "noop" };
+
+/**
+ * Recovers after an approval response was rejected because nothing is awaiting
+ * approval any more.
+ *
+ * - If the agent still reports pending approvals, returns them.
+ * - Otherwise re-attaches to a run (`opts.lastKnownRunId` if set, else the
+ *   newest active background run) and drains it via `drainStreamWithResume`.
+ * - If there is no run to attach to, returns `{ kind: "noop" }`.
+ *
+ * `refresh` and `abortSignal` are passed straight to `drainStreamWithResume`.
+ * `opts.lastKnownSeqId` is used as the resume cursor only when the stream is
+ * opened on `opts.lastKnownRunId` itself.
+ */
+export async function recoverFromStaleApproval(
+  client: Letta,
+  agentId: string,
+  buffers: ReturnType<typeof createBuffers>,
+  refresh: () => void,
+  abortSignal: AbortSignal | undefined,
+  opts: {
+    lastKnownRunId?: string | null;
+    lastKnownSeqId?: number | null;
+  } = {},
+): Promise<StaleApprovalRecovery> {
+  const agent = await client.agents.retrieve(agentId);
+  const approvals = await resyncPendingApprovals(client, agent);
+  if (approvals.length > 0) {
+    return { kind: "pending_approval", approvals };
+  }
+
+  const knownRunId = opts.lastKnownRunId ?? null;
+  const runId =
+    knownRunId ?? (await findNewestActiveBackgroundRunId(client, agentId));
+  if (!runId) return { kind: "noop" };
+
+  // A sequence cursor recorded on one run must not be replayed against a
+  // different run found by the fallback lookup; start that run from the top.
+  const startingAfter =
+    runId === knownRunId ? (opts.lastKnownSeqId ?? undefined) : undefined;
+
+  const stream = await client.runs.messages.stream(
+    runId,
+    {
+      starting_after: startingAfter,
+      batch_size: 1000,
+    },
+    { maxRetries: 0 },
+  );
+
+  const result = await drainStreamWithResume(
+    stream,
+    buffers,
+    refresh,
+    abortSignal,
+  );
+  return { kind: "relatched", result };
+}
diff --git a/src/cli/App.tsx b/src/cli/App.tsx
index 70146ec..a9ddd3a 100644
--- a/src/cli/App.tsx
+++ b/src/cli/App.tsx
@@ -1,7 +1,7 @@
 // src/cli/App.tsx

 import { existsSync, readFileSync, writeFileSync } from "node:fs";
-import { APIUserAbortError } from "@letta-ai/letta-client/core/error";
+import { APIError, APIUserAbortError } from "@letta-ai/letta-client/core/error";
 import type {
   AgentState,
   MessageCreate,
@@ -20,6 +20,10 @@ import { getClient } from "../agent/client";
 import { getCurrentAgentId, setCurrentAgentId } from "../agent/context";
 import type { AgentProvenance } from "../agent/create";
 import { sendMessageStream } from "../agent/message";
+import {
+  recoverFromStaleApproval,
+  resyncPendingApprovals,
+} from "../agent/recover";
 import { SessionStats } from "../agent/stats";
 import type { ApprovalContext } from "../permissions/analyzer";
 import { permissionMode } from "../permissions/mode";
@@ -834,6 +838,9 @@ export default function App({
       initialInput: Array,
     ): Promise => {
       const currentInput = initialInput;
+      // Track lastRunId outside the while loop so it's available in catch block
+      let lastKnownRunId: string | null = null;
+      let lastKnownSeqId: number | null = null;

       try {
         // Check if user hit escape before we started
@@ -917,14 +924,28 @@ export default function App({
             }
           };

-          const { stopReason, approval, approvals, apiDurationMs, lastRunId } =
-            await drainStreamWithResume(
-              stream,
-              buffersRef.current,
-              refreshDerivedThrottled,
-              abortControllerRef.current?.signal,
-              syncAgentState,
-            );
+          const {
+            stopReason,
+            approval,
+            approvals,
+            apiDurationMs,
+            lastRunId,
+            lastSeqId,
+          } = await drainStreamWithResume(
+            stream,
+            buffersRef.current,
+            refreshDerivedThrottled,
+            abortControllerRef.current?.signal,
+            syncAgentState,
+          );
+
+          // Update lastKnownRunId for error handling in catch block
+          if (lastRunId) {
+            lastKnownRunId = lastRunId;
+          }
+          if (lastSeqId !== null && lastSeqId !== undefined) {
+            lastKnownSeqId = lastSeqId;
+          }

           // Track API duration
           sessionStatsRef.current.endTurn(apiDurationMs);
@@ -1412,6 +1433,76 @@ export default function App({
           return;
         }
       } catch (e) {
+        // Stale approval recovery:
+        // If the approval submission succeeded server-side but the client observed a stale state
+        // (or a retried request), the backend returns a 400 ValueError saying no approvals are pending.
+        // In that case, resync + relatch to the active run stream instead of erroring.
+        if (
+          e instanceof APIError &&
+          e.status === 400 &&
+          typeof currentInput?.[0] === "object" &&
+          currentInput[0] &&
+          "type" in currentInput[0] &&
+          currentInput[0].type === "approval"
+        ) {
+          const detail =
+            (e.error as Record)?.detail ??
+            (e.error as Record>)?.error
+              ?.detail ??
+            e.message ??
+            "";
+          if (
+            typeof detail === "string" &&
+            detail.includes(
+              "Cannot process approval response: No tool call is currently awaiting approval",
+            )
+          ) {
+            try {
+              const client = await getClient();
+              const recovery = await recoverFromStaleApproval(
+                client,
+                agentId,
+                buffersRef.current,
+                refreshDerivedThrottled,
+                abortControllerRef.current?.signal,
+                { lastKnownRunId, lastKnownSeqId },
+              );
+
+              let approvalsToShow: ApprovalRequest[] = [];
+              if (recovery.kind === "pending_approval") {
+                approvalsToShow = recovery.approvals;
+              } else {
+                // After relatching, re-check in case a new approval request was produced.
+                const agent = await client.agents.retrieve(agentId);
+                approvalsToShow = await resyncPendingApprovals(client, agent);
+              }
+
+              if (approvalsToShow.length > 0) {
+                setPendingApprovals(approvalsToShow);
+                const contexts = await Promise.all(
+                  approvalsToShow.map(async (approvalItem) => {
+                    const parsedArgs = safeJsonParseOr>(
+                      approvalItem.toolArgs,
+                      {},
+                    );
+                    return await analyzeToolApproval(
+                      approvalItem.toolName,
+                      parsedArgs,
+                    );
+                  }),
+                );
+                setApprovalContexts(contexts);
+              }
+
+              setStreaming(false);
+              refreshDerived();
+              return;
+            } catch (_recoveryError) {
+              // Fall through to normal error handling if recovery fails.
+            }
+          }
+        }
+
         // Mark incomplete tool calls as cancelled to prevent stuck blinking UI
         markIncompleteToolsAsCancelled(buffersRef.current);

@@ -1451,11 +1542,13 @@ export default function App({
         abortControllerRef.current = null;
       }
     },
+
     [
       appendError,
       refreshDerived,
       refreshDerivedThrottled,
       setStreaming,
+      agentId,
       currentModelId,
     ],
   );
diff --git a/src/cli/helpers/stream.ts b/src/cli/helpers/stream.ts
index d61a3d3..7abe414 100644
--- a/src/cli/helpers/stream.ts
+++ b/src/cli/helpers/stream.ts
@@ -17,7 +17,7 @@ export type ApprovalRequest = {
   toolArgs: string;
 };

-type DrainResult = {
+export type DrainResult = {
   stopReason: StopReasonType;
   lastRunId?: string | null;
   lastSeqId?: number | null;
@@ -263,10 +263,14 @@ export async function drainStreamWithResume(
     try {
       const client = await getClient();
       // Resume from Redis where we left off
-      const resumeStream = await client.runs.messages.stream(result.lastRunId, {
-        starting_after: result.lastSeqId,
-        batch_size: 1000, // Fetch buffered chunks quickly
-      });
+      const resumeStream = await client.runs.messages.stream(
+        result.lastRunId,
+        {
+          starting_after: result.lastSeqId,
+          batch_size: 1000, // Fetch buffered chunks quickly
+        },
+        { maxRetries: 0 },
+      );

       // Continue draining from where we left off
       // Note: Don't pass onFirstMessage again - already called in initial drain