fix: scope cron jobs to individual agents in multi-agent mode (#432)
This commit is contained in:
@@ -980,6 +980,11 @@ export class LettaBot implements AgentSession {
|
|||||||
? this.store.conversationId
|
? this.store.conversationId
|
||||||
: this.store.getConversationId(key);
|
: this.store.getConversationId(key);
|
||||||
|
|
||||||
|
// Propagate per-agent cron store path to CLI subprocesses (lettabot-schedule)
|
||||||
|
if (this.config.cronStorePath) {
|
||||||
|
process.env.CRON_STORE_PATH = this.config.cronStorePath;
|
||||||
|
}
|
||||||
|
|
||||||
if (convId) {
|
if (convId) {
|
||||||
process.env.LETTA_AGENT_ID = this.store.agentId || undefined;
|
process.env.LETTA_AGENT_ID = this.store.agentId || undefined;
|
||||||
if (this.store.agentId) {
|
if (this.store.agentId) {
|
||||||
|
|||||||
@@ -155,6 +155,9 @@ export interface BotConfig {
|
|||||||
sendFileMaxSize?: number; // Max file size in bytes for <send-file> (default: 50MB)
|
sendFileMaxSize?: number; // Max file size in bytes for <send-file> (default: 50MB)
|
||||||
sendFileCleanup?: boolean; // Allow <send-file cleanup="true"> to delete files after send (default: false)
|
sendFileCleanup?: boolean; // Allow <send-file cleanup="true"> to delete files after send (default: false)
|
||||||
|
|
||||||
|
// Cron
|
||||||
|
cronStorePath?: string; // Resolved cron store path (per-agent in multi-agent mode)
|
||||||
|
|
||||||
// Conversation routing
|
// Conversation routing
|
||||||
conversationMode?: 'shared' | 'per-channel' | 'per-chat'; // Default: shared
|
conversationMode?: 'shared' | 'per-channel' | 'per-chat'; // Default: shared
|
||||||
heartbeatConversation?: string; // "dedicated" | "last-active" | "<channel>" (default: last-active)
|
heartbeatConversation?: string; // "dedicated" | "last-active" | "<channel>" (default: last-active)
|
||||||
|
|||||||
@@ -55,8 +55,8 @@ interface CronStore {
|
|||||||
jobs: CronJob[];
|
jobs: CronJob[];
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store path
|
// Store path (CRON_STORE_PATH env var set by bot.ts for per-agent scoping in multi-agent mode)
|
||||||
const STORE_PATH = getCronStorePath();
|
const STORE_PATH = process.env.CRON_STORE_PATH || getCronStorePath();
|
||||||
const LOG_PATH = getCronLogPath();
|
const LOG_PATH = getCronLogPath();
|
||||||
|
|
||||||
function migrateLegacyStoreIfNeeded(): void {
|
function migrateLegacyStoreIfNeeded(): void {
|
||||||
|
|||||||
@@ -5,7 +5,7 @@
|
|||||||
* Supports heartbeat check-ins and agent-managed cron jobs.
|
* Supports heartbeat check-ins and agent-managed cron jobs.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { existsSync, readFileSync, writeFileSync, appendFileSync, mkdirSync, copyFileSync, watch, type FSWatcher } from 'node:fs';
|
import { existsSync, readFileSync, writeFileSync, appendFileSync, mkdirSync, copyFileSync, renameSync, watch, type FSWatcher } from 'node:fs';
|
||||||
import { resolve, dirname } from 'node:path';
|
import { resolve, dirname } from 'node:path';
|
||||||
import type { AgentSession } from '../core/interfaces.js';
|
import type { AgentSession } from '../core/interfaces.js';
|
||||||
import type { CronJob, CronJobCreate, CronSchedule, CronConfig, HeartbeatConfig } from './types.js';
|
import type { CronJob, CronJobCreate, CronSchedule, CronConfig, HeartbeatConfig } from './types.js';
|
||||||
@@ -67,6 +67,7 @@ export class CronService {
|
|||||||
? resolve(getCronDataDir(), config.storePath)
|
? resolve(getCronDataDir(), config.storePath)
|
||||||
: getCronStorePath();
|
: getCronStorePath();
|
||||||
this.migrateLegacyStoreIfNeeded();
|
this.migrateLegacyStoreIfNeeded();
|
||||||
|
this.migrateFromGlobalStoreIfNeeded();
|
||||||
this.loadJobs();
|
this.loadJobs();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -86,6 +87,29 @@ export class CronService {
|
|||||||
log.error('Failed to migrate legacy store:', e);
|
log.error('Failed to migrate legacy store:', e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Multi-agent upgrade: if this agent has a per-agent storePath but the file
|
||||||
|
* doesn't exist yet, copy from the global cron-jobs.json as a starting point.
|
||||||
|
* Only the first agent to start gets the migration (others start empty).
|
||||||
|
*/
|
||||||
|
private migrateFromGlobalStoreIfNeeded(): void {
|
||||||
|
if (!this.config.storePath) return; // Not in multi-agent mode
|
||||||
|
if (existsSync(this.storePath)) return; // Already has own store
|
||||||
|
|
||||||
|
const globalPath = getCronStorePath();
|
||||||
|
if (globalPath === this.storePath || !existsSync(globalPath)) return;
|
||||||
|
|
||||||
|
try {
|
||||||
|
mkdirSync(dirname(this.storePath), { recursive: true });
|
||||||
|
copyFileSync(globalPath, this.storePath);
|
||||||
|
// Rename global file so subsequent agents don't also copy it
|
||||||
|
renameSync(globalPath, globalPath + '.migrated');
|
||||||
|
logEvent('store_migrated_from_global', { from: globalPath, to: this.storePath });
|
||||||
|
} catch (e) {
|
||||||
|
log.error('Failed to migrate from global cron store:', e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private loadJobs(): void {
|
private loadJobs(): void {
|
||||||
try {
|
try {
|
||||||
|
|||||||
13
src/main.ts
13
src/main.ts
@@ -23,7 +23,7 @@ import {
|
|||||||
serverModeLabel,
|
serverModeLabel,
|
||||||
} from './config/index.js';
|
} from './config/index.js';
|
||||||
import { isLettaApiUrl } from './utils/server.js';
|
import { isLettaApiUrl } from './utils/server.js';
|
||||||
import { getDataDir, getWorkingDir, hasRailwayVolume, resolveWorkingDirPath } from './utils/paths.js';
|
import { getCronDataDir, getDataDir, getWorkingDir, hasRailwayVolume, resolveWorkingDirPath } from './utils/paths.js';
|
||||||
import { parseCsvList, parseNonNegativeNumber } from './utils/parse.js';
|
import { parseCsvList, parseNonNegativeNumber } from './utils/parse.js';
|
||||||
import { sleep } from './utils/time.js';
|
import { sleep } from './utils/time.js';
|
||||||
import { createLogger, setLogLevel } from './logger.js';
|
import { createLogger, setLogLevel } from './logger.js';
|
||||||
@@ -579,6 +579,14 @@ async function main() {
|
|||||||
const resolvedWorkingDir = agentConfig.workingDir
|
const resolvedWorkingDir = agentConfig.workingDir
|
||||||
? resolveWorkingDirPath(agentConfig.workingDir)
|
? resolveWorkingDirPath(agentConfig.workingDir)
|
||||||
: globalConfig.workingDir;
|
: globalConfig.workingDir;
|
||||||
|
// Per-agent cron store path: in multi-agent mode, each agent gets its own file
|
||||||
|
const cronStoreFilename = agents.length > 1
|
||||||
|
? `cron-jobs-${agentConfig.name}.json`
|
||||||
|
: undefined;
|
||||||
|
const cronStorePath = cronStoreFilename
|
||||||
|
? resolve(getCronDataDir(), cronStoreFilename)
|
||||||
|
: undefined;
|
||||||
|
|
||||||
const bot = new LettaBot({
|
const bot = new LettaBot({
|
||||||
workingDir: resolvedWorkingDir,
|
workingDir: resolvedWorkingDir,
|
||||||
agentName: agentConfig.name,
|
agentName: agentConfig.name,
|
||||||
@@ -596,6 +604,7 @@ async function main() {
|
|||||||
conversationOverrides: agentConfig.conversations?.perChannel,
|
conversationOverrides: agentConfig.conversations?.perChannel,
|
||||||
maxSessions: agentConfig.conversations?.maxSessions,
|
maxSessions: agentConfig.conversations?.maxSessions,
|
||||||
redaction: agentConfig.security?.redaction,
|
redaction: agentConfig.security?.redaction,
|
||||||
|
cronStorePath,
|
||||||
skills: {
|
skills: {
|
||||||
cronEnabled: agentConfig.features?.cron ?? globalConfig.cronEnabled,
|
cronEnabled: agentConfig.features?.cron ?? globalConfig.cronEnabled,
|
||||||
googleEnabled: !!agentConfig.integrations?.google?.enabled || !!agentConfig.polling?.gmail?.enabled,
|
googleEnabled: !!agentConfig.integrations?.google?.enabled || !!agentConfig.polling?.gmail?.enabled,
|
||||||
@@ -679,7 +688,7 @@ async function main() {
|
|||||||
|
|
||||||
// Per-agent cron
|
// Per-agent cron
|
||||||
if (agentConfig.features?.cron ?? globalConfig.cronEnabled) {
|
if (agentConfig.features?.cron ?? globalConfig.cronEnabled) {
|
||||||
const cronService = new CronService(bot);
|
const cronService = new CronService(bot, cronStoreFilename ? { storePath: cronStoreFilename } : undefined);
|
||||||
await cronService.start();
|
await cronService.start();
|
||||||
services.cronServices.push(cronService);
|
services.cronServices.push(cronService);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user