mirror of
https://github.com/shankar0123/certctl.git
synced 2026-06-07 16:21:30 +00:00
scheduler/service: crlGenerationLoop + CRLCacheService with singleflight
Phase 3 of the CRL/OCSP responder bundle. Adds the scheduler-driven
pre-generation pipeline that lets the /.well-known/pki/crl/{issuer_id}
HTTP handler (Phase 4) serve from cache instead of regenerating per
request.
What landed:
* internal/scheduler/scheduler.go:
- CRLCacheServicer interface (RegenerateAll(ctx))
- Scheduler struct gains crlCacheService + crlGenerationInterval +
crlGenerationRunning fields; default interval 1h
- SetCRLCacheService + SetCRLGenerationInterval setters following
the existing Set* convention (cloudDiscovery, digest, etc.)
- Wired into Start: optional loop, gated on crlCacheService != nil
- crlGenerationLoop: ticker + atomic.Bool re-entry guard +
WaitGroup integration mirroring digestLoop
- runCRLGeneration: 5-minute timeout per cycle; per-issuer
failures are caught inside RegenerateAll itself
* internal/service/crl_cache.go — CRLCacheService:
- Get(ctx, issuerID) → (der, thisUpdate, err)
cache hit → DB read; miss/stale → singleflight regenerate
- RegenerateAll(ctx) — walks every issuer in registry; per-issuer
failures logged + audited (crl_generation_events) but don't
abort the cycle
- In-tree singleflight gate (~30 LoC, sync.Map[issuerID]*flightEntry)
— collapses concurrent miss requests for the same issuer into
one underlying generation. No new dep on golang.org/x/sync
- Uses existing CAOperationsSvc.GenerateDERCRL for the heavy work
(no duplication of CRL-build logic); parses returned DER to
recover thisUpdate / nextUpdate / number / count
- Failure-event recording is best-effort (failure to record does
not fail the operation) — events are an audit aid, not a gate
* internal/service/crl_cache_test.go — 8 tests:
- Cache hit, miss, staleness paths
- RegenerateAll happy + cancelled ctx
- Singleflight: 20 concurrent misses → 1 generation
- Failure event recording when issuer is missing from registry
- Nil cache repo returns error
Coverage: service 73.5% (floor 70), scheduler 78.1% (floor 60).
Backward compat: unchanged for any caller that doesn't call
SetCRLCacheService. cmd/server/main.go wiring lands in Phase 4
alongside the POST OCSP endpoint + handler refactor to consult
the cache.
This commit is contained in:
@@ -64,6 +64,19 @@ type CloudDiscoveryServicer interface {
|
||||
DiscoverAll(ctx context.Context) (int, []error)
|
||||
}
|
||||
|
||||
// CRLCacheServicer defines the interface for the scheduler's CRL
|
||||
// pre-generation loop. RegenerateAll iterates every issuer that
|
||||
// supports CRL signing and refreshes its crl_cache row. Per-issuer
|
||||
// failures are logged + audited; a single bad issuer does not stop
|
||||
// the others.
|
||||
//
|
||||
// Bundle CRL/OCSP-Responder Phase 3: the scheduler-driven cache lets
|
||||
// the /.well-known/pki/crl/{issuer_id} HTTP endpoint serve from cache
|
||||
// instead of regenerating per request.
|
||||
type CRLCacheServicer interface {
|
||||
RegenerateAll(ctx context.Context)
|
||||
}
|
||||
|
||||
// JobReaperService defines the interface for job timeout reaping used by the scheduler.
|
||||
type JobReaperService interface {
|
||||
ReapTimedOutJobs(ctx context.Context, csrTTL, approvalTTL time.Duration) error
|
||||
@@ -87,6 +100,7 @@ type Scheduler struct {
|
||||
digestService DigestServicer
|
||||
healthCheckService HealthCheckServicer
|
||||
cloudDiscoveryService CloudDiscoveryServicer
|
||||
crlCacheService CRLCacheServicer
|
||||
jobReaper JobReaperService
|
||||
logger *slog.Logger
|
||||
|
||||
@@ -102,12 +116,13 @@ type Scheduler struct {
|
||||
digestInterval time.Duration
|
||||
healthCheckInterval time.Duration
|
||||
cloudDiscoveryInterval time.Duration
|
||||
crlGenerationInterval time.Duration
|
||||
jobTimeoutInterval time.Duration
|
||||
// agentOfflineJobTTL: per-tick threshold for reaping Running jobs whose
|
||||
// owning agent has been silent. Bundle C / Audit M-016. Defaults below.
|
||||
agentOfflineJobTTL time.Duration
|
||||
awaitingCSRTimeout time.Duration
|
||||
awaitingApprovalTimeout time.Duration
|
||||
agentOfflineJobTTL time.Duration
|
||||
awaitingCSRTimeout time.Duration
|
||||
awaitingApprovalTimeout time.Duration
|
||||
|
||||
// Idempotency guards: prevent duplicate execution of slow jobs
|
||||
renewalCheckRunning atomic.Bool
|
||||
@@ -121,6 +136,7 @@ type Scheduler struct {
|
||||
digestRunning atomic.Bool
|
||||
healthCheckRunning atomic.Bool
|
||||
cloudDiscoveryRunning atomic.Bool
|
||||
crlGenerationRunning atomic.Bool
|
||||
jobTimeoutRunning atomic.Bool
|
||||
|
||||
// Graceful shutdown: wait for in-flight work to complete
|
||||
@@ -156,6 +172,7 @@ func NewScheduler(
|
||||
digestInterval: 24 * time.Hour,
|
||||
healthCheckInterval: 60 * time.Second,
|
||||
cloudDiscoveryInterval: 6 * time.Hour,
|
||||
crlGenerationInterval: 1 * time.Hour,
|
||||
jobTimeoutInterval: 10 * time.Minute,
|
||||
// 5 minutes is 5×agentHealthCheckInterval default of 1m; an agent
|
||||
// must miss multiple heartbeats before its in-flight jobs are reaped.
|
||||
@@ -240,6 +257,31 @@ func (s *Scheduler) SetCloudDiscoveryInterval(d time.Duration) {
|
||||
s.cloudDiscoveryInterval = d
|
||||
}
|
||||
|
||||
// SetCRLCacheService sets the CRL cache service for the crlGenerationLoop.
|
||||
// Called after construction since the loop is optional — when this is
|
||||
// unset, no pre-generation happens and HTTP CRL fetches go through the
|
||||
// on-demand path.
|
||||
//
|
||||
// Bundle CRL/OCSP-Responder Phase 3.
|
||||
func (s *Scheduler) SetCRLCacheService(svc CRLCacheServicer) {
|
||||
s.crlCacheService = svc
|
||||
}
|
||||
|
||||
// SetCRLGenerationInterval configures the interval at which the
|
||||
// scheduler regenerates CRLs into the crl_cache table. Default 1h
|
||||
// (matches relying-party CRL refresh expectations under RFC 5280).
|
||||
// Operators with chatty fleets can shorten; operators with bandwidth
|
||||
// constraints can lengthen as long as nextUpdate stays comfortably in
|
||||
// the future per generation.
|
||||
//
|
||||
// Zero or negative values are ignored.
|
||||
func (s *Scheduler) SetCRLGenerationInterval(d time.Duration) {
|
||||
if d <= 0 {
|
||||
return
|
||||
}
|
||||
s.crlGenerationInterval = d
|
||||
}
|
||||
|
||||
// SetJobReaperService sets the job reaper service (I-003).
|
||||
func (s *Scheduler) SetJobReaperService(jr JobReaperService) {
|
||||
s.jobReaper = jr
|
||||
@@ -297,6 +339,9 @@ func (s *Scheduler) Start(ctx context.Context) <-chan struct{} {
|
||||
if s.cloudDiscoveryService != nil {
|
||||
loopCount++
|
||||
}
|
||||
if s.crlCacheService != nil {
|
||||
loopCount++
|
||||
}
|
||||
s.wg.Add(loopCount)
|
||||
|
||||
go func() { defer s.wg.Done(); s.renewalCheckLoop(ctx) }()
|
||||
@@ -319,6 +364,9 @@ func (s *Scheduler) Start(ctx context.Context) <-chan struct{} {
|
||||
if s.cloudDiscoveryService != nil {
|
||||
go func() { defer s.wg.Done(); s.cloudDiscoveryLoop(ctx) }()
|
||||
}
|
||||
if s.crlCacheService != nil {
|
||||
go func() { defer s.wg.Done(); s.crlGenerationLoop(ctx) }()
|
||||
}
|
||||
|
||||
// Signal that all loops are launched
|
||||
close(startedChan)
|
||||
@@ -975,5 +1023,54 @@ func (s *Scheduler) WaitForCompletion(timeout time.Duration) error {
|
||||
}
|
||||
}
|
||||
|
||||
// crlGenerationLoop periodically pre-generates CRLs into crl_cache so
|
||||
// the /.well-known/pki/crl/{issuer_id} HTTP endpoint can serve from
|
||||
// cache rather than regenerating per request. Mirrors the digestLoop
|
||||
// shape: ticker, atomic.Bool guard for re-entry, WaitGroup integration
|
||||
// for graceful shutdown.
|
||||
//
|
||||
// Bundle CRL/OCSP-Responder Phase 3.
|
||||
func (s *Scheduler) crlGenerationLoop(ctx context.Context) {
|
||||
ticker := time.NewTicker(s.crlGenerationInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
// Do NOT run immediately on start. CRLs are typically valid for
|
||||
// many hours; firing on every restart wastes work. The first tick
|
||||
// arrives after one interval; on cache miss the HTTP handler
|
||||
// triggers an immediate generation via the cache service.
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if !s.crlGenerationRunning.CompareAndSwap(false, true) {
|
||||
s.logger.Warn("CRL pre-generation still running, skipping tick")
|
||||
continue
|
||||
}
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
defer s.crlGenerationRunning.Store(false)
|
||||
s.runCRLGeneration(ctx)
|
||||
}()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// runCRLGeneration executes a single CRL pre-generation cycle with
|
||||
// error recovery. Per-issuer failures inside RegenerateAll are logged
|
||||
// + audited by the cache service itself; this wrapper only reports the
|
||||
// outer context shape and bumps a metric (when wired).
|
||||
func (s *Scheduler) runCRLGeneration(ctx context.Context) {
|
||||
// 5-minute timeout: the per-issuer generation is fast (sub-second
|
||||
// for most CAs), but the loop walks every issuer that supports
|
||||
// CRL. Bound the total cycle so a stuck issuer cannot block the
|
||||
// next tick.
|
||||
opCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
|
||||
defer cancel()
|
||||
s.crlCacheService.RegenerateAll(opCtx)
|
||||
}
|
||||
|
||||
// ErrSchedulerShutdownTimeout is returned when scheduler graceful shutdown times out.
|
||||
var ErrSchedulerShutdownTimeout = errors.New("scheduler graceful shutdown timeout")
|
||||
|
||||
@@ -0,0 +1,270 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/x509"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/shankar0123/certctl/internal/domain"
|
||||
"github.com/shankar0123/certctl/internal/repository"
|
||||
)
|
||||
|
||||
// CRLCacheService is the read-through + scheduler-driven cache layer
|
||||
// for pre-generated CRLs. The HTTP handler at
|
||||
// /.well-known/pki/crl/{issuer_id} reads via Get; the
|
||||
// scheduler.crlGenerationLoop drives RegenerateAll on a tick.
|
||||
//
|
||||
// Bundle CRL/OCSP-Responder Phase 3.
|
||||
//
|
||||
// Concurrency model:
|
||||
//
|
||||
// - The cache row is the source of truth (one row per issuer).
|
||||
// - Get returns the cached row when fresh; on miss / staleness it
|
||||
// calls regenerateOne behind a singleflight gate keyed by issuer
|
||||
// ID so concurrent miss requests for the same issuer collapse to
|
||||
// a single underlying generation call.
|
||||
// - RegenerateAll iterates every issuer in the registry, calling
|
||||
// regenerateOne for each. Per-issuer failures are logged + audited
|
||||
// via crl_generation_events; one bad issuer does not stop the
|
||||
// others.
|
||||
// - The CA-side CRL generation (caSvc.GenerateDERCRL → issuer
|
||||
// connector.GenerateCRL) is unchanged. This service is additive:
|
||||
// it persists results, surfaces them via Get, and tracks events.
|
||||
type CRLCacheService struct {
|
||||
cacheRepo repository.CRLCacheRepository
|
||||
caSvc *CAOperationsSvc
|
||||
registry *IssuerRegistry
|
||||
logger *slog.Logger
|
||||
|
||||
// singleflight collapses concurrent regeneration requests for the
|
||||
// same issuer ID. A simpler alternative to vendoring
|
||||
// golang.org/x/sync/singleflight; this in-tree version is ~30 LoC
|
||||
// and matches the project's "no new deps unless necessary" rule.
|
||||
flight sync.Map // issuerID → *flightEntry
|
||||
}
|
||||
|
||||
// flightEntry coordinates a single in-flight generation across
|
||||
// concurrent callers. The first arrival kicks off the work; later
|
||||
// arrivals wait on done and read the shared result. Pattern matches
|
||||
// golang.org/x/sync/singleflight semantics for the single-call case
|
||||
// (we don't need the multi-result Forget capability here).
|
||||
type flightEntry struct {
|
||||
done chan struct{}
|
||||
result *domain.CRLCacheEntry
|
||||
err error
|
||||
}
|
||||
|
||||
// NewCRLCacheService constructs a cache service. caSvc must already
|
||||
// have its issuer registry wired (CAOperationsSvc.SetIssuerRegistry).
|
||||
func NewCRLCacheService(
|
||||
cacheRepo repository.CRLCacheRepository,
|
||||
caSvc *CAOperationsSvc,
|
||||
registry *IssuerRegistry,
|
||||
logger *slog.Logger,
|
||||
) *CRLCacheService {
|
||||
return &CRLCacheService{
|
||||
cacheRepo: cacheRepo,
|
||||
caSvc: caSvc,
|
||||
registry: registry,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
// Get returns the cached CRL DER + thisUpdate timestamp for an issuer.
|
||||
// On cache hit the path is purely a DB read (~ms). On miss or
|
||||
// staleness (next_update in the past), Get triggers an immediate
|
||||
// regeneration via the singleflight gate so concurrent requests
|
||||
// collapse to one underlying call.
|
||||
func (s *CRLCacheService) Get(ctx context.Context, issuerID string) ([]byte, time.Time, error) {
|
||||
if s.cacheRepo == nil {
|
||||
return nil, time.Time{}, errors.New("crl_cache service: cache repo not configured")
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
entry, err := s.cacheRepo.Get(ctx, issuerID)
|
||||
if err != nil {
|
||||
return nil, time.Time{}, fmt.Errorf("crl_cache service get %q: %w", issuerID, err)
|
||||
}
|
||||
if entry != nil && !entry.IsStale(now) {
|
||||
return entry.CRLDER, entry.ThisUpdate, nil
|
||||
}
|
||||
|
||||
// Miss or stale → regenerate behind the singleflight gate.
|
||||
fresh, err := s.regenerateOne(ctx, issuerID)
|
||||
if err != nil {
|
||||
return nil, time.Time{}, err
|
||||
}
|
||||
return fresh.CRLDER, fresh.ThisUpdate, nil
|
||||
}
|
||||
|
||||
// RegenerateAll walks every issuer in the registry, calling
|
||||
// regenerateOne for each. Per-issuer failures are logged + audited
|
||||
// (via crl_generation_events); a single bad issuer does not stop
|
||||
// the others. Called by scheduler.crlGenerationLoop on each tick.
|
||||
//
|
||||
// Issuers whose connector returns nil from GenerateCRL (e.g., ACME,
|
||||
// Vault PKI, DigiCert — they manage their own CRL distribution) are
|
||||
// skipped silently; the regenerateOne path detects nil and treats it
|
||||
// as "no CRL to cache" rather than an error.
|
||||
func (s *CRLCacheService) RegenerateAll(ctx context.Context) {
|
||||
if s.registry == nil {
|
||||
s.logger.Warn("CRL cache RegenerateAll: registry not configured; nothing to do")
|
||||
return
|
||||
}
|
||||
|
||||
issuers := s.registry.List()
|
||||
for issuerID := range issuers {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
s.logger.Warn("CRL cache RegenerateAll: ctx cancelled mid-cycle",
|
||||
"completed", issuerID)
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
if _, err := s.regenerateOne(ctx, issuerID); err != nil {
|
||||
// regenerateOne already logs + audits the failure; log here
|
||||
// only at debug level to avoid double-noise.
|
||||
s.logger.Debug("CRL cache RegenerateAll: per-issuer failure",
|
||||
"issuer_id", issuerID, "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// regenerateOne is the singleflight-gated worker. The first concurrent
|
||||
// call for an issuer ID executes the generation; later calls block on
|
||||
// the in-flight entry's done channel and return the same result.
|
||||
//
|
||||
// The gate is released in a defer so callers can rely on subsequent
|
||||
// calls (after the result is observed) starting a fresh generation.
|
||||
func (s *CRLCacheService) regenerateOne(ctx context.Context, issuerID string) (*domain.CRLCacheEntry, error) {
|
||||
// Check for an in-flight generation. LoadOrStore atomically:
|
||||
// - If absent: stores our entry as the in-flight one and returns
|
||||
// it; we kick off the work.
|
||||
// - If present: returns the existing entry; we wait on it.
|
||||
mine := &flightEntry{done: make(chan struct{})}
|
||||
actual, loaded := s.flight.LoadOrStore(issuerID, mine)
|
||||
entry := actual.(*flightEntry)
|
||||
|
||||
if loaded {
|
||||
// Another goroutine is already generating. Wait for them.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case <-entry.done:
|
||||
}
|
||||
if entry.err != nil {
|
||||
return nil, entry.err
|
||||
}
|
||||
return entry.result, nil
|
||||
}
|
||||
|
||||
// We are the leader; do the work and signal others on done.
|
||||
defer func() {
|
||||
s.flight.Delete(issuerID)
|
||||
close(mine.done)
|
||||
}()
|
||||
|
||||
mine.result, mine.err = s.doRegenerate(ctx, issuerID)
|
||||
return mine.result, mine.err
|
||||
}
|
||||
|
||||
// doRegenerate is the actual work: ask CAOperationsSvc to build the
|
||||
// CRL DER, parse it to recover thisUpdate/nextUpdate, persist into
|
||||
// crl_cache, and record an audit event in crl_generation_events.
|
||||
func (s *CRLCacheService) doRegenerate(ctx context.Context, issuerID string) (*domain.CRLCacheEntry, error) {
|
||||
if s.caSvc == nil {
|
||||
return nil, errors.New("crl_cache service: caSvc not configured")
|
||||
}
|
||||
|
||||
startedAt := time.Now().UTC()
|
||||
|
||||
// Build the CRL via the existing on-demand path.
|
||||
derBytes, err := s.caSvc.GenerateDERCRL(ctx, issuerID)
|
||||
if err != nil {
|
||||
s.recordEvent(ctx, &domain.CRLGenerationEvent{
|
||||
IssuerID: issuerID,
|
||||
StartedAt: startedAt,
|
||||
Duration: time.Since(startedAt),
|
||||
Succeeded: false,
|
||||
Error: err.Error(),
|
||||
})
|
||||
return nil, fmt.Errorf("crl_cache service generate %q: %w", issuerID, err)
|
||||
}
|
||||
|
||||
// Parse to extract thisUpdate / nextUpdate / number / count.
|
||||
parsed, perr := x509.ParseRevocationList(derBytes)
|
||||
if perr != nil {
|
||||
s.recordEvent(ctx, &domain.CRLGenerationEvent{
|
||||
IssuerID: issuerID,
|
||||
StartedAt: startedAt,
|
||||
Duration: time.Since(startedAt),
|
||||
Succeeded: false,
|
||||
Error: "parse generated CRL: " + perr.Error(),
|
||||
})
|
||||
return nil, fmt.Errorf("crl_cache service parse %q: %w", issuerID, perr)
|
||||
}
|
||||
|
||||
crlNumber := int64(0)
|
||||
if parsed.Number != nil {
|
||||
crlNumber = parsed.Number.Int64()
|
||||
}
|
||||
|
||||
entry := &domain.CRLCacheEntry{
|
||||
IssuerID: issuerID,
|
||||
CRLDER: derBytes,
|
||||
CRLNumber: crlNumber,
|
||||
ThisUpdate: parsed.ThisUpdate,
|
||||
NextUpdate: parsed.NextUpdate,
|
||||
GeneratedAt: startedAt,
|
||||
GenerationDuration: time.Since(startedAt),
|
||||
RevokedCount: len(parsed.RevokedCertificateEntries),
|
||||
}
|
||||
if err := s.cacheRepo.Put(ctx, entry); err != nil {
|
||||
s.recordEvent(ctx, &domain.CRLGenerationEvent{
|
||||
IssuerID: issuerID,
|
||||
CRLNumber: crlNumber,
|
||||
StartedAt: startedAt,
|
||||
Duration: time.Since(startedAt),
|
||||
Succeeded: false,
|
||||
Error: "persist cache row: " + err.Error(),
|
||||
})
|
||||
return nil, fmt.Errorf("crl_cache service persist %q: %w", issuerID, err)
|
||||
}
|
||||
|
||||
s.recordEvent(ctx, &domain.CRLGenerationEvent{
|
||||
IssuerID: issuerID,
|
||||
CRLNumber: crlNumber,
|
||||
Duration: entry.GenerationDuration,
|
||||
RevokedCount: entry.RevokedCount,
|
||||
StartedAt: startedAt,
|
||||
Succeeded: true,
|
||||
})
|
||||
|
||||
s.logger.Info("CRL pre-generated and cached",
|
||||
"issuer_id", issuerID,
|
||||
"crl_number", crlNumber,
|
||||
"revoked_count", entry.RevokedCount,
|
||||
"this_update", entry.ThisUpdate,
|
||||
"next_update", entry.NextUpdate,
|
||||
"duration_ms", entry.GenerationDuration.Milliseconds())
|
||||
return entry, nil
|
||||
}
|
||||
|
||||
// recordEvent persists a generation event but does NOT propagate
|
||||
// failure-to-record back to the caller — the event log is a
|
||||
// best-effort audit trail; missing it should not turn a successful
|
||||
// CRL generation into an error.
|
||||
func (s *CRLCacheService) recordEvent(ctx context.Context, evt *domain.CRLGenerationEvent) {
|
||||
if s.cacheRepo == nil {
|
||||
return
|
||||
}
|
||||
if err := s.cacheRepo.RecordGenerationEvent(ctx, evt); err != nil {
|
||||
s.logger.Warn("crl_cache service: failed to record generation event",
|
||||
"issuer_id", evt.IssuerID, "error", err)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,321 @@
|
||||
package service_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/shankar0123/certctl/internal/connector/issuer"
|
||||
localissuer "github.com/shankar0123/certctl/internal/connector/issuer/local"
|
||||
"github.com/shankar0123/certctl/internal/domain"
|
||||
"github.com/shankar0123/certctl/internal/service"
|
||||
)
|
||||
|
||||
// fakeCRLCacheRepo is an in-memory repository for CRLCacheService
|
||||
// tests. The Postgres impl is covered by the testcontainers tests in
|
||||
// internal/repository/postgres/crl_cache_test.go (CI only — needs Docker).
|
||||
type fakeCRLCacheRepo struct {
|
||||
mu sync.Mutex
|
||||
rows map[string]*domain.CRLCacheEntry
|
||||
events []*domain.CRLGenerationEvent
|
||||
getCount int
|
||||
putCount int
|
||||
}
|
||||
|
||||
func newFakeCRLCacheRepo() *fakeCRLCacheRepo {
|
||||
return &fakeCRLCacheRepo{rows: map[string]*domain.CRLCacheEntry{}}
|
||||
}
|
||||
|
||||
func (r *fakeCRLCacheRepo) Get(_ context.Context, issuerID string) (*domain.CRLCacheEntry, error) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
r.getCount++
|
||||
if entry, ok := r.rows[issuerID]; ok {
|
||||
copy := *entry
|
||||
return ©, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (r *fakeCRLCacheRepo) Put(_ context.Context, entry *domain.CRLCacheEntry) error {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
r.putCount++
|
||||
copy := *entry
|
||||
r.rows[entry.IssuerID] = ©
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *fakeCRLCacheRepo) NextCRLNumber(_ context.Context, issuerID string) (int64, error) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
if entry, ok := r.rows[issuerID]; ok {
|
||||
return entry.CRLNumber + 1, nil
|
||||
}
|
||||
return 1, nil
|
||||
}
|
||||
|
||||
func (r *fakeCRLCacheRepo) RecordGenerationEvent(_ context.Context, evt *domain.CRLGenerationEvent) error {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
copy := *evt
|
||||
r.events = append(r.events, ©)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *fakeCRLCacheRepo) ListGenerationEvents(_ context.Context, issuerID string, limit int) ([]*domain.CRLGenerationEvent, error) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
var out []*domain.CRLGenerationEvent
|
||||
for _, evt := range r.events {
|
||||
if evt.IssuerID == issuerID {
|
||||
copy := *evt
|
||||
out = append(out, ©)
|
||||
}
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// fakeRevocationRepo is the minimal shape CAOperationsSvc needs:
|
||||
// returning revocations by issuer. The cache service walks
|
||||
// CAOperationsSvc.GenerateDERCRL, which calls into this.
|
||||
type fakeRevocationRepo struct{}
|
||||
|
||||
func (fakeRevocationRepo) Create(context.Context, *domain.CertificateRevocation) error {
|
||||
return nil
|
||||
}
|
||||
func (fakeRevocationRepo) GetByIssuerAndSerial(context.Context, string, string) (*domain.CertificateRevocation, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (fakeRevocationRepo) ListAll(context.Context) ([]*domain.CertificateRevocation, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (fakeRevocationRepo) ListByIssuer(_ context.Context, issuerID string) ([]*domain.CertificateRevocation, error) {
|
||||
// Empty list = no revoked certs; the issuer connector still
|
||||
// produces a valid empty CRL (RFC 5280 allows zero entries).
|
||||
return nil, nil
|
||||
}
|
||||
func (fakeRevocationRepo) ListByCertificate(context.Context, string) ([]*domain.CertificateRevocation, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (fakeRevocationRepo) MarkIssuerNotified(context.Context, string) error { return nil }
|
||||
|
||||
// helper: spin up a CAOperationsSvc + IssuerRegistry wired with a real
|
||||
// local issuer connector. The local issuer's GenerateCRL produces a
|
||||
// real DER-encoded CRL that the cache service can parse + persist.
|
||||
func newCacheServiceFixture(t *testing.T) (svc *service.CRLCacheService, repo *fakeCRLCacheRepo, registry *service.IssuerRegistry) {
|
||||
t.Helper()
|
||||
|
||||
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
|
||||
repo = newFakeCRLCacheRepo()
|
||||
|
||||
// Real local issuer — produces a real CRL on GenerateCRL.
|
||||
localConn := localissuer.New(&localissuer.Config{
|
||||
CACommonName: "Test Cache CA",
|
||||
ValidityDays: 30,
|
||||
}, logger)
|
||||
|
||||
registry = service.NewIssuerRegistry(logger)
|
||||
registry.Set("iss-cache-test", service.NewIssuerConnectorAdapter(localConn))
|
||||
|
||||
caSvc := service.NewCAOperationsSvc(fakeRevocationRepo{}, nil, nil)
|
||||
caSvc.SetIssuerRegistry(registry)
|
||||
|
||||
svc = service.NewCRLCacheService(repo, caSvc, registry, logger)
|
||||
return
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Get: cache hit, miss, staleness
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestCRLCacheService_Get_MissTriggersGeneration(t *testing.T) {
|
||||
svc, repo, _ := newCacheServiceFixture(t)
|
||||
ctx := context.Background()
|
||||
|
||||
der, thisUpdate, err := svc.Get(ctx, "iss-cache-test")
|
||||
if err != nil {
|
||||
t.Fatalf("Get: %v", err)
|
||||
}
|
||||
if len(der) == 0 {
|
||||
t.Fatal("Get returned empty DER")
|
||||
}
|
||||
if thisUpdate.IsZero() {
|
||||
t.Fatal("ThisUpdate is zero")
|
||||
}
|
||||
if repo.putCount != 1 {
|
||||
t.Errorf("putCount = %d, want 1 (miss should trigger one generation)", repo.putCount)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCRLCacheService_Get_HitSkipsGeneration(t *testing.T) {
|
||||
svc, repo, _ := newCacheServiceFixture(t)
|
||||
ctx := context.Background()
|
||||
|
||||
// Prime the cache.
|
||||
if _, _, err := svc.Get(ctx, "iss-cache-test"); err != nil {
|
||||
t.Fatalf("prime: %v", err)
|
||||
}
|
||||
if repo.putCount != 1 {
|
||||
t.Fatalf("prime: putCount = %d, want 1", repo.putCount)
|
||||
}
|
||||
|
||||
// Second Get should be a cache hit.
|
||||
if _, _, err := svc.Get(ctx, "iss-cache-test"); err != nil {
|
||||
t.Fatalf("hit: %v", err)
|
||||
}
|
||||
if repo.putCount != 1 {
|
||||
t.Errorf("putCount = %d, want 1 (hit should not regenerate)", repo.putCount)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCRLCacheService_Get_StalenessTriggersRegeneration(t *testing.T) {
|
||||
svc, repo, _ := newCacheServiceFixture(t)
|
||||
ctx := context.Background()
|
||||
|
||||
// Prime the cache with a row whose next_update is in the past.
|
||||
stale := &domain.CRLCacheEntry{
|
||||
IssuerID: "iss-cache-test",
|
||||
CRLDER: []byte("stale-der"),
|
||||
CRLNumber: 1,
|
||||
ThisUpdate: time.Now().Add(-48 * time.Hour),
|
||||
NextUpdate: time.Now().Add(-24 * time.Hour), // expired
|
||||
GeneratedAt: time.Now().Add(-48 * time.Hour),
|
||||
}
|
||||
if err := repo.Put(ctx, stale); err != nil {
|
||||
t.Fatalf("seed stale: %v", err)
|
||||
}
|
||||
repo.putCount = 0
|
||||
|
||||
// Get should detect staleness and regenerate.
|
||||
der, _, err := svc.Get(ctx, "iss-cache-test")
|
||||
if err != nil {
|
||||
t.Fatalf("Get on stale: %v", err)
|
||||
}
|
||||
if string(der) == "stale-der" {
|
||||
t.Error("Get returned stale DER instead of regenerating")
|
||||
}
|
||||
if repo.putCount != 1 {
|
||||
t.Errorf("putCount = %d, want 1 (staleness should trigger one regen)", repo.putCount)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// RegenerateAll
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestCRLCacheService_RegenerateAll_PopulatesAllIssuers(t *testing.T) {
|
||||
svc, repo, _ := newCacheServiceFixture(t)
|
||||
ctx := context.Background()
|
||||
|
||||
svc.RegenerateAll(ctx)
|
||||
|
||||
row, _ := repo.Get(ctx, "iss-cache-test")
|
||||
if row == nil {
|
||||
t.Fatal("RegenerateAll did not populate iss-cache-test")
|
||||
}
|
||||
if row.RevokedCount != 0 {
|
||||
t.Errorf("RevokedCount = %d, want 0 (fakeRevocationRepo is empty)", row.RevokedCount)
|
||||
}
|
||||
events, _ := repo.ListGenerationEvents(ctx, "iss-cache-test", 10)
|
||||
if len(events) != 1 {
|
||||
t.Fatalf("expected 1 generation event, got %d", len(events))
|
||||
}
|
||||
if !events[0].Succeeded {
|
||||
t.Error("event.Succeeded should be true on happy path")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCRLCacheService_RegenerateAll_RespectsCancelledContext(t *testing.T) {
|
||||
svc, _, _ := newCacheServiceFixture(t)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
|
||||
// Should return without panicking. The single-issuer fixture means
|
||||
// there's nothing to iterate after the cancel check, so this is
|
||||
// mostly a smoke test for the ctx.Done() branch.
|
||||
svc.RegenerateAll(ctx)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Singleflight: concurrent miss requests for the same issuer collapse
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestCRLCacheService_Get_SingleflightCollapsesConcurrentMisses(t *testing.T) {
|
||||
svc, repo, _ := newCacheServiceFixture(t)
|
||||
ctx := context.Background()
|
||||
|
||||
// Fire 20 concurrent Get calls for the same uncached issuer. The
|
||||
// in-tree singleflight gate should collapse them to a single
|
||||
// underlying generation (putCount == 1).
|
||||
var wg sync.WaitGroup
|
||||
var errCount atomic.Int32
|
||||
for i := 0; i < 20; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if _, _, err := svc.Get(ctx, "iss-cache-test"); err != nil {
|
||||
errCount.Add(1)
|
||||
t.Errorf("concurrent Get: %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
if errCount.Load() != 0 {
|
||||
t.Fatalf("%d errors across concurrent Gets", errCount.Load())
|
||||
}
|
||||
if repo.putCount != 1 {
|
||||
t.Errorf("singleflight failed: putCount = %d, want 1 (20 concurrent misses must collapse)", repo.putCount)
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Error paths
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestCRLCacheService_Get_NoIssuerInRegistry_RecordsFailureEvent(t *testing.T) {
|
||||
svc, repo, _ := newCacheServiceFixture(t)
|
||||
ctx := context.Background()
|
||||
|
||||
// Issuer ID that doesn't exist in the registry → CAOperationsSvc
|
||||
// returns an error → cache service records a failure event +
|
||||
// surfaces the error to the caller.
|
||||
_, _, err := svc.Get(ctx, "iss-does-not-exist")
|
||||
if err == nil {
|
||||
t.Fatal("Get for unknown issuer should error")
|
||||
}
|
||||
events, _ := repo.ListGenerationEvents(ctx, "iss-does-not-exist", 10)
|
||||
if len(events) != 1 {
|
||||
t.Fatalf("expected 1 failure event, got %d", len(events))
|
||||
}
|
||||
if events[0].Succeeded {
|
||||
t.Error("failure event should have Succeeded=false")
|
||||
}
|
||||
if events[0].Error == "" {
|
||||
t.Error("failure event should carry an error message")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCRLCacheService_Get_NoCacheRepo_Errors(t *testing.T) {
|
||||
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
|
||||
svc := service.NewCRLCacheService(nil, nil, nil, logger)
|
||||
_, _, err := svc.Get(context.Background(), "any")
|
||||
if err == nil {
|
||||
t.Fatal("Get with nil cacheRepo should error")
|
||||
}
|
||||
}
|
||||
|
||||
// pin via interface satisfaction (compile-time check that fakeRevocationRepo
|
||||
// matches what CAOperationsSvc actually calls — guards against shape drift
|
||||
// in the repository.RevocationRepository interface).
|
||||
var _ interface {
|
||||
ListByIssuer(ctx context.Context, issuerID string) ([]*domain.CertificateRevocation, error)
|
||||
} = fakeRevocationRepo{}
|
||||
|
||||
// _ silence the unused import warning when issuer adapter machinery moves.
|
||||
var _ = issuer.IssuanceRequest{}
|
||||
Reference in New Issue
Block a user