mirror of
https://github.com/shankar0123/certctl.git
synced 2026-06-10 13:58:52 +00:00
475421457f
CI race detector flagged TestBoundedFanOut_SkipsAgentRoutedDeployments
on commit 35e18bf (audit fix #9). The test's `work` closure was
appending to a plain []string slice from worker goroutines without
synchronisation:
var seenIDs []string
work := func(ctx context.Context, job *domain.Job) error {
seen.Add(1)
seenIDs = append(seenIDs, job.ID) // race
return nil
}
atomic.Int64 covered the count assertion but the slice header itself
is the racing memory — race detector caught both the read+write race
on the slice header and the runtime.growslice path on append.
Fix: protect seenIDs with a sync.Mutex. The slice is only used in
the failure-message branch (`t.Errorf` ids=%v formatting), so the
contention is irrelevant to performance — correctness only.
Also locked around the read in the t.Errorf format-args evaluation.
That read happens AFTER boundedFanOut returns, and the Wait() inside
boundedFanOut already synchronizes with the worker goroutines, so the
lock is not strictly required; the explicit Lock/Unlock simply makes
the synchronisation visible without depending on the implicit
happens-before from Wait.
The other five tests in the file (TestBoundedFanOut_CapHolds,
_AllJobsRun, _CtxCancelInterrupts, _FailedJobsCounted,
TestSetRenewalConcurrency_NormalizesNonPositive) only mutate
atomic.Int64 counters from worker goroutines, so they were
already race-clean.
Verified locally: `go test -race -count=1 -run
'TestBoundedFanOut|TestSetRenewalConcurrency' ./internal/service/...`
is green.
264 lines
8.3 KiB
Go
264 lines
8.3 KiB
Go
package service
|
||
|
||
// Audit fix #9 — bounded scheduler concurrency tests.
|
||
//
|
||
// boundedFanOut is the load-bearing primitive that caps the number of
|
||
// concurrent renewal/issuance/deployment goroutines per scheduler tick.
|
||
// Production wiring in cmd/server/main.go calls
|
||
// SetRenewalConcurrency(cfg.Scheduler.RenewalConcurrency) (default 25);
|
||
// these tests pin the cap behaviour directly against boundedFanOut so
|
||
// they don't have to stand up the full renewal/deployment service
|
||
// graph just to assert "the cap holds."
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"io"
|
||
"log/slog"
|
||
"strconv"
|
||
"sync"
|
||
"sync/atomic"
|
||
"testing"
|
||
"time"
|
||
|
||
"github.com/shankar0123/certctl/internal/domain"
|
||
)
|
||
|
||
// quietLogger returns a logger whose text handler writes to io.Discard and
// whose minimum level is Error, so the per-job info lines that
// boundedFanOut emits never reach the test runner's output.
func quietLogger() *slog.Logger {
	opts := &slog.HandlerOptions{Level: slog.LevelError}
	handler := slog.NewTextHandler(io.Discard, opts)
	return slog.New(handler)
}
|
||
|
||
// makeJobs builds n pending renewal jobs with deterministic IDs.
|
||
func makeJobs(n int) []*domain.Job {
|
||
jobs := make([]*domain.Job, n)
|
||
for i := 0; i < n; i++ {
|
||
jobs[i] = &domain.Job{
|
||
ID: "job-" + strconv.Itoa(i),
|
||
Type: domain.JobTypeRenewal,
|
||
Status: domain.JobStatusPending,
|
||
}
|
||
}
|
||
return jobs
|
||
}
|
||
|
||
// TestBoundedFanOut_CapHolds is the primary regression guard for the
|
||
// audit's #9 blocker. It runs 50 jobs through a fan-out with cap=5,
|
||
// where each "job" sleeps 50ms to ensure several dispatchers are
|
||
// in-flight simultaneously, and asserts that the peak in-flight count
|
||
// never exceeded the cap. Pre-fix the renewal fan-out had no cap, so
|
||
// this test would have observed peak in-flight = 50.
|
||
func TestBoundedFanOut_CapHolds(t *testing.T) {
|
||
const (
|
||
capN = 5
|
||
totalJobs = 50
|
||
workSleep = 50 * time.Millisecond
|
||
hardBudget = 30 * time.Second // generous; cap=5 + 50 jobs * 50ms ≈ 500ms
|
||
)
|
||
|
||
jobs := makeJobs(totalJobs)
|
||
|
||
var inFlight atomic.Int64
|
||
var peak atomic.Int64
|
||
|
||
work := func(ctx context.Context, job *domain.Job) error {
|
||
now := inFlight.Add(1)
|
||
// Lock-free max via CompareAndSwap loop. Avoids a mutex on the
|
||
// hot path which would itself constrain concurrency and
|
||
// invalidate the measurement.
|
||
for {
|
||
cur := peak.Load()
|
||
if now <= cur {
|
||
break
|
||
}
|
||
if peak.CompareAndSwap(cur, now) {
|
||
break
|
||
}
|
||
}
|
||
time.Sleep(workSleep)
|
||
inFlight.Add(-1)
|
||
return nil
|
||
}
|
||
|
||
ctx, cancel := context.WithTimeout(context.Background(), hardBudget)
|
||
defer cancel()
|
||
if err := boundedFanOut(ctx, jobs, capN, work, quietLogger()); err != nil {
|
||
t.Fatalf("boundedFanOut returned error: %v", err)
|
||
}
|
||
|
||
if got := peak.Load(); got > int64(capN) {
|
||
t.Errorf("peak in-flight count exceeded the cap: got %d, cap %d", got, capN)
|
||
}
|
||
// Sanity: the cap should actually be reached at least once with
|
||
// 50 jobs × 50ms sleep — if it isn't, either the workload is too
|
||
// short or the gate is broken in a way that caps below the
|
||
// intended value.
|
||
if got := peak.Load(); got < int64(capN) {
|
||
t.Errorf("peak in-flight count never reached the cap: got %d, cap %d (workload too short or gate broken low?)", got, capN)
|
||
}
|
||
}
|
||
|
||
// TestBoundedFanOut_AllJobsRun pins that every (non-skipped) job is
|
||
// actually dispatched — the cap should add backpressure, not drop
|
||
// jobs. Counterpart to TestBoundedFanOut_CapHolds: that test asserts
|
||
// the upper bound; this one asserts the lower bound.
|
||
func TestBoundedFanOut_AllJobsRun(t *testing.T) {
|
||
const capN = 3
|
||
jobs := makeJobs(20)
|
||
|
||
var dispatched atomic.Int64
|
||
work := func(ctx context.Context, job *domain.Job) error {
|
||
dispatched.Add(1)
|
||
return nil
|
||
}
|
||
|
||
if err := boundedFanOut(context.Background(), jobs, capN, work, quietLogger()); err != nil {
|
||
t.Fatalf("boundedFanOut returned error: %v", err)
|
||
}
|
||
|
||
if got := dispatched.Load(); got != int64(len(jobs)) {
|
||
t.Errorf("expected all %d jobs to be dispatched, got %d", len(jobs), got)
|
||
}
|
||
}
|
||
|
||
// TestBoundedFanOut_SkipsAgentRoutedDeployments pins the
|
||
// shouldSkipJob contract: deployment jobs with a non-empty AgentID
|
||
// belong to the agent's GetPendingWork path, so the server-side
|
||
// fan-out must skip them. boundedFanOut's behaviour here matches the
|
||
// pre-audit-#9 sequential loop's behaviour exactly.
|
||
func TestBoundedFanOut_SkipsAgentRoutedDeployments(t *testing.T) {
|
||
agentID := "agent-1"
|
||
jobs := []*domain.Job{
|
||
{ID: "j1", Type: domain.JobTypeRenewal, Status: domain.JobStatusPending},
|
||
{ID: "j2", Type: domain.JobTypeDeployment, Status: domain.JobStatusPending, AgentID: &agentID},
|
||
{ID: "j3", Type: domain.JobTypeIssuance, Status: domain.JobStatusPending},
|
||
}
|
||
|
||
var seen atomic.Int64
|
||
// seenIDs is appended from multiple worker goroutines; the
|
||
// surrounding mutex serialises the slice mutation so the
|
||
// race detector stays clean. (atomic.Int64 alone isn't
|
||
// sufficient: the slice header is the racing memory.)
|
||
var (
|
||
mu sync.Mutex
|
||
seenIDs []string
|
||
)
|
||
work := func(ctx context.Context, job *domain.Job) error {
|
||
seen.Add(1)
|
||
mu.Lock()
|
||
seenIDs = append(seenIDs, job.ID)
|
||
mu.Unlock()
|
||
return nil
|
||
}
|
||
|
||
if err := boundedFanOut(context.Background(), jobs, 5, work, quietLogger()); err != nil {
|
||
t.Fatalf("boundedFanOut returned error: %v", err)
|
||
}
|
||
|
||
if got := seen.Load(); got != 2 {
|
||
mu.Lock()
|
||
t.Errorf("expected 2 jobs to run (renewal + issuance, deployment-with-agent skipped), got %d (ids=%v)", got, seenIDs)
|
||
mu.Unlock()
|
||
}
|
||
}
|
||
|
||
// TestBoundedFanOut_CtxCancelInterrupts pins that ctx cancellation
|
||
// during a long-running fan-out interrupts the dispatch loop. Without
|
||
// the ctx-aware Acquire (audit prompt's "anti-pattern: channel-based
|
||
// semaphore without ctx-aware acquire"), this test would hang the
|
||
// scheduler indefinitely on a stuck CA call.
|
||
func TestBoundedFanOut_CtxCancelInterrupts(t *testing.T) {
|
||
jobs := makeJobs(100)
|
||
|
||
work := func(ctx context.Context, job *domain.Job) error {
|
||
// Work that respects ctx — sleeps until ctx done or 5s elapsed.
|
||
select {
|
||
case <-ctx.Done():
|
||
return ctx.Err()
|
||
case <-time.After(5 * time.Second):
|
||
return nil
|
||
}
|
||
}
|
||
|
||
ctx, cancel := context.WithCancel(context.Background())
|
||
|
||
// Cancel after 100ms so the fan-out aborts mid-stream.
|
||
go func() {
|
||
time.Sleep(100 * time.Millisecond)
|
||
cancel()
|
||
}()
|
||
|
||
start := time.Now()
|
||
err := boundedFanOut(ctx, jobs, 3, work, quietLogger())
|
||
elapsed := time.Since(start)
|
||
|
||
if err != nil {
|
||
t.Fatalf("boundedFanOut should not propagate the ctx error from work; got %v", err)
|
||
}
|
||
// Even with ctx cancel, the function returns nil because the
|
||
// loop exits via the Acquire-cancel branch (logged warn) and
|
||
// the Wait drains in-flight goroutines. Total elapsed should be
|
||
// well under the 5s "stuck CA" cap if the cancel actually
|
||
// interrupted the dispatch.
|
||
if elapsed > 6*time.Second {
|
||
t.Errorf("ctx cancel did not interrupt fan-out: elapsed=%v, expected <6s", elapsed)
|
||
}
|
||
}
|
||
|
||
// TestBoundedFanOut_FailedJobsCounted pins that errors from `work`
|
||
// don't cause the fan-out to abort — the failed counter increments
|
||
// and the loop continues. Jobs are independent; one cert failing
|
||
// shouldn't block the rest.
|
||
func TestBoundedFanOut_FailedJobsCounted(t *testing.T) {
|
||
const totalJobs = 10
|
||
jobs := makeJobs(totalJobs)
|
||
|
||
var dispatched atomic.Int64
|
||
failEvery := 3 // jobs 0, 3, 6, 9 fail
|
||
work := func(ctx context.Context, job *domain.Job) error {
|
||
idx, _ := strconv.Atoi(job.ID[len("job-"):])
|
||
dispatched.Add(1)
|
||
if idx%failEvery == 0 {
|
||
return fmt.Errorf("simulated failure for %s", job.ID)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
if err := boundedFanOut(context.Background(), jobs, 4, work, quietLogger()); err != nil {
|
||
t.Fatalf("boundedFanOut should swallow per-job errors; got %v", err)
|
||
}
|
||
|
||
if got := dispatched.Load(); got != int64(totalJobs) {
|
||
t.Errorf("expected all %d jobs dispatched even with failures, got %d", totalJobs, got)
|
||
}
|
||
}
|
||
|
||
// TestSetRenewalConcurrency_NormalizesNonPositive pins the ≤0 → 1
|
||
// fail-safe in SetRenewalConcurrency. semaphore.NewWeighted(0)
|
||
// constructs a semaphore that blocks every Acquire forever; the
|
||
// normalization prevents a misconfigured env var from wedging the
|
||
// scheduler.
|
||
func TestSetRenewalConcurrency_NormalizesNonPositive(t *testing.T) {
|
||
cases := []struct {
|
||
in int
|
||
want int
|
||
}{
|
||
{-100, 1},
|
||
{-1, 1},
|
||
{0, 1},
|
||
{1, 1},
|
||
{25, 25},
|
||
{1000, 1000},
|
||
}
|
||
for _, tc := range cases {
|
||
t.Run(strconv.Itoa(tc.in), func(t *testing.T) {
|
||
s := &JobService{}
|
||
s.SetRenewalConcurrency(tc.in)
|
||
if s.renewalConcurrency != tc.want {
|
||
t.Errorf("SetRenewalConcurrency(%d) -> renewalConcurrency=%d, want %d", tc.in, s.renewalConcurrency, tc.want)
|
||
}
|
||
})
|
||
}
|
||
}
|