diff --git a/scheduler.go b/scheduler.go index f5c7d26..4477a9b 100644 --- a/scheduler.go +++ b/scheduler.go @@ -268,10 +268,15 @@ func (s *scheduler) selectRemoveJob(id uuid.UUID) { delete(s.jobs, id) } +// Jobs coming back from the executor to the scheduler that +// need to evaluated for rescheduling. func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) { j := s.jobs[id] j.lastRun = j.nextRun + // if the job has a limited number of runs set, we need to + // check how many runs have occurred and stop running this + // job if it has reached the limit. if j.limitRunsTo != nil { j.limitRunsTo.runCount = j.limitRunsTo.runCount + 1 if j.limitRunsTo.runCount == j.limitRunsTo.limit { @@ -288,10 +293,25 @@ func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) { next := j.next(j.lastRun) if next.IsZero() { + // the job's next function will return zero for OneTime jobs. + // since they are one time only, they do not need rescheduling. return } + if next.Before(s.now()) { + // in some cases the next run time can be in the past, for example: + // - the time on the machine was incorrect and has been synced with ntp + // - the machine went to sleep, and woke up some time later + // in those cases, we want to increment to the next run in the future + // and schedule the job for that time. + for next.Before(s.now()) { + next = j.next(next) + } + } j.nextRun = next j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() { + // set the actual timer on the job here and listen for + // shut down events so that the job doesn't attempt to + // run if the scheduler has been shutdown. select { case <-s.shutdownCtx.Done(): return @@ -301,6 +321,7 @@ func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) { }: } }) + // update the job with its new next and last run times and timer. s.jobs[id] = j }