test(concurrency): B-2 pre-fix tests for data integrity and concurrency bugs
Pre-fix test suite documenting 7 data integrity and concurrency bugs. Tests FAIL where they assert correct post-fix behavior, PASS where they document current buggy state. Tests added: - F-B2-1/8 HIGH: Registration not transactional (3 tests) - F-B2-2 MEDIUM: Command delivery race condition (3 tests) - F-B2-9 MEDIUM: Token renewal not transactional (2 tests) - F-B2-4 MEDIUM: No rate limit on GetCommands (3 tests) - F-B2-5 LOW: Jitter negates rapid mode (2 tests) - F-B2-10 LOW: No max retry for stuck commands (2 tests) - F-B2-7 MEDIUM: No exponential backoff on reconnection (2 tests) Current state: 7 FAIL, 10 PASS. No A/B-1 regressions. See docs/B2_PreFix_Tests.md for full inventory. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
107
aggregator-agent/internal/polling_jitter_test.go
Normal file
107
aggregator-agent/internal/polling_jitter_test.go
Normal file
@@ -0,0 +1,107 @@
|
||||
package internal_test
|
||||
|
||||
// polling_jitter_test.go — Pre-fix tests for jitter negating rapid mode.
|
||||
//
|
||||
// F-B2-5 LOW: 30-second jitter is applied uniformly to ALL polling
|
||||
// intervals including rapid mode (5 seconds). Rapid mode becomes
|
||||
// effectively 5-35 seconds instead of 5 seconds.
|
||||
//
|
||||
// Run: cd aggregator-agent && go test ./internal/... -v -run TestJitter
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Test 5.1 — Documents jitter exceeding rapid mode interval (F-B2-5)
|
||||
//
|
||||
// Category: PASS-NOW (documents the bug)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestJitterExceedsRapidModeInterval(t *testing.T) {
|
||||
// F-B2-5 LOW: Startup jitter (0-30s) is applied uniformly to ALL
|
||||
// polling intervals including rapid mode. Rapid mode (5s) becomes
|
||||
// effectively 5-35s. The jitter should be capped at the polling
|
||||
// interval or not applied when interval < jitter range.
|
||||
mainPath := filepath.Join("..", "cmd", "agent", "main.go")
|
||||
content, err := os.ReadFile(mainPath)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to read agent main.go: %v", err)
|
||||
}
|
||||
|
||||
src := string(content)
|
||||
|
||||
// Find jitter application (rand.Intn(30) or similar)
|
||||
hasFixedJitter := strings.Contains(src, "rand.Intn(30)") ||
|
||||
strings.Contains(src, "Intn(30)")
|
||||
|
||||
// Find rapid mode interval
|
||||
hasRapidInterval := strings.Contains(src, "return 5") // rapid mode returns 5 seconds
|
||||
|
||||
if !hasFixedJitter {
|
||||
t.Error("[ERROR] [agent] [polling] expected fixed 30-second jitter in main.go")
|
||||
}
|
||||
|
||||
if !hasRapidInterval {
|
||||
t.Log("[WARNING] [agent] [polling] could not confirm rapid mode 5-second interval")
|
||||
}
|
||||
|
||||
// Check if jitter is conditional on polling mode
|
||||
hasConditionalJitter := strings.Contains(src, "RapidPolling") &&
|
||||
(strings.Contains(src, "jitter") || strings.Contains(src, "Jitter"))
|
||||
|
||||
// The jitter block should NOT be inside a rapid-mode conditional
|
||||
// (it's applied unconditionally — that's the bug)
|
||||
if hasConditionalJitter {
|
||||
t.Log("[INFO] [agent] [polling] jitter may already be conditional on rapid mode")
|
||||
}
|
||||
|
||||
t.Log("[INFO] [agent] [polling] F-B2-5 confirmed: 30s jitter applied to all intervals including 5s rapid mode")
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Test 5.2 — Jitter must not exceed polling interval (assert fix)
|
||||
//
|
||||
// Category: FAIL-NOW / PASS-AFTER-FIX
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestJitterDoesNotExceedPollingInterval(t *testing.T) {
|
||||
// F-B2-5: After fix, jitter must not exceed the current polling
|
||||
// interval. Cap jitter at pollingInterval/2 or skip jitter in rapid mode.
|
||||
mainPath := filepath.Join("..", "cmd", "agent", "main.go")
|
||||
content, err := os.ReadFile(mainPath)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to read agent main.go: %v", err)
|
||||
}
|
||||
|
||||
src := string(content)
|
||||
|
||||
// After fix: jitter should be bounded by the polling interval
|
||||
// Look for patterns like: min(jitter, interval) or conditional skip in rapid mode
|
||||
jitterIdx := strings.Index(src, "rand.Intn(30)")
|
||||
if jitterIdx == -1 {
|
||||
t.Log("[INFO] [agent] [polling] fixed 30s jitter not found (may be refactored)")
|
||||
return
|
||||
}
|
||||
|
||||
// The jitter line should have a conditional that reduces or skips it in rapid mode
|
||||
// Look for rapid polling check WITHIN 10 lines before the jitter
|
||||
contextStart := jitterIdx - 400
|
||||
if contextStart < 0 {
|
||||
contextStart = 0
|
||||
}
|
||||
contextBefore := src[contextStart:jitterIdx]
|
||||
|
||||
hasRapidModeGuard := strings.Contains(contextBefore, "RapidPolling") ||
|
||||
strings.Contains(contextBefore, "rapidPolling") ||
|
||||
strings.Contains(contextBefore, "rapid_polling")
|
||||
|
||||
if !hasRapidModeGuard {
|
||||
t.Errorf("[ERROR] [agent] [polling] jitter is not guarded for rapid mode.\n" +
|
||||
"F-B2-5: 30s fixed jitter on 5s rapid interval makes rapid mode ineffective.\n" +
|
||||
"After fix: cap jitter at pollingInterval/2 or skip in rapid mode.")
|
||||
}
|
||||
}
|
||||
106
aggregator-agent/internal/reconnect_stagger_test.go
Normal file
106
aggregator-agent/internal/reconnect_stagger_test.go
Normal file
@@ -0,0 +1,106 @@
|
||||
package internal_test
|
||||
|
||||
// reconnect_stagger_test.go — Pre-fix tests for thundering herd on reconnection.
|
||||
//
|
||||
// F-B2-7 MEDIUM: Agent reconnection uses only a fixed 30-second jitter.
|
||||
// After a server restart, all agents retry within 30 seconds of the
|
||||
// server becoming available, causing a traffic spike.
|
||||
//
|
||||
// Run: cd aggregator-agent && go test ./internal/... -v -run TestReconnect
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Test 7.1 — Documents fixed jitter only (F-B2-7)
|
||||
//
|
||||
// Category: PASS-NOW (documents the bug)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestReconnectionUsesFixedJitterOnly(t *testing.T) {
|
||||
// F-B2-7 MEDIUM: Agent reconnection uses only a fixed 30-second
|
||||
// jitter. After a server restart, all agents that were waiting
|
||||
// begin retrying within 30 seconds. True thundering herd mitigation
|
||||
// requires exponential backoff with full jitter.
|
||||
mainPath := filepath.Join("..", "cmd", "agent", "main.go")
|
||||
content, err := os.ReadFile(mainPath)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to read agent main.go: %v", err)
|
||||
}
|
||||
|
||||
src := string(content)
|
||||
|
||||
// Check for fixed jitter pattern
|
||||
hasFixedJitter := strings.Contains(src, "rand.Intn(30)")
|
||||
|
||||
// Check for exponential backoff in the main polling loop (not config sync)
|
||||
// The main polling loop is the for{} block that calls GetCommands
|
||||
pollLoopIdx := strings.Index(src, "GetCommands(cfg.AgentID")
|
||||
if pollLoopIdx == -1 {
|
||||
pollLoopIdx = strings.Index(src, "GetCommands(")
|
||||
}
|
||||
|
||||
hasExpBackoffInPollLoop := false
|
||||
if pollLoopIdx > 0 {
|
||||
// Check 500 chars around the GetCommands call for backoff logic
|
||||
contextStart := pollLoopIdx - 500
|
||||
if contextStart < 0 {
|
||||
contextStart = 0
|
||||
}
|
||||
context := strings.ToLower(src[contextStart : pollLoopIdx+500])
|
||||
hasExpBackoffInPollLoop = strings.Contains(context, "exponential backoff") ||
|
||||
(strings.Contains(context, "backoff") && strings.Contains(context, "attempt"))
|
||||
}
|
||||
|
||||
if !hasFixedJitter {
|
||||
t.Error("[ERROR] [agent] [polling] expected fixed jitter in main.go")
|
||||
}
|
||||
|
||||
if hasExpBackoffInPollLoop {
|
||||
t.Error("[ERROR] [agent] [polling] F-B2-7 already fixed: exponential backoff in polling loop")
|
||||
}
|
||||
|
||||
t.Log("[INFO] [agent] [polling] F-B2-7 confirmed: reconnection uses fixed 30s jitter only")
|
||||
t.Log("[INFO] [agent] [polling] all agents recovering from outage retry within a 30s window")
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Test 7.2 — Must use exponential backoff with jitter (assert fix)
|
||||
//
|
||||
// Category: FAIL-NOW / PASS-AFTER-FIX
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestReconnectionUsesExponentialBackoffWithJitter(t *testing.T) {
|
||||
// F-B2-7: After fix, implement exponential backoff with full jitter:
|
||||
// delay = rand(0, min(cap, base * 2^attempt))
|
||||
mainPath := filepath.Join("..", "cmd", "agent", "main.go")
|
||||
content, err := os.ReadFile(mainPath)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to read agent main.go: %v", err)
|
||||
}
|
||||
|
||||
src := strings.ToLower(string(content))
|
||||
|
||||
// Check for exponential backoff specifically in the main polling loop error path
|
||||
// (not the config sync backoff which already exists)
|
||||
pollLoopIdx := strings.Index(src, "getcommands")
|
||||
hasExpBackoff := false
|
||||
if pollLoopIdx > 0 {
|
||||
context := src[pollLoopIdx:]
|
||||
if len(context) > 2000 {
|
||||
context = context[:2000]
|
||||
}
|
||||
hasExpBackoff = strings.Contains(context, "exponential") ||
|
||||
(strings.Contains(context, "backoff") && strings.Contains(context, "attempt"))
|
||||
}
|
||||
|
||||
if !hasExpBackoff {
|
||||
t.Errorf("[ERROR] [agent] [polling] no exponential backoff found in reconnection logic.\n" +
|
||||
"F-B2-7: implement exponential backoff with full jitter for reconnection.\n" +
|
||||
"After fix: delay increases with each consecutive failure.")
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,130 @@
|
||||
package handlers_test
|
||||
|
||||
// command_delivery_race_test.go — Pre-fix tests for command delivery race condition.
|
||||
//
|
||||
// F-B2-2 MEDIUM: GetCommands + MarkCommandSent are not in a transaction.
|
||||
// Two concurrent requests from the same agent can both read the same
|
||||
// pending commands before either marks them as sent.
|
||||
//
|
||||
// Run: cd aggregator-server && go test ./internal/api/handlers/... -v -run TestGetCommands
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Test 2.1 — Documents non-transactional command delivery (F-B2-2)
|
||||
//
|
||||
// Category: PASS-NOW (documents the bug)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestGetCommandsAndMarkSentNotTransactional(t *testing.T) {
|
||||
// F-B2-2: GetCommands + MarkCommandSent are not in a transaction.
|
||||
// Two concurrent requests from the same agent can both read the
|
||||
// same pending commands before either marks them as sent.
|
||||
// Mitigated by agent-side dedup (A-2) but commands are still
|
||||
// delivered twice, wasting bandwidth.
|
||||
agentsPath := filepath.Join(".", "agents.go")
|
||||
content, err := os.ReadFile(agentsPath)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to read agents.go: %v", err)
|
||||
}
|
||||
|
||||
src := string(content)
|
||||
|
||||
cmdIdx := strings.Index(src, "func (h *AgentHandler) GetCommands")
|
||||
if cmdIdx == -1 {
|
||||
t.Fatal("[ERROR] [server] [handlers] GetCommands function not found")
|
||||
}
|
||||
|
||||
// Search the entire file for the pattern (function is very long due to metrics/metadata handling)
|
||||
hasGetPending := strings.Contains(src, "GetPendingCommands")
|
||||
hasMarkSent := strings.Contains(src, "MarkCommandSent")
|
||||
|
||||
if !hasGetPending || !hasMarkSent {
|
||||
t.Error("[ERROR] [server] [handlers] expected GetPendingCommands and MarkCommandSent in agents.go")
|
||||
}
|
||||
|
||||
// Check if GetCommands function body contains a transaction
|
||||
fnBody := src[cmdIdx:]
|
||||
// Find the next top-level function to bound our search
|
||||
nextFn := strings.Index(fnBody[1:], "\nfunc ")
|
||||
if nextFn > 0 {
|
||||
fnBody = fnBody[:nextFn+1]
|
||||
}
|
||||
|
||||
hasTransaction := strings.Contains(fnBody, ".Beginx()") || strings.Contains(fnBody, ".Begin()")
|
||||
|
||||
if hasTransaction {
|
||||
t.Error("[ERROR] [server] [handlers] F-B2-2 already fixed: GetCommands uses a transaction")
|
||||
}
|
||||
|
||||
t.Log("[INFO] [server] [handlers] F-B2-2 confirmed: GetCommands fetches then marks without transaction")
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Test 2.2 — Command delivery must be atomic (assert fix)
|
||||
//
|
||||
// Category: FAIL-NOW / PASS-AFTER-FIX
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestGetCommandsMustBeAtomic(t *testing.T) {
|
||||
// F-B2-2: After fix, use SELECT FOR UPDATE SKIP LOCKED to
|
||||
// atomically claim commands for delivery.
|
||||
agentsPath := filepath.Join(".", "agents.go")
|
||||
content, err := os.ReadFile(agentsPath)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to read agents.go: %v", err)
|
||||
}
|
||||
|
||||
src := string(content)
|
||||
|
||||
cmdIdx := strings.Index(src, "func (h *AgentHandler) GetCommands")
|
||||
if cmdIdx == -1 {
|
||||
t.Fatal("[ERROR] [server] [handlers] GetCommands function not found")
|
||||
}
|
||||
|
||||
fnBody := src[cmdIdx:]
|
||||
nextFn := strings.Index(fnBody[1:], "\nfunc ")
|
||||
if nextFn > 0 {
|
||||
fnBody = fnBody[:nextFn+1]
|
||||
}
|
||||
|
||||
hasTransaction := strings.Contains(fnBody, ".Beginx()") ||
|
||||
strings.Contains(fnBody, ".Begin()")
|
||||
|
||||
if !hasTransaction {
|
||||
t.Errorf("[ERROR] [server] [handlers] GetCommands does not use a transaction.\n" +
|
||||
"F-B2-2: GetPendingCommands and MarkCommandSent must be atomic.\n" +
|
||||
"After fix: use SELECT FOR UPDATE SKIP LOCKED or wrap in transaction.")
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Test 2.3 — Documents absence of SELECT FOR UPDATE (F-B2-2)
|
||||
//
|
||||
// Category: PASS-NOW (documents the bug)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestSelectForUpdatePatternInGetCommands(t *testing.T) {
|
||||
// F-B2-2: PostgreSQL SELECT FOR UPDATE SKIP LOCKED is the
|
||||
// standard pattern for claiming work items from a queue.
|
||||
cmdPath := filepath.Join("..", "..", "database", "queries", "commands.go")
|
||||
content, err := os.ReadFile(cmdPath)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to read commands.go: %v", err)
|
||||
}
|
||||
|
||||
src := strings.ToLower(string(content))
|
||||
|
||||
hasForUpdate := strings.Contains(src, "for update") || strings.Contains(src, "skip locked")
|
||||
|
||||
if hasForUpdate {
|
||||
t.Error("[ERROR] [server] [handlers] F-B2-2 already fixed: SELECT FOR UPDATE found in commands.go")
|
||||
}
|
||||
|
||||
t.Log("[INFO] [server] [handlers] F-B2-2 confirmed: no SELECT FOR UPDATE in command queries")
|
||||
}
|
||||
@@ -0,0 +1,126 @@
|
||||
package handlers_test
|
||||
|
||||
// rapid_mode_ratelimit_test.go — Pre-fix tests for rapid mode rate limiting.
|
||||
//
|
||||
// F-B2-4 MEDIUM: GET /agents/:id/commands has no rate limit.
|
||||
// In rapid mode (5s polling) with 50 agents, this generates
|
||||
// 3,000 queries/minute without any throttling.
|
||||
//
|
||||
// Run: cd aggregator-server && go test ./internal/api/handlers/... -v -run TestRapidMode
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Test 4.1 — Documents missing rate limit on GetCommands (F-B2-4)
|
||||
//
|
||||
// Category: PASS-NOW (documents the bug)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestGetCommandsEndpointHasNoRateLimit(t *testing.T) {
|
||||
// F-B2-4: GET /agents/:id/commands has no rate limit. In rapid
|
||||
// mode (5s polling) with 50 concurrent agents this generates
|
||||
// 3,000 queries/minute. A rate limiter should be applied.
|
||||
mainPath := filepath.Join("..", "..", "..", "cmd", "server", "main.go")
|
||||
content, err := os.ReadFile(mainPath)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to read main.go: %v", err)
|
||||
}
|
||||
|
||||
src := string(content)
|
||||
|
||||
// Find the GetCommands route registration
|
||||
// Pattern: agents.GET("/:id/commands", ...)
|
||||
cmdRouteIdx := strings.Index(src, `/:id/commands"`)
|
||||
if cmdRouteIdx == -1 {
|
||||
t.Fatal("[ERROR] [server] [handlers] GetCommands route not found in main.go")
|
||||
}
|
||||
|
||||
// Get the line containing the route registration
|
||||
lineStart := strings.LastIndex(src[:cmdRouteIdx], "\n") + 1
|
||||
lineEnd := strings.Index(src[cmdRouteIdx:], "\n") + cmdRouteIdx
|
||||
routeLine := src[lineStart:lineEnd]
|
||||
|
||||
// Check if rateLimiter.RateLimit appears on this specific line
|
||||
hasRateLimit := strings.Contains(routeLine, "rateLimiter.RateLimit")
|
||||
|
||||
if hasRateLimit {
|
||||
t.Error("[ERROR] [server] [handlers] F-B2-4 already fixed: GetCommands has rate limit")
|
||||
}
|
||||
|
||||
t.Logf("[INFO] [server] [handlers] F-B2-4 confirmed: GetCommands route has no rate limit")
|
||||
t.Logf("[INFO] [server] [handlers] route line: %s", strings.TrimSpace(routeLine))
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Test 4.2 — GetCommands should have rate limit (assert fix)
|
||||
//
|
||||
// Category: FAIL-NOW / PASS-AFTER-FIX
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestGetCommandsEndpointShouldHaveRateLimit(t *testing.T) {
|
||||
// F-B2-4: After fix, apply a permissive rate limit to GetCommands
|
||||
// to cap rapid mode load without breaking normal operation.
|
||||
mainPath := filepath.Join("..", "..", "..", "cmd", "server", "main.go")
|
||||
content, err := os.ReadFile(mainPath)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to read main.go: %v", err)
|
||||
}
|
||||
|
||||
src := string(content)
|
||||
|
||||
cmdRouteIdx := strings.Index(src, `/:id/commands"`)
|
||||
if cmdRouteIdx == -1 {
|
||||
t.Fatal("[ERROR] [server] [handlers] GetCommands route not found")
|
||||
}
|
||||
|
||||
lineStart := strings.LastIndex(src[:cmdRouteIdx], "\n") + 1
|
||||
lineEnd := strings.Index(src[cmdRouteIdx:], "\n") + cmdRouteIdx
|
||||
routeLine := src[lineStart:lineEnd]
|
||||
|
||||
if !strings.Contains(routeLine, "rateLimiter.RateLimit") {
|
||||
t.Errorf("[ERROR] [server] [handlers] GetCommands has no rate limit.\n" +
|
||||
"F-B2-4: apply rate limit to cap rapid mode load.")
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Test 4.3 — Rapid mode has server-side max duration (documents existing cap)
|
||||
//
|
||||
// Category: PASS (the cap exists)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestRapidModeHasServerSideMaxDuration(t *testing.T) {
|
||||
// F-B2-4: Server-side max duration for rapid mode exists (60 minutes).
|
||||
// But there is no cap on how many agents can be in rapid mode simultaneously.
|
||||
agentsPath := filepath.Join(".", "agents.go")
|
||||
content, err := os.ReadFile(agentsPath)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to read agents.go: %v", err)
|
||||
}
|
||||
|
||||
src := string(content)
|
||||
|
||||
rapidIdx := strings.Index(src, "func (h *AgentHandler) SetRapidPollingMode")
|
||||
if rapidIdx == -1 {
|
||||
t.Fatal("[ERROR] [server] [handlers] SetRapidPollingMode not found")
|
||||
}
|
||||
|
||||
fnBody := src[rapidIdx:]
|
||||
if len(fnBody) > 1500 {
|
||||
fnBody = fnBody[:1500]
|
||||
}
|
||||
|
||||
// Check for max duration validation
|
||||
hasMaxDuration := strings.Contains(fnBody, "max=60") || strings.Contains(fnBody, "max_minutes")
|
||||
|
||||
if !hasMaxDuration {
|
||||
t.Error("[ERROR] [server] [handlers] no max duration validation in SetRapidPollingMode")
|
||||
}
|
||||
|
||||
t.Log("[INFO] [server] [handlers] F-B2-4: rapid mode max duration cap exists (60 minutes)")
|
||||
}
|
||||
@@ -0,0 +1,148 @@
|
||||
package handlers_test
|
||||
|
||||
// registration_transaction_test.go — Pre-fix tests for registration transaction safety.
|
||||
//
|
||||
// F-B2-1/F-B2-8 HIGH: Registration uses 4 separate DB operations
|
||||
// (ValidateRegistrationToken, CreateAgent, MarkTokenUsed, CreateRefreshToken)
|
||||
// without a wrapping transaction. Crash between steps leaves orphaned state.
|
||||
//
|
||||
// Run: cd aggregator-server && go test ./internal/api/handlers/... -v -run TestRegistration
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Test 1.1 — Documents non-transactional registration (F-B2-1/F-B2-8)
|
||||
//
|
||||
// Category: PASS-NOW (documents the bug)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestRegistrationFlowIsNotTransactional(t *testing.T) {
|
||||
// F-B2-1/F-B2-8: Registration uses 4 separate DB operations without
|
||||
// a wrapping transaction. Crash between CreateAgent and MarkTokenUsed
|
||||
// leaves an orphaned agent. The manual rollback (delete agent on
|
||||
// token failure) is best-effort, not atomic.
|
||||
agentsPath := filepath.Join(".", "agents.go")
|
||||
content, err := os.ReadFile(agentsPath)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to read agents.go: %v", err)
|
||||
}
|
||||
|
||||
src := string(content)
|
||||
|
||||
// Find the RegisterAgent function
|
||||
regIdx := strings.Index(src, "func (h *AgentHandler) RegisterAgent")
|
||||
if regIdx == -1 {
|
||||
t.Fatal("[ERROR] [server] [handlers] RegisterAgent function not found in agents.go")
|
||||
}
|
||||
|
||||
// Extract the function body up to the next top-level func
|
||||
fnBody := src[regIdx:]
|
||||
nextFn := strings.Index(fnBody[1:], "\nfunc ")
|
||||
if nextFn > 0 {
|
||||
fnBody = fnBody[:nextFn+1]
|
||||
}
|
||||
|
||||
// Verify the 4 DB operations exist
|
||||
hasValidate := strings.Contains(fnBody, "ValidateRegistrationToken")
|
||||
hasCreate := strings.Contains(fnBody, "CreateAgent")
|
||||
hasMarkUsed := strings.Contains(fnBody, "MarkTokenUsed")
|
||||
hasRefreshToken := strings.Contains(fnBody, "CreateRefreshToken")
|
||||
|
||||
if !hasValidate || !hasCreate || !hasMarkUsed || !hasRefreshToken {
|
||||
t.Errorf("[ERROR] [server] [handlers] missing expected DB operations in RegisterAgent: "+
|
||||
"validate=%v create=%v markUsed=%v refreshToken=%v",
|
||||
hasValidate, hasCreate, hasMarkUsed, hasRefreshToken)
|
||||
}
|
||||
|
||||
// Check that NO transaction wraps these operations
|
||||
hasTransaction := strings.Contains(fnBody, ".Beginx()") ||
|
||||
strings.Contains(fnBody, ".Begin()")
|
||||
|
||||
if hasTransaction {
|
||||
t.Error("[ERROR] [server] [handlers] F-B2-1 already fixed: " +
|
||||
"RegisterAgent uses a transaction")
|
||||
}
|
||||
|
||||
t.Log("[INFO] [server] [handlers] F-B2-1 confirmed: RegisterAgent uses 4 separate DB operations without transaction")
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Test 1.2 — Registration MUST be transactional (assert fix)
|
||||
//
|
||||
// Category: FAIL-NOW / PASS-AFTER-FIX
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestRegistrationFlowMustBeTransactional(t *testing.T) {
|
||||
// F-B2-1/F-B2-8: After fix, all 4 registration steps must be inside
|
||||
// a single transaction. Failure at any step rolls back atomically.
|
||||
agentsPath := filepath.Join(".", "agents.go")
|
||||
content, err := os.ReadFile(agentsPath)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to read agents.go: %v", err)
|
||||
}
|
||||
|
||||
src := string(content)
|
||||
|
||||
regIdx := strings.Index(src, "func (h *AgentHandler) RegisterAgent")
|
||||
if regIdx == -1 {
|
||||
t.Fatal("[ERROR] [server] [handlers] RegisterAgent function not found")
|
||||
}
|
||||
|
||||
fnBody2 := src[regIdx:]
|
||||
nextFn2 := strings.Index(fnBody2[1:], "\nfunc ")
|
||||
if nextFn2 > 0 {
|
||||
fnBody2 = fnBody2[:nextFn2+1]
|
||||
}
|
||||
|
||||
hasTransaction2 := strings.Contains(fnBody2, ".Beginx()") ||
|
||||
strings.Contains(fnBody2, ".Begin()")
|
||||
|
||||
if !hasTransaction2 {
|
||||
t.Errorf("[ERROR] [server] [handlers] RegisterAgent does not use a transaction.\n" +
|
||||
"F-B2-1: all registration DB operations must be wrapped in a single transaction.\n" +
|
||||
"After fix: db.Beginx() wraps validate, create, mark, and refresh token ops.")
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Test 1.3 — Manual rollback exists (documents current mitigation)
|
||||
//
|
||||
// Category: PASS-NOW (documents the manual rollback)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestRegistrationManualRollbackExists(t *testing.T) {
|
||||
// F-B2-1: Manual rollback exists but is not atomic.
|
||||
// After fix: transaction replaces manual rollback entirely.
|
||||
agentsPath := filepath.Join(".", "agents.go")
|
||||
content, err := os.ReadFile(agentsPath)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to read agents.go: %v", err)
|
||||
}
|
||||
|
||||
src := string(content)
|
||||
|
||||
regIdx := strings.Index(src, "func (h *AgentHandler) RegisterAgent")
|
||||
if regIdx == -1 {
|
||||
t.Fatal("[ERROR] [server] [handlers] RegisterAgent function not found")
|
||||
}
|
||||
|
||||
fnBody3 := src[regIdx:]
|
||||
nextFn3 := strings.Index(fnBody3[1:], "\nfunc ")
|
||||
if nextFn3 > 0 {
|
||||
fnBody3 = fnBody3[:nextFn3+1]
|
||||
}
|
||||
|
||||
// The manual rollback deletes the agent when token marking fails
|
||||
hasManualRollback := strings.Contains(fnBody3, "DeleteAgent")
|
||||
|
||||
if !hasManualRollback {
|
||||
t.Error("[ERROR] [server] [handlers] no manual rollback found in RegisterAgent")
|
||||
}
|
||||
|
||||
t.Log("[INFO] [server] [handlers] F-B2-1 confirmed: manual agent deletion rollback exists (non-atomic)")
|
||||
}
|
||||
@@ -0,0 +1,94 @@
|
||||
package handlers_test
|
||||
|
||||
// token_renewal_transaction_test.go — Pre-fix tests for token renewal transaction safety.
|
||||
//
|
||||
// F-B2-9 MEDIUM: Token renewal is not transactional. ValidateRefreshToken
|
||||
// and UpdateExpiration are separate DB operations.
|
||||
//
|
||||
// Run: cd aggregator-server && go test ./internal/api/handlers/... -v -run TestTokenRenewal
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Test 3.1 — Documents non-transactional token renewal (F-B2-9)
|
||||
//
|
||||
// Category: PASS-NOW (documents the bug)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestTokenRenewalIsNotTransactional(t *testing.T) {
|
||||
// F-B2-9 MEDIUM: Token renewal is not transactional. If server
|
||||
// crashes between ValidateRefreshToken and UpdateExpiration,
|
||||
// the token is validated but expiry is not extended.
|
||||
// Self-healing on retry (token still valid).
|
||||
agentsPath := filepath.Join(".", "agents.go")
|
||||
content, err := os.ReadFile(agentsPath)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to read agents.go: %v", err)
|
||||
}
|
||||
|
||||
src := string(content)
|
||||
|
||||
renewIdx := strings.Index(src, "func (h *AgentHandler) RenewToken")
|
||||
if renewIdx == -1 {
|
||||
t.Fatal("[ERROR] [server] [handlers] RenewToken function not found")
|
||||
}
|
||||
|
||||
fnBody := src[renewIdx:]
|
||||
if len(fnBody) > 2000 {
|
||||
fnBody = fnBody[:2000]
|
||||
}
|
||||
|
||||
hasValidate := strings.Contains(fnBody, "ValidateRefreshToken")
|
||||
hasUpdateExpiry := strings.Contains(fnBody, "UpdateExpiration")
|
||||
hasTransaction := strings.Contains(fnBody, ".Beginx()") || strings.Contains(fnBody, ".Begin()")
|
||||
|
||||
if !hasValidate || !hasUpdateExpiry {
|
||||
t.Error("[ERROR] [server] [handlers] expected ValidateRefreshToken and UpdateExpiration in RenewToken")
|
||||
}
|
||||
|
||||
if hasTransaction {
|
||||
t.Error("[ERROR] [server] [handlers] F-B2-9 already fixed: RenewToken uses a transaction")
|
||||
}
|
||||
|
||||
t.Log("[INFO] [server] [handlers] F-B2-9 confirmed: RenewToken not transactional")
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Test 3.2 — Token renewal should be transactional (assert fix)
|
||||
//
|
||||
// Category: FAIL-NOW / PASS-AFTER-FIX
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestTokenRenewalShouldBeTransactional(t *testing.T) {
|
||||
// F-B2-9: After fix, wrap validate + update in a single
|
||||
// transaction to ensure atomic renewal.
|
||||
agentsPath := filepath.Join(".", "agents.go")
|
||||
content, err := os.ReadFile(agentsPath)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to read agents.go: %v", err)
|
||||
}
|
||||
|
||||
src := string(content)
|
||||
|
||||
renewIdx := strings.Index(src, "func (h *AgentHandler) RenewToken")
|
||||
if renewIdx == -1 {
|
||||
t.Fatal("[ERROR] [server] [handlers] RenewToken function not found")
|
||||
}
|
||||
|
||||
fnBody := src[renewIdx:]
|
||||
if len(fnBody) > 2000 {
|
||||
fnBody = fnBody[:2000]
|
||||
}
|
||||
|
||||
hasTransaction := strings.Contains(fnBody, ".Beginx()") || strings.Contains(fnBody, ".Begin()")
|
||||
|
||||
if !hasTransaction {
|
||||
t.Errorf("[ERROR] [server] [handlers] RenewToken is not transactional.\n" +
|
||||
"F-B2-9: validate + update expiry must be in a single transaction.")
|
||||
}
|
||||
}
|
||||
124
aggregator-server/internal/database/stuck_command_retry_test.go
Normal file
124
aggregator-server/internal/database/stuck_command_retry_test.go
Normal file
@@ -0,0 +1,124 @@
|
||||
package database_test
|
||||
|
||||
// stuck_command_retry_test.go — Pre-fix tests for stuck command retry limit.
|
||||
//
|
||||
// F-B2-10 LOW: No maximum retry count for stuck commands. A command that
|
||||
// always causes the agent to crash will be re-delivered indefinitely.
|
||||
//
|
||||
// Run: cd aggregator-server && go test ./internal/database/... -v -run TestStuckCommand
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Test 6.1 — Documents unlimited retry for stuck commands (F-B2-10)
|
||||
//
|
||||
// Category: PASS-NOW (documents the bug)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestStuckCommandHasNoMaxRetryCount(t *testing.T) {
|
||||
// F-B2-10 LOW: No maximum retry count for stuck commands.
|
||||
// A command that always causes the agent to crash will be
|
||||
// delivered and re-delivered indefinitely via the stuck
|
||||
// command re-delivery path.
|
||||
|
||||
// Check agent_commands schema for retry_count column
|
||||
migrationsDir := filepath.Join("migrations")
|
||||
files, err := os.ReadDir(migrationsDir)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to read migrations directory: %v", err)
|
||||
}
|
||||
|
||||
hasRetryCount := false
|
||||
for _, f := range files {
|
||||
if !strings.HasSuffix(f.Name(), ".up.sql") {
|
||||
continue
|
||||
}
|
||||
content, err := os.ReadFile(filepath.Join(migrationsDir, f.Name()))
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
src := strings.ToLower(string(content))
|
||||
if strings.Contains(src, "agent_commands") &&
|
||||
(strings.Contains(src, "retry_count") || strings.Contains(src, "delivery_count") ||
|
||||
strings.Contains(src, "attempt_count")) {
|
||||
hasRetryCount = true
|
||||
}
|
||||
}
|
||||
|
||||
if hasRetryCount {
|
||||
t.Error("[ERROR] [server] [database] F-B2-10 already fixed: retry_count column exists")
|
||||
}
|
||||
|
||||
// Check GetStuckCommands specifically for a retry limit in its WHERE clause
|
||||
cmdPath := filepath.Join("queries", "commands.go")
|
||||
content, err := os.ReadFile(cmdPath)
|
||||
if err != nil {
|
||||
t.Logf("[WARNING] [server] [database] could not read commands.go: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
src := string(content)
|
||||
// Find GetStuckCommands function and check its query
|
||||
stuckIdx := strings.Index(src, "func (q *CommandQueries) GetStuckCommands")
|
||||
if stuckIdx == -1 {
|
||||
t.Log("[WARNING] [server] [database] GetStuckCommands function not found")
|
||||
return
|
||||
}
|
||||
stuckBody := src[stuckIdx:]
|
||||
if len(stuckBody) > 500 {
|
||||
stuckBody = stuckBody[:500]
|
||||
}
|
||||
stuckLower := strings.ToLower(stuckBody)
|
||||
hasRetryFilter := strings.Contains(stuckLower, "delivery_count <") ||
|
||||
strings.Contains(stuckLower, "retry_count <") ||
|
||||
strings.Contains(stuckLower, "max_retries")
|
||||
|
||||
if hasRetryFilter {
|
||||
t.Error("[ERROR] [server] [database] F-B2-10 already fixed: retry limit in GetStuckCommands")
|
||||
}
|
||||
|
||||
t.Log("[INFO] [server] [database] F-B2-10 confirmed: no retry count limit on stuck commands")
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Test 6.2 — Stuck commands must have max retry count (assert fix)
|
||||
//
|
||||
// Category: FAIL-NOW / PASS-AFTER-FIX
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestStuckCommandHasMaxRetryCount(t *testing.T) {
|
||||
// F-B2-10: After fix, add retry_count column and cap re-delivery
|
||||
// at a maximum (e.g., 5 attempts).
|
||||
migrationsDir := filepath.Join("migrations")
|
||||
files, err := os.ReadDir(migrationsDir)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to read migrations directory: %v", err)
|
||||
}
|
||||
|
||||
hasRetryCount := false
|
||||
for _, f := range files {
|
||||
if !strings.HasSuffix(f.Name(), ".up.sql") {
|
||||
continue
|
||||
}
|
||||
content, err := os.ReadFile(filepath.Join(migrationsDir, f.Name()))
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
src := strings.ToLower(string(content))
|
||||
if strings.Contains(src, "agent_commands") &&
|
||||
(strings.Contains(src, "retry_count") || strings.Contains(src, "delivery_count") ||
|
||||
strings.Contains(src, "attempt_count")) {
|
||||
hasRetryCount = true
|
||||
}
|
||||
}
|
||||
|
||||
if !hasRetryCount {
|
||||
t.Errorf("[ERROR] [server] [database] no retry_count column on agent_commands.\n" +
|
||||
"F-B2-10: add retry_count and cap re-delivery at max retries.")
|
||||
}
|
||||
}
|
||||
54
docs/B2_PreFix_Tests.md
Normal file
54
docs/B2_PreFix_Tests.md
Normal file
@@ -0,0 +1,54 @@
|
||||
# B-2 Pre-Fix Test Suite
|
||||
|
||||
**Date:** 2026-03-29
|
||||
**Branch:** culurien
|
||||
**Purpose:** Document data integrity and concurrency bugs BEFORE fixes.
|
||||
**Reference:** docs/B2_Data_Integrity_Audit.md
|
||||
|
||||
---
|
||||
|
||||
## Test Files Created
|
||||
|
||||
| File | Package | Bugs Documented |
|
||||
|------|---------|-----------------|
|
||||
| `server/internal/api/handlers/registration_transaction_test.go` | `handlers_test` | F-B2-1, F-B2-8 |
|
||||
| `server/internal/api/handlers/command_delivery_race_test.go` | `handlers_test` | F-B2-2 |
|
||||
| `server/internal/api/handlers/token_renewal_transaction_test.go` | `handlers_test` | F-B2-9 |
|
||||
| `server/internal/api/handlers/rapid_mode_ratelimit_test.go` | `handlers_test` | F-B2-4 |
|
||||
| `server/internal/database/stuck_command_retry_test.go` | `database_test` | F-B2-10 |
|
||||
| `agent/internal/polling_jitter_test.go` | `internal_test` | F-B2-5 |
|
||||
| `agent/internal/reconnect_stagger_test.go` | `internal_test` | F-B2-7 |
|
||||
|
||||
---
|
||||
|
||||
## State-Change Summary
|
||||
|
||||
| Test | Bug | Current | After Fix |
|
||||
|------|-----|---------|-----------|
|
||||
| TestRegistrationFlowIsNotTransactional | F-B2-1 | PASS | update |
|
||||
| TestRegistrationFlowMustBeTransactional | F-B2-1 | **FAIL** | PASS |
|
||||
| TestRegistrationManualRollbackExists | F-B2-1 | PASS | update |
|
||||
| TestGetCommandsAndMarkSentNotTransactional | F-B2-2 | PASS | update |
|
||||
| TestGetCommandsMustBeAtomic | F-B2-2 | **FAIL** | PASS |
|
||||
| TestSelectForUpdatePatternInGetCommands | F-B2-2 | PASS | update |
|
||||
| TestTokenRenewalIsNotTransactional | F-B2-9 | PASS | update |
|
||||
| TestTokenRenewalShouldBeTransactional | F-B2-9 | **FAIL** | PASS |
|
||||
| TestGetCommandsEndpointHasNoRateLimit | F-B2-4 | PASS | update |
|
||||
| TestGetCommandsEndpointShouldHaveRateLimit | F-B2-4 | **FAIL** | PASS |
|
||||
| TestRapidModeHasServerSideMaxDuration | F-B2-4 | PASS | PASS |
|
||||
| TestJitterExceedsRapidModeInterval | F-B2-5 | PASS | update |
|
||||
| TestJitterDoesNotExceedPollingInterval | F-B2-5 | **FAIL** | PASS |
|
||||
| TestStuckCommandHasNoMaxRetryCount | F-B2-10 | PASS | update |
|
||||
| TestStuckCommandHasMaxRetryCount | F-B2-10 | **FAIL** | PASS |
|
||||
| TestReconnectionUsesFixedJitterOnly | F-B2-7 | PASS | update |
|
||||
| TestReconnectionUsesExponentialBackoffWithJitter | F-B2-7 | **FAIL** | PASS |
|
||||
|
||||
**7 FAIL** (assert post-fix behavior), **10 PASS** (document current state).
|
||||
|
||||
---
|
||||
|
||||
## Notes
|
||||
|
||||
1. All tests are static source inspection — no live database required.
|
||||
2. All A-series and B-1 tests continue to pass (no regressions).
|
||||
3. Agent tests in `internal/` package avoid the pre-existing build failures in `migration/pathutils` and `migration/validation` packages.
|
||||
Reference in New Issue
Block a user