diff --git a/internal/integration/ratelimit_multi_replica_test.go b/internal/integration/ratelimit_multi_replica_test.go new file mode 100644 index 0000000..4a4d29e --- /dev/null +++ b/internal/integration/ratelimit_multi_replica_test.go @@ -0,0 +1,195 @@ +// Copyright 2026 certctl LLC. All rights reserved. +// SPDX-License-Identifier: BUSL-1.1 + +//go:build integration + +package integration + +import ( + "context" + "database/sql" + "errors" + "fmt" + "os" + "path/filepath" + "runtime" + "sync" + "sync/atomic" + "testing" + "time" + + _ "github.com/lib/pq" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/wait" + + "github.com/certctl-io/certctl/internal/ratelimit" +) + +// Phase 13 Sprint 13.2 closure (2026-05-14, architecture diligence audit +// ARCH-M1) — the falsifiable closure proof for cross-replica rate-limit +// consistency. +// +// Scenario: +// - ONE postgres container (representing the shared backend). +// - N=3 independent *PostgresSlidingWindowLimiter instances pointing +// at it (representing 3 server replicas — each replica's process +// has its own constructed limiter, but they all share the same +// database state). +// - 100 concurrent Allow("test-key") calls spread across the 3 +// limiters via sync.WaitGroup. +// - Assert: exactly 10 succeed + 90 return ErrRateLimited. +// +// If the postgres backend's SELECT FOR UPDATE serialization weren't +// arbitrating across the 3 limiters, more than 10 calls would be +// allowed (each replica would independently let through 10/3 ≈ 4 +// requests, giving ~12-15 successes depending on scheduling). The +// hard-pass on exactly-10 is what makes ARCH-M1 closure substantive +// rather than wishful. +// +// Gated by //go:build integration matching the rest of +// internal/integration/. Sprint 13.3 promotes this test to a +// required CI status check. + +func TestRateLimit_PostgresBackend_CapEnforcedAcrossReplicas(t *testing.T) { + const ( + replicas = 3 + cap = 10 + window = 1 * time.Minute + concurrentReq = 100 + key = "test-key" + ) + + ctx := context.Background() + + // Boot a shared postgres container. + container, dsn := startPostgresContainer(ctx, t) + t.Cleanup(func() { _ = container.Terminate(context.Background()) }) + + // Each "replica" gets its own *sql.DB pool — same database, different + // connection pool — matching how N server processes would each open + // their own pool to the same control-plane database. + dbs := make([]*sql.DB, replicas) + for i := 0; i < replicas; i++ { + db, err := sql.Open("postgres", dsn) + if err != nil { + t.Fatalf("open db (replica %d): %v", i, err) + } + db.SetMaxOpenConns(8) + if err := db.Ping(); err != nil { + t.Fatalf("ping (replica %d): %v", i, err) + } + t.Cleanup(func() { db.Close() }) + dbs[i] = db + } + + // Apply the rate_limit_buckets migration via dbs[0]. All replicas + // see the same schema since they share the same database. + migPath := findMigrationFromHere("000046_rate_limit_buckets.up.sql") + body, err := os.ReadFile(migPath) + if err != nil { + t.Fatalf("read migration: %v", err) + } + if _, err := dbs[0].ExecContext(ctx, string(body)); err != nil { + t.Fatalf("apply migration: %v", err) + } + + // Instantiate one limiter per replica. + limiters := make([]*ratelimit.PostgresSlidingWindowLimiter, replicas) + for i := 0; i < replicas; i++ { + limiters[i] = ratelimit.NewPostgresSlidingWindowLimiter(dbs[i], cap, window) + } + + // Fire concurrentReq parallel Allow calls, round-robining across the + // replicas. Each call uses the SAME key + a SHARED `now` so the + // scenario is deterministic. The cross-replica row lock is what + // enforces the cap globally. + var ( + allowed int64 + denied int64 + wg sync.WaitGroup + ) + now := time.Now() + for i := 0; i < concurrentReq; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + l := limiters[idx%replicas] + err := l.Allow(key, now) + if err == nil { + atomic.AddInt64(&allowed, 1) + } else if errors.Is(err, ratelimit.ErrRateLimited) { + atomic.AddInt64(&denied, 1) + } else { + t.Errorf("unexpected error from Allow: %v", err) + } + }(i) + } + wg.Wait() + + gotAllowed := atomic.LoadInt64(&allowed) + gotDenied := atomic.LoadInt64(&denied) + + t.Logf("replicas=%d cap=%d concurrent=%d → allowed=%d denied=%d", + replicas, cap, concurrentReq, gotAllowed, gotDenied) + + if gotAllowed != int64(cap) { + t.Errorf("allowed = %d, want exactly %d (cross-replica row lock should serialize Allow calls so exactly cap succeed)", + gotAllowed, cap) + } + if gotDenied != int64(concurrentReq-cap) { + t.Errorf("denied = %d, want %d (concurrentReq - cap)", gotDenied, concurrentReq-cap) + } +} + +// ---------------------------------------------------------------- +// Local testcontainers harness. Kept in-file because the rest of +// internal/integration/ uses HTTP-against-running-server smoke tests +// against a docker-compose stack — different shape from ours. +// ---------------------------------------------------------------- + +func startPostgresContainer(ctx context.Context, t *testing.T) (testcontainers.Container, string) { + t.Helper() + + req := testcontainers.ContainerRequest{ + Image: "postgres:16-alpine", + ExposedPorts: []string{"5432/tcp"}, + Env: map[string]string{ + "POSTGRES_DB": "certctl_test", + "POSTGRES_USER": "certctl", + "POSTGRES_PASSWORD": "certctl", + }, + WaitingFor: wait.ForLog("database system is ready to accept connections").WithOccurrence(2), + } + container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + if err != nil { + t.Fatalf("start postgres container: %v", err) + } + + host, err := container.Host(ctx) + if err != nil { + t.Fatalf("container host: %v", err) + } + port, err := container.MappedPort(ctx, "5432") + if err != nil { + t.Fatalf("container port: %v", err) + } + dsn := fmt.Sprintf("postgres://certctl:certctl@%s:%s/certctl_test?sslmode=disable", + host, port.Port()) + return container, dsn +} + +func findMigrationFromHere(filename string) string { + _, here, _, _ := runtime.Caller(0) + dir := filepath.Dir(here) + for i := 0; i < 6; i++ { + candidate := filepath.Join(dir, "migrations", filename) + if _, err := os.Stat(candidate); err == nil { + return candidate + } + dir = filepath.Dir(dir) + } + return "" +} diff --git a/internal/ratelimit/equivalence_test.go b/internal/ratelimit/equivalence_test.go new file mode 100644 index 0000000..6679bd9 --- /dev/null +++ b/internal/ratelimit/equivalence_test.go @@ -0,0 +1,412 @@ +// Copyright 2026 certctl LLC. All rights reserved. +// SPDX-License-Identifier: BUSL-1.1 + +package ratelimit_test + +import ( + "context" + "database/sql" + "errors" + "fmt" + "os" + "path/filepath" + "runtime" + "strings" + "sync" + "testing" + "time" + + _ "github.com/lib/pq" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/wait" + + "github.com/certctl-io/certctl/internal/ratelimit" +) + +// Phase 13 Sprint 13.2 closure (2026-05-14, architecture diligence audit +// ARCH-M1): backend-equivalence test suite. Runs the same scenario +// surface against both backends (in-memory + postgres) via the shared +// Limiter interface — if the postgres backend's caller-visible +// semantics drift from the memory backend's, this file fails first. +// +// Mirrors the white-box test names in sliding_window_test.go: every +// public-surface behavior pinned there (cap, expiry, disabled bypass, +// empty-key short-circuit, concurrency) gets re-pinned here for the +// postgres backend. +// +// Postgres tests skip under -short (matches the pattern in +// internal/repository/postgres/testutil_test.go); CI's +// `go test -race -short -count=1 ./...` exercises only the memory +// half. The integration job runs the full suite. + +// ---------------------------------------------------------------- +// Backend-equivalence helpers +// ---------------------------------------------------------------- + +// limiterFactory builds a fresh Limiter for one test case. +// Memory backends discard `db`; postgres backends use it. +type limiterFactory func(t *testing.T, db *sql.DB, maxN int, window time.Duration) ratelimit.Limiter + +func memoryFactory(t *testing.T, _ *sql.DB, maxN int, window time.Duration) ratelimit.Limiter { + t.Helper() + // Map cap of 10_000 — large enough that none of the equivalence + // scenarios trip the LRU-eviction branch (the eviction branch is + // memory-specific; postgres has no equivalent so it's not part of + // the cross-backend contract). + return ratelimit.NewSlidingWindowLimiter(maxN, window, 10_000) +} + +func postgresFactory(t *testing.T, db *sql.DB, maxN int, window time.Duration) ratelimit.Limiter { + t.Helper() + if db == nil { + t.Fatal("postgresFactory requires a non-nil *sql.DB") + } + return ratelimit.NewPostgresSlidingWindowLimiter(db, maxN, window) +} + +// ---------------------------------------------------------------- +// Per-backend test entry points +// ---------------------------------------------------------------- + +func TestSlidingWindowLimiter_Equivalence_Memory(t *testing.T) { + t.Run("AllowsUpToCap", func(t *testing.T) { caseAllowsUpToCap(t, memoryFactory, nil) }) + t.Run("DistinctKeysIndependent", func(t *testing.T) { caseDistinctKeysIndependent(t, memoryFactory, nil) }) + t.Run("WindowExpiry", func(t *testing.T) { caseWindowExpiry(t, memoryFactory, nil) }) + t.Run("DisabledBypass", func(t *testing.T) { caseDisabledBypass(t, memoryFactory, nil) }) + t.Run("NegativeCapDisabled", func(t *testing.T) { caseNegativeCapDisabled(t, memoryFactory, nil) }) + t.Run("EmptyKeyShortCircuits", func(t *testing.T) { caseEmptyKeyShortCircuits(t, memoryFactory, nil) }) + t.Run("ConcurrentRaceFree", func(t *testing.T) { + if testing.Short() { + t.Skip("race-style test under -short") + } + caseConcurrentRaceFree(t, memoryFactory, nil) + }) +} + +func TestSlidingWindowLimiter_Equivalence_Postgres(t *testing.T) { + if testing.Short() { + t.Skip("postgres equivalence tests require testcontainers; skipped under -short") + } + tdb := setupTestDB(t) + defer tdb.teardown(t) + + t.Run("AllowsUpToCap", func(t *testing.T) { + db := tdb.freshSchema(t, "AllowsUpToCap") + caseAllowsUpToCap(t, postgresFactory, db) + }) + t.Run("DistinctKeysIndependent", func(t *testing.T) { + db := tdb.freshSchema(t, "DistinctKeysIndependent") + caseDistinctKeysIndependent(t, postgresFactory, db) + }) + t.Run("WindowExpiry", func(t *testing.T) { + db := tdb.freshSchema(t, "WindowExpiry") + caseWindowExpiry(t, postgresFactory, db) + }) + t.Run("DisabledBypass", func(t *testing.T) { + db := tdb.freshSchema(t, "DisabledBypass") + caseDisabledBypass(t, postgresFactory, db) + }) + t.Run("NegativeCapDisabled", func(t *testing.T) { + db := tdb.freshSchema(t, "NegativeCapDisabled") + caseNegativeCapDisabled(t, postgresFactory, db) + }) + t.Run("EmptyKeyShortCircuits", func(t *testing.T) { + db := tdb.freshSchema(t, "EmptyKeyShortCircuits") + caseEmptyKeyShortCircuits(t, postgresFactory, db) + }) + t.Run("ConcurrentRaceFree", func(t *testing.T) { + db := tdb.freshSchema(t, "ConcurrentRaceFree") + caseConcurrentRaceFree(t, postgresFactory, db) + }) +} + +// ---------------------------------------------------------------- +// Backend-agnostic test cases (one per behavior pinned in +// sliding_window_test.go's public-surface tests) +// ---------------------------------------------------------------- + +func caseAllowsUpToCap(t *testing.T, mk limiterFactory, db *sql.DB) { + l := mk(t, db, 3, 24*time.Hour) + now := time.Now() + for i := 0; i < 3; i++ { + if err := l.Allow("k", now.Add(time.Duration(i)*time.Minute)); err != nil { + t.Fatalf("call %d should be allowed: %v", i+1, err) + } + } + if err := l.Allow("k", now.Add(4*time.Minute)); !errors.Is(err, ratelimit.ErrRateLimited) { + t.Fatalf("4th call should be rate-limited; got %v", err) + } +} + +func caseDistinctKeysIndependent(t *testing.T, mk limiterFactory, db *sql.DB) { + l := mk(t, db, 1, 24*time.Hour) + now := time.Now() + + if err := l.Allow("k-1", now); err != nil { + t.Fatalf("first allow: %v", err) + } + if err := l.Allow("k-2", now); err != nil { + t.Fatalf("different key must have its own bucket: %v", err) + } + if err := l.Allow("k-1", now.Add(1*time.Second)); !errors.Is(err, ratelimit.ErrRateLimited) { + t.Fatalf("repeat key should be limited; got %v", err) + } +} + +func caseWindowExpiry(t *testing.T, mk limiterFactory, db *sql.DB) { + l := mk(t, db, 2, 1*time.Hour) + now := time.Now() + + if err := l.Allow("k", now); err != nil { + t.Fatal(err) + } + if err := l.Allow("k", now.Add(30*time.Minute)); err != nil { + t.Fatal(err) + } + // Inside window — limited. + if err := l.Allow("k", now.Add(45*time.Minute)); !errors.Is(err, ratelimit.ErrRateLimited) { + t.Fatalf("inside-window 3rd call should be limited: %v", err) + } + // Past window — slots reopen. + if err := l.Allow("k", now.Add(2*time.Hour)); err != nil { + t.Fatalf("past-window call should be allowed (window reset): %v", err) + } +} + +func caseDisabledBypass(t *testing.T, mk limiterFactory, db *sql.DB) { + l := mk(t, db, 0, 24*time.Hour) // maxN=0 → disabled + type disablable interface { + Disabled() bool + } + if d, ok := l.(disablable); ok && !d.Disabled() { + t.Fatal("limiter with maxN=0 must report Disabled()=true") + } + now := time.Now() + for i := 0; i < 100; i++ { + if err := l.Allow("k", now); err != nil { + t.Fatalf("disabled limiter must allow everything: %v", err) + } + } +} + +func caseNegativeCapDisabled(t *testing.T, mk limiterFactory, db *sql.DB) { + l := mk(t, db, -1, 24*time.Hour) + type disablable interface { + Disabled() bool + } + if d, ok := l.(disablable); ok && !d.Disabled() { + t.Fatal("negative maxN must produce a disabled limiter") + } + now := time.Now() + if err := l.Allow("k", now); err != nil { + t.Fatalf("disabled limiter must allow: %v", err) + } +} + +func caseEmptyKeyShortCircuits(t *testing.T, mk limiterFactory, db *sql.DB) { + // Empty key is the caller's defense-in-depth case — caller's + // validation upstream should reject empty-key events first. Limiter + // must not build a single shared bucket keyed by empty-key — that + // would be a chokepoint for every empty-key event. + l := mk(t, db, 1, 24*time.Hour) + now := time.Now() + for i := 0; i < 50; i++ { + if err := l.Allow("", now); err != nil { + t.Fatalf("empty key must short-circuit (call %d): %v", i, err) + } + } +} + +func caseConcurrentRaceFree(t *testing.T, mk limiterFactory, db *sql.DB) { + l := mk(t, db, 50, 24*time.Hour) + var wg sync.WaitGroup + for g := 0; g < 20; g++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + now := time.Now() + key := fmt.Sprintf("k-%d", id) + for i := 0; i < 30; i++ { + _ = l.Allow(key, now) + } + }(g) + } + wg.Wait() +} + +// ---------------------------------------------------------------- +// Postgres-only testcontainers harness — mirrors +// internal/repository/postgres/testutil_test.go's setupTestDB + +// freshSchema pattern. +// ---------------------------------------------------------------- + +type testDB struct { + db *sql.DB + container testcontainers.Container +} + +func setupTestDB(t *testing.T) *testDB { + t.Helper() + ctx := context.Background() + + req := testcontainers.ContainerRequest{ + Image: "postgres:16-alpine", + ExposedPorts: []string{"5432/tcp"}, + Env: map[string]string{ + "POSTGRES_DB": "certctl_test", + "POSTGRES_USER": "certctl", + "POSTGRES_PASSWORD": "certctl", + }, + WaitingFor: wait.ForLog("database system is ready to accept connections").WithOccurrence(2), + } + container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + if err != nil { + t.Fatalf("start postgres container: %v", err) + } + + host, err := container.Host(ctx) + if err != nil { + t.Fatalf("container host: %v", err) + } + port, err := container.MappedPort(ctx, "5432") + if err != nil { + t.Fatalf("container port: %v", err) + } + + connStr := fmt.Sprintf("postgres://certctl:certctl@%s:%s/certctl_test?sslmode=disable", host, port.Port()) + db, err := sql.Open("postgres", connStr) + if err != nil { + t.Fatalf("open db: %v", err) + } + // Pool size > 1 so the multi-goroutine concurrency case can hold + // multiple connections simultaneously; the row-lock arbitrates. + db.SetMaxOpenConns(8) + + if err := db.Ping(); err != nil { + t.Fatalf("ping: %v", err) + } + + return &testDB{db: db, container: container} +} + +func (tdb *testDB) teardown(t *testing.T) { + t.Helper() + if tdb.db != nil { + tdb.db.Close() + } + if tdb.container != nil { + _ = tdb.container.Terminate(context.Background()) + } +} + +// freshSchema creates an isolated schema per test case + runs the +// rate_limit_buckets migration inside it. Returns a *sql.DB whose +// search_path is scoped to the new schema. +// +// Note: this helper takes a sub-test label (caller-supplied) so the +// schema name is deterministic-per-case + stable across runs. The +// canonical postgres testutil uses t.Name() but we're inside Run- +// nested subtests where t.Name() includes "/" — flatten it. +func (tdb *testDB) freshSchema(t *testing.T, label string) *sql.DB { + t.Helper() + schema := sanitizeSchemaName(label + "_" + t.Name()) + ctx := context.Background() + + // One connection-scoped session so SET search_path persists. + conn, err := tdb.db.Conn(ctx) + if err != nil { + t.Fatalf("acquire conn: %v", err) + } + + if _, err := conn.ExecContext(ctx, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", schema)); err != nil { + t.Fatalf("create schema: %v", err) + } + if _, err := conn.ExecContext(ctx, fmt.Sprintf("SET search_path TO %s, public", schema)); err != nil { + t.Fatalf("set search_path: %v", err) + } + + // Run the rate_limit_buckets migration in this schema. The migration + // is the only one that introduces our table; other migrations don't + // matter for limiter behavior. + migPath := findMigration("000046_rate_limit_buckets.up.sql") + body, err := os.ReadFile(migPath) + if err != nil { + t.Fatalf("read migration: %v", err) + } + if _, err := conn.ExecContext(ctx, string(body)); err != nil { + t.Fatalf("apply migration: %v", err) + } + + t.Cleanup(func() { + conn.ExecContext(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", schema)) + conn.Close() + }) + + // Wrap the single connection in a *sql.DB-like by returning a fresh + // pool that goes through the same search_path. Simpler: just return + // the underlying *sql.DB and SET search_path session-wide by re- + // running the SET on every checkout. The cleanest move is to use + // the per-connection helper: return a *sql.DB that's actually a + // "limited to N=1 connection with search_path pinned" handle. + // + // Workaround the easy way: build a fresh *sql.DB whose dsn embeds + // search_path as a connection-time setting, so every connection + // auto-applies it. + dsn := connDSNWithSearchPath(tdb, schema) + scoped, err := sql.Open("postgres", dsn) + if err != nil { + t.Fatalf("open scoped db: %v", err) + } + scoped.SetMaxOpenConns(8) + t.Cleanup(func() { scoped.Close() }) + + // Sanity: row exists / table exists. + if _, err := scoped.ExecContext(ctx, "SELECT 1 FROM rate_limit_buckets LIMIT 1"); err != nil && !strings.Contains(err.Error(), "no rows") { + // Empty table is fine; only a missing-table error matters. + // "no rows" never fires here (we used Exec not Query). + t.Fatalf("smoke select: %v", err) + } + + return scoped +} + +func connDSNWithSearchPath(tdb *testDB, schema string) string { + // Derive the DSN by introspection of the container's host/port. + // Couldn't pre-store because freshSchema can be called many times. + ctx := context.Background() + host, _ := tdb.container.Host(ctx) + port, _ := tdb.container.MappedPort(ctx, "5432") + return fmt.Sprintf( + "postgres://certctl:certctl@%s:%s/certctl_test?sslmode=disable&search_path=%s,public", + host, port.Port(), schema, + ) +} + +func sanitizeSchemaName(name string) string { + name = strings.ToLower(name) + for _, ch := range []string{"/", " ", "-", "."} { + name = strings.ReplaceAll(name, ch, "_") + } + if len(name) > 50 { + name = name[:50] + } + return "test_rl_" + name +} + +func findMigration(filename string) string { + _, here, _, _ := runtime.Caller(0) + // here = .../internal/ratelimit/equivalence_test.go + // migrations = .../migrations + dir := filepath.Dir(here) + for i := 0; i < 6; i++ { + candidate := filepath.Join(dir, "migrations", filename) + if _, err := os.Stat(candidate); err == nil { + return candidate + } + dir = filepath.Dir(dir) + } + return "" +} diff --git a/internal/ratelimit/limiter.go b/internal/ratelimit/limiter.go new file mode 100644 index 0000000..80fb877 --- /dev/null +++ b/internal/ratelimit/limiter.go @@ -0,0 +1,54 @@ +// Copyright 2026 certctl LLC. All rights reserved. +// SPDX-License-Identifier: BUSL-1.1 + +package ratelimit + +import "time" + +// Limiter is the rate-limit primitive every caller in cmd/server + +// internal/api/handler + internal/service consumes. Two backends +// satisfy this interface: +// +// - SlidingWindowLimiter (in-memory; the historical default; +// declared in sliding_window.go). +// - PostgresSlidingWindowLimiter (cross-replica-consistent; +// declared in postgres_sliding_window.go; introduced in Phase 13 +// Sprint 13.2 for the ARCH-M1 substantive close). +// +// Sprint 13.3 (next) wires every call site through the operator- +// chosen backend via the CERTCTL_RATELIMIT_BACKEND={memory,postgres} +// env var. Until then, both backends compile + tests for both pass, +// but the production call sites still construct SlidingWindowLimiter +// directly. +// +// Sprint 13.2 signature note: the prompt template specified +// `Allow(key string) error`, but the actual repo signature has been +// `Allow(key string, now time.Time) error` since the EST RFC 7030 +// hardening master bundle Phase 4.1 — the `now` parameter is what +// makes the memory limiter testable against synthetic time. The +// interface matches the actual signature so the existing +// SlidingWindowLimiter satisfies Limiter without a method-set change. +// +// Per CLAUDE.md "the repo is truth" principle, code grounded against +// the live signature (not the prompt's draft). +type Limiter interface { + // Allow records a request at the given key/time and returns + // ErrRateLimited if the configured cap is exceeded inside the + // configured window. nil otherwise. + // + // Empty `key` short-circuits to nil (caller's defense-in-depth; + // caller upstream validation should reject empty-key events + // first — building a single shared bucket keyed by empty-key + // would be a chokepoint for every empty-key event). + // + // Disabled limiters (maxN <= 0) return nil for every call. + Allow(key string, now time.Time) error +} + +// Compile-time interface satisfaction checks. Drift in either +// backend's Allow signature fails the build at this file before any +// caller breaks. +var ( + _ Limiter = (*SlidingWindowLimiter)(nil) + _ Limiter = (*PostgresSlidingWindowLimiter)(nil) +) diff --git a/internal/ratelimit/postgres_sliding_window.go b/internal/ratelimit/postgres_sliding_window.go new file mode 100644 index 0000000..8ff6b74 --- /dev/null +++ b/internal/ratelimit/postgres_sliding_window.go @@ -0,0 +1,204 @@ +// Copyright 2026 certctl LLC. All rights reserved. +// SPDX-License-Identifier: BUSL-1.1 + +package ratelimit + +import ( + "context" + "database/sql" + "errors" + "fmt" + "time" + + "github.com/lib/pq" +) + +// Phase 13 Sprint 13.2 closure (2026-05-14, architecture diligence audit +// ARCH-M1): the cross-replica-consistent rate-limit backend. Same +// algorithm as SlidingWindowLimiter (prune-on-Allow sliding-window log) +// but the state lives in postgres so N replicas see the same per-key +// bucket. Replaces the per-process in-memory limit when the operator +// sets CERTCTL_RATELIMIT_BACKEND=postgres (wired in Sprint 13.3). +// +// Algorithm +// ========= +// Each Allow call runs a single BEGIN/COMMIT transaction: +// +// 1. INSERT ... ON CONFLICT (bucket_key) DO NOTHING — ensure the +// row exists so the SELECT FOR UPDATE below has something to lock. +// 2. SELECT timestamps FROM rate_limit_buckets WHERE bucket_key=$1 +// FOR UPDATE — acquire the per-key row lock for the rest of the +// transaction. +// 3. Prune timestamps older than (now - window) in Go (reusing the +// unexported pruneOlderThan helper shared with SlidingWindowLimiter +// — single source of truth for the prune semantics). +// 4. If cardinality(pruned) >= maxN: persist the pruned state without +// appending, COMMIT, return ErrRateLimited. +// 5. Else: append `now`, persist, COMMIT, return nil. +// +// SELECT FOR UPDATE serializes Allow calls for the same key across +// replicas: replicas A and B firing simultaneous Allow("k") never +// race because Postgres' row-lock arbitrates. This is the entire +// reason for the close — the memory backend's sync.Mutex only +// arbitrates within a process; pg's row lock arbitrates the cluster. +// +// Why a transaction (not a single CTE) +// ==================================== +// A "compute everything in one SQL statement" approach using +// INSERT ... ON CONFLICT DO UPDATE SET timestamps = CASE WHEN ... is +// possible but the conditional logic to gate the append on the +// pruned-cardinality requires nested CTEs whose check-then-act +// semantics are hard to read + harder to convince yourself are +// race-free across all isolation levels. The explicit transaction +// version above is correct under READ COMMITTED (Postgres' default), +// matches the memory backend's read-decide-write shape line-for-line, +// and shares the same prune helper. Two extra round-trips per Allow +// vs one is acceptable for the rate-limit hot path — the operation +// is gated anyway. +// +// Sprint 13.3 will wire the scheduler janitor loop that GCs rows +// whose updated_at is older than the longest configured window; the +// migration ships the supporting btree index on updated_at. + +// PostgresSlidingWindowLimiter implements Limiter against the +// rate_limit_buckets table introduced in migration 000046. +// +// Constructed via NewPostgresSlidingWindowLimiter. The zero value is +// NOT usable — the db handle is required. +// +// Concurrency: safe for concurrent Allow calls across goroutines AND +// across N replicas (the underlying SELECT FOR UPDATE serializes +// per-key access across the cluster). +type PostgresSlidingWindowLimiter struct { + db *sql.DB + maxN int + window time.Duration + disabled bool // maxN <= 0 → all Allow calls return nil +} + +// NewPostgresSlidingWindowLimiter returns a limiter with the given +// per-key cap + window. maxN <= 0 disables the limiter (all Allow +// calls return nil); matches the memory backend's opt-out semantics +// for test harnesses + sketchpad deploys. +// +// Window defaults to 24h when zero, mirroring SlidingWindowLimiter. +// +// The db argument is required + must outlive the limiter. Construction +// itself does NOT touch the database — DDL is owned by migration +// 000046_rate_limit_buckets.up.sql which runs at boot via +// cmd/server's RunMigrations path. +func NewPostgresSlidingWindowLimiter(db *sql.DB, maxN int, window time.Duration) *PostgresSlidingWindowLimiter { + if window <= 0 { + window = 24 * time.Hour + } + disabled := maxN <= 0 + return &PostgresSlidingWindowLimiter{ + db: db, + maxN: maxN, + window: window, + disabled: disabled, + } +} + +// Allow records a request at the given (key, now) and returns +// ErrRateLimited if the configured cap is exceeded inside the +// configured window. Matches SlidingWindowLimiter.Allow byte-for-byte +// in caller-visible semantics so Sprint 13.3's backend-selector swap +// is signature-clean. +// +// The `now` argument is the timestamp the call is "happening at". +// Used as the prune cutoff (entries older than now-window are dropped) +// and as the new appended entry. Tests pass synthetic `now` values +// to exercise window-expiry deterministically; production call sites +// pass time.Now() (matching how SlidingWindowLimiter is invoked +// today — see internal/api/handler/{est,export,certificates, +// auth_breakglass}.go). +// +// Empty `key` short-circuits to nil (matches the memory backend's +// chokepoint-avoidance contract). +func (l *PostgresSlidingWindowLimiter) Allow(key string, now time.Time) error { + if l.disabled { + return nil + } + if key == "" { + return nil + } + + ctx := context.Background() + tx, err := l.db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted}) + if err != nil { + return fmt.Errorf("ratelimit: begin tx: %w", err) + } + defer func() { + // Rollback is a no-op once the tx is committed; safe to defer + // unconditionally for the error paths. + _ = tx.Rollback() + }() + + // Step 1: ensure the row exists so SELECT FOR UPDATE has something + // to lock. ON CONFLICT DO NOTHING is a no-op when the row already + // exists. + if _, err := tx.ExecContext(ctx, ` + INSERT INTO rate_limit_buckets (bucket_key, timestamps, updated_at) + VALUES ($1, '{}', $2) + ON CONFLICT (bucket_key) DO NOTHING + `, key, now); err != nil { + return fmt.Errorf("ratelimit: ensure row: %w", err) + } + + // Step 2: lock the row + read current state. + var existing pq.GenericArray + var ts []time.Time + existing.A = &ts + if err := tx.QueryRowContext(ctx, ` + SELECT COALESCE(timestamps, '{}'::timestamptz[]) + FROM rate_limit_buckets + WHERE bucket_key = $1 + FOR UPDATE + `, key).Scan(&existing); err != nil { + // Shouldn't happen — step 1 ensured the row exists. Treat + // the sql.ErrNoRows path as a no-op (be conservative; never + // over-limit on transient DB weirdness). + if errors.Is(err, sql.ErrNoRows) { + return nil + } + return fmt.Errorf("ratelimit: select-for-update: %w", err) + } + + // Step 3: prune in Go via the shared helper. Same prune semantics + // as SlidingWindowLimiter — single source of truth. + cutoff := now.Add(-l.window) + pruned := pruneOlderThan(ts, cutoff) + + // Step 4: decide. + rateLimited := len(pruned) >= l.maxN + if !rateLimited { + pruned = append(pruned, now) + } + + // Step 5: persist. + if _, err := tx.ExecContext(ctx, ` + UPDATE rate_limit_buckets + SET timestamps = $2, updated_at = $3 + WHERE bucket_key = $1 + `, key, pq.Array(pruned), now); err != nil { + return fmt.Errorf("ratelimit: update: %w", err) + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("ratelimit: commit: %w", err) + } + + if rateLimited { + return ErrRateLimited + } + return nil +} + +// Disabled reports whether the limiter is in opt-out mode (maxN <= 0). +// Mirrors SlidingWindowLimiter.Disabled() so handler-side gating + +// admin-endpoint observability can ask the same question of either +// backend. +func (l *PostgresSlidingWindowLimiter) Disabled() bool { + return l.disabled +} diff --git a/migrations/000046_rate_limit_buckets.down.sql b/migrations/000046_rate_limit_buckets.down.sql new file mode 100644 index 0000000..07dd926 --- /dev/null +++ b/migrations/000046_rate_limit_buckets.down.sql @@ -0,0 +1,6 @@ +-- Phase 13 Sprint 13.2 reversal — drop the rate-limit bucket table. +-- Down migrations are not run in production; this file exists for +-- developer-side rollback during integration testing. + +DROP INDEX IF EXISTS rate_limit_buckets_updated_at_idx; +DROP TABLE IF EXISTS rate_limit_buckets; diff --git a/migrations/000046_rate_limit_buckets.up.sql b/migrations/000046_rate_limit_buckets.up.sql new file mode 100644 index 0000000..b5fe056 --- /dev/null +++ b/migrations/000046_rate_limit_buckets.up.sql @@ -0,0 +1,28 @@ +-- Phase 13 Sprint 13.2 closure (2026-05-14, architecture diligence audit +-- ARCH-M1): introduce a postgres-backed sliding-window rate limiter so +-- per-process / in-memory limits become cross-replica-consistent when +-- the operator sets CERTCTL_RATELIMIT_BACKEND=postgres (wired in +-- Sprint 13.3). +-- +-- One row per (bucket_key) — caller composes the key the same way the +-- memory backend already does (e.g. "subject|issuer" for SCEP/Intune, +-- "srcIP|peek" for EST failed-basic, raw "actor" for export, etc.). +-- The `timestamps` array stores the in-window log; prune-on-Allow +-- keeps it bounded by the limiter's maxN cap. +-- +-- updated_at + the index on it support the Sprint 13.3 scheduler +-- janitor loop: any row whose updated_at is older than the longest +-- configured window is safely deletable. +-- +-- Per CLAUDE.md "Idempotent migrations" architecture decision: +-- IF NOT EXISTS on every statement. Re-running this migration is +-- a no-op on a database that already has the table. + +CREATE TABLE IF NOT EXISTS rate_limit_buckets ( + bucket_key TEXT PRIMARY KEY, + timestamps TIMESTAMPTZ[] NOT NULL DEFAULT '{}', + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS rate_limit_buckets_updated_at_idx + ON rate_limit_buckets (updated_at);