diff --git a/executor.go b/executor.go index 3a9dc0d..3f48a1e 100644 --- a/executor.go +++ b/executor.go @@ -31,6 +31,9 @@ type executor struct { // used to request jobs from the scheduler jobOutRequest chan jobOutRequest + // sends out job needs to update the next runs + jobUpdateNextRuns chan uuid.UUID + // used by the executor to receive a stop signal from the scheduler stopCh chan struct{} // the timeout value when stopping @@ -247,6 +250,14 @@ func (e *executor) sendOutForRescheduling(jIn *jobIn) { jIn.shouldSendOut = false } +func (e *executor) sendOutForNextRunUpdate(jIn *jobIn) { + select { + case e.jobUpdateNextRuns <- jIn.id: + case <-e.ctx.Done(): + return + } +} + func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) { e.logger.Debug("gocron: limitModeRunner starting", "name", name) for { @@ -376,6 +387,7 @@ func (e *executor) runJob(j internalJob, jIn jobIn) { _ = callJobFuncWithParams(j.afterLockError, j.id, j.name, err) e.sendOutForRescheduling(&jIn) e.incrementJobCounter(j, Skip) + e.sendOutForNextRunUpdate(&jIn) return } defer func() { _ = lock.Unlock(j.ctx) }() @@ -385,6 +397,7 @@ func (e *executor) runJob(j internalJob, jIn jobIn) { _ = callJobFuncWithParams(j.afterLockError, j.id, j.name, err) e.sendOutForRescheduling(&jIn) e.incrementJobCounter(j, Skip) + e.sendOutForNextRunUpdate(&jIn) return } defer func() { _ = lock.Unlock(j.ctx) }() diff --git a/scheduler.go b/scheduler.go index fd8fac2..f2c31d4 100644 --- a/scheduler.go +++ b/scheduler.go @@ -138,6 +138,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { jobsIn: make(chan jobIn), jobsOutForRescheduling: make(chan uuid.UUID), + jobUpdateNextRuns: make(chan uuid.UUID), jobsOutCompleted: make(chan uuid.UUID), jobOutRequest: make(chan jobOutRequest, 1000), done: make(chan error), @@ -176,7 +177,8 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { select { case id := <-s.exec.jobsOutForRescheduling: s.selectExecJobsOutForRescheduling(id) - + case id := <-s.exec.jobUpdateNextRuns: + s.updateNextScheduled(id) case id := <-s.exec.jobsOutCompleted: s.selectExecJobsOutCompleted(id) @@ -405,6 +407,22 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) { s.jobs[id] = j } +func (s *scheduler) updateNextScheduled(id uuid.UUID) { + j, ok := s.jobs[id] + if !ok { + return + } + var newNextScheduled []time.Time + for _, t := range j.nextScheduled { + if t.Before(s.now()) { + continue + } + newNextScheduled = append(newNextScheduled, t) + } + j.nextScheduled = newNextScheduled + s.jobs[id] = j +} + func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) { j, ok := s.jobs[id] if !ok {