diff --git a/src/agent/message.ts b/src/agent/message.ts index 347c23d..87666af 100644 --- a/src/agent/message.ts +++ b/src/agent/message.ts @@ -17,6 +17,13 @@ import { getClient } from "./client"; const streamRequestStartTimes = new WeakMap(); const streamToolContextIds = new WeakMap(); +export type StreamRequestContext = { + conversationId: string; + resolvedConversationId: string; + agentId: string | null; + requestStartedAtMs: number; +}; +const streamRequestContexts = new WeakMap(); export function getStreamRequestStartTime( stream: Stream, @@ -30,6 +37,12 @@ export function getStreamToolContextId( return streamToolContextIds.get(stream as object) ?? null; } +export function getStreamRequestContext( + stream: Stream, +): StreamRequestContext | undefined { + return streamRequestContexts.get(stream as object); +} + /** * Send a message to a conversation and return a streaming response. * Uses the conversations API for all conversations. @@ -52,6 +65,7 @@ export async function sendMessageStream( requestOptions: { maxRetries?: number } = { maxRetries: 0 }, ): Promise> { const requestStartTime = isTimingsEnabled() ? performance.now() : undefined; + const requestStartedAtMs = Date.now(); const client = await getClient(); // Wait for any in-progress toolset switch to complete before reading tools @@ -93,6 +107,12 @@ export async function sendMessageStream( streamRequestStartTimes.set(stream as object, requestStartTime); } streamToolContextIds.set(stream as object, contextId); + streamRequestContexts.set(stream as object, { + conversationId, + resolvedConversationId, + agentId: opts.agentId ?? 
null, + requestStartedAtMs, + }); + return stream; } diff --git a/src/cli/helpers/stream.ts b/src/cli/helpers/stream.ts index 1c957b0..6bf50fc 100644 --- a/src/cli/helpers/stream.ts +++ b/src/cli/helpers/stream.ts @@ -1,13 +1,20 @@ import { APIError } from "@letta-ai/letta-client/core/error"; import type { Stream } from "@letta-ai/letta-client/core/streaming"; -import type { LettaStreamingResponse } from "@letta-ai/letta-client/resources/agents/messages"; +import type { + LettaStreamingResponse, + Run, +} from "@letta-ai/letta-client/resources/agents/messages"; import type { StopReasonType } from "@letta-ai/letta-client/resources/runs/runs"; import { clearLastSDKDiagnostic, consumeLastSDKDiagnostic, getClient, } from "../../agent/client"; -import { getStreamRequestStartTime } from "../../agent/message"; +import { + getStreamRequestContext, + getStreamRequestStartTime, + type StreamRequestContext, +} from "../../agent/message"; import { telemetry } from "../../telemetry"; import { debugWarn } from "../../utils/debug"; import { formatDuration, logTiming } from "../../utils/timing"; @@ -60,6 +67,143 @@ type DrainResult = { fallbackError?: string | null; // Error message for when we can't fetch details from server (no run_id) }; +type RunsListResponse = + | Run[] + | { + getPaginatedItems?: () => Run[]; + }; + +type RunsListClient = { + runs: { + list: (query: { + conversation_id?: string | null; + agent_id?: string | null; + statuses?: string[] | null; + order?: string | null; + limit?: number | null; + }) => Promise<RunsListResponse>; + }; +}; + +const FALLBACK_RUN_DISCOVERY_TIMEOUT_MS = 5000; + +function hasPaginatedItems( + response: RunsListResponse, +): response is { getPaginatedItems: () => Run[] } { + return ( + !Array.isArray(response) && typeof response.getPaginatedItems === "function" + ); +} + +function parseRunCreatedAtMs(run: Run): number { + if (!run.created_at) return 0; + const parsed = Date.parse(run.created_at); + return Number.isFinite(parsed) ? 
parsed : 0; +} + +async function discoverFallbackRunIdWithTimeout( + client: RunsListClient, + ctx: StreamRequestContext, +): Promise<string | null> { + return withTimeout( + discoverFallbackRunIdForResume(client, ctx), + FALLBACK_RUN_DISCOVERY_TIMEOUT_MS, + `Fallback run discovery timed out after ${FALLBACK_RUN_DISCOVERY_TIMEOUT_MS}ms`, + ); +} + +function withTimeout<T>( + promise: Promise<T>, + timeoutMs: number, + timeoutMessage: string, +): Promise<T> { + return new Promise<T>((resolve, reject) => { + const timer = setTimeout( + () => reject(new Error(timeoutMessage)), + timeoutMs, + ); + promise.then( + (value) => { + clearTimeout(timer); + resolve(value); + }, + (error) => { + clearTimeout(timer); + reject(error); + }, + ); + }); +} + +function toRunsArray(listResponse: RunsListResponse): Run[] { + if (Array.isArray(listResponse)) return listResponse; + if (hasPaginatedItems(listResponse)) { + return listResponse.getPaginatedItems() ?? []; + } + return []; +} + +/** + * Attempt to discover a run ID to resume when the initial stream failed before + * any run_id-bearing chunk arrived. + */ +export async function discoverFallbackRunIdForResume( + client: RunsListClient, + ctx: StreamRequestContext, +): Promise<string | null> { + const statuses = ["running"]; + const requestStartedAtMs = ctx.requestStartedAtMs; + + const listCandidates = async (query: { + conversation_id?: string | null; + agent_id?: string | null; + }): Promise<Run[]> => { + const response = await client.runs.list({ + ...query, + statuses, + order: "desc", + limit: 1, + }); + return toRunsArray(response).filter((run) => { + if (!run.id) return false; + if (run.status !== "running") return false; + // Best-effort temporal filter: only consider runs created after + // this send request started. In rare concurrent-send races within + // the same conversation, this heuristic can still pick a neighbor run. 
+ return parseRunCreatedAtMs(run) >= requestStartedAtMs; + }); + }; + + const lookupQueries: Array<{ + conversation_id?: string | null; + agent_id?: string | null; + }> = []; + + if (ctx.conversationId === "default") { + // Default conversation routes through resolvedConversationId (typically agent ID). + lookupQueries.push({ conversation_id: ctx.resolvedConversationId }); + } else { + // Named conversation: first use the explicit conversation id. + lookupQueries.push({ conversation_id: ctx.conversationId }); + + // Keep resolved route as backup only when it differs. + if (ctx.resolvedConversationId !== ctx.conversationId) { + lookupQueries.push({ conversation_id: ctx.resolvedConversationId }); + } + } + + if (ctx.agentId) { + lookupQueries.push({ agent_id: ctx.agentId }); + } + + for (const query of lookupQueries) { + const candidates = await listCandidates(query); + if (candidates[0]?.id) return candidates[0].id; + } + + return null; +} + export async function drainStream( stream: Stream, buffers: ReturnType, @@ -346,6 +490,15 @@ export async function drainStreamWithResume( contextTracker?: ContextTracker, ): Promise { const overallStartTime = performance.now(); + const streamRequestContext = getStreamRequestContext(stream); + + let _client: Awaited<ReturnType<typeof getClient>> | undefined; + const lazyClient = async () => { + if (!_client) { + _client = await getClient(); + } + return _client; + }; // Attempt initial drain let result = await drainStream( @@ -358,12 +511,51 @@ export async function drainStreamWithResume( contextTracker, ); + + let runIdToResume = result.lastRunId ?? null; + + // If the stream failed before exposing run_id, try to discover the latest + // running run for this conversation that was created after send start. 
+ if ( + result.stopReason === "error" && + !runIdToResume && + streamRequestContext && + abortSignal && + !abortSignal.aborted + ) { + try { + const client = await lazyClient(); + runIdToResume = await discoverFallbackRunIdWithTimeout( + client, + streamRequestContext, + ); + if (runIdToResume) { + result.lastRunId = runIdToResume; + } + } catch (lookupError) { + const lookupErrorMsg = + lookupError instanceof Error + ? lookupError.message + : String(lookupError); + telemetry.trackError( + "stream_resume_lookup_failed", + lookupErrorMsg, + "stream_resume", + ); + + debugWarn( + "drainStreamWithResume", + "Fallback run_id lookup failed:", + lookupError, + ); + } + } + // If stream ended without proper stop_reason and we have resume info, try once to reconnect // Only resume if we have an abortSignal AND it's not aborted (explicit check prevents // undefined abortSignal from accidentally allowing resume after user cancellation) if ( result.stopReason === "error" && - result.lastRunId && + runIdToResume && abortSignal && !abortSignal.aborted ) { @@ -378,12 +570,12 @@ export async function drainStreamWithResume( originalFallbackError || "Stream error (no client-side detail)", "stream_resume", { - runId: result.lastRunId, + runId: result.lastRunId ?? undefined, }, ); try { - const client = await getClient(); + const client = await lazyClient(); // Reset interrupted flag so resumed chunks can be processed by onChunk. // Without this, tool_return_message for server-side tools (web_search, fetch_webpage) @@ -397,7 +589,7 @@ export async function drainStreamWithResume( // TODO: Re-enable once issues are resolved - disabled retries were causing problems // Disable SDK retries - state management happens outside, retries would create race conditions const resumeStream = await client.runs.messages.stream( - result.lastRunId, + runIdToResume, { // If lastSeqId is null the stream failed before any seq_id-bearing // chunk arrived; use 0 to replay the run from the beginning. 
diff --git a/src/tests/cli/stream-resume-fallback.test.ts b/src/tests/cli/stream-resume-fallback.test.ts new file mode 100644 index 0000000..20791de --- /dev/null +++ b/src/tests/cli/stream-resume-fallback.test.ts @@ -0,0 +1,148 @@ +import { describe, expect, test } from "bun:test"; +import type { Run } from "@letta-ai/letta-client/resources/agents/messages"; +import { discoverFallbackRunIdForResume } from "../../cli/helpers/stream"; + +type RunsListClient = { + runs: { + list: (query: { + conversation_id?: string | null; + agent_id?: string | null; + statuses?: string[] | null; + order?: string | null; + limit?: number | null; + }) => Promise<Run[] | { getPaginatedItems?: () => Run[] }>; + }; +}; + +function makeRunsListClient( + runsList: RunsListClient["runs"]["list"], +): RunsListClient { + return { runs: { list: runsList } }; } + +function run(id: string, createdAt: string): Run { + return { + id, + agent_id: "agent-test", + created_at: createdAt, + status: "running", + }; +} + +describe("discoverFallbackRunIdForResume", () => { + test("returns the latest conversation-scoped running run after request start", async () => { + const runsList = async (query: { + conversation_id?: string | null; + agent_id?: string | null; + }): Promise<Run[]> => { + if (query.conversation_id === "conv-123") { + expect(query).toMatchObject({ + statuses: ["running"], + order: "desc", + limit: 1, + }); + return [run("run-new", "2026-02-27T10:01:10.000Z")]; + } + return []; + }; + + const candidate = await discoverFallbackRunIdForResume( + makeRunsListClient(runsList), + { + conversationId: "conv-123", + resolvedConversationId: "conv-123", + agentId: "agent-test", + requestStartedAtMs: Date.parse("2026-02-27T10:01:00.000Z"), + }, + ); + + expect(candidate).toBe("run-new"); + }); + + test("for default conversation falls back to agent lookup when conversation lookup misses", async () => { + const calls: Array<{ + conversation_id?: string | null; + agent_id?: string | null; + }> = []; + + const runsList = async (query: { + 
conversation_id?: string | null; + agent_id?: string | null; + }): Promise<Run[]> => { + calls.push({ + conversation_id: query.conversation_id, + agent_id: query.agent_id, + }); + + if (query.agent_id === "agent-test") { + return [run("run-agent-fallback", "2026-02-27T11:00:05.000Z")]; + } + + return []; + }; + + const candidate = await discoverFallbackRunIdForResume( + makeRunsListClient(runsList), + { + conversationId: "default", + resolvedConversationId: "agent-test", + agentId: "agent-test", + requestStartedAtMs: Date.parse("2026-02-27T11:00:00.000Z"), + }, + ); + + expect(candidate).toBe("run-agent-fallback"); + expect(calls).toEqual([ + { conversation_id: "agent-test", agent_id: undefined }, + { conversation_id: undefined, agent_id: "agent-test" }, + ]); + }); + + test("returns null when latest running run is older than request start", async () => { + const runsList = async (): Promise<Run[]> => [ + run("run-old-1", "2026-02-27T09:59:58.000Z"), + run("run-old-2", "2026-02-27T09:59:59.000Z"), + ]; + + const candidate = await discoverFallbackRunIdForResume( + makeRunsListClient(runsList), + { + conversationId: "conv-abc", + resolvedConversationId: "conv-abc", + agentId: "agent-test", + requestStartedAtMs: Date.parse("2026-02-27T10:00:00.000Z"), + }, + ); + + expect(candidate).toBeNull(); + }); + + test("ignores created runs when selecting fallback resume run", async () => { + const runsList = async (query: { + conversation_id?: string | null; + agent_id?: string | null; + }): Promise<Run[]> => { + expect(query).toMatchObject({ statuses: ["running"], limit: 1 }); + return [ + { + id: "run-created", + agent_id: "agent-test", + created_at: "2026-02-27T12:00:01.000Z", + status: "created", + }, + ]; + }; + + const candidate = await discoverFallbackRunIdForResume( + makeRunsListClient(runsList), + { + conversationId: "conv-created", + resolvedConversationId: "conv-created", + agentId: "agent-test", + requestStartedAtMs: Date.parse("2026-02-27T12:00:00.000Z"), + }, + ); + + 
expect(candidate).toBeNull(); + }); +});