diff --git a/src/config/types.ts b/src/config/types.ts index 7266db4..02c9ffb 100644 --- a/src/config/types.ts +++ b/src/config/types.ts @@ -70,6 +70,7 @@ export interface AgentConfig { heartbeat?: string; // "dedicated" | "last-active" | "" (default: last-active) perChannel?: string[]; // Channels that should always have their own conversation maxSessions?: number; // Max concurrent sessions in per-chat mode (default: 10, LRU eviction) + reuseSession?: boolean; // Reuse SDK subprocess across messages (default: true). Set false to eliminate stream state bleed. }; /** Features for this agent */ features?: { @@ -158,6 +159,7 @@ export interface LettaBotConfig { heartbeat?: string; // "dedicated" | "last-active" | "" (default: last-active) perChannel?: string[]; // Channels that should always have their own conversation maxSessions?: number; // Max concurrent sessions in per-chat mode (default: 10, LRU eviction) + reuseSession?: boolean; // Reuse SDK subprocess across messages (default: true). Set false to eliminate stream state bleed. }; // Features diff --git a/src/core/bot.ts b/src/core/bot.ts index 3aceb13..ba487de 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -201,6 +201,7 @@ export class LettaBot implements AgentSession { private processing = false; // Global lock for shared mode private processingKeys: Set<string> = new Set(); // Per-key locks for per-channel mode private cancelledKeys: Set<string> = new Set(); // Tracks keys where /cancel was issued + private sendSequence = 0; // Monotonic counter for desync diagnostics // AskUserQuestion support: resolves when the next user message arrives. // In per-chat mode, keyed by convKey so each chat resolves independently. 
@@ -1528,6 +1529,8 @@ export class LettaBot implements AgentSession { let session: Session | null = null; try { const convKey = this.resolveConversationKey(msg.channel, msg.chatId); + const seq = ++this.sendSequence; + log.info(`processMessage seq=${seq} key=${convKey} retried=${retried} user=${msg.userId} text=${(msg.text || '').slice(0, 80)}`); const run = await this.runSession(messageToSend, { retried, canUseTool, convKey }); lap('session send'); session = run.session; @@ -1784,11 +1787,23 @@ export class LettaBot implements AgentSession { const resultText = typeof streamMsg.result === 'string' ? streamMsg.result : ''; if (resultText.trim().length > 0) { - response = resultText; + // Guard against n-1 desync: if we accumulated assistant text via + // streaming and the result carries different content, the result + // likely belongs to a prior run that leaked through the SDK's + // stream queue. Prefer the real-time streamed content. + if (response.trim().length > 0 && resultText.trim() !== response.trim()) { + log.warn( + `Result text diverges from streamed content ` + + `(resultLen=${resultText.length}, streamLen=${response.length}). 
` + + `Preferring streamed content to avoid n-1 desync.` + ); + } else { + response = resultText; + } } const hasResponse = response.trim().length > 0; const isTerminalError = streamMsg.success === false || !!streamMsg.error; - log.info(`Stream result: success=${streamMsg.success}, hasResponse=${hasResponse}, resultLen=${resultText.length}`); + log.info(`Stream result: seq=${seq} success=${streamMsg.success}, hasResponse=${hasResponse}, resultLen=${resultText.length}, responsePreview=${response.trim().slice(0, 60)}`); log.info(`Stream message counts:`, msgTypeCounts); if (streamMsg.error) { const detail = resultText.trim(); @@ -2020,8 +2035,14 @@ export class LettaBot implements AgentSession { log.error('Failed to send error message to channel:', sendError); } } finally { - // Session stays alive for reuse -- only invalidated on errors - this.cancelledKeys.delete(this.resolveConversationKey(msg.channel, msg.chatId)); + const finalConvKey = this.resolveConversationKey(msg.channel, msg.chatId); + // When session reuse is disabled, invalidate after every message to + // eliminate any possibility of stream state bleed between sequential + // sends. Costs ~5s subprocess init overhead per message. + if (this.config.reuseSession === false) { + this.invalidateSession(finalConvKey); + } + this.cancelledKeys.delete(finalConvKey); } } diff --git a/src/core/types.ts b/src/core/types.ts index 0751489..c65b099 100644 --- a/src/core/types.ts +++ b/src/core/types.ts @@ -173,6 +173,7 @@ export interface BotConfig { heartbeatConversation?: string; // "dedicated" | "last-active" | "" (default: last-active) conversationOverrides?: string[]; // Channels that always use their own conversation (shared mode) maxSessions?: number; // Max concurrent sessions in per-chat mode (default: 10, LRU eviction) + reuseSession?: boolean; // Reuse SDK subprocess across messages (default: true). Set false to eliminate stream state bleed at cost of ~5s latency per message. 
} /** diff --git a/src/main.ts b/src/main.ts index 27509ab..a229f8c 100644 --- a/src/main.ts +++ b/src/main.ts @@ -609,6 +609,7 @@ async function main() { heartbeatConversation: agentConfig.conversations?.heartbeat || 'last-active', conversationOverrides: agentConfig.conversations?.perChannel, maxSessions: agentConfig.conversations?.maxSessions, + reuseSession: agentConfig.conversations?.reuseSession, redaction: agentConfig.security?.redaction, cronStorePath, skills: {