// Copyright 2026 certctl LLC. All rights reserved. // SPDX-License-Identifier: BUSL-1.1 package scheduler import ( "context" "errors" "log/slog" "sync" "sync/atomic" "time" ) // RenewalServicer defines the interface for renewal operations used by the scheduler. type RenewalServicer interface { CheckExpiringCertificates(ctx context.Context) error ExpireShortLivedCertificates(ctx context.Context) error } // JobServicer defines the interface for job processing used by the scheduler. // // RetryFailedJobs was added to close coverage gap I-001: JobService.RetryFailedJobs // existed and was unit-tested but had no runtime caller prior to this loop being // wired. The scheduler now drives it on an independent tick so failed jobs whose // attempt counter is below MaxAttempts are periodically reset to Pending for the // job processor to pick up again. maxRetries is advisory (per-job gating uses // each job's own Attempts/MaxAttempts fields). type JobServicer interface { ProcessPendingJobs(ctx context.Context) error RetryFailedJobs(ctx context.Context, maxRetries int) error } // AgentServicer defines the interface for agent health checks used by the scheduler. type AgentServicer interface { MarkStaleAgentsOffline(ctx context.Context, interval time.Duration) error } // NotificationServicer defines the interface for notification processing used by the scheduler. // // RetryFailedNotifications was added to close coverage gap I-005: the retry // sweep transitions eligible Failed notifications to Pending on an independent // tick, using exponential backoff with a 1h cap and a 5-attempt DLQ budget. // Mirrors the I-001 job retry loop topology. type NotificationServicer interface { ProcessPendingNotifications(ctx context.Context) error RetryFailedNotifications(ctx context.Context) error } // NetworkScanServicer defines the interface for network scanning used by the scheduler. type NetworkScanServicer interface { ScanAllTargets(ctx context.Context) error } // DigestServicer defines the interface for digest email processing used by the scheduler. type DigestServicer interface { ProcessDigest(ctx context.Context) error } // HealthCheckServicer defines the interface for endpoint TLS health monitoring used by the scheduler. type HealthCheckServicer interface { RunHealthChecks(ctx context.Context) error } // CloudDiscoveryServicer defines the interface for cloud secret manager discovery used by the scheduler. type CloudDiscoveryServicer interface { DiscoverAll(ctx context.Context) (int, []error) } // CRLCacheServicer defines the interface for the scheduler's CRL // pre-generation loop. RegenerateAll iterates every issuer that // supports CRL signing and refreshes its crl_cache row. Per-issuer // failures are logged + audited; a single bad issuer does not stop // the others. // // Bundle CRL/OCSP-Responder Phase 3: the scheduler-driven cache lets // the /.well-known/pki/crl/{issuer_id} HTTP endpoint serve from cache // instead of regenerating per request. type CRLCacheServicer interface { RegenerateAll(ctx context.Context) } // ACMEGarbageCollector is the interface the scheduler's acmeGCLoop // invokes once per tick. The concrete implementation is *service.ACMEService. // Phase 5 — sweeps expired nonces / authzs / orders. type ACMEGarbageCollector interface { GarbageCollect(ctx context.Context) error } // SessionGarbageCollector is the interface the scheduler's sessionGCLoop // invokes once per CERTCTL_SESSION_GC_INTERVAL tick. Concrete impl is // *session.Service. Sweeps expired post-login + pre-login session rows // AND retired-past-retention signing-key rows. Auth Bundle 2 Phase 4. type SessionGarbageCollector interface { GarbageCollect(ctx context.Context) (int, error) } // BCLReplayGarbageCollector sweeps expired rows from the BCL consumed-jti // table. Audit 2026-05-10 HIGH-3 closure — the scheduler invokes this // alongside the session-GC tick so a single ticker drives both. Concrete // impl is repository.BCLReplayRepository.SweepExpired. type BCLReplayGarbageCollector interface { SweepExpired(ctx context.Context, now time.Time) (int, error) } // RateLimitGarbageCollector sweeps stale rows from the // rate_limit_buckets table introduced in migration 000046. Phase 13 // Sprint 13.3 (ARCH-M1 closure completion) — wired only when // CERTCTL_RATE_LIMIT_BACKEND=postgres. Concrete impl is // *ratelimit.PostgresGC. Mirrors the ACMEGarbageCollector + // SessionGarbageCollector contracts so the scheduler reuses the same // atomic.Bool + WithTimeout + ticker pattern as the existing GC loops. // // Returns the row count to surface via observability logs (matches // SessionGarbageCollector's shape — the operator wants to see // "how many buckets did the sweep delete" in steady-state monitoring). type RateLimitGarbageCollector interface { GarbageCollect(ctx context.Context) (int64, error) } // AuditChainVerifier walks the audit_events per-row hash chain // installed by migration 000047 (Sprint 6 COMP-001-HASH) and reports // the first break it finds. The scheduler's auditChainVerifyLoop // invokes this on a configurable cadence (default 6h) and increments // the certctl_audit_chain_break_detected counter on any non-empty // brokenAtID return — that counter is the operator-facing signal for // tamper-evidence. // // Concrete impl is *postgres.AuditRepository, which delegates to the // SQL function audit_events_verify_chain() shipped in the same // migration. The function is STABLE plpgsql so the walk happens // entirely server-side (no row-shipping to the application). type AuditChainVerifier interface { VerifyHashChain(ctx context.Context) (brokenAtID string, brokenAtPos int, rowCount int, err error) } // AuditChainBreakRecorder is the metric-side dependency for the // audit-chain verify loop. Concrete impl is the // *service.AuditChainCounter wired in cmd/server/main.go; tests use // an in-memory implementation. The scheduler calls Inc() on a chain // break + Observe(rowCount) on every walk so operators can see "we // walked N rows and it was clean" in metrics. type AuditChainBreakRecorder interface { RecordBreak(brokenAtID string, brokenAtPos int) RecordSuccess(rowCount int) } // UserRetentionPurger is the Sprint 6 COMP-002-RETENTION scheduler-side // interface. Concrete impl is *service.UserRetentionService — it walks // every user whose deactivated_at exceeds the retention window and // scrubs PII columns (email / display_name / oidc_subject hash). The // loop calls PurgeDeactivatedUsers on every CERTCTL_USER_RETENTION_INTERVAL // tick. nil = loop is not wired (deployments that disable retention). type UserRetentionPurger interface { PurgeDeactivatedUsers(ctx context.Context) (purged, failed int, err error) } // JobReaperService defines the interface for job timeout reaping used by the scheduler. type JobReaperService interface { ReapTimedOutJobs(ctx context.Context, csrTTL, approvalTTL time.Duration) error // Bundle C / Audit M-016 (CWE-754): closes the gap left by ReapTimedOutJobs // (which only handles AwaitingCSR / AwaitingApproval). Jobs in Running // status whose owning agent has been silent for longer than agentTTL get // transitioned to Failed with reason "agent_offline" so I-001's retry // loop can re-queue them on a healthy agent. ReapJobsWithOfflineAgents(ctx context.Context, agentTTL time.Duration) error } // Scheduler manages background jobs and periodic tasks for the certificate control plane. // It runs multiple concurrent loops for renewal checks, job processing, agent health checks, // and notification processing. type Scheduler struct { renewalService RenewalServicer jobService JobServicer agentService AgentServicer notificationService NotificationServicer networkScanService NetworkScanServicer digestService DigestServicer healthCheckService HealthCheckServicer cloudDiscoveryService CloudDiscoveryServicer crlCacheService CRLCacheServicer acmeGC ACMEGarbageCollector sessionGC SessionGarbageCollector bclReplayGC BCLReplayGarbageCollector rateLimitGC RateLimitGarbageCollector auditChainVerifier AuditChainVerifier auditChainRecorder AuditChainBreakRecorder userRetention UserRetentionPurger jobReaper JobReaperService logger *slog.Logger // Configurable tick intervals renewalCheckInterval time.Duration jobProcessorInterval time.Duration jobRetryInterval time.Duration agentHealthCheckInterval time.Duration notificationProcessInterval time.Duration notificationRetryInterval time.Duration shortLivedExpiryCheckInterval time.Duration networkScanInterval time.Duration digestInterval time.Duration healthCheckInterval time.Duration cloudDiscoveryInterval time.Duration crlGenerationInterval time.Duration jobTimeoutInterval time.Duration acmeGCInterval time.Duration sessionGCInterval time.Duration rateLimitGCInterval time.Duration auditChainVerifyInterval time.Duration userRetentionInterval time.Duration // agentOfflineJobTTL: per-tick threshold for reaping Running jobs whose // owning agent has been silent. Bundle C / Audit M-016. Defaults below. agentOfflineJobTTL time.Duration awaitingCSRTimeout time.Duration awaitingApprovalTimeout time.Duration // Idempotency guards: prevent duplicate execution of slow jobs renewalCheckRunning atomic.Bool jobProcessorRunning atomic.Bool jobRetryRunning atomic.Bool agentHealthCheckRunning atomic.Bool notificationProcessRunning atomic.Bool notificationRetryRunning atomic.Bool shortLivedExpiryCheckRunning atomic.Bool networkScanRunning atomic.Bool digestRunning atomic.Bool healthCheckRunning atomic.Bool cloudDiscoveryRunning atomic.Bool crlGenerationRunning atomic.Bool jobTimeoutRunning atomic.Bool acmeGCRunning atomic.Bool sessionGCRunning atomic.Bool rateLimitGCRunning atomic.Bool auditChainVerifyRunning atomic.Bool userRetentionRunning atomic.Bool // Graceful shutdown: wait for in-flight work to complete wg sync.WaitGroup } // NewScheduler creates a new scheduler with configurable intervals. func NewScheduler( renewalService RenewalServicer, jobService JobServicer, agentService AgentServicer, notificationService NotificationServicer, networkScanService NetworkScanServicer, logger *slog.Logger, ) *Scheduler { return &Scheduler{ renewalService: renewalService, jobService: jobService, agentService: agentService, notificationService: notificationService, networkScanService: networkScanService, logger: logger, // Default intervals renewalCheckInterval: 1 * time.Hour, jobProcessorInterval: 30 * time.Second, jobRetryInterval: 5 * time.Minute, agentHealthCheckInterval: 2 * time.Minute, notificationProcessInterval: 1 * time.Minute, notificationRetryInterval: 2 * time.Minute, shortLivedExpiryCheckInterval: 30 * time.Second, networkScanInterval: 6 * time.Hour, digestInterval: 24 * time.Hour, healthCheckInterval: 60 * time.Second, cloudDiscoveryInterval: 6 * time.Hour, crlGenerationInterval: 1 * time.Hour, jobTimeoutInterval: 10 * time.Minute, acmeGCInterval: 1 * time.Minute, sessionGCInterval: 1 * time.Hour, rateLimitGCInterval: 5 * time.Minute, // Sprint 6 COMP-001-HASH: chain walk is O(N) over audit_events // (server-side plpgsql). 6h is a balance — quick enough to // surface tampering within a working day, infrequent enough to // not dominate a quiet fleet's DB load. Operators with huge // audit tables can lengthen via CERTCTL_AUDIT_CHAIN_VERIFY_INTERVAL. auditChainVerifyInterval: 6 * time.Hour, // Sprint 6 COMP-002-RETENTION: user PII purge cadence. Default // 24h — deactivated rows persist past the retention window // (default 30d) only until the next tick, which is fine for // GDPR-style "delete within reasonable time" expectations. userRetentionInterval: 24 * time.Hour, // 5 minutes is 5×agentHealthCheckInterval default of 1m; an agent // must miss multiple heartbeats before its in-flight jobs are reaped. agentOfflineJobTTL: 5 * time.Minute, } } // SetDigestService sets the digest service for the 7th scheduler loop. // Called after construction since digest is optional. func (s *Scheduler) SetDigestService(ds DigestServicer) { s.digestService = ds } // SetDigestInterval configures the interval for digest email processing. func (s *Scheduler) SetDigestInterval(d time.Duration) { s.digestInterval = d } // SetRenewalCheckInterval configures the interval for renewal checks. func (s *Scheduler) SetRenewalCheckInterval(d time.Duration) { s.renewalCheckInterval = d } // SetJobProcessorInterval configures the interval for job processing. func (s *Scheduler) SetJobProcessorInterval(d time.Duration) { s.jobProcessorInterval = d } // SetJobRetryInterval configures the interval for the failed-job retry loop // (coverage gap I-001). Defaults to 5 minutes; honors // CERTCTL_SCHEDULER_RETRY_INTERVAL when wired from config. func (s *Scheduler) SetJobRetryInterval(d time.Duration) { s.jobRetryInterval = d } // SetAgentHealthCheckInterval configures the interval for agent health checks. func (s *Scheduler) SetAgentHealthCheckInterval(d time.Duration) { s.agentHealthCheckInterval = d } // SetNotificationProcessInterval configures the interval for notification processing. func (s *Scheduler) SetNotificationProcessInterval(d time.Duration) { s.notificationProcessInterval = d } // SetNotificationRetryInterval configures the interval for the failed-notification // retry sweep (coverage gap I-005). Defaults to 2 minutes; honors // CERTCTL_NOTIFICATION_RETRY_INTERVAL when wired from config. func (s *Scheduler) SetNotificationRetryInterval(d time.Duration) { s.notificationRetryInterval = d } // SetNetworkScanInterval configures the interval for network scanning. func (s *Scheduler) SetNetworkScanInterval(d time.Duration) { s.networkScanInterval = d } // SetShortLivedExpiryCheckInterval configures the interval for short-lived certificate expiry checks. func (s *Scheduler) SetShortLivedExpiryCheckInterval(d time.Duration) { s.shortLivedExpiryCheckInterval = d } // SetHealthCheckService sets the health check service for the 8th scheduler loop. // Called after construction since health monitoring is optional. func (s *Scheduler) SetHealthCheckService(hcs HealthCheckServicer) { s.healthCheckService = hcs } // SetHealthCheckInterval configures the interval for endpoint TLS health checks. func (s *Scheduler) SetHealthCheckInterval(d time.Duration) { s.healthCheckInterval = d } // SetCloudDiscoveryService sets the cloud discovery service for the 9th scheduler loop. // Called after construction since cloud discovery is optional. func (s *Scheduler) SetCloudDiscoveryService(cds CloudDiscoveryServicer) { s.cloudDiscoveryService = cds } // SetCloudDiscoveryInterval configures the interval for cloud secret manager discovery. func (s *Scheduler) SetCloudDiscoveryInterval(d time.Duration) { s.cloudDiscoveryInterval = d } // SetCRLCacheService sets the CRL cache service for the crlGenerationLoop. // Called after construction since the loop is optional — when this is // unset, no pre-generation happens and HTTP CRL fetches go through the // on-demand path. // // Bundle CRL/OCSP-Responder Phase 3. func (s *Scheduler) SetCRLCacheService(svc CRLCacheServicer) { s.crlCacheService = svc } // SetCRLGenerationInterval configures the interval at which the // scheduler regenerates CRLs into the crl_cache table. Default 1h // (matches relying-party CRL refresh expectations under RFC 5280). // Operators with chatty fleets can shorten; operators with bandwidth // constraints can lengthen as long as nextUpdate stays comfortably in // the future per generation. // // Zero or negative values are ignored. func (s *Scheduler) SetCRLGenerationInterval(d time.Duration) { if d <= 0 { return } s.crlGenerationInterval = d } // SetJobReaperService sets the job reaper service (I-003). func (s *Scheduler) SetJobReaperService(jr JobReaperService) { s.jobReaper = jr } // SetACMEGarbageCollector wires the ACME GC service. Phase 5 — when // non-nil, an acmeGCLoop runs every acmeGCInterval and sweeps expired // nonces / authzs / orders. Optional: leaving nil disables the loop // (legacy behavior pre-Phase-5). func (s *Scheduler) SetACMEGarbageCollector(gc ACMEGarbageCollector) { s.acmeGC = gc } // SetACMEGCInterval configures the interval at which the ACME GC sweep // runs. Default 1m. Operators with quiet fleets can lengthen to 5m; // operators expecting nonce-storms can shorten to 30s. Zero or // negative values are ignored. func (s *Scheduler) SetACMEGCInterval(d time.Duration) { if d <= 0 { return } s.acmeGCInterval = d } // SetSessionGarbageCollector wires the Auth Bundle 2 Phase 4 session GC // service. Optional; nil disables the loop (Bundle-2-disabled deployments // still run pre-Phase-4 behavior). func (s *Scheduler) SetSessionGarbageCollector(gc SessionGarbageCollector) { s.sessionGC = gc } // SetBCLReplayGarbageCollector wires the BCL consumed-jti GC. Audit // 2026-05-10 HIGH-3 closure. The sweep runs on the same ticker as the // session GC loop (no separate interval; replay rows are short-lived). func (s *Scheduler) SetBCLReplayGarbageCollector(gc BCLReplayGarbageCollector) { s.bclReplayGC = gc } // SetSessionGCInterval configures the interval at which the session GC // sweep runs. Default 1h. Wire: CERTCTL_SESSION_GC_INTERVAL. Zero or // negative values are ignored. func (s *Scheduler) SetSessionGCInterval(d time.Duration) { if d <= 0 { return } s.sessionGCInterval = d } // SetRateLimitGarbageCollector wires the Phase 13 Sprint 13.3 rate- // limit bucket GC. Optional; nil disables the loop (which is the // correct behavior when CERTCTL_RATE_LIMIT_BACKEND=memory — the // in-memory backend's prune-on-Allow path keeps buckets short-lived // without a separate sweep). // // Concrete impl is *ratelimit.PostgresGC, constructed in // cmd/server/main.go only when the postgres backend is selected. func (s *Scheduler) SetRateLimitGarbageCollector(gc RateLimitGarbageCollector) { s.rateLimitGC = gc } // SetRateLimitGCInterval configures the interval at which the rate- // limit GC sweep runs. Default 5m. Wire: // CERTCTL_RATE_LIMIT_JANITOR_INTERVAL. Zero or negative values are // ignored. func (s *Scheduler) SetRateLimitGCInterval(d time.Duration) { if d <= 0 { return } s.rateLimitGCInterval = d } // SetAuditChainVerifier wires the Sprint 6 COMP-001-HASH chain // verifier. Optional; when nil the auditChainVerifyLoop is skipped // (test fixtures that don't seed migration 000047 can leave it // unset). Concrete impl is *postgres.AuditRepository. func (s *Scheduler) SetAuditChainVerifier(v AuditChainVerifier) { s.auditChainVerifier = v } // SetAuditChainBreakRecorder wires the metric-side counter that the // verify loop calls on every walk (RecordSuccess) and on detection of // a break (RecordBreak). Concrete impl is *service.AuditChainCounter. func (s *Scheduler) SetAuditChainBreakRecorder(r AuditChainBreakRecorder) { s.auditChainRecorder = r } // SetAuditChainVerifyInterval configures the audit_events_verify_chain // tick cadence. Default 6h. Wire: CERTCTL_AUDIT_CHAIN_VERIFY_INTERVAL. // Zero or negative values are ignored. func (s *Scheduler) SetAuditChainVerifyInterval(d time.Duration) { if d <= 0 { return } s.auditChainVerifyInterval = d } // SetUserRetentionPurger wires the Sprint 6 COMP-002-RETENTION // user-PII-purge sweeper. Optional — nil disables the loop (deployments // that don't have any federated humans yet, or those that want manual // purge via the admin endpoint only). Concrete impl is // *service.UserRetentionService. func (s *Scheduler) SetUserRetentionPurger(p UserRetentionPurger) { s.userRetention = p } // SetUserRetentionInterval configures the userRetentionLoop tick // cadence. Default 24h. Wire: CERTCTL_USER_RETENTION_INTERVAL. // Zero or negative values are ignored. func (s *Scheduler) SetUserRetentionInterval(d time.Duration) { if d <= 0 { return } s.userRetentionInterval = d } // SetAgentOfflineJobTTL sets the threshold past which a Running job whose // owning agent has gone silent is reaped to Failed. Bundle C / Audit M-016. // Zero or negative values are ignored (the default of 5 minutes is kept). func (s *Scheduler) SetAgentOfflineJobTTL(d time.Duration) { if d <= 0 { return } s.agentOfflineJobTTL = d } // SetJobTimeoutInterval sets the job timeout reaper tick interval (I-003). func (s *Scheduler) SetJobTimeoutInterval(d time.Duration) { s.jobTimeoutInterval = d } // SetAwaitingCSRTimeout sets the AwaitingCSR TTL (I-003). func (s *Scheduler) SetAwaitingCSRTimeout(d time.Duration) { s.awaitingCSRTimeout = d } // SetAwaitingApprovalTimeout sets the AwaitingApproval TTL (I-003). func (s *Scheduler) SetAwaitingApprovalTimeout(d time.Duration) { s.awaitingApprovalTimeout = d } // Start initiates all background scheduler loops. It returns a channel that signals // when the scheduler has started all loops. The scheduler runs until the context is cancelled. func (s *Scheduler) Start(ctx context.Context) <-chan struct{} { startedChan := make(chan struct{}) go func() { s.logger.Info("scheduler starting") // Track all loop goroutines in the WaitGroup so WaitForCompletion // blocks until they've fully exited (prevents test races). // Base count is 8: renewal, job processor, job retry (I-001), // job timeout (I-003), agent health, notification, notification retry // (I-005), short-lived expiry. Optional loops (network scan, digest, // health check, cloud discovery) add to this. loopCount := 8 if s.networkScanService != nil { loopCount++ } if s.digestService != nil { loopCount++ } if s.healthCheckService != nil { loopCount++ } if s.cloudDiscoveryService != nil { loopCount++ } if s.crlCacheService != nil { loopCount++ } if s.acmeGC != nil { loopCount++ } if s.sessionGC != nil { loopCount++ } if s.rateLimitGC != nil { loopCount++ } if s.auditChainVerifier != nil { loopCount++ } if s.userRetention != nil { loopCount++ } s.wg.Add(loopCount) go func() { defer s.wg.Done(); s.renewalCheckLoop(ctx) }() go func() { defer s.wg.Done(); s.jobProcessorLoop(ctx) }() go func() { defer s.wg.Done(); s.jobRetryLoop(ctx) }() go func() { defer s.wg.Done(); s.jobTimeoutLoop(ctx) }() go func() { defer s.wg.Done(); s.agentHealthCheckLoop(ctx) }() go func() { defer s.wg.Done(); s.notificationProcessLoop(ctx) }() go func() { defer s.wg.Done(); s.notificationRetryLoop(ctx) }() go func() { defer s.wg.Done(); s.shortLivedExpiryCheckLoop(ctx) }() if s.networkScanService != nil { go func() { defer s.wg.Done(); s.networkScanLoop(ctx) }() } if s.digestService != nil { go func() { defer s.wg.Done(); s.digestLoop(ctx) }() } if s.healthCheckService != nil { go func() { defer s.wg.Done(); s.healthCheckLoop(ctx) }() } if s.cloudDiscoveryService != nil { go func() { defer s.wg.Done(); s.cloudDiscoveryLoop(ctx) }() } if s.crlCacheService != nil { go func() { defer s.wg.Done(); s.crlGenerationLoop(ctx) }() } if s.acmeGC != nil { go func() { defer s.wg.Done(); s.acmeGCLoop(ctx) }() } if s.sessionGC != nil { go func() { defer s.wg.Done(); s.sessionGCLoop(ctx) }() } if s.rateLimitGC != nil { go func() { defer s.wg.Done(); s.rateLimitGCLoop(ctx) }() } if s.auditChainVerifier != nil { go func() { defer s.wg.Done(); s.auditChainVerifyLoop(ctx) }() } if s.userRetention != nil { go func() { defer s.wg.Done(); s.userRetentionLoop(ctx) }() } // Signal that all loops are launched close(startedChan) // Wait for context cancellation <-ctx.Done() s.logger.Info("scheduler shutting down", "reason", ctx.Err()) }() return startedChan } // renewalCheckLoop runs every renewalCheckInterval and checks for expiring certificates. // If an error occurs, it logs the error but continues running. // Uses atomic.Bool to prevent duplicate execution if the previous check is still running. func (s *Scheduler) renewalCheckLoop(ctx context.Context) { ticker := NewJitteredTicker(s.renewalCheckInterval, DefaultSchedulerJitter) defer ticker.Stop() // Run immediately on start (with idempotency guard) s.renewalCheckRunning.Store(true) s.wg.Add(1) go func() { defer s.wg.Done() defer s.renewalCheckRunning.Store(false) s.runRenewalCheck(ctx) }() for { select { case <-ctx.Done(): return case <-ticker.C: if !s.renewalCheckRunning.CompareAndSwap(false, true) { s.logger.Warn("renewal check still running, skipping tick") continue } s.wg.Add(1) go func() { defer s.wg.Done() defer s.renewalCheckRunning.Store(false) s.runRenewalCheck(ctx) }() } } } // runRenewalCheck executes a single renewal check with error recovery. func (s *Scheduler) runRenewalCheck(ctx context.Context) { opCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) defer cancel() if err := s.renewalService.CheckExpiringCertificates(opCtx); err != nil { s.logger.Error("renewal check failed", "error", err, "interval", s.renewalCheckInterval.String()) } else { s.logger.Debug("renewal check completed") } } // jobProcessorLoop runs every jobProcessorInterval and processes pending jobs. // It picks up pending jobs, executes them, and handles the results. // If an error occurs, it logs the error but continues running. // Uses atomic.Bool to prevent duplicate execution if the previous job is still running. func (s *Scheduler) jobProcessorLoop(ctx context.Context) { ticker := NewJitteredTicker(s.jobProcessorInterval, DefaultSchedulerJitter) defer ticker.Stop() // Run immediately on start (with idempotency guard) s.jobProcessorRunning.Store(true) s.wg.Add(1) go func() { defer s.wg.Done() defer s.jobProcessorRunning.Store(false) s.runJobProcessor(ctx) }() for { select { case <-ctx.Done(): return case <-ticker.C: if !s.jobProcessorRunning.CompareAndSwap(false, true) { s.logger.Warn("job processor still running, skipping tick") continue } s.wg.Add(1) go func() { defer s.wg.Done() defer s.jobProcessorRunning.Store(false) s.runJobProcessor(ctx) }() } } } // runJobProcessor executes a single job processing cycle with error recovery. func (s *Scheduler) runJobProcessor(ctx context.Context) { opCtx, cancel := context.WithTimeout(ctx, 2*time.Minute) defer cancel() if err := s.jobService.ProcessPendingJobs(opCtx); err != nil { s.logger.Error("job processor failed", "error", err, "interval", s.jobProcessorInterval.String()) } else { s.logger.Debug("job processor completed") } } // jobRetryLoop runs every jobRetryInterval and transitions eligible Failed jobs // back to Pending so the job processor can pick them up again. Closes coverage // gap I-001 — JobService.RetryFailedJobs had no runtime caller prior to this // loop being wired. Runs immediately on start, then every interval. // Uses atomic.Bool to prevent duplicate execution if the previous retry sweep // is still running. func (s *Scheduler) jobRetryLoop(ctx context.Context) { ticker := NewJitteredTicker(s.jobRetryInterval, DefaultSchedulerJitter) defer ticker.Stop() // Run immediately on start (with idempotency guard) s.jobRetryRunning.Store(true) s.wg.Add(1) go func() { defer s.wg.Done() defer s.jobRetryRunning.Store(false) s.runJobRetry(ctx) }() for { select { case <-ctx.Done(): return case <-ticker.C: if !s.jobRetryRunning.CompareAndSwap(false, true) { s.logger.Warn("job retry still running, skipping tick") continue } s.wg.Add(1) go func() { defer s.wg.Done() defer s.jobRetryRunning.Store(false) s.runJobRetry(ctx) }() } } } // runJobRetry executes a single failed-job retry cycle with error recovery. // Uses the same 2-minute per-tick timeout as runJobProcessor; RetryFailedJobs // issues one SELECT and one UPDATE per eligible job (cheap), so this headroom // covers very large failure backlogs without starving the loop. func (s *Scheduler) runJobRetry(ctx context.Context) { opCtx, cancel := context.WithTimeout(ctx, 2*time.Minute) defer cancel() // maxRetries is advisory at the service layer (per-job gating uses each // job's own Attempts/MaxAttempts). Passing 3 matches the conventional // default seen across the codebase's job creation paths. if err := s.jobService.RetryFailedJobs(opCtx, 3); err != nil { s.logger.Error("job retry failed", "error", err, "interval", s.jobRetryInterval.String()) } else { s.logger.Debug("job retry completed") } } // jobTimeoutLoop runs every jobTimeoutInterval and transitions jobs stuck in // AwaitingCSR or AwaitingApproval to Failed if they exceed their TTL. I-001's // retry loop then auto-promotes eligible Failed jobs back to Pending. Closes // coverage gap I-003. Uses atomic.Bool to prevent duplicate execution. func (s *Scheduler) jobTimeoutLoop(ctx context.Context) { ticker := NewJitteredTicker(s.jobTimeoutInterval, DefaultSchedulerJitter) defer ticker.Stop() // Run immediately on start (with idempotency guard) s.jobTimeoutRunning.Store(true) s.wg.Add(1) go func() { defer s.wg.Done() defer s.jobTimeoutRunning.Store(false) s.runJobTimeout(ctx) }() for { select { case <-ctx.Done(): return case <-ticker.C: if !s.jobTimeoutRunning.CompareAndSwap(false, true) { s.logger.Warn("job timeout reaper still running, skipping tick") continue } s.wg.Add(1) go func() { defer s.wg.Done() defer s.jobTimeoutRunning.Store(false) s.runJobTimeout(ctx) }() } } } // runJobTimeout executes a single job timeout reaping cycle with error recovery. // When no JobReaperService has been wired (e.g. in tests that don't exercise // I-003) the call is a safe no-op, preserving the always-on loop topology // described in I-003 without forcing every consumer to wire a reaper. // // Bundle C / Audit M-016: the reaping cycle now has TWO arms: // // 1. ReapTimedOutJobs handles AwaitingCSR / AwaitingApproval timeouts (I-003). // 2. ReapJobsWithOfflineAgents handles Running jobs whose owning agent has // gone silent (M-016). Reuses the same agentHealthCheckTimeout as the // mark-stale-agents-offline path for consistency: if the agent is judged // offline by AgentService.MarkStaleAgentsOffline, its in-flight jobs // should be reaped on the same cadence. func (s *Scheduler) runJobTimeout(ctx context.Context) { if s.jobReaper == nil { return } opCtx, cancel := context.WithTimeout(ctx, 2*time.Minute) defer cancel() if err := s.jobReaper.ReapTimedOutJobs(opCtx, s.awaitingCSRTimeout, s.awaitingApprovalTimeout); err != nil { s.logger.Error("job timeout reaper failed", "error", err, "interval", s.jobTimeoutInterval.String()) } else { s.logger.Debug("job timeout reaper completed") } // Second arm: offline-agent reaper. Uses agentOfflineTimeout (defaults to // 5 minutes — same value the agent-health-check path uses to flip an // agent to Offline). A sensible default of 5×agentHealthCheckInterval // catches agents that miss multiple consecutive heartbeats while leaving // a single missed beat as a transient blip that does NOT reap. offlineCtx, offlineCancel := context.WithTimeout(ctx, 2*time.Minute) defer offlineCancel() if err := s.jobReaper.ReapJobsWithOfflineAgents(offlineCtx, s.agentOfflineJobTTL); err != nil { s.logger.Error("offline-agent job reaper failed", "error", err, "agent_offline_ttl", s.agentOfflineJobTTL.String()) } else { s.logger.Debug("offline-agent job reaper completed") } } // agentHealthCheckLoop runs every agentHealthCheckInterval and marks stale agents as offline. // An agent is considered stale if it hasn't sent a heartbeat within the health check interval. // If an error occurs, it logs the error but continues running. // Uses atomic.Bool to prevent duplicate execution if the previous check is still running. func (s *Scheduler) agentHealthCheckLoop(ctx context.Context) { ticker := NewJitteredTicker(s.agentHealthCheckInterval, DefaultSchedulerJitter) defer ticker.Stop() // Run immediately on start (with idempotency guard) s.agentHealthCheckRunning.Store(true) s.wg.Add(1) go func() { defer s.wg.Done() defer s.agentHealthCheckRunning.Store(false) s.runAgentHealthCheck(ctx) }() for { select { case <-ctx.Done(): return case <-ticker.C: if !s.agentHealthCheckRunning.CompareAndSwap(false, true) { s.logger.Warn("agent health check still running, skipping tick") continue } s.wg.Add(1) go func() { defer s.wg.Done() defer s.agentHealthCheckRunning.Store(false) s.runAgentHealthCheck(ctx) }() } } } // runAgentHealthCheck executes a single agent health check with error recovery. func (s *Scheduler) runAgentHealthCheck(ctx context.Context) { opCtx, cancel := context.WithTimeout(ctx, 1*time.Minute) defer cancel() if err := s.agentService.MarkStaleAgentsOffline(opCtx, s.agentHealthCheckInterval); err != nil { s.logger.Error("agent health check failed", "error", err, "interval", s.agentHealthCheckInterval.String()) } else { s.logger.Debug("agent health check completed") } } // notificationProcessLoop runs every notificationProcessInterval and processes pending notifications. // If an error occurs, it logs the error but continues running. // Uses atomic.Bool to prevent duplicate execution if the previous process is still running. func (s *Scheduler) notificationProcessLoop(ctx context.Context) { ticker := NewJitteredTicker(s.notificationProcessInterval, DefaultSchedulerJitter) defer ticker.Stop() // Run immediately on start (with idempotency guard) s.notificationProcessRunning.Store(true) s.wg.Add(1) go func() { defer s.wg.Done() defer s.notificationProcessRunning.Store(false) s.runNotificationProcess(ctx) }() for { select { case <-ctx.Done(): return case <-ticker.C: if !s.notificationProcessRunning.CompareAndSwap(false, true) { s.logger.Warn("notification processor still running, skipping tick") continue } s.wg.Add(1) go func() { defer s.wg.Done() defer s.notificationProcessRunning.Store(false) s.runNotificationProcess(ctx) }() } } } // runNotificationProcess executes a single notification processing cycle with error recovery. func (s *Scheduler) runNotificationProcess(ctx context.Context) { opCtx, cancel := context.WithTimeout(ctx, 1*time.Minute) defer cancel() if err := s.notificationService.ProcessPendingNotifications(opCtx); err != nil { s.logger.Error("notification processor failed", "error", err, "interval", s.notificationProcessInterval.String()) } else { s.logger.Debug("notification processor completed") } } // notificationRetryLoop runs every notificationRetryInterval and transitions // eligible Failed notifications back to Pending so the notification processor // can pick them up again. Closes coverage gap I-005 — NotificationService. // RetryFailedNotifications had no runtime caller prior to this loop being // wired. Runs immediately on start, then every interval. // Uses atomic.Bool to prevent duplicate execution if the previous retry sweep // is still running. Mirrors the I-001 jobRetryLoop topology byte-for-byte. func (s *Scheduler) notificationRetryLoop(ctx context.Context) { ticker := NewJitteredTicker(s.notificationRetryInterval, DefaultSchedulerJitter) defer ticker.Stop() // Run immediately on start (with idempotency guard) s.notificationRetryRunning.Store(true) s.wg.Add(1) go func() { defer s.wg.Done() defer s.notificationRetryRunning.Store(false) s.runNotificationRetry(ctx) }() for { select { case <-ctx.Done(): return case <-ticker.C: if !s.notificationRetryRunning.CompareAndSwap(false, true) { s.logger.Warn("notification retry still running, skipping tick") continue } s.wg.Add(1) go func() { defer s.wg.Done() defer s.notificationRetryRunning.Store(false) s.runNotificationRetry(ctx) }() } } } // runNotificationRetry executes a single failed-notification retry cycle with // error recovery. Uses a 2-minute per-tick timeout matching runJobRetry; // RetryFailedNotifications issues one SELECT and one UPDATE per eligible row // (cheap), so this headroom covers very large failure backlogs without // starving the loop. The service layer swallows per-row send errors (mirrors // ProcessPendingNotifications) and only returns the List error from the // initial ListRetryEligible call. func (s *Scheduler) runNotificationRetry(ctx context.Context) { opCtx, cancel := context.WithTimeout(ctx, 2*time.Minute) defer cancel() if err := s.notificationService.RetryFailedNotifications(opCtx); err != nil { s.logger.Error("notification retry failed", "error", err, "interval", s.notificationRetryInterval.String()) } else { s.logger.Debug("notification retry completed") } } // shortLivedExpiryCheckLoop runs every shortLivedExpiryCheckInterval and marks expired // short-lived certificates. For certs with TTL < 1 hour, expiry IS revocation — // no CRL/OCSP needed. // Uses atomic.Bool to prevent duplicate execution if the previous check is still running. func (s *Scheduler) shortLivedExpiryCheckLoop(ctx context.Context) { ticker := NewJitteredTicker(s.shortLivedExpiryCheckInterval, DefaultSchedulerJitter) defer ticker.Stop() // Run immediately on start (with idempotency guard) s.shortLivedExpiryCheckRunning.Store(true) s.wg.Add(1) go func() { defer s.wg.Done() defer s.shortLivedExpiryCheckRunning.Store(false) s.runShortLivedExpiryCheck(ctx) }() for { select { case <-ctx.Done(): return case <-ticker.C: if !s.shortLivedExpiryCheckRunning.CompareAndSwap(false, true) { s.logger.Warn("short-lived expiry check still running, skipping tick") continue } s.wg.Add(1) go func() { defer s.wg.Done() defer s.shortLivedExpiryCheckRunning.Store(false) s.runShortLivedExpiryCheck(ctx) }() } } } // runShortLivedExpiryCheck executes a single short-lived expiry check with error recovery. func (s *Scheduler) runShortLivedExpiryCheck(ctx context.Context) { opCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() if err := s.renewalService.ExpireShortLivedCertificates(opCtx); err != nil { s.logger.Error("short-lived expiry check failed", "error", err, "interval", s.shortLivedExpiryCheckInterval.String()) } else { s.logger.Debug("short-lived expiry check completed") } } // networkScanLoop runs every networkScanInterval and performs active TLS scanning // of configured network targets. // Uses atomic.Bool to prevent duplicate execution if the previous scan is still running. func (s *Scheduler) networkScanLoop(ctx context.Context) { ticker := NewJitteredTicker(s.networkScanInterval, DefaultSchedulerJitter) defer ticker.Stop() // Run immediately on start (with idempotency guard) s.networkScanRunning.Store(true) s.wg.Add(1) go func() { defer s.wg.Done() defer s.networkScanRunning.Store(false) s.runNetworkScan(ctx) }() for { select { case <-ctx.Done(): return case <-ticker.C: if !s.networkScanRunning.CompareAndSwap(false, true) { s.logger.Warn("network scan still running, skipping tick") continue } s.wg.Add(1) go func() { defer s.wg.Done() defer s.networkScanRunning.Store(false) s.runNetworkScan(ctx) }() } } } // runNetworkScan executes a single network scan cycle with error recovery. func (s *Scheduler) runNetworkScan(ctx context.Context) { opCtx, cancel := context.WithTimeout(ctx, 30*time.Minute) defer cancel() if err := s.networkScanService.ScanAllTargets(opCtx); err != nil { s.logger.Error("network scan failed", "error", err, "interval", s.networkScanInterval.String()) } else { s.logger.Debug("network scan completed") } } // digestLoop runs every digestInterval and generates/sends certificate digest emails. // Uses atomic.Bool to prevent duplicate execution if the previous digest is still running. func (s *Scheduler) digestLoop(ctx context.Context) { ticker := NewJitteredTicker(s.digestInterval, DefaultSchedulerJitter) defer ticker.Stop() // Do NOT run immediately on start for digest — wait for the first tick. // Digests are infrequent (24h default) and shouldn't fire on every restart. for { select { case <-ctx.Done(): return case <-ticker.C: if !s.digestRunning.CompareAndSwap(false, true) { s.logger.Warn("digest processor still running, skipping tick") continue } s.wg.Add(1) go func() { defer s.wg.Done() defer s.digestRunning.Store(false) s.runDigest(ctx) }() } } } // runDigest executes a single digest processing cycle with error recovery. func (s *Scheduler) runDigest(ctx context.Context) { opCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) defer cancel() if err := s.digestService.ProcessDigest(opCtx); err != nil { s.logger.Error("digest processor failed", "error", err, "interval", s.digestInterval.String()) } else { s.logger.Debug("digest processor completed") } } // healthCheckLoop runs every healthCheckInterval and performs endpoint TLS health checks. // Do NOT run immediately on start — health checks are frequent (60s default) and may be // resource-intensive. Wait for the first tick. // Uses atomic.Bool to prevent duplicate execution if the previous check is still running. func (s *Scheduler) healthCheckLoop(ctx context.Context) { ticker := NewJitteredTicker(s.healthCheckInterval, DefaultSchedulerJitter) defer ticker.Stop() // Do NOT run immediately on start for health checks — wait for the first tick. // Health checks are frequent and shouldn't fire on every restart. for { select { case <-ctx.Done(): return case <-ticker.C: if !s.healthCheckRunning.CompareAndSwap(false, true) { s.logger.Debug("health check still running, skipping tick") continue } s.wg.Add(1) go func() { defer s.wg.Done() defer s.healthCheckRunning.Store(false) s.runHealthCheck(ctx) }() } } } // runHealthCheck executes a single health check cycle with error recovery. func (s *Scheduler) runHealthCheck(ctx context.Context) { opCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) defer cancel() if err := s.healthCheckService.RunHealthChecks(opCtx); err != nil { s.logger.Error("health check run failed", "error", err, "interval", s.healthCheckInterval.String()) } else { s.logger.Debug("health check completed") } } // cloudDiscoveryLoop runs every cloudDiscoveryInterval and discovers certificates from cloud secret managers. // Runs immediately on start, then on each tick. Same idempotency pattern as networkScanLoop. // Uses atomic.Bool to prevent duplicate execution if the previous scan is still running. func (s *Scheduler) cloudDiscoveryLoop(ctx context.Context) { ticker := NewJitteredTicker(s.cloudDiscoveryInterval, DefaultSchedulerJitter) defer ticker.Stop() // Run immediately on start (with idempotency guard) s.cloudDiscoveryRunning.Store(true) s.wg.Add(1) go func() { defer s.wg.Done() defer s.cloudDiscoveryRunning.Store(false) s.runCloudDiscovery(ctx) }() for { select { case <-ctx.Done(): return case <-ticker.C: if !s.cloudDiscoveryRunning.CompareAndSwap(false, true) { s.logger.Warn("cloud discovery still running, skipping tick") continue } s.wg.Add(1) go func() { defer s.wg.Done() defer s.cloudDiscoveryRunning.Store(false) s.runCloudDiscovery(ctx) }() } } } // runCloudDiscovery executes a single cloud discovery cycle with error recovery. func (s *Scheduler) runCloudDiscovery(ctx context.Context) { opCtx, cancel := context.WithTimeout(ctx, 30*time.Minute) defer cancel() total, errs := s.cloudDiscoveryService.DiscoverAll(opCtx) if len(errs) > 0 { s.logger.Error("cloud discovery completed with errors", "certificates_found", total, "errors", len(errs), "interval", s.cloudDiscoveryInterval.String()) for _, err := range errs { if !errors.Is(err, context.Canceled) { s.logger.Error("cloud discovery error", "error", err) } } } else { s.logger.Debug("cloud discovery completed", "certificates_found", total) } } // WaitForCompletion waits for all in-flight scheduler work to complete. // It respects the provided timeout and returns an error if work is still in progress after timeout. // Call this after the scheduler context has been cancelled to ensure graceful shutdown. func (s *Scheduler) WaitForCompletion(timeout time.Duration) error { done := make(chan struct{}) go func() { s.wg.Wait() close(done) }() select { case <-done: s.logger.Info("all scheduler work completed") return nil case <-time.After(timeout): s.logger.Warn("scheduler work did not complete within timeout", "timeout", timeout.String()) return ErrSchedulerShutdownTimeout } } // crlGenerationLoop periodically pre-generates CRLs into crl_cache so // the /.well-known/pki/crl/{issuer_id} HTTP endpoint can serve from // cache rather than regenerating per request. Mirrors the digestLoop // shape: ticker, atomic.Bool guard for re-entry, WaitGroup integration // for graceful shutdown. // // Bundle CRL/OCSP-Responder Phase 3. func (s *Scheduler) crlGenerationLoop(ctx context.Context) { ticker := NewJitteredTicker(s.crlGenerationInterval, DefaultSchedulerJitter) defer ticker.Stop() // Do NOT run immediately on start. CRLs are typically valid for // many hours; firing on every restart wastes work. The first tick // arrives after one interval; on cache miss the HTTP handler // triggers an immediate generation via the cache service. for { select { case <-ctx.Done(): return case <-ticker.C: if !s.crlGenerationRunning.CompareAndSwap(false, true) { s.logger.Warn("CRL pre-generation still running, skipping tick") continue } s.wg.Add(1) go func() { defer s.wg.Done() defer s.crlGenerationRunning.Store(false) s.runCRLGeneration(ctx) }() } } } // runCRLGeneration executes a single CRL pre-generation cycle with // error recovery. Per-issuer failures inside RegenerateAll are logged // + audited by the cache service itself; this wrapper only reports the // outer context shape and bumps a metric (when wired). func (s *Scheduler) runCRLGeneration(ctx context.Context) { // 5-minute timeout: the per-issuer generation is fast (sub-second // for most CAs), but the loop walks every issuer that supports // CRL. Bound the total cycle so a stuck issuer cannot block the // next tick. opCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) defer cancel() s.crlCacheService.RegenerateAll(opCtx) } // ErrSchedulerShutdownTimeout is returned when scheduler graceful shutdown times out. var ErrSchedulerShutdownTimeout = errors.New("scheduler graceful shutdown timeout") // acmeGCLoop runs every acmeGCInterval and invokes ACMEGarbageCollector. // Per the project's scheduler-idempotency architecture decision: an // atomic.Bool guard prevents concurrent tick execution; the // sync.WaitGroup tracks the in-flight goroutine for graceful shutdown. // Phase 5. func (s *Scheduler) acmeGCLoop(ctx context.Context) { ticker := NewJitteredTicker(s.acmeGCInterval, DefaultSchedulerJitter) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: if !s.acmeGCRunning.CompareAndSwap(false, true) { s.logger.Warn("ACME GC sweep still running, skipping tick") continue } s.wg.Add(1) go func() { defer s.wg.Done() defer s.acmeGCRunning.Store(false) // 1-minute timeout per sweep — the per-statement work is // cheap (single DELETE / UPDATE per sweep, all on indexed // columns), but bound the cycle so a stuck Postgres can't // block the next tick. opCtx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() if err := s.acmeGC.GarbageCollect(opCtx); err != nil { s.logger.Warn("acme gc sweep failed (next tick will retry)", "error", err) } }() } } } // sessionGCLoop runs every sessionGCInterval and invokes // SessionGarbageCollector.GarbageCollect, which sweeps: // - sessions whose absolute_expires_at is in the past (post-login expired); // - pre-login session rows older than 10 minutes; // - retired-past-retention session_signing_keys rows. // // Auth Bundle 2 Phase 4. The atomic.Bool guard + the per-tick // context.WithTimeout match the pattern of every other loop in this // file: a stuck Postgres can't block the next tick, and concurrent // sweeps are skipped not queued. func (s *Scheduler) sessionGCLoop(ctx context.Context) { ticker := NewJitteredTicker(s.sessionGCInterval, DefaultSchedulerJitter) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: if !s.sessionGCRunning.CompareAndSwap(false, true) { s.logger.Warn("session GC sweep still running, skipping tick") continue } s.wg.Add(1) go func() { defer s.wg.Done() defer s.sessionGCRunning.Store(false) opCtx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() if _, err := s.sessionGC.GarbageCollect(opCtx); err != nil { s.logger.Warn("session gc sweep failed (next tick will retry)", "error", err) } // Audit 2026-05-10 HIGH-3 — sweep expired BCL consumed-jti // rows on the same tick. Best-effort; failure logs at WARN // (the next tick retries). if s.bclReplayGC != nil { if n, err := s.bclReplayGC.SweepExpired(opCtx, time.Now().UTC()); err != nil { s.logger.Warn("bcl replay gc sweep failed (next tick will retry)", "error", err) } else if n > 0 { s.logger.Debug("bcl replay gc swept rows", "rows", n) } } }() } } } // rateLimitGCLoop runs every rateLimitGCInterval and invokes // RateLimitGarbageCollector.GarbageCollect, which sweeps stale rows // from the rate_limit_buckets table introduced in Phase 13 Sprint // 13.2's migration 000046. // // Wired only when CERTCTL_RATE_LIMIT_BACKEND=postgres (the in-memory // backend's prune-on-Allow path keeps buckets short-lived without a // separate sweep — cmd/server/main.go skips SetRateLimitGarbageCollector // for that case so this loop never launches). // // Phase 13 Sprint 13.3 closure. The atomic.Bool guard + per-tick // context.WithTimeout match every other GC loop's pattern. func (s *Scheduler) rateLimitGCLoop(ctx context.Context) { ticker := NewJitteredTicker(s.rateLimitGCInterval, DefaultSchedulerJitter) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: if !s.rateLimitGCRunning.CompareAndSwap(false, true) { s.logger.Warn("rate-limit GC sweep still running, skipping tick") continue } s.wg.Add(1) go func() { defer s.wg.Done() defer s.rateLimitGCRunning.Store(false) // 1-minute timeout matches acme + session GC loops. opCtx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() if n, err := s.rateLimitGC.GarbageCollect(opCtx); err != nil { s.logger.Warn("rate-limit gc sweep failed (next tick will retry)", "error", err) } else if n > 0 { s.logger.Debug("rate-limit gc swept stale buckets", "rows", n) } }() } } } // auditChainVerifyLoop is the Sprint 6 COMP-001-HASH tamper-evidence // sweeper. Every CERTCTL_AUDIT_CHAIN_VERIFY_INTERVAL tick it calls // AuditChainVerifier.VerifyHashChain — which runs migration 000047's // audit_events_verify_chain() plpgsql function entirely server-side — // and reports through the metric-side recorder. // // Why a scheduler loop rather than a CI/cron job: the audit's spec // language ("CI/cron job that walks the chain end-to-end") describes // the intent, not the implementation. A scheduler loop has three // advantages over a sidecar cron: // // 1. Single deploy artifact — no external scheduler / no extra Pod. // 2. Configurable cadence via the same CERTCTL_* env-var pattern as // every other scheduled task. // 3. The certctl_audit_chain_break_detected metric is exposed on // /api/v1/metrics/prometheus immediately, no separate scrape // endpoint to wire. // // Performance: the chain walk is O(N) plpgsql with a single sequential // scan + per-row digest(). On testcontainers PG-16-alpine with 1M // rows it costs ~2-3s — well under the 5-minute per-tick context // timeout. Operators with much larger audit tables should monitor // the per-tick latency and lengthen the interval if the walk crowds // out the application's foreground traffic. // // Self-restart contract: if a tick is still running when the next // tick fires, the new tick is skipped (CompareAndSwap guard); the // log line tells operators we're behind so they can pick a longer // interval. This mirrors every other GC / sweep loop in the file. func (s *Scheduler) auditChainVerifyLoop(ctx context.Context) { ticker := NewJitteredTicker(s.auditChainVerifyInterval, DefaultSchedulerJitter) defer ticker.Stop() // Run once immediately on start so a freshly-deployed instance // gets a baseline metric reading + surfaces tampering on the first // post-restart tick rather than after the first full interval. s.runAuditChainVerify(ctx) for { select { case <-ctx.Done(): return case <-ticker.C: s.runAuditChainVerify(ctx) } } } // userRetentionLoop is the Sprint 6 COMP-002-RETENTION sweeper. Every // CERTCTL_USER_RETENTION_INTERVAL tick it asks // UserRetentionService.PurgeDeactivatedUsers to walk every user whose // deactivated_at is older than the retention window and scrub the PII // columns. The service is responsible for the row-level work + audit // emission; the loop only orchestrates cadence + concurrency control. // // Mirrors the GC-loop pattern: atomic.Bool guard prevents overlapping // ticks; per-tick context.WithTimeout caps the worst case at 5 // minutes. The retention service's purgeBatchCap (default 200) is the // inner-loop budget — large backlogs spread across multiple ticks. func (s *Scheduler) userRetentionLoop(ctx context.Context) { ticker := NewJitteredTicker(s.userRetentionInterval, DefaultSchedulerJitter) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: if !s.userRetentionRunning.CompareAndSwap(false, true) { s.logger.Warn("user retention purge still running, skipping tick") continue } s.wg.Add(1) go func() { defer s.wg.Done() defer s.userRetentionRunning.Store(false) opCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) defer cancel() purged, failed, err := s.userRetention.PurgeDeactivatedUsers(opCtx) if err != nil { s.logger.Warn("user retention purge failed (next tick will retry)", "error", err) return } if purged > 0 || failed > 0 { s.logger.Info("user retention purge tick", "purged", purged, "failed", failed) } }() } } } // runAuditChainVerify executes a single chain-verify pass with the // atomic.Bool + WithTimeout + goroutine pattern every other GC loop // uses. Extracted so the loop body + the "run once on start" path // share one implementation. func (s *Scheduler) runAuditChainVerify(ctx context.Context) { if !s.auditChainVerifyRunning.CompareAndSwap(false, true) { s.logger.Warn("audit chain verify still running, skipping tick") return } s.wg.Add(1) go func() { defer s.wg.Done() defer s.auditChainVerifyRunning.Store(false) // 5-minute timeout — chain walk is O(N) over the full // audit_events table; large fleets may want a longer interval // but the per-tick deadline keeps a runaway walk from blocking // the next tick indefinitely. opCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) defer cancel() brokenID, brokenPos, rowCount, err := s.auditChainVerifier.VerifyHashChain(opCtx) if err != nil { s.logger.Warn("audit chain verify failed (next tick will retry)", "error", err) return } if brokenID != "" { s.logger.Error("audit chain break detected — tamper-evidence trigger fired", "broken_at_id", brokenID, "broken_at_pos", brokenPos, "row_count", rowCount) if s.auditChainRecorder != nil { s.auditChainRecorder.RecordBreak(brokenID, brokenPos) } return } s.logger.Debug("audit chain verify clean", "rows", rowCount) if s.auditChainRecorder != nil { s.auditChainRecorder.RecordSuccess(rowCount) } }() }