add distributed locker for v2 (#614)

* add distributed locker for v2

* fix logger test

* enhance logger test
This commit is contained in:
John Roesler 2023-11-14 09:56:05 -06:00 committed by GitHub
parent 3e2df30371
commit 7fea987137
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 193 additions and 58 deletions

View File

@ -97,6 +97,10 @@ func main() {
- [**Elector**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithDistributedElector):
An elector can be used to elect a single instance of gocron to run as the primary with the
other instances checking to see if a new leader needs to be elected.
- Implementations: [go-co-op electors](https://github.com/go-co-op?q=-elector&type=all&language=&sort=)
- [**Locker**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithDistributedLocker):
A locker can be used to lock each run of a job to a single instance of gocron.
- Implementations: [go-co-op lockers](https://github.com/go-co-op?q=-lock&type=all&language=&sort=)
- **Events**: Job events can trigger actions.
- [**Listeners**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithEventListeners):
[Event listeners](https://pkg.go.dev/github.com/go-co-op/gocron/v2#EventListener)

View File

@ -1,7 +1,9 @@
//go:generate mockgen -source=distributed.go -destination=mocks/distributed.go -package=gocronmocks
package gocron
import "context"
import (
"context"
)
// Elector determines the leader from instances asking to be the leader. Only
// the leader runs jobs. If the leader goes down, a new leader will be elected.
@ -10,3 +12,19 @@ type Elector interface {
// making the request and an error if the job should not be scheduled.
IsLeader(context.Context) error
}
// Locker represents the required interface to lock jobs when running multiple schedulers.
// The lock is held for the duration of the job's run, and it is expected that the
// locker implementation handles time splay between schedulers.
// The lock key passed is the job's name - which, if not set, defaults to the
// go function's name, e.g. "pkg.myJob" for func myJob() {} in pkg
type Locker interface {
// Lock if an error is returned by lock, the job will not be scheduled.
Lock(ctx context.Context, key string) (Lock, error)
}
// Lock represents an obtained lock. The lock is released after the execution of the job
// by the scheduler.
type Lock interface {
Unlock(ctx context.Context) error
}

View File

@ -30,6 +30,7 @@ var (
ErrWeeklyJobMinutesSeconds = fmt.Errorf("gocron: WeeklyJob: atTimes minutes and seconds must be between 0 and 59 inclusive")
ErrWithClockNil = fmt.Errorf("gocron: WithClock: clock must not be nil")
ErrWithDistributedElectorNil = fmt.Errorf("gocron: WithDistributedElector: elector must not be nil")
ErrWithDistributedLockerNil = fmt.Errorf("gocron: WithDistributedLocker: locker must not be nil")
ErrWithLimitConcurrentJobsZero = fmt.Errorf("gocron: WithLimitConcurrentJobs: limit must be greater than 0")
ErrWithLocationNil = fmt.Errorf("gocron: WithLocation: location must not be nil")
ErrWithLoggerNil = fmt.Errorf("gocron: WithLogger: logger must not be nil")

View File

@ -426,6 +426,31 @@ func ExampleWithDistributedElector() {
//)
}
func ExampleWithDistributedLocker() {
//var _ Locker = (*myLocker)(nil)
//
//type myLocker struct{}
//
//func (m myLocker) Lock(ctx context.Context, key string) (Lock, error) {
// return &testLock, nil
//}
//
//var _ Lock = (*testLock)(nil)
//
//type testLock struct {
//}
//
//func (t testLock) Unlock(_ context.Context) error {
// return nil
//}
//
//locker := myLocker{}
//
//_, _ = NewScheduler(
// WithDistributedLocker(locker),
//)
}
func ExampleWithEventListeners() {
s, _ := NewScheduler()
defer func() { _ = s.Shutdown() }()

View File

@ -22,6 +22,7 @@ type executor struct {
singletonRunners map[uuid.UUID]singletonRunner
limitMode *limitModeConfig
elector Elector
locker Locker
}
type singletonRunner struct {
@ -340,6 +341,12 @@ func (e *executor) runJob(j internalJob) {
if err := e.elector.IsLeader(j.ctx); err != nil {
return
}
} else if e.locker != nil {
lock, err := e.locker.Lock(j.ctx, j.id.String())
if err != nil {
return
}
defer func() { _ = lock.Unlock(j.ctx) }()
}
_ = callJobFuncWithParams(j.beforeJobRuns, j.id)

View File

@ -2,7 +2,9 @@
package gocron
import (
"fmt"
"log"
"strings"
)
// Logger is the interface that wraps the basic logging methods
@ -12,20 +14,20 @@ import (
// or implement your own Logger. The actual level of Log that is logged
// is handled by the implementation.
type Logger interface {
Debug(msg string, args ...interface{})
Error(msg string, args ...interface{})
Info(msg string, args ...interface{})
Warn(msg string, args ...interface{})
Debug(msg string, args ...any)
Error(msg string, args ...any)
Info(msg string, args ...any)
Warn(msg string, args ...any)
}
var _ Logger = (*noOpLogger)(nil)
type noOpLogger struct{}
func (l noOpLogger) Debug(_ string, _ ...interface{}) {}
func (l noOpLogger) Error(_ string, _ ...interface{}) {}
func (l noOpLogger) Info(_ string, _ ...interface{}) {}
func (l noOpLogger) Warn(_ string, _ ...interface{}) {}
func (l noOpLogger) Debug(_ string, _ ...any) {}
func (l noOpLogger) Error(_ string, _ ...any) {}
func (l noOpLogger) Info(_ string, _ ...any) {}
func (l noOpLogger) Warn(_ string, _ ...any) {}
var _ Logger = (*logger)(nil)
@ -49,46 +51,44 @@ func NewLogger(level LogLevel) Logger {
return &logger{level: level}
}
func (l *logger) Debug(msg string, args ...interface{}) {
func (l *logger) Debug(msg string, args ...any) {
if l.level < LogLevelDebug {
return
}
if len(args) == 0 {
log.Printf("DEBUG: %s\n", msg)
return
}
log.Printf("DEBUG: %s, %v\n", msg, args)
log.Printf("DEBUG: %s%s\n", msg, logFormatArgs(args...))
}
func (l *logger) Error(msg string, args ...interface{}) {
func (l *logger) Error(msg string, args ...any) {
if l.level < LogLevelError {
return
}
if len(args) == 0 {
log.Printf("ERROR: %s\n", msg)
return
}
log.Printf("ERROR: %s, %v\n", msg, args)
log.Printf("ERROR: %s%s\n", msg, logFormatArgs(args...))
}
func (l *logger) Info(msg string, args ...interface{}) {
func (l *logger) Info(msg string, args ...any) {
if l.level < LogLevelInfo {
return
}
if len(args) == 0 {
log.Printf("INFO: %s\n", msg)
return
}
log.Printf("INFO: %s, %v\n", msg, args)
log.Printf("INFO: %s%s\n", msg, logFormatArgs(args...))
}
func (l *logger) Warn(msg string, args ...interface{}) {
func (l *logger) Warn(msg string, args ...any) {
if l.level < LogLevelWarn {
return
}
if len(args) == 0 {
log.Printf("WARN: %s\n", msg)
return
}
log.Printf("WARN: %s, %v\n", msg, args)
log.Printf("WARN: %s%s\n", msg, logFormatArgs(args...))
}
func logFormatArgs(args ...any) string {
if len(args) == 0 {
return ""
}
if len(args)%2 != 0 {
return ", " + fmt.Sprint(args...)
}
var pairs []string
for i := 0; i < len(args); i += 2 {
pairs = append(pairs, fmt.Sprintf("%s=%v", args[i], args[i+1]))
}
return ", " + strings.Join(pairs, ", ")
}

View File

@ -3,6 +3,7 @@ package gocron
import (
"bytes"
"log"
"strings"
"testing"
"github.com/stretchr/testify/assert"
@ -45,32 +46,60 @@ func TestNewLogger(t *testing.T) {
log.SetOutput(&results)
l := NewLogger(tt.level)
l.Debug("debug", "arg1", "arg2")
if tt.level >= LogLevelDebug {
assert.Contains(t, results.String(), "DEBUG: debug, [arg1 arg2]\n")
} else {
assert.Empty(t, results.String())
}
var noArgs []any
oneArg := []any{"arg1"}
twoArgs := []any{"arg1", "arg2"}
var noArgsStr []string
oneArgStr := []string{"arg1"}
twoArgsStr := []string{"arg1", "arg2"}
l.Info("info", "arg1", "arg2")
if tt.level >= LogLevelInfo {
assert.Contains(t, results.String(), "INFO: info, [arg1 arg2]\n")
} else {
assert.Empty(t, results.String())
}
for _, args := range []struct {
argsAny []any
argsStr []string
}{
{noArgs, noArgsStr},
{oneArg, oneArgStr},
{twoArgs, twoArgsStr},
} {
l.Debug("debug", args.argsAny...)
if tt.level >= LogLevelDebug {
r := results.String()
assert.Contains(t, r, "DEBUG: debug")
assert.Contains(t, r, strings.Join(args.argsStr, "="))
} else {
assert.Empty(t, results.String())
}
results.Reset()
l.Warn("warn", "arg1", "arg2")
if tt.level >= LogLevelWarn {
assert.Contains(t, results.String(), "WARN: warn, [arg1 arg2]\n")
} else {
assert.Empty(t, results.String())
}
l.Info("info", args.argsAny...)
if tt.level >= LogLevelInfo {
r := results.String()
assert.Contains(t, r, "INFO: info")
assert.Contains(t, r, strings.Join(args.argsStr, "="))
} else {
assert.Empty(t, results.String())
}
results.Reset()
l.Error("error", "arg1", "arg2")
if tt.level >= LogLevelError {
assert.Contains(t, results.String(), "ERROR: error, [arg1 arg2]\n")
} else {
assert.Empty(t, results.String())
l.Warn("warn", args.argsAny...)
if tt.level >= LogLevelWarn {
r := results.String()
assert.Contains(t, r, "WARN: warn")
assert.Contains(t, r, strings.Join(args.argsStr, "="))
} else {
assert.Empty(t, results.String())
}
results.Reset()
l.Error("error", args.argsAny...)
if tt.level >= LogLevelError {
r := results.String()
assert.Contains(t, r, "ERROR: error")
assert.Contains(t, r, strings.Join(args.argsStr, "="))
} else {
assert.Empty(t, results.String())
}
results.Reset()
}
})
}

View File

@ -4,6 +4,7 @@ package gocron
import (
"context"
"reflect"
"runtime"
"time"
"github.com/google/uuid"
@ -395,6 +396,7 @@ func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskW
return nil, ErrNewJobTaskNotFunc
}
j.name = runtime.FuncForPC(taskFunc.Pointer()).Name()
j.function = tsk.function
j.parameters = tsk.parameters
@ -533,6 +535,19 @@ func WithDistributedElector(elector Elector) SchedulerOption {
}
}
// WithDistributedLocker sets the locker to be used by multiple
// Scheduler instances to ensure that only one instance of each
// job is run.
func WithDistributedLocker(locker Locker) SchedulerOption {
return func(s *scheduler) error {
if locker == nil {
return ErrWithDistributedLockerNil
}
s.exec.locker = locker
return nil
}
}
// WithGlobalJobOptions sets JobOption's that will be applied to
// all jobs added to the scheduler. JobOption's set on the job
// itself will override if the same JobOption is set globally.

View File

@ -811,6 +811,11 @@ func TestScheduler_WithOptionsErrors(t *testing.T) {
WithDistributedElector(nil),
ErrWithDistributedElectorNil,
},
{
"WithDistributedLocker nil",
WithDistributedLocker(nil),
ErrWithDistributedLockerNil,
},
{
"WithLimitConcurrentJobs limit 0",
WithLimitConcurrentJobs(0, LimitModeWait),
@ -1006,15 +1011,46 @@ func (t *testElector) IsLeader(ctx context.Context) error {
return nil
}
func TestScheduler_WithDistributedElector(t *testing.T) {
var _ Locker = (*testLocker)(nil)
type testLocker struct {
mu sync.Mutex
jobLocked bool
}
func (t *testLocker) Lock(_ context.Context, _ string) (Lock, error) {
t.mu.Lock()
defer t.mu.Unlock()
if t.jobLocked {
return nil, fmt.Errorf("job already locked")
}
return &testLock{}, nil
}
var _ Lock = (*testLock)(nil)
type testLock struct{}
func (t testLock) Unlock(_ context.Context) error {
return nil
}
func TestScheduler_WithDistributed(t *testing.T) {
goleak.VerifyNone(t)
tests := []struct {
name string
count int
opt SchedulerOption
}{
{
"3 schedulers",
"3 schedulers with elector",
3,
WithDistributedElector(&testElector{}),
},
{
"3 schedulers with locker",
3,
WithDistributedLocker(&testLocker{}),
},
}