fix nextRun with singleton mode reporting incorrect time (#705)

* fix nextRun with singleton mode reporting incorrect time

* only remove past if >1, sort next scheduled

* update test, remove no longer needed lastScheduledRun
This commit is contained in:
John Roesler 2024-04-05 20:56:22 -05:00 committed by GitHub
parent f021cc4721
commit 3b653b99e4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 95 additions and 9 deletions

14
job.go
View File

@ -24,8 +24,11 @@ type internalJob struct {
name string
tags []string
jobSchedule
lastScheduledRun time.Time
nextScheduled time.Time
// as some jobs may queue up, it's possible to
// have multiple nextScheduled times
nextScheduled []time.Time
lastRun time.Time
function any
parameters []any
@ -894,7 +897,12 @@ func (j job) NextRun() (time.Time, error) {
if ij == nil || ij.id == uuid.Nil {
return time.Time{}, ErrJobNotFound
}
return ij.nextScheduled, nil
if len(ij.nextScheduled) == 0 {
return time.Time{}, nil
}
// the first element is the next scheduled run with subsequent
// runs following after in the slice
return ij.nextScheduled[0], nil
}
func (j job) Tags() []string {

View File

@ -492,3 +492,59 @@ func TestWithEventListeners(t *testing.T) {
})
}
}
func TestJob_NextRun(t *testing.T) {
tests := []struct {
name string
f func()
}{
{
"simple",
func() {},
},
{
"sleep 3 seconds",
func() {
time.Sleep(300 * time.Millisecond)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testTime := time.Now()
s := newTestScheduler(t)
// run a job every 10 milliseconds that starts 10 milliseconds after the current time
j, err := s.NewJob(
DurationJob(
100*time.Millisecond,
),
NewTask(
func() {},
),
WithStartAt(WithStartDateTime(testTime.Add(100*time.Millisecond))),
WithSingletonMode(LimitModeReschedule),
)
require.NoError(t, err)
s.Start()
nextRun, err := j.NextRun()
require.NoError(t, err)
assert.Equal(t, testTime.Add(100*time.Millisecond), nextRun)
time.Sleep(150 * time.Millisecond)
nextRun, err = j.NextRun()
assert.NoError(t, err)
assert.Equal(t, testTime.Add(200*time.Millisecond), nextRun)
assert.Equal(t, 200*time.Millisecond, nextRun.Sub(testTime))
err = s.Shutdown()
require.NoError(t, err)
})
}
}

View File

@ -298,9 +298,18 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
// so we don't need to reschedule it.
return
}
j.lastScheduledRun = j.nextScheduled
var scheduleFrom time.Time
if len(j.nextScheduled) > 0 {
// always grab the last element in the slice as that is the furthest
// out in the future and the time from which we want to calculate
// the subsequent next run time.
slices.SortStableFunc(j.nextScheduled, func(a, b time.Time) int {
return a.Compare(b)
})
scheduleFrom = j.nextScheduled[len(j.nextScheduled)-1]
}
next := j.next(j.lastScheduledRun)
next := j.next(scheduleFrom)
if next.IsZero() {
// the job's next function will return zero for OneTime jobs.
// since they are one time only, they do not need rescheduling.
@ -316,7 +325,7 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
next = j.next(next)
}
}
j.nextScheduled = next
j.nextScheduled = append(j.nextScheduled, next)
j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() {
// set the actual timer on the job here and listen for
// shut down events so that the job doesn't attempt to
@ -340,6 +349,19 @@ func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) {
return
}
// if the job has more than one nextScheduled time,
// we need to remove any that are in the past.
if len(j.nextScheduled) > 1 {
var newNextScheduled []time.Time
for _, t := range j.nextScheduled {
if t.Before(s.now()) {
continue
}
newNextScheduled = append(newNextScheduled, t)
}
j.nextScheduled = newNextScheduled
}
// if the job has a limited number of runs set, we need to
// check how many runs have occurred and stop running this
// job if it has reached the limit.
@ -400,7 +422,7 @@ func (s *scheduler) selectNewJob(in newJobIn) {
}
})
}
j.nextScheduled = next
j.nextScheduled = append(j.nextScheduled, next)
}
s.jobs[j.id] = j
@ -451,7 +473,7 @@ func (s *scheduler) selectStart() {
}
})
}
j.nextScheduled = next
j.nextScheduled = append(j.nextScheduled, next)
s.jobs[id] = j
}
select {

View File

@ -1622,7 +1622,7 @@ func TestScheduler_RunJobNow(t *testing.T) {
WithSingletonMode(LimitModeReschedule),
},
func() time.Duration {
return 20 * time.Second
return 10 * time.Second
},
1,
},