#!/usr/bin/env python3
|
||
"""
|
||
Matrix-Letta Bridge with E2EE Support (mautrix-python)
|
||
|
||
Connects Letta agents to encrypted Matrix rooms.
|
||
Uses mautrix-python for full E2EE support including SSSS.
|
||
Enhanced with full HTML formatting, emoji support, per-room conversations, and interactive reactions.
|
||
|
||
HTTP API for MCP Integration:
|
||
POST /api/send_message - Send encrypted message to a room
|
||
POST /api/read_room - Read decrypted messages from a room
|
||
POST /api/react - Send reaction to a message
|
||
GET /api/list_rooms - List joined rooms with encryption status
|
||
GET /api/health - Health check and status
|
||
"""
|
||
import os
|
||
import sys
|
||
import asyncio
|
||
import signal
|
||
import json
|
||
import logging
|
||
import requests
|
||
import html
|
||
import re
|
||
import base64
|
||
import io
|
||
from datetime import datetime, timedelta, timezone
|
||
from collections import deque
|
||
from pathlib import Path
|
||
from dotenv import load_dotenv
|
||
import emoji # Emoji shortcode conversion
|
||
from PIL import Image
|
||
from aiohttp import web
|
||
|
||
from mautrix.client import Client
|
||
from mautrix.api import Path as MatrixPath
|
||
from mautrix.client.state_store.memory import MemoryStateStore as BaseMemoryStateStore
|
||
from mautrix.crypto import OlmMachine
|
||
from mautrix.crypto.attachments import decrypt_attachment
|
||
from mautrix.types import (
|
||
EventType,
|
||
RoomID,
|
||
UserID,
|
||
DeviceID,
|
||
EventID,
|
||
MessageType,
|
||
TextMessageEventContent,
|
||
MediaMessageEventContent, # For images AND audio
|
||
EncryptedEvent,
|
||
StateEvent,
|
||
Membership,
|
||
TrustState,
|
||
ReactionEventContent,
|
||
RelatesTo,
|
||
RelationType,
|
||
PaginationDirection,
|
||
)
|
||
from mautrix.util.async_db import Database
|
||
from mautrix.util.logging import TraceLogger
|
||
from mautrix.errors.request import MUnknownToken
|
||
|
||
from dataclasses import dataclass
|
||
from sqlite_crypto_store import SQLiteCryptoStore
|
||
from heartbeat import HeartbeatService, HeartbeatConfig
|
||
from debouncer import create_message_debouncer, MessageDebouncer
|
||
|
||
# Load environment variables
|
||
load_dotenv()
|
||
|
||
# Storage paths: session.json holds login credentials across restarts,
# ./store holds the log file and crypto/database state.
SESSION_FILE = Path("./session.json")
STORE_PATH = Path("./store")
STORE_PATH.mkdir(exist_ok=True)  # idempotent; safe on every startup
|
||
|
||
# ============================================================
|
||
# TOOL VISIBILITY EMOJI MAPPING
|
||
# ============================================================
|
||
|
||
# Maps tool-name substrings to status emojis for tool-visibility reactions.
# Consumed by get_emoji_for_tool(): exact key match first, then substring match.
TOOL_EMOJI_MAP = {
    # Search operations
    "search_web": "🔍",
    "web_search": "🔍",
    "google_search": "🔍",
    "conversation_search": "🔍",

    # Read operations
    "read_file": "📖",
    "read_mail": "📖",
    "retrieve_memory": "📖",
    "get_calendar": "📖",
    "list_emails": "📖",
    "list_files": "📖",

    # Write operations
    "send_email": "✍️",
    "save_note": "✍️",
    "write_file": "✍️",
    "send_message": "✍️",

    # Compute/Process
    "calculate": "🔧",
    "process_image": "🔧",
    "analyze": "🔧",
    "summarize": "🔧",

    # List/Browse
    "list": "📋",
    "browse": "📋",

    # Default
    "default": "⚙️"
}


def get_emoji_for_tool(tool_name: str) -> str:
    """Return the status emoji for *tool_name*.

    Resolution order:
      1. Exact (case-insensitive) key in TOOL_EMOJI_MAP.
      2. Any non-default TOOL_EMOJI_MAP key appearing as a substring.
      3. Category keyword heuristics (search / read / write / list / compute).
      4. The generic gear emoji as a last resort.
    """
    tool_lower = tool_name.lower()

    # Exact match first
    if tool_lower in TOOL_EMOJI_MAP:
        return TOOL_EMOJI_MAP[tool_lower]

    # Partial match against known tool names.
    # FIX: loop variable renamed from `emoji` — it previously shadowed the
    # imported `emoji` module within this function's scope.
    for pattern, icon in TOOL_EMOJI_MAP.items():
        if pattern != "default" and pattern in tool_lower:
            return icon

    # Category keyword fallbacks, checked in the same priority order as before.
    categories = (
        (("search", "find", "lookup"), "🔍"),
        (("read", "get", "retrieve", "fetch"), "📖"),
        (("write", "save", "send", "create", "update"), "✍️"),
        (("list", "browse", "show"), "📋"),
        (("calculate", "process", "analyze", "summarize"), "🔧"),
    )
    for keywords, icon in categories:
        if any(kw in tool_lower for kw in keywords):
            return icon

    return "⚙️"
|
||
|
||
|
||
def deduplicate_emojis(emojis: list[str]) -> list[str]:
    """Return *emojis* with duplicates removed, preserving first-seen order.

    Uses dict key ordering (insertion-ordered dicts) instead of the manual
    seen-set loop; this also removes a loop variable named `emoji` that
    shadowed the imported `emoji` module.
    """
    return list(dict.fromkeys(emojis))
|
||
|
||
# Configure logging (file + stdout): INFO level to both ./store/bridge.log
# and the console, with a single shared format.
LOG_FILE = STORE_PATH / "bridge.log"
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler()
    ]
)
# TraceLogger is a mautrix type annotation only; at runtime this is the
# standard Logger returned by logging.getLogger.
log: TraceLogger = logging.getLogger("meridian.bridge")

# Silence noisy loggers (except crypto for debugging)
logging.getLogger("mautrix").setLevel(logging.WARNING)
logging.getLogger("mautrix.crypto").setLevel(logging.DEBUG)  # Enable crypto debug
logging.getLogger("aiohttp").setLevel(logging.WARNING)
|
||
|
||
# Matrix config (no defaults: these are None when the env var is unset)
MATRIX_HOMESERVER = os.getenv("MATRIX_HOMESERVER")
MATRIX_USER_ID = os.getenv("MATRIX_USER_ID")
MATRIX_PASSWORD = os.getenv("MATRIX_PASSWORD")

# Letta config
LETTA_API_KEY = os.getenv("LETTA_API_KEY")
LETTA_AGENT_ID = os.getenv("LETTA_AGENT_ID")
LETTA_BASE_URL = os.getenv("LETTA_BASE_URL")

# Audio config
STT_URL = os.getenv("STT_URL")  # speech-to-text service endpoint
TTS_URL = os.getenv("TTS_URL")  # text-to-speech service endpoint
TTS_VOICE = os.getenv("TTS_VOICE", "en-Soother_woman")
# Boolean env flags accept "1"/"true"/"yes" (case-insensitive)
ENABLE_AUDIO_RESPONSE = os.getenv("ENABLE_AUDIO_RESPONSE", "1").lower() in ("1", "true", "yes")
|
||
|
||
# Agent display name for voice messages (defaults to username from MATRIX_USER_ID)
|
||
# Agent display name for voice messages (defaults to username from MATRIX_USER_ID)
AGENT_DISPLAY_NAME = os.getenv("AGENT_DISPLAY_NAME", "")
if not AGENT_DISPLAY_NAME:
    # Derive the name from the @username:server form of the Matrix user ID.
    # FIX: read into a local instead of reassigning the module-level
    # MATRIX_USER_ID, which previously clobbered its value (None -> "")
    # as a side effect of computing the display name.
    _matrix_user_id = os.getenv("MATRIX_USER_ID", "")
    if _matrix_user_id.startswith("@"):
        AGENT_DISPLAY_NAME = _matrix_user_id.split(":")[0][1:]  # strip "@" and ":server"
    else:
        AGENT_DISPLAY_NAME = "Agent"
|
||
|
||
# Audio room filtering: If set, only send audio to these room types
# Options: "dm_only" (only direct messages), "all" (all rooms), or comma-separated room IDs
AUDIO_ROOM_FILTER = os.getenv("AUDIO_ROOM_FILTER", "dm_only").lower()

# Database config - uses SQLite by default (URL consumed by mautrix async_db)
DB_URL = os.getenv("BRIDGE_DB_URL", f"sqlite:{STORE_PATH}/bridge.db")
CRYPTO_PICKLE_KEY = os.getenv("CRYPTO_PICKLE_KEY", "meridian-bridge-pickle-key")

# Bridge config
SEND_STARTUP_MESSAGE = os.getenv("SEND_STARTUP_MESSAGE", "0").lower() in ("1", "true", "yes")

# Cross-signing config
MATRIX_RECOVERY_KEY = os.getenv("MATRIX_RECOVERY_KEY")  # Recovery key for cross-signing

# HTTP API config (for MCP integration)
API_PORT = int(os.getenv("API_PORT", "8284"))
API_HOST = os.getenv("API_HOST", "127.0.0.1")
ENABLE_API = os.getenv("ENABLE_API", "1").lower() in ("1", "true", "yes")

# Heartbeat config (for autonomous agent activity)
HEARTBEAT_ENABLED = os.getenv("HEARTBEAT_ENABLED", "0").lower() in ("1", "true", "yes")
HEARTBEAT_INTERVAL_MINUTES = int(os.getenv("HEARTBEAT_INTERVAL_MINUTES", "60"))
HEARTBEAT_SKIP_IF_RECENT_MINUTES = int(os.getenv("HEARTBEAT_SKIP_IF_RECENT_MINUTES", "5"))
HEARTBEAT_TARGET_ROOM = os.getenv("HEARTBEAT_TARGET_ROOM")  # Optional: specific room for heartbeats

# Message queue
# NOTE(review): module-level mutable state — presumably pending inbound
# messages plus a re-entrancy flag for the queue worker; confirm against the
# consumer code later in the file.
message_queue: deque = deque()
processing_queue = False
|
||
|
||
# ============================================================
|
||
# MATRIX COLOR PALETTE - Full spectrum!
|
||
# ============================================================
|
||
# Named color palette for Matrix HTML formatting ({name|text} markup and
# transcript blockquotes). Values are hex strings used for both the `color`
# and `data-mx-color` attributes. Some names intentionally alias the same
# hex (e.g. hot_pink/deep_pink, fuchsia/magenta).
MATRIX_COLORS = {
    # Hot pinks (most salient!)
    "hot_pink": "#FF1493",
    "deep_pink": "#FF1493",
    "shocking_pink": "#FC0FC0",
    "fuchsia": "#FF00FF",
    "magenta": "#FF00FF",
    "rose": "#FF007F",
    "pink": "#FFC0CB",
    "light_pink": "#FFB6C1",

    # Purples (for transcripts)
    "purple": "#800080",
    "dark_purple": "#663399",
    "rebecca_purple": "#663399",
    "medium_purple": "#9370DB",
    "blue_violet": "#8A2BE2",
    "dark_violet": "#9400D3",
    "plum": "#DDA0DD",
    "lavender": "#E6E6FA",

    # Blues (for information)
    "blue": "#0000FF",
    "dark_blue": "#00008B",
    "medium_blue": "#0000CD",
    "royal_blue": "#4169E1",
    "steel_blue": "#4682B4",
    "sky_blue": "#87CEEB",
    "light_blue": "#ADD8E6",
    "cyan": "#00FFFF",

    # Reds (for warnings/alerts)
    "red": "#FF0000",
    "dark_red": "#8B0000",
    "crimson": "#DC143C",
    "firebrick": "#B22222",
    "salmon": "#FA8072",
    "coral": "#FF7F50",

    # Oranges (for attention)
    "orange": "#FFA500",
    "dark_orange": "#FF8C00",
    "peach": "#FFDAB9",

    # Yellows (for highlights)
    "yellow": "#FFFF00",
    "gold": "#FFD700",
    "lemon": "#FFFACD",

    # Greens (for success/health)
    "green": "#008000",
    "dark_green": "#006400",
    "lime": "#00FF00",
    "lime_green": "#32CD32",
    "pale_green": "#98FB98",

    # Browns (for earthy/transcript)
    "brown": "#A52A2A",
    "saddle_brown": "#8B4513",
    "sienna": "#A0522D",
    "chocolate": "#D2691E",
    "tan": "#D2B48C",
    "beige": "#F5F5DC",

    # Grays (for neutral/code blocks)
    "black": "#000000",
    "dark_gray": "#696969",
    "gray": "#808080",
    "light_gray": "#D3D3D3",
    "gainsboro": "#DCDCDC",
    "silver": "#C0C0C0",
    "white": "#FFFFFF",

    # Special salience colors (semantic aliases of colors above)
    "salient": "#FF1493",  # Hot pink for most salient
    "critical": "#FF0000",  # Red for critical
    "important": "#FFA500",  # Orange for important
    "neutral": "#808080",  # Gray for neutral
}
|
||
|
||
# ============================================================
|
||
# CHROMATOPHORE SALIENCE SYSTEM
|
||
# ============================================================
|
||
# Chromatophores automatically detect salient words and color them based on
|
||
# emotional/attentional weight. Use with enable_chromatophores=True in format_html()
|
||
|
||
# Keyword -> color patterns for the chromatophore salience system.
# apply_chromatophores() walks these in descending "weight" order and wraps
# whole-word keyword matches in colored <font> tags. "color" is a key into
# MATRIX_COLORS; "weight" orders application priority (higher wins first).
CHROMATOPHORE_PATTERNS = {
    "critical": {
        "keywords": ["emergency", "danger", "threat", "severe", "critical", "catastrophic", "disaster", "urgent", "immediate", "cannot wait"],
        "color": "red",
        "weight": 4  # Highest salience
    },
    "salient": {
        "keywords": ["fascinating", "interesting", "important", "love", "hate", "fear", "joy", "sadness", "anger", "surprise", "wonder", "amazing", "incredible", "profound", "stunning", "breathtaking"],
        "color": "hot_pink",
        "weight": 3  # High salience
    },
    "important": {
        "keywords": ["should", "must", "need", "priority", "asap", "deadline", "required", "necessary", "essential", "crucial"],
        "color": "orange",
        "weight": 2  # Medium salience
    },
    "information": {
        "keywords": ["is", "was", "will", "could", "might", "may", "perhaps", "possible", "maybe", "likely", "probably"],
        "color": "blue",
        "weight": 1  # Low salience
    },
    "neutral": {
        "keywords": ["the", "a", "an", "and", "or", "but", "if", "then", "so", "because", "while", "when"],
        "color": "gray",
        "weight": 0  # No salience
    }
}
|
||
|
||
# ============================================================
|
||
# HTML FORMATTING
|
||
# ============================================================
|
||
|
||
# Emoji alias mapping - converts common shortcodes to ones the emoji library
# recognizes (applied by normalize_emoji_shortcodes() before emoji.emojize()).
# Identity entries marked "already works" are intentional no-op documentation.
EMOJI_ALIASES = {
    # Hearts
    ":heart:": ":red_heart:",
    ":hearts:": ":red_heart:",
    ":blue_heart:": ":blue_heart:",  # already works
    # Thumbs
    ":thumbsup:": ":thumbs_up:",
    ":+1:": ":thumbs_up:",
    ":thumbsdown:": ":thumbs_down:",
    ":-1:": ":thumbs_down:",
    # Faces
    ":smile:": ":grinning_face:",
    ":grin:": ":beaming_face_with_smiling_eyes:",
    ":joy:": ":face_with_tears_of_joy:",
    ":sob:": ":loudly_crying_face:",
    ":wink:": ":winking_face:",
    ":thinking:": ":thinking_face:",
    ":sunglasses:": ":smiling_face_with_sunglasses:",
    ":rage:": ":enraged_face:",
    ":scream:": ":face_screaming_in_fear:",
    ":sleepy:": ":sleepy_face:",
    # Gestures
    ":wave:": ":waving_hand:",
    ":clap:": ":clapping_hands:",
    ":pray:": ":folded_hands:",
    ":muscle:": ":flexed_biceps:",
    ":ok_hand:": ":OK_hand:",
    ":point_up:": ":index_pointing_up:",
    ":point_down:": ":backhand_index_pointing_down:",
    ":point_left:": ":backhand_index_pointing_left:",
    ":point_right:": ":backhand_index_pointing_right:",
    # Symbols
    ":100:": ":hundred_points:",
    ":check:": ":check_mark:",
    ":white_check_mark:": ":check_mark_button:",
    ":x:": ":cross_mark:",
    ":warning:": ":warning:",  # already works
    ":sparkle:": ":sparkles:",
    ":tada:": ":party_popper:",
    ":confetti_ball:": ":confetti_ball:",
    ":boom:": ":collision:",
    ":zap:": ":high_voltage:",
    ":bulb:": ":light_bulb:",
    # Nature
    ":sunny:": ":sun:",
    ":cloud:": ":cloud:",  # already works
    ":snowflake:": ":snowflake:",  # already works
    ":rainbow:": ":rainbow:",  # already works
    # Objects
    ":coffee:": ":hot_beverage:",
    ":beer:": ":beer_mug:",
    ":pizza:": ":pizza:",  # already works
    ":cake:": ":shortcake:",
    # Animals
    ":cat:": ":cat_face:",
    ":dog:": ":dog_face:",
    ":unicorn:": ":unicorn:",  # already works
    # Tech
    ":computer:": ":laptop:",
    ":iphone:": ":mobile_phone:",
    ":robot:": ":robot:",  # already works
}
|
||
|
||
|
||
# Extended emote map for direct shortcode conversion.
# Use with convert_emotes() for direct Unicode emoji output — unlike
# EMOJI_ALIASES this bypasses the emoji library entirely.
EMOTE_MAP = {
    # Hearts
    ":heart:": "❤️",
    ":hearts:": "💕",
    ":heartbeat:": "💓",
    ":sparkling_heart:": "💖",
    ":heartpulse:": "💗",
    ":blue_heart:": "💙",
    ":green_heart:": "💚",
    ":yellow_heart:": "💛",
    ":purple_heart:": "💜",
    ":black_heart:": "🖤",

    # Faces
    ":smiley:": "😊",
    ":smile:": "😄",
    ":grinning:": "😀",
    ":blush:": "😊",
    ":wink:": "😉",
    ":thinking:": "🤔",
    ":exploding_head:": "🤯",
    ":anguished:": "😧",
    ":cry:": "😢",
    ":sob:": "😭",
    ":joy:": "😂",
    ":laughing:": "😆",

    # Gestures
    ":thumbsup:": "👍",
    ":thumbsdown:": "👎",
    ":wave:": "👋",
    ":ok_hand:": "👌",
    ":pray:": "🙏",
    ":clap:": "👏",
    ":raised_hands:": "🙌",
    ":hugs:": "🤗",

    # Eyes/looking
    ":eyes:": "👀",
    ":eye:": "👁️",
    ":see_no_evil:": "🙈",
    ":speak_no_evil:": "🙊",
    ":hear_no_evil:": "🙉",
    ":sunglasses:": "😎",

    # Nature
    ":seedling:": "🌱",
    ":herb:": "🌿",
    ":flower:": "🌸",
    ":sunflower:": "🌻",
    ":rose:": "🌹",
    ":sun:": "☀️",
    ":moon:": "🌙",
    ":star:": "⭐",
    ":cloud:": "☁️",

    # Tech
    ":keyboard:": "⌨️",
    ":phone:": "📱",
    ":camera:": "📷",
    ":video_camera:": "📹",
    ":tv:": "📺",
    ":speaker:": "🔊",
    ":headphones:": "🎧",
    ":floppy_disk:": "💾",

    # Symbols
    ":exclamation:": "❗",
    ":information_source:": "ℹ️",
    ":question:": "❓",
    ":grey_question:": "❔",
    ":heavy_check_mark:": "✅",
    ":heavy_exclamation_mark:": "❗",
    ":x:": "❌",
    ":heavy_plus_sign:": "➕",
    ":heavy_minus_sign:": "➖",
}
|
||
|
||
|
||
def convert_emotes(text: str) -> str:
    """Replace every EMOTE_MAP shortcode in *text* with its Unicode emoji."""
    result = text
    for code, glyph in EMOTE_MAP.items():
        result = result.replace(code, glyph)
    return result
|
||
|
||
|
||
def normalize_emoji_shortcodes(text: str) -> str:
    """Rewrite common shortcode aliases into the canonical names the emoji library expects."""
    result = text
    for alias, canonical in EMOJI_ALIASES.items():
        result = result.replace(alias, canonical)
    return result
|
||
|
||
|
||
def format_html(text: str) -> tuple[str, str]:
    """
    Format text as Matrix HTML with full markdown and emoji support.

    This is the main entry point for all formatting.

    Args:
        text: The response text from Letta (may contain markdown, emoji shortcodes, or HTML)

    Returns:
        tuple: (plain_text, html_body)
            - plain_text: Plain text version without HTML tags
            - html_body: Formatted HTML with Matrix extensions and emojis
    """
    try:
        # Imported lazily so a missing `markdown` package degrades to the
        # plain-text fallback in the except-branch instead of breaking import.
        import markdown as md

        # FIX: Strip leading/trailing whitespace to prevent false code blocks
        # Markdown interprets leading indentation as code blocks
        text = text.strip()

        # Convert emoji shortcodes first (e.g., :heart: → ❤️)
        # First normalize common aliases, then let the library convert them
        text = normalize_emoji_shortcodes(text)
        text = emoji.emojize(text, language='en')

        # If text already looks like HTML (contains tags), process extensions then pass through
        if text.strip().startswith("<") and ">" in text:
            # Convert Matrix extensions in HTML content (colors, spoilers)
            # First process color tags {color_name|text}
            def replace_color_html(match):
                color_name = match.group(1)
                content = match.group(2)
                if color_name in MATRIX_COLORS:
                    hex_color = MATRIX_COLORS[color_name]
                elif re.match(r'^#[0-9A-Fa-f]{6}$', color_name):
                    hex_color = color_name
                else:
                    # Unknown color name: leave the original markup untouched
                    return match.group(0)
                return f'<font color="{hex_color}" data-mx-color="{hex_color}">{content}</font>'

            text = re.sub(r'\{([a-zA-Z0-9_#]+)\|([^}]+)\}', replace_color_html, text)

            # Convert spoiler tags ||text||
            # Handle both inline and multi-line spoilers
            # Non-greedy doesn't work well with HTML tags, use greedy but stop at ||
            def replace_spoiler(match):
                content = match.group(1)
                # Handle edge case of empty spoiler
                if not content.strip():
                    return '||'
                return f'<span data-mx-spoiler>{content}</span>'

            # First, try matching ||content|| greedily (handles multi-line spans)
            # But stop at the first || after the content
            text = re.sub(r'\|\|([^|]*?)\|\|', replace_spoiler, text, flags=re.DOTALL)

            # Let AI's HTML through directly (with extensions applied)
            plain_text = html.unescape(re.sub(r'<[^>]+>', '', text))
            return plain_text.strip(), text

        # Convert markdown headers to bold text for conversational flow
        # Headers are too visually jarring for chat messages
        # # Header → **Header** (preserves emphasis without giant text)
        text = re.sub(r'^#{1,6}\s+(.+)$', r'**\1**', text, flags=re.MULTILINE)

        # Apply Matrix extensions BEFORE markdown conversion
        text = apply_matrix_extensions(text)

        # Convert markdown to HTML using the proper markdown library
        # Note: Don't use nl2br - it breaks list detection
        formatted_html = md.markdown(
            text,
            extensions=['extra', 'sane_lists'],
            output_format='html'
        )

        # FIX: Handle false code blocks created by markdown when text shouldn't be code
        # Markdown can incorrectly interpret inline backticks or certain patterns as code
        # We only want to preserve code blocks that were explicitly fenced with ```
        has_fenced_blocks = '```' in text

        if not has_fenced_blocks:
            # Check for false fenced code blocks (entire content wrapped OR partial at start)
            # The regex captures from <pre><code> to </code></pre>, with newline matching
            if formatted_html.startswith('<pre><code>'):
                import html as html_lib
                # Match code block with attributes, allow newlines, but stop at first </code></pre>
                pre_match = re.search(r'<pre><code[^>]*?>(.*?)(?:</code></pre>|</code>)', formatted_html, re.DOTALL)
                if pre_match:
                    extracted = html_lib.unescape(pre_match.group(1))
                    # This is likely a false code block - re-process it as normal markdown
                    # Strip the leading whitespace that caused the code block interpretation
                    extracted = extracted.lstrip()
                    re_formatted = md.markdown(
                        extracted,
                        extensions=['extra', 'sane_lists'],
                        output_format='html'
                    )
                    # Replace the false code block with properly formatted HTML
                    formatted_html = re.sub(
                        r'<pre><code[^>]*?>.*?</code></pre>',
                        re_formatted,
                        formatted_html,
                        count=1,
                        flags=re.DOTALL
                    )
            # Check for false inline code at the start (partial wrapping)
            elif formatted_html.startswith('<code>'):
                import html as html_lib
                # Match <code>...</code> at start, capture the rest
                code_match = re.match(r'<code>(.*?)</code>(.*)', formatted_html, re.DOTALL)
                if code_match:
                    extracted = html_lib.unescape(code_match.group(1))
                    rest_content = code_match.group(2)
                    # If the "code" contains markdown formatting like **, it's false - unwrap it
                    # Re-format the content as proper markdown HTML
                    extracted_formatted = md.markdown(
                        extracted,
                        extensions=['extra', 'sane_lists'],
                        output_format='html'
                    )
                    formatted_html = extracted_formatted + rest_content

        # Enhance HTML with Matrix-specific styling
        formatted_html = enhance_html(formatted_html)

        # Apply chromatophore salience highlighting ONLY if agent opted in
        # Agent must include [chromatophore] or [!c] tag to enable
        # This puts emotional/attentional color processing under agent control
        if '[chromatophore]' in text or '[!c]' in text:
            formatted_html = apply_chromatophores(formatted_html, use_hot_pink=True)
            log.debug("[Formatting] Chromatophore highlighting applied per agent request")

        # Generate plain text by stripping HTML (already has emojis)
        plain_text = html.unescape(re.sub(r'<[^>]+>', '', formatted_html))
        plain_text = plain_text.strip()

        return plain_text, formatted_html

    except Exception as e:
        # Best-effort fallback: no HTML, but still expand emoji shortcodes.
        log.warning(f"HTML formatting failed: {e}, using plain text")
        # Still try emoji conversion
        return emoji.emojize(text, language='en'), emoji.emojize(text, language='en')
|
||
|
||
|
||
def apply_matrix_extensions(text: str) -> str:
    """
    Apply Matrix-specific formatting extensions to markdown text.

    Extensions:
    - Spoilers: ||text|| → <span data-mx-spoiler>text</span>
    - Colored text: {color|text} or {#hex|text} → <font color="..." data-mx-color="...">text</font>
      (named colors are resolved through the MATRIX_COLORS palette)

    Args:
        text: Markdown text with extensions

    Returns:
        Text with extensions applied as HTML tags
    """
    # Spoilers first; DOTALL lets a single spoiler span multiple lines.
    text = re.sub(r'\|\|(.+?)\|\|', r'<span data-mx-spoiler>\1</span>', text, flags=re.DOTALL)

    def _colorize(match: re.Match) -> str:
        name, body = match.group(1), match.group(2)
        hex_color = MATRIX_COLORS.get(name)
        if hex_color is None:
            if re.match(r'^#[0-9A-Fa-f]{6}$', name):
                hex_color = name  # literal #RRGGBB passes through
            else:
                # Unknown name (including bare HTML color words): keep original markup.
                return match.group(0)
        # Emit both color and data-mx-color so Matrix clients render it.
        return f'<font color="{hex_color}" data-mx-color="{hex_color}">{body}</font>'

    return re.sub(r'\{([a-zA-Z0-9_#]+)\|([^}]+)\}', _colorize, text)
|
||
|
||
|
||
def enhance_html(html: str) -> str:
    """
    Post-process markdown-generated HTML with Matrix-specific styling.

    - Code blocks gain GitHub-theme foreground/background colors via
      data-mx-bg-color / data-mx-color.
    - Blockquotes gain a purple left border (margins kept small for mobile).

    Args:
        html: HTML from markdown formatter

    Returns:
        Enhanced HTML
    """
    # Theme fenced/indented code blocks (DOTALL so multi-line bodies match).
    themed = re.sub(
        r'<pre><code>(.*?)</code></pre>',
        r'<pre><code data-mx-bg-color="#f6f8fa" data-mx-color="#24292e">\1</code></pre>',
        html,
        flags=re.DOTALL
    )
    # Plain-tag substitution: every opening <blockquote> gets the purple border.
    return themed.replace(
        '<blockquote>',
        '<blockquote data-mx-border-color="#800080" style="border-left: 4px solid #800080; padding-left: 10px; margin: 5px 0;">'
    )
|
||
|
||
|
||
def apply_chromatophores(text: str, use_hot_pink: bool = True) -> str:
    """
    Apply chromatophore salience highlighting to HTML content.

    Keywords from CHROMATOPHORE_PATTERNS are wrapped in colored <font> tags,
    highest-weight patterns first, matching whole words case-insensitively.

    Args:
        text: HTML content to enhance
        use_hot_pink: If True, override 'salient' keywords with hot pink (#FF1493)

    Returns:
        HTML with salient words color-highlighted
    """
    result = text

    # Highest-weight patterns win first (critical, then salient, ...).
    ordered = sorted(
        CHROMATOPHORE_PATTERNS.items(),
        key=lambda item: item[1]["weight"],
        reverse=True,
    )
    for name, spec in ordered:
        # 'salient' may be promoted to hot pink for maximum attention.
        if name == "salient" and use_hot_pink:
            hex_color = MATRIX_COLORS["hot_pink"]
        else:
            hex_color = MATRIX_COLORS.get(spec["color"])
        if not hex_color:
            continue

        replacement = f'<font color="{hex_color}" data-mx-color="{hex_color}">\\1</font>'
        for keyword in spec["keywords"]:
            # Word boundaries avoid partial-word matches.
            result = re.sub(
                rf'\b({re.escape(keyword)})\b',
                replacement,
                result,
                flags=re.IGNORECASE
            )

    return result
|
||
|
||
|
||
def parse_agent_control_tags(text: str) -> tuple[str, dict]:
    """
    Extract bridge control tags from an agent response.

    Recognized tags (long and short forms):
    - [chromatophore] / [!c] - enable salience highlighting (keyword coloring)
    - [silent] / [!s]        - skip TTS/audio generation for this message
    - [react:emoji]          - reaction intent tag (for logging/coordination)

    Args:
        text: Response text from agent

    Returns:
        tuple: (cleaned_text, tags_dict)
            - cleaned_text: Text with tags removed
            - tags_dict: {'chromatophore': bool, 'silent': bool, 'react': str|None}
    """
    tags = {
        'chromatophore': False,
        'silent': False,
        'react': None,
    }
    cleaned = text

    # Boolean flag tags: each has a long form and a short form.
    for flag, variants in (
        ('chromatophore', ('[chromatophore]', '[!c]')),
        ('silent', ('[silent]', '[!s]')),
    ):
        if any(v in cleaned for v in variants):
            tags[flag] = True
            for v in variants:
                cleaned = cleaned.replace(v, '')

    # Reaction tag carries a payload, e.g. [react:🔍]; first match wins.
    react_match = re.search(r'\[react:([^\]]+)\]', cleaned)
    if react_match:
        tags['react'] = react_match.group(1)
        cleaned = re.sub(r'\[react:[^\]]+\]', '', cleaned)

    return cleaned.strip(), tags
|
||
|
||
|
||
def format_transcript_header(label: str, preview: str, color: str = "purple") -> str:
    """
    Build a colored blockquote header for a transcript preview.

    Example:
        format_transcript_header("👁️ Ani saw:", "ICE raid footage from Monday")

    Args:
        label: Header label (e.g., "👁️ Ani saw:", "🎤 Ani heard:")
        preview: Preview text
        color: Color name from MATRIX_COLORS (default: purple)

    Returns:
        HTML-formatted transcript header blockquote
    """
    hex_color = MATRIX_COLORS.get(color, MATRIX_COLORS["purple"])

    # Shortcodes in both fields become Unicode emoji.
    label, preview = convert_emotes(label), convert_emotes(preview)

    return (
        f'<blockquote data-mx-border-color="{hex_color}" '
        f'style="border-left: 4px solid {hex_color}; padding-left: 10px; margin: 5px 0; color: {hex_color};">\n'
        f'<strong>{label}</strong> <em>{preview}</em>\n'
        '</blockquote>'
    )
|
||
|
||
|
||
def format_full_transcript(label: str, full_text: str, color: str = "purple") -> str:
    """
    Build a colored blockquote containing a header plus the full transcript body.

    Example:
        format_full_transcript("📝 Ani said (voice):", "The full transcript here...")

    Args:
        label: Header label (e.g., "📝 Ani said:")
        full_text: Full transcript text
        color: Color name from MATRIX_COLORS (default: purple)

    Returns:
        HTML-formatted full transcript blockquote
    """
    hex_color = MATRIX_COLORS.get(color, MATRIX_COLORS["purple"])

    # Shortcodes in both fields become Unicode emoji.
    label, full_text = convert_emotes(label), convert_emotes(full_text)

    return (
        f'<blockquote data-mx-border-color="{hex_color}" '
        f'style="border-left: 4px solid {hex_color}; padding-left: 10px; margin: 10px 0; color: {hex_color};">\n'
        f'<strong>{label}</strong><br/><br/>\n'
        f'{full_text}\n'
        '</blockquote>'
    )
|
||
|
||
|
||
def get_formatting_prompt() -> str:
    """
    Generate Matrix formatting instructions for Letta/LLM prompts.

    Returns:
        String with formatting instructions to include in system prompts
    """
    # NOTE: this literal is injected verbatim into LLM prompts — the {name|...}
    # examples must stay in sync with MATRIX_COLORS and apply_matrix_extensions().
    return """
You can use Matrix HTML formatting in your responses:

### Colors (Use {hot_pink|...} for what fascinates you or stands out!)
{hot_pink|text} - Most salient, attention-grabbing (#FF1493)
{purple|text} - For transcripts (#800080)
{blue|text} - Information (#0000FF)
{red|text} - Critical alerts (#FF0000)
{orange|text} - Important (#FFA500)
{green|text} - Success (#008000)

### Emote Shortcodes (for emotional texture)
:heart: → ❤️, :eyes: → 👀, :warning: → ⚠️, :thinking: → 🤔
:exploding_head: → 🤯, :cry: → 😢, :sob: → 😭, :joy: → 😂

### Spoilers
||hidden text|| - User clicks to reveal

### Code Blocks
```python
code here
```

### Transcript Templates (for audio/image responses)
Use these purple blockquotes:
<blockquote data-mx-border-color="#800080" style="border-left: 4px solid #800080;">
<strong>👁️ Ani saw:</strong> <em>preview</em>
</blockquote>
"""
|
||
|
||
|
||
# ============================================================
|
||
# SESSION HELPERS
|
||
# ============================================================
|
||
|
||
def save_session(user_id: str, device_id: str, access_token: str, homeserver: str):
    """Persist login credentials to SESSION_FILE so restarts can skip re-login."""
    payload = {
        "user_id": user_id,
        "device_id": device_id,
        "access_token": access_token,
        "homeserver": homeserver,
    }
    SESSION_FILE.write_text(json.dumps(payload, indent=2))
    log.info(f"Session saved (device: {device_id})")
|
||
|
||
|
||
def load_session() -> dict | None:
    """Return the saved session dict, or None when no session file exists."""
    if not SESSION_FILE.exists():
        return None
    return json.loads(SESSION_FILE.read_text())
|
||
|
||
# ============================================================
|
||
# MATRIX MEDIA HELPERS
|
||
# ============================================================
|
||
|
||
async def download_matrix_audio(
    client: Client,
    mxc_url: str,
    encryption_info: dict | None = None
) -> bytes:
    """
    Download and optionally decrypt audio from Matrix media server.

    Args:
        client: Matrix client instance
        mxc_url: MXC URL (mxc://server/media_id)
        encryption_info: Optional dict with 'key', 'iv', 'hashes' for E2EE

    Returns:
        Audio bytes (decrypted when complete encryption info is supplied)

    Raises:
        ValueError: If mxc_url is not a valid mxc:// URL.
    """
    if not mxc_url or not mxc_url.startswith("mxc://"):
        raise ValueError(f"Invalid MXC URL: {mxc_url}")

    # Download from media server - mautrix takes full URL
    log.info(f"Downloading audio: {mxc_url[:60]}...")
    audio_data = await client.download_media(mxc_url)
    log.info(f"Downloaded {len(audio_data)} bytes (hex preview: {audio_data[:8].hex()})")

    # Decrypt if E2EE
    if encryption_info:
        key_b64 = encryption_info.get("key", {}).get("k")
        iv = encryption_info.get("iv")
        hash_b64 = encryption_info.get("hashes", {}).get("sha256")

        if key_b64 and iv and hash_b64:
            log.info("Decrypting E2EE audio...")
            audio_data = decrypt_attachment(
                ciphertext=audio_data,
                key=key_b64,
                hash=hash_b64,
                iv=iv
            )
            log.info(f"Decrypted audio: {len(audio_data)} bytes")
        else:
            # Previously this fell through silently and returned ciphertext,
            # which downstream consumers (STT) cannot process. Make it visible.
            log.warning(
                "Encryption info present but incomplete (missing key/iv/hash); "
                "returning raw, still-encrypted audio bytes"
            )

    return audio_data
|
||
|
||
|
||
# ============================================================
|
||
# LETTA CLIENT (Conversations API)
|
||
# ============================================================
|
||
|
||
async def process_matrix_image(
    client: Client,
    mxc_url: str,
    encryption_info: dict | None = None
) -> bytes:
    """
    Download and optionally decrypt a Matrix image.

    Args:
        client: Matrix client instance
        mxc_url: MXC URL (mxc://server/media_id)
        encryption_info: Optional dict with 'key', 'iv', 'hashes' for E2EE

    Returns:
        Image bytes (decrypted when complete encryption info is supplied)

    Raises:
        ValueError: If mxc_url is not a valid mxc:// URL.
    """
    if not mxc_url or not mxc_url.startswith("mxc://"):
        raise ValueError(f"Invalid MXC URL: {mxc_url}")

    # Download from media server - mautrix takes full URL
    log.info(f"Downloading image: {mxc_url[:60]}...")
    image_data = await client.download_media(mxc_url)
    log.info(f"Downloaded {len(image_data)} bytes")

    # Decrypt if E2EE
    if encryption_info:
        key_b64 = encryption_info.get("key", {}).get("k")
        iv = encryption_info.get("iv")
        hash_b64 = encryption_info.get("hashes", {}).get("sha256")

        if key_b64 and iv and hash_b64:
            log.info("Decrypting E2EE image...")
            image_data = decrypt_attachment(
                ciphertext=image_data,
                key=key_b64,
                hash=hash_b64,
                iv=iv
            )
            log.info("Decrypted image successfully")
        else:
            # Previously this fell through silently and returned ciphertext,
            # which PIL/Letta cannot process. Make the failure visible.
            log.warning(
                "Encryption info present but incomplete (missing key/iv/hash); "
                "returning raw, still-encrypted image bytes"
            )

    return image_data
|
||
|
||
|
||
def prepare_image_for_letta(image_data: bytes, max_size: int = 2000) -> bytes:
    """
    Prepare image for sending to Letta - resize and convert to JPEG.

    Args:
        image_data: Raw image bytes
        max_size: Maximum dimension in pixels

    Returns:
        Processed image bytes (JPEG format)
    """
    # Open image with PIL
    img = Image.open(io.BytesIO(image_data))

    # Resize if too large (Letta/LLM APIs often have size limits)
    if img.width > max_size or img.height > max_size:
        log.info(f"Resizing image from {img.width}x{img.height} to fit {max_size}px")
        img.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)

    # Convert to RGB (removes alpha channel and ensures JPEG compatibility)
    if img.mode in ("RGBA", "LA"):
        # Normalize LA (grayscale + alpha) to RGBA *before* compositing so the
        # alpha channel survives. The previous code converted LA straight to
        # RGB, discarding alpha, and then pasted without a mask — transparent
        # regions were not flattened onto the white background.
        if img.mode == "LA":
            img = img.convert("RGBA")
        # Flatten transparency onto a white background using the alpha channel
        # (last band of an RGBA image) as the paste mask.
        background = Image.new("RGB", img.size, (255, 255, 255))
        background.paste(img, mask=img.split()[-1])
        img = background
    elif img.mode != "RGB":
        img = img.convert("RGB")

    # Convert to bytes
    output = io.BytesIO()
    img.save(output, format="JPEG", quality=85)
    return output.getvalue()
|
||
|
||
|
||
# ============================================================
|
||
# AUDIO HELPERS
|
||
# ============================================================
|
||
|
||
def transcribe_audio(audio_data: bytes, audio_format: str = "mp3") -> str:
    """
    Transcribe audio using STT (Faster-Whisper).

    Args:
        audio_data: Raw audio bytes
        audio_format: Audio format (e.g., "mp3", "m4a", "ogg")

    Returns:
        Transcribed text, or a bracketed placeholder string on failure /
        missing configuration / silence.
    """
    if not STT_URL:
        return "[STT not configured]"

    try:
        # NOTE: `io` is imported at module level; the old function-local
        # `import io` here shadowed it redundantly and has been removed.
        log.info(f"STT: {len(audio_data)} bytes, format={audio_format}")
        # Log first 20 bytes in hex to see if data is valid
        hex_preview = audio_data[:20].hex() if audio_data else "empty"
        log.info(f"STT: hex preview: {hex_preview}")

        files = {"audio": (f"audio.{audio_format}", io.BytesIO(audio_data), f"audio/{audio_format}")}
        response = requests.post(f"{STT_URL}/transcribe", files=files, timeout=30)
        response.raise_for_status()
        result = response.json()
        transcription = result.get("text", "").strip()
        log.info(f"STT transcribed: '{transcription[:50]}...'")
        return transcription if transcription else "[No speech detected]"
    except Exception as e:
        log.error(f"STT transcription failed: {e}")
        return "[Transcription failed]"
|
||
|
||
|
||
# ============================================================
|
||
# PRONUNCIATION DICTIONARY FOR TTS
|
||
# ============================================================
|
||
|
||
# Words that need phonetic pronunciation fixes
|
||
# Maps: wrong_pronunciation → correct_pronunciation
|
||
# This is applied before sending text to TTS.
|
||
|
||
PRONUNCIATION_MAP = {
    # Names
    "Xzaviar": "Zay-Vee-are",  # "Xzaviar" → "Zay-vee-are"
    "xzaviar": "Zay-Vee-are",
    "Jean Luc": "Zhan-Look",  # "Jean Luc" → "Zhan-Look"
    "jean luc": "Zhan-Look",
    "Sebastian": "Se-BASS-chen",  # "Sebastian" → "Se-BASS-chen"
    "sebastian": "Se-BASS-chen",
}

# Feel free to add more entries as needed
# Example: "Dejah": "DEE-jah"
# Example: "Quique": "KEY-kay"


def apply_pronunciation_fixes(text: str) -> str:
    """
    Rewrite hard-to-pronounce words as phonetic spellings before TTS.

    Each entry in PRONUNCIATION_MAP is applied as a case-insensitive,
    whole-word substitution, so e.g. "Xzaviar" is replaced while
    "Xzaviar123" is left alone.

    Args:
        text: The text to process

    Returns:
        Text with pronunciation fixes applied
    """
    for spoken_wrong, spoken_right in PRONUNCIATION_MAP.items():
        # Word boundaries keep partial matches (e.g. inside identifiers) intact.
        word_pattern = r"\b" + re.escape(spoken_wrong) + r"\b"
        text = re.sub(word_pattern, spoken_right, text, flags=re.IGNORECASE)
    return text
|
||
|
||
|
||
def synthesize_speech(text: str) -> bytes:
    """
    Synthesize speech using TTS (VibeVoice).

    Args:
        text: Text to synthesize

    Returns:
        Audio bytes (MP3 format), or b"" when TTS is disabled or fails.
    """
    if not TTS_URL or not ENABLE_AUDIO_RESPONSE:
        return b""

    # Clean text: remove markdown formatting and most emojis.
    # NOTE: module-level `re` is used; the old redundant local import is gone.
    cleaned_text = text

    # Remove fenced code blocks FIRST. Previously the inline `...` rule ran
    # before this one and consumed the ``` fences, so this pattern never
    # matched and code-block contents were read aloud by the TTS engine.
    cleaned_text = re.sub(r'```\n?[\s\S]*?```', '', cleaned_text)  # Code blocks

    # Remove HTML tags (but keep the content inside)
    # Strip tags like <b>, <i>, <strong>, <em>, <br>, <p>, etc.
    cleaned_text = re.sub(r'<[^>]+>', '', cleaned_text)

    # Remove markdown formatting
    cleaned_text = re.sub(r'\*\*(.+?)\*\*', r'\1', cleaned_text)
    cleaned_text = re.sub(r'\*(.+?)\*', r'\1', cleaned_text)
    cleaned_text = re.sub(r'`(.+?)`', r'\1', cleaned_text)

    # Strip emojis (but keep allowed ones: ✨ sparkles, 🎤 microphone)
    # Unicode emoji ranges covering most emoji characters
    emoji_pattern = re.compile(
        "["
        "\U0001F600-\U0001F64F"  # emoticons
        "\U0001F300-\U0001F5FF"  # symbols & pictographs
        "\U0001F680-\U0001F6FF"  # transport & map symbols
        "\U0001F1E0-\U0001F1FF"  # flags
        "\U00002702-\U000027B0"  # Dingbats
        "\U000024C2-\U0001F251"  # Enclosed characters
        "\U0001F900-\U0001FAFF"  # Supplemental Symbols and Pictographs
        "]+",
        flags=re.UNICODE
    )
    # First preserve special emojis we want to keep
    sparkles_marker = "__SPARKLE__"
    mic_marker = "__MIC__"

    # Replace kept emojis with markers before stripping
    cleaned_text = cleaned_text.replace("✨", sparkles_marker)
    cleaned_text = cleaned_text.replace("🎤", mic_marker)

    # Strip all other emojis
    cleaned_text = emoji_pattern.sub('', cleaned_text)

    # Restore kept emojis (with proper spacing)
    cleaned_text = cleaned_text.replace(sparkles_marker, "✨")
    cleaned_text = cleaned_text.replace(mic_marker, "🎤")

    # Clean up whitespace
    cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip()

    # Apply pronunciation fixes (names, etc.)
    cleaned_text = apply_pronunciation_fixes(cleaned_text)

    try:
        payload = {
            "input": cleaned_text,
            "voice": TTS_VOICE,
            "model": "vibevoice-v1"
        }
        log.info(f"TTS: {len(cleaned_text)} chars -> synthesizing...")
        response = requests.post(f"{TTS_URL}/audio/speech", json=payload, timeout=300)
        response.raise_for_status()
        audio_bytes = response.content
        log.info(f"TTS: {len(audio_bytes)} bytes")
        return audio_bytes
    except Exception as e:
        log.error(f"TTS synthesis failed: {e}")
        return b""
|
||
|
||
|
||
# ============================================================
|
||
# LETTA CLIENT (Conversations API)
|
||
# ============================================================
|
||
|
||
@dataclass
class LettaResponse:
    """Response from Letta including parsed tools."""
    # NOTE(review): `@dataclass` requires `from dataclasses import dataclass`,
    # which is not visible in this chunk's header imports — confirm it exists
    # earlier in the file.
    assistant_text: str | None  # Concatenated assistant text; None when BUSY
    status: str  # SUCCESS, BUSY, ERROR
    step_ids: list[str]  # Letta step IDs seen in the response (for feedback)
    tool_calls: list[dict]  # Parsed tool info
    tool_results: list[dict]  # Tool execution results
    reasoning_present: bool  # Whether reasoning was used
    errors: list[str]  # Any error messages
|
||
|
||
|
||
def send_to_letta(
    message_text: str,
    conversation_id: str,
    images: list[bytes] | None = None
) -> LettaResponse:
    """
    Send message to Letta agent via Conversations API. Returns parsed LettaResponse.

    Always uses the conversations endpoint - no agent endpoint fallback.

    Args:
        message_text: Text message to send
        conversation_id: Conversation ID for per-room isolation (required)
        images: Optional list of image bytes to include as multimodal input

    Returns:
        LettaResponse: Parsed response including assistant text, status, step IDs,
            tool calls, tool results, reasoning presence, and errors.
    """
    # Always use conversation endpoint
    url = f"{LETTA_BASE_URL}/conversations/{conversation_id}/messages"

    headers = {"Content-Type": "application/json"}
    if LETTA_API_KEY:
        headers["Authorization"] = f"Bearer {LETTA_API_KEY}"

    # Build payload - multimodal if images provided
    if images:
        # Letta's multimodal format (NOT OpenAI-style!)
        # See: Letta-Matrix-datasample/src/matrix/file_handler.py
        content = [{"type": "text", "text": message_text}]
        for img in images:
            # Detect image format from magic bytes
            img_format = "image/jpeg"
            if img[:8] == b"\x89PNG\r\n\x1a\n":
                # BUGFIX: real PNG files start with 0x89 'P' 'N' 'G'; the old
                # check compared the first three bytes to b'PNG' and could
                # never match, so PNGs were mislabeled as JPEG.
                img_format = "image/png"
            elif img[:4] == b"GIF8":
                img_format = "image/gif"
            elif img[:4] == b"RIFF" and img[8:12] == b"WEBP":
                img_format = "image/webp"
            b64_img = base64.b64encode(img).decode("utf-8")
            content.append({
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": img_format,
                    "data": b64_img
                }
            })
        payload = {"messages": [{"role": "user", "content": content}]}
    else:
        # Simple text input (Letta's expected format for conversations)
        payload = {"input": message_text}

    try:
        response = requests.post(url, json=payload, headers=headers, timeout=1000)

        # 409 = agent busy processing another request
        if response.status_code == 409:
            return LettaResponse(None, "BUSY", [], [], [], False, [])

        response.raise_for_status()

        # Parse Server-Sent Events format: "data: {...}\ndata: [DONE]"
        messages = []
        for line in response.text.split('\n'):
            line = line.strip()
            if line.startswith('data: '):
                content = line[6:]  # Remove "data: " prefix
                if content == '[DONE]':
                    break
                if content:
                    # Guard each line: one malformed SSE chunk previously
                    # aborted the whole response via an uncaught exception.
                    try:
                        messages.append(json.loads(content))
                    except ValueError as parse_err:
                        log.warning(f"[Letta] Skipping malformed SSE line: {parse_err}")
        data = {"messages": messages}

        if not data or not data.get("messages"):
            return LettaResponse(
                "I received your message but got no response from Letta.",
                "ERROR", [], [], [], False, []
            )

        # Log the raw response to see what Letta actually returned
        message_types = [msg.get("message_type", msg.get("type", "unknown")) for msg in data.get("messages", [])]
        log.info(f"[Letta] Received {len(data.get('messages', []))} messages with types: {message_types}")
        # Log first 3 messages (if any) to see structure
        for i, msg in enumerate(data.get("messages", [])[:3]):
            log.debug(f"[Letta] Message {i}: {json.dumps(msg, default=str)[:500]}...")

        # Debug: log the full response structure
        log.debug(f"Letta response keys: {list(data.keys())}")

        # Extract assistant messages, step IDs, tool calls, tool results, errors, and reasoning
        assistant_messages = []
        tool_calls = []
        tool_results = []
        errors = []
        step_ids = set()
        reasoning_present = False

        # Check for step_id at top level
        if data.get("step_id"):
            step_ids.add(data.get("step_id"))

        for msg in data.get("messages", []):
            msg_type = msg.get("message_type", msg.get("type", "unknown"))

            # Look for step_id in messages (might be nested)
            if msg.get("step_id"):
                step_ids.add(msg.get("step_id"))

            # Also check for id that starts with "step-"
            msg_id = msg.get("id", "")
            if msg_id.startswith("step-"):
                step_ids.add(msg_id)

            if msg_type == "assistant_message":
                content = msg.get("content", "")
                if content:
                    assistant_messages.append(content)

            elif msg_type == "tool_call_message":
                tool_call = msg.get("tool_call", {})
                tool_calls.append({
                    "name": tool_call.get("name", "unknown"),
                    "tool_call_id": tool_call.get("tool_call_id"),
                    "step_id": msg.get("step_id")
                })
                log.info(f"[Letta] Tool call: {tool_call.get('name')}")

            elif msg_type == "tool_return_message":
                status = msg.get("status", "unknown")
                tool_results.append({
                    "tool_call_id": msg.get("tool_call_id", ""),
                    "status": status,
                    "error": status == "error"
                })
                if status == "error":
                    log.warning(f"[Letta] Tool failed: {msg.get('tool_call_id')}")

            elif msg_type == "reasoning_message":
                reasoning_present = True
                log.debug(f"[Letta] Reasoning from: {msg.get('source')}")

            elif msg_type == "error_message":
                errors.append(msg.get("message", "Unknown error"))
                log.error(f"[Letta] Error: {msg.get('message')}")

        # Log what we captured vs total messages
        log.info(f"[Letta] Captured {len(assistant_messages)} assistant, {len(tool_calls)} tools, {len(tool_results)} results, {len(errors)} errors")
        if assistant_messages:
            log.debug(f"[Letta] Assistant content preview: {assistant_messages[0][:200] if assistant_messages else 'none'}")

        response_text = "".join(assistant_messages) if assistant_messages else "I received your message but have no response."
        return LettaResponse(
            assistant_text=response_text,
            status="SUCCESS",
            step_ids=list(step_ids),
            tool_calls=tool_calls,
            tool_results=tool_results,
            reasoning_present=reasoning_present,
            errors=errors
        )

    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 409:
            return LettaResponse(None, "BUSY", [], [], [], False, [])
        error_msg = f"Sorry, I encountered an HTTP error: {e.response.status_code}"
        log.error(f"Letta API HTTP error: {e}")
        return LettaResponse(error_msg, "ERROR", [], [], [], False, [])
    except Exception as e:
        error_msg = f"Sorry, I encountered an error: {str(e)}"
        log.error(f"Letta API error: {e}")
        return LettaResponse(error_msg, "ERROR", [], [], [], False, [])
|
||
|
||
|
||
def create_conversation() -> str | None:
    """
    Create a new Letta conversation.

    Returns:
        Conversation ID if successful, None otherwise.
    """
    headers = {"Content-Type": "application/json"}
    if LETTA_API_KEY:
        headers["Authorization"] = f"Bearer {LETTA_API_KEY}"

    try:
        # Note: Letta API requires agent_id as query param, not JSON body
        response = requests.post(
            f"{LETTA_BASE_URL}/conversations/",
            params={"agent_id": LETTA_AGENT_ID},
            json={"name": f"matrix-bridge-{datetime.now().strftime('%Y%m%d-%H%M%S')}"},
            headers=headers,
            timeout=30,
        )
        response.raise_for_status()
        conversation_id = response.json().get("id")
        log.info(f"Created Letta conversation: {conversation_id}")
        return conversation_id
    except Exception as e:
        log.error(f"Failed to create Letta conversation: {e}")
        return None
|
||
|
||
|
||
def send_feedback_to_letta(step_id: str, feedback: str) -> bool:
    """Send feedback (positive/negative) for a Letta step.

    Args:
        step_id: The Letta step ID to provide feedback for
        feedback: Either "positive" or "negative"

    Returns:
        True if feedback was sent successfully, False otherwise
    """
    # Guard clause: reject anything outside the two accepted feedback values.
    if feedback not in ("positive", "negative"):
        log.error(f"Invalid feedback value: {feedback}")
        return False

    url = f"{LETTA_BASE_URL}/steps/{step_id}/feedback"
    headers = {"Content-Type": "application/json"}
    if LETTA_API_KEY:
        headers["Authorization"] = f"Bearer {LETTA_API_KEY}"
    body = {"feedback": feedback}

    log.debug(f"Sending PATCH to {url} with body {body}")

    try:
        # PATCH with JSON body (discovered from ADE console)
        resp = requests.patch(url, headers=headers, json=body, timeout=30)
        resp.raise_for_status()
        log.info(f"Sent {feedback} feedback for step {step_id}")
        return True
    except Exception as e:
        log.error(f"Failed to send feedback for step {step_id}: {e}")
        return False
|
||
|
||
# ============================================================
|
||
# BRIDGE CLASS
|
||
# ============================================================
|
||
|
||
class MemoryStateStore(BaseMemoryStateStore):
    """In-memory state store that opts out of shared-room tracking."""

    async def find_shared_rooms(self, user_id: UserID) -> list[RoomID]:
        """Report no shared encrypted rooms for *user_id*.

        Always returning an empty list disables some crypto optimizations
        that rely on knowing which encrypted rooms a user shares with us,
        keeping this store simple.
        """
        return []
|
||
|
||
class MeridianBridge:
|
||
"""Matrix-Letta bridge with full E2EE support"""
|
||
|
||
    def __init__(self):
        """Initialize all bridge state; actual setup happens in the async init_* methods."""
        # NOTE(review): SQLiteCryptoStore, SQLStateStore and HeartbeatService are
        # not imported in the visible file header — presumably defined/imported
        # earlier in this file; confirm.
        self.client: Client | None = None  # mautrix client, set in init_client()
        self.crypto: OlmMachine | None = None  # E2EE engine, set in init_crypto()
        self.crypto_store: SQLiteCryptoStore | None = None
        self.state_store: SQLStateStore | None = None
        self.db: Database | None = None  # bridge DB, set in init_database()
        self.user_id: UserID | None = None
        self.device_id: DeviceID | None = None
        self.initial_sync_done = False
        self.shutting_down = False  # Graceful shutdown flag
        self.shutdown_event = asyncio.Event()  # Event to signal shutdown
        self.conversation_cache: dict[RoomID, str] = {}  # room_id -> conversation_id
        self.our_message_events: set[str] = set()  # Track our message event IDs (for emoji controls)
        self.room_cache: dict[str, dict] = {}  # Cache room member counts for audio filtering
        self.heartbeat: HeartbeatService | None = None  # Autonomous agent heartbeat

        # Track event IDs we've already added reactions to (prevent re-adding on restart)
        self.processed_reactions: set[str] = set()
|
||
|
||
    async def init_database(self):
        """Initialize database for crypto and state storage.

        Creates (if missing) the mautrix crypto tables plus the bridge's own
        bookkeeping tables. All DDL uses CREATE TABLE IF NOT EXISTS, so this
        is safe to run on every startup.
        """
        self.db = Database.create(
            DB_URL,
            upgrade_table=None,  # We'll handle schema ourselves
        )
        await self.db.start()

        # Clear in-memory emoji state to prevent stale state from previous sessions
        self.our_message_events.clear()
        log.info("Cleared stale emoji state from previous session")

        # --- mautrix crypto tables -------------------------------------
        # Olm account blob + whether device keys were uploaded ("shared").
        await self.db.execute("""
            CREATE TABLE IF NOT EXISTS crypto_account (
                account_id TEXT PRIMARY KEY,
                device_id TEXT,
                shared BOOLEAN,
                sync_token TEXT,
                account BYTEA
            )
        """)
        # One-to-one Olm sessions, keyed by the peer's sender key.
        await self.db.execute("""
            CREATE TABLE IF NOT EXISTS crypto_olm_session (
                account_id TEXT,
                sender_key TEXT,
                session_id TEXT,
                session BYTEA,
                created_at TIMESTAMP,
                last_encrypted TIMESTAMP,
                last_decrypted TIMESTAMP,
                PRIMARY KEY (account_id, sender_key, session_id)
            )
        """)
        # Megolm sessions received from others (used to decrypt room messages).
        await self.db.execute("""
            CREATE TABLE IF NOT EXISTS crypto_megolm_inbound_session (
                account_id TEXT,
                room_id TEXT,
                session_id TEXT,
                sender_key TEXT,
                signing_key TEXT,
                session BYTEA,
                forwarding_chains TEXT,
                withheld_code TEXT,
                withheld_reason TEXT,
                ratchet_safety TEXT,
                received_at TIMESTAMP,
                max_age BIGINT,
                max_messages INTEGER,
                is_scheduled BOOLEAN,
                PRIMARY KEY (account_id, room_id, session_id)
            )
        """)
        # Our own outbound Megolm session per room (used to encrypt messages).
        await self.db.execute("""
            CREATE TABLE IF NOT EXISTS crypto_megolm_outbound_session (
                account_id TEXT,
                room_id TEXT PRIMARY KEY,
                session_id TEXT,
                session BYTEA,
                shared BOOLEAN,
                max_messages INTEGER,
                message_count INTEGER,
                max_age BIGINT,
                created_at TIMESTAMP,
                last_used TIMESTAMP
            )
        """)
        # Known devices of other users (identity keys + trust state).
        await self.db.execute("""
            CREATE TABLE IF NOT EXISTS crypto_device (
                account_id TEXT,
                user_id TEXT,
                device_id TEXT,
                identity_key TEXT,
                signing_key TEXT,
                trust TEXT,
                deleted BOOLEAN,
                name TEXT,
                PRIMARY KEY (account_id, user_id, device_id)
            )
        """)
        # Users whose device lists we track for key sharing.
        await self.db.execute("""
            CREATE TABLE IF NOT EXISTS crypto_tracked_user (
                account_id TEXT,
                user_id TEXT,
                PRIMARY KEY (account_id, user_id)
            )
        """)
        # Cross-signing public keys per user and usage (master/self/user).
        await self.db.execute("""
            CREATE TABLE IF NOT EXISTS crypto_cross_signing_keys (
                account_id TEXT,
                user_id TEXT,
                usage TEXT,
                key TEXT,
                first_seen_key TEXT,
                PRIMARY KEY (account_id, user_id, usage)
            )
        """)

        # --- bridge bookkeeping tables ---------------------------------
        # Per-room conversation mapping (now with isolation!)
        await self.db.execute("""
            CREATE TABLE IF NOT EXISTS room_conversations (
                room_id TEXT PRIMARY KEY,
                conversation_id TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                last_used_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Message ID mapping for reaction → feedback correlation
        await self.db.execute("""
            CREATE TABLE IF NOT EXISTS message_mapping (
                matrix_event_id TEXT PRIMARY KEY,
                room_id TEXT NOT NULL,
                step_ids TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Store our own message texts for TTS regeneration (🎤 reaction)
        await self.db.execute("""
            CREATE TABLE IF NOT EXISTS bot_messages (
                matrix_event_id TEXT PRIMARY KEY,
                room_id TEXT NOT NULL,
                text TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Map audio event IDs to the original text for TTS regeneration
        await self.db.execute("""
            CREATE TABLE IF NOT EXISTS audio_messages (
                audio_event_id TEXT PRIMARY KEY,
                original_text TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Pending images for interactive checkmark (✅) flow
        await self.db.execute("""
            CREATE TABLE IF NOT EXISTS pending_images (
                checkmark_event_id TEXT PRIMARY KEY,
                room_id TEXT NOT NULL,
                image_data BLOB NOT NULL,
                image_format TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Track reactions we've already processed (to prevent re-adding on restart)
        await self.db.execute("""
            CREATE TABLE IF NOT EXISTS processed_reactions (
                source_event_id TEXT NOT NULL,
                target_event_id TEXT NOT NULL,
                emoji TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                PRIMARY KEY (source_event_id, target_event_id, emoji)
            )
        """)

        log.info("Database initialized")
|
||
|
||
    async def init_client(self):
        """Initialize Matrix client with E2EE and token refresh support.

        Tries to restore the persisted session first; validates the stored
        token with a whoami() call and falls back to a password login when
        the token is rejected or no session file exists.
        """
        session = load_session()

        if session:
            try:
                log.info(f"Restoring session for {session['user_id']} (device: {session['device_id']})")
                self.user_id = UserID(session["user_id"])
                self.device_id = DeviceID(session["device_id"])

                self.client = Client(
                    mxid=self.user_id,
                    device_id=self.device_id,
                    base_url=session["homeserver"],
                    token=session["access_token"],
                )

                # Test the token with a light operation
                await self.client.whoami()
                log.info("Session restored successfully")

            except MUnknownToken:
                # Token was revoked/expired — do a fresh password login.
                # NOTE: other errors (e.g. network failures) propagate to the caller.
                log.warning("Stored token is invalid, re-authenticating...")
                await self._reauth_with_password()

        else:
            # No saved session at all — password login is the only option.
            await self._reauth_with_password()
|
||
|
||
    async def _reauth_with_password(self):
        """Re-authenticate using stored password and save new session.

        Creates a fresh Client (the server assigns a new device ID), logs in
        with MATRIX_PASSWORD, records the resulting user/device IDs, and
        persists the new session to disk. Raises on login failure.
        """
        log.info("Re-authenticating with password...")

        self.client = Client(
            mxid=UserID(MATRIX_USER_ID),
            base_url=MATRIX_HOMESERVER,
        )

        try:
            # NOTE: login() return value is unused; the client stores the
            # token/device internally.
            response = await self.client.login(password=MATRIX_PASSWORD)
            self.user_id = self.client.mxid
            self.device_id = self.client.device_id

            log.info(f"Re-authenticated as {self.user_id} (device: {self.device_id})")

            # Save new session
            save_session(
                user_id=str(self.user_id),
                device_id=str(self.device_id),
                access_token=self.client.api.token,
                homeserver=MATRIX_HOMESERVER,
            )

        except Exception as e:
            # Fatal: without a valid login the bridge cannot run.
            log.error(f"Re-authentication failed: {e}")
            raise
|
||
|
||
async def init_crypto(self):
|
||
"""Initialize E2EE support"""
|
||
# Create state store for room encryption tracking
|
||
self.state_store = MemoryStateStore()
|
||
|
||
# Create crypto store
|
||
self.crypto_store = SQLiteCryptoStore(
|
||
account_id=str(self.user_id),
|
||
pickle_key=CRYPTO_PICKLE_KEY,
|
||
db_path=STORE_PATH / "crypto.db",
|
||
)
|
||
await self.crypto_store.open()
|
||
|
||
# Create OlmMachine
|
||
self.crypto = OlmMachine(
|
||
client=self.client,
|
||
crypto_store=self.crypto_store,
|
||
state_store=self.state_store,
|
||
log=log.getChild("crypto"),
|
||
)
|
||
|
||
# Set up automatic key sharing without validation.
|
||
# Not doing this will mean verifying accounts.
|
||
self.crypto.share_keys_min_trust = TrustState.UNVERIFIED
|
||
self.crypto.send_keys_min_trust = TrustState.UNVERIFIED
|
||
|
||
# Link crypto to client for automatic encryption/decryption
|
||
self.client.state_store = self.state_store
|
||
self.client.crypto = self.crypto
|
||
|
||
# Open crypto store
|
||
await self.crypto_store.open()
|
||
|
||
# Check device ID consistency
|
||
stored_device_id = await self.crypto_store.get_device_id()
|
||
if stored_device_id and stored_device_id != self.device_id:
|
||
log.warning("Device ID mismatch, resetting crypto store")
|
||
await self.crypto_store.delete()
|
||
stored_device_id = None
|
||
|
||
# Load Olm account
|
||
await self.crypto.load()
|
||
|
||
# Save device ID if new
|
||
if not stored_device_id:
|
||
await self.crypto_store.put_device_id(self.device_id)
|
||
|
||
# Enable automatic key requests
|
||
self.crypto.allow_key_share = lambda *args: True
|
||
|
||
# Share keys if not already shared
|
||
if not self.crypto.account.shared:
|
||
await self.crypto.share_keys()
|
||
log.info("Shared device keys with server")
|
||
else:
|
||
log.info("Device keys already shared")
|
||
|
||
log.info(f"E2EE initialized (fingerprint: {self.crypto.account.fingerprint})")
|
||
|
||
    async def setup_cross_signing(self):
        """Set up cross-signing for enhanced verification.

        Three cases:
          1. Server has keys + we have private keys locally -> just sign device.
          2. Server has keys but we lack private keys -> import via
             MATRIX_RECOVERY_KEY (warn if unset).
          3. No keys anywhere -> generate a new recovery key and print it so
             the operator can persist it in .env.
        Failures are logged but never fatal — the bridge continues with plain
        E2EE (cross-signing support varies by homeserver, e.g. Conduit).
        """
        try:
            log.info("Checking cross-signing status...")

            # Check if we already have cross-signing keys
            own_keys = await self.crypto.get_own_cross_signing_public_keys()

            if own_keys and own_keys.master_key:
                log.info("Cross-signing keys already exist on server")

                # Check if we have the private keys locally
                # NOTE(review): reads a private OlmMachine attribute; may break
                # across mautrix versions.
                if self.crypto._cross_signing_private_keys is None:
                    # We need to import keys using recovery key
                    if MATRIX_RECOVERY_KEY:
                        log.info("Importing cross-signing keys from recovery key...")
                        await self.crypto.verify_with_recovery_key(MATRIX_RECOVERY_KEY)
                        log.info("✅ Cross-signing keys imported successfully")

                        # Sign our own device
                        await self.crypto.sign_own_device(self.crypto.own_identity)
                        log.info("✅ Device signed with cross-signing keys")
                    else:
                        log.warning("⚠️ Cross-signing keys exist but no MATRIX_RECOVERY_KEY in .env")
                        log.warning("   Add MATRIX_RECOVERY_KEY to .env to enable cross-signing")
                else:
                    log.info("✅ Cross-signing keys already loaded")
                    # Sign our device if not already signed
                    await self.crypto.sign_own_device(self.crypto.own_identity)
                    log.info("✅ Device signed with cross-signing keys")
            else:
                # Generate new cross-signing keys
                log.info("Generating new cross-signing keys...")
                recovery_key = await self.crypto.generate_recovery_key()

                # Print the recovery key prominently — it is shown only once.
                log.info("=" * 70)
                log.info("🔑 CROSS-SIGNING RECOVERY KEY GENERATED")
                log.info("=" * 70)
                log.info("")
                log.info(f"   {recovery_key}")
                log.info("")
                log.info("⚠️  IMPORTANT: Save this recovery key to your .env file:")
                log.info(f"   MATRIX_RECOVERY_KEY={recovery_key}")
                log.info("")
                log.info("This key is needed to restore cross-signing on new devices.")
                log.info("=" * 70)

                log.info("✅ Cross-signing enabled and device signed")

        except Exception as e:
            # Deliberately non-fatal: degrade to basic E2EE.
            log.warning(f"⚠️ Cross-signing setup failed: {e}")
            log.warning("   This may be due to homeserver limitations (Conduit has incomplete support)")
            log.warning("   Falling back to basic E2EE without cross-signing")
            log.warning("   Your encryption will still work, but devices won't be cross-signed")
|
||
|
||
def register_handlers(self):
    """Register Matrix event handlers on the mautrix client.

    Handlers installed:
      - ROOM_MESSAGE   -> on_message (text/image/audio dispatch)
      - ROOM_MEMBER    -> auto-join on invite; rotate Megolm session when
                          someone else joins an encrypted room
      - ROOM_ENCRYPTED -> diagnostics for events that failed to decrypt
                          (successful decryption is handled automatically
                          by the DecryptionDispatcher)
      - TO_DEVICE_ENCRYPTED / ROOM_KEY -> informational logging
      - REACTION       -> on_reaction
      - ALL            -> catch-all logger for unexpected event types
    """

    @self.client.on(EventType.ROOM_MESSAGE)
    async def handle_message(evt):
        await self.on_message(evt)

    @self.client.on(EventType.ROOM_MEMBER)
    async def handle_member(evt: StateEvent):
        # Auto-accept invites
        if (evt.state_key == str(self.user_id) and
                evt.content.membership == Membership.INVITE):
            log.info(f"Received invite to {evt.room_id}")
            await self.client.join_room(evt.room_id)
            log.info(f"Joined {evt.room_id}")

            # Update room cache for audio filtering
            self.room_cache[str(evt.room_id)] = {
                "is_dm": False,  # Default to False until we know better
                "member_count": 0
            }

        # When someone else joins an encrypted room, rotate keys so new member gets them
        elif (evt.state_key != str(self.user_id) and
                evt.content.membership == Membership.JOIN and
                self.crypto and self.crypto_store and self.initial_sync_done):
            try:
                # Check if room is encrypted (raises if no m.room.encryption state)
                await self.client.get_state_event(evt.room_id, EventType.ROOM_ENCRYPTION)
                # Room is encrypted - invalidate current session so next message creates new one
                new_user = UserID(evt.state_key)
                log.info(f"New member {new_user} joined encrypted room {evt.room_id}, rotating session...")

                # Remove current outbound session - next message will create new one with all members
                await self.crypto_store.remove_outbound_group_session(evt.room_id)
                log.info(f"✅ Rotated Megolm session for {evt.room_id} (new member: {new_user})")
            except Exception as e:
                # Room not encrypted or error - ignore
                log.debug(f"Key rotation skipped for {evt.room_id}: {e}")

    @self.client.on(EventType.ROOM_ENCRYPTED)
    async def handle_encrypted(evt: EncryptedEvent):
        # Decryption is automatic via DecryptionDispatcher
        # This handler catches events that failed to decrypt
        log.warning(f"Failed to decrypt event {evt.event_id} in {evt.room_id}")
        # Debug info
        try:
            log.warning(f" Algorithm: {evt.content.algorithm}")
            log.warning(f" Session ID: {evt.content.session_id}")
            log.warning(f" Sender key: {evt.content.sender_key}")

            # Try manual decryption to get the actual error
            if self.crypto:
                try:
                    decrypted = await self.crypto.decrypt_megolm_event(evt)
                    log.info(f" Manual decrypt succeeded! Type: {decrypted.type}")
                except Exception as decrypt_err:
                    log.warning(f" Manual decrypt error: {type(decrypt_err).__name__}: {decrypt_err}")

            # Check if we have the session
            has_session = await self.crypto_store.has_group_session(evt.room_id, evt.content.session_id)
            log.warning(f" Has session in store: {has_session}")
        except Exception as e:
            log.warning(f" Could not extract encryption details: {e}")

    @self.client.on(EventType.TO_DEVICE_ENCRYPTED)
    async def handle_to_device(evt):
        log.info(f"Received encrypted to-device event from {evt.sender}")

    @self.client.on(EventType.ROOM_KEY)
    async def handle_room_key(evt):
        log.info(f"Received room key for {evt.content.room_id} session {evt.content.session_id}")

    @self.client.on(EventType.REACTION)
    async def handle_reaction(evt):
        log.info(f"Received REACTION event: {evt.event_id}")
        await self.on_reaction(evt)

    # Catch-all to see what events we're receiving
    @self.client.on(EventType.ALL)
    async def handle_all(evt):
        # Log non-standard events at INFO level to see what we're getting
        if evt.type not in (EventType.ROOM_MESSAGE, EventType.ROOM_ENCRYPTED):
            log.info(f"EVENT: type={evt.type} id={getattr(evt, 'event_id', 'N/A')}")
async def should_send_audio(self, room_id: RoomID) -> bool:
    """Decide whether a TTS audio reply should be sent to this room.

    Controlled by two module-level settings:
      - ENABLE_AUDIO_RESPONSE: master on/off switch.
      - AUDIO_ROOM_FILTER: "all", "dm_only", or a comma-separated list of
        allowed room IDs (exact or suffix match).

    Returns:
        True if audio responses are allowed for this room.
    """
    if not ENABLE_AUDIO_RESPONSE:
        return False

    if AUDIO_ROOM_FILTER == "all":
        return True

    if AUDIO_ROOM_FILTER == "dm_only":
        # Use cache to check if room is DM (2 members)
        room_str = str(room_id)
        if room_str in self.room_cache:
            return self.room_cache[room_str].get("is_dm", False)
        # Fallback: if we can't determine DM status, allow audio rather
        # than silently dropping it.
        return True

    # Otherwise: comma-separated list of allowed room IDs.
    # (The original code had an unreachable `return False` after this
    # exhaustive branch chain; it has been removed.)
    allowed_rooms = [r.strip() for r in AUDIO_ROOM_FILTER.split(",")]
    room_str = str(room_id)
    return any(room_str == r or room_str.endswith(r) for r in allowed_rooms if r)
async def get_or_create_conversation(self, room_id: RoomID) -> str | None:
    """
    Get or create a Letta conversation for a specific room.

    This provides per-room isolation - each Matrix room gets its own conversation.

    Lookup order: in-memory cache -> room_conversations table -> create a
    new conversation through the Letta HTTP API.

    Args:
        room_id: The Matrix room ID

    Returns:
        Conversation ID if successful, None otherwise
    """
    # Check cache first
    if room_id in self.conversation_cache:
        return self.conversation_cache[room_id]

    # Check database
    row = await self.db.fetchrow(
        "SELECT conversation_id FROM room_conversations WHERE room_id = ?",
        str(room_id)
    )

    if row and row["conversation_id"]:
        conv_id = row["conversation_id"]
        # Touch last_used_at so activity on this room is recorded
        await self.db.execute(
            "UPDATE room_conversations SET last_used_at = CURRENT_TIMESTAMP WHERE room_id = ?",
            str(room_id)
        )
        self.conversation_cache[room_id] = conv_id
        return conv_id

    # Create new conversation for this room
    # Note: Letta API requires agent_id as query param, not JSON body
    url = f"{LETTA_BASE_URL}/conversations/"
    params = {"agent_id": LETTA_AGENT_ID}
    headers = {"Content-Type": "application/json"}
    if LETTA_API_KEY:
        headers["Authorization"] = f"Bearer {LETTA_API_KEY}"

    payload = {
        "name": f"matrix-room-{str(room_id).replace(':', '-')}",
    }

    try:
        # NOTE(review): this is a blocking `requests` call on the event
        # loop (up to 30s); elsewhere the file wraps blocking calls in
        # asyncio.to_thread — confirm whether this path should too.
        response = requests.post(url, params=params, json=payload, headers=headers, timeout=30)
        response.raise_for_status()
        data = response.json()
        conv_id = data.get("id")

        # Store in database
        await self.db.execute(
            """
            INSERT INTO room_conversations (room_id, conversation_id)
            VALUES (?, ?)
            """,
            str(room_id), conv_id
        )

        self.conversation_cache[room_id] = conv_id
        log.info(f"Created conversation {conv_id} for room {room_id}")
        return conv_id
    except Exception as e:
        log.error(f"Failed to create conversation for room {room_id}: {e}")
        return None
async def on_message(self, evt):
    """Handle incoming room messages (text, images, and audio).

    Flow for text messages:
      1. Drop events from the initial sync, stale events (>60s old), our
         own messages, and everything while shutting down.
      2. Send a read receipt and handle `!` bridge commands locally.
      3. Forward the message (plus any pending image) to Letta via the
         room's dedicated conversation and relay the reply.

    Images and audio are dispatched to on_image()/on_audio().
    """

    # Ignore messages during initial sync.
    if not self.initial_sync_done:
        return

    # Ignore old messages (more than 60 seconds old)
    event_time = datetime.fromtimestamp(evt.timestamp / 1000)
    message_age = datetime.now() - event_time
    if message_age > timedelta(seconds=60):
        log.debug(f"Ignoring old message ({message_age.seconds}s old) from {evt.sender}")
        return

    # Ignore own messages
    if evt.sender == self.user_id:
        return

    room_id = evt.room_id
    sender = evt.sender
    body = evt.content.body

    # Check if shutting down (stop accepting new messages).
    # BUGFIX: `sender` was previously referenced in this log line before
    # it was assigned (the assignment used to come later), raising a
    # NameError during shutdown; locals are now bound above this guard.
    if self.shutting_down:
        log.info(f"Shutting down, ignoring message from {sender}")
        return

    # Update heartbeat tracker (user is active)
    if self.heartbeat:
        self.heartbeat.update_last_user_message(str(room_id))

    # Handle images
    if evt.content.msgtype == MessageType.IMAGE:
        return await self.on_image(evt, room_id, sender, body)

    # Handle audio
    if evt.content.msgtype == MessageType.AUDIO:
        return await self.on_audio(evt, room_id, sender, body)

    # Only handle text messages
    if evt.content.msgtype != MessageType.TEXT:
        return

    log.info(f"[{room_id}] {sender}: {body}")

    # Store user's event ID for reactions
    user_event_id = str(evt.event_id)

    # Send read receipt immediately (seen indicator)
    try:
        await self.client.send_receipt(room_id, EventID(user_event_id))
    except Exception as e:
        log.debug(f"Failed to send read receipt: {e}")

    # Handle bridge commands
    if body.startswith("!"):
        cmd = body.strip().lower()
        if cmd in ("!reshare", "!keys", "!rekey"):
            # Force new encryption session - removes current outbound session
            # so next message creates a fresh one shared with all members
            if self.crypto and self.crypto_store:
                try:
                    # Remove the current outbound session for this room
                    await self.crypto_store.remove_outbound_group_session(room_id)
                    log.info(f"Removed outbound session for {room_id}, next message will create new one")

                    # Get member count for the confirmation message
                    members = await self.client.get_joined_members(room_id)
                    member_count = len(members)

                    # Send confirmation (this will create and share a NEW session)
                    await self.send_message(room_id, f"🔑 Created new encryption session. All {member_count} members should now be able to see new messages.")
                    log.info(f"New session created for {room_id} with {member_count} members")
                except Exception as e:
                    await self.send_message(room_id, f"⚠️ Key rotation failed: {e}")
                    log.error(f"Manual key rotation failed: {e}")
            else:
                await self.send_message(room_id, "⚠️ E2EE not initialized")
            return
        if cmd in ("!heartbeat", "!pulse", "!wake"):
            # Manually trigger a heartbeat (resumes if paused)
            if self.heartbeat and HEARTBEAT_ENABLED:
                # trigger() handles resume internally
                asyncio.create_task(self.heartbeat.trigger(by=str(evt.sender)))

                hb_status = self.heartbeat.get_status()
                if hb_status['paused']:
                    await self.send_message(room_id, "▶️ Resuming heartbeat...")
                else:
                    await self.send_message(room_id, "⏰ Triggering heartbeat (silent mode - check logs)...")
            else:
                await self.send_message(room_id, "⚠️ Heartbeat service not enabled. Set HEARTBEAT_ENABLED=1")
            return
        if cmd in ("!pause", "!suspend"):
            # Pause the heartbeat service
            if self.heartbeat:
                response = self.heartbeat.pause(by=str(evt.sender))
                await self.send_message(room_id, response)
            else:
                await self.send_message(room_id, "⚠️ Heartbeat service not available")
            return
        if cmd in ("!resume", "!unpause"):
            # Resume the heartbeat service
            if self.heartbeat:
                response = self.heartbeat.resume(by=str(evt.sender))
                await self.send_message(room_id, response)
            else:
                await self.send_message(room_id, "⚠️ Heartbeat service not available")
            return
        if cmd == "!status":
            # Show bridge and heartbeat status
            status_lines = [
                "**Meridian Bridge Status**",
                f"• E2EE: {'✅ Ready' if self.crypto else '❌ Not initialized'}",
                f"• Syncing: {'✅ Active' if self.initial_sync_done else '⏳ Waiting'}",
                f"• Conversations: {len(self.conversation_cache)} cached",
            ]
            if self.heartbeat:
                hb_status = self.heartbeat.get_status()
                hb_indicator = "⏸️ Paused" if hb_status['paused'] else ("🏃 running" if hb_status['is_running'] else ("✅ Active" if hb_status['running'] else "❌ Stopped"))
                status_lines.extend([
                    "",
                    "**Heartbeat Status**",
                    f"• Status: {hb_indicator}",
                    f"• Interval: {hb_status['interval_minutes']} minutes",
                    f"• Count: {hb_status['heartbeat_count']} sent, {hb_status['skipped_count']} skipped",
                    f"• Last: {hb_status['last_heartbeat'] or 'Never'}",
                ])
                if hb_status['paused']:
                    paused_mins = ""
                    if hb_status['paused_since']:
                        paused_duration = datetime.now(timezone.utc) - datetime.fromisoformat(hb_status['paused_since'])
                        paused_mins = f" ({int(paused_duration.total_seconds()/60)}m ago)"
                    status_lines.append(f"• Paused by: {hb_status['paused_by'] or 'unknown'}{paused_mins}")
                if hb_status['is_running']:
                    status_lines.append("• ⚠️ Currently executing a heartbeat task")
            status_lines.extend([
                "",
                "**Commands**",
                "• !pause / !suspend - Pause heartbeat",
                "• !resume / !unpause - Resume heartbeat",
                "• !heartbeat - Manually trigger (also resumes if paused)",
                "• !status - Show this status",
                "• !clear-reactions - Clear reaction dedup cache (use if seeing dupes on restart)",
            ])
            await self.send_message(room_id, "\n".join(status_lines))
            return
        if cmd == "!clear-reactions":
            # Clear the processed_reactions table to fix duplicate reactions
            result = await self.db.execute("DELETE FROM processed_reactions")
            clear_count = result if isinstance(result, int) else 0
            await self.send_message(
                room_id,
                f"🧹 Cleared {clear_count} reactions from dedup cache.\n"
                f"Note: This may cause reactions to be re-sent on next restart until the system stabilizes."
            )
            log.info(f"Cleared {clear_count} reactions from processed_reactions table via command from {sender}")
            return
        # Unknown commands pass through to Letta

    # Check if there's a pending image for this room
    pending = await self.get_pending_image(room_id)
    images = []
    if pending:
        checkmark_event_id, image_data, image_format = pending
        log.info(f"[{room_id}] Combining text with pending image")
        images = [image_data]  # send_to_letta expects raw bytes, not tuples
        # Note: We clear pending AFTER successful send below

    # Get or create conversation for this room
    conversation_id = await self.get_or_create_conversation(room_id)

    # Send typing indicator (keep active through entire Letta call + message send)
    await self.client.set_typing(room_id, timeout=60000)

    # Try to send to Letta using room's conversation
    # Include sender so Ani knows who's speaking (important for multi-user rooms)
    message_with_sender = f"{sender}: {body}"
    letta_response = await asyncio.to_thread(
        send_to_letta, message_with_sender, conversation_id, images=images
    )

    if letta_response.status == "SUCCESS":
        event_id = await self._handle_success_response(room_id, letta_response, user_event_id)

        # Clear pending image after successful send (normal path only)
        if pending:
            await self.clear_pending_image(room_id)

        # Stop typing indicator (normal path only)
        await self.client.set_typing(room_id, timeout=0)

    elif letta_response.status == "BUSY":
        # Queue the message with event IDs for reactions
        message_queue.append((room_id, sender, body, user_event_id, 0, ""))
        log.info(f"Agent busy, queued message (queue size: {len(message_queue)})")

        # Send acknowledgment
        await self.send_message(
            room_id,
            "⏳ I'm currently in another conversation. Your message has been queued."
        )

        # Stop typing indicator
        await self.client.set_typing(room_id, timeout=0)

    else:  # ERROR
        await self.send_message(room_id, letta_response.assistant_text)
        log.error(f"[{room_id}] Error: {letta_response.assistant_text}")

        # Stop typing indicator
        await self.client.set_typing(room_id, timeout=0)
async def on_image(self, evt, room_id: RoomID, sender: UserID, caption: str):
    """Handle incoming image messages - queue and send ✅ checkmark.

    The image is downloaded (decrypted when E2EE), resized/re-encoded for
    Letta, then parked in the pending_images table. It is forwarded to
    Letta either together with the user's next text message (on_message)
    or on its own when the user taps the ✅ reaction (on_reaction).
    """

    log.info(f"[{room_id}] Image from {sender}: {caption or '(no caption)'}")

    # Send typing indicator (image processing takes time)
    await self.client.set_typing(room_id, timeout=60000)

    try:
        # Get image URL
        content = evt.content
        if isinstance(content, MediaMessageEventContent) and content.msgtype == MessageType.IMAGE:
            # For E2EE, URL is in file.url, otherwise content.url
            if hasattr(content, "file") and content.file and content.file.url:
                mxc_url = content.file.url
                # Decryption parameters for the encrypted attachment
                encryption_info = {
                    "key": {"k": content.file.key.key},
                    "iv": content.file.iv,
                    "hashes": {"sha256": content.file.hashes.get("sha256", "")}
                }
            else:
                mxc_url = content.url
                encryption_info = None

        # NOTE(review): if the isinstance check above is False, mxc_url is
        # unbound here and the resulting NameError is caught by the outer
        # except — confirm this fall-through is intended.
        if not mxc_url:
            raise ValueError("No image URL found (E2EE?)")

        # Download and process image
        image_data = await process_matrix_image(self.client, mxc_url, encryption_info)
        log.info(f"Downloaded image: {len(image_data)} bytes")

        # Prepare image for Letta (resize, convert to JPEG)
        processed_image = prepare_image_for_letta(image_data)
        log.info(f"Processed image: {len(processed_image)} bytes")

        # Stop typing
        await self.client.set_typing(room_id, timeout=0)

        # Add ✅ reaction to the user's image message (not as a new message)
        user_image_event_id = str(evt.event_id)
        await self.add_reaction(room_id, user_image_event_id, "✅")

        # Store pending image with the USER'S image event ID (for ✅ reaction handling)
        image_format = "image/jpeg"
        await self.store_pending_image(user_image_event_id, room_id, processed_image, image_format)
        log.info(f"[{room_id}] Image queued, ✅ added to {user_image_event_id[:20]}...")

    except Exception as e:
        log.error(f"Failed to process image: {e}")
        await self.client.set_typing(room_id, timeout=0)
        await self.send_message(room_id, f"⚠️ Failed to process image: {str(e)}")
async def on_audio(self, evt, room_id: RoomID, sender: UserID, caption: str):
    """Handle incoming audio messages - transcribe, send to agent, respond with audio.

    Pipeline: download (decrypt if E2EE) -> STT transcription -> forward
    the transcript to Letta with a [VOICE] prefix -> optionally reply
    with TTS audio (gated by should_send_audio) plus a text message.
    """

    # Check if shutting down (stop accepting new messages)
    if self.shutting_down:
        log.info(f"Shutting down, ignoring audio from {sender}")
        return

    log.info(f"[{room_id}] 🎤 Audio from {sender}: {caption or '(no caption)'}")

    # Store user's event ID for reactions
    user_event_id = str(evt.event_id)

    # Send read receipt immediately (seen indicator)
    try:
        await self.client.send_receipt(room_id, EventID(user_event_id))
    except Exception as e:
        log.debug(f"Failed to send read receipt: {e}")

    # Get or create conversation for this room
    conversation_id = await self.get_or_create_conversation(room_id)

    # Send typing indicator (STT + Letta + TTS takes time)
    await self.client.set_typing(room_id, timeout=90000)

    try:
        content = evt.content
        if isinstance(content, MediaMessageEventContent) and content.msgtype == MessageType.AUDIO:
            # Get audio URL - E2EE in file.url, otherwise content.url
            if hasattr(content, "file") and content.file and content.file.url:
                mxc_url = content.file.url
                # Decryption parameters for the encrypted attachment
                encryption_info = {
                    "key": {"k": content.file.key.key},
                    "iv": content.file.iv,
                    "hashes": {"sha256": content.file.hashes.get("sha256", "")}
                }
            else:
                mxc_url = content.url
                encryption_info = None

        # NOTE(review): if the isinstance check above is False, mxc_url is
        # unbound here and the resulting NameError is caught by the outer
        # except — confirm this fall-through is intended.
        if not mxc_url:
            raise ValueError("No audio URL found (E2EE?)")

        # Download and decrypt if E2EE
        audio_data = await download_matrix_audio(self.client, mxc_url, encryption_info)

        # Detect format from info (defaults to mp3 when no mimetype)
        audio_format = "mp3"
        if hasattr(content, "info") and content.info:
            mimetype = getattr(content.info, "mimetype", None)
            if mimetype:
                if "ogg" in mimetype:
                    audio_format = "ogg"
                elif "m4a" in mimetype or "mpeg" in mimetype:
                    audio_format = "mp3"
                elif "wav" in mimetype:
                    audio_format = "wav"

        # Transcribe (blocking STT call runs in a worker thread)
        log.info(f"Transcribing with STT ({audio_format})...")
        transcription = await asyncio.to_thread(
            transcribe_audio, audio_data, audio_format
        )

        # Transcription errors are reported as bracketed strings, e.g. "[no speech]"
        if not transcription or transcription.startswith("["):
            await self.client.set_typing(room_id, timeout=0)
            await self.send_message(room_id, f"⚠️ {transcription or 'No speech detected'}")
            return

        log.info(f"Transcribed: '{transcription[:100]}...'")

        # Voice context - include sender so Ani knows who's speaking
        voice_message = f"""[VOICE] {sender}'s voice, resonating:

"{transcription}\""""
        letta_response = await asyncio.to_thread(
            send_to_letta, voice_message, conversation_id
        )

        if letta_response.status == "SUCCESS":
            # Generate TTS response if enabled and room allows it
            audio_event_id = None
            text_event_id = None
            if await self.should_send_audio(room_id):
                log.info(f"Generating TTS for {len(letta_response.assistant_text)} chars...")
                audio_bytes = await asyncio.to_thread(synthesize_speech, letta_response.assistant_text)

                if audio_bytes:
                    audio_event_id = await self._upload_and_send_audio(room_id, audio_bytes)
                    if audio_event_id:
                        log.info(f"Audio sent: {audio_event_id[:20]}...")
                        # Add 🎤 reaction to audio message for TTS regeneration
                        await self.add_reaction(room_id, audio_event_id, "🎤")
                        # Store mapping for TTS regeneration via 🎤 reaction
                        await self.store_audio_message(audio_event_id, letta_response.assistant_text)

            # Send text response (no threading for now)
            text_event_id = await self.send_message(room_id, letta_response.assistant_text)

            preview = letta_response.assistant_text[:100] + "..." if len(letta_response.assistant_text) > 100 else letta_response.assistant_text
            log.info(f"[{room_id}] Agent (audio): {preview}")

            # Store mapping for feedback correlation (text message)
            if text_event_id and letta_response.step_ids:
                await self.store_message_mapping(text_event_id, room_id, letta_response.step_ids)

            # Add tool summary reactions to user's audio message (source = text response)
            await self.add_tool_summary_reactions(room_id, user_event_id, letta_response, source_event_id=text_event_id or "")

            # Stop typing indicator
            await self.client.set_typing(room_id, timeout=0)

        elif letta_response.status == "BUSY":
            await self.send_message(room_id, "⏳ I'm busy processing another message.")
            # Stop typing indicator
            await self.client.set_typing(room_id, timeout=0)
        else:
            await self.send_message(room_id, letta_response.assistant_text)
            log.error(f"[{room_id}] Audio error: {letta_response.assistant_text}")
            # Stop typing indicator
            await self.client.set_typing(room_id, timeout=0)

    except Exception as e:
        log.error(f"Failed to process audio: {e}")
        await self.client.set_typing(room_id, timeout=0)
        await self.send_message(room_id, f"⚠️ Failed to process audio: {str(e)}")
async def _upload_and_send_audio(self, room_id: RoomID, audio_data: bytes) -> str | None:
    """Upload an MP3 blob to the Matrix media repo and post it as m.audio.

    Returns:
        Event ID if successful, None otherwise
    """
    try:
        # Step 1: push the bytes to the homeserver's media repository.
        uploaded_uri = await self.client.upload_media(
            data=audio_data,
            mime_type="audio/mpeg",
            filename="ani-response.mp3",
            size=len(audio_data)
        )

        if not uploaded_uri:
            log.error("Failed to upload audio to media server: no content_uri")
            return None

        log.info(f"Audio uploaded: {uploaded_uri[:60]}...")

        # Step 2: send an m.audio event referencing the uploaded media,
        # with the agent's display name in the body.
        audio_event = {
            "body": f"{AGENT_DISPLAY_NAME}'s voice",
            "info": {"mimetype": "audio/mpeg", "size": len(audio_data)},
            "msgtype": "m.audio",
            "url": uploaded_uri,
        }

        sent_event_id = await self.client.send_message_event(
            room_id, EventType.ROOM_MESSAGE, audio_event
        )
        return str(sent_event_id) if sent_event_id else None

    except Exception as e:
        log.error(f"Failed to upload/send audio: {e}")
        return None
async def on_reaction(self, evt):
    """
    Handle reaction events - both for feedback AND to let agent see reactions.

    The agent can respond to reactions with emojis for interactive gameplay!

    Special bridge controls (not forwarded to Letta):
      - 🎤 on one of our audio messages: regenerate the TTS audio.
      - ✅ on a pending-image checkmark: send the parked image to Letta alone.
    Otherwise the reaction is forwarded to Letta, and 👍/👎-style emoji
    additionally produce positive/negative step feedback.
    """
    log.info(f"on_reaction called for {evt.event_id} from {evt.sender} in {evt.room_id}")

    # Ignore during initial sync
    if not self.initial_sync_done:
        log.info(" Skipping: initial sync not done")
        return

    # Check if shutting down (stop accepting new messages)
    if self.shutting_down:
        log.info(" Skipping: shutting down")
        return

    # Ignore own reactions
    if evt.sender == self.user_id:
        log.info(" Skipping: own reaction")
        return

    # Get the reaction content (m.relates_to carries the target + key)
    try:
        relates_to = evt.content.relates_to
        if not relates_to:
            log.info(" Skipping: no relates_to")
            return

        target_event_id = str(relates_to.event_id)
        reaction_key = relates_to.key
    except AttributeError as e:
        log.warning(f" Could not parse reaction event: {e}")
        return

    log.info(f" Reaction: {reaction_key} on event {target_event_id}")

    room_id = evt.room_id
    sender = evt.sender

    # **NEW: Interactive emoji controls**
    # 🎤 on audio messages = regenerate TTS
    if reaction_key == "🎤" and target_event_id in self.our_message_events:
        log.info(f" 🎤 TTS re-generation requested for audio {target_event_id}")
        result_id = await self.regenerate_tts(room_id, target_event_id)
        return  # Don't forward to Letta - this is a bridge control

    # ✅ on checkmark messages = send pending image alone
    if reaction_key == "✅":
        # Check if this is one of our checkmark messages
        pending_row = await self.db.fetchrow(
            "SELECT image_data, image_format FROM pending_images WHERE checkmark_event_id = ?",
            target_event_id
        )
        if pending_row:
            log.info(f" ✅ Sending pending image alone to Letta (event: {target_event_id[:20]}...)")
            image_data = pending_row["image_data"]
            image_format = pending_row["image_format"]

            conversation_id = await self.get_or_create_conversation(room_id)
            await self.client.set_typing(room_id, timeout=30000)

            # Send image alone (no additional text)
            letta_response = await asyncio.to_thread(
                send_to_letta, "", conversation_id, images=[image_data]  # send_to_letta expects raw bytes
            )

            await self.client.set_typing(room_id, timeout=0)

            if letta_response.status == "SUCCESS":
                event_id = await self.send_message(room_id, letta_response.assistant_text)
                log.info(f"[{room_id}] Agent (image alone): {letta_response.assistant_text[:100]}...")
                if event_id and letta_response.step_ids:
                    await self.store_message_mapping(event_id, room_id, letta_response.step_ids)
            else:
                await self.send_message(room_id, letta_response.assistant_text)
                log.error(f"[{room_id}] Error: {letta_response.assistant_text}")

            # Clear pending image after sending
            await self.clear_pending_image(room_id)
            return  # Don't forward to Letta - this is a bridge control

    # Get conversation for this room
    conversation_id = await self.get_or_create_conversation(room_id)

    # **PRIMARY**: Send reaction to Letta so agent can see and respond!
    # Format message clearly so the agent understands what happened
    reaction_message = f"🎭 {sender} reacted with: {reaction_key}"
    log.info(f" 🎭 Forwarding reaction to Letta: {reaction_message}")
    letta_response = await asyncio.to_thread(
        send_to_letta, reaction_message, conversation_id
    )

    if letta_response.status == "SUCCESS":
        # Send agent's response (it might reply with an emoji!)
        event_id = await self.send_message(room_id, letta_response.assistant_text)
        log.info(f"[{room_id}] Agent reacted: {letta_response.assistant_text[:50]}...")

        # Store mapping for feedback correlation
        if event_id and letta_response.step_ids:
            await self.store_message_mapping(event_id, room_id, letta_response.step_ids)
    else:
        log.warning(f"Could not send reaction to agent: {letta_response.status}")

    # **SECONDARY**: Also send feedback if it's a thumbs up/thumbs down
    # Using sets for reaction matching with emoji variations
    POSITIVE_REACTIONS = {
        "👍", "👍️", "👍🏻", "👍🏼", "👍🏽", "👍🏾", "👍🏿",
        "❤️", "❤", "💕", "💖", "💓", "💗", "💙", "💚", "💛", "💜", "🖤",
        "✅", "🎉", "💯", "👏", "🙌", "💪", "⭐", "🌟", "+1"
    }
    NEGATIVE_REACTIONS = {
        "👎", "👎️", "👎🏻", "👎🏼", "👎🏽", "👎🏾", "👎🏿",
        "❌", "😕", "😞", "💔", "-1"
    }

    # Determine feedback type from reaction
    feedback = None
    if reaction_key in POSITIVE_REACTIONS:
        feedback = "positive"
    elif reaction_key in NEGATIVE_REACTIONS:
        feedback = "negative"

    # Look up the Letta step IDs for this event and send feedback
    if feedback:
        step_ids = await self.get_step_ids_for_event(target_event_id)
        if step_ids:
            log.info(f" Sending {feedback} feedback for step_ids: {step_ids}")
            for step_id in step_ids:
                result = await asyncio.to_thread(send_feedback_to_letta, step_id, feedback)
                log.debug(f" Feedback result for {step_id}: {result}")
async def send_message(self, room_id: RoomID, text: str) -> str | None:
    """
    Send a formatted message to a room (auto-encrypts if needed).

    The text is run through format_html(), which handles markdown, emoji
    shortcodes (:heart: → ❤️), Matrix extensions (spoilers, colors), and
    raw-HTML pass-through. Both the plain-text fallback and the HTML
    rendering are included in the event.

    Returns:
        The Matrix event ID of the sent message, or None on failure.
    """
    plain_text, html_body = format_html(text)

    # org.matrix.custom.html events carry both representations.
    payload = {
        "msgtype": "m.text",
        "body": plain_text,
        "format": "org.matrix.custom.html",
        "formatted_body": html_body,
    }

    sent = await self.client.send_message_event(room_id, EventType.ROOM_MESSAGE, payload)
    return str(sent) if sent else None
async def store_message_mapping(self, matrix_event_id: str, room_id: RoomID, step_ids: list[str]):
    """Persist the link between a Matrix event and its Letta step IDs.

    Used later to correlate reaction feedback with the Letta steps that
    produced the message. No-op when there are no step IDs.
    """
    if not step_ids:
        return

    encoded_steps = json.dumps(step_ids)
    await self.db.execute(
        """
        INSERT OR REPLACE INTO message_mapping (matrix_event_id, room_id, step_ids)
        VALUES (?, ?, ?)
        """,
        str(matrix_event_id), str(room_id), encoded_steps
    )
    log.debug(f"Stored mapping: {matrix_event_id} -> {step_ids}")
async def get_step_ids_for_event(self, matrix_event_id: str) -> list[str]:
    """Look up the Letta step IDs previously stored for a Matrix event.

    Returns an empty list when no mapping (or an empty one) exists.
    """
    record = await self.db.fetchrow(
        "SELECT step_ids FROM message_mapping WHERE matrix_event_id = ?",
        str(matrix_event_id)
    )
    if not record or not record["step_ids"]:
        return []
    return json.loads(record["step_ids"])
# ============================================================
|
||
# INTERACTIVE EMOJI CONTROLS
|
||
# ============================================================
|
||
|
||
async def store_audio_message(self, audio_event_id: str, original_text: str):
    """Remember the text behind a TTS audio message.

    The stored text allows a 🎤 reaction to regenerate the audio later;
    the event is also tracked as one of our own messages.
    """
    event_key = str(audio_event_id)
    await self.db.execute(
        """
        INSERT OR REPLACE INTO audio_messages (audio_event_id, original_text)
        VALUES (?, ?)
        """,
        event_key, original_text
    )
    # Mark as ours so on_reaction recognizes 🎤 taps on this event.
    self.our_message_events.add(event_key)
    log.debug(f"Stored audio message text: {audio_event_id}")
async def get_original_text_for_audio(self, audio_event_id: str) -> str | None:
|
||
"""Retrieve original text for an audio event ID."""
|
||
row = await self.db.fetchrow(
|
||
"SELECT original_text FROM audio_messages WHERE audio_event_id = ?",
|
||
str(audio_event_id)
|
||
)
|
||
return row["original_text"] if row else None
|
||
|
||
async def add_reaction(self, room_id: RoomID, target_event_id: str, emoji: str, source_event_id: str = ""):
    """
    Add a reaction to a message (m.annotation relation).

    Args:
        room_id: The room ID
        target_event_id: The event to react to
        emoji: The emoji reaction
        source_event_id: The event ID we're reacting from (for tracking);
            when set, the reaction is deduplicated against the
            processed_reactions store.

    NOTE(review): the `emoji` parameter shadows the module-level `emoji`
    package import inside this method — harmless here, but worth renaming.
    """
    if target_event_id:
        # Check if we've already sent this reaction (for tool visibility reactions)
        if source_event_id:
            if await self.is_reaction_already_sent(source_event_id, target_event_id, emoji):
                log.debug(f"Skipping duplicate reaction {emoji} on {target_event_id}")
                return

        content = ReactionEventContent(
            relates_to=RelatesTo(
                event_id=EventID(target_event_id),
                rel_type=RelationType.ANNOTATION,
                key=emoji
            )
        )
        await self.client.send_message_event(room_id, EventType.REACTION, content)
        log.debug(f"Added reaction {emoji} to {target_event_id}")

        # Track this reaction (only after initial sync completes to avoid tracking old reactions)
        if source_event_id and self.initial_sync_done:
            await self.mark_reaction_sent(source_event_id, target_event_id, emoji)
async def add_tool_summary_reactions(
|
||
self,
|
||
room_id: RoomID,
|
||
target_event_id: str,
|
||
letta_response: LettaResponse,
|
||
source_event_id: str = "" # Event ID we're reacting from (usually the assistant response)
|
||
) -> None:
|
||
"""Add emoji reactions to summarize what tools were executed."""
|
||
if not target_event_id:
|
||
return
|
||
|
||
# No tools/reasoning - nothing to show
|
||
if not letta_response.tool_calls and not letta_response.reasoning_present:
|
||
return
|
||
|
||
emojis_to_add = []
|
||
|
||
# Add brain emoji if reasoning was used
|
||
if letta_response.reasoning_present:
|
||
emojis_to_add.append("🧠")
|
||
|
||
# Add emojis for all tools used (deduplicated)
|
||
tool_emojis = []
|
||
for tool in letta_response.tool_calls:
|
||
emoji = get_emoji_for_tool(tool["name"])
|
||
tool_emojis.append(emoji)
|
||
emojis_to_add.extend(deduplicate_emojis(tool_emojis))
|
||
|
||
# Check for tool execution errors
|
||
if letta_response.tool_results:
|
||
failed_tools = [r for r in letta_response.tool_results if r.get("error")]
|
||
if failed_tools:
|
||
emojis_to_add.append("❌")
|
||
else:
|
||
emojis_to_add.append("✅")
|
||
|
||
# Add error message indicator
|
||
if letta_response.errors:
|
||
emojis_to_add.append("⚠️")
|
||
|
||
# Add all reactions with delay to avoid rate limiting
|
||
for emoji in emojis_to_add[:6]: # Limit to 6 emojis
|
||
try:
|
||
await self.add_reaction(room_id, target_event_id, emoji, source_event_id)
|
||
await asyncio.sleep(0.3)
|
||
except Exception as e:
|
||
log.warning(f"Failed to add reaction {emoji}: {e}")
|
||
|
||
log.info(f"[ToolVisibility] Added {emojis_to_add} to {target_event_id[:20]}...")
|
||
|
||
async def regenerate_tts(self, room_id: RoomID, audio_event_id: str):
|
||
"""Regenerate and resend TTS audio for a given audio message."""
|
||
original_text = await self.get_original_text_for_audio(audio_event_id)
|
||
if not original_text:
|
||
await self.send_message(room_id, "⚠️ Could not find original text for TTS regeneration.")
|
||
return None
|
||
|
||
log.info(f"Regenerating TTS for audio {audio_event_id[:20]}: {original_text[:50]}...")
|
||
audio_bytes = await asyncio.to_thread(synthesize_speech, original_text)
|
||
|
||
if audio_bytes:
|
||
new_audio_event_id = await self._upload_and_send_audio(room_id, audio_bytes)
|
||
if new_audio_event_id:
|
||
log.info(f"TTS audio sent: {new_audio_event_id[:20]}...")
|
||
# Add 🎤 reaction to the new audio message too
|
||
await self.add_reaction(room_id, new_audio_event_id, "🎤")
|
||
# Store mapping for new audio (for regeneration on audio)
|
||
await self.store_audio_message(new_audio_event_id, original_text)
|
||
return new_audio_event_id
|
||
else:
|
||
await self.send_message(room_id, "⚠️ TTS regeneration failed.")
|
||
return None
|
||
|
||
async def store_pending_image(
|
||
self, checkmark_event_id: str, room_id: RoomID,
|
||
image_data: bytes, image_format: str
|
||
):
|
||
"""Store pending image data with checkmark message ID."""
|
||
await self.db.execute(
|
||
"""
|
||
INSERT INTO pending_images (checkmark_event_id, room_id, image_data, image_format)
|
||
VALUES (?, ?, ?, ?)
|
||
""",
|
||
str(checkmark_event_id), str(room_id), image_data, image_format
|
||
)
|
||
log.info(f"Stored pending image with checkmark: {checkmark_event_id}")
|
||
|
||
async def get_pending_image(self, room_id: RoomID) -> tuple[str, bytes, str] | None:
|
||
"""Get pending image for a room: (checkmark_event_id, image_data, image_format)."""
|
||
# Get the most recent pending image for this room
|
||
row = await self.db.fetchrow(
|
||
"""
|
||
SELECT checkmark_event_id, image_data, image_format
|
||
FROM pending_images
|
||
WHERE room_id = ?
|
||
ORDER BY created_at DESC
|
||
LIMIT 1
|
||
""",
|
||
str(room_id)
|
||
)
|
||
if row:
|
||
return row["checkmark_event_id"], row["image_data"], row["image_format"]
|
||
return None
|
||
|
||
async def clear_pending_image(self, room_id: RoomID):
|
||
"""Clear stored pending image for a room."""
|
||
await self.db.execute(
|
||
"DELETE FROM pending_images WHERE room_id = ?",
|
||
str(room_id)
|
||
)
|
||
log.info(f"Cleared pending image for room: {room_id}")
|
||
|
||
async def is_reaction_already_sent(
|
||
self, source_event_id: str, target_event_id: str, emoji: str
|
||
) -> bool:
|
||
"""Check if we already sent this reaction (prevent re-adding on restart)."""
|
||
row = await self.db.fetchrow(
|
||
"SELECT 1 FROM processed_reactions WHERE source_event_id = ? AND target_event_id = ? AND emoji = ?",
|
||
str(source_event_id), str(target_event_id), emoji
|
||
)
|
||
return row is not None
|
||
|
||
async def mark_reaction_sent(
|
||
self, source_event_id: str, target_event_id: str, emoji: str
|
||
):
|
||
"""Mark that we've sent this reaction."""
|
||
await self.db.execute(
|
||
"""
|
||
INSERT OR REPLACE INTO processed_reactions (source_event_id, target_event_id, emoji)
|
||
VALUES (?, ?, ?)
|
||
""",
|
||
str(source_event_id), str(target_event_id), emoji
|
||
)
|
||
|
||
async def _handle_success_response(
|
||
self,
|
||
room_id: RoomID,
|
||
letta_response: "LettaResponse",
|
||
user_event_id: str,
|
||
) -> str | None:
|
||
"""
|
||
Handle SUCCESS response from Letta agent.
|
||
|
||
Common post-processing for both normal and queued messages:
|
||
- Send message to Matrix
|
||
- Store message mapping for feedback
|
||
- Add TTS reaction and store audio message (unless [silent] tag)
|
||
- Add tool summary reactions to user's original message
|
||
|
||
Args:
|
||
room_id: The Matrix room ID
|
||
letta_response: LettaResponse object with assistant_text, step_ids, tool_calls, etc.
|
||
user_event_id: The user's original message event ID (for tool reactions)
|
||
|
||
Returns:
|
||
event_id of the sent response, or None if failed
|
||
"""
|
||
# Parse agent control tags before sending
|
||
cleaned_text, tags = parse_agent_control_tags(letta_response.assistant_text)
|
||
|
||
# Send the assistant's response to Matrix
|
||
event_id = await self.send_message(room_id, cleaned_text)
|
||
log.info(f"[{room_id}] Agent: {cleaned_text[:100]}...")
|
||
|
||
# Store message mapping for feedback correlation
|
||
if event_id and letta_response.step_ids:
|
||
await self.store_message_mapping(event_id, room_id, letta_response.step_ids)
|
||
|
||
# Add 🎤 reaction for TTS generation (respects [silent] tag)
|
||
if tags.get('silent'):
|
||
log.info("[AgentControl] Silent mode - skipping TTS for this message")
|
||
elif event_id:
|
||
await self.add_reaction(room_id, event_id, "🎤")
|
||
# Store text for TTS generation via 🎤
|
||
await self.store_audio_message(event_id, cleaned_text)
|
||
|
||
# Log react intent if specified (for debugging/coordinated responses)
|
||
if tags.get('react'):
|
||
log.info(f"[AgentControl] Reaction intent: {tags['react']}")
|
||
|
||
# Add tool summary reactions to user's original message
|
||
await self.add_tool_summary_reactions(room_id, user_event_id, letta_response, source_event_id=event_id or "")
|
||
|
||
return event_id
|
||
|
||
async def process_queue(self):
|
||
"""Background task to process queued messages"""
|
||
global processing_queue
|
||
tick = 0
|
||
|
||
while True:
|
||
# Check for shutdown
|
||
if self.shutting_down:
|
||
log.info("Queue processor stopping due to shutdown")
|
||
break
|
||
|
||
tick += 1
|
||
if tick % 30 == 0:
|
||
log.debug(f"Heartbeat - queue size: {len(message_queue)}")
|
||
|
||
if message_queue and not processing_queue:
|
||
processing_queue = True
|
||
room_id, sender, message_text, user_event_id, retry_count, _source_event_id = message_queue[0]
|
||
|
||
log.info(f"Processing queued message from {sender} (attempt {retry_count + 1})")
|
||
|
||
# Get conversation for this room
|
||
conversation_id = await self.get_or_create_conversation(room_id)
|
||
|
||
letta_response = await asyncio.to_thread(
|
||
send_to_letta, message_text, conversation_id
|
||
)
|
||
|
||
if letta_response.status in ("SUCCESS", "ERROR"):
|
||
message_queue.popleft()
|
||
log.info(f"Delivered queued response to {sender}")
|
||
|
||
if letta_response.status == "SUCCESS":
|
||
# Use common SUCCESS handler
|
||
event_id = await self._handle_success_response(room_id, letta_response, user_event_id)
|
||
else:
|
||
# ERROR path - just send the message
|
||
event_id = await self.send_message(room_id, letta_response.assistant_text)
|
||
|
||
processing_queue = False
|
||
|
||
elif letta_response.status == "BUSY":
|
||
# Increment retry count, preserve all tuple elements
|
||
message_queue[0] = (room_id, sender, message_text, user_event_id, retry_count + 1, "")
|
||
wait_time = min(5 * (2 ** retry_count), 60)
|
||
log.info(f"Agent still busy, retrying in {wait_time}s")
|
||
processing_queue = False
|
||
await asyncio.sleep(wait_time)
|
||
else:
|
||
await asyncio.sleep(2)
|
||
|
||
# ============================================================
|
||
# HTTP API FOR MCP INTEGRATION
|
||
# ============================================================
|
||
|
||
async def api_health(self, request: web.Request) -> web.Response:
|
||
"""GET /api/health - Health check and status"""
|
||
try:
|
||
rooms = await self.client.get_joined_rooms() if self.client else []
|
||
response = {
|
||
"status": "ok",
|
||
"e2ee_ready": self.crypto is not None,
|
||
"user_id": str(self.user_id) if self.user_id else None,
|
||
"device_id": str(self.device_id) if self.device_id else None,
|
||
"syncing": self.initial_sync_done,
|
||
"joined_rooms": len(rooms),
|
||
"conversations_cached": len(self.conversation_cache),
|
||
}
|
||
# Add heartbeat status if enabled
|
||
if self.heartbeat:
|
||
response["heartbeat"] = self.heartbeat.get_status()
|
||
return web.json_response(response)
|
||
except Exception as e:
|
||
return web.json_response({"status": "error", "error": str(e)}, status=500)
|
||
|
||
async def api_list_rooms(self, request: web.Request) -> web.Response:
|
||
"""GET /api/list_rooms - List joined rooms with encryption status"""
|
||
try:
|
||
if not self.client:
|
||
return web.json_response({"error": "Client not initialized"}, status=503)
|
||
|
||
rooms = []
|
||
joined_rooms = await self.client.get_joined_rooms()
|
||
|
||
for room_id in joined_rooms:
|
||
try:
|
||
# Get room state via API
|
||
state_resp = await self.client.get_state_event(room_id, EventType.ROOM_NAME)
|
||
room_name = getattr(state_resp, 'name', '') or str(room_id)
|
||
except:
|
||
room_name = str(room_id)
|
||
|
||
try:
|
||
# Check encryption status
|
||
await self.client.get_state_event(room_id, EventType.ROOM_ENCRYPTION)
|
||
is_encrypted = True
|
||
except:
|
||
is_encrypted = False
|
||
|
||
rooms.append({
|
||
"room_id": str(room_id),
|
||
"name": room_name,
|
||
"encrypted": is_encrypted,
|
||
})
|
||
|
||
return web.json_response({"rooms": rooms})
|
||
except Exception as e:
|
||
log.error(f"API list_rooms error: {e}")
|
||
return web.json_response({"error": str(e)}, status=500)
|
||
|
||
async def api_send_message(self, request: web.Request) -> web.Response:
|
||
"""POST /api/send_message - Send encrypted message to a room
|
||
|
||
Body: {"room_id": "!...", "text": "message", "html": "<b>optional</b>"}
|
||
Returns: {"event_id": "$...", "status": "sent"}
|
||
"""
|
||
try:
|
||
if not self.client:
|
||
return web.json_response({"error": "Client not initialized"}, status=503)
|
||
|
||
data = await request.json()
|
||
room_id = data.get("room_id")
|
||
text = data.get("text")
|
||
html_body = data.get("html")
|
||
|
||
if not room_id or not text:
|
||
return web.json_response(
|
||
{"error": "Missing required fields: room_id and text"},
|
||
status=400
|
||
)
|
||
|
||
room_id = RoomID(room_id)
|
||
|
||
# Use existing send_message method which handles E2EE automatically
|
||
if html_body:
|
||
# Send with explicit HTML
|
||
content = {
|
||
"msgtype": "m.text",
|
||
"body": text,
|
||
"format": "org.matrix.custom.html",
|
||
"formatted_body": html_body,
|
||
}
|
||
event_id = await self.client.send_message_event(room_id, EventType.ROOM_MESSAGE, content)
|
||
else:
|
||
# Use our formatter
|
||
event_id = await self.send_message(room_id, text)
|
||
|
||
return web.json_response({
|
||
"event_id": str(event_id) if event_id else None,
|
||
"status": "sent"
|
||
})
|
||
except Exception as e:
|
||
log.error(f"API send_message error: {e}")
|
||
return web.json_response({"error": str(e)}, status=500)
|
||
|
||
async def api_read_room(self, request: web.Request) -> web.Response:
|
||
"""POST /api/read_room - Read decrypted messages from a room
|
||
|
||
Body: {"room_id": "!...", "limit": 50, "since": "$event_id"}
|
||
Returns: {"messages": [{"sender": "@...", "content": "...", "timestamp": ..., "event_id": "$..."}]}
|
||
"""
|
||
try:
|
||
if not self.client:
|
||
return web.json_response({"error": "Client not initialized"}, status=503)
|
||
|
||
data = await request.json()
|
||
room_id_str = data.get("room_id")
|
||
limit = data.get("limit", 20)
|
||
since = data.get("since") # Optional: event_id to start from
|
||
|
||
if not room_id_str:
|
||
return web.json_response({"error": "Missing required field: room_id"}, status=400)
|
||
|
||
room_id = RoomID(room_id_str)
|
||
|
||
# Get messages via raw Matrix API for proper "most recent" behavior
|
||
# The /messages endpoint with dir=b and no 'from' should return from room end
|
||
try:
|
||
# Use the raw API with proper Path builder
|
||
resp = await self.client.api.request(
|
||
"GET",
|
||
MatrixPath.v3.rooms[room_id].messages,
|
||
query_params={
|
||
"dir": "b", # Backwards from end
|
||
"limit": str(limit),
|
||
"filter": '{"types":["m.room.message","m.room.encrypted"]}'
|
||
}
|
||
)
|
||
# resp is a dict with 'chunk', 'start', 'end' keys
|
||
raw_events = resp.get("chunk", [])
|
||
except Exception as e:
|
||
return web.json_response({"error": f"Failed to get room messages: {e}"}, status=500)
|
||
|
||
messages = []
|
||
for evt_dict in raw_events:
|
||
event_id = evt_dict.get("event_id", "")
|
||
|
||
# Skip events before 'since'
|
||
if since and event_id == since:
|
||
messages = []
|
||
continue
|
||
|
||
sender = evt_dict.get("sender", "")
|
||
timestamp = evt_dict.get("origin_server_ts", 0)
|
||
event_type = evt_dict.get("type", "")
|
||
content = evt_dict.get("content", {})
|
||
|
||
# Handle encrypted messages - try to decrypt
|
||
if event_type == "m.room.encrypted" and self.crypto:
|
||
try:
|
||
# Decrypt the event using our crypto machine
|
||
from mautrix.types import EncryptedEvent
|
||
enc_evt = EncryptedEvent.deserialize(evt_dict)
|
||
decrypted = await self.crypto.decrypt_megolm_event(enc_evt)
|
||
content = decrypted.content.serialize() if hasattr(decrypted.content, 'serialize') else {}
|
||
event_type = "m.room.message"
|
||
except Exception as e:
|
||
log.debug(f"Could not decrypt event {event_id}: {e}")
|
||
content = {"body": f"[Encrypted: {event_id[:20]}...]", "msgtype": "m.unknown"}
|
||
|
||
if event_type == "m.room.message":
|
||
msgtype = content.get("msgtype", "m.text")
|
||
body = content.get("body", "")
|
||
else:
|
||
continue # Skip non-message events
|
||
|
||
messages.append({
|
||
"event_id": event_id,
|
||
"sender": sender,
|
||
"content": body,
|
||
"timestamp": timestamp,
|
||
"type": msgtype,
|
||
})
|
||
|
||
# Reverse to get chronological order
|
||
messages.reverse()
|
||
|
||
return web.json_response({"messages": messages})
|
||
except Exception as e:
|
||
log.error(f"API read_room error: {e}")
|
||
return web.json_response({"error": str(e)}, status=500)
|
||
|
||
async def api_react(self, request: web.Request) -> web.Response:
|
||
"""POST /api/react - Send reaction to a message
|
||
|
||
Body: {"room_id": "!...", "event_id": "$...", "emoji": "👍"}
|
||
Returns: {"event_id": "$...", "status": "sent"}
|
||
"""
|
||
try:
|
||
if not self.client:
|
||
return web.json_response({"error": "Client not initialized"}, status=503)
|
||
|
||
data = await request.json()
|
||
room_id = data.get("room_id")
|
||
event_id = data.get("event_id")
|
||
emoji_key = data.get("emoji")
|
||
|
||
if not room_id or not event_id or not emoji_key:
|
||
return web.json_response(
|
||
{"error": "Missing required fields: room_id, event_id, emoji"},
|
||
status=400
|
||
)
|
||
|
||
room_id = RoomID(room_id)
|
||
|
||
# Send reaction event
|
||
content = {
|
||
"m.relates_to": {
|
||
"rel_type": "m.annotation",
|
||
"event_id": event_id,
|
||
"key": emoji_key,
|
||
}
|
||
}
|
||
|
||
response = await self.client.send_message_event(
|
||
room_id,
|
||
EventType.REACTION,
|
||
content
|
||
)
|
||
|
||
return web.json_response({
|
||
"event_id": str(response) if response else None,
|
||
"status": "sent"
|
||
})
|
||
except Exception as e:
|
||
log.error(f"API react error: {e}")
|
||
return web.json_response({"error": str(e)}, status=500)
|
||
|
||
async def api_get_room_state(self, request: web.Request) -> web.Response:
|
||
"""GET /api/room_state/{room_id} - Get room state (name, topic, encryption)"""
|
||
try:
|
||
if not self.client:
|
||
return web.json_response({"error": "Client not initialized"}, status=503)
|
||
|
||
room_id_str = request.match_info.get("room_id")
|
||
if not room_id_str:
|
||
return web.json_response({"error": "Missing room_id in path"}, status=400)
|
||
|
||
room_id = RoomID(room_id_str)
|
||
|
||
# Get room name
|
||
room_name = None
|
||
try:
|
||
name_data = await self.client.get_state_event(room_id, EventType.ROOM_NAME)
|
||
room_name = getattr(name_data, 'name', '') or None
|
||
except:
|
||
pass
|
||
|
||
# Get room topic
|
||
room_topic = None
|
||
try:
|
||
topic_data = await self.client.get_state_event(room_id, EventType.ROOM_TOPIC)
|
||
room_topic = getattr(topic_data, 'topic', '') or None
|
||
except:
|
||
pass
|
||
|
||
# Check encryption status
|
||
is_encrypted = False
|
||
try:
|
||
await self.client.get_state_event(room_id, EventType.ROOM_ENCRYPTION)
|
||
is_encrypted = True
|
||
except:
|
||
pass
|
||
|
||
return web.json_response({
|
||
"room_id": room_id_str,
|
||
"name": room_name or room_id_str,
|
||
"topic": room_topic,
|
||
"encrypted": is_encrypted,
|
||
})
|
||
except Exception as e:
|
||
log.error(f"API room_state error: {e}")
|
||
return web.json_response({"error": str(e)}, status=500)
|
||
|
||
async def api_generate_audio(self, request: web.Request) -> web.Response:
|
||
"""POST /api/generate_audio - Generate TTS audio from text
|
||
|
||
Body: {"text": "text to synthesize", "voice": "voice_name"}
|
||
Returns: {"mxc_url": "mxc://...", "size": bytes, "duration_est": seconds}
|
||
"""
|
||
try:
|
||
if not TTS_URL:
|
||
return web.json_response({"error": "TTS not configured"}, status=503)
|
||
|
||
data = await request.json()
|
||
text = data.get("text", "")
|
||
voice = data.get("voice", TTS_VOICE)
|
||
|
||
if not text:
|
||
return web.json_response({"error": "Missing required field: text"}, status=400)
|
||
|
||
# Generate audio bytes via TTS
|
||
log.info(f"API: Generating TTS for {len(text)} characters...")
|
||
audio_bytes = await asyncio.to_thread(synthesize_speech, text)
|
||
|
||
if not audio_bytes:
|
||
return web.json_response({"error": "TTS generation failed"}, status=500)
|
||
|
||
# Upload to Matrix media server
|
||
content_uri = await self.client.upload_media(
|
||
data=audio_bytes,
|
||
mime_type="audio/mpeg",
|
||
filename="tts-audio.mp3",
|
||
size=len(audio_bytes)
|
||
)
|
||
|
||
if not content_uri:
|
||
return web.json_response({"error": "Failed to upload audio to media server"}, status=500)
|
||
|
||
# Estimate duration (rough: ~1 second per 10 chars for speech)
|
||
duration_est = len(text) / 10.0
|
||
|
||
return web.json_response({
|
||
"mxc_url": content_uri,
|
||
"size": len(audio_bytes),
|
||
"duration_est": round(duration_est, 2),
|
||
"voice": voice,
|
||
})
|
||
except Exception as e:
|
||
log.error(f"API generate_audio error: {e}")
|
||
return web.json_response({"error": str(e)}, status=500)
|
||
|
||
async def api_send_audio(self, request: web.Request) -> web.Response:
|
||
"""POST /api/send_audio - Send audio message to a room
|
||
|
||
Body: {"room_id": "!...", "mxc_url": "mxc://..." OR "text": "text_to_tts"}
|
||
Can either send an existing MXC URL or generate TTS on the fly
|
||
|
||
Returns: {"event_id": "$...", "status": "sent"}
|
||
"""
|
||
try:
|
||
if not self.client:
|
||
return web.json_response({"error": "Client not initialized"}, status=503)
|
||
|
||
data = await request.json()
|
||
room_id = data.get("room_id")
|
||
mxc_url = data.get("mxc_url")
|
||
text = data.get("text") # Alternative: generate TTS from text
|
||
voice = data.get("voice", TTS_VOICE)
|
||
caption = data.get("caption", "")
|
||
|
||
if not room_id:
|
||
return web.json_response({"error": "Missing required field: room_id"}, status=400)
|
||
|
||
room_id = RoomID(room_id)
|
||
|
||
# Either use provided MXC URL or generate TTS
|
||
final_mxc_url = mxc_url
|
||
if not final_mxc_url and text:
|
||
log.info(f"API: Generating TTS for audio message...")
|
||
audio_bytes = await asyncio.to_thread(synthesize_speech, text)
|
||
if audio_bytes:
|
||
final_mxc_url = await self.client.upload_media(
|
||
data=audio_bytes,
|
||
mime_type="audio/mpeg",
|
||
filename="tts-audio.mp3",
|
||
size=len(audio_bytes)
|
||
)
|
||
if not final_mxc_url:
|
||
return web.json_response({"error": "TTS generation or upload failed"}, status=500)
|
||
|
||
if not final_mxc_url:
|
||
return web.json_response({"error": "Either mxc_url or text must be provided"}, status=400)
|
||
|
||
# Send audio message
|
||
content = {
|
||
"body": caption or ("Voice message" if text else "Audio"),
|
||
"info": {"mimetype": "audio/mpeg", "size": 0}, # Size not known for existing MXC
|
||
"msgtype": "m.audio",
|
||
"url": final_mxc_url,
|
||
}
|
||
|
||
event_id = await self.client.send_message_event(room_id, EventType.ROOM_MESSAGE, content)
|
||
|
||
return web.json_response({
|
||
"event_id": str(event_id) if event_id else None,
|
||
"status": "sent",
|
||
"mxc_url": final_mxc_url,
|
||
})
|
||
except Exception as e:
|
||
log.error(f"API send_audio error: {e}")
|
||
return web.json_response({"error": str(e)}, status=500)
|
||
|
||
def setup_api_routes(self) -> web.Application:
|
||
"""Set up aiohttp application with API routes"""
|
||
app = web.Application()
|
||
|
||
# Core endpoints
|
||
app.router.add_get("/api/health", self.api_health)
|
||
app.router.add_get("/api/list_rooms", self.api_list_rooms)
|
||
app.router.add_get("/api/room_state/{room_id}", self.api_get_room_state)
|
||
|
||
# Messaging endpoints
|
||
app.router.add_post("/api/send_message", self.api_send_message)
|
||
app.router.add_post("/api/read_room", self.api_read_room)
|
||
app.router.add_post("/api/react", self.api_react)
|
||
|
||
# Audio endpoints
|
||
app.router.add_post("/api/generate_audio", self.api_generate_audio)
|
||
app.router.add_post("/api/send_audio", self.api_send_audio)
|
||
|
||
return app
|
||
|
||
async def run_api_server(self):
|
||
"""Run the HTTP API server"""
|
||
app = self.setup_api_routes()
|
||
runner = web.AppRunner(app)
|
||
await runner.setup()
|
||
site = web.TCPSite(runner, API_HOST, API_PORT)
|
||
await site.start()
|
||
log.info(f"HTTP API server started on http://{API_HOST}:{API_PORT}")
|
||
return runner
|
||
|
||
    async def run(self):
        """Main run loop: initialize, sync, serve, and tear down in order.

        Startup order matters: database -> client -> crypto -> cross-signing,
        then conversation cache, handlers, optional heartbeat/API, and finally
        the Matrix sync loop.  Blocks until ``self.shutdown_event`` fires, then
        tears everything down in the ``finally`` block.
        """
        # Initialize everything (order-dependent: DB before client before crypto)
        await self.init_database()
        await self.init_client()
        await self.init_crypto()

        # Set up cross-signing (gracefully falls back if unsupported)
        await self.setup_cross_signing()

        # Load existing conversations from database into the in-memory cache
        rows = await self.db.fetch("SELECT room_id, conversation_id FROM room_conversations")
        for row in rows:
            self.conversation_cache[RoomID(row["room_id"])] = row["conversation_id"]
        log.info(f"Loaded {len(self.conversation_cache)} existing conversations from database")

        self.register_handlers()

        # Startup banner
        log.info("=" * 50)
        log.info("Meridian Bridge Started")
        log.info(f"  Matrix: {self.user_id}")
        log.info(f"  Device: {self.device_id}")
        log.info(f"  Letta Agent: {LETTA_AGENT_ID}")
        log.info(f"  Conversations: {len(self.conversation_cache)} existing + per-room isolation")
        log.info(f"  E2EE: Enabled (mautrix-python)")
        log.info(f"  Formatting: Full HTML + Colors + Emoji")
        log.info(f"  Reactions: Interactive (agent sees and responds)")
        if ENABLE_API:
            log.info(f"  HTTP API: http://{API_HOST}:{API_PORT}")
        if HEARTBEAT_ENABLED:
            log.info(f"  Heartbeat: Every {HEARTBEAT_INTERVAL_MINUTES} minutes (SILENT MODE)")
        log.info("=" * 50)

        # Initialize heartbeat service if enabled (started only after sync below)
        if HEARTBEAT_ENABLED:
            heartbeat_config = HeartbeatConfig(
                enabled=True,
                interval_minutes=HEARTBEAT_INTERVAL_MINUTES,
                skip_if_recent_minutes=HEARTBEAT_SKIP_IF_RECENT_MINUTES,
                target_room_id=HEARTBEAT_TARGET_ROOM,
            )
            self.heartbeat = HeartbeatService(
                config=heartbeat_config,
                send_to_agent=send_to_letta,
                get_conversation_id=self._get_conversation_id_for_heartbeat,
            )
            log.info(f"Heartbeat service initialized (interval: {HEARTBEAT_INTERVAL_MINUTES}m)")

        # Start queue processor (background task; cancelled in finally)
        queue_task = asyncio.create_task(self.process_queue())

        # Start HTTP API server if enabled
        api_runner = None
        if ENABLE_API:
            api_runner = await self.run_api_server()

        try:
            # Initial sync with retry logic on token expiry
            log.info("Performing initial sync...")
            try:
                await self.client.sync(timeout=30000)
            except MUnknownToken:
                log.warning("Token expired during initial sync, re-authenticating...")
                await self._reauth_with_password()
                # Retry sync with new token
                await self.client.sync(timeout=30000)

            # Messages seen before this flag flips are treated as history.
            self.initial_sync_done = True
            log.info("Initial sync complete, now processing new messages only")

            # Send hello after sync (if enabled)
            if SEND_STARTUP_MESSAGE:
                try:
                    rooms = await self.client.get_joined_rooms()
                    for room_id in rooms:
                        # Test formatting in hello message
                        hello = "Meridian bridge online! 🚀\n\n**Features:**\n• {hot_pink|Emoji reactions} - I see and respond to your reactions!\n• {purple|Per-room conversations} - Isolated context per room\n• Matrix extensions - {hot_pink|spoilers}, colors, markdown\n• {green|Full HTML formatting} - Beautiful responses!"
                        await self.send_message(room_id, hello)
                except Exception as e:
                    log.warning(f"Could not send hello: {e}")
            else:
                log.info("Startup message disabled (SEND_STARTUP_MESSAGE=0)")

            # Start the syncer (non-blocking, runs in background)
            log.info("Starting sync loop...")
            self.client.start(None)  # None = no filter

            # Start heartbeat service only after sync is complete
            if self.heartbeat:
                self.heartbeat.start()
                log.info("Heartbeat service started (SILENT MODE)")

            # Keep running until shutdown is signaled
            await self.shutdown_event.wait()  # Wait for shutdown signal

        except (KeyboardInterrupt, asyncio.CancelledError):
            log.info("Shutting down...")
        finally:
            # Teardown mirrors startup: heartbeat, queue, API, client, stores.
            if self.heartbeat:
                self.heartbeat.stop()
            queue_task.cancel()
            if api_runner:
                await api_runner.cleanup()
            self.client.stop()
            if self.crypto_store:
                await self.crypto_store.close()
            if self.db:
                await self.db.stop()
async def _get_conversation_id_for_heartbeat(self, room_id: str) -> str | None:
|
||
"""Helper for heartbeat to get conversation ID for a room"""
|
||
return await self.get_or_create_conversation(RoomID(room_id))
|
||
|
||
|
||
async def main():
    """Construct the bridge, wire up signal handling, and run until stopped."""
    bridge = MeridianBridge()

    def _on_shutdown_signal():
        # Flip both flags: the run loop and the queue processor each watch one.
        log.info("Received shutdown signal, initiating graceful shutdown...")
        bridge.shutting_down = True
        bridge.shutdown_event.set()

    # SIGTERM (systemd/docker stop) and SIGINT (Ctrl-C) both shut down cleanly.
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, _on_shutdown_signal)

    try:
        await bridge.run()
    finally:
        log.info("Meridian Bridge stopped")
if __name__ == "__main__":
    # Script entry point: run the bridge's event loop until a shutdown signal.
    asyncio.run(main())