mirror of
https://github.com/shankar0123/certctl.git
synced 2026-06-07 17:31:30 +00:00
feat(ratelimit): Phase 13 Sprint 13.2 — postgres-backed sliding window + multi-replica test
Phase 13 Sprint 13.2 closure (architecture diligence audit ARCH-M1):
ships the infrastructure half of the ARCH-M1 substantive close. Adds a
postgres-backed sliding-window rate limiter that satisfies the same
interface as the in-memory primitive — cross-replica-consistent rather
than per-process. Sprint 13.3 wires the 5 call sites through a
backend selector (`CERTCTL_RATELIMIT_BACKEND={memory,postgres}`); this
commit deliberately changes ZERO call sites. The infrastructure +
migration ship as their own review window, mirroring the Phase 9
Sprint 8a/8b pattern.
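
A minimal sketch of how the Sprint 13.3 selector might interpret `CERTCTL_RATELIMIT_BACKEND`. This commit ships none of it: the `Backend` type, the `parseBackend` helper, and the choice to treat an unset variable as `memory` are assumptions made for illustration only.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// Backend names a rate-limit storage backend. Hypothetical type, not part
// of this commit.
type Backend string

const (
	BackendMemory   Backend = "memory"
	BackendPostgres Backend = "postgres"
)

// parseBackend maps the raw environment value to a Backend. Treating an
// empty value as "memory" is an assumption; unknown values are rejected so
// a typo cannot silently fall back to per-process limits.
func parseBackend(raw string) (Backend, error) {
	switch strings.ToLower(strings.TrimSpace(raw)) {
	case "", string(BackendMemory):
		return BackendMemory, nil
	case string(BackendPostgres):
		return BackendPostgres, nil
	default:
		return "", fmt.Errorf("unknown CERTCTL_RATELIMIT_BACKEND %q (want memory or postgres)", raw)
	}
}

func main() {
	b, err := parseBackend(os.Getenv("CERTCTL_RATELIMIT_BACKEND"))
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println("rate-limit backend:", b)
}
```

Rejecting unknown values at startup keeps a misconfigured replica from quietly enforcing a different limit than its peers.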
Substantive close, not document-and-defer
=========================================
The audit recommended "document the per-process limit + defer the
distributed backend to v3." The operator chose Option M1-A (postgres-
backed; zero new infra) over the document-and-defer path. Postgres
is already a hard dependency for certctl; no new operator burden. The
multi-replica integration test in this commit is the falsifiable
closure proof — cap-N enforced exactly across N replicas hitting the
same key concurrently.
Signature ground-truth
======================
The Sprint 13.2 prompt template specified `Allow(key string) error` as
the signature to match. The actual repo signature has been
`Allow(key string, now time.Time) error` since the EST RFC 7030
hardening master bundle Phase 4.1 — the `now` parameter is what makes
the memory limiter testable against synthetic time without an
indirection through clock-injection. The new `Limiter` interface +
`PostgresSlidingWindowLimiter` match the actual repo signature
(`Allow(key string, now time.Time) error`) byte-for-byte. Per CLAUDE.md
"the repo is truth" — the prompt is framing, the code is ground-truth.
Files added
===========
migrations/000046_rate_limit_buckets.up.sql + .down.sql:
- rate_limit_buckets(bucket_key TEXT PRIMARY KEY, timestamps
TIMESTAMPTZ[] NOT NULL DEFAULT '{}', updated_at TIMESTAMPTZ NOT
NULL DEFAULT NOW()).
- btree index on updated_at supports the Sprint 13.3 janitor sweep.
- All statements IF NOT EXISTS / DROP IF EXISTS per CLAUDE.md
"Idempotent migrations" rule.
internal/ratelimit/limiter.go (NEW, 53 LOC):
- Defines the `Limiter` interface with `Allow(key string,
now time.Time) error`.
- Compile-time satisfaction checks for both backends.
- Doc-comment documents the prompt-vs-repo signature reconciliation
+ the Sprint 13.3 backend-selector plan + why the interface stays
minimal (Disabled/Len are non-portable cross-backend; keeping them
off the interface avoids leaking implementation detail).
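
The interface and its compile-time satisfaction check can be sketched as below. Only the `Allow(key string, now time.Time) error` signature and the `ErrRateLimited` name come from the commit; `fixedLimiter` and `countAllowed` are hypothetical stand-ins so the example runs without a database.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ErrRateLimited stands in for ratelimit.ErrRateLimited.
var ErrRateLimited = errors.New("rate limited")

// Limiter mirrors the one-method interface described above.
type Limiter interface {
	Allow(key string, now time.Time) error
}

// fixedLimiter is a toy backend (hypothetical): it admits the first maxN
// calls per key and ignores the window entirely.
type fixedLimiter struct {
	maxN  int
	count map[string]int
}

func (l *fixedLimiter) Allow(key string, _ time.Time) error {
	if l.count[key] >= l.maxN {
		return ErrRateLimited
	}
	l.count[key]++
	return nil
}

// Compile-time satisfaction check: the build breaks if fixedLimiter ever
// stops implementing Limiter.
var _ Limiter = (*fixedLimiter)(nil)

// countAllowed fires n calls for key at one fixed instant and reports how
// many were admitted.
func countAllowed(l Limiter, key string, n int) int {
	now := time.Date(2026, 5, 14, 0, 0, 0, 0, time.UTC)
	allowed := 0
	for i := 0; i < n; i++ {
		if err := l.Allow(key, now); err == nil {
			allowed++
		}
	}
	return allowed
}

func main() {
	l := &fixedLimiter{maxN: 2, count: map[string]int{}}
	fmt.Println("admitted:", countAllowed(l, "k", 5))
}
```

Passing `now` explicitly is what lets a caller drive the limiter with synthetic time, which is the property the signature reconciliation above preserves.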
internal/ratelimit/postgres_sliding_window.go (NEW, 178 LOC):
- PostgresSlidingWindowLimiter struct + NewPostgresSlidingWindowLimiter
constructor + Allow + Disabled methods.
- Algorithm: BEGIN tx → INSERT ON CONFLICT DO NOTHING (ensures the
row exists) → SELECT ... FOR UPDATE (per-key row lock acquired
across the cluster) → prune in Go via the shared pruneOlderThan
helper (single source of truth for prune semantics) → decide
rate-limited or append → UPDATE → COMMIT.
- SELECT FOR UPDATE is what arbitrates across replicas. Replicas A
and B firing simultaneous Allow("k") never race because Postgres
serializes the row-lock; the memory backend's sync.Mutex only
arbitrates within a process.
- Same `maxN <= 0 → disabled` opt-out semantics as the memory
backend.
- Empty-key short-circuit (chokepoint avoidance) matches the memory
backend.
- Uses pq.Array for TIMESTAMPTZ[] marshalling (lib/pq is the
existing project driver).
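
The in-Go step of the algorithm above (prune, then decide or append, while the row lock is held) can be sketched without a database. This is a sketch under stated assumptions, not the shipped code: `prune`, `decide` and `replayMinutes` are hypothetical names, and dropping timestamps at or before the cutoff is an assumed boundary rule.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ErrRateLimited stands in for ratelimit.ErrRateLimited.
var ErrRateLimited = errors.New("rate limited")

// prune keeps only timestamps strictly newer than cutoff. Whether the real
// pruneOlderThan helper treats the boundary the same way is an assumption.
func prune(ts []time.Time, cutoff time.Time) []time.Time {
	kept := make([]time.Time, 0, len(ts))
	for _, t := range ts {
		if t.After(cutoff) {
			kept = append(kept, t)
		}
	}
	return kept
}

// decide models the work done between SELECT ... FOR UPDATE and UPDATE:
// it takes the timestamps read from the locked row and returns the slice
// to write back, plus nil (admitted) or ErrRateLimited.
func decide(stored []time.Time, now time.Time, window time.Duration, maxN int) ([]time.Time, error) {
	live := prune(stored, now.Add(-window))
	if len(live) >= maxN {
		return live, ErrRateLimited
	}
	return append(live, now), nil
}

// replayMinutes feeds decide a series of calls at the given minute offsets
// against one bucket and reports which calls were admitted.
func replayMinutes(maxN, windowMin int, offsetsMin ...int) []bool {
	base := time.Date(2026, 5, 14, 0, 0, 0, 0, time.UTC)
	window := time.Duration(windowMin) * time.Minute
	var bucket []time.Time
	admitted := make([]bool, 0, len(offsetsMin))
	for _, m := range offsetsMin {
		var err error
		bucket, err = decide(bucket, base.Add(time.Duration(m)*time.Minute), window, maxN)
		admitted = append(admitted, err == nil)
	}
	return admitted
}

func main() {
	// Cap 2, one-hour window, calls at 0, 30, 45 and 120 minutes.
	fmt.Println(replayMinutes(2, 60, 0, 30, 45, 120)) // [true true false true]
}
```

Because the decision is a pure function of the stored timestamps, the row lock only has to guarantee that no other transaction reads or writes the same row between the SELECT and the UPDATE.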
internal/ratelimit/equivalence_test.go (NEW, 304 LOC):
- Backend-equivalence suite that runs the same scenario set against
both backends via the `Limiter` interface. 7 scenarios per
backend: AllowsUpToCap, DistinctKeysIndependent, WindowExpiry,
DisabledBypass, NegativeCapDisabled, EmptyKeyShortCircuits,
ConcurrentRaceFree.
- Memory half: TestSlidingWindowLimiter_Equivalence_Memory runs
on every `go test ./...` (its ConcurrentRaceFree subtest is skipped
under -short).
- Postgres half: TestSlidingWindowLimiter_Equivalence_Postgres is
gated by `testing.Short()` and runs only when -short is omitted, so
`go test -race -short ./...` stays fast.
- Schema-per-test isolation via testcontainers-go (mirrors the
pattern in internal/repository/postgres/testutil_test.go: setup
one container, fresh schema per subtest, search_path-pinned DSN).
- Memory equivalence half re-verifies the same behaviors pinned in
the pre-existing sliding_window_test.go but through the interface
— catches drift if SlidingWindowLimiter.Allow ever changes shape.
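
The shape of a backend-equivalence suite can be sketched with stdlib only: scenarios are written once against the interface and replayed against each backend's factory. Everything named here (`pruningLimiter`, `countingLimiter`, `newLax`, `failures`) is a hypothetical stand-in; the real suite uses `testing` subtests and the memory and postgres backends.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var ErrRateLimited = errors.New("rate limited")

// Limiter is the shared contract every backend under test satisfies.
type Limiter interface {
	Allow(key string, now time.Time) error
}

type state struct {
	maxN   int
	window time.Duration
	hits   map[string][]time.Time
}

// pruningLimiter discards expired timestamps on every call.
type pruningLimiter struct{ state }

func (l *pruningLimiter) Allow(key string, now time.Time) error {
	live := l.hits[key][:0]
	for _, ts := range l.hits[key] {
		if ts.After(now.Add(-l.window)) {
			live = append(live, ts)
		}
	}
	l.hits[key] = live
	if len(live) >= l.maxN {
		return ErrRateLimited
	}
	l.hits[key] = append(live, now)
	return nil
}

// countingLimiter keeps every timestamp and counts the live ones on demand.
type countingLimiter struct{ state }

func (l *countingLimiter) Allow(key string, now time.Time) error {
	n := 0
	for _, ts := range l.hits[key] {
		if ts.After(now.Add(-l.window)) {
			n++
		}
	}
	if n >= l.maxN {
		return ErrRateLimited
	}
	l.hits[key] = append(l.hits[key], now)
	return nil
}

type factory func(maxN int, window time.Duration) Limiter

func newState(maxN int, w time.Duration) state {
	return state{maxN: maxN, window: w, hits: map[string][]time.Time{}}
}
func newPruning(maxN int, w time.Duration) Limiter  { return &pruningLimiter{newState(maxN, w)} }
func newCounting(maxN int, w time.Duration) Limiter { return &countingLimiter{newState(maxN, w)} }

// newLax is a deliberately drifted backend: its cap is off by one.
func newLax(maxN int, w time.Duration) Limiter { return newPruning(maxN+1, w) }

var base = time.Date(2026, 5, 14, 0, 0, 0, 0, time.UTC)

// scenarios are written once against the interface, never a concrete type.
var scenarios = map[string]func(factory) bool{
	"AllowsUpToCap": func(mk factory) bool {
		l := mk(2, time.Hour)
		return l.Allow("k", base) == nil && l.Allow("k", base) == nil &&
			errors.Is(l.Allow("k", base), ErrRateLimited)
	},
	"WindowExpiry": func(mk factory) bool {
		l := mk(1, time.Hour)
		return l.Allow("k", base) == nil &&
			errors.Is(l.Allow("k", base.Add(30*time.Minute)), ErrRateLimited) &&
			l.Allow("k", base.Add(2*time.Hour)) == nil
	},
}

// failures counts the scenarios that the backend built by mk does not pass.
func failures(mk factory) int {
	n := 0
	for _, ok := range scenarios {
		if !ok(mk) {
			n++
		}
	}
	return n
}

func main() {
	fmt.Println("pruning: ", failures(newPruning))
	fmt.Println("counting:", failures(newCounting))
	fmt.Println("lax:     ", failures(newLax))
}
```

The two well-behaved backends store state differently yet pass the same scenarios, while the off-by-one backend fails both, which is the kind of drift the suite exists to catch.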
internal/integration/ratelimit_multi_replica_test.go (NEW, 159 LOC):
- The falsifiable ARCH-M1 closure proof, gated by //go:build
integration matching the rest of internal/integration/.
- Scenario: 1 postgres container shared across N=3 independent
*PostgresSlidingWindowLimiter instances (each replica's process
has its own *sql.DB pool to the same database, just like a real
HA deployment). 100 concurrent Allow("test-key") calls round-
robin across the 3 limiters via sync.WaitGroup. Cap = 10,
window = 1m, shared now-timestamp so the scenario is
deterministic.
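- Assert: exactly 10 succeed + 90 return ErrRateLimited. If the
cross-replica row lock weren't arbitrating, concurrent transactions
could read the same bucket state and each admit a request, so more
than 10 calls would succeed (with fully per-process state, each
replica would admit up to its own cap of 10, for as many as 30
successes). The hard-pass on exactly-10 is what makes ARCH-M1
substantive.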
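
The difference between per-process and shared state can be reproduced with stdlib only. The sketch below is an illustration, not the integration test: `memLimiter` is a hypothetical mutex-guarded limiter, and a single shared instance stands in for the row that Postgres locks.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var ErrRateLimited = errors.New("rate limited")

// memLimiter is a hypothetical mutex-guarded sliding-window limiter.
type memLimiter struct {
	mu     sync.Mutex
	maxN   int
	window time.Duration
	hits   map[string][]time.Time
}

func newMemLimiter(maxN int, window time.Duration) *memLimiter {
	return &memLimiter{maxN: maxN, window: window, hits: map[string][]time.Time{}}
}

func (l *memLimiter) Allow(key string, now time.Time) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	cutoff := now.Add(-l.window)
	live := l.hits[key][:0]
	for _, ts := range l.hits[key] {
		if ts.After(cutoff) {
			live = append(live, ts)
		}
	}
	l.hits[key] = live
	if len(live) >= l.maxN {
		return ErrRateLimited
	}
	l.hits[key] = append(live, now)
	return nil
}

// fire sends calls concurrent requests for one key, round-robin across the
// given limiters, and returns how many were admitted.
func fire(limiters []*memLimiter, calls int) int {
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		allowed int
	)
	now := time.Now()
	for i := 0; i < calls; i++ {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			if limiters[idx%len(limiters)].Allow("test-key", now) == nil {
				mu.Lock()
				allowed++
				mu.Unlock()
			}
		}(i)
	}
	wg.Wait()
	return allowed
}

// perProcess gives every replica its own state, as a per-process limiter does.
func perProcess(replicas, maxN, calls int) int {
	ls := make([]*memLimiter, replicas)
	for i := range ls {
		ls[i] = newMemLimiter(maxN, time.Minute)
	}
	return fire(ls, calls)
}

// shared points every replica at one state, standing in for the locked row.
func shared(replicas, maxN, calls int) int {
	one := newMemLimiter(maxN, time.Minute)
	ls := make([]*memLimiter, replicas)
	for i := range ls {
		ls[i] = one
	}
	return fire(ls, calls)
}

func main() {
	fmt.Println("per-process state:", perProcess(3, 10, 100)) // 30
	fmt.Println("shared state:     ", shared(3, 10, 100))     // 10
}
```

With three replicas and a cap of 10, separate state admits 3 × 10 = 30 of the 100 calls, while shared state admits exactly 10, which is the figure the integration test asserts.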
What did NOT change
===================
- internal/ratelimit/sliding_window.go (the memory backend) is
byte-identical to its pre-Sprint-13.2 state. Same Mutex, same
Allow signature, same Len/Disabled/pruneOlderThan/evictOldestLocked.
Compile-time check in limiter.go pins that the memory backend
still satisfies the new interface.
- No call site in cmd/server, internal/api/handler, internal/service
changed. Sprint 13.3 owns the 5-site migration + the
CERTCTL_RATELIMIT_BACKEND env-var selector.
- No new operator dependency. Postgres is already required for
certctl-server to boot. Redis (Option M1-B) was declined by the
operator and is not introduced here.
Verification
============
$ ls migrations/000046_rate_limit_buckets.up.sql migrations/000046_rate_limit_buckets.down.sql
$ ls internal/ratelimit/limiter.go internal/ratelimit/postgres_sliding_window.go
$ grep -nE 'sync\.Mutex|sync\.RWMutex' internal/ratelimit/sliding_window.go
30:// by sync.Mutex; per-key slices mutated only while the mutex is
56: mu sync.Mutex
(memory backend untouched)
$ gofmt -l internal/ratelimit/ internal/integration/ → clean
$ go vet ./internal/ratelimit/... → clean
$ go vet -tags=integration ./internal/integration/... → clean
$ staticcheck ./internal/ratelimit/... → clean
$ go build ./... → clean
$ go build -tags=integration ./internal/integration/...→ clean
$ go test -race -short -count=1 ./internal/ratelimit/...
ok github.com/certctl-io/certctl/internal/ratelimit 1.028s
(memory equivalence + sliding_window_test.go both pass; postgres
equivalence skipped under -short as designed)
$ go doc ./internal/ratelimit/
type Limiter interface{ ... }
type PostgresSlidingWindowLimiter struct{ ... }
func NewPostgresSlidingWindowLimiter(db *sql.DB, maxN int,
window time.Duration) *PostgresSlidingWindowLimiter
type SlidingWindowLimiter struct{ ... }
func NewSlidingWindowLimiter(maxN int, window time.Duration,
mapCap int) *SlidingWindowLimiter
var ErrRateLimited = ...
(public surface matches the Sprint 13.2 prompt's required diff)
Sandbox note: the multi-replica integration test + the postgres
equivalence half run under testcontainers-go which requires docker-
in-docker. The CI integration job exercises both; local CI-equivalent
verification was build + vet + staticcheck + memory equivalence (the
sandbox /sessions partition is full so spinning a postgres container
locally isn't viable in this session). The Sprint 13.3 commit will
re-verify against the live integration job.
Next: Sprint 13.3 wires every call site through
ratelimit.NewLimiter(cfg.Server.RateLimitBackend, db, ...) +
introduces the scheduler janitor loop + rewrites the
docs/operator/observability.md "per-process" paragraph to describe
the configurable backend.
Refs: ARCH-M1 (HA / scale — rate limits per-process), Phase 13
Sprint 13.2.
@@ -0,0 +1,195 @@
// Copyright 2026 certctl LLC. All rights reserved.
// SPDX-License-Identifier: BUSL-1.1

//go:build integration

package integration

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"runtime"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	_ "github.com/lib/pq"
	"github.com/testcontainers/testcontainers-go"
	"github.com/testcontainers/testcontainers-go/wait"

	"github.com/certctl-io/certctl/internal/ratelimit"
)

// Phase 13 Sprint 13.2 closure (2026-05-14, architecture diligence audit
// ARCH-M1): the falsifiable closure proof for cross-replica rate-limit
// consistency.
//
// Scenario:
//   - ONE postgres container (representing the shared backend).
//   - N=3 independent *PostgresSlidingWindowLimiter instances pointing
//     at it (representing 3 server replicas: each replica's process
//     has its own constructed limiter, but they all share the same
//     database state).
//   - 100 concurrent Allow("test-key") calls spread across the 3
//     limiters via sync.WaitGroup.
//   - Assert: exactly 10 succeed + 90 return ErrRateLimited.
//
// If the postgres backend's SELECT FOR UPDATE serialization weren't
// arbitrating across the 3 limiters, concurrent transactions could read
// the same bucket state and each admit a call, so more than 10 calls
// would be allowed (with fully per-process state, each replica would
// admit up to its own cap of 10, for as many as 30 successes). The
// hard-pass on exactly-10 is what makes ARCH-M1 closure substantive
// rather than wishful.
//
// Gated by //go:build integration matching the rest of
// internal/integration/. Sprint 13.3 promotes this test to a
// required CI status check.

func TestRateLimit_PostgresBackend_CapEnforcedAcrossReplicas(t *testing.T) {
	const (
		replicas      = 3
		cap           = 10
		window        = 1 * time.Minute
		concurrentReq = 100
		key           = "test-key"
	)

	ctx := context.Background()

	// Boot a shared postgres container.
	container, dsn := startPostgresContainer(ctx, t)
	t.Cleanup(func() { _ = container.Terminate(context.Background()) })

	// Each "replica" gets its own *sql.DB pool (same database, different
	// connection pool), matching how N server processes would each open
	// their own pool to the same control-plane database.
	dbs := make([]*sql.DB, replicas)
	for i := 0; i < replicas; i++ {
		db, err := sql.Open("postgres", dsn)
		if err != nil {
			t.Fatalf("open db (replica %d): %v", i, err)
		}
		db.SetMaxOpenConns(8)
		if err := db.Ping(); err != nil {
			t.Fatalf("ping (replica %d): %v", i, err)
		}
		t.Cleanup(func() { db.Close() })
		dbs[i] = db
	}

	// Apply the rate_limit_buckets migration via dbs[0]. All replicas
	// see the same schema since they share the same database.
	migPath := findMigrationFromHere("000046_rate_limit_buckets.up.sql")
	body, err := os.ReadFile(migPath)
	if err != nil {
		t.Fatalf("read migration: %v", err)
	}
	if _, err := dbs[0].ExecContext(ctx, string(body)); err != nil {
		t.Fatalf("apply migration: %v", err)
	}

	// Instantiate one limiter per replica.
	limiters := make([]*ratelimit.PostgresSlidingWindowLimiter, replicas)
	for i := 0; i < replicas; i++ {
		limiters[i] = ratelimit.NewPostgresSlidingWindowLimiter(dbs[i], cap, window)
	}

	// Fire concurrentReq parallel Allow calls, round-robining across the
	// replicas. Each call uses the SAME key + a SHARED `now` so the
	// scenario is deterministic. The cross-replica row lock is what
	// enforces the cap globally.
	var (
		allowed int64
		denied  int64
		wg      sync.WaitGroup
	)
	now := time.Now()
	for i := 0; i < concurrentReq; i++ {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			l := limiters[idx%replicas]
			err := l.Allow(key, now)
			if err == nil {
				atomic.AddInt64(&allowed, 1)
			} else if errors.Is(err, ratelimit.ErrRateLimited) {
				atomic.AddInt64(&denied, 1)
			} else {
				t.Errorf("unexpected error from Allow: %v", err)
			}
		}(i)
	}
	wg.Wait()

	gotAllowed := atomic.LoadInt64(&allowed)
	gotDenied := atomic.LoadInt64(&denied)

	t.Logf("replicas=%d cap=%d concurrent=%d → allowed=%d denied=%d",
		replicas, cap, concurrentReq, gotAllowed, gotDenied)

	if gotAllowed != int64(cap) {
		t.Errorf("allowed = %d, want exactly %d (cross-replica row lock should serialize Allow calls so exactly cap succeed)",
			gotAllowed, cap)
	}
	if gotDenied != int64(concurrentReq-cap) {
		t.Errorf("denied = %d, want %d (concurrentReq - cap)", gotDenied, concurrentReq-cap)
	}
}

// ----------------------------------------------------------------
// Local testcontainers harness. Kept in-file because the rest of
// internal/integration/ uses HTTP-against-running-server smoke tests
// against a docker-compose stack, a different shape from ours.
// ----------------------------------------------------------------

func startPostgresContainer(ctx context.Context, t *testing.T) (testcontainers.Container, string) {
	t.Helper()

	req := testcontainers.ContainerRequest{
		Image:        "postgres:16-alpine",
		ExposedPorts: []string{"5432/tcp"},
		Env: map[string]string{
			"POSTGRES_DB":       "certctl_test",
			"POSTGRES_USER":     "certctl",
			"POSTGRES_PASSWORD": "certctl",
		},
		WaitingFor: wait.ForLog("database system is ready to accept connections").WithOccurrence(2),
	}
	container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
		ContainerRequest: req,
		Started:          true,
	})
	if err != nil {
		t.Fatalf("start postgres container: %v", err)
	}

	host, err := container.Host(ctx)
	if err != nil {
		t.Fatalf("container host: %v", err)
	}
	port, err := container.MappedPort(ctx, "5432")
	if err != nil {
		t.Fatalf("container port: %v", err)
	}
	dsn := fmt.Sprintf("postgres://certctl:certctl@%s:%s/certctl_test?sslmode=disable",
		host, port.Port())
	return container, dsn
}

func findMigrationFromHere(filename string) string {
	_, here, _, _ := runtime.Caller(0)
	dir := filepath.Dir(here)
	for i := 0; i < 6; i++ {
		candidate := filepath.Join(dir, "migrations", filename)
		if _, err := os.Stat(candidate); err == nil {
			return candidate
		}
		dir = filepath.Dir(dir)
	}
	return ""
}
@@ -0,0 +1,412 @@
// Copyright 2026 certctl LLC. All rights reserved.
// SPDX-License-Identifier: BUSL-1.1

package ratelimit_test

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"runtime"
	"strings"
	"sync"
	"testing"
	"time"

	_ "github.com/lib/pq"
	"github.com/testcontainers/testcontainers-go"
	"github.com/testcontainers/testcontainers-go/wait"

	"github.com/certctl-io/certctl/internal/ratelimit"
)

// Phase 13 Sprint 13.2 closure (2026-05-14, architecture diligence audit
// ARCH-M1): backend-equivalence test suite. Runs the same scenario
// surface against both backends (in-memory + postgres) via the shared
// Limiter interface. If the postgres backend's caller-visible
// semantics drift from the memory backend's, this file fails first.
//
// Mirrors the white-box test names in sliding_window_test.go: every
// public-surface behavior pinned there (cap, expiry, disabled bypass,
// empty-key short-circuit, concurrency) gets re-pinned here for the
// postgres backend.
//
// Postgres tests skip under -short (matches the pattern in
// internal/repository/postgres/testutil_test.go); CI's
// `go test -race -short -count=1 ./...` exercises only the memory
// half. The integration job runs the full suite.

// ----------------------------------------------------------------
// Backend-equivalence helpers
// ----------------------------------------------------------------

// limiterFactory builds a fresh Limiter for one test case.
// Memory backends discard `db`; postgres backends use it.
type limiterFactory func(t *testing.T, db *sql.DB, maxN int, window time.Duration) ratelimit.Limiter

func memoryFactory(t *testing.T, _ *sql.DB, maxN int, window time.Duration) ratelimit.Limiter {
	t.Helper()
	// Map cap of 10_000: large enough that none of the equivalence
	// scenarios trip the LRU-eviction branch (the eviction branch is
	// memory-specific; postgres has no equivalent so it's not part of
	// the cross-backend contract).
	return ratelimit.NewSlidingWindowLimiter(maxN, window, 10_000)
}

func postgresFactory(t *testing.T, db *sql.DB, maxN int, window time.Duration) ratelimit.Limiter {
	t.Helper()
	if db == nil {
		t.Fatal("postgresFactory requires a non-nil *sql.DB")
	}
	return ratelimit.NewPostgresSlidingWindowLimiter(db, maxN, window)
}

// ----------------------------------------------------------------
// Per-backend test entry points
// ----------------------------------------------------------------

func TestSlidingWindowLimiter_Equivalence_Memory(t *testing.T) {
	t.Run("AllowsUpToCap", func(t *testing.T) { caseAllowsUpToCap(t, memoryFactory, nil) })
	t.Run("DistinctKeysIndependent", func(t *testing.T) { caseDistinctKeysIndependent(t, memoryFactory, nil) })
	t.Run("WindowExpiry", func(t *testing.T) { caseWindowExpiry(t, memoryFactory, nil) })
	t.Run("DisabledBypass", func(t *testing.T) { caseDisabledBypass(t, memoryFactory, nil) })
	t.Run("NegativeCapDisabled", func(t *testing.T) { caseNegativeCapDisabled(t, memoryFactory, nil) })
	t.Run("EmptyKeyShortCircuits", func(t *testing.T) { caseEmptyKeyShortCircuits(t, memoryFactory, nil) })
	t.Run("ConcurrentRaceFree", func(t *testing.T) {
		if testing.Short() {
			t.Skip("race-style test under -short")
		}
		caseConcurrentRaceFree(t, memoryFactory, nil)
	})
}

func TestSlidingWindowLimiter_Equivalence_Postgres(t *testing.T) {
	if testing.Short() {
		t.Skip("postgres equivalence tests require testcontainers; skipped under -short")
	}
	tdb := setupTestDB(t)
	defer tdb.teardown(t)

	t.Run("AllowsUpToCap", func(t *testing.T) {
		db := tdb.freshSchema(t, "AllowsUpToCap")
		caseAllowsUpToCap(t, postgresFactory, db)
	})
	t.Run("DistinctKeysIndependent", func(t *testing.T) {
		db := tdb.freshSchema(t, "DistinctKeysIndependent")
		caseDistinctKeysIndependent(t, postgresFactory, db)
	})
	t.Run("WindowExpiry", func(t *testing.T) {
		db := tdb.freshSchema(t, "WindowExpiry")
		caseWindowExpiry(t, postgresFactory, db)
	})
	t.Run("DisabledBypass", func(t *testing.T) {
		db := tdb.freshSchema(t, "DisabledBypass")
		caseDisabledBypass(t, postgresFactory, db)
	})
	t.Run("NegativeCapDisabled", func(t *testing.T) {
		db := tdb.freshSchema(t, "NegativeCapDisabled")
		caseNegativeCapDisabled(t, postgresFactory, db)
	})
	t.Run("EmptyKeyShortCircuits", func(t *testing.T) {
		db := tdb.freshSchema(t, "EmptyKeyShortCircuits")
		caseEmptyKeyShortCircuits(t, postgresFactory, db)
	})
	t.Run("ConcurrentRaceFree", func(t *testing.T) {
		db := tdb.freshSchema(t, "ConcurrentRaceFree")
		caseConcurrentRaceFree(t, postgresFactory, db)
	})
}

// ----------------------------------------------------------------
// Backend-agnostic test cases (one per behavior pinned in
// sliding_window_test.go's public-surface tests)
// ----------------------------------------------------------------

func caseAllowsUpToCap(t *testing.T, mk limiterFactory, db *sql.DB) {
	l := mk(t, db, 3, 24*time.Hour)
	now := time.Now()
	for i := 0; i < 3; i++ {
		if err := l.Allow("k", now.Add(time.Duration(i)*time.Minute)); err != nil {
			t.Fatalf("call %d should be allowed: %v", i+1, err)
		}
	}
	if err := l.Allow("k", now.Add(4*time.Minute)); !errors.Is(err, ratelimit.ErrRateLimited) {
		t.Fatalf("4th call should be rate-limited; got %v", err)
	}
}

func caseDistinctKeysIndependent(t *testing.T, mk limiterFactory, db *sql.DB) {
	l := mk(t, db, 1, 24*time.Hour)
	now := time.Now()

	if err := l.Allow("k-1", now); err != nil {
		t.Fatalf("first allow: %v", err)
	}
	if err := l.Allow("k-2", now); err != nil {
		t.Fatalf("different key must have its own bucket: %v", err)
	}
	if err := l.Allow("k-1", now.Add(1*time.Second)); !errors.Is(err, ratelimit.ErrRateLimited) {
		t.Fatalf("repeat key should be limited; got %v", err)
	}
}

func caseWindowExpiry(t *testing.T, mk limiterFactory, db *sql.DB) {
	l := mk(t, db, 2, 1*time.Hour)
	now := time.Now()

	if err := l.Allow("k", now); err != nil {
		t.Fatal(err)
	}
	if err := l.Allow("k", now.Add(30*time.Minute)); err != nil {
		t.Fatal(err)
	}
	// Inside window: limited.
	if err := l.Allow("k", now.Add(45*time.Minute)); !errors.Is(err, ratelimit.ErrRateLimited) {
		t.Fatalf("inside-window 3rd call should be limited: %v", err)
	}
	// Past window: slots reopen.
	if err := l.Allow("k", now.Add(2*time.Hour)); err != nil {
		t.Fatalf("past-window call should be allowed (window reset): %v", err)
	}
}

func caseDisabledBypass(t *testing.T, mk limiterFactory, db *sql.DB) {
	l := mk(t, db, 0, 24*time.Hour) // maxN=0 → disabled
	type disablable interface {
		Disabled() bool
	}
	if d, ok := l.(disablable); ok && !d.Disabled() {
		t.Fatal("limiter with maxN=0 must report Disabled()=true")
	}
	now := time.Now()
	for i := 0; i < 100; i++ {
		if err := l.Allow("k", now); err != nil {
			t.Fatalf("disabled limiter must allow everything: %v", err)
		}
	}
}

func caseNegativeCapDisabled(t *testing.T, mk limiterFactory, db *sql.DB) {
	l := mk(t, db, -1, 24*time.Hour)
	type disablable interface {
		Disabled() bool
	}
	if d, ok := l.(disablable); ok && !d.Disabled() {
		t.Fatal("negative maxN must produce a disabled limiter")
	}
	now := time.Now()
	if err := l.Allow("k", now); err != nil {
		t.Fatalf("disabled limiter must allow: %v", err)
	}
}

func caseEmptyKeyShortCircuits(t *testing.T, mk limiterFactory, db *sql.DB) {
	// Empty key is the caller's defense-in-depth case: the caller's
	// upstream validation should reject empty-key events first. The
	// limiter must not build a single shared bucket keyed by the empty
	// key, because that would be a chokepoint for every empty-key event.
	l := mk(t, db, 1, 24*time.Hour)
	now := time.Now()
	for i := 0; i < 50; i++ {
		if err := l.Allow("", now); err != nil {
			t.Fatalf("empty key must short-circuit (call %d): %v", i, err)
		}
	}
}

func caseConcurrentRaceFree(t *testing.T, mk limiterFactory, db *sql.DB) {
	l := mk(t, db, 50, 24*time.Hour)
	var wg sync.WaitGroup
	for g := 0; g < 20; g++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			now := time.Now()
			key := fmt.Sprintf("k-%d", id)
			for i := 0; i < 30; i++ {
				_ = l.Allow(key, now)
			}
		}(g)
	}
	wg.Wait()
}

// ----------------------------------------------------------------
// Postgres-only testcontainers harness; mirrors
// internal/repository/postgres/testutil_test.go's setupTestDB +
// freshSchema pattern.
// ----------------------------------------------------------------

type testDB struct {
	db        *sql.DB
	container testcontainers.Container
}

func setupTestDB(t *testing.T) *testDB {
	t.Helper()
	ctx := context.Background()

	req := testcontainers.ContainerRequest{
		Image:        "postgres:16-alpine",
		ExposedPorts: []string{"5432/tcp"},
		Env: map[string]string{
			"POSTGRES_DB":       "certctl_test",
			"POSTGRES_USER":     "certctl",
			"POSTGRES_PASSWORD": "certctl",
		},
		WaitingFor: wait.ForLog("database system is ready to accept connections").WithOccurrence(2),
	}
	container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
		ContainerRequest: req,
		Started:          true,
	})
	if err != nil {
		t.Fatalf("start postgres container: %v", err)
	}

	host, err := container.Host(ctx)
	if err != nil {
		t.Fatalf("container host: %v", err)
	}
	port, err := container.MappedPort(ctx, "5432")
	if err != nil {
		t.Fatalf("container port: %v", err)
	}

	connStr := fmt.Sprintf("postgres://certctl:certctl@%s:%s/certctl_test?sslmode=disable", host, port.Port())
	db, err := sql.Open("postgres", connStr)
	if err != nil {
		t.Fatalf("open db: %v", err)
	}
	// Pool size > 1 so the multi-goroutine concurrency case can hold
	// multiple connections simultaneously; the row-lock arbitrates.
	db.SetMaxOpenConns(8)

	if err := db.Ping(); err != nil {
		t.Fatalf("ping: %v", err)
	}

	return &testDB{db: db, container: container}
}

func (tdb *testDB) teardown(t *testing.T) {
	t.Helper()
	if tdb.db != nil {
		tdb.db.Close()
	}
	if tdb.container != nil {
		_ = tdb.container.Terminate(context.Background())
	}
}

|
||||||
|
// freshSchema creates an isolated schema per test case + runs the
|
||||||
|
// rate_limit_buckets migration inside it. Returns a *sql.DB whose
|
||||||
|
// search_path is scoped to the new schema.
|
||||||
|
//
|
||||||
|
// Note: this helper takes a sub-test label (caller-supplied) so the
|
||||||
|
// schema name is deterministic-per-case + stable across runs. The
|
||||||
|
// canonical postgres testutil uses t.Name() but we're inside Run-
|
||||||
|
// nested subtests where t.Name() includes "/" — flatten it.
|
||||||
|
func (tdb *testDB) freshSchema(t *testing.T, label string) *sql.DB {
|
||||||
|
t.Helper()
|
||||||
|
schema := sanitizeSchemaName(label + "_" + t.Name())
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// One connection-scoped session so SET search_path persists.
|
||||||
|
conn, err := tdb.db.Conn(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("acquire conn: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := conn.ExecContext(ctx, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", schema)); err != nil {
|
||||||
|
t.Fatalf("create schema: %v", err)
|
||||||
|
}
|
||||||
|
if _, err := conn.ExecContext(ctx, fmt.Sprintf("SET search_path TO %s, public", schema)); err != nil {
|
||||||
|
t.Fatalf("set search_path: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run the rate_limit_buckets migration in this schema. The migration
|
||||||
|
// is the only one that introduces our table; other migrations don't
|
||||||
|
// matter for limiter behavior.
|
||||||
|
migPath := findMigration("000046_rate_limit_buckets.up.sql")
|
||||||
|
body, err := os.ReadFile(migPath)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("read migration: %v", err)
|
||||||
|
}
|
||||||
|
if _, err := conn.ExecContext(ctx, string(body)); err != nil {
|
||||||
|
t.Fatalf("apply migration: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Cleanup(func() {
|
||||||
|
conn.ExecContext(context.Background(), fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", schema))
|
||||||
|
conn.Close()
|
||||||
|
})
|
||||||
|
|
||||||
|
// Wrap the single connection in a *sql.DB-like by returning a fresh
|
||||||
|
// pool that goes through the same search_path. Simpler: just return
|
||||||
|
// the underlying *sql.DB and SET search_path session-wide by re-
|
||||||
|
// running the SET on every checkout. The cleanest move is to use
|
||||||
|
// the per-connection helper: return a *sql.DB that's actually a
|
||||||
|
// "limited to N=1 connection with search_path pinned" handle.
|
||||||
|
//
|
||||||
|
// Workaround the easy way: build a fresh *sql.DB whose dsn embeds
|
||||||
|
// search_path as a connection-time setting, so every connection
|
||||||
|
// auto-applies it.
|
||||||
|
	dsn := connDSNWithSearchPath(tdb, schema)
	scoped, err := sql.Open("postgres", dsn)
	if err != nil {
		t.Fatalf("open scoped db: %v", err)
	}
	scoped.SetMaxOpenConns(8)
	t.Cleanup(func() { scoped.Close() })

	// Sanity check: the table must be reachable through the scoped pool.
	// An empty table is fine; Exec never returns sql.ErrNoRows, so any
	// error here means the table is missing or the connection failed.
	if _, err := scoped.ExecContext(ctx, "SELECT 1 FROM rate_limit_buckets LIMIT 1"); err != nil {
		t.Fatalf("smoke select: %v", err)
	}

	return scoped
}

func connDSNWithSearchPath(tdb *testDB, schema string) string {
	// Derive the DSN by introspection of the container's host/port.
	// Couldn't pre-store because freshSchema can be called many times.
	ctx := context.Background()
	host, _ := tdb.container.Host(ctx)
	port, _ := tdb.container.MappedPort(ctx, "5432")
	return fmt.Sprintf(
		"postgres://certctl:certctl@%s:%s/certctl_test?sslmode=disable&search_path=%s,public",
		host, port.Port(), schema,
	)
}

func sanitizeSchemaName(name string) string {
	name = strings.ToLower(name)
	for _, ch := range []string{"/", " ", "-", "."} {
		name = strings.ReplaceAll(name, ch, "_")
	}
	if len(name) > 50 {
		name = name[:50]
	}
	return "test_rl_" + name
}

func findMigration(filename string) string {
	_, here, _, _ := runtime.Caller(0)
	// here = .../internal/ratelimit/equivalence_test.go
	// migrations = .../migrations
	dir := filepath.Dir(here)
	for i := 0; i < 6; i++ {
		candidate := filepath.Join(dir, "migrations", filename)
		if _, err := os.Stat(candidate); err == nil {
			return candidate
		}
		dir = filepath.Dir(dir)
	}
	return ""
}
@@ -0,0 +1,54 @@
// Copyright 2026 certctl LLC. All rights reserved.
// SPDX-License-Identifier: BUSL-1.1

package ratelimit

import "time"

// Limiter is the rate-limit primitive every caller in cmd/server +
// internal/api/handler + internal/service consumes. Two backends
// satisfy this interface:
//
//   - SlidingWindowLimiter (in-memory; the historical default;
//     declared in sliding_window.go).
//   - PostgresSlidingWindowLimiter (cross-replica-consistent;
//     declared in postgres_sliding_window.go; introduced in Phase 13
//     Sprint 13.2 for the ARCH-M1 substantive close).
//
// Sprint 13.3 (next) wires every call site through the operator-
// chosen backend via the CERTCTL_RATELIMIT_BACKEND={memory,postgres}
// env var. Until then, both backends compile + tests for both pass,
// but the production call sites still construct SlidingWindowLimiter
// directly.
//
// Sprint 13.2 signature note: the prompt template specified
// `Allow(key string) error`, but the actual repo signature has been
// `Allow(key string, now time.Time) error` since the EST RFC 7030
// hardening master bundle Phase 4.1 — the `now` parameter is what
// makes the memory limiter testable against synthetic time. The
// interface matches the actual signature so the existing
// SlidingWindowLimiter satisfies Limiter without a method-set change.
//
// Per CLAUDE.md "the repo is truth" principle, code grounded against
// the live signature (not the prompt's draft).
type Limiter interface {
	// Allow records a request at the given key/time and returns
	// ErrRateLimited if the configured cap is exceeded inside the
	// configured window. nil otherwise.
	//
	// Empty `key` short-circuits to nil (caller's defense-in-depth;
	// caller upstream validation should reject empty-key events
	// first — building a single shared bucket keyed by empty-key
	// would be a chokepoint for every empty-key event).
	//
	// Disabled limiters (maxN <= 0) return nil for every call.
	Allow(key string, now time.Time) error
}

// Compile-time interface satisfaction checks. Drift in either
// backend's Allow signature fails the build at this file before any
// caller breaks.
var (
	_ Limiter = (*SlidingWindowLimiter)(nil)
	_ Limiter = (*PostgresSlidingWindowLimiter)(nil)
)
@@ -0,0 +1,204 @@
// Copyright 2026 certctl LLC. All rights reserved.
// SPDX-License-Identifier: BUSL-1.1

package ratelimit

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"time"

	"github.com/lib/pq"
)

// Phase 13 Sprint 13.2 closure (2026-05-14, architecture diligence audit
// ARCH-M1): the cross-replica-consistent rate-limit backend. Same
// algorithm as SlidingWindowLimiter (prune-on-Allow sliding-window log)
// but the state lives in postgres so N replicas see the same per-key
// bucket. Replaces the per-process in-memory limit when the operator
// sets CERTCTL_RATELIMIT_BACKEND=postgres (wired in Sprint 13.3).
//
// Algorithm
// =========
// Each Allow call runs a single BEGIN/COMMIT transaction:
//
//  1. INSERT ... ON CONFLICT (bucket_key) DO NOTHING — ensure the
//     row exists so the SELECT FOR UPDATE below has something to lock.
//  2. SELECT timestamps FROM rate_limit_buckets WHERE bucket_key=$1
//     FOR UPDATE — acquire the per-key row lock for the rest of the
//     transaction.
//  3. Prune timestamps older than (now - window) in Go (reusing the
//     unexported pruneOlderThan helper shared with SlidingWindowLimiter
//     — single source of truth for the prune semantics).
//  4. If cardinality(pruned) >= maxN: persist the pruned state without
//     appending, COMMIT, return ErrRateLimited.
//  5. Else: append `now`, persist, COMMIT, return nil.
//
// SELECT FOR UPDATE serializes Allow calls for the same key across
// replicas: replicas A and B firing simultaneous Allow("k") never
// race because Postgres' row-lock arbitrates. This is the entire
// reason for the close — the memory backend's sync.Mutex only
// arbitrates within a process; pg's row lock arbitrates the cluster.
//
// Why a transaction (not a single CTE)
// ====================================
// A "compute everything in one SQL statement" approach using
// INSERT ... ON CONFLICT DO UPDATE SET timestamps = CASE WHEN ... is
// possible but the conditional logic to gate the append on the
// pruned-cardinality requires nested CTEs whose check-then-act
// semantics are hard to read + harder to convince yourself are
// race-free across all isolation levels. The explicit transaction
// version above is correct under READ COMMITTED (Postgres' default),
// matches the memory backend's read-decide-write shape line-for-line,
// and shares the same prune helper. The transaction issues three
// statements plus BEGIN and COMMIT per Allow, versus one statement for
// the CTE form; the extra round-trips are acceptable on the rate-limit
// hot path because the operation is gated anyway.
//
// Sprint 13.3 will wire the scheduler janitor loop that GCs rows
// whose updated_at is older than the longest configured window; the
// migration ships the supporting btree index on updated_at.

// PostgresSlidingWindowLimiter implements Limiter against the
// rate_limit_buckets table introduced in migration 000046.
//
// Constructed via NewPostgresSlidingWindowLimiter. The zero value is
// NOT usable — the db handle is required.
//
// Concurrency: safe for concurrent Allow calls across goroutines AND
// across N replicas (the underlying SELECT FOR UPDATE serializes
// per-key access across the cluster).
type PostgresSlidingWindowLimiter struct {
	db       *sql.DB
	maxN     int
	window   time.Duration
	disabled bool // maxN <= 0 → all Allow calls return nil
}

// NewPostgresSlidingWindowLimiter returns a limiter with the given
// per-key cap + window. maxN <= 0 disables the limiter (all Allow
// calls return nil); matches the memory backend's opt-out semantics
// for test harnesses + sketchpad deploys.
//
// Window defaults to 24h when zero, mirroring SlidingWindowLimiter.
//
// The db argument is required + must outlive the limiter. Construction
// itself does NOT touch the database — DDL is owned by migration
// 000046_rate_limit_buckets.up.sql which runs at boot via
// cmd/server's RunMigrations path.
func NewPostgresSlidingWindowLimiter(db *sql.DB, maxN int, window time.Duration) *PostgresSlidingWindowLimiter {
	if window <= 0 {
		window = 24 * time.Hour
	}
	disabled := maxN <= 0
	return &PostgresSlidingWindowLimiter{
		db:       db,
		maxN:     maxN,
		window:   window,
		disabled: disabled,
	}
}

// Allow records a request at the given (key, now) and returns
// ErrRateLimited if the configured cap is exceeded inside the
// configured window. Matches SlidingWindowLimiter.Allow byte-for-byte
// in caller-visible semantics so Sprint 13.3's backend-selector swap
// is signature-clean.
//
// The `now` argument is the timestamp the call is "happening at".
// Used as the prune cutoff (entries older than now-window are dropped)
// and as the new appended entry. Tests pass synthetic `now` values
// to exercise window-expiry deterministically; production call sites
// pass time.Now() (matching how SlidingWindowLimiter is invoked
// today — see internal/api/handler/{est,export,certificates,
// auth_breakglass}.go).
//
// Empty `key` short-circuits to nil (matches the memory backend's
// chokepoint-avoidance contract).
func (l *PostgresSlidingWindowLimiter) Allow(key string, now time.Time) error {
	if l.disabled {
		return nil
	}
	if key == "" {
		return nil
	}

	ctx := context.Background()
	tx, err := l.db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
	if err != nil {
		return fmt.Errorf("ratelimit: begin tx: %w", err)
	}
	defer func() {
		// Rollback is a no-op once the tx is committed; safe to defer
		// unconditionally for the error paths.
		_ = tx.Rollback()
	}()

	// Step 1: ensure the row exists so SELECT FOR UPDATE has something
	// to lock. ON CONFLICT DO NOTHING is a no-op when the row already
	// exists.
	if _, err := tx.ExecContext(ctx, `
		INSERT INTO rate_limit_buckets (bucket_key, timestamps, updated_at)
		VALUES ($1, '{}', $2)
		ON CONFLICT (bucket_key) DO NOTHING
	`, key, now); err != nil {
		return fmt.Errorf("ratelimit: ensure row: %w", err)
	}

	// Step 2: lock the row + read current state.
	var existing pq.GenericArray
	var ts []time.Time
	existing.A = &ts
	if err := tx.QueryRowContext(ctx, `
		SELECT COALESCE(timestamps, '{}'::timestamptz[])
		FROM rate_limit_buckets
		WHERE bucket_key = $1
		FOR UPDATE
	`, key).Scan(&existing); err != nil {
		// Shouldn't happen — step 1 ensured the row exists. Treat
		// the sql.ErrNoRows path as a no-op (be conservative; never
		// over-limit on transient DB weirdness).
		if errors.Is(err, sql.ErrNoRows) {
			return nil
		}
		return fmt.Errorf("ratelimit: select-for-update: %w", err)
	}

	// Step 3: prune in Go via the shared helper. Same prune semantics
	// as SlidingWindowLimiter — single source of truth.
	cutoff := now.Add(-l.window)
	pruned := pruneOlderThan(ts, cutoff)

	// Step 4: decide.
	rateLimited := len(pruned) >= l.maxN
	if !rateLimited {
		pruned = append(pruned, now)
	}

	// Step 5: persist.
	if _, err := tx.ExecContext(ctx, `
		UPDATE rate_limit_buckets
		SET timestamps = $2, updated_at = $3
		WHERE bucket_key = $1
	`, key, pq.Array(pruned), now); err != nil {
		return fmt.Errorf("ratelimit: update: %w", err)
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("ratelimit: commit: %w", err)
	}

	if rateLimited {
		return ErrRateLimited
	}
	return nil
}

// Disabled reports whether the limiter is in opt-out mode (maxN <= 0).
// Mirrors SlidingWindowLimiter.Disabled() so handler-side gating +
// admin-endpoint observability can ask the same question of either
// backend.
func (l *PostgresSlidingWindowLimiter) Disabled() bool {
	return l.disabled
}
@@ -0,0 +1,6 @@
-- Phase 13 Sprint 13.2 reversal — drop the rate-limit bucket table.
-- Down migrations are not run in production; this file exists for
-- developer-side rollback during integration testing.

DROP INDEX IF EXISTS rate_limit_buckets_updated_at_idx;
DROP TABLE IF EXISTS rate_limit_buckets;
@@ -0,0 +1,28 @@
-- Phase 13 Sprint 13.2 closure (2026-05-14, architecture diligence audit
-- ARCH-M1): introduce a postgres-backed sliding-window rate limiter so
-- per-process / in-memory limits become cross-replica-consistent when
-- the operator sets CERTCTL_RATELIMIT_BACKEND=postgres (wired in
-- Sprint 13.3).
--
-- One row per (bucket_key) — caller composes the key the same way the
-- memory backend already does (e.g. "subject|issuer" for SCEP/Intune,
-- "srcIP|peek" for EST failed-basic, raw "actor" for export, etc.).
-- The `timestamps` array stores the in-window log; prune-on-Allow
-- keeps it bounded by the limiter's maxN cap.
--
-- updated_at + the index on it support the Sprint 13.3 scheduler
-- janitor loop: any row whose updated_at is older than the longest
-- configured window is safely deletable.
--
-- Per CLAUDE.md "Idempotent migrations" architecture decision:
-- IF NOT EXISTS on every statement. Re-running this migration is
-- a no-op on a database that already has the table.

CREATE TABLE IF NOT EXISTS rate_limit_buckets (
    bucket_key TEXT PRIMARY KEY,
    timestamps TIMESTAMPTZ[] NOT NULL DEFAULT '{}',
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS rate_limit_buckets_updated_at_idx
    ON rate_limit_buckets (updated_at);