Files
certctl/internal/service/job_concurrency_test.go
T
shankar0123 475421457f fix(test): TestBoundedFanOut_SkipsAgentRoutedDeployments race on seenIDs slice
CI race detector flagged TestBoundedFanOut_SkipsAgentRoutedDeployments
on commit 35e18bf (audit fix #9). The test's `work` closure was
appending to a plain []string slice from worker goroutines without
synchronisation:

    var seenIDs []string
    work := func(ctx context.Context, job *domain.Job) error {
        seen.Add(1)
        seenIDs = append(seenIDs, job.ID)  // race
        return nil
    }

atomic.Int64 made the count assertion race-free, but the slice header
itself is the racing memory — the race detector caught both the
read+write race on the slice header and the runtime.growslice path on
append.

Fix: protect seenIDs with a sync.Mutex. The slice is only consumed in
the failure-message branch (`t.Errorf` ids=%v formatting), so lock
contention is irrelevant to performance — the mutex is there purely
for correctness.

Also locked around the read in the t.Errorf format-args evaluation.
That read happens AFTER boundedFanOut returns, and the Wait() inside
boundedFanOut already synchronizes with the worker goroutines, so the
lock is not strictly required; the explicit Lock/Unlock makes the
synchronisation visible instead of depending on the implicit
happens-before from Wait.

The other five tests in the file (TestBoundedFanOut_CapHolds,
_AllJobsRun, _CtxCancelInterrupts, _FailedJobsCounted,
TestSetRenewalConcurrency_NormalizesNonPositive) only mutate
atomic.Int64 counters from worker goroutines, so they were
already race-clean.

Verified locally: go test -race -count=1 -run
'TestBoundedFanOut|TestSetRenewalConcurrency' ./internal/service/...
green.
2026-05-02 14:34:48 +00:00

264 lines
8.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package service
// Audit fix #9 — bounded scheduler concurrency tests.
//
// boundedFanOut is the load-bearing primitive that caps the number of
// concurrent renewal/issuance/deployment goroutines per scheduler tick.
// Production wiring in cmd/server/main.go calls
// SetRenewalConcurrency(cfg.Scheduler.RenewalConcurrency) (default 25);
// these tests pin the cap behaviour directly against boundedFanOut so
// they don't have to stand up the full renewal/deployment service
// graph just to assert "the cap holds."
import (
"context"
"fmt"
"io"
"log/slog"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/shankar0123/certctl/internal/domain"
)
// quietLogger builds a logger for the boundedFanOut tests whose output
// goes to io.Discard. The handler's minimum level is Error, so
// info-level lines for every dispatched job are dropped before they are
// even formatted, and anything at Error or above is written nowhere.
func quietLogger() *slog.Logger {
	opts := &slog.HandlerOptions{Level: slog.LevelError}
	handler := slog.NewTextHandler(io.Discard, opts)
	return slog.New(handler)
}
// makeJobs returns n pending renewal jobs with deterministic IDs
// "job-0", "job-1", …, "job-<n-1>", in index order, so tests can
// recover a job's index by parsing it back out of the ID.
func makeJobs(n int) []*domain.Job {
	jobs := make([]*domain.Job, n)
	for idx := range jobs {
		jobs[idx] = &domain.Job{
			ID:     "job-" + strconv.Itoa(idx),
			Type:   domain.JobTypeRenewal,
			Status: domain.JobStatusPending,
		}
	}
	return jobs
}
// TestBoundedFanOut_CapHolds is the primary regression guard for the
// audit's #9 blocker. It runs 50 jobs through a fan-out with cap=5,
// where each "job" sleeps 50ms to ensure several dispatchers are
// in-flight simultaneously, and asserts that the peak in-flight count
// never exceeded the cap. Pre-fix the renewal fan-out had no cap, so
// this test would have observed peak in-flight = 50.
func TestBoundedFanOut_CapHolds(t *testing.T) {
const (
capN = 5
totalJobs = 50
workSleep = 50 * time.Millisecond
hardBudget = 30 * time.Second // generous; cap=5 + 50 jobs * 50ms ≈ 500ms
)
jobs := makeJobs(totalJobs)
var inFlight atomic.Int64
var peak atomic.Int64
work := func(ctx context.Context, job *domain.Job) error {
now := inFlight.Add(1)
// Lock-free max via CompareAndSwap loop. Avoids a mutex on the
// hot path which would itself constrain concurrency and
// invalidate the measurement.
for {
cur := peak.Load()
if now <= cur {
break
}
if peak.CompareAndSwap(cur, now) {
break
}
}
time.Sleep(workSleep)
inFlight.Add(-1)
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), hardBudget)
defer cancel()
if err := boundedFanOut(ctx, jobs, capN, work, quietLogger()); err != nil {
t.Fatalf("boundedFanOut returned error: %v", err)
}
if got := peak.Load(); got > int64(capN) {
t.Errorf("peak in-flight count exceeded the cap: got %d, cap %d", got, capN)
}
// Sanity: the cap should actually be reached at least once with
// 50 jobs × 50ms sleep — if it isn't, either the workload is too
// short or the gate is broken in a way that caps below the
// intended value.
if got := peak.Load(); got < int64(capN) {
t.Errorf("peak in-flight count never reached the cap: got %d, cap %d (workload too short or gate broken low?)", got, capN)
}
}
// TestBoundedFanOut_AllJobsRun is the lower-bound counterpart to
// TestBoundedFanOut_CapHolds. With a cap of 3 and 20 jobs, the number
// of work invocations must equal the number of (non-skipped) jobs: the
// cap adds backpressure, it never drops jobs.
func TestBoundedFanOut_AllJobsRun(t *testing.T) {
	const capN = 3
	jobs := makeJobs(20)

	var calls atomic.Int64
	countCall := func(context.Context, *domain.Job) error {
		calls.Add(1)
		return nil
	}

	err := boundedFanOut(context.Background(), jobs, capN, countCall, quietLogger())
	if err != nil {
		t.Fatalf("boundedFanOut returned error: %v", err)
	}

	want := int64(len(jobs))
	if got := calls.Load(); got != want {
		t.Errorf("expected all %d jobs to be dispatched, got %d", len(jobs), got)
	}
}
// TestBoundedFanOut_SkipsAgentRoutedDeployments pins the
// shouldSkipJob contract: deployment jobs with a non-empty AgentID
// belong to the agent's GetPendingWork path, so the server-side
// fan-out must skip them. boundedFanOut's behaviour here matches the
// pre-audit-#9 sequential loop's behaviour exactly.
//
// Checking only the count (2) is not enough: a regression that skipped
// the wrong job would still dispatch two jobs. The test therefore also
// checks WHICH jobs ran — j1 and j3 must run, j2 must not.
func TestBoundedFanOut_SkipsAgentRoutedDeployments(t *testing.T) {
	agentID := "agent-1"
	jobs := []*domain.Job{
		{ID: "j1", Type: domain.JobTypeRenewal, Status: domain.JobStatusPending},
		{ID: "j2", Type: domain.JobTypeDeployment, Status: domain.JobStatusPending, AgentID: &agentID},
		{ID: "j3", Type: domain.JobTypeIssuance, Status: domain.JobStatusPending},
	}
	var seen atomic.Int64
	// seenIDs is appended from multiple worker goroutines. The mutex
	// serialises the slice mutation so the race detector stays clean;
	// atomic.Int64 alone isn't sufficient because the slice header is
	// the racing memory.
	var (
		mu      sync.Mutex
		seenIDs []string
	)
	work := func(ctx context.Context, job *domain.Job) error {
		seen.Add(1)
		mu.Lock()
		seenIDs = append(seenIDs, job.ID)
		mu.Unlock()
		return nil
	}
	if err := boundedFanOut(context.Background(), jobs, 5, work, quietLogger()); err != nil {
		t.Fatalf("boundedFanOut returned error: %v", err)
	}

	// The workers have finished by now (boundedFanOut waited for them).
	// The lock makes that synchronisation explicit for the reads below.
	mu.Lock()
	defer mu.Unlock()

	if got := seen.Load(); got != 2 {
		t.Errorf("expected 2 jobs to run (renewal + issuance, deployment-with-agent skipped), got %d (ids=%v)", got, seenIDs)
	}

	ran := make(map[string]bool, len(seenIDs))
	for _, id := range seenIDs {
		ran[id] = true
	}
	if ran["j2"] {
		t.Errorf("agent-routed deployment job j2 must be skipped by the server-side fan-out, but it ran (ids=%v)", seenIDs)
	}
	for _, id := range []string{"j1", "j3"} {
		if !ran[id] {
			t.Errorf("expected job %s to run, but it did not (ids=%v)", id, seenIDs)
		}
	}
}
// TestBoundedFanOut_CtxCancelInterrupts pins that ctx cancellation
// during a long-running fan-out interrupts the dispatch loop. Without
// the ctx-aware Acquire (audit prompt's "anti-pattern: channel-based
// semaphore without ctx-aware acquire"), this test would hang the
// scheduler indefinitely on a stuck CA call.
//
// Each job blocks for up to stuckCA unless its ctx is cancelled, so the
// elapsed-time bound must sit well below stuckCA. The previous bound
// (6s) was above it. A regression where the in-flight wave never
// observed cancellation, and so ran its full 5s, would still have
// passed.
func TestBoundedFanOut_CtxCancelInterrupts(t *testing.T) {
	const (
		jobCount    = 100
		capN        = 3
		cancelAfter = 100 * time.Millisecond
		stuckCA     = 5 * time.Second
		maxElapsed  = 2 * time.Second
	)
	jobs := makeJobs(jobCount)
	work := func(ctx context.Context, job *domain.Job) error {
		// Work that respects ctx — blocks until ctx is done or stuckCA
		// elapses. An explicit timer, stopped on return, avoids leaving
		// one pending timer behind per cancelled job.
		timer := time.NewTimer(stuckCA)
		defer timer.Stop()
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-timer.C:
			return nil
		}
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// Cancel shortly after the fan-out starts so it aborts mid-stream.
	time.AfterFunc(cancelAfter, cancel)

	start := time.Now()
	err := boundedFanOut(ctx, jobs, capN, work, quietLogger())
	elapsed := time.Since(start)
	if err != nil {
		t.Fatalf("boundedFanOut should not propagate the ctx error from work; got %v", err)
	}
	// Even with ctx cancel, the function returns nil: the loop exits via
	// the Acquire-cancel branch (logged warn) and the Wait drains
	// in-flight goroutines. Those goroutines must themselves be
	// interrupted, hence a bound well under stuckCA.
	if elapsed > maxElapsed {
		t.Errorf("ctx cancel did not interrupt fan-out: elapsed=%v, expected <%v", elapsed, maxElapsed)
	}
}
// TestBoundedFanOut_FailedJobsCounted pins that errors from `work`
// don't cause the fan-out to abort — the failed counter increments
// and the loop continues. Jobs are independent; one cert failing
// shouldn't block the rest. boundedFanOut must also swallow the
// per-job errors and return nil.
//
// The test also counts the failures it simulates. Without that, a
// broken index parse could silently change how many jobs fail without
// the test noticing.
func TestBoundedFanOut_FailedJobsCounted(t *testing.T) {
	const (
		totalJobs = 10
		failEvery = 3 // jobs 0, 3, 6, 9 fail
	)
	jobs := makeJobs(totalJobs)
	var dispatched, failed atomic.Int64
	work := func(ctx context.Context, job *domain.Job) error {
		dispatched.Add(1)
		idx, err := strconv.Atoi(job.ID[len("job-"):])
		if err != nil {
			// Errorf (unlike Fatalf) is safe to call from a worker goroutine.
			t.Errorf("cannot parse job index from ID %q: %v", job.ID, err)
			return err
		}
		if idx%failEvery == 0 {
			failed.Add(1)
			return fmt.Errorf("simulated failure for %s", job.ID)
		}
		return nil
	}
	if err := boundedFanOut(context.Background(), jobs, 4, work, quietLogger()); err != nil {
		t.Fatalf("boundedFanOut should swallow per-job errors; got %v", err)
	}
	if got := dispatched.Load(); got != int64(totalJobs) {
		t.Errorf("expected all %d jobs dispatched even with failures, got %d", totalJobs, got)
	}
	// The indices in [0, totalJobs) divisible by failEvery number
	// ceil(totalJobs / failEvery), i.e. 4 here.
	const wantFailed = (totalJobs + failEvery - 1) / failEvery
	if got := failed.Load(); got != wantFailed {
		t.Errorf("expected %d simulated failures, got %d", wantFailed, got)
	}
}
// TestSetRenewalConcurrency_NormalizesNonPositive pins the ≤0 → 1
// fail-safe in SetRenewalConcurrency. semaphore.NewWeighted(0) yields a
// semaphore that blocks every Acquire forever, so the normalization
// keeps a misconfigured env var from wedging the scheduler. Positive
// values must pass through unchanged.
func TestSetRenewalConcurrency_NormalizesNonPositive(t *testing.T) {
	for _, tc := range []struct{ in, want int }{
		{in: -100, want: 1},
		{in: -1, want: 1},
		{in: 0, want: 1},
		{in: 1, want: 1},
		{in: 25, want: 25},
		{in: 1000, want: 1000},
	} {
		t.Run(strconv.Itoa(tc.in), func(t *testing.T) {
			svc := &JobService{}
			svc.SetRenewalConcurrency(tc.in)
			if got := svc.renewalConcurrency; got != tc.want {
				t.Errorf("SetRenewalConcurrency(%d) -> renewalConcurrency=%d, want %d", tc.in, got, tc.want)
			}
		})
	}
}