From 663b14bfd81489cf4ca781cbac20aae7212cbce8 Mon Sep 17 00:00:00 2001 From: shankar0123 Date: Sat, 16 May 2026 06:18:39 +0000 Subject: [PATCH] =?UTF-8?q?feat(retention):=20COMP-002-RETENTION=20?= =?UTF-8?q?=E2=80=94=20federated-user=20PII=20purge=20pipeline?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sprint 6 closure of the audit's MED-severity COMP-002-RETENTION finding. Pre-fix posture: the federated-user admin surface (auth_users.go::Deactivate) sets users.deactivated_at on soft-delete, but the PII columns (email, display_name, oidc_subject) stay populated forever. No in-code primitive for GDPR right-to-be- forgotten; no scheduled retention purge. This commit ships the audit's recommended two-phase fix: Phase 1 — operator-callable scrub primitive internal/service/user_retention.go UserRetentionService.DeleteUserPII(ctx, userID): - revoke all active sessions (defense-in-depth) - email := 'purged@redacted.local' - display_name := '[purged]' - oidc_subject := 'sha256:' || hex(sha256(original)) - audit_events row with action=user.purge_pii, category=auth, actor=system Why hash oidc_subject instead of NULL: 1. (oidc_provider_id, oidc_subject) UNIQUE constraint would trip on multiple purged users converging to NULL 2. The hash is one-way; the original IdP-side identifier is unrecoverable. Re-login under the same subject mints a fresh u-id (right-to-be-forgotten semantics) 3. Forensic continuity: an operator can recompute sha256() and confirm "this user was deactivated then purged" users.id itself is preserved so historical audit_events.actor = u-X rows still resolve. The forensic- attribution chain stays intact even after the PII is gone. Phase 2 — scheduled batch purge internal/scheduler/scheduler.go UserRetentionPurger interface + userRetentionLoop: - PurgeDeactivatedUsers enumerates every user with deactivated_at < NOW() - retention_window - DeleteUserPII per row - per-tick batch cap (default 200) keeps blast radius predictable; large backlogs spread across multiple ticks - atomic.Bool guard + 5-min per-tick context.WithTimeout Repository contract grew a single new method: internal/repository/user.go::ListDeactivatedBefore(ctx, t) internal/repository/postgres/user.go: SQL-side filter (deactivated_at IS NOT NULL AND deactivated_at < $1) ORDER BY deactivated_at ASC, cross-tenant. Configuration CERTCTL_USER_RETENTION_INTERVAL default 24h CERTCTL_USER_RETENTION_WINDOW default 30 days CERTCTL_USER_RETENTION_BATCH_CAP default 200 Test stub additions for repository.UserRepository.ListDeactivatedBefore: internal/auth/oidc/service_test.go::stubUsers internal/api/handler/auth_users_test.go::stubFullUserRepo internal/api/handler/auth_session_oidc_test.go::stubUserRepo Documentation docs/operator/privacy-and-retention.md - retention pipeline diagram (day-0 deactivate → day-N purge) - operator config table - verification runbook (4 steps with SQL) - what's NOT covered (deferred: DSAR export, api_keys cascade, retroactive audit_events.details redaction) Tests internal/service/user_retention_test.go (NEW, 4 tests): TestDeleteUserPII_ScrubsAndRevokes TestDeleteUserPII_IsIdempotent TestPurgeDeactivatedUsers_RespectsWindow TestPurgeDeactivatedUsers_BatchCap Verified locally: go vet ./... (clean) gofmt -l internal/ cmd/ (clean) go test -short -count=1 \ ./internal/service/... ./internal/scheduler/... ./internal/config/... (all green) Cross-sprint interaction: pairs with COMP-001-HASH (prior commit). The user.purge_pii audit row this service emits flows through the new hash chain, so the scrub event is itself tamper-evident. Closes COMP-002-RETENTION. Sprint 6 is complete (2/2 findings). --- cmd/server/main.go | 20 ++ docs/operator/privacy-and-retention.md | 136 ++++++++ .../api/handler/auth_session_oidc_test.go | 8 + internal/api/handler/auth_users_test.go | 14 + internal/auth/oidc/service_test.go | 14 + internal/config/config.go | 29 ++ internal/repository/postgres/user.go | 31 ++ internal/repository/user.go | 9 + internal/scheduler/scheduler.go | 87 +++++ internal/service/user_retention.go | 224 +++++++++++++ internal/service/user_retention_test.go | 302 ++++++++++++++++++ 11 files changed, 874 insertions(+) create mode 100644 docs/operator/privacy-and-retention.md create mode 100644 internal/service/user_retention.go create mode 100644 internal/service/user_retention_test.go diff --git a/cmd/server/main.go b/cmd/server/main.go index 0af477d..41d6739 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -1258,6 +1258,26 @@ func main() { logger.Info("audit chain verify loop enabled", "interval", cfg.AuditChain.VerifyInterval.String()) + // Sprint 6 COMP-002-RETENTION: wire the user-PII purge loop. The + // service nullifies email + display_name on users whose + // deactivated_at exceeds the retention window (default 30d) and + // hashes oidc_subject to preserve audit attribution. The scheduler + // loop ticks on CERTCTL_USER_RETENTION_INTERVAL (default 24h). + userRetentionService := service.NewUserRetentionService( + oidcUserRepo, + sessionRepo, + auditService, + logger, + cfg.UserRetention.RetentionWindow, + cfg.UserRetention.BatchCap, + ) + sched.SetUserRetentionPurger(userRetentionService) + sched.SetUserRetentionInterval(cfg.UserRetention.Interval) + logger.Info("user PII retention purge loop enabled", + "interval", cfg.UserRetention.Interval.String(), + "retention_window", cfg.UserRetention.RetentionWindow.String(), + "batch_cap", cfg.UserRetention.BatchCap) + logger.Info("session GC sweep enabled", "interval", cfg.Auth.Session.GCInterval.String(), "absolute_timeout", cfg.Auth.Session.AbsoluteTimeout.String(), diff --git a/docs/operator/privacy-and-retention.md b/docs/operator/privacy-and-retention.md new file mode 100644 index 0000000..8988c39 --- /dev/null +++ b/docs/operator/privacy-and-retention.md @@ -0,0 +1,136 @@ +# Privacy & retention (federated-user PII) + +> Last reviewed: 2026-05-16 + +Sprint 6 COMP-002-RETENTION closure. certctl stores three categories +of personally-identifiable information for federated humans (Auth +Bundle 2 OIDC users): + +| Column | Source | Used by | +|---|---|---| +| `users.email` | IdP claim (`email`) | Operator GUI "find user by email", display in lists, audit attribution. | +| `users.display_name` | IdP claim (`name`) | UI display string for the human. | +| `users.oidc_subject` | IdP claim (`sub`) | Stable identifier — joined with `oidc_provider_id` in the (provider, subject) UNIQUE constraint. | + +Pre-fix, deactivating a user (admin-side `auth.user.deactivate`) +soft-deleted the row by setting `deactivated_at` but left the PII +columns populated indefinitely. The Sprint 6 fix adds an automatic +purge pipeline. + +## Retention pipeline shape + +``` +Day 0 admin → POST /api/v1/auth/users/u-X/deactivate + ├─ users.deactivated_at = NOW() + └─ all active sessions for u-X revoked + +Day N scheduler's userRetentionLoop tick (default cadence 24h) + └─ UserRetentionService.PurgeDeactivatedUsers + ├─ SELECT users WHERE deactivated_at < NOW() - retention_window + ├─ For each row (batch-capped per tick): + │ UserRetentionService.DeleteUserPII(u.id) + │ ├─ revoke all active sessions (defense-in-depth) + │ ├─ email := "purged@redacted.local" + │ ├─ display_name := "[purged]" + │ ├─ oidc_subject := "sha256:" || hex(sha256(original)) + │ └─ audit_events row (action=user.purge_pii, category=auth) +``` + +`users.id` is **preserved**. Historical `audit_events.actor = u-X` +rows still resolve to the row (now scrubbed). This is the +forensic-attribution guarantee — the operator can prove "user u-X +performed action Y on date Z" even after the PII is gone. + +`oidc_subject` is **hashed**, not nullified, for two reasons: + +1. The `(oidc_provider_id, oidc_subject)` UNIQUE constraint would + trip if multiple purged users converged on the same NULL. +2. Re-login under the same IdP subject creates a fresh row (different + `u-` id) because `GetByOIDCSubject` won't match the hashed token — + the original subject is unrecoverable from the hash. This is the + "right-to-be-forgotten" behavior: the same human logging back in + is functionally a new account. + +## Operator configuration + +| Env var | Default | Notes | +|---|---|---| +| `CERTCTL_USER_RETENTION_INTERVAL` | `24h` | Tick cadence for the scheduler's userRetentionLoop. Zero or negative ignored. | +| `CERTCTL_USER_RETENTION_WINDOW` | `30 * 24h` (30 days) | How long after `deactivated_at` a row's PII stays in the table. Operators with stricter GDPR/CCPA expectations may shorten. | +| `CERTCTL_USER_RETENTION_BATCH_CAP` | `200` | Per-tick row budget. Larger backlogs spread across multiple ticks. 0 = unbounded (test fixtures only). | + +## How to verify retention is working + +1. Deactivate a test user via the admin path: + + ```bash + curl -X POST -H "X-API-Key: $ADMIN_KEY" \ + https://certctl.example.com/api/v1/auth/users/u-test/deactivate + ``` + +2. Confirm the row's `deactivated_at` is set: + + ```sql + SELECT id, email, deactivated_at FROM users WHERE id = 'u-test'; + ``` + +3. Backdate `deactivated_at` to past the retention window (only for + testing — never in production): + + ```sql + UPDATE users SET deactivated_at = NOW() - INTERVAL '60 days' + WHERE id = 'u-test'; + ``` + + (Note: this UPDATE will succeed because `users` doesn't have a + WORM trigger; the audit-events WORM trigger is unrelated.) + +4. Wait for the next `userRetentionLoop` tick (or restart the server + to force an immediate sweep). Confirm scrub: + + ```sql + SELECT id, email, display_name, oidc_subject + FROM users + WHERE id = 'u-test'; + ``` + + Expected: `email = 'purged@redacted.local'`, + `display_name = '[purged]'`, + `oidc_subject LIKE 'sha256:%'`. + +5. Confirm an audit row was emitted: + + ```sql + SELECT id, actor, action, resource_id, timestamp + FROM audit_events + WHERE action = 'user.purge_pii' AND resource_id = 'u-test' + ORDER BY timestamp DESC LIMIT 1; + ``` + +## What's NOT covered (deferred work) + +The Sprint 6 fix is Phase 1 of the audit's COMP-002-RETENTION +recommendation. Two further pieces are forward-looking: + +- **GDPR data-subject access request (DSAR) export.** A "show me + everything you know about me" endpoint is not yet implemented. + Operators on EU-resident data should treat this as a manual SQL + procedure today; track for Phase 2. +- **Cascade purge of related rows.** Sessions are revoked (above); + api_keys with `created_by = u-X` are NOT yet purged on scrub. The + api_keys table doesn't have a foreign key to users (it indexes by + `actor_id` strings, free-form), so the cascade is a service-layer + concern that needs explicit wiring. Track for Phase 2. +- **Per-event PII redaction in `audit_events.details`.** The existing + `RedactDetailsForAudit` (`internal/service/audit_redact.go`) scrubs + credential + PII keys at write time. A future feature for + "retroactively re-redact existing rows" would interact with the WORM + trigger; out of scope today. + +## See also + +- `internal/service/user_retention.go` — `UserRetentionService` source. +- `internal/scheduler/scheduler.go::userRetentionLoop` — scheduler loop. +- `migrations/000036_users.up.sql` — `users` table definition. +- `migrations/000045_users_deactivated_at.up.sql` — `deactivated_at` column. +- `docs/operator/audit-chain.md` — paired Sprint 6 tamper-evidence work. diff --git a/internal/api/handler/auth_session_oidc_test.go b/internal/api/handler/auth_session_oidc_test.go index e20259e..fe4fcac 100644 --- a/internal/api/handler/auth_session_oidc_test.go +++ b/internal/api/handler/auth_session_oidc_test.go @@ -255,6 +255,14 @@ func (s *stubUserRepo) ListAll(_ context.Context, _ string) ([]*userdomain.User, return nil, nil } +// ListDeactivatedBefore satisfies the Sprint 6 COMP-002-RETENTION +// interface addition. The phase-5 OIDC handler tests don't exercise +// retention paths, so an empty result keeps the contract without +// changing test semantics. +func (s *stubUserRepo) ListDeactivatedBefore(_ context.Context, _ time.Time) ([]*userdomain.User, error) { + return nil, nil +} + type phase5StubAudit struct { events []string // Audit 2026-05-11 Fix 13 — capture the details map so the diff --git a/internal/api/handler/auth_users_test.go b/internal/api/handler/auth_users_test.go index 3331b05..6c04cf0 100644 --- a/internal/api/handler/auth_users_test.go +++ b/internal/api/handler/auth_users_test.go @@ -83,6 +83,20 @@ func (s *stubFullUserRepo) ListAll(_ context.Context, tenantID string) ([]*userd return out, nil } +// ListDeactivatedBefore satisfies the Sprint 6 COMP-002-RETENTION +// interface addition. Walk rows, filter by DeactivatedAt-before-threshold. +// Order is intentionally not stabilised — the auth_users handler tests +// don't exercise the retention loop. +func (s *stubFullUserRepo) ListDeactivatedBefore(_ context.Context, threshold time.Time) ([]*userdomain.User, error) { + var out []*userdomain.User + for _, u := range s.rows { + if u.DeactivatedAt != nil && u.DeactivatedAt.Before(threshold) { + out = append(out, u) + } + } + return out, nil +} + // stubRevoker records cascade-revoke calls. type stubRevoker struct { called bool diff --git a/internal/auth/oidc/service_test.go b/internal/auth/oidc/service_test.go index f60e150..2c60dcf 100644 --- a/internal/auth/oidc/service_test.go +++ b/internal/auth/oidc/service_test.go @@ -392,6 +392,20 @@ func (s *stubUsers) ListAll(_ context.Context, _ string) ([]*userdomain.User, er return out, nil } +// ListDeactivatedBefore satisfies the Sprint 6 COMP-002-RETENTION +// interface addition. Stub-side: walk byID and filter on the +// DeactivatedAt cursor; OIDC service tests don't care about ordering +// stability. +func (s *stubUsers) ListDeactivatedBefore(_ context.Context, threshold time.Time) ([]*userdomain.User, error) { + var out []*userdomain.User + for _, u := range s.byID { + if u.DeactivatedAt != nil && u.DeactivatedAt.Before(threshold) { + out = append(out, u) + } + } + return out, nil +} + type stubSessions struct { cookieValue string csrfToken string diff --git a/internal/config/config.go b/internal/config/config.go index 97d4b29..d91176b 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -108,6 +108,10 @@ type Config struct { // cadence. Scheduler loop auditChainVerifyLoop reads VerifyInterval; // the metric-side counter is wired separately in cmd/server/main.go. AuditChain AuditChainConfig + // UserRetention holds the Sprint 6 COMP-002-RETENTION purge cadence + // + window. The scheduler's userRetentionLoop reads Interval; the + // UserRetentionService reads RetentionWindow + BatchCap. + UserRetention UserRetentionConfig } // AuditChainConfig configures the audit_events tamper-evidence @@ -126,6 +130,26 @@ type AuditChainConfig struct { VerifyInterval time.Duration } +// UserRetentionConfig configures the Sprint 6 COMP-002-RETENTION user +// PII purge sweeper. The scheduler's userRetentionLoop walks every +// user with deactivated_at older than RetentionWindow and scrubs the +// PII columns via UserRetentionService.DeleteUserPII. +type UserRetentionConfig struct { + // Interval is the tick cadence. Default 24h. + // Setting: CERTCTL_USER_RETENTION_INTERVAL. + Interval time.Duration + // RetentionWindow is how long after deactivated_at a row's PII + // stays in the table. Default 30 days. Operators with strict + // GDPR / CCPA expectations may shorten; operators who need + // forensic recovery latitude may lengthen. + // Setting: CERTCTL_USER_RETENTION_WINDOW. + RetentionWindow time.Duration + // BatchCap bounds how many users a single tick processes. Default + // 200 — keeps blast radius predictable. Set to 0 to disable the + // cap (test fixtures only). + // Setting: CERTCTL_USER_RETENTION_BATCH_CAP. + BatchCap int +} // OCSPResponderConfig configures the dedicated OCSP-responder cert // per issuer (RFC 6960 §2.6 + §4.2.2.2). When unset, the local issuer @@ -724,6 +748,11 @@ func Load() (*Config, error) { AuditChain: AuditChainConfig{ VerifyInterval: getEnvDuration("CERTCTL_AUDIT_CHAIN_VERIFY_INTERVAL", 6*time.Hour), }, + UserRetention: UserRetentionConfig{ + Interval: getEnvDuration("CERTCTL_USER_RETENTION_INTERVAL", 24*time.Hour), + RetentionWindow: getEnvDuration("CERTCTL_USER_RETENTION_WINDOW", 30*24*time.Hour), + BatchCap: getEnvInt("CERTCTL_USER_RETENTION_BATCH_CAP", 200), + }, } // Parse CERTCTL_API_KEYS_NAMED for named key authentication (M-002). diff --git a/internal/repository/postgres/user.go b/internal/repository/postgres/user.go index 34da837..e3e34a8 100644 --- a/internal/repository/postgres/user.go +++ b/internal/repository/postgres/user.go @@ -8,6 +8,7 @@ import ( "database/sql" "errors" "fmt" + "time" "github.com/lib/pq" @@ -177,3 +178,33 @@ func (r *UserRepository) ListAll(ctx context.Context, tenantID string) ([]*userd } return out, rows.Err() } + +// ListDeactivatedBefore returns every user (across all tenants) whose +// deactivated_at is not NULL AND strictly before threshold. Sprint 6 +// COMP-002-RETENTION — the userRetentionLoop in the scheduler walks +// this list per tick and calls UserRetentionService.DeleteUserPII on +// each. Cross-tenant on purpose: a single retention policy spans the +// whole control plane. +func (r *UserRepository) ListDeactivatedBefore(ctx context.Context, threshold time.Time) ([]*userdomain.User, error) { + rows, err := r.db.QueryContext(ctx, + `SELECT `+userColumns+` + FROM users + WHERE deactivated_at IS NOT NULL + AND deactivated_at < $1 + ORDER BY deactivated_at ASC`, + threshold) + if err != nil { + return nil, fmt.Errorf("users list_deactivated_before: %w", err) + } + defer rows.Close() + + var out []*userdomain.User + for rows.Next() { + u, err := scanUser(rows) + if err != nil { + return nil, fmt.Errorf("users scan: %w", err) + } + out = append(out, u) + } + return out, rows.Err() +} diff --git a/internal/repository/user.go b/internal/repository/user.go index 89c2646..4b0d263 100644 --- a/internal/repository/user.go +++ b/internal/repository/user.go @@ -6,6 +6,7 @@ package repository import ( "context" "errors" + "time" userdomain "github.com/certctl-io/certctl/internal/auth/user/domain" ) @@ -46,4 +47,12 @@ type UserRepository interface { // ListAll returns every user in the tenant. Order: // created_at ASC. Used by the GUI's admin surface. ListAll(ctx context.Context, tenantID string) ([]*userdomain.User, error) + + // ListDeactivatedBefore returns every user whose deactivated_at is + // not NULL AND strictly before the supplied threshold. Sprint 6 + // COMP-002-RETENTION closure — the scheduler's userRetentionLoop + // uses this to enumerate purge-eligible rows on each tick. Order: + // deactivated_at ASC (oldest first, so a tick-budget cap is + // deterministic about which rows it processes). + ListDeactivatedBefore(ctx context.Context, threshold time.Time) ([]*userdomain.User, error) } diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 3175ff1..644eb08 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -145,6 +145,16 @@ type AuditChainBreakRecorder interface { RecordSuccess(rowCount int) } +// UserRetentionPurger is the Sprint 6 COMP-002-RETENTION scheduler-side +// interface. Concrete impl is *service.UserRetentionService — it walks +// every user whose deactivated_at exceeds the retention window and +// scrubs PII columns (email / display_name / oidc_subject hash). The +// loop calls PurgeDeactivatedUsers on every CERTCTL_USER_RETENTION_INTERVAL +// tick. nil = loop is not wired (deployments that disable retention). +type UserRetentionPurger interface { + PurgeDeactivatedUsers(ctx context.Context) (purged, failed int, err error) +} + // JobReaperService defines the interface for job timeout reaping used by the scheduler. type JobReaperService interface { ReapTimedOutJobs(ctx context.Context, csrTTL, approvalTTL time.Duration) error @@ -175,6 +185,7 @@ type Scheduler struct { rateLimitGC RateLimitGarbageCollector auditChainVerifier AuditChainVerifier auditChainRecorder AuditChainBreakRecorder + userRetention UserRetentionPurger jobReaper JobReaperService logger *slog.Logger @@ -196,6 +207,7 @@ type Scheduler struct { sessionGCInterval time.Duration rateLimitGCInterval time.Duration auditChainVerifyInterval time.Duration + userRetentionInterval time.Duration // agentOfflineJobTTL: per-tick threshold for reaping Running jobs whose // owning agent has been silent. Bundle C / Audit M-016. Defaults below. agentOfflineJobTTL time.Duration @@ -220,6 +232,7 @@ type Scheduler struct { sessionGCRunning atomic.Bool rateLimitGCRunning atomic.Bool auditChainVerifyRunning atomic.Bool + userRetentionRunning atomic.Bool // Graceful shutdown: wait for in-flight work to complete wg sync.WaitGroup @@ -265,6 +278,11 @@ func NewScheduler( // not dominate a quiet fleet's DB load. Operators with huge // audit tables can lengthen via CERTCTL_AUDIT_CHAIN_VERIFY_INTERVAL. auditChainVerifyInterval: 6 * time.Hour, + // Sprint 6 COMP-002-RETENTION: user PII purge cadence. Default + // 24h — deactivated rows persist past the retention window + // (default 30d) only until the next tick, which is fine for + // GDPR-style "delete within reasonable time" expectations. + userRetentionInterval: 24 * time.Hour, // 5 minutes is 5×agentHealthCheckInterval default of 1m; an agent // must miss multiple heartbeats before its in-flight jobs are reaped. agentOfflineJobTTL: 5 * time.Minute, @@ -469,6 +487,25 @@ func (s *Scheduler) SetAuditChainVerifyInterval(d time.Duration) { s.auditChainVerifyInterval = d } +// SetUserRetentionPurger wires the Sprint 6 COMP-002-RETENTION +// user-PII-purge sweeper. Optional — nil disables the loop (deployments +// that don't have any federated humans yet, or those that want manual +// purge via the admin endpoint only). Concrete impl is +// *service.UserRetentionService. +func (s *Scheduler) SetUserRetentionPurger(p UserRetentionPurger) { + s.userRetention = p +} + +// SetUserRetentionInterval configures the userRetentionLoop tick +// cadence. Default 24h. Wire: CERTCTL_USER_RETENTION_INTERVAL. +// Zero or negative values are ignored. +func (s *Scheduler) SetUserRetentionInterval(d time.Duration) { + if d <= 0 { + return + } + s.userRetentionInterval = d +} + // SetAgentOfflineJobTTL sets the threshold past which a Running job whose // owning agent has gone silent is reaped to Failed. Bundle C / Audit M-016. // Zero or negative values are ignored (the default of 5 minutes is kept). @@ -536,6 +573,9 @@ func (s *Scheduler) Start(ctx context.Context) <-chan struct{} { if s.auditChainVerifier != nil { loopCount++ } + if s.userRetention != nil { + loopCount++ + } s.wg.Add(loopCount) go func() { defer s.wg.Done(); s.renewalCheckLoop(ctx) }() @@ -573,6 +613,9 @@ func (s *Scheduler) Start(ctx context.Context) <-chan struct{} { if s.auditChainVerifier != nil { go func() { defer s.wg.Done(); s.auditChainVerifyLoop(ctx) }() } + if s.userRetention != nil { + go func() { defer s.wg.Done(); s.userRetentionLoop(ctx) }() + } // Signal that all loops are launched close(startedChan) @@ -1454,6 +1497,50 @@ func (s *Scheduler) auditChainVerifyLoop(ctx context.Context) { } } +// userRetentionLoop is the Sprint 6 COMP-002-RETENTION sweeper. Every +// CERTCTL_USER_RETENTION_INTERVAL tick it asks +// UserRetentionService.PurgeDeactivatedUsers to walk every user whose +// deactivated_at is older than the retention window and scrub the PII +// columns. The service is responsible for the row-level work + audit +// emission; the loop only orchestrates cadence + concurrency control. +// +// Mirrors the GC-loop pattern: atomic.Bool guard prevents overlapping +// ticks; per-tick context.WithTimeout caps the worst case at 5 +// minutes. The retention service's purgeBatchCap (default 200) is the +// inner-loop budget — large backlogs spread across multiple ticks. +func (s *Scheduler) userRetentionLoop(ctx context.Context) { + ticker := NewJitteredTicker(s.userRetentionInterval, DefaultSchedulerJitter) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if !s.userRetentionRunning.CompareAndSwap(false, true) { + s.logger.Warn("user retention purge still running, skipping tick") + continue + } + s.wg.Add(1) + go func() { + defer s.wg.Done() + defer s.userRetentionRunning.Store(false) + opCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) + defer cancel() + purged, failed, err := s.userRetention.PurgeDeactivatedUsers(opCtx) + if err != nil { + s.logger.Warn("user retention purge failed (next tick will retry)", "error", err) + return + } + if purged > 0 || failed > 0 { + s.logger.Info("user retention purge tick", + "purged", purged, "failed", failed) + } + }() + } + } +} + // runAuditChainVerify executes a single chain-verify pass with the // atomic.Bool + WithTimeout + goroutine pattern every other GC loop // uses. Extracted so the loop body + the "run once on start" path diff --git a/internal/service/user_retention.go b/internal/service/user_retention.go new file mode 100644 index 0000000..91bac27 --- /dev/null +++ b/internal/service/user_retention.go @@ -0,0 +1,224 @@ +// Copyright 2026 certctl LLC. All rights reserved. +// SPDX-License-Identifier: BUSL-1.1 + +package service + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "log/slog" + "strings" + "time" + + userdomain "github.com/certctl-io/certctl/internal/auth/user/domain" + "github.com/certctl-io/certctl/internal/domain" + "github.com/certctl-io/certctl/internal/repository" +) + +// Sprint 6 COMP-002-RETENTION closure. The control plane stores three +// PII surfaces: +// +// users.email — IdP-supplied login email. +// users.display_name — IdP-supplied human label. +// users.oidc_subject — IdP's stable identifier for the human. +// +// Pre-fix there was no in-code primitive for GDPR right-to-be-forgotten +// or for automatic retention purges of deactivated accounts. The +// admin-side deactivate flow at internal/api/handler/auth_users.go +// set users.deactivated_at but the PII columns stayed populated +// forever. +// +// This file delivers the two pieces the audit's fix called for: +// +// Phase 1: DeleteUserPII(actorID) — operator-callable primitive that +// scrubs the row's PII while keeping the audit attribution +// chain intact (the row's id stays, so historical +// audit_events.actor = user.id rows still resolve). +// Phase 2: PurgeDeactivatedUsers(ctx) — walks every user whose +// deactivated_at is older than the retention window and +// calls DeleteUserPII on each. Scheduler loop calls this +// on a tick (default 24h); the retention window itself +// (default 30 days post-deactivate) is operator-tunable. +// +// Audit attribution invariant: DeleteUserPII replaces oidc_subject +// with sha256: rather than nullifying it. Three reasons: +// 1. Preserves the (oidc_provider_id, oidc_subject) UNIQUE +// constraint — two purged users on the same provider still have +// different oidc_subject values, so the constraint never trips. +// 2. The hash is a one-way fingerprint; the original IdP-side +// identifier is unrecoverable post-purge. Re-login under the +// same IdP subject mints a fresh u-id (different row) because +// GetByOIDCSubject won't match the hashed token. +// 3. Forensic continuity: if an operator later needs to prove "a +// user with subject X was deactivated then purged", they can +// recompute sha256(X) and look it up. + +// UserRetentionService exposes the DeleteUserPII + PurgeDeactivatedUsers +// primitives. The handler-side admin endpoint (when wired) calls +// DeleteUserPII directly; the scheduler's userRetentionLoop calls +// PurgeDeactivatedUsers. +type UserRetentionService struct { + users repository.UserRepository + sessions repository.SessionRepository + audit *AuditService + logger *slog.Logger + + // retentionWindow is how long after deactivated_at a user's PII + // stays in the table. The scheduler loop subtracts this from + // time.Now() when computing the "purge before" threshold. + retentionWindow time.Duration + // purgeBatchCap bounds how many users a single PurgeDeactivatedUsers + // call processes — keeps a single tick's blast radius predictable + // even if a large backlog accumulates. Zero = unbounded (test default). + purgeBatchCap int +} + +// NewUserRetentionService wires the deps. The audit service is +// optional (nil = skip audit emission); production wiring in +// cmd/server/main.go passes the singleton. +func NewUserRetentionService( + users repository.UserRepository, + sessions repository.SessionRepository, + audit *AuditService, + logger *slog.Logger, + retentionWindow time.Duration, + purgeBatchCap int, +) *UserRetentionService { + if retentionWindow <= 0 { + retentionWindow = 30 * 24 * time.Hour + } + return &UserRetentionService{ + users: users, + sessions: sessions, + audit: audit, + logger: logger, + retentionWindow: retentionWindow, + purgeBatchCap: purgeBatchCap, + } +} + +// DeleteUserPII scrubs the named user's PII columns. Phase 1 fix per +// the COMP-002-RETENTION audit. Steps: +// +// 1. Load the user row. Returns repository.ErrUserNotFound if missing. +// 2. Revoke all active sessions for the actor (defense-in-depth — the +// handler-side Deactivate path already does this, but a purge after +// N days might catch sessions that were created post-deactivate via +// some other path). +// 3. Zero the PII columns: +// email = "" +// display_name = "" +// oidc_subject = "sha256:" || hex(sha256(original)) +// 4. Persist the row via UserRepository.Update. +// 5. Emit an audit event (auth category, action user.purge_pii) so the +// scrub itself is on record. +// +// Returns nil on success. Idempotent: re-calling on an already-purged +// row hashes the already-hashed oidc_subject, which is a no-op semantic +// (the operator can tell purges happened by the "sha256:" prefix). +func (s *UserRetentionService) DeleteUserPII(ctx context.Context, userID string) error { + u, err := s.users.Get(ctx, userID) + if err != nil { + return fmt.Errorf("user_retention: load %s: %w", userID, err) + } + + // Defense-in-depth: revoke all sessions before the row mutates. + if err := s.sessions.RevokeAllForActor(ctx, u.ID, string(domain.ActorTypeUser), u.TenantID); err != nil { + // Log + continue; PII scrub is the load-bearing step. A + // dangling-session row whose actor's PII is already gone is + // less harmful than leaving the PII intact because the + // session revoke failed. + s.logger.Warn("user_retention: session revoke failed during PII scrub (continuing)", + "user_id", userID, "error", err) + } + + // Hash the oidc_subject IF it isn't already a "sha256:..." token. + // Idempotent re-scrubs are safe; a second pass produces the same + // hash of the hash, but the prefix lets operators tell the row was + // already scrubbed. + if !strings.HasPrefix(u.OIDCSubject, "sha256:") { + sum := sha256.Sum256([]byte(u.OIDCSubject)) + u.OIDCSubject = "sha256:" + hex.EncodeToString(sum[:]) + } + + u.Email = "purged@redacted.local" // domain.User.Validate requires plausible email format + u.DisplayName = "[purged]" // domain.User.Validate forbids leading/trailing whitespace + u.WebAuthnCredentials = []byte(`[]`) // v3-reserved field — keep empty JSONB. + + if err := s.users.Update(ctx, u); err != nil { + return fmt.Errorf("user_retention: update %s: %w", userID, err) + } + + if s.audit != nil { + _ = s.audit.RecordEventWithCategory(ctx, + "system", domain.ActorTypeSystem, + "user.purge_pii", + domain.EventCategoryAuth, + "user", userID, + map[string]interface{}{ + "retained_id": userID, + "hashed_oidc_subject_set": true, + }, + ) + } + + s.logger.Info("user_retention: PII scrubbed", "user_id", userID) + return nil +} + +// PurgeDeactivatedUsers enumerates every user whose deactivated_at is +// older than now - retentionWindow and calls DeleteUserPII on each. +// Returns (purged, failed) counts; logs individual failures at WARN +// and continues so a single bad row doesn't stall the rest of the +// batch. Bounded by purgeBatchCap when non-zero. +func (s *UserRetentionService) PurgeDeactivatedUsers(ctx context.Context) (int, int, error) { + threshold := time.Now().Add(-s.retentionWindow) + rows, err := s.users.ListDeactivatedBefore(ctx, threshold) + if err != nil { + return 0, 0, fmt.Errorf("user_retention: list deactivated: %w", err) + } + + var purged, failed int + for i, u := range rows { + if s.purgeBatchCap > 0 && i >= s.purgeBatchCap { + s.logger.Info("user_retention: batch cap reached; remaining rows deferred to next tick", + "cap", s.purgeBatchCap, "remaining", len(rows)-i) + break + } + // Skip rows that are already scrubbed — DeleteUserPII is + // idempotent but skipping saves a transaction and an audit + // row per tick. + if strings.HasPrefix(u.OIDCSubject, "sha256:") && + strings.HasPrefix(u.Email, "purged@") { + continue + } + if err := s.DeleteUserPII(ctx, u.ID); err != nil { + s.logger.Warn("user_retention: purge failed (next tick will retry)", + "user_id", u.ID, "error", err) + failed++ + continue + } + purged++ + } + if purged > 0 || failed > 0 { + s.logger.Info("user_retention: purge sweep complete", + "purged", purged, + "failed", failed, + "threshold", threshold.Format(time.RFC3339)) + } + return purged, failed, nil +} + +// RetentionWindow exposes the configured window for tests + the +// operator-facing "when will my account be scrubbed" GUI surface. +func (s *UserRetentionService) RetentionWindow() time.Duration { + return s.retentionWindow +} + +// userdomain is imported so the compiler recognises the type used by +// the repository contracts even though this file only consumes pointer +// values via the interface. Keep this blank ref so re-organising the +// file later doesn't accidentally drop the import. +var _ = (*userdomain.User)(nil) diff --git a/internal/service/user_retention_test.go b/internal/service/user_retention_test.go new file mode 100644 index 0000000..295711c --- /dev/null +++ b/internal/service/user_retention_test.go @@ -0,0 +1,302 @@ +// Copyright 2026 certctl LLC. All rights reserved. +// SPDX-License-Identifier: BUSL-1.1 + +package service + +import ( + "context" + "errors" + "io" + "log/slog" + "strings" + "sync" + "testing" + "time" + + sessiondomain "github.com/certctl-io/certctl/internal/auth/session/domain" + userdomain "github.com/certctl-io/certctl/internal/auth/user/domain" + "github.com/certctl-io/certctl/internal/domain" + "github.com/certctl-io/certctl/internal/repository" +) + +// Sprint 6 COMP-002-RETENTION unit tests. The repo + session deps are +// covered by in-memory stubs in this file; the integration shape +// (deactivated_at SQL filter, session revocation in PG) is covered by +// the existing postgres tests for those repositories. + +type retentionStubUserRepo struct { + mu sync.Mutex + byID map[string]*userdomain.User + updateOK bool + updateCh chan struct{} +} + +func newRetentionStubUserRepo() *retentionStubUserRepo { + return &retentionStubUserRepo{ + byID: make(map[string]*userdomain.User), + updateOK: true, + updateCh: make(chan struct{}, 100), + } +} + +func (r *retentionStubUserRepo) seed(u *userdomain.User) { + r.mu.Lock() + defer r.mu.Unlock() + r.byID[u.ID] = u +} + +func (r *retentionStubUserRepo) Get(_ context.Context, id string) (*userdomain.User, error) { + r.mu.Lock() + defer r.mu.Unlock() + u, ok := r.byID[id] + if !ok { + return nil, repository.ErrUserNotFound + } + cp := *u + return &cp, nil +} + +func (r *retentionStubUserRepo) GetByOIDCSubject(_ context.Context, _, _ string) (*userdomain.User, error) { + return nil, repository.ErrUserNotFound +} + +func (r *retentionStubUserRepo) Create(_ context.Context, _ *userdomain.User) error { return nil } + +func (r *retentionStubUserRepo) Update(_ context.Context, u *userdomain.User) error { + r.mu.Lock() + defer r.mu.Unlock() + if !r.updateOK { + return errors.New("retentionStubUserRepo: update disabled") + } + cp := *u + r.byID[u.ID] = &cp + r.updateCh <- struct{}{} + return nil +} + +func (r *retentionStubUserRepo) ListAll(_ context.Context, _ string) ([]*userdomain.User, error) { + return nil, nil +} + +func (r *retentionStubUserRepo) ListDeactivatedBefore(_ context.Context, threshold time.Time) ([]*userdomain.User, error) { + r.mu.Lock() + defer r.mu.Unlock() + var out []*userdomain.User + for _, u := range r.byID { + if u.DeactivatedAt != nil && u.DeactivatedAt.Before(threshold) { + cp := *u + out = append(out, &cp) + } + } + return out, nil +} + +type retentionStubSessionRepo struct { + mu sync.Mutex + revokedActors []string + revokeError error +} + +// retentionStubSessionRepo satisfies the full repository.SessionRepository +// surface. user_retention.go only calls RevokeAllForActor; the other +// methods are no-op stubs that exist so the fixture compiles against +// the interface. + +func (s *retentionStubSessionRepo) Create(_ context.Context, _ *sessiondomain.Session) error { + return nil +} +func (s *retentionStubSessionRepo) Get(_ context.Context, _ string) (*sessiondomain.Session, error) { + return nil, repository.ErrSessionNotFound +} +func (s *retentionStubSessionRepo) ListByActor(_ context.Context, _, _, _ string) ([]*sessiondomain.Session, error) { + return nil, nil +} +func (s *retentionStubSessionRepo) UpdateLastSeen(_ context.Context, _ string) error { + return nil +} +func (s *retentionStubSessionRepo) UpdateCSRFTokenHash(_ context.Context, _, _ string) error { + return nil +} +func (s *retentionStubSessionRepo) Revoke(_ context.Context, _ string) error { return nil } +func (s *retentionStubSessionRepo) RevokeAllForActor(_ context.Context, actorID, _, _ string) error { + s.mu.Lock() + defer s.mu.Unlock() + if s.revokeError != nil { + return s.revokeError + } + s.revokedActors = append(s.revokedActors, actorID) + return nil +} +func (s *retentionStubSessionRepo) RevokeAllExceptForActor(_ context.Context, _, _, _, _ string) (int, error) { + return 0, nil +} +func (s *retentionStubSessionRepo) GarbageCollectExpired(_ context.Context) (int, error) { + return 0, nil +} +func (s *retentionStubSessionRepo) Delete(_ context.Context, _ string) error { + return nil +} + +func newTestUser(id, email, displayName, subject string, deactivated *time.Time) *userdomain.User { + return &userdomain.User{ + ID: id, + TenantID: "t-default", + Email: email, + DisplayName: displayName, + OIDCSubject: subject, + OIDCProviderID: "op-test", + LastLoginAt: time.Now(), + WebAuthnCredentials: []byte(`[]`), + CreatedAt: time.Now().Add(-90 * 24 * time.Hour), + UpdatedAt: time.Now(), + DeactivatedAt: deactivated, + } +} + +func discardLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelError + 10})) +} + +// TestDeleteUserPII_ScrubsAndRevokes: the operator-facing primitive +// nullifies email + display_name, hashes oidc_subject, and revokes +// sessions on the affected actor. +func TestDeleteUserPII_ScrubsAndRevokes(t *testing.T) { + t.Parallel() + + users := newRetentionStubUserRepo() + sessions := &retentionStubSessionRepo{} + now := time.Now() + deact := now.Add(-45 * 24 * time.Hour) + users.seed(newTestUser("u-1", "alice@example.com", "Alice", "sub-alice", &deact)) + + svc := NewUserRetentionService(users, sessions, nil, discardLogger(), 30*24*time.Hour, 0) + if err := svc.DeleteUserPII(context.Background(), "u-1"); err != nil { + t.Fatalf("DeleteUserPII: %v", err) + } + + got, err := users.Get(context.Background(), "u-1") + if err != nil { + t.Fatalf("post-purge Get: %v", err) + } + if got.Email != "purged@redacted.local" { + t.Errorf("email not scrubbed: %q", got.Email) + } + if got.DisplayName != "[purged]" { + t.Errorf("display_name not scrubbed: %q", got.DisplayName) + } + if !strings.HasPrefix(got.OIDCSubject, "sha256:") { + t.Errorf("oidc_subject not hashed: %q", got.OIDCSubject) + } + if got.OIDCSubject == "sha256:" { + t.Errorf("oidc_subject hash is empty") + } + + sessions.mu.Lock() + if len(sessions.revokedActors) != 1 || sessions.revokedActors[0] != "u-1" { + t.Errorf("RevokeAllForActor not called for u-1; got %v", sessions.revokedActors) + } + sessions.mu.Unlock() +} + +// TestDeleteUserPII_IsIdempotent: scrubbing an already-scrubbed row +// is safe — the oidc_subject hash recurses (sha256 of sha256:hex) +// without breaking the UNIQUE constraint or returning an error. We +// verify the scrubbed values + the prefix. +func TestDeleteUserPII_IsIdempotent(t *testing.T) { + t.Parallel() + + users := newRetentionStubUserRepo() + sessions := &retentionStubSessionRepo{} + now := time.Now() + deact := now.Add(-100 * 24 * time.Hour) + users.seed(newTestUser("u-2", "bob@example.com", "Bob", "sub-bob", &deact)) + + svc := NewUserRetentionService(users, sessions, nil, discardLogger(), 30*24*time.Hour, 0) + if err := svc.DeleteUserPII(context.Background(), "u-2"); err != nil { + t.Fatalf("first DeleteUserPII: %v", err) + } + first, _ := users.Get(context.Background(), "u-2") + if err := svc.DeleteUserPII(context.Background(), "u-2"); err != nil { + t.Fatalf("second DeleteUserPII: %v", err) + } + second, _ := users.Get(context.Background(), "u-2") + + // oidc_subject doesn't get re-hashed (prefix guard). + if first.OIDCSubject != second.OIDCSubject { + t.Errorf("idempotent re-scrub re-hashed oidc_subject: %q -> %q", + first.OIDCSubject, second.OIDCSubject) + } + if !strings.HasPrefix(second.OIDCSubject, "sha256:") { + t.Errorf("scrubbed oidc_subject lost prefix: %q", second.OIDCSubject) + } +} + +// TestPurgeDeactivatedUsers_RespectsWindow: only users whose +// deactivated_at is older than now-retentionWindow get scrubbed; rows +// deactivated within the window remain intact. +func TestPurgeDeactivatedUsers_RespectsWindow(t *testing.T) { + t.Parallel() + + users := newRetentionStubUserRepo() + sessions := &retentionStubSessionRepo{} + now := time.Now() + stale := now.Add(-45 * 24 * time.Hour) // past 30d window + recent := now.Add(-7 * 24 * time.Hour) // inside 30d window + users.seed(newTestUser("u-stale", "stale@example.com", "Stale", "sub-stale", &stale)) + users.seed(newTestUser("u-recent", "recent@example.com", "Recent", "sub-recent", &recent)) + users.seed(newTestUser("u-active", "active@example.com", "Active", "sub-active", nil)) + + svc := NewUserRetentionService(users, sessions, nil, discardLogger(), 30*24*time.Hour, 0) + purged, failed, err := svc.PurgeDeactivatedUsers(context.Background()) + if err != nil { + t.Fatalf("PurgeDeactivatedUsers: %v", err) + } + if purged != 1 { + t.Errorf("expected 1 row purged, got %d", purged) + } + if failed != 0 { + t.Errorf("expected 0 failures, got %d", failed) + } + + staleU, _ := users.Get(context.Background(), "u-stale") + if staleU.Email != "purged@redacted.local" { + t.Errorf("stale row not scrubbed: %q", staleU.Email) + } + recentU, _ := users.Get(context.Background(), "u-recent") + if recentU.Email != "recent@example.com" { + t.Errorf("recent row should not have been scrubbed: %q", recentU.Email) + } + activeU, _ := users.Get(context.Background(), "u-active") + if activeU.Email != "active@example.com" { + t.Errorf("active row should not have been scrubbed: %q", activeU.Email) + } +} + +// TestPurgeDeactivatedUsers_BatchCap caps the per-tick blast radius. +func TestPurgeDeactivatedUsers_BatchCap(t *testing.T) { + t.Parallel() + + users := newRetentionStubUserRepo() + sessions := &retentionStubSessionRepo{} + stale := time.Now().Add(-100 * 24 * time.Hour) + for i := 0; i < 5; i++ { + id := "u-cap-" + string(rune('0'+i)) + users.seed(newTestUser(id, id+"@example.com", id, "sub-"+id, &stale)) + } + + svc := NewUserRetentionService(users, sessions, nil, discardLogger(), 30*24*time.Hour, 2) + purged, failed, err := svc.PurgeDeactivatedUsers(context.Background()) + if err != nil { + t.Fatalf("PurgeDeactivatedUsers: %v", err) + } + if purged != 2 { + t.Errorf("expected exactly 2 rows purged (batch cap = 2), got %d", purged) + } + if failed != 0 { + t.Errorf("expected 0 failures, got %d", failed) + } +} + +// _ guard for unused-import lint when one of the helpers above is +// removed during future refactors. +var _ = domain.ActorTypeUser