elector & locker were failing to send out when not leader (#688)

* elector & locker were failing to send out when not leader

* update test to confirm non-active elector/locker are checked

* clean up data race

* try to make test more reliable
This commit is contained in:
John Roesler 2024-03-12 08:37:11 -05:00 committed by GitHub
parent c2f95759d2
commit ebec5e9f91
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 142 additions and 26 deletions

View File

@ -66,6 +66,11 @@ func main() {
}
```
## Examples
- [Go doc examples](https://pkg.go.dev/github.com/go-co-op/gocron/v2#pkg-examples)
- [Examples directory](examples)
## Concepts
- **Job**: The job encapsulates a "task", which is made up of a go function and any function parameters. The Job then

View File

@ -516,7 +516,7 @@ func ExampleWithClock() {
}
func ExampleWithDistributedElector() {
//var _ Elector = (*myElector)(nil)
//var _ gocron.Elector = (*myElector)(nil)
//
//type myElector struct{}
//
@ -524,15 +524,15 @@ func ExampleWithDistributedElector() {
// return nil
//}
//
//elector := myElector{}
//elector := &myElector{}
//
//_, _ = NewScheduler(
// WithDistributedElector(elector),
//_, _ = gocron.NewScheduler(
// gocron.WithDistributedElector(elector),
//)
}
func ExampleWithDistributedLocker() {
//var _ Locker = (*myLocker)(nil)
//var _ gocron.Locker = (*myLocker)(nil)
//
//type myLocker struct{}
//
@ -549,10 +549,10 @@ func ExampleWithDistributedLocker() {
// return nil
//}
//
//locker := myLocker{}
//locker := &myLocker{}
//
//_, _ = NewScheduler(
// WithDistributedLocker(locker),
//_, _ = gocron.NewScheduler(
// gocron.WithDistributedLocker(locker),
//)
}

73
examples/elector/main.go Normal file
View File

@ -0,0 +1,73 @@
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/go-co-op/gocron/v2"
)
var _ gocron.Elector = (*myElector)(nil)
type myElector struct {
num int
leader bool
}
func (m myElector) IsLeader(_ context.Context) error {
if m.leader {
log.Printf("node %d is leader", m.num)
return nil
}
log.Printf("node %d is not leader", m.num)
return fmt.Errorf("not leader")
}
func main() {
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
for i := 0; i < 3; i++ {
go func(i int) {
elector := &myElector{
num: i,
}
if i == 0 {
elector.leader = true
}
scheduler, err := gocron.NewScheduler(
gocron.WithDistributedElector(elector),
)
if err != nil {
log.Println(err)
return
}
_, err = scheduler.NewJob(
gocron.DurationJob(time.Second),
gocron.NewTask(func() {
log.Println("run job")
}),
)
if err != nil {
log.Println(err)
return
}
scheduler.Start()
if i == 0 {
time.Sleep(5 * time.Second)
elector.leader = false
}
if i == 1 {
time.Sleep(5 * time.Second)
elector.leader = true
}
}(i)
}
select {} // wait forever
}

View File

@ -193,7 +193,7 @@ func (e *executor) start() {
// complete.
standardJobsWg.Add(1)
go func(j internalJob) {
e.runJob(j, jIn.shouldSendOut)
e.runJob(j, jIn)
standardJobsWg.Done()
}(*j)
}
@ -264,7 +264,7 @@ func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWith
e.limitMode.singletonJobs[jIn.id] = struct{}{}
e.limitMode.singletonJobsMu.Unlock()
}
e.runJob(*j, jIn.shouldSendOut)
e.runJob(*j, jIn)
if j.singletonMode {
e.limitMode.singletonJobsMu.Lock()
@ -302,7 +302,7 @@ func (e *executor) singletonModeRunner(name string, in chan jobIn, wg *waitGroup
j := requestJobCtx(ctx, jIn.id, e.jobOutRequest)
cancel()
if j != nil {
e.runJob(*j, jIn.shouldSendOut)
e.runJob(*j, jIn)
}
// remove the limiter block to allow another job to be scheduled
@ -317,7 +317,7 @@ func (e *executor) singletonModeRunner(name string, in chan jobIn, wg *waitGroup
}
}
func (e *executor) runJob(j internalJob, shouldSendOut bool) {
func (e *executor) runJob(j internalJob, jIn jobIn) {
if j.ctx == nil {
return
}
@ -331,26 +331,20 @@ func (e *executor) runJob(j internalJob, shouldSendOut bool) {
if e.elector != nil {
if err := e.elector.IsLeader(j.ctx); err != nil {
e.sendOutToScheduler(&jIn)
return
}
} else if e.locker != nil {
lock, err := e.locker.Lock(j.ctx, j.name)
if err != nil {
e.sendOutToScheduler(&jIn)
return
}
defer func() { _ = lock.Unlock(j.ctx) }()
}
_ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name)
if shouldSendOut {
select {
case <-e.ctx.Done():
return
case <-j.ctx.Done():
return
case e.jobIDsOut <- j.id:
}
}
e.sendOutToScheduler(&jIn)
startTime := time.Now()
err := callJobFuncWithParams(j.function, j.parameters...)

View File

@ -1140,6 +1140,7 @@ var _ Elector = (*testElector)(nil)
type testElector struct {
mu sync.Mutex
leaderElected bool
notLeader chan struct{}
}
func (t *testElector) IsLeader(ctx context.Context) error {
@ -1152,6 +1153,7 @@ func (t *testElector) IsLeader(ctx context.Context) error {
t.mu.Lock()
defer t.mu.Unlock()
if t.leaderElected {
t.notLeader <- struct{}{}
return fmt.Errorf("already elected leader")
}
t.leaderElected = true
@ -1163,12 +1165,14 @@ var _ Locker = (*testLocker)(nil)
type testLocker struct {
mu sync.Mutex
jobLocked bool
notLocked chan struct{}
}
func (t *testLocker) Lock(_ context.Context, _ string) (Lock, error) {
t.mu.Lock()
defer t.mu.Unlock()
if t.jobLocked {
t.notLocked <- struct{}{}
return nil, fmt.Errorf("job already locked")
}
t.jobLocked = true
@ -1184,21 +1188,58 @@ func (t testLock) Unlock(_ context.Context) error {
}
func TestScheduler_WithDistributed(t *testing.T) {
notLocked := make(chan struct{}, 10)
notLeader := make(chan struct{}, 10)
goleak.VerifyNone(t)
tests := []struct {
name string
count int
opt SchedulerOption
assertions func(*testing.T)
}{
{
"3 schedulers with elector",
3,
WithDistributedElector(&testElector{}),
WithDistributedElector(&testElector{
notLeader: notLeader,
}),
func(t *testing.T) {
timeout := time.Now().Add(1 * time.Second)
var notLeaderCount int
for {
if time.Now().After(timeout) {
break
}
select {
case <-notLeader:
notLeaderCount++
default:
}
}
assert.Equal(t, 2, notLeaderCount)
},
},
{
"3 schedulers with locker",
3,
WithDistributedLocker(&testLocker{}),
WithDistributedLocker(&testLocker{
notLocked: notLocked,
}),
func(t *testing.T) {
timeout := time.Now().Add(1 * time.Second)
var notLockedCount int
for {
if time.Now().After(timeout) {
break
}
select {
case <-notLocked:
notLockedCount++
default:
}
}
},
},
}
@ -1222,6 +1263,7 @@ func TestScheduler_WithDistributed(t *testing.T) {
),
NewTask(
func() {
time.Sleep(100 * time.Millisecond)
jobsRan <- struct{}{}
},
),
@ -1263,6 +1305,8 @@ func TestScheduler_WithDistributed(t *testing.T) {
}
assert.Equal(t, 1, runCount)
time.Sleep(time.Second)
tt.assertions(t)
})
}
}