Added Distributed Locker to JobOptions (#711)

This commit is contained in:
Giridharan Ramasamy 2024-04-23 00:22:27 +05:30 committed by GitHub
parent 3b653b99e4
commit 3faf525f98
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 75 additions and 15 deletions

View File

@ -120,6 +120,7 @@ other instances checking to see if a new leader needs to be elected.
(don't see what you need? request on slack to get a repo created to contribute it!)
- [**Locker**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithDistributedLocker):
A locker can be used to lock each run of a job to a single instance of gocron.
Locker can be at job or scheduler, if it is defined both at job and scheduler then locker of job will take precedence.
- Implementations: [go-co-op lockers](https://github.com/go-co-op?q=-lock&type=all&language=&sort=)
(don't see what you need? request on slack to get a repo created to contribute it!)

View File

@ -36,6 +36,7 @@ var (
ErrWithClockNil = fmt.Errorf("gocron: WithClock: clock must not be nil")
ErrWithDistributedElectorNil = fmt.Errorf("gocron: WithDistributedElector: elector must not be nil")
ErrWithDistributedLockerNil = fmt.Errorf("gocron: WithDistributedLocker: locker must not be nil")
ErrWithDistributedJobLockerNil = fmt.Errorf("gocron: WithDistributedJobLocker: locker must not be nil")
ErrWithLimitConcurrentJobsZero = fmt.Errorf("gocron: WithLimitConcurrentJobs: limit must be greater than 0")
ErrWithLocationNil = fmt.Errorf("gocron: WithLocation: location must not be nil")
ErrWithLoggerNil = fmt.Errorf("gocron: WithLogger: logger must not be nil")

View File

@ -335,6 +335,13 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
e.sendOutForRescheduling(&jIn)
return
}
} else if j.locker != nil {
lock, err := j.locker.Lock(j.ctx, j.name)
if err != nil {
e.sendOutForRescheduling(&jIn)
return
}
defer func() { _ = lock.Unlock(j.ctx) }()
} else if e.locker != nil {
lock, err := e.locker.Lock(j.ctx, j.name)
if err != nil {

15
job.go
View File

@ -42,6 +42,8 @@ type internalJob struct {
afterJobRuns func(jobID uuid.UUID, jobName string)
beforeJobRuns func(jobID uuid.UUID, jobName string)
afterJobRunsWithError func(jobID uuid.UUID, jobName string, err error)
locker Locker
}
// stop is used to stop the job's timer and cancel the context
@ -485,6 +487,19 @@ func OneTimeJob(startAt OneTimeJobStartAtOption) JobDefinition {
// JobOption defines the constructor for job options.
type JobOption func(*internalJob) error
// WithDistributedJobLocker sets the locker to be used by multiple
// Scheduler instances to ensure that only one instance of each
// job is run.
func WithDistributedJobLocker(locker Locker) JobOption {
return func(j *internalJob) error {
if locker == nil {
return ErrWithDistributedJobLockerNil
}
j.locker = locker
return nil
}
}
// WithEventListeners sets the event listeners that should be
// run for the job.
func WithEventListeners(eventListeners ...EventListener) JobOption {

View File

@ -728,6 +728,14 @@ func TestScheduler_NewJobErrors(t *testing.T) {
nil,
ErrOneTimeJobStartDateTimePast,
},
{
"WithDistributedJobLocker is nil",
DurationJob(
time.Second,
),
[]JobOption{WithDistributedJobLocker(nil)},
ErrWithDistributedJobLockerNil,
},
}
for _, tt := range tests {
@ -1199,17 +1207,19 @@ func TestScheduler_WithDistributed(t *testing.T) {
goleak.VerifyNone(t)
tests := []struct {
name string
count int
opt SchedulerOption
assertions func(*testing.T)
name string
count int
schedulerOpts []SchedulerOption
jobOpts []JobOption
assertions func(*testing.T)
}{
{
"3 schedulers with elector",
3,
WithDistributedElector(&testElector{
notLeader: notLeader,
}),
[]SchedulerOption{
WithDistributedElector(&testElector{notLeader: notLeader}),
},
nil,
func(t *testing.T) {
timeout := time.Now().Add(1 * time.Second)
var notLeaderCount int
@ -1229,9 +1239,32 @@ func TestScheduler_WithDistributed(t *testing.T) {
{
"3 schedulers with locker",
3,
WithDistributedLocker(&testLocker{
notLocked: notLocked,
}),
[]SchedulerOption{
WithDistributedLocker(&testLocker{notLocked: notLocked}),
},
nil,
func(t *testing.T) {
timeout := time.Now().Add(1 * time.Second)
var notLockedCount int
for {
if time.Now().After(timeout) {
break
}
select {
case <-notLocked:
notLockedCount++
default:
}
}
},
},
{
"3 schedulers and job with Distributed locker",
3,
nil,
[]JobOption{
WithDistributedJobLocker(&testLocker{notLocked: notLocked}),
},
func(t *testing.T) {
timeout := time.Now().Add(1 * time.Second)
var notLockedCount int
@ -1257,12 +1290,17 @@ func TestScheduler_WithDistributed(t *testing.T) {
for i := tt.count; i > 0; i-- {
s := newTestScheduler(t,
tt.opt,
tt.schedulerOpts...,
)
jobOpts := []JobOption{
WithStartAt(
WithStartImmediately(),
),
}
jobOpts = append(jobOpts, tt.jobOpts...)
go func() {
s.Start()
_, err := s.NewJob(
DurationJob(
time.Second,
@ -1273,9 +1311,7 @@ func TestScheduler_WithDistributed(t *testing.T) {
jobsRan <- struct{}{}
},
),
WithStartAt(
WithStartImmediately(),
),
jobOpts...,
)
require.NoError(t, err)