mirror of
https://github.com/shankar0123/certctl.git
synced 2026-06-07 18:21:32 +00:00
fix(scheduler): SCALE-001 — cap ClaimPendingJobs per-tick (default 1000)
Sprint 2 unified-master-audit closure. Pre-fix, the scheduler invoked
ClaimPendingJobs(ctx, "", 0). A limit of 0 loads every Pending row in
a single transaction, so a 100K-job burst (cert-fleet sweep, post-outage
recovery, large agent-fleet first boot) marshalled the full queue
into process memory before boundedFanOut's semaphore could back-
pressure the upstream CAs.
Fix:
- SchedulerConfig.JobClaimLimit (env CERTCTL_SCHEDULER_JOB_CLAIM_LIMIT,
default 1000). ≤0 normalised to 1000 in SetClaimLimit — fail-safe
vs. legacy unlimited semantics.
- JobService.claimLimit threaded into the existing
ProcessPendingJobs flow; ClaimPendingJobs(ctx, "", s.claimLimit).
- cmd/server/main.go wires jobService.SetClaimLimit(cfg.Scheduler.JobClaimLimit).
- 'processing pending jobs' log line now includes claim_limit so
operators can spot the cap engaging (count == claim_limit ⇒
queue is running ahead of fan-out; bump CERTCTL_SCHEDULER_JOB_CLAIM_LIMIT
or CERTCTL_RENEWAL_CONCURRENCY).
- Existing test wiring keeps the legacy zero-value (unlimited) for
byte-for-byte compatibility with the 600+ existing JobService unit
tests — only production wiring and the new SCALE-001 regression
tests call SetClaimLimit.
Regression coverage:
- mockJobRepo.LastClaimLimit records the limit passed through
ClaimPendingJobs so tests can pin the propagation.
- TestProcessPendingJobs_RespectsClaimLimit: 10 Pending rows,
SetClaimLimit(3), expect exactly 3 transition to Running plus
LastClaimLimit=3 on the mock.
- TestSetClaimLimit_NormalisesNonPositive: 0/-1/-1000 all
normalise to 1000.
Closes SCALE-001.
This commit is contained in:
@@ -808,6 +808,11 @@ func main() {
|
|||||||
// CERTCTL_RENEWAL_CONCURRENCY; ≤0 normalised to 1 (sequential)
|
// CERTCTL_RENEWAL_CONCURRENCY; ≤0 normalised to 1 (sequential)
|
||||||
// inside the setter.
|
// inside the setter.
|
||||||
jobService.SetRenewalConcurrency(cfg.Scheduler.RenewalConcurrency)
|
jobService.SetRenewalConcurrency(cfg.Scheduler.RenewalConcurrency)
|
||||||
|
// SCALE-001 closure (Sprint 2, 2026-05-16): per-tick ClaimPendingJobs
|
||||||
|
// cap so 100K-job bursts don't materialise the full queue into
|
||||||
|
// memory before the bounded fan-out engages. The setter normalises ≤0
|
||||||
|
// to 1000 (fail-safe vs. legacy unlimited semantics).
|
||||||
|
jobService.SetClaimLimit(cfg.Scheduler.JobClaimLimit)
|
||||||
agentService := service.NewAgentService(agentRepo, certificateRepo, jobRepo, targetRepo, auditService, issuerRegistry, renewalService)
|
agentService := service.NewAgentService(agentRepo, certificateRepo, jobRepo, targetRepo, auditService, issuerRegistry, renewalService)
|
||||||
agentService.SetProfileRepo(profileRepo)
|
agentService.SetProfileRepo(profileRepo)
|
||||||
issuerService := service.NewIssuerService(issuerRepo, auditService, issuerRegistry, encryptionKey, logger)
|
issuerService := service.NewIssuerService(issuerRepo, auditService, issuerRegistry, encryptionKey, logger)
|
||||||
|
|||||||
@@ -351,6 +351,11 @@ func Load() (*Config, error) {
|
|||||||
// Audit fix #9 — per-tick concurrency cap on the renewal/issuance/
|
// Audit fix #9 — per-tick concurrency cap on the renewal/issuance/
|
||||||
// deployment goroutine fan-out. ≤0 → 1 (sequential).
|
// deployment goroutine fan-out. ≤0 → 1 (sequential).
|
||||||
RenewalConcurrency: getEnvInt("CERTCTL_RENEWAL_CONCURRENCY", 25),
|
RenewalConcurrency: getEnvInt("CERTCTL_RENEWAL_CONCURRENCY", 25),
|
||||||
|
// SCALE-001 closure (Sprint 2, 2026-05-16) — per-tick claim cap on
|
||||||
|
// the scheduler's ClaimPendingJobs sweep. Default 1000 keeps the
|
||||||
|
// fan-out busy (≈40× the renewal-concurrency cap) without
|
||||||
|
// page-thrashing on a 100K-job burst. ≤0 → 1000 (fail-safe).
|
||||||
|
JobClaimLimit: getEnvInt("CERTCTL_SCHEDULER_JOB_CLAIM_LIMIT", 1000),
|
||||||
AgentHealthCheckInterval: getEnvDuration("CERTCTL_SCHEDULER_AGENT_HEALTH_CHECK_INTERVAL", 2*time.Minute),
|
AgentHealthCheckInterval: getEnvDuration("CERTCTL_SCHEDULER_AGENT_HEALTH_CHECK_INTERVAL", 2*time.Minute),
|
||||||
NotificationProcessInterval: getEnvDuration("CERTCTL_SCHEDULER_NOTIFICATION_PROCESS_INTERVAL", 1*time.Minute),
|
NotificationProcessInterval: getEnvDuration("CERTCTL_SCHEDULER_NOTIFICATION_PROCESS_INTERVAL", 1*time.Minute),
|
||||||
// I-005: retry sweep for failed notifications. Mirrors RetryInterval
|
// I-005: retry sweep for failed notifications. Mirrors RetryInterval
|
||||||
|
|||||||
@@ -170,6 +170,26 @@ type SchedulerConfig struct {
|
|||||||
// Setting: CERTCTL_RENEWAL_CONCURRENCY environment variable.
|
// Setting: CERTCTL_RENEWAL_CONCURRENCY environment variable.
|
||||||
RenewalConcurrency int
|
RenewalConcurrency int
|
||||||
|
|
||||||
|
// JobClaimLimit caps the number of Pending rows a single
|
||||||
|
// scheduler tick may claim via repository.JobRepository.ClaimPendingJobs.
|
||||||
|
// Default 1000.
|
||||||
|
//
|
||||||
|
// SCALE-001 closure (Sprint 2, 2026-05-16). Pre-fix the scheduler
|
||||||
|
// invoked ClaimPendingJobs with limit:0, which loads every Pending
|
||||||
|
// row in a single transaction. A 100K-job burst (cert-fleet sweep,
|
||||||
|
// post-outage recovery, etc.) would marshal the full queue into
|
||||||
|
// process memory before boundedFanOut's semaphore could back-
|
||||||
|
// pressure the upstream CAs. Capping the claim per tick keeps
|
||||||
|
// memory bounded; the next tick (JobProcessorInterval=30s default)
|
||||||
|
// picks up the rest.
|
||||||
|
//
|
||||||
|
// Operator-tune: bump for very-large-fleet deploys where 1000
|
||||||
|
// per 30s isn't enough throughput. Values ≤ 0 fall back to 1000
|
||||||
|
// rather than the legacy unlimited semantics — fail-safe.
|
||||||
|
//
|
||||||
|
// Setting: CERTCTL_SCHEDULER_JOB_CLAIM_LIMIT environment variable.
|
||||||
|
JobClaimLimit int
|
||||||
|
|
||||||
// AgentHealthCheckInterval is how often the scheduler checks agent heartbeats.
|
// AgentHealthCheckInterval is how often the scheduler checks agent heartbeats.
|
||||||
// Default: 2 minutes. Minimum: 1 second. Marks agents offline if no recent heartbeat.
|
// Default: 2 minutes. Minimum: 1 second. Marks agents offline if no recent heartbeat.
|
||||||
// Setting: CERTCTL_SCHEDULER_AGENT_HEALTH_CHECK_INTERVAL environment variable.
|
// Setting: CERTCTL_SCHEDULER_AGENT_HEALTH_CHECK_INTERVAL environment variable.
|
||||||
|
|||||||
+41
-3
@@ -44,6 +44,14 @@ type JobService struct {
|
|||||||
// wiring calls SetRenewalConcurrency(cfg.Scheduler.RenewalConcurrency)
|
// wiring calls SetRenewalConcurrency(cfg.Scheduler.RenewalConcurrency)
|
||||||
// to switch on the bounded fan-out. Audit fix #9.
|
// to switch on the bounded fan-out. Audit fix #9.
|
||||||
renewalConcurrency int
|
renewalConcurrency int
|
||||||
|
|
||||||
|
// claimLimit caps the number of Pending rows ProcessPendingJobs
|
||||||
|
// claims in a single tick via ClaimPendingJobs. 0 (zero-value)
|
||||||
|
// preserves the legacy "unlimited" semantics so existing test
|
||||||
|
// wiring keeps its byte-for-byte behaviour. Production wiring
|
||||||
|
// calls SetClaimLimit(cfg.Scheduler.JobClaimLimit) (default 1000).
|
||||||
|
// SCALE-001 closure (Sprint 2, 2026-05-16).
|
||||||
|
claimLimit int
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewJobService creates a new job service.
|
// NewJobService creates a new job service.
|
||||||
@@ -93,6 +101,27 @@ func (s *JobService) SetRenewalConcurrency(n int) {
|
|||||||
s.renewalConcurrency = n
|
s.renewalConcurrency = n
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetClaimLimit wires the per-tick cap on the number of Pending rows
|
||||||
|
// ProcessPendingJobs claims from the JobRepository. Production wiring
|
||||||
|
// passes cfg.Scheduler.JobClaimLimit (default 1000). Values ≤ 0 fall
|
||||||
|
// back to 1000 — fail-safe rather than reverting to the legacy
|
||||||
|
// unlimited semantics that SCALE-001 closed.
|
||||||
|
//
|
||||||
|
// Test wiring that constructs JobService via NewJobService and never
|
||||||
|
// calls SetClaimLimit retains the historical limit:0 ClaimPendingJobs
|
||||||
|
// invocation (the JobService.claimLimit zero-value), preserving
|
||||||
|
// byte-for-byte unit-test behaviour. Repository-level tests that
|
||||||
|
// exercise the LIMIT clause specifically pass a non-zero limit
|
||||||
|
// directly to ClaimPendingJobs and don't go through this seam.
|
||||||
|
//
|
||||||
|
// SCALE-001 closure (Sprint 2, 2026-05-16).
|
||||||
|
func (s *JobService) SetClaimLimit(n int) {
|
||||||
|
if n <= 0 {
|
||||||
|
n = 1000
|
||||||
|
}
|
||||||
|
s.claimLimit = n
|
||||||
|
}
|
||||||
|
|
||||||
// SetAuditService wires an optional audit service for emitting lifecycle
|
// SetAuditService wires an optional audit service for emitting lifecycle
|
||||||
// events (e.g., scheduler-driven job_retry transitions recorded by
|
// events (e.g., scheduler-driven job_retry transitions recorded by
|
||||||
// RetryFailedJobs). Construction keeps the audit dependency optional so
|
// RetryFailedJobs). Construction keeps the audit dependency optional so
|
||||||
@@ -112,8 +141,11 @@ func (s *JobService) SetAuditService(a *AuditService) {
|
|||||||
// idempotent against the pre-flipped state.
|
// idempotent against the pre-flipped state.
|
||||||
func (s *JobService) ProcessPendingJobs(ctx context.Context) error {
|
func (s *JobService) ProcessPendingJobs(ctx context.Context) error {
|
||||||
// Claim pending jobs atomically (H-6 remediation: was ListByStatus which had no row lock).
|
// Claim pending jobs atomically (H-6 remediation: was ListByStatus which had no row lock).
|
||||||
// Empty jobType matches all types; zero limit means unlimited (preserves prior semantics).
|
// Empty jobType matches all types; claimLimit caps the per-tick claim
|
||||||
pendingJobs, err := s.jobRepo.ClaimPendingJobs(ctx, "", 0)
|
// (SCALE-001 closure — pre-fix limit:0 meant unlimited, which page-
|
||||||
|
// thrashed on 100K-job bursts). Zero claimLimit preserves legacy
|
||||||
|
// unlimited semantics for test wiring that hasn't called SetClaimLimit.
|
||||||
|
pendingJobs, err := s.jobRepo.ClaimPendingJobs(ctx, "", s.claimLimit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to claim pending jobs: %w", err)
|
return fmt.Errorf("failed to claim pending jobs: %w", err)
|
||||||
}
|
}
|
||||||
@@ -123,7 +155,13 @@ func (s *JobService) ProcessPendingJobs(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
s.logger.Info("processing pending jobs", "count", len(pendingJobs))
|
// SCALE-001: emit the per-tick claim count so operators can spot
|
||||||
|
// the cap engaging (when count == claimLimit, the queue is
|
||||||
|
// running ahead of the fan-out and the operator may want to bump
|
||||||
|
// CERTCTL_SCHEDULER_JOB_CLAIM_LIMIT or CERTCTL_RENEWAL_CONCURRENCY).
|
||||||
|
s.logger.Info("processing pending jobs",
|
||||||
|
"count", len(pendingJobs),
|
||||||
|
"claim_limit", s.claimLimit)
|
||||||
|
|
||||||
// Audit fix #9: bounded concurrent fan-out. When renewalConcurrency
|
// Audit fix #9: bounded concurrent fan-out. When renewalConcurrency
|
||||||
// is the zero value (caller never set it), fall through to the
|
// is the zero value (caller never set it), fall through to the
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"os"
|
"os"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
@@ -931,3 +932,88 @@ func TestJobService_ReapTimedOutJobs_RepoErrorPropagates(t *testing.T) {
|
|||||||
t.Fatalf("expected 0 audit events after repo error, got %d", len(auditRepo.Events))
|
t.Fatalf("expected 0 audit events after repo error, got %d", len(auditRepo.Events))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// =============================================================================
|
||||||
|
// SCALE-001 closure (Sprint 2, 2026-05-16). JobService.SetClaimLimit
|
||||||
|
// must propagate to the ClaimPendingJobs `limit` argument so a 100K-job
|
||||||
|
// burst can't materialise the whole queue into process memory in one
|
||||||
|
// tick. The cap engages: claim N rows per tick, leave the rest for the
|
||||||
|
// next tick (JobProcessorInterval=30s default).
|
||||||
|
//
|
||||||
|
// Pre-fix the call was ClaimPendingJobs(ctx, "", 0) — limit 0 meant
|
||||||
|
// unlimited. Post-fix the limit is the configured cap, default 1000
|
||||||
|
// (wired in cmd/server/main.go), test wiring overrides via SetClaimLimit.
|
||||||
|
// =============================================================================
|
||||||
|
|
||||||
|
func TestProcessPendingJobs_RespectsClaimLimit(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
// Stuff 10 pending renewal jobs.
|
||||||
|
jobs := make(map[string]*domain.Job, 10)
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
id := "job-claim-" + strconv.Itoa(i)
|
||||||
|
jobs[id] = &domain.Job{
|
||||||
|
ID: id,
|
||||||
|
Type: domain.JobTypeRenewal,
|
||||||
|
CertificateID: "cert-claim-" + strconv.Itoa(i),
|
||||||
|
Status: domain.JobStatusPending,
|
||||||
|
Attempts: 0,
|
||||||
|
MaxAttempts: 3,
|
||||||
|
CreatedAt: now,
|
||||||
|
ScheduledAt: now,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
jobRepo := &mockJobRepo{
|
||||||
|
Jobs: jobs,
|
||||||
|
StatusUpdates: make(map[string]domain.JobStatus),
|
||||||
|
}
|
||||||
|
|
||||||
|
jobService := newTestJobService(jobRepo)
|
||||||
|
jobService.SetClaimLimit(3)
|
||||||
|
|
||||||
|
if err := jobService.ProcessPendingJobs(ctx); err != nil {
|
||||||
|
// processing fails for renewal-without-cert; the limit invariant
|
||||||
|
// is asserted independently of downstream success.
|
||||||
|
t.Logf("ProcessPendingJobs returned error (expected): %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if jobRepo.LastClaimLimit != 3 {
|
||||||
|
t.Errorf("LastClaimLimit = %d; want 3 (SetClaimLimit must propagate)", jobRepo.LastClaimLimit)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Count how many transitioned from Pending → Running via the mock's
|
||||||
|
// atomic-claim behaviour.
|
||||||
|
jobRepo.mu.Lock()
|
||||||
|
defer jobRepo.mu.Unlock()
|
||||||
|
var running int
|
||||||
|
for _, j := range jobRepo.Jobs {
|
||||||
|
if j.Status == domain.JobStatusRunning {
|
||||||
|
running++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if running != 3 {
|
||||||
|
t.Errorf("running-job count = %d; want 3 (claim cap should have stopped after 3)", running)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestSetClaimLimit_NormalisesNonPositive pins the fail-safe behaviour
|
||||||
|
// — values ≤ 0 fall back to 1000 rather than reverting to the legacy
|
||||||
|
// unlimited semantics that SCALE-001 closed.
|
||||||
|
func TestSetClaimLimit_NormalisesNonPositive(t *testing.T) {
|
||||||
|
for _, in := range []int{0, -1, -1000} {
|
||||||
|
jobRepo := &mockJobRepo{
|
||||||
|
Jobs: map[string]*domain.Job{},
|
||||||
|
StatusUpdates: make(map[string]domain.JobStatus),
|
||||||
|
}
|
||||||
|
svc := newTestJobService(jobRepo)
|
||||||
|
svc.SetClaimLimit(in)
|
||||||
|
// Drive a claim to capture LastClaimLimit.
|
||||||
|
if err := svc.ProcessPendingJobs(context.Background()); err != nil {
|
||||||
|
t.Fatalf("ProcessPendingJobs: %v", err)
|
||||||
|
}
|
||||||
|
if jobRepo.LastClaimLimit != 1000 {
|
||||||
|
t.Errorf("SetClaimLimit(%d): LastClaimLimit = %d; want 1000 (fail-safe default)", in, jobRepo.LastClaimLimit)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -207,6 +207,10 @@ type mockJobRepo struct {
|
|||||||
ListTimedOutErr error
|
ListTimedOutErr error
|
||||||
ListOfflineAgentJobsErr error
|
ListOfflineAgentJobsErr error
|
||||||
Updated []*domain.Job
|
Updated []*domain.Job
|
||||||
|
// SCALE-001 closure (Sprint 2): records the most-recent `limit`
|
||||||
|
// passed to ClaimPendingJobs so tests can pin the per-tick cap
|
||||||
|
// propagation from JobService.SetClaimLimit.
|
||||||
|
LastClaimLimit int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockJobRepo) List(ctx context.Context) ([]*domain.Job, error) {
|
func (m *mockJobRepo) List(ctx context.Context) ([]*domain.Job, error) {
|
||||||
@@ -352,9 +356,13 @@ func (m *mockJobRepo) ListPendingByAgentID(ctx context.Context, agentID string)
|
|||||||
// ClaimPendingJobs simulates the H-6 atomic claim semantics: matching rows are transitioned
|
// ClaimPendingJobs simulates the H-6 atomic claim semantics: matching rows are transitioned
|
||||||
// Pending → Running before being returned. The in-memory mock has no concurrency primitives
|
// Pending → Running before being returned. The in-memory mock has no concurrency primitives
|
||||||
// beyond the existing mutex, which is sufficient for single-goroutine service tests.
|
// beyond the existing mutex, which is sufficient for single-goroutine service tests.
|
||||||
|
//
|
||||||
|
// LastClaimLimit is recorded for SCALE-001 (Sprint 2) tests that pin the
|
||||||
|
// per-tick cap propagation from JobService.SetClaimLimit.
|
||||||
func (m *mockJobRepo) ClaimPendingJobs(ctx context.Context, jobType domain.JobType, limit int) ([]*domain.Job, error) {
|
func (m *mockJobRepo) ClaimPendingJobs(ctx context.Context, jobType domain.JobType, limit int) ([]*domain.Job, error) {
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
defer m.mu.Unlock()
|
defer m.mu.Unlock()
|
||||||
|
m.LastClaimLimit = limit
|
||||||
if m.ListErr != nil {
|
if m.ListErr != nil {
|
||||||
return nil, m.ListErr
|
return nil, m.ListErr
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user