certctl/internal/service/notification.go
shankar0123 6af95ccf5f notifications: per-policy multi-channel expiry-alert routing
Closes Rank 4 of the 2026-05-03 Infisical deep-research deliverable
(see cowork/infisical-deep-research-results.md Part 5). Pre-fix,
RenewalService.CheckExpiringCertificates already ran daily,
RenewalPolicy.AlertThresholdsDays drove per-cert thresholds, and
NotificationService.SendThresholdAlert deduped per (cert, threshold)
— but the channel was hardcoded to Email
(internal/service/notification.go:118 pre-fix). Operators who
configured PagerDuty / Slack / Teams / OpsGenie via
CERTCTL_PAGERDUTY_ROUTING_KEY etc. got nothing at any threshold
unless SMTP was also wired. Their first signal of an expired cert
was a 3 AM outage.

This commit lands the routing matrix on top of the existing
infrastructure:

  1. RenewalPolicy gains AlertChannels (per-tier channel list) +
     AlertSeverityMap (per-threshold tier assignment) +
     EffectiveAlertChannels / EffectiveAlertSeverity accessors.
     Default*() helpers preserve the back-compat Email-only
     behaviour for operators who haven't touched their policies
     post-upgrade. Migration 000026 adds the JSONB columns
     idempotently.
  2. NotificationService.SendThresholdAlertOnChannel — the new
     per-channel dispatch helper. Old SendThresholdAlert stays as
     an Email-only alias so non-policy callers (admin "send test
     alert" surfaces) keep working byte-for-byte.
  3. NotificationService.HasThresholdNotificationOnChannel — per-
     (cert, threshold, channel) deduplication so a transient
     PagerDuty 5xx today does NOT suppress today's Slack alert and
     tomorrow's PagerDuty retry will still fire.
  4. RenewalService.sendThresholdAlerts walks the resolved channel
     set per threshold tier, fans out to every configured channel,
     handles per-channel failures independently, defensively drops
     off-enum channels while leaving an audit-row trail, and records
     a per-channel audit event with metadata.channel +
     metadata.severity_tier.
  5. service.ExpiryAlertMetrics — atomic counter table mirrored on
     the VaultRenewalMetrics shape from the 2026-05-03 audit fix #5
     (commit ceca364). Three labels: channel × threshold × result
     (success / failure / deduped). Cardinality bound: 6 × 4 × 3 =
     72 series for the standard 4-threshold matrix.
  6. handler.MetricsHandler.SetExpiryAlerts wires the Prometheus
     exposer for certctl_expiry_alerts_total{channel,threshold,result}.
     Pre-sorted snapshot for byte-stable emission.
  7. cmd/server/main.go threads ONE service.ExpiryAlertMetrics
     instance through both the recording side (notificationService.
     SetExpiryAlertMetrics) and the exposing side
     (metricsHandler.SetExpiryAlerts).
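
Items 5 and 6 above can be sketched roughly as follows. This is a
minimal illustration, not the real service.ExpiryAlertMetrics: the
names alertKey, alertCounters and Snapshot are hypothetical, and it
uses a mutex-guarded map in place of the atomic counter table the
commit describes. It shows one counter per (channel, threshold,
result) tuple and a sorted snapshot so repeated scrapes emit series
in the same order.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// alertKey is a hypothetical label tuple: channel x threshold x result.
type alertKey struct {
	Channel   string
	Threshold int
	Result    string
}

// alertCounters is an illustrative stand-in for the metrics table.
// Assumption: a mutex-guarded map, not the atomic table in the commit.
type alertCounters struct {
	mu     sync.Mutex
	counts map[alertKey]uint64
}

func newAlertCounters() *alertCounters {
	return &alertCounters{counts: make(map[alertKey]uint64)}
}

// RecordExpiryAlert has the same method shape as ExpiryAlertRecorder.
func (c *alertCounters) RecordExpiryAlert(channel string, threshold int, result string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.counts[alertKey{channel, threshold, result}]++
}

// alertSample is one series in a snapshot.
type alertSample struct {
	Key   alertKey
	Value uint64
}

// Snapshot copies the counters and sorts them by channel, then
// threshold, then result, so the output order is deterministic.
func (c *alertCounters) Snapshot() []alertSample {
	c.mu.Lock()
	out := make([]alertSample, 0, len(c.counts))
	for k, v := range c.counts {
		out = append(out, alertSample{k, v})
	}
	c.mu.Unlock()
	sort.Slice(out, func(i, j int) bool {
		a, b := out[i].Key, out[j].Key
		if a.Channel != b.Channel {
			return a.Channel < b.Channel
		}
		if a.Threshold != b.Threshold {
			return a.Threshold < b.Threshold
		}
		return a.Result < b.Result
	})
	return out
}

func main() {
	m := newAlertCounters()
	m.RecordExpiryAlert("Slack", 30, "success")
	m.RecordExpiryAlert("PagerDuty", 1, "failure")
	m.RecordExpiryAlert("Slack", 30, "success")
	for _, s := range m.Snapshot() {
		fmt.Printf("certctl_expiry_alerts_total{channel=%q,threshold=\"%d\",result=%q} %d\n",
			s.Key.Channel, s.Key.Threshold, s.Key.Result, s.Value)
	}
}
```

Sharing one instance between the recording side and the exposing
side, as item 7 describes, means the exposer reads exactly what the
dispatch path wrote.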

Dispatch flow (post-fix, per renewal-loop tick):

  cert ages past T-30  → daily renewal-loop fires
                       → policy lookup
                       → for each crossed threshold:
                           - resolve severity tier (informational/
                             warning/critical) via AlertSeverityMap
                           - look up channel set in AlertChannels[tier]
                           - for each channel: dedup → SendThresholdAlertOnChannel
                             → notifierRegistry[channel] → audit row →
                             Prometheus counter increment
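
The inner loop of the flow above might look something like this
sketch. Everything here is hypothetical (sender, dedupKey, fanOut,
okSender, unavailableSender); the real code dedups against persisted
notification rows and also writes audit rows, both of which are
reduced to an in-memory map for illustration. The point it shows is
that each channel gets its own dedup check and its own error
handling, so one failing channel does not stop the rest.

```go
package main

import (
	"errors"
	"fmt"
)

// sender is a hypothetical stand-in for a per-channel notifier.
type sender func(certID string, threshold int) error

// dedupKey is the (cert, threshold, channel) triple used for dedup.
type dedupKey struct {
	certID    string
	threshold int
	channel   string
}

// okSender always succeeds; unavailableSender always fails.
func okSender(string, int) error          { return nil }
func unavailableSender(string, int) error { return errors.New("503 service unavailable") }

// fanOut tries every channel independently and returns one result
// per channel: "deduped", "failure" or "success".
// Assumption: the dedup key is recorded only after a successful send.
func fanOut(certID string, threshold int, channels []string,
	senders map[string]sender, sent map[dedupKey]bool) map[string]string {
	results := make(map[string]string, len(channels))
	for _, ch := range channels {
		key := dedupKey{certID, threshold, ch}
		if sent[key] {
			results[ch] = "deduped"
			continue
		}
		send, ok := senders[ch]
		if !ok {
			// This sketch treats a channel with no sender as a failure.
			results[ch] = "failure"
			continue
		}
		if err := send(certID, threshold); err != nil {
			results[ch] = "failure" // keep going with the next channel
			continue
		}
		sent[key] = true
		results[ch] = "success"
	}
	return results
}

func main() {
	channels := []string{"PagerDuty", "Slack", "Email"}
	senders := map[string]sender{
		"PagerDuty": unavailableSender,
		"Slack":     okSender,
		"Email":     okSender,
	}
	sent := make(map[dedupKey]bool)
	for tick := 1; tick <= 2; tick++ {
		results := fanOut("cert-1", 7, channels, senders, sent)
		for _, ch := range channels {
			fmt.Printf("tick %d %s: %s\n", tick, ch, results[ch])
		}
	}
}
```

On the second tick Slack and Email come back as deduped while
PagerDuty is attempted again, which is the per-channel behaviour
item 3 describes.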

Tests (internal/service/renewal_expiry_alerts_test.go):

  TestExpiryAlerts_DefaultMatrix_EmailOnly
  TestExpiryAlerts_PerTierFanOut
  TestExpiryAlerts_PerChannelDedup
  TestExpiryAlerts_OneChannelFails_OthersStillFire
  TestExpiryAlerts_OffEnumChannelDropped
  TestExpiryAlerts_MetricCounterIncrements
  TestExpiryAlerts_NilPolicy_FallsToDefault
  TestExpiryAlerts_OperatorOptOutOfTier

The PerTierFanOut test wires 6 mock notifiers, drives a cert at 0
days through the canonical 4 thresholds with the matrix
{informational:[Slack], warning:[Slack,Email],
critical:[PagerDuty,OpsGenie,Email]}, and asserts the exact
recipient counts: Slack=3, Email=3, PagerDuty=1, OpsGenie=1, no
Teams, no Webhook. The OneChannelFails test pins that PagerDuty
returning a 503 does NOT skip Slack/Email at the same threshold.
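
The recipient counts can be checked by hand with a sketch like the
one below. The threshold values (30, 14, 7, 0) and the
threshold-to-tier assignment are assumptions made for illustration;
the commit message only says "canonical 4 thresholds". Any assignment
with one informational, two warning and one critical threshold gives
the same totals. recipientCounts is a hypothetical helper, not part
of the test file.

```go
package main

import "fmt"

// Assumed thresholds and tier assignment (not confirmed by the source).
var assumedThresholds = []int{30, 14, 7, 0}

var assumedSeverity = map[int]string{
	30: "informational",
	14: "warning",
	7:  "warning",
	0:  "critical",
}

// Channel matrix as quoted in the commit message.
var matrix = map[string][]string{
	"informational": {"Slack"},
	"warning":       {"Slack", "Email"},
	"critical":      {"PagerDuty", "OpsGenie", "Email"},
}

// recipientCounts tallies how many alerts each channel receives when
// a certificate crosses every threshold once.
func recipientCounts(thresholds []int, severity map[int]string,
	channels map[string][]string) map[string]int {
	counts := make(map[string]int)
	for _, t := range thresholds {
		for _, ch := range channels[severity[t]] {
			counts[ch]++
		}
	}
	return counts
}

func main() {
	counts := recipientCounts(assumedThresholds, assumedSeverity, matrix)
	for _, ch := range []string{"Slack", "Email", "PagerDuty", "OpsGenie", "Teams", "Webhook"} {
		fmt.Printf("%s=%d\n", ch, counts[ch])
	}
}
```

Slack appears in the informational tier once and the warning tier
twice, Email in the warning tier twice and the critical tier once,
which accounts for the two counts of 3.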

Drive-by fix (internal/service/testutil_test.go): the existing
mockNotifRepo.List ignored its filter and returned all rows, which
let legacy tests pass on dedup-via-substring even though the
postgres repo actually applied the filter. Updated the mock to
honour CertificateID / Type / Status / Channel / MessageLike
filters in the same shape as the postgres implementation
(internal/repository/postgres/notification.go). All pre-existing
service tests still pass — the legacy test suite happened to be
robust to the mock filter doing nothing.
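
A filter-honouring mock along these lines is sketched below. The
types notifEvent and notifFilter and the helpers matches and list
are hypothetical and much smaller than the real mockNotifRepo; the
LIKE handling only covers the "%needle%" shape used by the threshold
dedup, not general SQL patterns.

```go
package main

import (
	"fmt"
	"strings"
)

// notifEvent and notifFilter are cut-down, hypothetical stand-ins.
type notifEvent struct {
	CertificateID string
	Channel       string
	Message       string
}

type notifFilter struct {
	CertificateID string
	Channel       string
	MessageLike   string // only "%needle%" patterns are handled here
}

// matches applies every non-empty filter field; all must agree.
func matches(e notifEvent, f notifFilter) bool {
	if f.CertificateID != "" && e.CertificateID != f.CertificateID {
		return false
	}
	if f.Channel != "" && e.Channel != f.Channel {
		return false
	}
	if f.MessageLike != "" {
		needle := strings.Trim(f.MessageLike, "%")
		if !strings.Contains(e.Message, needle) {
			return false
		}
	}
	return true
}

// list returns only the rows the filter selects.
func list(rows []notifEvent, f notifFilter) []notifEvent {
	var out []notifEvent
	for _, e := range rows {
		if matches(e, f) {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	rows := []notifEvent{
		{"cert-1", "Slack", "expires in 30 days [threshold:30]"},
	}
	slack := list(rows, notifFilter{CertificateID: "cert-1", Channel: "Slack", MessageLike: "%[threshold:30]%"})
	pager := list(rows, notifFilter{CertificateID: "cert-1", Channel: "PagerDuty", MessageLike: "%[threshold:30]%"})
	fmt.Println("slack rows:", len(slack), "pagerduty rows:", len(pager))
}
```

A mock that ignored the filter would return the Slack row for the
PagerDuty query as well, so a per-channel dedup test could not tell
the two channels apart.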

Documentation:
  - docs/connectors.md Notifier section gains "Routing expiry
    alerts across channels" — operator-facing, JSON example,
    procurement playbook ("How do I make sure PagerDuty pages on
    the T-1 alert?"), debug recipe via SQL on audit_events +
    notification_events + Prometheus.
  - docs/runbook-expiry-alerts.md — sysadmin-grade flowchart,
    per-policy channel-matrix configuration recipes, "did the on-
    call team get paged?" SQL queries, cardinality budget, V3-Pro
    forward path.
  - cowork/WORKSPACE-ROADMAP.md gains "Multi-channel expiry
    alerts: per-owner routing" V3-Pro entry under Adapter
    hardening.

Out of scope (intentional, flagged in V3-Pro forward path):
  - Per-owner / per-team / per-tenant channel routing (matrix is
    per-policy today, not per-owner).
  - Calendar-aware suppression (no T-30 alerts on weekends).
  - Escalation chains (T-1 unanswered for 30m → escalate).
  - Per-channel rate limiting (downstream of I-005 retry+DLQ).

CHANGELOG.md is intentionally not hand-edited per CHANGELOG.md
itself ("no longer maintains a hand-edited per-version changelog;
per-release notes are auto-generated from commit messages between
consecutive tags").

Verified locally:
- gofmt clean.
- go vet ./internal/domain/... ./internal/service/...
  ./internal/api/handler/... ./cmd/server/...  clean.
  (./internal/repository/postgres/... vet failed on transitive
  testcontainers/docker module download — sandbox disk pressure,
  not a code issue; postgres-repo build succeeds and tests pass.)
- go test -short -count=1 ./internal/domain/...
  ./internal/service/... ./internal/api/handler/...  green.
- go test -race -count=10 -run 'TestExpiryAlerts'
  ./internal/service/...  green (per-channel dedup race-free).

Reference: cowork/infisical-deep-research-results.md Part 5 Rank 4.
Acquisition prompt: cowork/rank-4-multichannel-expiry-alerts-prompt.md.
2026-05-03 22:12:32 +00:00


package service
import (
"context"
"fmt"
"log/slog"
"time"
"github.com/shankar0123/certctl/internal/domain"
"github.com/shankar0123/certctl/internal/repository"
)
// I-005 retry + DLQ knobs. These pin the operator-approved retry budget and
// the defense-in-depth ceiling on the exponential backoff curve used by
// RetryFailedNotifications.
//
// Values match those the Phase 1 Red tests assert against (see
// i005MaxAttempts / i005BackoffCap in notification_test.go:600-608) — the
// production identifiers are distinct because this file and its tests share
// `package service`, so a single shared name would collide at compile time.
// The test comment explicitly notes "Phase 2 is free to thread this from
// config"; when that wiring lands, these become package-level defaults the
// scheduler can override. For now they are the single source of truth.
const (
// notifRetryMaxAttempts is the attempt budget *before* the current
// attempt: a row at retry_count == notifRetryMaxAttempts-1 that fails
// this tick transitions to 'dead' instead of being re-armed. The
// repository's ListRetryEligible filter also uses this value as a
// guard (`AND retry_count < $2`) so a DLQ row is never re-swept.
notifRetryMaxAttempts = 5
// notifRetryBackoffCap is the 1h ceiling on `2^retry_count` minutes.
// With max_attempts=5 the deepest actually-schedulable wait is 2^3=8m
// (retry_count=3 → 8m, then retry_count=4 → 'dead'), so the cap is a
// ceiling-assertion today — but it must stay in place so a later
// increase in max_attempts cannot push next_retry_at past 1h without
// an explicit policy decision.
notifRetryBackoffCap = time.Hour
// notifRetrySweepLimit caps a single retry tick at this many rows so
// a large burst of dead-letter-bound mail cannot monopolize the 2m
// tick budget. Mirrors the 1000-row cap on ProcessPendingNotifications
// at notification.go:244 for operational symmetry.
notifRetrySweepLimit = 1000
)
// NotificationService provides business logic for managing notifications.
type NotificationService struct {
notifRepo repository.NotificationRepository
ownerRepo repository.OwnerRepository
notifierRegistry map[string]Notifier
// expiryAlertMetrics — when set via SetExpiryAlertMetrics, every call
// to SendThresholdAlertOnChannel reports its outcome (success / failure)
// to the metric sink so the Prometheus exposer surfaces
// certctl_expiry_alerts_total{channel,threshold,result}. Rank 4 of the
// 2026-05-03 Infisical deep-research deliverable. Nil leaves the
// dispatch path unchanged (no metric emission, but alerts still fire).
expiryAlertMetrics ExpiryAlertRecorder
}
// ExpiryAlertRecorder is the metric-sink surface SendThresholdAlertOnChannel
// uses. result is one of: "success", "failure", "deduped". Implementations
// MUST be goroutine-safe — RecordExpiryAlert is called from the renewal
// loop's own goroutine on every threshold-channel tick.
//
// service.ExpiryAlertMetrics satisfies this interface. cmd/server wires
// the same instance into the service (recording side) and into
// MetricsHandler (exposing side, for the Prometheus emitter).
type ExpiryAlertRecorder interface {
RecordExpiryAlert(channel string, threshold int, result string)
}
// SetExpiryAlertMetrics wires the per-(channel, threshold, result) counter
// table for expiry-alert dispatch. Pass nil to disable recording. Safe to
// call before any SendThresholdAlertOnChannel call; calling later just
// means earlier calls didn't increment the counters.
func (s *NotificationService) SetExpiryAlertMetrics(r ExpiryAlertRecorder) {
s.expiryAlertMetrics = r
}
// recordExpiryAlert is the internal hook used by SendThresholdAlertOnChannel
// to report per-(channel, threshold, result) counts. Nil-safe.
func (s *NotificationService) recordExpiryAlert(channel string, threshold int, result string) {
if s == nil || s.expiryAlertMetrics == nil {
return
}
s.expiryAlertMetrics.RecordExpiryAlert(channel, threshold, result)
}
// RecordExpiryAlertDeduped is the public hook RenewalService uses to report
// (channel, threshold, "deduped") — dedup happens before
// SendThresholdAlertOnChannel runs, so the call site is in the caller, not
// the dispatch helper. Kept on NotificationService rather than exposed on
// the recorder directly so callers don't need to know whether the recorder
// is wired.
func (s *NotificationService) RecordExpiryAlertDeduped(channel string, threshold int) {
s.recordExpiryAlert(channel, threshold, "deduped")
}
// Notifier defines the interface for notification channels (email, Slack, webhooks, etc.).
type Notifier interface {
// Send delivers a notification and returns error if unsuccessful.
Send(ctx context.Context, recipient string, subject string, body string) error
// Channel returns the channel identifier (e.g., "email", "slack").
Channel() string
}
// NewNotificationService creates a new notification service.
func NewNotificationService(
notifRepo repository.NotificationRepository,
notifierRegistry map[string]Notifier,
) *NotificationService {
return &NotificationService{
notifRepo: notifRepo,
notifierRegistry: notifierRegistry,
}
}
// SetOwnerRepo sets the owner repository for email resolution.
// Called after construction to avoid circular dependency during initialization.
func (s *NotificationService) SetOwnerRepo(ownerRepo repository.OwnerRepository) {
s.ownerRepo = ownerRepo
}
// resolveRecipient resolves an owner ID to an email address.
// Falls back to the raw owner ID if the owner repo is not set or lookup fails.
func (s *NotificationService) resolveRecipient(ctx context.Context, ownerID string) string {
if s.ownerRepo == nil || ownerID == "" {
return ownerID
}
owner, err := s.ownerRepo.Get(ctx, ownerID)
if err != nil || owner == nil || owner.Email == "" {
return ownerID
}
return owner.Email
}
// SendExpirationWarning sends a certificate expiration warning for a specific threshold.
func (s *NotificationService) SendExpirationWarning(ctx context.Context, cert *domain.ManagedCertificate, daysUntilExpiry int) error {
return s.SendThresholdAlert(ctx, cert, daysUntilExpiry, daysUntilExpiry)
}
// SendThresholdAlert sends an expiration alert for a specific threshold via
// the Email channel. Preserved for backwards-compat with non-policy callers
// (admin "send test alert" surfaces in the GUI, etc.); equivalent to
// SendThresholdAlertOnChannel(ctx, cert, days, threshold,
// domain.NotificationChannelEmail).
//
// Policy-driven dispatch in RenewalService.sendThresholdAlerts uses
// SendThresholdAlertOnChannel directly with the channel resolved from the
// per-policy AlertChannels matrix. Rank 4 of the 2026-05-03 Infisical
// deep-research deliverable.
func (s *NotificationService) SendThresholdAlert(ctx context.Context, cert *domain.ManagedCertificate, daysUntilExpiry int, threshold int) error {
return s.SendThresholdAlertOnChannel(ctx, cert, daysUntilExpiry, threshold, domain.NotificationChannelEmail)
}
// SendThresholdAlertOnChannel sends an expiration alert for a specific
// (cert, threshold, channel) triple. The channel must be one of the
// closed-enum NotificationChannel values; an off-enum channel produces a
// failure metric increment, an ERROR log, and a returned error so the
// caller can react (typically: log and continue with the next channel in
// the policy's tier list; see RenewalService.sendThresholdAlerts).
//
// The notification record is persisted with the channel field set to the
// requested value, and the message body carries the [threshold:N] tag for
// dedup at HasThresholdNotification's substring filter. Combined with the
// repository.NotificationFilter.Channel field, this gives us per-(cert,
// threshold, channel) dedup so a transient PagerDuty 5xx today does NOT
// suppress today's Slack delivery and tomorrow's PagerDuty retry will
// still fire.
//
// Result is reported to expiryAlertMetrics (when wired): "success" on
// successful send, "failure" on an invalid channel, a persistence error,
// or a send error. "deduped" results are reported by the caller
// (sendThresholdAlerts) since dedup happens before this method runs.
func (s *NotificationService) SendThresholdAlertOnChannel(
ctx context.Context, cert *domain.ManagedCertificate, daysUntilExpiry int,
threshold int, channel domain.NotificationChannel,
) error {
if !domain.IsValidNotificationChannel(string(channel)) {
s.recordExpiryAlert(string(channel), threshold, "failure")
// Emit the ERROR log the doc comment promises for off-enum channels.
slog.Error("invalid notification channel for expiry alert",
"channel", string(channel), "threshold", threshold)
return fmt.Errorf("invalid notification channel %q for threshold %d", channel, threshold)
}
var body string
if threshold <= 0 {
body = fmt.Sprintf(
"[EXPIRED] The certificate for %s has expired (%s).\n\nImmediate action required.\n\n[threshold:%d]",
cert.CommonName, cert.ExpiresAt.Format("2006-01-02"), threshold,
)
} else {
body = fmt.Sprintf(
"The certificate for %s will expire in %d days (%s).\n\nPlease schedule renewal.\n\n[threshold:%d]",
cert.CommonName, daysUntilExpiry, cert.ExpiresAt.Format("2006-01-02"), threshold,
)
}
notif := &domain.NotificationEvent{
ID: generateID("notif"),
CertificateID: &cert.ID,
Type: domain.NotificationTypeExpirationWarning,
Channel: channel,
Recipient: s.resolveRecipient(ctx, cert.OwnerID),
Message: body,
Status: "pending",
CreatedAt: time.Now(),
}
if err := s.notifRepo.Create(ctx, notif); err != nil {
s.recordExpiryAlert(string(channel), threshold, "failure")
return fmt.Errorf("failed to create notification: %w", err)
}
if err := s.sendNotification(ctx, notif); err != nil {
s.recordExpiryAlert(string(channel), threshold, "failure")
return err
}
s.recordExpiryAlert(string(channel), threshold, "success")
return nil
}
// HasThresholdNotification checks whether an expiration warning has already
// been sent for a specific (cert, threshold) pair via the Email channel.
// Preserved for backwards-compat. Equivalent to
// HasThresholdNotificationOnChannel(ctx, certID, threshold,
// domain.NotificationChannelEmail).
//
// New callers driven by the per-policy channel matrix should use
// HasThresholdNotificationOnChannel directly with the explicit channel —
// see RenewalService.sendThresholdAlerts.
func (s *NotificationService) HasThresholdNotification(ctx context.Context, certID string, threshold int) (bool, error) {
return s.HasThresholdNotificationOnChannel(ctx, certID, threshold, domain.NotificationChannelEmail)
}
// HasThresholdNotificationOnChannel reports whether an ExpirationWarning
// notification has already been persisted for a specific (cert, threshold,
// channel) triple. Used to dedupe per-channel fan-out so a successful
// PagerDuty page today doesn't fire again tomorrow when the renewal loop
// re-checks the same threshold (and so a transient PagerDuty 5xx today
// doesn't suppress tomorrow's successful retry).
//
// The match is on the substring "[threshold:N]" in the stored message body
// (the same dedup pattern used by HasThresholdNotification pre-2026-05-03)
// AND the channel column. Both filters apply; a match requires both.
//
// channel == "" preserves the legacy (cert, threshold) dedup for the same
// reason HasThresholdNotification kept its old shape — admin-surface
// callers still need that behaviour.
func (s *NotificationService) HasThresholdNotificationOnChannel(
ctx context.Context, certID string, threshold int, channel domain.NotificationChannel,
) (bool, error) {
filter := &repository.NotificationFilter{
CertificateID: certID,
Type: string(domain.NotificationTypeExpirationWarning),
MessageLike: fmt.Sprintf("%%[threshold:%d]%%", threshold),
Channel: string(channel),
PerPage: 1,
}
existing, err := s.notifRepo.List(ctx, filter)
if err != nil {
return false, fmt.Errorf("failed to check existing notifications: %w", err)
}
return len(existing) > 0, nil
}
// SendRenewalNotification sends a renewal success or failure notification.
func (s *NotificationService) SendRenewalNotification(ctx context.Context, cert *domain.ManagedCertificate, success bool, err error) error {
var body string
if success {
body = fmt.Sprintf(
"The certificate for %s has been successfully renewed.\n\nNew expiry: %s",
cert.CommonName, cert.ExpiresAt.Format("2006-01-02"),
)
} else {
body = fmt.Sprintf(
"The certificate for %s failed to renew.\n\nError: %v\n\nPlease investigate.",
cert.CommonName, err,
)
}
var notifType domain.NotificationType
if success {
notifType = domain.NotificationTypeRenewalSuccess
} else {
notifType = domain.NotificationTypeRenewalFailure
}
notif := &domain.NotificationEvent{
ID: generateID("notif"),
CertificateID: &cert.ID,
Type: notifType,
Channel: domain.NotificationChannelEmail,
Recipient: s.resolveRecipient(ctx, cert.OwnerID),
Message: body,
Status: "pending",
CreatedAt: time.Now(),
}
if err := s.notifRepo.Create(ctx, notif); err != nil {
return fmt.Errorf("failed to create notification: %w", err)
}
return s.sendNotification(ctx, notif)
}
// SendDeploymentNotification sends a deployment success or failure notification.
func (s *NotificationService) SendDeploymentNotification(ctx context.Context, cert *domain.ManagedCertificate, target *domain.DeploymentTarget, success bool, err error) error {
var body string
if success {
body = fmt.Sprintf(
"The certificate for %s has been successfully deployed to %s.",
cert.CommonName, target.Name,
)
} else {
body = fmt.Sprintf(
"The certificate for %s failed to deploy to %s.\n\nError: %v\n\nPlease investigate.",
cert.CommonName, target.Name, err,
)
}
notifType := domain.NotificationTypeDeploymentSuccess
if !success {
notifType = domain.NotificationTypeDeploymentFailure
}
notif := &domain.NotificationEvent{
ID: generateID("notif"),
CertificateID: &cert.ID,
Type: notifType,
Channel: domain.NotificationChannelEmail,
Recipient: s.resolveRecipient(ctx, cert.OwnerID),
Message: body,
Status: "pending",
CreatedAt: time.Now(),
}
if err := s.notifRepo.Create(ctx, notif); err != nil {
return fmt.Errorf("failed to create notification: %w", err)
}
return s.sendNotification(ctx, notif)
}
// SendRevocationNotification sends a certificate revocation notification.
func (s *NotificationService) SendRevocationNotification(ctx context.Context, cert *domain.ManagedCertificate, reason string) error {
body := fmt.Sprintf(
"[REVOKED] The certificate for %s has been revoked.\n\nReason: %s\n\nThis certificate is no longer valid.",
cert.CommonName, reason,
)
notif := &domain.NotificationEvent{
ID: generateID("notif"),
CertificateID: &cert.ID,
Type: domain.NotificationTypeRevocation,
Channel: domain.NotificationChannelWebhook,
Recipient: s.resolveRecipient(ctx, cert.OwnerID),
Message: body,
Status: "pending",
CreatedAt: time.Now(),
}
if err := s.notifRepo.Create(ctx, notif); err != nil {
return fmt.Errorf("failed to create revocation notification: %w", err)
}
// Also send via email channel
emailNotif := &domain.NotificationEvent{
ID: generateID("notif"),
CertificateID: &cert.ID,
Type: domain.NotificationTypeRevocation,
Channel: domain.NotificationChannelEmail,
Recipient: s.resolveRecipient(ctx, cert.OwnerID),
Message: body,
Status: "pending",
CreatedAt: time.Now(),
}
if err := s.notifRepo.Create(ctx, emailNotif); err != nil {
slog.Error("failed to create email revocation notification", "error", err)
}
// Attempt immediate send for both
if err := s.sendNotification(ctx, notif); err != nil {
slog.Error("failed to send webhook revocation notification", "error", err)
}
return s.sendNotification(ctx, emailNotif)
}
// ProcessPendingNotifications sends all pending notifications in batch.
func (s *NotificationService) ProcessPendingNotifications(ctx context.Context) error {
filter := &repository.NotificationFilter{
Status: "pending",
PerPage: 1000,
}
pending, err := s.notifRepo.List(ctx, filter)
if err != nil {
return fmt.Errorf("failed to list pending notifications: %w", err)
}
var failedCount int
for _, notif := range pending {
if err := s.sendNotification(ctx, notif); err != nil {
slog.Error("failed to send notification", "notification_id", notif.ID, "error", err)
failedCount++
}
}
if failedCount > 0 {
return fmt.Errorf("failed to send %d out of %d notifications", failedCount, len(pending))
}
return nil
}
// sendNotification delivers a single notification via the appropriate channel.
func (s *NotificationService) sendNotification(ctx context.Context, notif *domain.NotificationEvent) error {
// Get the appropriate notifier for the channel
notifier, ok := s.notifierRegistry[string(notif.Channel)]
if !ok {
// No notifier configured for this channel — mark as sent (demo mode)
if updateErr := s.notifRepo.UpdateStatus(ctx, notif.ID, "sent", time.Now()); updateErr != nil {
slog.Error("failed to update notification status", "notification_id", notif.ID, "error", updateErr)
}
return nil
}
// Send the notification
if err := notifier.Send(ctx, notif.Recipient, string(notif.Type), notif.Message); err != nil {
// Update status to failed
if updateErr := s.notifRepo.UpdateStatus(ctx, notif.ID, "failed", time.Time{}); updateErr != nil {
slog.Error("failed to update notification status", "notification_id", notif.ID, "error", updateErr)
}
return fmt.Errorf("failed to send via %s: %w", notif.Channel, err)
}
// Update status to sent
if err := s.notifRepo.UpdateStatus(ctx, notif.ID, "sent", time.Now()); err != nil {
slog.Error("failed to update notification status", "notification_id", notif.ID, "error", err)
}
return nil
}
// RegisterNotifier registers a new notification channel handler.
func (s *NotificationService) RegisterNotifier(channel string, notifier Notifier) {
if s.notifierRegistry == nil {
s.notifierRegistry = make(map[string]Notifier)
}
s.notifierRegistry[channel] = notifier
}
// GetNotificationHistory returns all notifications for a certificate.
func (s *NotificationService) GetNotificationHistory(ctx context.Context, certID string) ([]*domain.NotificationEvent, error) {
filter := &repository.NotificationFilter{
CertificateID: certID,
PerPage: 1000,
}
notifications, err := s.notifRepo.List(ctx, filter)
if err != nil {
return nil, fmt.Errorf("failed to list notifications: %w", err)
}
return notifications, nil
}
// ListNotifications returns paginated notifications (handler interface method).
func (s *NotificationService) ListNotifications(ctx context.Context, page, perPage int) ([]domain.NotificationEvent, int64, error) {
if page < 1 {
page = 1
}
if perPage < 1 {
perPage = 50
}
filter := &repository.NotificationFilter{
Page: page,
PerPage: perPage,
}
notifications, err := s.notifRepo.List(ctx, filter)
if err != nil {
return nil, 0, fmt.Errorf("failed to list notifications: %w", err)
}
var result []domain.NotificationEvent
for _, n := range notifications {
if n != nil {
result = append(result, *n)
}
}
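// NOTE: total is the number of rows on this page, not a table-wide count;
// the List call used here returns no separate total.
total := int64(len(result))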
return result, total, nil
}
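// GetNotification returns a single notification (handler interface method).
//
// The repository filter doesn't support ID directly, so this is a bounded
// scan: it lists up to 1000 rows (the same cap GetNotificationHistory uses)
// and matches on ID in memory. A notification outside that window is
// reported as not found.
func (s *NotificationService) GetNotification(ctx context.Context, id string) (*domain.NotificationEvent, error) {
filter := &repository.NotificationFilter{
// PerPage must cover more than one row: with PerPage: 1 only the
// first row returned by the repository could ever match the ID.
PerPage: 1000,
}
notifications, err := s.notifRepo.List(ctx, filter)
if err != nil {
return nil, fmt.Errorf("failed to get notification: %w", err)
}
// Find notification with matching ID (repository filter doesn't support ID directly)
for _, n := range notifications {
if n != nil && n.ID == id {
return n, nil
}
}
return nil, fmt.Errorf("notification not found")
}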
// MarkAsRead marks a notification as read (handler interface method).
func (s *NotificationService) MarkAsRead(ctx context.Context, id string) error {
return s.notifRepo.UpdateStatus(ctx, id, "read", time.Now())
}
// ─── I-005 retry + DLQ surface (Phase 2 Green) ───────────────────────────
//
// The three methods below close the retry loop the Phase 1 Red tests pin at
// notification_test.go:600-917 and notification_handler_test.go:443-519:
//
// 1. RetryFailedNotifications — scheduler entry point. Pulls failed rows
// whose next_retry_at has elapsed, retries delivery, rewrites retry
// bookkeeping per the pre-increment backoff contract, and transitions
// exhausted rows to 'dead' (DLQ). Per-row errors never bubble — a
// single bad recipient cannot stall the tick. Mirrors the ordering
// the ProcessPendingNotifications loop uses at notification.go:242.
//
// 2. RequeueNotification — operator-driven escape hatch from 'dead' back
// to 'pending'. Pass-through to the repo's Requeue method with clean
// error wrapping so repo-layer failures ("pg: deadlock detected")
// surface in the UI instead of silently succeeding.
//
// 3. ListNotificationsByStatus — Dead letter tab support. Thin filter
// wrapper around the existing List query; the Phase 2 Green handler
// routes `?status=…` through this method while preserving the
// unfiltered path through ListNotifications (handler_test pins both).
//
// Sibling scheduler loops I-001 (job retry) and I-003 (job timeout) already
// ship the 10-loop topology these methods plug into; the 11th loop added
// by this milestone calls RetryFailedNotifications on a 2m tick, matching
// the CERTCTL_NOTIFICATION_RETRY_INTERVAL default pinned in config/
// scheduler Phase 2 Green edits that follow this one.

// RetryFailedNotifications is the scheduler entry point for the I-005
// retry sweep. Semantics (pinned by notification_test.go:635-843):
//
// - A ListRetryEligible failure short-circuits with a wrapped error so
// the caller's tick counter reflects the outage. Crucially, zero
// notifier.Send calls fire in this path — we never got a canonical
// set of rows, and issuing any sends risks double-delivery when the
// DB comes back.
//
// - Per-row failures are logged but NEVER returned. That contract comes
// straight from ProcessPendingNotifications (notification.go:242-267);
// the retry loop inherits it so a single 4xx response can't freeze
// every downstream row in the sweep.
//
// - Success promotes the row directly to 'sent' via UpdateStatus. The
// retry_count field is *not* incremented on success — that would
// falsify the audit-trail signal "this row was delivered on attempt
// N". The mock's UpdateStatus does a plain status write with no retry
// mutation (testutil_test.go:446-459), matching the postgres impl.
//
// - Failure uses pre-increment exponential backoff:
// wait = min(2^retry_count * time.Minute, notifRetryBackoffCap)
// where retry_count is the row's value *before* this attempt. The
// repo layer's RecordFailedAttempt then increments retry_count by 1
// server-side. This asymmetry keeps the service stateless — the
// service reads retry_count to compute the wait, but never writes it
// directly; the write is exclusively the repo's responsibility.
//
// - Exhaustion transitions to 'dead' when retry_count == max-1, because
// RecordFailedAttempt's ++ would push retry_count to max and the next
// sweep's `retry_count < max` filter in ListRetryEligible would then
// silently skip the row forever (a zombie-failed row nobody sees).
// MarkAsDead clears next_retry_at to evict the row from the partial
// retry-sweep index as well, so it stops scanning past dead rows.
//
// - A row whose Channel has no registered notifier is promoted to
// 'sent' (demo-mode parity with sendNotification's fallback at
// notification.go:272-279). This branch should not normally fire for
// retry rows — they were created *by* a notifier that failed — but
// defensive handling guards against config drift (notifier disabled
// between Create and retry) that would otherwise wedge the row.
func (s *NotificationService) RetryFailedNotifications(ctx context.Context) error {
now := time.Now()
rows, err := s.notifRepo.ListRetryEligible(ctx, now, notifRetryMaxAttempts, notifRetrySweepLimit)
if err != nil {
return fmt.Errorf("failed to list retry-eligible notifications: %w", err)
}
for _, row := range rows {
if row == nil {
continue
}
notifier, ok := s.notifierRegistry[string(row.Channel)]
if !ok {
// No notifier wired for this channel — promote to 'sent' to
// avoid looping forever over a row that has nowhere to go.
// See notification.go:272-279 for the sibling demo-mode path.
if updateErr := s.notifRepo.UpdateStatus(ctx, row.ID, string(domain.NotificationStatusSent), time.Now()); updateErr != nil {
slog.Error("failed to promote retry row with missing notifier to sent",
"notification_id", row.ID, "channel", row.Channel, "error", updateErr)
}
continue
}
sendErr := notifier.Send(ctx, row.Recipient, string(row.Type), row.Message)
if sendErr == nil {
// Success: promote straight to 'sent' without touching
// retry_count — the audit trail must preserve "this row was
// delivered on attempt N", and the mock's UpdateStatus is a
// plain status write (no retry_count reset). Errors here are
// logged, never returned.
if updateErr := s.notifRepo.UpdateStatus(ctx, row.ID, string(domain.NotificationStatusSent), time.Now()); updateErr != nil {
slog.Error("failed to mark retried notification as sent",
"notification_id", row.ID, "error", updateErr)
}
continue
}
// Failure path. Compute pre-increment backoff first so the
// exhaustion branch and the reschedule branch see an identical
// `wait` derivation — easier to audit against the test window
// assertions at notification_test.go:739-743 and :796-801.
wait := time.Duration(1<<row.RetryCount) * time.Minute
if wait > notifRetryBackoffCap {
wait = notifRetryBackoffCap
}
// Exhaustion: this attempt consumes the final slot of the attempt
// budget. Transition to 'dead' and let MarkAsDead clear
// next_retry_at so the retry-sweep index stops hitting the row.
if row.RetryCount >= notifRetryMaxAttempts-1 {
if markErr := s.notifRepo.MarkAsDead(ctx, row.ID, sendErr.Error()); markErr != nil {
slog.Error("failed to mark exhausted notification as dead",
"notification_id", row.ID, "retry_count", row.RetryCount,
"send_error", sendErr, "mark_error", markErr)
}
continue
}
// Non-terminal: hand the lastError + nextRetryAt off to the repo,
// which increments retry_count by exactly 1 and keeps the row in
// 'failed' state so the next tick picks it up.
nextRetryAt := time.Now().Add(wait)
if recErr := s.notifRepo.RecordFailedAttempt(ctx, row.ID, sendErr.Error(), nextRetryAt); recErr != nil {
slog.Error("failed to record notification retry attempt",
"notification_id", row.ID, "retry_count", row.RetryCount,
"next_retry_at", nextRetryAt, "send_error", sendErr, "record_error", recErr)
}
}
return nil
}
// RequeueNotification is the operator-driven escape hatch from 'dead' back
// to 'pending'. It resets all retry bookkeeping — retry_count → 0,
// next_retry_at → NULL, last_error → NULL — so ProcessPendingNotifications
// treats the requeued row as a fresh attempt on its next tick. Identical on
// the wire to a newly-created notification.
//
// Behavior contract (pinned by notification_test.go:849-917):
//
// - Success path delegates to the repo's Requeue, which performs the
// status/retry_count/next_retry_at/last_error reset atomically. The
// service adds no extra bookkeeping; the audit trail already captures
// the transition via the upstream API call.
//
// - Error path wraps the repo error with context so a failure like
// "pg: deadlock detected" surfaces in the handler response and the
// operator UI. The service has no fallback — a silent "success" that
// didn't actually mutate the row would be worse than a loud error.
func (s *NotificationService) RequeueNotification(ctx context.Context, id string) error {
if err := s.notifRepo.Requeue(ctx, id); err != nil {
return fmt.Errorf("failed to requeue notification: %w", err)
}
return nil
}
// ListNotificationsByStatus returns paginated notifications filtered by
// status. It mirrors ListNotifications's shape but threads a Status filter
// into the NotificationFilter so the Phase 2 Green handler can route
// `?status=dead` (Dead letter tab) through this method while keeping the
// unfiltered path on ListNotifications for backward compat.
//
// Pinned by notification_handler_test.go:443-519 — the handler test asserts
// that a request with `?status=dead&page=1&per_page=50` lands on exactly
// this signature (`status string, page, perPage int`) and that requests
// without a status param do NOT call it. Keep the returned shape identical
// to ListNotifications so the handler can reuse its JSON-encoding path.
func (s *NotificationService) ListNotificationsByStatus(ctx context.Context, status string, page, perPage int) ([]domain.NotificationEvent, int64, error) {
if page < 1 {
page = 1
}
if perPage < 1 {
perPage = 50
}
filter := &repository.NotificationFilter{
Status: status,
Page: page,
PerPage: perPage,
}
notifications, err := s.notifRepo.List(ctx, filter)
if err != nil {
return nil, 0, fmt.Errorf("failed to list notifications by status: %w", err)
}
result := make([]domain.NotificationEvent, 0, len(notifications))
for _, n := range notifications {
if n != nil {
result = append(result, *n)
}
}
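// NOTE: total is the number of rows on this page, not a table-wide count;
// the List call used here returns no separate total.
total := int64(len(result))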
return result, total, nil
}
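
The pre-increment backoff and exhaustion rule described in the
RetryFailedNotifications comments can be traced with a small
standalone sketch. The names maxAttempts, backoffCap and nextStep are
hypothetical and simply mirror notifRetryMaxAttempts and
notifRetryBackoffCap; the repository calls are left out entirely.

```go
package main

import (
	"fmt"
	"time"
)

// Illustrative copies of the retry knobs in the file above.
const (
	maxAttempts = 5
	backoffCap  = time.Hour
)

// nextStep reports what a failed attempt does to a row whose
// retry_count (before this attempt) is retryCount: either the wait
// until the next retry, or dead=true when the budget is exhausted.
// Assumption: retryCount is non-negative, as the repository guarantees.
func nextStep(retryCount int) (wait time.Duration, dead bool) {
	if retryCount >= maxAttempts-1 {
		return 0, true
	}
	wait = time.Duration(1<<retryCount) * time.Minute
	if wait > backoffCap {
		wait = backoffCap
	}
	return wait, false
}

func main() {
	for rc := 0; rc < maxAttempts; rc++ {
		wait, dead := nextStep(rc)
		if dead {
			fmt.Printf("retry_count=%d -> dead\n", rc)
			continue
		}
		fmt.Printf("retry_count=%d -> wait %s\n", rc, wait)
	}
}
```

With a budget of 5 the schedule is waits of 1, 2, 4 and 8 minutes,
then the row goes to the dead-letter state, so the one-hour cap is
never reached unless the budget is raised.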