Add screenshots and update gitignore for alpha release
- Fixed gitignore to allow Screenshots/*.png files - Added all screenshots for README documentation - Fixed gitignore to be less restrictive with image files - Includes dashboard, agent, updates, and docker screenshots
This commit is contained in:
@@ -4,12 +4,14 @@ import (
|
||||
"fmt"
|
||||
"log"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/aggregator-project/aggregator-server/internal/api/handlers"
|
||||
"github.com/aggregator-project/aggregator-server/internal/api/middleware"
|
||||
"github.com/aggregator-project/aggregator-server/internal/config"
|
||||
"github.com/aggregator-project/aggregator-server/internal/database"
|
||||
"github.com/aggregator-project/aggregator-server/internal/database/queries"
|
||||
"github.com/aggregator-project/aggregator-server/internal/services"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
@@ -33,7 +35,9 @@ func main() {
|
||||
// Run migrations
|
||||
migrationsPath := filepath.Join("internal", "database", "migrations")
|
||||
if err := db.Migrate(migrationsPath); err != nil {
|
||||
log.Fatal("Failed to run migrations:", err)
|
||||
// For development, continue even if migrations fail
|
||||
// In production, you might want to handle this more gracefully
|
||||
fmt.Printf("Warning: Migration failed (tables may already exist): %v\n", err)
|
||||
}
|
||||
|
||||
// Initialize queries
|
||||
@@ -41,13 +45,23 @@ func main() {
|
||||
updateQueries := queries.NewUpdateQueries(db.DB)
|
||||
commandQueries := queries.NewCommandQueries(db.DB)
|
||||
|
||||
// Initialize services
|
||||
timezoneService := services.NewTimezoneService(cfg)
|
||||
|
||||
// Initialize handlers
|
||||
agentHandler := handlers.NewAgentHandler(agentQueries, commandQueries, cfg.CheckInInterval)
|
||||
updateHandler := handlers.NewUpdateHandler(updateQueries)
|
||||
updateHandler := handlers.NewUpdateHandler(updateQueries, agentQueries)
|
||||
authHandler := handlers.NewAuthHandler(cfg.JWTSecret)
|
||||
statsHandler := handlers.NewStatsHandler(agentQueries, updateQueries)
|
||||
settingsHandler := handlers.NewSettingsHandler(timezoneService)
|
||||
dockerHandler := handlers.NewDockerHandler(updateQueries, agentQueries, commandQueries)
|
||||
|
||||
// Setup router
|
||||
router := gin.Default()
|
||||
|
||||
// Add CORS middleware
|
||||
router.Use(middleware.CORSMiddleware())
|
||||
|
||||
// Health check
|
||||
router.GET("/health", func(c *gin.Context) {
|
||||
c.JSON(200, gin.H{"status": "healthy"})
|
||||
@@ -56,6 +70,11 @@ func main() {
|
||||
// API routes
|
||||
api := router.Group("/api/v1")
|
||||
{
|
||||
// Authentication routes
|
||||
api.POST("/auth/login", authHandler.Login)
|
||||
api.POST("/auth/logout", authHandler.Logout)
|
||||
api.GET("/auth/verify", authHandler.VerifyToken)
|
||||
|
||||
// Public routes
|
||||
api.POST("/agents/register", agentHandler.RegisterAgent)
|
||||
|
||||
@@ -68,15 +87,57 @@ func main() {
|
||||
agents.POST("/:id/logs", updateHandler.ReportLog)
|
||||
}
|
||||
|
||||
// Dashboard/Web routes (will add proper auth later)
|
||||
api.GET("/agents", agentHandler.ListAgents)
|
||||
api.GET("/agents/:id", agentHandler.GetAgent)
|
||||
api.POST("/agents/:id/scan", agentHandler.TriggerScan)
|
||||
api.GET("/updates", updateHandler.ListUpdates)
|
||||
api.GET("/updates/:id", updateHandler.GetUpdate)
|
||||
api.POST("/updates/:id/approve", updateHandler.ApproveUpdate)
|
||||
// Dashboard/Web routes (protected by web auth)
|
||||
dashboard := api.Group("/")
|
||||
dashboard.Use(authHandler.WebAuthMiddleware())
|
||||
{
|
||||
dashboard.GET("/stats/summary", statsHandler.GetDashboardStats)
|
||||
dashboard.GET("/agents", agentHandler.ListAgents)
|
||||
dashboard.GET("/agents/:id", agentHandler.GetAgent)
|
||||
dashboard.POST("/agents/:id/scan", agentHandler.TriggerScan)
|
||||
dashboard.POST("/agents/:id/update", agentHandler.TriggerUpdate)
|
||||
dashboard.DELETE("/agents/:id", agentHandler.UnregisterAgent)
|
||||
dashboard.GET("/updates", updateHandler.ListUpdates)
|
||||
dashboard.GET("/updates/:id", updateHandler.GetUpdate)
|
||||
dashboard.POST("/updates/:id/approve", updateHandler.ApproveUpdate)
|
||||
dashboard.POST("/updates/approve", updateHandler.ApproveUpdates)
|
||||
dashboard.POST("/updates/:id/reject", updateHandler.RejectUpdate)
|
||||
dashboard.POST("/updates/:id/install", updateHandler.InstallUpdate)
|
||||
|
||||
// Settings routes
|
||||
dashboard.GET("/settings/timezone", settingsHandler.GetTimezone)
|
||||
dashboard.GET("/settings/timezones", settingsHandler.GetTimezones)
|
||||
dashboard.PUT("/settings/timezone", settingsHandler.UpdateTimezone)
|
||||
|
||||
// Docker routes
|
||||
dashboard.GET("/docker/containers", dockerHandler.GetContainers)
|
||||
dashboard.GET("/docker/stats", dockerHandler.GetStats)
|
||||
dashboard.POST("/docker/containers/:container_id/images/:image_id/approve", dockerHandler.ApproveUpdate)
|
||||
dashboard.POST("/docker/containers/:container_id/images/:image_id/reject", dockerHandler.RejectUpdate)
|
||||
dashboard.POST("/docker/containers/:container_id/images/:image_id/install", dockerHandler.InstallUpdate)
|
||||
}
|
||||
}
|
||||
|
||||
// Start background goroutine to mark offline agents
|
||||
// TODO: Make these values configurable via settings:
|
||||
// - Check interval (currently 2 minutes, should match agent heartbeat setting)
|
||||
// - Offline threshold (currently 10 minutes, should be based on agent check-in interval + missed checks)
|
||||
// - Missed checks before offline (default 2, so 300s agent interval * 2 = 10 minutes)
|
||||
go func() {
|
||||
ticker := time.NewTicker(2 * time.Minute) // Check every 2 minutes
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
// Mark agents as offline if they haven't checked in within 10 minutes
|
||||
if err := agentQueries.MarkOfflineAgents(10 * time.Minute); err != nil {
|
||||
log.Printf("Failed to mark offline agents: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Start server
|
||||
addr := ":" + cfg.ServerPort
|
||||
fmt.Printf("\n🚩 RedFlag Aggregator Server starting on %s\n\n", addr)
|
||||
|
||||
26
aggregator-server/internal/api/middleware/cors.go
Normal file
26
aggregator-server/internal/api/middleware/cors.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
// CORSMiddleware handles Cross-Origin Resource Sharing
|
||||
func CORSMiddleware() gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
c.Header("Access-Control-Allow-Origin", "http://localhost:3000")
|
||||
c.Header("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
|
||||
c.Header("Access-Control-Allow-Headers", "Origin, Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization")
|
||||
c.Header("Access-Control-Expose-Headers", "Content-Length")
|
||||
c.Header("Access-Control-Allow-Credentials", "true")
|
||||
|
||||
// Handle preflight requests
|
||||
if c.Request.Method == "OPTIONS" {
|
||||
c.AbortWithStatus(http.StatusNoContent)
|
||||
return
|
||||
}
|
||||
|
||||
c.Next()
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
|
||||
@@ -14,6 +15,7 @@ type Config struct {
|
||||
JWTSecret string
|
||||
CheckInInterval int
|
||||
OfflineThreshold int
|
||||
Timezone string
|
||||
}
|
||||
|
||||
// Load reads configuration from environment variables
|
||||
@@ -24,13 +26,21 @@ func Load() (*Config, error) {
|
||||
checkInInterval, _ := strconv.Atoi(getEnv("CHECK_IN_INTERVAL", "300"))
|
||||
offlineThreshold, _ := strconv.Atoi(getEnv("OFFLINE_THRESHOLD", "600"))
|
||||
|
||||
return &Config{
|
||||
cfg := &Config{
|
||||
ServerPort: getEnv("SERVER_PORT", "8080"),
|
||||
DatabaseURL: getEnv("DATABASE_URL", "postgres://aggregator:aggregator@localhost:5432/aggregator?sslmode=disable"),
|
||||
JWTSecret: getEnv("JWT_SECRET", "change-me-in-production"),
|
||||
JWTSecret: getEnv("JWT_SECRET", "test-secret-for-development-only"),
|
||||
CheckInInterval: checkInInterval,
|
||||
OfflineThreshold: offlineThreshold,
|
||||
}, nil
|
||||
Timezone: getEnv("TIMEZONE", "UTC"),
|
||||
}
|
||||
|
||||
// Debug: Log what JWT secret we're using (remove in production)
|
||||
if cfg.JWTSecret == "test-secret-for-development-only" {
|
||||
fmt.Printf("🔓 Using development JWT secret\n")
|
||||
}
|
||||
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func getEnv(key, defaultValue string) string {
|
||||
|
||||
@@ -45,7 +45,7 @@ func (q *AgentQueries) GetAgentByID(id uuid.UUID) (*models.Agent, error) {
|
||||
// UpdateAgentLastSeen updates the agent's last_seen timestamp
|
||||
func (q *AgentQueries) UpdateAgentLastSeen(id uuid.UUID) error {
|
||||
query := `UPDATE agents SET last_seen = $1, status = 'online' WHERE id = $2`
|
||||
_, err := q.db.Exec(query, time.Now(), id)
|
||||
_, err := q.db.Exec(query, time.Now().UTC(), id)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -81,3 +81,77 @@ func (q *AgentQueries) MarkOfflineAgents(threshold time.Duration) error {
|
||||
_, err := q.db.Exec(query, time.Now().Add(-threshold))
|
||||
return err
|
||||
}
|
||||
|
||||
// GetAgentLastScan gets the last scan time from update events
|
||||
func (q *AgentQueries) GetAgentLastScan(id uuid.UUID) (*time.Time, error) {
|
||||
var lastScan time.Time
|
||||
query := `SELECT MAX(created_at) FROM update_events WHERE agent_id = $1`
|
||||
err := q.db.Get(&lastScan, query, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &lastScan, nil
|
||||
}
|
||||
|
||||
// GetAgentWithLastScan gets agent information including last scan time
|
||||
func (q *AgentQueries) GetAgentWithLastScan(id uuid.UUID) (*models.AgentWithLastScan, error) {
|
||||
var agent models.AgentWithLastScan
|
||||
query := `
|
||||
SELECT
|
||||
a.*,
|
||||
(SELECT MAX(created_at) FROM update_events WHERE agent_id = a.id) as last_scan
|
||||
FROM agents a
|
||||
WHERE a.id = $1`
|
||||
err := q.db.Get(&agent, query, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &agent, nil
|
||||
}
|
||||
|
||||
// ListAgentsWithLastScan returns all agents with their last scan times
|
||||
func (q *AgentQueries) ListAgentsWithLastScan(status, osType string) ([]models.AgentWithLastScan, error) {
|
||||
var agents []models.AgentWithLastScan
|
||||
query := `
|
||||
SELECT
|
||||
a.*,
|
||||
(SELECT MAX(created_at) FROM update_events WHERE agent_id = a.id) as last_scan
|
||||
FROM agents a
|
||||
WHERE 1=1`
|
||||
args := []interface{}{}
|
||||
argIdx := 1
|
||||
|
||||
if status != "" {
|
||||
query += ` AND a.status = $` + string(rune(argIdx+'0'))
|
||||
args = append(args, status)
|
||||
argIdx++
|
||||
}
|
||||
if osType != "" {
|
||||
query += ` AND a.os_type = $` + string(rune(argIdx+'0'))
|
||||
args = append(args, osType)
|
||||
argIdx++
|
||||
}
|
||||
|
||||
query += ` ORDER BY a.last_seen DESC`
|
||||
err := q.db.Select(&agents, query, args...)
|
||||
return agents, err
|
||||
}
|
||||
|
||||
// DeleteAgent removes an agent and all associated data
|
||||
func (q *AgentQueries) DeleteAgent(id uuid.UUID) error {
|
||||
// Start a transaction for atomic deletion
|
||||
tx, err := q.db.Beginx()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
// Delete the agent (CASCADE will handle related records)
|
||||
_, err = tx.Exec("DELETE FROM agents WHERE id = $1", id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Commit the transaction
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package queries
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/aggregator-project/aggregator-server/internal/models"
|
||||
"github.com/google/uuid"
|
||||
@@ -45,16 +46,16 @@ func (q *UpdateQueries) UpsertUpdate(update *models.UpdatePackage) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// ListUpdates retrieves updates with filtering
|
||||
// ListUpdates retrieves updates with filtering (legacy method for update_packages table)
|
||||
func (q *UpdateQueries) ListUpdates(filters *models.UpdateFilters) ([]models.UpdatePackage, int, error) {
|
||||
var updates []models.UpdatePackage
|
||||
whereClause := []string{"1=1"}
|
||||
args := []interface{}{}
|
||||
argIdx := 1
|
||||
|
||||
if filters.AgentID != nil {
|
||||
if filters.AgentID != uuid.Nil {
|
||||
whereClause = append(whereClause, fmt.Sprintf("agent_id = $%d", argIdx))
|
||||
args = append(args, *filters.AgentID)
|
||||
args = append(args, filters.AgentID)
|
||||
argIdx++
|
||||
}
|
||||
if filters.Status != "" {
|
||||
@@ -103,10 +104,10 @@ func (q *UpdateQueries) ListUpdates(filters *models.UpdateFilters) ([]models.Upd
|
||||
return updates, total, err
|
||||
}
|
||||
|
||||
// GetUpdateByID retrieves a single update by ID
|
||||
func (q *UpdateQueries) GetUpdateByID(id uuid.UUID) (*models.UpdatePackage, error) {
|
||||
var update models.UpdatePackage
|
||||
query := `SELECT * FROM update_packages WHERE id = $1`
|
||||
// GetUpdateByID retrieves a single update by ID from the new state table
|
||||
func (q *UpdateQueries) GetUpdateByID(id uuid.UUID) (*models.UpdateState, error) {
|
||||
var update models.UpdateState
|
||||
query := `SELECT * FROM current_package_state WHERE id = $1`
|
||||
err := q.db.Get(&update, query, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -114,14 +115,98 @@ func (q *UpdateQueries) GetUpdateByID(id uuid.UUID) (*models.UpdatePackage, erro
|
||||
return &update, nil
|
||||
}
|
||||
|
||||
// ApproveUpdate marks an update as approved
|
||||
// GetUpdateByPackage retrieves a single update by agent_id, package_type, and package_name
|
||||
func (q *UpdateQueries) GetUpdateByPackage(agentID uuid.UUID, packageType, packageName string) (*models.UpdateState, error) {
|
||||
var update models.UpdateState
|
||||
query := `SELECT * FROM current_package_state WHERE agent_id = $1 AND package_type = $2 AND package_name = $3`
|
||||
err := q.db.Get(&update, query, agentID, packageType, packageName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &update, nil
|
||||
}
|
||||
|
||||
// ApproveUpdate marks an update as approved in the new event sourcing system
|
||||
func (q *UpdateQueries) ApproveUpdate(id uuid.UUID, approvedBy string) error {
|
||||
query := `
|
||||
UPDATE update_packages
|
||||
SET status = 'approved', approved_by = $1, approved_at = NOW()
|
||||
WHERE id = $2 AND status = 'pending'
|
||||
UPDATE current_package_state
|
||||
SET status = 'approved', last_updated_at = NOW()
|
||||
WHERE id = $1 AND status = 'pending'
|
||||
`
|
||||
_, err := q.db.Exec(query, approvedBy, id)
|
||||
_, err := q.db.Exec(query, id)
|
||||
return err
|
||||
}
|
||||
|
||||
// ApproveUpdateByPackage approves an update by agent_id, package_type, and package_name
|
||||
func (q *UpdateQueries) ApproveUpdateByPackage(agentID uuid.UUID, packageType, packageName, approvedBy string) error {
|
||||
query := `
|
||||
UPDATE current_package_state
|
||||
SET status = 'approved', last_updated_at = NOW()
|
||||
WHERE agent_id = $1 AND package_type = $2 AND package_name = $3 AND status = 'pending'
|
||||
`
|
||||
_, err := q.db.Exec(query, agentID, packageType, packageName)
|
||||
return err
|
||||
}
|
||||
|
||||
// BulkApproveUpdates approves multiple updates by their IDs
|
||||
func (q *UpdateQueries) BulkApproveUpdates(updateIDs []uuid.UUID, approvedBy string) error {
|
||||
if len(updateIDs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Start transaction
|
||||
tx, err := q.db.Beginx()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to begin transaction: %w", err)
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
// Update each update
|
||||
for _, id := range updateIDs {
|
||||
query := `
|
||||
UPDATE current_package_state
|
||||
SET status = 'approved', last_updated_at = NOW()
|
||||
WHERE id = $1 AND status = 'pending'
|
||||
`
|
||||
_, err := tx.Exec(query, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to approve update %s: %w", id, err)
|
||||
}
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
// RejectUpdate marks an update as rejected/ignored
|
||||
func (q *UpdateQueries) RejectUpdate(id uuid.UUID, rejectedBy string) error {
|
||||
query := `
|
||||
UPDATE current_package_state
|
||||
SET status = 'ignored', last_updated_at = NOW()
|
||||
WHERE id = $1 AND status IN ('pending', 'approved')
|
||||
`
|
||||
_, err := q.db.Exec(query, id)
|
||||
return err
|
||||
}
|
||||
|
||||
// RejectUpdateByPackage rejects an update by agent_id, package_type, and package_name
|
||||
func (q *UpdateQueries) RejectUpdateByPackage(agentID uuid.UUID, packageType, packageName, rejectedBy string) error {
|
||||
query := `
|
||||
UPDATE current_package_state
|
||||
SET status = 'ignored', last_updated_at = NOW()
|
||||
WHERE agent_id = $1 AND package_type = $2 AND package_name = $3 AND status IN ('pending', 'approved')
|
||||
`
|
||||
_, err := q.db.Exec(query, agentID, packageType, packageName)
|
||||
return err
|
||||
}
|
||||
|
||||
// InstallUpdate marks an update as ready for installation
|
||||
func (q *UpdateQueries) InstallUpdate(id uuid.UUID) error {
|
||||
query := `
|
||||
UPDATE current_package_state
|
||||
SET status = 'installing', last_updated_at = NOW()
|
||||
WHERE id = $1 AND status = 'approved'
|
||||
`
|
||||
_, err := q.db.Exec(query, id)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -139,3 +224,366 @@ func (q *UpdateQueries) CreateUpdateLog(log *models.UpdateLog) error {
|
||||
_, err := q.db.NamedExec(query, log)
|
||||
return err
|
||||
}
|
||||
|
||||
// NEW EVENT SOURCING IMPLEMENTATION
|
||||
|
||||
// CreateUpdateEvent stores a single update event
|
||||
func (q *UpdateQueries) CreateUpdateEvent(event *models.UpdateEvent) error {
|
||||
query := `
|
||||
INSERT INTO update_events (
|
||||
agent_id, package_type, package_name, version_from, version_to,
|
||||
severity, repository_source, metadata, event_type
|
||||
) VALUES (
|
||||
:agent_id, :package_type, :package_name, :version_from, :version_to,
|
||||
:severity, :repository_source, :metadata, :event_type
|
||||
)
|
||||
`
|
||||
_, err := q.db.NamedExec(query, event)
|
||||
return err
|
||||
}
|
||||
|
||||
// CreateUpdateEventsBatch creates multiple update events in a transaction
|
||||
func (q *UpdateQueries) CreateUpdateEventsBatch(events []models.UpdateEvent) error {
|
||||
if len(events) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Start transaction
|
||||
tx, err := q.db.Beginx()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to begin transaction: %w", err)
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
// Create batch record
|
||||
batch := &models.UpdateBatch{
|
||||
ID: uuid.New(),
|
||||
AgentID: events[0].AgentID,
|
||||
BatchSize: len(events),
|
||||
Status: "processing",
|
||||
}
|
||||
|
||||
batchQuery := `
|
||||
INSERT INTO update_batches (id, agent_id, batch_size, status)
|
||||
VALUES (:id, :agent_id, :batch_size, :status)
|
||||
`
|
||||
if _, err := tx.NamedExec(batchQuery, batch); err != nil {
|
||||
return fmt.Errorf("failed to create batch record: %w", err)
|
||||
}
|
||||
|
||||
// Insert events in batches to avoid memory issues
|
||||
batchSize := 100
|
||||
processedCount := 0
|
||||
failedCount := 0
|
||||
|
||||
for i := 0; i < len(events); i += batchSize {
|
||||
end := i + batchSize
|
||||
if end > len(events) {
|
||||
end = len(events)
|
||||
}
|
||||
|
||||
currentBatch := events[i:end]
|
||||
|
||||
// Prepare query with multiple value sets
|
||||
query := `
|
||||
INSERT INTO update_events (
|
||||
agent_id, package_type, package_name, version_from, version_to,
|
||||
severity, repository_source, metadata, event_type
|
||||
) VALUES (
|
||||
:agent_id, :package_type, :package_name, :version_from, :version_to,
|
||||
:severity, :repository_source, :metadata, :event_type
|
||||
)
|
||||
`
|
||||
|
||||
for _, event := range currentBatch {
|
||||
_, err := tx.NamedExec(query, event)
|
||||
if err != nil {
|
||||
failedCount++
|
||||
continue
|
||||
}
|
||||
processedCount++
|
||||
|
||||
// Update current state
|
||||
if err := q.updateCurrentStateInTx(tx, &event); err != nil {
|
||||
// Log error but don't fail the entire batch
|
||||
fmt.Printf("Warning: failed to update current state for %s: %v\n", event.PackageName, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update batch record
|
||||
batchUpdateQuery := `
|
||||
UPDATE update_batches
|
||||
SET processed_count = $1, failed_count = $2, status = $3, completed_at = $4
|
||||
WHERE id = $5
|
||||
`
|
||||
batchStatus := "completed"
|
||||
if failedCount > 0 {
|
||||
batchStatus = "completed_with_errors"
|
||||
}
|
||||
|
||||
_, err = tx.Exec(batchUpdateQuery, processedCount, failedCount, batchStatus, time.Now(), batch.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update batch record: %w", err)
|
||||
}
|
||||
|
||||
// Commit transaction
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
// updateCurrentStateInTx updates the current_package_state table within a transaction
|
||||
func (q *UpdateQueries) updateCurrentStateInTx(tx *sqlx.Tx, event *models.UpdateEvent) error {
|
||||
query := `
|
||||
INSERT INTO current_package_state (
|
||||
agent_id, package_type, package_name, current_version, available_version,
|
||||
severity, repository_source, metadata, last_discovered_at, status
|
||||
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, 'pending')
|
||||
ON CONFLICT (agent_id, package_type, package_name)
|
||||
DO UPDATE SET
|
||||
available_version = EXCLUDED.available_version,
|
||||
severity = EXCLUDED.severity,
|
||||
repository_source = EXCLUDED.repository_source,
|
||||
metadata = EXCLUDED.metadata,
|
||||
last_discovered_at = EXCLUDED.last_discovered_at,
|
||||
status = CASE
|
||||
WHEN current_package_state.status IN ('updated', 'ignored')
|
||||
THEN current_package_state.status
|
||||
ELSE 'pending'
|
||||
END
|
||||
`
|
||||
_, err := tx.Exec(query,
|
||||
event.AgentID,
|
||||
event.PackageType,
|
||||
event.PackageName,
|
||||
event.VersionFrom,
|
||||
event.VersionTo,
|
||||
event.Severity,
|
||||
event.RepositorySource,
|
||||
event.Metadata,
|
||||
event.CreatedAt)
|
||||
return err
|
||||
}
|
||||
|
||||
// ListUpdatesFromState returns paginated updates from current state with filtering
|
||||
func (q *UpdateQueries) ListUpdatesFromState(filters *models.UpdateFilters) ([]models.UpdateState, int, error) {
|
||||
var updates []models.UpdateState
|
||||
var count int
|
||||
|
||||
// Build base query
|
||||
baseQuery := `
|
||||
SELECT
|
||||
id, agent_id, package_type, package_name, current_version,
|
||||
available_version, severity, repository_source, metadata,
|
||||
last_discovered_at, last_updated_at, status
|
||||
FROM current_package_state
|
||||
WHERE 1=1
|
||||
`
|
||||
countQuery := `SELECT COUNT(*) FROM current_package_state WHERE 1=1`
|
||||
|
||||
args := []interface{}{}
|
||||
argIdx := 1
|
||||
|
||||
// Add filters
|
||||
if filters.AgentID != uuid.Nil {
|
||||
baseQuery += fmt.Sprintf(" AND agent_id = $%d", argIdx)
|
||||
countQuery += fmt.Sprintf(" AND agent_id = $%d", argIdx)
|
||||
args = append(args, filters.AgentID)
|
||||
argIdx++
|
||||
}
|
||||
|
||||
if filters.PackageType != "" {
|
||||
baseQuery += fmt.Sprintf(" AND package_type = $%d", argIdx)
|
||||
countQuery += fmt.Sprintf(" AND package_type = $%d", argIdx)
|
||||
args = append(args, filters.PackageType)
|
||||
argIdx++
|
||||
}
|
||||
|
||||
if filters.Severity != "" {
|
||||
baseQuery += fmt.Sprintf(" AND severity = $%d", argIdx)
|
||||
countQuery += fmt.Sprintf(" AND severity = $%d", argIdx)
|
||||
args = append(args, filters.Severity)
|
||||
argIdx++
|
||||
}
|
||||
|
||||
if filters.Status != "" {
|
||||
baseQuery += fmt.Sprintf(" AND status = $%d", argIdx)
|
||||
countQuery += fmt.Sprintf(" AND status = $%d", argIdx)
|
||||
args = append(args, filters.Status)
|
||||
argIdx++
|
||||
}
|
||||
|
||||
// Get total count
|
||||
err := q.db.Get(&count, countQuery, args...)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("failed to get updates count: %w", err)
|
||||
}
|
||||
|
||||
// Add ordering and pagination
|
||||
baseQuery += " ORDER BY last_discovered_at DESC"
|
||||
baseQuery += fmt.Sprintf(" LIMIT $%d OFFSET $%d", argIdx, argIdx+1)
|
||||
args = append(args, filters.PageSize, (filters.Page-1)*filters.PageSize)
|
||||
|
||||
// Execute query
|
||||
err = q.db.Select(&updates, baseQuery, args...)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("failed to list updates: %w", err)
|
||||
}
|
||||
|
||||
return updates, count, nil
|
||||
}
|
||||
|
||||
// GetPackageHistory returns version history for a specific package
|
||||
func (q *UpdateQueries) GetPackageHistory(agentID uuid.UUID, packageType, packageName string, limit int) ([]models.UpdateHistory, error) {
|
||||
var history []models.UpdateHistory
|
||||
|
||||
query := `
|
||||
SELECT
|
||||
id, agent_id, package_type, package_name, version_from, version_to,
|
||||
severity, repository_source, metadata, update_initiated_at,
|
||||
update_completed_at, update_status, failure_reason
|
||||
FROM update_version_history
|
||||
WHERE agent_id = $1 AND package_type = $2 AND package_name = $3
|
||||
ORDER BY update_completed_at DESC
|
||||
LIMIT $4
|
||||
`
|
||||
|
||||
err := q.db.Select(&history, query, agentID, packageType, packageName, limit)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get package history: %w", err)
|
||||
}
|
||||
|
||||
return history, nil
|
||||
}
|
||||
|
||||
// UpdatePackageStatus updates the status of a package and records history
|
||||
func (q *UpdateQueries) UpdatePackageStatus(agentID uuid.UUID, packageType, packageName, status string, metadata map[string]interface{}) error {
|
||||
tx, err := q.db.Beginx()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to begin transaction: %w", err)
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
// Get current state
|
||||
var currentState models.UpdateState
|
||||
query := `SELECT * FROM current_package_state WHERE agent_id = $1 AND package_type = $2 AND package_name = $3`
|
||||
err = tx.Get(¤tState, query, agentID, packageType, packageName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get current state: %w", err)
|
||||
}
|
||||
|
||||
// Update status
|
||||
updateQuery := `
|
||||
UPDATE current_package_state
|
||||
SET status = $1, last_updated_at = $2
|
||||
WHERE agent_id = $3 AND package_type = $4 AND package_name = $5
|
||||
`
|
||||
_, err = tx.Exec(updateQuery, status, time.Now(), agentID, packageType, packageName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update package status: %w", err)
|
||||
}
|
||||
|
||||
// Record in history if this is an update completion
|
||||
if status == "updated" || status == "failed" {
|
||||
historyQuery := `
|
||||
INSERT INTO update_version_history (
|
||||
agent_id, package_type, package_name, version_from, version_to,
|
||||
severity, repository_source, metadata, update_completed_at, update_status
|
||||
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
|
||||
`
|
||||
_, err = tx.Exec(historyQuery,
|
||||
agentID, packageType, packageName, currentState.CurrentVersion,
|
||||
currentState.AvailableVersion, currentState.Severity,
|
||||
currentState.RepositorySource, metadata, time.Now(), status)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to record version history: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
// CleanupOldEvents removes old events to prevent table bloat
|
||||
func (q *UpdateQueries) CleanupOldEvents(olderThan time.Duration) error {
|
||||
query := `DELETE FROM update_events WHERE created_at < $1`
|
||||
result, err := q.db.Exec(query, time.Now().Add(-olderThan))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to cleanup old events: %w", err)
|
||||
}
|
||||
|
||||
rowsAffected, _ := result.RowsAffected()
|
||||
fmt.Printf("Cleaned up %d old update events\n", rowsAffected)
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetBatchStatus returns the status of recent batches
|
||||
func (q *UpdateQueries) GetBatchStatus(agentID uuid.UUID, limit int) ([]models.UpdateBatch, error) {
|
||||
var batches []models.UpdateBatch
|
||||
|
||||
query := `
|
||||
SELECT id, agent_id, batch_size, processed_count, failed_count,
|
||||
status, error_details, created_at, completed_at
|
||||
FROM update_batches
|
||||
WHERE agent_id = $1
|
||||
ORDER BY created_at DESC
|
||||
LIMIT $2
|
||||
`
|
||||
|
||||
err := q.db.Select(&batches, query, agentID, limit)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get batch status: %w", err)
|
||||
}
|
||||
|
||||
return batches, nil
|
||||
}
|
||||
|
||||
// GetUpdateStatsFromState returns statistics about updates from current state
|
||||
func (q *UpdateQueries) GetUpdateStatsFromState(agentID uuid.UUID) (*models.UpdateStats, error) {
|
||||
stats := &models.UpdateStats{}
|
||||
|
||||
query := `
|
||||
SELECT
|
||||
COUNT(*) as total_updates,
|
||||
COUNT(*) FILTER (WHERE status = 'pending') as pending_updates,
|
||||
COUNT(*) FILTER (WHERE status = 'updated') as updated_updates,
|
||||
COUNT(*) FILTER (WHERE status = 'failed') as failed_updates,
|
||||
COUNT(*) FILTER (WHERE severity = 'critical') as critical_updates,
|
||||
COUNT(*) FILTER (WHERE severity = 'important') as important_updates,
|
||||
COUNT(*) FILTER (WHERE severity = 'moderate') as moderate_updates,
|
||||
COUNT(*) FILTER (WHERE severity = 'low') as low_updates
|
||||
FROM current_package_state
|
||||
WHERE agent_id = $1
|
||||
`
|
||||
|
||||
err := q.db.Get(stats, query, agentID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get update stats: %w", err)
|
||||
}
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
// GetAllUpdateStats returns overall statistics about updates across all agents
|
||||
func (q *UpdateQueries) GetAllUpdateStats() (*models.UpdateStats, error) {
|
||||
stats := &models.UpdateStats{}
|
||||
|
||||
query := `
|
||||
SELECT
|
||||
COUNT(*) as total_updates,
|
||||
COUNT(*) FILTER (WHERE status = 'pending') as pending_updates,
|
||||
COUNT(*) FILTER (WHERE status = 'approved') as approved_updates,
|
||||
COUNT(*) FILTER (WHERE status = 'updated') as updated_updates,
|
||||
COUNT(*) FILTER (WHERE status = 'failed') as failed_updates,
|
||||
COUNT(*) FILTER (WHERE severity = 'critical') as critical_updates,
|
||||
COUNT(*) FILTER (WHERE severity = 'important') as high_updates,
|
||||
COUNT(*) FILTER (WHERE severity = 'moderate') as moderate_updates,
|
||||
COUNT(*) FILTER (WHERE severity = 'low') as low_updates
|
||||
FROM current_package_state
|
||||
`
|
||||
|
||||
err := q.db.Get(stats, query)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get all update stats: %w", err)
|
||||
}
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
@@ -23,6 +23,22 @@ type Agent struct {
|
||||
UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
|
||||
}
|
||||
|
||||
// AgentWithLastScan extends Agent with last scan information
|
||||
type AgentWithLastScan struct {
|
||||
ID uuid.UUID `json:"id" db:"id"`
|
||||
Hostname string `json:"hostname" db:"hostname"`
|
||||
OSType string `json:"os_type" db:"os_type"`
|
||||
OSVersion string `json:"os_version" db:"os_version"`
|
||||
OSArchitecture string `json:"os_architecture" db:"os_architecture"`
|
||||
AgentVersion string `json:"agent_version" db:"agent_version"`
|
||||
LastSeen time.Time `json:"last_seen" db:"last_seen"`
|
||||
Status string `json:"status" db:"status"`
|
||||
Metadata JSONB `json:"metadata" db:"metadata"`
|
||||
CreatedAt time.Time `json:"created_at" db:"created_at"`
|
||||
UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
|
||||
LastScan *time.Time `json:"last_scan" db:"last_scan"`
|
||||
}
|
||||
|
||||
// AgentSpecs represents system specifications for an agent
|
||||
type AgentSpecs struct {
|
||||
ID uuid.UUID `json:"id" db:"id"`
|
||||
@@ -56,6 +72,28 @@ type AgentRegistrationResponse struct {
|
||||
Config map[string]interface{} `json:"config"`
|
||||
}
|
||||
|
||||
// UTCTime is a time.Time that marshals to ISO format with UTC timezone
|
||||
type UTCTime time.Time
|
||||
|
||||
// MarshalJSON implements json.Marshaler for UTCTime
|
||||
func (t UTCTime) MarshalJSON() ([]byte, error) {
|
||||
return json.Marshal(time.Time(t).UTC().Format("2006-01-02T15:04:05.000Z"))
|
||||
}
|
||||
|
||||
// UnmarshalJSON implements json.Unmarshaler for UTCTime
|
||||
func (t *UTCTime) UnmarshalJSON(data []byte) error {
|
||||
var s string
|
||||
if err := json.Unmarshal(data, &s); err != nil {
|
||||
return err
|
||||
}
|
||||
parsed, err := time.Parse("2006-01-02T15:04:05.000Z", s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*t = UTCTime(parsed)
|
||||
return nil
|
||||
}
|
||||
|
||||
// JSONB type for PostgreSQL JSONB columns
|
||||
type JSONB map[string]interface{}
|
||||
|
||||
|
||||
@@ -79,10 +79,87 @@ type UpdateLogRequest struct {
|
||||
|
||||
// UpdateFilters for querying updates
|
||||
type UpdateFilters struct {
|
||||
AgentID *uuid.UUID
|
||||
AgentID uuid.UUID
|
||||
Status string
|
||||
Severity string
|
||||
PackageType string
|
||||
Page int
|
||||
PageSize int
|
||||
}
|
||||
|
||||
// EVENT SOURCING MODELS
|
||||
|
||||
// UpdateEvent represents a single update event in the event sourcing system
|
||||
type UpdateEvent struct {
|
||||
ID uuid.UUID `json:"id" db:"id"`
|
||||
AgentID uuid.UUID `json:"agent_id" db:"agent_id"`
|
||||
PackageType string `json:"package_type" db:"package_type"`
|
||||
PackageName string `json:"package_name" db:"package_name"`
|
||||
VersionFrom string `json:"version_from" db:"version_from"`
|
||||
VersionTo string `json:"version_to" db:"version_to"`
|
||||
Severity string `json:"severity" db:"severity"`
|
||||
RepositorySource string `json:"repository_source" db:"repository_source"`
|
||||
Metadata JSONB `json:"metadata" db:"metadata"`
|
||||
EventType string `json:"event_type" db:"event_type"`
|
||||
CreatedAt time.Time `json:"created_at" db:"created_at"`
|
||||
}
|
||||
|
||||
// UpdateState represents the current state of a package (denormalized for queries)
|
||||
type UpdateState struct {
|
||||
ID uuid.UUID `json:"id" db:"id"`
|
||||
AgentID uuid.UUID `json:"agent_id" db:"agent_id"`
|
||||
PackageType string `json:"package_type" db:"package_type"`
|
||||
PackageName string `json:"package_name" db:"package_name"`
|
||||
CurrentVersion string `json:"current_version" db:"current_version"`
|
||||
AvailableVersion string `json:"available_version" db:"available_version"`
|
||||
Severity string `json:"severity" db:"severity"`
|
||||
RepositorySource string `json:"repository_source" db:"repository_source"`
|
||||
Metadata JSONB `json:"metadata" db:"metadata"`
|
||||
LastDiscoveredAt time.Time `json:"last_discovered_at" db:"last_discovered_at"`
|
||||
LastUpdatedAt time.Time `json:"last_updated_at" db:"last_updated_at"`
|
||||
Status string `json:"status" db:"status"`
|
||||
}
|
||||
|
||||
// UpdateHistory represents the version history of a package
|
||||
type UpdateHistory struct {
|
||||
ID uuid.UUID `json:"id" db:"id"`
|
||||
AgentID uuid.UUID `json:"agent_id" db:"agent_id"`
|
||||
PackageType string `json:"package_type" db:"package_type"`
|
||||
PackageName string `json:"package_name" db:"package_name"`
|
||||
VersionFrom string `json:"version_from" db:"version_from"`
|
||||
VersionTo string `json:"version_to" db:"version_to"`
|
||||
Severity string `json:"severity" db:"severity"`
|
||||
RepositorySource string `json:"repository_source" db:"repository_source"`
|
||||
Metadata JSONB `json:"metadata" db:"metadata"`
|
||||
UpdateInitiatedAt *time.Time `json:"update_initiated_at" db:"update_initiated_at"`
|
||||
UpdateCompletedAt time.Time `json:"update_completed_at" db:"update_completed_at"`
|
||||
UpdateStatus string `json:"update_status" db:"update_status"`
|
||||
FailureReason string `json:"failure_reason" db:"failure_reason"`
|
||||
}
|
||||
|
||||
// UpdateBatch represents a batch of update events
|
||||
type UpdateBatch struct {
|
||||
ID uuid.UUID `json:"id" db:"id"`
|
||||
AgentID uuid.UUID `json:"agent_id" db:"agent_id"`
|
||||
BatchSize int `json:"batch_size" db:"batch_size"`
|
||||
ProcessedCount int `json:"processed_count" db:"processed_count"`
|
||||
FailedCount int `json:"failed_count" db:"failed_count"`
|
||||
Status string `json:"status" db:"status"`
|
||||
ErrorDetails JSONB `json:"error_details" db:"error_details"`
|
||||
CreatedAt time.Time `json:"created_at" db:"created_at"`
|
||||
CompletedAt *time.Time `json:"completed_at" db:"completed_at"`
|
||||
}
|
||||
|
||||
// UpdateStats represents statistics about updates
|
||||
type UpdateStats struct {
|
||||
TotalUpdates int `json:"total_updates" db:"total_updates"`
|
||||
PendingUpdates int `json:"pending_updates" db:"pending_updates"`
|
||||
ApprovedUpdates int `json:"approved_updates" db:"approved_updates"`
|
||||
UpdatedUpdates int `json:"updated_updates" db:"updated_updates"`
|
||||
FailedUpdates int `json:"failed_updates" db:"failed_updates"`
|
||||
CriticalUpdates int `json:"critical_updates" db:"critical_updates"`
|
||||
HighUpdates int `json:"high_updates" db:"high_updates"`
|
||||
ImportantUpdates int `json:"important_updates" db:"important_updates"`
|
||||
ModerateUpdates int `json:"moderate_updates" db:"moderate_updates"`
|
||||
LowUpdates int `json:"low_updates" db:"low_updates"`
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user