I-003: job timeout reaper closes AwaitingCSR/AwaitingApproval gap

Add 11th always-on scheduler loop that transitions jobs stuck in
AwaitingCSR (default 24h TTL) or AwaitingApproval (default 168h TTL)
to Failed. I-001's retry loop then auto-promotes eligible Failed jobs
back to Pending. No new status enum, no schema migration.

- JobRepository.ListTimedOutAwaitingJobs with per-status cutoff WHERE
- JobService.ReapTimedOutJobs mirrors RetryFailedJobs structure
- Scheduler jobTimeoutLoop with atomic.Bool idempotency guard, 2m
  per-tick context, WaitGroup shutdown drain
- Config: CERTCTL_JOB_TIMEOUT_INTERVAL (10m), CERTCTL_JOB_AWAITING_CSR_TIMEOUT
  (24h), CERTCTL_JOB_AWAITING_APPROVAL_TIMEOUT (168h)
- Audit event per transition: actor=system, actorType=System,
  action=job_timeout, details={old_status, new_status, timeout_reason,
  age_hours}
- 14 new tests: 3 config, 7 service, 4 scheduler
This commit is contained in:
shankar0123
2026-04-19 01:04:56 +00:00
parent 4bc8b3e723
commit 1ee77c89f8
12 changed files with 1020 additions and 14 deletions
+76
View File
@@ -1,6 +1,7 @@
package service
import (
"time"
"context"
"errors"
"fmt"
@@ -236,6 +237,81 @@ func (s *JobService) RetryFailedJobs(ctx context.Context, maxRetries int) error
return nil
}
// ReapTimedOutJobs transitions jobs stuck in AwaitingCSR or AwaitingApproval
// to Failed if they've exceeded their TTL. I-001's retry loop then auto-promotes
// eligible Failed jobs back to Pending (closes coverage gap I-003).
func (s *JobService) ReapTimedOutJobs(ctx context.Context, csrTTL, approvalTTL time.Duration) error {
s.logger.Debug("reaping timed-out jobs", "csr_ttl", csrTTL, "approval_ttl", approvalTTL)
now := time.Now()
csrCutoff := now.Add(-csrTTL)
approvalCutoff := now.Add(-approvalTTL)
timedOutJobs, err := s.jobRepo.ListTimedOutAwaitingJobs(ctx, csrCutoff, approvalCutoff)
if err != nil {
return fmt.Errorf("failed to fetch timed-out jobs: %w", err)
}
var reaped int
for _, job := range timedOutJobs {
oldStatus := job.Status
var (
newErrMsg string
reason string
ttl time.Duration
)
switch job.Status {
case domain.JobStatusAwaitingCSR:
ttl = csrTTL
reason = "csr_timeout"
newErrMsg = fmt.Sprintf("timed out in %s after %s", oldStatus, csrTTL)
case domain.JobStatusAwaitingApproval:
ttl = approvalTTL
reason = "approval_timeout"
newErrMsg = fmt.Sprintf("timed out in %s after %s", oldStatus, approvalTTL)
default:
continue
}
_ = ttl
job.Status = domain.JobStatusFailed
job.LastError = &newErrMsg
if err := s.jobRepo.Update(ctx, job); err != nil {
s.logger.Error("failed to transition timed-out job",
"job_id", job.ID,
"old_status", oldStatus,
"error", err)
continue
}
if s.auditService != nil {
ageHours := time.Since(job.CreatedAt).Hours()
if auditErr := s.auditService.RecordEvent(ctx, "system", domain.ActorTypeSystem,
"job_timeout", "job", job.ID,
map[string]interface{}{
"old_status": string(oldStatus),
"new_status": string(domain.JobStatusFailed),
"timeout_reason": reason,
"age_hours": ageHours,
}); auditErr != nil {
s.logger.Error("failed to record job timeout audit event",
"job_id", job.ID,
"error", auditErr)
}
}
reaped++
}
s.logger.Info("job timeout reaper completed",
"reaped", reaped,
"total_timed_out", len(timedOutJobs))
return nil
}
// GetJobStatus returns the current status of a job.
func (s *JobService) GetJobStatus(ctx context.Context, jobID string) (*domain.Job, error) {
job, err := s.jobRepo.Get(ctx, jobID)