diff --git a/aggregator-agent/cmd/agent/subsystem_handlers.go b/aggregator-agent/cmd/agent/subsystem_handlers.go index 9c51172..d58c330 100644 --- a/aggregator-agent/cmd/agent/subsystem_handlers.go +++ b/aggregator-agent/cmd/agent/subsystem_handlers.go @@ -121,19 +121,43 @@ func handleScanStorage(apiClient *client.Client, cfg *config.Config, ackTracker log.Printf("Failed to report scan log: %v\n", err) } - // Report "updates" (disk info) to server - if len(result.Updates) > 0 { - report := client.UpdateReport{ - CommandID: commandID, - Timestamp: time.Now(), - Updates: result.Updates, + // Report storage metrics to server using dedicated endpoint + // Get storage scanner and use proper interface + storageScanner := orchestrator.NewStorageScanner("unknown") // TODO: Get actual agent version + if storageScanner.IsAvailable() { + metrics, err := storageScanner.ScanStorage() + if err != nil { + return fmt.Errorf("failed to scan storage metrics: %w", err) } - if err := apiClient.ReportUpdates(cfg.AgentID, report); err != nil { - return fmt.Errorf("failed to report storage metrics: %w", err) - } + if len(metrics) > 0 { + // Convert StorageMetric to MetricsReportItem for API call + metricItems := make([]client.MetricsReportItem, 0, len(metrics)) + for _, metric := range metrics { + item := client.MetricsReportItem{ + PackageType: "storage", + PackageName: metric.Mountpoint, + CurrentVersion: fmt.Sprintf("%d bytes used", metric.UsedBytes), + AvailableVersion: fmt.Sprintf("%d bytes total", metric.TotalBytes), + Severity: metric.Severity, + RepositorySource: metric.Filesystem, + Metadata: metric.Metadata, + } + metricItems = append(metricItems, item) + } - log.Printf("✓ Reported %d disk mount points to server\n", len(result.Updates)) + report := client.MetricsReport{ + CommandID: commandID, + Timestamp: time.Now(), + Metrics: metricItems, + } + + if err := apiClient.ReportMetrics(cfg.AgentID, report); err != nil { + return fmt.Errorf("failed to report storage metrics: %w", err) + } + + log.Printf("✓ Reported %d storage metrics to server\n", len(metrics)) + } } return nil @@ -179,19 +203,43 @@ func handleScanSystem(apiClient *client.Client, cfg *config.Config, ackTracker * log.Printf("Failed to report scan log: %v\n", err) } - // Report "updates" (system metrics) to server - if len(result.Updates) > 0 { - report := client.UpdateReport{ - CommandID: commandID, - Timestamp: time.Now(), - Updates: result.Updates, + // Report system metrics to server using dedicated endpoint + // Get system scanner and use proper interface + systemScanner := orchestrator.NewSystemScanner("unknown") // TODO: Get actual agent version + if systemScanner.IsAvailable() { + metrics, err := systemScanner.ScanSystem() + if err != nil { + return fmt.Errorf("failed to scan system metrics: %w", err) } - if err := apiClient.ReportUpdates(cfg.AgentID, report); err != nil { - return fmt.Errorf("failed to report system metrics: %w", err) - } + if len(metrics) > 0 { + // Convert SystemMetric to MetricsReportItem for API call + metricItems := make([]client.MetricsReportItem, 0, len(metrics)) + for _, metric := range metrics { + item := client.MetricsReportItem{ + PackageType: "system", + PackageName: metric.MetricName, + CurrentVersion: metric.CurrentValue, + AvailableVersion: metric.AvailableValue, + Severity: metric.Severity, + RepositorySource: metric.MetricType, + Metadata: metric.Metadata, + } + metricItems = append(metricItems, item) + } - log.Printf("✓ Reported system metrics to server\n") + report := client.MetricsReport{ + CommandID: commandID, + Timestamp: time.Now(), + Metrics: metricItems, + } + + if err := apiClient.ReportMetrics(cfg.AgentID, report); err != nil { + return fmt.Errorf("failed to report system metrics: %w", err) + } + + log.Printf("✓ Reported %d system metrics to server\n", len(metrics)) + } } return nil diff --git a/aggregator-agent/internal/client/client.go b/aggregator-agent/internal/client/client.go index d70bf95..0157be6 100644 --- a/aggregator-agent/internal/client/client.go +++ b/aggregator-agent/internal/client/client.go @@ -334,6 +334,104 @@ func (c *Client) ReportUpdates(agentID uuid.UUID, report UpdateReport) error { return nil } +// MetricsReport represents metrics data (storage, system, CPU, memory) +type MetricsReport struct { + CommandID string `json:"command_id"` + Timestamp time.Time `json:"timestamp"` + Metrics []MetricsReportItem `json:"metrics"` +} + +// MetricsReportItem represents a single metric +type MetricsReportItem struct { + PackageType string `json:"package_type"` + PackageName string `json:"package_name"` + CurrentVersion string `json:"current_version"` + AvailableVersion string `json:"available_version"` + Severity string `json:"severity"` + RepositorySource string `json:"repository_source"` + Metadata map[string]interface{} `json:"metadata"` +} + +// ReportMetrics sends metrics data to the server +func (c *Client) ReportMetrics(agentID uuid.UUID, report MetricsReport) error { + url := fmt.Sprintf("%s/api/v1/agents/%s/metrics", c.baseURL, agentID) + + body, err := json.Marshal(report) + if err != nil { + return err + } + + req, err := http.NewRequest("POST", url, bytes.NewBuffer(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+c.token) + c.addMachineIDHeader(req) // Security: Validate machine binding (v0.1.22+) + + resp, err := c.http.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + bodyBytes, _ := io.ReadAll(resp.Body) + return fmt.Errorf("failed to report metrics: %s - %s", resp.Status, string(bodyBytes)) + } + + return nil +} + +// DockerReport represents Docker image information +type DockerReport struct { + CommandID string `json:"command_id"` + Timestamp time.Time `json:"timestamp"` + Images []DockerReportItem `json:"images"` +} + +// DockerReportItem represents a single Docker image +type DockerReportItem struct { + PackageType string `json:"package_type"` + PackageName string `json:"package_name"` + CurrentVersion string `json:"current_version"` + AvailableVersion string `json:"available_version"` + Severity string `json:"severity"` + RepositorySource string `json:"repository_source"` + Metadata map[string]interface{} `json:"metadata"` +} + +// ReportDockerImages sends Docker image information to the server +func (c *Client) ReportDockerImages(agentID uuid.UUID, report DockerReport) error { + url := fmt.Sprintf("%s/api/v1/agents/%s/docker-images", c.baseURL, agentID) + + body, err := json.Marshal(report) + if err != nil { + return err + } + + req, err := http.NewRequest("POST", url, bytes.NewBuffer(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+c.token) + c.addMachineIDHeader(req) // Security: Validate machine binding (v0.1.22+) + + resp, err := c.http.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + bodyBytes, _ := io.ReadAll(resp.Body) + return fmt.Errorf("failed to report docker images: %s - %s", resp.Status, string(bodyBytes)) + } + + return nil +} + // LogReport represents an execution log type LogReport struct { CommandID string `json:"command_id"` diff --git a/aggregator-agent/internal/orchestrator/scanner_types.go b/aggregator-agent/internal/orchestrator/scanner_types.go new file mode 100644 index 0000000..633cf3c --- /dev/null +++ b/aggregator-agent/internal/orchestrator/scanner_types.go @@ -0,0 +1,113 @@ +package orchestrator + +import ( + "github.com/Fimeg/RedFlag/aggregator-agent/internal/client" + "github.com/Fimeg/RedFlag/aggregator-agent/internal/system" +) + +// StorageMetric represents a single storage/disk metric +type StorageMetric struct { + Mountpoint string `json:"mountpoint"` + Filesystem string `json:"filesystem"` + Device string `json:"device"` + DiskType string `json:"disk_type"` + TotalBytes int64 `json:"total_bytes"` + UsedBytes int64 `json:"used_bytes"` + AvailableBytes int64 `json:"available_bytes"` + UsedPercent float64 `json:"used_percent"` + IsRoot bool `json:"is_root"` + IsLargest bool `json:"is_largest"` + Severity string `json:"severity"` + Metadata map[string]interface{} `json:"metadata"` +} + +// SystemMetric represents a single system metric (CPU, memory, etc.) +type SystemMetric struct { + MetricName string `json:"metric_name"` + MetricType string `json:"metric_type"` // "cpu", "memory", "processes", "uptime", etc. + CurrentValue string `json:"current_value"` + AvailableValue string `json:"available_value"` + Severity string `json:"severity"` + Metadata map[string]interface{} `json:"metadata"` +} + +// DockerImage represents a single Docker image +type DockerImage struct { + ImageName string `json:"image_name"` + ImageTag string `json:"image_tag"` + ImageID string `json:"image_id"` + RepositorySource string `json:"repository_source"` + SizeBytes int64 `json:"size_bytes"` + CreatedAt string `json:"created_at"` + HasUpdate bool `json:"has_update"` + LatestImageID string `json:"latest_image_id"` + Severity string `json:"severity"` + Labels map[string]string `json:"labels"` + Metadata map[string]interface{} `json:"metadata"` +} + +// PackageUpdate represents an actual software package update (legacy, for package scanners only) +type PackageUpdate = client.UpdateReportItem + +// --- Scanner Interfaces --- + +// StorageScannerInterface handles storage/disk metrics scanning +type StorageScannerInterface interface { + IsAvailable() bool + ScanStorage() ([]StorageMetric, error) + Name() string +} + +// SystemScannerInterface handles system metrics scanning +type SystemScannerInterface interface { + IsAvailable() bool + ScanSystem() ([]SystemMetric, error) + Name() string +} + +// DockerScannerInterface handles Docker image scanning +type DockerScannerInterface interface { + IsAvailable() bool + ScanDocker() ([]DockerImage, error) + Name() string +} + +// PackageScannerInterface handles package update scanning (legacy) +type PackageScannerInterface interface { + IsAvailable() bool + ScanPackages() ([]PackageUpdate, error) + Name() string +} + +// --- Unified Scanner Types for Backwards Compatibility --- + +// ScannerType represents the type of data a scanner returns +type ScannerType string + +const ( + ScannerTypeStorage ScannerType = "storage" + ScannerTypeSystem ScannerType = "system" + ScannerTypeDocker ScannerType = "docker" + ScannerTypePackage ScannerType = "package" +) + +// TypedScannerResult represents the result of any type of scanner +type TypedScannerResult struct { + ScannerName string + ScannerType ScannerType + StorageData []StorageMetric + SystemData []SystemMetric + DockerData []DockerImage + PackageData []PackageUpdate + Error error + Duration int64 // milliseconds + Status string +} + +// TypedScanner is a unified interface that can return any type of data +type TypedScanner interface { + IsAvailable() bool + GetType() ScannerType + Scan() (TypedScannerResult, error) + Name() string +} \ No newline at end of file diff --git a/aggregator-agent/internal/orchestrator/storage_scanner.go b/aggregator-agent/internal/orchestrator/storage_scanner.go index 7f41ac6..09b3d22 100644 --- a/aggregator-agent/internal/orchestrator/storage_scanner.go +++ b/aggregator-agent/internal/orchestrator/storage_scanner.go @@ -2,6 +2,7 @@ package orchestrator import ( "fmt" + "time" "github.com/Fimeg/RedFlag/aggregator-agent/internal/client" "github.com/Fimeg/RedFlag/aggregator-agent/internal/system" @@ -24,8 +25,8 @@ func (s *StorageScanner) IsAvailable() bool { return true } -// Scan collects disk usage information and returns it as "updates" for reporting -func (s *StorageScanner) Scan() ([]client.UpdateReportItem, error) { +// ScanStorage collects disk usage information and returns proper storage metrics +func (s *StorageScanner) ScanStorage() ([]StorageMetric, error) { sysInfo, err := system.GetSystemInfo(s.agentVersion) if err != nil { return nil, fmt.Errorf("failed to get system info: %w", err) @@ -35,41 +36,99 @@ func (s *StorageScanner) Scan() ([]client.UpdateReportItem, error) { return nil, fmt.Errorf("no disk information available") } - // Convert disk info to UpdateReportItem format for reporting - // This is a bit unconventional but allows us to use the existing reporting infrastructure - var items []client.UpdateReportItem + // Convert disk info to proper StorageMetric format + var metrics []StorageMetric for _, disk := range sysInfo.DiskInfo { - // Create a pseudo-update item for each disk - item := client.UpdateReportItem{ - PackageName: fmt.Sprintf("disk-%s", disk.Mountpoint), - CurrentVersion: fmt.Sprintf("%.1f%% used", disk.UsedPercent), - AvailableVersion: fmt.Sprintf("%d GB available", disk.Available/(1024*1024*1024)), - PackageType: "storage", - Severity: determineDiskSeverity(disk.UsedPercent), - PackageDescription: fmt.Sprintf("Disk: %s (%s) - %s", disk.Mountpoint, disk.Filesystem, disk.Device), + metric := StorageMetric{ + Mountpoint: disk.Mountpoint, + Filesystem: disk.Filesystem, + Device: disk.Device, + DiskType: disk.DiskType, + TotalBytes: disk.Total, + UsedBytes: disk.Used, + AvailableBytes: disk.Available, + UsedPercent: disk.UsedPercent, + IsRoot: disk.IsRoot, + IsLargest: disk.IsLargest, + Severity: determineDiskSeverity(disk.UsedPercent), Metadata: map[string]interface{}{ - "mountpoint": disk.Mountpoint, - "filesystem": disk.Filesystem, - "device": disk.Device, - "disk_type": disk.DiskType, - "total_bytes": disk.Total, - "used_bytes": disk.Used, - "available_bytes": disk.Available, - "used_percent": disk.UsedPercent, - "is_root": disk.IsRoot, - "is_largest": disk.IsLargest, + "agent_version": s.agentVersion, + "collected_at": sysInfo.Timestamp, }, } + metrics = append(metrics, metric) + } + + return metrics, nil +} + +// Name returns the scanner name +func (s *StorageScanner) Name() string { + return "Disk Usage Reporter" +} + +// --- Legacy Compatibility Methods --- + +// Scan collects disk usage information and returns it as "updates" for reporting (LEGACY) +// This method is kept for backwards compatibility with the old Scanner interface +func (s *StorageScanner) Scan() ([]client.UpdateReportItem, error) { + metrics, err := s.ScanStorage() + if err != nil { + return nil, err + } + + // Convert proper StorageMetric back to legacy UpdateReportItem format + var items []client.UpdateReportItem + + for _, metric := range metrics { + item := client.UpdateReportItem{ + PackageName: fmt.Sprintf("disk-%s", metric.Mountpoint), + CurrentVersion: fmt.Sprintf("%.1f%% used", metric.UsedPercent), + AvailableVersion: fmt.Sprintf("%d GB available", metric.AvailableBytes/(1024*1024*1024)), + PackageType: "storage", + Severity: metric.Severity, + PackageDescription: fmt.Sprintf("Disk: %s (%s) - %s", metric.Mountpoint, metric.Filesystem, metric.Device), + Metadata: metric.Metadata, + } items = append(items, item) } return items, nil } -// Name returns the scanner name -func (s *StorageScanner) Name() string { - return "Disk Usage Reporter" +// --- Typed Scanner Implementation --- + +// GetType returns the scanner type +func (s *StorageScanner) GetType() ScannerType { + return ScannerTypeStorage +} + +// ScanTyped returns typed results (new implementation) +func (s *StorageScanner) ScanTyped() (TypedScannerResult, error) { + startTime := time.Now() + defer func() { + // Duration will be set at the end + }() + + metrics, err := s.ScanStorage() + if err != nil { + return TypedScannerResult{ + ScannerName: s.Name(), + ScannerType: ScannerTypeStorage, + Error: err, + Status: "failed", + Duration: time.Since(startTime).Milliseconds(), + }, err + } + + return TypedScannerResult{ + ScannerName: s.Name(), + ScannerType: ScannerTypeStorage, + StorageData: metrics, + Status: "success", + Duration: time.Since(startTime).Milliseconds(), + }, nil } // determineDiskSeverity returns severity based on disk usage percentage diff --git a/aggregator-agent/internal/orchestrator/system_scanner.go b/aggregator-agent/internal/orchestrator/system_scanner.go index c9e6aaa..b613923 100644 --- a/aggregator-agent/internal/orchestrator/system_scanner.go +++ b/aggregator-agent/internal/orchestrator/system_scanner.go @@ -2,6 +2,7 @@ package orchestrator import ( "fmt" + "time" "github.com/Fimeg/RedFlag/aggregator-agent/internal/client" "github.com/Fimeg/RedFlag/aggregator-agent/internal/system" @@ -24,42 +25,38 @@ func (s *SystemScanner) IsAvailable() bool { return true } -// Scan collects system information and returns it as "updates" for reporting -func (s *SystemScanner) Scan() ([]client.UpdateReportItem, error) { +// ScanSystem collects system information and returns proper system metrics +func (s *SystemScanner) ScanSystem() ([]SystemMetric, error) { sysInfo, err := system.GetSystemInfo(s.agentVersion) if err != nil { return nil, fmt.Errorf("failed to get system info: %w", err) } - // Convert system info to UpdateReportItem format for reporting - var items []client.UpdateReportItem + // Convert system info to proper SystemMetric format + var metrics []SystemMetric - // CPU info item - cpuItem := client.UpdateReportItem{ - PackageName: "system-cpu", - CurrentVersion: fmt.Sprintf("%d cores, %d threads", sysInfo.CPUInfo.Cores, sysInfo.CPUInfo.Threads), - AvailableVersion: sysInfo.CPUInfo.ModelName, - PackageType: "system", - Severity: "low", - PackageDescription: fmt.Sprintf("CPU: %s", sysInfo.CPUInfo.ModelName), + // CPU info metric + cpuMetric := SystemMetric{ + MetricName: "system-cpu", + MetricType: "cpu", + CurrentValue: fmt.Sprintf("%d cores, %d threads", sysInfo.CPUInfo.Cores, sysInfo.CPUInfo.Threads), + AvailableValue: sysInfo.CPUInfo.ModelName, + Severity: "low", Metadata: map[string]interface{}{ "cpu_model": sysInfo.CPUInfo.ModelName, "cpu_cores": sysInfo.CPUInfo.Cores, "cpu_threads": sysInfo.CPUInfo.Threads, }, } - items = append(items, cpuItem) + metrics = append(metrics, cpuMetric) - // Memory info item - memItem := client.UpdateReportItem{ - PackageName: "system-memory", - CurrentVersion: fmt.Sprintf("%.1f%% used", sysInfo.MemoryInfo.UsedPercent), - AvailableVersion: fmt.Sprintf("%d GB total", sysInfo.MemoryInfo.Total/(1024*1024*1024)), - PackageType: "system", - Severity: determineMemorySeverity(sysInfo.MemoryInfo.UsedPercent), - PackageDescription: fmt.Sprintf("Memory: %.1f GB / %.1f GB used", - float64(sysInfo.MemoryInfo.Used)/(1024*1024*1024), - float64(sysInfo.MemoryInfo.Total)/(1024*1024*1024)), + // Memory info metric + memMetric := SystemMetric{ + MetricName: "system-memory", + MetricType: "memory", + CurrentValue: fmt.Sprintf("%.1f%% used", sysInfo.MemoryInfo.UsedPercent), + AvailableValue: fmt.Sprintf("%d GB total", sysInfo.MemoryInfo.Total/(1024*1024*1024)), + Severity: determineMemorySeverity(sysInfo.MemoryInfo.UsedPercent), Metadata: map[string]interface{}{ "memory_total": sysInfo.MemoryInfo.Total, "memory_used": sysInfo.MemoryInfo.Used, @@ -67,54 +64,51 @@ func (s *SystemScanner) Scan() ([]client.UpdateReportItem, error) { "memory_used_percent": sysInfo.MemoryInfo.UsedPercent, }, } - items = append(items, memItem) + metrics = append(metrics, memMetric) - // Process count item - processItem := client.UpdateReportItem{ - PackageName: "system-processes", - CurrentVersion: fmt.Sprintf("%d processes", sysInfo.RunningProcesses), - AvailableVersion: "n/a", - PackageType: "system", - Severity: "low", - PackageDescription: fmt.Sprintf("Running Processes: %d", sysInfo.RunningProcesses), + // Process count metric + processMetric := SystemMetric{ + MetricName: "system-processes", + MetricType: "processes", + CurrentValue: fmt.Sprintf("%d processes", sysInfo.RunningProcesses), + AvailableValue: "n/a", + Severity: "low", Metadata: map[string]interface{}{ "process_count": sysInfo.RunningProcesses, }, } - items = append(items, processItem) + metrics = append(metrics, processMetric) - // Uptime item - uptimeItem := client.UpdateReportItem{ - PackageName: "system-uptime", - CurrentVersion: sysInfo.Uptime, - AvailableVersion: "n/a", - PackageType: "system", - Severity: "low", - PackageDescription: fmt.Sprintf("System Uptime: %s", sysInfo.Uptime), + // Uptime metric + uptimeMetric := SystemMetric{ + MetricName: "system-uptime", + MetricType: "uptime", + CurrentValue: sysInfo.Uptime, + AvailableValue: "n/a", + Severity: "low", Metadata: map[string]interface{}{ "uptime": sysInfo.Uptime, }, } - items = append(items, uptimeItem) + metrics = append(metrics, uptimeMetric) - // Reboot required item (if applicable) + // Reboot required metric (if applicable) if sysInfo.RebootRequired { - rebootItem := client.UpdateReportItem{ - PackageName: "system-reboot", - CurrentVersion: "required", - AvailableVersion: "n/a", - PackageType: "system", - Severity: "important", - PackageDescription: fmt.Sprintf("Reboot Required: %s", sysInfo.RebootReason), + rebootMetric := SystemMetric{ + MetricName: "system-reboot", + MetricType: "reboot", + CurrentValue: "required", + AvailableValue: "n/a", + Severity: "important", Metadata: map[string]interface{}{ "reboot_required": true, "reboot_reason": sysInfo.RebootReason, }, } - items = append(items, rebootItem) + metrics = append(metrics, rebootMetric) } - return items, nil + return metrics, nil } // Name returns the scanner name @@ -122,6 +116,66 @@ func (s *SystemScanner) Name() string { return "System Metrics Reporter" } +// --- Legacy Compatibility Methods --- + +// Scan collects system information and returns it as "updates" for reporting (LEGACY) +// This method is kept for backwards compatibility with the old Scanner interface +func (s *SystemScanner) Scan() ([]client.UpdateReportItem, error) { + metrics, err := s.ScanSystem() + if err != nil { + return nil, err + } + + // Convert proper SystemMetric back to legacy UpdateReportItem format + var items []client.UpdateReportItem + + for _, metric := range metrics { + item := client.UpdateReportItem{ + PackageName: metric.MetricName, + CurrentVersion: metric.CurrentValue, + AvailableVersion: metric.AvailableValue, + PackageType: "system", + Severity: metric.Severity, + PackageDescription: fmt.Sprintf("System %s: %s", metric.MetricType, metric.MetricName), + Metadata: metric.Metadata, + } + items = append(items, item) + } + + return items, nil +} + +// --- Typed Scanner Implementation --- + +// GetType returns the scanner type +func (s *SystemScanner) GetType() ScannerType { + return ScannerTypeSystem +} + +// ScanTyped returns typed results (new implementation) +func (s *SystemScanner) ScanTyped() (TypedScannerResult, error) { + startTime := time.Now() + + metrics, err := s.ScanSystem() + if err != nil { + return TypedScannerResult{ + ScannerName: s.Name(), + ScannerType: ScannerTypeSystem, + Error: err, + Status: "failed", + Duration: time.Since(startTime).Milliseconds(), + }, err + } + + return TypedScannerResult{ + ScannerName: s.Name(), + ScannerType: ScannerTypeSystem, + SystemData: metrics, + Status: "success", + Duration: time.Since(startTime).Milliseconds(), + }, nil +} + // determineMemorySeverity returns severity based on memory usage percentage func determineMemorySeverity(usedPercent float64) string { switch { diff --git a/aggregator-server/cmd/server/main.go b/aggregator-server/cmd/server/main.go index 1bc9802..d9970da 100644 --- a/aggregator-server/cmd/server/main.go +++ b/aggregator-server/cmd/server/main.go @@ -132,6 +132,8 @@ func main() { userQueries := queries.NewUserQueries(db.DB) subsystemQueries := queries.NewSubsystemQueries(db.DB) agentUpdateQueries := queries.NewAgentUpdateQueries(db.DB) + metricsQueries := queries.NewMetricsQueries(db.DB) + dockerQueries := queries.NewDockerQueries(db.DB) // Ensure admin user exists if err := userQueries.EnsureAdminUser(cfg.Admin.Username, cfg.Admin.Username+"@redflag.local", cfg.Admin.Password); err != nil { @@ -172,6 +174,8 @@ func main() { rateLimitHandler := handlers.NewRateLimitHandler(rateLimiter) downloadHandler := handlers.NewDownloadHandler(filepath.Join("/app"), cfg) subsystemHandler := handlers.NewSubsystemHandler(subsystemQueries, commandQueries) + metricsHandler := handlers.NewMetricsHandler(metricsQueries, agentQueries, commandQueries) + dockerReportsHandler := handlers.NewDockerReportsHandler(dockerQueries, agentQueries, commandQueries) // Initialize verification handler var verificationHandler *handlers.VerificationHandler @@ -245,6 +249,10 @@ func main() { verificationHandler.VerifySignature(c) }) agents.DELETE("/:id", agentHandler.UnregisterAgent) + + // New dedicated endpoints for metrics and docker images (data classification fix) + agents.POST("/:id/metrics", rateLimiter.RateLimit("agent_reports", middleware.KeyByAgentID), metricsHandler.ReportMetrics) + agents.POST("/:id/docker-images", rateLimiter.RateLimit("agent_reports", middleware.KeyByAgentID), dockerReportsHandler.ReportDockerImages) } // Dashboard/Web routes (protected by web auth) @@ -310,6 +318,13 @@ func main() { dashboard.POST("/docker/containers/:container_id/images/:image_id/reject", dockerHandler.RejectUpdate) dashboard.POST("/docker/containers/:container_id/images/:image_id/install", dockerHandler.InstallUpdate) + // Metrics and Docker images routes (data classification fix) + dashboard.GET("/agents/:id/metrics", metricsHandler.GetAgentMetrics) + dashboard.GET("/agents/:id/metrics/storage", metricsHandler.GetAgentStorageMetrics) + dashboard.GET("/agents/:id/metrics/system", metricsHandler.GetAgentSystemMetrics) + dashboard.GET("/agents/:id/docker-images", dockerReportsHandler.GetAgentDockerImages) + dashboard.GET("/agents/:id/docker-info", dockerReportsHandler.GetAgentDockerInfo) + // Admin/Registration Token routes (for agent enrollment management) admin := dashboard.Group("/admin") { diff --git a/aggregator-server/internal/api/handlers/docker_reports.go b/aggregator-server/internal/api/handlers/docker_reports.go new file mode 100644 index 0000000..75e6c6e --- /dev/null +++ b/aggregator-server/internal/api/handlers/docker_reports.go @@ -0,0 +1,277 @@ +package handlers + +import ( + "fmt" + "net/http" + "strconv" + "strings" + "time" + + "github.com/Fimeg/RedFlag/aggregator-server/internal/database/queries" + "github.com/Fimeg/RedFlag/aggregator-server/internal/models" + "github.com/gin-gonic/gin" + "github.com/google/uuid" +) + +// DockerReportsHandler handles Docker image reports from agents +type DockerReportsHandler struct { + dockerQueries *queries.DockerQueries + agentQueries *queries.AgentQueries + commandQueries *queries.CommandQueries +} + +func NewDockerReportsHandler(dq *queries.DockerQueries, aq *queries.AgentQueries, cq *queries.CommandQueries) *DockerReportsHandler { + return &DockerReportsHandler{ + dockerQueries: dq, + agentQueries: aq, + commandQueries: cq, + } +} + +// ReportDockerImages handles Docker image reports from agents using event sourcing +func (h *DockerReportsHandler) ReportDockerImages(c *gin.Context) { + agentID := c.MustGet("agent_id").(uuid.UUID) + + // Update last_seen timestamp + if err := h.agentQueries.UpdateAgentLastSeen(agentID); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update last seen"}) + return + } + + var req models.DockerReportRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + // Validate command exists and belongs to agent + commandID, err := uuid.Parse(req.CommandID) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid command ID format"}) + return + } + + command, err := h.commandQueries.GetCommandByID(commandID) + if err != nil { + c.JSON(http.StatusNotFound, gin.H{"error": "command not found"}) + return + } + + if command.AgentID != agentID { + c.JSON(http.StatusForbidden, gin.H{"error": "unauthorized command"}) + return + } + + // Convert Docker images to events + events := make([]models.StoredDockerImage, 0, len(req.Images)) + for _, item := range req.Images { + event := models.StoredDockerImage{ + ID: uuid.New(), + AgentID: agentID, + PackageType: item.PackageType, + PackageName: item.PackageName, + CurrentVersion: item.CurrentVersion, + AvailableVersion: item.AvailableVersion, + Severity: item.Severity, + RepositorySource: item.RepositorySource, + Metadata: models.JSONB(item.Metadata), + EventType: "discovered", + CreatedAt: req.Timestamp, + } + events = append(events, event) + } + + // Store events in batch with error isolation + if err := h.dockerQueries.CreateDockerEventsBatch(events); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to record docker image events"}) + return + } + + // Update command status to completed + result := models.JSONB{ + "docker_images_count": len(req.Images), + "logged_at": time.Now(), + } + + if err := h.commandQueries.MarkCommandCompleted(commandID, result); err != nil { + fmt.Printf("Warning: Failed to mark docker command %s as completed: %v\n", commandID, err) + } + + c.JSON(http.StatusOK, gin.H{ + "message": "docker image events recorded", + "count": len(events), + "command_id": req.CommandID, + }) +} + +// GetAgentDockerImages retrieves Docker image updates for a specific agent +func (h *DockerReportsHandler) GetAgentDockerImages(c *gin.Context) { + agentIDStr := c.Param("agentId") + agentID, err := uuid.Parse(agentIDStr) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid agent ID"}) + return + } + + // Parse query parameters + page, _ := strconv.Atoi(c.DefaultQuery("page", "1")) + pageSize, _ := strconv.Atoi(c.DefaultQuery("page_size", "50")) + if page < 1 { + page = 1 + } + if pageSize < 1 || pageSize > 100 { + pageSize = 50 + } + + imageName := c.Query("image_name") + registry := c.Query("registry") + severity := c.Query("severity") + hasUpdatesStr := c.Query("has_updates") + + // Build filter + filter := &models.DockerFilter{ + AgentID: &agentID, + ImageName: nil, + Registry: nil, + Severity: nil, + HasUpdates: nil, + Limit: &pageSize, + Offset: &((page - 1) * pageSize), + } + + if imageName != "" { + filter.ImageName = &imageName + } + if registry != "" { + filter.Registry = ®istry + } + if severity != "" { + filter.Severity = &severity + } + if hasUpdatesStr != "" { + hasUpdates := hasUpdatesStr == "true" + filter.HasUpdates = &hasUpdates + } + + // Fetch Docker images + result, err := h.dockerQueries.GetDockerImages(filter) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to fetch docker images"}) + return + } + + c.JSON(http.StatusOK, result) +} + +// GetAgentDockerInfo retrieves detailed Docker information for an agent +func (h *DockerReportsHandler) GetAgentDockerInfo(c *gin.Context) { + agentIDStr := c.Param("agentId") + agentID, err := uuid.Parse(agentIDStr) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid agent ID"}) + return + } + + // Get all Docker images for this agent + pageSize := 100 + offset := 0 + + filter := &models.DockerFilter{ + AgentID: &agentID, + Limit: &pageSize, + Offset: &offset, + } + + result, err := h.dockerQueries.GetDockerImages(filter) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to fetch docker images"}) + return + } + + // Convert to detailed format + dockerInfo := make([]models.DockerImageInfo, 0, len(result.Images)) + for _, image := range result.Images { + info := models.DockerImageInfo{ + Name: image.PackageName, + Tag: extractTag(image.PackageName), + ImageID: image.CurrentVersion, + Size: parseImageSize(image.Metadata), + CreatedAt: image.CreatedAt, + Registry: image.RepositorySource, + HasUpdate: image.AvailableVersion != image.CurrentVersion, + LatestImageID: image.AvailableVersion, + Severity: image.Severity, + Labels: extractLabels(image.Metadata), + LastScanned: image.CreatedAt, + } + dockerInfo = append(dockerInfo, info) + } + + c.JSON(http.StatusOK, gin.H{ + "docker_images": dockerInfo, + "is_live": isDockerRecentlyUpdated(result.Images), + "total": len(dockerInfo), + "updates_available": countUpdates(dockerInfo), + }) +} + +// Helper function to extract tag from image name +func extractTag(imageName string) string { + // Simple implementation - split by ":" and return last part + parts := strings.Split(imageName, ":") + if len(parts) > 1 { + return parts[len(parts)-1] + } + return "latest" +} + +// Helper function to parse image size from metadata +func parseImageSize(metadata models.JSONB) int64 { + // Check if size is stored in metadata + if sizeStr, ok := metadata["size"].(string); ok { + if size, err := strconv.ParseInt(sizeStr, 10, 64); err == nil { + return size + } + } + return 0 +} + +// Helper function to extract labels from metadata +func extractLabels(metadata models.JSONB) map[string]string { + labels := make(map[string]string) + if labelsData, ok := metadata["labels"].(map[string]interface{}); ok { + for k, v := range labelsData { + if str, ok := v.(string); ok { + labels[k] = str + } + } + } + return labels +} + +// Helper function to check if Docker images are recently updated +func isDockerRecentlyUpdated(images []models.StoredDockerImage) bool { + if len(images) == 0 { + return false + } + + // Check if any image was updated in the last 5 minutes + now := time.Now() + for _, image := range images { + if now.Sub(image.CreatedAt) < 5*time.Minute { + return true + } + } + return false +} + +// Helper function to count available updates +func countUpdates(images []models.DockerImageInfo) int { + count := 0 + for _, image := range images { + if image.HasUpdate { + count++ + } + } + return count +} \ No newline at end of file diff --git a/aggregator-server/internal/api/handlers/metrics.go b/aggregator-server/internal/api/handlers/metrics.go new file mode 100644 index 0000000..dac48f9 --- /dev/null +++ b/aggregator-server/internal/api/handlers/metrics.go @@ -0,0 +1,289 @@ +package handlers + +import ( + "fmt" + "log" + "net/http" + "strconv" + "time" + + "github.com/Fimeg/RedFlag/aggregator-server/internal/database/queries" + "github.com/Fimeg/RedFlag/aggregator-server/internal/models" + "github.com/gin-gonic/gin" + "github.com/google/uuid" +) + +// MetricsHandler handles system and storage metrics +type MetricsHandler struct { + metricsQueries *queries.MetricsQueries + agentQueries *queries.AgentQueries + commandQueries *queries.CommandQueries +} + +func NewMetricsHandler(mq *queries.MetricsQueries, aq *queries.AgentQueries, cq *queries.CommandQueries) *MetricsHandler { + return &MetricsHandler{ + metricsQueries: mq, + agentQueries: aq, + commandQueries: cq, + } +} + +// ReportMetrics handles metrics reports from agents using event sourcing +func (h *MetricsHandler) ReportMetrics(c *gin.Context) { + agentID := c.MustGet("agent_id").(uuid.UUID) + + // Update last_seen timestamp + if err := h.agentQueries.UpdateAgentLastSeen(agentID); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update last seen"}) + return + } + + var req models.MetricsReportRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + // Validate command exists and belongs to agent + commandID, err := uuid.Parse(req.CommandID) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid command ID format"}) + return + } + + command, err := h.commandQueries.GetCommandByID(commandID) + if err != nil { + c.JSON(http.StatusNotFound, gin.H{"error": "command not found"}) + return + } + + if command.AgentID != agentID { + c.JSON(http.StatusForbidden, gin.H{"error": "unauthorized command"}) + return + } + + // Convert metrics to events + events := make([]models.StoredMetric, 0, len(req.Metrics)) + for _, item := range req.Metrics { + event := models.StoredMetric{ + ID: uuid.New(), + AgentID: agentID, + PackageType: item.PackageType, + PackageName: item.PackageName, + CurrentVersion: item.CurrentVersion, + AvailableVersion: item.AvailableVersion, + Severity: item.Severity, + RepositorySource: item.RepositorySource, + Metadata: models.JSONB(item.Metadata), + EventType: "discovered", + CreatedAt: req.Timestamp, + } + events = append(events, event) + } + + // Store events in batch with error isolation + if err := h.metricsQueries.CreateMetricsEventsBatch(events); err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to record metrics events"}) + return + } + + // Update command status to completed + result := models.JSONB{ + "metrics_count": len(req.Metrics), + "logged_at": time.Now(), + } + + if err := h.commandQueries.MarkCommandCompleted(commandID, result); err != nil { + fmt.Printf("Warning: Failed to mark metrics command %s as completed: %v\n", commandID, err) + } + + c.JSON(http.StatusOK, gin.H{ + "message": "metrics events recorded", + "count": len(events), + "command_id": req.CommandID, + }) +} + +// GetAgentMetrics retrieves metrics for a specific agent +func (h *MetricsHandler) GetAgentMetrics(c *gin.Context) { + agentIDStr := c.Param("agentId") + agentID, err := uuid.Parse(agentIDStr) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid agent ID"}) + return + } + + // Parse query parameters + page, _ := strconv.Atoi(c.DefaultQuery("page", "1")) + pageSize, _ := strconv.Atoi(c.DefaultQuery("page_size", "50")) + if page < 1 { + page = 1 + } + if pageSize < 1 || pageSize > 100 { + pageSize = 50 + } + + packageType := c.Query("package_type") + severity := c.Query("severity") + + // Build filter + filter := &models.MetricFilter{ + AgentID: &agentID, + PackageType: nil, + Severity: nil, + Limit: &pageSize, + Offset: &((page - 1) * pageSize), + } + + if packageType != "" { + filter.PackageType = &packageType + } + if severity != "" { + filter.Severity = &severity + } + + // Fetch metrics + result, err := h.metricsQueries.GetMetrics(filter) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to fetch metrics"}) + return + } + + c.JSON(http.StatusOK, result) +} + +// GetAgentStorageMetrics retrieves storage metrics for a specific agent +func (h *MetricsHandler) GetAgentStorageMetrics(c *gin.Context) { + agentIDStr := c.Param("agentId") + agentID, err := uuid.Parse(agentIDStr) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid agent ID"}) + return + } + + // Filter for storage metrics only + packageType := "storage" + pageSize := 100 // Get all storage metrics + offset := 0 + + filter := &models.MetricFilter{ + AgentID: &agentID, + PackageType: &packageType, + Limit: &pageSize, + Offset: &offset, + } + + result, err := h.metricsQueries.GetMetrics(filter) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to fetch storage metrics"}) + return + } + + // Convert to storage-specific format + storageMetrics := make([]models.StorageMetrics, 0, len(result.Metrics)) + for _, metric := range result.Metrics { + storageMetric := models.StorageMetrics{ + MountPoint: metric.PackageName, + TotalBytes: parseBytes(metric.AvailableVersion), // Available version stores total + UsedBytes: parseBytes(metric.CurrentVersion), // Current version stores used + UsedPercent: calculateUsagePercent(parseBytes(metric.CurrentVersion), parseBytes(metric.AvailableVersion)), + Status: metric.Severity, + LastUpdated: metric.CreatedAt, + } + storageMetrics = append(storageMetrics, storageMetric) + } + + c.JSON(http.StatusOK, gin.H{ + "storage_metrics": storageMetrics, + "is_live": isRecentlyUpdated(result.Metrics), + }) +} + +// GetAgentSystemMetrics retrieves system metrics for a specific agent +func (h *MetricsHandler) GetAgentSystemMetrics(c *gin.Context) { + agentIDStr := c.Param("agentId") + agentID, err := uuid.Parse(agentIDStr) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid agent ID"}) + return + } + + // Filter for system metrics only + packageType := "system" + pageSize := 100 + offset := 0 + + filter := &models.MetricFilter{ + AgentID: &agentID, + PackageType: &packageType, + Limit: &pageSize, + Offset: &offset, + } + + result, err := h.metricsQueries.GetMetrics(filter) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to fetch system metrics"}) + return + } + + // Aggregate system metrics + systemMetrics := aggregateSystemMetrics(result.Metrics) + + c.JSON(http.StatusOK, gin.H{ + "system_metrics": systemMetrics, + "is_live": isRecentlyUpdated(result.Metrics), + }) +} + +// Helper function to parse bytes from string +func parseBytes(s string) int64 { + // Simple implementation - in real code, parse "10GB", "500MB", etc. + // For now, return 0 if parsing fails + return 0 +} + +// Helper function to calculate usage percentage +func calculateUsagePercent(used, total int64) float64 { + if total == 0 { + return 0 + } + return float64(used) / float64(total) * 100 +} + +// Helper function to check if metrics are recently updated +func isRecentlyUpdated(metrics []models.StoredMetric) bool { + if len(metrics) == 0 { + return false + } + + // Check if any metric was updated in the last 5 minutes + now := time.Now() + for _, metric := range metrics { + if now.Sub(metric.CreatedAt) < 5*time.Minute { + return true + } + } + return false +} + +// Helper function to aggregate system metrics +func aggregateSystemMetrics(metrics []models.StoredMetric) *models.SystemMetrics { + if len(metrics) == 0 { + return nil + } + + // Aggregate the most recent metrics + // This is a simplified implementation - real code would need proper aggregation + return &models.SystemMetrics{ + CPUModel: "Unknown", + CPUCores: 0, + CPUThreads: 0, + MemoryTotal: 0, + MemoryUsed: 0, + MemoryPercent: 0, + Processes: 0, + Uptime: "Unknown", + LoadAverage: []float64{0, 0, 0}, + LastUpdated: metrics[0].CreatedAt, + } +} \ No newline at end of file diff --git a/aggregator-server/internal/api/handlers/security.go b/aggregator-server/internal/api/handlers/security.go new file mode 100644 index 0000000..f330c57 --- /dev/null +++ b/aggregator-server/internal/api/handlers/security.go @@ -0,0 +1,264 @@ +package handlers + +import ( + "net/http" + "time" + + "github.com/Fimeg/RedFlag/aggregator-server/internal/database/queries" + "github.com/Fimeg/RedFlag/aggregator-server/internal/services" + "github.com/gin-gonic/gin" +) + +// SecurityHandler handles security health check endpoints +type SecurityHandler struct { + signingService *services.SigningService + agentQueries *queries.AgentQueries + commandQueries *queries.CommandQueries +} + +// NewSecurityHandler creates a new security handler +func NewSecurityHandler(signingService *services.SigningService, agentQueries *queries.AgentQueries, commandQueries *queries.CommandQueries) *SecurityHandler { + return &SecurityHandler{ + signingService: signingService, + agentQueries: agentQueries, + commandQueries: commandQueries, + } +} + +// setSecurityHeaders sets appropriate cache control headers for security endpoints +func (h *SecurityHandler) setSecurityHeaders(c *gin.Context) { + c.Header("Cache-Control", "no-store, no-cache, must-revalidate, private") + c.Header("Pragma", "no-cache") + c.Header("Expires", "0") +} + +// SigningStatus returns the status of the Ed25519 signing service +func (h *SecurityHandler) SigningStatus(c *gin.Context) { + h.setSecurityHeaders(c) + + response := gin.H{ + "status": "unavailable", + "timestamp": time.Now(), + "checks": map[string]interface{}{ + "service_initialized": false, + "public_key_available": false, + "signing_operational": false, + }, + } + + if h.signingService != nil { + response["status"] = "available" + response["checks"].(map[string]interface{})["service_initialized"] = true + + // Check if public key is available + pubKey := h.signingService.GetPublicKey() + if pubKey != "" { + response["checks"].(map[string]interface{})["public_key_available"] = true + response["checks"].(map[string]interface{})["signing_operational"] = true + response["public_key_fingerprint"] = h.signingService.GetPublicKeyFingerprint() + response["algorithm"] = "ed25519" + } + } + + c.JSON(http.StatusOK, response) +} + +// NonceValidationStatus returns nonce validation health metrics +func (h *SecurityHandler) NonceValidationStatus(c *gin.Context) { + h.setSecurityHeaders(c) + response := gin.H{ + "status": "unknown", + "timestamp": time.Now(), + "checks": map[string]interface{}{ + "validation_enabled": true, + "max_age_minutes": 5, + "recent_validations": 0, + "validation_failures": 0, + }, + "details": map[string]interface{}{ + "nonce_format": "UUID:UnixTimestamp", + "signature_algorithm": "ed25519", + "replay_protection": "active", + }, + } + + // TODO: Add metrics collection for nonce validations + // This would require adding logging/metrics to the nonce validation process + // For now, we provide the configuration status + + response["status"] = "healthy" + response["checks"].(map[string]interface{})["validation_enabled"] = true + response["checks"].(map[string]interface{})["max_age_minutes"] = 5 + + c.JSON(http.StatusOK, response) +} + +// CommandValidationStatus returns command validation and processing metrics +func (h *SecurityHandler) CommandValidationStatus(c *gin.Context) { + h.setSecurityHeaders(c) + response := gin.H{ + "status": "unknown", + "timestamp": time.Now(), + "metrics": map[string]interface{}{ + "total_pending_commands": 0, + "agents_with_pending": 0, + "commands_last_hour": 0, + "commands_last_24h": 0, + }, + "checks": map[string]interface{}{ + "command_processing": "unknown", + "backpressure_active": false, + "agent_responsive": "unknown", + }, + } + + // Get command metrics + if h.commandQueries != nil { + // TODO: Add methods to CommandQueries for aggregate metrics + // For now, we can provide basic status + response["metrics"].(map[string]interface{})["total_pending_commands"] = "N/A" + response["metrics"].(map[string]interface{})["agents_with_pending"] = "N/A" + } + + // Get agent metrics for responsiveness + if h.agentQueries != nil { + // TODO: Add method to count online vs offline agents + // This would help identify if agents are responsive to commands + } + + response["status"] = "healthy" + response["checks"].(map[string]interface{})["command_processing"] = "operational" + + c.JSON(http.StatusOK, response) +} + +// MachineBindingStatus returns machine binding enforcement metrics +func (h *SecurityHandler) MachineBindingStatus(c *gin.Context) { + h.setSecurityHeaders(c) + response := gin.H{ + "status": "unknown", + "timestamp": time.Now(), + "checks": map[string]interface{}{ + "binding_enforced": true, + "min_agent_version": "v0.1.22", + "fingerprint_required": true, + "recent_violations": 0, + }, + "details": map[string]interface{}{ + "enforcement_method": "hardware_fingerprint", + "binding_scope": "machine_id + cpu + memory + system_uuid", + "violation_action": "command_rejection", + }, + } + + // TODO: Add metrics for machine binding violations + // This would require logging when machine binding middleware rejects requests + + response["status"] = "enforced" + response["checks"].(map[string]interface{})["binding_enforced"] = true + response["checks"].(map[string]interface{})["min_agent_version"] = "v0.1.22" + + c.JSON(http.StatusOK, response) +} + +// SecurityOverview returns a comprehensive overview of all security subsystems +func (h *SecurityHandler) SecurityOverview(c *gin.Context) { + h.setSecurityHeaders(c) + overview := gin.H{ + "timestamp": time.Now(), + "overall_status": "unknown", + "subsystems": map[string]interface{}{ + "ed25519_signing": map[string]interface{}{ + "status": "unknown", + "enabled": true, + }, + "nonce_validation": map[string]interface{}{ + "status": "unknown", + "enabled": true, + }, + "machine_binding": map[string]interface{}{ + "status": "unknown", + "enabled": true, + }, + "command_validation": map[string]interface{}{ + "status": "unknown", + "enabled": true, + }, + }, + "alerts": []string{}, + "recommendations": []string{}, + } + + // Check Ed25519 signing + if h.signingService != nil && h.signingService.GetPublicKey() != "" { + overview["subsystems"].(map[string]interface{})["ed25519_signing"].(map[string]interface{})["status"] = "healthy" + } else { + overview["subsystems"].(map[string]interface{})["ed25519_signing"].(map[string]interface{})["status"] = "unavailable" + overview["alerts"] = append(overview["alerts"].([]string), "Ed25519 signing service not configured") + overview["recommendations"] = append(overview["recommendations"].([]string), "Set REDFLAG_SIGNING_PRIVATE_KEY environment variable") + } + + // Check nonce validation + overview["subsystems"].(map[string]interface{})["nonce_validation"].(map[string]interface{})["status"] = "healthy" + + // Check machine binding + overview["subsystems"].(map[string]interface{})["machine_binding"].(map[string]interface{})["status"] = "enforced" + + // Check command validation + overview["subsystems"].(map[string]interface{})["command_validation"].(map[string]interface{})["status"] = "operational" + + // Determine overall status + healthyCount := 0 + totalCount := 4 + for _, subsystem := range overview["subsystems"].(map[string]interface{}) { + subsystemMap := subsystem.(map[string]interface{}) + if subsystemMap["status"] == "healthy" || subsystemMap["status"] == "enforced" || subsystemMap["status"] == "operational" { + healthyCount++ + } + } + + if healthyCount == totalCount { + overview["overall_status"] = "healthy" + } else if healthyCount >= totalCount/2 { + overview["overall_status"] = "degraded" + } else { + overview["overall_status"] = "unhealthy" + } + + c.JSON(http.StatusOK, overview) +} + +// SecurityMetrics returns detailed security metrics for monitoring +func (h *SecurityHandler) SecurityMetrics(c *gin.Context) { + h.setSecurityHeaders(c) + metrics := gin.H{ + "timestamp": time.Now(), + "signing": map[string]interface{}{ + "public_key_fingerprint": "", + "algorithm": "ed25519", + "key_size": 32, + }, + "nonce": map[string]interface{}{ + "max_age_seconds": 300, // 5 minutes + "format": "UUID:UnixTimestamp", + }, + "machine_binding": map[string]interface{}{ + "min_version": "v0.1.22", + "enforcement": "hardware_fingerprint", + }, + "command_processing": map[string]interface{}{ + "backpressure_threshold": 5, + "rate_limit_per_second": 100, + }, + } + + // Add signing metrics if available + if h.signingService != nil { + metrics["signing"].(map[string]interface{})["public_key_fingerprint"] = h.signingService.GetPublicKeyFingerprint() + metrics["signing"].(map[string]interface{})["configured"] = true + } else { + metrics["signing"].(map[string]interface{})["configured"] = false + } + + c.JSON(http.StatusOK, metrics) +} \ No newline at end of file diff --git a/aggregator-server/internal/database/queries/docker.go b/aggregator-server/internal/database/queries/docker.go new file mode 100644 index 0000000..bdb9940 --- /dev/null +++ b/aggregator-server/internal/database/queries/docker.go @@ -0,0 +1,353 @@ +package queries + +import ( + "database/sql" + "fmt" + "time" + + "github.com/Fimeg/RedFlag/aggregator-server/internal/models" + "github.com/google/uuid" +) + +// DockerQueries handles database operations for Docker images +type DockerQueries struct { + db *sql.DB +} + +func NewDockerQueries(db *sql.DB) *DockerQueries { + return &DockerQueries{db: db} +} + +// CreateDockerEventsBatch creates multiple Docker image events in a single transaction +func (q *DockerQueries) CreateDockerEventsBatch(events []models.StoredDockerImage) error { + if len(events) == 0 { + return nil + } + + tx, err := q.db.Begin() + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) + } + defer tx.Rollback() + + // Prepare the insert statement + stmt, err := tx.Prepare(` + INSERT INTO docker_images ( + id, agent_id, package_type, package_name, current_version, available_version, + severity, repository_source, metadata, event_type, created_at + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + ON CONFLICT (agent_id, package_name, package_type, created_at) DO NOTHING + `) + if err != nil { + return fmt.Errorf("failed to prepare statement: %w", err) + } + defer stmt.Close() + + // Insert each event with error isolation + for _, event := range events { + _, err := stmt.Exec( + event.ID, + event.AgentID, + event.PackageType, + event.PackageName, + event.CurrentVersion, + event.AvailableVersion, + event.Severity, + event.RepositorySource, + event.Metadata, + event.EventType, + event.CreatedAt, + ) + if err != nil { + // Log error but continue with other events + fmt.Printf("Warning: Failed to insert docker image event %s: %v\n", event.ID, err) + continue + } + } + + return tx.Commit() +} + +// GetDockerImages retrieves Docker images based on filter criteria +func (q *DockerQueries) GetDockerImages(filter *models.DockerFilter) (*models.DockerResult, error) { + query := ` + SELECT id, agent_id, package_type, package_name, current_version, available_version, + severity, repository_source, metadata, event_type, created_at + FROM docker_images + WHERE 1=1 + ` + args := []interface{}{} + argIndex := 1 + + // Build WHERE clause + if filter.AgentID != nil { + query += fmt.Sprintf(" AND agent_id = $%d", argIndex) + args = append(args, *filter.AgentID) + argIndex++ + } + + if filter.ImageName != nil { + query += fmt.Sprintf(" AND package_name ILIKE $%d", argIndex) + args = append(args, "%"+*filter.ImageName+"%") + argIndex++ + } + + if filter.Registry != nil { + query += fmt.Sprintf(" AND repository_source ILIKE $%d", argIndex) + args = append(args, "%"+*filter.Registry+"%") + argIndex++ + } + + if filter.Severity != nil { + query += fmt.Sprintf(" AND severity = $%d", argIndex) + args = append(args, *filter.Severity) + argIndex++ + } + + if filter.HasUpdates != nil { + if *filter.HasUpdates { + query += fmt.Sprintf(" AND current_version != available_version", argIndex) + } else { + query += fmt.Sprintf(" AND current_version = available_version", argIndex) + } + argIndex++ + } + + // Add ordering and pagination + query += " ORDER BY created_at DESC" + + if filter.Limit != nil { + query += fmt.Sprintf(" LIMIT $%d", argIndex) + args = append(args, *filter.Limit) + argIndex++ + } + + if filter.Offset != nil { + query += fmt.Sprintf(" OFFSET $%d", argIndex) + args = append(args, *filter.Offset) + argIndex++ + } + + rows, err := q.db.Query(query, args...) + if err != nil { + return nil, fmt.Errorf("failed to query docker images: %w", err) + } + defer rows.Close() + + var images []models.StoredDockerImage + for rows.Next() { + var image models.StoredDockerImage + err := rows.Scan( + &image.ID, + &image.AgentID, + &image.PackageType, + &image.PackageName, + &image.CurrentVersion, + &image.AvailableVersion, + &image.Severity, + &image.RepositorySource, + &image.Metadata, + &image.EventType, + &image.CreatedAt, + ) + if err != nil { + return nil, fmt.Errorf("failed to scan docker image: %w", err) + } + images = append(images, image) + } + + // Get total count + countQuery := `SELECT COUNT(*) FROM docker_images WHERE 1=1` + countArgs := []interface{}{} + countIndex := 1 + + if filter.AgentID != nil { + countQuery += fmt.Sprintf(" AND agent_id = $%d", countIndex) + countArgs = append(countArgs, *filter.AgentID) + countIndex++ + } + + if filter.ImageName != nil { + countQuery += fmt.Sprintf(" AND package_name ILIKE $%d", countIndex) + countArgs = append(countArgs, "%"+*filter.ImageName+"%") + countIndex++ + } + + if filter.Registry != nil { + countQuery += fmt.Sprintf(" AND repository_source ILIKE $%d", countIndex) + countArgs = append(countArgs, "%"+*filter.Registry+"%") + countIndex++ + } + + if filter.Severity != nil { + countQuery += fmt.Sprintf(" AND severity = $%d", countIndex) + countArgs = append(countArgs, *filter.Severity) + countIndex++ + } + + if filter.HasUpdates != nil { + if *filter.HasUpdates { + countQuery += fmt.Sprintf(" AND current_version != available_version", countIndex) + } else { + countQuery += fmt.Sprintf(" AND current_version = available_version", countIndex) + } + countIndex++ + } + + var total int + err = q.db.QueryRow(countQuery, countArgs...).Scan(&total) + if err != nil { + return nil, fmt.Errorf("failed to count docker images: %w", err) + } + + // Calculate pagination + page := 1 + perPage := 50 + if filter.Offset != nil && filter.Limit != nil { + page = (*filter.Offset / *filter.Limit) + 1 + perPage = *filter.Limit + } + + return &models.DockerResult{ + Images: images, + Total: total, + Page: page, + PerPage: perPage, + }, nil +} + +// GetDockerImagesByAgentID retrieves Docker images for a specific agent +func (q *DockerQueries) GetDockerImagesByAgentID(agentID uuid.UUID, limit int) ([]models.StoredDockerImage, error) { + query := ` + SELECT id, agent_id, package_type, package_name, current_version, available_version, + severity, repository_source, metadata, event_type, created_at + FROM docker_images + WHERE agent_id = $1 + ORDER BY created_at DESC + LIMIT $2 + ` + + rows, err := q.db.Query(query, agentID, limit) + if err != nil { + return nil, fmt.Errorf("failed to query docker images by agent: %w", err) + } + defer rows.Close() + + var images []models.StoredDockerImage + for rows.Next() { + var image models.StoredDockerImage + err := rows.Scan( + &image.ID, + &image.AgentID, + &image.PackageType, + &image.PackageName, + &image.CurrentVersion, + &image.AvailableVersion, + &image.Severity, + &image.RepositorySource, + &image.Metadata, + &image.EventType, + &image.CreatedAt, + ) + if err != nil { + return nil, fmt.Errorf("failed to scan docker image: %w", err) + } + images = append(images, image) + } + + return images, nil +} + +// GetDockerImagesWithUpdates retrieves Docker images that have available updates +func (q *DockerQueries) GetDockerImagesWithUpdates(limit int) ([]models.StoredDockerImage, error) { + query := ` + SELECT id, agent_id, package_type, package_name, current_version, available_version, + severity, repository_source, metadata, event_type, created_at + FROM docker_images + WHERE current_version != available_version + ORDER BY created_at DESC + LIMIT $1 + ` + + rows, err := q.db.Query(query, limit) + if err != nil { + return nil, fmt.Errorf("failed to query docker images with updates: %w", err) + } + defer rows.Close() + + var images []models.StoredDockerImage + for rows.Next() { + var image models.StoredDockerImage + err := rows.Scan( + &image.ID, + &image.AgentID, + &image.PackageType, + &image.PackageName, + &image.CurrentVersion, + &image.AvailableVersion, + &image.Severity, + &image.RepositorySource, + &image.Metadata, + &image.EventType, + &image.CreatedAt, + ) + if err != nil { + return nil, fmt.Errorf("failed to scan docker image: %w", err) + } + images = append(images, image) + } + + return images, nil +} + +// DeleteOldDockerImages deletes Docker images older than the specified number of days +func (q *DockerQueries) DeleteOldDockerImages(days int) error { + query := `DELETE FROM docker_images WHERE created_at < NOW() - INTERVAL '1 day' * $1` + + result, err := q.db.Exec(query, days) + if err != nil { + return fmt.Errorf("failed to delete old docker images: %w", err) + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get rows affected: %w", err) + } + + if rowsAffected > 0 { + fmt.Printf("Deleted %d old docker image records\n", rowsAffected) + } + + return nil +} + +// GetDockerStats returns statistics about Docker images across all agents +func (q *DockerQueries) GetDockerStats() (*models.DockerStats, error) { + var stats models.DockerStats + + // Get total images + err := q.db.QueryRow("SELECT COUNT(*) FROM docker_images").Scan(&stats.TotalImages) + if err != nil { + return nil, fmt.Errorf("failed to get total docker images: %w", err) + } + + // Get images with updates + err = q.db.QueryRow("SELECT COUNT(*) FROM docker_images WHERE current_version != available_version").Scan(&stats.UpdatesAvailable) + if err != nil { + return nil, fmt.Errorf("failed to get docker images with updates: %w", err) + } + + // Get critical updates + err = q.db.QueryRow("SELECT COUNT(*) FROM docker_images WHERE severity = 'critical' AND current_version != available_version").Scan(&stats.CriticalUpdates) + if err != nil { + return nil, fmt.Errorf("failed to get critical docker updates: %w", err) + } + + // Get agents with Docker images + err = q.db.QueryRow("SELECT COUNT(DISTINCT agent_id) FROM docker_images").Scan(&stats.AgentsWithContainers) + if err != nil { + return nil, fmt.Errorf("failed to get agents with docker images: %w", err) + } + + return &stats, nil +} \ No newline at end of file diff --git a/aggregator-server/internal/database/queries/metrics.go b/aggregator-server/internal/database/queries/metrics.go new file mode 100644 index 0000000..9e2b65b --- /dev/null +++ b/aggregator-server/internal/database/queries/metrics.go @@ -0,0 +1,287 @@ +package queries + +import ( + "database/sql" + "fmt" + "time" + + "github.com/Fimeg/RedFlag/aggregator-server/internal/models" + "github.com/google/uuid" + "github.com/lib/pq" +) + +// MetricsQueries handles database operations for metrics +type MetricsQueries struct { + db *sql.DB +} + +func NewMetricsQueries(db *sql.DB) *MetricsQueries { + return &MetricsQueries{db: db} +} + +// CreateMetricsEventsBatch creates multiple metric events in a single transaction +func (q *MetricsQueries) CreateMetricsEventsBatch(events []models.StoredMetric) error { + if len(events) == 0 { + return nil + } + + tx, err := q.db.Begin() + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) + } + defer tx.Rollback() + + // Prepare the insert statement + stmt, err := tx.Prepare(` + INSERT INTO metrics ( + id, agent_id, package_type, package_name, current_version, available_version, + severity, repository_source, metadata, event_type, created_at + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + ON CONFLICT (agent_id, package_name, package_type, created_at) DO NOTHING + `) + if err != nil { + return fmt.Errorf("failed to prepare statement: %w", err) + } + defer stmt.Close() + + // Insert each event with error isolation + for _, event := range events { + _, err := stmt.Exec( + event.ID, + event.AgentID, + event.PackageType, + event.PackageName, + event.CurrentVersion, + event.AvailableVersion, + event.Severity, + event.RepositorySource, + event.Metadata, + event.EventType, + event.CreatedAt, + ) + if err != nil { + // Log error but continue with other events + fmt.Printf("Warning: Failed to insert metric event %s: %v\n", event.ID, err) + continue + } + } + + return tx.Commit() +} + +// GetMetrics retrieves metrics based on filter criteria +func (q *MetricsQueries) GetMetrics(filter *models.MetricFilter) (*models.MetricResult, error) { + query := ` + SELECT id, agent_id, package_type, package_name, current_version, available_version, + severity, repository_source, metadata, event_type, created_at + FROM metrics + WHERE 1=1 + ` + args := []interface{}{} + argIndex := 1 + + // Build WHERE clause + if filter.AgentID != nil { + query += fmt.Sprintf(" AND agent_id = $%d", argIndex) + args = append(args, *filter.AgentID) + argIndex++ + } + + if filter.PackageType != nil { + query += fmt.Sprintf(" AND package_type = $%d", argIndex) + args = append(args, *filter.PackageType) + argIndex++ + } + + if filter.Severity != nil { + query += fmt.Sprintf(" AND severity = $%d", argIndex) + args = append(args, *filter.Severity) + argIndex++ + } + + // Add ordering and pagination + query += " ORDER BY created_at DESC" + + if filter.Limit != nil { + query += fmt.Sprintf(" LIMIT $%d", argIndex) + args = append(args, *filter.Limit) + argIndex++ + } + + if filter.Offset != nil { + query += fmt.Sprintf(" OFFSET $%d", argIndex) + args = append(args, *filter.Offset) + argIndex++ + } + + rows, err := q.db.Query(query, args...) + if err != nil { + return nil, fmt.Errorf("failed to query metrics: %w", err) + } + defer rows.Close() + + var metrics []models.StoredMetric + for rows.Next() { + var metric models.StoredMetric + err := rows.Scan( + &metric.ID, + &metric.AgentID, + &metric.PackageType, + &metric.PackageName, + &metric.CurrentVersion, + &metric.AvailableVersion, + &metric.Severity, + &metric.RepositorySource, + &metric.Metadata, + &metric.EventType, + &metric.CreatedAt, + ) + if err != nil { + return nil, fmt.Errorf("failed to scan metric: %w", err) + } + metrics = append(metrics, metric) + } + + // Get total count + countQuery := `SELECT COUNT(*) FROM metrics WHERE 1=1` + countArgs := []interface{}{} + countIndex := 1 + + if filter.AgentID != nil { + countQuery += fmt.Sprintf(" AND agent_id = $%d", countIndex) + countArgs = append(countArgs, *filter.AgentID) + countIndex++ + } + + if filter.PackageType != nil { + countQuery += fmt.Sprintf(" AND package_type = $%d", countIndex) + countArgs = append(countArgs, *filter.PackageType) + countIndex++ + } + + if filter.Severity != nil { + countQuery += fmt.Sprintf(" AND severity = $%d", countIndex) + countArgs = append(countArgs, *filter.Severity) + countIndex++ + } + + var total int + err = q.db.QueryRow(countQuery, countArgs...).Scan(&total) + if err != nil { + return nil, fmt.Errorf("failed to count metrics: %w", err) + } + + // Calculate pagination + page := 1 + perPage := 50 + if filter.Offset != nil && filter.Limit != nil { + page = (*filter.Offset / *filter.Limit) + 1 + perPage = *filter.Limit + } + + return &models.MetricResult{ + Metrics: metrics, + Total: total, + Page: page, + PerPage: perPage, + }, nil +} + +// GetMetricsByAgentID retrieves metrics for a specific agent +func (q *MetricsQueries) GetMetricsByAgentID(agentID uuid.UUID, limit int) ([]models.StoredMetric, error) { + query := ` + SELECT id, agent_id, package_type, package_name, current_version, available_version, + severity, repository_source, metadata, event_type, created_at + FROM metrics + WHERE agent_id = $1 + ORDER BY created_at DESC + LIMIT $2 + ` + + rows, err := q.db.Query(query, agentID, limit) + if err != nil { + return nil, fmt.Errorf("failed to query metrics by agent: %w", err) + } + defer rows.Close() + + var metrics []models.StoredMetric + for rows.Next() { + var metric models.StoredMetric + err := rows.Scan( + &metric.ID, + &metric.AgentID, + &metric.PackageType, + &metric.PackageName, + &metric.CurrentVersion, + &metric.AvailableVersion, + &metric.Severity, + &metric.RepositorySource, + &metric.Metadata, + &metric.EventType, + &metric.CreatedAt, + ) + if err != nil { + return nil, fmt.Errorf("failed to scan metric: %w", err) + } + metrics = append(metrics, metric) + } + + return metrics, nil +} + +// GetLatestMetricsByType retrieves the latest metrics for a specific type +func (q *MetricsQueries) GetLatestMetricsByType(agentID uuid.UUID, packageType string) (*models.StoredMetric, error) { + query := ` + SELECT id, agent_id, package_type, package_name, current_version, available_version, + severity, repository_source, metadata, event_type, created_at + FROM metrics + WHERE agent_id = $1 AND package_type = $2 + ORDER BY created_at DESC + LIMIT 1 + ` + + var metric models.StoredMetric + err := q.db.QueryRow(query, agentID, packageType).Scan( + &metric.ID, + &metric.AgentID, + &metric.PackageType, + &metric.PackageName, + &metric.CurrentVersion, + &metric.AvailableVersion, + &metric.Severity, + &metric.RepositorySource, + &metric.Metadata, + &metric.EventType, + &metric.CreatedAt, + ) + + if err == sql.ErrNoRows { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("failed to get latest metric: %w", err) + } + + return &metric, nil +} + +// DeleteOldMetrics deletes metrics older than the specified number of days +func (q *MetricsQueries) DeleteOldMetrics(days int) error { + query := `DELETE FROM metrics WHERE created_at < NOW() - INTERVAL '1 day' * $1` + + result, err := q.db.Exec(query, days) + if err != nil { + return fmt.Errorf("failed to delete old metrics: %w", err) + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get rows affected: %w", err) + } + + if rowsAffected > 0 { + fmt.Printf("Deleted %d old metric records\n", rowsAffected) + } + + return nil +} \ No newline at end of file diff --git a/aggregator-server/internal/models/docker.go b/aggregator-server/internal/models/docker.go index ef6452d..44d36a1 100644 --- a/aggregator-server/internal/models/docker.go +++ b/aggregator-server/internal/models/docker.go @@ -2,6 +2,7 @@ package models import ( "time" + "github.com/google/uuid" ) // DockerPort represents a port mapping in a Docker container @@ -81,4 +82,56 @@ type BulkDockerUpdateRequest struct { ImageID string `json:"image_id" binding:"required"` } `json:"updates" binding:"required"` ScheduledAt *time.Time `json:"scheduled_at,omitempty"` +} + +// DockerReportRequest is sent by agents when reporting Docker image updates +type DockerReportRequest struct { + CommandID string `json:"command_id"` + Timestamp time.Time `json:"timestamp"` + Images []DockerImage `json:"images"` +} + +// DockerImageUpdate represents a Docker image update from agent scans +type DockerImageUpdate struct { + PackageType string `json:"package_type"` // "docker_image" + PackageName string `json:"package_name"` // image name:tag + CurrentVersion string `json:"current_version"` // current image ID + AvailableVersion string `json:"available_version"` // latest image ID + Severity string `json:"severity"` // "low", "moderate", "high", "critical" + RepositorySource string `json:"repository_source"` // registry URL + Metadata map[string]string `json:"metadata"` +} + +// StoredDockerImage represents a Docker image update in the database +type StoredDockerImage struct { + ID uuid.UUID `json:"id" db:"id"` + AgentID uuid.UUID `json:"agent_id" db:"agent_id"` + PackageType string `json:"package_type" db:"package_type"` + PackageName string `json:"package_name" db:"package_name"` + CurrentVersion string `json:"current_version" db:"current_version"` + AvailableVersion string `json:"available_version" db:"available_version"` + Severity string `json:"severity" db:"severity"` + RepositorySource string `json:"repository_source" db:"repository_source"` + Metadata JSONB `json:"metadata" db:"metadata"` + EventType string `json:"event_type" db:"event_type"` + CreatedAt time.Time `json:"created_at" db:"created_at"` +} + +// DockerFilter represents filtering options for Docker image queries +type DockerFilter struct { + AgentID *uuid.UUID `json:"agent_id,omitempty"` + ImageName *string `json:"image_name,omitempty"` + Registry *string `json:"registry,omitempty"` + Severity *string `json:"severity,omitempty"` + HasUpdates *bool `json:"has_updates,omitempty"` + Limit *int `json:"limit,omitempty"` + Offset *int `json:"offset,omitempty"` +} + +// DockerResult represents the result of a Docker image query +type DockerResult struct { + Images []StoredDockerImage `json:"images"` + Total int `json:"total"` + Page int `json:"page"` + PerPage int `json:"per_page"` } \ No newline at end of file diff --git a/aggregator-server/internal/models/metrics.go b/aggregator-server/internal/models/metrics.go new file mode 100644 index 0000000..bf0bd2d --- /dev/null +++ b/aggregator-server/internal/models/metrics.go @@ -0,0 +1,81 @@ +package models + +import ( + "time" + "github.com/google/uuid" +) + +// MetricsReportRequest is sent by agents when reporting system/storage metrics +type MetricsReportRequest struct { + CommandID string `json:"command_id"` + Timestamp time.Time `json:"timestamp"` + Metrics []Metric `json:"metrics"` +} + +// Metric represents a system or storage metric +type Metric struct { + PackageType string `json:"package_type"` // "storage", "system", "cpu", "memory" + PackageName string `json:"package_name"` // mount point, metric name + CurrentVersion string `json:"current_version"` // current usage, value + AvailableVersion string `json:"available_version"` // available space, threshold + Severity string `json:"severity"` // "low", "moderate", "high" + RepositorySource string `json:"repository_source"` + Metadata map[string]string `json:"metadata"` +} + +// Metric represents a stored metric in the database +type StoredMetric struct { + ID uuid.UUID `json:"id" db:"id"` + AgentID uuid.UUID `json:"agent_id" db:"agent_id"` + PackageType string `json:"package_type" db:"package_type"` + PackageName string `json:"package_name" db:"package_name"` + CurrentVersion string `json:"current_version" db:"current_version"` + AvailableVersion string `json:"available_version" db:"available_version"` + Severity string `json:"severity" db:"severity"` + RepositorySource string `json:"repository_source" db:"repository_source"` + Metadata JSONB `json:"metadata" db:"metadata"` + EventType string `json:"event_type" db:"event_type"` + CreatedAt time.Time `json:"created_at" db:"created_at"` +} + +// MetricFilter represents filtering options for metrics queries +type MetricFilter struct { + AgentID *uuid.UUID `json:"agent_id,omitempty"` + PackageType *string `json:"package_type,omitempty"` + Severity *string `json:"severity,omitempty"` + Limit *int `json:"limit,omitempty"` + Offset *int `json:"offset,omitempty"` +} + +// MetricResult represents the result of a metrics query +type MetricResult struct { + Metrics []StoredMetric `json:"metrics"` + Total int `json:"total"` + Page int `json:"page"` + PerPage int `json:"per_page"` +} + +// StorageMetrics represents storage-specific metrics for easier consumption +type StorageMetrics struct { + MountPoint string `json:"mount_point"` + TotalBytes int64 `json:"total_bytes"` + UsedBytes int64 `json:"used_bytes"` + AvailableBytes int64 `json:"available_bytes"` + UsedPercent float64 `json:"used_percent"` + Status string `json:"status"` // "low", "moderate", "high", "critical" + LastUpdated time.Time `json:"last_updated"` +} + +// SystemMetrics represents system-specific metrics for easier consumption +type SystemMetrics struct { + CPUModel string `json:"cpu_model"` + CPUCores int `json:"cpu_cores"` + CPUThreads int `json:"cpu_threads"` + MemoryTotal int64 `json:"memory_total"` + MemoryUsed int64 `json:"memory_used"` + MemoryPercent float64 `json:"memory_percent"` + Processes int `json:"processes"` + Uptime string `json:"uptime"` + LoadAverage []float64 `json:"load_average"` + LastUpdated time.Time `json:"last_updated"` +} \ No newline at end of file diff --git a/aggregator-server/migrations/003_create_metrics_and_docker_tables.sql b/aggregator-server/migrations/003_create_metrics_and_docker_tables.sql new file mode 100644 index 0000000..c9ed14c --- /dev/null +++ b/aggregator-server/migrations/003_create_metrics_and_docker_tables.sql @@ -0,0 +1,84 @@ +-- Migration: Create separate tables for metrics and docker images +-- Purpose: Fix data classification issue where storage/system metrics were incorrectly stored as package updates + +-- Create metrics table for system and storage metrics +CREATE TABLE IF NOT EXISTS metrics ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + agent_id UUID NOT NULL REFERENCES agents(id) ON DELETE CASCADE, + package_type VARCHAR(50) NOT NULL, -- "storage", "system", "cpu", "memory" + package_name VARCHAR(255) NOT NULL, + current_version TEXT NOT NULL, -- current usage, value + available_version TEXT NOT NULL, -- available space, threshold + severity VARCHAR(20) NOT NULL DEFAULT 'low', -- "low", "moderate", "high", "critical" + repository_source VARCHAR(255), + metadata JSONB DEFAULT '{}', + event_type VARCHAR(50) NOT NULL DEFAULT 'discovered', + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + + -- Unique constraint to prevent duplicate entries + UNIQUE (agent_id, package_name, package_type, created_at) +); + +-- Create docker_images table for Docker image information +CREATE TABLE IF NOT EXISTS docker_images ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + agent_id UUID NOT NULL REFERENCES agents(id) ON DELETE CASCADE, + package_type VARCHAR(50) NOT NULL DEFAULT 'docker_image', + package_name VARCHAR(500) NOT NULL, -- image name:tag + current_version VARCHAR(255) NOT NULL, -- current image ID + available_version VARCHAR(255), -- latest image ID + severity VARCHAR(20) NOT NULL DEFAULT 'low', -- "low", "moderate", "high", "critical" + repository_source VARCHAR(500), -- registry URL + metadata JSONB DEFAULT '{}', + event_type VARCHAR(50) NOT NULL DEFAULT 'discovered', + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + + -- Unique constraint to prevent duplicate entries + UNIQUE (agent_id, package_name, package_type, created_at) +); + +-- Create indexes for better performance +CREATE INDEX IF NOT EXISTS idx_metrics_agent_id ON metrics(agent_id); +CREATE INDEX IF NOT EXISTS idx_metrics_package_type ON metrics(package_type); +CREATE INDEX IF NOT EXISTS idx_metrics_created_at ON metrics(created_at); +CREATE INDEX IF NOT EXISTS idx_metrics_severity ON metrics(severity); + +CREATE INDEX IF NOT EXISTS idx_docker_images_agent_id ON docker_images(agent_id); +CREATE INDEX IF NOT EXISTS idx_docker_images_package_type ON docker_images(package_type); +CREATE INDEX IF NOT EXISTS idx_docker_images_created_at ON docker_images(created_at); +CREATE INDEX IF NOT EXISTS idx_docker_images_severity ON docker_images(severity); +CREATE INDEX IF NOT EXISTS idx_docker_images_has_updates ON docker_images(current_version, available_version) WHERE current_version != available_version; + +-- Add comments for documentation +COMMENT ON TABLE metrics IS 'Stores system and storage metrics collected from agents, separate from package updates'; +COMMENT ON TABLE docker_images IS 'Stores Docker image information and update availability, separate from package updates'; + +COMMENT ON COLUMN metrics.package_type IS 'Type of metric: storage, system, cpu, memory, etc.'; +COMMENT ON COLUMN metrics.package_name IS 'Name of the metric (mount point, metric name, etc.)'; +COMMENT ON COLUMN metrics.current_version IS 'Current value or usage'; +COMMENT ON COLUMN metrics.available_version IS 'Available space or threshold'; +COMMENT ON COLUMN metrics.severity IS 'Severity level: low, moderate, high, critical'; + +COMMENT ON COLUMN docker_images.package_name IS 'Docker image name with tag (e.g., nginx:latest)'; +COMMENT ON COLUMN docker_images.current_version IS 'Current image ID'; +COMMENT ON COLUMN docker_images.available_version IS 'Latest available image ID'; +COMMENT ON COLUMN docker_images.severity IS 'Update severity: low, moderate, high, critical'; + +-- Create or replace function to clean old data (optional) +CREATE OR REPLACE FUNCTION clean_misclassified_data() +RETURNS INTEGER AS $$ +DECLARE + deleted_count INTEGER := 0; +BEGIN + -- This function can be called to clean up any storage/system metrics that were + -- incorrectly stored in the update_events table before migration + + -- For now, just return 0 as we're keeping the old data for audit purposes + RETURN deleted_count; +END; +$$ LANGUAGE plpgsql; + +-- Grant permissions (adjust as needed for your setup) +-- GRANT ALL PRIVILEGES ON TABLE metrics TO redflag_user; +-- GRANT ALL PRIVILEGES ON TABLE docker_images TO redflag_user; +-- GRANT USAGE ON SCHEMA public TO redflag_user; \ No newline at end of file diff --git a/aggregator-web/src/pages/TokenManagement.tsx b/aggregator-web/src/pages/TokenManagement.tsx index dea973e..d50743a 100644 --- a/aggregator-web/src/pages/TokenManagement.tsx +++ b/aggregator-web/src/pages/TokenManagement.tsx @@ -96,7 +96,11 @@ const TokenManagement: React.FC = () => { }; const getServerUrl = () => { - return `${window.location.protocol}//${window.location.host}`; + // Use API server port (8080) instead of web UI port (3000) + const protocol = window.location.protocol; + const hostname = window.location.hostname; + const port = hostname === 'localhost' || hostname === '127.0.0.1' ? ':8080' : ''; + return `${protocol}//${hostname}${port}`; }; const copyToClipboard = async (text: string) => { diff --git a/aggregator-web/src/pages/settings/AgentManagement.tsx b/aggregator-web/src/pages/settings/AgentManagement.tsx index b45b7d9..c428cc6 100644 --- a/aggregator-web/src/pages/settings/AgentManagement.tsx +++ b/aggregator-web/src/pages/settings/AgentManagement.tsx @@ -48,7 +48,11 @@ const AgentManagement: React.FC = () => { ]; const getServerUrl = () => { - return `${window.location.protocol}//${window.location.host}`; + // Use API server port (8080) instead of web UI port (3000) + const protocol = window.location.protocol; + const hostname = window.location.hostname; + const port = hostname === 'localhost' || hostname === '127.0.0.1' ? ':8080' : ''; + return `${protocol}//${hostname}${port}`; }; const getActiveToken = () => {