feat: add AgentSession interface and LettaGateway orchestrator (Phase 1b) (#216)
Interface-first multi-agent orchestration layer. - Define AgentSession interface capturing the contract consumers depend on - LettaBot implements AgentSession (already has all methods, now explicit) - LettaGateway manages multiple named AgentSession instances - Update heartbeat, cron, polling, API server to depend on interface, not concrete class - 8 new gateway tests No behavioral changes. Consumers that used LettaBot now use AgentSession interface, enabling multi-agent without modifying consumer code. Part of #109 Written by Cameron ◯ Letta Code "First, solve the problem. Then, write the code." -- John Johnson
This commit is contained in:
@@ -8,7 +8,7 @@ import * as fs from 'fs';
|
||||
import { validateApiKey } from './auth.js';
|
||||
import type { SendMessageRequest, SendMessageResponse, SendFileResponse } from './types.js';
|
||||
import { parseMultipart } from './multipart.js';
|
||||
import type { LettaBot } from '../core/bot.js';
|
||||
import type { AgentSession } from '../core/interfaces.js';
|
||||
import type { ChannelId } from '../core/types.js';
|
||||
|
||||
const VALID_CHANNELS: ChannelId[] = ['telegram', 'slack', 'discord', 'whatsapp', 'signal'];
|
||||
@@ -26,7 +26,7 @@ interface ServerOptions {
|
||||
/**
|
||||
* Create and start the HTTP API server
|
||||
*/
|
||||
export function createApiServer(bot: LettaBot, options: ServerOptions): http.Server {
|
||||
export function createApiServer(bot: AgentSession, options: ServerOptions): http.Server {
|
||||
const server = http.createServer(async (req, res) => {
|
||||
// Set CORS headers (configurable origin, defaults to same-origin for security)
|
||||
const corsOrigin = options.corsOrigin || req.headers.origin || 'null';
|
||||
|
||||
@@ -8,6 +8,7 @@ import { createAgent, createSession, resumeSession, imageFromFile, imageFromURL,
|
||||
import { mkdirSync } from 'node:fs';
|
||||
import type { ChannelAdapter } from '../channels/types.js';
|
||||
import type { BotConfig, InboundMessage, TriggerContext } from './types.js';
|
||||
import type { AgentSession } from './interfaces.js';
|
||||
import { Store } from './store.js';
|
||||
import { updateAgentName, getPendingApprovals, rejectApproval, cancelRuns, recoverOrphanedConversationApproval } from '../tools/letta-api.js';
|
||||
import { installSkillsToAgent } from '../skills/loader.js';
|
||||
@@ -79,7 +80,7 @@ async function buildMultimodalMessage(
|
||||
return content.length > 1 ? content : formattedText;
|
||||
}
|
||||
|
||||
export class LettaBot {
|
||||
export class LettaBot implements AgentSession {
|
||||
private store: Store;
|
||||
private config: BotConfig;
|
||||
private channels: Map<string, ChannelAdapter> = new Map();
|
||||
|
||||
92
src/core/gateway.test.ts
Normal file
92
src/core/gateway.test.ts
Normal file
@@ -0,0 +1,92 @@
|
||||
import { describe, it, expect, vi, beforeEach } from 'vitest';
|
||||
import { LettaGateway } from './gateway.js';
|
||||
import type { AgentSession } from './interfaces.js';
|
||||
|
||||
function createMockSession(channels: string[] = ['telegram']): AgentSession {
|
||||
return {
|
||||
registerChannel: vi.fn(),
|
||||
setGroupBatcher: vi.fn(),
|
||||
processGroupBatch: vi.fn(),
|
||||
start: vi.fn().mockResolvedValue(undefined),
|
||||
stop: vi.fn().mockResolvedValue(undefined),
|
||||
sendToAgent: vi.fn().mockResolvedValue('response'),
|
||||
deliverToChannel: vi.fn().mockResolvedValue('msg-123'),
|
||||
getStatus: vi.fn().mockReturnValue({ agentId: 'agent-123', channels }),
|
||||
setAgentId: vi.fn(),
|
||||
reset: vi.fn(),
|
||||
getLastMessageTarget: vi.fn().mockReturnValue(null),
|
||||
getLastUserMessageTime: vi.fn().mockReturnValue(null),
|
||||
};
|
||||
}
|
||||
|
||||
describe('LettaGateway', () => {
|
||||
let gateway: LettaGateway;
|
||||
|
||||
beforeEach(() => {
|
||||
gateway = new LettaGateway();
|
||||
});
|
||||
|
||||
it('adds and retrieves agents', () => {
|
||||
const session = createMockSession();
|
||||
gateway.addAgent('test', session);
|
||||
expect(gateway.getAgent('test')).toBe(session);
|
||||
expect(gateway.getAgentNames()).toEqual(['test']);
|
||||
expect(gateway.size).toBe(1);
|
||||
});
|
||||
|
||||
it('rejects empty agent names', () => {
|
||||
expect(() => gateway.addAgent('', createMockSession())).toThrow('empty');
|
||||
});
|
||||
|
||||
it('rejects duplicate agent names', () => {
|
||||
gateway.addAgent('test', createMockSession());
|
||||
expect(() => gateway.addAgent('test', createMockSession())).toThrow('already exists');
|
||||
});
|
||||
|
||||
it('starts all agents', async () => {
|
||||
const s1 = createMockSession();
|
||||
const s2 = createMockSession();
|
||||
gateway.addAgent('a', s1);
|
||||
gateway.addAgent('b', s2);
|
||||
await gateway.start();
|
||||
expect(s1.start).toHaveBeenCalled();
|
||||
expect(s2.start).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('stops all agents', async () => {
|
||||
const s1 = createMockSession();
|
||||
const s2 = createMockSession();
|
||||
gateway.addAgent('a', s1);
|
||||
gateway.addAgent('b', s2);
|
||||
await gateway.stop();
|
||||
expect(s1.stop).toHaveBeenCalled();
|
||||
expect(s2.stop).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('routes deliverToChannel to correct agent', async () => {
|
||||
const s1 = createMockSession(['telegram']);
|
||||
const s2 = createMockSession(['discord']);
|
||||
gateway.addAgent('a', s1);
|
||||
gateway.addAgent('b', s2);
|
||||
|
||||
await gateway.deliverToChannel('discord', 'chat-1', { text: 'hello' });
|
||||
expect(s2.deliverToChannel).toHaveBeenCalledWith('discord', 'chat-1', { text: 'hello' });
|
||||
expect(s1.deliverToChannel).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('throws when no agent owns channel', async () => {
|
||||
gateway.addAgent('a', createMockSession(['telegram']));
|
||||
await expect(gateway.deliverToChannel('slack', 'ch-1', { text: 'hi' })).rejects.toThrow('No agent owns channel');
|
||||
});
|
||||
|
||||
it('handles start failures gracefully', async () => {
|
||||
const good = createMockSession();
|
||||
const bad = createMockSession();
|
||||
(bad.start as any).mockRejectedValue(new Error('boom'));
|
||||
gateway.addAgent('good', good);
|
||||
gateway.addAgent('bad', bad);
|
||||
// Should not throw -- uses Promise.allSettled
|
||||
await gateway.start();
|
||||
expect(good.start).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
92
src/core/gateway.ts
Normal file
92
src/core/gateway.ts
Normal file
@@ -0,0 +1,92 @@
|
||||
/**
|
||||
* LettaGateway - Orchestrates multiple agent sessions.
|
||||
*
|
||||
* In multi-agent mode, the gateway manages multiple AgentSession instances,
|
||||
* each with their own channels, message queue, and state.
|
||||
*
|
||||
* See: docs/multi-agent-architecture.md
|
||||
*/
|
||||
|
||||
import type { AgentSession } from './interfaces.js';
|
||||
|
||||
export class LettaGateway {
|
||||
private agents: Map<string, AgentSession> = new Map();
|
||||
|
||||
/**
|
||||
* Add a named agent session to the gateway.
|
||||
* @throws if name is empty or already exists
|
||||
*/
|
||||
addAgent(name: string, session: AgentSession): void {
|
||||
if (!name?.trim()) {
|
||||
throw new Error('Agent name cannot be empty');
|
||||
}
|
||||
if (this.agents.has(name)) {
|
||||
throw new Error(`Agent "${name}" already exists`);
|
||||
}
|
||||
this.agents.set(name, session);
|
||||
console.log(`[Gateway] Added agent: ${name}`);
|
||||
}
|
||||
|
||||
/** Get an agent session by name */
|
||||
getAgent(name: string): AgentSession | undefined {
|
||||
return this.agents.get(name);
|
||||
}
|
||||
|
||||
/** Get all agent names */
|
||||
getAgentNames(): string[] {
|
||||
return Array.from(this.agents.keys());
|
||||
}
|
||||
|
||||
/** Get agent count */
|
||||
get size(): number {
|
||||
return this.agents.size;
|
||||
}
|
||||
|
||||
/** Start all agents */
|
||||
async start(): Promise<void> {
|
||||
console.log(`[Gateway] Starting ${this.agents.size} agent(s)...`);
|
||||
const results = await Promise.allSettled(
|
||||
Array.from(this.agents.entries()).map(async ([name, session]) => {
|
||||
await session.start();
|
||||
console.log(`[Gateway] Started: ${name}`);
|
||||
})
|
||||
);
|
||||
const failed = results.filter(r => r.status === 'rejected');
|
||||
if (failed.length > 0) {
|
||||
console.error(`[Gateway] ${failed.length} agent(s) failed to start`);
|
||||
}
|
||||
console.log(`[Gateway] ${results.length - failed.length}/${results.length} agents started`);
|
||||
}
|
||||
|
||||
/** Stop all agents */
|
||||
async stop(): Promise<void> {
|
||||
console.log(`[Gateway] Stopping all agents...`);
|
||||
for (const [name, session] of this.agents) {
|
||||
try {
|
||||
await session.stop();
|
||||
console.log(`[Gateway] Stopped: ${name}`);
|
||||
} catch (e) {
|
||||
console.error(`[Gateway] Failed to stop ${name}:`, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deliver a message to a channel.
|
||||
* Finds the agent that owns the channel and delegates.
|
||||
*/
|
||||
async deliverToChannel(
|
||||
channelId: string,
|
||||
chatId: string,
|
||||
options: { text?: string; filePath?: string; kind?: 'image' | 'file' }
|
||||
): Promise<string | undefined> {
|
||||
// Try each agent until one owns the channel
|
||||
for (const [name, session] of this.agents) {
|
||||
const status = session.getStatus();
|
||||
if (status.channels.includes(channelId)) {
|
||||
return session.deliverToChannel(channelId, chatId, options);
|
||||
}
|
||||
}
|
||||
throw new Error(`No agent owns channel: ${channelId}`);
|
||||
}
|
||||
}
|
||||
@@ -5,5 +5,7 @@
|
||||
export * from './types.js';
|
||||
export * from './store.js';
|
||||
export * from './bot.js';
|
||||
export * from './interfaces.js';
|
||||
export * from './gateway.js';
|
||||
export * from './formatter.js';
|
||||
export * from './prompts.js';
|
||||
|
||||
56
src/core/interfaces.ts
Normal file
56
src/core/interfaces.ts
Normal file
@@ -0,0 +1,56 @@
|
||||
/**
|
||||
* AgentSession interface - the contract for agent communication.
|
||||
*
|
||||
* Consumers (cron, heartbeat, polling, API server) depend on this interface,
|
||||
* not the concrete LettaBot class. This enables multi-agent orchestration
|
||||
* via LettaGateway without changing consumer code.
|
||||
*/
|
||||
|
||||
import type { ChannelAdapter } from '../channels/types.js';
|
||||
import type { InboundMessage, TriggerContext } from './types.js';
|
||||
import type { GroupBatcher } from './group-batcher.js';
|
||||
|
||||
export interface AgentSession {
|
||||
/** Register a channel adapter */
|
||||
registerChannel(adapter: ChannelAdapter): void;
|
||||
|
||||
/** Configure group message batching */
|
||||
setGroupBatcher(batcher: GroupBatcher, intervals: Map<string, number>, instantGroupIds?: Set<string>): void;
|
||||
|
||||
/** Process a batched group message */
|
||||
processGroupBatch(msg: InboundMessage, adapter: ChannelAdapter): void;
|
||||
|
||||
/** Start all registered channels */
|
||||
start(): Promise<void>;
|
||||
|
||||
/** Stop all channels */
|
||||
stop(): Promise<void>;
|
||||
|
||||
/** Send a message to the agent (used by cron, heartbeat, polling) */
|
||||
sendToAgent(text: string, context?: TriggerContext): Promise<string>;
|
||||
|
||||
/** Deliver a message/file to a specific channel */
|
||||
deliverToChannel(channelId: string, chatId: string, options: {
|
||||
text?: string;
|
||||
filePath?: string;
|
||||
kind?: 'image' | 'file';
|
||||
}): Promise<string | undefined>;
|
||||
|
||||
/** Get agent status */
|
||||
getStatus(): { agentId: string | null; channels: string[] };
|
||||
|
||||
/** Set agent ID (for container deploys) */
|
||||
setAgentId(agentId: string): void;
|
||||
|
||||
/** Reset agent state */
|
||||
reset(): void;
|
||||
|
||||
/** Get the last message target (for heartbeat delivery) */
|
||||
getLastMessageTarget(): { channel: string; chatId: string } | null;
|
||||
|
||||
/** Get the time of the last user message (for heartbeat skip logic) */
|
||||
getLastUserMessageTime(): Date | null;
|
||||
|
||||
/** Callback to trigger heartbeat */
|
||||
onTriggerHeartbeat?: () => Promise<void>;
|
||||
}
|
||||
@@ -9,7 +9,7 @@
|
||||
|
||||
import { appendFileSync, mkdirSync } from 'node:fs';
|
||||
import { resolve, dirname } from 'node:path';
|
||||
import type { LettaBot } from '../core/bot.js';
|
||||
import type { AgentSession } from '../core/interfaces.js';
|
||||
import type { TriggerContext } from '../core/types.js';
|
||||
import { buildHeartbeatPrompt } from '../core/prompts.js';
|
||||
import { getDataDir } from '../utils/paths.js';
|
||||
@@ -57,11 +57,11 @@ export interface HeartbeatConfig {
|
||||
* Heartbeat Service
|
||||
*/
|
||||
export class HeartbeatService {
|
||||
private bot: LettaBot;
|
||||
private bot: AgentSession;
|
||||
private config: HeartbeatConfig;
|
||||
private intervalId: NodeJS.Timeout | null = null;
|
||||
|
||||
constructor(bot: LettaBot, config: HeartbeatConfig) {
|
||||
constructor(bot: AgentSession, config: HeartbeatConfig) {
|
||||
this.bot = bot;
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
|
||||
import { existsSync, readFileSync, writeFileSync, appendFileSync, mkdirSync, watch, type FSWatcher } from 'node:fs';
|
||||
import { resolve, dirname } from 'node:path';
|
||||
import type { LettaBot } from '../core/bot.js';
|
||||
import type { AgentSession } from '../core/interfaces.js';
|
||||
import type { CronJob, CronJobCreate, CronSchedule, CronConfig, HeartbeatConfig } from './types.js';
|
||||
import { DEFAULT_HEARTBEAT_MESSAGES } from './types.js';
|
||||
import { getDataDir } from '../utils/paths.js';
|
||||
@@ -49,7 +49,7 @@ const DEFAULT_HEARTBEAT: HeartbeatConfig = {
|
||||
export class CronService {
|
||||
private jobs: Map<string, CronJob> = new Map();
|
||||
private scheduledJobs: Map<string, import('node-schedule').Job> = new Map();
|
||||
private bot: LettaBot;
|
||||
private bot: AgentSession;
|
||||
private storePath: string;
|
||||
private config: CronConfig;
|
||||
private started = false;
|
||||
@@ -57,7 +57,7 @@ export class CronService {
|
||||
private fileWatcher: FSWatcher | null = null;
|
||||
private lastFileContent: string = '';
|
||||
|
||||
constructor(bot: LettaBot, config?: CronConfig) {
|
||||
constructor(bot: AgentSession, config?: CronConfig) {
|
||||
this.bot = bot;
|
||||
this.config = config || {};
|
||||
this.storePath = config?.storePath
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
import { spawnSync } from 'node:child_process';
|
||||
import { existsSync, readFileSync, writeFileSync } from 'node:fs';
|
||||
import { join } from 'node:path';
|
||||
import { LettaBot } from '../core/bot.js';
|
||||
import type { AgentSession } from '../core/interfaces.js';
|
||||
|
||||
export interface PollingConfig {
|
||||
intervalMs: number; // Polling interval in milliseconds
|
||||
@@ -21,14 +21,14 @@ export interface PollingConfig {
|
||||
|
||||
export class PollingService {
|
||||
private intervalId: ReturnType<typeof setInterval> | null = null;
|
||||
private bot: LettaBot;
|
||||
private bot: AgentSession;
|
||||
private config: PollingConfig;
|
||||
|
||||
// Track seen email IDs to detect new emails (persisted to disk)
|
||||
private seenEmailIds: Set<string> = new Set();
|
||||
private seenEmailsPath: string;
|
||||
|
||||
constructor(bot: LettaBot, config: PollingConfig) {
|
||||
constructor(bot: AgentSession, config: PollingConfig) {
|
||||
this.bot = bot;
|
||||
this.config = config;
|
||||
this.seenEmailsPath = join(config.workingDir, 'seen-emails.json');
|
||||
|
||||
Reference in New Issue
Block a user