diff --git a/docs/README.md b/docs/README.md index c665806..fe15076 100644 --- a/docs/README.md +++ b/docs/README.md @@ -13,6 +13,7 @@ LettaBot is a multi-channel AI assistant powered by [Letta](https://letta.com) t - [Chat API](./configuration.md#chat-endpoint) - HTTP endpoint for programmatic agent access - [Open WebUI Setup](./openwebui-setup.md) - Web chat UI via OpenAI-compatible API - [Response Directives](./directives.md) - XML action directives (reactions, etc.) +- [Skills](./skills.md) - Skills architecture and authoring guide - [Scheduling Tasks](./cron-setup.md) - Cron jobs and heartbeats - [Gmail Pub/Sub](./gmail-pubsub.md) - Email notifications integration - [Railway Deployment](./railway-deploy.md) - Railway-specific setup (one-click deploy, volumes) diff --git a/docs/skills.md b/docs/skills.md new file mode 100644 index 0000000..1bfc43e --- /dev/null +++ b/docs/skills.md @@ -0,0 +1,154 @@ +# Skills + +Skills extend your LettaBot agent with CLI tools and specialized knowledge. They follow the open [Agent Skills](https://docs.letta.com/letta-code/skills) standard used by Letta Code, Cursor, Claude Code, and other compatible agents. + +This document covers how skills work within lettabot specifically -- directory hierarchy, feature-gated installation, the SKILL.md format, and how to author new skills. + +## How skills work + +Skills go through two phases: + +1. **Installation** -- Feature-gated skills (scheduling, Google, voice memo) are automatically copied to the agent's skill directory based on config flags in `lettabot.yaml`. Non-feature-gated skills are discovered directly from the directory hierarchy. +2. **Runtime** -- When a session starts, skill directories containing executables are prepended to `PATH` so the agent can invoke them as CLI tools. + +## Directory hierarchy + +LettaBot scans these directories in priority order. Same-name skills at higher priority override lower ones: + +| Priority | Path | Scope | Description | +|----------|------|-------|-------------| +| 1 (highest) | `.skills/` | Project | Skills specific to this lettabot project | +| 2 | `~/.letta/agents/{id}/skills/` | Agent | Skills for one specific agent | +| 3 | `~/.letta/skills/` | Global | Shared across all agents on this machine | +| 4 | `skills/` (in lettabot repo) | Bundled | Ships with lettabot | +| 5 (lowest) | `~/.agents/skills/` | skills.sh | Installed via [skills.sh](https://skills.sh) | + +Feature-gated skills are copied from source directories into the agent-scoped directory (`~/.letta/agents/{id}/skills/`) when a session is first acquired. The copy is idempotent -- skills already present in the target are skipped. + +## Feature-gated skills + +Some skills are only installed when their corresponding feature is enabled in `lettabot.yaml`: + +| Config flag | Skills installed | Purpose | +|------------|-----------------|---------| +| `features.cron: true` | `scheduling` | Cron jobs and one-off reminders via `lettabot-schedule` | +| `integrations.google.enabled: true` or `polling.gmail.enabled: true` | `gog`, `google` | Google Workspace and Gmail integration | +| TTS provider configured (ElevenLabs or OpenAI key set) | `voice-memo` | Voice memo replies via `lettabot-tts` | + +The mapping from config flags to skill names lives in `FEATURE_SKILLS` in `src/skills/loader.ts`. The code also supports passing explicit skill names programmatically via `additionalSkills` in `SkillsInstallConfig`. + +## SKILL.md format + +Each skill is a directory containing a `SKILL.md` file with YAML frontmatter and a body written for the agent (not humans). The body is loaded into the agent's context when the skill is relevant -- it's a prompt, not documentation. + +```markdown +--- +name: scheduling +description: Create scheduled tasks and one-off reminders. +--- + +# Scheduling + +(Agent-facing instructions: CLI usage, when to use the skill, examples, constraints...) +``` + +### Frontmatter fields + +| Field | Required | Description | +|-------|----------|-------------| +| `name` | Yes | Skill identifier (must be unique within its scope) | +| `description` | No | Brief description shown to the agent | +| `emoji` | No | Display emoji | +| `homepage` | No | URL for the skill's homepage or docs | +| `metadata` | No | JSON-encoded object with a `clawdbot` key (see below) | + +### ClawdBot metadata + +The `metadata` field can contain a JSON-encoded `clawdbot` object for requirements and install specs: + +```yaml +metadata: >- + {"clawdbot": { + "emoji": "📦", + "requires": {"bins": ["mycli"], "env": ["MY_API_KEY"]}, + "install": [{"kind": "brew", "formula": "mycli"}] + }} +``` + +**`requires`** -- prerequisites for the skill to be eligible: + +| Field | Type | Description | +|-------|------|-------------| +| `bins` | `string[]` | All of these binaries must exist on PATH | +| `anyBins` | `string[]` | At least one of these must exist | +| `env` | `string[]` | Required environment variables | + +Run `lettabot skills status` to see which skills are eligible and which have missing binaries, environment variables, or platform mismatches. + +**`install`** -- how to install dependencies (tried in order, filtered by platform): + +| Field | Type | Description | +|-------|------|-------------| +| `kind` | `'brew' \| 'node' \| 'go' \| 'uv' \| 'download'` | Package manager | +| `formula` | `string` | Homebrew formula (for `brew`) | +| `package` | `string` | npm/uv package name (for `node`/`uv`) | +| `module` | `string` | Go module path (for `go`) | +| `url` | `string` | Download URL (for `download`) | +| `bins` | `string[]` | Binaries this installs | +| `os` | `string[]` | Platform filter (`darwin`, `linux`, `win32`) | +| `label` | `string` | Display label for the install option | + +**Other metadata fields:** + +| Field | Type | Description | +|-------|------|-------------| +| `os` | `string[]` | Restrict skill to these platforms | +| `always` | `boolean` | Always eligible regardless of requirements | +| `skillKey` | `string` | Override the skill's key identifier | +| `primaryEnv` | `string` | Primary environment variable for the skill | + +## Skill execution + +When a session starts, `withAgentSkillsOnPath()` in `src/skills/loader.ts` prepends the agent's skill directories to `PATH`. Only directories containing at least one non-`.md` file (i.e., executables or scripts) are added. + +PATH mutations are serialized via a lock to avoid races when multiple sessions initialize concurrently. The original PATH is restored after the session operation completes. + +## Bundled skills + +LettaBot ships with two built-in skills in the `skills/` directory: + +- **scheduling** -- Create recurring cron jobs and one-off reminders via the `lettabot-schedule` CLI. Enabled by `features.cron: true`. See [Cron Setup](./cron-setup.md) for details. +- **voice-memo** -- Reply with voice notes using the `` directive and `lettabot-tts` CLI. Enabled when a TTS provider (ElevenLabs or OpenAI) is configured. + +## Installing external skills + +Use the interactive CLI to discover and enable skills: + +```bash +# Interactive skill selector +lettabot skills + +# Enable/disable skills from Clawdhub, skills.sh, and built-in sources +lettabot skills sync + +# Check status of all discovered skills +lettabot skills status +``` + +External skill sources: + +- [Clawdhub](https://clawdhub.com/) -- `npx clawdhub@latest install ` +- [skills.sh](https://skills.sh) -- community skill repositories + +See the [Letta Code skills documentation](https://docs.letta.com/letta-code/skills) for the general skill installation flow and additional skill sources. + +## Authoring a new skill + +To add a skill to lettabot: + +1. Create a directory under `skills//` with a `SKILL.md` file. The frontmatter declares metadata (see above). The body is a prompt loaded into the agent's context -- write it as instructions the agent will follow, not as human documentation. +2. Place any executables or scripts alongside `SKILL.md` in the same directory. These become available on the agent's PATH at runtime. +3. If the skill should be feature-gated (only installed when a config flag is set), add an entry to `FEATURE_SKILLS` in `src/skills/loader.ts` and wire up the corresponding config flag in `main.ts`. +4. Verify with `lettabot skills status` that the skill is discovered and eligible. + +For project-local skills that don't ship with lettabot, place them in `.skills//` instead. diff --git a/lettabot-agent.json.bak b/lettabot-agent.json.bak new file mode 100644 index 0000000..d3b2051 --- /dev/null +++ b/lettabot-agent.json.bak @@ -0,0 +1,16 @@ +{ + "version": 2, + "agents": { + "LettaBot": { + "agentId": "agent-78eb94ac-d2ef-414f-8b74-a02a95e16b33", + "lastUsedAt": "2026-03-02T21:48:34.231Z", + "createdAt": "2026-03-02T21:48:34.231Z", + "lastMessageTarget": { + "channel": "discord", + "chatId": "chat-1", + "updatedAt": "2026-03-02T22:45:13.260Z" + }, + "recoveryAttempts": 0 + } + } +} diff --git a/src/core/bot.ts b/src/core/bot.ts index d78beb9..2ae7fa5 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -4,28 +4,25 @@ * Single agent, single conversation - chat continues across all channels. */ -import { createAgent, createSession, resumeSession, imageFromFile, imageFromURL, type Session, type MessageContentItem, type SendMessage, type CanUseToolCallback } from '@letta-ai/letta-code-sdk'; +import { imageFromFile, imageFromURL, type Session, type MessageContentItem, type SendMessage, type CanUseToolCallback } from '@letta-ai/letta-code-sdk'; import { mkdirSync, existsSync } from 'node:fs'; import { access, unlink, realpath, stat, constants } from 'node:fs/promises'; import { execFile } from 'node:child_process'; import { extname, resolve, join } from 'node:path'; import type { ChannelAdapter } from '../channels/types.js'; import type { BotConfig, InboundMessage, TriggerContext, StreamMsg } from './types.js'; -import { isApprovalConflictError, isConversationMissingError, isAgentMissingFromInitError, formatApiErrorForUser } from './errors.js'; +import { formatApiErrorForUser } from './errors.js'; import { formatToolCallDisplay, formatReasoningDisplay, formatQuestionsForChannel } from './display.js'; import type { AgentSession } from './interfaces.js'; import { Store } from './store.js'; -import { updateAgentName, getPendingApprovals, rejectApproval, cancelRuns, cancelConversation, recoverOrphanedConversationApproval, getLatestRunError, getAgentModel, updateAgentModel } from '../tools/letta-api.js'; -import { installSkillsToAgent, withAgentSkillsOnPath, getAgentSkillExecutableDirs, isVoiceMemoConfigured } from '../skills/loader.js'; +import { getPendingApprovals, rejectApproval, cancelRuns, cancelConversation, recoverOrphanedConversationApproval, getLatestRunError, getAgentModel, updateAgentModel } from '../tools/letta-api.js'; +import { getAgentSkillExecutableDirs, isVoiceMemoConfigured } from '../skills/loader.js'; import { formatMessageEnvelope, formatGroupBatchEnvelope, type SessionContextOptions } from './formatter.js'; import type { GroupBatcher } from './group-batcher.js'; -import { loadMemoryBlocks } from './memory.js'; import { redactOutbound } from './redact.js'; -import { SYSTEM_PROMPT } from './system-prompt.js'; import { parseDirectives, stripActionsBlock, type Directive } from './directives.js'; import { resolveEmoji } from './emoji.js'; -import { createManageTodoTool } from '../tools/todo.js'; -import { syncTodosFromTool } from '../todo/store.js'; +import { SessionManager } from './session-manager.js'; import { createLogger } from '../logger.js'; @@ -212,28 +209,9 @@ export class LettaBot implements AgentSession { // In shared mode, a single entry keyed by 'shared' provides legacy behavior. private pendingQuestionResolvers: Map void> = new Map(); - // Persistent sessions: reuse CLI subprocesses across messages. - // In shared mode, only the "shared" key is used. In per-channel mode, each - // channel (and optionally heartbeat) gets its own subprocess. In per-chat - // mode, each unique channel:chatId gets its own subprocess (LRU-evicted). - private sessions: Map = new Map(); - private sessionLastUsed: Map = new Map(); // LRU tracking for per-chat mode - // Coalesces concurrent ensureSessionForKey calls for the same key so the - // second caller waits for the first instead of creating a duplicate session. - // generation prevents stale in-flight creations from being reused after reset. - private sessionCreationLocks: Map; generation: number }> = new Map(); - private sessionGenerations: Map = new Map(); - private currentCanUseTool: CanUseToolCallback | undefined; private conversationOverrides: Set = new Set(); - // Stable callback wrapper so the Session options never change, but we can - // swap out the per-message handler before each send(). - private readonly sessionCanUseTool: CanUseToolCallback = async (toolName, toolInput) => { - if (this.currentCanUseTool) { - return this.currentCanUseTool(toolName, toolInput); - } - return { behavior: 'allow' as const }; - }; - + private readonly sessionManager: SessionManager; + constructor(config: BotConfig) { this.config = config; mkdirSync(config.workingDir, { recursive: true }); @@ -244,6 +222,7 @@ export class LettaBot implements AgentSession { if (config.conversationOverrides?.length) { this.conversationOverrides = new Set(config.conversationOverrides.map((ch) => ch.toLowerCase())); } + this.sessionManager = new SessionManager(this.store, config, this.processingKeys, this.lastResultRunFingerprints); log.info(`LettaBot initialized. Agent ID: ${this.store.agentId || '(new)'}`); } @@ -292,114 +271,9 @@ export class LettaBot implements AgentSession { return 'fresh'; } - // ========================================================================= - // Session options (shared by processMessage and sendToAgent) - // ========================================================================= - - private getTodoAgentKey(): string { - return this.store.agentId || this.config.agentName || 'LettaBot'; - } - - private syncTodoToolCall(streamMsg: StreamMsg): void { - if (streamMsg.type !== 'tool_call') return; - - const normalizedToolName = (streamMsg.toolName || '').toLowerCase(); - const isBuiltInTodoTool = normalizedToolName === 'todowrite' - || normalizedToolName === 'todo_write' - || normalizedToolName === 'writetodos' - || normalizedToolName === 'write_todos'; - if (!isBuiltInTodoTool) return; - - const input = (streamMsg.toolInput && typeof streamMsg.toolInput === 'object') - ? streamMsg.toolInput as Record - : null; - if (!input || !Array.isArray(input.todos)) return; - - const incoming: Array<{ - content?: string; - description?: string; - status: 'pending' | 'in_progress' | 'completed' | 'cancelled'; - }> = []; - for (const item of input.todos) { - if (!item || typeof item !== 'object') continue; - const obj = item as Record; - const statusRaw = typeof obj.status === 'string' ? obj.status : ''; - if (!['pending', 'in_progress', 'completed', 'cancelled'].includes(statusRaw)) continue; - incoming.push({ - content: typeof obj.content === 'string' ? obj.content : undefined, - description: typeof obj.description === 'string' ? obj.description : undefined, - status: statusRaw as 'pending' | 'in_progress' | 'completed' | 'cancelled', - }); - } - if (incoming.length === 0) return; - - try { - const summary = syncTodosFromTool(this.getTodoAgentKey(), incoming); - if (summary.added > 0 || summary.updated > 0) { - log.info(`Synced ${summary.totalIncoming} todo(s) from ${streamMsg.toolName} into heartbeat store (added=${summary.added}, updated=${summary.updated})`); - } - } catch (err) { - log.warn('Failed to sync TodoWrite todos:', err instanceof Error ? err.message : err); - } - } - - private getSessionTimeoutMs(): number { - const envTimeoutMs = Number(process.env.LETTA_SESSION_TIMEOUT_MS); - if (Number.isFinite(envTimeoutMs) && envTimeoutMs > 0) { - return envTimeoutMs; - } - return 60000; - } - - private async withSessionTimeout( - promise: Promise, - label: string, - ): Promise { - const timeoutMs = this.getSessionTimeoutMs(); - let timeoutId: ReturnType | undefined; - const timeoutPromise = new Promise((_, reject) => { - timeoutId = setTimeout(() => { - reject(new Error(`${label} timed out after ${timeoutMs}ms`)); - }, timeoutMs); - }); - try { - return await Promise.race([promise, timeoutPromise]); - } finally { - if (timeoutId) clearTimeout(timeoutId); - } - } - - private baseSessionOptions(canUseTool?: CanUseToolCallback) { - return { - permissionMode: 'bypassPermissions' as const, - allowedTools: this.config.allowedTools, - disallowedTools: [ - // Block built-in TodoWrite -- it requires interactive approval (fails - // silently during heartbeats) and writes to the CLI's own store rather - // than lettabot's persistent heartbeat store. The agent should use the - // custom manage_todo tool instead. - 'TodoWrite', - ...(this.config.disallowedTools || []), - ], - cwd: this.config.workingDir, - tools: [createManageTodoTool(this.getTodoAgentKey())], - // Memory filesystem (context repository): true -> --memfs, false -> --no-memfs, undefined -> leave unchanged - ...(this.config.memfs !== undefined ? { memfs: this.config.memfs } : {}), - // In bypassPermissions mode, canUseTool is only called for interactive - // tools (AskUserQuestion, ExitPlanMode). When no callback is provided - // (background triggers), the SDK auto-denies interactive tools. - ...(canUseTool ? { canUseTool } : {}), - }; - } - // ========================================================================= // AskUserQuestion formatting // ========================================================================= - - /** - * Format AskUserQuestion questions as a single channel message. - * Displays each question with numbered options for the user to choose from. - */ // ========================================================================= // Session lifecycle helpers // ========================================================================= @@ -585,505 +459,13 @@ export class LettaBot implements AgentSession { ); } - // ========================================================================= - // Session lifecycle (per-key) - // ========================================================================= + // Session lifecycle delegated to SessionManager /** - * Return the persistent session for the given conversation key, - * creating and initializing it if needed. - * - * After initialization, calls bootstrapState() to detect pending approvals. - * If an orphaned approval is found, recovers proactively before returning - * the session -- preventing the first send() from hitting a 409 CONFLICT. - */ - private async ensureSessionForKey(key: string, bootstrapRetried = false): Promise { - const generation = this.sessionGenerations.get(key) ?? 0; - - // Fast path: session already exists - const existing = this.sessions.get(key); - if (existing) { - this.sessionLastUsed.set(key, Date.now()); - return existing; - } - - // Coalesce concurrent callers: if another call is already creating this - // key (e.g. warmSession running while first message arrives), wait for - // it instead of creating a duplicate session. - const pending = this.sessionCreationLocks.get(key); - if (pending && pending.generation === generation) return pending.promise; - - const promise = this._createSessionForKey(key, bootstrapRetried, generation); - this.sessionCreationLocks.set(key, { promise, generation }); - try { - return await promise; - } finally { - const current = this.sessionCreationLocks.get(key); - if (current?.promise === promise) { - this.sessionCreationLocks.delete(key); - } - } - } - - /** Internal session creation -- called via ensureSessionForKey's lock. */ - private async _createSessionForKey( - key: string, - bootstrapRetried: boolean, - generation: number, - ): Promise { - // Session was invalidated while this creation path was queued. - if ((this.sessionGenerations.get(key) ?? 0) !== generation) { - return this.ensureSessionForKey(key, bootstrapRetried); - } - - // Re-read the store file from disk so we pick up agent/conversation ID - // changes made by other processes (e.g. after a restart or container deploy). - // This costs one synchronous disk read per incoming message, which is fine - // at chat-bot throughput. If this ever becomes a bottleneck, throttle to - // refresh at most once per second. - this.store.refresh(); - - const opts = this.baseSessionOptions(this.sessionCanUseTool); - let session: Session; - let sessionAgentId: string | undefined; - - // In disabled mode, always resume the agent's built-in default conversation. - // Skip store lookup entirely -- no conversation ID is persisted. - const convId = key === 'default' - ? null - : key === 'shared' - ? this.store.conversationId - : this.store.getConversationId(key); - - // Propagate per-agent cron store path to CLI subprocesses (lettabot-schedule) - if (this.config.cronStorePath) { - process.env.CRON_STORE_PATH = this.config.cronStorePath; - } - - if (key === 'default' && this.store.agentId) { - process.env.LETTA_AGENT_ID = this.store.agentId; - installSkillsToAgent(this.store.agentId, this.config.skills); - sessionAgentId = this.store.agentId; - session = resumeSession('default', opts); - } else if (convId) { - process.env.LETTA_AGENT_ID = this.store.agentId || undefined; - if (this.store.agentId) { - installSkillsToAgent(this.store.agentId, this.config.skills); - sessionAgentId = this.store.agentId; - } - session = resumeSession(convId, opts); - } else if (this.store.agentId) { - // Agent exists but no conversation stored -- resume the default conversation - process.env.LETTA_AGENT_ID = this.store.agentId; - installSkillsToAgent(this.store.agentId, this.config.skills); - sessionAgentId = this.store.agentId; - session = resumeSession(this.store.agentId, opts); - } else { - // Create new agent -- persist immediately so we don't orphan it on later failures - log.info('Creating new agent'); - const newAgentId = await createAgent({ - systemPrompt: SYSTEM_PROMPT, - memory: loadMemoryBlocks(this.config.agentName), - tags: ['origin:lettabot'], - ...(this.config.memfs !== undefined ? { memfs: this.config.memfs } : {}), - }); - const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com'; - this.store.setAgent(newAgentId, currentBaseUrl); - log.info('Saved new agent ID:', newAgentId); - - if (this.config.agentName) { - updateAgentName(newAgentId, this.config.agentName).catch(() => {}); - } - installSkillsToAgent(newAgentId, this.config.skills); - sessionAgentId = newAgentId; - - // In disabled mode, resume the built-in default conversation instead of - // creating a new one. Other modes create a fresh conversation per key. - session = key === 'default' - ? resumeSession('default', opts) - : createSession(newAgentId, opts); - } - - // Initialize eagerly so the subprocess is ready before the first send() - log.info(`Initializing session subprocess (key=${key})...`); - try { - if (sessionAgentId) { - await withAgentSkillsOnPath( - sessionAgentId, - () => this.withSessionTimeout(session.initialize(), `Session initialize (key=${key})`), - ); - } else { - await this.withSessionTimeout(session.initialize(), `Session initialize (key=${key})`); - } - log.info(`Session subprocess ready (key=${key})`); - } catch (error) { - // Close immediately so failed initialization cannot leak a subprocess. - session.close(); - - // If the stored agent ID doesn't exist on the server (deleted externally, - // ghost agent from failed pairing, etc.), clear the stale ID and retry. - // The retry will hit the "else" branch and create a fresh agent. - // Uses bootstrapRetried to prevent infinite recursion if creation also fails. - if (this.store.agentId && !bootstrapRetried && isAgentMissingFromInitError(error)) { - log.warn( - `Agent ${this.store.agentId} appears missing from server, ` + - `clearing stale agent ID and recreating...`, - ); - this.store.clearAgent(); - return this._createSessionForKey(key, /* bootstrapRetried */ true, generation); - } - - throw error; - } - - // reset/invalidate can happen while initialize() is in-flight. - if ((this.sessionGenerations.get(key) ?? 0) !== generation) { - log.info(`Discarding stale initialized session (key=${key})`); - session.close(); - return this.ensureSessionForKey(key, bootstrapRetried); - } - - // Proactive approval detection via bootstrapState(). - // Single CLI round-trip that returns hasPendingApproval flag alongside - // session metadata. If an orphaned approval is stuck, recover now so the - // first send() doesn't hit a 409 CONFLICT. - if (!bootstrapRetried && this.store.agentId) { - try { - const bootstrap = await this.withSessionTimeout( - session.bootstrapState(), - `Session bootstrapState (key=${key})`, - ); - if (bootstrap.hasPendingApproval) { - const convId = bootstrap.conversationId || session.conversationId; - log.warn(`Pending approval detected at session startup (key=${key}, conv=${convId}), recovering...`); - session.close(); - if (convId) { - const result = await recoverOrphanedConversationApproval( - this.store.agentId, - convId, - true, /* deepScan */ - ); - if (result.recovered) { - log.info(`Proactive approval recovery succeeded: ${result.details}`); - } else { - log.warn(`Proactive approval recovery did not find resolvable approvals: ${result.details}`); - } - } - // Recreate session after recovery (conversation state changed). - // Call _createSessionForKey directly (not ensureSessionForKey) since - // we're already inside the creation lock for this key. - return this._createSessionForKey(key, true, generation); - } - } catch (err) { - // bootstrapState failure is non-fatal -- the session is still usable. - // The reactive 409 handler in runSession() will catch stuck approvals. - log.warn(`bootstrapState check failed (key=${key}), continuing:`, err instanceof Error ? err.message : err); - } - } - - if ((this.sessionGenerations.get(key) ?? 0) !== generation) { - log.info(`Discarding stale session after bootstrapState (key=${key})`); - session.close(); - return this.ensureSessionForKey(key, bootstrapRetried); - } - - // LRU eviction: in per-chat mode, limit concurrent sessions to avoid - // unbounded subprocess growth. Evicted sessions can be cheaply recreated - // via resumeSession() since conversation IDs are persisted in the store. - const maxSessions = this.config.maxSessions ?? 10; - if (this.config.conversationMode === 'per-chat' && this.sessions.size >= maxSessions) { - let oldestKey: string | null = null; - let oldestTime = Infinity; - for (const [k, ts] of this.sessionLastUsed) { - if (k === key) continue; - if (!this.sessions.has(k)) continue; - // Never evict an active/in-flight key (can close a live stream). - if (this.processingKeys.has(k) || this.sessionCreationLocks.has(k)) continue; - if (ts < oldestTime) { - oldestKey = k; - oldestTime = ts; - } - } - if (oldestKey) { - log.info(`LRU session eviction: closing session for key="${oldestKey}" (${this.sessions.size} active, max=${maxSessions})`); - const evicted = this.sessions.get(oldestKey); - evicted?.close(); - this.sessions.delete(oldestKey); - this.sessionLastUsed.delete(oldestKey); - this.sessionGenerations.delete(oldestKey); - this.sessionCreationLocks.delete(oldestKey); - this.lastResultRunFingerprints.delete(oldestKey); - } else { - // All existing sessions are active; allow temporary overflow. - log.debug(`LRU session eviction skipped: all ${this.sessions.size} sessions are active/in-flight`); - } - } - - this.sessions.set(key, session); - this.sessionLastUsed.set(key, Date.now()); - return session; - } - - /** Legacy convenience: resolve key from shared/per-channel mode and delegate. */ - private async ensureSession(): Promise { - return this.ensureSessionForKey('shared'); - } - - /** - * Destroy session(s). If key provided, destroys only that key. - * If key is undefined, destroys ALL sessions. - */ - private invalidateSession(key?: string): void { - if (key) { - // Invalidate any in-flight creation for this key so reset can force - // a fresh conversation/session immediately. - const nextGeneration = (this.sessionGenerations.get(key) ?? 0) + 1; - this.sessionGenerations.set(key, nextGeneration); - this.sessionCreationLocks.delete(key); - - const session = this.sessions.get(key); - if (session) { - log.info(`Invalidating session (key=${key})`); - session.close(); - this.sessions.delete(key); - this.sessionLastUsed.delete(key); - } - this.lastResultRunFingerprints.delete(key); - } else { - const keys = new Set([ - ...this.sessions.keys(), - ...this.sessionCreationLocks.keys(), - ]); - for (const k of keys) { - const nextGeneration = (this.sessionGenerations.get(k) ?? 0) + 1; - this.sessionGenerations.set(k, nextGeneration); - } - - for (const [k, session] of this.sessions) { - log.info(`Invalidating session (key=${k})`); - session.close(); - } - this.sessions.clear(); - this.sessionCreationLocks.clear(); - this.sessionLastUsed.clear(); - this.lastResultRunFingerprints.clear(); - } - } - - /** - * Pre-warm the session subprocess at startup. Call after config/agent is loaded. + * Pre-warm the session subprocess at startup. */ async warmSession(): Promise { - this.store.refresh(); - if (!this.store.agentId && !this.store.conversationId) return; - try { - const mode = this.config.conversationMode || 'shared'; - // In shared mode, warm the single session. In per-channel/per-chat modes, - // warm nothing (sessions are created on first message per key). - if (mode === 'shared') { - await this.ensureSessionForKey('shared'); - } - } catch (err) { - log.warn('Session pre-warm failed:', err instanceof Error ? err.message : err); - } - } - - /** - * Persist conversation ID after a successful session result. - * Agent ID and first-run setup are handled eagerly in ensureSessionForKey(). - */ - private persistSessionState(session: Session, convKey?: string): void { - // Agent ID already persisted in ensureSessionForKey() on creation. - // Here we only update if the server returned a different one (shouldn't happen). - if (session.agentId && session.agentId !== this.store.agentId) { - const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com'; - this.store.setAgent(session.agentId, currentBaseUrl, session.conversationId || undefined); - log.info('Agent ID updated:', session.agentId); - } else if (session.conversationId && session.conversationId !== 'default' && convKey !== 'default') { - // In per-channel mode, persist per-key. In shared mode, use legacy field. - // Skip saving "default" -- it's an API alias, not a real conversation ID. - // In disabled mode (convKey === 'default'), skip -- always use the built-in default. - if (convKey && convKey !== 'shared') { - const existing = this.store.getConversationId(convKey); - if (session.conversationId !== existing) { - this.store.setConversationId(convKey, session.conversationId); - log.info(`Conversation ID updated (key=${convKey}):`, session.conversationId); - } - } else if (session.conversationId !== this.store.conversationId) { - this.store.conversationId = session.conversationId; - log.info('Conversation ID updated:', session.conversationId); - } - } - } - - /** - * Send a message and return a deduplicated stream. - * - * Handles: - * - Persistent session reuse (subprocess stays alive across messages) - * - CONFLICT recovery from orphaned approvals (retry once) - * - Conversation-not-found fallback (create new conversation) - * - Tool call deduplication - * - Session persistence after result - */ - private async runSession( - message: SendMessage, - options: { retried?: boolean; canUseTool?: CanUseToolCallback; convKey?: string } = {}, - ): Promise<{ session: Session; stream: () => AsyncGenerator }> { - const { retried = false, canUseTool, convKey = 'shared' } = options; - - // Update the per-message callback before sending - this.currentCanUseTool = canUseTool; - - let session = await this.ensureSessionForKey(convKey); - - // Resolve the conversation ID for this key (for error recovery) - const convId = convKey === 'shared' - ? this.store.conversationId - : this.store.getConversationId(convKey); - - // Send message with fallback chain - try { - await this.withSessionTimeout(session.send(message), `Session send (key=${convKey})`); - } catch (error) { - // 409 CONFLICT from orphaned approval - if (!retried && isApprovalConflictError(error) && this.store.agentId && convId) { - log.info('CONFLICT detected - attempting orphaned approval recovery...'); - this.invalidateSession(convKey); - const result = await recoverOrphanedConversationApproval( - this.store.agentId, - convId - ); - if (result.recovered) { - log.info(`Recovery succeeded (${result.details}), retrying...`); - return this.runSession(message, { retried: true, canUseTool, convKey }); - } - log.error(`Orphaned approval recovery failed: ${result.details}`); - throw error; - } - - // Conversation/agent not found - try creating a new conversation. - // Only retry on errors that indicate missing conversation/agent, not - // on auth, network, or protocol errors (which would just fail again). - if (this.store.agentId && isConversationMissingError(error)) { - log.warn(`Conversation not found (key=${convKey}), creating a new conversation...`); - this.invalidateSession(convKey); - if (convKey !== 'shared') { - this.store.clearConversation(convKey); - } else { - this.store.conversationId = null; - } - session = await this.ensureSessionForKey(convKey); - try { - await this.withSessionTimeout(session.send(message), `Session send retry (key=${convKey})`); - } catch (retryError) { - this.invalidateSession(convKey); - throw retryError; - } - } else { - // Unknown error -- invalidate so we get a fresh subprocess next time - this.invalidateSession(convKey); - throw error; - } - } - - // Persist conversation ID immediately after successful send, before streaming. - this.persistSessionState(session, convKey); - - // Return session and a stream generator that buffers tool_call chunks and - // flushes them with fully accumulated arguments on the next type boundary. - // This ensures display messages always have complete args (channels can't - // edit messages after sending). - const pendingToolCalls = new Map(); - const self = this; - const capturedConvKey = convKey; // Capture for closure - - /** Merge tool argument strings, handling both delta and cumulative chunking. */ - function mergeToolArgs(existing: string, incoming: string): string { - if (!incoming) return existing; - if (!existing) return incoming; - if (incoming === existing) return existing; - // Cumulative: latest chunk includes all prior text - if (incoming.startsWith(existing)) return incoming; - if (existing.endsWith(incoming)) return existing; - // Delta: each chunk is an append - return `${existing}${incoming}`; - } - - function* flushPending(): Generator { - for (const [, pending] of pendingToolCalls) { - if (!pending.accumulatedArgs) { - // No rawArguments accumulated (old SDK or single complete chunk) -- - // preserve the original toolInput from the first chunk as-is. - yield pending.msg; - continue; - } - let toolInput: Record = {}; - try { toolInput = JSON.parse(pending.accumulatedArgs); } - catch { toolInput = { raw: pending.accumulatedArgs }; } - yield { ...pending.msg, toolInput }; - } - pendingToolCalls.clear(); - lastPendingToolCallId = null; - } - - let anonToolCallCounter = 0; - let lastPendingToolCallId: string | null = null; - - async function* dedupedStream(): AsyncGenerator { - for await (const raw of session.stream()) { - const msg = raw as StreamMsg; - - if (msg.type === 'tool_call') { - let id = msg.toolCallId; - if (!id) { - // Tool calls without IDs (e.g., from models that don't emit - // tool_call_id on subsequent argument chunks) still need to be - // accumulated. Assign a synthetic ID so they enter the buffer. - // If tool name matches the most recent pending call, treat this as - // a continuation even when the first chunk had a real toolCallId. - const currentPending = lastPendingToolCallId ? pendingToolCalls.get(lastPendingToolCallId) : null; - if (lastPendingToolCallId && currentPending && (currentPending.msg.toolName || 'unknown') === (msg.toolName || 'unknown')) { - id = lastPendingToolCallId; - } else { - id = `__anon_${++anonToolCallCounter}__`; - } - } - - const incoming = (msg as StreamMsg & { rawArguments?: string }).rawArguments || ''; - const existing = pendingToolCalls.get(id); - if (existing) { - existing.accumulatedArgs = mergeToolArgs(existing.accumulatedArgs, incoming); - } else { - pendingToolCalls.set(id, { msg, accumulatedArgs: incoming }); - } - lastPendingToolCallId = id; - continue; // buffer, don't yield yet - } - - // Flush pending tool calls on semantic type boundary (not stream_event) - if (pendingToolCalls.size > 0 && msg.type !== 'stream_event') { - yield* flushPending(); - } - - if (msg.type === 'result') { - // Flush any remaining before result - yield* flushPending(); - self.persistSessionState(session, capturedConvKey); - } - - yield msg; - - if (msg.type === 'result') { - break; - } - } - - // Flush remaining at generator end (shouldn't normally happen) - yield* flushPending(); - } - - return { session, stream: dedupedStream }; + return this.sessionManager.warmSession(); } // ========================================================================= @@ -1191,13 +573,13 @@ export class LettaBot implements AgentSession { this.store.clearConversation(convKey); this.store.resetRecoveryAttempts(); - this.invalidateSession(convKey); + this.sessionManager.invalidateSession(convKey); log.info(`/reset - conversation cleared for key="${convKey}"`); // Eagerly create the new session so we can report the conversation ID. try { - const session = await this.ensureSessionForKey(convKey); + const session = await this.sessionManager.ensureSessionForKey(convKey); const newConvId = session.conversationId || '(pending)'; - this.persistSessionState(session, convKey); + this.sessionManager.persistSessionState(session, convKey); if (convKey === 'shared') { return `Conversation reset. New conversation: ${newConvId}\n(Agent memory is preserved.)`; } @@ -1223,7 +605,7 @@ export class LettaBot implements AgentSession { this.cancelledKeys.add(convKey); // Abort client-side stream - const session = this.sessions.get(convKey); + const session = this.sessionManager.getSession(convKey); if (session) { session.abort().catch(() => {}); log.info(`/cancel - aborted session stream (key=${convKey})`); @@ -1577,7 +959,7 @@ export class LettaBot implements AgentSession { if (userText.length > 0) { log.debug(`processMessage seq=${seq} textPreview=${userText.slice(0, 80)}`); } - const run = await this.runSession(messageToSend, { retried, canUseTool, convKey }); + const run = await this.sessionManager.runSession(messageToSend, { retried, canUseTool, convKey }); lap('session send'); session = run.session; @@ -1715,7 +1097,7 @@ export class LettaBot implements AgentSession { // Log meaningful events with structured summaries if (streamMsg.type === 'tool_call') { - this.syncTodoToolCall(streamMsg); + this.sessionManager.syncTodoToolCall(streamMsg); const tcName = streamMsg.toolName || 'unknown'; const tcId = streamMsg.toolCallId?.slice(0, 12) || '?'; log.info(`>>> TOOL CALL: ${tcName} (id: ${tcId})`); @@ -1826,7 +1208,7 @@ export class LettaBot implements AgentSession { // next message. Discard it and retry so the message gets processed. if (streamMsg.stopReason === 'cancelled') { log.info(`Discarding cancelled run result (seq=${seq}, len=${typeof streamMsg.result === 'string' ? streamMsg.result.length : 0})`); - this.invalidateSession(convKey); + this.sessionManager.invalidateSession(convKey); session = null; if (!retried) { return this.processMessage(msg, adapter, true); @@ -1836,7 +1218,7 @@ export class LettaBot implements AgentSession { const resultRunState = this.classifyResultRun(convKey, streamMsg); if (resultRunState === 'stale') { - this.invalidateSession(convKey); + this.sessionManager.invalidateSession(convKey); session = null; if (!retried) { log.warn(`Retrying message after stale duplicate result (seq=${seq}, key=${convKey})`); @@ -1925,7 +1307,7 @@ export class LettaBot implements AgentSession { if (isApprovalConflict && !retried && this.store.agentId) { if (retryConvId) { log.info('Approval conflict detected -- attempting targeted recovery...'); - this.invalidateSession(retryConvKey); + this.sessionManager.invalidateSession(retryConvKey); session = null; clearInterval(typingInterval); const convResult = await recoverOrphanedConversationApproval( @@ -1969,7 +1351,7 @@ export class LettaBot implements AgentSession { if (!retried && this.store.agentId && retryConvId) { const reason = shouldRetryForErrorResult ? 'error result' : 'empty result'; log.info(`${reason} - attempting orphaned approval recovery...`); - this.invalidateSession(retryConvKey); + this.sessionManager.invalidateSession(retryConvKey); session = null; clearInterval(typingInterval); const convResult = await recoverOrphanedConversationApproval( @@ -2113,7 +1495,7 @@ export class LettaBot implements AgentSession { // eliminate any possibility of stream state bleed between sequential // sends. Costs ~5s subprocess init overhead per message. if (this.config.reuseSession === false) { - this.invalidateSession(finalConvKey); + this.sessionManager.invalidateSession(finalConvKey); } this.cancelledKeys.delete(finalConvKey); } @@ -2176,7 +1558,7 @@ export class LettaBot implements AgentSession { try { let retried = false; while (true) { - const { stream } = await this.runSession(text, { convKey, retried }); + const { stream } = await this.sessionManager.runSession(text, { convKey, retried }); try { let response = ''; @@ -2184,7 +1566,7 @@ export class LettaBot implements AgentSession { let lastErrorDetail: { message: string; stopReason: string; apiError?: Record } | undefined; for await (const msg of stream()) { if (msg.type === 'tool_call') { - this.syncTodoToolCall(msg); + this.sessionManager.syncTodoToolCall(msg); } if (msg.type === 'error') { lastErrorDetail = { @@ -2224,7 +1606,7 @@ export class LettaBot implements AgentSession { } if (sawStaleDuplicateResult) { - this.invalidateSession(convKey); + this.sessionManager.invalidateSession(convKey); if (retried) { throw new Error('Agent stream returned stale duplicate result after retry'); } @@ -2239,13 +1621,14 @@ export class LettaBot implements AgentSession { return response; } catch (error) { // Invalidate on stream errors so next call gets a fresh subprocess - this.invalidateSession(convKey); + this.sessionManager.invalidateSession(convKey); throw error; } + } } finally { if (this.config.reuseSession === false) { - this.invalidateSession(convKey); + this.sessionManager.invalidateSession(convKey); } this.releaseLock(convKey, acquired); } @@ -2263,17 +1646,17 @@ export class LettaBot implements AgentSession { const acquired = await this.acquireLock(convKey); try { - const { stream } = await this.runSession(text, { convKey }); + const { stream } = await this.sessionManager.runSession(text, { convKey }); try { yield* stream(); } catch (error) { - this.invalidateSession(convKey); + this.sessionManager.invalidateSession(convKey); throw error; } } finally { if (this.config.reuseSession === false) { - this.invalidateSession(convKey); + this.sessionManager.invalidateSession(convKey); } this.releaseLock(convKey, acquired); } diff --git a/src/core/listening-mode-directives.test.ts b/src/core/listening-mode-directives.test.ts index c199e75..fa52a48 100644 --- a/src/core/listening-mode-directives.test.ts +++ b/src/core/listening-mode-directives.test.ts @@ -39,7 +39,7 @@ describe('listening mode directive safety', () => { sendFile: vi.fn(async () => ({ messageId: 'file-1' })), }; - (bot as any).runSession = vi.fn(async () => ({ + (bot as any).sessionManager.runSession = vi.fn(async () => ({ session: { abort: vi.fn(async () => {}) }, stream: async function* () { yield { diff --git a/src/core/result-guard.test.ts b/src/core/result-guard.test.ts index b253ccd..0ff0cd9 100644 --- a/src/core/result-guard.test.ts +++ b/src/core/result-guard.test.ts @@ -36,7 +36,7 @@ describe('result divergence guard', () => { sendFile: vi.fn(async () => ({ messageId: 'file-1' })), }; - (bot as any).runSession = vi.fn(async () => ({ + (bot as any).sessionManager.runSession = vi.fn(async () => ({ session: { abort: vi.fn(async () => {}) }, stream: async function* () { // Assistant text is flushed when tool_call arrives. @@ -81,7 +81,7 @@ describe('result divergence guard', () => { sendFile: vi.fn(async () => ({ messageId: 'file-1' })), }; - (bot as any).runSession = vi.fn(async () => ({ + (bot as any).sessionManager.runSession = vi.fn(async () => ({ session: { abort: vi.fn(async () => {}) }, stream: async function* () { yield { type: 'assistant', content: 'streamed-segment' }; diff --git a/src/core/sdk-session-contract.test.ts b/src/core/sdk-session-contract.test.ts index b35f44b..0cf5063 100644 --- a/src/core/sdk-session-contract.test.ts +++ b/src/core/sdk-session-contract.test.ts @@ -547,19 +547,20 @@ describe('SDK session contract', () => { bot.setAgentId('agent-contract-test'); const botInternal = bot as any; - botInternal.sessions.set('telegram:active', activeSession); - botInternal.sessions.set('telegram:idle', idleSession); - botInternal.sessionLastUsed.set('telegram:active', 1); - botInternal.sessionLastUsed.set('telegram:idle', 2); + const sm = botInternal.sessionManager; + sm.sessions.set('telegram:active', activeSession); + sm.sessions.set('telegram:idle', idleSession); + sm.sessionLastUsed.set('telegram:active', 1); + sm.sessionLastUsed.set('telegram:idle', 2); botInternal.processingKeys.add('telegram:active'); - await botInternal._createSessionForKey('telegram:new', true, 0); + await sm._createSessionForKey('telegram:new', true, 0); expect(activeSession.close).not.toHaveBeenCalled(); expect(idleSession.close).toHaveBeenCalledTimes(1); - expect(botInternal.sessions.has('telegram:active')).toBe(true); - expect(botInternal.sessions.has('telegram:idle')).toBe(false); - expect(botInternal.sessions.has('telegram:new')).toBe(true); + expect(sm.sessions.has('telegram:active')).toBe(true); + expect(sm.sessions.has('telegram:idle')).toBe(false); + expect(sm.sessions.has('telegram:new')).toBe(true); }); it('enriches opaque error via stream error event in sendToAgent', async () => { diff --git a/src/core/session-manager.ts b/src/core/session-manager.ts new file mode 100644 index 0000000..f1f056c --- /dev/null +++ b/src/core/session-manager.ts @@ -0,0 +1,652 @@ +/** + * SessionManager - Owns CLI subprocess lifecycle, session creation, + * LRU eviction, invalidation, and message send+stream. + * + * Extracted from bot.ts to isolate session concerns from message + * routing, channel management, and directive execution. + */ + +import { createAgent, createSession, resumeSession, type Session, type SendMessage, type CanUseToolCallback } from '@letta-ai/letta-code-sdk'; +import type { BotConfig, StreamMsg } from './types.js'; +import { isApprovalConflictError, isConversationMissingError, isAgentMissingFromInitError } from './errors.js'; +import { Store } from './store.js'; +import { updateAgentName, recoverOrphanedConversationApproval } from '../tools/letta-api.js'; +import { installSkillsToAgent, withAgentSkillsOnPath } from '../skills/loader.js'; +import { loadMemoryBlocks } from './memory.js'; +import { SYSTEM_PROMPT } from './system-prompt.js'; +import { createManageTodoTool } from '../tools/todo.js'; +import { syncTodosFromTool } from '../todo/store.js'; +import { createLogger } from '../logger.js'; + +const log = createLogger('Session'); + +export class SessionManager { + private readonly store: Store; + private readonly config: BotConfig; + + // Active processing keys -- owned by LettaBot, read here for LRU eviction safety. + private readonly processingKeys: ReadonlySet; + // Stale-result fingerprints -- owned by LettaBot, cleaned here on invalidation/eviction. + private readonly lastResultRunFingerprints: Map; + + // Persistent sessions: reuse CLI subprocesses across messages. + private sessions: Map = new Map(); + private sessionLastUsed: Map = new Map(); + private sessionCreationLocks: Map; generation: number }> = new Map(); + private sessionGenerations: Map = new Map(); + + // Per-message tool callback. Updated before each send() so the Session + // options (which hold a stable wrapper) route to the current handler. + private currentCanUseTool: CanUseToolCallback | undefined; + + // Stable callback wrapper so the Session options never change, but we can + // swap out the per-message handler before each send(). + private readonly sessionCanUseTool: CanUseToolCallback = async (toolName, toolInput) => { + if (this.currentCanUseTool) { + return this.currentCanUseTool(toolName, toolInput); + } + return { behavior: 'allow' as const }; + }; + + constructor( + store: Store, + config: BotConfig, + processingKeys: ReadonlySet, + lastResultRunFingerprints: Map, + ) { + this.store = store; + this.config = config; + this.processingKeys = processingKeys; + this.lastResultRunFingerprints = lastResultRunFingerprints; + } + + // ========================================================================= + // Todo sync (stream utility) + // ========================================================================= + + private getTodoAgentKey(): string { + return this.store.agentId || this.config.agentName || 'LettaBot'; + } + + /** Sync TodoWrite tool calls to the persistent heartbeat store. */ + syncTodoToolCall(streamMsg: StreamMsg): void { + if (streamMsg.type !== 'tool_call') return; + + const normalizedToolName = (streamMsg.toolName || '').toLowerCase(); + const isBuiltInTodoTool = normalizedToolName === 'todowrite' + || normalizedToolName === 'todo_write' + || normalizedToolName === 'writetodos' + || normalizedToolName === 'write_todos'; + if (!isBuiltInTodoTool) return; + + const input = (streamMsg.toolInput && typeof streamMsg.toolInput === 'object') + ? streamMsg.toolInput as Record + : null; + if (!input || !Array.isArray(input.todos)) return; + + const incoming: Array<{ + content?: string; + description?: string; + status: 'pending' | 'in_progress' | 'completed' | 'cancelled'; + }> = []; + for (const item of input.todos) { + if (!item || typeof item !== 'object') continue; + const obj = item as Record; + const statusRaw = typeof obj.status === 'string' ? obj.status : ''; + if (!['pending', 'in_progress', 'completed', 'cancelled'].includes(statusRaw)) continue; + incoming.push({ + content: typeof obj.content === 'string' ? obj.content : undefined, + description: typeof obj.description === 'string' ? obj.description : undefined, + status: statusRaw as 'pending' | 'in_progress' | 'completed' | 'cancelled', + }); + } + if (incoming.length === 0) return; + + try { + const summary = syncTodosFromTool(this.getTodoAgentKey(), incoming); + if (summary.added > 0 || summary.updated > 0) { + log.info(`Synced ${summary.totalIncoming} todo(s) from ${streamMsg.toolName} into heartbeat store (added=${summary.added}, updated=${summary.updated})`); + } + } catch (err) { + log.warn('Failed to sync TodoWrite todos:', err instanceof Error ? err.message : err); + } + } + + // ========================================================================= + // Session options & timeout + // ========================================================================= + + private getSessionTimeoutMs(): number { + const envTimeoutMs = Number(process.env.LETTA_SESSION_TIMEOUT_MS); + if (Number.isFinite(envTimeoutMs) && envTimeoutMs > 0) { + return envTimeoutMs; + } + return 60000; + } + + async withSessionTimeout( + promise: Promise, + label: string, + ): Promise { + const timeoutMs = this.getSessionTimeoutMs(); + let timeoutId: ReturnType | undefined; + const timeoutPromise = new Promise((_, reject) => { + timeoutId = setTimeout(() => { + reject(new Error(`${label} timed out after ${timeoutMs}ms`)); + }, timeoutMs); + }); + try { + return await Promise.race([promise, timeoutPromise]); + } finally { + if (timeoutId) clearTimeout(timeoutId); + } + } + + private baseSessionOptions(canUseTool?: CanUseToolCallback) { + return { + permissionMode: 'bypassPermissions' as const, + allowedTools: this.config.allowedTools, + disallowedTools: [ + // Block built-in TodoWrite -- it requires interactive approval (fails + // silently during heartbeats) and writes to the CLI's own store rather + // than lettabot's persistent heartbeat store. The agent should use the + // custom manage_todo tool instead. + 'TodoWrite', + ...(this.config.disallowedTools || []), + ], + cwd: this.config.workingDir, + tools: [createManageTodoTool(this.getTodoAgentKey())], + // Memory filesystem (context repository): true -> --memfs, false -> --no-memfs, undefined -> leave unchanged + ...(this.config.memfs !== undefined ? { memfs: this.config.memfs } : {}), + // In bypassPermissions mode, canUseTool is only called for interactive + // tools (AskUserQuestion, ExitPlanMode). When no callback is provided + // (background triggers), the SDK auto-denies interactive tools. + ...(canUseTool ? { canUseTool } : {}), + }; + } + + // ========================================================================= + // Session lifecycle (per-key) + // ========================================================================= + + /** + * Return the persistent session for the given conversation key, + * creating and initializing it if needed. + * + * After initialization, calls bootstrapState() to detect pending approvals. + * If an orphaned approval is found, recovers proactively before returning + * the session -- preventing the first send() from hitting a 409 CONFLICT. + */ + async ensureSessionForKey(key: string, bootstrapRetried = false): Promise { + const generation = this.sessionGenerations.get(key) ?? 0; + + // Fast path: session already exists + const existing = this.sessions.get(key); + if (existing) { + this.sessionLastUsed.set(key, Date.now()); + return existing; + } + + // Coalesce concurrent callers: if another call is already creating this + // key (e.g. warmSession running while first message arrives), wait for + // it instead of creating a duplicate session. + const pending = this.sessionCreationLocks.get(key); + if (pending && pending.generation === generation) return pending.promise; + + const promise = this._createSessionForKey(key, bootstrapRetried, generation); + this.sessionCreationLocks.set(key, { promise, generation }); + try { + return await promise; + } finally { + const current = this.sessionCreationLocks.get(key); + if (current?.promise === promise) { + this.sessionCreationLocks.delete(key); + } + } + } + + /** Internal session creation -- called via ensureSessionForKey's lock. */ + private async _createSessionForKey( + key: string, + bootstrapRetried: boolean, + generation: number, + ): Promise { + // Session was invalidated while this creation path was queued. + if ((this.sessionGenerations.get(key) ?? 0) !== generation) { + return this.ensureSessionForKey(key, bootstrapRetried); + } + + // Re-read the store file from disk so we pick up agent/conversation ID + // changes made by other processes (e.g. after a restart or container deploy). + this.store.refresh(); + + const opts = this.baseSessionOptions(this.sessionCanUseTool); + let session: Session; + let sessionAgentId: string | undefined; + + // In disabled mode, always resume the agent's built-in default conversation. + // Skip store lookup entirely -- no conversation ID is persisted. + const convId = key === 'default' + ? null + : key === 'shared' + ? this.store.conversationId + : this.store.getConversationId(key); + + // Propagate per-agent cron store path to CLI subprocesses (lettabot-schedule) + if (this.config.cronStorePath) { + process.env.CRON_STORE_PATH = this.config.cronStorePath; + } + + if (key === 'default' && this.store.agentId) { + process.env.LETTA_AGENT_ID = this.store.agentId; + installSkillsToAgent(this.store.agentId, this.config.skills); + sessionAgentId = this.store.agentId; + session = resumeSession('default', opts); + } else if (convId) { + process.env.LETTA_AGENT_ID = this.store.agentId || undefined; + if (this.store.agentId) { + installSkillsToAgent(this.store.agentId, this.config.skills); + sessionAgentId = this.store.agentId; + } + session = resumeSession(convId, opts); + } else if (this.store.agentId) { + // Agent exists but no conversation stored -- resume the default conversation + process.env.LETTA_AGENT_ID = this.store.agentId; + installSkillsToAgent(this.store.agentId, this.config.skills); + sessionAgentId = this.store.agentId; + session = resumeSession(this.store.agentId, opts); + } else { + // Create new agent -- persist immediately so we don't orphan it on later failures + log.info('Creating new agent'); + const newAgentId = await createAgent({ + systemPrompt: SYSTEM_PROMPT, + memory: loadMemoryBlocks(this.config.agentName), + tags: ['origin:lettabot'], + ...(this.config.memfs !== undefined ? { memfs: this.config.memfs } : {}), + }); + const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com'; + this.store.setAgent(newAgentId, currentBaseUrl); + log.info('Saved new agent ID:', newAgentId); + + if (this.config.agentName) { + updateAgentName(newAgentId, this.config.agentName).catch(() => {}); + } + installSkillsToAgent(newAgentId, this.config.skills); + sessionAgentId = newAgentId; + + // In disabled mode, resume the built-in default conversation instead of + // creating a new one. Other modes create a fresh conversation per key. + session = key === 'default' + ? resumeSession('default', opts) + : createSession(newAgentId, opts); + } + + // Initialize eagerly so the subprocess is ready before the first send() + log.info(`Initializing session subprocess (key=${key})...`); + try { + if (sessionAgentId) { + await withAgentSkillsOnPath( + sessionAgentId, + () => this.withSessionTimeout(session.initialize(), `Session initialize (key=${key})`), + ); + } else { + await this.withSessionTimeout(session.initialize(), `Session initialize (key=${key})`); + } + log.info(`Session subprocess ready (key=${key})`); + } catch (error) { + // Close immediately so failed initialization cannot leak a subprocess. + session.close(); + + // If the stored agent ID doesn't exist on the server (deleted externally, + // ghost agent from failed pairing, etc.), clear the stale ID and retry. + if (this.store.agentId && !bootstrapRetried && isAgentMissingFromInitError(error)) { + log.warn( + `Agent ${this.store.agentId} appears missing from server, ` + + `clearing stale agent ID and recreating...`, + ); + this.store.clearAgent(); + return this._createSessionForKey(key, /* bootstrapRetried */ true, generation); + } + + throw error; + } + + // reset/invalidate can happen while initialize() is in-flight. + if ((this.sessionGenerations.get(key) ?? 0) !== generation) { + log.info(`Discarding stale initialized session (key=${key})`); + session.close(); + return this.ensureSessionForKey(key, bootstrapRetried); + } + + // Proactive approval detection via bootstrapState(). + if (!bootstrapRetried && this.store.agentId) { + try { + const bootstrap = await this.withSessionTimeout( + session.bootstrapState(), + `Session bootstrapState (key=${key})`, + ); + if (bootstrap.hasPendingApproval) { + const convId = bootstrap.conversationId || session.conversationId; + log.warn(`Pending approval detected at session startup (key=${key}, conv=${convId}), recovering...`); + session.close(); + if (convId) { + const result = await recoverOrphanedConversationApproval( + this.store.agentId, + convId, + true, /* deepScan */ + ); + if (result.recovered) { + log.info(`Proactive approval recovery succeeded: ${result.details}`); + } else { + log.warn(`Proactive approval recovery did not find resolvable approvals: ${result.details}`); + } + } + return this._createSessionForKey(key, true, generation); + } + } catch (err) { + // bootstrapState failure is non-fatal -- the reactive 409 handler in + // runSession() will catch stuck approvals. + log.warn(`bootstrapState check failed (key=${key}), continuing:`, err instanceof Error ? err.message : err); + } + } + + if ((this.sessionGenerations.get(key) ?? 0) !== generation) { + log.info(`Discarding stale session after bootstrapState (key=${key})`); + session.close(); + return this.ensureSessionForKey(key, bootstrapRetried); + } + + // LRU eviction: in per-chat mode, limit concurrent sessions to avoid + // unbounded subprocess growth. + const maxSessions = this.config.maxSessions ?? 10; + if (this.config.conversationMode === 'per-chat' && this.sessions.size >= maxSessions) { + let oldestKey: string | null = null; + let oldestTime = Infinity; + for (const [k, ts] of this.sessionLastUsed) { + if (k === key) continue; + if (!this.sessions.has(k)) continue; + // Never evict an active/in-flight key (can close a live stream). + if (this.processingKeys.has(k) || this.sessionCreationLocks.has(k)) continue; + if (ts < oldestTime) { + oldestKey = k; + oldestTime = ts; + } + } + if (oldestKey) { + log.info(`LRU session eviction: closing session for key="${oldestKey}" (${this.sessions.size} active, max=${maxSessions})`); + const evicted = this.sessions.get(oldestKey); + evicted?.close(); + this.sessions.delete(oldestKey); + this.sessionLastUsed.delete(oldestKey); + this.sessionGenerations.delete(oldestKey); + this.sessionCreationLocks.delete(oldestKey); + this.lastResultRunFingerprints.delete(oldestKey); + } else { + // All existing sessions are active; allow temporary overflow. + log.debug(`LRU session eviction skipped: all ${this.sessions.size} sessions are active/in-flight`); + } + } + + this.sessions.set(key, session); + this.sessionLastUsed.set(key, Date.now()); + return session; + } + + /** Get an active session by key (for abort/cancel). */ + getSession(key: string): Session | undefined { + return this.sessions.get(key); + } + + /** + * Destroy session(s). If key provided, destroys only that key. + * If key is undefined, destroys ALL sessions. + */ + invalidateSession(key?: string): void { + if (key) { + const nextGeneration = (this.sessionGenerations.get(key) ?? 0) + 1; + this.sessionGenerations.set(key, nextGeneration); + this.sessionCreationLocks.delete(key); + + const session = this.sessions.get(key); + if (session) { + log.info(`Invalidating session (key=${key})`); + session.close(); + this.sessions.delete(key); + this.sessionLastUsed.delete(key); + } + this.lastResultRunFingerprints.delete(key); + } else { + const keys = new Set([ + ...this.sessions.keys(), + ...this.sessionCreationLocks.keys(), + ]); + for (const k of keys) { + const nextGeneration = (this.sessionGenerations.get(k) ?? 0) + 1; + this.sessionGenerations.set(k, nextGeneration); + } + + for (const [k, session] of this.sessions) { + log.info(`Invalidating session (key=${k})`); + session.close(); + } + this.sessions.clear(); + this.sessionCreationLocks.clear(); + this.sessionLastUsed.clear(); + this.lastResultRunFingerprints.clear(); + } + } + + /** + * Pre-warm the session subprocess at startup. + */ + async warmSession(): Promise { + this.store.refresh(); + if (!this.store.agentId && !this.store.conversationId) return; + try { + const mode = this.config.conversationMode || 'shared'; + if (mode === 'shared') { + await this.ensureSessionForKey('shared'); + } + } catch (err) { + log.warn('Session pre-warm failed:', err instanceof Error ? err.message : err); + } + } + + /** + * Persist conversation ID after a successful session result. + * Agent ID and first-run setup are handled eagerly in ensureSessionForKey(). + */ + persistSessionState(session: Session, convKey?: string): void { + // Agent ID already persisted in ensureSessionForKey() on creation. + // Here we only update if the server returned a different one (shouldn't happen). + if (session.agentId && session.agentId !== this.store.agentId) { + const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com'; + this.store.setAgent(session.agentId, currentBaseUrl, session.conversationId || undefined); + log.info('Agent ID updated:', session.agentId); + } else if (session.conversationId && session.conversationId !== 'default' && convKey !== 'default') { + // In per-channel mode, persist per-key. In shared mode, use legacy field. + // Skip saving "default" -- it's an API alias, not a real conversation ID. + // In disabled mode (convKey === 'default'), skip -- always use the built-in default. + if (convKey && convKey !== 'shared') { + const existing = this.store.getConversationId(convKey); + if (session.conversationId !== existing) { + this.store.setConversationId(convKey, session.conversationId); + log.info(`Conversation ID updated (key=${convKey}):`, session.conversationId); + } + } else if (session.conversationId !== this.store.conversationId) { + this.store.conversationId = session.conversationId; + log.info('Conversation ID updated:', session.conversationId); + } + } + } + + // ========================================================================= + // Send + stream + // ========================================================================= + + /** + * Send a message and return a deduplicated stream. + * + * Handles: + * - Persistent session reuse (subprocess stays alive across messages) + * - CONFLICT recovery from orphaned approvals (retry once) + * - Conversation-not-found fallback (create new conversation) + * - Tool call deduplication + * - Session persistence after result + */ + async runSession( + message: SendMessage, + options: { retried?: boolean; canUseTool?: CanUseToolCallback; convKey?: string } = {}, + ): Promise<{ session: Session; stream: () => AsyncGenerator }> { + const { retried = false, canUseTool, convKey = 'shared' } = options; + + // Update the per-message callback before sending + this.currentCanUseTool = canUseTool; + + let session = await this.ensureSessionForKey(convKey); + + // Resolve the conversation ID for this key (for error recovery) + const convId = convKey === 'shared' + ? this.store.conversationId + : this.store.getConversationId(convKey); + + // Send message with fallback chain + try { + await this.withSessionTimeout(session.send(message), `Session send (key=${convKey})`); + } catch (error) { + // 409 CONFLICT from orphaned approval + if (!retried && isApprovalConflictError(error) && this.store.agentId && convId) { + log.info('CONFLICT detected - attempting orphaned approval recovery...'); + this.invalidateSession(convKey); + const result = await recoverOrphanedConversationApproval( + this.store.agentId, + convId + ); + if (result.recovered) { + log.info(`Recovery succeeded (${result.details}), retrying...`); + return this.runSession(message, { retried: true, canUseTool, convKey }); + } + log.error(`Orphaned approval recovery failed: ${result.details}`); + throw error; + } + + // Conversation/agent not found - try creating a new conversation. + if (this.store.agentId && isConversationMissingError(error)) { + log.warn(`Conversation not found (key=${convKey}), creating a new conversation...`); + this.invalidateSession(convKey); + if (convKey !== 'shared') { + this.store.clearConversation(convKey); + } else { + this.store.conversationId = null; + } + session = await this.ensureSessionForKey(convKey); + try { + await this.withSessionTimeout(session.send(message), `Session send retry (key=${convKey})`); + } catch (retryError) { + this.invalidateSession(convKey); + throw retryError; + } + } else { + // Unknown error -- invalidate so we get a fresh subprocess next time + this.invalidateSession(convKey); + throw error; + } + } + + // Persist conversation ID immediately after successful send, before streaming. + this.persistSessionState(session, convKey); + + // Return session and a stream generator that buffers tool_call chunks and + // flushes them with fully accumulated arguments on the next type boundary. + const pendingToolCalls = new Map(); + const self = this; + const capturedConvKey = convKey; // Capture for closure + + /** Merge tool argument strings, handling both delta and cumulative chunking. */ + function mergeToolArgs(existing: string, incoming: string): string { + if (!incoming) return existing; + if (!existing) return incoming; + if (incoming === existing) return existing; + // Cumulative: latest chunk includes all prior text + if (incoming.startsWith(existing)) return incoming; + if (existing.endsWith(incoming)) return existing; + // Delta: each chunk is an append + return `${existing}${incoming}`; + } + + function* flushPending(): Generator { + for (const [, pending] of pendingToolCalls) { + if (!pending.accumulatedArgs) { + // No rawArguments accumulated (old SDK or single complete chunk) -- + // preserve the original toolInput from the first chunk as-is. + yield pending.msg; + continue; + } + let toolInput: Record = {}; + try { toolInput = JSON.parse(pending.accumulatedArgs); } + catch { toolInput = { raw: pending.accumulatedArgs }; } + yield { ...pending.msg, toolInput }; + } + pendingToolCalls.clear(); + lastPendingToolCallId = null; + } + + let anonToolCallCounter = 0; + let lastPendingToolCallId: string | null = null; + + async function* dedupedStream(): AsyncGenerator { + for await (const raw of session.stream()) { + const msg = raw as StreamMsg; + + if (msg.type === 'tool_call') { + let id = msg.toolCallId; + if (!id) { + // Tool calls without IDs (e.g., from models that don't emit + // tool_call_id on subsequent argument chunks) still need to be + // accumulated. Assign a synthetic ID so they enter the buffer. + // If tool name matches the most recent pending call, treat this as + // a continuation even when the first chunk had a real toolCallId. + const currentPending = lastPendingToolCallId ? pendingToolCalls.get(lastPendingToolCallId) : null; + if (lastPendingToolCallId && currentPending && (currentPending.msg.toolName || 'unknown') === (msg.toolName || 'unknown')) { + id = lastPendingToolCallId; + } else { + id = `__anon_${++anonToolCallCounter}__`; + } + } + + const incoming = (msg as StreamMsg & { rawArguments?: string }).rawArguments || ''; + const existing = pendingToolCalls.get(id); + if (existing) { + existing.accumulatedArgs = mergeToolArgs(existing.accumulatedArgs, incoming); + } else { + pendingToolCalls.set(id, { msg, accumulatedArgs: incoming }); + } + lastPendingToolCallId = id; + continue; // buffer, don't yield yet + } + + // Flush pending tool calls on semantic type boundary (not stream_event) + if (pendingToolCalls.size > 0 && msg.type !== 'stream_event') { + yield* flushPending(); + } + + if (msg.type === 'result') { + // Flush any remaining before result + yield* flushPending(); + self.persistSessionState(session, capturedConvKey); + } + + yield msg; + + if (msg.type === 'result') { + break; + } + } + + // Flush remaining at generator end (shouldn't normally happen) + yield* flushPending(); + } + + return { session, stream: dedupedStream }; + } +}