scheduler: bound renewal concurrency via CERTCTL_RENEWAL_CONCURRENCY

Closes the #9 acquisition-readiness blocker from the 2026-05-01 issuer
coverage audit. Pre-fix, JobService.ProcessPendingJobs ran every
claimed job sequentially in a single goroutine: safe but slow, and
operators with large fleets had no lever to dial throughput up.
Switching to fire-and-forget per-job goroutines would have left the
upstream-CA call rate unbounded and tripped DigiCert / Entrust /
Sectigo rate limits. certctl's response to a 429 was to retry on the
next tick, re-fanning out the same calls and digging deeper into the
limit. Operators need a knob.

This commit:

- Adds CERTCTL_RENEWAL_CONCURRENCY env var (default 25) loaded via
  the existing getEnvInt pattern in internal/config/config.go.
  Documented inline as the cap for the per-tick renewal/issuance/
  deployment goroutine fan-out, with operator-tuning guidance:
  permissive upstream limits + large fleets (>10k certs) → 100;
  strict limits or async-CA-heavy fleets → 25 or lower.

- Wires golang.org/x/sync/semaphore.Weighted around the per-job
  goroutine launch in JobService.ProcessPendingJobs. Acquire(ctx, 1)
  is the load-bearing piece — it BLOCKS the loop when at the cap,
  providing real backpressure rather than fire-and-forget. The
  fan-out is split into processPendingJobsSequential (legacy,
  preserved for unit-test wiring that doesn't call
  SetRenewalConcurrency) and processPendingJobsConcurrent (production,
  delegates to a generic boundedFanOut helper).

- boundedFanOut takes the per-job work as a closure so the cap can
  be tested directly without standing up the renewal/deployment
  service graph. processed/failed counters use atomic.Int64 to
  avoid mutex overhead on every job completion; final log line
  reads both AFTER wg.Wait so the counts reflect every dispatched
  job. ctx-aware Acquire ensures a shutdown ctx cancel interrupts
  the dispatch loop promptly; in-flight goroutines drain via Wait
  before the function returns so no goroutine outlives the
  scheduler tick.

- shouldSkipJob extracted as a package-private helper so the
  agent-routed-deployment skip logic is shared byte-for-byte between
  the sequential and concurrent paths.

- The audit prompt's "channel-based semaphore without ctx-aware
  acquire" anti-pattern is explicitly avoided: semaphore.Weighted.
  Acquire returns once ctx is done, whereas a bare channel send
  (sem <- struct{}{}) ignores ctx and blocks until a slot frees up.

- SetRenewalConcurrency setter on JobService normalises ≤0 to 1.
  semaphore.NewWeighted(0) constructs a semaphore whose Acquire(ctx, 1)
  can never succeed and only returns once ctx is done; the
  normalisation prevents a misconfigured env var from wedging the
  scheduler.

- cmd/server/main.go wires SetRenewalConcurrency(cfg.Scheduler.
  RenewalConcurrency) on the freshly-built jobService, immediately
  after SetAuditService. Production deployments always take the
  bounded path; tests that build JobService directly via
  NewJobService keep their strict-sequential behaviour because
  renewalConcurrency is the zero value.

- Tests in internal/service/job_concurrency_test.go:
  * TestBoundedFanOut_CapHolds — primary regression guard. 50 jobs
    × 50ms work × cap=5 → asserts peak in-flight never exceeds 5
    AND reaches 5 at least once (catches both upper-bound regressions
    and gates that incorrectly cap below the configured value).
    Lock-free max via CompareAndSwap so the measurement instrument
    doesn't itself constrain concurrency.
  * TestBoundedFanOut_AllJobsRun — lower-bound: every non-skipped
    job is dispatched.
  * TestBoundedFanOut_SkipsAgentRoutedDeployments — pins the
    shouldSkipJob contract.
  * TestBoundedFanOut_CtxCancelInterrupts — ctx cancellation
    interrupts a stuck fan-out within the timeout budget.
  * TestBoundedFanOut_FailedJobsCounted — per-job errors don't
    abort the fan-out.
  * TestSetRenewalConcurrency_NormalizesNonPositive — ≤0 → 1 fail-safe
    pinned across negative/zero/positive inputs.

- docs/features.md: scheduler-loop table augmented with the
  concurrency-cap env-var pointer alongside the job-processor row.

- docs/architecture.md: Concurrency Safety section gains a paragraph
  explaining the cap, the operator-tuning guidance, the ctx-aware
  Acquire semantics, and the audit reference.

Operator-facing impact: the first big renewal sweep no longer
takes down the upstream CA's rate-limit budget. Existing deployments
get the bounded path automatically (default 25); operators can
override via env var without code changes.

Verified locally:
- gofmt -l . clean
- go vet ./... clean
- staticcheck ./... clean
- go test -short -count=1 across service / scheduler / config /
  integration: green
- Six new tests under TestBoundedFanOut* + TestSetRenewalConcurrency*:
  green

Audit reference: cowork/issuer-coverage-audit-2026-05-01/RESULTS.md
Top-10 fix #9.
shankar0123
2026-05-02 14:12:30 +00:00
parent c2e53e1ab5
commit 4b73344acf
8 changed files with 427 additions and 9 deletions
+5
@@ -383,6 +383,11 @@ func main() {
// I-001: emit "job_retry" audit events when the scheduler resets Failed→Pending.
// SetAuditService is optional — JobService falls back to nil-guarded no-op if unwired.
jobService.SetAuditService(auditService)
// Audit fix #9: bound the per-tick goroutine fan-out so a 5k-cert
// sweep doesn't trip upstream-CA rate limits. Default 25 from
// CERTCTL_RENEWAL_CONCURRENCY; ≤0 normalised to 1 (sequential)
// inside the setter.
jobService.SetRenewalConcurrency(cfg.Scheduler.RenewalConcurrency)
agentService := service.NewAgentService(agentRepo, certificateRepo, jobRepo, targetRepo, auditService, issuerRegistry, renewalService)
agentService.SetProfileRepo(profileRepo)
issuerService := service.NewIssuerService(issuerRepo, auditService, issuerRegistry, encryptionKey, logger)
+2
@@ -1044,6 +1044,8 @@ For deployments that need JWT/OIDC/mTLS, the standard pattern is to put an authe
The background scheduler uses `sync/atomic.Bool` idempotency guards on every loop (8 always-on plus up to 4 optional) — if a tick fires while the previous iteration is still running, it skips. A `sync.WaitGroup` tracks all in-flight goroutines. `WaitForCompletion(timeout)` blocks during shutdown until all work finishes or the timeout expires, preventing state corruption from mid-flight database operations during process exit.
The job-processor tick fans the per-job work out across up to `CERTCTL_RENEWAL_CONCURRENCY` goroutines (default 25), gated by `golang.org/x/sync/semaphore.Weighted`. The cap is the operator's lever for "how many concurrent CA calls per scheduler tick": operators with permissive upstream limits and large fleets (>10k certs) can bump to 100; operators with strict limits or async-CA-heavy fleets should stay at 25 or lower. Values ≤ 0 normalise to 1 (sequential). The Acquire is ctx-aware so a shutdown-driven ctx cancel interrupts the dispatch loop promptly; in-flight goroutines drain via Wait before the tick returns. Closes the #9 acquisition-readiness blocker from the 2026-05-01 issuer coverage audit (pre-fix the job processor ran strictly sequentially; an uncapped fan-out would let a 5,000-cert sweep trip DigiCert / Entrust / Sectigo rate limits, with the next tick re-fanning out the same calls).
### Logging
All logging throughout the service layer uses Go's `log/slog` package for structured, queryable logs. This replaces ad-hoc `fmt.Printf` statements with consistent key-value logging that includes request context, operation names, and error details. Agents also implement exponential backoff on network failures to gracefully handle temporary connectivity issues with the control plane.
+1 -1
@@ -1205,7 +1205,7 @@ Single SQL `UNION` query replaces the previous "fetch all, filter in Go" approac
| Loop | Default Interval | Always-on | Env Var | Description |
|---|---|---|---|---|
| Renewal check | 1 hour | Yes | `CERTCTL_SCHEDULER_RENEWAL_CHECK_INTERVAL` | Check expiring certs, query ARI, create renewal jobs |
-| Job processor | 30 seconds | Yes | `CERTCTL_SCHEDULER_JOB_PROCESSOR_INTERVAL` | Process pending jobs |
+| Job processor | 30 seconds | Yes | `CERTCTL_SCHEDULER_JOB_PROCESSOR_INTERVAL` | Process pending jobs (concurrency cap via `CERTCTL_RENEWAL_CONCURRENCY`, default 25) |
| Job retry | 5 minutes | Yes | `CERTCTL_SCHEDULER_RETRY_INTERVAL` | Retry Failed jobs (I-001) |
| Job timeout reaper | 10 minutes | Yes | `CERTCTL_JOB_TIMEOUT_INTERVAL` (per-state thresholds: `CERTCTL_JOB_AWAITING_APPROVAL_TIMEOUT`, `CERTCTL_JOB_AWAITING_CSR_TIMEOUT`) | Fail AwaitingCSR/AwaitingApproval jobs past timeout (I-003) |
| Agent health check | 2 minutes | Yes | `CERTCTL_SCHEDULER_AGENT_HEALTH_CHECK_INTERVAL` | Check agent heartbeat staleness |
+1
@@ -17,6 +17,7 @@ require (
github.com/masterzen/winrm v0.0.0-20250927112105-5f8e6c707321
github.com/pkg/sftp v1.13.10
golang.org/x/crypto v0.45.0
golang.org/x/sync v0.18.0
software.sslmate.com/src/go-pkcs12 v0.7.0
)
+2
@@ -571,6 +571,8 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I=
golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+24 -2
@@ -1143,6 +1143,25 @@ type SchedulerConfig struct {
// Setting: CERTCTL_SCHEDULER_JOB_PROCESSOR_INTERVAL environment variable.
JobProcessorInterval time.Duration
// RenewalConcurrency caps the number of concurrent renewal/issuance/
// deployment goroutines launched per job-processor tick. Default 25 —
// high enough to make use of HTTP/1.1 connection reuse against an
// upstream CA, low enough to stay under typical per-customer rate
// limits. Operators with permissive upstream limits and large fleets
// (>10k certs) can bump to 100; operators with strict limits or
// async-CA-heavy fleets should keep at 25 or lower.
//
// Values ≤ 0 fall back to 1 (sequential). This is a fail-safe: a
// semaphore built with semaphore.NewWeighted(0) never grants an
// Acquire, so it would wedge the scheduler rather than panic.
//
// Closes the #9 acquisition-readiness blocker from the 2026-05-01
// issuer coverage audit. Pre-fix the job processor ran strictly
// sequentially; an uncapped per-job fan-out would instead have put a
// 5k-cert sweep's 5k HTTP calls in flight at once and tripped
// DigiCert/Entrust/Sectigo rate limits.
//
// Setting: CERTCTL_RENEWAL_CONCURRENCY environment variable.
RenewalConcurrency int
// AgentHealthCheckInterval is how often the scheduler checks agent heartbeats.
// Default: 2 minutes. Minimum: 1 second. Marks agents offline if no recent heartbeat.
// Setting: CERTCTL_SCHEDULER_AGENT_HEALTH_CHECK_INTERVAL environment variable.
@@ -1434,8 +1453,11 @@ func Load() (*Config, error) {
DemoSeed: getEnvBool("CERTCTL_DEMO_SEED", false),
},
Scheduler: SchedulerConfig{
RenewalCheckInterval: getEnvDuration("CERTCTL_SCHEDULER_RENEWAL_CHECK_INTERVAL", 1*time.Hour),
JobProcessorInterval: getEnvDuration("CERTCTL_SCHEDULER_JOB_PROCESSOR_INTERVAL", 30*time.Second),
// Audit fix #9 — per-tick concurrency cap on the renewal/issuance/
// deployment goroutine fan-out. ≤0 → 1 (sequential).
RenewalConcurrency: getEnvInt("CERTCTL_RENEWAL_CONCURRENCY", 25),
AgentHealthCheckInterval: getEnvDuration("CERTCTL_SCHEDULER_AGENT_HEALTH_CHECK_INTERVAL", 2*time.Minute),
NotificationProcessInterval: getEnvDuration("CERTCTL_SCHEDULER_NOTIFICATION_PROCESS_INTERVAL", 1*time.Minute),
// I-005: retry sweep for failed notifications. Mirrors RetryInterval
+141 -6
@@ -6,8 +6,12 @@ import (
"fmt"
"log/slog"
"strings"
"sync"
"sync/atomic"
"time"
"golang.org/x/sync/semaphore"
"github.com/shankar0123/certctl/internal/domain"
"github.com/shankar0123/certctl/internal/repository"
)
@@ -29,6 +33,14 @@ type JobService struct {
deploymentService *DeploymentService
auditService *AuditService
logger *slog.Logger
// renewalConcurrency caps the number of concurrent goroutines that
// ProcessPendingJobs spawns. 0 (zero-value) means "sequential" so
// existing test wiring that constructs JobService directly via
// NewJobService keeps its strict-serial behaviour. Production
// wiring calls SetRenewalConcurrency(cfg.Scheduler.RenewalConcurrency)
// to switch on the bounded fan-out. Audit fix #9.
renewalConcurrency int
}
// NewJobService creates a new job service.
@@ -56,6 +68,28 @@ func NewJobService(
}
}
// SetRenewalConcurrency wires the per-tick fan-out concurrency cap that
// ProcessPendingJobs uses. Called from cmd/server/main.go with
// cfg.Scheduler.RenewalConcurrency (default 25). Values ≤ 0 are
// normalised to 1 as a fail-safe to sequential processing:
// semaphore.NewWeighted(0) does not panic, but it constructs a
// semaphore that blocks every Acquire until ctx is done.
//
// Audit fix #9: bounded scheduler concurrency. Pre-fix
// ProcessPendingJobs ran every claimed job sequentially in a single
// goroutine (slow but safe); operators with large fleets needed a
// performance lever, but switching to fire-and-forget per-job
// goroutines would have unbounded the upstream-CA call rate and
// tripped DigiCert / Entrust / Sectigo rate limits. The cap gives
// the operator a knob (CERTCTL_RENEWAL_CONCURRENCY) to dial
// throughput up without losing the rate-limit headroom.
func (s *JobService) SetRenewalConcurrency(n int) {
if n <= 0 {
n = 1
}
s.renewalConcurrency = n
}
// SetAuditService wires an optional audit service for emitting lifecycle
// events (e.g., scheduler-driven job_retry transitions recorded by
// RetryFailedJobs). Construction keeps the audit dependency optional so
@@ -88,21 +122,34 @@ func (s *JobService) ProcessPendingJobs(ctx context.Context) error {
s.logger.Info("processing pending jobs", "count", len(pendingJobs))
// Audit fix #9: bounded concurrent fan-out. When renewalConcurrency
// is the zero value (caller never set it), fall through to the
// historical strict-sequential loop so every existing test path
// keeps its byte-for-byte behaviour. Production wiring in
// cmd/server/main.go always calls SetRenewalConcurrency with the
// configured cap (default 25), so the bounded path is always taken
// in real deployments.
if s.renewalConcurrency <= 0 {
return s.processPendingJobsSequential(ctx, pendingJobs)
}
return s.processPendingJobsConcurrent(ctx, pendingJobs, s.renewalConcurrency)
}
// processPendingJobsSequential is the legacy strict-serial fan-out
// (preserved for unit-test wiring that constructs JobService via
// NewJobService and never calls SetRenewalConcurrency). One job at a
// time, no concurrency.
func (s *JobService) processPendingJobsSequential(ctx context.Context, pendingJobs []*domain.Job) error {
var processedCount int
var failedCount int
-// Process each job
for _, job := range pendingJobs {
-// Skip deployment jobs that have an agent_id — those are meant for agent
-// pickup via GetPendingWork(), not server-side processing. The server should
-// only process deployment jobs without an agent (legacy/serverless targets).
-if job.Type == domain.JobTypeDeployment && job.AgentID != nil && *job.AgentID != "" {
+if shouldSkipJob(job) {
s.logger.Debug("skipping agent-routed deployment job",
"job_id", job.ID,
"agent_id", *job.AgentID)
continue
}
if err := s.processJob(ctx, job); err != nil {
s.logger.Error("failed to process job",
"job_id", job.ID,
@@ -122,6 +169,94 @@ func (s *JobService) ProcessPendingJobs(ctx context.Context) error {
return nil
}
// processPendingJobsConcurrent is the production entry point for the
// bounded fan-out — it pins the per-job work function to s.processJob.
// Test code at internal/service/job_concurrency_test.go drives
// boundedFanOut directly with a counter-tracking stub so the
// concurrency cap can be asserted without standing up the full
// renewal/deployment service graph.
func (s *JobService) processPendingJobsConcurrent(ctx context.Context, pendingJobs []*domain.Job, capN int) error {
return boundedFanOut(ctx, pendingJobs, capN, s.processJob, s.logger)
}
// boundedFanOut runs `work` over `pendingJobs` with at most `capN`
// goroutines in flight at any moment, using
// golang.org/x/sync/semaphore.Weighted for the gate. The Acquire(ctx,
// 1) call BLOCKS the loop when at the cap, providing real backpressure
// rather than fire-and-forget — the scheduler tick can't outrun the
// upstream-CA rate limit. ctx cancellation propagates through Acquire
// so a context.Done() interrupts the fan-out promptly; in-flight
// goroutines drain via Wait before the function returns, so no
// goroutine outlives the scheduler tick.
//
// processed / failed are tracked via atomic.Int64 to avoid mutex
// overhead on every job completion; the final log line reads both
// after Wait, so the values reflect every dispatched job.
//
// Audit fix #9: the cap is a load-bearing pre-condition for "operators
// with permissive upstream limits and large fleets can scale up the
// renewal sweep without losing rate-limit headroom."
func boundedFanOut(ctx context.Context, pendingJobs []*domain.Job, capN int, work func(context.Context, *domain.Job) error, logger *slog.Logger) error {
var processed, failed atomic.Int64
sem := semaphore.NewWeighted(int64(capN))
var wg sync.WaitGroup
for _, job := range pendingJobs {
if shouldSkipJob(job) {
logger.Debug("skipping agent-routed deployment job",
"job_id", job.ID,
"agent_id", *job.AgentID)
continue
}
// Acquire BEFORE launching so the offered load to upstream
// CAs respects the cap regardless of how fast individual
// jobs complete.
if err := sem.Acquire(ctx, 1); err != nil {
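// The counters only advance when a job finishes, so they measure
// completed work; jobs still in flight are not included yet.
completed := processed.Load() + failed.Load()
logger.Warn("renewal fan-out cancelled mid-tick",
"reason", err,
"jobs_completed", completed,
"jobs_not_completed", int64(len(pendingJobs))-completed)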
break
}
wg.Add(1)
go func(j *domain.Job) {
defer wg.Done()
defer sem.Release(1)
if err := work(ctx, j); err != nil {
logger.Error("failed to process job",
"job_id", j.ID,
"job_type", j.Type,
"error", err)
failed.Add(1)
return
}
processed.Add(1)
}(job)
}
wg.Wait()
logger.Info("job processing completed",
"processed", processed.Load(),
"failed", failed.Load(),
"total", len(pendingJobs),
"concurrency_cap", capN)
return nil
}
// shouldSkipJob returns true for deployment jobs that have an agent_id —
// those are meant for agent pickup via GetPendingWork(), not server-side
// processing. The server should only process deployment jobs without an
// agent (legacy/serverless targets).
func shouldSkipJob(job *domain.Job) bool {
return job.Type == domain.JobTypeDeployment && job.AgentID != nil && *job.AgentID != ""
}
// processJob routes a single job to the appropriate service based on type.
func (s *JobService) processJob(ctx context.Context, job *domain.Job) error {
s.logger.Debug("processing job",
+251
@@ -0,0 +1,251 @@
package service
// Audit fix #9 — bounded scheduler concurrency tests.
//
// boundedFanOut is the load-bearing primitive that caps the number of
// concurrent renewal/issuance/deployment goroutines per scheduler tick.
// Production wiring in cmd/server/main.go calls
// SetRenewalConcurrency(cfg.Scheduler.RenewalConcurrency) (default 25);
// these tests pin the cap behaviour directly against boundedFanOut so
// they don't have to stand up the full renewal/deployment service
// graph just to assert "the cap holds."
import (
"context"
"fmt"
"io"
"log/slog"
"strconv"
"sync/atomic"
"testing"
"time"
"github.com/shankar0123/certctl/internal/domain"
)
// quietLogger discards the boundedFanOut log output so the test runner
// doesn't drown in info-level lines for every dispatched job.
func quietLogger() *slog.Logger {
return slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelError}))
}
// makeJobs builds n pending renewal jobs with deterministic IDs.
func makeJobs(n int) []*domain.Job {
jobs := make([]*domain.Job, n)
for i := 0; i < n; i++ {
jobs[i] = &domain.Job{
ID: "job-" + strconv.Itoa(i),
Type: domain.JobTypeRenewal,
Status: domain.JobStatusPending,
}
}
return jobs
}
// TestBoundedFanOut_CapHolds is the primary regression guard for the
// audit's #9 blocker. It runs 50 jobs through a fan-out with cap=5,
// where each "job" sleeps 50ms to ensure several dispatchers are
// in-flight simultaneously, and asserts that the peak in-flight count
// never exceeded the cap. An uncapped fire-and-forget fan-out would
// let this test observe a peak in-flight count of up to 50.
func TestBoundedFanOut_CapHolds(t *testing.T) {
const (
capN = 5
totalJobs = 50
workSleep = 50 * time.Millisecond
hardBudget = 30 * time.Second // generous; 50 jobs / cap 5 = 10 waves × 50ms ≈ 500ms
)
jobs := makeJobs(totalJobs)
var inFlight atomic.Int64
var peak atomic.Int64
work := func(ctx context.Context, job *domain.Job) error {
now := inFlight.Add(1)
// Lock-free max via CompareAndSwap loop. Avoids a mutex on the
// hot path which would itself constrain concurrency and
// invalidate the measurement.
for {
cur := peak.Load()
if now <= cur {
break
}
if peak.CompareAndSwap(cur, now) {
break
}
}
time.Sleep(workSleep)
inFlight.Add(-1)
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), hardBudget)
defer cancel()
if err := boundedFanOut(ctx, jobs, capN, work, quietLogger()); err != nil {
t.Fatalf("boundedFanOut returned error: %v", err)
}
if got := peak.Load(); got > int64(capN) {
t.Errorf("peak in-flight count exceeded the cap: got %d, cap %d", got, capN)
}
// Sanity: the cap should actually be reached at least once with
// 50 jobs × 50ms sleep — if it isn't, either the workload is too
// short or the gate is broken in a way that caps below the
// intended value.
if got := peak.Load(); got < int64(capN) {
t.Errorf("peak in-flight count never reached the cap: got %d, cap %d (workload too short or gate broken low?)", got, capN)
}
}
// TestBoundedFanOut_AllJobsRun pins that every (non-skipped) job is
// actually dispatched — the cap should add backpressure, not drop
// jobs. Counterpart to TestBoundedFanOut_CapHolds: that test asserts
// the upper bound; this one asserts the lower bound.
func TestBoundedFanOut_AllJobsRun(t *testing.T) {
const capN = 3
jobs := makeJobs(20)
var dispatched atomic.Int64
work := func(ctx context.Context, job *domain.Job) error {
dispatched.Add(1)
return nil
}
if err := boundedFanOut(context.Background(), jobs, capN, work, quietLogger()); err != nil {
t.Fatalf("boundedFanOut returned error: %v", err)
}
if got := dispatched.Load(); got != int64(len(jobs)) {
t.Errorf("expected all %d jobs to be dispatched, got %d", len(jobs), got)
}
}
// TestBoundedFanOut_SkipsAgentRoutedDeployments pins the
// shouldSkipJob contract: deployment jobs with a non-empty AgentID
// belong to the agent's GetPendingWork path, so the server-side
// fan-out must skip them. boundedFanOut's behaviour here matches the
// pre-audit-#9 sequential loop's behaviour exactly.
func TestBoundedFanOut_SkipsAgentRoutedDeployments(t *testing.T) {
agentID := "agent-1"
jobs := []*domain.Job{
{ID: "j1", Type: domain.JobTypeRenewal, Status: domain.JobStatusPending},
{ID: "j2", Type: domain.JobTypeDeployment, Status: domain.JobStatusPending, AgentID: &agentID},
{ID: "j3", Type: domain.JobTypeIssuance, Status: domain.JobStatusPending},
}
var seen atomic.Int64
work := func(ctx context.Context, job *domain.Job) error {
// Count with an atomic only: work runs on concurrent goroutines,
// so appending to a shared slice here would be a data race.
seen.Add(1)
return nil
}
if err := boundedFanOut(context.Background(), jobs, 5, work, quietLogger()); err != nil {
t.Fatalf("boundedFanOut returned error: %v", err)
}
if got := seen.Load(); got != 2 {
t.Errorf("expected 2 jobs to run (renewal + issuance, deployment-with-agent skipped), got %d", got)
}
}
// TestBoundedFanOut_CtxCancelInterrupts pins that ctx cancellation
// during a long-running fan-out interrupts the dispatch loop. Without
// the ctx-aware Acquire (audit prompt's "anti-pattern: channel-based
// semaphore without ctx-aware acquire"), this test would hang the
// scheduler indefinitely on a stuck CA call.
func TestBoundedFanOut_CtxCancelInterrupts(t *testing.T) {
jobs := makeJobs(100)
work := func(ctx context.Context, job *domain.Job) error {
// Work that respects ctx — sleeps until ctx done or 5s elapsed.
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(5 * time.Second):
return nil
}
}
ctx, cancel := context.WithCancel(context.Background())
// Cancel after 100ms so the fan-out aborts mid-stream.
go func() {
time.Sleep(100 * time.Millisecond)
cancel()
}()
start := time.Now()
err := boundedFanOut(ctx, jobs, 3, work, quietLogger())
elapsed := time.Since(start)
if err != nil {
t.Fatalf("boundedFanOut should not propagate the ctx error from work; got %v", err)
}
// Even with ctx cancel, the function returns nil because the
// loop exits via the Acquire-cancel branch (logged warn) and
// the Wait drains in-flight goroutines. The work stub returns as
// soon as ctx is done, so elapsed should be close to the 100ms
// cancel delay; the 6s bound is deliberately loose and still far
// below the ~170s an uninterrupted run would need (100 jobs × 5s
// at cap 3).
if elapsed > 6*time.Second {
t.Errorf("ctx cancel did not interrupt fan-out: elapsed=%v, expected <6s", elapsed)
}
}
// TestBoundedFanOut_FailedJobsCounted pins that errors from `work`
// don't cause the fan-out to abort — the failed counter increments
// and the loop continues. Jobs are independent; one cert failing
// shouldn't block the rest.
func TestBoundedFanOut_FailedJobsCounted(t *testing.T) {
const totalJobs = 10
jobs := makeJobs(totalJobs)
var dispatched atomic.Int64
failEvery := 3 // jobs 0, 3, 6, 9 fail
work := func(ctx context.Context, job *domain.Job) error {
idx, _ := strconv.Atoi(job.ID[len("job-"):])
dispatched.Add(1)
if idx%failEvery == 0 {
return fmt.Errorf("simulated failure for %s", job.ID)
}
return nil
}
if err := boundedFanOut(context.Background(), jobs, 4, work, quietLogger()); err != nil {
t.Fatalf("boundedFanOut should swallow per-job errors; got %v", err)
}
if got := dispatched.Load(); got != int64(totalJobs) {
t.Errorf("expected all %d jobs dispatched even with failures, got %d", totalJobs, got)
}
}
// TestSetRenewalConcurrency_NormalizesNonPositive pins the ≤0 → 1
// fail-safe in SetRenewalConcurrency. semaphore.NewWeighted(0)
// constructs a semaphore that blocks every Acquire forever; the
// normalization prevents a misconfigured env var from wedging the
// scheduler.
func TestSetRenewalConcurrency_NormalizesNonPositive(t *testing.T) {
cases := []struct {
in int
want int
}{
{-100, 1},
{-1, 1},
{0, 1},
{1, 1},
{25, 25},
{1000, 1000},
}
for _, tc := range cases {
t.Run(strconv.Itoa(tc.in), func(t *testing.T) {
s := &JobService{}
s.SetRenewalConcurrency(tc.in)
if s.renewalConcurrency != tc.want {
t.Errorf("SetRenewalConcurrency(%d) -> renewalConcurrency=%d, want %d", tc.in, s.renewalConcurrency, tc.want)
}
})
}
}