fix: handle stale duplicate run results and background session reuse (#458)

This commit is contained in:
Cameron
2026-03-02 13:46:29 -08:00
committed by GitHub
parent 157a192fba
commit e40a8d61ee
4 changed files with 345 additions and 55 deletions

View File

@@ -202,6 +202,10 @@ export class LettaBot implements AgentSession {
private processingKeys: Set<string> = new Set(); // Per-key locks for per-channel mode
private cancelledKeys: Set<string> = new Set(); // Tracks keys where /cancel was issued
private sendSequence = 0; // Monotonic counter for desync diagnostics
// Forward-looking: stale-result detection via runIds becomes active once the
// SDK surfaces non-empty result run_ids. Until then, this map mostly stays
// empty and the streamed/result divergence guard remains the active defense.
private lastResultRunFingerprints: Map<string, string> = new Map();
// AskUserQuestion support: resolves when the next user message arrives.
// In per-chat mode, keyed by convKey so each chat resolves independently.
@@ -234,6 +238,9 @@ export class LettaBot implements AgentSession {
this.config = config;
mkdirSync(config.workingDir, { recursive: true });
this.store = new Store('lettabot-agent.json', config.agentName);
if (config.reuseSession === false) {
log.warn('Session reuse disabled (conversations.reuseSession=false): each foreground/background message uses a fresh SDK subprocess (~5s overhead per turn).');
}
if (config.conversationOverrides?.length) {
this.conversationOverrides = new Set(config.conversationOverrides.map((ch) => ch.toLowerCase()));
}
@@ -253,6 +260,38 @@ export class LettaBot implements AgentSession {
return `${this.config.displayName}: ${text}`;
}
private normalizeResultRunIds(msg: StreamMsg): string[] {
// Forward-looking compatibility shim:
// - Current SDK releases often emit result.run_ids as null/undefined.
// - When run ids are absent, callers receive [] and fall back to the
//   streamed-vs-result text comparison (which works with today's payloads).
// Accept either camelCase (runIds) or snake_case (run_ids) spellings.
const carrier = msg as StreamMsg & { runIds?: unknown; run_ids?: unknown };
const raw = carrier.runIds ?? carrier.run_ids;
if (!Array.isArray(raw)) return [];
// Keep only non-blank string ids, dropping duplicates as we go.
const uniqueIds = new Set<string>();
for (const candidate of raw) {
if (typeof candidate === 'string' && candidate.trim().length > 0) {
uniqueIds.add(candidate);
}
}
// Sorted output makes the fingerprint order-insensitive for callers.
return Array.from(uniqueIds).sort();
}
private classifyResultRun(convKey: string, msg: StreamMsg): 'fresh' | 'stale' | 'unknown' {
// Classify a result message by its run-id fingerprint:
// - 'unknown': the SDK supplied no run ids, so staleness cannot be judged
//   here (the streamed/result divergence guard remains the active defense).
// - 'stale': the fingerprint exactly repeats the previous result for this
//   conversation key — a duplicate replay of an earlier run.
// - 'fresh': a new fingerprint; recorded for the next comparison.
const runIds = this.normalizeResultRunIds(msg);
if (runIds.length === 0) return 'unknown';
const fingerprint = runIds.join(',');
if (this.lastResultRunFingerprints.get(convKey) === fingerprint) {
log.warn(`Detected stale duplicate result (key=${convKey}, runIds=${fingerprint})`);
// Intentionally leave the stored fingerprint untouched so a further
// replay of the same run is still flagged as stale.
return 'stale';
}
this.lastResultRunFingerprints.set(convKey, fingerprint);
return 'fresh';
}
// =========================================================================
// Session options (shared by processMessage and sendToAgent)
// =========================================================================
@@ -773,6 +812,7 @@ export class LettaBot implements AgentSession {
this.sessionLastUsed.delete(oldestKey);
this.sessionGenerations.delete(oldestKey);
this.sessionCreationLocks.delete(oldestKey);
this.lastResultRunFingerprints.delete(oldestKey);
} else {
// All existing sessions are active; allow temporary overflow.
log.debug(`LRU session eviction skipped: all ${this.sessions.size} sessions are active/in-flight`);
@@ -808,6 +848,7 @@ export class LettaBot implements AgentSession {
this.sessions.delete(key);
this.sessionLastUsed.delete(key);
}
this.lastResultRunFingerprints.delete(key);
} else {
const keys = new Set<string>([
...this.sessions.keys(),
@@ -825,6 +866,7 @@ export class LettaBot implements AgentSession {
this.sessions.clear();
this.sessionCreationLocks.clear();
this.sessionLastUsed.clear();
this.lastResultRunFingerprints.clear();
}
}
@@ -1530,7 +1572,11 @@ export class LettaBot implements AgentSession {
try {
const convKey = this.resolveConversationKey(msg.channel, msg.chatId);
const seq = ++this.sendSequence;
log.info(`processMessage seq=${seq} key=${convKey} retried=${retried} user=${msg.userId} text=${(msg.text || '').slice(0, 80)}`);
const userText = msg.text || '';
log.info(`processMessage seq=${seq} key=${convKey} retried=${retried} user=${msg.userId} textLen=${userText.length}`);
if (userText.length > 0) {
log.debug(`processMessage seq=${seq} textPreview=${userText.slice(0, 80)}`);
}
const run = await this.runSession(messageToSend, { retried, canUseTool, convKey });
lap('session send');
session = run.session;
@@ -1611,6 +1657,7 @@ export class LettaBot implements AgentSession {
try {
let firstChunkLogged = false;
let streamedAssistantText = '';
for await (const streamMsg of run.stream()) {
// Check for /cancel before processing each chunk
if (this.cancelledKeys.has(convKey)) {
@@ -1735,7 +1782,9 @@ export class LettaBot implements AgentSession {
}
lastAssistantUuid = msgUuid || lastAssistantUuid;
response += streamMsg.content || '';
const assistantChunk = streamMsg.content || '';
response += assistantChunk;
streamedAssistantText += assistantChunk;
// Live-edit streaming for channels that support it
// Hold back streaming edits while response could still be <no-reply/> or <actions> block
@@ -1776,7 +1825,7 @@ export class LettaBot implements AgentSession {
// content from a previously cancelled run as the result for the
// next message. Discard it and retry so the message gets processed.
if (streamMsg.stopReason === 'cancelled') {
log.info(`Discarding cancelled run result (len=${typeof streamMsg.result === 'string' ? streamMsg.result.length : 0})`);
log.info(`Discarding cancelled run result (seq=${seq}, len=${typeof streamMsg.result === 'string' ? streamMsg.result.length : 0})`);
this.invalidateSession(convKey);
session = null;
if (!retried) {
@@ -1785,25 +1834,49 @@ export class LettaBot implements AgentSession {
break;
}
const resultRunState = this.classifyResultRun(convKey, streamMsg);
if (resultRunState === 'stale') {
this.invalidateSession(convKey);
session = null;
if (!retried) {
log.warn(`Retrying message after stale duplicate result (seq=${seq}, key=${convKey})`);
return this.processMessage(msg, adapter, true);
}
response = '';
break;
}
const resultText = typeof streamMsg.result === 'string' ? streamMsg.result : '';
if (resultText.trim().length > 0) {
// Guard against n-1 desync: if we accumulated assistant text via
// streaming and the result carries different content, the result
// likely belongs to a prior run that leaked through the SDK's
// stream queue. Prefer the real-time streamed content.
if (response.trim().length > 0 && resultText.trim() !== response.trim()) {
const streamedTextTrimmed = streamedAssistantText.trim();
const resultTextTrimmed = resultText.trim();
// Decision tree:
// 1) Diverged from streamed output -> prefer streamed text (active fix today)
// 2) No streamed assistant text -> use result text as fallback
// 3) Streamed text exists but nothing was delivered -> allow one result resend
// Compare against all streamed assistant text, not the current
// response buffer (which can be reset between assistant turns).
if (streamedTextTrimmed.length > 0 && resultTextTrimmed !== streamedTextTrimmed) {
log.warn(
`Result text diverges from streamed content ` +
`(resultLen=${resultText.length}, streamLen=${response.length}). ` +
`(resultLen=${resultText.length}, streamLen=${streamedAssistantText.length}). ` +
`Preferring streamed content to avoid n-1 desync.`
);
} else {
} else if (streamedTextTrimmed.length === 0) {
// Fallback for models/providers that only populate result text.
response = resultText;
} else if (!sentAnyMessage && response.trim().length === 0) {
// Safety fallback: if we streamed text but nothing was
// delivered yet, allow a single result-based resend.
response = resultText;
}
}
const hasResponse = response.trim().length > 0;
const isTerminalError = streamMsg.success === false || !!streamMsg.error;
log.info(`Stream result: seq=${seq} success=${streamMsg.success}, hasResponse=${hasResponse}, resultLen=${resultText.length}, responsePreview=${response.trim().slice(0, 60)}`);
log.info(`Stream result: seq=${seq} success=${streamMsg.success}, hasResponse=${hasResponse}, resultLen=${resultText.length}`);
if (response.trim().length > 0) {
log.debug(`Stream result preview: seq=${seq} responsePreview=${response.trim().slice(0, 60)}`);
}
log.info(`Stream message counts:`, msgTypeCounts);
if (streamMsg.error) {
const detail = resultText.trim();
@@ -2101,10 +2174,13 @@ export class LettaBot implements AgentSession {
const acquired = await this.acquireLock(convKey);
try {
const { stream } = await this.runSession(text, { convKey });
let retried = false;
while (true) {
const { stream } = await this.runSession(text, { convKey, retried });
try {
let response = '';
let sawStaleDuplicateResult = false;
let lastErrorDetail: { message: string; stopReason: string; apiError?: Record<string, unknown> } | undefined;
for await (const msg of stream()) {
if (msg.type === 'tool_call') {
@@ -2121,6 +2197,12 @@ export class LettaBot implements AgentSession {
response += msg.content || '';
}
if (msg.type === 'result') {
const resultRunState = this.classifyResultRun(convKey, msg);
if (resultRunState === 'stale') {
sawStaleDuplicateResult = true;
break;
}
// TODO(letta-code-sdk#31): Remove once SDK handles HITL approvals in bypassPermissions mode.
if (msg.success === false || msg.error) {
// Enrich opaque errors from run metadata (mirrors processMessage logic).
@@ -2140,6 +2222,17 @@ export class LettaBot implements AgentSession {
break;
}
}
if (sawStaleDuplicateResult) {
this.invalidateSession(convKey);
if (retried) {
throw new Error('Agent stream returned stale duplicate result after retry');
}
log.warn(`Retrying sendToAgent after stale duplicate result (key=${convKey})`);
retried = true;
continue;
}
if (isSilent && response.trim()) {
log.info(`Silent mode: collected ${response.length} chars (not delivered)`);
}
@@ -2149,7 +2242,11 @@ export class LettaBot implements AgentSession {
this.invalidateSession(convKey);
throw error;
}
}
} finally {
if (this.config.reuseSession === false) {
this.invalidateSession(convKey);
}
this.releaseLock(convKey, acquired);
}
}
@@ -2175,6 +2272,9 @@ export class LettaBot implements AgentSession {
throw error;
}
} finally {
if (this.config.reuseSession === false) {
this.invalidateSession(convKey);
}
this.releaseLock(convKey, acquired);
}
}

View File

@@ -0,0 +1,107 @@
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { mkdtempSync, rmSync } from 'node:fs';
import { join } from 'node:path';
import { tmpdir } from 'node:os';
import { LettaBot } from './bot.js';
import type { InboundMessage } from './types.js';
// Regression tests for the n-1 desync fix: the SDK stream can emit a result
// message whose text repeats (or diverges from) assistant content that was
// already streamed and flushed to the adapter. The bot must never double-send
// in either case; the streamed content is the source of truth.
describe('result divergence guard', () => {
let workDir: string;
// Fresh temp working dir per test so LettaBot's on-disk store never leaks
// state between cases.
beforeEach(() => {
workDir = mkdtempSync(join(tmpdir(), 'lettabot-result-guard-'));
});
afterEach(() => {
rmSync(workDir, { recursive: true, force: true });
});
it('does not resend full result text when streamed content was already flushed', async () => {
const bot = new LettaBot({
workingDir: workDir,
allowedTools: [],
});
// Minimal channel adapter stub. supportsEditing=false forces the bot down
// the sendMessage flush path, which is the behavior under test.
const adapter = {
id: 'mock',
name: 'Mock',
start: vi.fn(async () => {}),
stop: vi.fn(async () => {}),
isRunning: vi.fn(() => true),
sendMessage: vi.fn(async () => ({ messageId: 'msg-1' })),
editMessage: vi.fn(async () => {}),
sendTypingIndicator: vi.fn(async () => {}),
stopTypingIndicator: vi.fn(async () => {}),
supportsEditing: vi.fn(() => false),
sendFile: vi.fn(async () => ({ messageId: 'file-1' })),
};
// Replace the real SDK session with a scripted stream: assistant text,
// then a tool_call (which triggers a flush), then a result echoing the
// same text. NOTE(review): assumes tool_call flushes the buffer — this
// mirrors processMessage's flush-on-tool-call behavior; confirm if that
// logic changes.
(bot as any).runSession = vi.fn(async () => ({
session: { abort: vi.fn(async () => {}) },
stream: async function* () {
// Assistant text is flushed when tool_call arrives.
yield { type: 'assistant', content: 'first segment' };
yield { type: 'tool_call', toolCallId: 'tc-1', toolName: 'Bash', toolInput: { command: 'echo hi' } };
// Result repeats the same text; this must not cause a duplicate send.
yield { type: 'result', success: true, result: 'first segment' };
},
}));
const msg: InboundMessage = {
channel: 'discord',
chatId: 'chat-1',
userId: 'user-1',
text: 'hello',
timestamp: new Date(),
};
await (bot as any).processMessage(msg, adapter);
// Exactly one delivery: the streamed segment, never the result echo.
const sentTexts = adapter.sendMessage.mock.calls.map(([payload]) => payload.text);
expect(sentTexts).toEqual(['first segment']);
});
it('prefers streamed assistant text when result text diverges after flush', async () => {
const bot = new LettaBot({
workingDir: workDir,
allowedTools: [],
});
// Same adapter stub as above; editing disabled so all output goes through
// sendMessage and can be asserted on.
const adapter = {
id: 'mock',
name: 'Mock',
start: vi.fn(async () => {}),
stop: vi.fn(async () => {}),
isRunning: vi.fn(() => true),
sendMessage: vi.fn(async () => ({ messageId: 'msg-1' })),
editMessage: vi.fn(async () => {}),
sendTypingIndicator: vi.fn(async () => {}),
stopTypingIndicator: vi.fn(async () => {}),
supportsEditing: vi.fn(() => false),
sendFile: vi.fn(async () => ({ messageId: 'file-1' })),
};
// Here the result text differs from what was streamed — simulating a
// result that leaked from a prior run (n-1 desync). Streamed text wins.
(bot as any).runSession = vi.fn(async () => ({
session: { abort: vi.fn(async () => {}) },
stream: async function* () {
yield { type: 'assistant', content: 'streamed-segment' };
yield { type: 'tool_call', toolCallId: 'tc-1', toolName: 'Bash', toolInput: { command: 'echo hi' } };
// Divergent stale result should not replace or resend streamed content.
yield { type: 'result', success: true, result: 'stale-result-segment' };
},
}));
const msg: InboundMessage = {
channel: 'discord',
chatId: 'chat-1',
userId: 'user-1',
text: 'hello',
timestamp: new Date(),
};
await (bot as any).processMessage(msg, adapter);
// Only the streamed text is delivered; the divergent result is dropped.
const sentTexts = adapter.sendMessage.mock.calls.map(([payload]) => payload.text);
expect(sentTexts).toEqual(['streamed-segment']);
});
});

View File

@@ -684,6 +684,88 @@ describe('SDK session contract', () => {
);
});
// Verifies the runIds-based stale-duplicate guard in sendToAgent: when a
// result replays the previous run's run_ids, the bot must discard it,
// invalidate the session, and retry the send exactly once.
it('retries sendToAgent when SDK result runIds repeat the previous run', async () => {
// streamCall counts stream() invocations so each call yields a different
// scripted sequence; the ordering below is load-bearing for the asserts.
let streamCall = 0;
const mockSession = {
initialize: vi.fn(async () => undefined),
send: vi.fn(async () => undefined),
stream: vi.fn(() => {
const call = streamCall++;
return (async function* () {
if (call === 0) {
// First message: a normal fresh run.
yield { type: 'assistant', content: 'response-A' };
yield { type: 'result', success: true, runIds: ['run-A'] };
return;
}
if (call === 1) {
// Stale replay of the previous run; bot should retry once.
yield { type: 'assistant', content: 'stale-A' };
yield { type: 'result', success: true, runIds: ['run-A'] };
return;
}
// Retry succeeds with a genuinely new run id.
yield { type: 'assistant', content: 'response-B' };
yield { type: 'result', success: true, runIds: ['run-B'] };
})();
}),
close: vi.fn(() => undefined),
agentId: 'agent-runid-test',
conversationId: 'conversation-runid-test',
};
// Both session factories hand back the same mock so create-vs-resume
// differences do not affect this test.
vi.mocked(createSession).mockReturnValue(mockSession as never);
vi.mocked(resumeSession).mockReturnValue(mockSession as never);
const bot = new LettaBot({
workingDir: join(dataDir, 'working'),
allowedTools: [],
});
const responseA = await bot.sendToAgent('first message');
expect(responseA).toBe('response-A');
// Second call hits the stale replay, then retries and gets response-B.
const responseB = await bot.sendToAgent('second message');
expect(responseB).toBe('response-B');
// Three sends total: first message once, second message twice (original
// attempt + one retry after the stale duplicate).
expect(mockSession.send).toHaveBeenCalledTimes(3);
expect(mockSession.send).toHaveBeenNthCalledWith(1, 'first message');
expect(mockSession.send).toHaveBeenNthCalledWith(2, 'second message');
expect(mockSession.send).toHaveBeenNthCalledWith(3, 'second message');
// The stale session is invalidated (closed) exactly once before the retry.
expect(mockSession.close).toHaveBeenCalledTimes(1);
});
// Verifies that conversations.reuseSession=false tears down the background
// (sendToAgent) session after every turn instead of keeping it warm.
it('invalidates background sessions when reuseSession is false', async () => {
const mockSession = {
initialize: vi.fn(async () => undefined),
send: vi.fn(async () => undefined),
stream: vi.fn(() =>
(async function* () {
yield { type: 'assistant', content: 'ok' };
// Keep this fixture aligned with current SDK output where runIds is
// often absent; this test validates reuseSession behavior only.
yield { type: 'result', success: true };
})()
),
close: vi.fn(() => undefined),
agentId: 'agent-reuse-false',
conversationId: 'conversation-reuse-false',
};
vi.mocked(createSession).mockReturnValue(mockSession as never);
vi.mocked(resumeSession).mockReturnValue(mockSession as never);
const bot = new LettaBot({
workingDir: join(dataDir, 'working'),
allowedTools: [],
// The setting under test: disable session reuse entirely.
reuseSession: false,
});
await bot.sendToAgent('first background trigger');
await bot.sendToAgent('second background trigger');
// One close per sendToAgent call proves each turn used a fresh session.
expect(mockSession.close).toHaveBeenCalledTimes(2);
});
it('does not leak stale stream events between consecutive sendToAgent calls', async () => {
// Simulates the real SDK behavior prior to 0.1.8: the shared streamQueue
// retains events that arrive after the result message. When the next

View File

@@ -198,6 +198,7 @@ export interface StreamMsg {
uuid?: string;
isError?: boolean;
result?: string;
runIds?: string[];
success?: boolean;
error?: string;
[key: string]: unknown;