diff --git a/src/channels/discord.ts b/src/channels/discord.ts index 1a0e6ff..e6de0ae 100644 --- a/src/channels/discord.ts +++ b/src/channels/discord.ts @@ -177,33 +177,36 @@ Ask the bot owner to approve with: } } - const access = await this.checkAccess(userId); - if (access === 'blocked') { - const ch = message.channel; - if (ch.isTextBased() && 'send' in ch) { - await (ch as { send: (content: string) => Promise }).send( - "Sorry, you're not authorized to use this bot." - ); - } - return; - } - - if (access === 'pairing') { - const { code, created } = await upsertPairingRequest('discord', userId, { - username: message.author.username, - }); - - if (!code) { - await message.channel.send('Too many pending pairing requests. Please try again later.'); + // Bypass pairing for guild (group) messages + if (!message.guildId) { + const access = await this.checkAccess(userId); + if (access === 'blocked') { + const ch = message.channel; + if (ch.isTextBased() && 'send' in ch) { + await (ch as { send: (content: string) => Promise }).send( + "Sorry, you're not authorized to use this bot." + ); + } return; } - if (created) { - console.log(`[Discord] New pairing request from ${userId} (${message.author.username}): ${code}`); - } + if (access === 'pairing') { + const { code, created } = await upsertPairingRequest('discord', userId, { + username: message.author.username, + }); - await this.sendPairingMessage(message, this.formatPairingMsg(code)); - return; + if (!code) { + await message.channel.send('Too many pending pairing requests. Please try again later.'); + return; + } + + if (created) { + console.log(`[Discord] New pairing request from ${userId} (${message.author.username}): ${code}`); + } + + await this.sendPairingMessage(message, this.formatPairingMsg(code)); + return; + } } const attachments = await this.collectAttachments(message.attachments, message.channel.id); @@ -237,6 +240,7 @@ Ask the bot owner to approve with: const isGroup = !!message.guildId; const groupName = isGroup && 'name' in message.channel ? message.channel.name : undefined; const displayName = message.member?.displayName || message.author.globalName || message.author.username; + const wasMentioned = isGroup && !!this.client?.user && message.mentions.has(this.client.user); await this.onMessage({ channel: 'discord', @@ -249,6 +253,8 @@ Ask the bot owner to approve with: timestamp: message.createdAt, isGroup, groupName, + serverId: message.guildId || undefined, + wasMentioned, attachments, }); } @@ -318,6 +324,10 @@ Ask the bot owner to approve with: } } + getDmPolicy(): string { + return this.config.dmPolicy || 'pairing'; + } + supportsEditing(): boolean { return true; } @@ -375,6 +385,7 @@ Ask the bot owner to approve with: timestamp: new Date(), isGroup, groupName, + serverId: message.guildId || undefined, reaction: { emoji, messageId: message.id, diff --git a/src/channels/signal.ts b/src/channels/signal.ts index 389885b..a69fecc 100644 --- a/src/channels/signal.ts +++ b/src/channels/signal.ts @@ -303,6 +303,10 @@ This code expires in 1 hour.`; }; } + getDmPolicy(): string { + return this.config.dmPolicy || 'pairing'; + } + supportsEditing(): boolean { return false; } @@ -679,8 +683,11 @@ This code expires in 1 hour.`; } // selfChatMode enabled - allow the message through console.log('[Signal] Note to Self allowed (selfChatMode enabled)'); + } else if (chatId.startsWith('group:')) { + // Group messages bypass pairing - anyone in the group can interact + console.log('[Signal] Group message - bypassing access control'); } else { - // External message - check access control + // External DM - check access control console.log('[Signal] Checking access for external message'); const access = await this.checkAccess(source); console.log(`[Signal] Access result: ${access}`); diff --git a/src/channels/slack.ts b/src/channels/slack.ts index bb54840..1c4d41b 100644 --- a/src/channels/slack.ts +++ b/src/channels/slack.ts @@ -17,6 +17,7 @@ let App: typeof import('@slack/bolt').App; export interface SlackConfig { botToken: string; // xoxb-... appToken: string; // xapp-... (for Socket Mode) + dmPolicy?: 'pairing' | 'allowlist' | 'open'; allowedUsers?: string[]; // Slack user IDs (e.g., U01234567) attachmentsDir?: string; attachmentsMaxBytes?: number; @@ -139,11 +140,12 @@ export class SlackAdapter implements ChannelAdapter { threadId: threadTs, isGroup, groupName: isGroup ? channelId : undefined, // Would need conversations.info for name + wasMentioned: false, // Regular messages; app_mention handles mentions attachments, }); } }); - + // Handle app mentions (@bot) this.app.event('app_mention', async ({ event }) => { const userId = event.user || ''; @@ -189,6 +191,7 @@ export class SlackAdapter implements ChannelAdapter { threadId: threadTs, isGroup, groupName: isGroup ? channelId : undefined, + wasMentioned: true, // app_mention is always a mention attachments, }); } @@ -274,6 +277,10 @@ export class SlackAdapter implements ChannelAdapter { }); } + getDmPolicy(): string { + return this.config.dmPolicy || 'pairing'; + } + async sendTypingIndicator(_chatId: string): Promise { // Slack doesn't have a typing indicator API for bots // This is a no-op diff --git a/src/channels/telegram.ts b/src/channels/telegram.ts index 921ad9b..a676fac 100644 --- a/src/channels/telegram.ts +++ b/src/channels/telegram.ts @@ -14,6 +14,7 @@ import { upsertPairingRequest, formatPairingMessage, } from '../pairing/store.js'; +import { isGroupApproved, approveGroup } from '../pairing/group-store.js'; import { basename } from 'node:path'; import { buildAttachmentPath, downloadToFile } from './attachments.js'; @@ -79,17 +80,68 @@ export class TelegramAdapter implements ChannelAdapter { } private setupHandlers(): void { - // Middleware: Check access based on dmPolicy + // Detect when bot is added/removed from groups (proactive group gating) + this.bot.on('my_chat_member', async (ctx) => { + const chatMember = ctx.myChatMember; + if (!chatMember) return; + + const chatType = chatMember.chat.type; + if (chatType !== 'group' && chatType !== 'supergroup') return; + + const newStatus = chatMember.new_chat_member.status; + if (newStatus !== 'member' && newStatus !== 'administrator') return; + + const chatId = String(chatMember.chat.id); + const fromId = String(chatMember.from.id); + const dmPolicy = this.config.dmPolicy || 'pairing'; + + // No gating when policy is not pairing + if (dmPolicy !== 'pairing') { + await approveGroup('telegram', chatId); + console.log(`[Telegram] Group ${chatId} auto-approved (dmPolicy=${dmPolicy})`); + return; + } + + // Check if the user who added the bot is paired + const configAllowlist = this.config.allowedUsers?.map(String); + const allowed = await isUserAllowed('telegram', fromId, configAllowlist); + + if (allowed) { + await approveGroup('telegram', chatId); + console.log(`[Telegram] Group ${chatId} approved by paired user ${fromId}`); + } else { + console.log(`[Telegram] Unpaired user ${fromId} tried to add bot to group ${chatId}, leaving`); + try { + await ctx.api.sendMessage(chatId, 'This bot can only be added to groups by paired users.'); + await ctx.api.leaveChat(chatId); + } catch (err) { + console.error('[Telegram] Failed to leave group:', err); + } + } + }); + + // Middleware: Check access based on dmPolicy (bypass for groups) this.bot.use(async (ctx, next) => { const userId = ctx.from?.id; if (!userId) return; - + + // Group gating: check if group is approved before processing + const chatType = ctx.chat?.type; + if (chatType === 'group' || chatType === 'supergroup') { + const dmPolicy = this.config.dmPolicy || 'pairing'; + if (dmPolicy === 'open' || await isGroupApproved('telegram', String(ctx.chat!.id))) { + await next(); + } + // Silently drop messages from unapproved groups + return; + } + const access = await this.checkAccess( String(userId), ctx.from?.username, ctx.from?.first_name ); - + if (access === 'allowed') { await next(); return; @@ -158,19 +210,49 @@ export class TelegramAdapter implements ChannelAdapter { const userId = ctx.from?.id; const chatId = ctx.chat.id; const text = ctx.message.text; - + if (!userId) return; if (text.startsWith('/')) return; // Skip other commands - + + // Group detection + const chatType = ctx.chat.type; + const isGroup = chatType === 'group' || chatType === 'supergroup'; + const groupName = isGroup && 'title' in ctx.chat ? ctx.chat.title : undefined; + + // Mention detection for groups + let wasMentioned = false; + if (isGroup) { + const botUsername = this.bot.botInfo?.username; + if (botUsername) { + // Check entities for bot_command or mention matching our username + const entities = ctx.message.entities || []; + wasMentioned = entities.some((e) => { + if (e.type === 'mention') { + const mentioned = text.substring(e.offset, e.offset + e.length); + return mentioned.toLowerCase() === `@${botUsername.toLowerCase()}`; + } + return false; + }); + // Fallback: text-based check + if (!wasMentioned) { + wasMentioned = text.toLowerCase().includes(`@${botUsername.toLowerCase()}`); + } + } + } + if (this.onMessage) { await this.onMessage({ channel: 'telegram', chatId: String(chatId), userId: String(userId), userName: ctx.from.username || ctx.from.first_name, + userHandle: ctx.from.username, messageId: String(ctx.message.message_id), text, timestamp: new Date(), + isGroup, + groupName, + wasMentioned, }); } }); @@ -433,6 +515,10 @@ export class TelegramAdapter implements ChannelAdapter { ]); } + getDmPolicy(): string { + return this.config.dmPolicy || 'pairing'; + } + async sendTypingIndicator(chatId: string): Promise { await this.bot.api.sendChatAction(chatId, 'typing'); } diff --git a/src/channels/types.ts b/src/channels/types.ts index ebcc40e..899a695 100644 --- a/src/channels/types.ts +++ b/src/channels/types.ts @@ -26,6 +26,7 @@ export interface ChannelAdapter { // Capabilities (optional) supportsEditing?(): boolean; sendFile?(file: OutboundFile): Promise<{ messageId: string }>; + getDmPolicy?(): string; // Event handlers (set by bot core) onMessage?: (msg: InboundMessage) => Promise; diff --git a/src/channels/whatsapp/index.ts b/src/channels/whatsapp/index.ts index 470d624..42953fb 100644 --- a/src/channels/whatsapp/index.ts +++ b/src/channels/whatsapp/index.ts @@ -977,6 +977,10 @@ export class WhatsAppAdapter implements ChannelAdapter { ); } + getDmPolicy(): string { + return this.config.dmPolicy || 'pairing'; + } + supportsEditing(): boolean { return false; } diff --git a/src/config/io.ts b/src/config/io.ts index f030b4f..f4f2d1c 100644 --- a/src/config/io.ts +++ b/src/config/io.ts @@ -58,7 +58,12 @@ export function loadConfig(): LettaBotConfig { try { const content = readFileSync(configPath, 'utf-8'); const parsed = YAML.parse(content) as Partial; - + + // Fix instantGroups: YAML parses large numeric IDs (e.g. Discord snowflakes) + // as JavaScript numbers, losing precision for values > Number.MAX_SAFE_INTEGER. + // Re-extract from document AST to preserve the original string representation. + fixInstantGroupIds(content, parsed); + // Merge with defaults return { ...DEFAULT_CONFIG, @@ -133,6 +138,15 @@ export function configToEnv(config: LettaBotConfig): Record { if (config.channels.slack?.botToken) { env.SLACK_BOT_TOKEN = config.channels.slack.botToken; } + if (config.channels.slack?.dmPolicy) { + env.SLACK_DM_POLICY = config.channels.slack.dmPolicy; + } + if (config.channels.slack?.groupPollIntervalMin !== undefined) { + env.SLACK_GROUP_POLL_INTERVAL_MIN = String(config.channels.slack.groupPollIntervalMin); + } + if (config.channels.slack?.instantGroups?.length) { + env.SLACK_INSTANT_GROUPS = config.channels.slack.instantGroups.join(','); + } if (config.channels.whatsapp?.enabled) { env.WHATSAPP_ENABLED = 'true'; if (config.channels.whatsapp.selfChat) { @@ -141,6 +155,12 @@ export function configToEnv(config: LettaBotConfig): Record { env.WHATSAPP_SELF_CHAT_MODE = 'false'; } } + if (config.channels.whatsapp?.groupPollIntervalMin !== undefined) { + env.WHATSAPP_GROUP_POLL_INTERVAL_MIN = String(config.channels.whatsapp.groupPollIntervalMin); + } + if (config.channels.whatsapp?.instantGroups?.length) { + env.WHATSAPP_INSTANT_GROUPS = config.channels.whatsapp.instantGroups.join(','); + } if (config.channels.signal?.phone) { env.SIGNAL_PHONE_NUMBER = config.channels.signal.phone; // Signal selfChat defaults to true, so only set env if explicitly false @@ -148,6 +168,18 @@ export function configToEnv(config: LettaBotConfig): Record { env.SIGNAL_SELF_CHAT_MODE = 'false'; } } + if (config.channels.signal?.groupPollIntervalMin !== undefined) { + env.SIGNAL_GROUP_POLL_INTERVAL_MIN = String(config.channels.signal.groupPollIntervalMin); + } + if (config.channels.signal?.instantGroups?.length) { + env.SIGNAL_INSTANT_GROUPS = config.channels.signal.instantGroups.join(','); + } + if (config.channels.telegram?.groupPollIntervalMin !== undefined) { + env.TELEGRAM_GROUP_POLL_INTERVAL_MIN = String(config.channels.telegram.groupPollIntervalMin); + } + if (config.channels.telegram?.instantGroups?.length) { + env.TELEGRAM_INSTANT_GROUPS = config.channels.telegram.instantGroups.join(','); + } if (config.channels.discord?.token) { env.DISCORD_BOT_TOKEN = config.channels.discord.token; if (config.channels.discord.dmPolicy) { @@ -157,6 +189,12 @@ export function configToEnv(config: LettaBotConfig): Record { env.DISCORD_ALLOWED_USERS = config.channels.discord.allowedUsers.join(','); } } + if (config.channels.discord?.groupPollIntervalMin !== undefined) { + env.DISCORD_GROUP_POLL_INTERVAL_MIN = String(config.channels.discord.groupPollIntervalMin); + } + if (config.channels.discord?.instantGroups?.length) { + env.DISCORD_INSTANT_GROUPS = config.channels.discord.instantGroups.join(','); + } // Features if (config.features?.cron) { @@ -281,3 +319,49 @@ export async function syncProviders(config: LettaBotConfig): Promise { } } } + +/** + * Fix instantGroups arrays that may contain large numeric IDs parsed by YAML. + * Discord snowflake IDs exceed Number.MAX_SAFE_INTEGER, so YAML parses them + * as lossy JavaScript numbers. We re-read from the document AST to get the + * original string representation. + */ +function fixInstantGroupIds(yamlContent: string, parsed: Partial): void { + if (!parsed.channels) return; + + try { + const doc = YAML.parseDocument(yamlContent); + const channels = ['telegram', 'slack', 'whatsapp', 'signal', 'discord'] as const; + + for (const ch of channels) { + const seq = doc.getIn(['channels', ch, 'instantGroups'], true); + if (YAML.isSeq(seq)) { + const fixed = seq.items.map((item: unknown) => { + if (YAML.isScalar(item)) { + // For numbers, use the original source text to avoid precision loss + if (typeof item.value === 'number' && item.source) { + return item.source; + } + return String(item.value); + } + return String(item); + }); + const cfg = parsed.channels[ch]; + if (cfg) { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (cfg as any).instantGroups = fixed; + } + } + } + } catch { + // Fallback: just ensure entries are strings (won't fix precision, but safe) + const channels = ['telegram', 'slack', 'whatsapp', 'signal', 'discord'] as const; + for (const ch of channels) { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const cfg = parsed.channels?.[ch] as any; + if (cfg && Array.isArray(cfg.instantGroups)) { + cfg.instantGroups = cfg.instantGroups.map((v: unknown) => String(v)); + } + } + } +} diff --git a/src/config/types.ts b/src/config/types.ts index a5a2176..0c575ad 100644 --- a/src/config/types.ts +++ b/src/config/types.ts @@ -100,13 +100,18 @@ export interface TelegramConfig { token?: string; dmPolicy?: 'pairing' | 'allowlist' | 'open'; allowedUsers?: string[]; + groupPollIntervalMin?: number; // Batch interval in minutes (default: 10, 0 = immediate) + instantGroups?: string[]; // Group chat IDs that bypass batching } export interface SlackConfig { enabled: boolean; appToken?: string; botToken?: string; + dmPolicy?: 'pairing' | 'allowlist' | 'open'; allowedUsers?: string[]; + groupPollIntervalMin?: number; // Batch interval in minutes (default: 10, 0 = immediate) + instantGroups?: string[]; // Channel IDs that bypass batching } export interface WhatsAppConfig { @@ -118,6 +123,8 @@ export interface WhatsAppConfig { groupAllowFrom?: string[]; mentionPatterns?: string[]; groups?: Record; + groupPollIntervalMin?: number; // Batch interval in minutes (default: 10, 0 = immediate) + instantGroups?: string[]; // Group JIDs that bypass batching } export interface SignalConfig { @@ -129,6 +136,8 @@ export interface SignalConfig { // Group gating mentionPatterns?: string[]; // Regex patterns for mention detection (e.g., ["@bot"]) groups?: Record; // Per-group settings, "*" for defaults + groupPollIntervalMin?: number; // Batch interval in minutes (default: 10, 0 = immediate) + instantGroups?: string[]; // Group IDs that bypass batching } export interface DiscordConfig { @@ -136,6 +145,8 @@ export interface DiscordConfig { token?: string; dmPolicy?: 'pairing' | 'allowlist' | 'open'; allowedUsers?: string[]; + groupPollIntervalMin?: number; // Batch interval in minutes (default: 10, 0 = immediate) + instantGroups?: string[]; // Guild/server IDs or channel IDs that bypass batching } export interface GoogleConfig { diff --git a/src/core/bot.ts b/src/core/bot.ts index 01d354d..0a21fc4 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -11,7 +11,8 @@ import type { BotConfig, InboundMessage, TriggerContext } from './types.js'; import { Store } from './store.js'; import { updateAgentName, getPendingApprovals, rejectApproval, cancelRuns, recoverOrphanedConversationApproval } from '../tools/letta-api.js'; import { installSkillsToAgent } from '../skills/loader.js'; -import { formatMessageEnvelope, type SessionContextOptions } from './formatter.js'; +import { formatMessageEnvelope, formatGroupBatchEnvelope, type SessionContextOptions } from './formatter.js'; +import type { GroupBatcher } from './group-batcher.js'; import { loadMemoryBlocks } from './memory.js'; import { SYSTEM_PROMPT } from './system-prompt.js'; @@ -41,6 +42,9 @@ export class LettaBot { // Callback to trigger heartbeat (set by main.ts) public onTriggerHeartbeat?: () => Promise; + private groupBatcher?: GroupBatcher; + private groupIntervals: Map = new Map(); // channel -> intervalMin + private instantGroupIds: Set = new Set(); // channel:id keys for instant processing private processing = false; constructor(config: BotConfig) { @@ -65,6 +69,37 @@ export class LettaBot { console.log(`Registered channel: ${adapter.name}`); } + /** + * Set the group batcher and per-channel intervals. + */ + setGroupBatcher(batcher: GroupBatcher, intervals: Map, instantGroupIds?: Set): void { + this.groupBatcher = batcher; + this.groupIntervals = intervals; + if (instantGroupIds) { + this.instantGroupIds = instantGroupIds; + } + console.log('[Bot] Group batcher configured'); + } + + /** + * Inject a batched group message into the queue and trigger processing. + * Called by GroupBatcher's onFlush callback. + */ + processGroupBatch(msg: InboundMessage, adapter: ChannelAdapter): void { + const count = msg.batchedMessages?.length || 0; + console.log(`[Bot] Group batch: ${count} messages from ${msg.channel}:${msg.chatId}`); + + // Unwrap single-message batches so they use formatMessageEnvelope (DM-style) + // instead of the chat-log batch format + const effective = (count === 1 && msg.batchedMessages) + ? msg.batchedMessages[0] + : msg; + this.messageQueue.push({ msg: effective, adapter }); + if (!this.processing) { + this.processQueue().catch(err => console.error('[Queue] Fatal error in processQueue:', err)); + } + } + /** * Handle slash commands */ @@ -218,7 +253,18 @@ export class LettaBot { */ private async handleMessage(msg: InboundMessage, adapter: ChannelAdapter): Promise { console.log(`[${msg.channel}] Message from ${msg.userId}: ${msg.text}`); - + + // Route group messages to batcher if configured + if (msg.isGroup && this.groupBatcher) { + // Check if this group is configured for instant processing + const isInstant = this.instantGroupIds.has(`${msg.channel}:${msg.chatId}`) + || (msg.serverId && this.instantGroupIds.has(`${msg.channel}:${msg.serverId}`)); + const intervalMin = isInstant ? 0 : (this.groupIntervals.get(msg.channel) ?? 10); + console.log(`[Bot] Group message routed to batcher (interval=${intervalMin}min, mentioned=${msg.wasMentioned}, instant=${!!isInstant})`); + this.groupBatcher.enqueue(msg, adapter, intervalMin); + return; + } + // Add to queue this.messageQueue.push({ msg, adapter }); console.log(`[Queue] Added to queue, length: ${this.messageQueue.length}, processing: ${this.processing}`); @@ -394,7 +440,9 @@ export class LettaBot { } : undefined; // Send message to agent with metadata envelope - const formattedMessage = formatMessageEnvelope(msg, {}, sessionContext); + const formattedMessage = msg.isBatch && msg.batchedMessages + ? formatGroupBatchEnvelope(msg.batchedMessages) + : formatMessageEnvelope(msg); try { await withTimeout(session.send(formattedMessage), 'Session send'); } catch (sendError) { diff --git a/src/core/formatter.ts b/src/core/formatter.ts index 2cdaf45..18851e5 100644 --- a/src/core/formatter.ts +++ b/src/core/formatter.ts @@ -39,6 +39,31 @@ const DEFAULT_OPTIONS: EnvelopeOptions = { includeGroup: true, }; +/** + * Format a short time string (e.g., "4:30 PM") + */ +function formatShortTime(date: Date, options: EnvelopeOptions): string { + let timeZone: string | undefined; + if (options.timezone === 'utc') { + timeZone = 'UTC'; + } else if (options.timezone && options.timezone !== 'local') { + try { + new Intl.DateTimeFormat('en-US', { timeZone: options.timezone }); + timeZone = options.timezone; + } catch { + timeZone = undefined; + } + } + + const formatter = new Intl.DateTimeFormat('en-US', { + hour: 'numeric', + minute: '2-digit', + hour12: true, + timeZone, + }); + return formatter.format(date); +} + /** * Session context options for first-message enrichment */ @@ -337,3 +362,60 @@ export function formatMessageEnvelope( } return reminder; } + +/** + * Format a group batch of messages as a chat log for the agent. + * + * Output format: + * [GROUP CHAT - discord:123 #general - 3 messages] + * [4:30 PM] Alice: Hey everyone + * [4:32 PM] Bob: What's up? + * [4:35 PM] Alice: @LettaBot can you help? + * (Format: **bold** *italic* ...) + */ +export function formatGroupBatchEnvelope( + messages: InboundMessage[], + options: EnvelopeOptions = {} +): string { + if (messages.length === 0) return ''; + + const opts = { ...DEFAULT_OPTIONS, ...options }; + const first = messages[0]; + + // Header: [GROUP CHAT - channel:chatId #groupName - N messages] + const headerParts: string[] = ['GROUP CHAT']; + headerParts.push(`${first.channel}:${first.chatId}`); + if (first.groupName?.trim()) { + if ((first.channel === 'slack' || first.channel === 'discord') && !first.groupName.startsWith('#')) { + headerParts.push(`#${first.groupName}`); + } else { + headerParts.push(first.groupName); + } + } + headerParts.push(`${messages.length} message${messages.length === 1 ? '' : 's'}`); + const header = `[${headerParts.join(' - ')}]`; + + // Chat log lines + const lines = messages.map((msg) => { + const time = formatShortTime(msg.timestamp, opts); + const sender = formatSender(msg); + const textParts: string[] = []; + if (msg.text?.trim()) textParts.push(msg.text.trim()); + if (msg.reaction) { + const action = msg.reaction.action || 'added'; + textParts.push(`[Reaction ${action}: ${msg.reaction.emoji}]`); + } + if (msg.attachments && msg.attachments.length > 0) { + const names = msg.attachments.map((a) => a.name || 'attachment').join(', '); + textParts.push(`[Attachments: ${names}]`); + } + const body = textParts.join(' ') || '(empty)'; + return `[${time}] ${sender}: ${body}`; + }); + + // Format hint + const formatHint = CHANNEL_FORMATS[first.channel]; + const hint = formatHint ? `\n(Format: ${formatHint})` : ''; + + return `${header}\n${lines.join('\n')}${hint}`; +} diff --git a/src/core/group-batcher.ts b/src/core/group-batcher.ts new file mode 100644 index 0000000..0f9abcd --- /dev/null +++ b/src/core/group-batcher.ts @@ -0,0 +1,113 @@ +/** + * Group Message Batcher + * + * Buffers group chat messages and flushes them periodically or on @mention. + * Channel-agnostic: works with any ChannelAdapter. + */ + +import type { ChannelAdapter } from '../channels/types.js'; +import type { InboundMessage } from './types.js'; + +export interface BufferEntry { + messages: InboundMessage[]; + adapter: ChannelAdapter; + timer: ReturnType | null; +} + +export type OnFlushCallback = (msg: InboundMessage, adapter: ChannelAdapter) => void; + +export class GroupBatcher { + private buffer: Map = new Map(); + private onFlush: OnFlushCallback; + + constructor(onFlush: OnFlushCallback) { + this.onFlush = onFlush; + } + + /** + * Add a group message to the buffer. + * If wasMentioned, flush immediately. + * If intervalMin is 0, flush on every message (no batching). + * Otherwise, start a timer on the first message (does NOT reset on subsequent messages). + */ + enqueue(msg: InboundMessage, adapter: ChannelAdapter, intervalMin: number): void { + const key = `${msg.channel}:${msg.chatId}`; + + let entry = this.buffer.get(key); + if (!entry) { + entry = { messages: [], adapter, timer: null }; + this.buffer.set(key, entry); + } + + entry.messages.push(msg); + entry.adapter = adapter; // Update adapter reference + + // Immediate flush: @mention or intervalMin=0 + if (msg.wasMentioned || intervalMin === 0) { + this.flush(key); + return; + } + + // Start timer on first message only (don't reset to prevent starvation) + if (!entry.timer) { + const ms = intervalMin * 60 * 1000; + entry.timer = setTimeout(() => { + this.flush(key); + }, ms); + } + } + + /** + * Flush buffered messages for a key, building a synthetic batch InboundMessage. + */ + flush(key: string): void { + const entry = this.buffer.get(key); + if (!entry || entry.messages.length === 0) return; + + // Clear timer + if (entry.timer) { + clearTimeout(entry.timer); + entry.timer = null; + } + + const messages = entry.messages; + const adapter = entry.adapter; + + // Remove from buffer + this.buffer.delete(key); + + // Use the last message as the base for the synthetic batch message + const last = messages[messages.length - 1]; + + const batchMsg: InboundMessage = { + channel: last.channel, + chatId: last.chatId, + userId: last.userId, + userName: last.userName, + userHandle: last.userHandle, + messageId: last.messageId, + text: messages.map((m) => m.text).join('\n'), + timestamp: last.timestamp, + isGroup: true, + groupName: last.groupName, + wasMentioned: messages.some((m) => m.wasMentioned), + isBatch: true, + batchedMessages: messages, + }; + + this.onFlush(batchMsg, adapter); + } + + /** + * Clear all timers on shutdown. + */ + stop(): void { + for (const [, entry] of this.buffer) { + if (entry.timer) { + clearTimeout(entry.timer); + entry.timer = null; + } + } + this.buffer.clear(); + } +} diff --git a/src/core/types.ts b/src/core/types.ts index c0733e8..a35d381 100644 --- a/src/core/types.ts +++ b/src/core/types.ts @@ -76,10 +76,13 @@ export interface InboundMessage { threadId?: string; // Slack thread_ts isGroup?: boolean; // Is this from a group chat? groupName?: string; // Group/channel name if applicable + serverId?: string; // Server/guild ID (Discord only) wasMentioned?: boolean; // Was bot explicitly mentioned? (groups only) replyToUser?: string; // Phone number of who they're replying to (if reply) attachments?: InboundAttachment[]; reaction?: InboundReaction; + isBatch?: boolean; // Is this a batched group message? + batchedMessages?: InboundMessage[]; // Original individual messages (for batch formatting) } /** diff --git a/src/main.ts b/src/main.ts index 33d2075..bbd5f97 100644 --- a/src/main.ts +++ b/src/main.ts @@ -119,6 +119,7 @@ import { SlackAdapter } from './channels/slack.js'; import { WhatsAppAdapter } from './channels/whatsapp/index.js'; import { SignalAdapter } from './channels/signal.js'; import { DiscordAdapter } from './channels/discord.js'; +import { GroupBatcher } from './core/group-batcher.js'; import { CronService } from './cron/service.js'; import { HeartbeatService } from './cron/heartbeat.js'; import { PollingService } from './polling/service.js'; @@ -244,12 +245,21 @@ const config = { token: process.env.TELEGRAM_BOT_TOKEN || '', dmPolicy: (process.env.TELEGRAM_DM_POLICY || 'pairing') as 'pairing' | 'allowlist' | 'open', allowedUsers: process.env.TELEGRAM_ALLOWED_USERS?.split(',').filter(Boolean).map(Number) || [], + groupPollIntervalMin: process.env.TELEGRAM_GROUP_POLL_INTERVAL_MIN !== undefined + ? parseInt(process.env.TELEGRAM_GROUP_POLL_INTERVAL_MIN, 10) + : 10, + instantGroups: process.env.TELEGRAM_INSTANT_GROUPS?.split(',').filter(Boolean) || [], }, slack: { enabled: !!process.env.SLACK_BOT_TOKEN && !!process.env.SLACK_APP_TOKEN, botToken: process.env.SLACK_BOT_TOKEN || '', appToken: process.env.SLACK_APP_TOKEN || '', + dmPolicy: (process.env.SLACK_DM_POLICY || 'pairing') as 'pairing' | 'allowlist' | 'open', allowedUsers: process.env.SLACK_ALLOWED_USERS?.split(',').filter(Boolean) || [], + groupPollIntervalMin: process.env.SLACK_GROUP_POLL_INTERVAL_MIN !== undefined + ? parseInt(process.env.SLACK_GROUP_POLL_INTERVAL_MIN, 10) + : 10, + instantGroups: process.env.SLACK_INSTANT_GROUPS?.split(',').filter(Boolean) || [], }, whatsapp: { enabled: process.env.WHATSAPP_ENABLED === 'true', @@ -257,6 +267,10 @@ const config = { dmPolicy: (process.env.WHATSAPP_DM_POLICY || 'pairing') as 'pairing' | 'allowlist' | 'open', allowedUsers: process.env.WHATSAPP_ALLOWED_USERS?.split(',').filter(Boolean) || [], selfChatMode: process.env.WHATSAPP_SELF_CHAT_MODE !== 'false', // Default true (safe - only self-chat) + groupPollIntervalMin: process.env.WHATSAPP_GROUP_POLL_INTERVAL_MIN !== undefined + ? parseInt(process.env.WHATSAPP_GROUP_POLL_INTERVAL_MIN, 10) + : 10, + instantGroups: process.env.WHATSAPP_INSTANT_GROUPS?.split(',').filter(Boolean) || [], }, signal: { enabled: !!process.env.SIGNAL_PHONE_NUMBER, @@ -267,12 +281,20 @@ const config = { dmPolicy: (process.env.SIGNAL_DM_POLICY || 'pairing') as 'pairing' | 'allowlist' | 'open', allowedUsers: process.env.SIGNAL_ALLOWED_USERS?.split(',').filter(Boolean) || [], selfChatMode: process.env.SIGNAL_SELF_CHAT_MODE !== 'false', // Default true + groupPollIntervalMin: process.env.SIGNAL_GROUP_POLL_INTERVAL_MIN !== undefined + ? parseInt(process.env.SIGNAL_GROUP_POLL_INTERVAL_MIN, 10) + : 10, + instantGroups: process.env.SIGNAL_INSTANT_GROUPS?.split(',').filter(Boolean) || [], }, discord: { enabled: !!process.env.DISCORD_BOT_TOKEN, token: process.env.DISCORD_BOT_TOKEN || '', dmPolicy: (process.env.DISCORD_DM_POLICY || 'pairing') as 'pairing' | 'allowlist' | 'open', allowedUsers: process.env.DISCORD_ALLOWED_USERS?.split(',').filter(Boolean) || [], + groupPollIntervalMin: process.env.DISCORD_GROUP_POLL_INTERVAL_MIN !== undefined + ? parseInt(process.env.DISCORD_GROUP_POLL_INTERVAL_MIN, 10) + : 10, + instantGroups: process.env.DISCORD_INSTANT_GROUPS?.split(',').filter(Boolean) || [], }, // Cron @@ -414,6 +436,7 @@ async function main() { const slack = new SlackAdapter({ botToken: config.slack.botToken, appToken: config.slack.appToken, + dmPolicy: config.slack.dmPolicy, allowedUsers: config.slack.allowedUsers.length > 0 ? config.slack.allowedUsers : undefined, attachmentsDir, attachmentsMaxBytes: config.attachmentsMaxBytes, @@ -467,6 +490,46 @@ async function main() { bot.registerChannel(discord); } + // Create and wire group batcher + const groupIntervals = new Map(); + if (config.telegram.enabled) { + groupIntervals.set('telegram', config.telegram.groupPollIntervalMin ?? 10); + } + if (config.slack.enabled) { + groupIntervals.set('slack', config.slack.groupPollIntervalMin ?? 10); + } + if (config.whatsapp.enabled) { + groupIntervals.set('whatsapp', config.whatsapp.groupPollIntervalMin ?? 10); + } + if (config.signal.enabled) { + groupIntervals.set('signal', config.signal.groupPollIntervalMin ?? 10); + } + if (config.discord.enabled) { + groupIntervals.set('discord', config.discord.groupPollIntervalMin ?? 10); + } + // Build instant group IDs set (channel:id format) + const instantGroupIds = new Set(); + const channelInstantGroups: Array<[string, string[]]> = [ + ['telegram', config.telegram.instantGroups], + ['slack', config.slack.instantGroups], + ['whatsapp', config.whatsapp.instantGroups], + ['signal', config.signal.instantGroups], + ['discord', config.discord.instantGroups], + ]; + for (const [channel, ids] of channelInstantGroups) { + for (const id of ids) { + instantGroupIds.add(`${channel}:${id}`); + } + } + if (instantGroupIds.size > 0) { + console.log(`[Groups] Instant groups: ${[...instantGroupIds].join(', ')}`); + } + + const groupBatcher = new GroupBatcher((msg, adapter) => { + bot.processGroupBatch(msg, adapter); + }); + bot.setGroupBatcher(groupBatcher, groupIntervals, instantGroupIds); + // Start cron service if enabled // Note: CronService uses getDataDir() for cron-jobs.json to match the CLI let cronService: CronService | null = null; @@ -546,6 +609,7 @@ async function main() { // Handle shutdown const shutdown = async () => { console.log('\nShutting down...'); + groupBatcher.stop(); heartbeatService?.stop(); cronService?.stop(); await bot.stop(); diff --git a/src/pairing/group-store.ts b/src/pairing/group-store.ts new file mode 100644 index 0000000..921c377 --- /dev/null +++ b/src/pairing/group-store.ts @@ -0,0 +1,68 @@ +/** + * Approved Groups Store + * + * Tracks which groups have been approved (activated by a paired user). + * Only relevant when dmPolicy === 'pairing'. + * + * Storage: ~/.lettabot/credentials/{channel}-approvedGroups.json + */ + +import fs from 'node:fs'; +import path from 'node:path'; +import os from 'node:os'; +import crypto from 'node:crypto'; + +interface ApprovedGroupsStore { + version: 1; + groups: string[]; +} + +function getCredentialsDir(): string { + return path.join(os.homedir(), '.lettabot', 'credentials'); +} + +function getStorePath(channel: string): string { + return path.join(getCredentialsDir(), `${channel}-approvedGroups.json`); +} + +async function ensureDir(dir: string): Promise { + await fs.promises.mkdir(dir, { recursive: true, mode: 0o700 }); +} + +async function readJson(filePath: string, fallback: T): Promise { + try { + const raw = await fs.promises.readFile(filePath, 'utf-8'); + return JSON.parse(raw) as T; + } catch { + return fallback; + } +} + +async function writeJson(filePath: string, data: unknown): Promise { + await ensureDir(path.dirname(filePath)); + const tmp = `${filePath}.${crypto.randomUUID()}.tmp`; + await fs.promises.writeFile(tmp, JSON.stringify(data, null, 2) + '\n', { encoding: 'utf-8' }); + await fs.promises.chmod(tmp, 0o600); + await fs.promises.rename(tmp, filePath); +} + +/** + * Check if a group has been approved for a given channel. + */ +export async function isGroupApproved(channel: string, chatId: string): Promise { + const filePath = getStorePath(channel); + const store = await readJson(filePath, { version: 1, groups: [] }); + return (store.groups || []).includes(chatId); +} + +/** + * Approve a group for a given channel. + */ +export async function approveGroup(channel: string, chatId: string): Promise { + const filePath = getStorePath(channel); + const store = await readJson(filePath, { version: 1, groups: [] }); + const groups = store.groups || []; + if (groups.includes(chatId)) return; + groups.push(chatId); + await writeJson(filePath, { version: 1, groups }); +}