From e40a8d61eed7266b7b9b94210673984e3aa835de Mon Sep 17 00:00:00 2001 From: Cameron Date: Mon, 2 Mar 2026 13:46:29 -0800 Subject: [PATCH] fix: handle stale duplicate run results and background session reuse (#458) --- src/core/bot.ts | 210 +++++++++++++++++++------- src/core/result-guard.test.ts | 107 +++++++++++++ src/core/sdk-session-contract.test.ts | 82 ++++++++++ src/core/types.ts | 1 + 4 files changed, 345 insertions(+), 55 deletions(-) create mode 100644 src/core/result-guard.test.ts diff --git a/src/core/bot.ts b/src/core/bot.ts index ba487de..d78beb9 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -202,6 +202,10 @@ export class LettaBot implements AgentSession { private processingKeys: Set = new Set(); // Per-key locks for per-channel mode private cancelledKeys: Set = new Set(); // Tracks keys where /cancel was issued private sendSequence = 0; // Monotonic counter for desync diagnostics + // Forward-looking: stale-result detection via runIds becomes active once the + // SDK surfaces non-empty result run_ids. Until then, this map mostly stays + // empty and the streamed/result divergence guard remains the active defense. + private lastResultRunFingerprints: Map = new Map(); // AskUserQuestion support: resolves when the next user message arrives. // In per-chat mode, keyed by convKey so each chat resolves independently. @@ -234,6 +238,9 @@ export class LettaBot implements AgentSession { this.config = config; mkdirSync(config.workingDir, { recursive: true }); this.store = new Store('lettabot-agent.json', config.agentName); + if (config.reuseSession === false) { + log.warn('Session reuse disabled (conversations.reuseSession=false): each foreground/background message uses a fresh SDK subprocess (~5s overhead per turn).'); + } if (config.conversationOverrides?.length) { this.conversationOverrides = new Set(config.conversationOverrides.map((ch) => ch.toLowerCase())); } @@ -253,6 +260,38 @@ export class LettaBot implements AgentSession { return `${this.config.displayName}: ${text}`; } + private normalizeResultRunIds(msg: StreamMsg): string[] { + // Forward-looking compatibility: + // - Current SDK releases often emit result.run_ids as null/undefined. + // - When runIds are absent, caller gets [] and falls back to streamed vs + // result text comparison (which works with today's wire payloads). + const rawRunIds = (msg as StreamMsg & { runIds?: unknown; run_ids?: unknown }).runIds + ?? (msg as StreamMsg & { run_ids?: unknown }).run_ids; + if (!Array.isArray(rawRunIds)) return []; + + const runIds = rawRunIds.filter((id): id is string => + typeof id === 'string' && id.trim().length > 0 + ); + if (runIds.length === 0) return []; + + return [...new Set(runIds)].sort(); + } + + private classifyResultRun(convKey: string, msg: StreamMsg): 'fresh' | 'stale' | 'unknown' { + const runIds = this.normalizeResultRunIds(msg); + if (runIds.length === 0) return 'unknown'; + + const fingerprint = runIds.join(','); + const previous = this.lastResultRunFingerprints.get(convKey); + if (previous === fingerprint) { + log.warn(`Detected stale duplicate result (key=${convKey}, runIds=${fingerprint})`); + return 'stale'; + } + + this.lastResultRunFingerprints.set(convKey, fingerprint); + return 'fresh'; + } + // ========================================================================= // Session options (shared by processMessage and sendToAgent) // ========================================================================= @@ -773,6 +812,7 @@ export class LettaBot implements AgentSession { this.sessionLastUsed.delete(oldestKey); this.sessionGenerations.delete(oldestKey); this.sessionCreationLocks.delete(oldestKey); + this.lastResultRunFingerprints.delete(oldestKey); } else { // All existing sessions are active; allow temporary overflow. log.debug(`LRU session eviction skipped: all ${this.sessions.size} sessions are active/in-flight`); @@ -808,6 +848,7 @@ export class LettaBot implements AgentSession { this.sessions.delete(key); this.sessionLastUsed.delete(key); } + this.lastResultRunFingerprints.delete(key); } else { const keys = new Set([ ...this.sessions.keys(), @@ -825,6 +866,7 @@ export class LettaBot implements AgentSession { this.sessions.clear(); this.sessionCreationLocks.clear(); this.sessionLastUsed.clear(); + this.lastResultRunFingerprints.clear(); } } @@ -1530,7 +1572,11 @@ export class LettaBot implements AgentSession { try { const convKey = this.resolveConversationKey(msg.channel, msg.chatId); const seq = ++this.sendSequence; - log.info(`processMessage seq=${seq} key=${convKey} retried=${retried} user=${msg.userId} text=${(msg.text || '').slice(0, 80)}`); + const userText = msg.text || ''; + log.info(`processMessage seq=${seq} key=${convKey} retried=${retried} user=${msg.userId} textLen=${userText.length}`); + if (userText.length > 0) { + log.debug(`processMessage seq=${seq} textPreview=${userText.slice(0, 80)}`); + } const run = await this.runSession(messageToSend, { retried, canUseTool, convKey }); lap('session send'); session = run.session; @@ -1611,6 +1657,7 @@ export class LettaBot implements AgentSession { try { let firstChunkLogged = false; + let streamedAssistantText = ''; for await (const streamMsg of run.stream()) { // Check for /cancel before processing each chunk if (this.cancelledKeys.has(convKey)) { @@ -1735,7 +1782,9 @@ export class LettaBot implements AgentSession { } lastAssistantUuid = msgUuid || lastAssistantUuid; - response += streamMsg.content || ''; + const assistantChunk = streamMsg.content || ''; + response += assistantChunk; + streamedAssistantText += assistantChunk; // Live-edit streaming for channels that support it // Hold back streaming edits while response could still be or block @@ -1776,7 +1825,7 @@ export class LettaBot implements AgentSession { // content from a previously cancelled run as the result for the // next message. Discard it and retry so the message gets processed. if (streamMsg.stopReason === 'cancelled') { - log.info(`Discarding cancelled run result (len=${typeof streamMsg.result === 'string' ? streamMsg.result.length : 0})`); + log.info(`Discarding cancelled run result (seq=${seq}, len=${typeof streamMsg.result === 'string' ? streamMsg.result.length : 0})`); this.invalidateSession(convKey); session = null; if (!retried) { @@ -1785,25 +1834,49 @@ export class LettaBot implements AgentSession { break; } + const resultRunState = this.classifyResultRun(convKey, streamMsg); + if (resultRunState === 'stale') { + this.invalidateSession(convKey); + session = null; + if (!retried) { + log.warn(`Retrying message after stale duplicate result (seq=${seq}, key=${convKey})`); + return this.processMessage(msg, adapter, true); + } + response = ''; + break; + } + const resultText = typeof streamMsg.result === 'string' ? streamMsg.result : ''; if (resultText.trim().length > 0) { - // Guard against n-1 desync: if we accumulated assistant text via - // streaming and the result carries different content, the result - // likely belongs to a prior run that leaked through the SDK's - // stream queue. Prefer the real-time streamed content. - if (response.trim().length > 0 && resultText.trim() !== response.trim()) { + const streamedTextTrimmed = streamedAssistantText.trim(); + const resultTextTrimmed = resultText.trim(); + // Decision tree: + // 1) Diverged from streamed output -> prefer streamed text (active fix today) + // 2) No streamed assistant text -> use result text as fallback + // 3) Streamed text exists but nothing was delivered -> allow one result resend + // Compare against all streamed assistant text, not the current + // response buffer (which can be reset between assistant turns). + if (streamedTextTrimmed.length > 0 && resultTextTrimmed !== streamedTextTrimmed) { log.warn( `Result text diverges from streamed content ` + - `(resultLen=${resultText.length}, streamLen=${response.length}). ` + + `(resultLen=${resultText.length}, streamLen=${streamedAssistantText.length}). ` + `Preferring streamed content to avoid n-1 desync.` ); - } else { + } else if (streamedTextTrimmed.length === 0) { + // Fallback for models/providers that only populate result text. + response = resultText; + } else if (!sentAnyMessage && response.trim().length === 0) { + // Safety fallback: if we streamed text but nothing was + // delivered yet, allow a single result-based resend. response = resultText; } } const hasResponse = response.trim().length > 0; const isTerminalError = streamMsg.success === false || !!streamMsg.error; - log.info(`Stream result: seq=${seq} success=${streamMsg.success}, hasResponse=${hasResponse}, resultLen=${resultText.length}, responsePreview=${response.trim().slice(0, 60)}`); + log.info(`Stream result: seq=${seq} success=${streamMsg.success}, hasResponse=${hasResponse}, resultLen=${resultText.length}`); + if (response.trim().length > 0) { + log.debug(`Stream result preview: seq=${seq} responsePreview=${response.trim().slice(0, 60)}`); + } log.info(`Stream message counts:`, msgTypeCounts); if (streamMsg.error) { const detail = resultText.trim(); @@ -2101,55 +2174,79 @@ export class LettaBot implements AgentSession { const acquired = await this.acquireLock(convKey); try { - const { stream } = await this.runSession(text, { convKey }); - - try { - let response = ''; - let lastErrorDetail: { message: string; stopReason: string; apiError?: Record } | undefined; - for await (const msg of stream()) { - if (msg.type === 'tool_call') { - this.syncTodoToolCall(msg); - } - if (msg.type === 'error') { - lastErrorDetail = { - message: (msg as any).message || 'unknown', - stopReason: (msg as any).stopReason || 'error', - apiError: (msg as any).apiError, - }; - } - if (msg.type === 'assistant') { - response += msg.content || ''; - } - if (msg.type === 'result') { - // TODO(letta-code-sdk#31): Remove once SDK handles HITL approvals in bypassPermissions mode. - if (msg.success === false || msg.error) { - // Enrich opaque errors from run metadata (mirrors processMessage logic). - const convId = typeof msg.conversationId === 'string' ? msg.conversationId : undefined; - if (this.store.agentId && - (!lastErrorDetail || lastErrorDetail.message === 'Agent stopped: error')) { - const enriched = await getLatestRunError(this.store.agentId, convId); - if (enriched) { - lastErrorDetail = { message: enriched.message, stopReason: enriched.stopReason }; - } - } - const errMsg = lastErrorDetail?.message || msg.error || 'error'; - const errReason = lastErrorDetail?.stopReason || msg.error || 'error'; - const detail = typeof msg.result === 'string' ? msg.result.trim() : ''; - throw new Error(detail ? `Agent run failed: ${errReason} (${errMsg})` : `Agent run failed: ${errReason} -- ${errMsg}`); + let retried = false; + while (true) { + const { stream } = await this.runSession(text, { convKey, retried }); + + try { + let response = ''; + let sawStaleDuplicateResult = false; + let lastErrorDetail: { message: string; stopReason: string; apiError?: Record } | undefined; + for await (const msg of stream()) { + if (msg.type === 'tool_call') { + this.syncTodoToolCall(msg); + } + if (msg.type === 'error') { + lastErrorDetail = { + message: (msg as any).message || 'unknown', + stopReason: (msg as any).stopReason || 'error', + apiError: (msg as any).apiError, + }; + } + if (msg.type === 'assistant') { + response += msg.content || ''; + } + if (msg.type === 'result') { + const resultRunState = this.classifyResultRun(convKey, msg); + if (resultRunState === 'stale') { + sawStaleDuplicateResult = true; + break; + } + + // TODO(letta-code-sdk#31): Remove once SDK handles HITL approvals in bypassPermissions mode. + if (msg.success === false || msg.error) { + // Enrich opaque errors from run metadata (mirrors processMessage logic). + const convId = typeof msg.conversationId === 'string' ? msg.conversationId : undefined; + if (this.store.agentId && + (!lastErrorDetail || lastErrorDetail.message === 'Agent stopped: error')) { + const enriched = await getLatestRunError(this.store.agentId, convId); + if (enriched) { + lastErrorDetail = { message: enriched.message, stopReason: enriched.stopReason }; + } + } + const errMsg = lastErrorDetail?.message || msg.error || 'error'; + const errReason = lastErrorDetail?.stopReason || msg.error || 'error'; + const detail = typeof msg.result === 'string' ? msg.result.trim() : ''; + throw new Error(detail ? `Agent run failed: ${errReason} (${errMsg})` : `Agent run failed: ${errReason} -- ${errMsg}`); + } + break; } - break; } + + if (sawStaleDuplicateResult) { + this.invalidateSession(convKey); + if (retried) { + throw new Error('Agent stream returned stale duplicate result after retry'); + } + log.warn(`Retrying sendToAgent after stale duplicate result (key=${convKey})`); + retried = true; + continue; + } + + if (isSilent && response.trim()) { + log.info(`Silent mode: collected ${response.length} chars (not delivered)`); + } + return response; + } catch (error) { + // Invalidate on stream errors so next call gets a fresh subprocess + this.invalidateSession(convKey); + throw error; } - if (isSilent && response.trim()) { - log.info(`Silent mode: collected ${response.length} chars (not delivered)`); - } - return response; - } catch (error) { - // Invalidate on stream errors so next call gets a fresh subprocess - this.invalidateSession(convKey); - throw error; } } finally { + if (this.config.reuseSession === false) { + this.invalidateSession(convKey); + } this.releaseLock(convKey, acquired); } } @@ -2175,6 +2272,9 @@ export class LettaBot implements AgentSession { throw error; } } finally { + if (this.config.reuseSession === false) { + this.invalidateSession(convKey); + } this.releaseLock(convKey, acquired); } } diff --git a/src/core/result-guard.test.ts b/src/core/result-guard.test.ts new file mode 100644 index 0000000..b253ccd --- /dev/null +++ b/src/core/result-guard.test.ts @@ -0,0 +1,107 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { mkdtempSync, rmSync } from 'node:fs'; +import { join } from 'node:path'; +import { tmpdir } from 'node:os'; +import { LettaBot } from './bot.js'; +import type { InboundMessage } from './types.js'; + +describe('result divergence guard', () => { + let workDir: string; + + beforeEach(() => { + workDir = mkdtempSync(join(tmpdir(), 'lettabot-result-guard-')); + }); + + afterEach(() => { + rmSync(workDir, { recursive: true, force: true }); + }); + + it('does not resend full result text when streamed content was already flushed', async () => { + const bot = new LettaBot({ + workingDir: workDir, + allowedTools: [], + }); + + const adapter = { + id: 'mock', + name: 'Mock', + start: vi.fn(async () => {}), + stop: vi.fn(async () => {}), + isRunning: vi.fn(() => true), + sendMessage: vi.fn(async () => ({ messageId: 'msg-1' })), + editMessage: vi.fn(async () => {}), + sendTypingIndicator: vi.fn(async () => {}), + stopTypingIndicator: vi.fn(async () => {}), + supportsEditing: vi.fn(() => false), + sendFile: vi.fn(async () => ({ messageId: 'file-1' })), + }; + + (bot as any).runSession = vi.fn(async () => ({ + session: { abort: vi.fn(async () => {}) }, + stream: async function* () { + // Assistant text is flushed when tool_call arrives. + yield { type: 'assistant', content: 'first segment' }; + yield { type: 'tool_call', toolCallId: 'tc-1', toolName: 'Bash', toolInput: { command: 'echo hi' } }; + // Result repeats the same text; this must not cause a duplicate send. + yield { type: 'result', success: true, result: 'first segment' }; + }, + })); + + const msg: InboundMessage = { + channel: 'discord', + chatId: 'chat-1', + userId: 'user-1', + text: 'hello', + timestamp: new Date(), + }; + + await (bot as any).processMessage(msg, adapter); + + const sentTexts = adapter.sendMessage.mock.calls.map(([payload]) => payload.text); + expect(sentTexts).toEqual(['first segment']); + }); + + it('prefers streamed assistant text when result text diverges after flush', async () => { + const bot = new LettaBot({ + workingDir: workDir, + allowedTools: [], + }); + + const adapter = { + id: 'mock', + name: 'Mock', + start: vi.fn(async () => {}), + stop: vi.fn(async () => {}), + isRunning: vi.fn(() => true), + sendMessage: vi.fn(async () => ({ messageId: 'msg-1' })), + editMessage: vi.fn(async () => {}), + sendTypingIndicator: vi.fn(async () => {}), + stopTypingIndicator: vi.fn(async () => {}), + supportsEditing: vi.fn(() => false), + sendFile: vi.fn(async () => ({ messageId: 'file-1' })), + }; + + (bot as any).runSession = vi.fn(async () => ({ + session: { abort: vi.fn(async () => {}) }, + stream: async function* () { + yield { type: 'assistant', content: 'streamed-segment' }; + yield { type: 'tool_call', toolCallId: 'tc-1', toolName: 'Bash', toolInput: { command: 'echo hi' } }; + // Divergent stale result should not replace or resend streamed content. + yield { type: 'result', success: true, result: 'stale-result-segment' }; + }, + })); + + const msg: InboundMessage = { + channel: 'discord', + chatId: 'chat-1', + userId: 'user-1', + text: 'hello', + timestamp: new Date(), + }; + + await (bot as any).processMessage(msg, adapter); + + const sentTexts = adapter.sendMessage.mock.calls.map(([payload]) => payload.text); + expect(sentTexts).toEqual(['streamed-segment']); + }); +}); diff --git a/src/core/sdk-session-contract.test.ts b/src/core/sdk-session-contract.test.ts index fd740f2..b35f44b 100644 --- a/src/core/sdk-session-contract.test.ts +++ b/src/core/sdk-session-contract.test.ts @@ -684,6 +684,88 @@ describe('SDK session contract', () => { ); }); + it('retries sendToAgent when SDK result runIds repeat the previous run', async () => { + let streamCall = 0; + + const mockSession = { + initialize: vi.fn(async () => undefined), + send: vi.fn(async () => undefined), + stream: vi.fn(() => { + const call = streamCall++; + return (async function* () { + if (call === 0) { + yield { type: 'assistant', content: 'response-A' }; + yield { type: 'result', success: true, runIds: ['run-A'] }; + return; + } + if (call === 1) { + // Stale replay of the previous run; bot should retry once. + yield { type: 'assistant', content: 'stale-A' }; + yield { type: 'result', success: true, runIds: ['run-A'] }; + return; + } + yield { type: 'assistant', content: 'response-B' }; + yield { type: 'result', success: true, runIds: ['run-B'] }; + })(); + }), + close: vi.fn(() => undefined), + agentId: 'agent-runid-test', + conversationId: 'conversation-runid-test', + }; + + vi.mocked(createSession).mockReturnValue(mockSession as never); + vi.mocked(resumeSession).mockReturnValue(mockSession as never); + + const bot = new LettaBot({ + workingDir: join(dataDir, 'working'), + allowedTools: [], + }); + + const responseA = await bot.sendToAgent('first message'); + expect(responseA).toBe('response-A'); + + const responseB = await bot.sendToAgent('second message'); + expect(responseB).toBe('response-B'); + + expect(mockSession.send).toHaveBeenCalledTimes(3); + expect(mockSession.send).toHaveBeenNthCalledWith(1, 'first message'); + expect(mockSession.send).toHaveBeenNthCalledWith(2, 'second message'); + expect(mockSession.send).toHaveBeenNthCalledWith(3, 'second message'); + expect(mockSession.close).toHaveBeenCalledTimes(1); + }); + + it('invalidates background sessions when reuseSession is false', async () => { + const mockSession = { + initialize: vi.fn(async () => undefined), + send: vi.fn(async () => undefined), + stream: vi.fn(() => + (async function* () { + yield { type: 'assistant', content: 'ok' }; + // Keep this fixture aligned with current SDK output where runIds is + // often absent; this test validates reuseSession behavior only. + yield { type: 'result', success: true }; + })() + ), + close: vi.fn(() => undefined), + agentId: 'agent-reuse-false', + conversationId: 'conversation-reuse-false', + }; + + vi.mocked(createSession).mockReturnValue(mockSession as never); + vi.mocked(resumeSession).mockReturnValue(mockSession as never); + + const bot = new LettaBot({ + workingDir: join(dataDir, 'working'), + allowedTools: [], + reuseSession: false, + }); + + await bot.sendToAgent('first background trigger'); + await bot.sendToAgent('second background trigger'); + + expect(mockSession.close).toHaveBeenCalledTimes(2); + }); + it('does not leak stale stream events between consecutive sendToAgent calls', async () => { // Simulates the real SDK behavior prior to 0.1.8: the shared streamQueue // retains events that arrive after the result message. When the next diff --git a/src/core/types.ts b/src/core/types.ts index c65b099..44aff4d 100644 --- a/src/core/types.ts +++ b/src/core/types.ts @@ -198,6 +198,7 @@ export interface StreamMsg { uuid?: string; isError?: boolean; result?: string; + runIds?: string[]; success?: boolean; error?: string; [key: string]: unknown;