mirror of
https://github.com/shankar0123/certctl.git
synced 2026-06-07 17:41:29 +00:00
21aeed4f4e
Phase 0 closure (Path B2, post-rewrite):
addlicense sweep — adds the canonical certctl LLC copyright + BUSL-1.1
SPDX header to every production Go file. Template:
// Copyright 2026 certctl LLC. All rights reserved.
// SPDX-License-Identifier: BUSL-1.1
Coverage: 338 / 338 production Go files (cmd/ + internal/, excluding
*_test.go and **/testdata/**). Pre-sweep coverage was 22 / 338 (6.5%);
post-sweep is 338 / 338 (100%).
Normalized 22 pre-existing legacy headers (`// Copyright (c) certctl`
+ `// SPDX-License-Identifier: BSL-1.1`) and 1 file using a
`Certctl Contributors` attribution. The legacy SPDX ID `BSL-1.1`
is non-standard; the official SPDX identifier for Business Source
License 1.1 is `BUSL-1.1` (capital U). All 338 files now share the
canonical form.
Generated via:
addlicense -c "certctl LLC" -y 2026 \
-f cowork/legal/copyright-header.tpl \
-ignore '**/testdata/**' -ignore '**/*_test.go' \
cmd/ internal/
Verification:
find cmd internal -name '*.go' -not -name '*_test.go' \
-not -path '*/testdata/*' \
-exec grep -L '^// Copyright 2026 certctl LLC' {} \; | wc -l
Returns: 0
gofmt clean. Header additions are comments only, no compile impact.
Closes: cowork/certctl-architecture-diligence-audit.html#fix-RED-4
702 lines
22 KiB
Go
702 lines
22 KiB
Go
// Copyright 2026 certctl LLC. All rights reserved.
|
|
// SPDX-License-Identifier: BUSL-1.1
|
|
|
|
package postgres
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"github.com/certctl-io/certctl/internal/repository"
|
|
"time"
|
|
|
|
"github.com/certctl-io/certctl/internal/domain"
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
// JobRepository implements repository.JobRepository
|
|
type JobRepository struct {
|
|
db *sql.DB
|
|
}
|
|
|
|
// NewJobRepository creates a new JobRepository
|
|
func NewJobRepository(db *sql.DB) *JobRepository {
|
|
return &JobRepository{db: db}
|
|
}
|
|
|
|
// List returns all jobs
|
|
func (r *JobRepository) List(ctx context.Context) ([]*domain.Job, error) {
|
|
rows, err := r.db.QueryContext(ctx, `
|
|
SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts,
|
|
last_error, scheduled_at, started_at, completed_at, created_at
|
|
FROM jobs
|
|
ORDER BY created_at DESC
|
|
`)
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query jobs: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var jobs []*domain.Job
|
|
for rows.Next() {
|
|
job, err := scanJob(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
jobs = append(jobs, job)
|
|
}
|
|
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("error iterating job rows: %w", err)
|
|
}
|
|
|
|
return jobs, nil
|
|
}
|
|
|
|
// Get retrieves a job by ID
|
|
func (r *JobRepository) Get(ctx context.Context, id string) (*domain.Job, error) {
|
|
row := r.db.QueryRowContext(ctx, `
|
|
SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts,
|
|
last_error, scheduled_at, started_at, completed_at, created_at
|
|
FROM jobs
|
|
WHERE id = $1
|
|
`, id)
|
|
|
|
job, err := scanJob(row)
|
|
if err != nil {
|
|
if err == sql.ErrNoRows {
|
|
return nil, fmt.Errorf("job not found: %w", repository.ErrNotFound)
|
|
}
|
|
return nil, fmt.Errorf("failed to query job: %w", err)
|
|
}
|
|
|
|
return job, nil
|
|
}
|
|
|
|
// Create stores a new job
|
|
func (r *JobRepository) Create(ctx context.Context, job *domain.Job) error {
|
|
if job.ID == "" {
|
|
job.ID = uuid.New().String()
|
|
}
|
|
|
|
err := r.db.QueryRowContext(ctx, `
|
|
INSERT INTO jobs (
|
|
id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts,
|
|
last_error, scheduled_at, started_at, completed_at, created_at
|
|
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
|
|
RETURNING id
|
|
`, job.ID, job.Type, job.CertificateID, job.TargetID, job.AgentID, job.Status, job.Attempts,
|
|
job.MaxAttempts, job.LastError, job.ScheduledAt, job.StartedAt, job.CompletedAt,
|
|
job.CreatedAt).Scan(&job.ID)
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create job: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Update modifies an existing job
|
|
func (r *JobRepository) Update(ctx context.Context, job *domain.Job) error {
|
|
result, err := r.db.ExecContext(ctx, `
|
|
UPDATE jobs SET
|
|
type = $1,
|
|
certificate_id = $2,
|
|
target_id = $3,
|
|
agent_id = $4,
|
|
status = $5,
|
|
attempts = $6,
|
|
max_attempts = $7,
|
|
last_error = $8,
|
|
scheduled_at = $9,
|
|
started_at = $10,
|
|
completed_at = $11
|
|
WHERE id = $12
|
|
`, job.Type, job.CertificateID, job.TargetID, job.AgentID, job.Status, job.Attempts,
|
|
job.MaxAttempts, job.LastError, job.ScheduledAt, job.StartedAt,
|
|
job.CompletedAt, job.ID)
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("failed to update job: %w", err)
|
|
}
|
|
|
|
rows, err := result.RowsAffected()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get rows affected: %w", err)
|
|
}
|
|
|
|
if rows == 0 {
|
|
return fmt.Errorf("job not found: %w", repository.ErrNotFound)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Delete removes a job
|
|
func (r *JobRepository) Delete(ctx context.Context, id string) error {
|
|
result, err := r.db.ExecContext(ctx, "DELETE FROM jobs WHERE id = $1", id)
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("failed to delete job: %w", err)
|
|
}
|
|
|
|
rows, err := result.RowsAffected()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get rows affected: %w", err)
|
|
}
|
|
|
|
if rows == 0 {
|
|
return fmt.Errorf("job not found: %w", repository.ErrNotFound)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ListByStatus returns jobs with a specific status
|
|
func (r *JobRepository) ListByStatus(ctx context.Context, status domain.JobStatus) ([]*domain.Job, error) {
|
|
rows, err := r.db.QueryContext(ctx, `
|
|
SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts,
|
|
last_error, scheduled_at, started_at, completed_at, created_at
|
|
FROM jobs
|
|
WHERE status = $1
|
|
ORDER BY created_at DESC
|
|
`, status)
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query jobs by status: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var jobs []*domain.Job
|
|
for rows.Next() {
|
|
job, err := scanJob(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
jobs = append(jobs, job)
|
|
}
|
|
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("error iterating job rows: %w", err)
|
|
}
|
|
|
|
return jobs, nil
|
|
}
|
|
|
|
// ListByCertificate returns all jobs for a certificate
|
|
func (r *JobRepository) ListByCertificate(ctx context.Context, certID string) ([]*domain.Job, error) {
|
|
rows, err := r.db.QueryContext(ctx, `
|
|
SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts,
|
|
last_error, scheduled_at, started_at, completed_at, created_at
|
|
FROM jobs
|
|
WHERE certificate_id = $1
|
|
ORDER BY created_at DESC
|
|
`, certID)
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query jobs for certificate: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var jobs []*domain.Job
|
|
for rows.Next() {
|
|
job, err := scanJob(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
jobs = append(jobs, job)
|
|
}
|
|
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("error iterating job rows: %w", err)
|
|
}
|
|
|
|
return jobs, nil
|
|
}
|
|
|
|
// UpdateStatus updates a job's status and optional error message
|
|
func (r *JobRepository) UpdateStatus(ctx context.Context, id string, status domain.JobStatus, errMsg string) error {
|
|
var lastError *string
|
|
if errMsg != "" {
|
|
lastError = &errMsg
|
|
}
|
|
|
|
result, err := r.db.ExecContext(ctx, `
|
|
UPDATE jobs SET status = $1, last_error = $2 WHERE id = $3
|
|
`, status, lastError, id)
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("failed to update job status: %w", err)
|
|
}
|
|
|
|
rows, err := result.RowsAffected()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get rows affected: %w", err)
|
|
}
|
|
|
|
if rows == 0 {
|
|
return fmt.Errorf("job not found: %w", repository.ErrNotFound)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetPendingJobs returns jobs not yet processed of a specific type.
|
|
//
|
|
// The SELECT uses FOR UPDATE SKIP LOCKED so that concurrent scheduler replicas
|
|
// cannot observe the same rows when invoked inside a transaction; combine with
|
|
// a subsequent UPDATE to Running for correct dispatch semantics. For the
|
|
// standard production dispatch path, prefer ClaimPendingJobs which wraps the
|
|
// lock, read, and state transition in a single transaction and is the
|
|
// authoritative race-free claim primitive (CWE-362 fix for H-6).
|
|
func (r *JobRepository) GetPendingJobs(ctx context.Context, jobType domain.JobType) ([]*domain.Job, error) {
|
|
rows, err := r.db.QueryContext(ctx, `
|
|
SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts,
|
|
last_error, scheduled_at, started_at, completed_at, created_at
|
|
FROM jobs
|
|
WHERE type = $1 AND status = $2
|
|
ORDER BY scheduled_at ASC
|
|
FOR UPDATE SKIP LOCKED
|
|
`, jobType, domain.JobStatusPending)
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query pending jobs: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var jobs []*domain.Job
|
|
for rows.Next() {
|
|
job, err := scanJob(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
jobs = append(jobs, job)
|
|
}
|
|
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("error iterating job rows: %w", err)
|
|
}
|
|
|
|
return jobs, nil
|
|
}
|
|
|
|
// ClaimPendingJobs atomically claims up to `limit` Pending jobs and transitions
|
|
// them to Running inside a single transaction. The SELECT uses FOR UPDATE SKIP
|
|
// LOCKED so concurrent scheduler replicas observe disjoint result sets — each
|
|
// row can be claimed by exactly one caller per tick (CWE-362 fix for H-6).
|
|
//
|
|
// Passing an empty jobType claims any type. Passing limit<=0 claims all
|
|
// available rows. The claimed rows are returned with Status already set to
|
|
// domain.JobStatusRunning.
|
|
//
|
|
// Downstream processors (ProcessRenewalJob, ProcessDeploymentJob) already call
|
|
// UpdateStatus(Running) unconditionally on entry, so this pre-flip is
|
|
// idempotent with respect to existing processing logic.
|
|
func (r *JobRepository) ClaimPendingJobs(ctx context.Context, jobType domain.JobType, limit int) ([]*domain.Job, error) {
|
|
tx, err := r.db.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to begin claim transaction: %w", err)
|
|
}
|
|
// Rollback is a no-op after Commit — safe deferred cleanup if an error path
|
|
// triggers an early return before Commit().
|
|
defer func() { _ = tx.Rollback() }()
|
|
|
|
// Build the SELECT — jobType="" means any type, limit<=0 means unlimited.
|
|
query := `
|
|
SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts,
|
|
last_error, scheduled_at, started_at, completed_at, created_at
|
|
FROM jobs
|
|
WHERE status = $1`
|
|
args := []interface{}{domain.JobStatusPending}
|
|
if jobType != "" {
|
|
query += ` AND type = $2`
|
|
args = append(args, jobType)
|
|
}
|
|
query += `
|
|
ORDER BY scheduled_at ASC
|
|
FOR UPDATE SKIP LOCKED`
|
|
if limit > 0 {
|
|
query += fmt.Sprintf(` LIMIT %d`, limit)
|
|
}
|
|
|
|
rows, err := tx.QueryContext(ctx, query, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query claimable jobs: %w", err)
|
|
}
|
|
|
|
var jobs []*domain.Job
|
|
for rows.Next() {
|
|
job, err := scanJob(rows)
|
|
if err != nil {
|
|
rows.Close()
|
|
return nil, err
|
|
}
|
|
jobs = append(jobs, job)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
rows.Close()
|
|
return nil, fmt.Errorf("error iterating claimable job rows: %w", err)
|
|
}
|
|
rows.Close()
|
|
|
|
if len(jobs) == 0 {
|
|
// No rows to claim — commit the (read-only) tx and return.
|
|
if err := tx.Commit(); err != nil {
|
|
return nil, fmt.Errorf("failed to commit empty claim tx: %w", err)
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
// Flip claimed rows to Running. Build IN clause safely with placeholders.
|
|
ids := make([]interface{}, len(jobs))
|
|
placeholders := make([]byte, 0, len(jobs)*5)
|
|
for i, job := range jobs {
|
|
ids[i] = job.ID
|
|
if i > 0 {
|
|
placeholders = append(placeholders, ',')
|
|
}
|
|
placeholders = append(placeholders, fmt.Sprintf("$%d", i+2)...)
|
|
}
|
|
updateQuery := fmt.Sprintf(
|
|
`UPDATE jobs SET status = $1 WHERE id IN (%s)`,
|
|
string(placeholders),
|
|
)
|
|
updateArgs := append([]interface{}{domain.JobStatusRunning}, ids...)
|
|
if _, err := tx.ExecContext(ctx, updateQuery, updateArgs...); err != nil {
|
|
return nil, fmt.Errorf("failed to transition claimed jobs to Running: %w", err)
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return nil, fmt.Errorf("failed to commit claim transaction: %w", err)
|
|
}
|
|
|
|
// Reflect the committed state in the returned objects.
|
|
for _, job := range jobs {
|
|
job.Status = domain.JobStatusRunning
|
|
}
|
|
|
|
return jobs, nil
|
|
}
|
|
|
|
// ListPendingByAgentID returns pending deployment jobs and AwaitingCSR jobs for
|
|
// a specific agent. Deployment jobs are matched by agent_id directly (set at
|
|
// creation time), with a fallback for legacy jobs where agent_id is NULL but
|
|
// target_id resolves to the agent via deployment_targets. AwaitingCSR jobs are
|
|
// matched through certificate → target mappings → agent ownership.
|
|
//
|
|
// The SELECT uses FOR UPDATE SKIP LOCKED so concurrent pollers (e.g. two agent
|
|
// instances running with the same agent_id) cannot observe the same rows when
|
|
// this method is invoked inside a transaction. For the production agent work
|
|
// poll path, prefer ClaimPendingByAgentID which additionally transitions
|
|
// claimed Pending deployment rows to Running atomically (H-6 CWE-362 fix).
|
|
func (r *JobRepository) ListPendingByAgentID(ctx context.Context, agentID string) ([]*domain.Job, error) {
|
|
rows, err := r.db.QueryContext(ctx, `
|
|
SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts,
|
|
last_error, scheduled_at, started_at, completed_at, created_at
|
|
FROM jobs
|
|
WHERE agent_id = $1 AND status = 'Pending' AND type = 'Deployment'
|
|
|
|
UNION ALL
|
|
|
|
SELECT j.id, j.type, j.certificate_id, j.target_id, j.agent_id, j.status, j.attempts, j.max_attempts,
|
|
j.last_error, j.scheduled_at, j.started_at, j.completed_at, j.created_at
|
|
FROM jobs j
|
|
INNER JOIN deployment_targets dt ON j.target_id = dt.id
|
|
WHERE j.agent_id IS NULL AND j.status = 'Pending' AND j.type = 'Deployment'
|
|
AND dt.agent_id = $1
|
|
|
|
UNION ALL
|
|
|
|
SELECT j.id, j.type, j.certificate_id, j.target_id, j.agent_id, j.status, j.attempts, j.max_attempts,
|
|
j.last_error, j.scheduled_at, j.started_at, j.completed_at, j.created_at
|
|
FROM jobs j
|
|
WHERE j.status = 'AwaitingCSR'
|
|
AND j.type IN ('Renewal', 'Issuance')
|
|
AND EXISTS (
|
|
SELECT 1 FROM certificate_target_mappings ctm
|
|
INNER JOIN deployment_targets dt ON ctm.target_id = dt.id
|
|
WHERE ctm.certificate_id = j.certificate_id
|
|
AND dt.agent_id = $1
|
|
)
|
|
|
|
ORDER BY created_at ASC
|
|
`, agentID)
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query pending jobs for agent: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var jobs []*domain.Job
|
|
for rows.Next() {
|
|
job, err := scanJob(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
jobs = append(jobs, job)
|
|
}
|
|
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("error iterating pending agent job rows: %w", err)
|
|
}
|
|
|
|
return jobs, nil
|
|
}
|
|
|
|
// ClaimPendingByAgentID atomically claims agent work inside a single
|
|
// transaction. Pending Deployment jobs assigned to the agent (directly via
|
|
// agent_id, or via legacy target→agent fallback) are transitioned from
|
|
// Pending to Running. AwaitingCSR Renewal/Issuance jobs linked to the agent
|
|
// via certificate → target mappings are locked with FOR UPDATE SKIP LOCKED
|
|
// and returned without a state transition — the flow requires the agent to
|
|
// submit a CSR to advance state, and pre-flipping AwaitingCSR would violate
|
|
// the renewal state machine (CWE-362 fix for H-6).
|
|
//
|
|
// Claimed rows are invisible to other concurrent claim calls for the lifetime
|
|
// of the transaction; rows claimed as Running remain invisible after commit
|
|
// because ListPendingByAgentID's filter is status='Pending'.
|
|
func (r *JobRepository) ClaimPendingByAgentID(ctx context.Context, agentID string) ([]*domain.Job, error) {
|
|
tx, err := r.db.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to begin agent claim transaction: %w", err)
|
|
}
|
|
defer func() { _ = tx.Rollback() }()
|
|
|
|
// Branch 1 + 2: Pending Deployment jobs (direct agent_id match or legacy
|
|
// target fallback). These get flipped to Running atomically below.
|
|
//
|
|
// v2.1.0 Phase-9 cold-compose-smoke fix: Postgres rejects FOR UPDATE on
|
|
// a UNION query result with `ERROR: FOR UPDATE is not allowed with
|
|
// UNION/INTERSECT/EXCEPT`. The H-6 closure (commit 0a75a30) shipped this
|
|
// as a single UNION ALL ... FOR UPDATE SKIP LOCKED query, which means
|
|
// every agent work-poll returned HTTP 500 in any real deployment with a
|
|
// running agent. The schema-per-test unit harness never polled work, so
|
|
// the bug stayed latent until the Phase-9 docker compose smoke surfaced
|
|
// it. Fix: split into two separate queries within the same transaction.
|
|
// Each branch keeps its own FOR UPDATE SKIP LOCKED so the H-6 atomicity
|
|
// invariant (concurrent pollers never see the same Pending row) is
|
|
// preserved — the transaction wraps both calls + the subsequent UPDATE.
|
|
pendingJobs := make([]*domain.Job, 0)
|
|
|
|
// Branch 1: Pending Deployment jobs with direct agent_id match.
|
|
directRows, err := tx.QueryContext(ctx, `
|
|
SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts,
|
|
last_error, scheduled_at, started_at, completed_at, created_at
|
|
FROM jobs
|
|
WHERE agent_id = $1 AND status = 'Pending' AND type = 'Deployment'
|
|
ORDER BY created_at ASC
|
|
FOR UPDATE SKIP LOCKED
|
|
`, agentID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query pending deployment jobs for agent (direct): %w", err)
|
|
}
|
|
for directRows.Next() {
|
|
job, err := scanJob(directRows)
|
|
if err != nil {
|
|
directRows.Close()
|
|
return nil, err
|
|
}
|
|
pendingJobs = append(pendingJobs, job)
|
|
}
|
|
if err := directRows.Err(); err != nil {
|
|
directRows.Close()
|
|
return nil, fmt.Errorf("error iterating pending deployment rows (direct): %w", err)
|
|
}
|
|
directRows.Close()
|
|
|
|
// Branch 2: Pending Deployment jobs assigned via legacy target→agent
|
|
// fallback (agent_id IS NULL on the job; target's agent_id = $1).
|
|
fallbackRows, err := tx.QueryContext(ctx, `
|
|
SELECT j.id, j.type, j.certificate_id, j.target_id, j.agent_id, j.status, j.attempts, j.max_attempts,
|
|
j.last_error, j.scheduled_at, j.started_at, j.completed_at, j.created_at
|
|
FROM jobs j
|
|
INNER JOIN deployment_targets dt ON j.target_id = dt.id
|
|
WHERE j.agent_id IS NULL AND j.status = 'Pending' AND j.type = 'Deployment'
|
|
AND dt.agent_id = $1
|
|
ORDER BY j.created_at ASC
|
|
FOR UPDATE OF j SKIP LOCKED
|
|
`, agentID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query pending deployment jobs for agent (fallback): %w", err)
|
|
}
|
|
for fallbackRows.Next() {
|
|
job, err := scanJob(fallbackRows)
|
|
if err != nil {
|
|
fallbackRows.Close()
|
|
return nil, err
|
|
}
|
|
pendingJobs = append(pendingJobs, job)
|
|
}
|
|
if err := fallbackRows.Err(); err != nil {
|
|
fallbackRows.Close()
|
|
return nil, fmt.Errorf("error iterating pending deployment rows (fallback): %w", err)
|
|
}
|
|
fallbackRows.Close()
|
|
|
|
// Branch 3: AwaitingCSR jobs for this agent. Locked with FOR UPDATE SKIP
|
|
// LOCKED to prevent duplicate delivery to concurrent pollers, but state is
|
|
// NOT transitioned — the agent advances state via CSR submission.
|
|
csrRows, err := tx.QueryContext(ctx, `
|
|
SELECT j.id, j.type, j.certificate_id, j.target_id, j.agent_id, j.status, j.attempts, j.max_attempts,
|
|
j.last_error, j.scheduled_at, j.started_at, j.completed_at, j.created_at
|
|
FROM jobs j
|
|
WHERE j.status = 'AwaitingCSR'
|
|
AND j.type IN ('Renewal', 'Issuance')
|
|
AND EXISTS (
|
|
SELECT 1 FROM certificate_target_mappings ctm
|
|
INNER JOIN deployment_targets dt ON ctm.target_id = dt.id
|
|
WHERE ctm.certificate_id = j.certificate_id
|
|
AND dt.agent_id = $1
|
|
)
|
|
ORDER BY j.created_at ASC
|
|
FOR UPDATE SKIP LOCKED
|
|
`, agentID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query AwaitingCSR jobs for agent: %w", err)
|
|
}
|
|
|
|
var csrJobs []*domain.Job
|
|
for csrRows.Next() {
|
|
job, err := scanJob(csrRows)
|
|
if err != nil {
|
|
csrRows.Close()
|
|
return nil, err
|
|
}
|
|
csrJobs = append(csrJobs, job)
|
|
}
|
|
if err := csrRows.Err(); err != nil {
|
|
csrRows.Close()
|
|
return nil, fmt.Errorf("error iterating AwaitingCSR rows: %w", err)
|
|
}
|
|
csrRows.Close()
|
|
|
|
// Transition locked Pending deployments to Running before commit.
|
|
if len(pendingJobs) > 0 {
|
|
ids := make([]interface{}, len(pendingJobs))
|
|
placeholders := make([]byte, 0, len(pendingJobs)*5)
|
|
for i, job := range pendingJobs {
|
|
ids[i] = job.ID
|
|
if i > 0 {
|
|
placeholders = append(placeholders, ',')
|
|
}
|
|
placeholders = append(placeholders, fmt.Sprintf("$%d", i+2)...)
|
|
}
|
|
updateQuery := fmt.Sprintf(
|
|
`UPDATE jobs SET status = $1 WHERE id IN (%s)`,
|
|
string(placeholders),
|
|
)
|
|
updateArgs := append([]interface{}{domain.JobStatusRunning}, ids...)
|
|
if _, err := tx.ExecContext(ctx, updateQuery, updateArgs...); err != nil {
|
|
return nil, fmt.Errorf("failed to transition claimed deployment jobs to Running: %w", err)
|
|
}
|
|
}
|
|
|
|
if err := tx.Commit(); err != nil {
|
|
return nil, fmt.Errorf("failed to commit agent claim transaction: %w", err)
|
|
}
|
|
|
|
// Reflect the committed state in returned Pending deployment jobs; leave
|
|
// AwaitingCSR jobs untouched.
|
|
for _, job := range pendingJobs {
|
|
job.Status = domain.JobStatusRunning
|
|
}
|
|
|
|
// Preserve the legacy ordering: Pending deployments first, AwaitingCSR
|
|
// second. Callers that want a strict created_at merge can re-sort.
|
|
return append(pendingJobs, csrJobs...), nil
|
|
}
|
|
|
|
// ListTimedOutAwaitingJobs returns jobs stuck in AwaitingCSR or AwaitingApproval past
|
|
// their respective cutoff timestamps (created_at < cutoff). The reaper loop transitions
|
|
// them to Failed; I-001's retry loop then auto-promotes eligible Failed jobs back to
|
|
// Pending. I-003 coverage-gap closure.
|
|
func (r *JobRepository) ListTimedOutAwaitingJobs(ctx context.Context, csrCutoff, approvalCutoff time.Time) ([]*domain.Job, error) {
|
|
rows, err := r.db.QueryContext(ctx, `
|
|
SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts,
|
|
last_error, scheduled_at, started_at, completed_at, created_at
|
|
FROM jobs
|
|
WHERE (status = $1 AND created_at < $2)
|
|
OR (status = $3 AND created_at < $4)
|
|
ORDER BY created_at ASC
|
|
`, domain.JobStatusAwaitingCSR, csrCutoff, domain.JobStatusAwaitingApproval, approvalCutoff)
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query timed-out awaiting jobs: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var jobs []*domain.Job
|
|
for rows.Next() {
|
|
job, err := scanJob(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
jobs = append(jobs, job)
|
|
}
|
|
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("error iterating timed-out job rows: %w", err)
|
|
}
|
|
|
|
return jobs, nil
|
|
}
|
|
|
|
// ListJobsWithOfflineAgents returns jobs in Running status whose owning
|
|
// agent's last_heartbeat_at is older than agentCutoff. Bundle C / Audit
|
|
// M-016 (CWE-754): closes the gap that ListTimedOutAwaitingJobs left
|
|
// open — jobs claimed by an agent that subsequently dies sit in Running
|
|
// indefinitely. The query joins jobs to agents on agent_id and filters
|
|
// to (status='Running' AND agent.last_heartbeat_at < agentCutoff).
|
|
//
|
|
// Jobs without an agent_id (server-side keygen path) are intentionally
|
|
// excluded: they have no agent to be "offline".
|
|
func (r *JobRepository) ListJobsWithOfflineAgents(ctx context.Context, agentCutoff time.Time) ([]*domain.Job, error) {
|
|
rows, err := r.db.QueryContext(ctx, `
|
|
SELECT j.id, j.type, j.certificate_id, j.target_id, j.agent_id, j.status,
|
|
j.attempts, j.max_attempts, j.last_error, j.scheduled_at,
|
|
j.started_at, j.completed_at, j.created_at
|
|
FROM jobs j
|
|
JOIN agents a ON a.id = j.agent_id
|
|
WHERE j.status = $1
|
|
AND j.agent_id IS NOT NULL
|
|
AND a.last_heartbeat_at IS NOT NULL
|
|
AND a.last_heartbeat_at < $2
|
|
ORDER BY j.started_at ASC NULLS FIRST
|
|
`, domain.JobStatusRunning, agentCutoff)
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query jobs with offline agents: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var jobs []*domain.Job
|
|
for rows.Next() {
|
|
job, err := scanJob(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
jobs = append(jobs, job)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, fmt.Errorf("error iterating offline-agent job rows: %w", err)
|
|
}
|
|
return jobs, nil
|
|
}
|
|
|
|
// scanJob scans a job from a row or rows
|
|
func scanJob(scanner interface {
|
|
Scan(...interface{}) error
|
|
}) (*domain.Job, error) {
|
|
var job domain.Job
|
|
err := scanner.Scan(&job.ID, &job.Type, &job.CertificateID, &job.TargetID,
|
|
&job.AgentID, &job.Status, &job.Attempts, &job.MaxAttempts, &job.LastError,
|
|
&job.ScheduledAt, &job.StartedAt, &job.CompletedAt, &job.CreatedAt)
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to scan job: %w", err)
|
|
}
|
|
|
|
return &job, nil
|
|
}
|