[fix] race condition in `NextRun()` when multiple jobs complete simultaneously (#907)

* fix: select execute job on complete

* fix: update next job scheduling

* fix: handle test case

* feat: test next run with multiple jobs

* feat: test next run with concurrent completions

* fix: remove iteration param based on feedback

---------

Co-authored-by: Barkhayot Juraev <barkhayotjuraev@Barkhayots-MacBook-Pro.local>
This commit is contained in:
Barkhayot Juraev 2026-01-29 00:43:58 +09:00 committed by GitHub
parent 51570c3648
commit 93ffeadbf6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 202 additions and 16 deletions

View File

@ -609,14 +609,14 @@ func TestJob_NextRuns(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
jd JobDefinition jd JobDefinition
assertion func(t *testing.T, iteration int, previousRun, nextRun time.Time) assertion func(t *testing.T, previousRun, nextRun time.Time)
}{ }{
{ {
"simple - milliseconds", "simple - milliseconds",
DurationJob( DurationJob(
100 * time.Millisecond, 100 * time.Millisecond,
), ),
func(t *testing.T, _ int, previousRun, nextRun time.Time) { func(t *testing.T, previousRun, nextRun time.Time) {
assert.Equal(t, previousRun.UnixMilli()+100, nextRun.UnixMilli()) assert.Equal(t, previousRun.UnixMilli()+100, nextRun.UnixMilli())
}, },
}, },
@ -629,13 +629,11 @@ func TestJob_NextRuns(t *testing.T) {
NewAtTime(0, 0, 0), NewAtTime(0, 0, 0),
), ),
), ),
func(t *testing.T, iteration int, previousRun, nextRun time.Time) { func(t *testing.T, previousRun, nextRun time.Time) {
// With the fix for NextRun accuracy, the immediate run (Jan 1) is removed
// from nextScheduled after it completes. So all intervals should be 14 days
// (2 weeks as configured).
diff := time.Hour * 14 * 24 diff := time.Hour * 14 * 24
if iteration == 1 {
// because the job is run immediately, the first run is on
// Saturday 1/1/2000. The following run is then on Tuesday 1/11/2000
diff = time.Hour * 10 * 24
}
assert.Equal(t, previousRun.Add(diff).Day(), nextRun.Day()) assert.Equal(t, previousRun.Add(diff).Day(), nextRun.Day())
}, },
}, },
@ -672,7 +670,7 @@ func TestJob_NextRuns(t *testing.T) {
// skipping because there is no previous run // skipping because there is no previous run
continue continue
} }
tt.assertion(t, i, nextRuns[i-1], nextRuns[i]) tt.assertion(t, nextRuns[i-1], nextRuns[i])
} }
assert.NoError(t, s.Shutdown()) assert.NoError(t, s.Shutdown())
@ -1197,3 +1195,191 @@ func TestWithIntervalFromCompletion_FirstRun(t *testing.T) {
assert.Less(t, timeSinceStart.Seconds(), 1.0, assert.Less(t, timeSinceStart.Seconds(), 1.0,
"First run should happen quickly with WithStartImmediately") "First run should happen quickly with WithStartImmediately")
} }
// TestJob_NextRun_MultipleJobsSimultaneously reproduces the bug where several
// jobs finishing at about the same moment left NextRun() reporting a stale
// value because of a race in the nextScheduled cleanup.
//
// Four duration jobs (1-4 minute intervals) are started immediately against a
// fake clock. After their first run each job's NextRun() must equal the start
// time plus its own interval; after the clock is advanced by one minute, job1
// must report start + 2 minutes.
func TestJob_NextRun_MultipleJobsSimultaneously(t *testing.T) {
	start := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
	clock := clockwork.NewFakeClockAt(start)
	s := newTestScheduler(t,
		WithClock(clock),
		WithLocation(time.UTC),
	)

	// Buffered so that all four immediate runs can report completion
	// without blocking on the test goroutine.
	done := make(chan struct{}, 4)
	signalDone := func() {
		done <- struct{}{}
	}

	// One row per job: its name, its interval, and the message shown when the
	// post-first-run NextRun() assertion fails.
	specs := []struct {
		name     string
		interval time.Duration
		failMsg  string
	}{
		{"job1", 1 * time.Minute, "job1 NextRun should be 1 minute from start"},
		{"job2", 2 * time.Minute, "job2 NextRun should be 2 minutes from start"},
		{"job3", 3 * time.Minute, "job3 NextRun should be 3 minutes from start"},
		{"job4", 4 * time.Minute, "job4 NextRun should be 4 minutes from start"},
	}

	jobs := make([]Job, 0, len(specs))
	for _, spec := range specs {
		j, err := s.NewJob(
			DurationJob(spec.interval),
			NewTask(signalDone),
			WithName(spec.name),
			WithStartAt(WithStartImmediately()),
		)
		require.NoError(t, err)
		jobs = append(jobs, j)
	}

	s.Start()

	// Block until every job has finished its immediate run.
	for range specs {
		<-done
	}

	// Leave the scheduler a moment to process the completions and reschedule.
	time.Sleep(50 * time.Millisecond)

	// Each job must now report its upcoming run, not the one that just
	// completed.
	for i, spec := range specs {
		next, err := jobs[i].NextRun()
		require.NoError(t, err)
		assert.Equal(t, start.Add(spec.interval), next, spec.failMsg)
	}

	// Move the fake clock forward far enough to fire job1 a second time.
	clock.Advance(1 * time.Minute)

	// Wait for that second run of job1 to finish.
	<-done
	time.Sleep(50 * time.Millisecond)

	// Having run at +1 minute, job1 should next be due at +2 minutes.
	next, err := jobs[0].NextRun()
	require.NoError(t, err)
	assert.Equal(t, start.Add(2*time.Minute), next, "job1 NextRun should be 2 minutes from start after first interval")

	require.NoError(t, s.Shutdown())
}
// TestJob_NextRun_ConcurrentCompletions verifies that when several jobs
// complete at the same instant, each job's NextRun() is still updated
// correctly.
//
// Four duration jobs (1-4 minute intervals) start immediately against a fake
// clock. Every task signals that it has started and then blocks on a shared
// barrier; closing the barrier releases all of them at once. NextRun() is then
// polled from one goroutine per job and must always equal the start time plus
// that job's interval.
func TestJob_NextRun_ConcurrentCompletions(t *testing.T) {
	testTime := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
	fakeClock := clockwork.NewFakeClockAt(testTime)
	s := newTestScheduler(t,
		WithClock(fakeClock),
		WithLocation(time.UTC), // Set scheduler to use UTC to match our test time
	)

	var wg sync.WaitGroup
	jobCompletionBarrier := make(chan struct{})

	// createJob registers a job whose task reports that it is running and then
	// parks on the barrier, so all tasks can be released together. It is only
	// called from the test goroutine, so require is safe to use here.
	createJob := func(name string, interval time.Duration) Job {
		job, err := s.NewJob(
			DurationJob(interval),
			NewTask(func() {
				wg.Done()
				<-jobCompletionBarrier // Wait until all jobs are ready to complete
			}),
			WithName(name),
			WithStartAt(WithStartImmediately()),
		)
		require.NoError(t, err)
		return job
	}

	wg.Add(4)
	job1 := createJob("concurrent-job1", 1*time.Minute)
	job2 := createJob("concurrent-job2", 2*time.Minute)
	job3 := createJob("concurrent-job3", 3*time.Minute)
	job4 := createJob("concurrent-job4", 4*time.Minute)

	s.Start()

	// Once every task is inside its function, release them simultaneously.
	wg.Wait()
	close(jobCompletionBarrier)

	// Give the scheduler time to process all completions
	time.Sleep(100 * time.Millisecond)

	// Expected NextRun() per job. The fake clock is never advanced in this
	// test, so the expectation is simply start + interval.
	checks := []struct {
		job  Job
		want time.Time
	}{
		{job1, testTime.Add(1 * time.Minute)},
		{job2, testTime.Add(2 * time.Minute)},
		{job3, testTime.Add(3 * time.Minute)},
		{job4, testTime.Add(4 * time.Minute)},
	}

	// Query NextRun() for all jobs concurrently to stress the race condition.
	var testWg sync.WaitGroup
	testWg.Add(len(checks))
	for _, c := range checks {
		// Pass the values as arguments so each goroutine has its own copy
		// regardless of the Go version's loop-variable semantics.
		go func(job Job, want time.Time) {
			defer testWg.Done()
			for i := 0; i < 10; i++ {
				nextRun, err := job.NextRun()
				// require.* aborts through t.FailNow, which must only be
				// called from the goroutine running the test. Use assert and
				// stop this worker on failure instead.
				if !assert.NoError(t, err) {
					return
				}
				assert.Equal(t, want, nextRun)
			}
		}(c.job, c.want)
	}
	testWg.Wait()

	require.NoError(t, s.Shutdown())
}

View File

@ -443,11 +443,11 @@ func (s *scheduler) updateNextScheduled(id uuid.UUID) {
return return
} }
var newNextScheduled []time.Time var newNextScheduled []time.Time
now := s.now()
for _, t := range j.nextScheduled { for _, t := range j.nextScheduled {
if t.Before(s.now()) { if t.After(now) { // Changed to match selectExecJobsOutCompleted
continue newNextScheduled = append(newNextScheduled, t)
} }
newNextScheduled = append(newNextScheduled, t)
} }
j.nextScheduled = newNextScheduled j.nextScheduled = newNextScheduled
s.jobs[id] = j s.jobs[id] = j
@ -460,13 +460,13 @@ func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) {
} }
// if the job has nextScheduled time in the past, // if the job has nextScheduled time in the past,
// we need to remove any that are in the past. // we need to remove any that are in the past or at the current time (just executed).
var newNextScheduled []time.Time var newNextScheduled []time.Time
now := s.now()
for _, t := range j.nextScheduled { for _, t := range j.nextScheduled {
if t.Before(s.now()) { if t.After(now) {
continue newNextScheduled = append(newNextScheduled, t)
} }
newNextScheduled = append(newNextScheduled, t)
} }
j.nextScheduled = newNextScheduled j.nextScheduled = newNextScheduled