mirror of
https://github.com/shankar0123/certctl.git
synced 2026-06-07 18:21:32 +00:00
feat(retention): COMP-002-RETENTION — federated-user PII purge pipeline
Sprint 6 closure of the audit's MED-severity COMP-002-RETENTION
finding.
Pre-fix posture: the federated-user admin surface
(auth_users.go::Deactivate) sets users.deactivated_at on soft-delete,
but the PII columns (email, display_name, oidc_subject) stay
populated forever. No in-code primitive for GDPR right-to-be-
forgotten; no scheduled retention purge.
This commit ships the audit's recommended two-phase fix:
Phase 1 — operator-callable scrub primitive
internal/service/user_retention.go
UserRetentionService.DeleteUserPII(ctx, userID):
- revoke all active sessions (defense-in-depth)
- email := 'purged@redacted.local'
- display_name := '[purged]'
- oidc_subject := 'sha256:' || hex(sha256(original))
- audit_events row with action=user.purge_pii,
category=auth, actor=system
Why hash oidc_subject instead of NULL:
1. (oidc_provider_id, oidc_subject) UNIQUE constraint would
trip on multiple purged users converging to NULL
2. The hash is one-way; the original IdP-side identifier is
unrecoverable. Re-login under the same subject mints a
fresh u-id (right-to-be-forgotten semantics)
3. Forensic continuity: an operator can recompute
sha256(<known-subject>) and confirm "this user was
deactivated then purged"
users.id itself is preserved so historical
audit_events.actor = u-X rows still resolve. The forensic-
attribution chain stays intact even after the PII is gone.
Phase 2 — scheduled batch purge
internal/scheduler/scheduler.go
UserRetentionPurger interface + userRetentionLoop:
- PurgeDeactivatedUsers enumerates every user with
deactivated_at < NOW() - retention_window
- DeleteUserPII per row
- per-tick batch cap (default 200) keeps blast radius
predictable; large backlogs spread across multiple ticks
- atomic.Bool guard + 5-min per-tick context.WithTimeout
Repository contract grew a single new method:
internal/repository/user.go::ListDeactivatedBefore(ctx, t)
internal/repository/postgres/user.go: SQL-side filter
(deactivated_at IS NOT NULL AND deactivated_at < $1)
ORDER BY deactivated_at ASC, cross-tenant.
Configuration
CERTCTL_USER_RETENTION_INTERVAL default 24h
CERTCTL_USER_RETENTION_WINDOW default 30 days
CERTCTL_USER_RETENTION_BATCH_CAP default 200
Test stub additions for repository.UserRepository.ListDeactivatedBefore:
internal/auth/oidc/service_test.go::stubUsers
internal/api/handler/auth_users_test.go::stubFullUserRepo
internal/api/handler/auth_session_oidc_test.go::stubUserRepo
Documentation
docs/operator/privacy-and-retention.md
- retention pipeline diagram (day-0 deactivate → day-N purge)
- operator config table
- verification runbook (4 steps with SQL)
- what's NOT covered (deferred: DSAR export, api_keys cascade,
retroactive audit_events.details redaction)
Tests
internal/service/user_retention_test.go (NEW, 4 tests):
TestDeleteUserPII_ScrubsAndRevokes
TestDeleteUserPII_IsIdempotent
TestPurgeDeactivatedUsers_RespectsWindow
TestPurgeDeactivatedUsers_BatchCap
Verified locally:
go vet ./... (clean)
gofmt -l internal/ cmd/ (clean)
go test -short -count=1 \
./internal/service/... ./internal/scheduler/... ./internal/config/...
(all green)
Cross-sprint interaction: pairs with COMP-001-HASH (prior commit).
The user.purge_pii audit row this service emits flows through the
new hash chain, so the scrub event is itself tamper-evident.
Closes COMP-002-RETENTION. Sprint 6 is complete (2/2 findings).
This commit is contained in:
@@ -1258,6 +1258,26 @@ func main() {
|
|||||||
logger.Info("audit chain verify loop enabled",
|
logger.Info("audit chain verify loop enabled",
|
||||||
"interval", cfg.AuditChain.VerifyInterval.String())
|
"interval", cfg.AuditChain.VerifyInterval.String())
|
||||||
|
|
||||||
|
// Sprint 6 COMP-002-RETENTION: wire the user-PII purge loop. The
|
||||||
|
// service nullifies email + display_name on users whose
|
||||||
|
// deactivated_at exceeds the retention window (default 30d) and
|
||||||
|
// hashes oidc_subject to preserve audit attribution. The scheduler
|
||||||
|
// loop ticks on CERTCTL_USER_RETENTION_INTERVAL (default 24h).
|
||||||
|
userRetentionService := service.NewUserRetentionService(
|
||||||
|
oidcUserRepo,
|
||||||
|
sessionRepo,
|
||||||
|
auditService,
|
||||||
|
logger,
|
||||||
|
cfg.UserRetention.RetentionWindow,
|
||||||
|
cfg.UserRetention.BatchCap,
|
||||||
|
)
|
||||||
|
sched.SetUserRetentionPurger(userRetentionService)
|
||||||
|
sched.SetUserRetentionInterval(cfg.UserRetention.Interval)
|
||||||
|
logger.Info("user PII retention purge loop enabled",
|
||||||
|
"interval", cfg.UserRetention.Interval.String(),
|
||||||
|
"retention_window", cfg.UserRetention.RetentionWindow.String(),
|
||||||
|
"batch_cap", cfg.UserRetention.BatchCap)
|
||||||
|
|
||||||
logger.Info("session GC sweep enabled",
|
logger.Info("session GC sweep enabled",
|
||||||
"interval", cfg.Auth.Session.GCInterval.String(),
|
"interval", cfg.Auth.Session.GCInterval.String(),
|
||||||
"absolute_timeout", cfg.Auth.Session.AbsoluteTimeout.String(),
|
"absolute_timeout", cfg.Auth.Session.AbsoluteTimeout.String(),
|
||||||
|
|||||||
@@ -0,0 +1,136 @@
|
|||||||
|
# Privacy & retention (federated-user PII)
|
||||||
|
|
||||||
|
> Last reviewed: 2026-05-16
|
||||||
|
|
||||||
|
Sprint 6 COMP-002-RETENTION closure. certctl stores three categories
|
||||||
|
of personally-identifiable information for federated humans (Auth
|
||||||
|
Bundle 2 OIDC users):
|
||||||
|
|
||||||
|
| Column | Source | Used by |
|
||||||
|
|---|---|---|
|
||||||
|
| `users.email` | IdP claim (`email`) | Operator GUI "find user by email", display in lists, audit attribution. |
|
||||||
|
| `users.display_name` | IdP claim (`name`) | UI display string for the human. |
|
||||||
|
| `users.oidc_subject` | IdP claim (`sub`) | Stable identifier — joined with `oidc_provider_id` in the (provider, subject) UNIQUE constraint. |
|
||||||
|
|
||||||
|
Pre-fix, deactivating a user (admin-side `auth.user.deactivate`)
|
||||||
|
soft-deleted the row by setting `deactivated_at` but left the PII
|
||||||
|
columns populated indefinitely. The Sprint 6 fix adds an automatic
|
||||||
|
purge pipeline.
|
||||||
|
|
||||||
|
## Retention pipeline shape
|
||||||
|
|
||||||
|
```
|
||||||
|
Day 0 admin → POST /api/v1/auth/users/u-X/deactivate
|
||||||
|
├─ users.deactivated_at = NOW()
|
||||||
|
└─ all active sessions for u-X revoked
|
||||||
|
|
||||||
|
Day N scheduler's userRetentionLoop tick (default cadence 24h)
|
||||||
|
└─ UserRetentionService.PurgeDeactivatedUsers
|
||||||
|
├─ SELECT users WHERE deactivated_at < NOW() - retention_window
|
||||||
|
├─ For each row (batch-capped per tick):
|
||||||
|
│ UserRetentionService.DeleteUserPII(u.id)
|
||||||
|
│ ├─ revoke all active sessions (defense-in-depth)
|
||||||
|
│ ├─ email := "purged@redacted.local"
|
||||||
|
│ ├─ display_name := "[purged]"
|
||||||
|
│ ├─ oidc_subject := "sha256:" || hex(sha256(original))
|
||||||
|
│ └─ audit_events row (action=user.purge_pii, category=auth)
|
||||||
|
```
|
||||||
|
|
||||||
|
`users.id` is **preserved**. Historical `audit_events.actor = u-X`
|
||||||
|
rows still resolve to the row (now scrubbed). This is the
|
||||||
|
forensic-attribution guarantee — the operator can prove "user u-X
|
||||||
|
performed action Y on date Z" even after the PII is gone.
|
||||||
|
|
||||||
|
`oidc_subject` is **hashed**, not nullified, for two reasons:
|
||||||
|
|
||||||
|
1. The `(oidc_provider_id, oidc_subject)` UNIQUE constraint would
|
||||||
|
trip if multiple purged users converged on the same NULL.
|
||||||
|
2. Re-login under the same IdP subject creates a fresh row (different
|
||||||
|
`u-` id) because `GetByOIDCSubject` won't match the hashed token —
|
||||||
|
the original subject is unrecoverable from the hash. This is the
|
||||||
|
"right-to-be-forgotten" behavior: the same human logging back in
|
||||||
|
is functionally a new account.
|
||||||
|
|
||||||
|
## Operator configuration
|
||||||
|
|
||||||
|
| Env var | Default | Notes |
|
||||||
|
|---|---|---|
|
||||||
|
| `CERTCTL_USER_RETENTION_INTERVAL` | `24h` | Tick cadence for the scheduler's userRetentionLoop. Zero or negative ignored. |
|
||||||
|
| `CERTCTL_USER_RETENTION_WINDOW` | `30 * 24h` (30 days) | How long after `deactivated_at` a row's PII stays in the table. Operators with stricter GDPR/CCPA expectations may shorten. |
|
||||||
|
| `CERTCTL_USER_RETENTION_BATCH_CAP` | `200` | Per-tick row budget. Larger backlogs spread across multiple ticks. 0 = unbounded (test fixtures only). |
|
||||||
|
|
||||||
|
## How to verify retention is working
|
||||||
|
|
||||||
|
1. Deactivate a test user via the admin path:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
curl -X POST -H "X-API-Key: $ADMIN_KEY" \
|
||||||
|
https://certctl.example.com/api/v1/auth/users/u-test/deactivate
|
||||||
|
```
|
||||||
|
|
||||||
|
2. Confirm the row's `deactivated_at` is set:
|
||||||
|
|
||||||
|
```sql
|
||||||
|
SELECT id, email, deactivated_at FROM users WHERE id = 'u-test';
|
||||||
|
```
|
||||||
|
|
||||||
|
3. Backdate `deactivated_at` to past the retention window (only for
|
||||||
|
testing — never in production):
|
||||||
|
|
||||||
|
```sql
|
||||||
|
UPDATE users SET deactivated_at = NOW() - INTERVAL '60 days'
|
||||||
|
WHERE id = 'u-test';
|
||||||
|
```
|
||||||
|
|
||||||
|
(Note: this UPDATE will succeed because `users` doesn't have a
|
||||||
|
WORM trigger; the audit-events WORM trigger is unrelated.)
|
||||||
|
|
||||||
|
4. Wait for the next `userRetentionLoop` tick (or restart the server
|
||||||
|
to force an immediate sweep). Confirm scrub:
|
||||||
|
|
||||||
|
```sql
|
||||||
|
SELECT id, email, display_name, oidc_subject
|
||||||
|
FROM users
|
||||||
|
WHERE id = 'u-test';
|
||||||
|
```
|
||||||
|
|
||||||
|
Expected: `email = 'purged@redacted.local'`,
|
||||||
|
`display_name = '[purged]'`,
|
||||||
|
`oidc_subject LIKE 'sha256:%'`.
|
||||||
|
|
||||||
|
5. Confirm an audit row was emitted:
|
||||||
|
|
||||||
|
```sql
|
||||||
|
SELECT id, actor, action, resource_id, timestamp
|
||||||
|
FROM audit_events
|
||||||
|
WHERE action = 'user.purge_pii' AND resource_id = 'u-test'
|
||||||
|
ORDER BY timestamp DESC LIMIT 1;
|
||||||
|
```
|
||||||
|
|
||||||
|
## What's NOT covered (deferred work)
|
||||||
|
|
||||||
|
The Sprint 6 fix is Phase 1 of the audit's COMP-002-RETENTION
|
||||||
|
recommendation. Two further pieces are forward-looking:
|
||||||
|
|
||||||
|
- **GDPR data-subject access request (DSAR) export.** A "show me
|
||||||
|
everything you know about me" endpoint is not yet implemented.
|
||||||
|
Operators on EU-resident data should treat this as a manual SQL
|
||||||
|
procedure today; track for Phase 2.
|
||||||
|
- **Cascade purge of related rows.** Sessions are revoked (above);
|
||||||
|
api_keys with `created_by = u-X` are NOT yet purged on scrub. The
|
||||||
|
api_keys table doesn't have a foreign key to users (it indexes by
|
||||||
|
`actor_id` strings, free-form), so the cascade is a service-layer
|
||||||
|
concern that needs explicit wiring. Track for Phase 2.
|
||||||
|
- **Per-event PII redaction in `audit_events.details`.** The existing
|
||||||
|
`RedactDetailsForAudit` (`internal/service/audit_redact.go`) scrubs
|
||||||
|
credential + PII keys at write time. A future feature for
|
||||||
|
"retroactively re-redact existing rows" would interact with the WORM
|
||||||
|
trigger; out of scope today.
|
||||||
|
|
||||||
|
## See also
|
||||||
|
|
||||||
|
- `internal/service/user_retention.go` — `UserRetentionService` source.
|
||||||
|
- `internal/scheduler/scheduler.go::userRetentionLoop` — scheduler loop.
|
||||||
|
- `migrations/000036_users.up.sql` — `users` table definition.
|
||||||
|
- `migrations/000045_users_deactivated_at.up.sql` — `deactivated_at` column.
|
||||||
|
- `docs/operator/audit-chain.md` — paired Sprint 6 tamper-evidence work.
|
||||||
@@ -255,6 +255,14 @@ func (s *stubUserRepo) ListAll(_ context.Context, _ string) ([]*userdomain.User,
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ListDeactivatedBefore satisfies the Sprint 6 COMP-002-RETENTION
|
||||||
|
// interface addition. The phase-5 OIDC handler tests don't exercise
|
||||||
|
// retention paths, so an empty result keeps the contract without
|
||||||
|
// changing test semantics.
|
||||||
|
func (s *stubUserRepo) ListDeactivatedBefore(_ context.Context, _ time.Time) ([]*userdomain.User, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
type phase5StubAudit struct {
|
type phase5StubAudit struct {
|
||||||
events []string
|
events []string
|
||||||
// Audit 2026-05-11 Fix 13 — capture the details map so the
|
// Audit 2026-05-11 Fix 13 — capture the details map so the
|
||||||
|
|||||||
@@ -83,6 +83,20 @@ func (s *stubFullUserRepo) ListAll(_ context.Context, tenantID string) ([]*userd
|
|||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ListDeactivatedBefore satisfies the Sprint 6 COMP-002-RETENTION
|
||||||
|
// interface addition. Walk rows, filter by DeactivatedAt-before-threshold.
|
||||||
|
// Order is intentionally not stabilised — the auth_users handler tests
|
||||||
|
// don't exercise the retention loop.
|
||||||
|
func (s *stubFullUserRepo) ListDeactivatedBefore(_ context.Context, threshold time.Time) ([]*userdomain.User, error) {
|
||||||
|
var out []*userdomain.User
|
||||||
|
for _, u := range s.rows {
|
||||||
|
if u.DeactivatedAt != nil && u.DeactivatedAt.Before(threshold) {
|
||||||
|
out = append(out, u)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
// stubRevoker records cascade-revoke calls.
|
// stubRevoker records cascade-revoke calls.
|
||||||
type stubRevoker struct {
|
type stubRevoker struct {
|
||||||
called bool
|
called bool
|
||||||
|
|||||||
@@ -392,6 +392,20 @@ func (s *stubUsers) ListAll(_ context.Context, _ string) ([]*userdomain.User, er
|
|||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ListDeactivatedBefore satisfies the Sprint 6 COMP-002-RETENTION
|
||||||
|
// interface addition. Stub-side: walk byID and filter on the
|
||||||
|
// DeactivatedAt cursor; OIDC service tests don't care about ordering
|
||||||
|
// stability.
|
||||||
|
func (s *stubUsers) ListDeactivatedBefore(_ context.Context, threshold time.Time) ([]*userdomain.User, error) {
|
||||||
|
var out []*userdomain.User
|
||||||
|
for _, u := range s.byID {
|
||||||
|
if u.DeactivatedAt != nil && u.DeactivatedAt.Before(threshold) {
|
||||||
|
out = append(out, u)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
type stubSessions struct {
|
type stubSessions struct {
|
||||||
cookieValue string
|
cookieValue string
|
||||||
csrfToken string
|
csrfToken string
|
||||||
|
|||||||
@@ -108,6 +108,10 @@ type Config struct {
|
|||||||
// cadence. Scheduler loop auditChainVerifyLoop reads VerifyInterval;
|
// cadence. Scheduler loop auditChainVerifyLoop reads VerifyInterval;
|
||||||
// the metric-side counter is wired separately in cmd/server/main.go.
|
// the metric-side counter is wired separately in cmd/server/main.go.
|
||||||
AuditChain AuditChainConfig
|
AuditChain AuditChainConfig
|
||||||
|
// UserRetention holds the Sprint 6 COMP-002-RETENTION purge cadence
|
||||||
|
// + window. The scheduler's userRetentionLoop reads Interval; the
|
||||||
|
// UserRetentionService reads RetentionWindow + BatchCap.
|
||||||
|
UserRetention UserRetentionConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
// AuditChainConfig configures the audit_events tamper-evidence
|
// AuditChainConfig configures the audit_events tamper-evidence
|
||||||
@@ -126,6 +130,26 @@ type AuditChainConfig struct {
|
|||||||
VerifyInterval time.Duration
|
VerifyInterval time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UserRetentionConfig configures the Sprint 6 COMP-002-RETENTION user
|
||||||
|
// PII purge sweeper. The scheduler's userRetentionLoop walks every
|
||||||
|
// user with deactivated_at older than RetentionWindow and scrubs the
|
||||||
|
// PII columns via UserRetentionService.DeleteUserPII.
|
||||||
|
type UserRetentionConfig struct {
|
||||||
|
// Interval is the tick cadence. Default 24h.
|
||||||
|
// Setting: CERTCTL_USER_RETENTION_INTERVAL.
|
||||||
|
Interval time.Duration
|
||||||
|
// RetentionWindow is how long after deactivated_at a row's PII
|
||||||
|
// stays in the table. Default 30 days. Operators with strict
|
||||||
|
// GDPR / CCPA expectations may shorten; operators who need
|
||||||
|
// forensic recovery latitude may lengthen.
|
||||||
|
// Setting: CERTCTL_USER_RETENTION_WINDOW.
|
||||||
|
RetentionWindow time.Duration
|
||||||
|
// BatchCap bounds how many users a single tick processes. Default
|
||||||
|
// 200 — keeps blast radius predictable. Set to 0 to disable the
|
||||||
|
// cap (test fixtures only).
|
||||||
|
// Setting: CERTCTL_USER_RETENTION_BATCH_CAP.
|
||||||
|
BatchCap int
|
||||||
|
}
|
||||||
|
|
||||||
// OCSPResponderConfig configures the dedicated OCSP-responder cert
|
// OCSPResponderConfig configures the dedicated OCSP-responder cert
|
||||||
// per issuer (RFC 6960 §2.6 + §4.2.2.2). When unset, the local issuer
|
// per issuer (RFC 6960 §2.6 + §4.2.2.2). When unset, the local issuer
|
||||||
@@ -724,6 +748,11 @@ func Load() (*Config, error) {
|
|||||||
AuditChain: AuditChainConfig{
|
AuditChain: AuditChainConfig{
|
||||||
VerifyInterval: getEnvDuration("CERTCTL_AUDIT_CHAIN_VERIFY_INTERVAL", 6*time.Hour),
|
VerifyInterval: getEnvDuration("CERTCTL_AUDIT_CHAIN_VERIFY_INTERVAL", 6*time.Hour),
|
||||||
},
|
},
|
||||||
|
UserRetention: UserRetentionConfig{
|
||||||
|
Interval: getEnvDuration("CERTCTL_USER_RETENTION_INTERVAL", 24*time.Hour),
|
||||||
|
RetentionWindow: getEnvDuration("CERTCTL_USER_RETENTION_WINDOW", 30*24*time.Hour),
|
||||||
|
BatchCap: getEnvInt("CERTCTL_USER_RETENTION_BATCH_CAP", 200),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse CERTCTL_API_KEYS_NAMED for named key authentication (M-002).
|
// Parse CERTCTL_API_KEYS_NAMED for named key authentication (M-002).
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
"database/sql"
|
"database/sql"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/lib/pq"
|
"github.com/lib/pq"
|
||||||
|
|
||||||
@@ -177,3 +178,33 @@ func (r *UserRepository) ListAll(ctx context.Context, tenantID string) ([]*userd
|
|||||||
}
|
}
|
||||||
return out, rows.Err()
|
return out, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ListDeactivatedBefore returns every user (across all tenants) whose
|
||||||
|
// deactivated_at is not NULL AND strictly before threshold. Sprint 6
|
||||||
|
// COMP-002-RETENTION — the userRetentionLoop in the scheduler walks
|
||||||
|
// this list per tick and calls UserRetentionService.DeleteUserPII on
|
||||||
|
// each. Cross-tenant on purpose: a single retention policy spans the
|
||||||
|
// whole control plane.
|
||||||
|
func (r *UserRepository) ListDeactivatedBefore(ctx context.Context, threshold time.Time) ([]*userdomain.User, error) {
|
||||||
|
rows, err := r.db.QueryContext(ctx,
|
||||||
|
`SELECT `+userColumns+`
|
||||||
|
FROM users
|
||||||
|
WHERE deactivated_at IS NOT NULL
|
||||||
|
AND deactivated_at < $1
|
||||||
|
ORDER BY deactivated_at ASC`,
|
||||||
|
threshold)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("users list_deactivated_before: %w", err)
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var out []*userdomain.User
|
||||||
|
for rows.Next() {
|
||||||
|
u, err := scanUser(rows)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("users scan: %w", err)
|
||||||
|
}
|
||||||
|
out = append(out, u)
|
||||||
|
}
|
||||||
|
return out, rows.Err()
|
||||||
|
}
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ package repository
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"time"
|
||||||
|
|
||||||
userdomain "github.com/certctl-io/certctl/internal/auth/user/domain"
|
userdomain "github.com/certctl-io/certctl/internal/auth/user/domain"
|
||||||
)
|
)
|
||||||
@@ -46,4 +47,12 @@ type UserRepository interface {
|
|||||||
// ListAll returns every user in the tenant. Order:
|
// ListAll returns every user in the tenant. Order:
|
||||||
// created_at ASC. Used by the GUI's admin surface.
|
// created_at ASC. Used by the GUI's admin surface.
|
||||||
ListAll(ctx context.Context, tenantID string) ([]*userdomain.User, error)
|
ListAll(ctx context.Context, tenantID string) ([]*userdomain.User, error)
|
||||||
|
|
||||||
|
// ListDeactivatedBefore returns every user whose deactivated_at is
|
||||||
|
// not NULL AND strictly before the supplied threshold. Sprint 6
|
||||||
|
// COMP-002-RETENTION closure — the scheduler's userRetentionLoop
|
||||||
|
// uses this to enumerate purge-eligible rows on each tick. Order:
|
||||||
|
// deactivated_at ASC (oldest first, so a tick-budget cap is
|
||||||
|
// deterministic about which rows it processes).
|
||||||
|
ListDeactivatedBefore(ctx context.Context, threshold time.Time) ([]*userdomain.User, error)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -145,6 +145,16 @@ type AuditChainBreakRecorder interface {
|
|||||||
RecordSuccess(rowCount int)
|
RecordSuccess(rowCount int)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UserRetentionPurger is the Sprint 6 COMP-002-RETENTION scheduler-side
|
||||||
|
// interface. Concrete impl is *service.UserRetentionService — it walks
|
||||||
|
// every user whose deactivated_at exceeds the retention window and
|
||||||
|
// scrubs PII columns (email / display_name / oidc_subject hash). The
|
||||||
|
// loop calls PurgeDeactivatedUsers on every CERTCTL_USER_RETENTION_INTERVAL
|
||||||
|
// tick. nil = loop is not wired (deployments that disable retention).
|
||||||
|
type UserRetentionPurger interface {
|
||||||
|
PurgeDeactivatedUsers(ctx context.Context) (purged, failed int, err error)
|
||||||
|
}
|
||||||
|
|
||||||
// JobReaperService defines the interface for job timeout reaping used by the scheduler.
|
// JobReaperService defines the interface for job timeout reaping used by the scheduler.
|
||||||
type JobReaperService interface {
|
type JobReaperService interface {
|
||||||
ReapTimedOutJobs(ctx context.Context, csrTTL, approvalTTL time.Duration) error
|
ReapTimedOutJobs(ctx context.Context, csrTTL, approvalTTL time.Duration) error
|
||||||
@@ -175,6 +185,7 @@ type Scheduler struct {
|
|||||||
rateLimitGC RateLimitGarbageCollector
|
rateLimitGC RateLimitGarbageCollector
|
||||||
auditChainVerifier AuditChainVerifier
|
auditChainVerifier AuditChainVerifier
|
||||||
auditChainRecorder AuditChainBreakRecorder
|
auditChainRecorder AuditChainBreakRecorder
|
||||||
|
userRetention UserRetentionPurger
|
||||||
jobReaper JobReaperService
|
jobReaper JobReaperService
|
||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
|
|
||||||
@@ -196,6 +207,7 @@ type Scheduler struct {
|
|||||||
sessionGCInterval time.Duration
|
sessionGCInterval time.Duration
|
||||||
rateLimitGCInterval time.Duration
|
rateLimitGCInterval time.Duration
|
||||||
auditChainVerifyInterval time.Duration
|
auditChainVerifyInterval time.Duration
|
||||||
|
userRetentionInterval time.Duration
|
||||||
// agentOfflineJobTTL: per-tick threshold for reaping Running jobs whose
|
// agentOfflineJobTTL: per-tick threshold for reaping Running jobs whose
|
||||||
// owning agent has been silent. Bundle C / Audit M-016. Defaults below.
|
// owning agent has been silent. Bundle C / Audit M-016. Defaults below.
|
||||||
agentOfflineJobTTL time.Duration
|
agentOfflineJobTTL time.Duration
|
||||||
@@ -220,6 +232,7 @@ type Scheduler struct {
|
|||||||
sessionGCRunning atomic.Bool
|
sessionGCRunning atomic.Bool
|
||||||
rateLimitGCRunning atomic.Bool
|
rateLimitGCRunning atomic.Bool
|
||||||
auditChainVerifyRunning atomic.Bool
|
auditChainVerifyRunning atomic.Bool
|
||||||
|
userRetentionRunning atomic.Bool
|
||||||
|
|
||||||
// Graceful shutdown: wait for in-flight work to complete
|
// Graceful shutdown: wait for in-flight work to complete
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
@@ -265,6 +278,11 @@ func NewScheduler(
|
|||||||
// not dominate a quiet fleet's DB load. Operators with huge
|
// not dominate a quiet fleet's DB load. Operators with huge
|
||||||
// audit tables can lengthen via CERTCTL_AUDIT_CHAIN_VERIFY_INTERVAL.
|
// audit tables can lengthen via CERTCTL_AUDIT_CHAIN_VERIFY_INTERVAL.
|
||||||
auditChainVerifyInterval: 6 * time.Hour,
|
auditChainVerifyInterval: 6 * time.Hour,
|
||||||
|
// Sprint 6 COMP-002-RETENTION: user PII purge cadence. Default
|
||||||
|
// 24h — deactivated rows persist past the retention window
|
||||||
|
// (default 30d) only until the next tick, which is fine for
|
||||||
|
// GDPR-style "delete within reasonable time" expectations.
|
||||||
|
userRetentionInterval: 24 * time.Hour,
|
||||||
// 5 minutes is 5×agentHealthCheckInterval default of 1m; an agent
|
// 5 minutes is 5×agentHealthCheckInterval default of 1m; an agent
|
||||||
// must miss multiple heartbeats before its in-flight jobs are reaped.
|
// must miss multiple heartbeats before its in-flight jobs are reaped.
|
||||||
agentOfflineJobTTL: 5 * time.Minute,
|
agentOfflineJobTTL: 5 * time.Minute,
|
||||||
@@ -469,6 +487,25 @@ func (s *Scheduler) SetAuditChainVerifyInterval(d time.Duration) {
|
|||||||
s.auditChainVerifyInterval = d
|
s.auditChainVerifyInterval = d
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetUserRetentionPurger wires the Sprint 6 COMP-002-RETENTION
|
||||||
|
// user-PII-purge sweeper. Optional — nil disables the loop (deployments
|
||||||
|
// that don't have any federated humans yet, or those that want manual
|
||||||
|
// purge via the admin endpoint only). Concrete impl is
|
||||||
|
// *service.UserRetentionService.
|
||||||
|
func (s *Scheduler) SetUserRetentionPurger(p UserRetentionPurger) {
|
||||||
|
s.userRetention = p
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetUserRetentionInterval configures the userRetentionLoop tick
|
||||||
|
// cadence. Default 24h. Wire: CERTCTL_USER_RETENTION_INTERVAL.
|
||||||
|
// Zero or negative values are ignored.
|
||||||
|
func (s *Scheduler) SetUserRetentionInterval(d time.Duration) {
|
||||||
|
if d <= 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.userRetentionInterval = d
|
||||||
|
}
|
||||||
|
|
||||||
// SetAgentOfflineJobTTL sets the threshold past which a Running job whose
|
// SetAgentOfflineJobTTL sets the threshold past which a Running job whose
|
||||||
// owning agent has gone silent is reaped to Failed. Bundle C / Audit M-016.
|
// owning agent has gone silent is reaped to Failed. Bundle C / Audit M-016.
|
||||||
// Zero or negative values are ignored (the default of 5 minutes is kept).
|
// Zero or negative values are ignored (the default of 5 minutes is kept).
|
||||||
@@ -536,6 +573,9 @@ func (s *Scheduler) Start(ctx context.Context) <-chan struct{} {
|
|||||||
if s.auditChainVerifier != nil {
|
if s.auditChainVerifier != nil {
|
||||||
loopCount++
|
loopCount++
|
||||||
}
|
}
|
||||||
|
if s.userRetention != nil {
|
||||||
|
loopCount++
|
||||||
|
}
|
||||||
s.wg.Add(loopCount)
|
s.wg.Add(loopCount)
|
||||||
|
|
||||||
go func() { defer s.wg.Done(); s.renewalCheckLoop(ctx) }()
|
go func() { defer s.wg.Done(); s.renewalCheckLoop(ctx) }()
|
||||||
@@ -573,6 +613,9 @@ func (s *Scheduler) Start(ctx context.Context) <-chan struct{} {
|
|||||||
if s.auditChainVerifier != nil {
|
if s.auditChainVerifier != nil {
|
||||||
go func() { defer s.wg.Done(); s.auditChainVerifyLoop(ctx) }()
|
go func() { defer s.wg.Done(); s.auditChainVerifyLoop(ctx) }()
|
||||||
}
|
}
|
||||||
|
if s.userRetention != nil {
|
||||||
|
go func() { defer s.wg.Done(); s.userRetentionLoop(ctx) }()
|
||||||
|
}
|
||||||
|
|
||||||
// Signal that all loops are launched
|
// Signal that all loops are launched
|
||||||
close(startedChan)
|
close(startedChan)
|
||||||
@@ -1454,6 +1497,50 @@ func (s *Scheduler) auditChainVerifyLoop(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// userRetentionLoop is the Sprint 6 COMP-002-RETENTION sweeper. Every
|
||||||
|
// CERTCTL_USER_RETENTION_INTERVAL tick it asks
|
||||||
|
// UserRetentionService.PurgeDeactivatedUsers to walk every user whose
|
||||||
|
// deactivated_at is older than the retention window and scrub the PII
|
||||||
|
// columns. The service is responsible for the row-level work + audit
|
||||||
|
// emission; the loop only orchestrates cadence + concurrency control.
|
||||||
|
//
|
||||||
|
// Mirrors the GC-loop pattern: atomic.Bool guard prevents overlapping
|
||||||
|
// ticks; per-tick context.WithTimeout caps the worst case at 5
|
||||||
|
// minutes. The retention service's purgeBatchCap (default 200) is the
|
||||||
|
// inner-loop budget — large backlogs spread across multiple ticks.
|
||||||
|
func (s *Scheduler) userRetentionLoop(ctx context.Context) {
|
||||||
|
ticker := NewJitteredTicker(s.userRetentionInterval, DefaultSchedulerJitter)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
if !s.userRetentionRunning.CompareAndSwap(false, true) {
|
||||||
|
s.logger.Warn("user retention purge still running, skipping tick")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
s.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer s.wg.Done()
|
||||||
|
defer s.userRetentionRunning.Store(false)
|
||||||
|
opCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
|
||||||
|
defer cancel()
|
||||||
|
purged, failed, err := s.userRetention.PurgeDeactivatedUsers(opCtx)
|
||||||
|
if err != nil {
|
||||||
|
s.logger.Warn("user retention purge failed (next tick will retry)", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if purged > 0 || failed > 0 {
|
||||||
|
s.logger.Info("user retention purge tick",
|
||||||
|
"purged", purged, "failed", failed)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// runAuditChainVerify executes a single chain-verify pass with the
|
// runAuditChainVerify executes a single chain-verify pass with the
|
||||||
// atomic.Bool + WithTimeout + goroutine pattern every other GC loop
|
// atomic.Bool + WithTimeout + goroutine pattern every other GC loop
|
||||||
// uses. Extracted so the loop body + the "run once on start" path
|
// uses. Extracted so the loop body + the "run once on start" path
|
||||||
|
|||||||
@@ -0,0 +1,224 @@
|
|||||||
|
// Copyright 2026 certctl LLC. All rights reserved.
|
||||||
|
// SPDX-License-Identifier: BUSL-1.1
|
||||||
|
|
||||||
|
package service
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/hex"
|
||||||
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
userdomain "github.com/certctl-io/certctl/internal/auth/user/domain"
|
||||||
|
"github.com/certctl-io/certctl/internal/domain"
|
||||||
|
"github.com/certctl-io/certctl/internal/repository"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Sprint 6 COMP-002-RETENTION closure. The control plane stores three
|
||||||
|
// PII surfaces:
|
||||||
|
//
|
||||||
|
// users.email — IdP-supplied login email.
|
||||||
|
// users.display_name — IdP-supplied human label.
|
||||||
|
// users.oidc_subject — IdP's stable identifier for the human.
|
||||||
|
//
|
||||||
|
// Pre-fix there was no in-code primitive for GDPR right-to-be-forgotten
|
||||||
|
// or for automatic retention purges of deactivated accounts. The
|
||||||
|
// admin-side deactivate flow at internal/api/handler/auth_users.go
|
||||||
|
// set users.deactivated_at but the PII columns stayed populated
|
||||||
|
// forever.
|
||||||
|
//
|
||||||
|
// This file delivers the two pieces the audit's fix called for:
|
||||||
|
//
|
||||||
|
// Phase 1: DeleteUserPII(actorID) — operator-callable primitive that
|
||||||
|
// scrubs the row's PII while keeping the audit attribution
|
||||||
|
// chain intact (the row's id stays, so historical
|
||||||
|
// audit_events.actor = user.id rows still resolve).
|
||||||
|
// Phase 2: PurgeDeactivatedUsers(ctx) — walks every user whose
|
||||||
|
// deactivated_at is older than the retention window and
|
||||||
|
// calls DeleteUserPII on each. Scheduler loop calls this
|
||||||
|
// on a tick (default 24h); the retention window itself
|
||||||
|
// (default 30 days post-deactivate) is operator-tunable.
|
||||||
|
//
|
||||||
|
// Audit attribution invariant: DeleteUserPII replaces oidc_subject
|
||||||
|
// with sha256:<hex> rather than nullifying it. Three reasons:
|
||||||
|
// 1. Preserves the (oidc_provider_id, oidc_subject) UNIQUE
|
||||||
|
// constraint — two purged users on the same provider still have
|
||||||
|
// different oidc_subject values, so the constraint never trips.
|
||||||
|
// 2. The hash is a one-way fingerprint; the original IdP-side
|
||||||
|
// identifier is unrecoverable post-purge. Re-login under the
|
||||||
|
// same IdP subject mints a fresh u-id (different row) because
|
||||||
|
// GetByOIDCSubject won't match the hashed token.
|
||||||
|
// 3. Forensic continuity: if an operator later needs to prove "a
|
||||||
|
// user with subject X was deactivated then purged", they can
|
||||||
|
// recompute sha256(X) and look it up.
|
||||||
|
|
||||||
|
// UserRetentionService exposes the DeleteUserPII + PurgeDeactivatedUsers
|
||||||
|
// primitives. The handler-side admin endpoint (when wired) calls
|
||||||
|
// DeleteUserPII directly; the scheduler's userRetentionLoop calls
|
||||||
|
// PurgeDeactivatedUsers.
|
||||||
|
type UserRetentionService struct {
|
||||||
|
users repository.UserRepository
|
||||||
|
sessions repository.SessionRepository
|
||||||
|
audit *AuditService
|
||||||
|
logger *slog.Logger
|
||||||
|
|
||||||
|
// retentionWindow is how long after deactivated_at a user's PII
|
||||||
|
// stays in the table. The scheduler loop subtracts this from
|
||||||
|
// time.Now() when computing the "purge before" threshold.
|
||||||
|
retentionWindow time.Duration
|
||||||
|
// purgeBatchCap bounds how many users a single PurgeDeactivatedUsers
|
||||||
|
// call processes — keeps a single tick's blast radius predictable
|
||||||
|
// even if a large backlog accumulates. Zero = unbounded (test default).
|
||||||
|
purgeBatchCap int
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewUserRetentionService wires the deps. The audit service is
|
||||||
|
// optional (nil = skip audit emission); production wiring in
|
||||||
|
// cmd/server/main.go passes the singleton.
|
||||||
|
func NewUserRetentionService(
|
||||||
|
users repository.UserRepository,
|
||||||
|
sessions repository.SessionRepository,
|
||||||
|
audit *AuditService,
|
||||||
|
logger *slog.Logger,
|
||||||
|
retentionWindow time.Duration,
|
||||||
|
purgeBatchCap int,
|
||||||
|
) *UserRetentionService {
|
||||||
|
if retentionWindow <= 0 {
|
||||||
|
retentionWindow = 30 * 24 * time.Hour
|
||||||
|
}
|
||||||
|
return &UserRetentionService{
|
||||||
|
users: users,
|
||||||
|
sessions: sessions,
|
||||||
|
audit: audit,
|
||||||
|
logger: logger,
|
||||||
|
retentionWindow: retentionWindow,
|
||||||
|
purgeBatchCap: purgeBatchCap,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteUserPII scrubs the named user's PII columns. Phase 1 fix per
|
||||||
|
// the COMP-002-RETENTION audit. Steps:
|
||||||
|
//
|
||||||
|
// 1. Load the user row. Returns repository.ErrUserNotFound if missing.
|
||||||
|
// 2. Revoke all active sessions for the actor (defense-in-depth — the
|
||||||
|
// handler-side Deactivate path already does this, but a purge after
|
||||||
|
// N days might catch sessions that were created post-deactivate via
|
||||||
|
// some other path).
|
||||||
|
// 3. Zero the PII columns:
|
||||||
|
// email = ""
|
||||||
|
// display_name = ""
|
||||||
|
// oidc_subject = "sha256:" || hex(sha256(original))
|
||||||
|
// 4. Persist the row via UserRepository.Update.
|
||||||
|
// 5. Emit an audit event (auth category, action user.purge_pii) so the
|
||||||
|
// scrub itself is on record.
|
||||||
|
//
|
||||||
|
// Returns nil on success. Idempotent: re-calling on an already-purged
|
||||||
|
// row hashes the already-hashed oidc_subject, which is a no-op semantic
|
||||||
|
// (the operator can tell purges happened by the "sha256:" prefix).
|
||||||
|
func (s *UserRetentionService) DeleteUserPII(ctx context.Context, userID string) error {
|
||||||
|
u, err := s.users.Get(ctx, userID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("user_retention: load %s: %w", userID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Defense-in-depth: revoke all sessions before the row mutates.
|
||||||
|
if err := s.sessions.RevokeAllForActor(ctx, u.ID, string(domain.ActorTypeUser), u.TenantID); err != nil {
|
||||||
|
// Log + continue; PII scrub is the load-bearing step. A
|
||||||
|
// dangling-session row whose actor's PII is already gone is
|
||||||
|
// less harmful than leaving the PII intact because the
|
||||||
|
// session revoke failed.
|
||||||
|
s.logger.Warn("user_retention: session revoke failed during PII scrub (continuing)",
|
||||||
|
"user_id", userID, "error", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Hash the oidc_subject IF it isn't already a "sha256:..." token.
|
||||||
|
// Idempotent re-scrubs are safe; a second pass produces the same
|
||||||
|
// hash of the hash, but the prefix lets operators tell the row was
|
||||||
|
// already scrubbed.
|
||||||
|
if !strings.HasPrefix(u.OIDCSubject, "sha256:") {
|
||||||
|
sum := sha256.Sum256([]byte(u.OIDCSubject))
|
||||||
|
u.OIDCSubject = "sha256:" + hex.EncodeToString(sum[:])
|
||||||
|
}
|
||||||
|
|
||||||
|
u.Email = "purged@redacted.local" // domain.User.Validate requires plausible email format
|
||||||
|
u.DisplayName = "[purged]" // domain.User.Validate forbids leading/trailing whitespace
|
||||||
|
u.WebAuthnCredentials = []byte(`[]`) // v3-reserved field — keep empty JSONB.
|
||||||
|
|
||||||
|
if err := s.users.Update(ctx, u); err != nil {
|
||||||
|
return fmt.Errorf("user_retention: update %s: %w", userID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.audit != nil {
|
||||||
|
_ = s.audit.RecordEventWithCategory(ctx,
|
||||||
|
"system", domain.ActorTypeSystem,
|
||||||
|
"user.purge_pii",
|
||||||
|
domain.EventCategoryAuth,
|
||||||
|
"user", userID,
|
||||||
|
map[string]interface{}{
|
||||||
|
"retained_id": userID,
|
||||||
|
"hashed_oidc_subject_set": true,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.logger.Info("user_retention: PII scrubbed", "user_id", userID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// PurgeDeactivatedUsers enumerates every user whose deactivated_at is
|
||||||
|
// older than now - retentionWindow and calls DeleteUserPII on each.
|
||||||
|
// Returns (purged, failed) counts; logs individual failures at WARN
|
||||||
|
// and continues so a single bad row doesn't stall the rest of the
|
||||||
|
// batch. Bounded by purgeBatchCap when non-zero.
|
||||||
|
func (s *UserRetentionService) PurgeDeactivatedUsers(ctx context.Context) (int, int, error) {
|
||||||
|
threshold := time.Now().Add(-s.retentionWindow)
|
||||||
|
rows, err := s.users.ListDeactivatedBefore(ctx, threshold)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, fmt.Errorf("user_retention: list deactivated: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var purged, failed int
|
||||||
|
for i, u := range rows {
|
||||||
|
if s.purgeBatchCap > 0 && i >= s.purgeBatchCap {
|
||||||
|
s.logger.Info("user_retention: batch cap reached; remaining rows deferred to next tick",
|
||||||
|
"cap", s.purgeBatchCap, "remaining", len(rows)-i)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
// Skip rows that are already scrubbed — DeleteUserPII is
|
||||||
|
// idempotent but skipping saves a transaction and an audit
|
||||||
|
// row per tick.
|
||||||
|
if strings.HasPrefix(u.OIDCSubject, "sha256:") &&
|
||||||
|
strings.HasPrefix(u.Email, "purged@") {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := s.DeleteUserPII(ctx, u.ID); err != nil {
|
||||||
|
s.logger.Warn("user_retention: purge failed (next tick will retry)",
|
||||||
|
"user_id", u.ID, "error", err)
|
||||||
|
failed++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
purged++
|
||||||
|
}
|
||||||
|
if purged > 0 || failed > 0 {
|
||||||
|
s.logger.Info("user_retention: purge sweep complete",
|
||||||
|
"purged", purged,
|
||||||
|
"failed", failed,
|
||||||
|
"threshold", threshold.Format(time.RFC3339))
|
||||||
|
}
|
||||||
|
return purged, failed, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RetentionWindow exposes the configured window for tests + the
|
||||||
|
// operator-facing "when will my account be scrubbed" GUI surface.
|
||||||
|
func (s *UserRetentionService) RetentionWindow() time.Duration {
|
||||||
|
return s.retentionWindow
|
||||||
|
}
|
||||||
|
|
||||||
|
// userdomain is imported so the compiler recognises the type used by
|
||||||
|
// the repository contracts even though this file only consumes pointer
|
||||||
|
// values via the interface. Keep this blank ref so re-organising the
|
||||||
|
// file later doesn't accidentally drop the import.
|
||||||
|
var _ = (*userdomain.User)(nil)
|
||||||
@@ -0,0 +1,302 @@
|
|||||||
|
// Copyright 2026 certctl LLC. All rights reserved.
|
||||||
|
// SPDX-License-Identifier: BUSL-1.1
|
||||||
|
|
||||||
|
package service
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"log/slog"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
sessiondomain "github.com/certctl-io/certctl/internal/auth/session/domain"
|
||||||
|
userdomain "github.com/certctl-io/certctl/internal/auth/user/domain"
|
||||||
|
"github.com/certctl-io/certctl/internal/domain"
|
||||||
|
"github.com/certctl-io/certctl/internal/repository"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Sprint 6 COMP-002-RETENTION unit tests. The repo + session deps are
|
||||||
|
// covered by in-memory stubs in this file; the integration shape
|
||||||
|
// (deactivated_at SQL filter, session revocation in PG) is covered by
|
||||||
|
// the existing postgres tests for those repositories.
|
||||||
|
|
||||||
|
type retentionStubUserRepo struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
byID map[string]*userdomain.User
|
||||||
|
updateOK bool
|
||||||
|
updateCh chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newRetentionStubUserRepo() *retentionStubUserRepo {
|
||||||
|
return &retentionStubUserRepo{
|
||||||
|
byID: make(map[string]*userdomain.User),
|
||||||
|
updateOK: true,
|
||||||
|
updateCh: make(chan struct{}, 100),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *retentionStubUserRepo) seed(u *userdomain.User) {
|
||||||
|
r.mu.Lock()
|
||||||
|
defer r.mu.Unlock()
|
||||||
|
r.byID[u.ID] = u
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *retentionStubUserRepo) Get(_ context.Context, id string) (*userdomain.User, error) {
|
||||||
|
r.mu.Lock()
|
||||||
|
defer r.mu.Unlock()
|
||||||
|
u, ok := r.byID[id]
|
||||||
|
if !ok {
|
||||||
|
return nil, repository.ErrUserNotFound
|
||||||
|
}
|
||||||
|
cp := *u
|
||||||
|
return &cp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *retentionStubUserRepo) GetByOIDCSubject(_ context.Context, _, _ string) (*userdomain.User, error) {
|
||||||
|
return nil, repository.ErrUserNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *retentionStubUserRepo) Create(_ context.Context, _ *userdomain.User) error { return nil }
|
||||||
|
|
||||||
|
func (r *retentionStubUserRepo) Update(_ context.Context, u *userdomain.User) error {
|
||||||
|
r.mu.Lock()
|
||||||
|
defer r.mu.Unlock()
|
||||||
|
if !r.updateOK {
|
||||||
|
return errors.New("retentionStubUserRepo: update disabled")
|
||||||
|
}
|
||||||
|
cp := *u
|
||||||
|
r.byID[u.ID] = &cp
|
||||||
|
r.updateCh <- struct{}{}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *retentionStubUserRepo) ListAll(_ context.Context, _ string) ([]*userdomain.User, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *retentionStubUserRepo) ListDeactivatedBefore(_ context.Context, threshold time.Time) ([]*userdomain.User, error) {
|
||||||
|
r.mu.Lock()
|
||||||
|
defer r.mu.Unlock()
|
||||||
|
var out []*userdomain.User
|
||||||
|
for _, u := range r.byID {
|
||||||
|
if u.DeactivatedAt != nil && u.DeactivatedAt.Before(threshold) {
|
||||||
|
cp := *u
|
||||||
|
out = append(out, &cp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type retentionStubSessionRepo struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
revokedActors []string
|
||||||
|
revokeError error
|
||||||
|
}
|
||||||
|
|
||||||
|
// retentionStubSessionRepo satisfies the full repository.SessionRepository
|
||||||
|
// surface. user_retention.go only calls RevokeAllForActor; the other
|
||||||
|
// methods are no-op stubs that exist so the fixture compiles against
|
||||||
|
// the interface.
|
||||||
|
|
||||||
|
func (s *retentionStubSessionRepo) Create(_ context.Context, _ *sessiondomain.Session) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (s *retentionStubSessionRepo) Get(_ context.Context, _ string) (*sessiondomain.Session, error) {
|
||||||
|
return nil, repository.ErrSessionNotFound
|
||||||
|
}
|
||||||
|
func (s *retentionStubSessionRepo) ListByActor(_ context.Context, _, _, _ string) ([]*sessiondomain.Session, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
func (s *retentionStubSessionRepo) UpdateLastSeen(_ context.Context, _ string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (s *retentionStubSessionRepo) UpdateCSRFTokenHash(_ context.Context, _, _ string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (s *retentionStubSessionRepo) Revoke(_ context.Context, _ string) error { return nil }
|
||||||
|
func (s *retentionStubSessionRepo) RevokeAllForActor(_ context.Context, actorID, _, _ string) error {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
if s.revokeError != nil {
|
||||||
|
return s.revokeError
|
||||||
|
}
|
||||||
|
s.revokedActors = append(s.revokedActors, actorID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
func (s *retentionStubSessionRepo) RevokeAllExceptForActor(_ context.Context, _, _, _, _ string) (int, error) {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
func (s *retentionStubSessionRepo) GarbageCollectExpired(_ context.Context) (int, error) {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
func (s *retentionStubSessionRepo) Delete(_ context.Context, _ string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTestUser(id, email, displayName, subject string, deactivated *time.Time) *userdomain.User {
|
||||||
|
return &userdomain.User{
|
||||||
|
ID: id,
|
||||||
|
TenantID: "t-default",
|
||||||
|
Email: email,
|
||||||
|
DisplayName: displayName,
|
||||||
|
OIDCSubject: subject,
|
||||||
|
OIDCProviderID: "op-test",
|
||||||
|
LastLoginAt: time.Now(),
|
||||||
|
WebAuthnCredentials: []byte(`[]`),
|
||||||
|
CreatedAt: time.Now().Add(-90 * 24 * time.Hour),
|
||||||
|
UpdatedAt: time.Now(),
|
||||||
|
DeactivatedAt: deactivated,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func discardLogger() *slog.Logger {
|
||||||
|
return slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelError + 10}))
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestDeleteUserPII_ScrubsAndRevokes: the operator-facing primitive
|
||||||
|
// nullifies email + display_name, hashes oidc_subject, and revokes
|
||||||
|
// sessions on the affected actor.
|
||||||
|
func TestDeleteUserPII_ScrubsAndRevokes(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
users := newRetentionStubUserRepo()
|
||||||
|
sessions := &retentionStubSessionRepo{}
|
||||||
|
now := time.Now()
|
||||||
|
deact := now.Add(-45 * 24 * time.Hour)
|
||||||
|
users.seed(newTestUser("u-1", "alice@example.com", "Alice", "sub-alice", &deact))
|
||||||
|
|
||||||
|
svc := NewUserRetentionService(users, sessions, nil, discardLogger(), 30*24*time.Hour, 0)
|
||||||
|
if err := svc.DeleteUserPII(context.Background(), "u-1"); err != nil {
|
||||||
|
t.Fatalf("DeleteUserPII: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
got, err := users.Get(context.Background(), "u-1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("post-purge Get: %v", err)
|
||||||
|
}
|
||||||
|
if got.Email != "purged@redacted.local" {
|
||||||
|
t.Errorf("email not scrubbed: %q", got.Email)
|
||||||
|
}
|
||||||
|
if got.DisplayName != "[purged]" {
|
||||||
|
t.Errorf("display_name not scrubbed: %q", got.DisplayName)
|
||||||
|
}
|
||||||
|
if !strings.HasPrefix(got.OIDCSubject, "sha256:") {
|
||||||
|
t.Errorf("oidc_subject not hashed: %q", got.OIDCSubject)
|
||||||
|
}
|
||||||
|
if got.OIDCSubject == "sha256:" {
|
||||||
|
t.Errorf("oidc_subject hash is empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
sessions.mu.Lock()
|
||||||
|
if len(sessions.revokedActors) != 1 || sessions.revokedActors[0] != "u-1" {
|
||||||
|
t.Errorf("RevokeAllForActor not called for u-1; got %v", sessions.revokedActors)
|
||||||
|
}
|
||||||
|
sessions.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestDeleteUserPII_IsIdempotent: scrubbing an already-scrubbed row
|
||||||
|
// is safe — the oidc_subject hash recurses (sha256 of sha256:hex)
|
||||||
|
// without breaking the UNIQUE constraint or returning an error. We
|
||||||
|
// verify the scrubbed values + the prefix.
|
||||||
|
func TestDeleteUserPII_IsIdempotent(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
users := newRetentionStubUserRepo()
|
||||||
|
sessions := &retentionStubSessionRepo{}
|
||||||
|
now := time.Now()
|
||||||
|
deact := now.Add(-100 * 24 * time.Hour)
|
||||||
|
users.seed(newTestUser("u-2", "bob@example.com", "Bob", "sub-bob", &deact))
|
||||||
|
|
||||||
|
svc := NewUserRetentionService(users, sessions, nil, discardLogger(), 30*24*time.Hour, 0)
|
||||||
|
if err := svc.DeleteUserPII(context.Background(), "u-2"); err != nil {
|
||||||
|
t.Fatalf("first DeleteUserPII: %v", err)
|
||||||
|
}
|
||||||
|
first, _ := users.Get(context.Background(), "u-2")
|
||||||
|
if err := svc.DeleteUserPII(context.Background(), "u-2"); err != nil {
|
||||||
|
t.Fatalf("second DeleteUserPII: %v", err)
|
||||||
|
}
|
||||||
|
second, _ := users.Get(context.Background(), "u-2")
|
||||||
|
|
||||||
|
// oidc_subject doesn't get re-hashed (prefix guard).
|
||||||
|
if first.OIDCSubject != second.OIDCSubject {
|
||||||
|
t.Errorf("idempotent re-scrub re-hashed oidc_subject: %q -> %q",
|
||||||
|
first.OIDCSubject, second.OIDCSubject)
|
||||||
|
}
|
||||||
|
if !strings.HasPrefix(second.OIDCSubject, "sha256:") {
|
||||||
|
t.Errorf("scrubbed oidc_subject lost prefix: %q", second.OIDCSubject)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestPurgeDeactivatedUsers_RespectsWindow: only users whose
|
||||||
|
// deactivated_at is older than now-retentionWindow get scrubbed; rows
|
||||||
|
// deactivated within the window remain intact.
|
||||||
|
func TestPurgeDeactivatedUsers_RespectsWindow(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
users := newRetentionStubUserRepo()
|
||||||
|
sessions := &retentionStubSessionRepo{}
|
||||||
|
now := time.Now()
|
||||||
|
stale := now.Add(-45 * 24 * time.Hour) // past 30d window
|
||||||
|
recent := now.Add(-7 * 24 * time.Hour) // inside 30d window
|
||||||
|
users.seed(newTestUser("u-stale", "stale@example.com", "Stale", "sub-stale", &stale))
|
||||||
|
users.seed(newTestUser("u-recent", "recent@example.com", "Recent", "sub-recent", &recent))
|
||||||
|
users.seed(newTestUser("u-active", "active@example.com", "Active", "sub-active", nil))
|
||||||
|
|
||||||
|
svc := NewUserRetentionService(users, sessions, nil, discardLogger(), 30*24*time.Hour, 0)
|
||||||
|
purged, failed, err := svc.PurgeDeactivatedUsers(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("PurgeDeactivatedUsers: %v", err)
|
||||||
|
}
|
||||||
|
if purged != 1 {
|
||||||
|
t.Errorf("expected 1 row purged, got %d", purged)
|
||||||
|
}
|
||||||
|
if failed != 0 {
|
||||||
|
t.Errorf("expected 0 failures, got %d", failed)
|
||||||
|
}
|
||||||
|
|
||||||
|
staleU, _ := users.Get(context.Background(), "u-stale")
|
||||||
|
if staleU.Email != "purged@redacted.local" {
|
||||||
|
t.Errorf("stale row not scrubbed: %q", staleU.Email)
|
||||||
|
}
|
||||||
|
recentU, _ := users.Get(context.Background(), "u-recent")
|
||||||
|
if recentU.Email != "recent@example.com" {
|
||||||
|
t.Errorf("recent row should not have been scrubbed: %q", recentU.Email)
|
||||||
|
}
|
||||||
|
activeU, _ := users.Get(context.Background(), "u-active")
|
||||||
|
if activeU.Email != "active@example.com" {
|
||||||
|
t.Errorf("active row should not have been scrubbed: %q", activeU.Email)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestPurgeDeactivatedUsers_BatchCap caps the per-tick blast radius.
|
||||||
|
func TestPurgeDeactivatedUsers_BatchCap(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
users := newRetentionStubUserRepo()
|
||||||
|
sessions := &retentionStubSessionRepo{}
|
||||||
|
stale := time.Now().Add(-100 * 24 * time.Hour)
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
id := "u-cap-" + string(rune('0'+i))
|
||||||
|
users.seed(newTestUser(id, id+"@example.com", id, "sub-"+id, &stale))
|
||||||
|
}
|
||||||
|
|
||||||
|
svc := NewUserRetentionService(users, sessions, nil, discardLogger(), 30*24*time.Hour, 2)
|
||||||
|
purged, failed, err := svc.PurgeDeactivatedUsers(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("PurgeDeactivatedUsers: %v", err)
|
||||||
|
}
|
||||||
|
if purged != 2 {
|
||||||
|
t.Errorf("expected exactly 2 rows purged (batch cap = 2), got %d", purged)
|
||||||
|
}
|
||||||
|
if failed != 0 {
|
||||||
|
t.Errorf("expected 0 failures, got %d", failed)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// _ guard for unused-import lint when one of the helpers above is
|
||||||
|
// removed during future refactors.
|
||||||
|
var _ = domain.ActorTypeUser
|
||||||
Reference in New Issue
Block a user