Files
certctl/internal/ratelimit/postgres_sliding_window.go
shankar0123 c8985cf868 fix(ratelimit): Hotfix #5 — Postgres timestamptz[] scan + skip-inventory drift
Two CI hotfixes surfaced by master CI on 29cb13e7 (Sprint 13.6 tip
before the Sprint 13.7 closure landed):

1. TestRateLimit_PostgresBackend_CapEnforcedAcrossReplicas failed with
   "pq: scanning to time.Time is not implemented; only sql.Scanner".
   Root cause: time.Time does not implement sql.Scanner, and lib/pq's
   pq.GenericArray scan path calls element-Scan() directly rather than
   database/sql's convertAssign (which DOES support time conversions).
   So `pq.Array(&[]time.Time{})` reliably fails on read even though
   the symmetric write `pq.Array([]time.Time{...})` works (the write
   path uses driver.Value() which time.Time implements).

   Fix: cast the timestamptz[] to a text[] of canonical ISO 8601 UTC
   strings at the SQL boundary via to_char(t AT TIME ZONE 'UTC',
   'YYYY-MM-DD"T"HH24:MI:SS.US"Z"'), read via pq.StringArray (well-
   supported), and parse Go-side with layout "2006-01-02T15:04:05.000000Z".
   The format is fully deterministic regardless of the session's
   DateStyle or TimeZone settings.

   Touched: internal/ratelimit/postgres_sliding_window.go (Step 2 of
   the Allow() transaction — locking + read).

   Falsifiable proof on CI: the failing test
   TestRateLimit_PostgresBackend_CapEnforcedAcrossReplicas
   (100 concurrent Allow calls / 3 replicas / cap=10) must now produce
   exactly 10 succeed / 90 ErrRateLimited. Pre-fix it produced 1 / 0
   because every Allow after the first crashed on Scan.

2. skip-inventory-drift.sh CI guard turned red because Sprint 13.2
   added two new t.Skip sites:

     internal/ratelimit/equivalence_test.go:80
       t.Skip("race-style test under -short")
     internal/ratelimit/equivalence_test.go:88
       t.Skip("postgres equivalence tests require testcontainers;
              skipped under -short")

   The inventory at docs/testing/skip-inventory.md is auto-generated
   by scripts/skip-inventory.sh and must be re-generated alongside
   any t.Skip churn. Sprint 13.2 missed the regeneration.

   Fix: re-ran scripts/skip-inventory.sh. Totals walked
   142 → 144 sites; testing.Short() guards 76 → 78. The two new
   entries land in the internal/ratelimit section.

Verification (local sandbox, all clean):
  $ bash scripts/ci-guards/skip-inventory-drift.sh
    skip-inventory-drift guard OK: docs/testing/skip-inventory.md
    matches the live tree
  $ bash scripts/ci-guards/openapi-handler-parity.sh
    openapi-handler-parity: clean.
  $ bash scripts/ci-guards/openapi-rest-deferred-monotonic.sh
    openapi-rest-deferred-monotonic: clean — rest-deferred = 0,
    baseline = 0.
  $ gofmt -l internal/ratelimit/postgres_sliding_window.go
    (no output)
  $ go vet ./internal/ratelimit/
    (no output)

The Postgres rate-limit fix's full falsifiable proof
(TestRateLimit_PostgresBackend_CapEnforcedAcrossReplicas) cannot be
exercised in the sandbox (no docker for testcontainers); CI on the
amd64 runner will re-run it on this push. The diagnosis is verified
against lib/pq source semantics and the fix uses only well-supported
primitives (pq.StringArray + canonical to_char output + time.Parse).
2026-05-14 13:26:47 +00:00

229 lines
8.3 KiB
Go

// Copyright 2026 certctl LLC. All rights reserved.
// SPDX-License-Identifier: BUSL-1.1
package ratelimit
import (
"context"
"database/sql"
"errors"
"fmt"
"time"
"github.com/lib/pq"
)
// Phase 13 Sprint 13.2 closure (2026-05-14, architecture diligence audit
// ARCH-M1): the cross-replica-consistent rate-limit backend. Same
// algorithm as SlidingWindowLimiter (prune-on-Allow sliding-window log)
// but the state lives in postgres so N replicas see the same per-key
// bucket. Replaces the per-process in-memory limit when the operator
// sets CERTCTL_RATELIMIT_BACKEND=postgres (wired in Sprint 13.3).
//
// Algorithm
// =========
// Each Allow call runs a single BEGIN/COMMIT transaction:
//
// 1. INSERT ... ON CONFLICT (bucket_key) DO NOTHING — ensure the
// row exists so the SELECT FOR UPDATE below has something to lock.
// 2. SELECT timestamps FROM rate_limit_buckets WHERE bucket_key=$1
// FOR UPDATE — acquire the per-key row lock for the rest of the
// transaction.
// 3. Prune timestamps older than (now - window) in Go (reusing the
// unexported pruneOlderThan helper shared with SlidingWindowLimiter
// — single source of truth for the prune semantics).
// 4. If cardinality(pruned) >= maxN: persist the pruned state without
// appending, COMMIT, return ErrRateLimited.
// 5. Else: append `now`, persist, COMMIT, return nil.
//
// SELECT FOR UPDATE serializes Allow calls for the same key across
// replicas: replicas A and B firing simultaneous Allow("k") never
// race because Postgres' row-lock arbitrates. This is the entire
// reason for the close — the memory backend's sync.Mutex only
// arbitrates within a process; pg's row lock arbitrates the cluster.
//
// Why a transaction (not a single CTE)
// ====================================
// A "compute everything in one SQL statement" approach using
// INSERT ... ON CONFLICT DO UPDATE SET timestamps = CASE WHEN ... is
// possible but the conditional logic to gate the append on the
// pruned-cardinality requires nested CTEs whose check-then-act
// semantics are hard to read + harder to convince yourself are
// race-free across all isolation levels. The explicit transaction
// version above is correct under READ COMMITTED (Postgres' default),
// matches the memory backend's read-decide-write shape line-for-line,
// and shares the same prune helper. Two extra round-trips per Allow
// vs one is acceptable for the rate-limit hot path — the operation
// is gated anyway.
//
// Sprint 13.3 will wire the scheduler janitor loop that GCs rows
// whose updated_at is older than the longest configured window; the
// migration ships the supporting btree index on updated_at.
// PostgresSlidingWindowLimiter implements Limiter against the
// rate_limit_buckets table introduced in migration 000046.
//
// Constructed via NewPostgresSlidingWindowLimiter. The zero value is
// NOT usable — the db handle is required.
//
// Concurrency: safe for concurrent Allow calls across goroutines AND
// across N replicas (the underlying SELECT FOR UPDATE serializes
// per-key access across the cluster).
type PostgresSlidingWindowLimiter struct {
db *sql.DB
maxN int
window time.Duration
disabled bool // maxN <= 0 → all Allow calls return nil
}
// NewPostgresSlidingWindowLimiter returns a limiter with the given
// per-key cap + window. maxN <= 0 disables the limiter (all Allow
// calls return nil); matches the memory backend's opt-out semantics
// for test harnesses + sketchpad deploys.
//
// Window defaults to 24h when zero, mirroring SlidingWindowLimiter.
//
// The db argument is required + must outlive the limiter. Construction
// itself does NOT touch the database — DDL is owned by migration
// 000046_rate_limit_buckets.up.sql which runs at boot via
// cmd/server's RunMigrations path.
func NewPostgresSlidingWindowLimiter(db *sql.DB, maxN int, window time.Duration) *PostgresSlidingWindowLimiter {
if window <= 0 {
window = 24 * time.Hour
}
disabled := maxN <= 0
return &PostgresSlidingWindowLimiter{
db: db,
maxN: maxN,
window: window,
disabled: disabled,
}
}
// Allow records a request at the given (key, now) and returns
// ErrRateLimited if the configured cap is exceeded inside the
// configured window. Matches SlidingWindowLimiter.Allow byte-for-byte
// in caller-visible semantics so Sprint 13.3's backend-selector swap
// is signature-clean.
//
// The `now` argument is the timestamp the call is "happening at".
// Used as the prune cutoff (entries older than now-window are dropped)
// and as the new appended entry. Tests pass synthetic `now` values
// to exercise window-expiry deterministically; production call sites
// pass time.Now() (matching how SlidingWindowLimiter is invoked
// today — see internal/api/handler/{est,export,certificates,
// auth_breakglass}.go).
//
// Empty `key` short-circuits to nil (matches the memory backend's
// chokepoint-avoidance contract).
func (l *PostgresSlidingWindowLimiter) Allow(key string, now time.Time) error {
if l.disabled {
return nil
}
if key == "" {
return nil
}
ctx := context.Background()
tx, err := l.db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
if err != nil {
return fmt.Errorf("ratelimit: begin tx: %w", err)
}
defer func() {
// Rollback is a no-op once the tx is committed; safe to defer
// unconditionally for the error paths.
_ = tx.Rollback()
}()
// Step 1: ensure the row exists so SELECT FOR UPDATE has something
// to lock. ON CONFLICT DO NOTHING is a no-op when the row already
// exists.
if _, err := tx.ExecContext(ctx, `
INSERT INTO rate_limit_buckets (bucket_key, timestamps, updated_at)
VALUES ($1, '{}', $2)
ON CONFLICT (bucket_key) DO NOTHING
`, key, now); err != nil {
return fmt.Errorf("ratelimit: ensure row: %w", err)
}
// Step 2: lock the row + read current state. lib/pq cannot scan a
// TIMESTAMPTZ[] column back into []time.Time directly: time.Time
// does not implement sql.Scanner, and pq.GenericArray's per-element
// scan path calls Scan() (not database/sql's convertAssign), so the
// inner Scan fails with
// "pq: scanning to time.Time is not implemented; only sql.Scanner".
// Workaround: ask Postgres to format each timestamp as a canonical
// ISO 8601 UTC string via to_char(... AT TIME ZONE 'UTC', ...), read
// the column as text[] via pq.StringArray (well-supported), and
// parse Go-side. The to_char format is fully deterministic (6-digit
// microseconds, "T" separator, "Z" suffix) regardless of the
// session's DateStyle / TimeZone settings.
const pgTimestampLayout = "2006-01-02T15:04:05.000000Z"
var tsStrings pq.StringArray
if err := tx.QueryRowContext(ctx, `
SELECT COALESCE(
ARRAY(
SELECT to_char(t AT TIME ZONE 'UTC', 'YYYY-MM-DD"T"HH24:MI:SS.US"Z"')
FROM unnest(timestamps) AS t
),
ARRAY[]::text[]
)
FROM rate_limit_buckets
WHERE bucket_key = $1
FOR UPDATE
`, key).Scan(&tsStrings); err != nil {
// Shouldn't happen — step 1 ensured the row exists. Treat
// the sql.ErrNoRows path as a no-op (be conservative; never
// over-limit on transient DB weirdness).
if errors.Is(err, sql.ErrNoRows) {
return nil
}
return fmt.Errorf("ratelimit: select-for-update: %w", err)
}
ts := make([]time.Time, 0, len(tsStrings))
for _, s := range tsStrings {
parsed, err := time.Parse(pgTimestampLayout, s)
if err != nil {
return fmt.Errorf("ratelimit: parse stored timestamp %q: %w", s, err)
}
ts = append(ts, parsed.UTC())
}
// Step 3: prune in Go via the shared helper. Same prune semantics
// as SlidingWindowLimiter — single source of truth.
cutoff := now.Add(-l.window)
pruned := pruneOlderThan(ts, cutoff)
// Step 4: decide.
rateLimited := len(pruned) >= l.maxN
if !rateLimited {
pruned = append(pruned, now)
}
// Step 5: persist.
if _, err := tx.ExecContext(ctx, `
UPDATE rate_limit_buckets
SET timestamps = $2, updated_at = $3
WHERE bucket_key = $1
`, key, pq.Array(pruned), now); err != nil {
return fmt.Errorf("ratelimit: update: %w", err)
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("ratelimit: commit: %w", err)
}
if rateLimited {
return ErrRateLimited
}
return nil
}
// Disabled reports whether the limiter is in opt-out mode (maxN <= 0).
// Mirrors SlidingWindowLimiter.Disabled() so handler-side gating +
// admin-endpoint observability can ask the same question of either
// backend.
func (l *PostgresSlidingWindowLimiter) Disabled() bool {
return l.disabled
}