Files
certctl/internal/scheduler/scheduler.go
T
shankar0123 8191b1ee64 scheduler+db: close Phase 6 — scale hardening across pool, jitter, ETag, asyncpoll
Phase 6 of the certctl architecture diligence remediation. Five
findings across the same scheduler-and-DB-pool surface.

SCALE-M1 (Med) — DB pool default bumped 25 → 50
  internal/config/config.go line 1972:
    MaxConnections: getEnvInt("CERTCTL_DATABASE_MAX_CONNS", 50)
  Postgres default max_connections is 100; 50 leaves headroom for
  pg_dump + ad-hoc psql + a server replica without exhausting the
  DB-side cap. Operator override env var unchanged. Operator-tune
  ladder for larger fleets (5K / 50K certs) lives in
  docs/operator/scale.md as starter values pending Phase 8 load
  tests — explicitly marked TBD.

SCALE-M3 (Med) — async-CA poll budget operator-configurable
  Live state was partially-already-shipped: all 4 async-CA
  connectors (digicert, entrust, globalsign, sectigo) already have
  per-connector CERTCTL_<NAME>_POLL_MAX_WAIT_SECONDS (Audit fix #5
  closed pre-Phase-6). What was missing: a global package-default
  override. Shipped:
    - internal/connector/issuer/asyncpoll/asyncpoll.go gains
      SetDefaultMaxWait(d) + effectiveDefaultMaxWait var + the
      currentDefaultMaxWait() priority resolver.
    - cmd/server/main.go reads CERTCTL_ASYNC_POLL_MAX_WAIT_SECONDS
      at boot and calls SetDefaultMaxWait.
    - deploy/ENVIRONMENTS.md documents the new env var (G-3 guard
      green).
  Naming deviation from the prompt's CERTCTL_ASYNC_POLL_MAX_ATTEMPTS:
  the live code tracks wall-clock time (MaxWait), not attempt count.
  Matched the existing per-connector nomenclature (_POLL_MAX_WAIT_SECONDS)
  so the priority chain reads naturally.

SCALE-M5 (Med) — JitteredTicker wrapper for all 15 scheduler loops
  internal/scheduler/jitter.go ships NewJitteredTicker(interval,
  jitterPct) + DefaultSchedulerJitter (±10%). All 15 sites in
  internal/scheduler/scheduler.go migrated from bare time.NewTicker
  to NewJitteredTicker(interval, DefaultSchedulerJitter). Base
  intervals unchanged; only the per-tick envelope adds ±10%
  randomized delay so multiple loops with the same nominal cadence
  don't co-fire and spike CPU + DB at wall-clock boundaries.

  internal/scheduler/jitter_test.go pins:
    - Bounded envelope (each tick within ±jitterPct of interval)
    - Mean drift < 30% of nominal (sign-bug detector)
    - Stop() releases the goroutine + closes C
    - Stop() idempotent (no panic on repeat)
    - Zero-jitter behaves like time.NewTicker
    - Negative and >=1 jitterPct values clamped defensively

  CI guard scripts/ci-guards/no-bare-newticker-in-scheduler.sh blocks
  any future bare time.NewTicker in scheduler.go.

SCALE-L1 (Low) — renewal-sweep semaphore behavior documented
  docs/operator/scale.md "Scheduler tick budgets" section explains
  the per-tick concurrency semaphore (CERTCTL_RENEWAL_CONCURRENCY=25
  default), the ctx-cancellation drain on tick-budget overrun, and
  operator tuning advice (raise concurrency + DB pool together).
  No code change — the behavior is defensible as-is per the audit.

SCALE-L2 (Low) — ETag middleware for top-5 read endpoints
  internal/api/middleware/etag.go computes SHA-256 ETag over the
  buffered response body, respects If-None-Match, short-circuits
  to 304 Not Modified on match. GET/HEAD only; non-2xx responses
  pass through unchanged. 64 KiB buffer cap degrades gracefully on
  oversized responses (no caching, body still flushes intact).

  Wired around the top-5 read endpoints via etagged() helper in
  internal/api/router/router.go:
    GET /api/v1/certificates
    GET /api/v1/agents
    GET /api/v1/jobs
    GET /api/v1/audit
    GET /api/v1/discovered-certificates

  internal/api/middleware/etag_test.go pins 11 behaviors including
  304-on-repeat, 200-after-mutation-with-new-ETag, POST bypass,
  4xx/5xx pass-through, oversized-response degradation, wildcard
  match, HEAD-treated-like-GET, byte-equal pass-through.

Cross-cutting fixes:
  - internal/config/config_test.go::TestLoad_DefaultValues updated
    to assert the new 50 default (was 25).
  - deploy/helm/certctl/values.yaml comment corrected — agent
    pollInterval is hardcoded 30s, not env-configurable; the
    Phase 4 comment mistakenly referenced CERTCTL_AGENT_POLL_INTERVAL
    which G-3 caught as a phantom env var.
  - asyncpoll.go reformatted by gofmt; functionally unchanged.

Verification (all pass):
  grep -nE 'SetMaxOpenConns' internal/repository/postgres/db.go    # finds 1 site
  grep -nE 'CERTCTL_DATABASE_MAX_CONNS.*50' internal/config/config.go  # config default is 50
  grep -rnE 'CERTCTL_ASYNC_POLL_MAX_WAIT_SECONDS' internal/ deploy/ENVIRONMENTS.md  # wired
  grep -cE 'time\.NewTicker\(' internal/scheduler/scheduler.go    # 0 (all migrated)
  grep -cE 'JitteredTicker' internal/scheduler/scheduler.go         # 15
  ls internal/scheduler/jitter.go internal/api/middleware/etag.go   # both exist
  ls docs/operator/scale.md                                          # exists
  bash scripts/ci-guards/no-bare-newticker-in-scheduler.sh          # clean
  bash scripts/ci-guards/G-3-env-docs-drift.sh                      # clean
  go test ./internal/scheduler/ ./internal/api/middleware/ \
    ./internal/connector/issuer/asyncpoll/ ./internal/config/       # 4/4 packages green

Closes: cowork/certctl-architecture-diligence-audit.html#fix-SCALE-M1
        cowork/certctl-architecture-diligence-audit.html#fix-SCALE-M3
        cowork/certctl-architecture-diligence-audit.html#fix-SCALE-M5
        cowork/certctl-architecture-diligence-audit.html#fix-SCALE-L1
        cowork/certctl-architecture-diligence-audit.html#fix-SCALE-L2
2026-05-14 01:23:03 +00:00

1250 lines
43 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Copyright 2026 certctl LLC. All rights reserved.
// SPDX-License-Identifier: BUSL-1.1
package scheduler
import (
"context"
"errors"
"log/slog"
"sync"
"sync/atomic"
"time"
)
// RenewalServicer defines the interface for renewal operations used by the scheduler.
type RenewalServicer interface {
CheckExpiringCertificates(ctx context.Context) error
ExpireShortLivedCertificates(ctx context.Context) error
}
// JobServicer defines the interface for job processing used by the scheduler.
//
// RetryFailedJobs was added to close coverage gap I-001: JobService.RetryFailedJobs
// existed and was unit-tested but had no runtime caller prior to this loop being
// wired. The scheduler now drives it on an independent tick so failed jobs whose
// attempt counter is below MaxAttempts are periodically reset to Pending for the
// job processor to pick up again. maxRetries is advisory (per-job gating uses
// each job's own Attempts/MaxAttempts fields).
type JobServicer interface {
ProcessPendingJobs(ctx context.Context) error
RetryFailedJobs(ctx context.Context, maxRetries int) error
}
// AgentServicer defines the interface for agent health checks used by the scheduler.
type AgentServicer interface {
MarkStaleAgentsOffline(ctx context.Context, interval time.Duration) error
}
// NotificationServicer defines the interface for notification processing used by the scheduler.
//
// RetryFailedNotifications was added to close coverage gap I-005: the retry
// sweep transitions eligible Failed notifications to Pending on an independent
// tick, using exponential backoff with a 1h cap and a 5-attempt DLQ budget.
// Mirrors the I-001 job retry loop topology.
type NotificationServicer interface {
ProcessPendingNotifications(ctx context.Context) error
RetryFailedNotifications(ctx context.Context) error
}
// NetworkScanServicer defines the interface for network scanning used by the scheduler.
type NetworkScanServicer interface {
ScanAllTargets(ctx context.Context) error
}
// DigestServicer defines the interface for digest email processing used by the scheduler.
type DigestServicer interface {
ProcessDigest(ctx context.Context) error
}
// HealthCheckServicer defines the interface for endpoint TLS health monitoring used by the scheduler.
type HealthCheckServicer interface {
RunHealthChecks(ctx context.Context) error
}
// CloudDiscoveryServicer defines the interface for cloud secret manager discovery used by the scheduler.
type CloudDiscoveryServicer interface {
DiscoverAll(ctx context.Context) (int, []error)
}
// CRLCacheServicer defines the interface for the scheduler's CRL
// pre-generation loop. RegenerateAll iterates every issuer that
// supports CRL signing and refreshes its crl_cache row. Per-issuer
// failures are logged + audited; a single bad issuer does not stop
// the others.
//
// Bundle CRL/OCSP-Responder Phase 3: the scheduler-driven cache lets
// the /.well-known/pki/crl/{issuer_id} HTTP endpoint serve from cache
// instead of regenerating per request.
type CRLCacheServicer interface {
RegenerateAll(ctx context.Context)
}
// ACMEGarbageCollector is the interface the scheduler's acmeGCLoop
// invokes once per tick. The concrete implementation is *service.ACMEService.
// Phase 5 — sweeps expired nonces / authzs / orders.
type ACMEGarbageCollector interface {
GarbageCollect(ctx context.Context) error
}
// SessionGarbageCollector is the interface the scheduler's sessionGCLoop
// invokes once per CERTCTL_SESSION_GC_INTERVAL tick. Concrete impl is
// *session.Service. Sweeps expired post-login + pre-login session rows
// AND retired-past-retention signing-key rows. Auth Bundle 2 Phase 4.
type SessionGarbageCollector interface {
GarbageCollect(ctx context.Context) (int, error)
}
// BCLReplayGarbageCollector sweeps expired rows from the BCL consumed-jti
// table. Audit 2026-05-10 HIGH-3 closure — the scheduler invokes this
// alongside the session-GC tick so a single ticker drives both. Concrete
// impl is repository.BCLReplayRepository.SweepExpired.
type BCLReplayGarbageCollector interface {
SweepExpired(ctx context.Context, now time.Time) (int, error)
}
// JobReaperService defines the interface for job timeout reaping used by the scheduler.
type JobReaperService interface {
ReapTimedOutJobs(ctx context.Context, csrTTL, approvalTTL time.Duration) error
// Bundle C / Audit M-016 (CWE-754): closes the gap left by ReapTimedOutJobs
// (which only handles AwaitingCSR / AwaitingApproval). Jobs in Running
// status whose owning agent has been silent for longer than agentTTL get
// transitioned to Failed with reason "agent_offline" so I-001's retry
// loop can re-queue them on a healthy agent.
ReapJobsWithOfflineAgents(ctx context.Context, agentTTL time.Duration) error
}
// Scheduler manages background jobs and periodic tasks for the certificate control plane.
// It runs multiple concurrent loops for renewal checks, job processing, agent health checks,
// and notification processing.
type Scheduler struct {
renewalService RenewalServicer
jobService JobServicer
agentService AgentServicer
notificationService NotificationServicer
networkScanService NetworkScanServicer
digestService DigestServicer
healthCheckService HealthCheckServicer
cloudDiscoveryService CloudDiscoveryServicer
crlCacheService CRLCacheServicer
acmeGC ACMEGarbageCollector
sessionGC SessionGarbageCollector
bclReplayGC BCLReplayGarbageCollector
jobReaper JobReaperService
logger *slog.Logger
// Configurable tick intervals
renewalCheckInterval time.Duration
jobProcessorInterval time.Duration
jobRetryInterval time.Duration
agentHealthCheckInterval time.Duration
notificationProcessInterval time.Duration
notificationRetryInterval time.Duration
shortLivedExpiryCheckInterval time.Duration
networkScanInterval time.Duration
digestInterval time.Duration
healthCheckInterval time.Duration
cloudDiscoveryInterval time.Duration
crlGenerationInterval time.Duration
jobTimeoutInterval time.Duration
acmeGCInterval time.Duration
sessionGCInterval time.Duration
// agentOfflineJobTTL: per-tick threshold for reaping Running jobs whose
// owning agent has been silent. Bundle C / Audit M-016. Defaults below.
agentOfflineJobTTL time.Duration
awaitingCSRTimeout time.Duration
awaitingApprovalTimeout time.Duration
// Idempotency guards: prevent duplicate execution of slow jobs
renewalCheckRunning atomic.Bool
jobProcessorRunning atomic.Bool
jobRetryRunning atomic.Bool
agentHealthCheckRunning atomic.Bool
notificationProcessRunning atomic.Bool
notificationRetryRunning atomic.Bool
shortLivedExpiryCheckRunning atomic.Bool
networkScanRunning atomic.Bool
digestRunning atomic.Bool
healthCheckRunning atomic.Bool
cloudDiscoveryRunning atomic.Bool
crlGenerationRunning atomic.Bool
jobTimeoutRunning atomic.Bool
acmeGCRunning atomic.Bool
sessionGCRunning atomic.Bool
// Graceful shutdown: wait for in-flight work to complete
wg sync.WaitGroup
}
// NewScheduler creates a new scheduler with configurable intervals.
func NewScheduler(
renewalService RenewalServicer,
jobService JobServicer,
agentService AgentServicer,
notificationService NotificationServicer,
networkScanService NetworkScanServicer,
logger *slog.Logger,
) *Scheduler {
return &Scheduler{
renewalService: renewalService,
jobService: jobService,
agentService: agentService,
notificationService: notificationService,
networkScanService: networkScanService,
logger: logger,
// Default intervals
renewalCheckInterval: 1 * time.Hour,
jobProcessorInterval: 30 * time.Second,
jobRetryInterval: 5 * time.Minute,
agentHealthCheckInterval: 2 * time.Minute,
notificationProcessInterval: 1 * time.Minute,
notificationRetryInterval: 2 * time.Minute,
shortLivedExpiryCheckInterval: 30 * time.Second,
networkScanInterval: 6 * time.Hour,
digestInterval: 24 * time.Hour,
healthCheckInterval: 60 * time.Second,
cloudDiscoveryInterval: 6 * time.Hour,
crlGenerationInterval: 1 * time.Hour,
jobTimeoutInterval: 10 * time.Minute,
acmeGCInterval: 1 * time.Minute,
sessionGCInterval: 1 * time.Hour,
// 5 minutes is 5×agentHealthCheckInterval default of 1m; an agent
// must miss multiple heartbeats before its in-flight jobs are reaped.
agentOfflineJobTTL: 5 * time.Minute,
}
}
// SetDigestService sets the digest service for the 7th scheduler loop.
// Called after construction since digest is optional.
func (s *Scheduler) SetDigestService(ds DigestServicer) {
s.digestService = ds
}
// SetDigestInterval configures the interval for digest email processing.
func (s *Scheduler) SetDigestInterval(d time.Duration) {
s.digestInterval = d
}
// SetRenewalCheckInterval configures the interval for renewal checks.
func (s *Scheduler) SetRenewalCheckInterval(d time.Duration) {
s.renewalCheckInterval = d
}
// SetJobProcessorInterval configures the interval for job processing.
func (s *Scheduler) SetJobProcessorInterval(d time.Duration) {
s.jobProcessorInterval = d
}
// SetJobRetryInterval configures the interval for the failed-job retry loop
// (coverage gap I-001). Defaults to 5 minutes; honors
// CERTCTL_SCHEDULER_RETRY_INTERVAL when wired from config.
func (s *Scheduler) SetJobRetryInterval(d time.Duration) {
s.jobRetryInterval = d
}
// SetAgentHealthCheckInterval configures the interval for agent health checks.
func (s *Scheduler) SetAgentHealthCheckInterval(d time.Duration) {
s.agentHealthCheckInterval = d
}
// SetNotificationProcessInterval configures the interval for notification processing.
func (s *Scheduler) SetNotificationProcessInterval(d time.Duration) {
s.notificationProcessInterval = d
}
// SetNotificationRetryInterval configures the interval for the failed-notification
// retry sweep (coverage gap I-005). Defaults to 2 minutes; honors
// CERTCTL_NOTIFICATION_RETRY_INTERVAL when wired from config.
func (s *Scheduler) SetNotificationRetryInterval(d time.Duration) {
s.notificationRetryInterval = d
}
// SetNetworkScanInterval configures the interval for network scanning.
func (s *Scheduler) SetNetworkScanInterval(d time.Duration) {
s.networkScanInterval = d
}
// SetShortLivedExpiryCheckInterval configures the interval for short-lived certificate expiry checks.
func (s *Scheduler) SetShortLivedExpiryCheckInterval(d time.Duration) {
s.shortLivedExpiryCheckInterval = d
}
// SetHealthCheckService sets the health check service for the 8th scheduler loop.
// Called after construction since health monitoring is optional.
func (s *Scheduler) SetHealthCheckService(hcs HealthCheckServicer) {
s.healthCheckService = hcs
}
// SetHealthCheckInterval configures the interval for endpoint TLS health checks.
func (s *Scheduler) SetHealthCheckInterval(d time.Duration) {
s.healthCheckInterval = d
}
// SetCloudDiscoveryService sets the cloud discovery service for the 9th scheduler loop.
// Called after construction since cloud discovery is optional.
func (s *Scheduler) SetCloudDiscoveryService(cds CloudDiscoveryServicer) {
s.cloudDiscoveryService = cds
}
// SetCloudDiscoveryInterval configures the interval for cloud secret manager discovery.
func (s *Scheduler) SetCloudDiscoveryInterval(d time.Duration) {
s.cloudDiscoveryInterval = d
}
// SetCRLCacheService sets the CRL cache service for the crlGenerationLoop.
// Called after construction since the loop is optional — when this is
// unset, no pre-generation happens and HTTP CRL fetches go through the
// on-demand path.
//
// Bundle CRL/OCSP-Responder Phase 3.
func (s *Scheduler) SetCRLCacheService(svc CRLCacheServicer) {
s.crlCacheService = svc
}
// SetCRLGenerationInterval configures the interval at which the
// scheduler regenerates CRLs into the crl_cache table. Default 1h
// (matches relying-party CRL refresh expectations under RFC 5280).
// Operators with chatty fleets can shorten; operators with bandwidth
// constraints can lengthen as long as nextUpdate stays comfortably in
// the future per generation.
//
// Zero or negative values are ignored.
func (s *Scheduler) SetCRLGenerationInterval(d time.Duration) {
if d <= 0 {
return
}
s.crlGenerationInterval = d
}
// SetJobReaperService sets the job reaper service (I-003).
func (s *Scheduler) SetJobReaperService(jr JobReaperService) {
s.jobReaper = jr
}
// SetACMEGarbageCollector wires the ACME GC service. Phase 5 — when
// non-nil, an acmeGCLoop runs every acmeGCInterval and sweeps expired
// nonces / authzs / orders. Optional: leaving nil disables the loop
// (legacy behavior pre-Phase-5).
func (s *Scheduler) SetACMEGarbageCollector(gc ACMEGarbageCollector) {
s.acmeGC = gc
}
// SetACMEGCInterval configures the interval at which the ACME GC sweep
// runs. Default 1m. Operators with quiet fleets can lengthen to 5m;
// operators expecting nonce-storms can shorten to 30s. Zero or
// negative values are ignored.
func (s *Scheduler) SetACMEGCInterval(d time.Duration) {
if d <= 0 {
return
}
s.acmeGCInterval = d
}
// SetSessionGarbageCollector wires the Auth Bundle 2 Phase 4 session GC
// service. Optional; nil disables the loop (Bundle-2-disabled deployments
// still run pre-Phase-4 behavior).
func (s *Scheduler) SetSessionGarbageCollector(gc SessionGarbageCollector) {
s.sessionGC = gc
}
// SetBCLReplayGarbageCollector wires the BCL consumed-jti GC. Audit
// 2026-05-10 HIGH-3 closure. The sweep runs on the same ticker as the
// session GC loop (no separate interval; replay rows are short-lived).
func (s *Scheduler) SetBCLReplayGarbageCollector(gc BCLReplayGarbageCollector) {
s.bclReplayGC = gc
}
// SetSessionGCInterval configures the interval at which the session GC
// sweep runs. Default 1h. Wire: CERTCTL_SESSION_GC_INTERVAL. Zero or
// negative values are ignored.
func (s *Scheduler) SetSessionGCInterval(d time.Duration) {
if d <= 0 {
return
}
s.sessionGCInterval = d
}
// SetAgentOfflineJobTTL sets the threshold past which a Running job whose
// owning agent has gone silent is reaped to Failed. Bundle C / Audit M-016.
// Zero or negative values are ignored (the default of 5 minutes is kept).
func (s *Scheduler) SetAgentOfflineJobTTL(d time.Duration) {
if d <= 0 {
return
}
s.agentOfflineJobTTL = d
}
// SetJobTimeoutInterval sets the job timeout reaper tick interval (I-003).
func (s *Scheduler) SetJobTimeoutInterval(d time.Duration) {
s.jobTimeoutInterval = d
}
// SetAwaitingCSRTimeout sets the AwaitingCSR TTL (I-003).
func (s *Scheduler) SetAwaitingCSRTimeout(d time.Duration) {
s.awaitingCSRTimeout = d
}
// SetAwaitingApprovalTimeout sets the AwaitingApproval TTL (I-003).
func (s *Scheduler) SetAwaitingApprovalTimeout(d time.Duration) {
s.awaitingApprovalTimeout = d
}
// Start initiates all background scheduler loops. It returns a channel that signals
// when the scheduler has started all loops. The scheduler runs until the context is cancelled.
func (s *Scheduler) Start(ctx context.Context) <-chan struct{} {
startedChan := make(chan struct{})
go func() {
s.logger.Info("scheduler starting")
// Track all loop goroutines in the WaitGroup so WaitForCompletion
// blocks until they've fully exited (prevents test races).
// Base count is 8: renewal, job processor, job retry (I-001),
// job timeout (I-003), agent health, notification, notification retry
// (I-005), short-lived expiry. Optional loops (network scan, digest,
// health check, cloud discovery) add to this.
loopCount := 8
if s.networkScanService != nil {
loopCount++
}
if s.digestService != nil {
loopCount++
}
if s.healthCheckService != nil {
loopCount++
}
if s.cloudDiscoveryService != nil {
loopCount++
}
if s.crlCacheService != nil {
loopCount++
}
if s.acmeGC != nil {
loopCount++
}
if s.sessionGC != nil {
loopCount++
}
s.wg.Add(loopCount)
go func() { defer s.wg.Done(); s.renewalCheckLoop(ctx) }()
go func() { defer s.wg.Done(); s.jobProcessorLoop(ctx) }()
go func() { defer s.wg.Done(); s.jobRetryLoop(ctx) }()
go func() { defer s.wg.Done(); s.jobTimeoutLoop(ctx) }()
go func() { defer s.wg.Done(); s.agentHealthCheckLoop(ctx) }()
go func() { defer s.wg.Done(); s.notificationProcessLoop(ctx) }()
go func() { defer s.wg.Done(); s.notificationRetryLoop(ctx) }()
go func() { defer s.wg.Done(); s.shortLivedExpiryCheckLoop(ctx) }()
if s.networkScanService != nil {
go func() { defer s.wg.Done(); s.networkScanLoop(ctx) }()
}
if s.digestService != nil {
go func() { defer s.wg.Done(); s.digestLoop(ctx) }()
}
if s.healthCheckService != nil {
go func() { defer s.wg.Done(); s.healthCheckLoop(ctx) }()
}
if s.cloudDiscoveryService != nil {
go func() { defer s.wg.Done(); s.cloudDiscoveryLoop(ctx) }()
}
if s.crlCacheService != nil {
go func() { defer s.wg.Done(); s.crlGenerationLoop(ctx) }()
}
if s.acmeGC != nil {
go func() { defer s.wg.Done(); s.acmeGCLoop(ctx) }()
}
if s.sessionGC != nil {
go func() { defer s.wg.Done(); s.sessionGCLoop(ctx) }()
}
// Signal that all loops are launched
close(startedChan)
// Wait for context cancellation
<-ctx.Done()
s.logger.Info("scheduler shutting down", "reason", ctx.Err())
}()
return startedChan
}
// renewalCheckLoop runs every renewalCheckInterval and checks for expiring certificates.
// If an error occurs, it logs the error but continues running.
// Uses atomic.Bool to prevent duplicate execution if the previous check is still running.
func (s *Scheduler) renewalCheckLoop(ctx context.Context) {
ticker := NewJitteredTicker(s.renewalCheckInterval, DefaultSchedulerJitter)
defer ticker.Stop()
// Run immediately on start (with idempotency guard)
s.renewalCheckRunning.Store(true)
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.renewalCheckRunning.Store(false)
s.runRenewalCheck(ctx)
}()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !s.renewalCheckRunning.CompareAndSwap(false, true) {
s.logger.Warn("renewal check still running, skipping tick")
continue
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.renewalCheckRunning.Store(false)
s.runRenewalCheck(ctx)
}()
}
}
}
// runRenewalCheck executes a single renewal check with error recovery.
func (s *Scheduler) runRenewalCheck(ctx context.Context) {
opCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
if err := s.renewalService.CheckExpiringCertificates(opCtx); err != nil {
s.logger.Error("renewal check failed",
"error", err,
"interval", s.renewalCheckInterval.String())
} else {
s.logger.Debug("renewal check completed")
}
}
// jobProcessorLoop runs every jobProcessorInterval and processes pending jobs.
// It picks up pending jobs, executes them, and handles the results.
// If an error occurs, it logs the error but continues running.
// Uses atomic.Bool to prevent duplicate execution if the previous job is still running.
func (s *Scheduler) jobProcessorLoop(ctx context.Context) {
ticker := NewJitteredTicker(s.jobProcessorInterval, DefaultSchedulerJitter)
defer ticker.Stop()
// Run immediately on start (with idempotency guard)
s.jobProcessorRunning.Store(true)
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.jobProcessorRunning.Store(false)
s.runJobProcessor(ctx)
}()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !s.jobProcessorRunning.CompareAndSwap(false, true) {
s.logger.Warn("job processor still running, skipping tick")
continue
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.jobProcessorRunning.Store(false)
s.runJobProcessor(ctx)
}()
}
}
}
// runJobProcessor executes a single job processing cycle with error recovery.
func (s *Scheduler) runJobProcessor(ctx context.Context) {
opCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()
if err := s.jobService.ProcessPendingJobs(opCtx); err != nil {
s.logger.Error("job processor failed",
"error", err,
"interval", s.jobProcessorInterval.String())
} else {
s.logger.Debug("job processor completed")
}
}
// jobRetryLoop runs every jobRetryInterval and transitions eligible Failed jobs
// back to Pending so the job processor can pick them up again. Closes coverage
// gap I-001 — JobService.RetryFailedJobs had no runtime caller prior to this
// loop being wired. Runs immediately on start, then every interval.
// Uses atomic.Bool to prevent duplicate execution if the previous retry sweep
// is still running.
func (s *Scheduler) jobRetryLoop(ctx context.Context) {
ticker := NewJitteredTicker(s.jobRetryInterval, DefaultSchedulerJitter)
defer ticker.Stop()
// Run immediately on start (with idempotency guard)
s.jobRetryRunning.Store(true)
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.jobRetryRunning.Store(false)
s.runJobRetry(ctx)
}()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !s.jobRetryRunning.CompareAndSwap(false, true) {
s.logger.Warn("job retry still running, skipping tick")
continue
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.jobRetryRunning.Store(false)
s.runJobRetry(ctx)
}()
}
}
}
// runJobRetry executes a single failed-job retry cycle with error recovery.
// Uses the same 2-minute per-tick timeout as runJobProcessor; RetryFailedJobs
// issues one SELECT and one UPDATE per eligible job (cheap), so this headroom
// covers very large failure backlogs without starving the loop.
func (s *Scheduler) runJobRetry(ctx context.Context) {
opCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()
// maxRetries is advisory at the service layer (per-job gating uses each
// job's own Attempts/MaxAttempts). Passing 3 matches the conventional
// default seen across the codebase's job creation paths.
if err := s.jobService.RetryFailedJobs(opCtx, 3); err != nil {
s.logger.Error("job retry failed",
"error", err,
"interval", s.jobRetryInterval.String())
} else {
s.logger.Debug("job retry completed")
}
}
// jobTimeoutLoop runs every jobTimeoutInterval and transitions jobs stuck in
// AwaitingCSR or AwaitingApproval to Failed if they exceed their TTL. I-001's
// retry loop then auto-promotes eligible Failed jobs back to Pending. Closes
// coverage gap I-003. Uses atomic.Bool to prevent duplicate execution.
func (s *Scheduler) jobTimeoutLoop(ctx context.Context) {
ticker := NewJitteredTicker(s.jobTimeoutInterval, DefaultSchedulerJitter)
defer ticker.Stop()
// Run immediately on start (with idempotency guard)
s.jobTimeoutRunning.Store(true)
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.jobTimeoutRunning.Store(false)
s.runJobTimeout(ctx)
}()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !s.jobTimeoutRunning.CompareAndSwap(false, true) {
s.logger.Warn("job timeout reaper still running, skipping tick")
continue
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.jobTimeoutRunning.Store(false)
s.runJobTimeout(ctx)
}()
}
}
}
// runJobTimeout executes a single job timeout reaping cycle with error recovery.
// When no JobReaperService has been wired (e.g. in tests that don't exercise
// I-003) the call is a safe no-op, preserving the always-on loop topology
// described in I-003 without forcing every consumer to wire a reaper.
//
// Bundle C / Audit M-016: the reaping cycle now has TWO arms:
//
// 1. ReapTimedOutJobs handles AwaitingCSR / AwaitingApproval timeouts (I-003).
// 2. ReapJobsWithOfflineAgents handles Running jobs whose owning agent has
// gone silent (M-016). Reuses the same agentHealthCheckTimeout as the
// mark-stale-agents-offline path for consistency: if the agent is judged
// offline by AgentService.MarkStaleAgentsOffline, its in-flight jobs
// should be reaped on the same cadence.
func (s *Scheduler) runJobTimeout(ctx context.Context) {
if s.jobReaper == nil {
return
}
opCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()
if err := s.jobReaper.ReapTimedOutJobs(opCtx, s.awaitingCSRTimeout, s.awaitingApprovalTimeout); err != nil {
s.logger.Error("job timeout reaper failed",
"error", err,
"interval", s.jobTimeoutInterval.String())
} else {
s.logger.Debug("job timeout reaper completed")
}
// Second arm: offline-agent reaper. Uses agentOfflineTimeout (defaults to
// 5 minutes — same value the agent-health-check path uses to flip an
// agent to Offline). A sensible default of 5×agentHealthCheckInterval
// catches agents that miss multiple consecutive heartbeats while leaving
// a single missed beat as a transient blip that does NOT reap.
offlineCtx, offlineCancel := context.WithTimeout(ctx, 2*time.Minute)
defer offlineCancel()
if err := s.jobReaper.ReapJobsWithOfflineAgents(offlineCtx, s.agentOfflineJobTTL); err != nil {
s.logger.Error("offline-agent job reaper failed",
"error", err,
"agent_offline_ttl", s.agentOfflineJobTTL.String())
} else {
s.logger.Debug("offline-agent job reaper completed")
}
}
// agentHealthCheckLoop runs every agentHealthCheckInterval and marks stale agents as offline.
// An agent is considered stale if it hasn't sent a heartbeat within the health check interval.
// If an error occurs, it logs the error but continues running.
// Uses atomic.Bool to prevent duplicate execution if the previous check is still running.
func (s *Scheduler) agentHealthCheckLoop(ctx context.Context) {
ticker := NewJitteredTicker(s.agentHealthCheckInterval, DefaultSchedulerJitter)
defer ticker.Stop()
// Run immediately on start (with idempotency guard)
s.agentHealthCheckRunning.Store(true)
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.agentHealthCheckRunning.Store(false)
s.runAgentHealthCheck(ctx)
}()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !s.agentHealthCheckRunning.CompareAndSwap(false, true) {
s.logger.Warn("agent health check still running, skipping tick")
continue
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.agentHealthCheckRunning.Store(false)
s.runAgentHealthCheck(ctx)
}()
}
}
}
// runAgentHealthCheck executes a single agent health check with error recovery.
func (s *Scheduler) runAgentHealthCheck(ctx context.Context) {
opCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
if err := s.agentService.MarkStaleAgentsOffline(opCtx, s.agentHealthCheckInterval); err != nil {
s.logger.Error("agent health check failed",
"error", err,
"interval", s.agentHealthCheckInterval.String())
} else {
s.logger.Debug("agent health check completed")
}
}
// notificationProcessLoop runs every notificationProcessInterval and processes pending notifications.
// If an error occurs, it logs the error but continues running.
// Uses atomic.Bool to prevent duplicate execution if the previous process is still running.
func (s *Scheduler) notificationProcessLoop(ctx context.Context) {
ticker := NewJitteredTicker(s.notificationProcessInterval, DefaultSchedulerJitter)
defer ticker.Stop()
// Run immediately on start (with idempotency guard)
s.notificationProcessRunning.Store(true)
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.notificationProcessRunning.Store(false)
s.runNotificationProcess(ctx)
}()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !s.notificationProcessRunning.CompareAndSwap(false, true) {
s.logger.Warn("notification processor still running, skipping tick")
continue
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.notificationProcessRunning.Store(false)
s.runNotificationProcess(ctx)
}()
}
}
}
// runNotificationProcess executes a single notification processing cycle with error recovery.
func (s *Scheduler) runNotificationProcess(ctx context.Context) {
opCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
if err := s.notificationService.ProcessPendingNotifications(opCtx); err != nil {
s.logger.Error("notification processor failed",
"error", err,
"interval", s.notificationProcessInterval.String())
} else {
s.logger.Debug("notification processor completed")
}
}
// notificationRetryLoop runs every notificationRetryInterval and transitions
// eligible Failed notifications back to Pending so the notification processor
// can pick them up again. Closes coverage gap I-005 — NotificationService.
// RetryFailedNotifications had no runtime caller prior to this loop being
// wired. Runs immediately on start, then every interval.
// Uses atomic.Bool to prevent duplicate execution if the previous retry sweep
// is still running. Mirrors the I-001 jobRetryLoop topology byte-for-byte.
func (s *Scheduler) notificationRetryLoop(ctx context.Context) {
ticker := NewJitteredTicker(s.notificationRetryInterval, DefaultSchedulerJitter)
defer ticker.Stop()
// Run immediately on start (with idempotency guard)
s.notificationRetryRunning.Store(true)
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.notificationRetryRunning.Store(false)
s.runNotificationRetry(ctx)
}()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !s.notificationRetryRunning.CompareAndSwap(false, true) {
s.logger.Warn("notification retry still running, skipping tick")
continue
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.notificationRetryRunning.Store(false)
s.runNotificationRetry(ctx)
}()
}
}
}
// runNotificationRetry executes a single failed-notification retry cycle with
// error recovery. Uses a 2-minute per-tick timeout matching runJobRetry;
// RetryFailedNotifications issues one SELECT and one UPDATE per eligible row
// (cheap), so this headroom covers very large failure backlogs without
// starving the loop. The service layer swallows per-row send errors (mirrors
// ProcessPendingNotifications) and only returns the List error from the
// initial ListRetryEligible call.
func (s *Scheduler) runNotificationRetry(ctx context.Context) {
opCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()
if err := s.notificationService.RetryFailedNotifications(opCtx); err != nil {
s.logger.Error("notification retry failed",
"error", err,
"interval", s.notificationRetryInterval.String())
} else {
s.logger.Debug("notification retry completed")
}
}
// shortLivedExpiryCheckLoop runs every shortLivedExpiryCheckInterval and marks expired
// short-lived certificates. For certs with TTL < 1 hour, expiry IS revocation —
// no CRL/OCSP needed.
// Uses atomic.Bool to prevent duplicate execution if the previous check is still running.
func (s *Scheduler) shortLivedExpiryCheckLoop(ctx context.Context) {
ticker := NewJitteredTicker(s.shortLivedExpiryCheckInterval, DefaultSchedulerJitter)
defer ticker.Stop()
// Run immediately on start (with idempotency guard)
s.shortLivedExpiryCheckRunning.Store(true)
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.shortLivedExpiryCheckRunning.Store(false)
s.runShortLivedExpiryCheck(ctx)
}()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !s.shortLivedExpiryCheckRunning.CompareAndSwap(false, true) {
s.logger.Warn("short-lived expiry check still running, skipping tick")
continue
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.shortLivedExpiryCheckRunning.Store(false)
s.runShortLivedExpiryCheck(ctx)
}()
}
}
}
// runShortLivedExpiryCheck executes a single short-lived expiry check with error recovery.
func (s *Scheduler) runShortLivedExpiryCheck(ctx context.Context) {
opCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
if err := s.renewalService.ExpireShortLivedCertificates(opCtx); err != nil {
s.logger.Error("short-lived expiry check failed",
"error", err,
"interval", s.shortLivedExpiryCheckInterval.String())
} else {
s.logger.Debug("short-lived expiry check completed")
}
}
// networkScanLoop runs every networkScanInterval and performs active TLS scanning
// of configured network targets.
// Uses atomic.Bool to prevent duplicate execution if the previous scan is still running.
func (s *Scheduler) networkScanLoop(ctx context.Context) {
ticker := NewJitteredTicker(s.networkScanInterval, DefaultSchedulerJitter)
defer ticker.Stop()
// Run immediately on start (with idempotency guard)
s.networkScanRunning.Store(true)
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.networkScanRunning.Store(false)
s.runNetworkScan(ctx)
}()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !s.networkScanRunning.CompareAndSwap(false, true) {
s.logger.Warn("network scan still running, skipping tick")
continue
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.networkScanRunning.Store(false)
s.runNetworkScan(ctx)
}()
}
}
}
// runNetworkScan executes a single network scan cycle with error recovery.
func (s *Scheduler) runNetworkScan(ctx context.Context) {
opCtx, cancel := context.WithTimeout(ctx, 30*time.Minute)
defer cancel()
if err := s.networkScanService.ScanAllTargets(opCtx); err != nil {
s.logger.Error("network scan failed",
"error", err,
"interval", s.networkScanInterval.String())
} else {
s.logger.Debug("network scan completed")
}
}
// digestLoop runs every digestInterval and generates/sends certificate digest emails.
// Uses atomic.Bool to prevent duplicate execution if the previous digest is still running.
func (s *Scheduler) digestLoop(ctx context.Context) {
ticker := NewJitteredTicker(s.digestInterval, DefaultSchedulerJitter)
defer ticker.Stop()
// Do NOT run immediately on start for digest — wait for the first tick.
// Digests are infrequent (24h default) and shouldn't fire on every restart.
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !s.digestRunning.CompareAndSwap(false, true) {
s.logger.Warn("digest processor still running, skipping tick")
continue
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.digestRunning.Store(false)
s.runDigest(ctx)
}()
}
}
}
// runDigest executes a single digest processing cycle with error recovery.
func (s *Scheduler) runDigest(ctx context.Context) {
opCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
if err := s.digestService.ProcessDigest(opCtx); err != nil {
s.logger.Error("digest processor failed",
"error", err,
"interval", s.digestInterval.String())
} else {
s.logger.Debug("digest processor completed")
}
}
// healthCheckLoop runs every healthCheckInterval and performs endpoint TLS health checks.
// Do NOT run immediately on start — health checks are frequent (60s default) and may be
// resource-intensive. Wait for the first tick.
// Uses atomic.Bool to prevent duplicate execution if the previous check is still running.
func (s *Scheduler) healthCheckLoop(ctx context.Context) {
ticker := NewJitteredTicker(s.healthCheckInterval, DefaultSchedulerJitter)
defer ticker.Stop()
// Do NOT run immediately on start for health checks — wait for the first tick.
// Health checks are frequent and shouldn't fire on every restart.
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !s.healthCheckRunning.CompareAndSwap(false, true) {
s.logger.Debug("health check still running, skipping tick")
continue
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.healthCheckRunning.Store(false)
s.runHealthCheck(ctx)
}()
}
}
}
// runHealthCheck executes a single health check cycle with error recovery.
func (s *Scheduler) runHealthCheck(ctx context.Context) {
opCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
if err := s.healthCheckService.RunHealthChecks(opCtx); err != nil {
s.logger.Error("health check run failed",
"error", err,
"interval", s.healthCheckInterval.String())
} else {
s.logger.Debug("health check completed")
}
}
// cloudDiscoveryLoop runs every cloudDiscoveryInterval and discovers certificates from cloud secret managers.
// Runs immediately on start, then on each tick. Same idempotency pattern as networkScanLoop.
// Uses atomic.Bool to prevent duplicate execution if the previous scan is still running.
func (s *Scheduler) cloudDiscoveryLoop(ctx context.Context) {
ticker := NewJitteredTicker(s.cloudDiscoveryInterval, DefaultSchedulerJitter)
defer ticker.Stop()
// Run immediately on start (with idempotency guard)
s.cloudDiscoveryRunning.Store(true)
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.cloudDiscoveryRunning.Store(false)
s.runCloudDiscovery(ctx)
}()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !s.cloudDiscoveryRunning.CompareAndSwap(false, true) {
s.logger.Warn("cloud discovery still running, skipping tick")
continue
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.cloudDiscoveryRunning.Store(false)
s.runCloudDiscovery(ctx)
}()
}
}
}
// runCloudDiscovery executes a single cloud discovery cycle with error recovery.
func (s *Scheduler) runCloudDiscovery(ctx context.Context) {
opCtx, cancel := context.WithTimeout(ctx, 30*time.Minute)
defer cancel()
total, errs := s.cloudDiscoveryService.DiscoverAll(opCtx)
if len(errs) > 0 {
s.logger.Error("cloud discovery completed with errors",
"certificates_found", total,
"errors", len(errs),
"interval", s.cloudDiscoveryInterval.String())
for _, err := range errs {
if !errors.Is(err, context.Canceled) {
s.logger.Error("cloud discovery error", "error", err)
}
}
} else {
s.logger.Debug("cloud discovery completed",
"certificates_found", total)
}
}
// WaitForCompletion waits for all in-flight scheduler work to complete.
// It respects the provided timeout and returns an error if work is still in progress after timeout.
// Call this after the scheduler context has been cancelled to ensure graceful shutdown.
func (s *Scheduler) WaitForCompletion(timeout time.Duration) error {
done := make(chan struct{})
go func() {
s.wg.Wait()
close(done)
}()
select {
case <-done:
s.logger.Info("all scheduler work completed")
return nil
case <-time.After(timeout):
s.logger.Warn("scheduler work did not complete within timeout", "timeout", timeout.String())
return ErrSchedulerShutdownTimeout
}
}
// crlGenerationLoop periodically pre-generates CRLs into crl_cache so
// the /.well-known/pki/crl/{issuer_id} HTTP endpoint can serve from
// cache rather than regenerating per request. Mirrors the digestLoop
// shape: ticker, atomic.Bool guard for re-entry, WaitGroup integration
// for graceful shutdown.
//
// Bundle CRL/OCSP-Responder Phase 3.
func (s *Scheduler) crlGenerationLoop(ctx context.Context) {
ticker := NewJitteredTicker(s.crlGenerationInterval, DefaultSchedulerJitter)
defer ticker.Stop()
// Do NOT run immediately on start. CRLs are typically valid for
// many hours; firing on every restart wastes work. The first tick
// arrives after one interval; on cache miss the HTTP handler
// triggers an immediate generation via the cache service.
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !s.crlGenerationRunning.CompareAndSwap(false, true) {
s.logger.Warn("CRL pre-generation still running, skipping tick")
continue
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.crlGenerationRunning.Store(false)
s.runCRLGeneration(ctx)
}()
}
}
}
// runCRLGeneration executes a single CRL pre-generation cycle with
// error recovery. Per-issuer failures inside RegenerateAll are logged
// + audited by the cache service itself; this wrapper only reports the
// outer context shape and bumps a metric (when wired).
func (s *Scheduler) runCRLGeneration(ctx context.Context) {
// 5-minute timeout: the per-issuer generation is fast (sub-second
// for most CAs), but the loop walks every issuer that supports
// CRL. Bound the total cycle so a stuck issuer cannot block the
// next tick.
opCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
s.crlCacheService.RegenerateAll(opCtx)
}
// ErrSchedulerShutdownTimeout is returned when scheduler graceful shutdown times out.
var ErrSchedulerShutdownTimeout = errors.New("scheduler graceful shutdown timeout")
// acmeGCLoop runs every acmeGCInterval and invokes ACMEGarbageCollector.
// Per the project's scheduler-idempotency architecture decision: an
// atomic.Bool guard prevents concurrent tick execution; the
// sync.WaitGroup tracks the in-flight goroutine for graceful shutdown.
// Phase 5.
func (s *Scheduler) acmeGCLoop(ctx context.Context) {
ticker := NewJitteredTicker(s.acmeGCInterval, DefaultSchedulerJitter)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !s.acmeGCRunning.CompareAndSwap(false, true) {
s.logger.Warn("ACME GC sweep still running, skipping tick")
continue
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.acmeGCRunning.Store(false)
// 1-minute timeout per sweep — the per-statement work is
// cheap (single DELETE / UPDATE per sweep, all on indexed
// columns), but bound the cycle so a stuck Postgres can't
// block the next tick.
opCtx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
if err := s.acmeGC.GarbageCollect(opCtx); err != nil {
s.logger.Warn("acme gc sweep failed (next tick will retry)", "error", err)
}
}()
}
}
}
// sessionGCLoop runs every sessionGCInterval and invokes
// SessionGarbageCollector.GarbageCollect, which sweeps:
// - sessions whose absolute_expires_at is in the past (post-login expired);
// - pre-login session rows older than 10 minutes;
// - retired-past-retention session_signing_keys rows.
//
// Auth Bundle 2 Phase 4. The atomic.Bool guard + the per-tick
// context.WithTimeout match the pattern of every other loop in this
// file: a stuck Postgres can't block the next tick, and concurrent
// sweeps are skipped not queued.
func (s *Scheduler) sessionGCLoop(ctx context.Context) {
ticker := NewJitteredTicker(s.sessionGCInterval, DefaultSchedulerJitter)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !s.sessionGCRunning.CompareAndSwap(false, true) {
s.logger.Warn("session GC sweep still running, skipping tick")
continue
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.sessionGCRunning.Store(false)
opCtx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
if _, err := s.sessionGC.GarbageCollect(opCtx); err != nil {
s.logger.Warn("session gc sweep failed (next tick will retry)", "error", err)
}
// Audit 2026-05-10 HIGH-3 — sweep expired BCL consumed-jti
// rows on the same tick. Best-effort; failure logs at WARN
// (the next tick retries).
if s.bclReplayGC != nil {
if n, err := s.bclReplayGC.SweepExpired(opCtx, time.Now().UTC()); err != nil {
s.logger.Warn("bcl replay gc sweep failed (next tick will retry)", "error", err)
} else if n > 0 {
s.logger.Debug("bcl replay gc swept rows", "rows", n)
}
}
}()
}
}
}