Files
matrix-bridge-legacy/debouncer.py
2026-03-28 23:50:54 -04:00

203 lines
7.0 KiB
Python

#!/usr/bin/env python3
"""
Message Debouncing Utility
Batches rapid consecutive messages from the same sender.
Based on lettabot's inbound-debounce pattern.
This reduces agent session overhead and improves user experience
by combining messages like:
"Hey" + "Are you there?" + "Hello??"  ->  "Hey\nAre you there?\nHello??"
"""
import asyncio
import logging
from datetime import datetime
from typing import TypeVar, Generic, Optional, Callable, Awaitable, List
from dataclasses import dataclass, field
log = logging.getLogger("meridian.debouncer")
T = TypeVar('T')
@dataclass
class DebounceBuffer(Generic[T]):
"""Debounce buffer for a specific key"""
items: List[T] = field(default_factory=list)
task: Optional[asyncio.Task] = None
class MessageDebouncer(Generic[T]):
"""
Message debouncer - batches rapid consecutive messages.
Usage:
debouncer = MessageDebouncer(
debounce_ms=2000, # 2 second window
build_key=lambda msg: f"{msg.room_id}:{msg.sender}",
should_debounce=lambda msg: not msg.has_media, # Don't debounce media
on_flush=process_messages,
)
await debouncer.enqueue(message)
"""
def __init__(
self,
debounce_ms: int,
build_key: Callable[[T], Optional[str]],
on_flush: Callable[[List[T]], Awaitable[None]],
should_debounce: Optional[Callable[[T], bool]] = None,
on_error: Optional[Callable[[Exception, List[T]], None]] = None,
):
"""
Initialize the debouncer.
Args:
debounce_ms: Debounce window in milliseconds.
Messages within this window are batched together.
Set to 0 to disable debouncing.
build_key: Function to build a unique key for an item.
Items with the same key are debounced together.
Return None to skip debouncing for this item.
on_flush: Callback to process batched items.
Called with array of items after debounce window expires.
should_debounce: Optional predicate to determine if item should be debounced.
Return False to process immediately even if debounce_ms > 0.
on_error: Optional error handler for flush failures.
"""
self.debounce_ms = max(0, debounce_ms)
self.debounce_seconds = self.debounce_ms / 1000.0
self.build_key = build_key
self.on_flush = on_flush
self.should_debounce = should_debounce or (lambda _: True)
self.on_error = on_error
self._buffers: dict[str, DebounceBuffer[T]] = {}
self._lock = asyncio.Lock()
async def _flush_buffer(self, key: str) -> None:
"""Flush a specific buffer"""
async with self._lock:
buffer = self._buffers.pop(key, None)
if not buffer or not buffer.items:
return
# Cancel the task if still pending
if buffer.task and not buffer.task.done():
buffer.task.cancel()
# Process items outside the lock
items = buffer.items
try:
log.debug(f"[Debouncer] Flushing {len(items)} items for key: {key[:30]}...")
await self.on_flush(items)
except Exception as e:
log.error(f"[Debouncer] Flush error for key {key}: {e}")
if self.on_error:
self.on_error(e, items)
async def _schedule_flush(self, key: str) -> None:
"""Schedule a flush after the debounce window"""
await asyncio.sleep(self.debounce_seconds)
await self._flush_buffer(key)
async def enqueue(self, item: T) -> None:
"""
Enqueue an item for debouncing.
If debouncing is disabled or item shouldn't be debounced,
processes immediately.
"""
key = self.build_key(item)
can_debounce = self.debounce_ms > 0 and self.should_debounce(item)
# Process immediately if debouncing disabled or item shouldn't be debounced
if not can_debounce or not key:
# Flush any pending items with this key first
if key and key in self._buffers:
await self.flush_key(key)
# Process this item immediately
try:
await self.on_flush([item])
except Exception as e:
log.error(f"[Debouncer] Immediate flush error: {e}")
if self.on_error:
self.on_error(e, [item])
return
async with self._lock:
existing = self._buffers.get(key)
if existing:
# Add to existing buffer and reschedule
existing.items.append(item)
log.debug(
f"[Debouncer] Added to buffer for {key[:30]}... "
f"(now {len(existing.items)} items)"
)
# Cancel old task and reschedule (extends window)
if existing.task and not existing.task.done():
existing.task.cancel()
existing.task = asyncio.create_task(self._schedule_flush(key))
else:
# Create new buffer
buffer = DebounceBuffer(items=[item])
buffer.task = asyncio.create_task(self._schedule_flush(key))
self._buffers[key] = buffer
log.debug(f"[Debouncer] Created buffer for {key[:30]}...")
async def flush_key(self, key: str) -> None:
"""Flush items for a specific key immediately"""
await self._flush_buffer(key)
async def flush_all(self) -> None:
"""Flush all pending buffers"""
async with self._lock:
keys = list(self._buffers.keys())
for key in keys:
await self._flush_buffer(key)
def get_stats(self) -> dict:
"""Get debouncer statistics"""
return {
"debounce_ms": self.debounce_ms,
"active_buffers": len(self._buffers),
"buffer_keys": list(self._buffers.keys()),
}
def create_message_debouncer(
    debounce_ms: int,
    on_flush: Callable[[List[dict]], Awaitable[None]],
) -> MessageDebouncer[dict]:
    """
    Create a message debouncer for Matrix messages.

    Args:
        debounce_ms: Debounce window in milliseconds (e.g., 2000 for 2 seconds)
        on_flush: Callback to process batched messages

    Returns:
        MessageDebouncer configured for Matrix messages
    """
    def _room_sender_key(msg: dict) -> str:
        # One buffer per (room, sender) pair.
        return f"{msg.get('room_id')}:{msg.get('sender')}"

    def _is_batchable(msg: dict) -> bool:
        # Messages carrying media are delivered immediately...
        if msg.get('has_image') or msg.get('has_audio'):
            return False
        # ...and so are "!"-prefixed commands.
        return not msg.get('text', '').startswith('!')

    def _report_failure(e: Exception, items: List[dict]) -> None:
        log.error(
            f"[Debouncer] Failed to process {len(items)} messages: {e}"
        )

    return MessageDebouncer(
        debounce_ms=debounce_ms,
        build_key=_room_sender_key,
        should_debounce=_is_batchable,
        on_flush=on_flush,
        on_error=_report_failure,
    )