Files
certctl/internal/scheduler/scheduler.go
T
shankar0123 a579a84c7f feat: M11a — certificate profiles, crypto policy enforcement, short-lived cert expiry
Add certificate profiles as named enrollment templates that control allowed
key algorithms, max TTL, permitted EKUs, required SAN patterns, and optional
SPIFFE URI SANs. CSR submissions are validated against profile rules at
signing time (key type + minimum size). Short-lived certs (TTL < 1 hour)
auto-expire via a new scheduler loop — expiry acts as revocation, no
CRL/OCSP needed.

New files:
- Migration 000003: certificate_profiles table, FK columns on
  managed_certificates/renewal_policies, key metadata on certificate_versions
- domain/profile.go: CertificateProfile + KeyAlgorithmRule structs
- repository/postgres/profile.go: full CRUD with JSONB marshaling
- service/profile.go: ProfileService with validation + audit logging
- service/crypto_validation.go: CSR-against-profile validation (RSA/ECDSA/Ed25519)
- handler/profiles.go: 5 HTTP endpoints under /api/v1/profiles
- web/src/pages/ProfilesPage.tsx: profiles management page

Modified:
- renewal.go: CSR validation in CompleteAgentCSRRenewal, ExpireShortLivedCertificates
- scheduler.go: 30s short-lived expiry check loop
- certificate.go (repo): nullable profile FK, key metadata on versions
- main.go: profile repo/service/handler wiring, 8-param NewRenewalService
- router.go: 12-param RegisterHandlers with profile routes
- seed_demo.sql: 4 demo profiles (standard, mtls, short-lived, high-security)
- Frontend: types, API client, routing, sidebar nav

Tests: 40 new tests across handler (15), service (13), crypto validation (12)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-20 20:39:49 -04:00

261 lines
7.8 KiB
Go

package scheduler
import (
"context"
"log/slog"
"time"
"github.com/shankar0123/certctl/internal/service"
)
// Scheduler manages background jobs and periodic tasks for the certificate control plane.
// It runs multiple concurrent loops for renewal checks, job processing, agent health checks,
// and notification processing.
type Scheduler struct {
renewalService *service.RenewalService
jobService *service.JobService
agentService *service.AgentService
notificationService *service.NotificationService
logger *slog.Logger
// Configurable tick intervals
renewalCheckInterval time.Duration
jobProcessorInterval time.Duration
agentHealthCheckInterval time.Duration
notificationProcessInterval time.Duration
shortLivedExpiryCheckInterval time.Duration
}
// NewScheduler creates a new scheduler with configurable intervals.
func NewScheduler(
renewalService *service.RenewalService,
jobService *service.JobService,
agentService *service.AgentService,
notificationService *service.NotificationService,
logger *slog.Logger,
) *Scheduler {
return &Scheduler{
renewalService: renewalService,
jobService: jobService,
agentService: agentService,
notificationService: notificationService,
logger: logger,
// Default intervals
renewalCheckInterval: 1 * time.Hour,
jobProcessorInterval: 30 * time.Second,
agentHealthCheckInterval: 2 * time.Minute,
notificationProcessInterval: 1 * time.Minute,
shortLivedExpiryCheckInterval: 30 * time.Second,
}
}
// SetRenewalCheckInterval configures the interval for renewal checks.
func (s *Scheduler) SetRenewalCheckInterval(d time.Duration) {
s.renewalCheckInterval = d
}
// SetJobProcessorInterval configures the interval for job processing.
func (s *Scheduler) SetJobProcessorInterval(d time.Duration) {
s.jobProcessorInterval = d
}
// SetAgentHealthCheckInterval configures the interval for agent health checks.
func (s *Scheduler) SetAgentHealthCheckInterval(d time.Duration) {
s.agentHealthCheckInterval = d
}
// SetNotificationProcessInterval configures the interval for notification processing.
func (s *Scheduler) SetNotificationProcessInterval(d time.Duration) {
s.notificationProcessInterval = d
}
// Start initiates all background scheduler loops. It returns a channel that signals
// when the scheduler has started all loops. The scheduler runs until the context is cancelled.
func (s *Scheduler) Start(ctx context.Context) <-chan struct{} {
startedChan := make(chan struct{})
go func() {
s.logger.Info("scheduler starting")
// Signal that the scheduler has started all loops
go func() {
<-time.After(100 * time.Millisecond)
close(startedChan)
}()
// Start all scheduler loops concurrently
go s.renewalCheckLoop(ctx)
go s.jobProcessorLoop(ctx)
go s.agentHealthCheckLoop(ctx)
go s.notificationProcessLoop(ctx)
go s.shortLivedExpiryCheckLoop(ctx)
// Wait for context cancellation
<-ctx.Done()
s.logger.Info("scheduler shutting down", "reason", ctx.Err())
}()
return startedChan
}
// renewalCheckLoop runs every renewalCheckInterval and checks for expiring certificates.
// If an error occurs, it logs the error but continues running.
func (s *Scheduler) renewalCheckLoop(ctx context.Context) {
ticker := time.NewTicker(s.renewalCheckInterval)
defer ticker.Stop()
// Run immediately on start
s.runRenewalCheck(ctx)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.runRenewalCheck(ctx)
}
}
}
// runRenewalCheck executes a single renewal check with error recovery.
func (s *Scheduler) runRenewalCheck(ctx context.Context) {
opCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
if err := s.renewalService.CheckExpiringCertificates(opCtx); err != nil {
s.logger.Error("renewal check failed",
"error", err,
"interval", s.renewalCheckInterval.String())
} else {
s.logger.Debug("renewal check completed")
}
}
// jobProcessorLoop runs every jobProcessorInterval and processes pending jobs.
// It picks up pending jobs, executes them, and handles the results.
// If an error occurs, it logs the error but continues running.
func (s *Scheduler) jobProcessorLoop(ctx context.Context) {
ticker := time.NewTicker(s.jobProcessorInterval)
defer ticker.Stop()
// Run immediately on start
s.runJobProcessor(ctx)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.runJobProcessor(ctx)
}
}
}
// runJobProcessor executes a single job processing cycle with error recovery.
func (s *Scheduler) runJobProcessor(ctx context.Context) {
opCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()
if err := s.jobService.ProcessPendingJobs(opCtx); err != nil {
s.logger.Error("job processor failed",
"error", err,
"interval", s.jobProcessorInterval.String())
} else {
s.logger.Debug("job processor completed")
}
}
// agentHealthCheckLoop runs every agentHealthCheckInterval and marks stale agents as offline.
// An agent is considered stale if it hasn't sent a heartbeat within the health check interval.
// If an error occurs, it logs the error but continues running.
func (s *Scheduler) agentHealthCheckLoop(ctx context.Context) {
ticker := time.NewTicker(s.agentHealthCheckInterval)
defer ticker.Stop()
// Run immediately on start
s.runAgentHealthCheck(ctx)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.runAgentHealthCheck(ctx)
}
}
}
// runAgentHealthCheck executes a single agent health check with error recovery.
func (s *Scheduler) runAgentHealthCheck(ctx context.Context) {
opCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
if err := s.agentService.MarkStaleAgentsOffline(opCtx, s.agentHealthCheckInterval); err != nil {
s.logger.Error("agent health check failed",
"error", err,
"interval", s.agentHealthCheckInterval.String())
} else {
s.logger.Debug("agent health check completed")
}
}
// notificationProcessLoop runs every notificationProcessInterval and processes pending notifications.
// If an error occurs, it logs the error but continues running.
func (s *Scheduler) notificationProcessLoop(ctx context.Context) {
ticker := time.NewTicker(s.notificationProcessInterval)
defer ticker.Stop()
// Run immediately on start
s.runNotificationProcess(ctx)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.runNotificationProcess(ctx)
}
}
}
// runNotificationProcess executes a single notification processing cycle with error recovery.
func (s *Scheduler) runNotificationProcess(ctx context.Context) {
opCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
if err := s.notificationService.ProcessPendingNotifications(opCtx); err != nil {
s.logger.Error("notification processor failed",
"error", err,
"interval", s.notificationProcessInterval.String())
} else {
s.logger.Debug("notification processor completed")
}
}
// shortLivedExpiryCheckLoop runs every shortLivedExpiryCheckInterval and marks expired
// short-lived certificates. For certs with TTL < 1 hour, expiry IS revocation —
// no CRL/OCSP needed.
func (s *Scheduler) shortLivedExpiryCheckLoop(ctx context.Context) {
ticker := time.NewTicker(s.shortLivedExpiryCheckInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.runShortLivedExpiryCheck(ctx)
}
}
}
// runShortLivedExpiryCheck executes a single short-lived expiry check with error recovery.
func (s *Scheduler) runShortLivedExpiryCheck(ctx context.Context) {
opCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
if err := s.renewalService.ExpireShortLivedCertificates(opCtx); err != nil {
s.logger.Error("short-lived expiry check failed",
"error", err,
"interval", s.shortLivedExpiryCheckInterval.String())
} else {
s.logger.Debug("short-lived expiry check completed")
}
}