certctl/internal/service/job.go
shankar0123 89b910a8f1 security: atomic pending-job claim with FOR UPDATE SKIP LOCKED (H-6)
Fixes H-6 (CWE-362) — GetPendingJobs returned pending rows without row
locks, so two scheduler replicas in an HA deployment could both read the
same row, both decide it was theirs, and race on UpdateStatus, producing
duplicate Running jobs and duplicate certificate issuances.

Remediation: a claim-style repository API that selects + transitions
Pending -> Running in one transaction with SELECT ... FOR UPDATE SKIP
LOCKED. Concurrent claimants observe disjoint row sets; no worker ever
sees another worker's claimed row.

Repository changes (internal/repository/postgres/job.go):
  - New ClaimPendingJobs(ctx, jobType, limit): BEGIN; SELECT id,...
    FROM jobs WHERE status='Pending' (optional type filter, optional
    LIMIT) FOR UPDATE SKIP LOCKED; UPDATE jobs SET status='Running',
    updated_at=NOW() WHERE id = ANY($ids); COMMIT. Returns the claimed
    rows with status already flipped.
  - New ClaimPendingByAgentID(ctx, agentID): mirrors M31 UNION ALL
    semantics (direct agent_id match, target->agent JOIN fallback,
    certificate->target->agent chain for AwaitingCSR) but wraps each
    branch in FOR UPDATE SKIP LOCKED and flips Deployment/Renewal rows
    to Running. AwaitingCSR rows are returned in place (state
    transition deferred until SubmitCSR, consistent with M8 semantics).
  - Existing GetPendingJobs / ListPendingByAgentID retained for legacy
    compatibility; their godoc now directs production callers to the
    Claim* variants.

Production caller switches:
  - internal/service/job.go ProcessPendingJobs: ListByStatus(Pending)
    -> ClaimPendingJobs(ctx, "", 0). Eliminates the real scheduler
    race between two replicas tick-firing simultaneously.
  - internal/service/agent.go GetPendingWork: ListPendingByAgentID ->
    ClaimPendingByAgentID. Eliminates the race between two pollers
    for the same agent (e.g. brief network blip causing duplicate
    poll) and between a scheduler tick and an agent poll.

Safety argument for pre-flipping Pending -> Running inside the claim
transaction: ProcessRenewalJob and ProcessDeploymentJob both call
UpdateStatus(Running) unconditionally on entry, so an early flip is
idempotent. On panic, the scheduler's panic recovery leaves the job
in Running, which the existing stale-running reaper then handles.

Tests (internal/repository/postgres/repo_test.go, skipped in -short):
  - TestJobRepository_ClaimPendingJobs_FlipsToRunning: seed 5 Pending,
    claim once, assert all 5 returned + DB rows Running, residual
    claim returns 0.
  - TestJobRepository_ClaimPendingJobs_ConcurrentDisjoint: seed M=40
    Pending Renewals, spawn N=8 goroutines each calling
    ClaimPendingJobs(_, JobTypeRenewal, 1) in a loop. Invariants:
    (a) no job ID claimed by more than one worker, (b) sum of claims
    == 40, (c) all 40 rows in Running state in the DB. Bounded
    empty-streak guard (20 iterations) covers SKIP LOCKED transient
    zeros under contention.
  - TestJobRepository_ClaimPendingByAgentID_TransitionsDeployments:
    seeds 2 Pending Deployment + 1 AwaitingCSR for agent A plus 1
    Pending Renewal for agent B (scope check). Asserts deployments
    flip to Running, AwaitingCSR is returned but preserved, agent B's
    renewal never appears.
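
The invariants of the concurrent test can be illustrated without a database. The sketch below is an assumption-laden stand-in, not the test in repo_test.go: a hypothetical `memQueue` uses a mutex where PostgreSQL would use row locks, so an empty claim always means the queue is drained and no empty-streak guard is needed.

```go
package main

import (
	"fmt"
	"sync"
)

// memQueue is a hypothetical in-memory stand-in for the jobs table.
// The mutex plays the role of the row locks taken by the real claim query.
type memQueue struct {
	mu      sync.Mutex
	pending []string
	running map[string]bool
}

func newMemQueue(m int) *memQueue {
	q := &memQueue{running: make(map[string]bool)}
	for i := 0; i < m; i++ {
		q.pending = append(q.pending, fmt.Sprintf("job-%02d", i))
	}
	return q
}

// claim atomically moves up to limit jobs from pending to running.
func (q *memQueue) claim(limit int) []string {
	q.mu.Lock()
	defer q.mu.Unlock()
	if limit > len(q.pending) {
		limit = len(q.pending)
	}
	claimed := q.pending[:limit]
	q.pending = q.pending[limit:]
	for _, id := range claimed {
		q.running[id] = true
	}
	return claimed
}

// runWorkers drains the queue with n goroutines, each claiming one job at a
// time, and reports how many times every job id was handed out.
func runWorkers(q *memQueue, n int) map[string]int {
	var (
		wg     sync.WaitGroup
		mu     sync.Mutex
		counts = make(map[string]int)
	)
	for w := 0; w < n; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				ids := q.claim(1)
				if len(ids) == 0 {
					return // queue drained
				}
				mu.Lock()
				for _, id := range ids {
					counts[id]++
				}
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return counts
}

func main() {
	q := newMemQueue(40)
	counts := runWorkers(q, 8)

	total := 0
	for id, c := range counts {
		if c != 1 { // invariant (a): no id claimed twice
			panic("job claimed more than once: " + id)
		}
		total += c
	}
	// Invariants (b) and (c): 40 claims in total, 40 jobs Running.
	fmt.Println(total, len(q.running)) // prints "40 40"
}
```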

Mock updates: testutil_test.go, lifecycle_test.go, verification_test.go
gained ClaimPendingJobs/ClaimPendingByAgentID on their mock job repos
mirroring the real Pending -> Running semantics. Mocks intentionally
do NOT write to StatusUpdates (that map tracks UpdateStatus() call
history specifically; the real claim path uses a bulk UPDATE, not
UpdateStatus).
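
A mock with these semantics might look like the sketch below. The type and field names are assumptions for illustration (the real mocks take a context and a job type as well); the point is that the claim flips state directly, while only `UpdateStatus` appends to the call history. It also shows why the later `UpdateStatus(Running)` call is harmless: the job is already Running, so the state does not change.

```go
package main

import "fmt"

type JobStatus string

const (
	StatusPending JobStatus = "Pending"
	StatusRunning JobStatus = "Running"
)

type Job struct {
	ID     string
	Status JobStatus
}

// mockJobRepo is a hypothetical test double, simplified from what the
// commit message describes.
type mockJobRepo struct {
	Jobs          []*Job                 // slice keeps claim order deterministic
	StatusUpdates map[string][]JobStatus // UpdateStatus call history only
}

// ClaimPendingJobs flips Pending jobs to Running and returns them.
// It deliberately does not touch StatusUpdates, mirroring the bulk UPDATE
// used by the real claim path. A limit of zero means no limit.
func (m *mockJobRepo) ClaimPendingJobs(limit int) []*Job {
	var claimed []*Job
	for _, j := range m.Jobs {
		if limit > 0 && len(claimed) == limit {
			break
		}
		if j.Status == StatusPending {
			j.Status = StatusRunning
			claimed = append(claimed, j)
		}
	}
	return claimed
}

// UpdateStatus records the call and applies the new status.
func (m *mockJobRepo) UpdateStatus(id string, s JobStatus) {
	m.StatusUpdates[id] = append(m.StatusUpdates[id], s)
	for _, j := range m.Jobs {
		if j.ID == id {
			j.Status = s
		}
	}
}

func main() {
	repo := &mockJobRepo{
		Jobs: []*Job{
			{ID: "a", Status: StatusPending},
			{ID: "b", Status: StatusPending},
		},
		StatusUpdates: map[string][]JobStatus{},
	}

	claimed := repo.ClaimPendingJobs(0)
	fmt.Println(len(claimed), len(repo.StatusUpdates)) // prints "2 0"

	// The downstream service calls UpdateStatus(Running) on entry; the job
	// is already Running, so only the call history changes.
	repo.UpdateStatus("a", StatusRunning)
	fmt.Println(repo.Jobs[0].Status, len(repo.StatusUpdates["a"])) // prints "Running 1"
}
```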

Verification (CI-scope):
  - go build ./cmd/...: ok
  - go vet ./...: ok
  - go test -race -short on service, api/handler, api/middleware,
    scheduler, connector/..., domain, validation, tlsprobe: ok
  - Coverage gates: service 67.6% (>=55), handler 78.6% (>=60),
    middleware 80.0% (>=30), domain 92.7% (>=40). All hold.
  - golangci-lint 2.11.4: 0 issues
  - govulncheck: no vulnerabilities in call graph
  - Frontend: tsc clean, 218 vitest tests pass, vite build ok
  - helm lint + helm template: ok
  - Invariant sweeps: FOR UPDATE SKIP LOCKED present in job.go;
    H-1 through H-5 fixtures unchanged.

Refs: H-6 in certctl-audit-report.md
2026-04-17 02:34:56 +00:00


package service

import (
	"context"
	"fmt"
	"log/slog"

	"github.com/shankar0123/certctl/internal/domain"
	"github.com/shankar0123/certctl/internal/repository"
)

// JobService manages job processing and status tracking.
// It coordinates between the scheduler and various job-specific services.
type JobService struct {
	jobRepo           repository.JobRepository
	renewalService    *RenewalService
	deploymentService *DeploymentService
	logger            *slog.Logger
}

// NewJobService creates a new job service.
func NewJobService(
	jobRepo repository.JobRepository,
	renewalService *RenewalService,
	deploymentService *DeploymentService,
	logger *slog.Logger,
) *JobService {
	return &JobService{
		jobRepo:           jobRepo,
		renewalService:    renewalService,
		deploymentService: deploymentService,
		logger:            logger,
	}
}

// ProcessPendingJobs claims and processes all pending jobs.
// It routes jobs to the appropriate service based on job type and handles errors gracefully.
//
// Concurrency (H-6 CWE-362): jobs are claimed via ClaimPendingJobs which uses
// SELECT ... FOR UPDATE SKIP LOCKED and flips Pending → Running atomically. Concurrent
// scheduler replicas in HA deployments will therefore never observe the same Pending row,
// and the subsequent UpdateStatus(Running) calls inside the downstream service methods are
// idempotent against the pre-flipped state.
func (s *JobService) ProcessPendingJobs(ctx context.Context) error {
	// Claim pending jobs atomically (H-6 remediation: was ListByStatus which had no row lock).
	// Empty jobType matches all types; zero limit means unlimited (preserves prior semantics).
	pendingJobs, err := s.jobRepo.ClaimPendingJobs(ctx, "", 0)
	if err != nil {
		return fmt.Errorf("failed to claim pending jobs: %w", err)
	}

	if len(pendingJobs) == 0 {
		s.logger.Debug("no pending jobs to process")
		return nil
	}

	s.logger.Info("processing pending jobs", "count", len(pendingJobs))

	var processedCount int
	var failedCount int

	// Process each job
	for _, job := range pendingJobs {
		// Skip deployment jobs that have an agent_id; those are meant for agent
		// pickup via GetPendingWork(), not server-side processing. The server should
		// only process deployment jobs without an agent (legacy/serverless targets).
		//
		// NOTE: per the claim semantics documented above, ClaimPendingJobs appears to
		// have already flipped this row to Running, so skipping it here does not
		// return it to Pending for the agent-side claim.
		if job.Type == domain.JobTypeDeployment && job.AgentID != nil && *job.AgentID != "" {
			s.logger.Debug("skipping agent-routed deployment job",
				"job_id", job.ID,
				"agent_id", *job.AgentID)
			continue
		}

		if err := s.processJob(ctx, job); err != nil {
			s.logger.Error("failed to process job",
				"job_id", job.ID,
				"job_type", job.Type,
				"error", err)
			failedCount++
			continue
		}
		processedCount++
	}

	s.logger.Info("job processing completed",
		"processed", processedCount,
		"failed", failedCount,
		"total", len(pendingJobs))

	return nil
}

// processJob routes a single job to the appropriate service based on type.
func (s *JobService) processJob(ctx context.Context, job *domain.Job) error {
	s.logger.Debug("processing job",
		"job_id", job.ID,
		"job_type", job.Type,
		"certificate_id", job.CertificateID)

	switch job.Type {
	case domain.JobTypeRenewal:
		return s.renewalService.ProcessRenewalJob(ctx, job)
	case domain.JobTypeDeployment:
		return s.deploymentService.ProcessDeploymentJob(ctx, job)
	case domain.JobTypeIssuance:
		return s.processIssuanceJob(ctx, job)
	case domain.JobTypeValidation:
		return s.processValidationJob(ctx, job)
	default:
		return fmt.Errorf("unknown job type: %s", job.Type)
	}
}

// processIssuanceJob handles a certificate issuance job.
// It reuses the renewal service's ProcessRenewalJob since the flow is identical:
// generate key → create CSR → call issuer → store version → create deployment jobs.
// The only difference is semantics (new cert vs renewed cert), not mechanics.
func (s *JobService) processIssuanceJob(ctx context.Context, job *domain.Job) error {
	s.logger.Debug("processing issuance job", "job_id", job.ID)

	// Issuance follows the same code path as renewal for the Local CA:
	// generate server-side key + CSR → sign via issuer → store cert version → deploy
	return s.renewalService.ProcessRenewalJob(ctx, job)
}

// processValidationJob handles a certificate validation job.
// This is a placeholder that documents the flow.
// TODO: Implement actual validation job processing if needed.
func (s *JobService) processValidationJob(ctx context.Context, job *domain.Job) error {
	s.logger.Debug("processing validation job", "job_id", job.ID)

	// TODO: Implement validation job processing
	// In production:
	// 1. Fetch the certificate
	// 2. For each target, call target connector ValidateDeployment
	// 3. Aggregate results
	// 4. Update job status based on results
	// 5. Send notification if any validation fails
	return fmt.Errorf("validation job processing not yet implemented")
}

// RetryFailedJobs finds failed jobs and resets them for retry.
// It only retries jobs that haven't exceeded max attempts.
func (s *JobService) RetryFailedJobs(ctx context.Context, maxRetries int) error {
	s.logger.Debug("retrying failed jobs", "max_retries", maxRetries)

	failedJobs, err := s.jobRepo.ListByStatus(ctx, domain.JobStatusFailed)
	if err != nil {
		return fmt.Errorf("failed to fetch failed jobs: %w", err)
	}

	var retriedCount int
	for _, job := range failedJobs {
		// Check if we can retry (Attempts < MaxAttempts)
		if job.Attempts >= job.MaxAttempts {
			s.logger.Debug("job exceeded max retries",
				"job_id", job.ID,
				"attempts", job.Attempts,
				"max_attempts", job.MaxAttempts)
			continue
		}

		// Reset status to pending for retry
		if err := s.jobRepo.UpdateStatus(ctx, job.ID, domain.JobStatusPending, ""); err != nil {
			s.logger.Error("failed to reset job status for retry",
				"job_id", job.ID,
				"error", err)
			continue
		}
		retriedCount++
	}

	s.logger.Info("failed jobs retry completed",
		"retried", retriedCount,
		"total_failed", len(failedJobs))

	return nil
}

// GetJobStatus returns the current status of a job.
func (s *JobService) GetJobStatus(ctx context.Context, jobID string) (*domain.Job, error) {
	job, err := s.jobRepo.Get(ctx, jobID)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch job: %w", err)
	}
	return job, nil
}

// CancelJobWithContext cancels a pending or running job.
func (s *JobService) CancelJobWithContext(ctx context.Context, jobID string) error {
	job, err := s.jobRepo.Get(ctx, jobID)
	if err != nil {
		return fmt.Errorf("failed to fetch job: %w", err)
	}

	if job.Status != domain.JobStatusPending && job.Status != domain.JobStatusRunning {
		return fmt.Errorf("cannot cancel job with status %s", job.Status)
	}

	if err := s.jobRepo.UpdateStatus(ctx, jobID, domain.JobStatusCancelled, "cancelled by user"); err != nil {
		return fmt.Errorf("failed to cancel job: %w", err)
	}

	s.logger.Info("job cancelled", "job_id", jobID)
	return nil
}

// CancelJob cancels a job (handler interface method).
func (s *JobService) CancelJob(id string) error {
	return s.CancelJobWithContext(context.Background(), id)
}

// ListJobs returns paginated jobs with optional filtering (handler interface method).
func (s *JobService) ListJobs(status, jobType string, page, perPage int) ([]domain.Job, int64, error) {
	if page < 1 {
		page = 1
	}
	if perPage < 1 {
		perPage = 50
	}

	allJobs, err := s.jobRepo.List(context.Background())
	if err != nil {
		return nil, 0, fmt.Errorf("failed to list jobs: %w", err)
	}

	// Filter jobs in memory based on status and jobType
	var filtered []*domain.Job
	for _, job := range allJobs {
		if job == nil {
			continue
		}
		if status != "" && string(job.Status) != status {
			continue
		}
		if jobType != "" && string(job.Type) != jobType {
			continue
		}
		filtered = append(filtered, job)
	}

	total := int64(len(filtered))
	start := (page - 1) * perPage
	if start >= int(total) {
		return nil, total, nil
	}
	end := start + perPage
	if end > int(total) {
		end = int(total)
	}

	var result []domain.Job
	for _, job := range filtered[start:end] {
		if job != nil {
			result = append(result, *job)
		}
	}
	return result, total, nil
}

// GetJob returns a single job (handler interface method).
func (s *JobService) GetJob(id string) (*domain.Job, error) {
	return s.jobRepo.Get(context.Background(), id)
}

// ApproveJob approves a renewal job that is awaiting approval.
// Transitions the job from AwaitingApproval to Pending so the scheduler picks it up.
func (s *JobService) ApproveJob(id string) error {
	ctx := context.Background()

	job, err := s.jobRepo.Get(ctx, id)
	if err != nil {
		return fmt.Errorf("job not found: %w", err)
	}

	if job.Status != domain.JobStatusAwaitingApproval {
		return fmt.Errorf("cannot approve job with status %s (must be AwaitingApproval)", job.Status)
	}

	if err := s.jobRepo.UpdateStatus(ctx, id, domain.JobStatusPending, ""); err != nil {
		return fmt.Errorf("failed to approve job: %w", err)
	}

	s.logger.Info("renewal job approved", "job_id", id, "certificate_id", job.CertificateID)
	return nil
}

// RejectJob rejects a renewal job that is awaiting approval.
// Transitions the job to Cancelled with a rejection reason.
func (s *JobService) RejectJob(id string, reason string) error {
	ctx := context.Background()

	job, err := s.jobRepo.Get(ctx, id)
	if err != nil {
		return fmt.Errorf("job not found: %w", err)
	}

	if job.Status != domain.JobStatusAwaitingApproval {
		return fmt.Errorf("cannot reject job with status %s (must be AwaitingApproval)", job.Status)
	}

	msg := "rejected by user"
	if reason != "" {
		msg = "rejected: " + reason
	}

	if err := s.jobRepo.UpdateStatus(ctx, id, domain.JobStatusCancelled, msg); err != nil {
		return fmt.Errorf("failed to reject job: %w", err)
	}

	s.logger.Info("renewal job rejected", "job_id", id, "certificate_id", job.CertificateID, "reason", reason)
	return nil
}