mirror of
https://github.com/shankar0123/certctl.git
synced 2026-06-13 02:09:24 +00:00
fix: scheduler race — track loop goroutines in WaitGroup
Root cause: WaitForCompletion only waited for work goroutines (wg), but the 5-6 loop goroutines (renewalCheckLoop, jobProcessorLoop, etc.) were not tracked. After cancel() + WaitForCompletion(), loop goroutines could still be alive accessing scheduler/mock fields when the next test started, triggering the race detector. Fix: - Start() now adds loop goroutines to wg, so WaitForCompletion blocks until both work items AND loops have fully exited - Removed untracked 100ms timer goroutine for startedChan — now closed immediately after launching loops - Timeout test updated: uses blockCh (ignores context) instead of slowDelay (respects context) so it reliably triggers the timeout path Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -126,21 +126,25 @@ func (s *Scheduler) Start(ctx context.Context) <-chan struct{} {
|
|||||||
go func() {
|
go func() {
|
||||||
s.logger.Info("scheduler starting")
|
s.logger.Info("scheduler starting")
|
||||||
|
|
||||||
// Signal that the scheduler has started all loops
|
// Track all loop goroutines in the WaitGroup so WaitForCompletion
|
||||||
go func() {
|
// blocks until they've fully exited (prevents test races).
|
||||||
<-time.After(100 * time.Millisecond)
|
loopCount := 5
|
||||||
close(startedChan)
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Start all scheduler loops concurrently
|
|
||||||
go s.renewalCheckLoop(ctx)
|
|
||||||
go s.jobProcessorLoop(ctx)
|
|
||||||
go s.agentHealthCheckLoop(ctx)
|
|
||||||
go s.notificationProcessLoop(ctx)
|
|
||||||
go s.shortLivedExpiryCheckLoop(ctx)
|
|
||||||
if s.networkScanService != nil {
|
if s.networkScanService != nil {
|
||||||
go s.networkScanLoop(ctx)
|
loopCount = 6
|
||||||
}
|
}
|
||||||
|
s.wg.Add(loopCount)
|
||||||
|
|
||||||
|
go func() { defer s.wg.Done(); s.renewalCheckLoop(ctx) }()
|
||||||
|
go func() { defer s.wg.Done(); s.jobProcessorLoop(ctx) }()
|
||||||
|
go func() { defer s.wg.Done(); s.agentHealthCheckLoop(ctx) }()
|
||||||
|
go func() { defer s.wg.Done(); s.notificationProcessLoop(ctx) }()
|
||||||
|
go func() { defer s.wg.Done(); s.shortLivedExpiryCheckLoop(ctx) }()
|
||||||
|
if s.networkScanService != nil {
|
||||||
|
go func() { defer s.wg.Done(); s.networkScanLoop(ctx) }()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Signal that all loops are launched
|
||||||
|
close(startedChan)
|
||||||
|
|
||||||
// Wait for context cancellation
|
// Wait for context cancellation
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
|
|||||||
@@ -16,14 +16,22 @@ type mockRenewalService struct {
|
|||||||
callTimes []time.Time
|
callTimes []time.Time
|
||||||
slowDelay time.Duration
|
slowDelay time.Duration
|
||||||
shouldError bool
|
shouldError bool
|
||||||
|
blockCh chan struct{} // if non-nil, blocks until closed (ignores context)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockRenewalService) CheckExpiringCertificates(ctx context.Context) error {
|
func (m *mockRenewalService) CheckExpiringCertificates(ctx context.Context) error {
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
m.callCount++
|
m.callCount++
|
||||||
m.callTimes = append(m.callTimes, time.Now())
|
m.callTimes = append(m.callTimes, time.Now())
|
||||||
|
blockCh := m.blockCh
|
||||||
m.mu.Unlock()
|
m.mu.Unlock()
|
||||||
|
|
||||||
|
// If blockCh is set, block until it's closed (ignores context — for timeout tests)
|
||||||
|
if blockCh != nil {
|
||||||
|
<-blockCh
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
if m.slowDelay > 0 {
|
if m.slowDelay > 0 {
|
||||||
select {
|
select {
|
||||||
case <-time.After(m.slowDelay):
|
case <-time.After(m.slowDelay):
|
||||||
@@ -274,8 +282,9 @@ func TestWaitForCompletionTimeout(t *testing.T) {
|
|||||||
// Use a channel-blocked mock that ignores context cancellation,
|
// Use a channel-blocked mock that ignores context cancellation,
|
||||||
// ensuring work is still in-flight when WaitForCompletion is called.
|
// ensuring work is still in-flight when WaitForCompletion is called.
|
||||||
blockCh := make(chan struct{})
|
blockCh := make(chan struct{})
|
||||||
renewalMock := &mockRenewalService{}
|
renewalMock := &mockRenewalService{
|
||||||
renewalMock.slowDelay = 0 // We override behavior below
|
blockCh: blockCh, // blocks until closed, ignores ctx
|
||||||
|
}
|
||||||
|
|
||||||
jobMock := &mockJobService{}
|
jobMock := &mockJobService{}
|
||||||
agentMock := &mockAgentService{}
|
agentMock := &mockAgentService{}
|
||||||
@@ -290,31 +299,23 @@ func TestWaitForCompletionTimeout(t *testing.T) {
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
defer close(blockCh) // Unblock the mock after test completes
|
defer close(blockCh) // Unblock the mock after test completes
|
||||||
|
|
||||||
// Override the renewal mock to block on a channel (ignores context cancel)
|
|
||||||
renewalMock.slowDelay = 30 * time.Second // Long enough to outlast the test
|
|
||||||
|
|
||||||
// Start scheduler
|
// Start scheduler
|
||||||
startedChan := sched.Start(ctx)
|
startedChan := sched.Start(ctx)
|
||||||
<-startedChan
|
<-startedChan
|
||||||
|
|
||||||
// Let it run briefly so a job starts
|
// Let it run briefly so the initial job starts and blocks
|
||||||
time.Sleep(150 * time.Millisecond)
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
|
||||||
// Stop scheduler — but the in-flight job won't finish (blocked)
|
// Stop scheduler — but the in-flight work goroutine won't finish (blocked on channel)
|
||||||
cancel()
|
cancel()
|
||||||
|
|
||||||
// Wait with very short timeout (much shorter than the blocked job)
|
// Wait with very short timeout (work is stuck on blockCh)
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
err := sched.WaitForCompletion(200 * time.Millisecond)
|
err := sched.WaitForCompletion(200 * time.Millisecond)
|
||||||
elapsed := time.Since(start)
|
elapsed := time.Since(start)
|
||||||
|
|
||||||
if err == nil {
|
|
||||||
t.Logf("WaitForCompletion completed in %v (job may have been cancelled by context)", elapsed)
|
|
||||||
t.Skip("flaky: job completed before timeout — context cancellation propagated faster than expected")
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != ErrSchedulerShutdownTimeout {
|
if err != ErrSchedulerShutdownTimeout {
|
||||||
t.Fatalf("expected ErrSchedulerShutdownTimeout, got %v", err)
|
t.Fatalf("expected ErrSchedulerShutdownTimeout, got %v (elapsed: %v)", err, elapsed)
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Logf("WaitForCompletion correctly timed out after %v", elapsed)
|
t.Logf("WaitForCompletion correctly timed out after %v", elapsed)
|
||||||
|
|||||||
Reference in New Issue
Block a user