refactor: extract SessionManager from bot.ts (#457)

This commit is contained in:
Cameron
2026-03-02 14:46:00 -08:00
committed by GitHub
parent e40a8d61ee
commit 5a58d759e0
8 changed files with 866 additions and 659 deletions

View File

@@ -13,6 +13,7 @@ LettaBot is a multi-channel AI assistant powered by [Letta](https://letta.com) t
- [Chat API](./configuration.md#chat-endpoint) - HTTP endpoint for programmatic agent access
- [Open WebUI Setup](./openwebui-setup.md) - Web chat UI via OpenAI-compatible API
- [Response Directives](./directives.md) - XML action directives (reactions, etc.)
- [Skills](./skills.md) - Skills architecture and authoring guide
- [Scheduling Tasks](./cron-setup.md) - Cron jobs and heartbeats
- [Gmail Pub/Sub](./gmail-pubsub.md) - Email notifications integration
- [Railway Deployment](./railway-deploy.md) - Railway-specific setup (one-click deploy, volumes)

154
docs/skills.md Normal file
View File

@@ -0,0 +1,154 @@
# Skills
Skills extend your LettaBot agent with CLI tools and specialized knowledge. They follow the open [Agent Skills](https://docs.letta.com/letta-code/skills) standard used by Letta Code, Cursor, Claude Code, and other compatible agents.
This document covers how skills work within lettabot specifically -- directory hierarchy, feature-gated installation, the SKILL.md format, and how to author new skills.
## How skills work
Skills go through two phases:
1. **Installation** -- Feature-gated skills (scheduling, Google, voice memo) are automatically copied to the agent's skill directory based on config flags in `lettabot.yaml`. Non-feature-gated skills are discovered directly from the directory hierarchy.
2. **Runtime** -- When a session starts, skill directories containing executables are prepended to `PATH` so the agent can invoke them as CLI tools.
## Directory hierarchy
LettaBot scans these directories in priority order. Same-name skills at higher priority override lower ones:
| Priority | Path | Scope | Description |
|----------|------|-------|-------------|
| 1 (highest) | `.skills/` | Project | Skills specific to this lettabot project |
| 2 | `~/.letta/agents/{id}/skills/` | Agent | Skills for one specific agent |
| 3 | `~/.letta/skills/` | Global | Shared across all agents on this machine |
| 4 | `skills/` (in lettabot repo) | Bundled | Ships with lettabot |
| 5 (lowest) | `~/.agents/skills/` | skills.sh | Installed via [skills.sh](https://skills.sh) |
Feature-gated skills are copied from source directories into the agent-scoped directory (`~/.letta/agents/{id}/skills/`) when a session is first acquired. The copy is idempotent -- skills already present in the target are skipped.
## Feature-gated skills
Some skills are only installed when their corresponding feature is enabled in `lettabot.yaml`:
| Config flag | Skills installed | Purpose |
|------------|-----------------|---------|
| `features.cron: true` | `scheduling` | Cron jobs and one-off reminders via `lettabot-schedule` |
| `integrations.google.enabled: true` or `polling.gmail.enabled: true` | `gog`, `google` | Google Workspace and Gmail integration |
| TTS provider configured (ElevenLabs or OpenAI key set) | `voice-memo` | Voice memo replies via `lettabot-tts` |
The mapping from config flags to skill names lives in `FEATURE_SKILLS` in `src/skills/loader.ts`. The code also supports passing explicit skill names programmatically via `additionalSkills` in `SkillsInstallConfig`.
## SKILL.md format
Each skill is a directory containing a `SKILL.md` file with YAML frontmatter and a body written for the agent (not humans). The body is loaded into the agent's context when the skill is relevant -- it's a prompt, not documentation.
```markdown
---
name: scheduling
description: Create scheduled tasks and one-off reminders.
---
# Scheduling
(Agent-facing instructions: CLI usage, when to use the skill, examples, constraints...)
```
### Frontmatter fields
| Field | Required | Description |
|-------|----------|-------------|
| `name` | Yes | Skill identifier (must be unique within its scope) |
| `description` | No | Brief description shown to the agent |
| `emoji` | No | Display emoji |
| `homepage` | No | URL for the skill's homepage or docs |
| `metadata` | No | JSON-encoded object with a `clawdbot` key (see below) |
### ClawdBot metadata
The `metadata` field can contain a JSON-encoded `clawdbot` object for requirements and install specs:
```yaml
metadata: >-
{"clawdbot": {
"emoji": "📦",
"requires": {"bins": ["mycli"], "env": ["MY_API_KEY"]},
"install": [{"kind": "brew", "formula": "mycli"}]
}}
```
**`requires`** -- prerequisites for the skill to be eligible:
| Field | Type | Description |
|-------|------|-------------|
| `bins` | `string[]` | All of these binaries must exist on PATH |
| `anyBins` | `string[]` | At least one of these must exist |
| `env` | `string[]` | Required environment variables |
Run `lettabot skills status` to see which skills are eligible and which have missing binaries, environment variables, or platform mismatches.
**`install`** -- how to install dependencies (tried in order, filtered by platform):
| Field | Type | Description |
|-------|------|-------------|
| `kind` | `'brew' \| 'node' \| 'go' \| 'uv' \| 'download'` | Package manager |
| `formula` | `string` | Homebrew formula (for `brew`) |
| `package` | `string` | npm/uv package name (for `node`/`uv`) |
| `module` | `string` | Go module path (for `go`) |
| `url` | `string` | Download URL (for `download`) |
| `bins` | `string[]` | Binaries this installs |
| `os` | `string[]` | Platform filter (`darwin`, `linux`, `win32`) |
| `label` | `string` | Display label for the install option |
**Other metadata fields:**
| Field | Type | Description |
|-------|------|-------------|
| `os` | `string[]` | Restrict skill to these platforms |
| `always` | `boolean` | Always eligible regardless of requirements |
| `skillKey` | `string` | Override the skill's key identifier |
| `primaryEnv` | `string` | Primary environment variable for the skill |
## Skill execution
When a session starts, `withAgentSkillsOnPath()` in `src/skills/loader.ts` prepends the agent's skill directories to `PATH`. Only directories containing at least one non-`.md` file (i.e., executables or scripts) are added.
PATH mutations are serialized via a lock to avoid races when multiple sessions initialize concurrently. The original PATH is restored after the session operation completes.
## Bundled skills
LettaBot ships with two built-in skills in the `skills/` directory:
- **scheduling** -- Create recurring cron jobs and one-off reminders via the `lettabot-schedule` CLI. Enabled by `features.cron: true`. See [Cron Setup](./cron-setup.md) for details.
- **voice-memo** -- Reply with voice notes using the `<voice>` directive and `lettabot-tts` CLI. Enabled when a TTS provider (ElevenLabs or OpenAI) is configured.
## Installing external skills
Use the interactive CLI to discover and enable skills:
```bash
# Interactive skill selector
lettabot skills
# Enable/disable skills from Clawdhub, skills.sh, and built-in sources
lettabot skills sync
# Check status of all discovered skills
lettabot skills status
```
External skill sources:
- [Clawdhub](https://clawdhub.com/) -- `npx clawdhub@latest install <skill>`
- [skills.sh](https://skills.sh) -- community skill repositories
See the [Letta Code skills documentation](https://docs.letta.com/letta-code/skills) for the general skill installation flow and additional skill sources.
## Authoring a new skill
To add a skill to lettabot:
1. Create a directory under `skills/<name>/` with a `SKILL.md` file. The frontmatter declares metadata (see above). The body is a prompt loaded into the agent's context -- write it as instructions the agent will follow, not as human documentation.
2. Place any executables or scripts alongside `SKILL.md` in the same directory. These become available on the agent's PATH at runtime.
3. If the skill should be feature-gated (only installed when a config flag is set), add an entry to `FEATURE_SKILLS` in `src/skills/loader.ts` and wire up the corresponding config flag in `main.ts`.
4. Verify with `lettabot skills status` that the skill is discovered and eligible.
For project-local skills that don't ship with lettabot, place them in `.skills/<name>/` instead.

16
lettabot-agent.json.bak Normal file
View File

@@ -0,0 +1,16 @@
{
"version": 2,
"agents": {
"LettaBot": {
"agentId": "agent-78eb94ac-d2ef-414f-8b74-a02a95e16b33",
"lastUsedAt": "2026-03-02T21:48:34.231Z",
"createdAt": "2026-03-02T21:48:34.231Z",
"lastMessageTarget": {
"channel": "discord",
"chatId": "chat-1",
"updatedAt": "2026-03-02T22:45:13.260Z"
},
"recoveryAttempts": 0
}
}
}

View File

@@ -4,28 +4,25 @@
* Single agent, single conversation - chat continues across all channels.
*/
import { createAgent, createSession, resumeSession, imageFromFile, imageFromURL, type Session, type MessageContentItem, type SendMessage, type CanUseToolCallback } from '@letta-ai/letta-code-sdk';
import { imageFromFile, imageFromURL, type Session, type MessageContentItem, type SendMessage, type CanUseToolCallback } from '@letta-ai/letta-code-sdk';
import { mkdirSync, existsSync } from 'node:fs';
import { access, unlink, realpath, stat, constants } from 'node:fs/promises';
import { execFile } from 'node:child_process';
import { extname, resolve, join } from 'node:path';
import type { ChannelAdapter } from '../channels/types.js';
import type { BotConfig, InboundMessage, TriggerContext, StreamMsg } from './types.js';
import { isApprovalConflictError, isConversationMissingError, isAgentMissingFromInitError, formatApiErrorForUser } from './errors.js';
import { formatApiErrorForUser } from './errors.js';
import { formatToolCallDisplay, formatReasoningDisplay, formatQuestionsForChannel } from './display.js';
import type { AgentSession } from './interfaces.js';
import { Store } from './store.js';
import { updateAgentName, getPendingApprovals, rejectApproval, cancelRuns, cancelConversation, recoverOrphanedConversationApproval, getLatestRunError, getAgentModel, updateAgentModel } from '../tools/letta-api.js';
import { installSkillsToAgent, withAgentSkillsOnPath, getAgentSkillExecutableDirs, isVoiceMemoConfigured } from '../skills/loader.js';
import { getPendingApprovals, rejectApproval, cancelRuns, cancelConversation, recoverOrphanedConversationApproval, getLatestRunError, getAgentModel, updateAgentModel } from '../tools/letta-api.js';
import { getAgentSkillExecutableDirs, isVoiceMemoConfigured } from '../skills/loader.js';
import { formatMessageEnvelope, formatGroupBatchEnvelope, type SessionContextOptions } from './formatter.js';
import type { GroupBatcher } from './group-batcher.js';
import { loadMemoryBlocks } from './memory.js';
import { redactOutbound } from './redact.js';
import { SYSTEM_PROMPT } from './system-prompt.js';
import { parseDirectives, stripActionsBlock, type Directive } from './directives.js';
import { resolveEmoji } from './emoji.js';
import { createManageTodoTool } from '../tools/todo.js';
import { syncTodosFromTool } from '../todo/store.js';
import { SessionManager } from './session-manager.js';
import { createLogger } from '../logger.js';
@@ -212,28 +209,9 @@ export class LettaBot implements AgentSession {
// In shared mode, a single entry keyed by 'shared' provides legacy behavior.
private pendingQuestionResolvers: Map<string, (text: string) => void> = new Map();
// Persistent sessions: reuse CLI subprocesses across messages.
// In shared mode, only the "shared" key is used. In per-channel mode, each
// channel (and optionally heartbeat) gets its own subprocess. In per-chat
// mode, each unique channel:chatId gets its own subprocess (LRU-evicted).
private sessions: Map<string, Session> = new Map();
private sessionLastUsed: Map<string, number> = new Map(); // LRU tracking for per-chat mode
// Coalesces concurrent ensureSessionForKey calls for the same key so the
// second caller waits for the first instead of creating a duplicate session.
// generation prevents stale in-flight creations from being reused after reset.
private sessionCreationLocks: Map<string, { promise: Promise<Session>; generation: number }> = new Map();
private sessionGenerations: Map<string, number> = new Map();
private currentCanUseTool: CanUseToolCallback | undefined;
private conversationOverrides: Set<string> = new Set();
// Stable callback wrapper so the Session options never change, but we can
// swap out the per-message handler before each send().
private readonly sessionCanUseTool: CanUseToolCallback = async (toolName, toolInput) => {
if (this.currentCanUseTool) {
return this.currentCanUseTool(toolName, toolInput);
}
return { behavior: 'allow' as const };
};
private readonly sessionManager: SessionManager;
constructor(config: BotConfig) {
this.config = config;
mkdirSync(config.workingDir, { recursive: true });
@@ -244,6 +222,7 @@ export class LettaBot implements AgentSession {
if (config.conversationOverrides?.length) {
this.conversationOverrides = new Set(config.conversationOverrides.map((ch) => ch.toLowerCase()));
}
this.sessionManager = new SessionManager(this.store, config, this.processingKeys, this.lastResultRunFingerprints);
log.info(`LettaBot initialized. Agent ID: ${this.store.agentId || '(new)'}`);
}
@@ -292,114 +271,9 @@ export class LettaBot implements AgentSession {
return 'fresh';
}
// =========================================================================
// Session options (shared by processMessage and sendToAgent)
// =========================================================================
private getTodoAgentKey(): string {
return this.store.agentId || this.config.agentName || 'LettaBot';
}
private syncTodoToolCall(streamMsg: StreamMsg): void {
if (streamMsg.type !== 'tool_call') return;
const normalizedToolName = (streamMsg.toolName || '').toLowerCase();
const isBuiltInTodoTool = normalizedToolName === 'todowrite'
|| normalizedToolName === 'todo_write'
|| normalizedToolName === 'writetodos'
|| normalizedToolName === 'write_todos';
if (!isBuiltInTodoTool) return;
const input = (streamMsg.toolInput && typeof streamMsg.toolInput === 'object')
? streamMsg.toolInput as Record<string, unknown>
: null;
if (!input || !Array.isArray(input.todos)) return;
const incoming: Array<{
content?: string;
description?: string;
status: 'pending' | 'in_progress' | 'completed' | 'cancelled';
}> = [];
for (const item of input.todos) {
if (!item || typeof item !== 'object') continue;
const obj = item as Record<string, unknown>;
const statusRaw = typeof obj.status === 'string' ? obj.status : '';
if (!['pending', 'in_progress', 'completed', 'cancelled'].includes(statusRaw)) continue;
incoming.push({
content: typeof obj.content === 'string' ? obj.content : undefined,
description: typeof obj.description === 'string' ? obj.description : undefined,
status: statusRaw as 'pending' | 'in_progress' | 'completed' | 'cancelled',
});
}
if (incoming.length === 0) return;
try {
const summary = syncTodosFromTool(this.getTodoAgentKey(), incoming);
if (summary.added > 0 || summary.updated > 0) {
log.info(`Synced ${summary.totalIncoming} todo(s) from ${streamMsg.toolName} into heartbeat store (added=${summary.added}, updated=${summary.updated})`);
}
} catch (err) {
log.warn('Failed to sync TodoWrite todos:', err instanceof Error ? err.message : err);
}
}
private getSessionTimeoutMs(): number {
const envTimeoutMs = Number(process.env.LETTA_SESSION_TIMEOUT_MS);
if (Number.isFinite(envTimeoutMs) && envTimeoutMs > 0) {
return envTimeoutMs;
}
return 60000;
}
private async withSessionTimeout<T>(
promise: Promise<T>,
label: string,
): Promise<T> {
const timeoutMs = this.getSessionTimeoutMs();
let timeoutId: ReturnType<typeof setTimeout> | undefined;
const timeoutPromise = new Promise<T>((_, reject) => {
timeoutId = setTimeout(() => {
reject(new Error(`${label} timed out after ${timeoutMs}ms`));
}, timeoutMs);
});
try {
return await Promise.race([promise, timeoutPromise]);
} finally {
if (timeoutId) clearTimeout(timeoutId);
}
}
private baseSessionOptions(canUseTool?: CanUseToolCallback) {
return {
permissionMode: 'bypassPermissions' as const,
allowedTools: this.config.allowedTools,
disallowedTools: [
// Block built-in TodoWrite -- it requires interactive approval (fails
// silently during heartbeats) and writes to the CLI's own store rather
// than lettabot's persistent heartbeat store. The agent should use the
// custom manage_todo tool instead.
'TodoWrite',
...(this.config.disallowedTools || []),
],
cwd: this.config.workingDir,
tools: [createManageTodoTool(this.getTodoAgentKey())],
// Memory filesystem (context repository): true -> --memfs, false -> --no-memfs, undefined -> leave unchanged
...(this.config.memfs !== undefined ? { memfs: this.config.memfs } : {}),
// In bypassPermissions mode, canUseTool is only called for interactive
// tools (AskUserQuestion, ExitPlanMode). When no callback is provided
// (background triggers), the SDK auto-denies interactive tools.
...(canUseTool ? { canUseTool } : {}),
};
}
// =========================================================================
// AskUserQuestion formatting
// =========================================================================
/**
* Format AskUserQuestion questions as a single channel message.
* Displays each question with numbered options for the user to choose from.
*/
// =========================================================================
// Session lifecycle helpers
// =========================================================================
@@ -585,505 +459,13 @@ export class LettaBot implements AgentSession {
);
}
// =========================================================================
// Session lifecycle (per-key)
// =========================================================================
// Session lifecycle delegated to SessionManager
/**
* Return the persistent session for the given conversation key,
* creating and initializing it if needed.
*
* After initialization, calls bootstrapState() to detect pending approvals.
* If an orphaned approval is found, recovers proactively before returning
* the session -- preventing the first send() from hitting a 409 CONFLICT.
*/
private async ensureSessionForKey(key: string, bootstrapRetried = false): Promise<Session> {
const generation = this.sessionGenerations.get(key) ?? 0;
// Fast path: session already exists
const existing = this.sessions.get(key);
if (existing) {
this.sessionLastUsed.set(key, Date.now());
return existing;
}
// Coalesce concurrent callers: if another call is already creating this
// key (e.g. warmSession running while first message arrives), wait for
// it instead of creating a duplicate session.
const pending = this.sessionCreationLocks.get(key);
if (pending && pending.generation === generation) return pending.promise;
const promise = this._createSessionForKey(key, bootstrapRetried, generation);
this.sessionCreationLocks.set(key, { promise, generation });
try {
return await promise;
} finally {
const current = this.sessionCreationLocks.get(key);
if (current?.promise === promise) {
this.sessionCreationLocks.delete(key);
}
}
}
/** Internal session creation -- called via ensureSessionForKey's lock. */
private async _createSessionForKey(
key: string,
bootstrapRetried: boolean,
generation: number,
): Promise<Session> {
// Session was invalidated while this creation path was queued.
if ((this.sessionGenerations.get(key) ?? 0) !== generation) {
return this.ensureSessionForKey(key, bootstrapRetried);
}
// Re-read the store file from disk so we pick up agent/conversation ID
// changes made by other processes (e.g. after a restart or container deploy).
// This costs one synchronous disk read per incoming message, which is fine
// at chat-bot throughput. If this ever becomes a bottleneck, throttle to
// refresh at most once per second.
this.store.refresh();
const opts = this.baseSessionOptions(this.sessionCanUseTool);
let session: Session;
let sessionAgentId: string | undefined;
// In disabled mode, always resume the agent's built-in default conversation.
// Skip store lookup entirely -- no conversation ID is persisted.
const convId = key === 'default'
? null
: key === 'shared'
? this.store.conversationId
: this.store.getConversationId(key);
// Propagate per-agent cron store path to CLI subprocesses (lettabot-schedule)
if (this.config.cronStorePath) {
process.env.CRON_STORE_PATH = this.config.cronStorePath;
}
if (key === 'default' && this.store.agentId) {
process.env.LETTA_AGENT_ID = this.store.agentId;
installSkillsToAgent(this.store.agentId, this.config.skills);
sessionAgentId = this.store.agentId;
session = resumeSession('default', opts);
} else if (convId) {
process.env.LETTA_AGENT_ID = this.store.agentId || undefined;
if (this.store.agentId) {
installSkillsToAgent(this.store.agentId, this.config.skills);
sessionAgentId = this.store.agentId;
}
session = resumeSession(convId, opts);
} else if (this.store.agentId) {
// Agent exists but no conversation stored -- resume the default conversation
process.env.LETTA_AGENT_ID = this.store.agentId;
installSkillsToAgent(this.store.agentId, this.config.skills);
sessionAgentId = this.store.agentId;
session = resumeSession(this.store.agentId, opts);
} else {
// Create new agent -- persist immediately so we don't orphan it on later failures
log.info('Creating new agent');
const newAgentId = await createAgent({
systemPrompt: SYSTEM_PROMPT,
memory: loadMemoryBlocks(this.config.agentName),
tags: ['origin:lettabot'],
...(this.config.memfs !== undefined ? { memfs: this.config.memfs } : {}),
});
const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com';
this.store.setAgent(newAgentId, currentBaseUrl);
log.info('Saved new agent ID:', newAgentId);
if (this.config.agentName) {
updateAgentName(newAgentId, this.config.agentName).catch(() => {});
}
installSkillsToAgent(newAgentId, this.config.skills);
sessionAgentId = newAgentId;
// In disabled mode, resume the built-in default conversation instead of
// creating a new one. Other modes create a fresh conversation per key.
session = key === 'default'
? resumeSession('default', opts)
: createSession(newAgentId, opts);
}
// Initialize eagerly so the subprocess is ready before the first send()
log.info(`Initializing session subprocess (key=${key})...`);
try {
if (sessionAgentId) {
await withAgentSkillsOnPath(
sessionAgentId,
() => this.withSessionTimeout(session.initialize(), `Session initialize (key=${key})`),
);
} else {
await this.withSessionTimeout(session.initialize(), `Session initialize (key=${key})`);
}
log.info(`Session subprocess ready (key=${key})`);
} catch (error) {
// Close immediately so failed initialization cannot leak a subprocess.
session.close();
// If the stored agent ID doesn't exist on the server (deleted externally,
// ghost agent from failed pairing, etc.), clear the stale ID and retry.
// The retry will hit the "else" branch and create a fresh agent.
// Uses bootstrapRetried to prevent infinite recursion if creation also fails.
if (this.store.agentId && !bootstrapRetried && isAgentMissingFromInitError(error)) {
log.warn(
`Agent ${this.store.agentId} appears missing from server, ` +
`clearing stale agent ID and recreating...`,
);
this.store.clearAgent();
return this._createSessionForKey(key, /* bootstrapRetried */ true, generation);
}
throw error;
}
// reset/invalidate can happen while initialize() is in-flight.
if ((this.sessionGenerations.get(key) ?? 0) !== generation) {
log.info(`Discarding stale initialized session (key=${key})`);
session.close();
return this.ensureSessionForKey(key, bootstrapRetried);
}
// Proactive approval detection via bootstrapState().
// Single CLI round-trip that returns hasPendingApproval flag alongside
// session metadata. If an orphaned approval is stuck, recover now so the
// first send() doesn't hit a 409 CONFLICT.
if (!bootstrapRetried && this.store.agentId) {
try {
const bootstrap = await this.withSessionTimeout(
session.bootstrapState(),
`Session bootstrapState (key=${key})`,
);
if (bootstrap.hasPendingApproval) {
const convId = bootstrap.conversationId || session.conversationId;
log.warn(`Pending approval detected at session startup (key=${key}, conv=${convId}), recovering...`);
session.close();
if (convId) {
const result = await recoverOrphanedConversationApproval(
this.store.agentId,
convId,
true, /* deepScan */
);
if (result.recovered) {
log.info(`Proactive approval recovery succeeded: ${result.details}`);
} else {
log.warn(`Proactive approval recovery did not find resolvable approvals: ${result.details}`);
}
}
// Recreate session after recovery (conversation state changed).
// Call _createSessionForKey directly (not ensureSessionForKey) since
// we're already inside the creation lock for this key.
return this._createSessionForKey(key, true, generation);
}
} catch (err) {
// bootstrapState failure is non-fatal -- the session is still usable.
// The reactive 409 handler in runSession() will catch stuck approvals.
log.warn(`bootstrapState check failed (key=${key}), continuing:`, err instanceof Error ? err.message : err);
}
}
if ((this.sessionGenerations.get(key) ?? 0) !== generation) {
log.info(`Discarding stale session after bootstrapState (key=${key})`);
session.close();
return this.ensureSessionForKey(key, bootstrapRetried);
}
// LRU eviction: in per-chat mode, limit concurrent sessions to avoid
// unbounded subprocess growth. Evicted sessions can be cheaply recreated
// via resumeSession() since conversation IDs are persisted in the store.
const maxSessions = this.config.maxSessions ?? 10;
if (this.config.conversationMode === 'per-chat' && this.sessions.size >= maxSessions) {
let oldestKey: string | null = null;
let oldestTime = Infinity;
for (const [k, ts] of this.sessionLastUsed) {
if (k === key) continue;
if (!this.sessions.has(k)) continue;
// Never evict an active/in-flight key (can close a live stream).
if (this.processingKeys.has(k) || this.sessionCreationLocks.has(k)) continue;
if (ts < oldestTime) {
oldestKey = k;
oldestTime = ts;
}
}
if (oldestKey) {
log.info(`LRU session eviction: closing session for key="${oldestKey}" (${this.sessions.size} active, max=${maxSessions})`);
const evicted = this.sessions.get(oldestKey);
evicted?.close();
this.sessions.delete(oldestKey);
this.sessionLastUsed.delete(oldestKey);
this.sessionGenerations.delete(oldestKey);
this.sessionCreationLocks.delete(oldestKey);
this.lastResultRunFingerprints.delete(oldestKey);
} else {
// All existing sessions are active; allow temporary overflow.
log.debug(`LRU session eviction skipped: all ${this.sessions.size} sessions are active/in-flight`);
}
}
this.sessions.set(key, session);
this.sessionLastUsed.set(key, Date.now());
return session;
}
/** Legacy convenience: resolve key from shared/per-channel mode and delegate. */
private async ensureSession(): Promise<Session> {
return this.ensureSessionForKey('shared');
}
/**
* Destroy session(s). If key provided, destroys only that key.
* If key is undefined, destroys ALL sessions.
*/
private invalidateSession(key?: string): void {
if (key) {
// Invalidate any in-flight creation for this key so reset can force
// a fresh conversation/session immediately.
const nextGeneration = (this.sessionGenerations.get(key) ?? 0) + 1;
this.sessionGenerations.set(key, nextGeneration);
this.sessionCreationLocks.delete(key);
const session = this.sessions.get(key);
if (session) {
log.info(`Invalidating session (key=${key})`);
session.close();
this.sessions.delete(key);
this.sessionLastUsed.delete(key);
}
this.lastResultRunFingerprints.delete(key);
} else {
const keys = new Set<string>([
...this.sessions.keys(),
...this.sessionCreationLocks.keys(),
]);
for (const k of keys) {
const nextGeneration = (this.sessionGenerations.get(k) ?? 0) + 1;
this.sessionGenerations.set(k, nextGeneration);
}
for (const [k, session] of this.sessions) {
log.info(`Invalidating session (key=${k})`);
session.close();
}
this.sessions.clear();
this.sessionCreationLocks.clear();
this.sessionLastUsed.clear();
this.lastResultRunFingerprints.clear();
}
}
/**
* Pre-warm the session subprocess at startup. Call after config/agent is loaded.
* Pre-warm the session subprocess at startup.
*/
async warmSession(): Promise<void> {
this.store.refresh();
if (!this.store.agentId && !this.store.conversationId) return;
try {
const mode = this.config.conversationMode || 'shared';
// In shared mode, warm the single session. In per-channel/per-chat modes,
// warm nothing (sessions are created on first message per key).
if (mode === 'shared') {
await this.ensureSessionForKey('shared');
}
} catch (err) {
log.warn('Session pre-warm failed:', err instanceof Error ? err.message : err);
}
}
/**
* Persist conversation ID after a successful session result.
* Agent ID and first-run setup are handled eagerly in ensureSessionForKey().
*/
private persistSessionState(session: Session, convKey?: string): void {
// Agent ID already persisted in ensureSessionForKey() on creation.
// Here we only update if the server returned a different one (shouldn't happen).
if (session.agentId && session.agentId !== this.store.agentId) {
const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com';
this.store.setAgent(session.agentId, currentBaseUrl, session.conversationId || undefined);
log.info('Agent ID updated:', session.agentId);
} else if (session.conversationId && session.conversationId !== 'default' && convKey !== 'default') {
// In per-channel mode, persist per-key. In shared mode, use legacy field.
// Skip saving "default" -- it's an API alias, not a real conversation ID.
// In disabled mode (convKey === 'default'), skip -- always use the built-in default.
if (convKey && convKey !== 'shared') {
const existing = this.store.getConversationId(convKey);
if (session.conversationId !== existing) {
this.store.setConversationId(convKey, session.conversationId);
log.info(`Conversation ID updated (key=${convKey}):`, session.conversationId);
}
} else if (session.conversationId !== this.store.conversationId) {
this.store.conversationId = session.conversationId;
log.info('Conversation ID updated:', session.conversationId);
}
}
}
/**
* Send a message and return a deduplicated stream.
*
* Handles:
* - Persistent session reuse (subprocess stays alive across messages)
* - CONFLICT recovery from orphaned approvals (retry once)
* - Conversation-not-found fallback (create new conversation)
* - Tool call deduplication
* - Session persistence after result
*/
private async runSession(
message: SendMessage,
options: { retried?: boolean; canUseTool?: CanUseToolCallback; convKey?: string } = {},
): Promise<{ session: Session; stream: () => AsyncGenerator<StreamMsg> }> {
const { retried = false, canUseTool, convKey = 'shared' } = options;
// Update the per-message callback before sending
this.currentCanUseTool = canUseTool;
let session = await this.ensureSessionForKey(convKey);
// Resolve the conversation ID for this key (for error recovery)
const convId = convKey === 'shared'
? this.store.conversationId
: this.store.getConversationId(convKey);
// Send message with fallback chain
try {
await this.withSessionTimeout(session.send(message), `Session send (key=${convKey})`);
} catch (error) {
// 409 CONFLICT from orphaned approval
if (!retried && isApprovalConflictError(error) && this.store.agentId && convId) {
log.info('CONFLICT detected - attempting orphaned approval recovery...');
this.invalidateSession(convKey);
const result = await recoverOrphanedConversationApproval(
this.store.agentId,
convId
);
if (result.recovered) {
log.info(`Recovery succeeded (${result.details}), retrying...`);
return this.runSession(message, { retried: true, canUseTool, convKey });
}
log.error(`Orphaned approval recovery failed: ${result.details}`);
throw error;
}
// Conversation/agent not found - try creating a new conversation.
// Only retry on errors that indicate missing conversation/agent, not
// on auth, network, or protocol errors (which would just fail again).
if (this.store.agentId && isConversationMissingError(error)) {
log.warn(`Conversation not found (key=${convKey}), creating a new conversation...`);
this.invalidateSession(convKey);
if (convKey !== 'shared') {
this.store.clearConversation(convKey);
} else {
this.store.conversationId = null;
}
session = await this.ensureSessionForKey(convKey);
try {
await this.withSessionTimeout(session.send(message), `Session send retry (key=${convKey})`);
} catch (retryError) {
this.invalidateSession(convKey);
throw retryError;
}
} else {
// Unknown error -- invalidate so we get a fresh subprocess next time
this.invalidateSession(convKey);
throw error;
}
}
// Persist conversation ID immediately after successful send, before streaming.
this.persistSessionState(session, convKey);
// Return session and a stream generator that buffers tool_call chunks and
// flushes them with fully accumulated arguments on the next type boundary.
// This ensures display messages always have complete args (channels can't
// edit messages after sending).
const pendingToolCalls = new Map<string, { msg: StreamMsg; accumulatedArgs: string }>();
const self = this;
const capturedConvKey = convKey; // Capture for closure
/** Merge tool argument strings, handling both delta and cumulative chunking. */
function mergeToolArgs(existing: string, incoming: string): string {
if (!incoming) return existing;
if (!existing) return incoming;
if (incoming === existing) return existing;
// Cumulative: latest chunk includes all prior text
if (incoming.startsWith(existing)) return incoming;
if (existing.endsWith(incoming)) return existing;
// Delta: each chunk is an append
return `${existing}${incoming}`;
}
function* flushPending(): Generator<StreamMsg> {
for (const [, pending] of pendingToolCalls) {
if (!pending.accumulatedArgs) {
// No rawArguments accumulated (old SDK or single complete chunk) --
// preserve the original toolInput from the first chunk as-is.
yield pending.msg;
continue;
}
let toolInput: Record<string, unknown> = {};
try { toolInput = JSON.parse(pending.accumulatedArgs); }
catch { toolInput = { raw: pending.accumulatedArgs }; }
yield { ...pending.msg, toolInput };
}
pendingToolCalls.clear();
lastPendingToolCallId = null;
}
let anonToolCallCounter = 0;
let lastPendingToolCallId: string | null = null;
async function* dedupedStream(): AsyncGenerator<StreamMsg> {
for await (const raw of session.stream()) {
const msg = raw as StreamMsg;
if (msg.type === 'tool_call') {
let id = msg.toolCallId;
if (!id) {
// Tool calls without IDs (e.g., from models that don't emit
// tool_call_id on subsequent argument chunks) still need to be
// accumulated. Assign a synthetic ID so they enter the buffer.
// If tool name matches the most recent pending call, treat this as
// a continuation even when the first chunk had a real toolCallId.
const currentPending = lastPendingToolCallId ? pendingToolCalls.get(lastPendingToolCallId) : null;
if (lastPendingToolCallId && currentPending && (currentPending.msg.toolName || 'unknown') === (msg.toolName || 'unknown')) {
id = lastPendingToolCallId;
} else {
id = `__anon_${++anonToolCallCounter}__`;
}
}
const incoming = (msg as StreamMsg & { rawArguments?: string }).rawArguments || '';
const existing = pendingToolCalls.get(id);
if (existing) {
existing.accumulatedArgs = mergeToolArgs(existing.accumulatedArgs, incoming);
} else {
pendingToolCalls.set(id, { msg, accumulatedArgs: incoming });
}
lastPendingToolCallId = id;
continue; // buffer, don't yield yet
}
// Flush pending tool calls on semantic type boundary (not stream_event)
if (pendingToolCalls.size > 0 && msg.type !== 'stream_event') {
yield* flushPending();
}
if (msg.type === 'result') {
// Flush any remaining before result
yield* flushPending();
self.persistSessionState(session, capturedConvKey);
}
yield msg;
if (msg.type === 'result') {
break;
}
}
// Flush remaining at generator end (shouldn't normally happen)
yield* flushPending();
}
return { session, stream: dedupedStream };
return this.sessionManager.warmSession();
}
// =========================================================================
@@ -1191,13 +573,13 @@ export class LettaBot implements AgentSession {
this.store.clearConversation(convKey);
this.store.resetRecoveryAttempts();
this.invalidateSession(convKey);
this.sessionManager.invalidateSession(convKey);
log.info(`/reset - conversation cleared for key="${convKey}"`);
// Eagerly create the new session so we can report the conversation ID.
try {
const session = await this.ensureSessionForKey(convKey);
const session = await this.sessionManager.ensureSessionForKey(convKey);
const newConvId = session.conversationId || '(pending)';
this.persistSessionState(session, convKey);
this.sessionManager.persistSessionState(session, convKey);
if (convKey === 'shared') {
return `Conversation reset. New conversation: ${newConvId}\n(Agent memory is preserved.)`;
}
@@ -1223,7 +605,7 @@ export class LettaBot implements AgentSession {
this.cancelledKeys.add(convKey);
// Abort client-side stream
const session = this.sessions.get(convKey);
const session = this.sessionManager.getSession(convKey);
if (session) {
session.abort().catch(() => {});
log.info(`/cancel - aborted session stream (key=${convKey})`);
@@ -1577,7 +959,7 @@ export class LettaBot implements AgentSession {
if (userText.length > 0) {
log.debug(`processMessage seq=${seq} textPreview=${userText.slice(0, 80)}`);
}
const run = await this.runSession(messageToSend, { retried, canUseTool, convKey });
const run = await this.sessionManager.runSession(messageToSend, { retried, canUseTool, convKey });
lap('session send');
session = run.session;
@@ -1715,7 +1097,7 @@ export class LettaBot implements AgentSession {
// Log meaningful events with structured summaries
if (streamMsg.type === 'tool_call') {
this.syncTodoToolCall(streamMsg);
this.sessionManager.syncTodoToolCall(streamMsg);
const tcName = streamMsg.toolName || 'unknown';
const tcId = streamMsg.toolCallId?.slice(0, 12) || '?';
log.info(`>>> TOOL CALL: ${tcName} (id: ${tcId})`);
@@ -1826,7 +1208,7 @@ export class LettaBot implements AgentSession {
// next message. Discard it and retry so the message gets processed.
if (streamMsg.stopReason === 'cancelled') {
log.info(`Discarding cancelled run result (seq=${seq}, len=${typeof streamMsg.result === 'string' ? streamMsg.result.length : 0})`);
this.invalidateSession(convKey);
this.sessionManager.invalidateSession(convKey);
session = null;
if (!retried) {
return this.processMessage(msg, adapter, true);
@@ -1836,7 +1218,7 @@ export class LettaBot implements AgentSession {
const resultRunState = this.classifyResultRun(convKey, streamMsg);
if (resultRunState === 'stale') {
this.invalidateSession(convKey);
this.sessionManager.invalidateSession(convKey);
session = null;
if (!retried) {
log.warn(`Retrying message after stale duplicate result (seq=${seq}, key=${convKey})`);
@@ -1925,7 +1307,7 @@ export class LettaBot implements AgentSession {
if (isApprovalConflict && !retried && this.store.agentId) {
if (retryConvId) {
log.info('Approval conflict detected -- attempting targeted recovery...');
this.invalidateSession(retryConvKey);
this.sessionManager.invalidateSession(retryConvKey);
session = null;
clearInterval(typingInterval);
const convResult = await recoverOrphanedConversationApproval(
@@ -1969,7 +1351,7 @@ export class LettaBot implements AgentSession {
if (!retried && this.store.agentId && retryConvId) {
const reason = shouldRetryForErrorResult ? 'error result' : 'empty result';
log.info(`${reason} - attempting orphaned approval recovery...`);
this.invalidateSession(retryConvKey);
this.sessionManager.invalidateSession(retryConvKey);
session = null;
clearInterval(typingInterval);
const convResult = await recoverOrphanedConversationApproval(
@@ -2113,7 +1495,7 @@ export class LettaBot implements AgentSession {
// eliminate any possibility of stream state bleed between sequential
// sends. Costs ~5s subprocess init overhead per message.
if (this.config.reuseSession === false) {
this.invalidateSession(finalConvKey);
this.sessionManager.invalidateSession(finalConvKey);
}
this.cancelledKeys.delete(finalConvKey);
}
@@ -2176,7 +1558,7 @@ export class LettaBot implements AgentSession {
try {
let retried = false;
while (true) {
const { stream } = await this.runSession(text, { convKey, retried });
const { stream } = await this.sessionManager.runSession(text, { convKey, retried });
try {
let response = '';
@@ -2184,7 +1566,7 @@ export class LettaBot implements AgentSession {
let lastErrorDetail: { message: string; stopReason: string; apiError?: Record<string, unknown> } | undefined;
for await (const msg of stream()) {
if (msg.type === 'tool_call') {
this.syncTodoToolCall(msg);
this.sessionManager.syncTodoToolCall(msg);
}
if (msg.type === 'error') {
lastErrorDetail = {
@@ -2224,7 +1606,7 @@ export class LettaBot implements AgentSession {
}
if (sawStaleDuplicateResult) {
this.invalidateSession(convKey);
this.sessionManager.invalidateSession(convKey);
if (retried) {
throw new Error('Agent stream returned stale duplicate result after retry');
}
@@ -2239,13 +1621,14 @@ export class LettaBot implements AgentSession {
return response;
} catch (error) {
// Invalidate on stream errors so next call gets a fresh subprocess
this.invalidateSession(convKey);
this.sessionManager.invalidateSession(convKey);
throw error;
}
}
} finally {
if (this.config.reuseSession === false) {
this.invalidateSession(convKey);
this.sessionManager.invalidateSession(convKey);
}
this.releaseLock(convKey, acquired);
}
@@ -2263,17 +1646,17 @@ export class LettaBot implements AgentSession {
const acquired = await this.acquireLock(convKey);
try {
const { stream } = await this.runSession(text, { convKey });
const { stream } = await this.sessionManager.runSession(text, { convKey });
try {
yield* stream();
} catch (error) {
this.invalidateSession(convKey);
this.sessionManager.invalidateSession(convKey);
throw error;
}
} finally {
if (this.config.reuseSession === false) {
this.invalidateSession(convKey);
this.sessionManager.invalidateSession(convKey);
}
this.releaseLock(convKey, acquired);
}

View File

@@ -39,7 +39,7 @@ describe('listening mode directive safety', () => {
sendFile: vi.fn(async () => ({ messageId: 'file-1' })),
};
(bot as any).runSession = vi.fn(async () => ({
(bot as any).sessionManager.runSession = vi.fn(async () => ({
session: { abort: vi.fn(async () => {}) },
stream: async function* () {
yield {

View File

@@ -36,7 +36,7 @@ describe('result divergence guard', () => {
sendFile: vi.fn(async () => ({ messageId: 'file-1' })),
};
(bot as any).runSession = vi.fn(async () => ({
(bot as any).sessionManager.runSession = vi.fn(async () => ({
session: { abort: vi.fn(async () => {}) },
stream: async function* () {
// Assistant text is flushed when tool_call arrives.
@@ -81,7 +81,7 @@ describe('result divergence guard', () => {
sendFile: vi.fn(async () => ({ messageId: 'file-1' })),
};
(bot as any).runSession = vi.fn(async () => ({
(bot as any).sessionManager.runSession = vi.fn(async () => ({
session: { abort: vi.fn(async () => {}) },
stream: async function* () {
yield { type: 'assistant', content: 'streamed-segment' };

View File

@@ -547,19 +547,20 @@ describe('SDK session contract', () => {
bot.setAgentId('agent-contract-test');
const botInternal = bot as any;
botInternal.sessions.set('telegram:active', activeSession);
botInternal.sessions.set('telegram:idle', idleSession);
botInternal.sessionLastUsed.set('telegram:active', 1);
botInternal.sessionLastUsed.set('telegram:idle', 2);
const sm = botInternal.sessionManager;
sm.sessions.set('telegram:active', activeSession);
sm.sessions.set('telegram:idle', idleSession);
sm.sessionLastUsed.set('telegram:active', 1);
sm.sessionLastUsed.set('telegram:idle', 2);
botInternal.processingKeys.add('telegram:active');
await botInternal._createSessionForKey('telegram:new', true, 0);
await sm._createSessionForKey('telegram:new', true, 0);
expect(activeSession.close).not.toHaveBeenCalled();
expect(idleSession.close).toHaveBeenCalledTimes(1);
expect(botInternal.sessions.has('telegram:active')).toBe(true);
expect(botInternal.sessions.has('telegram:idle')).toBe(false);
expect(botInternal.sessions.has('telegram:new')).toBe(true);
expect(sm.sessions.has('telegram:active')).toBe(true);
expect(sm.sessions.has('telegram:idle')).toBe(false);
expect(sm.sessions.has('telegram:new')).toBe(true);
});
it('enriches opaque error via stream error event in sendToAgent', async () => {

652
src/core/session-manager.ts Normal file
View File

@@ -0,0 +1,652 @@
/**
* SessionManager - Owns CLI subprocess lifecycle, session creation,
* LRU eviction, invalidation, and message send+stream.
*
* Extracted from bot.ts to isolate session concerns from message
* routing, channel management, and directive execution.
*/
import { createAgent, createSession, resumeSession, type Session, type SendMessage, type CanUseToolCallback } from '@letta-ai/letta-code-sdk';
import type { BotConfig, StreamMsg } from './types.js';
import { isApprovalConflictError, isConversationMissingError, isAgentMissingFromInitError } from './errors.js';
import { Store } from './store.js';
import { updateAgentName, recoverOrphanedConversationApproval } from '../tools/letta-api.js';
import { installSkillsToAgent, withAgentSkillsOnPath } from '../skills/loader.js';
import { loadMemoryBlocks } from './memory.js';
import { SYSTEM_PROMPT } from './system-prompt.js';
import { createManageTodoTool } from '../tools/todo.js';
import { syncTodosFromTool } from '../todo/store.js';
import { createLogger } from '../logger.js';
const log = createLogger('Session');
export class SessionManager {
private readonly store: Store;
private readonly config: BotConfig;
// Active processing keys -- owned by LettaBot, read here for LRU eviction safety.
private readonly processingKeys: ReadonlySet<string>;
// Stale-result fingerprints -- owned by LettaBot, cleaned here on invalidation/eviction.
private readonly lastResultRunFingerprints: Map<string, string>;
// Persistent sessions: reuse CLI subprocesses across messages.
private sessions: Map<string, Session> = new Map();
private sessionLastUsed: Map<string, number> = new Map();
private sessionCreationLocks: Map<string, { promise: Promise<Session>; generation: number }> = new Map();
private sessionGenerations: Map<string, number> = new Map();
// Per-message tool callback. Updated before each send() so the Session
// options (which hold a stable wrapper) route to the current handler.
private currentCanUseTool: CanUseToolCallback | undefined;
// Stable callback wrapper so the Session options never change, but we can
// swap out the per-message handler before each send().
private readonly sessionCanUseTool: CanUseToolCallback = async (toolName, toolInput) => {
if (this.currentCanUseTool) {
return this.currentCanUseTool(toolName, toolInput);
}
return { behavior: 'allow' as const };
};
constructor(
store: Store,
config: BotConfig,
processingKeys: ReadonlySet<string>,
lastResultRunFingerprints: Map<string, string>,
) {
this.store = store;
this.config = config;
this.processingKeys = processingKeys;
this.lastResultRunFingerprints = lastResultRunFingerprints;
}
// =========================================================================
// Todo sync (stream utility)
// =========================================================================
private getTodoAgentKey(): string {
return this.store.agentId || this.config.agentName || 'LettaBot';
}
/** Sync TodoWrite tool calls to the persistent heartbeat store. */
syncTodoToolCall(streamMsg: StreamMsg): void {
if (streamMsg.type !== 'tool_call') return;
const normalizedToolName = (streamMsg.toolName || '').toLowerCase();
const isBuiltInTodoTool = normalizedToolName === 'todowrite'
|| normalizedToolName === 'todo_write'
|| normalizedToolName === 'writetodos'
|| normalizedToolName === 'write_todos';
if (!isBuiltInTodoTool) return;
const input = (streamMsg.toolInput && typeof streamMsg.toolInput === 'object')
? streamMsg.toolInput as Record<string, unknown>
: null;
if (!input || !Array.isArray(input.todos)) return;
const incoming: Array<{
content?: string;
description?: string;
status: 'pending' | 'in_progress' | 'completed' | 'cancelled';
}> = [];
for (const item of input.todos) {
if (!item || typeof item !== 'object') continue;
const obj = item as Record<string, unknown>;
const statusRaw = typeof obj.status === 'string' ? obj.status : '';
if (!['pending', 'in_progress', 'completed', 'cancelled'].includes(statusRaw)) continue;
incoming.push({
content: typeof obj.content === 'string' ? obj.content : undefined,
description: typeof obj.description === 'string' ? obj.description : undefined,
status: statusRaw as 'pending' | 'in_progress' | 'completed' | 'cancelled',
});
}
if (incoming.length === 0) return;
try {
const summary = syncTodosFromTool(this.getTodoAgentKey(), incoming);
if (summary.added > 0 || summary.updated > 0) {
log.info(`Synced ${summary.totalIncoming} todo(s) from ${streamMsg.toolName} into heartbeat store (added=${summary.added}, updated=${summary.updated})`);
}
} catch (err) {
log.warn('Failed to sync TodoWrite todos:', err instanceof Error ? err.message : err);
}
}
// =========================================================================
// Session options & timeout
// =========================================================================
private getSessionTimeoutMs(): number {
const envTimeoutMs = Number(process.env.LETTA_SESSION_TIMEOUT_MS);
if (Number.isFinite(envTimeoutMs) && envTimeoutMs > 0) {
return envTimeoutMs;
}
return 60000;
}
async withSessionTimeout<T>(
promise: Promise<T>,
label: string,
): Promise<T> {
const timeoutMs = this.getSessionTimeoutMs();
let timeoutId: ReturnType<typeof setTimeout> | undefined;
const timeoutPromise = new Promise<T>((_, reject) => {
timeoutId = setTimeout(() => {
reject(new Error(`${label} timed out after ${timeoutMs}ms`));
}, timeoutMs);
});
try {
return await Promise.race([promise, timeoutPromise]);
} finally {
if (timeoutId) clearTimeout(timeoutId);
}
}
private baseSessionOptions(canUseTool?: CanUseToolCallback) {
return {
permissionMode: 'bypassPermissions' as const,
allowedTools: this.config.allowedTools,
disallowedTools: [
// Block built-in TodoWrite -- it requires interactive approval (fails
// silently during heartbeats) and writes to the CLI's own store rather
// than lettabot's persistent heartbeat store. The agent should use the
// custom manage_todo tool instead.
'TodoWrite',
...(this.config.disallowedTools || []),
],
cwd: this.config.workingDir,
tools: [createManageTodoTool(this.getTodoAgentKey())],
// Memory filesystem (context repository): true -> --memfs, false -> --no-memfs, undefined -> leave unchanged
...(this.config.memfs !== undefined ? { memfs: this.config.memfs } : {}),
// In bypassPermissions mode, canUseTool is only called for interactive
// tools (AskUserQuestion, ExitPlanMode). When no callback is provided
// (background triggers), the SDK auto-denies interactive tools.
...(canUseTool ? { canUseTool } : {}),
};
}
// =========================================================================
// Session lifecycle (per-key)
// =========================================================================
/**
* Return the persistent session for the given conversation key,
* creating and initializing it if needed.
*
* After initialization, calls bootstrapState() to detect pending approvals.
* If an orphaned approval is found, recovers proactively before returning
* the session -- preventing the first send() from hitting a 409 CONFLICT.
*/
async ensureSessionForKey(key: string, bootstrapRetried = false): Promise<Session> {
const generation = this.sessionGenerations.get(key) ?? 0;
// Fast path: session already exists
const existing = this.sessions.get(key);
if (existing) {
this.sessionLastUsed.set(key, Date.now());
return existing;
}
// Coalesce concurrent callers: if another call is already creating this
// key (e.g. warmSession running while first message arrives), wait for
// it instead of creating a duplicate session.
const pending = this.sessionCreationLocks.get(key);
if (pending && pending.generation === generation) return pending.promise;
const promise = this._createSessionForKey(key, bootstrapRetried, generation);
this.sessionCreationLocks.set(key, { promise, generation });
try {
return await promise;
} finally {
const current = this.sessionCreationLocks.get(key);
if (current?.promise === promise) {
this.sessionCreationLocks.delete(key);
}
}
}
/** Internal session creation -- called via ensureSessionForKey's lock. */
private async _createSessionForKey(
key: string,
bootstrapRetried: boolean,
generation: number,
): Promise<Session> {
// Session was invalidated while this creation path was queued.
if ((this.sessionGenerations.get(key) ?? 0) !== generation) {
return this.ensureSessionForKey(key, bootstrapRetried);
}
// Re-read the store file from disk so we pick up agent/conversation ID
// changes made by other processes (e.g. after a restart or container deploy).
this.store.refresh();
const opts = this.baseSessionOptions(this.sessionCanUseTool);
let session: Session;
let sessionAgentId: string | undefined;
// In disabled mode, always resume the agent's built-in default conversation.
// Skip store lookup entirely -- no conversation ID is persisted.
const convId = key === 'default'
? null
: key === 'shared'
? this.store.conversationId
: this.store.getConversationId(key);
// Propagate per-agent cron store path to CLI subprocesses (lettabot-schedule)
if (this.config.cronStorePath) {
process.env.CRON_STORE_PATH = this.config.cronStorePath;
}
if (key === 'default' && this.store.agentId) {
process.env.LETTA_AGENT_ID = this.store.agentId;
installSkillsToAgent(this.store.agentId, this.config.skills);
sessionAgentId = this.store.agentId;
session = resumeSession('default', opts);
} else if (convId) {
process.env.LETTA_AGENT_ID = this.store.agentId || undefined;
if (this.store.agentId) {
installSkillsToAgent(this.store.agentId, this.config.skills);
sessionAgentId = this.store.agentId;
}
session = resumeSession(convId, opts);
} else if (this.store.agentId) {
// Agent exists but no conversation stored -- resume the default conversation
process.env.LETTA_AGENT_ID = this.store.agentId;
installSkillsToAgent(this.store.agentId, this.config.skills);
sessionAgentId = this.store.agentId;
session = resumeSession(this.store.agentId, opts);
} else {
// Create new agent -- persist immediately so we don't orphan it on later failures
log.info('Creating new agent');
const newAgentId = await createAgent({
systemPrompt: SYSTEM_PROMPT,
memory: loadMemoryBlocks(this.config.agentName),
tags: ['origin:lettabot'],
...(this.config.memfs !== undefined ? { memfs: this.config.memfs } : {}),
});
const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com';
this.store.setAgent(newAgentId, currentBaseUrl);
log.info('Saved new agent ID:', newAgentId);
if (this.config.agentName) {
updateAgentName(newAgentId, this.config.agentName).catch(() => {});
}
installSkillsToAgent(newAgentId, this.config.skills);
sessionAgentId = newAgentId;
// In disabled mode, resume the built-in default conversation instead of
// creating a new one. Other modes create a fresh conversation per key.
session = key === 'default'
? resumeSession('default', opts)
: createSession(newAgentId, opts);
}
// Initialize eagerly so the subprocess is ready before the first send()
log.info(`Initializing session subprocess (key=${key})...`);
try {
if (sessionAgentId) {
await withAgentSkillsOnPath(
sessionAgentId,
() => this.withSessionTimeout(session.initialize(), `Session initialize (key=${key})`),
);
} else {
await this.withSessionTimeout(session.initialize(), `Session initialize (key=${key})`);
}
log.info(`Session subprocess ready (key=${key})`);
} catch (error) {
// Close immediately so failed initialization cannot leak a subprocess.
session.close();
// If the stored agent ID doesn't exist on the server (deleted externally,
// ghost agent from failed pairing, etc.), clear the stale ID and retry.
if (this.store.agentId && !bootstrapRetried && isAgentMissingFromInitError(error)) {
log.warn(
`Agent ${this.store.agentId} appears missing from server, ` +
`clearing stale agent ID and recreating...`,
);
this.store.clearAgent();
return this._createSessionForKey(key, /* bootstrapRetried */ true, generation);
}
throw error;
}
// reset/invalidate can happen while initialize() is in-flight.
if ((this.sessionGenerations.get(key) ?? 0) !== generation) {
log.info(`Discarding stale initialized session (key=${key})`);
session.close();
return this.ensureSessionForKey(key, bootstrapRetried);
}
// Proactive approval detection via bootstrapState().
if (!bootstrapRetried && this.store.agentId) {
try {
const bootstrap = await this.withSessionTimeout(
session.bootstrapState(),
`Session bootstrapState (key=${key})`,
);
if (bootstrap.hasPendingApproval) {
const convId = bootstrap.conversationId || session.conversationId;
log.warn(`Pending approval detected at session startup (key=${key}, conv=${convId}), recovering...`);
session.close();
if (convId) {
const result = await recoverOrphanedConversationApproval(
this.store.agentId,
convId,
true, /* deepScan */
);
if (result.recovered) {
log.info(`Proactive approval recovery succeeded: ${result.details}`);
} else {
log.warn(`Proactive approval recovery did not find resolvable approvals: ${result.details}`);
}
}
return this._createSessionForKey(key, true, generation);
}
} catch (err) {
// bootstrapState failure is non-fatal -- the reactive 409 handler in
// runSession() will catch stuck approvals.
log.warn(`bootstrapState check failed (key=${key}), continuing:`, err instanceof Error ? err.message : err);
}
}
if ((this.sessionGenerations.get(key) ?? 0) !== generation) {
log.info(`Discarding stale session after bootstrapState (key=${key})`);
session.close();
return this.ensureSessionForKey(key, bootstrapRetried);
}
// LRU eviction: in per-chat mode, limit concurrent sessions to avoid
// unbounded subprocess growth.
const maxSessions = this.config.maxSessions ?? 10;
if (this.config.conversationMode === 'per-chat' && this.sessions.size >= maxSessions) {
let oldestKey: string | null = null;
let oldestTime = Infinity;
for (const [k, ts] of this.sessionLastUsed) {
if (k === key) continue;
if (!this.sessions.has(k)) continue;
// Never evict an active/in-flight key (can close a live stream).
if (this.processingKeys.has(k) || this.sessionCreationLocks.has(k)) continue;
if (ts < oldestTime) {
oldestKey = k;
oldestTime = ts;
}
}
if (oldestKey) {
log.info(`LRU session eviction: closing session for key="${oldestKey}" (${this.sessions.size} active, max=${maxSessions})`);
const evicted = this.sessions.get(oldestKey);
evicted?.close();
this.sessions.delete(oldestKey);
this.sessionLastUsed.delete(oldestKey);
this.sessionGenerations.delete(oldestKey);
this.sessionCreationLocks.delete(oldestKey);
this.lastResultRunFingerprints.delete(oldestKey);
} else {
// All existing sessions are active; allow temporary overflow.
log.debug(`LRU session eviction skipped: all ${this.sessions.size} sessions are active/in-flight`);
}
}
this.sessions.set(key, session);
this.sessionLastUsed.set(key, Date.now());
return session;
}
/** Get an active session by key (for abort/cancel). */
getSession(key: string): Session | undefined {
return this.sessions.get(key);
}
/**
* Destroy session(s). If key provided, destroys only that key.
* If key is undefined, destroys ALL sessions.
*/
invalidateSession(key?: string): void {
if (key) {
const nextGeneration = (this.sessionGenerations.get(key) ?? 0) + 1;
this.sessionGenerations.set(key, nextGeneration);
this.sessionCreationLocks.delete(key);
const session = this.sessions.get(key);
if (session) {
log.info(`Invalidating session (key=${key})`);
session.close();
this.sessions.delete(key);
this.sessionLastUsed.delete(key);
}
this.lastResultRunFingerprints.delete(key);
} else {
const keys = new Set<string>([
...this.sessions.keys(),
...this.sessionCreationLocks.keys(),
]);
for (const k of keys) {
const nextGeneration = (this.sessionGenerations.get(k) ?? 0) + 1;
this.sessionGenerations.set(k, nextGeneration);
}
for (const [k, session] of this.sessions) {
log.info(`Invalidating session (key=${k})`);
session.close();
}
this.sessions.clear();
this.sessionCreationLocks.clear();
this.sessionLastUsed.clear();
this.lastResultRunFingerprints.clear();
}
}
/**
* Pre-warm the session subprocess at startup.
*/
async warmSession(): Promise<void> {
this.store.refresh();
if (!this.store.agentId && !this.store.conversationId) return;
try {
const mode = this.config.conversationMode || 'shared';
if (mode === 'shared') {
await this.ensureSessionForKey('shared');
}
} catch (err) {
log.warn('Session pre-warm failed:', err instanceof Error ? err.message : err);
}
}
/**
* Persist conversation ID after a successful session result.
* Agent ID and first-run setup are handled eagerly in ensureSessionForKey().
*/
persistSessionState(session: Session, convKey?: string): void {
// Agent ID already persisted in ensureSessionForKey() on creation.
// Here we only update if the server returned a different one (shouldn't happen).
if (session.agentId && session.agentId !== this.store.agentId) {
const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com';
this.store.setAgent(session.agentId, currentBaseUrl, session.conversationId || undefined);
log.info('Agent ID updated:', session.agentId);
} else if (session.conversationId && session.conversationId !== 'default' && convKey !== 'default') {
// In per-channel mode, persist per-key. In shared mode, use legacy field.
// Skip saving "default" -- it's an API alias, not a real conversation ID.
// In disabled mode (convKey === 'default'), skip -- always use the built-in default.
if (convKey && convKey !== 'shared') {
const existing = this.store.getConversationId(convKey);
if (session.conversationId !== existing) {
this.store.setConversationId(convKey, session.conversationId);
log.info(`Conversation ID updated (key=${convKey}):`, session.conversationId);
}
} else if (session.conversationId !== this.store.conversationId) {
this.store.conversationId = session.conversationId;
log.info('Conversation ID updated:', session.conversationId);
}
}
}
// =========================================================================
// Send + stream
// =========================================================================
/**
* Send a message and return a deduplicated stream.
*
* Handles:
* - Persistent session reuse (subprocess stays alive across messages)
* - CONFLICT recovery from orphaned approvals (retry once)
* - Conversation-not-found fallback (create new conversation)
* - Tool call deduplication
* - Session persistence after result
*/
async runSession(
message: SendMessage,
options: { retried?: boolean; canUseTool?: CanUseToolCallback; convKey?: string } = {},
): Promise<{ session: Session; stream: () => AsyncGenerator<StreamMsg> }> {
const { retried = false, canUseTool, convKey = 'shared' } = options;
// Update the per-message callback before sending
this.currentCanUseTool = canUseTool;
let session = await this.ensureSessionForKey(convKey);
// Resolve the conversation ID for this key (for error recovery)
const convId = convKey === 'shared'
? this.store.conversationId
: this.store.getConversationId(convKey);
// Send message with fallback chain
try {
await this.withSessionTimeout(session.send(message), `Session send (key=${convKey})`);
} catch (error) {
// 409 CONFLICT from orphaned approval
if (!retried && isApprovalConflictError(error) && this.store.agentId && convId) {
log.info('CONFLICT detected - attempting orphaned approval recovery...');
this.invalidateSession(convKey);
const result = await recoverOrphanedConversationApproval(
this.store.agentId,
convId
);
if (result.recovered) {
log.info(`Recovery succeeded (${result.details}), retrying...`);
return this.runSession(message, { retried: true, canUseTool, convKey });
}
log.error(`Orphaned approval recovery failed: ${result.details}`);
throw error;
}
// Conversation/agent not found - try creating a new conversation.
if (this.store.agentId && isConversationMissingError(error)) {
log.warn(`Conversation not found (key=${convKey}), creating a new conversation...`);
this.invalidateSession(convKey);
if (convKey !== 'shared') {
this.store.clearConversation(convKey);
} else {
this.store.conversationId = null;
}
session = await this.ensureSessionForKey(convKey);
try {
await this.withSessionTimeout(session.send(message), `Session send retry (key=${convKey})`);
} catch (retryError) {
this.invalidateSession(convKey);
throw retryError;
}
} else {
// Unknown error -- invalidate so we get a fresh subprocess next time
this.invalidateSession(convKey);
throw error;
}
}
// Persist conversation ID immediately after successful send, before streaming.
this.persistSessionState(session, convKey);
// Return session and a stream generator that buffers tool_call chunks and
// flushes them with fully accumulated arguments on the next type boundary.
const pendingToolCalls = new Map<string, { msg: StreamMsg; accumulatedArgs: string }>();
const self = this;
const capturedConvKey = convKey; // Capture for closure
/** Merge tool argument strings, handling both delta and cumulative chunking. */
function mergeToolArgs(existing: string, incoming: string): string {
if (!incoming) return existing;
if (!existing) return incoming;
if (incoming === existing) return existing;
// Cumulative: latest chunk includes all prior text
if (incoming.startsWith(existing)) return incoming;
if (existing.endsWith(incoming)) return existing;
// Delta: each chunk is an append
return `${existing}${incoming}`;
}
function* flushPending(): Generator<StreamMsg> {
for (const [, pending] of pendingToolCalls) {
if (!pending.accumulatedArgs) {
// No rawArguments accumulated (old SDK or single complete chunk) --
// preserve the original toolInput from the first chunk as-is.
yield pending.msg;
continue;
}
let toolInput: Record<string, unknown> = {};
try { toolInput = JSON.parse(pending.accumulatedArgs); }
catch { toolInput = { raw: pending.accumulatedArgs }; }
yield { ...pending.msg, toolInput };
}
pendingToolCalls.clear();
lastPendingToolCallId = null;
}
let anonToolCallCounter = 0;
let lastPendingToolCallId: string | null = null;
async function* dedupedStream(): AsyncGenerator<StreamMsg> {
for await (const raw of session.stream()) {
const msg = raw as StreamMsg;
if (msg.type === 'tool_call') {
let id = msg.toolCallId;
if (!id) {
// Tool calls without IDs (e.g., from models that don't emit
// tool_call_id on subsequent argument chunks) still need to be
// accumulated. Assign a synthetic ID so they enter the buffer.
// If tool name matches the most recent pending call, treat this as
// a continuation even when the first chunk had a real toolCallId.
const currentPending = lastPendingToolCallId ? pendingToolCalls.get(lastPendingToolCallId) : null;
if (lastPendingToolCallId && currentPending && (currentPending.msg.toolName || 'unknown') === (msg.toolName || 'unknown')) {
id = lastPendingToolCallId;
} else {
id = `__anon_${++anonToolCallCounter}__`;
}
}
const incoming = (msg as StreamMsg & { rawArguments?: string }).rawArguments || '';
const existing = pendingToolCalls.get(id);
if (existing) {
existing.accumulatedArgs = mergeToolArgs(existing.accumulatedArgs, incoming);
} else {
pendingToolCalls.set(id, { msg, accumulatedArgs: incoming });
}
lastPendingToolCallId = id;
continue; // buffer, don't yield yet
}
// Flush pending tool calls on semantic type boundary (not stream_event)
if (pendingToolCalls.size > 0 && msg.type !== 'stream_event') {
yield* flushPending();
}
if (msg.type === 'result') {
// Flush any remaining before result
yield* flushPending();
self.persistSessionState(session, capturedConvKey);
}
yield msg;
if (msg.type === 'result') {
break;
}
}
// Flush remaining at generator end (shouldn't normally happen)
yield* flushPending();
}
return { session, stream: dedupedStream };
}
}