diff --git a/src/agent/message.ts b/src/agent/message.ts index 477a4be..47f5d7e 100644 --- a/src/agent/message.ts +++ b/src/agent/message.ts @@ -18,16 +18,11 @@ export async function sendMessageStream( background?: boolean; // add more later: includePings, request timeouts, etc. } = { streamTokens: true, background: true }, - requestOptions: { maxRetries?: number } = { maxRetries: 0 }, ): Promise> { const client = await getClient(); - return client.agents.messages.stream( - agentId, - { - messages: messages, - stream_tokens: opts.streamTokens ?? true, - background: opts.background ?? true, - }, - requestOptions, - ); + return client.agents.messages.stream(agentId, { + messages: messages, + stream_tokens: opts.streamTokens ?? true, + background: opts.background ?? true, + }); } diff --git a/src/agent/recover.ts b/src/agent/recover.ts deleted file mode 100644 index 0ad23d5..0000000 --- a/src/agent/recover.ts +++ /dev/null @@ -1,86 +0,0 @@ -import type Letta from "@letta-ai/letta-client"; -import type { AgentState } from "@letta-ai/letta-client/resources/agents/agents"; - -import type { createBuffers } from "../cli/helpers/accumulator"; -import type { ApprovalRequest, DrainResult } from "../cli/helpers/stream"; -import { drainStreamWithResume } from "../cli/helpers/stream"; -import { getResumeData } from "./check-approval"; - -export async function resyncPendingApprovals( - client: Letta, - agent: AgentState, -): Promise { - const { pendingApprovals } = await getResumeData(client, agent); - return pendingApprovals ?? []; -} - -export async function findNewestActiveBackgroundRunId( - client: Letta, - agentId: string, -): Promise { - const runsPage = await client.runs.list({ - active: true, - agent_id: agentId, - background: true, - limit: 10, - }); - - const runs = runsPage.items ?? []; - if (runs.length === 0) return null; - - // Prefer the most recently created run. - runs.sort((a, b) => { - const aTs = - Date.parse((a as { created_at?: string }).created_at ?? "") || 0; - const bTs = - Date.parse((b as { created_at?: string }).created_at ?? "") || 0; - return bTs - aTs; - }); - - return runs[0]?.id ?? null; -} - -export type StaleApprovalRecovery = - | { kind: "pending_approval"; approvals: ApprovalRequest[] } - | { kind: "relatched"; result: DrainResult } - | { kind: "noop" }; - -export async function recoverFromStaleApproval( - client: Letta, - agentId: string, - buffers: ReturnType, - refresh: () => void, - abortSignal: AbortSignal | undefined, - opts: { - lastKnownRunId?: string | null; - lastKnownSeqId?: number | null; - } = {}, -): Promise { - const agent = await client.agents.retrieve(agentId); - const approvals = await resyncPendingApprovals(client, agent); - if (approvals.length > 0) { - return { kind: "pending_approval", approvals }; - } - - const runId = - opts.lastKnownRunId ?? - (await findNewestActiveBackgroundRunId(client, agentId)); - if (!runId) return { kind: "noop" }; - - const stream = await client.runs.messages.stream( - runId, - { - starting_after: opts.lastKnownSeqId ?? undefined, - batch_size: 1000, - }, - { maxRetries: 0 }, - ); - - const result = await drainStreamWithResume( - stream, - buffers, - refresh, - abortSignal, - ); - return { kind: "relatched", result }; -} diff --git a/src/cli/App.tsx b/src/cli/App.tsx index 8321817..6f07d4b 100644 --- a/src/cli/App.tsx +++ b/src/cli/App.tsx @@ -1,7 +1,7 @@ // src/cli/App.tsx import { existsSync, readFileSync, writeFileSync } from "node:fs"; -import { APIError, APIUserAbortError } from "@letta-ai/letta-client/core/error"; +import { APIUserAbortError } from "@letta-ai/letta-client/core/error"; import type { AgentState, MessageCreate, @@ -20,10 +20,6 @@ import { getClient } from "../agent/client"; import { getCurrentAgentId, setCurrentAgentId } from "../agent/context"; import type { AgentProvenance } from "../agent/create"; import { sendMessageStream } from "../agent/message"; -import { - recoverFromStaleApproval, - resyncPendingApprovals, -} from "../agent/recover"; import { SessionStats } from "../agent/stats"; import type { ApprovalContext } from "../permissions/analyzer"; import { permissionMode } from "../permissions/mode"; @@ -845,9 +841,6 @@ export default function App({ initialInput: Array, ): Promise => { const currentInput = initialInput; - // Track lastRunId outside the while loop so it's available in catch block - let lastKnownRunId: string | null = null; - let lastKnownSeqId: number | null = null; try { // Check if user hit escape before we started @@ -931,28 +924,14 @@ export default function App({ } }; - const { - stopReason, - approval, - approvals, - apiDurationMs, - lastRunId, - lastSeqId, - } = await drainStreamWithResume( - stream, - buffersRef.current, - refreshDerivedThrottled, - abortControllerRef.current?.signal, - syncAgentState, - ); - - // Update lastKnownRunId for error handling in catch block - if (lastRunId) { - lastKnownRunId = lastRunId; - } - if (lastSeqId !== null && lastSeqId !== undefined) { - lastKnownSeqId = lastSeqId; - } + const { stopReason, approval, approvals, apiDurationMs, lastRunId } = + await drainStreamWithResume( + stream, + buffersRef.current, + refreshDerivedThrottled, + abortControllerRef.current?.signal, + syncAgentState, + ); // Track API duration sessionStatsRef.current.endTurn(apiDurationMs); @@ -1440,76 +1419,6 @@ export default function App({ return; } } catch (e) { - // Stale approval recovery: - // If the approval submission succeeded server-side but the client observed a stale state - // (or a retried request), the backend returns a 400 ValueError saying no approvals are pending. - // In that case, resync + relatch to the active run stream instead of erroring. - if ( - e instanceof APIError && - e.status === 400 && - typeof currentInput?.[0] === "object" && - currentInput[0] && - "type" in currentInput[0] && - currentInput[0].type === "approval" - ) { - const detail = - (e.error as Record)?.detail ?? - (e.error as Record>)?.error - ?.detail ?? - e.message ?? - ""; - if ( - typeof detail === "string" && - detail.includes( - "Cannot process approval response: No tool call is currently awaiting approval", - ) - ) { - try { - const client = await getClient(); - const recovery = await recoverFromStaleApproval( - client, - agentId, - buffersRef.current, - refreshDerivedThrottled, - abortControllerRef.current?.signal, - { lastKnownRunId, lastKnownSeqId }, - ); - - let approvalsToShow: ApprovalRequest[] = []; - if (recovery.kind === "pending_approval") { - approvalsToShow = recovery.approvals; - } else { - // After relatching, re-check in case a new approval request was produced. - const agent = await client.agents.retrieve(agentId); - approvalsToShow = await resyncPendingApprovals(client, agent); - } - - if (approvalsToShow.length > 0) { - setPendingApprovals(approvalsToShow); - const contexts = await Promise.all( - approvalsToShow.map(async (approvalItem) => { - const parsedArgs = safeJsonParseOr>( - approvalItem.toolArgs, - {}, - ); - return await analyzeToolApproval( - approvalItem.toolName, - parsedArgs, - ); - }), - ); - setApprovalContexts(contexts); - } - - setStreaming(false); - refreshDerived(); - return; - } catch (_recoveryError) { - // Fall through to normal error handling if recovery fails. - } - } - } - // Mark incomplete tool calls as cancelled to prevent stuck blinking UI markIncompleteToolsAsCancelled(buffersRef.current); @@ -1549,13 +1458,11 @@ export default function App({ abortControllerRef.current = null; } }, - [ appendError, refreshDerived, refreshDerivedThrottled, setStreaming, - agentId, currentModelId, ], ); diff --git a/src/cli/helpers/stream.ts b/src/cli/helpers/stream.ts index 7abe414..d61a3d3 100644 --- a/src/cli/helpers/stream.ts +++ b/src/cli/helpers/stream.ts @@ -17,7 +17,7 @@ export type ApprovalRequest = { toolArgs: string; }; -export type DrainResult = { +type DrainResult = { stopReason: StopReasonType; lastRunId?: string | null; lastSeqId?: number | null; @@ -263,14 +263,10 @@ export async function drainStreamWithResume( try { const client = await getClient(); // Resume from Redis where we left off - const resumeStream = await client.runs.messages.stream( - result.lastRunId, - { - starting_after: result.lastSeqId, - batch_size: 1000, // Fetch buffered chunks quickly - }, - { maxRetries: 0 }, - ); + const resumeStream = await client.runs.messages.stream(result.lastRunId, { + starting_after: result.lastSeqId, + batch_size: 1000, // Fetch buffered chunks quickly + }); // Continue draining from where we left off // Note: Don't pass onFirstMessage again - already called in initial drain