feat: granular subsystem commands with parallel scanner execution

Split monolithic scan_updates into individual subsystems (updates/storage/system/docker).
Scanners now run in parallel via goroutines, so total scan time is bounded by the slowest scanner rather than the sum of all scanners — roughly half the previous sequential time, often better.

Agent changes:
- Orchestrator pattern for scanner management
- New scanners: storage (disk metrics), system (cpu/mem/processes)
- New commands: scan_storage, scan_system, scan_docker
- Wrapped existing scanners (APT/DNF/Docker/Windows/Winget) with common interface
- Version bump to 0.1.20

Server changes:
- Migration 015: agent_subsystems table with trigger for auto-init
- Subsystem CRUD: enable/disable, interval (5min-24hr), auto-run toggle
- API routes: /api/v1/agents/:id/subsystems/* (9 endpoints)
- Stats tracking per subsystem

Web UI changes:
- ChatTimeline shows subsystem-specific labels and icons
- AgentScanners got interactive toggles, interval dropdowns, manual trigger buttons
- TypeScript types added for subsystems

Remains backward compatible with the legacy scan_updates command; legacy support is slated for removal in a later release. Edge cases are not yet fully tested — expect follow-up fixes.
This commit is contained in:
Fimeg
2025-11-01 20:34:00 -04:00
parent bf4d46529f
commit 3690472396
19 changed files with 2151 additions and 253 deletions

View File

@@ -19,6 +19,7 @@ import (
"github.com/Fimeg/RedFlag/aggregator-agent/internal/config"
"github.com/Fimeg/RedFlag/aggregator-agent/internal/display"
"github.com/Fimeg/RedFlag/aggregator-agent/internal/installer"
"github.com/Fimeg/RedFlag/aggregator-agent/internal/orchestrator"
"github.com/Fimeg/RedFlag/aggregator-agent/internal/scanner"
"github.com/Fimeg/RedFlag/aggregator-agent/internal/service"
"github.com/Fimeg/RedFlag/aggregator-agent/internal/system"
@@ -26,7 +27,7 @@ import (
)
const (
AgentVersion = "0.1.19" // Phase 0: Circuit breakers, timeouts, and subsystem resilience
AgentVersion = "0.1.20" // Phase 1: Granular subsystem commands and parallel scanner execution
)
// getConfigPath returns the platform-specific config path
@@ -466,6 +467,37 @@ func runAgent(cfg *config.Config) error {
HalfOpenAttempts: cfg.Subsystems.Winget.CircuitBreaker.HalfOpenAttempts,
})
// Initialize scanner orchestrator for parallel execution and granular subsystem management
scanOrchestrator := orchestrator.NewOrchestrator()
// Register update scanners
scanOrchestrator.RegisterScanner("apt", orchestrator.NewAPTScannerWrapper(aptScanner), aptCB, cfg.Subsystems.APT.Timeout, cfg.Subsystems.APT.Enabled)
scanOrchestrator.RegisterScanner("dnf", orchestrator.NewDNFScannerWrapper(dnfScanner), dnfCB, cfg.Subsystems.DNF.Timeout, cfg.Subsystems.DNF.Enabled)
scanOrchestrator.RegisterScanner("docker", orchestrator.NewDockerScannerWrapper(dockerScanner), dockerCB, cfg.Subsystems.Docker.Timeout, cfg.Subsystems.Docker.Enabled)
scanOrchestrator.RegisterScanner("windows", orchestrator.NewWindowsUpdateScannerWrapper(windowsUpdateScanner), windowsCB, cfg.Subsystems.Windows.Timeout, cfg.Subsystems.Windows.Enabled)
scanOrchestrator.RegisterScanner("winget", orchestrator.NewWingetScannerWrapper(wingetScanner), wingetCB, cfg.Subsystems.Winget.Timeout, cfg.Subsystems.Winget.Enabled)
// Register storage and system scanners
storageScanner := orchestrator.NewStorageScanner(AgentVersion)
systemScanner := orchestrator.NewSystemScanner(AgentVersion)
// Storage and system scanners don't need circuit breakers (always available, fast operations)
storageCB := circuitbreaker.New("Storage", circuitbreaker.Config{
FailureThreshold: 5,
FailureWindow: 10 * time.Minute,
OpenDuration: 5 * time.Minute,
HalfOpenAttempts: 1,
})
systemCB := circuitbreaker.New("System", circuitbreaker.Config{
FailureThreshold: 5,
FailureWindow: 10 * time.Minute,
OpenDuration: 5 * time.Minute,
HalfOpenAttempts: 1,
})
scanOrchestrator.RegisterScanner("storage", storageScanner, storageCB, 30*time.Second, cfg.Subsystems.Storage.Enabled)
scanOrchestrator.RegisterScanner("system", systemScanner, systemCB, 30*time.Second, true) // System scanner always enabled
// Initialize acknowledgment tracker for command result reliability
ackTracker := acknowledgment.NewTracker(getStatePath())
if err := ackTracker.Load(); err != nil {
@@ -610,10 +642,25 @@ func runAgent(cfg *config.Config) error {
switch cmd.Type {
case "scan_updates":
if err := handleScanUpdates(apiClient, cfg, ackTracker, aptScanner, dnfScanner, dockerScanner, windowsUpdateScanner, wingetScanner, aptCB, dnfCB, dockerCB, windowsCB, wingetCB, cmd.ID); err != nil {
if err := handleScanUpdatesV2(apiClient, cfg, ackTracker, scanOrchestrator, cmd.ID); err != nil {
log.Printf("Error scanning updates: %v\n", err)
}
case "scan_storage":
if err := handleScanStorage(apiClient, cfg, ackTracker, scanOrchestrator, cmd.ID); err != nil {
log.Printf("Error scanning storage: %v\n", err)
}
case "scan_system":
if err := handleScanSystem(apiClient, cfg, ackTracker, scanOrchestrator, cmd.ID); err != nil {
log.Printf("Error scanning system: %v\n", err)
}
case "scan_docker":
if err := handleScanDocker(apiClient, cfg, ackTracker, scanOrchestrator, cmd.ID); err != nil {
log.Printf("Error scanning Docker: %v\n", err)
}
case "collect_specs":
log.Println("Spec collection not yet implemented")

View File

@@ -0,0 +1,232 @@
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/Fimeg/RedFlag/aggregator-agent/internal/acknowledgment"
"github.com/Fimeg/RedFlag/aggregator-agent/internal/client"
"github.com/Fimeg/RedFlag/aggregator-agent/internal/config"
"github.com/Fimeg/RedFlag/aggregator-agent/internal/orchestrator"
)
// handleScanUpdatesV2 scans all update subsystems (APT, DNF, Docker, Windows
// Update, Winget) in parallel through the scanner orchestrator, reports a scan
// log entry, and reports any discovered updates to the server.
// This is the orchestrator-based replacement (v0.1.20) for the legacy
// handleScanUpdates path. Returns an error only when reporting updates fails;
// a failed log report is logged and tolerated.
func handleScanUpdatesV2(apiClient *client.Client, cfg *config.Config, ackTracker *acknowledgment.Tracker, orch *orchestrator.Orchestrator, commandID string) error {
	log.Println("Scanning for updates (parallel execution)...")
	ctx := context.Background()
	startTime := time.Now()
	// Execute all registered update scanners concurrently; the orchestrator
	// applies each scanner's circuit breaker and timeout.
	results, allUpdates := orch.ScanAll(ctx)
	// Summarize per-scanner results into stdout/stderr/exit-code form.
	stdout, stderr, exitCode := orchestrator.FormatScanSummary(results)
	// Append wall-clock timing so the server-side log shows total duration.
	duration := time.Since(startTime)
	stdout += fmt.Sprintf("\nScan completed in %.2f seconds\n", duration.Seconds())
	// Map the exit code to the server's success/failure vocabulary.
	// (Clearer and cheaper than the map[bool]string{...}[cond] lookup idiom,
	// which allocates a map on every call.)
	status := "failure"
	if exitCode == 0 {
		status = "success"
	}
	logReport := client.LogReport{
		CommandID:       commandID,
		Action:          "scan_updates",
		Result:          status,
		Stdout:          stdout,
		Stderr:          stderr,
		ExitCode:        exitCode,
		DurationSeconds: int(duration.Seconds()),
	}
	// Best-effort log delivery: the update payload matters more than the log entry.
	if err := reportLogWithAck(apiClient, cfg, ackTracker, logReport); err != nil {
		log.Printf("Failed to report scan log: %v\n", err)
	}
	// Report updates to the server only when at least one scanner found something.
	if len(allUpdates) == 0 {
		log.Println("No updates found")
		return nil
	}
	report := client.UpdateReport{
		CommandID: commandID,
		Timestamp: time.Now(),
		Updates:   allUpdates,
	}
	if err := apiClient.ReportUpdates(cfg.AgentID, report); err != nil {
		return fmt.Errorf("failed to report updates: %w", err)
	}
	log.Printf("✓ Reported %d updates to server\n", len(allUpdates))
	return nil
}
// singleScanSpec captures the wording that varies between the per-subsystem
// scan handlers; the shared control flow lives in runSingleSubsystemScan.
type singleScanSpec struct {
	subsystem   string          // orchestrator scanner key ("storage", "system", "docker")
	action      string          // scan-log action name reported to the server
	scanLabel   string          // label used in scan-failure errors ("storage", "Docker", ...)
	doneLabel   string          // capitalized label for the timing summary line
	reportLabel string          // label used in report-failure errors
	successLog  func(count int) // logged after results are successfully reported
	emptyLog    string          // logged when no results were found; "" suppresses the log
}

// runSingleSubsystemScan executes one named scanner through the orchestrator,
// reports the scan log (best-effort), and — when the scanner produced results —
// reports them to the server via the generic updates endpoint. All three
// single-subsystem handlers share this flow and differ only in wording.
func runSingleSubsystemScan(apiClient *client.Client, cfg *config.Config, ackTracker *acknowledgment.Tracker, orch *orchestrator.Orchestrator, commandID string, spec singleScanSpec) error {
	ctx := context.Background()
	startTime := time.Now()
	// Execute the single scanner; the orchestrator applies its circuit
	// breaker and timeout.
	result, err := orch.ScanSingle(ctx, spec.subsystem)
	if err != nil {
		return fmt.Errorf("failed to scan %s: %w", spec.scanLabel, err)
	}
	// Summarize the single result into stdout/stderr/exit-code form.
	stdout, stderr, exitCode := orchestrator.FormatScanSummary([]orchestrator.ScanResult{result})
	duration := time.Since(startTime)
	stdout += fmt.Sprintf("\n%s scan completed in %.2f seconds\n", spec.doneLabel, duration.Seconds())
	// Map the exit code to the server's success/failure vocabulary.
	status := "failure"
	if exitCode == 0 {
		status = "success"
	}
	logReport := client.LogReport{
		CommandID:       commandID,
		Action:          spec.action,
		Result:          status,
		Stdout:          stdout,
		Stderr:          stderr,
		ExitCode:        exitCode,
		DurationSeconds: int(duration.Seconds()),
	}
	// Best-effort: a failed log report must not block reporting the data itself.
	if err := reportLogWithAck(apiClient, cfg, ackTracker, logReport); err != nil {
		log.Printf("Failed to report scan log: %v\n", err)
	}
	if len(result.Updates) == 0 {
		if spec.emptyLog != "" {
			log.Println(spec.emptyLog)
		}
		return nil
	}
	// Subsystem results (disk metrics, system metrics, Docker image updates)
	// are reported through the same updates endpoint as package updates.
	report := client.UpdateReport{
		CommandID: commandID,
		Timestamp: time.Now(),
		Updates:   result.Updates,
	}
	if err := apiClient.ReportUpdates(cfg.AgentID, report); err != nil {
		return fmt.Errorf("failed to report %s: %w", spec.reportLabel, err)
	}
	spec.successLog(len(result.Updates))
	return nil
}

// handleScanStorage scans disk usage metrics only.
func handleScanStorage(apiClient *client.Client, cfg *config.Config, ackTracker *acknowledgment.Tracker, orch *orchestrator.Orchestrator, commandID string) error {
	log.Println("Scanning storage...")
	return runSingleSubsystemScan(apiClient, cfg, ackTracker, orch, commandID, singleScanSpec{
		subsystem:   "storage",
		action:      "scan_storage",
		scanLabel:   "storage",
		doneLabel:   "Storage",
		reportLabel: "storage metrics",
		successLog: func(count int) {
			log.Printf("✓ Reported %d disk mount points to server\n", count)
		},
		// No log on empty result, matching prior behavior.
	})
}

// handleScanSystem scans system metrics (CPU, memory, processes, uptime).
func handleScanSystem(apiClient *client.Client, cfg *config.Config, ackTracker *acknowledgment.Tracker, orch *orchestrator.Orchestrator, commandID string) error {
	log.Println("Scanning system metrics...")
	return runSingleSubsystemScan(apiClient, cfg, ackTracker, orch, commandID, singleScanSpec{
		subsystem:   "system",
		action:      "scan_system",
		scanLabel:   "system",
		doneLabel:   "System",
		reportLabel: "system metrics",
		successLog: func(int) {
			// System metrics are a single snapshot; no count in the message.
			log.Printf("✓ Reported system metrics to server\n")
		},
		// No log on empty result, matching prior behavior.
	})
}

// handleScanDocker scans Docker image updates only.
func handleScanDocker(apiClient *client.Client, cfg *config.Config, ackTracker *acknowledgment.Tracker, orch *orchestrator.Orchestrator, commandID string) error {
	log.Println("Scanning Docker images...")
	return runSingleSubsystemScan(apiClient, cfg, ackTracker, orch, commandID, singleScanSpec{
		subsystem:   "docker",
		action:      "scan_docker",
		scanLabel:   "Docker",
		doneLabel:   "Docker",
		reportLabel: "Docker updates",
		successLog: func(count int) {
			log.Printf("✓ Reported %d Docker image updates to server\n", count)
		},
		emptyLog: "No Docker image updates found",
	})
}