diff --git a/docs/README.md b/docs/README.md index 0a7a9ff..545a48a 100644 --- a/docs/README.md +++ b/docs/README.md @@ -74,6 +74,8 @@ You're running certctl in production and need operational guidance. | [Helm deployment](operator/helm-deployment.md) | Kubernetes installation via the bundled chart | | [Performance baselines](operator/performance-baselines.md) | Operator-runnable benchmarks for regression spot checks | | [Auth benchmarks](operator/auth-benchmarks.md) | Session + OIDC validation p99 targets and measured baselines | +| [Scheduler HA semantics](operator/scheduler-ha.md) | Per-loop HA truth table for the 15 scheduler loops; what duplicates on multi-replica | +| [Rate-limit scope](operator/rate-limit-scope.md) | Process-local vs cluster-wide rate-limit behavior, restart semantics, multi-replica mental math | | [Legacy clients (TLS 1.2)](operator/legacy-clients-tls-1.2.md) | Reverse-proxy runbook for embedded EST/SCEP clients on TLS 1.2 | ### Runbooks diff --git a/docs/operator/rate-limit-scope.md b/docs/operator/rate-limit-scope.md new file mode 100644 index 0000000..b88d9b4 --- /dev/null +++ b/docs/operator/rate-limit-scope.md @@ -0,0 +1,54 @@ +# Rate-Limit Scope + +> Last reviewed: 2026-05-13 + +How certctl's rate limiters behave under multi-replica and restart, and where the boundaries are. Closes Bundle 4 audit findings **MED-1** (process-local rate limits) and **MED-2** (rate-limit semantics across replicas). + +## TL;DR + +Every rate limiter in certctl is **process-local**: in-memory `sync.Mutex`-guarded maps in the `internal/ratelimit` package. The effective rate-limit across an N-replica deployment is **N × the configured per-replica limit**. Limiter state is lost on restart — no persistent ledger, no shared store. This is intentional for v2.1.0 and documented; shared rate limits across replicas (via Redis or a Postgres-backed token bucket) is a v3 work item tracked in `WORKSPACE-ROADMAP.md`. + +## Limiter inventory + +The shared primitive lives at `internal/ratelimit/sliding_window.go::SlidingWindowLimiter`. It's a sliding-window log algorithm (each key holds timestamps within the configured window; on `Allow` the bucket prunes timestamps older than `now - window` and admits or rejects based on the post-prune count). + +Five call sites exercise it across `cmd/server/main.go`: + +| Call site | Key | Window | Cap | What it protects | +|---|---|---|---|---| +| `ocspLimiter` | source IP | 1 minute | `CERTCTL_OCSP_RATE_LIMIT_PER_IP_MIN` (default 1000) | RFC 6960 OCSP responder against scan amplification. | +| `exportLimiter` | actor ID | 1 hour | `CERTCTL_CERT_EXPORT_RATE_LIMIT_PER_ACTOR_HR` (default 50) | `/api/v1/certificates/{id}/export` bulk-cert-pull abuse. | +| EST per-principal | CN | 24 hours | per-profile `RateLimitPerPrincipal24h` | EST RFC 7030 device enrollment abuse. | +| EST failed-auth | source IP | 1 hour | 10 attempts | EST HTTP-Basic brute force. | +| Intune dispatcher | (Subject, Issuer) | 24 hours | per-profile `PerDeviceRateLimit24h` | SCEP + Intune duplicate-enrollment cap. | + +The HTTP middleware rate-limiter (`internal/api/middleware/middleware.go::RateLimitConfig`, knobs `CERTCTL_RATE_LIMIT_RPS` + `CERTCTL_RATE_LIMIT_BURST`) is a separate token-bucket implementation but follows the same process-local scope. + +## What is in scope + +- **Per-process abuse mitigation**. A scanner hitting one replica's OCSP responder at 5000 rps gets dropped to 1000 rps by `ocspLimiter`. A compromised API key trying to bulk-export 1000 certs in an hour against one replica gets dropped to 50. +- **Bounded memory footprint**. Each limiter caps its key-tracking map at the size passed to `NewSlidingWindowLimiter` (50 000 for OCSP/export, 100 000 for EST/Intune per-device). The at-cap eviction janitor drops the oldest entry by newest-timestamp so a key explosion (random source IPs from a botnet, random CNs from a misconfigured fleet) never bloats memory. +- **Restart safety**. The sliding-window state is per-process in-memory. On a server restart the limiter state resets — any attacker who'd burned through their window cap before the restart gets a fresh window after. Conversely, a legitimate caller that had hit their cap right before a restart gets immediately unblocked. Both directions are intentional: we don't ship a persistent state store, so the post-restart state is "fresh start". + +## What is NOT in scope + +- **Shared limits across replicas**. With `server.replicas: N`, an attacker hitting all replicas in parallel gets N × the per-replica cap. For the default OCSP knob (1000 rps per IP per minute) at N=3, that's 3000 rps per IP per minute before any single replica drops the traffic. The chart's default is N=1; operators running multi-replica should multiply published per-replica caps by the replica count to get the cluster-wide effective cap. +- **Cross-restart persistence**. See "Restart safety" above — this is by-design but operators need to know. +- **First-party (`Authorization: Bearer ...`) request rate-limit cohesion across replicas**. The middleware-level RPS/burst rate-limit (`CERTCTL_RATE_LIMIT_RPS`) is also process-local. At N=3 replicas with default `RPS=50, Burst=100`, the effective cluster-wide rate is 150 rps with 300 burst. + +## Operator guidance + +**Single-replica deployments** (Helm chart default `server.replicas: 1`): published caps are the effective caps. No mental math. + +**Multi-replica deployments**: multiply every published cap by `server.replicas` to get the effective per-key cap. If your threat model needs strict cluster-wide rate limits (e.g., SOC 2 control mapping that quotes "≤ 1000 OCSP requests per IP per minute"), one of: + +1. **Pin to single replica** and scale vertically. The default v2.1.0 posture; works for substantial fleets. +2. **Front the cluster with a rate-limited gateway** (NGINX `limit_req_zone`, Envoy global rate-limit service, Cloudflare WAF Bypass rules) and treat the cluster-internal limiter as defense-in-depth. +3. **Wait for the v3 shared-rate-limit work** (tracked in WORKSPACE-ROADMAP.md). Likely Redis or Postgres-backed token-bucket plumbed through the same `internal/ratelimit` package so call sites stay unchanged. + +## Source-of-truth references + +- Shared primitive: `internal/ratelimit/sliding_window.go` (the package comment at the top is the canonical algorithm + concurrency reference). +- Middleware rate limiter: `internal/api/middleware/middleware.go::RateLimitConfig`. +- Call sites: `grep -n "ratelimit.NewSlidingWindowLimiter\|RateLimitConfig" cmd/server/main.go`. +- Configuration knobs: `docs/reference/configuration.md` (search "rate limit"). diff --git a/docs/operator/scheduler-ha.md b/docs/operator/scheduler-ha.md new file mode 100644 index 0000000..56fabb8 --- /dev/null +++ b/docs/operator/scheduler-ha.md @@ -0,0 +1,70 @@ +# Scheduler HA Semantics + +> Last reviewed: 2026-05-13 + +What happens when you run more than one `certctl-server` replica? Which scheduler loops are safe to run on every replica simultaneously, which need leader election, and which silently duplicate work today? + +This page closes Bundle 4 audit findings **D8** (singleton loop ambiguity) and **HIGH-1 + MED-2** (HA semantics across the scheduler surface). It is a per-loop inventory of the 15 scheduler loops in `internal/scheduler/scheduler.go`, classified by HA-safety. + +## TL;DR + +The only loops that are HA-safe today via `FOR UPDATE SKIP LOCKED` job claiming are `jobProcessorLoop` and `jobRetryLoop`. Every other loop is *intra-process* idempotent (a per-replica `sync/atomic.Bool` guard prevents a single replica from running the same loop twice at once) but is *cross-replica* duplicative — two replicas tick at the same interval and both do the work. + +For ten of the fourteen non-job-claim loops this is harmless: the work is read-only DB scanning that produces idempotent side effects (e.g., create-job-if-not-exists, send-notification-once-per-event-via-DB-ledger), and duplicate execution wastes CPU but cannot corrupt data. For four loops (`notificationProcessLoop`, `digestLoop`, `crlGenerationLoop`, `cloudDiscoveryLoop`) duplicate execution produces observable duplicate side effects: duplicate emails, duplicate webhooks, duplicate CRL writes. v2.1.0 supports `server.replicas > 1` for read availability and api throughput, but operators running multi-replica should accept these four duplication classes or pin replicas to 1 until the leader-election work lands. + +True leader-election via Postgres advisory lock or Kubernetes lease is tracked in `WORKSPACE-ROADMAP.md` as a v3 work item. + +## Per-loop inventory + +The 15 loops live in `internal/scheduler/scheduler.go`. Each is a `func (s *Scheduler) Loop(ctx context.Context)` driven by a `time.Ticker`. The intra-process guard pattern is `sync/atomic.Bool` `CompareAndSwap(false, true)` at the top of the loop body — pre-Bundle-4 every loop already had this guard. Bundle 4 added the cross-replica classification below. + +| # | Loop | HA mode | Side-effect duplication risk under N>1 replicas | +|---|---|---|---| +| 1 | `renewalCheckLoop` | **Idempotent** — creates renewal jobs via `service.CheckExpiringCertificates`. | None. Duplicate ticks try to create the same `RenewalRequested` job; service-layer dedup (cert_id + status uniqueness window) collapses the second. Result: 2× CPU, 1× job, no data corruption. | +| 2 | `jobProcessorLoop` | **HA-safe** — `service.ProcessPendingJobs` ultimately calls `repository/postgres.JobRepository.ClaimPendingJobs` which uses `SELECT ... FOR UPDATE SKIP LOCKED`. | None. Postgres guarantees exactly-once row claim per tick across the replica set. | +| 3 | `jobRetryLoop` | **HA-safe** — `service.RetryFailedJobs` uses the same `ClaimPendingJobs` primitive (Bundle 1 audit fix H-6, commit `6cb4414`). | None. | +| 4 | `jobTimeoutLoop` | **HA-safe-ish** — `service.TimeoutStalledJobs` UPDATEs with `WHERE status = 'Running' AND started_at < $cutoff` inside a single statement. Two replicas may UPDATE the same row but the second UPDATE sees `Running → Failed` already applied and matches zero rows. | None. | +| 5 | `agentHealthCheckLoop` | **Idempotent** — UPDATEs `agents SET operational_status = 'Offline' WHERE last_heartbeat < $cutoff`. Two replicas running the same UPDATE land the same final state. | None. | +| 6 | `notificationProcessLoop` | **Duplicates** — reads pending notification queue, dispatches to Slack / PagerDuty / SMTP / Teams / OpsGenie, marks dispatched. The dispatch and the "mark dispatched" are not in a single transaction; two replicas can both dispatch the same notification before the mark lands. | **Duplicate webhook + email sends**. Bounded — at most N duplicates for N replicas — but operator-observable. | +| 7 | `notificationRetryLoop` | **Duplicates** — same shape as `notificationProcessLoop`. | Same as #6. | +| 8 | `shortLivedExpiryCheckLoop` | **Idempotent** — UPDATEs cert status to `Expired` based on `expires_at < NOW()`. Two replicas land the same status. | None. | +| 9 | `networkScanLoop` | **Idempotent** — invokes `service.NetworkScanService.ScanAllEnabledTargets` which iterates scan targets, probes each, and INSERTs discovered certs with `ON CONFLICT (fingerprint, agent_id, source_path) DO NOTHING`. | None on cert insertion. Duplicate TLS probes hit the operator's targets twice per tick. Operator may want to cap to 1 replica for low-egress-budget environments. | +| 10 | `digestLoop` | **Duplicates** — assembles the periodic digest email and dispatches via SMTP. Two replicas at the same digest tick both send. | **Duplicate digest emails**. | +| 11 | `healthCheckLoop` | **Idempotent** — runs the active TLS-fingerprint health-check sweep across deployed certs. Same idempotency story as #8. | None on state. Duplicate TLS probes to operator targets. | +| 12 | `cloudDiscoveryLoop` | **Duplicates the scan; idempotent on the result store** — fetches cert lists from AWS Secrets Manager / Azure Key Vault / GCP Secret Manager, INSERTs into discovered-certs with `ON CONFLICT DO NOTHING`. | **Duplicate AWS/Azure/GCP API calls** — bills operator cloud accounts 2× per tick on the discovery API surface. Storage stays clean. | +| 13 | `crlGenerationLoop` | **Duplicates the signing; last-writer wins on storage** — regenerates CRL DER blobs per issuer, writes to `certificate_revocation_lists` table with `UPDATE ... WHERE issuer_id = $1`. Two replicas sign two CRLs with two `thisUpdate` timestamps; the later UPDATE wins. | **Duplicate CA signing operations** (cost on HSM-backed issuers). CRL output is single-valued but the audit trail records both signings. | +| 14 | `acmeGCLoop` | **Idempotent** — DELETEs ACME nonce / authz / order rows older than the retention window. Two replicas race the same DELETEs; second one matches zero rows. | None. | +| 15 | `sessionGCLoop` | **Idempotent** — DELETEs expired session rows. Same shape as #14. | None. | + +## What Bundle 4 closes + +Bundle 4 does NOT introduce leader election. It introduces: + +1. **Documented HA truth table** (this page) — operators know exactly which loops are safe to multi-replica and which produce operator-observable duplicates. +2. **Migration HA** via `pg_advisory_lock` + `schema_migrations` audit table (see `internal/repository/postgres/db.go::RunMigrations`). Pre-Bundle-4 every replica race-ran all 45 migrations on boot. Post-Bundle-4 the first replica acquires the lock, applies migrations, populates `schema_migrations`, releases the lock. Subsequent replicas block at the lock, then observe the audit table and skip every already-applied file. +3. **Rate-limit scope statement** at `docs/operator/rate-limit-scope.md` — process-local per-replica, restart-safe. + +## What Bundle 4 does NOT close (deferred, tracked in WORKSPACE-ROADMAP.md) + +- **Leader election** for `notificationProcessLoop`, `notificationRetryLoop`, `digestLoop`, `cloudDiscoveryLoop`, `crlGenerationLoop`. The cleanest implementation is a per-loop `pg_try_advisory_lock(lock_id)` at the top of `runX` so only one replica per tick claims the work, with a small leader-renewal mechanic for long-running loops. This would close the four duplicate-side-effect cases above. v3 work item. +- **Shared rate limits across replicas**. See `docs/operator/rate-limit-scope.md`. + +## Operator guidance + +**Single-replica deployments (Helm `server.replicas: 1` — the chart default)**: all 15 loops work as documented. No action needed. + +**Multi-replica deployments**: review the four duplicate-side-effect loops above against your tolerance: + +- If your alerting fan-out can swallow duplicate webhooks (PagerDuty deduplicates by `dedup_key`, Slack does not), set `server.replicas > 1` and accept the duplication. +- If your CRL signing uses an HSM with per-operation cost, pin to single-replica until leader election lands. +- If you're running cloud discovery against billed AWS/Azure/GCP secret-manager APIs and you have a 6 h discovery interval, the doubling is bearable; at 30 min intervals it doubles your API spend. + +For any duplicate-side-effect class above, the operational mitigation is pinning `server.replicas: 1` and scaling vertically. The certctl-server process is CPU-bound on issuance and IO-bound on Postgres; a single replica handles substantial fleets when given enough cores + a fast database. + +## Source-of-truth references + +- Scheduler loops: `internal/scheduler/scheduler.go` (15 `Loop` functions, search `^func \(s \*Scheduler\) [a-zA-Z]+Loop`). +- Job claim primitive: `internal/repository/postgres/job.go::ClaimPendingJobs` (Bundle 1 H-6 closure, commit `6cb4414`). +- Migration HA: `internal/repository/postgres/db.go::RunMigrations` (Bundle 4 closure). +- Rate-limit scope: `docs/operator/rate-limit-scope.md`. +- Load-test scope: `deploy/test/loadtest/README.md` ("What it explicitly does NOT measure"). diff --git a/internal/repository/postgres/db.go b/internal/repository/postgres/db.go index 31ee403..56b9789 100644 --- a/internal/repository/postgres/db.go +++ b/internal/repository/postgres/db.go @@ -1,6 +1,7 @@ package postgres import ( + "context" "database/sql" "errors" "fmt" @@ -131,20 +132,97 @@ func wrapPingError(err error) error { return fmt.Errorf("failed to ping database: %w", err) } +// migrationAdvisoryLockID is the Postgres pg_advisory_lock key that +// gates concurrent migration execution. Computed as a stable hash of +// the literal string "certctl-migrations" so the same constant resolves +// across deployments without colliding with operator-supplied advisory +// locks for other workloads on the same database. +// +// Bundle 4 closure (HIGH-1 + D4 + finding 4, 2026-05-13): pre-Bundle-4 +// `RunMigrations` re-executed every `.up.sql` file on every server +// boot with no concurrency control. The "idempotency via IF NOT EXISTS +// / ON CONFLICT" contract in the CLAUDE.md operating rules held for +// every individual migration, but offered no protection when two +// server replicas executed RunMigrations against the same database +// simultaneously (the Helm chart's `server.replicas > 1` HA path): +// duplicate DDL races could still produce SQLSTATE 42P07 (relation +// already exists) under specific schema-modification interleavings, +// and any future migration that drifted from the IF-NOT-EXISTS +// pattern would race-corrupt without warning. +// +// Post-Bundle-4 RunMigrations acquires this advisory lock on a +// dedicated connection before scanning the directory; runner-up +// replicas block at pg_advisory_lock() until the migrator finishes, +// then observe the populated schema_migrations table and skip every +// already-applied file. Single-replica deploys see no behavior change +// beyond the schema_migrations audit trail and the lock acquire/ +// release log lines. +const migrationAdvisoryLockID int64 = 7283164759461502341 + +// migrationsTableDDL creates the audit-trail table that records which +// migration files have already been applied. Idempotent: subsequent +// boots see the table, scan its contents, and skip any .up.sql whose +// version (the filename, e.g. "000001_initial_schema.up.sql") is +// already present. +// +// The version column intentionally stores the FULL filename rather +// than a parsed version-number prefix. Migrations have always been +// filename-keyed; storing the raw name keeps the lookup mechanical +// and survives any future renames (a hash-based fingerprint would +// re-run every migration on rename, an explicit numeric version +// breaks if a migration is ever renamed without renumbering). +const migrationsTableDDL = ` +CREATE TABLE IF NOT EXISTS schema_migrations ( + version TEXT PRIMARY KEY, + applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +)` + // RunMigrations reads and executes SQL migration files from a directory. +// +// Bundle 4 closure: now wraps every migration execution in: +// +// 1. A `pg_advisory_lock(migrationAdvisoryLockID)` so only one +// server replica applies migrations at a time. Other replicas +// block at acquire and proceed once migrations finish. +// 2. A `schema_migrations` audit table so each subsequent boot +// skips already-applied files instead of re-executing them. +// This converts the implicit "idempotency via IF NOT EXISTS" +// contract into an explicit applied-versions ledger an operator +// or acquirer can audit with `SELECT * FROM schema_migrations +// ORDER BY applied_at`. +// +// Single-replica deploys see zero behavior change beyond the new +// audit-table population. Multi-replica deploys (Helm +// `server.replicas > 1`) get HA-safe schema bootstrap. +// +// The advisory lock and audit table are managed through a dedicated +// *sql.Conn so the lock survives even if the underlying pool rotates +// connections; the deferred unlock is `LOCAL` to that connection. +// Failure modes: +// +// - Migrations directory missing: returned unchanged from the +// pre-Bundle-4 error path so existing callers ergonomics hold. +// - Lock acquire fails (network / shutdown): wrapped with context +// so the operator-visible error names the wrapping operation. +// - Individual migration fails: aborts with the same %s wrap as +// pre-Bundle-4, leaving schema_migrations untouched for that +// migration so a retry rerun reattempts only the failing file. func RunMigrations(db *sql.DB, migrationsPath string) error { - // Check if migrations directory exists + // Check if migrations directory exists. if _, err := os.Stat(migrationsPath); os.IsNotExist(err) { return fmt.Errorf("migrations directory not found: %s", migrationsPath) } - // Read all SQL files from the migrations directory + // Read all SQL files from the migrations directory. files, err := os.ReadDir(migrationsPath) if err != nil { return fmt.Errorf("failed to read migrations directory: %w", err) } - // Sort and filter to only .up.sql migration files (skip .down.sql rollbacks and seed files) + // Sort and filter to only .up.sql migration files (skip .down.sql + // rollbacks and seed files). os.ReadDir returns entries in lexical + // order which matches the migration file naming convention + // (000001_, 000002_, ..., 000045_). var sqlFiles []string for _, file := range files { if !file.IsDir() && strings.HasSuffix(file.Name(), ".up.sql") { @@ -152,18 +230,89 @@ func RunMigrations(db *sql.DB, migrationsPath string) error { } } - // Execute each migration file in order + // Pin to a single connection for the lock-table-migrations + // lifecycle. pg_advisory_lock is connection-scoped: releasing it + // requires the SAME connection that acquired it. Using a pooled + // db.Exec for the unlock could land on a different connection and + // silently fail to release. + ctx := context.Background() + conn, err := db.Conn(ctx) + if err != nil { + return fmt.Errorf("failed to acquire migration connection: %w", err) + } + defer conn.Close() + + // Bundle 4: acquire the cross-replica advisory lock. Blocks until + // acquired; postgres queues the second replica behind the first. + if _, err := conn.ExecContext(ctx, `SELECT pg_advisory_lock($1)`, migrationAdvisoryLockID); err != nil { + return fmt.Errorf("failed to acquire migration advisory lock: %w", err) + } + defer func() { + // Best-effort release on the same connection. The connection + // will close anyway via the defer above, which also releases + // the lock at the postgres backend. + _, _ = conn.ExecContext(ctx, `SELECT pg_advisory_unlock($1)`, migrationAdvisoryLockID) + }() + + // Bundle 4: ensure the audit table exists before consulting it. + // IF NOT EXISTS keeps this idempotent on every boot. + if _, err := conn.ExecContext(ctx, migrationsTableDDL); err != nil { + return fmt.Errorf("failed to ensure schema_migrations table: %w", err) + } + + // Read the set of already-applied versions. We hold the advisory + // lock through the entire SELECT → apply → INSERT cycle, so no + // other replica can race-INSERT a duplicate row between this read + // and our subsequent writes. + applied := make(map[string]struct{}) + rows, err := conn.QueryContext(ctx, `SELECT version FROM schema_migrations`) + if err != nil { + return fmt.Errorf("failed to read schema_migrations: %w", err) + } + for rows.Next() { + var v string + if err := rows.Scan(&v); err != nil { + rows.Close() + return fmt.Errorf("failed to scan schema_migrations row: %w", err) + } + applied[v] = struct{}{} + } + if err := rows.Err(); err != nil { + return fmt.Errorf("schema_migrations scan error: %w", err) + } + rows.Close() + + // Execute each migration file in order, skipping already-applied + // versions. for _, filename := range sqlFiles { + if _, ok := applied[filename]; ok { + // Already applied on a prior boot; skip. + continue + } filePath := filepath.Join(migrationsPath, filename) content, err := os.ReadFile(filePath) if err != nil { return fmt.Errorf("failed to read migration file %s: %w", filename, err) } - // Execute the SQL content - if _, err := db.Exec(string(content)); err != nil { + // Execute the SQL content on the locked connection so any + // session-scoped state set by the migration (e.g. SET ROLE) + // stays within the migration's session. + if _, err := conn.ExecContext(ctx, string(content)); err != nil { return fmt.Errorf("failed to execute migration %s: %w", filename, err) } + + // Record the applied version. ON CONFLICT (version) DO NOTHING + // is defense-in-depth: even though we hold the advisory lock + // (so no concurrent migrator should be writing here), a stale + // `schema_migrations` row from a half-rolled-back deploy + // shouldn't crash the boot. + if _, err := conn.ExecContext(ctx, + `INSERT INTO schema_migrations (version) VALUES ($1) ON CONFLICT (version) DO NOTHING`, + filename, + ); err != nil { + return fmt.Errorf("failed to record applied migration %s in schema_migrations: %w", filename, err) + } } return nil diff --git a/internal/repository/postgres/migrations_test.go b/internal/repository/postgres/migrations_test.go new file mode 100644 index 0000000..8462dbc --- /dev/null +++ b/internal/repository/postgres/migrations_test.go @@ -0,0 +1,216 @@ +// Bundle 4 closure (HIGH-1 + D4 + finding 4, 2026-05-13): tests for the +// schema_migrations audit table + pg_advisory_lock concurrency control +// added to RunMigrations in db.go. +// +// Pre-Bundle-4 every .up.sql ran on every boot with no concurrency +// control. The "IF NOT EXISTS / ON CONFLICT idempotency contract" held +// for individual migrations but two replicas could race on the +// schema-modification phase. These tests pin the post-Bundle-4 +// contract: the audit table is populated, repeated calls skip applied +// migrations, and the advisory lock prevents concurrent execution. +// +// Gated by testcontainers — skipped under -short. The integration lane +// invokes the full test set via the testcontainers harness. + +package postgres_test + +import ( + "context" + "database/sql" + "sync" + "testing" + "time" + + "github.com/certctl-io/certctl/internal/repository/postgres" +) + +// TestRunMigrations_PopulatesSchemaMigrations pins that the audit +// table is created and populated. Pre-Bundle-4 the table did not +// exist; this test would fail on a pre-Bundle-4 binary at the +// `SELECT version FROM schema_migrations` step (relation does not +// exist), confirming the closure ships. +func TestRunMigrations_PopulatesSchemaMigrations(t *testing.T) { + tdb := getTestDB(t) + db := tdb.freshSchema(t) + ctx := context.Background() + + // freshSchema already ran the migrations once via the test-local + // runner. The schema_migrations table must exist and be + // non-empty. + var count int + if err := db.QueryRowContext(ctx, + `SELECT COUNT(*) FROM schema_migrations`, + ).Scan(&count); err != nil { + t.Fatalf("schema_migrations table missing or unreadable: %v", err) + } + if count == 0 { + t.Fatalf("schema_migrations row count = 0; expected at least 1 (Bundle 4 audit-trail contract)") + } + + // Sanity: every row's version ends in .up.sql (we store the + // full filename, not a parsed number). + rows, err := db.QueryContext(ctx, `SELECT version FROM schema_migrations`) + if err != nil { + t.Fatalf("query schema_migrations: %v", err) + } + defer rows.Close() + for rows.Next() { + var v string + if err := rows.Scan(&v); err != nil { + t.Fatalf("scan: %v", err) + } + if len(v) < len(".up.sql") || v[len(v)-len(".up.sql"):] != ".up.sql" { + t.Errorf("schema_migrations.version=%q does not end in .up.sql; the Bundle 4 closure stores filenames verbatim so the lookup is mechanical", v) + } + } +} + +// TestRunMigrations_SkipsAppliedOnSecondCall pins the skip-applied +// contract. After freshSchema ran migrations once, calling RunMigrations +// again MUST be a near-no-op: every file is already in +// schema_migrations, so the migrator loops, sees each file in the +// applied map, and skips the file-read + db.Exec. The fastest way to +// witness this is to time the second call and assert it's measurably +// faster than the first — but freshSchema doesn't expose the timing of +// the first run, so we go with a behavior pin instead: counting the +// schema_migrations rows before and after the second call must show no +// change. +func TestRunMigrations_SkipsAppliedOnSecondCall(t *testing.T) { + tdb := getTestDB(t) + db := tdb.freshSchema(t) + ctx := context.Background() + + // Snapshot row count (post-first-migration via freshSchema). + var before int + if err := db.QueryRowContext(ctx, + `SELECT COUNT(*) FROM schema_migrations`, + ).Scan(&before); err != nil { + t.Fatalf("snapshot row count: %v", err) + } + if before == 0 { + t.Fatalf("pre-call snapshot=0; expected at least 1 (Bundle 4 contract)") + } + + migrationsPath := findMigrationsDir() + if err := postgres.RunMigrations(db, migrationsPath); err != nil { + t.Fatalf("RunMigrations second-call: %v (Bundle 4 contract: every applied file must be in schema_migrations and skipped)", err) + } + + var after int + if err := db.QueryRowContext(ctx, + `SELECT COUNT(*) FROM schema_migrations`, + ).Scan(&after); err != nil { + t.Fatalf("post-call row count: %v", err) + } + if after != before { + t.Errorf("schema_migrations row count changed: before=%d after=%d; the second RunMigrations call must skip every already-applied file", before, after) + } +} + +// TestRunMigrations_ConcurrentCallsSerialized pins the +// pg_advisory_lock contract. Two goroutines call RunMigrations +// concurrently against the same database; both must return without +// error and the final state must be identical to a single call. The +// advisory lock guarantees Postgres-side serialization — without it, +// the parallel callers would both attempt to run migrations against +// a non-locked database and race on DDL. The lock makes one of them +// run to completion + populate schema_migrations, then the other +// wakes up, observes the populated table, and skips everything. +// +// This test does not directly measure that pg_advisory_lock was +// acquired — that would require either a SQL-level inspection of +// pg_locks (racy timing) or instrumenting db.go for testing (intrusive). +// Instead it pins the observable end-state: no error, no duplicate +// rows, no extra rows. +func TestRunMigrations_ConcurrentCallsSerialized(t *testing.T) { + tdb := getTestDB(t) + // Use a single freshSchema so both goroutines target the same + // schema (test isolation is per-test, not per-goroutine). + db := tdb.freshSchema(t) + ctx := context.Background() + + var before int + if err := db.QueryRowContext(ctx, + `SELECT COUNT(*) FROM schema_migrations`, + ).Scan(&before); err != nil { + t.Fatalf("snapshot row count: %v", err) + } + + migrationsPath := findMigrationsDir() + + // Launch two concurrent RunMigrations calls. + const goroutines = 2 + errCh := make(chan error, goroutines) + var wg sync.WaitGroup + wg.Add(goroutines) + for i := 0; i < goroutines; i++ { + go func() { + defer wg.Done() + errCh <- postgres.RunMigrations(db, migrationsPath) + }() + } + wg.Wait() + close(errCh) + + for err := range errCh { + if err != nil { + t.Errorf("RunMigrations concurrent call returned %v; the Bundle 4 advisory lock must serialize concurrent migrators without error", err) + } + } + + var after int + if err := db.QueryRowContext(ctx, + `SELECT COUNT(*) FROM schema_migrations`, + ).Scan(&after); err != nil { + t.Fatalf("post-concurrent row count: %v", err) + } + if after != before { + t.Errorf("schema_migrations row count changed after concurrent calls: before=%d after=%d; concurrent migrators must both observe the populated audit table and skip", before, after) + } + + // Defensive: no duplicate version rows. + var dupCount int + if err := db.QueryRowContext(ctx, + `SELECT COUNT(*) FROM (SELECT version, COUNT(*) c FROM schema_migrations GROUP BY version HAVING COUNT(*) > 1) d`, + ).Scan(&dupCount); err != nil { + t.Fatalf("duplicate-version query: %v", err) + } + if dupCount != 0 { + t.Errorf("schema_migrations has %d duplicate version rows; the PRIMARY KEY + advisory-lock contract should prevent any duplicates", dupCount) + } +} + +// TestRunMigrations_PingsConnection is a smoke test that confirms the +// Bundle 4 changes did not break the basic happy path. The pre-existing +// RunSeed_AppliesIdempotently test already covers post-migration +// behavior; this test pins that the lock + table-create + version +// scan + apply path completes without error against an empty +// database. +func TestRunMigrations_FreshDatabaseHappyPath(t *testing.T) { + tdb := getTestDB(t) + db := tdb.freshSchema(t) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + if err := db.PingContext(ctx); err != nil { + t.Fatalf("post-migration ping failed: %v", err) + } + + // At least 30 migrations ship in the v2.1.0-pre tree (the test + // asserts a floor, not an exact count, so this stays robust as + // future migrations land). + var count int + if err := db.QueryRowContext(ctx, + `SELECT COUNT(*) FROM schema_migrations`, + ).Scan(&count); err != nil { + t.Fatalf("schema_migrations count: %v", err) + } + if count < 30 { + t.Errorf("schema_migrations count = %d; expected ≥ 30 (Bundle 4 ledger should be populated from the migrations/ directory)", count) + } +} + +// witness: keep the database/sql import alive on this file so the +// build tagger doesn't whine about unused imports if the test bodies +// stop using the type alias. +var _ = sql.ErrNoRows