Files
matrix-bridge-legacy/bridge_with_debouncer.py
2026-03-28 23:50:54 -04:00

219 lines
7.3 KiB
Python

#!/usr/bin/env python3
"""
Matrix-Letta Bridge with Debouncer Integration
HOW TO: Add debouncer to existing bridge-e2ee.py
"""
import asyncio
from typing import TypeVar, Generic, Optional, Callable, Awaitable, List
from dataclasses import dataclass, field
# Type of the items being debounced (e.g. a message dict).
T = TypeVar('T')


@dataclass
class DebounceBuffer(Generic[T]):
    """Pending items for one debounce key, plus the timer task that will flush them."""

    # Items enqueued for this key since the last flush, in arrival order.
    items: List[T] = field(default_factory=list)
    # Task running MessageDebouncer._schedule_flush for this key, if one is scheduled.
    task: Optional[asyncio.Task] = None


class MessageDebouncer(Generic[T]):
    """Batch rapid consecutive items that share a key.

    Items with the same key are collected in a buffer. Every new item for a
    key restarts that key's timer; once ``debounce_ms`` elapses without a new
    item, the whole batch is passed to ``on_flush`` in arrival order.
    """

    def __init__(
        self,
        debounce_ms: int,
        build_key: Callable[[T], Optional[str]],
        on_flush: Callable[[List[T]], Awaitable[None]],
        should_debounce: Optional[Callable[[T], bool]] = None,
        on_error: Optional[Callable[[Exception, List[T]], None]] = None,
    ):
        """Configure the debouncer.

        Args:
            debounce_ms: Quiet period in milliseconds. Negative values are
                clamped to 0; 0 disables buffering (every item is flushed
                immediately).
            build_key: Returns the grouping key for an item. A falsy key
                means the item is processed immediately.
            on_flush: Async callback that receives each batch of items.
            should_debounce: Optional predicate; items for which it returns
                a falsy value bypass buffering. Defaults to always true.
            on_error: Optional callback invoked with ``(exception, items)``
                when ``on_flush`` raises an ``Exception``.
        """
        self.debounce_ms = max(0, debounce_ms)
        self.debounce_seconds = self.debounce_ms / 1000.0
        self.build_key = build_key
        self.on_flush = on_flush
        self.should_debounce = should_debounce or (lambda _: True)
        self.on_error = on_error
        self._buffers: dict[str, DebounceBuffer[T]] = {}
        self._lock = asyncio.Lock()

    async def _flush_buffer(self, key: str) -> None:
        """Remove the buffer for ``key`` and hand its items to ``on_flush``.

        Does nothing when no items are pending for ``key``. Exceptions raised
        by ``on_flush`` are printed and forwarded to ``on_error`` (if set)
        instead of propagating.
        """
        async with self._lock:
            buffer = self._buffers.pop(key, None)
            if not buffer or not buffer.items:
                return
            timer = buffer.task
            # When the debounce window expires, this coroutine is running
            # *inside* the timer task (via _schedule_flush). Cancelling the
            # current task would make the next suspension inside on_flush
            # raise CancelledError, which `except Exception` does not catch,
            # so the batch would be dropped. Only cancel a timer that is a
            # different, still-pending task (e.g. on a manual flush).
            if (
                timer is not None
                and timer is not asyncio.current_task()
                and not timer.done()
            ):
                timer.cancel()
            items = buffer.items
        # on_flush runs outside the lock so enqueue() is not blocked while a
        # batch is being processed.
        try:
            print(f"[Debounce] Flushing {len(items)} messages")
            await self.on_flush(items)
        except Exception as e:
            print(f"[Debounce] Flush error: {e}")
            if self.on_error:
                self.on_error(e, items)

    async def _schedule_flush(self, key: str) -> None:
        """Timer body: wait out the debounce window, then flush ``key``."""
        await asyncio.sleep(self.debounce_seconds)
        await self._flush_buffer(key)

    async def enqueue(self, item: T) -> None:
        """Add ``item`` to its key's buffer, or process it immediately.

        An item is processed immediately when debouncing is disabled
        (``debounce_ms == 0``), when ``should_debounce`` rejects it, or when
        ``build_key`` returns a falsy key. Otherwise it is buffered and the
        key's timer is (re)started.
        """
        key = self.build_key(item)
        can_debounce = self.debounce_ms > 0 and self.should_debounce(item)
        if not can_debounce or not key:
            # Flush anything already pending for this key first so the
            # immediate item is not delivered ahead of earlier ones.
            if key and key in self._buffers:
                await self.flush_key(key)
            try:
                await self.on_flush([item])
            except Exception as e:
                print(f"[Debounce] Immediate flush error: {e}")
                if self.on_error:
                    self.on_error(e, [item])
            return
        async with self._lock:
            existing = self._buffers.get(key)
            if existing:
                # Add to the existing buffer.
                existing.items.append(item)
                print(f"[Debounce] Added to buffer for {key[:30]}... (now {len(existing.items)})")
                # Restart the quiet period: cancel the old timer, start a new one.
                if existing.task and not existing.task.done():
                    existing.task.cancel()
                existing.task = asyncio.create_task(self._schedule_flush(key))
            else:
                # First item for this key: create the buffer and start its timer.
                buffer = DebounceBuffer(items=[item])
                buffer.task = asyncio.create_task(self._schedule_flush(key))
                self._buffers[key] = buffer
                print(f"[Debounce] Created buffer for {key[:30]}...")

    async def flush_key(self, key: str) -> None:
        """Flush the items pending for ``key`` immediately, without waiting for its timer."""
        await self._flush_buffer(key)

    async def flush_all(self) -> None:
        """Flush every pending buffer, one key at a time."""
        async with self._lock:
            # Snapshot the keys: _flush_buffer mutates self._buffers.
            keys = list(self._buffers.keys())
        for key in keys:
            await self._flush_buffer(key)

    def get_stats(self) -> dict:
        """Return the configured window and the keys that currently have pending items."""
        return {
            "debounce_ms": self.debounce_ms,
            "active_buffers": len(self._buffers),
            "buffer_keys": list(self._buffers.keys()),
        }
def create_message_debouncer(
    debounce_ms: int,
    on_flush: Callable[[List[dict]], Awaitable[None]],
) -> MessageDebouncer[dict]:
    """Build a MessageDebouncer preconfigured for Matrix message dicts.

    Messages are grouped per ``room_id``/``sender`` pair. A message bypasses
    debouncing when it is flagged ``has_image`` or ``has_audio``, or when its
    ``text`` starts with ``!``. Failures in ``on_flush`` are reported with a
    printed line.

    Args:
        debounce_ms: Debounce window in milliseconds.
        on_flush: Async callback that receives each batch of message dicts.
    """

    def _conversation_key(msg: dict) -> str:
        # One buffer per (room, sender) pair.
        return f"{msg.get('room_id')}:{msg.get('sender')}"

    def _is_batchable(msg: dict) -> bool:
        # Media messages and "!"-prefixed text are never held back.
        return not (
            msg.get('has_image')
            or msg.get('has_audio')
            or msg.get('text', '').startswith('!')
        )

    def _report_failure(e: Exception, items: List[dict]) -> None:
        print(f"[Debouncer] Failed to process {len(items)} messages: {e}")

    return MessageDebouncer(
        debounce_ms=debounce_ms,
        build_key=_conversation_key,
        on_flush=on_flush,
        should_debounce=_is_batchable,
        on_error=_report_failure,
    )
# Integration steps
"""
STEP 1: In bridge-e2ee.py, add import:
from debouncer import create_message_debouncer, MessageDebouncer
STEP 2: In Bridge.__init__ or init_database(), add:
# Initialize message debouncer
self.debouncer = create_message_debouncer(
debounce_ms=2000, # 2 second window
on_flush=self.process_batched_messages,
)
STEP 3: Add process_batched_messages method to Bridge class:
async def process_batched_messages(self, messages: list[dict]) -> None:
# Process batched messages
combined_text = "\n\n".join([msg['text'] for msg in messages])
room_id = messages[0]['room_id']
sender = messages[0]['sender']
# Use the first message as representative
first_msg = messages[0]
# Handle images if present (use first image)
images = []
if any(msg.get('has_image') for msg in messages):
# Find first message with image data
for msg in messages:
if msg.get('has_image') and msg.get('image_data'):
images = [msg['image_data']]
break
# Call your existing on_message logic
await self.on_message_debounced(
room_id=room_id,
sender=sender,
text=combined_text,
images=images
)
STEP 4: Wrap on_message with debouncer.enqueue:
old_on_message = self.on_message
async def on_message(self, evt):
# Handle incoming messages with debouncing
# Skip debouncing for non-text messages
if evt.content.msgtype != MessageType.TEXT:
return await old_on_message(evt)
# Create message dict for debouncer
message = {
'room_id': evt.room_id,
'sender': evt.sender,
'text': evt.content.body,
'has_image': False,
'has_audio': False,
'timestamp': datetime.now(),
}
# Enqueue for debouncing
await self.debouncer.enqueue(message)
STEP 5: Add on_message_debounced method:
async def on_message_debounced(self, room_id: str, sender: str, text: str, images: list) -> None:
# Reuse existing logic but with batched text/images
# This is essentially the same as your current on_message
# but handles combined messages
...
"""
def main() -> None:
    """Print a short usage summary for this integration helper.

    The text points at the integration steps documented in this module, shows
    the suggested ``debounce_ms`` setting, and describes a quick manual test.
    """
    print("Debounce integration helper module")
    print("\nUsage:")
    # Fixed wording: the original read "Add the ST STEPS above".
    print(" Add the STEPS above to bridge-e2ee.py")
    print("\nConfiguration:")
    print(" debounce_ms: 2000 # 2 second window for batching messages")
    print("\nTest with:")
    print(" Send multiple messages rapidly (within 2 seconds):")
    print(" 'Hey' then 'Are you there?' then 'Hello??'")
    print(" They will be combined into: 'Hey\n\nAre you there?\n\nHello??'")


if __name__ == "__main__":
    main()