mirror of
https://github.com/shankar0123/certctl.git
synced 2026-06-07 14:11:31 +00:00
4f90be9311
M21 adds server-side active TLS scanning of CIDR ranges with concurrent probing, sentinel agent pattern for pipeline reuse, and full CRUD API for scan targets. M22 adds Prometheus exposition format endpoint alongside existing JSON metrics. Comprehensive documentation audit updates all docs to reflect 91 endpoints, 19 tables, 6 scheduler loops, and 900+ tests. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
306 lines
9.1 KiB
Go
306 lines
9.1 KiB
Go
package scheduler
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"github.com/shankar0123/certctl/internal/service"
|
|
)
|
|
|
|
// Scheduler manages background jobs and periodic tasks for the certificate control plane.
|
|
// It runs multiple concurrent loops for renewal checks, job processing, agent health checks,
|
|
// and notification processing.
|
|
type Scheduler struct {
|
|
renewalService *service.RenewalService
|
|
jobService *service.JobService
|
|
agentService *service.AgentService
|
|
notificationService *service.NotificationService
|
|
networkScanService *service.NetworkScanService
|
|
logger *slog.Logger
|
|
|
|
// Configurable tick intervals
|
|
renewalCheckInterval time.Duration
|
|
jobProcessorInterval time.Duration
|
|
agentHealthCheckInterval time.Duration
|
|
notificationProcessInterval time.Duration
|
|
shortLivedExpiryCheckInterval time.Duration
|
|
networkScanInterval time.Duration
|
|
}
|
|
|
|
// NewScheduler creates a new scheduler with configurable intervals.
|
|
func NewScheduler(
|
|
renewalService *service.RenewalService,
|
|
jobService *service.JobService,
|
|
agentService *service.AgentService,
|
|
notificationService *service.NotificationService,
|
|
networkScanService *service.NetworkScanService,
|
|
logger *slog.Logger,
|
|
) *Scheduler {
|
|
return &Scheduler{
|
|
renewalService: renewalService,
|
|
jobService: jobService,
|
|
agentService: agentService,
|
|
notificationService: notificationService,
|
|
networkScanService: networkScanService,
|
|
logger: logger,
|
|
|
|
// Default intervals
|
|
renewalCheckInterval: 1 * time.Hour,
|
|
jobProcessorInterval: 30 * time.Second,
|
|
agentHealthCheckInterval: 2 * time.Minute,
|
|
notificationProcessInterval: 1 * time.Minute,
|
|
shortLivedExpiryCheckInterval: 30 * time.Second,
|
|
networkScanInterval: 6 * time.Hour,
|
|
}
|
|
}
|
|
|
|
// SetRenewalCheckInterval configures the interval for renewal checks.
|
|
func (s *Scheduler) SetRenewalCheckInterval(d time.Duration) {
|
|
s.renewalCheckInterval = d
|
|
}
|
|
|
|
// SetJobProcessorInterval configures the interval for job processing.
|
|
func (s *Scheduler) SetJobProcessorInterval(d time.Duration) {
|
|
s.jobProcessorInterval = d
|
|
}
|
|
|
|
// SetAgentHealthCheckInterval configures the interval for agent health checks.
|
|
func (s *Scheduler) SetAgentHealthCheckInterval(d time.Duration) {
|
|
s.agentHealthCheckInterval = d
|
|
}
|
|
|
|
// SetNotificationProcessInterval configures the interval for notification processing.
|
|
func (s *Scheduler) SetNotificationProcessInterval(d time.Duration) {
|
|
s.notificationProcessInterval = d
|
|
}
|
|
|
|
// SetNetworkScanInterval configures the interval for network scanning.
|
|
func (s *Scheduler) SetNetworkScanInterval(d time.Duration) {
|
|
s.networkScanInterval = d
|
|
}
|
|
|
|
// Start initiates all background scheduler loops. It returns a channel that signals
|
|
// when the scheduler has started all loops. The scheduler runs until the context is cancelled.
|
|
func (s *Scheduler) Start(ctx context.Context) <-chan struct{} {
|
|
startedChan := make(chan struct{})
|
|
|
|
go func() {
|
|
s.logger.Info("scheduler starting")
|
|
|
|
// Signal that the scheduler has started all loops
|
|
go func() {
|
|
<-time.After(100 * time.Millisecond)
|
|
close(startedChan)
|
|
}()
|
|
|
|
// Start all scheduler loops concurrently
|
|
go s.renewalCheckLoop(ctx)
|
|
go s.jobProcessorLoop(ctx)
|
|
go s.agentHealthCheckLoop(ctx)
|
|
go s.notificationProcessLoop(ctx)
|
|
go s.shortLivedExpiryCheckLoop(ctx)
|
|
if s.networkScanService != nil {
|
|
go s.networkScanLoop(ctx)
|
|
}
|
|
|
|
// Wait for context cancellation
|
|
<-ctx.Done()
|
|
s.logger.Info("scheduler shutting down", "reason", ctx.Err())
|
|
}()
|
|
|
|
return startedChan
|
|
}
|
|
|
|
// renewalCheckLoop runs every renewalCheckInterval and checks for expiring certificates.
|
|
// If an error occurs, it logs the error but continues running.
|
|
func (s *Scheduler) renewalCheckLoop(ctx context.Context) {
|
|
ticker := time.NewTicker(s.renewalCheckInterval)
|
|
defer ticker.Stop()
|
|
|
|
// Run immediately on start
|
|
s.runRenewalCheck(ctx)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
s.runRenewalCheck(ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
// runRenewalCheck executes a single renewal check with error recovery.
|
|
func (s *Scheduler) runRenewalCheck(ctx context.Context) {
|
|
opCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
|
|
defer cancel()
|
|
if err := s.renewalService.CheckExpiringCertificates(opCtx); err != nil {
|
|
s.logger.Error("renewal check failed",
|
|
"error", err,
|
|
"interval", s.renewalCheckInterval.String())
|
|
} else {
|
|
s.logger.Debug("renewal check completed")
|
|
}
|
|
}
|
|
|
|
// jobProcessorLoop runs every jobProcessorInterval and processes pending jobs.
|
|
// It picks up pending jobs, executes them, and handles the results.
|
|
// If an error occurs, it logs the error but continues running.
|
|
func (s *Scheduler) jobProcessorLoop(ctx context.Context) {
|
|
ticker := time.NewTicker(s.jobProcessorInterval)
|
|
defer ticker.Stop()
|
|
|
|
// Run immediately on start
|
|
s.runJobProcessor(ctx)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
s.runJobProcessor(ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
// runJobProcessor executes a single job processing cycle with error recovery.
|
|
func (s *Scheduler) runJobProcessor(ctx context.Context) {
|
|
opCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
|
|
defer cancel()
|
|
if err := s.jobService.ProcessPendingJobs(opCtx); err != nil {
|
|
s.logger.Error("job processor failed",
|
|
"error", err,
|
|
"interval", s.jobProcessorInterval.String())
|
|
} else {
|
|
s.logger.Debug("job processor completed")
|
|
}
|
|
}
|
|
|
|
// agentHealthCheckLoop runs every agentHealthCheckInterval and marks stale agents as offline.
|
|
// An agent is considered stale if it hasn't sent a heartbeat within the health check interval.
|
|
// If an error occurs, it logs the error but continues running.
|
|
func (s *Scheduler) agentHealthCheckLoop(ctx context.Context) {
|
|
ticker := time.NewTicker(s.agentHealthCheckInterval)
|
|
defer ticker.Stop()
|
|
|
|
// Run immediately on start
|
|
s.runAgentHealthCheck(ctx)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
s.runAgentHealthCheck(ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
// runAgentHealthCheck executes a single agent health check with error recovery.
|
|
func (s *Scheduler) runAgentHealthCheck(ctx context.Context) {
|
|
opCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
|
|
defer cancel()
|
|
if err := s.agentService.MarkStaleAgentsOffline(opCtx, s.agentHealthCheckInterval); err != nil {
|
|
s.logger.Error("agent health check failed",
|
|
"error", err,
|
|
"interval", s.agentHealthCheckInterval.String())
|
|
} else {
|
|
s.logger.Debug("agent health check completed")
|
|
}
|
|
}
|
|
|
|
// notificationProcessLoop runs every notificationProcessInterval and processes pending notifications.
|
|
// If an error occurs, it logs the error but continues running.
|
|
func (s *Scheduler) notificationProcessLoop(ctx context.Context) {
|
|
ticker := time.NewTicker(s.notificationProcessInterval)
|
|
defer ticker.Stop()
|
|
|
|
// Run immediately on start
|
|
s.runNotificationProcess(ctx)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
s.runNotificationProcess(ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
// runNotificationProcess executes a single notification processing cycle with error recovery.
|
|
func (s *Scheduler) runNotificationProcess(ctx context.Context) {
|
|
opCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
|
|
defer cancel()
|
|
if err := s.notificationService.ProcessPendingNotifications(opCtx); err != nil {
|
|
s.logger.Error("notification processor failed",
|
|
"error", err,
|
|
"interval", s.notificationProcessInterval.String())
|
|
} else {
|
|
s.logger.Debug("notification processor completed")
|
|
}
|
|
}
|
|
|
|
// shortLivedExpiryCheckLoop runs every shortLivedExpiryCheckInterval and marks expired
|
|
// short-lived certificates. For certs with TTL < 1 hour, expiry IS revocation —
|
|
// no CRL/OCSP needed.
|
|
func (s *Scheduler) shortLivedExpiryCheckLoop(ctx context.Context) {
|
|
ticker := time.NewTicker(s.shortLivedExpiryCheckInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
s.runShortLivedExpiryCheck(ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
// runShortLivedExpiryCheck executes a single short-lived expiry check with error recovery.
|
|
func (s *Scheduler) runShortLivedExpiryCheck(ctx context.Context) {
|
|
opCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
|
defer cancel()
|
|
if err := s.renewalService.ExpireShortLivedCertificates(opCtx); err != nil {
|
|
s.logger.Error("short-lived expiry check failed",
|
|
"error", err,
|
|
"interval", s.shortLivedExpiryCheckInterval.String())
|
|
} else {
|
|
s.logger.Debug("short-lived expiry check completed")
|
|
}
|
|
}
|
|
|
|
// networkScanLoop runs every networkScanInterval and performs active TLS scanning
|
|
// of configured network targets.
|
|
func (s *Scheduler) networkScanLoop(ctx context.Context) {
|
|
ticker := time.NewTicker(s.networkScanInterval)
|
|
defer ticker.Stop()
|
|
|
|
// Run immediately on start
|
|
s.runNetworkScan(ctx)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
s.runNetworkScan(ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
// runNetworkScan executes a single network scan cycle with error recovery.
|
|
func (s *Scheduler) runNetworkScan(ctx context.Context) {
|
|
opCtx, cancel := context.WithTimeout(ctx, 30*time.Minute)
|
|
defer cancel()
|
|
if err := s.networkScanService.ScanAllTargets(opCtx); err != nil {
|
|
s.logger.Error("network scan failed",
|
|
"error", err,
|
|
"interval", s.networkScanInterval.String())
|
|
} else {
|
|
s.logger.Debug("network scan completed")
|
|
}
|
|
}
|