Files
certctl/internal/repository/postgres/job.go
T
Shankar 345bafe5aa Bundle C: Renewal/reliability cluster — 7 findings closed
Closes M-006 + M-007 + M-008 + M-015 + M-016 + M-019 + M-020 from
comprehensive-audit-2026-04-25. M-028 was already closed by the
Bundle B CI follow-up.

M-006 (CWE-913) — Idempotent migration 000014
  migrations/000014_policy_violation_severity_check.up.sql:
    Prepended ALTER TABLE ... DROP CONSTRAINT IF EXISTS before the
    ADD. Mirrors the down migration's existing IF EXISTS shape and
    the M-7 idempotent-index idiom. Re-runs against partially-applied
    DBs now succeed.

M-007 — Bulk-op partial-failure tests (3 new)
  internal/api/handler/bulk_partial_failure_test.go:
    TestBulkRevoke_PartialFailure_ReportsBoth
    TestBulkRenew_PartialFailure_ReportsBoth
    TestBulkReassign_PartialFailure_ReportsBoth
  Each asserts HTTP 200 + both success/failure counters round-trip
  + per-cert errors[] preserved with non-empty messages so operators
  can correlate each failure to its certificate ID.

M-008 — Admin-gated handler enumeration pin (verified-already-clean)
  Recon: only one admin-gated handler — bulk_revocation.go — with
  full 3-branch test triplet already in place. health.go calls
  IsAdmin informationally to surface the flag to the GUI without
  gating.
  internal/api/handler/m008_admin_gate_test.go:
    Walks every handler .go file, asserts every middleware.IsAdmin
    call site is in AdminGatedHandlers (with required test triplet)
    or InformationalIsAdminCallers (justified). Adding a new admin
    gate without updating both the constant AND adding the test
    triplet fails CI.

M-015 — Single-profile cardinality pin (verified-already-clean)
  Audit claim 'no cardinality validation' was wrong — enforced at
  struct level. domain.ManagedCertificate.{CertificateProfileID,
  RenewalPolicyID,IssuerID,OwnerID} and RenewalPolicy.
  CertificateProfileID are bare strings, not slices.
  internal/domain/m015_cardinality_test.go:
    reflect-based pin on kind=String. Schema change to N:N would
    have to update renewal.go's lookup loop in the same commit.

M-016 (CWE-754) — Reap stale-agent jobs
  internal/repository/postgres/job.go::ListJobsWithOfflineAgents:
    JOIN jobs to agents on agent_id, filter (status=Running AND
    a.last_heartbeat_at < cutoff), exclude server-keygen jobs.
  internal/service/job.go::ReapJobsWithOfflineAgents:
    Flips matched jobs to Failed reason agent_offline so I-001
    retry loop re-queues them on a healthy agent. Records audit
    event per reap.
  internal/scheduler/scheduler.go:
    Scheduler.runJobTimeout cycle now calls both reaper arms.
    agentOfflineJobTTL default 5min (5x agent-health-check default);
    SetAgentOfflineJobTTL knob for operator override.
  internal/service/job_offline_agent_reaper_test.go: 6 unit tests
  cover happy path, server-keygen-skip, non-Running-skip, non-
  positive-TTL fail-loud, repo-error propagation, audit-event
  recording.

M-019 — Configurable ARI HTTP timeout
  Audit claim 'no fallback timeout' was wrong — ari.go:52 already
  had a 15s timeout. Bundle C makes it configurable.
  internal/connector/issuer/acme/acme.go:
    Config.ARIHTTPTimeoutSeconds field with env path
    CERTCTL_ACME_ARI_HTTP_TIMEOUT_SECONDS.
  internal/connector/issuer/acme/ari.go:
    Both HTTP clients (GetRenewalInfo + getARIEndpoint) now use the
    new ariHTTPTimeout() helper. Zero / negative / nil-config all
    fall back to the historic 15s default.
  ari_timeout_test.go: 4 dispatch arm tests.

M-020 (CWE-770) — OCSP DoS hardening
  Pre-bundle the noAuthHandler chain had no rate limit. An attacker
  could DoS the OCSP responder, which for fail-open relying parties
  is a revocation bypass.
  cmd/server/main.go:
    noAuthHandler refactored from fixed middleware.Chain(...) to a
    conditional slice that appends middleware.NewRateLimiter when
    cfg.RateLimit.Enabled. Per-IP keying applies; OCSP/CRL/EST/SCEP
    are unauth.
  docs/security.md (NEW):
    Operator runbook documenting Must-Staple TLS Feature extension
    RFC 7633 as the architectural fix for fail-open relying parties.
    Profile-flip guidance + nginx/Apache/HAProxy/Envoy stapling
    snippets + explicit scope statement on what the rate limiter
    alone does NOT solve.

Audit deliverables:
  cowork/comprehensive-audit-2026-04-25/audit-report.md: score
    31/55 -> 38/55 closed (Medium 13/27 -> 20/27).
  cowork/comprehensive-audit-2026-04-25/findings.yaml: 7 status
    flips open -> closed with closure notes citing the Bundle C
    mechanism.
  certctl/CHANGELOG.md: Bundle C section under [unreleased].

Verification:
  go vet ./internal/service ./internal/scheduler ./internal/connector/issuer/acme
    ./internal/api/handler ./internal/domain ./cmd/server     clean
  go test -count=1 -short on the same packages              all green
  helm template + helm lint                                 clean
  internal/repository/postgres setup-fail                   sandbox disk
    pressure (same on master HEAD before this branch)
2026-04-27 00:08:25 +00:00

667 lines
21 KiB
Go

package postgres
import (
"github.com/shankar0123/certctl/internal/repository"
"context"
"database/sql"
"fmt"
"time"
"github.com/google/uuid"
"github.com/shankar0123/certctl/internal/domain"
)
// JobRepository implements repository.JobRepository
type JobRepository struct {
db *sql.DB
}
// NewJobRepository creates a new JobRepository
func NewJobRepository(db *sql.DB) *JobRepository {
return &JobRepository{db: db}
}
// List returns all jobs
func (r *JobRepository) List(ctx context.Context) ([]*domain.Job, error) {
rows, err := r.db.QueryContext(ctx, `
SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts,
last_error, scheduled_at, started_at, completed_at, created_at
FROM jobs
ORDER BY created_at DESC
`)
if err != nil {
return nil, fmt.Errorf("failed to query jobs: %w", err)
}
defer rows.Close()
var jobs []*domain.Job
for rows.Next() {
job, err := scanJob(rows)
if err != nil {
return nil, err
}
jobs = append(jobs, job)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("error iterating job rows: %w", err)
}
return jobs, nil
}
// Get retrieves a job by ID
func (r *JobRepository) Get(ctx context.Context, id string) (*domain.Job, error) {
row := r.db.QueryRowContext(ctx, `
SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts,
last_error, scheduled_at, started_at, completed_at, created_at
FROM jobs
WHERE id = $1
`, id)
job, err := scanJob(row)
if err != nil {
if err == sql.ErrNoRows {
return nil, fmt.Errorf("job not found: %w", repository.ErrNotFound)
}
return nil, fmt.Errorf("failed to query job: %w", err)
}
return job, nil
}
// Create stores a new job
func (r *JobRepository) Create(ctx context.Context, job *domain.Job) error {
if job.ID == "" {
job.ID = uuid.New().String()
}
err := r.db.QueryRowContext(ctx, `
INSERT INTO jobs (
id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts,
last_error, scheduled_at, started_at, completed_at, created_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
RETURNING id
`, job.ID, job.Type, job.CertificateID, job.TargetID, job.AgentID, job.Status, job.Attempts,
job.MaxAttempts, job.LastError, job.ScheduledAt, job.StartedAt, job.CompletedAt,
job.CreatedAt).Scan(&job.ID)
if err != nil {
return fmt.Errorf("failed to create job: %w", err)
}
return nil
}
// Update modifies an existing job
func (r *JobRepository) Update(ctx context.Context, job *domain.Job) error {
result, err := r.db.ExecContext(ctx, `
UPDATE jobs SET
type = $1,
certificate_id = $2,
target_id = $3,
agent_id = $4,
status = $5,
attempts = $6,
max_attempts = $7,
last_error = $8,
scheduled_at = $9,
started_at = $10,
completed_at = $11
WHERE id = $12
`, job.Type, job.CertificateID, job.TargetID, job.AgentID, job.Status, job.Attempts,
job.MaxAttempts, job.LastError, job.ScheduledAt, job.StartedAt,
job.CompletedAt, job.ID)
if err != nil {
return fmt.Errorf("failed to update job: %w", err)
}
rows, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected: %w", err)
}
if rows == 0 {
return fmt.Errorf("job not found: %w", repository.ErrNotFound)
}
return nil
}
// Delete removes a job
func (r *JobRepository) Delete(ctx context.Context, id string) error {
result, err := r.db.ExecContext(ctx, "DELETE FROM jobs WHERE id = $1", id)
if err != nil {
return fmt.Errorf("failed to delete job: %w", err)
}
rows, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected: %w", err)
}
if rows == 0 {
return fmt.Errorf("job not found: %w", repository.ErrNotFound)
}
return nil
}
// ListByStatus returns jobs with a specific status
func (r *JobRepository) ListByStatus(ctx context.Context, status domain.JobStatus) ([]*domain.Job, error) {
rows, err := r.db.QueryContext(ctx, `
SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts,
last_error, scheduled_at, started_at, completed_at, created_at
FROM jobs
WHERE status = $1
ORDER BY created_at DESC
`, status)
if err != nil {
return nil, fmt.Errorf("failed to query jobs by status: %w", err)
}
defer rows.Close()
var jobs []*domain.Job
for rows.Next() {
job, err := scanJob(rows)
if err != nil {
return nil, err
}
jobs = append(jobs, job)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("error iterating job rows: %w", err)
}
return jobs, nil
}
// ListByCertificate returns all jobs for a certificate
func (r *JobRepository) ListByCertificate(ctx context.Context, certID string) ([]*domain.Job, error) {
rows, err := r.db.QueryContext(ctx, `
SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts,
last_error, scheduled_at, started_at, completed_at, created_at
FROM jobs
WHERE certificate_id = $1
ORDER BY created_at DESC
`, certID)
if err != nil {
return nil, fmt.Errorf("failed to query jobs for certificate: %w", err)
}
defer rows.Close()
var jobs []*domain.Job
for rows.Next() {
job, err := scanJob(rows)
if err != nil {
return nil, err
}
jobs = append(jobs, job)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("error iterating job rows: %w", err)
}
return jobs, nil
}
// UpdateStatus updates a job's status and optional error message
func (r *JobRepository) UpdateStatus(ctx context.Context, id string, status domain.JobStatus, errMsg string) error {
var lastError *string
if errMsg != "" {
lastError = &errMsg
}
result, err := r.db.ExecContext(ctx, `
UPDATE jobs SET status = $1, last_error = $2 WHERE id = $3
`, status, lastError, id)
if err != nil {
return fmt.Errorf("failed to update job status: %w", err)
}
rows, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected: %w", err)
}
if rows == 0 {
return fmt.Errorf("job not found: %w", repository.ErrNotFound)
}
return nil
}
// GetPendingJobs returns jobs not yet processed of a specific type.
//
// The SELECT uses FOR UPDATE SKIP LOCKED so that concurrent scheduler replicas
// cannot observe the same rows when invoked inside a transaction; combine with
// a subsequent UPDATE to Running for correct dispatch semantics. For the
// standard production dispatch path, prefer ClaimPendingJobs which wraps the
// lock, read, and state transition in a single transaction and is the
// authoritative race-free claim primitive (CWE-362 fix for H-6).
func (r *JobRepository) GetPendingJobs(ctx context.Context, jobType domain.JobType) ([]*domain.Job, error) {
rows, err := r.db.QueryContext(ctx, `
SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts,
last_error, scheduled_at, started_at, completed_at, created_at
FROM jobs
WHERE type = $1 AND status = $2
ORDER BY scheduled_at ASC
FOR UPDATE SKIP LOCKED
`, jobType, domain.JobStatusPending)
if err != nil {
return nil, fmt.Errorf("failed to query pending jobs: %w", err)
}
defer rows.Close()
var jobs []*domain.Job
for rows.Next() {
job, err := scanJob(rows)
if err != nil {
return nil, err
}
jobs = append(jobs, job)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("error iterating job rows: %w", err)
}
return jobs, nil
}
// ClaimPendingJobs atomically claims up to `limit` Pending jobs and transitions
// them to Running inside a single transaction. The SELECT uses FOR UPDATE SKIP
// LOCKED so concurrent scheduler replicas observe disjoint result sets — each
// row can be claimed by exactly one caller per tick (CWE-362 fix for H-6).
//
// Passing an empty jobType claims any type. Passing limit<=0 claims all
// available rows. The claimed rows are returned with Status already set to
// domain.JobStatusRunning.
//
// Downstream processors (ProcessRenewalJob, ProcessDeploymentJob) already call
// UpdateStatus(Running) unconditionally on entry, so this pre-flip is
// idempotent with respect to existing processing logic.
func (r *JobRepository) ClaimPendingJobs(ctx context.Context, jobType domain.JobType, limit int) ([]*domain.Job, error) {
tx, err := r.db.BeginTx(ctx, nil)
if err != nil {
return nil, fmt.Errorf("failed to begin claim transaction: %w", err)
}
// Rollback is a no-op after Commit — safe deferred cleanup if an error path
// triggers an early return before Commit().
defer func() { _ = tx.Rollback() }()
// Build the SELECT — jobType="" means any type, limit<=0 means unlimited.
query := `
SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts,
last_error, scheduled_at, started_at, completed_at, created_at
FROM jobs
WHERE status = $1`
args := []interface{}{domain.JobStatusPending}
if jobType != "" {
query += ` AND type = $2`
args = append(args, jobType)
}
query += `
ORDER BY scheduled_at ASC
FOR UPDATE SKIP LOCKED`
if limit > 0 {
query += fmt.Sprintf(` LIMIT %d`, limit)
}
rows, err := tx.QueryContext(ctx, query, args...)
if err != nil {
return nil, fmt.Errorf("failed to query claimable jobs: %w", err)
}
var jobs []*domain.Job
for rows.Next() {
job, err := scanJob(rows)
if err != nil {
rows.Close()
return nil, err
}
jobs = append(jobs, job)
}
if err := rows.Err(); err != nil {
rows.Close()
return nil, fmt.Errorf("error iterating claimable job rows: %w", err)
}
rows.Close()
if len(jobs) == 0 {
// No rows to claim — commit the (read-only) tx and return.
if err := tx.Commit(); err != nil {
return nil, fmt.Errorf("failed to commit empty claim tx: %w", err)
}
return nil, nil
}
// Flip claimed rows to Running. Build IN clause safely with placeholders.
ids := make([]interface{}, len(jobs))
placeholders := make([]byte, 0, len(jobs)*5)
for i, job := range jobs {
ids[i] = job.ID
if i > 0 {
placeholders = append(placeholders, ',')
}
placeholders = append(placeholders, fmt.Sprintf("$%d", i+2)...)
}
updateQuery := fmt.Sprintf(
`UPDATE jobs SET status = $1 WHERE id IN (%s)`,
string(placeholders),
)
updateArgs := append([]interface{}{domain.JobStatusRunning}, ids...)
if _, err := tx.ExecContext(ctx, updateQuery, updateArgs...); err != nil {
return nil, fmt.Errorf("failed to transition claimed jobs to Running: %w", err)
}
if err := tx.Commit(); err != nil {
return nil, fmt.Errorf("failed to commit claim transaction: %w", err)
}
// Reflect the committed state in the returned objects.
for _, job := range jobs {
job.Status = domain.JobStatusRunning
}
return jobs, nil
}
// ListPendingByAgentID returns pending deployment jobs and AwaitingCSR jobs for
// a specific agent. Deployment jobs are matched by agent_id directly (set at
// creation time), with a fallback for legacy jobs where agent_id is NULL but
// target_id resolves to the agent via deployment_targets. AwaitingCSR jobs are
// matched through certificate → target mappings → agent ownership.
//
// The SELECT uses FOR UPDATE SKIP LOCKED so concurrent pollers (e.g. two agent
// instances running with the same agent_id) cannot observe the same rows when
// this method is invoked inside a transaction. For the production agent work
// poll path, prefer ClaimPendingByAgentID which additionally transitions
// claimed Pending deployment rows to Running atomically (H-6 CWE-362 fix).
func (r *JobRepository) ListPendingByAgentID(ctx context.Context, agentID string) ([]*domain.Job, error) {
rows, err := r.db.QueryContext(ctx, `
SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts,
last_error, scheduled_at, started_at, completed_at, created_at
FROM jobs
WHERE agent_id = $1 AND status = 'Pending' AND type = 'Deployment'
UNION ALL
SELECT j.id, j.type, j.certificate_id, j.target_id, j.agent_id, j.status, j.attempts, j.max_attempts,
j.last_error, j.scheduled_at, j.started_at, j.completed_at, j.created_at
FROM jobs j
INNER JOIN deployment_targets dt ON j.target_id = dt.id
WHERE j.agent_id IS NULL AND j.status = 'Pending' AND j.type = 'Deployment'
AND dt.agent_id = $1
UNION ALL
SELECT j.id, j.type, j.certificate_id, j.target_id, j.agent_id, j.status, j.attempts, j.max_attempts,
j.last_error, j.scheduled_at, j.started_at, j.completed_at, j.created_at
FROM jobs j
WHERE j.status = 'AwaitingCSR'
AND j.type IN ('Renewal', 'Issuance')
AND EXISTS (
SELECT 1 FROM certificate_target_mappings ctm
INNER JOIN deployment_targets dt ON ctm.target_id = dt.id
WHERE ctm.certificate_id = j.certificate_id
AND dt.agent_id = $1
)
ORDER BY created_at ASC
`, agentID)
if err != nil {
return nil, fmt.Errorf("failed to query pending jobs for agent: %w", err)
}
defer rows.Close()
var jobs []*domain.Job
for rows.Next() {
job, err := scanJob(rows)
if err != nil {
return nil, err
}
jobs = append(jobs, job)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("error iterating pending agent job rows: %w", err)
}
return jobs, nil
}
// ClaimPendingByAgentID atomically claims agent work inside a single
// transaction. Pending Deployment jobs assigned to the agent (directly via
// agent_id, or via legacy target→agent fallback) are transitioned from
// Pending to Running. AwaitingCSR Renewal/Issuance jobs linked to the agent
// via certificate → target mappings are locked with FOR UPDATE SKIP LOCKED
// and returned without a state transition — the flow requires the agent to
// submit a CSR to advance state, and pre-flipping AwaitingCSR would violate
// the renewal state machine (CWE-362 fix for H-6).
//
// Claimed rows are invisible to other concurrent claim calls for the lifetime
// of the transaction; rows claimed as Running remain invisible after commit
// because ListPendingByAgentID's filter is status='Pending'.
func (r *JobRepository) ClaimPendingByAgentID(ctx context.Context, agentID string) ([]*domain.Job, error) {
tx, err := r.db.BeginTx(ctx, nil)
if err != nil {
return nil, fmt.Errorf("failed to begin agent claim transaction: %w", err)
}
defer func() { _ = tx.Rollback() }()
// Branch 1 + 2: Pending Deployment jobs (direct agent_id match or legacy
// target fallback). These get flipped to Running atomically below.
pendingRows, err := tx.QueryContext(ctx, `
SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts,
last_error, scheduled_at, started_at, completed_at, created_at
FROM jobs
WHERE agent_id = $1 AND status = 'Pending' AND type = 'Deployment'
UNION ALL
SELECT j.id, j.type, j.certificate_id, j.target_id, j.agent_id, j.status, j.attempts, j.max_attempts,
j.last_error, j.scheduled_at, j.started_at, j.completed_at, j.created_at
FROM jobs j
INNER JOIN deployment_targets dt ON j.target_id = dt.id
WHERE j.agent_id IS NULL AND j.status = 'Pending' AND j.type = 'Deployment'
AND dt.agent_id = $1
ORDER BY created_at ASC
FOR UPDATE SKIP LOCKED
`, agentID)
if err != nil {
return nil, fmt.Errorf("failed to query pending deployment jobs for agent: %w", err)
}
var pendingJobs []*domain.Job
for pendingRows.Next() {
job, err := scanJob(pendingRows)
if err != nil {
pendingRows.Close()
return nil, err
}
pendingJobs = append(pendingJobs, job)
}
if err := pendingRows.Err(); err != nil {
pendingRows.Close()
return nil, fmt.Errorf("error iterating pending deployment rows: %w", err)
}
pendingRows.Close()
// Branch 3: AwaitingCSR jobs for this agent. Locked with FOR UPDATE SKIP
// LOCKED to prevent duplicate delivery to concurrent pollers, but state is
// NOT transitioned — the agent advances state via CSR submission.
csrRows, err := tx.QueryContext(ctx, `
SELECT j.id, j.type, j.certificate_id, j.target_id, j.agent_id, j.status, j.attempts, j.max_attempts,
j.last_error, j.scheduled_at, j.started_at, j.completed_at, j.created_at
FROM jobs j
WHERE j.status = 'AwaitingCSR'
AND j.type IN ('Renewal', 'Issuance')
AND EXISTS (
SELECT 1 FROM certificate_target_mappings ctm
INNER JOIN deployment_targets dt ON ctm.target_id = dt.id
WHERE ctm.certificate_id = j.certificate_id
AND dt.agent_id = $1
)
ORDER BY j.created_at ASC
FOR UPDATE SKIP LOCKED
`, agentID)
if err != nil {
return nil, fmt.Errorf("failed to query AwaitingCSR jobs for agent: %w", err)
}
var csrJobs []*domain.Job
for csrRows.Next() {
job, err := scanJob(csrRows)
if err != nil {
csrRows.Close()
return nil, err
}
csrJobs = append(csrJobs, job)
}
if err := csrRows.Err(); err != nil {
csrRows.Close()
return nil, fmt.Errorf("error iterating AwaitingCSR rows: %w", err)
}
csrRows.Close()
// Transition locked Pending deployments to Running before commit.
if len(pendingJobs) > 0 {
ids := make([]interface{}, len(pendingJobs))
placeholders := make([]byte, 0, len(pendingJobs)*5)
for i, job := range pendingJobs {
ids[i] = job.ID
if i > 0 {
placeholders = append(placeholders, ',')
}
placeholders = append(placeholders, fmt.Sprintf("$%d", i+2)...)
}
updateQuery := fmt.Sprintf(
`UPDATE jobs SET status = $1 WHERE id IN (%s)`,
string(placeholders),
)
updateArgs := append([]interface{}{domain.JobStatusRunning}, ids...)
if _, err := tx.ExecContext(ctx, updateQuery, updateArgs...); err != nil {
return nil, fmt.Errorf("failed to transition claimed deployment jobs to Running: %w", err)
}
}
if err := tx.Commit(); err != nil {
return nil, fmt.Errorf("failed to commit agent claim transaction: %w", err)
}
// Reflect the committed state in returned Pending deployment jobs; leave
// AwaitingCSR jobs untouched.
for _, job := range pendingJobs {
job.Status = domain.JobStatusRunning
}
// Preserve the legacy ordering: Pending deployments first, AwaitingCSR
// second. Callers that want a strict created_at merge can re-sort.
return append(pendingJobs, csrJobs...), nil
}
// ListTimedOutAwaitingJobs returns jobs stuck in AwaitingCSR or AwaitingApproval past
// their respective cutoff timestamps (created_at < cutoff). The reaper loop transitions
// them to Failed; I-001's retry loop then auto-promotes eligible Failed jobs back to
// Pending. I-003 coverage-gap closure.
func (r *JobRepository) ListTimedOutAwaitingJobs(ctx context.Context, csrCutoff, approvalCutoff time.Time) ([]*domain.Job, error) {
rows, err := r.db.QueryContext(ctx, `
SELECT id, type, certificate_id, target_id, agent_id, status, attempts, max_attempts,
last_error, scheduled_at, started_at, completed_at, created_at
FROM jobs
WHERE (status = $1 AND created_at < $2)
OR (status = $3 AND created_at < $4)
ORDER BY created_at ASC
`, domain.JobStatusAwaitingCSR, csrCutoff, domain.JobStatusAwaitingApproval, approvalCutoff)
if err != nil {
return nil, fmt.Errorf("failed to query timed-out awaiting jobs: %w", err)
}
defer rows.Close()
var jobs []*domain.Job
for rows.Next() {
job, err := scanJob(rows)
if err != nil {
return nil, err
}
jobs = append(jobs, job)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("error iterating timed-out job rows: %w", err)
}
return jobs, nil
}
// ListJobsWithOfflineAgents returns jobs in Running status whose owning
// agent's last_heartbeat_at is older than agentCutoff. Bundle C / Audit
// M-016 (CWE-754): closes the gap that ListTimedOutAwaitingJobs left
// open — jobs claimed by an agent that subsequently dies sit in Running
// indefinitely. The query joins jobs to agents on agent_id and filters
// to (status='Running' AND agent.last_heartbeat_at < agentCutoff).
//
// Jobs without an agent_id (server-side keygen path) are intentionally
// excluded: they have no agent to be "offline".
func (r *JobRepository) ListJobsWithOfflineAgents(ctx context.Context, agentCutoff time.Time) ([]*domain.Job, error) {
rows, err := r.db.QueryContext(ctx, `
SELECT j.id, j.type, j.certificate_id, j.target_id, j.agent_id, j.status,
j.attempts, j.max_attempts, j.last_error, j.scheduled_at,
j.started_at, j.completed_at, j.created_at
FROM jobs j
JOIN agents a ON a.id = j.agent_id
WHERE j.status = $1
AND j.agent_id IS NOT NULL
AND a.last_heartbeat_at IS NOT NULL
AND a.last_heartbeat_at < $2
ORDER BY j.started_at ASC NULLS FIRST
`, domain.JobStatusRunning, agentCutoff)
if err != nil {
return nil, fmt.Errorf("failed to query jobs with offline agents: %w", err)
}
defer rows.Close()
var jobs []*domain.Job
for rows.Next() {
job, err := scanJob(rows)
if err != nil {
return nil, err
}
jobs = append(jobs, job)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("error iterating offline-agent job rows: %w", err)
}
return jobs, nil
}
// scanJob scans a job from a row or rows
func scanJob(scanner interface {
Scan(...interface{}) error
}) (*domain.Job, error) {
var job domain.Job
err := scanner.Scan(&job.ID, &job.Type, &job.CertificateID, &job.TargetID,
&job.AgentID, &job.Status, &job.Attempts, &job.MaxAttempts, &job.LastError,
&job.ScheduledAt, &job.StartedAt, &job.CompletedAt, &job.CreatedAt)
if err != nil {
return nil, fmt.Errorf("failed to scan job: %w", err)
}
return &job, nil
}