after lock error listener (#734)

* after job listener

* fixing lint issues

---------

Co-authored-by: lv90no <manuel.doncel.martos@ing.com>
This commit is contained in:
Manuel Doncel Martos 2024-06-19 13:33:50 +02:00 committed by GitHub
parent b12ca98e93
commit 894970124c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 127 additions and 0 deletions

View File

@ -1,6 +1,8 @@
package gocron_test
import (
"context"
"errors"
"fmt"
"sync"
"time"
@ -10,6 +12,14 @@ import (
"github.com/jonboulle/clockwork"
)
var _ Locker = new(errorLocker)
type errorLocker struct{}
func (e errorLocker) Lock(_ context.Context, _ string) (Lock, error) {
return nil, errors.New("locked")
}
func ExampleAfterJobRuns() {
s, _ := NewScheduler()
defer func() { _ = s.Shutdown() }()
@ -52,6 +62,28 @@ func ExampleAfterJobRunsWithError() {
)
}
func ExampleAfterLockError() {
s, _ := NewScheduler()
defer func() { _ = s.Shutdown() }()
_, _ = s.NewJob(
DurationJob(
time.Second,
),
NewTask(
func() {},
),
WithDistributedJobLocker(&errorLocker{}),
WithEventListeners(
AfterLockError(
func(jobID uuid.UUID, jobName string, err error) {
// do something immediately before the job is run
},
),
),
)
}
func ExampleBeforeJobRuns() {
s, _ := NewScheduler()
defer func() { _ = s.Shutdown() }()

View File

@ -343,6 +343,7 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
} else if j.locker != nil {
lock, err := j.locker.Lock(j.ctx, j.name)
if err != nil {
_ = callJobFuncWithParams(j.afterLockError, j.id, j.name, err)
e.sendOutForRescheduling(&jIn)
e.incrementJobCounter(j, Skip)
return
@ -351,6 +352,7 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
} else if e.locker != nil {
lock, err := e.locker.Lock(j.ctx, j.name)
if err != nil {
_ = callJobFuncWithParams(j.afterLockError, j.id, j.name, err)
e.sendOutForRescheduling(&jIn)
e.incrementJobCounter(j, Skip)
return

13
job.go
View File

@ -42,6 +42,7 @@ type internalJob struct {
afterJobRuns func(jobID uuid.UUID, jobName string)
beforeJobRuns func(jobID uuid.UUID, jobName string)
afterJobRunsWithError func(jobID uuid.UUID, jobName string, err error)
afterLockError func(jobID uuid.UUID, jobName string, err error)
locker Locker
}
@ -639,6 +640,18 @@ func BeforeJobRuns(eventListenerFunc func(jobID uuid.UUID, jobName string)) Even
}
}
// AfterLockError is used to when the distributed locker returns an error and
// then run the provided function.
func AfterLockError(eventListenerFunc func(jobID uuid.UUID, jobName string, err error)) EventListener {
return func(j *internalJob) error {
if eventListenerFunc == nil {
return ErrEventListenerFuncNil
}
j.afterLockError = eventListenerFunc
return nil
}
}
// -----------------------------------------------
// -----------------------------------------------
// ---------------- Job Schedules ----------------

View File

@ -437,12 +437,20 @@ func TestWithEventListeners(t *testing.T) {
},
nil,
},
{
"afterLockError",
[]EventListener{
AfterLockError(func(_ uuid.UUID, _ string, _ error) {}),
},
nil,
},
{
"multiple event listeners",
[]EventListener{
AfterJobRuns(func(_ uuid.UUID, _ string) {}),
AfterJobRunsWithError(func(_ uuid.UUID, _ string, _ error) {}),
BeforeJobRuns(func(_ uuid.UUID, _ string) {}),
AfterLockError(func(_ uuid.UUID, _ string, _ error) {}),
},
nil,
},
@ -488,6 +496,9 @@ func TestWithEventListeners(t *testing.T) {
if ij.beforeJobRuns != nil {
count++
}
if ij.afterLockError != nil {
count++
}
assert.Equal(t, len(tt.eventListeners), count)
})
}

View File

@ -2,6 +2,7 @@ package gocron
import (
"context"
"errors"
"fmt"
"os"
"sync"
@ -49,6 +50,14 @@ func newTestScheduler(t *testing.T, options ...SchedulerOption) Scheduler {
return s
}
var _ Locker = new(errorLocker)
type errorLocker struct{}
func (e errorLocker) Lock(_ context.Context, _ string) (Lock, error) {
return nil, errors.New("locked")
}
func TestScheduler_OneSecond_NoOptions(t *testing.T) {
defer verifyNoGoroutineLeaks(t)
cronNoOptionsCh := make(chan struct{}, 10)
@ -1631,6 +1640,66 @@ func TestScheduler_WithEventListeners(t *testing.T) {
}
}
func TestScheduler_WithLocker_WithEventListeners(t *testing.T) {
defer verifyNoGoroutineLeaks(t)
listenerRunCh := make(chan error, 1)
tests := []struct {
name string
locker Locker
tsk Task
el EventListener
expectRun bool
expectErr error
}{
{
"AfterLockError",
errorLocker{},
NewTask(func() {}),
AfterLockError(func(_ uuid.UUID, _ string, err error) {
listenerRunCh <- nil
}),
true,
nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := newTestScheduler(t)
_, err := s.NewJob(
DurationJob(time.Minute*10),
tt.tsk,
WithStartAt(
WithStartImmediately(),
),
WithDistributedJobLocker(tt.locker),
WithEventListeners(tt.el),
WithLimitedRuns(1),
)
require.NoError(t, err)
s.Start()
if tt.expectRun {
select {
case err = <-listenerRunCh:
assert.ErrorIs(t, err, tt.expectErr)
case <-time.After(time.Second):
t.Fatal("timed out waiting for listener to run")
}
} else {
select {
case <-listenerRunCh:
t.Fatal("listener ran when it shouldn't have")
case <-time.After(time.Millisecond * 100):
}
}
require.NoError(t, s.Shutdown())
})
}
}
func TestScheduler_ManyJobs(t *testing.T) {
defer verifyNoGoroutineLeaks(t)