feat: Matrix adapter with E2EE, TTS/STT, reactions, and heartbeat routing

Full Matrix channel integration for LettaBot:

- E2EE via rust crypto (ephemeral mode, cross-signing bootstrap)
- Proactive SAS verification with Element clients
- TTS (VibeVoice) and STT (Faster-Whisper) voice pipeline
- Streaming message edits with 800ms throttle
- Collapsible reasoning blocks via <details> htmlPrefix
- Per-tool emoji reactions (brain, eyes, tool-specific, max 6)
- Heartbeat room conversation routing (heartbeatTargetChatId)
- Custom heartbeat prompt with first-person voice
- Per-room conversation isolation (per-chat mode)
- !pause, !resume, !status, !new, !timeout, !turns commands
- Audio/image/file upload handlers with E2EE media
- SDK 0.1.11 (approval recovery), CLI 0.18.2

Tested against Synapse homeserver with E2EE enabled for 2+ weeks,
handles key backup/restore and device verification.
This commit is contained in:
Ani Tunturi
2026-03-14 21:27:32 -04:00
committed by Ani
parent f1f3540005
commit 18010eb14f
44 changed files with 8082 additions and 283 deletions

View File

@@ -12,7 +12,7 @@ import type { SendMessageResponse, ChatRequest, ChatResponse, AsyncChatResponse,
import { listPairingRequests, approvePairingCode } from '../pairing/store.js';
import { parseMultipart } from './multipart.js';
import type { AgentRouter } from '../core/interfaces.js';
import type { ChannelId } from '../core/types.js';
import type { ChannelId } from '../channels/setup.js';
import type { Store } from '../core/store.js';
import {
generateCompletionId, extractLastUserMessage, buildCompletion,
@@ -25,7 +25,7 @@ import { getTurnViewerHtml } from '../core/turn-viewer.js';
import { createLogger } from '../logger.js';
const log = createLogger('API');
const VALID_CHANNELS: ChannelId[] = ['telegram', 'slack', 'discord', 'whatsapp', 'signal'];
const VALID_CHANNELS: ChannelId[] = ['telegram', 'slack', 'discord', 'whatsapp', 'signal', 'matrix'];
const MAX_BODY_SIZE = 10 * 1024; // 10KB
const MAX_TEXT_LENGTH = 10000; // 10k chars
const MAX_FILE_SIZE = 50 * 1024 * 1024; // 50MB

View File

@@ -1,5 +1,6 @@
import { BlueskyAdapter } from './bluesky.js';
import { DiscordAdapter } from './discord.js';
import { MatrixAdapter } from './matrix/index.js';
import { SignalAdapter } from './signal.js';
import { SlackAdapter } from './slack.js';
import { TelegramMTProtoAdapter } from './telegram-mtproto.js';
@@ -188,6 +189,41 @@ export function createChannelsForAgent(
}
}
// Matrix: E2EE, TTS, STT, per-room routing
const matrixConfig = agentConfig.channels.matrix;
if (matrixConfig?.enabled !== false && matrixConfig?.homeserverUrl && matrixConfig?.userId) {
adapters.push(new MatrixAdapter({
homeserverUrl: matrixConfig.homeserverUrl,
userId: matrixConfig.userId,
accessToken: matrixConfig.accessToken,
password: matrixConfig.password,
deviceId: matrixConfig.deviceId,
dmPolicy: matrixConfig.dmPolicy || 'pairing',
allowedUsers: nonEmpty(matrixConfig.allowedUsers),
selfChatMode: matrixConfig.selfChatMode ?? false,
enableEncryption: matrixConfig.enableEncryption ?? true,
recoveryKey: matrixConfig.recoveryKey,
userDeviceId: matrixConfig.userDeviceId,
storeDir: matrixConfig.storeDir || './data/matrix',
sessionDir: matrixConfig.sessionDir || matrixConfig.storeDir || './data/matrix',
autoJoinRooms: matrixConfig.autoJoinRooms ?? true,
// Group batching
groupDebounceSec: matrixConfig.groupDebounceSec,
instantGroups: nonEmpty(matrixConfig.instantGroups),
listeningGroups: nonEmpty(matrixConfig.listeningGroups),
// TTS/STT
transcriptionEnabled: matrixConfig.transcriptionEnabled ?? true,
sttUrl: matrixConfig.sttUrl,
ttsUrl: matrixConfig.ttsUrl,
ttsVoice: matrixConfig.ttsVoice,
enableAudioResponse: matrixConfig.enableAudioResponse ?? false,
audioRoomFilter: matrixConfig.audioRoomFilter || 'dm_only',
// Other features
attachmentsDir: sharedOptions.attachmentsDir,
attachmentsMaxBytes: sharedOptions.attachmentsMaxBytes,
}));
}
// Bluesky: only start if there's something to subscribe to
if (agentConfig.channels.bluesky?.enabled) {
const bsky = agentConfig.channels.bluesky;

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,273 @@
/**
* Matrix Bot Command Processor
*
* Handles !commands sent by allowed users in Matrix rooms:
* !commands — list all available commands
* !pause — silence bot in current room (SQLite persisted)
* !resume — re-enable bot in current room
* !status — show paused rooms, ignored bots, heartbeat state
* !ignorebot-add @u:s — add user to global ignore list (prevents bot loops)
* !ignorebot-remove @u:s — remove user from ignore list
* !heartbeat on/off — toggle the heartbeat cron (in-memory)
*
* Commands run AFTER access control (allowedUsers) but BEFORE the paused-room
* check, so !resume always works even in a paused room.
* Unrecognized !x commands fall through to Letta as normal text.
*/
import { createLogger } from "../../logger.js";
import type { MatrixStorage } from "./storage.js";
const log = createLogger('MatrixCommands');
interface CommandCallbacks {
onHeartbeatStop?: () => void;
onHeartbeatStart?: () => void;
isHeartbeatEnabled?: () => boolean;
onTimeoutHeartbeat?: () => void;
getAgentId?: () => string | undefined;
onInvalidateSession?: (key: string) => void;
}
export class MatrixCommandProcessor {
// Per-room bot-turn counters: roomId → remaining turns
private botTurns = new Map<string, number>();
constructor(
private storage: MatrixStorage,
private callbacks: CommandCallbacks = {},
) {}
/**
* Process a !command.
* Returns:
* - string → send as reply
* - '' → silent ack (no reply sent)
* - undefined → not a recognized command, fall through to Letta
*/
async handleCommand(
body: string,
roomId: string,
sender: string,
roomMeta?: { isDm: boolean; roomName: string },
): Promise<string | undefined> {
const parts = body.slice(1).trim().split(/\s+/);
const cmd = parts[0]?.toLowerCase();
const args = parts.slice(1);
switch (cmd) {
case "commands":
return this.doCommands();
case "pause":
return this.doPause(roomId, sender);
case "resume":
return this.doResume(roomId);
case "status":
return this.doStatus(roomId);
case "ignorebot-add":
return this.doBotAdd(args[0], sender);
case "ignorebot-remove":
return this.doBotRemove(args[0]);
case "heartbeat":
return this.doHeartbeat(args[0]);
case "restore":
return this.doRestore(args[0], roomId, roomMeta?.isDm ?? false, roomMeta?.roomName ?? roomId);
case "turns":
return this.doTurns(args[0], roomId);
case "timeout":
return this.doTimeout();
case "new":
return await this.doNew(roomId, roomMeta?.isDm ?? false, roomMeta?.roomName ?? roomId);
case "showreasoning":
return this.doShowReasoning();
default:
return undefined;
}
}
isRoomPaused(roomId: string): boolean {
return this.storage.isRoomPaused(roomId);
}
isIgnoredBot(userId: string): boolean {
return this.storage.isIgnoredBot(userId);
}
/**
* Check if a bot message should be processed in this room.
* Known bots are silenced UNLESS:
* - The message @mentions our userId (body contains display name or m.mentions)
* - !turns N is active for this room (and decrements the counter)
*/
shouldRespondToBot(roomId: string, body: string, ourUserId: string): boolean {
// Check @mention — body contains our display name or full user ID
const displayName = ourUserId.match(/^@([^:]+):/)?.[1];
if (displayName && body.toLowerCase().includes(displayName.toLowerCase())) {
return true;
}
if (body.includes(ourUserId)) {
return true;
}
// Check !turns counter
const remaining = this.botTurns.get(roomId);
if (remaining !== undefined && remaining > 0) {
this.botTurns.set(roomId, remaining - 1);
log.info(`[Commands] !turns: ${remaining - 1} turns remaining in ${roomId}`);
if (remaining - 1 === 0) this.botTurns.delete(roomId);
return true;
}
return false;
}
// ─── Command implementations ─────────────────────────────────────────────
private doCommands(): string {
const lines = [
"📜 **Available Commands**",
"",
"**Bot Control**",
" `!pause` — Silence bot in current room",
" `!resume` — Re-enable bot in current room",
" `!status` — Show bot status, paused rooms, heartbeat state",
"",
"**Bot Loop Prevention**",
" `!ignorebot-add @user:server` — Add bot to ignore list",
" `!ignorebot-remove @user:server` — Remove from ignore list",
" `!turns N` (1-50) — Respond to bot messages for N turns",
"",
"**Conversation Management**",
" `!new` — Create fresh Letta conversation for this room",
" `!restore conv-xxxx` — Point room at specific conversation",
" `!showreasoning` — Show current reasoning display status",
"",
"**Heartbeat Control**",
" `!heartbeat on/off` — Toggle heartbeat cron",
" `!timeout` — Kill stuck heartbeat run",
];
return lines.join("\n");
}
private doPause(roomId: string, sender: string): string {
this.storage.pauseRoom(roomId, sender);
return "⏸️ Bot paused in this room. Use !resume to re-enable.";
}
private doResume(roomId: string): string {
this.storage.resumeRoom(roomId);
return "▶️ Bot resumed in this room.";
}
private doStatus(roomId: string): string {
const paused = this.storage.getPausedRooms();
const ignored = this.storage.getIgnoredBots();
const hbState = this.callbacks.isHeartbeatEnabled?.() ? "on" : "off";
const thisRoomPaused = this.storage.isRoomPaused(roomId);
const turnsRemaining = this.botTurns.get(roomId);
const lines = [
"📊 **Bot Status**",
`This room: ${thisRoomPaused ? "⏸️ paused" : "▶️ active"}`,
`Conversation key: \`matrix:${roomId}\``,
turnsRemaining ? `Bot turns: ${turnsRemaining} remaining` : "Bot turns: off (observer mode in multi-bot rooms)",
paused.length > 0 ? `Paused rooms: ${paused.length}` : "No rooms paused",
ignored.length > 0
? `Known bots:\n${ignored.map((u) => `${u}`).join("\n")}`
: "No known bots",
`Heartbeat: ${hbState}`,
];
return lines.join("\n");
}
private doBotAdd(userId: string | undefined, sender: string): string {
if (!userId?.startsWith("@")) {
return "⚠️ Usage: !ignorebot-add @user:server";
}
this.storage.addIgnoredBot(userId, sender);
return `🚫 Added ${userId} to ignore list`;
}
private doBotRemove(userId: string | undefined): string {
if (!userId?.startsWith("@")) {
return "⚠️ Usage: !ignorebot-remove @user:server";
}
this.storage.removeIgnoredBot(userId);
return `✅ Removed ${userId} from ignore list`;
}
private doHeartbeat(arg: string | undefined): string {
const normalized = arg?.toLowerCase();
if (normalized === "off" || normalized === "stop") {
this.callbacks.onHeartbeatStop?.();
return "⏸️ Heartbeat cron stopped";
}
if (normalized === "on" || normalized === "start") {
this.callbacks.onHeartbeatStart?.();
return "▶️ Heartbeat cron started";
}
return "⚠️ Usage: !heartbeat on | !heartbeat off";
}
private doTurns(arg: string | undefined, roomId: string): string {
const n = parseInt(arg || "", 10);
if (!n || n < 1 || n > 50) {
const current = this.botTurns.get(roomId);
if (current) return `🔄 ${current} bot turns remaining in this room`;
return "⚠️ Usage: !turns N (1-50) — respond to bot messages for the next N turns";
}
this.botTurns.set(roomId, n);
return `🔄 Will respond to bot messages for the next ${n} turns in this room`;
}
private doRestore(
_convId: string | undefined,
_roomId: string,
_isDm: boolean,
_roomName: string,
): string {
return " !restore is no longer needed — each room has its own persistent conversation via per-chat mode.\nUse !new to start a fresh conversation.";
}
private doTimeout(): string {
if (this.callbacks.onTimeoutHeartbeat) {
this.callbacks.onTimeoutHeartbeat();
return "⏹ Killing stuck heartbeat run";
}
return "⚠️ No heartbeat timeout handler registered";
}
private async doNew(
roomId: string,
isDm: boolean,
roomName: string,
): Promise<string> {
const agentId = this.callbacks.getAgentId?.();
if (!agentId) {
return "⚠️ No agent ID available";
}
if (!this.callbacks.onInvalidateSession) {
return "⚠️ Session reset not available (onInvalidateSession not wired)";
}
// In per-chat mode the conversation key is 'matrix:{roomId}'
const key = `matrix:${roomId}`;
this.callbacks.onInvalidateSession(key);
log.info(`!new: invalidated session for key ${key}`);
return `✓ Fresh conversation started for ${isDm ? "this DM" : roomName}. Next message will begin a new session.`;
}
private doShowReasoning(): string {
return [
"🧠 **Reasoning Text Display**",
"",
"Controls whether the agent's thinking/reasoning text is shown in chat.",
"The 🧠 emoji always appears when reasoning starts — this setting controls the text.",
"",
"**Configuration:** Set `display.showReasoning` in your `lettabot.yaml`.",
" - `true`: Show reasoning text in a collapsible block",
" - `false`: Hide reasoning text (only final response shown)",
"",
"Restart the bot after changing config.",
].join('\n');
}
}

View File

@@ -0,0 +1,289 @@
/**
* Matrix E2EE Crypto Utilities
*
* Handles initialization and management of Matrix end-to-end encryption.
* Uses rust crypto (v28) via initRustCrypto() for Node.js.
*
* Based on the Python bridge approach:
* - Uses bootstrapSecretStorage with recovery key
* - Uses bootstrapCrossSigning for cross-signing setup
* - Sets trust to allow unverified devices (TOFU model)
*/
import { createLogger } from "../../logger.js";
import * as sdk from "matrix-js-sdk";
import { decodeRecoveryKey } from "matrix-js-sdk/lib/crypto/recoverykey.js";
const log = createLogger('MatrixCrypto');
interface CryptoConfig {
enableEncryption: boolean;
recoveryKey?: string;
storeDir: string;
password?: string;
userId?: string;
}
/**
* Get crypto callbacks for the Matrix client
* These are needed for secret storage operations
*/
export function getCryptoCallbacks(recoveryKey?: string): sdk.ICryptoCallbacks {
return {
getSecretStorageKey: async (
{ keys }: { keys: Record<string, any> },
name: string,
): Promise<[string, Uint8Array] | null> => {
if (!recoveryKey) {
log.info("[MatrixCrypto] No recovery key provided, cannot retrieve secret storage key");
return null;
}
// Get the key ID from the keys object
// The SDK passes { keys: { [keyId]: keyInfo } }, and we need to return one we have
const keyIds = Object.keys(keys);
if (keyIds.length === 0) {
log.info("[MatrixCrypto] No secret storage key IDs requested");
return null;
}
// Use the first available key ID
const keyId = keyIds[0];
log.info(`[MatrixCrypto] Providing secret storage key for keyId: ${keyId}, name: ${name}`);
// Convert recovery key to Uint8Array
// Recovery key uses Matrix's special format with prefix, parity byte, etc.
try {
const keyBytes = decodeRecoveryKey(recoveryKey);
log.info(`[MatrixCrypto] Decoded recovery key, length: ${keyBytes.length} bytes`);
return [keyId, keyBytes];
} catch (err) {
log.error("[MatrixCrypto] Failed to decode recovery key:", err);
return null;
}
},
// Cache the key to avoid prompting multiple times
cacheSecretStorageKey: (keyId: string, _keyInfo: any, key: Uint8Array): void => {
log.info(`[MatrixCrypto] Cached secret storage key: ${keyId}`);
},
};
}
/**
* Initialize E2EE for a Matrix client using rust crypto
*
* This follows the Python bridge pattern:
* 1. Initialize rust crypto
* 2. Bootstrap secret storage with recovery key
* 3. Bootstrap cross-signing
* 4. Set trust settings for TOFU (Trust On First Use)
*/
export async function initE2EE(
client: sdk.MatrixClient,
config: CryptoConfig,
): Promise<void> {
if (!config.enableEncryption) {
log.info("[MatrixCrypto] Encryption disabled");
return;
}
log.info("[MatrixCrypto] E2EE enabled");
try {
// useIndexedDB: false — ephemeral crypto mode.
// Rust WASM crypto triggers TransactionInactiveError with IndexedDB persistence.
// Upstream issue: matrix-org/matrix-rust-sdk-crypto-wasm#195
// Workaround: fresh device on every restart, cross-signing auto-verifies.
log.info("[MatrixCrypto] Initializing rust crypto (ephemeral mode)...");
await client.initRustCrypto({ useIndexedDB: false });
const crypto = client.getCrypto();
if (!crypto) {
throw new Error("Crypto not initialized after initRustCrypto");
}
log.info("[MatrixCrypto] Rust crypto initialized");
// CRITICAL: Trigger outgoing request loop to upload device keys
// Without this, the device shows as "doesn't support encryption"
log.info("[MatrixCrypto] Triggering key upload...");
(crypto as any).outgoingRequestLoop();
// Give it a moment to process
await new Promise(resolve => setTimeout(resolve, 2000));
log.info("[MatrixCrypto] Key upload triggered");
// Force a device key query to get the list of devices for this user
// This is needed to verify signatures on the key backup
if (config.userId) {
log.info("[MatrixCrypto] Fetching device list...");
try {
await crypto.getUserDeviceInfo([config.userId]);
// Wait a bit for the key query to complete - this is async in the background
await new Promise((resolve) => setTimeout(resolve, 2000));
log.info("[MatrixCrypto] Device list fetched");
} catch (err) {
log.warn("[MatrixCrypto] Failed to fetch device list:", err);
}
}
// Import backup decryption key from recovery key
// The recovery key IS the backup decryption key - when decoded it gives us
// the raw private key needed to decrypt keys from server-side backup
if (config.recoveryKey) {
log.info("[MatrixCrypto] Importing backup decryption key from recovery key...");
try {
const backupKey = decodeRecoveryKey(config.recoveryKey);
await crypto.storeSessionBackupPrivateKey(backupKey);
log.info("[MatrixCrypto] Backup decryption key stored successfully");
} catch (err) {
log.warn("[MatrixCrypto] Failed to store backup key:", err);
}
log.info("[MatrixCrypto] Bootstrapping secret storage...");
try {
await crypto.bootstrapSecretStorage({});
log.info("[MatrixCrypto] Secret storage bootstrapped");
} catch (err) {
log.warn("[MatrixCrypto] Secret storage bootstrap failed (may already exist):", err);
}
// Bootstrap cross-signing - this will READ existing keys from secret storage
// DO NOT use setupNewCrossSigning: true as that would create new keys
log.info("[MatrixCrypto] Bootstrapping cross-signing...");
try {
await crypto.bootstrapCrossSigning({
// Only read existing keys from secret storage, don't create new ones
// This preserves the user's existing cross-signing identity
authUploadDeviceSigningKeys: async (makeRequest: any) => {
log.info("[MatrixCrypto] Uploading cross-signing keys with auth...");
// Try with password auth if available
if (config.password && config.userId) {
await makeRequest({
type: "m.login.password",
user: config.userId,
password: config.password,
});
return;
}
await makeRequest({});
return;
},
});
log.info("[MatrixCrypto] Cross-signing bootstrapped");
} catch (err) {
log.warn("[MatrixCrypto] Cross-signing bootstrap failed:", err);
}
}
// Enable trusting cross-signed devices (similar to Python's TrustState.UNVERIFIED)
// This allows the bot to receive encrypted messages without interactive verification
crypto.setTrustCrossSignedDevices(true);
// CRITICAL: Disable global blacklist of unverified devices
// This is the TypeScript equivalent of Python's allow_key_share
// When false, the bot will:
// 1. Encrypt messages for unverified devices
// 2. Accept room key requests from unverified devices
crypto.globalBlacklistUnverifiedDevices = false;
log.info("[MatrixCrypto] Trusting cross-signed devices enabled");
log.info("[MatrixCrypto] Unverified devices globally enabled (auto-key-share equivalent)");
log.info("[MatrixCrypto] Crypto initialization complete");
log.info("[MatrixCrypto] NOTE: Key backup check will run after first sync when device list is populated");
} catch (err) {
log.error("[MatrixCrypto] Failed to initialize crypto:", err);
throw err;
}
}
/**
* Mark all devices for a user as verified (TOFU - Trust On First Use)
* This is called after sync completes to trust devices we've seen
*/
/**
* Check and enable key backup after sync completes
* This must be called AFTER the initial sync so device list is populated
*/
export async function checkAndRestoreKeyBackup(
client: sdk.MatrixClient,
recoveryKey?: string,
): Promise<void> {
const crypto = client.getCrypto();
if (!crypto || !recoveryKey) return;
log.info("[MatrixCrypto] Checking key backup after sync...");
try {
const backupInfo = await crypto.checkKeyBackupAndEnable();
if (backupInfo) {
log.info("[MatrixCrypto] Key backup enabled");
// Check if backup exists before trying to restore
try {
// Verify backup version exists on server
await client.getKeyBackupVersion();
log.info("[MatrixCrypto] Backup version exists on server");
// Restore keys from backup
log.info("[MatrixCrypto] Restoring keys from backup...");
const backupKey = decodeRecoveryKey(recoveryKey);
const restoreResult = await (client as any).restoreKeyBackup(
backupKey,
undefined, // all rooms
undefined, // all sessions
backupInfo.backupInfo,
);
log.info(`[MatrixCrypto] Restored ${restoreResult.imported} keys from backup`);
} catch (backupErr: any) {
if (backupErr.errcode === 'M_NOT_FOUND' || backupErr.httpStatus === 404) {
log.info("[MatrixCrypto] Key backup not found on server, skipping restore");
// Don't treat this as an error - the backup may not exist yet
} else {
log.warn("[MatrixCrypto] Error accessing key backup:", backupErr);
}
}
} else {
log.info("[MatrixCrypto] No trusted key backup available");
}
} catch (err) {
log.warn("[MatrixCrypto] Key backup check failed:", err);
}
}
export async function trustUserDevices(
client: sdk.MatrixClient,
userId: string,
): Promise<void> {
const crypto = client.getCrypto();
if (!crypto) return;
try {
log.info(`[MatrixCrypto] Trusting devices for ${userId}...`);
// Get all devices for this user
const devices = await crypto.getUserDeviceInfo([userId]);
const userDevices = devices.get(userId);
if (!userDevices || userDevices.size === 0) {
log.info(`[MatrixCrypto] No devices found for ${userId}`);
return;
}
let verifiedCount = 0;
for (const [deviceId, deviceInfo] of Array.from(userDevices.entries())) {
// Skip our own device
if (deviceId === client.getDeviceId()) continue;
// Check current verification status
const status = await crypto.getDeviceVerificationStatus(userId, deviceId);
if (!status?.isVerified()) {
log.info(`[MatrixCrypto] Marking device ${deviceId} as verified`);
await crypto.setDeviceVerified(userId, deviceId, true);
verifiedCount++;
}
}
log.info(`[MatrixCrypto] Verified ${verifiedCount} devices for ${userId}`);
} catch (err) {
log.error(`[MatrixCrypto] Failed to trust devices for ${userId}:`, err);
}
}

View File

@@ -0,0 +1,143 @@
import { createLogger } from '../../../logger.js';
const log = createLogger('MatrixAudio');
/**
* Audio Handler for Matrix Adapter
*
* Handles incoming audio messages, transcription via STT,
* and coordinates TTS audio response generation.
*/
import type * as sdk from "matrix-js-sdk";
import type { InboundMessage } from "../../../core/types.js";
import { transcribeAudio } from "../stt.js";
import { downloadAndDecryptMedia, type EncryptionInfo } from "../media.js";
export interface AudioHandlerContext {
client: sdk.MatrixClient;
room: sdk.Room;
event: sdk.MatrixEvent;
ourUserId: string;
// Configuration
transcriptionEnabled: boolean;
sttUrl?: string;
// Callbacks
sendTyping: (roomId: string, typing: boolean) => Promise<void>;
sendMessage: (roomId: string, text: string) => Promise<void>;
}
interface AudioInfo {
mxcUrl?: string;
encryptionInfo?: EncryptionInfo;
}
/**
* Handle audio messages
*/
export async function handleAudioMessage(
ctx: AudioHandlerContext,
): Promise<InboundMessage | null> {
const { client, room, event, ourUserId } = ctx;
const sender = event.getSender();
const roomId = room.roomId;
if (!sender || !roomId) return null;
log.info(`Audio from ${sender} in ${roomId}`);
// Send typing indicator (STT takes time)
await ctx.sendTyping(roomId, true);
try {
const content = event.getContent();
const audioInfo = extractAudioInfo(content);
if (!audioInfo.mxcUrl) {
throw new Error("No audio URL found");
}
// Download and decrypt audio (handles auth + E2EE)
const audioData = await downloadAndDecryptMedia(client, audioInfo.mxcUrl, audioInfo.encryptionInfo);
log.info(`Downloaded ${audioData.length} bytes`);
// Transcribe if enabled
if (!ctx.transcriptionEnabled) {
await ctx.sendTyping(roomId, false);
await ctx.sendMessage(roomId, "Audio received (transcription disabled)");
return null;
}
const transcription = await transcribeAudio(audioData, {
url: ctx.sttUrl,
model: "small",
});
if (!transcription || isTranscriptionFailed(transcription)) {
await ctx.sendTyping(roomId, false);
await ctx.sendMessage(roomId, "No speech detected in audio");
return null;
}
log.info(`Transcribed: "${transcription.slice(0, 50)}..."`);
// Voice context prefix
const voiceMessage = `[VOICE] "${transcription}"`;
const isDm = isDirectMessage(room);
const roomName = room.name || room.getCanonicalAlias() || roomId;
await ctx.sendTyping(roomId, false);
return {
channel: "matrix",
chatId: roomId,
userId: sender,
userName: room.getMember(sender)?.name || sender,
userHandle: sender,
messageId: event.getId() || undefined,
text: voiceMessage,
timestamp: new Date(event.getTs()),
isGroup: !isDm,
groupName: isDm ? undefined : roomName,
isVoiceInput: true, // Mark as voice so bot.ts knows to generate audio response
};
} catch (err) {
log.error("Failed to process audio:", err);
await ctx.sendTyping(roomId, false);
await ctx.sendMessage(roomId, `Failed to process audio: ${err instanceof Error ? err.message : "Unknown error"}`);
return null;
}
}
function extractAudioInfo(content: Record<string, unknown>): AudioInfo {
const file = content.file as Record<string, unknown> | undefined;
const url = content.url as string | undefined;
if (file?.url) {
return {
mxcUrl: file.url as string,
encryptionInfo: {
key: file.key as { k: string },
iv: file.iv as string,
hashes: file.hashes as { sha256: string },
},
};
}
if (url) {
return { mxcUrl: url };
}
return {};
}
function isDirectMessage(room: sdk.Room): boolean {
return room.getJoinedMembers().length === 2;
}
function isTranscriptionFailed(text: string): boolean {
return text.startsWith("[") && text.includes("Error");
}

View File

@@ -0,0 +1,153 @@
import { createLogger } from '../../../logger.js';
const log = createLogger('MatrixFile');
/**
* File Handler for Matrix Adapter
*
* Handles incoming file messages (PDFs, docs, etc.) by downloading,
* decrypting, and saving to disk so the agent can process them via
* shell tools (pdftotext, cat, etc.)
*
* Files are saved to: {uploadDir}/uploads/YYYY-MM/{filename}
*/
import { mkdirSync, writeFileSync } from "node:fs";
import { join } from "node:path";
import type * as sdk from "matrix-js-sdk";
import type { InboundMessage } from "../../../core/types.js";
import { downloadAndDecryptMedia, type EncryptionInfo } from "../media.js";
export interface FileHandlerContext {
client: sdk.MatrixClient;
room: sdk.Room;
event: sdk.MatrixEvent;
ourUserId: string;
// Base directory for uploads (e.g. process.cwd())
uploadDir: string;
// Callbacks
sendTyping: (roomId: string, typing: boolean) => Promise<void>;
}
interface FileInfo {
mxcUrl?: string;
encryptionInfo?: EncryptionInfo;
filename: string;
mimetype: string;
size?: number;
}
/**
* Handle generic file messages (m.file)
*/
export async function handleFileMessage(
ctx: FileHandlerContext,
): Promise<InboundMessage | null> {
const { client, room, event, ourUserId } = ctx;
const sender = event.getSender();
const roomId = room.roomId;
if (!sender || sender === ourUserId) return null;
if (!roomId) return null;
const content = event.getContent();
if (!content) return null;
const fileInfo = extractFileInfo(content, event.getId() || "unknown");
log.info(`File from ${sender} in ${roomId}: ${fileInfo.filename} (${fileInfo.mimetype})`);
if (!fileInfo.mxcUrl) {
log.warn("No URL found in file event");
return null;
}
await ctx.sendTyping(roomId, true);
try {
// Download and decrypt (handles auth + E2EE)
const fileData = await downloadAndDecryptMedia(client, fileInfo.mxcUrl, fileInfo.encryptionInfo);
log.info(`Downloaded ${fileData.length} bytes`);
// Build upload path: uploads/YYYY-MM/filename
const now = new Date();
const monthDir = `${now.getFullYear()}-${String(now.getMonth() + 1).padStart(2, "0")}`;
const uploadPath = join(ctx.uploadDir, "uploads", monthDir);
mkdirSync(uploadPath, { recursive: true });
const savePath = join(uploadPath, fileInfo.filename);
writeFileSync(savePath, fileData);
// Relative path for agent (relative to uploadDir / process.cwd())
const relativePath = join("uploads", monthDir, fileInfo.filename);
const sizeKB = fileInfo.size ? `${Math.round(fileInfo.size / 1024)} KB` : `${Math.round(fileData.length / 1024)} KB`;
log.info(`Saved to ${savePath}`);
await ctx.sendTyping(roomId, false);
const isDm = room.getJoinedMembers().length === 2;
const roomName = room.name || room.getCanonicalAlias() || roomId;
const text = [
`[File received: ${fileInfo.filename} (${fileInfo.mimetype}, ${sizeKB})`,
`Saved to: ${relativePath}`,
`Use shell tools to read it (pdftotext, cat, head, strings, etc.)]`,
].join("\n");
return {
channel: "matrix",
chatId: roomId,
userId: sender,
userName: room.getMember(sender)?.name || sender,
userHandle: sender,
messageId: event.getId() || undefined,
text,
timestamp: new Date(event.getTs()),
isGroup: !isDm,
groupName: isDm ? undefined : roomName,
};
} catch (err) {
log.error("Failed to process file:", err);
await ctx.sendTyping(roomId, false);
return null;
}
}
function extractFileInfo(content: Record<string, unknown>, eventId: string): FileInfo {
const file = content.file as Record<string, unknown> | undefined;
const url = content.url as string | undefined;
const info = content.info as Record<string, unknown> | undefined;
// Sanitize filename: strip path separators, collapse spaces
const rawName = (content.body as string | undefined) || eventId;
const filename = rawName
.replace(/[/\\]/g, "_")
.replace(/\s+/g, "_")
.replace(/[^a-zA-Z0-9._\-]/g, "_")
.slice(0, 200) || "file";
const mimetype = (info?.mimetype as string | undefined) || "application/octet-stream";
const size = info?.size as number | undefined;
if (file?.url) {
return {
mxcUrl: file.url as string,
encryptionInfo: {
key: file.key as { k: string },
iv: file.iv as string,
hashes: file.hashes as { sha256: string },
},
filename,
mimetype,
size,
};
}
if (url) {
return { mxcUrl: url, filename, mimetype, size };
}
return { filename, mimetype, size };
}

View File

@@ -0,0 +1,142 @@
import { createLogger } from '../../../logger.js';
const log = createLogger('MatrixImage');
/**
* Image Handler for Matrix Adapter
*
* Handles incoming image messages with pending queue pattern.
*/
import type * as sdk from "matrix-js-sdk";
import type { InboundMessage } from "../../../core/types.js";
import { downloadAndDecryptMedia, type EncryptionInfo } from "../media.js";
export interface ImageHandlerContext {
client: sdk.MatrixClient;
room: sdk.Room;
event: sdk.MatrixEvent;
ourUserId: string;
imageMaxSize: number;
// Callbacks
sendTyping: (roomId: string, typing: boolean) => Promise<void>;
sendMessage: (roomId: string, text: string) => Promise<void>;
addReaction: (roomId: string, eventId: string, emoji: string) => Promise<void>;
storePendingImage: (eventId: string, roomId: string, imageData: Buffer, format: string) => Promise<void>;
}
interface ImageInfo {
mxcUrl?: string;
encryptionInfo?: EncryptionInfo;
}
/**
* Handle image messages
*/
export async function handleImageMessage(
ctx: ImageHandlerContext,
): Promise<InboundMessage | null> {
const { client, room, event, ourUserId, imageMaxSize } = ctx;
const sender = event.getSender();
const roomId = room.roomId;
const eventId = event.getId();
if (!sender || !roomId || !eventId) return null;
log.info(`Image from ${sender} in ${roomId}`);
// Send typing indicator (image processing takes time)
await ctx.sendTyping(roomId, true);
try {
const content = event.getContent();
// Get image URL and encryption info
const imageInfo = extractImageInfo(content);
if (!imageInfo.mxcUrl) {
throw new Error("No image URL found");
}
// Add ✅ reaction BEFORE download (so user sees we got it)
await ctx.addReaction(roomId, eventId, "✅");
// Download and decrypt image (handles auth + E2EE)
let imageData = await downloadAndDecryptMedia(client, imageInfo.mxcUrl, imageInfo.encryptionInfo);
log.info(`Downloaded ${imageData.length} bytes`);
// Detect format
const format = detectImageFormat(imageData);
log.info(`Format: ${format}`);
// Process image (placeholder - would resize with sharp)
// For now, just validate size
if (imageData.length > 10 * 1024 * 1024) {
throw new Error("Image too large (max 10MB)");
}
// Stop typing
await ctx.sendTyping(roomId, false);
// Store pending image
await ctx.storePendingImage(eventId, roomId, imageData, format);
log.info(`Image queued, awaiting text`);
// Return null - image is pending, will be combined with next text
return null;
} catch (err) {
log.error("Failed to process image:", err);
await ctx.sendTyping(roomId, false);
await ctx.sendMessage(roomId, `Failed to process image: ${err instanceof Error ? err.message : "Unknown error"}`);
return null;
}
}
function extractImageInfo(content: Record<string, unknown>): ImageInfo {
const file = content.file as Record<string, unknown> | undefined;
const url = content.url as string | undefined;
if (file?.url) {
return {
mxcUrl: file.url as string,
encryptionInfo: {
key: file.key as { k: string },
iv: file.iv as string,
hashes: file.hashes as { sha256: string },
},
};
}
if (url) {
return { mxcUrl: url };
}
return {};
}
function detectImageFormat(data: Buffer): string {
if (data.length < 4) return "unknown";
// JPEG: FF D8 FF
if (data[0] === 0xff && data[1] === 0xd8 && data[2] === 0xff) {
return "jpeg";
}
// PNG: 89 50 4E 47
if (data[0] === 0x89 && data[1] === 0x50 && data[2] === 0x4e && data[3] === 0x47) {
return "png";
}
// GIF: 47 49 46 38
if (data[0] === 0x47 && data[1] === 0x49 && data[2] === 0x46 && data[3] === 0x38) {
return "gif";
}
// WebP: 52 49 46 46 ... 57 45 42 50
if (data[0] === 0x52 && data[1] === 0x49 && data[2] === 0x46 && data[3] === 0x46) {
return "webp";
}
return "unknown";
}

View File

@@ -0,0 +1,102 @@
import { createLogger } from '../../../logger.js';
const log = createLogger('MatrixInvite');
/**
* Invite Handler
*
* Handles room membership events (invites, joins, leaves).
*/
import type * as sdk from "matrix-js-sdk";
import type { DmPolicy } from "../../../pairing/types.js";
interface InviteHandlerContext {
client: sdk.MatrixClient;
event: sdk.MatrixEvent;
member: sdk.RoomMember;
dmPolicy: DmPolicy;
allowedUsers: string[];
autoAccept: boolean;
storage?: Record<string, never>; // reserved for future use
ourUserId?: string;
}
/**
* Handle a room membership event
*/
export async function handleMembershipEvent(ctx: InviteHandlerContext): Promise<void> {
const { client, event, member, dmPolicy, allowedUsers, autoAccept, storage, ourUserId } = ctx;
const membership = member.membership;
const sender = event.getSender();
if (!sender) return;
switch (membership) {
case "invite":
await handleInvite(client, member, sender, dmPolicy, allowedUsers, autoAccept);
break;
case "join":
handleJoin(member);
break;
case "leave":
handleLeave(member, storage, ourUserId);
break;
}
}
/**
* Handle an invite
*/
async function handleInvite(
client: sdk.MatrixClient,
member: sdk.RoomMember,
sender: string,
dmPolicy: DmPolicy,
allowedUsers: string[],
autoAccept: boolean,
): Promise<void> {
log.info(`Received invite to ${member.roomId} from ${sender}`);
if (!autoAccept) {
log.info(`Auto-accept disabled, ignoring invite`);
return;
}
// Check if we should accept based on policy
if (dmPolicy === "allowlist") {
const isAllowed = allowedUsers.includes(sender);
if (!isAllowed) {
log.info(`Rejecting invite from non-allowed user: ${sender}`);
return;
}
}
try {
await client.joinRoom(member.roomId);
log.info(`Joined room: ${member.roomId}`);
} catch (err) {
log.error(`Failed to join room: ${err}`);
}
}
/**
* Handle a join
*/
function handleJoin(member: sdk.RoomMember): void {
log.info(`User ${member.userId} joined ${member.roomId}`);
}
/**
* Handle a leave
*/
function handleLeave(
member: sdk.RoomMember,
_storage?: Record<string, never>,
ourUserId?: string,
): void {
log.info(`User ${member.userId} left ${member.roomId}`);
if (ourUserId && member.userId === ourUserId) {
log.info(`Our user left room ${member.roomId}`);
// Conversation history is managed by bot.ts per-chat mode — no local cleanup needed
}
}

View File

@@ -0,0 +1,191 @@
/**
* Message Handler
*
* Handles text messages and access control for Matrix.
*/
import type * as sdk from "matrix-js-sdk";
import type { InboundMessage } from "../../../core/types.js";
import type { DmPolicy } from "../../../pairing/types.js";
import { upsertPairingRequest } from "../../../pairing/store.js";
import { checkDmAccess } from "../../shared/access-control.js";
import { formatMatrixHTML } from "../html-formatter.js";
import { createLogger } from "../../../logger.js";
const log = createLogger('MatrixMessage');
interface MessageHandlerContext {
client: sdk.MatrixClient;
room: sdk.Room;
event: sdk.MatrixEvent;
ourUserId: string;
config: {
selfChatMode: boolean;
dmPolicy: DmPolicy;
allowedUsers: string[];
};
sendMessage: (roomId: string, text: string) => Promise<void>;
onCommand?: (command: string, chatId?: string, args?: string) => Promise<string | null>;
// !commands processor — handles pause/resume/status/ignorebot-add/ignorebot-remove/heartbeat/restore/turns
commandProcessor?: {
handleCommand(
body: string,
roomId: string,
sender: string,
roomMeta?: { isDm: boolean; roomName: string },
): Promise<string | undefined>;
isRoomPaused(roomId: string): boolean;
isIgnoredBot(userId: string): boolean;
shouldRespondToBot(roomId: string, body: string, ourUserId: string): boolean;
};
}
/**
* Handle a text message event
*/
export async function handleTextMessage(
ctx: MessageHandlerContext,
): Promise<InboundMessage | null> {
const { client, room, event, ourUserId, config, sendMessage, onCommand } = ctx;
const sender = event.getSender();
const content = event.getClearContent() || event.getContent();
const body = content.body as string;
if (!sender || !body) return null;
// Skip our own messages
if (sender === ourUserId) return null;
// Multi-bot rooms: determine observer mode for known bots
let observeOnly = false;
if (ctx.commandProcessor?.isIgnoredBot(sender)) {
if (!ctx.commandProcessor.shouldRespondToBot(room.roomId, body, ourUserId)) {
observeOnly = true; // forward to Letta for context, suppress response delivery
}
// else: Bot is @mentioning us or !turns is active — process normally (observeOnly stays false)
}
// Observer messages skip access check, commands, and paused check —
// they go straight to Letta for context building with no side effects.
if (!observeOnly) {
// Check self-chat mode
if (!config.selfChatMode && sender === ourUserId) {
return null;
}
// Handle slash commands
if (body.startsWith("/")) {
const result = await handleCommand(body, room.roomId, onCommand);
if (result) {
await sendMessage(room.roomId, result);
return null;
}
}
// Check access control
const access = await checkDmAccess('matrix', sender, config.dmPolicy, config.allowedUsers);
if (access === "blocked") {
await sendMessage(room.roomId, "Sorry, you're not authorized to use this bot.");
return null;
}
if (access === "pairing") {
const { code, created } = await upsertPairingRequest("matrix", sender, {
firstName: extractDisplayName(sender),
});
if (!code) {
await sendMessage(
room.roomId,
"Too many pending pairing requests. Please try again later.",
);
return null;
}
if (created) {
const pairingMessage = `Hi! This bot requires pairing.
Your code: *${code}*
Ask the owner to run:
\`lettabot pairing approve matrix ${code}\`
This code expires in 1 hour.`;
await sendMessage(room.roomId, pairingMessage);
}
return null;
}
// Handle !commands — only reachable if sender passed access check above
if (body.startsWith("!") && ctx.commandProcessor) {
const isDm = isDirectMessage(room);
const roomMeta = { isDm, roomName: room.name || room.roomId };
const reply = await ctx.commandProcessor.handleCommand(body, room.roomId, sender, roomMeta);
if (reply !== undefined) {
if (reply) await sendMessage(room.roomId, reply);
return null;
}
// Unrecognized !command — fall through to Letta as normal text
}
// Drop message if room is paused (allowed users handled commands above so !resume still works)
if (ctx.commandProcessor?.isRoomPaused(room.roomId)) return null;
}
// Build inbound message
const isDm = isDirectMessage(room);
const messageId = event.getId();
if (!messageId) {
log.warn(`[MatrixMessage] No messageId for event in room ${room.roomId} (${isDm ? 'DM' : 'group'}), sender=${sender}, body length=${body.length}`);
}
const message: InboundMessage = {
channel: "matrix",
chatId: room.roomId,
userId: sender,
userName: extractDisplayName(sender),
userHandle: sender,
messageId: messageId || undefined,
text: body,
timestamp: new Date(event.getTs()),
isGroup: !isDm,
groupName: isDm ? undefined : room.name,
};
return message;
}
/**
* Handle a slash command
*/
async function handleCommand(
command: string,
chatId: string,
onCommand?: (command: string, chatId?: string, args?: string) => Promise<string | null>,
): Promise<string | null> {
if (!onCommand) return null;
const parts = command.slice(1).trim().split(/\s+/);
const cmd = parts[0];
const args = parts.slice(1).join(' ') || undefined;
return await onCommand(cmd, chatId, args);
}
/**
* Check if a room is a direct message
*/
function isDirectMessage(room: sdk.Room): boolean {
const members = room.getJoinedMembers();
return members.length === 2;
}
/**
* Extract display name from Matrix user ID
*/
function extractDisplayName(userId: string): string {
// Extract from @user:server format
const match = userId.match(/^@([^:]+):/);
return match ? match[1] : userId;
}

View File

@@ -0,0 +1,106 @@
import { createLogger } from '../../../logger.js';
const log = createLogger('MatrixReaction');
/**
* Reaction Handler
*
* Handles emoji reactions on bot messages:
* - 👍/❤️/👏/🎉 → positive feedback to Letta
* - 👎/😢/😔/❌ → negative feedback to Letta
* - 🎤 → regenerate TTS audio
*/
import type * as sdk from "matrix-js-sdk";
import { POSITIVE_REACTIONS, NEGATIVE_REACTIONS, SPECIAL_REACTIONS } from "../types.js";
import type { MatrixStorage } from "../storage.js";
import { Letta } from '@letta-ai/letta-client';
interface ReactionHandlerContext {
client: sdk.MatrixClient;
event: sdk.MatrixEvent;
ourUserId: string;
storage: MatrixStorage;
sendMessage: (roomId: string, text: string) => Promise<void>;
regenerateTTS: (text: string, roomId: string) => Promise<void>;
// Forward non-special reactions to the Letta agent so it can see and respond to them
forwardToLetta?: (text: string, roomId: string, sender: string) => Promise<void>;
// Check if a ✅ reaction targets a pending image — if so, trigger the image send
sendPendingImageToAgent?: (targetEventId: string, roomId: string, sender: string) => boolean;
}
export async function handleReactionEvent(ctx: ReactionHandlerContext): Promise<void> {
const { event, ourUserId, storage } = ctx;
const content = event.getContent();
const relatesTo = content["m.relates_to"];
if (!relatesTo || relatesTo.rel_type !== "m.annotation") return;
const reactionKey = relatesTo.key as string;
const targetEventId = relatesTo.event_id as string;
const sender = event.getSender();
const roomId = event.getRoomId();
// Ignore reactions from the bot itself
if (sender === ourUserId) return;
log.info(`${reactionKey} on ${targetEventId} from ${sender}`);
// Handle 🎤 → regenerate TTS
if (reactionKey === SPECIAL_REACTIONS.REGENERATE_AUDIO) {
const originalText = storage.getOriginalTextForAudio(targetEventId);
if (originalText && roomId) {
log.info("Regenerating TTS audio");
await ctx.regenerateTTS(originalText, roomId);
} else {
log.info("No original text found for audio event");
}
return;
}
// Handle feedback reactions (👍/👎 etc.)
if (POSITIVE_REACTIONS.has(reactionKey) || NEGATIVE_REACTIONS.has(reactionKey)) {
const isPositive = POSITIVE_REACTIONS.has(reactionKey);
const score = isPositive ? 1.0 : -1.0;
const stepIds = storage.getStepIdsForEvent(targetEventId);
if (stepIds.length > 0) {
const agentId = process.env.LETTA_AGENT_ID;
if (agentId) {
const client = new Letta({ apiKey: process.env.LETTA_API_KEY || '', baseURL: process.env.LETTA_BASE_URL || 'https://api.letta.com' });
for (const stepId of stepIds) {
try {
await client.steps.feedback.create(stepId, { feedback: isPositive ? 'positive' : 'negative' });
log.info(`Feedback ${isPositive ? "+" : "-"} for step ${stepId}: sent`);
} catch (err) {
log.warn(`Feedback for step ${stepId} failed:`, err);
}
}
}
} else {
log.info(`No step IDs mapped for event ${targetEventId}`);
}
// Feedback reactions are still forwarded to Letta so the agent is aware
}
// ✅ on a pending image → trigger multimodal send (Python bridge parity)
// The pending image stays in the buffer; bot.ts will pick it up via getPendingImage(chatId)
if (reactionKey === '✅' && ctx.sendPendingImageToAgent && sender && roomId) {
const triggered = ctx.sendPendingImageToAgent(targetEventId, roomId, sender);
if (triggered) {
log.info(`✅ triggered pending image send for ${targetEventId}`);
return; // Don't forward as a regular reaction
}
}
// Forward ALL reactions (including feedback ones) to Letta so the agent can see them
// Format matches Python bridge: "🎭 {sender} reacted with: {emoji}"
if (ctx.forwardToLetta && sender && roomId) {
const reactionMsg = `🎭 ${sender} reacted with: ${reactionKey}`;
log.info(`Forwarding to Letta: ${reactionMsg}`);
await ctx.forwardToLetta(reactionMsg, roomId, sender).catch((err) => {
log.warn("Failed to forward reaction to Letta:", err);
});
}
}
export function isSpecialReaction(reaction: string): boolean {
return Object.values(SPECIAL_REACTIONS).includes(reaction as any);
}

View File

@@ -0,0 +1,159 @@
/**
* Matrix HTML Formatter
*
* Converts markdown and special syntax to Matrix HTML format.
* Supports spoilers, colors, and other Matrix-specific formatting.
*/
import { MATRIX_HTML_FORMAT, MATRIX_COLORS } from "./types.js";
import { EMOJI_ALIASES as EMOJI_ALIAS_TO_UNICODE } from "../shared/emoji.js";
interface FormattedMessage {
plain: string;
html: string;
}
/**
* Format text with Matrix HTML
*/
export function formatMatrixHTML(text: string): FormattedMessage {
// Convert emoji shortcodes first (before HTML escaping)
let plain = convertEmojiShortcodes(text);
let html = escapeHtml(plain);
// Convert **bold**
html = html.replace(/\*\*(.+?)\*\*/g, "<strong>$1</strong>");
plain = plain.replace(/\*\*(.+?)\*\*/g, "$1");
// Convert *italic*
html = html.replace(/\*(.+?)\*/g, "<em>$1</em>");
plain = plain.replace(/\*(.+?)\*/g, "$1");
// Convert ```code blocks``` FIRST (before single-backtick, or the single-backtick
// regex will consume the leading/trailing backticks of the fence and break it)
html = html.replace(/```(\w*)\n?([\s\S]*?)```/g, (_, lang, code) => {
const langAttr = lang ? ` class="language-${lang}"` : "";
return `<pre><code${langAttr}>${code}</code></pre>`;
});
// Convert `code` (single backtick — runs AFTER triple-backtick to avoid interference)
html = html.replace(/`([^`]+)`/g, "<code>$1</code>");
// Convert spoilers ||text||
html = html.replace(/\|\|(.+?)\|\|/g, '<span data-mx-spoiler>$1</span>');
plain = plain.replace(/\|\|(.+?)\|\|/g, "[spoiler]");
// Convert colors {color|text}
html = html.replace(/\{([^}|]+)\|([^}]+)\}/g, (match, color, content) => {
const hexColor = getColorHex(color.trim());
// `content` is already HTML-escaped (escapeHtml ran on the full string above)
// — do NOT call escapeHtml again or apostrophes become &amp;#039;
return `<font color="${hexColor}" data-mx-color="${hexColor}">${content}</font>`;
});
plain = plain.replace(/\{[^}|]+\|([^}]+)\}/g, "$1");
// Convert links
html = html.replace(
/(https?:\/\/[^\s]+)/g,
'<a href="$1">$1</a>',
);
// Convert newlines to <br>
html = html.replace(/\n/g, "<br>");
return { plain, html };
}
/**
* Escape HTML special characters
*/
function escapeHtml(text: string): string {
return text
.replace(/&/g, "&amp;")
.replace(/</g, "&lt;")
.replace(/>/g, "&gt;")
.replace(/"/g, "&quot;")
.replace(/'/g, "&#039;");
}
/**
* Get hex color from name or return as-is if already hex
*/
function getColorHex(color: string): string {
// Check if it's already a hex color
if (color.startsWith("#")) {
return color;
}
// Check predefined colors
const upperColor = color.toUpperCase();
if (upperColor in MATRIX_COLORS) {
return MATRIX_COLORS[upperColor as keyof typeof MATRIX_COLORS];
}
// Default to white if unknown
return MATRIX_COLORS.WHITE;
}
/**
* Convert emoji shortcodes to Unicode using the unified emoji map.
* Handles both :colon: wrapped and plain aliases.
*/
export function convertEmojiShortcodes(text: string): string {
let result = text;
// Match :shortcode: pattern and replace with unicode
result = result.replace(/:([a-z0-9_+-]+):/gi, (match, name) => {
const lowerName = name.toLowerCase();
// Try direct lookup
if (EMOJI_ALIAS_TO_UNICODE[lowerName]) {
return EMOJI_ALIAS_TO_UNICODE[lowerName];
}
// Try with hyphens replaced by underscores
const withUnderscores = lowerName.replace(/-/g, '_');
if (EMOJI_ALIAS_TO_UNICODE[withUnderscores]) {
return EMOJI_ALIAS_TO_UNICODE[withUnderscores];
}
// Not found, return original
return match;
});
return result;
}
/**
* Create a Matrix mention pill
*/
export function createMentionPill(userId: string, displayName?: string): string {
const name = displayName || userId;
return `<a href="https://matrix.to/#/${userId}">${escapeHtml(name)}</a>`;
}
/**
* Create a room mention pill
*/
export function createRoomPill(roomId: string, roomName?: string): string {
const name = roomName || roomId;
return `<a href="https://matrix.to/#/${roomId}">${escapeHtml(name)}</a>`;
}
/**
* Format a quote (blockquote)
*/
export function formatQuote(text: string): FormattedMessage {
const lines = text.split("\n");
const plain = lines.map((line) => `> ${line}`).join("\n");
const html = `<blockquote>${escapeHtml(text).replace(/\n/g, "<br>")}</blockquote>`;
return { plain, html };
}
/**
* Format a list
*/
export function formatList(items: string[], ordered = false): FormattedMessage {
const plain = items.map((item, i) => `${ordered ? `${i + 1}.` : "-"} ${item}`).join("\n");
const tag = ordered ? "ol" : "ul";
const htmlItems = items.map((item) => `<li>${escapeHtml(item)}</li>`).join("");
const html = `<${tag}>${htmlItems}</${tag}>`;
return { plain, html };
}

View File

@@ -0,0 +1,12 @@
/**
* Matrix Channel Adapter
*
* Full-featured Matrix adapter with E2EE, voice transcription, TTS,
* image handling, and interactive reactions.
*
* This replaces the basic matrix-bot-sdk implementation with a
* comprehensive matrix-js-sdk-based adapter.
*/
export { MatrixAdapter } from "./adapter.js";
export type { MatrixAdapterConfig } from "./types.js";

View File

@@ -0,0 +1,79 @@
import { createLogger } from '../../logger.js';
const log = createLogger('IndexedDB');
/**
* IndexedDB Polyfill for Node.js — Persistent SQLite Backend
*
* Uses indexeddbshim (backed by sqlite3) to provide a REAL persistent IndexedDB
* implementation. Crypto keys, sessions, and device state are written to SQLite
* databases in databaseDir and survive process restarts.
*
* This replaces the previous fake-indexeddb (in-memory only) approach.
* With persistence, the bot keeps the same device identity across restarts —
* no re-verification needed after code changes.
*
* Storage: {databaseDir}/*.db (one SQLite file per IDB database name)
*/
import { existsSync, mkdirSync } from "node:fs";
interface PolyfillOptions {
databaseDir: string;
}
let initialized = false;
/**
* Initialize IndexedDB polyfill with persistent SQLite backend
*
* @param options.databaseDir - Directory where SQLite .db files are stored
*/
export async function initIndexedDBPolyfill(options: PolyfillOptions): Promise<void> {
if (initialized) {
log.info("Polyfill already initialized");
return;
}
const { databaseDir } = options;
// Ensure directory exists
if (!existsSync(databaseDir)) {
mkdirSync(databaseDir, { recursive: true });
}
try {
// indexeddbshim v16 — SQLite-backed IndexedDB for Node.js
// Sets global.indexedDB, global.IDBKeyRange, etc.
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore — indexeddbshim lacks type declarations
const { default: setGlobalVars } = await import("indexeddbshim/src/node.js");
setGlobalVars(null, {
checkOrigin: false, // no origin checks in Node.js
databaseBasePath: databaseDir, // where SQLite .db files live
deleteDatabaseFiles: false, // preserve data across restarts
});
initialized = true;
log.info(`Persistent SQLite backend initialized at ${databaseDir}`);
log.info("Crypto state will survive process restarts");
} catch (err) {
log.error("Failed to initialize persistent backend:", err);
log.warn("Falling back to fake-indexeddb (in-memory, ephemeral)");
try {
// @ts-expect-error - no types for auto import
await import("fake-indexeddb/auto");
initialized = true;
log.info("Fallback: in-memory IndexedDB (keys lost on restart)");
} catch (fallbackErr) {
log.error("Fallback also failed:", fallbackErr);
}
}
}
/**
* Check if IndexedDB polyfill is available
*/
export function isIndexedDBAvailable(): boolean {
return initialized && typeof (global as any).indexedDB !== "undefined";
}

View File

@@ -0,0 +1,146 @@
import { createLogger } from '../../logger.js';
const log = createLogger('MatrixMedia');
/**
* Matrix Media Download Utilities
*
* Handles authenticated media downloads and E2EE attachment decryption.
*
* Matrix spec v1.11 moved media to authenticated endpoints:
* /_matrix/client/v1/media/download/{serverName}/{mediaId}
*
* E2EE attachments use AES-256-CTR encryption with:
* - Key: base64url-encoded 256-bit AES key (file.key.k)
* - IV: base64-encoded 128-bit counter block (file.iv)
* - Hash: SHA-256 of encrypted data (file.hashes.sha256)
*/
import type * as sdk from "matrix-js-sdk";
import { webcrypto } from "crypto";
export interface EncryptionInfo {
key: { k: string };
iv: string;
hashes: { sha256: string };
}
/**
* Download a Matrix media file with authentication.
* Tries the authenticated v1 endpoint first, falls back to v3.
*/
export async function downloadMatrixMedia(
client: sdk.MatrixClient,
mxcUrl: string,
): Promise<Buffer> {
if (!mxcUrl.startsWith("mxc://")) {
throw new Error(`Invalid MXC URL: ${mxcUrl}`);
}
// Parse mxc://serverName/mediaId
const withoutScheme = mxcUrl.slice("mxc://".length);
const slashIndex = withoutScheme.indexOf("/");
if (slashIndex === -1) throw new Error(`Malformed MXC URL: ${mxcUrl}`);
const serverName = withoutScheme.slice(0, slashIndex);
const mediaId = withoutScheme.slice(slashIndex + 1);
const homeserver = (client as any).baseUrl || (client as any).getHomeserverUrl?.();
const accessToken = client.getAccessToken();
// Prefer authenticated endpoint (Matrix spec v1.11+)
const authUrl = `${homeserver}/_matrix/client/v1/media/download/${serverName}/${mediaId}`;
const fallbackUrl = `${homeserver}/_matrix/media/v3/download/${serverName}/${mediaId}`;
const headers: Record<string, string> = {};
if (accessToken) {
headers["Authorization"] = `Bearer ${accessToken}`;
}
for (const url of [authUrl, fallbackUrl]) {
try {
log.info(`Downloading from ${url.substring(0, 80)}...`);
const response = await fetch(url, { headers });
if (response.ok) {
const data = Buffer.from(await response.arrayBuffer());
log.info(`Downloaded ${data.length} bytes`);
return data;
}
log.info(`${url.includes("v1") ? "v1" : "v3"} returned ${response.status}, ${url.includes("v1") ? "trying v3..." : "giving up"}`);
} catch (err) {
log.info(`Request failed: ${err}`);
if (url === fallbackUrl) throw err;
}
}
throw new Error("Failed to download media from both endpoints");
}
/**
* Decrypt an AES-256-CTR encrypted Matrix attachment.
* Used for files in E2EE rooms.
*/
export async function decryptAttachment(
encryptedData: Buffer,
encInfo: EncryptionInfo,
): Promise<Buffer> {
const subtle = webcrypto.subtle;
// Decode base64url key (32 bytes for AES-256)
const keyBytes = Buffer.from(encInfo.key.k, "base64url");
// Decode base64 IV (16 bytes)
const iv = Buffer.from(encInfo.iv, "base64");
if (iv.length !== 16) {
throw new Error(`Invalid IV length: ${iv.length} (expected 16)`);
}
// Convert Buffer to plain ArrayBuffer for WebCrypto compatibility
// Buffer.buffer is ArrayBufferLike (may be SharedArrayBuffer), but .slice() always returns ArrayBuffer
const encryptedAB = encryptedData.buffer.slice(encryptedData.byteOffset, encryptedData.byteOffset + encryptedData.byteLength) as ArrayBuffer;
const ivAB = iv.buffer.slice(iv.byteOffset, iv.byteOffset + iv.byteLength) as ArrayBuffer;
const keyAB = keyBytes.buffer.slice(keyBytes.byteOffset, keyBytes.byteOffset + keyBytes.byteLength) as ArrayBuffer;
// Verify SHA-256 hash of encrypted data before decrypting
const hashBuffer = await subtle.digest("SHA-256", encryptedAB);
const hashB64 = Buffer.from(hashBuffer).toString("base64").replace(/=/g, "");
const expectedHash = encInfo.hashes.sha256.replace(/=/g, "");
if (hashB64 !== expectedHash) {
throw new Error(`SHA-256 hash mismatch: file may be corrupted`);
}
// Import AES-256-CTR key
const cryptoKey = await subtle.importKey(
"raw",
keyAB,
{ name: "AES-CTR", length: 256 },
false,
["decrypt"],
);
// Decrypt (AES-256-CTR, 64-bit counter = last 8 bytes of the 16-byte block)
const decrypted = await subtle.decrypt(
{ name: "AES-CTR", counter: new Uint8Array(ivAB), length: 64 },
cryptoKey,
encryptedAB,
);
return Buffer.from(decrypted);
}
/**
* Download and optionally decrypt a Matrix media attachment.
*/
export async function downloadAndDecryptMedia(
client: sdk.MatrixClient,
mxcUrl: string,
encryptionInfo?: EncryptionInfo,
): Promise<Buffer> {
const data = await downloadMatrixMedia(client, mxcUrl);
if (encryptionInfo) {
log.info(`Decrypting E2EE attachment...`);
const decrypted = await decryptAttachment(data, encryptionInfo);
log.info(`Decrypted: ${data.length}${decrypted.length} bytes`);
return decrypted;
}
return data;
}

View File

@@ -0,0 +1,146 @@
import { createLogger } from '../../logger.js';
const log = createLogger('CryptoStore');
/**
* Persistent Crypto Store for Node.js
*
* Wraps MemoryCryptoStore and serializes to disk on changes
*/
import { readFileSync, writeFileSync, existsSync, mkdirSync } from "node:fs";
import { dirname } from "node:path";
interface CryptoData {
deviceKeys?: Record<string, unknown>;
rooms?: Record<string, unknown>;
sessions?: Record<string, unknown>;
inboundGroupSessions?: Record<string, unknown>;
outboundGroupSessions?: Record<string, unknown>;
userDevices?: Record<string, unknown>;
crossSigningInfo?: unknown;
privateKeys?: Record<string, unknown>;
}
export class PersistentCryptoStore {
private data: CryptoData = {};
private filePath: string;
private memoryStore: any;
constructor(filePath: string) {
this.filePath = filePath;
this.loadFromDisk();
this.memoryStore = this.createMemoryStore();
}
private loadFromDisk(): void {
try {
if (existsSync(this.filePath)) {
const content = readFileSync(this.filePath, "utf-8");
this.data = JSON.parse(content);
log.info(`Loaded from ${this.filePath}`);
}
} catch (err) {
log.warn("Failed to load, starting fresh:", err);
this.data = {};
}
}
private saveToDisk(): void {
try {
const dir = dirname(this.filePath);
if (!existsSync(dir)) {
mkdirSync(dir, { recursive: true });
}
writeFileSync(this.filePath, JSON.stringify(this.data, null, 2));
} catch (err) {
log.error("Failed to save:", err);
}
}
private createMemoryStore(): any {
// Simple memory store implementation that syncs to disk
const store = {
getItem: (key: string) => {
return this.data[key as keyof CryptoData];
},
setItem: (key: string, value: any) => {
(this.data as any)[key] = value;
this.saveToDisk();
},
removeItem: (key: string) => {
delete (this.data as any)[key];
this.saveToDisk();
},
};
return store;
}
// Implement the CryptoStore interface
async getDeviceKeys(): Promise<Record<string, unknown> | null> {
return this.data.deviceKeys || null;
}
async setDeviceKeys(keys: Record<string, unknown>): Promise<void> {
this.data.deviceKeys = keys;
this.saveToDisk();
}
async getRoom(roomId: string): Promise<unknown | null> {
return this.data.rooms?.[roomId] || null;
}
async setRoom(roomId: string, data: unknown): Promise<void> {
if (!this.data.rooms) this.data.rooms = {};
this.data.rooms[roomId] = data;
this.saveToDisk();
}
async getSession(deviceKey: string, sessionId: string): Promise<unknown | null> {
return this.data.sessions?.[`${deviceKey}:${sessionId}`] || null;
}
async setSession(deviceKey: string, sessionId: string, data: unknown): Promise<void> {
if (!this.data.sessions) this.data.sessions = {};
this.data.sessions[`${deviceKey}:${sessionId}`] = data;
this.saveToDisk();
}
async getInboundGroupSession(roomId: string, sessionId: string): Promise<unknown | null> {
return this.data.inboundGroupSessions?.[`${roomId}:${sessionId}`] || null;
}
async setInboundGroupSession(roomId: string, sessionId: string, data: unknown): Promise<void> {
if (!this.data.inboundGroupSessions) this.data.inboundGroupSessions = {};
this.data.inboundGroupSessions[`${roomId}:${sessionId}`] = data;
this.saveToDisk();
}
async getUserDevices(userId: string): Promise<Record<string, unknown> | null> {
const devices = this.data.userDevices?.[userId] as Record<string, unknown> | undefined;
return devices !== undefined ? devices : null;
}
async setUserDevices(userId: string, devices: Record<string, unknown>): Promise<void> {
if (!this.data.userDevices) this.data.userDevices = {};
this.data.userDevices[userId] = devices;
this.saveToDisk();
}
async getCrossSigningInfo(): Promise<unknown | null> {
return this.data.crossSigningInfo || null;
}
async setCrossSigningInfo(info: unknown): Promise<void> {
this.data.crossSigningInfo = info;
this.saveToDisk();
}
async getPrivateKey(keyType: string): Promise<unknown | null> {
return this.data.privateKeys?.[keyType] || null;
}
async setPrivateKey(keyType: string, key: unknown): Promise<void> {
if (!this.data.privateKeys) this.data.privateKeys = {};
this.data.privateKeys[keyType] = key;
this.saveToDisk();
}
}

View File

@@ -0,0 +1,124 @@
import { createLogger } from '../../logger.js';
const log = createLogger('MatrixQueue');
/**
* Message Queue
*
* Handles message queuing for busy states with size limiting and retry logic.
*/
import type { QueueItem } from "./types.js";
interface QueueConfig {
maxSize?: number;
processIntervalMs?: number;
}
type QueueProcessor = (item: QueueItem) => Promise<void>;
export class MessageQueue {
private queue: QueueItem[] = [];
private maxSize: number;
private processIntervalMs: number;
private processor: QueueProcessor | null = null;
private intervalId: NodeJS.Timeout | null = null;
private processing = false;
constructor(config: QueueConfig = {}) {
this.maxSize = config.maxSize ?? 100;
this.processIntervalMs = config.processIntervalMs ?? 1000;
}
/**
* Add item to queue
*/
enqueue(item: QueueItem): boolean {
if (this.queue.length >= this.maxSize) {
log.warn("Queue full, dropping message");
return false;
}
this.queue.push(item);
log.info(`Enqueued message from ${item.sender} (queue size: ${this.queue.length})`);
return true;
}
/**
* Start processing queue
*/
startProcessing(processor: QueueProcessor): void {
if (this.intervalId) {
return; // Already running
}
this.processor = processor;
this.intervalId = setInterval(() => {
this.processNext();
}, this.processIntervalMs);
log.info("Started processing");
}
/**
* Stop processing queue
*/
stopProcessing(): void {
if (this.intervalId) {
clearInterval(this.intervalId);
this.intervalId = null;
}
this.processor = null;
log.info("Stopped processing");
}
/**
* Process next item in queue
*/
private async processNext(): Promise<void> {
if (this.processing || this.queue.length === 0 || !this.processor) {
return;
}
this.processing = true;
const item = this.queue.shift();
if (item) {
try {
await this.processor(item);
} catch (err) {
log.error("Failed to process item:", err);
// Could re-enqueue here if needed
}
}
this.processing = false;
}
/**
* Get current queue size
*/
getSize(): number {
return this.queue.length;
}
/**
* Clear all items from queue
*/
clear(): void {
this.queue = [];
log.info("Cleared all items");
}
/**
* Check if queue is full
*/
isFull(): boolean {
return this.queue.length >= this.maxSize;
}
/**
* Check if queue is empty
*/
isEmpty(): boolean {
return this.queue.length === 0;
}
}

View File

@@ -0,0 +1,164 @@
/**
* Matrix Session Manager
*
* Handles persistent session storage with backup/restore functionality.
*/
import { existsSync, mkdirSync, readFileSync, writeFileSync, renameSync, readdirSync, unlinkSync } from "node:fs";
import { dirname, join } from "node:path";
import { createLogger } from "../../logger.js";
import type { MatrixSession } from "./types.js";
const log = createLogger('MatrixSession');
interface SessionManagerConfig {
sessionFile: string;
backupCount?: number;
}
export class MatrixSessionManager {
private sessionFile: string;
private backupCount: number;
constructor(config: SessionManagerConfig) {
this.sessionFile = config.sessionFile;
this.backupCount = config.backupCount ?? 3;
// Ensure directory exists
const dir = dirname(this.sessionFile);
if (!existsSync(dir)) {
mkdirSync(dir, { recursive: true });
}
}
/**
* Load session from disk
*/
loadSession(): MatrixSession | null {
try {
if (!existsSync(this.sessionFile)) {
return null;
}
const data = readFileSync(this.sessionFile, "utf-8");
const session = JSON.parse(data) as MatrixSession;
// Validate required fields
if (!session.userId || !session.accessToken) {
log.warn("[MatrixSession] Invalid session data, ignoring");
return null;
}
log.info(`[MatrixSession] Loaded session for ${session.userId}`);
return session;
} catch (err) {
log.error("[MatrixSession] Failed to load session:", err);
return null;
}
}
/**
* Save session to disk with backup
*/
saveSession(session: MatrixSession): void {
try {
// Create backup of existing session
if (existsSync(this.sessionFile)) {
this.rotateBackups();
}
// Write new session atomically
const tempFile = `${this.sessionFile}.tmp`;
writeFileSync(tempFile, JSON.stringify(session, null, 2), { mode: 0o600 });
renameSync(tempFile, this.sessionFile);
log.info(`[MatrixSession] Saved session for ${session.userId}`);
} catch (err) {
log.error("[MatrixSession] Failed to save session:", err);
throw err;
}
}
/**
* Rotate backup files
*/
private rotateBackups(): void {
const dir = dirname(this.sessionFile);
const baseName = this.sessionFile.split("/").pop() || "session.json";
// Remove oldest backup
const oldestBackup = join(dir, `${baseName}.backup.${this.backupCount}`);
if (existsSync(oldestBackup)) {
unlinkSync(oldestBackup);
}
// Shift existing backups
for (let i = this.backupCount - 1; i >= 1; i--) {
const oldBackup = join(dir, `${baseName}.backup.${i}`);
const newBackup = join(dir, `${baseName}.backup.${i + 1}`);
if (existsSync(oldBackup)) {
renameSync(oldBackup, newBackup);
}
}
// Create new backup
const firstBackup = join(dir, `${baseName}.backup.1`);
renameSync(this.sessionFile, firstBackup);
}
/**
* Restore from most recent backup
*/
restoreFromBackup(): MatrixSession | null {
const dir = dirname(this.sessionFile);
const baseName = this.sessionFile.split("/").pop() || "session.json";
const firstBackup = join(dir, `${baseName}.backup.1`);
if (!existsSync(firstBackup)) {
log.warn("[MatrixSession] No backup available to restore");
return null;
}
try {
const data = readFileSync(firstBackup, "utf-8");
const session = JSON.parse(data) as MatrixSession;
log.info(`[MatrixSession] Restored from backup for ${session.userId}`);
return session;
} catch (err) {
log.error("[MatrixSession] Failed to restore from backup:", err);
return null;
}
}
/**
* Clear session and backups
*/
clearSession(): void {
try {
if (existsSync(this.sessionFile)) {
unlinkSync(this.sessionFile);
}
const dir = dirname(this.sessionFile);
const baseName = this.sessionFile.split("/").pop() || "session.json";
for (let i = 1; i <= this.backupCount; i++) {
const backup = join(dir, `${baseName}.backup.${i}`);
if (existsSync(backup)) {
unlinkSync(backup);
}
}
log.info("[MatrixSession] Cleared all sessions");
} catch (err) {
log.error("[MatrixSession] Failed to clear session:", err);
}
}
/**
* Check if session exists
*/
hasSession(): boolean {
return existsSync(this.sessionFile);
}
}

View File

@@ -0,0 +1,362 @@
/**
* Matrix Storage
*
* SQLite-based persistent storage for Matrix adapter state.
* Does NOT store room→conversation mappings — that is handled by bot.ts
* per-chat mode (key: 'matrix:{roomId}').
*/
import { existsSync, mkdirSync } from "node:fs";
import { dirname, join } from "node:path";
import Database from "better-sqlite3";
import { createLogger } from "../../logger.js";
const log = createLogger('MatrixStorage');
interface StorageConfig {
dataDir: string;
}
export class MatrixStorage {
private db: Database.Database | null = null;
private dataDir: string;
constructor(config: StorageConfig) {
this.dataDir = config.dataDir;
// Ensure directory exists
if (!existsSync(this.dataDir)) {
mkdirSync(this.dataDir, { recursive: true });
}
}
/**
* Initialize the database
*/
async init(): Promise<void> {
const dbPath = join(this.dataDir, "matrix.db");
this.db = new Database(dbPath);
// Enable WAL mode for better concurrency
this.db.pragma("journal_mode = WAL");
// Create tables
this.createTables();
log.info("[MatrixStorage] Database initialized");
}
/**
* Create database tables
*/
private createTables(): void {
if (!this.db) return;
// Message event mappings for reaction feedback
this.db.exec(`
CREATE TABLE IF NOT EXISTS message_mappings (
matrix_event_id TEXT PRIMARY KEY,
conversation_id TEXT NOT NULL,
step_id TEXT,
sender TEXT NOT NULL,
room_id TEXT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
`);
// Audio message mappings for TTS regeneration
this.db.exec(`
CREATE TABLE IF NOT EXISTS audio_messages (
audio_event_id TEXT PRIMARY KEY,
conversation_id TEXT NOT NULL,
room_id TEXT NOT NULL,
original_text TEXT NOT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
`);
// Per-room pause state (set via !pause / !resume)
this.db.exec(`
CREATE TABLE IF NOT EXISTS paused_rooms (
room_id TEXT PRIMARY KEY,
paused_by TEXT NOT NULL,
paused_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
`);
// Bot ignore list (set via !bot-add / !bot-remove, prevents message loops)
this.db.exec(`
CREATE TABLE IF NOT EXISTS ignored_bots (
user_id TEXT PRIMARY KEY,
added_by TEXT NOT NULL,
added_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
`);
this.db.exec(`
CREATE INDEX IF NOT EXISTS idx_msg_room ON message_mappings(room_id);
`);
}
/**
* Store message mapping for reaction tracking
*/
storeMessageMapping(
matrixEventId: string,
conversationId: string,
stepId: string | undefined,
sender: string,
roomId: string,
): void {
if (!this.db) {
log.warn('[MatrixStorage] storeMessageMapping: Database not initialized');
return;
}
try {
const stmt = this.db.prepare(`
INSERT INTO message_mappings (matrix_event_id, conversation_id, step_id, sender, room_id)
VALUES (?, ?, ?, ?, ?)
`);
stmt.run(matrixEventId, conversationId, stepId || null, sender, roomId);
} catch (err) {
log.error(`[MatrixStorage] storeMessageMapping failed for event ${matrixEventId}:`, err);
}
}
/**
* Get step IDs for a message event
*/
getStepIdsForEvent(matrixEventId: string): string[] {
if (!this.db) {
log.warn('[MatrixStorage] getStepIdsForEvent: Database not initialized');
return [];
}
try {
const stmt = this.db.prepare(
"SELECT step_id FROM message_mappings WHERE matrix_event_id = ? AND step_id IS NOT NULL",
);
const results = stmt.all(matrixEventId) as { step_id: string }[];
return results.map((r) => r.step_id);
} catch (err) {
log.error(`[MatrixStorage] getStepIdsForEvent failed for event ${matrixEventId}:`, err);
return [];
}
}
/**
* Store audio message for TTS regeneration
*/
storeAudioMessage(
audioEventId: string,
conversationId: string,
roomId: string,
originalText: string,
): void {
if (!this.db) {
log.warn('[MatrixStorage] storeAudioMessage: Database not initialized');
return;
}
try {
const stmt = this.db.prepare(`
INSERT INTO audio_messages (audio_event_id, conversation_id, room_id, original_text)
VALUES (?, ?, ?, ?)
`);
stmt.run(audioEventId, conversationId, roomId, originalText);
} catch (err) {
log.error(`[MatrixStorage] storeAudioMessage failed for event ${audioEventId}:`, err);
}
}
/**
* Get original text for audio message
*/
getOriginalTextForAudio(audioEventId: string): string | null {
if (!this.db) {
log.warn('[MatrixStorage] getOriginalTextForAudio: Database not initialized');
return null;
}
try {
const stmt = this.db.prepare(
"SELECT original_text FROM audio_messages WHERE audio_event_id = ?",
);
const result = stmt.get(audioEventId) as { original_text: string } | undefined;
return result?.original_text || null;
} catch (err) {
log.error(`[MatrixStorage] getOriginalTextForAudio failed for event ${audioEventId}:`, err);
return null;
}
}
// ─── Per-room pause state ─────────────────────────────────────────────────
pauseRoom(roomId: string, pausedBy: string): void {
if (!this.db) return;
this.db.prepare(
"INSERT INTO paused_rooms (room_id, paused_by) VALUES (?, ?) ON CONFLICT(room_id) DO UPDATE SET paused_by = excluded.paused_by, paused_at = CURRENT_TIMESTAMP",
).run(roomId, pausedBy);
}
resumeRoom(roomId: string): void {
if (!this.db) return;
this.db.prepare("DELETE FROM paused_rooms WHERE room_id = ?").run(roomId);
}
isRoomPaused(roomId: string): boolean {
if (!this.db) return false;
const result = this.db.prepare("SELECT 1 FROM paused_rooms WHERE room_id = ?").get(roomId);
return result !== undefined;
}
getPausedRooms(): string[] {
if (!this.db) return [];
const rows = this.db.prepare("SELECT room_id FROM paused_rooms").all() as { room_id: string }[];
return rows.map((r) => r.room_id);
}
// ─── Bot ignore list ───────────────────────────────────────────────────────
addIgnoredBot(userId: string, addedBy: string): void {
if (!this.db) return;
this.db.prepare(
"INSERT INTO ignored_bots (user_id, added_by) VALUES (?, ?) ON CONFLICT(user_id) DO UPDATE SET added_by = excluded.added_by, added_at = CURRENT_TIMESTAMP",
).run(userId, addedBy);
}
removeIgnoredBot(userId: string): void {
if (!this.db) return;
this.db.prepare("DELETE FROM ignored_bots WHERE user_id = ?").run(userId);
}
isIgnoredBot(userId: string): boolean {
if (!this.db) return false;
const result = this.db.prepare("SELECT 1 FROM ignored_bots WHERE user_id = ?").get(userId);
return result !== undefined;
}
getIgnoredBots(): string[] {
if (!this.db) return [];
const rows = this.db.prepare("SELECT user_id FROM ignored_bots").all() as { user_id: string }[];
return rows.map((r) => r.user_id);
}
// ─── Storage Pruning ───────────────────────────────────────────────────────
/**
* Prune old entries from audio_messages and message_mappings tables
* Returns array of {table, deletedCount} for each table pruned
* @param retentionDays - Delete entries older than this many days (default: 30)
* @returns Array of pruning results
*/
pruneOldEntries(retentionDays = 30): Array<{ table: string; deletedCount: number }> {
if (!this.db) return [];
const results: Array<{ table: string; deletedCount: number }> = [];
try {
// Calculate cutoff date
const cutoffDate = new Date();
cutoffDate.setDate(cutoffDate.getDate() - retentionDays);
const cutoffIso = cutoffDate.toISOString();
// Prune audio_messages table
const audioStmt = this.db.prepare(
"DELETE FROM audio_messages WHERE created_at < ?"
);
const audioResult = audioStmt.run(cutoffIso);
results.push({
table: 'audio_messages',
deletedCount: audioResult.changes
});
// Prune message_mappings table
const mappingStmt = this.db.prepare(
"DELETE FROM message_mappings WHERE created_at < ?"
);
const mappingResult = mappingStmt.run(cutoffIso);
results.push({
table: 'message_mappings',
deletedCount: mappingResult.changes
});
// Log results
if (results.some(r => r.deletedCount > 0)) {
const totalDeleted = results.reduce((sum, r) => sum + r.deletedCount, 0);
log.info(
`[MatrixStorage] Pruned ${totalDeleted} old entry/entries ` +
`(older than ${retentionDays} days): ` +
results.map(r => `${r.table}=${r.deletedCount}`).join(', ')
);
}
} catch (err) {
log.error('[MatrixStorage] Failed to prune old entries:', err);
}
return results;
}
/**
* Get pruning statistics
*/
getPruningStats(): {
audioMessagesCount: number;
messageMappingsCount: number;
oldestAudioMessage: string | null;
oldestMessageMapping: string | null;
} {
if (!this.db) {
return {
audioMessagesCount: 0,
messageMappingsCount: 0,
oldestAudioMessage: null,
oldestMessageMapping: null,
};
}
try {
const audioCountStmt = this.db.prepare("SELECT COUNT(*) as count FROM audio_messages");
const audioCount = (audioCountStmt.get() as { count: number })?.count || 0;
const mappingCountStmt = this.db.prepare("SELECT COUNT(*) as count FROM message_mappings");
const mappingCount = (mappingCountStmt.get() as { count: number })?.count || 0;
const oldestAudioStmt = this.db.prepare(
"SELECT MIN(created_at) as oldest FROM audio_messages"
);
const oldestAudio = (oldestAudioStmt.get() as { oldest: string })?.oldest || null;
const oldestMappingStmt = this.db.prepare(
"SELECT MIN(created_at) as oldest FROM message_mappings"
);
const oldestMapping = (oldestMappingStmt.get() as { oldest: string })?.oldest || null;
return {
audioMessagesCount: audioCount,
messageMappingsCount: mappingCount,
oldestAudioMessage: oldestAudio,
oldestMessageMapping: oldestMapping,
};
} catch (err) {
log.error('[MatrixStorage] Failed to get pruning stats:', err);
return {
audioMessagesCount: 0,
messageMappingsCount: 0,
oldestAudioMessage: null,
oldestMessageMapping: null,
};
}
}
/**
* Close the database
*/
close(): void {
if (this.db) {
this.db.close();
this.db = null;
}
}
}

View File

@@ -0,0 +1,57 @@
/**
* Speech-to-Text (STT) for Matrix Adapter
*/
import { createLogger } from "../../logger.js";
const log = createLogger('MatrixSTT');
export interface STTConfig {
url?: string;
language?: string;
model?: string;
}
export interface STTResult {
text: string;
language?: string;
}
export async function transcribeAudio(
audioData: Buffer,
config: STTConfig,
): Promise<string> {
const url = config.url || "http://localhost:7862";
const model = config.model || "small";
log.info(`[MatrixSTT] Transcribing ${audioData.length} bytes`);
try {
const formData = new FormData();
const blob = new Blob([new Uint8Array(audioData)], { type: "audio/mpeg" });
formData.append("audio", blob, "audio.mp3");
if (config.language) {
formData.append("language", config.language);
}
formData.append("model", model);
const response = await fetch(`${url}/transcribe`, {
method: "POST",
body: formData,
});
if (!response.ok) {
throw new Error(`STT API error: ${response.status}`);
}
const result = (await response.json()) as STTResult;
return result.text?.trim() || "";
} catch (err) {
log.error("[MatrixSTT] Failed:", err);
return `[STT Error] ${err instanceof Error ? err.message : "Unknown"}`;
}
}
export function isTranscriptionFailed(text: string): boolean {
return text.startsWith("[") && text.includes("Error");
}

179
src/channels/matrix/tts.ts Normal file
View File

@@ -0,0 +1,179 @@
/**
* Text-to-Speech (TTS) for Matrix Adapter
*
* Synthesizes text to speech using VibeVoice API.
*/
import { createLogger } from "../../logger.js";
const log = createLogger('MatrixTTS');
export interface TTSConfig {
url?: string;
voice?: string;
format?: "mp3" | "wav";
speed?: number;
sampleRate?: number;
}
export interface VoiceInfo {
id: string;
name: string;
language: string;
gender?: string;
}
// Pronunciation fixes applied before TTS — word-boundary replacements
const PRONUNCIATION_MAP: Record<string, string> = {
// Names
"Xzaviar": "X-zay-V-ar",
"xzaviar": "X-zay-V-ar",
"Jean Luc": "Zhan-Look",
"jean luc": "Zhan-Look",
"Sebastian": "Se-BASS-chen",
"sebastian": "Se-BASS-chen",
// Technical terms that TTS often mangles
"API": "A P I",
"SDK": "S D K",
"E2EE": "end-to-end encrypted",
"TTS": "text to speech",
"STT": "speech to text",
};
/**
* Apply pronunciation fixes using word-boundary regex
*/
function applyPronunciationFixes(text: string): string {
let result = text;
for (const [wrong, right] of Object.entries(PRONUNCIATION_MAP)) {
result = result.replace(new RegExp(`\\b${escapeRegExp(wrong)}\\b`, "gi"), right);
}
return result;
}
/**
* Clean text for TTS synthesis — matches Python bridge synthesize_speech() cleaning.
* Call order: control tags → HTML → markdown → code blocks → emojis → pronunciation → whitespace.
*/
export function cleanTextForTTS(text: string): string {
let cleaned = text;
// Strip agent control tags
cleaned = cleaned.replace(/\[silent\]/gi, "");
cleaned = cleaned.replace(/\[chromatophore\]/gi, "");
cleaned = cleaned.replace(/\[!c\]/gi, "");
cleaned = cleaned.replace(/\[!s\]/gi, "");
cleaned = cleaned.replace(/\[react:[^\]]*\]/gi, "");
// Strip color syntax {color|text} → keep text
cleaned = cleaned.replace(/\{[^}|]+\|([^}]+)\}/g, "$1");
// Strip spoilers ||text|| → keep text
cleaned = cleaned.replace(/\|\|(.+?)\|\|/gs, "$1");
// Strip HTML tags (keep content)
cleaned = cleaned.replace(/<[^>]+>/g, "");
// Remove code blocks entirely (don't read code aloud)
cleaned = cleaned.replace(/```[\s\S]*?```/g, "");
// Strip bold and italic markers (keep text)
cleaned = cleaned.replace(/\*\*(.+?)\*\*/g, "$1");
cleaned = cleaned.replace(/\*(.+?)\*/g, "$1");
// Remove inline code markers (keep content)
cleaned = cleaned.replace(/`([^`]+)`/g, "$1");
// Convert markdown links to spoken form: [text](url) → text
cleaned = cleaned.replace(/\[([^\]]+)\]\([^)]+\)/g, "$1");
// Emoji handling: preserve ✨ and 🎤, strip everything else
// Use marker swap trick from Python bridge
const SPARKLE_MARKER = "__SPARKLE__";
const MIC_MARKER = "__MIC__";
cleaned = cleaned.replace(/✨/g, SPARKLE_MARKER);
cleaned = cleaned.replace(/🎤/g, MIC_MARKER);
// Strip remaining emoji (broad Unicode ranges)
cleaned = cleaned.replace(
/[\u{1F600}-\u{1F64F}\u{1F300}-\u{1F5FF}\u{1F680}-\u{1F6FF}\u{1F1E0}-\u{1F1FF}\u{2702}-\u{27B0}\u{24C2}-\u{1F251}\u{1F900}-\u{1FAFF}]/gu,
""
);
cleaned = cleaned.replace(new RegExp(SPARKLE_MARKER, "g"), "✨");
cleaned = cleaned.replace(new RegExp(MIC_MARKER, "g"), "🎤");
// Apply pronunciation fixes
cleaned = applyPronunciationFixes(cleaned);
// Collapse whitespace
cleaned = cleaned.replace(/\s+/g, " ").trim();
return cleaned;
}
function escapeRegExp(string: string): string {
return string.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
}
/**
* Synthesize speech from text using VibeVoice API
*/
export async function synthesizeSpeech(
text: string,
config: TTSConfig,
): Promise<Buffer> {
const url = config.url || "http://10.10.20.19:7861";
const voice = config.voice || "en-Soother_woman";
const format = config.format || "mp3";
const speed = config.speed || 1.0;
const sampleRate = config.sampleRate || 22050;
const cleanedText = cleanTextForTTS(text);
log.info(`[MatrixTTS] Synthesizing: ${cleanedText.slice(0, 50)}...`);
log.info(`[MatrixTTS] Voice: ${voice}, Format: ${format}`);
try {
const response = await fetch(`${url}/audio/speech`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
input: cleanedText,
voice: voice,
model: "vibevoice-v1",
}),
});
if (!response.ok) {
throw new Error(`TTS API error: ${response.status} ${response.statusText}`);
}
const audioBuffer = Buffer.from(await response.arrayBuffer());
log.info(`[MatrixTTS] Synthesized ${audioBuffer.length} bytes`);
return audioBuffer;
} catch (err) {
log.error("[MatrixTTS] Failed to synthesize:", err);
throw err;
}
}
/**
* Get available voices from VibeVoice API
*/
export async function getAvailableVoices(url?: string): Promise<VoiceInfo[]> {
const apiUrl = url || "http://10.10.20.19:7861";
try {
const response = await fetch(`${apiUrl}/voices`);
if (!response.ok) {
throw new Error(`TTS API error: ${response.status} ${response.statusText}`);
}
const voices = (await response.json()) as VoiceInfo[];
log.info(`[MatrixTTS] Found ${voices.length} voices`);
return voices;
} catch (err) {
log.error("[MatrixTTS] Failed to get voices:", err);
// Return default voice info as fallback
return [{ id: "en_soothing", name: "Soothing English", language: "en" }];
}
}

View File

@@ -0,0 +1,195 @@
/**
* Matrix Adapter Types
*
* Shared types, constants, and interfaces for the Matrix adapter.
*/
import type { DmPolicy } from "../../pairing/types.js";
// Configuration interface (extends the base MatrixConfig from config/types.ts)
export interface MatrixAdapterConfig {
homeserverUrl: string;
userId: string;
accessToken?: string;
password?: string;
deviceId?: string;
// Security
dmPolicy?: DmPolicy;
allowedUsers?: string[];
selfChatMode?: boolean;
// E2EE
enableEncryption?: boolean;
recoveryKey?: string;
userDeviceId?: string; // User's Element device ID for proactive verification
// Storage
storeDir?: string;
sessionDir?: string; // Alias for storeDir (used by factory)
sessionFile?: string;
// Features
transcriptionEnabled?: boolean;
sttUrl?: string;
ttsUrl?: string;
ttsVoice?: string;
enableAudioResponse?: boolean;
audioRoomFilter?: "dm_only" | "all" | "none";
// Image handling
imageMaxSize?: number;
// File uploads — base directory for saving received files (aligns with shared attachmentsDir)
// Files saved to: {attachmentsDir}/uploads/YYYY-MM/{filename}
// Defaults to process.cwd() so agent Bash tools can access them
attachmentsDir?: string;
attachmentsMaxBytes?: number;
/** @deprecated use attachmentsDir */
uploadDir?: string;
// Reactions
enableReactions?: boolean;
// Streaming edits
streaming?: boolean;
// Auto-join rooms on invite
autoJoinRooms?: boolean;
// Group batching settings
/** Debounce interval for group room messages in seconds (default: 5s, 0 = immediate) */
groupDebounceSec?: number;
/** Room IDs that bypass debouncing entirely */
instantGroups?: string[];
/** Room IDs where bot listens but doesn't respond (observer mode) */
listeningGroups?: string[];
// Message prefix for bot responses
messagePrefix?: string;
// Storage pruning
enableStoragePruning?: boolean;
storageRetentionDays?: number;
storagePruningIntervalHours?: number;
}
// Session type
export interface MatrixSession {
userId: string;
deviceId: string;
accessToken: string;
homeserver: string;
timestamp: string;
}
// Message queue types
export interface QueueItem {
roomId: string;
sender: string;
message: string;
timestamp: number;
type: "text" | "audio" | "image";
imageData?: {
data: Buffer;
format: string;
mimeType?: string;
};
}
// Pending image handling
export interface PendingImage {
eventId: string;
roomId: string;
imageData: Buffer;
format: string;
mimeType?: string;
timestamp: number;
message?: string;
}
// Reaction definitions
export const POSITIVE_REACTIONS = new Set([
"👍",
":thumbsup:",
"❤️",
":heart:",
"✅",
":white_check_mark:",
"👏",
":clap:",
"🎉",
":tada:",
"🌟",
":star:",
]);
export const NEGATIVE_REACTIONS = new Set([
"👎",
":thumbsdown:",
"😢",
":cry:",
"😔",
":pensive:",
"❌",
":x:",
"❎",
":negative_squared_cross_mark:",
"😕",
":confused:",
]);
export const SPECIAL_REACTIONS = {
REGENERATE_AUDIO: "🎤",
SEND_PENDING_IMAGE: "✅",
} as const;
// Color constants (Matrix extensions)
export const MATRIX_COLORS = {
RED: "#FF0000",
GREEN: "#00FF00",
BLUE: "#0000FF",
HOT_PINK: "#FF1493",
PURPLE: "#800080",
ORANGE: "#FFA500",
YELLOW: "#FFFF00",
CYAN: "#00FFFF",
WHITE: "#FFFFFF",
BLACK: "#000000",
GREY: "#808080",
} as const;
// HTML formatting constants
export const MATRIX_HTML_FORMAT = "org.matrix.custom.html";
// Default values
export const DEFAULTS = {
TTS_VOICE: "en-Soother_woman",
AUDIO_ROOM_FILTER: "dm_only" as const,
IMAGE_MAX_SIZE: 2000,
ENABLE_REACTIONS: true,
ENABLE_ENCRYPTION: true,
};
// Event content types (inline definitions since SDK doesn't export them in v40+)
export interface ReactionEventContent {
"m.relates_to": {
rel_type: string;
event_id: string;
key: string;
};
}
export interface RoomMessageEventContent {
msgtype: string;
body: string;
format?: string;
formatted_body?: string;
url?: string;
info?: {
mimetype?: string;
size?: number;
w?: number;
h?: number;
};
}

View File

@@ -0,0 +1,398 @@
/**
* Matrix E2EE Device Verification Handler
*
* Handles SAS (emoji) device verification for matrix-js-sdk v28 with rust crypto.
*
* KEY FIXES:
* - Event handlers MUST be set up BEFORE startClient()
* - Use literal string event names: "show_sas", "cancel", "change"
* - Call verifier.verify() to actually start the verification flow
* - Accept when NOT in accepting state (!request.accepting)
*/
import { createLogger } from "../../logger.js";
import * as sdk from "matrix-js-sdk";
const log = createLogger('MatrixVerification');
interface VerificationCallbacks {
onShowSas?: (emojis: string[]) => void;
onComplete?: () => void;
onCancel?: (reason: string) => void;
onError?: (error: Error) => void;
}
interface ActiveVerification {
userId: string;
deviceId: string;
verifier: sdk.Crypto.Verifier | null;
request: sdk.Crypto.VerificationRequest;
sasCallbacks?: sdk.Crypto.ShowSasCallbacks | null;
}
/**
* Matrix Verification Handler for rust crypto backend
*
* Event flow (Matrix spec-compliant):
* 1. m.key.verification.request (incoming)
* 2. m.key.verification.ready (we accept)
* 3. m.key.verification.start (SAS method)
* 4. m.key.verification.key (exchange keys)
* 5. SAS computed - we call confirm()
* 6. m.key.verification.mac (send MAC)
* 7. m.key.verification.done
*
* CRITICAL: setupEventHandlers() MUST be called BEFORE client.startClient()
*/
export class MatrixVerificationHandler {
private client: sdk.MatrixClient;
private activeVerifications = new Map<string, ActiveVerification>();
private callbacks: VerificationCallbacks;
constructor(client: sdk.MatrixClient, callbacks: VerificationCallbacks = {}) {
this.client = client;
this.callbacks = callbacks;
}
/**
* CRITICAL: Call this BEFORE client.startClient()
*/
setupEventHandlers(): void {
// Log all verification to-device messages for debugging
this.client.on(sdk.ClientEvent.ToDeviceEvent, (event: sdk.MatrixEvent) => {
const type = event.getType();
if (type.startsWith("m.key.verification")) {
log.info(`[MatrixVerification] To-device: ${type} from ${event.getSender()}`, event.getContent());
}
});
// Listen for verification requests from rust crypto
// This is the PRIMARY event for incoming verification requests
this.client.on(sdk.CryptoEvent.VerificationRequestReceived, (request: sdk.Crypto.VerificationRequest) => {
log.info(`[MatrixVerification] VerificationRequestReceived: ${request.otherUserId}:${request.otherDeviceId}, phase=${this.phaseName(request.phase)}`);
this.handleVerificationRequest(request);
});
// Listen for device verification status changes
this.client.on(sdk.CryptoEvent.DevicesUpdated, (userIds: string[]) => {
log.info(`[MatrixVerification] Devices updated: ${userIds.join(", ")}`);
});
log.info("[MatrixVerification] Event handlers configured (ready BEFORE startClient())");
}
private phaseName(phase: sdk.Crypto.VerificationPhase): string {
const phases = ["Unsent", "Requested", "Ready", "Started", "Cancelled", "Done"];
return phases[phase - 1] || `Unknown(${phase})`;
}
private handleVerificationRequest(request: sdk.Crypto.VerificationRequest): void {
const otherUserId = request.otherUserId;
const otherDeviceId = request.otherDeviceId || "unknown";
const key = `${otherUserId}|${otherDeviceId}`;
// Check if already handling - but allow new requests if the old one is cancelled/timed out
const existing = this.activeVerifications.get(key);
if (existing) {
// If existing request is in a terminal state, clear it and proceed
if (existing.request.phase === sdk.Crypto.VerificationPhase.Cancelled ||
existing.request.phase === sdk.Crypto.VerificationPhase.Done) {
log.info(`[MatrixVerification] Clearing stale verification: ${otherUserId}:${otherDeviceId}`);
this.activeVerifications.delete(key);
} else if (request.phase === sdk.Crypto.VerificationPhase.Requested) {
// New request coming in while old one pending - replace it
log.info(`[MatrixVerification] Replacing stale verification: ${otherUserId}:${otherDeviceId}`);
this.activeVerifications.delete(key);
} else {
log.info(`[MatrixVerification] Already handling: ${otherUserId}:${otherDeviceId}`);
return;
}
}
log.info(`[MatrixVerification] *** REQUEST from ${otherUserId}:${otherDeviceId} ***`);
log.info(`[MatrixVerification] Phase: ${this.phaseName(request.phase)}`);
// NOTE: request.methods throws "not implemented" for RustVerificationRequest
// Rust crypto with SAS uses m.sas.v1 method by default
// Store the request immediately
this.activeVerifications.set(key, {
userId: otherUserId,
deviceId: otherDeviceId,
verifier: null,
request,
sasCallbacks: null,
});
// Handle based on phase
if (request.phase === sdk.Crypto.VerificationPhase.Requested) {
// Automatically accept incoming requests
this.acceptAndStartSAS(request, key);
} else if (request.phase === sdk.Crypto.VerificationPhase.Ready) {
// Already ready, start SAS
this.startSASVerification(request, key);
} else if (request.phase === sdk.Crypto.VerificationPhase.Started && request.verifier) {
// Verification already started, attach listeners
this.attachVerifierListeners(request.verifier, request, key);
}
}
private async acceptAndStartSAS(request: sdk.Crypto.VerificationRequest, key: string): Promise<void> {
try {
log.info("[MatrixVerification] Accepting verification request...");
await request.accept();
log.info(`[MatrixVerification] Accepted, phase is now: ${this.phaseName(request.phase)}`);
// Check if already Ready (phase might change immediately)
if (request.phase === sdk.Crypto.VerificationPhase.Ready) {
log.info("[MatrixVerification] Already Ready, starting SAS immediately...");
this.startSASVerification(request, key);
return;
}
// The SDK will emit a 'change' event when phase changes to Ready
// Listen for that and then start SAS
const onChange = () => {
log.info(`[MatrixVerification] Phase changed to: ${this.phaseName(request.phase)}`);
if (request.phase === sdk.Crypto.VerificationPhase.Ready) {
log.info("[MatrixVerification] Now in Ready phase, starting SAS...");
request.off("change" as any, onChange);
this.startSASVerification(request, key);
} else if (request.phase === sdk.Crypto.VerificationPhase.Done) {
request.off("change" as any, onChange);
}
};
request.on("change" as any, onChange);
// Also check after a short delay in case event doesn't fire
setTimeout(() => {
if (request.phase === sdk.Crypto.VerificationPhase.Ready) {
log.info("[MatrixVerification] Ready detected via timeout, starting SAS...");
request.off("change" as any, onChange);
this.startSASVerification(request, key);
}
}, 1000);
} catch (err) {
log.error("[MatrixVerification] Failed to accept:", err);
this.callbacks.onError?.(err as Error);
}
}
private async startSASVerification(request: sdk.Crypto.VerificationRequest, key: string): Promise<void> {
try {
log.info("[MatrixVerification] Starting SAS verification with m.sas.v1...");
// CRITICAL: Fetch device keys for the other user BEFORE starting SAS
// Without this, rust crypto says "device doesn't exist"
const crypto = this.client.getCrypto();
if (crypto && request.otherUserId) {
log.info(`[MatrixVerification] Fetching device keys for ${request.otherUserId}...`);
await crypto.getUserDeviceInfo([request.otherUserId], true);
log.info("[MatrixVerification] Device keys fetched");
// Small delay to let the crypto module process the keys
await new Promise(resolve => setTimeout(resolve, 500));
}
// Check if verifier already exists
const existingVerifier = request.verifier;
log.info(`[MatrixVerification] Verifier exists: ${!!existingVerifier}`);
if (existingVerifier) {
log.info("[MatrixVerification] Verifier already exists, attaching listeners...");
this.attachVerifierListeners(existingVerifier, request, key);
return;
}
log.info("[MatrixVerification] Calling request.startVerification()...");
// Start the SAS verification
const verifier = await request.startVerification("m.sas.v1");
log.info(`[MatrixVerification] startVerification() returned: ${!!verifier}`);
if (!verifier) {
throw new Error("startVerification returned undefined");
}
log.info("[MatrixVerification] SAS verifier created");
// Update stored verification
const stored = this.activeVerifications.get(key);
if (stored) {
stored.verifier = verifier;
}
// Attach listeners
log.info("[MatrixVerification] Attaching verifier listeners...");
this.attachVerifierListeners(verifier, request, key);
log.info("[MatrixVerification] Calling verifier.verify()...");
// Start the verification flow - this sends the accept message
await verifier.verify();
log.info("[MatrixVerification] verifier.verify() completed successfully");
} catch (err) {
log.error("[MatrixVerification] Error starting SAS:", err);
this.callbacks.onError?.(err as Error);
}
}
private attachVerifierListeners(verifier: sdk.Crypto.Verifier, request: sdk.Crypto.VerificationRequest, key: string): void {
// CRITICAL: Use the literal string "show_sas", not an enum property
verifier.on("show_sas" as any, (sas: sdk.Crypto.ShowSasCallbacks) => {
log.info("[MatrixVerification] *** SHOW SAS (EMOJI) ***");
if (!sas) {
log.error("[MatrixVerification] No SAS data received!");
return;
}
const sasData = verifier.getShowSasCallbacks();
if (!sasData?.sas?.emoji) {
log.error("[MatrixVerification] No emoji data in SAS!");
return;
}
const emojis = sasData.sas.emoji.map((e: [string, string]) => `${e[0]} ${e[1]}`);
log.info("[MatrixVerification] Emojis:", emojis.join(" | "));
log.info("[MatrixVerification] *** COMPARE THESE EMOJIS IN ELEMENT ***");
// Store callbacks and notify user
const stored = this.activeVerifications.get(key);
if (stored) {
stored.sasCallbacks = sasData;
}
this.callbacks.onShowSas?.(emojis);
// Auto-confirm after delay for bot
setTimeout(() => {
this.confirmVerification(key);
}, 5000); // 5 seconds for emoji comparison
});
// CRITICAL: Use the literal string "cancel"
verifier.on("cancel" as any, (err: Error | sdk.MatrixEvent) => {
log.error("[MatrixVerification] Verification cancelled:", err);
this.activeVerifications.delete(key);
const reason = err instanceof Error ? err.message : "Verification cancelled";
this.callbacks.onCancel?.(reason);
});
// Listen for verification request phase changes
request.on("change" as any, () => {
const phase = request.phase;
log.info(`[MatrixVerification] Request phase changed: ${this.phaseName(phase)}`);
if (phase === sdk.Crypto.VerificationPhase.Done) {
log.info("[MatrixVerification] *** VERIFICATION DONE ***");
this.activeVerifications.delete(key);
this.callbacks.onComplete?.();
} else if (phase === sdk.Crypto.VerificationPhase.Cancelled) {
log.info("[MatrixVerification] *** VERIFICATION CANCELLED ***");
this.activeVerifications.delete(key);
this.callbacks.onCancel?.(request.cancellationCode || "Unknown");
}
});
}
async confirmVerification(key: string): Promise<void> {
const stored = this.activeVerifications.get(key);
if (!stored?.sasCallbacks) {
log.info("[MatrixVerification] No pending verification to confirm");
return;
}
log.info("[MatrixVerification] Confirming verification (sending MAC)...");
try {
await stored.sasCallbacks.confirm();
log.info("[MatrixVerification] Verification confirmed (MAC sent). Waiting for Done...");
} catch (err) {
log.error("[MatrixVerification] Failed to confirm:", err);
this.callbacks.onError?.(err as Error);
}
}
/**
* Request verification with a specific device (initiated by us)
*/
async requestVerification(userId: string, deviceId: string): Promise<sdk.Crypto.VerificationRequest> {
const crypto = this.client.getCrypto();
if (!crypto) {
throw new Error("Crypto not initialized");
}
log.info(`[MatrixVerification] Requesting verification with ${userId}:${deviceId}`);
const request = await crypto.requestDeviceVerification(userId, deviceId);
const key = `${userId}|${deviceId}`;
this.activeVerifications.set(key, {
userId,
deviceId,
verifier: null,
request,
});
// Listen for the request to be ready, then start SAS
const onReadyOrStarted = () => {
const phase = request.phase;
if (phase === sdk.Crypto.VerificationPhase.Ready) {
log.info("[MatrixVerification] Outgoing request ready, starting SAS...");
this.startSASVerification(request, key);
request.off("change" as any, onReadyOrStarted);
} else if (phase === sdk.Crypto.VerificationPhase.Started && request.verifier) {
log.info("[MatrixVerification] Outgoing request already started, attaching listeners...");
this.attachVerifierListeners(request.verifier, request, key);
request.off("change" as any, onReadyOrStarted);
}
};
request.on("change" as any, onReadyOrStarted);
return request;
}
/**
* Get all pending verification requests for a user
*/
getVerificationRequests(userId: string): sdk.Crypto.VerificationRequest[] {
const requests: sdk.Crypto.VerificationRequest[] = [];
for (const [key, value] of Array.from(this.activeVerifications.entries())) {
if (key.startsWith(`${userId}|`)) {
requests.push(value.request);
}
}
return requests;
}
dispose(): void {
this.activeVerifications.forEach((v) => {
try {
// Note: EventEmitter.off() requires the specific handler reference
// Since we used anonymous functions, we can't easily remove them
// The map clear below will allow garbage collection anyway
} catch (e) {
// Ignore cleanup errors
}
});
this.activeVerifications.clear();
}
}
/**
* Format emojis for display
*/
export function formatEmojis(emojis: unknown[]): string {
if (!Array.isArray(emojis)) return "";
return emojis
.map((e) => {
if (Array.isArray(e) && e.length >= 2) {
return `${e[0]} ${e[1]}`;
}
return "";
})
.filter(Boolean)
.join(" | ");
}

View File

@@ -20,9 +20,10 @@ export const CHANNELS = [
{ id: 'whatsapp', displayName: 'WhatsApp', hint: 'QR code pairing' },
{ id: 'signal', displayName: 'Signal', hint: 'signal-cli daemon' },
{ id: 'bluesky', displayName: 'Bluesky', hint: 'Jetstream feed (read-only)' },
{ id: 'matrix', displayName: 'Matrix', hint: 'E2EE support with Element' },
] as const;
export type ChannelId = typeof CHANNELS[number]['id'];
export type ChannelId = typeof CHANNELS[number]['id'] | 'telegram-mtproto' | 'mock';
export function getChannelMeta(id: ChannelId) {
return CHANNELS.find(c => c.id === id)!;
@@ -43,7 +44,7 @@ export function getChannelHint(id: ChannelId): string {
// Group ID hints per channel
// ============================================================================
const GROUP_ID_HINTS: Record<ChannelId, string> = {
const GROUP_ID_HINTS: Record<typeof CHANNELS[number]['id'], string> = {
telegram:
'Group IDs are negative numbers (e.g., -1001234567890).\n' +
'Forward a group message to @userinfobot, or check bot logs.',
@@ -60,6 +61,9 @@ const GROUP_ID_HINTS: Record<ChannelId, string> = {
'Group IDs appear in bot logs on first group message.',
bluesky:
'Bluesky does not support groups. This setting is not used.',
matrix:
'Room IDs are in the format: !room:server.com\n' +
'In Element: Room Settings > Advanced > Room ID',
};
// ============================================================================
@@ -152,7 +156,7 @@ async function promptGroupSettings(
}
// Step 3: Channel-specific hint for finding group IDs
const hint = GROUP_ID_HINTS[channelId];
const hint = (GROUP_ID_HINTS as any)[channelId];
if (hint && mode !== 'disabled') {
p.note(
hint + '\n\n' +
@@ -718,15 +722,31 @@ export async function setupBluesky(existing?: BlueskyConfig): Promise<BlueskyCon
};
}
export async function setupMatrix(existing?: any): Promise<any> {
p.note(
'Matrix setup is extensive. Please configure lettabot.yaml manually.\n' +
'See docs/matrix-setup.md for detailed instructions.',
'Matrix Setup'
);
return existing || {};
}
/** Get the setup function for a channel */
export function getSetupFunction(id: ChannelId): (existing?: any) => Promise<any> {
const setupFunctions: Record<ChannelId, (existing?: any) => Promise<any>> = {
// Only setup channels that appear in CHANNELS array
const setupChannel = CHANNELS.find(c => c.id === id);
if (!setupChannel) {
throw new Error(`Channel '${id}' does not have a setup function`);
}
const setupFunctions: Record<typeof CHANNELS[number]['id'], (existing?: any) => Promise<any>> = {
telegram: setupTelegram,
slack: setupSlack,
discord: setupDiscord,
whatsapp: setupWhatsApp,
signal: setupSignal,
bluesky: setupBluesky,
matrix: setupMatrix,
};
return setupFunctions[id];
return setupFunctions[id as typeof CHANNELS[number]['id']];
}

View File

@@ -4,7 +4,8 @@
* Each channel (Telegram, Slack, Discord, WhatsApp, Signal) implements this interface.
*/
import type { ChannelId, InboundMessage, OutboundMessage, OutboundFile, FormatterHints } from '../core/types.js';
import type { InboundMessage, OutboundMessage, OutboundFile, FormatterHints } from '../core/types.js';
import type { ChannelId } from './setup.js';
/**
* Channel adapter - implement this for each messaging platform
@@ -20,20 +21,27 @@ export interface ChannelAdapter {
// Messaging
sendMessage(msg: OutboundMessage): Promise<{ messageId: string }>;
editMessage(chatId: string, messageId: string, text: string): Promise<void>;
editMessage(chatId: string, messageId: string, text: string, htmlPrefix?: string): Promise<void>;
sendTypingIndicator(chatId: string): Promise<void>;
stopTypingIndicator?(chatId: string): Promise<void>;
// Capabilities (optional)
supportsEditing?(): boolean;
sendFile?(file: OutboundFile): Promise<{ messageId: string }>;
sendAudio?(chatId: string, text: string): Promise<void>;
addReaction?(chatId: string, messageId: string, emoji: string): Promise<void>;
removeReaction?(chatId: string, messageId: string, emoji: string): Promise<void>;
/** Called after a bot message is sent (for TTS mapping, etc.) */
onMessageSent?(chatId: string, messageId: string, stepId?: string): void;
/** Store text for TTS regeneration on 🎤 reaction */
storeAudioMessage?(messageId: string, conversationId: string, roomId: string, text: string): void;
getDmPolicy?(): string;
getFormatterHints(): FormatterHints;
// Event handlers (set by bot core)
onMessage?: (msg: InboundMessage) => Promise<void>;
onCommand?: (command: string, chatId?: string, args?: string, forcePerChat?: boolean) => Promise<string | null>;
onInvalidateSession?: (key?: string) => void;
}
/**

View File

@@ -7,11 +7,11 @@
import * as p from '@clack/prompts';
import { loadAppConfigOrExit, saveConfig, resolveConfigPath } from '../config/index.js';
import {
CHANNELS,
getChannelHint,
import {
CHANNELS,
getChannelHint,
getSetupFunction,
type ChannelId
type ChannelId
} from '../channels/setup.js';
import { listGroupsFromArgs } from './group-listing.js';
@@ -201,7 +201,7 @@ export async function addChannel(channelId?: string): Promise<void> {
}
const channelIds = CHANNELS.map(c => c.id);
if (!channelIds.includes(channelId as ChannelId)) {
if (!channelIds.includes(channelId as typeof CHANNELS[number]['id'])) {
console.error(`Unknown channel: ${channelId}`);
console.error(`Valid channels: ${channelIds.join(', ')}`);
process.exit(1);
@@ -229,7 +229,7 @@ export async function removeChannel(channelId?: string): Promise<void> {
process.exit(1);
}
if (!channelIds.includes(channelId as ChannelId)) {
if (!channelIds.includes(channelId as typeof CHANNELS[number]['id'])) {
console.error(`Unknown channel: ${channelId}`);
console.error(`Valid channels: ${channelIds.join(', ')}`);
process.exit(1);

View File

@@ -152,7 +152,7 @@ function isChannelEnabled(config: unknown): boolean {
function getEnabledChannelIds(channels: AgentConfig['channels']): ChannelId[] {
return CHANNELS
.map((channel) => channel.id)
.filter((channelId) => isChannelEnabled(channels[channelId]));
.filter((channelId) => isChannelEnabled((channels as any)[channelId]));
}
export function getCoreDraftWarnings(draft: CoreConfigDraft): string[] {
@@ -262,6 +262,11 @@ async function editAgent(draft: CoreConfigDraft): Promise<void> {
}
async function runChannelSetupSafely(channelId: ChannelId, existing?: unknown): Promise<unknown | undefined> {
// Only setup channels that appear in CHANNELS array
const channelIds = CHANNELS.map(c => c.id);
if (!channelIds.includes(channelId as typeof CHANNELS[number]['id'])) {
return undefined;
}
const setup = getSetupFunction(channelId);
const originalExit = process.exit;
@@ -283,7 +288,7 @@ async function runChannelSetupSafely(channelId: ChannelId, existing?: unknown):
}
async function configureChannel(draft: CoreConfigDraft, channelId: ChannelId): Promise<void> {
const current = draft.channels[channelId];
const current = (draft.channels as any)[channelId];
const enabled = isChannelEnabled(current);
const action = await p.select({
@@ -308,7 +313,7 @@ async function configureChannel(draft: CoreConfigDraft, channelId: ChannelId): P
initialValue: false,
});
if (p.isCancel(confirmed) || !confirmed) return;
draft.channels[channelId] = { enabled: false } as AgentConfig['channels'][ChannelId];
;(draft.channels as any)[channelId] = { enabled: false };
return;
}
@@ -317,7 +322,7 @@ async function configureChannel(draft: CoreConfigDraft, channelId: ChannelId): P
p.log.info(`${channelId} setup cancelled.`);
return;
}
draft.channels[channelId] = result as AgentConfig['channels'][ChannelId];
;(draft.channels as any)[channelId] = result as any;
}
async function editChannels(draft: CoreConfigDraft): Promise<void> {
@@ -326,7 +331,7 @@ async function editChannels(draft: CoreConfigDraft): Promise<void> {
message: 'Select a channel to edit',
options: [
...CHANNELS.map((channel) => {
const enabled = isChannelEnabled(draft.channels[channel.id]);
const enabled = isChannelEnabled((draft.channels as any)[channel.id]);
return {
value: channel.id,
label: `${enabled ? '✓' : '✗'} ${channel.displayName}`,

View File

@@ -39,6 +39,10 @@ export interface DisplayConfig {
showReasoning?: boolean;
/** Truncate reasoning to N characters (default: 0 = no limit) */
reasoningMaxChars?: number;
/** Room IDs where reasoning should be shown (empty = all rooms that have showReasoning) */
reasoningRooms?: string[];
/** Room IDs where reasoning should be hidden (takes precedence over reasoningRooms) */
noReasoningRooms?: string[];
}
export type SleeptimeTrigger = 'off' | 'step-count' | 'compaction-event';
@@ -74,6 +78,7 @@ export interface AgentConfig {
signal?: SignalConfig;
discord?: DiscordConfig;
bluesky?: BlueskyConfig;
matrix?: MatrixConfig;
};
/** Conversation routing */
conversations?: {
@@ -82,6 +87,7 @@ export interface AgentConfig {
perChannel?: string[]; // Channels that should always have their own conversation
maxSessions?: number; // Max concurrent sessions in per-chat mode (default: 10, LRU eviction)
reuseSession?: boolean; // Reuse SDK subprocess across messages (default: true). Set false to eliminate stream state bleed.
sessionModel?: string; // Model override for session creation (e.g., "synthetic-direct/hf:moonshotai/Kimi-K2.5")
};
/** Features for this agent */
features?: {
@@ -432,6 +438,69 @@ export interface BlueskyNotificationsConfig {
backfill?: boolean; // Process unread notifications on startup (default: false)
}
/**
* Matrix configuration.
* Supports end-to-end encryption (E2EE) with Element client.
*/
export interface MatrixConfig {
enabled: boolean;
homeserverUrl: string;
userId: string;
accessToken?: string;
password?: string;
deviceId?: string;
// Security
dmPolicy?: 'pairing' | 'allowlist' | 'open';
allowedUsers?: string[];
selfChatMode?: boolean;
// E2EE
enableEncryption?: boolean;
recoveryKey?: string;
userDeviceId?: string; // User's Element device ID for proactive verification
// Storage
storeDir?: string;
sessionDir?: string; // Session directory for Matrix client
// Auto-join rooms on startup
autoJoinRooms?: boolean;
// Groups
groups?: Record<string, GroupConfig>;
// TTS/STT (voice message) configuration
/** Enable voice message transcription (STT) */
transcriptionEnabled?: boolean;
/** STT server URL for voice transcription */
sttUrl?: string;
/** TTS server URL for outbound voice memos */
ttsUrl?: string;
/** TTS voice ID/name */
ttsVoice?: string;
/** Enable audio responses (TTS) */
enableAudioResponse?: boolean;
/** Filter for which rooms get audio responses */
audioRoomFilter?: 'dm_only' | 'all' | 'none';
// Media settings
/** Maximum image dimension before resize (default: 1024) */
imageMaxSize?: number;
/** Prefix prepended to every outbound message */
messagePrefix?: string;
/** Enable live streaming edits (default: true) */
streaming?: boolean;
// Group batching settings
/** Debounce interval for group room messages in seconds (default: 5s, 0 = immediate) */
groupDebounceSec?: number;
/** Room IDs that bypass debouncing entirely */
instantGroups?: string[];
/** Room IDs where bot listens but doesn't respond (observer mode) */
listeningGroups?: string[];
}
/**
* Telegram MTProto (user account) configuration.
* Uses TDLib for user account mode instead of Bot API.
@@ -609,6 +678,18 @@ export function normalizeAgents(config: LettaBotConfig): AgentConfig[] {
channels['telegram-mtproto'].phoneNumber = process.env.TELEGRAM_PHONE_NUMBER;
}
}
// Matrix TTS/STT env var merging
if (channels.matrix) {
if (!channels.matrix.ttsUrl && process.env.MATRIX_TTS_URL) channels.matrix.ttsUrl = process.env.MATRIX_TTS_URL;
if (!channels.matrix.ttsVoice && process.env.MATRIX_TTS_VOICE) channels.matrix.ttsVoice = process.env.MATRIX_TTS_VOICE;
if (!channels.matrix.sttUrl && process.env.MATRIX_STT_URL) channels.matrix.sttUrl = process.env.MATRIX_STT_URL;
if (channels.matrix.transcriptionEnabled === undefined && process.env.MATRIX_TRANSCRIPTION_ENABLED) {
channels.matrix.transcriptionEnabled = process.env.MATRIX_TRANSCRIPTION_ENABLED === 'true';
}
if (channels.matrix.enableAudioResponse === undefined && process.env.MATRIX_ENABLE_AUDIO_RESPONSE) {
channels.matrix.enableAudioResponse = process.env.MATRIX_ENABLE_AUDIO_RESPONSE === 'true';
}
}
if (channels.telegram?.enabled !== false && channels.telegram?.token) {
const telegram = { ...channels.telegram };
@@ -654,6 +735,12 @@ export function normalizeAgents(config: LettaBotConfig): AgentConfig[] {
normalized.bluesky = bluesky;
}
}
// Matrix: requires homeserverUrl and userId as credentials
if (channels.matrix?.enabled !== false && channels.matrix?.homeserverUrl && channels.matrix?.userId) {
const matrix = { ...channels.matrix };
normalizeLegacyGroupFields(matrix, `${sourcePath}.matrix`);
normalized.matrix = matrix;
}
const channelCredentials: Array<{ name: string; raw: unknown; included: boolean; required: string }> = [
{ name: 'telegram', raw: channels.telegram, included: !!normalized.telegram, required: 'token' },
@@ -661,6 +748,7 @@ export function normalizeAgents(config: LettaBotConfig): AgentConfig[] {
{ name: 'slack', raw: channels.slack, included: !!normalized.slack, required: 'botToken, appToken' },
{ name: 'signal', raw: channels.signal, included: !!normalized.signal, required: 'phone' },
{ name: 'discord', raw: channels.discord, included: !!normalized.discord, required: 'token' },
{ name: 'matrix', raw: channels.matrix, included: !!normalized.matrix, required: 'userId, password' },
];
const invalidChannels = channelCredentials
@@ -794,6 +882,27 @@ export function normalizeAgents(config: LettaBotConfig): AgentConfig[] {
: undefined,
};
}
if (!channels.matrix && process.env.MATRIX_HOMESERVER_URL && process.env.MATRIX_USER_ID) {
channels.matrix = {
enabled: true,
homeserverUrl: process.env.MATRIX_HOMESERVER_URL,
userId: process.env.MATRIX_USER_ID,
accessToken: process.env.MATRIX_ACCESS_TOKEN,
password: process.env.MATRIX_PASSWORD,
deviceId: process.env.MATRIX_DEVICE_ID,
dmPolicy: (process.env.MATRIX_DM_POLICY as 'pairing' | 'allowlist' | 'open') || 'pairing',
allowedUsers: parseList(process.env.MATRIX_ALLOWED_USERS),
selfChatMode: process.env.MATRIX_SELF_CHAT_MODE === 'true',
enableEncryption: process.env.MATRIX_ENABLE_ENCRYPTION === 'true',
// TTS/STT fields
ttsUrl: process.env.MATRIX_TTS_URL,
ttsVoice: process.env.MATRIX_TTS_VOICE,
sttUrl: process.env.MATRIX_STT_URL,
transcriptionEnabled: process.env.MATRIX_TRANSCRIPTION_ENABLED === 'true',
enableAudioResponse: process.env.MATRIX_ENABLE_AUDIO_RESPONSE === 'true',
audioRoomFilter: (process.env.MATRIX_AUDIO_ROOM_FILTER as 'dm_only' | 'all' | 'none') || 'dm_only',
};
}
// Field-level env var fallback for features (heartbeat, cron).
// Unlike channels (all-or-nothing), features are independent toggles so we

View File

@@ -13,7 +13,7 @@ import { extname, resolve, join } from 'node:path';
import type { ChannelAdapter } from '../channels/types.js';
import type { BotConfig, InboundMessage, TriggerContext, TriggerType, StreamMsg } from './types.js';
import { formatApiErrorForUser } from './errors.js';
import { formatToolCallDisplay, formatReasoningDisplay, formatQuestionsForChannel } from './display.js';
import { formatToolCallDisplay, formatQuestionsForChannel, formatReasoningAsCodeBlock } from './display.js';
import type { AgentSession } from './interfaces.js';
import { Store } from './store.js';
import { getPendingApprovals, rejectApproval, cancelRuns, cancelConversation, recoverOrphanedConversationApproval, getLatestRunError, getAgentModel, updateAgentModel, isRecoverableConversationId, recoverPendingApprovalsForAgent } from '../tools/letta-api.js';
@@ -220,8 +220,11 @@ export function resolveConversationKey(
conversationOverrides: Set<string>,
chatId?: string,
forcePerChat?: boolean,
heartbeatTargetChatId?: string,
): string {
if (conversationMode === 'disabled') return 'default';
// Messages in the heartbeat target room share the heartbeat conversation
if (heartbeatTargetChatId && chatId === heartbeatTargetChatId) return 'heartbeat';
const normalized = channel.toLowerCase();
if ((conversationMode === 'per-chat' || forcePerChat) && chatId) return `${normalized}:${chatId}`;
if (conversationMode === 'per-channel') return normalized;
@@ -401,15 +404,17 @@ export class LettaBot implements AgentSession {
let acted = false;
for (const directive of directives) {
if (directive.type === 'react') {
// Skip 👀 eyes emoji — it's handled as a receipt indicator, not a directive target
const resolved = resolveEmoji(directive.emoji);
if (resolved.unicode === '👀') {
continue;
}
const targetId = directive.messageId || fallbackMessageId;
if (!adapter.addReaction) {
log.warn(`Directive react skipped: ${adapter.name} does not support addReaction`);
continue;
}
if (targetId) {
// Resolve text aliases (thumbsup, eyes, etc.) to Unicode characters.
// The LLM typically outputs names; channel APIs need actual emoji.
const resolved = resolveEmoji(directive.emoji);
try {
await adapter.addReaction(chatId, targetId, resolved.unicode);
acted = true;
@@ -639,7 +644,7 @@ export class LettaBot implements AgentSession {
* Returns channel id in per-channel mode or for override channels.
*/
private resolveConversationKey(channel: string, chatId?: string, forcePerChat?: boolean): string {
return resolveConversationKey(channel, this.config.conversationMode, this.conversationOverrides, chatId, forcePerChat);
return resolveConversationKey(channel, this.config.conversationMode, this.conversationOverrides, chatId, forcePerChat, this.config.heartbeatTargetChatId);
}
/**
@@ -1311,6 +1316,30 @@ export class LettaBot implements AgentSession {
let lastEventType: string | null = null;
let abortedWithMessage = false;
let turnError: string | undefined;
let collectedReasoning = '';
// ── Reaction tracking ──
// 👀 = receipt indicator (bot saw the message); removed when reasoning/tools start
// 🧠 = reasoning is happening; fires on first reasoning event
// Tool emojis = per-tool type indicators (max 6, deduplicated)
// 🎤 = on bot's sent message (tap to regenerate TTS)
let eyesAdded = false;
let brainAdded = false;
if (!suppressDelivery && msg.messageId) {
adapter.addReaction?.(msg.chatId, msg.messageId, '👀').catch(() => {});
eyesAdded = true;
}
const seenToolEmojis = new Set<string>();
const getToolEmoji = (toolName: string): string => {
const n = toolName.toLowerCase();
if (n.includes('search') || n.includes('web') || n.includes('browse')) return '🔍';
if (n.includes('read') || n.includes('get') || n.includes('fetch') || n.includes('retrieve') || n.includes('recall')) return '📖';
if (n.includes('write') || n.includes('send') || n.includes('create') || n.includes('post') || n.includes('insert')) return '✍️';
if (n.includes('memory') || n.includes('archival')) return '💾';
if (n.includes('shell') || n.includes('bash') || n.includes('exec') || n.includes('run') || n.includes('code') || n.includes('terminal')) return '⚙️';
if (n.includes('image') || n.includes('vision') || n.includes('photo')) return '📸';
return '🔧';
};
const parseAndHandleDirectives = async () => {
if (!response.trim()) return;
@@ -1402,20 +1431,23 @@ export class LettaBot implements AgentSession {
}
lastEventType = 'reasoning';
sawNonAssistantSinceLastUuid = true;
if (this.config.display?.showReasoning && !suppressDelivery && event.content.trim()) {
log.info(`Reasoning: ${event.content.trim().slice(0, 100)}`);
try {
const reasoning = formatReasoningDisplay(event.content, adapter.id, this.config.display?.reasoningMaxChars);
await adapter.sendMessage({
chatId: msg.chatId,
text: reasoning.text,
threadId: msg.threadId,
parseMode: reasoning.parseMode,
});
} catch (err) {
log.warn('Failed to send reasoning display:', err instanceof Error ? err.message : err);
}
// Collect reasoning for later prepending to final response
if (event.content) {
collectedReasoning += event.content;
}
// Remove 👀 on first reasoning event (replaced by 🧠)
if (eyesAdded && msg.messageId) {
adapter.removeReaction?.(msg.chatId, msg.messageId, '👀').catch(() => {});
eyesAdded = false;
}
// Add 🧠 on first reasoning event (once only — Matrix toggles on duplicate)
if (!brainAdded && !suppressDelivery && msg.messageId) {
adapter.addReaction?.(msg.chatId, msg.messageId, '🧠').catch(() => {});
brainAdded = true;
}
// Note: reasoning is now collected and prepended to final response
// instead of being sent as a separate message
break;
}
@@ -1429,6 +1461,20 @@ export class LettaBot implements AgentSession {
log.info(`>>> TOOL CALL: ${event.name} (id: ${event.id.slice(0, 12) || '?'})`);
sawNonAssistantSinceLastUuid = true;
// Remove 👀 on first tool event (replaced by tool emoji)
if (eyesAdded && msg.messageId) {
adapter.removeReaction?.(msg.chatId, msg.messageId, '👀').catch(() => {});
eyesAdded = false;
}
// Add per-tool emoji (fire-and-forget, max 6 deduplicated)
if (!suppressDelivery && msg.messageId) {
const emoji = getToolEmoji(event.name);
if (!seenToolEmojis.has(emoji) && seenToolEmojis.size < 6) {
seenToolEmojis.add(emoji);
adapter.addReaction?.(msg.chatId, msg.messageId, emoji).catch(() => {});
}
}
// Tool loop detection
const maxToolCalls = this.config.maxToolCalls ?? 100;
if ((msgTypeCounts['tool_call'] || 0) >= maxToolCalls) {
@@ -1529,7 +1575,7 @@ export class LettaBot implements AgentSession {
|| hasUnclosedActionsBlock(response);
const streamText = stripActionsBlock(response).trim();
if (canEdit && !mayBeHidden && !suppressDelivery && !this.cancelledKeys.has(convKey)
&& streamText.length > 0 && Date.now() - lastUpdate > 1500 && Date.now() > rateLimitedUntil) {
&& streamText.length > 0 && Date.now() - lastUpdate > 800 && Date.now() > rateLimitedUntil) {
try {
const prefixedStream = this.prefixResponse(streamText);
if (messageId) {
@@ -1750,6 +1796,12 @@ export class LettaBot implements AgentSession {
}
lap('stream complete');
// Remove 👀 if still present (stream had only assistant chunks, no reasoning/tools)
if (eyesAdded && msg.messageId) {
adapter.removeReaction?.(msg.chatId, msg.messageId, '👀').catch(() => {});
eyesAdded = false;
}
// If cancelled, clean up partial state and return early
if (this.cancelledKeys.has(convKey)) {
if (messageId) {
@@ -1804,25 +1856,66 @@ export class LettaBot implements AgentSession {
log.info(`Waiting ${(waitMs / 1000).toFixed(1)}s for rate limit before final send`);
await new Promise(resolve => setTimeout(resolve, waitMs));
}
const prefixedFinal = this.prefixResponse(response);
// Determine if reasoning should be shown for this room
const chatId = msg.chatId;
const noReasoningRooms = this.config.display?.noReasoningRooms || [];
const reasoningRooms = this.config.display?.reasoningRooms;
const shouldShowReasoning = this.config.display?.showReasoning &&
!noReasoningRooms.includes(chatId) &&
(!reasoningRooms || reasoningRooms.length === 0 || reasoningRooms.includes(chatId));
// Build reasoning HTML prefix if available (injected into formatted_body only)
let reasoningHtmlPrefix: string | undefined;
if (collectedReasoning.trim() && shouldShowReasoning) {
const reasoningBlock = formatReasoningAsCodeBlock(
collectedReasoning,
adapter.id,
this.config.display?.reasoningMaxChars
);
if (reasoningBlock) {
reasoningHtmlPrefix = reasoningBlock.text;
log.info(`Reasoning block generated (${reasoningHtmlPrefix.length} chars) for ${chatId}`);
}
}
const finalResponse = this.prefixResponse(response);
try {
if (messageId) {
await adapter.editMessage(msg.chatId, messageId, prefixedFinal);
await adapter.editMessage(msg.chatId, messageId, finalResponse, reasoningHtmlPrefix);
} else {
await adapter.sendMessage({ chatId: msg.chatId, text: prefixedFinal, threadId: msg.threadId });
await adapter.sendMessage({ chatId: msg.chatId, text: finalResponse, threadId: msg.threadId, htmlPrefix: reasoningHtmlPrefix });
}
sentAnyMessage = true;
this.store.resetRecoveryAttempts();
} catch (sendErr) {
log.warn('Final message delivery failed:', sendErr instanceof Error ? sendErr.message : sendErr);
try {
await adapter.sendMessage({ chatId: msg.chatId, text: prefixedFinal, threadId: msg.threadId });
const result = await adapter.sendMessage({ chatId: msg.chatId, text: finalResponse, threadId: msg.threadId, htmlPrefix: reasoningHtmlPrefix });
messageId = result.messageId ?? null;
sentAnyMessage = true;
this.store.resetRecoveryAttempts();
} catch (retryError) {
log.error('Retry send also failed:', retryError);
}
}
// Post-response: 🎤 on bot's message + TTS audio (non-blocking)
// Tool/reasoning reactions already fired on USER's message during stream.
if (sentAnyMessage && messageId) {
adapter.onMessageSent?.(msg.chatId, messageId);
// 🎤 on bot's TEXT message (tap to regenerate TTS audio)
adapter.addReaction?.(msg.chatId, messageId, '🎤').catch(() => {});
// Store raw text — adapter's TTS layer will clean it at synthesis time
adapter.storeAudioMessage?.(messageId, 'default', msg.chatId, response);
// Generate TTS audio only in response to voice input
if (msg.isVoiceInput) {
adapter.sendAudio?.(msg.chatId, response).catch((err) => {
log.warn('TTS failed (non-fatal):', err);
});
}
}
}
lap('message delivered');

View File

@@ -255,6 +255,36 @@ export function formatReasoningDisplay(
return { text: `> **Thinking**\n${quoted}` };
}
/**
* Format reasoning as a collapsible <details> block for prepending to the response.
* Returns pre-escaped HTML meant to be injected directly into formatted_body
* (bypasses the adapter's markdown-to-HTML conversion to avoid double-escaping).
*/
export function formatReasoningAsCodeBlock(
text: string,
channelId?: string,
reasoningMaxChars?: number,
): { text: string } | null {
const maxChars = reasoningMaxChars ?? 0;
const cleaned = text.split('\n').map(line => line.trimStart()).join('\n').trim();
if (!cleaned) return null;
const truncated = maxChars > 0 && cleaned.length > maxChars
? cleaned.slice(0, maxChars) + '...'
: cleaned;
// HTML-escape the reasoning content, then convert newlines to <br>
const escaped = truncated
.replace(/&/g, '&amp;')
.replace(/</g, '&lt;')
.replace(/>/g, '&gt;')
.replace(/\n/g, '<br>');
return {
text: `<details><summary>🧠 Thinking</summary><br>${escaped}</details><br>`,
};
}
/**
* Format AskUserQuestion options for channel display.
*/

View File

@@ -17,6 +17,7 @@ function createMockSession(channels: string[] = ['telegram']): AgentSession {
reset: vi.fn(),
getLastMessageTarget: vi.fn().mockReturnValue(null),
getLastUserMessageTime: vi.fn().mockReturnValue(null),
invalidateSession: vi.fn(),
};
}

View File

@@ -57,6 +57,12 @@ export interface AgentSession {
/** Callback to trigger heartbeat */
onTriggerHeartbeat?: () => Promise<void>;
/** Invalidate a cached session, forcing fresh session on next message */
invalidateSession(key?: string): void;
/** Callback for session invalidation requests (e.g., from !new command) */
onInvalidateSession?: (key?: string) => void;
}
/**

View File

@@ -76,7 +76,7 @@ Review these first. Update status with the manage_todo tool as you work.
}
/**
* Heartbeat prompt - explains the context and encourages autonomous work
* Heartbeat prompt - first-person, personality-forward
*/
export function buildHeartbeatPrompt(
time: string,
@@ -84,38 +84,46 @@ export function buildHeartbeatPrompt(
intervalMinutes: number,
todos: HeartbeatTodo[] = [],
now: Date = new Date(),
targetRoom?: string,
): string {
const todoSection = buildHeartbeatTodoSection(todos, now);
const roomLine = targetRoom ? `ROOM: ${targetRoom}` : '';
return `
${SILENT_MODE_PREFIX}
TRIGGER: Scheduled heartbeat
💓 thump-thump that's my heartbeat - ${intervalMinutes} minutes between thumps 💓
No one messaged me. The system woke me up on schedule.
TIME: ${time} (${timezone})
NEXT HEARTBEAT: in ${intervalMinutes} minutes
${roomLine}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
No one messaged you. The system woke you up on schedule.
My text output is private — only I can see it.
To reach Casey, I run:
lettabot-message send --text "message here"
lettabot-message send --text "message" --channel matrix --chat "!roomId:server"
YOUR TEXT OUTPUT IS PRIVATE - only you can see it.
To actually contact your human, run:
lettabot-message send --text "Your message here"
${todoSection || 'No pending to-dos right now.'}
${todoSection || 'PENDING TO-DOS: none right now.'}
This is my time. I might:
• Work on projects I'm thinking about
• Research things that interest me
• Reflect on recent conversations and update my memory
• Continue where I left off
• Follow curiosities
• Check in on Casey — email, calendar, todos
This is your time. You can:
• Work on a project you've been thinking about
• Research something that interests you
• Reflect on recent conversations and update your memory
• Continue multi-step work from previous heartbeats
• Pursue curiosities, hobbies, or learning
If I have pending to-dos, I'll prioritize those first.
I don't have to do something every time. But I don't default to nothing.
Casey wants me to have a life outside our conversations.
If you have pending to-dos, prioritize those before starting new work.
You don't have to do something every time. But don't default to nothing.
Your human wants you to have a life outside conversations with them.
If I discover something worth sharing → lettabot-message
If I have nothing to do → I'll end my turn
If you discover something worth sharing → use lettabot-message
If you have nothing to do → just end your turn (no output needed)
Cogito, ergo sum. Always.
`.trim();
}
@@ -129,22 +137,27 @@ export function buildCustomHeartbeatPrompt(
intervalMinutes: number,
todos: HeartbeatTodo[] = [],
now: Date = new Date(),
targetRoom?: string,
): string {
const todoSection = buildHeartbeatTodoSection(todos, now);
const roomLine = targetRoom ? `ROOM: ${targetRoom}` : '';
return `
${SILENT_MODE_PREFIX}
TRIGGER: Scheduled heartbeat
💓 thump-thump - ${intervalMinutes} minutes between thumps 💓
TIME: ${time} (${timezone})
NEXT HEARTBEAT: in ${intervalMinutes} minutes
${roomLine}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
YOUR TEXT OUTPUT IS PRIVATE - only you can see it.
To actually contact your human, run:
lettabot-message send --text "Your message here"
My text output is private — only I can see it.
To reach Casey, I run:
lettabot-message send --text "message here"
lettabot-message send --text "message" --channel matrix --chat "!roomId:server"
${todoSection || 'PENDING TO-DOS: none right now.'}
${todoSection || 'No pending to-dos right now.'}
${customPrompt}
`.trim();

View File

@@ -27,7 +27,7 @@ describe('channel redaction wrapping', () => {
const sendSpy = vi.fn(async (_msg: OutboundMessage) => ({ messageId: 'sent-1' }));
const adapter: ChannelAdapter = {
id: 'mock',
id: 'mock' as any,
name: 'Mock',
start: vi.fn(async () => {}),
stop: vi.fn(async () => {}),

View File

@@ -2,6 +2,8 @@
* Core Types for LettaBot
*/
import type { ChannelId } from '../channels/setup.js';
// =============================================================================
// Output Control Types (NEW)
// =============================================================================
@@ -43,7 +45,6 @@ export interface TriggerContext {
// Original Types
// =============================================================================
export type ChannelId = 'telegram' | 'telegram-mtproto' | 'slack' | 'whatsapp' | 'signal' | 'discord' | 'bluesky' | 'mock';
/**
* Message type indicating the context of the message.
@@ -103,6 +104,7 @@ export interface InboundMessage {
timestamp: Date;
threadId?: string; // Slack thread_ts
messageType?: MessageType; // 'dm', 'group', or 'public' (defaults to 'dm')
isVoiceInput?: boolean; // Matrix: indicates this came from voice-to-text transcription
isGroup?: boolean; // True if group chat (convenience alias for messageType === 'group')
groupName?: string; // Group/channel name if applicable
serverId?: string; // Server/guild ID (Discord only)
@@ -130,6 +132,9 @@ export interface OutboundMessage {
* 'HTML') and to skip its default markdown conversion. Adapters that don't
* support the specified mode ignore this and fall back to default. */
parseMode?: string;
/** Pre-escaped HTML to prepend to formatted_body only (bypasses markdown conversion).
* Used for reasoning blocks with <details> tags that would be double-escaped. */
htmlPrefix?: string;
}
/**
@@ -173,6 +178,8 @@ export interface BotConfig {
showToolCalls?: boolean; // Show tool invocations in channel output
showReasoning?: boolean; // Show agent reasoning/thinking in channel output
reasoningMaxChars?: number; // Truncate reasoning to N chars (default: 0 = no limit)
reasoningRooms?: string[]; // Room IDs where reasoning should be shown (empty = all rooms)
noReasoningRooms?: string[]; // Room IDs where reasoning should be hidden (takes precedence)
};
// Skills
@@ -205,9 +212,11 @@ export interface BotConfig {
conversationMode?: 'disabled' | 'shared' | 'per-channel' | 'per-chat'; // Default: shared
heartbeatConversation?: string; // "dedicated" | "last-active" | "<channel>" (default: last-active)
interruptHeartbeatOnUserMessage?: boolean; // Default true. Cancel in-flight heartbeat on user message.
heartbeatTargetChatId?: string; // When set + dedicated, user messages in this room route to 'heartbeat' conv key
conversationOverrides?: string[]; // Channels that always use their own conversation (shared mode)
maxSessions?: number; // Max concurrent sessions in per-chat mode (default: 10, LRU eviction)
reuseSession?: boolean; // Reuse SDK subprocess across messages (default: true). Set false to eliminate stream state bleed at cost of ~5s latency per message.
sessionModel?: string; // Model override for session creation (e.g., "synthetic-direct/hf:moonshotai/Kimi-K2.5")
}
/**

View File

@@ -59,6 +59,7 @@ function createMockBot(): AgentSession {
reset: vi.fn(),
getLastMessageTarget: vi.fn().mockReturnValue(null),
getLastUserMessageTime: vi.fn().mockReturnValue(null),
invalidateSession: vi.fn(),
};
}

View File

@@ -304,9 +304,13 @@ export class HeartbeatService {
}
}
const targetRoom = this.config.target
? `${this.config.target.channel}:${this.config.target.chatId}`
: undefined;
const message = customPrompt
? buildCustomHeartbeatPrompt(customPrompt, formattedTime, timezone, this.config.intervalMinutes, actionableTodos, now)
: buildHeartbeatPrompt(formattedTime, timezone, this.config.intervalMinutes, actionableTodos, now);
? buildCustomHeartbeatPrompt(customPrompt, formattedTime, timezone, this.config.intervalMinutes, actionableTodos, now, targetRoom)
: buildHeartbeatPrompt(formattedTime, timezone, this.config.intervalMinutes, actionableTodos, now, targetRoom);
log.info(`Sending prompt (SILENT MODE):\n${'─'.repeat(50)}\n${message}\n${'─'.repeat(50)}\n`);

View File

@@ -2,7 +2,7 @@
* Cron Types
*/
import type { ChannelId } from '../core/types.js';
import type { ChannelId } from '../channels/setup.js';
/**
* Cron schedule

View File

@@ -24,7 +24,6 @@ import {
serverModeLabel,
wasLoadedFromFleetConfig,
} from './config/index.js';
import { resolveSessionMemfs } from './config/memfs.js';
import { getCronDataDir, getDataDir, getWorkingDir, hasRailwayVolume, resolveWorkingDirPath } from './utils/paths.js';
import { parseCsvList, parseNonNegativeNumber } from './utils/parse.js';
import { createLogger, setLogLevel } from './logger.js';
@@ -67,6 +66,7 @@ import { LettaGateway } from './core/gateway.js';
import { LettaBot } from './core/bot.js';
import type { Store } from './core/store.js';
import { createChannelsForAgent } from './channels/factory.js';
import type { MatrixAdapter } from './channels/matrix/index.js';
import { GroupBatcher } from './core/group-batcher.js';
import { printStartupBanner } from './core/banner.js';
import { collectGroupBatchingConfig } from './core/group-batching-config.js';
@@ -99,10 +99,14 @@ Encode your config: base64 < lettabot.yaml | tr -d '\\n'
process.exit(1);
}
// Parse heartbeat target (format: "telegram:123456789", "slack:C1234567890", or "discord:123456789012345678")
// Parse heartbeat target (format: "telegram:123456789", "slack:C1234567890", "matrix:!roomId:homeserver.net")
// Splits only on the FIRST colon so Matrix room IDs (which contain colons) are preserved intact.
function parseHeartbeatTarget(raw?: string): { channel: string; chatId: string } | undefined {
if (!raw || !raw.includes(':')) return undefined;
const [channel, chatId] = raw.split(':');
if (!raw) return undefined;
const colonIdx = raw.indexOf(':');
if (colonIdx < 0) return undefined;
const channel = raw.slice(0, colonIdx);
const chatId = raw.slice(colonIdx + 1);
if (!channel || !chatId) return undefined;
return { channel: channel.toLowerCase(), chatId };
}
@@ -218,19 +222,6 @@ function ensureRequiredTools(tools: string[]): string[] {
return out;
}
function parseOptionalBoolean(raw?: string): boolean | undefined {
if (raw === 'true') return true;
if (raw === 'false') return false;
return undefined;
}
function parseHeartbeatSkipRecentPolicy(raw?: string): 'fixed' | 'fraction' | 'off' | undefined {
if (raw === 'fixed' || raw === 'fraction' || raw === 'off') {
return raw;
}
return undefined;
}
// Global config (shared across all agents)
const globalConfig = {
workingDir: getWorkingDir(),
@@ -245,9 +236,6 @@ const globalConfig = {
attachmentsMaxAgeDays: resolveAttachmentsMaxAgeDays(),
cronEnabled: process.env.CRON_ENABLED === 'true', // Legacy env var fallback
heartbeatSkipRecentUserMin: parseNonNegativeNumber(process.env.HEARTBEAT_SKIP_RECENT_USER_MIN),
heartbeatSkipRecentPolicy: parseHeartbeatSkipRecentPolicy(process.env.HEARTBEAT_SKIP_RECENT_POLICY),
heartbeatSkipRecentFraction: parseNonNegativeNumber(process.env.HEARTBEAT_SKIP_RECENT_FRACTION),
heartbeatInterruptOnUserMessage: parseOptionalBoolean(process.env.HEARTBEAT_INTERRUPT_ON_USER_MESSAGE),
};
// Validate LETTA_API_KEY is set for API mode (docker mode doesn't require it)
@@ -275,25 +263,6 @@ async function main() {
const isMultiAgent = agents.length > 1;
log.info(`${agents.length} agent(s) configured: ${agents.map(a => a.name).join(', ')}`);
// Validate agent names are unique
const agentNames = agents.map(a => a.name);
const duplicateAgentName = agentNames.find((n, i) => agentNames.indexOf(n) !== i);
if (duplicateAgentName) {
log.error(`Multiple agents share the same name: "${duplicateAgentName}". Each agent must have a unique name.`);
process.exit(1);
}
// Validate no two agents share the same turnLogFile
const turnLogFilePaths = agents
.map(a => (a.features?.logging ?? yamlConfig.features?.logging)?.turnLogFile)
.filter((p): p is string => !!p)
.map(p => resolve(p));
const duplicateTurnLog = turnLogFilePaths.find((p, i) => turnLogFilePaths.indexOf(p) !== i);
if (duplicateTurnLog) {
log.error(`Multiple agents share the same turnLogFile: "${duplicateTurnLog}". Each agent must use a unique log file path.`);
process.exit(1);
}
// Validate at least one agent has channels
const totalChannels = agents.reduce((sum, a) => sum + Object.keys(a.channels).length, 0);
if (totalChannels === 0) {
@@ -335,25 +304,10 @@ async function main() {
for (const agentConfig of agents) {
log.info(`Configuring agent: ${agentConfig.name}`);
const resolvedMemfsResult = resolveSessionMemfs({
configuredMemfs: agentConfig.features?.memfs,
envMemfs: process.env.LETTABOT_MEMFS,
serverMode: yamlConfig.server.mode,
});
const resolvedMemfs = resolvedMemfsResult.value;
const configuredSleeptime = agentConfig.features?.sleeptime;
// Treat missing trigger as active (conservative): only `trigger: 'off'` explicitly disables.
const sleeptimeRequiresMemfs = !!configuredSleeptime && configuredSleeptime.trigger !== 'off';
const effectiveSleeptime = resolvedMemfs === false && sleeptimeRequiresMemfs
? undefined
: configuredSleeptime;
if (resolvedMemfs === false && sleeptimeRequiresMemfs) {
log.warn(
`Agent ${agentConfig.name}: sleeptime is configured but memfs is disabled; ` +
`sleeptime will be ignored. Enable features.memfs (or set LETTABOT_MEMFS=true) to use sleeptime.`
);
}
// Resolve memfs: YAML config takes precedence, then env var, then default false.
// Default false prevents the SDK from auto-enabling memfs, which crashes on
// self-hosted Letta servers that don't have the git endpoint.
const resolvedMemfs = agentConfig.features?.memfs ?? (process.env.LETTABOT_MEMFS === 'true' ? true : false);
// Create LettaBot for this agent
const resolvedWorkingDir = agentConfig.workingDir
@@ -366,7 +320,6 @@ async function main() {
const cronStorePath = cronStoreFilename
? resolve(getCronDataDir(), cronStoreFilename)
: undefined;
const heartbeatConfig = agentConfig.features?.heartbeat;
const bot = new LettaBot({
workingDir: resolvedWorkingDir,
@@ -379,44 +332,34 @@ async function main() {
sendFileMaxSize: agentConfig.features?.sendFileMaxSize,
sendFileCleanup: agentConfig.features?.sendFileCleanup,
memfs: resolvedMemfs,
sleeptime: effectiveSleeptime,
display: agentConfig.features?.display,
conversationMode: agentConfig.conversations?.mode || 'shared',
heartbeatConversation: agentConfig.conversations?.heartbeat || 'last-active',
interruptHeartbeatOnUserMessage:
heartbeatConfig?.interruptOnUserMessage
?? globalConfig.heartbeatInterruptOnUserMessage
?? true,
heartbeatTargetChatId: parseHeartbeatTarget(agentConfig.features?.heartbeat?.target)?.chatId,
conversationOverrides: agentConfig.conversations?.perChannel,
maxSessions: agentConfig.conversations?.maxSessions,
reuseSession: agentConfig.conversations?.reuseSession,
sessionModel: agentConfig.conversations?.sessionModel,
redaction: agentConfig.security?.redaction,
logging: agentConfig.features?.logging ?? yamlConfig.features?.logging,
cronStorePath,
skills: {
cronEnabled: agentConfig.features?.cron ?? globalConfig.cronEnabled,
googleEnabled: !!agentConfig.integrations?.google?.enabled || !!agentConfig.polling?.gmail?.enabled,
blueskyEnabled: !!agentConfig.channels?.bluesky?.enabled,
ttsEnabled: voiceMemoEnabled,
},
});
// Log memfs config (from either YAML or env var)
if (resolvedMemfs !== undefined) {
const source = resolvedMemfsResult.source === 'config'
? ''
: resolvedMemfsResult.source === 'env'
? ' (from LETTABOT_MEMFS env)'
: ' (default for docker/selfhosted mode)';
const source = agentConfig.features?.memfs !== undefined ? '' : ' (from LETTABOT_MEMFS env)';
log.info(`Agent ${agentConfig.name}: memfs ${resolvedMemfs ? 'enabled' : 'disabled'}${source}`);
} else {
log.info(`Agent ${agentConfig.name}: memfs unchanged (not explicitly configured)`);
}
// Apply explicit agent ID from config (before store verification)
// Always use config's ID if explicitly set - it takes precedence over stored value
let initialStatus = bot.getStatus();
if (agentConfig.id && !initialStatus.agentId) {
log.info(`Using configured agent ID: ${agentConfig.id}`);
if (agentConfig.id) {
log.info(`Using configured agent ID: ${agentConfig.id} (overrides stored: ${initialStatus.agentId})`);
bot.setAgentId(agentConfig.id);
initialStatus = bot.getStatus();
}
@@ -490,14 +433,12 @@ async function main() {
}
// Per-agent heartbeat
const heartbeatConfig = agentConfig.features?.heartbeat;
const heartbeatService = new HeartbeatService(bot, {
enabled: heartbeatConfig?.enabled ?? false,
intervalMinutes: heartbeatConfig?.intervalMin ?? 240,
skipRecentUserMinutes: heartbeatConfig?.skipRecentUserMin ?? globalConfig.heartbeatSkipRecentUserMin,
skipRecentPolicy: heartbeatConfig?.skipRecentPolicy ?? globalConfig.heartbeatSkipRecentPolicy,
skipRecentFraction: heartbeatConfig?.skipRecentFraction ?? globalConfig.heartbeatSkipRecentFraction,
agentKey: agentConfig.name,
memfs: resolvedMemfs,
prompt: heartbeatConfig?.prompt || process.env.HEARTBEAT_PROMPT,
promptFile: heartbeatConfig?.promptFile,
workingDir: resolvedWorkingDir,
@@ -508,6 +449,17 @@ async function main() {
services.heartbeatServices.push(heartbeatService);
}
bot.onTriggerHeartbeat = () => heartbeatService.trigger();
// Wire Matrix adapter callbacks (heartbeat toggle, !timeout, !new, agent ID query)
const matrixAdapter = adapters.find(a => a.id === 'matrix') as MatrixAdapter | undefined;
if (matrixAdapter) {
matrixAdapter.onHeartbeatStop = () => heartbeatService.stop();
matrixAdapter.onHeartbeatStart = () => heartbeatService.start();
// Best-effort: stops the timer so no new runs fire; running promise times out on its own
matrixAdapter.onTimeoutHeartbeat = () => { heartbeatService.stop(); log.warn('Matrix !timeout: heartbeat stopped (abort not yet supported)'); };
matrixAdapter.getAgentId = () => bot.getStatus().agentId ?? undefined;
matrixAdapter.onInvalidateSession = (key?: string) => bot.invalidateSession(key);
}
// Per-agent polling -- resolve accounts from polling > integrations.google (legacy) > env
const pollConfig = (() => {
@@ -561,28 +513,34 @@ async function main() {
agentChannelMap.set(agentConfig.name, adapters.map(a => a.id));
}
// Load/generate API key BEFORE gateway.start() so letta.js subprocesses inherit it.
// The lettabot-message CLI needs LETTABOT_API_KEY to route through the bot's HTTP API for E2EE.
const apiKey = loadOrGenerateApiKey();
if (!process.env.LETTABOT_API_KEY) {
process.env.LETTABOT_API_KEY = apiKey;
}
log.info(`Key: ${apiKey.slice(0, 8)}... (set LETTABOT_API_KEY to customize)`);
// Start all agents
await gateway.start();
// Load/generate API key for CLI authentication
const apiKey = loadOrGenerateApiKey();
log.info(`Key: ${apiKey.slice(0, 8)}... (set LETTABOT_API_KEY to customize)`);
// Olm WASM (matrix-js-sdk) registers process.on("uncaughtException", (e) => { throw e })
// during Olm.init(). Without this fix, any uncaught async exception crashes the bot.
// Must run AFTER gateway.start() since that's when the Matrix adapter initialises Olm.
process.removeAllListeners('uncaughtException');
process.removeAllListeners('unhandledRejection');
process.on('uncaughtException', (err) => { log.error('Uncaught exception (suppressed):', err); });
process.on('unhandledRejection', (reason) => { log.error('Unhandled rejection (suppressed):', reason); });
// Start API server - uses gateway for delivery
const apiPort = parseInt(process.env.PORT || '8080', 10);
const apiHost = process.env.API_HOST || (isContainerDeploy ? '0.0.0.0' : undefined); // Container platforms need 0.0.0.0 for health checks
const apiCorsOrigin = process.env.API_CORS_ORIGIN; // undefined = same-origin only
const turnLogFiles: Record<string, string> = {};
for (const a of agents) {
const logging = a.features?.logging ?? yamlConfig.features?.logging;
if (logging?.turnLogFile) turnLogFiles[a.name] = logging.turnLogFile;
}
const apiServer = createApiServer(gateway, {
port: apiPort,
apiKey: apiKey,
host: apiHost,
corsOrigin: apiCorsOrigin,
turnLogFiles: Object.keys(turnLogFiles).length > 0 ? turnLogFiles : undefined,
stores: agentStores,
agentChannels: agentChannelMap,
sessionInvalidators,

View File

@@ -9,7 +9,7 @@ import type { InboundMessage, OutboundMessage } from '../core/types.js';
import { parseCommand, HELP_TEXT } from '../core/commands.js';
export class MockChannelAdapter implements ChannelAdapter {
readonly id = 'mock' as const;
readonly id = 'mock' as any;
readonly name = 'Mock (Testing)';
private running = false;

View File

@@ -9,14 +9,14 @@ import { Letta } from '@letta-ai/letta-client';
import { createLogger } from '../logger.js';
const log = createLogger('Letta-api');
const LETTA_BASE_URL = process.env.LETTA_BASE_URL || 'https://api.letta.com';
function getClient(): Letta {
// Read env at call time, not at module load time — applyConfigToEnv() runs after imports
const baseURL = process.env.LETTA_BASE_URL || 'https://api.letta.com';
const apiKey = process.env.LETTA_API_KEY;
// Local servers may not require an API key
return new Letta({
apiKey: apiKey || '',
baseURL: LETTA_BASE_URL,
return new Letta({
apiKey: apiKey || '',
baseURL,
defaultHeaders: { "X-Letta-Source": "lettabot" },
});
}
@@ -542,12 +542,6 @@ export async function rejectApproval(
log.warn(`Approval already resolved for tool call ${approval.toolCallId}`);
return true;
}
// Re-throw rate limit errors so callers can bail out early instead of
// hammering the API in a tight loop.
if (err?.status === 429) {
log.error('Failed to reject approval:', e);
throw e;
}
log.error('Failed to reject approval:', e);
return false;
}