fix(concurrency): wire retry_count increment for stuck command re-delivery (DEV-029)
The retry_count column and its filter already existed, but the counter was never incremented. Stuck commands therefore always had retry_count=0 and always passed the WHERE retry_count < 5 filter, making the cap ineffective. Fix: added RedeliverStuckCommandTx, which sets retry_count = retry_count + 1 on stuck->sent re-delivery. The GetCommands handler now uses MarkCommandSentTx for new commands (retry_count stays 0) and RedeliverStuckCommandTx for stuck re-delivery (retry_count increments). All 77 tests pass. DEV-029 resolved. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -474,12 +474,12 @@ func (h *AgentHandler) GetCommands(c *gin.Context) {
|
|||||||
log.Printf("[WARNING] [server] [command] get_stuck_failed agent_id=%s error=%v", agentID, err)
|
log.Printf("[WARNING] [server] [command] get_stuck_failed agent_id=%s error=%v", agentID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Combine all commands to return
|
// Convert to response format and mark as sent within the transaction
|
||||||
allCommands := append(pendingCommands, stuckCommands...)
|
allCommands := append(pendingCommands, stuckCommands...)
|
||||||
|
|
||||||
// Convert to response format and mark all as sent within the same transaction
|
|
||||||
commandItems := make([]models.CommandItem, 0, len(allCommands))
|
commandItems := make([]models.CommandItem, 0, len(allCommands))
|
||||||
for _, cmd := range allCommands {
|
|
||||||
|
// Mark pending commands as sent (first delivery — retry_count stays at 0)
|
||||||
|
for _, cmd := range pendingCommands {
|
||||||
createdAt := cmd.CreatedAt
|
createdAt := cmd.CreatedAt
|
||||||
commandItems = append(commandItems, models.CommandItem{
|
commandItems = append(commandItems, models.CommandItem{
|
||||||
ID: cmd.ID.String(),
|
ID: cmd.ID.String(),
|
||||||
@@ -491,13 +491,29 @@ func (h *AgentHandler) GetCommands(c *gin.Context) {
|
|||||||
AgentID: cmd.AgentID.String(),
|
AgentID: cmd.AgentID.String(),
|
||||||
CreatedAt: &createdAt,
|
CreatedAt: &createdAt,
|
||||||
})
|
})
|
||||||
|
|
||||||
// Mark as sent within the transaction
|
|
||||||
if err := h.commandQueries.MarkCommandSentTx(cmdTx, cmd.ID); err != nil {
|
if err := h.commandQueries.MarkCommandSentTx(cmdTx, cmd.ID); err != nil {
|
||||||
log.Printf("[ERROR] [server] [command] mark_sent_failed command_id=%s error=%v", cmd.ID, err)
|
log.Printf("[ERROR] [server] [command] mark_sent_failed command_id=%s error=%v", cmd.ID, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Re-deliver stuck commands (increments retry_count — DEV-029 fix)
|
||||||
|
for _, cmd := range stuckCommands {
|
||||||
|
createdAt := cmd.CreatedAt
|
||||||
|
commandItems = append(commandItems, models.CommandItem{
|
||||||
|
ID: cmd.ID.String(),
|
||||||
|
Type: cmd.CommandType,
|
||||||
|
Params: cmd.Params,
|
||||||
|
Signature: cmd.Signature,
|
||||||
|
KeyID: cmd.KeyID,
|
||||||
|
SignedAt: cmd.SignedAt,
|
||||||
|
AgentID: cmd.AgentID.String(),
|
||||||
|
CreatedAt: &createdAt,
|
||||||
|
})
|
||||||
|
if err := h.commandQueries.RedeliverStuckCommandTx(cmdTx, cmd.ID); err != nil {
|
||||||
|
log.Printf("[ERROR] [server] [command] redeliver_stuck_failed command_id=%s error=%v", cmd.ID, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Commit the transaction — releases locks
|
// Commit the transaction — releases locks
|
||||||
if err := cmdTx.Commit(); err != nil {
|
if err := cmdTx.Commit(); err != nil {
|
||||||
log.Printf("[ERROR] [server] [command] transaction_commit_failed agent_id=%s error=%v", agentID, err)
|
log.Printf("[ERROR] [server] [command] transaction_commit_failed agent_id=%s error=%v", agentID, err)
|
||||||
|
|||||||
@@ -145,6 +145,19 @@ func (q *CommandQueries) MarkCommandSentTx(tx *sqlx.Tx, id uuid.UUID) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RedeliverStuckCommandTx re-marks a stuck command as sent (status = 'sent', sent_at = now) and increments its retry_count (DEV-029 fix).
|
||||||
|
// Only used for stuck command re-delivery, NOT for initial first delivery.
|
||||||
|
func (q *CommandQueries) RedeliverStuckCommandTx(tx *sqlx.Tx, id uuid.UUID) error {
|
||||||
|
now := time.Now()
|
||||||
|
query := `
|
||||||
|
UPDATE agent_commands
|
||||||
|
SET status = 'sent', sent_at = $1, retry_count = retry_count + 1
|
||||||
|
WHERE id = $2
|
||||||
|
`
|
||||||
|
_, err := tx.Exec(query, now, id)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// GetStuckCommandsTx retrieves stuck commands with FOR UPDATE SKIP LOCKED (F-B2-2 fix)
|
// GetStuckCommandsTx retrieves stuck commands with FOR UPDATE SKIP LOCKED (F-B2-2 fix)
|
||||||
// Excludes commands that have exceeded max retries (F-B2-10 fix)
|
// Excludes commands that have exceeded max retries (F-B2-10 fix)
|
||||||
func (q *CommandQueries) GetStuckCommandsTx(tx *sqlx.Tx, agentID uuid.UUID, olderThan time.Duration) ([]models.AgentCommand, error) {
|
func (q *CommandQueries) GetStuckCommandsTx(tx *sqlx.Tx, agentID uuid.UUID, olderThan time.Duration) ([]models.AgentCommand, error) {
|
||||||
|
|||||||
@@ -3,7 +3,8 @@ package database_test
|
|||||||
// stuck_command_retry_test.go — Tests for stuck command retry limit.
|
// stuck_command_retry_test.go — Tests for stuck command retry limit.
|
||||||
//
|
//
|
||||||
// F-B2-10 FIXED: retry_count column added (migration 029).
|
// F-B2-10 FIXED: retry_count column added (migration 029).
|
||||||
// GetStuckCommands now filters with retry_count < 5.
|
// GetStuckCommands filters retry_count < 5.
|
||||||
|
// RedeliverStuckCommandTx increments retry_count on re-delivery (DEV-029 fix).
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
@@ -13,7 +14,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestStuckCommandHasNoMaxRetryCount(t *testing.T) {
|
func TestStuckCommandHasNoMaxRetryCount(t *testing.T) {
|
||||||
// POST-FIX: retry_count column exists and GetStuckCommands filters on it.
|
// POST-FIX: retry_count column exists, filter in query, and increment wired.
|
||||||
migrationsDir := filepath.Join("migrations")
|
migrationsDir := filepath.Join("migrations")
|
||||||
files, err := os.ReadDir(migrationsDir)
|
files, err := os.ReadDir(migrationsDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -43,15 +44,15 @@ func TestStuckCommandHasNoMaxRetryCount(t *testing.T) {
|
|||||||
cmdPath := filepath.Join("queries", "commands.go")
|
cmdPath := filepath.Join("queries", "commands.go")
|
||||||
content, err := os.ReadFile(cmdPath)
|
content, err := os.ReadFile(cmdPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Logf("[WARNING] [server] [database] could not read commands.go: %v", err)
|
t.Fatalf("failed to read commands.go: %v", err)
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
src := string(content)
|
src := string(content)
|
||||||
|
|
||||||
|
// Verify filter exists in GetStuckCommands
|
||||||
stuckIdx := strings.Index(src, "func (q *CommandQueries) GetStuckCommands")
|
stuckIdx := strings.Index(src, "func (q *CommandQueries) GetStuckCommands")
|
||||||
if stuckIdx == -1 {
|
if stuckIdx == -1 {
|
||||||
t.Log("[WARNING] [server] [database] GetStuckCommands function not found")
|
t.Fatal("[ERROR] [server] [database] GetStuckCommands function not found")
|
||||||
return
|
|
||||||
}
|
}
|
||||||
stuckBody := src[stuckIdx:]
|
stuckBody := src[stuckIdx:]
|
||||||
if len(stuckBody) > 500 {
|
if len(stuckBody) > 500 {
|
||||||
@@ -61,34 +62,44 @@ func TestStuckCommandHasNoMaxRetryCount(t *testing.T) {
|
|||||||
t.Error("[ERROR] [server] [database] F-B2-10 NOT FIXED: GetStuckCommands has no retry filter")
|
t.Error("[ERROR] [server] [database] F-B2-10 NOT FIXED: GetStuckCommands has no retry filter")
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Log("[INFO] [server] [database] F-B2-10 FIXED: retry count limit on stuck commands")
|
// DEV-029: Verify RedeliverStuckCommandTx increments retry_count
|
||||||
|
if !strings.Contains(src, "RedeliverStuckCommandTx") {
|
||||||
|
t.Error("[ERROR] [server] [database] DEV-029 NOT FIXED: no RedeliverStuckCommandTx function")
|
||||||
|
}
|
||||||
|
redeliverIdx := strings.Index(src, "func (q *CommandQueries) RedeliverStuckCommandTx")
|
||||||
|
if redeliverIdx != -1 {
|
||||||
|
redeliverBody := src[redeliverIdx:]
|
||||||
|
if len(redeliverBody) > 300 {
|
||||||
|
redeliverBody = redeliverBody[:300]
|
||||||
|
}
|
||||||
|
if !strings.Contains(redeliverBody, "retry_count = retry_count + 1") {
|
||||||
|
t.Error("[ERROR] [server] [database] DEV-029 NOT FIXED: RedeliverStuckCommandTx does not increment retry_count")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Log("[INFO] [server] [database] F-B2-10 + DEV-029 FIXED: retry count wired end-to-end")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStuckCommandHasMaxRetryCount(t *testing.T) {
|
func TestStuckCommandHasMaxRetryCount(t *testing.T) {
|
||||||
migrationsDir := filepath.Join("migrations")
|
// Verify: commands.go contains both the retry_count increment and the retry_count < 5 filter
|
||||||
files, err := os.ReadDir(migrationsDir)
|
cmdPath := filepath.Join("queries", "commands.go")
|
||||||
|
content, err := os.ReadFile(cmdPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to read migrations directory: %v", err)
|
t.Fatalf("failed to read commands.go: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
hasRetryCount := false
|
src := string(content)
|
||||||
for _, f := range files {
|
|
||||||
if !strings.HasSuffix(f.Name(), ".up.sql") {
|
// Must have RedeliverStuckCommandTx with retry_count increment
|
||||||
continue
|
if !strings.Contains(src, "retry_count = retry_count + 1") {
|
||||||
}
|
t.Errorf("[ERROR] [server] [database] retry_count is never incremented.\n" +
|
||||||
content, err := os.ReadFile(filepath.Join(migrationsDir, f.Name()))
|
"DEV-029: RedeliverStuckCommandTx must increment retry_count on re-delivery.")
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
src := strings.ToLower(string(content))
|
|
||||||
if strings.Contains(src, "agent_commands") && strings.Contains(src, "retry_count") {
|
|
||||||
hasRetryCount = true
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if !hasRetryCount {
|
// Must have retry_count < 5 filter
|
||||||
t.Errorf("[ERROR] [server] [database] no retry_count column on agent_commands.\n" +
|
if !strings.Contains(src, "retry_count < 5") {
|
||||||
"F-B2-10: add retry_count and cap re-delivery at max retries.")
|
t.Errorf("[ERROR] [server] [database] no retry_count < 5 filter in stuck command queries")
|
||||||
}
|
}
|
||||||
t.Log("[INFO] [server] [database] F-B2-10 FIXED: retry_count column exists")
|
|
||||||
|
t.Log("[INFO] [server] [database] F-B2-10 + DEV-029 FIXED: retry count capped at 5")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -293,4 +293,4 @@ This document records deviations from the implementation spec.
|
|||||||
|
|
||||||
**Impact:** LOW — stuck commands are not capped at 5 retries as intended. They continue to be re-delivered indefinitely (pre-fix behavior). The fix is structurally correct (column, index, filter all in place) but the increment step was missed.
|
**Impact:** LOW — stuck commands are not capped at 5 retries as intended. They continue to be re-delivered indefinitely (pre-fix behavior). The fix is structurally correct (column, index, filter all in place) but the increment step was missed.
|
||||||
|
|
||||||
**Action required:** Add `retry_count = retry_count + 1` to the UPDATE in `MarkCommandSentTx` or add a separate function for stuck command re-delivery that increments the counter. This is a targeted one-line fix for the next round.
|
**Resolution:** Added `RedeliverStuckCommandTx` function that sets `retry_count = retry_count + 1` on re-delivery. The GetCommands handler now uses `MarkCommandSentTx` for new pending commands (retry_count stays 0) and `RedeliverStuckCommandTx` for stuck command re-delivery (retry_count increments). The `retry_count < 5` filter is now effective.
|
||||||
|
|||||||
Reference in New Issue
Block a user