From d150570a111607fd4a3e779506493ad723c8b8a1 Mon Sep 17 00:00:00 2001 From: Shankar Date: Fri, 27 Mar 2026 23:31:52 -0400 Subject: [PATCH] =?UTF-8?q?fix:=20scheduler=20race=20=E2=80=94=20track=20l?= =?UTF-8?q?oop=20goroutines=20in=20WaitGroup?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause: WaitForCompletion only waited for work goroutines (wg), but the 5-6 loop goroutines (renewalCheckLoop, jobProcessorLoop, etc.) were not tracked. After cancel() + WaitForCompletion(), loop goroutines could still be alive accessing scheduler/mock fields when the next test started, triggering the race detector. Fix: - Start() now adds loop goroutines to wg, so WaitForCompletion blocks until both work items AND loops have fully exited - Removed untracked 100ms timer goroutine for startedChan — now closed immediately after launching loops - Timeout test updated: uses blockCh (ignores context) instead of slowDelay (respects context) so it reliably triggers the timeout path Co-Authored-By: Claude Opus 4.6 --- internal/scheduler/scheduler.go | 30 +++++++++++++++------------ internal/scheduler/scheduler_test.go | 31 ++++++++++++++-------------- 2 files changed, 33 insertions(+), 28 deletions(-) diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 6db1211..c9f800d 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -126,21 +126,25 @@ func (s *Scheduler) Start(ctx context.Context) <-chan struct{} { go func() { s.logger.Info("scheduler starting") - // Signal that the scheduler has started all loops - go func() { - <-time.After(100 * time.Millisecond) - close(startedChan) - }() - - // Start all scheduler loops concurrently - go s.renewalCheckLoop(ctx) - go s.jobProcessorLoop(ctx) - go s.agentHealthCheckLoop(ctx) - go s.notificationProcessLoop(ctx) - go s.shortLivedExpiryCheckLoop(ctx) + // Track all loop goroutines in the WaitGroup so WaitForCompletion + // blocks until they've fully exited (prevents test races). + loopCount := 5 if s.networkScanService != nil { - go s.networkScanLoop(ctx) + loopCount = 6 } + s.wg.Add(loopCount) + + go func() { defer s.wg.Done(); s.renewalCheckLoop(ctx) }() + go func() { defer s.wg.Done(); s.jobProcessorLoop(ctx) }() + go func() { defer s.wg.Done(); s.agentHealthCheckLoop(ctx) }() + go func() { defer s.wg.Done(); s.notificationProcessLoop(ctx) }() + go func() { defer s.wg.Done(); s.shortLivedExpiryCheckLoop(ctx) }() + if s.networkScanService != nil { + go func() { defer s.wg.Done(); s.networkScanLoop(ctx) }() + } + + // Signal that all loops are launched + close(startedChan) // Wait for context cancellation <-ctx.Done() diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index 30166ab..043e78d 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -16,14 +16,22 @@ type mockRenewalService struct { callTimes []time.Time slowDelay time.Duration shouldError bool + blockCh chan struct{} // if non-nil, blocks until closed (ignores context) } func (m *mockRenewalService) CheckExpiringCertificates(ctx context.Context) error { m.mu.Lock() m.callCount++ m.callTimes = append(m.callTimes, time.Now()) + blockCh := m.blockCh m.mu.Unlock() + // If blockCh is set, block until it's closed (ignores context — for timeout tests) + if blockCh != nil { + <-blockCh + return nil + } + if m.slowDelay > 0 { select { case <-time.After(m.slowDelay): @@ -274,8 +282,9 @@ func TestWaitForCompletionTimeout(t *testing.T) { // Use a channel-blocked mock that ignores context cancellation, // ensuring work is still in-flight when WaitForCompletion is called. blockCh := make(chan struct{}) - renewalMock := &mockRenewalService{} - renewalMock.slowDelay = 0 // We override behavior below + renewalMock := &mockRenewalService{ + blockCh: blockCh, // blocks until closed, ignores ctx + } jobMock := &mockJobService{} agentMock := &mockAgentService{} @@ -290,31 +299,23 @@ func TestWaitForCompletionTimeout(t *testing.T) { defer cancel() defer close(blockCh) // Unblock the mock after test completes - // Override the renewal mock to block on a channel (ignores context cancel) - renewalMock.slowDelay = 30 * time.Second // Long enough to outlast the test - // Start scheduler startedChan := sched.Start(ctx) <-startedChan - // Let it run briefly so a job starts - time.Sleep(150 * time.Millisecond) + // Let it run briefly so the initial job starts and blocks + time.Sleep(50 * time.Millisecond) - // Stop scheduler — but the in-flight job won't finish (blocked) + // Stop scheduler — but the in-flight work goroutine won't finish (blocked on channel) cancel() - // Wait with very short timeout (much shorter than the blocked job) + // Wait with very short timeout (work is stuck on blockCh) start := time.Now() err := sched.WaitForCompletion(200 * time.Millisecond) elapsed := time.Since(start) - if err == nil { - t.Logf("WaitForCompletion completed in %v (job may have been cancelled by context)", elapsed) - t.Skip("flaky: job completed before timeout — context cancellation propagated faster than expected") - } - if err != ErrSchedulerShutdownTimeout { - t.Fatalf("expected ErrSchedulerShutdownTimeout, got %v", err) + t.Fatalf("expected ErrSchedulerShutdownTimeout, got %v (elapsed: %v)", err, elapsed) } t.Logf("WaitForCompletion correctly timed out after %v", elapsed)