From ebec5e9f913890ddcf2ab465cb4e6946f6c01077 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Tue, 12 Mar 2024 08:37:11 -0500 Subject: [PATCH] elector & locker were failing to send out when not leader (#688) * elector & locker were failing to send out when not leader * update test to confirm non-active elector/locker are checked * clean up data race * try to make test more reliable --- README.md | 5 +++ example_test.go | 16 ++++----- examples/elector/main.go | 73 ++++++++++++++++++++++++++++++++++++++++ executor.go | 20 ++++------- scheduler_test.go | 54 ++++++++++++++++++++++++++--- 5 files changed, 142 insertions(+), 26 deletions(-) create mode 100644 examples/elector/main.go diff --git a/README.md b/README.md index 2e473bf..7fc1957 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,11 @@ func main() { } ``` +## Examples + +- [Go doc examples](https://pkg.go.dev/github.com/go-co-op/gocron/v2#pkg-examples) +- [Examples directory](examples) + ## Concepts - **Job**: The job encapsulates a "task", which is made up of a go function and any function parameters. The Job then diff --git a/example_test.go b/example_test.go index e877965..5c83bd4 100644 --- a/example_test.go +++ b/example_test.go @@ -516,7 +516,7 @@ func ExampleWithClock() { } func ExampleWithDistributedElector() { - //var _ Elector = (*myElector)(nil) + //var _ gocron.Elector = (*myElector)(nil) // //type myElector struct{} // @@ -524,15 +524,15 @@ func ExampleWithDistributedElector() { // return nil //} // - //elector := myElector{} + //elector := &myElector{} // - //_, _ = NewScheduler( - // WithDistributedElector(elector), + //_, _ = gocron.NewScheduler( + // gocron.WithDistributedElector(elector), //) } func ExampleWithDistributedLocker() { - //var _ Locker = (*myLocker)(nil) + //var _ gocron.Locker = (*myLocker)(nil) // //type myLocker struct{} // @@ -549,10 +549,10 @@ func ExampleWithDistributedLocker() { // return nil //} // - //locker := myLocker{} + //locker := &myLocker{} // - //_, _ = NewScheduler( - // WithDistributedLocker(locker), + //_, _ = gocron.NewScheduler( + // gocron.WithDistributedLocker(locker), //) } diff --git a/examples/elector/main.go b/examples/elector/main.go new file mode 100644 index 0000000..0788ab9 --- /dev/null +++ b/examples/elector/main.go @@ -0,0 +1,73 @@ +package main + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/go-co-op/gocron/v2" +) + +var _ gocron.Elector = (*myElector)(nil) + +type myElector struct { + num int + leader bool +} + +func (m myElector) IsLeader(_ context.Context) error { + if m.leader { + log.Printf("node %d is leader", m.num) + return nil + } + log.Printf("node %d is not leader", m.num) + return fmt.Errorf("not leader") +} + +func main() { + log.SetFlags(log.LstdFlags | log.Lmicroseconds) + + for i := 0; i < 3; i++ { + go func(i int) { + elector := &myElector{ + num: i, + } + if i == 0 { + elector.leader = true + } + + scheduler, err := gocron.NewScheduler( + gocron.WithDistributedElector(elector), + ) + if err != nil { + log.Println(err) + return + } + + _, err = scheduler.NewJob( + gocron.DurationJob(time.Second), + gocron.NewTask(func() { + log.Println("run job") + }), + ) + + if err != nil { + log.Println(err) + return + } + scheduler.Start() + + if i == 0 { + time.Sleep(5 * time.Second) + elector.leader = false + } + if i == 1 { + time.Sleep(5 * time.Second) + elector.leader = true + } + }(i) + } + + select {} // wait forever +} diff --git a/executor.go b/executor.go index eae65e6..e2aa0d7 100644 --- a/executor.go +++ b/executor.go @@ -193,7 +193,7 @@ func (e *executor) start() { // complete. standardJobsWg.Add(1) go func(j internalJob) { - e.runJob(j, jIn.shouldSendOut) + e.runJob(j, jIn) standardJobsWg.Done() }(*j) } @@ -264,7 +264,7 @@ func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWith e.limitMode.singletonJobs[jIn.id] = struct{}{} e.limitMode.singletonJobsMu.Unlock() } - e.runJob(*j, jIn.shouldSendOut) + e.runJob(*j, jIn) if j.singletonMode { e.limitMode.singletonJobsMu.Lock() @@ -302,7 +302,7 @@ func (e *executor) singletonModeRunner(name string, in chan jobIn, wg *waitGroup j := requestJobCtx(ctx, jIn.id, e.jobOutRequest) cancel() if j != nil { - e.runJob(*j, jIn.shouldSendOut) + e.runJob(*j, jIn) } // remove the limiter block to allow another job to be scheduled @@ -317,7 +317,7 @@ func (e *executor) singletonModeRunner(name string, in chan jobIn, wg *waitGroup } } -func (e *executor) runJob(j internalJob, shouldSendOut bool) { +func (e *executor) runJob(j internalJob, jIn jobIn) { if j.ctx == nil { return } @@ -331,26 +331,20 @@ func (e *executor) runJob(j internalJob, shouldSendOut bool) { if e.elector != nil { if err := e.elector.IsLeader(j.ctx); err != nil { + e.sendOutToScheduler(&jIn) return } } else if e.locker != nil { lock, err := e.locker.Lock(j.ctx, j.name) if err != nil { + e.sendOutToScheduler(&jIn) return } defer func() { _ = lock.Unlock(j.ctx) }() } _ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name) - if shouldSendOut { - select { - case <-e.ctx.Done(): - return - case <-j.ctx.Done(): - return - case e.jobIDsOut <- j.id: - } - } + e.sendOutToScheduler(&jIn) startTime := time.Now() err := callJobFuncWithParams(j.function, j.parameters...) diff --git a/scheduler_test.go b/scheduler_test.go index 6727408..4689b23 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -1140,6 +1140,7 @@ var _ Elector = (*testElector)(nil) type testElector struct { mu sync.Mutex leaderElected bool + notLeader chan struct{} } func (t *testElector) IsLeader(ctx context.Context) error { @@ -1152,6 +1153,7 @@ func (t *testElector) IsLeader(ctx context.Context) error { t.mu.Lock() defer t.mu.Unlock() if t.leaderElected { + t.notLeader <- struct{}{} return fmt.Errorf("already elected leader") } t.leaderElected = true @@ -1163,12 +1165,14 @@ var _ Locker = (*testLocker)(nil) type testLocker struct { mu sync.Mutex jobLocked bool + notLocked chan struct{} } func (t *testLocker) Lock(_ context.Context, _ string) (Lock, error) { t.mu.Lock() defer t.mu.Unlock() if t.jobLocked { + t.notLocked <- struct{}{} return nil, fmt.Errorf("job already locked") } t.jobLocked = true @@ -1184,21 +1188,58 @@ func (t testLock) Unlock(_ context.Context) error { } func TestScheduler_WithDistributed(t *testing.T) { + notLocked := make(chan struct{}, 10) + notLeader := make(chan struct{}, 10) + goleak.VerifyNone(t) tests := []struct { - name string - count int - opt SchedulerOption + name string + count int + opt SchedulerOption + assertions func(*testing.T) }{ { "3 schedulers with elector", 3, - WithDistributedElector(&testElector{}), + WithDistributedElector(&testElector{ + notLeader: notLeader, + }), + func(t *testing.T) { + timeout := time.Now().Add(1 * time.Second) + var notLeaderCount int + for { + if time.Now().After(timeout) { + break + } + select { + case <-notLeader: + notLeaderCount++ + default: + } + } + assert.Equal(t, 2, notLeaderCount) + }, }, { "3 schedulers with locker", 3, - WithDistributedLocker(&testLocker{}), + WithDistributedLocker(&testLocker{ + notLocked: notLocked, + }), + func(t *testing.T) { + timeout := time.Now().Add(1 * time.Second) + var notLockedCount int + for { + if time.Now().After(timeout) { + break + } + select { + case <-notLocked: + notLockedCount++ + default: + } + } + }, }, } @@ -1222,6 +1263,7 @@ func TestScheduler_WithDistributed(t *testing.T) { ), NewTask( func() { + time.Sleep(100 * time.Millisecond) jobsRan <- struct{}{} }, ), @@ -1263,6 +1305,8 @@ func TestScheduler_WithDistributed(t *testing.T) { } assert.Equal(t, 1, runCount) + time.Sleep(time.Second) + tt.assertions(t) }) } }