#!/usr/bin/env python3
"""
Message Debouncing Utility

Batches rapid consecutive messages from the same sender.
Based on lettabot's inbound-debounce pattern.

This reduces agent session overhead and improves user experience by combining
messages like:
    "Hey" + "Are you there?" + "Hello??" → "Hey\\nAre you there?\\nHello??"
"""

import asyncio
import logging
from datetime import datetime
from typing import TypeVar, Generic, Optional, Callable, Awaitable, List
from dataclasses import dataclass, field

log = logging.getLogger("meridian.debouncer")

T = TypeVar('T')


@dataclass
class DebounceBuffer(Generic[T]):
    """Debounce buffer for a specific key"""
    # Items collected for this key since the buffer was created.
    items: List[T] = field(default_factory=list)
    # Timer task that flushes this buffer once the debounce window elapses.
    task: Optional[asyncio.Task] = None


class MessageDebouncer(Generic[T]):
    """
    Message debouncer - batches rapid consecutive messages.

    Usage:
        debouncer = MessageDebouncer(
            debounce_ms=2000,  # 2 second window
            build_key=lambda msg: f"{msg.room_id}:{msg.sender}",
            should_debounce=lambda msg: not msg.has_media,  # Don't debounce media
            on_flush=process_messages,
        )

        await debouncer.enqueue(message)
    """

    def __init__(
        self,
        debounce_ms: int,
        build_key: Callable[[T], Optional[str]],
        on_flush: Callable[[List[T]], Awaitable[None]],
        should_debounce: Optional[Callable[[T], bool]] = None,
        on_error: Optional[Callable[[Exception, List[T]], None]] = None,
    ):
        """
        Initialize the debouncer.

        Args:
            debounce_ms: Debounce window in milliseconds. Messages within this
                window are batched together. Negative values are clamped to 0.
                Set to 0 to disable debouncing.
            build_key: Function to build a unique key for an item. Items with
                the same key are debounced together. Return None (or an empty
                string) to skip debouncing for this item.
            on_flush: Callback to process batched items. Called with a list of
                items after the debounce window expires.
            should_debounce: Optional predicate to determine if item should be
                debounced. Return False to process immediately even if
                debounce_ms > 0. Defaults to always debouncing.
            on_error: Optional error handler for flush failures. Called
                synchronously with the exception and the affected items.
        """
        self.debounce_ms = max(0, debounce_ms)
        self.debounce_seconds = self.debounce_ms / 1000.0
        self.build_key = build_key
        self.on_flush = on_flush
        self.should_debounce = should_debounce or (lambda _: True)
        self.on_error = on_error

        self._buffers: dict[str, DebounceBuffer[T]] = {}
        self._lock = asyncio.Lock()

    async def _flush_buffer(self, key: str) -> None:
        """
        Remove the buffer for ``key`` and hand its items to ``on_flush``.

        Does nothing when no buffer (or an empty buffer) exists for the key.
        Exceptions raised by ``on_flush`` are logged and forwarded to
        ``on_error`` (when set) instead of propagating.
        """
        async with self._lock:
            buffer = self._buffers.pop(key, None)
            if buffer is None or not buffer.items:
                return

            # Cancel the pending timer so it does not fire again for this key.
            # When this flush is running *inside* that timer task (the normal
            # debounce expiry path), cancelling it would cancel ourselves: the
            # CancelledError would be delivered at the first suspension point
            # in on_flush and the batch would be silently lost.
            timer = buffer.task
            if (
                timer is not None
                and timer is not asyncio.current_task()
                and not timer.done()
            ):
                timer.cancel()

        # Process items outside the lock so enqueue() is not blocked meanwhile.
        items = buffer.items
        try:
            log.debug(f"[Debouncer] Flushing {len(items)} items for key: {key[:30]}...")
            await self.on_flush(items)
        except Exception as e:
            log.error(f"[Debouncer] Flush error for key {key}: {e}")
            if self.on_error:
                self.on_error(e, items)

    async def _schedule_flush(self, key: str) -> None:
        """Sleep for the debounce window, then flush the buffer for ``key``."""
        await asyncio.sleep(self.debounce_seconds)
        await self._flush_buffer(key)

    async def enqueue(self, item: T) -> None:
        """
        Enqueue an item for debouncing.

        If debouncing is disabled, the item shouldn't be debounced, or no key
        can be built for it, the item is processed immediately (after first
        flushing anything already pending under the same key). Otherwise the
        item is appended to the key's buffer and the debounce timer restarts.
        """
        key = self.build_key(item)
        can_debounce = self.debounce_ms > 0 and self.should_debounce(item)

        # Process immediately if debouncing disabled or item shouldn't be debounced
        if not can_debounce or not key:
            # Flush any pending items with this key first to keep ordering
            if key and key in self._buffers:
                await self.flush_key(key)

            # Process this item immediately
            try:
                await self.on_flush([item])
            except Exception as e:
                log.error(f"[Debouncer] Immediate flush error: {e}")
                if self.on_error:
                    self.on_error(e, [item])
            return

        async with self._lock:
            existing = self._buffers.get(key)

            if existing is not None:
                # Add to existing buffer and reschedule
                existing.items.append(item)
                log.debug(
                    f"[Debouncer] Added to buffer for {key[:30]}... "
                    f"(now {len(existing.items)} items)"
                )

                # Cancel old task and reschedule (extends window)
                if existing.task and not existing.task.done():
                    existing.task.cancel()
                existing.task = asyncio.create_task(self._schedule_flush(key))
            else:
                # Create new buffer
                buffer = DebounceBuffer(items=[item])
                buffer.task = asyncio.create_task(self._schedule_flush(key))
                self._buffers[key] = buffer
                log.debug(f"[Debouncer] Created buffer for {key[:30]}...")

    async def flush_key(self, key: str) -> None:
        """Flush items for a specific key immediately"""
        await self._flush_buffer(key)

    async def flush_all(self) -> None:
        """Flush all pending buffers"""
        # Snapshot the keys under the lock; _flush_buffer re-acquires it per key.
        async with self._lock:
            keys = list(self._buffers.keys())

        for key in keys:
            await self._flush_buffer(key)

    def get_stats(self) -> dict:
        """Get debouncer statistics (window, number of buffers, buffer keys)"""
        return {
            "debounce_ms": self.debounce_ms,
            "active_buffers": len(self._buffers),
            "buffer_keys": list(self._buffers.keys()),
        }


def create_message_debouncer(
    debounce_ms: int,
    on_flush: Callable[[List[dict]], Awaitable[None]],
) -> MessageDebouncer[dict]:
    """
    Create a message debouncer for Matrix messages.

    Messages are grouped by their ``room_id`` and ``sender`` values. Messages
    flagged ``has_image`` / ``has_audio`` and messages whose ``text`` starts
    with ``!`` are not debounced.

    Args:
        debounce_ms: Debounce window in milliseconds (e.g., 2000 for 2 seconds)
        on_flush: Callback to process batched messages

    Returns:
        MessageDebouncer configured for Matrix messages
    """
    return MessageDebouncer(
        debounce_ms=debounce_ms,
        build_key=lambda msg: f"{msg.get('room_id')}:{msg.get('sender')}",
        should_debounce=lambda msg: (
            # Don't debounce messages with media
            not msg.get('has_image') and
            not msg.get('has_audio') and
            # Don't debounce commands; `or ''` tolerates an explicit None text
            not (msg.get('text') or '').startswith('!')
        ),
        on_flush=on_flush,
        on_error=lambda e, items: log.error(
            f"[Debouncer] Failed to process {len(items)} messages: {e}"
        ),
    )