// Copyright 2026 certctl LLC. All rights reserved. // SPDX-License-Identifier: BUSL-1.1 package service import ( "context" "errors" "fmt" "log/slog" "strings" "sync" "sync/atomic" "time" "golang.org/x/sync/semaphore" "github.com/certctl-io/certctl/internal/domain" "github.com/certctl-io/certctl/internal/repository" ) // ErrSelfApproval is returned by ApproveJob when the actor attempting to // approve a renewal job is the same person listed as the owner of the // underlying certificate. M-003 enforces separation of duties: the owner who // requested (or benefits from) the renewal must not be the same identity that // approves it. Handlers map this sentinel to HTTP 403 Forbidden. var ErrSelfApproval = errors.New("self-approval forbidden: actor is the owner of the certificate") // JobService manages job processing and status tracking. // It coordinates between the scheduler and various job-specific services. type JobService struct { jobRepo repository.JobRepository certRepo repository.CertificateRepository ownerRepo repository.OwnerRepository renewalService *RenewalService deploymentService *DeploymentService auditService *AuditService logger *slog.Logger // renewalConcurrency caps the number of concurrent goroutines that // ProcessPendingJobs spawns. 0 (zero-value) means "sequential" so // existing test wiring that constructs JobService directly via // NewJobService keeps its strict-serial behaviour. Production // wiring calls SetRenewalConcurrency(cfg.Scheduler.RenewalConcurrency) // to switch on the bounded fan-out. Audit fix #9. renewalConcurrency int // claimLimit caps the number of Pending rows ProcessPendingJobs // claims in a single tick via ClaimPendingJobs. 0 (zero-value) // preserves the legacy "unlimited" semantics so existing test // wiring keeps its byte-for-byte behaviour. Production wiring // calls SetClaimLimit(cfg.Scheduler.JobClaimLimit) (default 1000). // SCALE-001 closure (Sprint 2, 2026-05-16). 
claimLimit int } // NewJobService creates a new job service. // // certRepo and ownerRepo are required for the M-003 not-self-approval check // in ApproveJob. Callers may pass nil for either to disable the check // (useful for tests that don't exercise the approval path); when nil, the // service logs a warning on the first approval attempt and permits the // transition. Production wiring must supply both. func NewJobService( jobRepo repository.JobRepository, certRepo repository.CertificateRepository, ownerRepo repository.OwnerRepository, renewalService *RenewalService, deploymentService *DeploymentService, logger *slog.Logger, ) *JobService { return &JobService{ jobRepo: jobRepo, certRepo: certRepo, ownerRepo: ownerRepo, renewalService: renewalService, deploymentService: deploymentService, logger: logger, } } // SetRenewalConcurrency wires the per-tick fan-out concurrency cap that // ProcessPendingJobs uses. Called from cmd/server/main.go with // cfg.Scheduler.RenewalConcurrency (default 25). Values ≤ 0 are // normalised to 1 — fail-safe to sequential rather than panicking on // semaphore.NewWeighted(0) which constructs a semaphore that blocks // every Acquire. // // Audit fix #9: bounded scheduler concurrency. Pre-fix // ProcessPendingJobs ran every claimed job sequentially in a single // goroutine (slow but safe); operators with large fleets needed a // performance lever, but switching to fire-and-forget per-job // goroutines would have unbounded the upstream-CA call rate and // tripped DigiCert / Entrust / Sectigo rate limits. The cap gives // the operator a knob (CERTCTL_RENEWAL_CONCURRENCY) to dial // throughput up without losing the rate-limit headroom. func (s *JobService) SetRenewalConcurrency(n int) { if n <= 0 { n = 1 } s.renewalConcurrency = n } // SetClaimLimit wires the per-tick cap on the number of Pending rows // ProcessPendingJobs claims from the JobRepository. Production wiring // passes cfg.Scheduler.JobClaimLimit (default 1000). 
Values ≤ 0 fall // back to 1000 — fail-safe rather than reverting to the legacy // unlimited semantics that SCALE-001 closed. // // Test wiring that constructs JobService via NewJobService and never // calls SetClaimLimit retains the historical limit:0 ClaimPendingJobs // invocation (the JobService.claimLimit zero-value), preserving // byte-for-byte unit-test behaviour. Repository-level tests that // exercise the LIMIT clause specifically pass a non-zero limit // directly to ClaimPendingJobs and don't go through this seam. // // SCALE-001 closure (Sprint 2, 2026-05-16). func (s *JobService) SetClaimLimit(n int) { if n <= 0 { n = 1000 } s.claimLimit = n } // SetAuditService wires an optional audit service for emitting lifecycle // events (e.g., scheduler-driven job_retry transitions recorded by // RetryFailedJobs). Construction keeps the audit dependency optional so // bootstrap/test wiring that doesn't exercise the retry path can omit it; // production wiring in cmd/server/main.go should always call this. func (s *JobService) SetAuditService(a *AuditService) { s.auditService = a } // ProcessPendingJobs fetches and processes all pending jobs. // It routes jobs to the appropriate service based on job type and handles errors gracefully. // // Concurrency (H-6 CWE-362): jobs are claimed via ClaimPendingJobs which uses // SELECT ... FOR UPDATE SKIP LOCKED and flips Pending → Running atomically. Concurrent // scheduler replicas in HA deployments will therefore never observe the same Pending row, // and the subsequent UpdateStatus(Running) calls inside the downstream service methods are // idempotent against the pre-flipped state. func (s *JobService) ProcessPendingJobs(ctx context.Context) error { // Claim pending jobs atomically (H-6 remediation: was ListByStatus which had no row lock). // Empty jobType matches all types; claimLimit caps the per-tick claim // (SCALE-001 closure — pre-fix limit:0 meant unlimited, which page- // thrashed on 100K-job bursts). 
Zero claimLimit preserves legacy // unlimited semantics for test wiring that hasn't called SetClaimLimit. pendingJobs, err := s.jobRepo.ClaimPendingJobs(ctx, "", s.claimLimit) if err != nil { return fmt.Errorf("failed to claim pending jobs: %w", err) } if len(pendingJobs) == 0 { s.logger.Debug("no pending jobs to process") return nil } // SCALE-001: emit the per-tick claim count so operators can spot // the cap engaging (when count == claimLimit, the queue is // running ahead of the fan-out and the operator may want to bump // CERTCTL_SCHEDULER_JOB_CLAIM_LIMIT or CERTCTL_RENEWAL_CONCURRENCY). s.logger.Info("processing pending jobs", "count", len(pendingJobs), "claim_limit", s.claimLimit) // Audit fix #9: bounded concurrent fan-out. When renewalConcurrency // is the zero value (caller never set it), fall through to the // historical strict-sequential loop so every existing test path // keeps its byte-for-byte behaviour. Production wiring in // cmd/server/main.go always calls SetRenewalConcurrency with the // configured cap (default 25), so the bounded path is always taken // in real deployments. if s.renewalConcurrency <= 0 { return s.processPendingJobsSequential(ctx, pendingJobs) } return s.processPendingJobsConcurrent(ctx, pendingJobs, s.renewalConcurrency) } // processPendingJobsSequential is the legacy strict-serial fan-out // (preserved for unit-test wiring that constructs JobService via // NewJobService and never calls SetRenewalConcurrency). One job at a // time, no concurrency. 
func (s *JobService) processPendingJobsSequential(ctx context.Context, pendingJobs []*domain.Job) error { var processedCount int var failedCount int for _, job := range pendingJobs { if shouldSkipJob(job) { s.logger.Debug("skipping agent-routed deployment job", "job_id", job.ID, "agent_id", *job.AgentID) continue } if err := s.processJob(ctx, job); err != nil { s.logger.Error("failed to process job", "job_id", job.ID, "job_type", job.Type, "error", err) failedCount++ continue } processedCount++ } s.logger.Info("job processing completed", "processed", processedCount, "failed", failedCount, "total", len(pendingJobs)) return nil } // processPendingJobsConcurrent is the production entry point for the // bounded fan-out — it pins the per-job work function to s.processJob. // Test code at internal/service/job_concurrency_test.go drives // boundedFanOut directly with a counter-tracking stub so the // concurrency cap can be asserted without standing up the full // renewal/deployment service graph. func (s *JobService) processPendingJobsConcurrent(ctx context.Context, pendingJobs []*domain.Job, capN int) error { return boundedFanOut(ctx, pendingJobs, capN, s.processJob, s.logger) } // boundedFanOut runs `work` over `pendingJobs` with at most `capN` // goroutines in flight at any moment, using // golang.org/x/sync/semaphore.Weighted for the gate. The Acquire(ctx, // 1) call BLOCKS the loop when at the cap, providing real backpressure // rather than fire-and-forget — the scheduler tick can't outrun the // upstream-CA rate limit. ctx cancellation propagates through Acquire // so a context.Done() interrupts the fan-out promptly; in-flight // goroutines drain via Wait before the function returns, so no // goroutine outlives the scheduler tick. // // processed / failed are tracked via atomic.Int64 to avoid mutex // overhead on every job completion; the final log line reads both // after Wait, so the values reflect every dispatched job. 
// // Audit fix #9: the cap is a load-bearing pre-condition for "operators // with permissive upstream limits and large fleets can scale up the // renewal sweep without losing rate-limit headroom." func boundedFanOut(ctx context.Context, pendingJobs []*domain.Job, capN int, work func(context.Context, *domain.Job) error, logger *slog.Logger) error { var processed, failed atomic.Int64 sem := semaphore.NewWeighted(int64(capN)) var wg sync.WaitGroup for _, job := range pendingJobs { if shouldSkipJob(job) { logger.Debug("skipping agent-routed deployment job", "job_id", job.ID, "agent_id", *job.AgentID) continue } // Acquire BEFORE launching so the offered load to upstream // CAs respects the cap regardless of how fast individual // jobs complete. if err := sem.Acquire(ctx, 1); err != nil { dispatched := processed.Load() + failed.Load() logger.Warn("renewal fan-out cancelled mid-tick", "reason", err, "jobs_dispatched", dispatched, "jobs_remaining", int64(len(pendingJobs))-dispatched) break } wg.Add(1) go func(j *domain.Job) { defer wg.Done() defer sem.Release(1) if err := work(ctx, j); err != nil { logger.Error("failed to process job", "job_id", j.ID, "job_type", j.Type, "error", err) failed.Add(1) return } processed.Add(1) }(job) } wg.Wait() logger.Info("job processing completed", "processed", processed.Load(), "failed", failed.Load(), "total", len(pendingJobs), "concurrency_cap", capN) return nil } // shouldSkipJob returns true for deployment jobs that have an agent_id — // those are meant for agent pickup via GetPendingWork(), not server-side // processing. The server should only process deployment jobs without an // agent (legacy/serverless targets). func shouldSkipJob(job *domain.Job) bool { return job.Type == domain.JobTypeDeployment && job.AgentID != nil && *job.AgentID != "" } // processJob routes a single job to the appropriate service based on type. 
func (s *JobService) processJob(ctx context.Context, job *domain.Job) error { s.logger.Debug("processing job", "job_id", job.ID, "job_type", job.Type, "certificate_id", job.CertificateID) switch job.Type { case domain.JobTypeRenewal: return s.renewalService.ProcessRenewalJob(ctx, job) case domain.JobTypeDeployment: return s.deploymentService.ProcessDeploymentJob(ctx, job) case domain.JobTypeIssuance: return s.processIssuanceJob(ctx, job) case domain.JobTypeValidation: return s.processValidationJob(ctx, job) default: return fmt.Errorf("unknown job type: %s", job.Type) } } // processIssuanceJob handles a certificate issuance job. // It reuses the renewal service's ProcessRenewalJob since the flow is identical: // generate key → create CSR → call issuer → store version → create deployment jobs. // The only difference is semantics (new cert vs renewed cert), not mechanics. func (s *JobService) processIssuanceJob(ctx context.Context, job *domain.Job) error { s.logger.Debug("processing issuance job", "job_id", job.ID) // Issuance follows the same code path as renewal for the Local CA: // generate server-side key + CSR → sign via issuer → store cert version → deploy return s.renewalService.ProcessRenewalJob(ctx, job) } // processValidationJob handles a certificate validation job. // This is a placeholder that documents the flow. // see #validation-job-impl — implement actual validation job processing // when a customer ask materializes. Today's renewal-loop already calls // target connector ValidateDeployment after every deploy; this code // path is reserved for an out-of-band "verify what's deployed matches // the issued cert" scheduler tick. Not currently wired into the job // dispatcher — the job type is reserved. func (s *JobService) processValidationJob(ctx context.Context, job *domain.Job) error { s.logger.Debug("processing validation job", "job_id", job.ID) // see #validation-job-impl — when implemented: // In production: // 1. Fetch the certificate // 2. 
For each target, call target connector ValidateDeployment // 3. Aggregate results // 4. Update job status based on results // 5. Send notification if any validation fails return fmt.Errorf("validation job processing not yet implemented") } // RetryFailedJobs finds failed jobs and resets them for retry. // It only retries jobs that haven't exceeded max attempts. // // Audit trail (I-001): each successful Failed → Pending transition emits a // "job_retry" audit event with actor "system" (ActorTypeSystem), capturing // the old→new state and attempt counters so operators can reconstruct // scheduler-driven retry activity. The audit service is optional — callers // that haven't wired it via SetAuditService simply skip emission. // // maxRetries is retained for interface compatibility with // scheduler.JobServicer but is advisory: per-job eligibility is governed by // each job's own Attempts vs. MaxAttempts, not this parameter. func (s *JobService) RetryFailedJobs(ctx context.Context, maxRetries int) error { s.logger.Debug("retrying failed jobs", "max_retries", maxRetries) failedJobs, err := s.jobRepo.ListByStatus(ctx, domain.JobStatusFailed) if err != nil { return fmt.Errorf("failed to fetch failed jobs: %w", err) } var retriedCount int for _, job := range failedJobs { // Check if we can retry (Attempts < MaxAttempts) if job.Attempts >= job.MaxAttempts { s.logger.Debug("job exceeded max retries", "job_id", job.ID, "attempts", job.Attempts, "max_attempts", job.MaxAttempts) continue } // Reset status to pending for retry if err := s.jobRepo.UpdateStatus(ctx, job.ID, domain.JobStatusPending, ""); err != nil { s.logger.Error("failed to reset job status for retry", "job_id", job.ID, "error", err) continue } if s.auditService != nil { if auditErr := s.auditService.RecordEvent(ctx, "system", domain.ActorTypeSystem, "job_retry", "job", job.ID, map[string]interface{}{ "old_status": string(domain.JobStatusFailed), "new_status": string(domain.JobStatusPending), "attempts": 
job.Attempts, "max_attempts": job.MaxAttempts, }); auditErr != nil { s.logger.Error("failed to record job retry audit event", "job_id", job.ID, "error", auditErr) } } retriedCount++ } s.logger.Info("failed jobs retry completed", "retried", retriedCount, "total_failed", len(failedJobs)) return nil } // ReapJobsWithOfflineAgents transitions jobs in Running status whose // owning agent has been silent longer than agentTTL to Failed with // reason "agent_offline". Bundle C / Audit M-016 (CWE-754): closes the // gap left by ReapTimedOutJobs (which only handles AwaitingCSR / // AwaitingApproval). I-001's retry loop then auto-promotes eligible // Failed jobs back to Pending so a healthy agent can claim them. func (s *JobService) ReapJobsWithOfflineAgents(ctx context.Context, agentTTL time.Duration) error { if agentTTL <= 0 { return fmt.Errorf("ReapJobsWithOfflineAgents: agentTTL must be positive, got %s", agentTTL) } cutoff := time.Now().Add(-agentTTL) staleJobs, err := s.jobRepo.ListJobsWithOfflineAgents(ctx, cutoff) if err != nil { return fmt.Errorf("list jobs with offline agents: %w", err) } var reaped int for _, job := range staleJobs { oldStatus := job.Status errMsg := fmt.Sprintf("agent offline (no heartbeat for >%s)", agentTTL) job.Status = domain.JobStatusFailed job.LastError = &errMsg if err := s.jobRepo.Update(ctx, job); err != nil { s.logger.Error("failed to transition offline-agent job", "job_id", job.ID, "agent_id", job.AgentID, "error", err) continue } if s.auditService != nil { if auditErr := s.auditService.RecordEvent(ctx, "system", domain.ActorTypeSystem, "job_offline_agent_reap", "job", job.ID, map[string]interface{}{ "old_status": string(oldStatus), "new_status": string(domain.JobStatusFailed), "timeout_reason": "agent_offline", "agent_id": job.AgentID, }); auditErr != nil { s.logger.Error("failed to record offline-agent reap audit event", "job_id", job.ID, "error", auditErr) } } reaped++ } s.logger.Info("offline-agent job reaper completed", "reaped", 
reaped, "total_stale", len(staleJobs)) return nil } // ReapTimedOutJobs transitions jobs stuck in AwaitingCSR or AwaitingApproval // to Failed if they've exceeded their TTL. I-001's retry loop then auto-promotes // eligible Failed jobs back to Pending (closes coverage gap I-003). func (s *JobService) ReapTimedOutJobs(ctx context.Context, csrTTL, approvalTTL time.Duration) error { s.logger.Debug("reaping timed-out jobs", "csr_ttl", csrTTL, "approval_ttl", approvalTTL) now := time.Now() csrCutoff := now.Add(-csrTTL) approvalCutoff := now.Add(-approvalTTL) timedOutJobs, err := s.jobRepo.ListTimedOutAwaitingJobs(ctx, csrCutoff, approvalCutoff) if err != nil { return fmt.Errorf("failed to fetch timed-out jobs: %w", err) } var reaped int for _, job := range timedOutJobs { oldStatus := job.Status var ( newErrMsg string reason string ttl time.Duration ) switch job.Status { case domain.JobStatusAwaitingCSR: ttl = csrTTL reason = "csr_timeout" newErrMsg = fmt.Sprintf("timed out in %s after %s", oldStatus, csrTTL) case domain.JobStatusAwaitingApproval: ttl = approvalTTL reason = "approval_timeout" newErrMsg = fmt.Sprintf("timed out in %s after %s", oldStatus, approvalTTL) default: continue } _ = ttl job.Status = domain.JobStatusFailed job.LastError = &newErrMsg if err := s.jobRepo.Update(ctx, job); err != nil { s.logger.Error("failed to transition timed-out job", "job_id", job.ID, "old_status", oldStatus, "error", err) continue } if s.auditService != nil { ageHours := time.Since(job.CreatedAt).Hours() if auditErr := s.auditService.RecordEvent(ctx, "system", domain.ActorTypeSystem, "job_timeout", "job", job.ID, map[string]interface{}{ "old_status": string(oldStatus), "new_status": string(domain.JobStatusFailed), "timeout_reason": reason, "age_hours": ageHours, }); auditErr != nil { s.logger.Error("failed to record job timeout audit event", "job_id", job.ID, "error", auditErr) } } reaped++ } s.logger.Info("job timeout reaper completed", "reaped", reaped, "total_timed_out", 
len(timedOutJobs)) return nil } // GetJobStatus returns the current status of a job. func (s *JobService) GetJobStatus(ctx context.Context, jobID string) (*domain.Job, error) { job, err := s.jobRepo.Get(ctx, jobID) if err != nil { return nil, fmt.Errorf("failed to fetch job: %w", err) } return job, nil } // CancelJob cancels a pending or running job (handler interface method). func (s *JobService) CancelJob(ctx context.Context, jobID string) error { job, err := s.jobRepo.Get(ctx, jobID) if err != nil { return fmt.Errorf("failed to fetch job: %w", err) } if job.Status != domain.JobStatusPending && job.Status != domain.JobStatusRunning { return fmt.Errorf("cannot cancel job with status %s", job.Status) } if err := s.jobRepo.UpdateStatus(ctx, jobID, domain.JobStatusCancelled, "cancelled by user"); err != nil { return fmt.Errorf("failed to cancel job: %w", err) } s.logger.Info("job cancelled", "job_id", jobID) return nil } // ListJobs returns paginated jobs with optional filtering (handler interface method). func (s *JobService) ListJobs(ctx context.Context, status, jobType string, page, perPage int) ([]domain.Job, int64, error) { if page < 1 { page = 1 } if perPage < 1 { perPage = 50 } allJobs, err := s.jobRepo.List(ctx) if err != nil { return nil, 0, fmt.Errorf("failed to list jobs: %w", err) } // Filter jobs in memory based on status and jobType var filtered []*domain.Job for _, job := range allJobs { if job == nil { continue } if status != "" && string(job.Status) != status { continue } if jobType != "" && string(job.Type) != jobType { continue } filtered = append(filtered, job) } total := int64(len(filtered)) start := (page - 1) * perPage if start >= int(total) { return nil, total, nil } end := start + perPage if end > int(total) { end = int(total) } var result []domain.Job for _, job := range filtered[start:end] { if job != nil { result = append(result, *job) } } return result, total, nil } // GetJob returns a single job (handler interface method). 
func (s *JobService) GetJob(ctx context.Context, id string) (*domain.Job, error) {
	// Pass the repository result through untouched (no error wrapping).
	found, err := s.jobRepo.Get(ctx, id)
	return found, err
}

// ApproveJob approves a renewal job that is awaiting approval, moving it
// from AwaitingApproval to Pending so the scheduler picks it up. Jobs in any
// other status are rejected with an error.
//
// actor is the named-key identity of the approver (from the auth middleware
// via resolveActor). M-003: when actor matches the certificate owner's Name
// or Email (case-insensitive), ErrSelfApproval is returned to enforce
// separation of duties. Callers must pass a non-empty actor; an empty actor
// is treated as an anonymous system caller and permitted (internal/system
// paths).
func (s *JobService) ApproveJob(ctx context.Context, id, actor string) error {
	pending, lookupErr := s.jobRepo.Get(ctx, id)
	if lookupErr != nil {
		return fmt.Errorf("job not found: %w", lookupErr)
	}

	if status := pending.Status; status != domain.JobStatusAwaitingApproval {
		return fmt.Errorf("cannot approve job with status %s (must be AwaitingApproval)", status)
	}

	// Separation of duties: returned unwrapped so handlers can match
	// ErrSelfApproval directly.
	if selfErr := s.checkNotSelf(ctx, pending, actor); selfErr != nil {
		return selfErr
	}

	// An empty message accompanies the transition back to Pending.
	updateErr := s.jobRepo.UpdateStatus(ctx, id, domain.JobStatusPending, "")
	if updateErr != nil {
		return fmt.Errorf("failed to approve job: %w", updateErr)
	}

	s.logger.Info("renewal job approved",
		"job_id", id,
		"certificate_id", pending.CertificateID,
		"actor", actor)
	return nil
}

// RejectJob rejects a renewal job that is awaiting approval.
// Transitions the job to Cancelled with a rejection reason.
//
// actor is the named-key identity of the rejector (from the auth middleware
// via resolveActor). Rejection is NOT subject to the not-self check — an
// owner is permitted to cancel their own pending renewal. actor is recorded
// on the log line for audit attribution.
func (s *JobService) RejectJob(ctx context.Context, id, reason, actor string) error { job, err := s.jobRepo.Get(ctx, id) if err != nil { return fmt.Errorf("job not found: %w", err) } if job.Status != domain.JobStatusAwaitingApproval { return fmt.Errorf("cannot reject job with status %s (must be AwaitingApproval)", job.Status) } msg := "rejected by user" if reason != "" { msg = "rejected: " + reason } if err := s.jobRepo.UpdateStatus(ctx, id, domain.JobStatusCancelled, msg); err != nil { return fmt.Errorf("failed to reject job: %w", err) } s.logger.Info("renewal job rejected", "job_id", id, "certificate_id", job.CertificateID, "reason", reason, "actor", actor) return nil } // checkNotSelf enforces the M-003 separation-of-duties rule for renewal // approval: the actor approving a job may not be the owner of the underlying // certificate. // // Resolution rules: // - Empty actor → permitted (internal/system caller; auth middleware already // short-circuits anonymous users at the handler layer). // - certRepo or ownerRepo nil → warn once, permit (test/bootstrap wiring). // - Job has no certificate or certificate has no OwnerID → permitted (no // owner to collide with). // - Owner record not found → warn, permit (defensive: stale FK should not // block operations). // - Case-insensitive match against owner.Name OR owner.Email → returns // ErrSelfApproval. 
func (s *JobService) checkNotSelf(ctx context.Context, job *domain.Job, actor string) error { if actor == "" { return nil } if s.certRepo == nil || s.ownerRepo == nil { s.logger.Warn("not-self approval check skipped: cert/owner repo not wired", "job_id", job.ID, "actor", actor) return nil } if job.CertificateID == "" { return nil } cert, err := s.certRepo.Get(ctx, job.CertificateID) if err != nil { s.logger.Warn("not-self approval check: certificate lookup failed", "job_id", job.ID, "certificate_id", job.CertificateID, "error", err) return nil } if cert == nil || cert.OwnerID == "" { return nil } owner, err := s.ownerRepo.Get(ctx, cert.OwnerID) if err != nil || owner == nil { s.logger.Warn("not-self approval check: owner lookup failed", "job_id", job.ID, "owner_id", cert.OwnerID, "error", err) return nil } actorLower := strings.ToLower(actor) if strings.ToLower(owner.Name) == actorLower || strings.ToLower(owner.Email) == actorLower { s.logger.Warn("self-approval blocked", "job_id", job.ID, "certificate_id", job.CertificateID, "owner_id", owner.ID, "actor", actor) return ErrSelfApproval } return nil }