From 7c3ab0449434914badb2334d7b2ac5c62e291317 Mon Sep 17 00:00:00 2001 From: Cameron Date: Fri, 6 Mar 2026 17:20:24 -0800 Subject: [PATCH] Low-risk simplification pass: bot pipeline, startup, API, onboarding, provider sync (#520) --- docs/getting-started.md | 2 +- package.json | 4 +- src/api/portal.html | 303 +++++++++++++ src/api/server.test.ts | 72 +++ src/api/server.ts | 511 ++++------------------ src/channels/factory.ts | 186 ++++++++ src/config/io.ts | 105 +++-- src/core/bot.ts | 279 +++++++----- src/core/result-guard.test.ts | 85 ++++ src/main.ts | 301 +------------ src/onboard.projection.test.ts | 195 +++++++++ src/onboard.ts | 772 +++++++++++++++++++-------------- src/startup/bootstrap.ts | 172 ++++++++ 13 files changed, 1799 insertions(+), 1188 deletions(-) create mode 100644 src/api/portal.html create mode 100644 src/channels/factory.ts create mode 100644 src/onboard.projection.test.ts create mode 100644 src/startup/bootstrap.ts diff --git a/docs/getting-started.md b/docs/getting-started.md index 6b092f0..e70a637 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -115,7 +115,7 @@ Pull the latest changes and rebuild: npm run update ``` -This resets the lockfile, pulls from git, installs dependencies, and rebuilds. If you've modified source files locally, stash them first with `git stash`. +This performs a fast-forward-only pull, installs dependencies, and rebuilds without resetting tracked files. ## Next Steps diff --git a/package.json b/package.json index 47d8ec1..a066072 100644 --- a/package.json +++ b/package.json @@ -15,7 +15,7 @@ "setup": "tsx src/setup.ts", "dev": "tsx src/main.ts", "build": "tsc", - "postbuild": "cp -r src/looms/*.txt dist/looms/ && node scripts/fix-bin-permissions.mjs", + "postbuild": "cp -r src/looms/*.txt dist/looms/ && cp src/api/portal.html dist/api/portal.html && node scripts/fix-bin-permissions.mjs", "prepare": "npx patch-package || true", "prepublishOnly": "npm run build && npm run test:run", "start": "node dist/main.js", @@ -28,7 +28,7 @@ "skills:status": "tsx src/cli.ts skills status", "cron": "tsx src/cron/cli.ts", "pairing": "tsx src/cli.ts pairing", - "update": "git checkout -- package-lock.json && git pull && npm ci && npm run build", + "update": "git pull --ff-only && npm ci && npm run build", "skill:install": "npx clawdhub install --dir ~/.letta/skills", "skill:search": "npx clawdhub search", "skill:list": "npx clawdhub list --dir ~/.letta/skills", diff --git a/src/api/portal.html b/src/api/portal.html new file mode 100644 index 0000000..8886097 --- /dev/null +++ b/src/api/portal.html @@ -0,0 +1,303 @@ + + + + + +LettaBot Portal + + + +
+

LettaBot Portal

+ +
+ + + + +
+ + +
+
+ + + + \ No newline at end of file diff --git a/src/api/server.test.ts b/src/api/server.test.ts index dc9a8dd..61d8a93 100644 --- a/src/api/server.test.ts +++ b/src/api/server.test.ts @@ -211,6 +211,78 @@ describe('POST /api/v1/chat', () => { }); }); +describe('POST /api/v1/chat/async', () => { + let server: http.Server; + let port: number; + let router: AgentRouter; + + beforeAll(async () => { + router = createMockRouter(); + server = createApiServer(router, { + port: TEST_PORT, + apiKey: TEST_API_KEY, + host: '127.0.0.1', + }); + await new Promise((resolve) => { + if (server.listening) { resolve(); return; } + server.once('listening', resolve); + }); + port = getPort(server); + }); + + afterAll(async () => { + await new Promise((resolve) => server.close(() => resolve())); + }); + + it('reuses shared validation: content-type guard', async () => { + const res = await request(port, 'POST', '/api/v1/chat/async', 'hello', { + 'content-type': 'text/plain', + 'x-api-key': TEST_API_KEY, + }); + expect(res.status).toBe(400); + expect(JSON.parse(res.body).error).toContain('application/json'); + }); + + it('reuses shared validation: missing message', async () => { + const res = await request(port, 'POST', '/api/v1/chat/async', '{"agent":"LettaBot"}', { + 'content-type': 'application/json', + 'x-api-key': TEST_API_KEY, + }); + expect(res.status).toBe(400); + expect(JSON.parse(res.body).error).toContain('message'); + }); + + it('reuses shared validation: unknown agent', async () => { + const res = await request(port, 'POST', '/api/v1/chat/async', '{"message":"hi","agent":"unknown"}', { + 'content-type': 'application/json', + 'x-api-key': TEST_API_KEY, + }); + expect(res.status).toBe(404); + expect(JSON.parse(res.body).error).toContain('Agent not found'); + expect(JSON.parse(res.body).error).toContain('LettaBot'); + }); + + it('returns 202 and queues background delivery', async () => { + (router as any).sendToAgent = vi.fn().mockResolvedValue('done'); + + const res = await request(port, 'POST', '/api/v1/chat/async', '{"message":"queue me"}', { + 'content-type': 'application/json', + 'x-api-key': TEST_API_KEY, + }); + expect(res.status).toBe(202); + const body = JSON.parse(res.body); + expect(body.success).toBe(true); + expect(body.status).toBe('queued'); + expect(body.agentName).toBe('LettaBot'); + expect((router as any).sendToAgent).toHaveBeenCalledWith( + undefined, + 'queue me', + { type: 'webhook', outputMode: 'silent' }, + ); + + }); +}); + describe('GET /portal', () => { let server: http.Server; let port: number; diff --git a/src/api/server.ts b/src/api/server.ts index 895f3b9..9d263a5 100644 --- a/src/api/server.ts +++ b/src/api/server.ts @@ -6,7 +6,7 @@ import * as http from 'http'; import * as fs from 'fs'; import { validateApiKey } from './auth.js'; -import type { SendMessageRequest, SendMessageResponse, SendFileResponse, ChatRequest, ChatResponse, AsyncChatResponse, PairingListResponse, PairingApproveRequest, PairingApproveResponse } from './types.js'; +import type { SendMessageResponse, ChatRequest, ChatResponse, AsyncChatResponse, PairingListResponse, PairingApproveRequest, PairingApproveResponse } from './types.js'; import { listPairingRequests, approvePairingCode } from '../pairing/store.js'; import { parseMultipart } from './multipart.js'; import type { AgentRouter } from '../core/interfaces.js'; @@ -26,6 +26,14 @@ const VALID_CHANNELS: ChannelId[] = ['telegram', 'slack', 'discord', 'whatsapp', const MAX_BODY_SIZE = 10 * 1024; // 10KB const MAX_TEXT_LENGTH = 10000; // 10k chars const MAX_FILE_SIZE = 50 * 1024 * 1024; // 50MB +const WEBHOOK_CONTEXT = { type: 'webhook' as const, outputMode: 'silent' as const }; +const PORTAL_HTML = fs.readFileSync(new URL('./portal.html', import.meta.url), 'utf-8'); + +type ResolvedChatRequest = { + message: string; + agentName: string | undefined; + resolvedName: string; +}; interface ServerOptions { port: number; @@ -138,49 +146,11 @@ export function createApiServer(deliverer: AgentRouter, options: ServerOptions): // Route: POST /api/v1/chat (send a message to the agent, get response) if (req.url === '/api/v1/chat' && req.method === 'POST') { try { - if (!validateApiKey(req.headers, options.apiKey)) { - sendError(res, 401, 'Unauthorized'); + const resolved = await parseWebhookChatRequest(req, res, options.apiKey, deliverer); + if (!resolved) { return; } - - const contentType = req.headers['content-type'] || ''; - if (!contentType.includes('application/json')) { - sendError(res, 400, 'Content-Type must be application/json'); - return; - } - - const body = await readBody(req, MAX_BODY_SIZE); - let chatReq: ChatRequest; - try { - chatReq = JSON.parse(body); - } catch { - sendError(res, 400, 'Invalid JSON body'); - return; - } - - if (!chatReq.message || typeof chatReq.message !== 'string') { - sendError(res, 400, 'Missing required field: message'); - return; - } - - if (chatReq.message.length > MAX_TEXT_LENGTH) { - sendError(res, 400, `Message too long (max ${MAX_TEXT_LENGTH} chars)`); - return; - } - - // Resolve agent name (defaults to first agent) - const agentName = chatReq.agent; - const agentNames = deliverer.getAgentNames(); - const resolvedName = agentName || agentNames[0]; - - if (agentName && !agentNames.includes(agentName)) { - sendError(res, 404, `Agent not found: ${agentName}. Available: ${agentNames.join(', ')}`); - return; - } - - log.info(`Chat request for agent "${resolvedName}": ${chatReq.message.slice(0, 100)}...`); - - const context = { type: 'webhook' as const, outputMode: 'silent' as const }; + log.info(`Chat request for agent "${resolved.resolvedName}": ${resolved.message.slice(0, 100)}...`); const wantsStream = (req.headers.accept || '').includes('text/event-stream'); if (wantsStream) { @@ -195,7 +165,7 @@ export function createApiServer(deliverer: AgentRouter, options: ServerOptions): req.on('close', () => { clientDisconnected = true; }); try { - for await (const msg of deliverer.streamToAgent(agentName, chatReq.message, context)) { + for await (const msg of deliverer.streamToAgent(resolved.agentName, resolved.message, WEBHOOK_CONTEXT)) { if (clientDisconnected) break; res.write(`data: ${JSON.stringify(msg)}\n\n`); if (msg.type === 'result') break; @@ -208,12 +178,12 @@ export function createApiServer(deliverer: AgentRouter, options: ServerOptions): res.end(); } else { // Sync: wait for full response - const response = await deliverer.sendToAgent(agentName, chatReq.message, context); + const response = await deliverer.sendToAgent(resolved.agentName, resolved.message, WEBHOOK_CONTEXT); const chatRes: ChatResponse = { success: true, response, - agentName: resolvedName, + agentName: resolved.resolvedName, }; res.writeHead(200, { 'Content-Type': 'application/json' }); res.end(JSON.stringify(chatRes)); @@ -233,60 +203,24 @@ export function createApiServer(deliverer: AgentRouter, options: ServerOptions): // Route: POST /api/v1/chat/async (fire-and-forget: returns 202, processes in background) if (req.url === '/api/v1/chat/async' && req.method === 'POST') { try { - if (!validateApiKey(req.headers, options.apiKey)) { - sendError(res, 401, 'Unauthorized'); + const resolved = await parseWebhookChatRequest(req, res, options.apiKey, deliverer); + if (!resolved) { return; } - - const contentType = req.headers['content-type'] || ''; - if (!contentType.includes('application/json')) { - sendError(res, 400, 'Content-Type must be application/json'); - return; - } - - const body = await readBody(req, MAX_BODY_SIZE); - let chatReq: ChatRequest; - try { - chatReq = JSON.parse(body); - } catch { - sendError(res, 400, 'Invalid JSON body'); - return; - } - - if (!chatReq.message || typeof chatReq.message !== 'string') { - sendError(res, 400, 'Missing required field: message'); - return; - } - - if (chatReq.message.length > MAX_TEXT_LENGTH) { - sendError(res, 400, `Message too long (max ${MAX_TEXT_LENGTH} chars)`); - return; - } - - const agentName = chatReq.agent; - const agentNames = deliverer.getAgentNames(); - const resolvedName = agentName || agentNames[0]; - - if (agentName && !agentNames.includes(agentName)) { - sendError(res, 404, `Agent not found: ${agentName}. Available: ${agentNames.join(', ')}`); - return; - } - - log.info(`Async chat request for agent "${resolvedName}": ${chatReq.message.slice(0, 100)}...`); + log.info(`Async chat request for agent "${resolved.resolvedName}": ${resolved.message.slice(0, 100)}...`); // Return 202 immediately const asyncRes: AsyncChatResponse = { success: true, status: 'queued', - agentName: resolvedName, + agentName: resolved.resolvedName, }; res.writeHead(202, { 'Content-Type': 'application/json' }); res.end(JSON.stringify(asyncRes)); // Process in background (detached promise) - const context = { type: 'webhook' as const, outputMode: 'silent' as const }; - deliverer.sendToAgent(agentName, chatReq.message, context).catch((error: any) => { - log.error(`Async chat background error for agent "${resolvedName}":`, error); + deliverer.sendToAgent(resolved.agentName, resolved.message, WEBHOOK_CONTEXT).catch((error: any) => { + log.error(`Async chat background error for agent "${resolved.resolvedName}":`, error); }); } catch (error: any) { log.error('Async chat error:', error); @@ -706,7 +640,7 @@ export function createApiServer(deliverer: AgentRouter, options: ServerOptions): // Route: GET /portal - Admin portal for pairing approvals if ((req.url === '/portal' || req.url === '/portal/') && req.method === 'GET') { res.writeHead(200, { 'Content-Type': 'text/html; charset=utf-8' }); - res.end(portalHtml); + res.end(PORTAL_HTML); return; } @@ -752,35 +686,83 @@ function readBody(req: http.IncomingMessage, maxSize: number): Promise { }); } -/** - * Validate send message request - */ -function validateRequest(request: SendMessageRequest): { message: string; field?: string } | null { - if (!request.channel) { - return { message: 'Missing required field: channel', field: 'channel' }; +function ensureAuthorized(req: http.IncomingMessage, res: http.ServerResponse, apiKey: string): boolean { + if (validateApiKey(req.headers, apiKey)) { + return true; + } + sendError(res, 401, 'Unauthorized'); + return false; +} + +function ensureJsonContentType(req: http.IncomingMessage, res: http.ServerResponse): boolean { + const contentType = req.headers['content-type'] || ''; + if (contentType.includes('application/json')) { + return true; + } + sendError(res, 400, 'Content-Type must be application/json'); + return false; +} + +async function parseJsonBody(req: http.IncomingMessage, res: http.ServerResponse): Promise { + const body = await readBody(req, MAX_BODY_SIZE); + try { + return JSON.parse(body) as T; + } catch { + sendError(res, 400, 'Invalid JSON body'); + return null; + } +} + +function resolveAgentNameOrError( + deliverer: AgentRouter, + requestedAgentName: string | undefined, + res: http.ServerResponse, +): { agentName: string | undefined; resolvedName: string } | null { + const agentNames = deliverer.getAgentNames(); + const resolvedName = requestedAgentName || agentNames[0]; + if (requestedAgentName && !agentNames.includes(requestedAgentName)) { + sendError(res, 404, `Agent not found: ${requestedAgentName}. Available: ${agentNames.join(', ')}`); + return null; + } + return { agentName: requestedAgentName, resolvedName }; +} + +async function parseWebhookChatRequest( + req: http.IncomingMessage, + res: http.ServerResponse, + apiKey: string, + deliverer: AgentRouter, +): Promise { + if (!ensureAuthorized(req, res, apiKey)) { + return null; + } + if (!ensureJsonContentType(req, res)) { + return null; } - if (!request.chatId) { - return { message: 'Missing required field: chatId', field: 'chatId' }; + const chatReq = await parseJsonBody(req, res); + if (!chatReq) { + return null; + } + if (!chatReq.message || typeof chatReq.message !== 'string') { + sendError(res, 400, 'Missing required field: message'); + return null; + } + if (chatReq.message.length > MAX_TEXT_LENGTH) { + sendError(res, 400, `Message too long (max ${MAX_TEXT_LENGTH} chars)`); + return null; } - if (!request.text) { - return { message: 'Missing required field: text', field: 'text' }; + const agent = resolveAgentNameOrError(deliverer, chatReq.agent, res); + if (!agent) { + return null; } - if (!VALID_CHANNELS.includes(request.channel as ChannelId)) { - return { message: `Invalid channel: ${request.channel}`, field: 'channel' }; - } - - if (typeof request.text !== 'string') { - return { message: 'Field "text" must be a string', field: 'text' }; - } - - if (request.text.length > MAX_TEXT_LENGTH) { - return { message: `Text too long (max ${MAX_TEXT_LENGTH} chars)`, field: 'text' }; - } - - return null; + return { + message: chatReq.message, + agentName: agent.agentName, + resolvedName: agent.resolvedName, + }; } /** @@ -795,310 +777,3 @@ function sendError(res: http.ServerResponse, status: number, message: string, fi res.writeHead(status, { 'Content-Type': 'application/json' }); res.end(JSON.stringify(response)); } - -/** - * Admin portal HTML - self-contained page for pairing approvals - */ -const portalHtml = ` - - - - -LettaBot Portal - - - -
-

LettaBot Portal

- -
- - - - -
- - -
-
- - - -`; diff --git a/src/channels/factory.ts b/src/channels/factory.ts new file mode 100644 index 0000000..1235374 --- /dev/null +++ b/src/channels/factory.ts @@ -0,0 +1,186 @@ +import { DiscordAdapter } from './discord.js'; +import { SignalAdapter } from './signal.js'; +import { SlackAdapter } from './slack.js'; +import { TelegramMTProtoAdapter } from './telegram-mtproto.js'; +import { TelegramAdapter } from './telegram.js'; +import type { ChannelAdapter } from './types.js'; +import { WhatsAppAdapter } from './whatsapp/index.js'; +import type { AgentConfig } from '../config/types.js'; +import { createLogger } from '../logger.js'; + +const log = createLogger('Config'); + +type SharedFactoryOptions = { + attachmentsDir: string; + attachmentsMaxBytes: number; +}; + +type SharedChannelBuilder = { + isEnabled: (agentConfig: AgentConfig) => boolean; + build: (agentConfig: AgentConfig, options: SharedFactoryOptions) => ChannelAdapter; +}; + +function nonEmpty(values: T[] | undefined): T[] | undefined { + return values && values.length > 0 ? values : undefined; +} + +function parseUserIds(values: Array | undefined): number[] | undefined { + const normalized = nonEmpty(values); + if (!normalized) return undefined; + return normalized.map((value) => (typeof value === 'string' ? parseInt(value, 10) : value)); +} + +const SHARED_CHANNEL_BUILDERS: SharedChannelBuilder[] = [ + { + isEnabled: (agentConfig) => !!(agentConfig.channels.slack?.botToken && agentConfig.channels.slack?.appToken), + build: (agentConfig, options) => { + const slack = agentConfig.channels.slack; + if (!slack?.botToken || !slack.appToken) { + throw new Error(`Slack is enabled for agent "${agentConfig.name}" but required tokens are missing`); + } + return new SlackAdapter({ + botToken: slack.botToken, + appToken: slack.appToken, + dmPolicy: slack.dmPolicy || 'pairing', + allowedUsers: nonEmpty(slack.allowedUsers), + streaming: slack.streaming, + attachmentsDir: options.attachmentsDir, + attachmentsMaxBytes: options.attachmentsMaxBytes, + groups: slack.groups, + agentName: agentConfig.name, + }); + }, + }, + { + isEnabled: (agentConfig) => !!agentConfig.channels.whatsapp?.enabled, + build: (agentConfig, options) => { + const selfChatMode = agentConfig.channels.whatsapp!.selfChat ?? true; + if (!selfChatMode) { + log.warn('WARNING: selfChatMode is OFF - bot will respond to ALL incoming messages!'); + log.warn('Only use this if this is a dedicated bot number, not your personal WhatsApp.'); + } + return new WhatsAppAdapter({ + sessionPath: agentConfig.channels.whatsapp!.sessionPath || process.env.WHATSAPP_SESSION_PATH || './data/whatsapp-session', + dmPolicy: agentConfig.channels.whatsapp!.dmPolicy || 'pairing', + allowedUsers: nonEmpty(agentConfig.channels.whatsapp!.allowedUsers), + selfChatMode, + attachmentsDir: options.attachmentsDir, + attachmentsMaxBytes: options.attachmentsMaxBytes, + groups: agentConfig.channels.whatsapp!.groups, + mentionPatterns: agentConfig.channels.whatsapp!.mentionPatterns, + agentName: agentConfig.name, + }); + }, + }, + { + isEnabled: (agentConfig) => !!agentConfig.channels.signal?.phone, + build: (agentConfig, options) => { + const signal = agentConfig.channels.signal; + if (!signal?.phone) { + throw new Error(`Signal is enabled for agent "${agentConfig.name}" but phone is missing`); + } + const selfChatMode = signal.selfChat ?? true; + if (!selfChatMode) { + log.warn('WARNING: selfChatMode is OFF - bot will respond to ALL incoming messages!'); + log.warn('Only use this if this is a dedicated bot number, not your personal Signal.'); + } + return new SignalAdapter({ + phoneNumber: signal.phone, + cliPath: signal.cliPath || process.env.SIGNAL_CLI_PATH || 'signal-cli', + httpHost: signal.httpHost || process.env.SIGNAL_HTTP_HOST || '127.0.0.1', + httpPort: signal.httpPort || parseInt(process.env.SIGNAL_HTTP_PORT || '8090', 10), + dmPolicy: signal.dmPolicy || 'pairing', + allowedUsers: nonEmpty(signal.allowedUsers), + selfChatMode, + attachmentsDir: options.attachmentsDir, + attachmentsMaxBytes: options.attachmentsMaxBytes, + groups: signal.groups, + mentionPatterns: signal.mentionPatterns, + agentName: agentConfig.name, + }); + }, + }, + { + isEnabled: (agentConfig) => !!agentConfig.channels.discord?.token, + build: (agentConfig, options) => { + const discord = agentConfig.channels.discord; + if (!discord?.token) { + throw new Error(`Discord is enabled for agent "${agentConfig.name}" but token is missing`); + } + return new DiscordAdapter({ + token: discord.token, + dmPolicy: discord.dmPolicy || 'pairing', + allowedUsers: nonEmpty(discord.allowedUsers), + streaming: discord.streaming, + attachmentsDir: options.attachmentsDir, + attachmentsMaxBytes: options.attachmentsMaxBytes, + groups: discord.groups, + agentName: agentConfig.name, + ignoreBotReactions: discord.ignoreBotReactions, + }); + }, + }, +]; + +/** + * Create channel adapters for an agent from its config. + * Uses a table-driven builder for shared channel setup while preserving + * Telegram-specific mutual-exclusion checks. + */ +export function createChannelsForAgent( + agentConfig: AgentConfig, + attachmentsDir: string, + attachmentsMaxBytes: number, +): ChannelAdapter[] { + const adapters: ChannelAdapter[] = []; + const sharedOptions = { attachmentsDir, attachmentsMaxBytes }; + + const hasTelegramBot = !!agentConfig.channels.telegram?.token; + const hasTelegramMtproto = !!agentConfig.channels['telegram-mtproto']?.apiId; + if (hasTelegramBot && hasTelegramMtproto) { + log.error(`Agent "${agentConfig.name}" has both telegram and telegram-mtproto configured.`); + log.error(' The Bot API adapter and MTProto adapter cannot run together.'); + log.error('Choose one: telegram (bot token) or telegram-mtproto (user account).'); + process.exit(1); + } + + if (hasTelegramBot) { + adapters.push(new TelegramAdapter({ + token: agentConfig.channels.telegram!.token!, + dmPolicy: agentConfig.channels.telegram!.dmPolicy || 'pairing', + allowedUsers: parseUserIds(agentConfig.channels.telegram!.allowedUsers), + streaming: agentConfig.channels.telegram!.streaming, + attachmentsDir, + attachmentsMaxBytes, + groups: agentConfig.channels.telegram!.groups, + mentionPatterns: agentConfig.channels.telegram!.mentionPatterns, + agentName: agentConfig.name, + })); + } + + if (hasTelegramMtproto) { + const mtprotoConfig = agentConfig.channels['telegram-mtproto']!; + if (mtprotoConfig.apiId === undefined || !mtprotoConfig.apiHash || !mtprotoConfig.phoneNumber) { + log.error(`Agent "${agentConfig.name}" has incomplete telegram-mtproto config (requires apiId, apiHash, phoneNumber).`); + process.exit(1); + } + adapters.push(new TelegramMTProtoAdapter({ + apiId: mtprotoConfig.apiId, + apiHash: mtprotoConfig.apiHash, + phoneNumber: mtprotoConfig.phoneNumber, + databaseDirectory: mtprotoConfig.databaseDirectory || './data/telegram-mtproto', + dmPolicy: mtprotoConfig.dmPolicy || 'pairing', + allowedUsers: parseUserIds(mtprotoConfig.allowedUsers), + groupPolicy: mtprotoConfig.groupPolicy || 'both', + adminChatId: mtprotoConfig.adminChatId, + })); + } + + for (const builder of SHARED_CHANNEL_BUILDERS) { + if (builder.isEnabled(agentConfig)) { + adapters.push(builder.build(agentConfig, sharedOptions)); + } + } + + return adapters; +} diff --git a/src/config/io.ts b/src/config/io.ts index 6d4cfa1..69244bf 100644 --- a/src/config/io.ts +++ b/src/config/io.ts @@ -527,6 +527,66 @@ export function applyConfigToEnv(config: LettaBotConfig): void { } } +async function listProviders(apiKey: string): Promise> { + const listResponse = await fetch(`${LETTA_API_URL}/v1/providers`, { + headers: { + 'Content-Type': 'application/json', + 'Authorization': `Bearer ${apiKey}`, + }, + }); + + if (!listResponse.ok) { + throw new Error(`Failed to list providers: ${listResponse.status} ${listResponse.statusText}`); + } + + return listResponse.json() as Promise>; +} + +/** + * Create or update a BYOK provider on Letta API. + * Returns whether the provider was created or updated. + */ +export async function upsertProvider( + apiKey: string, + provider: ProviderConfig, + knownProviders?: Array<{ id: string; name: string }>, +): Promise<'created' | 'updated'> { + const existingProviders = knownProviders ?? await listProviders(apiKey); + const existing = existingProviders.find((p) => p.name === provider.name); + + if (existing) { + const response = await fetch(`${LETTA_API_URL}/v1/providers/${existing.id}`, { + method: 'PATCH', + headers: { + 'Content-Type': 'application/json', + 'Authorization': `Bearer ${apiKey}`, + }, + body: JSON.stringify({ api_key: provider.apiKey }), + }); + if (!response.ok) { + throw new Error(`Failed to update provider ${provider.name}: ${response.status} ${response.statusText}`); + } + return 'updated'; + } + + const response = await fetch(`${LETTA_API_URL}/v1/providers`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Authorization': `Bearer ${apiKey}`, + }, + body: JSON.stringify({ + name: provider.name, + provider_type: provider.type, + api_key: provider.apiKey, + }), + }); + if (!response.ok) { + throw new Error(`Failed to create provider ${provider.name}: ${response.status} ${response.statusText}`); + } + return 'created'; +} + /** * Create BYOK providers on Letta API */ @@ -540,52 +600,15 @@ export async function syncProviders(config: Partial & Pick - : []; + // List existing providers once, then pass to each upsert call. + const existingProviders = await listProviders(apiKey).catch(() => [] as Array<{ id: string; name: string }>); // Create or update each provider for (const provider of config.providers) { - const existing = existingProviders.find(p => p.name === provider.name); - try { - if (existing) { - // Update existing - await fetch(`${baseUrl}/v1/providers/${existing.id}`, { - method: 'PATCH', - headers: { - 'Content-Type': 'application/json', - 'Authorization': `Bearer ${apiKey}`, - }, - body: JSON.stringify({ api_key: provider.apiKey }), - }); - log.info(`Updated provider: ${provider.name}`); - } else { - // Create new - await fetch(`${baseUrl}/v1/providers`, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'Authorization': `Bearer ${apiKey}`, - }, - body: JSON.stringify({ - name: provider.name, - provider_type: provider.type, - api_key: provider.apiKey, - }), - }); - log.info(`Created provider: ${provider.name}`); - } + const action = await upsertProvider(apiKey, provider, existingProviders); + log.info(`${action === 'updated' ? 'Updated' : 'Created'} provider: ${provider.name}`); } catch (err) { log.error(`Failed to sync provider ${provider.name}:`, err); } diff --git a/src/core/bot.ts b/src/core/bot.ts index 85abf09..7a454f4 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -40,6 +40,22 @@ const AUDIO_FILE_EXTENSIONS = new Set([ '.ogg', '.opus', '.mp3', '.m4a', '.wav', '.aac', '.flac', ]); +type StreamErrorDetail = { + message: string; + stopReason: string; + apiError?: Record; + isApprovalError?: boolean; +}; + +type ResultRetryDecision = { + isTerminalError: boolean; + isConflictError: boolean; + isApprovalConflict: boolean; + isNonRetryableError: boolean; + shouldRetryForEmptyResult: boolean; + shouldRetryForErrorResult: boolean; +}; + /** Infer whether a file is an image, audio, or generic file based on extension. */ export function inferFileKind(filePath: string): 'image' | 'file' | 'audio' { const ext = extname(filePath).toLowerCase(); @@ -874,18 +890,48 @@ export class LettaBot implements AgentSession { this.processing = false; } - // ========================================================================= - // processMessage - User-facing message handling - // ========================================================================= - - private async processMessage(msg: InboundMessage, adapter: ChannelAdapter, retried = false): Promise { - // Track timing and last target - const debugTiming = !!process.env.LETTABOT_DEBUG_TIMING; - const t0 = debugTiming ? performance.now() : 0; - const lap = (label: string) => { - log.debug(`${label}: ${(performance.now() - t0).toFixed(0)}ms`); + private buildCanUseToolCallback(msg: InboundMessage, adapter: ChannelAdapter): CanUseToolCallback { + return async (toolName, toolInput) => { + if (toolName === 'AskUserQuestion') { + const questions = (toolInput.questions || []) as Array<{ + question: string; + header: string; + options: Array<{ label: string; description: string }>; + multiSelect: boolean; + }>; + const questionText = formatQuestionsForChannel(questions); + log.info(`AskUserQuestion: sending ${questions.length} question(s) to ${msg.channel}:${msg.chatId}`); + await adapter.sendMessage({ chatId: msg.chatId, text: questionText, threadId: msg.threadId }); + + // Wait for the user's next message (intercepted by handleMessage). + // Key by convKey so each chat resolves independently in per-chat mode. + const questionConvKey = this.resolveConversationKey(msg.channel, msg.chatId); + const answer = await new Promise((resolve) => { + this.pendingQuestionResolvers.set(questionConvKey, resolve); + }); + log.info(`AskUserQuestion: received answer (${answer.length} chars)`); + + const answers: Record = {}; + for (const q of questions) { + answers[q.question] = answer; + } + return { + behavior: 'allow' as const, + updatedInput: { ...toolInput, answers }, + }; + } + + // All other interactive tools: allow by default + return { behavior: 'allow' as const }; }; - const suppressDelivery = isResponseDeliverySuppressed(msg); + } + + private async prepareMessageForRun( + msg: InboundMessage, + adapter: ChannelAdapter, + suppressDelivery: boolean, + lap: (label: string) => void, + ): Promise<{ messageToSend: SendMessage; canUseTool: CanUseToolCallback } | null> { this.lastUserMessageTime = new Date(); // Skip heartbeat target update for listening mode (don't redirect heartbeats) @@ -920,10 +966,9 @@ export class LettaBot implements AgentSession { threadId: msg.threadId, }); } - return; + return null; } - // Format message with metadata envelope const prevTarget = this.store.lastMessageTarget; const isNewChatSession = !prevTarget || prevTarget.chatId !== msg.chatId || prevTarget.channel !== msg.channel; const sessionContext: SessionContextOptions | undefined = isNewChatSession ? { @@ -937,42 +982,100 @@ export class LettaBot implements AgentSession { const messageToSend = await buildMultimodalMessage(formattedText, msg); lap('format message'); - // Build AskUserQuestion-aware canUseTool callback with channel context. - // In bypassPermissions mode, this callback is only invoked for interactive - // tools (AskUserQuestion, ExitPlanMode) -- normal tools are auto-approved. - const canUseTool: CanUseToolCallback = async (toolName, toolInput) => { - if (toolName === 'AskUserQuestion') { - const questions = (toolInput.questions || []) as Array<{ - question: string; - header: string; - options: Array<{ label: string; description: string }>; - multiSelect: boolean; - }>; - const questionText = formatQuestionsForChannel(questions); - log.info(`AskUserQuestion: sending ${questions.length} question(s) to ${msg.channel}:${msg.chatId}`); - await adapter.sendMessage({ chatId: msg.chatId, text: questionText, threadId: msg.threadId }); + const canUseTool = this.buildCanUseToolCallback(msg, adapter); + return { messageToSend, canUseTool }; + } - // Wait for the user's next message (intercepted by handleMessage). - // Key by convKey so each chat resolves independently in per-chat mode. - const questionConvKey = this.resolveConversationKey(msg.channel, msg.chatId); - const answer = await new Promise((resolve) => { - this.pendingQuestionResolvers.set(questionConvKey, resolve); - }); - log.info(`AskUserQuestion: received answer (${answer.length} chars)`); + private isNonRetryableError(lastErrorDetail: StreamErrorDetail | null, isTerminalError: boolean): boolean { + if (!isTerminalError) return false; + const errMsg = lastErrorDetail?.message?.toLowerCase() || ''; + const errApiMsg = (typeof lastErrorDetail?.apiError?.message === 'string' + ? lastErrorDetail.apiError.message : '').toLowerCase(); + const errAny = errMsg + ' ' + errApiMsg; + return ( + errAny.includes('out of credits') || errAny.includes('usage limit') || + errAny.includes('401') || errAny.includes('403') || + errAny.includes('unauthorized') || errAny.includes('forbidden') || + errAny.includes('404') || + ((errAny.includes('agent') || errAny.includes('conversation')) && errAny.includes('not found')) || + errAny.includes('rate limit') || errAny.includes('429') + ); + } - // Map the user's response to each question - const answers: Record = {}; - for (const q of questions) { - answers[q.question] = answer; - } - return { - behavior: 'allow' as const, - updatedInput: { ...toolInput, answers }, - }; - } - // All other interactive tools: allow by default - return { behavior: 'allow' as const }; + private buildResultRetryDecision( + streamMsg: StreamMsg, + resultText: string, + hasResponse: boolean, + sentAnyMessage: boolean, + lastErrorDetail: StreamErrorDetail | null, + ): ResultRetryDecision { + const isTerminalError = streamMsg.success === false || !!streamMsg.error; + const nothingDelivered = !hasResponse && !sentAnyMessage; + const isConflictError = lastErrorDetail?.message?.toLowerCase().includes('conflict') || false; + const isApprovalConflict = (isConflictError && + lastErrorDetail?.message?.toLowerCase().includes('waiting for approval')) || + lastErrorDetail?.isApprovalError === true; + const isNonRetryableError = this.isNonRetryableError(lastErrorDetail, isTerminalError); + + return { + isTerminalError, + isConflictError, + isApprovalConflict, + isNonRetryableError, + shouldRetryForEmptyResult: streamMsg.success === true && resultText === '' && nothingDelivered, + shouldRetryForErrorResult: isTerminalError && nothingDelivered && !isConflictError && !isNonRetryableError, }; + } + + private async deliverNoVisibleResponseIfNeeded( + msg: InboundMessage, + adapter: ChannelAdapter, + sentAnyMessage: boolean, + receivedAnyData: boolean, + msgTypeCounts: Record, + ): Promise { + if (sentAnyMessage) return; + + if (!receivedAnyData) { + log.error('Stream received NO DATA - possible stuck state'); + await adapter.sendMessage({ + chatId: msg.chatId, + text: '(No response received -- the connection may have dropped or the server may be busy. Please try again. If this persists, /reset will start a fresh conversation.)', + threadId: msg.threadId, + }); + return; + } + + const hadToolActivity = (msgTypeCounts['tool_call'] || 0) > 0 || (msgTypeCounts['tool_result'] || 0) > 0; + if (hadToolActivity) { + log.info('Agent had tool activity but no assistant message - likely sent via tool'); + return; + } + + await adapter.sendMessage({ + chatId: msg.chatId, + text: '(The agent processed your message but didn\'t produce a visible response. This can happen with certain prompts. Try rephrasing or sending again.)', + threadId: msg.threadId, + }); + } + + // ========================================================================= + // processMessage - User-facing message handling + // ========================================================================= + + private async processMessage(msg: InboundMessage, adapter: ChannelAdapter, retried = false): Promise { + // Track timing and last target + const debugTiming = !!process.env.LETTABOT_DEBUG_TIMING; + const t0 = debugTiming ? performance.now() : 0; + const lap = (label: string) => { + log.debug(`${label}: ${(performance.now() - t0).toFixed(0)}ms`); + }; + const suppressDelivery = isResponseDeliverySuppressed(msg); + const prepared = await this.prepareMessageForRun(msg, adapter, suppressDelivery, lap); + if (!prepared) { + return; + } + const { messageToSend, canUseTool } = prepared; // Run session let session: Session | null = null; @@ -998,7 +1101,7 @@ export class LettaBot implements AgentSession { let sentAnyMessage = false; let receivedAnyData = false; let sawNonAssistantSinceLastUuid = false; - let lastErrorDetail: { message: string; stopReason: string; apiError?: Record; isApprovalError?: boolean } | null = null; + let lastErrorDetail: StreamErrorDetail | null = null; let retryInfo: { attempt: number; maxAttempts: number; reason: string } | null = null; let reasoningBuffer = ''; let expectedForegroundRunId: string | null = null; @@ -1388,7 +1491,6 @@ export class LettaBot implements AgentSession { } } const hasResponse = response.trim().length > 0; - const isTerminalError = streamMsg.success === false || !!streamMsg.error; log.info(`Stream result: seq=${seq} success=${streamMsg.success}, hasResponse=${hasResponse}, resultLen=${resultText.length}`); if (response.trim().length > 0) { log.debug(`Stream result preview: seq=${seq} responsePreview=${response.trim().slice(0, 60)}`); @@ -1419,7 +1521,6 @@ export class LettaBot implements AgentSession { // Only retry if we never sent anything to the user. hasResponse tracks // the current buffer, but finalizeMessage() clears it on type changes. // sentAnyMessage is the authoritative "did we deliver output" flag. - const nothingDelivered = !hasResponse && !sentAnyMessage; const retryConvKey = this.resolveConversationKey(msg.channel, msg.chatId); const retryConvIdFromStore = (retryConvKey === 'shared' ? this.store.conversationId @@ -1427,11 +1528,18 @@ export class LettaBot implements AgentSession { const retryConvId = (typeof streamMsg.conversationId === 'string' && streamMsg.conversationId.length > 0) ? streamMsg.conversationId : retryConvIdFromStore; + const initialRetryDecision = this.buildResultRetryDecision( + streamMsg, + resultText, + hasResponse, + sentAnyMessage, + lastErrorDetail, + ); // Enrich opaque error detail from run metadata (single fast API call). // The wire protocol's stop_reason often just says "error" -- the run // metadata has the actual detail (e.g. "waiting for approval on a tool call"). - if (isTerminalError && this.store.agentId && + if (initialRetryDecision.isTerminalError && this.store.agentId && (!lastErrorDetail || lastErrorDetail.message === 'Agent stopped: error')) { const enriched = await getLatestRunError(this.store.agentId, retryConvId); if (enriched) { @@ -1443,19 +1551,20 @@ export class LettaBot implements AgentSession { } } - // Don't retry on 409 CONFLICT -- the conversation is busy, retrying - // immediately will just get the same error and waste a session. - const isConflictError = lastErrorDetail?.message?.toLowerCase().includes('conflict') || false; + const retryDecision = this.buildResultRetryDecision( + streamMsg, + resultText, + hasResponse, + sentAnyMessage, + lastErrorDetail, + ); // For approval-specific conflicts, attempt recovery directly (don't // enter the generic retry path which would just get another CONFLICT). // Use isApprovalError from run metadata as a fallback when the // error message doesn't contain the expected strings (e.g. when // the type=error event was lost and enrichment detected a stuck run). - const isApprovalConflict = (isConflictError && - lastErrorDetail?.message?.toLowerCase().includes('waiting for approval')) || - lastErrorDetail?.isApprovalError === true; - if (isApprovalConflict && !retried && this.store.agentId) { + if (retryDecision.isApprovalConflict && !retried && this.store.agentId) { if (retryConvId) { log.info('Approval conflict detected -- attempting targeted recovery...'); this.sessionManager.invalidateSession(retryConvKey); @@ -1474,35 +1583,16 @@ export class LettaBot implements AgentSession { } } - // Non-retryable errors: billing, auth, not-found -- skip recovery/retry - // entirely and surface the error to the user immediately. - // Check both the top-level message and the nested apiError.message - // (the billing/auth string can appear in either location). - const errMsg = lastErrorDetail?.message?.toLowerCase() || ''; - const errApiMsg = (typeof lastErrorDetail?.apiError?.message === 'string' - ? lastErrorDetail.apiError.message : '').toLowerCase(); - const errAny = errMsg + ' ' + errApiMsg; - const isNonRetryableError = isTerminalError && ( - errAny.includes('out of credits') || errAny.includes('usage limit') || - errAny.includes('401') || errAny.includes('403') || - errAny.includes('unauthorized') || errAny.includes('forbidden') || - errAny.includes('404') || - ((errAny.includes('agent') || errAny.includes('conversation')) && errAny.includes('not found')) || - errAny.includes('rate limit') || errAny.includes('429') - ); - - const shouldRetryForEmptyResult = streamMsg.success && resultText === '' && nothingDelivered; - const shouldRetryForErrorResult = isTerminalError && nothingDelivered && !isConflictError && !isNonRetryableError; - if (shouldRetryForEmptyResult || shouldRetryForErrorResult) { - if (shouldRetryForEmptyResult) { + if (retryDecision.shouldRetryForEmptyResult || retryDecision.shouldRetryForErrorResult) { + if (retryDecision.shouldRetryForEmptyResult) { log.error(`Warning: Agent returned empty result with no response. stopReason=${streamMsg.stopReason || 'N/A'}, conv=${streamMsg.conversationId || 'N/A'}`); } - if (shouldRetryForErrorResult) { + if (retryDecision.shouldRetryForErrorResult) { log.error(`Warning: Agent returned terminal error (error=${streamMsg.error}, stopReason=${streamMsg.stopReason || 'N/A'}) with no response.`); } if (!retried && this.store.agentId && retryConvId) { - const reason = shouldRetryForErrorResult ? 'error result' : 'empty result'; + const reason = retryDecision.shouldRetryForErrorResult ? 'error result' : 'empty result'; log.info(`${reason} - attempting orphaned approval recovery...`); this.sessionManager.invalidateSession(retryConvKey); session = null; @@ -1519,14 +1609,14 @@ export class LettaBot implements AgentSession { // Some client-side approval failures do not surface as pending approvals. // Retry once anyway in case the previous run terminated mid-tool cycle. - if (shouldRetryForErrorResult) { + if (retryDecision.shouldRetryForErrorResult) { log.info('Retrying once after terminal error (no orphaned approvals detected)...'); return this.processMessage(msg, adapter, true); } } } - if (isTerminalError && !hasResponse && !sentAnyMessage) { + if (retryDecision.isTerminalError && !hasResponse && !sentAnyMessage) { if (lastErrorDetail) { response = formatApiErrorForUser(lastErrorDetail); } else { @@ -1627,28 +1717,7 @@ export class LettaBot implements AgentSession { } lap('message delivered'); - // Handle no response - if (!sentAnyMessage) { - if (!receivedAnyData) { - log.error('Stream received NO DATA - possible stuck state'); - await adapter.sendMessage({ - chatId: msg.chatId, - text: '(No response received -- the connection may have dropped or the server may be busy. Please try again. If this persists, /reset will start a fresh conversation.)', - threadId: msg.threadId - }); - } else { - const hadToolActivity = (msgTypeCounts['tool_call'] || 0) > 0 || (msgTypeCounts['tool_result'] || 0) > 0; - if (hadToolActivity) { - log.info('Agent had tool activity but no assistant message - likely sent via tool'); - } else { - await adapter.sendMessage({ - chatId: msg.chatId, - text: '(The agent processed your message but didn\'t produce a visible response. This can happen with certain prompts. Try rephrasing or sending again.)', - threadId: msg.threadId - }); - } - } - } + await this.deliverNoVisibleResponseIfNeeded(msg, adapter, sentAnyMessage, receivedAnyData, msgTypeCounts); } catch (error) { log.error('Error processing message:', error); @@ -1736,7 +1805,7 @@ export class LettaBot implements AgentSession { let sawStaleDuplicateResult = false; let approvalRetryPending = false; let usedMessageCli = false; - let lastErrorDetail: { message: string; stopReason: string; apiError?: Record; isApprovalError?: boolean } | undefined; + let lastErrorDetail: StreamErrorDetail | undefined; for await (const msg of stream()) { if (msg.type === 'tool_call') { this.sessionManager.syncTodoToolCall(msg); diff --git a/src/core/result-guard.test.ts b/src/core/result-guard.test.ts index d537f15..55dc3e3 100644 --- a/src/core/result-guard.test.ts +++ b/src/core/result-guard.test.ts @@ -300,4 +300,89 @@ describe('result divergence guard', () => { const sentTexts = adapter.sendMessage.mock.calls.map(([payload]) => payload.text); expect(sentTexts).toEqual(['main reply']); }); + + it('treats as intentional silence and does not deliver a visible message', async () => { + const bot = new LettaBot({ + workingDir: workDir, + allowedTools: [], + }); + + const adapter = { + id: 'mock', + name: 'Mock', + start: vi.fn(async () => {}), + stop: vi.fn(async () => {}), + isRunning: vi.fn(() => true), + sendMessage: vi.fn(async (_msg: OutboundMessage) => ({ messageId: 'msg-1' })), + editMessage: vi.fn(async () => {}), + sendTypingIndicator: vi.fn(async () => {}), + stopTypingIndicator: vi.fn(async () => {}), + supportsEditing: vi.fn(() => false), + sendFile: vi.fn(async () => ({ messageId: 'file-1' })), + }; + + (bot as any).sessionManager.runSession = vi.fn(async () => ({ + session: { abort: vi.fn(async () => {}) }, + stream: async function* () { + yield { type: 'assistant', content: '' }; + yield { type: 'result', success: true, result: '' }; + }, + })); + + const msg: InboundMessage = { + channel: 'discord', + chatId: 'chat-1', + userId: 'user-1', + text: 'hello', + timestamp: new Date(), + }; + + await (bot as any).processMessage(msg, adapter); + + expect(adapter.sendMessage).not.toHaveBeenCalled(); + expect(adapter.editMessage).not.toHaveBeenCalled(); + }); + + it('skips all post-stream delivery when message processing is cancelled', async () => { + const bot = new LettaBot({ + workingDir: workDir, + allowedTools: [], + }); + + const adapter = { + id: 'mock', + name: 'Mock', + start: vi.fn(async () => {}), + stop: vi.fn(async () => {}), + isRunning: vi.fn(() => true), + sendMessage: vi.fn(async (_msg: OutboundMessage) => ({ messageId: 'msg-1' })), + editMessage: vi.fn(async () => {}), + sendTypingIndicator: vi.fn(async () => {}), + stopTypingIndicator: vi.fn(async () => {}), + supportsEditing: vi.fn(() => false), + sendFile: vi.fn(async () => ({ messageId: 'file-1' })), + }; + + (bot as any).sessionManager.runSession = vi.fn(async () => ({ + session: { abort: vi.fn(async () => {}) }, + stream: async function* () { + yield { type: 'assistant', content: 'this should never be delivered' }; + yield { type: 'result', success: true, result: 'this should never be delivered' }; + }, + })); + + const msg: InboundMessage = { + channel: 'discord', + chatId: 'chat-1', + userId: 'user-1', + text: 'hello', + timestamp: new Date(), + }; + + (bot as any).cancelledKeys.add('shared'); + await (bot as any).processMessage(msg, adapter); + + expect(adapter.sendMessage).not.toHaveBeenCalled(); + expect(adapter.editMessage).not.toHaveBeenCalled(); + }); }); diff --git a/src/main.ts b/src/main.ts index c977270..91dbc16 100644 --- a/src/main.ts +++ b/src/main.ts @@ -5,9 +5,8 @@ * Chat continues seamlessly between Telegram, Slack, and WhatsApp. */ -import { existsSync, mkdirSync, readFileSync, promises as fs } from 'node:fs'; +import { existsSync, mkdirSync, promises as fs } from 'node:fs'; import { join, resolve } from 'node:path'; -import { spawn } from 'node:child_process'; // API server imports import { createApiServer } from './api/server.js'; @@ -25,11 +24,10 @@ import { serverModeLabel, wasLoadedFromFleetConfig, } from './config/index.js'; -import { isLettaApiUrl } from './utils/server.js'; import { getCronDataDir, getDataDir, getWorkingDir, hasRailwayVolume, resolveWorkingDirPath } from './utils/paths.js'; import { parseCsvList, parseNonNegativeNumber } from './utils/parse.js'; -import { sleep } from './utils/time.js'; import { createLogger, setLogLevel } from './logger.js'; +import { loadStoredAgentId, refreshTokensIfNeeded, withDiscoveryLock } from './startup/bootstrap.js'; const log = createLogger('Config'); @@ -58,122 +56,16 @@ if (process.env.DEBUG === '1' && !process.env.DEBUG_SDK) { // Sync BYOK providers on startup (async, don't block) syncProviders(yamlConfig).catch(err => log.error('Failed to sync providers:', err)); -// Load agent ID from store and set as env var (SDK needs this) -// Load agent ID from store file, or use LETTA_AGENT_ID env var as fallback const STORE_PATH = resolve(getDataDir(), 'lettabot-agent.json'); const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com'; - -if (existsSync(STORE_PATH)) { - try { - const raw = JSON.parse(readFileSync(STORE_PATH, 'utf-8')); - - // V2 format: get first agent's ID - if (raw.version === 2 && raw.agents) { - const firstAgent = Object.values(raw.agents)[0] as any; - if (firstAgent?.agentId) { - process.env.LETTA_AGENT_ID = firstAgent.agentId; - } - // Check server mismatch on first agent - if (firstAgent?.agentId && firstAgent?.baseUrl) { - const storedUrl = firstAgent.baseUrl.replace(/\/$/, ''); - const currentUrl = currentBaseUrl.replace(/\/$/, ''); - - if (storedUrl !== currentUrl) { - log.warn(`⚠️ Server mismatch detected!`); - log.warn(` Stored agent was created on: ${storedUrl}`); - log.warn(` Current server: ${currentUrl}`); - log.warn(` The agent ${firstAgent.agentId} may not exist on this server.`); - log.warn(` Run 'lettabot onboard' to select or create an agent for this server.`); - } - } - } else if (raw.agentId) { - // V1 format (legacy) - process.env.LETTA_AGENT_ID = raw.agentId; - // Check server mismatch - if (raw.agentId && raw.baseUrl) { - const storedUrl = raw.baseUrl.replace(/\/$/, ''); - const currentUrl = currentBaseUrl.replace(/\/$/, ''); - - if (storedUrl !== currentUrl) { - log.warn(`⚠️ Server mismatch detected!`); - log.warn(` Stored agent was created on: ${storedUrl}`); - log.warn(` Current server: ${currentUrl}`); - log.warn(` The agent ${raw.agentId} may not exist on this server.`); - log.warn(` Run 'lettabot onboard' to select or create an agent for this server.`); - } - } - } - } catch {} -} -// Allow LETTA_AGENT_ID env var to override (useful for local server testing) -// This is already set if passed on command line - -// OAuth token refresh - check and refresh before loading SDK -import { loadTokens, saveTokens, isTokenExpired, hasRefreshToken, getDeviceName } from './auth/tokens.js'; -import { refreshAccessToken } from './auth/oauth.js'; - -async function refreshTokensIfNeeded(): Promise { - // If env var is set, that takes precedence (no refresh needed) - if (process.env.LETTA_API_KEY) { - return; - } - - // OAuth tokens only work with Letta API - skip if using custom server - if (!isLettaApiUrl(process.env.LETTA_BASE_URL)) { - return; - } - - const tokens = loadTokens(); - if (!tokens?.accessToken) { - return; // No stored tokens - } - - // Set access token to env var - process.env.LETTA_API_KEY = tokens.accessToken; - - // Check if token needs refresh - if (isTokenExpired(tokens) && hasRefreshToken(tokens)) { - try { - log.info('Refreshing access token...'); - const newTokens = await refreshAccessToken( - tokens.refreshToken!, - tokens.deviceId, - getDeviceName(), - ); - - // Update stored tokens - const now = Date.now(); - saveTokens({ - accessToken: newTokens.access_token, - refreshToken: newTokens.refresh_token ?? tokens.refreshToken, - tokenExpiresAt: now + newTokens.expires_in * 1000, - deviceId: tokens.deviceId, - deviceName: tokens.deviceName, - }); - - // Update env var with new token - process.env.LETTA_API_KEY = newTokens.access_token; - log.info('Token refreshed successfully'); - } catch (err) { - log.error('Failed to refresh token:', err instanceof Error ? err.message : err); - log.error('You may need to re-authenticate with `lettabot onboard`'); - } - } -} - -// Run token refresh before importing SDK (which reads LETTA_API_KEY) +loadStoredAgentId(STORE_PATH, currentBaseUrl); await refreshTokensIfNeeded(); import { normalizeAgents } from './config/types.js'; import { LettaGateway } from './core/gateway.js'; import { LettaBot } from './core/bot.js'; import type { Store } from './core/store.js'; -import { TelegramAdapter } from './channels/telegram.js'; -import { TelegramMTProtoAdapter } from './channels/telegram-mtproto.js'; -import { SlackAdapter } from './channels/slack.js'; -import { WhatsAppAdapter } from './channels/whatsapp/index.js'; -import { SignalAdapter } from './channels/signal.js'; -import { DiscordAdapter } from './channels/discord.js'; +import { createChannelsForAgent } from './channels/factory.js'; import { GroupBatcher } from './core/group-batcher.js'; import { printStartupBanner } from './core/banner.js'; import { collectGroupBatchingConfig } from './core/group-batching-config.js'; @@ -217,56 +109,6 @@ function parseHeartbeatTarget(raw?: string): { channel: string; chatId: string } const DEFAULT_ATTACHMENTS_MAX_MB = 20; const DEFAULT_ATTACHMENTS_MAX_AGE_DAYS = 14; const ATTACHMENTS_PRUNE_INTERVAL_MS = 24 * 60 * 60 * 1000; -const DISCOVERY_LOCK_TIMEOUT_MS = 15_000; -const DISCOVERY_LOCK_STALE_MS = 60_000; -const DISCOVERY_LOCK_RETRY_MS = 100; - -function getDiscoveryLockPath(agentName: string): string { - const safe = agentName - .trim() - .replace(/[^a-zA-Z0-9_-]/g, '-') - .replace(/-+/g, '-') - .replace(/^-|-$/g, '') || 'agent'; - return `${STORE_PATH}.${safe}.discover.lock`; -} - -async function withDiscoveryLock(agentName: string, fn: () => Promise): Promise { - const lockPath = getDiscoveryLockPath(agentName); - const start = Date.now(); - - while (true) { - try { - const handle = await fs.open(lockPath, 'wx'); - try { - await handle.writeFile(`${process.pid}\n`, { encoding: 'utf-8' }); - return await fn(); - } finally { - await handle.close().catch(() => {}); - await fs.rm(lockPath, { force: true }).catch(() => {}); - } - } catch (error) { - const err = error as NodeJS.ErrnoException; - if (err.code !== 'EEXIST') { - throw error; - } - - try { - const stats = await fs.stat(lockPath); - if (Date.now() - stats.mtimeMs > DISCOVERY_LOCK_STALE_MS) { - await fs.rm(lockPath, { force: true }); - continue; - } - } catch { - // Best-effort stale lock cleanup. - } - - if (Date.now() - start >= DISCOVERY_LOCK_TIMEOUT_MS) { - throw new Error(`Timed out waiting for startup discovery lock: ${lockPath}`); - } - await sleep(DISCOVERY_LOCK_RETRY_MS); - } - } -} function resolveAttachmentsMaxBytes(): number { const rawBytes = Number(process.env.ATTACHMENTS_MAX_BYTES); @@ -342,139 +184,6 @@ async function pruneAttachmentsDir(baseDir: string, maxAgeDays: number): Promise } } -/** - * Create channel adapters for an agent from its config - */ -function createChannelsForAgent( - agentConfig: import('./config/types.js').AgentConfig, - attachmentsDir: string, - attachmentsMaxBytes: number, -): import('./channels/types.js').ChannelAdapter[] { - const adapters: import('./channels/types.js').ChannelAdapter[] = []; - - // Mutual exclusion: cannot use both Telegram Bot API and MTProto simultaneously - const hasTelegramBot = !!agentConfig.channels.telegram?.token; - const hasTelegramMtproto = !!(agentConfig.channels['telegram-mtproto'] as any)?.apiId; - - if (hasTelegramBot && hasTelegramMtproto) { - log.error(`Agent "${agentConfig.name}" has both telegram and telegram-mtproto configured.`); - log.error(' The Bot API adapter and MTProto adapter cannot run together.'); - log.error('Choose one: telegram (bot token) or telegram-mtproto (user account).'); - process.exit(1); - } - - if (hasTelegramBot) { - adapters.push(new TelegramAdapter({ - token: agentConfig.channels.telegram!.token!, - dmPolicy: agentConfig.channels.telegram!.dmPolicy || 'pairing', - allowedUsers: agentConfig.channels.telegram!.allowedUsers && agentConfig.channels.telegram!.allowedUsers.length > 0 - ? agentConfig.channels.telegram!.allowedUsers.map(u => typeof u === 'string' ? parseInt(u, 10) : u) - : undefined, - streaming: agentConfig.channels.telegram!.streaming, - attachmentsDir, - attachmentsMaxBytes, - groups: agentConfig.channels.telegram!.groups, - mentionPatterns: agentConfig.channels.telegram!.mentionPatterns, - agentName: agentConfig.name, - })); - } - - if (hasTelegramMtproto) { - const mtprotoConfig = agentConfig.channels['telegram-mtproto'] as any; - adapters.push(new TelegramMTProtoAdapter({ - apiId: mtprotoConfig.apiId, - apiHash: mtprotoConfig.apiHash, - phoneNumber: mtprotoConfig.phoneNumber, - databaseDirectory: mtprotoConfig.databaseDirectory || './data/telegram-mtproto', - dmPolicy: mtprotoConfig.dmPolicy || 'pairing', - allowedUsers: mtprotoConfig.allowedUsers && mtprotoConfig.allowedUsers.length > 0 - ? mtprotoConfig.allowedUsers.map((u: string | number) => typeof u === 'string' ? parseInt(u, 10) : u) - : undefined, - groupPolicy: mtprotoConfig.groupPolicy || 'both', - adminChatId: mtprotoConfig.adminChatId, - })); - } - - if (agentConfig.channels.slack?.botToken && agentConfig.channels.slack?.appToken) { - adapters.push(new SlackAdapter({ - botToken: agentConfig.channels.slack.botToken, - appToken: agentConfig.channels.slack.appToken, - dmPolicy: agentConfig.channels.slack.dmPolicy || 'pairing', - allowedUsers: agentConfig.channels.slack.allowedUsers && agentConfig.channels.slack.allowedUsers.length > 0 - ? agentConfig.channels.slack.allowedUsers - : undefined, - streaming: agentConfig.channels.slack.streaming, - attachmentsDir, - attachmentsMaxBytes, - groups: agentConfig.channels.slack.groups, - agentName: agentConfig.name, - })); - } - - if (agentConfig.channels.whatsapp?.enabled) { - const selfChatMode = agentConfig.channels.whatsapp.selfChat ?? true; - if (!selfChatMode) { - log.warn('WARNING: selfChatMode is OFF - bot will respond to ALL incoming messages!'); - log.warn('Only use this if this is a dedicated bot number, not your personal WhatsApp.'); - } - adapters.push(new WhatsAppAdapter({ - sessionPath: agentConfig.channels.whatsapp.sessionPath || process.env.WHATSAPP_SESSION_PATH || './data/whatsapp-session', - dmPolicy: agentConfig.channels.whatsapp.dmPolicy || 'pairing', - allowedUsers: agentConfig.channels.whatsapp.allowedUsers && agentConfig.channels.whatsapp.allowedUsers.length > 0 - ? agentConfig.channels.whatsapp.allowedUsers - : undefined, - selfChatMode, - attachmentsDir, - attachmentsMaxBytes, - groups: agentConfig.channels.whatsapp.groups, - mentionPatterns: agentConfig.channels.whatsapp.mentionPatterns, - agentName: agentConfig.name, - })); - } - - if (agentConfig.channels.signal?.phone) { - const selfChatMode = agentConfig.channels.signal.selfChat ?? true; - if (!selfChatMode) { - log.warn('WARNING: selfChatMode is OFF - bot will respond to ALL incoming messages!'); - log.warn('Only use this if this is a dedicated bot number, not your personal Signal.'); - } - adapters.push(new SignalAdapter({ - phoneNumber: agentConfig.channels.signal.phone, - cliPath: agentConfig.channels.signal.cliPath || process.env.SIGNAL_CLI_PATH || 'signal-cli', - httpHost: agentConfig.channels.signal.httpHost || process.env.SIGNAL_HTTP_HOST || '127.0.0.1', - httpPort: agentConfig.channels.signal.httpPort || parseInt(process.env.SIGNAL_HTTP_PORT || '8090', 10), - dmPolicy: agentConfig.channels.signal.dmPolicy || 'pairing', - allowedUsers: agentConfig.channels.signal.allowedUsers && agentConfig.channels.signal.allowedUsers.length > 0 - ? agentConfig.channels.signal.allowedUsers - : undefined, - selfChatMode, - attachmentsDir, - attachmentsMaxBytes, - groups: agentConfig.channels.signal.groups, - mentionPatterns: agentConfig.channels.signal.mentionPatterns, - agentName: agentConfig.name, - })); - } - - if (agentConfig.channels.discord?.token) { - adapters.push(new DiscordAdapter({ - token: agentConfig.channels.discord.token, - dmPolicy: agentConfig.channels.discord.dmPolicy || 'pairing', - allowedUsers: agentConfig.channels.discord.allowedUsers && agentConfig.channels.discord.allowedUsers.length > 0 - ? agentConfig.channels.discord.allowedUsers - : undefined, - streaming: agentConfig.channels.discord.streaming, - attachmentsDir, - attachmentsMaxBytes, - groups: agentConfig.channels.discord.groups, - agentName: agentConfig.name, - ignoreBotReactions: agentConfig.channels.discord.ignoreBotReactions, - })); - } - - return adapters; -} - /** * Create and configure a group batcher for an agent */ @@ -661,7 +370,7 @@ async function main() { // Fleet configs rely on pre-created agents from lettactl apply. if (!initialStatus.agentId && (isContainerDeploy || wasLoadedFromFleetConfig())) { try { - await withDiscoveryLock(agentConfig.name, async () => { + await withDiscoveryLock(STORE_PATH, agentConfig.name, async () => { // Re-read status after lock acquisition in case another instance already set it. initialStatus = bot.getStatus(); if (initialStatus.agentId) return; diff --git a/src/onboard.projection.test.ts b/src/onboard.projection.test.ts new file mode 100644 index 0000000..af08e19 --- /dev/null +++ b/src/onboard.projection.test.ts @@ -0,0 +1,195 @@ +import { describe, expect, it } from 'vitest'; +import { + applyOnboardEnvProjection, + buildProjectedAgentConfig, + toProjectionInputFromNonInteractiveConfig, + toProjectionInputFromOnboardConfig, +} from './onboard.js'; + +describe('onboarding projection helpers', () => { + it('produces equivalent agent projection for non-interactive and interactive paths', () => { + const nonInteractive = { + agentName: 'LettaBot', + agentId: 'agent-123', + telegram: { + enabled: true, + botToken: 'tg-token', + dmPolicy: 'allowlist' as const, + allowedUsers: ['111', '222'], + groupDebounceSec: 8, + groupPollIntervalMin: 2, + instantGroups: ['-1001'], + listeningGroups: ['-1002'], + }, + slack: { + enabled: true, + appToken: 'xapp-1', + botToken: 'xoxb-1', + allowedUsers: ['U1'], + groupDebounceSec: 6, + groupPollIntervalMin: 3, + instantGroups: ['C1'], + listeningGroups: ['C2'], + }, + discord: { + enabled: true, + botToken: 'discord-token', + dmPolicy: 'pairing' as const, + allowedUsers: ['user-a'], + groupDebounceSec: 5, + groupPollIntervalMin: 4, + instantGroups: ['g1'], + listeningGroups: ['g2'], + }, + whatsapp: { + enabled: true, + selfChat: false, + dmPolicy: 'open' as const, + allowedUsers: ['+1555'], + groupDebounceSec: 10, + groupPollIntervalMin: 7, + instantGroups: ['wa1'], + listeningGroups: ['wa2'], + }, + signal: { + enabled: true, + phoneNumber: '+15551234567', + selfChat: true, + dmPolicy: 'allowlist' as const, + allowedUsers: ['+15559876543'], + groupDebounceSec: 9, + groupPollIntervalMin: 6, + instantGroups: ['sg1'], + listeningGroups: ['sg2'], + }, + }; + + const interactive: any = { + agentName: 'LettaBot', + agentId: 'agent-123', + telegram: { + enabled: true, + token: 'tg-token', + dmPolicy: 'allowlist', + allowedUsers: ['111', '222'], + groupDebounceSec: 8, + groupPollIntervalMin: 2, + instantGroups: ['-1001'], + listeningGroups: ['-1002'], + }, + slack: { + enabled: true, + appToken: 'xapp-1', + botToken: 'xoxb-1', + allowedUsers: ['U1'], + groupDebounceSec: 6, + groupPollIntervalMin: 3, + instantGroups: ['C1'], + listeningGroups: ['C2'], + }, + discord: { + enabled: true, + token: 'discord-token', + dmPolicy: 'pairing', + allowedUsers: ['user-a'], + groupDebounceSec: 5, + groupPollIntervalMin: 4, + instantGroups: ['g1'], + listeningGroups: ['g2'], + }, + whatsapp: { + enabled: true, + selfChat: false, + dmPolicy: 'open', + allowedUsers: ['+1555'], + groupDebounceSec: 10, + groupPollIntervalMin: 7, + instantGroups: ['wa1'], + listeningGroups: ['wa2'], + }, + signal: { + enabled: true, + phone: '+15551234567', + selfChat: true, + dmPolicy: 'allowlist', + allowedUsers: ['+15559876543'], + groupDebounceSec: 9, + groupPollIntervalMin: 6, + instantGroups: ['sg1'], + listeningGroups: ['sg2'], + }, + cron: false, + heartbeat: { enabled: false, interval: '60' }, + google: { enabled: false, accounts: [] }, + }; + + const fromEnv = buildProjectedAgentConfig( + toProjectionInputFromNonInteractiveConfig(nonInteractive), + ); + const fromInteractive = buildProjectedAgentConfig( + toProjectionInputFromOnboardConfig(interactive), + ); + + expect(fromInteractive).toEqual(fromEnv); + }); + + it('applies env projection and clears stale channel keys when disabled', () => { + const env: Record = { + TELEGRAM_BOT_TOKEN: 'old-tg', + TELEGRAM_ALLOWED_USERS: 'old', + DISCORD_BOT_TOKEN: 'old-discord', + WHATSAPP_ENABLED: 'true', + WHATSAPP_SELF_CHAT_MODE: 'true', + SIGNAL_PHONE_NUMBER: '+1-old', + SIGNAL_SELF_CHAT_MODE: 'false', + HEARTBEAT_INTERVAL_MIN: '30', + CRON_ENABLED: 'true', + }; + + const config: any = { + agentName: 'ProjectedBot', + telegram: { + enabled: true, + token: 'new-tg', + dmPolicy: 'pairing', + allowedUsers: ['123', '456'], + }, + slack: { enabled: false }, + discord: { enabled: false }, + whatsapp: { + enabled: true, + selfChat: false, + dmPolicy: 'allowlist', + allowedUsers: ['+1444'], + }, + signal: { + enabled: true, + phone: '+1777', + selfChat: false, + dmPolicy: 'open', + allowedUsers: ['+1888'], + }, + heartbeat: { enabled: false }, + cron: false, + transcription: { + enabled: true, + provider: 'mistral', + apiKey: 'mistral-key', + }, + }; + + applyOnboardEnvProjection(config, env); + + expect(env.AGENT_NAME).toBe('ProjectedBot'); + expect(env.TELEGRAM_BOT_TOKEN).toBe('new-tg'); + expect(env.TELEGRAM_ALLOWED_USERS).toBe('123,456'); + expect(env.DISCORD_BOT_TOKEN).toBeUndefined(); + expect(env.WHATSAPP_ENABLED).toBe('true'); + expect(env.WHATSAPP_SELF_CHAT_MODE).toBeUndefined(); + expect(env.SIGNAL_PHONE_NUMBER).toBe('+1777'); + expect(env.SIGNAL_SELF_CHAT_MODE).toBe('false'); + expect(env.HEARTBEAT_INTERVAL_MIN).toBeUndefined(); + expect(env.CRON_ENABLED).toBeUndefined(); + expect(env.MISTRAL_API_KEY).toBe('mistral-key'); + }); +}); diff --git a/src/onboard.ts b/src/onboard.ts index 23a3a23..4948ec3 100644 --- a/src/onboard.ts +++ b/src/onboard.ts @@ -6,7 +6,7 @@ import { existsSync, readFileSync, writeFileSync } from 'node:fs'; import { resolve } from 'node:path'; import { spawnSync } from 'node:child_process'; import * as p from '@clack/prompts'; -import { saveConfig, syncProviders, isApiServerMode } from './config/index.js'; +import { saveConfig, syncProviders, upsertProvider, isApiServerMode } from './config/index.js'; import type { AgentConfig, LettaBotConfig } from './config/types.js'; import { isLettaApiUrl } from './utils/server.js'; import { parseCsvList, parseOptionalInt } from './utils/parse.js'; @@ -114,6 +114,9 @@ async function saveConfigFromEnv(config: any, configPath: string, existingConfig // Resolve API server config from existing config (server.api is canonical, top-level api is fallback) const existingApiConfig = existingConfig?.server?.api ?? existingConfig?.api; + const projectedAgent = buildProjectedAgentConfig( + toProjectionInputFromNonInteractiveConfig(config), + ); const lettabotConfig: Partial & Pick = { server: { @@ -122,80 +125,7 @@ async function saveConfigFromEnv(config: any, configPath: string, existingConfig apiKey: config.apiKey, ...(existingApiConfig ? { api: existingApiConfig } : {}), }, - agents: [{ - name: config.agentName, - ...(config.agentId ? { id: config.agentId } : {}), - channels: { - ...(config.telegram.enabled ? { - telegram: { - enabled: true, - token: config.telegram.botToken, - dmPolicy: config.telegram.dmPolicy, - allowedUsers: config.telegram.allowedUsers, - groupDebounceSec: config.telegram.groupDebounceSec, - groupPollIntervalMin: config.telegram.groupPollIntervalMin, - instantGroups: config.telegram.instantGroups, - listeningGroups: config.telegram.listeningGroups, - } - } : {}), - ...(config.slack.enabled ? { - slack: { - enabled: true, - botToken: config.slack.botToken, - appToken: config.slack.appToken, - allowedUsers: config.slack.allowedUsers, - groupDebounceSec: config.slack.groupDebounceSec, - groupPollIntervalMin: config.slack.groupPollIntervalMin, - instantGroups: config.slack.instantGroups, - listeningGroups: config.slack.listeningGroups, - } - } : {}), - ...(config.discord.enabled ? { - discord: { - enabled: true, - token: config.discord.botToken, - dmPolicy: config.discord.dmPolicy, - allowedUsers: config.discord.allowedUsers, - groupDebounceSec: config.discord.groupDebounceSec, - groupPollIntervalMin: config.discord.groupPollIntervalMin, - instantGroups: config.discord.instantGroups, - listeningGroups: config.discord.listeningGroups, - } - } : {}), - ...(config.whatsapp.enabled ? { - whatsapp: { - enabled: true, - selfChat: config.whatsapp.selfChat, - dmPolicy: config.whatsapp.dmPolicy, - allowedUsers: config.whatsapp.allowedUsers, - groupDebounceSec: config.whatsapp.groupDebounceSec, - groupPollIntervalMin: config.whatsapp.groupPollIntervalMin, - instantGroups: config.whatsapp.instantGroups, - listeningGroups: config.whatsapp.listeningGroups, - } - } : {}), - ...(config.signal.enabled ? { - signal: { - enabled: true, - phone: config.signal.phoneNumber, - selfChat: config.signal.selfChat, - dmPolicy: config.signal.dmPolicy, - allowedUsers: config.signal.allowedUsers, - groupDebounceSec: config.signal.groupDebounceSec, - groupPollIntervalMin: config.signal.groupPollIntervalMin, - instantGroups: config.signal.instantGroups, - listeningGroups: config.signal.listeningGroups, - } - } : {}), - }, - features: { - cron: false, - heartbeat: { - enabled: false, - intervalMin: 60, - }, - }, - }], + agents: [projectedAgent], // Preserve unmanaged top-level fields from existing config ...(existingConfig?.attachments ? { attachments: existingConfig.attachments } : {}), }; @@ -290,6 +220,418 @@ interface OnboardConfig { transcription: { enabled: boolean; provider?: 'openai' | 'mistral'; apiKey?: string; model?: string }; } +type NonInteractiveProjectionSource = { + agentName: string; + agentId?: string; + telegram: { + enabled: boolean; + botToken?: string; + dmPolicy?: 'pairing' | 'allowlist' | 'open'; + allowedUsers?: string[]; + groupDebounceSec?: number; + groupPollIntervalMin?: number; + instantGroups?: string[]; + listeningGroups?: string[]; + }; + slack: { + enabled: boolean; + appToken?: string; + botToken?: string; + allowedUsers?: string[]; + groupDebounceSec?: number; + groupPollIntervalMin?: number; + instantGroups?: string[]; + listeningGroups?: string[]; + }; + discord: { + enabled: boolean; + botToken?: string; + dmPolicy?: 'pairing' | 'allowlist' | 'open'; + allowedUsers?: string[]; + groupDebounceSec?: number; + groupPollIntervalMin?: number; + instantGroups?: string[]; + listeningGroups?: string[]; + }; + whatsapp: { + enabled: boolean; + selfChat?: boolean; + dmPolicy?: 'pairing' | 'allowlist' | 'open'; + allowedUsers?: string[]; + groupDebounceSec?: number; + groupPollIntervalMin?: number; + instantGroups?: string[]; + listeningGroups?: string[]; + }; + signal: { + enabled: boolean; + phoneNumber?: string; + selfChat?: boolean; + dmPolicy?: 'pairing' | 'allowlist' | 'open'; + allowedUsers?: string[]; + groupDebounceSec?: number; + groupPollIntervalMin?: number; + instantGroups?: string[]; + listeningGroups?: string[]; + }; +}; + +export type AgentProjectionInput = { + name: string; + id?: string; + telegram: { + enabled: boolean; + token?: string; + dmPolicy?: 'pairing' | 'allowlist' | 'open'; + allowedUsers?: string[]; + groupDebounceSec?: number; + groupPollIntervalMin?: number; + instantGroups?: string[]; + listeningGroups?: string[]; + }; + slack: { + enabled: boolean; + appToken?: string; + botToken?: string; + allowedUsers?: string[]; + groupDebounceSec?: number; + groupPollIntervalMin?: number; + instantGroups?: string[]; + listeningGroups?: string[]; + }; + discord: { + enabled: boolean; + token?: string; + dmPolicy?: 'pairing' | 'allowlist' | 'open'; + allowedUsers?: string[]; + groupDebounceSec?: number; + groupPollIntervalMin?: number; + instantGroups?: string[]; + listeningGroups?: string[]; + }; + whatsapp: { + enabled: boolean; + selfChat?: boolean; + dmPolicy?: 'pairing' | 'allowlist' | 'open'; + allowedUsers?: string[]; + groupDebounceSec?: number; + groupPollIntervalMin?: number; + instantGroups?: string[]; + listeningGroups?: string[]; + }; + signal: { + enabled: boolean; + phone?: string; + selfChat?: boolean; + dmPolicy?: 'pairing' | 'allowlist' | 'open'; + allowedUsers?: string[]; + groupDebounceSec?: number; + groupPollIntervalMin?: number; + instantGroups?: string[]; + listeningGroups?: string[]; + }; + cronEnabled: boolean; + heartbeat: { enabled: boolean; intervalMin?: number }; + google: { enabled: boolean; accounts: Array<{ account: string; services: string[] }> }; +}; + +export function toProjectionInputFromNonInteractiveConfig(config: NonInteractiveProjectionSource): AgentProjectionInput { + return { + name: config.agentName, + id: config.agentId, + telegram: { + enabled: config.telegram.enabled, + token: config.telegram.botToken, + dmPolicy: config.telegram.dmPolicy, + allowedUsers: config.telegram.allowedUsers, + groupDebounceSec: config.telegram.groupDebounceSec, + groupPollIntervalMin: config.telegram.groupPollIntervalMin, + instantGroups: config.telegram.instantGroups, + listeningGroups: config.telegram.listeningGroups, + }, + slack: { + enabled: config.slack.enabled, + appToken: config.slack.appToken, + botToken: config.slack.botToken, + allowedUsers: config.slack.allowedUsers, + groupDebounceSec: config.slack.groupDebounceSec, + groupPollIntervalMin: config.slack.groupPollIntervalMin, + instantGroups: config.slack.instantGroups, + listeningGroups: config.slack.listeningGroups, + }, + discord: { + enabled: config.discord.enabled, + token: config.discord.botToken, + dmPolicy: config.discord.dmPolicy, + allowedUsers: config.discord.allowedUsers, + groupDebounceSec: config.discord.groupDebounceSec, + groupPollIntervalMin: config.discord.groupPollIntervalMin, + instantGroups: config.discord.instantGroups, + listeningGroups: config.discord.listeningGroups, + }, + whatsapp: { + enabled: config.whatsapp.enabled, + selfChat: config.whatsapp.selfChat, + dmPolicy: config.whatsapp.dmPolicy, + allowedUsers: config.whatsapp.allowedUsers, + groupDebounceSec: config.whatsapp.groupDebounceSec, + groupPollIntervalMin: config.whatsapp.groupPollIntervalMin, + instantGroups: config.whatsapp.instantGroups, + listeningGroups: config.whatsapp.listeningGroups, + }, + signal: { + enabled: config.signal.enabled, + phone: config.signal.phoneNumber, + selfChat: config.signal.selfChat, + dmPolicy: config.signal.dmPolicy, + allowedUsers: config.signal.allowedUsers, + groupDebounceSec: config.signal.groupDebounceSec, + groupPollIntervalMin: config.signal.groupPollIntervalMin, + instantGroups: config.signal.instantGroups, + listeningGroups: config.signal.listeningGroups, + }, + cronEnabled: false, + heartbeat: { enabled: false, intervalMin: 60 }, + google: { enabled: false, accounts: [] }, + }; +} + +export function toProjectionInputFromOnboardConfig(config: OnboardConfig): AgentProjectionInput { + return { + name: config.agentName || 'LettaBot', + id: config.agentId, + telegram: { + enabled: config.telegram.enabled, + token: config.telegram.token, + dmPolicy: config.telegram.dmPolicy, + allowedUsers: config.telegram.allowedUsers, + groupDebounceSec: config.telegram.groupDebounceSec, + groupPollIntervalMin: config.telegram.groupPollIntervalMin, + instantGroups: config.telegram.instantGroups, + listeningGroups: config.telegram.listeningGroups, + }, + slack: { + enabled: config.slack.enabled, + appToken: config.slack.appToken, + botToken: config.slack.botToken, + allowedUsers: config.slack.allowedUsers, + groupDebounceSec: config.slack.groupDebounceSec, + groupPollIntervalMin: config.slack.groupPollIntervalMin, + instantGroups: config.slack.instantGroups, + listeningGroups: config.slack.listeningGroups, + }, + discord: { + enabled: config.discord.enabled, + token: config.discord.token, + dmPolicy: config.discord.dmPolicy, + allowedUsers: config.discord.allowedUsers, + groupDebounceSec: config.discord.groupDebounceSec, + groupPollIntervalMin: config.discord.groupPollIntervalMin, + instantGroups: config.discord.instantGroups, + listeningGroups: config.discord.listeningGroups, + }, + whatsapp: { + enabled: config.whatsapp.enabled, + selfChat: config.whatsapp.selfChat, + dmPolicy: config.whatsapp.dmPolicy, + allowedUsers: config.whatsapp.allowedUsers, + groupDebounceSec: config.whatsapp.groupDebounceSec, + groupPollIntervalMin: config.whatsapp.groupPollIntervalMin, + instantGroups: config.whatsapp.instantGroups, + listeningGroups: config.whatsapp.listeningGroups, + }, + signal: { + enabled: config.signal.enabled, + phone: config.signal.phone, + selfChat: config.signal.selfChat, + dmPolicy: config.signal.dmPolicy, + allowedUsers: config.signal.allowedUsers, + groupDebounceSec: config.signal.groupDebounceSec, + groupPollIntervalMin: config.signal.groupPollIntervalMin, + instantGroups: config.signal.instantGroups, + listeningGroups: config.signal.listeningGroups, + }, + cronEnabled: config.cron, + heartbeat: { + enabled: config.heartbeat.enabled, + intervalMin: config.heartbeat.interval ? parseInt(config.heartbeat.interval, 10) : undefined, + }, + google: config.google, + }; +} + +export function buildProjectedAgentConfig(input: AgentProjectionInput): AgentConfig { + return { + name: input.name, + ...(input.id ? { id: input.id } : {}), + channels: { + ...(input.telegram.enabled ? { + telegram: { + enabled: true, + token: input.telegram.token, + dmPolicy: input.telegram.dmPolicy, + allowedUsers: input.telegram.allowedUsers, + groupDebounceSec: input.telegram.groupDebounceSec, + groupPollIntervalMin: input.telegram.groupPollIntervalMin, + instantGroups: input.telegram.instantGroups, + listeningGroups: input.telegram.listeningGroups, + } + } : {}), + ...(input.slack.enabled ? { + slack: { + enabled: true, + appToken: input.slack.appToken, + botToken: input.slack.botToken, + allowedUsers: input.slack.allowedUsers, + groupDebounceSec: input.slack.groupDebounceSec, + groupPollIntervalMin: input.slack.groupPollIntervalMin, + instantGroups: input.slack.instantGroups, + listeningGroups: input.slack.listeningGroups, + } + } : {}), + ...(input.discord.enabled ? { + discord: { + enabled: true, + token: input.discord.token, + dmPolicy: input.discord.dmPolicy, + allowedUsers: input.discord.allowedUsers, + groupDebounceSec: input.discord.groupDebounceSec, + groupPollIntervalMin: input.discord.groupPollIntervalMin, + instantGroups: input.discord.instantGroups, + listeningGroups: input.discord.listeningGroups, + } + } : {}), + ...(input.whatsapp.enabled ? { + whatsapp: { + enabled: true, + selfChat: input.whatsapp.selfChat, + dmPolicy: input.whatsapp.dmPolicy, + allowedUsers: input.whatsapp.allowedUsers, + groupDebounceSec: input.whatsapp.groupDebounceSec, + groupPollIntervalMin: input.whatsapp.groupPollIntervalMin, + instantGroups: input.whatsapp.instantGroups, + listeningGroups: input.whatsapp.listeningGroups, + } + } : {}), + ...(input.signal.enabled ? { + signal: { + enabled: true, + phone: input.signal.phone, + selfChat: input.signal.selfChat, + dmPolicy: input.signal.dmPolicy, + allowedUsers: input.signal.allowedUsers, + groupDebounceSec: input.signal.groupDebounceSec, + groupPollIntervalMin: input.signal.groupPollIntervalMin, + instantGroups: input.signal.instantGroups, + listeningGroups: input.signal.listeningGroups, + } + } : {}), + }, + features: { + cron: input.cronEnabled, + heartbeat: { + enabled: input.heartbeat.enabled, + intervalMin: input.heartbeat.intervalMin, + }, + }, + ...(input.google.enabled ? { + integrations: { + google: { + enabled: true, + accounts: input.google.accounts, + }, + }, + ...((() => { + const gmailAccounts = input.google.accounts + .filter(a => a.services?.includes('gmail')) + .map(a => a.account); + return gmailAccounts.length > 0 ? { + polling: { gmail: { accounts: gmailAccounts } }, + } : {}; + })()), + } : {}), + }; +} + +export function applyOnboardEnvProjection(config: OnboardConfig, env: Record): void { + if (config.agentName) env.AGENT_NAME = config.agentName; + + if (config.telegram.enabled && config.telegram.token) { + env.TELEGRAM_BOT_TOKEN = config.telegram.token; + if (config.telegram.dmPolicy) env.TELEGRAM_DM_POLICY = config.telegram.dmPolicy; + if (config.telegram.allowedUsers?.length) env.TELEGRAM_ALLOWED_USERS = config.telegram.allowedUsers.join(','); + else delete env.TELEGRAM_ALLOWED_USERS; + } else { + delete env.TELEGRAM_BOT_TOKEN; + delete env.TELEGRAM_DM_POLICY; + delete env.TELEGRAM_ALLOWED_USERS; + } + + if (config.slack.enabled) { + if (config.slack.appToken) env.SLACK_APP_TOKEN = config.slack.appToken; + if (config.slack.botToken) env.SLACK_BOT_TOKEN = config.slack.botToken; + if (config.slack.allowedUsers?.length) env.SLACK_ALLOWED_USERS = config.slack.allowedUsers.join(','); + else delete env.SLACK_ALLOWED_USERS; + } else { + delete env.SLACK_APP_TOKEN; + delete env.SLACK_BOT_TOKEN; + delete env.SLACK_ALLOWED_USERS; + } + + if (config.discord.enabled && config.discord.token) { + env.DISCORD_BOT_TOKEN = config.discord.token; + if (config.discord.dmPolicy) env.DISCORD_DM_POLICY = config.discord.dmPolicy; + if (config.discord.allowedUsers?.length) env.DISCORD_ALLOWED_USERS = config.discord.allowedUsers.join(','); + else delete env.DISCORD_ALLOWED_USERS; + } else { + delete env.DISCORD_BOT_TOKEN; + delete env.DISCORD_DM_POLICY; + delete env.DISCORD_ALLOWED_USERS; + } + + if (config.whatsapp.enabled) { + env.WHATSAPP_ENABLED = 'true'; + if (config.whatsapp.selfChat) env.WHATSAPP_SELF_CHAT_MODE = 'true'; + else delete env.WHATSAPP_SELF_CHAT_MODE; + if (config.whatsapp.dmPolicy) env.WHATSAPP_DM_POLICY = config.whatsapp.dmPolicy; + if (config.whatsapp.allowedUsers?.length) env.WHATSAPP_ALLOWED_USERS = config.whatsapp.allowedUsers.join(','); + else delete env.WHATSAPP_ALLOWED_USERS; + } else { + delete env.WHATSAPP_ENABLED; + delete env.WHATSAPP_SELF_CHAT_MODE; + delete env.WHATSAPP_DM_POLICY; + delete env.WHATSAPP_ALLOWED_USERS; + } + + if (config.signal.enabled && config.signal.phone) { + env.SIGNAL_PHONE_NUMBER = config.signal.phone; + if (config.signal.selfChat === false) env.SIGNAL_SELF_CHAT_MODE = 'false'; + else delete env.SIGNAL_SELF_CHAT_MODE; + if (config.signal.dmPolicy) env.SIGNAL_DM_POLICY = config.signal.dmPolicy; + if (config.signal.allowedUsers?.length) env.SIGNAL_ALLOWED_USERS = config.signal.allowedUsers.join(','); + else delete env.SIGNAL_ALLOWED_USERS; + } else { + delete env.SIGNAL_PHONE_NUMBER; + delete env.SIGNAL_SELF_CHAT_MODE; + delete env.SIGNAL_DM_POLICY; + delete env.SIGNAL_ALLOWED_USERS; + } + + if (config.heartbeat.enabled && config.heartbeat.interval) { + env.HEARTBEAT_INTERVAL_MIN = config.heartbeat.interval; + } else { + delete env.HEARTBEAT_INTERVAL_MIN; + } + + if (config.cron) env.CRON_ENABLED = 'true'; + else delete env.CRON_ENABLED; + + if (config.transcription.enabled && config.transcription.apiKey) { + if (config.transcription.provider === 'mistral') env.MISTRAL_API_KEY = config.transcription.apiKey; + else env.OPENAI_API_KEY = config.transcription.apiKey; + } +} + const isPlaceholder = (val?: string) => !val || /^(your_|sk-\.\.\.|placeholder|example)/i.test(val); const isDockerAuthMethod = (method: OnboardConfig['authMethod']) => method === 'docker' || method === 'selfhosted'; @@ -632,76 +974,40 @@ async function stepProviders(config: OnboardConfig, env: Record) if (p.isCancel(providerKey)) { p.cancel('Setup cancelled'); process.exit(0); } if (providerKey) { - // Create or update provider via Letta API const spinner = p.spinner(); spinner.start(`Connecting ${provider.displayName}...`); try { - // First check if provider already exists - const listResponse = await fetch('https://api.letta.com/v1/providers', { - headers: { - 'Content-Type': 'application/json', - 'Authorization': `Bearer ${apiKey}`, - }, - }); - - let existingProvider: { id: string; name: string } | undefined; - if (listResponse.ok) { - const providers = await listResponse.json() as Array<{ id: string; name: string }>; - existingProvider = providers.find(p => p.name === provider.name); + if (!apiKey) { + spinner.stop('Missing Letta API key'); + continue; } - - let response: Response; - if (existingProvider) { - // Update existing provider - response = await fetch(`https://api.letta.com/v1/providers/${existingProvider.id}`, { - method: 'PATCH', - headers: { - 'Content-Type': 'application/json', - 'Authorization': `Bearer ${apiKey}`, - }, - body: JSON.stringify({ - api_key: providerKey, - }), - }); - } else { - // Create new provider - response = await fetch('https://api.letta.com/v1/providers', { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'Authorization': `Bearer ${apiKey}`, - }, - body: JSON.stringify({ - name: provider.name, - provider_type: provider.providerType, - api_key: providerKey, - }), - }); - } - - if (response.ok) { - spinner.stop(`Connected ${provider.displayName}`); - providersById.set(provider.id, { id: provider.id, name: provider.name, apiKey: providerKey }); - // If OpenAI was just connected, offer to enable voice transcription - if (provider.id === 'openai') { - const enableTranscription = await p.confirm({ - message: 'Enable voice message transcription with this OpenAI key? (uses Whisper)', - initialValue: true, - }); - if (!p.isCancel(enableTranscription) && enableTranscription) { - config.transcription.enabled = true; - config.transcription.provider = 'openai'; - config.transcription.apiKey = providerKey; - } + await upsertProvider(apiKey, { + id: provider.id, + name: provider.name, + type: provider.providerType, + apiKey: providerKey, + }); + + spinner.stop(`Connected ${provider.displayName}`); + providersById.set(provider.id, { id: provider.id, name: provider.name, apiKey: providerKey }); + + // If OpenAI was just connected, offer to enable voice transcription + if (provider.id === 'openai') { + const enableTranscription = await p.confirm({ + message: 'Enable voice message transcription with this OpenAI key? (uses Whisper)', + initialValue: true, + }); + if (!p.isCancel(enableTranscription) && enableTranscription) { + config.transcription.enabled = true; + config.transcription.provider = 'openai'; + config.transcription.apiKey = providerKey; } - } else { - const error = await response.text(); - spinner.stop(`Failed to connect ${provider.displayName}: ${error}`); } } catch (err) { - spinner.stop(`Failed to connect ${provider.displayName}`); + const detail = err instanceof Error ? `: ${err.message}` : ''; + spinner.stop(`Failed to connect ${provider.displayName}${detail}`); } } } @@ -1623,105 +1929,8 @@ export async function onboard(options?: { nonInteractive?: boolean }): Promise { @@ -1757,96 +1966,9 @@ export async function onboard(options?: { nonInteractive?: boolean }): Promise { - const gmailAccounts = config.google.accounts - .filter(a => a.services?.includes('gmail')) - .map(a => a.account); - return gmailAccounts.length > 0 ? { - polling: { gmail: { accounts: gmailAccounts } }, - } : {}; - })()), - } : {}), - }; + const agentConfig: AgentConfig = buildProjectedAgentConfig( + toProjectionInputFromOnboardConfig(config), + ); // Convert to YAML config (multi-agent format) // Resolve API server config from existing config (server.api is canonical, top-level api is fallback) diff --git a/src/startup/bootstrap.ts b/src/startup/bootstrap.ts new file mode 100644 index 0000000..41a3141 --- /dev/null +++ b/src/startup/bootstrap.ts @@ -0,0 +1,172 @@ +import { existsSync, readFileSync, promises as fs } from 'node:fs'; +import { hasRefreshToken, isTokenExpired, getDeviceName, loadTokens, saveTokens } from '../auth/tokens.js'; +import { refreshAccessToken } from '../auth/oauth.js'; +import { isLettaApiUrl } from '../utils/server.js'; +import { sleep } from '../utils/time.js'; +import { createLogger } from '../logger.js'; + +const log = createLogger('Config'); + +const DISCOVERY_LOCK_TIMEOUT_MS = 15_000; +const DISCOVERY_LOCK_STALE_MS = 60_000; +const DISCOVERY_LOCK_RETRY_MS = 100; + +function warnServerMismatch(storedUrl: string, currentUrl: string, agentId: string): void { + if (storedUrl === currentUrl) return; + log.warn('⚠️ Server mismatch detected!'); + log.warn(` Stored agent was created on: ${storedUrl}`); + log.warn(` Current server: ${currentUrl}`); + log.warn(` The agent ${agentId} may not exist on this server.`); + log.warn(` Run 'lettabot onboard' to select or create an agent for this server.`); +} + +function normalizeUrl(url: string): string { + return url.replace(/\/$/, ''); +} + +/** + * Best-effort load of stored agent ID into LETTA_AGENT_ID. + * Handles both v1 and v2 store shapes and warns on base URL mismatch. + */ +export function loadStoredAgentId(storePath: string, currentBaseUrl: string): void { + if (!existsSync(storePath)) return; + + try { + const raw = JSON.parse(readFileSync(storePath, 'utf-8')) as { + version?: number; + agentId?: string; + baseUrl?: string; + agents?: Record; + }; + + // V2 format + if (raw.version === 2 && raw.agents) { + const firstAgent = Object.values(raw.agents)[0]; + if (!firstAgent?.agentId) return; + process.env.LETTA_AGENT_ID = firstAgent.agentId; + if (!firstAgent.baseUrl) return; + warnServerMismatch( + normalizeUrl(firstAgent.baseUrl), + normalizeUrl(currentBaseUrl), + firstAgent.agentId, + ); + return; + } + + // V1 format (legacy) + if (!raw.agentId) return; + process.env.LETTA_AGENT_ID = raw.agentId; + if (!raw.baseUrl) return; + warnServerMismatch( + normalizeUrl(raw.baseUrl), + normalizeUrl(currentBaseUrl), + raw.agentId, + ); + } catch { + // Best-effort load; ignore malformed store files. + } +} + +/** + * Refresh OAuth tokens (if needed) before loading SDK modules that read LETTA_API_KEY. + */ +export async function refreshTokensIfNeeded(): Promise { + // Explicit API key always wins. + if (process.env.LETTA_API_KEY) { + return; + } + + // OAuth only applies to Letta API endpoints. + if (!isLettaApiUrl(process.env.LETTA_BASE_URL)) { + return; + } + + const tokens = loadTokens(); + if (!tokens?.accessToken) { + return; + } + + process.env.LETTA_API_KEY = tokens.accessToken; + + if (!isTokenExpired(tokens) || !hasRefreshToken(tokens)) { + return; + } + + try { + log.info('Refreshing access token...'); + const newTokens = await refreshAccessToken( + tokens.refreshToken!, + tokens.deviceId, + getDeviceName(), + ); + + const now = Date.now(); + saveTokens({ + accessToken: newTokens.access_token, + refreshToken: newTokens.refresh_token ?? tokens.refreshToken, + tokenExpiresAt: now + newTokens.expires_in * 1000, + deviceId: tokens.deviceId, + deviceName: tokens.deviceName, + }); + + process.env.LETTA_API_KEY = newTokens.access_token; + log.info('Token refreshed successfully'); + } catch (err) { + log.error('Failed to refresh token:', err instanceof Error ? err.message : err); + log.error('You may need to re-authenticate with `lettabot onboard`'); + } +} + +function getDiscoveryLockPath(storePath: string, agentName: string): string { + const safe = agentName + .trim() + .replace(/[^a-zA-Z0-9_-]/g, '-') + .replace(/-+/g, '-') + .replace(/^-|-$/g, '') || 'agent'; + return `${storePath}.${safe}.discover.lock`; +} + +/** + * Inter-process lock to avoid startup races when discovering agents by name. + */ +export async function withDiscoveryLock( + storePath: string, + agentName: string, + fn: () => Promise, +): Promise { + const lockPath = getDiscoveryLockPath(storePath, agentName); + const start = Date.now(); + + while (true) { + try { + const handle = await fs.open(lockPath, 'wx'); + try { + await handle.writeFile(`${process.pid}\n`, { encoding: 'utf-8' }); + return await fn(); + } finally { + await handle.close().catch(() => {}); + await fs.rm(lockPath, { force: true }).catch(() => {}); + } + } catch (error) { + const err = error as NodeJS.ErrnoException; + if (err.code !== 'EEXIST') { + throw error; + } + + try { + const stats = await fs.stat(lockPath); + if (Date.now() - stats.mtimeMs > DISCOVERY_LOCK_STALE_MS) { + await fs.rm(lockPath, { force: true }); + continue; + } + } catch { + // Best-effort stale lock cleanup. + } + + if (Date.now() - start >= DISCOVERY_LOCK_TIMEOUT_MS) { + throw new Error(`Timed out waiting for startup discovery lock: ${lockPath}`); + } + await sleep(DISCOVERY_LOCK_RETRY_MS); + } + } +}