diff --git a/src/config/normalize.ts b/src/config/normalize.ts new file mode 100644 index 0000000..3f9a340 --- /dev/null +++ b/src/config/normalize.ts @@ -0,0 +1,246 @@ +/** + * Config Normalization + * + * Converts legacy single-agent configs to multi-agent format + * and ensures all required fields are present. + */ + +import { homedir } from 'node:os'; +import { join } from 'node:path'; +import type { + AgentBinding, + AgentConfig, + LettaBotConfig, + NormalizedConfig, +} from './types.js'; + +const DEFAULT_WORKSPACE = join(homedir(), '.lettabot', 'workspace'); +const DEFAULT_MODEL = 'zai/glm-4.7'; + +/** + * Normalize config to multi-agent format + * + * This function: + * 1. Converts legacy single-agent `agent` field to `agents.list[]` + * 2. Ensures defaults are populated + * 3. Creates implicit bindings for single-account channels + */ +export function normalizeConfig(config: LettaBotConfig): NormalizedConfig { + // Check if already using multi-agent format + const hasMultiAgent = config.agents?.list && config.agents.list.length > 0; + + let agents: NormalizedConfig['agents']; + let bindings: AgentBinding[]; + + if (hasMultiAgent) { + // Multi-agent mode: use as-is with defaults filled in + const defaultModel = config.agents?.defaults?.model || config.agent?.model || DEFAULT_MODEL; + + agents = { + defaults: { + model: defaultModel, + ...config.agents?.defaults, + }, + list: config.agents!.list!.map(agent => ({ + ...agent, + workspace: resolveWorkspace(agent.workspace, agent.id), + model: agent.model || defaultModel, + })), + }; + + // Ensure at least one default agent + if (!agents.list.some(a => a.default)) { + agents.list[0].default = true; + } + + bindings = config.bindings || []; + } else { + // Legacy single-agent mode: convert to multi-agent + const legacyAgent = config.agent; + const agentId = legacyAgent?.id || 'main'; + const agentName = legacyAgent?.name || 'LettaBot'; + const agentModel = legacyAgent?.model || DEFAULT_MODEL; + + agents = { + defaults: { + model: agentModel, + }, + list: [{ + id: agentId, + name: agentName, + default: true, + workspace: DEFAULT_WORKSPACE, + model: agentModel, + }], + }; + + // No explicit bindings in legacy mode - default agent handles all + bindings = []; + } + + // Create implicit bindings for channels without explicit bindings + bindings = addImplicitBindings(config, agents.list, bindings); + + // Build normalized config (omit legacy `agent` field) + const { agent: _legacyAgent, ...rest } = config; + + return { + ...rest, + agents, + bindings, + }; +} + +/** + * Resolve workspace path, expanding ~ and ensuring absolute path + */ +function resolveWorkspace(workspace: string, agentId: string): string { + if (!workspace) { + return join(homedir(), '.lettabot', `workspace-${agentId}`); + } + + // Expand ~ to home directory + if (workspace.startsWith('~')) { + return workspace.replace('~', homedir()); + } + + return workspace; +} + +/** + * Add implicit bindings for single-account channels + * + * When a channel has no explicit bindings and uses single-account mode, + * we implicitly route it to the default agent. + */ +function addImplicitBindings( + config: LettaBotConfig, + agentsList: AgentConfig[], + existingBindings: AgentBinding[] +): AgentBinding[] { + const bindings = [...existingBindings]; + const defaultAgent = agentsList.find(a => a.default) || agentsList[0]; + + // Helper to check if a channel/account combo already has a binding + const hasBinding = (channel: string, accountId?: string): boolean => { + return bindings.some(b => { + if (b.match.channel !== channel) return false; + // If checking specific account, must match + if (accountId && b.match.accountId && b.match.accountId !== accountId) return false; + // If no specific account in binding, it's a catch-all for the channel + if (!b.match.accountId && !b.match.peer) return true; + return b.match.accountId === accountId; + }); + }; + + // Process each channel type + const channelTypes = ['telegram', 'slack', 'discord', 'whatsapp', 'signal'] as const; + + for (const channelType of channelTypes) { + const channelConfig = config.channels[channelType]; + if (!channelConfig?.enabled) continue; + + // Check for multi-account mode + const accounts = (channelConfig as any).accounts as Record | undefined; + + if (accounts && Object.keys(accounts).length > 0) { + // Multi-account: add binding for each account without explicit binding + for (const accountId of Object.keys(accounts)) { + if (!hasBinding(channelType, accountId)) { + bindings.push({ + agentId: defaultAgent.id, + match: { channel: channelType, accountId }, + }); + } + } + } else { + // Single account: add binding if no channel-wide binding exists + if (!hasBinding(channelType)) { + bindings.push({ + agentId: defaultAgent.id, + match: { channel: channelType }, + }); + } + } + } + + return bindings; +} + +/** + * Get list of enabled channel account pairs from config + */ +export function getEnabledChannelAccounts(config: LettaBotConfig | NormalizedConfig): Array<{ + channel: string; + accountId: string; +}> { + const result: Array<{ channel: string; accountId: string }> = []; + + const channelTypes = ['telegram', 'slack', 'discord', 'whatsapp', 'signal'] as const; + + for (const channelType of channelTypes) { + const channelConfig = config.channels[channelType]; + if (!channelConfig?.enabled) continue; + + const accounts = (channelConfig as any).accounts as Record | undefined; + + if (accounts && Object.keys(accounts).length > 0) { + for (const accountId of Object.keys(accounts)) { + result.push({ channel: channelType, accountId }); + } + } else { + // Single account mode uses 'default' as accountId + result.push({ channel: channelType, accountId: 'default' }); + } + } + + return result; +} + +/** + * Validate config and return errors + */ +export function validateConfig(config: LettaBotConfig): string[] { + const errors: string[] = []; + + // Check for multi-agent mode + if (config.agents?.list && config.agents.list.length > 0) { + // Validate agent IDs are unique + const ids = new Set(); + for (const agent of config.agents.list) { + if (!agent.id) { + errors.push('Agent config missing required "id" field'); + } else if (ids.has(agent.id)) { + errors.push(`Duplicate agent ID: ${agent.id}`); + } else { + ids.add(agent.id); + } + + if (!agent.workspace) { + errors.push(`Agent "${agent.id}" missing required "workspace" field`); + } + } + + // Validate bindings reference valid agents + if (config.bindings) { + for (const binding of config.bindings) { + if (!ids.has(binding.agentId)) { + errors.push(`Binding references unknown agent: ${binding.agentId}`); + } + if (!binding.match?.channel) { + errors.push(`Binding for agent "${binding.agentId}" missing required "match.channel" field`); + } + } + } + } + + // Validate at least one channel is enabled (or will be via multi-agent) + const hasChannel = Object.values(config.channels || {}).some( + ch => ch && typeof ch === 'object' && (ch as any).enabled + ); + if (!hasChannel) { + errors.push('No channels enabled. Enable at least one channel (telegram, slack, discord, whatsapp, or signal)'); + } + + return errors; +} diff --git a/src/core/agent-instance.ts b/src/core/agent-instance.ts new file mode 100644 index 0000000..625d30e --- /dev/null +++ b/src/core/agent-instance.ts @@ -0,0 +1,483 @@ +/** + * Agent Instance + * + * Handles a single agent in multi-agent mode. + * Each instance has its own workspace, store, and Letta agent. + */ + +import { createAgent, createSession, resumeSession, type Session } from '@letta-ai/letta-code-sdk'; +import { existsSync, mkdirSync, readFileSync, writeFileSync } from 'node:fs'; +import { homedir } from 'node:os'; +import { join, resolve } from 'node:path'; +import type { ChannelAdapter } from '../channels/types.js'; +import type { InboundMessage, TriggerContext } from './types.js'; +import { updateAgentName } from '../tools/letta-api.js'; +import { formatMessageEnvelope } from './formatter.js'; +import { loadMemoryBlocks } from './memory.js'; +import { SYSTEM_PROMPT } from './system-prompt.js'; + +/** + * Agent instance configuration + */ +export interface AgentInstanceConfig { + /** Config ID (e.g., "home", "work") - used for store path */ + configId: string; + /** Display name */ + name: string; + /** Working directory for this agent */ + workspace: string; + /** Model to use */ + model?: string; + /** Allowed tools */ + allowedTools?: string[]; +} + +/** + * Agent state persisted to disk + */ +interface AgentState { + /** Letta Cloud agent ID */ + agentId: string | null; + /** Current conversation ID */ + conversationId?: string | null; + /** Server URL this agent belongs to */ + baseUrl?: string; + /** When the agent was created */ + createdAt?: string; + /** When the agent was last used */ + lastUsedAt?: string; + /** Last message target for heartbeats */ + lastMessageTarget?: { + channel: string; + chatId: string; + messageId?: string; + updatedAt: string; + }; +} + +const DEFAULT_ALLOWED_TOOLS = [ + 'Bash', 'Read', 'Edit', 'Write', 'Glob', 'Grep', 'Task', + 'web_search', 'conversation_search', +]; + +/** + * Single agent instance + */ +export class AgentInstance { + readonly configId: string; + readonly name: string; + readonly workspace: string; + readonly model: string | undefined; + + private state: AgentState; + private statePath: string; + private allowedTools: string[]; + private processing = false; + private messageQueue: Array<{ + msg: InboundMessage; + adapter: ChannelAdapter; + resolve: (value: void) => void; + reject: (error: Error) => void; + }> = []; + + constructor(config: AgentInstanceConfig) { + this.configId = config.configId; + this.name = config.name; + this.workspace = this.resolveWorkspace(config.workspace); + this.model = config.model; + this.allowedTools = config.allowedTools || DEFAULT_ALLOWED_TOOLS; + + // State stored in ~/.lettabot/agents/{configId}/state.json + const stateDir = join(homedir(), '.lettabot', 'agents', config.configId); + this.statePath = join(stateDir, 'state.json'); + + // Ensure directories exist + mkdirSync(stateDir, { recursive: true }); + mkdirSync(this.workspace, { recursive: true }); + + // Load existing state + this.state = this.loadState(); + + console.log(`[Agent:${this.configId}] Initialized. Letta ID: ${this.state.agentId || '(new)'}`); + } + + /** + * Get the Letta Cloud agent ID + */ + get agentId(): string | null { + return this.state.agentId; + } + + /** + * Get the current conversation ID + */ + get conversationId(): string | null { + return this.state.conversationId || null; + } + + /** + * Get last message target for heartbeats + */ + get lastMessageTarget(): AgentState['lastMessageTarget'] | null { + return this.state.lastMessageTarget || null; + } + + /** + * Process an incoming message + */ + async processMessage(msg: InboundMessage, adapter: ChannelAdapter): Promise { + return new Promise((resolve, reject) => { + this.messageQueue.push({ msg, adapter, resolve, reject }); + if (!this.processing) { + this.processQueue(); + } + }); + } + + /** + * Send a message to the agent (for cron, heartbeat, etc.) + */ + async sendToAgent(text: string, _context?: TriggerContext): Promise { + const baseOptions = { + permissionMode: 'bypassPermissions' as const, + allowedTools: this.allowedTools, + cwd: this.workspace, + }; + + let session: Session; + let usedDefaultConversation = false; + let usedSpecificConversation = false; + + if (this.state.conversationId) { + usedSpecificConversation = true; + session = resumeSession(this.state.conversationId, baseOptions); + } else if (this.state.agentId) { + usedDefaultConversation = true; + session = resumeSession(this.state.agentId, baseOptions); + } else { + const newAgentId = await createAgent({ + ...baseOptions, + model: this.model, + memory: loadMemoryBlocks(this.name), + systemPrompt: SYSTEM_PROMPT, + memfs: true, + }); + session = resumeSession(newAgentId, baseOptions); + } + + try { + try { + await session.send(text); + } catch (error) { + if (usedSpecificConversation && this.state.agentId) { + console.warn(`[Agent:${this.configId}] Conversation missing, creating new...`); + session.close(); + session = createSession(this.state.agentId, baseOptions); + await session.send(text); + } else if (usedDefaultConversation && this.state.agentId) { + console.warn(`[Agent:${this.configId}] Default conversation missing, creating new...`); + session.close(); + session = createSession(this.state.agentId, baseOptions); + await session.send(text); + } else { + throw error; + } + } + + let response = ''; + for await (const streamMsg of session.stream()) { + if (streamMsg.type === 'assistant') { + response += streamMsg.content; + } + if (streamMsg.type === 'result') { + this.handleSessionResult(session); + break; + } + } + + return response; + } finally { + session.close(); + } + } + + /** + * Reset the agent (clear stored state) + */ + reset(): void { + this.state = { agentId: null }; + this.saveState(); + console.log(`[Agent:${this.configId}] Reset`); + } + + /** + * Set the agent ID (for container deploys that discover existing agents) + */ + setAgentId(agentId: string): void { + this.state.agentId = agentId; + this.saveState(); + console.log(`[Agent:${this.configId}] Agent ID set to: ${agentId}`); + } + + /** + * Process message queue sequentially + */ + private async processQueue(): Promise { + if (this.processing || this.messageQueue.length === 0) return; + + this.processing = true; + + while (this.messageQueue.length > 0) { + const { msg, adapter, resolve, reject } = this.messageQueue.shift()!; + try { + await this.handleMessage(msg, adapter); + resolve(); + } catch (error) { + reject(error instanceof Error ? error : new Error(String(error))); + } + } + + this.processing = false; + } + + /** + * Handle a single message + */ + private async handleMessage(msg: InboundMessage, adapter: ChannelAdapter): Promise { + console.log(`[Agent:${this.configId}] Message from ${msg.userId}: ${msg.text.slice(0, 50)}...`); + + // Track last message target for heartbeats + this.state.lastMessageTarget = { + channel: msg.channel, + chatId: msg.chatId, + messageId: msg.messageId, + updatedAt: new Date().toISOString(), + }; + this.saveState(); + + // Start typing indicator + await adapter.sendTypingIndicator(msg.chatId); + + const baseOptions = { + permissionMode: 'bypassPermissions' as const, + allowedTools: this.allowedTools, + cwd: this.workspace, + }; + + let session: Session; + let usedDefaultConversation = false; + let usedSpecificConversation = false; + + if (this.state.conversationId) { + usedSpecificConversation = true; + process.env.LETTA_AGENT_ID = this.state.agentId || undefined; + session = resumeSession(this.state.conversationId, baseOptions); + } else if (this.state.agentId) { + usedDefaultConversation = true; + process.env.LETTA_AGENT_ID = this.state.agentId; + session = resumeSession(this.state.agentId, baseOptions); + } else { + const newAgentId = await createAgent({ + ...baseOptions, + model: this.model, + memory: loadMemoryBlocks(this.name), + systemPrompt: SYSTEM_PROMPT, + memfs: true, + }); + session = resumeSession(newAgentId, baseOptions); + } + + try { + // Initialize session with timeout + const initTimeoutMs = 30000; + let initInfo; + + try { + initInfo = await this.withTimeout(session.initialize(), 'Session initialize', initTimeoutMs); + } catch (error) { + if (usedSpecificConversation && this.state.agentId) { + console.warn(`[Agent:${this.configId}] Conversation missing, creating new...`); + session.close(); + session = createSession(this.state.agentId, baseOptions); + initInfo = await this.withTimeout(session.initialize(), 'Session initialize', initTimeoutMs); + } else if (usedDefaultConversation && this.state.agentId) { + console.warn(`[Agent:${this.configId}] Default conversation missing, creating new...`); + session.close(); + session = createSession(this.state.agentId, baseOptions); + initInfo = await this.withTimeout(session.initialize(), 'Session initialize', initTimeoutMs); + } else { + throw error; + } + } + + // Send message + const formattedMessage = formatMessageEnvelope(msg); + await this.withTimeout(session.send(formattedMessage), 'Session send', initTimeoutMs); + + // Stream response + let response = ''; + let messageId: string | null = null; + let lastUpdate = Date.now(); + let sentAnyMessage = false; + + // Keep typing indicator alive + const typingInterval = setInterval(() => { + adapter.sendTypingIndicator(msg.chatId).catch(() => {}); + }, 4000); + + try { + for await (const streamMsg of session.stream()) { + if (streamMsg.type === 'assistant') { + response += streamMsg.content; + + // Stream updates for channels that support editing + const canEdit = adapter.supportsEditing?.() ?? true; + if (canEdit && Date.now() - lastUpdate > 500 && response.length > 0) { + try { + if (messageId) { + await adapter.editMessage(msg.chatId, messageId, response); + } else { + const result = await adapter.sendMessage({ + chatId: msg.chatId, + text: response, + threadId: msg.threadId, + }); + messageId = result.messageId; + } + sentAnyMessage = true; + } catch { + // Ignore edit errors + } + lastUpdate = Date.now(); + } + } + + if (streamMsg.type === 'result') { + this.handleSessionResult(session); + break; + } + } + } finally { + clearInterval(typingInterval); + } + + // Send final response + if (response.trim()) { + try { + if (messageId) { + await adapter.editMessage(msg.chatId, messageId, response); + } else { + await adapter.sendMessage({ + chatId: msg.chatId, + text: response, + threadId: msg.threadId, + }); + sentAnyMessage = true; + } + } catch (sendError) { + console.error(`[Agent:${this.configId}] Error sending response:`, sendError); + if (!messageId) { + await adapter.sendMessage({ + chatId: msg.chatId, + text: response, + threadId: msg.threadId, + }); + sentAnyMessage = true; + } + } + } + + + } catch (error) { + console.error(`[Agent:${this.configId}] Error:`, error); + await adapter.sendMessage({ + chatId: msg.chatId, + text: `Error: ${error instanceof Error ? error.message : 'Unknown error'}`, + threadId: msg.threadId, + }); + } finally { + session.close(); + } + } + + /** + * Handle session result - save agent/conversation IDs + */ + private handleSessionResult(session: Session): void { + const isNewAgent = !this.state.agentId && session.agentId; + + if (session.agentId && session.agentId !== this.state.agentId) { + const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com'; + this.state.agentId = session.agentId; + this.state.baseUrl = currentBaseUrl; + this.state.lastUsedAt = new Date().toISOString(); + if (!this.state.createdAt) { + this.state.createdAt = new Date().toISOString(); + } + } + + if (session.conversationId && session.conversationId !== this.state.conversationId) { + this.state.conversationId = session.conversationId; + } + + this.saveState(); + + // Set agent name on new creation + if (isNewAgent && session.agentId) { + updateAgentName(session.agentId, this.name).catch(() => {}); + } + } + + /** + * Wrap a promise with timeout + */ + private async withTimeout(promise: Promise, label: string, timeoutMs: number): Promise { + let timeoutId: NodeJS.Timeout; + const timeoutPromise = new Promise((_, reject) => { + timeoutId = setTimeout(() => { + reject(new Error(`${label} timed out after ${timeoutMs}ms`)); + }, timeoutMs); + }); + try { + return await Promise.race([promise, timeoutPromise]); + } finally { + clearTimeout(timeoutId!); + } + } + + /** + * Resolve workspace path + */ + private resolveWorkspace(workspace: string): string { + if (workspace.startsWith('~')) { + return workspace.replace('~', homedir()); + } + return resolve(workspace); + } + + /** + * Load state from disk + */ + private loadState(): AgentState { + try { + if (existsSync(this.statePath)) { + const raw = readFileSync(this.statePath, 'utf-8'); + return JSON.parse(raw) as AgentState; + } + } catch (e) { + console.error(`[Agent:${this.configId}] Failed to load state:`, e); + } + return { agentId: null }; + } + + /** + * Save state to disk + */ + private saveState(): void { + try { + writeFileSync(this.statePath, JSON.stringify(this.state, null, 2)); + } catch (e) { + console.error(`[Agent:${this.configId}] Failed to save state:`, e); + } + } +} diff --git a/src/core/agent-manager.ts b/src/core/agent-manager.ts new file mode 100644 index 0000000..57fcf32 --- /dev/null +++ b/src/core/agent-manager.ts @@ -0,0 +1,158 @@ +/** + * Agent Manager + * + * Manages multiple agent instances in multi-agent mode. + * Creates, retrieves, and manages lifecycle of AgentInstance objects. + */ + +import type { NormalizedConfig, AgentConfig } from '../config/types.js'; +import { AgentInstance, type AgentInstanceConfig } from './agent-instance.js'; +import { agentExists, findAgentByName } from '../tools/letta-api.js'; + +/** + * Manager for multiple agent instances + */ +export class AgentManager { + private agents: Map = new Map(); + private defaultAgentId: string; + private config: NormalizedConfig; + + constructor(config: NormalizedConfig) { + this.config = config; + + // Find default agent + const defaultAgent = config.agents.list.find(a => a.default) || config.agents.list[0]; + this.defaultAgentId = defaultAgent.id; + + // Create agent instances + for (const agentConfig of config.agents.list) { + const instance = this.createInstance(agentConfig); + this.agents.set(agentConfig.id, instance); + } + + console.log(`[AgentManager] Initialized ${this.agents.size} agent(s). Default: ${this.defaultAgentId}`); + } + + /** + * Get an agent instance by ID + */ + getAgent(agentId: string): AgentInstance | undefined { + return this.agents.get(agentId); + } + + /** + * Get the default agent + */ + getDefaultAgent(): AgentInstance { + return this.agents.get(this.defaultAgentId)!; + } + + /** + * Get the default agent ID + */ + getDefaultAgentId(): string { + return this.defaultAgentId; + } + + /** + * List all agent IDs + */ + listAgentIds(): string[] { + return Array.from(this.agents.keys()); + } + + /** + * List all agents with their info + */ + listAgents(): Array<{ + configId: string; + name: string; + workspace: string; + agentId: string | null; + isDefault: boolean; + }> { + return Array.from(this.agents.values()).map(agent => ({ + configId: agent.configId, + name: agent.name, + workspace: agent.workspace, + agentId: agent.agentId, + isDefault: agent.configId === this.defaultAgentId, + })); + } + + /** + * Verify all agents exist on the server + * Clears agent IDs that no longer exist + */ + async verifyAgents(): Promise { + for (const [configId, agent] of this.agents) { + if (agent.agentId) { + const exists = await agentExists(agent.agentId); + if (!exists) { + console.log(`[AgentManager] Agent ${configId} (${agent.agentId}) not found on server, clearing...`); + agent.reset(); + } + } + } + } + + /** + * Try to discover existing agents by name on the server + * Useful for container deploys where agents already exist + */ + async discoverAgentsByName(): Promise { + for (const [configId, agent] of this.agents) { + if (!agent.agentId) { + console.log(`[AgentManager] Searching for existing agent named "${agent.name}"...`); + const found = await findAgentByName(agent.name); + if (found) { + console.log(`[AgentManager] Found existing agent: ${found.id}`); + agent.setAgentId(found.id); + } + } + } + } + + /** + * Get status summary + */ + getStatus(): { + totalAgents: number; + defaultAgentId: string; + agents: Array<{ + configId: string; + name: string; + agentId: string | null; + isDefault: boolean; + }>; + } { + return { + totalAgents: this.agents.size, + defaultAgentId: this.defaultAgentId, + agents: this.listAgents(), + }; + } + + /** + * Create an agent instance from config + */ + private createInstance(agentConfig: AgentConfig): AgentInstance { + const defaultModel = this.config.agents.defaults?.model; + + const instanceConfig: AgentInstanceConfig = { + configId: agentConfig.id, + name: agentConfig.name || agentConfig.id, + workspace: agentConfig.workspace, + model: agentConfig.model || defaultModel, + }; + + return new AgentInstance(instanceConfig); + } +} + +/** + * Create an agent manager from normalized config + */ +export function createAgentManager(config: NormalizedConfig): AgentManager { + return new AgentManager(config); +} diff --git a/src/core/gateway.ts b/src/core/gateway.ts index e03b96c..b2653c6 100644 --- a/src/core/gateway.ts +++ b/src/core/gateway.ts @@ -1,125 +1,150 @@ /** - * LettaGateway - Orchestrates multiple agent sessions. - * - * In multi-agent mode, the gateway manages multiple AgentSession instances, - * each with their own channels, message queue, and state. - * - * See: docs/multi-agent-architecture.md + * Gateway - Message routing layer between channels and agents + * + * This replaces the direct bot->channel connection with a router + * that can direct messages to different agents based on bindings. */ -import type { AgentSession, AgentRouter } from './interfaces.js'; -import type { TriggerContext } from './types.js'; -import type { StreamMsg } from './bot.js'; - -export class LettaGateway implements AgentRouter { - private agents: Map = new Map(); +import type { ChannelAdapter } from '../channels/types.js'; +import type { InboundMessage } from './types.js'; +import type { NormalizedConfig } from '../config/types.js'; +import { AgentManager, createAgentManager } from './agent-manager.js'; +import { MessageRouter, createRouter, type RoutingContext } from '../routing/router.js'; +/** + * Gateway manages channel adapters and routes messages to agents + */ +export class Gateway { + private config: NormalizedConfig; + private agentManager: AgentManager; + private router: MessageRouter; + private channels: Map = new Map(); + + constructor(config: NormalizedConfig) { + this.config = config; + this.agentManager = createAgentManager(config); + this.router = createRouter(config.bindings, this.agentManager.getDefaultAgentId()); + } + /** - * Add a named agent session to the gateway. - * @throws if name is empty or already exists + * Register a channel adapter */ - addAgent(name: string, session: AgentSession): void { - if (!name?.trim()) { - throw new Error('Agent name cannot be empty'); - } - if (this.agents.has(name)) { - throw new Error(`Agent "${name}" already exists`); - } - this.agents.set(name, session); - console.log(`[Gateway] Added agent: ${name}`); + registerChannel(adapter: ChannelAdapter): void { + const key = `${adapter.id}:${adapter.accountId}`; + this.channels.set(key, adapter); + + // Wire up message handler with routing + adapter.onMessage = async (msg: InboundMessage) => { + await this.handleMessage(msg, adapter); + }; + + console.log(`[Gateway] Registered channel: ${adapter.name} (${key})`); } - - /** Get an agent session by name */ - getAgent(name: string): AgentSession | undefined { - return this.agents.get(name); + + /** + * Get a channel adapter by ID and account + */ + getChannel(channelId: string, accountId: string = 'default'): ChannelAdapter | undefined { + return this.channels.get(`${channelId}:${accountId}`); } - - /** Get all agent names */ - getAgentNames(): string[] { - return Array.from(this.agents.keys()); + + /** + * Get all registered channels + */ + getChannels(): ChannelAdapter[] { + return Array.from(this.channels.values()); } - - /** Get agent count */ - get size(): number { - return this.agents.size; + + /** + * Get the agent manager + */ + getAgentManager(): AgentManager { + return this.agentManager; } - - /** Start all agents */ + + /** + * Start all channels + */ async start(): Promise { - console.log(`[Gateway] Starting ${this.agents.size} agent(s)...`); - const results = await Promise.allSettled( - Array.from(this.agents.entries()).map(async ([name, session]) => { - await session.start(); - console.log(`[Gateway] Started: ${name}`); - }) - ); - const failed = results.filter(r => r.status === 'rejected'); - if (failed.length > 0) { - console.error(`[Gateway] ${failed.length} agent(s) failed to start`); - } - console.log(`[Gateway] ${results.length - failed.length}/${results.length} agents started`); - } - - /** Stop all agents */ - async stop(): Promise { - console.log(`[Gateway] Stopping all agents...`); - for (const [name, session] of this.agents) { + // Verify agents exist on server + await this.agentManager.verifyAgents(); + + // Start all channels + for (const adapter of this.channels.values()) { try { - await session.stop(); - console.log(`[Gateway] Stopped: ${name}`); - } catch (e) { - console.error(`[Gateway] Failed to stop ${name}:`, e); + await adapter.start(); + console.log(`[Gateway] Started channel: ${adapter.name}`); + } catch (error) { + console.error(`[Gateway] Failed to start ${adapter.name}:`, error); } } } - + /** - * Send a message to a named agent and return the response. - * If no name is given, routes to the first registered agent. + * Stop all channels */ - async sendToAgent(agentName: string | undefined, text: string, context?: TriggerContext): Promise { - const agent = this.resolveAgent(agentName); - return agent.sendToAgent(text, context); - } - - /** - * Stream a message to a named agent, yielding chunks as they arrive. - */ - async *streamToAgent(agentName: string | undefined, text: string, context?: TriggerContext): AsyncGenerator { - const agent = this.resolveAgent(agentName); - yield* agent.streamToAgent(text, context); - } - - /** - * Resolve an agent by name, defaulting to the first registered agent. - */ - private resolveAgent(name?: string): AgentSession { - if (!name) { - const first = this.agents.values().next().value; - if (!first) throw new Error('No agents configured'); - return first; - } - const agent = this.agents.get(name); - if (!agent) throw new Error(`Agent not found: ${name}`); - return agent; - } - - /** - * Deliver a message to a channel. - * Finds the agent that owns the channel and delegates. - */ - async deliverToChannel( - channelId: string, - chatId: string, - options: { text?: string; filePath?: string; kind?: 'image' | 'file' } - ): Promise { - // Try each agent until one owns the channel - for (const [name, session] of this.agents) { - const status = session.getStatus(); - if (status.channels.includes(channelId)) { - return session.deliverToChannel(channelId, chatId, options); + async stop(): Promise { + for (const adapter of this.channels.values()) { + try { + await adapter.stop(); + } catch (error) { + console.error(`[Gateway] Error stopping ${adapter.name}:`, error); } } - throw new Error(`No agent owns channel: ${channelId}`); + } + + /** + * Handle an incoming message - route to appropriate agent + */ + private async handleMessage(msg: InboundMessage, adapter: ChannelAdapter): Promise { + // Build routing context + const ctx: RoutingContext = { + channel: msg.channel, + accountId: msg.accountId || adapter.accountId, + peerId: msg.chatId, + peerKind: msg.isGroup ? 'group' : 'dm', + }; + + // Route to agent + const result = this.router.route(ctx); + const routeDesc = this.router.describeRoute(ctx); + console.log(`[Gateway] Routing ${msg.channel}:${msg.chatId} ${routeDesc}`); + + // Get agent and process + const agent = this.agentManager.getAgent(result.agentId); + if (!agent) { + console.error(`[Gateway] Agent not found: ${result.agentId}`); + await adapter.sendMessage({ + chatId: msg.chatId, + text: `Error: Agent "${result.agentId}" not found`, + threadId: msg.threadId, + }); + return; + } + + // Process with agent + await agent.processMessage(msg, adapter); + } + + /** + * Get status summary + */ + getStatus(): { + channels: string[]; + agents: ReturnType; + bindings: number; + } { + return { + channels: Array.from(this.channels.keys()), + agents: this.agentManager.getStatus(), + bindings: this.config.bindings.length, + }; } } + +/** + * Create a gateway from normalized config + */ +export function createGateway(config: NormalizedConfig): Gateway { + return new Gateway(config); +} diff --git a/src/routing/index.ts b/src/routing/index.ts new file mode 100644 index 0000000..5ef6bc5 --- /dev/null +++ b/src/routing/index.ts @@ -0,0 +1 @@ +export * from './router.js'; diff --git a/src/routing/router.ts b/src/routing/router.ts new file mode 100644 index 0000000..c3f3959 --- /dev/null +++ b/src/routing/router.ts @@ -0,0 +1,194 @@ +/** + * Message Router + * + * Routes incoming messages to the correct agent based on bindings. + * Priority: peer match > accountId match > channel match > default agent + */ + +import type { AgentBinding } from '../config/types.js'; + +/** + * Context for routing a message + */ +export interface RoutingContext { + /** Channel type: "telegram", "slack", etc. */ + channel: string; + /** Account ID for multi-account channels */ + accountId?: string; + /** Chat/User ID */ + peerId?: string; + /** Type of peer */ + peerKind?: 'dm' | 'group'; +} + +/** + * Result of routing + */ +export interface RoutingResult { + /** Target agent ID */ + agentId: string; + /** Which binding matched (null if default) */ + matchedBinding: AgentBinding | null; + /** Match specificity level */ + matchLevel: 'peer' | 'account' | 'channel' | 'default'; +} + +/** + * Message router that matches bindings to find target agent + */ +export class MessageRouter { + private bindings: AgentBinding[]; + private defaultAgentId: string; + + constructor(bindings: AgentBinding[], defaultAgentId: string) { + // Sort bindings by specificity (most specific first) + this.bindings = this.sortBySpecificity(bindings); + this.defaultAgentId = defaultAgentId; + } + + /** + * Route a message to an agent + */ + route(ctx: RoutingContext): RoutingResult { + for (const binding of this.bindings) { + const matchLevel = this.getMatchLevel(binding, ctx); + if (matchLevel) { + return { + agentId: binding.agentId, + matchedBinding: binding, + matchLevel, + }; + } + } + + return { + agentId: this.defaultAgentId, + matchedBinding: null, + matchLevel: 'default', + }; + } + + /** + * Get the match level for a binding against context + * Returns null if no match, or the specificity level if matched + */ + private getMatchLevel(binding: AgentBinding, ctx: RoutingContext): 'peer' | 'account' | 'channel' | null { + // Channel must always match + if (binding.match.channel !== ctx.channel) { + return null; + } + + // Check peer match (most specific) + if (binding.match.peer) { + if (!ctx.peerId || !ctx.peerKind) { + return null; + } + if (binding.match.peer.kind !== ctx.peerKind) { + return null; + } + if (binding.match.peer.id !== ctx.peerId) { + return null; + } + // Peer matches - also check accountId if specified + if (binding.match.accountId && binding.match.accountId !== ctx.accountId) { + return null; + } + return 'peer'; + } + + // Check account match + if (binding.match.accountId) { + if (binding.match.accountId !== ctx.accountId) { + return null; + } + return 'account'; + } + + // Channel-only match (least specific) + return 'channel'; + } + + /** + * Sort bindings by specificity + * + * Order: peer+account > peer > account > channel + */ + private sortBySpecificity(bindings: AgentBinding[]): AgentBinding[] { + return [...bindings].sort((a, b) => { + const scoreA = this.getSpecificityScore(a); + const scoreB = this.getSpecificityScore(b); + return scoreB - scoreA; // Higher score = more specific = first + }); + } + + /** + * Calculate specificity score for sorting + */ + private getSpecificityScore(binding: AgentBinding): number { + let score = 0; + + // Peer match is most specific + if (binding.match.peer) { + score += 100; + } + + // Account ID adds specificity + if (binding.match.accountId) { + score += 10; + } + + // Channel is baseline (always present) + score += 1; + + return score; + } + + /** + * Get all bindings for a specific agent + */ + getBindingsForAgent(agentId: string): AgentBinding[] { + return this.bindings.filter(b => b.agentId === agentId); + } + + /** + * Get all unique agent IDs from bindings + */ + getRoutedAgentIds(): string[] { + const ids = new Set(); + for (const binding of this.bindings) { + ids.add(binding.agentId); + } + ids.add(this.defaultAgentId); + return Array.from(ids); + } + + /** + * Debug: describe the routing decision for a context + */ + describeRoute(ctx: RoutingContext): string { + const result = this.route(ctx); + + if (result.matchLevel === 'default') { + return `→ ${result.agentId} (default agent)`; + } + + const binding = result.matchedBinding!; + const matchDesc = []; + matchDesc.push(`channel=${binding.match.channel}`); + if (binding.match.accountId) { + matchDesc.push(`account=${binding.match.accountId}`); + } + if (binding.match.peer) { + matchDesc.push(`peer=${binding.match.peer.kind}:${binding.match.peer.id}`); + } + + return `→ ${result.agentId} (matched: ${matchDesc.join(', ')})`; + } +} + +/** + * Create a router from normalized config + */ +export function createRouter(bindings: AgentBinding[], defaultAgentId: string): MessageRouter { + return new MessageRouter(bindings, defaultAgentId); +}