#!/usr/bin/env python3
"""
Heartbeat Service for Matrix-Letta Bridge

Sends periodic heartbeats to wake the agent up on a schedule.

SILENT MODE: Agent's text output is NOT auto-delivered to Matrix.
The agent must use the `matrix-send-message` MCP tool to contact the user.

Based on lettabot's heartbeat implementation.
"""

import asyncio
import inspect
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Callable, Awaitable

from prompts import build_heartbeat_prompt, build_cron_prompt

# Logger shared by everything in this module.
log = logging.getLogger("meridian.heartbeat")

# Heartbeat events are appended to this JSON-lines file (one JSON object per line).
LOG_PATH = Path("./store/heartbeat-log.jsonl")


@dataclass
class HeartbeatConfig:
    """User-tunable settings for the heartbeat service."""

    # When False, HeartbeatService.start() logs "Disabled" and does nothing.
    enabled: bool = True

    # Minutes between scheduled heartbeats (default: once per hour).
    interval_minutes: int = 60

    # A scheduled heartbeat is skipped when the user sent a message within
    # this many minutes.
    skip_if_recent_minutes: int = 5

    # Prompt sent to the agent instead of the built-in heartbeat prompt.
    # None selects the built-in prompt.
    custom_prompt: Optional[str] = None

    # Room used for proactive messages. None falls back to the room the
    # user was last active in.
    target_room_id: Optional[str] = None


@dataclass
class HeartbeatState:
    """Mutable bookkeeping kept by the heartbeat service while it runs."""

    # --- activity tracking ---
    # When the user last sent a message (None until the first message).
    last_user_message_time: Optional[datetime] = None
    # When the last heartbeat completed successfully.
    last_heartbeat_time: Optional[datetime] = None
    # Room in which the user was last active.
    last_active_room_id: Optional[str] = None

    # --- counters ---
    # Number of heartbeats that completed.
    heartbeat_count: int = 0
    # Number of heartbeats skipped because the user messaged recently.
    skipped_count: int = 0

    # --- pause / execution flags ---
    # True while heartbeats are paused.
    paused: bool = False
    # True while a heartbeat is currently executing.
    is_running: bool = False
    # When the current pause began (None when not paused).
    paused_since: Optional[datetime] = None
    # Who paused the service (user context).
    paused_by: Optional[str] = None


class HeartbeatService:
    """
    Heartbeat Service - Periodic agent wake-ups

    SILENT MODE: Agent's response text is NOT auto-delivered.
    The agent must use MCP tools to send messages proactively.

    A background asyncio task sleeps for ``config.interval_minutes`` and then
    sends a heartbeat prompt to the agent. Heartbeats can be paused, resumed
    and triggered manually; every event is appended to ``LOG_PATH``.
    """

    def __init__(
        self,
        config: HeartbeatConfig,
        send_to_agent: Callable[[str, str], Awaitable],  # Returns LettaResponse
        get_conversation_id: Callable[[str], Awaitable[Optional[str]]],
    ):
        """
        Initialize heartbeat service.

        Args:
            config: Heartbeat configuration
            send_to_agent: Function to send a message to the Letta agent.
                Takes (message, conversation_id) -> LettaResponse. May be a
                coroutine function or a blocking callable; both are handled.
            get_conversation_id: Async function to get conversation ID for a room
        """
        self.config = config
        self.send_to_agent = send_to_agent
        self.get_conversation_id = get_conversation_id
        self.state = HeartbeatState()
        self._task: Optional[asyncio.Task] = None
        self._stop_event = asyncio.Event()
        self._pause_event = asyncio.Event()  # Set while paused
        # Strong references to fire-and-forget heartbeat tasks. The event loop
        # only keeps weak references, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._background_tasks: set = set()

        # Ensure log directory exists
        LOG_PATH.parent.mkdir(parents=True, exist_ok=True)

    def _log_event(self, event: str, data: dict) -> None:
        """
        Log heartbeat event to file and console.

        Writing the file is best-effort: a failure is reported through the
        logger and never propagates to the caller.

        Args:
            event: Event name stored under the ``"event"`` key
            data: Extra key/value pairs merged into the log entry
        """
        entry = {
            "timestamp": datetime.now().isoformat(),
            "event": event,
            **data,
        }

        try:
            with open(LOG_PATH, "a", encoding="utf-8") as f:
                f.write(json.dumps(entry) + "\n")
        except Exception as exc:
            # Event logging must never break a heartbeat, but do not hide
            # the failure completely either.
            log.warning(f"[Heartbeat] Could not write event log: {exc}")

        log.info(f"[Heartbeat] {event}: {data}")

    def update_last_user_message(self, room_id: str) -> None:
        """Call this when a user sends a message"""
        self.state.last_user_message_time = datetime.now()
        self.state.last_active_room_id = room_id

    def start(self) -> None:
        """
        Start the heartbeat timer.

        Does nothing when the service is disabled in the config or when the
        timer task is already running.
        """
        if not self.config.enabled:
            log.info("[Heartbeat] Disabled")
            return

        if self._task and not self._task.done():
            log.info("[Heartbeat] Already running")
            return

        self._stop_event.clear()
        self._task = asyncio.create_task(self._heartbeat_loop())

        log.info(
            f"[Heartbeat] Starting in SILENT MODE "
            f"(every {self.config.interval_minutes} minutes)"
        )
        log.info(
            f"[Heartbeat] First heartbeat in {self.config.interval_minutes} minutes"
        )

        self._log_event("heartbeat_started", {
            "interval_minutes": self.config.interval_minutes,
            "mode": "silent",
            "note": "Agent must use matrix-send-message MCP tool to contact user",
        })

    def stop(self) -> None:
        """Stop the heartbeat timer"""
        self._stop_event.set()
        if self._task:
            self._task.cancel()
            self._task = None
            log.info("[Heartbeat] Stopped")

    def pause(self, by: str = "user") -> str:
        """
        Pause the heartbeat service (keeps timer running but skips heartbeats).

        Args:
            by: Who paused it (for tracking purposes)

        Returns:
            Status message
        """
        if self.state.paused:
            return "⏸️ Heartbeat is already paused"

        self.state.paused = True
        self.state.paused_since = datetime.now()
        self.state.paused_by = by
        # Keep the event in step with state.paused; resume() clears it.
        self._pause_event.set()
        log.info(f"[Heartbeat] Paused by {by}")

        self._log_event("heartbeat_paused", {
            "by": by,
            "paused_since": self.state.paused_since.isoformat(),
        })

        return f"⏸️ Heartbeat paused by {by}"

    def resume(self, by: str = "user", trigger_immediately: bool = False) -> str:
        """
        Resume the heartbeat service.

        Args:
            by: Who resumed it (for tracking purposes)
            trigger_immediately: If True, trigger a heartbeat now. The
                heartbeat runs as a background task, so this method returns
                before it has finished.

        Returns:
            Status message
        """
        if not self.state.paused:
            return "▶️ Heartbeat is not paused"

        self.state.paused = False
        paused_duration = datetime.now() - self.state.paused_since if self.state.paused_since else None
        log.info(f"[Heartbeat] Resumed by {by} (paused for {paused_duration})")
        self._pause_event.clear()

        self._log_event("heartbeat_resumed", {
            "by": by,
            "paused_duration_minutes": paused_duration.total_seconds() / 60 if paused_duration else None,
        })

        # Clear pause timestamps
        self.state.paused_since = None
        self.state.paused_by = None

        if trigger_immediately:
            log.info("[Heartbeat] Triggering heartbeat immediately on resume")
            self._spawn_background_heartbeat()

        return f"▶️ Heartbeat resumed by {by}"

    def _spawn_background_heartbeat(self) -> None:
        """Run one heartbeat as a background task and keep it referenced until done."""
        task = asyncio.create_task(self._run_heartbeat(skip_recent_check=True))
        self._background_tasks.add(task)
        # Drop the reference once the task has finished.
        task.add_done_callback(self._background_tasks.discard)

    async def trigger(self, by: str = "user") -> None:
        """
        Manually trigger a heartbeat (or resume if paused).

        Args:
            by: Who triggered it (for tracking purposes)

        If paused, this resumes the heartbeat and runs immediately.
        """
        if self.state.paused:
            log.info("[Heartbeat] Trigger on paused heartbeat - resuming...")
            self.resume(by=by, trigger_immediately=True)
        else:
            log.info("[Heartbeat] Manual trigger requested")
            await self._run_heartbeat(skip_recent_check=True)

    async def _heartbeat_loop(self) -> None:
        """Main heartbeat loop: sleep one interval, then run a heartbeat, until stopped."""
        interval_seconds = self.config.interval_minutes * 60

        while not self._stop_event.is_set():
            try:
                # Wait for interval (or until stopped)
                await asyncio.wait_for(
                    self._stop_event.wait(),
                    timeout=interval_seconds,
                )
                # If we get here, stop was requested
                break
            except asyncio.TimeoutError:
                # Check if paused - if so, skip this cycle
                if self.state.paused:
                    log.info("[Heartbeat] Skipped (paused)")
                    continue
                # Timeout means it's time for a heartbeat
                await self._run_heartbeat()

    async def _call_agent(self, message: str, conversation_id: str):
        """
        Invoke ``send_to_agent`` whether it is a coroutine function or a blocking callable.

        Args:
            message: Prompt text to send
            conversation_id: Conversation the prompt is sent to

        Returns:
            Whatever ``send_to_agent`` produces (a LettaResponse)
        """
        if inspect.iscoroutinefunction(self.send_to_agent):
            return await self.send_to_agent(message, conversation_id)

        # Blocking callable: run it in a worker thread so the event loop
        # stays responsive while the agent works.
        result = await asyncio.to_thread(
            self.send_to_agent,
            message,
            conversation_id,
        )
        if inspect.isawaitable(result):
            # A plain callable that returned a coroutine (e.g. a wrapper
            # object around an async function): finish it on the loop.
            result = await result
        return result

    async def _run_heartbeat(self, skip_recent_check: bool = False) -> None:
        """
        Run a single heartbeat.

        SILENT MODE: Agent's text output is NOT auto-delivered.
        The agent must use MCP tools to send messages proactively.

        The heartbeat is skipped when another one is still executing, when
        the user messaged recently (unless ``skip_recent_check`` is True),
        when no target room is known, or when the room has no conversation.
        Errors are logged and never propagate to the caller.

        Args:
            skip_recent_check: If True, run even if the user messaged recently
        """
        # Never send two heartbeat prompts concurrently (e.g. a manual
        # trigger arriving while a scheduled heartbeat is still in flight).
        if self.state.is_running:
            log.warning("[Heartbeat] Already executing a heartbeat - skipping overlapping run")
            self._log_event("heartbeat_skipped_already_running", {})
            return

        now = datetime.now()
        formatted_time = now.strftime("%Y-%m-%d %H:%M:%S")
        tz_name = now.astimezone().tzname() or "UTC"

        log.info("")
        log.info("=" * 60)
        log.info(f"[Heartbeat] ⏰ RUNNING at {formatted_time} [SILENT MODE]")
        log.info("=" * 60)
        log.info("")

        # Set right before the try so the finally below always pairs with it.
        self.state.is_running = True
        try:
            # Skip if user sent a message recently (unless manual trigger)
            if not skip_recent_check and self.state.last_user_message_time:
                time_since_last = now - self.state.last_user_message_time
                skip_window = timedelta(minutes=self.config.skip_if_recent_minutes)

                if time_since_last < skip_window:
                    minutes_ago = int(time_since_last.total_seconds() / 60)
                    log.info(
                        f"[Heartbeat] User messaged {minutes_ago}m ago - skipping heartbeat"
                    )
                    self._log_event("heartbeat_skipped_recent_user", {
                        "last_user_message": self.state.last_user_message_time.isoformat(),
                        "minutes_ago": minutes_ago,
                    })
                    self.state.skipped_count += 1
                    return

            # Get target room and conversation
            target_room = self.config.target_room_id or self.state.last_active_room_id

            if not target_room:
                log.warning("[Heartbeat] No target room - skipping (no user has messaged yet)")
                self._log_event("heartbeat_skipped_no_room", {})
                return

            conversation_id = await self.get_conversation_id(target_room)
            if not conversation_id:
                log.warning(f"[Heartbeat] No conversation for room {target_room} - skipping")
                self._log_event("heartbeat_skipped_no_conversation", {
                    "room_id": target_room,
                })
                return

            log.info(f"[Heartbeat] Sending heartbeat to agent...")
            log.info(f"[Heartbeat] Target room: {target_room}")

            self._log_event("heartbeat_running", {
                "time": now.isoformat(),
                "mode": "silent",
                "target_room": target_room,
            })

            # Build the heartbeat message
            if self.config.custom_prompt:
                message = self.config.custom_prompt
            else:
                message = build_heartbeat_prompt(
                    formatted_time,
                    tz_name,
                    self.config.interval_minutes,
                    target_room,
                )

            log.info(f"[Heartbeat] Sending prompt (SILENT MODE):")
            log.info("-" * 50)
            prompt_lines = message.split("\n")
            for line in prompt_lines[:10]:  # Show first 10 lines
                log.info(f" {line}")
            if len(prompt_lines) > 10:
                # Only mark truncation when lines were actually left out.
                log.info(" ...")
            log.info("-" * 50)

            # Send to agent - response text is NOT delivered (silent mode)
            # Agent must use MCP tools to send messages
            letta_response = await self._call_agent(message, conversation_id)

            # Extract from LettaResponse object
            response = letta_response.assistant_text
            status = letta_response.status

            # Log results
            log.info(f"[Heartbeat] Agent finished.")
            log.info(f" - Status: {status}")
            log.info(f" - Response text: {len(response) if response else 0} chars (NOT delivered - silent mode)")

            if response and response.strip():
                preview = response[:100] + ("..." if len(response) > 100 else "")
                log.info(f" - Response preview: \"{preview}\"")

            self.state.last_heartbeat_time = now
            self.state.heartbeat_count += 1

            self._log_event("heartbeat_completed", {
                "mode": "silent",
                "response_length": len(response) if response else 0,
                "status": status,
                "heartbeat_count": self.state.heartbeat_count,
            })

        except Exception as e:
            # log.exception keeps the traceback, which log.error dropped.
            log.exception(f"[Heartbeat] Error: {e}")
            self._log_event("heartbeat_error", {
                "error": str(e),
            })
        finally:
            # Always reset running flag when done
            self.state.is_running = False

    def get_status(self) -> dict:
        """
        Get heartbeat service status.

        Returns:
            Dict of config values, counters and timestamps. Timestamps are
            ISO-8601 strings, or None when not set.
        """
        return {
            "enabled": self.config.enabled,
            "running": self._task is not None and not self._task.done(),
            "paused": self.state.paused,
            "is_running": self.state.is_running,  # Currently executing a heartbeat
            "interval_minutes": self.config.interval_minutes,
            "skip_if_recent_minutes": self.config.skip_if_recent_minutes,
            "heartbeat_count": self.state.heartbeat_count,
            "skipped_count": self.state.skipped_count,
            "last_heartbeat": self.state.last_heartbeat_time.isoformat() if self.state.last_heartbeat_time else None,
            "last_user_message": self.state.last_user_message_time.isoformat() if self.state.last_user_message_time else None,
            "last_active_room": self.state.last_active_room_id,
            "paused_since": self.state.paused_since.isoformat() if self.state.paused_since else None,
            "paused_by": self.state.paused_by,
        }