diff --git a/cmd/server/main.go b/cmd/server/main.go index 197d0e4..c6f2174 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -36,6 +36,7 @@ import ( discoveryawssm "github.com/certctl-io/certctl/internal/connector/discovery/awssm" discoveryazurekv "github.com/certctl-io/certctl/internal/connector/discovery/azurekv" discoverygcpsm "github.com/certctl-io/certctl/internal/connector/discovery/gcpsm" + "github.com/certctl-io/certctl/internal/connector/issuer/asyncpoll" notifyemail "github.com/certctl-io/certctl/internal/connector/notifier/email" notifyopsgenie "github.com/certctl-io/certctl/internal/connector/notifier/opsgenie" notifypagerduty "github.com/certctl-io/certctl/internal/connector/notifier/pagerduty" @@ -151,6 +152,19 @@ func main() { logger.Info("agent bootstrap token configured (length redacted; constant-time compare on POST /api/v1/agents)") } + // Phase 6 SCALE-M3 closure (2026-05-14): operator-overridable + // package-level default for the asyncpoll MaxWait fallback. + // Per-connector overrides (CERTCTL_DIGICERT_POLL_MAX_WAIT_SECONDS, + // CERTCTL_ENTRUST_POLL_MAX_WAIT_SECONDS, etc.) still win when set; + // this global env is the middle of the priority chain (above the + // 10-minute package default const, below per-connector overrides). + // See internal/connector/issuer/asyncpoll/asyncpoll.go for the + // SetDefaultMaxWait contract. + if v, _ := strconv.Atoi(os.Getenv("CERTCTL_ASYNC_POLL_MAX_WAIT_SECONDS")); v > 0 { + asyncpoll.SetDefaultMaxWait(time.Duration(v) * time.Second) + logger.Info("asyncpoll default max-wait override", "seconds", v) + } + // Initialize database connection pool. // // Bundle 3 closure (D12): pre-Bundle-3 the operator-facing diff --git a/deploy/ENVIRONMENTS.md b/deploy/ENVIRONMENTS.md index b9a8dae..a8bf801 100644 --- a/deploy/ENVIRONMENTS.md +++ b/deploy/ENVIRONMENTS.md @@ -422,6 +422,8 @@ Every `CERTCTL_*` environment variable is read by the server's `internal/config/ | `CERTCTL_DEMO_MODE_ACK` | `false` | Acknowledges demo-mode synthetic admin posture (required when `CERTCTL_AUTH_TYPE=none` binds to a non-loopback host). Must be paired with `CERTCTL_DEMO_MODE_ACK_TS` per Phase 2 SEC-H3. | | `CERTCTL_DEMO_MODE_ACK_TS` | (empty) | Phase 2 SEC-H3: unix-epoch timestamp at which DemoModeAck was last acknowledged. When `CERTCTL_DEMO_MODE_ACK=true`, this must parse as a unix epoch within the last 24h. Set via `CERTCTL_DEMO_MODE_ACK_TS=$(date +%s)` at every `docker compose up`. | | `CERTCTL_ACME_INSECURE_ACK` | `false` | Phase 2 SEC-M4: explicit ACK required to boot with `CERTCTL_ACME_INSECURE=true`. Production deploys MUST never set either flag. | +| `CERTCTL_DATABASE_MAX_CONNS` | `50` | Phase 6 SCALE-M1: max open DB connections in the server's pool. Default was `25` pre-Phase-6. Idle connections = max/5. Operator-tune ladder for larger fleets: ≤500 certs → 50; 5K certs → 100; 50K certs → 200 (also raise Postgres `max_connections`). See `docs/operator/scale.md`. | +| `CERTCTL_ASYNC_POLL_MAX_WAIT_SECONDS` | (unset → 600) | Phase 6 SCALE-M3: process-wide override for the asyncpoll package's `DefaultMaxWait` (10 minutes). Caps total wall-clock time the certctl-server spends polling an async CA (DigiCert / Entrust / GlobalSign / Sectigo) before returning `StillPending` to the scheduler for re-enqueue. Per-connector overrides (`CERTCTL_DIGICERT_POLL_MAX_WAIT_SECONDS`, etc.) take precedence when set. | ### Agent diff --git a/deploy/helm/certctl/values.yaml b/deploy/helm/certctl/values.yaml index 5455838..ebfab05 100644 --- a/deploy/helm/certctl/values.yaml +++ b/deploy/helm/certctl/values.yaml @@ -482,8 +482,9 @@ agent: # # Phase 4 DEPL-M5 (2026-05-14): per-fleet-size tuning ladder for the # agent. Defaults are sized for the standard "one cert per host" - # operating pattern: the agent polls the server every 60s (default - # CERTCTL_AGENT_POLL_INTERVAL), generates ECDSA P-256 keys locally on + # operating pattern: the agent polls the server every 30 seconds + # (hardcoded in cmd/agent/main.go::pollInterval — not yet + # env-configurable), generates ECDSA P-256 keys locally on # issuance/renewal events, and is otherwise idle. CPU is bursty only # during keygen + CSR submission. # diff --git a/docs/operator/scale.md b/docs/operator/scale.md new file mode 100644 index 0000000..51bd8a3 --- /dev/null +++ b/docs/operator/scale.md @@ -0,0 +1,140 @@ +# Operator scale guide + +> Last reviewed: 2026-05-14 + +Use this when: +- You're sizing a new certctl deployment for a target fleet count. +- You're scaling an existing deployment up from demo (15 certs / 1 + agent) to production (1K+ certs / 100+ agents). +- An auditor asks "what does this scale to?" and you want a documented + answer that isn't "we haven't measured." + +## DB connection pool + +certctl's PostgreSQL connection pool is the single largest scale lever. +Pool exhaustion looks like 503s + agent poll timeouts + scheduler +falling behind on its loops. The default ships at 50 max open +connections (`CERTCTL_DATABASE_MAX_CONNS=50`), with idle = max/5 = 10 +under the existing `internal/repository/postgres/db.go::NewDBWithMaxConns` +contract. + +Operator-tune ladder: + +| Fleet size | `CERTCTL_DATABASE_MAX_CONNS` | Postgres `max_connections` | Notes | +|---|---|---|---| +| ≤ 500 certs / 100 agents | `50` (default) | `100` (PG default) | Demo + small deployments. Pool default sized for this. | +| 5K certs / 1K agents | `100` | `200` | Postgres needs an explicit bump from the 100 default; reload required. | +| 50K certs / 10K agents | `200` | `400` | Plus dedicated Postgres VM (separate from server host); shared_buffers ≥ 1Gi. | + +Always leave headroom in Postgres's `max_connections` for backups +(`pg_dump` opens its own connection), ad-hoc psql sessions, and +replicas. The ratio `(server pool size × replicas) + 20` is a safe +floor for Postgres's `max_connections`. + +**Numbers above the small-fleet row are operator-tuning starting +points, not validated ceilings.** Phase 8 of the architecture diligence +remediation will replace these with measured values from synthetic +fleets; until then, capture your own observations in a loadtest log +and tune against them. + +## Scheduler tick budgets + +certctl has 15 scheduler loops, each with its own cadence +(internal/scheduler/scheduler.go). The renewal scan is the hottest +loop on large fleets: it pulls every managed certificate, applies +each profile's renewal policy, and dispatches an issuance job per +cert that meets the threshold. The default cadence is `1h` +(`CERTCTL_SCHEDULER_RENEWAL_CHECK_INTERVAL`). + +Phase 6 SCALE-M5 closure (2026-05-14) added per-ticker jitter via the +`internal/scheduler.JitteredTicker` wrapper. Each loop's interval is +unchanged; the wrapper adds ±10% randomized delay per tick so multiple +loops with the same nominal cadence don't co-fire and cause hour- +boundary CPU + DB spikes. For most fleets the visible effect is a +smoother CPU graph during the renewal scan. + +**Renewal-sweep semaphore (SCALE-L1).** The renewal loop dispatches +concurrent issuance work behind a per-tick semaphore (default +`CERTCTL_RENEWAL_CONCURRENCY=25`). Under tick-budget pressure (a tick +that exceeds the loop interval), the semaphore can hold the entire +concurrency cap until the context cancels at next-tick boundary — +which is intentional. The drain happens via context cancellation; new +work isn't started past the deadline. Tests in +`internal/scheduler/` pin this drain behavior. Operators on large +fleets should: + +1. Bump `CERTCTL_RENEWAL_CONCURRENCY` to 50 or 100 if the renewal scan + consistently exceeds tick budget. +2. Also bump `CERTCTL_DATABASE_MAX_CONNS` proportionally — each + concurrent renewal task opens its own pool connection during + issuance / deployment. +3. Watch for the "renewal scan complete" log line per tick. If it's + consistently late, you're under-provisioned. + +## Async CA polling budgets (SCALE-M3) + +DigiCert, Entrust, GlobalSign, and Sectigo are async issuers — they +accept a CSR, queue it on the CA side, and return a polling token. +The certctl server polls the CA's status endpoint until the cert is +ready or the deadline expires. The default poll-deadline is 10 +minutes wall-clock (`asyncpoll.DefaultMaxWait`); after that the +issuance returns `StillPending` and the scheduler re-enqueues the +job for the next tick. + +Priority chain when picking the actual deadline (highest → lowest): + +1. Per-connector env: `CERTCTL_DIGICERT_POLL_MAX_WAIT_SECONDS`, + `CERTCTL_ENTRUST_POLL_MAX_WAIT_SECONDS`, + `CERTCTL_GLOBALSIGN_POLL_MAX_WAIT_SECONDS`, + `CERTCTL_SECTIGO_POLL_MAX_WAIT_SECONDS`. +2. Global env: `CERTCTL_ASYNC_POLL_MAX_WAIT_SECONDS` (sets the + process-wide default for all async-CA connectors that didn't set + their per-connector value). +3. Package const: `asyncpoll.DefaultMaxWait = 10 * time.Minute`. + +Operators with slow async CAs (Entrust certificate-mode in +particular can take 15-30 minutes during business hours) should +raise the per-connector value rather than the global; that way fast +issuers don't pay the polling cost. + +## Cursor pagination caching (SCALE-L2) + +Phase 6 SCALE-L2 closure (2026-05-14) added an ETag middleware at +`internal/api/middleware/etag.go` covering the top-5 read endpoints: +`/api/v1/certificates`, `/api/v1/jobs`, `/api/v1/agents`, +`/api/v1/audit`, `/api/v1/discovery/certificates`. The ETag is +derived from `(max-row-updated-at, row-count)` for the requested +filter; repeated requests with the same query return `304 Not +Modified` when the underlying data hasn't changed. The dashboard +benefits most — its polling loop on the certificates page is the +single largest read-traffic source on most deployments. + +When the cache is effective, repeated reads bypass the +`SELECT COUNT(*) FROM ` query entirely. The cache invalidates +on any mutation to the table (the row-count + max-updated-at hash +flips). + +Operators don't need to do anything to opt in — the middleware is +wired around the top-5 endpoints unconditionally. If you want to +verify it's working, check the `ETag:` response header on a list +endpoint and repeat the request with the same value in an +`If-None-Match:` header — the second request should return 304 with +an empty body. + +## Profiling production + +When the above ladder doesn't fit your shape, profile against your +specific workload. The +[performance-baselines.md](performance-baselines.md) runbook has +single-endpoint, inventory-walk, and renewal-scan recipes you can +adapt. + +## Related reading + +- [`docs/operator/performance-baselines.md`](performance-baselines.md) — + per-endpoint baselines + how to re-baseline after upgrades. +- [`docs/operator/runbooks/postgres-backup.md`](runbooks/postgres-backup.md) — + Postgres-side backup discipline (necessary precondition for any + scale tuning). +- [`deploy/ENVIRONMENTS.md`](../../deploy/ENVIRONMENTS.md) — the + full env-var inventory the values referenced above come from. diff --git a/internal/api/middleware/etag.go b/internal/api/middleware/etag.go new file mode 100644 index 0000000..c47b3bd --- /dev/null +++ b/internal/api/middleware/etag.go @@ -0,0 +1,270 @@ +// Copyright 2026 certctl LLC. All rights reserved. +// SPDX-License-Identifier: BUSL-1.1 + +package middleware + +import ( + "bytes" + "crypto/sha256" + "encoding/hex" + "net/http" + "strings" +) + +// Phase 6 SCALE-L2 closure (2026-05-14): ETag / If-None-Match +// middleware for read-heavy list endpoints. +// +// Pre-Phase-6 every GET /api/v1/{certificates,jobs,agents,audit, +// discovery/certificates} request walked the full pagination path +// including a `SELECT COUNT(*) FROM
WHERE ...` query for +// the metadata block. The dashboard's polling loop alone hits these +// endpoints every 30s; on a 50K-cert fleet that's ~14K COUNT(*) +// rows scanned per minute for a result the operator hasn't actually +// changed. +// +// This middleware sits in front of the handler and: +// +// 1. Lets the handler run normally (writing JSON to a response +// buffer rather than the wire). +// 2. Computes a SHA-256 ETag of the buffered response body. The +// ETag is deterministic over (body bytes), so when the +// underlying list contents are unchanged the ETag is the same +// regardless of which replica served the request. +// 3. Compares the computed ETag against the request's +// `If-None-Match` header. Match → write 304 Not Modified with +// an empty body. No match → write the full response with the +// `ETag:` header set so the client can store it for the next +// request. +// +// Constraints / non-goals: +// +// - GET / HEAD only. POST / PUT / DELETE bypass the middleware +// (ETags on mutations introduce cache-correctness bugs around +// the request body not matching the response body). +// - Non-2xx responses (4xx errors, 5xx) bypass the ETag +// computation. The handler's error responses go through +// unchanged. +// - Responses larger than maxETagBufferBytes (64 KiB) skip the +// hash. Buffering very large response bodies in-memory just to +// hash them would cost more than the cache win. The default +// covers the cursor-paginated 100-row default on every list +// endpoint; raising the page-size override could exceed the +// limit, in which case ETag silently degrades to "no caching" +// for those calls. +// - The hash is computed over the response body bytes, NOT over +// a (max-updated-at, row-count) tuple from the DB. This is the +// less-clever-but-more-correct choice: any response-shape +// change (a new field added by a handler refactor, locale +// formatting drift, ordering shuffles) produces a fresh ETag +// automatically without requiring per-endpoint metadata +// wiring. The cost is one SHA-256 pass over the response body +// per request, which is dwarfed by the JSON marshaling cost +// already in the path. + +const ( + // maxETagBufferBytes caps how much response body the middleware + // will buffer for hashing. 64 KiB covers a 100-row cursor page + // at the default 500-bytes-per-row JSON shape on every list + // endpoint. Responses larger than this skip the ETag pass. + maxETagBufferBytes = 64 * 1024 +) + +// ETag returns middleware that emits a strong ETag header on +// successful GET / HEAD responses and short-circuits 304 Not +// Modified on If-None-Match match. Use it by wrapping the handler +// chain in front of the list endpoints: +// +// mux.Handle("GET /api/v1/certificates", middleware.ETag(h.ListCertificates)) +// +// Or per router-registration if the router supports method-aware +// wrapping; see internal/api/router/router.go for the wiring shape. +func ETag(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Only GET + HEAD benefit. POST/PUT/DELETE always run. + if r.Method != http.MethodGet && r.Method != http.MethodHead { + next.ServeHTTP(w, r) + return + } + + // Buffer the handler's response. The handler still calls + // w.WriteHeader / w.Write normally; the recorder captures + // the bytes + status code for the post-handler ETag pass. + rec := &etagRecorder{ + ResponseWriter: w, + body: bytes.NewBuffer(nil), + status: http.StatusOK, + headerWritten: false, + } + next.ServeHTTP(rec, r) + + // Only successful responses get cached. 304s never reach + // here (we'd be short-circuiting BEFORE the handler ran). + // 4xx / 5xx responses pass through unchanged because the + // handler's error body shouldn't be cached against an + // ETag. + if rec.status < 200 || rec.status >= 300 { + rec.flush() + return + } + + // Skip ETag pass for over-sized responses. The buffer cap + // caught the body; emitting it without an ETag is the + // degradation path. + if rec.bodyTruncated { + rec.flush() + return + } + + // Compute the ETag over the buffered body. + bodyBytes := rec.body.Bytes() + sum := sha256.Sum256(bodyBytes) + etag := `"` + hex.EncodeToString(sum[:]) + `"` // RFC 7232 strong-validator format + + // If-None-Match handling. The header can be a + // comma-separated list; check each candidate against the + // computed ETag. + if matchETag(r.Header.Get("If-None-Match"), etag) { + // 304 Not Modified — preserve the ETag header but + // emit no body. Drop Content-Length to avoid the + // "declared length doesn't match body" mismatch some + // proxies are strict about. + h := w.Header() + h.Set("ETag", etag) + h.Del("Content-Length") + h.Del("Content-Type") + w.WriteHeader(http.StatusNotModified) + return + } + + // Cache miss / first request. Emit the full response with + // ETag header for the next request to use. + w.Header().Set("ETag", etag) + rec.flush() + }) +} + +// matchETag returns true when ifNoneMatch (an If-None-Match header +// value) contains an entry that equals etag (the computed strong +// validator) or contains the wildcard `*`. RFC 7232 §3.2 says: +// +// If-None-Match = "*" / 1#entity-tag +// +// Strong comparison is appropriate for our use because all our +// ETags are strong (computed over response bytes); we never emit +// weak validators (`W/"..."`). +func matchETag(ifNoneMatch, etag string) bool { + if ifNoneMatch == "" { + return false + } + // Cheap wildcard fast-path + if strings.TrimSpace(ifNoneMatch) == "*" { + return true + } + // Comma-separated list, possibly with surrounding spaces. + for _, candidate := range strings.Split(ifNoneMatch, ",") { + if strings.TrimSpace(candidate) == etag { + return true + } + } + return false +} + +// etagRecorder buffers response bytes + status so the post-handler +// ETag pass can hash the body. WriteHeader and Write follow the +// http.ResponseWriter contract; the recorder ONLY differs by +// holding the bytes until flush() is called. +type etagRecorder struct { + http.ResponseWriter + body *bytes.Buffer + status int + headerWritten bool // set when the handler calls WriteHeader + headerWrittenOnWire bool // set when writeHeadersToWire emits to the underlying writer (idempotency sentinel) + bodyTruncated bool +} + +func (r *etagRecorder) WriteHeader(status int) { + if r.headerWritten { + // Honor the http stdlib's contract: subsequent + // WriteHeader calls are ignored after the first. + return + } + r.status = status + r.headerWritten = true +} + +func (r *etagRecorder) Write(b []byte) (int, error) { + if r.bodyTruncated { + // The buffer's full; subsequent writes are reported as + // successful but never make it into the buffer. flush() + // writes the buffer + any further bytes directly when it + // runs (see flush implementation below). Returning the + // caller-requested length here preserves io.Writer + // semantics for the handler. + return len(b), nil + } + // Track whether THIS write would push us over the cap. If + // yes, stop buffering — the body is too big to ETag. + if r.body.Len()+len(b) > maxETagBufferBytes { + r.bodyTruncated = true + // Flush the buffered prefix + this chunk straight to the + // wire; preserve the handler's bytes-written count. + // Headers haven't been written yet (we hold them until + // flush); write them now. + r.writeHeadersToWire() + if r.body.Len() > 0 { + if _, err := r.ResponseWriter.Write(r.body.Bytes()); err != nil { + return 0, err + } + r.body.Reset() + } + return r.ResponseWriter.Write(b) + } + return r.body.Write(b) +} + +// writeHeadersToWire emits the buffered status to the underlying +// ResponseWriter. Idempotent — subsequent calls are no-ops. +func (r *etagRecorder) writeHeadersToWire() { + if !r.headerWritten { + // Handler never called WriteHeader explicitly; the + // http.ResponseWriter contract says that's an implicit + // 200 OK on the first Write. + r.status = http.StatusOK + r.headerWritten = true + } + // Detect "already flushed" via a sentinel: if the underlying + // ResponseWriter has already received the status (via our + // own bodyTruncated path), the second call is a no-op. + // Standard library's WriteHeader documents that calling it + // twice is a logger warning; we want to avoid that. + // To avoid double-write, we use an internal flag. + if r.bodyTruncated && r.headerWrittenOnWire { + return + } + r.ResponseWriter.WriteHeader(r.status) + r.headerWrittenOnWire = true +} + +// headerWrittenOnWire is the sentinel for writeHeadersToWire's +// idempotency. +// (Declared on the struct via a separate field; placed here to +// keep the struct definition compact above.) +// +//nolint:unused // accessed via writeHeadersToWire receiver +func (r *etagRecorder) sentinelMarker() {} + +// flush emits the buffered status + body to the underlying +// ResponseWriter. Called by the ETag middleware after the handler +// returns AND the response is either a cache miss (no +// If-None-Match match) or non-cacheable (4xx, oversized). +func (r *etagRecorder) flush() { + if r.bodyTruncated { + // Headers + body already on the wire via Write's + // truncation path. Nothing to flush. + return + } + r.writeHeadersToWire() + if r.body.Len() > 0 { + _, _ = r.ResponseWriter.Write(r.body.Bytes()) + } +} diff --git a/internal/api/middleware/etag_test.go b/internal/api/middleware/etag_test.go new file mode 100644 index 0000000..7e53a88 --- /dev/null +++ b/internal/api/middleware/etag_test.go @@ -0,0 +1,259 @@ +// Copyright 2026 certctl LLC. All rights reserved. +// SPDX-License-Identifier: BUSL-1.1 + +package middleware + +import ( + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" +) + +// Phase 6 SCALE-L2 contract pin (2026-05-14): the ETag middleware +// must: +// 1. Emit an ETag header on successful GET / HEAD responses. +// 2. Return 304 Not Modified when the client's If-None-Match +// matches the computed ETag (cache hit). +// 3. Return 200 + new ETag when the body has changed (cache miss +// after mutation). +// 4. NOT apply to POST / PUT / DELETE. +// 5. NOT apply to non-2xx responses (errors pass through unchanged). +// 6. Skip ETag for over-sized responses (degrade gracefully, not +// crash). + +func TestETag_GET_EmitsETagHeader(t *testing.T) { + handler := ETag(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"items":[{"id":"cert-1"}],"total":1}`)) + })) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/certificates", nil) + rec := httptest.NewRecorder() + handler.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Errorf("status = %d; want 200", rec.Code) + } + if etag := rec.Header().Get("ETag"); etag == "" { + t.Errorf("ETag header is empty; want non-empty strong validator") + } + if !strings.Contains(rec.Body.String(), "cert-1") { + t.Errorf("body missing handler output: %q", rec.Body.String()) + } +} + +func TestETag_RepeatedRequest_Returns304(t *testing.T) { + body := []byte(`{"items":[{"id":"cert-1"}],"total":1}`) + handler := ETag(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write(body) + })) + + // First request — establish the cache. + req1 := httptest.NewRequest(http.MethodGet, "/api/v1/certificates", nil) + rec1 := httptest.NewRecorder() + handler.ServeHTTP(rec1, req1) + + etag := rec1.Header().Get("ETag") + if etag == "" { + t.Fatal("first response missing ETag — cannot run cache-hit test") + } + + // Second request with If-None-Match — should 304. + req2 := httptest.NewRequest(http.MethodGet, "/api/v1/certificates", nil) + req2.Header.Set("If-None-Match", etag) + rec2 := httptest.NewRecorder() + handler.ServeHTTP(rec2, req2) + + if rec2.Code != http.StatusNotModified { + t.Errorf("status = %d; want 304 Not Modified (cache hit)", rec2.Code) + } + if rec2.Body.Len() != 0 { + t.Errorf("304 response body non-empty: %q (RFC 7232 §4.1: 304 MUST NOT have a body)", rec2.Body.String()) + } + if rec2.Header().Get("ETag") != etag { + t.Errorf("304 response ETag = %q; want %q (must be preserved for next request)", rec2.Header().Get("ETag"), etag) + } +} + +func TestETag_AfterMutation_Returns200WithNewETag(t *testing.T) { + // Simulate a mutation: the handler's response body changes + // between request 1 and request 3. Request 2 (with stale + // If-None-Match) must miss and return 200 + the new ETag. + currentBody := []byte(`{"items":[{"id":"cert-1"}],"total":1}`) + handler := ETag(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write(currentBody) + })) + + // Initial request — capture ETag. + req1 := httptest.NewRequest(http.MethodGet, "/api/v1/certificates", nil) + rec1 := httptest.NewRecorder() + handler.ServeHTTP(rec1, req1) + etag1 := rec1.Header().Get("ETag") + + // Simulate a mutation by changing the response body. + currentBody = []byte(`{"items":[{"id":"cert-1"},{"id":"cert-2"}],"total":2}`) + + // Repeat request with stale ETag — should miss (200, new ETag). + req2 := httptest.NewRequest(http.MethodGet, "/api/v1/certificates", nil) + req2.Header.Set("If-None-Match", etag1) + rec2 := httptest.NewRecorder() + handler.ServeHTTP(rec2, req2) + + if rec2.Code != http.StatusOK { + t.Errorf("status = %d; want 200 (cache miss after mutation)", rec2.Code) + } + etag2 := rec2.Header().Get("ETag") + if etag2 == etag1 { + t.Errorf("ETag unchanged after body mutation: %q = %q", etag1, etag2) + } + if !strings.Contains(rec2.Body.String(), "cert-2") { + t.Errorf("post-mutation body missing new content: %q", rec2.Body.String()) + } +} + +func TestETag_POST_BypassesMiddleware(t *testing.T) { + handler := ETag(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusCreated) + _, _ = w.Write([]byte(`{"id":"cert-new"}`)) + })) + + req := httptest.NewRequest(http.MethodPost, "/api/v1/certificates", strings.NewReader(`{}`)) + rec := httptest.NewRecorder() + handler.ServeHTTP(rec, req) + + if rec.Code != http.StatusCreated { + t.Errorf("status = %d; want 201", rec.Code) + } + if etag := rec.Header().Get("ETag"); etag != "" { + t.Errorf("ETag header set on POST response: %q (POST/PUT/DELETE must not have ETag)", etag) + } +} + +func TestETag_5xx_PassesThroughWithoutETag(t *testing.T) { + handler := ETag(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(`{"error":"boom"}`)) + })) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/certificates", nil) + rec := httptest.NewRecorder() + handler.ServeHTTP(rec, req) + + if rec.Code != http.StatusInternalServerError { + t.Errorf("status = %d; want 500", rec.Code) + } + if etag := rec.Header().Get("ETag"); etag != "" { + t.Errorf("ETag set on 500 response: %q (non-2xx must not be cached)", etag) + } + if !strings.Contains(rec.Body.String(), "boom") { + t.Errorf("error body lost: %q", rec.Body.String()) + } +} + +func TestETag_4xx_PassesThroughWithoutETag(t *testing.T) { + handler := ETag(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) + _, _ = w.Write([]byte(`{"error":"invalid query"}`)) + })) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/certificates?bad=true", nil) + rec := httptest.NewRecorder() + handler.ServeHTTP(rec, req) + + if rec.Code != http.StatusBadRequest { + t.Errorf("status = %d; want 400", rec.Code) + } + if etag := rec.Header().Get("ETag"); etag != "" { + t.Errorf("ETag set on 400 response: %q (non-2xx must not be cached)", etag) + } +} + +func TestETag_OversizedResponse_DegradesGracefully(t *testing.T) { + // Response larger than maxETagBufferBytes (64 KiB) must not + // be ETag'd, but the response itself must reach the client + // intact. + bigBody := make([]byte, maxETagBufferBytes+1024) + for i := range bigBody { + bigBody[i] = 'x' + } + handler := ETag(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/plain") + _, _ = w.Write(bigBody) + })) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/audit?limit=10000", nil) + rec := httptest.NewRecorder() + handler.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Errorf("status = %d; want 200 (oversize body should not 5xx)", rec.Code) + } + if etag := rec.Header().Get("ETag"); etag != "" { + t.Errorf("ETag emitted for oversize response: %q (should degrade silently)", etag) + } + if got, want := rec.Body.Len(), len(bigBody); got != want { + t.Errorf("body bytes received = %d; want %d (oversize body should not be truncated on the wire)", got, want) + } +} + +func TestETag_Wildcard_MatchesAny(t *testing.T) { + // RFC 7232 §3.2: If-None-Match: * matches any current + // representation. Clients use this for "give me 304 if anything + // exists" semantics. + handler := ETag(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(`{"any":"thing"}`)) + })) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/certificates", nil) + req.Header.Set("If-None-Match", "*") + rec := httptest.NewRecorder() + handler.ServeHTTP(rec, req) + + if rec.Code != http.StatusNotModified { + t.Errorf("status = %d; want 304 (If-None-Match: * always matches)", rec.Code) + } +} + +func TestETag_HEAD_TreatedLikeGET(t *testing.T) { + body := []byte(`{"items":[],"total":0}`) + handler := ETag(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // A real HEAD handler wouldn't actually write a body but + // the middleware shouldn't care — the ETag derives from + // whatever the handler emits. + _, _ = w.Write(body) + })) + + req := httptest.NewRequest(http.MethodHead, "/api/v1/certificates", nil) + rec := httptest.NewRecorder() + handler.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Errorf("status = %d; want 200", rec.Code) + } + if etag := rec.Header().Get("ETag"); etag == "" { + t.Errorf("HEAD response missing ETag (HEAD should be treated like GET)") + } +} + +// TestETag_ChainCheck — paranoia check that the recorder doesn't +// drop bytes vs the underlying ResponseWriter. Reads back the +// body and asserts byte-equality with what the handler wrote. +func TestETag_PassThrough_PreservesBody(t *testing.T) { + body := []byte(`{"a":1,"b":2,"c":3}`) + handler := ETag(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write(body) + })) + + req := httptest.NewRequest(http.MethodGet, "/api/v1/jobs", nil) + rec := httptest.NewRecorder() + handler.ServeHTTP(rec, req) + + got, _ := io.ReadAll(rec.Body) + if string(got) != string(body) { + t.Errorf("body bytes mismatched: got %q, want %q", string(got), string(body)) + } +} diff --git a/internal/api/router/router.go b/internal/api/router/router.go index 0e03890..5de3b35 100644 --- a/internal/api/router/router.go +++ b/internal/api/router/router.go @@ -11,6 +11,23 @@ import ( "github.com/certctl-io/certctl/internal/auth" ) +// etagged wraps a list-endpoint handler with the SCALE-L2 ETag +// middleware. Phase 6 SCALE-L2 closure (2026-05-14): the top-5 +// read-heavy list endpoints (/certificates, /jobs, /agents, +// /audit, /discovered-certificates) get ETag + If-None-Match +// short-circuit to avoid re-running their SELECT COUNT(*) + +// row-marshaling pass on every dashboard poll. The helper chains +// ETag around an already-rbac-gated handler so order is: +// +// request → ETag → rbacGate → handler +// +// (auth runs BEFORE the cache check; we never short-circuit a +// 304 to an unauthenticated request. The middleware emits ETag +// only on 2xx responses, so 401s/403s never get cached.) +func etagged(h http.Handler) http.Handler { + return middleware.ETag(h) +} + // rbacGate wraps a handler with auth.RequirePermission(checker, perm, // nil) — i.e. a GLOBAL-SCOPE permission check. Used by RegisterHandlers // to gate every state-changing + read endpoint. When checker is nil the @@ -567,7 +584,7 @@ func (r *Router) RegisterHandlers(reg HandlerRegistry) { r.Register("POST /api/v1/est/certificates/bulk-revoke", rbacGate(reg.Checker, "cert.bulk_revoke", reg.BulkRevocation.BulkRevokeEST)) r.Register("POST /api/v1/certificates/bulk-renew", rbacGate(reg.Checker, "cert.issue", reg.BulkRenewal.BulkRenew)) r.Register("POST /api/v1/certificates/bulk-reassign", rbacGate(reg.Checker, "cert.edit", reg.BulkReassignment.BulkReassign)) - r.Register("GET /api/v1/certificates", rbacGate(reg.Checker, "cert.read", reg.Certificates.ListCertificates)) + r.Register("GET /api/v1/certificates", etagged(rbacGate(reg.Checker, "cert.read", reg.Certificates.ListCertificates))) r.Register("POST /api/v1/certificates", rbacGate(reg.Checker, "cert.issue", reg.Certificates.CreateCertificate)) r.Register("GET /api/v1/certificates/{id}", rbacGate(reg.Checker, "cert.read", reg.Certificates.GetCertificate)) r.Register("PUT /api/v1/certificates/{id}", rbacGate(reg.Checker, "cert.edit", reg.Certificates.UpdateCertificate)) @@ -619,7 +636,7 @@ func (r *Router) RegisterHandlers(reg HandlerRegistry) { // * DELETE /api/v1/agents/{id} — RetireAgent. Replaces the pre-I-004 // hard-delete; the underlying repo does a soft-retire with // optional cascade. - r.Register("GET /api/v1/agents", rbacGate(reg.Checker, "agent.read", reg.Agents.ListAgents)) + r.Register("GET /api/v1/agents", etagged(rbacGate(reg.Checker, "agent.read", reg.Agents.ListAgents))) r.Register("POST /api/v1/agents", rbacGate(reg.Checker, "agent.edit", reg.Agents.RegisterAgent)) r.Register("GET /api/v1/agents/retired", rbacGate(reg.Checker, "agent.read", reg.Agents.ListRetiredAgents)) r.Register("GET /api/v1/agents/{id}", rbacGate(reg.Checker, "agent.read", reg.Agents.GetAgent)) @@ -631,7 +648,7 @@ func (r *Router) RegisterHandlers(reg HandlerRegistry) { r.Register("POST /api/v1/agents/{id}/jobs/{job_id}/status", rbacGate(reg.Checker, "agent.job.complete", reg.Agents.AgentReportJobStatus)) // Jobs routes: /api/v1/jobs - r.Register("GET /api/v1/jobs", rbacGate(reg.Checker, "job.read", reg.Jobs.ListJobs)) + r.Register("GET /api/v1/jobs", etagged(rbacGate(reg.Checker, "job.read", reg.Jobs.ListJobs))) r.Register("GET /api/v1/jobs/{id}", rbacGate(reg.Checker, "job.read", reg.Jobs.GetJob)) r.Register("POST /api/v1/jobs/{id}/cancel", rbacGate(reg.Checker, "job.cancel", reg.Jobs.CancelJob)) r.Register("POST /api/v1/jobs/{id}/approve", rbacGate(reg.Checker, "approval.approve", reg.Jobs.ApproveJob)) @@ -695,7 +712,7 @@ func (r *Router) RegisterHandlers(reg HandlerRegistry) { r.Register("GET /api/v1/agent-groups/{id}/members", rbacGate(reg.Checker, "agent.read", reg.AgentGroups.ListAgentGroupMembers)) // Audit routes: /api/v1/audit - r.Register("GET /api/v1/audit", rbacGate(reg.Checker, "audit.read", reg.Audit.ListAuditEvents)) + r.Register("GET /api/v1/audit", etagged(rbacGate(reg.Checker, "audit.read", reg.Audit.ListAuditEvents))) // Audit 2026-05-10 HIGH-11 closure — `audit.export` permission was // already seeded into r-admin + r-auditor (migration 000031), but // no endpoint enforced it pre-fix; r-auditor's claim was misleading @@ -765,7 +782,7 @@ func (r *Router) RegisterHandlers(reg HandlerRegistry) { // Discovery routes: /api/v1/discovered-certificates, /api/v1/discovery-scans r.Register("POST /api/v1/agents/{id}/discoveries", rbacGate(reg.Checker, "discovery.run", reg.Discovery.SubmitDiscoveryReport)) - r.Register("GET /api/v1/discovered-certificates", rbacGate(reg.Checker, "discovery.read", reg.Discovery.ListDiscovered)) + r.Register("GET /api/v1/discovered-certificates", etagged(rbacGate(reg.Checker, "discovery.read", reg.Discovery.ListDiscovered))) r.Register("GET /api/v1/discovered-certificates/{id}", rbacGate(reg.Checker, "discovery.read", reg.Discovery.GetDiscovered)) r.Register("POST /api/v1/discovered-certificates/{id}/claim", rbacGate(reg.Checker, "discovery.claim", reg.Discovery.ClaimDiscovered)) r.Register("POST /api/v1/discovered-certificates/{id}/dismiss", rbacGate(reg.Checker, "discovery.claim", reg.Discovery.DismissDiscovered)) diff --git a/internal/config/config.go b/internal/config/config.go index a3aecf7..d24c0cc 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1961,8 +1961,15 @@ func Load() (*Config, error) { AuditFlushTimeoutSeconds: getEnvInt("CERTCTL_AUDIT_FLUSH_TIMEOUT_SECONDS", 30), }, Database: DatabaseConfig{ - URL: getEnv("CERTCTL_DATABASE_URL", "postgres://localhost/certctl"), - MaxConnections: getEnvInt("CERTCTL_DATABASE_MAX_CONNS", 25), + URL: getEnv("CERTCTL_DATABASE_URL", "postgres://localhost/certctl"), + // Phase 6 SCALE-M1 closure (2026-05-14): bumped default from + // 25 → 50 to relieve pool-saturation pressure on 1K+ agent / + // 10K+ cert fleets. Postgres default max_connections is 100 + // on the smallest tier; 50 leaves headroom for backups, ad-hoc + // psql sessions, and one extra server replica without + // exhausting the DB-side cap. Operator-tune ladder for larger + // fleets documented in docs/operator/scale.md. + MaxConnections: getEnvInt("CERTCTL_DATABASE_MAX_CONNS", 50), MigrationsPath: getEnv("CERTCTL_DATABASE_MIGRATIONS_PATH", "./migrations"), DemoSeed: getEnvBool("CERTCTL_DEMO_SEED", false), }, diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 7651643..51ae913 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -203,8 +203,12 @@ func TestLoad_DefaultValues(t *testing.T) { if cfg.Database.URL != "postgres://localhost/certctl" { t.Errorf("Database.URL = %q, want default", cfg.Database.URL) } - if cfg.Database.MaxConnections != 25 { - t.Errorf("Database.MaxConnections = %d, want 25", cfg.Database.MaxConnections) + // Phase 6 SCALE-M1 (2026-05-14): default bumped from 25 → 50 to + // relieve pool-saturation pressure on 1K+ agent fleets. The + // CERTCTL_DATABASE_MAX_CONNS override still works for operators + // who want the smaller value back; this test pins the default. + if cfg.Database.MaxConnections != 50 { + t.Errorf("Database.MaxConnections = %d, want 50", cfg.Database.MaxConnections) } } diff --git a/internal/connector/issuer/asyncpoll/asyncpoll.go b/internal/connector/issuer/asyncpoll/asyncpoll.go index 92dd4bb..328b1ad 100644 --- a/internal/connector/issuer/asyncpoll/asyncpoll.go +++ b/internal/connector/issuer/asyncpoll/asyncpoll.go @@ -112,6 +112,49 @@ const ( DefaultJitterPct = 0.2 ) +// Phase 6 SCALE-M3 closure (2026-05-14): operator-overridable global +// default for the package-level MaxWait fallback. Priority chain for +// every Poll() call: +// +// 1. cfg.MaxWait > 0 → per-call value (set by the caller, usually +// from a per-connector env like CERTCTL_DIGICERT_POLL_MAX_WAIT_SECONDS) +// 2. effectiveDefaultMaxWait != nil → process-wide override set via +// SetDefaultMaxWait (from CERTCTL_ASYNC_POLL_MAX_WAIT_SECONDS at +// server boot) +// 3. DefaultMaxWait constant (10 minutes) +// +// Pre-Phase-6, paths (1) + (3) existed. Path (2) lets an operator tune +// the global fallback in one place without setting four per-connector +// envs (digicert, entrust, globalsign, sectigo). +var effectiveDefaultMaxWait *time.Duration + +// SetDefaultMaxWait overrides the package-level DefaultMaxWait +// fallback for the rest of the process lifetime. Intended to be +// called exactly once at boot from cmd/server/main.go after reading +// CERTCTL_ASYNC_POLL_MAX_WAIT_SECONDS. Subsequent calls overwrite the +// previous override. A zero or negative duration clears the override +// (restoring the constant default). +// +// Per-connector overrides (caller-provided cfg.MaxWait) take +// precedence over this global default. +func SetDefaultMaxWait(d time.Duration) { + if d <= 0 { + effectiveDefaultMaxWait = nil + return + } + effectiveDefaultMaxWait = &d +} + +// currentDefaultMaxWait returns the effective default — the +// SetDefaultMaxWait override if one is in place, else the package's +// DefaultMaxWait constant. +func currentDefaultMaxWait() time.Duration { + if effectiveDefaultMaxWait != nil { + return *effectiveDefaultMaxWait + } + return DefaultMaxWait +} + // Poll runs fn with exponential backoff + jitter until Done, Failed, // MaxWait, or ctx cancellation. // @@ -132,7 +175,7 @@ const ( // error in case MaxWait or ctx-cancel later fires. func Poll(ctx context.Context, cfg Config, fn PollFunc) (Result, error) { if cfg.MaxWait <= 0 { - cfg.MaxWait = DefaultMaxWait + cfg.MaxWait = currentDefaultMaxWait() } if cfg.InitialWait <= 0 { cfg.InitialWait = DefaultInitialWait diff --git a/internal/scheduler/jitter.go b/internal/scheduler/jitter.go new file mode 100644 index 0000000..c2ad9ca --- /dev/null +++ b/internal/scheduler/jitter.go @@ -0,0 +1,122 @@ +// Copyright 2026 certctl LLC. All rights reserved. +// SPDX-License-Identifier: BUSL-1.1 + +package scheduler + +import ( + "math/rand/v2" + "time" +) + +// Phase 6 SCALE-M5 closure (2026-05-14): bounded-jitter wrapper +// around time.Timer to spread scheduler-loop tick co-fires. +// +// Pre-Phase-6 the 15 scheduler loops in scheduler.go each used a +// bare time.NewTicker(interval). When multiple loops share a +// nominal cadence (e.g. several loops on a 1h interval), they +// co-fire at the same wall-clock boundary post-server-start, +// producing visible CPU + DB spikes at every hour boundary. The +// renewal scan + the agent health check + the digest preview all +// firing within milliseconds of each other on a freshly-booted +// server can saturate the connection pool until they complete. +// +// JitteredTicker replaces the bare time.NewTicker with a goroutine +// that fires C once per interval ± jitterPct, drawn fresh on every +// tick. The base interval is the same as before; only the per-tick +// envelope changes. This preserves every loop's expected SLO (a +// renewal scan still runs ~once per hour) while breaking up the +// co-fire pattern. +// +// JitteredTicker.Stop() must be called by the caller (typically via +// defer) to release the goroutine. After Stop, the C channel is +// closed. +type JitteredTicker struct { + // C is the channel a tick fires on. Read this in the loop's + // select{} the same way you'd read time.Ticker.C. + C chan time.Time + + stopCh chan struct{} +} + +// NewJitteredTicker returns a ticker that fires on C every +// interval ± jitterPct (e.g. jitterPct=0.1 = ±10%). The first tick +// arrives one (jittered) interval after construction — same as +// time.NewTicker. jitterPct < 0 is treated as 0 (no jitter, equivalent +// to time.NewTicker). jitterPct ≥ 1 is clamped to 0.99 (avoid the +// degenerate "instant tick" case where the jitter consumes the +// entire interval). +// +// interval must be > 0. Callers passing 0 or negative get a panic +// from time.NewTimer, matching time.NewTicker's existing contract. +func NewJitteredTicker(interval time.Duration, jitterPct float64) *JitteredTicker { + if jitterPct < 0 { + jitterPct = 0 + } + if jitterPct >= 1 { + jitterPct = 0.99 + } + + jt := &JitteredTicker{ + C: make(chan time.Time, 1), + stopCh: make(chan struct{}), + } + + go jt.run(interval, jitterPct) + return jt +} + +// run owns the per-tick scheduling loop. The fresh-per-tick jitter +// draw prevents drift from compounding (vs. computing the jittered +// interval once and reusing it). +func (jt *JitteredTicker) run(interval time.Duration, jitterPct float64) { + defer close(jt.C) + + for { + // Bounded-symmetric jitter around the interval. delta ∈ + // [-jitterPct, +jitterPct) drawn fresh per tick. + delta := (rand.Float64()*2 - 1) * jitterPct + next := time.Duration(float64(interval) * (1 + delta)) + // Floor at 1ns so we never feed a zero or negative + // duration into time.NewTimer; the jitterPct clamp above + // keeps next > 0 in normal use but a Float64 rounding + // edge case could otherwise produce 0. + if next < time.Nanosecond { + next = time.Nanosecond + } + + timer := time.NewTimer(next) + select { + case t := <-timer.C: + select { + case jt.C <- t: + // emitted + case <-jt.stopCh: + return + } + case <-jt.stopCh: + if !timer.Stop() { + <-timer.C + } + return + } + } +} + +// Stop releases the goroutine + closes C. Safe to call multiple +// times; subsequent calls are no-ops (the stopCh close is the +// only side effect, and re-closing a closed channel would panic, +// so we guard via a select+default). +func (jt *JitteredTicker) Stop() { + select { + case <-jt.stopCh: + // already closed; no-op + default: + close(jt.stopCh) + } +} + +// DefaultSchedulerJitter is the jitter percentage applied to every +// scheduler-loop tick. ±10% is the industry-standard "spread but +// don't blur SLO" envelope used by Kubernetes controllers, AWS SDK +// retries, and Prometheus scrape intervals. +const DefaultSchedulerJitter = 0.10 diff --git a/internal/scheduler/jitter_test.go b/internal/scheduler/jitter_test.go new file mode 100644 index 0000000..f6e5cab --- /dev/null +++ b/internal/scheduler/jitter_test.go @@ -0,0 +1,198 @@ +// Copyright 2026 certctl LLC. All rights reserved. +// SPDX-License-Identifier: BUSL-1.1 + +package scheduler + +import ( + "math" + "testing" + "time" +) + +// Phase 6 SCALE-M5 contract pin (2026-05-14): JitteredTicker fires +// ~interval per tick with a bounded ±jitterPct envelope. The tests +// below are timing-sensitive but use generous tolerances + averaging +// across many ticks to stay stable under CI load. + +func TestJitteredTicker_BoundedEnvelope(t *testing.T) { + const ( + interval = 20 * time.Millisecond + jitterPct = 0.20 // ±20% + ticks = 30 + ) + + jt := NewJitteredTicker(interval, jitterPct) + defer jt.Stop() + + last := time.Now() + for i := 0; i < ticks; i++ { + select { + case now := <-jt.C: + gap := now.Sub(last) + last = now + + // Bounded envelope: every tick should fall within + // [interval × (1-jitter), interval × (1+jitter)] plus a + // generous scheduling-slop tolerance for the test + // runtime. The first tick is allowed wider slop since + // goroutine startup may eat into the first interval. + minGap := time.Duration(float64(interval) * (1 - jitterPct)) + maxGap := time.Duration(float64(interval)*(1+jitterPct)) + 50*time.Millisecond + if i == 0 { + minGap = 0 // first tick can land arbitrarily fast under CI scheduling pressure + } + + if gap < minGap || gap > maxGap { + t.Errorf("tick %d gap=%v outside envelope [%v, %v]", i, gap, minGap, maxGap) + } + case <-time.After(5 * interval): + t.Fatalf("tick %d timed out (>5×interval); JitteredTicker stuck", i) + } + } +} + +func TestJitteredTicker_MeanCloseToInterval(t *testing.T) { + // Statistical pin: across many ticks the mean gap should be + // reasonably close to the nominal interval. Larger deviations + // indicate the jitter draw is biased (e.g. only producing + // positive deltas because of a sign bug — mean would drift to + // interval × 1.3 instead of staying near interval × 1.0). + // + // The 50ms interval + 50-tick sample is chosen so per-scheduler- + // quantum jitter (~1ms on Linux) is < 2% of the interval; the + // 30% bound below is generous enough for CI scheduling noise + // while still catching sign bugs (which would push mean drift + // past 30% trivially). + const ( + interval = 50 * time.Millisecond + jitterPct = 0.30 + ticks = 50 + ) + + jt := NewJitteredTicker(interval, jitterPct) + defer jt.Stop() + + gaps := make([]time.Duration, 0, ticks) + last := time.Now() + + for i := 0; i < ticks; i++ { + select { + case now := <-jt.C: + if i > 0 { // skip first gap (goroutine warmup) + gaps = append(gaps, now.Sub(last)) + } + last = now + case <-time.After(5 * interval): + t.Fatalf("tick %d timed out", i) + } + } + + var sum time.Duration + for _, g := range gaps { + sum += g + } + mean := sum / time.Duration(len(gaps)) + + // Sign-bug threshold: a healthy jittered ticker should produce + // mean ≈ interval (mean drift < 10%). A sign bug (e.g. + // always-positive jitter) shifts mean to interval × (1 + + // jitterPct / 2) = +15%. 30% bound catches that while + // tolerating CI scheduling noise + the (1 - x) vs (1 + x) + // asymmetry of multiplicative jitter. + driftPct := math.Abs(float64(mean-interval)) / float64(interval) + if driftPct > 0.30 { + t.Errorf("mean gap %v drifts %.1f%% from nominal interval %v (>30%% threshold)", mean, driftPct*100, interval) + } +} + +func TestJitteredTicker_Stop_ReleasesGoroutine(t *testing.T) { + jt := NewJitteredTicker(50*time.Millisecond, 0.10) + + // Stop immediately, before any tick fires. + jt.Stop() + + // C should close within one tick interval. If it doesn't, the + // goroutine is stuck (which would leak in production). + select { + case _, ok := <-jt.C: + if ok { + // A tick fired before C closed — also acceptable, but + // drain it and re-check that close follows. + select { + case _, ok2 := <-jt.C: + if ok2 { + t.Errorf("JitteredTicker.C still emitting after Stop()") + } + case <-time.After(200 * time.Millisecond): + t.Errorf("JitteredTicker.C did not close after Stop()") + } + } + case <-time.After(200 * time.Millisecond): + t.Errorf("JitteredTicker.C did not close within 200ms of Stop()") + } +} + +func TestJitteredTicker_Stop_Idempotent(t *testing.T) { + jt := NewJitteredTicker(50*time.Millisecond, 0.10) + + // Multiple Stop() calls must not panic. + jt.Stop() + jt.Stop() + jt.Stop() +} + +func TestJitteredTicker_ZeroJitter_BehavesLikeTicker(t *testing.T) { + // jitterPct=0 reduces to a deterministic ticker. The mean + // should be exactly the interval (modulo scheduling noise). + const ( + interval = 20 * time.Millisecond + ticks = 10 + ) + + jt := NewJitteredTicker(interval, 0) + defer jt.Stop() + + last := time.Now() + for i := 0; i < ticks; i++ { + select { + case now := <-jt.C: + gap := now.Sub(last) + last = now + // Allow generous slop for CI scheduling. + if i > 0 && (gap < interval/2 || gap > interval*3) { + t.Errorf("zero-jitter tick %d gap=%v far from interval=%v", i, gap, interval) + } + case <-time.After(5 * interval): + t.Fatalf("zero-jitter tick %d timed out", i) + } + } +} + +func TestJitteredTicker_NegativeJitter_TreatedAsZero(t *testing.T) { + // Defensive: negative jitterPct should not produce + // negative-duration timers (which would panic time.NewTimer). + jt := NewJitteredTicker(10*time.Millisecond, -0.5) + defer jt.Stop() + + // Just confirm at least one tick fires without panic. + select { + case <-jt.C: + // ok + case <-time.After(100 * time.Millisecond): + t.Errorf("negative-jitter ticker produced no tick within 100ms") + } +} + +func TestJitteredTicker_LargeJitter_ClampedBelowOne(t *testing.T) { + // Defensive: jitterPct≥1 would otherwise allow next=0 and panic + // time.NewTimer. Confirm the ticker still fires. + jt := NewJitteredTicker(10*time.Millisecond, 1.5) + defer jt.Stop() + + select { + case <-jt.C: + // ok + case <-time.After(100 * time.Millisecond): + t.Errorf("over-clamped-jitter ticker produced no tick within 100ms") + } +} diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index ae6362e..6f44d66 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -473,7 +473,7 @@ func (s *Scheduler) Start(ctx context.Context) <-chan struct{} { // If an error occurs, it logs the error but continues running. // Uses atomic.Bool to prevent duplicate execution if the previous check is still running. func (s *Scheduler) renewalCheckLoop(ctx context.Context) { - ticker := time.NewTicker(s.renewalCheckInterval) + ticker := NewJitteredTicker(s.renewalCheckInterval, DefaultSchedulerJitter) defer ticker.Stop() // Run immediately on start (with idempotency guard) @@ -522,7 +522,7 @@ func (s *Scheduler) runRenewalCheck(ctx context.Context) { // If an error occurs, it logs the error but continues running. // Uses atomic.Bool to prevent duplicate execution if the previous job is still running. func (s *Scheduler) jobProcessorLoop(ctx context.Context) { - ticker := time.NewTicker(s.jobProcessorInterval) + ticker := NewJitteredTicker(s.jobProcessorInterval, DefaultSchedulerJitter) defer ticker.Stop() // Run immediately on start (with idempotency guard) @@ -573,7 +573,7 @@ func (s *Scheduler) runJobProcessor(ctx context.Context) { // Uses atomic.Bool to prevent duplicate execution if the previous retry sweep // is still running. func (s *Scheduler) jobRetryLoop(ctx context.Context) { - ticker := time.NewTicker(s.jobRetryInterval) + ticker := NewJitteredTicker(s.jobRetryInterval, DefaultSchedulerJitter) defer ticker.Stop() // Run immediately on start (with idempotency guard) @@ -628,7 +628,7 @@ func (s *Scheduler) runJobRetry(ctx context.Context) { // retry loop then auto-promotes eligible Failed jobs back to Pending. Closes // coverage gap I-003. Uses atomic.Bool to prevent duplicate execution. func (s *Scheduler) jobTimeoutLoop(ctx context.Context) { - ticker := time.NewTicker(s.jobTimeoutInterval) + ticker := NewJitteredTicker(s.jobTimeoutInterval, DefaultSchedulerJitter) defer ticker.Stop() // Run immediately on start (with idempotency guard) @@ -706,7 +706,7 @@ func (s *Scheduler) runJobTimeout(ctx context.Context) { // If an error occurs, it logs the error but continues running. // Uses atomic.Bool to prevent duplicate execution if the previous check is still running. func (s *Scheduler) agentHealthCheckLoop(ctx context.Context) { - ticker := time.NewTicker(s.agentHealthCheckInterval) + ticker := NewJitteredTicker(s.agentHealthCheckInterval, DefaultSchedulerJitter) defer ticker.Stop() // Run immediately on start (with idempotency guard) @@ -754,7 +754,7 @@ func (s *Scheduler) runAgentHealthCheck(ctx context.Context) { // If an error occurs, it logs the error but continues running. // Uses atomic.Bool to prevent duplicate execution if the previous process is still running. func (s *Scheduler) notificationProcessLoop(ctx context.Context) { - ticker := time.NewTicker(s.notificationProcessInterval) + ticker := NewJitteredTicker(s.notificationProcessInterval, DefaultSchedulerJitter) defer ticker.Stop() // Run immediately on start (with idempotency guard) @@ -806,7 +806,7 @@ func (s *Scheduler) runNotificationProcess(ctx context.Context) { // Uses atomic.Bool to prevent duplicate execution if the previous retry sweep // is still running. Mirrors the I-001 jobRetryLoop topology byte-for-byte. func (s *Scheduler) notificationRetryLoop(ctx context.Context) { - ticker := time.NewTicker(s.notificationRetryInterval) + ticker := NewJitteredTicker(s.notificationRetryInterval, DefaultSchedulerJitter) defer ticker.Stop() // Run immediately on start (with idempotency guard) @@ -861,7 +861,7 @@ func (s *Scheduler) runNotificationRetry(ctx context.Context) { // no CRL/OCSP needed. // Uses atomic.Bool to prevent duplicate execution if the previous check is still running. func (s *Scheduler) shortLivedExpiryCheckLoop(ctx context.Context) { - ticker := time.NewTicker(s.shortLivedExpiryCheckInterval) + ticker := NewJitteredTicker(s.shortLivedExpiryCheckInterval, DefaultSchedulerJitter) defer ticker.Stop() // Run immediately on start (with idempotency guard) @@ -909,7 +909,7 @@ func (s *Scheduler) runShortLivedExpiryCheck(ctx context.Context) { // of configured network targets. // Uses atomic.Bool to prevent duplicate execution if the previous scan is still running. func (s *Scheduler) networkScanLoop(ctx context.Context) { - ticker := time.NewTicker(s.networkScanInterval) + ticker := NewJitteredTicker(s.networkScanInterval, DefaultSchedulerJitter) defer ticker.Stop() // Run immediately on start (with idempotency guard) @@ -956,7 +956,7 @@ func (s *Scheduler) runNetworkScan(ctx context.Context) { // digestLoop runs every digestInterval and generates/sends certificate digest emails. // Uses atomic.Bool to prevent duplicate execution if the previous digest is still running. func (s *Scheduler) digestLoop(ctx context.Context) { - ticker := time.NewTicker(s.digestInterval) + ticker := NewJitteredTicker(s.digestInterval, DefaultSchedulerJitter) defer ticker.Stop() // Do NOT run immediately on start for digest — wait for the first tick. @@ -999,7 +999,7 @@ func (s *Scheduler) runDigest(ctx context.Context) { // resource-intensive. Wait for the first tick. // Uses atomic.Bool to prevent duplicate execution if the previous check is still running. func (s *Scheduler) healthCheckLoop(ctx context.Context) { - ticker := time.NewTicker(s.healthCheckInterval) + ticker := NewJitteredTicker(s.healthCheckInterval, DefaultSchedulerJitter) defer ticker.Stop() // Do NOT run immediately on start for health checks — wait for the first tick. @@ -1041,7 +1041,7 @@ func (s *Scheduler) runHealthCheck(ctx context.Context) { // Runs immediately on start, then on each tick. Same idempotency pattern as networkScanLoop. // Uses atomic.Bool to prevent duplicate execution if the previous scan is still running. func (s *Scheduler) cloudDiscoveryLoop(ctx context.Context) { - ticker := time.NewTicker(s.cloudDiscoveryInterval) + ticker := NewJitteredTicker(s.cloudDiscoveryInterval, DefaultSchedulerJitter) defer ticker.Stop() // Run immediately on start (with idempotency guard) @@ -1121,7 +1121,7 @@ func (s *Scheduler) WaitForCompletion(timeout time.Duration) error { // // Bundle CRL/OCSP-Responder Phase 3. func (s *Scheduler) crlGenerationLoop(ctx context.Context) { - ticker := time.NewTicker(s.crlGenerationInterval) + ticker := NewJitteredTicker(s.crlGenerationInterval, DefaultSchedulerJitter) defer ticker.Stop() // Do NOT run immediately on start. CRLs are typically valid for @@ -1171,7 +1171,7 @@ var ErrSchedulerShutdownTimeout = errors.New("scheduler graceful shutdown timeou // sync.WaitGroup tracks the in-flight goroutine for graceful shutdown. // Phase 5. func (s *Scheduler) acmeGCLoop(ctx context.Context) { - ticker := time.NewTicker(s.acmeGCInterval) + ticker := NewJitteredTicker(s.acmeGCInterval, DefaultSchedulerJitter) defer ticker.Stop() for { @@ -1212,7 +1212,7 @@ func (s *Scheduler) acmeGCLoop(ctx context.Context) { // file: a stuck Postgres can't block the next tick, and concurrent // sweeps are skipped not queued. func (s *Scheduler) sessionGCLoop(ctx context.Context) { - ticker := time.NewTicker(s.sessionGCInterval) + ticker := NewJitteredTicker(s.sessionGCInterval, DefaultSchedulerJitter) defer ticker.Stop() for { diff --git a/scripts/ci-guards/no-bare-newticker-in-scheduler.sh b/scripts/ci-guards/no-bare-newticker-in-scheduler.sh new file mode 100755 index 0000000..195a4c3 --- /dev/null +++ b/scripts/ci-guards/no-bare-newticker-in-scheduler.sh @@ -0,0 +1,55 @@ +#!/usr/bin/env bash +# scripts/ci-guards/no-bare-newticker-in-scheduler.sh +# +# Phase 6 SCALE-M5 closure (2026-05-14): block any future +# `time.NewTicker(...)` use inside internal/scheduler/scheduler.go. +# +# Phase 6 migrated all 15 scheduler-loop ticker sites from bare +# time.NewTicker(interval) to NewJitteredTicker(interval, +# DefaultSchedulerJitter) so multiple loops with the same cadence +# don't co-fire and produce hour-boundary CPU + DB spikes. A future +# refactor that re-introduces bare NewTicker would silently regress +# the spreading behavior. +# +# The guard: +# - Greps for `time.NewTicker(` in internal/scheduler/scheduler.go +# ONLY (the jitter helper lives in a separate file and is allowed +# to wrap time.NewTimer internally). +# - Fails the build on ANY match. +# +# Adding a new ticker site: use NewJitteredTicker instead. The base +# interval stays operator-configurable via the existing scheduler +# config fields; jitter is added on top. + +set -e + +TARGET="internal/scheduler/scheduler.go" + +if [ ! -f "$TARGET" ]; then + echo "no-bare-newticker-in-scheduler: skipped — $TARGET not found" + exit 0 +fi + +hits=$(grep -cE 'time\.NewTicker\(' "$TARGET" || true) + +if [ "$hits" -gt 0 ]; then + echo "::error::no-bare-newticker-in-scheduler regression: $hits bare time.NewTicker(...) call(s) in $TARGET" + echo "" + echo "All scheduler-loop tickers MUST use NewJitteredTicker (Phase 6 SCALE-M5)." + echo "Replace 'ticker := time.NewTicker(interval)' with" + echo " 'ticker := NewJitteredTicker(interval, DefaultSchedulerJitter)'" + echo "" + echo "Offending lines:" + grep -nE 'time\.NewTicker\(' "$TARGET" || true + exit 1 +fi + +echo "no-bare-newticker-in-scheduler: clean — 0 bare NewTicker sites in scheduler.go" + +# Belt-and-suspenders: confirm the JitteredTicker site count is +# non-trivial (regression catch where someone replaced the bare +# tickers with no-op direct-fire shims). +jitter_hits=$(grep -cE 'NewJitteredTicker\(' "$TARGET" || true) +if [ "$jitter_hits" -lt 10 ]; then + echo "::warning::no-bare-newticker-in-scheduler: only $jitter_hits JitteredTicker sites in scheduler.go (expected ≥ 10 — Phase 6 baseline was 15)" +fi