// gocron/executor.go

package gocron

import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"time"

	"github.com/google/uuid"
	"github.com/jonboulle/clockwork"
)

type executor struct {
	// context used for shutting down
	ctx context.Context
	// cancel used by the executor to signal a stop of its functions
	cancel context.CancelFunc
	// clock used for regular time or mocking time
	clock clockwork.Clock
	// the executor's logger
	logger Logger
	// receives jobs scheduled to execute
	jobsIn chan jobIn
	// sends out jobs for rescheduling
	jobsOutForRescheduling chan uuid.UUID
	// sends out jobs once completed
	jobsOutCompleted chan uuid.UUID
	// used to request jobs from the scheduler
	jobOutRequest chan *jobOutRequest
	// sends out jobs that need their next run times updated
	jobUpdateNextRuns chan uuid.UUID
	// used by the executor to receive a stop signal from the scheduler
	stopCh chan struct{}
	// ensure that stop runs before the next call to start and only runs once
	stopOnce *sync.Once
	// the timeout value when stopping
	stopTimeout time.Duration
	// used to signal that the executor has completed shutdown
	done chan error
	// runners for any singleton type jobs
	// map[uuid.UUID]singletonRunner
	singletonRunners *sync.Map
	// config for limit mode
	limitMode *limitModeConfig
	// the elector when running distributed instances
	elector Elector
	// the locker when running distributed instances
	locker Locker
	// monitor for reporting metrics
	monitor Monitor
	// monitorStatus for reporting metrics with status
	monitorStatus MonitorStatus
	// reference to the parent scheduler for lifecycle notifications
	scheduler *scheduler
}
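
// For orientation, an informal sketch (not part of the original file) of how
// the channels above connect the executor to the scheduler:
//
//	scheduler --- jobsIn ------------------> executor (work to run)
//	scheduler <-- jobsOutForRescheduling --- executor (schedule the next run)
//	scheduler <-- jobsOutCompleted --------- executor (run finished)
//	scheduler <-- jobOutRequest ------------ executor (look up job details)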

type jobIn struct {
	id            uuid.UUID
	shouldSendOut bool
}

type singletonRunner struct {
	in                chan jobIn
	rescheduleLimiter chan struct{}
}

type limitModeConfig struct {
	started           bool
	mode              LimitMode
	limit             uint
	rescheduleLimiter chan struct{}
	in                chan jobIn

	// singletonJobs is used to track singleton jobs that are running
	// in the limit mode runner. This is used to prevent the same job
	// from running multiple times across limit mode runners when both
	// a limit mode and singleton mode are enabled.
	singletonJobs   map[uuid.UUID]struct{}
	singletonJobsMu sync.Mutex
}
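
// A hedged usage sketch: in the public gocron v2 API (which this file appears
// to track), limit mode is enabled with a scheduler option, and the LimitMode
// chooses between queueing overflow (LimitModeWait) and skipping it until the
// next scheduled run (LimitModeReschedule).
//
//	s, _ := gocron.NewScheduler(
//		gocron.WithLimitConcurrentJobs(2, gocron.LimitModeReschedule),
//	)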

func (e *executor) start() {
	e.logger.Debug("gocron: executor started")

	// creating the executor's context here as the executor
	// is the only goroutine that should access this context
	// any other uses within the executor should create a context
	// using the executor context as parent.
	e.ctx, e.cancel = context.WithCancel(context.Background())
	e.stopOnce = &sync.Once{}

	// the standardJobsWg tracks standard jobs - those running without
	// singleton mode or a concurrency limit - so that stopping or
	// shutting down can wait on their completion
	standardJobsWg := &waitGroupWithMutex{}

	singletonJobsWg := &waitGroupWithMutex{}

	limitModeJobsWg := &waitGroupWithMutex{}

	// create a fresh map for tracking singleton runners
	e.singletonRunners = &sync.Map{}

	// start the for loop that is the executor
	// selecting on channels for work to do
	for {
		select {
		// job ids come in from one of two places:
		// 1. the scheduler sends directly when jobs
		//    are run immediately.
		// 2. sent from time.AfterFuncs in which job schedules
		//    are spun up by the scheduler
		case jIn := <-e.jobsIn:
			select {
			case <-e.stopCh:
				e.stop(standardJobsWg, singletonJobsWg, limitModeJobsWg)
				return
			default:
			}
			// this context is used to handle cancellation of the executor
			// on requests for a job to the scheduler via requestJobCtx
			ctx, cancel := context.WithCancel(e.ctx)

			if e.limitMode != nil && !e.limitMode.started {
				// check if we are already running the limit mode runners
				// if not, spin up the required number, i.e. the limit
				e.limitMode.started = true
				for i := e.limitMode.limit; i > 0; i-- {
					limitModeJobsWg.Add(1)
					go e.limitModeRunner("limitMode-"+strconv.Itoa(int(i)), e.limitMode.in, limitModeJobsWg, e.limitMode.mode, e.limitMode.rescheduleLimiter)
				}
			}

			// spin off into a goroutine to unblock the executor and
			// allow for processing of more work
			go func(executorCtx context.Context) {
				// make sure to cancel the above context per the docs:
				// "Canceling this context releases resources associated with it, so code should
				// call cancel as soon as the operations running in this Context complete."
				defer cancel()

				// check for limit mode - this spins up a separate runner which handles
				// limiting the total number of concurrently running jobs
				if e.limitMode != nil {
					if e.limitMode.mode == LimitModeReschedule {
						select {
						// rescheduleLimiter is a channel the size of the limit.
						// this blocks publishing to the channel, keeps
						// the executor from building up a waiting queue,
						// and forces rescheduling
						case e.limitMode.rescheduleLimiter <- struct{}{}:
							e.limitMode.in <- jIn
						default:
							// all runners are busy, reschedule the work for later
							// which means we just skip it here and do nothing
							// TODO when metrics are added, this should increment a rescheduled metric
							ctx2, cancel2 := context.WithCancel(executorCtx)
							job := requestJobCtx(ctx2, jIn.id, e.jobOutRequest)
							cancel2()
							if job != nil && e.scheduler != nil {
								e.scheduler.notifyConcurrencyLimitReached("limit", e.scheduler.jobFromInternalJob(*job))
							}
							e.sendOutForRescheduling(&jIn)
						}
					} else {
						// since we're not using LimitModeReschedule, but instead LimitModeWait,
						// we do want to queue up the work to the limit mode runners and allow them
						// to work through the channel backlog. A hard limit of 1000 is in place,
						// at which point this call would block.
						// TODO when metrics are added, this should increment a wait metric
						e.sendOutForRescheduling(&jIn)
						e.limitMode.in <- jIn
					}
				} else {
					// no limit mode, so we're either running a regular job or
					// a job with singleton mode
					//
					// get the job, so we can figure out what kind it is and how
					// to execute it
					j := requestJobCtx(ctx, jIn.id, e.jobOutRequest)
					if j == nil {
						// safety check, as it'd be a strange bug if this occurred
						return
					}

					if j.singletonMode {
						// for singleton mode, get the existing runner for the job
						// or spin up a new one
						runner := &singletonRunner{}
						runnerSrc, ok := e.singletonRunners.Load(jIn.id)
						if !ok {
							runner.in = make(chan jobIn, 1000)
							if j.singletonLimitMode == LimitModeReschedule {
								runner.rescheduleLimiter = make(chan struct{}, 1)
							}
							e.singletonRunners.Store(jIn.id, runner)
							singletonJobsWg.Add(1)
							go e.singletonModeRunner("singleton-"+jIn.id.String(), runner.in, singletonJobsWg, j.singletonLimitMode, runner.rescheduleLimiter)
						} else {
							runner = runnerSrc.(*singletonRunner)
						}

						if j.singletonLimitMode == LimitModeReschedule {
							// reschedule mode uses the limiter channel to check
							// for a running job and reschedules if the channel is full.
							select {
							case runner.rescheduleLimiter <- struct{}{}:
								runner.in <- jIn
								// for intervalFromCompletion, skip rescheduling here - it happens after the job completes
								if !j.intervalFromCompletion {
									e.sendOutForRescheduling(&jIn)
								}
							default:
								// runner is busy, reschedule the work for later
								// which means we just skip it here and do nothing
								e.incrementJobCounter(*j, SingletonRescheduled)
								e.sendOutForRescheduling(&jIn)
								// notify that the singleton concurrency limit was reached
								if e.scheduler != nil {
									e.scheduler.notifyConcurrencyLimitReached("singleton", e.scheduler.jobFromInternalJob(*j))
								}
							}
						} else {
							// wait mode, fill up that queue (buffered channel, so it's ok)
							runner.in <- jIn
							// for intervalFromCompletion, skip rescheduling here - it happens after the job completes
							if !j.intervalFromCompletion {
								e.sendOutForRescheduling(&jIn)
							}
						}
					} else {
						select {
						case <-executorCtx.Done():
							return
						default:
						}
						// we've gotten to the basic / standard jobs --
						// the ones without anything special that just want
						// to be run. Add to the WaitGroup so that
						// stopping or shutting down can wait for the jobs to
						// complete.
						standardJobsWg.Add(1)
						go func(j internalJob) {
							e.runJob(j, jIn)
							standardJobsWg.Done()
						}(*j)
					}
				}
			}(e.ctx)
		case <-e.stopCh:
			e.stop(standardJobsWg, singletonJobsWg, limitModeJobsWg)
			return
		}
	}
}
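
// The rescheduleLimiter used above is a non-blocking semaphore: a buffered
// channel sized to the concurrency limit. A minimal standalone sketch of the
// pattern (illustrative names):
//
//	sem := make(chan struct{}, limit)
//	select {
//	case sem <- struct{}{}: // capacity available: hand the job to a runner
//		in <- job
//	default: // all runners busy: do nothing now and reschedule instead
//	}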

func (e *executor) sendOutForRescheduling(jIn *jobIn) {
	if jIn.shouldSendOut {
		select {
		case e.jobsOutForRescheduling <- jIn.id:
		case <-e.ctx.Done():
			return
		}
	}
	// we need to set this to false now because, for non-limit jobs,
	// the send out happens inside the e.runJob function, and in that
	// case we don't want to send out twice.
	jIn.shouldSendOut = false
}

func (e *executor) sendOutForNextRunUpdate(jIn *jobIn) {
	select {
	case e.jobUpdateNextRuns <- jIn.id:
	case <-e.ctx.Done():
		return
	}
}

func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) {
	e.logger.Debug("gocron: limitModeRunner starting", "name", name)
	for {
		select {
		case jIn := <-in:
			select {
			case <-e.ctx.Done():
				e.logger.Debug("gocron: limitModeRunner shutting down", "name", name)
				wg.Done()
				return
			default:
			}

			ctx, cancel := context.WithCancel(e.ctx)
			j := requestJobCtx(ctx, jIn.id, e.jobOutRequest)
			cancel()
			if j != nil {
				if j.singletonMode {
					e.limitMode.singletonJobsMu.Lock()
					_, ok := e.limitMode.singletonJobs[jIn.id]
					if ok {
						// this job is already running, so don't run it
						// but instead reschedule it
						e.limitMode.singletonJobsMu.Unlock()
						if jIn.shouldSendOut {
							select {
							case <-e.ctx.Done():
								return
							case <-j.ctx.Done():
								return
							case e.jobsOutForRescheduling <- j.id:
							}
						}
						// remove the limiter block, as this particular job
						// was a singleton already running, and we want to
						// allow another job to be scheduled
						if limitMode == LimitModeReschedule {
							<-rescheduleLimiter
						}
						continue
					}
					e.limitMode.singletonJobs[jIn.id] = struct{}{}
					e.limitMode.singletonJobsMu.Unlock()
				}
				e.runJob(*j, jIn)

				if j.singletonMode {
					e.limitMode.singletonJobsMu.Lock()
					delete(e.limitMode.singletonJobs, jIn.id)
					e.limitMode.singletonJobsMu.Unlock()
				}
			}

			// remove the limiter block to allow another job to be scheduled
			if limitMode == LimitModeReschedule {
				<-rescheduleLimiter
			}
		case <-e.ctx.Done():
			e.logger.Debug("gocron: limitModeRunner shutting down", "name", name)
			wg.Done()
			return
		}
	}
}

func (e *executor) singletonModeRunner(name string, in chan jobIn, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) {
	e.logger.Debug("gocron: singletonModeRunner starting", "name", name)
	for {
		select {
		case jIn := <-in:
			select {
			case <-e.ctx.Done():
				e.logger.Debug("gocron: singletonModeRunner shutting down", "name", name)
				wg.Done()
				return
			default:
			}

			ctx, cancel := context.WithCancel(e.ctx)
			j := requestJobCtx(ctx, jIn.id, e.jobOutRequest)
			cancel()
			if j != nil {
				// set shouldSendOut = false here to skip the duplicate call to
				// sendOutForRescheduling inside the runJob function: it was already
				// called when the job was sent to this singleton mode runner.
				// Exception: for intervalFromCompletion, rescheduling should happen
				// AFTER the job completes, so shouldSendOut is left as-is.
				if !j.intervalFromCompletion {
					jIn.shouldSendOut = false
				}
				e.runJob(*j, jIn)
			}

			// remove the limiter block to allow another job to be scheduled
			if limitMode == LimitModeReschedule {
				<-rescheduleLimiter
			}
		case <-e.ctx.Done():
			e.logger.Debug("gocron: singletonModeRunner shutting down", "name", name)
			wg.Done()
			return
		}
	}
}
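
// A hedged usage sketch: in the public gocron v2 API, singleton mode is a
// per-job option, which is why each job gets its own runner above.
//
//	_, _ = s.NewJob(
//		gocron.DurationJob(time.Second),
//		gocron.NewTask(task),
//		gocron.WithSingletonMode(gocron.LimitModeWait),
//	)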

func (e *executor) runJob(j internalJob, jIn jobIn) {
	if j.ctx == nil {
		return
	}
	select {
	case <-e.ctx.Done():
		return
	case <-j.ctx.Done():
		return
	default:
	}

	if j.stopTimeReached(e.clock.Now()) {
		return
	}

	if e.elector != nil {
		if err := e.elector.IsLeader(j.ctx); err != nil {
			e.sendOutForRescheduling(&jIn)
			e.incrementJobCounter(j, Skip)
			return
		}
	} else if !j.disabledLocker && j.locker != nil {
		lock, err := j.locker.Lock(j.ctx, j.name)
		if err != nil {
			_ = callJobFuncWithParams(j.afterLockError, j.id, j.name, err)
			e.sendOutForRescheduling(&jIn)
			e.incrementJobCounter(j, Skip)
			e.sendOutForNextRunUpdate(&jIn)
			return
		}
		defer func() { _ = lock.Unlock(j.ctx) }()
	} else if !j.disabledLocker && e.locker != nil {
		lock, err := e.locker.Lock(j.ctx, j.name)
		if err != nil {
			_ = callJobFuncWithParams(j.afterLockError, j.id, j.name, err)
			e.sendOutForRescheduling(&jIn)
			e.incrementJobCounter(j, Skip)
			e.sendOutForNextRunUpdate(&jIn)
			return
		}
		defer func() { _ = lock.Unlock(j.ctx) }()
	}

	_ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name)

	// Notify job started
	actualStartTime := time.Now()
	if e.scheduler != nil {
		jobObj := e.scheduler.jobFromInternalJob(j)
		e.scheduler.notifyJobStarted(jobObj)
		// Notify scheduling delay if the job had a scheduled time
		if len(j.nextScheduled) > 0 {
			e.scheduler.notifyJobSchedulingDelay(jobObj, j.nextScheduled[0], actualStartTime)
		}
	}

	err := callJobFuncWithParams(j.beforeJobRunsSkipIfBeforeFuncErrors, j.id, j.name)
	if err != nil {
		e.sendOutForRescheduling(&jIn)
		select {
		case e.jobsOutCompleted <- j.id:
		case <-e.ctx.Done():
		}
		// Notify job failed (before the actual run)
		if e.scheduler != nil {
			e.scheduler.notifyJobFailed(e.scheduler.jobFromInternalJob(j), err)
		}
		return
	}

	// Notify job running
	if e.scheduler != nil {
		e.scheduler.notifyJobRunning(e.scheduler.jobFromInternalJob(j))
	}

	// For intervalFromCompletion, we need to reschedule AFTER the job completes,
	// not before. For regular jobs, we reschedule before execution (existing behavior).
	if !j.intervalFromCompletion {
		e.sendOutForRescheduling(&jIn)
		select {
		case e.jobsOutCompleted <- j.id:
		case <-e.ctx.Done():
		}
	}

	startTime := time.Now()

	if j.afterJobRunsWithPanic != nil {
		err = e.callJobWithRecover(j)
	} else {
		err = callJobFuncWithParams(j.function, j.parameters...)
	}

	e.recordJobTiming(startTime, time.Now(), j)
	if err != nil {
		_ = callJobFuncWithParams(j.afterJobRunsWithError, j.id, j.name, err)
		e.incrementJobCounter(j, Fail)
		endTime := time.Now()
		e.recordJobTimingWithStatus(startTime, endTime, j, Fail, err)
		// Notify job failed
		if e.scheduler != nil {
			jobObj := e.scheduler.jobFromInternalJob(j)
			e.scheduler.notifyJobFailed(jobObj, err)
			e.scheduler.notifyJobExecutionTime(jobObj, endTime.Sub(startTime))
		}
	} else {
		_ = callJobFuncWithParams(j.afterJobRuns, j.id, j.name)
		e.incrementJobCounter(j, Success)
		endTime := time.Now()
		e.recordJobTimingWithStatus(startTime, endTime, j, Success, nil)
		// Notify job completed
		if e.scheduler != nil {
			jobObj := e.scheduler.jobFromInternalJob(j)
			e.scheduler.notifyJobCompleted(jobObj)
			e.scheduler.notifyJobExecutionTime(jobObj, endTime.Sub(startTime))
		}
	}

	// For intervalFromCompletion, reschedule AFTER the job completes
	if j.intervalFromCompletion {
		select {
		case e.jobsOutCompleted <- j.id:
		case <-e.ctx.Done():
		}
		e.sendOutForRescheduling(&jIn)
	}
}
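
// Hook ordering in runJob, summarized from the code above: beforeJobRuns,
// then beforeJobRunsSkipIfBeforeFuncErrors (a non-nil error skips the run
// and reports a failure), then the job function itself, and finally
// afterJobRuns on success or afterJobRunsWithError on failure, with panics
// routed through callJobWithRecover when afterJobRunsWithPanic is set.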

func (e *executor) callJobWithRecover(j internalJob) (err error) {
	defer func() {
		if recoverData := recover(); recoverData != nil {
			_ = callJobFuncWithParams(j.afterJobRunsWithPanic, j.id, j.name, recoverData)

			// if a panic occurred, we should return an error
			err = fmt.Errorf("%w from %v", ErrPanicRecovered, recoverData)
		}
	}()

	return callJobFuncWithParams(j.function, j.parameters...)
}
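
// Because the recovered panic is wrapped with %w above, callers can detect
// it with errors.Is. A small sketch (assuming this package's exported
// ErrPanicRecovered sentinel):
//
//	if err := e.callJobWithRecover(j); errors.Is(err, ErrPanicRecovered) {
//		// the job panicked; err carries the recovered value
//	}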

func (e *executor) recordJobTiming(start time.Time, end time.Time, j internalJob) {
	if e.monitor != nil {
		e.monitor.RecordJobTiming(start, end, j.id, j.name, j.tags)
	}
}

func (e *executor) recordJobTimingWithStatus(start time.Time, end time.Time, j internalJob, status JobStatus, err error) {
	if e.monitorStatus != nil {
		e.monitorStatus.RecordJobTimingWithStatus(start, end, j.id, j.name, j.tags, status, err)
	}
}

func (e *executor) incrementJobCounter(j internalJob, status JobStatus) {
	if e.monitor != nil {
		e.monitor.IncrementJob(j.id, j.name, j.tags, status)
	}
}
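
// The calls above imply the shape of the Monitor interface; a minimal no-op
// sketch matching the call sites (signatures inferred from this file, so
// treat them as assumptions):
//
//	type noopMonitor struct{}
//
//	func (noopMonitor) IncrementJob(_ uuid.UUID, _ string, _ []string, _ JobStatus) {}
//	func (noopMonitor) RecordJobTiming(_, _ time.Time, _ uuid.UUID, _ string, _ []string) {}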

func (e *executor) stop(standardJobsWg, singletonJobsWg, limitModeJobsWg *waitGroupWithMutex) {
	e.stopOnce.Do(func() {
		e.logger.Debug("gocron: stopping executor")
		// we've been asked to stop. This is either because the scheduler has been told
		// to stop all jobs or the scheduler has been asked to completely shutdown.
		//
		// cancel tells all the functions to stop their work and send in a done response
		e.cancel()

		// the wait for job channels are used to report back whether we successfully waited
		// for all jobs to complete or if we hit the configured timeout.
		waitForJobs := make(chan struct{}, 1)
		waitForSingletons := make(chan struct{}, 1)
		waitForLimitMode := make(chan struct{}, 1)

		// the waiter context is used to cancel the functions waiting on jobs.
		// this is done to avoid goroutine leaks.
		waiterCtx, waiterCancel := context.WithCancel(context.Background())

		// wait for standard jobs to complete
		go func() {
			e.logger.Debug("gocron: waiting for standard jobs to complete")
			go func() {
				// this is done in a separate goroutine, so we aren't
				// blocked by the WaitGroup's Wait call in the event
				// that the waiter context is cancelled.
				// This particular goroutine could leak in the event that
				// some long-running standard job doesn't complete.
				standardJobsWg.Wait()
				e.logger.Debug("gocron: standard jobs completed")
				waitForJobs <- struct{}{}
			}()
			<-waiterCtx.Done()
		}()

		// wait for per-job singleton limit mode runner jobs to complete
		go func() {
			e.logger.Debug("gocron: waiting for singleton jobs to complete")
			go func() {
				singletonJobsWg.Wait()
				e.logger.Debug("gocron: singleton jobs completed")
				waitForSingletons <- struct{}{}
			}()
			<-waiterCtx.Done()
		}()

		// wait for limit mode runners to complete
		go func() {
			e.logger.Debug("gocron: waiting for limit mode jobs to complete")
			go func() {
				limitModeJobsWg.Wait()
				e.logger.Debug("gocron: limitMode jobs completed")
				waitForLimitMode <- struct{}{}
			}()
			<-waiterCtx.Done()
		}()

		// now either wait for all the jobs to complete,
		// or hit the timeout.
		var count int
		timeout := time.Now().Add(e.stopTimeout)
		for time.Now().Before(timeout) && count < 3 {
			select {
			case <-waitForJobs:
				count++
			case <-waitForSingletons:
				count++
			case <-waitForLimitMode:
				count++
			default:
			}
		}
		if count < 3 {
			e.done <- ErrStopJobsTimedOut
			e.logger.Debug("gocron: executor stopped - timed out")
		} else {
			e.done <- nil
			e.logger.Debug("gocron: executor stopped")
		}
		waiterCancel()

		if e.limitMode != nil {
			e.limitMode.started = false
		}
	})
}
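
// A hedged end-to-end sketch: in the public gocron v2 API, the stopTimeout
// used above is set with a scheduler option, and this stop path is driven by
// StopJobs or Shutdown on the scheduler.
//
//	s, _ := gocron.NewScheduler(gocron.WithStopTimeout(5 * time.Second))
//	// ... schedule and run jobs ...
//	_ = s.Shutdown() // waits up to the stop timeout; ErrStopJobsTimedOut on timeout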