mirror of https://github.com/go-co-op/gocron.git
feat: allow disabling global distributed locker per job (#811)
* chore: fix distributed locker tests * feat: allow disabling global dist locker per job
This commit is contained in:
parent
bf751076f3
commit
f5a5a2d0bc
|
|
@ -370,7 +370,7 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
|
|||
e.incrementJobCounter(j, Skip)
|
||||
return
|
||||
}
|
||||
} else if j.locker != nil {
|
||||
} else if !j.disabledLocker && j.locker != nil {
|
||||
lock, err := j.locker.Lock(j.ctx, j.name)
|
||||
if err != nil {
|
||||
_ = callJobFuncWithParams(j.afterLockError, j.id, j.name, err)
|
||||
|
|
@ -379,7 +379,7 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
|
|||
return
|
||||
}
|
||||
defer func() { _ = lock.Unlock(j.ctx) }()
|
||||
} else if e.locker != nil {
|
||||
} else if !j.disabledLocker && e.locker != nil {
|
||||
lock, err := e.locker.Lock(j.ctx, j.name)
|
||||
if err != nil {
|
||||
_ = callJobFuncWithParams(j.afterLockError, j.id, j.name, err)
|
||||
|
|
|
|||
11
job.go
11
job.go
|
|
@ -45,6 +45,7 @@ type internalJob struct {
|
|||
afterJobRunsWithError func(jobID uuid.UUID, jobName string, err error)
|
||||
afterJobRunsWithPanic func(jobID uuid.UUID, jobName string, recoverData any)
|
||||
afterLockError func(jobID uuid.UUID, jobName string, err error)
|
||||
disabledLocker bool
|
||||
|
||||
locker Locker
|
||||
}
|
||||
|
|
@ -556,6 +557,16 @@ func WithDistributedJobLocker(locker Locker) JobOption {
|
|||
}
|
||||
}
|
||||
|
||||
// WithDisabledDistributedJobLocker disables the distributed job locker.
|
||||
// This is useful when a global distributed locker has been set on the scheduler
|
||||
// level using WithDistributedLocker and need to be disabled for specific jobs.
|
||||
func WithDisabledDistributedJobLocker(disabled bool) JobOption {
|
||||
return func(j *internalJob, _ time.Time) error {
|
||||
j.disabledLocker = disabled
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithEventListeners sets the event listeners that should be
|
||||
// run for the job.
|
||||
func WithEventListeners(eventListeners ...EventListener) JobOption {
|
||||
|
|
|
|||
|
|
@ -818,6 +818,8 @@ func WithDistributedElector(elector Elector) SchedulerOption {
|
|||
// WithDistributedLocker sets the locker to be used by multiple
|
||||
// Scheduler instances to ensure that only one instance of each
|
||||
// job is run.
|
||||
// To disable this global locker for specific jobs, see
|
||||
// WithDisabledDistributedJobLocker.
|
||||
func WithDistributedLocker(locker Locker) SchedulerOption {
|
||||
return func(s *scheduler) error {
|
||||
if locker == nil {
|
||||
|
|
|
|||
|
|
@ -1452,6 +1452,7 @@ func TestScheduler_WithDistributed(t *testing.T) {
|
|||
tests := []struct {
|
||||
name string
|
||||
count int
|
||||
runCount int
|
||||
schedulerOpts []SchedulerOption
|
||||
jobOpts []JobOption
|
||||
assertions func(*testing.T)
|
||||
|
|
@ -1459,6 +1460,7 @@ func TestScheduler_WithDistributed(t *testing.T) {
|
|||
{
|
||||
"3 schedulers with elector",
|
||||
3,
|
||||
1,
|
||||
[]SchedulerOption{
|
||||
WithDistributedElector(&testElector{notLeader: notLeader}),
|
||||
},
|
||||
|
|
@ -1482,6 +1484,7 @@ func TestScheduler_WithDistributed(t *testing.T) {
|
|||
{
|
||||
"3 schedulers with locker",
|
||||
3,
|
||||
1,
|
||||
[]SchedulerOption{
|
||||
WithDistributedLocker(&testLocker{notLocked: notLocked}),
|
||||
},
|
||||
|
|
@ -1499,11 +1502,14 @@ func TestScheduler_WithDistributed(t *testing.T) {
|
|||
default:
|
||||
}
|
||||
}
|
||||
|
||||
assert.Equal(t, 2, notLockedCount)
|
||||
},
|
||||
},
|
||||
{
|
||||
"3 schedulers and job with Distributed locker",
|
||||
3,
|
||||
1,
|
||||
nil,
|
||||
[]JobOption{
|
||||
WithDistributedJobLocker(&testLocker{notLocked: notLocked}),
|
||||
|
|
@ -1521,6 +1527,35 @@ func TestScheduler_WithDistributed(t *testing.T) {
|
|||
default:
|
||||
}
|
||||
}
|
||||
|
||||
assert.Equal(t, 2, notLockedCount)
|
||||
},
|
||||
},
|
||||
{
|
||||
"3 schedulers and job with disabled Distributed locker",
|
||||
3,
|
||||
3,
|
||||
[]SchedulerOption{
|
||||
WithDistributedLocker(&testLocker{notLocked: notLocked}),
|
||||
},
|
||||
[]JobOption{
|
||||
WithDisabledDistributedJobLocker(true),
|
||||
},
|
||||
func(_ *testing.T) {
|
||||
timeout := time.Now().Add(1 * time.Second)
|
||||
var notLockedCount int
|
||||
for {
|
||||
if time.Now().After(timeout) {
|
||||
break
|
||||
}
|
||||
select {
|
||||
case <-notLocked:
|
||||
notLockedCount++
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
assert.Equal(t, 0, notLockedCount)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
@ -1531,6 +1566,11 @@ func TestScheduler_WithDistributed(t *testing.T) {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
schedulersDone := make(chan struct{}, tt.count)
|
||||
|
||||
var (
|
||||
runCount int
|
||||
doneCount int
|
||||
)
|
||||
|
||||
for i := tt.count; i > 0; i-- {
|
||||
s := newTestScheduler(t,
|
||||
tt.schedulerOpts...,
|
||||
|
|
@ -1539,6 +1579,7 @@ func TestScheduler_WithDistributed(t *testing.T) {
|
|||
WithStartAt(
|
||||
WithStartImmediately(),
|
||||
),
|
||||
WithLimitedRuns(1),
|
||||
}
|
||||
jobOpts = append(jobOpts, tt.jobOpts...)
|
||||
|
||||
|
|
@ -1565,31 +1606,39 @@ func TestScheduler_WithDistributed(t *testing.T) {
|
|||
}()
|
||||
}
|
||||
|
||||
var runCount int
|
||||
select {
|
||||
case <-jobsRan:
|
||||
cancel()
|
||||
runCount++
|
||||
case <-time.After(time.Second):
|
||||
cancel()
|
||||
t.Error("timed out waiting for job to run")
|
||||
RunCountLoop:
|
||||
for {
|
||||
select {
|
||||
case <-jobsRan:
|
||||
runCount++
|
||||
if runCount >= tt.runCount {
|
||||
break RunCountLoop
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Error("timed out waiting for job to run")
|
||||
break RunCountLoop
|
||||
}
|
||||
}
|
||||
|
||||
var doneCount int
|
||||
timeout := time.Now().Add(3 * time.Second)
|
||||
for doneCount < tt.count && time.Now().After(timeout) {
|
||||
cancel()
|
||||
assert.Equal(t, tt.runCount, runCount)
|
||||
|
||||
DoneCountLoop:
|
||||
for {
|
||||
select {
|
||||
case <-schedulersDone:
|
||||
doneCount++
|
||||
default:
|
||||
if doneCount >= tt.count {
|
||||
break DoneCountLoop
|
||||
}
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Error("timed out waiting for schedulers to shutdown")
|
||||
break DoneCountLoop
|
||||
}
|
||||
}
|
||||
close(jobsRan)
|
||||
for range jobsRan {
|
||||
runCount++
|
||||
}
|
||||
|
||||
assert.Equal(t, 1, runCount)
|
||||
assert.Equal(t, tt.count, doneCount)
|
||||
|
||||
time.Sleep(time.Second)
|
||||
tt.assertions(t)
|
||||
})
|
||||
|
|
|
|||
Loading…
Reference in New Issue