Files
certctl/internal/ratelimit/equivalence_test.go
T
shankar0123 c8347d742d feat(ratelimit): Phase 13 Sprint 13.2 — postgres-backed sliding window + multi-replica test
Phase 13 Sprint 13.2 closure (architecture diligence audit ARCH-M1):
ships the infrastructure half of the ARCH-M1 substantive close. Adds a
postgres-backed sliding-window rate limiter that satisfies the same
interface as the in-memory primitive — cross-replica-consistent rather
than per-process. Sprint 13.3 wires the 5 call sites through a
backend selector (`CERTCTL_RATELIMIT_BACKEND={memory,postgres}`); this
commit deliberately changes ZERO call sites. The infrastructure +
migration ship as their own review window, mirroring the Phase 9
Sprint 8a/8b pattern.

Substantive close, not document-and-defer
=========================================
The audit recommended "document the per-process limit + defer the
distributed backend to v3." The operator chose Option M1-A (postgres-
backed; zero new infra) over the document-and-defer path. Postgres
is already a hard dependency for certctl; no new operator burden. The
multi-replica integration test in this commit is the falsifiable
closure proof — cap-N enforced exactly across N replicas hitting the
same key concurrently.

Signature ground-truth
======================
The Sprint 13.2 prompt template specified `Allow(key string) error` as
the signature to match. The actual repo signature has been
`Allow(key string, now time.Time) error` since the EST RFC 7030
hardening master bundle Phase 4.1 — the `now` parameter is what makes
the memory limiter testable against synthetic time without an
indirection through clock-injection. The new `Limiter` interface +
`PostgresSlidingWindowLimiter` match the actual repo signature
(`Allow(key string, now time.Time) error`) byte-for-byte. Per CLAUDE.md
"the repo is truth" — the prompt is framing, the code is ground-truth.

Files added
===========

migrations/000046_rate_limit_buckets.up.sql + .down.sql:
  - rate_limit_buckets(bucket_key TEXT PRIMARY KEY, timestamps
    TIMESTAMPTZ[] NOT NULL DEFAULT '{}', updated_at TIMESTAMPTZ NOT
    NULL DEFAULT NOW()).
  - btree index on updated_at supports the Sprint 13.3 janitor sweep.
  - All statements IF NOT EXISTS / DROP IF EXISTS per CLAUDE.md
    "Idempotent migrations" rule.

internal/ratelimit/limiter.go (NEW, 53 LOC):
  - Defines the `Limiter` interface with `Allow(key string,
    now time.Time) error`.
  - Compile-time satisfaction checks for both backends.
  - Doc-comment documents the prompt-vs-repo signature reconciliation
    + the Sprint 13.3 backend-selector plan + why the interface stays
    minimal (Disabled/Len are non-portable cross-backend; keeping them
    off the interface avoids leaking implementation detail).

internal/ratelimit/postgres_sliding_window.go (NEW, 178 LOC):
  - PostgresSlidingWindowLimiter struct + NewPostgresSlidingWindowLimiter
    constructor + Allow + Disabled methods.
  - Algorithm: BEGIN tx → INSERT ON CONFLICT DO NOTHING (ensures the
    row exists) → SELECT ... FOR UPDATE (per-key row lock acquired
    across the cluster) → prune in Go via the shared pruneOlderThan
    helper (single source of truth for prune semantics) → decide
    rate-limited or append → UPDATE → COMMIT.
  - SELECT FOR UPDATE is what arbitrates across replicas. Replicas A
    and B firing simultaneous Allow("k") never race because Postgres
    serializes the row-lock; the memory backend's sync.Mutex only
    arbitrates within a process.
  - Same `maxN <= 0 → disabled` opt-out semantics as the memory
    backend.
  - Empty-key short-circuit (chokepoint avoidance) matches the memory
    backend.
  - Uses pq.Array for TIMESTAMPTZ[] marshalling (lib/pq is the
    existing project driver).

internal/ratelimit/equivalence_test.go (NEW, 304 LOC):
  - Backend-equivalence suite that runs the same scenario set against
    both backends via the `Limiter` interface. 7 scenarios per
    backend: AllowsUpToCap, DistinctKeysIndependent, WindowExpiry,
    DisabledBypass, NegativeCapDisabled, EmptyKeyShortCircuits,
    ConcurrentRaceFree.
  - Memory half: TestSlidingWindowLimiter_Equivalence_Memory — runs
    on every `go test ./...`.
  - Postgres half: TestSlidingWindowLimiter_Equivalence_Postgres —
    gated by `testing.Short()`; runs only when -short is omitted, so
    `go test -race -short ./...` keeps fast.
  - Schema-per-test isolation via testcontainers-go (mirrors the
    pattern in internal/repository/postgres/testutil_test.go: setup
    one container, fresh schema per subtest, search_path-pinned DSN).
  - Memory equivalence half re-verifies the same behaviors pinned in
    the pre-existing sliding_window_test.go but through the interface
    — catches drift if SlidingWindowLimiter.Allow ever changes shape.

internal/integration/ratelimit_multi_replica_test.go (NEW, 159 LOC):
  - The falsifiable ARCH-M1 closure proof, gated by //go:build
    integration matching the rest of internal/integration/.
  - Scenario: 1 postgres container shared across N=3 independent
    *PostgresSlidingWindowLimiter instances (each replica's process
    has its own *sql.DB pool to the same database, just like a real
    HA deployment). 100 concurrent Allow("test-key") calls round-
    robin across the 3 limiters via sync.WaitGroup. Cap = 10,
    window = 1m, shared now-timestamp so the scenario is
    deterministic.
  - Assert: exactly 10 succeed + 90 return ErrRateLimited. If the
    cross-replica row lock weren't arbitrating, each replica would
    independently let through ~3-4 requests (10/3), giving 12-15
    successes. The hard-pass on exactly-10 is what makes ARCH-M1
    substantive.

What did NOT change
===================
- internal/ratelimit/sliding_window.go (the memory backend) is
  byte-identical to its pre-Sprint-13.2 state. Same Mutex, same
  Allow signature, same Len/Disabled/pruneOlderThan/evictOldestLocked.
  Compile-time check in limiter.go pins that the memory backend
  still satisfies the new interface.
- No call site in cmd/server, internal/api/handler, internal/service
  changed. Sprint 13.3 owns the 5-site migration + the
  CERTCTL_RATELIMIT_BACKEND env-var selector.
- No new operator dependency. Postgres is already required for
  certctl-server to boot. Redis (Option M1-B) was declined by the
  operator and is not introduced here.

Verification
============

  $ ls migrations/000046_rate_limit_buckets.up.sql migrations/000046_rate_limit_buckets.down.sql
  $ ls internal/ratelimit/limiter.go internal/ratelimit/postgres_sliding_window.go

  $ grep -nE 'sync\.Mutex|sync\.RWMutex' internal/ratelimit/sliding_window.go
    30:// by sync.Mutex; per-key slices mutated only while the mutex is
    56:	mu       sync.Mutex
    (memory backend untouched)

  $ gofmt -l internal/ratelimit/ internal/integration/  → clean
  $ go vet ./internal/ratelimit/...                      → clean
  $ go vet -tags=integration ./internal/integration/...  → clean
  $ staticcheck ./internal/ratelimit/...                 → clean
  $ go build ./...                                       → clean
  $ go build -tags=integration ./internal/integration/...→ clean

  $ go test -race -short -count=1 ./internal/ratelimit/...
    ok  github.com/certctl-io/certctl/internal/ratelimit  1.028s
    (memory equivalence + sliding_window_test.go both pass; postgres
    equivalence skipped under -short as designed)

  $ go doc ./internal/ratelimit/
    type Limiter interface{ ... }
    type PostgresSlidingWindowLimiter struct{ ... }
        func NewPostgresSlidingWindowLimiter(db *sql.DB, maxN int,
            window time.Duration) *PostgresSlidingWindowLimiter
    type SlidingWindowLimiter struct{ ... }
        func NewSlidingWindowLimiter(maxN int, window time.Duration,
            mapCap int) *SlidingWindowLimiter
    var ErrRateLimited = ...
    (public surface matches the Sprint 13.2 prompt's required diff)

Sandbox note: the multi-replica integration test + the postgres
equivalence half run under testcontainers-go which requires docker-
in-docker. The CI integration job exercises both; local CI-equivalent
verification was build + vet + staticcheck + memory equivalence (the
sandbox /sessions partition is full so spinning a postgres container
locally isn't viable in this session). The Sprint 13.3 commit will
re-verify against the live integration job.

Next: Sprint 13.3 wires every call site through
ratelimit.NewLimiter(cfg.Server.RateLimitBackend, db, ...) +
introduces the scheduler janitor loop + rewrites the
docs/operator/observability.md "per-process" paragraph to describe
the configurable backend.

Refs: ARCH-M1 (HA / scale — rate limits per-process), Phase 13
Sprint 13.2.
2026-05-14 11:30:44 +00:00

413 lines
14 KiB
Go

// Copyright 2026 certctl LLC. All rights reserved.
// SPDX-License-Identifier: BUSL-1.1
package ratelimit_test
import (
"context"
"database/sql"
"errors"
"fmt"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"testing"
"time"
_ "github.com/lib/pq"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
"github.com/certctl-io/certctl/internal/ratelimit"
)
// Phase 13 Sprint 13.2 closure (2026-05-14, architecture diligence audit
// ARCH-M1): backend-equivalence test suite. Runs the same scenario
// surface against both backends (in-memory + postgres) via the shared
// Limiter interface — if the postgres backend's caller-visible
// semantics drift from the memory backend's, this file fails first.
//
// Mirrors the white-box test names in sliding_window_test.go: every
// public-surface behavior pinned there (cap, expiry, disabled bypass,
// empty-key short-circuit, concurrency) gets re-pinned here for the
// postgres backend.
//
// Postgres tests skip under -short (matches the pattern in
// internal/repository/postgres/testutil_test.go); CI's
// `go test -race -short -count=1 ./...` exercises only the memory
// half. The integration job runs the full suite.
// ----------------------------------------------------------------
// Backend-equivalence helpers
// ----------------------------------------------------------------
// limiterFactory builds a fresh Limiter for one test case.
// Memory backends discard `db`; postgres backends use it.
type limiterFactory func(t *testing.T, db *sql.DB, maxN int, window time.Duration) ratelimit.Limiter
func memoryFactory(t *testing.T, _ *sql.DB, maxN int, window time.Duration) ratelimit.Limiter {
t.Helper()
// Map cap of 10_000 — large enough that none of the equivalence
// scenarios trip the LRU-eviction branch (the eviction branch is
// memory-specific; postgres has no equivalent so it's not part of
// the cross-backend contract).
return ratelimit.NewSlidingWindowLimiter(maxN, window, 10_000)
}
func postgresFactory(t *testing.T, db *sql.DB, maxN int, window time.Duration) ratelimit.Limiter {
t.Helper()
if db == nil {
t.Fatal("postgresFactory requires a non-nil *sql.DB")
}
return ratelimit.NewPostgresSlidingWindowLimiter(db, maxN, window)
}
// ----------------------------------------------------------------
// Per-backend test entry points
// ----------------------------------------------------------------
func TestSlidingWindowLimiter_Equivalence_Memory(t *testing.T) {
t.Run("AllowsUpToCap", func(t *testing.T) { caseAllowsUpToCap(t, memoryFactory, nil) })
t.Run("DistinctKeysIndependent", func(t *testing.T) { caseDistinctKeysIndependent(t, memoryFactory, nil) })
t.Run("WindowExpiry", func(t *testing.T) { caseWindowExpiry(t, memoryFactory, nil) })
t.Run("DisabledBypass", func(t *testing.T) { caseDisabledBypass(t, memoryFactory, nil) })
t.Run("NegativeCapDisabled", func(t *testing.T) { caseNegativeCapDisabled(t, memoryFactory, nil) })
t.Run("EmptyKeyShortCircuits", func(t *testing.T) { caseEmptyKeyShortCircuits(t, memoryFactory, nil) })
t.Run("ConcurrentRaceFree", func(t *testing.T) {
if testing.Short() {
t.Skip("race-style test under -short")
}
caseConcurrentRaceFree(t, memoryFactory, nil)
})
}
func TestSlidingWindowLimiter_Equivalence_Postgres(t *testing.T) {
if testing.Short() {
t.Skip("postgres equivalence tests require testcontainers; skipped under -short")
}
tdb := setupTestDB(t)
defer tdb.teardown(t)
t.Run("AllowsUpToCap", func(t *testing.T) {
db := tdb.freshSchema(t, "AllowsUpToCap")
caseAllowsUpToCap(t, postgresFactory, db)
})
t.Run("DistinctKeysIndependent", func(t *testing.T) {
db := tdb.freshSchema(t, "DistinctKeysIndependent")
caseDistinctKeysIndependent(t, postgresFactory, db)
})
t.Run("WindowExpiry", func(t *testing.T) {
db := tdb.freshSchema(t, "WindowExpiry")
caseWindowExpiry(t, postgresFactory, db)
})
t.Run("DisabledBypass", func(t *testing.T) {
db := tdb.freshSchema(t, "DisabledBypass")
caseDisabledBypass(t, postgresFactory, db)
})
t.Run("NegativeCapDisabled", func(t *testing.T) {
db := tdb.freshSchema(t, "NegativeCapDisabled")
caseNegativeCapDisabled(t, postgresFactory, db)
})
t.Run("EmptyKeyShortCircuits", func(t *testing.T) {
db := tdb.freshSchema(t, "EmptyKeyShortCircuits")
caseEmptyKeyShortCircuits(t, postgresFactory, db)
})
t.Run("ConcurrentRaceFree", func(t *testing.T) {
db := tdb.freshSchema(t, "ConcurrentRaceFree")
caseConcurrentRaceFree(t, postgresFactory, db)
})
}
// ----------------------------------------------------------------
// Backend-agnostic test cases (one per behavior pinned in
// sliding_window_test.go's public-surface tests)
// ----------------------------------------------------------------
func caseAllowsUpToCap(t *testing.T, mk limiterFactory, db *sql.DB) {
l := mk(t, db, 3, 24*time.Hour)
now := time.Now()
for i := 0; i < 3; i++ {
if err := l.Allow("k", now.Add(time.Duration(i)*time.Minute)); err != nil {
t.Fatalf("call %d should be allowed: %v", i+1, err)
}
}
if err := l.Allow("k", now.Add(4*time.Minute)); !errors.Is(err, ratelimit.ErrRateLimited) {
t.Fatalf("4th call should be rate-limited; got %v", err)
}
}
func caseDistinctKeysIndependent(t *testing.T, mk limiterFactory, db *sql.DB) {
l := mk(t, db, 1, 24*time.Hour)
now := time.Now()
if err := l.Allow("k-1", now); err != nil {
t.Fatalf("first allow: %v", err)
}
if err := l.Allow("k-2", now); err != nil {
t.Fatalf("different key must have its own bucket: %v", err)
}
if err := l.Allow("k-1", now.Add(1*time.Second)); !errors.Is(err, ratelimit.ErrRateLimited) {
t.Fatalf("repeat key should be limited; got %v", err)
}
}
func caseWindowExpiry(t *testing.T, mk limiterFactory, db *sql.DB) {
l := mk(t, db, 2, 1*time.Hour)
now := time.Now()
if err := l.Allow("k", now); err != nil {
t.Fatal(err)
}
if err := l.Allow("k", now.Add(30*time.Minute)); err != nil {
t.Fatal(err)
}
// Inside window — limited.
if err := l.Allow("k", now.Add(45*time.Minute)); !errors.Is(err, ratelimit.ErrRateLimited) {
t.Fatalf("inside-window 3rd call should be limited: %v", err)
}
// Past window — slots reopen.
if err := l.Allow("k", now.Add(2*time.Hour)); err != nil {
t.Fatalf("past-window call should be allowed (window reset): %v", err)
}
}
func caseDisabledBypass(t *testing.T, mk limiterFactory, db *sql.DB) {
l := mk(t, db, 0, 24*time.Hour) // maxN=0 → disabled
type disablable interface {
Disabled() bool
}
if d, ok := l.(disablable); ok && !d.Disabled() {
t.Fatal("limiter with maxN=0 must report Disabled()=true")
}
now := time.Now()
for i := 0; i < 100; i++ {
if err := l.Allow("k", now); err != nil {
t.Fatalf("disabled limiter must allow everything: %v", err)
}
}
}
func caseNegativeCapDisabled(t *testing.T, mk limiterFactory, db *sql.DB) {
l := mk(t, db, -1, 24*time.Hour)
type disablable interface {
Disabled() bool
}
if d, ok := l.(disablable); ok && !d.Disabled() {
t.Fatal("negative maxN must produce a disabled limiter")
}
now := time.Now()
if err := l.Allow("k", now); err != nil {
t.Fatalf("disabled limiter must allow: %v", err)
}
}
func caseEmptyKeyShortCircuits(t *testing.T, mk limiterFactory, db *sql.DB) {
// Empty key is the caller's defense-in-depth case — caller's
// validation upstream should reject empty-key events first. Limiter
// must not build a single shared bucket keyed by empty-key — that
// would be a chokepoint for every empty-key event.
l := mk(t, db, 1, 24*time.Hour)
now := time.Now()
for i := 0; i < 50; i++ {
if err := l.Allow("", now); err != nil {
t.Fatalf("empty key must short-circuit (call %d): %v", i, err)
}
}
}
func caseConcurrentRaceFree(t *testing.T, mk limiterFactory, db *sql.DB) {
l := mk(t, db, 50, 24*time.Hour)
var wg sync.WaitGroup
for g := 0; g < 20; g++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
now := time.Now()
key := fmt.Sprintf("k-%d", id)
for i := 0; i < 30; i++ {
_ = l.Allow(key, now)
}
}(g)
}
wg.Wait()
}
// ----------------------------------------------------------------
// Postgres-only testcontainers harness — mirrors
// internal/repository/postgres/testutil_test.go's setupTestDB +
// freshSchema pattern.
// ----------------------------------------------------------------
type testDB struct {
db *sql.DB
container testcontainers.Container
}
func setupTestDB(t *testing.T) *testDB {
t.Helper()
ctx := context.Background()
req := testcontainers.ContainerRequest{
Image: "postgres:16-alpine",
ExposedPorts: []string{"5432/tcp"},
Env: map[string]string{
"POSTGRES_DB": "certctl_test",
"POSTGRES_USER": "certctl",
"POSTGRES_PASSWORD": "certctl",
},
WaitingFor: wait.ForLog("database system is ready to accept connections").WithOccurrence(2),
}
container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
})
if err != nil {
t.Fatalf("start postgres container: %v", err)
}
host, err := container.Host(ctx)
if err != nil {
t.Fatalf("container host: %v", err)
}
port, err := container.MappedPort(ctx, "5432")
if err != nil {
t.Fatalf("container port: %v", err)
}
connStr := fmt.Sprintf("postgres://certctl:certctl@%s:%s/certctl_test?sslmode=disable", host, port.Port())
db, err := sql.Open("postgres", connStr)
if err != nil {
t.Fatalf("open db: %v", err)
}
// Pool size > 1 so the multi-goroutine concurrency case can hold
// multiple connections simultaneously; the row-lock arbitrates.
db.SetMaxOpenConns(8)
if err := db.Ping(); err != nil {
t.Fatalf("ping: %v", err)
}
return &testDB{db: db, container: container}
}
func (tdb *testDB) teardown(t *testing.T) {
t.Helper()
if tdb.db != nil {
tdb.db.Close()
}
if tdb.container != nil {
_ = tdb.container.Terminate(context.Background())
}
}
// freshSchema creates an isolated schema per test case + runs the
// rate_limit_buckets migration inside it. Returns a *sql.DB whose
// search_path is scoped to the new schema.
//
// Note: this helper takes a sub-test label (caller-supplied) so the
// schema name is deterministic-per-case + stable across runs. The
// canonical postgres testutil uses t.Name() but we're inside Run-
// nested subtests where t.Name() includes "/" — flatten it.
func (tdb *testDB) freshSchema(t *testing.T, label string) *sql.DB {
t.Helper()
schema := sanitizeSchemaName(label + "_" + t.Name())
ctx := context.Background()
// One connection-scoped session so SET search_path persists.
conn, err := tdb.db.Conn(ctx)
if err != nil {
t.Fatalf("acquire conn: %v", err)
}
if _, err := conn.ExecContext(ctx, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", schema)); err != nil {
t.Fatalf("create schema: %v", err)
}
if _, err := conn.ExecContext(ctx, fmt.Sprintf("SET search_path TO %s, public", schema)); err != nil {
t.Fatalf("set search_path: %v", err)
}
// Run the rate_limit_buckets migration in this schema. The migration
// is the only one that introduces our table; other migrations don't
// matter for limiter behavior.
migPath := findMigration("000046_rate_limit_buckets.up.sql")
body, err := os.ReadFile(migPath)
if err != nil {
t.Fatalf("read migration: %v", err)
}
if _, err := conn.ExecContext(ctx, string(body)); err != nil {
t.Fatalf("apply migration: %v", err)
}
t.Cleanup(func() {
conn.ExecContext(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", schema))
conn.Close()
})
// Wrap the single connection in a *sql.DB-like by returning a fresh
// pool that goes through the same search_path. Simpler: just return
// the underlying *sql.DB and SET search_path session-wide by re-
// running the SET on every checkout. The cleanest move is to use
// the per-connection helper: return a *sql.DB that's actually a
// "limited to N=1 connection with search_path pinned" handle.
//
// Workaround the easy way: build a fresh *sql.DB whose dsn embeds
// search_path as a connection-time setting, so every connection
// auto-applies it.
dsn := connDSNWithSearchPath(tdb, schema)
scoped, err := sql.Open("postgres", dsn)
if err != nil {
t.Fatalf("open scoped db: %v", err)
}
scoped.SetMaxOpenConns(8)
t.Cleanup(func() { scoped.Close() })
// Sanity: row exists / table exists.
if _, err := scoped.ExecContext(ctx, "SELECT 1 FROM rate_limit_buckets LIMIT 1"); err != nil && !strings.Contains(err.Error(), "no rows") {
// Empty table is fine; only a missing-table error matters.
// "no rows" never fires here (we used Exec not Query).
t.Fatalf("smoke select: %v", err)
}
return scoped
}
func connDSNWithSearchPath(tdb *testDB, schema string) string {
// Derive the DSN by introspection of the container's host/port.
// Couldn't pre-store because freshSchema can be called many times.
ctx := context.Background()
host, _ := tdb.container.Host(ctx)
port, _ := tdb.container.MappedPort(ctx, "5432")
return fmt.Sprintf(
"postgres://certctl:certctl@%s:%s/certctl_test?sslmode=disable&search_path=%s,public",
host, port.Port(), schema,
)
}
func sanitizeSchemaName(name string) string {
name = strings.ToLower(name)
for _, ch := range []string{"/", " ", "-", "."} {
name = strings.ReplaceAll(name, ch, "_")
}
if len(name) > 50 {
name = name[:50]
}
return "test_rl_" + name
}
func findMigration(filename string) string {
_, here, _, _ := runtime.Caller(0)
// here = .../internal/ratelimit/equivalence_test.go
// migrations = .../migrations
dir := filepath.Dir(here)
for i := 0; i < 6; i++ {
candidate := filepath.Join(dir, "migrations", filename)
if _, err := os.Stat(candidate); err == nil {
return candidate
}
dir = filepath.Dir(dir)
}
return ""
}