mirror of https://github.com/go-co-op/gocron.git
issue-654: allow setting a stopTime for job. (#760)
This commit is contained in:
parent
256265f9a2
commit
3b2dcd869b
|
|
@ -45,6 +45,9 @@ var (
|
|||
ErrWithMonitorNil = fmt.Errorf("gocron: WithMonitor: monitor must not be nil")
|
||||
ErrWithNameEmpty = fmt.Errorf("gocron: WithName: name must not be empty")
|
||||
ErrWithStartDateTimePast = fmt.Errorf("gocron: WithStartDateTime: start must not be in the past")
|
||||
ErrWithStopDateTimePast = fmt.Errorf("gocron: WithStopDateTime: end must not be in the past")
|
||||
ErrStartTimeLaterThanEndTime = fmt.Errorf("gocron: WithStartDateTime: start must not be later than end")
|
||||
ErrStopTimeEarlierThanStartTime = fmt.Errorf("gocron: WithStopDateTime: end must not be earlier than start")
|
||||
ErrWithStopTimeoutZeroOrNegative = fmt.Errorf("gocron: WithStopTimeout: timeout must be greater than 0")
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -358,6 +358,10 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
|
|||
default:
|
||||
}
|
||||
|
||||
if j.stopTimeReached(e.clock.Now()) {
|
||||
return
|
||||
}
|
||||
|
||||
if e.elector != nil {
|
||||
if err := e.elector.IsLeader(j.ctx); err != nil {
|
||||
e.sendOutForRescheduling(&jIn)
|
||||
|
|
|
|||
38
job.go
38
job.go
|
|
@ -38,6 +38,7 @@ type internalJob struct {
|
|||
limitRunsTo *limitRunsTo
|
||||
startTime time.Time
|
||||
startImmediately bool
|
||||
stopTime time.Time
|
||||
// event listeners
|
||||
afterJobRuns func(jobID uuid.UUID, jobName string)
|
||||
beforeJobRuns func(jobID uuid.UUID, jobName string)
|
||||
|
|
@ -60,6 +61,13 @@ func (j *internalJob) stop() {
|
|||
j.cancel()
|
||||
}
|
||||
|
||||
func (j *internalJob) stopTimeReached(now time.Time) bool {
|
||||
if j.stopTime.IsZero() {
|
||||
return false
|
||||
}
|
||||
return j.stopTime.Before(now)
|
||||
}
|
||||
|
||||
// task stores the function and parameters
|
||||
// that are actually run when the job is executed.
|
||||
type task struct {
|
||||
|
|
@ -594,11 +602,41 @@ func WithStartDateTime(start time.Time) StartAtOption {
|
|||
if start.IsZero() || start.Before(now) {
|
||||
return ErrWithStartDateTimePast
|
||||
}
|
||||
if !j.stopTime.IsZero() && j.stopTime.Before(start) {
|
||||
return ErrStartTimeLaterThanEndTime
|
||||
}
|
||||
j.startTime = start
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithStopAt sets the option for stopping the job from running
|
||||
// after the specified time.
|
||||
func WithStopAt(option StopAtOption) JobOption {
|
||||
return func(j *internalJob, now time.Time) error {
|
||||
return option(j, now)
|
||||
}
|
||||
}
|
||||
|
||||
// StopAtOption defines options for stopping the job
|
||||
type StopAtOption func(*internalJob, time.Time) error
|
||||
|
||||
// WithStopDateTime sets the final date & time after which the job should stop.
|
||||
// This must be in the future and should be after the startTime (if specified).
|
||||
// The job's final run may be at the stop time, but not after.
|
||||
func WithStopDateTime(end time.Time) StopAtOption {
|
||||
return func(j *internalJob, now time.Time) error {
|
||||
if end.IsZero() || end.Before(now) {
|
||||
return ErrWithStopDateTimePast
|
||||
}
|
||||
if end.Before(j.startTime) {
|
||||
return ErrStopTimeEarlierThanStartTime
|
||||
}
|
||||
j.stopTime = end
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithTags sets the tags for the job. Tags provide
|
||||
// a way to identify jobs by a set of tags and remove
|
||||
// multiple jobs by tag.
|
||||
|
|
|
|||
|
|
@ -325,6 +325,10 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
|
|||
return
|
||||
}
|
||||
|
||||
if j.stopTimeReached(s.now()) {
|
||||
return
|
||||
}
|
||||
|
||||
scheduleFrom := j.lastRun
|
||||
if len(j.nextScheduled) > 0 {
|
||||
// always grab the last element in the slice as that is the furthest
|
||||
|
|
|
|||
|
|
@ -150,6 +150,21 @@ func TestScheduler_LongRunningJobs(t *testing.T) {
|
|||
options []SchedulerOption
|
||||
expectedRuns int
|
||||
}{
|
||||
{
|
||||
"duration with stop time between executions",
|
||||
durationCh,
|
||||
DurationJob(
|
||||
time.Millisecond * 500,
|
||||
),
|
||||
NewTask(
|
||||
func() {
|
||||
time.Sleep(1 * time.Second)
|
||||
durationCh <- struct{}{}
|
||||
}),
|
||||
[]JobOption{WithStopAt(WithStopDateTime(time.Now().Add(time.Millisecond * 1100)))},
|
||||
[]SchedulerOption{WithStopTimeout(time.Second * 2)},
|
||||
2,
|
||||
},
|
||||
{
|
||||
"duration",
|
||||
durationCh,
|
||||
|
|
@ -755,6 +770,22 @@ func TestScheduler_NewJobErrors(t *testing.T) {
|
|||
[]JobOption{WithStartAt(WithStartDateTime(time.Now().Add(-time.Second)))},
|
||||
ErrWithStartDateTimePast,
|
||||
},
|
||||
{
|
||||
"WithStartDateTime is later than the end",
|
||||
DurationJob(
|
||||
time.Second,
|
||||
),
|
||||
[]JobOption{WithStopAt(WithStopDateTime(time.Now().Add(time.Second))), WithStartAt(WithStartDateTime(time.Now().Add(time.Hour)))},
|
||||
ErrStartTimeLaterThanEndTime,
|
||||
},
|
||||
{
|
||||
"WithStopDateTime is earlier than the start",
|
||||
DurationJob(
|
||||
time.Second,
|
||||
),
|
||||
[]JobOption{WithStartAt(WithStartDateTime(time.Now().Add(time.Hour))), WithStopAt(WithStopDateTime(time.Now().Add(time.Second)))},
|
||||
ErrStopTimeEarlierThanStartTime,
|
||||
},
|
||||
{
|
||||
"oneTimeJob start at is zero",
|
||||
OneTimeJob(OneTimeJobStartDateTime(time.Time{})),
|
||||
|
|
|
|||
Loading…
Reference in New Issue