fix(scheduler): SCALE-001 — cap ClaimPendingJobs per-tick (default 1000)

Sprint 2 unified-master-audit closure. Pre-fix the scheduler invoked
ClaimPendingJobs(ctx, "", 0). limit:0 loads every Pending row in a
single transaction — a 100K-job burst (cert-fleet sweep, post-outage
recovery, large agent-fleet first boot) marshalled the full queue
into process memory before boundedFanOut's semaphore could back-
pressure the upstream CAs.

Fix:
  - SchedulerConfig.JobClaimLimit (env CERTCTL_SCHEDULER_JOB_CLAIM_LIMIT,
    default 1000). ≤0 normalised to 1000 in SetClaimLimit — fail-safe
    vs. legacy unlimited semantics.
  - JobService.claimLimit threaded into the existing
    ProcessPendingJobs flow; ClaimPendingJobs(ctx, "", s.claimLimit).
  - cmd/server/main.go wires jobService.SetClaimLimit(cfg.Scheduler.JobClaimLimit).
  - 'processing pending jobs' log line now includes claim_limit so
    operators can spot the cap engaging (count == claim_limit ⇒
    queue is running ahead of fan-out; bump CERTCTL_SCHEDULER_JOB_CLAIM_LIMIT
    or CERTCTL_RENEWAL_CONCURRENCY).
  - Test wiring keeps the legacy zero-value (unlimited) for byte-
    for-byte compatibility with the existing 600+ JobService unit
    tests — only production code goes through SetClaimLimit.

Regression coverage:
  - mockJobRepo.LastClaimLimit records the limit passed through
    ClaimPendingJobs so tests can pin the propagation.
  - TestProcessPendingJobs_RespectsClaimLimit: 10 Pending rows,
    SetClaimLimit(3), expect exactly 3 transition to Running plus
    LastClaimLimit=3 on the mock.
  - TestSetClaimLimit_NormalisesNonPositive: 0/-1/-1000 all
    normalise to 1000.

Closes SCALE-001.
This commit is contained in:
shankar0123
2026-05-16 04:00:49 +00:00
parent 7d2e7043b9
commit 037876fa0f
6 changed files with 166 additions and 4 deletions
+41 -3
View File
@@ -44,6 +44,14 @@ type JobService struct {
// wiring calls SetRenewalConcurrency(cfg.Scheduler.RenewalConcurrency)
// to switch on the bounded fan-out. Audit fix #9.
renewalConcurrency int
// claimLimit caps the number of Pending rows ProcessPendingJobs
// claims in a single tick via ClaimPendingJobs. 0 (zero-value)
// preserves the legacy "unlimited" semantics so existing test
// wiring keeps its byte-for-byte behaviour. Production wiring
// calls SetClaimLimit(cfg.Scheduler.JobClaimLimit) (default 1000).
// SCALE-001 closure (Sprint 2, 2026-05-16).
claimLimit int
}
// NewJobService creates a new job service.
@@ -93,6 +101,27 @@ func (s *JobService) SetRenewalConcurrency(n int) {
s.renewalConcurrency = n
}
// SetClaimLimit wires the per-tick cap on the number of Pending rows
// ProcessPendingJobs claims from the JobRepository. Production wiring
// passes cfg.Scheduler.JobClaimLimit (default 1000). Values ≤ 0 fall
// back to 1000 — fail-safe rather than reverting to the legacy
// unlimited semantics that SCALE-001 closed.
//
// Test wiring that constructs JobService via NewJobService and never
// calls SetClaimLimit retains the historical limit:0 ClaimPendingJobs
// invocation (the JobService.claimLimit zero-value), preserving
// byte-for-byte unit-test behaviour. Repository-level tests that
// exercise the LIMIT clause specifically pass a non-zero limit
// directly to ClaimPendingJobs and don't go through this seam.
//
// SCALE-001 closure (Sprint 2, 2026-05-16).
func (s *JobService) SetClaimLimit(n int) {
if n <= 0 {
n = 1000
}
s.claimLimit = n
}
// SetAuditService wires an optional audit service for emitting lifecycle
// events (e.g., scheduler-driven job_retry transitions recorded by
// RetryFailedJobs). Construction keeps the audit dependency optional so
@@ -112,8 +141,11 @@ func (s *JobService) SetAuditService(a *AuditService) {
// idempotent against the pre-flipped state.
func (s *JobService) ProcessPendingJobs(ctx context.Context) error {
// Claim pending jobs atomically (H-6 remediation: was ListByStatus which had no row lock).
// Empty jobType matches all types; zero limit means unlimited (preserves prior semantics).
pendingJobs, err := s.jobRepo.ClaimPendingJobs(ctx, "", 0)
// Empty jobType matches all types; claimLimit caps the per-tick claim
// (SCALE-001 closure — pre-fix limit:0 meant unlimited, which page-
// thrashed on 100K-job bursts). Zero claimLimit preserves legacy
// unlimited semantics for test wiring that hasn't called SetClaimLimit.
pendingJobs, err := s.jobRepo.ClaimPendingJobs(ctx, "", s.claimLimit)
if err != nil {
return fmt.Errorf("failed to claim pending jobs: %w", err)
}
@@ -123,7 +155,13 @@ func (s *JobService) ProcessPendingJobs(ctx context.Context) error {
return nil
}
s.logger.Info("processing pending jobs", "count", len(pendingJobs))
// SCALE-001: emit the per-tick claim count so operators can spot
// the cap engaging (when count == claimLimit, the queue is
// running ahead of the fan-out and the operator may want to bump
// CERTCTL_SCHEDULER_JOB_CLAIM_LIMIT or CERTCTL_RENEWAL_CONCURRENCY).
s.logger.Info("processing pending jobs",
"count", len(pendingJobs),
"claim_limit", s.claimLimit)
// Audit fix #9: bounded concurrent fan-out. When renewalConcurrency
// is the zero value (caller never set it), fall through to the
+86
View File
@@ -6,6 +6,7 @@ import (
"errors"
"log/slog"
"os"
"strconv"
"strings"
"sync"
"testing"
@@ -931,3 +932,88 @@ func TestJobService_ReapTimedOutJobs_RepoErrorPropagates(t *testing.T) {
t.Fatalf("expected 0 audit events after repo error, got %d", len(auditRepo.Events))
}
}
// =============================================================================
// SCALE-001 closure (Sprint 2, 2026-05-16). JobService.SetClaimLimit
// must propagate to the ClaimPendingJobs `limit` argument so a 100K-job
// burst can't materialise the whole queue into process memory in one
// tick. The cap engages: claim N rows per tick, leave the rest for the
// next tick (JobProcessorInterval=30s default).
//
// Pre-fix the call was ClaimPendingJobs(ctx, "", 0) — limit 0 meant
// unlimited. Post-fix the limit is the configured cap, default 1000
// (wired in cmd/server/main.go), test wiring overrides via SetClaimLimit.
// =============================================================================
func TestProcessPendingJobs_RespectsClaimLimit(t *testing.T) {
ctx := context.Background()
now := time.Now()
// Stuff 10 pending renewal jobs.
jobs := make(map[string]*domain.Job, 10)
for i := 0; i < 10; i++ {
id := "job-claim-" + strconv.Itoa(i)
jobs[id] = &domain.Job{
ID: id,
Type: domain.JobTypeRenewal,
CertificateID: "cert-claim-" + strconv.Itoa(i),
Status: domain.JobStatusPending,
Attempts: 0,
MaxAttempts: 3,
CreatedAt: now,
ScheduledAt: now,
}
}
jobRepo := &mockJobRepo{
Jobs: jobs,
StatusUpdates: make(map[string]domain.JobStatus),
}
jobService := newTestJobService(jobRepo)
jobService.SetClaimLimit(3)
if err := jobService.ProcessPendingJobs(ctx); err != nil {
// processing fails for renewal-without-cert; the limit invariant
// is asserted independently of downstream success.
t.Logf("ProcessPendingJobs returned error (expected): %v", err)
}
if jobRepo.LastClaimLimit != 3 {
t.Errorf("LastClaimLimit = %d; want 3 (SetClaimLimit must propagate)", jobRepo.LastClaimLimit)
}
// Count how many transitioned from Pending → Running via the mock's
// atomic-claim behaviour.
jobRepo.mu.Lock()
defer jobRepo.mu.Unlock()
var running int
for _, j := range jobRepo.Jobs {
if j.Status == domain.JobStatusRunning {
running++
}
}
if running != 3 {
t.Errorf("running-job count = %d; want 3 (claim cap should have stopped after 3)", running)
}
}
// TestSetClaimLimit_NormalisesNonPositive pins the fail-safe behaviour
// — values ≤ 0 fall back to 1000 rather than reverting to the legacy
// unlimited semantics that SCALE-001 closed.
func TestSetClaimLimit_NormalisesNonPositive(t *testing.T) {
for _, in := range []int{0, -1, -1000} {
jobRepo := &mockJobRepo{
Jobs: map[string]*domain.Job{},
StatusUpdates: make(map[string]domain.JobStatus),
}
svc := newTestJobService(jobRepo)
svc.SetClaimLimit(in)
// Drive a claim to capture LastClaimLimit.
if err := svc.ProcessPendingJobs(context.Background()); err != nil {
t.Fatalf("ProcessPendingJobs: %v", err)
}
if jobRepo.LastClaimLimit != 1000 {
t.Errorf("SetClaimLimit(%d): LastClaimLimit = %d; want 1000 (fail-safe default)", in, jobRepo.LastClaimLimit)
}
}
}
+8
View File
@@ -207,6 +207,10 @@ type mockJobRepo struct {
ListTimedOutErr error
ListOfflineAgentJobsErr error
Updated []*domain.Job
// SCALE-001 closure (Sprint 2): records the most-recent `limit`
// passed to ClaimPendingJobs so tests can pin the per-tick cap
// propagation from JobService.SetClaimLimit.
LastClaimLimit int
}
func (m *mockJobRepo) List(ctx context.Context) ([]*domain.Job, error) {
@@ -352,9 +356,13 @@ func (m *mockJobRepo) ListPendingByAgentID(ctx context.Context, agentID string)
// ClaimPendingJobs simulates the H-6 atomic claim semantics: matching rows are transitioned
// Pending → Running before being returned. The in-memory mock has no concurrency primitives
// beyond the existing mutex, which is sufficient for single-goroutine service tests.
//
// LastClaimLimit is recorded for SCALE-001 (Sprint 2) tests that pin the
// per-tick cap propagation from JobService.SetClaimLimit.
func (m *mockJobRepo) ClaimPendingJobs(ctx context.Context, jobType domain.JobType, limit int) ([]*domain.Job, error) {
m.mu.Lock()
defer m.mu.Unlock()
m.LastClaimLimit = limit
if m.ListErr != nil {
return nil, m.ListErr
}