apocelipes 2025-02-26 23:28:43 +08:00 committed by GitHub
parent e1b7d52ebc
commit 63f3701d57
2 changed files with 77 additions and 73 deletions

@@ -36,6 +36,8 @@ type executor struct {
 	// used by the executor to receive a stop signal from the scheduler
 	stopCh chan struct{}
+	// ensure that stop runs before the next call to start and only runs once
+	stopOnce *sync.Once
 	// the timeout value when stopping
 	stopTimeout time.Duration
 	// used to signal that the executor has completed shutdown
 	done chan error
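The new stopOnce field leans on the sync.Once guarantee: Do runs the supplied function at most once, and concurrent callers block until that first run returns, which is exactly what a stop path reachable from several goroutines needs. A minimal standalone sketch of the pattern (illustrative only, not gocron code):

package main

import (
	"fmt"
	"sync"
)

func main() {
	once := &sync.Once{}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// only one caller executes the function; the other
			// callers block until it finishes, then return
			once.Do(func() { fmt.Println("stopping") })
		}()
	}
	wg.Wait() // "stopping" is printed exactly once
}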
@@ -88,6 +90,7 @@ func (e *executor) start() {
 	// any other uses within the executor should create a context
 	// using the executor context as parent.
 	e.ctx, e.cancel = context.WithCancel(context.Background())
+	e.stopOnce = &sync.Once{}
 
 	// the standardJobsWg tracks
 	standardJobsWg := &waitGroupWithMutex{}
@@ -131,7 +134,7 @@ func (e *executor) start() {
 			// spin off into a goroutine to unblock the executor and
 			// allow for processing for more work
-			go func() {
+			go func(executorCtx context.Context) {
 				// make sure to cancel the above context per the docs
 				// // Canceling this context releases resources associated with it, so code should
 				// // call cancel as soon as the operations running in this Context complete.
@@ -211,8 +214,7 @@ func (e *executor) start() {
 							}
 						} else {
 							select {
-							case <-e.stopCh:
-								e.stop(standardJobsWg, singletonJobsWg, limitModeJobsWg)
+							case <-executorCtx.Done():
 								return
 							default:
 							}
@@ -228,7 +230,7 @@ func (e *executor) start() {
 						}(*j)
 					}
 				}
-			}()
+			}(e.ctx)
 		case <-e.stopCh:
 			e.stop(standardJobsWg, singletonJobsWg, limitModeJobsWg)
 			return
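Two things change here. First, the worker goroutine now receives the executor context as an argument (go func(executorCtx context.Context) { ... }(e.ctx)), so it keeps watching the context of the cycle that spawned it even if a later call to start() reassigns e.ctx. Second, workers wait on executorCtx.Done() instead of receiving from e.stopCh: a receive on a plain channel consumes the signal, so only one goroutine wins it, and the old code then had that worker call e.stop itself, racing the scheduler's own stop path. A cancelled context, by contrast, unblocks every waiter. A hedged sketch of that fan-out property (names are illustrative):

package main

import (
	"context"
	"fmt"
	"sync"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// Done() returns a channel that is closed on cancel,
			// so every waiter observes the signal, not just one
			<-ctx.Done()
			fmt.Printf("worker %d observed cancellation\n", id)
		}(i)
	}
	cancel()
	wg.Wait()
}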
@@ -473,86 +475,88 @@ func (e *executor) incrementJobCounter(j internalJob, status JobStatus) {
 }
 
 func (e *executor) stop(standardJobsWg, singletonJobsWg, limitModeJobsWg *waitGroupWithMutex) {
+	e.stopOnce.Do(func() {
 		e.logger.Debug("gocron: stopping executor")
 		// we've been asked to stop. This is either because the scheduler has been told
 		// to stop all jobs or the scheduler has been asked to completely shutdown.
 		//
 		// cancel tells all the functions to stop their work and send in a done response
 		e.cancel()
 
 		// the wait for job channels are used to report back whether we successfully waited
 		// for all jobs to complete or if we hit the configured timeout.
 		waitForJobs := make(chan struct{}, 1)
 		waitForSingletons := make(chan struct{}, 1)
 		waitForLimitMode := make(chan struct{}, 1)
 
 		// the waiter context is used to cancel the functions waiting on jobs.
 		// this is done to avoid goroutine leaks.
 		waiterCtx, waiterCancel := context.WithCancel(context.Background())
 
 		// wait for standard jobs to complete
 		go func() {
 			e.logger.Debug("gocron: waiting for standard jobs to complete")
 			go func() {
 				// this is done in a separate goroutine, so we aren't
 				// blocked by the WaitGroup's Wait call in the event
 				// that the waiter context is cancelled.
 				// This particular goroutine could leak in the event that
 				// some long-running standard job doesn't complete.
 				standardJobsWg.Wait()
 				e.logger.Debug("gocron: standard jobs completed")
 				waitForJobs <- struct{}{}
 			}()
 			<-waiterCtx.Done()
 		}()
 
 		// wait for per job singleton limit mode runner jobs to complete
 		go func() {
 			e.logger.Debug("gocron: waiting for singleton jobs to complete")
 			go func() {
 				singletonJobsWg.Wait()
 				e.logger.Debug("gocron: singleton jobs completed")
 				waitForSingletons <- struct{}{}
 			}()
 			<-waiterCtx.Done()
 		}()
 
 		// wait for limit mode runners to complete
 		go func() {
 			e.logger.Debug("gocron: waiting for limit mode jobs to complete")
 			go func() {
 				limitModeJobsWg.Wait()
 				e.logger.Debug("gocron: limitMode jobs completed")
 				waitForLimitMode <- struct{}{}
 			}()
 			<-waiterCtx.Done()
 		}()
 
 		// now either wait for all the jobs to complete,
 		// or hit the timeout.
 		var count int
 		timeout := time.Now().Add(e.stopTimeout)
 		for time.Now().Before(timeout) && count < 3 {
 			select {
 			case <-waitForJobs:
 				count++
 			case <-waitForSingletons:
 				count++
 			case <-waitForLimitMode:
 				count++
 			default:
 			}
 		}
 		if count < 3 {
 			e.done <- ErrStopJobsTimedOut
 			e.logger.Debug("gocron: executor stopped - timed out")
 		} else {
 			e.done <- nil
 			e.logger.Debug("gocron: executor stopped")
 		}
 		waiterCancel()
 
 		if e.limitMode != nil {
 			e.limitMode.started = false
 		}
+	})
 }
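Wrapping the whole body of stop in stopOnce.Do collapses concurrent or repeated stop calls into a single execution, so e.cancel() and the send on e.done happen once per start/stop cycle; and because start() allocates a fresh sync.Once, a restarted executor can be stopped again. A rough sketch of that reset-per-start shape (the field and method names mirror the diff, everything else is illustrative):

package main

import (
	"fmt"
	"sync"
)

type executor struct {
	stopOnce *sync.Once
}

func (e *executor) start() {
	// a fresh Once per cycle lets stop run once per start
	e.stopOnce = &sync.Once{}
	fmt.Println("started")
}

func (e *executor) stop() {
	e.stopOnce.Do(func() { fmt.Println("stopped") })
}

func main() {
	e := &executor{}
	e.start()
	e.stop()
	e.stop() // no-op: this cycle has already stopped
	e.start()
	e.stop() // runs again after the restart
}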

@@ -141,7 +141,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
 		jobUpdateNextRuns: make(chan uuid.UUID),
 		jobsOutCompleted:  make(chan uuid.UUID),
 		jobOutRequest:     make(chan jobOutRequest, 1000),
-		done:              make(chan error),
+		done:              make(chan error, 1),
 	}
 
 	s := &scheduler{
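Buffering done with capacity 1 makes the executor's final send non-blocking: if the scheduler is not currently receiving, the single result parks in the buffer instead of wedging the stopping goroutine. A quick sketch of the difference:

package main

import "fmt"

func main() {
	// unbuffered: done <- nil blocks until a receiver is ready,
	// which is how an unguarded second stop could hang forever
	// done := make(chan error)

	// capacity 1: exactly one send always succeeds immediately
	done := make(chan error, 1)
	done <- nil
	fmt.Println("result:", <-done)
}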