feat(retention): COMP-002-RETENTION — federated-user PII purge pipeline

Sprint 6 closure of the audit's MED-severity COMP-002-RETENTION
finding.

Pre-fix posture: the federated-user admin surface
(auth_users.go::Deactivate) sets users.deactivated_at on soft-delete,
but the PII columns (email, display_name, oidc_subject) stay
populated forever. No in-code primitive for GDPR right-to-be-
forgotten; no scheduled retention purge.

This commit ships the audit's recommended two-phase fix:

  Phase 1 — operator-callable scrub primitive
    internal/service/user_retention.go
      UserRetentionService.DeleteUserPII(ctx, userID):
        - revoke all active sessions (defense-in-depth)
        - email := 'purged@redacted.local'
        - display_name := '[purged]'
        - oidc_subject := 'sha256:' || hex(sha256(original))
        - audit_events row with action=user.purge_pii,
          category=auth, actor=system

      Why hash oidc_subject instead of NULL:
        1. (oidc_provider_id, oidc_subject) UNIQUE constraint would
           trip on multiple purged users converging to NULL
        2. The hash is one-way; the original IdP-side identifier is
           unrecoverable. Re-login under the same subject mints a
           fresh u-id (right-to-be-forgotten semantics)
        3. Forensic continuity: an operator can recompute
           sha256(<known-subject>) and confirm "this user was
           deactivated then purged"

      users.id itself is preserved so historical
      audit_events.actor = u-X rows still resolve. The forensic-
      attribution chain stays intact even after the PII is gone.
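
A minimal sketch of the oidc_subject fingerprint described above, assuming the same "sha256:" prefix and lowercase hex encoding. The helper names fingerprintSubject and wasPurged are hypothetical and do not appear in this commit; they only illustrate the one-way token and the operator-side forensic check.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

// purgedPrefix marks a subject that has already been replaced by its hash.
const purgedPrefix = "sha256:"

// fingerprintSubject (hypothetical helper) returns the token stored in place
// of the original IdP subject. Values that already carry the prefix are
// returned unchanged, so repeated scrubs do not hash the hash.
func fingerprintSubject(subject string) string {
	if strings.HasPrefix(subject, purgedPrefix) {
		return subject
	}
	sum := sha256.Sum256([]byte(subject))
	return purgedPrefix + hex.EncodeToString(sum[:])
}

// wasPurged (hypothetical helper) reports whether a stored column value is
// the fingerprint of a subject the operator already knows out of band.
func wasPurged(stored, knownSubject string) bool {
	return stored == fingerprintSubject(knownSubject)
}

func main() {
	stored := fingerprintSubject("sub-alice")
	fmt.Println(wasPurged(stored, "sub-alice"))       // true
	fmt.Println(wasPurged(stored, "sub-bob"))         // false
	fmt.Println(fingerprintSubject(stored) == stored) // true: second pass is a no-op
}
```

Because distinct subjects hash to distinct tokens, purged rows on the same provider keep distinct oidc_subject values and the UNIQUE constraint continues to hold.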

  Phase 2 — scheduled batch purge
    internal/scheduler/scheduler.go
      UserRetentionPurger interface + userRetentionLoop:
        - PurgeDeactivatedUsers enumerates every user with
          deactivated_at < NOW() - retention_window
        - DeleteUserPII per row
        - per-tick batch cap (default 200) keeps blast radius
          predictable; large backlogs spread across multiple ticks
        - atomic.Bool guard + 5-min per-tick context.WithTimeout

    Repository contract grew a single new method:
      internal/repository/user.go::ListDeactivatedBefore(ctx, t)
      internal/repository/postgres/user.go: SQL-side filter
      (deactivated_at IS NOT NULL AND deactivated_at < $1)
      ORDER BY deactivated_at ASC, cross-tenant.

  Configuration
    CERTCTL_USER_RETENTION_INTERVAL   default 24h
    CERTCTL_USER_RETENTION_WINDOW     default 30 days
    CERTCTL_USER_RETENTION_BATCH_CAP  default 200

  Test stub additions for repository.UserRepository.ListDeactivatedBefore:
    internal/auth/oidc/service_test.go::stubUsers
    internal/api/handler/auth_users_test.go::stubFullUserRepo
    internal/api/handler/auth_session_oidc_test.go::stubUserRepo

  Documentation
    docs/operator/privacy-and-retention.md
      - retention pipeline diagram (day-0 deactivate → day-N purge)
      - operator config table
      - verification runbook (4 steps with SQL)
      - what's NOT covered (deferred: DSAR export, api_keys cascade,
        retroactive audit_events.details redaction)

  Tests
    internal/service/user_retention_test.go (NEW, 4 tests):
      TestDeleteUserPII_ScrubsAndRevokes
      TestDeleteUserPII_IsIdempotent
      TestPurgeDeactivatedUsers_RespectsWindow
      TestPurgeDeactivatedUsers_BatchCap

Verified locally:
  go vet ./...                                   (clean)
  gofmt -l internal/ cmd/                        (clean)
  go test -short -count=1 \
    ./internal/service/... ./internal/scheduler/... ./internal/config/...
    (all green)

Cross-sprint interaction: pairs with COMP-001-HASH (prior commit).
The user.purge_pii audit row this service emits flows through the
new hash chain, so the scrub event is itself tamper-evident.

Closes COMP-002-RETENTION. Sprint 6 is complete (2/2 findings).
shankar0123
2026-05-16 06:18:39 +00:00
parent 43836aca7c
commit 663b14bfd8
11 changed files with 874 additions and 0 deletions
+224
@@ -0,0 +1,224 @@
// Copyright 2026 certctl LLC. All rights reserved.
// SPDX-License-Identifier: BUSL-1.1
package service
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"log/slog"
"strings"
"time"
userdomain "github.com/certctl-io/certctl/internal/auth/user/domain"
"github.com/certctl-io/certctl/internal/domain"
"github.com/certctl-io/certctl/internal/repository"
)
// Sprint 6 COMP-002-RETENTION closure. The control plane stores three
// PII surfaces:
//
// users.email — IdP-supplied login email.
// users.display_name — IdP-supplied human label.
// users.oidc_subject — IdP's stable identifier for the human.
//
// Pre-fix there was no in-code primitive for GDPR right-to-be-forgotten
// or for automatic retention purges of deactivated accounts. The
// admin-side deactivate flow at internal/api/handler/auth_users.go
// set users.deactivated_at but the PII columns stayed populated
// forever.
//
// This file delivers the two pieces the audit's fix called for:
//
// Phase 1: DeleteUserPII(ctx, userID) is the operator-callable primitive
// that scrubs the row's PII while keeping the audit attribution
// chain intact (the row's id stays, so historical
// audit_events.actor = user.id rows still resolve).
// Phase 2: PurgeDeactivatedUsers(ctx) walks every user whose
// deactivated_at is older than the retention window and
// calls DeleteUserPII on each. Scheduler loop calls this
// on a tick (default 24h); the retention window itself
// (default 30 days post-deactivate) is operator-tunable.
//
// Audit attribution invariant: DeleteUserPII replaces oidc_subject
// with sha256:<hex> rather than nullifying it. Three reasons:
// 1. Preserves the (oidc_provider_id, oidc_subject) UNIQUE
// constraint — two purged users on the same provider still have
// different oidc_subject values, so the constraint never trips.
// 2. The hash is a one-way fingerprint; the original IdP-side
// identifier is unrecoverable post-purge. Re-login under the
// same IdP subject mints a fresh u-id (different row) because
// GetByOIDCSubject won't match the hashed token.
// 3. Forensic continuity: if an operator later needs to prove "a
// user with subject X was deactivated then purged", they can
// recompute sha256(X) and look it up.
// UserRetentionService exposes the DeleteUserPII + PurgeDeactivatedUsers
// primitives. The handler-side admin endpoint (when wired) calls
// DeleteUserPII directly; the scheduler's userRetentionLoop calls
// PurgeDeactivatedUsers.
type UserRetentionService struct {
users repository.UserRepository
sessions repository.SessionRepository
audit *AuditService
logger *slog.Logger
// retentionWindow is how long after deactivated_at a user's PII
// stays in the table. PurgeDeactivatedUsers subtracts this from
// time.Now() when computing the "purge before" threshold.
retentionWindow time.Duration
// purgeBatchCap bounds how many users a single PurgeDeactivatedUsers
// call processes — keeps a single tick's blast radius predictable
// even if a large backlog accumulates. Zero = unbounded (test default).
purgeBatchCap int
}
// NewUserRetentionService wires the deps. The audit service is
// optional (nil = skip audit emission); production wiring in
// cmd/server/main.go passes the singleton.
func NewUserRetentionService(
users repository.UserRepository,
sessions repository.SessionRepository,
audit *AuditService,
logger *slog.Logger,
retentionWindow time.Duration,
purgeBatchCap int,
) *UserRetentionService {
if retentionWindow <= 0 {
retentionWindow = 30 * 24 * time.Hour
}
return &UserRetentionService{
users: users,
sessions: sessions,
audit: audit,
logger: logger,
retentionWindow: retentionWindow,
purgeBatchCap: purgeBatchCap,
}
}
// DeleteUserPII scrubs the named user's PII columns. Phase 1 fix per
// the COMP-002-RETENTION audit. Steps:
//
// 1. Load the user row. Returns repository.ErrUserNotFound if missing.
// 2. Revoke all active sessions for the actor (defense-in-depth — the
// handler-side Deactivate path already does this, but a purge after
// N days might catch sessions that were created post-deactivate via
// some other path).
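// 3. Overwrite the PII columns with non-identifying placeholders:
// email = "purged@redacted.local"
// display_name = "[purged]"
// oidc_subject = "sha256:" || hex(sha256(original))
// WebAuthnCredentials is also reset to an empty JSON array.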
// 4. Persist the row via UserRepository.Update.
// 5. Emit an audit event (auth category, action user.purge_pii) so the
// scrub itself is on record.
//
// Returns nil on success. Idempotent: re-calling on an already-purged
// row leaves the stored oidc_subject unchanged, because the "sha256:"
// prefix guard skips re-hashing. The call still revokes sessions,
// rewrites the placeholders, and emits another audit event.
func (s *UserRetentionService) DeleteUserPII(ctx context.Context, userID string) error {
u, err := s.users.Get(ctx, userID)
if err != nil {
return fmt.Errorf("user_retention: load %s: %w", userID, err)
}
// Defense-in-depth: revoke all sessions before the row mutates.
if err := s.sessions.RevokeAllForActor(ctx, u.ID, string(domain.ActorTypeUser), u.TenantID); err != nil {
// Log + continue; PII scrub is the load-bearing step. A
// dangling-session row whose actor's PII is already gone is
// less harmful than leaving the PII intact because the
// session revoke failed.
s.logger.Warn("user_retention: session revoke failed during PII scrub (continuing)",
"user_id", userID, "error", err)
}
// Hash the oidc_subject only if it isn't already a "sha256:..." token.
// This guard is what makes re-scrubs idempotent: a second pass leaves
// the stored hash untouched instead of hashing the hash, and the prefix
// lets operators tell the row was already scrubbed.
if !strings.HasPrefix(u.OIDCSubject, "sha256:") {
sum := sha256.Sum256([]byte(u.OIDCSubject))
u.OIDCSubject = "sha256:" + hex.EncodeToString(sum[:])
}
u.Email = "purged@redacted.local" // domain.User.Validate requires plausible email format
u.DisplayName = "[purged]" // domain.User.Validate forbids leading/trailing whitespace
u.WebAuthnCredentials = []byte(`[]`) // v3-reserved field — keep empty JSONB.
if err := s.users.Update(ctx, u); err != nil {
return fmt.Errorf("user_retention: update %s: %w", userID, err)
}
if s.audit != nil {
_ = s.audit.RecordEventWithCategory(ctx,
"system", domain.ActorTypeSystem,
"user.purge_pii",
domain.EventCategoryAuth,
"user", userID,
map[string]interface{}{
"retained_id": userID,
"hashed_oidc_subject_set": true,
},
)
}
s.logger.Info("user_retention: PII scrubbed", "user_id", userID)
return nil
}
// PurgeDeactivatedUsers enumerates every user whose deactivated_at is
// older than now - retentionWindow and calls DeleteUserPII on each.
// Returns (purged, failed) counts; logs individual failures at WARN
// and continues so a single bad row doesn't stall the rest of the
// batch. Bounded by purgeBatchCap when non-zero.
func (s *UserRetentionService) PurgeDeactivatedUsers(ctx context.Context) (int, int, error) {
threshold := time.Now().Add(-s.retentionWindow)
rows, err := s.users.ListDeactivatedBefore(ctx, threshold)
if err != nil {
return 0, 0, fmt.Errorf("user_retention: list deactivated: %w", err)
}
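var purged, failed int
for i, u := range rows {
// Skip rows that are already scrubbed. DeleteUserPII is
// idempotent, but skipping saves a transaction and an audit
// row per tick. The skip runs before the cap check so that
// scrubbed rows, which sort first under the repository's
// deactivated_at ASC ordering, never consume cap slots.
if strings.HasPrefix(u.OIDCSubject, "sha256:") &&
strings.HasPrefix(u.Email, "purged@") {
continue
}
// Count attempted scrubs rather than the loop index. Otherwise a
// leading run of already-scrubbed rows could exhaust the cap on
// every tick and starve the rest of the backlog.
if s.purgeBatchCap > 0 && purged+failed >= s.purgeBatchCap {
s.logger.Info("user_retention: batch cap reached; remaining rows deferred to next tick",
"cap", s.purgeBatchCap, "remaining", len(rows)-i)
break
}
if err := s.DeleteUserPII(ctx, u.ID); err != nil {
s.logger.Warn("user_retention: purge failed (next tick will retry)",
"user_id", u.ID, "error", err)
failed++
continue
}
purged++
}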
if purged > 0 || failed > 0 {
s.logger.Info("user_retention: purge sweep complete",
"purged", purged,
"failed", failed,
"threshold", threshold.Format(time.RFC3339))
}
return purged, failed, nil
}
// RetentionWindow exposes the configured window for tests + the
// operator-facing "when will my account be scrubbed" GUI surface.
func (s *UserRetentionService) RetentionWindow() time.Duration {
return s.retentionWindow
}
// userdomain is referenced here only to keep the import in use: this
// file handles *userdomain.User values returned by the repository
// interface but never names the package otherwise. Without this blank
// reference the compiler would reject the unused import.
var _ = (*userdomain.User)(nil)
+302
@@ -0,0 +1,302 @@
// Copyright 2026 certctl LLC. All rights reserved.
// SPDX-License-Identifier: BUSL-1.1
package service
import (
"context"
"errors"
"io"
"log/slog"
"strings"
"sync"
"testing"
"time"
sessiondomain "github.com/certctl-io/certctl/internal/auth/session/domain"
userdomain "github.com/certctl-io/certctl/internal/auth/user/domain"
"github.com/certctl-io/certctl/internal/domain"
"github.com/certctl-io/certctl/internal/repository"
)
// Sprint 6 COMP-002-RETENTION unit tests. The repo + session deps are
// covered by in-memory stubs in this file; the integration shape
// (deactivated_at SQL filter, session revocation in PG) is covered by
// the existing postgres tests for those repositories.
type retentionStubUserRepo struct {
mu sync.Mutex
byID map[string]*userdomain.User
updateOK bool
updateCh chan struct{}
}
func newRetentionStubUserRepo() *retentionStubUserRepo {
return &retentionStubUserRepo{
byID: make(map[string]*userdomain.User),
updateOK: true,
updateCh: make(chan struct{}, 100),
}
}
func (r *retentionStubUserRepo) seed(u *userdomain.User) {
r.mu.Lock()
defer r.mu.Unlock()
r.byID[u.ID] = u
}
func (r *retentionStubUserRepo) Get(_ context.Context, id string) (*userdomain.User, error) {
r.mu.Lock()
defer r.mu.Unlock()
u, ok := r.byID[id]
if !ok {
return nil, repository.ErrUserNotFound
}
cp := *u
return &cp, nil
}
func (r *retentionStubUserRepo) GetByOIDCSubject(_ context.Context, _, _ string) (*userdomain.User, error) {
return nil, repository.ErrUserNotFound
}
func (r *retentionStubUserRepo) Create(_ context.Context, _ *userdomain.User) error { return nil }
func (r *retentionStubUserRepo) Update(_ context.Context, u *userdomain.User) error {
r.mu.Lock()
defer r.mu.Unlock()
if !r.updateOK {
return errors.New("retentionStubUserRepo: update disabled")
}
cp := *u
r.byID[u.ID] = &cp
r.updateCh <- struct{}{}
return nil
}
func (r *retentionStubUserRepo) ListAll(_ context.Context, _ string) ([]*userdomain.User, error) {
return nil, nil
}
func (r *retentionStubUserRepo) ListDeactivatedBefore(_ context.Context, threshold time.Time) ([]*userdomain.User, error) {
r.mu.Lock()
defer r.mu.Unlock()
var out []*userdomain.User
for _, u := range r.byID {
if u.DeactivatedAt != nil && u.DeactivatedAt.Before(threshold) {
cp := *u
out = append(out, &cp)
}
}
return out, nil
}
type retentionStubSessionRepo struct {
mu sync.Mutex
revokedActors []string
revokeError error
}
// retentionStubSessionRepo satisfies the full repository.SessionRepository
// surface. user_retention.go only calls RevokeAllForActor; the other
// methods are no-op stubs that exist so the fixture compiles against
// the interface.
func (s *retentionStubSessionRepo) Create(_ context.Context, _ *sessiondomain.Session) error {
return nil
}
func (s *retentionStubSessionRepo) Get(_ context.Context, _ string) (*sessiondomain.Session, error) {
return nil, repository.ErrSessionNotFound
}
func (s *retentionStubSessionRepo) ListByActor(_ context.Context, _, _, _ string) ([]*sessiondomain.Session, error) {
return nil, nil
}
func (s *retentionStubSessionRepo) UpdateLastSeen(_ context.Context, _ string) error {
return nil
}
func (s *retentionStubSessionRepo) UpdateCSRFTokenHash(_ context.Context, _, _ string) error {
return nil
}
func (s *retentionStubSessionRepo) Revoke(_ context.Context, _ string) error { return nil }
func (s *retentionStubSessionRepo) RevokeAllForActor(_ context.Context, actorID, _, _ string) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.revokeError != nil {
return s.revokeError
}
s.revokedActors = append(s.revokedActors, actorID)
return nil
}
func (s *retentionStubSessionRepo) RevokeAllExceptForActor(_ context.Context, _, _, _, _ string) (int, error) {
return 0, nil
}
func (s *retentionStubSessionRepo) GarbageCollectExpired(_ context.Context) (int, error) {
return 0, nil
}
func (s *retentionStubSessionRepo) Delete(_ context.Context, _ string) error {
return nil
}
func newTestUser(id, email, displayName, subject string, deactivated *time.Time) *userdomain.User {
return &userdomain.User{
ID: id,
TenantID: "t-default",
Email: email,
DisplayName: displayName,
OIDCSubject: subject,
OIDCProviderID: "op-test",
LastLoginAt: time.Now(),
WebAuthnCredentials: []byte(`[]`),
CreatedAt: time.Now().Add(-90 * 24 * time.Hour),
UpdatedAt: time.Now(),
DeactivatedAt: deactivated,
}
}
func discardLogger() *slog.Logger {
return slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelError + 10}))
}
// TestDeleteUserPII_ScrubsAndRevokes: the operator-facing primitive
// replaces email + display_name with fixed placeholders, hashes
// oidc_subject, and revokes sessions on the affected actor.
func TestDeleteUserPII_ScrubsAndRevokes(t *testing.T) {
t.Parallel()
users := newRetentionStubUserRepo()
sessions := &retentionStubSessionRepo{}
now := time.Now()
deact := now.Add(-45 * 24 * time.Hour)
users.seed(newTestUser("u-1", "alice@example.com", "Alice", "sub-alice", &deact))
svc := NewUserRetentionService(users, sessions, nil, discardLogger(), 30*24*time.Hour, 0)
if err := svc.DeleteUserPII(context.Background(), "u-1"); err != nil {
t.Fatalf("DeleteUserPII: %v", err)
}
got, err := users.Get(context.Background(), "u-1")
if err != nil {
t.Fatalf("post-purge Get: %v", err)
}
if got.Email != "purged@redacted.local" {
t.Errorf("email not scrubbed: %q", got.Email)
}
if got.DisplayName != "[purged]" {
t.Errorf("display_name not scrubbed: %q", got.DisplayName)
}
if !strings.HasPrefix(got.OIDCSubject, "sha256:") {
t.Errorf("oidc_subject not hashed: %q", got.OIDCSubject)
}
if got.OIDCSubject == "sha256:" {
t.Errorf("oidc_subject hash is empty")
}
sessions.mu.Lock()
if len(sessions.revokedActors) != 1 || sessions.revokedActors[0] != "u-1" {
t.Errorf("RevokeAllForActor not called for u-1; got %v", sessions.revokedActors)
}
sessions.mu.Unlock()
}
// TestDeleteUserPII_IsIdempotent: scrubbing an already-scrubbed row
// is safe. The "sha256:" prefix guard skips re-hashing, so the stored
// oidc_subject is identical after the second call and no error is
// returned. We verify the hash is unchanged and the prefix survives.
func TestDeleteUserPII_IsIdempotent(t *testing.T) {
t.Parallel()
users := newRetentionStubUserRepo()
sessions := &retentionStubSessionRepo{}
now := time.Now()
deact := now.Add(-100 * 24 * time.Hour)
users.seed(newTestUser("u-2", "bob@example.com", "Bob", "sub-bob", &deact))
svc := NewUserRetentionService(users, sessions, nil, discardLogger(), 30*24*time.Hour, 0)
if err := svc.DeleteUserPII(context.Background(), "u-2"); err != nil {
t.Fatalf("first DeleteUserPII: %v", err)
}
first, _ := users.Get(context.Background(), "u-2")
if err := svc.DeleteUserPII(context.Background(), "u-2"); err != nil {
t.Fatalf("second DeleteUserPII: %v", err)
}
second, _ := users.Get(context.Background(), "u-2")
// oidc_subject doesn't get re-hashed (prefix guard).
if first.OIDCSubject != second.OIDCSubject {
t.Errorf("idempotent re-scrub re-hashed oidc_subject: %q -> %q",
first.OIDCSubject, second.OIDCSubject)
}
if !strings.HasPrefix(second.OIDCSubject, "sha256:") {
t.Errorf("scrubbed oidc_subject lost prefix: %q", second.OIDCSubject)
}
}
// TestPurgeDeactivatedUsers_RespectsWindow: only users whose
// deactivated_at is older than now-retentionWindow get scrubbed; rows
// deactivated within the window remain intact.
func TestPurgeDeactivatedUsers_RespectsWindow(t *testing.T) {
t.Parallel()
users := newRetentionStubUserRepo()
sessions := &retentionStubSessionRepo{}
now := time.Now()
stale := now.Add(-45 * 24 * time.Hour) // past 30d window
recent := now.Add(-7 * 24 * time.Hour) // inside 30d window
users.seed(newTestUser("u-stale", "stale@example.com", "Stale", "sub-stale", &stale))
users.seed(newTestUser("u-recent", "recent@example.com", "Recent", "sub-recent", &recent))
users.seed(newTestUser("u-active", "active@example.com", "Active", "sub-active", nil))
svc := NewUserRetentionService(users, sessions, nil, discardLogger(), 30*24*time.Hour, 0)
purged, failed, err := svc.PurgeDeactivatedUsers(context.Background())
if err != nil {
t.Fatalf("PurgeDeactivatedUsers: %v", err)
}
if purged != 1 {
t.Errorf("expected 1 row purged, got %d", purged)
}
if failed != 0 {
t.Errorf("expected 0 failures, got %d", failed)
}
staleU, _ := users.Get(context.Background(), "u-stale")
if staleU.Email != "purged@redacted.local" {
t.Errorf("stale row not scrubbed: %q", staleU.Email)
}
recentU, _ := users.Get(context.Background(), "u-recent")
if recentU.Email != "recent@example.com" {
t.Errorf("recent row should not have been scrubbed: %q", recentU.Email)
}
activeU, _ := users.Get(context.Background(), "u-active")
if activeU.Email != "active@example.com" {
t.Errorf("active row should not have been scrubbed: %q", activeU.Email)
}
}
// TestPurgeDeactivatedUsers_BatchCap: with five stale rows and a batch
// cap of 2, a single sweep purges exactly two rows, which bounds the
// per-tick blast radius.
func TestPurgeDeactivatedUsers_BatchCap(t *testing.T) {
t.Parallel()
users := newRetentionStubUserRepo()
sessions := &retentionStubSessionRepo{}
stale := time.Now().Add(-100 * 24 * time.Hour)
for i := 0; i < 5; i++ {
id := "u-cap-" + string(rune('0'+i))
users.seed(newTestUser(id, id+"@example.com", id, "sub-"+id, &stale))
}
svc := NewUserRetentionService(users, sessions, nil, discardLogger(), 30*24*time.Hour, 2)
purged, failed, err := svc.PurgeDeactivatedUsers(context.Background())
if err != nil {
t.Fatalf("PurgeDeactivatedUsers: %v", err)
}
if purged != 2 {
t.Errorf("expected exactly 2 rows purged (batch cap = 2), got %d", purged)
}
if failed != 0 {
t.Errorf("expected 0 failures, got %d", failed)
}
}
// _ guard for unused-import lint when one of the helpers above is
// removed during future refactors.
var _ = domain.ActorTypeUser