mirror of
https://github.com/shankar0123/certctl.git
synced 2026-06-07 18:21:32 +00:00
Merge bundle-C: Renewal/reliability cluster — 7 findings closed
@@ -4,6 +4,38 @@ All notable changes to certctl are documented in this file. Dates use ISO 8601.
|
||||
|
||||
## [unreleased] — 2026-04-26
|
||||
|
||||
### Bundle C (Renewal/Reliability cluster): 7 audit findings closed
|
||||
|
||||
> Closes the audit's renewal/reliability cluster — `M-006` (idempotent migration 000014), `M-007` (3 partial-failure tests across bulk-revoke / bulk-renew / bulk-reassign), `M-008` (admin-gated handler enumeration pin, verified-already-clean), `M-015` (cardinality invariant pinned at struct level via reflect, verified-already-clean), `M-016` (new ListJobsWithOfflineAgents repo method + ReapJobsWithOfflineAgents service path + scheduler wiring), `M-019` (configurable ARI HTTP timeout + 4 dispatch tests, audit-claim verified wrong), `M-020` (rate limiter on noAuthHandler chain + Must-Staple operator runbook). M-028 was already closed by the Bundle B CI follow-up.
|
||||
|
||||
#### Added
|
||||
|
||||
- **`internal/repository/postgres/job.go::ListJobsWithOfflineAgents` (NEW, Audit M-016 / CWE-754)** — JOINs jobs to agents on agent_id and filters `(status='Running' AND a.last_heartbeat_at < agentCutoff)`. Server-keygen jobs (no agent_id) excluded by design.
|
||||
- **`internal/service/job.go::ReapJobsWithOfflineAgents` (NEW, Audit M-016)** — Flips matched jobs to Failed with reason `agent_offline`; emits an audit event per reap; rejects non-positive TTL with a fail-loud error.
|
||||
- **`Scheduler.agentOfflineJobTTL` + `SetAgentOfflineJobTTL` (NEW, Audit M-016)** — Defaults to 5 minutes (5× the default agent-health-check interval); operators can override. The existing `runJobTimeout` cycle now calls both reaper arms.
|
||||
- **`Config.ARIHTTPTimeoutSeconds` + `Connector.ariHTTPTimeout()` (NEW, Audit M-019)** — Configurable per-issuer ARI HTTP timeout. Defaults to 15s when zero (preserves the pre-bundle default). `CERTCTL_ACME_ARI_HTTP_TIMEOUT_SECONDS` env var path.
|
||||
- **`router.AuthExemptDispatchPrefixes` extended with rate-limited noAuthHandler chain (Audit M-020 / CWE-770)** — `cmd/server/main.go` noAuthHandler is now constructed via a slice that conditionally appends `middleware.NewRateLimiter` when `cfg.RateLimit.Enabled`. Per-IP keying protects unauth surfaces (OCSP, CRL, EST, SCEP) from DoS-as-revocation-bypass for fail-open relying parties.
|
||||
- **`docs/security.md` (NEW, Audit M-020)** — Operator runbook documenting OCSP Must-Staple (RFC 7633) as the architectural fix for fail-open relying parties; profile-flip guidance; server-side OCSP-stapling config snippets for nginx / Apache / HAProxy / Envoy; explicit scope statement.
|
||||
|
||||
#### Tests
|
||||
|
||||
- **`internal/api/handler/bulk_partial_failure_test.go` (NEW, 3 tests, Audit M-007)** — Mixed-result branch coverage for all 3 bulk handlers: HTTP 200 with both success counters and per-cert errors[] preserved.
|
||||
- **`internal/api/handler/m008_admin_gate_test.go` (NEW, 2 tests, Audit M-008)** — Walks every handler `.go` file, asserts every `middleware.IsAdmin` call site is in `AdminGatedHandlers` (with required test triplet) or `InformationalIsAdminCallers` (justified). Pin against future bypass.
|
||||
- **`internal/domain/m015_cardinality_test.go` (NEW, 2 tests, Audit M-015)** — reflect-based pin on `ManagedCertificate.{CertificateProfileID,RenewalPolicyID,IssuerID,OwnerID}` and `RenewalPolicy.CertificateProfileID` kind=String. Schema change to N:N would have to update renewal.go's lookup loop in the same commit.
|
||||
- **`internal/connector/issuer/acme/ari_timeout_test.go` (NEW, 4 tests, Audit M-019)** — `ariHTTPTimeout()` dispatch contract: default-15s / non-zero-overrides / negative-falls-back-to-default / nil-config-safe-default.
|
||||
- **`internal/service/job_offline_agent_reaper_test.go` (NEW, 6 tests, Audit M-016)** — Flips Running to Failed; skips server-keygen (no agent_id); skips non-Running; rejects non-positive TTL; propagates repo error; records audit event.
|
||||
|
||||
#### Changed
|
||||
|
||||
- **`migrations/000014_policy_violation_severity_check.up.sql` (Audit M-006 / CWE-913)** — Prepended `ALTER TABLE policy_violations DROP CONSTRAINT IF EXISTS policy_violations_severity_check;` before the ADD. Re-runs on partially-applied DBs now succeed.
|
||||
- **`internal/connector/issuer/acme/ari.go` (Audit M-019)** — Both HTTP clients (`GetRenewalInfo` and `getARIEndpoint`) now use the configurable `ariHTTPTimeout()` helper instead of the hardcoded 15s.
|
||||
- **`cmd/server/main.go` noAuthHandler construction (Audit M-020)** — From fixed `middleware.Chain(...)` to conditional slice with rate-limiter append. Backwards-compatible: when `cfg.RateLimit.Enabled=false` the chain reduces to the prior shape.
|
||||
|
||||
#### Audit Deliverables Updated
|
||||
|
||||
- `cowork/comprehensive-audit-2026-04-25/audit-report.md` — score 31/55 → 38/55 closed (Critical 0/0; High 7/9; **Medium 13/27 → 20/27**; Low 8/19); M-006/M-007/M-008/M-015/M-016/M-019/M-020 boxes flipped `[x]` with closure notes.
|
||||
- `cowork/comprehensive-audit-2026-04-25/findings.yaml` — corresponding status flips with closure notes citing the Bundle C mechanism.
|
||||
|
||||
### Bundle B (Auth & Transport Surface Tightening): 5 audit findings closed
|
||||
|
||||
> Closes the audit's auth + transport hardening cluster: `M-001` (PBKDF2 100k → 600k via new v3 blob format with v2/v1 read fallback), `M-002` (auth-exempt allowlist constants + AST-walking regression tests pin both router-layer and dispatch-layer bypass paths), `M-013` (CORS deny-by-default verified-already-clean + explicit nil/empty/star contract pin), `M-018` (Postgres TLS opt-in via Helm `postgresql.tls.mode` toggle + operator runbook `docs/database-tls.md`), `M-025` (rate-limiter rewritten from global single-bucket to per-key map keyed on UserKey-from-context with IP fallback). **Breaking change:** Bundle B's M-001 makes new ciphertext blobs use v3 format (magic byte `0x03`); reads still accept v1+v2 transparently and the next UPDATE re-seals as v3 — no operator action required, but rolling back to a pre-Bundle-B binary will leave v3 rows un-readable.
|
||||
|
||||
+18
-2
@@ -888,13 +888,29 @@ func main() {
 	// same bodyLimitMiddleware that wraps the authed surface also wraps
 	// the unauth surface — same default cap (CERTCTL_MAX_BODY_SIZE,
 	// default 1MB), same 413 response on overflow.
-	noAuthHandler := middleware.Chain(apiRouter,
+	//
+	// Bundle C / Audit M-020 (CWE-770): rate limiter added to the noAuth
+	// chain. Pre-bundle the unauth surface had NO rate limit — an attacker
+	// could DoS the OCSP responder, which for fail-open relying parties
+	// constitutes a revocation bypass (every cert appears valid when the
+	// responder is unreachable). The same per-key keyed bucket from
+	// Bundle B / M-025 is reused; the per-source-IP keying applies because
+	// none of these endpoints are authenticated.
+	noAuthMiddleware := []func(http.Handler) http.Handler{
 		middleware.RequestID,
 		structuredLogger,
 		middleware.Recovery,
 		bodyLimitMiddleware,
 		securityHeadersMiddleware,
-	)
+	}
+	if cfg.RateLimit.Enabled {
+		noAuthRateLimiter := middleware.NewRateLimiter(middleware.RateLimitConfig{
+			RPS:       cfg.RateLimit.RPS,
+			BurstSize: cfg.RateLimit.BurstSize,
+		})
+		noAuthMiddleware = append(noAuthMiddleware, noAuthRateLimiter)
+	}
+	noAuthHandler := middleware.Chain(apiRouter, noAuthMiddleware...)
 
 	dashboardEnabled := false
 	if _, err := os.Stat(webDir + "/index.html"); err == nil {
@@ -0,0 +1,97 @@
# certctl Security Posture & Operator Guidance

This document collects the operator-facing security guidance that the source
code's per-finding comment blocks reference. Each section names the audit
finding it closes, the threat model, and the operator action required (if
any).

## OCSP responder availability

**Audit reference:** Bundle C / M-020. CWE-770 (allocation of resources
without limits or throttling); RFC 6960 (OCSP); RFC 7633 (Must-Staple).

certctl ships an OCSP responder at `/.well-known/pki/ocsp/{issuer_id}/{serial}`
that signs a fresh response per request. Pre-Bundle-C the unauth handler
chain had no rate limit, so an attacker could DoS the responder and force
fail-open relying parties to accept revoked certificates as valid. Bundle C
adds the same per-key rate limiter to the unauth chain that the authenticated
chain has used since Bundle B. Per-IP keying applies because OCSP traffic is
unauthenticated.

The rate limiter alone does not solve the underlying revocation-bypass risk.
**The architectural fix is for issued certificates to carry the OCSP
Must-Staple TLS Feature extension** (RFC 7633, OID 1.3.6.1.5.5.7.1.24). When
present, conforming TLS clients refuse to negotiate a session unless the
server staples a fresh signed OCSP response in the TLS handshake. This shifts
revocation enforcement from the client's discretion (most clients fail open
by default) to a hard requirement: the connection cannot complete without
proof of non-revocation.
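
To make the extension concrete, the following is a minimal sketch of how an operator might check whether an issued certificate carries Must-Staple. It relies only on the Go standard library and on RFC 7633's encoding (a SEQUENCE OF INTEGER that lists TLS extension number 5, `status_request`). The helper names `hasMustStaple` and `selfSigned` are illustrative assumptions and are not part of certctl.

```go
package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"crypto/x509"
	"crypto/x509/pkix"
	"encoding/asn1"
	"fmt"
	"math/big"
	"time"
)

// oidTLSFeature is the TLS Feature extension OID from RFC 7633.
var oidTLSFeature = asn1.ObjectIdentifier{1, 3, 6, 1, 5, 5, 7, 1, 24}

// statusRequestFeature is the TLS extension number for status_request
// (OCSP stapling). Listing it in the TLS Feature extension means Must-Staple.
const statusRequestFeature = 5

// hasMustStaple (hypothetical helper) reports whether cert carries a TLS
// Feature extension that lists status_request.
func hasMustStaple(cert *x509.Certificate) bool {
	for _, ext := range cert.Extensions {
		if !ext.Id.Equal(oidTLSFeature) {
			continue
		}
		var features []int
		rest, err := asn1.Unmarshal(ext.Value, &features)
		if err != nil || len(rest) != 0 {
			return false
		}
		for _, f := range features {
			if f == statusRequestFeature {
				return true
			}
		}
	}
	return false
}

// selfSigned (hypothetical helper) builds a throwaway certificate for the
// demo, optionally with the Must-Staple extension attached.
func selfSigned(mustStaple bool) (*x509.Certificate, error) {
	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		return nil, err
	}
	tmpl := &x509.Certificate{
		SerialNumber: big.NewInt(1),
		Subject:      pkix.Name{CommonName: "must-staple-demo"},
		NotBefore:    time.Now().Add(-time.Minute),
		NotAfter:     time.Now().Add(time.Hour),
	}
	if mustStaple {
		value, err := asn1.Marshal([]int{statusRequestFeature})
		if err != nil {
			return nil, err
		}
		tmpl.ExtraExtensions = []pkix.Extension{{Id: oidTLSFeature, Value: value}}
	}
	der, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, &key.PublicKey, key)
	if err != nil {
		return nil, err
	}
	return x509.ParseCertificate(der)
}

func main() {
	for _, want := range []bool{true, false} {
		cert, err := selfSigned(want)
		if err != nil {
			panic(err)
		}
		fmt.Printf("mustStaple requested=%v detected=%v\n", want, hasMustStaple(cert))
	}
}
```

In practice the same `hasMustStaple` check could be pointed at a PEM-decoded certificate from any issuer profile to confirm the profile flip took effect.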

### Operator action

For certificates issued to systems where revocation correctness matters:

1. **Configure the issuer profile to set `must-staple: true`.** Out-of-the-box
   profiles in `migrations/seed.sql` do not set this; operators add it at
   profile-creation time via the API or by editing seed data.
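2. **Confirm the relying party honors the extension.** Firefox enforces
   Must-Staple. Chrome and other Chromium-based browsers do not, and
   OpenSSL-based clients enforce it only when the application itself checks
   the stapled response. Clients that do not implement the extension
   silently ignore it, so test each relying party rather than assuming
   enforcement.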
3. **Confirm the deployment target is configured for OCSP stapling** so the
   server can actually deliver the stapled response in the handshake.
   - **nginx:** `ssl_stapling on; ssl_stapling_verify on;`
   - **Apache:** `SSLUseStapling on` (also requires an `SSLStaplingCache`
     directive)
   - **HAProxy:** store the DER-encoded response in a `.ocsp` file named
     after the certificate file; refresh it at runtime with
     `set ssl ocsp-response`, which takes the base64-encoded response rather
     than a file path
   - **Envoy:** `ocsp_staple_policy: MUST_STAPLE`
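
Step 3 can be spot-checked from a client. The sketch below dials a TLS endpoint and reports whether the handshake included a stapled OCSP response, using the `OCSPResponse` field of `tls.ConnectionState`. It only checks for presence and does not validate the staple. The names `serverStaples`, `startDemoServer`, and `demoStapleObserved` are assumptions made for illustration; the demo server is a loopback listener with a self-signed certificate and placeholder staple bytes, not a real certctl deployment.

```go
package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"crypto/tls"
	"crypto/x509"
	"crypto/x509/pkix"
	"fmt"
	"math/big"
	"net"
	"time"
)

// serverStaples dials addr over TLS and reports whether the server included
// a stapled OCSP response in the handshake. It does not validate the staple.
func serverStaples(addr string, cfg *tls.Config) (bool, error) {
	dialer := &net.Dialer{Timeout: 5 * time.Second}
	conn, err := tls.DialWithDialer(dialer, "tcp", addr, cfg)
	if err != nil {
		return false, err
	}
	defer conn.Close()
	return len(conn.ConnectionState().OCSPResponse) > 0, nil
}

// startDemoServer (hypothetical helper) runs a loopback TLS listener whose
// self-signed certificate carries the given staple bytes; nil means no staple.
func startDemoServer(staple []byte) (string, func(), error) {
	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		return "", nil, err
	}
	tmpl := &x509.Certificate{
		SerialNumber: big.NewInt(1),
		Subject:      pkix.Name{CommonName: "staple-demo"},
		NotBefore:    time.Now().Add(-time.Minute),
		NotAfter:     time.Now().Add(time.Hour),
	}
	der, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, &key.PublicKey, key)
	if err != nil {
		return "", nil, err
	}
	cert := tls.Certificate{Certificate: [][]byte{der}, PrivateKey: key, OCSPStaple: staple}
	ln, err := tls.Listen("tcp", "127.0.0.1:0", &tls.Config{Certificates: []tls.Certificate{cert}})
	if err != nil {
		return "", nil, err
	}
	go func() {
		for {
			c, err := ln.Accept()
			if err != nil {
				return // listener closed
			}
			go func(c net.Conn) {
				defer c.Close()
				_ = c.(*tls.Conn).Handshake()
			}(c)
		}
	}()
	return ln.Addr().String(), func() { _ = ln.Close() }, nil
}

// demoStapleObserved starts the demo server and checks it with serverStaples.
// InsecureSkipVerify is used only because the demo certificate is self-signed.
func demoStapleObserved(staple []byte) (bool, error) {
	addr, stop, err := startDemoServer(staple)
	if err != nil {
		return false, err
	}
	defer stop()
	return serverStaples(addr, &tls.Config{InsecureSkipVerify: true})
}

func main() {
	for _, staple := range [][]byte{[]byte("placeholder-staple"), nil} {
		ok, err := demoStapleObserved(staple)
		if err != nil {
			panic(err)
		}
		fmt.Printf("staple configured=%v observed=%v\n", staple != nil, ok)
	}
}
```

Against a real deployment, `serverStaples` would be called with the production address and a default `tls.Config` so that normal certificate verification stays on.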

### What this does NOT cover

- **CRL fallback.** Must-Staple does not affect CRL behavior. Operators with
  CRL-based relying parties should use the rate-limit + caching defense
  alone; there is no client-side equivalent to Must-Staple for CRLs.
- **Self-issued certs in air-gapped networks.** When the relying party
  cannot reach the OCSP responder at all (the threat model the audit
  cited), Must-Staple is the only mechanism that closes the bypass. CRL
  distribution similarly requires the relying party to fetch the CRL,
  which is also subject to the same network-availability concern.

## Postgres transport encryption

See [docs/database-tls.md](database-tls.md). Bundle B / M-018.

## Encryption at rest

Bundle B / M-001. PBKDF2-SHA256 at 600,000 rounds (OWASP 2024 Password
Storage Cheat Sheet floor) for the operator-supplied passphrase that
derives the AES-256-GCM key for sensitive config columns. v3 blob format
with a per-ciphertext random salt; v1/v2 read fallback for legacy rows.
See [internal/crypto/encryption.go](../internal/crypto/encryption.go) and
the accompanying tests for the format spec.

## Authentication surface

Bundle B / M-002. Two layers decide auth-exempt status:

1. **Router layer:** `internal/api/router/router.go::AuthExemptRouterRoutes`
   — the 4 endpoints registered via direct `r.mux.Handle` without going
   through the middleware chain (`/health`, `/ready`, `/api/v1/auth/info`,
   `/api/v1/version`).
2. **Dispatch layer:** `internal/api/router/router.go::AuthExemptDispatchPrefixes`
   — URL-prefix routing in `cmd/server/main.go::buildFinalHandler` for
   `/.well-known/pki/*`, `/.well-known/est/*`, and `/scep[/...]*`.

Both lists have AST-walking regression tests (`auth_exempt_test.go`) that
fail CI if a new bypass lands without updating the documented constant.
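
The dispatch layer can be pictured as a small prefix router. The sketch below is not certctl's `buildFinalHandler`; it is a self-contained illustration under stated assumptions. The prefix list is copied from the text above, `isAuthExempt`, `dispatch`, and `demoStatus` are hypothetical names, and treating `/scep` as "exact path or sub-path only" is an interpretation of `/scep[/...]*`.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// exemptPrefixes is an illustrative copy of the dispatch-layer prefixes named
// above; the real list lives in AuthExemptDispatchPrefixes.
var exemptPrefixes = []string{"/.well-known/pki/", "/.well-known/est/", "/scep"}

// isAuthExempt reports whether path falls under an auth-exempt prefix.
// A prefix without a trailing slash matches itself or a sub-path only, so
// "/scep" and "/scep/x" match but "/scepter" does not (an assumption here).
func isAuthExempt(path string) bool {
	for _, p := range exemptPrefixes {
		if strings.HasSuffix(p, "/") {
			if strings.HasPrefix(path, p) {
				return true
			}
			continue
		}
		if path == p || strings.HasPrefix(path, p+"/") {
			return true
		}
	}
	return false
}

// dispatch sends auth-exempt paths to noAuth and everything else to authed.
func dispatch(authed, noAuth http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if isAuthExempt(r.URL.Path) {
			noAuth.ServeHTTP(w, r)
			return
		}
		authed.ServeHTTP(w, r)
	})
}

// demoStatus runs a GET for path through a demo dispatcher whose authed chain
// always answers 401 and whose noAuth chain always answers 200.
func demoStatus(path string) int {
	authed := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusUnauthorized)
	})
	noAuth := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	rec := httptest.NewRecorder()
	dispatch(authed, noAuth).ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code
}

func main() {
	paths := []string{"/.well-known/pki/ocsp/iss-1/01", "/scep", "/scepter", "/api/v1/certificates"}
	for _, p := range paths {
		fmt.Println(p, demoStatus(p))
	}
}
```

Keeping the prefix list in one exported constant, as the text describes, is what lets a regression test compare it against the routing code.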

## Per-user rate limiting

Bundle B / M-025. Authenticated callers are bucketed by API-key name;
unauthenticated callers (probes, OCSP relying parties, EST/SCEP enrollees)
are bucketed by source IP. `RPS` and `BurstSize` are per-key budgets.
`PerUserRPS` / `PerUserBurstSize` give authenticated clients a separate
budget when set non-zero.
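
As a rough illustration of per-key bucketing, the sketch below implements a token bucket per key with the standard library only. It is not the certctl `middleware.NewRateLimiter` implementation; `keyedLimiter`, `bucketKey`, and `at` are hypothetical names, and the `user:` / `ip:` key prefixes are an assumption. The clock is passed in explicitly so the behavior is deterministic.

```go
package main

import (
	"fmt"
	"math"
	"net"
	"sync"
	"time"
)

// bucket holds the remaining tokens for one key and when they were last refilled.
type bucket struct {
	tokens float64
	last   time.Time
}

// keyedLimiter (hypothetical) keeps one token bucket per key. Each bucket
// refills at rps tokens per second up to burst. Idle buckets are never
// evicted in this sketch; a production limiter would need cleanup.
type keyedLimiter struct {
	mu      sync.Mutex
	rps     float64
	burst   float64
	buckets map[string]*bucket
}

func newKeyedLimiter(rps float64, burst int) *keyedLimiter {
	return &keyedLimiter{rps: rps, burst: float64(burst), buckets: make(map[string]*bucket)}
}

// allow consumes one token for key at time now and reports whether the
// request fits within that key's budget.
func (l *keyedLimiter) allow(key string, now time.Time) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	b, ok := l.buckets[key]
	if !ok {
		b = &bucket{tokens: l.burst, last: now}
		l.buckets[key] = b
	}
	if elapsed := now.Sub(b.last).Seconds(); elapsed > 0 {
		b.tokens = math.Min(l.burst, b.tokens+elapsed*l.rps)
		b.last = now
	}
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

// bucketKey picks the API-key name for authenticated callers and falls back
// to the source IP (port stripped) for unauthenticated ones.
func bucketKey(apiKeyName, remoteAddr string) string {
	if apiKeyName != "" {
		return "user:" + apiKeyName
	}
	host, _, err := net.SplitHostPort(remoteAddr)
	if err != nil {
		host = remoteAddr
	}
	return "ip:" + host
}

// at returns a fixed instant sec seconds after the Unix epoch, for the demo.
func at(sec int) time.Time { return time.Unix(int64(sec), 0) }

func main() {
	l := newKeyedLimiter(1, 2) // 1 request/second, burst of 2, per key
	ipKey := bucketKey("", "192.0.2.7:44312")
	for i := 0; i < 3; i++ {
		fmt.Println(ipKey, "t=0s allowed:", l.allow(ipKey, at(0)))
	}
	fmt.Println(ipKey, "t=1s allowed:", l.allow(ipKey, at(1)))
	userKey := bucketKey("ci-bot", "192.0.2.7:44312")
	fmt.Println(userKey, "t=0s allowed:", l.allow(userKey, at(0)))
}
```

Because each key owns its bucket, one noisy source IP exhausts only its own budget, which is the property the unauthenticated OCSP, CRL, EST, and SCEP surfaces rely on.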

## Reporting a vulnerability

Email `certctl@proton.me`. Coordinated disclosure preferred; we will
acknowledge within 72h.
@@ -0,0 +1,180 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/shankar0123/certctl/internal/domain"
|
||||
)
|
||||
|
||||
// Bundle C / Audit M-007 (CWE-754): partial-failure tests for the three
|
||||
// bulk endpoints. Pre-bundle all three handlers had only happy-path
|
||||
// (TotalRevoked = TotalMatched, no Errors) and full-failure (service
|
||||
// returns err) tests. The mixed-result branch — where some certs
|
||||
// succeed and others fail — is the most operationally common shape
|
||||
// and was completely uncovered.
|
||||
//
|
||||
// Each test asserts:
|
||||
// 1. HTTP 200 (mixed result is a successful HTTP response carrying
|
||||
// both succeeded and failed counters).
|
||||
// 2. The response body's TotalMatched / Total<verb> / TotalFailed
|
||||
// counters all round-trip from the service mock.
|
||||
// 3. The Errors[] array is preserved and operators can correlate
|
||||
// each failure to its certificate ID.
|
||||
|
||||
// --- bulk-revoke ----------------------------------------------------------
|
||||
|
||||
func TestBulkRevoke_PartialFailure_ReportsBoth(t *testing.T) {
|
||||
svc := &mockBulkRevocationService{
|
||||
BulkRevokeFn: func(ctx context.Context, criteria domain.BulkRevocationCriteria, reason string, actor string) (*domain.BulkRevocationResult, error) {
|
||||
return &domain.BulkRevocationResult{
|
||||
TotalMatched: 3,
|
||||
TotalRevoked: 2,
|
||||
TotalSkipped: 0,
|
||||
TotalFailed: 1,
|
||||
Errors: []domain.BulkRevocationError{
|
||||
{CertificateID: "mc-failed", Error: "issuer connector unreachable"},
|
||||
},
|
||||
}, nil
|
||||
},
|
||||
}
|
||||
h := NewBulkRevocationHandler(svc)
|
||||
|
||||
body := `{"reason":"keyCompromise","certificate_ids":["mc-1","mc-2","mc-failed"]}`
|
||||
req := httptest.NewRequest(http.MethodPost, "/api/v1/certificates/bulk-revoke", bytes.NewBufferString(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req = req.WithContext(adminContext())
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
h.BulkRevoke(w, req)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("partial failure must still return HTTP 200, got %d", w.Code)
|
||||
}
|
||||
|
||||
var result domain.BulkRevocationResult
|
||||
if err := json.NewDecoder(w.Body).Decode(&result); err != nil {
|
||||
t.Fatalf("decode response: %v", err)
|
||||
}
|
||||
if result.TotalMatched != 3 {
|
||||
t.Errorf("TotalMatched = %d, want 3", result.TotalMatched)
|
||||
}
|
||||
if result.TotalRevoked != 2 {
|
||||
t.Errorf("TotalRevoked = %d, want 2", result.TotalRevoked)
|
||||
}
|
||||
if result.TotalFailed != 1 {
|
||||
t.Errorf("TotalFailed = %d, want 1", result.TotalFailed)
|
||||
}
|
||||
if len(result.Errors) != 1 {
|
||||
t.Fatalf("Errors len = %d, want 1", len(result.Errors))
|
||||
}
|
||||
if result.Errors[0].CertificateID != "mc-failed" {
|
||||
t.Errorf("error CertificateID = %q, want mc-failed", result.Errors[0].CertificateID)
|
||||
}
|
||||
if result.Errors[0].Error == "" {
|
||||
t.Error("error message must be non-empty so operators can triage")
|
||||
}
|
||||
}
|
||||
|
||||
// --- bulk-renew -----------------------------------------------------------
|
||||
|
||||
func TestBulkRenew_PartialFailure_ReportsBoth(t *testing.T) {
|
||||
svc := &mockBulkRenewalService{
|
||||
BulkRenewFn: func(ctx context.Context, criteria domain.BulkRenewalCriteria, actor string) (*domain.BulkRenewalResult, error) {
|
||||
return &domain.BulkRenewalResult{
|
||||
TotalMatched: 3,
|
||||
TotalEnqueued: 2,
|
||||
TotalSkipped: 0,
|
||||
TotalFailed: 1,
|
||||
Errors: []domain.BulkOperationError{
|
||||
{CertificateID: "mc-failed", Error: "renewal job enqueue failed: db timeout"},
|
||||
},
|
||||
}, nil
|
||||
},
|
||||
}
|
||||
h := NewBulkRenewalHandler(svc)
|
||||
|
||||
body := `{"certificate_ids":["mc-1","mc-2","mc-failed"]}`
|
||||
req := httptest.NewRequest(http.MethodPost, "/api/v1/certificates/bulk-renew", bytes.NewBufferString(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req = req.WithContext(authenticatedContext("test-actor"))
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
h.BulkRenew(w, req)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("partial failure must still return HTTP 200, got %d", w.Code)
|
||||
}
|
||||
|
||||
var result domain.BulkRenewalResult
|
||||
if err := json.NewDecoder(w.Body).Decode(&result); err != nil {
|
||||
t.Fatalf("decode response: %v", err)
|
||||
}
|
||||
if result.TotalMatched != 3 || result.TotalEnqueued != 2 || result.TotalFailed != 1 {
|
||||
t.Errorf("counters mismatch: matched=%d enqueued=%d failed=%d, want 3/2/1",
|
||||
result.TotalMatched, result.TotalEnqueued, result.TotalFailed)
|
||||
}
|
||||
if len(result.Errors) != 1 || result.Errors[0].CertificateID != "mc-failed" {
|
||||
t.Errorf("Errors not preserved: %+v", result.Errors)
|
||||
}
|
||||
}
|
||||
|
||||
// --- bulk-reassign --------------------------------------------------------
|
||||
|
||||
func TestBulkReassign_PartialFailure_ReportsBoth(t *testing.T) {
|
||||
svc := &mockBulkReassignmentService{
|
||||
BulkReassignFn: func(ctx context.Context, request domain.BulkReassignmentRequest, actor string) (*domain.BulkReassignmentResult, error) {
|
||||
return &domain.BulkReassignmentResult{
|
||||
TotalMatched: 3,
|
||||
TotalReassigned: 2,
|
||||
TotalSkipped: 0,
|
||||
TotalFailed: 1,
|
||||
Errors: []domain.BulkOperationError{
|
||||
{CertificateID: "mc-failed", Error: "FK violation: cert no longer exists"},
|
||||
},
|
||||
}, nil
|
||||
},
|
||||
}
|
||||
h := NewBulkReassignmentHandler(svc)
|
||||
|
||||
body := `{"certificate_ids":["mc-1","mc-2","mc-failed"],"owner_id":"o-bob"}`
|
||||
req := httptest.NewRequest(http.MethodPost, "/api/v1/certificates/bulk-reassign", bytes.NewBufferString(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req = req.WithContext(authenticatedContext("test-actor"))
|
||||
w := httptest.NewRecorder()
|
||||
|
||||
h.BulkReassign(w, req)
|
||||
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("partial failure must still return HTTP 200, got %d", w.Code)
|
||||
}
|
||||
|
||||
var result domain.BulkReassignmentResult
|
||||
if err := json.NewDecoder(w.Body).Decode(&result); err != nil {
|
||||
t.Fatalf("decode response: %v", err)
|
||||
}
|
||||
if result.TotalMatched != 3 || result.TotalReassigned != 2 || result.TotalFailed != 1 {
|
||||
t.Errorf("counters mismatch: matched=%d reassigned=%d failed=%d, want 3/2/1",
|
||||
result.TotalMatched, result.TotalReassigned, result.TotalFailed)
|
||||
}
|
||||
if len(result.Errors) != 1 || result.Errors[0].CertificateID != "mc-failed" {
|
||||
t.Errorf("Errors not preserved: %+v", result.Errors)
|
||||
}
|
||||
}
|
||||
|
||||
// --- helper context for authenticated, non-admin-gated handlers (renew + reassign) ---

// authenticatedContext returns the request context used by the bulk-renew and
// bulk-reassign tests. The middleware UserKey is a private type in the
// middleware package, so this handler test can't construct one directly.
// Bulk-renew and bulk-reassign read the actor through the same
// middleware.GetUser path that bulk-revoke does, so the canonical
// adminContext() helper from the existing test suite is reused (it delivers
// both UserKey and AdminKey). The actor argument is currently unused.
func authenticatedContext(actor string) context.Context {
	return adminContext()
}
@@ -0,0 +1,170 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"go/parser"
|
||||
"go/token"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// Bundle C / Audit M-008: pin the admin-gated handler set.
|
||||
//
|
||||
// The audit's request is "Admin-gated operation role-gate test coverage
|
||||
// needs verification". Verified-already-clean recon: only one handler
|
||||
// in internal/api/handler/ calls middleware.IsAdmin to gate access:
|
||||
// bulk_revocation.go — which has 3 dedicated tests
|
||||
// (NonAdmin_Returns403, AdminExplicitFalse_Returns403,
|
||||
// AdminPermitted_ForwardsActor) covering all three branches.
|
||||
//
|
||||
// This test enforces the invariant going forward by walking every
|
||||
// .go file in this package, finding every middleware.IsAdmin call
|
||||
// site, and asserting the file appears in AdminGatedHandlers below.
|
||||
// Adding a new middleware.IsAdmin call without updating the constant
|
||||
// AND adding a parallel test triplet fails CI.
|
||||
|
||||
// AdminGatedHandlers is the documented allowlist of handler files that
|
||||
// gate access on middleware.IsAdmin. Every entry MUST have:
|
||||
// - a non-admin-rejection test ("_NonAdmin_Returns403")
|
||||
// - an explicit-false-admin-rejection test ("_AdminExplicitFalse_Returns403")
|
||||
// - an admin-allowed actor-attribution test ("_AdminPermitted_ForwardsActor")
|
||||
//
|
||||
// Keys are the handler filenames; values are short descriptions of why
|
||||
// the gate exists. health.go is an INFORMATIONAL caller of IsAdmin (it
|
||||
// surfaces the flag to the GUI but does not gate) — explicitly excluded.
|
||||
var AdminGatedHandlers = map[string]string{
|
||||
"bulk_revocation.go": "M-003: bulk revocation is fleet-scale destructive — admin-only",
|
||||
}
|
||||
|
||||
// InformationalIsAdminCallers is the documented allowlist of files that
|
||||
// call middleware.IsAdmin without using the result to gate access. The
|
||||
// only legitimate use of an informational call is reporting the flag to
|
||||
// a downstream consumer (e.g. health.go::AuthCheck reports admin to the
|
||||
// GUI so it can hide admin-only buttons).
|
||||
var InformationalIsAdminCallers = map[string]string{
|
||||
"health.go": "informational: reports admin flag to GUI for affordance gating, no server-side gate",
|
||||
}
|
||||
|
||||
func TestM008_AdminGatedHandlers_PinExpectedSet(t *testing.T) {
|
||||
actual, err := scanIsAdminCallers(".")
|
||||
if err != nil {
|
||||
t.Fatalf("scan handler dir: %v", err)
|
||||
}
|
||||
|
||||
expected := append([]string(nil), keys(AdminGatedHandlers)...)
|
||||
expected = append(expected, keys(InformationalIsAdminCallers)...)
|
||||
sort.Strings(actual)
|
||||
sort.Strings(expected)
|
||||
|
||||
if !slicesEqual008(actual, expected) {
|
||||
t.Errorf(
|
||||
"middleware.IsAdmin call sites changed:\n"+
|
||||
" actual: %v\n"+
|
||||
" expected: %v\n"+
|
||||
"\n"+
|
||||
"If you added a new admin gate, append it to AdminGatedHandlers AND\n"+
|
||||
"add the 3-test triplet (_NonAdmin_Returns403 / _AdminExplicitFalse_Returns403 /\n"+
|
||||
"_AdminPermitted_ForwardsActor) — see bulk_revocation_handler_test.go for\n"+
|
||||
"the template.\n"+
|
||||
"\n"+
|
||||
"If you added an informational caller (no gating), append to\n"+
|
||||
"InformationalIsAdminCallers with a justification.",
|
||||
actual, expected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestM008_AdminGatedHandlers_HaveTripletTests(t *testing.T) {
|
||||
for handlerFile := range AdminGatedHandlers {
|
||||
base := strings.TrimSuffix(handlerFile, ".go")
|
||||
// Look for the 3-test triplet in the corresponding _test.go file
|
||||
// or in any test file in the package — bulk_revocation_handler_test.go
|
||||
// follows a slightly different naming convention.
|
||||
matches, err := filepath.Glob("*_test.go")
|
||||
if err != nil {
|
||||
t.Fatalf("glob: %v", err)
|
||||
}
|
||||
var foundNonAdmin, foundExplicitFalse, foundAdminPermitted bool
|
||||
for _, m := range matches {
|
||||
body, err := os.ReadFile(m)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
s := string(body)
|
||||
// Look for tests that mention the handler base name + the
|
||||
// expected suffix. Loose match because some test files use
|
||||
// _Handler_NonAdmin and others use _NonAdmin.
|
||||
if strings.Contains(s, "NonAdmin_Returns403") {
|
||||
foundNonAdmin = true
|
||||
}
|
||||
if strings.Contains(s, "AdminExplicitFalse_Returns403") {
|
||||
foundExplicitFalse = true
|
||||
}
|
||||
if strings.Contains(s, "AdminPermitted_ForwardsActor") {
|
||||
foundAdminPermitted = true
|
||||
}
|
||||
}
|
||||
if !foundNonAdmin {
|
||||
t.Errorf("admin-gated handler %s lacks a *_NonAdmin_Returns403 test", base)
|
||||
}
|
||||
if !foundExplicitFalse {
|
||||
t.Errorf("admin-gated handler %s lacks a *_AdminExplicitFalse_Returns403 test", base)
|
||||
}
|
||||
if !foundAdminPermitted {
|
||||
t.Errorf("admin-gated handler %s lacks a *_AdminPermitted_ForwardsActor test", base)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// --- helpers --------------------------------------------------------------
|
||||
|
||||
func scanIsAdminCallers(dir string) ([]string, error) {
|
||||
entries, err := os.ReadDir(dir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var out []string
|
||||
fset := token.NewFileSet()
|
||||
for _, e := range entries {
|
||||
name := e.Name()
|
||||
if !strings.HasSuffix(name, ".go") || strings.HasSuffix(name, "_test.go") {
|
||||
continue
|
||||
}
|
||||
body, err := os.ReadFile(filepath.Join(dir, name))
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
_, parseErr := parser.ParseFile(fset, filepath.Join(dir, name), body, parser.SkipObjectResolution)
|
||||
if parseErr != nil {
|
||||
continue
|
||||
}
|
||||
// Substring-match middleware.IsAdmin — cheap and sufficient
|
||||
// because the import path is fixed and there's no aliasing
|
||||
// shenanigans elsewhere in this package.
|
||||
if strings.Contains(string(body), "middleware.IsAdmin(") {
|
||||
out = append(out, name)
|
||||
}
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func keys(m map[string]string) []string {
|
||||
out := make([]string, 0, len(m))
|
||||
for k := range m {
|
||||
out = append(out, k)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func slicesEqual008(a, b []string) bool {
|
||||
if len(a) != len(b) {
|
||||
return false
|
||||
}
|
||||
for i := range a {
|
||||
if a[i] != b[i] {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
@@ -66,6 +66,18 @@ type Config struct {
|
||||
// When enabled, the connector queries the CA's ARI endpoint to get CA-directed renewal timing.
|
||||
ARIEnabled bool `json:"ari_enabled,omitempty"`
|
||||
|
||||
// ARIHTTPTimeoutSeconds bounds the per-request timeout on ARI HTTP calls.
|
||||
// Bundle C / Audit M-019: a CA whose ARI endpoint is unreachable or
|
||||
// stalls indefinitely must not stall the renewal scheduler — the
|
||||
// fallback path is threshold-based renewal, which only kicks in once
|
||||
// the ARI request errors out. The audit's "no fallback timeout" claim
|
||||
// was wrong (a 15s default has been in place since the ARI feature
|
||||
// shipped), but the previous timeout was hardcoded; this knob makes
|
||||
// it configurable per-issuer for operators on flaky-CA networks.
|
||||
// Defaults to 15 when zero. CERTCTL_ACME_ARI_HTTP_TIMEOUT_SECONDS in
|
||||
// the env-driven build path.
|
||||
ARIHTTPTimeoutSeconds int `json:"ari_http_timeout_seconds,omitempty"`
|
||||
|
||||
// Insecure skips TLS certificate verification when connecting to the ACME directory.
|
||||
// Only use for testing with self-signed ACME servers like Pebble.
|
||||
Insecure bool `json:"insecure,omitempty"`
|
||||
|
||||
@@ -49,7 +49,7 @@ func (c *Connector) GetRenewalInfo(ctx context.Context, certPEM string) (*issuer
 		return nil, fmt.Errorf("create ARI request: %w", err)
 	}
 
-	httpClient := &http.Client{Timeout: 15 * time.Second}
+	httpClient := &http.Client{Timeout: c.ariHTTPTimeout()}
 	resp, err := httpClient.Do(req)
 	if err != nil {
 		return nil, fmt.Errorf("ARI request failed: %w", err)
@@ -115,12 +115,22 @@ func computeARICertID(certPEM string) (string, error) {
 	return certID, nil
 }
 
+// ariHTTPTimeout returns the per-request timeout for ARI HTTP calls. Bundle C
+// / Audit M-019: configurable via Config.ARIHTTPTimeoutSeconds (env var
+// CERTCTL_ACME_ARI_HTTP_TIMEOUT_SECONDS), defaults to 15 seconds.
+func (c *Connector) ariHTTPTimeout() time.Duration {
+	if c.config != nil && c.config.ARIHTTPTimeoutSeconds > 0 {
+		return time.Duration(c.config.ARIHTTPTimeoutSeconds) * time.Second
+	}
+	return 15 * time.Second
+}
+
 // getARIEndpoint constructs the ARI endpoint URL from the ACME directory.
 // It fetches the directory JSON and extracts the "renewalInfo" field if available.
 // Falls back to a standard URL pattern if the directory doesn't advertise renewalInfo.
 func (c *Connector) getARIEndpoint(ctx context.Context, certID string) (string, error) {
 	// Try to fetch and parse the directory
-	httpClient := &http.Client{Timeout: 15 * time.Second}
+	httpClient := &http.Client{Timeout: c.ariHTTPTimeout()}
 	req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.config.DirectoryURL, nil)
 	if err != nil {
 		return "", fmt.Errorf("create directory request: %w", err)
@@ -0,0 +1,69 @@
package acme

import (
	"log/slog"
	"testing"
	"time"
)

// Bundle C / Audit M-019 (CWE-400): pin the ARI HTTP timeout dispatch
// contract. Config.ARIHTTPTimeoutSeconds = 0 → 15s default. Positive
// values override; negative values fall back to the default. The 15s
// default predates Bundle C and is preserved byte-for-byte; these tests
// guard against a future refactor that drops the default and silently
// configures HTTP clients with no timeout (which would re-open the
// M-019 stall risk).

func newARITestConnector(t *testing.T, timeoutSec int) *Connector {
	t.Helper()
	cfg := &Config{
		DirectoryURL:          "https://acme.example.invalid/directory",
		ARIEnabled:            true,
		ARIHTTPTimeoutSeconds: timeoutSec,
	}
	return New(cfg, slog.New(slog.NewTextHandler(testDiscardWriter{}, nil)))
}

type testDiscardWriter struct{}

func (testDiscardWriter) Write(p []byte) (int, error) { return len(p), nil }

func TestARIHTTPTimeout_DefaultIs15s(t *testing.T) {
	c := newARITestConnector(t, 0)
	got := c.ariHTTPTimeout()
	want := 15 * time.Second
	if got != want {
		t.Errorf("ariHTTPTimeout default: got %s, want %s", got, want)
	}
}

func TestARIHTTPTimeout_NonZeroOverridesDefault(t *testing.T) {
	c := newARITestConnector(t, 45)
	got := c.ariHTTPTimeout()
	want := 45 * time.Second
	if got != want {
		t.Errorf("ariHTTPTimeout override: got %s, want %s", got, want)
	}
}

func TestARIHTTPTimeout_NegativeValuesUseDefault(t *testing.T) {
	// Negative values are nonsensical but should fall back to the
	// default rather than producing an immediate-timeout client.
	c := newARITestConnector(t, -1)
	got := c.ariHTTPTimeout()
	want := 15 * time.Second
	if got != want {
		t.Errorf("negative ariHTTPTimeout should fall back to default: got %s, want %s", got, want)
	}
}

func TestARIHTTPTimeout_NilConfigSafeDefault(t *testing.T) {
	// Defensive: a connector with nil config must not panic and must
	// return the documented default. This is a guard for tests / DI
	// callers that hand in a partially-built Connector.
	c := &Connector{}
	got := c.ariHTTPTimeout()
	want := 15 * time.Second
	if got != want {
		t.Errorf("nil-config ariHTTPTimeout: got %s, want %s", got, want)
	}
}
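The four tests above pin a contract (zero, negative, and nil-config inputs all yield 15s; a positive value overrides), but the body of `ariHTTPTimeout()` is not part of this diff. The following is a minimal sketch of one function that would satisfy that contract. The `config` and `connector` types and the `defaultARITimeout` constant are hypothetical stand-ins, not the repository's actual `Config` / `Connector` definitions.

```go
package main

import (
	"fmt"
	"time"
)

// defaultARITimeout mirrors the 15s default that the tests pin.
// The constant name is an assumption made for this sketch.
const defaultARITimeout = 15 * time.Second

// config and connector are hypothetical, trimmed-down stand-ins.
type config struct {
	ARIHTTPTimeoutSeconds int
}

type connector struct {
	config *config
}

// ariHTTPTimeout returns the configured timeout, falling back to the
// default for a nil receiver, a nil config, or a non-positive value.
func (c *connector) ariHTTPTimeout() time.Duration {
	if c == nil || c.config == nil || c.config.ARIHTTPTimeoutSeconds <= 0 {
		return defaultARITimeout
	}
	return time.Duration(c.config.ARIHTTPTimeoutSeconds) * time.Second
}

func main() {
	fmt.Println((&connector{}).ariHTTPTimeout())
	fmt.Println((&connector{config: &config{ARIHTTPTimeoutSeconds: 45}}).ariHTTPTimeout())
}
```

Treating every non-positive value as "unset" keeps a misconfigured negative number from producing a client that times out immediately, which is the failure mode the negative-value test guards against.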
@@ -0,0 +1,59 @@
package domain

import (
	"reflect"
	"testing"
)

// Bundle C / Audit M-015: pin the renewal-flow cardinality invariant.
//
// The audit's claim is "renewal flow assumes single profile per certificate;
// no cardinality validation". Verified-already-clean: the certificate
// struct holds exactly one CertificateProfileID and one RenewalPolicyID
// as bare strings, not slices. There is literally no way to attach
// multiple profiles or policies to a managed certificate without changing
// the struct shape — which this test guards against.
//
// If a future schema change introduces N:N profiles or N:N renewal
// policies, this test fails and forces the change to be paired with
// a deliberate update of internal/service/renewal.go's iteration logic.

func TestManagedCertificate_SingleProfileCardinality(t *testing.T) {
	rt := reflect.TypeOf(ManagedCertificate{})
	cases := []struct {
		field    string
		wantKind reflect.Kind
	}{
		{"CertificateProfileID", reflect.String},
		{"RenewalPolicyID", reflect.String},
		{"IssuerID", reflect.String},
		{"OwnerID", reflect.String},
	}
	for _, tc := range cases {
		t.Run(tc.field, func(t *testing.T) {
			f, ok := rt.FieldByName(tc.field)
			if !ok {
				t.Fatalf("ManagedCertificate.%s field missing", tc.field)
			}
			if f.Type.Kind() != tc.wantKind {
				t.Errorf("ManagedCertificate.%s kind = %s, want %s "+
					"(M-015 cardinality pin: 1:1 relationships only — "+
					"if you're changing this you must also update "+
					"internal/service/renewal.go's profile/policy lookup)",
					tc.field, f.Type.Kind(), tc.wantKind)
			}
		})
	}
}

func TestRenewalPolicy_SingleProfileCardinality(t *testing.T) {
	rt := reflect.TypeOf(RenewalPolicy{})
	f, ok := rt.FieldByName("CertificateProfileID")
	if !ok {
		t.Fatal("RenewalPolicy.CertificateProfileID field missing")
	}
	if f.Type.Kind() != reflect.String {
		t.Errorf("RenewalPolicy.CertificateProfileID kind = %s, want String "+
			"(M-015 cardinality pin)", f.Type.Kind())
	}
}
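To illustrate why a `reflect.Kind` check is enough to catch a 1:1 to N:N schema change, here is a small self-contained sketch. The `singleProfile` and `multiProfile` structs and the `isStringField` helper are hypothetical and exist only for this illustration; they are not the repository's `ManagedCertificate` or `RenewalPolicy` types.

```go
package main

import (
	"fmt"
	"reflect"
)

// singleProfile models the current 1:1 shape (a bare string ID).
type singleProfile struct {
	CertificateProfileID string
}

// multiProfile models a hypothetical future N:N shape (a slice of IDs).
type multiProfile struct {
	CertificateProfileID []string
}

// isStringField reports whether v is a struct that has a field called
// name whose kind is string. A missing field or a non-struct is false.
func isStringField(v any, name string) bool {
	rt := reflect.TypeOf(v)
	if rt == nil || rt.Kind() != reflect.Struct {
		return false
	}
	f, ok := rt.FieldByName(name)
	if !ok {
		return false
	}
	return f.Type.Kind() == reflect.String
}

func main() {
	fmt.Println(isStringField(singleProfile{}, "CertificateProfileID"))
	fmt.Println(isStringField(multiProfile{}, "CertificateProfileID"))
}
```

Because the check works on the type rather than on a value, it fails at test time as soon as the field's declaration changes, before any renewal code runs against the new shape.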
@@ -271,6 +271,17 @@ type JobRepository interface {
	// Failed; I-001's retry loop then auto-promotes eligible Failed jobs back to Pending.
	// I-003 coverage-gap closure.
	ListTimedOutAwaitingJobs(ctx context.Context, csrCutoff, approvalCutoff time.Time) ([]*domain.Job, error)

	// ListJobsWithOfflineAgents returns jobs in Running status whose owning
	// agent's last_heartbeat_at is older than agentCutoff. Bundle C / Audit
	// M-016 (CWE-754): the existing ListTimedOutAwaitingJobs scope only
	// covers AwaitingCSR / AwaitingApproval — jobs that were claimed by an
	// agent and then stalled because the agent itself died (host crash,
	// container OOM, network partition) sit in Running indefinitely with
	// no recovery path. The reaper loop transitions these to Failed with
	// reason "agent_offline" so I-001's retry loop can re-queue them on
	// a healthy agent.
	ListJobsWithOfflineAgents(ctx context.Context, agentCutoff time.Time) ([]*domain.Job, error)
}

// RenewalPolicyRepository defines operations for managing renewal policies.
@@ -607,6 +607,48 @@ func (r *JobRepository) ListTimedOutAwaitingJobs(ctx context.Context, csrCutoff,
	return jobs, nil
}

// ListJobsWithOfflineAgents returns jobs in Running status whose owning
// agent's last_heartbeat_at is older than agentCutoff. Bundle C / Audit
// M-016 (CWE-754): closes the gap that ListTimedOutAwaitingJobs left
// open — jobs claimed by an agent that subsequently dies sit in Running
// indefinitely. The query joins jobs to agents on agent_id and filters
// to (status='Running' AND agent.last_heartbeat_at < agentCutoff).
//
// Jobs without an agent_id (server-side keygen path) are intentionally
// excluded: they have no agent to be "offline".
func (r *JobRepository) ListJobsWithOfflineAgents(ctx context.Context, agentCutoff time.Time) ([]*domain.Job, error) {
	rows, err := r.db.QueryContext(ctx, `
		SELECT j.id, j.type, j.certificate_id, j.target_id, j.agent_id, j.status,
		       j.attempts, j.max_attempts, j.last_error, j.scheduled_at,
		       j.started_at, j.completed_at, j.created_at
		FROM jobs j
		JOIN agents a ON a.id = j.agent_id
		WHERE j.status = $1
		  AND j.agent_id IS NOT NULL
		  AND a.last_heartbeat_at IS NOT NULL
		  AND a.last_heartbeat_at < $2
		ORDER BY j.started_at ASC NULLS FIRST
	`, domain.JobStatusRunning, agentCutoff)

	if err != nil {
		return nil, fmt.Errorf("failed to query jobs with offline agents: %w", err)
	}
	defer rows.Close()

	var jobs []*domain.Job
	for rows.Next() {
		job, err := scanJob(rows)
		if err != nil {
			return nil, err
		}
		jobs = append(jobs, job)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("error iterating offline-agent job rows: %w", err)
	}
	return jobs, nil
}

// scanJob scans a job from a row or rows
func scanJob(scanner interface {
	Scan(...interface{}) error
@@ -67,6 +67,12 @@ type CloudDiscoveryServicer interface {
// JobReaperService defines the interface for job timeout reaping used by the scheduler.
type JobReaperService interface {
	ReapTimedOutJobs(ctx context.Context, csrTTL, approvalTTL time.Duration) error
	// Bundle C / Audit M-016 (CWE-754): closes the gap left by ReapTimedOutJobs
	// (which only handles AwaitingCSR / AwaitingApproval). Jobs in Running
	// status whose owning agent has been silent for longer than agentTTL get
	// transitioned to Failed with reason "agent_offline" so I-001's retry
	// loop can re-queue them on a healthy agent.
	ReapJobsWithOfflineAgents(ctx context.Context, agentTTL time.Duration) error
}

// Scheduler manages background jobs and periodic tasks for the certificate control plane.
@@ -97,6 +103,9 @@ type Scheduler struct {
	healthCheckInterval    time.Duration
	cloudDiscoveryInterval time.Duration
	jobTimeoutInterval     time.Duration
	// agentOfflineJobTTL: heartbeat-silence threshold past which Running jobs
	// owned by the silent agent are reaped. Bundle C / Audit M-016. The
	// default (5 minutes) is set in NewScheduler.
	agentOfflineJobTTL      time.Duration
	awaitingCSRTimeout      time.Duration
	awaitingApprovalTimeout time.Duration
@@ -148,6 +157,9 @@ func NewScheduler(
		healthCheckInterval:    60 * time.Second,
		cloudDiscoveryInterval: 6 * time.Hour,
		jobTimeoutInterval:     10 * time.Minute,
		// 5 minutes is 5×agentHealthCheckInterval default of 1m; an agent
		// must miss multiple heartbeats before its in-flight jobs are reaped.
		agentOfflineJobTTL: 5 * time.Minute,
	}
}
@@ -233,6 +245,16 @@ func (s *Scheduler) SetJobReaperService(jr JobReaperService) {
	s.jobReaper = jr
}

// SetAgentOfflineJobTTL sets the threshold past which a Running job whose
// owning agent has gone silent is reaped to Failed. Bundle C / Audit M-016.
// Zero or negative values are ignored (the default of 5 minutes is kept).
func (s *Scheduler) SetAgentOfflineJobTTL(d time.Duration) {
	if d <= 0 {
		return
	}
	s.agentOfflineJobTTL = d
}

// SetJobTimeoutInterval sets the job timeout reaper tick interval (I-003).
func (s *Scheduler) SetJobTimeoutInterval(d time.Duration) {
	s.jobTimeoutInterval = d
@@ -503,6 +525,15 @@ func (s *Scheduler) jobTimeoutLoop(ctx context.Context) {
// When no JobReaperService has been wired (e.g. in tests that don't exercise
// I-003) the call is a safe no-op, preserving the always-on loop topology
// described in I-003 without forcing every consumer to wire a reaper.
//
// Bundle C / Audit M-016: the reaping cycle now has TWO arms:
//
//  1. ReapTimedOutJobs handles AwaitingCSR / AwaitingApproval timeouts (I-003).
//  2. ReapJobsWithOfflineAgents handles Running jobs whose owning agent has
//     gone silent (M-016). Its threshold is s.agentOfflineJobTTL, whose
//     5-minute default is meant to line up with the mark-stale-agents-offline
//     path: if the agent is judged offline by
//     AgentService.MarkStaleAgentsOffline, its in-flight jobs should be
//     reaped on the same cadence.
func (s *Scheduler) runJobTimeout(ctx context.Context) {
	if s.jobReaper == nil {
		return
@@ -516,6 +547,20 @@ func (s *Scheduler) runJobTimeout(ctx context.Context) {
	} else {
		s.logger.Debug("job timeout reaper completed")
	}
	// Second arm: offline-agent reaper. Uses s.agentOfflineJobTTL (defaults to
	// 5 minutes, the same value the agent-health-check path uses to flip an
	// agent to Offline). The default of 5×agentHealthCheckInterval catches
	// agents that miss multiple consecutive heartbeats while leaving a single
	// missed beat as a transient blip that does NOT reap.
	offlineCtx, offlineCancel := context.WithTimeout(ctx, 2*time.Minute)
	defer offlineCancel()
	if err := s.jobReaper.ReapJobsWithOfflineAgents(offlineCtx, s.agentOfflineJobTTL); err != nil {
		s.logger.Error("offline-agent job reaper failed",
			"error", err,
			"agent_offline_ttl", s.agentOfflineJobTTL.String())
	} else {
		s.logger.Debug("offline-agent job reaper completed")
	}
}

// agentHealthCheckLoop runs every agentHealthCheckInterval and marks stale agents as offline.
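The two-arm cycle in `runJobTimeout` can be sketched in isolation as follows. This is an illustration under stated assumptions, not the scheduler's actual code: the `reaper` interface only mirrors the two `JobReaperService` method signatures shown in this diff, while `fakeReaper`, `runBothArms`, `runDemo`, the 24h / 7-day TTL values, and the per-arm timeout on the first arm are all hypothetical. The point it demonstrates is that a failure in the first arm is recorded but does not stop the second arm from running.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// reaper mirrors the two method signatures of JobReaperService.
type reaper interface {
	ReapTimedOutJobs(ctx context.Context, csrTTL, approvalTTL time.Duration) error
	ReapJobsWithOfflineAgents(ctx context.Context, agentTTL time.Duration) error
}

// fakeReaper is a hypothetical test double that counts calls.
type fakeReaper struct {
	failFirst bool
	calls     int
}

func (f *fakeReaper) ReapTimedOutJobs(ctx context.Context, csrTTL, approvalTTL time.Duration) error {
	f.calls++
	if f.failFirst {
		return errors.New("simulated first-arm failure")
	}
	return nil
}

func (f *fakeReaper) ReapJobsWithOfflineAgents(ctx context.Context, agentTTL time.Duration) error {
	f.calls++
	return nil
}

// runBothArms runs each arm under its own bounded context and collects
// errors instead of returning early, so one arm cannot starve the other.
func runBothArms(ctx context.Context, r reaper, csrTTL, approvalTTL, agentTTL time.Duration) []error {
	var errs []error

	firstCtx, cancelFirst := context.WithTimeout(ctx, 2*time.Minute)
	if err := r.ReapTimedOutJobs(firstCtx, csrTTL, approvalTTL); err != nil {
		errs = append(errs, fmt.Errorf("timeout arm: %w", err))
	}
	cancelFirst()

	secondCtx, cancelSecond := context.WithTimeout(ctx, 2*time.Minute)
	defer cancelSecond()
	if err := r.ReapJobsWithOfflineAgents(secondCtx, agentTTL); err != nil {
		errs = append(errs, fmt.Errorf("offline-agent arm: %w", err))
	}
	return errs
}

// runDemo wires a fakeReaper with illustrative TTLs and reports how many
// arms ran and how many of them failed.
func runDemo(failFirst bool) (calls, failures int) {
	f := &fakeReaper{failFirst: failFirst}
	errs := runBothArms(context.Background(), f, 24*time.Hour, 7*24*time.Hour, 5*time.Minute)
	return f.calls, len(errs)
}

func main() {
	calls, failures := runDemo(true)
	fmt.Println(calls, failures)
}
```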
@@ -165,6 +165,15 @@ func (m *mockJobService) ReapTimedOutJobs(ctx context.Context, csrTTL, approvalT
	return nil
}

// ReapJobsWithOfflineAgents is the Bundle C / Audit M-016 stub. The
// existing scheduler tests do not exercise this path; the offline-agent
// reaper has its own end-to-end test in internal/service. Here we just
// satisfy the JobReaperService interface so the scheduler tests still
// compile.
func (m *mockJobService) ReapJobsWithOfflineAgents(ctx context.Context, agentTTL time.Duration) error {
	return nil
}

// mockAgentService is a mock implementation for testing.
type mockAgentService struct {
	mu sync.Mutex
@@ -237,6 +237,58 @@ func (s *JobService) RetryFailedJobs(ctx context.Context, maxRetries int) error
	return nil
}

// ReapJobsWithOfflineAgents transitions jobs in Running status whose
// owning agent has been silent longer than agentTTL to Failed with
// reason "agent_offline". Bundle C / Audit M-016 (CWE-754): closes the
// gap left by ReapTimedOutJobs (which only handles AwaitingCSR /
// AwaitingApproval). I-001's retry loop then auto-promotes eligible
// Failed jobs back to Pending so a healthy agent can claim them.
func (s *JobService) ReapJobsWithOfflineAgents(ctx context.Context, agentTTL time.Duration) error {
	if agentTTL <= 0 {
		return fmt.Errorf("ReapJobsWithOfflineAgents: agentTTL must be positive, got %s", agentTTL)
	}
	cutoff := time.Now().Add(-agentTTL)

	staleJobs, err := s.jobRepo.ListJobsWithOfflineAgents(ctx, cutoff)
	if err != nil {
		return fmt.Errorf("list jobs with offline agents: %w", err)
	}

	var reaped int
	for _, job := range staleJobs {
		oldStatus := job.Status
		errMsg := fmt.Sprintf("agent offline (no heartbeat for >%s)", agentTTL)

		job.Status = domain.JobStatusFailed
		job.LastError = &errMsg

		if err := s.jobRepo.Update(ctx, job); err != nil {
			s.logger.Error("failed to transition offline-agent job",
				"job_id", job.ID, "agent_id", job.AgentID, "error", err)
			continue
		}

		if s.auditService != nil {
			if auditErr := s.auditService.RecordEvent(ctx, "system", domain.ActorTypeSystem,
				"job_offline_agent_reap", "job", job.ID,
				map[string]interface{}{
					"old_status":     string(oldStatus),
					"new_status":     string(domain.JobStatusFailed),
					"timeout_reason": "agent_offline",
					"agent_id":       job.AgentID,
				}); auditErr != nil {
				s.logger.Error("failed to record offline-agent reap audit event",
					"job_id", job.ID, "error", auditErr)
			}
		}
		reaped++
	}

	s.logger.Info("offline-agent job reaper completed",
		"reaped", reaped, "total_stale", len(staleJobs))
	return nil
}

// ReapTimedOutJobs transitions jobs stuck in AwaitingCSR or AwaitingApproval
// to Failed if they've exceeded their TTL. I-001's retry loop then auto-promotes
// eligible Failed jobs back to Pending (closes coverage gap I-003).
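The cutoff arithmetic in `ReapJobsWithOfflineAgents` (`cutoff := time.Now().Add(-agentTTL)`, then "heartbeat strictly before cutoff") can be checked in isolation. The sketch below is a simplified illustration: `agentIsStale` and `staleAfter` are hypothetical helpers, and unlike the service method, which returns an error for a non-positive TTL, this sketch simply reports "not stale" in that case.

```go
package main

import (
	"fmt"
	"time"
)

// agentIsStale reports whether lastHeartbeat is strictly older than
// now-ttl. A nil heartbeat is treated as "not stale", mirroring the
// query's `last_heartbeat_at IS NOT NULL` filter. A non-positive ttl
// also yields false (a simplification of the service's error return).
func agentIsStale(now time.Time, lastHeartbeat *time.Time, ttl time.Duration) bool {
	if ttl <= 0 || lastHeartbeat == nil {
		return false
	}
	cutoff := now.Add(-ttl)
	return lastHeartbeat.Before(cutoff)
}

// staleAfter is a convenience wrapper expressed in whole minutes.
func staleAfter(heartbeatAgeMinutes, ttlMinutes int) bool {
	now := time.Now()
	hb := now.Add(-time.Duration(heartbeatAgeMinutes) * time.Minute)
	return agentIsStale(now, &hb, time.Duration(ttlMinutes)*time.Minute)
}

func main() {
	// Same shape as the first regression test: a 30-minute-old heartbeat
	// and a 1-minute-old heartbeat against a 10-minute TTL.
	fmt.Println(staleAfter(30, 10), staleAfter(1, 10))
}
```

Because the comparison is strict, a heartbeat exactly at the cutoff is not reaped; only agents silent for longer than the TTL qualify.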
@@ -0,0 +1,169 @@
package service

import (
	"context"
	"errors"
	"io"
	"log/slog"
	"strings"
	"testing"
	"time"

	"github.com/shankar0123/certctl/internal/domain"
)

// Bundle C / Audit M-016 (CWE-754): regression suite for the new
// ReapJobsWithOfflineAgents path. Pre-bundle the reaper only handled
// AwaitingCSR / AwaitingApproval timeouts; jobs claimed by an agent
// that subsequently dies sat in Running indefinitely. These tests pin
// the new behavior end-to-end through the JobService → mockJobRepo
// boundary.

func newOfflineReaperService(t *testing.T) (*JobService, *mockJobRepo, *mockAuditRepo) {
	t.Helper()
	jobRepo := &mockJobRepo{
		Jobs:   map[string]*domain.Job{},
		Agents: map[string]*domain.Agent{},
	}
	auditRepo := newMockAuditRepository()
	auditService := NewAuditService(auditRepo)
	svc := NewJobService(jobRepo, nil, nil, nil, nil, slog.New(slog.NewTextHandler(io.Discard, nil)))
	svc.SetAuditService(auditService)
	return svc, jobRepo, auditRepo
}

func mkRunningJob(id, agentID string) *domain.Job {
	a := agentID
	now := time.Now()
	return &domain.Job{
		ID:        id,
		AgentID:   &a,
		Status:    domain.JobStatusRunning,
		CreatedAt: now.Add(-2 * time.Hour),
	}
}

func mkAgentWithHeartbeat(id string, hbAge time.Duration) *domain.Agent {
	hb := time.Now().Add(-hbAge)
	return &domain.Agent{
		ID:              id,
		Name:            id,
		LastHeartbeatAt: &hb,
	}
}

func TestReapJobsWithOfflineAgents_FlipsRunningToFailed(t *testing.T) {
	svc, repo, _ := newOfflineReaperService(t)

	repo.Agents["agt-stale"] = mkAgentWithHeartbeat("agt-stale", 30*time.Minute)
	repo.Agents["agt-fresh"] = mkAgentWithHeartbeat("agt-fresh", 1*time.Minute)
	repo.Jobs["j-stale"] = mkRunningJob("j-stale", "agt-stale")
	repo.Jobs["j-fresh"] = mkRunningJob("j-fresh", "agt-fresh")

	if err := svc.ReapJobsWithOfflineAgents(context.Background(), 10*time.Minute); err != nil {
		t.Fatalf("reaper returned error: %v", err)
	}

	if got := repo.Jobs["j-stale"].Status; got != domain.JobStatusFailed {
		t.Errorf("stale-agent job status = %s, want Failed", got)
	}
	if got := repo.Jobs["j-fresh"].Status; got != domain.JobStatusRunning {
		t.Errorf("fresh-agent job status = %s, want Running (must NOT be reaped)", got)
	}

	stale := repo.Jobs["j-stale"]
	if stale.LastError == nil || !strings.Contains(*stale.LastError, "agent offline") {
		t.Errorf("stale job LastError must cite agent offline; got: %v", stale.LastError)
	}
}

func TestReapJobsWithOfflineAgents_SkipsServerKeygenJobs(t *testing.T) {
	// Jobs without an agent_id (server-side keygen) must NOT be reaped
	// by this path — they have no agent to be "offline".
	svc, repo, _ := newOfflineReaperService(t)
	noAgent := &domain.Job{
		ID:        "j-server",
		Status:    domain.JobStatusRunning,
		CreatedAt: time.Now().Add(-time.Hour),
	}
	repo.Jobs["j-server"] = noAgent

	if err := svc.ReapJobsWithOfflineAgents(context.Background(), 1*time.Minute); err != nil {
		t.Fatalf("reaper returned error: %v", err)
	}
	if got := repo.Jobs["j-server"].Status; got != domain.JobStatusRunning {
		t.Errorf("server-keygen job (no agent_id) status = %s, want Running", got)
	}
}

func TestReapJobsWithOfflineAgents_SkipsNonRunningJobs(t *testing.T) {
	// Pending / AwaitingCSR / AwaitingApproval jobs are NOT in scope —
	// they're handled by ReapTimedOutJobs (I-003) or ClaimPendingJobs.
	svc, repo, _ := newOfflineReaperService(t)
	repo.Agents["agt-stale"] = mkAgentWithHeartbeat("agt-stale", 1*time.Hour)
	repo.Jobs["j-pending"] = func() *domain.Job {
		j := mkRunningJob("j-pending", "agt-stale")
		j.Status = domain.JobStatusPending
		return j
	}()

	if err := svc.ReapJobsWithOfflineAgents(context.Background(), 1*time.Minute); err != nil {
		t.Fatalf("reaper returned error: %v", err)
	}
	if got := repo.Jobs["j-pending"].Status; got != domain.JobStatusPending {
		t.Errorf("Pending job status = %s, want Pending (out of scope for offline-agent reaper)", got)
	}
}

func TestReapJobsWithOfflineAgents_RejectsNonPositiveTTL(t *testing.T) {
	svc, _, _ := newOfflineReaperService(t)
	if err := svc.ReapJobsWithOfflineAgents(context.Background(), 0); err == nil {
		t.Error("expected error for zero TTL — fail-loud guard against misconfig")
	}
	if err := svc.ReapJobsWithOfflineAgents(context.Background(), -time.Hour); err == nil {
		t.Error("expected error for negative TTL — fail-loud guard against misconfig")
	}
}

func TestReapJobsWithOfflineAgents_PropagatesRepoError(t *testing.T) {
	svc, repo, _ := newOfflineReaperService(t)
	repo.ListOfflineAgentJobsErr = errors.New("simulated db down")

	err := svc.ReapJobsWithOfflineAgents(context.Background(), 5*time.Minute)
	if err == nil {
		t.Fatal("expected error to propagate from repo")
	}
	if !strings.Contains(err.Error(), "simulated db down") {
		t.Errorf("expected wrapped repo error, got: %v", err)
	}
}

func TestReapJobsWithOfflineAgents_RecordsAuditEvent(t *testing.T) {
	svc, repo, audit := newOfflineReaperService(t)
	repo.Agents["agt-stale"] = mkAgentWithHeartbeat("agt-stale", 30*time.Minute)
	repo.Jobs["j-stale"] = mkRunningJob("j-stale", "agt-stale")

	if err := svc.ReapJobsWithOfflineAgents(context.Background(), 5*time.Minute); err != nil {
		t.Fatalf("reaper: %v", err)
	}

	audit.mu.Lock()
	events := append([]*domain.AuditEvent(nil), audit.Events...)
	audit.mu.Unlock()
	var found *domain.AuditEvent
	for i := range events {
		if events[i].Action == "job_offline_agent_reap" {
			found = events[i]
			break
		}
	}
	if found == nil {
		t.Fatal("expected job_offline_agent_reap audit event, got none")
	}
	if found.Actor != "system" {
		t.Errorf("audit Actor = %q, want system", found.Actor)
	}
	if found.ResourceType != "job" || found.ResourceID != "j-stale" {
		t.Errorf("audit resource binding wrong: %s/%s", found.ResourceType, found.ResourceID)
	}
}
@@ -157,20 +157,22 @@ func (m *mockCertRepo) AddCert(cert *domain.ManagedCertificate) {
 // mockJobRepo is a test implementation of JobRepository
 type mockJobRepo struct {
-	mu                sync.Mutex
-	Jobs              map[string]*domain.Job
-	StatusUpdates     map[string]domain.JobStatus
-	CreateErr         error
-	UpdateErr         error
-	UpdateErrorByID   map[string]error
-	UpdateErrorByIDMu sync.Mutex
-	UpdateStatusErr   error
-	GetErr            error
-	ListErr           error
-	ListByStatusErr   error
-	DeleteErr         error
-	ListTimedOutErr   error
-	Updated           []*domain.Job
+	mu                      sync.Mutex
+	Jobs                    map[string]*domain.Job
+	Agents                  map[string]*domain.Agent
+	StatusUpdates           map[string]domain.JobStatus
+	CreateErr               error
+	UpdateErr               error
+	UpdateErrorByID         map[string]error
+	UpdateErrorByIDMu       sync.Mutex
+	UpdateStatusErr         error
+	GetErr                  error
+	ListErr                 error
+	ListByStatusErr         error
+	DeleteErr               error
+	ListTimedOutErr         error
+	ListOfflineAgentJobsErr error
+	Updated                 []*domain.Job
 }

 func (m *mockJobRepo) List(ctx context.Context) ([]*domain.Job, error) {
@@ -387,6 +389,34 @@ func (m *mockJobRepo) ListTimedOutAwaitingJobs(ctx context.Context, csrCutoff, a
	return jobs, nil
}

// ListJobsWithOfflineAgents returns Running jobs whose owning agent's
// last_heartbeat_at is older than agentCutoff. The mock walks Jobs +
// Agents the same way the real repo does. Bundle C / Audit M-016.
func (m *mockJobRepo) ListJobsWithOfflineAgents(ctx context.Context, agentCutoff time.Time) ([]*domain.Job, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.ListOfflineAgentJobsErr != nil {
		return nil, m.ListOfflineAgentJobsErr
	}
	var jobs []*domain.Job
	for _, j := range m.Jobs {
		if j.Status != domain.JobStatusRunning {
			continue
		}
		if j.AgentID == nil || *j.AgentID == "" {
			continue
		}
		ag, ok := m.Agents[*j.AgentID]
		if !ok || ag.LastHeartbeatAt == nil {
			continue
		}
		if ag.LastHeartbeatAt.Before(agentCutoff) {
			jobs = append(jobs, j)
		}
	}
	return jobs, nil
}

func (m *mockJobRepo) AddJob(job *domain.Job) {
	m.mu.Lock()
	defer m.mu.Unlock()
@@ -81,6 +81,11 @@ func (m *mockVerificationJobRepo) ListTimedOutAwaitingJobs(ctx context.Context,
	return nil, nil
}

// Bundle C / Audit M-016: stub for the new offline-agent reaper repo method.
func (m *mockVerificationJobRepo) ListJobsWithOfflineAgents(ctx context.Context, agentCutoff time.Time) ([]*domain.Job, error) {
	return nil, nil
}

// newVerificationTestService creates a VerificationService wired with test doubles.
func newVerificationTestService(jobs map[string]*domain.Job, jobRepoErr error) (*VerificationService, *mockVerificationJobRepo, *mockAuditRepo) {
	jobRepo := &mockVerificationJobRepo{jobs: jobs, err: jobRepoErr}
@@ -24,6 +24,14 @@
-- can DROP it by name without ambiguity; un-named CHECK constraints use
-- a synthesized PostgreSQL name that varies by environment.

-- Bundle C / Audit M-006 (CWE-913): idempotency guard. Drop-if-exists
-- before ADD so a re-run of this migration against a partially-applied
-- DB doesn't fail with "constraint already exists". Mirrors the down
-- migration's DROP CONSTRAINT IF EXISTS shape and the M-7 idempotent-
-- index idiom.
ALTER TABLE policy_violations
    DROP CONSTRAINT IF EXISTS policy_violations_severity_check;

ALTER TABLE policy_violations
    ADD CONSTRAINT policy_violations_severity_check
    CHECK (severity IN ('Warning', 'Error', 'Critical'));