Files
Redflag/aggregator-server/internal/scheduler/queue.go
Fimeg bf4d46529f feat: add resilience and reliability features for agent subsystems
Added circuit breakers with configurable timeouts for all subsystems (APT, DNF, Docker, Windows, Winget, Storage). Replaces cron-based scheduler with priority queue that should scale beyond 1000+ agents if your homelab is that big.

Command acknowledgment system ensures results aren't lost on network failures or restarts. Agent tracks pending acknowledgments with persistent state and automatic retry.

- Circuit breakers: 3 failures in 1min opens circuit, 30s cooldown
- Per-subsystem timeouts: 30s-10min depending on scanner
- Priority queue scheduler: O(log n), worker pool, jitter, backpressure
- Acknowledgments: at-least-once delivery, max 10 retries over 24h
- All tests passing (26/26)
2025-11-01 18:42:41 -04:00

287 lines
6.4 KiB
Go

package scheduler
import (
	"container/heap"
	"fmt"
	"sort"
	"sync"
	"time"

	"github.com/google/uuid"
)
// SubsystemJob represents a scheduled subsystem scan for a single agent.
// A job is identified by (AgentID, Subsystem); NextRunAt determines its
// priority in the queue (earliest first).
type SubsystemJob struct {
	AgentID         uuid.UUID
	AgentHostname   string // For logging/debugging only; not part of the queue key
	Subsystem       string // Scanner name (presumably e.g. "apt", "docker" — confirm against callers)
	IntervalMinutes int    // Rescan interval; callers use it to compute the next NextRunAt
	NextRunAt       time.Time // Queue priority: earliest runs first
	Enabled         bool
	index           int // Heap index (managed by heap.Interface); -1 once removed from the heap
}
// String returns a human-readable representation of the job.
func (j *SubsystemJob) String() string {
	next := j.NextRunAt.Format("15:04:05")
	return fmt.Sprintf("[%s/%s] next_run=%s interval=%dm",
		j.AgentHostname, j.Subsystem, next, j.IntervalMinutes)
}
// jobHeap implements heap.Interface for SubsystemJob priority queue.
// Jobs are ordered by NextRunAt (earliest first).
type jobHeap []*SubsystemJob

// Len reports the number of jobs currently stored in the heap.
func (h jobHeap) Len() int { return len(h) }

// Less orders jobs by NextRunAt ascending so the earliest-due job is at
// the heap root. Equal timestamps compare as "not less" (ties break arbitrarily).
func (h jobHeap) Less(i, j int) bool {
	return h[i].NextRunAt.Before(h[j].NextRunAt)
}
// Swap exchanges the jobs at slots i and j, keeping each job's cached
// heap index in sync with its new position.
func (h jobHeap) Swap(i, j int) {
	h[j], h[i] = h[i], h[j]
	h[i].index, h[j].index = i, j
}
// Push appends x to the heap. It is called by container/heap, never directly.
func (h *jobHeap) Push(x interface{}) {
	job := x.(*SubsystemJob)
	job.index = len(*h)
	*h = append(*h, job)
}
// Pop removes and returns the final element. It is called by
// container/heap (which first moves the root to the end), never directly.
func (h *jobHeap) Pop() interface{} {
	old := *h
	last := len(old) - 1
	job := old[last]
	old[last] = nil // Drop the reference so the GC can reclaim the job
	job.index = -1  // Mark the job as no longer resident in the heap
	*h = old[:last]
	return job
}
// PriorityQueue is a thread-safe priority queue for subsystem jobs.
// Jobs are ordered by their NextRunAt timestamp (earliest first).
// All exported methods take mu, so the zero value is NOT usable;
// construct with NewPriorityQueue.
type PriorityQueue struct {
	heap jobHeap      // Binary min-heap keyed on NextRunAt
	mu   sync.RWMutex // Guards both heap and index
	// Index for fast O(1) lookups by agent_id + subsystem
	index map[string]*SubsystemJob // key: "agent_id:subsystem" (see makeKey)
}
// NewPriorityQueue creates a new empty priority queue.
func NewPriorityQueue() *PriorityQueue {
	q := &PriorityQueue{
		index: make(map[string]*SubsystemJob),
		heap:  make(jobHeap, 0),
	}
	// Establish the heap invariant (a no-op on an empty slice, but explicit).
	heap.Init(&q.heap)
	return q
}
// Push adds a job to the queue.
// If a job with the same agent_id + subsystem already exists, it is
// updated in place and re-sifted to its correct heap position.
func (pq *PriorityQueue) Push(job *SubsystemJob) {
	pq.mu.Lock()
	defer pq.mu.Unlock()

	key := makeKey(job.AgentID, job.Subsystem)
	existing, found := pq.index[key]
	if !found {
		// First sighting of this (agent, subsystem): insert fresh.
		heap.Push(&pq.heap, job)
		pq.index[key] = job
		return
	}

	// Already scheduled: copy the mutable fields onto the tracked job,
	// then restore heap order around its (possibly changed) NextRunAt.
	existing.NextRunAt = job.NextRunAt
	existing.IntervalMinutes = job.IntervalMinutes
	existing.Enabled = job.Enabled
	existing.AgentHostname = job.AgentHostname
	heap.Fix(&pq.heap, existing.index)
}
// Pop removes and returns the job with the earliest NextRunAt.
// Returns nil if the queue is empty.
func (pq *PriorityQueue) Pop() *SubsystemJob {
	pq.mu.Lock()
	defer pq.mu.Unlock()

	if len(pq.heap) == 0 {
		return nil
	}
	job := heap.Pop(&pq.heap).(*SubsystemJob)
	// Keep the lookup index consistent with the heap.
	delete(pq.index, makeKey(job.AgentID, job.Subsystem))
	return job
}
// Peek returns the job with the earliest NextRunAt without removing it.
// Returns nil if the queue is empty.
func (pq *PriorityQueue) Peek() *SubsystemJob {
	pq.mu.RLock()
	defer pq.mu.RUnlock()

	if len(pq.heap) > 0 {
		return pq.heap[0] // heap root == earliest NextRunAt
	}
	return nil
}
// Remove removes a specific job from the queue.
// Returns true if the job was found and removed, false otherwise.
func (pq *PriorityQueue) Remove(agentID uuid.UUID, subsystem string) bool {
	pq.mu.Lock()
	defer pq.mu.Unlock()

	key := makeKey(agentID, subsystem)
	if job, found := pq.index[key]; found {
		heap.Remove(&pq.heap, job.index)
		delete(pq.index, key)
		return true
	}
	return false
}
// Get retrieves a specific job without removing it.
// Returns nil if not found. Note the returned pointer aliases the queued
// job, which Push may later mutate under the queue's lock.
func (pq *PriorityQueue) Get(agentID uuid.UUID, subsystem string) *SubsystemJob {
	pq.mu.RLock()
	defer pq.mu.RUnlock()
	return pq.index[makeKey(agentID, subsystem)]
}
// PopBefore returns all jobs with NextRunAt <= before, up to limit.
// The returned jobs are removed from the queue, earliest first.
// If limit <= 0, all matching jobs are returned.
func (pq *PriorityQueue) PopBefore(before time.Time, limit int) []*SubsystemJob {
	pq.mu.Lock()
	defer pq.mu.Unlock()

	var due []*SubsystemJob
	for len(pq.heap) > 0 {
		// The heap root is the earliest job; once it is past the cutoff,
		// every remaining job is too.
		if pq.heap[0].NextRunAt.After(before) {
			break
		}
		if limit > 0 && len(due) >= limit {
			break
		}
		job := heap.Pop(&pq.heap).(*SubsystemJob)
		delete(pq.index, makeKey(job.AgentID, job.Subsystem))
		due = append(due, job)
	}
	return due
}
// PeekBefore returns all jobs with NextRunAt <= before without removing
// them, sorted earliest first. If limit <= 0, all matching jobs are
// returned; otherwise only the earliest `limit` jobs.
//
// BUG FIX: the previous implementation iterated the heap's backing slice
// and broke at the first element after the cutoff. A binary heap's array
// is only partially ordered (just heap[0] is guaranteed minimal), so that
// early break skipped eligible jobs, and with a limit the returned jobs
// were not necessarily the earliest. We now scan every element, then sort.
func (pq *PriorityQueue) PeekBefore(before time.Time, limit int) []*SubsystemJob {
	pq.mu.RLock()
	defer pq.mu.RUnlock()

	// Collect every job at or before the cutoff; heap array order is
	// arbitrary beyond the root, so the full slice must be examined.
	var jobs []*SubsystemJob
	for _, job := range pq.heap {
		if !job.NextRunAt.After(before) {
			jobs = append(jobs, job)
		}
	}
	// Present results earliest-first, matching PopBefore's ordering.
	sort.Slice(jobs, func(i, j int) bool {
		return jobs[i].NextRunAt.Before(jobs[j].NextRunAt)
	})
	if limit > 0 && len(jobs) > limit {
		jobs = jobs[:limit]
	}
	return jobs
}
// Len returns the number of jobs in the queue.
func (pq *PriorityQueue) Len() int {
	pq.mu.RLock()
	n := len(pq.heap)
	pq.mu.RUnlock()
	return n
}
// Clear removes all jobs from the queue, discarding both the heap and
// the lookup index.
func (pq *PriorityQueue) Clear() {
	pq.mu.Lock()
	defer pq.mu.Unlock()

	pq.index = make(map[string]*SubsystemJob)
	pq.heap = make(jobHeap, 0)
	heap.Init(&pq.heap) // No-op on an empty heap, kept for symmetry with NewPriorityQueue
}
// GetStats returns a point-in-time snapshot of queue statistics.
//
// BUG FIX: the previous implementation set stats.NextRunAt to
// &pq.heap[0].NextRunAt — a pointer into the live job. After the RLock
// was released, Push could mutate that field under the write lock,
// making the "snapshot" racy and mutable. We now copy the time value.
func (pq *PriorityQueue) GetStats() QueueStats {
	pq.mu.RLock()
	defer pq.mu.RUnlock()

	stats := QueueStats{
		Size:            pq.heap.Len(),
		JobsBySubsystem: make(map[string]int, len(pq.heap)),
	}
	if pq.heap.Len() > 0 {
		// Copy the timestamp so the returned stats don't alias job state.
		next := pq.heap[0].NextRunAt
		stats.NextRunAt = &next
		stats.OldestJob = pq.heap[0].String()
	}
	// Count jobs by subsystem.
	for _, job := range pq.heap {
		stats.JobsBySubsystem[job.Subsystem]++
	}
	return stats
}
// QueueStats holds statistics about the priority queue at the moment
// GetStats was called.
type QueueStats struct {
	Size            int            // Total number of queued jobs
	NextRunAt       *time.Time     // Earliest scheduled run; nil when the queue is empty
	OldestJob       string         // String() of the earliest-due job; empty when queue is empty
	JobsBySubsystem map[string]int // Job count keyed by subsystem name
}
// String returns a human-readable representation of stats.
func (s QueueStats) String() string {
	next := "empty"
	if s.NextRunAt != nil {
		next = s.NextRunAt.Format("15:04:05")
	}
	return fmt.Sprintf("size=%d next=%s oldest=%s", s.Size, next, s.OldestJob)
}
// makeKey creates the unique index key for an agent_id + subsystem pair.
func makeKey(agentID uuid.UUID, subsystem string) string {
	return fmt.Sprintf("%s:%s", agentID.String(), subsystem)
}