mirror of https://github.com/go-co-op/gocron.git
cancel running jobs if possible when run Scheduler.StopJobs() (#819)
* cancel running job when run Scheduler.StopJobs() Signed-off-by: lou <alex1988@outlook.com> * trivial Signed-off-by: lou <alex1988@outlook.com> * Update scheduler_test.go --------- Signed-off-by: lou <alex1988@outlook.com> Co-authored-by: John Roesler <johnrroesler@gmail.com>
This commit is contained in:
parent
cc66c780d8
commit
50966c7365
19
job.go
19
job.go
|
|
@ -18,11 +18,12 @@ import (
|
|||
// internalJob stores the information needed by the scheduler
|
||||
// to manage scheduling, starting and stopping the job
|
||||
type internalJob struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
id uuid.UUID
|
||||
name string
|
||||
tags []string
|
||||
ctx context.Context
|
||||
parentCtx context.Context
|
||||
cancel context.CancelFunc
|
||||
id uuid.UUID
|
||||
name string
|
||||
tags []string
|
||||
jobSchedule
|
||||
|
||||
// as some jobs may queue up, it's possible to
|
||||
|
|
@ -703,6 +704,14 @@ func WithIdentifier(id uuid.UUID) JobOption {
|
|||
}
|
||||
}
|
||||
|
||||
// WithContext sets the parent context for the job
|
||||
func WithContext(ctx context.Context) JobOption {
|
||||
return func(j *internalJob, _ time.Time) error {
|
||||
j.parentCtx = ctx
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// -----------------------------------------------
|
||||
// -----------------------------------------------
|
||||
// ------------- Job Event Listeners -------------
|
||||
|
|
|
|||
28
scheduler.go
28
scheduler.go
|
|
@ -648,8 +648,6 @@ func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskW
|
|||
j.id = id
|
||||
}
|
||||
|
||||
j.ctx, j.cancel = context.WithCancel(s.shutdownCtx)
|
||||
|
||||
if taskWrapper == nil {
|
||||
return nil, ErrNewJobTaskNil
|
||||
}
|
||||
|
|
@ -664,10 +662,6 @@ func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskW
|
|||
return nil, ErrNewJobTaskNotFunc
|
||||
}
|
||||
|
||||
if err := s.verifyParameterType(taskFunc, tsk); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
j.name = runtime.FuncForPC(taskFunc.Pointer()).Name()
|
||||
j.function = tsk.function
|
||||
j.parameters = tsk.parameters
|
||||
|
|
@ -686,6 +680,28 @@ func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskW
|
|||
}
|
||||
}
|
||||
|
||||
if j.parentCtx == nil {
|
||||
j.parentCtx = s.shutdownCtx
|
||||
}
|
||||
j.ctx, j.cancel = context.WithCancel(j.parentCtx)
|
||||
|
||||
if !taskFunc.IsZero() && taskFunc.Type().NumIn() > 0 {
|
||||
// if the first parameter is a context.Context and params have no context.Context, add current ctx to the params
|
||||
if taskFunc.Type().In(0) == reflect.TypeOf((*context.Context)(nil)).Elem() {
|
||||
if len(tsk.parameters) == 0 {
|
||||
tsk.parameters = []any{j.ctx}
|
||||
j.parameters = []any{j.ctx}
|
||||
} else if _, ok := tsk.parameters[0].(context.Context); !ok {
|
||||
tsk.parameters = append([]any{j.ctx}, tsk.parameters...)
|
||||
j.parameters = append([]any{j.ctx}, j.parameters...)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := s.verifyParameterType(taskFunc, tsk); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := definition.setup(&j, s.location, s.exec.clock.Now()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -361,6 +361,41 @@ func TestScheduler_StopTimeout(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestScheduler_StopLongRunningJobs(t *testing.T) {
|
||||
t.Run("start, run job, stop jobs before job is completed", func(t *testing.T) {
|
||||
s := newTestScheduler(t,
|
||||
WithStopTimeout(50*time.Millisecond),
|
||||
)
|
||||
|
||||
_, err := s.NewJob(
|
||||
DurationJob(
|
||||
50*time.Millisecond,
|
||||
),
|
||||
NewTask(
|
||||
func(ctx context.Context) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatal("job can not been canceled")
|
||||
}
|
||||
},
|
||||
),
|
||||
WithStartAt(
|
||||
WithStartImmediately(),
|
||||
),
|
||||
WithSingletonMode(LimitModeReschedule),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
s.Start()
|
||||
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
// the running job is canceled, no unexpected timeout error
|
||||
require.NoError(t, s.StopJobs())
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
})
|
||||
}
|
||||
|
||||
func TestScheduler_Shutdown(t *testing.T) {
|
||||
defer verifyNoGoroutineLeaks(t)
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue