fix: merge env var credentials into YAML channel blocks and warn on silent skip (#408)
This commit is contained in:
@@ -813,7 +813,9 @@ The API server also exposes `/v1/chat/completions` and `/v1/models` -- a drop-in
|
||||
|
||||
## Environment Variables
|
||||
|
||||
Environment variables override config file values:
|
||||
Environment variables serve as fallbacks and can fill in missing credentials. If a channel block exists in YAML but is missing its key credential (e.g., `signal: enabled: true` without `phone`), the corresponding env var (e.g., `SIGNAL_PHONE_NUMBER`) will be merged in. YAML values always take priority -- env vars never overwrite values already set in the config file.
|
||||
|
||||
Reference:
|
||||
|
||||
| Env Variable | Config Equivalent |
|
||||
|--------------|-------------------|
|
||||
|
||||
@@ -30,6 +30,12 @@ Link signal-cli to your existing Signal account without disrupting your phone ap
|
||||
signal-cli link -n "LettaBot"
|
||||
```
|
||||
|
||||
**Headless / SSH setup:** If you're running on a server without a display, pipe through `qrencode` to render the QR code in your terminal:
|
||||
|
||||
```bash
|
||||
signal-cli link -n "LettaBot" | tee >(xargs -L 1 qrencode -t ANSI256UTF8)
|
||||
```
|
||||
|
||||
This will display a `sgnl://linkdevice?uuid=...` URI. On your phone:
|
||||
1. Open Signal → Settings (tap your profile)
|
||||
2. Tap "Linked Devices"
|
||||
|
||||
@@ -56,7 +56,7 @@ export class DiscordAdapter implements ChannelAdapter {
|
||||
private attachmentsMaxBytes?: number;
|
||||
|
||||
onMessage?: (msg: InboundMessage) => Promise<void>;
|
||||
onCommand?: (command: string) => Promise<string | null>;
|
||||
onCommand?: (command: string, chatId?: string) => Promise<string | null>;
|
||||
|
||||
constructor(config: DiscordConfig) {
|
||||
this.config = {
|
||||
@@ -253,7 +253,7 @@ Ask the bot owner to approve with:
|
||||
}
|
||||
if (this.onCommand) {
|
||||
if (command === 'status' || command === 'reset' || command === 'heartbeat') {
|
||||
const result = await this.onCommand(command);
|
||||
const result = await this.onCommand(command, message.channel.id);
|
||||
if (result) {
|
||||
await message.channel.send(result);
|
||||
}
|
||||
|
||||
@@ -163,7 +163,7 @@ export class SignalAdapter implements ChannelAdapter {
|
||||
private baseUrl: string;
|
||||
|
||||
onMessage?: (msg: InboundMessage) => Promise<void>;
|
||||
onCommand?: (command: string) => Promise<string | null>;
|
||||
onCommand?: (command: string, chatId?: string) => Promise<string | null>;
|
||||
|
||||
constructor(config: SignalConfig) {
|
||||
this.config = {
|
||||
@@ -785,7 +785,7 @@ This code expires in 1 hour.`;
|
||||
if (command === 'help' || command === 'start') {
|
||||
await this.sendMessage({ chatId, text: HELP_TEXT });
|
||||
} else if (this.onCommand) {
|
||||
const result = await this.onCommand(command);
|
||||
const result = await this.onCommand(command, chatId);
|
||||
if (result) await this.sendMessage({ chatId, text: result });
|
||||
}
|
||||
return; // Don't pass commands to agent
|
||||
|
||||
@@ -40,7 +40,7 @@ export class SlackAdapter implements ChannelAdapter {
|
||||
private attachmentsMaxBytes?: number;
|
||||
|
||||
onMessage?: (msg: InboundMessage) => Promise<void>;
|
||||
onCommand?: (command: string) => Promise<string | null>;
|
||||
onCommand?: (command: string, chatId?: string) => Promise<string | null>;
|
||||
|
||||
constructor(config: SlackConfig) {
|
||||
this.config = config;
|
||||
@@ -119,7 +119,7 @@ export class SlackAdapter implements ChannelAdapter {
|
||||
if (command === 'help' || command === 'start') {
|
||||
await say(await markdownToSlackMrkdwn(HELP_TEXT));
|
||||
} else if (this.onCommand) {
|
||||
const result = await this.onCommand(command);
|
||||
const result = await this.onCommand(command, channelId);
|
||||
if (result) await say(await markdownToSlackMrkdwn(result));
|
||||
}
|
||||
return; // Don't pass commands to agent
|
||||
@@ -236,7 +236,7 @@ export class SlackAdapter implements ChannelAdapter {
|
||||
if (command === 'help' || command === 'start') {
|
||||
await this.sendMessage({ chatId: channelId, text: HELP_TEXT, threadId: threadTs });
|
||||
} else if (this.onCommand) {
|
||||
const result = await this.onCommand(command);
|
||||
const result = await this.onCommand(command, channelId);
|
||||
if (result) await this.sendMessage({ chatId: channelId, text: result, threadId: threadTs });
|
||||
}
|
||||
return; // Don't pass commands to agent
|
||||
|
||||
@@ -78,7 +78,7 @@ export class TelegramMTProtoAdapter implements ChannelAdapter {
|
||||
private pendingPairingApprovals = new Map<number, { code: string; userId: string; username: string }>();
|
||||
|
||||
onMessage?: (msg: InboundMessage) => Promise<void>;
|
||||
onCommand?: (command: string) => Promise<string | null>;
|
||||
onCommand?: (command: string, chatId?: string) => Promise<string | null>;
|
||||
|
||||
constructor(config: TelegramMTProtoConfig) {
|
||||
this.config = {
|
||||
|
||||
@@ -44,7 +44,7 @@ export class TelegramAdapter implements ChannelAdapter {
|
||||
private attachmentsMaxBytes?: number;
|
||||
|
||||
onMessage?: (msg: InboundMessage) => Promise<void>;
|
||||
onCommand?: (command: string) => Promise<string | null>;
|
||||
onCommand?: (command: string, chatId?: string) => Promise<string | null>;
|
||||
|
||||
constructor(config: TelegramConfig) {
|
||||
this.config = {
|
||||
@@ -239,7 +239,7 @@ export class TelegramAdapter implements ChannelAdapter {
|
||||
// Handle /status
|
||||
this.bot.command('status', async (ctx) => {
|
||||
if (this.onCommand) {
|
||||
const result = await this.onCommand('status');
|
||||
const result = await this.onCommand('status', String(ctx.chat.id));
|
||||
await ctx.reply(result || 'No status available');
|
||||
}
|
||||
});
|
||||
@@ -247,14 +247,14 @@ export class TelegramAdapter implements ChannelAdapter {
|
||||
// Handle /heartbeat - trigger heartbeat manually (silent - no reply)
|
||||
this.bot.command('heartbeat', async (ctx) => {
|
||||
if (this.onCommand) {
|
||||
await this.onCommand('heartbeat');
|
||||
await this.onCommand('heartbeat', String(ctx.chat.id));
|
||||
}
|
||||
});
|
||||
|
||||
// Handle /reset
|
||||
this.bot.command('reset', async (ctx) => {
|
||||
if (this.onCommand) {
|
||||
const result = await this.onCommand('reset');
|
||||
const result = await this.onCommand('reset', String(ctx.chat.id));
|
||||
await ctx.reply(result || 'Reset complete');
|
||||
}
|
||||
});
|
||||
|
||||
@@ -32,7 +32,7 @@ export interface ChannelAdapter {
|
||||
|
||||
// Event handlers (set by bot core)
|
||||
onMessage?: (msg: InboundMessage) => Promise<void>;
|
||||
onCommand?: (command: string) => Promise<string | null>;
|
||||
onCommand?: (command: string, chatId?: string) => Promise<string | null>;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -181,7 +181,7 @@ export class WhatsAppAdapter implements ChannelAdapter {
|
||||
|
||||
// Event handlers (set by bot core)
|
||||
onMessage?: (msg: InboundMessage) => Promise<void>;
|
||||
onCommand?: (command: string) => Promise<string | null>;
|
||||
onCommand?: (command: string, chatId?: string) => Promise<string | null>;
|
||||
|
||||
// Pre-bound handlers (created once to avoid bind() overhead)
|
||||
private boundHandleConnectionUpdate: (update: Partial<import("@whiskeysockets/baileys").ConnectionState>) => void;
|
||||
@@ -814,7 +814,7 @@ export class WhatsAppAdapter implements ChannelAdapter {
|
||||
if (command === 'help' || command === 'start') {
|
||||
await this.sendMessage({ chatId, text: HELP_TEXT });
|
||||
} else if (this.onCommand) {
|
||||
const result = await this.onCommand(command);
|
||||
const result = await this.onCommand(command, chatId);
|
||||
if (result) await this.sendMessage({ chatId, text: result });
|
||||
}
|
||||
return; // Don't pass commands to agent
|
||||
|
||||
@@ -305,6 +305,56 @@ describe('normalizeAgents', () => {
|
||||
expect(agents[0].channels.telegram?.token).toBe('yaml-token');
|
||||
});
|
||||
|
||||
it('should merge env var credential into YAML block missing it', () => {
|
||||
process.env.SIGNAL_PHONE_NUMBER = '+15551234567';
|
||||
process.env.DISCORD_BOT_TOKEN = 'env-discord-token';
|
||||
process.env.TELEGRAM_BOT_TOKEN = 'env-tg-token';
|
||||
|
||||
const config: LettaBotConfig = {
|
||||
server: { mode: 'cloud' },
|
||||
agent: { name: 'TestBot', model: 'test' },
|
||||
channels: {
|
||||
signal: { enabled: true, selfChat: true, dmPolicy: 'pairing' },
|
||||
discord: { enabled: true, dmPolicy: 'open' },
|
||||
telegram: { enabled: true, dmPolicy: 'pairing' },
|
||||
},
|
||||
};
|
||||
|
||||
const agents = normalizeAgents(config);
|
||||
|
||||
// Env var should fill in the missing credential
|
||||
expect(agents[0].channels.signal?.phone).toBe('+15551234567');
|
||||
expect(agents[0].channels.signal?.dmPolicy).toBe('pairing');
|
||||
expect(agents[0].channels.discord?.token).toBe('env-discord-token');
|
||||
expect(agents[0].channels.discord?.dmPolicy).toBe('open');
|
||||
expect(agents[0].channels.telegram?.token).toBe('env-tg-token');
|
||||
});
|
||||
|
||||
it('should merge env var credential into YAML block missing it', () => {
|
||||
process.env.SIGNAL_PHONE_NUMBER = '+15551234567';
|
||||
process.env.DISCORD_BOT_TOKEN = 'env-discord-token';
|
||||
process.env.TELEGRAM_BOT_TOKEN = 'env-tg-token';
|
||||
|
||||
const config: LettaBotConfig = {
|
||||
server: { mode: 'cloud' },
|
||||
agent: { name: 'TestBot', model: 'test' },
|
||||
channels: {
|
||||
signal: { enabled: true, selfChat: true, dmPolicy: 'pairing' },
|
||||
discord: { enabled: true, dmPolicy: 'open' },
|
||||
telegram: { enabled: true, dmPolicy: 'pairing' },
|
||||
},
|
||||
};
|
||||
|
||||
const agents = normalizeAgents(config);
|
||||
|
||||
// Env var should fill in the missing credential
|
||||
expect(agents[0].channels.signal?.phone).toBe('+15551234567');
|
||||
expect(agents[0].channels.signal?.dmPolicy).toBe('pairing');
|
||||
expect(agents[0].channels.discord?.token).toBe('env-discord-token');
|
||||
expect(agents[0].channels.discord?.dmPolicy).toBe('open');
|
||||
expect(agents[0].channels.telegram?.token).toBe('env-tg-token');
|
||||
});
|
||||
|
||||
it('should not apply env vars in multi-agent mode', () => {
|
||||
process.env.TELEGRAM_BOT_TOKEN = 'env-token';
|
||||
|
||||
|
||||
@@ -64,9 +64,10 @@ export interface AgentConfig {
|
||||
};
|
||||
/** Conversation routing */
|
||||
conversations?: {
|
||||
mode?: 'shared' | 'per-channel'; // Default: shared (single conversation across all channels)
|
||||
mode?: 'shared' | 'per-channel' | 'per-chat'; // Default: shared (single conversation across all channels)
|
||||
heartbeat?: string; // "dedicated" | "last-active" | "<channel>" (default: last-active)
|
||||
perChannel?: string[]; // Channels that should always have their own conversation
|
||||
maxSessions?: number; // Max concurrent sessions in per-chat mode (default: 10, LRU eviction)
|
||||
};
|
||||
/** Features for this agent */
|
||||
features?: {
|
||||
@@ -142,9 +143,10 @@ export interface LettaBotConfig {
|
||||
|
||||
// Conversation routing
|
||||
conversations?: {
|
||||
mode?: 'shared' | 'per-channel'; // Default: shared (single conversation across all channels)
|
||||
mode?: 'shared' | 'per-channel' | 'per-chat'; // Default: shared (single conversation across all channels)
|
||||
heartbeat?: string; // "dedicated" | "last-active" | "<channel>" (default: last-active)
|
||||
perChannel?: string[]; // Channels that should always have their own conversation
|
||||
maxSessions?: number; // Max concurrent sessions in per-chat mode (default: 10, LRU eviction)
|
||||
};
|
||||
|
||||
// Features
|
||||
@@ -469,6 +471,24 @@ export function normalizeAgents(config: LettaBotConfig): AgentConfig[] {
|
||||
const normalized: AgentConfig['channels'] = {};
|
||||
if (!channels) return normalized;
|
||||
|
||||
// Merge env vars into YAML blocks that are missing their key credential.
|
||||
// Without this, `signal: enabled: true` + SIGNAL_PHONE_NUMBER env var
|
||||
// silently fails because the env-var-only fallback (below) only fires
|
||||
// when the YAML block is completely absent.
|
||||
if (channels.telegram && !channels.telegram.token && process.env.TELEGRAM_BOT_TOKEN) {
|
||||
channels.telegram.token = process.env.TELEGRAM_BOT_TOKEN;
|
||||
}
|
||||
if (channels.slack) {
|
||||
if (!channels.slack.botToken && process.env.SLACK_BOT_TOKEN) channels.slack.botToken = process.env.SLACK_BOT_TOKEN;
|
||||
if (!channels.slack.appToken && process.env.SLACK_APP_TOKEN) channels.slack.appToken = process.env.SLACK_APP_TOKEN;
|
||||
}
|
||||
if (channels.signal && !channels.signal.phone && process.env.SIGNAL_PHONE_NUMBER) {
|
||||
channels.signal.phone = process.env.SIGNAL_PHONE_NUMBER;
|
||||
}
|
||||
if (channels.discord && !channels.discord.token && process.env.DISCORD_BOT_TOKEN) {
|
||||
channels.discord.token = process.env.DISCORD_BOT_TOKEN;
|
||||
}
|
||||
|
||||
if (channels.telegram?.enabled !== false && channels.telegram?.token) {
|
||||
const telegram = { ...channels.telegram };
|
||||
normalizeLegacyGroupFields(telegram, `${sourcePath}.telegram`);
|
||||
@@ -500,6 +520,19 @@ export function normalizeAgents(config: LettaBotConfig): AgentConfig[] {
|
||||
normalized.discord = discord;
|
||||
}
|
||||
|
||||
// Warn when a channel block exists but was dropped due to missing credentials
|
||||
const channelCredentials: Array<[string, unknown, boolean]> = [
|
||||
['telegram', channels.telegram, !!normalized.telegram],
|
||||
['slack', channels.slack, !!normalized.slack],
|
||||
['signal', channels.signal, !!normalized.signal],
|
||||
['discord', channels.discord, !!normalized.discord],
|
||||
];
|
||||
for (const [name, raw, included] of channelCredentials) {
|
||||
if (raw && (raw as Record<string, unknown>).enabled !== false && !included) {
|
||||
console.warn(`[Config] Channel '${name}' is in ${sourcePath} but missing required credentials -- skipping. Check your lettabot.yaml or environment variables.`);
|
||||
}
|
||||
}
|
||||
|
||||
return normalized;
|
||||
};
|
||||
|
||||
|
||||
107
src/core/bot.ts
107
src/core/bot.ts
@@ -221,6 +221,7 @@ export function isResponseDeliverySuppressed(msg: Pick<InboundMessage, 'isListen
|
||||
|
||||
/**
|
||||
* Pure function: resolve the conversation key for a channel message.
|
||||
* Returns `${channel}:${chatId}` in per-chat mode.
|
||||
* Returns the channel id in per-channel mode or when the channel is in overrides.
|
||||
* Returns 'shared' otherwise.
|
||||
*/
|
||||
@@ -228,8 +229,10 @@ export function resolveConversationKey(
|
||||
channel: string,
|
||||
conversationMode: string | undefined,
|
||||
conversationOverrides: Set<string>,
|
||||
chatId?: string,
|
||||
): string {
|
||||
const normalized = channel.toLowerCase();
|
||||
if (conversationMode === 'per-chat' && chatId) return `${normalized}:${chatId}`;
|
||||
if (conversationMode === 'per-channel') return normalized;
|
||||
if (conversationOverrides.has(normalized)) return normalized;
|
||||
return 'shared';
|
||||
@@ -237,6 +240,7 @@ export function resolveConversationKey(
|
||||
|
||||
/**
|
||||
* Pure function: resolve the conversation key for heartbeat/sendToAgent.
|
||||
* In per-chat mode, uses the full channel:chatId of the last-active target.
|
||||
* In per-channel mode, respects heartbeatConversation setting.
|
||||
* In shared mode with overrides, respects override channels when using last-active.
|
||||
*/
|
||||
@@ -245,9 +249,19 @@ export function resolveHeartbeatConversationKey(
|
||||
heartbeatConversation: string | undefined,
|
||||
conversationOverrides: Set<string>,
|
||||
lastActiveChannel?: string,
|
||||
lastActiveChatId?: string,
|
||||
): string {
|
||||
const hb = heartbeatConversation || 'last-active';
|
||||
|
||||
if (conversationMode === 'per-chat') {
|
||||
if (hb === 'dedicated') return 'heartbeat';
|
||||
if (hb === 'last-active' && lastActiveChannel && lastActiveChatId) {
|
||||
return `${lastActiveChannel.toLowerCase()}:${lastActiveChatId}`;
|
||||
}
|
||||
// Fall back to shared if no last-active target
|
||||
return 'shared';
|
||||
}
|
||||
|
||||
if (conversationMode === 'per-channel') {
|
||||
if (hb === 'dedicated') return 'heartbeat';
|
||||
if (hb === 'last-active') return lastActiveChannel ?? 'shared';
|
||||
@@ -278,13 +292,17 @@ export class LettaBot implements AgentSession {
|
||||
private processing = false; // Global lock for shared mode
|
||||
private processingKeys: Set<string> = new Set(); // Per-key locks for per-channel mode
|
||||
|
||||
// AskUserQuestion support: resolves when the next user message arrives
|
||||
private pendingQuestionResolver: ((text: string) => void) | null = null;
|
||||
// AskUserQuestion support: resolves when the next user message arrives.
|
||||
// In per-chat mode, keyed by convKey so each chat resolves independently.
|
||||
// In shared mode, a single entry keyed by 'shared' provides legacy behavior.
|
||||
private pendingQuestionResolvers: Map<string, (text: string) => void> = new Map();
|
||||
|
||||
// Persistent sessions: reuse CLI subprocesses across messages.
|
||||
// In shared mode, only the "shared" key is used. In per-channel mode, each
|
||||
// channel (and optionally heartbeat) gets its own subprocess.
|
||||
// channel (and optionally heartbeat) gets its own subprocess. In per-chat
|
||||
// mode, each unique channel:chatId gets its own subprocess (LRU-evicted).
|
||||
private sessions: Map<string, Session> = new Map();
|
||||
private sessionLastUsed: Map<string, number> = new Map(); // LRU tracking for per-chat mode
|
||||
// Coalesces concurrent ensureSessionForKey calls for the same key so the
|
||||
// second caller waits for the first instead of creating a duplicate session.
|
||||
// generation prevents stale in-flight creations from being reused after reset.
|
||||
@@ -849,8 +867,8 @@ export class LettaBot implements AgentSession {
|
||||
* Returns 'shared' in shared mode (unless channel is in perChannel overrides).
|
||||
* Returns channel id in per-channel mode or for override channels.
|
||||
*/
|
||||
private resolveConversationKey(channel: string): string {
|
||||
return resolveConversationKey(channel, this.config.conversationMode, this.conversationOverrides);
|
||||
private resolveConversationKey(channel: string, chatId?: string): string {
|
||||
return resolveConversationKey(channel, this.config.conversationMode, this.conversationOverrides, chatId);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -858,12 +876,13 @@ export class LettaBot implements AgentSession {
|
||||
* Respects perChannel overrides when using last-active in shared mode.
|
||||
*/
|
||||
private resolveHeartbeatConversationKey(): string {
|
||||
const lastActiveChannel = this.store.lastMessageTarget?.channel;
|
||||
const target = this.store.lastMessageTarget;
|
||||
return resolveHeartbeatConversationKey(
|
||||
this.config.conversationMode,
|
||||
this.config.heartbeatConversation,
|
||||
this.conversationOverrides,
|
||||
lastActiveChannel,
|
||||
target?.channel,
|
||||
target?.chatId,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -884,7 +903,10 @@ export class LettaBot implements AgentSession {
|
||||
|
||||
// Fast path: session already exists
|
||||
const existing = this.sessions.get(key);
|
||||
if (existing) return existing;
|
||||
if (existing) {
|
||||
this.sessionLastUsed.set(key, Date.now());
|
||||
return existing;
|
||||
}
|
||||
|
||||
// Coalesce concurrent callers: if another call is already creating this
|
||||
// key (e.g. warmSession running while first message arrives), wait for
|
||||
@@ -1035,7 +1057,32 @@ export class LettaBot implements AgentSession {
|
||||
return this.ensureSessionForKey(key, bootstrapRetried);
|
||||
}
|
||||
|
||||
// LRU eviction: in per-chat mode, limit concurrent sessions to avoid
|
||||
// unbounded subprocess growth. Evicted sessions can be cheaply recreated
|
||||
// via resumeSession() since conversation IDs are persisted in the store.
|
||||
const maxSessions = this.config.maxSessions ?? 10;
|
||||
if (this.config.conversationMode === 'per-chat' && this.sessions.size >= maxSessions) {
|
||||
let oldestKey: string | null = null;
|
||||
let oldestTime = Infinity;
|
||||
for (const [k, ts] of this.sessionLastUsed) {
|
||||
if (k !== key && ts < oldestTime && this.sessions.has(k)) {
|
||||
oldestKey = k;
|
||||
oldestTime = ts;
|
||||
}
|
||||
}
|
||||
if (oldestKey) {
|
||||
log.info(`LRU session eviction: closing session for key="${oldestKey}" (${this.sessions.size} active, max=${maxSessions})`);
|
||||
const evicted = this.sessions.get(oldestKey);
|
||||
evicted?.close();
|
||||
this.sessions.delete(oldestKey);
|
||||
this.sessionLastUsed.delete(oldestKey);
|
||||
this.sessionGenerations.delete(oldestKey);
|
||||
this.sessionCreationLocks.delete(oldestKey);
|
||||
}
|
||||
}
|
||||
|
||||
this.sessions.set(key, session);
|
||||
this.sessionLastUsed.set(key, Date.now());
|
||||
return session;
|
||||
}
|
||||
|
||||
@@ -1061,6 +1108,7 @@ export class LettaBot implements AgentSession {
|
||||
log.info(`Invalidating session (key=${key})`);
|
||||
session.close();
|
||||
this.sessions.delete(key);
|
||||
this.sessionLastUsed.delete(key);
|
||||
}
|
||||
} else {
|
||||
const keys = new Set<string>([
|
||||
@@ -1078,6 +1126,7 @@ export class LettaBot implements AgentSession {
|
||||
}
|
||||
this.sessions.clear();
|
||||
this.sessionCreationLocks.clear();
|
||||
this.sessionLastUsed.clear();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1283,7 +1332,7 @@ export class LettaBot implements AgentSession {
|
||||
|
||||
registerChannel(adapter: ChannelAdapter): void {
|
||||
adapter.onMessage = (msg) => this.handleMessage(msg, adapter);
|
||||
adapter.onCommand = (cmd) => this.handleCommand(cmd, adapter.id);
|
||||
adapter.onCommand = (cmd, chatId) => this.handleCommand(cmd, adapter.id, chatId);
|
||||
this.channels.set(adapter.id, adapter);
|
||||
log.info(`Registered channel: ${adapter.name}`);
|
||||
}
|
||||
@@ -1316,7 +1365,7 @@ export class LettaBot implements AgentSession {
|
||||
}
|
||||
}
|
||||
|
||||
const convKey = this.resolveConversationKey(effective.channel);
|
||||
const convKey = this.resolveConversationKey(effective.channel, effective.chatId);
|
||||
if (convKey !== 'shared') {
|
||||
this.enqueueForKey(convKey, effective, adapter);
|
||||
} else {
|
||||
@@ -1331,7 +1380,7 @@ export class LettaBot implements AgentSession {
|
||||
// Commands
|
||||
// =========================================================================
|
||||
|
||||
private async handleCommand(command: string, channelId?: string): Promise<string | null> {
|
||||
private async handleCommand(command: string, channelId?: string, chatId?: string): Promise<string | null> {
|
||||
log.info(`Received: /${command}`);
|
||||
switch (command) {
|
||||
case 'status': {
|
||||
@@ -1356,10 +1405,10 @@ export class LettaBot implements AgentSession {
|
||||
}
|
||||
case 'reset': {
|
||||
// Always scope the reset to the caller's conversation key so that
|
||||
// other channels' conversations are never silently destroyed.
|
||||
// other channels/chats' conversations are never silently destroyed.
|
||||
// resolveConversationKey returns 'shared' for non-override channels,
|
||||
// or the channel id for per-channel / override channels.
|
||||
const convKey = channelId ? this.resolveConversationKey(channelId) : 'shared';
|
||||
// the channel id for per-channel, or channel:chatId for per-chat.
|
||||
const convKey = channelId ? this.resolveConversationKey(channelId, chatId) : 'shared';
|
||||
this.store.clearConversation(convKey);
|
||||
this.store.resetRecoveryAttempts();
|
||||
this.invalidateSession(convKey);
|
||||
@@ -1372,12 +1421,14 @@ export class LettaBot implements AgentSession {
|
||||
if (convKey === 'shared') {
|
||||
return `Conversation reset. New conversation: ${newConvId}\n(Agent memory is preserved.)`;
|
||||
}
|
||||
return `Conversation reset for this channel. New conversation: ${newConvId}\nOther channels are unaffected. (Agent memory is preserved.)`;
|
||||
const scope = this.config.conversationMode === 'per-chat' ? 'this chat' : 'this channel';
|
||||
return `Conversation reset for ${scope}. New conversation: ${newConvId}\nOther conversations are unaffected. (Agent memory is preserved.)`;
|
||||
} catch {
|
||||
if (convKey === 'shared') {
|
||||
return 'Conversation reset. Send a message to start a new conversation. (Agent memory is preserved.)';
|
||||
}
|
||||
return `Conversation reset for this channel. Other channels are unaffected. (Agent memory is preserved.)`;
|
||||
const scope = this.config.conversationMode === 'per-chat' ? 'this chat' : 'this channel';
|
||||
return `Conversation reset for ${scope}. Other conversations are unaffected. (Agent memory is preserved.)`;
|
||||
}
|
||||
}
|
||||
default:
|
||||
@@ -1488,10 +1539,12 @@ export class LettaBot implements AgentSession {
|
||||
// queuing it for normal processing. This prevents a deadlock where
|
||||
// the stream is paused waiting for user input while the processing
|
||||
// flag blocks new messages from being handled.
|
||||
if (this.pendingQuestionResolver) {
|
||||
log.info(`Intercepted message as AskUserQuestion answer from ${msg.userId}`);
|
||||
this.pendingQuestionResolver(msg.text || '');
|
||||
this.pendingQuestionResolver = null;
|
||||
const incomingConvKey = this.resolveConversationKey(msg.channel, msg.chatId);
|
||||
const pendingResolver = this.pendingQuestionResolvers.get(incomingConvKey);
|
||||
if (pendingResolver) {
|
||||
log.info(`Intercepted message as AskUserQuestion answer from ${msg.userId} (key=${incomingConvKey})`);
|
||||
pendingResolver(msg.text || '');
|
||||
this.pendingQuestionResolvers.delete(incomingConvKey);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1506,9 +1559,9 @@ export class LettaBot implements AgentSession {
|
||||
return;
|
||||
}
|
||||
|
||||
const convKey = this.resolveConversationKey(msg.channel);
|
||||
const convKey = this.resolveConversationKey(msg.channel, msg.chatId);
|
||||
if (convKey !== 'shared') {
|
||||
// Per-channel or override mode: messages on different keys can run in parallel.
|
||||
// Per-channel, per-chat, or override mode: messages on different keys can run in parallel.
|
||||
this.enqueueForKey(convKey, msg, adapter);
|
||||
} else {
|
||||
// Shared mode: single global queue (existing behavior)
|
||||
@@ -1654,9 +1707,11 @@ export class LettaBot implements AgentSession {
|
||||
log.info(`AskUserQuestion: sending ${questions.length} question(s) to ${msg.channel}:${msg.chatId}`);
|
||||
await adapter.sendMessage({ chatId: msg.chatId, text: questionText, threadId: msg.threadId });
|
||||
|
||||
// Wait for the user's next message (intercepted by handleMessage)
|
||||
// Wait for the user's next message (intercepted by handleMessage).
|
||||
// Key by convKey so each chat resolves independently in per-chat mode.
|
||||
const questionConvKey = this.resolveConversationKey(msg.channel, msg.chatId);
|
||||
const answer = await new Promise<string>((resolve) => {
|
||||
this.pendingQuestionResolver = resolve;
|
||||
this.pendingQuestionResolvers.set(questionConvKey, resolve);
|
||||
});
|
||||
log.info(`AskUserQuestion: received answer (${answer.length} chars)`);
|
||||
|
||||
@@ -1677,7 +1732,7 @@ export class LettaBot implements AgentSession {
|
||||
// Run session
|
||||
let session: Session | null = null;
|
||||
try {
|
||||
const convKey = this.resolveConversationKey(msg.channel);
|
||||
const convKey = this.resolveConversationKey(msg.channel, msg.chatId);
|
||||
const run = await this.runSession(messageToSend, { retried, canUseTool, convKey });
|
||||
lap('session send');
|
||||
session = run.session;
|
||||
@@ -1923,7 +1978,7 @@ export class LettaBot implements AgentSession {
|
||||
// the current buffer, but finalizeMessage() clears it on type changes.
|
||||
// sentAnyMessage is the authoritative "did we deliver output" flag.
|
||||
const nothingDelivered = !hasResponse && !sentAnyMessage;
|
||||
const retryConvKey = this.resolveConversationKey(msg.channel);
|
||||
const retryConvKey = this.resolveConversationKey(msg.channel, msg.chatId);
|
||||
const retryConvIdFromStore = (retryConvKey === 'shared'
|
||||
? this.store.conversationId
|
||||
: this.store.getConversationId(retryConvKey)) ?? undefined;
|
||||
|
||||
@@ -49,6 +49,31 @@ describe('resolveConversationKey', () => {
|
||||
it('returns "shared" when conversationMode is undefined', () => {
|
||||
expect(resolveConversationKey('telegram', undefined, new Set())).toBe('shared');
|
||||
});
|
||||
|
||||
// --- per-chat mode ---
|
||||
|
||||
it('returns channel:chatId in per-chat mode', () => {
|
||||
expect(resolveConversationKey('telegram', 'per-chat', new Set(), '12345')).toBe('telegram:12345');
|
||||
});
|
||||
|
||||
it('normalizes channel name in per-chat mode', () => {
|
||||
expect(resolveConversationKey('Telegram', 'per-chat', new Set(), '12345')).toBe('telegram:12345');
|
||||
});
|
||||
|
||||
it('falls back to shared in per-chat mode when chatId is missing', () => {
|
||||
expect(resolveConversationKey('telegram', 'per-chat', new Set())).toBe('shared');
|
||||
expect(resolveConversationKey('telegram', 'per-chat', new Set(), undefined)).toBe('shared');
|
||||
});
|
||||
|
||||
it('per-chat mode takes precedence over overrides', () => {
|
||||
const overrides = new Set(['telegram']);
|
||||
expect(resolveConversationKey('telegram', 'per-chat', overrides, '99')).toBe('telegram:99');
|
||||
});
|
||||
|
||||
it('chatId is ignored in non-per-chat modes', () => {
|
||||
expect(resolveConversationKey('telegram', 'shared', new Set(), '12345')).toBe('shared');
|
||||
expect(resolveConversationKey('telegram', 'per-channel', new Set(), '12345')).toBe('telegram');
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -105,4 +130,22 @@ describe('resolveHeartbeatConversationKey', () => {
|
||||
const overrides = new Set(['slack']);
|
||||
expect(resolveHeartbeatConversationKey('shared', 'dedicated', overrides, 'slack')).toBe('shared');
|
||||
});
|
||||
|
||||
// --- per-chat mode ---
|
||||
|
||||
it('returns channel:chatId in per-chat mode with last-active', () => {
|
||||
expect(resolveHeartbeatConversationKey('per-chat', 'last-active', new Set(), 'telegram', '12345')).toBe('telegram:12345');
|
||||
});
|
||||
|
||||
it('returns "heartbeat" in per-chat mode with dedicated', () => {
|
||||
expect(resolveHeartbeatConversationKey('per-chat', 'dedicated', new Set(), 'telegram', '12345')).toBe('heartbeat');
|
||||
});
|
||||
|
||||
it('falls back to shared in per-chat mode when chatId is missing', () => {
|
||||
expect(resolveHeartbeatConversationKey('per-chat', 'last-active', new Set(), 'telegram', undefined)).toBe('shared');
|
||||
});
|
||||
|
||||
it('falls back to "shared" in per-chat mode when no last-active target', () => {
|
||||
expect(resolveHeartbeatConversationKey('per-chat', 'last-active', new Set(), undefined, undefined)).toBe('shared');
|
||||
});
|
||||
});
|
||||
|
||||
@@ -330,6 +330,7 @@ export class Store {
|
||||
|
||||
/**
|
||||
* Set conversation ID for a specific key.
|
||||
* TODO: consider TTL-based cleanup for per-chat conversation entries
|
||||
*/
|
||||
setConversationId(key: string, id: string): void {
|
||||
const agent = this.agentData();
|
||||
|
||||
@@ -155,9 +155,10 @@ export interface BotConfig {
|
||||
sendFileCleanup?: boolean; // Allow <send-file cleanup="true"> to delete files after send (default: false)
|
||||
|
||||
// Conversation routing
|
||||
conversationMode?: 'shared' | 'per-channel'; // Default: shared
|
||||
conversationMode?: 'shared' | 'per-channel' | 'per-chat'; // Default: shared
|
||||
heartbeatConversation?: string; // "dedicated" | "last-active" | "<channel>" (default: last-active)
|
||||
conversationOverrides?: string[]; // Channels that always use their own conversation (shared mode)
|
||||
maxSessions?: number; // Max concurrent sessions in per-chat mode (default: 10, LRU eviction)
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -590,6 +590,7 @@ async function main() {
|
||||
conversationMode: agentConfig.conversations?.mode || 'shared',
|
||||
heartbeatConversation: agentConfig.conversations?.heartbeat || 'last-active',
|
||||
conversationOverrides: agentConfig.conversations?.perChannel,
|
||||
maxSessions: agentConfig.conversations?.maxSessions,
|
||||
skills: {
|
||||
cronEnabled: agentConfig.features?.cron ?? globalConfig.cronEnabled,
|
||||
googleEnabled: !!agentConfig.integrations?.google?.enabled || !!agentConfig.polling?.gmail?.enabled,
|
||||
|
||||
Reference in New Issue
Block a user