fix: listen mode streaming leak + receiveBotMessages for Discord (#295)

This commit is contained in:
Cameron
2026-02-12 18:57:13 -08:00
committed by GitHub
parent add73bdb51
commit 09ce3b810f
7 changed files with 195 additions and 9 deletions

View File

@@ -0,0 +1,63 @@
import { describe, expect, it } from 'vitest';
import { shouldProcessDiscordBotMessage } from './discord.js';
import type { GroupModeConfig } from './group-mode.js';
describe('shouldProcessDiscordBotMessage', () => {
it('allows non-bot messages', () => {
expect(shouldProcessDiscordBotMessage({
isFromBot: false,
isGroup: true,
keys: ['chat-1'],
})).toBe(true);
});
it('drops bot DMs', () => {
expect(shouldProcessDiscordBotMessage({
isFromBot: true,
isGroup: false,
keys: ['dm-1'],
})).toBe(false);
});
it('drops this bot own messages to prevent self-echo loops', () => {
const groups: Record<string, GroupModeConfig> = {
'chat-1': { mode: 'open', receiveBotMessages: true },
};
expect(shouldProcessDiscordBotMessage({
isFromBot: true,
isGroup: true,
authorId: 'bot-self',
selfUserId: 'bot-self',
groups,
keys: ['chat-1'],
})).toBe(false);
});
it('drops other bot messages when receiveBotMessages is not enabled', () => {
const groups: Record<string, GroupModeConfig> = {
'chat-1': { mode: 'open' },
};
expect(shouldProcessDiscordBotMessage({
isFromBot: true,
isGroup: true,
authorId: 'bot-other',
selfUserId: 'bot-self',
groups,
keys: ['chat-1'],
})).toBe(false);
});
it('allows other bot messages when receiveBotMessages is enabled', () => {
const groups: Record<string, GroupModeConfig> = {
'chat-1': { mode: 'open', receiveBotMessages: true },
};
expect(shouldProcessDiscordBotMessage({
isFromBot: true,
isGroup: true,
authorId: 'bot-other',
selfUserId: 'bot-self',
groups,
keys: ['chat-1'],
})).toBe(true);
});
});

View File

@@ -11,7 +11,7 @@ import type { DmPolicy } from '../pairing/types.js';
import { isUserAllowed, upsertPairingRequest } from '../pairing/store.js';
import { buildAttachmentPath, downloadToFile } from './attachments.js';
import { HELP_TEXT } from '../core/commands.js';
import { isGroupAllowed, isGroupUserAllowed, resolveGroupMode, type GroupModeConfig } from './group-mode.js';
import { isGroupAllowed, isGroupUserAllowed, resolveGroupMode, resolveReceiveBotMessages, type GroupModeConfig } from './group-mode.js';
// Dynamic import to avoid requiring Discord deps if not used
let Client: typeof import('discord.js').Client;
@@ -27,6 +27,20 @@ export interface DiscordConfig {
groups?: Record<string, GroupModeConfig>; // Per-guild/channel settings
}
export function shouldProcessDiscordBotMessage(params: {
isFromBot: boolean;
isGroup: boolean;
authorId?: string;
selfUserId?: string;
groups?: Record<string, GroupModeConfig>;
keys: string[];
}): boolean {
if (!params.isFromBot) return true;
if (!params.isGroup) return false;
if (params.selfUserId && params.authorId === params.selfUserId) return false;
return resolveReceiveBotMessages(params.groups, params.keys);
}
export class DiscordAdapter implements ChannelAdapter {
readonly id = 'discord' as const;
readonly name = 'Discord';
@@ -142,7 +156,21 @@ Ask the bot owner to approve with:
});
this.client.on('messageCreate', async (message) => {
if (message.author?.bot) return;
const isFromBot = !!message.author?.bot;
const isGroup = !!message.guildId;
const chatId = message.channel.id;
const keys = [chatId];
if (message.guildId) keys.push(message.guildId);
const selfUserId = this.client?.user?.id;
if (!shouldProcessDiscordBotMessage({
isFromBot,
isGroup,
authorId: message.author?.id,
selfUserId,
groups: this.config.groups,
keys,
})) return;
let content = (message.content || '').trim();
const userId = message.author?.id;

View File

@@ -1,5 +1,5 @@
import { describe, expect, it } from 'vitest';
import { isGroupAllowed, isGroupUserAllowed, resolveGroupAllowedUsers, resolveGroupMode, type GroupsConfig } from './group-mode.js';
import { isGroupAllowed, isGroupUserAllowed, resolveGroupAllowedUsers, resolveGroupMode, resolveReceiveBotMessages, type GroupsConfig } from './group-mode.js';
describe('group-mode helpers', () => {
describe('isGroupAllowed', () => {
@@ -122,6 +122,55 @@ describe('group-mode helpers', () => {
});
});
describe('resolveReceiveBotMessages', () => {
it('returns false when groups config is missing', () => {
expect(resolveReceiveBotMessages(undefined, ['group-1'])).toBe(false);
});
it('returns false when receiveBotMessages is not configured', () => {
const groups: GroupsConfig = { 'group-1': { mode: 'listen' } };
expect(resolveReceiveBotMessages(groups, ['group-1'])).toBe(false);
});
it('returns true when receiveBotMessages is enabled on specific key', () => {
const groups: GroupsConfig = {
'group-1': { mode: 'listen', receiveBotMessages: true },
};
expect(resolveReceiveBotMessages(groups, ['group-1'])).toBe(true);
});
it('returns false when receiveBotMessages is explicitly disabled', () => {
const groups: GroupsConfig = {
'group-1': { mode: 'listen', receiveBotMessages: false },
};
expect(resolveReceiveBotMessages(groups, ['group-1'])).toBe(false);
});
it('uses wildcard as fallback', () => {
const groups: GroupsConfig = {
'*': { mode: 'listen', receiveBotMessages: true },
};
expect(resolveReceiveBotMessages(groups, ['group-1'])).toBe(true);
});
it('prefers specific key over wildcard', () => {
const groups: GroupsConfig = {
'*': { mode: 'listen', receiveBotMessages: true },
'group-1': { mode: 'listen', receiveBotMessages: false },
};
expect(resolveReceiveBotMessages(groups, ['group-1'])).toBe(false);
});
it('uses first matching key in priority order', () => {
const groups: GroupsConfig = {
'chat-1': { mode: 'listen', receiveBotMessages: true },
'server-1': { mode: 'listen', receiveBotMessages: false },
};
expect(resolveReceiveBotMessages(groups, ['chat-1', 'server-1'])).toBe(true);
expect(resolveReceiveBotMessages(groups, ['chat-2', 'server-1'])).toBe(false);
});
});
describe('isGroupUserAllowed', () => {
it('allows all users when no groups config', () => {
expect(isGroupUserAllowed(undefined, ['group-1'], 'any-user')).toBe(true);

View File

@@ -8,6 +8,8 @@ export interface GroupModeConfig {
mode?: GroupMode;
/** Only process group messages from these user IDs. Omit to allow all users. */
allowedUsers?: string[];
/** Process messages from other bots instead of dropping them. Default: false. */
receiveBotMessages?: boolean;
/**
* @deprecated Use mode: "mention-only" (true) or "open" (false).
*/
@@ -76,6 +78,27 @@ export function isGroupUserAllowed(
return allowed.includes(userId);
}
/**
* Resolve whether bot messages should be processed for a group/channel.
*
* Priority:
* 1. First matching key in provided order
* 2. Wildcard "*"
* 3. false (default: bot messages dropped)
*/
export function resolveReceiveBotMessages(
groups: GroupsConfig | undefined,
keys: string[],
): boolean {
if (groups) {
for (const key of keys) {
if (groups[key]?.receiveBotMessages !== undefined) return !!groups[key].receiveBotMessages;
}
if (groups['*']?.receiveBotMessages !== undefined) return !!groups['*'].receiveBotMessages;
}
return false;
}
/**
* Resolve effective mode for a group/channel.
*

View File

@@ -184,6 +184,8 @@ export interface GroupConfig {
mode?: GroupMode;
/** Only process group messages from these user IDs. Omit to allow all users. */
allowedUsers?: string[];
/** Process messages from other bots instead of dropping them. Default: false. */
receiveBotMessages?: boolean;
/**
* @deprecated Use mode: "mention-only" (true) or "open" (false).
*/

View File

@@ -0,0 +1,16 @@
import { describe, expect, it } from 'vitest';
import { isResponseDeliverySuppressed } from './bot.js';
describe('isResponseDeliverySuppressed', () => {
it('returns true for listening-mode messages', () => {
expect(isResponseDeliverySuppressed({ isListeningMode: true })).toBe(true);
});
it('returns false when listening mode is disabled', () => {
expect(isResponseDeliverySuppressed({ isListeningMode: false })).toBe(false);
});
it('returns false when listening mode is undefined', () => {
expect(isResponseDeliverySuppressed({})).toBe(false);
});
});

View File

@@ -113,6 +113,10 @@ export interface StreamMsg {
[key: string]: unknown;
}
export function isResponseDeliverySuppressed(msg: Pick<InboundMessage, 'isListeningMode'>): boolean {
return msg.isListeningMode === true;
}
export class LettaBot implements AgentSession {
private store: Store;
private config: BotConfig;
@@ -721,10 +725,11 @@ export class LettaBot implements AgentSession {
const lap = (label: string) => {
if (debugTiming) console.log(`[Timing] ${label}: ${(performance.now() - t0).toFixed(0)}ms`);
};
const suppressDelivery = isResponseDeliverySuppressed(msg);
this.lastUserMessageTime = new Date();
// Skip heartbeat target update for listening mode (don't redirect heartbeats)
if (!msg.isListeningMode) {
if (!suppressDelivery) {
this.store.lastMessageTarget = {
channel: msg.channel,
chatId: msg.chatId,
@@ -734,7 +739,7 @@ export class LettaBot implements AgentSession {
}
// Fire-and-forget typing indicator so session creation starts immediately
if (!msg.isListeningMode) {
if (!suppressDelivery) {
adapter.sendTypingIndicator(msg.chatId).catch(() => {});
}
lap('typing indicator');
@@ -748,7 +753,7 @@ export class LettaBot implements AgentSession {
: { recovered: false, shouldReset: false };
lap('recovery check');
if (recovery.shouldReset) {
if (!msg.isListeningMode) {
if (!suppressDelivery) {
await adapter.sendMessage({
chatId: msg.chatId,
text: '(Session recovery failed after multiple attempts. Try: lettabot reset-conversation)',
@@ -842,7 +847,7 @@ export class LettaBot implements AgentSession {
sentAnyMessage = true;
}
}
if (response.trim()) {
if (!suppressDelivery && response.trim()) {
try {
const prefixed = this.prefixResponse(response);
if (messageId) {
@@ -934,7 +939,7 @@ export class LettaBot implements AgentSession {
|| (trimmed.startsWith('<actions') && !trimmed.includes('</actions>'));
// Strip any completed <actions> block from the streaming text
const streamText = stripActionsBlock(response).trim();
if (canEdit && !mayBeHidden && streamText.length > 0 && Date.now() - lastUpdate > 500) {
if (canEdit && !mayBeHidden && !suppressDelivery && streamText.length > 0 && Date.now() - lastUpdate > 500) {
try {
const prefixedStream = this.prefixResponse(streamText);
if (messageId) {
@@ -1048,7 +1053,7 @@ export class LettaBot implements AgentSession {
}
// Listening mode: agent processed for memory, suppress response delivery
if (msg.isListeningMode) {
if (suppressDelivery) {
console.log(`[Bot] Listening mode: processed ${msg.channel}:${msg.chatId} for memory (response suppressed)`);
return;
}