fix: if no stop reason, attempt to resume from background mode (#56)
This commit is contained in:
@@ -57,7 +57,7 @@ import {
|
||||
clearPlaceholdersInText,
|
||||
} from "./helpers/pasteRegistry";
|
||||
import { safeJsonParseOr } from "./helpers/safeJsonParse";
|
||||
import { type ApprovalRequest, drainStream } from "./helpers/stream";
|
||||
import { type ApprovalRequest, drainStreamWithResume } from "./helpers/stream";
|
||||
import { getRandomThinkingMessage } from "./helpers/thinkingMessages";
|
||||
import { useTerminalWidth } from "./hooks/useTerminalWidth";
|
||||
|
||||
@@ -395,7 +395,7 @@ export default function App({
|
||||
// Stream one turn
|
||||
const stream = await sendMessageStream(agentId, currentInput);
|
||||
const { stopReason, approval, apiDurationMs, lastRunId } =
|
||||
await drainStream(
|
||||
await drainStreamWithResume(
|
||||
stream,
|
||||
buffersRef.current,
|
||||
refreshDerivedThrottled,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import type { Stream } from "@letta-ai/letta-client/core/streaming";
|
||||
import type { LettaStreamingResponse } from "@letta-ai/letta-client/resources/agents/messages";
|
||||
import type { StopReasonType } from "@letta-ai/letta-client/resources/runs/runs";
|
||||
import { getClient } from "../../agent/client";
|
||||
|
||||
import {
|
||||
type createBuffers,
|
||||
@@ -140,3 +141,64 @@ export async function drainStream(
|
||||
|
||||
return { stopReason, approval, lastRunId, lastSeqId, apiDurationMs };
|
||||
}
|
||||
|
||||
/**
|
||||
* Drain a stream with automatic resume on disconnect.
|
||||
*
|
||||
* If the stream ends without receiving a proper stop_reason chunk (indicating
|
||||
* an unexpected disconnect), this will automatically attempt to resume from
|
||||
* Redis using the last received run_id and seq_id.
|
||||
*
|
||||
* @param stream - Initial stream from agent.messages.stream()
|
||||
* @param buffers - Buffer to accumulate chunks
|
||||
* @param refresh - Callback to refresh UI
|
||||
* @param abortSignal - Optional abort signal for cancellation
|
||||
* @returns Result with stop_reason, approval info, and timing
|
||||
*/
|
||||
export async function drainStreamWithResume(
|
||||
stream: Stream<LettaStreamingResponse>,
|
||||
buffers: ReturnType<typeof createBuffers>,
|
||||
refresh: () => void,
|
||||
abortSignal?: AbortSignal,
|
||||
): Promise<DrainResult> {
|
||||
const overallStartTime = performance.now();
|
||||
|
||||
// Attempt initial drain
|
||||
let result = await drainStream(stream, buffers, refresh, abortSignal);
|
||||
|
||||
// If stream ended without proper stop_reason and we have resume info, try once to reconnect
|
||||
if (
|
||||
result.stopReason === "error" &&
|
||||
result.lastRunId &&
|
||||
result.lastSeqId !== null &&
|
||||
!abortSignal?.aborted
|
||||
) {
|
||||
try {
|
||||
const client = await getClient();
|
||||
// Resume from Redis where we left off
|
||||
const resumeStream = await client.runs.messages.stream(result.lastRunId, {
|
||||
starting_after: result.lastSeqId,
|
||||
batch_size: 1000, // Fetch buffered chunks quickly
|
||||
});
|
||||
|
||||
// Continue draining from where we left off
|
||||
const resumeResult = await drainStream(
|
||||
resumeStream,
|
||||
buffers,
|
||||
refresh,
|
||||
abortSignal,
|
||||
);
|
||||
|
||||
// Use the resume result (should have proper stop_reason now)
|
||||
result = resumeResult;
|
||||
} catch (e) {
|
||||
// Resume failed - stick with the error stop_reason
|
||||
// The original error result will be returned
|
||||
}
|
||||
}
|
||||
|
||||
// Update duration to reflect total time (including resume attempt)
|
||||
result.apiDurationMs = performance.now() - overallStartTime;
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user