From 36904723962c8aed6c9363ead019fb2e94c6bf33 Mon Sep 17 00:00:00 2001 From: Fimeg Date: Sat, 1 Nov 2025 20:34:00 -0400 Subject: [PATCH] feat: granular subsystem commands with parallel scanner execution Split the monolithic scan_updates command into individual subsystems (updates/storage/system/docker). Scanners now run in parallel via goroutines, reducing total scan time by roughly half (or more) compared to the previous sequential execution. Agent changes: - Orchestrator pattern for scanner management - New scanners: storage (disk metrics), system (cpu/mem/processes) - New commands: scan_storage, scan_system, scan_docker - Wrapped existing scanners (APT/DNF/Docker/Windows/Winget) with common interface - Version bump to 0.1.20 Server changes: - Migration 015: agent_subsystems table with trigger for auto-init - Subsystem CRUD: enable/disable, interval (5min-24hr), auto-run toggle - API routes: /api/v1/agents/:id/subsystems/* (9 endpoints) - Stats tracking per subsystem Web UI changes: - ChatTimeline shows subsystem-specific labels and icons - AgentScanners gained interactive toggles, interval dropdowns, and manual trigger buttons - TypeScript types added for subsystems Backward compatible with the legacy scan_updates command, which is retained for existing agents and may be removed in a future release. Note: this is a large change and has not yet been exhaustively tested. 
--- aggregator-agent/cmd/agent/main.go | 51 +- .../cmd/agent/subsystem_handlers.go | 232 +++++++++ .../internal/orchestrator/orchestrator.go | 261 ++++++++++ .../internal/orchestrator/scanner_wrappers.go | 114 +++++ .../internal/orchestrator/storage_scanner.go | 87 ++++ .../internal/orchestrator/system_scanner.go | 137 +++++ aggregator-server/Dockerfile | 8 +- aggregator-server/cmd/server/main.go | 13 + .../internal/api/handlers/agents.go | 22 +- .../internal/api/handlers/subsystems.go | 327 ++++++++++++ .../migrations/015_agent_subsystems.down.sql | 17 + .../migrations/015_agent_subsystems.up.sql | 81 +++ .../internal/database/queries/agents.go | 14 + .../internal/database/queries/subsystems.go | 293 +++++++++++ .../internal/models/subsystem.go | 51 ++ .../src/components/AgentScanners.tsx | 483 ++++++++++-------- .../src/components/ChatTimeline.tsx | 130 ++++- aggregator-web/src/lib/api.ts | 51 +- aggregator-web/src/types/index.ts | 32 ++ 19 files changed, 2151 insertions(+), 253 deletions(-) create mode 100644 aggregator-agent/cmd/agent/subsystem_handlers.go create mode 100644 aggregator-agent/internal/orchestrator/orchestrator.go create mode 100644 aggregator-agent/internal/orchestrator/scanner_wrappers.go create mode 100644 aggregator-agent/internal/orchestrator/storage_scanner.go create mode 100644 aggregator-agent/internal/orchestrator/system_scanner.go create mode 100644 aggregator-server/internal/api/handlers/subsystems.go create mode 100644 aggregator-server/internal/database/migrations/015_agent_subsystems.down.sql create mode 100644 aggregator-server/internal/database/migrations/015_agent_subsystems.up.sql create mode 100644 aggregator-server/internal/database/queries/subsystems.go create mode 100644 aggregator-server/internal/models/subsystem.go diff --git a/aggregator-agent/cmd/agent/main.go b/aggregator-agent/cmd/agent/main.go index 6886de4..88ae76a 100644 --- a/aggregator-agent/cmd/agent/main.go +++ b/aggregator-agent/cmd/agent/main.go @@ -19,6 +19,7 
@@ import ( "github.com/Fimeg/RedFlag/aggregator-agent/internal/config" "github.com/Fimeg/RedFlag/aggregator-agent/internal/display" "github.com/Fimeg/RedFlag/aggregator-agent/internal/installer" + "github.com/Fimeg/RedFlag/aggregator-agent/internal/orchestrator" "github.com/Fimeg/RedFlag/aggregator-agent/internal/scanner" "github.com/Fimeg/RedFlag/aggregator-agent/internal/service" "github.com/Fimeg/RedFlag/aggregator-agent/internal/system" @@ -26,7 +27,7 @@ import ( ) const ( - AgentVersion = "0.1.19" // Phase 0: Circuit breakers, timeouts, and subsystem resilience + AgentVersion = "0.1.20" // Phase 1: Granular subsystem commands and parallel scanner execution ) // getConfigPath returns the platform-specific config path @@ -466,6 +467,37 @@ func runAgent(cfg *config.Config) error { HalfOpenAttempts: cfg.Subsystems.Winget.CircuitBreaker.HalfOpenAttempts, }) + // Initialize scanner orchestrator for parallel execution and granular subsystem management + scanOrchestrator := orchestrator.NewOrchestrator() + + // Register update scanners + scanOrchestrator.RegisterScanner("apt", orchestrator.NewAPTScannerWrapper(aptScanner), aptCB, cfg.Subsystems.APT.Timeout, cfg.Subsystems.APT.Enabled) + scanOrchestrator.RegisterScanner("dnf", orchestrator.NewDNFScannerWrapper(dnfScanner), dnfCB, cfg.Subsystems.DNF.Timeout, cfg.Subsystems.DNF.Enabled) + scanOrchestrator.RegisterScanner("docker", orchestrator.NewDockerScannerWrapper(dockerScanner), dockerCB, cfg.Subsystems.Docker.Timeout, cfg.Subsystems.Docker.Enabled) + scanOrchestrator.RegisterScanner("windows", orchestrator.NewWindowsUpdateScannerWrapper(windowsUpdateScanner), windowsCB, cfg.Subsystems.Windows.Timeout, cfg.Subsystems.Windows.Enabled) + scanOrchestrator.RegisterScanner("winget", orchestrator.NewWingetScannerWrapper(wingetScanner), wingetCB, cfg.Subsystems.Winget.Timeout, cfg.Subsystems.Winget.Enabled) + + // Register storage and system scanners + storageScanner := orchestrator.NewStorageScanner(AgentVersion) + 
systemScanner := orchestrator.NewSystemScanner(AgentVersion) + + // Storage and system scanners are always available and fast; they get lenient circuit breakers so the orchestrator can treat all scanners uniformly + storageCB := circuitbreaker.New("Storage", circuitbreaker.Config{ + FailureThreshold: 5, + FailureWindow: 10 * time.Minute, + OpenDuration: 5 * time.Minute, + HalfOpenAttempts: 1, + }) + systemCB := circuitbreaker.New("System", circuitbreaker.Config{ + FailureThreshold: 5, + FailureWindow: 10 * time.Minute, + OpenDuration: 5 * time.Minute, + HalfOpenAttempts: 1, + }) + + scanOrchestrator.RegisterScanner("storage", storageScanner, storageCB, 30*time.Second, cfg.Subsystems.Storage.Enabled) + scanOrchestrator.RegisterScanner("system", systemScanner, systemCB, 30*time.Second, true) // System scanner always enabled + // Initialize acknowledgment tracker for command result reliability ackTracker := acknowledgment.NewTracker(getStatePath()) if err := ackTracker.Load(); err != nil { @@ -610,10 +642,25 @@ func runAgent(cfg *config.Config) error { switch cmd.Type { case "scan_updates": - if err := handleScanUpdates(apiClient, cfg, ackTracker, aptScanner, dnfScanner, dockerScanner, windowsUpdateScanner, wingetScanner, aptCB, dnfCB, dockerCB, windowsCB, wingetCB, cmd.ID); err != nil { + if err := handleScanUpdatesV2(apiClient, cfg, ackTracker, scanOrchestrator, cmd.ID); err != nil { log.Printf("Error scanning updates: %v\n", err) } + case "scan_storage": + if err := handleScanStorage(apiClient, cfg, ackTracker, scanOrchestrator, cmd.ID); err != nil { + log.Printf("Error scanning storage: %v\n", err) + } + + case "scan_system": + if err := handleScanSystem(apiClient, cfg, ackTracker, scanOrchestrator, cmd.ID); err != nil { + log.Printf("Error scanning system: %v\n", err) + } + + case "scan_docker": + if err := handleScanDocker(apiClient, cfg, ackTracker, scanOrchestrator, cmd.ID); err != nil { + log.Printf("Error scanning Docker: %v\n", err) + } + case "collect_specs": log.Println("Spec collection not yet implemented") 
diff --git a/aggregator-agent/cmd/agent/subsystem_handlers.go b/aggregator-agent/cmd/agent/subsystem_handlers.go new file mode 100644 index 0000000..d65c308 --- /dev/null +++ b/aggregator-agent/cmd/agent/subsystem_handlers.go @@ -0,0 +1,232 @@ +package main + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/Fimeg/RedFlag/aggregator-agent/internal/acknowledgment" + "github.com/Fimeg/RedFlag/aggregator-agent/internal/client" + "github.com/Fimeg/RedFlag/aggregator-agent/internal/config" + "github.com/Fimeg/RedFlag/aggregator-agent/internal/orchestrator" +) + +// handleScanUpdatesV2 scans all update subsystems (APT, DNF, Docker, Windows Update, Winget) in parallel +// This is the new orchestrator-based version for v0.1.20 +func handleScanUpdatesV2(apiClient *client.Client, cfg *config.Config, ackTracker *acknowledgment.Tracker, orch *orchestrator.Orchestrator, commandID string) error { + log.Println("Scanning for updates (parallel execution)...") + + ctx := context.Background() + startTime := time.Now() + + // Execute all update scanners in parallel + results, allUpdates := orch.ScanAll(ctx) + + // Format results + stdout, stderr, exitCode := orchestrator.FormatScanSummary(results) + + // Add timing information + duration := time.Since(startTime) + stdout += fmt.Sprintf("\nScan completed in %.2f seconds\n", duration.Seconds()) + + // Create scan log entry with subsystem metadata + logReport := client.LogReport{ + CommandID: commandID, + Action: "scan_updates", + Result: map[bool]string{true: "success", false: "failure"}[exitCode == 0], + Stdout: stdout, + Stderr: stderr, + ExitCode: exitCode, + DurationSeconds: int(duration.Seconds()), + } + + // Report the scan log + if err := reportLogWithAck(apiClient, cfg, ackTracker, logReport); err != nil { + log.Printf("Failed to report scan log: %v\n", err) + // Continue anyway - updates are more important + } + + // Report updates to server if any were found + if len(allUpdates) > 0 { + report := 
client.UpdateReport{ + CommandID: commandID, + Timestamp: time.Now(), + Updates: allUpdates, + } + + if err := apiClient.ReportUpdates(cfg.AgentID, report); err != nil { + return fmt.Errorf("failed to report updates: %w", err) + } + + log.Printf("✓ Reported %d updates to server\n", len(allUpdates)) + } else { + log.Println("No updates found") + } + + return nil +} + +// handleScanStorage scans disk usage metrics only +func handleScanStorage(apiClient *client.Client, cfg *config.Config, ackTracker *acknowledgment.Tracker, orch *orchestrator.Orchestrator, commandID string) error { + log.Println("Scanning storage...") + + ctx := context.Background() + startTime := time.Now() + + // Execute storage scanner + result, err := orch.ScanSingle(ctx, "storage") + if err != nil { + return fmt.Errorf("failed to scan storage: %w", err) + } + + // Format results + results := []orchestrator.ScanResult{result} + stdout, stderr, exitCode := orchestrator.FormatScanSummary(results) + + duration := time.Since(startTime) + stdout += fmt.Sprintf("\nStorage scan completed in %.2f seconds\n", duration.Seconds()) + + // Create scan log entry + logReport := client.LogReport{ + CommandID: commandID, + Action: "scan_storage", + Result: map[bool]string{true: "success", false: "failure"}[exitCode == 0], + Stdout: stdout, + Stderr: stderr, + ExitCode: exitCode, + DurationSeconds: int(duration.Seconds()), + } + + // Report the scan log + if err := reportLogWithAck(apiClient, cfg, ackTracker, logReport); err != nil { + log.Printf("Failed to report scan log: %v\n", err) + } + + // Report "updates" (disk info) to server + if len(result.Updates) > 0 { + report := client.UpdateReport{ + CommandID: commandID, + Timestamp: time.Now(), + Updates: result.Updates, + } + + if err := apiClient.ReportUpdates(cfg.AgentID, report); err != nil { + return fmt.Errorf("failed to report storage metrics: %w", err) + } + + log.Printf("✓ Reported %d disk mount points to server\n", len(result.Updates)) + } + + return nil 
+} + +// handleScanSystem scans system metrics (CPU, memory, processes, uptime) +func handleScanSystem(apiClient *client.Client, cfg *config.Config, ackTracker *acknowledgment.Tracker, orch *orchestrator.Orchestrator, commandID string) error { + log.Println("Scanning system metrics...") + + ctx := context.Background() + startTime := time.Now() + + // Execute system scanner + result, err := orch.ScanSingle(ctx, "system") + if err != nil { + return fmt.Errorf("failed to scan system: %w", err) + } + + // Format results + results := []orchestrator.ScanResult{result} + stdout, stderr, exitCode := orchestrator.FormatScanSummary(results) + + duration := time.Since(startTime) + stdout += fmt.Sprintf("\nSystem scan completed in %.2f seconds\n", duration.Seconds()) + + // Create scan log entry + logReport := client.LogReport{ + CommandID: commandID, + Action: "scan_system", + Result: map[bool]string{true: "success", false: "failure"}[exitCode == 0], + Stdout: stdout, + Stderr: stderr, + ExitCode: exitCode, + DurationSeconds: int(duration.Seconds()), + } + + // Report the scan log + if err := reportLogWithAck(apiClient, cfg, ackTracker, logReport); err != nil { + log.Printf("Failed to report scan log: %v\n", err) + } + + // Report "updates" (system metrics) to server + if len(result.Updates) > 0 { + report := client.UpdateReport{ + CommandID: commandID, + Timestamp: time.Now(), + Updates: result.Updates, + } + + if err := apiClient.ReportUpdates(cfg.AgentID, report); err != nil { + return fmt.Errorf("failed to report system metrics: %w", err) + } + + log.Printf("✓ Reported system metrics to server\n") + } + + return nil +} + +// handleScanDocker scans Docker image updates only +func handleScanDocker(apiClient *client.Client, cfg *config.Config, ackTracker *acknowledgment.Tracker, orch *orchestrator.Orchestrator, commandID string) error { + log.Println("Scanning Docker images...") + + ctx := context.Background() + startTime := time.Now() + + // Execute Docker scanner + result, 
err := orch.ScanSingle(ctx, "docker") + if err != nil { + return fmt.Errorf("failed to scan Docker: %w", err) + } + + // Format results + results := []orchestrator.ScanResult{result} + stdout, stderr, exitCode := orchestrator.FormatScanSummary(results) + + duration := time.Since(startTime) + stdout += fmt.Sprintf("\nDocker scan completed in %.2f seconds\n", duration.Seconds()) + + // Create scan log entry + logReport := client.LogReport{ + CommandID: commandID, + Action: "scan_docker", + Result: map[bool]string{true: "success", false: "failure"}[exitCode == 0], + Stdout: stdout, + Stderr: stderr, + ExitCode: exitCode, + DurationSeconds: int(duration.Seconds()), + } + + // Report the scan log + if err := reportLogWithAck(apiClient, cfg, ackTracker, logReport); err != nil { + log.Printf("Failed to report scan log: %v\n", err) + } + + // Report updates to server if any were found + if len(result.Updates) > 0 { + report := client.UpdateReport{ + CommandID: commandID, + Timestamp: time.Now(), + Updates: result.Updates, + } + + if err := apiClient.ReportUpdates(cfg.AgentID, report); err != nil { + return fmt.Errorf("failed to report Docker updates: %w", err) + } + + log.Printf("✓ Reported %d Docker image updates to server\n", len(result.Updates)) + } else { + log.Println("No Docker image updates found") + } + + return nil +} diff --git a/aggregator-agent/internal/orchestrator/orchestrator.go b/aggregator-agent/internal/orchestrator/orchestrator.go new file mode 100644 index 0000000..b446b5c --- /dev/null +++ b/aggregator-agent/internal/orchestrator/orchestrator.go @@ -0,0 +1,261 @@ +package orchestrator + +import ( + "context" + "fmt" + "log" + "sync" + "time" + + "github.com/Fimeg/RedFlag/aggregator-agent/internal/circuitbreaker" + "github.com/Fimeg/RedFlag/aggregator-agent/internal/client" +) + +// Scanner represents a generic update scanner +type Scanner interface { + // IsAvailable checks if the scanner is available on this system + IsAvailable() bool + + // Scan 
performs the actual scanning and returns update items + Scan() ([]client.UpdateReportItem, error) + + // Name returns the scanner name for logging + Name() string +} + +// ScannerConfig holds configuration for a single scanner +type ScannerConfig struct { + Scanner Scanner + CircuitBreaker *circuitbreaker.CircuitBreaker + Timeout time.Duration + Enabled bool +} + +// ScanResult holds the result of a scanner execution +type ScanResult struct { + ScannerName string + Updates []client.UpdateReportItem + Error error + Duration time.Duration + Status string // "success", "failed", "disabled", "unavailable", "skipped" +} + +// Orchestrator manages and coordinates multiple scanners +type Orchestrator struct { + scanners map[string]*ScannerConfig + mu sync.RWMutex +} + +// NewOrchestrator creates a new scanner orchestrator +func NewOrchestrator() *Orchestrator { + return &Orchestrator{ + scanners: make(map[string]*ScannerConfig), + } +} + +// RegisterScanner adds a scanner to the orchestrator +func (o *Orchestrator) RegisterScanner(name string, scanner Scanner, cb *circuitbreaker.CircuitBreaker, timeout time.Duration, enabled bool) { + o.mu.Lock() + defer o.mu.Unlock() + + o.scanners[name] = &ScannerConfig{ + Scanner: scanner, + CircuitBreaker: cb, + Timeout: timeout, + Enabled: enabled, + } +} + +// ScanAll executes all registered scanners in parallel +func (o *Orchestrator) ScanAll(ctx context.Context) ([]ScanResult, []client.UpdateReportItem) { + o.mu.RLock() + defer o.mu.RUnlock() + + var wg sync.WaitGroup + resultsChan := make(chan ScanResult, len(o.scanners)) + + // Launch goroutine for each scanner + for name, scannerConfig := range o.scanners { + wg.Add(1) + go func(name string, cfg *ScannerConfig) { + defer wg.Done() + result := o.executeScan(ctx, name, cfg) + resultsChan <- result + }(name, scannerConfig) + } + + // Wait for all scanners to complete + wg.Wait() + close(resultsChan) + + // Collect results + var results []ScanResult + var allUpdates 
[]client.UpdateReportItem + + for result := range resultsChan { + results = append(results, result) + if result.Error == nil && len(result.Updates) > 0 { + allUpdates = append(allUpdates, result.Updates...) + } + } + + return results, allUpdates +} + +// ScanSingle executes a single scanner by name +func (o *Orchestrator) ScanSingle(ctx context.Context, scannerName string) (ScanResult, error) { + o.mu.RLock() + defer o.mu.RUnlock() + + cfg, exists := o.scanners[scannerName] + if !exists { + return ScanResult{ + ScannerName: scannerName, + Status: "failed", + Error: fmt.Errorf("scanner not found: %s", scannerName), + }, fmt.Errorf("scanner not found: %s", scannerName) + } + + return o.executeScan(ctx, scannerName, cfg), nil +} + +// executeScan runs a single scanner with circuit breaker and timeout protection +func (o *Orchestrator) executeScan(ctx context.Context, name string, cfg *ScannerConfig) ScanResult { + result := ScanResult{ + ScannerName: name, + Status: "failed", + } + + startTime := time.Now() + defer func() { + result.Duration = time.Since(startTime) + }() + + // Check if enabled + if !cfg.Enabled { + result.Status = "disabled" + log.Printf("[%s] Scanner disabled via configuration", name) + return result + } + + // Check if available + if !cfg.Scanner.IsAvailable() { + result.Status = "unavailable" + log.Printf("[%s] Scanner not available on this system", name) + return result + } + + // Execute with circuit breaker and timeout + log.Printf("[%s] Starting scan...", name) + + var updates []client.UpdateReportItem + + err := cfg.CircuitBreaker.Call(func() error { + // Create timeout context + timeoutCtx, cancel := context.WithTimeout(ctx, cfg.Timeout) + defer cancel() + + // Channel for scan result + type scanResult struct { + updates []client.UpdateReportItem + err error + } + scanChan := make(chan scanResult, 1) + + // Run scan in goroutine + go func() { + u, e := cfg.Scanner.Scan() + scanChan <- scanResult{updates: u, err: e} + }() + + // Wait for scan 
or timeout + select { + case <-timeoutCtx.Done(): + return fmt.Errorf("scan timeout after %v", cfg.Timeout) + case res := <-scanChan: + if res.err != nil { + return res.err + } + updates = res.updates + return nil + } + }) + + if err != nil { + result.Error = err + result.Status = "failed" + log.Printf("[%s] Scan failed: %v", name, err) + return result + } + + result.Updates = updates + result.Status = "success" + log.Printf("[%s] Scan completed: found %d updates (took %v)", name, len(updates), result.Duration) + + return result +} + +// GetScannerNames returns a list of all registered scanner names +func (o *Orchestrator) GetScannerNames() []string { + o.mu.RLock() + defer o.mu.RUnlock() + + names := make([]string, 0, len(o.scanners)) + for name := range o.scanners { + names = append(names, name) + } + return names +} + +// FormatScanSummary creates a human-readable summary of scan results +func FormatScanSummary(results []ScanResult) (stdout string, stderr string, exitCode int) { + var successResults []string + var errorMessages []string + totalUpdates := 0 + + for _, result := range results { + switch result.Status { + case "success": + msg := fmt.Sprintf("%s: Found %d updates (%.2fs)", + result.ScannerName, len(result.Updates), result.Duration.Seconds()) + successResults = append(successResults, msg) + totalUpdates += len(result.Updates) + + case "failed": + msg := fmt.Sprintf("%s: %v", result.ScannerName, result.Error) + errorMessages = append(errorMessages, msg) + + case "disabled": + successResults = append(successResults, fmt.Sprintf("%s: Disabled", result.ScannerName)) + + case "unavailable": + successResults = append(successResults, fmt.Sprintf("%s: Not available", result.ScannerName)) + } + } + + // Build stdout + if len(successResults) > 0 { + stdout = "Scan Results:\n" + for _, msg := range successResults { + stdout += fmt.Sprintf(" - %s\n", msg) + } + stdout += fmt.Sprintf("\nTotal Updates Found: %d\n", totalUpdates) + } + + // Build stderr + if 
len(errorMessages) > 0 { + stderr = "Scan Errors:\n" + for _, msg := range errorMessages { + stderr += fmt.Sprintf(" - %s\n", msg) + } + } + + // Determine exit code + if len(errorMessages) > 0 { + exitCode = 1 + } else { + exitCode = 0 + } + + return stdout, stderr, exitCode +} diff --git a/aggregator-agent/internal/orchestrator/scanner_wrappers.go b/aggregator-agent/internal/orchestrator/scanner_wrappers.go new file mode 100644 index 0000000..463a02f --- /dev/null +++ b/aggregator-agent/internal/orchestrator/scanner_wrappers.go @@ -0,0 +1,114 @@ +package orchestrator + +import ( + "github.com/Fimeg/RedFlag/aggregator-agent/internal/client" + "github.com/Fimeg/RedFlag/aggregator-agent/internal/scanner" +) + +// APTScannerWrapper wraps the APT scanner to implement the Scanner interface +type APTScannerWrapper struct { + scanner *scanner.APTScanner +} + +func NewAPTScannerWrapper(s *scanner.APTScanner) *APTScannerWrapper { + return &APTScannerWrapper{scanner: s} +} + +func (w *APTScannerWrapper) IsAvailable() bool { + return w.scanner.IsAvailable() +} + +func (w *APTScannerWrapper) Scan() ([]client.UpdateReportItem, error) { + return w.scanner.Scan() +} + +func (w *APTScannerWrapper) Name() string { + return "APT Update Scanner" +} + +// DNFScannerWrapper wraps the DNF scanner to implement the Scanner interface +type DNFScannerWrapper struct { + scanner *scanner.DNFScanner +} + +func NewDNFScannerWrapper(s *scanner.DNFScanner) *DNFScannerWrapper { + return &DNFScannerWrapper{scanner: s} +} + +func (w *DNFScannerWrapper) IsAvailable() bool { + return w.scanner.IsAvailable() +} + +func (w *DNFScannerWrapper) Scan() ([]client.UpdateReportItem, error) { + return w.scanner.Scan() +} + +func (w *DNFScannerWrapper) Name() string { + return "DNF Update Scanner" +} + +// DockerScannerWrapper wraps the Docker scanner to implement the Scanner interface +type DockerScannerWrapper struct { + scanner *scanner.DockerScanner +} + +func NewDockerScannerWrapper(s 
*scanner.DockerScanner) *DockerScannerWrapper { + return &DockerScannerWrapper{scanner: s} +} + +func (w *DockerScannerWrapper) IsAvailable() bool { + if w.scanner == nil { + return false + } + return w.scanner.IsAvailable() +} + +func (w *DockerScannerWrapper) Scan() ([]client.UpdateReportItem, error) { + return w.scanner.Scan() +} + +func (w *DockerScannerWrapper) Name() string { + return "Docker Image Update Scanner" +} + +// WindowsUpdateScannerWrapper wraps the Windows Update scanner to implement the Scanner interface +type WindowsUpdateScannerWrapper struct { + scanner *scanner.WindowsUpdateScanner +} + +func NewWindowsUpdateScannerWrapper(s *scanner.WindowsUpdateScanner) *WindowsUpdateScannerWrapper { + return &WindowsUpdateScannerWrapper{scanner: s} +} + +func (w *WindowsUpdateScannerWrapper) IsAvailable() bool { + return w.scanner.IsAvailable() +} + +func (w *WindowsUpdateScannerWrapper) Scan() ([]client.UpdateReportItem, error) { + return w.scanner.Scan() +} + +func (w *WindowsUpdateScannerWrapper) Name() string { + return "Windows Update Scanner" +} + +// WingetScannerWrapper wraps the Winget scanner to implement the Scanner interface +type WingetScannerWrapper struct { + scanner *scanner.WingetScanner +} + +func NewWingetScannerWrapper(s *scanner.WingetScanner) *WingetScannerWrapper { + return &WingetScannerWrapper{scanner: s} +} + +func (w *WingetScannerWrapper) IsAvailable() bool { + return w.scanner.IsAvailable() +} + +func (w *WingetScannerWrapper) Scan() ([]client.UpdateReportItem, error) { + return w.scanner.Scan() +} + +func (w *WingetScannerWrapper) Name() string { + return "Winget Package Update Scanner" +} diff --git a/aggregator-agent/internal/orchestrator/storage_scanner.go b/aggregator-agent/internal/orchestrator/storage_scanner.go new file mode 100644 index 0000000..7f41ac6 --- /dev/null +++ b/aggregator-agent/internal/orchestrator/storage_scanner.go @@ -0,0 +1,87 @@ +package orchestrator + +import ( + "fmt" + + 
"github.com/Fimeg/RedFlag/aggregator-agent/internal/client" + "github.com/Fimeg/RedFlag/aggregator-agent/internal/system" +) + +// StorageScanner scans disk usage metrics +type StorageScanner struct { + agentVersion string +} + +// NewStorageScanner creates a new storage scanner +func NewStorageScanner(agentVersion string) *StorageScanner { + return &StorageScanner{ + agentVersion: agentVersion, + } +} + +// IsAvailable always returns true since storage scanning is always available +func (s *StorageScanner) IsAvailable() bool { + return true +} + +// Scan collects disk usage information and returns it as "updates" for reporting +func (s *StorageScanner) Scan() ([]client.UpdateReportItem, error) { + sysInfo, err := system.GetSystemInfo(s.agentVersion) + if err != nil { + return nil, fmt.Errorf("failed to get system info: %w", err) + } + + if len(sysInfo.DiskInfo) == 0 { + return nil, fmt.Errorf("no disk information available") + } + + // Convert disk info to UpdateReportItem format for reporting + // This is a bit unconventional but allows us to use the existing reporting infrastructure + var items []client.UpdateReportItem + + for _, disk := range sysInfo.DiskInfo { + // Create a pseudo-update item for each disk + item := client.UpdateReportItem{ + PackageName: fmt.Sprintf("disk-%s", disk.Mountpoint), + CurrentVersion: fmt.Sprintf("%.1f%% used", disk.UsedPercent), + AvailableVersion: fmt.Sprintf("%d GB available", disk.Available/(1024*1024*1024)), + PackageType: "storage", + Severity: determineDiskSeverity(disk.UsedPercent), + PackageDescription: fmt.Sprintf("Disk: %s (%s) - %s", disk.Mountpoint, disk.Filesystem, disk.Device), + Metadata: map[string]interface{}{ + "mountpoint": disk.Mountpoint, + "filesystem": disk.Filesystem, + "device": disk.Device, + "disk_type": disk.DiskType, + "total_bytes": disk.Total, + "used_bytes": disk.Used, + "available_bytes": disk.Available, + "used_percent": disk.UsedPercent, + "is_root": disk.IsRoot, + "is_largest": disk.IsLargest, 
+ }, + } + items = append(items, item) + } + + return items, nil +} + +// Name returns the scanner name +func (s *StorageScanner) Name() string { + return "Disk Usage Reporter" +} + +// determineDiskSeverity returns severity based on disk usage percentage +func determineDiskSeverity(usedPercent float64) string { + switch { + case usedPercent >= 95: + return "critical" + case usedPercent >= 90: + return "important" + case usedPercent >= 80: + return "moderate" + default: + return "low" + } +} diff --git a/aggregator-agent/internal/orchestrator/system_scanner.go b/aggregator-agent/internal/orchestrator/system_scanner.go new file mode 100644 index 0000000..c9e6aaa --- /dev/null +++ b/aggregator-agent/internal/orchestrator/system_scanner.go @@ -0,0 +1,137 @@ +package orchestrator + +import ( + "fmt" + + "github.com/Fimeg/RedFlag/aggregator-agent/internal/client" + "github.com/Fimeg/RedFlag/aggregator-agent/internal/system" +) + +// SystemScanner scans system metrics (CPU, memory, processes, uptime) +type SystemScanner struct { + agentVersion string +} + +// NewSystemScanner creates a new system scanner +func NewSystemScanner(agentVersion string) *SystemScanner { + return &SystemScanner{ + agentVersion: agentVersion, + } +} + +// IsAvailable always returns true since system scanning is always available +func (s *SystemScanner) IsAvailable() bool { + return true +} + +// Scan collects system information and returns it as "updates" for reporting +func (s *SystemScanner) Scan() ([]client.UpdateReportItem, error) { + sysInfo, err := system.GetSystemInfo(s.agentVersion) + if err != nil { + return nil, fmt.Errorf("failed to get system info: %w", err) + } + + // Convert system info to UpdateReportItem format for reporting + var items []client.UpdateReportItem + + // CPU info item + cpuItem := client.UpdateReportItem{ + PackageName: "system-cpu", + CurrentVersion: fmt.Sprintf("%d cores, %d threads", sysInfo.CPUInfo.Cores, sysInfo.CPUInfo.Threads), + AvailableVersion: 
sysInfo.CPUInfo.ModelName, + PackageType: "system", + Severity: "low", + PackageDescription: fmt.Sprintf("CPU: %s", sysInfo.CPUInfo.ModelName), + Metadata: map[string]interface{}{ + "cpu_model": sysInfo.CPUInfo.ModelName, + "cpu_cores": sysInfo.CPUInfo.Cores, + "cpu_threads": sysInfo.CPUInfo.Threads, + }, + } + items = append(items, cpuItem) + + // Memory info item + memItem := client.UpdateReportItem{ + PackageName: "system-memory", + CurrentVersion: fmt.Sprintf("%.1f%% used", sysInfo.MemoryInfo.UsedPercent), + AvailableVersion: fmt.Sprintf("%d GB total", sysInfo.MemoryInfo.Total/(1024*1024*1024)), + PackageType: "system", + Severity: determineMemorySeverity(sysInfo.MemoryInfo.UsedPercent), + PackageDescription: fmt.Sprintf("Memory: %.1f GB / %.1f GB used", + float64(sysInfo.MemoryInfo.Used)/(1024*1024*1024), + float64(sysInfo.MemoryInfo.Total)/(1024*1024*1024)), + Metadata: map[string]interface{}{ + "memory_total": sysInfo.MemoryInfo.Total, + "memory_used": sysInfo.MemoryInfo.Used, + "memory_available": sysInfo.MemoryInfo.Available, + "memory_used_percent": sysInfo.MemoryInfo.UsedPercent, + }, + } + items = append(items, memItem) + + // Process count item + processItem := client.UpdateReportItem{ + PackageName: "system-processes", + CurrentVersion: fmt.Sprintf("%d processes", sysInfo.RunningProcesses), + AvailableVersion: "n/a", + PackageType: "system", + Severity: "low", + PackageDescription: fmt.Sprintf("Running Processes: %d", sysInfo.RunningProcesses), + Metadata: map[string]interface{}{ + "process_count": sysInfo.RunningProcesses, + }, + } + items = append(items, processItem) + + // Uptime item + uptimeItem := client.UpdateReportItem{ + PackageName: "system-uptime", + CurrentVersion: sysInfo.Uptime, + AvailableVersion: "n/a", + PackageType: "system", + Severity: "low", + PackageDescription: fmt.Sprintf("System Uptime: %s", sysInfo.Uptime), + Metadata: map[string]interface{}{ + "uptime": sysInfo.Uptime, + }, + } + items = append(items, uptimeItem) + + // 
Reboot required item (if applicable) + if sysInfo.RebootRequired { + rebootItem := client.UpdateReportItem{ + PackageName: "system-reboot", + CurrentVersion: "required", + AvailableVersion: "n/a", + PackageType: "system", + Severity: "important", + PackageDescription: fmt.Sprintf("Reboot Required: %s", sysInfo.RebootReason), + Metadata: map[string]interface{}{ + "reboot_required": true, + "reboot_reason": sysInfo.RebootReason, + }, + } + items = append(items, rebootItem) + } + + return items, nil +} + +// Name returns the scanner name +func (s *SystemScanner) Name() string { + return "System Metrics Reporter" +} + +// determineMemorySeverity returns severity based on memory usage percentage +func determineMemorySeverity(usedPercent float64) string { + switch { + case usedPercent >= 95: + return "critical" + case usedPercent >= 90: + return "important" + case usedPercent >= 80: + return "moderate" + default: + return "low" + } +} diff --git a/aggregator-server/Dockerfile b/aggregator-server/Dockerfile index f1d9a67..b836734 100644 --- a/aggregator-server/Dockerfile +++ b/aggregator-server/Dockerfile @@ -16,16 +16,16 @@ WORKDIR /build COPY aggregator-agent/ ./ # Build for Linux amd64 -RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o binaries/linux-amd64/redflag-agent cmd/agent/main.go +RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o binaries/linux-amd64/redflag-agent ./cmd/agent # Build for Linux arm64 -RUN CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o binaries/linux-arm64/redflag-agent cmd/agent/main.go +RUN CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o binaries/linux-arm64/redflag-agent ./cmd/agent # Build for Windows amd64 -RUN CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build -o binaries/windows-amd64/redflag-agent.exe cmd/agent/main.go +RUN CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build -o binaries/windows-amd64/redflag-agent.exe ./cmd/agent # Build for Windows arm64 -RUN CGO_ENABLED=0 GOOS=windows GOARCH=arm64 go build -o 
binaries/windows-arm64/redflag-agent.exe cmd/agent/main.go +RUN CGO_ENABLED=0 GOOS=windows GOARCH=arm64 go build -o binaries/windows-arm64/redflag-agent.exe ./cmd/agent # Stage 3: Final image with server and all agent binaries FROM alpine:latest diff --git a/aggregator-server/cmd/server/main.go b/aggregator-server/cmd/server/main.go index c13924c..7a2e824 100644 --- a/aggregator-server/cmd/server/main.go +++ b/aggregator-server/cmd/server/main.go @@ -128,6 +128,7 @@ func main() { refreshTokenQueries := queries.NewRefreshTokenQueries(db.DB) registrationTokenQueries := queries.NewRegistrationTokenQueries(db.DB) userQueries := queries.NewUserQueries(db.DB) + subsystemQueries := queries.NewSubsystemQueries(db.DB) // Ensure admin user exists if err := userQueries.EnsureAdminUser(cfg.Admin.Username, cfg.Admin.Username+"@redflag.local", cfg.Admin.Password); err != nil { @@ -153,6 +154,7 @@ func main() { registrationTokenHandler := handlers.NewRegistrationTokenHandler(registrationTokenQueries, agentQueries, cfg) rateLimitHandler := handlers.NewRateLimitHandler(rateLimiter) downloadHandler := handlers.NewDownloadHandler(filepath.Join("/app"), cfg) + subsystemHandler := handlers.NewSubsystemHandler(subsystemQueries, commandQueries) // Setup router router := gin.Default() @@ -195,6 +197,17 @@ func main() { agents.POST("/:id/system-info", rateLimiter.RateLimit("agent_reports", middleware.KeyByAgentID), agentHandler.ReportSystemInfo) agents.POST("/:id/rapid-mode", rateLimiter.RateLimit("agent_reports", middleware.KeyByAgentID), agentHandler.SetRapidPollingMode) agents.DELETE("/:id", agentHandler.UnregisterAgent) + + // Subsystem routes + agents.GET("/:id/subsystems", subsystemHandler.GetSubsystems) + agents.GET("/:id/subsystems/:subsystem", subsystemHandler.GetSubsystem) + agents.PATCH("/:id/subsystems/:subsystem", subsystemHandler.UpdateSubsystem) + agents.POST("/:id/subsystems/:subsystem/enable", subsystemHandler.EnableSubsystem) + 
agents.POST("/:id/subsystems/:subsystem/disable", subsystemHandler.DisableSubsystem) + agents.POST("/:id/subsystems/:subsystem/trigger", subsystemHandler.TriggerSubsystem) + agents.GET("/:id/subsystems/:subsystem/stats", subsystemHandler.GetSubsystemStats) + agents.POST("/:id/subsystems/:subsystem/auto-run", subsystemHandler.SetAutoRun) + agents.POST("/:id/subsystems/:subsystem/interval", subsystemHandler.SetInterval) } // Dashboard/Web routes (protected by web auth) diff --git a/aggregator-server/internal/api/handlers/agents.go b/aggregator-server/internal/api/handlers/agents.go index 1c244b7..586f7cd 100644 --- a/aggregator-server/internal/api/handlers/agents.go +++ b/aggregator-server/internal/api/handlers/agents.go @@ -256,8 +256,8 @@ func (h *AgentHandler) GetCommands(c *gin.Context) { } } - // Update agent with new metadata - if err := h.agentQueries.UpdateAgent(agent); err != nil { + // Update agent with new metadata (preserve version tracking) + if err := h.agentQueries.UpdateAgentMetadata(agentID, agent.Metadata, agent.Status, time.Now()); err != nil { log.Printf("Warning: Failed to update agent metrics: %v", err) } } @@ -269,21 +269,7 @@ func (h *AgentHandler) GetCommands(c *gin.Context) { return } - // Process command acknowledgments if agent sent any - var acknowledgedIDs []string - if metrics != nil && len(metrics.PendingAcknowledgments) > 0 { - // Verify which commands from the agent's pending list have been recorded - verified, err := h.commandQueries.VerifyCommandsCompleted(metrics.PendingAcknowledgments) - if err != nil { - log.Printf("Warning: Failed to verify command acknowledgments for agent %s: %v", agentID, err) - } else { - acknowledgedIDs = verified - if len(acknowledgedIDs) > 0 { - log.Printf("Acknowledged %d command results for agent %s", len(acknowledgedIDs), agentID) - } - } - } - + // Process heartbeat metadata from agent check-ins if metrics.Metadata != nil { agent, err := h.agentQueries.GetAgentByID(agentID) @@ -454,7 +440,7 @@ func 
(h *AgentHandler) GetCommands(c *gin.Context) { response := models.CommandsResponse{ Commands: commandItems, RapidPolling: rapidPolling, - AcknowledgedIDs: acknowledgedIDs, + AcknowledgedIDs: []string{}, // No acknowledgments in current implementation } c.JSON(http.StatusOK, response) diff --git a/aggregator-server/internal/api/handlers/subsystems.go b/aggregator-server/internal/api/handlers/subsystems.go new file mode 100644 index 0000000..c353d4e --- /dev/null +++ b/aggregator-server/internal/api/handlers/subsystems.go @@ -0,0 +1,327 @@ +package handlers + +import ( + "net/http" + + "github.com/Fimeg/RedFlag/aggregator-server/internal/database/queries" + "github.com/Fimeg/RedFlag/aggregator-server/internal/models" + "github.com/gin-gonic/gin" + "github.com/google/uuid" +) + +type SubsystemHandler struct { + subsystemQueries *queries.SubsystemQueries + commandQueries *queries.CommandQueries +} + +func NewSubsystemHandler(sq *queries.SubsystemQueries, cq *queries.CommandQueries) *SubsystemHandler { + return &SubsystemHandler{ + subsystemQueries: sq, + commandQueries: cq, + } +} + +// GetSubsystems retrieves all subsystems for an agent +// GET /api/v1/agents/:id/subsystems +func (h *SubsystemHandler) GetSubsystems(c *gin.Context) { + agentID, err := uuid.Parse(c.Param("id")) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid agent ID"}) + return + } + + subsystems, err := h.subsystemQueries.GetSubsystems(agentID) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to retrieve subsystems"}) + return + } + + c.JSON(http.StatusOK, subsystems) +} + +// GetSubsystem retrieves a specific subsystem for an agent +// GET /api/v1/agents/:id/subsystems/:subsystem +func (h *SubsystemHandler) GetSubsystem(c *gin.Context) { + agentID, err := uuid.Parse(c.Param("id")) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid agent ID"}) + return + } + + subsystem := c.Param("subsystem") + if subsystem == "" { + 
c.JSON(http.StatusBadRequest, gin.H{"error": "Subsystem name required"}) + return + } + + sub, err := h.subsystemQueries.GetSubsystem(agentID, subsystem) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to retrieve subsystem"}) + return + } + + if sub == nil { + c.JSON(http.StatusNotFound, gin.H{"error": "Subsystem not found"}) + return + } + + c.JSON(http.StatusOK, sub) +} + +// UpdateSubsystem updates subsystem configuration +// PATCH /api/v1/agents/:id/subsystems/:subsystem +func (h *SubsystemHandler) UpdateSubsystem(c *gin.Context) { + agentID, err := uuid.Parse(c.Param("id")) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid agent ID"}) + return + } + + subsystem := c.Param("subsystem") + if subsystem == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "Subsystem name required"}) + return + } + + var config models.SubsystemConfig + if err := c.ShouldBindJSON(&config); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + // Validate interval if provided + if config.IntervalMinutes != nil && (*config.IntervalMinutes < 5 || *config.IntervalMinutes > 1440) { + c.JSON(http.StatusBadRequest, gin.H{"error": "Interval must be between 5 and 1440 minutes"}) + return + } + + err = h.subsystemQueries.UpdateSubsystem(agentID, subsystem, config) + if err != nil { + if err.Error() == "subsystem not found" { + c.JSON(http.StatusNotFound, gin.H{"error": "Subsystem not found"}) + return + } + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update subsystem"}) + return + } + + c.JSON(http.StatusOK, gin.H{"message": "Subsystem updated successfully"}) +} + +// EnableSubsystem enables a subsystem +// POST /api/v1/agents/:id/subsystems/:subsystem/enable +func (h *SubsystemHandler) EnableSubsystem(c *gin.Context) { + agentID, err := uuid.Parse(c.Param("id")) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid agent ID"}) + return + } + + subsystem 
:= c.Param("subsystem") + if subsystem == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "Subsystem name required"}) + return + } + + err = h.subsystemQueries.EnableSubsystem(agentID, subsystem) + if err != nil { + if err.Error() == "subsystem not found" { + c.JSON(http.StatusNotFound, gin.H{"error": "Subsystem not found"}) + return + } + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to enable subsystem"}) + return + } + + c.JSON(http.StatusOK, gin.H{"message": "Subsystem enabled successfully"}) +} + +// DisableSubsystem disables a subsystem +// POST /api/v1/agents/:id/subsystems/:subsystem/disable +func (h *SubsystemHandler) DisableSubsystem(c *gin.Context) { + agentID, err := uuid.Parse(c.Param("id")) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid agent ID"}) + return + } + + subsystem := c.Param("subsystem") + if subsystem == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "Subsystem name required"}) + return + } + + err = h.subsystemQueries.DisableSubsystem(agentID, subsystem) + if err != nil { + if err.Error() == "subsystem not found" { + c.JSON(http.StatusNotFound, gin.H{"error": "Subsystem not found"}) + return + } + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to disable subsystem"}) + return + } + + c.JSON(http.StatusOK, gin.H{"message": "Subsystem disabled successfully"}) +} + +// TriggerSubsystem manually triggers a subsystem scan +// POST /api/v1/agents/:id/subsystems/:subsystem/trigger +func (h *SubsystemHandler) TriggerSubsystem(c *gin.Context) { + agentID, err := uuid.Parse(c.Param("id")) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid agent ID"}) + return + } + + subsystem := c.Param("subsystem") + if subsystem == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "Subsystem name required"}) + return + } + + // Verify subsystem exists and is enabled + sub, err := h.subsystemQueries.GetSubsystem(agentID, subsystem) + if err != nil { + 
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to retrieve subsystem"}) + return + } + + if sub == nil { + c.JSON(http.StatusNotFound, gin.H{"error": "Subsystem not found"}) + return + } + + if !sub.Enabled { + c.JSON(http.StatusBadRequest, gin.H{"error": "Subsystem is disabled"}) + return + } + + // Create command for the subsystem + commandType := "scan_" + subsystem + command := &models.AgentCommand{ + AgentID: agentID, + CommandType: commandType, + Status: "pending", + Source: "web_ui", // Manual trigger from UI + } + + err = h.commandQueries.CreateCommand(command) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create command"}) + return + } + + c.JSON(http.StatusOK, gin.H{ + "message": "Subsystem scan triggered successfully", + "command_id": command.ID, + }) +} + +// GetSubsystemStats retrieves statistics for a subsystem +// GET /api/v1/agents/:id/subsystems/:subsystem/stats +func (h *SubsystemHandler) GetSubsystemStats(c *gin.Context) { + agentID, err := uuid.Parse(c.Param("id")) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid agent ID"}) + return + } + + subsystem := c.Param("subsystem") + if subsystem == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "Subsystem name required"}) + return + } + + stats, err := h.subsystemQueries.GetSubsystemStats(agentID, subsystem) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to retrieve subsystem stats"}) + return + } + + if stats == nil { + c.JSON(http.StatusNotFound, gin.H{"error": "Subsystem not found"}) + return + } + + c.JSON(http.StatusOK, stats) +} + +// SetAutoRun enables or disables auto-run for a subsystem +// POST /api/v1/agents/:id/subsystems/:subsystem/auto-run +func (h *SubsystemHandler) SetAutoRun(c *gin.Context) { + agentID, err := uuid.Parse(c.Param("id")) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid agent ID"}) + return + } + + subsystem := 
c.Param("subsystem") + if subsystem == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "Subsystem name required"}) + return + } + + var req struct { + AutoRun bool `json:"auto_run"` + } + + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + err = h.subsystemQueries.SetAutoRun(agentID, subsystem, req.AutoRun) + if err != nil { + if err.Error() == "subsystem not found" { + c.JSON(http.StatusNotFound, gin.H{"error": "Subsystem not found"}) + return + } + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update auto-run"}) + return + } + + c.JSON(http.StatusOK, gin.H{"message": "Auto-run updated successfully"}) +} + +// SetInterval sets the interval for a subsystem +// POST /api/v1/agents/:id/subsystems/:subsystem/interval +func (h *SubsystemHandler) SetInterval(c *gin.Context) { + agentID, err := uuid.Parse(c.Param("id")) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid agent ID"}) + return + } + + subsystem := c.Param("subsystem") + if subsystem == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "Subsystem name required"}) + return + } + + var req struct { + IntervalMinutes int `json:"interval_minutes"` + } + + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + // Validate interval + if req.IntervalMinutes < 5 || req.IntervalMinutes > 1440 { + c.JSON(http.StatusBadRequest, gin.H{"error": "Interval must be between 5 and 1440 minutes"}) + return + } + + err = h.subsystemQueries.SetInterval(agentID, subsystem, req.IntervalMinutes) + if err != nil { + if err.Error() == "subsystem not found" { + c.JSON(http.StatusNotFound, gin.H{"error": "Subsystem not found"}) + return + } + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to update interval"}) + return + } + + c.JSON(http.StatusOK, gin.H{"message": "Interval updated successfully"}) +} diff --git 
a/aggregator-server/internal/database/migrations/015_agent_subsystems.down.sql b/aggregator-server/internal/database/migrations/015_agent_subsystems.down.sql
new file mode 100644
index 0000000..398a2d6
--- /dev/null
+++ b/aggregator-server/internal/database/migrations/015_agent_subsystems.down.sql
@@ -0,0 +1,17 @@
+-- Migration: 015_agent_subsystems (down)
+-- Purpose: Rollback agent subsystems table
+-- Version: 0.1.20
+-- Date: 2025-11-01
+
+-- Drop trigger and function
+DROP TRIGGER IF EXISTS trigger_create_default_subsystems ON agents;
+DROP FUNCTION IF EXISTS create_default_subsystems();
+
+-- Drop indexes
+DROP INDEX IF EXISTS idx_agent_subsystems_lookup;
+DROP INDEX IF EXISTS idx_agent_subsystems_subsystem;
+DROP INDEX IF EXISTS idx_agent_subsystems_next_run;
+DROP INDEX IF EXISTS idx_agent_subsystems_agent;
+
+-- Drop table
+DROP TABLE IF EXISTS agent_subsystems;
diff --git a/aggregator-server/internal/database/migrations/015_agent_subsystems.up.sql b/aggregator-server/internal/database/migrations/015_agent_subsystems.up.sql
new file mode 100644
index 0000000..abb2098
--- /dev/null
+++ b/aggregator-server/internal/database/migrations/015_agent_subsystems.up.sql
@@ -0,0 +1,81 @@
+-- Migration: 015_agent_subsystems
+-- Purpose: Add agent subsystems table for granular command scheduling and management
+-- Version: 0.1.20
+-- Date: 2025-11-01
+
+-- Create agent_subsystems table for tracking individual subsystem configurations per agent
+CREATE TABLE IF NOT EXISTS agent_subsystems (
+    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+    agent_id UUID NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
+    subsystem VARCHAR(50) NOT NULL,
+    enabled BOOLEAN DEFAULT true,
+    interval_minutes INTEGER DEFAULT 15,
+    auto_run BOOLEAN DEFAULT false,
+    last_run_at TIMESTAMP,
+    next_run_at TIMESTAMP,
+    created_at TIMESTAMP DEFAULT NOW(),
+    updated_at TIMESTAMP DEFAULT NOW(),
+    UNIQUE(agent_id, subsystem)
+);
+
+-- Create indexes for efficient querying
+CREATE INDEX IF NOT EXISTS
idx_agent_subsystems_agent ON agent_subsystems(agent_id); +CREATE INDEX IF NOT EXISTS idx_agent_subsystems_next_run ON agent_subsystems(next_run_at) + WHERE enabled = true AND auto_run = true; +CREATE INDEX IF NOT EXISTS idx_agent_subsystems_subsystem ON agent_subsystems(subsystem); + +-- Create a composite index for common queries (agent + subsystem) +CREATE INDEX IF NOT EXISTS idx_agent_subsystems_lookup ON agent_subsystems(agent_id, subsystem, enabled); + +-- Default subsystems for existing agents +-- Only insert for agents that don't already have subsystems configured +INSERT INTO agent_subsystems (agent_id, subsystem, enabled, interval_minutes, auto_run) +SELECT id, 'updates', true, 15, false FROM agents +WHERE NOT EXISTS ( + SELECT 1 FROM agent_subsystems WHERE agent_subsystems.agent_id = agents.id AND subsystem = 'updates' +) +UNION ALL +SELECT id, 'storage', true, 15, false FROM agents +WHERE NOT EXISTS ( + SELECT 1 FROM agent_subsystems WHERE agent_subsystems.agent_id = agents.id AND subsystem = 'storage' +) +UNION ALL +SELECT id, 'system', true, 30, false FROM agents +WHERE NOT EXISTS ( + SELECT 1 FROM agent_subsystems WHERE agent_subsystems.agent_id = agents.id AND subsystem = 'system' +) +UNION ALL +SELECT id, 'docker', false, 15, false FROM agents +WHERE NOT EXISTS ( + SELECT 1 FROM agent_subsystems WHERE agent_subsystems.agent_id = agents.id AND subsystem = 'docker' +); + +-- Create trigger to automatically insert default subsystems for new agents +CREATE OR REPLACE FUNCTION create_default_subsystems() +RETURNS TRIGGER AS $$ +BEGIN + -- Insert default subsystems for new agent + INSERT INTO agent_subsystems (agent_id, subsystem, enabled, interval_minutes, auto_run) + VALUES + (NEW.id, 'updates', true, 15, false), + (NEW.id, 'storage', true, 15, false), + (NEW.id, 'system', true, 30, false), + (NEW.id, 'docker', false, 15, false); + + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER trigger_create_default_subsystems + AFTER INSERT ON agents + 
FOR EACH ROW + EXECUTE FUNCTION create_default_subsystems(); + +-- Add comment for documentation +COMMENT ON TABLE agent_subsystems IS 'Per-agent subsystem configurations for granular command scheduling'; +COMMENT ON COLUMN agent_subsystems.subsystem IS 'Subsystem name: updates, storage, system, docker'; +COMMENT ON COLUMN agent_subsystems.enabled IS 'Whether this subsystem is enabled for the agent'; +COMMENT ON COLUMN agent_subsystems.interval_minutes IS 'How often to run this subsystem (in minutes)'; +COMMENT ON COLUMN agent_subsystems.auto_run IS 'Whether the server should auto-schedule this subsystem'; +COMMENT ON COLUMN agent_subsystems.last_run_at IS 'Last time this subsystem was executed'; +COMMENT ON COLUMN agent_subsystems.next_run_at IS 'Next scheduled run time for auto-run subsystems'; diff --git a/aggregator-server/internal/database/queries/agents.go b/aggregator-server/internal/database/queries/agents.go index 8fd8da6..2f009be 100644 --- a/aggregator-server/internal/database/queries/agents.go +++ b/aggregator-server/internal/database/queries/agents.go @@ -67,6 +67,20 @@ func (q *AgentQueries) UpdateAgent(agent *models.Agent) error { return err } +// UpdateAgentMetadata updates only the metadata, last_seen, and status fields +// Used for metrics updates to avoid overwriting version tracking +func (q *AgentQueries) UpdateAgentMetadata(id uuid.UUID, metadata models.JSONB, status string, lastSeen time.Time) error { + query := ` + UPDATE agents SET + last_seen = $1, + status = $2, + metadata = $3 + WHERE id = $4 + ` + _, err := q.db.Exec(query, lastSeen, status, metadata, id) + return err +} + // ListAgents returns all agents with optional filtering func (q *AgentQueries) ListAgents(status, osType string) ([]models.Agent, error) { var agents []models.Agent diff --git a/aggregator-server/internal/database/queries/subsystems.go b/aggregator-server/internal/database/queries/subsystems.go new file mode 100644 index 0000000..c328ffb --- /dev/null +++ 
b/aggregator-server/internal/database/queries/subsystems.go @@ -0,0 +1,293 @@ +package queries + +import ( + "database/sql" + "fmt" + + "github.com/Fimeg/RedFlag/aggregator-server/internal/models" + "github.com/google/uuid" + "github.com/jmoiron/sqlx" +) + +type SubsystemQueries struct { + db *sqlx.DB +} + +func NewSubsystemQueries(db *sqlx.DB) *SubsystemQueries { + return &SubsystemQueries{db: db} +} + +// GetSubsystems retrieves all subsystems for an agent +func (q *SubsystemQueries) GetSubsystems(agentID uuid.UUID) ([]models.AgentSubsystem, error) { + query := ` + SELECT id, agent_id, subsystem, enabled, interval_minutes, auto_run, + last_run_at, next_run_at, created_at, updated_at + FROM agent_subsystems + WHERE agent_id = $1 + ORDER BY subsystem + ` + + var subsystems []models.AgentSubsystem + err := q.db.Select(&subsystems, query, agentID) + if err != nil { + return nil, fmt.Errorf("failed to get subsystems: %w", err) + } + + return subsystems, nil +} + +// GetSubsystem retrieves a specific subsystem for an agent +func (q *SubsystemQueries) GetSubsystem(agentID uuid.UUID, subsystem string) (*models.AgentSubsystem, error) { + query := ` + SELECT id, agent_id, subsystem, enabled, interval_minutes, auto_run, + last_run_at, next_run_at, created_at, updated_at + FROM agent_subsystems + WHERE agent_id = $1 AND subsystem = $2 + ` + + var sub models.AgentSubsystem + err := q.db.Get(&sub, query, agentID, subsystem) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("failed to get subsystem: %w", err) + } + + return &sub, nil +} + +// UpdateSubsystem updates a subsystem configuration +func (q *SubsystemQueries) UpdateSubsystem(agentID uuid.UUID, subsystem string, config models.SubsystemConfig) error { + // Build dynamic update query based on provided fields + updates := []string{} + args := []interface{}{agentID, subsystem} + argIdx := 3 + + if config.Enabled != nil { + updates = append(updates, fmt.Sprintf("enabled = $%d", 
argIdx))
+		args = append(args, *config.Enabled)
+		argIdx++
+	}
+
+	if config.IntervalMinutes != nil {
+		updates = append(updates, fmt.Sprintf("interval_minutes = $%d", argIdx))
+		args = append(args, *config.IntervalMinutes)
+		argIdx++
+	}
+
+	if config.AutoRun != nil {
+		updates = append(updates, fmt.Sprintf("auto_run = $%d", argIdx))
+		args = append(args, *config.AutoRun)
+		argIdx++
+
+		// If enabling auto_run, calculate next_run_at from the stored interval.
+		// NOTE: uses the column expression (same as UpdateLastRun) rather than a
+		// Sprintf'd value — the previous code interpolated the placeholder index
+		// argIdx as the minute count, which scheduled the wrong next run time.
+		if *config.AutoRun {
+			updates = append(updates, "next_run_at = NOW() + (interval_minutes || ' minutes')::INTERVAL")
+		}
+	}
+
+	if len(updates) == 0 {
+		return fmt.Errorf("no fields to update")
+	}
+
+	updates = append(updates, "updated_at = NOW()")
+
+	query := fmt.Sprintf(`
+		UPDATE agent_subsystems
+		SET %s
+		WHERE agent_id = $1 AND subsystem = $2
+	`, joinUpdates(updates))
+
+	result, err := q.db.Exec(query, args...)
+	if err != nil {
+		return fmt.Errorf("failed to update subsystem: %w", err)
+	}
+
+	rows, err := result.RowsAffected()
+	if err != nil {
+		return fmt.Errorf("failed to get rows affected: %w", err)
+	}
+
+	if rows == 0 {
+		return fmt.Errorf("subsystem not found")
+	}
+
+	return nil
+}
+
+// UpdateLastRun updates the last_run_at timestamp for a subsystem
+func (q *SubsystemQueries) UpdateLastRun(agentID uuid.UUID, subsystem string) error {
+	query := `
+		UPDATE agent_subsystems
+		SET last_run_at = NOW(),
+		    next_run_at = CASE
+		        WHEN auto_run THEN NOW() + (interval_minutes || ' minutes')::INTERVAL
+		        ELSE next_run_at
+		    END,
+		    updated_at = NOW()
+		WHERE agent_id = $1 AND subsystem = $2
+	`
+
+	result, err := q.db.Exec(query, agentID, subsystem)
+	if err != nil {
+		return fmt.Errorf("failed to update last run: %w", err)
+	}
+
+	rows, err := result.RowsAffected()
+	if err != nil {
+		return fmt.Errorf("failed to get rows affected: %w", err)
+	}
+
+	if rows == 0 {
+		return fmt.Errorf("subsystem not found")
+	}
+
+	return nil
+}
+
+// GetDueSubsystems retrieves all subsystems that are due to run
+func (q *SubsystemQueries)
GetDueSubsystems() ([]models.AgentSubsystem, error) { + query := ` + SELECT id, agent_id, subsystem, enabled, interval_minutes, auto_run, + last_run_at, next_run_at, created_at, updated_at + FROM agent_subsystems + WHERE enabled = true + AND auto_run = true + AND (next_run_at IS NULL OR next_run_at <= NOW()) + ORDER BY next_run_at ASC NULLS FIRST + LIMIT 1000 + ` + + var subsystems []models.AgentSubsystem + err := q.db.Select(&subsystems, query) + if err != nil { + return nil, fmt.Errorf("failed to get due subsystems: %w", err) + } + + return subsystems, nil +} + +// GetSubsystemStats retrieves statistics for a subsystem +func (q *SubsystemQueries) GetSubsystemStats(agentID uuid.UUID, subsystem string) (*models.SubsystemStats, error) { + query := ` + SELECT + s.subsystem, + s.enabled, + s.last_run_at, + s.next_run_at, + s.interval_minutes, + s.auto_run, + COUNT(c.id) FILTER (WHERE c.command_type = 'scan_' || s.subsystem) as run_count, + MAX(c.status) FILTER (WHERE c.command_type = 'scan_' || s.subsystem) as last_status, + MAX(al.duration_seconds) FILTER (WHERE al.action = 'scan_' || s.subsystem) as last_duration + FROM agent_subsystems s + LEFT JOIN agent_commands c ON c.agent_id = s.agent_id + LEFT JOIN agent_logs al ON al.command_id = c.id + WHERE s.agent_id = $1 AND s.subsystem = $2 + GROUP BY s.subsystem, s.enabled, s.last_run_at, s.next_run_at, s.interval_minutes, s.auto_run + ` + + var stats models.SubsystemStats + err := q.db.Get(&stats, query, agentID, subsystem) + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("failed to get subsystem stats: %w", err) + } + + return &stats, nil +} + +// EnableSubsystem enables a subsystem +func (q *SubsystemQueries) EnableSubsystem(agentID uuid.UUID, subsystem string) error { + enabled := true + return q.UpdateSubsystem(agentID, subsystem, models.SubsystemConfig{ + Enabled: &enabled, + }) +} + +// DisableSubsystem disables a subsystem +func (q *SubsystemQueries) 
DisableSubsystem(agentID uuid.UUID, subsystem string) error { + enabled := false + return q.UpdateSubsystem(agentID, subsystem, models.SubsystemConfig{ + Enabled: &enabled, + }) +} + +// SetAutoRun enables or disables auto-run for a subsystem +func (q *SubsystemQueries) SetAutoRun(agentID uuid.UUID, subsystem string, autoRun bool) error { + return q.UpdateSubsystem(agentID, subsystem, models.SubsystemConfig{ + AutoRun: &autoRun, + }) +} + +// SetInterval sets the interval for a subsystem +func (q *SubsystemQueries) SetInterval(agentID uuid.UUID, subsystem string, intervalMinutes int) error { + return q.UpdateSubsystem(agentID, subsystem, models.SubsystemConfig{ + IntervalMinutes: &intervalMinutes, + }) +} + +// CreateSubsystem creates a new subsystem configuration (used for custom subsystems) +func (q *SubsystemQueries) CreateSubsystem(sub *models.AgentSubsystem) error { + query := ` + INSERT INTO agent_subsystems (agent_id, subsystem, enabled, interval_minutes, auto_run, last_run_at, next_run_at) + VALUES ($1, $2, $3, $4, $5, $6, $7) + RETURNING id, created_at, updated_at + ` + + err := q.db.QueryRow( + query, + sub.AgentID, + sub.Subsystem, + sub.Enabled, + sub.IntervalMinutes, + sub.AutoRun, + sub.LastRunAt, + sub.NextRunAt, + ).Scan(&sub.ID, &sub.CreatedAt, &sub.UpdatedAt) + + if err != nil { + return fmt.Errorf("failed to create subsystem: %w", err) + } + + return nil +} + +// DeleteSubsystem deletes a subsystem configuration +func (q *SubsystemQueries) DeleteSubsystem(agentID uuid.UUID, subsystem string) error { + query := ` + DELETE FROM agent_subsystems + WHERE agent_id = $1 AND subsystem = $2 + ` + + result, err := q.db.Exec(query, agentID, subsystem) + if err != nil { + return fmt.Errorf("failed to delete subsystem: %w", err) + } + + rows, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get rows affected: %w", err) + } + + if rows == 0 { + return fmt.Errorf("subsystem not found") + } + + return nil +} + +// Helper function to 
join update statements +func joinUpdates(updates []string) string { + result := "" + for i, update := range updates { + if i > 0 { + result += ", " + } + result += update + } + return result +} diff --git a/aggregator-server/internal/models/subsystem.go b/aggregator-server/internal/models/subsystem.go new file mode 100644 index 0000000..7abe46c --- /dev/null +++ b/aggregator-server/internal/models/subsystem.go @@ -0,0 +1,51 @@ +package models + +import ( + "time" + + "github.com/google/uuid" +) + +// AgentSubsystem represents a subsystem configuration for an agent +type AgentSubsystem struct { + ID uuid.UUID `json:"id" db:"id"` + AgentID uuid.UUID `json:"agent_id" db:"agent_id"` + Subsystem string `json:"subsystem" db:"subsystem"` + Enabled bool `json:"enabled" db:"enabled"` + IntervalMinutes int `json:"interval_minutes" db:"interval_minutes"` + AutoRun bool `json:"auto_run" db:"auto_run"` + LastRunAt *time.Time `json:"last_run_at,omitempty" db:"last_run_at"` + NextRunAt *time.Time `json:"next_run_at,omitempty" db:"next_run_at"` + CreatedAt time.Time `json:"created_at" db:"created_at"` + UpdatedAt time.Time `json:"updated_at" db:"updated_at"` +} + +// SubsystemType represents the type of subsystem +type SubsystemType string + +const ( + SubsystemUpdates SubsystemType = "updates" + SubsystemStorage SubsystemType = "storage" + SubsystemSystem SubsystemType = "system" + SubsystemDocker SubsystemType = "docker" +) + +// SubsystemConfig represents the configuration for updating a subsystem +type SubsystemConfig struct { + Enabled *bool `json:"enabled,omitempty"` + IntervalMinutes *int `json:"interval_minutes,omitempty"` + AutoRun *bool `json:"auto_run,omitempty"` +} + +// SubsystemStats provides statistics about a subsystem's execution +type SubsystemStats struct { + Subsystem string `json:"subsystem"` + Enabled bool `json:"enabled"` + LastRunAt *time.Time `json:"last_run_at,omitempty"` + NextRunAt *time.Time `json:"next_run_at,omitempty"` + IntervalMinutes int 
`json:"interval_minutes"` + AutoRun bool `json:"auto_run"` + RunCount int `json:"run_count"` // Total runs + LastStatus string `json:"last_status"` // Last command status + LastDuration int `json:"last_duration"` // Last run duration in seconds +} diff --git a/aggregator-web/src/components/AgentScanners.tsx b/aggregator-web/src/components/AgentScanners.tsx index 95e6698..ae4ab31 100644 --- a/aggregator-web/src/components/AgentScanners.tsx +++ b/aggregator-web/src/components/AgentScanners.tsx @@ -1,5 +1,5 @@ import React, { useState } from 'react'; -import { useMutation } from '@tanstack/react-query'; +import { useMutation, useQuery, useQueryClient } from '@tanstack/react-query'; import { MonitorPlay, RefreshCw, @@ -13,162 +13,136 @@ import { Database, Shield, Search, + HardDrive, + Cpu, + Container, + Package, } from 'lucide-react'; import { formatRelativeTime } from '@/lib/utils'; import { agentApi } from '@/lib/api'; import toast from 'react-hot-toast'; import { cn } from '@/lib/utils'; +import { AgentSubsystem } from '@/types'; interface AgentScannersProps { agentId: string; } -interface ScannerConfig { - id: string; - name: string; - description: string; - icon: React.ReactNode; - enabled: boolean; - frequency: number; // minutes - last_run?: string; - next_run?: string; - status: 'idle' | 'running' | 'completed' | 'failed'; - category: 'storage' | 'security' | 'system' | 'network'; -} - -interface ScannerResponse { - scanner_id: string; - status: string; - message: string; - next_run?: string; -} +// Map subsystem types to icons and display names +const subsystemConfig: Record = { + updates: { + icon: , + name: 'Package Update Scanner', + description: 'Scans for available package updates (APT, DNF, Windows Update, etc.)', + category: 'system', + }, + storage: { + icon: , + name: 'Disk Usage Reporter', + description: 'Reports disk usage metrics and storage availability', + category: 'storage', + }, + system: { + icon: , + name: 'System Metrics Scanner', + 
description: 'Reports CPU, memory, processes, and system uptime', + category: 'system', + }, + docker: { + icon: , + name: 'Docker Image Scanner', + description: 'Scans Docker containers for available image updates', + category: 'system', + }, +}; export function AgentScanners({ agentId }: AgentScannersProps) { - // Mock agent health monitoring configs - in real implementation, these would come from the backend - const [scanners, setScanners] = useState([ - { - id: 'disk-reporter', - name: 'Disk Usage Reporter', - description: 'Agent reports disk usage metrics to server', - icon: , - enabled: true, - frequency: 15, // 15 minutes - last_run: new Date(Date.now() - 10 * 60 * 1000).toISOString(), // 10 minutes ago - status: 'completed', - category: 'storage', - }, - { - id: 'docker-check', - name: 'Docker Check-in', - description: 'Agent checks for Docker container status', - icon: , - enabled: true, - frequency: 60, // 1 hour - last_run: new Date(Date.now() - 45 * 60 * 1000).toISOString(), // 45 minutes ago - status: 'completed', - category: 'system', - }, - { - id: 'security-check', - name: 'Security Check-in (Coming Soon)', - description: 'CVE scanning & security advisory checks - not yet implemented', - icon: , - enabled: false, - frequency: 240, // 4 hours - status: 'idle', - category: 'security', - }, - { - id: 'agent-heartbeat', - name: 'Agent Heartbeat', - description: 'Agent check-in interval and health reporting', - icon: , - enabled: true, - frequency: 30, // 30 minutes - last_run: new Date(Date.now() - 5 * 60 * 1000).toISOString(), // 5 minutes ago - status: 'running', - category: 'system', - }, - ]); + const queryClient = useQueryClient(); - // Toggle scanner mutation - const toggleScannerMutation = useMutation({ - mutationFn: async ({ scannerId, enabled, frequency }: { scannerId: string; enabled: boolean; frequency: number }) => { - const response = await agentApi.toggleScanner(agentId, scannerId, enabled, frequency); - return response; + // Fetch 
subsystems from API + const { data: subsystems = [], isLoading, refetch } = useQuery({ + queryKey: ['subsystems', agentId], + queryFn: async () => { + const data = await agentApi.getSubsystems(agentId); + return data; }, - onSuccess: (data: ScannerResponse, variables) => { - toast.success(`Scanner ${variables.enabled ? 'enabled' : 'disabled'} successfully`); - // Update local state - setScanners(prev => prev.map(scanner => - scanner.id === variables.scannerId - ? { - ...scanner, - enabled: variables.enabled, - frequency: variables.frequency, - status: variables.enabled ? 'idle' : 'disabled' as any, - next_run: data.next_run - } - : scanner - )); + refetchInterval: 30000, // Refresh every 30 seconds + }); + + // Toggle subsystem enabled/disabled + const toggleSubsystemMutation = useMutation({ + mutationFn: async ({ subsystem, enabled }: { subsystem: string; enabled: boolean }) => { + if (enabled) { + return await agentApi.enableSubsystem(agentId, subsystem); + } else { + return await agentApi.disableSubsystem(agentId, subsystem); + } + }, + onSuccess: (data, variables) => { + toast.success(`${subsystemConfig[variables.subsystem]?.name || variables.subsystem} ${variables.enabled ? 'enabled' : 'disabled'}`); + queryClient.invalidateQueries({ queryKey: ['subsystems', agentId] }); }, onError: (error: any, variables) => { - toast.error(`Failed to ${variables.enabled ? 'enable' : 'disable'} scanner: ${error.message || 'Unknown error'}`); + toast.error(`Failed to ${variables.enabled ? 
'enable' : 'disable'} subsystem: ${error.response?.data?.error || error.message}`); }, }); - // Run scanner mutation - const runScannerMutation = useMutation({ - mutationFn: async (scannerId: string) => { - const response = await agentApi.runScanner(agentId, scannerId); - return response; + // Update subsystem interval + const updateIntervalMutation = useMutation({ + mutationFn: async ({ subsystem, intervalMinutes }: { subsystem: string; intervalMinutes: number }) => { + return await agentApi.setSubsystemInterval(agentId, subsystem, intervalMinutes); }, - onSuccess: (data: ScannerResponse, scannerId) => { - toast.success('Scanner execution initiated'); - // Update local state - setScanners(prev => prev.map(scanner => - scanner.id === scannerId - ? { ...scanner, status: 'running', last_run: new Date().toISOString() } - : scanner - )); + onSuccess: (data, variables) => { + toast.success(`Interval updated to ${variables.intervalMinutes} minutes`); + queryClient.invalidateQueries({ queryKey: ['subsystems', agentId] }); }, onError: (error: any) => { - toast.error(`Failed to run scanner: ${error.message || 'Unknown error'}`); + toast.error(`Failed to update interval: ${error.response?.data?.error || error.message}`); }, }); - const handleToggleScanner = (scannerId: string, enabled: boolean, frequency: number) => { - toggleScannerMutation.mutate({ scannerId, enabled, frequency }); + // Toggle auto-run + const toggleAutoRunMutation = useMutation({ + mutationFn: async ({ subsystem, autoRun }: { subsystem: string; autoRun: boolean }) => { + return await agentApi.setSubsystemAutoRun(agentId, subsystem, autoRun); + }, + onSuccess: (data, variables) => { + toast.success(`Auto-run ${variables.autoRun ? 
'enabled' : 'disabled'}`); + queryClient.invalidateQueries({ queryKey: ['subsystems', agentId] }); + }, + onError: (error: any) => { + toast.error(`Failed to toggle auto-run: ${error.response?.data?.error || error.message}`); + }, + }); + + // Trigger manual scan + const triggerScanMutation = useMutation({ + mutationFn: async (subsystem: string) => { + return await agentApi.triggerSubsystem(agentId, subsystem); + }, + onSuccess: (data, subsystem) => { + toast.success(`${subsystemConfig[subsystem]?.name || subsystem} scan triggered`); + queryClient.invalidateQueries({ queryKey: ['subsystems', agentId] }); + }, + onError: (error: any) => { + toast.error(`Failed to trigger scan: ${error.response?.data?.error || error.message}`); + }, + }); + + const handleToggleEnabled = (subsystem: string, currentEnabled: boolean) => { + toggleSubsystemMutation.mutate({ subsystem, enabled: !currentEnabled }); }; - const handleRunScanner = (scannerId: string) => { - runScannerMutation.mutate(scannerId); + const handleIntervalChange = (subsystem: string, intervalMinutes: number) => { + updateIntervalMutation.mutate({ subsystem, intervalMinutes }); }; - const handleFrequencyChange = (scannerId: string, frequency: number) => { - const scanner = scanners.find(s => s.id === scannerId); - if (scanner) { - handleToggleScanner(scannerId, scanner.enabled, frequency); - } + const handleToggleAutoRun = (subsystem: string, currentAutoRun: boolean) => { + toggleAutoRunMutation.mutate({ subsystem, autoRun: !currentAutoRun }); }; - const getStatusIcon = (status: string) => { - switch (status) { - case 'running': - return ; - case 'completed': - return ; - case 'failed': - return ; - default: - return ; - } - }; - - const getFrequencyLabel = (frequency: number) => { - if (frequency < 60) return `${frequency}m`; - if (frequency < 1440) return `${frequency / 60}h`; - return `${frequency / 1440}d`; + const handleTriggerScan = (subsystem: string) => { + triggerScanMutation.mutate(subsystem); }; const 
frequencyOptions = [ @@ -181,9 +155,14 @@ export function AgentScanners({ agentId }: AgentScannersProps) { { value: 1440, label: '24 hours' }, ]; - const enabledCount = scanners.filter(s => s.enabled).length; - const runningCount = scanners.filter(s => s.status === 'running').length; - const failedCount = scanners.filter(s => s.status === 'failed').length; + const getFrequencyLabel = (frequency: number) => { + if (frequency < 60) return `${frequency}m`; + if (frequency < 1440) return `${frequency / 60}h`; + return `${frequency / 1440}d`; + }; + + const enabledCount = subsystems.filter(s => s.enabled).length; + const autoRunCount = subsystems.filter(s => s.auto_run && s.enabled).length; return (
@@ -192,114 +171,176 @@ export function AgentScanners({ agentId }: AgentScannersProps) {
- Active: - {enabledCount}/{scanners.length} + Enabled: + {enabledCount}/{subsystems.length}
- Running: - {runningCount} + Auto-Run: + {autoRunCount}
- {failedCount > 0 && ( -
- Failed: - {failedCount} -
- )}
+
- {/* Agent Health Monitoring Table */} + {/* Subsystem Configuration Table */}
-

Agent Check-in Configuration

+

Subsystem Configuration

-
- - - - - - - - - - - - - - {scanners.map((scanner) => ( - - {/* Scanner Name */} - - - {/* Category */} - - - {/* Status */} - - - {/* Enabled Toggle */} - - - {/* Frequency */} - - - {/* Last Run */} - - - {/* Actions */} - + {isLoading ? ( +
+ + Loading subsystems... +
+ ) : subsystems.length === 0 ? ( +
+ +

No subsystems found

+

+ Subsystems will be created automatically when the agent checks in. +

+
+ ) : ( +
+
Check TypeCategoryStatusEnabledCheck IntervalLast CheckActions
-
- {scanner.icon} -
-
{scanner.name}
-
{scanner.description}
-
-
-
{scanner.category} -
- {getStatusIcon(scanner.status)} - - {scanner.status} - -
-
- - {scanner.enabled ? 'ON' : 'OFF'} - - - {scanner.enabled ? ( - {getFrequencyLabel(scanner.frequency)} - ) : ( - - - )} - - {scanner.last_run ? formatRelativeTime(scanner.last_run) : '-'} - - Auto -
+ + + + + + + + + + - ))} - -
SubsystemCategoryEnabledAuto-RunIntervalLast RunNext RunActions
-
+ + + {subsystems.map((subsystem: AgentSubsystem) => { + const config = subsystemConfig[subsystem.subsystem] || { + icon: , + name: subsystem.subsystem, + description: 'Custom subsystem', + category: 'system', + }; + + return ( + + {/* Subsystem Name */} + +
+ {config.icon} +
+
{config.name}
+
{config.description}
+
+
+ + + {/* Category */} + {config.category} + + {/* Enabled Toggle */} + + + + + {/* Auto-Run Toggle */} + + + + + {/* Interval Selector */} + + {subsystem.enabled ? ( + + ) : ( + - + )} + + + {/* Last Run */} + + {subsystem.last_run_at ? formatRelativeTime(subsystem.last_run_at) : '-'} + + + {/* Next Run */} + + {subsystem.next_run_at && subsystem.auto_run ? formatRelativeTime(subsystem.next_run_at) : '-'} + + + {/* Actions */} + + + + + ); + })} + + +
+ )} {/* Compact note */}
- Agent check-ins report system state to the server on scheduled intervals. The agent initiates all communication - the server never "scans" your machine. + Subsystems report specific metrics to the server on scheduled intervals. Enable auto-run to schedule automatic scans, or trigger manual scans as needed.
); diff --git a/aggregator-web/src/components/ChatTimeline.tsx b/aggregator-web/src/components/ChatTimeline.tsx index 7a829c6..b84db99 100644 --- a/aggregator-web/src/components/ChatTimeline.tsx +++ b/aggregator-web/src/components/ChatTimeline.tsx @@ -15,6 +15,9 @@ import { Activity, Copy, Hash, + HardDrive, + Cpu, + Container, } from 'lucide-react'; import { useQuery } from '@tanstack/react-query'; import { logApi } from '@/lib/api'; @@ -243,6 +246,12 @@ const ChatTimeline: React.FC = ({ agentId, className, isScope switch (action) { case 'scan_updates': return ; + case 'scan_storage': + return ; + case 'scan_system': + return ; + case 'scan_docker': + return ; case 'dry_run_update': return ; case 'confirm_dependencies': @@ -444,17 +453,47 @@ const ChatTimeline: React.FC = ({ agentId, className, isScope let sentence = ''; const isInProgress = result === 'running' || result === 'pending' || result === 'sent'; - + if (entry.type === 'command') { if (action === 'scan updates') { if (isInProgress) { - sentence = `Scan initiated for '${subject}'`; + sentence = `Package Update Scanner initiated`; } else if (statusType === 'success') { - sentence = `Scan completed for '${subject}'`; + sentence = `Package Update Scanner completed`; } else if (statusType === 'failed') { - sentence = `Scan failed for '${subject}'`; + sentence = `Package Update Scanner failed`; } else { - sentence = `Scan results for '${subject}'`; + sentence = `Package Update Scanner results`; + } + } else if (action === 'scan storage') { + if (isInProgress) { + sentence = `Disk Usage Reporter initiated`; + } else if (statusType === 'success') { + sentence = `Disk Usage Reporter completed`; + } else if (statusType === 'failed') { + sentence = `Disk Usage Reporter failed`; + } else { + sentence = `Disk Usage Reporter results`; + } + } else if (action === 'scan system') { + if (isInProgress) { + sentence = `System Metrics Scanner initiated`; + } else if (statusType === 'success') { + sentence = `System Metrics 
Scanner completed`; + } else if (statusType === 'failed') { + sentence = `System Metrics Scanner failed`; + } else { + sentence = `System Metrics Scanner results`; + } + } else if (action === 'scan docker') { + if (isInProgress) { + sentence = `Docker Image Scanner initiated`; + } else if (statusType === 'success') { + sentence = `Docker Image Scanner completed`; + } else if (statusType === 'failed') { + sentence = `Docker Image Scanner failed`; + } else { + sentence = `Docker Image Scanner results`; } } else if (action === 'dry run update') { if (isInProgress) { @@ -763,8 +802,20 @@ const ChatTimeline: React.FC = ({ agentId, className, isScope {entry.stdout && (

- - {entry.action === 'scan_updates' ? 'Analysis Results' : 'Operation Details'} + {entry.action === 'scan_storage' ? ( + + ) : entry.action === 'scan_system' ? ( + + ) : entry.action === 'scan_docker' ? ( + + ) : ( + + )} + {entry.action === 'scan_updates' ? 'Package Analysis Results' : + entry.action === 'scan_storage' ? 'Disk Usage Report' : + entry.action === 'scan_system' ? 'System Metrics Report' : + entry.action === 'scan_docker' ? 'Docker Image Analysis' : + 'Operation Details'}

{(() => { @@ -837,6 +888,71 @@ const ChatTimeline: React.FC = ({ agentId, className, isScope } }); } + } else if (entry.action === 'scan_storage') { + // Parse storage/disk usage information + // Look for disk metrics in the stdout + const diskLines = stdout.split('\n'); + diskLines.forEach(line => { + // Match patterns like "Mount: /dev/sda1" or "Usage: 85%" + const mountMatch = line.match(/(?:Mount|Filesystem|Path):\s*([^\s]+)/i); + const usageMatch = line.match(/(?:Usage|Used):\s*(\d+\.?\d*%?)/i); + const sizeMatch = line.match(/(?:Size|Total):\s*([^\s]+)/i); + const availMatch = line.match(/(?:Available|Free):\s*([^\s]+)/i); + + if (mountMatch || usageMatch || sizeMatch || availMatch) { + if (mountMatch) details.push({ label: "Mount Point", value: mountMatch[1] }); + if (usageMatch) details.push({ label: "Usage", value: usageMatch[1] }); + if (sizeMatch) details.push({ label: "Total Size", value: sizeMatch[1] }); + if (availMatch) details.push({ label: "Available", value: availMatch[1] }); + } + }); + } else if (entry.action === 'scan_system') { + // Parse system metrics (CPU, memory, processes, uptime) + const cpuMatch = stdout.match(/(?:CPU|Processor):\s*([^\n]+)/i); + if (cpuMatch) { + details.push({ label: "CPU", value: cpuMatch[1].trim() }); + } + + const memoryMatch = stdout.match(/(?:Memory|RAM):\s*([^\n]+)/i); + if (memoryMatch) { + details.push({ label: "Memory", value: memoryMatch[1].trim() }); + } + + const processMatch = stdout.match(/(?:Processes|Process Count):\s*(\d+)/i); + if (processMatch) { + details.push({ label: "Running Processes", value: processMatch[1] }); + } + + const uptimeMatch = stdout.match(/(?:Uptime|Up Time):\s*([^\n]+)/i); + if (uptimeMatch) { + details.push({ label: "System Uptime", value: uptimeMatch[1].trim() }); + } + + const loadMatch = stdout.match(/(?:Load Average|Load):\s*([^\n]+)/i); + if (loadMatch) { + details.push({ label: "Load Average", value: loadMatch[1].trim() }); + } + } else if (entry.action === 'scan_docker') 
{ + // Parse Docker image/container information + const containerCountMatch = stdout.match(/(?:Containers|Container Count):\s*(\d+)/i); + if (containerCountMatch) { + details.push({ label: "Containers", value: containerCountMatch[1] }); + } + + const imageCountMatch = stdout.match(/(?:Images|Image Count):\s*(\d+)/i); + if (imageCountMatch) { + details.push({ label: "Images", value: imageCountMatch[1] }); + } + + const updateCountMatch = stdout.match(/(?:Updates Available|Updatable Images):\s*(\d+)/i); + if (updateCountMatch) { + details.push({ label: "Updates Available", value: updateCountMatch[1] }); + } + + const runningMatch = stdout.match(/(?:Running Containers):\s*(\d+)/i); + if (runningMatch) { + details.push({ label: "Running", value: runningMatch[1] }); + } } // Extract "Packages installed" info diff --git a/aggregator-web/src/lib/api.ts b/aggregator-web/src/lib/api.ts index f0e40c7..cfc347d 100644 --- a/aggregator-web/src/lib/api.ts +++ b/aggregator-web/src/lib/api.ts @@ -21,7 +21,10 @@ import { RateLimitConfig, RateLimitStats, RateLimitUsage, - RateLimitSummary + RateLimitSummary, + AgentSubsystem, + SubsystemConfig, + SubsystemStats } from '@/types'; // Base URL for API - use nginx proxy @@ -111,6 +114,52 @@ export const agentApi = { unregisterAgent: async (id: string): Promise => { await api.delete(`/agents/${id}`); }, + + // Subsystem Management + getSubsystems: async (agentId: string): Promise => { + const response = await api.get(`/agents/${agentId}/subsystems`); + return response.data; + }, + + getSubsystem: async (agentId: string, subsystem: string): Promise => { + const response = await api.get(`/agents/${agentId}/subsystems/${subsystem}`); + return response.data; + }, + + updateSubsystem: async (agentId: string, subsystem: string, config: SubsystemConfig): Promise<{ message: string }> => { + const response = await api.patch(`/agents/${agentId}/subsystems/${subsystem}`, config); + return response.data; + }, + + enableSubsystem: async (agentId: 
string, subsystem: string): Promise<{ message: string }> => { + const response = await api.post(`/agents/${agentId}/subsystems/${subsystem}/enable`); + return response.data; + }, + + disableSubsystem: async (agentId: string, subsystem: string): Promise<{ message: string }> => { + const response = await api.post(`/agents/${agentId}/subsystems/${subsystem}/disable`); + return response.data; + }, + + triggerSubsystem: async (agentId: string, subsystem: string): Promise<{ message: string; command_id: string }> => { + const response = await api.post(`/agents/${agentId}/subsystems/${subsystem}/trigger`); + return response.data; + }, + + getSubsystemStats: async (agentId: string, subsystem: string): Promise => { + const response = await api.get(`/agents/${agentId}/subsystems/${subsystem}/stats`); + return response.data; + }, + + setSubsystemAutoRun: async (agentId: string, subsystem: string, autoRun: boolean): Promise<{ message: string }> => { + const response = await api.post(`/agents/${agentId}/subsystems/${subsystem}/auto-run`, { auto_run: autoRun }); + return response.data; + }, + + setSubsystemInterval: async (agentId: string, subsystem: string, intervalMinutes: number): Promise<{ message: string }> => { + const response = await api.post(`/agents/${agentId}/subsystems/${subsystem}/interval`, { interval_minutes: intervalMinutes }); + return response.data; + }, }; export const updateApi = { diff --git a/aggregator-web/src/types/index.ts b/aggregator-web/src/types/index.ts index ff325c9..5530fa6 100644 --- a/aggregator-web/src/types/index.ts +++ b/aggregator-web/src/types/index.ts @@ -367,4 +367,36 @@ export interface RateLimitSummary { total_requests_per_minute: number; most_active_endpoint: string; average_utilization: number; +} + +// Subsystem types +export interface AgentSubsystem { + id: string; + agent_id: string; + subsystem: 'updates' | 'storage' | 'system' | 'docker'; + enabled: boolean; + interval_minutes: number; + auto_run: boolean; + last_run_at: string | 
null; + next_run_at: string | null; + created_at: string; + updated_at: string; +} + +export interface SubsystemConfig { + enabled?: boolean; + interval_minutes?: number; + auto_run?: boolean; +} + +export interface SubsystemStats { + subsystem: string; + enabled: boolean; + last_run_at: string | null; + next_run_at: string | null; + interval_minutes: number; + auto_run: boolean; + run_count: number; + last_status: string; + last_duration: number; } \ No newline at end of file