5.9 KiB
5.9 KiB
Debouncer Integration Steps
Quick Setup (3 steps)
Step 1: Add import to bridge-e2ee.py
Add this line near other imports (around line 61):
from debouncer import create_message_debouncer, MessageDebouncer
Step 2: Initialize debouncer in Bridge.init
Add this in the Bridge class init or run method (around line 2650, near heartbeat setup):
# Initialize message debouncer
self.debouncer = create_message_debouncer(
debounce_ms=2000, # 2 second window
on_flush=self.process_batched_messages,
)
Step 3: Add process_batched_messages method
Add this method to the Bridge class (anywhere, but before run()):
async def process_batched_messages(self, messages: list) -> None:
"""Process batched messages from the debouncer"""
if not messages:
return
# Combine all messages
combined_text = "\n\n".join([msg['text'] for msg in messages])
room_id = messages[0]['room_id']
sender = messages[0]['sender']
evt = messages[0]['event'] # Use first event as representative
log.info(f"[Debouncer] Processing {len(messages)} batched messages for {sender}")
log.debug(f"Combined text: {combined_text[:100]}...")
# Call existing message processing logic directly
await self._handle_message_after_debounce(evt, room_id, sender, combined_text)
async def _handle_message_after_debounce(self, evt, room_id, sender, combined_text):
"""Helper to process message after debouncing (bypasses debouncer)"""
# Wrap with send_typing
async with self._send_typing_wrapper(room_id):
# Send to Letta
response, status, step_ids = await asyncio.to_thread(
send_to_letta,
combined_text,
conversation_id=await self.get_or_create_conversation(room_id),
)
# Process response (same as your existing code)
if status == "SUCCESS":
# Handle response with images if any
# Add to conversation cache
await self._add_to_conversation(room_id, sender, response)
# Send response to Matrix
await self.send_message(room_id, response)
elif status == "BUSY":
# Handle busy state
message_queue.append((room_id, sender, combined_text, 0))
await self.send_message(room_id, "⏳ Agent busy, queued")
else:
await self.send_message(room_id, response)
Step 4: Wrap on_message to use debouncer
Modify the existing async def on_message(self, evt): method:
Add this at the beginning (right after the old message checks but before image/audio handling):
# Debounce text messages
if evt.content.msgtype == MessageType.TEXT and not body.startswith("!"):
# Skip debouncing for commands
if body.startswith("!"):
pass # Let commands pass through, they'll be handled later
else:
# Enqueue for debouncing
await self.debouncer.enqueue({
'room_id': room_id,
'sender': sender,
'text': body,
'event': evt, # Store full event for processing
})
return # Return early - debouncer will handle processing
# Existing image/audio handling continues below...
Full Example: Modified on_message
async def on_message(self, evt):
"""Handle incoming messages (text and images)"""
# Ignore messages during initial sync.
if not self.initial_sync_done:
return
# Ignore old messages (more than 60 seconds old)
event_time = datetime.fromtimestamp(evt.timestamp / 1000)
message_age = datetime.now() - event_time
if message_age > timedelta(seconds=60):
log.debug(f"Ignoring old message ({message_age.seconds}s old) from {evt.sender}")
return
# Ignore own messages
if evt.sender == self.user_id:
return
room_id = evt.room_id
sender = evt.sender
body = evt.content.body
# Update heartbeat tracker (user is active)
if self.heartbeat:
self.heartbeat.update_last_user_message(str(room_id))
# Handle images (skip debouncing)
if evt.content.msgtype == MessageType.IMAGE:
return await self.on_image(evt, room_id, sender, body)
# Handle audio (skip debouncing)
if evt.content.msgtype == MessageType.AUDIO:
return await self.on_audio(evt, room_id, sender, body)
# Only handle text messages
if evt.content.msgtype != MessageType.TEXT:
return
log.info(f"[{room_id}] {sender}: {body}")
# Handle bridge commands (skip debouncing)
if body.startswith("!"):
# Command handling logic here...
cmd = body.strip().lower()
if cmd in ("!new", "!newconversation", "!fresh", "!reset"):
await self.reset_conversation(room_id)
await self.send_message(room_id, "🔄 Fresh conversation started. What's on your mind?")
return
# ... other commands ...
# DEBOUNCER: Queue text messages
if evt.content.msgtype == MessageType.TEXT:
await self.debouncer.enqueue({
'room_id': room_id,
'sender': sender,
'text': body,
'event': evt, # Store full event for processing
})
return
Testing
Send messages rapidly (within 2 seconds):
User: Hey
User: Are you there?
User: Hello??
Result: They'll be combined and sent to Letta as one message:
"Hey\n\nAre you there?\n\nHello??"
Configuration
Adjust the debounce window in create_message_debouncer():
debounce_ms=2000= 2 second window (default)debounce_ms=3000= 3 second window (for slower users)debounce_ms=1000= 1 second window (for rapid fire)debounce_ms=0= DISABLED (process immediately)
Benefits
- Reduces agent overhead: Batch rapid messages instead of calling Letta multiple times
- Better UX: Combines split thoughts into coherent messages
- Lower costs: Fewer API calls to Letta
- More natural: Matches how humans type (pause, think, continue)