fix(main): restore upstream code accidentally dropped in matrix-adapter commit

The 18010eb commit deleted several upstream features from main.ts:
- resolveSessionMemfs and sleeptime/memfs integration
- parseOptionalBoolean, parseHeartbeatSkipRecentPolicy helpers
- heartbeat skip-recent policy/fraction/interrupt config
- duplicate agent name and turnLogFile validation
- turn log file collection for API server
- logging and blueskyEnabled bot config

Restores all upstream code while preserving our Matrix additions:
parseHeartbeatTarget first-colon fix, MatrixAdapter callback wiring,
heartbeatTargetChatId, sessionModel, API key propagation, Olm crash fix.
This commit is contained in:
Ani
2026-03-16 14:02:37 -04:00
parent 9a972c30a2
commit 7cd7bd3720

View File

@@ -1,6 +1,6 @@
/** /**
* LettaBot - Multi-Channel AI Assistant * LettaBot - Multi-Channel AI Assistant
* *
* Single agent, single conversation across all channels. * Single agent, single conversation across all channels.
* Chat continues seamlessly between Telegram, Slack, and WhatsApp. * Chat continues seamlessly between Telegram, Slack, and WhatsApp.
*/ */
@@ -24,6 +24,7 @@ import {
serverModeLabel, serverModeLabel,
wasLoadedFromFleetConfig, wasLoadedFromFleetConfig,
} from './config/index.js'; } from './config/index.js';
import { resolveSessionMemfs } from './config/memfs.js';
import { getCronDataDir, getDataDir, getWorkingDir, hasRailwayVolume, resolveWorkingDirPath } from './utils/paths.js'; import { getCronDataDir, getDataDir, getWorkingDir, hasRailwayVolume, resolveWorkingDirPath } from './utils/paths.js';
import { parseCsvList, parseNonNegativeNumber } from './utils/parse.js'; import { parseCsvList, parseNonNegativeNumber } from './utils/parse.js';
import { createLogger, setLogLevel } from './logger.js'; import { createLogger, setLogLevel } from './logger.js';
@@ -222,6 +223,19 @@ function ensureRequiredTools(tools: string[]): string[] {
return out; return out;
} }
function parseOptionalBoolean(raw?: string): boolean | undefined {
if (raw === 'true') return true;
if (raw === 'false') return false;
return undefined;
}
function parseHeartbeatSkipRecentPolicy(raw?: string): 'fixed' | 'fraction' | 'off' | undefined {
if (raw === 'fixed' || raw === 'fraction' || raw === 'off') {
return raw;
}
return undefined;
}
// Global config (shared across all agents) // Global config (shared across all agents)
const globalConfig = { const globalConfig = {
workingDir: getWorkingDir(), workingDir: getWorkingDir(),
@@ -236,6 +250,9 @@ const globalConfig = {
attachmentsMaxAgeDays: resolveAttachmentsMaxAgeDays(), attachmentsMaxAgeDays: resolveAttachmentsMaxAgeDays(),
cronEnabled: process.env.CRON_ENABLED === 'true', // Legacy env var fallback cronEnabled: process.env.CRON_ENABLED === 'true', // Legacy env var fallback
heartbeatSkipRecentUserMin: parseNonNegativeNumber(process.env.HEARTBEAT_SKIP_RECENT_USER_MIN), heartbeatSkipRecentUserMin: parseNonNegativeNumber(process.env.HEARTBEAT_SKIP_RECENT_USER_MIN),
heartbeatSkipRecentPolicy: parseHeartbeatSkipRecentPolicy(process.env.HEARTBEAT_SKIP_RECENT_POLICY),
heartbeatSkipRecentFraction: parseNonNegativeNumber(process.env.HEARTBEAT_SKIP_RECENT_FRACTION),
heartbeatInterruptOnUserMessage: parseOptionalBoolean(process.env.HEARTBEAT_INTERRUPT_ON_USER_MESSAGE),
}; };
// Validate LETTA_API_KEY is set for API mode (docker mode doesn't require it) // Validate LETTA_API_KEY is set for API mode (docker mode doesn't require it)
@@ -248,7 +265,7 @@ if (!isDockerServerMode(yamlConfig.server.mode) && !process.env.LETTA_API_KEY) {
async function main() { async function main() {
log.info('Starting LettaBot...'); log.info('Starting LettaBot...');
// Log storage locations (helpful for Railway debugging) // Log storage locations (helpful for Railway debugging)
const dataDir = getDataDir(); const dataDir = getDataDir();
if (hasRailwayVolume()) { if (hasRailwayVolume()) {
@@ -257,12 +274,31 @@ async function main() {
log.info(`Data directory: ${dataDir}`); log.info(`Data directory: ${dataDir}`);
log.info(`Working directory: ${globalConfig.workingDir}`); log.info(`Working directory: ${globalConfig.workingDir}`);
process.env.LETTABOT_WORKING_DIR = globalConfig.workingDir; process.env.LETTABOT_WORKING_DIR = globalConfig.workingDir;
// Normalize config to agents array // Normalize config to agents array
const agents = normalizeAgents(yamlConfig); const agents = normalizeAgents(yamlConfig);
const isMultiAgent = agents.length > 1; const isMultiAgent = agents.length > 1;
log.info(`${agents.length} agent(s) configured: ${agents.map(a => a.name).join(', ')}`); log.info(`${agents.length} agent(s) configured: ${agents.map(a => a.name).join(', ')}`);
// Validate agent names are unique
const agentNames = agents.map(a => a.name);
const duplicateAgentName = agentNames.find((n, i) => agentNames.indexOf(n) !== i);
if (duplicateAgentName) {
log.error(`Multiple agents share the same name: "${duplicateAgentName}". Each agent must have a unique name.`);
process.exit(1);
}
// Validate no two agents share the same turnLogFile
const turnLogFilePaths = agents
.map(a => (a.features?.logging ?? yamlConfig.features?.logging)?.turnLogFile)
.filter((p): p is string => !!p)
.map(p => resolve(p));
const duplicateTurnLog = turnLogFilePaths.find((p, i) => turnLogFilePaths.indexOf(p) !== i);
if (duplicateTurnLog) {
log.error(`Multiple agents share the same turnLogFile: "${duplicateTurnLog}". Each agent must use a unique log file path.`);
process.exit(1);
}
// Validate at least one agent has channels // Validate at least one agent has channels
const totalChannels = agents.reduce((sum, a) => sum + Object.keys(a.channels).length, 0); const totalChannels = agents.reduce((sum, a) => sum + Object.keys(a.channels).length, 0);
if (totalChannels === 0) { if (totalChannels === 0) {
@@ -283,31 +319,46 @@ async function main() {
}, ATTACHMENTS_PRUNE_INTERVAL_MS); }, ATTACHMENTS_PRUNE_INTERVAL_MS);
timer.unref?.(); timer.unref?.();
} }
const gateway = new LettaGateway(); const gateway = new LettaGateway();
const agentStores = new Map<string, Store>(); const agentStores = new Map<string, Store>();
const sessionInvalidators = new Map<string, (key?: string) => void>(); const sessionInvalidators = new Map<string, (key?: string) => void>();
const agentChannelMap = new Map<string, string[]>(); const agentChannelMap = new Map<string, string[]>();
const voiceMemoEnabled = isVoiceMemoConfigured(); const voiceMemoEnabled = isVoiceMemoConfigured();
const services: { const services: {
cronServices: CronService[], cronServices: CronService[],
heartbeatServices: HeartbeatService[], heartbeatServices: HeartbeatService[],
pollingServices: PollingService[], pollingServices: PollingService[],
groupBatchers: GroupBatcher[] groupBatchers: GroupBatcher[]
} = { } = {
cronServices: [], cronServices: [],
heartbeatServices: [], heartbeatServices: [],
pollingServices: [], pollingServices: [],
groupBatchers: [], groupBatchers: [],
}; };
for (const agentConfig of agents) { for (const agentConfig of agents) {
log.info(`Configuring agent: ${agentConfig.name}`); log.info(`Configuring agent: ${agentConfig.name}`);
// Resolve memfs: YAML config takes precedence, then env var, then default false. const resolvedMemfsResult = resolveSessionMemfs({
// Default false prevents the SDK from auto-enabling memfs, which crashes on configuredMemfs: agentConfig.features?.memfs,
// self-hosted Letta servers that don't have the git endpoint. envMemfs: process.env.LETTABOT_MEMFS,
const resolvedMemfs = agentConfig.features?.memfs ?? (process.env.LETTABOT_MEMFS === 'true' ? true : false); serverMode: yamlConfig.server.mode,
});
const resolvedMemfs = resolvedMemfsResult.value;
const configuredSleeptime = agentConfig.features?.sleeptime;
// Treat missing trigger as active (conservative): only `trigger: 'off'` explicitly disables.
const sleeptimeRequiresMemfs = !!configuredSleeptime && configuredSleeptime.trigger !== 'off';
const effectiveSleeptime = resolvedMemfs === false && sleeptimeRequiresMemfs
? undefined
: configuredSleeptime;
if (resolvedMemfs === false && sleeptimeRequiresMemfs) {
log.warn(
`Agent ${agentConfig.name}: sleeptime is configured but memfs is disabled; ` +
`sleeptime will be ignored. Enable features.memfs (or set LETTABOT_MEMFS=true) to use sleeptime.`
);
}
// Create LettaBot for this agent // Create LettaBot for this agent
const resolvedWorkingDir = agentConfig.workingDir const resolvedWorkingDir = agentConfig.workingDir
@@ -320,6 +371,7 @@ async function main() {
const cronStorePath = cronStoreFilename const cronStorePath = cronStoreFilename
? resolve(getCronDataDir(), cronStoreFilename) ? resolve(getCronDataDir(), cronStoreFilename)
: undefined; : undefined;
const heartbeatConfig = agentConfig.features?.heartbeat;
const bot = new LettaBot({ const bot = new LettaBot({
workingDir: resolvedWorkingDir, workingDir: resolvedWorkingDir,
@@ -332,27 +384,40 @@ async function main() {
sendFileMaxSize: agentConfig.features?.sendFileMaxSize, sendFileMaxSize: agentConfig.features?.sendFileMaxSize,
sendFileCleanup: agentConfig.features?.sendFileCleanup, sendFileCleanup: agentConfig.features?.sendFileCleanup,
memfs: resolvedMemfs, memfs: resolvedMemfs,
sleeptime: effectiveSleeptime,
display: agentConfig.features?.display, display: agentConfig.features?.display,
conversationMode: agentConfig.conversations?.mode || 'shared', conversationMode: agentConfig.conversations?.mode || 'shared',
heartbeatConversation: agentConfig.conversations?.heartbeat || 'last-active', heartbeatConversation: agentConfig.conversations?.heartbeat || 'last-active',
heartbeatTargetChatId: parseHeartbeatTarget(agentConfig.features?.heartbeat?.target)?.chatId, heartbeatTargetChatId: parseHeartbeatTarget(heartbeatConfig?.target)?.chatId,
interruptHeartbeatOnUserMessage:
heartbeatConfig?.interruptOnUserMessage
?? globalConfig.heartbeatInterruptOnUserMessage
?? true,
conversationOverrides: agentConfig.conversations?.perChannel, conversationOverrides: agentConfig.conversations?.perChannel,
maxSessions: agentConfig.conversations?.maxSessions, maxSessions: agentConfig.conversations?.maxSessions,
reuseSession: agentConfig.conversations?.reuseSession, reuseSession: agentConfig.conversations?.reuseSession,
sessionModel: agentConfig.conversations?.sessionModel, sessionModel: agentConfig.conversations?.sessionModel,
redaction: agentConfig.security?.redaction, redaction: agentConfig.security?.redaction,
logging: agentConfig.features?.logging ?? yamlConfig.features?.logging,
cronStorePath, cronStorePath,
skills: { skills: {
cronEnabled: agentConfig.features?.cron ?? globalConfig.cronEnabled, cronEnabled: agentConfig.features?.cron ?? globalConfig.cronEnabled,
googleEnabled: !!agentConfig.integrations?.google?.enabled || !!agentConfig.polling?.gmail?.enabled, googleEnabled: !!agentConfig.integrations?.google?.enabled || !!agentConfig.polling?.gmail?.enabled,
blueskyEnabled: !!agentConfig.channels?.bluesky?.enabled,
ttsEnabled: voiceMemoEnabled, ttsEnabled: voiceMemoEnabled,
}, },
}); });
// Log memfs config (from either YAML or env var) // Log memfs config (from either YAML or env var)
if (resolvedMemfs !== undefined) { if (resolvedMemfs !== undefined) {
const source = agentConfig.features?.memfs !== undefined ? '' : ' (from LETTABOT_MEMFS env)'; const source = resolvedMemfsResult.source === 'config'
? ''
: resolvedMemfsResult.source === 'env'
? ' (from LETTABOT_MEMFS env)'
: ' (default for docker/selfhosted mode)';
log.info(`Agent ${agentConfig.name}: memfs ${resolvedMemfs ? 'enabled' : 'disabled'}${source}`); log.info(`Agent ${agentConfig.name}: memfs ${resolvedMemfs ? 'enabled' : 'disabled'}${source}`);
} else {
log.info(`Agent ${agentConfig.name}: memfs unchanged (not explicitly configured)`);
} }
// Apply explicit agent ID from config (before store verification) // Apply explicit agent ID from config (before store verification)
@@ -363,7 +428,7 @@ async function main() {
bot.setAgentId(agentConfig.id); bot.setAgentId(agentConfig.id);
initialStatus = bot.getStatus(); initialStatus = bot.getStatus();
} }
// Verify agent exists (clear stale ID if deleted) // Verify agent exists (clear stale ID if deleted)
if (initialStatus.agentId) { if (initialStatus.agentId) {
const exists = await agentExists(initialStatus.agentId); const exists = await agentExists(initialStatus.agentId);
@@ -401,7 +466,7 @@ async function main() {
if (!initialStatus.agentId) { if (!initialStatus.agentId) {
log.info(`No agent found - will create on first message`); log.info(`No agent found - will create on first message`);
} }
// Disable tool approvals // Disable tool approvals
if (initialStatus.agentId) { if (initialStatus.agentId) {
ensureNoToolApprovals(initialStatus.agentId).catch(err => { ensureNoToolApprovals(initialStatus.agentId).catch(err => {
@@ -433,12 +498,14 @@ async function main() {
} }
// Per-agent heartbeat // Per-agent heartbeat
const heartbeatConfig = agentConfig.features?.heartbeat;
const heartbeatService = new HeartbeatService(bot, { const heartbeatService = new HeartbeatService(bot, {
enabled: heartbeatConfig?.enabled ?? false, enabled: heartbeatConfig?.enabled ?? false,
intervalMinutes: heartbeatConfig?.intervalMin ?? 240, intervalMinutes: heartbeatConfig?.intervalMin ?? 240,
skipRecentUserMinutes: heartbeatConfig?.skipRecentUserMin ?? globalConfig.heartbeatSkipRecentUserMin, skipRecentUserMinutes: heartbeatConfig?.skipRecentUserMin ?? globalConfig.heartbeatSkipRecentUserMin,
skipRecentPolicy: heartbeatConfig?.skipRecentPolicy ?? globalConfig.heartbeatSkipRecentPolicy,
skipRecentFraction: heartbeatConfig?.skipRecentFraction ?? globalConfig.heartbeatSkipRecentFraction,
agentKey: agentConfig.name, agentKey: agentConfig.name,
memfs: resolvedMemfs,
prompt: heartbeatConfig?.prompt || process.env.HEARTBEAT_PROMPT, prompt: heartbeatConfig?.prompt || process.env.HEARTBEAT_PROMPT,
promptFile: heartbeatConfig?.promptFile, promptFile: heartbeatConfig?.promptFile,
workingDir: resolvedWorkingDir, workingDir: resolvedWorkingDir,
@@ -450,7 +517,7 @@ async function main() {
} }
bot.onTriggerHeartbeat = () => heartbeatService.trigger(); bot.onTriggerHeartbeat = () => heartbeatService.trigger();
// Wire Matrix adapter callbacks (heartbeat toggle, !timeout, !new, agent ID query) // Wire Matrix adapter callbacks (heartbeat toggle, !timeout)
const matrixAdapter = adapters.find(a => a.id === 'matrix') as MatrixAdapter | undefined; const matrixAdapter = adapters.find(a => a.id === 'matrix') as MatrixAdapter | undefined;
if (matrixAdapter) { if (matrixAdapter) {
matrixAdapter.onHeartbeatStop = () => heartbeatService.stop(); matrixAdapter.onHeartbeatStop = () => heartbeatService.stop();
@@ -458,7 +525,7 @@ async function main() {
// Best-effort: stops the timer so no new runs fire; running promise times out on its own // Best-effort: stops the timer so no new runs fire; running promise times out on its own
matrixAdapter.onTimeoutHeartbeat = () => { heartbeatService.stop(); log.warn('Matrix !timeout: heartbeat stopped (abort not yet supported)'); }; matrixAdapter.onTimeoutHeartbeat = () => { heartbeatService.stop(); log.warn('Matrix !timeout: heartbeat stopped (abort not yet supported)'); };
} }
// Per-agent polling -- resolve accounts from polling > integrations.google (legacy) > env // Per-agent polling -- resolve accounts from polling > integrations.google (legacy) > env
const pollConfig = (() => { const pollConfig = (() => {
const pollingAccounts = parseGmailAccounts( const pollingAccounts = parseGmailAccounts(
@@ -494,7 +561,7 @@ async function main() {
}, },
}; };
})(); })();
if (pollConfig.enabled && pollConfig.gmail.enabled && pollConfig.gmail.accounts.length > 0) { if (pollConfig.enabled && pollConfig.gmail.enabled && pollConfig.gmail.accounts.length > 0) {
const pollingService = new PollingService(bot, { const pollingService = new PollingService(bot, {
intervalMs: pollConfig.intervalMs, intervalMs: pollConfig.intervalMs,
@@ -504,13 +571,13 @@ async function main() {
pollingService.start(); pollingService.start();
services.pollingServices.push(pollingService); services.pollingServices.push(pollingService);
} }
gateway.addAgent(agentConfig.name, bot); gateway.addAgent(agentConfig.name, bot);
agentStores.set(agentConfig.name, bot.store); agentStores.set(agentConfig.name, bot.store);
sessionInvalidators.set(agentConfig.name, (key) => bot.invalidateSession(key)); sessionInvalidators.set(agentConfig.name, (key) => bot.invalidateSession(key));
agentChannelMap.set(agentConfig.name, adapters.map(a => a.id)); agentChannelMap.set(agentConfig.name, adapters.map(a => a.id));
} }
// Load/generate API key BEFORE gateway.start() so letta.js subprocesses inherit it. // Load/generate API key BEFORE gateway.start() so letta.js subprocesses inherit it.
// The lettabot-message CLI needs LETTABOT_API_KEY to route through the bot's HTTP API for E2EE. // The lettabot-message CLI needs LETTABOT_API_KEY to route through the bot's HTTP API for E2EE.
const apiKey = loadOrGenerateApiKey(); const apiKey = loadOrGenerateApiKey();
@@ -534,16 +601,22 @@ async function main() {
const apiPort = parseInt(process.env.PORT || '8080', 10); const apiPort = parseInt(process.env.PORT || '8080', 10);
const apiHost = process.env.API_HOST || (isContainerDeploy ? '0.0.0.0' : undefined); // Container platforms need 0.0.0.0 for health checks const apiHost = process.env.API_HOST || (isContainerDeploy ? '0.0.0.0' : undefined); // Container platforms need 0.0.0.0 for health checks
const apiCorsOrigin = process.env.API_CORS_ORIGIN; // undefined = same-origin only const apiCorsOrigin = process.env.API_CORS_ORIGIN; // undefined = same-origin only
const turnLogFiles: Record<string, string> = {};
for (const a of agents) {
const logging = a.features?.logging ?? yamlConfig.features?.logging;
if (logging?.turnLogFile) turnLogFiles[a.name] = logging.turnLogFile;
}
const apiServer = createApiServer(gateway, { const apiServer = createApiServer(gateway, {
port: apiPort, port: apiPort,
apiKey: apiKey, apiKey: apiKey,
host: apiHost, host: apiHost,
corsOrigin: apiCorsOrigin, corsOrigin: apiCorsOrigin,
turnLogFiles: Object.keys(turnLogFiles).length > 0 ? turnLogFiles : undefined,
stores: agentStores, stores: agentStores,
agentChannels: agentChannelMap, agentChannels: agentChannelMap,
sessionInvalidators, sessionInvalidators,
}); });
// Startup banner // Startup banner
const bannerAgents = gateway.getAgentNames().map(name => { const bannerAgents = gateway.getAgentNames().map(name => {
const agent = gateway.getAgent(name)!; const agent = gateway.getAgent(name)!;
@@ -564,7 +637,7 @@ async function main() {
if (!process.env.LETTABOT_NO_BANNER) { if (!process.env.LETTABOT_NO_BANNER) {
printStartupBanner(bannerAgents); printStartupBanner(bannerAgents);
} }
// Shutdown // Shutdown
const shutdown = async () => { const shutdown = async () => {
log.info('Shutting down...'); log.info('Shutting down...');