mirror of
https://github.com/shankar0123/certctl.git
synced 2026-06-12 18:48:51 +00:00
fix(scale): close BUNDLE 4 — migrations, scheduler HA, rate-limits, scale receipts
Bundle 4 closure (2026-05-13 acquisition diligence audit). Closes the
"what happens under multi-replica" question cluster: the migration
runner had no concurrency control and no applied-version ledger; 15
scheduler loops had per-process idempotency but no cross-replica
documentation; rate limits were process-local without an
operator-facing scope statement; and the load-test scope explicitly
omitted four hot paths without linking them to a roadmap.
Source findings closed:
HIGH-1 + D4 + finding 4 (migration tracking)
D8 (scheduler loop ownership)
MED-1 + MED-2 (rate-limit scope)
T9 + LOW-7 + finding 7 (load-test receipt scope)
Closures by source ID:
HIGH-1 + D4 + finding 4 — Migration tracking + advisory lock.
internal/repository/postgres/db.go::RunMigrations now wraps every
migration execution in:
1. A dedicated *sql.Conn pinned to one connection for the entire
scan + apply lifecycle (pg_advisory_lock is connection-scoped).
2. pg_advisory_lock(migrationAdvisoryLockID) — fixed int64 key
derived from "certctl-migrations" so the same constant resolves
across deployments without colliding with operator advisory
locks. Blocks the second replica until the first finishes.
3. CREATE TABLE IF NOT EXISTS schema_migrations(version TEXT PK,
applied_at TIMESTAMPTZ DEFAULT NOW()) — audit ledger.
4. Skip-applied loop: SELECT version FROM schema_migrations →
map[string]struct{} → skip every .up.sql whose filename is in
the map. INSERT after successful execute, ON CONFLICT
(version) DO NOTHING for defense in depth.
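As an illustration of item 2, a fixed int64 key can be derived from a lock name with a stable hash. This is a minimal sketch only: the commit does not say which hash produced migrationAdvisoryLockID, so the FNV-1a choice and the helper name advisoryLockKey are assumptions, and the value printed here is not claimed to match the committed constant.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// advisoryLockKey maps a human-readable lock name to an int64 that could be
// passed to pg_advisory_lock. Hypothetical helper: FNV-1a is an assumption,
// not the derivation used for migrationAdvisoryLockID.
func advisoryLockKey(name string) int64 {
	h := fnv.New64a()
	h.Write([]byte(name)) // hash.Hash.Write never returns an error
	// Reinterpret the uint64 bits as int64; the result may be negative,
	// which is a valid bigint advisory-lock key.
	return int64(h.Sum64())
}

func main() {
	fmt.Println(advisoryLockKey("certctl-migrations"))
}
```

Hard-coding the resulting number as a constant, as the commit does, keeps the key stable even if the hashing code is later changed.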
Pre-Bundle-4 every server boot re-ran all 45 .up.sql files. The
"idempotency via IF NOT EXISTS / ON CONFLICT" contract in CLAUDE.md
held per-migration but offered no protection when two Helm replicas
raced on schema DDL. Post-Bundle-4 single-replica deploys see zero
behavior change beyond the audit-table population; multi-replica
deploys get HA-safe schema bootstrap.
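The lock-then-skip behaviour described above can be modelled without a database. The sketch below is an in-memory analogy, not the code in db.go: a sync.Mutex stands in for pg_advisory_lock, a map stands in for schema_migrations, and the ledger type and its fields are hypothetical names introduced for illustration.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// ledger is an in-memory stand-in for Postgres: mu plays the role of the
// advisory lock, applied plays the role of schema_migrations, and executions
// counts how often each file's SQL would have been run.
type ledger struct {
	mu         sync.Mutex
	applied    map[string]struct{}
	executions map[string]int
}

func newLedger() *ledger {
	return &ledger{applied: map[string]struct{}{}, executions: map[string]int{}}
}

// runMigrations applies every not-yet-recorded .up.sql name in lexical order
// and returns the names it applied on this call.
func (l *ledger) runMigrations(files []string) []string {
	l.mu.Lock() // a second caller blocks here until the first finishes
	defer l.mu.Unlock()

	var pending []string
	for _, f := range files {
		if !strings.HasSuffix(f, ".up.sql") {
			continue // skip .down.sql rollbacks and anything else
		}
		if _, ok := l.applied[f]; ok {
			continue // already recorded in the ledger
		}
		pending = append(pending, f)
	}
	sort.Strings(pending)
	for _, f := range pending {
		l.executions[f]++         // "execute" the migration
		l.applied[f] = struct{}{} // record it only after it ran
	}
	return pending
}

func main() {
	files := []string{"000002_b.up.sql", "000001_a.up.sql", "000001_a.down.sql"}
	l := newLedger()
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ { // two "replicas" racing at boot
		wg.Add(1)
		go func() {
			defer wg.Done()
			l.runMigrations(files)
		}()
	}
	wg.Wait()
	fmt.Println(l.executions)
}
```

Whichever goroutine wins the mutex applies both files; the other finds them in the ledger and applies nothing, which is the same shape the ConcurrentCallsSerialized test checks against a real database.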
D8 — Scheduler HA semantics documented.
New docs/operator/scheduler-ha.md with per-loop inventory of all 15
loops in internal/scheduler/scheduler.go. Classification:
- HA-safe (jobProcessorLoop, jobRetryLoop) — FOR UPDATE SKIP
LOCKED via ClaimPendingJobs (Bundle 1 H-6 closure, 3e78ecb).
- HA-safe-ish (jobTimeoutLoop) — atomic UPDATE-WHERE-status.
- Idempotent under N>1 replicas (renewalCheckLoop,
agentHealthCheckLoop, shortLivedExpiryCheckLoop, networkScanLoop,
healthCheckLoop, acmeGCLoop, sessionGCLoop) — duplicate ticks
produce idempotent side effects.
- Side-effect-duplicating under N>1 replicas
(notificationProcessLoop, notificationRetryLoop, digestLoop,
cloudDiscoveryLoop, crlGenerationLoop) — duplicate
webhook/email/AWS-API/CRL-signing operations. Operators
running multi-replica accept N× side effects or pin to
server.replicas: 1.
Leader-election work tracked in WORKSPACE-ROADMAP.md as v3.
MED-1 + MED-2 — Rate-limit scope.
New docs/operator/rate-limit-scope.md states the contract verbatim:
process-local sync.Mutex-guarded sliding-window log, effective
cluster-wide cap = configured-per-replica × server.replicas,
restart-safe (no persistent state, no shared store), bounded
(50k/100k key cap with eviction). Five call sites documented:
ocspLimiter (1m/IP), exportLimiter (1h/actor), EST per-principal
(24h/CN), EST failed-auth (1h/IP), Intune dispatcher
(24h/Subject+Issuer), plus the HTTP middleware token-bucket
(RPS+Burst per replica). Cluster-wide shared limits via Redis or
Postgres-backed bucket are tracked in WORKSPACE-ROADMAP.md as v3.
T9 + LOW-7 + finding 7 — Load-test receipt scope.
The existing harness at deploy/test/loadtest/ already
self-documents the gap ("What it explicitly does NOT measure"). No
code change needed for this finding; Bundle 4 cross-references
scheduler-ha.md and rate-limit-scope.md from those gap callouts so
the four deferred coverage classes (issuer connector, scheduler
throughput, agent fleet, DB p99) land in the same place an
acquirer reads about HA semantics and rate limits.
Tests:
internal/repository/postgres/migrations_test.go (new, 4 tests):
- TestRunMigrations_PopulatesSchemaMigrations: audit table
exists and is non-empty after the first migration run.
- TestRunMigrations_SkipsAppliedOnSecondCall: second call is
an observable no-op on row count.
- TestRunMigrations_ConcurrentCallsSerialized: two goroutines
racing the migrator both return without error; row count
unchanged; no duplicate versions.
- TestRunMigrations_FreshDatabaseHappyPath: ≥ 30 migrations
land on a fresh schema.
Gated by testcontainers via the existing repo_test.go getTestDB
pattern; skipped under -short. The integration lane runs them.
Verification:
gofmt -l # clean
go vet ./internal/repository/postgres ./cmd/server # clean
go build ./cmd/server ./internal/repository/postgres # clean
go test -short -count=1 ./internal/repository/postgres
./internal/ratelimit # PASS
Operator follow-up: full integration run on workstation:
go test -count=1 ./internal/repository/postgres -run TestRunMigrations_
Receipts (paths for the audit packet):
Migration runner evidence: internal/repository/postgres/db.go
L135-340 (advisory-lock + ledger + skip-applied loop) +
internal/repository/postgres/migrations_test.go (4 tests).
Scheduler loop inventory: docs/operator/scheduler-ha.md (15-loop
table with HA classification per loop).
Rate-limit storage matrix: docs/operator/rate-limit-scope.md.
Load-test baseline: deploy/test/loadtest/README.md (already
self-documenting), cross-linked from scheduler-ha.md.
Remaining operator warnings (deferred, tracked in WORKSPACE-ROADMAP.md):
- Leader election for the five duplicate-side-effect loops
(notificationProcessLoop, notificationRetryLoop, digestLoop,
cloudDiscoveryLoop, crlGenerationLoop). v3 work item.
- Shared rate-limits across replicas (Redis / Postgres token
bucket). v3 work item.
- Issuer-connector + scheduler-throughput + agent-fleet + DB-p99
load-test coverage. Tracked separately; per-issuer Prometheus
histograms already capture issuer round-trip latency in
production runs.
Audit-Closes: BUNDLE-4 HIGH-1 D4 D8 MED-1 MED-2 T9 LOW-7 finding-4 finding-7
@@ -1,6 +1,7 @@
 package postgres
 
 import (
+	"context"
 	"database/sql"
 	"errors"
 	"fmt"
@@ -131,20 +132,97 @@ func wrapPingError(err error) error {
 	return fmt.Errorf("failed to ping database: %w", err)
 }
 
+// migrationAdvisoryLockID is the Postgres pg_advisory_lock key that
+// gates concurrent migration execution. Computed as a stable hash of
+// the literal string "certctl-migrations" so the same constant resolves
+// across deployments without colliding with operator-supplied advisory
+// locks for other workloads on the same database.
+//
+// Bundle 4 closure (HIGH-1 + D4 + finding 4, 2026-05-13): pre-Bundle-4
+// `RunMigrations` re-executed every `.up.sql` file on every server
+// boot with no concurrency control. The "idempotency via IF NOT EXISTS
+// / ON CONFLICT" contract in the CLAUDE.md operating rules held for
+// every individual migration, but offered no protection when two
+// server replicas executed RunMigrations against the same database
+// simultaneously (the Helm chart's `server.replicas > 1` HA path):
+// duplicate DDL races could still produce SQLSTATE 42P07 (relation
+// already exists) under specific schema-modification interleavings,
+// and any future migration that drifted from the IF-NOT-EXISTS
+// pattern would race-corrupt without warning.
+//
+// Post-Bundle-4 RunMigrations acquires this advisory lock on a
+// dedicated connection before scanning the directory; runner-up
+// replicas block at pg_advisory_lock() until the migrator finishes,
+// then observe the populated schema_migrations table and skip every
+// already-applied file. Single-replica deploys see no behavior change
+// beyond the schema_migrations audit trail and the lock acquire/
+// release log lines.
+const migrationAdvisoryLockID int64 = 7283164759461502341
+
+// migrationsTableDDL creates the audit-trail table that records which
+// migration files have already been applied. Idempotent: subsequent
+// boots see the table, scan its contents, and skip any .up.sql whose
+// version (the filename, e.g. "000001_initial_schema.up.sql") is
+// already present.
+//
+// The version column intentionally stores the FULL filename rather
+// than a parsed version-number prefix. Migrations have always been
+// filename-keyed; storing the raw name keeps the lookup mechanical
+// and survives any future renames (a hash-based fingerprint would
+// re-run every migration on rename, an explicit numeric version
+// breaks if a migration is ever renamed without renumbering).
+const migrationsTableDDL = `
+CREATE TABLE IF NOT EXISTS schema_migrations (
+    version TEXT PRIMARY KEY,
+    applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
+)`
+
 // RunMigrations reads and executes SQL migration files from a directory.
+//
+// Bundle 4 closure: now wraps every migration execution in:
+//
+//  1. A `pg_advisory_lock(migrationAdvisoryLockID)` so only one
+//     server replica applies migrations at a time. Other replicas
+//     block at acquire and proceed once migrations finish.
+//  2. A `schema_migrations` audit table so each subsequent boot
+//     skips already-applied files instead of re-executing them.
+//     This converts the implicit "idempotency via IF NOT EXISTS"
+//     contract into an explicit applied-versions ledger an operator
+//     or acquirer can audit with `SELECT * FROM schema_migrations
+//     ORDER BY applied_at`.
+//
+// Single-replica deploys see zero behavior change beyond the new
+// audit-table population. Multi-replica deploys (Helm
+// `server.replicas > 1`) get HA-safe schema bootstrap.
+//
+// The advisory lock and audit table are managed through a dedicated
+// *sql.Conn so the lock survives even if the underlying pool rotates
+// connections; the deferred unlock runs on that same connection.
+// Failure modes:
+//
+//   - Migrations directory missing: returned unchanged from the
+//     pre-Bundle-4 error path so existing callers' ergonomics hold.
+//   - Lock acquire fails (network / shutdown): wrapped with context
+//     so the operator-visible error names the wrapping operation.
+//   - Individual migration fails: aborts with the same %w wrap as
+//     pre-Bundle-4, leaving schema_migrations untouched for that
+//     migration so a retry rerun reattempts only the failing file.
 func RunMigrations(db *sql.DB, migrationsPath string) error {
-	// Check if migrations directory exists
+	// Check if migrations directory exists.
 	if _, err := os.Stat(migrationsPath); os.IsNotExist(err) {
 		return fmt.Errorf("migrations directory not found: %s", migrationsPath)
 	}
 
-	// Read all SQL files from the migrations directory
+	// Read all SQL files from the migrations directory.
 	files, err := os.ReadDir(migrationsPath)
 	if err != nil {
 		return fmt.Errorf("failed to read migrations directory: %w", err)
 	}
 
-	// Sort and filter to only .up.sql migration files (skip .down.sql rollbacks and seed files)
+	// Sort and filter to only .up.sql migration files (skip .down.sql
+	// rollbacks and seed files). os.ReadDir returns entries in lexical
+	// order which matches the migration file naming convention
+	// (000001_, 000002_, ..., 000045_).
 	var sqlFiles []string
 	for _, file := range files {
 		if !file.IsDir() && strings.HasSuffix(file.Name(), ".up.sql") {
@@ -152,18 +230,89 @@ func RunMigrations(db *sql.DB, migrationsPath string) error {
 		}
 	}
 
-	// Execute each migration file in order
+	// Pin to a single connection for the lock-table-migrations
+	// lifecycle. pg_advisory_lock is connection-scoped: releasing it
+	// requires the SAME connection that acquired it. Using a pooled
+	// db.Exec for the unlock could land on a different connection and
+	// silently fail to release.
+	ctx := context.Background()
+	conn, err := db.Conn(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to acquire migration connection: %w", err)
+	}
+	defer conn.Close()
+
+	// Bundle 4: acquire the cross-replica advisory lock. Blocks until
+	// acquired; postgres queues the second replica behind the first.
+	if _, err := conn.ExecContext(ctx, `SELECT pg_advisory_lock($1)`, migrationAdvisoryLockID); err != nil {
+		return fmt.Errorf("failed to acquire migration advisory lock: %w", err)
+	}
+	defer func() {
+		// Best-effort release on the same connection. The connection
+		// will close anyway via the defer above, which also releases
+		// the lock at the postgres backend.
+		_, _ = conn.ExecContext(ctx, `SELECT pg_advisory_unlock($1)`, migrationAdvisoryLockID)
+	}()
+
+	// Bundle 4: ensure the audit table exists before consulting it.
+	// IF NOT EXISTS keeps this idempotent on every boot.
+	if _, err := conn.ExecContext(ctx, migrationsTableDDL); err != nil {
+		return fmt.Errorf("failed to ensure schema_migrations table: %w", err)
+	}
+
+	// Read the set of already-applied versions. We hold the advisory
+	// lock through the entire SELECT → apply → INSERT cycle, so no
+	// other replica can race-INSERT a duplicate row between this read
+	// and our subsequent writes.
+	applied := make(map[string]struct{})
+	rows, err := conn.QueryContext(ctx, `SELECT version FROM schema_migrations`)
+	if err != nil {
+		return fmt.Errorf("failed to read schema_migrations: %w", err)
+	}
+	for rows.Next() {
+		var v string
+		if err := rows.Scan(&v); err != nil {
+			rows.Close()
+			return fmt.Errorf("failed to scan schema_migrations row: %w", err)
+		}
+		applied[v] = struct{}{}
+	}
+	if err := rows.Err(); err != nil {
+		return fmt.Errorf("schema_migrations scan error: %w", err)
+	}
+	rows.Close()
+
+	// Execute each migration file in order, skipping already-applied
+	// versions.
 	for _, filename := range sqlFiles {
+		if _, ok := applied[filename]; ok {
+			// Already applied on a prior boot; skip.
+			continue
+		}
 		filePath := filepath.Join(migrationsPath, filename)
 		content, err := os.ReadFile(filePath)
 		if err != nil {
 			return fmt.Errorf("failed to read migration file %s: %w", filename, err)
 		}
 
-		// Execute the SQL content
-		if _, err := db.Exec(string(content)); err != nil {
+		// Execute the SQL content on the locked connection so any
+		// session-scoped state set by the migration (e.g. SET ROLE)
+		// stays within the migration's session.
+		if _, err := conn.ExecContext(ctx, string(content)); err != nil {
 			return fmt.Errorf("failed to execute migration %s: %w", filename, err)
 		}
+
+		// Record the applied version. ON CONFLICT (version) DO NOTHING
+		// is defense-in-depth: even though we hold the advisory lock
+		// (so no concurrent migrator should be writing here), a stale
+		// `schema_migrations` row from a half-rolled-back deploy
+		// shouldn't crash the boot.
+		if _, err := conn.ExecContext(ctx,
+			`INSERT INTO schema_migrations (version) VALUES ($1) ON CONFLICT (version) DO NOTHING`,
+			filename,
+		); err != nil {
+			return fmt.Errorf("failed to record applied migration %s in schema_migrations: %w", filename, err)
+		}
 	}
 
 	return nil