From 42eb671bf4a32bffb79de1ba119878266f3b603e Mon Sep 17 00:00:00 2001 From: Charles Packer Date: Tue, 4 Nov 2025 11:20:58 -0800 Subject: [PATCH] fix: if no stop reason, attempt to resume from background mode (#56) --- bun.lock | 4 +-- package.json | 2 +- src/agent/check-approval.ts | 2 +- src/cli/App.tsx | 4 +-- src/cli/helpers/stream.ts | 62 ++++++++++++++++++++++++++++++++++++ src/headless.ts | 10 +++--- src/index.ts | 6 ++-- src/tools/toolDefinitions.ts | 26 +++++++-------- tsconfig.json | 3 +- 9 files changed, 91 insertions(+), 28 deletions(-) diff --git a/bun.lock b/bun.lock index e249908..25b6f61 100644 --- a/bun.lock +++ b/bun.lock @@ -4,7 +4,7 @@ "": { "name": "@letta-ai/letta-code", "dependencies": { - "@letta-ai/letta-client": "1.0.0-alpha.10", + "@letta-ai/letta-client": "1.0.0-alpha.14", "ink-link": "^5.0.0", "open": "^10.2.0", }, @@ -33,7 +33,7 @@ "@isaacs/brace-expansion": ["@isaacs/brace-expansion@5.0.0", "", { "dependencies": { "@isaacs/balanced-match": "^4.0.1" } }, "sha512-ZT55BDLV0yv0RBm2czMiZ+SqCGO7AvmOM3G/w2xhVPH+te0aKgFjmBvGlL1dH+ql2tgGO3MVrbb3jCKyvpgnxA=="], - "@letta-ai/letta-client": ["@letta-ai/letta-client@1.0.0-alpha.10", "", {}, "sha512-UV9b5rbkEPX2+7Dp0gD3Hk8KHbubz6K3GoAF1u813UZCguJcQO2azXNmjyvinQSPhKxlB/WYT/79toB4ElljSw=="], + "@letta-ai/letta-client": ["@letta-ai/letta-client@1.0.0-alpha.14", "", {}, "sha512-p5k1j2UyQmVnSN5TtAvFi8LszYTwH6yUMpPRv9BfvrsCY7s+ifSH735vF5Yi9ecMfYnhVJZhAsnr5Bq6/crLUw=="], "@types/bun": ["@types/bun@1.3.1", "", { "dependencies": { "bun-types": "1.3.1" } }, "sha512-4jNMk2/K9YJtfqwoAa28c8wK+T7nvJFOjxI4h/7sORWcypRNxBpr+TPNaCfVWq70tLCJsqoFwcf0oI0JU/fvMQ=="], diff --git a/package.json b/package.json index ecf4657..1c8708d 100644 --- a/package.json +++ b/package.json @@ -22,7 +22,7 @@ "access": "public" }, "dependencies": { - "@letta-ai/letta-client": "1.0.0-alpha.10", + "@letta-ai/letta-client": "1.0.0-alpha.14", "ink-link": "^5.0.0", "open": "^10.2.0" }, diff --git a/src/agent/check-approval.ts b/src/agent/check-approval.ts index 4fe02e8..2c72b36 100644 --- a/src/agent/check-approval.ts +++ b/src/agent/check-approval.ts @@ -70,7 +70,7 @@ export async function getResumeData( (msg) => msg.message_type === "approval_request_message", ); const inContextMessage = - approvalMessage ?? matchingMessages[matchingMessages.length - 1]; + approvalMessage ?? matchingMessages[matchingMessages.length - 1]!; messageToCheck = inContextMessage; } else { diff --git a/src/cli/App.tsx b/src/cli/App.tsx index bbb8a67..95b3969 100644 --- a/src/cli/App.tsx +++ b/src/cli/App.tsx @@ -57,7 +57,7 @@ import { clearPlaceholdersInText, } from "./helpers/pasteRegistry"; import { safeJsonParseOr } from "./helpers/safeJsonParse"; -import { type ApprovalRequest, drainStream } from "./helpers/stream"; +import { type ApprovalRequest, drainStreamWithResume } from "./helpers/stream"; import { getRandomThinkingMessage } from "./helpers/thinkingMessages"; import { useTerminalWidth } from "./hooks/useTerminalWidth"; @@ -395,7 +395,7 @@ export default function App({ // Stream one turn const stream = await sendMessageStream(agentId, currentInput); const { stopReason, approval, apiDurationMs, lastRunId } = - await drainStream( + await drainStreamWithResume( stream, buffersRef.current, refreshDerivedThrottled, diff --git a/src/cli/helpers/stream.ts b/src/cli/helpers/stream.ts index 856afa6..0e9e0d8 100644 --- a/src/cli/helpers/stream.ts +++ b/src/cli/helpers/stream.ts @@ -1,6 +1,7 @@ import type { Stream } from "@letta-ai/letta-client/core/streaming"; import type { LettaStreamingResponse } from "@letta-ai/letta-client/resources/agents/messages"; import type { StopReasonType } from "@letta-ai/letta-client/resources/runs/runs"; +import { getClient } from "../../agent/client"; import { type createBuffers, @@ -140,3 +141,64 @@ export async function drainStream( return { stopReason, approval, lastRunId, lastSeqId, apiDurationMs }; } + +/** + * Drain a stream with automatic resume on disconnect. + * + * If the stream ends without receiving a proper stop_reason chunk (indicating + * an unexpected disconnect), this will automatically attempt to resume from + * Redis using the last received run_id and seq_id. + * + * @param stream - Initial stream from agent.messages.stream() + * @param buffers - Buffer to accumulate chunks + * @param refresh - Callback to refresh UI + * @param abortSignal - Optional abort signal for cancellation + * @returns Result with stop_reason, approval info, and timing + */ +export async function drainStreamWithResume( + stream: Stream, + buffers: ReturnType, + refresh: () => void, + abortSignal?: AbortSignal, +): Promise { + const overallStartTime = performance.now(); + + // Attempt initial drain + let result = await drainStream(stream, buffers, refresh, abortSignal); + + // If stream ended without proper stop_reason and we have resume info, try once to reconnect + if ( + result.stopReason === "error" && + result.lastRunId && + result.lastSeqId !== null && + !abortSignal?.aborted + ) { + try { + const client = await getClient(); + // Resume from Redis where we left off + const resumeStream = await client.runs.messages.stream(result.lastRunId, { + starting_after: result.lastSeqId, + batch_size: 1000, // Fetch buffered chunks quickly + }); + + // Continue draining from where we left off + const resumeResult = await drainStream( + resumeStream, + buffers, + refresh, + abortSignal, + ); + + // Use the resume result (should have proper stop_reason now) + result = resumeResult; + } catch (e) { + // Resume failed - stick with the error stop_reason + // The original error result will be returned + } + } + + // Update duration to reflect total time (including resume attempt) + result.apiDurationMs = performance.now() - overallStartTime; + + return result; +} diff --git a/src/headless.ts b/src/headless.ts index 774bb25..1ec55b3 100644 --- a/src/headless.ts +++ b/src/headless.ts @@ -13,7 +13,7 @@ import { getModelUpdateArgs } from "./agent/model"; import { SessionStats } from "./agent/stats"; import { createBuffers, toLines } from "./cli/helpers/accumulator"; import { safeJsonParseOr } from "./cli/helpers/safeJsonParse"; -import { drainStream } from "./cli/helpers/stream"; +import { drainStreamWithResume } from "./cli/helpers/stream"; import { settingsManager } from "./settings-manager"; import { checkToolPermission, executeTool } from "./tools/manager"; @@ -142,7 +142,7 @@ export async function handleHeadlessCommand(argv: string[], model?: string) { const initEvent = { type: "init", agent_id: agent.id, - model: agent.llmConfig?.model, + model: agent.llm_config?.model, tools: agent.tools?.map((t) => t.name) || [], }; console.log(JSON.stringify(initEvent)); @@ -228,7 +228,7 @@ export async function handleHeadlessCommand(argv: string[], model?: string) { // no-op } } else { - await drainStream(approvalStream, createBuffers(), () => {}); + await drainStreamWithResume(approvalStream, createBuffers(), () => {}); } } }; @@ -453,8 +453,8 @@ export async function handleHeadlessCommand(argv: string[], model?: string) { ); markCurrentLineAsFinished(buffers); } else { - // Normal mode: use drainStream - const result = await drainStream( + // Normal mode: use drainStreamWithResume + const result = await drainStreamWithResume( stream, buffers, () => {}, // No UI refresh needed in headless mode diff --git a/src/index.ts b/src/index.ts index 28002e8..1395b60 100755 --- a/src/index.ts +++ b/src/index.ts @@ -159,7 +159,7 @@ async function main() { // Validate credentials by checking health endpoint const { validateCredentials } = await import("./auth/oauth"); - const isValid = await validateCredentials(baseURL, apiKey); + const isValid = await validateCredentials(baseURL, apiKey ?? ""); if (!isValid) { // For headless mode, error out with helpful message @@ -266,7 +266,7 @@ async function main() { "assembling" | "upserting" | "initializing" | "checking" | "ready" >("assembling"); const [agentId, setAgentId] = useState(null); - const [agentState, setAgentState] = useState(null); + const [agentState, setAgentState] = useState(null); const [resumeData, setResumeData] = useState(null); const [isResumingSession, setIsResumingSession] = useState(false); @@ -365,7 +365,7 @@ async function main() { !forceNew && localProjectSettings?.lastAgent && agent.id === localProjectSettings.lastAgent; - const resuming = continueSession || !!agentIdArg || isResumingProject; + const resuming = !!(continueSession || agentIdArg || isResumingProject); setIsResumingSession(resuming); // Get resume data (pending approval + message history) if resuming diff --git a/src/tools/toolDefinitions.ts b/src/tools/toolDefinitions.ts index fe6da41..fd241b2 100644 --- a/src/tools/toolDefinitions.ts +++ b/src/tools/toolDefinitions.ts @@ -35,7 +35,7 @@ import ReadSchema from "./schemas/Read.json"; import TodoWriteSchema from "./schemas/TodoWrite.json"; import WriteSchema from "./schemas/Write.json"; -type ToolImplementation = (args: Record) => Promise; +type ToolImplementation = (args: Record) => Promise; interface ToolAssets { schema: Record; @@ -47,62 +47,62 @@ const toolDefinitions = { Bash: { schema: BashSchema, description: BashDescription.trim(), - impl: bash, + impl: bash as ToolImplementation, }, BashOutput: { schema: BashOutputSchema, description: BashOutputDescription.trim(), - impl: bash_output, + impl: bash_output as ToolImplementation, }, Edit: { schema: EditSchema, description: EditDescription.trim(), - impl: edit, + impl: edit as ToolImplementation, }, ExitPlanMode: { schema: ExitPlanModeSchema, description: ExitPlanModeDescription.trim(), - impl: exit_plan_mode, + impl: exit_plan_mode as ToolImplementation, }, Glob: { schema: GlobSchema, description: GlobDescription.trim(), - impl: glob, + impl: glob as ToolImplementation, }, Grep: { schema: GrepSchema, description: GrepDescription.trim(), - impl: grep, + impl: grep as ToolImplementation, }, KillBash: { schema: KillBashSchema, description: KillBashDescription.trim(), - impl: kill_bash, + impl: kill_bash as ToolImplementation, }, LS: { schema: LSSchema, description: LSDescription.trim(), - impl: ls, + impl: ls as ToolImplementation, }, MultiEdit: { schema: MultiEditSchema, description: MultiEditDescription.trim(), - impl: multi_edit, + impl: multi_edit as ToolImplementation, }, Read: { schema: ReadSchema, description: ReadDescription.trim(), - impl: read, + impl: read as ToolImplementation, }, TodoWrite: { schema: TodoWriteSchema, description: TodoWriteDescription.trim(), - impl: todo_write, + impl: todo_write as ToolImplementation, }, Write: { schema: WriteSchema, description: WriteDescription.trim(), - impl: write, + impl: write as ToolImplementation, }, } as const satisfies Record; diff --git a/tsconfig.json b/tsconfig.json index 4cad888..b2f40f9 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -26,5 +26,6 @@ "noUnusedLocals": false, "noUnusedParameters": false, "noPropertyAccessFromIndexSignature": false - } + }, + "include": ["src/**/*"] }