fix: guard against n-1 desync in stream result handling (#444)
Co-authored-by: letta-code <248085862+letta-code@users.noreply.github.com>
Co-authored-by: Cameron <cpfiffer@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
b3366228ef
commit
157a192fba
@@ -70,6 +70,7 @@ export interface AgentConfig {
|
||||
heartbeat?: string; // "dedicated" | "last-active" | "<channel>" (default: last-active)
|
||||
perChannel?: string[]; // Channels that should always have their own conversation
|
||||
maxSessions?: number; // Max concurrent sessions in per-chat mode (default: 10, LRU eviction)
|
||||
reuseSession?: boolean; // Reuse SDK subprocess across messages (default: true). Set false to eliminate stream state bleed.
|
||||
};
|
||||
/** Features for this agent */
|
||||
features?: {
|
||||
@@ -158,6 +159,7 @@ export interface LettaBotConfig {
|
||||
heartbeat?: string; // "dedicated" | "last-active" | "<channel>" (default: last-active)
|
||||
perChannel?: string[]; // Channels that should always have their own conversation
|
||||
maxSessions?: number; // Max concurrent sessions in per-chat mode (default: 10, LRU eviction)
|
||||
reuseSession?: boolean; // Reuse SDK subprocess across messages (default: true). Set false to eliminate stream state bleed.
|
||||
};
|
||||
|
||||
// Features
|
||||
|
||||
@@ -201,6 +201,7 @@ export class LettaBot implements AgentSession {
|
||||
private processing = false; // Global lock for shared mode
|
||||
private processingKeys: Set<string> = new Set(); // Per-key locks for per-channel mode
|
||||
private cancelledKeys: Set<string> = new Set(); // Tracks keys where /cancel was issued
|
||||
private sendSequence = 0; // Monotonic counter for desync diagnostics
|
||||
|
||||
// AskUserQuestion support: resolves when the next user message arrives.
|
||||
// In per-chat mode, keyed by convKey so each chat resolves independently.
|
||||
@@ -1528,6 +1529,8 @@ export class LettaBot implements AgentSession {
|
||||
let session: Session | null = null;
|
||||
try {
|
||||
const convKey = this.resolveConversationKey(msg.channel, msg.chatId);
|
||||
const seq = ++this.sendSequence;
|
||||
log.info(`processMessage seq=${seq} key=${convKey} retried=${retried} user=${msg.userId} text=${(msg.text || '').slice(0, 80)}`);
|
||||
const run = await this.runSession(messageToSend, { retried, canUseTool, convKey });
|
||||
lap('session send');
|
||||
session = run.session;
|
||||
@@ -1784,11 +1787,23 @@ export class LettaBot implements AgentSession {
|
||||
|
||||
const resultText = typeof streamMsg.result === 'string' ? streamMsg.result : '';
|
||||
if (resultText.trim().length > 0) {
|
||||
// Guard against n-1 desync: if we accumulated assistant text via
|
||||
// streaming and the result carries different content, the result
|
||||
// likely belongs to a prior run that leaked through the SDK's
|
||||
// stream queue. Prefer the real-time streamed content.
|
||||
if (response.trim().length > 0 && resultText.trim() !== response.trim()) {
|
||||
log.warn(
|
||||
`Result text diverges from streamed content ` +
|
||||
`(resultLen=${resultText.length}, streamLen=${response.length}). ` +
|
||||
`Preferring streamed content to avoid n-1 desync.`
|
||||
);
|
||||
} else {
|
||||
response = resultText;
|
||||
}
|
||||
}
|
||||
const hasResponse = response.trim().length > 0;
|
||||
const isTerminalError = streamMsg.success === false || !!streamMsg.error;
|
||||
log.info(`Stream result: success=${streamMsg.success}, hasResponse=${hasResponse}, resultLen=${resultText.length}`);
|
||||
log.info(`Stream result: seq=${seq} success=${streamMsg.success}, hasResponse=${hasResponse}, resultLen=${resultText.length}, responsePreview=${response.trim().slice(0, 60)}`);
|
||||
log.info(`Stream message counts:`, msgTypeCounts);
|
||||
if (streamMsg.error) {
|
||||
const detail = resultText.trim();
|
||||
@@ -2020,8 +2035,14 @@ export class LettaBot implements AgentSession {
|
||||
log.error('Failed to send error message to channel:', sendError);
|
||||
}
|
||||
} finally {
|
||||
// Session stays alive for reuse -- only invalidated on errors
|
||||
this.cancelledKeys.delete(this.resolveConversationKey(msg.channel, msg.chatId));
|
||||
const finalConvKey = this.resolveConversationKey(msg.channel, msg.chatId);
|
||||
// When session reuse is disabled, invalidate after every message to
|
||||
// eliminate any possibility of stream state bleed between sequential
|
||||
// sends. Costs ~5s subprocess init overhead per message.
|
||||
if (this.config.reuseSession === false) {
|
||||
this.invalidateSession(finalConvKey);
|
||||
}
|
||||
this.cancelledKeys.delete(finalConvKey);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -173,6 +173,7 @@ export interface BotConfig {
|
||||
heartbeatConversation?: string; // "dedicated" | "last-active" | "<channel>" (default: last-active)
|
||||
conversationOverrides?: string[]; // Channels that always use their own conversation (shared mode)
|
||||
maxSessions?: number; // Max concurrent sessions in per-chat mode (default: 10, LRU eviction)
|
||||
reuseSession?: boolean; // Reuse SDK subprocess across messages (default: true). Set false to eliminate stream state bleed at cost of ~5s latency per message.
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -609,6 +609,7 @@ async function main() {
|
||||
heartbeatConversation: agentConfig.conversations?.heartbeat || 'last-active',
|
||||
conversationOverrides: agentConfig.conversations?.perChannel,
|
||||
maxSessions: agentConfig.conversations?.maxSessions,
|
||||
reuseSession: agentConfig.conversations?.reuseSession,
|
||||
redaction: agentConfig.security?.redaction,
|
||||
cronStorePath,
|
||||
skills: {
|
||||
|
||||
Reference in New Issue
Block a user