fix(discord): isolate conversations per-thread in thread-only mode

Thread-only mode previously relied on the global conversationMode to
determine whether threads got separate conversations. In shared or
per-channel mode, all threads shared one conversation causing crosstalk.

Add forcePerChat flag on InboundMessage that the Discord adapter sets
when thread-only mode is active. resolveConversationKey treats flagged
messages as per-chat regardless of the configured mode, giving each
thread its own isolated conversation while still sharing agent memory.

Written by Cameron ◯ Letta Code

"Time is an illusion. Lunchtime doubly so." -- Douglas Adams
This commit is contained in:
Cameron
2026-03-09 18:25:09 -07:00
parent 4d037aca6a
commit b79f29d8fa
6 changed files with 43 additions and 12 deletions

View File

@@ -461,6 +461,8 @@ channels:
Thread messages inherit parent channel config, so child threads under `EZRA_CHANNEL_ID` use the same group rules. Thread messages inherit parent channel config, so child threads under `EZRA_CHANNEL_ID` use the same group rules.
When `threadMode: thread-only` is set, each thread automatically gets its own isolated conversation (message history), regardless of the global `conversations.mode` setting. This prevents messages from different threads from being interleaved in the same conversation. Agent memory (blocks) is still shared across all threads.
### Finding Group IDs ### Finding Group IDs
Each channel uses different identifiers for groups: Each channel uses different identifiers for groups:

View File

@@ -209,6 +209,7 @@ Behavior summary:
- Top-level messages are ignored in `thread-only` mode. - Top-level messages are ignored in `thread-only` mode.
- Top-level @mentions create a thread and are answered in that thread when `autoCreateThreadOnMention` is enabled. - Top-level @mentions create a thread and are answered in that thread when `autoCreateThreadOnMention` is enabled.
- Thread messages inherit parent channel config. If `EZRA_CHANNEL_ID` is configured, replies in its child threads use that same config. - Thread messages inherit parent channel config. If `EZRA_CHANNEL_ID` is configured, replies in its child threads use that same config.
- Each thread gets its own isolated conversation (message history), regardless of the global `conversations.mode` setting. This prevents crosstalk between threads. Agent memory (blocks) is still shared.
Required Discord permissions for auto-create: Required Discord permissions for auto-create:

View File

@@ -381,6 +381,7 @@ Ask the bot owner to approve with:
let isListeningMode = false; let isListeningMode = false;
let effectiveChatId = message.channel.id; let effectiveChatId = message.channel.id;
let effectiveGroupName = groupName; let effectiveGroupName = groupName;
let isThreadOnly = false;
// Group gating: config-based allowlist + mode // Group gating: config-based allowlist + mode
if (isGroup && this.config.groups) { if (isGroup && this.config.groups) {
@@ -414,7 +415,8 @@ Ask the bot owner to approve with:
} }
const threadMode = resolveDiscordThreadMode(this.config.groups, keys); const threadMode = resolveDiscordThreadMode(this.config.groups, keys);
if (threadMode === 'thread-only' && !isThreadMessage) { isThreadOnly = threadMode === 'thread-only';
if (isThreadOnly && !isThreadMessage) {
const shouldCreateThread = const shouldCreateThread =
wasMentioned && resolveDiscordAutoCreateThreadOnMention(this.config.groups, keys); wasMentioned && resolveDiscordAutoCreateThreadOnMention(this.config.groups, keys);
if (!shouldCreateThread) { if (!shouldCreateThread) {
@@ -444,6 +446,7 @@ Ask the bot owner to approve with:
serverId: message.guildId || undefined, serverId: message.guildId || undefined,
wasMentioned, wasMentioned,
isListeningMode, isListeningMode,
forcePerChat: isThreadOnly || undefined,
attachments, attachments,
formatterHints: this.getFormatterHints(), formatterHints: this.getFormatterHints(),
}); });

View File

@@ -150,10 +150,11 @@ export function resolveConversationKey(
conversationMode: string | undefined, conversationMode: string | undefined,
conversationOverrides: Set<string>, conversationOverrides: Set<string>,
chatId?: string, chatId?: string,
forcePerChat?: boolean,
): string { ): string {
if (conversationMode === 'disabled') return 'default'; if (conversationMode === 'disabled') return 'default';
const normalized = channel.toLowerCase(); const normalized = channel.toLowerCase();
if (conversationMode === 'per-chat' && chatId) return `${normalized}:${chatId}`; if ((conversationMode === 'per-chat' || forcePerChat) && chatId) return `${normalized}:${chatId}`;
if (conversationMode === 'per-channel') return normalized; if (conversationMode === 'per-channel') return normalized;
if (conversationOverrides.has(normalized)) return normalized; if (conversationOverrides.has(normalized)) return normalized;
return 'shared'; return 'shared';
@@ -562,8 +563,8 @@ export class LettaBot implements AgentSession {
* Returns 'shared' in shared mode (unless channel is in perChannel overrides). * Returns 'shared' in shared mode (unless channel is in perChannel overrides).
* Returns channel id in per-channel mode or for override channels. * Returns channel id in per-channel mode or for override channels.
*/ */
private resolveConversationKey(channel: string, chatId?: string): string { private resolveConversationKey(channel: string, chatId?: string, forcePerChat?: boolean): string {
return resolveConversationKey(channel, this.config.conversationMode, this.conversationOverrides, chatId); return resolveConversationKey(channel, this.config.conversationMode, this.conversationOverrides, chatId, forcePerChat);
} }
/** /**
@@ -650,7 +651,7 @@ export class LettaBot implements AgentSession {
} }
} }
const convKey = this.resolveConversationKey(effective.channel, effective.chatId); const convKey = this.resolveConversationKey(effective.channel, effective.chatId, effective.forcePerChat);
if (convKey !== 'shared') { if (convKey !== 'shared') {
this.enqueueForKey(convKey, effective, adapter); this.enqueueForKey(convKey, effective, adapter);
} else { } else {
@@ -889,7 +890,7 @@ export class LettaBot implements AgentSession {
// queuing it for normal processing. This prevents a deadlock where // queuing it for normal processing. This prevents a deadlock where
// the stream is paused waiting for user input while the processing // the stream is paused waiting for user input while the processing
// flag blocks new messages from being handled. // flag blocks new messages from being handled.
const incomingConvKey = this.resolveConversationKey(msg.channel, msg.chatId); const incomingConvKey = this.resolveConversationKey(msg.channel, msg.chatId, msg.forcePerChat);
const pendingResolver = this.pendingQuestionResolvers.get(incomingConvKey); const pendingResolver = this.pendingQuestionResolvers.get(incomingConvKey);
if (pendingResolver) { if (pendingResolver) {
log.info(`Intercepted message as AskUserQuestion answer from ${msg.userId} (key=${incomingConvKey})`); log.info(`Intercepted message as AskUserQuestion answer from ${msg.userId} (key=${incomingConvKey})`);
@@ -909,7 +910,7 @@ export class LettaBot implements AgentSession {
return; return;
} }
const convKey = this.resolveConversationKey(msg.channel, msg.chatId); const convKey = this.resolveConversationKey(msg.channel, msg.chatId, msg.forcePerChat);
if (convKey !== 'shared') { if (convKey !== 'shared') {
// Per-channel, per-chat, or override mode: messages on different keys can run in parallel. // Per-channel, per-chat, or override mode: messages on different keys can run in parallel.
this.enqueueForKey(convKey, msg, adapter); this.enqueueForKey(convKey, msg, adapter);
@@ -994,7 +995,7 @@ export class LettaBot implements AgentSession {
// Wait for the user's next message (intercepted by handleMessage). // Wait for the user's next message (intercepted by handleMessage).
// Key by convKey so each chat resolves independently in per-chat mode. // Key by convKey so each chat resolves independently in per-chat mode.
const questionConvKey = this.resolveConversationKey(msg.channel, msg.chatId); const questionConvKey = this.resolveConversationKey(msg.channel, msg.chatId, msg.forcePerChat);
const answer = await new Promise<string>((resolve) => { const answer = await new Promise<string>((resolve) => {
this.pendingQuestionResolvers.set(questionConvKey, resolve); this.pendingQuestionResolvers.set(questionConvKey, resolve);
}); });
@@ -1169,7 +1170,7 @@ export class LettaBot implements AgentSession {
// Run session // Run session
let session: Session | null = null; let session: Session | null = null;
try { try {
const convKey = this.resolveConversationKey(msg.channel, msg.chatId); const convKey = this.resolveConversationKey(msg.channel, msg.chatId, msg.forcePerChat);
const seq = ++this.sendSequence; const seq = ++this.sendSequence;
const userText = msg.text || ''; const userText = msg.text || '';
log.info(`processMessage seq=${seq} key=${convKey} retried=${retried} user=${msg.userId} textLen=${userText.length}`); log.info(`processMessage seq=${seq} key=${convKey} retried=${retried} user=${msg.userId} textLen=${userText.length}`);
@@ -1605,7 +1606,7 @@ export class LettaBot implements AgentSession {
// Only retry if we never sent anything to the user. hasResponse tracks // Only retry if we never sent anything to the user. hasResponse tracks
// the current buffer, but finalizeMessage() clears it on type changes. // the current buffer, but finalizeMessage() clears it on type changes.
// sentAnyMessage is the authoritative "did we deliver output" flag. // sentAnyMessage is the authoritative "did we deliver output" flag.
const retryConvKey = this.resolveConversationKey(msg.channel, msg.chatId); const retryConvKey = this.resolveConversationKey(msg.channel, msg.chatId, msg.forcePerChat);
const retryConvIdFromStore = (retryConvKey === 'shared' const retryConvIdFromStore = (retryConvKey === 'shared'
? this.store.conversationId ? this.store.conversationId
: this.store.getConversationId(retryConvKey)) ?? undefined; : this.store.getConversationId(retryConvKey)) ?? undefined;
@@ -1815,7 +1816,7 @@ export class LettaBot implements AgentSession {
log.error('Failed to send error message to channel:', sendError); log.error('Failed to send error message to channel:', sendError);
} }
} finally { } finally {
const finalConvKey = this.resolveConversationKey(msg.channel, msg.chatId); const finalConvKey = this.resolveConversationKey(msg.channel, msg.chatId, msg.forcePerChat);
// When session reuse is disabled, invalidate after every message to // When session reuse is disabled, invalidate after every message to
// eliminate any possibility of stream state bleed between sequential // eliminate any possibility of stream state bleed between sequential
// sends. Costs ~5s subprocess init overhead per message. // sends. Costs ~5s subprocess init overhead per message.

View File

@@ -85,10 +85,33 @@ describe('resolveConversationKey', () => {
expect(resolveConversationKey('telegram', 'disabled', new Set(), '12345')).toBe('default'); expect(resolveConversationKey('telegram', 'disabled', new Set(), '12345')).toBe('default');
}); });
it('returns "default" in disabled mode regardless of overrides', () => { it('returns \"default\" in disabled mode regardless of overrides', () => {
const overrides = new Set(['telegram']); const overrides = new Set(['telegram']);
expect(resolveConversationKey('telegram', 'disabled', overrides)).toBe('default'); expect(resolveConversationKey('telegram', 'disabled', overrides)).toBe('default');
}); });
// --- forcePerChat ---
it('forcePerChat overrides shared mode to per-chat', () => {
expect(resolveConversationKey('discord', 'shared', new Set(), '99999', true)).toBe('discord:99999');
});
it('forcePerChat overrides per-channel mode to per-chat', () => {
expect(resolveConversationKey('discord', 'per-channel', new Set(), '99999', true)).toBe('discord:99999');
});
it('forcePerChat without chatId falls back normally', () => {
expect(resolveConversationKey('discord', 'shared', new Set(), undefined, true)).toBe('shared');
});
it('forcePerChat still respects disabled mode', () => {
expect(resolveConversationKey('discord', 'disabled', new Set(), '99999', true)).toBe('default');
});
it('forcePerChat=false does not change behavior', () => {
expect(resolveConversationKey('discord', 'shared', new Set(), '99999', false)).toBe('shared');
expect(resolveConversationKey('discord', 'per-channel', new Set(), '99999', false)).toBe('discord');
});
}); });
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------

View File

@@ -84,6 +84,7 @@ export interface InboundMessage {
isBatch?: boolean; // Is this a batched group message? isBatch?: boolean; // Is this a batched group message?
batchedMessages?: InboundMessage[]; // Original individual messages (for batch formatting) batchedMessages?: InboundMessage[]; // Original individual messages (for batch formatting)
isListeningMode?: boolean; // Listening mode: agent processes for memory but response is suppressed isListeningMode?: boolean; // Listening mode: agent processes for memory but response is suppressed
forcePerChat?: boolean; // Force per-chat conversation routing (e.g., Discord thread-only mode)
formatterHints?: FormatterHints; // Channel capabilities for directive rendering formatterHints?: FormatterHints; // Channel capabilities for directive rendering
} }