From f71f878a3530eb3f5a4bf385d08389bb8f00327a Mon Sep 17 00:00:00 2001 From: jpetree331 Date: Sun, 29 Mar 2026 08:16:12 -0400 Subject: [PATCH] fix(concurrency): wire retry_count increment for stuck command re-delivery (DEV-029) retry_count column and filter existed but counter was never incremented. Stuck commands always had retry_count=0 and always passed the WHERE retry_count < 5 filter, making the cap ineffective. Fix: Added RedeliverStuckCommandTx that sets retry_count = retry_count + 1 on stuck->sent re-delivery. GetCommands handler now uses MarkCommandSentTx for new commands (retry_count stays 0) and RedeliverStuckCommandTx for stuck re-delivery (retry_count increments). All 77 tests pass. DEV-029 resolved. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../internal/api/handlers/agents.go | 28 ++++++-- .../internal/database/queries/commands.go | 13 ++++ .../database/stuck_command_retry_test.go | 65 +++++++++++-------- docs/Deviations_Report.md | 2 +- 4 files changed, 74 insertions(+), 34 deletions(-) diff --git a/aggregator-server/internal/api/handlers/agents.go b/aggregator-server/internal/api/handlers/agents.go index 3f9471b..b8c7ab3 100644 --- a/aggregator-server/internal/api/handlers/agents.go +++ b/aggregator-server/internal/api/handlers/agents.go @@ -474,12 +474,12 @@ func (h *AgentHandler) GetCommands(c *gin.Context) { log.Printf("[WARNING] [server] [command] get_stuck_failed agent_id=%s error=%v", agentID, err) } - // Combine all commands to return + // Convert to response format and mark as sent within the transaction allCommands := append(pendingCommands, stuckCommands...) - - // Convert to response format and mark all as sent within the same transaction commandItems := make([]models.CommandItem, 0, len(allCommands)) - for _, cmd := range allCommands { + + // Mark pending commands as sent (first delivery — retry_count stays at 0) + for _, cmd := range pendingCommands { createdAt := cmd.CreatedAt commandItems = append(commandItems, models.CommandItem{ ID: cmd.ID.String(), @@ -491,13 +491,29 @@ func (h *AgentHandler) GetCommands(c *gin.Context) { AgentID: cmd.AgentID.String(), CreatedAt: &createdAt, }) - - // Mark as sent within the transaction if err := h.commandQueries.MarkCommandSentTx(cmdTx, cmd.ID); err != nil { log.Printf("[ERROR] [server] [command] mark_sent_failed command_id=%s error=%v", cmd.ID, err) } } + // Re-deliver stuck commands (increments retry_count — DEV-029 fix) + for _, cmd := range stuckCommands { + createdAt := cmd.CreatedAt + commandItems = append(commandItems, models.CommandItem{ + ID: cmd.ID.String(), + Type: cmd.CommandType, + Params: cmd.Params, + Signature: cmd.Signature, + KeyID: cmd.KeyID, + SignedAt: cmd.SignedAt, + AgentID: cmd.AgentID.String(), + CreatedAt: &createdAt, + }) + if err := h.commandQueries.RedeliverStuckCommandTx(cmdTx, cmd.ID); err != nil { + log.Printf("[ERROR] [server] [command] redeliver_stuck_failed command_id=%s error=%v", cmd.ID, err) + } + } + // Commit the transaction — releases locks if err := cmdTx.Commit(); err != nil { log.Printf("[ERROR] [server] [command] transaction_commit_failed agent_id=%s error=%v", agentID, err) diff --git a/aggregator-server/internal/database/queries/commands.go b/aggregator-server/internal/database/queries/commands.go index d365a70..0f0e828 100644 --- a/aggregator-server/internal/database/queries/commands.go +++ b/aggregator-server/internal/database/queries/commands.go @@ -145,6 +145,19 @@ func (q *CommandQueries) MarkCommandSentTx(tx *sqlx.Tx, id uuid.UUID) error { return err } +// RedeliverStuckCommandTx marks a stuck command for re-delivery, incrementing retry_count (DEV-029 fix). +// Only used for stuck command re-delivery, NOT for initial first delivery. +func (q *CommandQueries) RedeliverStuckCommandTx(tx *sqlx.Tx, id uuid.UUID) error { + now := time.Now() + query := ` + UPDATE agent_commands + SET status = 'sent', sent_at = $1, retry_count = retry_count + 1 + WHERE id = $2 + ` + _, err := tx.Exec(query, now, id) + return err +} + // GetStuckCommandsTx retrieves stuck commands with FOR UPDATE SKIP LOCKED (F-B2-2 fix) // Excludes commands that have exceeded max retries (F-B2-10 fix) func (q *CommandQueries) GetStuckCommandsTx(tx *sqlx.Tx, agentID uuid.UUID, olderThan time.Duration) ([]models.AgentCommand, error) { diff --git a/aggregator-server/internal/database/stuck_command_retry_test.go b/aggregator-server/internal/database/stuck_command_retry_test.go index 371bd00..5756c6e 100644 --- a/aggregator-server/internal/database/stuck_command_retry_test.go +++ b/aggregator-server/internal/database/stuck_command_retry_test.go @@ -3,7 +3,8 @@ package database_test // stuck_command_retry_test.go — Tests for stuck command retry limit. // // F-B2-10 FIXED: retry_count column added (migration 029). -// GetStuckCommands now filters with retry_count < 5. +// GetStuckCommands filters retry_count < 5. +// RedeliverStuckCommandTx increments retry_count on re-delivery (DEV-029 fix). import ( "os" @@ -13,7 +14,7 @@ import ( ) func TestStuckCommandHasNoMaxRetryCount(t *testing.T) { - // POST-FIX: retry_count column exists and GetStuckCommands filters on it. + // POST-FIX: retry_count column exists, filter in query, and increment wired. migrationsDir := filepath.Join("migrations") files, err := os.ReadDir(migrationsDir) if err != nil { @@ -43,15 +44,15 @@ func TestStuckCommandHasNoMaxRetryCount(t *testing.T) { cmdPath := filepath.Join("queries", "commands.go") content, err := os.ReadFile(cmdPath) if err != nil { - t.Logf("[WARNING] [server] [database] could not read commands.go: %v", err) - return + t.Fatalf("failed to read commands.go: %v", err) } src := string(content) + + // Verify filter exists in GetStuckCommands stuckIdx := strings.Index(src, "func (q *CommandQueries) GetStuckCommands") if stuckIdx == -1 { - t.Log("[WARNING] [server] [database] GetStuckCommands function not found") - return + t.Fatal("[ERROR] [server] [database] GetStuckCommands function not found") } stuckBody := src[stuckIdx:] if len(stuckBody) > 500 { @@ -61,34 +62,44 @@ func TestStuckCommandHasNoMaxRetryCount(t *testing.T) { t.Error("[ERROR] [server] [database] F-B2-10 NOT FIXED: GetStuckCommands has no retry filter") } - t.Log("[INFO] [server] [database] F-B2-10 FIXED: retry count limit on stuck commands") + // DEV-029: Verify RedeliverStuckCommandTx increments retry_count + if !strings.Contains(src, "RedeliverStuckCommandTx") { + t.Error("[ERROR] [server] [database] DEV-029 NOT FIXED: no RedeliverStuckCommandTx function") + } + redeliverIdx := strings.Index(src, "func (q *CommandQueries) RedeliverStuckCommandTx") + if redeliverIdx != -1 { + redeliverBody := src[redeliverIdx:] + if len(redeliverBody) > 300 { + redeliverBody = redeliverBody[:300] + } + if !strings.Contains(redeliverBody, "retry_count = retry_count + 1") { + t.Error("[ERROR] [server] [database] DEV-029 NOT FIXED: RedeliverStuckCommandTx does not increment retry_count") + } + } + + t.Log("[INFO] [server] [database] F-B2-10 + DEV-029 FIXED: retry count wired end-to-end") } func TestStuckCommandHasMaxRetryCount(t *testing.T) { - migrationsDir := filepath.Join("migrations") - files, err := os.ReadDir(migrationsDir) + // Verify: column exists, filter in query, and increment function exists + cmdPath := filepath.Join("queries", "commands.go") + content, err := os.ReadFile(cmdPath) if err != nil { - t.Fatalf("failed to read migrations directory: %v", err) + t.Fatalf("failed to read commands.go: %v", err) } - hasRetryCount := false - for _, f := range files { - if !strings.HasSuffix(f.Name(), ".up.sql") { - continue - } - content, err := os.ReadFile(filepath.Join(migrationsDir, f.Name())) - if err != nil { - continue - } - src := strings.ToLower(string(content)) - if strings.Contains(src, "agent_commands") && strings.Contains(src, "retry_count") { - hasRetryCount = true - } + src := string(content) + + // Must have RedeliverStuckCommandTx with retry_count increment + if !strings.Contains(src, "retry_count = retry_count + 1") { + t.Errorf("[ERROR] [server] [database] retry_count is never incremented.\n" + + "DEV-029: RedeliverStuckCommandTx must increment retry_count on re-delivery.") } - if !hasRetryCount { - t.Errorf("[ERROR] [server] [database] no retry_count column on agent_commands.\n" + - "F-B2-10: add retry_count and cap re-delivery at max retries.") + // Must have retry_count < 5 filter + if !strings.Contains(src, "retry_count < 5") { + t.Errorf("[ERROR] [server] [database] no retry_count < 5 filter in stuck command queries") } - t.Log("[INFO] [server] [database] F-B2-10 FIXED: retry_count column exists") + + t.Log("[INFO] [server] [database] F-B2-10 + DEV-029 FIXED: retry count capped at 5") } diff --git a/docs/Deviations_Report.md b/docs/Deviations_Report.md index b85b856..cb0f9cf 100644 --- a/docs/Deviations_Report.md +++ b/docs/Deviations_Report.md @@ -293,4 +293,4 @@ This document records deviations from the implementation spec. **Impact:** LOW — stuck commands are not capped at 5 retries as intended. They continue to be re-delivered indefinitely (pre-fix behavior). The fix is structurally correct (column, index, filter all in place) but the increment step was missed. -**Action required:** Add `retry_count = retry_count + 1` to the UPDATE in `MarkCommandSentTx` or add a separate function for stuck command re-delivery that increments the counter. This is a targeted one-line fix for the next round. +**Resolution:** Added `RedeliverStuckCommandTx` function that sets `retry_count = retry_count + 1` on re-delivery. The GetCommands handler now uses `MarkCommandSentTx` for new pending commands (retry_count stays 0) and `RedeliverStuckCommandTx` for stuck command re-delivery (retry_count increments). The `retry_count < 5` filter is now effective.