diff --git a/executor.go b/executor.go index b8c7e34..109805e 100644 --- a/executor.go +++ b/executor.go @@ -19,7 +19,7 @@ type executor struct { jobOutRequest chan jobOutRequest stopTimeout time.Duration done chan error - singletonRunners map[uuid.UUID]singletonRunner + singletonRunners *sync.Map // map[uuid.UUID]singletonRunner limitMode *limitModeConfig elector Elector locker Locker @@ -67,7 +67,7 @@ func (e *executor) start() { limitModeJobsWg := &waitGroupWithMutex{} // create a fresh map for tracking singleton runners - e.singletonRunners = make(map[uuid.UUID]singletonRunner) + e.singletonRunners = &sync.Map{} // start the for leap that is the executor // selecting on channels for work to do @@ -151,15 +151,18 @@ func (e *executor) start() { if j.singletonMode { // for singleton mode, get the existing runner for the job // or spin up a new one - runner, ok := e.singletonRunners[jIn.id] + runner := &singletonRunner{} + runnerSrc, ok := e.singletonRunners.Load(jIn.id) if !ok { runner.in = make(chan jobIn, 1000) if j.singletonLimitMode == LimitModeReschedule { runner.rescheduleLimiter = make(chan struct{}, 1) } - e.singletonRunners[jIn.id] = runner + e.singletonRunners.Store(jIn.id, runner) singletonJobsWg.Add(1) go e.singletonModeRunner("singleton-"+jIn.id.String(), runner.in, singletonJobsWg, j.singletonLimitMode, runner.rescheduleLimiter) + } else { + runner = runnerSrc.(*singletonRunner) } if j.singletonLimitMode == LimitModeReschedule { diff --git a/scheduler.go b/scheduler.go index 503a0d8..c2bc53d 100644 --- a/scheduler.go +++ b/scheduler.go @@ -106,7 +106,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { exec := executor{ stopCh: make(chan struct{}), stopTimeout: time.Second * 10, - singletonRunners: make(map[uuid.UUID]singletonRunner), + singletonRunners: nil, logger: &noOpLogger{}, jobsIn: make(chan jobIn),