From 5814fbcb6f6d98ff16cea0e14cecc1c9fb1ec0d9 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Tue, 28 Nov 2023 06:48:22 -0600 Subject: [PATCH] allow max concurrent runs and singleton mode together (#625) --- executor.go | 254 ++++++++++++++++++++++++++++------------------ job.go | 4 - scheduler.go | 21 ++-- scheduler_test.go | 87 ++++++++++++++++ 4 files changed, 251 insertions(+), 115 deletions(-) diff --git a/executor.go b/executor.go index a3331d8..b562a28 100644 --- a/executor.go +++ b/executor.go @@ -36,10 +36,16 @@ type limitModeConfig struct { limit uint rescheduleLimiter chan struct{} in chan uuid.UUID + // singletonJobs is used to track singleton jobs that are running + // in the limit mode runner. This is used to prevent the same job + // from running multiple times across limit mode runners when both + // a limit mode and singleton mode are enabled. + singletonJobs map[uuid.UUID]struct{} + singletonJobsMu sync.Mutex } func (e *executor) start() { - e.logger.Debug("executor started") + e.logger.Debug("gocron: executor started") // creating the executor's context here as the executor // is the only goroutine that should access this context @@ -48,20 +54,11 @@ func (e *executor) start() { e.ctx, e.cancel = context.WithCancel(context.Background()) // the standardJobsWg tracks - standardJobsWg := &waitGroupWithMutex{ - wg: sync.WaitGroup{}, - mu: sync.Mutex{}, - } + standardJobsWg := &waitGroupWithMutex{} - singletonJobsWg := &waitGroupWithMutex{ - wg: sync.WaitGroup{}, - mu: sync.Mutex{}, - } + singletonJobsWg := &waitGroupWithMutex{} - limitModeJobsWg := &waitGroupWithMutex{ - wg: sync.WaitGroup{}, - mu: sync.Mutex{}, - } + limitModeJobsWg := &waitGroupWithMutex{} // create a fresh map for tracking singleton runners e.singletonRunners = make(map[uuid.UUID]singletonRunner) @@ -141,7 +138,6 @@ func (e *executor) start() { j := requestJobCtx(ctx, id, e.jobOutRequest) if j == nil { // safety check as it'd be strange bug if this occurred - // TODO add a log line here return } if j.singletonMode { @@ -155,7 +151,7 @@ func (e *executor) start() { } e.singletonRunners[id] = runner singletonJobsWg.Add(1) - go e.limitModeRunner("singleton-"+id.String(), runner.in, singletonJobsWg, j.singletonLimitMode, runner.rescheduleLimiter) + go e.singletonModeRunner("singleton-"+id.String(), runner.in, singletonJobsWg, j.singletonLimitMode, runner.rescheduleLimiter) } if j.singletonLimitMode == LimitModeReschedule { @@ -204,95 +200,14 @@ func (e *executor) start() { } } -func (e *executor) stop(standardJobsWg, singletonJobsWg, limitModeJobsWg *waitGroupWithMutex) { - e.logger.Debug("stopping executor") - // we've been asked to stop. This is either because the scheduler has been told - // to stop all jobs or the scheduler has been asked to completely shutdown. - // - // cancel tells all the functions to stop their work and send in a done response - e.cancel() - - // the wait for job channels are used to report back whether we successfully waited - // for all jobs to complete or if we hit the configured timeout. - waitForJobs := make(chan struct{}, 1) - waitForSingletons := make(chan struct{}, 1) - waitForLimitMode := make(chan struct{}, 1) - - // the waiter context is used to cancel the functions waiting on jobs. - // this is done to avoid goroutine leaks. 
- waiterCtx, waiterCancel := context.WithCancel(context.Background()) - - // wait for standard jobs to complete - go func() { - e.logger.Debug("waiting for standard jobs to complete") - go func() { - // this is done in a separate goroutine, so we aren't - // blocked by the WaitGroup's Wait call in the event - // that the waiter context is cancelled. - // This particular goroutine could leak in the event that - // some long-running standard job doesn't complete. - standardJobsWg.Wait() - e.logger.Debug("standard jobs completed") - waitForJobs <- struct{}{} - }() - <-waiterCtx.Done() - }() - - // wait for per job singleton limit mode runner jobs to complete - go func() { - e.logger.Debug("waiting for singleton jobs to complete") - go func() { - singletonJobsWg.Wait() - e.logger.Debug("singleton jobs completed") - waitForSingletons <- struct{}{} - }() - <-waiterCtx.Done() - }() - - // wait for limit mode runners to complete - go func() { - e.logger.Debug("waiting for limit mode jobs to complete") - go func() { - limitModeJobsWg.Wait() - e.logger.Debug("limitMode jobs completed") - waitForLimitMode <- struct{}{} - }() - <-waiterCtx.Done() - }() - - // now either wait for all the jobs to complete, - // or hit the timeout. - var count int - timeout := time.Now().Add(e.stopTimeout) - for time.Now().Before(timeout) && count < 3 { - select { - case <-waitForJobs: - count++ - case <-waitForSingletons: - count++ - case <-waitForLimitMode: - count++ - default: - } - } - if count < 3 { - e.done <- ErrStopJobsTimedOut - e.logger.Debug("executor stopped - timed out") - } else { - e.done <- nil - e.logger.Debug("executor stopped") - } - waiterCancel() -} - func (e *executor) limitModeRunner(name string, in chan uuid.UUID, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) { - e.logger.Debug("limitModeRunner starting", "name", name) + e.logger.Debug("gocron: limitModeRunner starting", "name", name) for { select { case id := <-in: select { case <-e.ctx.Done(): - e.logger.Debug("limitModeRunner shutting down", "name", name) + e.logger.Debug("gocron: limitModeRunner shutting down", "name", name) wg.Done() return default: @@ -300,10 +215,70 @@ func (e *executor) limitModeRunner(name string, in chan uuid.UUID, wg *waitGroup ctx, cancel := context.WithCancel(e.ctx) j := requestJobCtx(ctx, id, e.jobOutRequest) + cancel() + if j != nil { + if j.singletonMode { + e.limitMode.singletonJobsMu.Lock() + _, ok := e.limitMode.singletonJobs[id] + if ok { + // this job is already running, so don't run it + // but instead reschedule it + e.limitMode.singletonJobsMu.Unlock() + select { + case <-e.ctx.Done(): + return + case <-j.ctx.Done(): + return + case e.jobIDsOut <- j.id: + } + continue + } + e.limitMode.singletonJobs[id] = struct{}{} + e.limitMode.singletonJobsMu.Unlock() + } + e.runJob(*j) + + if j.singletonMode { + e.limitMode.singletonJobsMu.Lock() + delete(e.limitMode.singletonJobs, id) + e.limitMode.singletonJobsMu.Unlock() + } + } + + // remove the limiter block to allow another job to be scheduled + if limitMode == LimitModeReschedule { + select { + case <-rescheduleLimiter: + default: + } + } + case <-e.ctx.Done(): + e.logger.Debug("limitModeRunner shutting down", "name", name) + wg.Done() + return + } + } +} + +func (e *executor) singletonModeRunner(name string, in chan uuid.UUID, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) { + e.logger.Debug("gocron: limitModeRunner starting", "name", name) + for { + select { + case id := <-in: + select { + case 
<-e.ctx.Done(): + e.logger.Debug("gocron: limitModeRunner shutting down", "name", name) + wg.Done() + return + default: + } + + ctx, cancel := context.WithCancel(e.ctx) + j := requestJobCtx(ctx, id, e.jobOutRequest) + cancel() if j != nil { e.runJob(*j) } - cancel() // remove the limiter block to allow another job to be scheduled if limitMode == LimitModeReschedule { @@ -360,3 +335,84 @@ func (e *executor) runJob(j internalJob) { _ = callJobFuncWithParams(j.afterJobRuns, j.id, j.name) } } + +func (e *executor) stop(standardJobsWg, singletonJobsWg, limitModeJobsWg *waitGroupWithMutex) { + e.logger.Debug("gocron: stopping executor") + // we've been asked to stop. This is either because the scheduler has been told + // to stop all jobs or the scheduler has been asked to completely shutdown. + // + // cancel tells all the functions to stop their work and send in a done response + e.cancel() + + // the wait for job channels are used to report back whether we successfully waited + // for all jobs to complete or if we hit the configured timeout. + waitForJobs := make(chan struct{}, 1) + waitForSingletons := make(chan struct{}, 1) + waitForLimitMode := make(chan struct{}, 1) + + // the waiter context is used to cancel the functions waiting on jobs. + // this is done to avoid goroutine leaks. + waiterCtx, waiterCancel := context.WithCancel(context.Background()) + + // wait for standard jobs to complete + go func() { + e.logger.Debug("gocron: waiting for standard jobs to complete") + go func() { + // this is done in a separate goroutine, so we aren't + // blocked by the WaitGroup's Wait call in the event + // that the waiter context is cancelled. + // This particular goroutine could leak in the event that + // some long-running standard job doesn't complete. + standardJobsWg.Wait() + e.logger.Debug("gocron: standard jobs completed") + waitForJobs <- struct{}{} + }() + <-waiterCtx.Done() + }() + + // wait for per job singleton limit mode runner jobs to complete + go func() { + e.logger.Debug("gocron: waiting for singleton jobs to complete") + go func() { + singletonJobsWg.Wait() + e.logger.Debug("gocron: singleton jobs completed") + waitForSingletons <- struct{}{} + }() + <-waiterCtx.Done() + }() + + // wait for limit mode runners to complete + go func() { + e.logger.Debug("gocron: waiting for limit mode jobs to complete") + go func() { + limitModeJobsWg.Wait() + e.logger.Debug("gocron: limitMode jobs completed") + waitForLimitMode <- struct{}{} + }() + <-waiterCtx.Done() + }() + + // now either wait for all the jobs to complete, + // or hit the timeout. + var count int + timeout := time.Now().Add(e.stopTimeout) + for time.Now().Before(timeout) && count < 3 { + select { + case <-waitForJobs: + count++ + case <-waitForSingletons: + count++ + case <-waitForLimitMode: + count++ + default: + } + } + if count < 3 { + e.done <- ErrStopJobsTimedOut + e.logger.Debug("gocron: executor stopped - timed out") + } else { + e.done <- nil + e.logger.Debug("gocron: executor stopped") + } + waiterCancel() +} diff --git a/job.go b/job.go index e6ed059..020f60e 100644 --- a/job.go +++ b/job.go @@ -475,10 +475,6 @@ func WithName(name string) JobOption { // WithSingletonMode keeps the job from running again if it is already running. // This is useful for jobs that should not overlap, and that occasionally // (but not consistently) run longer than the interval between job runs. -// -// Note - this is mutually exclusive with WithLimitConcurrentJobs. If both -// are set, WithLimitConcurrentJobs will take precedence. 
-// WithSingletonMode effectively sets a per-job limit of 1 concurrent job. func WithSingletonMode(mode LimitMode) JobOption { return func(j *internalJob) error { j.singletonMode = true diff --git a/scheduler.go b/scheduler.go index 9b46fbf..a9946f8 100644 --- a/scheduler.go +++ b/scheduler.go @@ -111,7 +111,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { } go func() { - s.logger.Info("new scheduler created") + s.logger.Info("gocron: new scheduler created") for { select { case id := <-s.exec.jobIDsOut: @@ -164,7 +164,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { // about jobs. func (s *scheduler) stopScheduler() { - s.logger.Debug("stopping scheduler") + s.logger.Debug("gocron: stopping scheduler") if s.started { s.exec.stopCh <- struct{}{} } @@ -188,7 +188,7 @@ func (s *scheduler) stopScheduler() { } s.stopErrCh <- err s.started = false - s.logger.Debug("scheduler stopped") + s.logger.Debug("gocron: scheduler stopped") } func (s *scheduler) selectAllJobsOutRequest(out allJobsOutRequest) { @@ -294,7 +294,7 @@ func (s *scheduler) selectRemoveJobsByTags(tags []string) { } func (s *scheduler) selectStart() { - s.logger.Debug("scheduler starting") + s.logger.Debug("gocron: scheduler starting") go s.exec.start() s.started = true @@ -325,7 +325,7 @@ func (s *scheduler) selectStart() { select { case <-s.shutdownCtx.Done(): case s.startedCh <- struct{}{}: - s.logger.Info("scheduler started") + s.logger.Info("gocron: scheduler started") } } @@ -605,19 +605,16 @@ const ( // WithLimitConcurrentJobs sets the limit and mode to be used by the // Scheduler for limiting the number of jobs that may be running at // a given time. -// -// Note - this is mutually exclusive with WithSingletonMode. If both -// are set, WithLimitConcurrentJobs will take precedence. -// WithSingletonMode effectively sets a per-job limit of 1 concurrent job. 
func WithLimitConcurrentJobs(limit uint, mode LimitMode) SchedulerOption { return func(s *scheduler) error { if limit == 0 { return ErrWithLimitConcurrentJobsZero } s.exec.limitMode = &limitModeConfig{ - mode: mode, - limit: limit, - in: make(chan uuid.UUID, 1000), + mode: mode, + limit: limit, + in: make(chan uuid.UUID, 1000), + singletonJobs: make(map[uuid.UUID]struct{}), } if mode == LimitModeReschedule { s.exec.limitMode.rescheduleLimiter = make(chan struct{}, limit) diff --git a/scheduler_test.go b/scheduler_test.go index 4dd2821..9b8231e 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -988,6 +988,93 @@ func TestScheduler_LimitMode(t *testing.T) { } } +func TestScheduler_LimitModeAndSingleton(t *testing.T) { + goleak.VerifyNone(t) + tests := []struct { + name string + numJobs int + limit uint + limitMode LimitMode + singletonMode LimitMode + duration time.Duration + expectedMin time.Duration + expectedMax time.Duration + }{ + { + "limit mode reschedule", + 10, + 2, + LimitModeReschedule, + LimitModeReschedule, + time.Millisecond * 100, + time.Millisecond * 400, + time.Millisecond * 700, + }, + { + "limit mode wait", + 10, + 2, + LimitModeWait, + LimitModeWait, + time.Millisecond * 100, + time.Millisecond * 200, + time.Millisecond * 500, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, err := newTestScheduler( + WithLimitConcurrentJobs(tt.limit, tt.limitMode), + WithStopTimeout(2*time.Second), + ) + require.NoError(t, err) + + jobRanCh := make(chan int, 20) + + for i := 0; i < tt.numJobs; i++ { + jobNum := i + _, err = s.NewJob( + DurationJob(tt.duration), + NewTask(func() { + time.Sleep(tt.duration / 2) + jobRanCh <- jobNum + }), + WithSingletonMode(tt.singletonMode), + ) + require.NoError(t, err) + } + + start := time.Now() + s.Start() + + jobsRan := make(map[int]int) + var runCount int + for runCount < tt.numJobs { + select { + case jobNum := <-jobRanCh: + runCount++ + jobsRan[jobNum]++ + case <-time.After(time.Second): + t.Fatalf("timed out waiting for jobs to run") + } + } + stop := time.Now() + require.NoError(t, s.Shutdown()) + + assert.GreaterOrEqual(t, stop.Sub(start), tt.expectedMin) + assert.LessOrEqual(t, stop.Sub(start), tt.expectedMax) + for _, count := range jobsRan { + if tt.singletonMode == LimitModeWait { + assert.Equal(t, 1, count) + } else { + assert.LessOrEqual(t, count, 5) + } + } + }) + } +} + var _ Elector = (*testElector)(nil) type testElector struct {
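
A minimal usage sketch of what this patch enables: a scheduler-wide WithLimitConcurrentJobs limit combined with per-job WithSingletonMode, the pairing exercised by the new TestScheduler_LimitModeAndSingleton. The import path assumes the go-co-op/gocron v2 module; the interval and sleep durations are illustrative only.

package main

import (
	"fmt"
	"time"

	"github.com/go-co-op/gocron/v2"
)

func main() {
	// Scheduler-wide limit: at most 2 jobs may be running at any time.
	s, err := gocron.NewScheduler(
		gocron.WithLimitConcurrentJobs(2, gocron.LimitModeReschedule),
	)
	if err != nil {
		panic(err)
	}

	// Per-job singleton mode: this job never overlaps with itself, even
	// while it competes with other jobs for the shared limit of 2.
	_, err = s.NewJob(
		gocron.DurationJob(100*time.Millisecond),
		gocron.NewTask(func() {
			time.Sleep(250 * time.Millisecond) // deliberately longer than the interval
			fmt.Println("job ran")
		}),
		gocron.WithSingletonMode(gocron.LimitModeReschedule),
	)
	if err != nil {
		panic(err)
	}

	s.Start()
	time.Sleep(time.Second)
	_ = s.Shutdown()
}

With LimitModeReschedule on both options, a tick for a singleton job that is still in flight is skipped and rescheduled rather than run concurrently (the new singletonJobs map in limitModeConfig tracks in-flight singleton jobs across the limit mode runners), while the scheduler-wide limit caps how many jobs run at once.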