Files
certctl/internal/repository/postgres/session.go
T
shankar0123 b37cd6991b auth-bundle-2 Phase 2b: repository interfaces + Postgres impls + integration tests
Closes Phase 2 end-to-end. Builds on Phase 2a's three migrations
(000034 oidc_providers + group_role_mappings, 000035 sessions +
session_signing_keys, 000036 users) by shipping the repository surface
Phase 3+ services consume.

Interfaces:
* internal/repository/oidc.go - OIDCProviderRepository (List, Get,
  GetByName, Create, Update, Delete) + GroupRoleMappingRepository
  (ListByProvider, Get, Add, Remove, Map). Sentinels:
  ErrOIDCProviderNotFound, ErrOIDCProviderDuplicateName,
  ErrOIDCProviderInUse (FK ON DELETE RESTRICT translation),
  ErrGroupRoleMappingNotFound, ErrGroupRoleMappingDuplicate.
* internal/repository/session.go - SessionRepository (Create, Get,
  ListByActor, UpdateLastSeen, Revoke, RevokeAllForActor,
  GarbageCollectExpired, Delete) + SessionSigningKeyRepository (List,
  GetActive, Get, Add, Retire, Delete). Sentinels: ErrSessionNotFound,
  ErrSessionRevoked, ErrSessionExpired, ErrSessionSigningKeyNotFound,
  ErrSessionSigningKeyInUse.
* internal/repository/user.go - UserRepository (Get, GetByOIDCSubject,
  Create, Update, ListAll). Sentinels: ErrUserNotFound,
  ErrUserDuplicateOIDCSubject.

Postgres implementations:
* internal/repository/postgres/oidc.go - 309 lines. Translates
  SQLSTATE 23505 (unique_violation) to ErrOIDCProviderDuplicateName /
  ErrGroupRoleMappingDuplicate; SQLSTATE 23503 (foreign_key_violation)
  to ErrOIDCProviderInUse so the Phase 5 handler maps to HTTP 409
  when an operator tries to delete a provider with authenticated
  users. pq.StringArray bridges Go []string to Postgres TEXT[] for
  scopes + allowed_email_domains. Map() uses
  `WHERE group_name = ANY($2)` so a single SELECT resolves N IdP
  group claims at once.
* internal/repository/postgres/session.go - 350 lines. Both Session +
  SessionSigningKey repos. Revoke + Retire are idempotent (re-revoking
  an already-revoked session returns nil; same for retire). The
  GarbageCollectExpired sweep deletes both
  absolute-expiry-passed sessions AND pre-login rows older than the
  10-minute TTL in one DELETE so the scheduler tick is cheap.
  ErrSessionSigningKeyInUse pinned via SQLSTATE 23503 from the
  sessions.signing_key_id FK ON DELETE RESTRICT.
* internal/repository/postgres/user.go - 137 lines. GetByOIDCSubject
  is the Phase 3 hot-path lookup; the (oidc_provider_id,
  oidc_subject) UNIQUE constraint trip translates to
  ErrUserDuplicateOIDCSubject. Update only writes the mutable field
  set (email, display_name, last_login_at, webauthn_credentials);
  oidc_subject + oidc_provider_id are immutable per the
  per-(provider, subject) identity model.

Integration tests (testing.Short()-gated, testcontainers + Postgres
16 Alpine, schema-per-test isolation via getTestDB().freshSchema):

* oidc_test.go: 11 tests covering happy-path + GetNotFound +
  DuplicateName + List + Update + DeleteNotFound + DeleteSucceeds +
  DeleteRefusedWhenUsersReference (the FK ON DELETE RESTRICT pin);
  GroupRoleMapping coverage includes Add/List/Map (3 cases:
  marketing-not-mapped, multi-group hits, empty groups returns
  empty), Duplicate rejection, and the ON DELETE CASCADE on
  provider deletion.
* session_test.go: 12 tests covering SessionSigningKey + Session.
  Key tests: GetActiveSkipsRetired (mints older, retires it, mints
  newer, asserts GetActive returns newer), DeleteRefusedWhenSessions-
  Reference (FK pin), RetireIsIdempotent. Session tests:
  CreateAndGet roundtrip, GetNotFound, Revoke + idempotent re-Revoke,
  ListByActor (3 active + 1 revoked + 1 pre-login -> returns 3,
  pinning the WHERE filter), RevokeAllForActor, GarbageCollectExpired
  (seeds an absolute-expired row + pre-login >10min row + active
  session via raw SQL to bypass CHECK constraints, asserts GC kills
  exactly 2 + active survives), UpdateLastSeen.
* user_test.go: 7 tests covering CreateAndGet, GetNotFound,
  GetByOIDCSubject (hit + miss), DuplicateOIDCSubjectRejected,
  UpdateMutableFields (asserts oidc_subject NOT mutated by Update),
  ListAll, FKRestrictsProviderDelete (mirror of the OIDC test from
  the user side - both ends of the FK contract pinned).

Verifications:
* gofmt -l clean across all 9 new files.
* go vet ./internal/repository/postgres/ rc=0.
* go test -short -count=1 green on internal/repository/postgres/ +
  internal/auth/... + Bundle 1 packages (testing.Short() skips the
  testcontainers integration tests, but the test files compile + the
  short-mode skip path is exercised so the suite is wired correctly).
* Full integration tests run in CI's non-short job against Postgres
  16 Alpine via testcontainers-go.
* govulncheck ./... clean.
* All 24 ci-guards pass.

Phase 2 exit criteria from cowork/auth-bundle-2-prompt.md (all met):
* All three Phase-2 migrations apply cleanly, idempotently: yes
  (Phase 2a). Break-glass migration ships separately in Phase 7.5.
* Repository tests pass against Postgres 16 Alpine: integration
  tests written, gated by testing.Short(), structured to run cleanly
  in CI's non-short job.
* make verify equivalent green: gofmt + vet + go test pass;
  golangci-lint deferred to CI per Phase 0/1's same pattern.
2026-05-10 04:18:27 +00:00

351 lines
12 KiB
Go

package postgres
import (
"context"
"database/sql"
"errors"
"fmt"
"github.com/lib/pq"
sessiondomain "github.com/certctl-io/certctl/internal/auth/session/domain"
"github.com/certctl-io/certctl/internal/repository"
)
// =============================================================================
// SessionRepository (Auth Bundle 2 Phase 2)
// =============================================================================
// SessionRepository is the postgres implementation of
// repository.SessionRepository.
type SessionRepository struct {
db *sql.DB
}
// NewSessionRepository constructs a SessionRepository.
func NewSessionRepository(db *sql.DB) *SessionRepository {
return &SessionRepository{db: db}
}
const sessionColumns = `id, tenant_id, actor_id, actor_type,
signing_key_id, is_pre_login, csrf_token_hash,
idle_expires_at, absolute_expires_at, created_at, last_seen_at,
ip_address, user_agent, revoked_at`
func scanSession(row interface{ Scan(...interface{}) error }) (*sessiondomain.Session, error) {
var s sessiondomain.Session
var revokedAt sql.NullTime
if err := row.Scan(
&s.ID, &s.TenantID, &s.ActorID, &s.ActorType,
&s.SigningKeyID, &s.IsPreLogin, &s.CSRFTokenHash,
&s.IdleExpiresAt, &s.AbsoluteExpiresAt, &s.CreatedAt, &s.LastSeenAt,
&s.IPAddress, &s.UserAgent, &revokedAt,
); err != nil {
return nil, err
}
if revokedAt.Valid {
s.RevokedAt = &revokedAt.Time
}
return &s, nil
}
// Create persists a session row. Caller MUST have called s.Validate().
func (r *SessionRepository) Create(ctx context.Context, s *sessiondomain.Session) error {
_, err := r.db.ExecContext(ctx, `
INSERT INTO sessions (
id, tenant_id, actor_id, actor_type, signing_key_id,
is_pre_login, csrf_token_hash, idle_expires_at,
absolute_expires_at, created_at, last_seen_at,
ip_address, user_agent
) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13)`,
s.ID, s.TenantID, s.ActorID, s.ActorType, s.SigningKeyID,
s.IsPreLogin, s.CSRFTokenHash, s.IdleExpiresAt,
s.AbsoluteExpiresAt, s.CreatedAt, s.LastSeenAt,
s.IPAddress, s.UserAgent)
if err != nil {
var pqErr *pq.Error
if errors.As(err, &pqErr) && pqErr.Code == "23505" {
return repository.ErrAuthDuplicateName
}
return fmt.Errorf("sessions create: %w", err)
}
return nil
}
// Get returns a session by id. Returns the row even if revoked /
// expired; the service layer handles the disposition.
func (r *SessionRepository) Get(ctx context.Context, id string) (*sessiondomain.Session, error) {
row := r.db.QueryRowContext(ctx, `SELECT `+sessionColumns+` FROM sessions WHERE id = $1`, id)
s, err := scanSession(row)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, repository.ErrSessionNotFound
}
return nil, fmt.Errorf("sessions get: %w", err)
}
return s, nil
}
// ListByActor returns active (non-revoked, non-expired, non-pre-login)
// sessions for an actor.
func (r *SessionRepository) ListByActor(ctx context.Context, actorID, actorType, tenantID string) ([]*sessiondomain.Session, error) {
rows, err := r.db.QueryContext(ctx, `
SELECT `+sessionColumns+`
FROM sessions
WHERE actor_id = $1
AND actor_type = $2
AND tenant_id = $3
AND revoked_at IS NULL
AND is_pre_login = FALSE
AND absolute_expires_at > NOW()
ORDER BY created_at DESC`,
actorID, actorType, tenantID)
if err != nil {
return nil, fmt.Errorf("sessions list_by_actor: %w", err)
}
defer rows.Close()
var out []*sessiondomain.Session
for rows.Next() {
s, err := scanSession(rows)
if err != nil {
return nil, fmt.Errorf("sessions scan: %w", err)
}
out = append(out, s)
}
return out, rows.Err()
}
// UpdateLastSeen sets last_seen_at = NOW() for the named session.
func (r *SessionRepository) UpdateLastSeen(ctx context.Context, id string) error {
res, err := r.db.ExecContext(ctx, `UPDATE sessions SET last_seen_at = NOW() WHERE id = $1`, id)
if err != nil {
return fmt.Errorf("sessions update_last_seen: %w", err)
}
n, _ := res.RowsAffected()
if n == 0 {
return repository.ErrSessionNotFound
}
return nil
}
// Revoke sets revoked_at = NOW() for the named session. Idempotent:
// re-revoking an already-revoked session is a no-op (returns nil).
func (r *SessionRepository) Revoke(ctx context.Context, id string) error {
res, err := r.db.ExecContext(ctx, `UPDATE sessions SET revoked_at = NOW() WHERE id = $1 AND revoked_at IS NULL`, id)
if err != nil {
return fmt.Errorf("sessions revoke: %w", err)
}
n, _ := res.RowsAffected()
if n == 0 {
// Distinguish "not found" from "already revoked" by re-querying.
row := r.db.QueryRowContext(ctx, `SELECT 1 FROM sessions WHERE id = $1`, id)
var x int
if err := row.Scan(&x); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return repository.ErrSessionNotFound
}
return fmt.Errorf("sessions revoke probe: %w", err)
}
// Row exists but already revoked: idempotent success.
}
return nil
}
// RevokeAllForActor sets revoked_at = NOW() on every active session
// for an actor. Returns nil on zero matches (idempotent).
func (r *SessionRepository) RevokeAllForActor(ctx context.Context, actorID, actorType, tenantID string) error {
_, err := r.db.ExecContext(ctx, `
UPDATE sessions SET revoked_at = NOW()
WHERE actor_id = $1 AND actor_type = $2 AND tenant_id = $3 AND revoked_at IS NULL`,
actorID, actorType, tenantID)
if err != nil {
return fmt.Errorf("sessions revoke_all_for_actor: %w", err)
}
return nil
}
// GarbageCollectExpired deletes:
// - Sessions whose absolute_expires_at < NOW() (post-login expired).
// - Pre-login sessions older than 10 minutes.
//
// Returns the number of rows deleted across both classes.
func (r *SessionRepository) GarbageCollectExpired(ctx context.Context) (int, error) {
res, err := r.db.ExecContext(ctx, `
DELETE FROM sessions
WHERE absolute_expires_at < NOW()
OR (is_pre_login = TRUE AND created_at < NOW() - INTERVAL '10 minutes')`)
if err != nil {
return 0, fmt.Errorf("sessions garbage_collect: %w", err)
}
n, _ := res.RowsAffected()
return int(n), nil
}
// Delete unconditionally removes a session row.
func (r *SessionRepository) Delete(ctx context.Context, id string) error {
res, err := r.db.ExecContext(ctx, `DELETE FROM sessions WHERE id = $1`, id)
if err != nil {
return fmt.Errorf("sessions delete: %w", err)
}
n, _ := res.RowsAffected()
if n == 0 {
return repository.ErrSessionNotFound
}
return nil
}
// =============================================================================
// SessionSigningKeyRepository (Auth Bundle 2 Phase 2)
// =============================================================================
// SessionSigningKeyRepository is the postgres implementation of
// repository.SessionSigningKeyRepository.
type SessionSigningKeyRepository struct {
db *sql.DB
}
// NewSessionSigningKeyRepository constructs a SessionSigningKeyRepository.
func NewSessionSigningKeyRepository(db *sql.DB) *SessionSigningKeyRepository {
return &SessionSigningKeyRepository{db: db}
}
const sessionSigningKeyColumns = `id, tenant_id, key_material_encrypted, created_at, retired_at`
func scanSessionSigningKey(row interface{ Scan(...interface{}) error }) (*sessiondomain.SessionSigningKey, error) {
var k sessiondomain.SessionSigningKey
var retiredAt sql.NullTime
if err := row.Scan(&k.ID, &k.TenantID, &k.KeyMaterialEncrypted, &k.CreatedAt, &retiredAt); err != nil {
return nil, err
}
if retiredAt.Valid {
k.RetiredAt = &retiredAt.Time
}
return &k, nil
}
// List returns every signing key in the tenant, including retired ones.
func (r *SessionSigningKeyRepository) List(ctx context.Context, tenantID string) ([]*sessiondomain.SessionSigningKey, error) {
rows, err := r.db.QueryContext(ctx,
`SELECT `+sessionSigningKeyColumns+` FROM session_signing_keys WHERE tenant_id = $1 ORDER BY created_at DESC`,
tenantID)
if err != nil {
return nil, fmt.Errorf("session_signing_keys list: %w", err)
}
defer rows.Close()
var out []*sessiondomain.SessionSigningKey
for rows.Next() {
k, err := scanSessionSigningKey(rows)
if err != nil {
return nil, fmt.Errorf("session_signing_keys scan: %w", err)
}
out = append(out, k)
}
return out, rows.Err()
}
// GetActive returns the most-recently-created non-retired key. Returns
// ErrSessionSigningKeyNotFound when no non-retired key exists.
func (r *SessionSigningKeyRepository) GetActive(ctx context.Context, tenantID string) (*sessiondomain.SessionSigningKey, error) {
row := r.db.QueryRowContext(ctx, `
SELECT `+sessionSigningKeyColumns+`
FROM session_signing_keys
WHERE tenant_id = $1 AND retired_at IS NULL
ORDER BY created_at DESC
LIMIT 1`, tenantID)
k, err := scanSessionSigningKey(row)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, repository.ErrSessionSigningKeyNotFound
}
return nil, fmt.Errorf("session_signing_keys get_active: %w", err)
}
return k, nil
}
// Get returns a key by id (including retired keys; Phase 4's Validate
// consults this for cookies signed under retired-but-in-retention keys).
func (r *SessionSigningKeyRepository) Get(ctx context.Context, id string) (*sessiondomain.SessionSigningKey, error) {
row := r.db.QueryRowContext(ctx,
`SELECT `+sessionSigningKeyColumns+` FROM session_signing_keys WHERE id = $1`, id)
k, err := scanSessionSigningKey(row)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, repository.ErrSessionSigningKeyNotFound
}
return nil, fmt.Errorf("session_signing_keys get: %w", err)
}
return k, nil
}
// Add persists a new signing key. Caller MUST have called k.Validate().
func (r *SessionSigningKeyRepository) Add(ctx context.Context, k *sessiondomain.SessionSigningKey) error {
if k.CreatedAt.IsZero() {
_, err := r.db.ExecContext(ctx, `
INSERT INTO session_signing_keys (id, tenant_id, key_material_encrypted)
VALUES ($1, $2, $3)`,
k.ID, k.TenantID, k.KeyMaterialEncrypted)
if err != nil {
return fmt.Errorf("session_signing_keys add: %w", err)
}
// Read the row back to populate CreatedAt.
row := r.db.QueryRowContext(ctx, `SELECT created_at FROM session_signing_keys WHERE id = $1`, k.ID)
if err := row.Scan(&k.CreatedAt); err != nil {
return fmt.Errorf("session_signing_keys add (read created_at): %w", err)
}
return nil
}
_, err := r.db.ExecContext(ctx, `
INSERT INTO session_signing_keys (id, tenant_id, key_material_encrypted, created_at)
VALUES ($1, $2, $3, $4)`,
k.ID, k.TenantID, k.KeyMaterialEncrypted, k.CreatedAt)
if err != nil {
return fmt.Errorf("session_signing_keys add: %w", err)
}
return nil
}
// Retire marks an active key as retired (sets retired_at = NOW()).
// Idempotent: re-retiring an already-retired key is a no-op.
func (r *SessionSigningKeyRepository) Retire(ctx context.Context, id string) error {
res, err := r.db.ExecContext(ctx,
`UPDATE session_signing_keys SET retired_at = NOW() WHERE id = $1 AND retired_at IS NULL`, id)
if err != nil {
return fmt.Errorf("session_signing_keys retire: %w", err)
}
n, _ := res.RowsAffected()
if n == 0 {
// Distinguish not-found vs already-retired.
row := r.db.QueryRowContext(ctx, `SELECT 1 FROM session_signing_keys WHERE id = $1`, id)
var x int
if err := row.Scan(&x); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return repository.ErrSessionSigningKeyNotFound
}
return fmt.Errorf("session_signing_keys retire probe: %w", err)
}
// Row exists but already retired: idempotent success.
}
return nil
}
// Delete unconditionally removes a signing key. Returns
// ErrSessionSigningKeyInUse on SQLSTATE 23503 (FK ON DELETE RESTRICT
// from sessions.signing_key_id).
func (r *SessionSigningKeyRepository) Delete(ctx context.Context, id string) error {
res, err := r.db.ExecContext(ctx, `DELETE FROM session_signing_keys WHERE id = $1`, id)
if err != nil {
var pqErr *pq.Error
if errors.As(err, &pqErr) && pqErr.Code == "23503" {
return repository.ErrSessionSigningKeyInUse
}
return fmt.Errorf("session_signing_keys delete: %w", err)
}
n, _ := res.RowsAffected()
if n == 0 {
return repository.ErrSessionSigningKeyNotFound
}
return nil
}