mirror of
https://github.com/shankar0123/certctl.git
synced 2026-06-07 12:21:31 +00:00
02438ad9e1
Twelve findings from the architecture diligence audit's Phase 3 bundle
closed in one PR. The changes cover the CI workflows plus small
doc-drift fixes across the production Go tree and migration headers.
CI workflow changes
====================
TEST-H1 — Race detection on ./... -short
.github/workflows/ci.yml:106 was a 9-package explicit list. Audit
finding TEST-H1 flagged that 25+ packages (internal/auth/*,
internal/repository/*, internal/mcp, internal/scep, internal/pkcs7,
internal/api/router, internal/api/acme, internal/cli, internal/cms,
internal/config, internal/deploy, internal/integration,
internal/ratelimit, internal/secret, internal/trustanchor, all of
cmd/) silently dropped off race coverage.
Post-fix: 'go test -race -short ./... -count=1 -timeout 600s'.
76 testing.Short() guards already cover testcontainers + live-DB
integration suites, so -short keeps the long-running tests out.
TEST-H2 — Cross-platform build matrix
New 'cross-platform-build' job in ci.yml. Matrix:
ubuntu-latest + windows-latest + macos-latest, fail-fast: false.
Builds cmd/server + cmd/agent + cmd/cli + cmd/mcp-server on each.
Catches Windows-specific regressions (path separators, file
permissions, exec.Command semantics) the pre-Phase-3 Ubuntu-only
CI missed.
TEST-L1 — actions/setup-go cache: true (explicit)
setup-go v5 defaults cache: true; making it explicit so a future
setup-go upgrade can't silently flip it. Re-runs hit the Go module
+ build cache instead of recompiling cold.
TEST-M1 — Mutation-testing floor at 55%
security-deep-scan.yml::go-mutesting step rewritten. Removed
continue-on-error + per-package '|| true'. New post-loop check
extracts every 'The mutation score is X.YZ' line and fails the
step if any package drops below 0.55. Floor rationale: starter
ratio catches major regressions without rejecting the audit's
'this is OK' steady state; raise quarterly.
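A minimal Go sketch of the post-loop floor check described above. It assumes the tool prints one 'The mutation score is X.YZ' line per package; the function name belowFloor and the sample output are hypothetical, and the real step in security-deep-scan.yml may be written differently (for example as shell).

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// scoreLine matches lines such as "The mutation score is 0.612345".
var scoreLine = regexp.MustCompile(`The mutation score is (\d+(?:\.\d+)?)`)

// belowFloor returns every score found in output that is under floor.
// (Hypothetical helper, not taken from the repository.)
func belowFloor(output string, floor float64) []float64 {
	var low []float64
	for _, m := range scoreLine.FindAllStringSubmatch(output, -1) {
		score, err := strconv.ParseFloat(m[1], 64)
		if err != nil {
			continue // the regex only accepts decimal numbers; skip defensively
		}
		if score < floor {
			low = append(low, score)
		}
	}
	return low
}

func main() {
	// Sample output is invented for illustration.
	out := "pkg a\nThe mutation score is 0.71\npkg b\nThe mutation score is 0.40\n"
	if low := belowFloor(out, 0.55); len(low) > 0 {
		fmt.Println("mutation score below floor:", low)
	}
}
```

A CI step built on this idea would exit non-zero when the returned slice is non-empty.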
TEST-M2 — 3 advisory deep-scan gates promoted to blocking
Removed continue-on-error: true from:
- gosec (filtered to G201/G202/G304/G108 high-signal rules:
SQL-injection + path-traversal + pprof-exposed)
- osv-scanner (multi-ecosystem CVE; complements govulncheck
which is already blocking in ci.yml)
- trivy image scan (--severity HIGH,CRITICAL --exit-code 1)
continue-on-error count: 15 → 11.
ZAP / schemathesis / nuclei / testssl stay advisory because their
false-positive rates on https://localhost:8443-targeted DAST runs
are high.
TEST-M3 — Playwright harness stub
web/package.json adds '@playwright/test' devDep + 'e2e' / 'e2e:install'
npm scripts. web/playwright.config.ts ships single chromium project
with webServer block pointing at 'npm run dev'. web/src/__tests__/
e2e/smoke.spec.ts proves the harness wires through. The full 15-flow
suite ships in frontend-design-audit Phase 8 (TEST-H1 in THAT audit);
this is the wiring + a single smoke test as the regression floor.
New Makefile target: 'make e2e-test'.
Doc/code drift fixes
====================
TEST-M4 + ARCH-L2 — Skip inventory artifact + CI guard
scripts/skip-inventory.sh walks every t.Skip site under cmd/ +
internal/ + deploy/test/ and emits docs/testing/skip-inventory.md
grouped by package with file:line:expression triples. Current
inventory: 142 t.Skip sites, 76 testing.Short() guards.
scripts/ci-guards/skip-inventory-drift.sh regenerates and fails on
diff (excluding the 'Last reviewed' timestamp line which drifts
daily). The Markdown is the canonical acquisition-diligence artifact
for 'what tests are being skipped and why.'
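The drift check above can be sketched in Go as a comparison that ignores the timestamp line. This is an illustration only: the guard itself is a shell script, and the exact prefix of the 'Last reviewed' line as well as the helper names stripTimestamp and drifted are assumptions.

```go
package main

import (
	"fmt"
	"strings"
)

// stripTimestamp drops every line that begins with prefix.
func stripTimestamp(doc, prefix string) string {
	var kept []string
	for _, line := range strings.Split(doc, "\n") {
		if strings.HasPrefix(line, prefix) {
			continue
		}
		kept = append(kept, line)
	}
	return strings.Join(kept, "\n")
}

// drifted reports whether the committed and regenerated inventories differ
// once the timestamp line is ignored.
func drifted(committed, regenerated, prefix string) bool {
	return stripTimestamp(committed, prefix) != stripTimestamp(regenerated, prefix)
}

func main() {
	// Both documents are invented for illustration.
	committed := "Last reviewed: day 1\ninternal/foo: a_test.go:10\n"
	regenerated := "Last reviewed: day 2\ninternal/foo: a_test.go:10\n"
	fmt.Println("drift:", drifted(committed, regenerated, "Last reviewed"))
}
```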
ARCH-H3 — MCP catalogue floor reconciliation
Audit framing was '121 vs floor 150 — doc/code drift.' Live count
via the test's actual regex over all 5 tool files (tools.go +
tools_audit_fix.go + tools_auth.go + tools_auth_bundle2.go +
tools_est.go): 155 unique 'Name: "certctl_*"' declarations.
Pre-Phase-3 audit measured tools.go in isolation (121) and missed
the other 4 files (+34 unique names). The test at
internal/ciparity/surface_parity_test.go::TestSurfaceParity_MCP
passes today (155 ≥ 150). Added a clarifying comment near
mcpBaselineFloor explaining the measurement scope so future
reviewers don't repeat the audit's framing error.
STATUS: stale — no code drift, just a measurement scoping error in
the audit.
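To illustrate the measurement scope, here is a hedged Go sketch that counts unique tool names across several source strings rather than one file. The regex below is an assumption, not the test's actual pattern, and countUniqueTools is a hypothetical helper.

```go
package main

import (
	"fmt"
	"regexp"
)

// toolName is an assumed pattern for 'Name: "certctl_*"' declarations.
var toolName = regexp.MustCompile(`Name:\s*"(certctl_[a-z0-9_]+)"`)

// countUniqueTools counts distinct tool names across all given sources.
func countUniqueTools(sources ...string) int {
	seen := make(map[string]struct{})
	for _, src := range sources {
		for _, m := range toolName.FindAllStringSubmatch(src, -1) {
			seen[m[1]] = struct{}{}
		}
	}
	return len(seen)
}

func main() {
	// Stand-ins for file contents; the names are invented.
	fileA := `{Name: "certctl_list"}, {Name: "certctl_get"}`
	fileB := `{Name: "certctl_get"}, {Name: "certctl_audit"}`
	fmt.Println("one file:", countUniqueTools(fileA))
	fmt.Println("all files:", countUniqueTools(fileA, fileB))
}
```

Counting a single file understates the total whenever declarations are spread across files, which is the scoping error the audit made.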
ARCH-L1 — panic() rationale comments
5 panic sites in production Go (excluding _test.go):
- internal/repository/postgres/tx.go:84
- internal/service/issuer.go:861 (mustJSON)
- internal/service/est.go:728 (mustParseTime)
- internal/service/acme.go:1288 (rand source failure — already documented)
- internal/pkcs7/certrep.go:270 (OID marshal — already documented)
Added ARCH-L1 rationale comments to the 3 sites that didn't have
them. All 5 are defensible impossible-path / rethrow / hardcoded-
constant guards.
ARCH-L3 — Migration IF-NOT-EXISTS carve-outs
4 migrations skip the literal 'IF NOT EXISTS' token but ARE
idempotent via different Postgres patterns:
- 000014_policy_violation_severity_check.up.sql: ALTER TABLE
ADD CONSTRAINT CHECK doesn't accept IF NOT EXISTS; idempotency
via DROP CONSTRAINT IF EXISTS preamble.
- 000018_audit_events_worm.up.sql: CREATE OR REPLACE FUNCTION
+ DROP TRIGGER IF EXISTS + CREATE TRIGGER + DO $$ pg_roles
existence check. CREATE TRIGGER doesn't take IF NOT EXISTS.
- 000030_rbac_admin_perms.up.sql: INSERT ... ON CONFLICT DO NOTHING.
- 000039_audit_crit1_perms.up.sql: same INSERT + ON CONFLICT pattern.
Added ARCH-L3 header comments to each explaining the carve-out so
reviewers don't flag the missing literal token.
STATUS: largely stale — migrations are already idempotent.
ARCH-L4 — TODO/FIXME → see #<descriptor>
5 TODOs rewritten to the allowed 'see #<descriptor>' pattern:
- internal/repository/postgres/auth.go:220 → see #bundle-2-scope-fk
- internal/connector/discovery/gcpsm/gcpsm.go:547 → see #gcpsm-pagination
- internal/service/audit.go:244 → see #audit-pagination-count
- internal/service/job.go:295, 299 → see #validation-job-impl
New CI guard scripts/ci-guards/no-todo-in-prod.sh grep-fails any
new TODO/FIXME in cmd/ + internal/ (excluding _test.go); allows
'see #N' / 'see #<descriptor>' patterns.
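The guard's rule can be sketched in Go as a per-line predicate. This is an assumption-laden illustration: the real guard is a shell script using grep, and both regexes and the name violates are hypothetical. In particular, treating a line that carries both a marker and a 'see #...' reference as allowed is a guess about the script's behaviour.

```go
package main

import (
	"fmt"
	"regexp"
)

var (
	// marker matches a standalone TODO or FIXME token.
	marker = regexp.MustCompile(`\b(TODO|FIXME)\b`)
	// allowed matches 'see #N' and 'see #<descriptor>' references.
	allowed = regexp.MustCompile(`see #[A-Za-z0-9][A-Za-z0-9-]*`)
)

// violates reports whether a source line carries a TODO/FIXME marker
// without an accompanying 'see #...' reference.
func violates(line string) bool {
	return marker.MatchString(line) && !allowed.MatchString(line)
}

func main() {
	for _, line := range []string{
		"// TODO: paginate results",
		"// see #gcpsm-pagination",
		"// FIXME see #42",
	} {
		fmt.Printf("%-28q violates=%v\n", line, violates(line))
	}
}
```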
Sandbox limitation
==================
The 6.1 GB certctl working tree fills the sandbox volume; go1.25.10
toolchain download fails with 'no space left on device' (sandbox has
1.25.9; go.mod requires 1.25.10). Local 'go test' / 'go build' NOT
run in this commit. Operator must run 'make verify' on their
workstation before push per CLAUDE.md operating rules.
smoke.spec.ts was NOT executed in the sandbox (no chromium installed).
Operator runs 'cd web && npm install && npx playwright install
--with-deps chromium && npm run e2e' on first wire-up.
All CI guards (no-todo-in-prod, skip-inventory-drift, G-3
env-docs-drift, doc-rot-detector, and every existing guard) verified
clean by running each individually.
Closes: cowork/certctl-architecture-diligence-audit.html#fix-TEST-H1,
cowork/certctl-architecture-diligence-audit.html#fix-TEST-H2,
cowork/certctl-architecture-diligence-audit.html#fix-TEST-M1,
cowork/certctl-architecture-diligence-audit.html#fix-TEST-M2,
cowork/certctl-architecture-diligence-audit.html#fix-TEST-M3,
cowork/certctl-architecture-diligence-audit.html#fix-TEST-M4,
cowork/certctl-architecture-diligence-audit.html#fix-TEST-L1,
cowork/certctl-architecture-diligence-audit.html#fix-ARCH-H3,
cowork/certctl-architecture-diligence-audit.html#fix-ARCH-L1,
cowork/certctl-architecture-diligence-audit.html#fix-ARCH-L2,
cowork/certctl-architecture-diligence-audit.html#fix-ARCH-L3,
cowork/certctl-architecture-diligence-audit.html#fix-ARCH-L4
712 lines
24 KiB
Go
package service

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/sync/semaphore"

	"github.com/certctl-io/certctl/internal/domain"
	"github.com/certctl-io/certctl/internal/repository"
)

// ErrSelfApproval is returned by ApproveJob when the actor attempting to
// approve a renewal job is the same person listed as the owner of the
// underlying certificate. M-003 enforces separation of duties: the owner who
// requested (or benefits from) the renewal must not be the same identity that
// approves it. Handlers map this sentinel to HTTP 403 Forbidden.
var ErrSelfApproval = errors.New("self-approval forbidden: actor is the owner of the certificate")

// JobService manages job processing and status tracking.
// It coordinates between the scheduler and various job-specific services.
type JobService struct {
	jobRepo           repository.JobRepository
	certRepo          repository.CertificateRepository
	ownerRepo         repository.OwnerRepository
	renewalService    *RenewalService
	deploymentService *DeploymentService
	auditService      *AuditService
	logger            *slog.Logger

	// renewalConcurrency caps the number of concurrent goroutines that
	// ProcessPendingJobs spawns. 0 (zero-value) means "sequential" so
	// existing test wiring that constructs JobService directly via
	// NewJobService keeps its strict-serial behaviour. Production
	// wiring calls SetRenewalConcurrency(cfg.Scheduler.RenewalConcurrency)
	// to switch on the bounded fan-out. Audit fix #9.
	renewalConcurrency int
}

// NewJobService creates a new job service.
//
// certRepo and ownerRepo are required for the M-003 not-self-approval check
// in ApproveJob. Callers may pass nil for either to disable the check
// (useful for tests that don't exercise the approval path); when nil, the
// service logs a warning on each approval attempt and permits the
// transition. Production wiring must supply both.
func NewJobService(
	jobRepo repository.JobRepository,
	certRepo repository.CertificateRepository,
	ownerRepo repository.OwnerRepository,
	renewalService *RenewalService,
	deploymentService *DeploymentService,
	logger *slog.Logger,
) *JobService {
	return &JobService{
		jobRepo:           jobRepo,
		certRepo:          certRepo,
		ownerRepo:         ownerRepo,
		renewalService:    renewalService,
		deploymentService: deploymentService,
		logger:            logger,
	}
}

// SetRenewalConcurrency wires the per-tick fan-out concurrency cap that
// ProcessPendingJobs uses. Called from cmd/server/main.go with
// cfg.Scheduler.RenewalConcurrency (default 25). Values ≤ 0 are
// normalised to 1, failing safe to one job in flight at a time.
// Passing 0 through to semaphore.NewWeighted would not panic; it would
// construct a semaphore on which every Acquire(ctx, 1) blocks until ctx
// is done, so no job would ever be dispatched.
//
// Audit fix #9: bounded scheduler concurrency. Pre-fix
// ProcessPendingJobs ran every claimed job sequentially in a single
// goroutine (slow but safe); operators with large fleets needed a
// performance lever, but switching to fire-and-forget per-job
// goroutines would have unbounded the upstream-CA call rate and
// tripped DigiCert / Entrust / Sectigo rate limits. The cap gives
// the operator a knob (CERTCTL_RENEWAL_CONCURRENCY) to dial
// throughput up without losing the rate-limit headroom.
func (s *JobService) SetRenewalConcurrency(n int) {
	if n <= 0 {
		n = 1
	}
	s.renewalConcurrency = n
}

// SetAuditService wires an optional audit service for emitting lifecycle
// events (e.g., scheduler-driven job_retry transitions recorded by
// RetryFailedJobs). Construction keeps the audit dependency optional so
// bootstrap/test wiring that doesn't exercise the retry path can omit it;
// production wiring in cmd/server/main.go should always call this.
func (s *JobService) SetAuditService(a *AuditService) {
	s.auditService = a
}

// ProcessPendingJobs fetches and processes all pending jobs.
// It routes jobs to the appropriate service based on job type and handles errors gracefully.
//
// Concurrency (H-6 CWE-362): jobs are claimed via ClaimPendingJobs which uses
// SELECT ... FOR UPDATE SKIP LOCKED and flips Pending → Running atomically. Concurrent
// scheduler replicas in HA deployments will therefore never observe the same Pending row,
// and the subsequent UpdateStatus(Running) calls inside the downstream service methods are
// idempotent against the pre-flipped state.
func (s *JobService) ProcessPendingJobs(ctx context.Context) error {
	// Claim pending jobs atomically (H-6 remediation: was ListByStatus which had no row lock).
	// Empty jobType matches all types; zero limit means unlimited (preserves prior semantics).
	pendingJobs, err := s.jobRepo.ClaimPendingJobs(ctx, "", 0)
	if err != nil {
		return fmt.Errorf("failed to claim pending jobs: %w", err)
	}

	if len(pendingJobs) == 0 {
		s.logger.Debug("no pending jobs to process")
		return nil
	}

	s.logger.Info("processing pending jobs", "count", len(pendingJobs))

	// Audit fix #9: bounded concurrent fan-out. When renewalConcurrency
	// is the zero value (caller never set it), fall through to the
	// historical strict-sequential loop so every existing test path
	// keeps its byte-for-byte behaviour. Production wiring in
	// cmd/server/main.go always calls SetRenewalConcurrency with the
	// configured cap (default 25), so the bounded path is always taken
	// in real deployments.
	if s.renewalConcurrency <= 0 {
		return s.processPendingJobsSequential(ctx, pendingJobs)
	}
	return s.processPendingJobsConcurrent(ctx, pendingJobs, s.renewalConcurrency)
}

// processPendingJobsSequential is the legacy strict-serial fan-out
// (preserved for unit-test wiring that constructs JobService via
// NewJobService and never calls SetRenewalConcurrency). One job at a
// time, no concurrency.
func (s *JobService) processPendingJobsSequential(ctx context.Context, pendingJobs []*domain.Job) error {
	var processedCount int
	var failedCount int

	for _, job := range pendingJobs {
		if shouldSkipJob(job) {
			s.logger.Debug("skipping agent-routed deployment job",
				"job_id", job.ID,
				"agent_id", *job.AgentID)
			continue
		}
		if err := s.processJob(ctx, job); err != nil {
			s.logger.Error("failed to process job",
				"job_id", job.ID,
				"job_type", job.Type,
				"error", err)
			failedCount++
			continue
		}
		processedCount++
	}

	s.logger.Info("job processing completed",
		"processed", processedCount,
		"failed", failedCount,
		"total", len(pendingJobs))

	return nil
}

// processPendingJobsConcurrent is the production entry point for the
// bounded fan-out; it pins the per-job work function to s.processJob.
// Test code at internal/service/job_concurrency_test.go drives
// boundedFanOut directly with a counter-tracking stub so the
// concurrency cap can be asserted without standing up the full
// renewal/deployment service graph.
func (s *JobService) processPendingJobsConcurrent(ctx context.Context, pendingJobs []*domain.Job, capN int) error {
	return boundedFanOut(ctx, pendingJobs, capN, s.processJob, s.logger)
}

// boundedFanOut runs `work` over `pendingJobs` with at most `capN`
// goroutines in flight at any moment, using
// golang.org/x/sync/semaphore.Weighted for the gate. The Acquire(ctx,
// 1) call BLOCKS the loop when at the cap, providing real backpressure
// rather than fire-and-forget, so the scheduler tick can't outrun the
// upstream-CA rate limit. ctx cancellation propagates through Acquire
// so a context.Done() interrupts the fan-out promptly; in-flight
// goroutines drain via Wait before the function returns, so no
// goroutine outlives the scheduler tick.
//
// processed / failed are tracked via atomic.Int64 to avoid mutex
// overhead on every job completion; the final log line reads both
// after Wait, so the values reflect every dispatched job.
//
// Audit fix #9: the cap is a load-bearing pre-condition for "operators
// with permissive upstream limits and large fleets can scale up the
// renewal sweep without losing rate-limit headroom."
func boundedFanOut(ctx context.Context, pendingJobs []*domain.Job, capN int, work func(context.Context, *domain.Job) error, logger *slog.Logger) error {
	var processed, failed atomic.Int64
	sem := semaphore.NewWeighted(int64(capN))
	var wg sync.WaitGroup

	// dispatched counts goroutines launched so far. Only this loop
	// touches it, so a plain int suffices. Summing processed + failed
	// here would count completions and miss jobs still in flight.
	var dispatched int

	for i, job := range pendingJobs {
		if shouldSkipJob(job) {
			logger.Debug("skipping agent-routed deployment job",
				"job_id", job.ID,
				"agent_id", *job.AgentID)
			continue
		}

		// Acquire BEFORE launching so the offered load to upstream
		// CAs respects the cap regardless of how fast individual
		// jobs complete.
		if err := sem.Acquire(ctx, 1); err != nil {
			// jobs_remaining covers the current job and everything
			// after it in the slice.
			logger.Warn("renewal fan-out cancelled mid-tick",
				"reason", err,
				"jobs_dispatched", dispatched,
				"jobs_remaining", len(pendingJobs)-i)
			break
		}

		dispatched++
		wg.Add(1)
		go func(j *domain.Job) {
			defer wg.Done()
			defer sem.Release(1)

			if err := work(ctx, j); err != nil {
				logger.Error("failed to process job",
					"job_id", j.ID,
					"job_type", j.Type,
					"error", err)
				failed.Add(1)
				return
			}
			processed.Add(1)
		}(job)
	}

	wg.Wait()

	logger.Info("job processing completed",
		"processed", processed.Load(),
		"failed", failed.Load(),
		"total", len(pendingJobs),
		"concurrency_cap", capN)

	return nil
}

// shouldSkipJob returns true for deployment jobs that have an agent_id;
// those are meant for agent pickup via GetPendingWork(), not server-side
// processing. The server should only process deployment jobs without an
// agent (legacy/serverless targets).
func shouldSkipJob(job *domain.Job) bool {
	return job.Type == domain.JobTypeDeployment && job.AgentID != nil && *job.AgentID != ""
}

// processJob routes a single job to the appropriate service based on type.
func (s *JobService) processJob(ctx context.Context, job *domain.Job) error {
	s.logger.Debug("processing job",
		"job_id", job.ID,
		"job_type", job.Type,
		"certificate_id", job.CertificateID)

	switch job.Type {
	case domain.JobTypeRenewal:
		return s.renewalService.ProcessRenewalJob(ctx, job)
	case domain.JobTypeDeployment:
		return s.deploymentService.ProcessDeploymentJob(ctx, job)
	case domain.JobTypeIssuance:
		return s.processIssuanceJob(ctx, job)
	case domain.JobTypeValidation:
		return s.processValidationJob(ctx, job)
	default:
		return fmt.Errorf("unknown job type: %s", job.Type)
	}
}

// processIssuanceJob handles a certificate issuance job.
// It reuses the renewal service's ProcessRenewalJob since the flow is identical:
// generate key → create CSR → call issuer → store version → create deployment jobs.
// The only difference is semantics (new cert vs renewed cert), not mechanics.
func (s *JobService) processIssuanceJob(ctx context.Context, job *domain.Job) error {
	s.logger.Debug("processing issuance job", "job_id", job.ID)

	// Issuance follows the same code path as renewal for the Local CA:
	// generate server-side key + CSR → sign via issuer → store cert version → deploy
	return s.renewalService.ProcessRenewalJob(ctx, job)
}

// processValidationJob handles a certificate validation job.
// This is a placeholder that documents the intended flow.
// see #validation-job-impl: implement actual validation job processing
// when a customer ask materializes. Today's renewal-loop already calls
// target connector ValidateDeployment after every deploy; this code
// path is reserved for an out-of-band "verify what's deployed matches
// the issued cert" scheduler tick. The job type is reserved: processJob
// routes it here, and this method returns a not-implemented error.
func (s *JobService) processValidationJob(ctx context.Context, job *domain.Job) error {
	s.logger.Debug("processing validation job", "job_id", job.ID)

	// see #validation-job-impl. Planned flow once implemented:
	// 1. Fetch the certificate
	// 2. For each target, call target connector ValidateDeployment
	// 3. Aggregate results
	// 4. Update job status based on results
	// 5. Send notification if any validation fails

	return errors.New("validation job processing not yet implemented")
}

// RetryFailedJobs finds failed jobs and resets them for retry.
// It only retries jobs that haven't exceeded max attempts.
//
// Audit trail (I-001): each successful Failed → Pending transition emits a
// "job_retry" audit event with actor "system" (ActorTypeSystem), capturing
// the old→new state and attempt counters so operators can reconstruct
// scheduler-driven retry activity. The audit service is optional: callers
// that haven't wired it via SetAuditService simply skip emission.
//
// maxRetries is retained for interface compatibility with
// scheduler.JobServicer but is advisory: per-job eligibility is governed by
// each job's own Attempts vs. MaxAttempts, not this parameter.
func (s *JobService) RetryFailedJobs(ctx context.Context, maxRetries int) error {
	s.logger.Debug("retrying failed jobs", "max_retries", maxRetries)

	failedJobs, err := s.jobRepo.ListByStatus(ctx, domain.JobStatusFailed)
	if err != nil {
		return fmt.Errorf("failed to fetch failed jobs: %w", err)
	}

	var retriedCount int

	for _, job := range failedJobs {
		// Check if we can retry (Attempts < MaxAttempts)
		if job.Attempts >= job.MaxAttempts {
			s.logger.Debug("job exceeded max retries",
				"job_id", job.ID,
				"attempts", job.Attempts,
				"max_attempts", job.MaxAttempts)
			continue
		}

		// Reset status to pending for retry
		if err := s.jobRepo.UpdateStatus(ctx, job.ID, domain.JobStatusPending, ""); err != nil {
			s.logger.Error("failed to reset job status for retry",
				"job_id", job.ID,
				"error", err)
			continue
		}

		if s.auditService != nil {
			if auditErr := s.auditService.RecordEvent(ctx, "system", domain.ActorTypeSystem,
				"job_retry", "job", job.ID,
				map[string]interface{}{
					"old_status":   string(domain.JobStatusFailed),
					"new_status":   string(domain.JobStatusPending),
					"attempts":     job.Attempts,
					"max_attempts": job.MaxAttempts,
				}); auditErr != nil {
				s.logger.Error("failed to record job retry audit event",
					"job_id", job.ID,
					"error", auditErr)
			}
		}

		retriedCount++
	}

	s.logger.Info("failed jobs retry completed",
		"retried", retriedCount,
		"total_failed", len(failedJobs))

	return nil
}

// ReapJobsWithOfflineAgents transitions jobs in Running status whose
// owning agent has been silent longer than agentTTL to Failed with
// reason "agent_offline". Bundle C / Audit M-016 (CWE-754): closes the
// gap left by ReapTimedOutJobs (which only handles AwaitingCSR /
// AwaitingApproval). I-001's retry loop then auto-promotes eligible
// Failed jobs back to Pending so a healthy agent can claim them.
func (s *JobService) ReapJobsWithOfflineAgents(ctx context.Context, agentTTL time.Duration) error {
	if agentTTL <= 0 {
		return fmt.Errorf("ReapJobsWithOfflineAgents: agentTTL must be positive, got %s", agentTTL)
	}
	cutoff := time.Now().Add(-agentTTL)

	staleJobs, err := s.jobRepo.ListJobsWithOfflineAgents(ctx, cutoff)
	if err != nil {
		return fmt.Errorf("list jobs with offline agents: %w", err)
	}

	var reaped int
	for _, job := range staleJobs {
		oldStatus := job.Status
		errMsg := fmt.Sprintf("agent offline (no heartbeat for >%s)", agentTTL)

		job.Status = domain.JobStatusFailed
		job.LastError = &errMsg

		if err := s.jobRepo.Update(ctx, job); err != nil {
			s.logger.Error("failed to transition offline-agent job",
				"job_id", job.ID, "agent_id", job.AgentID, "error", err)
			continue
		}

		if s.auditService != nil {
			if auditErr := s.auditService.RecordEvent(ctx, "system", domain.ActorTypeSystem,
				"job_offline_agent_reap", "job", job.ID,
				map[string]interface{}{
					"old_status":     string(oldStatus),
					"new_status":     string(domain.JobStatusFailed),
					"timeout_reason": "agent_offline",
					"agent_id":       job.AgentID,
				}); auditErr != nil {
				s.logger.Error("failed to record offline-agent reap audit event",
					"job_id", job.ID, "error", auditErr)
			}
		}
		reaped++
	}

	s.logger.Info("offline-agent job reaper completed",
		"reaped", reaped, "total_stale", len(staleJobs))
	return nil
}

// ReapTimedOutJobs transitions jobs stuck in AwaitingCSR or AwaitingApproval
// to Failed if they've exceeded their TTL. I-001's retry loop then auto-promotes
// eligible Failed jobs back to Pending (closes coverage gap I-003).
func (s *JobService) ReapTimedOutJobs(ctx context.Context, csrTTL, approvalTTL time.Duration) error {
	s.logger.Debug("reaping timed-out jobs", "csr_ttl", csrTTL, "approval_ttl", approvalTTL)

	now := time.Now()
	csrCutoff := now.Add(-csrTTL)
	approvalCutoff := now.Add(-approvalTTL)

	timedOutJobs, err := s.jobRepo.ListTimedOutAwaitingJobs(ctx, csrCutoff, approvalCutoff)
	if err != nil {
		return fmt.Errorf("failed to fetch timed-out jobs: %w", err)
	}

	var reaped int

	for _, job := range timedOutJobs {
		oldStatus := job.Status
		var (
			reason string
			ttl    time.Duration
		)
		// Pick the TTL and audit reason that apply to the job's current state.
		switch job.Status {
		case domain.JobStatusAwaitingCSR:
			ttl = csrTTL
			reason = "csr_timeout"
		case domain.JobStatusAwaitingApproval:
			ttl = approvalTTL
			reason = "approval_timeout"
		default:
			continue
		}
		newErrMsg := fmt.Sprintf("timed out in %s after %s", oldStatus, ttl)

		job.Status = domain.JobStatusFailed
		job.LastError = &newErrMsg

		if err := s.jobRepo.Update(ctx, job); err != nil {
			s.logger.Error("failed to transition timed-out job",
				"job_id", job.ID,
				"old_status", oldStatus,
				"error", err)
			continue
		}

		if s.auditService != nil {
			ageHours := time.Since(job.CreatedAt).Hours()
			if auditErr := s.auditService.RecordEvent(ctx, "system", domain.ActorTypeSystem,
				"job_timeout", "job", job.ID,
				map[string]interface{}{
					"old_status":     string(oldStatus),
					"new_status":     string(domain.JobStatusFailed),
					"timeout_reason": reason,
					"age_hours":      ageHours,
				}); auditErr != nil {
				s.logger.Error("failed to record job timeout audit event",
					"job_id", job.ID,
					"error", auditErr)
			}
		}

		reaped++
	}

	s.logger.Info("job timeout reaper completed",
		"reaped", reaped,
		"total_timed_out", len(timedOutJobs))

	return nil
}

// GetJobStatus returns the current status of a job.
func (s *JobService) GetJobStatus(ctx context.Context, jobID string) (*domain.Job, error) {
	job, err := s.jobRepo.Get(ctx, jobID)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch job: %w", err)
	}
	return job, nil
}

// CancelJob cancels a pending or running job (handler interface method).
func (s *JobService) CancelJob(ctx context.Context, jobID string) error {
	job, err := s.jobRepo.Get(ctx, jobID)
	if err != nil {
		return fmt.Errorf("failed to fetch job: %w", err)
	}

	if job.Status != domain.JobStatusPending && job.Status != domain.JobStatusRunning {
		return fmt.Errorf("cannot cancel job with status %s", job.Status)
	}

	if err := s.jobRepo.UpdateStatus(ctx, jobID, domain.JobStatusCancelled, "cancelled by user"); err != nil {
		return fmt.Errorf("failed to cancel job: %w", err)
	}

	s.logger.Info("job cancelled", "job_id", jobID)
	return nil
}

// ListJobs returns paginated jobs with optional filtering (handler interface method).
func (s *JobService) ListJobs(ctx context.Context, status, jobType string, page, perPage int) ([]domain.Job, int64, error) {
	if page < 1 {
		page = 1
	}
	if perPage < 1 {
		perPage = 50
	}

	allJobs, err := s.jobRepo.List(ctx)
	if err != nil {
		return nil, 0, fmt.Errorf("failed to list jobs: %w", err)
	}

	// Filter jobs in memory based on status and jobType
	var filtered []*domain.Job
	for _, job := range allJobs {
		if job == nil {
			continue
		}
		if status != "" && string(job.Status) != status {
			continue
		}
		if jobType != "" && string(job.Type) != jobType {
			continue
		}
		filtered = append(filtered, job)
	}

	total := int64(len(filtered))
	start := (page - 1) * perPage
	if start >= int(total) {
		return nil, total, nil
	}
	end := start + perPage
	if end > int(total) {
		end = int(total)
	}

	var result []domain.Job
	for _, job := range filtered[start:end] {
		if job != nil {
			result = append(result, *job)
		}
	}

	return result, total, nil
}

// GetJob returns a single job (handler interface method).
func (s *JobService) GetJob(ctx context.Context, id string) (*domain.Job, error) {
	return s.jobRepo.Get(ctx, id)
}

// ApproveJob approves a renewal job that is awaiting approval.
// Transitions the job from AwaitingApproval to Pending so the scheduler picks it up.
//
// actor is the named-key identity of the approver (from the auth middleware
// via resolveActor). M-003: if actor matches the certificate owner's Name or
// Email (case-insensitive), returns ErrSelfApproval to enforce separation of
// duties. Callers must pass a non-empty actor; empty actor is treated as an
// anonymous system caller and permitted (internal/system paths).
func (s *JobService) ApproveJob(ctx context.Context, id, actor string) error {
	job, err := s.jobRepo.Get(ctx, id)
	if err != nil {
		return fmt.Errorf("job not found: %w", err)
	}

	if job.Status != domain.JobStatusAwaitingApproval {
		return fmt.Errorf("cannot approve job with status %s (must be AwaitingApproval)", job.Status)
	}

	if err := s.checkNotSelf(ctx, job, actor); err != nil {
		return err
	}

	if err := s.jobRepo.UpdateStatus(ctx, id, domain.JobStatusPending, ""); err != nil {
		return fmt.Errorf("failed to approve job: %w", err)
	}

	s.logger.Info("renewal job approved",
		"job_id", id,
		"certificate_id", job.CertificateID,
		"actor", actor)
	return nil
}

// RejectJob rejects a renewal job that is awaiting approval.
// Transitions the job to Cancelled with a rejection reason.
//
// actor is the named-key identity of the rejector (from the auth middleware
// via resolveActor). Rejection is NOT subject to the not-self check: an
// owner is permitted to cancel their own pending renewal. actor is recorded
// on the log line for audit attribution.
func (s *JobService) RejectJob(ctx context.Context, id, reason, actor string) error {
	job, err := s.jobRepo.Get(ctx, id)
	if err != nil {
		return fmt.Errorf("job not found: %w", err)
	}

	if job.Status != domain.JobStatusAwaitingApproval {
		return fmt.Errorf("cannot reject job with status %s (must be AwaitingApproval)", job.Status)
	}

	msg := "rejected by user"
	if reason != "" {
		msg = "rejected: " + reason
	}

	if err := s.jobRepo.UpdateStatus(ctx, id, domain.JobStatusCancelled, msg); err != nil {
		return fmt.Errorf("failed to reject job: %w", err)
	}

	s.logger.Info("renewal job rejected",
		"job_id", id,
		"certificate_id", job.CertificateID,
		"reason", reason,
		"actor", actor)
	return nil
}

// checkNotSelf enforces the M-003 separation-of-duties rule for renewal
// approval: the actor approving a job may not be the owner of the underlying
// certificate.
//
// Resolution rules:
//   - Empty actor → permitted (internal/system caller; auth middleware already
//     short-circuits anonymous users at the handler layer).
//   - certRepo or ownerRepo nil → warn, permit (test/bootstrap wiring).
//   - Job has no certificate or certificate has no OwnerID → permitted (no
//     owner to collide with).
//   - Certificate lookup fails → warn, permit.
//   - Owner record not found → warn, permit (defensive: stale FK should not
//     block operations).
//   - Case-insensitive match against owner.Name OR owner.Email → returns
//     ErrSelfApproval.
func (s *JobService) checkNotSelf(ctx context.Context, job *domain.Job, actor string) error {
	if actor == "" {
		return nil
	}
	if s.certRepo == nil || s.ownerRepo == nil {
		s.logger.Warn("not-self approval check skipped: cert/owner repo not wired",
			"job_id", job.ID, "actor", actor)
		return nil
	}
	if job.CertificateID == "" {
		return nil
	}

	cert, err := s.certRepo.Get(ctx, job.CertificateID)
	if err != nil {
		s.logger.Warn("not-self approval check: certificate lookup failed",
			"job_id", job.ID, "certificate_id", job.CertificateID, "error", err)
		return nil
	}
	if cert == nil || cert.OwnerID == "" {
		return nil
	}

	owner, err := s.ownerRepo.Get(ctx, cert.OwnerID)
	if err != nil || owner == nil {
		s.logger.Warn("not-self approval check: owner lookup failed",
			"job_id", job.ID, "owner_id", cert.OwnerID, "error", err)
		return nil
	}

	actorLower := strings.ToLower(actor)
	if strings.ToLower(owner.Name) == actorLower || strings.ToLower(owner.Email) == actorLower {
		s.logger.Warn("self-approval blocked",
			"job_id", job.ID,
			"certificate_id", job.CertificateID,
			"owner_id", owner.ID,
			"actor", actor)
		return ErrSelfApproval
	}

	return nil
}