From 560380d721a56dc89c993b97eec73e98e416673b Mon Sep 17 00:00:00 2001 From: Cameron Date: Fri, 13 Feb 2026 17:21:38 -0800 Subject: [PATCH] feat: per-channel conversation routing (#299) --- src/channels/discord.ts | 11 +- src/channels/telegram.ts | 8 + src/config/types.ts | 11 ++ src/core/bot.ts | 342 +++++++++++++++++++++++++++++---------- src/core/store.test.ts | 73 +++++++++ src/core/store.ts | 40 +++++ src/core/types.ts | 7 +- src/main.ts | 2 + 8 files changed, 402 insertions(+), 92 deletions(-) diff --git a/src/channels/discord.ts b/src/channels/discord.ts index 641da7d..09b514c 100644 --- a/src/channels/discord.ts +++ b/src/channels/discord.ts @@ -249,15 +249,8 @@ Ask the bot owner to approve with: return; } if (this.onCommand) { - if (command === 'status') { - const result = await this.onCommand('status'); - if (result) { - await message.channel.send(result); - } - return; - } - if (command === 'heartbeat') { - const result = await this.onCommand('heartbeat'); + if (command === 'status' || command === 'reset' || command === 'heartbeat') { + const result = await this.onCommand(command); if (result) { await message.channel.send(result); } diff --git a/src/channels/telegram.ts b/src/channels/telegram.ts index 15df85f..8b0f89b 100644 --- a/src/channels/telegram.ts +++ b/src/channels/telegram.ts @@ -247,6 +247,14 @@ export class TelegramAdapter implements ChannelAdapter { await this.onCommand('heartbeat'); } }); + + // Handle /reset + this.bot.command('reset', async (ctx) => { + if (this.onCommand) { + const result = await this.onCommand('reset'); + await ctx.reply(result || 'Reset complete'); + } + }); // Handle text messages this.bot.on('message:text', async (ctx) => { diff --git a/src/config/types.ts b/src/config/types.ts index 9797efd..bc0f40e 100644 --- a/src/config/types.ts +++ b/src/config/types.ts @@ -47,6 +47,11 @@ export interface AgentConfig { signal?: SignalConfig; discord?: DiscordConfig; }; + /** Conversation routing */ + conversations?: { + mode?: 'shared' | 'per-channel'; // Default: shared (single conversation across all channels) + heartbeat?: string; // "dedicated" | "last-active" | "" (default: last-active) + }; /** Features for this agent */ features?: { cron?: boolean; @@ -112,6 +117,12 @@ export interface LettaBotConfig { discord?: DiscordConfig; }; + // Conversation routing + conversations?: { + mode?: 'shared' | 'per-channel'; // Default: shared (single conversation across all channels) + heartbeat?: string; // "dedicated" | "last-active" | "" (default: last-active) + }; + // Features features?: { cron?: boolean; diff --git a/src/core/bot.ts b/src/core/bot.ts index b2af729..0955937 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -130,13 +130,16 @@ export class LettaBot implements AgentSession { private groupIntervals: Map = new Map(); private instantGroupIds: Set = new Set(); private listeningGroupIds: Set = new Set(); - private processing = false; + private processing = false; // Global lock for shared mode + private processingKeys: Set = new Set(); // Per-key locks for per-channel mode // AskUserQuestion support: resolves when the next user message arrives private pendingQuestionResolver: ((text: string) => void) | null = null; - // Persistent session: reuse a single CLI subprocess across messages - private persistentSession: Session | null = null; + // Persistent sessions: reuse CLI subprocesses across messages. + // In shared mode, only the "shared" key is used. In per-channel mode, each + // channel (and optionally heartbeat) gets its own subprocess. + private sessions: Map = new Map(); private currentCanUseTool: CanUseToolCallback | undefined; // Stable callback wrapper so the Session options never change, but we can // swap out the per-message handler before each send(). @@ -307,21 +310,59 @@ export class LettaBot implements AgentSession { return acted; } + // ========================================================================= + // Conversation key resolution + // ========================================================================= + /** - * Return the persistent session, creating and initializing it if needed. - * The subprocess stays alive across messages -- only recreated on failure. + * Resolve the conversation key for a channel message. + * In shared mode returns "shared"; in per-channel mode returns the channel id. */ - private async ensureSession(): Promise { - if (this.persistentSession) { - return this.persistentSession; + private resolveConversationKey(channel: string): string { + return this.config.conversationMode === 'per-channel' ? channel : 'shared'; + } + + /** + * Resolve the conversation key for heartbeat/sendToAgent. + */ + private resolveHeartbeatConversationKey(): string { + if (this.config.conversationMode !== 'per-channel') return 'shared'; + + const hb = this.config.heartbeatConversation || 'last-active'; + if (hb === 'dedicated') return 'heartbeat'; + if (hb === 'last-active') { + // Use the last channel the user messaged on + const target = this.store.lastMessageTarget; + return target ? target.channel : 'shared'; } + // Explicit channel name (e.g., "telegram") + return hb; + } + + // ========================================================================= + // Session lifecycle (per-key) + // ========================================================================= + + /** + * Return the persistent session for the given conversation key, + * creating and initializing it if needed. + */ + private async ensureSessionForKey(key: string): Promise { + const existing = this.sessions.get(key); + if (existing) return existing; const opts = this.baseSessionOptions(this.sessionCanUseTool); let session: Session; - if (this.store.conversationId) { + // In per-channel mode, look up per-key conversation ID. + // In shared mode (key === "shared"), use the legacy single conversationId. + const convId = key === 'shared' + ? this.store.conversationId + : this.store.getConversationId(key); + + if (convId) { process.env.LETTA_AGENT_ID = this.store.agentId || undefined; - session = resumeSession(this.store.conversationId, opts); + session = resumeSession(convId, opts); } else if (this.store.agentId) { process.env.LETTA_AGENT_ID = this.store.agentId; session = createSession(this.store.agentId, opts); @@ -345,21 +386,36 @@ export class LettaBot implements AgentSession { } // Initialize eagerly so the subprocess is ready before the first send() - console.log('[Bot] Initializing session subprocess...'); + console.log(`[Bot] Initializing session subprocess (key=${key})...`); await session.initialize(); - console.log('[Bot] Session subprocess ready'); - this.persistentSession = session; + console.log(`[Bot] Session subprocess ready (key=${key})`); + this.sessions.set(key, session); return session; } + /** Legacy convenience: resolve key from shared/per-channel mode and delegate. */ + private async ensureSession(): Promise { + return this.ensureSessionForKey('shared'); + } + /** - * Destroy the persistent session so the next ensureSession() spawns a fresh one. + * Destroy session(s). If key provided, destroys only that key. + * If key is undefined, destroys ALL sessions. */ - private invalidateSession(): void { - if (this.persistentSession) { - console.log('[Bot] Invalidating persistent session'); - this.persistentSession.close(); - this.persistentSession = null; + private invalidateSession(key?: string): void { + if (key) { + const session = this.sessions.get(key); + if (session) { + console.log(`[Bot] Invalidating session (key=${key})`); + session.close(); + this.sessions.delete(key); + } + } else { + for (const [k, session] of this.sessions) { + console.log(`[Bot] Invalidating session (key=${k})`); + session.close(); + } + this.sessions.clear(); } } @@ -369,7 +425,11 @@ export class LettaBot implements AgentSession { async warmSession(): Promise { if (!this.store.agentId && !this.store.conversationId) return; try { - await this.ensureSession(); + // In shared mode, warm the single session. In per-channel mode, warm nothing + // (sessions are created on first message per channel). + if (this.config.conversationMode !== 'per-channel') { + await this.ensureSessionForKey('shared'); + } } catch (err) { console.warn('[Bot] Session pre-warm failed:', err instanceof Error ? err.message : err); } @@ -377,18 +437,27 @@ export class LettaBot implements AgentSession { /** * Persist conversation ID after a successful session result. - * Agent ID and first-run setup are handled eagerly in ensureSession(). + * Agent ID and first-run setup are handled eagerly in ensureSessionForKey(). */ - private persistSessionState(session: Session): void { - // Agent ID already persisted in ensureSession() on creation. + private persistSessionState(session: Session, convKey?: string): void { + // Agent ID already persisted in ensureSessionForKey() on creation. // Here we only update if the server returned a different one (shouldn't happen). if (session.agentId && session.agentId !== this.store.agentId) { const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com'; this.store.setAgent(session.agentId, currentBaseUrl, session.conversationId || undefined); console.log('[Bot] Agent ID updated:', session.agentId); - } else if (session.conversationId && session.conversationId !== this.store.conversationId) { - this.store.conversationId = session.conversationId; - console.log('[Bot] Conversation ID updated:', session.conversationId); + } else if (session.conversationId) { + // In per-channel mode, persist per-key. In shared mode, use legacy field. + if (convKey && convKey !== 'shared') { + const existing = this.store.getConversationId(convKey); + if (session.conversationId !== existing) { + this.store.setConversationId(convKey, session.conversationId); + console.log(`[Bot] Conversation ID updated (key=${convKey}):`, session.conversationId); + } + } else if (session.conversationId !== this.store.conversationId) { + this.store.conversationId = session.conversationId; + console.log('[Bot] Conversation ID updated:', session.conversationId); + } } } @@ -404,30 +473,35 @@ export class LettaBot implements AgentSession { */ private async runSession( message: SendMessage, - options: { retried?: boolean; canUseTool?: CanUseToolCallback } = {}, + options: { retried?: boolean; canUseTool?: CanUseToolCallback; convKey?: string } = {}, ): Promise<{ session: Session; stream: () => AsyncGenerator }> { - const { retried = false, canUseTool } = options; + const { retried = false, canUseTool, convKey = 'shared' } = options; // Update the per-message callback before sending this.currentCanUseTool = canUseTool; - let session = await this.ensureSession(); + let session = await this.ensureSessionForKey(convKey); + + // Resolve the conversation ID for this key (for error recovery) + const convId = convKey === 'shared' + ? this.store.conversationId + : this.store.getConversationId(convKey); // Send message with fallback chain try { await session.send(message); } catch (error) { // 409 CONFLICT from orphaned approval - if (!retried && isApprovalConflictError(error) && this.store.agentId && this.store.conversationId) { + if (!retried && isApprovalConflictError(error) && this.store.agentId && convId) { console.log('[Bot] CONFLICT detected - attempting orphaned approval recovery...'); - this.invalidateSession(); + this.invalidateSession(convKey); const result = await recoverOrphanedConversationApproval( this.store.agentId, - this.store.conversationId + convId ); if (result.recovered) { console.log(`[Bot] Recovery succeeded (${result.details}), retrying...`); - return this.runSession(message, { retried: true, canUseTool }); + return this.runSession(message, { retried: true, canUseTool, convKey }); } console.error(`[Bot] Orphaned approval recovery failed: ${result.details}`); throw error; @@ -437,28 +511,29 @@ export class LettaBot implements AgentSession { // Only retry on errors that indicate missing conversation/agent, not // on auth, network, or protocol errors (which would just fail again). if (this.store.agentId && isConversationMissingError(error)) { - console.warn('[Bot] Conversation not found, creating a new conversation...'); - this.invalidateSession(); - session = await this.ensureSession(); + console.warn(`[Bot] Conversation not found (key=${convKey}), creating a new conversation...`); + this.invalidateSession(convKey); + if (convKey !== 'shared') { + this.store.clearConversation(convKey); + } else { + this.store.conversationId = null; + } + session = await this.ensureSessionForKey(convKey); await session.send(message); } else { // Unknown error -- invalidate so we get a fresh subprocess next time - this.invalidateSession(); + this.invalidateSession(convKey); throw error; } } // Persist conversation ID immediately after successful send, before streaming. - // If streaming disconnects/aborts before result, the next turn will still - // resume the correct conversation instead of forking a new one. - if (session.conversationId && session.conversationId !== this.store.conversationId) { - this.store.conversationId = session.conversationId; - console.log('[Bot] Saved conversation ID:', session.conversationId); - } + this.persistSessionState(session, convKey); // Return session and a deduplicated stream generator const seenToolCallIds = new Set(); const self = this; + const capturedConvKey = convKey; // Capture for closure async function* dedupedStream(): AsyncGenerator { for await (const raw of session.stream()) { @@ -475,7 +550,7 @@ export class LettaBot implements AgentSession { // Persist state on result if (msg.type === 'result') { - self.persistSessionState(session); + self.persistSessionState(session, capturedConvKey); break; } } @@ -490,7 +565,7 @@ export class LettaBot implements AgentSession { registerChannel(adapter: ChannelAdapter): void { adapter.onMessage = (msg) => this.handleMessage(msg, adapter); - adapter.onCommand = (cmd) => this.handleCommand(cmd); + adapter.onCommand = (cmd) => this.handleCommand(cmd, adapter.id); this.channels.set(adapter.id, adapter); console.log(`Registered channel: ${adapter.name}`); } @@ -523,9 +598,14 @@ export class LettaBot implements AgentSession { } } - this.messageQueue.push({ msg: effective, adapter }); - if (!this.processing) { - this.processQueue().catch(err => console.error('[Queue] Fatal error in processQueue:', err)); + if (this.config.conversationMode === 'per-channel') { + const convKey = this.resolveConversationKey(effective.channel); + this.enqueueForKey(convKey, effective, adapter); + } else { + this.messageQueue.push({ msg: effective, adapter }); + if (!this.processing) { + this.processQueue().catch(err => console.error('[Queue] Fatal error in processQueue:', err)); + } } } @@ -533,7 +613,7 @@ export class LettaBot implements AgentSession { // Commands // ========================================================================= - private async handleCommand(command: string): Promise { + private async handleCommand(command: string, channelId?: string): Promise { console.log(`[Command] Received: /${command}`); switch (command) { case 'status': { @@ -557,12 +637,35 @@ export class LettaBot implements AgentSession { return '⏰ Heartbeat triggered (silent mode - check server logs)'; } case 'reset': { - const oldConversationId = this.store.conversationId; - this.store.conversationId = null; + const convKey = channelId ? this.resolveConversationKey(channelId) : undefined; + if (convKey && convKey !== 'shared') { + // Per-channel mode: only clear the conversation for this channel + this.store.clearConversation(convKey); + this.invalidateSession(convKey); + console.log(`[Command] /reset - conversation cleared for ${convKey}`); + // Eagerly create the new session so we can report the conversation ID + try { + const session = await this.ensureSessionForKey(convKey); + const newConvId = session.conversationId || '(pending)'; + this.persistSessionState(session, convKey); + return `Conversation reset for this channel. New conversation: ${newConvId}\nOther channels are unaffected. (Agent memory is preserved.)`; + } catch { + return `Conversation reset for this channel. Other channels are unaffected. (Agent memory is preserved.)`; + } + } + // Shared mode or no channel context: clear everything + this.store.clearConversation(); this.store.resetRecoveryAttempts(); - this.invalidateSession(); // Subprocess has old conversation baked in - console.log(`[Command] /reset - conversation cleared (was: ${oldConversationId})`); - return 'Conversation reset. Send a message to start a new conversation. (Agent memory is preserved.)'; + this.invalidateSession(); + console.log('[Command] /reset - all conversations cleared'); + try { + const session = await this.ensureSessionForKey('shared'); + const newConvId = session.conversationId || '(pending)'; + this.persistSessionState(session, 'shared'); + return `Conversation reset. New conversation: ${newConvId}\n(Agent memory is preserved.)`; + } catch { + return 'Conversation reset. Send a message to start a new conversation. (Agent memory is preserved.)'; + } } default: return null; @@ -690,12 +793,59 @@ export class LettaBot implements AgentSession { return; } - this.messageQueue.push({ msg, adapter }); - if (!this.processing) { - this.processQueue().catch(err => console.error('[Queue] Fatal error in processQueue:', err)); + if (this.config.conversationMode === 'per-channel') { + // Per-channel mode: messages on different channels can run in parallel. + // Only serialize within the same conversation key. + const convKey = this.resolveConversationKey(msg.channel); + this.enqueueForKey(convKey, msg, adapter); + } else { + // Shared mode: single global queue (existing behavior) + this.messageQueue.push({ msg, adapter }); + if (!this.processing) { + this.processQueue().catch(err => console.error('[Queue] Fatal error in processQueue:', err)); + } } } - + + /** + * Enqueue a message for a specific conversation key. + * Messages with the same key are serialized; different keys run in parallel. + */ + private keyedQueues: Map> = new Map(); + + private enqueueForKey(key: string, msg: InboundMessage, adapter: ChannelAdapter): void { + let queue = this.keyedQueues.get(key); + if (!queue) { + queue = []; + this.keyedQueues.set(key, queue); + } + queue.push({ msg, adapter }); + + if (!this.processingKeys.has(key)) { + this.processKeyedQueue(key).catch(err => + console.error(`[Queue] Fatal error in processKeyedQueue(${key}):`, err) + ); + } + } + + private async processKeyedQueue(key: string): Promise { + if (this.processingKeys.has(key)) return; + this.processingKeys.add(key); + + const queue = this.keyedQueues.get(key); + while (queue && queue.length > 0) { + const { msg, adapter } = queue.shift()!; + try { + await this.processMessage(msg, adapter); + } catch (error) { + console.error(`[Queue] Error processing message (key=${key}):`, error); + } + } + + this.processingKeys.delete(key); + this.keyedQueues.delete(key); + } + private async processQueue(): Promise { if (this.processing || this.messageQueue.length === 0) return; @@ -815,7 +965,8 @@ export class LettaBot implements AgentSession { // Run session let session: Session | null = null; try { - const run = await this.runSession(messageToSend, { retried, canUseTool }); + const convKey = this.resolveConversationKey(msg.channel); + const run = await this.runSession(messageToSend, { retried, canUseTool, convKey }); lap('session send'); session = run.session; @@ -992,15 +1143,19 @@ export class LettaBot implements AgentSession { console.error(`[Bot] Warning: Agent returned terminal error (error=${streamMsg.error}, stopReason=${streamMsg.stopReason || 'N/A'}) with no response.`); } - if (!retried && this.store.agentId && this.store.conversationId) { + const retryConvKey = this.resolveConversationKey(msg.channel); + const retryConvId = retryConvKey === 'shared' + ? this.store.conversationId + : this.store.getConversationId(retryConvKey); + if (!retried && this.store.agentId && retryConvId) { const reason = shouldRetryForErrorResult ? 'error result' : 'empty result'; console.log(`[Bot] ${reason} - attempting orphaned approval recovery...`); - this.invalidateSession(); + this.invalidateSession(retryConvKey); session = null; clearInterval(typingInterval); const convResult = await recoverOrphanedConversationApproval( this.store.agentId, - this.store.conversationId + retryConvId ); if (convResult.recovered) { console.log(`[Bot] Recovery succeeded (${convResult.details}), retrying message...`); @@ -1127,19 +1282,48 @@ export class LettaBot implements AgentSession { // sendToAgent - Background triggers (heartbeats, cron, webhooks) // ========================================================================= + /** + * Acquire the appropriate lock for a conversation key. + * In per-channel mode with a dedicated key, no lock needed (parallel OK). + * In per-channel mode with a channel key, wait for that key's queue. + * In shared mode, use the global processing flag. + */ + private async acquireLock(convKey: string): Promise { + if (convKey === 'heartbeat') return false; // No lock needed + + if (this.config.conversationMode === 'per-channel') { + while (this.processingKeys.has(convKey)) { + await new Promise(resolve => setTimeout(resolve, 1000)); + } + this.processingKeys.add(convKey); + } else { + while (this.processing) { + await new Promise(resolve => setTimeout(resolve, 1000)); + } + this.processing = true; + } + return true; + } + + private releaseLock(convKey: string, acquired: boolean): void { + if (!acquired) return; + if (this.config.conversationMode === 'per-channel') { + this.processingKeys.delete(convKey); + } else { + this.processing = false; + this.processQueue(); + } + } + async sendToAgent( text: string, _context?: TriggerContext ): Promise { - // Serialize with message queue to prevent 409 conflicts - while (this.processing) { - await new Promise(resolve => setTimeout(resolve, 1000)); - } - - this.processing = true; + const convKey = this.resolveHeartbeatConversationKey(); + const acquired = await this.acquireLock(convKey); try { - const { stream } = await this.runSession(text); + const { stream } = await this.runSession(text, { convKey }); try { let response = ''; @@ -1162,12 +1346,11 @@ export class LettaBot implements AgentSession { return response; } catch (error) { // Invalidate on stream errors so next call gets a fresh subprocess - this.invalidateSession(); + this.invalidateSession(convKey); throw error; } } finally { - this.processing = false; - this.processQueue(); + this.releaseLock(convKey, acquired); } } @@ -1179,25 +1362,20 @@ export class LettaBot implements AgentSession { text: string, _context?: TriggerContext ): AsyncGenerator { - // Serialize with message queue to prevent 409 conflicts - while (this.processing) { - await new Promise(resolve => setTimeout(resolve, 1000)); - } - - this.processing = true; + const convKey = this.resolveHeartbeatConversationKey(); + const acquired = await this.acquireLock(convKey); try { - const { stream } = await this.runSession(text); + const { stream } = await this.runSession(text, { convKey }); try { yield* stream(); } catch (error) { - this.invalidateSession(); + this.invalidateSession(convKey); throw error; } } finally { - this.processing = false; - this.processQueue(); + this.releaseLock(convKey, acquired); } } diff --git a/src/core/store.test.ts b/src/core/store.test.ts index 3466512..18c226d 100644 --- a/src/core/store.test.ts +++ b/src/core/store.test.ts @@ -225,4 +225,77 @@ describe('Store', () => { expect(defaultStore.agentId).toBe('global-agent'); expect(namedStore.agentId).toBeNull(); }); + + // Per-key conversation management + + it('should get/set per-key conversation IDs', () => { + const store = new Store(testStorePath, 'TestBot'); + + // Initially null for all keys + expect(store.getConversationId('telegram')).toBeNull(); + expect(store.getConversationId('slack')).toBeNull(); + + // Set per-key + store.setConversationId('telegram', 'conv-tg-1'); + store.setConversationId('slack', 'conv-slack-1'); + + expect(store.getConversationId('telegram')).toBe('conv-tg-1'); + expect(store.getConversationId('slack')).toBe('conv-slack-1'); + + // Legacy field is separate + expect(store.conversationId).toBeNull(); + }); + + it('should fall back to legacy conversationId when key is undefined', () => { + const store = new Store(testStorePath, 'TestBot'); + store.conversationId = 'conv-shared'; + + expect(store.getConversationId()).toBe('conv-shared'); + expect(store.getConversationId(undefined)).toBe('conv-shared'); + }); + + it('should clear a specific conversation key', () => { + const store = new Store(testStorePath, 'TestBot'); + + store.setConversationId('telegram', 'conv-tg'); + store.setConversationId('slack', 'conv-slack'); + store.clearConversation('telegram'); + + expect(store.getConversationId('telegram')).toBeNull(); + expect(store.getConversationId('slack')).toBe('conv-slack'); + }); + + it('should clear all conversations when key is undefined', () => { + const store = new Store(testStorePath, 'TestBot'); + + store.conversationId = 'conv-shared'; + store.setConversationId('telegram', 'conv-tg'); + store.setConversationId('discord', 'conv-dc'); + store.clearConversation(); + + expect(store.conversationId).toBeNull(); + expect(store.getConversationId('telegram')).toBeNull(); + expect(store.getConversationId('discord')).toBeNull(); + }); + + it('should persist per-key conversations across reloads', () => { + const store1 = new Store(testStorePath, 'TestBot'); + store1.setConversationId('telegram', 'conv-tg-persist'); + store1.setConversationId('heartbeat', 'conv-hb-persist'); + + const store2 = new Store(testStorePath, 'TestBot'); + expect(store2.getConversationId('telegram')).toBe('conv-tg-persist'); + expect(store2.getConversationId('heartbeat')).toBe('conv-hb-persist'); + }); + + it('should isolate per-key conversations across agents', () => { + const store1 = new Store(testStorePath, 'Bot1'); + const store2 = new Store(testStorePath, 'Bot2'); + + store1.setConversationId('telegram', 'conv-bot1-tg'); + store2.setConversationId('telegram', 'conv-bot2-tg'); + + expect(store1.getConversationId('telegram')).toBe('conv-bot1-tg'); + expect(store2.getConversationId('telegram')).toBe('conv-bot2-tg'); + }); }); diff --git a/src/core/store.ts b/src/core/store.ts index 329c197..9246244 100644 --- a/src/core/store.ts +++ b/src/core/store.ts @@ -126,6 +126,46 @@ export class Store { this.agentData().conversationId = id; this.save(); } + + // Per-key conversation management (for per-channel mode) + + /** + * Get conversation ID for a specific key (channel name, "heartbeat", etc.). + * Falls back to the legacy single conversationId when key is undefined. + */ + getConversationId(key?: string): string | null { + if (!key) return this.conversationId; + return this.agentData().conversations?.[key] || null; + } + + /** + * Set conversation ID for a specific key. + */ + setConversationId(key: string, id: string): void { + const agent = this.agentData(); + if (!agent.conversations) { + agent.conversations = {}; + } + agent.conversations[key] = id; + this.save(); + } + + /** + * Clear conversation(s). If key is provided, clears only that key. + * If key is undefined, clears the legacy conversationId AND all per-key conversations. + */ + clearConversation(key?: string): void { + const agent = this.agentData(); + if (key) { + if (agent.conversations) { + delete agent.conversations[key]; + } + } else { + agent.conversationId = null; + agent.conversations = undefined; + } + this.save(); + } get baseUrl(): string | undefined { return this.agentData().baseUrl; diff --git a/src/core/types.ts b/src/core/types.ts index 2d1e9a2..b40c818 100644 --- a/src/core/types.ts +++ b/src/core/types.ts @@ -137,6 +137,10 @@ export interface BotConfig { // Security allowedUsers?: string[]; // Empty = allow all + + // Conversation routing + conversationMode?: 'shared' | 'per-channel'; // Default: shared + heartbeatConversation?: string; // "dedicated" | "last-active" | "" (default: last-active) } /** @@ -154,7 +158,8 @@ export interface LastMessageTarget { */ export interface AgentStore { agentId: string | null; - conversationId?: string | null; // Current conversation ID + conversationId?: string | null; // Current conversation ID (used in shared mode) + conversations?: Record; // Per-key conversation IDs (used in per-channel mode) baseUrl?: string; // Server URL this agent belongs to createdAt?: string; lastUsedAt?: string; diff --git a/src/main.ts b/src/main.ts index 0ca9806..b174273 100644 --- a/src/main.ts +++ b/src/main.ts @@ -527,6 +527,8 @@ async function main() { disallowedTools: globalConfig.disallowedTools, displayName: agentConfig.displayName, maxToolCalls: agentConfig.features?.maxToolCalls, + conversationMode: agentConfig.conversations?.mode || 'shared', + heartbeatConversation: agentConfig.conversations?.heartbeat || 'last-active', skills: { cronEnabled: agentConfig.features?.cron ?? globalConfig.cronEnabled, googleEnabled: !!agentConfig.integrations?.google?.enabled || !!agentConfig.polling?.gmail?.enabled,