# Debouncer Integration Steps ## Quick Setup (3 steps) ### Step 1: Add import to bridge-e2ee.py Add this line near other imports (around line 61): ```python from debouncer import create_message_debouncer, MessageDebouncer ``` ### Step 2: Initialize debouncer in Bridge.__init__ Add this in the Bridge class __init__ or run method (around line 2650, near heartbeat setup): ```python # Initialize message debouncer self.debouncer = create_message_debouncer( debounce_ms=2000, # 2 second window on_flush=self.process_batched_messages, ) ``` ### Step 3: Add process_batched_messages method Add this method to the Bridge class (anywhere, but before run()): ```python async def process_batched_messages(self, messages: list) -> None: """Process batched messages from the debouncer""" if not messages: return # Combine all messages combined_text = "\n\n".join([msg['text'] for msg in messages]) room_id = messages[0]['room_id'] sender = messages[0]['sender'] evt = messages[0]['event'] # Use first event as representative log.info(f"[Debouncer] Processing {len(messages)} batched messages for {sender}") log.debug(f"Combined text: {combined_text[:100]}...") # Call existing message processing logic directly await self._handle_message_after_debounce(evt, room_id, sender, combined_text) async def _handle_message_after_debounce(self, evt, room_id, sender, combined_text): """Helper to process message after debouncing (bypasses debouncer)""" # Wrap with send_typing async with self._send_typing_wrapper(room_id): # Send to Letta response, status, step_ids = await asyncio.to_thread( send_to_letta, combined_text, conversation_id=await self.get_or_create_conversation(room_id), ) # Process response (same as your existing code) if status == "SUCCESS": # Handle response with images if any # Add to conversation cache await self._add_to_conversation(room_id, sender, response) # Send response to Matrix await self.send_message(room_id, response) elif status == "BUSY": # Handle busy state message_queue.append((room_id, sender, combined_text, 0)) await self.send_message(room_id, "⏳ Agent busy, queued") else: await self.send_message(room_id, response) ``` ### Step 4: Wrap on_message to use debouncer Modify the existing `async def on_message(self, evt):` method: Add this at the beginning (right after the old message checks but before image/audio handling): ```python # Debounce text messages if evt.content.msgtype == MessageType.TEXT and not body.startswith("!"): # Skip debouncing for commands if body.startswith("!"): pass # Let commands pass through, they'll be handled later else: # Enqueue for debouncing await self.debouncer.enqueue({ 'room_id': room_id, 'sender': sender, 'text': body, 'event': evt, # Store full event for processing }) return # Return early - debouncer will handle processing # Existing image/audio handling continues below... ``` ### Full Example: Modified on_message ```python async def on_message(self, evt): """Handle incoming messages (text and images)""" # Ignore messages during initial sync. if not self.initial_sync_done: return # Ignore old messages (more than 60 seconds old) event_time = datetime.fromtimestamp(evt.timestamp / 1000) message_age = datetime.now() - event_time if message_age > timedelta(seconds=60): log.debug(f"Ignoring old message ({message_age.seconds}s old) from {evt.sender}") return # Ignore own messages if evt.sender == self.user_id: return room_id = evt.room_id sender = evt.sender body = evt.content.body # Update heartbeat tracker (user is active) if self.heartbeat: self.heartbeat.update_last_user_message(str(room_id)) # Handle images (skip debouncing) if evt.content.msgtype == MessageType.IMAGE: return await self.on_image(evt, room_id, sender, body) # Handle audio (skip debouncing) if evt.content.msgtype == MessageType.AUDIO: return await self.on_audio(evt, room_id, sender, body) # Only handle text messages if evt.content.msgtype != MessageType.TEXT: return log.info(f"[{room_id}] {sender}: {body}") # Handle bridge commands (skip debouncing) if body.startswith("!"): # Command handling logic here... cmd = body.strip().lower() if cmd in ("!new", "!newconversation", "!fresh", "!reset"): await self.reset_conversation(room_id) await self.send_message(room_id, "🔄 Fresh conversation started. What's on your mind?") return # ... other commands ... # DEBOUNCER: Queue text messages if evt.content.msgtype == MessageType.TEXT: await self.debouncer.enqueue({ 'room_id': room_id, 'sender': sender, 'text': body, 'event': evt, # Store full event for processing }) return ``` ## Testing Send messages rapidly (within 2 seconds): ``` User: Hey User: Are you there? User: Hello?? ``` Result: They'll be combined and sent to Letta as one message: ``` "Hey\n\nAre you there?\n\nHello??" ``` ## Configuration Adjust the debounce window in `create_message_debouncer()`: - `debounce_ms=2000` = 2 second window (default) - `debounce_ms=3000` = 3 second window (for slower users) - `debounce_ms=1000` = 1 second window (for rapid fire) - `debounce_ms=0` = DISABLED (process immediately) ## Benefits 1. **Reduces agent overhead**: Batch rapid messages instead of calling Letta multiple times 2. **Better UX**: Combines split thoughts into coherent messages 3. **Lower costs**: Fewer API calls to Letta 4. **More natural**: Matches how humans type (pause, think, continue)