2023-11-08 17:11:42 +00:00
|
|
|
package gocron
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
2024-07-03 15:23:45 +00:00
|
|
|
"fmt"
|
2023-11-08 17:11:42 +00:00
|
|
|
"strconv"
|
|
|
|
|
"sync"
|
|
|
|
|
"time"
|
|
|
|
|
|
2024-07-18 16:32:56 +00:00
|
|
|
"github.com/jonboulle/clockwork"
|
|
|
|
|
|
2023-11-08 17:11:42 +00:00
|
|
|
"github.com/google/uuid"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// executor receives job ids from the scheduler, runs the jobs
// (directly, via per-job singleton runners, or via limit mode runners),
// and reports rescheduling and completion back to the scheduler.
type executor struct {
	// context used for shutting down
	ctx context.Context
	// cancel used by the executor to signal a stop of its functions
	cancel context.CancelFunc
	// clock used for regular time or mocking time
	clock clockwork.Clock
	// the executor's logger
	logger Logger

	// receives jobs scheduled to execute
	jobsIn chan jobIn
	// sends out jobs for rescheduling
	jobsOutForRescheduling chan uuid.UUID
	// sends out jobs once completed
	jobsOutCompleted chan uuid.UUID
	// used to request jobs from the scheduler
	jobOutRequest chan jobOutRequest

	// used by the executor to receive a stop signal from the scheduler
	stopCh chan struct{}
	// the timeout value when stopping
	stopTimeout time.Duration
	// used to signal that the executor has completed shutdown
	done chan error

	// runners for any singleton type jobs
	// map[uuid.UUID]singletonRunner
	singletonRunners *sync.Map
	// config for limit mode
	limitMode *limitModeConfig
	// the elector when running distributed instances
	elector Elector
	// the locker when running distributed instances
	locker Locker
	// monitor for reporting metrics
	monitor Monitor
}
|
|
|
|
|
|
2023-12-19 03:13:37 +00:00
|
|
|
// jobIn is the unit of work sent to the executor and its runners.
type jobIn struct {
	// id of the job to execute
	id uuid.UUID
	// shouldSendOut indicates the job id still needs to be sent on
	// jobsOutForRescheduling; it is cleared after the first send so the
	// same run is not sent out for rescheduling twice
	shouldSendOut bool
}
|
|
|
|
|
|
2023-11-08 17:11:42 +00:00
|
|
|
// singletonRunner holds the channels for a per-job runner goroutine
// that ensures only one instance of a singleton-mode job runs at a time.
type singletonRunner struct {
	// in receives the work for this job
	in chan jobIn
	// rescheduleLimiter is a 1-slot channel used when the job's singleton
	// limit mode is LimitModeReschedule; a full channel means a run is
	// in-flight and new work is rescheduled instead of queued
	rescheduleLimiter chan struct{}
}
|
|
|
|
|
|
|
|
|
|
// limitModeConfig holds the configuration and runtime state for capping
// the total number of concurrently running jobs.
type limitModeConfig struct {
	// started reports whether the limit mode runner goroutines have been
	// spun up by the executor
	started bool
	// mode selects between rescheduling and waiting when all runners are busy
	mode LimitMode
	// limit is the number of runner goroutines, i.e. the maximum number of
	// jobs running concurrently
	limit uint
	// rescheduleLimiter is sized to limit; when a send on it would block,
	// all runners are busy and the work is rescheduled
	rescheduleLimiter chan struct{}
	// in carries jobs to the limit mode runners
	in chan jobIn
	// singletonJobs is used to track singleton jobs that are running
	// in the limit mode runner. This is used to prevent the same job
	// from running multiple times across limit mode runners when both
	// a limit mode and singleton mode are enabled.
	singletonJobs   map[uuid.UUID]struct{}
	singletonJobsMu sync.Mutex
}
|
|
|
|
|
|
|
|
|
|
// start runs the executor's main loop: it receives job ids from the
// scheduler on jobsIn and dispatches each one to the appropriate
// execution path (limit mode runners, per-job singleton runners, or a
// plain goroutine), until a stop signal arrives on stopCh.
func (e *executor) start() {
	e.logger.Debug("gocron: executor started")

	// creating the executor's context here as the executor
	// is the only goroutine that should access this context
	// any other uses within the executor should create a context
	// using the executor context as parent.
	e.ctx, e.cancel = context.WithCancel(context.Background())

	// the standardJobsWg tracks in-flight goroutines running
	// plain (non-singleton, non-limit-mode) jobs so stop can wait for them
	standardJobsWg := &waitGroupWithMutex{}

	// tracks the per-job singleton runner goroutines
	singletonJobsWg := &waitGroupWithMutex{}

	// tracks the limit mode runner goroutines
	limitModeJobsWg := &waitGroupWithMutex{}

	// create a fresh map for tracking singleton runners
	e.singletonRunners = &sync.Map{}

	// start the for loop that is the executor
	// selecting on channels for work to do
	for {
		select {
		// job ids in are sent from 1 of 2 places:
		// 1. the scheduler sends directly when jobs
		// are run immediately.
		// 2. sent from time.AfterFuncs in which job schedules
		// are spun up by the scheduler
		case jIn := <-e.jobsIn:
			select {
			case <-e.stopCh:
				e.stop(standardJobsWg, singletonJobsWg, limitModeJobsWg)
				return
			default:
			}
			// this context is used to handle cancellation of the executor
			// on requests for a job to the scheduler via requestJobCtx
			ctx, cancel := context.WithCancel(e.ctx)

			if e.limitMode != nil && !e.limitMode.started {
				// check if we are already running the limit mode runners
				// if not, spin up the required number i.e. limit!
				e.limitMode.started = true
				for i := e.limitMode.limit; i > 0; i-- {
					limitModeJobsWg.Add(1)
					go e.limitModeRunner("limitMode-"+strconv.Itoa(int(i)), e.limitMode.in, limitModeJobsWg, e.limitMode.mode, e.limitMode.rescheduleLimiter)
				}
			}

			// spin off into a goroutine to unblock the executor and
			// allow for processing for more work
			go func() {
				// make sure to cancel the above context per the docs
				// // Canceling this context releases resources associated with it, so code should
				// // call cancel as soon as the operations running in this Context complete.
				defer cancel()

				// check for limit mode - this spins up a separate runner which handles
				// limiting the total number of concurrently running jobs
				if e.limitMode != nil {
					if e.limitMode.mode == LimitModeReschedule {
						select {
						// rescheduleLimiter is a channel the size of the limit
						// this blocks publishing to the channel and keeps
						// the executor from building up a waiting queue
						// and forces rescheduling
						case e.limitMode.rescheduleLimiter <- struct{}{}:
							e.limitMode.in <- jIn
						default:
							// all runners are busy, reschedule the work for later
							// which means we just skip it here and do nothing
							// TODO when metrics are added, this should increment a rescheduled metric
							e.sendOutForRescheduling(&jIn)
						}
					} else {
						// since we're not using LimitModeReschedule, but instead using LimitModeWait
						// we do want to queue up the work to the limit mode runners and allow them
						// to work through the channel backlog. A hard limit of 1000 is in place
						// at which point this call would block.
						// TODO when metrics are added, this should increment a wait metric
						e.sendOutForRescheduling(&jIn)
						e.limitMode.in <- jIn
					}
				} else {
					// no limit mode, so we're either running a regular job or
					// a job with a singleton mode
					//
					// get the job, so we can figure out what kind it is and how
					// to execute it
					j := requestJobCtx(ctx, jIn.id, e.jobOutRequest)
					if j == nil {
						// safety check as it'd be strange bug if this occurred
						return
					}
					if j.singletonMode {
						// for singleton mode, get the existing runner for the job
						// or spin up a new one
						runner := &singletonRunner{}
						runnerSrc, ok := e.singletonRunners.Load(jIn.id)
						if !ok {
							runner.in = make(chan jobIn, 1000)
							if j.singletonLimitMode == LimitModeReschedule {
								runner.rescheduleLimiter = make(chan struct{}, 1)
							}
							e.singletonRunners.Store(jIn.id, runner)
							singletonJobsWg.Add(1)
							go e.singletonModeRunner("singleton-"+jIn.id.String(), runner.in, singletonJobsWg, j.singletonLimitMode, runner.rescheduleLimiter)
						} else {
							runner = runnerSrc.(*singletonRunner)
						}

						if j.singletonLimitMode == LimitModeReschedule {
							// reschedule mode uses the limiter channel to check
							// for a running job and reschedules if the channel is full.
							select {
							case runner.rescheduleLimiter <- struct{}{}:
								runner.in <- jIn
								e.sendOutForRescheduling(&jIn)
							default:
								// runner is busy, reschedule the work for later
								// which means we just skip it here and do nothing
								e.incrementJobCounter(*j, SingletonRescheduled)
								e.sendOutForRescheduling(&jIn)
							}
						} else {
							// wait mode, fill up that queue (buffered channel, so it's ok)
							runner.in <- jIn
							e.sendOutForRescheduling(&jIn)
						}
					} else {
						select {
						case <-e.stopCh:
							e.stop(standardJobsWg, singletonJobsWg, limitModeJobsWg)
							return
						default:
						}
						// we've gotten to the basic / standard jobs --
						// the ones without anything special that just want
						// to be run. Add to the WaitGroup so that
						// stopping or shutting down can wait for the jobs to
						// complete.
						standardJobsWg.Add(1)
						go func(j internalJob) {
							e.runJob(j, jIn)
							standardJobsWg.Done()
						}(*j)
					}
				}
			}()
		case <-e.stopCh:
			e.stop(standardJobsWg, singletonJobsWg, limitModeJobsWg)
			return
		}
	}
}
|
|
|
|
|
|
2024-03-26 14:55:21 +00:00
|
|
|
func (e *executor) sendOutForRescheduling(jIn *jobIn) {
|
2024-03-05 16:55:42 +00:00
|
|
|
if jIn.shouldSendOut {
|
|
|
|
|
select {
|
2024-03-26 14:55:21 +00:00
|
|
|
case e.jobsOutForRescheduling <- jIn.id:
|
2024-03-05 16:55:42 +00:00
|
|
|
case <-e.ctx.Done():
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// we need to set this to false now, because to handle
|
|
|
|
|
// non-limit jobs, we send out from the e.runJob function
|
|
|
|
|
// and in this case we don't want to send out twice.
|
|
|
|
|
jIn.shouldSendOut = false
|
|
|
|
|
}
|
|
|
|
|
|
2023-12-19 03:13:37 +00:00
|
|
|
func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) {
|
2023-11-28 12:48:22 +00:00
|
|
|
e.logger.Debug("gocron: limitModeRunner starting", "name", name)
|
|
|
|
|
for {
|
|
|
|
|
select {
|
2023-12-19 03:13:37 +00:00
|
|
|
case jIn := <-in:
|
2023-11-28 12:48:22 +00:00
|
|
|
select {
|
|
|
|
|
case <-e.ctx.Done():
|
|
|
|
|
e.logger.Debug("gocron: limitModeRunner shutting down", "name", name)
|
|
|
|
|
wg.Done()
|
|
|
|
|
return
|
|
|
|
|
default:
|
|
|
|
|
}
|
2023-11-08 17:11:42 +00:00
|
|
|
|
2023-11-28 12:48:22 +00:00
|
|
|
ctx, cancel := context.WithCancel(e.ctx)
|
2023-12-19 03:13:37 +00:00
|
|
|
j := requestJobCtx(ctx, jIn.id, e.jobOutRequest)
|
2023-11-28 12:48:22 +00:00
|
|
|
cancel()
|
|
|
|
|
if j != nil {
|
|
|
|
|
if j.singletonMode {
|
|
|
|
|
e.limitMode.singletonJobsMu.Lock()
|
2023-12-19 03:13:37 +00:00
|
|
|
_, ok := e.limitMode.singletonJobs[jIn.id]
|
2023-11-28 12:48:22 +00:00
|
|
|
if ok {
|
|
|
|
|
// this job is already running, so don't run it
|
|
|
|
|
// but instead reschedule it
|
|
|
|
|
e.limitMode.singletonJobsMu.Unlock()
|
2023-12-19 03:13:37 +00:00
|
|
|
if jIn.shouldSendOut {
|
|
|
|
|
select {
|
|
|
|
|
case <-e.ctx.Done():
|
|
|
|
|
return
|
|
|
|
|
case <-j.ctx.Done():
|
|
|
|
|
return
|
2024-03-26 14:55:21 +00:00
|
|
|
case e.jobsOutForRescheduling <- j.id:
|
2023-12-19 03:13:37 +00:00
|
|
|
}
|
2023-11-28 12:48:22 +00:00
|
|
|
}
|
2023-12-19 03:13:37 +00:00
|
|
|
// remove the limiter block, as this particular job
|
|
|
|
|
// was a singleton already running, and we want to
|
|
|
|
|
// allow another job to be scheduled
|
2023-12-11 16:39:59 +00:00
|
|
|
if limitMode == LimitModeReschedule {
|
2024-03-05 16:55:42 +00:00
|
|
|
<-rescheduleLimiter
|
2023-12-11 16:39:59 +00:00
|
|
|
}
|
2023-11-28 12:48:22 +00:00
|
|
|
continue
|
|
|
|
|
}
|
2023-12-19 03:13:37 +00:00
|
|
|
e.limitMode.singletonJobs[jIn.id] = struct{}{}
|
2023-11-28 12:48:22 +00:00
|
|
|
e.limitMode.singletonJobsMu.Unlock()
|
|
|
|
|
}
|
2024-03-12 13:37:11 +00:00
|
|
|
e.runJob(*j, jIn)
|
2023-11-08 17:11:42 +00:00
|
|
|
|
2023-11-28 12:48:22 +00:00
|
|
|
if j.singletonMode {
|
|
|
|
|
e.limitMode.singletonJobsMu.Lock()
|
2023-12-19 03:13:37 +00:00
|
|
|
delete(e.limitMode.singletonJobs, jIn.id)
|
2023-11-28 12:48:22 +00:00
|
|
|
e.limitMode.singletonJobsMu.Unlock()
|
|
|
|
|
}
|
|
|
|
|
}
|
2023-11-08 17:11:42 +00:00
|
|
|
|
2023-11-28 12:48:22 +00:00
|
|
|
// remove the limiter block to allow another job to be scheduled
|
|
|
|
|
if limitMode == LimitModeReschedule {
|
2024-03-05 16:55:42 +00:00
|
|
|
<-rescheduleLimiter
|
2023-11-28 12:48:22 +00:00
|
|
|
}
|
|
|
|
|
case <-e.ctx.Done():
|
|
|
|
|
e.logger.Debug("limitModeRunner shutting down", "name", name)
|
|
|
|
|
wg.Done()
|
|
|
|
|
return
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-12-19 03:13:37 +00:00
|
|
|
func (e *executor) singletonModeRunner(name string, in chan jobIn, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) {
|
|
|
|
|
e.logger.Debug("gocron: singletonModeRunner starting", "name", name)
|
2023-11-08 17:11:42 +00:00
|
|
|
for {
|
|
|
|
|
select {
|
2023-12-19 03:13:37 +00:00
|
|
|
case jIn := <-in:
|
2023-11-08 17:11:42 +00:00
|
|
|
select {
|
|
|
|
|
case <-e.ctx.Done():
|
2023-12-19 03:13:37 +00:00
|
|
|
e.logger.Debug("gocron: singletonModeRunner shutting down", "name", name)
|
2023-11-08 17:11:42 +00:00
|
|
|
wg.Done()
|
|
|
|
|
return
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ctx, cancel := context.WithCancel(e.ctx)
|
2023-12-19 03:13:37 +00:00
|
|
|
j := requestJobCtx(ctx, jIn.id, e.jobOutRequest)
|
2023-11-28 12:48:22 +00:00
|
|
|
cancel()
|
2023-11-08 17:11:42 +00:00
|
|
|
if j != nil {
|
2024-05-02 16:35:57 +00:00
|
|
|
// need to set shouldSendOut = false here, as there is a duplicative call to sendOutForRescheduling
|
|
|
|
|
// inside the runJob function that needs to be skipped. sendOutForRescheduling is previously called
|
|
|
|
|
// when the job is sent to the singleton mode runner.
|
|
|
|
|
jIn.shouldSendOut = false
|
2024-03-12 13:37:11 +00:00
|
|
|
e.runJob(*j, jIn)
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// remove the limiter block to allow another job to be scheduled
|
|
|
|
|
if limitMode == LimitModeReschedule {
|
2024-03-05 16:55:42 +00:00
|
|
|
<-rescheduleLimiter
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
case <-e.ctx.Done():
|
2023-12-19 03:13:37 +00:00
|
|
|
e.logger.Debug("singletonModeRunner shutting down", "name", name)
|
2023-11-08 17:11:42 +00:00
|
|
|
wg.Done()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2024-03-12 13:37:11 +00:00
|
|
|
// runJob executes a single job: it checks cancellation and stop-time
// conditions, acquires leadership or a distributed lock when configured,
// runs the job's lifecycle hooks and function, and records metrics.
func (e *executor) runJob(j internalJob, jIn jobIn) {
	// a nil job context indicates the job isn't runnable; bail out
	if j.ctx == nil {
		return
	}
	// skip the run if either the executor or this job has been cancelled
	select {
	case <-e.ctx.Done():
		return
	case <-j.ctx.Done():
		return
	default:
	}

	// skip the run entirely once the job's configured stop time has passed
	if j.stopTimeReached(e.clock.Now()) {
		return
	}

	if e.elector != nil {
		// distributed mode with an elector: only the leader runs the job;
		// non-leaders send the job out for rescheduling and count a skip
		if err := e.elector.IsLeader(j.ctx); err != nil {
			e.sendOutForRescheduling(&jIn)
			e.incrementJobCounter(j, Skip)
			return
		}
	} else if j.locker != nil {
		// a job-level locker takes precedence over the executor-level one;
		// failure to lock means another instance holds the job - skip it
		lock, err := j.locker.Lock(j.ctx, j.name)
		if err != nil {
			_ = callJobFuncWithParams(j.afterLockError, j.id, j.name, err)
			e.sendOutForRescheduling(&jIn)
			e.incrementJobCounter(j, Skip)
			return
		}
		defer func() { _ = lock.Unlock(j.ctx) }()
	} else if e.locker != nil {
		// executor-wide distributed lock, keyed by job name
		lock, err := e.locker.Lock(j.ctx, j.name)
		if err != nil {
			_ = callJobFuncWithParams(j.afterLockError, j.id, j.name, err)
			e.sendOutForRescheduling(&jIn)
			e.incrementJobCounter(j, Skip)
			return
		}
		defer func() { _ = lock.Unlock(j.ctx) }()
	}
	_ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name)

	// notify the scheduler before running so the next run can be
	// scheduled while this one executes (no-op if already sent out)
	e.sendOutForRescheduling(&jIn)
	select {
	case e.jobsOutCompleted <- j.id:
	case <-e.ctx.Done():
	}

	// NOTE(review): timing uses time.Now() rather than e.clock.Now() —
	// presumably intentional so monitors see wall-clock durations even
	// under a mocked clock; confirm.
	startTime := time.Now()
	var err error
	if j.afterJobRunsWithPanic != nil {
		// recover from panics only when a panic callback is registered
		err = e.callJobWithRecover(j)
	} else {
		err = callJobFuncWithParams(j.function, j.parameters...)
	}
	e.recordJobTiming(startTime, time.Now(), j)
	if err != nil {
		_ = callJobFuncWithParams(j.afterJobRunsWithError, j.id, j.name, err)
		e.incrementJobCounter(j, Fail)
	} else {
		_ = callJobFuncWithParams(j.afterJobRuns, j.id, j.name)
		e.incrementJobCounter(j, Success)
	}
}
|
|
|
|
|
|
2024-06-20 12:01:36 +00:00
|
|
|
func (e *executor) callJobWithRecover(j internalJob) (err error) {
|
|
|
|
|
defer func() {
|
|
|
|
|
if recoverData := recover(); recoverData != nil {
|
|
|
|
|
_ = callJobFuncWithParams(j.afterJobRunsWithPanic, j.id, j.name, recoverData)
|
|
|
|
|
|
|
|
|
|
// if panic is occurred, we should return an error
|
2024-07-03 15:23:45 +00:00
|
|
|
err = fmt.Errorf("%w from %v", ErrPanicRecovered, recoverData)
|
2024-06-20 12:01:36 +00:00
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
return callJobFuncWithParams(j.function, j.parameters...)
|
|
|
|
|
}
|
|
|
|
|
|
2024-07-24 15:24:54 +00:00
|
|
|
func (e *executor) recordJobTiming(start time.Time, end time.Time, j internalJob) {
|
|
|
|
|
if e.monitor != nil {
|
|
|
|
|
e.monitor.RecordJobTiming(start, end, j.id, j.name, j.tags)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2024-04-29 18:29:11 +00:00
|
|
|
func (e *executor) incrementJobCounter(j internalJob, status JobStatus) {
|
|
|
|
|
if e.monitor != nil {
|
|
|
|
|
e.monitor.IncrementJob(j.id, j.name, j.tags, status)
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
}
|
2023-11-28 12:48:22 +00:00
|
|
|
|
|
|
|
|
// stop cancels all running work and waits - up to e.stopTimeout - for the
// standard, singleton, and limit mode job groups to finish, then reports
// the outcome (nil or ErrStopJobsTimedOut) on e.done.
func (e *executor) stop(standardJobsWg, singletonJobsWg, limitModeJobsWg *waitGroupWithMutex) {
	e.logger.Debug("gocron: stopping executor")
	// we've been asked to stop. This is either because the scheduler has been told
	// to stop all jobs or the scheduler has been asked to completely shutdown.
	//
	// cancel tells all the functions to stop their work and send in a done response
	e.cancel()

	// the wait for job channels are used to report back whether we successfully waited
	// for all jobs to complete or if we hit the configured timeout.
	waitForJobs := make(chan struct{}, 1)
	waitForSingletons := make(chan struct{}, 1)
	waitForLimitMode := make(chan struct{}, 1)

	// the waiter context is used to cancel the functions waiting on jobs.
	// this is done to avoid goroutine leaks.
	waiterCtx, waiterCancel := context.WithCancel(context.Background())

	// wait for standard jobs to complete
	go func() {
		e.logger.Debug("gocron: waiting for standard jobs to complete")
		go func() {
			// this is done in a separate goroutine, so we aren't
			// blocked by the WaitGroup's Wait call in the event
			// that the waiter context is cancelled.
			// This particular goroutine could leak in the event that
			// some long-running standard job doesn't complete.
			standardJobsWg.Wait()
			e.logger.Debug("gocron: standard jobs completed")
			waitForJobs <- struct{}{}
		}()
		<-waiterCtx.Done()
	}()

	// wait for per job singleton limit mode runner jobs to complete
	go func() {
		e.logger.Debug("gocron: waiting for singleton jobs to complete")
		go func() {
			singletonJobsWg.Wait()
			e.logger.Debug("gocron: singleton jobs completed")
			waitForSingletons <- struct{}{}
		}()
		<-waiterCtx.Done()
	}()

	// wait for limit mode runners to complete
	go func() {
		e.logger.Debug("gocron: waiting for limit mode jobs to complete")
		go func() {
			limitModeJobsWg.Wait()
			e.logger.Debug("gocron: limitMode jobs completed")
			waitForLimitMode <- struct{}{}
		}()
		<-waiterCtx.Done()
	}()

	// now either wait for all the jobs to complete,
	// or hit the timeout.
	// NOTE(review): the default case makes this loop busy-spin until all
	// three groups report or the deadline passes; consider a blocking
	// select with a timeout channel instead - confirm this is acceptable.
	var count int
	timeout := time.Now().Add(e.stopTimeout)
	for time.Now().Before(timeout) && count < 3 {
		select {
		case <-waitForJobs:
			count++
		case <-waitForSingletons:
			count++
		case <-waitForLimitMode:
			count++
		default:
		}
	}
	// count < 3 means at least one of the three groups did not finish
	// before the deadline
	if count < 3 {
		e.done <- ErrStopJobsTimedOut
		e.logger.Debug("gocron: executor stopped - timed out")
	} else {
		e.done <- nil
		e.logger.Debug("gocron: executor stopped")
	}
	waiterCancel()

	// allow the limit mode runners to be spun up again on a restart
	if e.limitMode != nil {
		e.limitMode.started = false
	}
}
|