removes nextRuns in the past when job skipped by locker (#829)

Co-authored-by: manuelarte <manuel.doncel.martos@gmail.com>
This commit is contained in:
Manuel Doncel Martos 2025-02-17 22:43:46 +01:00 committed by GitHub
parent 08b53d788a
commit 2769f0940f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 32 additions and 1 deletions

View File

@ -31,6 +31,9 @@ type executor struct {
// used to request jobs from the scheduler
jobOutRequest chan jobOutRequest
// sends out job needs to update the next runs
jobUpdateNextRuns chan uuid.UUID
// used by the executor to receive a stop signal from the scheduler
stopCh chan struct{}
// the timeout value when stopping
@ -247,6 +250,14 @@ func (e *executor) sendOutForRescheduling(jIn *jobIn) {
jIn.shouldSendOut = false
}
func (e *executor) sendOutForNextRunUpdate(jIn *jobIn) {
select {
case e.jobUpdateNextRuns <- jIn.id:
case <-e.ctx.Done():
return
}
}
func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) {
e.logger.Debug("gocron: limitModeRunner starting", "name", name)
for {
@ -376,6 +387,7 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
_ = callJobFuncWithParams(j.afterLockError, j.id, j.name, err)
e.sendOutForRescheduling(&jIn)
e.incrementJobCounter(j, Skip)
e.sendOutForNextRunUpdate(&jIn)
return
}
defer func() { _ = lock.Unlock(j.ctx) }()
@ -385,6 +397,7 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
_ = callJobFuncWithParams(j.afterLockError, j.id, j.name, err)
e.sendOutForRescheduling(&jIn)
e.incrementJobCounter(j, Skip)
e.sendOutForNextRunUpdate(&jIn)
return
}
defer func() { _ = lock.Unlock(j.ctx) }()

View File

@ -138,6 +138,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
jobsIn: make(chan jobIn),
jobsOutForRescheduling: make(chan uuid.UUID),
jobUpdateNextRuns: make(chan uuid.UUID),
jobsOutCompleted: make(chan uuid.UUID),
jobOutRequest: make(chan jobOutRequest, 1000),
done: make(chan error),
@ -176,7 +177,8 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
select {
case id := <-s.exec.jobsOutForRescheduling:
s.selectExecJobsOutForRescheduling(id)
case id := <-s.exec.jobUpdateNextRuns:
s.updateNextScheduled(id)
case id := <-s.exec.jobsOutCompleted:
s.selectExecJobsOutCompleted(id)
@ -405,6 +407,22 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
s.jobs[id] = j
}
func (s *scheduler) updateNextScheduled(id uuid.UUID) {
j, ok := s.jobs[id]
if !ok {
return
}
var newNextScheduled []time.Time
for _, t := range j.nextScheduled {
if t.Before(s.now()) {
continue
}
newNextScheduled = append(newNextScheduled, t)
}
j.nextScheduled = newNextScheduled
s.jobs[id] = j
}
func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) {
j, ok := s.jobs[id]
if !ok {