All 6 timeout values configurable via DB settings. Fallback to defaults confirmed for fresh installs. Path traversal defense verified (403 + logging). Fixed hardcoded duration references in timeout.go log output. 163 tests pass, no regressions. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
271 lines
9.3 KiB
Go
271 lines
9.3 KiB
Go
package services
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"time"
|
|
|
|
"github.com/Fimeg/RedFlag/aggregator-server/internal/database/queries"
|
|
"github.com/Fimeg/RedFlag/aggregator-server/internal/models"
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
// TimeoutService handles timeout management for long-running operations.
//
// It periodically loads agent commands in the 'pending' (queued) and 'sent'
// (delivered) states and marks those older than the configured threshold as
// timed out. Build one with NewTimeoutService, then call Start and Stop.
//
// NOTE(review): the duration fields are written by the Set* methods without any
// synchronization while the monitor goroutine started by Start reads them.
// Confirm callers only change them before Start, or accept the data race.
type TimeoutService struct {
	// commandQueries lists commands by status and records status/result changes.
	commandQueries *queries.CommandQueries
	// updateQueries updates related package status and writes update-log entries.
	updateQueries *queries.UpdateQueries
	// ticker drives the periodic sweep; it is created by Start and stopped by the
	// monitor goroutine once stopChan is closed.
	ticker *time.Ticker
	// stopChan is closed by Stop to tell the monitor goroutine to exit.
	stopChan chan bool
	// sentTimeout is the age limit for commands already sent to agents.
	sentTimeout time.Duration
	// pendingTimeout is the age limit for commands still waiting in the queue.
	pendingTimeout time.Duration
	// checkInterval is how often the monitor goroutine runs a sweep.
	checkInterval time.Duration
}
|
|
|
|
// NewTimeoutService creates a new timeout service with configurable durations.
|
|
// Pass zero values to use defaults (2h sent, 30m pending, 5m check interval).
|
|
func NewTimeoutService(cq *queries.CommandQueries, uq *queries.UpdateQueries, sentTimeout, pendingTimeout, checkInterval time.Duration) *TimeoutService {
|
|
if sentTimeout <= 0 {
|
|
sentTimeout = 2 * time.Hour
|
|
}
|
|
if pendingTimeout <= 0 {
|
|
pendingTimeout = 30 * time.Minute
|
|
}
|
|
if checkInterval <= 0 {
|
|
checkInterval = 5 * time.Minute
|
|
}
|
|
return &TimeoutService{
|
|
commandQueries: cq,
|
|
updateQueries: uq,
|
|
sentTimeout: sentTimeout,
|
|
pendingTimeout: pendingTimeout,
|
|
checkInterval: checkInterval,
|
|
stopChan: make(chan bool),
|
|
}
|
|
}
|
|
|
|
// Start begins the timeout monitoring service
|
|
func (ts *TimeoutService) Start() {
|
|
log.Printf("[INFO] [server] [timeout] service_started sent_timeout=%v pending_timeout=%v check_interval=%v", ts.sentTimeout, ts.pendingTimeout, ts.checkInterval)
|
|
|
|
ts.ticker = time.NewTicker(ts.checkInterval)
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ts.ticker.C:
|
|
ts.checkForTimeouts()
|
|
case <-ts.stopChan:
|
|
ts.ticker.Stop()
|
|
log.Println("Timeout service stopped")
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Stop stops the timeout monitoring service
|
|
func (ts *TimeoutService) Stop() {
|
|
close(ts.stopChan)
|
|
}
|
|
|
|
// checkForTimeouts checks for commands that have been running too long
|
|
func (ts *TimeoutService) checkForTimeouts() {
|
|
log.Println("Checking for timed out operations...")
|
|
|
|
sentTimeoutThreshold := time.Now().Add(-ts.sentTimeout)
|
|
pendingTimeoutThreshold := time.Now().Add(-ts.pendingTimeout)
|
|
timedOutCommands := make([]models.AgentCommand, 0)
|
|
|
|
// Check 'sent' commands (configurable, default 2 hours)
|
|
sentCommands, err := ts.commandQueries.GetCommandsByStatus(models.CommandStatusSent)
|
|
if err != nil {
|
|
log.Printf("Error getting sent commands: %v", err)
|
|
} else {
|
|
for _, command := range sentCommands {
|
|
// Check if command has been sent and is older than sent timeout threshold
|
|
if command.SentAt != nil && command.SentAt.Before(sentTimeoutThreshold) {
|
|
timedOutCommands = append(timedOutCommands, command)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Check 'pending' commands (configurable, default 30 minutes)
|
|
pendingCommands, err := ts.commandQueries.GetCommandsByStatus(models.CommandStatusPending)
|
|
if err != nil {
|
|
log.Printf("Error getting pending commands: %v", err)
|
|
} else {
|
|
for _, command := range pendingCommands {
|
|
// Check if command has been pending longer than pending timeout threshold
|
|
if command.CreatedAt.Before(pendingTimeoutThreshold) {
|
|
timedOutCommands = append(timedOutCommands, command)
|
|
log.Printf("Found stuck pending command %s (type: %s, created: %s, age: %v)",
|
|
command.ID, command.CommandType, command.CreatedAt.Format(time.RFC3339), time.Since(command.CreatedAt))
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(timedOutCommands) > 0 {
|
|
log.Printf("[INFO] [server] [timeout] timed_out_commands=%d sent_checked=%d pending_checked=%d sent_timeout=%v pending_timeout=%v",
|
|
len(timedOutCommands), len(sentCommands), len(pendingCommands), ts.sentTimeout, ts.pendingTimeout)
|
|
|
|
for _, command := range timedOutCommands {
|
|
if err := ts.timeoutCommand(&command); err != nil {
|
|
log.Printf("Error timing out command %s: %v", command.ID, err)
|
|
}
|
|
}
|
|
} else {
|
|
log.Println("No timed out operations found")
|
|
}
|
|
}
|
|
|
|
// timeoutCommand marks a specific command as timed out and updates related entities
|
|
func (ts *TimeoutService) timeoutCommand(command *models.AgentCommand) error {
|
|
// Determine which timeout duration was applied
|
|
var appliedTimeout time.Duration
|
|
if command.Status == models.CommandStatusSent {
|
|
appliedTimeout = ts.sentTimeout
|
|
} else {
|
|
appliedTimeout = ts.pendingTimeout
|
|
}
|
|
|
|
log.Printf("Timing out command %s (type: %s, agent: %s)",
|
|
command.ID, command.CommandType, command.AgentID)
|
|
|
|
// Update command status to timed_out
|
|
if err := ts.commandQueries.UpdateCommandStatus(command.ID, models.CommandStatusTimedOut); err != nil {
|
|
return fmt.Errorf("failed to update command status: %w", err)
|
|
}
|
|
|
|
// Update result with timeout information
|
|
result := models.JSONB{
|
|
"error": "operation timed out",
|
|
"timeout_at": time.Now(),
|
|
"duration": appliedTimeout.String(),
|
|
"command_id": command.ID.String(),
|
|
}
|
|
|
|
if err := ts.commandQueries.UpdateCommandResult(command.ID, result); err != nil {
|
|
return fmt.Errorf("failed to update command result: %w", err)
|
|
}
|
|
|
|
// Update related update package status if applicable
|
|
if err := ts.updateRelatedPackageStatus(command, appliedTimeout); err != nil {
|
|
log.Printf("Warning: failed to update related package status: %v", err)
|
|
// Don't return error here as the main timeout operation succeeded
|
|
}
|
|
|
|
// Create a log entry for the timeout
|
|
logEntry := &models.UpdateLog{
|
|
ID: uuid.New(),
|
|
AgentID: command.AgentID,
|
|
UpdatePackageID: ts.extractUpdatePackageID(command),
|
|
Action: command.CommandType,
|
|
Result: "failed", // Use 'failed' to comply with database constraint
|
|
Stdout: "",
|
|
Stderr: fmt.Sprintf("Command %s timed out after %v (timeout_id: %s)", command.CommandType, appliedTimeout, command.ID),
|
|
ExitCode: 124, // Standard timeout exit code
|
|
DurationSeconds: int(appliedTimeout.Seconds()),
|
|
ExecutedAt: time.Now(),
|
|
}
|
|
|
|
if err := ts.updateQueries.CreateUpdateLog(logEntry); err != nil {
|
|
log.Printf("Warning: failed to create timeout log entry: %v", err)
|
|
// Don't return error here as the main timeout operation succeeded
|
|
}
|
|
|
|
log.Printf("Successfully timed out command %s", command.ID)
|
|
return nil
|
|
}
|
|
|
|
// updateRelatedPackageStatus updates the status of related update packages when a command times out
|
|
func (ts *TimeoutService) updateRelatedPackageStatus(command *models.AgentCommand, appliedTimeout time.Duration) error {
|
|
// Extract update_id from command params if it exists
|
|
_, ok := command.Params["update_id"].(string)
|
|
if !ok {
|
|
// This command doesn't have an associated update_id, so no package status to update
|
|
return nil
|
|
}
|
|
|
|
// Update the package status to 'failed' with timeout reason
|
|
metadata := models.JSONB{
|
|
"timeout": true,
|
|
"timeout_at": time.Now(),
|
|
"timeout_duration": appliedTimeout.String(),
|
|
"command_id": command.ID.String(),
|
|
"failure_reason": "operation timed out",
|
|
}
|
|
|
|
return ts.updateQueries.UpdatePackageStatus(command.AgentID,
|
|
command.Params["package_type"].(string),
|
|
command.Params["package_name"].(string),
|
|
"failed",
|
|
metadata,
|
|
nil) // nil = use time.Now() for timeout operations
|
|
}
|
|
|
|
// extractUpdatePackageID extracts the update package ID from command params
|
|
func (ts *TimeoutService) extractUpdatePackageID(command *models.AgentCommand) *uuid.UUID {
|
|
updateIDStr, ok := command.Params["update_id"].(string)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
updateID, err := uuid.Parse(updateIDStr)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
return &updateID
|
|
}
|
|
|
|
// GetTimeoutStatus returns statistics about timed out operations
|
|
func (ts *TimeoutService) GetTimeoutStatus() (map[string]interface{}, error) {
|
|
// Get all timed out commands
|
|
timedOutCommands, err := ts.commandQueries.GetCommandsByStatus(models.CommandStatusTimedOut)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get timed out commands: %w", err)
|
|
}
|
|
|
|
// Get all active commands
|
|
activeCommands, err := ts.commandQueries.GetCommandsByStatus(models.CommandStatusSent)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get active commands: %w", err)
|
|
}
|
|
|
|
// Count commands approaching timeout (within 5 minutes of timeout)
|
|
timeoutThreshold := time.Now().Add(-ts.sentTimeout + 5*time.Minute)
|
|
approachingTimeout := 0
|
|
for _, command := range activeCommands {
|
|
if command.SentAt != nil && command.SentAt.Before(timeoutThreshold) {
|
|
approachingTimeout++
|
|
}
|
|
}
|
|
|
|
return map[string]interface{}{
|
|
"total_timed_out": len(timedOutCommands),
|
|
"total_active": len(activeCommands),
|
|
"approaching_timeout": approachingTimeout,
|
|
"sent_timeout_duration": ts.sentTimeout.String(),
|
|
"pending_timeout_duration": ts.pendingTimeout.String(),
|
|
"last_check": time.Now(),
|
|
}, nil
|
|
}
|
|
|
|
// SetTimeoutDuration allows changing the timeout duration for sent commands
|
|
// TODO: This should be deprecated in favor of SetSentTimeout and SetPendingTimeout
|
|
func (ts *TimeoutService) SetTimeoutDuration(duration time.Duration) {
|
|
ts.sentTimeout = duration
|
|
log.Printf("Sent timeout duration updated to %v", duration)
|
|
}
|
|
|
|
// SetSentTimeout allows changing the timeout duration for sent commands
|
|
func (ts *TimeoutService) SetSentTimeout(duration time.Duration) {
|
|
ts.sentTimeout = duration
|
|
log.Printf("Sent timeout duration updated to %v", duration)
|
|
}
|
|
|
|
// SetPendingTimeout allows changing the timeout duration for pending commands
|
|
func (ts *TimeoutService) SetPendingTimeout(duration time.Duration) {
|
|
ts.pendingTimeout = duration
|
|
log.Printf("Pending timeout duration updated to %v", duration)
|
|
} |