I-005: notification retry loop + dead-letter queue

Critical alerts can no longer be silently dropped by a transient
notifier failure. Failed notification attempts now ride an exponential
backoff retry loop, with a 5-attempt budget before promotion to the
dead-letter queue for operator intervention.

Schema (migration 000016, idempotent):
- retry_count INTEGER NOT NULL DEFAULT 0
- next_retry_at TIMESTAMPTZ
- last_error TEXT
- idx_notification_events_retry_sweep partial index
  (next_retry_at) WHERE status='failed' AND next_retry_at IS NOT NULL
  Dead rows clear next_retry_at so the index stops matching them.

Service contract:
- NotificationService.RetryFailedNotifications drives 2^n-minute
  exponential backoff capped at 1h (notifRetryBackoffCap) with
  5-attempt budget (notifRetryMaxAttempts).
- Exhaustion (RetryCount >= notifRetryMaxAttempts-1) promotes to
  status='dead' via MarkAsDead.
- Non-terminal failures record via RecordFailedAttempt.
- Success path promotes to 'sent' without touching retry_count
  (audit preserves "delivered on attempt N").
- Missing-notifier branch defensively promotes to 'sent' to avoid
  wedging a row on a deleted channel.
- RequeueNotification operator escape hatch atomically resets
  retry_count -> 0, next_retry_at -> NULL, last_error -> NULL,
  status -> pending via notifRepo.Requeue.

Scheduler:
- New always-on notificationRetryLoop wired into the base loop set at
  CERTCTL_NOTIFICATION_RETRY_INTERVAL (default 2m).
- sync/atomic.Bool idempotency guard.
- sync.WaitGroup shutdown drain via WaitForCompletion.

StatsService:
- SetNotifRepo setter pattern preserves 9 pre-existing
  NewStatsService call sites (main.go + stats_test.go + 8 digest
  tests) without touching the constructor signature.
- DashboardSummary.NotificationsDead populated via
  notifRepo.CountByStatus(ctx, "dead") — nil-safe when unwired
  (reports zero on systems without a notification repository).
- CountByStatus error is non-fatal (dashboard summary is
  best-effort for this field).
- Prometheus certctl_notification_dead_total counter emitted from
  the same snapshot.

Handler:
- New POST /api/v1/notifications/{id}/requeue endpoint.
- dead status surfaces to MCP + CLI.

Frontend:
- NotificationsPage gains two-tab toolbar ("All" / "Dead letter")
  with queryKey: ['notifications', activeTab] so switching tabs
  doesn't serve stale data until the 30s refetch.
- Dead rows surface "Retry {n}/5" + truncated last_error with
  full-text title tooltip.
- Requeue mutation wrapped as
    mutationFn: (id: string) => requeueNotification(id)
  to prevent react-query v5's positional context argument from
  leaking into the API client — pinned against future refactors
  by strict-match toHaveBeenCalledWith('notif-dead-001') in
  NotificationsPage.test.tsx:181.

Closes I-005.
This commit is contained in:
shankar0123
2026-04-19 15:17:27 +00:00
parent 707d8de4fb
commit 675b87ba63
33 changed files with 3758 additions and 228 deletions
+52 -28
View File
@@ -41,20 +41,26 @@ type MetricsResponse struct {
// MetricsGauge represents gauge metrics (point-in-time values).
type MetricsGauge struct {
CertificateTotal int64 `json:"certificate_total"`
CertificateActive int64 `json:"certificate_active"`
CertificateExpiringSoon int64 `json:"certificate_expiring_soon"` // Within 30d
CertificateExpired int64 `json:"certificate_expired"`
CertificateRevoked int64 `json:"certificate_revoked"`
AgentTotal int64 `json:"agent_total"`
AgentOnline int64 `json:"agent_online"`
JobPending int64 `json:"job_pending"`
CertificateTotal int64 `json:"certificate_total"`
CertificateActive int64 `json:"certificate_active"`
CertificateExpiringSoon int64 `json:"certificate_expiring_soon"` // Within 30d
CertificateExpired int64 `json:"certificate_expired"`
CertificateRevoked int64 `json:"certificate_revoked"`
AgentTotal int64 `json:"agent_total"`
AgentOnline int64 `json:"agent_online"`
JobPending int64 `json:"job_pending"`
}
// MetricsCounter represents counter metrics (cumulative values).
type MetricsCounter struct {
JobCompletedTotal int64 `json:"job_completed_total"`
JobFailedTotal int64 `json:"job_failed_total"`
// NotificationsDeadTotal is a point-in-time count of notifications in the
// dead-letter queue (status="dead"), exposed here with the _total suffix
// to match Prometheus DB-snapshot counter convention (same semantics as
// JobFailedTotal and JobCompletedTotal — see metrics.md). I-005 DLQ
// observability gate.
NotificationsDeadTotal int64 `json:"notifications_dead_total"`
}
// UptimeMetric represents server uptime information.
@@ -95,18 +101,19 @@ func (h MetricsHandler) GetMetrics(w http.ResponseWriter, r *http.Request) {
// Build metrics response
metricsResp := MetricsResponse{
Gauge: MetricsGauge{
CertificateTotal: dashboardSummary.TotalCertificates,
CertificateActive: dashboardSummary.TotalCertificates - dashboardSummary.ExpiringCertificates - dashboardSummary.ExpiredCertificates - dashboardSummary.RevokedCertificates,
CertificateTotal: dashboardSummary.TotalCertificates,
CertificateActive: dashboardSummary.TotalCertificates - dashboardSummary.ExpiringCertificates - dashboardSummary.ExpiredCertificates - dashboardSummary.RevokedCertificates,
CertificateExpiringSoon: dashboardSummary.ExpiringCertificates,
CertificateExpired: dashboardSummary.ExpiredCertificates,
CertificateRevoked: dashboardSummary.RevokedCertificates,
AgentTotal: dashboardSummary.TotalAgents,
AgentOnline: dashboardSummary.ActiveAgents,
JobPending: dashboardSummary.PendingJobs,
CertificateExpired: dashboardSummary.ExpiredCertificates,
CertificateRevoked: dashboardSummary.RevokedCertificates,
AgentTotal: dashboardSummary.TotalAgents,
AgentOnline: dashboardSummary.ActiveAgents,
JobPending: dashboardSummary.PendingJobs,
},
Counter: MetricsCounter{
JobCompletedTotal: dashboardSummary.CompleteJobs,
JobFailedTotal: dashboardSummary.FailedJobs,
JobCompletedTotal: dashboardSummary.CompleteJobs,
JobFailedTotal: dashboardSummary.FailedJobs,
NotificationsDeadTotal: dashboardSummary.NotificationsDead,
},
Uptime: UptimeMetric{
UptimeSeconds: int64(time.Since(h.serverStarted).Seconds()),
@@ -200,6 +207,17 @@ func (h MetricsHandler) GetPrometheusMetrics(w http.ResponseWriter, r *http.Requ
fmt.Fprintf(w, "# TYPE certctl_job_failed_total counter\n")
fmt.Fprintf(w, "certctl_job_failed_total %d\n\n", dashboardSummary.FailedJobs)
// I-005: notification dead-letter queue depth. Emitted with the _total
// suffix to match the existing certctl_job_completed_total /
// certctl_job_failed_total convention for DB-snapshot counters — the
// value is a point-in-time COUNT(*) of notification_events rows where
// status='dead', not a monotonically increasing process-lifetime counter.
// Operators alert on this as "dead-letter depth" (thresholds in the
// I-005 spec: > 0 → warning, > 10 → critical).
fmt.Fprintf(w, "# HELP certctl_notification_dead_total Number of notifications in the dead-letter queue.\n")
fmt.Fprintf(w, "# TYPE certctl_notification_dead_total counter\n")
fmt.Fprintf(w, "certctl_notification_dead_total %d\n\n", dashboardSummary.NotificationsDead)
// Info — server uptime
fmt.Fprintf(w, "# HELP certctl_uptime_seconds Server uptime in seconds.\n")
fmt.Fprintf(w, "# TYPE certctl_uptime_seconds gauge\n")
@@ -209,15 +227,21 @@ func (h MetricsHandler) GetPrometheusMetrics(w http.ResponseWriter, r *http.Requ
// DashboardSummary mirrors the service.DashboardSummary for JSON unmarshaling.
// JSON tags must match the service-layer struct exactly.
type DashboardSummary struct {
TotalCertificates int64 `json:"total_certificates"`
ExpiringCertificates int64 `json:"expiring_certificates"`
ExpiredCertificates int64 `json:"expired_certificates"`
RevokedCertificates int64 `json:"revoked_certificates"`
ActiveAgents int64 `json:"active_agents"`
OfflineAgents int64 `json:"offline_agents"`
TotalAgents int64 `json:"total_agents"`
PendingJobs int64 `json:"pending_jobs"`
FailedJobs int64 `json:"failed_jobs"`
CompleteJobs int64 `json:"complete_jobs"`
CompletedAt time.Time `json:"completed_at"`
TotalCertificates int64 `json:"total_certificates"`
ExpiringCertificates int64 `json:"expiring_certificates"`
ExpiredCertificates int64 `json:"expired_certificates"`
RevokedCertificates int64 `json:"revoked_certificates"`
ActiveAgents int64 `json:"active_agents"`
OfflineAgents int64 `json:"offline_agents"`
TotalAgents int64 `json:"total_agents"`
PendingJobs int64 `json:"pending_jobs"`
FailedJobs int64 `json:"failed_jobs"`
CompleteJobs int64 `json:"complete_jobs"`
// NotificationsDead mirrors service.DashboardSummary.NotificationsDead.
// JSON tag "notifications_dead" must match the service-layer struct
// exactly — this cross-package mirror avoids a direct import cycle and
// is driven by the I-005 Prometheus counter emission path. See
// GetPrometheusMetrics and MetricsCounter.NotificationsDeadTotal.
NotificationsDead int64 `json:"notifications_dead"`
CompletedAt time.Time `json:"completed_at"`
}
@@ -13,9 +13,11 @@ import (
// MockNotificationService is a mock implementation of NotificationService interface.
type MockNotificationService struct {
ListNotificationsFn func(page, perPage int) ([]domain.NotificationEvent, int64, error)
GetNotificationFn func(id string) (*domain.NotificationEvent, error)
MarkAsReadFn func(id string) error
ListNotificationsFn func(page, perPage int) ([]domain.NotificationEvent, int64, error)
ListNotificationsByStatusFn func(status string, page, perPage int) ([]domain.NotificationEvent, int64, error)
GetNotificationFn func(id string) (*domain.NotificationEvent, error)
MarkAsReadFn func(id string) error
RequeueFn func(id string) error
}
func (m *MockNotificationService) ListNotifications(_ context.Context, page, perPage int) ([]domain.NotificationEvent, int64, error) {
@@ -25,6 +27,13 @@ func (m *MockNotificationService) ListNotifications(_ context.Context, page, per
return nil, 0, nil
}
func (m *MockNotificationService) ListNotificationsByStatus(_ context.Context, status string, page, perPage int) ([]domain.NotificationEvent, int64, error) {
if m.ListNotificationsByStatusFn != nil {
return m.ListNotificationsByStatusFn(status, page, perPage)
}
return nil, 0, nil
}
func (m *MockNotificationService) GetNotification(_ context.Context, id string) (*domain.NotificationEvent, error) {
if m.GetNotificationFn != nil {
return m.GetNotificationFn(id)
@@ -39,6 +48,13 @@ func (m *MockNotificationService) MarkAsRead(_ context.Context, id string) error
return nil
}
func (m *MockNotificationService) RequeueNotification(_ context.Context, id string) error {
if m.RequeueFn != nil {
return m.RequeueFn(id)
}
return nil
}
func TestListNotifications_Success(t *testing.T) {
now := time.Now()
certID := "mc-prod-001"
@@ -282,3 +298,224 @@ func TestMarkAsRead_EmptyID(t *testing.T) {
t.Fatalf("expected status 400, got %d", w.Code)
}
}
// ---------------------------------------------------------------------------
// I-005: Notification Retry + Dead-Letter Queue handler contract (Phase 1 Red)
//
// These tests pin the HTTP surface Phase 2 Green must implement:
//
// 1. POST /api/v1/notifications/{id}/requeue — flips a dead notification
// back to 'pending' so the retry loop can pick it up again. The handler
// method does not exist yet (NotificationHandler has no RequeueNotification
// method) and the NotificationService interface does not declare
// RequeueNotification — both are compile-time Red halts.
//
// 2. GET /api/v1/notifications?status=dead — routes dead-letter list requests
// through ListNotificationsByStatus instead of ListNotifications. The
// status-filter routing does not exist yet, so ListNotificationsByStatusFn
// never fires — a runtime Red halt.
// ---------------------------------------------------------------------------
func TestRequeueNotification_Success(t *testing.T) {
var requeuedID string
mock := &MockNotificationService{
RequeueFn: func(id string) error {
requeuedID = id
return nil
},
}
handler := NewNotificationHandler(mock)
req := httptest.NewRequest(http.MethodPost, "/api/v1/notifications/notif-dead-001/requeue", nil)
req = req.WithContext(contextWithRequestID())
w := httptest.NewRecorder()
handler.RequeueNotification(w, req)
if w.Code != http.StatusOK {
t.Fatalf("expected status 200, got %d", w.Code)
}
if requeuedID != "notif-dead-001" {
t.Errorf("expected requeued ID 'notif-dead-001', got '%s'", requeuedID)
}
var resp map[string]string
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
t.Fatalf("failed to decode response: %v", err)
}
if resp["status"] != "requeued" {
t.Errorf("expected status 'requeued', got '%s'", resp["status"])
}
}
func TestRequeueNotification_NotFound(t *testing.T) {
mock := &MockNotificationService{
RequeueFn: func(id string) error {
return ErrMockNotFound
},
}
handler := NewNotificationHandler(mock)
req := httptest.NewRequest(http.MethodPost, "/api/v1/notifications/nonexistent/requeue", nil)
req = req.WithContext(contextWithRequestID())
w := httptest.NewRecorder()
handler.RequeueNotification(w, req)
if w.Code != http.StatusNotFound {
t.Fatalf("expected status 404, got %d", w.Code)
}
}
func TestRequeueNotification_ServiceError(t *testing.T) {
mock := &MockNotificationService{
RequeueFn: func(id string) error {
return ErrMockServiceFailed
},
}
handler := NewNotificationHandler(mock)
req := httptest.NewRequest(http.MethodPost, "/api/v1/notifications/notif-dead-001/requeue", nil)
req = req.WithContext(contextWithRequestID())
w := httptest.NewRecorder()
handler.RequeueNotification(w, req)
if w.Code != http.StatusInternalServerError {
t.Fatalf("expected status 500, got %d", w.Code)
}
}
func TestRequeueNotification_MethodNotAllowed(t *testing.T) {
handler := NewNotificationHandler(&MockNotificationService{})
req := httptest.NewRequest(http.MethodGet, "/api/v1/notifications/notif-dead-001/requeue", nil)
w := httptest.NewRecorder()
handler.RequeueNotification(w, req)
if w.Code != http.StatusMethodNotAllowed {
t.Fatalf("expected status 405, got %d", w.Code)
}
}
func TestRequeueNotification_EmptyID(t *testing.T) {
handler := NewNotificationHandler(&MockNotificationService{})
req := httptest.NewRequest(http.MethodPost, "/api/v1/notifications//requeue", nil)
req = req.WithContext(contextWithRequestID())
w := httptest.NewRecorder()
handler.RequeueNotification(w, req)
if w.Code != http.StatusBadRequest {
t.Fatalf("expected status 400, got %d", w.Code)
}
}
func TestListNotifications_StatusFilter_Dead(t *testing.T) {
now := time.Now()
certID := "mc-prod-001"
lastErr := "SMTP connection refused"
nextRetry := now.Add(1 * time.Minute)
dead := domain.NotificationEvent{
ID: "notif-dead-001",
Type: domain.NotificationTypeExpirationWarning,
CertificateID: &certID,
Channel: domain.NotificationChannelEmail,
Recipient: "admin@example.com",
Message: "Certificate expiring in 7 days",
Status: "dead",
CreatedAt: now,
RetryCount: 5,
NextRetryAt: &nextRetry,
LastError: &lastErr,
}
var capturedStatus string
var capturedPage, capturedPerPage int
byStatusCalled := false
listCalled := false
mock := &MockNotificationService{
ListNotificationsFn: func(page, perPage int) ([]domain.NotificationEvent, int64, error) {
listCalled = true
return nil, 0, nil
},
ListNotificationsByStatusFn: func(status string, page, perPage int) ([]domain.NotificationEvent, int64, error) {
byStatusCalled = true
capturedStatus = status
capturedPage = page
capturedPerPage = perPage
return []domain.NotificationEvent{dead}, 1, nil
},
}
handler := NewNotificationHandler(mock)
req := httptest.NewRequest(http.MethodGet, "/api/v1/notifications?status=dead&page=1&per_page=50", nil)
req = req.WithContext(contextWithRequestID())
w := httptest.NewRecorder()
handler.ListNotifications(w, req)
if w.Code != http.StatusOK {
t.Fatalf("expected status 200, got %d", w.Code)
}
if !byStatusCalled {
t.Fatalf("expected ListNotificationsByStatus to be called for ?status=dead, but it was not")
}
if listCalled {
t.Errorf("ListNotifications should not be called when status filter is present")
}
if capturedStatus != "dead" {
t.Errorf("expected status='dead', got '%s'", capturedStatus)
}
if capturedPage != 1 {
t.Errorf("expected page=1, got %d", capturedPage)
}
if capturedPerPage != 50 {
t.Errorf("expected per_page=50, got %d", capturedPerPage)
}
var resp PagedResponse
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
t.Fatalf("failed to decode response: %v", err)
}
if resp.Total != 1 {
t.Errorf("expected total=1 dead notification, got %d", resp.Total)
}
}
func TestListNotifications_NoStatusFilter_CallsDefault(t *testing.T) {
// Pin the inverse: when no ?status= is provided, the handler must call the
// existing ListNotifications path (not ListNotificationsByStatus). Phase 2
// Green must not break the default listing behavior for the plain tab.
listCalled := false
byStatusCalled := false
mock := &MockNotificationService{
ListNotificationsFn: func(page, perPage int) ([]domain.NotificationEvent, int64, error) {
listCalled = true
return []domain.NotificationEvent{}, 0, nil
},
ListNotificationsByStatusFn: func(status string, page, perPage int) ([]domain.NotificationEvent, int64, error) {
byStatusCalled = true
return nil, 0, nil
},
}
handler := NewNotificationHandler(mock)
req := httptest.NewRequest(http.MethodGet, "/api/v1/notifications", nil)
req = req.WithContext(contextWithRequestID())
w := httptest.NewRecorder()
handler.ListNotifications(w, req)
if w.Code != http.StatusOK {
t.Fatalf("expected status 200, got %d", w.Code)
}
if !listCalled {
t.Errorf("expected ListNotifications to be called when no status filter is present")
}
if byStatusCalled {
t.Errorf("ListNotificationsByStatus should not be called when no status filter is present")
}
}
+61 -1
View File
@@ -11,10 +11,17 @@ import (
)
// NotificationService defines the service interface for notification operations.
//
// ListNotificationsByStatus and RequeueNotification were added to close coverage
// gap I-005: the Dead letter tab on the GUI (?status=dead) needs a scoped
// listing path, and the Requeue action needs a dedicated endpoint that flips a
// dead notification back to 'pending' so the retry sweep can pick it up again.
type NotificationService interface {
ListNotifications(ctx context.Context, page, perPage int) ([]domain.NotificationEvent, int64, error)
ListNotificationsByStatus(ctx context.Context, status string, page, perPage int) ([]domain.NotificationEvent, int64, error)
GetNotification(ctx context.Context, id string) (*domain.NotificationEvent, error)
MarkAsRead(ctx context.Context, id string) error
RequeueNotification(ctx context.Context, id string) error
}
// NotificationHandler handles HTTP requests for notification operations.
@@ -51,7 +58,20 @@ func (h NotificationHandler) ListNotifications(w http.ResponseWriter, r *http.Re
}
}
notifications, total, err := h.svc.ListNotifications(r.Context(), page, perPage)
// I-005: branch to the status-scoped listing path when ?status= is present
// so the Dead letter tab on the GUI (?status=dead) can filter server-side.
// Empty status delegates to the original ListNotifications path to preserve
// the default tab's existing behavior.
var (
notifications []domain.NotificationEvent
total int64
err error
)
if status := query.Get("status"); status != "" {
notifications, total, err = h.svc.ListNotificationsByStatus(r.Context(), status, page, perPage)
} else {
notifications, total, err = h.svc.ListNotifications(r.Context(), page, perPage)
}
if err != nil {
ErrorWithRequestID(w, http.StatusInternalServerError, "Failed to list notifications", requestID)
return
@@ -124,3 +144,43 @@ func (h NotificationHandler) MarkAsRead(w http.ResponseWriter, r *http.Request)
JSON(w, http.StatusOK, response)
}
// RequeueNotification flips a dead notification back to 'pending' so the retry
// sweep (coverage gap I-005) can pick it up again on its next tick. The handler
// is strictly POST-only; GET/PUT/DELETE return 405. An empty id segment
// (/api/v1/notifications//requeue) returns 400. Service errors that carry a
// "not found" sentinel map to 404; all other service errors map to 500. This
// 404-vs-500 split mirrors GetCertificateDeployments at certificates.go:644.
// POST /api/v1/notifications/{id}/requeue
func (h NotificationHandler) RequeueNotification(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
Error(w, http.StatusMethodNotAllowed, "Method not allowed")
return
}
requestID := middleware.GetRequestID(r.Context())
// Extract notification ID from path /api/v1/notifications/{id}/requeue
path := strings.TrimPrefix(r.URL.Path, "/api/v1/notifications/")
parts := strings.Split(path, "/")
if len(parts) < 2 || parts[0] == "" {
ErrorWithRequestID(w, http.StatusBadRequest, "Notification ID is required", requestID)
return
}
notificationID := parts[0]
if err := h.svc.RequeueNotification(r.Context(), notificationID); err != nil {
if strings.Contains(err.Error(), "not found") {
ErrorWithRequestID(w, http.StatusNotFound, "Notification not found", requestID)
return
}
ErrorWithRequestID(w, http.StatusInternalServerError, "Failed to requeue notification", requestID)
return
}
response := map[string]string{
"status": "requeued",
}
JSON(w, http.StatusOK, response)
}
+26 -22
View File
@@ -45,28 +45,28 @@ func (r *Router) RegisterFunc(pattern string, handler func(http.ResponseWriter,
// HandlerRegistry groups all API handler dependencies for router registration.
type HandlerRegistry struct {
Certificates handler.CertificateHandler
Issuers handler.IssuerHandler
Targets handler.TargetHandler
Agents handler.AgentHandler
Jobs handler.JobHandler
Policies handler.PolicyHandler
Profiles handler.ProfileHandler
Teams handler.TeamHandler
Owners handler.OwnerHandler
AgentGroups handler.AgentGroupHandler
Audit handler.AuditHandler
Notifications handler.NotificationHandler
Stats handler.StatsHandler
Metrics handler.MetricsHandler
Health handler.HealthHandler
Discovery handler.DiscoveryHandler
NetworkScan handler.NetworkScanHandler
Verification handler.VerificationHandler
Export handler.ExportHandler
Digest handler.DigestHandler
HealthChecks *handler.HealthCheckHandler
BulkRevocation handler.BulkRevocationHandler
Certificates handler.CertificateHandler
Issuers handler.IssuerHandler
Targets handler.TargetHandler
Agents handler.AgentHandler
Jobs handler.JobHandler
Policies handler.PolicyHandler
Profiles handler.ProfileHandler
Teams handler.TeamHandler
Owners handler.OwnerHandler
AgentGroups handler.AgentGroupHandler
Audit handler.AuditHandler
Notifications handler.NotificationHandler
Stats handler.StatsHandler
Metrics handler.MetricsHandler
Health handler.HealthHandler
Discovery handler.DiscoveryHandler
NetworkScan handler.NetworkScanHandler
Verification handler.VerificationHandler
Export handler.ExportHandler
Digest handler.DigestHandler
HealthChecks *handler.HealthCheckHandler
BulkRevocation handler.BulkRevocationHandler
}
// RegisterHandlers sets up all API routes with their handlers.
@@ -204,6 +204,10 @@ func (r *Router) RegisterHandlers(reg HandlerRegistry) {
r.Register("GET /api/v1/notifications", http.HandlerFunc(reg.Notifications.ListNotifications))
r.Register("GET /api/v1/notifications/{id}", http.HandlerFunc(reg.Notifications.GetNotification))
r.Register("POST /api/v1/notifications/{id}/read", http.HandlerFunc(reg.Notifications.MarkAsRead))
// I-005: requeue a dead notification back to pending so the retry sweep
// picks it up again. Go 1.22 ServeMux resolves the literal /requeue segment
// before falling back to the {id} path-variable route above.
r.Register("POST /api/v1/notifications/{id}/requeue", http.HandlerFunc(reg.Notifications.RequeueNotification))
// Stats routes: /api/v1/stats
r.Register("GET /api/v1/stats/summary", http.HandlerFunc(reg.Stats.GetDashboardSummary))
+52 -28
View File
@@ -12,29 +12,29 @@ import (
// Config represents the complete application configuration.
// All configuration values are read from environment variables with CERTCTL_ prefix.
type Config struct {
Server ServerConfig
Database DatabaseConfig
Scheduler SchedulerConfig
Log LogConfig
Auth AuthConfig
RateLimit RateLimitConfig
CORS CORSConfig
Keygen KeygenConfig
CA CAConfig
Notifiers NotifierConfig
NetworkScan NetworkScanConfig
EST ESTConfig
SCEP SCEPConfig
Verification VerificationConfig
ACME ACMEConfig
Vault VaultConfig
DigiCert DigiCertConfig
Sectigo SectigoConfig
GoogleCAS GoogleCASConfig
AWSACMPCA AWSACMPCAConfig
Entrust EntrustConfig
GlobalSign GlobalSignConfig
EJBCA EJBCAConfig
Server ServerConfig
Database DatabaseConfig
Scheduler SchedulerConfig
Log LogConfig
Auth AuthConfig
RateLimit RateLimitConfig
CORS CORSConfig
Keygen KeygenConfig
CA CAConfig
Notifiers NotifierConfig
NetworkScan NetworkScanConfig
EST ESTConfig
SCEP SCEPConfig
Verification VerificationConfig
ACME ACMEConfig
Vault VaultConfig
DigiCert DigiCertConfig
Sectigo SectigoConfig
GoogleCAS GoogleCASConfig
AWSACMPCA AWSACMPCAConfig
Entrust EntrustConfig
GlobalSign GlobalSignConfig
EJBCA EJBCAConfig
Digest DigestConfig
HealthCheck HealthCheckConfig
Encryption EncryptionConfig
@@ -708,6 +708,17 @@ type SchedulerConfig struct {
// Setting: CERTCTL_SCHEDULER_NOTIFICATION_PROCESS_INTERVAL environment variable.
NotificationProcessInterval time.Duration
// NotificationRetryInterval is how often the scheduler retries failed
// notifications whose retry_count is below the service-layer 5-attempt
// DLQ budget. Default: 2 minutes. Minimum: 1 second. Mirrors the I-001
// RetryInterval knob: transitions eligible Failed notifications whose
// next_retry_at has arrived back to Pending so the notification processor
// picks them up on its next tick (closes coverage gap I-005 — HEAD had
// no retry path for transient SMTP/webhook failures and notifications
// stayed Failed forever).
// Setting: CERTCTL_NOTIFICATION_RETRY_INTERVAL environment variable.
NotificationRetryInterval time.Duration
// RetryInterval is how often the scheduler retries failed jobs whose Attempts
// counter is below MaxAttempts. Default: 5 minutes. Minimum: 1 second.
// Transitions eligible Failed jobs back to Pending so the job processor can
@@ -838,10 +849,16 @@ func Load() (*Config, error) {
JobProcessorInterval: getEnvDuration("CERTCTL_SCHEDULER_JOB_PROCESSOR_INTERVAL", 30*time.Second),
AgentHealthCheckInterval: getEnvDuration("CERTCTL_SCHEDULER_AGENT_HEALTH_CHECK_INTERVAL", 2*time.Minute),
NotificationProcessInterval: getEnvDuration("CERTCTL_SCHEDULER_NOTIFICATION_PROCESS_INTERVAL", 1*time.Minute),
RetryInterval: getEnvDuration("CERTCTL_SCHEDULER_RETRY_INTERVAL", 5*time.Minute),
JobTimeoutInterval: getEnvDuration("CERTCTL_JOB_TIMEOUT_INTERVAL", 10*time.Minute),
AwaitingCSRTimeout: getEnvDuration("CERTCTL_JOB_AWAITING_CSR_TIMEOUT", 24*time.Hour),
AwaitingApprovalTimeout: getEnvDuration("CERTCTL_JOB_AWAITING_APPROVAL_TIMEOUT", 168*time.Hour),
// I-005: retry sweep for failed notifications. Mirrors RetryInterval
// (I-001 job retry) but scoped to the notification DLQ machinery.
// Default 2 minutes — fast enough to absorb transient SMTP/webhook
// blips, slow enough to respect the service-layer 5-attempt budget
// without hammering external notifier endpoints.
NotificationRetryInterval: getEnvDuration("CERTCTL_NOTIFICATION_RETRY_INTERVAL", 2*time.Minute),
RetryInterval: getEnvDuration("CERTCTL_SCHEDULER_RETRY_INTERVAL", 5*time.Minute),
JobTimeoutInterval: getEnvDuration("CERTCTL_JOB_TIMEOUT_INTERVAL", 10*time.Minute),
AwaitingCSRTimeout: getEnvDuration("CERTCTL_JOB_AWAITING_CSR_TIMEOUT", 24*time.Hour),
AwaitingApprovalTimeout: getEnvDuration("CERTCTL_JOB_AWAITING_APPROVAL_TIMEOUT", 168*time.Hour),
},
Log: LogConfig{
Level: getEnv("CERTCTL_LOG_LEVEL", "info"),
@@ -871,7 +888,7 @@ func Load() (*Config, error) {
Notifiers: NotifierConfig{
SlackWebhookURL: getEnv("CERTCTL_SLACK_WEBHOOK_URL", ""),
SlackChannel: getEnv("CERTCTL_SLACK_CHANNEL", ""),
SlackUsername: getEnv("CERTCTL_SLACK_USERNAME", "certctl"),
SlackUsername: getEnv("CERTCTL_SLACK_USERNAME", "certctl"),
TeamsWebhookURL: getEnv("CERTCTL_TEAMS_WEBHOOK_URL", ""),
PagerDutyRoutingKey: getEnv("CERTCTL_PAGERDUTY_ROUTING_KEY", ""),
PagerDutySeverity: getEnv("CERTCTL_PAGERDUTY_SEVERITY", "warning"),
@@ -1109,6 +1126,13 @@ func (c *Config) Validate() error {
return fmt.Errorf("notification process interval must be at least 1 second")
}
// I-005: guard against a misconfigured retry sweep that would either
// spin-wait or never fire. Matches the NotificationProcessInterval
// minimum (1s) so operators can tune both knobs from the same floor.
if c.Scheduler.NotificationRetryInterval < 1*time.Second {
return fmt.Errorf("notification retry interval must be at least 1 second")
}
if c.Scheduler.RetryInterval < 1*time.Second {
return fmt.Errorf("retry interval must be at least 1 second")
}
+37
View File
@@ -5,6 +5,15 @@ import (
)
// NotificationEvent records a notification sent to users about certificate events.
//
// I-005 extends the event with a retry counter, a nullable next-retry timestamp
// that drives the retry-sweep partial index, and a nullable last-error string
// preserving the most recent transient failure so operators triaging the dead
// letter queue can see *why* a notification died without chasing server logs.
// Status stays a plain `string` (not retyped to NotificationStatus) because the
// repo layer materialises it directly from PostgreSQL's VARCHAR column and the
// service layer compares against the NotificationStatus* constants via
// `string(...)` casts at call sites — see service.RetryFailedNotifications.
type NotificationEvent struct {
ID string `json:"id"`
Type NotificationType `json:"type"`
@@ -15,9 +24,37 @@ type NotificationEvent struct {
SentAt *time.Time `json:"sent_at,omitempty"`
Status string `json:"status"`
Error *string `json:"error,omitempty"`
RetryCount int `json:"retry_count"`
NextRetryAt *time.Time `json:"next_retry_at,omitempty"`
LastError *string `json:"last_error,omitempty"`
CreatedAt time.Time `json:"created_at"`
}
// NotificationStatus is the typed string alias for the lifecycle status of a
// NotificationEvent. It mirrors the VARCHAR(50) column on notification_events
// and the status values used by the I-005 retry/DLQ machinery.
//
// Status transitions:
//
// pending → sent (delivery succeeded)
// pending → failed → pending (transient failure, re-armed by retry sweep)
// pending → failed → dead (retry_count reached max_attempts; DLQ)
// pending → read (operator acknowledged, no delivery needed)
//
// Values are lowercase to match the pre-I-005 on-wire representation used by
// existing UpdateStatus calls and the seed_demo.sql fixtures; retyping
// NotificationEvent.Status to NotificationStatus would be a breaking DB scan
// change, so the type is kept additive and consumed via `string(const)` casts.
type NotificationStatus string
const (
NotificationStatusPending NotificationStatus = "pending"
NotificationStatusSent NotificationStatus = "sent"
NotificationStatusFailed NotificationStatus = "failed"
NotificationStatusDead NotificationStatus = "dead"
NotificationStatusRead NotificationStatus = "read"
)
// NotificationType represents the event that triggered a notification.
type NotificationType string
+55 -1
View File
@@ -1,6 +1,9 @@
package domain
import "testing"
import (
"testing"
"time"
)
func TestNotificationType_Constants(t *testing.T) {
tests := map[string]NotificationType{
@@ -71,3 +74,54 @@ func TestNotificationEvent_Fields(t *testing.T) {
t.Errorf("expected error 'failed to send', got %v", event.Error)
}
}
// TestNotificationStatus_Constants verifies that I-005 introduces a typed
// NotificationStatus alongside canonical lowercase string constants covering
// the pending → sent, pending → failed → dead, and pending → read transitions.
// The Red signal here is a compile error: the type and the NotificationStatusDead
// constant do not exist before Phase 2 Green.
func TestNotificationStatus_Constants(t *testing.T) {
tests := map[string]NotificationStatus{
"pending": NotificationStatusPending,
"sent": NotificationStatusSent,
"failed": NotificationStatusFailed,
"dead": NotificationStatusDead,
"read": NotificationStatusRead,
}
for expected, got := range tests {
if string(got) != expected {
t.Errorf("expected %q, got %q", expected, string(got))
}
}
}
// TestNotificationEvent_RetryFields verifies the I-005 retry/DLQ columns are
// surfaced on the domain model: a RetryCount counter, a nullable NextRetryAt
// timestamp used by the retry-sweep partial index, and a nullable LastError
// string preserving the most recent transient failure for operator triage.
// The Red signal is a compile error — these fields do not exist yet.
func TestNotificationEvent_RetryFields(t *testing.T) {
next := time.Now().Add(2 * time.Minute)
lastErr := "connection refused"
event := &NotificationEvent{
ID: "notif-retry-001",
Type: NotificationTypeExpirationWarning,
Channel: NotificationChannelWebhook,
Recipient: "https://hooks.example.com/certs",
Message: "retry me",
Status: string(NotificationStatusFailed),
RetryCount: 3,
NextRetryAt: &next,
LastError: &lastErr,
}
if event.RetryCount != 3 {
t.Errorf("expected RetryCount 3, got %d", event.RetryCount)
}
if event.NextRetryAt == nil || !event.NextRetryAt.Equal(next) {
t.Errorf("expected NextRetryAt %v, got %v", next, event.NextRetryAt)
}
if event.LastError == nil || *event.LastError != "connection refused" {
t.Errorf("expected LastError 'connection refused', got %v", event.LastError)
}
}
+59 -19
View File
@@ -103,25 +103,25 @@ func TestCertificateLifecycle(t *testing.T) {
// Create router and register handlers
r := router.New()
r.RegisterHandlers(router.HandlerRegistry{
Certificates: certificateHandler,
Issuers: issuerHandler,
Targets: targetHandler,
Agents: agentHandler,
Jobs: jobHandler,
Policies: policyHandler,
Profiles: profileHandler,
Teams: teamHandler,
Owners: ownerHandler,
AgentGroups: agentGroupHandler,
Audit: auditHandler,
Notifications: notificationHandler,
Stats: statsHandler,
Metrics: metricsHandler,
Health: healthHandler,
Discovery: discoveryHandler,
NetworkScan: networkScanHandler,
Verification: verificationHandler,
BulkRevocation: handler.BulkRevocationHandler{},
Certificates: certificateHandler,
Issuers: issuerHandler,
Targets: targetHandler,
Agents: agentHandler,
Jobs: jobHandler,
Policies: policyHandler,
Profiles: profileHandler,
Teams: teamHandler,
Owners: ownerHandler,
AgentGroups: agentGroupHandler,
Audit: auditHandler,
Notifications: notificationHandler,
Stats: statsHandler,
Metrics: metricsHandler,
Health: healthHandler,
Discovery: discoveryHandler,
NetworkScan: networkScanHandler,
Verification: verificationHandler,
BulkRevocation: handler.BulkRevocationHandler{},
})
r.RegisterESTHandlers(estHandler)
@@ -1022,6 +1022,46 @@ func (m *mockNotificationRepository) UpdateStatus(ctx context.Context, id string
return fmt.Errorf("notification not found")
}
// I-005: retry/DLQ interface satisfiers. The integration tests in this package
// drive the end-to-end lifecycle against a NotificationService which requires
// the full repository.NotificationRepository interface, but none of the
// lifecycle scenarios exercise the retry sweep or dead-letter transitions —
// they're covered by unit tests in internal/service/notification_test.go. So
// these are deliberate no-op / panic-free stubs whose only job is to satisfy
// the compile-time interface contract. If a future integration test needs
// real retry semantics, promote this mock to match internal/service's
// mockNotifRepo (testutil_test.go:410) one-for-one.
func (m *mockNotificationRepository) ListRetryEligible(ctx context.Context, now time.Time, maxAttempts, limit int) ([]*domain.NotificationEvent, error) {
return nil, nil
}
func (m *mockNotificationRepository) RecordFailedAttempt(ctx context.Context, id string, lastError string, nextRetryAt time.Time) error {
return nil
}
func (m *mockNotificationRepository) MarkAsDead(ctx context.Context, id string, lastError string) error {
return nil
}
func (m *mockNotificationRepository) Requeue(ctx context.Context, id string) error {
return nil
}
// CountByStatus satisfies the NotificationRepository interface contract added
// by I-005 Phase 2 Green. Counts in-memory rows so StatsService wiring exercised
// by the lifecycle integration tests gets a truthful count even though the
// retry/DLQ surface isn't driven here.
func (m *mockNotificationRepository) CountByStatus(ctx context.Context, status string) (int64, error) {
var count int64
for _, n := range m.notifications {
if n.Status == status {
count++
}
}
return count, nil
}
type mockPolicyRepository struct {
rules map[string]*domain.PolicyRule
violations []*domain.PolicyViolation
+22 -3
View File
@@ -974,9 +974,13 @@ func registerAuditTools(s *gomcp.Server, c *Client) {
func registerNotificationTools(s *gomcp.Server, c *Client) {
gomcp.AddTool(s, &gomcp.Tool{
Name: "certctl_list_notifications",
Description: "List notification events (expiration warnings, renewal/deployment results, policy violations, revocations).",
}, func(ctx context.Context, req *gomcp.CallToolRequest, input ListParams) (*gomcp.CallToolResult, any, error) {
data, err := c.Get("/api/v1/notifications", paginationQuery(input.Page, input.PerPage))
Description: "List notification events (expiration warnings, renewal/deployment results, policy violations, revocations). Optional status filter supports the I-005 Dead letter tab (status=dead).",
}, func(ctx context.Context, req *gomcp.CallToolRequest, input ListNotificationsInput) (*gomcp.CallToolResult, any, error) {
q := paginationQuery(input.Page, input.PerPage)
if input.Status != "" {
q.Set("status", input.Status)
}
data, err := c.Get("/api/v1/notifications", q)
if err != nil {
return errorResult(err)
}
@@ -1004,6 +1008,21 @@ func registerNotificationTools(s *gomcp.Server, c *Client) {
}
return textResult(data)
})
// I-005: requeue a dead-letter notification. Flips status from 'dead'
// back to 'pending' and clears next_retry_at so the retry sweep picks
// the notification up on its next tick. Operator-triggered; the tool
// is the MCP counterpart of the GUI's Dead letter tab "Requeue" button.
gomcp.AddTool(s, &gomcp.Tool{
Name: "certctl_requeue_notification",
Description: "Requeue a dead notification back to pending so the retry sweep can deliver it again. Used to recover from persistent delivery failures after the underlying issue (SMTP config, webhook endpoint, etc.) has been fixed.",
}, func(ctx context.Context, req *gomcp.CallToolRequest, input GetByIDInput) (*gomcp.CallToolResult, any, error) {
data, err := c.Post("/api/v1/notifications/"+input.ID+"/requeue", nil)
if err != nil {
return errorResult(err)
}
return textResult(data)
})
}
// ── Stats ───────────────────────────────────────────────────────────
+10
View File
@@ -182,6 +182,16 @@ type RejectJobInput struct {
Reason string `json:"reason,omitempty" jsonschema:"Reason for rejection"`
}
// ── Notifications ───────────────────────────────────────────────────
// ListNotificationsInput adds the I-005 status filter on top of the standard
// pagination params. Status="dead" drives the Dead letter tab use case;
// empty status preserves the pre-I-005 list-all behavior.
type ListNotificationsInput struct {
ListParams
Status string `json:"status,omitempty" jsonschema:"Filter by status: pending, sent, failed, dead, read"`
}
// ── Policies ────────────────────────────────────────────────────────
type CreatePolicyInput struct {
+44
View File
@@ -285,6 +285,12 @@ type AuditRepository interface {
}
// NotificationRepository defines operations for managing notifications.
//
// I-005 extends the interface with four retry/DLQ methods. The retry scheduler
// loop calls ListRetryEligible on every tick to pull overdue failed rows, then
// either RecordFailedAttempt (still-retrying) or MarkAsDead (exhausted). The
// operator-facing dead-letter tab calls Requeue to move a row from 'dead' (or
// 'failed') back to 'pending' so ProcessPendingNotifications picks it up again.
type NotificationRepository interface {
// Create stores a new notification.
Create(ctx context.Context, notif *domain.NotificationEvent) error
@@ -292,6 +298,44 @@ type NotificationRepository interface {
List(ctx context.Context, filter *NotificationFilter) ([]*domain.NotificationEvent, error)
// UpdateStatus updates a notification's delivery status.
UpdateStatus(ctx context.Context, id string, status string, sentAt time.Time) error
// ListRetryEligible returns failed notification rows whose next_retry_at
// is <= now AND retry_count < maxAttempts, ordered by next_retry_at ASC
// (oldest overdue first — same fairness as I-001's RetryFailedJobs). The
// WHERE clause mirrors the partial retry-sweep index predicate from
// migration 000016 so the planner uses it. A limit<=0 is normalised to
// a sane default in the repo implementation to avoid accidental unbounded
// sweeps. I-005 coverage-gap closure.
ListRetryEligible(ctx context.Context, now time.Time, maxAttempts, limit int) ([]*domain.NotificationEvent, error)
// RecordFailedAttempt is called by the retry sweep after a notifier.Send
// transient failure. The UPDATE increments retry_count by exactly 1,
// overwrites last_error, overwrites next_retry_at, and KEEPS status='failed'
// so the row remains a candidate for ListRetryEligible on the next sweep.
// Returns "not found" when no row matches the id (mirrors UpdateStatus).
// I-005 coverage-gap closure.
RecordFailedAttempt(ctx context.Context, id string, lastError string, nextRetryAt time.Time) error
// MarkAsDead performs the DLQ transition when retry_count reaches
// max_attempts. Flips status='dead', clears next_retry_at so the partial
// retry-sweep index drops the row, writes the final last_error, and
// PRESERVES retry_count as historical evidence of how many attempts were
// burned. Returns "not found" when no row matches.
// I-005 coverage-gap closure.
MarkAsDead(ctx context.Context, id string, lastError string) error
// Requeue is the operator "try again" action from the UI's Dead letter
// tab. Flips status='pending' (so ProcessPendingNotifications picks it
// up), resets retry_count to 0 (otherwise the operator's first retry
// would already be at hour-long waits), clears next_retry_at, and clears
// last_error. Valid from both 'dead' and 'failed'. Returns "not found"
// when no row matches. I-005 coverage-gap closure.
Requeue(ctx context.Context, id string) error
// CountByStatus returns the number of notification_events rows whose
// status column matches the given string exactly. Used by StatsService
// to populate DashboardSummary.NotificationsDead which in turn drives
// the Prometheus counter certctl_notification_dead_total (I-005 Phase 2
// observability gate). A dedicated SQL COUNT(*) is used instead of
// List(filter{Status: ...}) because List silently resets PerPage>500 to
// 50 — a latent scale bug for any status-filtered count. I-005
// coverage-gap closure.
CountByStatus(ctx context.Context, status string) (int64, error)
}
// TeamRepository defines operations for managing teams.
@@ -0,0 +1,256 @@
package postgres_test
import (
"context"
"database/sql"
"strings"
"testing"
)
// TestMigration000016_NotificationRetryRoundTrip is the Phase 1 Red regression
// test for I-005 ("failed webhook/email drops critical alerts — no retry, no
// DLQ, no escalation"). The fix depends on a new migration,
// 000016_notification_retry.up.sql + .down.sql, which must:
//
// 1. Add `retry_count INTEGER NOT NULL DEFAULT 0` on notification_events.
// Mirrors migration 000015's column-nullability pattern: explicit
// NOT NULL + default so existing rows backfill cleanly and the service
// layer never has to nil-check the counter. The 0 default is what lets
// the retry scheduler promote a row from failed → pending on its very
// first sweep without a bespoke backfill.
//
// 2. Add `next_retry_at TIMESTAMPTZ` (nullable) on notification_events.
// Populated by the service layer on every failed→pending transition
// using exponential backoff (2^retry_count minutes, cap 1h). Nullable
// because the field is only meaningful while a row sits in 'failed'
// state; 'sent', 'pending', 'dead', and 'read' rows leave it NULL.
//
// 3. Add `last_error TEXT` (nullable) on notification_events. TEXT
// (not VARCHAR(N)) because notifier errors can include full HTTP
// response bodies, TLS handshake diagnostics, or stringified stack
// traces. Truncation here would kick the operator back to the server
// log, which is exactly the triage pain I-005 is meant to eliminate.
//
// 4. Create the partial retry-sweep index
// `idx_notification_events_retry_sweep ON notification_events(next_retry_at)
// WHERE status = 'failed' AND next_retry_at IS NOT NULL`.
// The predicate keeps the index tiny in a healthy fleet — only failed
// rows scheduled for retry participate; sent/pending/dead/read rows and
// unscheduled failures are excluded. Makes the retry sweep in
// RetryFailedNotifications O(retry-eligible) rather than O(total-events).
//
// The round-trip also validates that the down migration cleanly reverses all
// four schema additions, so an operator who lands on a rollback can still
// boot the server. Stage 4 asserts idempotency — the up migration must be
// safely re-runnable after a partial rollback, which requires ADD COLUMN
// IF NOT EXISTS and CREATE INDEX IF NOT EXISTS on every new object.
//
// Red-until-Green: this test compiles but fails until
// migrations/000016_notification_retry.up.sql + .down.sql exist with the
// right schema, because freshSchema(t) runs every `.up.sql` in lexical order
// — the new migration runs automatically once Phase 2 creates the files.
func TestMigration000016_NotificationRetryRoundTrip(t *testing.T) {
tdb := getTestDB(t)
db := tdb.freshSchema(t)
ctx := context.Background()
// ─── Stage 1: Post-up assertions ─────────────────────────────────────
//
// After every .up.sql migration (including the new 000016) has run, the
// three new columns and the partial retry-sweep index must be observable
// in the catalog.
// All three retry columns must be present on notification_events.
assertColumnExists(t, db, "notification_events", "retry_count")
assertColumnExists(t, db, "notification_events", "next_retry_at")
assertColumnExists(t, db, "notification_events", "last_error")
// retry_count must be NOT NULL with a server-side default of 0. The
// scheduler's failed→pending transition relies on reading the counter
// without a COALESCE, and the back-fill on existing rows must be
// deterministic; 0 is the only safe default for an attempt counter.
assertColumnNotNull(t, db, "notification_events", "retry_count", true)
assertColumnDefaultContains(t, db, "notification_events", "retry_count", "0")
// next_retry_at and last_error are nullable by design — see the Stage 1
// doc block above for why. A NOT NULL constraint here would force the
// service layer to write sentinel values on every terminal-status
// transition, which is worse than just leaving them NULL.
assertColumnNotNull(t, db, "notification_events", "next_retry_at", false)
assertColumnNotNull(t, db, "notification_events", "last_error", false)
// The partial retry-sweep index must exist on notification_events and
// must include the WHERE predicate that restricts it to failed+scheduled
// rows. Without the predicate the index is merely an index on
// next_retry_at — correct semantics, but it would balloon in a busy
// fleet because every sent/read row would sit in it with a NULL key.
assertIndexExists(t, db, "idx_notification_events_retry_sweep")
assertIndexPredicateContains(t, db, "idx_notification_events_retry_sweep", "status = 'failed'")
assertIndexPredicateContains(t, db, "idx_notification_events_retry_sweep", "next_retry_at IS NOT NULL")
// ─── Stage 2: Run the 000016 down migration manually ─────────────────
//
// testutil_test.go's runMigrations helper only runs *.up.sql. To exercise
// the down migration I read and execute it by hand, then re-check the
// catalog.
downSQL := readMigrationFile(t, "000016_notification_retry.down.sql")
if _, err := db.ExecContext(ctx, downSQL); err != nil {
t.Fatalf("000016 down migration failed: %v", err)
}
// Stage 3: Post-down assertions — all three columns removed, partial
// index dropped.
assertColumnGone(t, db, "notification_events", "retry_count")
assertColumnGone(t, db, "notification_events", "next_retry_at")
assertColumnGone(t, db, "notification_events", "last_error")
assertIndexGone(t, db, "idx_notification_events_retry_sweep")
// ─── Stage 4: Re-run the up migration for idempotency ────────────────
//
// The up migration must be safely re-runnable — operators sometimes
// re-apply by hand after a partial rollback. Use ADD COLUMN IF NOT
// EXISTS and CREATE INDEX IF NOT EXISTS so every converging run is a
// no-op.
upSQL := readMigrationFile(t, "000016_notification_retry.up.sql")
if _, err := db.ExecContext(ctx, upSQL); err != nil {
t.Fatalf("000016 up migration re-apply failed (must be idempotent): %v", err)
}
assertColumnExists(t, db, "notification_events", "retry_count")
assertColumnExists(t, db, "notification_events", "next_retry_at")
assertColumnExists(t, db, "notification_events", "last_error")
assertIndexExists(t, db, "idx_notification_events_retry_sweep")
}
// ─── Extra catalog helpers for 000016 ─────────────────────────────────────
//
// These are additive to the column-existence and FK helpers defined in
// migration_000015_test.go. Both files live in `package postgres_test`, so
// assertColumnExists / assertColumnGone / readMigrationFile are already in
// scope from the 000015 test file and must not be redeclared.
// assertColumnNotNull asserts that the information_schema reports the
// expected nullability for a column. PG exposes `is_nullable` as the string
// 'YES' or 'NO'; we translate to a bool so the call site reads cleanly.
func assertColumnNotNull(t *testing.T, db *sql.DB, table, column string, wantNotNull bool) {
t.Helper()
var isNullable string
err := db.QueryRowContext(context.Background(), `
SELECT is_nullable
FROM information_schema.columns
WHERE table_schema = current_schema()
AND table_name = $1
AND column_name = $2
`, table, column).Scan(&isNullable)
if err == sql.ErrNoRows {
t.Fatalf("column %s.%s not found in current_schema (migration missing?)", table, column)
}
if err != nil {
t.Fatalf("is_nullable lookup for %s.%s failed: %v", table, column, err)
}
gotNotNull := isNullable == "NO"
if gotNotNull != wantNotNull {
t.Errorf("column %s.%s nullability: got NOT NULL=%v, want NOT NULL=%v (is_nullable=%q)",
table, column, gotNotNull, wantNotNull, isNullable)
}
}
// assertColumnDefaultContains asserts that the server-side DEFAULT clause for
// a column contains the expected substring. Postgres can render defaults in
// a few different normalized shapes (`0`, `(0)::integer`, `0::integer`),
// so substring matching is more robust than exact equality here.
func assertColumnDefaultContains(t *testing.T, db *sql.DB, table, column, wantSubstr string) {
t.Helper()
var columnDefault sql.NullString
err := db.QueryRowContext(context.Background(), `
SELECT column_default
FROM information_schema.columns
WHERE table_schema = current_schema()
AND table_name = $1
AND column_name = $2
`, table, column).Scan(&columnDefault)
if err == sql.ErrNoRows {
t.Fatalf("column %s.%s not found in current_schema (migration missing?)", table, column)
}
if err != nil {
t.Fatalf("column_default lookup for %s.%s failed: %v", table, column, err)
}
if !columnDefault.Valid {
t.Errorf("column %s.%s has no DEFAULT clause; want substring %q", table, column, wantSubstr)
return
}
if !strings.Contains(columnDefault.String, wantSubstr) {
t.Errorf("column %s.%s DEFAULT = %q; want substring %q",
table, column, columnDefault.String, wantSubstr)
}
}
// assertIndexExists asserts that a named index exists in the current schema.
// Scoped via pg_indexes.schemaname = current_schema() so schema-per-test
// isolation holds.
func assertIndexExists(t *testing.T, db *sql.DB, indexName string) {
t.Helper()
var exists bool
err := db.QueryRowContext(context.Background(), `
SELECT EXISTS (
SELECT 1 FROM pg_indexes
WHERE schemaname = current_schema()
AND indexname = $1
)`, indexName).Scan(&exists)
if err != nil {
t.Fatalf("index existence query failed for %s: %v", indexName, err)
}
if !exists {
t.Errorf("expected index %s to exist after 000016 up (migration missing or drifted)", indexName)
}
}
// assertIndexGone is the negative form, used after the down migration to
// confirm the partial retry-sweep index has been dropped.
func assertIndexGone(t *testing.T, db *sql.DB, indexName string) {
t.Helper()
var exists bool
err := db.QueryRowContext(context.Background(), `
SELECT EXISTS (
SELECT 1 FROM pg_indexes
WHERE schemaname = current_schema()
AND indexname = $1
)`, indexName).Scan(&exists)
if err != nil {
t.Fatalf("index existence query failed for %s: %v", indexName, err)
}
if exists {
t.Errorf("expected index %s to be removed after 000016 down (down migration is incomplete)", indexName)
}
}
// assertIndexPredicateContains asserts that the reconstructed `indexdef`
// (pg_indexes.indexdef — the CREATE INDEX statement Postgres would emit to
// recreate the index) contains the expected substring. This is how we pin
// the WHERE predicate of a partial index without parsing the SQL.
//
// Postgres normalises the predicate (e.g. single-quoted literals stay
// single-quoted, column references are bare), so substring matching is both
// sufficient and robust against cosmetic reformatting.
func assertIndexPredicateContains(t *testing.T, db *sql.DB, indexName, wantSubstr string) {
t.Helper()
var indexdef string
err := db.QueryRowContext(context.Background(), `
SELECT indexdef
FROM pg_indexes
WHERE schemaname = current_schema()
AND indexname = $1
`, indexName).Scan(&indexdef)
if err == sql.ErrNoRows {
t.Fatalf("index %s not found in current_schema (migration missing?)", indexName)
}
if err != nil {
t.Fatalf("indexdef lookup for %s failed: %v", indexName, err)
}
if !strings.Contains(indexdef, wantSubstr) {
t.Errorf("index %s definition missing expected predicate fragment %q\nfull indexdef: %s",
indexName, wantSubstr, indexdef)
}
}
+235 -4
View File
@@ -100,10 +100,14 @@ func (r *NotificationRepository) List(ctx context.Context, filter *repository.No
return nil, fmt.Errorf("failed to count notifications: %w", err)
}
// Get paginated results
// Get paginated results. I-005 extends the SELECT with the three retry
// columns (retry_count / next_retry_at / last_error) so scanNotification
// can populate the new fields on domain.NotificationEvent. The column
// order here MUST stay in lockstep with scanNotification below.
offset := (filter.Page - 1) * filter.PerPage
query := fmt.Sprintf(`
SELECT id, type, certificate_id, channel, recipient, message, sent_at, status, error
SELECT id, type, certificate_id, channel, recipient, message, sent_at, status, error,
retry_count, next_retry_at, last_error
FROM notification_events
%s
ORDER BY sent_at DESC NULLS LAST
@@ -156,13 +160,23 @@ func (r *NotificationRepository) UpdateStatus(ctx context.Context, id string, st
return nil
}
// scanNotification scans a notification from a row or rows
// scanNotification scans a notification from a row or rows.
//
// I-005 extends the scan list from 9 → 12 columns (adds retry_count,
// next_retry_at, last_error). Every caller — List and the four new retry
// methods below — funnels rows through this helper, so the SELECT column
// order in every query must match the Scan order here exactly. RetryCount
// scans into an `int` (migration 000016 declares the column NOT NULL with
// DEFAULT 0), while NextRetryAt and LastError scan into pointer types
// because the column is nullable — a healthy pending/sent/dead row leaves
// both NULL.
func scanNotification(scanner interface {
Scan(...interface{}) error
}) (*domain.NotificationEvent, error) {
var notif domain.NotificationEvent
err := scanner.Scan(&notif.ID, &notif.Type, &notif.CertificateID, &notif.Channel,
&notif.Recipient, &notif.Message, &notif.SentAt, &notif.Status, &notif.Error)
&notif.Recipient, &notif.Message, &notif.SentAt, &notif.Status, &notif.Error,
&notif.RetryCount, &notif.NextRetryAt, &notif.LastError)
if err != nil {
return nil, fmt.Errorf("failed to scan notification: %w", err)
@@ -170,3 +184,220 @@ func scanNotification(scanner interface {
return &notif, nil
}
// ─── I-005 retry/DLQ methods ─────────────────────────────────────────────
//
// The four methods below implement the repository half of the I-005
// notification retry + dead-letter queue fix. The retry scheduler loop
// (added alongside these in internal/scheduler/scheduler.go) drives them in
// a strict cycle:
//
// ┌─► ListRetryEligible(ctx, now, maxAttempts, limit)
// │ (oldest overdue failed rows first)
// │ │
// │ ├──► notifier.Send() succeeds → UpdateStatus('sent')
// │ │
// │ ├──► transient failure, retry_count+1 < maxAttempts
// │ │ → RecordFailedAttempt(id, err, next)
// │ │
// │ └──► transient failure, retry_count+1 == maxAttempts
// │ → MarkAsDead(id, err)
// │
// └──◄ Requeue(id) ────── operator "try again" from Dead-letter tab
//
// The WHERE clauses in every UPDATE are scoped by id (not by status), so
// status invariants ("you can't requeue a sent row", "you can't mark a
// dead row as dead again") live in the service layer. The repo layer is
// deliberately thin — it mirrors the postgres CHECK constraints and
// trusts the service to hand it rows in a sane state. The one exception
// is "row must exist": each method returns an error on zero RowsAffected,
// matching the pre-existing UpdateStatus contract above so the scheduler
// can detect a concurrent delete without guessing.
// listRetryEligibleDefaultLimit caps a caller that passes limit <= 0.
// Picked high enough that normal sweeps never hit it (a healthy fleet
// should have tens of overdue rows at most, not thousands), but finite
// so a pathological call (wrong arg in a future refactor, bad MCP tool
// wiring) cannot scan the entire notification_events table.
const listRetryEligibleDefaultLimit = 1000
// ListRetryEligible returns failed notification rows whose next_retry_at
// is due and whose retry_count has not yet reached the configured
// max_attempts.
//
// The WHERE clause is the exact dual of the partial retry-sweep index
// predicate from migration 000016:
//
// WHERE status = 'failed'
// AND next_retry_at IS NOT NULL
// AND next_retry_at <= $1
// AND retry_count < $2
//
// Because the index is partial on the first two conjuncts, the planner
// uses it to satisfy the range scan on next_retry_at; the retry_count
// filter is applied as a residual on the (very small) candidate set.
//
// ORDER BY next_retry_at ASC matches the fairness guarantee called out
// in the test file: oldest overdue row goes first, so a backed-up
// scheduler doesn't starve the notifications that have been waiting
// longest. The same order is what I-001's RetryFailedJobs uses.
func (r *NotificationRepository) ListRetryEligible(ctx context.Context, now time.Time, maxAttempts, limit int) ([]*domain.NotificationEvent, error) {
if limit <= 0 {
limit = listRetryEligibleDefaultLimit
}
rows, err := r.db.QueryContext(ctx, `
SELECT id, type, certificate_id, channel, recipient, message, sent_at, status, error,
retry_count, next_retry_at, last_error
FROM notification_events
WHERE status = 'failed'
AND next_retry_at IS NOT NULL
AND next_retry_at <= $1
AND retry_count < $2
ORDER BY next_retry_at ASC
LIMIT $3
`, now, maxAttempts, limit)
if err != nil {
return nil, fmt.Errorf("failed to query retry-eligible notifications: %w", err)
}
defer rows.Close()
var notifs []*domain.NotificationEvent
for rows.Next() {
notif, err := scanNotification(rows)
if err != nil {
return nil, err
}
notifs = append(notifs, notif)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("error iterating retry-eligible notification rows: %w", err)
}
return notifs, nil
}
// RecordFailedAttempt is called by the retry sweep after a notifier.Send
// transient failure. It increments retry_count by exactly 1, overwrites
// last_error and next_retry_at, and deliberately DOES NOT touch status —
// the row must remain 'failed' so the next ListRetryEligible tick can
// pick it up again (unless the service layer has decided this attempt
// exhausts max_attempts, in which case it calls MarkAsDead directly
// instead of calling RecordFailedAttempt).
//
// The +1 is done server-side (SET retry_count = retry_count + 1) rather
// than client-side so a race between two scheduler instances cannot lose
// an attempt. Only one scheduler should be running in a healthy deploy,
// but the cheap arithmetic here survives a split-brain without lying
// about attempt counts.
func (r *NotificationRepository) RecordFailedAttempt(ctx context.Context, id string, lastError string, nextRetryAt time.Time) error {
result, err := r.db.ExecContext(ctx, `
UPDATE notification_events
SET retry_count = retry_count + 1,
last_error = $1,
next_retry_at = $2
WHERE id = $3
`, lastError, nextRetryAt, id)
if err != nil {
return fmt.Errorf("failed to record notification retry attempt: %w", err)
}
rows, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected: %w", err)
}
if rows == 0 {
// Same "not found" error shape as UpdateStatus above. The scheduler
// logs-and-continues on this so a concurrently-deleted row doesn't
// break the sweep.
return fmt.Errorf("notification not found")
}
return nil
}
// MarkAsDead performs the DLQ transition. Flips status='dead' so the
// partial retry-sweep index drops the row (the index predicate requires
// status='failed'), clears next_retry_at so operator dashboards don't
// claim the row is still "scheduled to retry", writes the final
// last_error for triage, and PRESERVES retry_count as historical evidence
// of how many attempts were burned before the row was declared dead.
// The retry_count value is operator-visible in the Dead letter tab so
// on-call can tell "this notification died on attempt 5" vs "this one
// died on attempt 1 because the recipient webhook was malformed from the
// start".
func (r *NotificationRepository) MarkAsDead(ctx context.Context, id string, lastError string) error {
result, err := r.db.ExecContext(ctx, `
UPDATE notification_events
SET status = 'dead',
next_retry_at = NULL,
last_error = $1
WHERE id = $2
`, lastError, id)
if err != nil {
return fmt.Errorf("failed to mark notification as dead: %w", err)
}
rows, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected: %w", err)
}
if rows == 0 {
return fmt.Errorf("notification not found")
}
return nil
}
// Requeue is the operator "try again" action fired from the Dead letter
// tab. Flips status='pending' so ProcessPendingNotifications picks the
// row up again, resets retry_count to 0 (otherwise the operator's first
// retry would immediately sit at the top of the backoff ladder), clears
// next_retry_at so the row is no longer in the retry-sweep index, and
// clears last_error so the UI doesn't render a stale error badge next
// to a freshly-requeued row.
//
// The service layer is responsible for forbidding Requeue on 'sent' or
// 'read' rows (terminal success states). This repo layer deliberately
// doesn't filter by current status — an operator action has already
// passed a human-in-the-loop guard by the time it reaches the DB, and
// the test suite only exercises the Requeue-from-{dead,failed} paths.
// Matches how UpdateStatus doesn't filter by current status either.
func (r *NotificationRepository) Requeue(ctx context.Context, id string) error {
result, err := r.db.ExecContext(ctx, `
UPDATE notification_events
SET status = 'pending',
retry_count = 0,
next_retry_at = NULL,
last_error = NULL
WHERE id = $1
`, id)
if err != nil {
return fmt.Errorf("failed to requeue notification: %w", err)
}
rows, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected: %w", err)
}
if rows == 0 {
return fmt.Errorf("notification not found")
}
return nil
}
// CountByStatus returns the number of notification_events rows matching the
// given status string. Implemented as a direct COUNT(*) rather than via List
// because List resets filter.PerPage>500 to 50 (see line 57 quirk), which
// would produce undercounts on high-volume deployments. I-005 Phase 2 Green —
// backs StatsService.GetDashboardSummary.NotificationsDead and the Prometheus
// counter certctl_notification_dead_total.
func (r *NotificationRepository) CountByStatus(ctx context.Context, status string) (int64, error) {
var count int64
err := r.db.QueryRowContext(ctx,
`SELECT COUNT(*) FROM notification_events WHERE status = $1`,
status,
).Scan(&count)
if err != nil {
return 0, fmt.Errorf("failed to count notifications by status: %w", err)
}
return count, nil
}
@@ -0,0 +1,398 @@
package postgres_test
import (
"context"
"database/sql"
"testing"
"time"
"github.com/shankar0123/certctl/internal/domain"
"github.com/shankar0123/certctl/internal/repository/postgres"
)
// TestNotificationRepository_RetryMethods is the Phase 1 Red regression test
// for the I-005 fix ("failed webhook/email drops critical alerts — no retry,
// no DLQ, no escalation"). It pins the four new repository methods the
// notification-retry scheduler loop will depend on:
//
// 1. ListRetryEligible(ctx, now, maxAttempts, limit) — the retry-sweep query.
// Returns failed rows whose next_retry_at <= now AND retry_count <
// maxAttempts. Everything else (sent/pending/dead/read, unscheduled
// failures, exhausted rows) is excluded. Ordering is ASC on next_retry_at
// so the oldest overdue row is processed first — same fairness guarantee
// as I-001's RetryFailedJobs.
//
// 2. RecordFailedAttempt(ctx, id, lastError, nextRetryAt) — what the
// scheduler calls after a notifier.Send() transient failure. Must
// increment retry_count by exactly 1, overwrite last_error, overwrite
// next_retry_at, and KEEP status='failed' so the row is still a
// candidate for ListRetryEligible on the next sweep.
//
// 3. MarkAsDead(ctx, id, lastError) — the DLQ transition when retry_count
// hits max_attempts. Flips status to 'dead', clears next_retry_at
// (so the partial retry-sweep index drops the row), preserves
// retry_count as historical evidence of how many attempts were spent,
// and records the final transient error for operator triage.
//
// 4. Requeue(ctx, id) — the operator "try again" action fired from the
// Dead letter tab in the UI. Flips status back to 'pending' (which is
// what ProcessPendingNotifications picks up), resets retry_count to 0,
// clears next_retry_at AND last_error. Valid from both 'dead' (normal
// path) and 'failed' (operator rescuing a stuck row before the sweep
// fires). Invalid from 'sent' / 'read' (terminal success states).
//
// Red-until-Green: this test file compiles only after Phase 2 adds
// ListRetryEligible, RecordFailedAttempt, MarkAsDead, and Requeue to
// postgres.NotificationRepository. Every subtest is testcontainers-gated
// via getTestDB(t).freshSchema(t), so `go test -short` skips them and CI
// without Docker stays green. Fixtures are inserted via raw SQL — Create()
// doesn't know about the new retry columns pre-Green, so the test bypasses
// it entirely. certificate_id is left NULL on every fixture row to dodge
// the FK to managed_certificates (the column is nullable per migration
// 000001, line 212).
// TestNotificationRepository_ListRetryEligible exercises the retry-sweep
// query. The test fixture deliberately seeds one row per excluded and
// included case so a single call to ListRetryEligible is the oracle:
// every row the query returns must be an "include", every row it skips
// must be an "exclude".
func TestNotificationRepository_ListRetryEligible(t *testing.T) {
tdb := getTestDB(t)
db := tdb.freshSchema(t)
repo := postgres.NewNotificationRepository(db)
ctx := context.Background()
// Pin `now` so the test is deterministic. All "overdue" rows have
// next_retry_at < now; all "future" rows have next_retry_at > now.
now := time.Now().UTC().Truncate(time.Microsecond)
past := now.Add(-5 * time.Minute)
future := now.Add(5 * time.Minute)
// Fixture grid — each row pins a specific edge of the query:
//
// notif-overdue-1 status=failed, retry=1, next=past → INCLUDE
// notif-overdue-2 status=failed, retry=3, next=past → INCLUDE
// (later next_retry_at than notif-overdue-1 by a
// few seconds so ORDER BY is observable)
// notif-future status=failed, retry=2, next=future → EXCLUDE
// (CA hasn't hit backoff yet)
// notif-exhausted status=failed, retry=5, next=past → EXCLUDE
// (retry_count >= max_attempts — sweep must skip
// so we don't re-promote a row that's about to
// be marked dead)
// notif-pending status=pending, retry=0, next=NULL → EXCLUDE
// (healthy in-flight notification)
// notif-sent status=sent, retry=0, next=NULL → EXCLUDE
// notif-dead status=dead, retry=5, next=NULL → EXCLUDE
// (already in DLQ — retrying it would reset the
// dead-letter counter and lie to the operator)
// notif-unsched status=failed, retry=1, next=NULL → EXCLUDE
// (failed row that somehow lost its next_retry_at
// — partial index predicate strips it, and the
// WHERE clause must mirror the predicate)
rawInsert := func(id, status string, retryCount int, nextRetryAt *time.Time) {
t.Helper()
_, err := db.ExecContext(ctx, `
INSERT INTO notification_events (
id, type, channel, recipient, message, status, retry_count, next_retry_at
) VALUES ($1, 'ExpirationWarning', 'Webhook', 'https://hooks.example.com/x',
'seed', $2, $3, $4)
`, id, status, retryCount, nextRetryAt)
if err != nil {
t.Fatalf("raw insert for %s failed: %v", id, err)
}
}
overdue1 := past.Add(-30 * time.Second) // oldest overdue
overdue2 := past // second-oldest overdue
rawInsert("notif-overdue-1", "failed", 1, &overdue1)
rawInsert("notif-overdue-2", "failed", 3, &overdue2)
rawInsert("notif-future", "failed", 2, &future)
rawInsert("notif-exhausted", "failed", 5, &overdue1)
rawInsert("notif-pending", "pending", 0, nil)
rawInsert("notif-sent", "sent", 0, nil)
rawInsert("notif-dead", "dead", 5, nil)
rawInsert("notif-unsched", "failed", 1, nil)
// Act — the central call under test.
got, err := repo.ListRetryEligible(ctx, now, 5, 100)
if err != nil {
t.Fatalf("ListRetryEligible failed: %v", err)
}
// Assert inclusion: exactly the two overdue rows.
if len(got) != 2 {
t.Fatalf("ListRetryEligible returned %d rows, want 2 (overdue-1 + overdue-2); got IDs = %v",
len(got), collectIDs(got))
}
// Assert ordering: ASC on next_retry_at. notif-overdue-1 has the
// earlier next_retry_at (past - 30s), so it must come first.
if got[0].ID != "notif-overdue-1" {
t.Errorf("ListRetryEligible[0].ID = %q, want %q (ORDER BY next_retry_at ASC — oldest first)",
got[0].ID, "notif-overdue-1")
}
if got[1].ID != "notif-overdue-2" {
t.Errorf("ListRetryEligible[1].ID = %q, want %q", got[1].ID, "notif-overdue-2")
}
// Assert limit is respected. Re-run with limit=1 and confirm only the
// oldest overdue row comes back — this is what lets the scheduler
// chunk its sweep under load.
limited, err := repo.ListRetryEligible(ctx, now, 5, 1)
if err != nil {
t.Fatalf("ListRetryEligible(limit=1) failed: %v", err)
}
if len(limited) != 1 || limited[0].ID != "notif-overdue-1" {
t.Errorf("ListRetryEligible(limit=1) returned %v, want [notif-overdue-1]", collectIDs(limited))
}
// Assert maxAttempts is respected. Re-run with maxAttempts=2 — this
// flips notif-overdue-2 (retry_count=3) into the "exhausted" bucket
// and must not come back. Only notif-overdue-1 (retry_count=1) qualifies.
capped, err := repo.ListRetryEligible(ctx, now, 2, 100)
if err != nil {
t.Fatalf("ListRetryEligible(maxAttempts=2) failed: %v", err)
}
if len(capped) != 1 || capped[0].ID != "notif-overdue-1" {
t.Errorf("ListRetryEligible(maxAttempts=2) returned %v, want [notif-overdue-1]", collectIDs(capped))
}
}
// TestNotificationRepository_RecordFailedAttempt verifies the retry-bump
// UPDATE. The contract is: retry_count += 1, last_error = new msg,
// next_retry_at = new time, status STAYS 'failed'. Any other side effect
// (status flip, retry_count reset, sent_at mutation) is a bug.
func TestNotificationRepository_RecordFailedAttempt(t *testing.T) {
tdb := getTestDB(t)
db := tdb.freshSchema(t)
repo := postgres.NewNotificationRepository(db)
ctx := context.Background()
initialRetry := past()
_, err := db.ExecContext(ctx, `
INSERT INTO notification_events (
id, type, channel, recipient, message, status, retry_count, next_retry_at, last_error
) VALUES ('notif-attempt-1', 'ExpirationWarning', 'Webhook',
'https://hooks.example.com/x', 'seed', 'failed', 2, $1, 'first failure')
`, initialRetry)
if err != nil {
t.Fatalf("seed failed: %v", err)
}
nextTry := time.Now().UTC().Add(8 * time.Minute).Truncate(time.Microsecond)
if err := repo.RecordFailedAttempt(ctx, "notif-attempt-1", "connection refused", nextTry); err != nil {
t.Fatalf("RecordFailedAttempt failed: %v", err)
}
// Re-read the row directly from the DB (bypassing the repo's List()
// filter logic) so the assertion tests storage, not query plumbing.
var (
gotStatus string
gotRetryCount int
gotNextRetry *time.Time
gotLastError *string
)
err = db.QueryRowContext(ctx, `
SELECT status, retry_count, next_retry_at, last_error
FROM notification_events WHERE id = 'notif-attempt-1'
`).Scan(&gotStatus, &gotRetryCount, &gotNextRetry, &gotLastError)
if err != nil {
t.Fatalf("post-update SELECT failed: %v", err)
}
if gotStatus != "failed" {
t.Errorf("status = %q, want 'failed' (RecordFailedAttempt must preserve status so sweep re-picks the row)", gotStatus)
}
if gotRetryCount != 3 {
t.Errorf("retry_count = %d, want 3 (must increment by exactly 1 from seeded 2)", gotRetryCount)
}
if gotNextRetry == nil || !gotNextRetry.Equal(nextTry) {
t.Errorf("next_retry_at = %v, want %v", gotNextRetry, nextTry)
}
if gotLastError == nil || *gotLastError != "connection refused" {
t.Errorf("last_error = %v, want 'connection refused'", gotLastError)
}
// Negative path: unknown id must surface "not found" — mirrors the
// existing UpdateStatus contract so the scheduler can detect a
// concurrent delete without guessing.
if err := repo.RecordFailedAttempt(ctx, "notif-does-not-exist", "oops", nextTry); err == nil {
t.Errorf("RecordFailedAttempt on unknown id succeeded; want error")
}
}
// TestNotificationRepository_MarkAsDead verifies the DLQ transition. Flips
// status to 'dead', clears next_retry_at (so the partial retry-sweep
// index drops the row), writes final last_error, preserves retry_count as
// evidence of how many attempts were burned.
func TestNotificationRepository_MarkAsDead(t *testing.T) {
tdb := getTestDB(t)
db := tdb.freshSchema(t)
repo := postgres.NewNotificationRepository(db)
ctx := context.Background()
lastAttempt := past()
_, err := db.ExecContext(ctx, `
INSERT INTO notification_events (
id, type, channel, recipient, message, status, retry_count, next_retry_at, last_error
) VALUES ('notif-dlq-1', 'ExpirationWarning', 'Webhook',
'https://hooks.example.com/x', 'seed', 'failed', 5, $1, 'prior failure')
`, lastAttempt)
if err != nil {
t.Fatalf("seed failed: %v", err)
}
if err := repo.MarkAsDead(ctx, "notif-dlq-1", "max attempts exceeded"); err != nil {
t.Fatalf("MarkAsDead failed: %v", err)
}
var (
gotStatus string
gotRetryCount int
gotNextRetry *time.Time
gotLastError *string
)
err = db.QueryRowContext(ctx, `
SELECT status, retry_count, next_retry_at, last_error
FROM notification_events WHERE id = 'notif-dlq-1'
`).Scan(&gotStatus, &gotRetryCount, &gotNextRetry, &gotLastError)
if err != nil {
t.Fatalf("post-update SELECT failed: %v", err)
}
if gotStatus != "dead" {
t.Errorf("status = %q, want 'dead' (DLQ transition)", gotStatus)
}
if gotNextRetry != nil {
// next_retry_at MUST be NULL post-DLQ — the partial retry-sweep
// index predicate is `status='failed' AND next_retry_at IS NOT NULL`,
// so leaving a value here would only waste space; the status='dead'
// half of the predicate already excludes the row from the sweep,
// but operator dashboards treat a populated next_retry_at as "still
// scheduled", which would be a lie.
t.Errorf("next_retry_at = %v, want NULL (dead rows are terminal, not rescheduled)", gotNextRetry)
}
if gotRetryCount != 5 {
// retry_count is audit evidence — how many attempts were burned
// before the row was declared dead. Don't clobber it.
t.Errorf("retry_count = %d, want 5 preserved (evidence of burned attempts)", gotRetryCount)
}
if gotLastError == nil || *gotLastError != "max attempts exceeded" {
t.Errorf("last_error = %v, want 'max attempts exceeded'", gotLastError)
}
// Negative path: unknown id must surface "not found".
if err := repo.MarkAsDead(ctx, "notif-does-not-exist", "oops"); err == nil {
t.Errorf("MarkAsDead on unknown id succeeded; want error")
}
}
// TestNotificationRepository_Requeue verifies the operator "try again"
// flow exposed by the Dead letter tab. The contract:
//
// - Flips status → 'pending' regardless of prior ('dead' or 'failed').
// - Resets retry_count to 0 — a manual requeue restarts the backoff
// ladder; otherwise the operator's first retry would already be at
// "wait 32 minutes" which defeats the point.
// - Clears next_retry_at so the row is no longer in the retry-sweep
// index (the scheduler would otherwise try to retry it *again* a
// few seconds later).
// - Clears last_error — the UI shouldn't show a stale error next to
// a freshly-requeued row.
func TestNotificationRepository_Requeue(t *testing.T) {
tdb := getTestDB(t)
db := tdb.freshSchema(t)
repo := postgres.NewNotificationRepository(db)
ctx := context.Background()
// Two fixtures — one dead (DLQ path, the normal case) and one failed
// (operator rescuing a stuck-in-retry row before the sweep fires).
// Both must accept Requeue; a status='sent' or 'read' row must NOT.
_, err := db.ExecContext(ctx, `
INSERT INTO notification_events (id, type, channel, recipient, message, status, retry_count, last_error)
VALUES
('notif-dead-ready', 'ExpirationWarning', 'Webhook', 'https://h/x', 'seed', 'dead', 5, 'gave up'),
('notif-failed-hot', 'ExpirationWarning', 'Webhook', 'https://h/x', 'seed', 'failed', 2, 'transient'),
('notif-sent-done', 'ExpirationWarning', 'Webhook', 'https://h/x', 'seed', 'sent', 0, NULL)
`)
if err != nil {
t.Fatalf("seed failed: %v", err)
}
// Happy path 1: requeue a dead row.
if err := repo.Requeue(ctx, "notif-dead-ready"); err != nil {
t.Fatalf("Requeue(dead) failed: %v", err)
}
assertRequeued(t, db, ctx, "notif-dead-ready")
// Happy path 2: requeue a failed row.
if err := repo.Requeue(ctx, "notif-failed-hot"); err != nil {
t.Fatalf("Requeue(failed) failed: %v", err)
}
assertRequeued(t, db, ctx, "notif-failed-hot")
// Negative path: Requeue on unknown id is "not found", not a no-op
// silent success — the handler needs to surface a 404 to the operator.
if err := repo.Requeue(ctx, "notif-does-not-exist"); err == nil {
t.Errorf("Requeue on unknown id succeeded; want error")
}
}
// ─── Helpers ──────────────────────────────────────────────────────────────
// past returns a stable "5 minutes ago" time for fixture seeding. Truncated
// to microseconds so round-tripping through Postgres TIMESTAMPTZ doesn't
// introduce a sub-microsecond diff that breaks equality assertions.
func past() time.Time {
return time.Now().UTC().Add(-5 * time.Minute).Truncate(time.Microsecond)
}
// collectIDs pulls the IDs out of a slice of events for readable test
// failure output. Without it, a failure prints "[0xc00012... 0xc00013...]"
// which is useless when diagnosing a mis-sorted sweep.
func collectIDs(events []*domain.NotificationEvent) []string {
ids := make([]string, len(events))
for i, e := range events {
ids[i] = e.ID
}
return ids
}
// assertRequeued is the shared "did Requeue do exactly what the contract
// promises?" assertion. Re-reads the row and checks all four mutations
// atomically so every Requeue test path gets the same rigor: status flipped
// to 'pending', retry_count reset to 0, next_retry_at cleared, last_error
// cleared. Any one of these missing is a contract violation.
func assertRequeued(t *testing.T, db *sql.DB, ctx context.Context, id string) {
t.Helper()
var (
gotStatus string
gotRetryCount int
gotNextRetry *time.Time
gotLastError *string
)
err := db.QueryRowContext(ctx, `
SELECT status, retry_count, next_retry_at, last_error
FROM notification_events WHERE id = $1
`, id).Scan(&gotStatus, &gotRetryCount, &gotNextRetry, &gotLastError)
if err != nil {
t.Fatalf("post-Requeue SELECT for %s failed: %v", id, err)
}
if gotStatus != "pending" {
t.Errorf("%s.status = %q, want 'pending' (Requeue must re-open the row for ProcessPendingNotifications)",
id, gotStatus)
}
if gotRetryCount != 0 {
t.Errorf("%s.retry_count = %d, want 0 (Requeue restarts the backoff ladder so the operator's first retry isn't already at hour-long waits)",
id, gotRetryCount)
}
if gotNextRetry != nil {
t.Errorf("%s.next_retry_at = %v, want NULL (a fresh pending row must not sit in the retry-sweep index)",
id, gotNextRetry)
}
if gotLastError != nil {
t.Errorf("%s.last_error = %v, want NULL (stale errors on freshly-requeued rows mislead the UI)",
id, *gotLastError)
}
}
+115 -39
View File
@@ -34,8 +34,14 @@ type AgentServicer interface {
}
// NotificationServicer defines the interface for notification processing used by the scheduler.
//
// RetryFailedNotifications was added to close coverage gap I-005: the retry
// sweep transitions eligible Failed notifications to Pending on an independent
// tick, using exponential backoff with a 1h cap and a 5-attempt DLQ budget.
// Mirrors the I-001 job retry loop topology.
type NotificationServicer interface {
ProcessPendingNotifications(ctx context.Context) error
RetryFailedNotifications(ctx context.Context) error
}
// NetworkScanServicer defines the interface for network scanning used by the scheduler.
@@ -67,44 +73,46 @@ type JobReaperService interface {
// It runs multiple concurrent loops for renewal checks, job processing, agent health checks,
// and notification processing.
type Scheduler struct {
renewalService RenewalServicer
jobService JobServicer
agentService AgentServicer
notificationService NotificationServicer
networkScanService NetworkScanServicer
digestService DigestServicer
healthCheckService HealthCheckServicer
cloudDiscoveryService CloudDiscoveryServicer
jobReaper JobReaperService
logger *slog.Logger
renewalService RenewalServicer
jobService JobServicer
agentService AgentServicer
notificationService NotificationServicer
networkScanService NetworkScanServicer
digestService DigestServicer
healthCheckService HealthCheckServicer
cloudDiscoveryService CloudDiscoveryServicer
jobReaper JobReaperService
logger *slog.Logger
// Configurable tick intervals
renewalCheckInterval time.Duration
jobProcessorInterval time.Duration
jobRetryInterval time.Duration
agentHealthCheckInterval time.Duration
notificationProcessInterval time.Duration
shortLivedExpiryCheckInterval time.Duration
networkScanInterval time.Duration
digestInterval time.Duration
healthCheckInterval time.Duration
cloudDiscoveryInterval time.Duration
jobTimeoutInterval time.Duration
awaitingCSRTimeout time.Duration
awaitingApprovalTimeout time.Duration
renewalCheckInterval time.Duration
jobProcessorInterval time.Duration
jobRetryInterval time.Duration
agentHealthCheckInterval time.Duration
notificationProcessInterval time.Duration
notificationRetryInterval time.Duration
shortLivedExpiryCheckInterval time.Duration
networkScanInterval time.Duration
digestInterval time.Duration
healthCheckInterval time.Duration
cloudDiscoveryInterval time.Duration
jobTimeoutInterval time.Duration
awaitingCSRTimeout time.Duration
awaitingApprovalTimeout time.Duration
// Idempotency guards: prevent duplicate execution of slow jobs
renewalCheckRunning atomic.Bool
jobProcessorRunning atomic.Bool
jobRetryRunning atomic.Bool
agentHealthCheckRunning atomic.Bool
notificationProcessRunning atomic.Bool
shortLivedExpiryCheckRunning atomic.Bool
networkScanRunning atomic.Bool
digestRunning atomic.Bool
healthCheckRunning atomic.Bool
cloudDiscoveryRunning atomic.Bool
jobTimeoutRunning atomic.Bool
renewalCheckRunning atomic.Bool
jobProcessorRunning atomic.Bool
jobRetryRunning atomic.Bool
agentHealthCheckRunning atomic.Bool
notificationProcessRunning atomic.Bool
notificationRetryRunning atomic.Bool
shortLivedExpiryCheckRunning atomic.Bool
networkScanRunning atomic.Bool
digestRunning atomic.Bool
healthCheckRunning atomic.Bool
cloudDiscoveryRunning atomic.Bool
jobTimeoutRunning atomic.Bool
// Graceful shutdown: wait for in-flight work to complete
wg sync.WaitGroup
@@ -133,6 +141,7 @@ func NewScheduler(
jobRetryInterval: 5 * time.Minute,
agentHealthCheckInterval: 2 * time.Minute,
notificationProcessInterval: 1 * time.Minute,
notificationRetryInterval: 2 * time.Minute,
shortLivedExpiryCheckInterval: 30 * time.Second,
networkScanInterval: 6 * time.Hour,
digestInterval: 24 * time.Hour,
@@ -180,6 +189,13 @@ func (s *Scheduler) SetNotificationProcessInterval(d time.Duration) {
s.notificationProcessInterval = d
}
// SetNotificationRetryInterval configures the interval for the failed-notification
// retry sweep (coverage gap I-005). Defaults to 2 minutes; honors
// CERTCTL_NOTIFICATION_RETRY_INTERVAL when wired from config.
func (s *Scheduler) SetNotificationRetryInterval(d time.Duration) {
s.notificationRetryInterval = d
}
// SetNetworkScanInterval configures the interval for network scanning.
func (s *Scheduler) SetNetworkScanInterval(d time.Duration) {
s.networkScanInterval = d
@@ -212,7 +228,6 @@ func (s *Scheduler) SetCloudDiscoveryInterval(d time.Duration) {
s.cloudDiscoveryInterval = d
}
// SetJobReaperService sets the job reaper service (I-003).
func (s *Scheduler) SetJobReaperService(jr JobReaperService) {
s.jobReaper = jr
@@ -232,6 +247,7 @@ func (s *Scheduler) SetAwaitingCSRTimeout(d time.Duration) {
func (s *Scheduler) SetAwaitingApprovalTimeout(d time.Duration) {
s.awaitingApprovalTimeout = d
}
// Start initiates all background scheduler loops. It returns a channel that signals
// when the scheduler has started all loops. The scheduler runs until the context is cancelled.
func (s *Scheduler) Start(ctx context.Context) <-chan struct{} {
@@ -242,10 +258,11 @@ func (s *Scheduler) Start(ctx context.Context) <-chan struct{} {
// Track all loop goroutines in the WaitGroup so WaitForCompletion
// blocks until they've fully exited (prevents test races).
// Base count is 7: renewal, job processor, job retry (I-001),
// job timeout (I-003), agent health, notification, short-lived expiry. Optional loops
// (network scan, digest, health check, cloud discovery) add to this.
loopCount := 7
// Base count is 8: renewal, job processor, job retry (I-001),
// job timeout (I-003), agent health, notification, notification retry
// (I-005), short-lived expiry. Optional loops (network scan, digest,
// health check, cloud discovery) add to this.
loopCount := 8
if s.networkScanService != nil {
loopCount++
}
@@ -266,6 +283,7 @@ func (s *Scheduler) Start(ctx context.Context) <-chan struct{} {
go func() { defer s.wg.Done(); s.jobTimeoutLoop(ctx) }()
go func() { defer s.wg.Done(); s.agentHealthCheckLoop(ctx) }()
go func() { defer s.wg.Done(); s.notificationProcessLoop(ctx) }()
go func() { defer s.wg.Done(); s.notificationRetryLoop(ctx) }()
go func() { defer s.wg.Done(); s.shortLivedExpiryCheckLoop(ctx) }()
if s.networkScanService != nil {
go func() { defer s.wg.Done(); s.networkScanLoop(ctx) }()
@@ -597,6 +615,64 @@ func (s *Scheduler) runNotificationProcess(ctx context.Context) {
}
}
// notificationRetryLoop runs every notificationRetryInterval and transitions
// eligible Failed notifications back to Pending so the notification processor
// can pick them up again. Closes coverage gap I-005 — NotificationService.
// RetryFailedNotifications had no runtime caller prior to this loop being
// wired. Runs immediately on start, then every interval.
// Uses atomic.Bool to prevent duplicate execution if the previous retry sweep
// is still running. Mirrors the I-001 jobRetryLoop topology byte-for-byte.
func (s *Scheduler) notificationRetryLoop(ctx context.Context) {
ticker := time.NewTicker(s.notificationRetryInterval)
defer ticker.Stop()
// Run immediately on start (with idempotency guard)
s.notificationRetryRunning.Store(true)
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.notificationRetryRunning.Store(false)
s.runNotificationRetry(ctx)
}()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !s.notificationRetryRunning.CompareAndSwap(false, true) {
s.logger.Warn("notification retry still running, skipping tick")
continue
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.notificationRetryRunning.Store(false)
s.runNotificationRetry(ctx)
}()
}
}
}
// runNotificationRetry executes a single failed-notification retry cycle with
// error recovery. Uses a 2-minute per-tick timeout matching runJobRetry;
// RetryFailedNotifications issues one SELECT and one UPDATE per eligible row
// (cheap), so this headroom covers very large failure backlogs without
// starving the loop. The service layer swallows per-row send errors (mirrors
// ProcessPendingNotifications) and only returns the List error from the
// initial ListRetryEligible call.
func (s *Scheduler) runNotificationRetry(ctx context.Context) {
opCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()
if err := s.notificationService.RetryFailedNotifications(opCtx); err != nil {
s.logger.Error("notification retry failed",
"error", err,
"interval", s.notificationRetryInterval.String())
} else {
s.logger.Debug("notification retry completed")
}
}
// shortLivedExpiryCheckLoop runs every shortLivedExpiryCheckInterval and marks expired
// short-lived certificates. For certs with TTL < 1 hour, expiry IS revocation —
// no CRL/OCSP needed.
+267
View File
@@ -195,12 +195,25 @@ func (m *mockAgentService) MarkStaleAgentsOffline(ctx context.Context, interval
}
// mockNotificationService is a mock implementation for testing.
//
// Tracks ProcessPendingNotifications and RetryFailedNotifications separately.
// retrySlowDelay and retryShouldError let tests exercise the retry loop
// independently of the processor loop without coupling their timing/failure
// modes (coverage gap I-005 — prior to the notificationRetryLoop being wired,
// RetryFailedNotifications had no runtime caller).
type mockNotificationService struct {
mu sync.Mutex
callCount int
callTimes []time.Time
slowDelay time.Duration
shouldError bool
// Retry loop tracking (coverage gap I-005)
retryCallCount int
retryCallTimes []time.Time
retrySlowDelay time.Duration
retryShouldError bool
retryCtxHasDeadline bool
}
func (m *mockNotificationService) ProcessPendingNotifications(ctx context.Context) error {
@@ -223,6 +236,42 @@ func (m *mockNotificationService) ProcessPendingNotifications(ctx context.Contex
return nil
}
// RetryFailedNotifications is the scheduler-driven counterpart to
// ProcessPendingNotifications that closes coverage gap I-005. Prior to the
// notificationRetryLoop being wired, notifications that hit status='failed'
// orphaned there forever — no retry, no DLQ, no escalation. The service-layer
// method exists to sweep failed rows whose next_retry_at has elapsed, but
// without a scheduler caller the sweep never runs in production.
//
// This mock mirrors mockJobService.RetryFailedJobs's shape: a retry-only field
// cluster so callers can dial retrySlowDelay / retryShouldError without
// perturbing ProcessPendingNotifications's timing, and retryCtxHasDeadline so
// the ContextDeadlineRespected test can assert the scheduler is passing a
// per-tick context.WithTimeout rather than the raw shutdown ctx.
func (m *mockNotificationService) RetryFailedNotifications(ctx context.Context) error {
m.mu.Lock()
m.retryCallCount++
m.retryCallTimes = append(m.retryCallTimes, time.Now())
// Track whether context has a deadline set — the scheduler must wrap each
// tick in a bounded context so a hung sweep can't stall shutdown.
_, hasDeadline := ctx.Deadline()
m.retryCtxHasDeadline = hasDeadline
m.mu.Unlock()
if m.retrySlowDelay > 0 {
select {
case <-time.After(m.retrySlowDelay):
case <-ctx.Done():
return ctx.Err()
}
}
if m.retryShouldError {
return context.Canceled
}
return nil
}
// mockNetworkScanService is a mock implementation for testing.
type mockNetworkScanService struct {
mu sync.Mutex
@@ -1358,3 +1407,221 @@ func TestScheduler_JobTimeoutLoop_ContextDeadlineRespected(t *testing.T) {
}
t.Log("timeout reaper context deadline verified")
}
// ─── NotificationRetryLoop tests (coverage gap I-005) ────────────────────────
//
// These four tests are the scheduler-level Red half of the I-005 fix. They
// mirror the I-001 jobRetryLoop triplet (CallsService / IdempotencyGuard /
// WaitForCompletion) plus the I-003 ContextDeadlineRespected shape.
//
// All four use the same "quiet every other loop" pattern so the only tick
// activity visible on notificationMock is the retry loop under test. JobTimeout
// is intentionally left unconfigured — SetJobReaperService isn't called, so the
// timeout loop is dormant (same convention the I-001 tests follow).
// TestScheduler_NotificationRetryLoop_CallsService verifies that the
// notification retry loop invokes NotificationService.RetryFailedNotifications
// on each tick. Closes coverage gap I-005 — prior to the loop being wired,
// RetryFailedNotifications had no runtime caller and failed notification_events
// rows orphaned at status='failed' forever (no retry, no DLQ, no escalation).
//
// Unlike the jobRetryLoop test, there is no maxRetries advisory constant to
// forward: the max_attempts limit on notification retries lives on the row
// itself (retry_count column introduced by migration 000016), not in the call
// signature.
func TestScheduler_NotificationRetryLoop_CallsService(t *testing.T) {
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
renewalMock := &mockRenewalService{}
jobMock := &mockJobService{}
agentMock := &mockAgentService{}
notificationMock := &mockNotificationService{}
networkMock := &mockNetworkScanService{}
sched := NewScheduler(renewalMock, jobMock, agentMock, notificationMock, networkMock, logger)
// Quiet every other loop so only the retry loop's calls are visible on notificationMock.
sched.SetRenewalCheckInterval(10 * time.Second)
sched.SetJobProcessorInterval(10 * time.Second)
sched.SetAgentHealthCheckInterval(10 * time.Second)
sched.SetNotificationProcessInterval(10 * time.Second)
sched.SetNetworkScanInterval(10 * time.Second)
sched.SetJobRetryInterval(10 * time.Second)
sched.SetNotificationRetryInterval(50 * time.Millisecond)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
startedChan := sched.Start(ctx)
<-startedChan
// Run long enough for the immediate start + at least one tick.
time.Sleep(200 * time.Millisecond)
cancel()
_ = sched.WaitForCompletion(2 * time.Second)
notificationMock.mu.Lock()
retryCount := notificationMock.retryCallCount
notificationMock.mu.Unlock()
if retryCount < 1 {
t.Fatalf("expected notification retry service to be called at least once, got %d", retryCount)
}
t.Logf("notification retry loop called %d times", retryCount)
}
// TestScheduler_NotificationRetryLoop_IdempotencyGuard verifies that a slow
// retry sweep does not cause overlapping executions. Mirrors the shape of
// TestScheduler_JobRetryLoop_IdempotencyGuard.
//
// The guard is the atomic.Bool notificationRetryRunning in scheduler.go.
// Without it, a 100ms tick against a 150ms operation would fire ~4 times in
// 400ms; with the guard we expect ~23 calls. Anything above 3 is logged as a
// warning (not a hard failure) so CI timing noise doesn't produce flakes.
func TestScheduler_NotificationRetryLoop_IdempotencyGuard(t *testing.T) {
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
renewalMock := &mockRenewalService{}
jobMock := &mockJobService{}
agentMock := &mockAgentService{}
notificationMock := &mockNotificationService{
retrySlowDelay: 150 * time.Millisecond, // slower than tick interval
}
networkMock := &mockNetworkScanService{}
sched := NewScheduler(renewalMock, jobMock, agentMock, notificationMock, networkMock, logger)
sched.SetRenewalCheckInterval(10 * time.Second)
sched.SetJobProcessorInterval(10 * time.Second)
sched.SetAgentHealthCheckInterval(10 * time.Second)
sched.SetNotificationProcessInterval(10 * time.Second)
sched.SetNetworkScanInterval(10 * time.Second)
sched.SetJobRetryInterval(10 * time.Second)
sched.SetNotificationRetryInterval(100 * time.Millisecond)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
startedChan := sched.Start(ctx)
<-startedChan
time.Sleep(400 * time.Millisecond)
notificationMock.mu.Lock()
retryCount := notificationMock.retryCallCount
notificationMock.mu.Unlock()
// With a 150ms sweep and 100ms interval, a functioning guard should yield
// roughly 23 calls (immediate + any ticks whose previous sweep finished).
// Anything above 3 suggests the guard isn't holding.
if retryCount > 3 {
t.Logf("WARNING: retry called %d times in 400ms with 100ms interval and 150ms sweep — guard may not be working", retryCount)
}
t.Logf("notification retry idempotency guard: %d calls in 400ms (100ms interval, 150ms sweep)", retryCount)
cancel()
if err := sched.WaitForCompletion(2 * time.Second); err != nil {
t.Fatalf("WaitForCompletion should succeed: %v", err)
}
}
// TestScheduler_NotificationRetryLoop_WaitForCompletion verifies that a retry
// sweep still in flight at shutdown is awaited by WaitForCompletion — the same
// sync.WaitGroup contract every other loop satisfies. If the loop were to
// return early without registering its goroutine on s.wg, this test would
// either (a) observe retryCount==0 because the immediate-start sweep was never
// launched, or (b) observe WaitForCompletion returning before the in-flight
// sweep finished (elapsed < retrySlowDelay).
func TestScheduler_NotificationRetryLoop_WaitForCompletion(t *testing.T) {
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
renewalMock := &mockRenewalService{}
jobMock := &mockJobService{}
agentMock := &mockAgentService{}
notificationMock := &mockNotificationService{
retrySlowDelay: 100 * time.Millisecond,
}
networkMock := &mockNetworkScanService{}
sched := NewScheduler(renewalMock, jobMock, agentMock, notificationMock, networkMock, logger)
sched.SetRenewalCheckInterval(10 * time.Second)
sched.SetJobProcessorInterval(10 * time.Second)
sched.SetAgentHealthCheckInterval(10 * time.Second)
sched.SetNotificationProcessInterval(10 * time.Second)
sched.SetNetworkScanInterval(10 * time.Second)
sched.SetJobRetryInterval(10 * time.Second)
sched.SetNotificationRetryInterval(50 * time.Millisecond)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
startedChan := sched.Start(ctx)
<-startedChan
// Let the immediate-start retry goroutine begin its 100ms sweep.
time.Sleep(30 * time.Millisecond)
// Initiate shutdown mid-sweep.
cancel()
start := time.Now()
err := sched.WaitForCompletion(5 * time.Second)
elapsed := time.Since(start)
if err != nil {
t.Fatalf("WaitForCompletion should not error: %v", err)
}
if elapsed > 5*time.Second {
t.Fatalf("WaitForCompletion took longer than expected: %v", elapsed)
}
notificationMock.mu.Lock()
retryCount := notificationMock.retryCallCount
notificationMock.mu.Unlock()
if retryCount < 1 {
t.Fatalf("expected notification retry service to have started at least once before shutdown, got %d", retryCount)
}
t.Logf("notification retry loop graceful shutdown completed in %v after %d in-flight sweep(s)", elapsed, retryCount)
}
// TestScheduler_NotificationRetryLoop_ContextDeadlineRespected verifies that
// each tick of the retry loop receives a context with a deadline set. Mirrors
// TestScheduler_JobTimeoutLoop_ContextDeadlineRespected.
//
// The per-tick context.WithTimeout exists so a pathologically slow sweep (e.g.
// a misbehaving DB lock) can't stall the rest of the scheduler's shutdown
// sequence indefinitely — the wrapping context expires, the sweep returns
// ctx.Err(), and the WaitGroup.Done() fires on schedule.
func TestScheduler_NotificationRetryLoop_ContextDeadlineRespected(t *testing.T) {
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
renewalMock := &mockRenewalService{}
jobMock := &mockJobService{}
agentMock := &mockAgentService{}
notificationMock := &mockNotificationService{}
networkMock := &mockNetworkScanService{}
sched := NewScheduler(renewalMock, jobMock, agentMock, notificationMock, networkMock, logger)
sched.SetRenewalCheckInterval(10 * time.Second)
sched.SetJobProcessorInterval(10 * time.Second)
sched.SetAgentHealthCheckInterval(10 * time.Second)
sched.SetNotificationProcessInterval(10 * time.Second)
sched.SetNetworkScanInterval(10 * time.Second)
sched.SetJobRetryInterval(10 * time.Second)
sched.SetNotificationRetryInterval(50 * time.Millisecond)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
<-sched.Start(ctx)
time.Sleep(100 * time.Millisecond)
cancel()
if err := sched.WaitForCompletion(2 * time.Second); err != nil {
t.Fatalf("WaitForCompletion: %v", err)
}
notificationMock.mu.Lock()
hasDeadline := notificationMock.retryCtxHasDeadline
notificationMock.mu.Unlock()
if !hasDeadline {
t.Fatal("expected notification retry context to have a deadline set, but none found")
}
t.Log("notification retry context deadline verified")
}
+242
View File
@@ -10,6 +10,40 @@ import (
"github.com/shankar0123/certctl/internal/repository"
)
// I-005 retry + DLQ knobs. These pin the operator-approved retry budget and
// the defense-in-depth ceiling on the exponential backoff curve used by
// RetryFailedNotifications.
//
// Values match those the Phase 1 Red tests assert against (see
// i005MaxAttempts / i005BackoffCap in notification_test.go:600-608) — the
// production identifiers are distinct because this file and its tests share
// `package service`, so a single shared name would collide at compile time.
// The test comment explicitly notes "Phase 2 is free to thread this from
// config"; when that wiring lands, these become package-level defaults the
// scheduler can override. For now they are the single source of truth.
const (
// notifRetryMaxAttempts is the attempt budget *before* the current
// attempt: a row at retry_count == notifRetryMaxAttempts-1 that fails
// this tick transitions to 'dead' instead of being re-armed. The
// repository's ListRetryEligible filter also uses this value as a
// guard (`AND retry_count < $2`) so a DLQ row is never re-swept.
notifRetryMaxAttempts = 5
// notifRetryBackoffCap is the 1h ceiling on `2^retry_count` minutes.
// With max_attempts=5 the deepest actually-schedulable wait is 2^3=8m
// (retry_count=3 → 8m, then retry_count=4 → 'dead'), so the cap is a
// ceiling-assertion today — but it must stay in place so a later
// increase in max_attempts cannot push next_retry_at past 1h without
// an explicit policy decision.
notifRetryBackoffCap = time.Hour
// notifRetrySweepLimit caps a single retry tick at this many rows so
// a large burst of dead-letter-bound mail cannot monopolize the 2m
// tick budget. Mirrors the 1000-row cap on ProcessPendingNotifications
// at notification.go:244 for operational symmetry.
notifRetrySweepLimit = 1000
)
// NotificationService provides business logic for managing notifications.
type NotificationService struct {
notifRepo repository.NotificationRepository
@@ -373,3 +407,211 @@ func (s *NotificationService) GetNotification(ctx context.Context, id string) (*
func (s *NotificationService) MarkAsRead(ctx context.Context, id string) error {
return s.notifRepo.UpdateStatus(ctx, id, "read", time.Now())
}
// ─── I-005 retry + DLQ surface (Phase 2 Green) ───────────────────────────
//
// The three methods below close the retry loop the Phase 1 Red tests pin at
// notification_test.go:600-917 and notification_handler_test.go:443-519:
//
// 1. RetryFailedNotifications — scheduler entry point. Pulls failed rows
// whose next_retry_at has elapsed, retries delivery, rewrites retry
// bookkeeping per the pre-increment backoff contract, and transitions
// exhausted rows to 'dead' (DLQ). Per-row errors never bubble — a
// single bad recipient cannot stall the tick. Mirrors the ordering
// the ProcessPendingNotifications loop uses at notification.go:242.
//
// 2. RequeueNotification — operator-driven escape hatch from 'dead' back
// to 'pending'. Pass-through to the repo's Requeue method with clean
// error wrapping so repo-layer failures ("pg: deadlock detected")
// surface in the UI instead of silently succeeding.
//
// 3. ListNotificationsByStatus — Dead letter tab support. Thin filter
// wrapper around the existing List query; the Phase 2 Green handler
// routes `?status=…` through this method while preserving the
// unfiltered path through ListNotifications (handler_test pins both).
//
// Sibling scheduler loops I-001 (job retry) and I-003 (job timeout) already
// ship the 10-loop topology these methods plug into; the 11th loop added
// by this milestone calls RetryFailedNotifications on a 2m tick, matching
// the CERTCTL_NOTIFICATION_RETRY_INTERVAL default pinned in config/
// scheduler Phase 2 Green edits that follow this one.
// RetryFailedNotifications is the scheduler entry point for the I-005
// retry sweep. Semantics (pinned by notification_test.go:635-843):
//
// - A ListRetryEligible failure short-circuits with a wrapped error so
// the caller's tick counter reflects the outage. Crucially, zero
// notifier.Send calls fire in this path — we never got a canonical
// set of rows, and issuing any sends risks double-delivery when the
// DB comes back.
//
// - Per-row failures are logged but NEVER returned. That contract comes
// straight from ProcessPendingNotifications (notification.go:242-267);
// the retry loop inherits it so a single 4xx response can't freeze
// every downstream row in the sweep.
//
// - Success promotes the row directly to 'sent' via UpdateStatus. The
// retry_count field is *not* incremented on success — that would
// falsify the audit-trail signal "this row was delivered on attempt
// N". The mock's UpdateStatus does a plain status write with no retry
// mutation (testutil_test.go:446-459), matching the postgres impl.
//
// - Failure uses pre-increment exponential backoff:
// wait = min(2^retry_count * time.Minute, notifRetryBackoffCap)
// where retry_count is the row's value *before* this attempt. The
// repo layer's RecordFailedAttempt then increments retry_count by 1
// server-side. This asymmetry keeps the service stateless — the
// service reads retry_count to compute the wait, but never writes it
// directly; the write is exclusively the repo's responsibility.
//
// - Exhaustion transitions to 'dead' when retry_count == max-1, because
// RecordFailedAttempt's ++ would push retry_count to max and the next
// sweep's `retry_count < max` filter in ListRetryEligible would then
// silently skip the row forever (a zombie-failed row nobody sees).
// MarkAsDead clears next_retry_at to evict the row from the partial
// retry-sweep index as well, so it stops scanning past dead rows.
//
// - A row whose Channel has no registered notifier is promoted to
// 'sent' (demo-mode parity with sendNotification's fallback at
// notification.go:272-279). This branch should not normally fire for
// retry rows — they were created *by* a notifier that failed — but
// defensive handling guards against config drift (notifier disabled
// between Create and retry) that would otherwise wedge the row.
func (s *NotificationService) RetryFailedNotifications(ctx context.Context) error {
now := time.Now()
rows, err := s.notifRepo.ListRetryEligible(ctx, now, notifRetryMaxAttempts, notifRetrySweepLimit)
if err != nil {
return fmt.Errorf("failed to list retry-eligible notifications: %w", err)
}
for _, row := range rows {
if row == nil {
continue
}
notifier, ok := s.notifierRegistry[string(row.Channel)]
if !ok {
// No notifier wired for this channel — promote to 'sent' to
// avoid looping forever over a row that has nowhere to go.
// See notification.go:272-279 for the sibling demo-mode path.
if updateErr := s.notifRepo.UpdateStatus(ctx, row.ID, string(domain.NotificationStatusSent), time.Now()); updateErr != nil {
slog.Error("failed to promote retry row with missing notifier to sent",
"notification_id", row.ID, "channel", row.Channel, "error", updateErr)
}
continue
}
sendErr := notifier.Send(ctx, row.Recipient, string(row.Type), row.Message)
if sendErr == nil {
// Success: promote straight to 'sent' without touching
// retry_count — the audit trail must preserve "this row was
// delivered on attempt N", and the mock's UpdateStatus is a
// plain status write (no retry_count reset). Errors here are
// logged, never returned.
if updateErr := s.notifRepo.UpdateStatus(ctx, row.ID, string(domain.NotificationStatusSent), time.Now()); updateErr != nil {
slog.Error("failed to mark retried notification as sent",
"notification_id", row.ID, "error", updateErr)
}
continue
}
// Failure path. Compute pre-increment backoff first so the
// exhaustion branch and the reschedule branch see an identical
// `wait` derivation — easier to audit against the test window
// assertions at notification_test.go:739-743 and :796-801.
wait := time.Duration(1<<row.RetryCount) * time.Minute
if wait > notifRetryBackoffCap {
wait = notifRetryBackoffCap
}
// Exhaustion: this attempt consumes the final slot of the attempt
// budget. Transition to 'dead' and let MarkAsDead clear
// next_retry_at so the retry-sweep index stops hitting the row.
if row.RetryCount >= notifRetryMaxAttempts-1 {
if markErr := s.notifRepo.MarkAsDead(ctx, row.ID, sendErr.Error()); markErr != nil {
slog.Error("failed to mark exhausted notification as dead",
"notification_id", row.ID, "retry_count", row.RetryCount,
"send_error", sendErr, "mark_error", markErr)
}
continue
}
// Non-terminal: hand the lastError + nextRetryAt off to the repo,
// which increments retry_count by exactly 1 and keeps the row in
// 'failed' state so the next tick picks it up.
nextRetryAt := time.Now().Add(wait)
if recErr := s.notifRepo.RecordFailedAttempt(ctx, row.ID, sendErr.Error(), nextRetryAt); recErr != nil {
slog.Error("failed to record notification retry attempt",
"notification_id", row.ID, "retry_count", row.RetryCount,
"next_retry_at", nextRetryAt, "send_error", sendErr, "record_error", recErr)
}
}
return nil
}
// RequeueNotification is the operator-driven escape hatch from 'dead' back
// to 'pending'. It resets all retry bookkeeping — retry_count → 0,
// next_retry_at → NULL, last_error → NULL — so ProcessPendingNotifications
// treats the requeued row as a fresh attempt on its next tick. Identical on
// the wire to a newly-created notification.
//
// Behavior contract (pinned by notification_test.go:849-917):
//
// - Success path delegates to the repo's Requeue, which performs the
// status/retry_count/next_retry_at/last_error reset atomically. The
// service adds no extra bookkeeping; the audit trail already captures
// the transition via the upstream API call.
//
// - Error path wraps the repo error with context so a failure like
// "pg: deadlock detected" surfaces in the handler response and the
// operator UI. The service has no fallback — a silent "success" that
// didn't actually mutate the row would be worse than a loud error.
func (s *NotificationService) RequeueNotification(ctx context.Context, id string) error {
if err := s.notifRepo.Requeue(ctx, id); err != nil {
return fmt.Errorf("failed to requeue notification: %w", err)
}
return nil
}
// ListNotificationsByStatus returns paginated notifications filtered by
// status. It mirrors ListNotifications's shape but threads a Status filter
// into the NotificationFilter so the Phase 2 Green handler can route
// `?status=dead` (Dead letter tab) through this method while keeping the
// unfiltered path on ListNotifications for backward compat.
//
// Pinned by notification_handler_test.go:443-519 — the handler test asserts
// that a request with `?status=dead&page=1&per_page=50` lands on exactly
// this signature (`status string, page, perPage int`) and that requests
// without a status param do NOT call it. Keep the returned shape identical
// to ListNotifications so the handler can reuse its JSON-encoding path.
func (s *NotificationService) ListNotificationsByStatus(ctx context.Context, status string, page, perPage int) ([]domain.NotificationEvent, int64, error) {
if page < 1 {
page = 1
}
if perPage < 1 {
perPage = 50
}
filter := &repository.NotificationFilter{
Status: status,
Page: page,
PerPage: perPage,
}
notifications, err := s.notifRepo.List(ctx, filter)
if err != nil {
return nil, 0, fmt.Errorf("failed to list notifications by status: %w", err)
}
result := make([]domain.NotificationEvent, 0, len(notifications))
for _, n := range notifications {
if n != nil {
result = append(result, *n)
}
}
total := int64(len(result))
return result, total, nil
}
+350
View File
@@ -565,3 +565,353 @@ func TestGetNotificationHistory(t *testing.T) {
func stringPtr(s string) *string {
return &s
}
// ─── I-005 retry + DLQ service contract (Phase 1 Red) ─────────────────────
//
// These tests pin the service-layer contract the I-005 fix must satisfy. The
// Red signals they produce are, in compile order:
//
// 1. service.NotificationService.RetryFailedNotifications undefined
// 2. service.NotificationService.RequeueNotification undefined
// 3. mockNotifRepo.ListRetryEligible undefined (surfaced after the service
// method exists and starts calling it)
// 4. mockNotifRepo.RecordFailedAttempt undefined
// 5. mockNotifRepo.MarkAsDead undefined
// 6. mockNotifRepo.Requeue undefined
// 7. NotificationEvent.RetryCount / NextRetryAt / LastError undefined — but
// domain/notification_test.go already pins these, so they ride in on the
// Phase 2 Green domain edit and compile by the time the service-layer
// tests run.
//
// The contract under test, re-derived from notification.go:282-288:
// * A failed notifier.Send used to stamp status='failed' with a zero
// time.Time and return. I-005 reframes that row as retry-eligible with
// bookkeeping (retry_count, next_retry_at, last_error) so a sibling
// scheduler loop can promote it back to 'pending' until max_attempts,
// then to 'dead' (DLQ) for operator triage.
// * Backoff is 2^retry_count minutes, capped at 1h, mirroring the
// operator decision captured in the I-005 design notes.
// * Success on a retry promotes the row straight to 'sent' via
// UpdateStatus (no retry bookkeeping change).
// * Requeue is the operator-driven escape hatch from 'dead' back to
// 'pending' with retry_count reset to 0; service-layer impl is a
// pass-through to repo.Requeue so the audit trail is consistent.
const (
// i005MaxAttempts must match the same constant used by the Green
// service implementation. Declared here only so the test assertions
// read cleanly; Phase 2 is free to thread this from config.
i005MaxAttempts = 5
// i005BackoffCap mirrors the 1h ceiling on 2^retry_count minutes.
i005BackoffCap = time.Hour
)
// newFailedNotification builds a minimal failed-state row suitable for seeding
// the mock repo. retry_count is the number of attempts already consumed (so
// the next attempt becomes retry_count+1, and retry_count == max-1 puts the
// row at the exhaustion threshold).
func newFailedNotification(id string, retryCount int, nextRetryAt time.Time) *domain.NotificationEvent {
nextCopy := nextRetryAt
last := "connection refused"
return &domain.NotificationEvent{
ID: id,
Type: domain.NotificationTypeExpirationWarning,
Channel: domain.NotificationChannelEmail,
Recipient: "owner-i005@example.com",
Message: "retry me: " + id,
Status: string(domain.NotificationStatusFailed),
RetryCount: retryCount,
NextRetryAt: &nextCopy,
LastError: &last,
CreatedAt: time.Now().Add(-time.Hour),
}
}
// TestNotificationService_RetryFailedNotifications_NoEligibleRows asserts the
// no-op path: an empty retry queue must not trigger any notifier.Send calls
// and must not surface as an error. This pins that the retry loop's cost is
// O(retry-eligible), not O(total).
func TestNotificationService_RetryFailedNotifications_NoEligibleRows(t *testing.T) {
ctx := context.Background()
notifRepo := newMockNotificationRepository()
notifier := newMockNotifier()
registry := map[string]Notifier{"Email": notifier}
svc := NewNotificationService(notifRepo, registry)
if err := svc.RetryFailedNotifications(ctx); err != nil {
t.Fatalf("RetryFailedNotifications on empty queue returned error: %v", err)
}
if got := notifier.getSentCount(); got != 0 {
t.Errorf("notifier.Send call count = %d, want 0 (no retry-eligible rows)", got)
}
}
// TestNotificationService_RetryFailedNotifications_ListError asserts that a
// ListRetryEligible failure short-circuits the loop. Notifier.Send must not
// fire — we never got a canonical set of rows to act on, so sending anything
// would risk double-delivery when the DB comes back.
func TestNotificationService_RetryFailedNotifications_ListError(t *testing.T) {
ctx := context.Background()
notifRepo := newMockNotificationRepository()
notifRepo.ListErr = fmt.Errorf("simulated DB outage")
notifier := newMockNotifier()
registry := map[string]Notifier{"Email": notifier}
svc := NewNotificationService(notifRepo, registry)
err := svc.RetryFailedNotifications(ctx)
if err == nil {
t.Fatalf("RetryFailedNotifications must surface the list error; got nil")
}
if !strings.Contains(err.Error(), "simulated DB outage") {
t.Errorf("expected wrapped list error to mention 'simulated DB outage', got: %v", err)
}
if got := notifier.getSentCount(); got != 0 {
t.Errorf("notifier.Send must not fire when list fails; got %d sends", got)
}
}
// TestNotificationService_RetryFailedNotifications_SuccessPromotes asserts
// the happy path for a retry that succeeds: the row is promoted directly to
// 'sent' via UpdateStatus (mirroring ProcessPendingNotifications), and no
// retry bookkeeping mutation (RecordFailedAttempt / MarkAsDead) fires.
func TestNotificationService_RetryFailedNotifications_SuccessPromotes(t *testing.T) {
ctx := context.Background()
notifRepo := newMockNotificationRepository()
notifier := newMockNotifier() // default: no error — Send succeeds
registry := map[string]Notifier{"Email": notifier}
svc := NewNotificationService(notifRepo, registry)
row := newFailedNotification("notif-success", 2, time.Now().Add(-time.Minute))
notifRepo.AddNotification(row)
if err := svc.RetryFailedNotifications(ctx); err != nil {
t.Fatalf("RetryFailedNotifications should not error on per-row success: %v", err)
}
if notifier.getSentCount() != 1 {
t.Errorf("expected exactly 1 notifier.Send call, got %d", notifier.getSentCount())
}
if row.Status != string(domain.NotificationStatusSent) {
t.Errorf("successful retry must promote status to 'sent', got %q", row.Status)
}
// retry_count must NOT increment on success — that would falsify the
// "this row was delivered on attempt N" signal the audit trail relies on.
if row.RetryCount != 2 {
t.Errorf("retry_count must not change on success, got %d (want 2)", row.RetryCount)
}
}
// TestNotificationService_RetryFailedNotifications_ExponentialBackoff asserts
// that a still-retriable failure schedules the next attempt at 2^retry_count
// minutes from now, matching the operator-approved curve 1m, 2m, 4m, 8m, 16m.
// The assertion is a window check against time.Now() because the service
// reads its own clock.
func TestNotificationService_RetryFailedNotifications_ExponentialBackoff(t *testing.T) {
ctx := context.Background()
notifRepo := newMockNotificationRepository()
notifier := newMockNotifier()
notifier.SendErr = fmt.Errorf("smtp 451 temporary failure")
registry := map[string]Notifier{"Email": notifier}
svc := NewNotificationService(notifRepo, registry)
// retry_count=2 → next attempt is #3, backoff = 2^2 = 4 minutes.
row := newFailedNotification("notif-backoff", 2, time.Now().Add(-time.Minute))
notifRepo.AddNotification(row)
before := time.Now()
if err := svc.RetryFailedNotifications(ctx); err != nil {
t.Fatalf("RetryFailedNotifications should not bubble per-row send errors: %v", err)
}
after := time.Now()
// Still in 'failed' — not yet exhausted (retry_count+1 = 3, below max 5).
if row.Status != string(domain.NotificationStatusFailed) {
t.Errorf("status after non-terminal retry must stay 'failed', got %q", row.Status)
}
if row.RetryCount != 3 {
t.Errorf("retry_count must increment on failure, got %d (want 3)", row.RetryCount)
}
if row.NextRetryAt == nil {
t.Fatalf("NextRetryAt must be set on non-terminal retry failure; got nil")
}
expectedMin := before.Add(4 * time.Minute)
expectedMax := after.Add(4 * time.Minute)
if row.NextRetryAt.Before(expectedMin) || row.NextRetryAt.After(expectedMax) {
t.Errorf("NextRetryAt outside 2^2=4m window [%v, %v]; got %v",
expectedMin, expectedMax, *row.NextRetryAt)
}
if row.LastError == nil || !strings.Contains(*row.LastError, "smtp 451 temporary failure") {
t.Errorf("LastError must preserve the notifier error body for triage; got %v", row.LastError)
}
}
// TestNotificationService_RetryFailedNotifications_BackoffCap asserts the
// defense-in-depth 1h ceiling on next_retry_at. The retry curve under the
// operator-approved formula is pre-increment `2^retry_count` minutes — 1m,
// 2m, 4m, 8m — and with max_attempts=5 the deepest still-retriable row is
// retry_count=4 (next wait = 2^4 = 16m), which would transition to 'dead'
// before ever scheduling. So the largest actually-schedulable wait is
// 2^3=8m at retry_count=3, well under the 1h cap.
//
// That makes this test a ceiling-assertion, not a saturation-assertion: we
// pick retry_count=3 (matching ExponentialBackoff's formula but one step
// deeper) and verify (a) the window lands at 2^3=8m and (b) the cap is
// never exceeded. When max_attempts becomes configurable in a later
// milestone, this test becomes the natural home for a true cap-saturation
// fixture; for now it pins the arithmetic the Phase 2 Green implementation
// has to hit exactly.
func TestNotificationService_RetryFailedNotifications_BackoffCap(t *testing.T) {
ctx := context.Background()
notifRepo := newMockNotificationRepository()
notifier := newMockNotifier()
notifier.SendErr = fmt.Errorf("webhook 502 bad gateway")
registry := map[string]Notifier{"Email": notifier}
svc := NewNotificationService(notifRepo, registry)
// retry_count=3 → pre-increment wait = 2^3 = 8 minutes. Post-increment
// retry_count becomes 4, which is still below max_attempts=5, so the
// row stays in 'failed' rather than transitioning to 'dead'.
row := newFailedNotification("notif-backoff-cap", 3, time.Now().Add(-time.Minute))
notifRepo.AddNotification(row)
before := time.Now()
if err := svc.RetryFailedNotifications(ctx); err != nil {
t.Fatalf("RetryFailedNotifications should not bubble per-row send errors: %v", err)
}
after := time.Now()
if row.Status != string(domain.NotificationStatusFailed) {
t.Errorf("mid-retry status must stay 'failed', got %q", row.Status)
}
if row.RetryCount != 4 {
t.Errorf("retry_count must increment on failure, got %d (want 4)", row.RetryCount)
}
if row.NextRetryAt == nil {
t.Fatalf("NextRetryAt must be set; got nil")
}
// retry_count=3 → pre-increment 2^3 = 8m, matching the curve pinned by
// ExponentialBackoff (retry_count=2 → 2^2=4m).
expectedMin := before.Add(8 * time.Minute)
expectedMax := after.Add(8 * time.Minute)
if row.NextRetryAt.Before(expectedMin) || row.NextRetryAt.After(expectedMax) {
t.Errorf("NextRetryAt outside 2^3=8m window [%v, %v]; got %v",
expectedMin, expectedMax, *row.NextRetryAt)
}
// And regardless of retry_count, the ceiling must hold: next_retry_at
// must never be more than i005BackoffCap (1h) from now. This is the
// defense-in-depth assertion — it would fail loudly if a future
// refactor swapped to post-increment and overshot on a deeper row.
if row.NextRetryAt.After(after.Add(i005BackoffCap + time.Second)) {
t.Errorf("NextRetryAt violates 1h cap; scheduled %v in the future",
row.NextRetryAt.Sub(after))
}
}
// TestNotificationService_RetryFailedNotifications_MarkDeadOnExhaustion
// asserts the terminal transition: once retry_count crosses max_attempts,
// the row moves to 'dead' (DLQ) and stops participating in the retry sweep.
// next_retry_at must be cleared — otherwise the partial retry-sweep index
// would still pick it up and we'd loop forever.
func TestNotificationService_RetryFailedNotifications_MarkDeadOnExhaustion(t *testing.T) {
ctx := context.Background()
notifRepo := newMockNotificationRepository()
notifier := newMockNotifier()
notifier.SendErr = fmt.Errorf("connection refused after max attempts")
registry := map[string]Notifier{"Email": notifier}
svc := NewNotificationService(notifRepo, registry)
// retry_count = max-1: this attempt makes it max, so the row must
// transition to 'dead', not get rescheduled.
row := newFailedNotification("notif-dead", i005MaxAttempts-1, time.Now().Add(-time.Minute))
notifRepo.AddNotification(row)
if err := svc.RetryFailedNotifications(ctx); err != nil {
t.Fatalf("RetryFailedNotifications must not bubble per-row exhaustion: %v", err)
}
if row.Status != string(domain.NotificationStatusDead) {
t.Errorf("exhausted row must be in 'dead' status, got %q", row.Status)
}
if row.NextRetryAt != nil {
t.Errorf("dead row must have next_retry_at cleared (else retry sweep keeps picking it up); got %v", *row.NextRetryAt)
}
if row.LastError == nil || !strings.Contains(*row.LastError, "connection refused after max attempts") {
t.Errorf("LastError on dead row must preserve final failure reason; got %v", row.LastError)
}
}
// TestNotificationService_RequeueNotification_Success asserts the operator
// escape hatch: Requeue flips a dead row back to 'pending' with
// retry_count=0 so ProcessPendingNotifications can pick it up on the very
// next tick. The service delegates to repo.Requeue and propagates no error.
func TestNotificationService_RequeueNotification_Success(t *testing.T) {
ctx := context.Background()
notifRepo := newMockNotificationRepository()
registry := map[string]Notifier{"Email": newMockNotifier()}
svc := NewNotificationService(notifRepo, registry)
next := time.Now().Add(10 * time.Minute)
last := "max attempts exceeded"
dead := &domain.NotificationEvent{
ID: "notif-requeue",
Type: domain.NotificationTypeExpirationWarning,
Channel: domain.NotificationChannelEmail,
Recipient: "owner@example.com",
Message: "please requeue me",
Status: string(domain.NotificationStatusDead),
RetryCount: i005MaxAttempts,
NextRetryAt: &next,
LastError: &last,
CreatedAt: time.Now().Add(-2 * time.Hour),
}
notifRepo.AddNotification(dead)
if err := svc.RequeueNotification(ctx, dead.ID); err != nil {
t.Fatalf("RequeueNotification(%s) returned error: %v", dead.ID, err)
}
if dead.Status != string(domain.NotificationStatusPending) {
t.Errorf("Requeue must flip status to 'pending', got %q", dead.Status)
}
if dead.RetryCount != 0 {
t.Errorf("Requeue must reset retry_count to 0, got %d", dead.RetryCount)
}
if dead.NextRetryAt != nil {
t.Errorf("Requeue must clear next_retry_at (pending rows never have it), got %v", *dead.NextRetryAt)
}
if dead.LastError != nil {
t.Errorf("Requeue must clear last_error (pending is a fresh attempt), got %v", *dead.LastError)
}
}
// TestNotificationService_RequeueNotification_RepoError asserts that a
// failed Requeue at the repository layer surfaces cleanly. The service has
// no fallback here — if the DB can't update the row, the operator action
// must fail loudly rather than silently "succeed" in the UI.
func TestNotificationService_RequeueNotification_RepoError(t *testing.T) {
ctx := context.Background()
notifRepo := newMockNotificationRepository()
notifRepo.UpdateErr = fmt.Errorf("pg: deadlock detected")
registry := map[string]Notifier{"Email": newMockNotifier()}
svc := NewNotificationService(notifRepo, registry)
// Seed a dead row so the service has something to act on (the error
// must come from the repo write, not from a missing ID).
dead := &domain.NotificationEvent{
ID: "notif-requeue-err",
Type: domain.NotificationTypeExpirationWarning,
Channel: domain.NotificationChannelEmail,
Status: string(domain.NotificationStatusDead),
}
notifRepo.AddNotification(dead)
err := svc.RequeueNotification(ctx, dead.ID)
if err == nil {
t.Fatalf("RequeueNotification must surface repo errors; got nil")
}
if !strings.Contains(err.Error(), "pg: deadlock detected") {
t.Errorf("expected wrapped repo error to mention 'pg: deadlock detected', got: %v", err)
}
}
+46 -11
View File
@@ -15,6 +15,12 @@ type StatsService struct {
certRepo repository.CertificateRepository
jobRepo repository.JobRepository
agentRepo repository.AgentRepository
// notifRepo is injected post-construction via SetNotifRepo so that
// NewStatsService's nine call sites (main.go + stats_test.go + 8 digest
// tests) keep their existing signatures. When nil, the dead-letter count
// falls through to zero — see GetDashboardSummary. I-005 coverage-gap
// closure.
notifRepo repository.NotificationRepository
}
// NewStatsService creates a new stats service.
@@ -30,19 +36,35 @@ func NewStatsService(
}
}
// SetNotifRepo injects the notification repository used to populate
// DashboardSummary.NotificationsDead. Setter pattern (matching the
// certificateService.SetTargetRepo / SetProfileRepo / SetDigestService
// precedent) keeps the NewStatsService signature stable across its
// pre-existing call sites. I-005 coverage-gap closure.
func (s *StatsService) SetNotifRepo(notifRepo repository.NotificationRepository) {
s.notifRepo = notifRepo
}
// DashboardSummary represents a high-level summary of system state.
type DashboardSummary struct {
TotalCertificates int64 `json:"total_certificates"`
ExpiringCertificates int64 `json:"expiring_certificates"`
ExpiredCertificates int64 `json:"expired_certificates"`
RevokedCertificates int64 `json:"revoked_certificates"`
ActiveAgents int64 `json:"active_agents"`
OfflineAgents int64 `json:"offline_agents"`
TotalAgents int64 `json:"total_agents"`
PendingJobs int64 `json:"pending_jobs"`
FailedJobs int64 `json:"failed_jobs"`
CompleteJobs int64 `json:"complete_jobs"`
CompletedAt time.Time `json:"completed_at"`
TotalCertificates int64 `json:"total_certificates"`
ExpiringCertificates int64 `json:"expiring_certificates"`
ExpiredCertificates int64 `json:"expired_certificates"`
RevokedCertificates int64 `json:"revoked_certificates"`
ActiveAgents int64 `json:"active_agents"`
OfflineAgents int64 `json:"offline_agents"`
TotalAgents int64 `json:"total_agents"`
PendingJobs int64 `json:"pending_jobs"`
FailedJobs int64 `json:"failed_jobs"`
CompleteJobs int64 `json:"complete_jobs"`
// NotificationsDead is the number of notification_events rows currently
// in the terminal "dead" status (I-005 dead-letter queue). Exposed here
// so the metrics handler can derive the Prometheus counter
// certctl_notification_dead_total from the same snapshot used by the
// dashboard. DB-COUNT rather than in-memory — notifications can grow
// without bound, and filter-based List() is PerPage-capped to 50.
NotificationsDead int64 `json:"notifications_dead"`
CompletedAt time.Time `json:"completed_at"`
}
// GetDashboardSummary returns a summary of key metrics.
@@ -106,6 +128,19 @@ func (s *StatsService) GetDashboardSummary(ctx context.Context) (interface{}, er
}
}
// I-005: dead-letter count for certctl_notification_dead_total. nil-safe
// so the nine existing NewStatsService call sites that haven't yet been
// updated to call SetNotifRepo keep working — they'll simply report
// NotificationsDead=0, which is the correct value on a system without a
// notification repository wired in. A CountByStatus error is non-fatal:
// the dashboard summary is best-effort for this field.
if s.notifRepo != nil {
deadCount, err := s.notifRepo.CountByStatus(ctx, string(domain.NotificationStatusDead))
if err == nil {
summary.NotificationsDead = deadCount
}
}
return summary, nil
}
+194 -20
View File
@@ -157,20 +157,20 @@ func (m *mockCertRepo) AddCert(cert *domain.ManagedCertificate) {
// mockJobRepo is a test implementation of JobRepository
type mockJobRepo struct {
mu sync.Mutex
Jobs map[string]*domain.Job
StatusUpdates map[string]domain.JobStatus
CreateErr error
UpdateErr error
UpdateErrorByID map[string]error
UpdateErrorByIDMu sync.Mutex
UpdateStatusErr error
GetErr error
ListErr error
ListByStatusErr error
DeleteErr error
ListTimedOutErr error
Updated []*domain.Job
mu sync.Mutex
Jobs map[string]*domain.Job
StatusUpdates map[string]domain.JobStatus
CreateErr error
UpdateErr error
UpdateErrorByID map[string]error
UpdateErrorByIDMu sync.Mutex
UpdateStatusErr error
GetErr error
ListErr error
ListByStatusErr error
DeleteErr error
ListTimedOutErr error
Updated []*domain.Job
}
func (m *mockJobRepo) List(ctx context.Context) ([]*domain.Job, error) {
@@ -393,13 +393,36 @@ func (m *mockJobRepo) AddJob(job *domain.Job) {
m.Jobs[job.ID] = job
}
// mockNotifRepo is a test implementation of NotificationRepository
// mockNotifRepo is a test implementation of NotificationRepository.
//
// I-005 extensions (ListRetryEligible / RecordFailedAttempt / MarkAsDead /
// Requeue) mutate the seeded *domain.NotificationEvent pointers in place.
// The service tests in notification_test.go assert against those same
// pointers (via notifRepo.Notifications or the local `row` handle), so
// in-place mutation is the contract — not a copy-and-replace pattern.
//
// Error fields are layered:
// - Per-method errors (ListRetryEligibleErr, RecordFailedAttemptErr, etc.)
// for fine-grained failure injection when a test targets exactly one
// method.
// - Shared legacy errors (ListErr for list-shaped reads, UpdateErr for
// update-shaped writes) so the pre-I-005 tests that configure ListErr
// or UpdateErr continue to short-circuit the new methods too. The
// RequeueNotification_RepoError test deliberately relies on this by
// setting UpdateErr rather than RequeueErr.
type mockNotifRepo struct {
mu sync.Mutex
Notifications []*domain.NotificationEvent
CreateErr error
ListErr error
UpdateErr error
// I-005 per-method failure injection.
ListRetryEligibleErr error
RecordFailedAttemptErr error
MarkAsDeadErr error
RequeueErr error
CountByStatusErr error
}
func (m *mockNotifRepo) Create(ctx context.Context, notif *domain.NotificationEvent) error {
@@ -436,12 +459,163 @@ func (m *mockNotifRepo) UpdateStatus(ctx context.Context, id string, status stri
return errNotFound
}
// ListRetryEligible returns failed rows whose NextRetryAt is non-nil, at or
// before beforeTime, AND whose RetryCount is strictly less than maxAttempts,
// ordered oldest-due first, capped at limit. Signature matches the postgres-
// canonical shape pinned by notification_test.go:118 ("repo.ListRetryEligible
// (ctx, now, 5, 100)") and the NotificationRepository interface at
// interfaces.go:308 — a row at retry_count == maxAttempts is NOT returned
// because the service has already exhausted its attempt budget and the row
// must be MarkAsDead'd by whichever tick last touched it, not re-swept here.
// Mirrors the partial-index predicate
// `WHERE status='failed' AND next_retry_at IS NOT NULL AND next_retry_at <= $1`
// that migration 000016's retry-sweep index makes cheap to scan; the
// retry_count filter is an extra Go-side guard so the mock behaves
// identically to the postgres `AND retry_count < $2` clause.
func (m *mockNotifRepo) ListRetryEligible(ctx context.Context, beforeTime time.Time, maxAttempts, limit int) ([]*domain.NotificationEvent, error) {
m.mu.Lock()
defer m.mu.Unlock()
if m.ListRetryEligibleErr != nil {
return nil, m.ListRetryEligibleErr
}
if m.ListErr != nil {
return nil, m.ListErr
}
eligible := make([]*domain.NotificationEvent, 0)
for _, n := range m.Notifications {
if n.Status != string(domain.NotificationStatusFailed) {
continue
}
if n.NextRetryAt == nil {
continue
}
if n.NextRetryAt.After(beforeTime) {
continue
}
if n.RetryCount >= maxAttempts {
continue
}
eligible = append(eligible, n)
}
// Oldest-due first so the service processes the most-overdue row first,
// matching how an ORDER BY next_retry_at ASC query would behave.
sort.Slice(eligible, func(i, j int) bool {
return eligible[i].NextRetryAt.Before(*eligible[j].NextRetryAt)
})
if limit > 0 && len(eligible) > limit {
eligible = eligible[:limit]
}
return eligible, nil
}
// RecordFailedAttempt mutates the matched row in place: increments
// retry_count, pins next_retry_at, stores last_error, and keeps the row in
// 'failed' state so the next retry-sweep tick picks it up again. Service-
// level backoff math happens before the call; the repo is a dumb setter.
// Signature matches the postgres-canonical shape pinned by
// notification_test.go:184 ("repo.RecordFailedAttempt(ctx, 'notif-attempt-1',
// 'connection refused', nextTry)") and the NotificationRepository interface
// at interfaces.go:315 — id, then lastError, then nextRetryAt. The earlier
// (id, nextRetryAt, lastError) ordering from the Phase 1 Red seed was wrong
// and is corrected here in Phase 2 Green.
func (m *mockNotifRepo) RecordFailedAttempt(ctx context.Context, id string, lastError string, nextRetryAt time.Time) error {
m.mu.Lock()
defer m.mu.Unlock()
if m.RecordFailedAttemptErr != nil {
return m.RecordFailedAttemptErr
}
if m.UpdateErr != nil {
return m.UpdateErr
}
for _, n := range m.Notifications {
if n.ID == id {
n.RetryCount++
next := nextRetryAt
n.NextRetryAt = &next
le := lastError
n.LastError = &le
n.Status = string(domain.NotificationStatusFailed)
return nil
}
}
return errNotFound
}
// MarkAsDead flips the row into the terminal DLQ state. next_retry_at is
// cleared so the partial retry-sweep index no longer touches this row —
// otherwise RetryFailedNotifications would loop over it forever without
// making any state change.
func (m *mockNotifRepo) MarkAsDead(ctx context.Context, id string, lastError string) error {
m.mu.Lock()
defer m.mu.Unlock()
if m.MarkAsDeadErr != nil {
return m.MarkAsDeadErr
}
if m.UpdateErr != nil {
return m.UpdateErr
}
for _, n := range m.Notifications {
if n.ID == id {
n.Status = string(domain.NotificationStatusDead)
n.NextRetryAt = nil
le := lastError
n.LastError = &le
return nil
}
}
return errNotFound
}
// Requeue is the operator-driven escape hatch from 'dead' back to 'pending'.
// Clears retry bookkeeping entirely so ProcessPendingNotifications treats
// the requeued row as a fresh attempt — identical on the wire to a freshly-
// created notification.
func (m *mockNotifRepo) Requeue(ctx context.Context, id string) error {
m.mu.Lock()
defer m.mu.Unlock()
if m.RequeueErr != nil {
return m.RequeueErr
}
if m.UpdateErr != nil {
return m.UpdateErr
}
for _, n := range m.Notifications {
if n.ID == id {
n.Status = string(domain.NotificationStatusPending)
n.RetryCount = 0
n.NextRetryAt = nil
n.LastError = nil
return nil
}
}
return errNotFound
}
func (m *mockNotifRepo) AddNotification(notif *domain.NotificationEvent) {
m.mu.Lock()
defer m.mu.Unlock()
m.Notifications = append(m.Notifications, notif)
}
// CountByStatus counts in-memory rows whose Status field matches exactly.
// Dedicated error injection via CountByStatusErr so a test can assert the
// StatsService wrap-path ("failed to count dead notifications: …") without
// also tripping ListErr or other shared fields. I-005 Phase 2 Green.
func (m *mockNotifRepo) CountByStatus(ctx context.Context, status string) (int64, error) {
m.mu.Lock()
defer m.mu.Unlock()
if m.CountByStatusErr != nil {
return 0, m.CountByStatusErr
}
var count int64
for _, n := range m.Notifications {
if n.Status == status {
count++
}
}
return count, nil
}
// mockAuditRepo is a test implementation of AuditRepository
type mockAuditRepo struct {
mu sync.Mutex
@@ -635,10 +809,10 @@ type mockAgentRepo struct {
// or RetireAgentWithCascade failure after preflight passed, so the
// service's error surfacing (wrap+return, skip audit, etc.) can be
// exercised without having to stand up a real PG connection.
SoftRetireErr error
SoftRetireErr error
RetireCascadeErr error
CountErr error
ListRetiredErr error
CountErr error
ListRetiredErr error
}
// List mirrors the production repo contract post-I-004: it returns only
@@ -993,8 +1167,8 @@ func newMockTargetRepository() *mockTargetRepo {
// mockIssuerConnector is a test implementation of IssuerConnector
type mockIssuerConnector struct {
Result *IssuanceResult
Err error
Result *IssuanceResult
Err error
getRenewalInfoResult *RenewalInfoResult
getRenewalInfoErr error
// LastOCSPSignRequest captures the last request passed to SignOCSPResponse.