From 69b3d165d65321862753b426cc6ed1e8395a6f0e Mon Sep 17 00:00:00 2001 From: Jason Carreira Date: Tue, 24 Feb 2026 14:14:26 -0500 Subject: [PATCH] Add per-channel conversation overrides (#340) Co-authored-by: Claude Sonnet 4.6 Co-authored-by: Cameron --- docs/configuration.md | 19 ++++- lettabot.example.yaml | 1 + src/config/normalize.test.ts | 56 ++++++------- src/config/types.ts | 2 + src/core/bot.ts | 95 +++++++++++++++++----- src/core/conversation-key.test.ts | 108 ++++++++++++++++++++++++++ src/core/sdk-session-contract.test.ts | 29 +++++++ src/core/types.ts | 1 + src/main.ts | 1 + 9 files changed, 264 insertions(+), 48 deletions(-) create mode 100644 src/core/conversation-key.test.ts diff --git a/docs/configuration.md b/docs/configuration.md index 3ced4e9..c6bd549 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -234,12 +234,29 @@ Each entry in `agents:` accepts: | `id` | string | No | Use existing agent ID (skips creation) | | `displayName` | string | No | Prefix outbound messages (e.g. `"💜 Signo"`) | | `model` | string | No | Model for agent creation | -| `conversations` | object | No | Conversation routing config (shared vs per-channel) | +| `conversations` | object | No | Conversation routing (mode, heartbeat, perChannel overrides) | | `channels` | object | No | Channel configs (same schema as top-level `channels:`). At least one agent must have channels. | | `features` | object | No | Per-agent features (cron, heartbeat, memfs, maxToolCalls) | | `polling` | object | No | Per-agent polling config (Gmail, etc.) | | `integrations` | object | No | Per-agent integrations (Google, etc.) | +### Conversation Routing + +Conversation routing controls which incoming messages share a Letta conversation. + +```yaml +conversations: + mode: shared # shared (default) or per-channel + heartbeat: last-active # per-channel mode, or shared mode with perChannel overrides + perChannel: + - bluesky # always separate, even in shared mode +``` + +- **mode: shared** (default) keeps one shared conversation across all channels. +- **mode: per-channel** creates an independent conversation per channel. +- **perChannel** lets you keep most channels shared while carving out specific channels to run independently. +- **heartbeat**: `dedicated`, `last-active`, or a specific channel name. Applies in per-channel mode and in shared mode with perChannel overrides. + ### How it works - Each agent is a separate Letta agent with its own conversation history and memory diff --git a/lettabot.example.yaml b/lettabot.example.yaml index 8c3db9a..4384a4b 100644 --- a/lettabot.example.yaml +++ b/lettabot.example.yaml @@ -24,6 +24,7 @@ agents: conversations: mode: shared # "shared" (default) or "per-channel" heartbeat: last-active # "dedicated" | "last-active" | "" + # perChannel: ["slack"] # Keep these channels isolated while others share a conversation channels: telegram: diff --git a/src/config/normalize.test.ts b/src/config/normalize.test.ts index 5856eee..31ddeb6 100644 --- a/src/config/normalize.test.ts +++ b/src/config/normalize.test.ts @@ -23,6 +23,36 @@ import { } from './types.js'; describe('normalizeAgents', () => { + const envVars = [ + 'TELEGRAM_BOT_TOKEN', 'TELEGRAM_DM_POLICY', 'TELEGRAM_ALLOWED_USERS', + 'SLACK_BOT_TOKEN', 'SLACK_APP_TOKEN', 'SLACK_DM_POLICY', 'SLACK_ALLOWED_USERS', + 'WHATSAPP_ENABLED', 'WHATSAPP_SELF_CHAT_MODE', 'WHATSAPP_DM_POLICY', 'WHATSAPP_ALLOWED_USERS', + 'SIGNAL_PHONE_NUMBER', 'SIGNAL_SELF_CHAT_MODE', 'SIGNAL_DM_POLICY', 'SIGNAL_ALLOWED_USERS', + 'DISCORD_BOT_TOKEN', 'DISCORD_DM_POLICY', 'DISCORD_ALLOWED_USERS', + 'BLUESKY_WANTED_DIDS', 'BLUESKY_WANTED_COLLECTIONS', 'BLUESKY_JETSTREAM_URL', 'BLUESKY_CURSOR', + 'BLUESKY_HANDLE', 'BLUESKY_APP_PASSWORD', 'BLUESKY_SERVICE_URL', 'BLUESKY_APPVIEW_URL', + 'BLUESKY_NOTIFICATIONS_ENABLED', 'BLUESKY_NOTIFICATIONS_INTERVAL_SEC', 'BLUESKY_NOTIFICATIONS_LIMIT', + 'BLUESKY_NOTIFICATIONS_PRIORITY', 'BLUESKY_NOTIFICATIONS_REASONS', + ]; + const savedEnv: Record = {}; + + beforeEach(() => { + for (const key of envVars) { + savedEnv[key] = process.env[key]; + delete process.env[key]; + } + }); + + afterEach(() => { + for (const key of envVars) { + if (savedEnv[key] !== undefined) { + process.env[key] = savedEnv[key]; + } else { + delete process.env[key]; + } + } + }); + it('canonicalizes legacy server mode aliases', () => { expect(canonicalizeServerMode('cloud')).toBe('api'); expect(canonicalizeServerMode('api')).toBe('api'); @@ -243,32 +273,6 @@ describe('normalizeAgents', () => { }); describe('env var fallback (container deploys)', () => { - const envVars = [ - 'TELEGRAM_BOT_TOKEN', 'TELEGRAM_DM_POLICY', 'TELEGRAM_ALLOWED_USERS', - 'SLACK_BOT_TOKEN', 'SLACK_APP_TOKEN', 'SLACK_DM_POLICY', 'SLACK_ALLOWED_USERS', - 'WHATSAPP_ENABLED', 'WHATSAPP_SELF_CHAT_MODE', 'WHATSAPP_DM_POLICY', 'WHATSAPP_ALLOWED_USERS', - 'SIGNAL_PHONE_NUMBER', 'SIGNAL_SELF_CHAT_MODE', 'SIGNAL_DM_POLICY', 'SIGNAL_ALLOWED_USERS', - 'DISCORD_BOT_TOKEN', 'DISCORD_DM_POLICY', 'DISCORD_ALLOWED_USERS', - ]; - const savedEnv: Record = {}; - - beforeEach(() => { - for (const key of envVars) { - savedEnv[key] = process.env[key]; - delete process.env[key]; - } - }); - - afterEach(() => { - for (const key of envVars) { - if (savedEnv[key] !== undefined) { - process.env[key] = savedEnv[key]; - } else { - delete process.env[key]; - } - } - }); - it('should pick up channels from env vars when YAML has none', () => { process.env.TELEGRAM_BOT_TOKEN = 'env-telegram-token'; process.env.DISCORD_BOT_TOKEN = 'env-discord-token'; diff --git a/src/config/types.ts b/src/config/types.ts index 31146ba..4b6f342 100644 --- a/src/config/types.ts +++ b/src/config/types.ts @@ -66,6 +66,7 @@ export interface AgentConfig { conversations?: { mode?: 'shared' | 'per-channel'; // Default: shared (single conversation across all channels) heartbeat?: string; // "dedicated" | "last-active" | "" (default: last-active) + perChannel?: string[]; // Channels that should always have their own conversation }; /** Features for this agent */ features?: { @@ -143,6 +144,7 @@ export interface LettaBotConfig { conversations?: { mode?: 'shared' | 'per-channel'; // Default: shared (single conversation across all channels) heartbeat?: string; // "dedicated" | "last-active" | "" (default: last-active) + perChannel?: string[]; // Channels that should always have their own conversation }; // Features diff --git a/src/core/bot.ts b/src/core/bot.ts index 2961efa..00bc42d 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -206,6 +206,49 @@ export function isResponseDeliverySuppressed(msg: Pick, +): string { + const normalized = channel.toLowerCase(); + if (conversationMode === 'per-channel') return normalized; + if (conversationOverrides.has(normalized)) return normalized; + return 'shared'; +} + +/** + * Pure function: resolve the conversation key for heartbeat/sendToAgent. + * In per-channel mode, respects heartbeatConversation setting. + * In shared mode with overrides, respects override channels when using last-active. + */ +export function resolveHeartbeatConversationKey( + conversationMode: string | undefined, + heartbeatConversation: string | undefined, + conversationOverrides: Set, + lastActiveChannel?: string, +): string { + const hb = heartbeatConversation || 'last-active'; + + if (conversationMode === 'per-channel') { + if (hb === 'dedicated') return 'heartbeat'; + if (hb === 'last-active') return lastActiveChannel ?? 'shared'; + return hb; + } + + // shared mode — if last-active and overrides exist, respect the override channel + if (hb === 'last-active' && conversationOverrides.size > 0 && lastActiveChannel) { + return resolveConversationKey(lastActiveChannel, conversationMode, conversationOverrides); + } + + return 'shared'; +} + export class LettaBot implements AgentSession { private store: Store; private config: BotConfig; @@ -230,6 +273,7 @@ export class LettaBot implements AgentSession { // channel (and optionally heartbeat) gets its own subprocess. private sessions: Map = new Map(); private currentCanUseTool: CanUseToolCallback | undefined; + private conversationOverrides: Set = new Set(); // Stable callback wrapper so the Session options never change, but we can // swap out the per-message handler before each send(). private readonly sessionCanUseTool: CanUseToolCallback = async (toolName, toolInput) => { @@ -243,6 +287,9 @@ export class LettaBot implements AgentSession { this.config = config; mkdirSync(config.workingDir, { recursive: true }); this.store = new Store('lettabot-agent.json', config.agentName); + if (config.conversationOverrides?.length) { + this.conversationOverrides = new Set(config.conversationOverrides.map((ch) => ch.toLowerCase())); + } log.info(`LettaBot initialized. Agent ID: ${this.store.agentId || '(new)'}`); } @@ -725,27 +772,25 @@ export class LettaBot implements AgentSession { /** * Resolve the conversation key for a channel message. - * In shared mode returns "shared"; in per-channel mode returns the channel id. + * Returns 'shared' in shared mode (unless channel is in perChannel overrides). + * Returns channel id in per-channel mode or for override channels. */ private resolveConversationKey(channel: string): string { - return this.config.conversationMode === 'per-channel' ? channel : 'shared'; + return resolveConversationKey(channel, this.config.conversationMode, this.conversationOverrides); } /** * Resolve the conversation key for heartbeat/sendToAgent. + * Respects perChannel overrides when using last-active in shared mode. */ private resolveHeartbeatConversationKey(): string { - if (this.config.conversationMode !== 'per-channel') return 'shared'; - - const hb = this.config.heartbeatConversation || 'last-active'; - if (hb === 'dedicated') return 'heartbeat'; - if (hb === 'last-active') { - // Use the last channel the user messaged on - const target = this.store.lastMessageTarget; - return target ? target.channel : 'shared'; - } - // Explicit channel name (e.g., "telegram") - return hb; + const lastActiveChannel = this.store.lastMessageTarget?.channel; + return resolveHeartbeatConversationKey( + this.config.conversationMode, + this.config.heartbeatConversation, + this.conversationOverrides, + lastActiveChannel, + ); } // ========================================================================= @@ -1029,8 +1074,8 @@ export class LettaBot implements AgentSession { } } - if (this.config.conversationMode === 'per-channel') { - const convKey = this.resolveConversationKey(effective.channel); + const convKey = this.resolveConversationKey(effective.channel); + if (convKey !== 'shared') { this.enqueueForKey(convKey, effective, adapter); } else { this.messageQueue.push({ msg: effective, adapter }); @@ -1224,10 +1269,9 @@ export class LettaBot implements AgentSession { return; } - if (this.config.conversationMode === 'per-channel') { - // Per-channel mode: messages on different channels can run in parallel. - // Only serialize within the same conversation key. - const convKey = this.resolveConversationKey(msg.channel); + const convKey = this.resolveConversationKey(msg.channel); + if (convKey !== 'shared') { + // Per-channel or override mode: messages on different keys can run in parallel. this.enqueueForKey(convKey, msg, adapter); } else { // Shared mode: single global queue (existing behavior) @@ -1833,7 +1877,7 @@ export class LettaBot implements AgentSession { private async acquireLock(convKey: string): Promise { if (convKey === 'heartbeat') return false; // No lock needed - if (this.config.conversationMode === 'per-channel') { + if (convKey !== 'shared') { while (this.processingKeys.has(convKey)) { await new Promise(resolve => setTimeout(resolve, 1000)); } @@ -1849,8 +1893,17 @@ export class LettaBot implements AgentSession { private releaseLock(convKey: string, acquired: boolean): void { if (!acquired) return; - if (this.config.conversationMode === 'per-channel') { + if (convKey !== 'shared') { this.processingKeys.delete(convKey); + // Heartbeats/sendToAgent may hold a channel key while user messages for + // that same key queue up. Kick the keyed worker after unlock so queued + // messages are not left waiting for another inbound message to arrive. + const queue = this.keyedQueues.get(convKey); + if (queue && queue.length > 0) { + this.processKeyedQueue(convKey).catch(err => + log.error(`Fatal error in processKeyedQueue(${convKey}) after lock release:`, err) + ); + } } else { this.processing = false; this.processQueue(); diff --git a/src/core/conversation-key.test.ts b/src/core/conversation-key.test.ts new file mode 100644 index 0000000..a0fcc80 --- /dev/null +++ b/src/core/conversation-key.test.ts @@ -0,0 +1,108 @@ +import { describe, it, expect } from 'vitest'; +import { resolveConversationKey, resolveHeartbeatConversationKey } from './bot.js'; + +// --------------------------------------------------------------------------- +// resolveConversationKey +// --------------------------------------------------------------------------- +describe('resolveConversationKey', () => { + it('returns "shared" in shared mode for a normal channel', () => { + expect(resolveConversationKey('telegram', 'shared', new Set())).toBe('shared'); + }); + + it('returns channel id in per-channel mode', () => { + expect(resolveConversationKey('telegram', 'per-channel', new Set())).toBe('telegram'); + }); + + it('returns channel id for override channel in shared mode', () => { + const overrides = new Set(['slack']); + expect(resolveConversationKey('slack', 'shared', overrides)).toBe('slack'); + }); + + it('non-override channels still return "shared" when overrides are configured', () => { + const overrides = new Set(['slack']); + expect(resolveConversationKey('telegram', 'shared', overrides)).toBe('shared'); + }); + + it('multiple override channels all get their own keys', () => { + const overrides = new Set(['slack', 'discord']); + expect(resolveConversationKey('slack', 'shared', overrides)).toBe('slack'); + expect(resolveConversationKey('discord', 'shared', overrides)).toBe('discord'); + expect(resolveConversationKey('telegram', 'shared', overrides)).toBe('shared'); + }); + + it('normalizes channel name to lowercase', () => { + const overrides = new Set(['slack']); + expect(resolveConversationKey('SLACK', 'shared', overrides)).toBe('slack'); + expect(resolveConversationKey('Telegram', 'per-channel', new Set())).toBe('telegram'); + }); + + it('case-insensitive override matching', () => { + const overrides = new Set(['slack']); + expect(resolveConversationKey('Slack', 'shared', overrides)).toBe('slack'); + }); + + it('returns channel id in per-channel mode even when channel is also in overrides', () => { + const overrides = new Set(['slack']); + expect(resolveConversationKey('slack', 'per-channel', overrides)).toBe('slack'); + }); + + it('returns "shared" when conversationMode is undefined', () => { + expect(resolveConversationKey('telegram', undefined, new Set())).toBe('shared'); + }); +}); + +// --------------------------------------------------------------------------- +// resolveHeartbeatConversationKey +// --------------------------------------------------------------------------- +describe('resolveHeartbeatConversationKey', () => { + // --- per-channel mode --- + + it('returns "heartbeat" when mode=per-channel and heartbeat=dedicated', () => { + expect(resolveHeartbeatConversationKey('per-channel', 'dedicated', new Set())).toBe('heartbeat'); + }); + + it('returns last-active channel in per-channel mode with last-active', () => { + expect(resolveHeartbeatConversationKey('per-channel', 'last-active', new Set(), 'telegram')).toBe('telegram'); + }); + + it('returns "shared" when per-channel and last-active but no last channel', () => { + expect(resolveHeartbeatConversationKey('per-channel', 'last-active', new Set(), undefined)).toBe('shared'); + }); + + it('returns explicit channel name in per-channel mode', () => { + expect(resolveHeartbeatConversationKey('per-channel', 'discord', new Set(), 'telegram')).toBe('discord'); + }); + + // --- shared mode, no overrides --- + + it('returns "shared" in shared mode with no overrides', () => { + expect(resolveHeartbeatConversationKey('shared', 'last-active', new Set(), 'telegram')).toBe('shared'); + }); + + it('returns "shared" in shared mode with undefined heartbeat', () => { + expect(resolveHeartbeatConversationKey('shared', undefined, new Set(), 'telegram')).toBe('shared'); + }); + + // --- shared mode with overrides --- + + it('returns override channel key when last-active channel is an override', () => { + const overrides = new Set(['slack']); + expect(resolveHeartbeatConversationKey('shared', 'last-active', overrides, 'slack')).toBe('slack'); + }); + + it('returns "shared" when last-active channel is NOT an override', () => { + const overrides = new Set(['slack']); + expect(resolveHeartbeatConversationKey('shared', 'last-active', overrides, 'telegram')).toBe('shared'); + }); + + it('returns "shared" when overrides exist but no last-active channel', () => { + const overrides = new Set(['slack']); + expect(resolveHeartbeatConversationKey('shared', 'last-active', overrides, undefined)).toBe('shared'); + }); + + it('returns "shared" in shared mode even with overrides when heartbeat is not last-active', () => { + // Non-last-active heartbeat in shared mode always returns 'shared' + const overrides = new Set(['slack']); + expect(resolveHeartbeatConversationKey('shared', 'dedicated', overrides, 'slack')).toBe('shared'); + }); +}); diff --git a/src/core/sdk-session-contract.test.ts b/src/core/sdk-session-contract.test.ts index 4a310eb..e188991 100644 --- a/src/core/sdk-session-contract.test.ts +++ b/src/core/sdk-session-contract.test.ts @@ -251,4 +251,33 @@ describe('SDK session contract', () => { const opts = vi.mocked(createSession).mock.calls[0][1]; expect(opts).not.toHaveProperty('memfs'); }); + + it('restarts a keyed queue after non-shared lock release when backlog exists', async () => { + const bot = new LettaBot({ + workingDir: join(dataDir, 'working'), + allowedTools: [], + }); + const botInternal = bot as any; + + botInternal.processingKeys.add('slack'); + botInternal.keyedQueues.set('slack', [ + { + msg: { + userId: 'u1', + channel: 'slack', + chatId: 'C123', + text: 'queued while locked', + timestamp: new Date(), + isGroup: false, + }, + adapter: {}, + }, + ]); + + const processSpy = vi.spyOn(botInternal, 'processKeyedQueue').mockResolvedValue(undefined); + botInternal.releaseLock('slack', true); + + expect(botInternal.processingKeys.has('slack')).toBe(false); + expect(processSpy).toHaveBeenCalledWith('slack'); + }); }); diff --git a/src/core/types.ts b/src/core/types.ts index aedf6d7..697eb06 100644 --- a/src/core/types.ts +++ b/src/core/types.ts @@ -156,6 +156,7 @@ export interface BotConfig { // Conversation routing conversationMode?: 'shared' | 'per-channel'; // Default: shared heartbeatConversation?: string; // "dedicated" | "last-active" | "" (default: last-active) + conversationOverrides?: string[]; // Channels that always use their own conversation (shared mode) } /** diff --git a/src/main.ts b/src/main.ts index 4cc2bd0..922807f 100644 --- a/src/main.ts +++ b/src/main.ts @@ -602,6 +602,7 @@ async function main() { display: agentConfig.features?.display, conversationMode: agentConfig.conversations?.mode || 'shared', heartbeatConversation: agentConfig.conversations?.heartbeat || 'last-active', + conversationOverrides: agentConfig.conversations?.perChannel, skills: { cronEnabled: agentConfig.features?.cron ?? globalConfig.cronEnabled, googleEnabled: !!agentConfig.integrations?.google?.enabled || !!agentConfig.polling?.gmail?.enabled,