security: atomic pending-job claim with FOR UPDATE SKIP LOCKED (H-6)

Fixes H-6 (CWE-362) — GetPendingJobs returned pending rows without row
locks, so two scheduler replicas in an HA deployment could both read the
same row, both decide it was theirs, and race on UpdateStatus, producing
duplicate Running jobs and duplicate certificate issuances.

Remediation: a claim-style repository API that selects + transitions
Pending -> Running in one transaction with SELECT ... FOR UPDATE SKIP
LOCKED. Concurrent claimants observe disjoint row sets; no worker ever
sees another worker's claimed row.

Repository changes (internal/repository/postgres/job.go):
  - New ClaimPendingJobs(ctx, jobType, limit): BEGIN; SELECT id,...
    FROM jobs WHERE status='Pending' (optional type filter, optional
    LIMIT) FOR UPDATE SKIP LOCKED; UPDATE jobs SET status='Running',
    updated_at=NOW() WHERE id = ANY($ids); COMMIT. Returns the claimed
    rows with status already flipped.
  - New ClaimPendingByAgentID(ctx, agentID): mirrors M31 UNION ALL
    semantics (direct agent_id match, target->agent JOIN fallback,
    certificate->target->agent chain for AwaitingCSR) but wraps each
    branch in FOR UPDATE SKIP LOCKED and flips Deployment/Renewal rows
    to Running. AwaitingCSR rows are returned in place (state
    transition deferred until SubmitCSR, consistent with M8 semantics).
  - Existing GetPendingJobs / ListPendingByAgentID retained for legacy
    compatibility; their godoc now directs production callers to the
    Claim* variants.

Production caller switches:
  - internal/service/job.go ProcessPendingJobs: ListByStatus(Pending)
    -> ClaimPendingJobs(ctx, "", 0). Eliminates the real scheduler
    race between two replicas tick-firing simultaneously.
  - internal/service/agent.go GetPendingWork: ListPendingByAgentID ->
    ClaimPendingByAgentID. Eliminates the race between two pollers
    for the same agent (e.g. brief network blip causing duplicate
    poll) and between a scheduler tick and an agent poll.

Safety argument for pre-flipping Pending -> Running inside the claim
transaction: ProcessRenewalJob and ProcessDeploymentJob both call
UpdateStatus(Running) unconditionally on entry, so an early flip is
idempotent. On panic, the scheduler's panic recovery leaves the job
in Running which the existing stale-running reaper handles.

Tests (internal/repository/postgres/repo_test.go, skipped in -short):
  - TestJobRepository_ClaimPendingJobs_FlipsToRunning: seed 5 Pending,
    claim once, assert all 5 returned + DB rows Running, residual
    claim returns 0.
  - TestJobRepository_ClaimPendingJobs_ConcurrentDisjoint: seed M=40
    Pending Renewals, spawn N=8 goroutines each calling
    ClaimPendingJobs(_, JobTypeRenewal, 1) in a loop. Invariants:
    (a) no job ID claimed by more than one worker, (b) sum of claims
    == 40, (c) all 40 rows in Running state in the DB. Bounded
    empty-streak guard (20 iterations) covers SKIP LOCKED transient
    zeros under contention.
  - TestJobRepository_ClaimPendingByAgentID_TransitionsDeployments:
    seeds 2 Pending Deployment + 1 AwaitingCSR for agent A plus 1
    Pending Renewal for agent B (scope check). Asserts deployments
    flip to Running, AwaitingCSR is returned but preserved, agent B's
    renewal never appears.

Mock updates: testutil_test.go, lifecycle_test.go, verification_test.go
gained ClaimPendingJobs/ClaimPendingByAgentID on their mock job repos
mirroring the real Pending -> Running semantics. Mocks intentionally
do NOT write to StatusUpdates (that map tracks UpdateStatus() call
history specifically; the real claim path uses a bulk UPDATE, not
UpdateStatus).

Verification (CI-scope):
  - go build ./cmd/...: ok
  - go vet ./...: ok
  - go test -race -short on service, api/handler, api/middleware,
    scheduler, connector/..., domain, validation, tlsprobe: ok
  - Coverage gates: service 67.6% (>=55), handler 78.6% (>=60),
    middleware 80.0% (>=30), domain 92.7% (>=40). All hold.
  - golangci-lint 2.11.4: 0 issues
  - govulncheck: no vulnerabilities in call graph
  - Frontend: tsc clean, 218 vitest tests pass, vite build ok
  - helm lint + helm template: ok
  - Invariant sweeps: FOR UPDATE SKIP LOCKED present in job.go;
    H-1 through H-5 fixtures unchanged.

Refs: H-6 in certctl-audit-report.md
This commit is contained in:
shankar0123
2026-04-17 02:34:56 +00:00
parent 6315ef102a
commit 89b910a8f1
8 changed files with 709 additions and 11 deletions
+334
View File
@@ -7,6 +7,9 @@ import (
"context"
"database/sql"
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
@@ -1682,3 +1685,334 @@ func TestEmptyResultSets(t *testing.T) {
t.Errorf("expected empty agent groups, got %d", len(groups))
}
}
// ============================================================
// H-6 (CWE-362) Claim-Based Concurrency Tests
//
// These tests exercise the `SELECT ... FOR UPDATE SKIP LOCKED` worker-queue pattern
// introduced to remediate the H-6 race condition. They validate two invariants:
//
// 1. Disjoint claim: under concurrent callers, no Pending row is returned to more
// than one worker (i.e. each claim is exclusive).
// 2. State transition: claimed rows are atomically flipped to Running inside the
// same transaction that locked them, so a subsequent query must see the row in
// the Running state and no other worker can observe it as Pending again.
//
// Skipped automatically in `-short` mode (CI) since they require a real PostgreSQL
// instance and take ~1s under contention.
// ============================================================
// seedPendingJobs creates n Pending renewal jobs against a single prerequisite
// certificate and returns the generated job IDs.
func seedPendingJobs(t *testing.T, ctx context.Context, db *sql.DB, certID string, n int) []string {
t.Helper()
certRepo := postgres.NewCertificateRepository(db)
jobRepo := postgres.NewJobRepository(db)
ownerID, teamID, issuerID, policyID := insertCertPrereqsRaw(t, db, ctx, certID)
now := time.Now().Truncate(time.Microsecond)
cert := &domain.ManagedCertificate{
ID: "mc-" + certID, Name: certID, CommonName: certID + ".example.com",
SANs: []string{}, OwnerID: ownerID, TeamID: teamID,
IssuerID: issuerID, RenewalPolicyID: policyID,
Status: domain.CertificateStatusActive,
ExpiresAt: now.Add(30 * 24 * time.Hour), Tags: map[string]string{},
CreatedAt: now, UpdatedAt: now,
}
if err := certRepo.Create(ctx, cert); err != nil {
t.Fatalf("seedPendingJobs: create cert failed: %v", err)
}
ids := make([]string, 0, n)
for i := 0; i < n; i++ {
job := &domain.Job{
ID: fmt.Sprintf("job-%s-%03d", certID, i),
Type: domain.JobTypeRenewal,
CertificateID: "mc-" + certID,
Status: domain.JobStatusPending,
Attempts: 0,
MaxAttempts: 3,
ScheduledAt: now,
CreatedAt: now,
}
if err := jobRepo.Create(ctx, job); err != nil {
t.Fatalf("seedPendingJobs: create job %d failed: %v", i, err)
}
ids = append(ids, job.ID)
}
return ids
}
// TestJobRepository_ClaimPendingJobs_FlipsToRunning validates the basic claim
// semantics: a single call transitions Pending rows to Running atomically, and
// the rows returned to the caller reflect the post-update state.
func TestJobRepository_ClaimPendingJobs_FlipsToRunning(t *testing.T) {
if testing.Short() {
t.Skip("integration test requires PostgreSQL")
}
tdb := getTestDB(t)
db := tdb.freshSchema(t)
jobRepo := postgres.NewJobRepository(db)
ctx := context.Background()
seeded := seedPendingJobs(t, ctx, db, "claimflip", 5)
claimed, err := jobRepo.ClaimPendingJobs(ctx, domain.JobTypeRenewal, 0)
if err != nil {
t.Fatalf("ClaimPendingJobs failed: %v", err)
}
if len(claimed) != len(seeded) {
t.Fatalf("len(claimed) = %d, want %d", len(claimed), len(seeded))
}
// In-memory return values must reflect the transitioned state.
for _, j := range claimed {
if j.Status != domain.JobStatusRunning {
t.Errorf("claimed job %s Status = %q, want %q", j.ID, j.Status, domain.JobStatusRunning)
}
}
// Persisted rows must also be Running — a fresh Get must not see Pending.
for _, id := range seeded {
got, err := jobRepo.Get(ctx, id)
if err != nil {
t.Fatalf("Get(%s) failed: %v", id, err)
}
if got.Status != domain.JobStatusRunning {
t.Errorf("persisted job %s Status = %q, want %q", id, got.Status, domain.JobStatusRunning)
}
}
// A subsequent claim must return zero rows — nothing is Pending anymore.
residual, err := jobRepo.ClaimPendingJobs(ctx, domain.JobTypeRenewal, 0)
if err != nil {
t.Fatalf("residual ClaimPendingJobs failed: %v", err)
}
if len(residual) != 0 {
t.Errorf("residual claims = %d, want 0 (all should be Running now)", len(residual))
}
}
// TestJobRepository_ClaimPendingJobs_ConcurrentDisjoint validates the core H-6
// invariant: under concurrent access, no row is handed to more than one worker.
//
// The test seeds M Pending jobs, fans out N goroutines each of which loops
// calling ClaimPendingJobs with limit=1, and finally asserts the union of all
// claimed IDs is exactly M with zero duplicates. Workers that transiently
// observe zero rows (because peers are holding the only remaining rows) re-check
// an atomic progress counter before exiting, so transient SKIP-LOCKED zeros do
// not cause premature termination.
func TestJobRepository_ClaimPendingJobs_ConcurrentDisjoint(t *testing.T) {
if testing.Short() {
t.Skip("integration test requires PostgreSQL")
}
tdb := getTestDB(t)
db := tdb.freshSchema(t)
jobRepo := postgres.NewJobRepository(db)
ctx := context.Background()
const M = 40 // seeded Pending jobs
const N = 8 // concurrent workers
seeded := seedPendingJobs(t, ctx, db, "concurrent", M)
seededSet := make(map[string]bool, M)
for _, id := range seeded {
seededSet[id] = true
}
var (
totalClaimed int64
allClaims []string
mu sync.Mutex
wg sync.WaitGroup
)
for w := 0; w < N; w++ {
wg.Add(1)
go func(worker int) {
defer wg.Done()
emptyStreak := 0
for iter := 0; iter < M*4; iter++ { // generous ceiling to prevent hangs
claimed, err := jobRepo.ClaimPendingJobs(ctx, domain.JobTypeRenewal, 1)
if err != nil {
t.Errorf("worker %d ClaimPendingJobs failed: %v", worker, err)
return
}
if len(claimed) == 0 {
// Transient zero (peer holds lock) vs. terminal zero (all claimed).
// Bail only once the shared counter proves work is done, but guard
// with a streak so we don't spin forever under starvation.
if atomic.LoadInt64(&totalClaimed) >= int64(M) {
return
}
emptyStreak++
if emptyStreak >= 20 {
return
}
time.Sleep(500 * time.Microsecond)
continue
}
emptyStreak = 0
mu.Lock()
for _, j := range claimed {
if j.Status != domain.JobStatusRunning {
t.Errorf("worker %d got job %s in Status=%q (want Running) — claim did not flip state", worker, j.ID, j.Status)
}
allClaims = append(allClaims, j.ID)
}
mu.Unlock()
atomic.AddInt64(&totalClaimed, int64(len(claimed)))
}
}(w)
}
wg.Wait()
// Invariant 1: no duplicate claims across the worker pool.
seen := make(map[string]int, len(allClaims))
for _, id := range allClaims {
seen[id]++
}
for id, count := range seen {
if count > 1 {
t.Errorf("job %s claimed %d times — SKIP LOCKED invariant violated", id, count)
}
}
// Invariant 2: every seeded job appears in the claim set exactly once.
if len(seen) != M {
t.Errorf("distinct claimed IDs = %d, want %d (all seeded jobs must be claimed)", len(seen), M)
}
for id := range seededSet {
if seen[id] == 0 {
t.Errorf("seeded job %s was never claimed by any worker", id)
}
}
// Invariant 3: persisted state reflects the transition — every seeded row
// is now Running; none is Pending.
for id := range seededSet {
got, err := jobRepo.Get(ctx, id)
if err != nil {
t.Fatalf("Get(%s) failed: %v", id, err)
}
if got.Status != domain.JobStatusRunning {
t.Errorf("job %s Status = %q, want %q", id, got.Status, domain.JobStatusRunning)
}
}
// Final progress counter must match the total number of seeded jobs.
if got := atomic.LoadInt64(&totalClaimed); got != int64(M) {
t.Errorf("totalClaimed = %d, want %d", got, M)
}
}
// TestJobRepository_ClaimPendingByAgentID_TransitionsDeployments validates the
// agent-scoped claim variant: Pending deployment rows for a given agent flip to
// Running; AwaitingCSR rows are returned but their state is preserved (the CSR
// submission path drives their next transition).
func TestJobRepository_ClaimPendingByAgentID_TransitionsDeployments(t *testing.T) {
if testing.Short() {
t.Skip("integration test requires PostgreSQL")
}
tdb := getTestDB(t)
db := tdb.freshSchema(t)
jobRepo := postgres.NewJobRepository(db)
agentRepo := postgres.NewAgentRepository(db)
ctx := context.Background()
ownerID, teamID, issuerID, policyID := insertCertPrereqsRaw(t, db, ctx, "agentclaim")
now := time.Now().Truncate(time.Microsecond)
cert := &domain.ManagedCertificate{
ID: "mc-agentclaim", Name: "agentclaim", CommonName: "agentclaim.example.com",
SANs: []string{}, OwnerID: ownerID, TeamID: teamID,
IssuerID: issuerID, RenewalPolicyID: policyID,
Status: domain.CertificateStatusActive,
ExpiresAt: now.Add(30 * 24 * time.Hour), Tags: map[string]string{},
CreatedAt: now, UpdatedAt: now,
}
if err := postgres.NewCertificateRepository(db).Create(ctx, cert); err != nil {
t.Fatalf("create cert failed: %v", err)
}
agent := &domain.Agent{
ID: "a-claim",
Name: "claim-agent",
Hostname: "claim-agent-host",
Status: domain.AgentStatusOnline,
RegisteredAt: now,
APIKeyHash: "hash-claim",
}
if err := agentRepo.Create(ctx, agent); err != nil {
t.Fatalf("create agent failed: %v", err)
}
agentID := agent.ID
mkJob := func(id string, typ domain.JobType, status domain.JobStatus) *domain.Job {
return &domain.Job{
ID: id, Type: typ, CertificateID: cert.ID,
AgentID: &agentID,
Status: status,
Attempts: 0,
MaxAttempts: 3,
ScheduledAt: now,
CreatedAt: now,
}
}
jobs := []*domain.Job{
mkJob("job-agentclaim-dep-1", domain.JobTypeDeployment, domain.JobStatusPending),
mkJob("job-agentclaim-dep-2", domain.JobTypeDeployment, domain.JobStatusPending),
mkJob("job-agentclaim-csr-1", domain.JobTypeRenewal, domain.JobStatusAwaitingCSR),
// A Pending Renewal (not Deployment) must NOT be returned by the per-agent claim.
mkJob("job-agentclaim-ren-pending", domain.JobTypeRenewal, domain.JobStatusPending),
}
for _, j := range jobs {
if err := jobRepo.Create(ctx, j); err != nil {
t.Fatalf("create job %s failed: %v", j.ID, err)
}
}
claimed, err := jobRepo.ClaimPendingByAgentID(ctx, agentID)
if err != nil {
t.Fatalf("ClaimPendingByAgentID failed: %v", err)
}
// Expect exactly the 2 deployments + 1 AwaitingCSR.
if len(claimed) != 3 {
t.Fatalf("len(claimed) = %d, want 3 (2 deployments + 1 AwaitingCSR)", len(claimed))
}
statusByID := map[string]domain.JobStatus{}
for _, j := range claimed {
statusByID[j.ID] = j.Status
}
// Both deployments must be Running in the returned slice (in-memory reflection).
for _, id := range []string{"job-agentclaim-dep-1", "job-agentclaim-dep-2"} {
if statusByID[id] != domain.JobStatusRunning {
t.Errorf("returned deployment %s Status = %q, want Running", id, statusByID[id])
}
}
// AwaitingCSR must remain AwaitingCSR.
if statusByID["job-agentclaim-csr-1"] != domain.JobStatusAwaitingCSR {
t.Errorf("returned AwaitingCSR Status = %q, want AwaitingCSR", statusByID["job-agentclaim-csr-1"])
}
// The unrelated Pending Renewal must not be returned.
if _, ok := statusByID["job-agentclaim-ren-pending"]; ok {
t.Errorf("Pending Renewal job was returned by ClaimPendingByAgentID — scope violation")
}
// Persisted state: deployments Running, AwaitingCSR unchanged, Pending Renewal still Pending.
for id, want := range map[string]domain.JobStatus{
"job-agentclaim-dep-1": domain.JobStatusRunning,
"job-agentclaim-dep-2": domain.JobStatusRunning,
"job-agentclaim-csr-1": domain.JobStatusAwaitingCSR,
"job-agentclaim-ren-pending": domain.JobStatusPending,
} {
got, err := jobRepo.Get(ctx, id)
if err != nil {
t.Fatalf("Get(%s) failed: %v", id, err)
}
if got.Status != want {
t.Errorf("persisted %s Status = %q, want %q", id, got.Status, want)
}
}
}