Add per-channel conversation overrides (#340)
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com> Co-authored-by: Cameron <cameron@pfiffer.org>
This commit is contained in:
@@ -234,12 +234,29 @@ Each entry in `agents:` accepts:
|
||||
| `id` | string | No | Use existing agent ID (skips creation) |
|
||||
| `displayName` | string | No | Prefix outbound messages (e.g. `"💜 Signo"`) |
|
||||
| `model` | string | No | Model for agent creation |
|
||||
| `conversations` | object | No | Conversation routing config (shared vs per-channel) |
|
||||
| `conversations` | object | No | Conversation routing (mode, heartbeat, perChannel overrides) |
|
||||
| `channels` | object | No | Channel configs (same schema as top-level `channels:`). At least one agent must have channels. |
|
||||
| `features` | object | No | Per-agent features (cron, heartbeat, memfs, maxToolCalls) |
|
||||
| `polling` | object | No | Per-agent polling config (Gmail, etc.) |
|
||||
| `integrations` | object | No | Per-agent integrations (Google, etc.) |
|
||||
|
||||
### Conversation Routing
|
||||
|
||||
Conversation routing controls which incoming messages share a Letta conversation.
|
||||
|
||||
```yaml
|
||||
conversations:
|
||||
mode: shared # shared (default) or per-channel
|
||||
heartbeat: last-active # per-channel mode, or shared mode with perChannel overrides
|
||||
perChannel:
|
||||
- bluesky # always separate, even in shared mode
|
||||
```
|
||||
|
||||
- **mode: shared** (default) keeps one shared conversation across all channels.
|
||||
- **mode: per-channel** creates an independent conversation per channel.
|
||||
- **perChannel** lets you keep most channels shared while carving out specific channels to run independently.
|
||||
- **heartbeat**: `dedicated`, `last-active`, or a specific channel name. Applies in per-channel mode and in shared mode with perChannel overrides.
|
||||
|
||||
### How it works
|
||||
|
||||
- Each agent is a separate Letta agent with its own conversation history and memory
|
||||
|
||||
@@ -24,6 +24,7 @@ agents:
|
||||
conversations:
|
||||
mode: shared # "shared" (default) or "per-channel"
|
||||
heartbeat: last-active # "dedicated" | "last-active" | "<channel>"
|
||||
# perChannel: ["slack"] # Keep these channels isolated while others share a conversation
|
||||
|
||||
channels:
|
||||
telegram:
|
||||
|
||||
@@ -23,6 +23,36 @@ import {
|
||||
} from './types.js';
|
||||
|
||||
describe('normalizeAgents', () => {
|
||||
const envVars = [
|
||||
'TELEGRAM_BOT_TOKEN', 'TELEGRAM_DM_POLICY', 'TELEGRAM_ALLOWED_USERS',
|
||||
'SLACK_BOT_TOKEN', 'SLACK_APP_TOKEN', 'SLACK_DM_POLICY', 'SLACK_ALLOWED_USERS',
|
||||
'WHATSAPP_ENABLED', 'WHATSAPP_SELF_CHAT_MODE', 'WHATSAPP_DM_POLICY', 'WHATSAPP_ALLOWED_USERS',
|
||||
'SIGNAL_PHONE_NUMBER', 'SIGNAL_SELF_CHAT_MODE', 'SIGNAL_DM_POLICY', 'SIGNAL_ALLOWED_USERS',
|
||||
'DISCORD_BOT_TOKEN', 'DISCORD_DM_POLICY', 'DISCORD_ALLOWED_USERS',
|
||||
'BLUESKY_WANTED_DIDS', 'BLUESKY_WANTED_COLLECTIONS', 'BLUESKY_JETSTREAM_URL', 'BLUESKY_CURSOR',
|
||||
'BLUESKY_HANDLE', 'BLUESKY_APP_PASSWORD', 'BLUESKY_SERVICE_URL', 'BLUESKY_APPVIEW_URL',
|
||||
'BLUESKY_NOTIFICATIONS_ENABLED', 'BLUESKY_NOTIFICATIONS_INTERVAL_SEC', 'BLUESKY_NOTIFICATIONS_LIMIT',
|
||||
'BLUESKY_NOTIFICATIONS_PRIORITY', 'BLUESKY_NOTIFICATIONS_REASONS',
|
||||
];
|
||||
const savedEnv: Record<string, string | undefined> = {};
|
||||
|
||||
beforeEach(() => {
|
||||
for (const key of envVars) {
|
||||
savedEnv[key] = process.env[key];
|
||||
delete process.env[key];
|
||||
}
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
for (const key of envVars) {
|
||||
if (savedEnv[key] !== undefined) {
|
||||
process.env[key] = savedEnv[key];
|
||||
} else {
|
||||
delete process.env[key];
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
it('canonicalizes legacy server mode aliases', () => {
|
||||
expect(canonicalizeServerMode('cloud')).toBe('api');
|
||||
expect(canonicalizeServerMode('api')).toBe('api');
|
||||
@@ -243,32 +273,6 @@ describe('normalizeAgents', () => {
|
||||
});
|
||||
|
||||
describe('env var fallback (container deploys)', () => {
|
||||
const envVars = [
|
||||
'TELEGRAM_BOT_TOKEN', 'TELEGRAM_DM_POLICY', 'TELEGRAM_ALLOWED_USERS',
|
||||
'SLACK_BOT_TOKEN', 'SLACK_APP_TOKEN', 'SLACK_DM_POLICY', 'SLACK_ALLOWED_USERS',
|
||||
'WHATSAPP_ENABLED', 'WHATSAPP_SELF_CHAT_MODE', 'WHATSAPP_DM_POLICY', 'WHATSAPP_ALLOWED_USERS',
|
||||
'SIGNAL_PHONE_NUMBER', 'SIGNAL_SELF_CHAT_MODE', 'SIGNAL_DM_POLICY', 'SIGNAL_ALLOWED_USERS',
|
||||
'DISCORD_BOT_TOKEN', 'DISCORD_DM_POLICY', 'DISCORD_ALLOWED_USERS',
|
||||
];
|
||||
const savedEnv: Record<string, string | undefined> = {};
|
||||
|
||||
beforeEach(() => {
|
||||
for (const key of envVars) {
|
||||
savedEnv[key] = process.env[key];
|
||||
delete process.env[key];
|
||||
}
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
for (const key of envVars) {
|
||||
if (savedEnv[key] !== undefined) {
|
||||
process.env[key] = savedEnv[key];
|
||||
} else {
|
||||
delete process.env[key];
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
it('should pick up channels from env vars when YAML has none', () => {
|
||||
process.env.TELEGRAM_BOT_TOKEN = 'env-telegram-token';
|
||||
process.env.DISCORD_BOT_TOKEN = 'env-discord-token';
|
||||
|
||||
@@ -66,6 +66,7 @@ export interface AgentConfig {
|
||||
conversations?: {
|
||||
mode?: 'shared' | 'per-channel'; // Default: shared (single conversation across all channels)
|
||||
heartbeat?: string; // "dedicated" | "last-active" | "<channel>" (default: last-active)
|
||||
perChannel?: string[]; // Channels that should always have their own conversation
|
||||
};
|
||||
/** Features for this agent */
|
||||
features?: {
|
||||
@@ -143,6 +144,7 @@ export interface LettaBotConfig {
|
||||
conversations?: {
|
||||
mode?: 'shared' | 'per-channel'; // Default: shared (single conversation across all channels)
|
||||
heartbeat?: string; // "dedicated" | "last-active" | "<channel>" (default: last-active)
|
||||
perChannel?: string[]; // Channels that should always have their own conversation
|
||||
};
|
||||
|
||||
// Features
|
||||
|
||||
@@ -206,6 +206,49 @@ export function isResponseDeliverySuppressed(msg: Pick<InboundMessage, 'isListen
|
||||
return msg.isListeningMode === true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Pure function: resolve the conversation key for a channel message.
|
||||
* Returns the channel id in per-channel mode or when the channel is in overrides.
|
||||
* Returns 'shared' otherwise.
|
||||
*/
|
||||
export function resolveConversationKey(
|
||||
channel: string,
|
||||
conversationMode: string | undefined,
|
||||
conversationOverrides: Set<string>,
|
||||
): string {
|
||||
const normalized = channel.toLowerCase();
|
||||
if (conversationMode === 'per-channel') return normalized;
|
||||
if (conversationOverrides.has(normalized)) return normalized;
|
||||
return 'shared';
|
||||
}
|
||||
|
||||
/**
|
||||
* Pure function: resolve the conversation key for heartbeat/sendToAgent.
|
||||
* In per-channel mode, respects heartbeatConversation setting.
|
||||
* In shared mode with overrides, respects override channels when using last-active.
|
||||
*/
|
||||
export function resolveHeartbeatConversationKey(
|
||||
conversationMode: string | undefined,
|
||||
heartbeatConversation: string | undefined,
|
||||
conversationOverrides: Set<string>,
|
||||
lastActiveChannel?: string,
|
||||
): string {
|
||||
const hb = heartbeatConversation || 'last-active';
|
||||
|
||||
if (conversationMode === 'per-channel') {
|
||||
if (hb === 'dedicated') return 'heartbeat';
|
||||
if (hb === 'last-active') return lastActiveChannel ?? 'shared';
|
||||
return hb;
|
||||
}
|
||||
|
||||
// shared mode — if last-active and overrides exist, respect the override channel
|
||||
if (hb === 'last-active' && conversationOverrides.size > 0 && lastActiveChannel) {
|
||||
return resolveConversationKey(lastActiveChannel, conversationMode, conversationOverrides);
|
||||
}
|
||||
|
||||
return 'shared';
|
||||
}
|
||||
|
||||
export class LettaBot implements AgentSession {
|
||||
private store: Store;
|
||||
private config: BotConfig;
|
||||
@@ -230,6 +273,7 @@ export class LettaBot implements AgentSession {
|
||||
// channel (and optionally heartbeat) gets its own subprocess.
|
||||
private sessions: Map<string, Session> = new Map();
|
||||
private currentCanUseTool: CanUseToolCallback | undefined;
|
||||
private conversationOverrides: Set<string> = new Set();
|
||||
// Stable callback wrapper so the Session options never change, but we can
|
||||
// swap out the per-message handler before each send().
|
||||
private readonly sessionCanUseTool: CanUseToolCallback = async (toolName, toolInput) => {
|
||||
@@ -243,6 +287,9 @@ export class LettaBot implements AgentSession {
|
||||
this.config = config;
|
||||
mkdirSync(config.workingDir, { recursive: true });
|
||||
this.store = new Store('lettabot-agent.json', config.agentName);
|
||||
if (config.conversationOverrides?.length) {
|
||||
this.conversationOverrides = new Set(config.conversationOverrides.map((ch) => ch.toLowerCase()));
|
||||
}
|
||||
log.info(`LettaBot initialized. Agent ID: ${this.store.agentId || '(new)'}`);
|
||||
}
|
||||
|
||||
@@ -725,27 +772,25 @@ export class LettaBot implements AgentSession {
|
||||
|
||||
/**
|
||||
* Resolve the conversation key for a channel message.
|
||||
* In shared mode returns "shared"; in per-channel mode returns the channel id.
|
||||
* Returns 'shared' in shared mode (unless channel is in perChannel overrides).
|
||||
* Returns channel id in per-channel mode or for override channels.
|
||||
*/
|
||||
private resolveConversationKey(channel: string): string {
|
||||
return this.config.conversationMode === 'per-channel' ? channel : 'shared';
|
||||
return resolveConversationKey(channel, this.config.conversationMode, this.conversationOverrides);
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve the conversation key for heartbeat/sendToAgent.
|
||||
* Respects perChannel overrides when using last-active in shared mode.
|
||||
*/
|
||||
private resolveHeartbeatConversationKey(): string {
|
||||
if (this.config.conversationMode !== 'per-channel') return 'shared';
|
||||
|
||||
const hb = this.config.heartbeatConversation || 'last-active';
|
||||
if (hb === 'dedicated') return 'heartbeat';
|
||||
if (hb === 'last-active') {
|
||||
// Use the last channel the user messaged on
|
||||
const target = this.store.lastMessageTarget;
|
||||
return target ? target.channel : 'shared';
|
||||
}
|
||||
// Explicit channel name (e.g., "telegram")
|
||||
return hb;
|
||||
const lastActiveChannel = this.store.lastMessageTarget?.channel;
|
||||
return resolveHeartbeatConversationKey(
|
||||
this.config.conversationMode,
|
||||
this.config.heartbeatConversation,
|
||||
this.conversationOverrides,
|
||||
lastActiveChannel,
|
||||
);
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
@@ -1029,8 +1074,8 @@ export class LettaBot implements AgentSession {
|
||||
}
|
||||
}
|
||||
|
||||
if (this.config.conversationMode === 'per-channel') {
|
||||
const convKey = this.resolveConversationKey(effective.channel);
|
||||
const convKey = this.resolveConversationKey(effective.channel);
|
||||
if (convKey !== 'shared') {
|
||||
this.enqueueForKey(convKey, effective, adapter);
|
||||
} else {
|
||||
this.messageQueue.push({ msg: effective, adapter });
|
||||
@@ -1224,10 +1269,9 @@ export class LettaBot implements AgentSession {
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.config.conversationMode === 'per-channel') {
|
||||
// Per-channel mode: messages on different channels can run in parallel.
|
||||
// Only serialize within the same conversation key.
|
||||
const convKey = this.resolveConversationKey(msg.channel);
|
||||
const convKey = this.resolveConversationKey(msg.channel);
|
||||
if (convKey !== 'shared') {
|
||||
// Per-channel or override mode: messages on different keys can run in parallel.
|
||||
this.enqueueForKey(convKey, msg, adapter);
|
||||
} else {
|
||||
// Shared mode: single global queue (existing behavior)
|
||||
@@ -1833,7 +1877,7 @@ export class LettaBot implements AgentSession {
|
||||
private async acquireLock(convKey: string): Promise<boolean> {
|
||||
if (convKey === 'heartbeat') return false; // No lock needed
|
||||
|
||||
if (this.config.conversationMode === 'per-channel') {
|
||||
if (convKey !== 'shared') {
|
||||
while (this.processingKeys.has(convKey)) {
|
||||
await new Promise(resolve => setTimeout(resolve, 1000));
|
||||
}
|
||||
@@ -1849,8 +1893,17 @@ export class LettaBot implements AgentSession {
|
||||
|
||||
private releaseLock(convKey: string, acquired: boolean): void {
|
||||
if (!acquired) return;
|
||||
if (this.config.conversationMode === 'per-channel') {
|
||||
if (convKey !== 'shared') {
|
||||
this.processingKeys.delete(convKey);
|
||||
// Heartbeats/sendToAgent may hold a channel key while user messages for
|
||||
// that same key queue up. Kick the keyed worker after unlock so queued
|
||||
// messages are not left waiting for another inbound message to arrive.
|
||||
const queue = this.keyedQueues.get(convKey);
|
||||
if (queue && queue.length > 0) {
|
||||
this.processKeyedQueue(convKey).catch(err =>
|
||||
log.error(`Fatal error in processKeyedQueue(${convKey}) after lock release:`, err)
|
||||
);
|
||||
}
|
||||
} else {
|
||||
this.processing = false;
|
||||
this.processQueue();
|
||||
|
||||
108
src/core/conversation-key.test.ts
Normal file
108
src/core/conversation-key.test.ts
Normal file
@@ -0,0 +1,108 @@
|
||||
import { describe, it, expect } from 'vitest';
|
||||
import { resolveConversationKey, resolveHeartbeatConversationKey } from './bot.js';
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// resolveConversationKey
|
||||
// ---------------------------------------------------------------------------
|
||||
describe('resolveConversationKey', () => {
|
||||
it('returns "shared" in shared mode for a normal channel', () => {
|
||||
expect(resolveConversationKey('telegram', 'shared', new Set())).toBe('shared');
|
||||
});
|
||||
|
||||
it('returns channel id in per-channel mode', () => {
|
||||
expect(resolveConversationKey('telegram', 'per-channel', new Set())).toBe('telegram');
|
||||
});
|
||||
|
||||
it('returns channel id for override channel in shared mode', () => {
|
||||
const overrides = new Set(['slack']);
|
||||
expect(resolveConversationKey('slack', 'shared', overrides)).toBe('slack');
|
||||
});
|
||||
|
||||
it('non-override channels still return "shared" when overrides are configured', () => {
|
||||
const overrides = new Set(['slack']);
|
||||
expect(resolveConversationKey('telegram', 'shared', overrides)).toBe('shared');
|
||||
});
|
||||
|
||||
it('multiple override channels all get their own keys', () => {
|
||||
const overrides = new Set(['slack', 'discord']);
|
||||
expect(resolveConversationKey('slack', 'shared', overrides)).toBe('slack');
|
||||
expect(resolveConversationKey('discord', 'shared', overrides)).toBe('discord');
|
||||
expect(resolveConversationKey('telegram', 'shared', overrides)).toBe('shared');
|
||||
});
|
||||
|
||||
it('normalizes channel name to lowercase', () => {
|
||||
const overrides = new Set(['slack']);
|
||||
expect(resolveConversationKey('SLACK', 'shared', overrides)).toBe('slack');
|
||||
expect(resolveConversationKey('Telegram', 'per-channel', new Set())).toBe('telegram');
|
||||
});
|
||||
|
||||
it('case-insensitive override matching', () => {
|
||||
const overrides = new Set(['slack']);
|
||||
expect(resolveConversationKey('Slack', 'shared', overrides)).toBe('slack');
|
||||
});
|
||||
|
||||
it('returns channel id in per-channel mode even when channel is also in overrides', () => {
|
||||
const overrides = new Set(['slack']);
|
||||
expect(resolveConversationKey('slack', 'per-channel', overrides)).toBe('slack');
|
||||
});
|
||||
|
||||
it('returns "shared" when conversationMode is undefined', () => {
|
||||
expect(resolveConversationKey('telegram', undefined, new Set())).toBe('shared');
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// resolveHeartbeatConversationKey
|
||||
// ---------------------------------------------------------------------------
|
||||
describe('resolveHeartbeatConversationKey', () => {
|
||||
// --- per-channel mode ---
|
||||
|
||||
it('returns "heartbeat" when mode=per-channel and heartbeat=dedicated', () => {
|
||||
expect(resolveHeartbeatConversationKey('per-channel', 'dedicated', new Set())).toBe('heartbeat');
|
||||
});
|
||||
|
||||
it('returns last-active channel in per-channel mode with last-active', () => {
|
||||
expect(resolveHeartbeatConversationKey('per-channel', 'last-active', new Set(), 'telegram')).toBe('telegram');
|
||||
});
|
||||
|
||||
it('returns "shared" when per-channel and last-active but no last channel', () => {
|
||||
expect(resolveHeartbeatConversationKey('per-channel', 'last-active', new Set(), undefined)).toBe('shared');
|
||||
});
|
||||
|
||||
it('returns explicit channel name in per-channel mode', () => {
|
||||
expect(resolveHeartbeatConversationKey('per-channel', 'discord', new Set(), 'telegram')).toBe('discord');
|
||||
});
|
||||
|
||||
// --- shared mode, no overrides ---
|
||||
|
||||
it('returns "shared" in shared mode with no overrides', () => {
|
||||
expect(resolveHeartbeatConversationKey('shared', 'last-active', new Set(), 'telegram')).toBe('shared');
|
||||
});
|
||||
|
||||
it('returns "shared" in shared mode with undefined heartbeat', () => {
|
||||
expect(resolveHeartbeatConversationKey('shared', undefined, new Set(), 'telegram')).toBe('shared');
|
||||
});
|
||||
|
||||
// --- shared mode with overrides ---
|
||||
|
||||
it('returns override channel key when last-active channel is an override', () => {
|
||||
const overrides = new Set(['slack']);
|
||||
expect(resolveHeartbeatConversationKey('shared', 'last-active', overrides, 'slack')).toBe('slack');
|
||||
});
|
||||
|
||||
it('returns "shared" when last-active channel is NOT an override', () => {
|
||||
const overrides = new Set(['slack']);
|
||||
expect(resolveHeartbeatConversationKey('shared', 'last-active', overrides, 'telegram')).toBe('shared');
|
||||
});
|
||||
|
||||
it('returns "shared" when overrides exist but no last-active channel', () => {
|
||||
const overrides = new Set(['slack']);
|
||||
expect(resolveHeartbeatConversationKey('shared', 'last-active', overrides, undefined)).toBe('shared');
|
||||
});
|
||||
|
||||
it('returns "shared" in shared mode even with overrides when heartbeat is not last-active', () => {
|
||||
// Non-last-active heartbeat in shared mode always returns 'shared'
|
||||
const overrides = new Set(['slack']);
|
||||
expect(resolveHeartbeatConversationKey('shared', 'dedicated', overrides, 'slack')).toBe('shared');
|
||||
});
|
||||
});
|
||||
@@ -251,4 +251,33 @@ describe('SDK session contract', () => {
|
||||
const opts = vi.mocked(createSession).mock.calls[0][1];
|
||||
expect(opts).not.toHaveProperty('memfs');
|
||||
});
|
||||
|
||||
it('restarts a keyed queue after non-shared lock release when backlog exists', async () => {
|
||||
const bot = new LettaBot({
|
||||
workingDir: join(dataDir, 'working'),
|
||||
allowedTools: [],
|
||||
});
|
||||
const botInternal = bot as any;
|
||||
|
||||
botInternal.processingKeys.add('slack');
|
||||
botInternal.keyedQueues.set('slack', [
|
||||
{
|
||||
msg: {
|
||||
userId: 'u1',
|
||||
channel: 'slack',
|
||||
chatId: 'C123',
|
||||
text: 'queued while locked',
|
||||
timestamp: new Date(),
|
||||
isGroup: false,
|
||||
},
|
||||
adapter: {},
|
||||
},
|
||||
]);
|
||||
|
||||
const processSpy = vi.spyOn(botInternal, 'processKeyedQueue').mockResolvedValue(undefined);
|
||||
botInternal.releaseLock('slack', true);
|
||||
|
||||
expect(botInternal.processingKeys.has('slack')).toBe(false);
|
||||
expect(processSpy).toHaveBeenCalledWith('slack');
|
||||
});
|
||||
});
|
||||
|
||||
@@ -156,6 +156,7 @@ export interface BotConfig {
|
||||
// Conversation routing
|
||||
conversationMode?: 'shared' | 'per-channel'; // Default: shared
|
||||
heartbeatConversation?: string; // "dedicated" | "last-active" | "<channel>" (default: last-active)
|
||||
conversationOverrides?: string[]; // Channels that always use their own conversation (shared mode)
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -602,6 +602,7 @@ async function main() {
|
||||
display: agentConfig.features?.display,
|
||||
conversationMode: agentConfig.conversations?.mode || 'shared',
|
||||
heartbeatConversation: agentConfig.conversations?.heartbeat || 'last-active',
|
||||
conversationOverrides: agentConfig.conversations?.perChannel,
|
||||
skills: {
|
||||
cronEnabled: agentConfig.features?.cron ?? globalConfig.cronEnabled,
|
||||
googleEnabled: !!agentConfig.integrations?.google?.enabled || !!agentConfig.polling?.gmail?.enabled,
|
||||
|
||||
Reference in New Issue
Block a user