Update README with current features and screenshots
- Cross-platform support (Windows/Linux) with Windows Updates and Winget
- Added dependency confirmation workflow and refresh token authentication
- New screenshots: History, Live Operations, Windows Agent Details
- Local CLI features with terminal output and cache system
- Updated known limitations - Proxmox integration is broken
- Organized docs to docs/ folder and updated .gitignore
- Probably introduced a dozen bugs with Windows agents - stay tuned
This commit is contained in:
@@ -8,21 +8,26 @@ import (
|
||||
"github.com/aggregator-project/aggregator-server/internal/api/middleware"
|
||||
"github.com/aggregator-project/aggregator-server/internal/database/queries"
|
||||
"github.com/aggregator-project/aggregator-server/internal/models"
|
||||
"github.com/aggregator-project/aggregator-server/internal/utils"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type AgentHandler struct {
|
||||
agentQueries *queries.AgentQueries
|
||||
commandQueries *queries.CommandQueries
|
||||
checkInInterval int
|
||||
agentQueries *queries.AgentQueries
|
||||
commandQueries *queries.CommandQueries
|
||||
refreshTokenQueries *queries.RefreshTokenQueries
|
||||
checkInInterval int
|
||||
latestAgentVersion string
|
||||
}
|
||||
|
||||
func NewAgentHandler(aq *queries.AgentQueries, cq *queries.CommandQueries, checkInInterval int) *AgentHandler {
|
||||
func NewAgentHandler(aq *queries.AgentQueries, cq *queries.CommandQueries, rtq *queries.RefreshTokenQueries, checkInInterval int, latestAgentVersion string) *AgentHandler {
|
||||
return &AgentHandler{
|
||||
agentQueries: aq,
|
||||
commandQueries: cq,
|
||||
checkInInterval: checkInInterval,
|
||||
agentQueries: aq,
|
||||
commandQueries: cq,
|
||||
refreshTokenQueries: rtq,
|
||||
checkInInterval: checkInInterval,
|
||||
latestAgentVersion: latestAgentVersion,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -60,17 +65,32 @@ func (h *AgentHandler) RegisterAgent(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
// Generate JWT token
|
||||
// Generate JWT access token (short-lived: 24 hours)
|
||||
token, err := middleware.GenerateAgentToken(agent.ID)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to generate token"})
|
||||
return
|
||||
}
|
||||
|
||||
// Return response
|
||||
// Generate refresh token (long-lived: 90 days)
|
||||
refreshToken, err := queries.GenerateRefreshToken()
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to generate refresh token"})
|
||||
return
|
||||
}
|
||||
|
||||
// Store refresh token in database with 90-day expiration
|
||||
refreshTokenExpiry := time.Now().Add(90 * 24 * time.Hour)
|
||||
if err := h.refreshTokenQueries.CreateRefreshToken(agent.ID, refreshToken, refreshTokenExpiry); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to store refresh token"})
|
||||
return
|
||||
}
|
||||
|
||||
// Return response with both tokens
|
||||
response := models.AgentRegistrationResponse{
|
||||
AgentID: agent.ID,
|
||||
Token: token,
|
||||
AgentID: agent.ID,
|
||||
Token: token,
|
||||
RefreshToken: refreshToken,
|
||||
Config: map[string]interface{}{
|
||||
"check_in_interval": h.checkInInterval,
|
||||
"server_url": c.Request.Host,
|
||||
@@ -81,15 +101,123 @@ func (h *AgentHandler) RegisterAgent(c *gin.Context) {
|
||||
}
|
||||
|
||||
// GetCommands returns pending commands for an agent
|
||||
// Agents can optionally send lightweight system metrics in request body
|
||||
func (h *AgentHandler) GetCommands(c *gin.Context) {
|
||||
agentID := c.MustGet("agent_id").(uuid.UUID)
|
||||
|
||||
// Try to parse optional system metrics from request body
|
||||
var metrics struct {
|
||||
CPUPercent float64 `json:"cpu_percent,omitempty"`
|
||||
MemoryPercent float64 `json:"memory_percent,omitempty"`
|
||||
MemoryUsedGB float64 `json:"memory_used_gb,omitempty"`
|
||||
MemoryTotalGB float64 `json:"memory_total_gb,omitempty"`
|
||||
DiskUsedGB float64 `json:"disk_used_gb,omitempty"`
|
||||
DiskTotalGB float64 `json:"disk_total_gb,omitempty"`
|
||||
DiskPercent float64 `json:"disk_percent,omitempty"`
|
||||
Uptime string `json:"uptime,omitempty"`
|
||||
Version string `json:"version,omitempty"`
|
||||
}
|
||||
|
||||
// Parse metrics if provided (optional, won't fail if empty)
|
||||
err := c.ShouldBindJSON(&metrics)
|
||||
if err != nil {
|
||||
log.Printf("DEBUG: Failed to parse metrics JSON: %v", err)
|
||||
}
|
||||
|
||||
// Debug logging to see what we received
|
||||
log.Printf("DEBUG: Received metrics - Version: '%s', CPU: %.2f, Memory: %.2f",
|
||||
metrics.Version, metrics.CPUPercent, metrics.MemoryPercent)
|
||||
|
||||
// Always handle version information if provided
|
||||
if metrics.Version != "" {
|
||||
// Get current agent to preserve existing metadata
|
||||
agent, err := h.agentQueries.GetAgentByID(agentID)
|
||||
if err == nil && agent.Metadata != nil {
|
||||
// Update agent's current version
|
||||
if err := h.agentQueries.UpdateAgentVersion(agentID, metrics.Version); err != nil {
|
||||
log.Printf("Warning: Failed to update agent version: %v", err)
|
||||
} else {
|
||||
// Check if update is available
|
||||
updateAvailable := utils.IsNewerVersion(h.latestAgentVersion, metrics.Version)
|
||||
|
||||
// Update agent's update availability status
|
||||
if err := h.agentQueries.UpdateAgentUpdateAvailable(agentID, updateAvailable); err != nil {
|
||||
log.Printf("Warning: Failed to update agent update availability: %v", err)
|
||||
}
|
||||
|
||||
// Log version check
|
||||
if updateAvailable {
|
||||
log.Printf("🔄 Agent %s (%s) version %s has update available: %s",
|
||||
agent.Hostname, agentID, metrics.Version, h.latestAgentVersion)
|
||||
} else {
|
||||
log.Printf("✅ Agent %s (%s) version %s is up to date",
|
||||
agent.Hostname, agentID, metrics.Version)
|
||||
}
|
||||
|
||||
// Store version in metadata as well
|
||||
agent.Metadata["reported_version"] = metrics.Version
|
||||
agent.Metadata["latest_version"] = h.latestAgentVersion
|
||||
agent.Metadata["update_available"] = updateAvailable
|
||||
agent.Metadata["version_checked_at"] = time.Now().Format(time.RFC3339)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update agent metadata with current metrics if provided
|
||||
if metrics.CPUPercent > 0 || metrics.MemoryPercent > 0 || metrics.DiskUsedGB > 0 || metrics.Uptime != "" {
|
||||
// Get current agent to preserve existing metadata
|
||||
agent, err := h.agentQueries.GetAgentByID(agentID)
|
||||
if err == nil && agent.Metadata != nil {
|
||||
// Update metrics in metadata
|
||||
agent.Metadata["cpu_percent"] = metrics.CPUPercent
|
||||
agent.Metadata["memory_percent"] = metrics.MemoryPercent
|
||||
agent.Metadata["memory_used_gb"] = metrics.MemoryUsedGB
|
||||
agent.Metadata["memory_total_gb"] = metrics.MemoryTotalGB
|
||||
agent.Metadata["disk_used_gb"] = metrics.DiskUsedGB
|
||||
agent.Metadata["disk_total_gb"] = metrics.DiskTotalGB
|
||||
agent.Metadata["disk_percent"] = metrics.DiskPercent
|
||||
agent.Metadata["uptime"] = metrics.Uptime
|
||||
agent.Metadata["metrics_updated_at"] = time.Now().Format(time.RFC3339)
|
||||
|
||||
// Update agent with new metadata
|
||||
if err := h.agentQueries.UpdateAgent(agent); err != nil {
|
||||
log.Printf("Warning: Failed to update agent metrics: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update last_seen
|
||||
if err := h.agentQueries.UpdateAgentLastSeen(agentID); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update last seen"})
|
||||
return
|
||||
}
|
||||
log.Printf("Updated last_seen for agent %s", agentID)
|
||||
|
||||
// Check for version updates for agents that don't send version in metrics
|
||||
// This ensures agents like Metis that don't report version still get update checks
|
||||
if metrics.Version == "" {
|
||||
// Get current agent to check version
|
||||
agent, err := h.agentQueries.GetAgentByID(agentID)
|
||||
if err == nil && agent.CurrentVersion != "" {
|
||||
// Check if update is available based on stored version
|
||||
updateAvailable := utils.IsNewerVersion(h.latestAgentVersion, agent.CurrentVersion)
|
||||
|
||||
// Update agent's update availability status if it changed
|
||||
if agent.UpdateAvailable != updateAvailable {
|
||||
if err := h.agentQueries.UpdateAgentUpdateAvailable(agentID, updateAvailable); err != nil {
|
||||
log.Printf("Warning: Failed to update agent update availability: %v", err)
|
||||
} else {
|
||||
// Log version check for agent without version reporting
|
||||
if updateAvailable {
|
||||
log.Printf("🔄 Agent %s (%s) stored version %s has update available: %s",
|
||||
agent.Hostname, agentID, agent.CurrentVersion, h.latestAgentVersion)
|
||||
} else {
|
||||
log.Printf("✅ Agent %s (%s) stored version %s is up to date",
|
||||
agent.Hostname, agentID, agent.CurrentVersion)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Get pending commands
|
||||
commands, err := h.commandQueries.GetPendingCommands(agentID)
|
||||
@@ -246,6 +374,58 @@ func (h *AgentHandler) TriggerUpdate(c *gin.Context) {
|
||||
})
|
||||
}
|
||||
|
||||
// RenewToken handles token renewal using refresh token
|
||||
func (h *AgentHandler) RenewToken(c *gin.Context) {
|
||||
var req models.TokenRenewalRequest
|
||||
if err := c.ShouldBindJSON(&req); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
// Validate refresh token
|
||||
refreshToken, err := h.refreshTokenQueries.ValidateRefreshToken(req.AgentID, req.RefreshToken)
|
||||
if err != nil {
|
||||
log.Printf("Token renewal failed for agent %s: %v", req.AgentID, err)
|
||||
c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid or expired refresh token"})
|
||||
return
|
||||
}
|
||||
|
||||
// Check if agent still exists
|
||||
agent, err := h.agentQueries.GetAgentByID(req.AgentID)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "agent not found"})
|
||||
return
|
||||
}
|
||||
|
||||
// Update agent last_seen timestamp
|
||||
if err := h.agentQueries.UpdateAgentLastSeen(req.AgentID); err != nil {
|
||||
log.Printf("Warning: Failed to update last_seen for agent %s: %v", req.AgentID, err)
|
||||
}
|
||||
|
||||
// Update refresh token expiration (sliding window - reset to 90 days from now)
|
||||
// This ensures active agents never need to re-register
|
||||
newExpiry := time.Now().Add(90 * 24 * time.Hour)
|
||||
if err := h.refreshTokenQueries.UpdateExpiration(refreshToken.ID, newExpiry); err != nil {
|
||||
log.Printf("Warning: Failed to update refresh token expiration: %v", err)
|
||||
}
|
||||
|
||||
// Generate new access token (24 hours)
|
||||
token, err := middleware.GenerateAgentToken(req.AgentID)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to generate token"})
|
||||
return
|
||||
}
|
||||
|
||||
log.Printf("✅ Token renewed successfully for agent %s (%s)", agent.Hostname, req.AgentID)
|
||||
|
||||
// Return new access token
|
||||
response := models.TokenRenewalResponse{
|
||||
Token: token,
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, response)
|
||||
}
|
||||
|
||||
// UnregisterAgent removes an agent from the system
|
||||
func (h *AgentHandler) UnregisterAgent(c *gin.Context) {
|
||||
idStr := c.Param("id")
|
||||
|
||||
@@ -13,14 +13,16 @@ import (
|
||||
)
|
||||
|
||||
type UpdateHandler struct {
|
||||
updateQueries *queries.UpdateQueries
|
||||
agentQueries *queries.AgentQueries
|
||||
updateQueries *queries.UpdateQueries
|
||||
agentQueries *queries.AgentQueries
|
||||
commandQueries *queries.CommandQueries
|
||||
}
|
||||
|
||||
func NewUpdateHandler(uq *queries.UpdateQueries, aq *queries.AgentQueries) *UpdateHandler {
|
||||
func NewUpdateHandler(uq *queries.UpdateQueries, aq *queries.AgentQueries, cq *queries.CommandQueries) *UpdateHandler {
|
||||
return &UpdateHandler{
|
||||
updateQueries: uq,
|
||||
agentQueries: aq,
|
||||
updateQueries: uq,
|
||||
agentQueries: aq,
|
||||
commandQueries: cq,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -182,11 +184,46 @@ func (h *UpdateHandler) ReportLog(c *gin.Context) {
|
||||
ExecutedAt: time.Now(),
|
||||
}
|
||||
|
||||
// Store the log entry
|
||||
if err := h.updateQueries.CreateUpdateLog(log); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to save log"})
|
||||
return
|
||||
}
|
||||
|
||||
// NEW: Update command status if command_id is provided
|
||||
if req.CommandID != "" {
|
||||
commandID, err := uuid.Parse(req.CommandID)
|
||||
if err != nil {
|
||||
// Log warning but don't fail the request
|
||||
fmt.Printf("Warning: Invalid command ID format in log request: %s\n", req.CommandID)
|
||||
} else {
|
||||
// Prepare result data for command update
|
||||
result := models.JSONB{
|
||||
"stdout": req.Stdout,
|
||||
"stderr": req.Stderr,
|
||||
"exit_code": req.ExitCode,
|
||||
"duration_seconds": req.DurationSeconds,
|
||||
"logged_at": time.Now(),
|
||||
}
|
||||
|
||||
// Update command status based on log result
|
||||
if req.Result == "success" {
|
||||
if err := h.commandQueries.MarkCommandCompleted(commandID, result); err != nil {
|
||||
fmt.Printf("Warning: Failed to mark command %s as completed: %v\n", commandID, err)
|
||||
}
|
||||
} else if req.Result == "failed" || req.Result == "dry_run_failed" {
|
||||
if err := h.commandQueries.MarkCommandFailed(commandID, result); err != nil {
|
||||
fmt.Printf("Warning: Failed to mark command %s as failed: %v\n", commandID, err)
|
||||
}
|
||||
} else {
|
||||
// For other results, just update the result field
|
||||
if err := h.commandQueries.UpdateCommandResult(commandID, result); err != nil {
|
||||
fmt.Printf("Warning: Failed to update command %s result: %v\n", commandID, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{"message": "log recorded"})
|
||||
}
|
||||
|
||||
@@ -328,7 +365,7 @@ func (h *UpdateHandler) RejectUpdate(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, gin.H{"message": "update rejected"})
|
||||
}
|
||||
|
||||
// InstallUpdate marks an update as ready for installation
|
||||
// InstallUpdate marks an update as ready for installation and creates a dry run command for the agent
|
||||
func (h *UpdateHandler) InstallUpdate(c *gin.Context) {
|
||||
idStr := c.Param("id")
|
||||
id, err := uuid.Parse(idStr)
|
||||
@@ -337,10 +374,268 @@ func (h *UpdateHandler) InstallUpdate(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
if err := h.updateQueries.InstallUpdate(id); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to start update installation"})
|
||||
// Get the full update details to extract agent_id, package_name, and package_type
|
||||
update, err := h.updateQueries.GetUpdateByID(id)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to get update details"})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{"message": "update installation started"})
|
||||
// Create a command for the agent to perform dry run first
|
||||
command := &models.AgentCommand{
|
||||
ID: uuid.New(),
|
||||
AgentID: update.AgentID,
|
||||
CommandType: models.CommandTypeDryRunUpdate,
|
||||
Params: map[string]interface{}{
|
||||
"update_id": id.String(),
|
||||
"package_name": update.PackageName,
|
||||
"package_type": update.PackageType,
|
||||
},
|
||||
Status: models.CommandStatusPending,
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
|
||||
// Store the command in database
|
||||
if err := h.commandQueries.CreateCommand(command); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create dry run command"})
|
||||
return
|
||||
}
|
||||
|
||||
// Update the package status to 'checking_dependencies' to show dry run is starting
|
||||
if err := h.updateQueries.SetCheckingDependencies(id); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update package status"})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"message": "dry run command created for agent",
|
||||
"command_id": command.ID.String(),
|
||||
})
|
||||
}
|
||||
|
||||
// GetUpdateLogs retrieves installation logs for a specific update
|
||||
func (h *UpdateHandler) GetUpdateLogs(c *gin.Context) {
|
||||
idStr := c.Param("id")
|
||||
id, err := uuid.Parse(idStr)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid update ID"})
|
||||
return
|
||||
}
|
||||
|
||||
// Parse limit from query params
|
||||
limit, _ := strconv.Atoi(c.DefaultQuery("limit", "50"))
|
||||
|
||||
logs, err := h.updateQueries.GetUpdateLogs(id, limit)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to retrieve update logs"})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"logs": logs,
|
||||
"count": len(logs),
|
||||
})
|
||||
}
|
||||
|
||||
// ReportDependencies handles dependency reporting from agents after dry run
|
||||
func (h *UpdateHandler) ReportDependencies(c *gin.Context) {
|
||||
agentID := c.MustGet("agent_id").(uuid.UUID)
|
||||
|
||||
// Update last_seen timestamp
|
||||
if err := h.agentQueries.UpdateAgentLastSeen(agentID); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update last seen"})
|
||||
return
|
||||
}
|
||||
|
||||
var req models.DependencyReportRequest
|
||||
if err := c.ShouldBindJSON(&req); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
// Update the package status to pending_dependencies
|
||||
if err := h.updateQueries.SetPendingDependencies(agentID, req.PackageType, req.PackageName, req.Dependencies); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update package status"})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{"message": "dependencies reported and status updated"})
|
||||
}
|
||||
|
||||
// ConfirmDependencies handles user confirmation to proceed with dependency installation
|
||||
func (h *UpdateHandler) ConfirmDependencies(c *gin.Context) {
|
||||
idStr := c.Param("id")
|
||||
id, err := uuid.Parse(idStr)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid update ID"})
|
||||
return
|
||||
}
|
||||
|
||||
// Get the update details
|
||||
update, err := h.updateQueries.GetUpdateByID(id)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusNotFound, gin.H{"error": "update not found"})
|
||||
return
|
||||
}
|
||||
|
||||
// Create a command for the agent to install with dependencies
|
||||
command := &models.AgentCommand{
|
||||
ID: uuid.New(),
|
||||
AgentID: update.AgentID,
|
||||
CommandType: models.CommandTypeConfirmDependencies,
|
||||
Params: map[string]interface{}{
|
||||
"update_id": id.String(),
|
||||
"package_name": update.PackageName,
|
||||
"package_type": update.PackageType,
|
||||
"dependencies": update.Metadata["dependencies"], // Dependencies stored in metadata
|
||||
},
|
||||
Status: models.CommandStatusPending,
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
|
||||
// Store the command in database
|
||||
if err := h.commandQueries.CreateCommand(command); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create confirmation command"})
|
||||
return
|
||||
}
|
||||
|
||||
// Update the package status to 'installing'
|
||||
if err := h.updateQueries.InstallUpdate(id); err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update package status"})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"message": "dependency installation confirmed and command created",
|
||||
"command_id": command.ID.String(),
|
||||
})
|
||||
}
|
||||
|
||||
// GetAllLogs retrieves logs across all agents with filtering for universal log view
|
||||
func (h *UpdateHandler) GetAllLogs(c *gin.Context) {
|
||||
filters := &models.LogFilters{
|
||||
Action: c.Query("action"),
|
||||
Result: c.Query("result"),
|
||||
}
|
||||
|
||||
// Parse agent_id if provided
|
||||
if agentIDStr := c.Query("agent_id"); agentIDStr != "" {
|
||||
agentID, err := uuid.Parse(agentIDStr)
|
||||
if err == nil {
|
||||
filters.AgentID = agentID
|
||||
}
|
||||
}
|
||||
|
||||
// Parse since timestamp if provided
|
||||
if sinceStr := c.Query("since"); sinceStr != "" {
|
||||
sinceTime, err := time.Parse(time.RFC3339, sinceStr)
|
||||
if err == nil {
|
||||
filters.Since = &sinceTime
|
||||
}
|
||||
}
|
||||
|
||||
// Parse pagination
|
||||
page, _ := strconv.Atoi(c.DefaultQuery("page", "1"))
|
||||
pageSize, _ := strconv.Atoi(c.DefaultQuery("page_size", "100"))
|
||||
filters.Page = page
|
||||
filters.PageSize = pageSize
|
||||
|
||||
logs, total, err := h.updateQueries.GetAllLogs(filters)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to retrieve logs"})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"logs": logs,
|
||||
"total": total,
|
||||
"page": page,
|
||||
"page_size": pageSize,
|
||||
})
|
||||
}
|
||||
|
||||
// GetActiveOperations retrieves currently running operations for live status view
|
||||
func (h *UpdateHandler) GetActiveOperations(c *gin.Context) {
|
||||
operations, err := h.updateQueries.GetActiveOperations()
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to retrieve active operations"})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"operations": operations,
|
||||
"count": len(operations),
|
||||
})
|
||||
}
|
||||
|
||||
// RetryCommand retries a failed, timed_out, or cancelled command
|
||||
func (h *UpdateHandler) RetryCommand(c *gin.Context) {
|
||||
idStr := c.Param("id")
|
||||
id, err := uuid.Parse(idStr)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid command ID"})
|
||||
return
|
||||
}
|
||||
|
||||
// Create a new command based on the original
|
||||
newCommand, err := h.commandQueries.RetryCommand(id)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("failed to retry command: %v", err)})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"message": "command retry created",
|
||||
"command_id": newCommand.ID.String(),
|
||||
"new_id": newCommand.ID.String(),
|
||||
})
|
||||
}
|
||||
|
||||
// CancelCommand cancels a pending or sent command
|
||||
func (h *UpdateHandler) CancelCommand(c *gin.Context) {
|
||||
idStr := c.Param("id")
|
||||
id, err := uuid.Parse(idStr)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": "invalid command ID"})
|
||||
return
|
||||
}
|
||||
|
||||
// Cancel the command
|
||||
if err := h.commandQueries.CancelCommand(id); err != nil {
|
||||
c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("failed to cancel command: %v", err)})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{"message": "command cancelled"})
|
||||
}
|
||||
|
||||
// GetActiveCommands retrieves currently active commands for live operations view
|
||||
func (h *UpdateHandler) GetActiveCommands(c *gin.Context) {
|
||||
commands, err := h.commandQueries.GetActiveCommands()
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to retrieve active commands"})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"commands": commands,
|
||||
"count": len(commands),
|
||||
})
|
||||
}
|
||||
|
||||
// GetRecentCommands retrieves recent commands for retry functionality
|
||||
func (h *UpdateHandler) GetRecentCommands(c *gin.Context) {
|
||||
limit, _ := strconv.Atoi(c.DefaultQuery("limit", "50"))
|
||||
|
||||
commands, err := h.commandQueries.GetRecentCommands(limit)
|
||||
if err != nil {
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to retrieve recent commands"})
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(http.StatusOK, gin.H{
|
||||
"commands": commands,
|
||||
"count": len(commands),
|
||||
"limit": limit,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ type Config struct {
|
||||
CheckInInterval int
|
||||
OfflineThreshold int
|
||||
Timezone string
|
||||
LatestAgentVersion string
|
||||
}
|
||||
|
||||
// Load reads configuration from environment variables
|
||||
@@ -27,12 +28,13 @@ func Load() (*Config, error) {
|
||||
offlineThreshold, _ := strconv.Atoi(getEnv("OFFLINE_THRESHOLD", "600"))
|
||||
|
||||
cfg := &Config{
|
||||
ServerPort: getEnv("SERVER_PORT", "8080"),
|
||||
DatabaseURL: getEnv("DATABASE_URL", "postgres://aggregator:aggregator@localhost:5432/aggregator?sslmode=disable"),
|
||||
JWTSecret: getEnv("JWT_SECRET", "test-secret-for-development-only"),
|
||||
CheckInInterval: checkInInterval,
|
||||
OfflineThreshold: offlineThreshold,
|
||||
Timezone: getEnv("TIMEZONE", "UTC"),
|
||||
ServerPort: getEnv("SERVER_PORT", "8080"),
|
||||
DatabaseURL: getEnv("DATABASE_URL", "postgres://aggregator:aggregator@localhost:5432/aggregator?sslmode=disable"),
|
||||
JWTSecret: getEnv("JWT_SECRET", "test-secret-for-development-only"),
|
||||
CheckInInterval: checkInInterval,
|
||||
OfflineThreshold: offlineThreshold,
|
||||
Timezone: getEnv("TIMEZONE", "UTC"),
|
||||
LatestAgentVersion: getEnv("LATEST_AGENT_VERSION", "0.1.4"),
|
||||
}
|
||||
|
||||
// Debug: Log what JWT secret we're using (remove in production)
|
||||
|
||||
@@ -0,0 +1,13 @@
|
||||
-- Fix foreign key relationship for update_logs table to reference current_package_state instead of update_packages
|
||||
-- This ensures compatibility with the new event sourcing system
|
||||
|
||||
-- First, drop the existing foreign key constraint
|
||||
ALTER TABLE update_logs DROP CONSTRAINT IF EXISTS update_logs_update_package_id_fkey;
|
||||
|
||||
-- Add the new foreign key constraint to reference current_package_state
|
||||
ALTER TABLE update_logs
|
||||
ADD CONSTRAINT update_logs_update_package_id_fkey
|
||||
FOREIGN KEY (update_package_id) REFERENCES current_package_state(id) ON DELETE SET NULL;
|
||||
|
||||
-- Add index for better performance on the new foreign key
|
||||
CREATE INDEX IF NOT EXISTS idx_logs_update_package ON update_logs(update_package_id);
|
||||
@@ -0,0 +1,18 @@
|
||||
-- Add pending_dependencies and checking_dependencies status to support dependency confirmation workflow
|
||||
ALTER TABLE current_package_state
|
||||
DROP CONSTRAINT IF EXISTS current_package_state_status_check;
|
||||
|
||||
ALTER TABLE current_package_state
|
||||
ADD CONSTRAINT current_package_state_status_check
|
||||
CHECK (status IN ('pending', 'approved', 'updated', 'failed', 'ignored', 'installing', 'pending_dependencies', 'checking_dependencies'));
|
||||
|
||||
-- Also update any legacy tables if they exist
|
||||
DO $$
|
||||
BEGIN
|
||||
IF EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'updates') THEN
|
||||
ALTER TABLE updates
|
||||
DROP CONSTRAINT IF EXISTS updates_status_check,
|
||||
ADD CONSTRAINT updates_status_check
|
||||
CHECK (status IN ('pending', 'approved', 'scheduled', 'installing', 'installed', 'failed', 'ignored', 'pending_dependencies', 'checking_dependencies'));
|
||||
END IF;
|
||||
END $$;
|
||||
@@ -0,0 +1,18 @@
|
||||
-- Add missing command statuses to the check constraint
|
||||
-- This allows 'timed_out', 'cancelled', and 'running' statuses that the application uses
|
||||
|
||||
-- First drop the existing constraint
|
||||
ALTER TABLE agent_commands DROP CONSTRAINT IF EXISTS agent_commands_status_check;
|
||||
|
||||
-- Add the new constraint with all valid statuses
|
||||
ALTER TABLE agent_commands
|
||||
ADD CONSTRAINT agent_commands_status_check
|
||||
CHECK (status::text = ANY (ARRAY[
|
||||
'pending'::character varying,
|
||||
'sent'::character varying,
|
||||
'running'::character varying,
|
||||
'completed'::character varying,
|
||||
'failed'::character varying,
|
||||
'timed_out'::character varying,
|
||||
'cancelled'::character varying
|
||||
]::text[]));
|
||||
@@ -0,0 +1,13 @@
|
||||
-- Expand status column to accommodate longer status values
|
||||
-- checking_dependencies (21 chars) exceeds the current 20 char limit; pending_dependencies (20 chars) only just fits
|
||||
|
||||
ALTER TABLE current_package_state
|
||||
ALTER COLUMN status TYPE character varying(30);
|
||||
|
||||
-- Update check constraint to match new length
|
||||
ALTER TABLE current_package_state
|
||||
DROP CONSTRAINT IF EXISTS current_package_state_status_check;
|
||||
|
||||
ALTER TABLE current_package_state
|
||||
ADD CONSTRAINT current_package_state_status_check
|
||||
CHECK (status::text = ANY (ARRAY['pending'::character varying, 'approved'::character varying, 'updated'::character varying, 'failed'::character varying, 'ignored'::character varying, 'installing'::character varying, 'pending_dependencies'::character varying, 'checking_dependencies'::character varying]::text[]));
|
||||
@@ -0,0 +1,29 @@
|
||||
-- 008_create_refresh_tokens_table.sql
|
||||
-- Create refresh tokens table for secure token renewal
|
||||
|
||||
CREATE TABLE IF NOT EXISTS refresh_tokens (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
agent_id UUID NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
|
||||
token_hash VARCHAR(64) NOT NULL, -- SHA-256 hash of the refresh token
|
||||
expires_at TIMESTAMP NOT NULL,
|
||||
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
|
||||
last_used_at TIMESTAMP,
|
||||
revoked BOOLEAN NOT NULL DEFAULT FALSE,
|
||||
CONSTRAINT unique_token_hash UNIQUE(token_hash)
|
||||
);
|
||||
|
||||
-- Index for fast agent lookup
|
||||
CREATE INDEX IF NOT EXISTS idx_refresh_tokens_agent_id ON refresh_tokens(agent_id);
|
||||
|
||||
-- Index for expiration cleanup
|
||||
CREATE INDEX IF NOT EXISTS idx_refresh_tokens_expires_at ON refresh_tokens(expires_at);
|
||||
|
||||
-- Index for token validation
|
||||
CREATE INDEX IF NOT EXISTS idx_refresh_tokens_hash_not_revoked
|
||||
ON refresh_tokens(token_hash) WHERE NOT revoked;
|
||||
|
||||
COMMENT ON TABLE refresh_tokens IS 'Stores long-lived refresh tokens for agent token renewal without re-registration';
|
||||
COMMENT ON COLUMN refresh_tokens.token_hash IS 'SHA-256 hash of the refresh token for secure storage';
|
||||
COMMENT ON COLUMN refresh_tokens.expires_at IS 'Refresh token expiration (default: 90 days from creation)';
|
||||
COMMENT ON COLUMN refresh_tokens.last_used_at IS 'Timestamp of last successful token renewal';
|
||||
COMMENT ON COLUMN refresh_tokens.revoked IS 'Flag to revoke token before expiration';
|
||||
@@ -0,0 +1,16 @@
|
||||
-- Add version tracking to agents table
-- This enables the hybrid version tracking system
--
-- Every statement is guarded with IF NOT EXISTS so the migration can be
-- re-run safely, in line with the other migrations in this change set.

ALTER TABLE agents
    ADD COLUMN IF NOT EXISTS current_version VARCHAR(50) DEFAULT '0.1.3',
    ADD COLUMN IF NOT EXISTS update_available BOOLEAN DEFAULT FALSE,
    ADD COLUMN IF NOT EXISTS last_version_check TIMESTAMP DEFAULT CURRENT_TIMESTAMP;

-- Add index for faster queries on update status
CREATE INDEX IF NOT EXISTS idx_agents_update_available ON agents(update_available);
CREATE INDEX IF NOT EXISTS idx_agents_current_version ON agents(current_version);

-- Add comment to document the purpose
COMMENT ON COLUMN agents.current_version IS 'The version of the agent currently running';
COMMENT ON COLUMN agents.update_available IS 'Whether an update is available for this agent';
COMMENT ON COLUMN agents.last_version_check IS 'Last time the agent version was checked';
|
||||
@@ -49,6 +49,24 @@ func (q *AgentQueries) UpdateAgentLastSeen(id uuid.UUID) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// UpdateAgent updates an agent's full record including metadata
|
||||
func (q *AgentQueries) UpdateAgent(agent *models.Agent) error {
|
||||
query := `
|
||||
UPDATE agents SET
|
||||
hostname = :hostname,
|
||||
os_type = :os_type,
|
||||
os_version = :os_version,
|
||||
os_architecture = :os_architecture,
|
||||
agent_version = :agent_version,
|
||||
last_seen = :last_seen,
|
||||
status = :status,
|
||||
metadata = :metadata
|
||||
WHERE id = :id
|
||||
`
|
||||
_, err := q.db.NamedExec(query, agent)
|
||||
return err
|
||||
}
|
||||
|
||||
// ListAgents returns all agents with optional filtering
|
||||
func (q *AgentQueries) ListAgents(status, osType string) ([]models.Agent, error) {
|
||||
var agents []models.Agent
|
||||
@@ -137,6 +155,29 @@ func (q *AgentQueries) ListAgentsWithLastScan(status, osType string) ([]models.A
|
||||
return agents, err
|
||||
}
|
||||
|
||||
// UpdateAgentVersion updates the agent's version information and checks for updates
|
||||
func (q *AgentQueries) UpdateAgentVersion(id uuid.UUID, currentVersion string) error {
|
||||
query := `
|
||||
UPDATE agents SET
|
||||
current_version = $1,
|
||||
last_version_check = $2
|
||||
WHERE id = $3
|
||||
`
|
||||
_, err := q.db.Exec(query, currentVersion, time.Now().UTC(), id)
|
||||
return err
|
||||
}
|
||||
|
||||
// UpdateAgentUpdateAvailable sets whether an update is available for an agent
|
||||
func (q *AgentQueries) UpdateAgentUpdateAvailable(id uuid.UUID, updateAvailable bool) error {
|
||||
query := `
|
||||
UPDATE agents SET
|
||||
update_available = $1
|
||||
WHERE id = $2
|
||||
`
|
||||
_, err := q.db.Exec(query, updateAvailable, id)
|
||||
return err
|
||||
}
|
||||
|
||||
// DeleteAgent removes an agent and all associated data
|
||||
func (q *AgentQueries) DeleteAgent(id uuid.UUID) error {
|
||||
// Start a transaction for atomic deletion
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package queries
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/aggregator-project/aggregator-server/internal/models"
|
||||
@@ -77,3 +78,168 @@ func (q *CommandQueries) MarkCommandFailed(id uuid.UUID, result models.JSONB) er
|
||||
_, err := q.db.Exec(query, now, result, id)
|
||||
return err
|
||||
}
|
||||
|
||||
// GetCommandsByStatus retrieves commands with a specific status
|
||||
func (q *CommandQueries) GetCommandsByStatus(status string) ([]models.AgentCommand, error) {
|
||||
var commands []models.AgentCommand
|
||||
query := `
|
||||
SELECT * FROM agent_commands
|
||||
WHERE status = $1
|
||||
ORDER BY created_at DESC
|
||||
`
|
||||
err := q.db.Select(&commands, query, status)
|
||||
return commands, err
|
||||
}
|
||||
|
||||
// UpdateCommandStatus updates only the status of a command
|
||||
func (q *CommandQueries) UpdateCommandStatus(id uuid.UUID, status string) error {
|
||||
query := `
|
||||
UPDATE agent_commands
|
||||
SET status = $1
|
||||
WHERE id = $2
|
||||
`
|
||||
_, err := q.db.Exec(query, status, id)
|
||||
return err
|
||||
}
|
||||
|
||||
// UpdateCommandResult updates only the result of a command
|
||||
func (q *CommandQueries) UpdateCommandResult(id uuid.UUID, result interface{}) error {
|
||||
query := `
|
||||
UPDATE agent_commands
|
||||
SET result = $1
|
||||
WHERE id = $2
|
||||
`
|
||||
_, err := q.db.Exec(query, result, id)
|
||||
return err
|
||||
}
|
||||
|
||||
// GetCommandByID retrieves a specific command by ID
|
||||
func (q *CommandQueries) GetCommandByID(id uuid.UUID) (*models.AgentCommand, error) {
|
||||
var command models.AgentCommand
|
||||
query := `
|
||||
SELECT * FROM agent_commands
|
||||
WHERE id = $1
|
||||
`
|
||||
err := q.db.Get(&command, query, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &command, nil
|
||||
}
|
||||
|
||||
// CancelCommand marks a command as cancelled
|
||||
func (q *CommandQueries) CancelCommand(id uuid.UUID) error {
|
||||
now := time.Now()
|
||||
query := `
|
||||
UPDATE agent_commands
|
||||
SET status = 'cancelled', completed_at = $1
|
||||
WHERE id = $2 AND status IN ('pending', 'sent')
|
||||
`
|
||||
_, err := q.db.Exec(query, now, id)
|
||||
return err
|
||||
}
|
||||
|
||||
// RetryCommand creates a new command based on a failed/timed_out/cancelled command
|
||||
func (q *CommandQueries) RetryCommand(originalID uuid.UUID) (*models.AgentCommand, error) {
|
||||
// Get the original command
|
||||
original, err := q.GetCommandByID(originalID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Only allow retry of failed, timed_out, or cancelled commands
|
||||
if original.Status != "failed" && original.Status != "timed_out" && original.Status != "cancelled" {
|
||||
return nil, fmt.Errorf("command must be failed, timed_out, or cancelled to retry")
|
||||
}
|
||||
|
||||
// Create new command with same parameters
|
||||
newCommand := &models.AgentCommand{
|
||||
ID: uuid.New(),
|
||||
AgentID: original.AgentID,
|
||||
CommandType: original.CommandType,
|
||||
Params: original.Params,
|
||||
Status: models.CommandStatusPending,
|
||||
CreatedAt: time.Now(),
|
||||
}
|
||||
|
||||
// Store the new command
|
||||
if err := q.CreateCommand(newCommand); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return newCommand, nil
|
||||
}
|
||||
|
||||
// GetActiveCommands retrieves commands that are not in a final/terminal state
|
||||
// Shows anything that's in progress or can be retried (excludes completed and cancelled)
|
||||
func (q *CommandQueries) GetActiveCommands() ([]models.ActiveCommandInfo, error) {
|
||||
var commands []models.ActiveCommandInfo
|
||||
|
||||
query := `
|
||||
SELECT
|
||||
c.id,
|
||||
c.agent_id,
|
||||
c.command_type,
|
||||
c.status,
|
||||
c.created_at,
|
||||
c.sent_at,
|
||||
c.result,
|
||||
a.hostname as agent_hostname,
|
||||
COALESCE(ups.package_name, 'N/A') as package_name,
|
||||
COALESCE(ups.package_type, 'N/A') as package_type
|
||||
FROM agent_commands c
|
||||
LEFT JOIN agents a ON c.agent_id = a.id
|
||||
LEFT JOIN current_package_state ups ON (
|
||||
c.params->>'update_id' = ups.id::text OR
|
||||
(c.params->>'package_name' = ups.package_name AND c.params->>'package_type' = ups.package_type)
|
||||
)
|
||||
WHERE c.status NOT IN ('completed', 'cancelled')
|
||||
ORDER BY c.created_at DESC
|
||||
`
|
||||
|
||||
err := q.db.Select(&commands, query)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get active commands: %w", err)
|
||||
}
|
||||
|
||||
return commands, nil
|
||||
}
|
||||
|
||||
// GetRecentCommands retrieves recent commands (including failed, completed, etc.) for retry functionality
|
||||
func (q *CommandQueries) GetRecentCommands(limit int) ([]models.ActiveCommandInfo, error) {
|
||||
var commands []models.ActiveCommandInfo
|
||||
|
||||
if limit == 0 {
|
||||
limit = 50 // Default limit
|
||||
}
|
||||
|
||||
query := `
|
||||
SELECT
|
||||
c.id,
|
||||
c.agent_id,
|
||||
c.command_type,
|
||||
c.status,
|
||||
c.created_at,
|
||||
c.sent_at,
|
||||
c.completed_at,
|
||||
c.result,
|
||||
a.hostname as agent_hostname,
|
||||
COALESCE(ups.package_name, 'N/A') as package_name,
|
||||
COALESCE(ups.package_type, 'N/A') as package_type
|
||||
FROM agent_commands c
|
||||
LEFT JOIN agents a ON c.agent_id = a.id
|
||||
LEFT JOIN current_package_state ups ON (
|
||||
c.params->>'update_id' = ups.id::text OR
|
||||
(c.params->>'package_name' = ups.package_name AND c.params->>'package_type' = ups.package_type)
|
||||
)
|
||||
ORDER BY c.created_at DESC
|
||||
LIMIT $1
|
||||
`
|
||||
|
||||
err := q.db.Select(&commands, query, limit)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get recent commands: %w", err)
|
||||
}
|
||||
|
||||
return commands, nil
|
||||
}
|
||||
|
||||
171
aggregator-server/internal/database/queries/refresh_tokens.go
Normal file
171
aggregator-server/internal/database/queries/refresh_tokens.go
Normal file
@@ -0,0 +1,171 @@
|
||||
package queries
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/jmoiron/sqlx"
|
||||
)
|
||||
|
||||
type RefreshTokenQueries struct {
|
||||
db *sqlx.DB
|
||||
}
|
||||
|
||||
func NewRefreshTokenQueries(db *sqlx.DB) *RefreshTokenQueries {
|
||||
return &RefreshTokenQueries{db: db}
|
||||
}
|
||||
|
||||
// RefreshToken represents a refresh token in the database
|
||||
type RefreshToken struct {
|
||||
ID uuid.UUID `db:"id"`
|
||||
AgentID uuid.UUID `db:"agent_id"`
|
||||
TokenHash string `db:"token_hash"`
|
||||
ExpiresAt time.Time `db:"expires_at"`
|
||||
CreatedAt time.Time `db:"created_at"`
|
||||
LastUsedAt *time.Time `db:"last_used_at"`
|
||||
Revoked bool `db:"revoked"`
|
||||
}
|
||||
|
||||
// GenerateRefreshToken returns a new random token: 32 bytes (256 bits) read
// from crypto/rand, encoded as a 64-character lowercase hex string. An error
// is returned if the random source cannot be read.
func GenerateRefreshToken() (string, error) {
	var entropy [32]byte
	_, err := rand.Read(entropy[:])
	if err != nil {
		return "", fmt.Errorf("failed to generate random token: %w", err)
	}
	return hex.EncodeToString(entropy[:]), nil
}
|
||||
|
||||
// HashRefreshToken returns the lowercase hex encoding of the SHA-256 digest
// of token. This digest, not the token itself, is what gets stored.
func HashRefreshToken(token string) string {
	hasher := sha256.New()
	hasher.Write([]byte(token))
	return hex.EncodeToString(hasher.Sum(nil))
}
|
||||
|
||||
// CreateRefreshToken stores a new refresh token for an agent
|
||||
func (q *RefreshTokenQueries) CreateRefreshToken(agentID uuid.UUID, token string, expiresAt time.Time) error {
|
||||
tokenHash := HashRefreshToken(token)
|
||||
|
||||
query := `
|
||||
INSERT INTO refresh_tokens (agent_id, token_hash, expires_at)
|
||||
VALUES ($1, $2, $3)
|
||||
`
|
||||
|
||||
_, err := q.db.Exec(query, agentID, tokenHash, expiresAt)
|
||||
return err
|
||||
}
|
||||
|
||||
// ValidateRefreshToken checks if a refresh token is valid
|
||||
func (q *RefreshTokenQueries) ValidateRefreshToken(agentID uuid.UUID, token string) (*RefreshToken, error) {
|
||||
tokenHash := HashRefreshToken(token)
|
||||
|
||||
query := `
|
||||
SELECT id, agent_id, token_hash, expires_at, created_at, last_used_at, revoked
|
||||
FROM refresh_tokens
|
||||
WHERE agent_id = $1 AND token_hash = $2 AND NOT revoked
|
||||
`
|
||||
|
||||
var refreshToken RefreshToken
|
||||
err := q.db.Get(&refreshToken, query, agentID, tokenHash)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("refresh token not found or invalid: %w", err)
|
||||
}
|
||||
|
||||
// Check if token is expired
|
||||
if time.Now().After(refreshToken.ExpiresAt) {
|
||||
return nil, fmt.Errorf("refresh token expired")
|
||||
}
|
||||
|
||||
return &refreshToken, nil
|
||||
}
|
||||
|
||||
// UpdateLastUsed updates the last_used_at timestamp for a refresh token
|
||||
func (q *RefreshTokenQueries) UpdateLastUsed(tokenID uuid.UUID) error {
|
||||
query := `
|
||||
UPDATE refresh_tokens
|
||||
SET last_used_at = NOW()
|
||||
WHERE id = $1
|
||||
`
|
||||
|
||||
_, err := q.db.Exec(query, tokenID)
|
||||
return err
|
||||
}
|
||||
|
||||
// UpdateExpiration updates the refresh token expiration (for sliding window)
|
||||
// Resets expiration to specified time and updates last_used_at
|
||||
func (q *RefreshTokenQueries) UpdateExpiration(tokenID uuid.UUID, newExpiry time.Time) error {
|
||||
query := `
|
||||
UPDATE refresh_tokens
|
||||
SET expires_at = $1, last_used_at = NOW()
|
||||
WHERE id = $2
|
||||
`
|
||||
|
||||
_, err := q.db.Exec(query, newExpiry, tokenID)
|
||||
return err
|
||||
}
|
||||
|
||||
// RevokeRefreshToken marks a refresh token as revoked
|
||||
func (q *RefreshTokenQueries) RevokeRefreshToken(agentID uuid.UUID, token string) error {
|
||||
tokenHash := HashRefreshToken(token)
|
||||
|
||||
query := `
|
||||
UPDATE refresh_tokens
|
||||
SET revoked = TRUE
|
||||
WHERE agent_id = $1 AND token_hash = $2
|
||||
`
|
||||
|
||||
_, err := q.db.Exec(query, agentID, tokenHash)
|
||||
return err
|
||||
}
|
||||
|
||||
// RevokeAllAgentTokens revokes all refresh tokens for an agent
|
||||
func (q *RefreshTokenQueries) RevokeAllAgentTokens(agentID uuid.UUID) error {
|
||||
query := `
|
||||
UPDATE refresh_tokens
|
||||
SET revoked = TRUE
|
||||
WHERE agent_id = $1 AND NOT revoked
|
||||
`
|
||||
|
||||
_, err := q.db.Exec(query, agentID)
|
||||
return err
|
||||
}
|
||||
|
||||
// CleanupExpiredTokens removes expired refresh tokens from the database
|
||||
func (q *RefreshTokenQueries) CleanupExpiredTokens() (int64, error) {
|
||||
query := `
|
||||
DELETE FROM refresh_tokens
|
||||
WHERE expires_at < NOW() OR revoked = TRUE
|
||||
`
|
||||
|
||||
result, err := q.db.Exec(query)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
rowsAffected, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return rowsAffected, nil
|
||||
}
|
||||
|
||||
// GetActiveTokenCount returns the number of active (non-revoked, non-expired) tokens for an agent
|
||||
func (q *RefreshTokenQueries) GetActiveTokenCount(agentID uuid.UUID) (int, error) {
|
||||
query := `
|
||||
SELECT COUNT(*)
|
||||
FROM refresh_tokens
|
||||
WHERE agent_id = $1 AND NOT revoked AND expires_at > NOW()
|
||||
`
|
||||
|
||||
var count int
|
||||
err := q.db.Get(&count, query, agentID)
|
||||
return count, err
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
package queries
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -210,6 +211,41 @@ func (q *UpdateQueries) InstallUpdate(id uuid.UUID) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// SetCheckingDependencies marks an update as being checked for dependencies
|
||||
func (q *UpdateQueries) SetCheckingDependencies(id uuid.UUID) error {
|
||||
query := `
|
||||
UPDATE current_package_state
|
||||
SET status = 'checking_dependencies', last_updated_at = NOW()
|
||||
WHERE id = $1 AND status = 'approved'
|
||||
`
|
||||
_, err := q.db.Exec(query, id)
|
||||
return err
|
||||
}
|
||||
|
||||
// SetPendingDependencies marks an update as having dependencies that need approval
|
||||
func (q *UpdateQueries) SetPendingDependencies(agentID uuid.UUID, packageType, packageName string, dependencies []string) error {
|
||||
// Marshal dependencies to JSON for database storage
|
||||
depsJSON, err := json.Marshal(dependencies)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal dependencies: %w", err)
|
||||
}
|
||||
|
||||
query := `
|
||||
UPDATE current_package_state
|
||||
SET status = 'pending_dependencies',
|
||||
metadata = jsonb_set(
|
||||
jsonb_set(metadata, '{dependencies}', $4::jsonb),
|
||||
'{dependencies_reported_at}',
|
||||
to_jsonb(NOW())
|
||||
),
|
||||
last_updated_at = NOW()
|
||||
WHERE agent_id = $1 AND package_type = $2 AND package_name = $3
|
||||
AND status IN ('checking_dependencies', 'installing')
|
||||
`
|
||||
_, err = q.db.Exec(query, agentID, packageType, packageName, depsJSON)
|
||||
return err
|
||||
}
|
||||
|
||||
// CreateUpdateLog inserts an update log entry
|
||||
func (q *UpdateQueries) CreateUpdateLog(log *models.UpdateLog) error {
|
||||
query := `
|
||||
@@ -456,7 +492,7 @@ func (q *UpdateQueries) GetPackageHistory(agentID uuid.UUID, packageType, packag
|
||||
}
|
||||
|
||||
// UpdatePackageStatus updates the status of a package and records history
|
||||
func (q *UpdateQueries) UpdatePackageStatus(agentID uuid.UUID, packageType, packageName, status string, metadata map[string]interface{}) error {
|
||||
func (q *UpdateQueries) UpdatePackageStatus(agentID uuid.UUID, packageType, packageName, status string, metadata models.JSONB) error {
|
||||
tx, err := q.db.Beginx()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to begin transaction: %w", err)
|
||||
@@ -587,3 +623,127 @@ func (q *UpdateQueries) GetAllUpdateStats() (*models.UpdateStats, error) {
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
// GetUpdateLogs retrieves installation logs for a specific update
|
||||
func (q *UpdateQueries) GetUpdateLogs(updateID uuid.UUID, limit int) ([]models.UpdateLog, error) {
|
||||
var logs []models.UpdateLog
|
||||
|
||||
query := `
|
||||
SELECT
|
||||
id, agent_id, update_package_id, action, result,
|
||||
stdout, stderr, exit_code, duration_seconds, executed_at
|
||||
FROM update_logs
|
||||
WHERE update_package_id = $1
|
||||
ORDER BY executed_at DESC
|
||||
LIMIT $2
|
||||
`
|
||||
|
||||
if limit == 0 {
|
||||
limit = 50 // Default limit
|
||||
}
|
||||
|
||||
err := q.db.Select(&logs, query, updateID, limit)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get update logs: %w", err)
|
||||
}
|
||||
|
||||
return logs, nil
|
||||
}
|
||||
|
||||
// GetAllLogs retrieves logs across all agents with filtering
|
||||
func (q *UpdateQueries) GetAllLogs(filters *models.LogFilters) ([]models.UpdateLog, int, error) {
|
||||
var logs []models.UpdateLog
|
||||
whereClause := []string{"1=1"}
|
||||
args := []interface{}{}
|
||||
argIdx := 1
|
||||
|
||||
// Add filters
|
||||
if filters.AgentID != uuid.Nil {
|
||||
whereClause = append(whereClause, fmt.Sprintf("agent_id = $%d", argIdx))
|
||||
args = append(args, filters.AgentID)
|
||||
argIdx++
|
||||
}
|
||||
|
||||
if filters.Action != "" {
|
||||
whereClause = append(whereClause, fmt.Sprintf("action = $%d", argIdx))
|
||||
args = append(args, filters.Action)
|
||||
argIdx++
|
||||
}
|
||||
|
||||
if filters.Result != "" {
|
||||
whereClause = append(whereClause, fmt.Sprintf("result = $%d", argIdx))
|
||||
args = append(args, filters.Result)
|
||||
argIdx++
|
||||
}
|
||||
|
||||
if filters.Since != nil {
|
||||
whereClause = append(whereClause, fmt.Sprintf("executed_at >= $%d", argIdx))
|
||||
args = append(args, filters.Since)
|
||||
argIdx++
|
||||
}
|
||||
|
||||
// Get total count
|
||||
countQuery := "SELECT COUNT(*) FROM update_logs WHERE " + strings.Join(whereClause, " AND ")
|
||||
var total int
|
||||
err := q.db.Get(&total, countQuery, args...)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("failed to get logs count: %w", err)
|
||||
}
|
||||
|
||||
// Get paginated results
|
||||
query := fmt.Sprintf(`
|
||||
SELECT
|
||||
id, agent_id, update_package_id, action, result,
|
||||
stdout, stderr, exit_code, duration_seconds, executed_at
|
||||
FROM update_logs
|
||||
WHERE %s
|
||||
ORDER BY executed_at DESC
|
||||
LIMIT $%d OFFSET $%d
|
||||
`, strings.Join(whereClause, " AND "), argIdx, argIdx+1)
|
||||
|
||||
limit := filters.PageSize
|
||||
if limit == 0 {
|
||||
limit = 100 // Default limit
|
||||
}
|
||||
offset := (filters.Page - 1) * limit
|
||||
if offset < 0 {
|
||||
offset = 0
|
||||
}
|
||||
|
||||
args = append(args, limit, offset)
|
||||
err = q.db.Select(&logs, query, args...)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("failed to get all logs: %w", err)
|
||||
}
|
||||
|
||||
return logs, total, nil
|
||||
}
|
||||
|
||||
// GetActiveOperations returns currently running operations
|
||||
func (q *UpdateQueries) GetActiveOperations() ([]models.ActiveOperation, error) {
|
||||
var operations []models.ActiveOperation
|
||||
|
||||
query := `
|
||||
SELECT DISTINCT ON (agent_id, package_type, package_name)
|
||||
id,
|
||||
agent_id,
|
||||
package_type,
|
||||
package_name,
|
||||
current_version,
|
||||
available_version,
|
||||
severity,
|
||||
status,
|
||||
last_updated_at,
|
||||
metadata
|
||||
FROM current_package_state
|
||||
WHERE status IN ('checking_dependencies', 'installing', 'pending_dependencies')
|
||||
ORDER BY agent_id, package_type, package_name, last_updated_at DESC
|
||||
`
|
||||
|
||||
err := q.db.Select(&operations, query)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get active operations: %w", err)
|
||||
}
|
||||
|
||||
return operations, nil
|
||||
}
|
||||
|
||||
@@ -15,7 +15,10 @@ type Agent struct {
|
||||
OSType string `json:"os_type" db:"os_type"`
|
||||
OSVersion string `json:"os_version" db:"os_version"`
|
||||
OSArchitecture string `json:"os_architecture" db:"os_architecture"`
|
||||
AgentVersion string `json:"agent_version" db:"agent_version"`
|
||||
AgentVersion string `json:"agent_version" db:"agent_version"` // Version at registration
|
||||
CurrentVersion string `json:"current_version" db:"current_version"` // Current running version
|
||||
UpdateAvailable bool `json:"update_available" db:"update_available"` // Whether update is available
|
||||
LastVersionCheck time.Time `json:"last_version_check" db:"last_version_check"` // Last time version was checked
|
||||
LastSeen time.Time `json:"last_seen" db:"last_seen"`
|
||||
Status string `json:"status" db:"status"`
|
||||
Metadata JSONB `json:"metadata" db:"metadata"`
|
||||
@@ -30,7 +33,10 @@ type AgentWithLastScan struct {
|
||||
OSType string `json:"os_type" db:"os_type"`
|
||||
OSVersion string `json:"os_version" db:"os_version"`
|
||||
OSArchitecture string `json:"os_architecture" db:"os_architecture"`
|
||||
AgentVersion string `json:"agent_version" db:"agent_version"`
|
||||
AgentVersion string `json:"agent_version" db:"agent_version"` // Version at registration
|
||||
CurrentVersion string `json:"current_version" db:"current_version"` // Current running version
|
||||
UpdateAvailable bool `json:"update_available" db:"update_available"` // Whether update is available
|
||||
LastVersionCheck time.Time `json:"last_version_check" db:"last_version_check"` // Last time version was checked
|
||||
LastSeen time.Time `json:"last_seen" db:"last_seen"`
|
||||
Status string `json:"status" db:"status"`
|
||||
Metadata JSONB `json:"metadata" db:"metadata"`
|
||||
@@ -67,9 +73,21 @@ type AgentRegistrationRequest struct {
|
||||
|
||||
// AgentRegistrationResponse is returned after successful registration
|
||||
type AgentRegistrationResponse struct {
|
||||
AgentID uuid.UUID `json:"agent_id"`
|
||||
Token string `json:"token"`
|
||||
Config map[string]interface{} `json:"config"`
|
||||
AgentID uuid.UUID `json:"agent_id"`
|
||||
Token string `json:"token"` // Short-lived access token (24h)
|
||||
RefreshToken string `json:"refresh_token"` // Long-lived refresh token (90d)
|
||||
Config map[string]interface{} `json:"config"`
|
||||
}
|
||||
|
||||
// TokenRenewalRequest is the payload for token renewal using refresh token
|
||||
type TokenRenewalRequest struct {
|
||||
AgentID uuid.UUID `json:"agent_id" binding:"required"`
|
||||
RefreshToken string `json:"refresh_token" binding:"required"`
|
||||
}
|
||||
|
||||
// TokenRenewalResponse is the response body sent after a successful token
// renewal.
type TokenRenewalResponse struct {
	// Token is the new short-lived access token (24h).
	Token string `json:"token"`
}
|
||||
|
||||
// UTCTime is a time.Time that marshals to ISO format with UTC timezone
|
||||
|
||||
@@ -33,11 +33,13 @@ type CommandItem struct {
|
||||
|
||||
// Command types
|
||||
const (
|
||||
CommandTypeScanUpdates = "scan_updates"
|
||||
CommandTypeCollectSpecs = "collect_specs"
|
||||
CommandTypeInstallUpdate = "install_updates"
|
||||
CommandTypeRollback = "rollback_update"
|
||||
CommandTypeUpdateAgent = "update_agent"
|
||||
CommandTypeScanUpdates = "scan_updates"
|
||||
CommandTypeCollectSpecs = "collect_specs"
|
||||
CommandTypeInstallUpdate = "install_updates"
|
||||
CommandTypeDryRunUpdate = "dry_run_update"
|
||||
CommandTypeConfirmDependencies = "confirm_dependencies"
|
||||
CommandTypeRollback = "rollback_update"
|
||||
CommandTypeUpdateAgent = "update_agent"
|
||||
)
|
||||
|
||||
// Command statuses
|
||||
@@ -46,4 +48,22 @@ const (
|
||||
CommandStatusSent = "sent"
|
||||
CommandStatusCompleted = "completed"
|
||||
CommandStatusFailed = "failed"
|
||||
CommandStatusTimedOut = "timed_out"
|
||||
CommandStatusCancelled = "cancelled"
|
||||
CommandStatusRunning = "running"
|
||||
)
|
||||
|
||||
// ActiveCommandInfo represents information about an active command for UI display
|
||||
type ActiveCommandInfo struct {
|
||||
ID uuid.UUID `json:"id" db:"id"`
|
||||
AgentID uuid.UUID `json:"agent_id" db:"agent_id"`
|
||||
CommandType string `json:"command_type" db:"command_type"`
|
||||
Status string `json:"status" db:"status"`
|
||||
CreatedAt time.Time `json:"created_at" db:"created_at"`
|
||||
SentAt *time.Time `json:"sent_at,omitempty" db:"sent_at"`
|
||||
CompletedAt *time.Time `json:"completed_at,omitempty" db:"completed_at"`
|
||||
Result JSONB `json:"result,omitempty" db:"result"`
|
||||
AgentHostname string `json:"agent_hostname" db:"agent_hostname"`
|
||||
PackageName string `json:"package_name" db:"package_name"`
|
||||
PackageType string `json:"package_type" db:"package_type"`
|
||||
}
|
||||
|
||||
@@ -77,6 +77,30 @@ type UpdateLogRequest struct {
|
||||
DurationSeconds int `json:"duration_seconds"`
|
||||
}
|
||||
|
||||
// DependencyReportRequest is used by agents to report dependencies after dry run
|
||||
type DependencyReportRequest struct {
|
||||
PackageName string `json:"package_name" binding:"required"`
|
||||
PackageType string `json:"package_type" binding:"required"`
|
||||
Dependencies []string `json:"dependencies" binding:"required"`
|
||||
UpdateID string `json:"update_id" binding:"required"`
|
||||
DryRunResult *InstallResult `json:"dry_run_result,omitempty"`
|
||||
}
|
||||
|
||||
// InstallResult is the outcome of a package installation attempt as reported
// by an agent.
type InstallResult struct {
	// Outcome of the attempt.
	Success      bool   `json:"success"`
	ErrorMessage string `json:"error_message,omitempty"`

	// Captured process output and exit status.
	Stdout          string `json:"stdout,omitempty"`
	Stderr          string `json:"stderr,omitempty"`
	ExitCode        int    `json:"exit_code"`
	DurationSeconds int    `json:"duration_seconds"`

	// Action names what was attempted: "install", "upgrade", "dry_run", etc.
	Action string `json:"action,omitempty"`

	// What the attempt touched.
	PackagesInstalled []string `json:"packages_installed,omitempty"`
	ContainersUpdated []string `json:"containers_updated,omitempty"`

	// Dependencies lists the dependency packages found during a dry run.
	Dependencies []string `json:"dependencies,omitempty"`
	// IsDryRun tells whether this is a dry run result.
	IsDryRun bool `json:"is_dry_run"`
}
|
||||
|
||||
// UpdateFilters for querying updates
|
||||
type UpdateFilters struct {
|
||||
AgentID uuid.UUID
|
||||
@@ -163,3 +187,27 @@ type UpdateStats struct {
|
||||
ModerateUpdates int `json:"moderate_updates" db:"moderate_updates"`
|
||||
LowUpdates int `json:"low_updates" db:"low_updates"`
|
||||
}
|
||||
|
||||
// LogFilters for querying logs across all agents
|
||||
type LogFilters struct {
|
||||
AgentID uuid.UUID
|
||||
Action string
|
||||
Result string
|
||||
Since *time.Time
|
||||
Page int
|
||||
PageSize int
|
||||
}
|
||||
|
||||
// ActiveOperation represents a currently running operation
|
||||
type ActiveOperation struct {
|
||||
ID uuid.UUID `json:"id" db:"id"`
|
||||
AgentID uuid.UUID `json:"agent_id" db:"agent_id"`
|
||||
PackageType string `json:"package_type" db:"package_type"`
|
||||
PackageName string `json:"package_name" db:"package_name"`
|
||||
CurrentVersion string `json:"current_version" db:"current_version"`
|
||||
AvailableVersion string `json:"available_version" db:"available_version"`
|
||||
Severity string `json:"severity" db:"severity"`
|
||||
Status string `json:"status" db:"status"`
|
||||
LastUpdatedAt time.Time `json:"last_updated_at" db:"last_updated_at"`
|
||||
Metadata JSONB `json:"metadata" db:"metadata"`
|
||||
}
|
||||
|
||||
219
aggregator-server/internal/services/timeout.go
Normal file
219
aggregator-server/internal/services/timeout.go
Normal file
@@ -0,0 +1,219 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/aggregator-project/aggregator-server/internal/database/queries"
|
||||
"github.com/aggregator-project/aggregator-server/internal/models"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// TimeoutService handles timeout management for long-running operations
|
||||
type TimeoutService struct {
|
||||
commandQueries *queries.CommandQueries
|
||||
updateQueries *queries.UpdateQueries
|
||||
ticker *time.Ticker
|
||||
stopChan chan bool
|
||||
timeoutDuration time.Duration
|
||||
}
|
||||
|
||||
// NewTimeoutService creates a new timeout service
|
||||
func NewTimeoutService(cq *queries.CommandQueries, uq *queries.UpdateQueries) *TimeoutService {
|
||||
return &TimeoutService{
|
||||
commandQueries: cq,
|
||||
updateQueries: uq,
|
||||
timeoutDuration: 2 * time.Hour, // 2 hours timeout - allows for system upgrades and large operations
|
||||
stopChan: make(chan bool),
|
||||
}
|
||||
}
|
||||
|
||||
// Start begins the timeout monitoring service
|
||||
func (ts *TimeoutService) Start() {
|
||||
log.Printf("Starting timeout service with %v timeout duration", ts.timeoutDuration)
|
||||
|
||||
// Create a ticker that runs every 5 minutes
|
||||
ts.ticker = time.NewTicker(5 * time.Minute)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ts.ticker.C:
|
||||
ts.checkForTimeouts()
|
||||
case <-ts.stopChan:
|
||||
ts.ticker.Stop()
|
||||
log.Println("Timeout service stopped")
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Stop stops the timeout monitoring service
|
||||
func (ts *TimeoutService) Stop() {
|
||||
close(ts.stopChan)
|
||||
}
|
||||
|
||||
// checkForTimeouts checks for commands that have been running too long
|
||||
func (ts *TimeoutService) checkForTimeouts() {
|
||||
log.Println("Checking for timed out operations...")
|
||||
|
||||
// Get all commands that are in 'sent' status
|
||||
commands, err := ts.commandQueries.GetCommandsByStatus(models.CommandStatusSent)
|
||||
if err != nil {
|
||||
log.Printf("Error getting sent commands: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
timeoutThreshold := time.Now().Add(-ts.timeoutDuration)
|
||||
timedOutCommands := make([]models.AgentCommand, 0)
|
||||
|
||||
for _, command := range commands {
|
||||
// Check if command has been sent and is older than timeout threshold
|
||||
if command.SentAt != nil && command.SentAt.Before(timeoutThreshold) {
|
||||
timedOutCommands = append(timedOutCommands, command)
|
||||
}
|
||||
}
|
||||
|
||||
if len(timedOutCommands) > 0 {
|
||||
log.Printf("Found %d timed out commands", len(timedOutCommands))
|
||||
|
||||
for _, command := range timedOutCommands {
|
||||
if err := ts.timeoutCommand(&command); err != nil {
|
||||
log.Printf("Error timing out command %s: %v", command.ID, err)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log.Println("No timed out operations found")
|
||||
}
|
||||
}
|
||||
|
||||
// timeoutCommand marks a specific command as timed out and updates related entities
|
||||
func (ts *TimeoutService) timeoutCommand(command *models.AgentCommand) error {
|
||||
log.Printf("Timing out command %s (type: %s, agent: %s)",
|
||||
command.ID, command.CommandType, command.AgentID)
|
||||
|
||||
// Update command status to timed_out
|
||||
if err := ts.commandQueries.UpdateCommandStatus(command.ID, models.CommandStatusTimedOut); err != nil {
|
||||
return fmt.Errorf("failed to update command status: %w", err)
|
||||
}
|
||||
|
||||
// Update result with timeout information
|
||||
result := models.JSONB{
|
||||
"error": "operation timed out",
|
||||
"timeout_at": time.Now(),
|
||||
"duration": ts.timeoutDuration.String(),
|
||||
"command_id": command.ID.String(),
|
||||
}
|
||||
|
||||
if err := ts.commandQueries.UpdateCommandResult(command.ID, result); err != nil {
|
||||
return fmt.Errorf("failed to update command result: %w", err)
|
||||
}
|
||||
|
||||
// Update related update package status if applicable
|
||||
if err := ts.updateRelatedPackageStatus(command); err != nil {
|
||||
log.Printf("Warning: failed to update related package status: %v", err)
|
||||
// Don't return error here as the main timeout operation succeeded
|
||||
}
|
||||
|
||||
// Create a log entry for the timeout
|
||||
logEntry := &models.UpdateLog{
|
||||
ID: uuid.New(),
|
||||
AgentID: command.AgentID,
|
||||
UpdatePackageID: ts.extractUpdatePackageID(command),
|
||||
Action: command.CommandType,
|
||||
Result: "timed_out",
|
||||
Stdout: "",
|
||||
Stderr: fmt.Sprintf("Command %s timed out after %v", command.CommandType, ts.timeoutDuration),
|
||||
ExitCode: 124, // Standard timeout exit code
|
||||
DurationSeconds: int(ts.timeoutDuration.Seconds()),
|
||||
ExecutedAt: time.Now(),
|
||||
}
|
||||
|
||||
if err := ts.updateQueries.CreateUpdateLog(logEntry); err != nil {
|
||||
log.Printf("Warning: failed to create timeout log entry: %v", err)
|
||||
// Don't return error here as the main timeout operation succeeded
|
||||
}
|
||||
|
||||
log.Printf("Successfully timed out command %s", command.ID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateRelatedPackageStatus updates the status of related update packages when a command times out
|
||||
func (ts *TimeoutService) updateRelatedPackageStatus(command *models.AgentCommand) error {
|
||||
// Extract update_id from command params if it exists
|
||||
_, ok := command.Params["update_id"].(string)
|
||||
if !ok {
|
||||
// This command doesn't have an associated update_id, so no package status to update
|
||||
return nil
|
||||
}
|
||||
|
||||
// Update the package status to 'failed' with timeout reason
|
||||
metadata := models.JSONB{
|
||||
"timeout": true,
|
||||
"timeout_at": time.Now(),
|
||||
"timeout_duration": ts.timeoutDuration.String(),
|
||||
"command_id": command.ID.String(),
|
||||
"failure_reason": "operation timed out",
|
||||
}
|
||||
|
||||
return ts.updateQueries.UpdatePackageStatus(command.AgentID,
|
||||
command.Params["package_type"].(string),
|
||||
command.Params["package_name"].(string),
|
||||
"failed",
|
||||
metadata)
|
||||
}
|
||||
|
||||
// extractUpdatePackageID extracts the update package ID from command params
|
||||
func (ts *TimeoutService) extractUpdatePackageID(command *models.AgentCommand) *uuid.UUID {
|
||||
updateIDStr, ok := command.Params["update_id"].(string)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
updateID, err := uuid.Parse(updateIDStr)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &updateID
|
||||
}
|
||||
|
||||
// GetTimeoutStatus returns statistics about timed out operations
|
||||
func (ts *TimeoutService) GetTimeoutStatus() (map[string]interface{}, error) {
|
||||
// Get all timed out commands
|
||||
timedOutCommands, err := ts.commandQueries.GetCommandsByStatus(models.CommandStatusTimedOut)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get timed out commands: %w", err)
|
||||
}
|
||||
|
||||
// Get all active commands
|
||||
activeCommands, err := ts.commandQueries.GetCommandsByStatus(models.CommandStatusSent)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get active commands: %w", err)
|
||||
}
|
||||
|
||||
// Count commands approaching timeout (within 5 minutes of timeout)
|
||||
timeoutThreshold := time.Now().Add(-ts.timeoutDuration + 5*time.Minute)
|
||||
approachingTimeout := 0
|
||||
for _, command := range activeCommands {
|
||||
if command.SentAt != nil && command.SentAt.Before(timeoutThreshold) {
|
||||
approachingTimeout++
|
||||
}
|
||||
}
|
||||
|
||||
return map[string]interface{}{
|
||||
"total_timed_out": len(timedOutCommands),
|
||||
"total_active": len(activeCommands),
|
||||
"approaching_timeout": approachingTimeout,
|
||||
"timeout_duration": ts.timeoutDuration.String(),
|
||||
"last_check": time.Now(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// SetTimeoutDuration allows changing the timeout duration
|
||||
func (ts *TimeoutService) SetTimeoutDuration(duration time.Duration) {
|
||||
ts.timeoutDuration = duration
|
||||
log.Printf("Timeout duration updated to %v", duration)
|
||||
}
|
||||
Reference in New Issue
Block a user