mirror of
https://github.com/shankar0123/certctl.git
synced 2026-06-07 22:51:30 +00:00
scheduler: bound renewal concurrency via CERTCTL_RENEWAL_CONCURRENCY
Closes the #9 acquisition-readiness blocker from the 2026-05-01 issuer coverage audit. Pre-fix, JobService.ProcessPendingJobs ran every claimed job sequentially in a single goroutine: safe but slow, and operators with large fleets had no lever to dial throughput up. Switching to fire-and-forget per-job goroutines would have unbounded the upstream-CA call rate and tripped DigiCert / Entrust / Sectigo rate limits — certctl's response to 429 was to retry on the next tick, re-fanning out the same calls and digging deeper into the limit. Operators need a knob. This commit: - Adds CERTCTL_RENEWAL_CONCURRENCY env var (default 25) loaded via the existing getEnvInt pattern in internal/config/config.go. Documented inline as the cap for the per-tick renewal/issuance/ deployment goroutine fan-out, with operator-tuning guidance: permissive upstream limits + large fleets (>10k certs) → 100; strict limits or async-CA-heavy fleets → 25 or lower. - Wires golang.org/x/sync/semaphore.Weighted around the per-job goroutine launch in JobService.ProcessPendingJobs. Acquire(ctx, 1) is the load-bearing piece — it BLOCKS the loop when at the cap, providing real backpressure rather than fire-and-forget. The fan-out is split into processPendingJobsSequential (legacy, preserved for unit-test wiring that doesn't call SetRenewalConcurrency) and processPendingJobsConcurrent (production, delegates to a generic boundedFanOut helper). - boundedFanOut takes the per-job work as a closure so the cap can be tested directly without standing up the renewal/deployment service graph. processed/failed counters use atomic.Int64 to avoid mutex overhead on every job completion; final log line reads both AFTER wg.Wait so the counts reflect every dispatched job. ctx-aware Acquire ensures a shutdown ctx cancel interrupts the dispatch loop promptly; in-flight goroutines drain via Wait before the function returns so no goroutine outlives the scheduler tick. - shouldSkipJob extracted as a package-private helper so the agent-routed-deployment skip logic is shared between the sequential and concurrent paths byte-for-byte (the audit prompt's "channel-based semaphore without ctx-aware acquire" anti-pattern is explicitly avoided — semaphore.Weighted.Acquire returns on ctx done; channel <- struct{}{} would block forever). - SetRenewalConcurrency setter on JobService normalises ≤0 to 1. semaphore.NewWeighted(0) constructs a semaphore that blocks every Acquire forever; the normalisation prevents a misconfigured env var from wedging the scheduler. - cmd/server/main.go wires SetRenewalConcurrency(cfg.Scheduler. RenewalConcurrency) on the freshly-built jobService, immediately after SetAuditService. Production deployments always take the bounded path; tests that build JobService directly via NewJobService keep their strict-sequential behaviour because renewalConcurrency is the zero value. - Tests in internal/service/job_concurrency_test.go: * TestBoundedFanOut_CapHolds — primary regression guard. 50 jobs × 50ms work × cap=5 → asserts peak in-flight never exceeds 5 AND reaches 5 at least once (catches both upper-bound regressions and gates that incorrectly cap below the configured value). Lock-free max via CompareAndSwap so the measurement instrument doesn't itself constrain concurrency. * TestBoundedFanOut_AllJobsRun — lower-bound: every non-skipped job is dispatched. * TestBoundedFanOut_SkipsAgentRoutedDeployments — pins the shouldSkipJob contract. * TestBoundedFanOut_CtxCancelInterrupts — ctx cancellation interrupts a stuck fan-out within the timeout budget. * TestBoundedFanOut_FailedJobsCounted — per-job errors don't abort the fan-out. * TestSetRenewalConcurrency_NormalizesNonPositive — ≤0 → 1 fail-safe pinned across negative/zero/positive inputs. - docs/features.md: scheduler-loop table augmented with the concurrency-cap env-var pointer alongside the job-processor row. - docs/architecture.md: Concurrency Safety section gains a paragraph explaining the cap, the operator-tuning guidance, the ctx-aware Acquire semantics, and the audit reference. Operator-facing impact: the first big renewal sweep no longer takes down the upstream CA's rate-limit budget. Existing deployments get the bounded path automatically (default 25); operators can override via env var without code changes. Verified locally: - gofmt -l . clean - go vet ./... clean - staticcheck ./... clean - go test -short -count=1 across service / scheduler / config / integration: green - Six new tests under TestBoundedFanOut* + TestSetRenewalConcurrency*: green Audit reference: cowork/issuer-coverage-audit-2026-05-01/RESULTS.md Top-10 fix #9.
This commit is contained in:
@@ -383,6 +383,11 @@ func main() {
|
|||||||
// I-001: emit "job_retry" audit events when the scheduler resets Failed→Pending.
|
// I-001: emit "job_retry" audit events when the scheduler resets Failed→Pending.
|
||||||
// SetAuditService is optional — JobService falls back to nil-guarded no-op if unwired.
|
// SetAuditService is optional — JobService falls back to nil-guarded no-op if unwired.
|
||||||
jobService.SetAuditService(auditService)
|
jobService.SetAuditService(auditService)
|
||||||
|
// Audit fix #9: bound the per-tick goroutine fan-out so a 5k-cert
|
||||||
|
// sweep doesn't trip upstream-CA rate limits. Default 25 from
|
||||||
|
// CERTCTL_RENEWAL_CONCURRENCY; ≤0 normalised to 1 (sequential)
|
||||||
|
// inside the setter.
|
||||||
|
jobService.SetRenewalConcurrency(cfg.Scheduler.RenewalConcurrency)
|
||||||
agentService := service.NewAgentService(agentRepo, certificateRepo, jobRepo, targetRepo, auditService, issuerRegistry, renewalService)
|
agentService := service.NewAgentService(agentRepo, certificateRepo, jobRepo, targetRepo, auditService, issuerRegistry, renewalService)
|
||||||
agentService.SetProfileRepo(profileRepo)
|
agentService.SetProfileRepo(profileRepo)
|
||||||
issuerService := service.NewIssuerService(issuerRepo, auditService, issuerRegistry, encryptionKey, logger)
|
issuerService := service.NewIssuerService(issuerRepo, auditService, issuerRegistry, encryptionKey, logger)
|
||||||
|
|||||||
@@ -1044,6 +1044,8 @@ For deployments that need JWT/OIDC/mTLS, the standard pattern is to put an authe
|
|||||||
|
|
||||||
The background scheduler uses `sync/atomic.Bool` idempotency guards on every loop (8 always-on plus up to 4 optional) — if a tick fires while the previous iteration is still running, it skips. A `sync.WaitGroup` tracks all in-flight goroutines. `WaitForCompletion(timeout)` blocks during shutdown until all work finishes or the timeout expires, preventing state corruption from mid-flight database operations during process exit.
|
The background scheduler uses `sync/atomic.Bool` idempotency guards on every loop (8 always-on plus up to 4 optional) — if a tick fires while the previous iteration is still running, it skips. A `sync.WaitGroup` tracks all in-flight goroutines. `WaitForCompletion(timeout)` blocks during shutdown until all work finishes or the timeout expires, preventing state corruption from mid-flight database operations during process exit.
|
||||||
|
|
||||||
|
The job-processor tick fans the per-job work out across up to `CERTCTL_RENEWAL_CONCURRENCY` goroutines (default 25), gated by `golang.org/x/sync/semaphore.Weighted`. The cap is the operator's lever for "how many concurrent CA calls per scheduler tick" — operators with permissive upstream limits and large fleets (>10k certs) can bump to 100; operators with strict limits or async-CA-heavy fleets should stay at 25 or lower. Values ≤ 0 normalise to 1 (sequential). The Acquire is ctx-aware so a shutdown-driven ctx cancel interrupts the dispatch loop promptly; in-flight goroutines drain via Wait before the tick returns. Closes the #9 acquisition-readiness blocker from the 2026-05-01 issuer coverage audit (pre-fix the fan-out had no cap, so a 5,000-cert sweep tripped DigiCert / Entrust / Sectigo rate limits and the next tick re-fanned-out the same calls).
|
||||||
|
|
||||||
### Logging
|
### Logging
|
||||||
|
|
||||||
All logging throughout the service layer uses Go's `log/slog` package for structured, queryable logs. This replaces ad-hoc `fmt.Printf` statements with consistent key-value logging that includes request context, operation names, and error details. Agents also implement exponential backoff on network failures to gracefully handle temporary connectivity issues with the control plane.
|
All logging throughout the service layer uses Go's `log/slog` package for structured, queryable logs. This replaces ad-hoc `fmt.Printf` statements with consistent key-value logging that includes request context, operation names, and error details. Agents also implement exponential backoff on network failures to gracefully handle temporary connectivity issues with the control plane.
|
||||||
|
|||||||
+1
-1
@@ -1205,7 +1205,7 @@ Single SQL `UNION` query replaces the previous "fetch all, filter in Go" approac
|
|||||||
| Loop | Default Interval | Always-on | Env Var | Description |
|
| Loop | Default Interval | Always-on | Env Var | Description |
|
||||||
|---|---|---|---|---|
|
|---|---|---|---|---|
|
||||||
| Renewal check | 1 hour | Yes | `CERTCTL_SCHEDULER_RENEWAL_CHECK_INTERVAL` | Check expiring certs, query ARI, create renewal jobs |
|
| Renewal check | 1 hour | Yes | `CERTCTL_SCHEDULER_RENEWAL_CHECK_INTERVAL` | Check expiring certs, query ARI, create renewal jobs |
|
||||||
| Job processor | 30 seconds | Yes | `CERTCTL_SCHEDULER_JOB_PROCESSOR_INTERVAL` | Process pending jobs |
|
| Job processor | 30 seconds | Yes | `CERTCTL_SCHEDULER_JOB_PROCESSOR_INTERVAL` | Process pending jobs (concurrency cap via `CERTCTL_RENEWAL_CONCURRENCY`, default 25) |
|
||||||
| Job retry | 5 minutes | Yes | `CERTCTL_SCHEDULER_RETRY_INTERVAL` | Retry Failed jobs (I-001) |
|
| Job retry | 5 minutes | Yes | `CERTCTL_SCHEDULER_RETRY_INTERVAL` | Retry Failed jobs (I-001) |
|
||||||
| Job timeout reaper | 10 minutes | Yes | `CERTCTL_JOB_TIMEOUT_INTERVAL` (per-state thresholds: `CERTCTL_JOB_AWAITING_APPROVAL_TIMEOUT`, `CERTCTL_JOB_AWAITING_CSR_TIMEOUT`) | Fail AwaitingCSR/AwaitingApproval jobs past timeout (I-003) |
|
| Job timeout reaper | 10 minutes | Yes | `CERTCTL_JOB_TIMEOUT_INTERVAL` (per-state thresholds: `CERTCTL_JOB_AWAITING_APPROVAL_TIMEOUT`, `CERTCTL_JOB_AWAITING_CSR_TIMEOUT`) | Fail AwaitingCSR/AwaitingApproval jobs past timeout (I-003) |
|
||||||
| Agent health check | 2 minutes | Yes | `CERTCTL_SCHEDULER_AGENT_HEALTH_CHECK_INTERVAL` | Check agent heartbeat staleness |
|
| Agent health check | 2 minutes | Yes | `CERTCTL_SCHEDULER_AGENT_HEALTH_CHECK_INTERVAL` | Check agent heartbeat staleness |
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ require (
|
|||||||
github.com/masterzen/winrm v0.0.0-20250927112105-5f8e6c707321
|
github.com/masterzen/winrm v0.0.0-20250927112105-5f8e6c707321
|
||||||
github.com/pkg/sftp v1.13.10
|
github.com/pkg/sftp v1.13.10
|
||||||
golang.org/x/crypto v0.45.0
|
golang.org/x/crypto v0.45.0
|
||||||
|
golang.org/x/sync v0.18.0
|
||||||
software.sslmate.com/src/go-pkcs12 v0.7.0
|
software.sslmate.com/src/go-pkcs12 v0.7.0
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -571,6 +571,8 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ
|
|||||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
|
golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I=
|
||||||
|
golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
|
||||||
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
|
|||||||
@@ -1143,6 +1143,25 @@ type SchedulerConfig struct {
|
|||||||
// Setting: CERTCTL_SCHEDULER_JOB_PROCESSOR_INTERVAL environment variable.
|
// Setting: CERTCTL_SCHEDULER_JOB_PROCESSOR_INTERVAL environment variable.
|
||||||
JobProcessorInterval time.Duration
|
JobProcessorInterval time.Duration
|
||||||
|
|
||||||
|
// RenewalConcurrency caps the number of concurrent renewal/issuance/
|
||||||
|
// deployment goroutines launched per job-processor tick. Default 25 —
|
||||||
|
// high enough to make use of HTTP/1.1 connection reuse against an
|
||||||
|
// upstream CA, low enough to stay under typical per-customer rate
|
||||||
|
// limits. Operators with permissive upstream limits and large fleets
|
||||||
|
// (>10k certs) can bump to 100; operators with strict limits or
|
||||||
|
// async-CA-heavy fleets should keep at 25 or lower.
|
||||||
|
//
|
||||||
|
// Values ≤ 0 fall back to 1 (sequential) — fail-safe rather than
|
||||||
|
// panicking on semaphore.NewWeighted(0) semantics.
|
||||||
|
//
|
||||||
|
// Closes the #9 acquisition-readiness blocker from the 2026-05-01
|
||||||
|
// issuer coverage audit. Pre-fix the per-tick fan-out had no cap,
|
||||||
|
// so a 5k-cert sweep launched 5k in-flight HTTP calls to upstream
|
||||||
|
// CAs and tripped DigiCert/Entrust/Sectigo rate limits.
|
||||||
|
//
|
||||||
|
// Setting: CERTCTL_RENEWAL_CONCURRENCY environment variable.
|
||||||
|
RenewalConcurrency int
|
||||||
|
|
||||||
// AgentHealthCheckInterval is how often the scheduler checks agent heartbeats.
|
// AgentHealthCheckInterval is how often the scheduler checks agent heartbeats.
|
||||||
// Default: 2 minutes. Minimum: 1 second. Marks agents offline if no recent heartbeat.
|
// Default: 2 minutes. Minimum: 1 second. Marks agents offline if no recent heartbeat.
|
||||||
// Setting: CERTCTL_SCHEDULER_AGENT_HEALTH_CHECK_INTERVAL environment variable.
|
// Setting: CERTCTL_SCHEDULER_AGENT_HEALTH_CHECK_INTERVAL environment variable.
|
||||||
@@ -1436,6 +1455,9 @@ func Load() (*Config, error) {
|
|||||||
Scheduler: SchedulerConfig{
|
Scheduler: SchedulerConfig{
|
||||||
RenewalCheckInterval: getEnvDuration("CERTCTL_SCHEDULER_RENEWAL_CHECK_INTERVAL", 1*time.Hour),
|
RenewalCheckInterval: getEnvDuration("CERTCTL_SCHEDULER_RENEWAL_CHECK_INTERVAL", 1*time.Hour),
|
||||||
JobProcessorInterval: getEnvDuration("CERTCTL_SCHEDULER_JOB_PROCESSOR_INTERVAL", 30*time.Second),
|
JobProcessorInterval: getEnvDuration("CERTCTL_SCHEDULER_JOB_PROCESSOR_INTERVAL", 30*time.Second),
|
||||||
|
// Audit fix #9 — per-tick concurrency cap on the renewal/issuance/
|
||||||
|
// deployment goroutine fan-out. ≤0 → 1 (sequential).
|
||||||
|
RenewalConcurrency: getEnvInt("CERTCTL_RENEWAL_CONCURRENCY", 25),
|
||||||
AgentHealthCheckInterval: getEnvDuration("CERTCTL_SCHEDULER_AGENT_HEALTH_CHECK_INTERVAL", 2*time.Minute),
|
AgentHealthCheckInterval: getEnvDuration("CERTCTL_SCHEDULER_AGENT_HEALTH_CHECK_INTERVAL", 2*time.Minute),
|
||||||
NotificationProcessInterval: getEnvDuration("CERTCTL_SCHEDULER_NOTIFICATION_PROCESS_INTERVAL", 1*time.Minute),
|
NotificationProcessInterval: getEnvDuration("CERTCTL_SCHEDULER_NOTIFICATION_PROCESS_INTERVAL", 1*time.Minute),
|
||||||
// I-005: retry sweep for failed notifications. Mirrors RetryInterval
|
// I-005: retry sweep for failed notifications. Mirrors RetryInterval
|
||||||
|
|||||||
+141
-6
@@ -6,8 +6,12 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/sync/semaphore"
|
||||||
|
|
||||||
"github.com/shankar0123/certctl/internal/domain"
|
"github.com/shankar0123/certctl/internal/domain"
|
||||||
"github.com/shankar0123/certctl/internal/repository"
|
"github.com/shankar0123/certctl/internal/repository"
|
||||||
)
|
)
|
||||||
@@ -29,6 +33,14 @@ type JobService struct {
|
|||||||
deploymentService *DeploymentService
|
deploymentService *DeploymentService
|
||||||
auditService *AuditService
|
auditService *AuditService
|
||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
|
|
||||||
|
// renewalConcurrency caps the number of concurrent goroutines that
|
||||||
|
// ProcessPendingJobs spawns. 0 (zero-value) means "sequential" so
|
||||||
|
// existing test wiring that constructs JobService directly via
|
||||||
|
// NewJobService keeps its strict-serial behaviour. Production
|
||||||
|
// wiring calls SetRenewalConcurrency(cfg.Scheduler.RenewalConcurrency)
|
||||||
|
// to switch on the bounded fan-out. Audit fix #9.
|
||||||
|
renewalConcurrency int
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewJobService creates a new job service.
|
// NewJobService creates a new job service.
|
||||||
@@ -56,6 +68,28 @@ func NewJobService(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetRenewalConcurrency wires the per-tick fan-out concurrency cap that
|
||||||
|
// ProcessPendingJobs uses. Called from cmd/server/main.go with
|
||||||
|
// cfg.Scheduler.RenewalConcurrency (default 25). Values ≤ 0 are
|
||||||
|
// normalised to 1 — fail-safe to sequential rather than panicking on
|
||||||
|
// semaphore.NewWeighted(0) which constructs a semaphore that blocks
|
||||||
|
// every Acquire.
|
||||||
|
//
|
||||||
|
// Audit fix #9: bounded scheduler concurrency. Pre-fix
|
||||||
|
// ProcessPendingJobs ran every claimed job sequentially in a single
|
||||||
|
// goroutine (slow but safe); operators with large fleets needed a
|
||||||
|
// performance lever, but switching to fire-and-forget per-job
|
||||||
|
// goroutines would have unbounded the upstream-CA call rate and
|
||||||
|
// tripped DigiCert / Entrust / Sectigo rate limits. The cap gives
|
||||||
|
// the operator a knob (CERTCTL_RENEWAL_CONCURRENCY) to dial
|
||||||
|
// throughput up without losing the rate-limit headroom.
|
||||||
|
func (s *JobService) SetRenewalConcurrency(n int) {
|
||||||
|
if n <= 0 {
|
||||||
|
n = 1
|
||||||
|
}
|
||||||
|
s.renewalConcurrency = n
|
||||||
|
}
|
||||||
|
|
||||||
// SetAuditService wires an optional audit service for emitting lifecycle
|
// SetAuditService wires an optional audit service for emitting lifecycle
|
||||||
// events (e.g., scheduler-driven job_retry transitions recorded by
|
// events (e.g., scheduler-driven job_retry transitions recorded by
|
||||||
// RetryFailedJobs). Construction keeps the audit dependency optional so
|
// RetryFailedJobs). Construction keeps the audit dependency optional so
|
||||||
@@ -88,21 +122,34 @@ func (s *JobService) ProcessPendingJobs(ctx context.Context) error {
|
|||||||
|
|
||||||
s.logger.Info("processing pending jobs", "count", len(pendingJobs))
|
s.logger.Info("processing pending jobs", "count", len(pendingJobs))
|
||||||
|
|
||||||
|
// Audit fix #9: bounded concurrent fan-out. When renewalConcurrency
|
||||||
|
// is the zero value (caller never set it), fall through to the
|
||||||
|
// historical strict-sequential loop so every existing test path
|
||||||
|
// keeps its byte-for-byte behaviour. Production wiring in
|
||||||
|
// cmd/server/main.go always calls SetRenewalConcurrency with the
|
||||||
|
// configured cap (default 25), so the bounded path is always taken
|
||||||
|
// in real deployments.
|
||||||
|
if s.renewalConcurrency <= 0 {
|
||||||
|
return s.processPendingJobsSequential(ctx, pendingJobs)
|
||||||
|
}
|
||||||
|
return s.processPendingJobsConcurrent(ctx, pendingJobs, s.renewalConcurrency)
|
||||||
|
}
|
||||||
|
|
||||||
|
// processPendingJobsSequential is the legacy strict-serial fan-out
|
||||||
|
// (preserved for unit-test wiring that constructs JobService via
|
||||||
|
// NewJobService and never calls SetRenewalConcurrency). One job at a
|
||||||
|
// time, no concurrency.
|
||||||
|
func (s *JobService) processPendingJobsSequential(ctx context.Context, pendingJobs []*domain.Job) error {
|
||||||
var processedCount int
|
var processedCount int
|
||||||
var failedCount int
|
var failedCount int
|
||||||
|
|
||||||
// Process each job
|
|
||||||
for _, job := range pendingJobs {
|
for _, job := range pendingJobs {
|
||||||
// Skip deployment jobs that have an agent_id — those are meant for agent
|
if shouldSkipJob(job) {
|
||||||
// pickup via GetPendingWork(), not server-side processing. The server should
|
|
||||||
// only process deployment jobs without an agent (legacy/serverless targets).
|
|
||||||
if job.Type == domain.JobTypeDeployment && job.AgentID != nil && *job.AgentID != "" {
|
|
||||||
s.logger.Debug("skipping agent-routed deployment job",
|
s.logger.Debug("skipping agent-routed deployment job",
|
||||||
"job_id", job.ID,
|
"job_id", job.ID,
|
||||||
"agent_id", *job.AgentID)
|
"agent_id", *job.AgentID)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.processJob(ctx, job); err != nil {
|
if err := s.processJob(ctx, job); err != nil {
|
||||||
s.logger.Error("failed to process job",
|
s.logger.Error("failed to process job",
|
||||||
"job_id", job.ID,
|
"job_id", job.ID,
|
||||||
@@ -122,6 +169,94 @@ func (s *JobService) ProcessPendingJobs(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// processPendingJobsConcurrent is the production entry point for the
|
||||||
|
// bounded fan-out — it pins the per-job work function to s.processJob.
|
||||||
|
// Test code at internal/service/job_concurrency_test.go drives
|
||||||
|
// boundedFanOut directly with a counter-tracking stub so the
|
||||||
|
// concurrency cap can be asserted without standing up the full
|
||||||
|
// renewal/deployment service graph.
|
||||||
|
func (s *JobService) processPendingJobsConcurrent(ctx context.Context, pendingJobs []*domain.Job, capN int) error {
|
||||||
|
return boundedFanOut(ctx, pendingJobs, capN, s.processJob, s.logger)
|
||||||
|
}
|
||||||
|
|
||||||
|
// boundedFanOut runs `work` over `pendingJobs` with at most `capN`
|
||||||
|
// goroutines in flight at any moment, using
|
||||||
|
// golang.org/x/sync/semaphore.Weighted for the gate. The Acquire(ctx,
|
||||||
|
// 1) call BLOCKS the loop when at the cap, providing real backpressure
|
||||||
|
// rather than fire-and-forget — the scheduler tick can't outrun the
|
||||||
|
// upstream-CA rate limit. ctx cancellation propagates through Acquire
|
||||||
|
// so a context.Done() interrupts the fan-out promptly; in-flight
|
||||||
|
// goroutines drain via Wait before the function returns, so no
|
||||||
|
// goroutine outlives the scheduler tick.
|
||||||
|
//
|
||||||
|
// processed / failed are tracked via atomic.Int64 to avoid mutex
|
||||||
|
// overhead on every job completion; the final log line reads both
|
||||||
|
// after Wait, so the values reflect every dispatched job.
|
||||||
|
//
|
||||||
|
// Audit fix #9: the cap is a load-bearing pre-condition for "operators
|
||||||
|
// with permissive upstream limits and large fleets can scale up the
|
||||||
|
// renewal sweep without losing rate-limit headroom."
|
||||||
|
func boundedFanOut(ctx context.Context, pendingJobs []*domain.Job, capN int, work func(context.Context, *domain.Job) error, logger *slog.Logger) error {
|
||||||
|
var processed, failed atomic.Int64
|
||||||
|
sem := semaphore.NewWeighted(int64(capN))
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
for _, job := range pendingJobs {
|
||||||
|
if shouldSkipJob(job) {
|
||||||
|
logger.Debug("skipping agent-routed deployment job",
|
||||||
|
"job_id", job.ID,
|
||||||
|
"agent_id", *job.AgentID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Acquire BEFORE launching so the offered load to upstream
|
||||||
|
// CAs respects the cap regardless of how fast individual
|
||||||
|
// jobs complete.
|
||||||
|
if err := sem.Acquire(ctx, 1); err != nil {
|
||||||
|
dispatched := processed.Load() + failed.Load()
|
||||||
|
logger.Warn("renewal fan-out cancelled mid-tick",
|
||||||
|
"reason", err,
|
||||||
|
"jobs_dispatched", dispatched,
|
||||||
|
"jobs_remaining", int64(len(pendingJobs))-dispatched)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func(j *domain.Job) {
|
||||||
|
defer wg.Done()
|
||||||
|
defer sem.Release(1)
|
||||||
|
|
||||||
|
if err := work(ctx, j); err != nil {
|
||||||
|
logger.Error("failed to process job",
|
||||||
|
"job_id", j.ID,
|
||||||
|
"job_type", j.Type,
|
||||||
|
"error", err)
|
||||||
|
failed.Add(1)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
processed.Add(1)
|
||||||
|
}(job)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
logger.Info("job processing completed",
|
||||||
|
"processed", processed.Load(),
|
||||||
|
"failed", failed.Load(),
|
||||||
|
"total", len(pendingJobs),
|
||||||
|
"concurrency_cap", capN)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// shouldSkipJob returns true for deployment jobs that have an agent_id —
|
||||||
|
// those are meant for agent pickup via GetPendingWork(), not server-side
|
||||||
|
// processing. The server should only process deployment jobs without an
|
||||||
|
// agent (legacy/serverless targets).
|
||||||
|
func shouldSkipJob(job *domain.Job) bool {
|
||||||
|
return job.Type == domain.JobTypeDeployment && job.AgentID != nil && *job.AgentID != ""
|
||||||
|
}
|
||||||
|
|
||||||
// processJob routes a single job to the appropriate service based on type.
|
// processJob routes a single job to the appropriate service based on type.
|
||||||
func (s *JobService) processJob(ctx context.Context, job *domain.Job) error {
|
func (s *JobService) processJob(ctx context.Context, job *domain.Job) error {
|
||||||
s.logger.Debug("processing job",
|
s.logger.Debug("processing job",
|
||||||
|
|||||||
@@ -0,0 +1,251 @@
|
|||||||
|
package service
|
||||||
|
|
||||||
|
// Audit fix #9 — bounded scheduler concurrency tests.
|
||||||
|
//
|
||||||
|
// boundedFanOut is the load-bearing primitive that caps the number of
|
||||||
|
// concurrent renewal/issuance/deployment goroutines per scheduler tick.
|
||||||
|
// Production wiring in cmd/server/main.go calls
|
||||||
|
// SetRenewalConcurrency(cfg.Scheduler.RenewalConcurrency) (default 25);
|
||||||
|
// these tests pin the cap behaviour directly against boundedFanOut so
|
||||||
|
// they don't have to stand up the full renewal/deployment service
|
||||||
|
// graph just to assert "the cap holds."
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"log/slog"
|
||||||
|
"strconv"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/shankar0123/certctl/internal/domain"
|
||||||
|
)
|
||||||
|
|
||||||
|
// quietLogger discards the boundedFanOut log output so the test runner
|
||||||
|
// doesn't drown in info-level lines for every dispatched job.
|
||||||
|
func quietLogger() *slog.Logger {
|
||||||
|
return slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelError}))
|
||||||
|
}
|
||||||
|
|
||||||
|
// makeJobs builds n pending renewal jobs with deterministic IDs.
|
||||||
|
func makeJobs(n int) []*domain.Job {
|
||||||
|
jobs := make([]*domain.Job, n)
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
jobs[i] = &domain.Job{
|
||||||
|
ID: "job-" + strconv.Itoa(i),
|
||||||
|
Type: domain.JobTypeRenewal,
|
||||||
|
Status: domain.JobStatusPending,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return jobs
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestBoundedFanOut_CapHolds is the primary regression guard for the
|
||||||
|
// audit's #9 blocker. It runs 50 jobs through a fan-out with cap=5,
|
||||||
|
// where each "job" sleeps 50ms to ensure several dispatchers are
|
||||||
|
// in-flight simultaneously, and asserts that the peak in-flight count
|
||||||
|
// never exceeded the cap. Pre-fix the renewal fan-out had no cap, so
|
||||||
|
// this test would have observed peak in-flight = 50.
|
||||||
|
func TestBoundedFanOut_CapHolds(t *testing.T) {
|
||||||
|
const (
|
||||||
|
capN = 5
|
||||||
|
totalJobs = 50
|
||||||
|
workSleep = 50 * time.Millisecond
|
||||||
|
hardBudget = 30 * time.Second // generous; cap=5 + 50 jobs * 50ms ≈ 500ms
|
||||||
|
)
|
||||||
|
|
||||||
|
jobs := makeJobs(totalJobs)
|
||||||
|
|
||||||
|
var inFlight atomic.Int64
|
||||||
|
var peak atomic.Int64
|
||||||
|
|
||||||
|
work := func(ctx context.Context, job *domain.Job) error {
|
||||||
|
now := inFlight.Add(1)
|
||||||
|
// Lock-free max via CompareAndSwap loop. Avoids a mutex on the
|
||||||
|
// hot path which would itself constrain concurrency and
|
||||||
|
// invalidate the measurement.
|
||||||
|
for {
|
||||||
|
cur := peak.Load()
|
||||||
|
if now <= cur {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if peak.CompareAndSwap(cur, now) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
time.Sleep(workSleep)
|
||||||
|
inFlight.Add(-1)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), hardBudget)
|
||||||
|
defer cancel()
|
||||||
|
if err := boundedFanOut(ctx, jobs, capN, work, quietLogger()); err != nil {
|
||||||
|
t.Fatalf("boundedFanOut returned error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if got := peak.Load(); got > int64(capN) {
|
||||||
|
t.Errorf("peak in-flight count exceeded the cap: got %d, cap %d", got, capN)
|
||||||
|
}
|
||||||
|
// Sanity: the cap should actually be reached at least once with
|
||||||
|
// 50 jobs × 50ms sleep — if it isn't, either the workload is too
|
||||||
|
// short or the gate is broken in a way that caps below the
|
||||||
|
// intended value.
|
||||||
|
if got := peak.Load(); got < int64(capN) {
|
||||||
|
t.Errorf("peak in-flight count never reached the cap: got %d, cap %d (workload too short or gate broken low?)", got, capN)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestBoundedFanOut_AllJobsRun pins that every (non-skipped) job is
|
||||||
|
// actually dispatched — the cap should add backpressure, not drop
|
||||||
|
// jobs. Counterpart to TestBoundedFanOut_CapHolds: that test asserts
|
||||||
|
// the upper bound; this one asserts the lower bound.
|
||||||
|
func TestBoundedFanOut_AllJobsRun(t *testing.T) {
|
||||||
|
const capN = 3
|
||||||
|
jobs := makeJobs(20)
|
||||||
|
|
||||||
|
var dispatched atomic.Int64
|
||||||
|
work := func(ctx context.Context, job *domain.Job) error {
|
||||||
|
dispatched.Add(1)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := boundedFanOut(context.Background(), jobs, capN, work, quietLogger()); err != nil {
|
||||||
|
t.Fatalf("boundedFanOut returned error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if got := dispatched.Load(); got != int64(len(jobs)) {
|
||||||
|
t.Errorf("expected all %d jobs to be dispatched, got %d", len(jobs), got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestBoundedFanOut_SkipsAgentRoutedDeployments pins the
|
||||||
|
// shouldSkipJob contract: deployment jobs with a non-empty AgentID
|
||||||
|
// belong to the agent's GetPendingWork path, so the server-side
|
||||||
|
// fan-out must skip them. boundedFanOut's behaviour here matches the
|
||||||
|
// pre-audit-#9 sequential loop's behaviour exactly.
|
||||||
|
func TestBoundedFanOut_SkipsAgentRoutedDeployments(t *testing.T) {
|
||||||
|
agentID := "agent-1"
|
||||||
|
jobs := []*domain.Job{
|
||||||
|
{ID: "j1", Type: domain.JobTypeRenewal, Status: domain.JobStatusPending},
|
||||||
|
{ID: "j2", Type: domain.JobTypeDeployment, Status: domain.JobStatusPending, AgentID: &agentID},
|
||||||
|
{ID: "j3", Type: domain.JobTypeIssuance, Status: domain.JobStatusPending},
|
||||||
|
}
|
||||||
|
|
||||||
|
var seen atomic.Int64
|
||||||
|
var seenIDs []string
|
||||||
|
work := func(ctx context.Context, job *domain.Job) error {
|
||||||
|
seen.Add(1)
|
||||||
|
seenIDs = append(seenIDs, job.ID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := boundedFanOut(context.Background(), jobs, 5, work, quietLogger()); err != nil {
|
||||||
|
t.Fatalf("boundedFanOut returned error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if got := seen.Load(); got != 2 {
|
||||||
|
t.Errorf("expected 2 jobs to run (renewal + issuance, deployment-with-agent skipped), got %d (ids=%v)", got, seenIDs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestBoundedFanOut_CtxCancelInterrupts pins that ctx cancellation
|
||||||
|
// during a long-running fan-out interrupts the dispatch loop. Without
|
||||||
|
// the ctx-aware Acquire (audit prompt's "anti-pattern: channel-based
|
||||||
|
// semaphore without ctx-aware acquire"), this test would hang the
|
||||||
|
// scheduler indefinitely on a stuck CA call.
|
||||||
|
func TestBoundedFanOut_CtxCancelInterrupts(t *testing.T) {
|
||||||
|
jobs := makeJobs(100)
|
||||||
|
|
||||||
|
work := func(ctx context.Context, job *domain.Job) error {
|
||||||
|
// Work that respects ctx — sleeps until ctx done or 5s elapsed.
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
// Cancel after 100ms so the fan-out aborts mid-stream.
|
||||||
|
go func() {
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
cancel()
|
||||||
|
}()
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
err := boundedFanOut(ctx, jobs, 3, work, quietLogger())
|
||||||
|
elapsed := time.Since(start)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("boundedFanOut should not propagate the ctx error from work; got %v", err)
|
||||||
|
}
|
||||||
|
// Even with ctx cancel, the function returns nil because the
|
||||||
|
// loop exits via the Acquire-cancel branch (logged warn) and
|
||||||
|
// the Wait drains in-flight goroutines. Total elapsed should be
|
||||||
|
// well under the 5s "stuck CA" cap if the cancel actually
|
||||||
|
// interrupted the dispatch.
|
||||||
|
if elapsed > 6*time.Second {
|
||||||
|
t.Errorf("ctx cancel did not interrupt fan-out: elapsed=%v, expected <6s", elapsed)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestBoundedFanOut_FailedJobsCounted pins that errors from `work`
|
||||||
|
// don't cause the fan-out to abort — the failed counter increments
|
||||||
|
// and the loop continues. Jobs are independent; one cert failing
|
||||||
|
// shouldn't block the rest.
|
||||||
|
func TestBoundedFanOut_FailedJobsCounted(t *testing.T) {
|
||||||
|
const totalJobs = 10
|
||||||
|
jobs := makeJobs(totalJobs)
|
||||||
|
|
||||||
|
var dispatched atomic.Int64
|
||||||
|
failEvery := 3 // jobs 0, 3, 6, 9 fail
|
||||||
|
work := func(ctx context.Context, job *domain.Job) error {
|
||||||
|
idx, _ := strconv.Atoi(job.ID[len("job-"):])
|
||||||
|
dispatched.Add(1)
|
||||||
|
if idx%failEvery == 0 {
|
||||||
|
return fmt.Errorf("simulated failure for %s", job.ID)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := boundedFanOut(context.Background(), jobs, 4, work, quietLogger()); err != nil {
|
||||||
|
t.Fatalf("boundedFanOut should swallow per-job errors; got %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if got := dispatched.Load(); got != int64(totalJobs) {
|
||||||
|
t.Errorf("expected all %d jobs dispatched even with failures, got %d", totalJobs, got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestSetRenewalConcurrency_NormalizesNonPositive pins the ≤0 → 1
|
||||||
|
// fail-safe in SetRenewalConcurrency. semaphore.NewWeighted(0)
|
||||||
|
// constructs a semaphore that blocks every Acquire forever; the
|
||||||
|
// normalization prevents a misconfigured env var from wedging the
|
||||||
|
// scheduler.
|
||||||
|
func TestSetRenewalConcurrency_NormalizesNonPositive(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
in int
|
||||||
|
want int
|
||||||
|
}{
|
||||||
|
{-100, 1},
|
||||||
|
{-1, 1},
|
||||||
|
{0, 1},
|
||||||
|
{1, 1},
|
||||||
|
{25, 25},
|
||||||
|
{1000, 1000},
|
||||||
|
}
|
||||||
|
for _, tc := range cases {
|
||||||
|
t.Run(strconv.Itoa(tc.in), func(t *testing.T) {
|
||||||
|
s := &JobService{}
|
||||||
|
s.SetRenewalConcurrency(tc.in)
|
||||||
|
if s.renewalConcurrency != tc.want {
|
||||||
|
t.Errorf("SetRenewalConcurrency(%d) -> renewalConcurrency=%d, want %d", tc.in, s.renewalConcurrency, tc.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user