Files
Redflag/aggregator-server/internal/database/queries/metrics.go
Fimeg eccc38d7c9 feat: separate data classification architecture
- Create separate scanner interfaces for storage, system, and docker data
- Add dedicated endpoints for metrics and docker images instead of misclassifying as updates
- Implement proper database tables for storage metrics and docker images
- Fix storage/system metrics appearing incorrectly as package updates
- Add scanner types with proper data structures for each subsystem
- Update agent handlers to use correct endpoints for each data type
2025-11-03 21:44:48 -05:00

287 lines
7.0 KiB
Go

package queries
import (
"database/sql"
"fmt"
"time"
"github.com/Fimeg/RedFlag/aggregator-server/internal/models"
"github.com/google/uuid"
"github.com/lib/pq"
)
// MetricsQueries groups the database operations for the metrics table.
// Every method issues its SQL through the wrapped *sql.DB handle.
type MetricsQueries struct {
// db is the handle used for all queries and transactions started by this type.
db *sql.DB
}
// NewMetricsQueries returns a MetricsQueries that issues its SQL through db.
// The handle is stored as given; nothing is opened or verified here.
func NewMetricsQueries(db *sql.DB) *MetricsQueries {
	queries := new(MetricsQueries)
	queries.db = db
	return queries
}
// CreateMetricsEventsBatch inserts the given metric events inside a single
// transaction and commits the rows that the database accepted.
//
// An empty slice is a no-op. Rows that collide with the
// (agent_id, package_name, package_type, created_at) conflict target are
// skipped by the INSERT statement itself.
//
// Each insert runs under its own savepoint. PostgreSQL marks the whole
// transaction as aborted after the first failed statement, which would make
// every later insert fail and the final commit be refused; rolling back to
// the savepoint undoes only the rejected row so the rest of the batch can
// still be committed. Rejected rows are reported on stdout and skipped.
//
// An error is returned when the transaction cannot be started, the statement
// cannot be prepared, a savepoint command fails, or the commit fails.
func (q *MetricsQueries) CreateMetricsEventsBatch(events []models.StoredMetric) error {
	if len(events) == 0 {
		return nil
	}

	tx, err := q.db.Begin()
	if err != nil {
		return fmt.Errorf("failed to begin transaction: %w", err)
	}
	// Safe to defer unconditionally: after a successful Commit the Rollback
	// call has no effect on the database.
	defer tx.Rollback()

	stmt, err := tx.Prepare(`
		INSERT INTO metrics (
			id, agent_id, package_type, package_name, current_version, available_version,
			severity, repository_source, metadata, event_type, created_at
		) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
		ON CONFLICT (agent_id, package_name, package_type, created_at) DO NOTHING
	`)
	if err != nil {
		return fmt.Errorf("failed to prepare statement: %w", err)
	}
	defer stmt.Close()

	for _, event := range events {
		// The savepoint is what makes the per-row error isolation real.
		if _, err := tx.Exec("SAVEPOINT metric_event_insert"); err != nil {
			return fmt.Errorf("failed to create savepoint: %w", err)
		}

		_, insertErr := stmt.Exec(
			event.ID,
			event.AgentID,
			event.PackageType,
			event.PackageName,
			event.CurrentVersion,
			event.AvailableVersion,
			event.Severity,
			event.RepositorySource,
			event.Metadata,
			event.EventType,
			event.CreatedAt,
		)
		if insertErr != nil {
			// Best effort: report the row, undo only its effects, keep going.
			fmt.Printf("Warning: Failed to insert metric event %s: %v\n", event.ID, insertErr)
			if _, err := tx.Exec("ROLLBACK TO SAVEPOINT metric_event_insert"); err != nil {
				return fmt.Errorf("failed to roll back to savepoint: %w", err)
			}
		}

		// Release in both cases so savepoints do not pile up across the batch.
		if _, err := tx.Exec("RELEASE SAVEPOINT metric_event_insert"); err != nil {
			return fmt.Errorf("failed to release savepoint: %w", err)
		}
	}

	return tx.Commit()
}
// GetMetrics returns the stored metrics matching filter, newest first,
// together with the total number of matching rows and paging information.
//
// Only the non-nil fields of filter are applied: AgentID, PackageType and
// Severity become equality conditions, Limit and Offset become LIMIT and
// OFFSET. A nil filter is treated as an empty one. Total counts every row that
// satisfies the equality conditions, ignoring Limit and Offset.
//
// PerPage is the requested Limit when it is positive and 50 otherwise. Page is
// derived from Offset / Limit + 1 when both are usable and is 1 otherwise, so
// a Limit of 0 can no longer cause a division by zero.
//
// An error is returned when either query fails, a row cannot be scanned, or
// row iteration ends with an error.
func (q *MetricsQueries) GetMetrics(filter *models.MetricFilter) (*models.MetricResult, error) {
	// Treat a missing filter as "no restrictions" instead of dereferencing nil.
	if filter == nil {
		filter = &models.MetricFilter{}
	}

	// Build the WHERE clause once; the row query and the count query share it.
	// Column names are fixed literals from this function, values are always
	// passed as bind parameters.
	where := " WHERE 1=1"
	var args []interface{}
	addCondition := func(column string, value interface{}) {
		args = append(args, value)
		where += fmt.Sprintf(" AND %s = $%d", column, len(args))
	}
	if filter.AgentID != nil {
		addCondition("agent_id", *filter.AgentID)
	}
	if filter.PackageType != nil {
		addCondition("package_type", *filter.PackageType)
	}
	if filter.Severity != nil {
		addCondition("severity", *filter.Severity)
	}

	// Snapshot taken before the paging arguments are appended; its length is
	// fixed here, so later appends to args are not visible through it.
	countArgs := args

	query := `
		SELECT id, agent_id, package_type, package_name, current_version, available_version,
			severity, repository_source, metadata, event_type, created_at
		FROM metrics` + where + " ORDER BY created_at DESC"
	if filter.Limit != nil {
		args = append(args, *filter.Limit)
		query += fmt.Sprintf(" LIMIT $%d", len(args))
	}
	if filter.Offset != nil {
		args = append(args, *filter.Offset)
		query += fmt.Sprintf(" OFFSET $%d", len(args))
	}

	rows, err := q.db.Query(query, args...)
	if err != nil {
		return nil, fmt.Errorf("failed to query metrics: %w", err)
	}
	defer rows.Close()

	var metrics []models.StoredMetric
	for rows.Next() {
		var metric models.StoredMetric
		err := rows.Scan(
			&metric.ID,
			&metric.AgentID,
			&metric.PackageType,
			&metric.PackageName,
			&metric.CurrentVersion,
			&metric.AvailableVersion,
			&metric.Severity,
			&metric.RepositorySource,
			&metric.Metadata,
			&metric.EventType,
			&metric.CreatedAt,
		)
		if err != nil {
			return nil, fmt.Errorf("failed to scan metric: %w", err)
		}
		metrics = append(metrics, metric)
	}
	// Next returning false can also mean the iteration broke off; without this
	// check a truncated result would be returned as a success.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to iterate metrics: %w", err)
	}

	var total int
	err = q.db.QueryRow("SELECT COUNT(*) FROM metrics"+where, countArgs...).Scan(&total)
	if err != nil {
		return nil, fmt.Errorf("failed to count metrics: %w", err)
	}

	// Paging information; only a positive limit is usable as a page size.
	page, perPage := 1, 50
	if filter.Limit != nil && *filter.Limit > 0 {
		perPage = *filter.Limit
		if filter.Offset != nil {
			page = *filter.Offset/perPage + 1
		}
	}

	return &models.MetricResult{
		Metrics: metrics,
		Total:   total,
		Page:    page,
		PerPage: perPage,
	}, nil
}
// GetMetricsByAgentID returns up to limit metrics recorded for agentID,
// ordered from newest to oldest by created_at.
//
// The returned slice is nil when the agent has no metrics. An error is
// returned when the query fails, a row cannot be scanned, or row iteration
// ends with an error.
func (q *MetricsQueries) GetMetricsByAgentID(agentID uuid.UUID, limit int) ([]models.StoredMetric, error) {
	query := `
		SELECT id, agent_id, package_type, package_name, current_version, available_version,
			severity, repository_source, metadata, event_type, created_at
		FROM metrics
		WHERE agent_id = $1
		ORDER BY created_at DESC
		LIMIT $2
	`
	rows, err := q.db.Query(query, agentID, limit)
	if err != nil {
		return nil, fmt.Errorf("failed to query metrics by agent: %w", err)
	}
	defer rows.Close()

	var metrics []models.StoredMetric
	for rows.Next() {
		var metric models.StoredMetric
		err := rows.Scan(
			&metric.ID,
			&metric.AgentID,
			&metric.PackageType,
			&metric.PackageName,
			&metric.CurrentVersion,
			&metric.AvailableVersion,
			&metric.Severity,
			&metric.RepositorySource,
			&metric.Metadata,
			&metric.EventType,
			&metric.CreatedAt,
		)
		if err != nil {
			return nil, fmt.Errorf("failed to scan metric: %w", err)
		}
		metrics = append(metrics, metric)
	}
	// Next returning false can also mean the iteration broke off; without this
	// check a truncated result would be returned as a success.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to iterate metrics by agent: %w", err)
	}

	return metrics, nil
}
// GetLatestMetricsByType looks up the single newest metric (by created_at)
// stored for agentID with the given packageType.
//
// When no such row exists it returns (nil, nil); any other lookup or scan
// failure is returned as a wrapped error.
func (q *MetricsQueries) GetLatestMetricsByType(agentID uuid.UUID, packageType string) (*models.StoredMetric, error) {
	const latestSQL = `
SELECT id, agent_id, package_type, package_name, current_version, available_version,
severity, repository_source, metadata, event_type, created_at
FROM metrics
WHERE agent_id = $1 AND package_type = $2
ORDER BY created_at DESC
LIMIT 1
`
	latest := new(models.StoredMetric)
	row := q.db.QueryRow(latestSQL, agentID, packageType)
	scanErr := row.Scan(
		&latest.ID,
		&latest.AgentID,
		&latest.PackageType,
		&latest.PackageName,
		&latest.CurrentVersion,
		&latest.AvailableVersion,
		&latest.Severity,
		&latest.RepositorySource,
		&latest.Metadata,
		&latest.EventType,
		&latest.CreatedAt,
	)

	switch {
	case scanErr == sql.ErrNoRows:
		// An absent metric is not an error for callers.
		return nil, nil
	case scanErr != nil:
		return nil, fmt.Errorf("failed to get latest metric: %w", scanErr)
	}
	return latest, nil
}
// DeleteOldMetrics removes every metric whose created_at lies more than days
// days before the database's current time, and prints the number of removed
// rows to stdout when at least one row was deleted.
//
// days must not be negative: a negative value would move the cutoff into the
// future and delete every stored metric, so it is rejected with an error
// before any SQL is executed. A value of 0 removes all metrics created before
// the current database time.
//
// An error is also returned when the DELETE fails or the affected-row count
// cannot be read.
func (q *MetricsQueries) DeleteOldMetrics(days int) error {
	if days < 0 {
		return fmt.Errorf("invalid retention period: %d days", days)
	}

	query := `DELETE FROM metrics WHERE created_at < NOW() - INTERVAL '1 day' * $1`
	result, err := q.db.Exec(query, days)
	if err != nil {
		return fmt.Errorf("failed to delete old metrics: %w", err)
	}

	rowsAffected, err := result.RowsAffected()
	if err != nil {
		return fmt.Errorf("failed to get rows affected: %w", err)
	}
	if rowsAffected > 0 {
		fmt.Printf("Deleted %d old metric records\n", rowsAffected)
	}
	return nil
}