diff --git a/job_test.go b/job_test.go
index cfe82b7..33b5b0e 100644
--- a/job_test.go
+++ b/job_test.go
@@ -609,14 +609,14 @@ func TestJob_NextRuns(t *testing.T) {
 	tests := []struct {
 		name      string
 		jd        JobDefinition
-		assertion func(t *testing.T, iteration int, previousRun, nextRun time.Time)
+		assertion func(t *testing.T, previousRun, nextRun time.Time)
 	}{
 		{
 			"simple - milliseconds",
 			DurationJob(
 				100 * time.Millisecond,
 			),
-			func(t *testing.T, _ int, previousRun, nextRun time.Time) {
+			func(t *testing.T, previousRun, nextRun time.Time) {
 				assert.Equal(t, previousRun.UnixMilli()+100, nextRun.UnixMilli())
 			},
 		},
@@ -629,13 +629,11 @@ func TestJob_NextRuns(t *testing.T) {
 					NewAtTime(0, 0, 0),
 				),
 			),
-			func(t *testing.T, iteration int, previousRun, nextRun time.Time) {
+			func(t *testing.T, previousRun, nextRun time.Time) {
+				// With the fix for NextRun accuracy, the immediate run (Jan 1) is removed
+				// from nextScheduled after it completes. So all intervals should be 14 days
+				// (2 weeks as configured).
 				diff := time.Hour * 14 * 24
-				if iteration == 1 {
-					// because the job is run immediately, the first run is on
-					// Saturday 1/1/2000. The following run is then on Tuesday 1/11/2000
-					diff = time.Hour * 10 * 24
-				}
 				assert.Equal(t, previousRun.Add(diff).Day(), nextRun.Day())
 			},
 		},
@@ -672,7 +670,7 @@ func TestJob_NextRuns(t *testing.T) {
 					// skipping because there is no previous run
 					continue
 				}
-				tt.assertion(t, i, nextRuns[i-1], nextRuns[i])
+				tt.assertion(t, nextRuns[i-1], nextRuns[i])
 			}
 
 			assert.NoError(t, s.Shutdown())
@@ -1197,3 +1195,180 @@ func TestWithIntervalFromCompletion_FirstRun(t *testing.T) {
 	assert.Less(t, timeSinceStart.Seconds(), 1.0,
 		"First run should happen quickly with WithStartImmediately")
 }
+
+func TestJob_NextRun_MultipleJobsSimultaneously(t *testing.T) {
+	// This test reproduces the bug where multiple jobs completing simultaneously
+	// would cause NextRun() to return stale values due to race conditions in
+	// nextScheduled cleanup.
+
+	testTime := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
+	fakeClock := clockwork.NewFakeClockAt(testTime)
+
+	s := newTestScheduler(t,
+		WithClock(fakeClock),
+		WithLocation(time.UTC),
+	)
+
+	jobsCompleted := make(chan struct{}, 4)
+
+	// Create multiple jobs with different intervals that will complete around the same time
+	job1, err := s.NewJob(
+		DurationJob(1*time.Minute),
+		NewTask(func() {
+			jobsCompleted <- struct{}{}
+		}),
+		WithName("job1"),
+		WithStartAt(WithStartImmediately()),
+	)
+	require.NoError(t, err)
+
+	job2, err := s.NewJob(
+		DurationJob(2*time.Minute),
+		NewTask(func() {
+			jobsCompleted <- struct{}{}
+		}),
+		WithName("job2"),
+		WithStartAt(WithStartImmediately()),
+	)
+	require.NoError(t, err)
+
+	job3, err := s.NewJob(
+		DurationJob(3*time.Minute),
+		NewTask(func() {
+			jobsCompleted <- struct{}{}
+		}),
+		WithName("job3"),
+		WithStartAt(WithStartImmediately()),
+	)
+	require.NoError(t, err)
+
+	job4, err := s.NewJob(
+		DurationJob(4*time.Minute),
+		NewTask(func() {
+			jobsCompleted <- struct{}{}
+		}),
+		WithName("job4"),
+		WithStartAt(WithStartImmediately()),
+	)
+	require.NoError(t, err)
+
+	s.Start()
+
+	// Wait for all 4 jobs to complete their immediate run
+	for i := 0; i < 4; i++ {
+		<-jobsCompleted
+	}
+
+	// Give the scheduler time to process the completions and reschedule
+	time.Sleep(50 * time.Millisecond)
+
+	// Verify that NextRun() returns the correct next scheduled time for each job
+	// and not a stale value from the just-completed run
+
+	nextRun1, err := job1.NextRun()
+	require.NoError(t, err)
+	assert.Equal(t, testTime.Add(1*time.Minute), nextRun1, "job1 NextRun should be 1 minute from start")
+
+	nextRun2, err := job2.NextRun()
+	require.NoError(t, err)
+	assert.Equal(t, testTime.Add(2*time.Minute), nextRun2, "job2 NextRun should be 2 minutes from start")
+
+	nextRun3, err := job3.NextRun()
+	require.NoError(t, err)
+	assert.Equal(t, testTime.Add(3*time.Minute), nextRun3, "job3 NextRun should be 3 minutes from start")
+
+	nextRun4, err := job4.NextRun()
+	require.NoError(t, err)
+	assert.Equal(t, testTime.Add(4*time.Minute), nextRun4, "job4 NextRun should be 4 minutes from start")
+
+	// Advance time to trigger job1's next run
+	fakeClock.Advance(1 * time.Minute)
+
+	// Wait for job1 to complete
+	<-jobsCompleted
+	time.Sleep(50 * time.Millisecond)
+
+	// After job1's second run, it should be scheduled for +2 minutes from start
+	nextRun1, err = job1.NextRun()
+	require.NoError(t, err)
+	assert.Equal(t, testTime.Add(2*time.Minute), nextRun1, "job1 NextRun should be 2 minutes from start after first interval")
+
+	require.NoError(t, s.Shutdown())
+}
+
+func TestJob_NextRun_ConcurrentCompletions(t *testing.T) {
+	// This test verifies that when multiple jobs complete at exactly the same time,
+	// their NextRun() values are correctly updated without race conditions.
+
+	testTime := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
+	fakeClock := clockwork.NewFakeClockAt(testTime)
+
+	s := newTestScheduler(t,
+		WithClock(fakeClock),
+		WithLocation(time.UTC), // Set scheduler to use UTC to match our test time
+	)
+
+	var wg sync.WaitGroup
+	jobCompletionBarrier := make(chan struct{})
+
+	// Create jobs that will all complete at the same instant
+	createJob := func(name string, interval time.Duration) Job {
+		job, err := s.NewJob(
+			DurationJob(interval),
+			NewTask(func() {
+				wg.Done()
+				<-jobCompletionBarrier // Wait until all jobs are ready to complete
+			}),
+			WithName(name),
+			WithStartAt(WithStartImmediately()),
+		)
+		require.NoError(t, err)
+		return job
+	}
+
+	wg.Add(4)
+	job1 := createJob("concurrent-job1", 1*time.Minute)
+	job2 := createJob("concurrent-job2", 2*time.Minute)
+	job3 := createJob("concurrent-job3", 3*time.Minute)
+	job4 := createJob("concurrent-job4", 4*time.Minute)
+
+	s.Start()
+
+	wg.Wait()
+	close(jobCompletionBarrier)
+
+	// Give the scheduler time to process all completions
+	time.Sleep(100 * time.Millisecond)
+
+	// Verify NextRun() for all jobs concurrently to stress test the race condition.
+	// assert (not require) is used inside the goroutines because FailNow must
+	// only be called from the goroutine running the test.
+	expectations := []struct {
+		job      Job
+		interval time.Duration
+	}{
+		{job1, 1 * time.Minute},
+		{job2, 2 * time.Minute},
+		{job3, 3 * time.Minute},
+		{job4, 4 * time.Minute},
+	}
+
+	var testWg sync.WaitGroup
+	testWg.Add(len(expectations))
+
+	for _, e := range expectations {
+		go func(j Job, want time.Time) {
+			defer testWg.Done()
+			for i := 0; i < 10; i++ {
+				nextRun, err := j.NextRun()
+				if !assert.NoError(t, err) {
+					return
+				}
+				assert.Equal(t, want, nextRun)
+			}
+		}(e.job, testTime.Add(e.interval))
+	}
+
+	testWg.Wait()
+	require.NoError(t, s.Shutdown())
+}
diff --git a/scheduler.go b/scheduler.go
index a35c42c..b0538b3 100644
--- a/scheduler.go
+++ b/scheduler.go
@@ -443,11 +443,11 @@ func (s *scheduler) updateNextScheduled(id uuid.UUID) {
 		return
 	}
 	var newNextScheduled []time.Time
+	now := s.now()
 	for _, t := range j.nextScheduled {
-		if t.Before(s.now()) {
-			continue
+		if t.After(now) { // keep only strictly-future times, matching selectExecJobsOutCompleted
+			newNextScheduled = append(newNextScheduled, t)
 		}
-		newNextScheduled = append(newNextScheduled, t)
 	}
 	j.nextScheduled = newNextScheduled
 	s.jobs[id] = j
@@ -460,13 +460,13 @@ func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) {
 	}
 
 	// if the job has nextScheduled time in the past,
-	// we need to remove any that are in the past.
+	// we need to remove any that are in the past or at the current time (just executed).
 	var newNextScheduled []time.Time
+	now := s.now()
 	for _, t := range j.nextScheduled {
-		if t.Before(s.now()) {
-			continue
+		if t.After(now) {
+			newNextScheduled = append(newNextScheduled, t)
 		}
-		newNextScheduled = append(newNextScheduled, t)
 	}
 	j.nextScheduled = newNextScheduled
 