diff --git a/docs/configuration.md b/docs/configuration.md index 562b38d..def7ad0 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -535,12 +535,20 @@ features: heartbeat: enabled: true intervalMin: 60 # Check every 60 minutes - skipRecentUserMin: 5 # Skip auto-heartbeats for N minutes after user message (0 disables) + skipRecentPolicy: fraction # fixed | fraction | off + skipRecentFraction: 0.5 # Used when policy=fraction (0-1) + # skipRecentUserMin: 5 # Used when policy=fixed (0 disables) + interruptOnUserMessage: true # Cancel in-flight heartbeat when user messages arrive ``` Heartbeats are background tasks where the agent can review pending work. -If the user messaged recently, automatic heartbeats are skipped by default for 5 minutes (`skipRecentUserMin`). -Set this to `0` to disable skipping. Manual `/heartbeat` bypasses the skip check. +By default, automatic heartbeats skip for half of the heartbeat interval (`skipRecentPolicy: fraction` with `skipRecentFraction: 0.5`). +- `fixed`: use `skipRecentUserMin`. +- `fraction`: use `ceil(intervalMin * skipRecentFraction)`. +- `off`: disable recent-user skipping. + +`interruptOnUserMessage` defaults to `true`, so live user messages cancel in-flight heartbeat runs on the same conversation key. +Manual `/heartbeat` bypasses the recent-user skip check. #### Custom Heartbeat Prompt @@ -571,13 +579,19 @@ Via environment variable: ```bash HEARTBEAT_PROMPT="Review recent conversations" npm start # Optional: HEARTBEAT_SKIP_RECENT_USER_MIN=0 to disable recent-user skip +# Optional: HEARTBEAT_SKIP_RECENT_POLICY=fixed|fraction|off +# Optional: HEARTBEAT_SKIP_RECENT_FRACTION=0.5 +# Optional: HEARTBEAT_INTERRUPT_ON_USER_MESSAGE=true ``` Precedence: `prompt` (inline YAML) > `HEARTBEAT_PROMPT` (env var) > `promptFile` (file) > built-in default. | Field | Type | Default | Description | |-------|------|---------|-------------| -| `features.heartbeat.skipRecentUserMin` | number | `5` | Skip auto-heartbeats for N minutes after a user message. Set `0` to disable. | +| `features.heartbeat.skipRecentPolicy` | `'fixed' \| 'fraction' \| 'off'` | `'fraction'` | How recent-user skipping is calculated. | +| `features.heartbeat.skipRecentFraction` | number | `0.5` | Fraction of `intervalMin` used when policy is `fraction` (range: `0`-`1`). | +| `features.heartbeat.skipRecentUserMin` | number | `5` | Skip auto-heartbeats for N minutes when policy is `fixed`. Set `0` to disable fixed-window skipping. | +| `features.heartbeat.interruptOnUserMessage` | boolean | `true` | Cancel in-flight heartbeat runs when a user message arrives on the same conversation key. | | `features.heartbeat.prompt` | string | _(none)_ | Custom heartbeat prompt text | | `features.heartbeat.promptFile` | string | _(none)_ | Path to prompt file (relative to working dir) | diff --git a/docs/cron-setup.md b/docs/cron-setup.md index 8daa1b8..e898e6e 100644 --- a/docs/cron-setup.md +++ b/docs/cron-setup.md @@ -116,11 +116,16 @@ features: heartbeat: enabled: true intervalMin: 60 # Default: 60 minutes - skipRecentUserMin: 5 # Skip auto-heartbeats for N minutes after user messages (0 disables) + skipRecentPolicy: fraction # fixed | fraction | off + skipRecentFraction: 0.5 # Used when policy=fraction (0-1) + # skipRecentUserMin: 5 # Used when policy=fixed (0 disables) + interruptOnUserMessage: true # Cancel in-flight heartbeat when user messages arrive ``` -By default, automatic heartbeats are skipped for 5 minutes after a user message to avoid immediate follow-up noise. -- Set `skipRecentUserMin: 0` to disable this skip behavior. +By default, automatic heartbeats are skipped for half the heartbeat interval (`skipRecentPolicy: fraction`, `skipRecentFraction: 0.5`). +- Use `skipRecentPolicy: fixed` + `skipRecentUserMin` for a fixed window. +- Use `skipRecentPolicy: off` to disable recent-user skipping. +- `interruptOnUserMessage: true` prioritizes live user messages by cancelling in-flight heartbeat runs on the same key. - Manual `/heartbeat` always bypasses the skip check. ### Manual Trigger diff --git a/docs/railway-deploy.md b/docs/railway-deploy.md index bc53878..c1099e5 100644 --- a/docs/railway-deploy.md +++ b/docs/railway-deploy.md @@ -67,7 +67,10 @@ SLACK_APP_TOKEN=xapp-... | `CRON_ENABLED` | `false` | Enable cron jobs | | `HEARTBEAT_ENABLED` | `false` | Enable heartbeat service | | `HEARTBEAT_INTERVAL_MIN` | `30` | Heartbeat interval (minutes). Also enables heartbeat when set | -| `HEARTBEAT_SKIP_RECENT_USER_MIN` | `5` | Skip automatic heartbeats for N minutes after user messages (`0` disables) | +| `HEARTBEAT_SKIP_RECENT_POLICY` | `fraction` | Recent-user skip policy (`fixed`, `fraction`, `off`) | +| `HEARTBEAT_SKIP_RECENT_FRACTION` | `0.5` | Fraction of interval used when policy is `fraction` | +| `HEARTBEAT_SKIP_RECENT_USER_MIN` | `5` | Skip window in minutes when policy is `fixed` (`0` disables) | +| `HEARTBEAT_INTERRUPT_ON_USER_MESSAGE` | `true` | Cancel in-flight heartbeat when a user message arrives on the same key | | `HEARTBEAT_TARGET` | - | Target chat (e.g., `telegram:123456`) | | `OPENAI_API_KEY` | - | For voice message transcription | | `API_HOST` | `0.0.0.0` on Railway | Optional override for API bind address | diff --git a/lettabot.example.yaml b/lettabot.example.yaml index cad479d..c419e53 100644 --- a/lettabot.example.yaml +++ b/lettabot.example.yaml @@ -97,7 +97,10 @@ features: heartbeat: enabled: false intervalMin: 30 - # skipRecentUserMin: 5 # Skip auto-heartbeats for N minutes after user message (0 disables) + # skipRecentPolicy: fraction # fixed | fraction | off + # skipRecentFraction: 0.5 # Used when policy=fraction (0-1) + # skipRecentUserMin: 5 # Used when policy=fixed (0 disables) + # interruptOnUserMessage: true # Cancel in-flight heartbeat when user messages arrive # sendFileDir: ./data/outbound # Restrict directive to this directory (default: data/outbound) # sendFileMaxSize: 52428800 # Max file size in bytes for (default: 50MB) # sendFileCleanup: false # Allow to delete files after send (default: false) diff --git a/src/cli.ts b/src/cli.ts index 683b64a..b6b6512 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -289,6 +289,9 @@ Environment: SLACK_APP_TOKEN Slack app token (xapp-...) HEARTBEAT_INTERVAL_MIN Heartbeat interval in minutes HEARTBEAT_SKIP_RECENT_USER_MIN Skip auto-heartbeats after user messages (0 disables) + HEARTBEAT_SKIP_RECENT_POLICY Heartbeat skip policy (fixed, fraction, off) + HEARTBEAT_SKIP_RECENT_FRACTION Fraction of interval to skip when policy=fraction + HEARTBEAT_INTERRUPT_ON_USER_MESSAGE Cancel in-flight heartbeat on user message (true/false) CRON_ENABLED Enable cron jobs (true/false) `); } diff --git a/src/cli/config-tui.test.ts b/src/cli/config-tui.test.ts index 51ed0a1..bc5d319 100644 --- a/src/cli/config-tui.test.ts +++ b/src/cli/config-tui.test.ts @@ -142,6 +142,47 @@ describe('config TUI helpers', () => { expect(updated.providers?.[0].name).toBe('OpenAI'); }); + it('extract/apply preserves heartbeat policy and preemption fields', () => { + const config: LettaBotConfig = { + ...makeBaseConfig(), + agents: [ + { + name: 'Primary', + channels: {}, + features: { + heartbeat: { + enabled: true, + intervalMin: 30, + skipRecentPolicy: 'fraction', + skipRecentFraction: 0.5, + interruptOnUserMessage: true, + }, + }, + }, + ], + }; + + const draft = extractCoreDraft(config); + const heartbeat = draft.features.heartbeat; + if (!heartbeat) { + throw new Error('Expected heartbeat settings in extracted draft'); + } + expect(heartbeat.skipRecentPolicy).toBe('fraction'); + expect(heartbeat.skipRecentFraction).toBe(0.5); + expect(heartbeat.interruptOnUserMessage).toBe(true); + + heartbeat.skipRecentPolicy = 'fixed'; + heartbeat.skipRecentUserMin = 7; + delete heartbeat.skipRecentFraction; + heartbeat.interruptOnUserMessage = false; + + const updated = applyCoreDraft(config, draft); + expect(updated.agents?.[0].features?.heartbeat?.skipRecentPolicy).toBe('fixed'); + expect(updated.agents?.[0].features?.heartbeat?.skipRecentUserMin).toBe(7); + expect(updated.agents?.[0].features?.heartbeat?.skipRecentFraction).toBeUndefined(); + expect(updated.agents?.[0].features?.heartbeat?.interruptOnUserMessage).toBe(false); + }); + it('getCoreDraftWarnings flags missing API key and no enabled channels', () => { const draft: CoreConfigDraft = { server: { mode: 'api', apiKey: undefined, baseUrl: undefined }, diff --git a/src/cli/config-tui.ts b/src/cli/config-tui.ts index 08f7f2b..51a4a74 100644 --- a/src/cli/config-tui.ts +++ b/src/cli/config-tui.ts @@ -57,6 +57,8 @@ function getPrimaryAgent(config: LettaBotConfig): AgentConfig | null { function normalizeFeatures(source?: AgentConfig['features']): NonNullable { const features = deepClone(source ?? {}); + const skipRecentPolicy = features.heartbeat?.skipRecentPolicy + ?? (features.heartbeat?.skipRecentUserMin !== undefined ? 'fixed' : 'fraction'); return { ...features, cron: typeof features.cron === 'boolean' ? features.cron : false, @@ -64,6 +66,9 @@ function normalizeFeatures(source?: AgentConfig['features']): NonNullable { }); if (p.isCancel(interval)) return; draft.features.heartbeat.intervalMin = Number(interval.trim()); + + const skipPolicy = await p.select({ + message: 'Heartbeat skip policy after user activity', + options: [ + { value: 'fraction', label: 'Fraction of interval', hint: 'default: 0.5 × interval' }, + { value: 'fixed', label: 'Fixed minutes', hint: 'manual skip window (legacy behavior)' }, + { value: 'off', label: 'Disabled', hint: 'never skip based on recent user message' }, + ], + initialValue: draft.features.heartbeat.skipRecentPolicy ?? 'fraction', + }); + if (p.isCancel(skipPolicy)) return; + draft.features.heartbeat.skipRecentPolicy = skipPolicy as 'fixed' | 'fraction' | 'off'; + + if (skipPolicy === 'fixed') { + const skipMin = await p.text({ + message: 'Skip heartbeats for this many minutes after user messages', + placeholder: '5', + initialValue: String(draft.features.heartbeat.skipRecentUserMin ?? 5), + validate: (value) => { + const parsed = Number(value.trim()); + if (!Number.isFinite(parsed) || parsed < 0) { + return 'Enter a non-negative number'; + } + return undefined; + }, + }); + if (p.isCancel(skipMin)) return; + draft.features.heartbeat.skipRecentUserMin = Number(skipMin.trim()); + delete draft.features.heartbeat.skipRecentFraction; + } else if (skipPolicy === 'fraction') { + const skipFraction = await p.text({ + message: 'Skip window as fraction of interval (0-1)', + placeholder: '0.5', + initialValue: String(draft.features.heartbeat.skipRecentFraction ?? 0.5), + validate: (value) => { + const parsed = Number(value.trim()); + if (!Number.isFinite(parsed) || parsed < 0 || parsed > 1) { + return 'Enter a number between 0 and 1'; + } + return undefined; + }, + }); + if (p.isCancel(skipFraction)) return; + draft.features.heartbeat.skipRecentFraction = Number(skipFraction.trim()); + delete draft.features.heartbeat.skipRecentUserMin; + } else { + delete draft.features.heartbeat.skipRecentUserMin; + delete draft.features.heartbeat.skipRecentFraction; + } + + const interruptOnUserMessage = await p.confirm({ + message: 'Interrupt in-flight heartbeat when a user message arrives?', + initialValue: draft.features.heartbeat.interruptOnUserMessage !== false, + }); + if (p.isCancel(interruptOnUserMessage)) return; + draft.features.heartbeat.interruptOnUserMessage = interruptOnUserMessage; } } diff --git a/src/config/io.test.ts b/src/config/io.test.ts index 45ee29f..098678a 100644 --- a/src/config/io.test.ts +++ b/src/config/io.test.ts @@ -262,14 +262,17 @@ describe('server.api config (canonical location)', () => { expect(env.API_CORS_ORIGIN).toBe('*'); }); - it('configToEnv should map heartbeat skip window env var', () => { + it('configToEnv should map heartbeat skip/preemption env vars', () => { const config: LettaBotConfig = { ...DEFAULT_CONFIG, features: { heartbeat: { enabled: true, intervalMin: 30, + skipRecentPolicy: 'fraction', + skipRecentFraction: 0.5, skipRecentUserMin: 4, + interruptOnUserMessage: false, }, }, }; @@ -277,6 +280,9 @@ describe('server.api config (canonical location)', () => { const env = configToEnv(config); expect(env.HEARTBEAT_INTERVAL_MIN).toBe('30'); expect(env.HEARTBEAT_SKIP_RECENT_USER_MIN).toBe('4'); + expect(env.HEARTBEAT_SKIP_RECENT_POLICY).toBe('fraction'); + expect(env.HEARTBEAT_SKIP_RECENT_FRACTION).toBe('0.5'); + expect(env.HEARTBEAT_INTERRUPT_ON_USER_MESSAGE).toBe('false'); }); it('configToEnv should fall back to top-level api (deprecated)', () => { diff --git a/src/config/io.ts b/src/config/io.ts index ce0175d..29518d9 100644 --- a/src/config/io.ts +++ b/src/config/io.ts @@ -527,6 +527,15 @@ export function configToEnv(config: LettaBotConfig): Record { if (config.features.heartbeat.skipRecentUserMin !== undefined) { env.HEARTBEAT_SKIP_RECENT_USER_MIN = String(config.features.heartbeat.skipRecentUserMin); } + if (config.features.heartbeat.skipRecentPolicy !== undefined) { + env.HEARTBEAT_SKIP_RECENT_POLICY = config.features.heartbeat.skipRecentPolicy; + } + if (config.features.heartbeat.skipRecentFraction !== undefined) { + env.HEARTBEAT_SKIP_RECENT_FRACTION = String(config.features.heartbeat.skipRecentFraction); + } + if (config.features.heartbeat.interruptOnUserMessage !== undefined) { + env.HEARTBEAT_INTERRUPT_ON_USER_MESSAGE = config.features.heartbeat.interruptOnUserMessage ? 'true' : 'false'; + } } if (config.features?.sleeptime) { if (config.features.sleeptime.trigger) { diff --git a/src/config/normalize.test.ts b/src/config/normalize.test.ts index b399fc8..2f7aa0f 100644 --- a/src/config/normalize.test.ts +++ b/src/config/normalize.test.ts @@ -34,6 +34,7 @@ describe('normalizeAgents', () => { 'BLUESKY_NOTIFICATIONS_ENABLED', 'BLUESKY_NOTIFICATIONS_INTERVAL_SEC', 'BLUESKY_NOTIFICATIONS_LIMIT', 'BLUESKY_NOTIFICATIONS_PRIORITY', 'BLUESKY_NOTIFICATIONS_REASONS', 'HEARTBEAT_ENABLED', 'HEARTBEAT_INTERVAL_MIN', 'HEARTBEAT_SKIP_RECENT_USER_MIN', + 'HEARTBEAT_SKIP_RECENT_POLICY', 'HEARTBEAT_SKIP_RECENT_FRACTION', 'HEARTBEAT_INTERRUPT_ON_USER_MESSAGE', 'SLEEPTIME_TRIGGER', 'SLEEPTIME_BEHAVIOR', 'SLEEPTIME_STEP_COUNT', 'CRON_ENABLED', ]; @@ -424,6 +425,30 @@ describe('normalizeAgents', () => { }); }); + it('should pick up heartbeat policy and preemption settings from env vars', () => { + process.env.HEARTBEAT_ENABLED = 'true'; + process.env.HEARTBEAT_INTERVAL_MIN = '30'; + process.env.HEARTBEAT_SKIP_RECENT_POLICY = 'fraction'; + process.env.HEARTBEAT_SKIP_RECENT_FRACTION = '0.5'; + process.env.HEARTBEAT_INTERRUPT_ON_USER_MESSAGE = 'false'; + + const config: LettaBotConfig = { + server: { mode: 'cloud' }, + agent: { name: 'TestBot', model: 'test' }, + channels: {}, + }; + + const agents = normalizeAgents(config); + + expect(agents[0].features?.heartbeat).toEqual({ + enabled: true, + intervalMin: 30, + skipRecentPolicy: 'fraction', + skipRecentFraction: 0.5, + interruptOnUserMessage: false, + }); + }); + it('should pick up sleeptime from env vars when YAML features is empty', () => { process.env.SLEEPTIME_TRIGGER = 'step-count'; process.env.SLEEPTIME_BEHAVIOR = 'reminder'; @@ -511,6 +536,8 @@ describe('normalizeAgents', () => { it('should not override YAML heartbeat with env vars', () => { process.env.HEARTBEAT_ENABLED = 'true'; process.env.HEARTBEAT_INTERVAL_MIN = '99'; + process.env.HEARTBEAT_SKIP_RECENT_POLICY = 'off'; + process.env.HEARTBEAT_INTERRUPT_ON_USER_MESSAGE = 'false'; const config: LettaBotConfig = { server: { mode: 'cloud' }, @@ -521,6 +548,8 @@ describe('normalizeAgents', () => { enabled: true, intervalMin: 10, skipRecentUserMin: 3, + skipRecentPolicy: 'fixed', + interruptOnUserMessage: true, }, }, }; @@ -530,6 +559,8 @@ describe('normalizeAgents', () => { // YAML values should win expect(agents[0].features?.heartbeat?.intervalMin).toBe(10); expect(agents[0].features?.heartbeat?.skipRecentUserMin).toBe(3); + expect(agents[0].features?.heartbeat?.skipRecentPolicy).toBe('fixed'); + expect(agents[0].features?.heartbeat?.interruptOnUserMessage).toBe(true); }); it('should not override YAML sleeptime with env vars', () => { diff --git a/src/config/types.ts b/src/config/types.ts index ad8ed10..2d4b0eb 100644 --- a/src/config/types.ts +++ b/src/config/types.ts @@ -11,6 +11,7 @@ import { createLogger } from '../logger.js'; const log = createLogger('Config'); export type ServerMode = 'api' | 'docker' | 'cloud' | 'selfhosted'; export type CanonicalServerMode = 'api' | 'docker'; +export type HeartbeatSkipRecentPolicy = 'fixed' | 'fraction' | 'off'; export function canonicalizeServerMode(mode?: ServerMode): CanonicalServerMode { return mode === 'docker' || mode === 'selfhosted' ? 'docker' : 'api'; @@ -89,6 +90,9 @@ export interface AgentConfig { enabled: boolean; intervalMin?: number; skipRecentUserMin?: number; // Skip auto-heartbeats for N minutes after user message (0 disables) + skipRecentPolicy?: HeartbeatSkipRecentPolicy; // 'fixed' | 'fraction' | 'off' + skipRecentFraction?: number; // Fraction of intervalMin when policy=fraction (0-1) + interruptOnUserMessage?: boolean; // Cancel in-flight heartbeat when user messages arrive prompt?: string; // Custom heartbeat prompt (replaces default body) promptFile?: string; // Path to prompt file (re-read each tick for live editing) target?: string; // Delivery target ("telegram:123", "slack:C123", etc.) @@ -185,6 +189,9 @@ export interface LettaBotConfig { enabled: boolean; intervalMin?: number; skipRecentUserMin?: number; // Skip auto-heartbeats for N minutes after user message (0 disables) + skipRecentPolicy?: HeartbeatSkipRecentPolicy; // 'fixed' | 'fraction' | 'off' + skipRecentFraction?: number; // Fraction of intervalMin when policy=fraction (0-1) + interruptOnUserMessage?: boolean; // Cancel in-flight heartbeat when user messages arrive prompt?: string; // Custom heartbeat prompt (replaces default body) promptFile?: string; // Path to prompt file (re-read each tick for live editing) target?: string; // Delivery target ("telegram:123", "slack:C123", etc.) @@ -804,11 +811,29 @@ export function normalizeAgents(config: LettaBotConfig): AgentConfig[] { const skipRecentUserMin = process.env.HEARTBEAT_SKIP_RECENT_USER_MIN ? parseInt(process.env.HEARTBEAT_SKIP_RECENT_USER_MIN, 10) : undefined; + const skipRecentPolicyRaw = process.env.HEARTBEAT_SKIP_RECENT_POLICY; + const skipRecentPolicy = skipRecentPolicyRaw === 'fixed' + || skipRecentPolicyRaw === 'fraction' + || skipRecentPolicyRaw === 'off' + ? skipRecentPolicyRaw + : undefined; + const skipRecentFraction = process.env.HEARTBEAT_SKIP_RECENT_FRACTION + ? Number(process.env.HEARTBEAT_SKIP_RECENT_FRACTION) + : undefined; + const interruptOnUserMessageRaw = process.env.HEARTBEAT_INTERRUPT_ON_USER_MESSAGE; + const interruptOnUserMessage = interruptOnUserMessageRaw === 'true' + ? true + : interruptOnUserMessageRaw === 'false' + ? false + : undefined; features.heartbeat = { enabled: true, ...(Number.isFinite(intervalMin) ? { intervalMin } : {}), ...(Number.isFinite(skipRecentUserMin) ? { skipRecentUserMin } : {}), + ...(skipRecentPolicy ? { skipRecentPolicy } : {}), + ...(Number.isFinite(skipRecentFraction) ? { skipRecentFraction } : {}), + ...(interruptOnUserMessage !== undefined ? { interruptOnUserMessage } : {}), }; } diff --git a/src/core/bot.ts b/src/core/bot.ts index 7509eab..86024e5 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -288,6 +288,8 @@ export class LettaBot implements AgentSession { private processing = false; // Global lock for shared mode private processingKeys: Set = new Set(); // Per-key locks for per-channel mode private cancelledKeys: Set = new Set(); // Tracks keys where /cancel was issued + private backgroundCancelledKeys: Set = new Set(); // Tracks background runs cancelled by live user activity + private activeBackgroundTriggerByKey: Map = new Map(); private sendSequence = 0; // Monotonic counter for desync diagnostics // Forward-looking: stale-result detection via runIds becomes active once the // SDK surfaces non-empty result run_ids. Until then, this map mostly stays @@ -971,6 +973,23 @@ export class LettaBot implements AgentSession { // ========================================================================= // Message queue // ========================================================================= + + private maybePreemptHeartbeatForUserMessage(convKey: string): void { + if (this.config.interruptHeartbeatOnUserMessage === false) { + return; + } + + if (this.activeBackgroundTriggerByKey.get(convKey) !== 'heartbeat') { + return; + } + + this.backgroundCancelledKeys.add(convKey); + const session = this.sessionManager.getSession(convKey); + if (session) { + session.abort().catch(() => {}); + } + log.info(`Preempted in-flight heartbeat due to user message (key=${convKey})`); + } private async handleMessage(msg: InboundMessage, adapter: ChannelAdapter): Promise { // AskUserQuestion support: if the agent is waiting for a user answer, @@ -987,6 +1006,8 @@ export class LettaBot implements AgentSession { return; } + this.maybePreemptHeartbeatForUserMessage(incomingConvKey); + log.info(`Message from ${msg.userId} on ${msg.channel}: ${msg.text}`); if (msg.isGroup && this.groupBatcher) { @@ -1881,7 +1902,9 @@ export class LettaBot implements AgentSession { ): Promise { const isSilent = context?.outputMode === 'silent'; const convKey = this.resolveHeartbeatConversationKey(); + const triggerType = context?.type ?? 'heartbeat'; const acquired = await this.acquireLock(convKey); + this.activeBackgroundTriggerByKey.set(convKey, triggerType); const sendT0 = performance.now(); const sendTurnId = this.turnLogger ? generateTurnId() : ''; @@ -1892,15 +1915,29 @@ export class LettaBot implements AgentSession { let retried = false; while (true) { + if (this.backgroundCancelledKeys.has(convKey)) { + log.info(`sendToAgent: background run pre-cancelled by live user activity (key=${convKey})`); + return ''; + } const { session, stream } = await this.sessionManager.runSession(text, { convKey, retried }); try { + if (this.backgroundCancelledKeys.has(convKey)) { + session.abort().catch(() => {}); + log.info(`sendToAgent: background run cancelled before stream start (key=${convKey})`); + return ''; + } let response = ''; let sawStaleDuplicateResult = false; let approvalRetryPending = false; let usedMessageCli = false; let lastErrorDetail: StreamErrorDetail | undefined; for await (const msg of stream()) { + if (this.backgroundCancelledKeys.has(convKey)) { + session.abort().catch(() => {}); + log.info(`sendToAgent: cancelled in-flight background stream (key=${convKey})`); + return ''; + } sendTurnAcc?.feedRaw(msg); if (msg.type === 'tool_call') { this.sessionManager.syncTodoToolCall(msg); @@ -1930,6 +1967,10 @@ export class LettaBot implements AgentSession { // TODO(letta-code-sdk#31): Remove once SDK handles HITL approvals in bypassPermissions mode. if (msg.success === false || msg.error) { + if (this.backgroundCancelledKeys.has(convKey)) { + log.info(`sendToAgent: cancelled heartbeat produced terminal error result; suppressing (key=${convKey})`); + return ''; + } // Enrich opaque errors from run metadata (mirrors processMessage logic). const convId = typeof msg.conversationId === 'string' ? msg.conversationId : undefined; if (this.store.agentId && @@ -2041,11 +2082,17 @@ export class LettaBot implements AgentSession { } catch (error) { // Invalidate on stream errors so next call gets a fresh subprocess this.sessionManager.invalidateSession(convKey); + if (this.backgroundCancelledKeys.has(convKey)) { + log.info(`sendToAgent: background run ended after cancellation (key=${convKey})`); + return ''; + } throw error; } } } finally { + this.activeBackgroundTriggerByKey.delete(convKey); + this.backgroundCancelledKeys.delete(convKey); if (this.config.reuseSession === false) { this.sessionManager.invalidateSession(convKey); } diff --git a/src/core/sdk-session-contract.test.ts b/src/core/sdk-session-contract.test.ts index e1c0f03..b549911 100644 --- a/src/core/sdk-session-contract.test.ts +++ b/src/core/sdk-session-contract.test.ts @@ -882,6 +882,66 @@ describe('SDK session contract', () => { expect(processSpy).toHaveBeenCalledWith('slack'); }); + it('preempts an in-flight heartbeat when a user message arrives on the same key', async () => { + const streamStarted = deferred(); + const releaseStream = deferred(); + let aborted = false; + + const mockSession = { + initialize: vi.fn(async () => undefined), + send: vi.fn(async (_message: unknown) => undefined), + stream: vi.fn(() => + (async function* () { + streamStarted.resolve(); + await releaseStream.promise; + if (aborted) { + throw new Error('aborted'); + } + yield { type: 'assistant', content: 'heartbeat reply' }; + yield { type: 'result', success: true }; + })() + ), + abort: vi.fn(async () => { + aborted = true; + releaseStream.resolve(); + }), + close: vi.fn(() => undefined), + recoverPendingApprovals: vi.fn(async () => ({ recovered: false, unsupported: true, detail: 'mock' })), + agentId: 'agent-contract-test', + conversationId: 'conversation-contract-test', + }; + + vi.mocked(resumeSession).mockReturnValue(mockSession as never); + + const bot = new LettaBot({ + workingDir: join(dataDir, 'working'), + allowedTools: [], + interruptHeartbeatOnUserMessage: true, + }); + const botInternal = bot as any; + const processQueueSpy = vi.spyOn(botInternal, 'processQueue').mockResolvedValue(undefined); + + const heartbeatPromise = bot.sendToAgent('heartbeat'); + await streamStarted.promise; + + await botInternal.handleMessage({ + userId: 'u1', + channel: 'telegram', + chatId: 'c1', + text: 'hi during heartbeat', + timestamp: new Date(), + isGroup: false, + }, {} as any); + + const response = await heartbeatPromise; + await Promise.resolve(); + + expect(response).toBe(''); + expect(mockSession.abort).toHaveBeenCalledTimes(1); + expect(botInternal.processing).toBe(false); + expect(processQueueSpy).toHaveBeenCalled(); + }); + it('LRU eviction in per-chat mode does not close active keys', async () => { const createdSession = { initialize: vi.fn(async () => undefined), diff --git a/src/core/types.ts b/src/core/types.ts index b54e584..9930823 100644 --- a/src/core/types.ts +++ b/src/core/types.ts @@ -204,6 +204,7 @@ export interface BotConfig { // Conversation routing conversationMode?: 'disabled' | 'shared' | 'per-channel' | 'per-chat'; // Default: shared heartbeatConversation?: string; // "dedicated" | "last-active" | "" (default: last-active) + interruptHeartbeatOnUserMessage?: boolean; // Default true. Cancel in-flight heartbeat on user message. conversationOverrides?: string[]; // Channels that always use their own conversation (shared mode) maxSessions?: number; // Max concurrent sessions in per-chat mode (default: 10, LRU eviction) reuseSession?: boolean; // Reuse SDK subprocess across messages (default: true). Set false to eliminate stream state bleed at cost of ~5s latency per message. diff --git a/src/cron/heartbeat.test.ts b/src/cron/heartbeat.test.ts index dd59a14..6e78b5d 100644 --- a/src/cron/heartbeat.test.ts +++ b/src/cron/heartbeat.test.ts @@ -251,6 +251,7 @@ describe('HeartbeatService prompt resolution', () => { const service = new HeartbeatService(bot, createConfig({ workingDir: tmpDir, + skipRecentPolicy: 'fixed', skipRecentUserMinutes: 5, })); @@ -267,6 +268,7 @@ describe('HeartbeatService prompt resolution', () => { const service = new HeartbeatService(bot, createConfig({ workingDir: tmpDir, + skipRecentPolicy: 'fixed', skipRecentUserMinutes: 0, })); @@ -274,6 +276,40 @@ describe('HeartbeatService prompt resolution', () => { expect(bot.sendToAgent).toHaveBeenCalledTimes(1); }); + + it('defaults to fraction policy (0.5 of interval) when no fixed window is configured', async () => { + const bot = createMockBot(); + (bot.getLastUserMessageTime as ReturnType).mockReturnValue( + new Date(Date.now() - 10 * 60 * 1000), + ); + + const service = new HeartbeatService(bot, createConfig({ + workingDir: tmpDir, + intervalMinutes: 30, + })); + + await (service as any).runHeartbeat(false); + + // 30m * 0.5 => 15m skip window; 10m ago should skip. + expect(bot.sendToAgent).not.toHaveBeenCalled(); + }); + + it('does not skip when skipRecentPolicy is off', async () => { + const bot = createMockBot(); + (bot.getLastUserMessageTime as ReturnType).mockReturnValue( + new Date(Date.now() - 1 * 60 * 1000), + ); + + const service = new HeartbeatService(bot, createConfig({ + workingDir: tmpDir, + skipRecentPolicy: 'off', + skipRecentUserMinutes: 60, + })); + + await (service as any).runHeartbeat(false); + + expect(bot.sendToAgent).toHaveBeenCalledTimes(1); + }); }); // ── Memfs health check ───────────────────────────────────────────────── diff --git a/src/cron/heartbeat.ts b/src/cron/heartbeat.ts index 7b2e802..a8307f7 100644 --- a/src/cron/heartbeat.ts +++ b/src/cron/heartbeat.ts @@ -48,6 +48,8 @@ export interface HeartbeatConfig { enabled: boolean; intervalMinutes: number; skipRecentUserMinutes?: number; // Default 5. Set to 0 to disable skip logic. + skipRecentPolicy?: 'fixed' | 'fraction' | 'off'; + skipRecentFraction?: number; // Used when policy=fraction. Expected range: 0-1. workingDir: string; agentKey: string; @@ -80,12 +82,51 @@ export class HeartbeatService { this.config = config; } - private getSkipWindowMs(): number { - const raw = this.config.skipRecentUserMinutes; - if (raw === undefined || !Number.isFinite(raw) || raw < 0) { - return 5 * 60 * 1000; // default: 5 minutes + private getSkipRecentPolicy(): 'fixed' | 'fraction' | 'off' { + const configured = this.config.skipRecentPolicy; + if (configured === 'fixed' || configured === 'fraction' || configured === 'off') { + return configured; } - return Math.floor(raw * 60 * 1000); + + // Backward compatibility: if explicit minutes are configured, preserve the + // historical fixed-window behavior unless policy is explicitly set. + if (this.config.skipRecentUserMinutes !== undefined) { + return 'fixed'; + } + + // New default: skip for half the heartbeat interval. + return 'fraction'; + } + + private getSkipWindow(): { policy: 'fixed' | 'fraction' | 'off'; minutes: number; milliseconds: number } { + const policy = this.getSkipRecentPolicy(); + + if (policy === 'off') { + return { policy, minutes: 0, milliseconds: 0 }; + } + + if (policy === 'fraction') { + const rawFraction = this.config.skipRecentFraction; + const fraction = rawFraction !== undefined && Number.isFinite(rawFraction) + ? Math.max(0, Math.min(1, rawFraction)) + : 0.5; + const minutes = Math.ceil(Math.max(0, this.config.intervalMinutes) * fraction); + return { + policy, + minutes, + milliseconds: Math.floor(minutes * 60 * 1000), + }; + } + + const raw = this.config.skipRecentUserMinutes; + const minutes = (raw === undefined || !Number.isFinite(raw) || raw < 0) + ? 5 + : raw; + return { + policy, + minutes, + milliseconds: Math.floor(minutes * 60 * 1000), + }; } /** @@ -207,17 +248,19 @@ export class HeartbeatService { // Skip if user sent a message in the configured window (unless manual trigger) if (!skipRecentCheck) { - const skipWindowMs = this.getSkipWindowMs(); + const { policy, minutes: skipWindowMin, milliseconds: skipWindowMs } = this.getSkipWindow(); const lastUserMessage = this.bot.getLastUserMessageTime(); if (skipWindowMs > 0 && lastUserMessage) { const msSinceLastMessage = now.getTime() - lastUserMessage.getTime(); if (msSinceLastMessage < skipWindowMs) { const minutesAgo = Math.round(msSinceLastMessage / 60000); - log.info(`User messaged ${minutesAgo}m ago - skipping heartbeat`); + log.info(`User messaged ${minutesAgo}m ago - skipping heartbeat (policy=${policy}, window=${skipWindowMin}m)`); logEvent('heartbeat_skipped_recent_user', { lastUserMessage: lastUserMessage.toISOString(), minutesAgo, + skipPolicy: policy, + skipWindowMin, }); return; } diff --git a/src/main.ts b/src/main.ts index ce1480b..5d59a14 100644 --- a/src/main.ts +++ b/src/main.ts @@ -218,6 +218,19 @@ function ensureRequiredTools(tools: string[]): string[] { return out; } +function parseOptionalBoolean(raw?: string): boolean | undefined { + if (raw === 'true') return true; + if (raw === 'false') return false; + return undefined; +} + +function parseHeartbeatSkipRecentPolicy(raw?: string): 'fixed' | 'fraction' | 'off' | undefined { + if (raw === 'fixed' || raw === 'fraction' || raw === 'off') { + return raw; + } + return undefined; +} + // Global config (shared across all agents) const globalConfig = { workingDir: getWorkingDir(), @@ -232,6 +245,9 @@ const globalConfig = { attachmentsMaxAgeDays: resolveAttachmentsMaxAgeDays(), cronEnabled: process.env.CRON_ENABLED === 'true', // Legacy env var fallback heartbeatSkipRecentUserMin: parseNonNegativeNumber(process.env.HEARTBEAT_SKIP_RECENT_USER_MIN), + heartbeatSkipRecentPolicy: parseHeartbeatSkipRecentPolicy(process.env.HEARTBEAT_SKIP_RECENT_POLICY), + heartbeatSkipRecentFraction: parseNonNegativeNumber(process.env.HEARTBEAT_SKIP_RECENT_FRACTION), + heartbeatInterruptOnUserMessage: parseOptionalBoolean(process.env.HEARTBEAT_INTERRUPT_ON_USER_MESSAGE), }; // Validate LETTA_API_KEY is set for API mode (docker mode doesn't require it) @@ -350,6 +366,7 @@ async function main() { const cronStorePath = cronStoreFilename ? resolve(getCronDataDir(), cronStoreFilename) : undefined; + const heartbeatConfig = agentConfig.features?.heartbeat; const bot = new LettaBot({ workingDir: resolvedWorkingDir, @@ -366,6 +383,10 @@ async function main() { display: agentConfig.features?.display, conversationMode: agentConfig.conversations?.mode || 'shared', heartbeatConversation: agentConfig.conversations?.heartbeat || 'last-active', + interruptHeartbeatOnUserMessage: + heartbeatConfig?.interruptOnUserMessage + ?? globalConfig.heartbeatInterruptOnUserMessage + ?? true, conversationOverrides: agentConfig.conversations?.perChannel, maxSessions: agentConfig.conversations?.maxSessions, reuseSession: agentConfig.conversations?.reuseSession, @@ -469,11 +490,12 @@ async function main() { } // Per-agent heartbeat - const heartbeatConfig = agentConfig.features?.heartbeat; const heartbeatService = new HeartbeatService(bot, { enabled: heartbeatConfig?.enabled ?? false, intervalMinutes: heartbeatConfig?.intervalMin ?? 240, skipRecentUserMinutes: heartbeatConfig?.skipRecentUserMin ?? globalConfig.heartbeatSkipRecentUserMin, + skipRecentPolicy: heartbeatConfig?.skipRecentPolicy ?? globalConfig.heartbeatSkipRecentPolicy, + skipRecentFraction: heartbeatConfig?.skipRecentFraction ?? globalConfig.heartbeatSkipRecentFraction, agentKey: agentConfig.name, memfs: resolvedMemfs, prompt: heartbeatConfig?.prompt || process.env.HEARTBEAT_PROMPT,