Files
Fimeg bf4d46529f feat: add resilience and reliability features for agent subsystems
Added circuit breakers with configurable timeouts for all subsystems (APT, DNF, Docker, Windows, Winget, Storage). Replaced the cron-based scheduler with a priority queue that should scale beyond 1,000 agents, if your homelab is that big.

Command acknowledgment system ensures results aren't lost on network failures or restarts. Agent tracks pending acknowledgments with persistent state and automatic retry.

- Circuit breakers: 3 failures in 1min opens circuit, 30s cooldown
- Per-subsystem timeouts: 30s-10min depending on scanner
- Priority queue scheduler: O(log n), worker pool, jitter, backpressure
- Acknowledgments: at-least-once delivery, max 10 retries over 24h
- All tests passing (26/26)
2025-11-01 18:42:41 -04:00

194 lines
4.0 KiB
Go

package acknowledgment
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"
)
// PendingResult is a single command result that has been sent but not yet
// acknowledged. Tracker.Save writes these to disk as JSON, keyed by command ID.
type PendingResult struct {
	// CommandID identifies the command whose result awaits acknowledgment.
	CommandID string `json:"command_id"`

	// SentAt is set by Tracker.Add when the entry is created; retries do not
	// reset it, so Cleanup's age limit counts from the first send.
	SentAt time.Time `json:"sent_at"`

	// RetryCount is the number of times Tracker.IncrementRetry has bumped it.
	RetryCount int `json:"retry_count"`
}
// Tracker holds the set of command results that are awaiting acknowledgment,
// keyed by command ID, and can persist that set to a JSON file.
//
// Every method takes mu before touching pending, so a *Tracker may be shared
// between goroutines. Build one with NewTracker: the zero value has a nil
// map and zero limits.
type Tracker struct {
	// pending maps a command ID to its outstanding result.
	pending map[string]*PendingResult

	// mu guards pending.
	mu sync.RWMutex

	// filePath is the JSON state file read by Load and written by Save.
	filePath string

	// maxAge is how long an entry may stay pending before Cleanup drops it
	// (NewTracker sets 24h).
	maxAge time.Duration

	// maxRetries is the retry count at which Cleanup drops an entry
	// (NewTracker sets 10).
	maxRetries int
}
// NewTracker returns an empty Tracker whose state file is
// "pending_acks.json" inside statePath. Entries expire after 24 hours or 10
// retries. Nothing is read from disk here; call Load to restore saved state.
func NewTracker(statePath string) *Tracker {
	tr := &Tracker{
		maxAge:     24 * time.Hour,
		maxRetries: 10,
	}
	tr.pending = make(map[string]*PendingResult)
	tr.filePath = filepath.Join(statePath, "pending_acks.json")
	return tr
}
// Load restores pending acknowledgments from the state file on disk,
// replacing whatever the tracker currently holds in memory.
//
// A missing or empty file is treated as a fresh start: Load returns nil and
// leaves the in-memory state unchanged. A JSON `null` document produces an
// empty, non-nil set, and `null` entries are discarded. As a result, later
// calls such as Add, IncrementRetry, Cleanup and Stats never see a nil map
// or a nil *PendingResult. On a read or parse error the in-memory state is
// left untouched and the wrapped error is returned.
func (t *Tracker) Load() error {
	t.mu.Lock()
	defer t.mu.Unlock()

	// Read directly and inspect the error, rather than Stat-then-Read, so
	// the file cannot appear or vanish between the two calls.
	data, err := os.ReadFile(t.filePath)
	if err != nil {
		if os.IsNotExist(err) {
			return nil // Fresh start
		}
		return fmt.Errorf("failed to read pending acks: %w", err)
	}
	if len(data) == 0 {
		return nil // Empty file
	}

	var decoded map[string]*PendingResult
	if err := json.Unmarshal(data, &decoded); err != nil {
		return fmt.Errorf("failed to parse pending acks: %w", err)
	}

	// json.Unmarshal leaves the map nil for a `null` document and stores nil
	// pointers for `null` values; build a clean, non-nil map without them.
	pending := make(map[string]*PendingResult, len(decoded))
	for id, result := range decoded {
		if result == nil {
			continue
		}
		pending[id] = result
	}
	t.pending = pending
	return nil
}
// Save persists the pending acknowledgments to the tracker's state file as
// indented JSON, creating the parent directory (0755) if needed.
//
// The data is written to a temporary file in the same directory, flushed
// with Sync, and then renamed over the target. On POSIX filesystems the
// rename is atomic, so a crash mid-save leaves either the previous complete
// file or the new one, never a truncated file that Load cannot parse. The
// temp file is created with mode 0600, so the final file is 0600 as well.
// Any temp file left by a failed attempt is removed.
func (t *Tracker) Save() error {
	t.mu.RLock()
	defer t.mu.RUnlock()

	// Ensure directory exists
	dir := filepath.Dir(t.filePath)
	if err := os.MkdirAll(dir, 0755); err != nil {
		return fmt.Errorf("failed to create ack directory: %w", err)
	}

	data, err := json.MarshalIndent(t.pending, "", " ")
	if err != nil {
		return fmt.Errorf("failed to marshal pending acks: %w", err)
	}

	// The temp file must live in the same directory so the rename stays on
	// one filesystem.
	tmp, err := os.CreateTemp(dir, filepath.Base(t.filePath)+".*.tmp")
	if err != nil {
		return fmt.Errorf("failed to create temp file for pending acks: %w", err)
	}
	tmpPath := tmp.Name()

	renamed := false
	defer func() {
		if !renamed {
			_ = tmp.Close()        // may already be closed; error irrelevant here
			_ = os.Remove(tmpPath) // best-effort cleanup of the failed attempt
		}
	}()

	if _, err := tmp.Write(data); err != nil {
		return fmt.Errorf("failed to write pending acks: %w", err)
	}
	// Flush contents to storage before the rename publishes the file.
	if err := tmp.Sync(); err != nil {
		return fmt.Errorf("failed to write pending acks: %w", err)
	}
	if err := tmp.Close(); err != nil {
		return fmt.Errorf("failed to write pending acks: %w", err)
	}
	if err := os.Rename(tmpPath, t.filePath); err != nil {
		return fmt.Errorf("failed to write pending acks: %w", err)
	}
	renamed = true
	return nil
}
// Add records commandID as awaiting acknowledgment, stamped with the current
// time and zero retries. If the ID is already pending, its entry is replaced,
// which restarts both its age and its retry count.
func (t *Tracker) Add(commandID string) {
	t.mu.Lock()
	defer t.mu.Unlock()

	t.pending[commandID] = &PendingResult{CommandID: commandID, SentAt: time.Now()}
}
// Acknowledge removes every listed command ID from the pending set. IDs that
// are not currently pending are silently ignored.
func (t *Tracker) Acknowledge(commandIDs []string) {
	t.mu.Lock()
	defer t.mu.Unlock()

	for i := range commandIDs {
		delete(t.pending, commandIDs[i])
	}
}
// GetPending returns a snapshot of the command IDs still awaiting
// acknowledgment. The result is a fresh, non-nil slice in unspecified (map
// iteration) order; it is empty when nothing is pending.
func (t *Tracker) GetPending() []string {
	t.mu.RLock()
	defer t.mu.RUnlock()

	out := make([]string, len(t.pending))
	i := 0
	for id := range t.pending {
		out[i] = id
		i++
	}
	return out
}
// IncrementRetry adds one to the retry count of a pending command. It does
// nothing when commandID is not pending.
func (t *Tracker) IncrementRetry(commandID string) {
	t.mu.Lock()
	defer t.mu.Unlock()

	entry, ok := t.pending[commandID]
	if !ok {
		return
	}
	entry.RetryCount++
}
// Cleanup drops every pending entry that has either been pending longer than
// maxAge (measured from SentAt) or reached maxRetries retries. It returns
// how many entries were dropped.
func (t *Tracker) Cleanup() int {
	t.mu.Lock()
	defer t.mu.Unlock()

	now := time.Now()
	dropped := 0
	for id, entry := range t.pending {
		tooOld := now.Sub(entry.SentAt) > t.maxAge
		tooManyRetries := entry.RetryCount >= t.maxRetries
		if tooOld || tooManyRetries {
			// Deleting during range is safe in Go.
			delete(t.pending, id)
			dropped++
		}
	}
	return dropped
}
// Stats summarizes the pending set. It reports the total count, how many
// entries were first sent more than an hour ago, how many have been retried
// at least once, and how many have been retried five or more times.
func (t *Tracker) Stats() Stats {
	t.mu.RLock()
	defer t.mu.RUnlock()

	summary := Stats{Total: len(t.pending)}
	now := time.Now()
	for _, entry := range t.pending {
		if now.Sub(entry.SentAt) > time.Hour {
			summary.OlderThan1Hour++
		}
		// A high-retry entry also counts toward WithRetries.
		switch {
		case entry.RetryCount >= 5:
			summary.WithRetries++
			summary.HighRetries++
		case entry.RetryCount > 0:
			summary.WithRetries++
		}
	}
	return summary
}
// Stats is a point-in-time summary of the pending acknowledgments, as
// produced by Tracker.Stats.
type Stats struct {
	// Total is the number of pending entries.
	Total int

	// OlderThan1Hour counts entries first sent more than an hour ago.
	OlderThan1Hour int

	// WithRetries counts entries retried at least once.
	WithRetries int

	// HighRetries counts entries retried five or more times.
	HighRetries int
}