From 2fbd767c50060d38bc08c02b1b80853709e97602 Mon Sep 17 00:00:00 2001 From: Cameron Date: Sun, 8 Feb 2026 21:41:45 -0800 Subject: [PATCH] feat: add AgentSession interface and LettaGateway orchestrator (Phase 1b) (#216) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Interface-first multi-agent orchestration layer. - Define AgentSession interface capturing the contract consumers depend on - LettaBot implements AgentSession (already has all methods, now explicit) - LettaGateway manages multiple named AgentSession instances - Update heartbeat, cron, polling, API server to depend on interface, not concrete class - 8 new gateway tests No behavioral changes. Consumers that used LettaBot now use AgentSession interface, enabling multi-agent without modifying consumer code. Part of #109 Written by Cameron ◯ Letta Code "First, solve the problem. Then, write the code." -- John Johnson --- src/api/server.ts | 4 +- src/core/bot.ts | 3 +- src/core/gateway.test.ts | 92 ++++++++++++++++++++++++++++++++++++++++ src/core/gateway.ts | 92 ++++++++++++++++++++++++++++++++++++++++ src/core/index.ts | 2 + src/core/interfaces.ts | 56 ++++++++++++++++++++++++ src/cron/heartbeat.ts | 6 +-- src/cron/service.ts | 6 +-- src/polling/service.ts | 6 +-- 9 files changed, 255 insertions(+), 12 deletions(-) create mode 100644 src/core/gateway.test.ts create mode 100644 src/core/gateway.ts create mode 100644 src/core/interfaces.ts diff --git a/src/api/server.ts b/src/api/server.ts index 443f3dc..d371fe2 100644 --- a/src/api/server.ts +++ b/src/api/server.ts @@ -8,7 +8,7 @@ import * as fs from 'fs'; import { validateApiKey } from './auth.js'; import type { SendMessageRequest, SendMessageResponse, SendFileResponse } from './types.js'; import { parseMultipart } from './multipart.js'; -import type { LettaBot } from '../core/bot.js'; +import type { AgentSession } from '../core/interfaces.js'; import type { ChannelId } from '../core/types.js'; const VALID_CHANNELS: ChannelId[] = ['telegram', 'slack', 'discord', 'whatsapp', 'signal']; @@ -26,7 +26,7 @@ interface ServerOptions { /** * Create and start the HTTP API server */ -export function createApiServer(bot: LettaBot, options: ServerOptions): http.Server { +export function createApiServer(bot: AgentSession, options: ServerOptions): http.Server { const server = http.createServer(async (req, res) => { // Set CORS headers (configurable origin, defaults to same-origin for security) const corsOrigin = options.corsOrigin || req.headers.origin || 'null'; diff --git a/src/core/bot.ts b/src/core/bot.ts index 980dd5b..7773616 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -8,6 +8,7 @@ import { createAgent, createSession, resumeSession, imageFromFile, imageFromURL, import { mkdirSync } from 'node:fs'; import type { ChannelAdapter } from '../channels/types.js'; import type { BotConfig, InboundMessage, TriggerContext } from './types.js'; +import type { AgentSession } from './interfaces.js'; import { Store } from './store.js'; import { updateAgentName, getPendingApprovals, rejectApproval, cancelRuns, recoverOrphanedConversationApproval } from '../tools/letta-api.js'; import { installSkillsToAgent } from '../skills/loader.js'; @@ -79,7 +80,7 @@ async function buildMultimodalMessage( return content.length > 1 ? content : formattedText; } -export class LettaBot { +export class LettaBot implements AgentSession { private store: Store; private config: BotConfig; private channels: Map = new Map(); diff --git a/src/core/gateway.test.ts b/src/core/gateway.test.ts new file mode 100644 index 0000000..c347431 --- /dev/null +++ b/src/core/gateway.test.ts @@ -0,0 +1,92 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { LettaGateway } from './gateway.js'; +import type { AgentSession } from './interfaces.js'; + +function createMockSession(channels: string[] = ['telegram']): AgentSession { + return { + registerChannel: vi.fn(), + setGroupBatcher: vi.fn(), + processGroupBatch: vi.fn(), + start: vi.fn().mockResolvedValue(undefined), + stop: vi.fn().mockResolvedValue(undefined), + sendToAgent: vi.fn().mockResolvedValue('response'), + deliverToChannel: vi.fn().mockResolvedValue('msg-123'), + getStatus: vi.fn().mockReturnValue({ agentId: 'agent-123', channels }), + setAgentId: vi.fn(), + reset: vi.fn(), + getLastMessageTarget: vi.fn().mockReturnValue(null), + getLastUserMessageTime: vi.fn().mockReturnValue(null), + }; +} + +describe('LettaGateway', () => { + let gateway: LettaGateway; + + beforeEach(() => { + gateway = new LettaGateway(); + }); + + it('adds and retrieves agents', () => { + const session = createMockSession(); + gateway.addAgent('test', session); + expect(gateway.getAgent('test')).toBe(session); + expect(gateway.getAgentNames()).toEqual(['test']); + expect(gateway.size).toBe(1); + }); + + it('rejects empty agent names', () => { + expect(() => gateway.addAgent('', createMockSession())).toThrow('empty'); + }); + + it('rejects duplicate agent names', () => { + gateway.addAgent('test', createMockSession()); + expect(() => gateway.addAgent('test', createMockSession())).toThrow('already exists'); + }); + + it('starts all agents', async () => { + const s1 = createMockSession(); + const s2 = createMockSession(); + gateway.addAgent('a', s1); + gateway.addAgent('b', s2); + await gateway.start(); + expect(s1.start).toHaveBeenCalled(); + expect(s2.start).toHaveBeenCalled(); + }); + + it('stops all agents', async () => { + const s1 = createMockSession(); + const s2 = createMockSession(); + gateway.addAgent('a', s1); + gateway.addAgent('b', s2); + await gateway.stop(); + expect(s1.stop).toHaveBeenCalled(); + expect(s2.stop).toHaveBeenCalled(); + }); + + it('routes deliverToChannel to correct agent', async () => { + const s1 = createMockSession(['telegram']); + const s2 = createMockSession(['discord']); + gateway.addAgent('a', s1); + gateway.addAgent('b', s2); + + await gateway.deliverToChannel('discord', 'chat-1', { text: 'hello' }); + expect(s2.deliverToChannel).toHaveBeenCalledWith('discord', 'chat-1', { text: 'hello' }); + expect(s1.deliverToChannel).not.toHaveBeenCalled(); + }); + + it('throws when no agent owns channel', async () => { + gateway.addAgent('a', createMockSession(['telegram'])); + await expect(gateway.deliverToChannel('slack', 'ch-1', { text: 'hi' })).rejects.toThrow('No agent owns channel'); + }); + + it('handles start failures gracefully', async () => { + const good = createMockSession(); + const bad = createMockSession(); + (bad.start as any).mockRejectedValue(new Error('boom')); + gateway.addAgent('good', good); + gateway.addAgent('bad', bad); + // Should not throw -- uses Promise.allSettled + await gateway.start(); + expect(good.start).toHaveBeenCalled(); + }); +}); diff --git a/src/core/gateway.ts b/src/core/gateway.ts new file mode 100644 index 0000000..b7609e0 --- /dev/null +++ b/src/core/gateway.ts @@ -0,0 +1,92 @@ +/** + * LettaGateway - Orchestrates multiple agent sessions. + * + * In multi-agent mode, the gateway manages multiple AgentSession instances, + * each with their own channels, message queue, and state. + * + * See: docs/multi-agent-architecture.md + */ + +import type { AgentSession } from './interfaces.js'; + +export class LettaGateway { + private agents: Map = new Map(); + + /** + * Add a named agent session to the gateway. + * @throws if name is empty or already exists + */ + addAgent(name: string, session: AgentSession): void { + if (!name?.trim()) { + throw new Error('Agent name cannot be empty'); + } + if (this.agents.has(name)) { + throw new Error(`Agent "${name}" already exists`); + } + this.agents.set(name, session); + console.log(`[Gateway] Added agent: ${name}`); + } + + /** Get an agent session by name */ + getAgent(name: string): AgentSession | undefined { + return this.agents.get(name); + } + + /** Get all agent names */ + getAgentNames(): string[] { + return Array.from(this.agents.keys()); + } + + /** Get agent count */ + get size(): number { + return this.agents.size; + } + + /** Start all agents */ + async start(): Promise { + console.log(`[Gateway] Starting ${this.agents.size} agent(s)...`); + const results = await Promise.allSettled( + Array.from(this.agents.entries()).map(async ([name, session]) => { + await session.start(); + console.log(`[Gateway] Started: ${name}`); + }) + ); + const failed = results.filter(r => r.status === 'rejected'); + if (failed.length > 0) { + console.error(`[Gateway] ${failed.length} agent(s) failed to start`); + } + console.log(`[Gateway] ${results.length - failed.length}/${results.length} agents started`); + } + + /** Stop all agents */ + async stop(): Promise { + console.log(`[Gateway] Stopping all agents...`); + for (const [name, session] of this.agents) { + try { + await session.stop(); + console.log(`[Gateway] Stopped: ${name}`); + } catch (e) { + console.error(`[Gateway] Failed to stop ${name}:`, e); + } + } + } + + /** + * Deliver a message to a channel. + * Finds the agent that owns the channel and delegates. + */ + async deliverToChannel( + channelId: string, + chatId: string, + options: { text?: string; filePath?: string; kind?: 'image' | 'file' } + ): Promise { + // Try each agent until one owns the channel + for (const [name, session] of this.agents) { + const status = session.getStatus(); + if (status.channels.includes(channelId)) { + return session.deliverToChannel(channelId, chatId, options); + } + } + throw new Error(`No agent owns channel: ${channelId}`); + } +} diff --git a/src/core/index.ts b/src/core/index.ts index 41d9732..e33ddae 100644 --- a/src/core/index.ts +++ b/src/core/index.ts @@ -5,5 +5,7 @@ export * from './types.js'; export * from './store.js'; export * from './bot.js'; +export * from './interfaces.js'; +export * from './gateway.js'; export * from './formatter.js'; export * from './prompts.js'; diff --git a/src/core/interfaces.ts b/src/core/interfaces.ts new file mode 100644 index 0000000..b0f2cb9 --- /dev/null +++ b/src/core/interfaces.ts @@ -0,0 +1,56 @@ +/** + * AgentSession interface - the contract for agent communication. + * + * Consumers (cron, heartbeat, polling, API server) depend on this interface, + * not the concrete LettaBot class. This enables multi-agent orchestration + * via LettaGateway without changing consumer code. + */ + +import type { ChannelAdapter } from '../channels/types.js'; +import type { InboundMessage, TriggerContext } from './types.js'; +import type { GroupBatcher } from './group-batcher.js'; + +export interface AgentSession { + /** Register a channel adapter */ + registerChannel(adapter: ChannelAdapter): void; + + /** Configure group message batching */ + setGroupBatcher(batcher: GroupBatcher, intervals: Map, instantGroupIds?: Set): void; + + /** Process a batched group message */ + processGroupBatch(msg: InboundMessage, adapter: ChannelAdapter): void; + + /** Start all registered channels */ + start(): Promise; + + /** Stop all channels */ + stop(): Promise; + + /** Send a message to the agent (used by cron, heartbeat, polling) */ + sendToAgent(text: string, context?: TriggerContext): Promise; + + /** Deliver a message/file to a specific channel */ + deliverToChannel(channelId: string, chatId: string, options: { + text?: string; + filePath?: string; + kind?: 'image' | 'file'; + }): Promise; + + /** Get agent status */ + getStatus(): { agentId: string | null; channels: string[] }; + + /** Set agent ID (for container deploys) */ + setAgentId(agentId: string): void; + + /** Reset agent state */ + reset(): void; + + /** Get the last message target (for heartbeat delivery) */ + getLastMessageTarget(): { channel: string; chatId: string } | null; + + /** Get the time of the last user message (for heartbeat skip logic) */ + getLastUserMessageTime(): Date | null; + + /** Callback to trigger heartbeat */ + onTriggerHeartbeat?: () => Promise; +} diff --git a/src/cron/heartbeat.ts b/src/cron/heartbeat.ts index 494405d..b9e6845 100644 --- a/src/cron/heartbeat.ts +++ b/src/cron/heartbeat.ts @@ -9,7 +9,7 @@ import { appendFileSync, mkdirSync } from 'node:fs'; import { resolve, dirname } from 'node:path'; -import type { LettaBot } from '../core/bot.js'; +import type { AgentSession } from '../core/interfaces.js'; import type { TriggerContext } from '../core/types.js'; import { buildHeartbeatPrompt } from '../core/prompts.js'; import { getDataDir } from '../utils/paths.js'; @@ -57,11 +57,11 @@ export interface HeartbeatConfig { * Heartbeat Service */ export class HeartbeatService { - private bot: LettaBot; + private bot: AgentSession; private config: HeartbeatConfig; private intervalId: NodeJS.Timeout | null = null; - constructor(bot: LettaBot, config: HeartbeatConfig) { + constructor(bot: AgentSession, config: HeartbeatConfig) { this.bot = bot; this.config = config; } diff --git a/src/cron/service.ts b/src/cron/service.ts index f9fbb46..c46ea88 100644 --- a/src/cron/service.ts +++ b/src/cron/service.ts @@ -7,7 +7,7 @@ import { existsSync, readFileSync, writeFileSync, appendFileSync, mkdirSync, watch, type FSWatcher } from 'node:fs'; import { resolve, dirname } from 'node:path'; -import type { LettaBot } from '../core/bot.js'; +import type { AgentSession } from '../core/interfaces.js'; import type { CronJob, CronJobCreate, CronSchedule, CronConfig, HeartbeatConfig } from './types.js'; import { DEFAULT_HEARTBEAT_MESSAGES } from './types.js'; import { getDataDir } from '../utils/paths.js'; @@ -49,7 +49,7 @@ const DEFAULT_HEARTBEAT: HeartbeatConfig = { export class CronService { private jobs: Map = new Map(); private scheduledJobs: Map = new Map(); - private bot: LettaBot; + private bot: AgentSession; private storePath: string; private config: CronConfig; private started = false; @@ -57,7 +57,7 @@ export class CronService { private fileWatcher: FSWatcher | null = null; private lastFileContent: string = ''; - constructor(bot: LettaBot, config?: CronConfig) { + constructor(bot: AgentSession, config?: CronConfig) { this.bot = bot; this.config = config || {}; this.storePath = config?.storePath diff --git a/src/polling/service.ts b/src/polling/service.ts index 2326dab..7dd8c37 100644 --- a/src/polling/service.ts +++ b/src/polling/service.ts @@ -8,7 +8,7 @@ import { spawnSync } from 'node:child_process'; import { existsSync, readFileSync, writeFileSync } from 'node:fs'; import { join } from 'node:path'; -import { LettaBot } from '../core/bot.js'; +import type { AgentSession } from '../core/interfaces.js'; export interface PollingConfig { intervalMs: number; // Polling interval in milliseconds @@ -21,14 +21,14 @@ export interface PollingConfig { export class PollingService { private intervalId: ReturnType | null = null; - private bot: LettaBot; + private bot: AgentSession; private config: PollingConfig; // Track seen email IDs to detect new emails (persisted to disk) private seenEmailIds: Set = new Set(); private seenEmailsPath: string; - constructor(bot: LettaBot, config: PollingConfig) { + constructor(bot: AgentSession, config: PollingConfig) { this.bot = bot; this.config = config; this.seenEmailsPath = join(config.workingDir, 'seen-emails.json');