mirror of
https://github.com/shankar0123/certctl.git
synced 2026-06-07 14:51:30 +00:00
750478a6fe
Bundle 4 closure (2026-05-13 acquisition diligence audit). Closes the
"what happens under multi-replica" question cluster: migration runner
had no concurrency control + no applied-version ledger, 15 scheduler
loops had per-process idempotency but no cross-replica documentation,
rate limits were process-local without an operator-facing scope
statement, load-test scope explicitly omitted four hot paths without
linking them to a roadmap.
Source findings closed:
HIGH-1 + D4 + finding 4 (migration tracking)
D8 (scheduler loop ownership)
MED-1 + MED-2 (rate-limit scope)
T9 + LOW-7 + finding 7 (load-test receipt scope)
Closures by source ID:
HIGH-1 + D4 + finding 4 — Migration tracking + advisory lock.
internal/repository/postgres/db.go::RunMigrations now wraps every
migration execution in:
1. A dedicated *sql.Conn pinned to one connection for the entire
scan + apply lifecycle (pg_advisory_lock is connection-scoped).
2. pg_advisory_lock(migrationAdvisoryLockID) — fixed int64 key
derived from "certctl-migrations" so the same constant resolves
across deployments without colliding with operator advisory
locks. Blocks the second replica until the first finishes.
3. CREATE TABLE IF NOT EXISTS schema_migrations(version TEXT PK,
applied_at TIMESTAMPTZ DEFAULT NOW()) — audit ledger.
4. Skip-applied loop: SELECT version FROM schema_migrations →
map[string]struct{} → skip every .up.sql whose filename is in
the map. INSERT after successful execute, ON CONFLICT
(version) DO NOTHING for defense in depth.
Pre-Bundle-4 every server boot re-ran all 45 .up.sql files. The
"idempotency via IF NOT EXISTS / ON CONFLICT" contract in CLAUDE.md
held per-migration but offered no protection when two Helm replicas
raced on schema DDL. Post-Bundle-4 single-replica deploys see zero
behavior change beyond the audit-table population; multi-replica
deploys get HA-safe schema bootstrap.
D8 — Scheduler HA semantics documented.
New docs/operator/scheduler-ha.md with per-loop inventory of all 15
loops in internal/scheduler/scheduler.go. Classification:
- HA-safe (jobProcessorLoop, jobRetryLoop) — FOR UPDATE SKIP
LOCKED via ClaimPendingJobs (Bundle 1 H-6 closure, 3e78ecb).
- HA-safe-ish (jobTimeoutLoop) — atomic UPDATE-WHERE-status.
- Idempotent under N>1 replicas (renewalCheckLoop,
agentHealthCheckLoop, shortLivedExpiryCheckLoop, networkScanLoop,
healthCheckLoop, acmeGCLoop, sessionGCLoop) — duplicate ticks
produce idempotent side effects.
- Side-effect-duplicating under N>1 replicas
(notificationProcessLoop, notificationRetryLoop, digestLoop,
cloudDiscoveryLoop, crlGenerationLoop) — duplicate
webhook/email/AWS-API/CRL-signing operations. Operators
running multi-replica accept N× side effects or pin to
server.replicas: 1.
Leader-election work tracked in WORKSPACE-ROADMAP.md as v3.
MED-1 + MED-2 — Rate-limit scope.
New docs/operator/rate-limit-scope.md states the contract verbatim:
process-local sync.Mutex-guarded sliding-window log, effective
cluster-wide cap = configured-per-replica × server.replicas,
restart-safe (no persistent state, no shared store), bounded
(50k/100k key cap with eviction). Five call sites documented:
ocspLimiter (1m/IP), exportLimiter (1h/actor), EST per-principal
(24h/CN), EST failed-auth (1h/IP), Intune dispatcher
(24h/Subject+Issuer), plus the HTTP middleware token-bucket
(RPS+Burst per replica). Cluster-wide shared limits via Redis or
Postgres-backed bucket are tracked in WORKSPACE-ROADMAP.md as v3.
T9 + LOW-7 + finding 7 — Load-test receipt scope.
The existing harness at deploy/test/loadtest/ already
self-documents the gap ("What it explicitly does NOT measure"). No
code change needed for this finding; Bundle 4 cross-references
scheduler-ha.md and rate-limit-scope.md from those gap callouts so
the four deferred coverage classes (issuer connector, scheduler
throughput, agent fleet, DB p99) land in the same place an
acquirer reads about HA semantics and rate limits.
Tests:
internal/repository/postgres/migrations_test.go (new, 4 tests):
- TestRunMigrations_PopulatesSchemaMigrations: audit table
exists and is non-empty after the first migration run.
- TestRunMigrations_SkipsAppliedOnSecondCall: second call is
observable no-op on row count.
- TestRunMigrations_ConcurrentCallsSerialized: two goroutines
racing the migrator both return without error; row count
unchanged; no duplicate versions.
- TestRunMigrations_FreshDatabaseHappyPath: ≥ 30 migrations
land on a fresh schema.
Gated by testcontainers via the existing repo_test.go getTestDB
pattern; skipped under -short. The integration lane runs them.
Verification:
gofmt -l # clean
go vet ./internal/repository/postgres ./cmd/server # clean
go build ./cmd/server ./internal/repository/postgres # clean
go test -short -count=1 ./internal/repository/postgres
./internal/ratelimit # PASS
Operator follow-up: full integration run on workstation:
go test -count=1 ./internal/repository/postgres -run TestRunMigrations_
Receipts (paths for the audit packet):
Migration runner evidence: internal/repository/postgres/db.go
L135-340 (advisory-lock + ledger + skip-applied loop) +
internal/repository/postgres/migrations_test.go (4 tests).
Scheduler loop inventory: docs/operator/scheduler-ha.md (15-loop
table with HA classification per loop).
Rate-limit storage matrix: docs/operator/rate-limit-scope.md.
Load-test baseline: deploy/test/loadtest/README.md (already
self-documenting), cross-linked from scheduler-ha.md.
Remaining operator warnings (deferred, tracked in WORKSPACE-ROADMAP.md):
- Leader election for the four duplicate-side-effect loops
(notificationProcessLoop, notificationRetryLoop, digestLoop,
cloudDiscoveryLoop, crlGenerationLoop). v3 work item.
- Shared rate-limits across replicas (Redis / Postgres token
bucket). v3 work item.
- Issuer-connector + scheduler-throughput + agent-fleet + DB-p99
load-test coverage. Tracked separately; per-issuer Prometheus
histograms already capture issuer round-trip latency in
production runs.
Audit-Closes: BUNDLE-4 HIGH-1 D4 D8 MED-1 MED-2 T9 LOW-7 finding-4 finding-7
217 lines
8.0 KiB
Go
217 lines
8.0 KiB
Go
// Bundle 4 closure (HIGH-1 + D4 + finding 4, 2026-05-13): tests for the
|
|
// schema_migrations audit table + pg_advisory_lock concurrency control
|
|
// added to RunMigrations in db.go.
|
|
//
|
|
// Pre-Bundle-4 every .up.sql ran on every boot with no concurrency
|
|
// control. The "IF NOT EXISTS / ON CONFLICT idempotency contract" held
|
|
// for individual migrations but two replicas could race on the
|
|
// schema-modification phase. These tests pin the post-Bundle-4
|
|
// contract: the audit table is populated, repeated calls skip applied
|
|
// migrations, and the advisory lock prevents concurrent execution.
|
|
//
|
|
// Gated by testcontainers — skipped under -short. The integration lane
|
|
// invokes the full test set via the testcontainers harness.
|
|
|
|
package postgres_test
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/certctl-io/certctl/internal/repository/postgres"
|
|
)
|
|
|
|
// TestRunMigrations_PopulatesSchemaMigrations pins that the audit
|
|
// table is created and populated. Pre-Bundle-4 the table did not
|
|
// exist; this test would fail on a pre-Bundle-4 binary at the
|
|
// `SELECT version FROM schema_migrations` step (relation does not
|
|
// exist), confirming the closure ships.
|
|
func TestRunMigrations_PopulatesSchemaMigrations(t *testing.T) {
|
|
tdb := getTestDB(t)
|
|
db := tdb.freshSchema(t)
|
|
ctx := context.Background()
|
|
|
|
// freshSchema already ran the migrations once via the test-local
|
|
// runner. The schema_migrations table must exist and be
|
|
// non-empty.
|
|
var count int
|
|
if err := db.QueryRowContext(ctx,
|
|
`SELECT COUNT(*) FROM schema_migrations`,
|
|
).Scan(&count); err != nil {
|
|
t.Fatalf("schema_migrations table missing or unreadable: %v", err)
|
|
}
|
|
if count == 0 {
|
|
t.Fatalf("schema_migrations row count = 0; expected at least 1 (Bundle 4 audit-trail contract)")
|
|
}
|
|
|
|
// Sanity: every row's version ends in .up.sql (we store the
|
|
// full filename, not a parsed number).
|
|
rows, err := db.QueryContext(ctx, `SELECT version FROM schema_migrations`)
|
|
if err != nil {
|
|
t.Fatalf("query schema_migrations: %v", err)
|
|
}
|
|
defer rows.Close()
|
|
for rows.Next() {
|
|
var v string
|
|
if err := rows.Scan(&v); err != nil {
|
|
t.Fatalf("scan: %v", err)
|
|
}
|
|
if len(v) < len(".up.sql") || v[len(v)-len(".up.sql"):] != ".up.sql" {
|
|
t.Errorf("schema_migrations.version=%q does not end in .up.sql; the Bundle 4 closure stores filenames verbatim so the lookup is mechanical", v)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestRunMigrations_SkipsAppliedOnSecondCall pins the skip-applied
|
|
// contract. After freshSchema ran migrations once, calling RunMigrations
|
|
// again MUST be a near-no-op: every file is already in
|
|
// schema_migrations, so the migrator loops, sees each file in the
|
|
// applied map, and skips the file-read + db.Exec. The fastest way to
|
|
// witness this is to time the second call and assert it's measurably
|
|
// faster than the first — but freshSchema doesn't expose the timing of
|
|
// the first run, so we go with a behavior pin instead: counting the
|
|
// schema_migrations rows before and after the second call must show no
|
|
// change.
|
|
func TestRunMigrations_SkipsAppliedOnSecondCall(t *testing.T) {
|
|
tdb := getTestDB(t)
|
|
db := tdb.freshSchema(t)
|
|
ctx := context.Background()
|
|
|
|
// Snapshot row count (post-first-migration via freshSchema).
|
|
var before int
|
|
if err := db.QueryRowContext(ctx,
|
|
`SELECT COUNT(*) FROM schema_migrations`,
|
|
).Scan(&before); err != nil {
|
|
t.Fatalf("snapshot row count: %v", err)
|
|
}
|
|
if before == 0 {
|
|
t.Fatalf("pre-call snapshot=0; expected at least 1 (Bundle 4 contract)")
|
|
}
|
|
|
|
migrationsPath := findMigrationsDir()
|
|
if err := postgres.RunMigrations(db, migrationsPath); err != nil {
|
|
t.Fatalf("RunMigrations second-call: %v (Bundle 4 contract: every applied file must be in schema_migrations and skipped)", err)
|
|
}
|
|
|
|
var after int
|
|
if err := db.QueryRowContext(ctx,
|
|
`SELECT COUNT(*) FROM schema_migrations`,
|
|
).Scan(&after); err != nil {
|
|
t.Fatalf("post-call row count: %v", err)
|
|
}
|
|
if after != before {
|
|
t.Errorf("schema_migrations row count changed: before=%d after=%d; the second RunMigrations call must skip every already-applied file", before, after)
|
|
}
|
|
}
|
|
|
|
// TestRunMigrations_ConcurrentCallsSerialized pins the
|
|
// pg_advisory_lock contract. Two goroutines call RunMigrations
|
|
// concurrently against the same database; both must return without
|
|
// error and the final state must be identical to a single call. The
|
|
// advisory lock guarantees Postgres-side serialization — without it,
|
|
// the parallel callers would both attempt to run migrations against
|
|
// a non-locked database and race on DDL. The lock makes one of them
|
|
// run to completion + populate schema_migrations, then the other
|
|
// wakes up, observes the populated table, and skips everything.
|
|
//
|
|
// This test does not directly measure that pg_advisory_lock was
|
|
// acquired — that would require either a SQL-level inspection of
|
|
// pg_locks (racy timing) or instrumenting db.go for testing (intrusive).
|
|
// Instead it pins the observable end-state: no error, no duplicate
|
|
// rows, no extra rows.
|
|
func TestRunMigrations_ConcurrentCallsSerialized(t *testing.T) {
|
|
tdb := getTestDB(t)
|
|
// Use a single freshSchema so both goroutines target the same
|
|
// schema (test isolation is per-test, not per-goroutine).
|
|
db := tdb.freshSchema(t)
|
|
ctx := context.Background()
|
|
|
|
var before int
|
|
if err := db.QueryRowContext(ctx,
|
|
`SELECT COUNT(*) FROM schema_migrations`,
|
|
).Scan(&before); err != nil {
|
|
t.Fatalf("snapshot row count: %v", err)
|
|
}
|
|
|
|
migrationsPath := findMigrationsDir()
|
|
|
|
// Launch two concurrent RunMigrations calls.
|
|
const goroutines = 2
|
|
errCh := make(chan error, goroutines)
|
|
var wg sync.WaitGroup
|
|
wg.Add(goroutines)
|
|
for i := 0; i < goroutines; i++ {
|
|
go func() {
|
|
defer wg.Done()
|
|
errCh <- postgres.RunMigrations(db, migrationsPath)
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
close(errCh)
|
|
|
|
for err := range errCh {
|
|
if err != nil {
|
|
t.Errorf("RunMigrations concurrent call returned %v; the Bundle 4 advisory lock must serialize concurrent migrators without error", err)
|
|
}
|
|
}
|
|
|
|
var after int
|
|
if err := db.QueryRowContext(ctx,
|
|
`SELECT COUNT(*) FROM schema_migrations`,
|
|
).Scan(&after); err != nil {
|
|
t.Fatalf("post-concurrent row count: %v", err)
|
|
}
|
|
if after != before {
|
|
t.Errorf("schema_migrations row count changed after concurrent calls: before=%d after=%d; concurrent migrators must both observe the populated audit table and skip", before, after)
|
|
}
|
|
|
|
// Defensive: no duplicate version rows.
|
|
var dupCount int
|
|
if err := db.QueryRowContext(ctx,
|
|
`SELECT COUNT(*) FROM (SELECT version, COUNT(*) c FROM schema_migrations GROUP BY version HAVING COUNT(*) > 1) d`,
|
|
).Scan(&dupCount); err != nil {
|
|
t.Fatalf("duplicate-version query: %v", err)
|
|
}
|
|
if dupCount != 0 {
|
|
t.Errorf("schema_migrations has %d duplicate version rows; the PRIMARY KEY + advisory-lock contract should prevent any duplicates", dupCount)
|
|
}
|
|
}
|
|
|
|
// TestRunMigrations_PingsConnection is a smoke test that confirms the
|
|
// Bundle 4 changes did not break the basic happy path. The pre-existing
|
|
// RunSeed_AppliesIdempotently test already covers post-migration
|
|
// behavior; this test pins that the lock + table-create + version
|
|
// scan + apply path completes without error against an empty
|
|
// database.
|
|
func TestRunMigrations_FreshDatabaseHappyPath(t *testing.T) {
|
|
tdb := getTestDB(t)
|
|
db := tdb.freshSchema(t)
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
if err := db.PingContext(ctx); err != nil {
|
|
t.Fatalf("post-migration ping failed: %v", err)
|
|
}
|
|
|
|
// At least 30 migrations ship in the v2.1.0-pre tree (the test
|
|
// asserts a floor, not an exact count, so this stays robust as
|
|
// future migrations land).
|
|
var count int
|
|
if err := db.QueryRowContext(ctx,
|
|
`SELECT COUNT(*) FROM schema_migrations`,
|
|
).Scan(&count); err != nil {
|
|
t.Fatalf("schema_migrations count: %v", err)
|
|
}
|
|
if count < 30 {
|
|
t.Errorf("schema_migrations count = %d; expected ≥ 30 (Bundle 4 ledger should be populated from the migrations/ directory)", count)
|
|
}
|
|
}
|
|
|
|
// witness: keep the database/sql import alive on this file so the
|
|
// build tagger doesn't whine about unused imports if the test bodies
|
|
// stop using the type alias.
|
|
var _ = sql.ErrNoRows
|