diff --git a/internal/repository/postgres/job.go b/internal/repository/postgres/job.go index ae867a6..030e309 100644 --- a/internal/repository/postgres/job.go +++ b/internal/repository/postgres/job.go @@ -462,42 +462,74 @@ func (r *JobRepository) ClaimPendingByAgentID(ctx context.Context, agentID strin // Branch 1 + 2: Pending Deployment jobs (direct agent_id match or legacy // target fallback). These get flipped to Running atomically below. - pendingRows, err := tx.QueryContext(ctx, ` + // + // v2.1.0 Phase-9 cold-compose-smoke fix: Postgres rejects FOR UPDATE on + // a UNION query result with `ERROR: FOR UPDATE is not allowed with + // UNION/INTERSECT/EXCEPT`. The H-6 closure (commit 0a75a30) shipped this + // as a single UNION ALL ... FOR UPDATE SKIP LOCKED query, which means + // every agent work-poll returned HTTP 500 in any real deployment with a + // running agent. The schema-per-test unit harness never polled work, so + // the bug stayed latent until the Phase-9 docker compose smoke surfaced + // it. Fix: split into two separate queries within the same transaction. + // Each branch keeps its own FOR UPDATE SKIP LOCKED so the H-6 atomicity + // invariant (concurrent pollers never see the same Pending row) is + // preserved — the transaction wraps both calls + the subsequent UPDATE. + pendingJobs := make([]*domain.Job, 0) + + // Branch 1: Pending Deployment jobs with direct agent_id match. + directRows, err := tx.QueryContext(ctx, ` SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts, last_error, scheduled_at, started_at, completed_at, created_at FROM jobs WHERE agent_id = $1 AND status = 'Pending' AND type = 'Deployment' + ORDER BY created_at ASC + FOR UPDATE SKIP LOCKED + `, agentID) + if err != nil { + return nil, fmt.Errorf("failed to query pending deployment jobs for agent (direct): %w", err) + } + for directRows.Next() { + job, err := scanJob(directRows) + if err != nil { + directRows.Close() + return nil, err + } + pendingJobs = append(pendingJobs, job) + } + if err := directRows.Err(); err != nil { + directRows.Close() + return nil, fmt.Errorf("error iterating pending deployment rows (direct): %w", err) + } + directRows.Close() - UNION ALL - + // Branch 2: Pending Deployment jobs assigned via legacy target→agent + // fallback (agent_id IS NULL on the job; target's agent_id = $1). + fallbackRows, err := tx.QueryContext(ctx, ` SELECT j.id, j.type, j.certificate_id, j.target_id, j.agent_id, j.status, j.attempts, j.max_attempts, j.last_error, j.scheduled_at, j.started_at, j.completed_at, j.created_at FROM jobs j INNER JOIN deployment_targets dt ON j.target_id = dt.id WHERE j.agent_id IS NULL AND j.status = 'Pending' AND j.type = 'Deployment' AND dt.agent_id = $1 - - ORDER BY created_at ASC - FOR UPDATE SKIP LOCKED + ORDER BY j.created_at ASC + FOR UPDATE OF j SKIP LOCKED `, agentID) if err != nil { - return nil, fmt.Errorf("failed to query pending deployment jobs for agent: %w", err) + return nil, fmt.Errorf("failed to query pending deployment jobs for agent (fallback): %w", err) } - - var pendingJobs []*domain.Job - for pendingRows.Next() { - job, err := scanJob(pendingRows) + for fallbackRows.Next() { + job, err := scanJob(fallbackRows) if err != nil { - pendingRows.Close() + fallbackRows.Close() return nil, err } pendingJobs = append(pendingJobs, job) } - if err := pendingRows.Err(); err != nil { - pendingRows.Close() - return nil, fmt.Errorf("error iterating pending deployment rows: %w", err) + if err := fallbackRows.Err(); err != nil { + fallbackRows.Close() + return nil, fmt.Errorf("error iterating pending deployment rows (fallback): %w", err) } - pendingRows.Close() + fallbackRows.Close() // Branch 3: AwaitingCSR jobs for this agent. Locked with FOR UPDATE SKIP // LOCKED to prevent duplicate delivery to concurrent pollers, but state is