Files
certctl/internal/scheduler/scheduler.go
T
shankar0123 663b14bfd8 feat(retention): COMP-002-RETENTION — federated-user PII purge pipeline
Sprint 6 closure of the audit's MED-severity COMP-002-RETENTION
finding.

Pre-fix posture: the federated-user admin surface
(auth_users.go::Deactivate) sets users.deactivated_at on soft-delete,
but the PII columns (email, display_name, oidc_subject) stay
populated forever. No in-code primitive for GDPR right-to-be-
forgotten; no scheduled retention purge.

This commit ships the audit's recommended two-phase fix:

  Phase 1 — operator-callable scrub primitive
    internal/service/user_retention.go
      UserRetentionService.DeleteUserPII(ctx, userID):
        - revoke all active sessions (defense-in-depth)
        - email := 'purged@redacted.local'
        - display_name := '[purged]'
        - oidc_subject := 'sha256:' || hex(sha256(original))
        - audit_events row with action=user.purge_pii,
          category=auth, actor=system

      Why hash oidc_subject instead of NULL:
        1. (oidc_provider_id, oidc_subject) UNIQUE constraint would
           trip on multiple purged users converging to NULL
        2. The hash is one-way; the original IdP-side identifier is
           unrecoverable. Re-login under the same subject mints a
           fresh u-id (right-to-be-forgotten semantics)
        3. Forensic continuity: an operator can recompute
           sha256(<known-subject>) and confirm "this user was
           deactivated then purged"

      users.id itself is preserved so historical
      audit_events.actor = u-X rows still resolve. The forensic-
      attribution chain stays intact even after the PII is gone.

  Phase 2 — scheduled batch purge
    internal/scheduler/scheduler.go
      UserRetentionPurger interface + userRetentionLoop:
        - PurgeDeactivatedUsers enumerates every user with
          deactivated_at < NOW() - retention_window
        - DeleteUserPII per row
        - per-tick batch cap (default 200) keeps blast radius
          predictable; large backlogs spread across multiple ticks
        - atomic.Bool guard + 5-min per-tick context.WithTimeout

    Repository contract grew a single new method:
      internal/repository/user.go::ListDeactivatedBefore(ctx, t)
      internal/repository/postgres/user.go: SQL-side filter
      (deactivated_at IS NOT NULL AND deactivated_at < $1)
      ORDER BY deactivated_at ASC, cross-tenant.

  Configuration
    CERTCTL_USER_RETENTION_INTERVAL   default 24h
    CERTCTL_USER_RETENTION_WINDOW     default 30 days
    CERTCTL_USER_RETENTION_BATCH_CAP  default 200

  Test stub additions for repository.UserRepository.ListDeactivatedBefore:
    internal/auth/oidc/service_test.go::stubUsers
    internal/api/handler/auth_users_test.go::stubFullUserRepo
    internal/api/handler/auth_session_oidc_test.go::stubUserRepo

  Documentation
    docs/operator/privacy-and-retention.md
      - retention pipeline diagram (day-0 deactivate → day-N purge)
      - operator config table
      - verification runbook (4 steps with SQL)
      - what's NOT covered (deferred: DSAR export, api_keys cascade,
        retroactive audit_events.details redaction)

  Tests
    internal/service/user_retention_test.go (NEW, 4 tests):
      TestDeleteUserPII_ScrubsAndRevokes
      TestDeleteUserPII_IsIdempotent
      TestPurgeDeactivatedUsers_RespectsWindow
      TestPurgeDeactivatedUsers_BatchCap

Verified locally:
  go vet ./...                                   (clean)
  gofmt -l internal/ cmd/                        (clean)
  go test -short -count=1 \
    ./internal/service/... ./internal/scheduler/... ./internal/config/...
    (all green)

Cross-sprint interaction: pairs with COMP-001-HASH (prior commit).
The user.purge_pii audit row this service emits flows through the
new hash chain, so the scrub event is itself tamper-evident.

Closes COMP-002-RETENTION. Sprint 6 is complete (2/2 findings).
2026-05-16 06:18:39 +00:00

1586 lines
56 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Copyright 2026 certctl LLC. All rights reserved.
// SPDX-License-Identifier: BUSL-1.1
package scheduler
import (
"context"
"errors"
"log/slog"
"sync"
"sync/atomic"
"time"
)
// RenewalServicer defines the interface for renewal operations used by the scheduler.
type RenewalServicer interface {
// CheckExpiringCertificates is invoked by runRenewalCheck once per
// renewal-check cycle under a 5-minute timeout context. A returned error
// is logged by the scheduler; it does not stop the loop.
CheckExpiringCertificates(ctx context.Context) error
// ExpireShortLivedCertificates is the short-lived-certificate expiry sweep.
// NOTE(review): presumably driven by shortLivedExpiryCheckLoop, which is
// not visible in this part of the file — confirm.
ExpireShortLivedCertificates(ctx context.Context) error
}
// JobServicer defines the interface for job processing used by the scheduler.
//
// RetryFailedJobs was added to close coverage gap I-001: JobService.RetryFailedJobs
// existed and was unit-tested but had no runtime caller prior to this loop being
// wired. The scheduler now drives it on an independent tick so failed jobs whose
// attempt counter is below MaxAttempts are periodically reset to Pending for the
// job processor to pick up again. maxRetries is advisory (per-job gating uses
// each job's own Attempts/MaxAttempts fields).
type JobServicer interface {
// ProcessPendingJobs is invoked by runJobProcessor once per job-processor
// cycle under a 2-minute timeout context.
ProcessPendingJobs(ctx context.Context) error
// RetryFailedJobs is invoked by runJobRetry once per retry cycle under a
// 2-minute timeout context; the scheduler passes maxRetries = 3.
RetryFailedJobs(ctx context.Context, maxRetries int) error
}
// AgentServicer defines the interface for agent health checks used by the scheduler.
type AgentServicer interface {
// MarkStaleAgentsOffline is the agent-health sweep entry point.
// NOTE(review): interval is presumably the staleness threshold passed by
// the agent health-check loop (not visible in this part of the file) —
// confirm against the caller and the implementation.
MarkStaleAgentsOffline(ctx context.Context, interval time.Duration) error
}
// NotificationServicer defines the interface for notification processing used by the scheduler.
//
// RetryFailedNotifications was added to close coverage gap I-005: the retry
// sweep transitions eligible Failed notifications to Pending on an independent
// tick, using exponential backoff with a 1h cap and a 5-attempt DLQ budget.
// Mirrors the I-001 job retry loop topology.
type NotificationServicer interface {
// ProcessPendingNotifications is the per-tick delivery entry point for the
// notification process loop.
ProcessPendingNotifications(ctx context.Context) error
// RetryFailedNotifications is the per-tick entry point for the
// notification retry loop described above.
RetryFailedNotifications(ctx context.Context) error
}
// NetworkScanServicer defines the interface for network scanning used by the scheduler.
// Start launches networkScanLoop only when a non-nil implementation was
// passed to NewScheduler.
type NetworkScanServicer interface {
// ScanAllTargets is the per-tick entry point for the network scan loop.
ScanAllTargets(ctx context.Context) error
}
// DigestServicer defines the interface for digest email processing used by the scheduler.
// Start launches digestLoop only when one has been wired via SetDigestService.
type DigestServicer interface {
// ProcessDigest is the per-tick entry point for the digest loop.
ProcessDigest(ctx context.Context) error
}
// HealthCheckServicer defines the interface for endpoint TLS health monitoring used by the scheduler.
// Start launches healthCheckLoop only when one has been wired via
// SetHealthCheckService.
type HealthCheckServicer interface {
// RunHealthChecks is the per-tick entry point for the health check loop.
RunHealthChecks(ctx context.Context) error
}
// CloudDiscoveryServicer defines the interface for cloud secret manager discovery used by the scheduler.
// Start launches cloudDiscoveryLoop only when one has been wired via
// SetCloudDiscoveryService.
type CloudDiscoveryServicer interface {
// DiscoverAll is the per-tick entry point for the cloud discovery loop.
// NOTE(review): the int is presumably a count of discovered items and the
// slice the per-source errors — confirm against the implementation.
DiscoverAll(ctx context.Context) (int, []error)
}
// CRLCacheServicer defines the interface for the scheduler's CRL
// pre-generation loop. RegenerateAll iterates every issuer that
// supports CRL signing and refreshes its crl_cache row. Per-issuer
// failures are logged + audited; a single bad issuer does not stop
// the others.
//
// Bundle CRL/OCSP-Responder Phase 3: the scheduler-driven cache lets
// the /.well-known/pki/crl/{issuer_id} HTTP endpoint serve from cache
// instead of regenerating per request.
type CRLCacheServicer interface {
// RegenerateAll has no error return: per the contract above, failures are
// handled (logged + audited) inside the implementation rather than
// surfaced to the scheduler.
RegenerateAll(ctx context.Context)
}
// ACMEGarbageCollector is the interface the scheduler's acmeGCLoop
// invokes once per tick. The concrete implementation is *service.ACMEService.
// Phase 5 — sweeps expired nonces / authzs / orders.
// Start launches acmeGCLoop only when one has been wired via
// SetACMEGarbageCollector.
type ACMEGarbageCollector interface {
// GarbageCollect performs one sweep.
GarbageCollect(ctx context.Context) error
}
// SessionGarbageCollector is the interface the scheduler's sessionGCLoop
// invokes once per CERTCTL_SESSION_GC_INTERVAL tick. Concrete impl is
// *session.Service. Sweeps expired post-login + pre-login session rows
// AND retired-past-retention signing-key rows. Auth Bundle 2 Phase 4.
type SessionGarbageCollector interface {
// GarbageCollect performs one sweep.
// NOTE(review): the int is presumably the number of rows removed —
// confirm against the implementation.
GarbageCollect(ctx context.Context) (int, error)
}
// BCLReplayGarbageCollector sweeps expired rows from the BCL consumed-jti
// table. Audit 2026-05-10 HIGH-3 closure — the scheduler invokes this
// alongside the session-GC tick so a single ticker drives both. Concrete
// impl is repository.BCLReplayRepository.SweepExpired.
type BCLReplayGarbageCollector interface {
// SweepExpired performs one sweep relative to now.
// NOTE(review): the int is presumably the number of rows removed —
// confirm against the implementation.
SweepExpired(ctx context.Context, now time.Time) (int, error)
}
// RateLimitGarbageCollector sweeps stale rows from the
// rate_limit_buckets table introduced in migration 000046. Phase 13
// Sprint 13.3 (ARCH-M1 closure completion) — wired only when
// CERTCTL_RATE_LIMIT_BACKEND=postgres. Concrete impl is
// *ratelimit.PostgresGC. Mirrors the ACMEGarbageCollector +
// SessionGarbageCollector contracts so the scheduler reuses the same
// atomic.Bool + WithTimeout + ticker pattern as the existing GC loops.
//
// Returns the row count to surface via observability logs (matches
// SessionGarbageCollector's shape — the operator wants to see
// "how many buckets did the sweep delete" in steady-state monitoring).
type RateLimitGarbageCollector interface {
// GarbageCollect performs one sweep and returns the row count described
// above. Note the count is int64 here, whereas
// SessionGarbageCollector.GarbageCollect returns int.
GarbageCollect(ctx context.Context) (int64, error)
}
// AuditChainVerifier walks the audit_events per-row hash chain
// installed by migration 000047 (Sprint 6 COMP-001-HASH) and reports
// the first break it finds. The scheduler's auditChainVerifyLoop
// invokes this on a configurable cadence (default 6h) and increments
// the certctl_audit_chain_break_detected counter on any non-empty
// brokenAtID return — that counter is the operator-facing signal for
// tamper-evidence.
//
// Concrete impl is *postgres.AuditRepository, which delegates to the
// SQL function audit_events_verify_chain() shipped in the same
// migration. The function is STABLE plpgsql so the walk happens
// entirely server-side (no row-shipping to the application).
type AuditChainVerifier interface {
// VerifyHashChain performs one chain walk. A non-empty brokenAtID signals
// a detected break (see above).
// NOTE(review): brokenAtPos is presumably the position of the broken row
// in the walk and rowCount the number of rows walked — confirm against
// the SQL function.
VerifyHashChain(ctx context.Context) (brokenAtID string, brokenAtPos int, rowCount int, err error)
}
// AuditChainBreakRecorder is the metric-side dependency for the
// audit-chain verify loop. Concrete impl is the
// *service.AuditChainCounter wired in cmd/server/main.go; tests use
// an in-memory implementation. The scheduler calls RecordBreak on a
// detected chain break and RecordSuccess(rowCount) on a clean walk so
// operators can see "we walked N rows and it was clean" in metrics.
type AuditChainBreakRecorder interface {
// RecordBreak reports a detected chain break, identified by the
// brokenAtID / brokenAtPos values returned from
// AuditChainVerifier.VerifyHashChain.
RecordBreak(brokenAtID string, brokenAtPos int)
// RecordSuccess reports a walk over rowCount rows.
RecordSuccess(rowCount int)
}
// UserRetentionPurger is the Sprint 6 COMP-002-RETENTION scheduler-side
// interface. Concrete impl is *service.UserRetentionService — it walks
// every user whose deactivated_at exceeds the retention window and
// scrubs PII columns (email / display_name / oidc_subject hash). The
// loop calls PurgeDeactivatedUsers on every CERTCTL_USER_RETENTION_INTERVAL
// tick. nil = loop is not wired (deployments that disable retention).
type UserRetentionPurger interface {
// PurgeDeactivatedUsers performs one purge pass.
// NOTE(review): purged / failed are presumably per-user success and
// failure counts for the pass, with err reserved for pass-level failures
// — confirm against the implementation.
PurgeDeactivatedUsers(ctx context.Context) (purged, failed int, err error)
}
// JobReaperService defines the interface for job timeout reaping used by the scheduler.
// It is optional: it is wired after construction via SetJobReaperService.
type JobReaperService interface {
// ReapTimedOutJobs handles jobs stuck in AwaitingCSR / AwaitingApproval;
// csrTTL and approvalTTL are the respective per-state TTLs (I-003).
ReapTimedOutJobs(ctx context.Context, csrTTL, approvalTTL time.Duration) error
// Bundle C / Audit M-016 (CWE-754): closes the gap left by ReapTimedOutJobs
// (which only handles AwaitingCSR / AwaitingApproval). Jobs in Running
// status whose owning agent has been silent for longer than agentTTL get
// transitioned to Failed with reason "agent_offline" so I-001's retry
// loop can re-queue them on a healthy agent.
ReapJobsWithOfflineAgents(ctx context.Context, agentTTL time.Duration) error
}
// Scheduler manages background jobs and periodic tasks for the certificate control plane.
// It runs multiple concurrent loops: renewal checks, job processing / retry /
// timeout reaping, agent health checks, notification processing / retry and
// short-lived expiry checks are always started; the remaining loops are
// started by Start only when their service dependency is non-nil.
//
// The struct embeds atomic.Bool and sync.WaitGroup values, so it must be used
// through a pointer (as returned by NewScheduler) and never copied.
type Scheduler struct {
// Service dependencies. The first five are supplied to NewScheduler; the
// rest are wired through the corresponding Set* methods and may be nil.
renewalService RenewalServicer
jobService JobServicer
agentService AgentServicer
notificationService NotificationServicer
networkScanService NetworkScanServicer
digestService DigestServicer
healthCheckService HealthCheckServicer
cloudDiscoveryService CloudDiscoveryServicer
crlCacheService CRLCacheServicer
acmeGC ACMEGarbageCollector
sessionGC SessionGarbageCollector
bclReplayGC BCLReplayGarbageCollector
rateLimitGC RateLimitGarbageCollector
auditChainVerifier AuditChainVerifier
auditChainRecorder AuditChainBreakRecorder
userRetention UserRetentionPurger
jobReaper JobReaperService
logger *slog.Logger
// Configurable tick intervals (defaults are assigned in NewScheduler)
renewalCheckInterval time.Duration
jobProcessorInterval time.Duration
jobRetryInterval time.Duration
agentHealthCheckInterval time.Duration
notificationProcessInterval time.Duration
notificationRetryInterval time.Duration
shortLivedExpiryCheckInterval time.Duration
networkScanInterval time.Duration
digestInterval time.Duration
healthCheckInterval time.Duration
cloudDiscoveryInterval time.Duration
crlGenerationInterval time.Duration
jobTimeoutInterval time.Duration
acmeGCInterval time.Duration
sessionGCInterval time.Duration
rateLimitGCInterval time.Duration
auditChainVerifyInterval time.Duration
userRetentionInterval time.Duration
// agentOfflineJobTTL: per-tick threshold for reaping Running jobs whose
// owning agent has been silent. Bundle C / Audit M-016. The default
// (5 minutes) is assigned in NewScheduler.
agentOfflineJobTTL time.Duration
// awaitingCSRTimeout / awaitingApprovalTimeout: TTLs for jobs stuck in
// AwaitingCSR / AwaitingApproval (I-003). NewScheduler assigns no default,
// so both are zero until SetAwaitingCSRTimeout /
// SetAwaitingApprovalTimeout are called.
awaitingCSRTimeout time.Duration
awaitingApprovalTimeout time.Duration
// Idempotency guards: prevent duplicate execution of slow jobs.
// In the loops visible in this file a flag is set to true before a run is
// launched (CompareAndSwap(false, true) on ticks) and reset to false when
// that run returns; a tick that finds the flag already set is skipped.
renewalCheckRunning atomic.Bool
jobProcessorRunning atomic.Bool
jobRetryRunning atomic.Bool
agentHealthCheckRunning atomic.Bool
notificationProcessRunning atomic.Bool
notificationRetryRunning atomic.Bool
shortLivedExpiryCheckRunning atomic.Bool
networkScanRunning atomic.Bool
digestRunning atomic.Bool
healthCheckRunning atomic.Bool
cloudDiscoveryRunning atomic.Bool
crlGenerationRunning atomic.Bool
jobTimeoutRunning atomic.Bool
acmeGCRunning atomic.Bool
sessionGCRunning atomic.Bool
rateLimitGCRunning atomic.Bool
auditChainVerifyRunning atomic.Bool
userRetentionRunning atomic.Bool
// Graceful shutdown: wait for in-flight work to complete.
// Counts both the long-lived loop goroutines started by Start and each
// individual run goroutine a loop launches.
wg sync.WaitGroup
}
// NewScheduler builds a Scheduler around the five constructor-supplied
// services and the logger, then applies the default tick cadences. Every
// other dependency (digest, health check, cloud discovery, CRL cache, GC
// services, audit-chain verifier, user retention, job reaper) starts out nil
// and is wired through the matching Set* method; the defaults below can be
// overridden through the Set*Interval / SetAgentOfflineJobTTL methods.
//
// awaitingCSRTimeout and awaitingApprovalTimeout receive no default here and
// stay zero until their setters are called.
func NewScheduler(
	renewalService RenewalServicer,
	jobService JobServicer,
	agentService AgentServicer,
	notificationService NotificationServicer,
	networkScanService NetworkScanServicer,
	logger *slog.Logger,
) *Scheduler {
	sched := &Scheduler{
		renewalService:      renewalService,
		jobService:          jobService,
		agentService:        agentService,
		notificationService: notificationService,
		networkScanService:  networkScanService,
		logger:              logger,
	}

	// Default tick intervals.
	sched.renewalCheckInterval = time.Hour
	sched.jobProcessorInterval = 30 * time.Second
	sched.jobRetryInterval = 5 * time.Minute
	sched.agentHealthCheckInterval = 2 * time.Minute
	sched.notificationProcessInterval = time.Minute
	sched.notificationRetryInterval = 2 * time.Minute
	sched.shortLivedExpiryCheckInterval = 30 * time.Second
	sched.networkScanInterval = 6 * time.Hour
	sched.digestInterval = 24 * time.Hour
	sched.healthCheckInterval = 60 * time.Second
	sched.cloudDiscoveryInterval = 6 * time.Hour
	sched.crlGenerationInterval = time.Hour
	sched.jobTimeoutInterval = 10 * time.Minute
	sched.acmeGCInterval = time.Minute
	sched.sessionGCInterval = time.Hour
	sched.rateLimitGCInterval = 5 * time.Minute

	// Sprint 6 COMP-001-HASH: the chain walk is O(N) over audit_events
	// (server-side plpgsql). 6h surfaces tampering within a working day
	// without dominating a quiet fleet's DB load; operators with huge audit
	// tables can lengthen it via CERTCTL_AUDIT_CHAIN_VERIFY_INTERVAL.
	sched.auditChainVerifyInterval = 6 * time.Hour

	// Sprint 6 COMP-002-RETENTION: user PII purge cadence. With a 24h tick,
	// deactivated rows outlive the retention window (default 30d) only until
	// the next tick, which fits "delete within reasonable time" expectations.
	sched.userRetentionInterval = 24 * time.Hour

	// 5 minutes is 2.5x the 2-minute agentHealthCheckInterval default set
	// above, so an agent must miss multiple heartbeats before its in-flight
	// jobs are reaped.
	sched.agentOfflineJobTTL = 5 * time.Minute

	return sched
}
// SetDigestService sets the digest service for the optional digest loop.
// Called after construction since digest is optional. Start launches
// digestLoop only if this field is non-nil when Start runs, so wire it
// before calling Start.
func (s *Scheduler) SetDigestService(ds DigestServicer) {
s.digestService = ds
}
// SetDigestInterval configures the interval for digest email processing.
// Default 24h. Zero or negative values are ignored so the default stays in
// place, matching the guard used by the other interval setters in this file
// (e.g. SetCRLGenerationInterval).
func (s *Scheduler) SetDigestInterval(d time.Duration) {
	if d <= 0 {
		return
	}
	s.digestInterval = d
}
// SetRenewalCheckInterval configures the interval for renewal checks.
// Default 1h. The value is read when renewalCheckLoop creates its ticker, so
// set it before Start. Zero or negative values are ignored so the default
// stays in place, matching the guard used by the other interval setters in
// this file (e.g. SetCRLGenerationInterval).
func (s *Scheduler) SetRenewalCheckInterval(d time.Duration) {
	if d <= 0 {
		return
	}
	s.renewalCheckInterval = d
}
// SetJobProcessorInterval configures the interval for job processing.
// Default 30s. The value is read when jobProcessorLoop creates its ticker, so
// set it before Start. Zero or negative values are ignored so the default
// stays in place, matching the guard used by the other interval setters in
// this file (e.g. SetCRLGenerationInterval).
func (s *Scheduler) SetJobProcessorInterval(d time.Duration) {
	if d <= 0 {
		return
	}
	s.jobProcessorInterval = d
}
// SetJobRetryInterval configures the interval for the failed-job retry loop
// (coverage gap I-001). Defaults to 5 minutes; honors
// CERTCTL_SCHEDULER_RETRY_INTERVAL when wired from config. The value is read
// when jobRetryLoop creates its ticker, so set it before Start. Zero or
// negative values are ignored so the default stays in place, matching the
// guard used by the other interval setters in this file
// (e.g. SetCRLGenerationInterval).
func (s *Scheduler) SetJobRetryInterval(d time.Duration) {
	if d <= 0 {
		return
	}
	s.jobRetryInterval = d
}
// SetAgentHealthCheckInterval configures the interval for agent health checks.
// Default 2m. Zero or negative values are ignored so the default stays in
// place, matching the guard used by the other interval setters in this file
// (e.g. SetCRLGenerationInterval).
func (s *Scheduler) SetAgentHealthCheckInterval(d time.Duration) {
	if d <= 0 {
		return
	}
	s.agentHealthCheckInterval = d
}
// SetNotificationProcessInterval configures the interval for notification
// processing. Default 1m. Zero or negative values are ignored so the default
// stays in place, matching the guard used by the other interval setters in
// this file (e.g. SetCRLGenerationInterval).
func (s *Scheduler) SetNotificationProcessInterval(d time.Duration) {
	if d <= 0 {
		return
	}
	s.notificationProcessInterval = d
}
// SetNotificationRetryInterval configures the interval for the
// failed-notification retry sweep (coverage gap I-005). Defaults to 2 minutes;
// honors CERTCTL_NOTIFICATION_RETRY_INTERVAL when wired from config. Zero or
// negative values are ignored so the default stays in place, matching the
// guard used by the other interval setters in this file
// (e.g. SetCRLGenerationInterval).
func (s *Scheduler) SetNotificationRetryInterval(d time.Duration) {
	if d <= 0 {
		return
	}
	s.notificationRetryInterval = d
}
// SetNetworkScanInterval configures the interval for network scanning.
// Default 6h. Zero or negative values are ignored so the default stays in
// place, matching the guard used by the other interval setters in this file
// (e.g. SetCRLGenerationInterval).
func (s *Scheduler) SetNetworkScanInterval(d time.Duration) {
	if d <= 0 {
		return
	}
	s.networkScanInterval = d
}
// SetShortLivedExpiryCheckInterval configures the interval for short-lived
// certificate expiry checks. Default 30s. Zero or negative values are ignored
// so the default stays in place, matching the guard used by the other
// interval setters in this file (e.g. SetCRLGenerationInterval).
func (s *Scheduler) SetShortLivedExpiryCheckInterval(d time.Duration) {
	if d <= 0 {
		return
	}
	s.shortLivedExpiryCheckInterval = d
}
// SetHealthCheckService sets the health check service for the optional
// endpoint health check loop. Called after construction since health
// monitoring is optional. Start launches healthCheckLoop only if this field
// is non-nil when Start runs, so wire it before calling Start.
func (s *Scheduler) SetHealthCheckService(hcs HealthCheckServicer) {
s.healthCheckService = hcs
}
// SetHealthCheckInterval configures the interval for endpoint TLS health
// checks. Default 60s. Zero or negative values are ignored so the default
// stays in place, matching the guard used by the other interval setters in
// this file (e.g. SetCRLGenerationInterval).
func (s *Scheduler) SetHealthCheckInterval(d time.Duration) {
	if d <= 0 {
		return
	}
	s.healthCheckInterval = d
}
// SetCloudDiscoveryService sets the cloud discovery service for the optional
// cloud discovery loop. Called after construction since cloud discovery is
// optional. Start launches cloudDiscoveryLoop only if this field is non-nil
// when Start runs, so wire it before calling Start.
func (s *Scheduler) SetCloudDiscoveryService(cds CloudDiscoveryServicer) {
s.cloudDiscoveryService = cds
}
// SetCloudDiscoveryInterval configures the interval for cloud secret manager
// discovery. Default 6h. Zero or negative values are ignored so the default
// stays in place, matching the guard used by the other interval setters in
// this file (e.g. SetCRLGenerationInterval).
func (s *Scheduler) SetCloudDiscoveryInterval(d time.Duration) {
	if d <= 0 {
		return
	}
	s.cloudDiscoveryInterval = d
}
// SetCRLCacheService sets the CRL cache service for the crlGenerationLoop.
// Called after construction since the loop is optional — when this is
// unset, no pre-generation happens and HTTP CRL fetches go through the
// on-demand path. Start launches crlGenerationLoop only if this field is
// non-nil when Start runs, so wire it before calling Start.
//
// Bundle CRL/OCSP-Responder Phase 3.
func (s *Scheduler) SetCRLCacheService(svc CRLCacheServicer) {
s.crlCacheService = svc
}
// SetCRLGenerationInterval overrides how often the scheduler regenerates
// CRLs into the crl_cache table. The default is 1h (matching relying-party
// CRL refresh expectations under RFC 5280). Chatty fleets may shorten it;
// bandwidth-constrained operators may lengthen it provided nextUpdate stays
// comfortably in the future for each generation.
//
// Only strictly positive durations are applied; zero or negative values
// leave the current interval untouched.
func (s *Scheduler) SetCRLGenerationInterval(d time.Duration) {
	if d > 0 {
		s.crlGenerationInterval = d
	}
}
// SetJobReaperService sets the job reaper service (I-003) used by the job
// timeout loop. Unlike the optional loops, Start always launches
// jobTimeoutLoop regardless of whether a reaper is wired; per
// runJobTimeout's documentation a missing reaper makes the cycle a no-op.
func (s *Scheduler) SetJobReaperService(jr JobReaperService) {
s.jobReaper = jr
}
// SetACMEGarbageCollector wires the ACME GC service. Phase 5 — when
// non-nil, an acmeGCLoop runs every acmeGCInterval and sweeps expired
// nonces / authzs / orders. Optional: leaving nil disables the loop
// (legacy behavior pre-Phase-5). Start checks this field when it runs, so
// wire it before calling Start.
func (s *Scheduler) SetACMEGarbageCollector(gc ACMEGarbageCollector) {
s.acmeGC = gc
}
// SetACMEGCInterval overrides how often the ACME GC sweep runs. The default
// is 1m; quiet fleets can lengthen it to 5m and operators expecting
// nonce-storms can shorten it to 30s.
//
// Only strictly positive durations are applied; zero or negative values
// leave the current interval untouched.
func (s *Scheduler) SetACMEGCInterval(d time.Duration) {
	if d > 0 {
		s.acmeGCInterval = d
	}
}
// SetSessionGarbageCollector wires the Auth Bundle 2 Phase 4 session GC
// service. Optional; nil disables the loop (Bundle-2-disabled deployments
// still run pre-Phase-4 behavior). Start launches sessionGCLoop only if this
// field is non-nil when Start runs, so wire it before calling Start.
func (s *Scheduler) SetSessionGarbageCollector(gc SessionGarbageCollector) {
s.sessionGC = gc
}
// SetBCLReplayGarbageCollector wires the BCL consumed-jti GC. Audit
// 2026-05-10 HIGH-3 closure. The sweep runs on the same ticker as the
// session GC loop (no separate interval; replay rows are short-lived).
//
// NOTE(review): Start has no dedicated loop for this collector and launches
// sessionGCLoop only when a SessionGarbageCollector is wired, so this sweep
// presumably never runs unless SetSessionGarbageCollector was also called —
// confirm against sessionGCLoop.
func (s *Scheduler) SetBCLReplayGarbageCollector(gc BCLReplayGarbageCollector) {
s.bclReplayGC = gc
}
// SetSessionGCInterval overrides how often the session GC sweep runs. The
// default is 1h. Wire: CERTCTL_SESSION_GC_INTERVAL.
//
// Only strictly positive durations are applied; zero or negative values
// leave the current interval untouched.
func (s *Scheduler) SetSessionGCInterval(d time.Duration) {
	if d > 0 {
		s.sessionGCInterval = d
	}
}
// SetRateLimitGarbageCollector wires the Phase 13 Sprint 13.3 rate-
// limit bucket GC. Optional; nil disables the loop (which is the
// correct behavior when CERTCTL_RATE_LIMIT_BACKEND=memory — the
// in-memory backend's prune-on-Allow path keeps buckets short-lived
// without a separate sweep). Start launches rateLimitGCLoop only if this
// field is non-nil when Start runs, so wire it before calling Start.
//
// Concrete impl is *ratelimit.PostgresGC, constructed in
// cmd/server/main.go only when the postgres backend is selected.
func (s *Scheduler) SetRateLimitGarbageCollector(gc RateLimitGarbageCollector) {
s.rateLimitGC = gc
}
// SetRateLimitGCInterval overrides how often the rate-limit GC sweep runs.
// The default is 5m. Wire: CERTCTL_RATE_LIMIT_JANITOR_INTERVAL.
//
// Only strictly positive durations are applied; zero or negative values
// leave the current interval untouched.
func (s *Scheduler) SetRateLimitGCInterval(d time.Duration) {
	if d > 0 {
		s.rateLimitGCInterval = d
	}
}
// SetAuditChainVerifier wires the Sprint 6 COMP-001-HASH chain
// verifier. Optional; when nil the auditChainVerifyLoop is skipped
// (test fixtures that don't seed migration 000047 can leave it
// unset). Concrete impl is *postgres.AuditRepository. Start checks this
// field when it runs, so wire it before calling Start.
func (s *Scheduler) SetAuditChainVerifier(v AuditChainVerifier) {
s.auditChainVerifier = v
}
// SetAuditChainBreakRecorder wires the metric-side counter that the
// verify loop calls on every walk (RecordSuccess) and on detection of
// a break (RecordBreak). Concrete impl is *service.AuditChainCounter.
//
// Start gates auditChainVerifyLoop on the verifier only, not on this
// recorder. NOTE(review): whether the loop tolerates a nil recorder is not
// visible here — confirm before leaving it unset while a verifier is wired.
func (s *Scheduler) SetAuditChainBreakRecorder(r AuditChainBreakRecorder) {
s.auditChainRecorder = r
}
// SetAuditChainVerifyInterval overrides the audit_events_verify_chain tick
// cadence. The default is 6h. Wire: CERTCTL_AUDIT_CHAIN_VERIFY_INTERVAL.
//
// Only strictly positive durations are applied; zero or negative values
// leave the current interval untouched.
func (s *Scheduler) SetAuditChainVerifyInterval(d time.Duration) {
	if d > 0 {
		s.auditChainVerifyInterval = d
	}
}
// SetUserRetentionPurger wires the Sprint 6 COMP-002-RETENTION
// user-PII-purge sweeper. Optional — nil disables the loop (deployments
// that don't have any federated humans yet, or those that want manual
// purge via the admin endpoint only). Concrete impl is
// *service.UserRetentionService. Start launches userRetentionLoop only if
// this field is non-nil when Start runs, so wire it before calling Start.
func (s *Scheduler) SetUserRetentionPurger(p UserRetentionPurger) {
s.userRetention = p
}
// SetUserRetentionInterval overrides the userRetentionLoop tick cadence.
// The default is 24h. Wire: CERTCTL_USER_RETENTION_INTERVAL.
//
// Only strictly positive durations are applied; zero or negative values
// leave the current interval untouched.
func (s *Scheduler) SetUserRetentionInterval(d time.Duration) {
	if d > 0 {
		s.userRetentionInterval = d
	}
}
// SetAgentOfflineJobTTL overrides the threshold past which a Running job
// whose owning agent has gone silent is reaped to Failed (Bundle C / Audit
// M-016).
//
// Only strictly positive durations are applied; zero or negative values
// leave the current TTL untouched (the default is 5 minutes).
func (s *Scheduler) SetAgentOfflineJobTTL(d time.Duration) {
	if d > 0 {
		s.agentOfflineJobTTL = d
	}
}
// SetJobTimeoutInterval sets the job timeout reaper tick interval (I-003).
// Default 10m. The value is read when jobTimeoutLoop creates its ticker, so
// set it before Start. Zero or negative values are ignored so the default
// stays in place, matching the guard used by the other interval setters in
// this file (e.g. SetCRLGenerationInterval).
func (s *Scheduler) SetJobTimeoutInterval(d time.Duration) {
	if d <= 0 {
		return
	}
	s.jobTimeoutInterval = d
}
// SetAwaitingCSRTimeout sets the AwaitingCSR TTL (I-003).
// NewScheduler assigns no default, so the TTL is zero until this is called.
// The value is stored as given, including zero or negative durations.
// NOTE(review): how the reaper treats a zero TTL is not visible here —
// confirm against runJobTimeout / ReapTimedOutJobs.
func (s *Scheduler) SetAwaitingCSRTimeout(d time.Duration) {
s.awaitingCSRTimeout = d
}
// SetAwaitingApprovalTimeout sets the AwaitingApproval TTL (I-003).
// NewScheduler assigns no default, so the TTL is zero until this is called.
// The value is stored as given, including zero or negative durations.
// NOTE(review): how the reaper treats a zero TTL is not visible here —
// confirm against runJobTimeout / ReapTimedOutJobs.
func (s *Scheduler) SetAwaitingApprovalTimeout(d time.Duration) {
s.awaitingApprovalTimeout = d
}
// Start launches every scheduler loop in the background and returns a channel
// that is closed once all loop goroutines have been started. The loops run
// until ctx is cancelled.
//
// Eight loops always run: renewal check, job processor, job retry (I-001),
// job timeout (I-003), agent health, notification process, notification
// retry (I-005) and short-lived expiry. Ten more are optional and run only
// when their dependency is non-nil at the time Start executes: network scan,
// digest, health check, cloud discovery, CRL generation, ACME GC, session GC,
// rate-limit GC, audit-chain verify and user retention.
//
// Each loop goroutine is registered on the scheduler's WaitGroup before it is
// launched. The registration happens inside the supervising goroutine, so
// callers should receive from the returned channel before waiting on the
// WaitGroup.
func (s *Scheduler) Start(ctx context.Context) <-chan struct{} {
	startedChan := make(chan struct{})

	go func() {
		s.logger.Info("scheduler starting")

		// One list drives both the WaitGroup count and the launches, so the
		// two cannot drift apart when a loop is added.
		loops := []func(context.Context){
			s.renewalCheckLoop,
			s.jobProcessorLoop,
			s.jobRetryLoop,
			s.jobTimeoutLoop,
			s.agentHealthCheckLoop,
			s.notificationProcessLoop,
			s.notificationRetryLoop,
			s.shortLivedExpiryCheckLoop,
		}
		enableIf := func(wired bool, loop func(context.Context)) {
			if wired {
				loops = append(loops, loop)
			}
		}
		enableIf(s.networkScanService != nil, s.networkScanLoop)
		enableIf(s.digestService != nil, s.digestLoop)
		enableIf(s.healthCheckService != nil, s.healthCheckLoop)
		enableIf(s.cloudDiscoveryService != nil, s.cloudDiscoveryLoop)
		enableIf(s.crlCacheService != nil, s.crlGenerationLoop)
		enableIf(s.acmeGC != nil, s.acmeGCLoop)
		enableIf(s.sessionGC != nil, s.sessionGCLoop)
		enableIf(s.rateLimitGC != nil, s.rateLimitGCLoop)
		enableIf(s.auditChainVerifier != nil, s.auditChainVerifyLoop)
		enableIf(s.userRetention != nil, s.userRetentionLoop)

		// Track all loop goroutines so a wait on the WaitGroup blocks until
		// they have fully exited (prevents test races).
		s.wg.Add(len(loops))
		for _, loop := range loops {
			// Pass the loop as an argument so each goroutine gets its own
			// copy regardless of the Go version's loop-variable semantics.
			go func(run func(context.Context)) {
				defer s.wg.Done()
				run(ctx)
			}(loop)
		}

		// Signal that all loops are launched.
		close(startedChan)

		// Park until cancellation, then record why we are stopping.
		<-ctx.Done()
		s.logger.Info("scheduler shutting down", "reason", ctx.Err())
	}()

	return startedChan
}
// renewalCheckLoop drives the periodic renewal check. It launches one check
// immediately, then one per tick of a jittered ticker built from
// renewalCheckInterval, until ctx is cancelled.
//
// Each check runs in its own goroutine registered on the scheduler's
// WaitGroup. renewalCheckRunning is held for the duration of a check; a tick
// that arrives while it is still set is skipped with a warning rather than
// starting a second concurrent check.
func (s *Scheduler) renewalCheckLoop(ctx context.Context) {
	ticker := NewJitteredTicker(s.renewalCheckInterval, DefaultSchedulerJitter)
	defer ticker.Stop()

	// launch starts one check; the caller must already hold the running flag.
	launch := func() {
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			defer s.renewalCheckRunning.Store(false)
			s.runRenewalCheck(ctx)
		}()
	}

	// Run immediately on start. Nothing else can hold the flag yet, so it is
	// claimed unconditionally.
	s.renewalCheckRunning.Store(true)
	launch()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if s.renewalCheckRunning.CompareAndSwap(false, true) {
				launch()
			} else {
				s.logger.Warn("renewal check still running, skipping tick")
			}
		}
	}
}
// runRenewalCheck performs one renewal check under a 5-minute timeout derived
// from ctx. A failure is logged at error level together with the configured
// interval; it is not returned, so the calling loop keeps running. Success is
// logged at debug level.
func (s *Scheduler) runRenewalCheck(ctx context.Context) {
	opCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
	defer cancel()

	err := s.renewalService.CheckExpiringCertificates(opCtx)
	if err == nil {
		s.logger.Debug("renewal check completed")
		return
	}
	s.logger.Error("renewal check failed",
		"error", err,
		"interval", s.renewalCheckInterval.String())
}
// jobProcessorLoop drives periodic processing of pending jobs. It launches
// one processing cycle immediately, then one per tick of a jittered ticker
// built from jobProcessorInterval, until ctx is cancelled.
//
// Each cycle runs in its own goroutine registered on the scheduler's
// WaitGroup. jobProcessorRunning is held for the duration of a cycle; a tick
// that arrives while it is still set is skipped with a warning rather than
// starting a second concurrent cycle.
func (s *Scheduler) jobProcessorLoop(ctx context.Context) {
	ticker := NewJitteredTicker(s.jobProcessorInterval, DefaultSchedulerJitter)
	defer ticker.Stop()

	// startCycle launches one cycle; the caller must already hold the
	// running flag.
	startCycle := func() {
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			defer s.jobProcessorRunning.Store(false)
			s.runJobProcessor(ctx)
		}()
	}

	// Run immediately on start. Nothing else can hold the flag yet, so it is
	// claimed unconditionally.
	s.jobProcessorRunning.Store(true)
	startCycle()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if s.jobProcessorRunning.CompareAndSwap(false, true) {
				startCycle()
			} else {
				s.logger.Warn("job processor still running, skipping tick")
			}
		}
	}
}
// runJobProcessor performs one job processing cycle under a 2-minute timeout
// derived from ctx. A failure is logged at error level together with the
// configured interval; it is not returned, so the calling loop keeps running.
// Success is logged at debug level.
func (s *Scheduler) runJobProcessor(ctx context.Context) {
	opCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
	defer cancel()

	err := s.jobService.ProcessPendingJobs(opCtx)
	if err == nil {
		s.logger.Debug("job processor completed")
		return
	}
	s.logger.Error("job processor failed",
		"error", err,
		"interval", s.jobProcessorInterval.String())
}
// jobRetryLoop drives the failed-job retry sweep, which transitions eligible
// Failed jobs back to Pending so the job processor can pick them up again.
// It closes coverage gap I-001: JobService.RetryFailedJobs had no runtime
// caller before this loop was wired.
//
// One sweep is launched immediately, then one per tick of a jittered ticker
// built from jobRetryInterval, until ctx is cancelled. Each sweep runs in its
// own goroutine registered on the scheduler's WaitGroup. jobRetryRunning is
// held for the duration of a sweep; a tick that arrives while it is still set
// is skipped with a warning rather than starting a second concurrent sweep.
func (s *Scheduler) jobRetryLoop(ctx context.Context) {
	ticker := NewJitteredTicker(s.jobRetryInterval, DefaultSchedulerJitter)
	defer ticker.Stop()

	// sweep launches one retry pass; the caller must already hold the
	// running flag.
	sweep := func() {
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			defer s.jobRetryRunning.Store(false)
			s.runJobRetry(ctx)
		}()
	}

	// Run immediately on start. Nothing else can hold the flag yet, so it is
	// claimed unconditionally.
	s.jobRetryRunning.Store(true)
	sweep()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if s.jobRetryRunning.CompareAndSwap(false, true) {
				sweep()
			} else {
				s.logger.Warn("job retry still running, skipping tick")
			}
		}
	}
}
// runJobRetry performs one failed-job retry sweep under a 2-minute timeout
// derived from ctx — the same per-tick budget as runJobProcessor.
// RetryFailedJobs issues one SELECT and one UPDATE per eligible job (cheap),
// so this headroom covers very large failure backlogs without starving the
// loop.
//
// A failure is logged at error level together with the configured interval;
// it is not returned, so the calling loop keeps running. Success is logged at
// debug level.
func (s *Scheduler) runJobRetry(ctx context.Context) {
	// advisoryMaxRetries is advisory at the service layer: per-job gating uses
	// each job's own Attempts/MaxAttempts. 3 matches the conventional default
	// seen across the codebase's job creation paths.
	const advisoryMaxRetries = 3

	opCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
	defer cancel()

	err := s.jobService.RetryFailedJobs(opCtx, advisoryMaxRetries)
	if err == nil {
		s.logger.Debug("job retry completed")
		return
	}
	s.logger.Error("job retry failed",
		"error", err,
		"interval", s.jobRetryInterval.String())
}
// jobTimeoutLoop drives the job-timeout reaper (coverage gap I-003), which
// fails jobs stuck in AwaitingCSR or AwaitingApproval past their TTL; the
// I-001 retry loop can then promote eligible Failed jobs back to Pending. One
// reaping pass starts as soon as the loop begins, then one per jittered tick
// of jobTimeoutInterval, until ctx is cancelled.
//
// The jobTimeoutRunning flag keeps at most one pass in flight; a tick that
// arrives during a running pass is logged and dropped. Every pass goroutine
// is registered with s.wg.
func (s *Scheduler) jobTimeoutLoop(ctx context.Context) {
	tick := NewJitteredTicker(s.jobTimeoutInterval, DefaultSchedulerJitter)
	defer tick.Stop()

	// launch starts one background reaping pass. The caller must already own
	// the running flag; the goroutine clears it when the pass returns.
	launch := func() {
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			defer s.jobTimeoutRunning.Store(false)
			s.runJobTimeout(ctx)
		}()
	}

	// Startup pass: claim the flag unconditionally, then run.
	s.jobTimeoutRunning.Store(true)
	launch()

	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			if s.jobTimeoutRunning.CompareAndSwap(false, true) {
				launch()
			} else {
				s.logger.Warn("job timeout reaper still running, skipping tick")
			}
		}
	}
}
// runJobTimeout performs one job-reaping pass. It is a no-op when no job
// reaper has been wired (s.jobReaper == nil), which keeps the always-on loop
// topology usable by consumers — tests, for example — that do not configure
// a reaper.
//
// The pass has two arms (Bundle C / Audit M-016):
//
//  1. ReapTimedOutJobs, given awaitingCSRTimeout and awaitingApprovalTimeout,
//     handles the AwaitingCSR / AwaitingApproval timeouts (I-003).
//  2. ReapJobsWithOfflineAgents, given agentOfflineJobTTL, handles jobs whose
//     owning agent has gone silent (M-016).
//
// Each arm runs under its own 2-minute timeout derived from ctx, and that
// timeout context is released as soon as the arm returns instead of lingering
// until the whole function exits. A failure in either arm is logged and does
// not stop the other arm from running; nothing is returned to the caller.
func (s *Scheduler) runJobTimeout(ctx context.Context) {
	if s.jobReaper == nil {
		return
	}

	// First arm: jobs that waited too long for a CSR or an approval.
	func() {
		opCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
		defer cancel()
		if err := s.jobReaper.ReapTimedOutJobs(opCtx, s.awaitingCSRTimeout, s.awaitingApprovalTimeout); err != nil {
			s.logger.Error("job timeout reaper failed",
				"error", err,
				"interval", s.jobTimeoutInterval.String())
			return
		}
		s.logger.Debug("job timeout reaper completed")
	}()

	// Second arm: jobs owned by agents considered offline. The cutoff is
	// s.agentOfflineJobTTL; its default is set outside this function (earlier
	// notes cite 5 minutes, i.e. several missed heartbeats rather than a
	// single transient one — confirm against the scheduler's constructor).
	func() {
		offlineCtx, offlineCancel := context.WithTimeout(ctx, 2*time.Minute)
		defer offlineCancel()
		if err := s.jobReaper.ReapJobsWithOfflineAgents(offlineCtx, s.agentOfflineJobTTL); err != nil {
			s.logger.Error("offline-agent job reaper failed",
				"error", err,
				"agent_offline_ttl", s.agentOfflineJobTTL.String())
			return
		}
		s.logger.Debug("offline-agent job reaper completed")
	}()
}
// agentHealthCheckLoop drives the stale-agent sweep, which marks as offline
// any agent that has not sent a heartbeat within the health check interval.
// One check starts as soon as the loop begins, then one per jittered tick of
// agentHealthCheckInterval, until ctx is cancelled. A failing check is logged
// by runAgentHealthCheck and does not stop the loop.
//
// The agentHealthCheckRunning flag keeps at most one check in flight; a tick
// that arrives during a running check is logged and dropped. Every check
// goroutine is registered with s.wg.
func (s *Scheduler) agentHealthCheckLoop(ctx context.Context) {
	tick := NewJitteredTicker(s.agentHealthCheckInterval, DefaultSchedulerJitter)
	defer tick.Stop()

	// launch starts one background check. The caller must already own the
	// running flag; the goroutine clears it when the check returns.
	launch := func() {
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			defer s.agentHealthCheckRunning.Store(false)
			s.runAgentHealthCheck(ctx)
		}()
	}

	// Startup check: claim the flag unconditionally, then run.
	s.agentHealthCheckRunning.Store(true)
	launch()

	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			if s.agentHealthCheckRunning.CompareAndSwap(false, true) {
				launch()
			} else {
				s.logger.Warn("agent health check still running, skipping tick")
			}
		}
	}
}
// runAgentHealthCheck performs one stale-agent sweep under a 1-minute timeout.
// agentHealthCheckInterval is passed to MarkStaleAgentsOffline as its duration
// argument and is also reported in the failure log. Errors are logged, never
// returned.
func (s *Scheduler) runAgentHealthCheck(ctx context.Context) {
	passCtx, release := context.WithTimeout(ctx, 1*time.Minute)
	defer release()

	err := s.agentService.MarkStaleAgentsOffline(passCtx, s.agentHealthCheckInterval)
	if err == nil {
		s.logger.Debug("agent health check completed")
		return
	}
	s.logger.Error("agent health check failed",
		"error", err,
		"interval", s.agentHealthCheckInterval.String())
}
// notificationProcessLoop drives pending-notification processing. One pass
// starts as soon as the loop begins, then one per jittered tick of
// notificationProcessInterval, until ctx is cancelled. A failing pass is
// logged by runNotificationProcess and does not stop the loop.
//
// The notificationProcessRunning flag keeps at most one pass in flight; a
// tick that arrives during a running pass is logged and dropped. Every pass
// goroutine is registered with s.wg.
func (s *Scheduler) notificationProcessLoop(ctx context.Context) {
	tick := NewJitteredTicker(s.notificationProcessInterval, DefaultSchedulerJitter)
	defer tick.Stop()

	// launch starts one background pass. The caller must already own the
	// running flag; the goroutine clears it when the pass returns.
	launch := func() {
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			defer s.notificationProcessRunning.Store(false)
			s.runNotificationProcess(ctx)
		}()
	}

	// Startup pass: claim the flag unconditionally, then run.
	s.notificationProcessRunning.Store(true)
	launch()

	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			if s.notificationProcessRunning.CompareAndSwap(false, true) {
				launch()
			} else {
				s.logger.Warn("notification processor still running, skipping tick")
			}
		}
	}
}
// runNotificationProcess performs one pending-notification pass by calling
// notificationService.ProcessPendingNotifications under a 1-minute timeout.
// A failure is logged with the configured processing interval and is not
// returned to the caller.
func (s *Scheduler) runNotificationProcess(ctx context.Context) {
	passCtx, release := context.WithTimeout(ctx, 1*time.Minute)
	defer release()

	err := s.notificationService.ProcessPendingNotifications(passCtx)
	if err == nil {
		s.logger.Debug("notification processor completed")
		return
	}
	s.logger.Error("notification processor failed",
		"error", err,
		"interval", s.notificationProcessInterval.String())
}
// notificationRetryLoop drives the failed-notification retry sweep, which
// moves eligible Failed notifications back to Pending so the notification
// processor can pick them up again (coverage gap I-005: before this loop was
// wired, NotificationService.RetryFailedNotifications had no runtime caller).
// One sweep starts as soon as the loop begins, then one per jittered tick of
// notificationRetryInterval, until ctx is cancelled. The topology mirrors the
// I-001 jobRetryLoop.
//
// The notificationRetryRunning flag keeps at most one sweep in flight; a tick
// that arrives during a running sweep is logged and dropped. Every sweep
// goroutine is registered with s.wg.
func (s *Scheduler) notificationRetryLoop(ctx context.Context) {
	tick := NewJitteredTicker(s.notificationRetryInterval, DefaultSchedulerJitter)
	defer tick.Stop()

	// launch starts one background sweep. The caller must already own the
	// running flag; the goroutine clears it when the sweep returns.
	launch := func() {
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			defer s.notificationRetryRunning.Store(false)
			s.runNotificationRetry(ctx)
		}()
	}

	// Startup sweep: claim the flag unconditionally, then run.
	s.notificationRetryRunning.Store(true)
	launch()

	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			if s.notificationRetryRunning.CompareAndSwap(false, true) {
				launch()
			} else {
				s.logger.Warn("notification retry still running, skipping tick")
			}
		}
	}
}
// runNotificationRetry performs one failed-notification retry pass under a
// 2-minute timeout, matching runJobRetry. A failure is logged with the
// configured retry interval and is not returned to the caller.
//
// Per the original authors' notes (not visible in this function), the service
// layer swallows per-row send errors and only surfaces the error from its
// initial list query, so an error here means the sweep could not enumerate
// its candidates.
func (s *Scheduler) runNotificationRetry(ctx context.Context) {
	passCtx, release := context.WithTimeout(ctx, 2*time.Minute)
	defer release()

	err := s.notificationService.RetryFailedNotifications(passCtx)
	if err == nil {
		s.logger.Debug("notification retry completed")
		return
	}
	s.logger.Error("notification retry failed",
		"error", err,
		"interval", s.notificationRetryInterval.String())
}
// shortLivedExpiryCheckLoop drives the sweep that marks expired short-lived
// certificates. For certificates with a TTL under one hour, expiry itself
// serves as revocation, so no CRL/OCSP is involved. One check starts as soon
// as the loop begins, then one per jittered tick of
// shortLivedExpiryCheckInterval, until ctx is cancelled.
//
// The shortLivedExpiryCheckRunning flag keeps at most one check in flight; a
// tick that arrives during a running check is logged and dropped. Every check
// goroutine is registered with s.wg.
func (s *Scheduler) shortLivedExpiryCheckLoop(ctx context.Context) {
	tick := NewJitteredTicker(s.shortLivedExpiryCheckInterval, DefaultSchedulerJitter)
	defer tick.Stop()

	// launch starts one background check. The caller must already own the
	// running flag; the goroutine clears it when the check returns.
	launch := func() {
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			defer s.shortLivedExpiryCheckRunning.Store(false)
			s.runShortLivedExpiryCheck(ctx)
		}()
	}

	// Startup check: claim the flag unconditionally, then run.
	s.shortLivedExpiryCheckRunning.Store(true)
	launch()

	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			if s.shortLivedExpiryCheckRunning.CompareAndSwap(false, true) {
				launch()
			} else {
				s.logger.Warn("short-lived expiry check still running, skipping tick")
			}
		}
	}
}
// runShortLivedExpiryCheck performs one short-lived-certificate expiry pass by
// calling renewalService.ExpireShortLivedCertificates under a 30-second
// timeout. A failure is logged with the configured check interval and is not
// returned to the caller.
func (s *Scheduler) runShortLivedExpiryCheck(ctx context.Context) {
	passCtx, release := context.WithTimeout(ctx, 30*time.Second)
	defer release()

	err := s.renewalService.ExpireShortLivedCertificates(passCtx)
	if err == nil {
		s.logger.Debug("short-lived expiry check completed")
		return
	}
	s.logger.Error("short-lived expiry check failed",
		"error", err,
		"interval", s.shortLivedExpiryCheckInterval.String())
}
// networkScanLoop drives active TLS scanning of the configured network
// targets. One scan starts as soon as the loop begins, then one per jittered
// tick of networkScanInterval, until ctx is cancelled.
//
// The networkScanRunning flag keeps at most one scan in flight; a tick that
// arrives during a running scan is logged and dropped. Every scan goroutine
// is registered with s.wg.
func (s *Scheduler) networkScanLoop(ctx context.Context) {
	tick := NewJitteredTicker(s.networkScanInterval, DefaultSchedulerJitter)
	defer tick.Stop()

	// launch starts one background scan. The caller must already own the
	// running flag; the goroutine clears it when the scan returns.
	launch := func() {
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			defer s.networkScanRunning.Store(false)
			s.runNetworkScan(ctx)
		}()
	}

	// Startup scan: claim the flag unconditionally, then run.
	s.networkScanRunning.Store(true)
	launch()

	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			if s.networkScanRunning.CompareAndSwap(false, true) {
				launch()
			} else {
				s.logger.Warn("network scan still running, skipping tick")
			}
		}
	}
}
// runNetworkScan performs one network scan pass by calling
// networkScanService.ScanAllTargets under a 30-minute timeout. A failure is
// logged with the configured scan interval and is not returned to the caller.
func (s *Scheduler) runNetworkScan(ctx context.Context) {
	passCtx, release := context.WithTimeout(ctx, 30*time.Minute)
	defer release()

	err := s.networkScanService.ScanAllTargets(passCtx)
	if err == nil {
		s.logger.Debug("network scan completed")
		return
	}
	s.logger.Error("network scan failed",
		"error", err,
		"interval", s.networkScanInterval.String())
}
// digestLoop drives generation and sending of certificate digest emails, one
// run per jittered tick of digestInterval, until ctx is cancelled.
//
// Unlike the loops that run once at startup, the first digest waits for the
// first tick: digests are infrequent (24h default, per the original notes)
// and should not fire on every restart.
//
// The digestRunning flag keeps at most one run in flight; a tick that arrives
// during a running digest is logged and dropped. Every run goroutine is
// registered with s.wg.
func (s *Scheduler) digestLoop(ctx context.Context) {
	tick := NewJitteredTicker(s.digestInterval, DefaultSchedulerJitter)
	defer tick.Stop()

	// run is the body of one background digest; it releases the WaitGroup
	// slot and the running flag that the tick handler acquired for it.
	run := func() {
		defer s.wg.Done()
		defer s.digestRunning.Store(false)
		s.runDigest(ctx)
	}

	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			if !s.digestRunning.CompareAndSwap(false, true) {
				s.logger.Warn("digest processor still running, skipping tick")
				continue
			}
			s.wg.Add(1)
			go run()
		}
	}
}
// runDigest performs one digest pass by calling digestService.ProcessDigest
// under a 5-minute timeout. A failure is logged with the configured digest
// interval and is not returned to the caller.
func (s *Scheduler) runDigest(ctx context.Context) {
	passCtx, release := context.WithTimeout(ctx, 5*time.Minute)
	defer release()

	err := s.digestService.ProcessDigest(passCtx)
	if err == nil {
		s.logger.Debug("digest processor completed")
		return
	}
	s.logger.Error("digest processor failed",
		"error", err,
		"interval", s.digestInterval.String())
}
// healthCheckLoop drives endpoint TLS health checks, one run per jittered
// tick of healthCheckInterval, until ctx is cancelled.
//
// The first run waits for the first tick rather than firing at startup:
// health checks are frequent (60s default, per the original notes), may be
// resource-intensive, and should not fire on every restart.
//
// The healthCheckRunning flag keeps at most one run in flight. A tick that
// arrives during a running check is dropped; note that this skip is logged at
// Debug here, whereas the sibling loops in this file log theirs at Warn.
// Every run goroutine is registered with s.wg.
func (s *Scheduler) healthCheckLoop(ctx context.Context) {
	tick := NewJitteredTicker(s.healthCheckInterval, DefaultSchedulerJitter)
	defer tick.Stop()

	// run is the body of one background health check; it releases the
	// WaitGroup slot and the running flag that the tick handler acquired.
	run := func() {
		defer s.wg.Done()
		defer s.healthCheckRunning.Store(false)
		s.runHealthCheck(ctx)
	}

	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			if !s.healthCheckRunning.CompareAndSwap(false, true) {
				s.logger.Debug("health check still running, skipping tick")
				continue
			}
			s.wg.Add(1)
			go run()
		}
	}
}
// runHealthCheck performs one endpoint health check pass by calling
// healthCheckService.RunHealthChecks under a 5-minute timeout. A failure is
// logged with the configured health check interval and is not returned to the
// caller.
func (s *Scheduler) runHealthCheck(ctx context.Context) {
	passCtx, release := context.WithTimeout(ctx, 5*time.Minute)
	defer release()

	err := s.healthCheckService.RunHealthChecks(passCtx)
	if err == nil {
		s.logger.Debug("health check completed")
		return
	}
	s.logger.Error("health check run failed",
		"error", err,
		"interval", s.healthCheckInterval.String())
}
// cloudDiscoveryLoop drives certificate discovery from cloud secret managers.
// One discovery pass starts as soon as the loop begins, then one per jittered
// tick of cloudDiscoveryInterval, until ctx is cancelled. It follows the same
// idempotency pattern as networkScanLoop.
//
// The cloudDiscoveryRunning flag keeps at most one pass in flight; a tick
// that arrives during a running pass is logged and dropped. Every pass
// goroutine is registered with s.wg.
func (s *Scheduler) cloudDiscoveryLoop(ctx context.Context) {
	tick := NewJitteredTicker(s.cloudDiscoveryInterval, DefaultSchedulerJitter)
	defer tick.Stop()

	// launch starts one background discovery pass. The caller must already
	// own the running flag; the goroutine clears it when the pass returns.
	launch := func() {
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			defer s.cloudDiscoveryRunning.Store(false)
			s.runCloudDiscovery(ctx)
		}()
	}

	// Startup pass: claim the flag unconditionally, then run.
	s.cloudDiscoveryRunning.Store(true)
	launch()

	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			if s.cloudDiscoveryRunning.CompareAndSwap(false, true) {
				launch()
			} else {
				s.logger.Warn("cloud discovery still running, skipping tick")
			}
		}
	}
}
// runCloudDiscovery performs one cloud discovery pass by calling
// cloudDiscoveryService.DiscoverAll under a 30-minute timeout.
//
// DiscoverAll reports a certificate total plus a collection of errors. With
// no errors, the total is logged at debug level. Otherwise a summary line
// (total, error count, interval) is logged at error level, followed by one
// error line per individual failure; failures that match context.Canceled
// are left out of the per-error lines but still count toward the summary.
// Nothing is returned to the caller.
func (s *Scheduler) runCloudDiscovery(ctx context.Context) {
	passCtx, release := context.WithTimeout(ctx, 30*time.Minute)
	defer release()

	found, failures := s.cloudDiscoveryService.DiscoverAll(passCtx)
	if len(failures) == 0 {
		s.logger.Debug("cloud discovery completed",
			"certificates_found", found)
		return
	}

	s.logger.Error("cloud discovery completed with errors",
		"certificates_found", found,
		"errors", len(failures),
		"interval", s.cloudDiscoveryInterval.String())
	for _, failure := range failures {
		if errors.Is(failure, context.Canceled) {
			continue
		}
		s.logger.Error("cloud discovery error", "error", failure)
	}
}
// WaitForCompletion blocks until every unit of work registered with the
// scheduler's WaitGroup has finished, or until timeout elapses, whichever
// comes first. It returns nil when the work drained in time and
// ErrSchedulerShutdownTimeout otherwise.
//
// Call it after the scheduler context has been cancelled, as the final step
// of a graceful shutdown. On timeout the helper goroutine that waits on the
// WaitGroup is left running; it exits once the outstanding work completes.
func (s *Scheduler) WaitForCompletion(timeout time.Duration) error {
	drained := make(chan struct{})
	go func() {
		defer close(drained)
		s.wg.Wait()
	}()

	expired := time.After(timeout)
	select {
	case <-expired:
		s.logger.Warn("scheduler work did not complete within timeout", "timeout", timeout.String())
		return ErrSchedulerShutdownTimeout
	case <-drained:
		s.logger.Info("all scheduler work completed")
		return nil
	}
}
// crlGenerationLoop periodically pre-generates CRLs (Bundle CRL/OCSP-Responder
// Phase 3). Per the original notes, the results land in crl_cache so the
// /.well-known/pki/crl/{issuer_id} HTTP endpoint can serve from cache instead
// of regenerating per request. One pass runs per jittered tick of
// crlGenerationInterval, until ctx is cancelled; the shape mirrors digestLoop.
//
// There is deliberately no startup pass: CRLs are typically valid for many
// hours, so regenerating on every restart would waste work. According to the
// original notes, a cache miss before the first tick is covered by the HTTP
// handler triggering a generation through the cache service.
//
// The crlGenerationRunning flag keeps at most one pass in flight; a tick that
// arrives during a running pass is logged and dropped. Every pass goroutine
// is registered with s.wg for graceful shutdown.
func (s *Scheduler) crlGenerationLoop(ctx context.Context) {
	tick := NewJitteredTicker(s.crlGenerationInterval, DefaultSchedulerJitter)
	defer tick.Stop()

	// run is the body of one background pre-generation pass; it releases the
	// WaitGroup slot and the running flag that the tick handler acquired.
	run := func() {
		defer s.wg.Done()
		defer s.crlGenerationRunning.Store(false)
		s.runCRLGeneration(ctx)
	}

	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			if !s.crlGenerationRunning.CompareAndSwap(false, true) {
				s.logger.Warn("CRL pre-generation still running, skipping tick")
				continue
			}
			s.wg.Add(1)
			go run()
		}
	}
}
// runCRLGeneration performs one CRL pre-generation pass by calling
// crlCacheService.RegenerateAll under a 5-minute timeout. Nothing is logged
// or returned here; per the original notes, per-issuer failures are logged
// and audited inside the cache service itself.
func (s *Scheduler) runCRLGeneration(ctx context.Context) {
	// Per-issuer generation is expected to be fast, but the pass walks every
	// CRL-capable issuer. Capping the whole pass keeps a stuck issuer from
	// holding the re-entry guard and blocking later ticks.
	passCtx, release := context.WithTimeout(ctx, 5*time.Minute)
	defer release()

	s.crlCacheService.RegenerateAll(passCtx)
}
var (
	// ErrSchedulerShutdownTimeout is the sentinel error WaitForCompletion
	// returns when in-flight scheduler work has not finished within the
	// caller-supplied timeout. Compare against it with errors.Is.
	ErrSchedulerShutdownTimeout = errors.New("scheduler graceful shutdown timeout")
)
// acmeGCLoop drives the ACME garbage-collection sweep (Phase 5), calling
// acmeGC.GarbageCollect once per jittered tick of acmeGCInterval until ctx is
// cancelled. There is no startup sweep; the first one waits for the first
// tick.
//
// Following the project's scheduler-idempotency decision, the acmeGCRunning
// flag prevents overlapping sweeps (a tick that arrives during a running
// sweep is logged and dropped) and s.wg tracks the in-flight goroutine for
// graceful shutdown.
func (s *Scheduler) acmeGCLoop(ctx context.Context) {
	tick := NewJitteredTicker(s.acmeGCInterval, DefaultSchedulerJitter)
	defer tick.Stop()

	// sweep is the body of one background GC pass; it releases the WaitGroup
	// slot and the running flag that the tick handler acquired for it.
	sweep := func() {
		defer s.wg.Done()
		defer s.acmeGCRunning.Store(false)

		// One minute per sweep. Per the original notes the statements are
		// cheap, but the cap keeps a stuck database from holding the guard
		// and blocking later ticks.
		sweepCtx, release := context.WithTimeout(ctx, time.Minute)
		defer release()

		err := s.acmeGC.GarbageCollect(sweepCtx)
		if err == nil {
			return
		}
		s.logger.Warn("acme gc sweep failed (next tick will retry)", "error", err)
	}

	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			if !s.acmeGCRunning.CompareAndSwap(false, true) {
				s.logger.Warn("ACME GC sweep still running, skipping tick")
				continue
			}
			s.wg.Add(1)
			go sweep()
		}
	}
}
// sessionGCLoop drives the session garbage-collection sweep (Auth Bundle 2
// Phase 4), calling sessionGC.GarbageCollect once per jittered tick of
// sessionGCInterval until ctx is cancelled. Per the original notes, that
// collector sweeps:
//   - sessions whose absolute_expires_at is in the past (post-login expired);
//   - pre-login session rows older than 10 minutes;
//   - retired-past-retention session_signing_keys rows.
//
// When a BCL replay collector is wired (s.bclReplayGC != nil), expired BCL
// consumed-jti rows are swept on the same tick (Audit 2026-05-10 HIGH-3).
// Both sweeps share a single 1-minute timeout, and a failure in the session
// sweep does not prevent the BCL sweep from being attempted.
//
// The sessionGCRunning flag prevents overlapping sweeps: a tick that arrives
// during a running sweep is logged and skipped, not queued. Every sweep
// goroutine is registered with s.wg.
func (s *Scheduler) sessionGCLoop(ctx context.Context) {
	tick := NewJitteredTicker(s.sessionGCInterval, DefaultSchedulerJitter)
	defer tick.Stop()

	// sweep is the body of one background GC pass; it releases the WaitGroup
	// slot and the running flag that the tick handler acquired for it.
	sweep := func() {
		defer s.wg.Done()
		defer s.sessionGCRunning.Store(false)

		sweepCtx, release := context.WithTimeout(ctx, time.Minute)
		defer release()

		if _, err := s.sessionGC.GarbageCollect(sweepCtx); err != nil {
			s.logger.Warn("session gc sweep failed (next tick will retry)", "error", err)
		}

		// Best-effort BCL replay sweep; skipped when no collector is wired.
		if s.bclReplayGC == nil {
			return
		}
		swept, err := s.bclReplayGC.SweepExpired(sweepCtx, time.Now().UTC())
		switch {
		case err != nil:
			s.logger.Warn("bcl replay gc sweep failed (next tick will retry)", "error", err)
		case swept > 0:
			s.logger.Debug("bcl replay gc swept rows", "rows", swept)
		}
	}

	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			if !s.sessionGCRunning.CompareAndSwap(false, true) {
				s.logger.Warn("session GC sweep still running, skipping tick")
				continue
			}
			s.wg.Add(1)
			go sweep()
		}
	}
}
// rateLimitGCLoop drives the rate-limit bucket sweep (Phase 13 Sprint 13.3),
// calling rateLimitGC.GarbageCollect once per jittered tick of
// rateLimitGCInterval until ctx is cancelled. Per the original notes, the
// collector removes stale rows from the rate_limit_buckets table introduced
// by migration 000046 (Phase 13 Sprint 13.2).
//
// Per the original notes, the loop is only launched when
// CERTCTL_RATE_LIMIT_BACKEND=postgres: the in-memory backend prunes on its
// Allow path, so cmd/server/main.go skips SetRateLimitGarbageCollector in
// that case. This function performs no such check itself and assumes
// s.rateLimitGC is set.
//
// The rateLimitGCRunning flag prevents overlapping sweeps (a tick that
// arrives during a running sweep is logged and dropped). Every sweep
// goroutine is registered with s.wg.
func (s *Scheduler) rateLimitGCLoop(ctx context.Context) {
	tick := NewJitteredTicker(s.rateLimitGCInterval, DefaultSchedulerJitter)
	defer tick.Stop()

	// sweep is the body of one background GC pass; it releases the WaitGroup
	// slot and the running flag that the tick handler acquired for it.
	sweep := func() {
		defer s.wg.Done()
		defer s.rateLimitGCRunning.Store(false)

		// Same 1-minute budget as the ACME and session GC sweeps.
		sweepCtx, release := context.WithTimeout(ctx, time.Minute)
		defer release()

		removed, err := s.rateLimitGC.GarbageCollect(sweepCtx)
		switch {
		case err != nil:
			s.logger.Warn("rate-limit gc sweep failed (next tick will retry)", "error", err)
		case removed > 0:
			s.logger.Debug("rate-limit gc swept stale buckets", "rows", removed)
		}
	}

	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			if !s.rateLimitGCRunning.CompareAndSwap(false, true) {
				s.logger.Warn("rate-limit GC sweep still running, skipping tick")
				continue
			}
			s.wg.Add(1)
			go sweep()
		}
	}
}
// auditChainVerifyLoop is the Sprint 6 COMP-001-HASH tamper-evidence sweeper.
// It triggers runAuditChainVerify once at startup and then once per jittered
// tick of auditChainVerifyInterval (CERTCTL_AUDIT_CHAIN_VERIFY_INTERVAL),
// until ctx is cancelled. The startup run gives a freshly deployed instance a
// baseline reading and surfaces tampering right away rather than after a full
// interval.
//
// Design notes carried over from the original authors (not verifiable from
// this function alone):
//
//  1. The verifier runs migration 000047's audit_events_verify_chain()
//     plpgsql function entirely server-side.
//  2. A scheduler loop was preferred over an external CI/cron job because it
//     needs no extra deploy artifact, takes its cadence from the same
//     CERTCTL_* env-var pattern as every other task, and exposes the
//     certctl_audit_chain_break_detected metric on
//     /api/v1/metrics/prometheus with no separate scrape endpoint.
//  3. The chain walk is O(N); about 2-3s was measured for 1M rows on
//     testcontainers PG-16-alpine. Operators with much larger audit tables
//     should watch per-tick latency and lengthen the interval if needed.
//
// Overlap handling lives in runAuditChainVerify: if the previous pass is
// still running when a tick fires, the new pass is skipped and a warning is
// logged so operators know to pick a longer interval.
func (s *Scheduler) auditChainVerifyLoop(ctx context.Context) {
	tick := NewJitteredTicker(s.auditChainVerifyInterval, DefaultSchedulerJitter)
	defer tick.Stop()

	stop := ctx.Done()
	for {
		// The first iteration is the startup pass; each later iteration is
		// reached only after a tick.
		s.runAuditChainVerify(ctx)

		select {
		case <-stop:
			return
		case <-tick.C:
		}
	}
}
// userRetentionLoop is the Sprint 6 COMP-002-RETENTION sweeper. Once per
// jittered tick of userRetentionInterval (CERTCTL_USER_RETENTION_INTERVAL) it
// calls userRetention.PurgeDeactivatedUsers, until ctx is cancelled. There is
// no startup pass. Per the original notes, the service walks users whose
// deactivated_at is older than the retention window, scrubs their PII columns
// and emits the audit rows; this loop only handles cadence and concurrency.
//
// Each pass runs under a 5-minute timeout. Per the original notes, the
// service's purgeBatchCap (default 200) bounds the work of a single pass, so
// a large backlog is spread over several ticks.
//
// The userRetentionRunning flag prevents overlapping passes (a tick that
// arrives during a running pass is logged and dropped). Every pass goroutine
// is registered with s.wg. A pass that purged and failed nothing logs
// nothing.
func (s *Scheduler) userRetentionLoop(ctx context.Context) {
	tick := NewJitteredTicker(s.userRetentionInterval, DefaultSchedulerJitter)
	defer tick.Stop()

	// purge is the body of one background retention pass; it releases the
	// WaitGroup slot and the running flag that the tick handler acquired.
	purge := func() {
		defer s.wg.Done()
		defer s.userRetentionRunning.Store(false)

		passCtx, release := context.WithTimeout(ctx, 5*time.Minute)
		defer release()

		purged, failed, err := s.userRetention.PurgeDeactivatedUsers(passCtx)
		if err != nil {
			s.logger.Warn("user retention purge failed (next tick will retry)", "error", err)
			return
		}
		// Stay quiet on an idle pass.
		if purged <= 0 && failed <= 0 {
			return
		}
		s.logger.Info("user retention purge tick",
			"purged", purged, "failed", failed)
	}

	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			if !s.userRetentionRunning.CompareAndSwap(false, true) {
				s.logger.Warn("user retention purge still running, skipping tick")
				continue
			}
			s.wg.Add(1)
			go purge()
		}
	}
}
// runAuditChainVerify starts one audit hash-chain verification pass in the
// background and returns immediately. It is shared by the startup path and
// the tick path of auditChainVerifyLoop.
//
// If a previous pass is still running (auditChainVerifyRunning already set),
// the call logs a warning and does nothing else. Otherwise a goroutine
// registered with s.wg calls auditChainVerifier.VerifyHashChain under a
// 5-minute timeout and then:
//   - on error, logs a warning and records nothing;
//   - on a non-empty broken ID, logs the break at error level and, when a
//     recorder is wired, calls RecordBreak;
//   - otherwise logs the row count at debug level and, when a recorder is
//     wired, calls RecordSuccess.
func (s *Scheduler) runAuditChainVerify(ctx context.Context) {
	if !s.auditChainVerifyRunning.CompareAndSwap(false, true) {
		s.logger.Warn("audit chain verify still running, skipping tick")
		return
	}

	// verify is the body of the background pass; it releases the WaitGroup
	// slot and the running flag acquired by this call.
	verify := func() {
		defer s.wg.Done()
		defer s.auditChainVerifyRunning.Store(false)

		// The walk covers the whole audit_events table, so it is capped at
		// five minutes; a runaway walk must not hold the guard forever.
		walkCtx, release := context.WithTimeout(ctx, 5*time.Minute)
		defer release()

		breakID, breakPos, rows, err := s.auditChainVerifier.VerifyHashChain(walkCtx)
		if err != nil {
			s.logger.Warn("audit chain verify failed (next tick will retry)",
				"error", err)
			return
		}

		if breakID == "" {
			s.logger.Debug("audit chain verify clean", "rows", rows)
			if s.auditChainRecorder != nil {
				s.auditChainRecorder.RecordSuccess(rows)
			}
			return
		}

		s.logger.Error("audit chain break detected — tamper-evidence trigger fired",
			"broken_at_id", breakID,
			"broken_at_pos", breakPos,
			"row_count", rows)
		if s.auditChainRecorder != nil {
			s.auditChainRecorder.RecordBreak(breakID, breakPos)
		}
	}

	s.wg.Add(1)
	go verify()
}