mirror of
https://github.com/shankar0123/certctl.git
synced 2026-06-07 12:21:31 +00:00
fix(repo/job): split UNION ALL + FOR UPDATE into two queries (Postgres-correctness)
Phase-9 docker compose smoke surfaced a latent production-breaking bug introduced by commit 89b910a (H-6 atomic pending-job claim). The ClaimPendingByAgentID query in internal/repository/postgres/job.go combined UNION ALL with FOR UPDATE SKIP LOCKED in a single statement. Postgres rejects this with: `ERROR: FOR UPDATE is not allowed with UNION/INTERSECT/EXCEPT`. Every agent work-poll returns HTTP 500 in any real deployment where an agent is actually polling. From the compose log: `request_id=6da47015-... GET /api/v1/agents/agent-demo-1/work status=500 duration_ms=2`. The schema-per-test unit harness in internal/repository/postgres/*_test.go never inserted jobs and polled, so the SQL execution path was never exercised. The bug has been latent in master since 89b910a landed. Fix: split the UNION ALL into two separate FOR UPDATE SKIP LOCKED queries within the existing transaction. The H-6 atomicity invariant (concurrent pollers never see the same Pending row) is preserved because: 1. The two queries run inside the same transaction (tx). 2. Each query independently locks its result rows with FOR UPDATE SKIP LOCKED. 3. The subsequent UPDATE that flips Pending -> Running runs in the same transaction, so the rows stay invisible to concurrent callers from initial SELECT through final COMMIT. 4. The transaction is the unit of consistency, not the single SQL statement. Two queries: - Branch 1 (direct): jobs.agent_id = $1 + status='Pending' + type='Deployment'. ORDER BY created_at ASC, FOR UPDATE SKIP LOCKED. - Branch 2 (fallback): jobs.agent_id IS NULL + INNER JOIN deployment_targets dt ON jobs.target_id = dt.id WHERE dt.agent_id = $1. ORDER BY j.created_at ASC, FOR UPDATE OF j SKIP LOCKED (FOR UPDATE OF needed because the join brings in dt). Branch 3 (AwaitingCSR) is unchanged — already a single SELECT, not affected by the UNION restriction. Inline comment explains the fix's load-bearing-ness so a future refactor doesn't merge them back into one UNION query. 
Verify (sandbox): go vet clean; go test -short -count=1 PASS on internal/repository/postgres/. Workstation re-runs 'docker compose up' to confirm the agent's GET /work returns 200 with the next pending-deployment claim. Note: this is NOT a regression introduced by Auth Bundle 2 or the 2026-05-11 audit fixes; it's a pre-existing latent defect from H-6. Including in v2.1.0 because shipping with a broken agent work-poll would block the demo path on day one of release.
This commit is contained in:
@@ -462,42 +462,74 @@ func (r *JobRepository) ClaimPendingByAgentID(ctx context.Context, agentID strin
|
||||
|
||||
// Branch 1 + 2: Pending Deployment jobs (direct agent_id match or legacy
|
||||
// target fallback). These get flipped to Running atomically below.
|
||||
pendingRows, err := tx.QueryContext(ctx, `
|
||||
//
|
||||
// v2.1.0 Phase-9 cold-compose-smoke fix: Postgres rejects FOR UPDATE on
|
||||
// a UNION query result with `ERROR: FOR UPDATE is not allowed with
|
||||
// UNION/INTERSECT/EXCEPT`. The H-6 closure (commit 0a75a30) shipped this
|
||||
// as a single UNION ALL ... FOR UPDATE SKIP LOCKED query, which means
|
||||
// every agent work-poll returned HTTP 500 in any real deployment with a
|
||||
// running agent. The schema-per-test unit harness never polled work, so
|
||||
// the bug stayed latent until the Phase-9 docker compose smoke surfaced
|
||||
// it. Fix: split into two separate queries within the same transaction.
|
||||
// Each branch keeps its own FOR UPDATE SKIP LOCKED so the H-6 atomicity
|
||||
// invariant (concurrent pollers never see the same Pending row) is
|
||||
// preserved — the transaction wraps both calls + the subsequent UPDATE.
|
||||
pendingJobs := make([]*domain.Job, 0)
|
||||
|
||||
// Branch 1: Pending Deployment jobs with direct agent_id match.
|
||||
directRows, err := tx.QueryContext(ctx, `
|
||||
SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts,
|
||||
last_error, scheduled_at, started_at, completed_at, created_at
|
||||
FROM jobs
|
||||
WHERE agent_id = $1 AND status = 'Pending' AND type = 'Deployment'
|
||||
ORDER BY created_at ASC
|
||||
FOR UPDATE SKIP LOCKED
|
||||
`, agentID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to query pending deployment jobs for agent (direct): %w", err)
|
||||
}
|
||||
for directRows.Next() {
|
||||
job, err := scanJob(directRows)
|
||||
if err != nil {
|
||||
directRows.Close()
|
||||
return nil, err
|
||||
}
|
||||
pendingJobs = append(pendingJobs, job)
|
||||
}
|
||||
if err := directRows.Err(); err != nil {
|
||||
directRows.Close()
|
||||
return nil, fmt.Errorf("error iterating pending deployment rows (direct): %w", err)
|
||||
}
|
||||
directRows.Close()
|
||||
|
||||
UNION ALL
|
||||
|
||||
// Branch 2: Pending Deployment jobs assigned via legacy target→agent
|
||||
// fallback (agent_id IS NULL on the job; target's agent_id = $1).
|
||||
fallbackRows, err := tx.QueryContext(ctx, `
|
||||
SELECT j.id, j.type, j.certificate_id, j.target_id, j.agent_id, j.status, j.attempts, j.max_attempts,
|
||||
j.last_error, j.scheduled_at, j.started_at, j.completed_at, j.created_at
|
||||
FROM jobs j
|
||||
INNER JOIN deployment_targets dt ON j.target_id = dt.id
|
||||
WHERE j.agent_id IS NULL AND j.status = 'Pending' AND j.type = 'Deployment'
|
||||
AND dt.agent_id = $1
|
||||
|
||||
ORDER BY created_at ASC
|
||||
FOR UPDATE SKIP LOCKED
|
||||
ORDER BY j.created_at ASC
|
||||
FOR UPDATE OF j SKIP LOCKED
|
||||
`, agentID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to query pending deployment jobs for agent: %w", err)
|
||||
return nil, fmt.Errorf("failed to query pending deployment jobs for agent (fallback): %w", err)
|
||||
}
|
||||
|
||||
var pendingJobs []*domain.Job
|
||||
for pendingRows.Next() {
|
||||
job, err := scanJob(pendingRows)
|
||||
for fallbackRows.Next() {
|
||||
job, err := scanJob(fallbackRows)
|
||||
if err != nil {
|
||||
pendingRows.Close()
|
||||
fallbackRows.Close()
|
||||
return nil, err
|
||||
}
|
||||
pendingJobs = append(pendingJobs, job)
|
||||
}
|
||||
if err := pendingRows.Err(); err != nil {
|
||||
pendingRows.Close()
|
||||
return nil, fmt.Errorf("error iterating pending deployment rows: %w", err)
|
||||
if err := fallbackRows.Err(); err != nil {
|
||||
fallbackRows.Close()
|
||||
return nil, fmt.Errorf("error iterating pending deployment rows (fallback): %w", err)
|
||||
}
|
||||
pendingRows.Close()
|
||||
fallbackRows.Close()
|
||||
|
||||
// Branch 3: AwaitingCSR jobs for this agent. Locked with FOR UPDATE SKIP
|
||||
// LOCKED to prevent duplicate delivery to concurrent pollers, but state is
|
||||
|
||||
Reference in New Issue
Block a user