feat(M50): cloud secret manager discovery — AWS SM, Azure KV, GCP SM

Extend certificate discovery from filesystem + network to cloud secret
managers. Three pluggable DiscoverySource connectors feed into the
existing discovery pipeline via sentinel agent pattern, with a 9th
scheduler loop for periodic cloud scanning.

- AWS Secrets Manager: aws-sdk-go-v2, tag/prefix filtering, 10 tests
- Azure Key Vault: stdlib HTTP + OAuth2, base64 DER/PEM, 16 tests
- GCP Secret Manager: stdlib HTTP + JWT OAuth2, label filter, 14 tests
- CloudDiscoveryService orchestrator with 9 tests
- 9th scheduler loop (6h default, atomic.Bool idempotency)
- Discovery page: color-coded source type badges
- 14 new env vars across CloudDiscoveryConfig structs
- Docs: connectors.md, architecture.md, features.md, README updated

49 new tests. All CI checks pass (go vet, race, lint, coverage).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
shankar0123
2026-04-15 23:01:00 -04:00
parent 3f619bcaac
commit e1bcde4cf1
19 changed files with 3791 additions and 24 deletions
+90 -8
View File
@@ -45,18 +45,24 @@ type HealthCheckServicer interface {
RunHealthChecks(ctx context.Context) error
}
// CloudDiscoveryServicer defines the interface for cloud secret manager discovery used by the scheduler.
type CloudDiscoveryServicer interface {
DiscoverAll(ctx context.Context) (int, []error)
}
// Scheduler manages background jobs and periodic tasks for the certificate control plane.
// It runs multiple concurrent loops for renewal checks, job processing, agent health checks,
// and notification processing.
type Scheduler struct {
renewalService RenewalServicer
jobService JobServicer
agentService AgentServicer
notificationService NotificationServicer
networkScanService NetworkScanServicer
digestService DigestServicer
healthCheckService HealthCheckServicer
logger *slog.Logger
renewalService RenewalServicer
jobService JobServicer
agentService AgentServicer
notificationService NotificationServicer
networkScanService NetworkScanServicer
digestService DigestServicer
healthCheckService HealthCheckServicer
cloudDiscoveryService CloudDiscoveryServicer
logger *slog.Logger
// Configurable tick intervals
renewalCheckInterval time.Duration
@@ -67,6 +73,7 @@ type Scheduler struct {
networkScanInterval time.Duration
digestInterval time.Duration
healthCheckInterval time.Duration
cloudDiscoveryInterval time.Duration
// Idempotency guards: prevent duplicate execution of slow jobs
renewalCheckRunning atomic.Bool
@@ -77,6 +84,7 @@ type Scheduler struct {
networkScanRunning atomic.Bool
digestRunning atomic.Bool
healthCheckRunning atomic.Bool
cloudDiscoveryRunning atomic.Bool
// Graceful shutdown: wait for in-flight work to complete
wg sync.WaitGroup
@@ -108,6 +116,7 @@ func NewScheduler(
networkScanInterval: 6 * time.Hour,
digestInterval: 24 * time.Hour,
healthCheckInterval: 60 * time.Second,
cloudDiscoveryInterval: 6 * time.Hour,
}
}
@@ -163,6 +172,17 @@ func (s *Scheduler) SetHealthCheckInterval(d time.Duration) {
s.healthCheckInterval = d
}
// SetCloudDiscoveryService sets the cloud discovery service for the 9th scheduler loop.
// Called after construction since cloud discovery is optional.
func (s *Scheduler) SetCloudDiscoveryService(cds CloudDiscoveryServicer) {
s.cloudDiscoveryService = cds
}
// SetCloudDiscoveryInterval configures the interval for cloud secret manager discovery.
func (s *Scheduler) SetCloudDiscoveryInterval(d time.Duration) {
s.cloudDiscoveryInterval = d
}
// Start initiates all background scheduler loops. It returns a channel that signals
// when the scheduler has started all loops. The scheduler runs until the context is cancelled.
func (s *Scheduler) Start(ctx context.Context) <-chan struct{} {
@@ -183,6 +203,9 @@ func (s *Scheduler) Start(ctx context.Context) <-chan struct{} {
if s.healthCheckService != nil {
loopCount++
}
if s.cloudDiscoveryService != nil {
loopCount++
}
s.wg.Add(loopCount)
go func() { defer s.wg.Done(); s.renewalCheckLoop(ctx) }()
@@ -199,6 +222,9 @@ func (s *Scheduler) Start(ctx context.Context) <-chan struct{} {
if s.healthCheckService != nil {
go func() { defer s.wg.Done(); s.healthCheckLoop(ctx) }()
}
if s.cloudDiscoveryService != nil {
go func() { defer s.wg.Done(); s.cloudDiscoveryLoop(ctx) }()
}
// Signal that all loops are launched
close(startedChan)
@@ -586,6 +612,62 @@ func (s *Scheduler) runHealthCheck(ctx context.Context) {
}
}
// cloudDiscoveryLoop runs every cloudDiscoveryInterval and discovers certificates from cloud secret managers.
// Runs immediately on start, then on each tick. Same idempotency pattern as networkScanLoop.
// Uses atomic.Bool to prevent duplicate execution if the previous scan is still running.
func (s *Scheduler) cloudDiscoveryLoop(ctx context.Context) {
ticker := time.NewTicker(s.cloudDiscoveryInterval)
defer ticker.Stop()
// Run immediately on start (with idempotency guard)
s.cloudDiscoveryRunning.Store(true)
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.cloudDiscoveryRunning.Store(false)
s.runCloudDiscovery(ctx)
}()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !s.cloudDiscoveryRunning.CompareAndSwap(false, true) {
s.logger.Warn("cloud discovery still running, skipping tick")
continue
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.cloudDiscoveryRunning.Store(false)
s.runCloudDiscovery(ctx)
}()
}
}
}
// runCloudDiscovery executes a single cloud discovery cycle with error recovery.
func (s *Scheduler) runCloudDiscovery(ctx context.Context) {
opCtx, cancel := context.WithTimeout(ctx, 30*time.Minute)
defer cancel()
total, errs := s.cloudDiscoveryService.DiscoverAll(opCtx)
if len(errs) > 0 {
s.logger.Error("cloud discovery completed with errors",
"certificates_found", total,
"errors", len(errs),
"interval", s.cloudDiscoveryInterval.String())
for _, err := range errs {
if !errors.Is(err, context.Canceled) {
s.logger.Error("cloud discovery error", "error", err)
}
}
} else {
s.logger.Debug("cloud discovery completed",
"certificates_found", total)
}
}
// WaitForCompletion waits for all in-flight scheduler work to complete.
// It respects the provided timeout and returns an error if work is still in progress after timeout.
// Call this after the scheduler context has been cancelled to ensure graceful shutdown.