issue-740: expand oneTimeJob to support multiple times (#741)

This commit is contained in:
Rodrigo Broggi 2024-06-21 23:51:42 +02:00 committed by GitHub
parent 7c391d4326
commit 64d6e48dae
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 256 additions and 15 deletions

View File

@ -354,6 +354,17 @@ func ExampleOneTimeJob() {
func() {}, func() {},
), ),
) )
// run job twice - once in 10 seconds and once in 55 minutes
n := time.Now()
_, _ = s.NewJob(
OneTimeJob(
OneTimeJobStartDateTimes(
n.Add(10*time.Second),
n.Add(55*time.Minute),
),
),
NewTask(func() {}),
)
s.Start() s.Start()
} }

76
job.go
View File

@ -6,6 +6,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"math/rand" "math/rand"
"sort"
"strings" "strings"
"time" "time"
@ -446,35 +447,47 @@ type oneTimeJobDefinition struct {
} }
func (o oneTimeJobDefinition) setup(j *internalJob, _ *time.Location, now time.Time) error { func (o oneTimeJobDefinition) setup(j *internalJob, _ *time.Location, now time.Time) error {
j.jobSchedule = oneTimeJob{} sortedTimes := o.startAt(j)
if err := o.startAt(j); err != nil { sort.Slice(sortedTimes, func(i, j int) bool {
return err return sortedTimes[i].Before(sortedTimes[j])
})
// keep only schedules that are in the future
idx, found := slices.BinarySearchFunc(sortedTimes, now, timeCmp())
if found {
idx++
} }
// in case we are not in the `startImmediately` case, our start-date must be in sortedTimes = sortedTimes[idx:]
// the future according to the scheduler clock if !j.startImmediately && len(sortedTimes) == 0 {
if !j.startImmediately && (j.startTime.IsZero() || j.startTime.Before(now)) {
return ErrOneTimeJobStartDateTimePast return ErrOneTimeJobStartDateTimePast
} }
j.jobSchedule = oneTimeJob{sortedTimes: sortedTimes}
return nil return nil
} }
// OneTimeJobStartAtOption defines when the one time job is run // OneTimeJobStartAtOption defines when the one time job is run
type OneTimeJobStartAtOption func(*internalJob) error type OneTimeJobStartAtOption func(*internalJob) []time.Time
// OneTimeJobStartImmediately tells the scheduler to run the one time job immediately. // OneTimeJobStartImmediately tells the scheduler to run the one time job immediately.
func OneTimeJobStartImmediately() OneTimeJobStartAtOption { func OneTimeJobStartImmediately() OneTimeJobStartAtOption {
return func(j *internalJob) error { return func(j *internalJob) []time.Time {
j.startImmediately = true j.startImmediately = true
return nil return []time.Time{}
} }
} }
// OneTimeJobStartDateTime sets the date & time at which the job should run. // OneTimeJobStartDateTime sets the date & time at which the job should run.
// This datetime must be in the future (according to the scheduler clock). // This datetime must be in the future (according to the scheduler clock).
func OneTimeJobStartDateTime(start time.Time) OneTimeJobStartAtOption { func OneTimeJobStartDateTime(start time.Time) OneTimeJobStartAtOption {
return func(j *internalJob) error { return func(j *internalJob) []time.Time {
j.startTime = start return []time.Time{start}
return nil }
}
// OneTimeJobStartDateTimes sets the date & times at which the job should run.
// At least one of the date/times must be in the future (according to the scheduler clock).
func OneTimeJobStartDateTimes(times ...time.Time) OneTimeJobStartAtOption {
return func(j *internalJob) []time.Time {
return times
} }
} }
@ -486,6 +499,18 @@ func OneTimeJob(startAt OneTimeJobStartAtOption) JobDefinition {
} }
} }
func timeCmp() func(element time.Time, target time.Time) int {
return func(element time.Time, target time.Time) int {
if element.Equal(target) {
return 0
}
if element.Before(target) {
return -1
}
return 1
}
}
// ----------------------------------------------- // -----------------------------------------------
// ----------------------------------------------- // -----------------------------------------------
// ----------------- Job Options ----------------- // ----------------- Job Options -----------------
@ -876,12 +901,35 @@ func (m monthlyJob) nextMonthDayAtTime(lastRun time.Time, days []int, firstPass
var _ jobSchedule = (*oneTimeJob)(nil) var _ jobSchedule = (*oneTimeJob)(nil)
type oneTimeJob struct{} type oneTimeJob struct {
sortedTimes []time.Time
}
func (o oneTimeJob) next(_ time.Time) time.Time { // next finds the next item in a sorted list of times using binary-search.
//
// example: sortedTimes: [2, 4, 6, 8]
//
// lastRun: 1 => [idx=0,found=false] => next is 2 - sorted[idx] idx=0
// lastRun: 2 => [idx=0,found=true] => next is 4 - sorted[idx+1] idx=1
// lastRun: 3 => [idx=1,found=false] => next is 4 - sorted[idx] idx=1
// lastRun: 4 => [idx=1,found=true] => next is 6 - sorted[idx+1] idx=2
// lastRun: 7 => [idx=3,found=false] => next is 8 - sorted[idx] idx=3
// lastRun: 8 => [idx=3,found=found] => next is none
// lastRun: 9 => [idx=3,found=found] => next is none
func (o oneTimeJob) next(lastRun time.Time) time.Time {
idx, found := slices.BinarySearchFunc(o.sortedTimes, lastRun, timeCmp())
// if found, the next run is the following index
if found {
idx++
}
// exhausted runs
if idx >= len(o.sortedTimes) {
return time.Time{} return time.Time{}
} }
return o.sortedTimes[idx]
}
// ----------------------------------------------- // -----------------------------------------------
// ----------------------------------------------- // -----------------------------------------------
// ---------------- Job Interface ---------------- // ---------------- Job Interface ----------------

View File

@ -2089,6 +2089,188 @@ func TestScheduler_OneTimeJob(t *testing.T) {
} }
} }
func TestScheduler_AtTimesJob(t *testing.T) {
defer verifyNoGoroutineLeaks(t)
n := time.Now().UTC()
tests := []struct {
name string
atTimes []time.Time
fakeClock clockwork.FakeClock
assertErr require.ErrorAssertionFunc
// asserts things about schedules, advance time and perform new assertions
advanceAndAsserts []func(
t *testing.T,
j Job,
clock clockwork.FakeClock,
runs *atomic.Uint32,
)
}{
{
name: "no at times",
atTimes: []time.Time{},
fakeClock: clockwork.NewFakeClock(),
assertErr: func(t require.TestingT, err error, i ...interface{}) {
require.ErrorIs(t, err, ErrOneTimeJobStartDateTimePast)
},
},
{
name: "all in the past",
atTimes: []time.Time{n.Add(-1 * time.Second)},
fakeClock: clockwork.NewFakeClockAt(n),
assertErr: func(t require.TestingT, err error, i ...interface{}) {
require.ErrorIs(t, err, ErrOneTimeJobStartDateTimePast)
},
},
{
name: "one run 1 millisecond in the future",
atTimes: []time.Time{n.Add(1 * time.Millisecond)},
fakeClock: clockwork.NewFakeClockAt(n),
advanceAndAsserts: []func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32){
func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) {
require.Equal(t, uint32(0), runs.Load())
// last not initialized
lastRunAt, err := j.LastRun()
require.NoError(t, err)
require.Equal(t, time.Time{}, lastRunAt)
// next is now
nextRunAt, err := j.NextRun()
require.NoError(t, err)
require.Equal(t, n.Add(1*time.Millisecond), nextRunAt)
// advance and eventually run
clock.Advance(2 * time.Millisecond)
require.Eventually(t, func() bool {
return assert.Equal(t, uint32(1), runs.Load())
}, 3*time.Second, 100*time.Millisecond)
// last was run
lastRunAt, err = j.LastRun()
require.NoError(t, err)
require.WithinDuration(t, n.Add(1*time.Millisecond), lastRunAt, 1*time.Millisecond)
nextRunAt, err = j.NextRun()
require.NoError(t, err)
require.Equal(t, time.Time{}, nextRunAt)
},
},
},
{
name: "one run in the past and one in the future",
atTimes: []time.Time{n.Add(-1 * time.Millisecond), n.Add(1 * time.Millisecond)},
fakeClock: clockwork.NewFakeClockAt(n),
advanceAndAsserts: []func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32){
func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) {
require.Equal(t, uint32(0), runs.Load())
// last not initialized
lastRunAt, err := j.LastRun()
require.NoError(t, err)
require.Equal(t, time.Time{}, lastRunAt)
// next is now
nextRunAt, err := j.NextRun()
require.NoError(t, err)
require.Equal(t, n.Add(1*time.Millisecond), nextRunAt)
// advance and eventually run
clock.Advance(2 * time.Millisecond)
require.Eventually(t, func() bool {
return assert.Equal(t, uint32(1), runs.Load())
}, 3*time.Second, 100*time.Millisecond)
// last was run
lastRunAt, err = j.LastRun()
require.NoError(t, err)
require.WithinDuration(t, n.Add(1*time.Millisecond), lastRunAt, 1*time.Millisecond)
},
},
},
{
name: "two runs in the future",
atTimes: []time.Time{n.Add(1 * time.Millisecond), n.Add(3 * time.Millisecond)},
fakeClock: clockwork.NewFakeClockAt(n),
advanceAndAsserts: []func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32){
func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) {
require.Equal(t, uint32(0), runs.Load())
// last not initialized
lastRunAt, err := j.LastRun()
require.NoError(t, err)
require.Equal(t, time.Time{}, lastRunAt)
// next is now
nextRunAt, err := j.NextRun()
require.NoError(t, err)
require.Equal(t, n.Add(1*time.Millisecond), nextRunAt)
// advance and eventually run
clock.Advance(2 * time.Millisecond)
require.Eventually(t, func() bool {
return assert.Equal(t, uint32(1), runs.Load())
}, 3*time.Second, 100*time.Millisecond)
// last was run
lastRunAt, err = j.LastRun()
require.NoError(t, err)
require.WithinDuration(t, n.Add(1*time.Millisecond), lastRunAt, 1*time.Millisecond)
nextRunAt, err = j.NextRun()
require.NoError(t, err)
require.Equal(t, n.Add(3*time.Millisecond), nextRunAt)
},
func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) {
// advance and eventually run
clock.Advance(2 * time.Millisecond)
require.Eventually(t, func() bool {
return assert.Equal(t, uint32(2), runs.Load())
}, 3*time.Second, 100*time.Millisecond)
// last was run
lastRunAt, err := j.LastRun()
require.NoError(t, err)
require.WithinDuration(t, n.Add(3*time.Millisecond), lastRunAt, 1*time.Millisecond)
nextRunAt, err := j.NextRun()
require.NoError(t, err)
require.Equal(t, time.Time{}, nextRunAt)
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := newTestScheduler(t, WithClock(tt.fakeClock))
t.Cleanup(func() {
require.NoError(t, s.Shutdown())
})
runs := atomic.Uint32{}
j, err := s.NewJob(
OneTimeJob(OneTimeJobStartDateTimes(tt.atTimes...)),
NewTask(func() {
runs.Add(1)
}),
)
if tt.assertErr != nil {
tt.assertErr(t, err)
} else {
require.NoError(t, err)
s.Start()
for _, advanceAndAssert := range tt.advanceAndAsserts {
advanceAndAssert(t, j, tt.fakeClock, &runs)
}
}
})
}
}
func TestScheduler_WithLimitedRuns(t *testing.T) { func TestScheduler_WithLimitedRuns(t *testing.T) {
defer verifyNoGoroutineLeaks(t) defer verifyNoGoroutineLeaks(t)