feat: group message batching + Telegram group gating + instantGroups (#187)

* feat: add group message batching, Telegram group gating, and instantGroups

Group Message Batcher:
- New GroupBatcher buffers group chat messages and flushes on timer or @mention
- Channel-agnostic: works with any ChannelAdapter
- Configurable per-channel via groupPollIntervalMin (default: 10min, 0 = immediate)
- formatGroupBatchEnvelope formats batched messages as chat logs for the agent
- Single-message batches unwrapped to use DM-style formatMessageEnvelope

Telegram Group Gating:
- my_chat_member handler: bot leaves groups when added by unpaired users
- Groups added by paired users are auto-approved via group-store
- Group messages bypass DM pairing (middleware skips group/supergroup chats)
- Mention detection for @bot in group messages

Channel Group Support:
- All adapters: getDmPolicy() interface method
- Discord: serverId (guildId), wasMentioned, pairing bypass for guilds
- Signal: group messages bypass pairing
- Slack: wasMentioned field on messages

instantGroups Config:
- Per-channel instantGroups config to bypass batching for specific groups
- For Discord, checked against both serverId and chatId
- YAML config → env vars → parsed in main.ts → Set passed to bot

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: preserve large numeric IDs in instantGroups YAML config

Discord snowflake IDs exceed Number.MAX_SAFE_INTEGER, so YAML parses
unquoted IDs as lossy JavaScript numbers. Use the document AST to
extract the original string representation and avoid precision loss.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: Slack dmPolicy, Telegram group gating check

- Add dmPolicy to SlackConfig and wire through config/env/adapter
  (was hardcoded to 'open', now reads from config like other adapters)
- Check isGroupApproved() in Telegram middleware before processing
  group messages (approveGroup was called but never checked)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Gabriele Sarti
2026-02-07 17:47:22 -05:00
committed by GitHub
parent c88621574a
commit 66e8c462bf
14 changed files with 623 additions and 34 deletions

View File

@@ -177,33 +177,36 @@ Ask the bot owner to approve with:
}
}
const access = await this.checkAccess(userId);
if (access === 'blocked') {
const ch = message.channel;
if (ch.isTextBased() && 'send' in ch) {
await (ch as { send: (content: string) => Promise<unknown> }).send(
"Sorry, you're not authorized to use this bot."
);
}
return;
}
if (access === 'pairing') {
const { code, created } = await upsertPairingRequest('discord', userId, {
username: message.author.username,
});
if (!code) {
await message.channel.send('Too many pending pairing requests. Please try again later.');
// Bypass pairing for guild (group) messages
if (!message.guildId) {
const access = await this.checkAccess(userId);
if (access === 'blocked') {
const ch = message.channel;
if (ch.isTextBased() && 'send' in ch) {
await (ch as { send: (content: string) => Promise<unknown> }).send(
"Sorry, you're not authorized to use this bot."
);
}
return;
}
if (created) {
console.log(`[Discord] New pairing request from ${userId} (${message.author.username}): ${code}`);
}
if (access === 'pairing') {
const { code, created } = await upsertPairingRequest('discord', userId, {
username: message.author.username,
});
await this.sendPairingMessage(message, this.formatPairingMsg(code));
return;
if (!code) {
await message.channel.send('Too many pending pairing requests. Please try again later.');
return;
}
if (created) {
console.log(`[Discord] New pairing request from ${userId} (${message.author.username}): ${code}`);
}
await this.sendPairingMessage(message, this.formatPairingMsg(code));
return;
}
}
const attachments = await this.collectAttachments(message.attachments, message.channel.id);
@@ -237,6 +240,7 @@ Ask the bot owner to approve with:
const isGroup = !!message.guildId;
const groupName = isGroup && 'name' in message.channel ? message.channel.name : undefined;
const displayName = message.member?.displayName || message.author.globalName || message.author.username;
const wasMentioned = isGroup && !!this.client?.user && message.mentions.has(this.client.user);
await this.onMessage({
channel: 'discord',
@@ -249,6 +253,8 @@ Ask the bot owner to approve with:
timestamp: message.createdAt,
isGroup,
groupName,
serverId: message.guildId || undefined,
wasMentioned,
attachments,
});
}
@@ -318,6 +324,10 @@ Ask the bot owner to approve with:
}
}
getDmPolicy(): string {
return this.config.dmPolicy || 'pairing';
}
supportsEditing(): boolean {
return true;
}
@@ -375,6 +385,7 @@ Ask the bot owner to approve with:
timestamp: new Date(),
isGroup,
groupName,
serverId: message.guildId || undefined,
reaction: {
emoji,
messageId: message.id,

View File

@@ -303,6 +303,10 @@ This code expires in 1 hour.`;
};
}
getDmPolicy(): string {
return this.config.dmPolicy || 'pairing';
}
supportsEditing(): boolean {
return false;
}
@@ -679,8 +683,11 @@ This code expires in 1 hour.`;
}
// selfChatMode enabled - allow the message through
console.log('[Signal] Note to Self allowed (selfChatMode enabled)');
} else if (chatId.startsWith('group:')) {
// Group messages bypass pairing - anyone in the group can interact
console.log('[Signal] Group message - bypassing access control');
} else {
// External message - check access control
// External DM - check access control
console.log('[Signal] Checking access for external message');
const access = await this.checkAccess(source);
console.log(`[Signal] Access result: ${access}`);

View File

@@ -17,6 +17,7 @@ let App: typeof import('@slack/bolt').App;
export interface SlackConfig {
botToken: string; // xoxb-...
appToken: string; // xapp-... (for Socket Mode)
dmPolicy?: 'pairing' | 'allowlist' | 'open';
allowedUsers?: string[]; // Slack user IDs (e.g., U01234567)
attachmentsDir?: string;
attachmentsMaxBytes?: number;
@@ -139,11 +140,12 @@ export class SlackAdapter implements ChannelAdapter {
threadId: threadTs,
isGroup,
groupName: isGroup ? channelId : undefined, // Would need conversations.info for name
wasMentioned: false, // Regular messages; app_mention handles mentions
attachments,
});
}
});
// Handle app mentions (@bot)
this.app.event('app_mention', async ({ event }) => {
const userId = event.user || '';
@@ -189,6 +191,7 @@ export class SlackAdapter implements ChannelAdapter {
threadId: threadTs,
isGroup,
groupName: isGroup ? channelId : undefined,
wasMentioned: true, // app_mention is always a mention
attachments,
});
}
@@ -274,6 +277,10 @@ export class SlackAdapter implements ChannelAdapter {
});
}
getDmPolicy(): string {
return this.config.dmPolicy || 'pairing';
}
async sendTypingIndicator(_chatId: string): Promise<void> {
// Slack doesn't have a typing indicator API for bots
// This is a no-op

View File

@@ -14,6 +14,7 @@ import {
upsertPairingRequest,
formatPairingMessage,
} from '../pairing/store.js';
import { isGroupApproved, approveGroup } from '../pairing/group-store.js';
import { basename } from 'node:path';
import { buildAttachmentPath, downloadToFile } from './attachments.js';
@@ -79,17 +80,68 @@ export class TelegramAdapter implements ChannelAdapter {
}
private setupHandlers(): void {
// Middleware: Check access based on dmPolicy
// Detect when bot is added/removed from groups (proactive group gating)
this.bot.on('my_chat_member', async (ctx) => {
const chatMember = ctx.myChatMember;
if (!chatMember) return;
const chatType = chatMember.chat.type;
if (chatType !== 'group' && chatType !== 'supergroup') return;
const newStatus = chatMember.new_chat_member.status;
if (newStatus !== 'member' && newStatus !== 'administrator') return;
const chatId = String(chatMember.chat.id);
const fromId = String(chatMember.from.id);
const dmPolicy = this.config.dmPolicy || 'pairing';
// No gating when policy is not pairing
if (dmPolicy !== 'pairing') {
await approveGroup('telegram', chatId);
console.log(`[Telegram] Group ${chatId} auto-approved (dmPolicy=${dmPolicy})`);
return;
}
// Check if the user who added the bot is paired
const configAllowlist = this.config.allowedUsers?.map(String);
const allowed = await isUserAllowed('telegram', fromId, configAllowlist);
if (allowed) {
await approveGroup('telegram', chatId);
console.log(`[Telegram] Group ${chatId} approved by paired user ${fromId}`);
} else {
console.log(`[Telegram] Unpaired user ${fromId} tried to add bot to group ${chatId}, leaving`);
try {
await ctx.api.sendMessage(chatId, 'This bot can only be added to groups by paired users.');
await ctx.api.leaveChat(chatId);
} catch (err) {
console.error('[Telegram] Failed to leave group:', err);
}
}
});
// Middleware: Check access based on dmPolicy (bypass for groups)
this.bot.use(async (ctx, next) => {
const userId = ctx.from?.id;
if (!userId) return;
// Group gating: check if group is approved before processing
const chatType = ctx.chat?.type;
if (chatType === 'group' || chatType === 'supergroup') {
const dmPolicy = this.config.dmPolicy || 'pairing';
if (dmPolicy === 'open' || await isGroupApproved('telegram', String(ctx.chat!.id))) {
await next();
}
// Silently drop messages from unapproved groups
return;
}
const access = await this.checkAccess(
String(userId),
ctx.from?.username,
ctx.from?.first_name
);
if (access === 'allowed') {
await next();
return;
@@ -158,19 +210,49 @@ export class TelegramAdapter implements ChannelAdapter {
const userId = ctx.from?.id;
const chatId = ctx.chat.id;
const text = ctx.message.text;
if (!userId) return;
if (text.startsWith('/')) return; // Skip other commands
// Group detection
const chatType = ctx.chat.type;
const isGroup = chatType === 'group' || chatType === 'supergroup';
const groupName = isGroup && 'title' in ctx.chat ? ctx.chat.title : undefined;
// Mention detection for groups
let wasMentioned = false;
if (isGroup) {
const botUsername = this.bot.botInfo?.username;
if (botUsername) {
// Check entities for bot_command or mention matching our username
const entities = ctx.message.entities || [];
wasMentioned = entities.some((e) => {
if (e.type === 'mention') {
const mentioned = text.substring(e.offset, e.offset + e.length);
return mentioned.toLowerCase() === `@${botUsername.toLowerCase()}`;
}
return false;
});
// Fallback: text-based check
if (!wasMentioned) {
wasMentioned = text.toLowerCase().includes(`@${botUsername.toLowerCase()}`);
}
}
}
if (this.onMessage) {
await this.onMessage({
channel: 'telegram',
chatId: String(chatId),
userId: String(userId),
userName: ctx.from.username || ctx.from.first_name,
userHandle: ctx.from.username,
messageId: String(ctx.message.message_id),
text,
timestamp: new Date(),
isGroup,
groupName,
wasMentioned,
});
}
});
@@ -433,6 +515,10 @@ export class TelegramAdapter implements ChannelAdapter {
]);
}
getDmPolicy(): string {
return this.config.dmPolicy || 'pairing';
}
async sendTypingIndicator(chatId: string): Promise<void> {
await this.bot.api.sendChatAction(chatId, 'typing');
}

View File

@@ -26,6 +26,7 @@ export interface ChannelAdapter {
// Capabilities (optional)
supportsEditing?(): boolean;
sendFile?(file: OutboundFile): Promise<{ messageId: string }>;
getDmPolicy?(): string;
// Event handlers (set by bot core)
onMessage?: (msg: InboundMessage) => Promise<void>;

View File

@@ -977,6 +977,10 @@ export class WhatsAppAdapter implements ChannelAdapter {
);
}
getDmPolicy(): string {
return this.config.dmPolicy || 'pairing';
}
supportsEditing(): boolean {
return false;
}

View File

@@ -58,7 +58,12 @@ export function loadConfig(): LettaBotConfig {
try {
const content = readFileSync(configPath, 'utf-8');
const parsed = YAML.parse(content) as Partial<LettaBotConfig>;
// Fix instantGroups: YAML parses large numeric IDs (e.g. Discord snowflakes)
// as JavaScript numbers, losing precision for values > Number.MAX_SAFE_INTEGER.
// Re-extract from document AST to preserve the original string representation.
fixInstantGroupIds(content, parsed);
// Merge with defaults
return {
...DEFAULT_CONFIG,
@@ -133,6 +138,15 @@ export function configToEnv(config: LettaBotConfig): Record<string, string> {
if (config.channels.slack?.botToken) {
env.SLACK_BOT_TOKEN = config.channels.slack.botToken;
}
if (config.channels.slack?.dmPolicy) {
env.SLACK_DM_POLICY = config.channels.slack.dmPolicy;
}
if (config.channels.slack?.groupPollIntervalMin !== undefined) {
env.SLACK_GROUP_POLL_INTERVAL_MIN = String(config.channels.slack.groupPollIntervalMin);
}
if (config.channels.slack?.instantGroups?.length) {
env.SLACK_INSTANT_GROUPS = config.channels.slack.instantGroups.join(',');
}
if (config.channels.whatsapp?.enabled) {
env.WHATSAPP_ENABLED = 'true';
if (config.channels.whatsapp.selfChat) {
@@ -141,6 +155,12 @@ export function configToEnv(config: LettaBotConfig): Record<string, string> {
env.WHATSAPP_SELF_CHAT_MODE = 'false';
}
}
if (config.channels.whatsapp?.groupPollIntervalMin !== undefined) {
env.WHATSAPP_GROUP_POLL_INTERVAL_MIN = String(config.channels.whatsapp.groupPollIntervalMin);
}
if (config.channels.whatsapp?.instantGroups?.length) {
env.WHATSAPP_INSTANT_GROUPS = config.channels.whatsapp.instantGroups.join(',');
}
if (config.channels.signal?.phone) {
env.SIGNAL_PHONE_NUMBER = config.channels.signal.phone;
// Signal selfChat defaults to true, so only set env if explicitly false
@@ -148,6 +168,18 @@ export function configToEnv(config: LettaBotConfig): Record<string, string> {
env.SIGNAL_SELF_CHAT_MODE = 'false';
}
}
if (config.channels.signal?.groupPollIntervalMin !== undefined) {
env.SIGNAL_GROUP_POLL_INTERVAL_MIN = String(config.channels.signal.groupPollIntervalMin);
}
if (config.channels.signal?.instantGroups?.length) {
env.SIGNAL_INSTANT_GROUPS = config.channels.signal.instantGroups.join(',');
}
if (config.channels.telegram?.groupPollIntervalMin !== undefined) {
env.TELEGRAM_GROUP_POLL_INTERVAL_MIN = String(config.channels.telegram.groupPollIntervalMin);
}
if (config.channels.telegram?.instantGroups?.length) {
env.TELEGRAM_INSTANT_GROUPS = config.channels.telegram.instantGroups.join(',');
}
if (config.channels.discord?.token) {
env.DISCORD_BOT_TOKEN = config.channels.discord.token;
if (config.channels.discord.dmPolicy) {
@@ -157,6 +189,12 @@ export function configToEnv(config: LettaBotConfig): Record<string, string> {
env.DISCORD_ALLOWED_USERS = config.channels.discord.allowedUsers.join(',');
}
}
if (config.channels.discord?.groupPollIntervalMin !== undefined) {
env.DISCORD_GROUP_POLL_INTERVAL_MIN = String(config.channels.discord.groupPollIntervalMin);
}
if (config.channels.discord?.instantGroups?.length) {
env.DISCORD_INSTANT_GROUPS = config.channels.discord.instantGroups.join(',');
}
// Features
if (config.features?.cron) {
@@ -281,3 +319,49 @@ export async function syncProviders(config: LettaBotConfig): Promise<void> {
}
}
}
/**
* Fix instantGroups arrays that may contain large numeric IDs parsed by YAML.
* Discord snowflake IDs exceed Number.MAX_SAFE_INTEGER, so YAML parses them
* as lossy JavaScript numbers. We re-read from the document AST to get the
* original string representation.
*/
function fixInstantGroupIds(yamlContent: string, parsed: Partial<LettaBotConfig>): void {
if (!parsed.channels) return;
try {
const doc = YAML.parseDocument(yamlContent);
const channels = ['telegram', 'slack', 'whatsapp', 'signal', 'discord'] as const;
for (const ch of channels) {
const seq = doc.getIn(['channels', ch, 'instantGroups'], true);
if (YAML.isSeq(seq)) {
const fixed = seq.items.map((item: unknown) => {
if (YAML.isScalar(item)) {
// For numbers, use the original source text to avoid precision loss
if (typeof item.value === 'number' && item.source) {
return item.source;
}
return String(item.value);
}
return String(item);
});
const cfg = parsed.channels[ch];
if (cfg) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(cfg as any).instantGroups = fixed;
}
}
}
} catch {
// Fallback: just ensure entries are strings (won't fix precision, but safe)
const channels = ['telegram', 'slack', 'whatsapp', 'signal', 'discord'] as const;
for (const ch of channels) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const cfg = parsed.channels?.[ch] as any;
if (cfg && Array.isArray(cfg.instantGroups)) {
cfg.instantGroups = cfg.instantGroups.map((v: unknown) => String(v));
}
}
}
}

View File

@@ -100,13 +100,18 @@ export interface TelegramConfig {
token?: string;
dmPolicy?: 'pairing' | 'allowlist' | 'open';
allowedUsers?: string[];
groupPollIntervalMin?: number; // Batch interval in minutes (default: 10, 0 = immediate)
instantGroups?: string[]; // Group chat IDs that bypass batching
}
export interface SlackConfig {
enabled: boolean;
appToken?: string;
botToken?: string;
dmPolicy?: 'pairing' | 'allowlist' | 'open';
allowedUsers?: string[];
groupPollIntervalMin?: number; // Batch interval in minutes (default: 10, 0 = immediate)
instantGroups?: string[]; // Channel IDs that bypass batching
}
export interface WhatsAppConfig {
@@ -118,6 +123,8 @@ export interface WhatsAppConfig {
groupAllowFrom?: string[];
mentionPatterns?: string[];
groups?: Record<string, { requireMention?: boolean }>;
groupPollIntervalMin?: number; // Batch interval in minutes (default: 10, 0 = immediate)
instantGroups?: string[]; // Group JIDs that bypass batching
}
export interface SignalConfig {
@@ -129,6 +136,8 @@ export interface SignalConfig {
// Group gating
mentionPatterns?: string[]; // Regex patterns for mention detection (e.g., ["@bot"])
groups?: Record<string, { requireMention?: boolean }>; // Per-group settings, "*" for defaults
groupPollIntervalMin?: number; // Batch interval in minutes (default: 10, 0 = immediate)
instantGroups?: string[]; // Group IDs that bypass batching
}
export interface DiscordConfig {
@@ -136,6 +145,8 @@ export interface DiscordConfig {
token?: string;
dmPolicy?: 'pairing' | 'allowlist' | 'open';
allowedUsers?: string[];
groupPollIntervalMin?: number; // Batch interval in minutes (default: 10, 0 = immediate)
instantGroups?: string[]; // Guild/server IDs or channel IDs that bypass batching
}
export interface GoogleConfig {

View File

@@ -11,7 +11,8 @@ import type { BotConfig, InboundMessage, TriggerContext } from './types.js';
import { Store } from './store.js';
import { updateAgentName, getPendingApprovals, rejectApproval, cancelRuns, recoverOrphanedConversationApproval } from '../tools/letta-api.js';
import { installSkillsToAgent } from '../skills/loader.js';
import { formatMessageEnvelope, type SessionContextOptions } from './formatter.js';
import { formatMessageEnvelope, formatGroupBatchEnvelope, type SessionContextOptions } from './formatter.js';
import type { GroupBatcher } from './group-batcher.js';
import { loadMemoryBlocks } from './memory.js';
import { SYSTEM_PROMPT } from './system-prompt.js';
@@ -41,6 +42,9 @@ export class LettaBot {
// Callback to trigger heartbeat (set by main.ts)
public onTriggerHeartbeat?: () => Promise<void>;
private groupBatcher?: GroupBatcher;
private groupIntervals: Map<string, number> = new Map(); // channel -> intervalMin
private instantGroupIds: Set<string> = new Set(); // channel:id keys for instant processing
private processing = false;
constructor(config: BotConfig) {
@@ -65,6 +69,37 @@ export class LettaBot {
console.log(`Registered channel: ${adapter.name}`);
}
/**
* Set the group batcher and per-channel intervals.
*/
setGroupBatcher(batcher: GroupBatcher, intervals: Map<string, number>, instantGroupIds?: Set<string>): void {
this.groupBatcher = batcher;
this.groupIntervals = intervals;
if (instantGroupIds) {
this.instantGroupIds = instantGroupIds;
}
console.log('[Bot] Group batcher configured');
}
/**
* Inject a batched group message into the queue and trigger processing.
* Called by GroupBatcher's onFlush callback.
*/
processGroupBatch(msg: InboundMessage, adapter: ChannelAdapter): void {
const count = msg.batchedMessages?.length || 0;
console.log(`[Bot] Group batch: ${count} messages from ${msg.channel}:${msg.chatId}`);
// Unwrap single-message batches so they use formatMessageEnvelope (DM-style)
// instead of the chat-log batch format
const effective = (count === 1 && msg.batchedMessages)
? msg.batchedMessages[0]
: msg;
this.messageQueue.push({ msg: effective, adapter });
if (!this.processing) {
this.processQueue().catch(err => console.error('[Queue] Fatal error in processQueue:', err));
}
}
/**
* Handle slash commands
*/
@@ -218,7 +253,18 @@ export class LettaBot {
*/
private async handleMessage(msg: InboundMessage, adapter: ChannelAdapter): Promise<void> {
console.log(`[${msg.channel}] Message from ${msg.userId}: ${msg.text}`);
// Route group messages to batcher if configured
if (msg.isGroup && this.groupBatcher) {
// Check if this group is configured for instant processing
const isInstant = this.instantGroupIds.has(`${msg.channel}:${msg.chatId}`)
|| (msg.serverId && this.instantGroupIds.has(`${msg.channel}:${msg.serverId}`));
const intervalMin = isInstant ? 0 : (this.groupIntervals.get(msg.channel) ?? 10);
console.log(`[Bot] Group message routed to batcher (interval=${intervalMin}min, mentioned=${msg.wasMentioned}, instant=${!!isInstant})`);
this.groupBatcher.enqueue(msg, adapter, intervalMin);
return;
}
// Add to queue
this.messageQueue.push({ msg, adapter });
console.log(`[Queue] Added to queue, length: ${this.messageQueue.length}, processing: ${this.processing}`);
@@ -394,7 +440,9 @@ export class LettaBot {
} : undefined;
// Send message to agent with metadata envelope
const formattedMessage = formatMessageEnvelope(msg, {}, sessionContext);
const formattedMessage = msg.isBatch && msg.batchedMessages
? formatGroupBatchEnvelope(msg.batchedMessages)
: formatMessageEnvelope(msg);
try {
await withTimeout(session.send(formattedMessage), 'Session send');
} catch (sendError) {

View File

@@ -39,6 +39,31 @@ const DEFAULT_OPTIONS: EnvelopeOptions = {
includeGroup: true,
};
/**
* Format a short time string (e.g., "4:30 PM")
*/
function formatShortTime(date: Date, options: EnvelopeOptions): string {
let timeZone: string | undefined;
if (options.timezone === 'utc') {
timeZone = 'UTC';
} else if (options.timezone && options.timezone !== 'local') {
try {
new Intl.DateTimeFormat('en-US', { timeZone: options.timezone });
timeZone = options.timezone;
} catch {
timeZone = undefined;
}
}
const formatter = new Intl.DateTimeFormat('en-US', {
hour: 'numeric',
minute: '2-digit',
hour12: true,
timeZone,
});
return formatter.format(date);
}
/**
* Session context options for first-message enrichment
*/
@@ -337,3 +362,60 @@ export function formatMessageEnvelope(
}
return reminder;
}
/**
* Format a group batch of messages as a chat log for the agent.
*
* Output format:
* [GROUP CHAT - discord:123 #general - 3 messages]
* [4:30 PM] Alice: Hey everyone
* [4:32 PM] Bob: What's up?
* [4:35 PM] Alice: @LettaBot can you help?
* (Format: **bold** *italic* ...)
*/
export function formatGroupBatchEnvelope(
messages: InboundMessage[],
options: EnvelopeOptions = {}
): string {
if (messages.length === 0) return '';
const opts = { ...DEFAULT_OPTIONS, ...options };
const first = messages[0];
// Header: [GROUP CHAT - channel:chatId #groupName - N messages]
const headerParts: string[] = ['GROUP CHAT'];
headerParts.push(`${first.channel}:${first.chatId}`);
if (first.groupName?.trim()) {
if ((first.channel === 'slack' || first.channel === 'discord') && !first.groupName.startsWith('#')) {
headerParts.push(`#${first.groupName}`);
} else {
headerParts.push(first.groupName);
}
}
headerParts.push(`${messages.length} message${messages.length === 1 ? '' : 's'}`);
const header = `[${headerParts.join(' - ')}]`;
// Chat log lines
const lines = messages.map((msg) => {
const time = formatShortTime(msg.timestamp, opts);
const sender = formatSender(msg);
const textParts: string[] = [];
if (msg.text?.trim()) textParts.push(msg.text.trim());
if (msg.reaction) {
const action = msg.reaction.action || 'added';
textParts.push(`[Reaction ${action}: ${msg.reaction.emoji}]`);
}
if (msg.attachments && msg.attachments.length > 0) {
const names = msg.attachments.map((a) => a.name || 'attachment').join(', ');
textParts.push(`[Attachments: ${names}]`);
}
const body = textParts.join(' ') || '(empty)';
return `[${time}] ${sender}: ${body}`;
});
// Format hint
const formatHint = CHANNEL_FORMATS[first.channel];
const hint = formatHint ? `\n(Format: ${formatHint})` : '';
return `${header}\n${lines.join('\n')}${hint}`;
}

113
src/core/group-batcher.ts Normal file
View File

@@ -0,0 +1,113 @@
/**
* Group Message Batcher
*
* Buffers group chat messages and flushes them periodically or on @mention.
* Channel-agnostic: works with any ChannelAdapter.
*/
import type { ChannelAdapter } from '../channels/types.js';
import type { InboundMessage } from './types.js';
export interface BufferEntry {
messages: InboundMessage[];
adapter: ChannelAdapter;
timer: ReturnType<typeof setTimeout> | null;
}
export type OnFlushCallback = (msg: InboundMessage, adapter: ChannelAdapter) => void;
export class GroupBatcher {
private buffer: Map<string, BufferEntry> = new Map();
private onFlush: OnFlushCallback;
constructor(onFlush: OnFlushCallback) {
this.onFlush = onFlush;
}
/**
* Add a group message to the buffer.
* If wasMentioned, flush immediately.
* If intervalMin is 0, flush on every message (no batching).
* Otherwise, start a timer on the first message (does NOT reset on subsequent messages).
*/
enqueue(msg: InboundMessage, adapter: ChannelAdapter, intervalMin: number): void {
const key = `${msg.channel}:${msg.chatId}`;
let entry = this.buffer.get(key);
if (!entry) {
entry = { messages: [], adapter, timer: null };
this.buffer.set(key, entry);
}
entry.messages.push(msg);
entry.adapter = adapter; // Update adapter reference
// Immediate flush: @mention or intervalMin=0
if (msg.wasMentioned || intervalMin === 0) {
this.flush(key);
return;
}
// Start timer on first message only (don't reset to prevent starvation)
if (!entry.timer) {
const ms = intervalMin * 60 * 1000;
entry.timer = setTimeout(() => {
this.flush(key);
}, ms);
}
}
/**
* Flush buffered messages for a key, building a synthetic batch InboundMessage.
*/
flush(key: string): void {
const entry = this.buffer.get(key);
if (!entry || entry.messages.length === 0) return;
// Clear timer
if (entry.timer) {
clearTimeout(entry.timer);
entry.timer = null;
}
const messages = entry.messages;
const adapter = entry.adapter;
// Remove from buffer
this.buffer.delete(key);
// Use the last message as the base for the synthetic batch message
const last = messages[messages.length - 1];
const batchMsg: InboundMessage = {
channel: last.channel,
chatId: last.chatId,
userId: last.userId,
userName: last.userName,
userHandle: last.userHandle,
messageId: last.messageId,
text: messages.map((m) => m.text).join('\n'),
timestamp: last.timestamp,
isGroup: true,
groupName: last.groupName,
wasMentioned: messages.some((m) => m.wasMentioned),
isBatch: true,
batchedMessages: messages,
};
this.onFlush(batchMsg, adapter);
}
/**
* Clear all timers on shutdown.
*/
stop(): void {
for (const [, entry] of this.buffer) {
if (entry.timer) {
clearTimeout(entry.timer);
entry.timer = null;
}
}
this.buffer.clear();
}
}

View File

@@ -76,10 +76,13 @@ export interface InboundMessage {
threadId?: string; // Slack thread_ts
isGroup?: boolean; // Is this from a group chat?
groupName?: string; // Group/channel name if applicable
serverId?: string; // Server/guild ID (Discord only)
wasMentioned?: boolean; // Was bot explicitly mentioned? (groups only)
replyToUser?: string; // Phone number of who they're replying to (if reply)
attachments?: InboundAttachment[];
reaction?: InboundReaction;
isBatch?: boolean; // Is this a batched group message?
batchedMessages?: InboundMessage[]; // Original individual messages (for batch formatting)
}
/**

View File

@@ -119,6 +119,7 @@ import { SlackAdapter } from './channels/slack.js';
import { WhatsAppAdapter } from './channels/whatsapp/index.js';
import { SignalAdapter } from './channels/signal.js';
import { DiscordAdapter } from './channels/discord.js';
import { GroupBatcher } from './core/group-batcher.js';
import { CronService } from './cron/service.js';
import { HeartbeatService } from './cron/heartbeat.js';
import { PollingService } from './polling/service.js';
@@ -244,12 +245,21 @@ const config = {
token: process.env.TELEGRAM_BOT_TOKEN || '',
dmPolicy: (process.env.TELEGRAM_DM_POLICY || 'pairing') as 'pairing' | 'allowlist' | 'open',
allowedUsers: process.env.TELEGRAM_ALLOWED_USERS?.split(',').filter(Boolean).map(Number) || [],
groupPollIntervalMin: process.env.TELEGRAM_GROUP_POLL_INTERVAL_MIN !== undefined
? parseInt(process.env.TELEGRAM_GROUP_POLL_INTERVAL_MIN, 10)
: 10,
instantGroups: process.env.TELEGRAM_INSTANT_GROUPS?.split(',').filter(Boolean) || [],
},
slack: {
enabled: !!process.env.SLACK_BOT_TOKEN && !!process.env.SLACK_APP_TOKEN,
botToken: process.env.SLACK_BOT_TOKEN || '',
appToken: process.env.SLACK_APP_TOKEN || '',
dmPolicy: (process.env.SLACK_DM_POLICY || 'pairing') as 'pairing' | 'allowlist' | 'open',
allowedUsers: process.env.SLACK_ALLOWED_USERS?.split(',').filter(Boolean) || [],
groupPollIntervalMin: process.env.SLACK_GROUP_POLL_INTERVAL_MIN !== undefined
? parseInt(process.env.SLACK_GROUP_POLL_INTERVAL_MIN, 10)
: 10,
instantGroups: process.env.SLACK_INSTANT_GROUPS?.split(',').filter(Boolean) || [],
},
whatsapp: {
enabled: process.env.WHATSAPP_ENABLED === 'true',
@@ -257,6 +267,10 @@ const config = {
dmPolicy: (process.env.WHATSAPP_DM_POLICY || 'pairing') as 'pairing' | 'allowlist' | 'open',
allowedUsers: process.env.WHATSAPP_ALLOWED_USERS?.split(',').filter(Boolean) || [],
selfChatMode: process.env.WHATSAPP_SELF_CHAT_MODE !== 'false', // Default true (safe - only self-chat)
groupPollIntervalMin: process.env.WHATSAPP_GROUP_POLL_INTERVAL_MIN !== undefined
? parseInt(process.env.WHATSAPP_GROUP_POLL_INTERVAL_MIN, 10)
: 10,
instantGroups: process.env.WHATSAPP_INSTANT_GROUPS?.split(',').filter(Boolean) || [],
},
signal: {
enabled: !!process.env.SIGNAL_PHONE_NUMBER,
@@ -267,12 +281,20 @@ const config = {
dmPolicy: (process.env.SIGNAL_DM_POLICY || 'pairing') as 'pairing' | 'allowlist' | 'open',
allowedUsers: process.env.SIGNAL_ALLOWED_USERS?.split(',').filter(Boolean) || [],
selfChatMode: process.env.SIGNAL_SELF_CHAT_MODE !== 'false', // Default true
groupPollIntervalMin: process.env.SIGNAL_GROUP_POLL_INTERVAL_MIN !== undefined
? parseInt(process.env.SIGNAL_GROUP_POLL_INTERVAL_MIN, 10)
: 10,
instantGroups: process.env.SIGNAL_INSTANT_GROUPS?.split(',').filter(Boolean) || [],
},
discord: {
enabled: !!process.env.DISCORD_BOT_TOKEN,
token: process.env.DISCORD_BOT_TOKEN || '',
dmPolicy: (process.env.DISCORD_DM_POLICY || 'pairing') as 'pairing' | 'allowlist' | 'open',
allowedUsers: process.env.DISCORD_ALLOWED_USERS?.split(',').filter(Boolean) || [],
groupPollIntervalMin: process.env.DISCORD_GROUP_POLL_INTERVAL_MIN !== undefined
? parseInt(process.env.DISCORD_GROUP_POLL_INTERVAL_MIN, 10)
: 10,
instantGroups: process.env.DISCORD_INSTANT_GROUPS?.split(',').filter(Boolean) || [],
},
// Cron
@@ -414,6 +436,7 @@ async function main() {
const slack = new SlackAdapter({
botToken: config.slack.botToken,
appToken: config.slack.appToken,
dmPolicy: config.slack.dmPolicy,
allowedUsers: config.slack.allowedUsers.length > 0 ? config.slack.allowedUsers : undefined,
attachmentsDir,
attachmentsMaxBytes: config.attachmentsMaxBytes,
@@ -467,6 +490,46 @@ async function main() {
bot.registerChannel(discord);
}
// Create and wire group batcher
const groupIntervals = new Map<string, number>();
if (config.telegram.enabled) {
groupIntervals.set('telegram', config.telegram.groupPollIntervalMin ?? 10);
}
if (config.slack.enabled) {
groupIntervals.set('slack', config.slack.groupPollIntervalMin ?? 10);
}
if (config.whatsapp.enabled) {
groupIntervals.set('whatsapp', config.whatsapp.groupPollIntervalMin ?? 10);
}
if (config.signal.enabled) {
groupIntervals.set('signal', config.signal.groupPollIntervalMin ?? 10);
}
if (config.discord.enabled) {
groupIntervals.set('discord', config.discord.groupPollIntervalMin ?? 10);
}
// Build instant group IDs set (channel:id format)
const instantGroupIds = new Set<string>();
const channelInstantGroups: Array<[string, string[]]> = [
['telegram', config.telegram.instantGroups],
['slack', config.slack.instantGroups],
['whatsapp', config.whatsapp.instantGroups],
['signal', config.signal.instantGroups],
['discord', config.discord.instantGroups],
];
for (const [channel, ids] of channelInstantGroups) {
for (const id of ids) {
instantGroupIds.add(`${channel}:${id}`);
}
}
if (instantGroupIds.size > 0) {
console.log(`[Groups] Instant groups: ${[...instantGroupIds].join(', ')}`);
}
const groupBatcher = new GroupBatcher((msg, adapter) => {
bot.processGroupBatch(msg, adapter);
});
bot.setGroupBatcher(groupBatcher, groupIntervals, instantGroupIds);
// Start cron service if enabled
// Note: CronService uses getDataDir() for cron-jobs.json to match the CLI
let cronService: CronService | null = null;
@@ -546,6 +609,7 @@ async function main() {
// Handle shutdown
const shutdown = async () => {
console.log('\nShutting down...');
groupBatcher.stop();
heartbeatService?.stop();
cronService?.stop();
await bot.stop();

View File

@@ -0,0 +1,68 @@
/**
* Approved Groups Store
*
* Tracks which groups have been approved (activated by a paired user).
* Only relevant when dmPolicy === 'pairing'.
*
* Storage: ~/.lettabot/credentials/{channel}-approvedGroups.json
*/
import fs from 'node:fs';
import path from 'node:path';
import os from 'node:os';
import crypto from 'node:crypto';
interface ApprovedGroupsStore {
version: 1;
groups: string[];
}
function getCredentialsDir(): string {
return path.join(os.homedir(), '.lettabot', 'credentials');
}
function getStorePath(channel: string): string {
return path.join(getCredentialsDir(), `${channel}-approvedGroups.json`);
}
async function ensureDir(dir: string): Promise<void> {
await fs.promises.mkdir(dir, { recursive: true, mode: 0o700 });
}
async function readJson<T>(filePath: string, fallback: T): Promise<T> {
try {
const raw = await fs.promises.readFile(filePath, 'utf-8');
return JSON.parse(raw) as T;
} catch {
return fallback;
}
}
async function writeJson(filePath: string, data: unknown): Promise<void> {
await ensureDir(path.dirname(filePath));
const tmp = `${filePath}.${crypto.randomUUID()}.tmp`;
await fs.promises.writeFile(tmp, JSON.stringify(data, null, 2) + '\n', { encoding: 'utf-8' });
await fs.promises.chmod(tmp, 0o600);
await fs.promises.rename(tmp, filePath);
}
/**
* Check if a group has been approved for a given channel.
*/
export async function isGroupApproved(channel: string, chatId: string): Promise<boolean> {
const filePath = getStorePath(channel);
const store = await readJson<ApprovedGroupsStore>(filePath, { version: 1, groups: [] });
return (store.groups || []).includes(chatId);
}
/**
* Approve a group for a given channel.
*/
export async function approveGroup(channel: string, chatId: string): Promise<void> {
const filePath = getStorePath(channel);
const store = await readJson<ApprovedGroupsStore>(filePath, { version: 1, groups: [] });
const groups = store.groups || [];
if (groups.includes(chatId)) return;
groups.push(chatId);
await writeJson(filePath, { version: 1, groups });
}