fix cases where default on send out is resulting in job not going out (#686)

This commit is contained in:
John Roesler 2024-03-05 10:55:42 -06:00 committed by GitHub
parent 387cbe4fc2
commit 27f2cbaa41
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 26 additions and 27 deletions

View File

@ -122,12 +122,7 @@ func (e *executor) start() {
// all runners are busy, reschedule the work for later // all runners are busy, reschedule the work for later
// which means we just skip it here and do nothing // which means we just skip it here and do nothing
// TODO when metrics are added, this should increment a rescheduled metric // TODO when metrics are added, this should increment a rescheduled metric
if jIn.shouldSendOut { e.sendOutToScheduler(&jIn)
select {
case e.jobIDsOut <- jIn.id:
default:
}
}
} }
} else { } else {
// since we're not using LimitModeReschedule, but instead using LimitModeWait // since we're not using LimitModeReschedule, but instead using LimitModeWait
@ -136,6 +131,7 @@ func (e *executor) start() {
// at which point this call would block. // at which point this call would block.
// TODO when metrics are added, this should increment a wait metric // TODO when metrics are added, this should increment a wait metric
e.limitMode.in <- jIn e.limitMode.in <- jIn
e.sendOutToScheduler(&jIn)
} }
} else { } else {
// no limit mode, so we're either running a regular job or // no limit mode, so we're either running a regular job or
@ -171,20 +167,17 @@ func (e *executor) start() {
select { select {
case runner.rescheduleLimiter <- struct{}{}: case runner.rescheduleLimiter <- struct{}{}:
runner.in <- jIn runner.in <- jIn
e.sendOutToScheduler(&jIn)
default: default:
// runner is busy, reschedule the work for later // runner is busy, reschedule the work for later
// which means we just skip it here and do nothing // which means we just skip it here and do nothing
// TODO when metrics are added, this should increment a rescheduled metric // TODO when metrics are added, this should increment a rescheduled metric
if jIn.shouldSendOut { e.sendOutToScheduler(&jIn)
select {
case e.jobIDsOut <- jIn.id:
default:
}
}
} }
} else { } else {
// wait mode, fill up that queue (buffered channel, so it's ok) // wait mode, fill up that queue (buffered channel, so it's ok)
runner.in <- jIn runner.in <- jIn
e.sendOutToScheduler(&jIn)
} }
} else { } else {
select { select {
@ -213,6 +206,20 @@ func (e *executor) start() {
} }
} }
func (e *executor) sendOutToScheduler(jIn *jobIn) {
if jIn.shouldSendOut {
select {
case e.jobIDsOut <- jIn.id:
case <-e.ctx.Done():
return
}
}
// we need to set this to false now, because to handle
// non-limit jobs, we send out from the e.runJob function
// and in this case we don't want to send out twice.
jIn.shouldSendOut = false
}
func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) { func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) {
e.logger.Debug("gocron: limitModeRunner starting", "name", name) e.logger.Debug("gocron: limitModeRunner starting", "name", name)
for { for {
@ -250,10 +257,7 @@ func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWith
// was a singleton already running, and we want to // was a singleton already running, and we want to
// allow another job to be scheduled // allow another job to be scheduled
if limitMode == LimitModeReschedule { if limitMode == LimitModeReschedule {
select { <-rescheduleLimiter
case <-rescheduleLimiter:
default:
}
} }
continue continue
} }
@ -271,10 +275,7 @@ func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWith
// remove the limiter block to allow another job to be scheduled // remove the limiter block to allow another job to be scheduled
if limitMode == LimitModeReschedule { if limitMode == LimitModeReschedule {
select { <-rescheduleLimiter
case <-rescheduleLimiter:
default:
}
} }
case <-e.ctx.Done(): case <-e.ctx.Done():
e.logger.Debug("limitModeRunner shutting down", "name", name) e.logger.Debug("limitModeRunner shutting down", "name", name)
@ -306,10 +307,7 @@ func (e *executor) singletonModeRunner(name string, in chan jobIn, wg *waitGroup
// remove the limiter block to allow another job to be scheduled // remove the limiter block to allow another job to be scheduled
if limitMode == LimitModeReschedule { if limitMode == LimitModeReschedule {
select { <-rescheduleLimiter
case <-rescheduleLimiter:
default:
}
} }
case <-e.ctx.Done(): case <-e.ctx.Done():
e.logger.Debug("singletonModeRunner shutting down", "name", name) e.logger.Debug("singletonModeRunner shutting down", "name", name)

4
job.go
View File

@ -846,7 +846,9 @@ type Job interface {
NextRun() (time.Time, error) NextRun() (time.Time, error)
// RunNow runs the job once, now. This does not alter // RunNow runs the job once, now. This does not alter
// the existing run schedule, and will respect all job // the existing run schedule, and will respect all job
// and scheduler limits. // and scheduler limits. This means that running a job now may
// cause the job's regular interval to be rescheduled due to
// the instance being run by RunNow blocking your run limit.
RunNow() error RunNow() error
// Tags returns the job's string tags. // Tags returns the job's string tags.
Tags() []string Tags() []string

View File

@ -8,7 +8,6 @@ import (
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/goleak" "go.uber.org/goleak"
@ -1508,7 +1507,7 @@ func TestScheduler_RunJobNow(t *testing.T) {
WithSingletonMode(LimitModeReschedule), WithSingletonMode(LimitModeReschedule),
}, },
func() time.Duration { func() time.Duration {
return 10 * time.Second return 20 * time.Second
}, },
1, 1,
}, },