I-003: job timeout reaper closes AwaitingCSR/AwaitingApproval gap

Add 11th always-on scheduler loop that transitions jobs stuck in
AwaitingCSR (default 24h TTL) or AwaitingApproval (default 168h TTL)
to Failed. I-001's retry loop then auto-promotes eligible Failed jobs
back to Pending. No new status enum, no schema migration.

- JobRepository.ListTimedOutAwaitingJobs with per-status cutoff WHERE
- JobService.ReapTimedOutJobs mirrors RetryFailedJobs structure
- Scheduler jobTimeoutLoop with atomic.Bool idempotency guard, 2m
  per-tick context, WaitGroup shutdown drain
- Config: CERTCTL_JOB_TIMEOUT_INTERVAL (10m), CERTCTL_JOB_AWAITING_CSR_TIMEOUT
  (24h), CERTCTL_JOB_AWAITING_APPROVAL_TIMEOUT (168h)
- Audit event per transition: actor=system, actorType=System,
  action=job_timeout, details={old_status, new_status, timeout_reason,
  age_hours}
- 14 new tests: 3 config, 7 service, 4 scheduler
This commit is contained in:
Shankar
2026-04-19 01:04:56 +00:00
parent 0d7d933e91
commit c17ea577e7
12 changed files with 1020 additions and 14 deletions
+90 -3
View File
@@ -58,6 +58,11 @@ type CloudDiscoveryServicer interface {
DiscoverAll(ctx context.Context) (int, []error)
}
// JobReaperService defines the interface for job timeout reaping used by the scheduler.
type JobReaperService interface {
ReapTimedOutJobs(ctx context.Context, csrTTL, approvalTTL time.Duration) error
}
// Scheduler manages background jobs and periodic tasks for the certificate control plane.
// It runs multiple concurrent loops for renewal checks, job processing, agent health checks,
// and notification processing.
@@ -70,6 +75,7 @@ type Scheduler struct {
digestService DigestServicer
healthCheckService HealthCheckServicer
cloudDiscoveryService CloudDiscoveryServicer
jobReaper JobReaperService
logger *slog.Logger
// Configurable tick intervals
@@ -83,6 +89,9 @@ type Scheduler struct {
digestInterval time.Duration
healthCheckInterval time.Duration
cloudDiscoveryInterval time.Duration
jobTimeoutInterval time.Duration
awaitingCSRTimeout time.Duration
awaitingApprovalTimeout time.Duration
// Idempotency guards: prevent duplicate execution of slow jobs
renewalCheckRunning atomic.Bool
@@ -95,6 +104,7 @@ type Scheduler struct {
digestRunning atomic.Bool
healthCheckRunning atomic.Bool
cloudDiscoveryRunning atomic.Bool
jobTimeoutRunning atomic.Bool
// Graceful shutdown: wait for in-flight work to complete
wg sync.WaitGroup
@@ -128,6 +138,7 @@ func NewScheduler(
digestInterval: 24 * time.Hour,
healthCheckInterval: 60 * time.Second,
cloudDiscoveryInterval: 6 * time.Hour,
jobTimeoutInterval: 10 * time.Minute,
}
}
@@ -201,6 +212,26 @@ func (s *Scheduler) SetCloudDiscoveryInterval(d time.Duration) {
s.cloudDiscoveryInterval = d
}
// SetJobReaperService sets the job reaper service (I-003).
func (s *Scheduler) SetJobReaperService(jr JobReaperService) {
s.jobReaper = jr
}
// SetJobTimeoutInterval sets the job timeout reaper tick interval (I-003).
func (s *Scheduler) SetJobTimeoutInterval(d time.Duration) {
s.jobTimeoutInterval = d
}
// SetAwaitingCSRTimeout sets the AwaitingCSR TTL (I-003).
func (s *Scheduler) SetAwaitingCSRTimeout(d time.Duration) {
s.awaitingCSRTimeout = d
}
// SetAwaitingApprovalTimeout sets the AwaitingApproval TTL (I-003).
func (s *Scheduler) SetAwaitingApprovalTimeout(d time.Duration) {
s.awaitingApprovalTimeout = d
}
// Start initiates all background scheduler loops. It returns a channel that signals
// when the scheduler has started all loops. The scheduler runs until the context is cancelled.
func (s *Scheduler) Start(ctx context.Context) <-chan struct{} {
@@ -211,10 +242,10 @@ func (s *Scheduler) Start(ctx context.Context) <-chan struct{} {
// Track all loop goroutines in the WaitGroup so WaitForCompletion
// blocks until they've fully exited (prevents test races).
// Base count is 6: renewal, job processor, job retry (I-001),
// agent health, notification, short-lived expiry. Optional loops
// Base count is 7: renewal, job processor, job retry (I-001),
// job timeout (I-003), agent health, notification, short-lived expiry. Optional loops
// (network scan, digest, health check, cloud discovery) add to this.
loopCount := 6
loopCount := 7
if s.networkScanService != nil {
loopCount++
}
@@ -232,6 +263,7 @@ func (s *Scheduler) Start(ctx context.Context) <-chan struct{} {
go func() { defer s.wg.Done(); s.renewalCheckLoop(ctx) }()
go func() { defer s.wg.Done(); s.jobProcessorLoop(ctx) }()
go func() { defer s.wg.Done(); s.jobRetryLoop(ctx) }()
go func() { defer s.wg.Done(); s.jobTimeoutLoop(ctx) }()
go func() { defer s.wg.Done(); s.agentHealthCheckLoop(ctx) }()
go func() { defer s.wg.Done(); s.notificationProcessLoop(ctx) }()
go func() { defer s.wg.Done(); s.shortLivedExpiryCheckLoop(ctx) }()
@@ -413,6 +445,61 @@ func (s *Scheduler) runJobRetry(ctx context.Context) {
}
}
// jobTimeoutLoop runs every jobTimeoutInterval and transitions jobs stuck in
// AwaitingCSR or AwaitingApproval to Failed if they exceed their TTL. I-001's
// retry loop then auto-promotes eligible Failed jobs back to Pending. Closes
// coverage gap I-003. Uses atomic.Bool to prevent duplicate execution.
func (s *Scheduler) jobTimeoutLoop(ctx context.Context) {
ticker := time.NewTicker(s.jobTimeoutInterval)
defer ticker.Stop()
// Run immediately on start (with idempotency guard)
s.jobTimeoutRunning.Store(true)
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.jobTimeoutRunning.Store(false)
s.runJobTimeout(ctx)
}()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !s.jobTimeoutRunning.CompareAndSwap(false, true) {
s.logger.Warn("job timeout reaper still running, skipping tick")
continue
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.jobTimeoutRunning.Store(false)
s.runJobTimeout(ctx)
}()
}
}
}
// runJobTimeout executes a single job timeout reaping cycle with error recovery.
// When no JobReaperService has been wired (e.g. in tests that don't exercise
// I-003) the call is a safe no-op, preserving the always-on loop topology
// described in I-003 without forcing every consumer to wire a reaper.
func (s *Scheduler) runJobTimeout(ctx context.Context) {
if s.jobReaper == nil {
return
}
opCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()
if err := s.jobReaper.ReapTimedOutJobs(opCtx, s.awaitingCSRTimeout, s.awaitingApprovalTimeout); err != nil {
s.logger.Error("job timeout reaper failed",
"error", err,
"interval", s.jobTimeoutInterval.String())
} else {
s.logger.Debug("job timeout reaper completed")
}
}
// agentHealthCheckLoop runs every agentHealthCheckInterval and marks stale agents as offline.
// An agent is considered stale if it hasn't sent a heartbeat within the health check interval.
// If an error occurs, it logs the error but continues running.
+217
View File
@@ -85,6 +85,13 @@ type mockJobService struct {
retryMaxRetriesSeen []int
retrySlowDelay time.Duration
retryShouldError bool
// Timeout reaper tracking (coverage gap I-003)
reapCallCount int
reapCallTimes []time.Time
reapSlowDelay time.Duration
reapShouldError bool
reapCtxHasDeadline bool
}
func (m *mockJobService) ProcessPendingJobs(ctx context.Context) error {
@@ -131,6 +138,33 @@ func (m *mockJobService) RetryFailedJobs(ctx context.Context, maxRetries int) er
return nil
}
// ReapTimedOutJobs is the scheduler-driven counterpart to ProcessPendingJobs that
// covers coverage gap I-003: JobService.ReapTimedOutJobs (via JobReaperService interface)
// had no runtime caller prior to the jobTimeoutLoop being wired.
func (m *mockJobService) ReapTimedOutJobs(ctx context.Context, csrTTL, approvalTTL time.Duration) error {
m.mu.Lock()
m.reapCallCount++
m.reapCallTimes = append(m.reapCallTimes, time.Now())
// Track whether context has a deadline set
_, hasDeadline := ctx.Deadline()
m.reapCtxHasDeadline = hasDeadline
m.mu.Unlock()
if m.reapSlowDelay > 0 {
select {
case <-time.After(m.reapSlowDelay):
case <-ctx.Done():
return ctx.Err()
}
}
if m.reapShouldError {
return context.Canceled
}
return nil
}
// mockAgentService is a mock implementation for testing.
type mockAgentService struct {
mu sync.Mutex
@@ -1141,3 +1175,186 @@ func TestScheduler_JobRetryLoop_WaitForCompletion(t *testing.T) {
}
t.Logf("retry loop graceful shutdown completed in %v after %d in-flight sweep(s)", elapsed, retryCount)
}
// TestScheduler_JobTimeoutLoop_NormalTick verifies that the job timeout reaper
// loop ticks at the specified interval (coverage gap I-003).
func TestScheduler_JobTimeoutLoop_NormalTick(t *testing.T) {
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
renewalMock := &mockRenewalService{}
jobMock := &mockJobService{}
agentMock := &mockAgentService{}
notificationMock := &mockNotificationService{}
networkMock := &mockNetworkScanService{}
sched := NewScheduler(renewalMock, jobMock, agentMock, notificationMock, networkMock, logger)
sched.SetRenewalCheckInterval(10 * time.Second)
sched.SetJobProcessorInterval(10 * time.Second)
sched.SetAgentHealthCheckInterval(10 * time.Second)
sched.SetNotificationProcessInterval(10 * time.Second)
sched.SetNetworkScanInterval(10 * time.Second)
sched.SetJobRetryInterval(10 * time.Second)
sched.SetJobTimeoutInterval(50 * time.Millisecond)
sched.SetAwaitingCSRTimeout(24 * time.Hour)
sched.SetAwaitingApprovalTimeout(168 * time.Hour)
sched.SetJobReaperService(jobMock)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
<-sched.Start(ctx)
time.Sleep(200 * time.Millisecond)
cancel()
if err := sched.WaitForCompletion(2 * time.Second); err != nil {
t.Fatalf("WaitForCompletion: %v", err)
}
jobMock.mu.Lock()
count := jobMock.reapCallCount
jobMock.mu.Unlock()
if count < 2 {
t.Fatalf("expected >= 2 reap calls, got %d", count)
}
}
// TestScheduler_JobTimeoutLoop_IdempotencyGuard verifies that the timeout reaper
// uses an atomic guard to prevent concurrent execution (coverage gap I-003).
func TestScheduler_JobTimeoutLoop_IdempotencyGuard(t *testing.T) {
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
renewalMock := &mockRenewalService{}
jobMock := &mockJobService{
reapSlowDelay: 150 * time.Millisecond,
}
agentMock := &mockAgentService{}
notificationMock := &mockNotificationService{}
networkMock := &mockNetworkScanService{}
sched := NewScheduler(renewalMock, jobMock, agentMock, notificationMock, networkMock, logger)
sched.SetRenewalCheckInterval(10 * time.Second)
sched.SetJobProcessorInterval(10 * time.Second)
sched.SetAgentHealthCheckInterval(10 * time.Second)
sched.SetNotificationProcessInterval(10 * time.Second)
sched.SetNetworkScanInterval(10 * time.Second)
sched.SetJobRetryInterval(10 * time.Second)
sched.SetJobTimeoutInterval(50 * time.Millisecond)
sched.SetAwaitingCSRTimeout(24 * time.Hour)
sched.SetAwaitingApprovalTimeout(168 * time.Hour)
sched.SetJobReaperService(jobMock)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
<-sched.Start(ctx)
time.Sleep(400 * time.Millisecond)
jobMock.mu.Lock()
reapCount := jobMock.reapCallCount
jobMock.mu.Unlock()
if reapCount > 3 {
t.Logf("WARNING: reap called %d times in 400ms with 50ms interval and 150ms sweep — guard may not be working", reapCount)
}
t.Logf("job timeout idempotency guard: %d calls in 400ms (50ms interval, 150ms sweep)", reapCount)
cancel()
if err := sched.WaitForCompletion(2 * time.Second); err != nil {
t.Fatalf("WaitForCompletion should succeed: %v", err)
}
}
// TestScheduler_JobTimeoutLoop_ShutdownDrainsInFlight verifies that shutdown waits
// for an in-flight timeout reaper to complete (coverage gap I-003).
func TestScheduler_JobTimeoutLoop_ShutdownDrainsInFlight(t *testing.T) {
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
renewalMock := &mockRenewalService{}
jobMock := &mockJobService{
reapSlowDelay: 100 * time.Millisecond,
}
agentMock := &mockAgentService{}
notificationMock := &mockNotificationService{}
networkMock := &mockNetworkScanService{}
sched := NewScheduler(renewalMock, jobMock, agentMock, notificationMock, networkMock, logger)
sched.SetRenewalCheckInterval(10 * time.Second)
sched.SetJobProcessorInterval(10 * time.Second)
sched.SetAgentHealthCheckInterval(10 * time.Second)
sched.SetNotificationProcessInterval(10 * time.Second)
sched.SetNetworkScanInterval(10 * time.Second)
sched.SetJobRetryInterval(10 * time.Second)
sched.SetJobTimeoutInterval(50 * time.Millisecond)
sched.SetAwaitingCSRTimeout(24 * time.Hour)
sched.SetAwaitingApprovalTimeout(168 * time.Hour)
sched.SetJobReaperService(jobMock)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
<-sched.Start(ctx)
// Let the immediate-start timeout reaper goroutine begin its 100ms sweep.
time.Sleep(30 * time.Millisecond)
// Initiate shutdown mid-sweep.
cancel()
start := time.Now()
err := sched.WaitForCompletion(5 * time.Second)
elapsed := time.Since(start)
if err != nil {
t.Fatalf("WaitForCompletion should not error: %v", err)
}
if elapsed > 5*time.Second {
t.Fatalf("WaitForCompletion took longer than expected: %v", elapsed)
}
jobMock.mu.Lock()
reapCount := jobMock.reapCallCount
jobMock.mu.Unlock()
if reapCount < 1 {
t.Fatalf("expected timeout reaper to have started at least once before shutdown, got %d", reapCount)
}
t.Logf("timeout reaper graceful shutdown completed in %v after %d in-flight sweep(s)", elapsed, reapCount)
}
// TestScheduler_JobTimeoutLoop_ContextDeadlineRespected verifies that the timeout
// reaper receives a context with a deadline set for each tick (coverage gap I-003).
func TestScheduler_JobTimeoutLoop_ContextDeadlineRespected(t *testing.T) {
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
renewalMock := &mockRenewalService{}
jobMock := &mockJobService{}
agentMock := &mockAgentService{}
notificationMock := &mockNotificationService{}
networkMock := &mockNetworkScanService{}
sched := NewScheduler(renewalMock, jobMock, agentMock, notificationMock, networkMock, logger)
sched.SetRenewalCheckInterval(10 * time.Second)
sched.SetJobProcessorInterval(10 * time.Second)
sched.SetAgentHealthCheckInterval(10 * time.Second)
sched.SetNotificationProcessInterval(10 * time.Second)
sched.SetNetworkScanInterval(10 * time.Second)
sched.SetJobRetryInterval(10 * time.Second)
sched.SetJobTimeoutInterval(50 * time.Millisecond)
sched.SetAwaitingCSRTimeout(24 * time.Hour)
sched.SetAwaitingApprovalTimeout(168 * time.Hour)
sched.SetJobReaperService(jobMock)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
<-sched.Start(ctx)
time.Sleep(100 * time.Millisecond)
cancel()
if err := sched.WaitForCompletion(2 * time.Second); err != nil {
t.Fatalf("WaitForCompletion: %v", err)
}
jobMock.mu.Lock()
hasDeadline := jobMock.reapCtxHasDeadline
jobMock.mu.Unlock()
if !hasDeadline {
t.Fatal("expected timeout reaper context to have a deadline set, but none found")
}
t.Log("timeout reaper context deadline verified")
}