mirror of
https://github.com/shankar0123/certctl.git
synced 2026-06-07 15:01:32 +00:00
8b75e0311b
Mechanical sed across the main go.mod's module declaration, the f5-mock-icontrol
sub-module's go.mod, every Go file's import path (361 files), and a rebuild of
the checked-in f5-mock-icontrol binary so its embedded build-info reflects the
new module path. No behavior change.
Choice B from cowork/transfer-certctl-to-org.md, executed 2026-05-04. Choice A
(keep module path declared as github.com/shankar0123/certctl regardless of
repo URL) shipped on the day of the org transfer (2026-05-03) since we had no
external Go consumers; this commit closes that deferral.
Backward-compat: GitHub HTTP redirects continue to forward
github.com/shankar0123/certctl → github.com/certctl-io/certctl at the URL
level, but Go's module proxy uses the path declared in go.mod as the
canonical name. Pre-fix, anyone trying `go get github.com/certctl-io/certctl/...`
hit a "module path mismatch" error because go.mod said
github.com/shankar0123/certctl and the URL they fetched it from said
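certctl-io/certctl. Post-fix, the canonical name and the URL agree, so
go get / go install / external Go consumers / Go-tooling integrations
work cleanly via the new path. The old path does not resolve for versions
at or after this commit. GitHub still redirects the source fetch, but the
go.mod found there declares github.com/certctl-io/certctl, so Go reports
the same module path mismatch in the opposite direction.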
Anyone still importing the old path must migrate. They need to update the
`require` line in their go.mod to github.com/certctl-io/certctl and
rewrite their import statements to match; a mass sed is sufficient. This is
required because the module path declared in the dependency's own go.mod is
the authoritative import name. No external consumers exist today.
Diff shape:
361 *.go files — import path replacement only
2 go.mod — module declaration replacement only
1 binary — deploy/test/f5-mock-icontrol/f5-mock-icontrol rebuilt
so embedded build-info reflects the new path (8618965 vs
8618933 bytes; 32-byte diff is the build-info change)
Total: 364 files, 730 insertions / 730 deletions, net-zero size, pure
mechanical substitution.
Verification:
gofmt: 17 files needed re-alignment after sed (the new path is one char
shorter than the old, so column-aligned import groups drifted). Applied
`gofmt -w` to fix.
go mod tidy: clean exit on both modules.
go vet ./...: clean exit.
go build ./...: clean exit.
go test -short -count=1 on representative packages: all green
(internal/domain, internal/validation, internal/crypto, internal/crypto/signer,
cmd/agent). Test output now reads `ok github.com/certctl-io/certctl/...`
confirming the module path resolves correctly.
binary: f5-mock-icontrol rebuilt; `strings | grep shankar0123` returns
nothing; `strings | grep certctl-io/certctl` shows the new module path
embedded in build-info.
Files intentionally NOT touched in this commit:
README.md / CHANGELOG.md / docs/ / etc. — already swept to certctl-io
URLs in commit 0729ee4 (the post-transfer URL refresh). This commit is
purely the Go-tooling layer.
Scarf pixels (`shankar0123.docker.scarf.sh/...`) — Scarf-account
namespace, not a Go import or GitHub repo URL. Stays.
This is a non-blocking, non-customer-impacting change. Operators pulling
container images, running `make verify`, hitting the API, or installing the
agent see no functional difference. Only Go-tooling consumers (none today)
are affected, and they're enabled — not broken — by this commit.
264 lines
8.3 KiB
Go
264 lines
8.3 KiB
Go
package service
|
||
|
||
// Audit fix #9 — bounded scheduler concurrency tests.
|
||
//
|
||
// boundedFanOut is the load-bearing primitive that caps the number of
|
||
// concurrent renewal/issuance/deployment goroutines per scheduler tick.
|
||
// Production wiring in cmd/server/main.go calls
|
||
// SetRenewalConcurrency(cfg.Scheduler.RenewalConcurrency) (default 25);
|
||
// these tests pin the cap behaviour directly against boundedFanOut so
|
||
// they don't have to stand up the full renewal/deployment service
|
||
// graph just to assert "the cap holds."
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"io"
|
||
"log/slog"
|
||
"strconv"
|
||
"sync"
|
||
"sync/atomic"
|
||
"testing"
|
||
"time"
|
||
|
||
"github.com/certctl-io/certctl/internal/domain"
|
||
)
|
||
|
||
// quietLogger returns a logger that only passes error-level records and
// writes even those to io.Discard. This keeps the test runner's output free
// of the info-level lines emitted for every dispatched job.
func quietLogger() *slog.Logger {
	opts := &slog.HandlerOptions{Level: slog.LevelError}
	handler := slog.NewTextHandler(io.Discard, opts)
	return slog.New(handler)
}
|
||
|
||
// makeJobs returns n pending renewal jobs with IDs "job-0" through
// "job-<n-1>". The IDs are deterministic, so a test can recover a job's
// index from its ID.
func makeJobs(n int) []*domain.Job {
	out := make([]*domain.Job, n)
	for idx := range out {
		out[idx] = &domain.Job{
			ID:     "job-" + strconv.Itoa(idx),
			Type:   domain.JobTypeRenewal,
			Status: domain.JobStatusPending,
		}
	}
	return out
}
|
||
|
||
// TestBoundedFanOut_CapHolds is the primary regression guard for audit
// blocker #9. It pushes 50 jobs through a fan-out limited to 5 concurrent
// workers. Each job sleeps 50ms so that several are in flight at once, and
// the test records the highest concurrency it ever observes.
//
// That peak must equal the cap exactly. Above the cap means the gate leaks;
// below it means the gate throttles harder than configured. Before the fix
// the renewal fan-out was unbounded, so the peak would have been 50.
func TestBoundedFanOut_CapHolds(t *testing.T) {
	const (
		capN       = 5
		totalJobs  = 50
		workSleep  = 50 * time.Millisecond
		hardBudget = 30 * time.Second // generous: 50 jobs / 5 workers = 10 waves × 50ms ≈ 500ms
	)

	var active, highWater atomic.Int64

	// recordPeak raises highWater to n when n is larger. It uses a
	// compare-and-swap loop instead of a mutex so that taking the
	// measurement does not itself serialise the workers.
	recordPeak := func(n int64) {
		for cur := highWater.Load(); n > cur; cur = highWater.Load() {
			if highWater.CompareAndSwap(cur, n) {
				return
			}
		}
	}

	work := func(ctx context.Context, job *domain.Job) error {
		recordPeak(active.Add(1))
		time.Sleep(workSleep)
		active.Add(-1)
		return nil
	}

	ctx, cancel := context.WithTimeout(context.Background(), hardBudget)
	defer cancel()

	jobs := makeJobs(totalJobs)
	if err := boundedFanOut(ctx, jobs, capN, work, quietLogger()); err != nil {
		t.Fatalf("boundedFanOut returned error: %v", err)
	}

	peak := highWater.Load()
	if peak > int64(capN) {
		t.Errorf("peak in-flight count exceeded the cap: got %d, cap %d", peak, capN)
	}
	// Sanity check: with 10 full waves of 50ms jobs, the cap should be hit
	// at least once. If it is not, the workload is too short or the gate
	// caps below its configured value.
	if peak < int64(capN) {
		t.Errorf("peak in-flight count never reached the cap: got %d, cap %d (workload too short or gate broken low?)", peak, capN)
	}
}
|
||
|
||
// TestBoundedFanOut_AllJobsRun pins that every (non-skipped) job is
// dispatched exactly once: the cap should add backpressure, not drop or
// duplicate jobs. It is the counterpart to TestBoundedFanOut_CapHolds,
// which asserts the upper bound on concurrency; this test asserts
// completeness.
//
// A bare dispatch counter cannot tell "all 20 ran once" apart from "one ran
// twice and another never ran", so per-ID counts are tracked as well.
func TestBoundedFanOut_AllJobsRun(t *testing.T) {
	const capN = 3
	jobs := makeJobs(20)

	var (
		dispatched atomic.Int64
		mu         sync.Mutex // guards perID; work runs on multiple goroutines
		perID      = make(map[string]int, len(jobs))
	)
	work := func(ctx context.Context, job *domain.Job) error {
		dispatched.Add(1)
		mu.Lock()
		perID[job.ID]++
		mu.Unlock()
		return nil
	}

	if err := boundedFanOut(context.Background(), jobs, capN, work, quietLogger()); err != nil {
		t.Fatalf("boundedFanOut returned error: %v", err)
	}

	if got := dispatched.Load(); got != int64(len(jobs)) {
		t.Errorf("expected all %d jobs to be dispatched, got %d", len(jobs), got)
	}

	mu.Lock()
	defer mu.Unlock()
	for _, job := range jobs {
		if n := perID[job.ID]; n != 1 {
			t.Errorf("job %s dispatched %d times, want exactly 1", job.ID, n)
		}
	}
}
|
||
|
||
// TestBoundedFanOut_SkipsAgentRoutedDeployments pins the shouldSkipJob
// contract. Deployment jobs with a non-empty AgentID belong to the agent's
// GetPendingWork path, so the server-side fan-out must skip them.
// boundedFanOut's behaviour here matches the pre-audit-#9 sequential loop's
// behaviour exactly.
//
// The test checks which jobs ran, not just how many. A count of 2 alone
// would also pass if the wrong job were skipped (for example, the renewal
// instead of the agent-routed deployment).
func TestBoundedFanOut_SkipsAgentRoutedDeployments(t *testing.T) {
	agentID := "agent-1"
	jobs := []*domain.Job{
		{ID: "j1", Type: domain.JobTypeRenewal, Status: domain.JobStatusPending},
		{ID: "j2", Type: domain.JobTypeDeployment, Status: domain.JobStatusPending, AgentID: &agentID},
		{ID: "j3", Type: domain.JobTypeIssuance, Status: domain.JobStatusPending},
	}

	// seenIDs is appended from multiple worker goroutines. The mutex
	// serialises the slice mutation so the race detector stays clean.
	var (
		mu      sync.Mutex
		seenIDs []string
	)
	work := func(ctx context.Context, job *domain.Job) error {
		mu.Lock()
		defer mu.Unlock()
		seenIDs = append(seenIDs, job.ID)
		return nil
	}

	if err := boundedFanOut(context.Background(), jobs, 5, work, quietLogger()); err != nil {
		t.Fatalf("boundedFanOut returned error: %v", err)
	}

	mu.Lock()
	defer mu.Unlock()

	if got := len(seenIDs); got != 2 {
		t.Errorf("expected 2 jobs to run (renewal + issuance, deployment-with-agent skipped), got %d (ids=%v)", got, seenIDs)
	}

	ran := make(map[string]bool, len(seenIDs))
	for _, id := range seenIDs {
		ran[id] = true
	}
	for _, id := range []string{"j1", "j3"} {
		if !ran[id] {
			t.Errorf("expected job %s to run (ids=%v)", id, seenIDs)
		}
	}
	if ran["j2"] {
		t.Errorf("agent-routed deployment j2 should have been skipped (ids=%v)", seenIDs)
	}
}
|
||
|
||
// TestBoundedFanOut_CtxCancelInterrupts pins that ctx cancellation
|
||
// during a long-running fan-out interrupts the dispatch loop. Without
|
||
// the ctx-aware Acquire (audit prompt's "anti-pattern: channel-based
|
||
// semaphore without ctx-aware acquire"), this test would hang the
|
||
// scheduler indefinitely on a stuck CA call.
|
||
func TestBoundedFanOut_CtxCancelInterrupts(t *testing.T) {
|
||
jobs := makeJobs(100)
|
||
|
||
work := func(ctx context.Context, job *domain.Job) error {
|
||
// Work that respects ctx — sleeps until ctx done or 5s elapsed.
|
||
select {
|
||
case <-ctx.Done():
|
||
return ctx.Err()
|
||
case <-time.After(5 * time.Second):
|
||
return nil
|
||
}
|
||
}
|
||
|
||
ctx, cancel := context.WithCancel(context.Background())
|
||
|
||
// Cancel after 100ms so the fan-out aborts mid-stream.
|
||
go func() {
|
||
time.Sleep(100 * time.Millisecond)
|
||
cancel()
|
||
}()
|
||
|
||
start := time.Now()
|
||
err := boundedFanOut(ctx, jobs, 3, work, quietLogger())
|
||
elapsed := time.Since(start)
|
||
|
||
if err != nil {
|
||
t.Fatalf("boundedFanOut should not propagate the ctx error from work; got %v", err)
|
||
}
|
||
// Even with ctx cancel, the function returns nil because the
|
||
// loop exits via the Acquire-cancel branch (logged warn) and
|
||
// the Wait drains in-flight goroutines. Total elapsed should be
|
||
// well under the 5s "stuck CA" cap if the cancel actually
|
||
// interrupted the dispatch.
|
||
if elapsed > 6*time.Second {
|
||
t.Errorf("ctx cancel did not interrupt fan-out: elapsed=%v, expected <6s", elapsed)
|
||
}
|
||
}
|
||
|
||
// TestBoundedFanOut_FailedJobsCounted pins that errors returned from `work`
// do not abort the fan-out: failed jobs are counted and the loop continues.
// Jobs are independent, so one cert failing shouldn't block the rest.
//
// The test also counts its own simulated failures and asserts the expected
// number. Without that check, a change to makeJobs' ID format could silently
// stop exercising the failure path, and the test would pass vacuously.
func TestBoundedFanOut_FailedJobsCounted(t *testing.T) {
	const (
		totalJobs = 10
		failEvery = 3                                   // jobs 0, 3, 6, 9 fail
		wantFails = (totalJobs + failEvery - 1) / failEvery // ceil(10/3) = 4
	)
	jobs := makeJobs(totalJobs)

	var dispatched, failed atomic.Int64
	work := func(ctx context.Context, job *domain.Job) error {
		dispatched.Add(1)
		// makeJobs IDs are "job-<index>". A parse failure is a test-setup
		// bug, so report it rather than letting it masquerade as job 0.
		// (t.Errorf is safe to call from worker goroutines.)
		idx, err := strconv.Atoi(job.ID[len("job-"):])
		if err != nil {
			t.Errorf("unexpected job ID %q (makeJobs format changed?): %v", job.ID, err)
			return err
		}
		if idx%failEvery == 0 {
			failed.Add(1)
			return fmt.Errorf("simulated failure for %s", job.ID)
		}
		return nil
	}

	if err := boundedFanOut(context.Background(), jobs, 4, work, quietLogger()); err != nil {
		t.Fatalf("boundedFanOut should swallow per-job errors; got %v", err)
	}

	if got := dispatched.Load(); got != int64(totalJobs) {
		t.Errorf("expected all %d jobs dispatched even with failures, got %d", totalJobs, got)
	}
	if got := failed.Load(); got != wantFails {
		t.Errorf("expected %d simulated failures to be exercised, got %d", wantFails, got)
	}
}
|
||
|
||
// TestSetRenewalConcurrency_NormalizesNonPositive pins the ≤0 → 1 fail-safe
// in SetRenewalConcurrency. A zero limit would build semaphore.NewWeighted(0),
// on which every Acquire blocks, so a misconfigured env var could wedge the
// scheduler. Positive values must pass through unchanged.
func TestSetRenewalConcurrency_NormalizesNonPositive(t *testing.T) {
	tests := []struct {
		in, want int
	}{
		{in: -100, want: 1},
		{in: -1, want: 1},
		{in: 0, want: 1},
		{in: 1, want: 1},
		{in: 25, want: 25},
		{in: 1000, want: 1000},
	}

	for _, tt := range tests {
		name := strconv.Itoa(tt.in)
		t.Run(name, func(t *testing.T) {
			svc := &JobService{}
			svc.SetRenewalConcurrency(tt.in)
			if got := svc.renewalConcurrency; got != tt.want {
				t.Errorf("SetRenewalConcurrency(%d) -> renewalConcurrency=%d, want %d", tt.in, got, tt.want)
			}
		})
	}
}
|