matrix-bridge-legacy/DEBOUNCER_INTEGRATION.md (2026-03-28)

# Debouncer Integration Steps
## Quick Setup (4 steps)
### Step 1: Add import to bridge-e2ee.py
Add this line near other imports (around line 61):
```python
from debouncer import create_message_debouncer, MessageDebouncer
```
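This guide assumes `debouncer.py` already exists in the project. For reference, a minimal sketch of the interface used here — a `create_message_debouncer()` factory, an async `enqueue()`, and a per-room trailing-edge timer that calls `on_flush` — could look like the following (the real module may differ):

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable


class MessageDebouncer:
    """Collects messages per room and flushes after a quiet period."""

    def __init__(self, debounce_ms: int, on_flush: Callable[[list], Awaitable[None]]):
        self.debounce_s = debounce_ms / 1000
        self.on_flush = on_flush
        self._pending: dict[str, list] = defaultdict(list)
        self._timers: dict[str, asyncio.Task] = {}

    async def enqueue(self, message: dict) -> None:
        room_id = message["room_id"]
        self._pending[room_id].append(message)
        if self.debounce_s <= 0:
            # debounce_ms=0 disables batching: flush immediately
            await self._flush(room_id)
            return
        # Restart the timer on every new message (trailing-edge debounce)
        if timer := self._timers.get(room_id):
            timer.cancel()
        self._timers[room_id] = asyncio.create_task(self._wait_and_flush(room_id))

    async def _wait_and_flush(self, room_id: str) -> None:
        await asyncio.sleep(self.debounce_s)
        await self._flush(room_id)

    async def _flush(self, room_id: str) -> None:
        messages = self._pending.pop(room_id, [])
        self._timers.pop(room_id, None)
        if messages:
            await self.on_flush(messages)


def create_message_debouncer(debounce_ms: int, on_flush) -> MessageDebouncer:
    return MessageDebouncer(debounce_ms=debounce_ms, on_flush=on_flush)
```

Because the timer is keyed by `room_id`, rapid messages in one room never delay or merge with messages from another room.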
### Step 2: Initialize debouncer in Bridge.__init__
Add this in the Bridge class __init__ or run method (around line 2650, near heartbeat setup):
```python
# Initialize message debouncer
self.debouncer = create_message_debouncer(
    debounce_ms=2000,  # 2 second window
    on_flush=self.process_batched_messages,
)
```
### Step 3: Add process_batched_messages method
Add these two methods to the Bridge class (anywhere, but before run()):
```python
async def process_batched_messages(self, messages: list) -> None:
    """Process batched messages from the debouncer"""
    if not messages:
        return

    # Combine all messages
    combined_text = "\n\n".join(msg['text'] for msg in messages)
    room_id = messages[0]['room_id']
    sender = messages[0]['sender']
    evt = messages[0]['event']  # Use first event as representative

    log.info(f"[Debouncer] Processing {len(messages)} batched messages for {sender}")
    log.debug(f"Combined text: {combined_text[:100]}...")

    # Call existing message processing logic directly
    await self._handle_message_after_debounce(evt, room_id, sender, combined_text)

async def _handle_message_after_debounce(self, evt, room_id, sender, combined_text):
    """Helper to process message after debouncing (bypasses debouncer)"""
    # Wrap with send_typing
    async with self._send_typing_wrapper(room_id):
        # Send to Letta off the event loop
        response, status, step_ids = await asyncio.to_thread(
            send_to_letta,
            combined_text,
            conversation_id=await self.get_or_create_conversation(room_id),
        )
        # Process response (same as your existing code)
        if status == "SUCCESS":
            # Handle response with images if any
            # Add to conversation cache
            await self._add_to_conversation(room_id, sender, response)
            # Send response to Matrix
            await self.send_message(room_id, response)
        elif status == "BUSY":
            # Handle busy state: re-queue for later
            message_queue.append((room_id, sender, combined_text, 0))
            await self.send_message(room_id, "⏳ Agent busy, queued")
        else:
            await self.send_message(room_id, response)
```
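`_send_typing_wrapper` is referenced above but not defined in this guide. If your bridge doesn't already have it, one plausible shape is an async context manager that toggles the typing indicator around the Letta call; the `self.client.set_typing(room_id, timeout=...)` call below is an assumption — substitute your Matrix client's actual typing API:

```python
from contextlib import asynccontextmanager


class TypingMixin:
    """Sketch of the typing helper used by _handle_message_after_debounce."""

    @asynccontextmanager
    async def _send_typing_wrapper(self, room_id):
        # Assumed client call: start showing a typing indicator
        # (30 s timeout so it expires even if we crash hard).
        await self.client.set_typing(room_id, timeout=30_000)
        try:
            yield
        finally:
            # Always clear the indicator, even if the Letta call raises
            await self.client.set_typing(room_id, timeout=0)
```

The try/finally guarantees the indicator is cleared even when `send_to_letta` fails.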
### Step 4: Wrap on_message to use debouncer
Modify the existing `async def on_message(self, evt):` method:
Add this at the beginning (right after the old message checks but before image/audio handling):
```python
# Debounce text messages; commands starting with "!" skip the
# debouncer and fall through to the existing handlers below
if evt.content.msgtype == MessageType.TEXT and not body.startswith("!"):
    # Enqueue for debouncing
    await self.debouncer.enqueue({
        'room_id': room_id,
        'sender': sender,
        'text': body,
        'event': evt,  # Store full event for processing
    })
    return  # Return early - debouncer will handle processing
# Existing image/audio handling continues below...
```
### Full Example: Modified on_message
```python
async def on_message(self, evt):
    """Handle incoming messages (text and images)"""
    # Ignore messages during initial sync.
    if not self.initial_sync_done:
        return

    # Ignore old messages (more than 60 seconds old)
    event_time = datetime.fromtimestamp(evt.timestamp / 1000)
    message_age = datetime.now() - event_time
    if message_age > timedelta(seconds=60):
        log.debug(f"Ignoring old message ({message_age.seconds}s old) from {evt.sender}")
        return

    # Ignore own messages
    if evt.sender == self.user_id:
        return

    room_id = evt.room_id
    sender = evt.sender
    body = evt.content.body

    # Update heartbeat tracker (user is active)
    if self.heartbeat:
        self.heartbeat.update_last_user_message(str(room_id))

    # Handle images (skip debouncing)
    if evt.content.msgtype == MessageType.IMAGE:
        return await self.on_image(evt, room_id, sender, body)

    # Handle audio (skip debouncing)
    if evt.content.msgtype == MessageType.AUDIO:
        return await self.on_audio(evt, room_id, sender, body)

    # Only handle text messages
    if evt.content.msgtype != MessageType.TEXT:
        return

    log.info(f"[{room_id}] {sender}: {body}")

    # Handle bridge commands (skip debouncing)
    if body.startswith("!"):
        cmd = body.strip().lower()
        if cmd in ("!new", "!newconversation", "!fresh", "!reset"):
            await self.reset_conversation(room_id)
            await self.send_message(room_id, "🔄 Fresh conversation started. What's on your mind?")
            return
        # ... other commands ...
        return  # Commands never reach the debouncer

    # DEBOUNCER: Queue remaining text messages
    await self.debouncer.enqueue({
        'room_id': room_id,
        'sender': sender,
        'text': body,
        'event': evt,  # Store full event for processing
    })
    return
```
## Testing
Send messages rapidly (within 2 seconds):
```
User: Hey
User: Are you there?
User: Hello??
```
Result: They'll be combined and sent to Letta as one message:
```
"Hey\n\nAre you there?\n\nHello??"
```
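The combining step is just the double-newline join from `process_batched_messages` above:

```python
# Batch produced by three rapid messages in the same room
messages = [
    {"room_id": "!room", "sender": "@user", "text": "Hey"},
    {"room_id": "!room", "sender": "@user", "text": "Are you there?"},
    {"room_id": "!room", "sender": "@user", "text": "Hello??"},
]

# Same join used in process_batched_messages
combined_text = "\n\n".join(msg["text"] for msg in messages)
print(repr(combined_text))  # 'Hey\n\nAre you there?\n\nHello??'
```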
## Configuration
Adjust the debounce window in `create_message_debouncer()`:
- `debounce_ms=2000` = 2 second window (default)
- `debounce_ms=3000` = 3 second window (for slower users)
- `debounce_ms=1000` = 1 second window (for rapid fire)
- `debounce_ms=0` = DISABLED (process immediately)
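
If you'd rather not hard-code the window, one option (hypothetical — `DEBOUNCE_MS` is not an existing variable in this bridge) is to read it from the environment when constructing the debouncer:

```python
import os

# DEBOUNCE_MS is a hypothetical environment variable;
# 2000 matches the guide's default window.
debounce_ms = int(os.environ.get("DEBOUNCE_MS", "2000"))
```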
## Benefits
1. **Reduces agent overhead**: Batch rapid messages instead of calling Letta multiple times
2. **Better UX**: Combines split thoughts into coherent messages
3. **Lower costs**: Fewer API calls to Letta
4. **More natural**: Matches how humans type (pause, think, continue)