mirror of
https://github.com/shankar0123/certctl.git
synced 2026-06-07 14:51:30 +00:00
scheduler+db: close Phase 6 — scale hardening across pool, jitter, ETag, asyncpoll
Phase 6 of the certctl architecture diligence remediation. Five
findings across the same scheduler-and-DB-pool surface.
SCALE-M1 (Med) — DB pool default bumped 25 → 50
internal/config/config.go line 1972:
MaxConnections: getEnvInt("CERTCTL_DATABASE_MAX_CONNS", 50)
Postgres default max_connections is 100; 50 leaves headroom for
pg_dump + ad-hoc psql + a server replica without exhausting the
DB-side cap. Operator override env var unchanged. Operator-tune
ladder for larger fleets (5K / 50K certs) lives in
docs/operator/scale.md as starter values pending Phase 8 load
tests — explicitly marked TBD.
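The headroom argument can be made concrete with the floor that docs/operator/scale.md gives for Postgres max_connections, (server pool size × replicas) + 20. The sketch below only restates that arithmetic; minPostgresMaxConnections and the headroom constant are hypothetical names for illustration, not part of certctl.

```go
package main

import "fmt"

// headroom is the fixed allowance for pg_dump, ad-hoc psql sessions and
// similar out-of-band connections (the "+ 20" in docs/operator/scale.md).
const headroom = 20

// minPostgresMaxConnections is a hypothetical helper, not certctl code.
// It returns the suggested floor for Postgres max_connections given the
// server pool size (CERTCTL_DATABASE_MAX_CONNS) and the replica count.
func minPostgresMaxConnections(poolSize, replicas int) int {
	if poolSize < 0 {
		poolSize = 0
	}
	if replicas < 1 {
		replicas = 1 // assumption: a deployment always has at least one server
	}
	return poolSize*replicas + headroom
}

func main() {
	for _, replicas := range []int{1, 2} {
		fmt.Printf("pool=50 replicas=%d -> max_connections >= %d\n",
			replicas, minPostgresMaxConnections(50, replicas))
	}
}
```

Under this formula a single server at the new default fits inside the Postgres default of 100, while two servers at pool size 50 would already call for raising max_connections.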
SCALE-M3 (Med) — async-CA poll budget operator-configurable
This was already partially shipped: all 4 async-CA
connectors (digicert, entrust, globalsign, sectigo) have a
per-connector CERTCTL_<NAME>_POLL_MAX_WAIT_SECONDS (Audit fix #5,
closed pre-Phase-6). What was missing was a global package-default
override. Shipped:
- internal/connector/issuer/asyncpoll/asyncpoll.go gains
SetDefaultMaxWait(d) + effectiveDefaultMaxWait var + the
currentDefaultMaxWait() priority resolver.
- cmd/server/main.go reads CERTCTL_ASYNC_POLL_MAX_WAIT_SECONDS
at boot and calls SetDefaultMaxWait.
- deploy/ENVIRONMENTS.md documents the new env var (G-3 guard
green).
Naming deviation from the prompt's CERTCTL_ASYNC_POLL_MAX_ATTEMPTS:
the live code tracks wall-clock time (MaxWait), not attempt count.
Matched the existing per-connector nomenclature (_POLL_MAX_WAIT_SECONDS)
so the priority chain reads naturally.
SCALE-M5 (Med) — JitteredTicker wrapper for all 15 scheduler loops
internal/scheduler/jitter.go ships NewJitteredTicker(interval,
jitterPct) + DefaultSchedulerJitter (±10%). All 15 sites in
internal/scheduler/scheduler.go migrated from bare time.NewTicker
to NewJitteredTicker(interval, DefaultSchedulerJitter). Base
intervals unchanged; only the per-tick envelope adds ±10%
randomized delay so multiple loops with the same nominal cadence
don't co-fire and spike CPU + DB at wall-clock boundaries.
internal/scheduler/jitter_test.go pins:
- Bounded envelope (each tick within ±jitterPct of interval)
- Mean drift < 30% of nominal (sign-bug detector)
- Stop() releases the goroutine + closes C
- Stop() idempotent (no panic on repeat)
- Zero-jitter behaves like time.NewTicker
- Negative and >=1 jitterPct values clamped defensively
CI guard scripts/ci-guards/no-bare-newticker-in-scheduler.sh blocks
any future bare time.NewTicker in scheduler.go.
SCALE-L1 (Low) — renewal-sweep semaphore behavior documented
docs/operator/scale.md "Scheduler tick budgets" section explains
the per-tick concurrency semaphore (CERTCTL_RENEWAL_CONCURRENCY=25
default), the ctx-cancellation drain on tick-budget overrun, and
operator tuning advice (raise concurrency + DB pool together).
No code change — the behavior is defensible as-is per the audit.
SCALE-L2 (Low) — ETag middleware for top-5 read endpoints
internal/api/middleware/etag.go computes SHA-256 ETag over the
buffered response body, respects If-None-Match, short-circuits
to 304 Not Modified on match. GET/HEAD only; non-2xx responses
pass through unchanged. 64 KiB buffer cap degrades gracefully on
oversized responses (no caching, body still flushes intact).
Wired around the top-5 read endpoints via etagged() helper in
internal/api/router/router.go:
GET /api/v1/certificates
GET /api/v1/agents
GET /api/v1/jobs
GET /api/v1/audit
GET /api/v1/discovered-certificates
internal/api/middleware/etag_test.go pins 11 behaviors including
304-on-repeat, 200-after-mutation-with-new-ETag, POST bypass,
4xx/5xx pass-through, oversized-response degradation, wildcard
match, HEAD-treated-like-GET, byte-equal pass-through.
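One caveat on the If-None-Match handling: RFC 7232 §3.2 specifies the weak comparison function for If-None-Match, under which W/"x" and "x" are equivalent. The matcher in etag.go compares entries byte-for-byte, so a validator presented in weak form would not match. A minimal sketch of a weak-comparison variant follows; matchETagWeak and opaqueTag are hypothetical names and are not part of this commit.

```go
package main

import (
	"fmt"
	"strings"
)

// opaqueTag strips surrounding spaces and an optional weak prefix (W/),
// leaving the quoted opaque-tag. The prefix is case-sensitive per the RFC.
func opaqueTag(tag string) string {
	return strings.TrimPrefix(strings.TrimSpace(tag), "W/")
}

// matchETagWeak is a hypothetical variant of matchETag that applies the
// weak comparison function: two tags match when their opaque-tags are
// equal, whether or not either side carries the W/ prefix.
//
// Splitting on commas is safe here only because the ETags in question are
// hex digests, which never contain a comma.
func matchETagWeak(ifNoneMatch, etag string) bool {
	trimmed := strings.TrimSpace(ifNoneMatch)
	if trimmed == "" {
		return false
	}
	if trimmed == "*" {
		return true
	}
	want := opaqueTag(etag)
	for _, candidate := range strings.Split(ifNoneMatch, ",") {
		if opaqueTag(candidate) == want {
			return true
		}
	}
	return false
}

func main() {
	etag := `"9f86d081"`
	fmt.Println(matchETagWeak(`W/"9f86d081"`, etag))
	fmt.Println(matchETagWeak(`"other", "9f86d081"`, etag))
	fmt.Println(matchETagWeak(`"other"`, etag))
}
```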
Cross-cutting fixes:
- internal/config/config_test.go::TestLoad_DefaultValues updated
to assert the new 50 default (was 25).
- deploy/helm/certctl/values.yaml comment corrected — agent
pollInterval is hardcoded 30s, not env-configurable; the
Phase 4 comment mistakenly referenced CERTCTL_AGENT_POLL_INTERVAL
which G-3 caught as a phantom env var.
- asyncpoll.go reformatted by gofmt; functionally unchanged.
Verification (all pass):
grep -nE 'SetMaxOpenConns' internal/repository/postgres/db.go # finds 1 site
grep -nE 'CERTCTL_DATABASE_MAX_CONNS.*50' internal/config/config.go # config default is 50
grep -rnE 'CERTCTL_ASYNC_POLL_MAX_WAIT_SECONDS' internal/ deploy/ENVIRONMENTS.md # wired
grep -cE 'time\.NewTicker\(' internal/scheduler/scheduler.go # 0 (all migrated)
grep -cE 'JitteredTicker' internal/scheduler/scheduler.go # 15
ls internal/scheduler/jitter.go internal/api/middleware/etag.go # both exist
ls docs/operator/scale.md # exists
bash scripts/ci-guards/no-bare-newticker-in-scheduler.sh # clean
bash scripts/ci-guards/G-3-env-docs-drift.sh # clean
go test ./internal/scheduler/ ./internal/api/middleware/ \
./internal/connector/issuer/asyncpoll/ ./internal/config/ # 4/4 packages green
Closes: cowork/certctl-architecture-diligence-audit.html#fix-SCALE-M1
cowork/certctl-architecture-diligence-audit.html#fix-SCALE-M3
cowork/certctl-architecture-diligence-audit.html#fix-SCALE-M5
cowork/certctl-architecture-diligence-audit.html#fix-SCALE-L1
cowork/certctl-architecture-diligence-audit.html#fix-SCALE-L2
@@ -36,6 +36,7 @@ import (
 	discoveryawssm "github.com/certctl-io/certctl/internal/connector/discovery/awssm"
 	discoveryazurekv "github.com/certctl-io/certctl/internal/connector/discovery/azurekv"
 	discoverygcpsm "github.com/certctl-io/certctl/internal/connector/discovery/gcpsm"
+	"github.com/certctl-io/certctl/internal/connector/issuer/asyncpoll"
 	notifyemail "github.com/certctl-io/certctl/internal/connector/notifier/email"
 	notifyopsgenie "github.com/certctl-io/certctl/internal/connector/notifier/opsgenie"
 	notifypagerduty "github.com/certctl-io/certctl/internal/connector/notifier/pagerduty"
@@ -151,6 +152,19 @@ func main() {
 		logger.Info("agent bootstrap token configured (length redacted; constant-time compare on POST /api/v1/agents)")
 	}
 
+	// Phase 6 SCALE-M3 closure (2026-05-14): operator-overridable
+	// package-level default for the asyncpoll MaxWait fallback.
+	// Per-connector overrides (CERTCTL_DIGICERT_POLL_MAX_WAIT_SECONDS,
+	// CERTCTL_ENTRUST_POLL_MAX_WAIT_SECONDS, etc.) still win when set;
+	// this global env is the middle of the priority chain (above the
+	// 10-minute package default const, below per-connector overrides).
+	// See internal/connector/issuer/asyncpoll/asyncpoll.go for the
+	// SetDefaultMaxWait contract.
+	if v, _ := strconv.Atoi(os.Getenv("CERTCTL_ASYNC_POLL_MAX_WAIT_SECONDS")); v > 0 {
+		asyncpoll.SetDefaultMaxWait(time.Duration(v) * time.Second)
+		logger.Info("asyncpoll default max-wait override", "seconds", v)
+	}
+
 	// Initialize database connection pool.
 	//
 	// Bundle 3 closure (D12): pre-Bundle-3 the operator-facing
@@ -422,6 +422,8 @@ Every `CERTCTL_*` environment variable is read by the server's `internal/config/
 | `CERTCTL_DEMO_MODE_ACK` | `false` | Acknowledges demo-mode synthetic admin posture (required when `CERTCTL_AUTH_TYPE=none` binds to a non-loopback host). Must be paired with `CERTCTL_DEMO_MODE_ACK_TS` per Phase 2 SEC-H3. |
 | `CERTCTL_DEMO_MODE_ACK_TS` | (empty) | Phase 2 SEC-H3: unix-epoch timestamp at which DemoModeAck was last acknowledged. When `CERTCTL_DEMO_MODE_ACK=true`, this must parse as a unix epoch within the last 24h. Set via `CERTCTL_DEMO_MODE_ACK_TS=$(date +%s)` at every `docker compose up`. |
 | `CERTCTL_ACME_INSECURE_ACK` | `false` | Phase 2 SEC-M4: explicit ACK required to boot with `CERTCTL_ACME_INSECURE=true`. Production deploys MUST never set either flag. |
+| `CERTCTL_DATABASE_MAX_CONNS` | `50` | Phase 6 SCALE-M1: max open DB connections in the server's pool. Default was `25` pre-Phase-6. Idle connections = max/5. Operator-tune ladder for larger fleets: ≤500 certs → 50; 5K certs → 100; 50K certs → 200 (also raise Postgres `max_connections`). See `docs/operator/scale.md`. |
+| `CERTCTL_ASYNC_POLL_MAX_WAIT_SECONDS` | (unset → 600) | Phase 6 SCALE-M3: process-wide override for the asyncpoll package's `DefaultMaxWait` (10 minutes). Caps total wall-clock time the certctl-server spends polling an async CA (DigiCert / Entrust / GlobalSign / Sectigo) before returning `StillPending` to the scheduler for re-enqueue. Per-connector overrides (`CERTCTL_DIGICERT_POLL_MAX_WAIT_SECONDS`, etc.) take precedence when set. |
 
 ### Agent
 
@@ -482,8 +482,9 @@ agent:
 #
 # Phase 4 DEPL-M5 (2026-05-14): per-fleet-size tuning ladder for the
 # agent. Defaults are sized for the standard "one cert per host"
-# operating pattern: the agent polls the server every 60s (default
-# CERTCTL_AGENT_POLL_INTERVAL), generates ECDSA P-256 keys locally on
+# operating pattern: the agent polls the server every 30 seconds
+# (hardcoded in cmd/agent/main.go::pollInterval — not yet
+# env-configurable), generates ECDSA P-256 keys locally on
 # issuance/renewal events, and is otherwise idle. CPU is bursty only
 # during keygen + CSR submission.
 #
@@ -0,0 +1,140 @@
|
|||||||
|
# Operator scale guide
|
||||||
|
|
||||||
|
> Last reviewed: 2026-05-14
|
||||||
|
|
||||||
|
Use this when:
|
||||||
|
- You're sizing a new certctl deployment for a target fleet count.
|
||||||
|
- You're scaling an existing deployment up from demo (15 certs / 1
|
||||||
|
agent) to production (1K+ certs / 100+ agents).
|
||||||
|
- An auditor asks "what does this scale to?" and you want a documented
|
||||||
|
answer that isn't "we haven't measured."
|
||||||
|
|
||||||
|
## DB connection pool
|
||||||
|
|
||||||
|
certctl's PostgreSQL connection pool is the single largest scale lever.
|
||||||
|
Pool exhaustion looks like 503s + agent poll timeouts + scheduler
|
||||||
|
falling behind on its loops. The default ships at 50 max open
|
||||||
|
connections (`CERTCTL_DATABASE_MAX_CONNS=50`), with idle = max/5 = 10
|
||||||
|
under the existing `internal/repository/postgres/db.go::NewDBWithMaxConns`
|
||||||
|
contract.
|
||||||
|
|
||||||
|
Operator-tune ladder:
|
||||||
|
|
||||||
|
| Fleet size | `CERTCTL_DATABASE_MAX_CONNS` | Postgres `max_connections` | Notes |
|
||||||
|
|---|---|---|---|
|
||||||
|
| ≤ 500 certs / 100 agents | `50` (default) | `100` (PG default) | Demo + small deployments. Pool default sized for this. |
|
||||||
|
| 5K certs / 1K agents | `100` | `200` | Postgres needs an explicit bump from the 100 default; reload required. |
|
||||||
|
| 50K certs / 10K agents | `200` | `400` | Plus dedicated Postgres VM (separate from server host); shared_buffers ≥ 1Gi. |
|
||||||
|
|
||||||
|
Always leave headroom in Postgres's `max_connections` for backups
(`pg_dump` opens its own connection), ad-hoc psql sessions, and
replicas. A safe floor for Postgres's `max_connections` is
`(server pool size × replicas) + 20`.
|
||||||
|
|
||||||
|
**Numbers above the small-fleet row are operator-tuning starting
|
||||||
|
points, not validated ceilings.** Phase 8 of the architecture diligence
|
||||||
|
remediation will replace these with measured values from synthetic
|
||||||
|
fleets; until then, capture your own observations in a loadtest log
|
||||||
|
and tune against them.
|
||||||
|
|
||||||
|
## Scheduler tick budgets
|
||||||
|
|
||||||
|
certctl has 15 scheduler loops, each with its own cadence
|
||||||
|
(internal/scheduler/scheduler.go). The renewal scan is the hottest
|
||||||
|
loop on large fleets: it pulls every managed certificate, applies
|
||||||
|
each profile's renewal policy, and dispatches an issuance job per
|
||||||
|
cert that meets the threshold. The default cadence is `1h`
|
||||||
|
(`CERTCTL_SCHEDULER_RENEWAL_CHECK_INTERVAL`).
|
||||||
|
|
||||||
|
Phase 6 SCALE-M5 closure (2026-05-14) added per-ticker jitter via the
|
||||||
|
`internal/scheduler.JitteredTicker` wrapper. Each loop's interval is
|
||||||
|
unchanged; the wrapper adds ±10% randomized delay per tick so multiple
|
||||||
|
loops with the same nominal cadence don't co-fire and cause hour-
|
||||||
|
boundary CPU + DB spikes. For most fleets the visible effect is a
|
||||||
|
smoother CPU graph during the renewal scan.
|
||||||
|
|
||||||
|
**Renewal-sweep semaphore (SCALE-L1).** The renewal loop dispatches
|
||||||
|
concurrent issuance work behind a per-tick semaphore (default
|
||||||
|
`CERTCTL_RENEWAL_CONCURRENCY=25`). Under tick-budget pressure (a tick
|
||||||
|
that exceeds the loop interval), the semaphore can hold the entire
|
||||||
|
concurrency cap until the context cancels at next-tick boundary —
|
||||||
|
which is intentional. The drain happens via context cancellation; new
|
||||||
|
work isn't started past the deadline. Tests in
|
||||||
|
`internal/scheduler/` pin this drain behavior. Operators on large
|
||||||
|
fleets should:
|
||||||
|
|
||||||
|
1. Bump `CERTCTL_RENEWAL_CONCURRENCY` to 50 or 100 if the renewal scan
|
||||||
|
consistently exceeds tick budget.
|
||||||
|
2. Also bump `CERTCTL_DATABASE_MAX_CONNS` proportionally — each
|
||||||
|
concurrent renewal task opens its own pool connection during
|
||||||
|
issuance / deployment.
|
||||||
|
3. Watch for the "renewal scan complete" log line per tick. If it's
|
||||||
|
consistently late, you're under-provisioned.
|
||||||
|
|
||||||
|
## Async CA polling budgets (SCALE-M3)
|
||||||
|
|
||||||
|
DigiCert, Entrust, GlobalSign, and Sectigo are async issuers — they
|
||||||
|
accept a CSR, queue it on the CA side, and return a polling token.
|
||||||
|
The certctl server polls the CA's status endpoint until the cert is
|
||||||
|
ready or the deadline expires. The default poll-deadline is 10
|
||||||
|
minutes wall-clock (`asyncpoll.DefaultMaxWait`); after that the
|
||||||
|
issuance returns `StillPending` and the scheduler re-enqueues the
|
||||||
|
job for the next tick.
|
||||||
|
|
||||||
|
Priority chain when picking the actual deadline (highest → lowest):
|
||||||
|
|
||||||
|
1. Per-connector env: `CERTCTL_DIGICERT_POLL_MAX_WAIT_SECONDS`,
|
||||||
|
`CERTCTL_ENTRUST_POLL_MAX_WAIT_SECONDS`,
|
||||||
|
`CERTCTL_GLOBALSIGN_POLL_MAX_WAIT_SECONDS`,
|
||||||
|
`CERTCTL_SECTIGO_POLL_MAX_WAIT_SECONDS`.
|
||||||
|
2. Global env: `CERTCTL_ASYNC_POLL_MAX_WAIT_SECONDS` (sets the
|
||||||
|
process-wide default for all async-CA connectors that didn't set
|
||||||
|
their per-connector value).
|
||||||
|
3. Package const: `asyncpoll.DefaultMaxWait = 10 * time.Minute`.
|
||||||
|
|
||||||
|
Operators with slow async CAs (Entrust certificate-mode in
|
||||||
|
particular can take 15-30 minutes during business hours) should
|
||||||
|
raise the per-connector value rather than the global; that way fast
|
||||||
|
issuers don't pay the polling cost.
|
||||||
|
|
||||||
|
## Cursor pagination caching (SCALE-L2)
|
||||||
|
|
||||||
|
Phase 6 SCALE-L2 closure (2026-05-14) added an ETag middleware at
`internal/api/middleware/etag.go` covering the top-5 read endpoints:
`/api/v1/certificates`, `/api/v1/jobs`, `/api/v1/agents`,
`/api/v1/audit`, `/api/v1/discovery/certificates`. The ETag is a
SHA-256 hash of the buffered response body; repeated requests with
the same query return `304 Not Modified` when the response bytes
haven't changed. The dashboard benefits most: its polling loop on
the certificates page is the single largest read-traffic source on
most deployments.

On a match the server skips sending the body, which saves bandwidth
and client-side parsing. The handler still runs on every request,
including its `SELECT COUNT(*) FROM <table>` query, because the hash
is computed after the handler returns. Any change to the response
body, including any mutation to the underlying rows, produces a new
ETag.
||||||
|
|
||||||
|
Operators don't need to do anything to opt in — the middleware is
|
||||||
|
wired around the top-5 endpoints unconditionally. If you want to
|
||||||
|
verify it's working, check the `ETag:` response header on a list
|
||||||
|
endpoint and repeat the request with the same value in an
|
||||||
|
`If-None-Match:` header — the second request should return 304 with
|
||||||
|
an empty body.
|
||||||
|
|
||||||
|
## Profiling production
|
||||||
|
|
||||||
|
When the above ladder doesn't fit your shape, profile against your
|
||||||
|
specific workload. The
|
||||||
|
[performance-baselines.md](performance-baselines.md) runbook has
|
||||||
|
single-endpoint, inventory-walk, and renewal-scan recipes you can
|
||||||
|
adapt.
|
||||||
|
|
||||||
|
## Related reading
|
||||||
|
|
||||||
|
- [`docs/operator/performance-baselines.md`](performance-baselines.md) —
|
||||||
|
per-endpoint baselines + how to re-baseline after upgrades.
|
||||||
|
- [`docs/operator/runbooks/postgres-backup.md`](runbooks/postgres-backup.md) —
|
||||||
|
Postgres-side backup discipline (necessary precondition for any
|
||||||
|
scale tuning).
|
||||||
|
- [`deploy/ENVIRONMENTS.md`](../../deploy/ENVIRONMENTS.md) — the
|
||||||
|
full env-var inventory the values referenced above come from.
|
||||||
@@ -0,0 +1,270 @@
|
|||||||
|
// Copyright 2026 certctl LLC. All rights reserved.
|
||||||
|
// SPDX-License-Identifier: BUSL-1.1
|
||||||
|
|
||||||
|
package middleware
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/hex"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Phase 6 SCALE-L2 closure (2026-05-14): ETag / If-None-Match
|
||||||
|
// middleware for read-heavy list endpoints.
|
||||||
|
//
|
||||||
|
// Pre-Phase-6 every GET /api/v1/{certificates,jobs,agents,audit,
|
||||||
|
// discovery/certificates} request walked the full pagination path
|
||||||
|
// including a `SELECT COUNT(*) FROM <table> WHERE ...` query for
|
||||||
|
// the metadata block. The dashboard's polling loop alone hits these
|
||||||
|
// endpoints every 30s; on a 50K-cert fleet that's ~14K COUNT(*)
|
||||||
|
// rows scanned per minute for a result the operator hasn't actually
|
||||||
|
// changed.
|
||||||
|
//
|
||||||
|
// This middleware sits in front of the handler and:
|
||||||
|
//
|
||||||
|
// 1. Lets the handler run normally (writing JSON to a response
|
||||||
|
// buffer rather than the wire).
|
||||||
|
// 2. Computes a SHA-256 ETag of the buffered response body. The
|
||||||
|
// ETag is deterministic over (body bytes), so when the
|
||||||
|
// underlying list contents are unchanged the ETag is the same
|
||||||
|
// regardless of which replica served the request.
|
||||||
|
// 3. Compares the computed ETag against the request's
|
||||||
|
// `If-None-Match` header. Match → write 304 Not Modified with
|
||||||
|
// an empty body. No match → write the full response with the
|
||||||
|
// `ETag:` header set so the client can store it for the next
|
||||||
|
// request.
|
||||||
|
//
|
||||||
|
// Constraints / non-goals:
|
||||||
|
//
|
||||||
|
// - GET / HEAD only. POST / PUT / DELETE bypass the middleware
|
||||||
|
// (ETags on mutations introduce cache-correctness bugs around
|
||||||
|
// the request body not matching the response body).
|
||||||
|
// - Non-2xx responses (4xx errors, 5xx) bypass the ETag
|
||||||
|
// computation. The handler's error responses go through
|
||||||
|
// unchanged.
|
||||||
|
// - Responses larger than maxETagBufferBytes (64 KiB) skip the
|
||||||
|
// hash. Buffering very large response bodies in-memory just to
|
||||||
|
// hash them would cost more than the cache win. The default
|
||||||
|
// covers the cursor-paginated 100-row default on every list
|
||||||
|
// endpoint; raising the page-size override could exceed the
|
||||||
|
// limit, in which case ETag silently degrades to "no caching"
|
||||||
|
// for those calls.
|
||||||
|
// - The hash is computed over the response body bytes, NOT over
|
||||||
|
// a (max-updated-at, row-count) tuple from the DB. This is the
|
||||||
|
// less-clever-but-more-correct choice: any response-shape
|
||||||
|
// change (a new field added by a handler refactor, locale
|
||||||
|
// formatting drift, ordering shuffles) produces a fresh ETag
|
||||||
|
// automatically without requiring per-endpoint metadata
|
||||||
|
// wiring. The cost is one SHA-256 pass over the response body
|
||||||
|
// per request, which is dwarfed by the JSON marshaling cost
|
||||||
|
// already in the path.
|
||||||
|
|
||||||
|
const (
|
||||||
|
// maxETagBufferBytes caps how much response body the middleware
|
||||||
|
// will buffer for hashing. 64 KiB covers a 100-row cursor page
|
||||||
|
// at the default 500-bytes-per-row JSON shape on every list
|
||||||
|
// endpoint. Responses larger than this skip the ETag pass.
|
||||||
|
maxETagBufferBytes = 64 * 1024
|
||||||
|
)
|
||||||
|
|
||||||
|
// ETag returns middleware that emits a strong ETag header on
|
||||||
|
// successful GET / HEAD responses and short-circuits 304 Not
|
||||||
|
// Modified on If-None-Match match. Use it by wrapping the handler
|
||||||
|
// chain in front of the list endpoints:
|
||||||
|
//
|
||||||
|
// mux.Handle("GET /api/v1/certificates", middleware.ETag(h.ListCertificates))
|
||||||
|
//
|
||||||
|
// Or per router-registration if the router supports method-aware
|
||||||
|
// wrapping; see internal/api/router/router.go for the wiring shape.
|
||||||
|
func ETag(next http.Handler) http.Handler {
|
||||||
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
// Only GET + HEAD benefit. POST/PUT/DELETE always run.
|
||||||
|
if r.Method != http.MethodGet && r.Method != http.MethodHead {
|
||||||
|
next.ServeHTTP(w, r)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Buffer the handler's response. The handler still calls
|
||||||
|
// w.WriteHeader / w.Write normally; the recorder captures
|
||||||
|
// the bytes + status code for the post-handler ETag pass.
|
||||||
|
rec := &etagRecorder{
|
||||||
|
ResponseWriter: w,
|
||||||
|
body: bytes.NewBuffer(nil),
|
||||||
|
status: http.StatusOK,
|
||||||
|
headerWritten: false,
|
||||||
|
}
|
||||||
|
next.ServeHTTP(rec, r)
|
||||||
|
|
||||||
|
		// Only successful (2xx) responses get an ETag. The handler
		// has already run by this point; this middleware never
		// short-circuits before it. 4xx / 5xx responses pass
		// through unchanged because the handler's error body
		// shouldn't be cached against an ETag.
|
||||||
|
if rec.status < 200 || rec.status >= 300 {
|
||||||
|
rec.flush()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip ETag pass for over-sized responses. The buffer cap
|
||||||
|
// caught the body; emitting it without an ETag is the
|
||||||
|
// degradation path.
|
||||||
|
if rec.bodyTruncated {
|
||||||
|
rec.flush()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compute the ETag over the buffered body.
|
||||||
|
bodyBytes := rec.body.Bytes()
|
||||||
|
sum := sha256.Sum256(bodyBytes)
|
||||||
|
etag := `"` + hex.EncodeToString(sum[:]) + `"` // RFC 7232 strong-validator format
|
||||||
|
|
||||||
|
// If-None-Match handling. The header can be a
|
||||||
|
// comma-separated list; check each candidate against the
|
||||||
|
// computed ETag.
|
||||||
|
if matchETag(r.Header.Get("If-None-Match"), etag) {
|
||||||
|
// 304 Not Modified — preserve the ETag header but
|
||||||
|
// emit no body. Drop Content-Length to avoid the
|
||||||
|
// "declared length doesn't match body" mismatch some
|
||||||
|
// proxies are strict about.
|
||||||
|
h := w.Header()
|
||||||
|
h.Set("ETag", etag)
|
||||||
|
h.Del("Content-Length")
|
||||||
|
h.Del("Content-Type")
|
||||||
|
w.WriteHeader(http.StatusNotModified)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cache miss / first request. Emit the full response with
|
||||||
|
// ETag header for the next request to use.
|
||||||
|
w.Header().Set("ETag", etag)
|
||||||
|
rec.flush()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// matchETag returns true when ifNoneMatch (an If-None-Match header
|
||||||
|
// value) contains an entry that equals etag (the computed strong
|
||||||
|
// validator) or contains the wildcard `*`. RFC 7232 §3.2 says:
|
||||||
|
//
|
||||||
|
// If-None-Match = "*" / 1#entity-tag
|
||||||
|
//
|
||||||
|
// Strong comparison is appropriate for our use because all our
|
||||||
|
// ETags are strong (computed over response bytes); we never emit
|
||||||
|
// weak validators (`W/"..."`).
|
||||||
|
func matchETag(ifNoneMatch, etag string) bool {
|
||||||
|
if ifNoneMatch == "" {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// Cheap wildcard fast-path
|
||||||
|
if strings.TrimSpace(ifNoneMatch) == "*" {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
// Comma-separated list, possibly with surrounding spaces.
|
||||||
|
for _, candidate := range strings.Split(ifNoneMatch, ",") {
|
||||||
|
if strings.TrimSpace(candidate) == etag {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// etagRecorder buffers response bytes + status so the post-handler
|
||||||
|
// ETag pass can hash the body. WriteHeader and Write follow the
|
||||||
|
// http.ResponseWriter contract; the recorder ONLY differs by
|
||||||
|
// holding the bytes until flush() is called.
|
||||||
|
type etagRecorder struct {
|
||||||
|
http.ResponseWriter
|
||||||
|
body *bytes.Buffer
|
||||||
|
status int
|
||||||
|
headerWritten bool // set when the handler calls WriteHeader
|
||||||
|
headerWrittenOnWire bool // set when writeHeadersToWire emits to the underlying writer (idempotency sentinel)
|
||||||
|
bodyTruncated bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *etagRecorder) WriteHeader(status int) {
|
||||||
|
if r.headerWritten {
|
||||||
|
// Honor the http stdlib's contract: subsequent
|
||||||
|
// WriteHeader calls are ignored after the first.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
r.status = status
|
||||||
|
r.headerWritten = true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *etagRecorder) Write(b []byte) (int, error) {
|
||||||
|
if r.bodyTruncated {
|
||||||
|
// The buffer's full; subsequent writes are reported as
|
||||||
|
// successful but never make it into the buffer. flush()
|
||||||
|
// writes the buffer + any further bytes directly when it
|
||||||
|
// runs (see flush implementation below). Returning the
|
||||||
|
// caller-requested length here preserves io.Writer
|
||||||
|
// semantics for the handler.
|
||||||
|
return len(b), nil
|
||||||
|
}
|
||||||
|
// Track whether THIS write would push us over the cap. If
|
||||||
|
// yes, stop buffering — the body is too big to ETag.
|
||||||
|
if r.body.Len()+len(b) > maxETagBufferBytes {
|
||||||
|
r.bodyTruncated = true
|
||||||
|
// Flush the buffered prefix + this chunk straight to the
|
||||||
|
// wire; preserve the handler's bytes-written count.
|
||||||
|
// Headers haven't been written yet (we hold them until
|
||||||
|
// flush); write them now.
|
||||||
|
r.writeHeadersToWire()
|
||||||
|
if r.body.Len() > 0 {
|
||||||
|
if _, err := r.ResponseWriter.Write(r.body.Bytes()); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
r.body.Reset()
|
||||||
|
}
|
||||||
|
return r.ResponseWriter.Write(b)
|
||||||
|
}
|
||||||
|
return r.body.Write(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
// writeHeadersToWire emits the buffered status to the underlying
|
||||||
|
// ResponseWriter. Idempotent — subsequent calls are no-ops.
|
||||||
|
func (r *etagRecorder) writeHeadersToWire() {
|
||||||
|
if !r.headerWritten {
|
||||||
|
// Handler never called WriteHeader explicitly; the
|
||||||
|
// http.ResponseWriter contract says that's an implicit
|
||||||
|
// 200 OK on the first Write.
|
||||||
|
r.status = http.StatusOK
|
||||||
|
r.headerWritten = true
|
||||||
|
}
|
||||||
|
	// Idempotency guard: once the status has been sent to the
	// underlying ResponseWriter, later calls are no-ops. net/http
	// logs a "superfluous WriteHeader" warning on a second call,
	// which this flag avoids.
	if r.headerWrittenOnWire {
		return
	}
|
||||||
|
r.ResponseWriter.WriteHeader(r.status)
|
||||||
|
r.headerWrittenOnWire = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// headerWrittenOnWire is the sentinel for writeHeadersToWire's
|
||||||
|
// idempotency.
|
||||||
|
// (Declared on the struct via a separate field; placed here to
|
||||||
|
// keep the struct definition compact above.)
|
||||||
|
//
|
||||||
|
//nolint:unused // accessed via writeHeadersToWire receiver
|
||||||
|
func (r *etagRecorder) sentinelMarker() {}
|
||||||
|
|
||||||
|
// flush emits the buffered status + body to the underlying
|
||||||
|
// ResponseWriter. Called by the ETag middleware after the handler
|
||||||
|
// returns AND the response is either a cache miss (no
|
||||||
|
// If-None-Match match) or non-cacheable (4xx, oversized).
|
||||||
|
func (r *etagRecorder) flush() {
|
||||||
|
if r.bodyTruncated {
|
||||||
|
// Headers + body already on the wire via Write's
|
||||||
|
// truncation path. Nothing to flush.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
r.writeHeadersToWire()
|
||||||
|
if r.body.Len() > 0 {
|
||||||
|
_, _ = r.ResponseWriter.Write(r.body.Bytes())
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,259 @@
|
|||||||
|
// Copyright 2026 certctl LLC. All rights reserved.
|
||||||
|
// SPDX-License-Identifier: BUSL-1.1
|
||||||
|
|
||||||
|
package middleware
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Phase 6 SCALE-L2 contract pin (2026-05-14): the ETag middleware
|
||||||
|
// must:
|
||||||
|
// 1. Emit an ETag header on successful GET / HEAD responses.
|
||||||
|
// 2. Return 304 Not Modified when the client's If-None-Match
|
||||||
|
// matches the computed ETag (cache hit).
|
||||||
|
// 3. Return 200 + new ETag when the body has changed (cache miss
|
||||||
|
// after mutation).
|
||||||
|
// 4. NOT apply to POST / PUT / DELETE.
|
||||||
|
// 5. NOT apply to non-2xx responses (errors pass through unchanged).
|
||||||
|
// 6. Skip ETag for over-sized responses (degrade gracefully, not
|
||||||
|
// crash).
|
||||||
|
|
||||||
|
func TestETag_GET_EmitsETagHeader(t *testing.T) {
|
||||||
|
handler := ETag(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
_, _ = w.Write([]byte(`{"items":[{"id":"cert-1"}],"total":1}`))
|
||||||
|
}))
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "/api/v1/certificates", nil)
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
handler.ServeHTTP(rec, req)
|
||||||
|
|
||||||
|
if rec.Code != http.StatusOK {
|
||||||
|
t.Errorf("status = %d; want 200", rec.Code)
|
||||||
|
}
|
||||||
|
if etag := rec.Header().Get("ETag"); etag == "" {
|
||||||
|
t.Errorf("ETag header is empty; want non-empty strong validator")
|
||||||
|
}
|
||||||
|
if !strings.Contains(rec.Body.String(), "cert-1") {
|
||||||
|
t.Errorf("body missing handler output: %q", rec.Body.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestETag_RepeatedRequest_Returns304(t *testing.T) {
|
||||||
|
body := []byte(`{"items":[{"id":"cert-1"}],"total":1}`)
|
||||||
|
handler := ETag(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
_, _ = w.Write(body)
|
||||||
|
}))
|
||||||
|
|
||||||
|
// First request — establish the cache.
|
||||||
|
req1 := httptest.NewRequest(http.MethodGet, "/api/v1/certificates", nil)
|
||||||
|
rec1 := httptest.NewRecorder()
|
||||||
|
handler.ServeHTTP(rec1, req1)
|
||||||
|
|
||||||
|
etag := rec1.Header().Get("ETag")
|
||||||
|
if etag == "" {
|
||||||
|
t.Fatal("first response missing ETag — cannot run cache-hit test")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Second request with If-None-Match — should 304.
|
||||||
|
req2 := httptest.NewRequest(http.MethodGet, "/api/v1/certificates", nil)
|
||||||
|
req2.Header.Set("If-None-Match", etag)
|
||||||
|
rec2 := httptest.NewRecorder()
|
||||||
|
handler.ServeHTTP(rec2, req2)
|
||||||
|
|
||||||
|
if rec2.Code != http.StatusNotModified {
|
||||||
|
t.Errorf("status = %d; want 304 Not Modified (cache hit)", rec2.Code)
|
||||||
|
}
|
||||||
|
if rec2.Body.Len() != 0 {
|
||||||
|
t.Errorf("304 response body non-empty: %q (RFC 7232 §4.1: 304 MUST NOT have a body)", rec2.Body.String())
|
||||||
|
}
|
||||||
|
if rec2.Header().Get("ETag") != etag {
|
||||||
|
t.Errorf("304 response ETag = %q; want %q (must be preserved for next request)", rec2.Header().Get("ETag"), etag)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestETag_AfterMutation_Returns200WithNewETag(t *testing.T) {
|
||||||
|
// Simulate a mutation: the handler's response body changes
|
||||||
|
// between request 1 and request 2. Request 2 (with stale
|
||||||
|
// If-None-Match) must miss and return 200 + the new ETag.
|
||||||
|
currentBody := []byte(`{"items":[{"id":"cert-1"}],"total":1}`)
|
||||||
|
handler := ETag(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
_, _ = w.Write(currentBody)
|
||||||
|
}))
|
||||||
|
|
||||||
|
// Initial request — capture ETag.
|
||||||
|
req1 := httptest.NewRequest(http.MethodGet, "/api/v1/certificates", nil)
|
||||||
|
rec1 := httptest.NewRecorder()
|
||||||
|
handler.ServeHTTP(rec1, req1)
|
||||||
|
etag1 := rec1.Header().Get("ETag")
|
||||||
|
|
||||||
|
// Simulate a mutation by changing the response body.
|
||||||
|
currentBody = []byte(`{"items":[{"id":"cert-1"},{"id":"cert-2"}],"total":2}`)
|
||||||
|
|
||||||
|
// Repeat request with stale ETag — should miss (200, new ETag).
|
||||||
|
req2 := httptest.NewRequest(http.MethodGet, "/api/v1/certificates", nil)
|
||||||
|
req2.Header.Set("If-None-Match", etag1)
|
||||||
|
rec2 := httptest.NewRecorder()
|
||||||
|
handler.ServeHTTP(rec2, req2)
|
||||||
|
|
||||||
|
if rec2.Code != http.StatusOK {
|
||||||
|
t.Errorf("status = %d; want 200 (cache miss after mutation)", rec2.Code)
|
||||||
|
}
|
||||||
|
etag2 := rec2.Header().Get("ETag")
|
||||||
|
if etag2 == etag1 {
|
||||||
|
t.Errorf("ETag unchanged after body mutation: %q = %q", etag1, etag2)
|
||||||
|
}
|
||||||
|
if !strings.Contains(rec2.Body.String(), "cert-2") {
|
||||||
|
t.Errorf("post-mutation body missing new content: %q", rec2.Body.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestETag_POST_BypassesMiddleware(t *testing.T) {
|
||||||
|
handler := ETag(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusCreated)
|
||||||
|
_, _ = w.Write([]byte(`{"id":"cert-new"}`))
|
||||||
|
}))
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodPost, "/api/v1/certificates", strings.NewReader(`{}`))
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
handler.ServeHTTP(rec, req)
|
||||||
|
|
||||||
|
if rec.Code != http.StatusCreated {
|
||||||
|
t.Errorf("status = %d; want 201", rec.Code)
|
||||||
|
}
|
||||||
|
if etag := rec.Header().Get("ETag"); etag != "" {
|
||||||
|
t.Errorf("ETag header set on POST response: %q (POST/PUT/DELETE must not have ETag)", etag)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestETag_5xx_PassesThroughWithoutETag(t *testing.T) {
|
||||||
|
handler := ETag(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
_, _ = w.Write([]byte(`{"error":"boom"}`))
|
||||||
|
}))
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "/api/v1/certificates", nil)
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
handler.ServeHTTP(rec, req)
|
||||||
|
|
||||||
|
if rec.Code != http.StatusInternalServerError {
|
||||||
|
t.Errorf("status = %d; want 500", rec.Code)
|
||||||
|
}
|
||||||
|
if etag := rec.Header().Get("ETag"); etag != "" {
|
||||||
|
t.Errorf("ETag set on 500 response: %q (non-2xx must not be cached)", etag)
|
||||||
|
}
|
||||||
|
if !strings.Contains(rec.Body.String(), "boom") {
|
||||||
|
t.Errorf("error body lost: %q", rec.Body.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestETag_4xx_PassesThroughWithoutETag(t *testing.T) {
|
||||||
|
handler := ETag(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
_, _ = w.Write([]byte(`{"error":"invalid query"}`))
|
||||||
|
}))
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "/api/v1/certificates?bad=true", nil)
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
handler.ServeHTTP(rec, req)
|
||||||
|
|
||||||
|
if rec.Code != http.StatusBadRequest {
|
||||||
|
t.Errorf("status = %d; want 400", rec.Code)
|
||||||
|
}
|
||||||
|
if etag := rec.Header().Get("ETag"); etag != "" {
|
||||||
|
t.Errorf("ETag set on 400 response: %q (non-2xx must not be cached)", etag)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestETag_OversizedResponse_DegradesGracefully(t *testing.T) {
|
||||||
|
// Response larger than maxETagBufferBytes (64 KiB) must not
|
||||||
|
// be ETag'd, but the response itself must reach the client
|
||||||
|
// intact.
|
||||||
|
bigBody := make([]byte, maxETagBufferBytes+1024)
|
||||||
|
for i := range bigBody {
|
||||||
|
bigBody[i] = 'x'
|
||||||
|
}
|
||||||
|
handler := ETag(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "text/plain")
|
||||||
|
_, _ = w.Write(bigBody)
|
||||||
|
}))
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "/api/v1/audit?limit=10000", nil)
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
handler.ServeHTTP(rec, req)
|
||||||
|
|
||||||
|
if rec.Code != http.StatusOK {
|
||||||
|
t.Errorf("status = %d; want 200 (oversize body should not 5xx)", rec.Code)
|
||||||
|
}
|
||||||
|
if etag := rec.Header().Get("ETag"); etag != "" {
|
||||||
|
t.Errorf("ETag emitted for oversize response: %q (should degrade silently)", etag)
|
||||||
|
}
|
||||||
|
if got, want := rec.Body.Len(), len(bigBody); got != want {
|
||||||
|
t.Errorf("body bytes received = %d; want %d (oversize body should not be truncated on the wire)", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestETag_Wildcard_MatchesAny(t *testing.T) {
|
||||||
|
// RFC 7232 §3.2: If-None-Match: * matches any current
|
||||||
|
// representation. Clients use this for "give me 304 if anything
|
||||||
|
// exists" semantics.
|
||||||
|
handler := ETag(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
_, _ = w.Write([]byte(`{"any":"thing"}`))
|
||||||
|
}))
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "/api/v1/certificates", nil)
|
||||||
|
req.Header.Set("If-None-Match", "*")
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
handler.ServeHTTP(rec, req)
|
||||||
|
|
||||||
|
if rec.Code != http.StatusNotModified {
|
||||||
|
t.Errorf("status = %d; want 304 (If-None-Match: * always matches)", rec.Code)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestETag_HEAD_TreatedLikeGET(t *testing.T) {
|
||||||
|
body := []byte(`{"items":[],"total":0}`)
|
||||||
|
handler := ETag(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
// A real HEAD handler wouldn't actually write a body but
|
||||||
|
// the middleware shouldn't care — the ETag derives from
|
||||||
|
// whatever the handler emits.
|
||||||
|
_, _ = w.Write(body)
|
||||||
|
}))
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodHead, "/api/v1/certificates", nil)
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
handler.ServeHTTP(rec, req)
|
||||||
|
|
||||||
|
if rec.Code != http.StatusOK {
|
||||||
|
t.Errorf("status = %d; want 200", rec.Code)
|
||||||
|
}
|
||||||
|
if etag := rec.Header().Get("ETag"); etag == "" {
|
||||||
|
t.Errorf("HEAD response missing ETag (HEAD should be treated like GET)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestETag_PassThrough_PreservesBody checks that the recorder doesn't
|
||||||
|
// drop bytes vs the underlying ResponseWriter. Reads back the
|
||||||
|
// body and asserts byte-equality with what the handler wrote.
|
||||||
|
func TestETag_PassThrough_PreservesBody(t *testing.T) {
|
||||||
|
body := []byte(`{"a":1,"b":2,"c":3}`)
|
||||||
|
handler := ETag(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
_, _ = w.Write(body)
|
||||||
|
}))
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "/api/v1/jobs", nil)
|
||||||
|
rec := httptest.NewRecorder()
|
||||||
|
handler.ServeHTTP(rec, req)
|
||||||
|
|
||||||
|
got, _ := io.ReadAll(rec.Body)
|
||||||
|
if string(got) != string(body) {
|
||||||
|
t.Errorf("body bytes mismatched: got %q, want %q", string(got), string(body))
|
||||||
|
}
|
||||||
|
}
|
||||||
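The contract pinned by the tests above (ETag on 2xx GET/HEAD, 304 on a matching If-None-Match, bypass for other methods and non-2xx statuses) can be illustrated with a small middleware. This is a sketch under stated assumptions, not the middleware shipped in this commit: sketchETag, capture, probe, and demoETag are hypothetical names, the validator is a truncated SHA-256 of the body, If-None-Match is compared as a single exact string (no list or weak-validator parsing), and the oversize path is omitted.

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// capture buffers the handler's status and body. It shares the real
// writer's header map so headers set by the handler are not lost.
type capture struct {
	header http.Header
	status int
	body   bytes.Buffer
}

func (c *capture) Header() http.Header         { return c.header }
func (c *capture) WriteHeader(code int)        { c.status = code }
func (c *capture) Write(p []byte) (int, error) { return c.body.Write(p) }

// sketchETag is a hypothetical middleware: the handler always runs,
// and the validator is derived from the body it produced.
func sketchETag(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodGet && r.Method != http.MethodHead {
			next.ServeHTTP(w, r) // POST/PUT/DELETE bypass the middleware
			return
		}
		c := &capture{header: w.Header(), status: http.StatusOK}
		next.ServeHTTP(c, r)
		if c.status >= 200 && c.status < 300 {
			sum := sha256.Sum256(c.body.Bytes())
			etag := `"` + hex.EncodeToString(sum[:16]) + `"`
			w.Header().Set("ETag", etag)
			if inm := r.Header.Get("If-None-Match"); inm == etag || inm == "*" {
				w.WriteHeader(http.StatusNotModified) // no body on 304
				return
			}
		}
		// Cache miss or non-2xx: replay the buffered response unchanged.
		w.WriteHeader(c.status)
		_, _ = w.Write(c.body.Bytes())
	})
}

// probe sends one request and returns status, ETag, and body length.
func probe(h http.Handler, method, ifNoneMatch string) (int, string, int) {
	req := httptest.NewRequest(method, "/api/v1/certificates", nil)
	if ifNoneMatch != "" {
		req.Header.Set("If-None-Match", ifNoneMatch)
	}
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code, rec.Header().Get("ETag"), rec.Body.Len()
}

// demoETag runs a cache-miss GET, a cache-hit GET, and a POST.
func demoETag() (int, int, int, string) {
	h := sketchETag(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		_, _ = w.Write([]byte(`{"items":[],"total":0}`))
	}))
	firstStatus, tag, _ := probe(h, http.MethodGet, "")
	secondStatus, _, secondLen := probe(h, http.MethodGet, tag)
	_, postTag, _ := probe(h, http.MethodPost, "")
	return firstStatus, secondStatus, secondLen, postTag
}

func main() {
	fmt.Println(demoETag())
}
```

Because the validator is computed from the buffered body, a design like this saves response bandwidth and client-side parsing on a hit, while the handler's own work is still performed on every request.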
@@ -11,6 +11,23 @@ import (
|
|||||||
"github.com/certctl-io/certctl/internal/auth"
|
"github.com/certctl-io/certctl/internal/auth"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// etagged wraps a list-endpoint handler with the SCALE-L2 ETag
|
||||||
|
// middleware. Phase 6 SCALE-L2 closure (2026-05-14): the top-5
|
||||||
|
// read-heavy list endpoints (/certificates, /jobs, /agents,
|
||||||
|
// /audit, /discovered-certificates) get ETag + If-None-Match
|
||||||
|
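// short-circuit so an unchanged list is answered with a bodyless
// 304 on dashboard polls. The handler (and its SELECT COUNT(*) +
// row-marshaling pass) still runs, because the ETag is derived
// from the body it emits. The helper chains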
|
||||||
|
// ETag around an already-rbac-gated handler so order is:
|
||||||
|
//
|
||||||
|
// request → ETag → rbacGate → handler
|
||||||
|
//
|
||||||
|
// (auth runs BEFORE the cache check; we never short-circuit a
|
||||||
|
// 304 to an unauthenticated request. The middleware emits ETag
|
||||||
|
// only on 2xx responses, so 401s/403s never get cached.)
|
||||||
|
func etagged(h http.Handler) http.Handler {
|
||||||
|
return middleware.ETag(h)
|
||||||
|
}
|
||||||
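The ordering claim in the etagged comment (ETag is the outer wrapper, yet the auth check completes before any validator comparison) can be made visible with a tracing sketch. Everything here is illustrative: traceETag and traceGate are hypothetical stand-ins for middleware.ETag and rbacGate, and the X-Demo-User header is an invented placeholder for real authentication.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// traceETag stands in for the ETag middleware: it records when it is
// entered and when it would compare validators, which only happens
// after the wrapped chain has returned.
func traceETag(log *[]string, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		*log = append(*log, "etag:enter")
		next.ServeHTTP(w, r)
		*log = append(*log, "etag:compare")
	})
}

// traceGate stands in for rbacGate: it rejects any request that lacks
// the hypothetical X-Demo-User header.
func traceGate(log *[]string, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Header.Get("X-Demo-User") == "" {
			*log = append(*log, "gate:deny")
			w.WriteHeader(http.StatusUnauthorized)
			return
		}
		*log = append(*log, "gate:allow")
		next.ServeHTTP(w, r)
	})
}

// runChain builds ETag(gate(handler)) and returns the call order and
// the final status code for one request.
func runChain(user string) (string, int) {
	var log []string
	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		log = append(log, "handler")
		_, _ = w.Write([]byte("ok"))
	})
	chain := traceETag(&log, traceGate(&log, handler))

	req := httptest.NewRequest(http.MethodGet, "/api/v1/jobs", nil)
	if user != "" {
		req.Header.Set("X-Demo-User", user)
	}
	rec := httptest.NewRecorder()
	chain.ServeHTTP(rec, req)
	return strings.Join(log, " > "), rec.Code
}

func main() {
	fmt.Println(runChain("alice"))
	fmt.Println(runChain(""))
}
```

In the denied case the comparison step sees a 401, which a 2xx-only ETag policy leaves untouched, so an unauthenticated caller cannot obtain a 304.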
|
|
||||||
// rbacGate wraps a handler with auth.RequirePermission(checker, perm,
|
// rbacGate wraps a handler with auth.RequirePermission(checker, perm,
|
||||||
// nil) — i.e. a GLOBAL-SCOPE permission check. Used by RegisterHandlers
|
// nil) — i.e. a GLOBAL-SCOPE permission check. Used by RegisterHandlers
|
||||||
// to gate every state-changing + read endpoint. When checker is nil the
|
// to gate every state-changing + read endpoint. When checker is nil the
|
||||||
@@ -567,7 +584,7 @@ func (r *Router) RegisterHandlers(reg HandlerRegistry) {
|
|||||||
r.Register("POST /api/v1/est/certificates/bulk-revoke", rbacGate(reg.Checker, "cert.bulk_revoke", reg.BulkRevocation.BulkRevokeEST))
|
r.Register("POST /api/v1/est/certificates/bulk-revoke", rbacGate(reg.Checker, "cert.bulk_revoke", reg.BulkRevocation.BulkRevokeEST))
|
||||||
r.Register("POST /api/v1/certificates/bulk-renew", rbacGate(reg.Checker, "cert.issue", reg.BulkRenewal.BulkRenew))
|
r.Register("POST /api/v1/certificates/bulk-renew", rbacGate(reg.Checker, "cert.issue", reg.BulkRenewal.BulkRenew))
|
||||||
r.Register("POST /api/v1/certificates/bulk-reassign", rbacGate(reg.Checker, "cert.edit", reg.BulkReassignment.BulkReassign))
|
r.Register("POST /api/v1/certificates/bulk-reassign", rbacGate(reg.Checker, "cert.edit", reg.BulkReassignment.BulkReassign))
|
||||||
r.Register("GET /api/v1/certificates", rbacGate(reg.Checker, "cert.read", reg.Certificates.ListCertificates))
|
r.Register("GET /api/v1/certificates", etagged(rbacGate(reg.Checker, "cert.read", reg.Certificates.ListCertificates)))
|
||||||
r.Register("POST /api/v1/certificates", rbacGate(reg.Checker, "cert.issue", reg.Certificates.CreateCertificate))
|
r.Register("POST /api/v1/certificates", rbacGate(reg.Checker, "cert.issue", reg.Certificates.CreateCertificate))
|
||||||
r.Register("GET /api/v1/certificates/{id}", rbacGate(reg.Checker, "cert.read", reg.Certificates.GetCertificate))
|
r.Register("GET /api/v1/certificates/{id}", rbacGate(reg.Checker, "cert.read", reg.Certificates.GetCertificate))
|
||||||
r.Register("PUT /api/v1/certificates/{id}", rbacGate(reg.Checker, "cert.edit", reg.Certificates.UpdateCertificate))
|
r.Register("PUT /api/v1/certificates/{id}", rbacGate(reg.Checker, "cert.edit", reg.Certificates.UpdateCertificate))
|
||||||
@@ -619,7 +636,7 @@ func (r *Router) RegisterHandlers(reg HandlerRegistry) {
|
|||||||
// * DELETE /api/v1/agents/{id} — RetireAgent. Replaces the pre-I-004
|
// * DELETE /api/v1/agents/{id} — RetireAgent. Replaces the pre-I-004
|
||||||
// hard-delete; the underlying repo does a soft-retire with
|
// hard-delete; the underlying repo does a soft-retire with
|
||||||
// optional cascade.
|
// optional cascade.
|
||||||
r.Register("GET /api/v1/agents", rbacGate(reg.Checker, "agent.read", reg.Agents.ListAgents))
|
r.Register("GET /api/v1/agents", etagged(rbacGate(reg.Checker, "agent.read", reg.Agents.ListAgents)))
|
||||||
r.Register("POST /api/v1/agents", rbacGate(reg.Checker, "agent.edit", reg.Agents.RegisterAgent))
|
r.Register("POST /api/v1/agents", rbacGate(reg.Checker, "agent.edit", reg.Agents.RegisterAgent))
|
||||||
r.Register("GET /api/v1/agents/retired", rbacGate(reg.Checker, "agent.read", reg.Agents.ListRetiredAgents))
|
r.Register("GET /api/v1/agents/retired", rbacGate(reg.Checker, "agent.read", reg.Agents.ListRetiredAgents))
|
||||||
r.Register("GET /api/v1/agents/{id}", rbacGate(reg.Checker, "agent.read", reg.Agents.GetAgent))
|
r.Register("GET /api/v1/agents/{id}", rbacGate(reg.Checker, "agent.read", reg.Agents.GetAgent))
|
||||||
@@ -631,7 +648,7 @@ func (r *Router) RegisterHandlers(reg HandlerRegistry) {
|
|||||||
r.Register("POST /api/v1/agents/{id}/jobs/{job_id}/status", rbacGate(reg.Checker, "agent.job.complete", reg.Agents.AgentReportJobStatus))
|
r.Register("POST /api/v1/agents/{id}/jobs/{job_id}/status", rbacGate(reg.Checker, "agent.job.complete", reg.Agents.AgentReportJobStatus))
|
||||||
|
|
||||||
// Jobs routes: /api/v1/jobs
|
// Jobs routes: /api/v1/jobs
|
||||||
r.Register("GET /api/v1/jobs", rbacGate(reg.Checker, "job.read", reg.Jobs.ListJobs))
|
r.Register("GET /api/v1/jobs", etagged(rbacGate(reg.Checker, "job.read", reg.Jobs.ListJobs)))
|
||||||
r.Register("GET /api/v1/jobs/{id}", rbacGate(reg.Checker, "job.read", reg.Jobs.GetJob))
|
r.Register("GET /api/v1/jobs/{id}", rbacGate(reg.Checker, "job.read", reg.Jobs.GetJob))
|
||||||
r.Register("POST /api/v1/jobs/{id}/cancel", rbacGate(reg.Checker, "job.cancel", reg.Jobs.CancelJob))
|
r.Register("POST /api/v1/jobs/{id}/cancel", rbacGate(reg.Checker, "job.cancel", reg.Jobs.CancelJob))
|
||||||
r.Register("POST /api/v1/jobs/{id}/approve", rbacGate(reg.Checker, "approval.approve", reg.Jobs.ApproveJob))
|
r.Register("POST /api/v1/jobs/{id}/approve", rbacGate(reg.Checker, "approval.approve", reg.Jobs.ApproveJob))
|
||||||
@@ -695,7 +712,7 @@ func (r *Router) RegisterHandlers(reg HandlerRegistry) {
|
|||||||
r.Register("GET /api/v1/agent-groups/{id}/members", rbacGate(reg.Checker, "agent.read", reg.AgentGroups.ListAgentGroupMembers))
|
r.Register("GET /api/v1/agent-groups/{id}/members", rbacGate(reg.Checker, "agent.read", reg.AgentGroups.ListAgentGroupMembers))
|
||||||
|
|
||||||
// Audit routes: /api/v1/audit
|
// Audit routes: /api/v1/audit
|
||||||
r.Register("GET /api/v1/audit", rbacGate(reg.Checker, "audit.read", reg.Audit.ListAuditEvents))
|
r.Register("GET /api/v1/audit", etagged(rbacGate(reg.Checker, "audit.read", reg.Audit.ListAuditEvents)))
|
||||||
// Audit 2026-05-10 HIGH-11 closure — `audit.export` permission was
|
// Audit 2026-05-10 HIGH-11 closure — `audit.export` permission was
|
||||||
// already seeded into r-admin + r-auditor (migration 000031), but
|
// already seeded into r-admin + r-auditor (migration 000031), but
|
||||||
// no endpoint enforced it pre-fix; r-auditor's claim was misleading
|
// no endpoint enforced it pre-fix; r-auditor's claim was misleading
|
||||||
@@ -765,7 +782,7 @@ func (r *Router) RegisterHandlers(reg HandlerRegistry) {
|
|||||||
|
|
||||||
// Discovery routes: /api/v1/discovered-certificates, /api/v1/discovery-scans
|
// Discovery routes: /api/v1/discovered-certificates, /api/v1/discovery-scans
|
||||||
r.Register("POST /api/v1/agents/{id}/discoveries", rbacGate(reg.Checker, "discovery.run", reg.Discovery.SubmitDiscoveryReport))
|
r.Register("POST /api/v1/agents/{id}/discoveries", rbacGate(reg.Checker, "discovery.run", reg.Discovery.SubmitDiscoveryReport))
|
||||||
r.Register("GET /api/v1/discovered-certificates", rbacGate(reg.Checker, "discovery.read", reg.Discovery.ListDiscovered))
|
r.Register("GET /api/v1/discovered-certificates", etagged(rbacGate(reg.Checker, "discovery.read", reg.Discovery.ListDiscovered)))
|
||||||
r.Register("GET /api/v1/discovered-certificates/{id}", rbacGate(reg.Checker, "discovery.read", reg.Discovery.GetDiscovered))
|
r.Register("GET /api/v1/discovered-certificates/{id}", rbacGate(reg.Checker, "discovery.read", reg.Discovery.GetDiscovered))
|
||||||
r.Register("POST /api/v1/discovered-certificates/{id}/claim", rbacGate(reg.Checker, "discovery.claim", reg.Discovery.ClaimDiscovered))
|
r.Register("POST /api/v1/discovered-certificates/{id}/claim", rbacGate(reg.Checker, "discovery.claim", reg.Discovery.ClaimDiscovered))
|
||||||
r.Register("POST /api/v1/discovered-certificates/{id}/dismiss", rbacGate(reg.Checker, "discovery.claim", reg.Discovery.DismissDiscovered))
|
r.Register("POST /api/v1/discovered-certificates/{id}/dismiss", rbacGate(reg.Checker, "discovery.claim", reg.Discovery.DismissDiscovered))
|
||||||
|
|||||||
@@ -1962,7 +1962,14 @@ func Load() (*Config, error) {
|
|||||||
},
|
},
|
||||||
Database: DatabaseConfig{
|
Database: DatabaseConfig{
|
||||||
URL: getEnv("CERTCTL_DATABASE_URL", "postgres://localhost/certctl"),
|
URL: getEnv("CERTCTL_DATABASE_URL", "postgres://localhost/certctl"),
|
||||||
MaxConnections: getEnvInt("CERTCTL_DATABASE_MAX_CONNS", 25),
|
// Phase 6 SCALE-M1 closure (2026-05-14): bumped default from
|
||||||
|
// 25 → 50 to relieve pool-saturation pressure on 1K+ agent /
|
||||||
|
// 10K+ cert fleets. Postgres default max_connections is 100
|
||||||
|
// on the smallest tier; 50 leaves headroom for backups, ad-hoc
|
||||||
|
// psql sessions, and one extra server replica without
|
||||||
|
// exhausting the DB-side cap. Operator-tune ladder for larger
|
||||||
|
// fleets documented in docs/operator/scale.md.
|
||||||
|
MaxConnections: getEnvInt("CERTCTL_DATABASE_MAX_CONNS", 50),
|
||||||
MigrationsPath: getEnv("CERTCTL_DATABASE_MIGRATIONS_PATH", "./migrations"),
|
MigrationsPath: getEnv("CERTCTL_DATABASE_MIGRATIONS_PATH", "./migrations"),
|
||||||
DemoSeed: getEnvBool("CERTCTL_DEMO_SEED", false),
|
DemoSeed: getEnvBool("CERTCTL_DEMO_SEED", false),
|
||||||
},
|
},
|
||||||
|
|||||||
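The default-with-override pattern behind the MaxConnections line can be sketched as follows. The repository's getEnvInt is not shown in this diff, so intFromEnv below is a hypothetical stand-in; in particular, falling back to the default on a malformed value is an assumption, not confirmed behavior.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// intFromEnv is a hypothetical stand-in for getEnvInt. The lookup
// function is injected so the logic can be exercised without
// touching the process environment.
func intFromEnv(lookup func(string) (string, bool), key string, def int) int {
	raw, ok := lookup(key)
	if !ok {
		return def // variable unset: use the compiled-in default
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return def // assumption: malformed values fall back to the default
	}
	return n
}

// fakeEnv adapts a map to the os.LookupEnv signature.
func fakeEnv(vals map[string]string) func(string) (string, bool) {
	return func(k string) (string, bool) {
		v, ok := vals[k]
		return v, ok
	}
}

// demoMaxConns resolves the pool size for an unset, an overridden,
// and a malformed CERTCTL_DATABASE_MAX_CONNS.
func demoMaxConns() (unset, overridden, malformed int) {
	const key = "CERTCTL_DATABASE_MAX_CONNS"
	unset = intFromEnv(fakeEnv(nil), key, 50)
	overridden = intFromEnv(fakeEnv(map[string]string{key: "25"}), key, 50)
	malformed = intFromEnv(fakeEnv(map[string]string{key: "lots"}), key, 50)
	return
}

func main() {
	// Against the real environment the call would look like this.
	fmt.Println(intFromEnv(os.LookupEnv, "CERTCTL_DATABASE_MAX_CONNS", 50))
	fmt.Println(demoMaxConns())
}
```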
@@ -203,8 +203,12 @@ func TestLoad_DefaultValues(t *testing.T) {
|
|||||||
if cfg.Database.URL != "postgres://localhost/certctl" {
|
if cfg.Database.URL != "postgres://localhost/certctl" {
|
||||||
t.Errorf("Database.URL = %q, want default", cfg.Database.URL)
|
t.Errorf("Database.URL = %q, want default", cfg.Database.URL)
|
||||||
}
|
}
|
||||||
if cfg.Database.MaxConnections != 25 {
|
// Phase 6 SCALE-M1 (2026-05-14): default bumped from 25 → 50 to
|
||||||
t.Errorf("Database.MaxConnections = %d, want 25", cfg.Database.MaxConnections)
|
// relieve pool-saturation pressure on 1K+ agent fleets. The
|
||||||
|
// CERTCTL_DATABASE_MAX_CONNS override still works for operators
|
||||||
|
// who want the smaller value back; this test pins the default.
|
||||||
|
if cfg.Database.MaxConnections != 50 {
|
||||||
|
t.Errorf("Database.MaxConnections = %d, want 50", cfg.Database.MaxConnections)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -112,6 +112,49 @@ const (
|
|||||||
DefaultJitterPct = 0.2
|
DefaultJitterPct = 0.2
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Phase 6 SCALE-M3 closure (2026-05-14): operator-overridable global
|
||||||
|
// default for the package-level MaxWait fallback. Priority chain for
|
||||||
|
// every Poll() call:
|
||||||
|
//
|
||||||
|
// 1. cfg.MaxWait > 0 → per-call value (set by the caller, usually
|
||||||
|
// from a per-connector env like CERTCTL_DIGICERT_POLL_MAX_WAIT_SECONDS)
|
||||||
|
// 2. effectiveDefaultMaxWait != nil → process-wide override set via
|
||||||
|
// SetDefaultMaxWait (from CERTCTL_ASYNC_POLL_MAX_WAIT_SECONDS at
|
||||||
|
// server boot)
|
||||||
|
// 3. DefaultMaxWait constant (10 minutes)
|
||||||
|
//
|
||||||
|
// Pre-Phase-6, paths (1) + (3) existed. Path (2) lets an operator tune
|
||||||
|
// the global fallback in one place without setting four per-connector
|
||||||
|
// envs (digicert, entrust, globalsign, sectigo).
|
||||||
|
var effectiveDefaultMaxWait *time.Duration
|
||||||
|
|
||||||
|
// SetDefaultMaxWait overrides the package-level DefaultMaxWait
|
||||||
|
// fallback for the rest of the process lifetime. Intended to be
|
||||||
|
// called exactly once at boot from cmd/server/main.go after reading
|
||||||
|
// CERTCTL_ASYNC_POLL_MAX_WAIT_SECONDS. Subsequent calls overwrite the
|
||||||
|
// previous override. A zero or negative duration clears the override
|
||||||
|
// (restoring the constant default).
|
||||||
|
//
|
||||||
|
// Per-connector overrides (caller-provided cfg.MaxWait) take
|
||||||
|
// precedence over this global default.
|
||||||
|
func SetDefaultMaxWait(d time.Duration) {
|
||||||
|
if d <= 0 {
|
||||||
|
effectiveDefaultMaxWait = nil
|
||||||
|
return
|
||||||
|
}
|
||||||
|
effectiveDefaultMaxWait = &d
|
||||||
|
}
|
||||||
|
|
||||||
|
// currentDefaultMaxWait returns the effective default — the
|
||||||
|
// SetDefaultMaxWait override if one is in place, else the package's
|
||||||
|
// DefaultMaxWait constant.
|
||||||
|
func currentDefaultMaxWait() time.Duration {
|
||||||
|
if effectiveDefaultMaxWait != nil {
|
||||||
|
return *effectiveDefaultMaxWait
|
||||||
|
}
|
||||||
|
return DefaultMaxWait
|
||||||
|
}
|
||||||
|
|
||||||
// Poll runs fn with exponential backoff + jitter until Done, Failed,
|
// Poll runs fn with exponential backoff + jitter until Done, Failed,
|
||||||
// MaxWait, or ctx cancellation.
|
// MaxWait, or ctx cancellation.
|
||||||
//
|
//
|
||||||
@@ -132,7 +175,7 @@ const (
|
|||||||
// error in case MaxWait or ctx-cancel later fires.
|
// error in case MaxWait or ctx-cancel later fires.
|
||||||
func Poll(ctx context.Context, cfg Config, fn PollFunc) (Result, error) {
|
func Poll(ctx context.Context, cfg Config, fn PollFunc) (Result, error) {
|
||||||
if cfg.MaxWait <= 0 {
|
if cfg.MaxWait <= 0 {
|
||||||
cfg.MaxWait = DefaultMaxWait
|
cfg.MaxWait = currentDefaultMaxWait()
|
||||||
}
|
}
|
||||||
if cfg.InitialWait <= 0 {
|
if cfg.InitialWait <= 0 {
|
||||||
cfg.InitialWait = DefaultInitialWait
|
cfg.InitialWait = DefaultInitialWait
|
||||||
|
|||||||
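The three-level priority chain for MaxWait (per-call value, then process-wide override, then constant) can be exercised in isolation. The sketch below mirrors the logic in the hunk with lowercase hypothetical names; applyBootEnv is an assumed shape for the boot-time wiring in cmd/server/main.go, which this diff does not show.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// defaultMaxWait mirrors the 10-minute constant named in the comment.
const defaultMaxWait = 10 * time.Minute

// overrideMaxWait is nil until a process-wide override is installed.
var overrideMaxWait *time.Duration

// setDefaultMaxWait installs or, for d <= 0, clears the override.
func setDefaultMaxWait(d time.Duration) {
	if d <= 0 {
		overrideMaxWait = nil
		return
	}
	overrideMaxWait = &d
}

// resolveMaxWait applies the priority chain: per-call value first,
// then the process-wide override, then the constant.
func resolveMaxWait(perCall time.Duration) time.Duration {
	if perCall > 0 {
		return perCall
	}
	if overrideMaxWait != nil {
		return *overrideMaxWait
	}
	return defaultMaxWait
}

// applyBootEnv is a hypothetical version of the boot-time wiring: it
// parses a seconds value such as CERTCTL_ASYNC_POLL_MAX_WAIT_SECONDS
// and ignores anything that is not a positive integer.
func applyBootEnv(raw string) {
	secs, err := strconv.Atoi(raw)
	if err != nil || secs <= 0 {
		return
	}
	setDefaultMaxWait(time.Duration(secs) * time.Second)
}

// demoPriority walks the chain and reports each result in seconds.
func demoPriority() (constant, global, perCall, cleared int) {
	secs := func(d time.Duration) int { return int(d / time.Second) }
	setDefaultMaxWait(0)
	constant = secs(resolveMaxWait(0))
	applyBootEnv("120")
	global = secs(resolveMaxWait(0))
	perCall = secs(resolveMaxWait(30 * time.Second))
	setDefaultMaxWait(0)
	cleared = secs(resolveMaxWait(0))
	return
}

func main() {
	fmt.Println(demoPriority())
}
```

A plain package-level pointer is only safe when the setter runs once before any poller goroutine starts, which matches the "called exactly once at boot" intent stated in the comment; a setter that may run later would need synchronization.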
@@ -0,0 +1,122 @@
|
|||||||
|
// Copyright 2026 certctl LLC. All rights reserved.
|
||||||
|
// SPDX-License-Identifier: BUSL-1.1
|
||||||
|
|
||||||
|
package scheduler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math/rand/v2"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Phase 6 SCALE-M5 closure (2026-05-14): bounded-jitter wrapper
|
||||||
|
// around time.Timer to spread scheduler-loop tick co-fires.
|
||||||
|
//
|
||||||
|
// Pre-Phase-6 the 15 scheduler loops in scheduler.go each used a
|
||||||
|
// bare time.NewTicker(interval). When multiple loops share a
|
||||||
|
// nominal cadence (e.g. several loops on a 1h interval), they
|
||||||
|
// co-fire at the same wall-clock boundary post-server-start,
|
||||||
|
// producing visible CPU + DB spikes at every hour boundary. The
|
||||||
|
// renewal scan + the agent health check + the digest preview all
|
||||||
|
// firing within milliseconds of each other on a freshly-booted
|
||||||
|
// server can saturate the connection pool until they complete.
|
||||||
|
//
|
||||||
|
// JitteredTicker replaces the bare time.NewTicker with a goroutine
|
||||||
|
// that fires C once per interval ± jitterPct, drawn fresh on every
|
||||||
|
// tick. The base interval is the same as before; only the per-tick
|
||||||
|
// envelope changes. This preserves every loop's expected SLO (a
|
||||||
|
// renewal scan still runs ~once per hour) while breaking up the
|
||||||
|
// co-fire pattern.
|
||||||
|
//
|
||||||
|
// JitteredTicker.Stop() must be called by the caller (typically via
|
||||||
|
// defer) to release the goroutine. After Stop, the C channel is
|
||||||
|
// closed.
|
||||||
|
type JitteredTicker struct {
|
||||||
|
// C is the channel a tick fires on. Read this in the loop's
|
||||||
|
// select{} the same way you'd read time.Ticker.C.
|
||||||
|
C chan time.Time
|
||||||
|
|
||||||
|
stopCh chan struct{}
|
||||||
|
}
|
||||||
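Since C is closed once the ticker's goroutine exits, a consumer that keeps selecting on it after Stop would receive zero-value times in a tight loop unless it checks the receive's ok flag. The loop below is a minimal sketch of that consumer side; runLoop and demoLoop are hypothetical names, and a plain channel plays the role of JitteredTicker.C.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runLoop is a hypothetical scheduler loop body. It runs work once
// per tick and exits on context cancellation or when the tick
// channel is closed.
func runLoop(ctx context.Context, ticks <-chan time.Time, work func()) {
	for {
		select {
		case <-ctx.Done():
			return
		case _, ok := <-ticks:
			if !ok {
				// Channel closed (ticker stopped): exit instead of
				// spinning on zero-value receives.
				return
			}
			work()
		}
	}
}

// demoLoop feeds three ticks, closes the channel, and counts how
// many times the work function ran.
func demoLoop() int {
	ticks := make(chan time.Time, 3)
	for i := 0; i < 3; i++ {
		ticks <- time.Now()
	}
	close(ticks)

	runs := 0
	runLoop(context.Background(), ticks, func() { runs++ })
	return runs
}

func main() {
	fmt.Println(demoLoop())
}
```

This differs from time.Ticker, whose Stop does not close the channel, so loops migrated from time.NewTicker may need the extra ok check if they can outlive the Stop call.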
|
|
||||||
|
// NewJitteredTicker returns a ticker that fires on C every
|
||||||
|
// interval ± jitterPct (e.g. jitterPct=0.1 = ±10%). The first tick
|
||||||
|
// arrives one (jittered) interval after construction — same as
|
||||||
|
// time.NewTicker. jitterPct < 0 is treated as 0 (no jitter, equivalent
|
||||||
|
// to time.NewTicker). jitterPct ≥ 1 is clamped to 0.99 (avoid the
|
||||||
|
// degenerate "instant tick" case where the jitter consumes the
|
||||||
|
// entire interval).
|
||||||
|
//
|
||||||
|
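// interval must be > 0. Unlike time.NewTicker, which panics on a
// non-positive interval, this constructor does not validate it:
// time.NewTimer accepts any duration and run floors each wait at
// 1ns, so a zero or negative interval yields a busy-firing ticker.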
|
||||||
|
func NewJitteredTicker(interval time.Duration, jitterPct float64) *JitteredTicker {
|
||||||
|
if jitterPct < 0 {
|
||||||
|
jitterPct = 0
|
||||||
|
}
|
||||||
|
if jitterPct >= 1 {
|
||||||
|
jitterPct = 0.99
|
||||||
|
}
|
||||||
|
|
||||||
|
jt := &JitteredTicker{
|
||||||
|
C: make(chan time.Time, 1),
|
||||||
|
stopCh: make(chan struct{}),
|
||||||
|
}
|
||||||
|
|
||||||
|
go jt.run(interval, jitterPct)
|
||||||
|
return jt
|
||||||
|
}
|
||||||
|
|
||||||
|
// run owns the per-tick scheduling loop. The fresh-per-tick jitter
|
||||||
|
// draw prevents drift from compounding (vs. computing the jittered
|
||||||
|
// interval once and reusing it).
|
||||||
|
func (jt *JitteredTicker) run(interval time.Duration, jitterPct float64) {
|
||||||
|
defer close(jt.C)
|
||||||
|
|
||||||
|
for {
|
||||||
|
// Bounded-symmetric jitter around the interval. delta ∈
|
||||||
|
// [-jitterPct, +jitterPct) drawn fresh per tick.
|
||||||
|
delta := (rand.Float64()*2 - 1) * jitterPct
|
||||||
|
next := time.Duration(float64(interval) * (1 + delta))
|
||||||
|
// Floor at 1ns so we never feed a zero or negative
|
||||||
|
// duration into time.NewTimer; the jitterPct clamp above
|
||||||
|
// keeps next > 0 in normal use but a Float64 rounding
|
||||||
|
// edge case could otherwise produce 0.
|
||||||
|
if next < time.Nanosecond {
|
||||||
|
next = time.Nanosecond
|
||||||
|
}
|
||||||
|
|
||||||
|
timer := time.NewTimer(next)
|
||||||
|
select {
|
||||||
|
case t := <-timer.C:
|
||||||
|
select {
|
||||||
|
case jt.C <- t:
|
||||||
|
// emitted
|
||||||
|
case <-jt.stopCh:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case <-jt.stopCh:
|
||||||
|
if !timer.Stop() {
|
||||||
|
<-timer.C
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop releases the goroutine, which then closes C. Safe to call
// multiple times from a single goroutine: the select+default guard
// skips the close when stopCh is already closed (re-closing a
// closed channel would panic). The guard is not atomic, so
// concurrent Stop calls from different goroutines can still race.
|
||||||
|
func (jt *JitteredTicker) Stop() {
|
||||||
|
select {
|
||||||
|
case <-jt.stopCh:
|
||||||
|
// already closed; no-op
|
||||||
|
default:
|
||||||
|
close(jt.stopCh)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// DefaultSchedulerJitter is the jitter percentage applied to every
|
||||||
|
// scheduler-loop tick. ±10% is a commonly used "spread but don't
// blur SLO" envelope: wide enough to break up co-firing loops,
// narrow enough that each loop keeps its nominal cadence.
|
||||||
|
const DefaultSchedulerJitter = 0.10
|
||||||
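The per-tick arithmetic in run can be checked on its own by injecting the random draw. The sketch below copies the formula from the diff into a pure function; jittered and demoJitter are hypothetical names, and u stands for the value that rand.Float64() would return.

```go
package main

import (
	"fmt"
	"math/rand/v2"
	"time"
)

// jittered mirrors the arithmetic in run, with the random draw u in
// [0, 1) passed in so the result is deterministic for a given u.
func jittered(interval time.Duration, pct, u float64) time.Duration {
	delta := (u*2 - 1) * pct // maps [0, 1) onto [-pct, +pct)
	next := time.Duration(float64(interval) * (1 + delta))
	if next < time.Nanosecond {
		next = time.Nanosecond // same 1ns floor as the diff
	}
	return next
}

// demoJitter checks that the midpoint draw reproduces the nominal
// interval and that random draws stay inside the ±pct envelope.
func demoJitter() (midpointExact, allInBounds bool) {
	const pct = 0.10
	interval := time.Hour

	midpointExact = jittered(interval, pct, 0.5) == interval

	lo := jittered(interval, pct, 0) // shortest possible wait
	hi := time.Duration(float64(interval) * (1 + pct))
	allInBounds = true
	for i := 0; i < 10000; i++ {
		if d := jittered(interval, pct, rand.Float64()); d < lo || d > hi {
			allInBounds = false
		}
	}
	return
}

func main() {
	fmt.Println(jittered(time.Hour, 0.10, 0)) // lower edge of the envelope
	fmt.Println(demoJitter())
}
```

With a 1h interval and the 0.10 default, every wait falls between roughly 54 and 66 minutes, and because each draw is independent the long-run mean stays at the nominal interval.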
@@ -0,0 +1,198 @@
|
|||||||
|
// Copyright 2026 certctl LLC. All rights reserved.
|
||||||
|
// SPDX-License-Identifier: BUSL-1.1
|
||||||
|
|
||||||
|
package scheduler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Phase 6 SCALE-M5 contract pin (2026-05-14): JitteredTicker fires
|
||||||
|
// ~interval per tick with a bounded ±jitterPct envelope. The tests
|
||||||
|
// below are timing-sensitive but use generous tolerances + averaging
|
||||||
|
// across many ticks to stay stable under CI load.
|
||||||
|
|
||||||
|
func TestJitteredTicker_BoundedEnvelope(t *testing.T) {
|
||||||
|
const (
|
||||||
|
interval = 20 * time.Millisecond
|
||||||
|
jitterPct = 0.20 // ±20%
|
||||||
|
ticks = 30
|
||||||
|
)
|
||||||
|
|
||||||
|
jt := NewJitteredTicker(interval, jitterPct)
|
||||||
|
defer jt.Stop()
|
||||||
|
|
||||||
|
last := time.Now()
|
||||||
|
for i := 0; i < ticks; i++ {
|
||||||
|
select {
|
||||||
|
case now := <-jt.C:
|
||||||
|
gap := now.Sub(last)
|
||||||
|
last = now
|
||||||
|
|
||||||
|
// Bounded envelope: every tick should fall within
|
||||||
|
// [interval × (1-jitter), interval × (1+jitter)] plus a
|
||||||
|
// generous scheduling-slop tolerance for the test
|
||||||
|
// runtime. The first tick is allowed wider slop since
|
||||||
|
// goroutine startup may eat into the first interval.
|
||||||
|
minGap := time.Duration(float64(interval) * (1 - jitterPct))
|
||||||
|
maxGap := time.Duration(float64(interval)*(1+jitterPct)) + 50*time.Millisecond
|
||||||
|
if i == 0 {
|
||||||
|
minGap = 0 // first tick can land arbitrarily fast under CI scheduling pressure
|
||||||
|
}
|
||||||
|
|
||||||
|
if gap < minGap || gap > maxGap {
|
||||||
|
t.Errorf("tick %d gap=%v outside envelope [%v, %v]", i, gap, minGap, maxGap)
|
||||||
|
}
|
||||||
|
case <-time.After(5 * interval):
|
||||||
|
t.Fatalf("tick %d timed out (>5×interval); JitteredTicker stuck", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestJitteredTicker_MeanCloseToInterval(t *testing.T) {
|
||||||
|
// Statistical pin: across many ticks the mean gap should stay
// reasonably close to the nominal interval. A large deviation
// indicates a grossly biased jitter draw (e.g. a constant
// +jitterPct offset, which moves the mean to about interval × 1.3).
// A uniform always-positive draw only moves the mean to about
// interval × 1.15, which the 30% bound below does not catch.
//
// The 50ms interval + 50-tick sample is chosen so per-scheduler-quantum
// jitter (~1ms on Linux) is about 2% of the interval; the 30% bound
// below is generous enough to absorb CI scheduling noise.
|
||||||
|
const (
|
||||||
|
interval = 50 * time.Millisecond
|
||||||
|
jitterPct = 0.30
|
||||||
|
ticks = 50
|
||||||
|
)
|
||||||
|
|
||||||
|
jt := NewJitteredTicker(interval, jitterPct)
|
||||||
|
defer jt.Stop()
|
||||||
|
|
||||||
|
gaps := make([]time.Duration, 0, ticks)
|
||||||
|
last := time.Now()
|
||||||
|
|
||||||
|
for i := 0; i < ticks; i++ {
|
||||||
|
select {
|
||||||
|
case now := <-jt.C:
|
||||||
|
if i > 0 { // skip first gap (goroutine warmup)
|
||||||
|
gaps = append(gaps, now.Sub(last))
|
||||||
|
}
|
||||||
|
last = now
|
||||||
|
case <-time.After(5 * interval):
|
||||||
|
t.Fatalf("tick %d timed out", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var sum time.Duration
|
||||||
|
for _, g := range gaps {
|
||||||
|
sum += g
|
||||||
|
}
|
||||||
|
mean := sum / time.Duration(len(gaps))
|
||||||
|
|
||||||
|
// Bias threshold: a healthy jittered ticker should produce
// mean ≈ interval (mean drift < 10%). A uniform always-positive
// sign bug shifts the mean to interval × (1 + jitterPct / 2),
// i.e. +15% here, which is still inside the 30% bound; the bound
// only trips on a bias of roughly a full jitterPct or more. It is
// kept loose to tolerate CI scheduling noise + the (1 - x) vs
// (1 + x) asymmetry of multiplicative jitter; catching the +15%
// case would need a threshold between 10% and 15%.
|
||||||
|
driftPct := math.Abs(float64(mean-interval)) / float64(interval)
|
||||||
|
if driftPct > 0.30 {
|
||||||
|
t.Errorf("mean gap %v drifts %.1f%% from nominal interval %v (>30%% threshold)", mean, driftPct*100, interval)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestJitteredTicker_Stop_ReleasesGoroutine(t *testing.T) {
|
||||||
|
jt := NewJitteredTicker(50*time.Millisecond, 0.10)
|
||||||
|
|
||||||
|
// Stop immediately, before any tick fires.
|
||||||
|
jt.Stop()
|
||||||
|
|
||||||
|
// C should close within one tick interval. If it doesn't, the
|
||||||
|
// goroutine is stuck (which would leak in production).
|
||||||
|
select {
|
||||||
|
case _, ok := <-jt.C:
|
||||||
|
if ok {
|
||||||
|
// A tick fired before C closed — also acceptable, but
|
||||||
|
// drain it and re-check that close follows.
|
||||||
|
select {
|
||||||
|
case _, ok2 := <-jt.C:
|
||||||
|
if ok2 {
|
||||||
|
t.Errorf("JitteredTicker.C still emitting after Stop()")
|
||||||
|
}
|
||||||
|
case <-time.After(200 * time.Millisecond):
|
||||||
|
t.Errorf("JitteredTicker.C did not close after Stop()")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case <-time.After(200 * time.Millisecond):
|
||||||
|
t.Errorf("JitteredTicker.C did not close within 200ms of Stop()")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestJitteredTicker_Stop_Idempotent(t *testing.T) {
|
||||||
|
jt := NewJitteredTicker(50*time.Millisecond, 0.10)
|
||||||
|
|
||||||
|
// Multiple Stop() calls must not panic.
|
||||||
|
jt.Stop()
|
||||||
|
jt.Stop()
|
||||||
|
jt.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestJitteredTicker_ZeroJitter_BehavesLikeTicker(t *testing.T) {
|
||||||
|
// jitterPct=0 reduces to a deterministic ticker. Every gap after
// the first should be close to the interval (modulo scheduling noise).
|
||||||
|
const (
|
||||||
|
interval = 20 * time.Millisecond
|
||||||
|
ticks = 10
|
||||||
|
)
|
||||||
|
|
||||||
|
jt := NewJitteredTicker(interval, 0)
|
||||||
|
defer jt.Stop()
|
||||||
|
|
||||||
|
last := time.Now()
|
||||||
|
for i := 0; i < ticks; i++ {
|
||||||
|
select {
|
||||||
|
case now := <-jt.C:
|
||||||
|
gap := now.Sub(last)
|
||||||
|
last = now
|
||||||
|
// Allow generous slop for CI scheduling.
|
||||||
|
if i > 0 && (gap < interval/2 || gap > interval*3) {
|
||||||
|
t.Errorf("zero-jitter tick %d gap=%v far from interval=%v", i, gap, interval)
|
||||||
|
}
|
||||||
|
case <-time.After(5 * interval):
|
||||||
|
t.Fatalf("zero-jitter tick %d timed out", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestJitteredTicker_NegativeJitter_TreatedAsZero(t *testing.T) {
|
||||||
|
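// Defensive: a negative jitterPct is treated as zero jitter so it
// can never yield a non-positive delay. (time.NewTimer fires
// immediately on a non-positive duration rather than panicking;
// time.NewTicker is the call that panics.)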
|
||||||
|
jt := NewJitteredTicker(10*time.Millisecond, -0.5)
|
||||||
|
defer jt.Stop()
|
||||||
|
|
||||||
|
// Just confirm at least one tick fires without panic.
|
||||||
|
select {
|
||||||
|
case <-jt.C:
|
||||||
|
// ok
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Errorf("negative-jitter ticker produced no tick within 100ms")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestJitteredTicker_LargeJitter_ClampedBelowOne(t *testing.T) {
|
||||||
|
// Defensive: jitterPct ≥ 1 would otherwise allow next ≤ 0, which
// makes time.NewTimer fire immediately (a busy loop; time.NewTicker
// would panic). Confirm the clamped ticker still fires.
|
||||||
|
jt := NewJitteredTicker(10*time.Millisecond, 1.5)
|
||||||
|
defer jt.Stop()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-jt.C:
|
||||||
|
// ok
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Errorf("over-clamped-jitter ticker produced no tick within 100ms")
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -473,7 +473,7 @@ func (s *Scheduler) Start(ctx context.Context) <-chan struct{} {
 // If an error occurs, it logs the error but continues running.
 // Uses atomic.Bool to prevent duplicate execution if the previous check is still running.
 func (s *Scheduler) renewalCheckLoop(ctx context.Context) {
-	ticker := time.NewTicker(s.renewalCheckInterval)
+	ticker := NewJitteredTicker(s.renewalCheckInterval, DefaultSchedulerJitter)
 	defer ticker.Stop()
 
 	// Run immediately on start (with idempotency guard)
@@ -522,7 +522,7 @@ func (s *Scheduler) runRenewalCheck(ctx context.Context) {
 // If an error occurs, it logs the error but continues running.
 // Uses atomic.Bool to prevent duplicate execution if the previous job is still running.
 func (s *Scheduler) jobProcessorLoop(ctx context.Context) {
-	ticker := time.NewTicker(s.jobProcessorInterval)
+	ticker := NewJitteredTicker(s.jobProcessorInterval, DefaultSchedulerJitter)
 	defer ticker.Stop()
 
 	// Run immediately on start (with idempotency guard)
@@ -573,7 +573,7 @@ func (s *Scheduler) runJobProcessor(ctx context.Context) {
 // Uses atomic.Bool to prevent duplicate execution if the previous retry sweep
 // is still running.
 func (s *Scheduler) jobRetryLoop(ctx context.Context) {
-	ticker := time.NewTicker(s.jobRetryInterval)
+	ticker := NewJitteredTicker(s.jobRetryInterval, DefaultSchedulerJitter)
 	defer ticker.Stop()
 
 	// Run immediately on start (with idempotency guard)
@@ -628,7 +628,7 @@ func (s *Scheduler) runJobRetry(ctx context.Context) {
 // retry loop then auto-promotes eligible Failed jobs back to Pending. Closes
 // coverage gap I-003. Uses atomic.Bool to prevent duplicate execution.
 func (s *Scheduler) jobTimeoutLoop(ctx context.Context) {
-	ticker := time.NewTicker(s.jobTimeoutInterval)
+	ticker := NewJitteredTicker(s.jobTimeoutInterval, DefaultSchedulerJitter)
 	defer ticker.Stop()
 
 	// Run immediately on start (with idempotency guard)
@@ -706,7 +706,7 @@ func (s *Scheduler) runJobTimeout(ctx context.Context) {
 // If an error occurs, it logs the error but continues running.
 // Uses atomic.Bool to prevent duplicate execution if the previous check is still running.
 func (s *Scheduler) agentHealthCheckLoop(ctx context.Context) {
-	ticker := time.NewTicker(s.agentHealthCheckInterval)
+	ticker := NewJitteredTicker(s.agentHealthCheckInterval, DefaultSchedulerJitter)
 	defer ticker.Stop()
 
 	// Run immediately on start (with idempotency guard)
@@ -754,7 +754,7 @@ func (s *Scheduler) runAgentHealthCheck(ctx context.Context) {
 // If an error occurs, it logs the error but continues running.
 // Uses atomic.Bool to prevent duplicate execution if the previous process is still running.
 func (s *Scheduler) notificationProcessLoop(ctx context.Context) {
-	ticker := time.NewTicker(s.notificationProcessInterval)
+	ticker := NewJitteredTicker(s.notificationProcessInterval, DefaultSchedulerJitter)
 	defer ticker.Stop()
 
 	// Run immediately on start (with idempotency guard)
@@ -806,7 +806,7 @@ func (s *Scheduler) runNotificationProcess(ctx context.Context) {
 // Uses atomic.Bool to prevent duplicate execution if the previous retry sweep
 // is still running. Mirrors the I-001 jobRetryLoop topology byte-for-byte.
 func (s *Scheduler) notificationRetryLoop(ctx context.Context) {
-	ticker := time.NewTicker(s.notificationRetryInterval)
+	ticker := NewJitteredTicker(s.notificationRetryInterval, DefaultSchedulerJitter)
 	defer ticker.Stop()
 
 	// Run immediately on start (with idempotency guard)
@@ -861,7 +861,7 @@ func (s *Scheduler) runNotificationRetry(ctx context.Context) {
 // no CRL/OCSP needed.
 // Uses atomic.Bool to prevent duplicate execution if the previous check is still running.
 func (s *Scheduler) shortLivedExpiryCheckLoop(ctx context.Context) {
-	ticker := time.NewTicker(s.shortLivedExpiryCheckInterval)
+	ticker := NewJitteredTicker(s.shortLivedExpiryCheckInterval, DefaultSchedulerJitter)
 	defer ticker.Stop()
 
 	// Run immediately on start (with idempotency guard)
@@ -909,7 +909,7 @@ func (s *Scheduler) runShortLivedExpiryCheck(ctx context.Context) {
 // of configured network targets.
 // Uses atomic.Bool to prevent duplicate execution if the previous scan is still running.
 func (s *Scheduler) networkScanLoop(ctx context.Context) {
-	ticker := time.NewTicker(s.networkScanInterval)
+	ticker := NewJitteredTicker(s.networkScanInterval, DefaultSchedulerJitter)
 	defer ticker.Stop()
 
 	// Run immediately on start (with idempotency guard)
@@ -956,7 +956,7 @@ func (s *Scheduler) runNetworkScan(ctx context.Context) {
 // digestLoop runs every digestInterval and generates/sends certificate digest emails.
 // Uses atomic.Bool to prevent duplicate execution if the previous digest is still running.
 func (s *Scheduler) digestLoop(ctx context.Context) {
-	ticker := time.NewTicker(s.digestInterval)
+	ticker := NewJitteredTicker(s.digestInterval, DefaultSchedulerJitter)
 	defer ticker.Stop()
 
 	// Do NOT run immediately on start for digest — wait for the first tick.
@@ -999,7 +999,7 @@ func (s *Scheduler) runDigest(ctx context.Context) {
 // resource-intensive. Wait for the first tick.
 // Uses atomic.Bool to prevent duplicate execution if the previous check is still running.
 func (s *Scheduler) healthCheckLoop(ctx context.Context) {
-	ticker := time.NewTicker(s.healthCheckInterval)
+	ticker := NewJitteredTicker(s.healthCheckInterval, DefaultSchedulerJitter)
 	defer ticker.Stop()
 
 	// Do NOT run immediately on start for health checks — wait for the first tick.
@@ -1041,7 +1041,7 @@ func (s *Scheduler) runHealthCheck(ctx context.Context) {
 // Runs immediately on start, then on each tick. Same idempotency pattern as networkScanLoop.
 // Uses atomic.Bool to prevent duplicate execution if the previous scan is still running.
 func (s *Scheduler) cloudDiscoveryLoop(ctx context.Context) {
-	ticker := time.NewTicker(s.cloudDiscoveryInterval)
+	ticker := NewJitteredTicker(s.cloudDiscoveryInterval, DefaultSchedulerJitter)
 	defer ticker.Stop()
 
 	// Run immediately on start (with idempotency guard)
@@ -1121,7 +1121,7 @@ func (s *Scheduler) WaitForCompletion(timeout time.Duration) error {
 //
 // Bundle CRL/OCSP-Responder Phase 3.
 func (s *Scheduler) crlGenerationLoop(ctx context.Context) {
-	ticker := time.NewTicker(s.crlGenerationInterval)
+	ticker := NewJitteredTicker(s.crlGenerationInterval, DefaultSchedulerJitter)
 	defer ticker.Stop()
 
 	// Do NOT run immediately on start. CRLs are typically valid for
@@ -1171,7 +1171,7 @@ var ErrSchedulerShutdownTimeout = errors.New("scheduler graceful shutdown timeou
 // sync.WaitGroup tracks the in-flight goroutine for graceful shutdown.
 // Phase 5.
 func (s *Scheduler) acmeGCLoop(ctx context.Context) {
-	ticker := time.NewTicker(s.acmeGCInterval)
+	ticker := NewJitteredTicker(s.acmeGCInterval, DefaultSchedulerJitter)
 	defer ticker.Stop()
 
 	for {
@@ -1212,7 +1212,7 @@ func (s *Scheduler) acmeGCLoop(ctx context.Context) {
 // file: a stuck Postgres can't block the next tick, and concurrent
 // sweeps are skipped not queued.
 func (s *Scheduler) sessionGCLoop(ctx context.Context) {
-	ticker := time.NewTicker(s.sessionGCInterval)
+	ticker := NewJitteredTicker(s.sessionGCInterval, DefaultSchedulerJitter)
 	defer ticker.Stop()
 
 	for {
@@ -0,0 +1,55 @@
#!/usr/bin/env bash
# scripts/ci-guards/no-bare-newticker-in-scheduler.sh
#
# Phase 6 SCALE-M5 closure (2026-05-14): block any future
# `time.NewTicker(...)` use inside internal/scheduler/scheduler.go.
#
# Phase 6 migrated all 15 scheduler-loop ticker sites from bare
# time.NewTicker(interval) to NewJitteredTicker(interval,
# DefaultSchedulerJitter) so multiple loops with the same cadence
# don't co-fire and produce hour-boundary CPU + DB spikes. A future
# refactor that re-introduces bare NewTicker would silently regress
# the spreading behavior.
#
# The guard:
#   - Greps for `time.NewTicker(` in internal/scheduler/scheduler.go
#     ONLY (the jitter helper lives in a separate file and is allowed
#     to wrap time.NewTimer internally).
#   - Fails the build on ANY match.
#
# Adding a new ticker site: use NewJitteredTicker instead. The base
# interval stays operator-configurable via the existing scheduler
# config fields; jitter is added on top.

set -e

TARGET="internal/scheduler/scheduler.go"

if [ ! -f "$TARGET" ]; then
  echo "no-bare-newticker-in-scheduler: skipped — $TARGET not found"
  exit 0
fi

hits=$(grep -cE 'time\.NewTicker\(' "$TARGET" || true)

if [ "$hits" -gt 0 ]; then
  echo "::error::no-bare-newticker-in-scheduler regression: $hits bare time.NewTicker(...) call(s) in $TARGET"
  echo ""
  echo "All scheduler-loop tickers MUST use NewJitteredTicker (Phase 6 SCALE-M5)."
  echo "Replace 'ticker := time.NewTicker(interval)' with"
  echo " 'ticker := NewJitteredTicker(interval, DefaultSchedulerJitter)'"
  echo ""
  echo "Offending lines:"
  grep -nE 'time\.NewTicker\(' "$TARGET" || true
  exit 1
fi

echo "no-bare-newticker-in-scheduler: clean — 0 bare NewTicker sites in scheduler.go"

# Belt-and-suspenders: confirm the JitteredTicker site count is
# non-trivial (regression catch where someone replaced the bare
# tickers with no-op direct-fire shims).
jitter_hits=$(grep -cE 'NewJitteredTicker\(' "$TARGET" || true)
if [ "$jitter_hits" -lt 10 ]; then
  echo "::warning::no-bare-newticker-in-scheduler: only $jitter_hits JitteredTicker sites in scheduler.go (expected ≥ 10 — Phase 6 baseline was 15)"
fi