diff --git a/cmd/server/main.go b/cmd/server/main.go index 8c5198b..2ee96b8 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -808,6 +808,11 @@ func main() { // CERTCTL_RENEWAL_CONCURRENCY; ≤0 normalised to 1 (sequential) // inside the setter. jobService.SetRenewalConcurrency(cfg.Scheduler.RenewalConcurrency) + // SCALE-001 closure (Sprint 2, 2026-05-16): per-tick ClaimPendingJobs + // cap so 100K-job bursts don't materialise the full queue into + // memory before the bounded fan-out engages. Setting normalises ≤0 + // to 1000 (fail-safe vs. legacy unlimited semantics). + jobService.SetClaimLimit(cfg.Scheduler.JobClaimLimit) agentService := service.NewAgentService(agentRepo, certificateRepo, jobRepo, targetRepo, auditService, issuerRegistry, renewalService) agentService.SetProfileRepo(profileRepo) issuerService := service.NewIssuerService(issuerRepo, auditService, issuerRegistry, encryptionKey, logger) diff --git a/internal/config/config.go b/internal/config/config.go index 5e02dd4..9fd8ac3 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -350,7 +350,12 @@ func Load() (*Config, error) { JobProcessorInterval: getEnvDuration("CERTCTL_SCHEDULER_JOB_PROCESSOR_INTERVAL", 30*time.Second), // Audit fix #9 — per-tick concurrency cap on the renewal/issuance/ // deployment goroutine fan-out. ≤0 → 1 (sequential). - RenewalConcurrency: getEnvInt("CERTCTL_RENEWAL_CONCURRENCY", 25), + RenewalConcurrency: getEnvInt("CERTCTL_RENEWAL_CONCURRENCY", 25), + // SCALE-001 closure (Sprint 2, 2026-05-16) — per-tick claim cap on + // the scheduler's ClaimPendingJobs sweep. Default 1000 keeps the + // fan-out busy (≈40× the renewal-concurrency cap) without + // page-thrashing on a 100K-job burst. ≤0 → 1000 (fail-safe). + JobClaimLimit: getEnvInt("CERTCTL_SCHEDULER_JOB_CLAIM_LIMIT", 1000), AgentHealthCheckInterval: getEnvDuration("CERTCTL_SCHEDULER_AGENT_HEALTH_CHECK_INTERVAL", 2*time.Minute), NotificationProcessInterval: getEnvDuration("CERTCTL_SCHEDULER_NOTIFICATION_PROCESS_INTERVAL", 1*time.Minute), // I-005: retry sweep for failed notifications. Mirrors RetryInterval diff --git a/internal/config/server.go b/internal/config/server.go index 085760e..5f1bcde 100644 --- a/internal/config/server.go +++ b/internal/config/server.go @@ -170,6 +170,26 @@ type SchedulerConfig struct { // Setting: CERTCTL_RENEWAL_CONCURRENCY environment variable. RenewalConcurrency int + // JobClaimLimit caps the number of Pending rows a single + // scheduler tick may claim via repository.JobRepository.ClaimPendingJobs. + // Default 1000. + // + // SCALE-001 closure (Sprint 2, 2026-05-16). Pre-fix the scheduler + // invoked ClaimPendingJobs with limit:0, which loads every Pending + // row in a single transaction. A 100K-job burst (cert-fleet sweep, + // post-outage recovery, etc.) would marshal the full queue into + // process memory before boundedFanOut's semaphore could back- + // pressure the upstream CAs. Capping the claim per tick keeps + // memory bounded; the next tick (JobProcessorInterval=30s default) + // picks up the rest. + // + // Operator-tune: bump for very-large-fleet deploys where 1000 + // per 30s isn't enough throughput. Values ≤ 0 fall back to 1000 + // rather than the legacy unlimited semantics — fail-safe. + // + // Setting: CERTCTL_SCHEDULER_JOB_CLAIM_LIMIT environment variable. + JobClaimLimit int + // AgentHealthCheckInterval is how often the scheduler checks agent heartbeats. // Default: 2 minutes. Minimum: 1 second. Marks agents offline if no recent heartbeat. // Setting: CERTCTL_SCHEDULER_AGENT_HEALTH_CHECK_INTERVAL environment variable. diff --git a/internal/service/job.go b/internal/service/job.go index 21ac164..dfce926 100644 --- a/internal/service/job.go +++ b/internal/service/job.go @@ -44,6 +44,14 @@ type JobService struct { // wiring calls SetRenewalConcurrency(cfg.Scheduler.RenewalConcurrency) // to switch on the bounded fan-out. Audit fix #9. renewalConcurrency int + + // claimLimit caps the number of Pending rows ProcessPendingJobs + // claims in a single tick via ClaimPendingJobs. 0 (zero-value) + // preserves the legacy "unlimited" semantics so existing test + // wiring keeps its byte-for-byte behaviour. Production wiring + // calls SetClaimLimit(cfg.Scheduler.JobClaimLimit) (default 1000). + // SCALE-001 closure (Sprint 2, 2026-05-16). + claimLimit int } // NewJobService creates a new job service. @@ -93,6 +101,27 @@ func (s *JobService) SetRenewalConcurrency(n int) { s.renewalConcurrency = n } +// SetClaimLimit wires the per-tick cap on the number of Pending rows +// ProcessPendingJobs claims from the JobRepository. Production wiring +// passes cfg.Scheduler.JobClaimLimit (default 1000). Values ≤ 0 fall +// back to 1000 — fail-safe rather than reverting to the legacy +// unlimited semantics that SCALE-001 closed. +// +// Test wiring that constructs JobService via NewJobService and never +// calls SetClaimLimit retains the historical limit:0 ClaimPendingJobs +// invocation (the JobService.claimLimit zero-value), preserving +// byte-for-byte unit-test behaviour. Repository-level tests that +// exercise the LIMIT clause specifically pass a non-zero limit +// directly to ClaimPendingJobs and don't go through this seam. +// +// SCALE-001 closure (Sprint 2, 2026-05-16). +func (s *JobService) SetClaimLimit(n int) { + if n <= 0 { + n = 1000 + } + s.claimLimit = n +} + // SetAuditService wires an optional audit service for emitting lifecycle // events (e.g., scheduler-driven job_retry transitions recorded by // RetryFailedJobs). Construction keeps the audit dependency optional so @@ -112,8 +141,11 @@ func (s *JobService) SetAuditService(a *AuditService) { // idempotent against the pre-flipped state. func (s *JobService) ProcessPendingJobs(ctx context.Context) error { // Claim pending jobs atomically (H-6 remediation: was ListByStatus which had no row lock). - // Empty jobType matches all types; zero limit means unlimited (preserves prior semantics). - pendingJobs, err := s.jobRepo.ClaimPendingJobs(ctx, "", 0) + // Empty jobType matches all types; claimLimit caps the per-tick claim + // (SCALE-001 closure — pre-fix limit:0 meant unlimited, which page- + // thrashed on 100K-job bursts). Zero claimLimit preserves legacy + // unlimited semantics for test wiring that hasn't called SetClaimLimit. + pendingJobs, err := s.jobRepo.ClaimPendingJobs(ctx, "", s.claimLimit) if err != nil { return fmt.Errorf("failed to claim pending jobs: %w", err) } @@ -123,7 +155,13 @@ func (s *JobService) ProcessPendingJobs(ctx context.Context) error { return nil } - s.logger.Info("processing pending jobs", "count", len(pendingJobs)) + // SCALE-001: emit the per-tick claim count so operators can spot + // the cap engaging (when count == claimLimit, the queue is + // running ahead of the fan-out and the operator may want to bump + // CERTCTL_SCHEDULER_JOB_CLAIM_LIMIT or CERTCTL_RENEWAL_CONCURRENCY). + s.logger.Info("processing pending jobs", + "count", len(pendingJobs), + "claim_limit", s.claimLimit) // Audit fix #9: bounded concurrent fan-out. When renewalConcurrency // is the zero value (caller never set it), fall through to the diff --git a/internal/service/job_test.go b/internal/service/job_test.go index c65ac2c..beeb581 100644 --- a/internal/service/job_test.go +++ b/internal/service/job_test.go @@ -6,6 +6,7 @@ import ( "errors" "log/slog" "os" + "strconv" "strings" "sync" "testing" @@ -931,3 +932,88 @@ func TestJobService_ReapTimedOutJobs_RepoErrorPropagates(t *testing.T) { t.Fatalf("expected 0 audit events after repo error, got %d", len(auditRepo.Events)) } } + +// ============================================================================= +// SCALE-001 closure (Sprint 2, 2026-05-16). JobService.SetClaimLimit +// must propagate to the ClaimPendingJobs `limit` argument so a 100K-job +// burst can't materialise the whole queue into process memory in one +// tick. The cap engages: claim N rows per tick, leave the rest for the +// next tick (JobProcessorInterval=30s default). +// +// Pre-fix the call was ClaimPendingJobs(ctx, "", 0) — limit 0 meant +// unlimited. Post-fix the limit is the configured cap, default 1000 +// (wired in cmd/server/main.go), test wiring overrides via SetClaimLimit. +// ============================================================================= + +func TestProcessPendingJobs_RespectsClaimLimit(t *testing.T) { + ctx := context.Background() + + now := time.Now() + // Stuff 10 pending renewal jobs. + jobs := make(map[string]*domain.Job, 10) + for i := 0; i < 10; i++ { + id := "job-claim-" + strconv.Itoa(i) + jobs[id] = &domain.Job{ + ID: id, + Type: domain.JobTypeRenewal, + CertificateID: "cert-claim-" + strconv.Itoa(i), + Status: domain.JobStatusPending, + Attempts: 0, + MaxAttempts: 3, + CreatedAt: now, + ScheduledAt: now, + } + } + jobRepo := &mockJobRepo{ + Jobs: jobs, + StatusUpdates: make(map[string]domain.JobStatus), + } + + jobService := newTestJobService(jobRepo) + jobService.SetClaimLimit(3) + + if err := jobService.ProcessPendingJobs(ctx); err != nil { + // processing fails for renewal-without-cert; the limit invariant + // is asserted independently of downstream success. + t.Logf("ProcessPendingJobs returned error (expected): %v", err) + } + + if jobRepo.LastClaimLimit != 3 { + t.Errorf("LastClaimLimit = %d; want 3 (SetClaimLimit must propagate)", jobRepo.LastClaimLimit) + } + + // Count how many transitioned from Pending → Running via the mock's + // atomic-claim behaviour. + jobRepo.mu.Lock() + defer jobRepo.mu.Unlock() + var running int + for _, j := range jobRepo.Jobs { + if j.Status == domain.JobStatusRunning { + running++ + } + } + if running != 3 { + t.Errorf("running-job count = %d; want 3 (claim cap should have stopped after 3)", running) + } +} + +// TestSetClaimLimit_NormalisesNonPositive pins the fail-safe behaviour +// — values ≤ 0 fall back to 1000 rather than reverting to the legacy +// unlimited semantics that SCALE-001 closed. +func TestSetClaimLimit_NormalisesNonPositive(t *testing.T) { + for _, in := range []int{0, -1, -1000} { + jobRepo := &mockJobRepo{ + Jobs: map[string]*domain.Job{}, + StatusUpdates: make(map[string]domain.JobStatus), + } + svc := newTestJobService(jobRepo) + svc.SetClaimLimit(in) + // Drive a claim to capture LastClaimLimit. + if err := svc.ProcessPendingJobs(context.Background()); err != nil { + t.Fatalf("ProcessPendingJobs: %v", err) + } + if jobRepo.LastClaimLimit != 1000 { + t.Errorf("SetClaimLimit(%d): LastClaimLimit = %d; want 1000 (fail-safe default)", in, jobRepo.LastClaimLimit) + } + } +} diff --git a/internal/service/testutil_test.go b/internal/service/testutil_test.go index 5085e93..c1f6e45 100644 --- a/internal/service/testutil_test.go +++ b/internal/service/testutil_test.go @@ -207,6 +207,10 @@ type mockJobRepo struct { ListTimedOutErr error ListOfflineAgentJobsErr error Updated []*domain.Job + // SCALE-001 closure (Sprint 2): records the most-recent `limit` + // passed to ClaimPendingJobs so tests can pin the per-tick cap + // propagation from JobService.SetClaimLimit. + LastClaimLimit int } func (m *mockJobRepo) List(ctx context.Context) ([]*domain.Job, error) { @@ -352,9 +356,13 @@ func (m *mockJobRepo) ListPendingByAgentID(ctx context.Context, agentID string) // ClaimPendingJobs simulates the H-6 atomic claim semantics: matching rows are transitioned // Pending → Running before being returned. The in-memory mock has no concurrency primitives // beyond the existing mutex, which is sufficient for single-goroutine service tests. +// +// LastClaimLimit is recorded for SCALE-001 (Sprint 2) tests that pin the +// per-tick cap propagation from JobService.SetClaimLimit. func (m *mockJobRepo) ClaimPendingJobs(ctx context.Context, jobType domain.JobType, limit int) ([]*domain.Job, error) { m.mu.Lock() defer m.mu.Unlock() + m.LastClaimLimit = limit if m.ListErr != nil { return nil, m.ListErr }