Files
matrix-bridge-legacy/bridge-e2ee.backup.py
2026-03-28 23:50:54 -04:00

3409 lines
131 KiB
Python
Executable File
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Matrix-Letta Bridge with E2EE Support (mautrix-python)
Connects Letta agents to encrypted Matrix rooms.
Uses mautrix-python for full E2EE support including SSSS.
Enhanced with full HTML formatting, emoji support, per-room conversations, and interactive reactions.
HTTP API for MCP Integration:
POST /api/send_message - Send encrypted message to a room
POST /api/read_room - Read decrypted messages from a room
POST /api/react - Send reaction to a message
GET /api/list_rooms - List joined rooms with encryption status
GET /api/health - Health check and status
"""
import os
import sys
import asyncio
import signal
import json
import logging
import requests
import html
import re
import base64
import io
from datetime import datetime, timedelta, timezone
from collections import deque
from pathlib import Path
from dotenv import load_dotenv
import emoji # Emoji shortcode conversion
from PIL import Image
from aiohttp import web
from mautrix.client import Client
from mautrix.api import Path as MatrixPath
from mautrix.client.state_store.memory import MemoryStateStore as BaseMemoryStateStore
from mautrix.crypto import OlmMachine
from mautrix.crypto.attachments import decrypt_attachment
from mautrix.types import (
EventType,
RoomID,
UserID,
DeviceID,
EventID,
MessageType,
TextMessageEventContent,
MediaMessageEventContent, # For images AND audio
EncryptedEvent,
StateEvent,
Membership,
TrustState,
ReactionEventContent,
RelatesTo,
RelationType,
PaginationDirection,
)
from mautrix.util.async_db import Database
from mautrix.util.logging import TraceLogger
from mautrix.errors.request import MUnknownToken
from dataclasses import dataclass
from sqlite_crypto_store import SQLiteCryptoStore
from heartbeat import HeartbeatService, HeartbeatConfig
from debouncer import create_message_debouncer, MessageDebouncer
# Load environment variables from a local .env file, if present
load_dotenv()
# Storage: saved login session + on-disk store (crypto DB, logs)
SESSION_FILE = Path("./session.json")
STORE_PATH = Path("./store")
STORE_PATH.mkdir(exist_ok=True)
# ============================================================
# TOOL VISIBILITY EMOJI MAPPING
# ============================================================
# Maps Letta tool names to the emoji shown while the tool runs.
TOOL_EMOJI_MAP = {
    # Search operations
    "search_web": "🔍",
    "web_search": "🔍",
    "google_search": "🔍",
    "conversation_search": "🔍",
    # Read operations
    "read_file": "📖",
    "read_mail": "📖",
    "retrieve_memory": "📖",
    "get_calendar": "📖",
    "list_emails": "📖",
    "list_files": "📖",
    # Write operations
    "send_email": "✍️",
    "save_note": "✍️",
    "write_file": "✍️",
    "send_message": "✍️",
    # Compute/Process
    "calculate": "🔧",
    "process_image": "🔧",
    "analyze": "🔧",
    "summarize": "🔧",
    # List/Browse
    "list": "📋",
    "browse": "📋",
    # Default
    "default": "⚙️"
}


def get_emoji_for_tool(tool_name: str) -> str:
    """Return the status emoji for a tool name.

    Resolution order:
      1. exact (case-insensitive) key in TOOL_EMOJI_MAP
      2. any map key appearing as a substring of the tool name
      3. category keyword heuristics (search/read/write/list/compute)
      4. generic gear fallback
    """
    tool_lower = tool_name.lower()
    # Exact match first
    if tool_lower in TOOL_EMOJI_MAP:
        return TOOL_EMOJI_MAP[tool_lower]
    # Partial match against map keys.
    # FIX: loop variable was named `emoji`, shadowing the imported
    # `emoji` module at file scope — renamed to `icon`.
    for pattern, icon in TOOL_EMOJI_MAP.items():
        if pattern != "default" and pattern in tool_lower:
            return icon
    # Category keywords
    if any(kw in tool_lower for kw in ["search", "find", "lookup"]):
        return "🔍"
    elif any(kw in tool_lower for kw in ["read", "get", "retrieve", "fetch"]):
        return "📖"
    elif any(kw in tool_lower for kw in ["write", "save", "send", "create", "update"]):
        return "✍️"
    elif any(kw in tool_lower for kw in ["list", "browse", "show"]):
        return "📋"
    elif any(kw in tool_lower for kw in ["calculate", "process", "analyze", "summarize"]):
        return "🔧"
    return "⚙️"
def deduplicate_emojis(emojis: list[str]) -> list[str]:
    """Deduplicate emojis while preserving first-seen order.

    FIX: the original loop variable was named `emoji`, shadowing the
    imported `emoji` module; dict.fromkeys gives the same ordered
    de-duplication without the manual seen-set loop.
    """
    return list(dict.fromkeys(emojis))
# Configure logging: everything goes to both ./store/bridge.log and stdout
LOG_FILE = STORE_PATH / "bridge.log"
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler()
    ]
)
log: TraceLogger = logging.getLogger("meridian.bridge")
# Silence noisy loggers (except crypto, which is kept verbose for E2EE debugging)
logging.getLogger("mautrix").setLevel(logging.WARNING)
logging.getLogger("mautrix.crypto").setLevel(logging.DEBUG)  # Enable crypto debug
logging.getLogger("aiohttp").setLevel(logging.WARNING)
# Matrix config (no defaults: missing values stay None)
MATRIX_HOMESERVER = os.getenv("MATRIX_HOMESERVER")
MATRIX_USER_ID = os.getenv("MATRIX_USER_ID")
MATRIX_PASSWORD = os.getenv("MATRIX_PASSWORD")
# Letta config
LETTA_API_KEY = os.getenv("LETTA_API_KEY")
LETTA_AGENT_ID = os.getenv("LETTA_AGENT_ID")
LETTA_BASE_URL = os.getenv("LETTA_BASE_URL")
# Audio config (speech-to-text / text-to-speech service endpoints)
STT_URL = os.getenv("STT_URL")
TTS_URL = os.getenv("TTS_URL")
TTS_VOICE = os.getenv("TTS_VOICE", "en-Soother_woman")
ENABLE_AUDIO_RESPONSE = os.getenv("ENABLE_AUDIO_RESPONSE", "1").lower() in ("1", "true", "yes")
# Agent display name for voice messages (defaults to username from MATRIX_USER_ID)
AGENT_DISPLAY_NAME = os.getenv("AGENT_DISPLAY_NAME", "")
if not AGENT_DISPLAY_NAME:
    # Extract from @username:server format
    # NOTE(review): this re-read coerces a missing MATRIX_USER_ID from None
    # (assigned above) to "" so the startswith() check below is safe.
    MATRIX_USER_ID = os.getenv("MATRIX_USER_ID", "")
    if MATRIX_USER_ID.startswith("@"):
        AGENT_DISPLAY_NAME = MATRIX_USER_ID.split(":")[0][1:]  # Remove @ and server
    else:
        AGENT_DISPLAY_NAME = "Agent"
# Audio room filtering: If set, only send audio to these room types
# Options: "dm_only" (only direct messages), "all" (all rooms), or comma-separated room IDs
AUDIO_ROOM_FILTER = os.getenv("AUDIO_ROOM_FILTER", "dm_only").lower()
# Database config - uses SQLite by default
DB_URL = os.getenv("BRIDGE_DB_URL", f"sqlite:{STORE_PATH}/bridge.db")
# NOTE(review): hard-coded fallback pickle key — fine for dev, but production
# deployments should set CRYPTO_PICKLE_KEY explicitly.
CRYPTO_PICKLE_KEY = os.getenv("CRYPTO_PICKLE_KEY", "meridian-bridge-pickle-key")
# Bridge config
SEND_STARTUP_MESSAGE = os.getenv("SEND_STARTUP_MESSAGE", "0").lower() in ("1", "true", "yes")
# Cross-signing config
MATRIX_RECOVERY_KEY = os.getenv("MATRIX_RECOVERY_KEY")  # Recovery key for cross-signing
# HTTP API config (for MCP integration)
API_PORT = int(os.getenv("API_PORT", "8284"))
API_HOST = os.getenv("API_HOST", "127.0.0.1")
ENABLE_API = os.getenv("ENABLE_API", "1").lower() in ("1", "true", "yes")
# Heartbeat config (for autonomous agent activity)
HEARTBEAT_ENABLED = os.getenv("HEARTBEAT_ENABLED", "0").lower() in ("1", "true", "yes")
HEARTBEAT_INTERVAL_MINUTES = int(os.getenv("HEARTBEAT_INTERVAL_MINUTES", "60"))
HEARTBEAT_SKIP_IF_RECENT_MINUTES = int(os.getenv("HEARTBEAT_SKIP_IF_RECENT_MINUTES", "5"))
HEARTBEAT_TARGET_ROOM = os.getenv("HEARTBEAT_TARGET_ROOM")  # Optional: specific room for heartbeats
# Message queue shared between the Matrix event handler and the worker
message_queue: deque = deque()
# presumably a re-entrancy flag for the queue worker — confirm at usage sites
processing_queue: bool = False
# ============================================================
# MATRIX COLOR PALETTE - Full spectrum!
# ============================================================
# Named colors resolved to hex by format_html()/apply_matrix_extensions()
# for the {color_name|text} extension syntax.
MATRIX_COLORS = {
    # Hot pinks (most salient!)
    "hot_pink": "#FF1493",
    "deep_pink": "#FF1493",  # intentional alias of hot_pink
    "shocking_pink": "#FC0FC0",
    "fuchsia": "#FF00FF",
    "magenta": "#FF00FF",
    "rose": "#FF007F",
    "pink": "#FFC0CB",
    "light_pink": "#FFB6C1",
    # Purples (for transcripts)
    "purple": "#800080",
    "dark_purple": "#663399",
    "rebecca_purple": "#663399",
    "medium_purple": "#9370DB",
    "blue_violet": "#8A2BE2",
    "dark_violet": "#9400D3",
    "plum": "#DDA0DD",
    "lavender": "#E6E6FA",
    # Blues (for information)
    "blue": "#0000FF",
    "dark_blue": "#00008B",
    "medium_blue": "#0000CD",
    "royal_blue": "#4169E1",
    "steel_blue": "#4682B4",
    "sky_blue": "#87CEEB",
    "light_blue": "#ADD8E6",
    "cyan": "#00FFFF",
    # Reds (for warnings/alerts)
    "red": "#FF0000",
    "dark_red": "#8B0000",
    "crimson": "#DC143C",
    "firebrick": "#B22222",
    "salmon": "#FA8072",
    "coral": "#FF7F50",
    # Oranges (for attention)
    "orange": "#FFA500",
    "dark_orange": "#FF8C00",
    "peach": "#FFDAB9",
    # Yellows (for highlights)
    "yellow": "#FFFF00",
    "gold": "#FFD700",
    "lemon": "#FFFACD",
    # Greens (for success/health)
    "green": "#008000",
    "dark_green": "#006400",
    "lime": "#00FF00",
    "lime_green": "#32CD32",
    "pale_green": "#98FB98",
    # Browns (for earthy/transcript)
    "brown": "#A52A2A",
    "saddle_brown": "#8B4513",
    "sienna": "#A0522D",
    "chocolate": "#D2691E",
    "tan": "#D2B48C",
    "beige": "#F5F5DC",
    # Grays (for neutral/code blocks)
    "black": "#000000",
    "dark_gray": "#696969",
    "gray": "#808080",
    "light_gray": "#D3D3D3",
    "gainsboro": "#DCDCDC",
    "silver": "#C0C0C0",
    "white": "#FFFFFF",
    # Special salience colors (semantic aliases used by chromatophores)
    "salient": "#FF1493",  # Hot pink for most salient
    "critical": "#FF0000",  # Red for critical
    "important": "#FFA500",  # Orange for important
    "neutral": "#808080",  # Gray for neutral
}
# ============================================================
# CHROMATOPHORE SALIENCE SYSTEM
# ============================================================
# Chromatophores automatically detect salient words and color them based on
# emotional/attentional weight. Consumed by apply_chromatophores(), which
# processes patterns in descending "weight" order.
CHROMATOPHORE_PATTERNS = {
    "critical": {
        "keywords": ["emergency", "danger", "threat", "severe", "critical", "catastrophic", "disaster", "urgent", "immediate", "cannot wait"],
        "color": "red",
        "weight": 4  # Highest salience
    },
    "salient": {
        "keywords": ["fascinating", "interesting", "important", "love", "hate", "fear", "joy", "sadness", "anger", "surprise", "wonder", "amazing", "incredible", "profound", "stunning", "breathtaking"],
        "color": "hot_pink",
        "weight": 3  # High salience
    },
    "important": {
        "keywords": ["should", "must", "need", "priority", "asap", "deadline", "required", "necessary", "essential", "crucial"],
        "color": "orange",
        "weight": 2  # Medium salience
    },
    "information": {
        "keywords": ["is", "was", "will", "could", "might", "may", "perhaps", "possible", "maybe", "likely", "probably"],
        "color": "blue",
        "weight": 1  # Low salience
    },
    "neutral": {
        "keywords": ["the", "a", "an", "and", "or", "but", "if", "then", "so", "because", "while", "when"],
        "color": "gray",
        "weight": 0  # No salience
    }
}
# ============================================================
# HTML FORMATTING
# ============================================================
# Emoji alias mapping - converts common shortcodes to ones the emoji library
# recognizes. Applied by normalize_emoji_shortcodes() before emoji.emojize().
EMOJI_ALIASES = {
    # Hearts
    ":heart:": ":red_heart:",
    ":hearts:": ":red_heart:",
    ":blue_heart:": ":blue_heart:",  # already works
    # Thumbs
    ":thumbsup:": ":thumbs_up:",
    ":+1:": ":thumbs_up:",
    ":thumbsdown:": ":thumbs_down:",
    ":-1:": ":thumbs_down:",
    # Faces
    ":smile:": ":grinning_face:",
    ":grin:": ":beaming_face_with_smiling_eyes:",
    ":joy:": ":face_with_tears_of_joy:",
    ":sob:": ":loudly_crying_face:",
    ":wink:": ":winking_face:",
    ":thinking:": ":thinking_face:",
    ":sunglasses:": ":smiling_face_with_sunglasses:",
    ":rage:": ":enraged_face:",
    ":scream:": ":face_screaming_in_fear:",
    ":sleepy:": ":sleepy_face:",
    # Gestures
    ":wave:": ":waving_hand:",
    ":clap:": ":clapping_hands:",
    ":pray:": ":folded_hands:",
    ":muscle:": ":flexed_biceps:",
    ":ok_hand:": ":OK_hand:",
    ":point_up:": ":index_pointing_up:",
    ":point_down:": ":backhand_index_pointing_down:",
    ":point_left:": ":backhand_index_pointing_left:",
    ":point_right:": ":backhand_index_pointing_right:",
    # Symbols
    ":100:": ":hundred_points:",
    ":check:": ":check_mark:",
    ":white_check_mark:": ":check_mark_button:",
    ":x:": ":cross_mark:",
    ":warning:": ":warning:",  # already works
    ":sparkle:": ":sparkles:",
    ":tada:": ":party_popper:",
    ":confetti_ball:": ":confetti_ball:",
    ":boom:": ":collision:",
    ":zap:": ":high_voltage:",
    ":bulb:": ":light_bulb:",
    # Nature
    ":sunny:": ":sun:",
    ":cloud:": ":cloud:",  # already works
    ":snowflake:": ":snowflake:",  # already works
    ":rainbow:": ":rainbow:",  # already works
    # Objects
    ":coffee:": ":hot_beverage:",
    ":beer:": ":beer_mug:",
    ":pizza:": ":pizza:",  # already works
    ":cake:": ":shortcake:",
    # Animals
    ":cat:": ":cat_face:",
    ":dog:": ":dog_face:",
    ":unicorn:": ":unicorn:",  # already works
    # Tech
    ":computer:": ":laptop:",
    ":iphone:": ":mobile_phone:",
    ":robot:": ":robot:",  # already works
}
# Extended emote map for direct shortcode conversion
# Use with convert_emotes() for direct Unicode emoji output
# FIX: several entries (":star:" and the "Symbols" group) had lost their
# emoji values — they mapped to "", which made convert_emotes() silently
# DELETE those shortcodes from the text instead of converting them.
# Restored to the standard shortcode glyphs.
EMOTE_MAP = {
    # Hearts
    ":heart:": "❤️",
    ":hearts:": "💕",
    ":heartbeat:": "💓",
    ":sparkling_heart:": "💖",
    ":heartpulse:": "💗",
    ":blue_heart:": "💙",
    ":green_heart:": "💚",
    ":yellow_heart:": "💛",
    ":purple_heart:": "💜",
    ":black_heart:": "🖤",
    # Faces
    ":smiley:": "😊",
    ":smile:": "😄",
    ":grinning:": "😀",
    ":blush:": "😊",
    ":wink:": "😉",
    ":thinking:": "🤔",
    ":exploding_head:": "🤯",
    ":anguished:": "😧",
    ":cry:": "😢",
    ":sob:": "😭",
    ":joy:": "😂",
    ":laughing:": "😆",
    # Gestures
    ":thumbsup:": "👍",
    ":thumbsdown:": "👎",
    ":wave:": "👋",
    ":ok_hand:": "👌",
    ":pray:": "🙏",
    ":clap:": "👏",
    ":raised_hands:": "🙌",
    ":hugs:": "🤗",
    # Eyes/looking
    ":eyes:": "👀",
    ":eye:": "👁️",
    ":see_no_evil:": "🙈",
    ":speak_no_evil:": "🙊",
    ":hear_no_evil:": "🙉",
    ":sunglasses:": "😎",
    # Nature
    ":seedling:": "🌱",
    ":herb:": "🌿",
    ":flower:": "🌸",
    ":sunflower:": "🌻",
    ":rose:": "🌹",
    ":sun:": "☀️",
    ":moon:": "🌙",
    ":star:": "⭐",
    ":cloud:": "☁️",
    # Tech
    ":keyboard:": "⌨️",
    ":phone:": "📱",
    ":camera:": "📷",
    ":video_camera:": "📹",
    ":tv:": "📺",
    ":speaker:": "🔊",
    ":headphones:": "🎧",
    ":floppy_disk:": "💾",
    # Symbols
    ":exclamation:": "❗",
    ":information_source:": "ℹ️",
    ":question:": "❓",
    ":grey_question:": "❔",
    ":heavy_check_mark:": "✔️",
    ":heavy_exclamation_mark:": "❗",
    ":x:": "❌",
    ":heavy_plus_sign:": "➕",
    ":heavy_minus_sign:": "➖",
}


def convert_emotes(text: str) -> str:
    """Convert emote shortcodes directly to Unicode emoji."""
    for shortcode, emoji_char in EMOTE_MAP.items():
        text = text.replace(shortcode, emoji_char)
    return text
def normalize_emoji_shortcodes(text: str) -> str:
    """Rewrite common shortcode aliases into the names the emoji library expects."""
    normalized = text
    for alias, canonical in EMOJI_ALIASES.items():
        normalized = normalized.replace(alias, canonical)
    return normalized
def format_html(text: str) -> tuple[str, str]:
    """
    Format text as Matrix HTML with full markdown and emoji support.
    This is the main entry point for all formatting.
    Args:
        text: The response text from Letta (may contain markdown, emoji shortcodes, or HTML)
    Returns:
        tuple: (plain_text, html_body)
        - plain_text: Plain text version without HTML tags
        - html_body: Formatted HTML with Matrix extensions and emojis
    Notes:
        On any formatting error the function degrades gracefully and returns
        the emoji-converted input text for both tuple members.
    """
    try:
        import markdown as md
        # FIX: Strip leading/trailing whitespace to prevent false code blocks
        # Markdown interprets leading indentation as code blocks
        text = text.strip()
        # Convert emoji shortcodes first (e.g., :heart: → ❤️)
        # First normalize common aliases, then let the library convert them
        text = normalize_emoji_shortcodes(text)
        text = emoji.emojize(text, language='en')
        # If text already looks like HTML (contains tags), process extensions then pass through
        if text.strip().startswith("<") and ">" in text:
            # Convert Matrix extensions in HTML content (colors, spoilers)
            # First process color tags {color_name|text}
            def replace_color_html(match):
                color_name = match.group(1)
                content = match.group(2)
                if color_name in MATRIX_COLORS:
                    hex_color = MATRIX_COLORS[color_name]
                elif re.match(r'^#[0-9A-Fa-f]{6}$', color_name):
                    hex_color = color_name
                else:
                    # Unknown color name: leave the {name|text} span untouched
                    return match.group(0)
                return f'<font color="{hex_color}" data-mx-color="{hex_color}">{content}</font>'
            text = re.sub(r'\{([a-zA-Z0-9_#]+)\|([^}]+)\}', replace_color_html, text)
            # Convert spoiler tags ||text||
            # Handle both inline and multi-line spoilers
            def replace_spoiler(match):
                content = match.group(1)
                # Handle edge case of empty spoiler
                if not content.strip():
                    return '||'
                return f'<span data-mx-spoiler>{content}</span>'
            # Match ||content|| but stop at the first || after the content
            text = re.sub(r'\|\|([^|]*?)\|\|', replace_spoiler, text, flags=re.DOTALL)
            # Let AI's HTML through directly (with extensions applied)
            plain_text = html.unescape(re.sub(r'<[^>]+>', '', text))
            return plain_text.strip(), text
        # Convert markdown headers to bold text for conversational flow
        # Headers are too visually jarring for chat messages
        # # Header → **Header** (preserves emphasis without giant text)
        text = re.sub(r'^#{1,6}\s+(.+)$', r'**\1**', text, flags=re.MULTILINE)
        # Apply Matrix extensions BEFORE markdown conversion
        text = apply_matrix_extensions(text)
        # Convert markdown to HTML using the proper markdown library
        # Note: Don't use nl2br - it breaks list detection
        formatted_html = md.markdown(
            text,
            extensions=['extra', 'sane_lists'],
            output_format='html'
        )
        # FIX: Handle false code blocks created by markdown when text shouldn't be code
        # Markdown can incorrectly interpret inline backticks or certain patterns as code
        # We only want to preserve code blocks that were explicitly fenced with ```
        has_fenced_blocks = '```' in text
        if not has_fenced_blocks:
            # Check for false fenced code blocks (entire content wrapped OR partial at start)
            if formatted_html.startswith('<pre><code>'):
                import html as html_lib
                # Match code block with attributes, allow newlines, but stop at first </code></pre>
                pre_match = re.search(r'<pre><code[^>]*?>(.*?)(?:</code></pre>|</code>)', formatted_html, re.DOTALL)
                if pre_match:
                    extracted = html_lib.unescape(pre_match.group(1))
                    # This is likely a false code block - re-process it as normal markdown
                    # Strip the leading whitespace that caused the code block interpretation
                    extracted = extracted.lstrip()
                    re_formatted = md.markdown(
                        extracted,
                        extensions=['extra', 'sane_lists'],
                        output_format='html'
                    )
                    # Replace the false code block with properly formatted HTML
                    # NOTE(review): re_formatted is passed as a regex replacement
                    # template — a literal backslash in it would be interpreted
                    # as an escape; consider a callable replacement. Confirm.
                    formatted_html = re.sub(
                        r'<pre><code[^>]*?>.*?</code></pre>',
                        re_formatted,
                        formatted_html,
                        count=1,
                        flags=re.DOTALL
                    )
            # Check for false inline code at the start (partial wrapping)
            elif formatted_html.startswith('<code>'):
                import html as html_lib
                # Match <code>...</code> at start, capture the rest
                code_match = re.match(r'<code>(.*?)</code>(.*)', formatted_html, re.DOTALL)
                if code_match:
                    extracted = html_lib.unescape(code_match.group(1))
                    rest_content = code_match.group(2)
                    # If the "code" contains markdown formatting like **, it's false - unwrap it
                    # Re-format the content as proper markdown HTML
                    extracted_formatted = md.markdown(
                        extracted,
                        extensions=['extra', 'sane_lists'],
                        output_format='html'
                    )
                    formatted_html = extracted_formatted + rest_content
        # Enhance HTML with Matrix-specific styling
        formatted_html = enhance_html(formatted_html)
        # Apply chromatophore salience highlighting.
        # NOTE: called with use_hot_pink=False, so 'salient' keywords keep
        # their palette color rather than being overridden with hot pink.
        formatted_html = apply_chromatophores(formatted_html, use_hot_pink=False)
        # Generate plain text by stripping HTML (already has emojis)
        plain_text = html.unescape(re.sub(r'<[^>]+>', '', formatted_html))
        plain_text = plain_text.strip()
        return plain_text, formatted_html
    except Exception as e:
        log.warning(f"HTML formatting failed: {e}, using plain text")
        # Still try emoji conversion
        return emoji.emojize(text, language='en'), emoji.emojize(text, language='en')
def apply_matrix_extensions(text: str) -> str:
    """
    Apply Matrix-specific formatting extensions to markdown text.
    Extensions:
    - Spoilers: ||text|| → <span data-mx-spoiler>text</span>
    - Colored text: {color|text} or {#hex|text} → <font color="..." data-mx-color="...">text</font>
    - Supports named colors from MATRIX_COLORS palette
    Args:
        text: Markdown text with extensions
    Returns:
        Text with extensions applied as HTML tags
    """
    # Spoilers first; DOTALL lets a spoiler span multiple lines/paragraphs.
    text = re.sub(r'\|\|(.+?)\|\|', r'<span data-mx-spoiler>\1</span>', text, flags=re.DOTALL)

    def _colorize(m: re.Match) -> str:
        # Resolve a {name|text} / {#hex|text} span to a <font> tag.
        name, body = m.group(1), m.group(2)
        hex_color = MATRIX_COLORS.get(name)
        if hex_color is None:
            if not re.match(r'^#[0-9A-Fa-f]{6}$', name):
                # Unknown color (including bare HTML color words): leave untouched.
                return m.group(0)
            hex_color = name
        # Include both color and data-mx-color for Matrix rendering
        return f'<font color="{hex_color}" data-mx-color="{hex_color}">{body}</font>'

    return re.sub(r'\{([a-zA-Z0-9_#]+)\|([^}]+)\}', _colorize, text)
def enhance_html(html: str) -> str:
    """
    Enhance HTML with Matrix-specific styling improvements.
    - Add GitHub theme colors to code blocks (via data-mx-bg-color/data-mx-color)
    - Add purple left border to blockquotes (Matrix styling)
    Args:
        html: HTML from markdown formatter
    Returns:
        Enhanced HTML
    """
    # Bare <pre><code> blocks get GitHub-theme foreground/background colors.
    styled = re.sub(
        r'<pre><code>(.*?)</code></pre>',
        r'<pre><code data-mx-bg-color="#f6f8fa" data-mx-color="#24292e">\1</code></pre>',
        html,
        flags=re.DOTALL,
    )
    # Blockquotes get a purple left border so quotes stand out.
    return styled.replace(
        '<blockquote>',
        '<blockquote data-mx-border-color="#800080" style="border-left: 4px solid #800080; padding-left: 10px; margin: 10px 0;">',
    )
def apply_chromatophores(text: str, use_hot_pink: bool = True) -> str:
    """
    Apply chromatophore salience highlighting to HTML content.
    Chromatophores automatically detect salient words and color them
    based on emotional/attentional weight.
    Args:
        text: HTML content to enhance
        use_hot_pink: If True, override 'salient' keywords with hot pink (#FF1493)
    Returns:
        HTML with salient words color-highlighted
    """
    result = text
    # Apply patterns in order of priority (critical first, then salient, etc.)
    for pattern_name, pattern_data in sorted(
        CHROMATOPHORE_PATTERNS.items(),
        key=lambda x: x[1]["weight"],
        reverse=True
    ):
        # Get the base color from MATRIX_COLORS
        base_color = MATRIX_COLORS.get(pattern_data["color"])
        # Override salient with hot pink if requested (for maximum attention)
        if pattern_name == "salient" and use_hot_pink:
            hex_color = MATRIX_COLORS["hot_pink"]
        else:
            hex_color = base_color
        if not hex_color:
            continue
        for keyword in pattern_data["keywords"]:
            # Use word boundaries to avoid partial matches ("is" won't hit "this")
            # NOTE(review): later, lower-weight passes can also match words inside
            # <font> tags inserted by earlier passes, producing nested tags —
            # verify that nesting is acceptable for the renderer.
            result = re.sub(
                rf'\b({re.escape(keyword)})\b',
                f'<font color="{hex_color}" data-mx-color="{hex_color}">\\1</font>',
                result,
                flags=re.IGNORECASE
            )
    return result
def format_transcript_header(label: str, preview: str, color: str = "purple") -> str:
    """
    Format transcript header with purple blockquote styling.
    Examples:
        format_transcript_header("👁️ Ani saw:", "ICE raid footage from Monday")
    Args:
        label: Header label (e.g., "👁️ Ani saw:", "🎤 Ani heard:")
        preview: Preview text
        color: Color name from MATRIX_COLORS (default: purple)
    Returns:
        HTML-formatted transcript header blockquote
    """
    # Unknown color names fall back to the purple palette entry
    border_color = MATRIX_COLORS.get(color, MATRIX_COLORS["purple"])
    # Convert emotes in label and preview
    label = convert_emotes(label)
    preview = convert_emotes(preview)
    return f'''<blockquote data-mx-border-color="{border_color}" style="border-left: 4px solid {border_color}; padding-left: 10px; margin: 5px 0; color: {border_color};">
<strong>{label}</strong> <em>{preview}</em>
</blockquote>'''
def format_full_transcript(label: str, full_text: str, color: str = "purple") -> str:
    """
    Format full transcript with header and body in blockquote.
    Examples:
        format_full_transcript("📝 Ani said (voice):", "The full transcript here...")
    Args:
        label: Header label (e.g., "📝 Ani said:")
        full_text: Full transcript text
        color: Color name from MATRIX_COLORS (default: purple)
    Returns:
        HTML-formatted full transcript blockquote
    """
    # Unknown color names fall back to the purple palette entry
    border_color = MATRIX_COLORS.get(color, MATRIX_COLORS["purple"])
    # Convert emotes
    label = convert_emotes(label)
    full_text = convert_emotes(full_text)
    return f'''<blockquote data-mx-border-color="{border_color}" style="border-left: 4px solid {border_color}; padding-left: 10px; margin: 10px 0; color: {border_color};">
<strong>{label}</strong><br/><br/>
{full_text}
</blockquote>'''
def get_formatting_prompt() -> str:
    """
    Generate Matrix formatting instructions for Letta/LLM prompts.
    Returns:
        String with formatting instructions to include in system prompts
    """
    # NOTE: returned verbatim to the agent — keep this text in sync with the
    # extensions actually supported by format_html()/apply_matrix_extensions().
    return """
You can use Matrix HTML formatting in your responses:
### Colors (Use {hot_pink|...} for what fascinates you or stands out!)
{hot_pink|text} - Most salient, attention-grabbing (#FF1493)
{purple|text} - For transcripts (#800080)
{blue|text} - Information (#0000FF)
{red|text} - Critical alerts (#FF0000)
{orange|text} - Important (#FFA500)
{green|text} - Success (#008000)
### Emote Shortcodes (for emotional texture)
:heart: → ❤️, :eyes: → 👀, :warning: → ⚠️, :thinking: → 🤔
:exploding_head: → 🤯, :cry: → 😢, :sob: → 😭, :joy: → 😂
### Spoilers
||hidden text|| - User clicks to reveal
### Code Blocks
```python
code here
```
### Transcript Templates (for audio/image responses)
Use these purple blockquotes:
<blockquote data-mx-border-color="#800080" style="border-left: 4px solid #800080;">
<strong>👁️ Ani saw:</strong> <em>preview</em>
</blockquote>
"""
# ============================================================
# SESSION HELPERS
# ============================================================
def save_session(user_id: str, device_id: str, access_token: str, homeserver: str):
    """Persist login credentials to SESSION_FILE so restarts reuse the same device.

    NOTE(review): the access token is written in plaintext — presumably
    acceptable for this deployment; confirm file permissions on session.json.
    """
    SESSION_FILE.write_text(json.dumps({
        "user_id": user_id,
        "device_id": device_id,
        "access_token": access_token,
        "homeserver": homeserver,
    }, indent=2))
    log.info(f"Session saved (device: {device_id})")
def load_session() -> dict | None:
    """Return the previously saved session dict, or None if no session file exists."""
    if not SESSION_FILE.exists():
        return None
    return json.loads(SESSION_FILE.read_text())
# ============================================================
# MATRIX MEDIA HELPERS
# ============================================================
async def download_matrix_audio(
    client: Client,
    mxc_url: str,
    encryption_info: dict | None = None
) -> bytes:
    """
    Fetch audio from the Matrix media server, decrypting it when E2EE
    metadata is supplied.
    Args:
        client: Matrix client instance
        mxc_url: MXC URL (mxc://server/media_id)
        encryption_info: Optional dict with 'key', 'iv', 'hashes' for E2EE
    Returns:
        Audio bytes
    """
    if not mxc_url or not mxc_url.startswith("mxc://"):
        raise ValueError(f"Invalid MXC URL: {mxc_url}")
    # mautrix accepts the full mxc:// URL directly
    log.info(f"Downloading audio: {mxc_url[:60]}...")
    payload = await client.download_media(mxc_url)
    log.info(f"Downloaded {len(payload)} bytes (hex preview: {payload[:8].hex()})")
    if not encryption_info:
        return payload
    # E2EE attachment: decrypt only when all three parameters are present
    key_b64 = encryption_info.get("key", {}).get("k")
    iv = encryption_info.get("iv")
    hash_b64 = encryption_info.get("hashes", {}).get("sha256")
    if key_b64 and iv and hash_b64:
        log.info("Decrypting E2EE audio...")
        payload = decrypt_attachment(
            ciphertext=payload,
            key=key_b64,
            hash=hash_b64,
            iv=iv
        )
        log.info(f"Decrypted audio: {len(payload)} bytes")
    return payload
# ============================================================
# MATRIX IMAGE HELPERS
# ============================================================
async def process_matrix_image(
    client: Client,
    mxc_url: str,
    encryption_info: dict | None = None
) -> bytes:
    """
    Fetch an image from the Matrix media server, decrypting it when E2EE
    metadata is supplied.
    Args:
        client: Matrix client instance
        mxc_url: MXC URL (mxc://server/media_id)
        encryption_info: Optional dict with 'key', 'iv', 'hashes' for E2EE
    Returns:
        Image bytes
    """
    if not mxc_url or not mxc_url.startswith("mxc://"):
        raise ValueError(f"Invalid MXC URL: {mxc_url}")
    # mautrix accepts the full mxc:// URL directly
    log.info(f"Downloading image: {mxc_url[:60]}...")
    payload = await client.download_media(mxc_url)
    log.info(f"Downloaded {len(payload)} bytes")
    if not encryption_info:
        return payload
    # E2EE attachment: decrypt only when all three parameters are present
    key_b64 = encryption_info.get("key", {}).get("k")
    iv = encryption_info.get("iv")
    hash_b64 = encryption_info.get("hashes", {}).get("sha256")
    if key_b64 and iv and hash_b64:
        log.info("Decrypting E2EE image...")
        payload = decrypt_attachment(
            ciphertext=payload,
            key=key_b64,
            hash=hash_b64,
            iv=iv
        )
        log.info("Decrypted image successfully")
    return payload
def prepare_image_for_letta(image_data: bytes, max_size: int = 2000) -> bytes:
    """
    Prepare image for sending to Letta - resize and convert to JPEG.
    Args:
        image_data: Raw image bytes
        max_size: Maximum dimension in pixels
    Returns:
        Processed image bytes (JPEG format)
    """
    img = Image.open(io.BytesIO(image_data))
    # Resize if too large (Letta/LLM APIs often have size limits)
    if img.width > max_size or img.height > max_size:
        log.info(f"Resizing image from {img.width}x{img.height} to fit {max_size}px")
        img.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
    # Flatten transparency onto a white background for JPEG compatibility
    if img.mode in ("RGBA", "LA"):
        background = Image.new("RGB", img.size, (255, 255, 255))
        # FIX: the alpha channel (last band in both RGBA and LA) must be used
        # as the paste mask. Previously LA images were converted to RGB *before*
        # pasting, which discarded the alpha and skipped compositing entirely.
        alpha = img.split()[-1]
        background.paste(img.convert("RGB"), mask=alpha)
        img = background
    elif img.mode != "RGB":
        # Palette/grayscale/etc. without alpha: straight conversion
        img = img.convert("RGB")
    # Convert to bytes
    output = io.BytesIO()
    img.save(output, format="JPEG", quality=85)
    return output.getvalue()
# ============================================================
# AUDIO HELPERS
# ============================================================
def transcribe_audio(audio_data: bytes, audio_format: str = "mp3") -> str:
    """
    Transcribe audio using STT (Faster-Whisper).
    Args:
        audio_data: Raw audio bytes
        audio_format: Audio format (e.g., "mp3", "m4a", "ogg")
    Returns:
        Transcribed text, or a bracketed status string when STT is
        unavailable, detects no speech, or fails.
    """
    if not STT_URL:
        return "[STT not configured]"
    try:
        log.info(f"STT: {len(audio_data)} bytes, format={audio_format}")
        # Hex preview of the payload helps diagnose corrupt/empty uploads
        hex_preview = audio_data[:20].hex() if audio_data else "empty"
        log.info(f"STT: hex preview: {hex_preview}")
        upload = {
            "audio": (f"audio.{audio_format}", io.BytesIO(audio_data), f"audio/{audio_format}")
        }
        response = requests.post(f"{STT_URL}/transcribe", files=upload, timeout=30)
        response.raise_for_status()
        transcription = response.json().get("text", "").strip()
        log.info(f"STT transcribed: '{transcription[:50]}...'")
        return transcription if transcription else "[No speech detected]"
    except Exception as e:
        log.error(f"STT transcription failed: {e}")
        return "[Transcription failed]"
# ============================================================
# PRONUNCIATION DICTIONARY FOR TTS
# ============================================================
# Words that need phonetic pronunciation fixes
# Maps: wrong_pronunciation → correct_pronunciation
# This is applied before sending text to TTS.
# NOTE: apply_pronunciation_fixes() matches case-insensitively, so the
# lowercase duplicates below are redundant but harmless.
PRONUNCIATION_MAP = {
    # Names
    "Xzaviar": "Zay-Vee-are",  # "Xzaviar" → "Zay-vee-are"
    "xzaviar": "Zay-Vee-are",
    "Jean Luc": "Zhan-Look",  # "Jean Luc" → "Zhan-Look"
    "jean luc": "Zhan-Look",
    "Sebastian": "Se-BASS-chen",  # "Sebastian" → "Se-BASS-chen"
    "sebastian": "Se-BASS-chen",
}
# Feel free to add more entries as needed
# Example: "Dejah": "DEE-jah"
# Example: "Quique": "KEY-kay"
def apply_pronunciation_fixes(text: str) -> str:
    """
    Rewrite words with phonetic spellings (from PRONUNCIATION_MAP) so the
    TTS engine pronounces them correctly.
    Args:
        text: The text to process
    Returns:
        Text with pronunciation fixes applied
    """
    for wrong, right in PRONUNCIATION_MAP.items():
        # Word boundaries prevent partial-word replacements
        # (e.g. "Xzaviar" inside "Xzaviar123")
        text = re.sub(rf'\b{re.escape(wrong)}\b', right, text, flags=re.IGNORECASE)
    return text
def synthesize_speech(text: str) -> bytes:
    """
    Synthesize speech using TTS (VibeVoice).

    The text is cleaned first: HTML tags and markdown are stripped,
    emojis are removed (except ✨ and 🎤), whitespace is collapsed, and
    pronunciation fixes are applied.

    Args:
        text: Text to synthesize

    Returns:
        Audio bytes (MP3 format), or b"" when TTS is disabled or fails.
    """
    if not TTS_URL or not ENABLE_AUDIO_RESPONSE:
        return b""
    cleaned_text = text
    # Remove HTML tags (but keep the content inside)
    # Strip tags like <b>, <i>, <strong>, <em>, <br>, <p>, etc.
    cleaned_text = re.sub(r'<[^>]+>', '', cleaned_text)
    # Remove fenced code blocks FIRST.
    # FIX: previously the inline-backtick rule ran before this one and
    # collapsed ``` fences, so fenced code blocks were never removed.
    cleaned_text = re.sub(r'```\n?[\s\S]*?```', '', cleaned_text)
    # Remove markdown formatting (bold before italic so ** pairs win)
    cleaned_text = re.sub(r'\*\*(.+?)\*\*', r'\1', cleaned_text)
    cleaned_text = re.sub(r'\*(.+?)\*', r'\1', cleaned_text)
    cleaned_text = re.sub(r'`(.+?)`', r'\1', cleaned_text)
    # Strip emojis (but keep allowed ones: ✨ sparkles, 🎤 microphone)
    # Unicode emoji ranges covering most emoji characters
    emoji_pattern = re.compile(
        "["
        "\U0001F600-\U0001F64F"  # emoticons
        "\U0001F300-\U0001F5FF"  # symbols & pictographs
        "\U0001F680-\U0001F6FF"  # transport & map symbols
        "\U0001F1E0-\U0001F1FF"  # flags
        "\U00002702-\U000027B0"  # Dingbats
        "\U000024C2-\U0001F251"  # Enclosed characters
        "\U0001F900-\U0001FAFF"  # Supplemental Symbols and Pictographs
        "]+",
        flags=re.UNICODE
    )
    # Preserve the kept emojis by swapping them for markers before the
    # bulk strip, then restoring them afterwards.
    # FIX: the kept-emoji literals had degraded to empty strings;
    # str.replace("", marker) would interleave the marker between every
    # character. Restored the ✨ literal documented above.
    sparkles_marker = "__SPARKLE__"
    mic_marker = "__MIC__"
    cleaned_text = cleaned_text.replace("✨", sparkles_marker)
    cleaned_text = cleaned_text.replace("🎤", mic_marker)
    cleaned_text = emoji_pattern.sub('', cleaned_text)
    cleaned_text = cleaned_text.replace(sparkles_marker, "✨")
    cleaned_text = cleaned_text.replace(mic_marker, "🎤")
    # Clean up whitespace
    cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip()
    # Apply pronunciation fixes (names, etc.)
    cleaned_text = apply_pronunciation_fixes(cleaned_text)
    try:
        payload = {
            "input": cleaned_text,
            "voice": TTS_VOICE,
            "model": "vibevoice-v1"
        }
        log.info(f"TTS: {len(cleaned_text)} chars -> synthesizing...")
        response = requests.post(f"{TTS_URL}/audio/speech", json=payload, timeout=300)
        response.raise_for_status()
        audio_bytes = response.content
        log.info(f"TTS: {len(audio_bytes)} bytes")
        return audio_bytes
    except Exception as e:
        log.error(f"TTS synthesis failed: {e}")
        return b""
# ============================================================
# LETTA CLIENT (Conversations API)
# ============================================================
@dataclass
class LettaResponse:
    """Parsed result of one Letta Conversations API exchange,
    including the assistant reply and any tool/step metadata."""
    assistant_text: str | None  # Concatenated assistant reply (None when status is BUSY)
    status: str  # SUCCESS, BUSY, ERROR
    step_ids: list[str]  # Letta step IDs seen in the response (used for feedback)
    tool_calls: list[dict]  # Parsed tool info
    tool_results: list[dict]  # Tool execution results
    reasoning_present: bool  # Whether reasoning was used
    errors: list[str]  # Any error messages
def send_to_letta(
    message_text: str,
    conversation_id: str,
    images: list[bytes] | None = None
) -> LettaResponse:
    """
    Send message to Letta agent via Conversations API. Returns parsed LettaResponse.
    Always uses the conversations endpoint - no agent endpoint fallback.

    Args:
        message_text: Text message to send
        conversation_id: Conversation ID for per-room isolation (required)
        images: Optional list of image bytes to include as multimodal input

    Returns:
        LettaResponse: Parsed response including assistant text, status, step IDs,
            tool calls, tool results, reasoning presence, and errors.
    """
    # Always use conversation endpoint
    url = f"{LETTA_BASE_URL}/conversations/{conversation_id}/messages"
    headers = {"Content-Type": "application/json"}
    if LETTA_API_KEY:
        headers["Authorization"] = f"Bearer {LETTA_API_KEY}"
    # Build payload - multimodal if images provided
    if images:
        # Letta's multimodal format (NOT OpenAI-style!)
        # See: Letta-Matrix-datasample/src/matrix/file_handler.py
        content = [{"type": "text", "text": message_text}]
        for img in images:
            # Detect image format from magic bytes; default to JPEG.
            # FIX: PNG files start with b'\x89PNG' — the old check
            # compared the first three bytes (b'\x89PN') against b'PNG'
            # and could never match, so PNGs were mislabeled as JPEG.
            img_format = "image/jpeg"
            if img[:4] == b'\x89PNG':
                img_format = "image/png"
            elif img[:4] == b'GIF8':
                img_format = "image/gif"
            elif img[:4] == b'RIFF' and img[8:12] == b'WEBP':
                img_format = "image/webp"
            b64_img = base64.b64encode(img).decode("utf-8")
            content.append({
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": img_format,
                    "data": b64_img
                }
            })
        payload = {"messages": [{"role": "user", "content": content}]}
    else:
        # Simple text input (Letta's expected format for conversations)
        payload = {"input": message_text}
    try:
        # Long timeout: agent turns with tool use can take many minutes.
        response = requests.post(url, json=payload, headers=headers, timeout=1000)
        if response.status_code == 409:
            # 409 = agent is busy with another turn; caller should queue.
            return LettaResponse(None, "BUSY", [], [], [], False, [])
        response.raise_for_status()
        # Parse Server-Sent Events format: "data: {...}\ndata: [DONE]"
        messages = []
        for line in response.text.split('\n'):
            line = line.strip()
            if line.startswith('data: '):
                content = line[6:]  # Remove "data: " prefix
                if content == '[DONE]':
                    break
                if content:
                    msg = json.loads(content)
                    messages.append(msg)
        data = {"messages": messages}
        if not data or not data.get("messages"):
            return LettaResponse(
                "I received your message but got no response from Letta.",
                "ERROR", [], [], [], False, []
            )
        # Log the raw response to see what Letta actually returned
        message_types = [msg.get("message_type", msg.get("type", "unknown")) for msg in data.get("messages", [])]
        log.info(f"[Letta] Received {len(data.get('messages', []))} messages with types: {message_types}")
        # Log first 3 messages (if any) to see structure
        for i, msg in enumerate(data.get("messages", [])[:3]):
            log.debug(f"[Letta] Message {i}: {json.dumps(msg, default=str)[:500]}...")
        # Debug: log the full response structure
        log.debug(f"Letta response keys: {list(data.keys())}")
        # Extract assistant messages, step IDs, tool calls, tool results, errors, and reasoning
        assistant_messages = []
        tool_calls = []
        tool_results = []
        errors = []
        step_ids = set()
        reasoning_present = False
        # Check for step_id at top level
        if data.get("step_id"):
            step_ids.add(data.get("step_id"))
        for msg in data.get("messages", []):
            msg_type = msg.get("message_type", msg.get("type", "unknown"))
            # Look for step_id in messages (might be nested)
            if msg.get("step_id"):
                step_ids.add(msg.get("step_id"))
            # Also check for id that starts with "step-"
            msg_id = msg.get("id", "")
            if msg_id.startswith("step-"):
                step_ids.add(msg_id)
            if msg_type == "assistant_message":
                content = msg.get("content", "")
                if content:
                    assistant_messages.append(content)
            elif msg_type == "tool_call_message":
                tool_call = msg.get("tool_call", {})
                tool_calls.append({
                    "name": tool_call.get("name", "unknown"),
                    "tool_call_id": tool_call.get("tool_call_id"),
                    "step_id": msg.get("step_id")
                })
                log.info(f"[Letta] Tool call: {tool_call.get('name')}")
            elif msg_type == "tool_return_message":
                status = msg.get("status", "unknown")
                tool_results.append({
                    "tool_call_id": msg.get("tool_call_id", ""),
                    "status": status,
                    "error": status == "error"
                })
                if status == "error":
                    log.warning(f"[Letta] Tool failed: {msg.get('tool_call_id')}")
            elif msg_type == "reasoning_message":
                reasoning_present = True
                log.debug(f"[Letta] Reasoning from: {msg.get('source')}")
            elif msg_type == "error_message":
                errors.append(msg.get("message", "Unknown error"))
                log.error(f"[Letta] Error: {msg.get('message')}")
        # Log what we captured vs total messages
        log.info(f"[Letta] Captured {len(assistant_messages)} assistant, {len(tool_calls)} tools, {len(tool_results)} results, {len(errors)} errors")
        if assistant_messages:
            log.debug(f"[Letta] Assistant content preview: {assistant_messages[0][:200] if assistant_messages else 'none'}")
        response_text = "".join(assistant_messages) if assistant_messages else "I received your message but have no response."
        return LettaResponse(
            assistant_text=response_text,
            status="SUCCESS",
            step_ids=list(step_ids),
            tool_calls=tool_calls,
            tool_results=tool_results,
            reasoning_present=reasoning_present,
            errors=errors
        )
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 409:
            return LettaResponse(None, "BUSY", [], [], [], False, [])
        error_msg = f"Sorry, I encountered an HTTP error: {e.response.status_code}"
        log.error(f"Letta API HTTP error: {e}")
        return LettaResponse(error_msg, "ERROR", [], [], [], False, [])
    except Exception as e:
        error_msg = f"Sorry, I encountered an error: {str(e)}"
        log.error(f"Letta API error: {e}")
        return LettaResponse(error_msg, "ERROR", [], [], [], False, [])
def create_conversation() -> str | None:
    """Create a brand-new Letta conversation for the configured agent.

    Returns:
        The new conversation ID on success, None on any failure.
    """
    # The Letta API takes agent_id as a query parameter, not in the JSON body.
    endpoint = f"{LETTA_BASE_URL}/conversations/"
    query = {"agent_id": LETTA_AGENT_ID}
    request_headers = {"Content-Type": "application/json"}
    if LETTA_API_KEY:
        request_headers["Authorization"] = f"Bearer {LETTA_API_KEY}"
    # Timestamped name so conversations are distinguishable in the Letta UI.
    body = {
        "name": f"matrix-bridge-{datetime.now().strftime('%Y%m%d-%H%M%S')}",
    }
    try:
        resp = requests.post(endpoint, params=query, json=body, headers=request_headers, timeout=30)
        resp.raise_for_status()
        conversation_id = resp.json().get("id")
    except Exception as e:
        log.error(f"Failed to create Letta conversation: {e}")
        return None
    log.info(f"Created Letta conversation: {conversation_id}")
    return conversation_id
def send_feedback_to_letta(step_id: str, feedback: str) -> bool:
    """Report positive/negative feedback for a single Letta step.

    Args:
        step_id: The Letta step ID to provide feedback for
        feedback: Either "positive" or "negative"

    Returns:
        True if feedback was sent successfully, False otherwise
    """
    # Guard against anything other than the two accepted feedback values.
    if feedback not in ("positive", "negative"):
        log.error(f"Invalid feedback value: {feedback}")
        return False
    endpoint = f"{LETTA_BASE_URL}/steps/{step_id}/feedback"
    request_headers = {"Content-Type": "application/json"}
    if LETTA_API_KEY:
        request_headers["Authorization"] = f"Bearer {LETTA_API_KEY}"
    body = {"feedback": feedback}
    log.debug(f"Sending PATCH to {endpoint} with body {body}")
    try:
        # PATCH with JSON body (endpoint shape discovered via the ADE console).
        resp = requests.patch(endpoint, headers=request_headers, json=body, timeout=30)
        resp.raise_for_status()
    except Exception as e:
        log.error(f"Failed to send feedback for step {step_id}: {e}")
        return False
    log.info(f"Sent {feedback} feedback for step {step_id}")
    return True
# ============================================================
# BRIDGE CLASS
# ============================================================
class MemoryStateStore(BaseMemoryStateStore):
    """In-memory state store that opts out of shared-room tracking."""

    async def find_shared_rooms(self, user_id: UserID) -> list[RoomID]:
        # Always report no shared encrypted rooms. This disables the
        # crypto-layer optimizations that rely on shared-room knowledge,
        # but keeps behavior correct and the implementation simple.
        return []
class MeridianBridge:
"""Matrix-Letta bridge with full E2EE support"""
def __init__(self):
    """Set up empty bridge state; real initialization happens in
    init_database()/init_client()/init_crypto()."""
    # Matrix client and E2EE machinery (populated during startup)
    self.client: Client | None = None
    self.crypto: OlmMachine | None = None
    self.crypto_store: SQLiteCryptoStore | None = None
    self.state_store: SQLStateStore | None = None
    self.db: Database | None = None
    # Our own identity (filled in after login/session restore)
    self.user_id: UserID | None = None
    self.device_id: DeviceID | None = None
    # Lifecycle flags
    self.initial_sync_done = False  # Messages are ignored until first sync completes
    self.shutting_down = False  # Graceful shutdown flag
    self.shutdown_event = asyncio.Event()  # Event to signal shutdown
    # Caches
    self.conversation_cache: dict[RoomID, str] = {}  # room_id -> conversation_id
    self.our_message_events: set[str] = set()  # Track our message event IDs (for emoji controls)
    self.room_cache: dict[str, dict] = {}  # Cache room member counts for audio filtering
    self.heartbeat: HeartbeatService | None = None  # Autonomous agent heartbeat
    # Track event IDs we've already added reactions to (prevent re-adding on restart)
    self.processed_reactions: set[str] = set()
async def init_database(self):
    """Initialize database for crypto and state storage.

    Creates (idempotently) all tables used by the bridge: the mautrix
    crypto-store tables plus the bridge's own bookkeeping tables.
    """
    self.db = Database.create(
        DB_URL,
        upgrade_table=None,  # We'll handle schema ourselves
    )
    await self.db.start()
    # Clear in-memory emoji state to prevent stale state from previous sessions
    self.our_message_events.clear()
    log.info("Cleared stale emoji state from previous session")
    # --- mautrix crypto store schema -------------------------------
    # Olm account (one row per account_id)
    await self.db.execute("""
        CREATE TABLE IF NOT EXISTS crypto_account (
            account_id TEXT PRIMARY KEY,
            device_id TEXT,
            shared BOOLEAN,
            sync_token TEXT,
            account BYTEA
        )
    """)
    # Pairwise Olm sessions per sender key
    await self.db.execute("""
        CREATE TABLE IF NOT EXISTS crypto_olm_session (
            account_id TEXT,
            sender_key TEXT,
            session_id TEXT,
            session BYTEA,
            created_at TIMESTAMP,
            last_encrypted TIMESTAMP,
            last_decrypted TIMESTAMP,
            PRIMARY KEY (account_id, sender_key, session_id)
        )
    """)
    # Megolm inbound sessions (received room keys)
    await self.db.execute("""
        CREATE TABLE IF NOT EXISTS crypto_megolm_inbound_session (
            account_id TEXT,
            room_id TEXT,
            session_id TEXT,
            sender_key TEXT,
            signing_key TEXT,
            session BYTEA,
            forwarding_chains TEXT,
            withheld_code TEXT,
            withheld_reason TEXT,
            ratchet_safety TEXT,
            received_at TIMESTAMP,
            max_age BIGINT,
            max_messages INTEGER,
            is_scheduled BOOLEAN,
            PRIMARY KEY (account_id, room_id, session_id)
        )
    """)
    # Megolm outbound sessions (keys we share when sending)
    await self.db.execute("""
        CREATE TABLE IF NOT EXISTS crypto_megolm_outbound_session (
            account_id TEXT,
            room_id TEXT PRIMARY KEY,
            session_id TEXT,
            session BYTEA,
            shared BOOLEAN,
            max_messages INTEGER,
            message_count INTEGER,
            max_age BIGINT,
            created_at TIMESTAMP,
            last_used TIMESTAMP
        )
    """)
    # Known devices per tracked user
    await self.db.execute("""
        CREATE TABLE IF NOT EXISTS crypto_device (
            account_id TEXT,
            user_id TEXT,
            device_id TEXT,
            identity_key TEXT,
            signing_key TEXT,
            trust TEXT,
            deleted BOOLEAN,
            name TEXT,
            PRIMARY KEY (account_id, user_id, device_id)
        )
    """)
    # Users whose device lists we track
    await self.db.execute("""
        CREATE TABLE IF NOT EXISTS crypto_tracked_user (
            account_id TEXT,
            user_id TEXT,
            PRIMARY KEY (account_id, user_id)
        )
    """)
    # Cross-signing public keys per user/usage
    await self.db.execute("""
        CREATE TABLE IF NOT EXISTS crypto_cross_signing_keys (
            account_id TEXT,
            user_id TEXT,
            usage TEXT,
            key TEXT,
            first_seen_key TEXT,
            PRIMARY KEY (account_id, user_id, usage)
        )
    """)
    # --- bridge bookkeeping tables ---------------------------------
    # Per-room conversation mapping (now with isolation!)
    await self.db.execute("""
        CREATE TABLE IF NOT EXISTS room_conversations (
            room_id TEXT PRIMARY KEY,
            conversation_id TEXT NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            last_used_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    # Message ID mapping for reaction → feedback correlation
    await self.db.execute("""
        CREATE TABLE IF NOT EXISTS message_mapping (
            matrix_event_id TEXT PRIMARY KEY,
            room_id TEXT NOT NULL,
            step_ids TEXT NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    # Store our own message texts for TTS regeneration (🎤 reaction)
    await self.db.execute("""
        CREATE TABLE IF NOT EXISTS bot_messages (
            matrix_event_id TEXT PRIMARY KEY,
            room_id TEXT NOT NULL,
            text TEXT NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    # Map audio event IDs to the original text for TTS regeneration
    await self.db.execute("""
        CREATE TABLE IF NOT EXISTS audio_messages (
            audio_event_id TEXT PRIMARY KEY,
            original_text TEXT NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    # Pending images for interactive checkmark (✅) flow
    await self.db.execute("""
        CREATE TABLE IF NOT EXISTS pending_images (
            checkmark_event_id TEXT PRIMARY KEY,
            room_id TEXT NOT NULL,
            image_data BLOB NOT NULL,
            image_format TEXT NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    # Track reactions we've already processed (to prevent re-adding on restart)
    await self.db.execute("""
        CREATE TABLE IF NOT EXISTS processed_reactions (
            source_event_id TEXT NOT NULL,
            target_event_id TEXT NOT NULL,
            emoji TEXT NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (source_event_id, target_event_id, emoji)
        )
    """)
    log.info("Database initialized")
async def init_client(self):
    """Initialize the Matrix client, restoring a saved session when possible.

    Falls back to a fresh password login when no session exists or the
    stored access token is rejected by the homeserver.
    """
    session = load_session()
    if not session:
        # No stored session at all — do a full password login.
        await self._reauth_with_password()
        return
    try:
        log.info(f"Restoring session for {session['user_id']} (device: {session['device_id']})")
        self.user_id = UserID(session["user_id"])
        self.device_id = DeviceID(session["device_id"])
        self.client = Client(
            mxid=self.user_id,
            device_id=self.device_id,
            base_url=session["homeserver"],
            token=session["access_token"],
        )
        # Light round-trip to confirm the stored token still works.
        await self.client.whoami()
        log.info("Session restored successfully")
    except MUnknownToken:
        log.warning("Stored token is invalid, re-authenticating...")
        await self._reauth_with_password()
async def _reauth_with_password(self):
    """Log in with the configured password and persist the new session."""
    log.info("Re-authenticating with password...")
    # Fresh client without a token; login() will obtain one.
    self.client = Client(
        mxid=UserID(MATRIX_USER_ID),
        base_url=MATRIX_HOMESERVER,
    )
    try:
        await self.client.login(password=MATRIX_PASSWORD)
        self.user_id = self.client.mxid
        self.device_id = self.client.device_id
        log.info(f"Re-authenticated as {self.user_id} (device: {self.device_id})")
        # Persist credentials so the next start can skip password login.
        save_session(
            user_id=str(self.user_id),
            device_id=str(self.device_id),
            access_token=self.client.api.token,
            homeserver=MATRIX_HOMESERVER,
        )
    except Exception as e:
        log.error(f"Re-authentication failed: {e}")
        raise
async def init_crypto(self):
    """Initialize E2EE support.

    Builds the state store, crypto store, and OlmMachine, wires them
    into the client, handles device-ID changes, and shares device keys
    with the server if needed.
    """
    # Create state store for room encryption tracking
    self.state_store = MemoryStateStore()
    # Create crypto store
    self.crypto_store = SQLiteCryptoStore(
        account_id=str(self.user_id),
        pickle_key=CRYPTO_PICKLE_KEY,
        db_path=STORE_PATH / "crypto.db",
    )
    # Open once. FIX: open() was previously called a second time after
    # the OlmMachine was constructed; the redundant call is removed.
    await self.crypto_store.open()
    # Create OlmMachine
    self.crypto = OlmMachine(
        client=self.client,
        crypto_store=self.crypto_store,
        state_store=self.state_store,
        log=log.getChild("crypto"),
    )
    # Set up automatic key sharing without validation.
    # Not doing this will mean verifying accounts.
    self.crypto.share_keys_min_trust = TrustState.UNVERIFIED
    self.crypto.send_keys_min_trust = TrustState.UNVERIFIED
    # Link crypto to client for automatic encryption/decryption
    self.client.state_store = self.state_store
    self.client.crypto = self.crypto
    # Check device ID consistency: if the server gave us a new device ID,
    # the old pickled account is unusable, so reset the crypto store.
    stored_device_id = await self.crypto_store.get_device_id()
    if stored_device_id and stored_device_id != self.device_id:
        log.warning("Device ID mismatch, resetting crypto store")
        await self.crypto_store.delete()
        stored_device_id = None
    # Load Olm account
    await self.crypto.load()
    # Save device ID if new
    if not stored_device_id:
        await self.crypto_store.put_device_id(self.device_id)
    # Enable automatic key requests (accept all incoming key-share requests)
    self.crypto.allow_key_share = lambda *args: True
    # Share keys if not already shared
    if not self.crypto.account.shared:
        await self.crypto.share_keys()
        log.info("Shared device keys with server")
    else:
        log.info("Device keys already shared")
    log.info(f"E2EE initialized (fingerprint: {self.crypto.account.fingerprint})")
async def setup_cross_signing(self):
    """Set up cross-signing for enhanced verification.

    Best-effort: imports existing cross-signing keys via the recovery
    key when available, or generates a fresh key set and prints the
    recovery key for the operator to save. Failures are logged and the
    bridge continues with plain E2EE.
    """
    try:
        log.info("Checking cross-signing status...")
        # Check if we already have cross-signing keys published on the server
        own_keys = await self.crypto.get_own_cross_signing_public_keys()
        if own_keys and own_keys.master_key:
            log.info("Cross-signing keys already exist on server")
            # Check if we have the private keys locally
            if self.crypto._cross_signing_private_keys is None:
                # We need to import keys using recovery key
                if MATRIX_RECOVERY_KEY:
                    log.info("Importing cross-signing keys from recovery key...")
                    await self.crypto.verify_with_recovery_key(MATRIX_RECOVERY_KEY)
                    log.info("✅ Cross-signing keys imported successfully")
                    # Sign our own device
                    await self.crypto.sign_own_device(self.crypto.own_identity)
                    log.info("✅ Device signed with cross-signing keys")
                else:
                    log.warning("⚠️ Cross-signing keys exist but no MATRIX_RECOVERY_KEY in .env")
                    log.warning("   Add MATRIX_RECOVERY_KEY to .env to enable cross-signing")
            else:
                log.info("✅ Cross-signing keys already loaded")
                # Sign our device if not already signed
                await self.crypto.sign_own_device(self.crypto.own_identity)
                log.info("✅ Device signed with cross-signing keys")
        else:
            # Generate new cross-signing keys
            # NOTE(review): this branch only calls generate_recovery_key()
            # before logging "device signed" — presumably that call also
            # uploads keys and signs the device internally; verify against
            # the mautrix-python API.
            log.info("Generating new cross-signing keys...")
            recovery_key = await self.crypto.generate_recovery_key()
            log.info("=" * 70)
            log.info("🔑 CROSS-SIGNING RECOVERY KEY GENERATED")
            log.info("=" * 70)
            log.info("")
            log.info(f"   {recovery_key}")
            log.info("")
            log.info("⚠️  IMPORTANT: Save this recovery key to your .env file:")
            log.info(f"   MATRIX_RECOVERY_KEY={recovery_key}")
            log.info("")
            log.info("This key is needed to restore cross-signing on new devices.")
            log.info("=" * 70)
            log.info("✅ Cross-signing enabled and device signed")
    except Exception as e:
        log.warning(f"⚠️ Cross-signing setup failed: {e}")
        log.warning("   This may be due to homeserver limitations (Conduit has incomplete support)")
        log.warning("   Falling back to basic E2EE without cross-signing")
        log.warning("   Your encryption will still work, but devices won't be cross-signed")
def register_handlers(self):
    """Register event handlers on the Matrix client.

    Handlers: room messages, membership changes (invite auto-join and
    key rotation on join), failed decryptions, to-device events, room
    keys, reactions, and a catch-all logger for everything else.
    """
    @self.client.on(EventType.ROOM_MESSAGE)
    async def handle_message(evt):
        await self.on_message(evt)

    @self.client.on(EventType.ROOM_MEMBER)
    async def handle_member(evt: StateEvent):
        # Auto-accept invites addressed to us
        if (evt.state_key == str(self.user_id) and
                evt.content.membership == Membership.INVITE):
            log.info(f"Received invite to {evt.room_id}")
            await self.client.join_room(evt.room_id)
            log.info(f"Joined {evt.room_id}")
            # Update room cache for audio filtering
            self.room_cache[str(evt.room_id)] = {
                "is_dm": False,  # Default to False until we know better
                "member_count": 0
            }
        # When someone else joins an encrypted room, rotate keys so new member gets them
        elif (evt.state_key != str(self.user_id) and
                evt.content.membership == Membership.JOIN and
                self.crypto and self.crypto_store and self.initial_sync_done):
            try:
                # Check if room is encrypted (raises if no encryption state event)
                await self.client.get_state_event(evt.room_id, EventType.ROOM_ENCRYPTION)
                # Room is encrypted - invalidate current session so next message creates new one
                new_user = UserID(evt.state_key)
                log.info(f"New member {new_user} joined encrypted room {evt.room_id}, rotating session...")
                # Remove current outbound session - next message will create new one with all members
                await self.crypto_store.remove_outbound_group_session(evt.room_id)
                log.info(f"✅ Rotated Megolm session for {evt.room_id} (new member: {new_user})")
            except Exception as e:
                # Room not encrypted or error - ignore
                log.debug(f"Key rotation skipped for {evt.room_id}: {e}")

    @self.client.on(EventType.ROOM_ENCRYPTED)
    async def handle_encrypted(evt: EncryptedEvent):
        # Decryption is automatic via DecryptionDispatcher
        # This handler catches events that failed to decrypt
        log.warning(f"Failed to decrypt event {evt.event_id} in {evt.room_id}")
        # Debug info
        try:
            log.warning(f"  Algorithm: {evt.content.algorithm}")
            log.warning(f"  Session ID: {evt.content.session_id}")
            log.warning(f"  Sender key: {evt.content.sender_key}")
            # Try manual decryption to get the actual error
            if self.crypto:
                try:
                    decrypted = await self.crypto.decrypt_megolm_event(evt)
                    log.info(f"  Manual decrypt succeeded! Type: {decrypted.type}")
                except Exception as decrypt_err:
                    log.warning(f"  Manual decrypt error: {type(decrypt_err).__name__}: {decrypt_err}")
            # Check if we have the session
            has_session = await self.crypto_store.has_group_session(evt.room_id, evt.content.session_id)
            log.warning(f"  Has session in store: {has_session}")
        except Exception as e:
            log.warning(f"  Could not extract encryption details: {e}")

    @self.client.on(EventType.TO_DEVICE_ENCRYPTED)
    async def handle_to_device(evt):
        log.info(f"Received encrypted to-device event from {evt.sender}")

    @self.client.on(EventType.ROOM_KEY)
    async def handle_room_key(evt):
        log.info(f"Received room key for {evt.content.room_id} session {evt.content.session_id}")

    @self.client.on(EventType.REACTION)
    async def handle_reaction(evt):
        log.info(f"Received REACTION event: {evt.event_id}")
        await self.on_reaction(evt)

    # Catch-all to see what events we're receiving
    @self.client.on(EventType.ALL)
    async def handle_all(evt):
        # Log non-standard events at INFO level to see what we're getting
        if evt.type not in (EventType.ROOM_MESSAGE, EventType.ROOM_ENCRYPTED):
            log.info(f"EVENT: type={evt.type} id={getattr(evt, 'event_id', 'N/A')}")
async def should_send_audio(self, room_id: RoomID) -> bool:
    """Check if audio should be sent to this room based on filter settings.

    AUDIO_ROOM_FILTER modes:
        "all"     - audio in every room
        "dm_only" - only rooms cached as DMs (defaults to True when the
                    room is not yet in the cache)
        other     - comma-separated list of allowed room IDs/suffixes

    Args:
        room_id: The Matrix room to check.

    Returns:
        True when an audio response should be generated for this room.
    """
    if not ENABLE_AUDIO_RESPONSE:
        return False
    if AUDIO_ROOM_FILTER == "all":
        return True
    elif AUDIO_ROOM_FILTER == "dm_only":
        # Use cache to check if room is DM (2 members)
        room_str = str(room_id)
        if room_str in self.room_cache:
            return self.room_cache[room_str].get("is_dm", False)
        # Fallback: try to get room info via API
        # For now, assume all rooms need audio if we can't check
        return True
    else:
        # Comma-separated list of allowed room IDs; endswith() also
        # matches entries given as room-ID suffixes.
        allowed_rooms = [r.strip() for r in AUDIO_ROOM_FILTER.split(",")]
        room_str = str(room_id)
        return any(room_str == r or room_str.endswith(r) for r in allowed_rooms if r)
    # FIX: removed an unreachable trailing `return False` — every branch
    # of the if/elif/else above already returns.
async def get_or_create_conversation(self, room_id: RoomID) -> str | None:
    """
    Get or create a Letta conversation for a specific room.
    This provides per-room isolation - each Matrix room gets its own conversation.

    Lookup order: in-memory cache → room_conversations table → create a
    new conversation via the Letta API and persist it.

    Args:
        room_id: The Matrix room ID

    Returns:
        Conversation ID if successful, None otherwise
    """
    # Check cache first
    if room_id in self.conversation_cache:
        return self.conversation_cache[room_id]
    # Check database
    # NOTE(review): '?' placeholders suggest an SQLite-backed mautrix
    # Database; asyncpg-style backends use $1 — verify against DB_URL.
    row = await self.db.fetchrow(
        "SELECT conversation_id FROM room_conversations WHERE room_id = ?",
        str(room_id)
    )
    if row and row["conversation_id"]:
        conv_id = row["conversation_id"]
        # Update last_used_at
        await self.db.execute(
            "UPDATE room_conversations SET last_used_at = CURRENT_TIMESTAMP WHERE room_id = ?",
            str(room_id)
        )
        self.conversation_cache[room_id] = conv_id
        return conv_id
    # Create new conversation for this room
    # Note: Letta API requires agent_id as query param, not JSON body
    url = f"{LETTA_BASE_URL}/conversations/"
    params = {"agent_id": LETTA_AGENT_ID}
    headers = {"Content-Type": "application/json"}
    if LETTA_API_KEY:
        headers["Authorization"] = f"Bearer {LETTA_API_KEY}"
    # Name the conversation after the room (':' is not filesystem/URL safe)
    payload = {
        "name": f"matrix-room-{str(room_id).replace(':', '-')}",
    }
    try:
        response = requests.post(url, params=params, json=payload, headers=headers, timeout=30)
        response.raise_for_status()
        data = response.json()
        conv_id = data.get("id")
        # Store in database so the mapping survives restarts
        await self.db.execute(
            """
            INSERT INTO room_conversations (room_id, conversation_id)
            VALUES (?, ?)
            """,
            str(room_id), conv_id
        )
        self.conversation_cache[room_id] = conv_id
        log.info(f"Created conversation {conv_id} for room {room_id}")
        return conv_id
    except Exception as e:
        log.error(f"Failed to create conversation for room {room_id}: {e}")
        return None
async def on_message(self, evt):
    """Handle incoming messages (text and images).

    Flow for text messages: filter (sync/age/self/shutdown) → read
    receipt → bridge commands (!...) → attach any pending image →
    forward to the room's Letta conversation → post the reply with
    reaction controls. Images and audio are delegated to on_image() /
    on_audio().
    """
    # Ignore messages during initial sync.
    if not self.initial_sync_done:
        return
    # Ignore old messages (more than 60 seconds old)
    event_time = datetime.fromtimestamp(evt.timestamp / 1000)
    message_age = datetime.now() - event_time
    if message_age > timedelta(seconds=60):
        log.debug(f"Ignoring old message ({message_age.seconds}s old) from {evt.sender}")
        return
    # Ignore own messages
    if evt.sender == self.user_id:
        return
    # Check if shutting down (stop accepting new messages)
    if self.shutting_down:
        # FIX: this previously referenced `sender` before it was assigned
        # below, raising NameError during shutdown.
        log.info(f"Shutting down, ignoring message from {evt.sender}")
        return
    room_id = evt.room_id
    sender = evt.sender
    body = evt.content.body
    # Update heartbeat tracker (user is active)
    if self.heartbeat:
        self.heartbeat.update_last_user_message(str(room_id))
    # Handle images
    if evt.content.msgtype == MessageType.IMAGE:
        return await self.on_image(evt, room_id, sender, body)
    # Handle audio
    if evt.content.msgtype == MessageType.AUDIO:
        return await self.on_audio(evt, room_id, sender, body)
    # Only handle text messages
    if evt.content.msgtype != MessageType.TEXT:
        return
    log.info(f"[{room_id}] {sender}: {body}")
    # Store user's event ID for reactions
    user_event_id = str(evt.event_id)
    # Send read receipt immediately (seen indicator)
    try:
        await self.client.send_receipt(room_id, EventID(user_event_id))
    except Exception as e:
        log.debug(f"Failed to send read receipt: {e}")
    # Handle bridge commands
    if body.startswith("!"):
        cmd = body.strip().lower()
        if cmd in ("!reshare", "!keys", "!rekey"):
            # Force new encryption session - removes current outbound session
            # so next message creates a fresh one shared with all members
            if self.crypto and self.crypto_store:
                try:
                    # Remove the current outbound session for this room
                    await self.crypto_store.remove_outbound_group_session(room_id)
                    log.info(f"Removed outbound session for {room_id}, next message will create new one")
                    # Get member count for the confirmation message
                    members = await self.client.get_joined_members(room_id)
                    member_count = len(members)
                    # Send confirmation (this will create and share a NEW session)
                    await self.send_message(room_id, f"🔑 Created new encryption session. All {member_count} members should now be able to see new messages.")
                    log.info(f"New session created for {room_id} with {member_count} members")
                except Exception as e:
                    await self.send_message(room_id, f"⚠️ Key rotation failed: {e}")
                    log.error(f"Manual key rotation failed: {e}")
            else:
                await self.send_message(room_id, "⚠️ E2EE not initialized")
            return
        if cmd in ("!heartbeat", "!pulse", "!wake"):
            # Manually trigger a heartbeat (resumes if paused)
            if self.heartbeat and HEARTBEAT_ENABLED:
                # trigger() handles resume internally
                asyncio.create_task(self.heartbeat.trigger(by=str(evt.sender)))
                hb_status = self.heartbeat.get_status()
                if hb_status['paused']:
                    await self.send_message(room_id, "▶️ Resuming heartbeat...")
                else:
                    await self.send_message(room_id, "⏰ Triggering heartbeat (silent mode - check logs)...")
            else:
                await self.send_message(room_id, "⚠️ Heartbeat service not enabled. Set HEARTBEAT_ENABLED=1")
            return
        if cmd in ("!pause", "!suspend"):
            # Pause the heartbeat service
            if self.heartbeat:
                response = self.heartbeat.pause(by=str(evt.sender))
                await self.send_message(room_id, response)
            else:
                await self.send_message(room_id, "⚠️ Heartbeat service not available")
            return
        if cmd in ("!resume", "!unpause"):
            # Resume the heartbeat service
            if self.heartbeat:
                response = self.heartbeat.resume(by=str(evt.sender))
                await self.send_message(room_id, response)
            else:
                await self.send_message(room_id, "⚠️ Heartbeat service not available")
            return
        if cmd == "!status":
            # Show bridge and heartbeat status
            status_lines = [
                "**Meridian Bridge Status**",
                f"• E2EE: {'✅ Ready' if self.crypto else '❌ Not initialized'}",
                f"• Syncing: {'✅ Active' if self.initial_sync_done else '⏳ Waiting'}",
                f"• Conversations: {len(self.conversation_cache)} cached",
            ]
            if self.heartbeat:
                hb_status = self.heartbeat.get_status()
                hb_indicator = "⏸️ Paused" if hb_status['paused'] else ("🏃 running" if hb_status['is_running'] else ("✅ Active" if hb_status['running'] else "❌ Stopped"))
                status_lines.extend([
                    "",
                    "**Heartbeat Status**",
                    f"• Status: {hb_indicator}",
                    f"• Interval: {hb_status['interval_minutes']} minutes",
                    f"• Count: {hb_status['heartbeat_count']} sent, {hb_status['skipped_count']} skipped",
                    f"• Last: {hb_status['last_heartbeat'] or 'Never'}",
                ])
                if hb_status['paused']:
                    paused_mins = ""
                    if hb_status['paused_since']:
                        paused_duration = datetime.now(timezone.utc) - datetime.fromisoformat(hb_status['paused_since'])
                        paused_mins = f" ({int(paused_duration.total_seconds()/60)}m ago)"
                    status_lines.append(f"• Paused by: {hb_status['paused_by'] or 'unknown'}{paused_mins}")
                if hb_status['is_running']:
                    status_lines.append("• ⚠️ Currently executing a heartbeat task")
            status_lines.extend([
                "",
                "**Commands**",
                "• !pause / !suspend - Pause heartbeat",
                "• !resume / !unpause - Resume heartbeat",
                "• !heartbeat - Manually trigger (also resumes if paused)",
                "• !status - Show this status",
                "• !clear-reactions - Clear reaction dedup cache (use if seeing dupes on restart)",
            ])
            await self.send_message(room_id, "\n".join(status_lines))
            return
        if cmd == "!clear-reactions":
            # Clear the processed_reactions table to fix duplicate reactions
            result = await self.db.execute("DELETE FROM processed_reactions")
            clear_count = result if isinstance(result, int) else 0
            await self.send_message(
                room_id,
                f"🧹 Cleared {clear_count} reactions from dedup cache.\n"
                f"Note: This may cause reactions to be re-sent on next restart until the system stabilizes."
            )
            log.info(f"Cleared {clear_count} reactions from processed_reactions table via command from {sender}")
            return
        # Unknown commands pass through to Letta
    # Check if there's a pending image for this room
    pending = await self.get_pending_image(room_id)
    images = []
    if pending:
        checkmark_event_id, image_data, image_format = pending
        log.info(f"[{room_id}] Combining text with pending image")
        images = [image_data]  # send_to_letta expects raw bytes, not tuples
        # Note: We clear pending AFTER successful send below
    # Get or create conversation for this room
    conversation_id = await self.get_or_create_conversation(room_id)
    # Send typing indicator (keep active through entire Letta call + message send)
    await self.client.set_typing(room_id, timeout=60000)
    # Try to send to Letta using room's conversation.
    # Include sender so Ani knows who's speaking (important for multi-user rooms)
    message_with_sender = f"{sender}: {body}"
    letta_response = await asyncio.to_thread(
        send_to_letta, message_with_sender, conversation_id, images=images
    )
    if letta_response.status == "SUCCESS":
        event_id = await self.send_message(room_id, letta_response.assistant_text)
        log.info(f"[{room_id}] Agent: {letta_response.assistant_text[:100]}...")
        # Store mapping for feedback correlation
        if event_id and letta_response.step_ids:
            await self.store_message_mapping(event_id, room_id, letta_response.step_ids)
        # Add 🎤 to text message for TTS generation (text-only responses)
        if event_id:
            await self.add_reaction(room_id, event_id, "🎤")
            # Store text for TTS generation via 🎤
            await self.store_audio_message(event_id, letta_response.assistant_text)
        # Add tool summary reactions to user's message (source = assistant response)
        await self.add_tool_summary_reactions(room_id, user_event_id, letta_response, source_event_id=event_id or "")
        # Clear pending image after successful send
        if pending:
            await self.clear_pending_image(room_id)
        # Stop typing indicator
        await self.client.set_typing(room_id, timeout=0)
    elif letta_response.status == "BUSY":
        # Queue the message with event IDs for reactions
        message_queue.append((room_id, sender, body, user_event_id, 0, ""))
        log.info(f"Agent busy, queued message (queue size: {len(message_queue)})")
        # Send acknowledgment
        await self.send_message(
            room_id,
            "⏳ I'm currently in another conversation. Your message has been queued."
        )
        # Stop typing indicator
        await self.client.set_typing(room_id, timeout=0)
    else:  # ERROR
        await self.send_message(room_id, letta_response.assistant_text)
        log.error(f"[{room_id}] Error: {letta_response.assistant_text}")
        # Stop typing indicator
        await self.client.set_typing(room_id, timeout=0)
async def on_image(self, evt, room_id: RoomID, sender: UserID, caption: str):
    """Handle an incoming image message: download, normalize, and queue it.

    The image is downloaded (decrypted first when the room is E2EE),
    resized/re-encoded for Letta, then stored as the room's "pending
    image". A ✅ reaction is added to the user's image event so they can
    tap it (see on_reaction) to send the image to the agent by itself.

    Args:
        evt: The original Matrix event (used for content and event_id).
        room_id: Room the image arrived in.
        sender: The user who sent the image.
        caption: Optional caption/body text (logged only).
    """
    log.info(f"[{room_id}] Image from {sender}: {caption or '(no caption)'}")
    # Download + re-encode can take a while - show a typing indicator.
    await self.client.set_typing(room_id, timeout=60000)
    try:
        content = evt.content
        # ROBUSTNESS: previously a non-image payload fell through with
        # mxc_url never assigned; fail explicitly instead.
        if not (isinstance(content, MediaMessageEventContent) and content.msgtype == MessageType.IMAGE):
            raise ValueError("No image URL found (E2EE?)")
        # E2EE rooms carry the mxc URL plus decryption keys in
        # content.file; plaintext rooms expose it as content.url.
        if hasattr(content, "file") and content.file and content.file.url:
            mxc_url = content.file.url
            encryption_info = {
                "key": {"k": content.file.key.key},
                "iv": content.file.iv,
                "hashes": {"sha256": content.file.hashes.get("sha256", "")}
            }
        else:
            mxc_url = content.url
            encryption_info = None
        if not mxc_url:
            raise ValueError("No image URL found (E2EE?)")
        # Download (and decrypt when needed), then normalize for Letta.
        image_data = await process_matrix_image(self.client, mxc_url, encryption_info)
        log.info(f"Downloaded image: {len(image_data)} bytes")
        processed_image = prepare_image_for_letta(image_data)
        log.info(f"Processed image: {len(processed_image)} bytes")
        await self.client.set_typing(room_id, timeout=0)
        user_image_event_id = str(evt.event_id)
        # BUGFIX: the reaction key here was an empty string (the ✅ glyph
        # was lost from the file) even though the surrounding comments and
        # the log line below say ✅; restore the checkmark.
        await self.add_reaction(room_id, user_image_event_id, "✅")
        # Queue the image keyed by the user's image event ID so a later
        # ✅ reaction on it can flush the image to Letta.
        image_format = "image/jpeg"
        await self.store_pending_image(user_image_event_id, room_id, processed_image, image_format)
        log.info(f"[{room_id}] Image queued, ✅ added to {user_image_event_id[:20]}...")
    except Exception as e:
        log.error(f"Failed to process image: {e}")
        await self.client.set_typing(room_id, timeout=0)
        await self.send_message(room_id, f"⚠️ Failed to process image: {str(e)}")
async def on_audio(self, evt, room_id: RoomID, sender: UserID, caption: str):
    """Handle incoming audio messages - transcribe, send to agent, respond with audio"""
    # Refuse new work while the bridge is shutting down.
    if self.shutting_down:
        log.info(f"Shutting down, ignoring audio from {sender}")
        return
    log.info(f"[{room_id}] 🎤 Audio from {sender}: {caption or '(no caption)'}")
    # Keep the user's event ID so tool-summary reactions can target it.
    user_event_id = str(evt.event_id)
    # Read receipt acts as a "seen" indicator; best-effort only.
    try:
        await self.client.send_receipt(room_id, EventID(user_event_id))
    except Exception as e:
        log.debug(f"Failed to send read receipt: {e}")
    # Each room maps to its own Letta conversation.
    conversation_id = await self.get_or_create_conversation(room_id)
    # Long typing timeout: STT + Letta + TTS together can take a while.
    await self.client.set_typing(room_id, timeout=90000)
    try:
        content = evt.content
        if isinstance(content, MediaMessageEventContent) and content.msgtype == MessageType.AUDIO:
            # E2EE rooms put the mxc URL (plus keys) in content.file;
            # plaintext rooms expose it directly as content.url.
            if hasattr(content, "file") and content.file and content.file.url:
                mxc_url = content.file.url
                encryption_info = {
                    "key": {"k": content.file.key.key},
                    "iv": content.file.iv,
                    "hashes": {"sha256": content.file.hashes.get("sha256", "")}
                }
            else:
                mxc_url = content.url
                encryption_info = None
            if not mxc_url:
                raise ValueError("No audio URL found (E2EE?)")
            # Download and decrypt if E2EE
            audio_data = await download_matrix_audio(self.client, mxc_url, encryption_info)
            # Guess the container format from the reported mimetype;
            # default to mp3 when nothing usable is present.
            audio_format = "mp3"
            if hasattr(content, "info") and content.info:
                mimetype = getattr(content.info, "mimetype", None)
                if mimetype:
                    if "ogg" in mimetype:
                        audio_format = "ogg"
                    elif "m4a" in mimetype or "mpeg" in mimetype:
                        audio_format = "mp3"
                    elif "wav" in mimetype:
                        audio_format = "wav"
            # Transcribe (blocking STT call, so run in a worker thread).
            log.info(f"Transcribing with STT ({audio_format})...")
            transcription = await asyncio.to_thread(
                transcribe_audio, audio_data, audio_format
            )
            # A bracketed transcription ("[...]") is treated as an STT
            # error/no-speech marker - presumably transcribe_audio's
            # convention; confirm against its implementation.
            if not transcription or transcription.startswith("["):
                await self.client.set_typing(room_id, timeout=0)
                await self.send_message(room_id, f"⚠️ {transcription or 'No speech detected'}")
                return
            log.info(f"Transcribed: '{transcription[:100]}...'")
            # Voice context - include sender so Ani knows who's speaking
            voice_message = f"""[VOICE] {sender}'s voice, resonating:
"{transcription}\""""
            letta_response = await asyncio.to_thread(
                send_to_letta, voice_message, conversation_id
            )
            if letta_response.status == "SUCCESS":
                # Generate TTS response if enabled and room allows it
                audio_event_id = None
                text_event_id = None
                if await self.should_send_audio(room_id):
                    log.info(f"Generating TTS for {len(letta_response.assistant_text)} chars...")
                    audio_bytes = await asyncio.to_thread(synthesize_speech, letta_response.assistant_text)
                    if audio_bytes:
                        audio_event_id = await self._upload_and_send_audio(room_id, audio_bytes)
                        if audio_event_id:
                            log.info(f"Audio sent: {audio_event_id[:20]}...")
                            # 🎤 reaction lets users request TTS regeneration.
                            await self.add_reaction(room_id, audio_event_id, "🎤")
                            # Keep the source text so 🎤 can re-synthesize it.
                            await self.store_audio_message(audio_event_id, letta_response.assistant_text)
                # Send text response (no threading for now)
                text_event_id = await self.send_message(room_id, letta_response.assistant_text)
                preview = letta_response.assistant_text[:100] + "..." if len(letta_response.assistant_text) > 100 else letta_response.assistant_text
                log.info(f"[{room_id}] Agent (audio): {preview}")
                # Store mapping for feedback correlation (text message)
                if text_event_id and letta_response.step_ids:
                    await self.store_message_mapping(text_event_id, room_id, letta_response.step_ids)
                # Tool-summary emojis go on the user's audio message.
                await self.add_tool_summary_reactions(room_id, user_event_id, letta_response, source_event_id=text_event_id or "")
                # Stop typing indicator
                await self.client.set_typing(room_id, timeout=0)
            elif letta_response.status == "BUSY":
                await self.send_message(room_id, "⏳ I'm busy processing another message.")
                # Stop typing indicator
                await self.client.set_typing(room_id, timeout=0)
            else:
                # ERROR status: assistant_text carries the error message.
                await self.send_message(room_id, letta_response.assistant_text)
                log.error(f"[{room_id}] Audio error: {letta_response.assistant_text}")
                # Stop typing indicator
                await self.client.set_typing(room_id, timeout=0)
    except Exception as e:
        log.error(f"Failed to process audio: {e}")
        await self.client.set_typing(room_id, timeout=0)
        await self.send_message(room_id, f"⚠️ Failed to process audio: {str(e)}")
async def _upload_and_send_audio(self, room_id: RoomID, audio_data: bytes) -> str | None:
    """Upload MP3 bytes to the media repo and post them as an m.audio event.

    Returns:
        The event ID of the sent audio message, or None when either the
        upload or the send fails (errors are logged, never raised).
    """
    try:
        # Push the raw bytes to the homeserver's media repository.
        uri = await self.client.upload_media(
            data=audio_data,
            mime_type="audio/mpeg",
            filename="ani-response.mp3",
            size=len(audio_data)
        )
        if not uri:
            log.error("Failed to upload audio to media server: no content_uri")
            return None
        log.info(f"Audio uploaded: {uri[:60]}...")
        # Build the m.audio payload, labelled with the agent's name.
        audio_content = {
            "body": f"{AGENT_DISPLAY_NAME}'s voice",
            "info": {"mimetype": "audio/mpeg", "size": len(audio_data)},
            "msgtype": "m.audio",
            "url": uri,
        }
        sent = await self.client.send_message_event(
            room_id,
            EventType.ROOM_MESSAGE,
            audio_content
        )
        return str(sent) if sent else None
    except Exception as e:
        log.error(f"Failed to upload/send audio: {e}")
        return None
async def on_reaction(self, evt):
    """Handle m.reaction events.

    Reactions serve three purposes:
      1. Bridge controls: 🎤 on one of our audio messages regenerates
         TTS; ✅ on a checkmarked image flushes the pending image.
      2. Every other reaction is forwarded to the Letta agent so it can
         respond interactively (e.g. reply with an emoji).
      3. Thumbs-up/down-style reactions are additionally reported to
         Letta as positive/negative step feedback.
    """
    log.info(f"on_reaction called for {evt.event_id} from {evt.sender} in {evt.room_id}")
    # Ignore reactions replayed during the initial sync.
    if not self.initial_sync_done:
        log.info(" Skipping: initial sync not done")
        return
    # Stop accepting new work while shutting down.
    if self.shutting_down:
        log.info(" Skipping: shutting down")
        return
    # Ignore our own reactions (e.g. the 🎤/✅ markers we add ourselves).
    if evt.sender == self.user_id:
        log.info(" Skipping: own reaction")
        return
    # Extract the annotation target and key.
    try:
        relates_to = evt.content.relates_to
        if not relates_to:
            log.info(" Skipping: no relates_to")
            return
        target_event_id = str(relates_to.event_id)
        reaction_key = relates_to.key
    except AttributeError as e:
        log.warning(f" Could not parse reaction event: {e}")
        return
    log.info(f" Reaction: {reaction_key} on event {target_event_id}")
    room_id = evt.room_id
    sender = evt.sender
    # Bridge control: 🎤 on one of our audio messages = regenerate TTS.
    if reaction_key == "🎤" and target_event_id in self.our_message_events:
        log.info(f" 🎤 TTS re-generation requested for audio {target_event_id}")
        await self.regenerate_tts(room_id, target_event_id)
        return  # Bridge control - not forwarded to Letta
    # Bridge control: ✅ on a checkmarked image = send pending image alone.
    # BUGFIX: this comparison was against "" (the ✅ glyph had been lost
    # from the file), so the control could never trigger.
    if reaction_key == "✅":
        pending_row = await self.db.fetchrow(
            "SELECT image_data, image_format FROM pending_images WHERE checkmark_event_id = ?",
            target_event_id
        )
        if pending_row:
            log.info(f" ✅ Sending pending image alone to Letta (event: {target_event_id[:20]}...)")
            image_data = pending_row["image_data"]
            image_format = pending_row["image_format"]
            conversation_id = await self.get_or_create_conversation(room_id)
            await self.client.set_typing(room_id, timeout=30000)
            # Send image alone (no additional text); send_to_letta expects raw bytes.
            letta_response = await asyncio.to_thread(
                send_to_letta, "", conversation_id, images=[image_data]
            )
            await self.client.set_typing(room_id, timeout=0)
            if letta_response.status == "SUCCESS":
                event_id = await self.send_message(room_id, letta_response.assistant_text)
                log.info(f"[{room_id}] Agent (image alone): {letta_response.assistant_text[:100]}...")
                if event_id and letta_response.step_ids:
                    await self.store_message_mapping(event_id, room_id, letta_response.step_ids)
            else:
                await self.send_message(room_id, letta_response.assistant_text)
                log.error(f"[{room_id}] Error: {letta_response.assistant_text}")
            # Clear pending image after sending
            await self.clear_pending_image(room_id)
            return  # Bridge control - not forwarded to Letta
    # Get conversation for this room
    conversation_id = await self.get_or_create_conversation(room_id)
    # PRIMARY: forward the reaction to Letta so the agent can respond.
    reaction_message = f"🎭 {sender} reacted with: {reaction_key}"
    log.info(f" 🎭 Forwarding reaction to Letta: {reaction_message}")
    letta_response = await asyncio.to_thread(
        send_to_letta, reaction_message, conversation_id
    )
    if letta_response.status == "SUCCESS":
        event_id = await self.send_message(room_id, letta_response.assistant_text)
        log.info(f"[{room_id}] Agent reacted: {letta_response.assistant_text[:50]}...")
        if event_id and letta_response.step_ids:
            await self.store_message_mapping(event_id, room_id, letta_response.step_ids)
    else:
        log.warning(f"Could not send reaction to agent: {letta_response.status}")
    # SECONDARY: interpret thumbs-style reactions as step feedback.
    # NOTE(review): several entries below are empty strings - their emoji
    # glyphs appear to have been lost from this file; restore from VCS
    # history (they cannot be reconstructed from context here).
    POSITIVE_REACTIONS = {
        "👍", "👍️", "👍🏻", "👍🏼", "👍🏽", "👍🏾", "👍🏿",
        "❤️", "", "💕", "💖", "💓", "💗", "💙", "💚", "💛", "💜", "🖤",
        "", "🎉", "💯", "👏", "🙌", "💪", "", "🌟", "+1"
    }
    NEGATIVE_REACTIONS = {
        "👎", "👎️", "👎🏻", "👎🏼", "👎🏽", "👎🏾", "👎🏿",
        "", "😕", "😞", "💔", "-1"
    }
    feedback = None
    if reaction_key in POSITIVE_REACTIONS:
        feedback = "positive"
    elif reaction_key in NEGATIVE_REACTIONS:
        feedback = "negative"
    # Relay the feedback to every Letta step behind the target message.
    if feedback:
        step_ids = await self.get_step_ids_for_event(target_event_id)
        if step_ids:
            log.info(f" Sending {feedback} feedback for step_ids: {step_ids}")
            for step_id in step_ids:
                result = await asyncio.to_thread(send_feedback_to_letta, step_id, feedback)
                log.debug(f" Feedback result for {step_id}: {result}")
async def send_message(self, room_id: RoomID, text: str) -> str | None:
    """Send a formatted text message (encrypted automatically when needed).

    The text is rendered by format_html(), which handles markdown,
    emoji shortcodes (:heart: → ❤️), Matrix extensions such as spoilers
    and named colors, and raw-HTML pass-through.

    Returns:
        The event ID of the sent message, or None when sending failed.
    """
    # format_html yields both the plain fallback body and the HTML form.
    plain_body, rendered_html = format_html(text)
    payload = {
        "msgtype": "m.text",
        "body": plain_body,
        "format": "org.matrix.custom.html",
        "formatted_body": rendered_html,
    }
    sent = await self.client.send_message_event(room_id, EventType.ROOM_MESSAGE, payload)
    return str(sent) if sent else None
async def store_message_mapping(self, matrix_event_id: str, room_id: RoomID, step_ids: list[str]):
    """Persist the Matrix event ID -> Letta step IDs association.

    Used later to correlate 👍/👎 reactions with Letta steps. No-op when
    step_ids is empty.
    """
    if not step_ids:
        return
    await self.db.execute(
        """
        INSERT OR REPLACE INTO message_mapping (matrix_event_id, room_id, step_ids)
        VALUES (?, ?, ?)
        """,
        str(matrix_event_id), str(room_id), json.dumps(step_ids)
    )
    log.debug(f"Stored mapping: {matrix_event_id} -> {step_ids}")
async def get_step_ids_for_event(self, matrix_event_id: str) -> list[str]:
    """Return the Letta step IDs recorded for a Matrix event (or [])."""
    record = await self.db.fetchrow(
        "SELECT step_ids FROM message_mapping WHERE matrix_event_id = ?",
        str(matrix_event_id)
    )
    # Missing row or NULL/empty column both mean "no steps known".
    if not record or not record["step_ids"]:
        return []
    return json.loads(record["step_ids"])
# ============================================================
# INTERACTIVE EMOJI CONTROLS
# ============================================================
async def store_audio_message(self, audio_event_id: str, original_text: str):
    """Remember the text behind an audio event so 🎤 can regenerate TTS."""
    key = str(audio_event_id)
    await self.db.execute(
        """
        INSERT OR REPLACE INTO audio_messages (audio_event_id, original_text)
        VALUES (?, ?)
        """,
        key, original_text
    )
    # Track the event as one of ours so 🎤 reactions on it are honored.
    self.our_message_events.add(key)
    log.debug(f"Stored audio message text: {audio_event_id}")
async def get_original_text_for_audio(self, audio_event_id: str) -> str | None:
    """Look up the source text for an audio event, or None if unknown."""
    record = await self.db.fetchrow(
        "SELECT original_text FROM audio_messages WHERE audio_event_id = ?",
        str(audio_event_id)
    )
    if record is None:
        return None
    return record["original_text"]
async def add_reaction(self, room_id: RoomID, target_event_id: str, emoji: str, source_event_id: str = ""):
    """React to a message with *emoji*.

    Args:
        room_id: The room containing the target event.
        target_event_id: The event to annotate; no-op when empty.
        emoji: The reaction key to send.
        source_event_id: When given, the (source, target, emoji) triple
            is used to deduplicate tool-visibility reactions so they are
            not re-added after a bridge restart.
    """
    if not target_event_id:
        return
    # Skip reactions we have already recorded as sent for this pair.
    if source_event_id and await self.is_reaction_already_sent(source_event_id, target_event_id, emoji):
        log.debug(f"Skipping duplicate reaction {emoji} on {target_event_id}")
        return
    annotation = ReactionEventContent(
        relates_to=RelatesTo(
            event_id=EventID(target_event_id),
            rel_type=RelationType.ANNOTATION,
            key=emoji
        )
    )
    await self.client.send_message_event(room_id, EventType.REACTION, annotation)
    log.debug(f"Added reaction {emoji} to {target_event_id}")
    # Only record after initial sync so replayed history isn't tracked.
    if source_event_id and self.initial_sync_done:
        await self.mark_reaction_sent(source_event_id, target_event_id, emoji)
async def add_tool_summary_reactions(
    self,
    room_id: RoomID,
    target_event_id: str,
    letta_response: LettaResponse,
    source_event_id: str = ""  # Event ID we're reacting from (usually the assistant response)
) -> None:
    """Summarize the agent's turn as emoji reactions on the user's message.

    🧠 when reasoning was used, one emoji per (deduplicated) tool call,
    ✅/❌ for tool-result success/failure, and ⚠️ when Letta reported
    errors. At most 6 reactions are sent, spaced 0.3s apart to stay
    under rate limits.
    """
    if not target_event_id:
        return
    # Nothing to summarize when no tools ran and no reasoning happened.
    if not letta_response.tool_calls and not letta_response.reasoning_present:
        return
    emojis_to_add = []
    if letta_response.reasoning_present:
        emojis_to_add.append("🧠")
    # One emoji per tool, deduplicated while preserving order.
    tool_emojis = [get_emoji_for_tool(tool["name"]) for tool in letta_response.tool_calls]
    emojis_to_add.extend(deduplicate_emojis(tool_emojis))
    # Overall tool outcome marker.
    # BUGFIX: both markers below were empty strings (the ✅/❌ glyphs were
    # lost from the file); restore them so the outcome is visible.
    if letta_response.tool_results:
        failed_tools = [r for r in letta_response.tool_results if r.get("error")]
        if failed_tools:
            emojis_to_add.append("❌")
        else:
            emojis_to_add.append("✅")
    # Letta-level errors get their own indicator.
    if letta_response.errors:
        emojis_to_add.append("⚠️")
    # Send the reactions, capped at 6, with a small delay between each.
    for emoji in emojis_to_add[:6]:
        try:
            await self.add_reaction(room_id, target_event_id, emoji, source_event_id)
            await asyncio.sleep(0.3)
        except Exception as e:
            log.warning(f"Failed to add reaction {emoji}: {e}")
    log.info(f"[ToolVisibility] Added {emojis_to_add} to {target_event_id[:20]}...")
async def regenerate_tts(self, room_id: RoomID, audio_event_id: str):
    """Re-synthesize and resend the audio behind a 🎤-reacted message.

    Returns the new audio message's event ID, or None on any failure.
    """
    original_text = await self.get_original_text_for_audio(audio_event_id)
    if not original_text:
        await self.send_message(room_id, "⚠️ Could not find original text for TTS regeneration.")
        return None
    log.info(f"Regenerating TTS for audio {audio_event_id[:20]}: {original_text[:50]}...")
    # Synthesis is blocking, so run it in a worker thread.
    audio_bytes = await asyncio.to_thread(synthesize_speech, original_text)
    if not audio_bytes:
        await self.send_message(room_id, "⚠️ TTS regeneration failed.")
        return None
    new_audio_event_id = await self._upload_and_send_audio(room_id, audio_bytes)
    if not new_audio_event_id:
        return None
    log.info(f"TTS audio sent: {new_audio_event_id[:20]}...")
    # The fresh audio message is itself 🎤-regenerable, so mark and map it.
    await self.add_reaction(room_id, new_audio_event_id, "🎤")
    await self.store_audio_message(new_audio_event_id, original_text)
    return new_audio_event_id
async def store_pending_image(
    self, checkmark_event_id: str, room_id: RoomID,
    image_data: bytes, image_format: str
):
    """Persist a processed image, keyed by its ✅-marked event ID.

    The row is later retrieved when the user taps the ✅ reaction to
    send the image to Letta by itself.
    """
    insert_sql = """
        INSERT INTO pending_images (checkmark_event_id, room_id, image_data, image_format)
        VALUES (?, ?, ?, ?)
        """
    await self.db.execute(
        insert_sql,
        str(checkmark_event_id), str(room_id), image_data, image_format
    )
    log.info(f"Stored pending image with checkmark: {checkmark_event_id}")
async def get_pending_image(self, room_id: RoomID) -> tuple[str, bytes, str] | None:
    """Fetch the newest pending image for a room.

    Returns:
        (checkmark_event_id, image_data, image_format), or None when
        the room has no queued image.
    """
    record = await self.db.fetchrow(
        """
        SELECT checkmark_event_id, image_data, image_format
        FROM pending_images
        WHERE room_id = ?
        ORDER BY created_at DESC
        LIMIT 1
        """,
        str(room_id)
    )
    if record is None:
        return None
    return (record["checkmark_event_id"], record["image_data"], record["image_format"])
async def clear_pending_image(self, room_id: RoomID):
    """Drop every queued pending-image row for *room_id*."""
    await self.db.execute(
        "DELETE FROM pending_images WHERE room_id = ?",
        str(room_id)
    )
    log.info(f"Cleared pending image for room: {room_id}")
async def is_reaction_already_sent(
    self, source_event_id: str, target_event_id: str, emoji: str
) -> bool:
    """True when this (source, target, emoji) triple was recorded before.

    Prevents tool-visibility reactions from being re-added on restart.
    """
    hit = await self.db.fetchrow(
        "SELECT 1 FROM processed_reactions WHERE source_event_id = ? AND target_event_id = ? AND emoji = ?",
        str(source_event_id), str(target_event_id), emoji
    )
    return hit is not None
async def mark_reaction_sent(
    self, source_event_id: str, target_event_id: str, emoji: str
):
    """Record that this (source, target, emoji) reaction has been sent."""
    upsert_sql = """
        INSERT OR REPLACE INTO processed_reactions (source_event_id, target_event_id, emoji)
        VALUES (?, ?, ?)
        """
    await self.db.execute(
        upsert_sql,
        str(source_event_id), str(target_event_id), emoji
    )
async def process_queue(self):
    """Background task: drain the global message_queue.

    Retries BUSY responses with exponential backoff (5s doubling, capped
    at 60s) and idles in 2s ticks when the queue is empty. Only one
    queued message is in flight at a time (module-level processing_queue
    flag). Runs until self.shutting_down is set.
    """
    global processing_queue
    tick = 0
    while True:
        if self.shutting_down:
            log.info("Queue processor stopping due to shutdown")
            break
        tick += 1
        # Periodic heartbeat so a stuck processor is visible in logs.
        if tick % 30 == 0:
            log.debug(f"Heartbeat - queue size: {len(message_queue)}")
        if message_queue and not processing_queue:
            processing_queue = True
            room_id, sender, message_text, user_event_id, retry_count, _source_event_id = message_queue[0]
            log.info(f"Processing queued message from {sender} (attempt {retry_count + 1})")
            conversation_id = await self.get_or_create_conversation(room_id)
            letta_response = await asyncio.to_thread(
                send_to_letta, message_text, conversation_id
            )
            if letta_response.status in ("SUCCESS", "ERROR"):
                # Terminal status either way: dequeue and deliver the text.
                message_queue.popleft()
                event_id = await self.send_message(room_id, letta_response.assistant_text)
                log.info(f"Delivered queued response to {sender}")
                # CONSISTENCY FIX: the 🎤 reaction, TTS text storage and
                # tool-summary reactions previously ran for ERROR responses
                # too; gate them on SUCCESS to match the live-message path.
                if letta_response.status == "SUCCESS":
                    if event_id and letta_response.step_ids:
                        await self.store_message_mapping(event_id, room_id, letta_response.step_ids)
                    if event_id:
                        await self.add_reaction(room_id, event_id, "🎤")
                        # Store text for TTS generation via 🎤
                        await self.store_audio_message(event_id, letta_response.assistant_text)
                    # Tool summary goes on the user's original message.
                    await self.add_tool_summary_reactions(room_id, user_event_id, letta_response, source_event_id=event_id or "")
                processing_queue = False
            elif letta_response.status == "BUSY":
                # Re-queue in place with an incremented retry counter.
                message_queue[0] = (room_id, sender, message_text, user_event_id, retry_count + 1, "")
                wait_time = min(5 * (2 ** retry_count), 60)
                log.info(f"Agent still busy, retrying in {wait_time}s")
                processing_queue = False
                await asyncio.sleep(wait_time)
        else:
            # Nothing to do (or another worker is busy): idle briefly.
            await asyncio.sleep(2)
# ============================================================
# HTTP API FOR MCP INTEGRATION
# ============================================================
async def api_health(self, request: web.Request) -> web.Response:
    """GET /api/health - report bridge status as JSON."""
    try:
        joined = await self.client.get_joined_rooms() if self.client else []
        payload = {
            "status": "ok",
            "e2ee_ready": self.crypto is not None,
            "user_id": str(self.user_id) if self.user_id else None,
            "device_id": str(self.device_id) if self.device_id else None,
            "syncing": self.initial_sync_done,
            "joined_rooms": len(joined),
            "conversations_cached": len(self.conversation_cache),
        }
        # Heartbeat status is optional - only when the feature is enabled.
        if self.heartbeat:
            payload["heartbeat"] = self.heartbeat.get_status()
        return web.json_response(payload)
    except Exception as e:
        return web.json_response({"status": "error", "error": str(e)}, status=500)
async def api_list_rooms(self, request: web.Request) -> web.Response:
    """GET /api/list_rooms - List joined rooms with encryption status.

    Name and encryption lookups are best-effort: a missing state event
    falls back to the room ID / unencrypted rather than failing the call.
    """
    try:
        if not self.client:
            return web.json_response({"error": "Client not initialized"}, status=503)
        rooms = []
        joined_rooms = await self.client.get_joined_rooms()
        for room_id in joined_rooms:
            # Room name from m.room.name state (fall back to the ID).
            try:
                state_resp = await self.client.get_state_event(room_id, EventType.ROOM_NAME)
                room_name = getattr(state_resp, 'name', '') or str(room_id)
            # FIX: was a bare `except:` which also swallowed
            # SystemExit/KeyboardInterrupt/CancelledError.
            except Exception:
                room_name = str(room_id)
            # Presence of m.room.encryption state => room is encrypted.
            try:
                await self.client.get_state_event(room_id, EventType.ROOM_ENCRYPTION)
                is_encrypted = True
            except Exception:
                is_encrypted = False
            rooms.append({
                "room_id": str(room_id),
                "name": room_name,
                "encrypted": is_encrypted,
            })
        return web.json_response({"rooms": rooms})
    except Exception as e:
        log.error(f"API list_rooms error: {e}")
        return web.json_response({"error": str(e)}, status=500)
async def api_send_message(self, request: web.Request) -> web.Response:
    """POST /api/send_message - Send encrypted message to a room

    Body: {"room_id": "!...", "text": "message", "html": "<b>optional</b>"}
    Returns: {"event_id": "$...", "status": "sent"}
    """
    try:
        if not self.client:
            return web.json_response({"error": "Client not initialized"}, status=503)
        payload = await request.json()
        room_id = payload.get("room_id")
        text = payload.get("text")
        html_body = payload.get("html")
        if not room_id or not text:
            return web.json_response(
                {"error": "Missing required fields: room_id and text"},
                status=400
            )
        room_id = RoomID(room_id)
        if not html_body:
            # No explicit HTML: run the bridge's markdown/emoji formatter
            # (send_message also handles E2EE automatically).
            event_id = await self.send_message(room_id, text)
        else:
            # Caller supplied pre-rendered HTML - send it verbatim.
            content = {
                "msgtype": "m.text",
                "body": text,
                "format": "org.matrix.custom.html",
                "formatted_body": html_body,
            }
            event_id = await self.client.send_message_event(room_id, EventType.ROOM_MESSAGE, content)
        return web.json_response({
            "event_id": str(event_id) if event_id else None,
            "status": "sent"
        })
    except Exception as e:
        log.error(f"API send_message error: {e}")
        return web.json_response({"error": str(e)}, status=500)
async def api_read_room(self, request: web.Request) -> web.Response:
    """POST /api/read_room - Read decrypted messages from a room

    Body: {"room_id": "!...", "limit": 50, "since": "$event_id"}
    Returns: {"messages": [{"sender", "content", "timestamp", "event_id", "type"}]}

    Messages are returned in chronological order. When "since" is given,
    only messages NEWER than that event are included.
    """
    try:
        if not self.client:
            return web.json_response({"error": "Client not initialized"}, status=503)
        data = await request.json()
        room_id_str = data.get("room_id")
        limit = data.get("limit", 20)
        since = data.get("since")  # Optional: only return events after this one
        if not room_id_str:
            return web.json_response({"error": "Missing required field: room_id"}, status=400)
        room_id = RoomID(room_id_str)
        # Fetch the most recent events via the raw /messages endpoint:
        # dir=b with no 'from' token paginates backwards from the room end.
        try:
            resp = await self.client.api.request(
                "GET",
                MatrixPath.v3.rooms[room_id].messages,
                query_params={
                    "dir": "b",  # Backwards from end
                    "limit": str(limit),
                    "filter": '{"types":["m.room.message","m.room.encrypted"]}'
                }
            )
            # resp is a dict with 'chunk', 'start', 'end' keys
            raw_events = resp.get("chunk", [])
        except Exception as e:
            return web.json_response({"error": f"Failed to get room messages: {e}"}, status=500)
        messages = []
        for evt_dict in raw_events:
            event_id = evt_dict.get("event_id", "")
            # BUGFIX: chunk is newest-first, so everything after 'since'
            # in this list is OLDER than it. The previous code cleared
            # the accumulated messages and kept scanning, which returned
            # messages *before* 'since'; stopping here keeps exactly the
            # newer ones, matching the documented semantics.
            if since and event_id == since:
                break
            sender = evt_dict.get("sender", "")
            timestamp = evt_dict.get("origin_server_ts", 0)
            event_type = evt_dict.get("type", "")
            content = evt_dict.get("content", {})
            # Encrypted events: attempt decryption with our Olm machine.
            if event_type == "m.room.encrypted" and self.crypto:
                try:
                    from mautrix.types import EncryptedEvent
                    enc_evt = EncryptedEvent.deserialize(evt_dict)
                    decrypted = await self.crypto.decrypt_megolm_event(enc_evt)
                    content = decrypted.content.serialize() if hasattr(decrypted.content, 'serialize') else {}
                    event_type = "m.room.message"
                except Exception as e:
                    log.debug(f"Could not decrypt event {event_id}: {e}")
                    content = {"body": f"[Encrypted: {event_id[:20]}...]", "msgtype": "m.unknown"}
            if event_type != "m.room.message":
                continue  # Skip non-message events
            messages.append({
                "event_id": event_id,
                "sender": sender,
                "content": content.get("body", ""),
                "timestamp": timestamp,
                "type": content.get("msgtype", "m.text"),
            })
        # Chunk was newest-first; flip to chronological order.
        messages.reverse()
        return web.json_response({"messages": messages})
    except Exception as e:
        log.error(f"API read_room error: {e}")
        return web.json_response({"error": str(e)}, status=500)
async def api_react(self, request: web.Request) -> web.Response:
    """POST /api/react - Send reaction to a message

    Body: {"room_id": "!...", "event_id": "$...", "emoji": "👍"}
    Returns: {"event_id": "$...", "status": "sent"}
    """
    try:
        if not self.client:
            return web.json_response({"error": "Client not initialized"}, status=503)
        payload = await request.json()
        room_id = payload.get("room_id")
        event_id = payload.get("event_id")
        emoji_key = payload.get("emoji")
        if not room_id or not event_id or not emoji_key:
            return web.json_response(
                {"error": "Missing required fields: room_id, event_id, emoji"},
                status=400
            )
        # Raw m.annotation relation payload (no helper types needed).
        annotation = {
            "m.relates_to": {
                "rel_type": "m.annotation",
                "event_id": event_id,
                "key": emoji_key,
            }
        }
        sent = await self.client.send_message_event(
            RoomID(room_id),
            EventType.REACTION,
            annotation
        )
        return web.json_response({
            "event_id": str(sent) if sent else None,
            "status": "sent"
        })
    except Exception as e:
        log.error(f"API react error: {e}")
        return web.json_response({"error": str(e)}, status=500)
async def api_get_room_state(self, request: web.Request) -> web.Response:
    """GET /api/room_state/{room_id} - Get room state (name, topic, encryption).

    Missing state events are treated as "not set": the name falls back
    to the room ID, the topic to null, and encryption to false.
    """
    try:
        if not self.client:
            return web.json_response({"error": "Client not initialized"}, status=503)
        room_id_str = request.match_info.get("room_id")
        if not room_id_str:
            return web.json_response({"error": "Missing room_id in path"}, status=400)
        room_id = RoomID(room_id_str)
        # Room name (optional state event).
        room_name = None
        try:
            name_data = await self.client.get_state_event(room_id, EventType.ROOM_NAME)
            room_name = getattr(name_data, 'name', '') or None
        # FIX: these were bare `except:` clauses, which also swallowed
        # SystemExit/KeyboardInterrupt/CancelledError.
        except Exception:
            pass
        # Room topic (optional state event).
        room_topic = None
        try:
            topic_data = await self.client.get_state_event(room_id, EventType.ROOM_TOPIC)
            room_topic = getattr(topic_data, 'topic', '') or None
        except Exception:
            pass
        # Presence of m.room.encryption state means the room is encrypted.
        is_encrypted = False
        try:
            await self.client.get_state_event(room_id, EventType.ROOM_ENCRYPTION)
            is_encrypted = True
        except Exception:
            pass
        return web.json_response({
            "room_id": room_id_str,
            "name": room_name or room_id_str,
            "topic": room_topic,
            "encrypted": is_encrypted,
        })
    except Exception as e:
        log.error(f"API room_state error: {e}")
        return web.json_response({"error": str(e)}, status=500)
async def api_generate_audio(self, request: web.Request) -> web.Response:
    """POST /api/generate_audio - Generate TTS audio from text

    Body: {"text": "text to synthesize", "voice": "voice_name"}
    Returns: {"mxc_url": "mxc://...", "size": bytes, "duration_est": seconds}

    NOTE(review): the "voice" field is echoed back in the response but is
    not passed to synthesize_speech - confirm whether per-request voices
    are actually supported.
    """
    try:
        if not TTS_URL:
            return web.json_response({"error": "TTS not configured"}, status=503)
        payload = await request.json()
        text = payload.get("text", "")
        voice = payload.get("voice", TTS_VOICE)
        if not text:
            return web.json_response({"error": "Missing required field: text"}, status=400)
        # Blocking TTS call - run in a worker thread.
        log.info(f"API: Generating TTS for {len(text)} characters...")
        audio_bytes = await asyncio.to_thread(synthesize_speech, text)
        if not audio_bytes:
            return web.json_response({"error": "TTS generation failed"}, status=500)
        # Upload the MP3 to the homeserver's media repository.
        content_uri = await self.client.upload_media(
            data=audio_bytes,
            mime_type="audio/mpeg",
            filename="tts-audio.mp3",
            size=len(audio_bytes)
        )
        if not content_uri:
            return web.json_response({"error": "Failed to upload audio to media server"}, status=500)
        # Rough heuristic: ~10 characters of text per second of speech.
        duration_est = len(text) / 10.0
        return web.json_response({
            "mxc_url": content_uri,
            "size": len(audio_bytes),
            "duration_est": round(duration_est, 2),
            "voice": voice,
        })
    except Exception as e:
        log.error(f"API generate_audio error: {e}")
        return web.json_response({"error": str(e)}, status=500)
async def api_send_audio(self, request: web.Request) -> web.Response:
"""POST /api/send_audio - Send audio message to a room
Body: {"room_id": "!...", "mxc_url": "mxc://..." OR "text": "text_to_tts"}
Can either send an existing MXC URL or generate TTS on the fly
Returns: {"event_id": "$...", "status": "sent"}
"""
try:
if not self.client:
return web.json_response({"error": "Client not initialized"}, status=503)
data = await request.json()
room_id = data.get("room_id")
mxc_url = data.get("mxc_url")
text = data.get("text") # Alternative: generate TTS from text
voice = data.get("voice", TTS_VOICE)
caption = data.get("caption", "")
if not room_id:
return web.json_response({"error": "Missing required field: room_id"}, status=400)
room_id = RoomID(room_id)
# Either use provided MXC URL or generate TTS
final_mxc_url = mxc_url
if not final_mxc_url and text:
log.info(f"API: Generating TTS for audio message...")
audio_bytes = await asyncio.to_thread(synthesize_speech, text)
if audio_bytes:
final_mxc_url = await self.client.upload_media(
data=audio_bytes,
mime_type="audio/mpeg",
filename="tts-audio.mp3",
size=len(audio_bytes)
)
if not final_mxc_url:
return web.json_response({"error": "TTS generation or upload failed"}, status=500)
if not final_mxc_url:
return web.json_response({"error": "Either mxc_url or text must be provided"}, status=400)
# Send audio message
content = {
"body": caption or ("Voice message" if text else "Audio"),
"info": {"mimetype": "audio/mpeg", "size": 0}, # Size not known for existing MXC
"msgtype": "m.audio",
"url": final_mxc_url,
}
event_id = await self.client.send_message_event(room_id, EventType.ROOM_MESSAGE, content)
return web.json_response({
"event_id": str(event_id) if event_id else None,
"status": "sent",
"mxc_url": final_mxc_url,
})
except Exception as e:
log.error(f"API send_audio error: {e}")
return web.json_response({"error": str(e)}, status=500)
def setup_api_routes(self) -> web.Application:
"""Set up aiohttp application with API routes"""
app = web.Application()
# Core endpoints
app.router.add_get("/api/health", self.api_health)
app.router.add_get("/api/list_rooms", self.api_list_rooms)
app.router.add_get("/api/room_state/{room_id}", self.api_get_room_state)
# Messaging endpoints
app.router.add_post("/api/send_message", self.api_send_message)
app.router.add_post("/api/read_room", self.api_read_room)
app.router.add_post("/api/react", self.api_react)
# Audio endpoints
app.router.add_post("/api/generate_audio", self.api_generate_audio)
app.router.add_post("/api/send_audio", self.api_send_audio)
return app
async def run_api_server(self):
"""Run the HTTP API server"""
app = self.setup_api_routes()
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, API_HOST, API_PORT)
await site.start()
log.info(f"HTTP API server started on http://{API_HOST}:{API_PORT}")
return runner
    async def run(self):
        """Main run loop.

        Startup order matters here: database → Matrix client → crypto →
        cross-signing → conversation cache → event handlers → background
        tasks → initial sync → sync loop. The method then parks on
        `shutdown_event` until a signal handler (see main()) sets it, and
        the finally-block tears everything down in reverse.
        """
        # Initialize everything
        await self.init_database()
        await self.init_client()
        await self.init_crypto()
        # Set up cross-signing (gracefully falls back if unsupported)
        await self.setup_cross_signing()
        # Load existing conversations from database so rooms keep their
        # per-room Letta conversation across restarts.
        rows = await self.db.fetch("SELECT room_id, conversation_id FROM room_conversations")
        for row in rows:
            self.conversation_cache[RoomID(row["room_id"])] = row["conversation_id"]
        log.info(f"Loaded {len(self.conversation_cache)} existing conversations from database")
        self.register_handlers()
        log.info("=" * 50)
        log.info("Meridian Bridge Started")
        log.info(f"  Matrix: {self.user_id}")
        log.info(f"  Device: {self.device_id}")
        log.info(f"  Letta Agent: {LETTA_AGENT_ID}")
        log.info(f"  Conversations: {len(self.conversation_cache)} existing + per-room isolation")
        log.info(f"  E2EE: Enabled (mautrix-python)")
        log.info(f"  Formatting: Full HTML + Colors + Emoji")
        log.info(f"  Reactions: Interactive (agent sees and responds)")
        if ENABLE_API:
            log.info(f"  HTTP API: http://{API_HOST}:{API_PORT}")
        if HEARTBEAT_ENABLED:
            log.info(f"  Heartbeat: Every {HEARTBEAT_INTERVAL_MINUTES} minutes (SILENT MODE)")
        log.info("=" * 50)
        # Initialize heartbeat service if enabled (created here, started only
        # after the initial sync completes further below).
        if HEARTBEAT_ENABLED:
            heartbeat_config = HeartbeatConfig(
                enabled=True,
                interval_minutes=HEARTBEAT_INTERVAL_MINUTES,
                skip_if_recent_minutes=HEARTBEAT_SKIP_IF_RECENT_MINUTES,
                target_room_id=HEARTBEAT_TARGET_ROOM,
            )
            self.heartbeat = HeartbeatService(
                config=heartbeat_config,
                send_to_agent=send_to_letta,
                get_conversation_id=self._get_conversation_id_for_heartbeat,
            )
            log.info(f"Heartbeat service initialized (interval: {HEARTBEAT_INTERVAL_MINUTES}m)")
        # Start queue processor
        queue_task = asyncio.create_task(self.process_queue())
        # Start HTTP API server if enabled
        api_runner = None
        if ENABLE_API:
            api_runner = await self.run_api_server()
        try:
            # Initial sync with retry logic: a stale access token surfaces as
            # MUnknownToken, in which case we re-authenticate once and retry.
            log.info("Performing initial sync...")
            try:
                await self.client.sync(timeout=30000)
            except MUnknownToken:
                log.warning("Token expired during initial sync, re-authenticating...")
                await self._reauth_with_password()
                # Retry sync with new token
                await self.client.sync(timeout=30000)
            # Flag gates message handlers so historical events replayed by the
            # first sync are not forwarded to the agent.
            self.initial_sync_done = True
            log.info("Initial sync complete, now processing new messages only")
            # Send hello after sync (if enabled)
            if SEND_STARTUP_MESSAGE:
                try:
                    rooms = await self.client.get_joined_rooms()
                    for room_id in rooms:
                        # Test formatting in hello message
                        hello = "Meridian bridge online! 🚀\n\n**Features:**\n{hot_pink|Emoji reactions} - I see and respond to your reactions!\n{purple|Per-room conversations} - Isolated context per room\n• Matrix extensions - {hot_pink|spoilers}, colors, markdown\n{green|Full HTML formatting} - Beautiful responses!"
                        await self.send_message(room_id, hello)
                except Exception as e:
                    log.warning(f"Could not send hello: {e}")
            else:
                log.info("Startup message disabled (SEND_STARTUP_MESSAGE=0)")
            # Start the syncer (non-blocking, runs in background)
            log.info("Starting sync loop...")
            self.client.start(None)  # None = no filter
            # Start heartbeat service after sync is complete
            if self.heartbeat:
                self.heartbeat.start()
                log.info("Heartbeat service started (SILENT MODE)")
            # Keep running until shutdown is signaled
            await self.shutdown_event.wait()  # Wait for shutdown signal
        except (KeyboardInterrupt, asyncio.CancelledError):
            log.info("Shutting down...")
        finally:
            # Teardown in reverse order of startup.
            # Stop heartbeat service
            if self.heartbeat:
                self.heartbeat.stop()
            # NOTE(review): queue_task is cancelled but never awaited, so its
            # CancelledError is swallowed by the event loop — confirm whether
            # pending queue items may be dropped here.
            queue_task.cancel()
            if api_runner:
                await api_runner.cleanup()
            self.client.stop()
            if self.crypto_store:
                await self.crypto_store.close()
            if self.db:
                await self.db.stop()
async def _get_conversation_id_for_heartbeat(self, room_id: str) -> str | None:
"""Helper for heartbeat to get conversation ID for a room"""
return await self.get_or_create_conversation(RoomID(room_id))
async def main():
    """Entry point: run the bridge until SIGTERM/SIGINT requests shutdown."""
    bridge = MeridianBridge()

    def request_shutdown():
        # Flip the flags the run loop watches; run() then unwinds cleanly.
        log.info("Received shutdown signal, initiating graceful shutdown...")
        bridge.shutting_down = True
        bridge.shutdown_event.set()

    # Register graceful-shutdown handlers on the running event loop.
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGTERM, request_shutdown)
    loop.add_signal_handler(signal.SIGINT, request_shutdown)
    try:
        await bridge.run()
    finally:
        log.info("Meridian Bridge stopped")
if __name__ == "__main__":
    # Script entry point: drive the async bridge until shutdown completes.
    asyncio.run(main())