fix(cron): prevent cross-agent job leakage from global store migration (#588)
Co-authored-by: Letta Code <noreply@letta.com>
This commit is contained in:
@@ -6,7 +6,7 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
import { existsSync, readFileSync, writeFileSync, appendFileSync, mkdirSync, copyFileSync, renameSync, watch, type FSWatcher } from 'node:fs';
|
import { existsSync, readFileSync, writeFileSync, appendFileSync, mkdirSync, copyFileSync, renameSync, watch, type FSWatcher } from 'node:fs';
|
||||||
import { resolve, dirname } from 'node:path';
|
import { resolve, dirname, basename } from 'node:path';
|
||||||
import type { AgentSession } from '../core/interfaces.js';
|
import type { AgentSession } from '../core/interfaces.js';
|
||||||
import type { CronJob, CronJobCreate, CronSchedule, CronConfig, HeartbeatConfig } from './types.js';
|
import type { CronJob, CronJobCreate, CronSchedule, CronConfig, HeartbeatConfig } from './types.js';
|
||||||
import { DEFAULT_HEARTBEAT_MESSAGES } from './types.js';
|
import { DEFAULT_HEARTBEAT_MESSAGES } from './types.js';
|
||||||
@@ -101,11 +101,38 @@ export class CronService {
|
|||||||
if (globalPath === this.storePath || !existsSync(globalPath)) return;
|
if (globalPath === this.storePath || !existsSync(globalPath)) return;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
mkdirSync(dirname(this.storePath), { recursive: true });
|
// Filter jobs: only adopt those whose delivery channels exist on this agent
|
||||||
copyFileSync(globalPath, this.storePath);
|
const agentChannels = new Set(this.bot.getStatus().channels);
|
||||||
|
const globalData: CronStoreFile = JSON.parse(readFileSync(globalPath, 'utf-8'));
|
||||||
|
const adopted: typeof globalData.jobs = [];
|
||||||
|
const skipped: string[] = [];
|
||||||
|
|
||||||
|
for (const job of globalData.jobs) {
|
||||||
|
const deliverChannel = job.deliver?.channel;
|
||||||
|
if (deliverChannel && !agentChannels.has(deliverChannel)) {
|
||||||
|
skipped.push(`${job.id} (channel=${deliverChannel})`);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
adopted.push(job);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (skipped.length > 0) {
|
||||||
|
log.warn(`Global store migration: skipped ${skipped.length} job(s) with unavailable channels: ${skipped.join(', ')}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (adopted.length > 0) {
|
||||||
|
mkdirSync(dirname(this.storePath), { recursive: true });
|
||||||
|
writeFileSync(this.storePath, JSON.stringify({ ...globalData, jobs: adopted }, null, 2));
|
||||||
|
logEvent('store_migrated_from_global', {
|
||||||
|
from: globalPath,
|
||||||
|
to: this.storePath,
|
||||||
|
adopted: adopted.length,
|
||||||
|
skipped: skipped.length,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
// Rename global file so subsequent agents don't also copy it
|
// Rename global file so subsequent agents don't also copy it
|
||||||
renameSync(globalPath, globalPath + '.migrated');
|
renameSync(globalPath, globalPath + '.migrated');
|
||||||
logEvent('store_migrated_from_global', { from: globalPath, to: this.storePath });
|
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
log.error('Failed to migrate from global cron store:', e);
|
log.error('Failed to migrate from global cron store:', e);
|
||||||
}
|
}
|
||||||
@@ -224,8 +251,9 @@ export class CronService {
|
|||||||
} catch {
|
} catch {
|
||||||
// File might not exist yet, watch the directory instead
|
// File might not exist yet, watch the directory instead
|
||||||
const dir = resolve(this.storePath, '..');
|
const dir = resolve(this.storePath, '..');
|
||||||
|
const storeBasename = basename(this.storePath);
|
||||||
this.fileWatcher = watch(dir, { persistent: false }, (eventType, filename) => {
|
this.fileWatcher = watch(dir, { persistent: false }, (eventType, filename) => {
|
||||||
if (filename === 'cron-jobs.json') {
|
if (filename === storeBasename) {
|
||||||
this.handleFileChange();
|
this.handleFileChange();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user