fix(heartbeat): prioritize user messages over in-flight heartbeats (#594)

Co-authored-by: Letta Code <noreply@letta.com>
This commit is contained in:
Cameron
2026-03-13 14:40:04 -07:00
committed by GitHub
parent 039707387e
commit f1f3540005
17 changed files with 429 additions and 19 deletions

View File

@@ -535,12 +535,20 @@ features:
heartbeat:
enabled: true
intervalMin: 60 # Check every 60 minutes
skipRecentUserMin: 5 # Skip auto-heartbeats for N minutes after user message (0 disables)
skipRecentPolicy: fraction # fixed | fraction | off
skipRecentFraction: 0.5 # Used when policy=fraction (0-1)
# skipRecentUserMin: 5 # Used when policy=fixed (0 disables)
interruptOnUserMessage: true # Cancel in-flight heartbeat when user messages arrive
```
Heartbeats are background tasks where the agent can review pending work.
If the user messaged recently, automatic heartbeats are skipped by default for 5 minutes (`skipRecentUserMin`).
Set this to `0` to disable skipping. Manual `/heartbeat` bypasses the skip check.
By default, automatic heartbeats skip for half of the heartbeat interval (`skipRecentPolicy: fraction` with `skipRecentFraction: 0.5`).
- `fixed`: use `skipRecentUserMin`.
- `fraction`: use `ceil(intervalMin * skipRecentFraction)`.
- `off`: disable recent-user skipping.
`interruptOnUserMessage` defaults to `true`, so live user messages cancel in-flight heartbeat runs on the same conversation key.
Manual `/heartbeat` bypasses the recent-user skip check.
#### Custom Heartbeat Prompt
@@ -571,13 +579,19 @@ Via environment variable:
```bash
HEARTBEAT_PROMPT="Review recent conversations" npm start
# Optional: HEARTBEAT_SKIP_RECENT_USER_MIN=0 to disable recent-user skip
# Optional: HEARTBEAT_SKIP_RECENT_POLICY=fixed|fraction|off
# Optional: HEARTBEAT_SKIP_RECENT_FRACTION=0.5
# Optional: HEARTBEAT_INTERRUPT_ON_USER_MESSAGE=true
```
Precedence: `prompt` (inline YAML) > `HEARTBEAT_PROMPT` (env var) > `promptFile` (file) > built-in default.
| Field | Type | Default | Description |
|-------|------|---------|-------------|
| `features.heartbeat.skipRecentUserMin` | number | `5` | Skip auto-heartbeats for N minutes after a user message. Set `0` to disable. |
| `features.heartbeat.skipRecentPolicy` | `'fixed' \| 'fraction' \| 'off'` | `'fraction'` | How recent-user skipping is calculated. |
| `features.heartbeat.skipRecentFraction` | number | `0.5` | Fraction of `intervalMin` used when policy is `fraction` (range: `0`-`1`). |
| `features.heartbeat.skipRecentUserMin` | number | `5` | Skip auto-heartbeats for N minutes when policy is `fixed`. Set `0` to disable fixed-window skipping. |
| `features.heartbeat.interruptOnUserMessage` | boolean | `true` | Cancel in-flight heartbeat runs when a user message arrives on the same conversation key. |
| `features.heartbeat.prompt` | string | _(none)_ | Custom heartbeat prompt text |
| `features.heartbeat.promptFile` | string | _(none)_ | Path to prompt file (relative to working dir) |

View File

@@ -116,11 +116,16 @@ features:
heartbeat:
enabled: true
intervalMin: 60 # Default: 60 minutes
skipRecentUserMin: 5 # Skip auto-heartbeats for N minutes after user messages (0 disables)
skipRecentPolicy: fraction # fixed | fraction | off
skipRecentFraction: 0.5 # Used when policy=fraction (0-1)
# skipRecentUserMin: 5 # Used when policy=fixed (0 disables)
interruptOnUserMessage: true # Cancel in-flight heartbeat when user messages arrive
```
By default, automatic heartbeats are skipped for 5 minutes after a user message to avoid immediate follow-up noise.
- Set `skipRecentUserMin: 0` to disable this skip behavior.
By default, automatic heartbeats are skipped for half the heartbeat interval (`skipRecentPolicy: fraction`, `skipRecentFraction: 0.5`).
- Use `skipRecentPolicy: fixed` + `skipRecentUserMin` for a fixed window.
- Use `skipRecentPolicy: off` to disable recent-user skipping.
- `interruptOnUserMessage: true` prioritizes live user messages by cancelling in-flight heartbeat runs on the same key.
- Manual `/heartbeat` always bypasses the skip check.
### Manual Trigger

View File

@@ -67,7 +67,10 @@ SLACK_APP_TOKEN=xapp-...
| `CRON_ENABLED` | `false` | Enable cron jobs |
| `HEARTBEAT_ENABLED` | `false` | Enable heartbeat service |
| `HEARTBEAT_INTERVAL_MIN` | `30` | Heartbeat interval (minutes). Also enables heartbeat when set |
| `HEARTBEAT_SKIP_RECENT_USER_MIN` | `5` | Skip automatic heartbeats for N minutes after user messages (`0` disables) |
| `HEARTBEAT_SKIP_RECENT_POLICY` | `fraction` | Recent-user skip policy (`fixed`, `fraction`, `off`) |
| `HEARTBEAT_SKIP_RECENT_FRACTION` | `0.5` | Fraction of interval used when policy is `fraction` |
| `HEARTBEAT_SKIP_RECENT_USER_MIN` | `5` | Skip window in minutes when policy is `fixed` (`0` disables) |
| `HEARTBEAT_INTERRUPT_ON_USER_MESSAGE` | `true` | Cancel in-flight heartbeat when a user message arrives on the same key |
| `HEARTBEAT_TARGET` | - | Target chat (e.g., `telegram:123456`) |
| `OPENAI_API_KEY` | - | For voice message transcription |
| `API_HOST` | `0.0.0.0` on Railway | Optional override for API bind address |

View File

@@ -97,7 +97,10 @@ features:
heartbeat:
enabled: false
intervalMin: 30
# skipRecentUserMin: 5 # Skip auto-heartbeats for N minutes after user message (0 disables)
# skipRecentPolicy: fraction # fixed | fraction | off
# skipRecentFraction: 0.5 # Used when policy=fraction (0-1)
# skipRecentUserMin: 5 # Used when policy=fixed (0 disables)
# interruptOnUserMessage: true # Cancel in-flight heartbeat when user messages arrive
# sendFileDir: ./data/outbound # Restrict <send-file> directive to this directory (default: data/outbound)
# sendFileMaxSize: 52428800 # Max file size in bytes for <send-file> (default: 50MB)
# sendFileCleanup: false # Allow <send-file cleanup="true"> to delete files after send (default: false)

View File

@@ -289,6 +289,9 @@ Environment:
SLACK_APP_TOKEN Slack app token (xapp-...)
HEARTBEAT_INTERVAL_MIN Heartbeat interval in minutes
HEARTBEAT_SKIP_RECENT_USER_MIN Skip auto-heartbeats after user messages (0 disables)
HEARTBEAT_SKIP_RECENT_POLICY Heartbeat skip policy (fixed, fraction, off)
HEARTBEAT_SKIP_RECENT_FRACTION Fraction of interval to skip when policy=fraction
HEARTBEAT_INTERRUPT_ON_USER_MESSAGE Cancel in-flight heartbeat on user message (true/false)
CRON_ENABLED Enable cron jobs (true/false)
`);
}

View File

@@ -142,6 +142,47 @@ describe('config TUI helpers', () => {
expect(updated.providers?.[0].name).toBe('OpenAI');
});
it('extract/apply preserves heartbeat policy and preemption fields', () => {
const config: LettaBotConfig = {
...makeBaseConfig(),
agents: [
{
name: 'Primary',
channels: {},
features: {
heartbeat: {
enabled: true,
intervalMin: 30,
skipRecentPolicy: 'fraction',
skipRecentFraction: 0.5,
interruptOnUserMessage: true,
},
},
},
],
};
const draft = extractCoreDraft(config);
const heartbeat = draft.features.heartbeat;
if (!heartbeat) {
throw new Error('Expected heartbeat settings in extracted draft');
}
expect(heartbeat.skipRecentPolicy).toBe('fraction');
expect(heartbeat.skipRecentFraction).toBe(0.5);
expect(heartbeat.interruptOnUserMessage).toBe(true);
heartbeat.skipRecentPolicy = 'fixed';
heartbeat.skipRecentUserMin = 7;
delete heartbeat.skipRecentFraction;
heartbeat.interruptOnUserMessage = false;
const updated = applyCoreDraft(config, draft);
expect(updated.agents?.[0].features?.heartbeat?.skipRecentPolicy).toBe('fixed');
expect(updated.agents?.[0].features?.heartbeat?.skipRecentUserMin).toBe(7);
expect(updated.agents?.[0].features?.heartbeat?.skipRecentFraction).toBeUndefined();
expect(updated.agents?.[0].features?.heartbeat?.interruptOnUserMessage).toBe(false);
});
it('getCoreDraftWarnings flags missing API key and no enabled channels', () => {
const draft: CoreConfigDraft = {
server: { mode: 'api', apiKey: undefined, baseUrl: undefined },

View File

@@ -57,6 +57,8 @@ function getPrimaryAgent(config: LettaBotConfig): AgentConfig | null {
function normalizeFeatures(source?: AgentConfig['features']): NonNullable<AgentConfig['features']> {
const features = deepClone(source ?? {});
const skipRecentPolicy = features.heartbeat?.skipRecentPolicy
?? (features.heartbeat?.skipRecentUserMin !== undefined ? 'fixed' : 'fraction');
return {
...features,
cron: typeof features.cron === 'boolean' ? features.cron : false,
@@ -64,6 +66,9 @@ function normalizeFeatures(source?: AgentConfig['features']): NonNullable<AgentC
enabled: features.heartbeat?.enabled ?? false,
intervalMin: features.heartbeat?.intervalMin ?? 60,
skipRecentUserMin: features.heartbeat?.skipRecentUserMin,
skipRecentPolicy,
skipRecentFraction: features.heartbeat?.skipRecentFraction,
interruptOnUserMessage: features.heartbeat?.interruptOnUserMessage ?? true,
prompt: features.heartbeat?.prompt,
promptFile: features.heartbeat?.promptFile,
target: features.heartbeat?.target,
@@ -183,7 +188,7 @@ export function formatCoreDraftSummary(draft: CoreConfigDraft, configPath: strin
[
'Heartbeat',
draft.features.heartbeat?.enabled
? `${draft.features.heartbeat.intervalMin ?? 60}min`
? `${draft.features.heartbeat.intervalMin ?? 60}min${draft.features.heartbeat.skipRecentPolicy ?? 'fraction'} • preempt ${draft.features.heartbeat.interruptOnUserMessage === false ? 'off' : 'on'}`
: '✗ Disabled',
],
];
@@ -370,6 +375,62 @@ async function editFeatures(draft: CoreConfigDraft): Promise<void> {
});
if (p.isCancel(interval)) return;
draft.features.heartbeat.intervalMin = Number(interval.trim());
const skipPolicy = await p.select({
message: 'Heartbeat skip policy after user activity',
options: [
{ value: 'fraction', label: 'Fraction of interval', hint: 'default: 0.5 × interval' },
{ value: 'fixed', label: 'Fixed minutes', hint: 'manual skip window (legacy behavior)' },
{ value: 'off', label: 'Disabled', hint: 'never skip based on recent user message' },
],
initialValue: draft.features.heartbeat.skipRecentPolicy ?? 'fraction',
});
if (p.isCancel(skipPolicy)) return;
draft.features.heartbeat.skipRecentPolicy = skipPolicy as 'fixed' | 'fraction' | 'off';
if (skipPolicy === 'fixed') {
const skipMin = await p.text({
message: 'Skip heartbeats for this many minutes after user messages',
placeholder: '5',
initialValue: String(draft.features.heartbeat.skipRecentUserMin ?? 5),
validate: (value) => {
const parsed = Number(value.trim());
if (!Number.isFinite(parsed) || parsed < 0) {
return 'Enter a non-negative number';
}
return undefined;
},
});
if (p.isCancel(skipMin)) return;
draft.features.heartbeat.skipRecentUserMin = Number(skipMin.trim());
delete draft.features.heartbeat.skipRecentFraction;
} else if (skipPolicy === 'fraction') {
const skipFraction = await p.text({
message: 'Skip window as fraction of interval (0-1)',
placeholder: '0.5',
initialValue: String(draft.features.heartbeat.skipRecentFraction ?? 0.5),
validate: (value) => {
const parsed = Number(value.trim());
if (!Number.isFinite(parsed) || parsed < 0 || parsed > 1) {
return 'Enter a number between 0 and 1';
}
return undefined;
},
});
if (p.isCancel(skipFraction)) return;
draft.features.heartbeat.skipRecentFraction = Number(skipFraction.trim());
delete draft.features.heartbeat.skipRecentUserMin;
} else {
delete draft.features.heartbeat.skipRecentUserMin;
delete draft.features.heartbeat.skipRecentFraction;
}
const interruptOnUserMessage = await p.confirm({
message: 'Interrupt in-flight heartbeat when a user message arrives?',
initialValue: draft.features.heartbeat.interruptOnUserMessage !== false,
});
if (p.isCancel(interruptOnUserMessage)) return;
draft.features.heartbeat.interruptOnUserMessage = interruptOnUserMessage;
}
}

View File

@@ -262,14 +262,17 @@ describe('server.api config (canonical location)', () => {
expect(env.API_CORS_ORIGIN).toBe('*');
});
it('configToEnv should map heartbeat skip window env var', () => {
it('configToEnv should map heartbeat skip/preemption env vars', () => {
const config: LettaBotConfig = {
...DEFAULT_CONFIG,
features: {
heartbeat: {
enabled: true,
intervalMin: 30,
skipRecentPolicy: 'fraction',
skipRecentFraction: 0.5,
skipRecentUserMin: 4,
interruptOnUserMessage: false,
},
},
};
@@ -277,6 +280,9 @@ describe('server.api config (canonical location)', () => {
const env = configToEnv(config);
expect(env.HEARTBEAT_INTERVAL_MIN).toBe('30');
expect(env.HEARTBEAT_SKIP_RECENT_USER_MIN).toBe('4');
expect(env.HEARTBEAT_SKIP_RECENT_POLICY).toBe('fraction');
expect(env.HEARTBEAT_SKIP_RECENT_FRACTION).toBe('0.5');
expect(env.HEARTBEAT_INTERRUPT_ON_USER_MESSAGE).toBe('false');
});
it('configToEnv should fall back to top-level api (deprecated)', () => {

View File

@@ -527,6 +527,15 @@ export function configToEnv(config: LettaBotConfig): Record<string, string> {
if (config.features.heartbeat.skipRecentUserMin !== undefined) {
env.HEARTBEAT_SKIP_RECENT_USER_MIN = String(config.features.heartbeat.skipRecentUserMin);
}
if (config.features.heartbeat.skipRecentPolicy !== undefined) {
env.HEARTBEAT_SKIP_RECENT_POLICY = config.features.heartbeat.skipRecentPolicy;
}
if (config.features.heartbeat.skipRecentFraction !== undefined) {
env.HEARTBEAT_SKIP_RECENT_FRACTION = String(config.features.heartbeat.skipRecentFraction);
}
if (config.features.heartbeat.interruptOnUserMessage !== undefined) {
env.HEARTBEAT_INTERRUPT_ON_USER_MESSAGE = config.features.heartbeat.interruptOnUserMessage ? 'true' : 'false';
}
}
if (config.features?.sleeptime) {
if (config.features.sleeptime.trigger) {

View File

@@ -34,6 +34,7 @@ describe('normalizeAgents', () => {
'BLUESKY_NOTIFICATIONS_ENABLED', 'BLUESKY_NOTIFICATIONS_INTERVAL_SEC', 'BLUESKY_NOTIFICATIONS_LIMIT',
'BLUESKY_NOTIFICATIONS_PRIORITY', 'BLUESKY_NOTIFICATIONS_REASONS',
'HEARTBEAT_ENABLED', 'HEARTBEAT_INTERVAL_MIN', 'HEARTBEAT_SKIP_RECENT_USER_MIN',
'HEARTBEAT_SKIP_RECENT_POLICY', 'HEARTBEAT_SKIP_RECENT_FRACTION', 'HEARTBEAT_INTERRUPT_ON_USER_MESSAGE',
'SLEEPTIME_TRIGGER', 'SLEEPTIME_BEHAVIOR', 'SLEEPTIME_STEP_COUNT',
'CRON_ENABLED',
];
@@ -424,6 +425,30 @@ describe('normalizeAgents', () => {
});
});
it('should pick up heartbeat policy and preemption settings from env vars', () => {
process.env.HEARTBEAT_ENABLED = 'true';
process.env.HEARTBEAT_INTERVAL_MIN = '30';
process.env.HEARTBEAT_SKIP_RECENT_POLICY = 'fraction';
process.env.HEARTBEAT_SKIP_RECENT_FRACTION = '0.5';
process.env.HEARTBEAT_INTERRUPT_ON_USER_MESSAGE = 'false';
const config: LettaBotConfig = {
server: { mode: 'cloud' },
agent: { name: 'TestBot', model: 'test' },
channels: {},
};
const agents = normalizeAgents(config);
expect(agents[0].features?.heartbeat).toEqual({
enabled: true,
intervalMin: 30,
skipRecentPolicy: 'fraction',
skipRecentFraction: 0.5,
interruptOnUserMessage: false,
});
});
it('should pick up sleeptime from env vars when YAML features is empty', () => {
process.env.SLEEPTIME_TRIGGER = 'step-count';
process.env.SLEEPTIME_BEHAVIOR = 'reminder';
@@ -511,6 +536,8 @@ describe('normalizeAgents', () => {
it('should not override YAML heartbeat with env vars', () => {
process.env.HEARTBEAT_ENABLED = 'true';
process.env.HEARTBEAT_INTERVAL_MIN = '99';
process.env.HEARTBEAT_SKIP_RECENT_POLICY = 'off';
process.env.HEARTBEAT_INTERRUPT_ON_USER_MESSAGE = 'false';
const config: LettaBotConfig = {
server: { mode: 'cloud' },
@@ -521,6 +548,8 @@ describe('normalizeAgents', () => {
enabled: true,
intervalMin: 10,
skipRecentUserMin: 3,
skipRecentPolicy: 'fixed',
interruptOnUserMessage: true,
},
},
};
@@ -530,6 +559,8 @@ describe('normalizeAgents', () => {
// YAML values should win
expect(agents[0].features?.heartbeat?.intervalMin).toBe(10);
expect(agents[0].features?.heartbeat?.skipRecentUserMin).toBe(3);
expect(agents[0].features?.heartbeat?.skipRecentPolicy).toBe('fixed');
expect(agents[0].features?.heartbeat?.interruptOnUserMessage).toBe(true);
});
it('should not override YAML sleeptime with env vars', () => {

View File

@@ -11,6 +11,7 @@ import { createLogger } from '../logger.js';
const log = createLogger('Config');
export type ServerMode = 'api' | 'docker' | 'cloud' | 'selfhosted';
export type CanonicalServerMode = 'api' | 'docker';
export type HeartbeatSkipRecentPolicy = 'fixed' | 'fraction' | 'off';
export function canonicalizeServerMode(mode?: ServerMode): CanonicalServerMode {
return mode === 'docker' || mode === 'selfhosted' ? 'docker' : 'api';
@@ -89,6 +90,9 @@ export interface AgentConfig {
enabled: boolean;
intervalMin?: number;
skipRecentUserMin?: number; // Skip auto-heartbeats for N minutes after user message (0 disables)
skipRecentPolicy?: HeartbeatSkipRecentPolicy; // 'fixed' | 'fraction' | 'off'
skipRecentFraction?: number; // Fraction of intervalMin when policy=fraction (0-1)
interruptOnUserMessage?: boolean; // Cancel in-flight heartbeat when user messages arrive
prompt?: string; // Custom heartbeat prompt (replaces default body)
promptFile?: string; // Path to prompt file (re-read each tick for live editing)
target?: string; // Delivery target ("telegram:123", "slack:C123", etc.)
@@ -185,6 +189,9 @@ export interface LettaBotConfig {
enabled: boolean;
intervalMin?: number;
skipRecentUserMin?: number; // Skip auto-heartbeats for N minutes after user message (0 disables)
skipRecentPolicy?: HeartbeatSkipRecentPolicy; // 'fixed' | 'fraction' | 'off'
skipRecentFraction?: number; // Fraction of intervalMin when policy=fraction (0-1)
interruptOnUserMessage?: boolean; // Cancel in-flight heartbeat when user messages arrive
prompt?: string; // Custom heartbeat prompt (replaces default body)
promptFile?: string; // Path to prompt file (re-read each tick for live editing)
target?: string; // Delivery target ("telegram:123", "slack:C123", etc.)
@@ -804,11 +811,29 @@ export function normalizeAgents(config: LettaBotConfig): AgentConfig[] {
const skipRecentUserMin = process.env.HEARTBEAT_SKIP_RECENT_USER_MIN
? parseInt(process.env.HEARTBEAT_SKIP_RECENT_USER_MIN, 10)
: undefined;
const skipRecentPolicyRaw = process.env.HEARTBEAT_SKIP_RECENT_POLICY;
const skipRecentPolicy = skipRecentPolicyRaw === 'fixed'
|| skipRecentPolicyRaw === 'fraction'
|| skipRecentPolicyRaw === 'off'
? skipRecentPolicyRaw
: undefined;
const skipRecentFraction = process.env.HEARTBEAT_SKIP_RECENT_FRACTION
? Number(process.env.HEARTBEAT_SKIP_RECENT_FRACTION)
: undefined;
const interruptOnUserMessageRaw = process.env.HEARTBEAT_INTERRUPT_ON_USER_MESSAGE;
const interruptOnUserMessage = interruptOnUserMessageRaw === 'true'
? true
: interruptOnUserMessageRaw === 'false'
? false
: undefined;
features.heartbeat = {
enabled: true,
...(Number.isFinite(intervalMin) ? { intervalMin } : {}),
...(Number.isFinite(skipRecentUserMin) ? { skipRecentUserMin } : {}),
...(skipRecentPolicy ? { skipRecentPolicy } : {}),
...(Number.isFinite(skipRecentFraction) ? { skipRecentFraction } : {}),
...(interruptOnUserMessage !== undefined ? { interruptOnUserMessage } : {}),
};
}

View File

@@ -288,6 +288,8 @@ export class LettaBot implements AgentSession {
private processing = false; // Global lock for shared mode
private processingKeys: Set<string> = new Set(); // Per-key locks for per-channel mode
private cancelledKeys: Set<string> = new Set(); // Tracks keys where /cancel was issued
private backgroundCancelledKeys: Set<string> = new Set(); // Tracks background runs cancelled by live user activity
private activeBackgroundTriggerByKey: Map<string, string> = new Map();
private sendSequence = 0; // Monotonic counter for desync diagnostics
// Forward-looking: stale-result detection via runIds becomes active once the
// SDK surfaces non-empty result run_ids. Until then, this map mostly stays
@@ -971,6 +973,23 @@ export class LettaBot implements AgentSession {
// =========================================================================
// Message queue
// =========================================================================
private maybePreemptHeartbeatForUserMessage(convKey: string): void {
if (this.config.interruptHeartbeatOnUserMessage === false) {
return;
}
if (this.activeBackgroundTriggerByKey.get(convKey) !== 'heartbeat') {
return;
}
this.backgroundCancelledKeys.add(convKey);
const session = this.sessionManager.getSession(convKey);
if (session) {
session.abort().catch(() => {});
}
log.info(`Preempted in-flight heartbeat due to user message (key=${convKey})`);
}
private async handleMessage(msg: InboundMessage, adapter: ChannelAdapter): Promise<void> {
// AskUserQuestion support: if the agent is waiting for a user answer,
@@ -987,6 +1006,8 @@ export class LettaBot implements AgentSession {
return;
}
this.maybePreemptHeartbeatForUserMessage(incomingConvKey);
log.info(`Message from ${msg.userId} on ${msg.channel}: ${msg.text}`);
if (msg.isGroup && this.groupBatcher) {
@@ -1881,7 +1902,9 @@ export class LettaBot implements AgentSession {
): Promise<string> {
const isSilent = context?.outputMode === 'silent';
const convKey = this.resolveHeartbeatConversationKey();
const triggerType = context?.type ?? 'heartbeat';
const acquired = await this.acquireLock(convKey);
this.activeBackgroundTriggerByKey.set(convKey, triggerType);
const sendT0 = performance.now();
const sendTurnId = this.turnLogger ? generateTurnId() : '';
@@ -1892,15 +1915,29 @@ export class LettaBot implements AgentSession {
let retried = false;
while (true) {
if (this.backgroundCancelledKeys.has(convKey)) {
log.info(`sendToAgent: background run pre-cancelled by live user activity (key=${convKey})`);
return '';
}
const { session, stream } = await this.sessionManager.runSession(text, { convKey, retried });
try {
if (this.backgroundCancelledKeys.has(convKey)) {
session.abort().catch(() => {});
log.info(`sendToAgent: background run cancelled before stream start (key=${convKey})`);
return '';
}
let response = '';
let sawStaleDuplicateResult = false;
let approvalRetryPending = false;
let usedMessageCli = false;
let lastErrorDetail: StreamErrorDetail | undefined;
for await (const msg of stream()) {
if (this.backgroundCancelledKeys.has(convKey)) {
session.abort().catch(() => {});
log.info(`sendToAgent: cancelled in-flight background stream (key=${convKey})`);
return '';
}
sendTurnAcc?.feedRaw(msg);
if (msg.type === 'tool_call') {
this.sessionManager.syncTodoToolCall(msg);
@@ -1930,6 +1967,10 @@ export class LettaBot implements AgentSession {
// TODO(letta-code-sdk#31): Remove once SDK handles HITL approvals in bypassPermissions mode.
if (msg.success === false || msg.error) {
if (this.backgroundCancelledKeys.has(convKey)) {
log.info(`sendToAgent: cancelled heartbeat produced terminal error result; suppressing (key=${convKey})`);
return '';
}
// Enrich opaque errors from run metadata (mirrors processMessage logic).
const convId = typeof msg.conversationId === 'string' ? msg.conversationId : undefined;
if (this.store.agentId &&
@@ -2041,11 +2082,17 @@ export class LettaBot implements AgentSession {
} catch (error) {
// Invalidate on stream errors so next call gets a fresh subprocess
this.sessionManager.invalidateSession(convKey);
if (this.backgroundCancelledKeys.has(convKey)) {
log.info(`sendToAgent: background run ended after cancellation (key=${convKey})`);
return '';
}
throw error;
}
}
} finally {
this.activeBackgroundTriggerByKey.delete(convKey);
this.backgroundCancelledKeys.delete(convKey);
if (this.config.reuseSession === false) {
this.sessionManager.invalidateSession(convKey);
}

View File

@@ -882,6 +882,66 @@ describe('SDK session contract', () => {
expect(processSpy).toHaveBeenCalledWith('slack');
});
it('preempts an in-flight heartbeat when a user message arrives on the same key', async () => {
const streamStarted = deferred<void>();
const releaseStream = deferred<void>();
let aborted = false;
const mockSession = {
initialize: vi.fn(async () => undefined),
send: vi.fn(async (_message: unknown) => undefined),
stream: vi.fn(() =>
(async function* () {
streamStarted.resolve();
await releaseStream.promise;
if (aborted) {
throw new Error('aborted');
}
yield { type: 'assistant', content: 'heartbeat reply' };
yield { type: 'result', success: true };
})()
),
abort: vi.fn(async () => {
aborted = true;
releaseStream.resolve();
}),
close: vi.fn(() => undefined),
recoverPendingApprovals: vi.fn(async () => ({ recovered: false, unsupported: true, detail: 'mock' })),
agentId: 'agent-contract-test',
conversationId: 'conversation-contract-test',
};
vi.mocked(resumeSession).mockReturnValue(mockSession as never);
const bot = new LettaBot({
workingDir: join(dataDir, 'working'),
allowedTools: [],
interruptHeartbeatOnUserMessage: true,
});
const botInternal = bot as any;
const processQueueSpy = vi.spyOn(botInternal, 'processQueue').mockResolvedValue(undefined);
const heartbeatPromise = bot.sendToAgent('heartbeat');
await streamStarted.promise;
await botInternal.handleMessage({
userId: 'u1',
channel: 'telegram',
chatId: 'c1',
text: 'hi during heartbeat',
timestamp: new Date(),
isGroup: false,
}, {} as any);
const response = await heartbeatPromise;
await Promise.resolve();
expect(response).toBe('');
expect(mockSession.abort).toHaveBeenCalledTimes(1);
expect(botInternal.processing).toBe(false);
expect(processQueueSpy).toHaveBeenCalled();
});
it('LRU eviction in per-chat mode does not close active keys', async () => {
const createdSession = {
initialize: vi.fn(async () => undefined),

View File

@@ -204,6 +204,7 @@ export interface BotConfig {
// Conversation routing
conversationMode?: 'disabled' | 'shared' | 'per-channel' | 'per-chat'; // Default: shared
heartbeatConversation?: string; // "dedicated" | "last-active" | "<channel>" (default: last-active)
interruptHeartbeatOnUserMessage?: boolean; // Default true. Cancel in-flight heartbeat on user message.
conversationOverrides?: string[]; // Channels that always use their own conversation (shared mode)
maxSessions?: number; // Max concurrent sessions in per-chat mode (default: 10, LRU eviction)
reuseSession?: boolean; // Reuse SDK subprocess across messages (default: true). Set false to eliminate stream state bleed at cost of ~5s latency per message.

View File

@@ -251,6 +251,7 @@ describe('HeartbeatService prompt resolution', () => {
const service = new HeartbeatService(bot, createConfig({
workingDir: tmpDir,
skipRecentPolicy: 'fixed',
skipRecentUserMinutes: 5,
}));
@@ -267,6 +268,7 @@ describe('HeartbeatService prompt resolution', () => {
const service = new HeartbeatService(bot, createConfig({
workingDir: tmpDir,
skipRecentPolicy: 'fixed',
skipRecentUserMinutes: 0,
}));
@@ -274,6 +276,40 @@ describe('HeartbeatService prompt resolution', () => {
expect(bot.sendToAgent).toHaveBeenCalledTimes(1);
});
it('defaults to fraction policy (0.5 of interval) when no fixed window is configured', async () => {
const bot = createMockBot();
(bot.getLastUserMessageTime as ReturnType<typeof vi.fn>).mockReturnValue(
new Date(Date.now() - 10 * 60 * 1000),
);
const service = new HeartbeatService(bot, createConfig({
workingDir: tmpDir,
intervalMinutes: 30,
}));
await (service as any).runHeartbeat(false);
// 30m * 0.5 => 15m skip window; 10m ago should skip.
expect(bot.sendToAgent).not.toHaveBeenCalled();
});
it('does not skip when skipRecentPolicy is off', async () => {
const bot = createMockBot();
(bot.getLastUserMessageTime as ReturnType<typeof vi.fn>).mockReturnValue(
new Date(Date.now() - 1 * 60 * 1000),
);
const service = new HeartbeatService(bot, createConfig({
workingDir: tmpDir,
skipRecentPolicy: 'off',
skipRecentUserMinutes: 60,
}));
await (service as any).runHeartbeat(false);
expect(bot.sendToAgent).toHaveBeenCalledTimes(1);
});
});
// ── Memfs health check ─────────────────────────────────────────────────

View File

@@ -48,6 +48,8 @@ export interface HeartbeatConfig {
enabled: boolean;
intervalMinutes: number;
skipRecentUserMinutes?: number; // Default 5. Set to 0 to disable skip logic.
skipRecentPolicy?: 'fixed' | 'fraction' | 'off';
skipRecentFraction?: number; // Used when policy=fraction. Expected range: 0-1.
workingDir: string;
agentKey: string;
@@ -80,12 +82,51 @@ export class HeartbeatService {
this.config = config;
}
private getSkipWindowMs(): number {
const raw = this.config.skipRecentUserMinutes;
if (raw === undefined || !Number.isFinite(raw) || raw < 0) {
return 5 * 60 * 1000; // default: 5 minutes
private getSkipRecentPolicy(): 'fixed' | 'fraction' | 'off' {
const configured = this.config.skipRecentPolicy;
if (configured === 'fixed' || configured === 'fraction' || configured === 'off') {
return configured;
}
return Math.floor(raw * 60 * 1000);
// Backward compatibility: if explicit minutes are configured, preserve the
// historical fixed-window behavior unless policy is explicitly set.
if (this.config.skipRecentUserMinutes !== undefined) {
return 'fixed';
}
// New default: skip for half the heartbeat interval.
return 'fraction';
}
private getSkipWindow(): { policy: 'fixed' | 'fraction' | 'off'; minutes: number; milliseconds: number } {
const policy = this.getSkipRecentPolicy();
if (policy === 'off') {
return { policy, minutes: 0, milliseconds: 0 };
}
if (policy === 'fraction') {
const rawFraction = this.config.skipRecentFraction;
const fraction = rawFraction !== undefined && Number.isFinite(rawFraction)
? Math.max(0, Math.min(1, rawFraction))
: 0.5;
const minutes = Math.ceil(Math.max(0, this.config.intervalMinutes) * fraction);
return {
policy,
minutes,
milliseconds: Math.floor(minutes * 60 * 1000),
};
}
const raw = this.config.skipRecentUserMinutes;
const minutes = (raw === undefined || !Number.isFinite(raw) || raw < 0)
? 5
: raw;
return {
policy,
minutes,
milliseconds: Math.floor(minutes * 60 * 1000),
};
}
/**
@@ -207,17 +248,19 @@ export class HeartbeatService {
// Skip if user sent a message in the configured window (unless manual trigger)
if (!skipRecentCheck) {
const skipWindowMs = this.getSkipWindowMs();
const { policy, minutes: skipWindowMin, milliseconds: skipWindowMs } = this.getSkipWindow();
const lastUserMessage = this.bot.getLastUserMessageTime();
if (skipWindowMs > 0 && lastUserMessage) {
const msSinceLastMessage = now.getTime() - lastUserMessage.getTime();
if (msSinceLastMessage < skipWindowMs) {
const minutesAgo = Math.round(msSinceLastMessage / 60000);
log.info(`User messaged ${minutesAgo}m ago - skipping heartbeat`);
log.info(`User messaged ${minutesAgo}m ago - skipping heartbeat (policy=${policy}, window=${skipWindowMin}m)`);
logEvent('heartbeat_skipped_recent_user', {
lastUserMessage: lastUserMessage.toISOString(),
minutesAgo,
skipPolicy: policy,
skipWindowMin,
});
return;
}

View File

@@ -218,6 +218,19 @@ function ensureRequiredTools(tools: string[]): string[] {
return out;
}
function parseOptionalBoolean(raw?: string): boolean | undefined {
if (raw === 'true') return true;
if (raw === 'false') return false;
return undefined;
}
function parseHeartbeatSkipRecentPolicy(raw?: string): 'fixed' | 'fraction' | 'off' | undefined {
if (raw === 'fixed' || raw === 'fraction' || raw === 'off') {
return raw;
}
return undefined;
}
// Global config (shared across all agents)
const globalConfig = {
workingDir: getWorkingDir(),
@@ -232,6 +245,9 @@ const globalConfig = {
attachmentsMaxAgeDays: resolveAttachmentsMaxAgeDays(),
cronEnabled: process.env.CRON_ENABLED === 'true', // Legacy env var fallback
heartbeatSkipRecentUserMin: parseNonNegativeNumber(process.env.HEARTBEAT_SKIP_RECENT_USER_MIN),
heartbeatSkipRecentPolicy: parseHeartbeatSkipRecentPolicy(process.env.HEARTBEAT_SKIP_RECENT_POLICY),
heartbeatSkipRecentFraction: parseNonNegativeNumber(process.env.HEARTBEAT_SKIP_RECENT_FRACTION),
heartbeatInterruptOnUserMessage: parseOptionalBoolean(process.env.HEARTBEAT_INTERRUPT_ON_USER_MESSAGE),
};
// Validate LETTA_API_KEY is set for API mode (docker mode doesn't require it)
@@ -350,6 +366,7 @@ async function main() {
const cronStorePath = cronStoreFilename
? resolve(getCronDataDir(), cronStoreFilename)
: undefined;
const heartbeatConfig = agentConfig.features?.heartbeat;
const bot = new LettaBot({
workingDir: resolvedWorkingDir,
@@ -366,6 +383,10 @@ async function main() {
display: agentConfig.features?.display,
conversationMode: agentConfig.conversations?.mode || 'shared',
heartbeatConversation: agentConfig.conversations?.heartbeat || 'last-active',
interruptHeartbeatOnUserMessage:
heartbeatConfig?.interruptOnUserMessage
?? globalConfig.heartbeatInterruptOnUserMessage
?? true,
conversationOverrides: agentConfig.conversations?.perChannel,
maxSessions: agentConfig.conversations?.maxSessions,
reuseSession: agentConfig.conversations?.reuseSession,
@@ -469,11 +490,12 @@ async function main() {
}
// Per-agent heartbeat
const heartbeatConfig = agentConfig.features?.heartbeat;
const heartbeatService = new HeartbeatService(bot, {
enabled: heartbeatConfig?.enabled ?? false,
intervalMinutes: heartbeatConfig?.intervalMin ?? 240,
skipRecentUserMinutes: heartbeatConfig?.skipRecentUserMin ?? globalConfig.heartbeatSkipRecentUserMin,
skipRecentPolicy: heartbeatConfig?.skipRecentPolicy ?? globalConfig.heartbeatSkipRecentPolicy,
skipRecentFraction: heartbeatConfig?.skipRecentFraction ?? globalConfig.heartbeatSkipRecentFraction,
agentKey: agentConfig.name,
memfs: resolvedMemfs,
prompt: heartbeatConfig?.prompt || process.env.HEARTBEAT_PROMPT,