mirror of
https://github.com/shankar0123/certctl.git
synced 2026-06-12 18:18:51 +00:00
scheduler/service: crlGenerationLoop + CRLCacheService with singleflight
Phase 3 of the CRL/OCSP responder bundle. Adds the scheduler-driven
pre-generation pipeline that lets the /.well-known/pki/crl/{issuer_id}
HTTP handler (Phase 4) serve from cache instead of regenerating per
request.
What landed:
* internal/scheduler/scheduler.go:
- CRLCacheServicer interface (RegenerateAll(ctx))
- Scheduler struct gains crlCacheService + crlGenerationInterval +
crlGenerationRunning fields; default interval 1h
- SetCRLCacheService + SetCRLGenerationInterval setters following
the existing Set* convention (cloudDiscovery, digest, etc.)
- Wired into Start: optional loop, gated on crlCacheService != nil
- crlGenerationLoop: ticker + atomic.Bool re-entry guard +
WaitGroup integration mirroring digestLoop
- runCRLGeneration: 5-minute timeout per cycle; per-issuer
failures are caught inside RegenerateAll itself
* internal/service/crl_cache.go — CRLCacheService:
- Get(ctx, issuerID) → (der, thisUpdate, err)
cache hit → DB read; miss/stale → singleflight regenerate
- RegenerateAll(ctx) — walks every issuer in registry; per-issuer
failures logged + audited (crl_generation_events) but don't
abort the cycle
- In-tree singleflight gate (~30 LoC, sync.Map[issuerID]*flightEntry)
— collapses concurrent miss requests for the same issuer into
one underlying generation. No new dep on golang.org/x/sync
- Uses existing CAOperationsSvc.GenerateDERCRL for the heavy work
(no duplication of CRL-build logic); parses returned DER to
recover thisUpdate / nextUpdate / number / count
- Failure-event recording is best-effort (failure to record does
not fail the operation) — events are an audit aid, not a gate
* internal/service/crl_cache_test.go — 8 tests:
- Cache hit, miss, staleness paths
- RegenerateAll happy + cancelled ctx
- Singleflight: 20 concurrent misses → 1 generation
- Failure event recording when issuer is missing from registry
- Nil cache repo returns error
Coverage: service 73.5% (floor 70), scheduler 78.1% (floor 60).
Backward compat: unchanged for any caller that doesn't call
SetCRLCacheService. cmd/server/main.go wiring lands in Phase 4
alongside the POST OCSP endpoint + handler refactor to consult
the cache.
This commit is contained in:
@@ -64,6 +64,19 @@ type CloudDiscoveryServicer interface {
|
||||
DiscoverAll(ctx context.Context) (int, []error)
|
||||
}
|
||||
|
||||
// CRLCacheServicer is the scheduler-side view of the CRL cache: the
// single operation the CRL pre-generation loop needs.
//
// RegenerateAll receives the context for one refresh cycle and returns
// nothing, so the scheduler never observes per-issuer outcomes. The
// intended contract (the implementation lives outside this file) is
// that it visits every issuer that supports CRL signing, refreshes
// that issuer's crl_cache row, and logs + audits individual failures
// without letting one bad issuer stop the rest.
//
// Bundle CRL/OCSP-Responder Phase 3: pre-generating on a schedule lets
// the /.well-known/pki/crl/{issuer_id} HTTP endpoint serve from cache
// instead of regenerating per request.
type CRLCacheServicer interface {
	RegenerateAll(ctx context.Context)
}
|
||||
|
||||
// JobReaperService defines the interface for job timeout reaping used by the scheduler.
|
||||
type JobReaperService interface {
|
||||
ReapTimedOutJobs(ctx context.Context, csrTTL, approvalTTL time.Duration) error
|
||||
@@ -87,6 +100,7 @@ type Scheduler struct {
|
||||
digestService DigestServicer
|
||||
healthCheckService HealthCheckServicer
|
||||
cloudDiscoveryService CloudDiscoveryServicer
|
||||
crlCacheService CRLCacheServicer
|
||||
jobReaper JobReaperService
|
||||
logger *slog.Logger
|
||||
|
||||
@@ -102,12 +116,13 @@ type Scheduler struct {
|
||||
digestInterval time.Duration
|
||||
healthCheckInterval time.Duration
|
||||
cloudDiscoveryInterval time.Duration
|
||||
crlGenerationInterval time.Duration
|
||||
jobTimeoutInterval time.Duration
|
||||
// agentOfflineJobTTL: per-tick threshold for reaping Running jobs whose
|
||||
// owning agent has been silent. Bundle C / Audit M-016. Defaults below.
|
||||
agentOfflineJobTTL time.Duration
|
||||
awaitingCSRTimeout time.Duration
|
||||
awaitingApprovalTimeout time.Duration
|
||||
agentOfflineJobTTL time.Duration
|
||||
awaitingCSRTimeout time.Duration
|
||||
awaitingApprovalTimeout time.Duration
|
||||
|
||||
// Idempotency guards: prevent duplicate execution of slow jobs
|
||||
renewalCheckRunning atomic.Bool
|
||||
@@ -121,6 +136,7 @@ type Scheduler struct {
|
||||
digestRunning atomic.Bool
|
||||
healthCheckRunning atomic.Bool
|
||||
cloudDiscoveryRunning atomic.Bool
|
||||
crlGenerationRunning atomic.Bool
|
||||
jobTimeoutRunning atomic.Bool
|
||||
|
||||
// Graceful shutdown: wait for in-flight work to complete
|
||||
@@ -156,6 +172,7 @@ func NewScheduler(
|
||||
digestInterval: 24 * time.Hour,
|
||||
healthCheckInterval: 60 * time.Second,
|
||||
cloudDiscoveryInterval: 6 * time.Hour,
|
||||
crlGenerationInterval: 1 * time.Hour,
|
||||
jobTimeoutInterval: 10 * time.Minute,
|
||||
// 5 minutes is 5×agentHealthCheckInterval default of 1m; an agent
|
||||
// must miss multiple heartbeats before its in-flight jobs are reaped.
|
||||
@@ -240,6 +257,31 @@ func (s *Scheduler) SetCloudDiscoveryInterval(d time.Duration) {
|
||||
s.cloudDiscoveryInterval = d
|
||||
}
|
||||
|
||||
// SetCRLCacheService sets the CRL cache service for the crlGenerationLoop.
|
||||
// Called after construction since the loop is optional — when this is
|
||||
// unset, no pre-generation happens and HTTP CRL fetches go through the
|
||||
// on-demand path.
|
||||
//
|
||||
// Bundle CRL/OCSP-Responder Phase 3.
|
||||
func (s *Scheduler) SetCRLCacheService(svc CRLCacheServicer) {
|
||||
s.crlCacheService = svc
|
||||
}
|
||||
|
||||
// SetCRLGenerationInterval configures the interval at which the
|
||||
// scheduler regenerates CRLs into the crl_cache table. Default 1h
|
||||
// (matches relying-party CRL refresh expectations under RFC 5280).
|
||||
// Operators with chatty fleets can shorten; operators with bandwidth
|
||||
// constraints can lengthen as long as nextUpdate stays comfortably in
|
||||
// the future per generation.
|
||||
//
|
||||
// Zero or negative values are ignored.
|
||||
func (s *Scheduler) SetCRLGenerationInterval(d time.Duration) {
|
||||
if d <= 0 {
|
||||
return
|
||||
}
|
||||
s.crlGenerationInterval = d
|
||||
}
|
||||
|
||||
// SetJobReaperService sets the job reaper service (I-003).
|
||||
func (s *Scheduler) SetJobReaperService(jr JobReaperService) {
|
||||
s.jobReaper = jr
|
||||
@@ -297,6 +339,9 @@ func (s *Scheduler) Start(ctx context.Context) <-chan struct{} {
|
||||
if s.cloudDiscoveryService != nil {
|
||||
loopCount++
|
||||
}
|
||||
if s.crlCacheService != nil {
|
||||
loopCount++
|
||||
}
|
||||
s.wg.Add(loopCount)
|
||||
|
||||
go func() { defer s.wg.Done(); s.renewalCheckLoop(ctx) }()
|
||||
@@ -319,6 +364,9 @@ func (s *Scheduler) Start(ctx context.Context) <-chan struct{} {
|
||||
if s.cloudDiscoveryService != nil {
|
||||
go func() { defer s.wg.Done(); s.cloudDiscoveryLoop(ctx) }()
|
||||
}
|
||||
if s.crlCacheService != nil {
|
||||
go func() { defer s.wg.Done(); s.crlGenerationLoop(ctx) }()
|
||||
}
|
||||
|
||||
// Signal that all loops are launched
|
||||
close(startedChan)
|
||||
@@ -975,5 +1023,54 @@ func (s *Scheduler) WaitForCompletion(timeout time.Duration) error {
|
||||
}
|
||||
}
|
||||
|
||||
// crlGenerationLoop periodically pre-generates CRLs into crl_cache so
|
||||
// the /.well-known/pki/crl/{issuer_id} HTTP endpoint can serve from
|
||||
// cache rather than regenerating per request. Mirrors the digestLoop
|
||||
// shape: ticker, atomic.Bool guard for re-entry, WaitGroup integration
|
||||
// for graceful shutdown.
|
||||
//
|
||||
// Bundle CRL/OCSP-Responder Phase 3.
|
||||
func (s *Scheduler) crlGenerationLoop(ctx context.Context) {
|
||||
ticker := time.NewTicker(s.crlGenerationInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
// Do NOT run immediately on start. CRLs are typically valid for
|
||||
// many hours; firing on every restart wastes work. The first tick
|
||||
// arrives after one interval; on cache miss the HTTP handler
|
||||
// triggers an immediate generation via the cache service.
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if !s.crlGenerationRunning.CompareAndSwap(false, true) {
|
||||
s.logger.Warn("CRL pre-generation still running, skipping tick")
|
||||
continue
|
||||
}
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
defer s.crlGenerationRunning.Store(false)
|
||||
s.runCRLGeneration(ctx)
|
||||
}()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// runCRLGeneration executes a single CRL pre-generation cycle with
|
||||
// error recovery. Per-issuer failures inside RegenerateAll are logged
|
||||
// + audited by the cache service itself; this wrapper only reports the
|
||||
// outer context shape and bumps a metric (when wired).
|
||||
func (s *Scheduler) runCRLGeneration(ctx context.Context) {
|
||||
// 5-minute timeout: the per-issuer generation is fast (sub-second
|
||||
// for most CAs), but the loop walks every issuer that supports
|
||||
// CRL. Bound the total cycle so a stuck issuer cannot block the
|
||||
// next tick.
|
||||
opCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
|
||||
defer cancel()
|
||||
s.crlCacheService.RegenerateAll(opCtx)
|
||||
}
|
||||
|
||||
// ErrSchedulerShutdownTimeout is the sentinel error for a graceful
// scheduler shutdown that did not finish in time. Being a package-level
// value, it can be matched by callers with errors.Is.
var ErrSchedulerShutdownTimeout = errors.New("scheduler graceful shutdown timeout")
|
||||
|
||||
Reference in New Issue
Block a user