diff --git a/executor.go b/executor.go index e2aa0d7..1b51e57 100644 --- a/executor.go +++ b/executor.go @@ -10,20 +10,21 @@ import ( ) type executor struct { - ctx context.Context - cancel context.CancelFunc - logger Logger - stopCh chan struct{} - jobsIn chan jobIn - jobIDsOut chan uuid.UUID - jobOutRequest chan jobOutRequest - stopTimeout time.Duration - done chan error - singletonRunners *sync.Map // map[uuid.UUID]singletonRunner - limitMode *limitModeConfig - elector Elector - locker Locker - monitor Monitor + ctx context.Context + cancel context.CancelFunc + logger Logger + stopCh chan struct{} + jobsIn chan jobIn + jobsOutForRescheduling chan uuid.UUID + jobsOutCompleted chan uuid.UUID + jobOutRequest chan jobOutRequest + stopTimeout time.Duration + done chan error + singletonRunners *sync.Map // map[uuid.UUID]singletonRunner + limitMode *limitModeConfig + elector Elector + locker Locker + monitor Monitor } type jobIn struct { @@ -122,7 +123,7 @@ func (e *executor) start() { // all runners are busy, reschedule the work for later // which means we just skip it here and do nothing // TODO when metrics are added, this should increment a rescheduled metric - e.sendOutToScheduler(&jIn) + e.sendOutForRescheduling(&jIn) } } else { // since we're not using LimitModeReschedule, but instead using LimitModeWait @@ -131,7 +132,7 @@ func (e *executor) start() { // at which point this call would block. // TODO when metrics are added, this should increment a wait metric e.limitMode.in <- jIn - e.sendOutToScheduler(&jIn) + e.sendOutForRescheduling(&jIn) } } else { // no limit mode, so we're either running a regular job or @@ -167,17 +168,17 @@ func (e *executor) start() { select { case runner.rescheduleLimiter <- struct{}{}: runner.in <- jIn - e.sendOutToScheduler(&jIn) + e.sendOutForRescheduling(&jIn) default: // runner is busy, reschedule the work for later // which means we just skip it here and do nothing // TODO when metrics are added, this should increment a rescheduled metric - e.sendOutToScheduler(&jIn) + e.sendOutForRescheduling(&jIn) } } else { // wait mode, fill up that queue (buffered channel, so it's ok) runner.in <- jIn - e.sendOutToScheduler(&jIn) + e.sendOutForRescheduling(&jIn) } } else { select { @@ -206,10 +207,10 @@ func (e *executor) start() { } } -func (e *executor) sendOutToScheduler(jIn *jobIn) { +func (e *executor) sendOutForRescheduling(jIn *jobIn) { if jIn.shouldSendOut { select { - case e.jobIDsOut <- jIn.id: + case e.jobsOutForRescheduling <- jIn.id: case <-e.ctx.Done(): return } @@ -250,7 +251,7 @@ func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWith return case <-j.ctx.Done(): return - case e.jobIDsOut <- j.id: + case e.jobsOutForRescheduling <- j.id: } } // remove the limiter block, as this particular job @@ -331,20 +332,24 @@ func (e *executor) runJob(j internalJob, jIn jobIn) { if e.elector != nil { if err := e.elector.IsLeader(j.ctx); err != nil { - e.sendOutToScheduler(&jIn) + e.sendOutForRescheduling(&jIn) return } } else if e.locker != nil { lock, err := e.locker.Lock(j.ctx, j.name) if err != nil { - e.sendOutToScheduler(&jIn) + e.sendOutForRescheduling(&jIn) return } defer func() { _ = lock.Unlock(j.ctx) }() } _ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name) - e.sendOutToScheduler(&jIn) + e.sendOutForRescheduling(&jIn) + select { + case e.jobsOutCompleted <- j.id: + case <-e.ctx.Done(): + } startTime := time.Now() err := callJobFuncWithParams(j.function, j.parameters...) diff --git a/job.go b/job.go index 59e532a..7641dbe 100644 --- a/job.go +++ b/job.go @@ -24,7 +24,9 @@ type internalJob struct { name string tags []string jobSchedule - lastRun, nextRun time.Time + lastScheduledRun time.Time + nextScheduled time.Time + lastRun time.Time function any parameters []any timer clockwork.Timer @@ -681,18 +683,18 @@ func (d dailyJob) next(lastRun time.Time) time.Time { func (d dailyJob) nextDay(lastRun time.Time, firstPass bool) time.Time { for _, at := range d.atTimes { - // sub the at time hour/min/sec onto the lastRun's values + // sub the at time hour/min/sec onto the lastScheduledRun's values // to use in checks to see if we've got our next run time atDate := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day(), at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location()) if firstPass && atDate.After(lastRun) { // checking to see if it is after i.e. greater than, - // and not greater or equal as our lastRun day/time + // and not greater or equal as our lastScheduledRun day/time // will be in the loop, and we don't want to select it again return atDate } else if !firstPass && !atDate.Before(lastRun) { // now that we're looking at the next day, it's ok to consider - // the same at time that was last run (as lastRun has been incremented) + // the same at time that was last run (as lastScheduledRun has been incremented) return atDate } } @@ -727,18 +729,18 @@ func (w weeklyJob) nextWeekDayAtTime(lastRun time.Time, firstPass bool) time.Tim // weekDayDiff is used to add the correct amount to the atDate day below weekDayDiff := wd - lastRun.Weekday() for _, at := range w.atTimes { - // sub the at time hour/min/sec onto the lastRun's values + // sub the at time hour/min/sec onto the lastScheduledRun's values // to use in checks to see if we've got our next run time atDate := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day()+int(weekDayDiff), at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location()) if firstPass && atDate.After(lastRun) { // checking to see if it is after i.e. greater than, - // and not greater or equal as our lastRun day/time + // and not greater or equal as our lastScheduledRun day/time // will be in the loop, and we don't want to select it again return atDate } else if !firstPass && !atDate.Before(lastRun) { // now that we're looking at the next week, it's ok to consider - // the same at time that was last run (as lastRun has been incremented) + // the same at time that was last run (as lastScheduledRun has been incremented) return atDate } } @@ -795,7 +797,7 @@ func (m monthlyJob) nextMonthDayAtTime(lastRun time.Time, days []int, firstPass for _, day := range days { if day >= lastRun.Day() { for _, at := range m.atTimes { - // sub the day, and the at time hour/min/sec onto the lastRun's values + // sub the day, and the at time hour/min/sec onto the lastScheduledRun's values // to use in checks to see if we've got our next run time atDate := time.Date(lastRun.Year(), lastRun.Month(), day, at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location()) @@ -807,12 +809,12 @@ func (m monthlyJob) nextMonthDayAtTime(lastRun time.Time, days []int, firstPass if firstPass && atDate.After(lastRun) { // checking to see if it is after i.e. greater than, - // and not greater or equal as our lastRun day/time + // and not greater or equal as our lastScheduledRun day/time // will be in the loop, and we don't want to select it again return atDate } else if !firstPass && !atDate.Before(lastRun) { // now that we're looking at the next month, it's ok to consider - // the same at time that was lastRun (as lastRun has been incremented) + // the same at time that was lastScheduledRun (as lastScheduledRun has been incremented) return atDate } } @@ -892,7 +894,7 @@ func (j job) NextRun() (time.Time, error) { if ij == nil || ij.id == uuid.Nil { return time.Time{}, ErrJobNotFound } - return ij.nextRun, nil + return ij.nextScheduled, nil } func (j job) Tags() []string { diff --git a/scheduler.go b/scheduler.go index 428c493..1c1735c 100644 --- a/scheduler.go +++ b/scheduler.go @@ -109,10 +109,11 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { singletonRunners: nil, logger: &noOpLogger{}, - jobsIn: make(chan jobIn), - jobIDsOut: make(chan uuid.UUID), - jobOutRequest: make(chan jobOutRequest, 1000), - done: make(chan error), + jobsIn: make(chan jobIn), + jobsOutForRescheduling: make(chan uuid.UUID), + jobsOutCompleted: make(chan uuid.UUID), + jobOutRequest: make(chan jobOutRequest, 1000), + done: make(chan error), } s := &scheduler{ @@ -147,8 +148,11 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { s.logger.Info("gocron: new scheduler created") for { select { - case id := <-s.exec.jobIDsOut: - s.selectExecJobIDsOut(id) + case id := <-s.exec.jobsOutForRescheduling: + s.selectExecJobsOutForRescheduling(id) + + case id := <-s.exec.jobsOutCompleted: + s.selectExecJobsOutCompleted(id) case in := <-s.newJobCh: s.selectNewJob(in) @@ -287,14 +291,14 @@ func (s *scheduler) selectRemoveJob(id uuid.UUID) { // Jobs coming back from the executor to the scheduler that // need to evaluated for rescheduling. -func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) { +func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) { j, ok := s.jobs[id] if !ok { // the job was removed while it was running, and // so we don't need to reschedule it. return } - j.lastRun = j.nextRun + j.lastScheduledRun = j.nextScheduled // if the job has a limited number of runs set, we need to // check how many runs have occurred and stop running this @@ -313,7 +317,7 @@ func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) { } } - next := j.next(j.lastRun) + next := j.next(j.lastScheduledRun) if next.IsZero() { // the job's next function will return zero for OneTime jobs. // since they are one time only, they do not need rescheduling. @@ -329,7 +333,7 @@ func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) { next = j.next(next) } } - j.nextRun = next + j.nextScheduled = next j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() { // set the actual timer on the job here and listen for // shut down events so that the job doesn't attempt to @@ -347,6 +351,15 @@ func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) { s.jobs[id] = j } +func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) { + j, ok := s.jobs[id] + if !ok { + return + } + j.lastRun = s.now() + s.jobs[id] = j +} + func (s *scheduler) selectJobOutRequest(out jobOutRequest) { if j, ok := s.jobs[out.id]; ok { select { @@ -386,7 +399,7 @@ func (s *scheduler) selectNewJob(in newJobIn) { } }) } - j.nextRun = next + j.nextScheduled = next } s.jobs[j.id] = j @@ -437,7 +450,7 @@ func (s *scheduler) selectStart() { } }) } - j.nextRun = next + j.nextScheduled = next s.jobs[id] = j } select { diff --git a/scheduler_test.go b/scheduler_test.go index ed03726..1f922a9 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -1398,6 +1398,29 @@ func TestScheduler_RemoveLotsOfJobs(t *testing.T) { } } +func TestScheduler_RemoveJob_RemoveSelf(t *testing.T) { + goleak.VerifyNone(t) + s := newTestScheduler(t) + s.Start() + + _, err := s.NewJob( + DurationJob(100*time.Millisecond), + NewTask(func() {}), + WithEventListeners( + BeforeJobRuns( + func(jobID uuid.UUID, jobName string) { + s.RemoveByTags("tag1") + }, + ), + ), + WithTags("tag1"), + ) + require.NoError(t, err) + + time.Sleep(time.Millisecond * 400) + assert.NoError(t, s.Shutdown()) +} + func TestScheduler_WithEventListeners(t *testing.T) { goleak.VerifyNone(t) @@ -1673,6 +1696,67 @@ func TestScheduler_RunJobNow(t *testing.T) { } } +func TestScheduler_LastRunSingleton(t *testing.T) { + goleak.VerifyNone(t) + + tests := []struct { + name string + f func(t *testing.T, j Job) + }{ + { + "simple", + func(t *testing.T, j Job) {}, + }, + { + "with runNow", + func(t *testing.T, j Job) { + runTime := time.Now() + assert.NoError(t, j.RunNow()) + + // because we're using wait mode we need to wait here + // to make sure the job queued with RunNow has finished running + time.Sleep(time.Millisecond * 200) + lastRun, err := j.LastRun() + assert.NoError(t, err) + assert.LessOrEqual(t, lastRun.Sub(runTime), time.Millisecond*125) + assert.GreaterOrEqual(t, lastRun.Sub(runTime), time.Millisecond*75) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := newTestScheduler(t) + j, err := s.NewJob( + DurationJob(time.Millisecond*100), + NewTask(func() { + time.Sleep(time.Millisecond * 200) + }), + WithSingletonMode(LimitModeWait), + ) + require.NoError(t, err) + + startTime := time.Now() + s.Start() + + lastRun, err := j.LastRun() + assert.NoError(t, err) + assert.True(t, lastRun.IsZero()) + + time.Sleep(time.Millisecond * 200) + + lastRun, err = j.LastRun() + assert.NoError(t, err) + assert.LessOrEqual(t, lastRun.Sub(startTime), time.Millisecond*125) + assert.GreaterOrEqual(t, lastRun.Sub(startTime), time.Millisecond*75) + + tt.f(t, j) + + assert.NoError(t, s.Shutdown()) + }) + } +} + func TestScheduler_OneTimeJob(t *testing.T) { tests := []struct { name string @@ -1796,7 +1880,7 @@ func TestScheduler_WithMonitor(t *testing.T) { jobName string }{ { - "scheduler with monitorer", + "scheduler with monitor", DurationJob(time.Millisecond * 50), "job", },