2023-12-11 16:39:59 +00:00
|
|
|
//go:generate mockgen -destination=mocks/scheduler.go -package=gocronmocks . Scheduler
|
2023-11-08 17:11:42 +00:00
|
|
|
package gocron
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"reflect"
|
2023-11-14 15:56:05 +00:00
|
|
|
"runtime"
|
2023-11-08 17:11:42 +00:00
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/google/uuid"
|
|
|
|
|
"github.com/jonboulle/clockwork"
|
2023-11-09 15:49:21 +00:00
|
|
|
"golang.org/x/exp/slices"
|
2023-11-08 17:11:42 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// Compile-time assertion that *scheduler implements the Scheduler interface.
var _ Scheduler = (*scheduler)(nil)
|
|
|
|
|
|
2023-11-22 12:43:50 +00:00
|
|
|
// Scheduler defines the interface for the Scheduler.
type Scheduler interface {
	// Jobs returns a list of all jobs in the scheduler.
	Jobs() []Job
	// NewJob creates a new Job and adds it to the Scheduler.
	NewJob(JobDefinition, Task, ...JobOption) (Job, error)
	// RemoveByTags removes all jobs that carry at least one
	// of the provided tags.
	RemoveByTags(...string)
	// RemoveJob removes the job with the provided id.
	RemoveJob(uuid.UUID) error
	// Start begins scheduling jobs for execution based
	// on each job's definition. Jobs added to an already
	// running scheduler will be scheduled immediately based
	// on definition. Start is non-blocking.
	Start()
	// StopJobs stops the execution of all jobs in the scheduler.
	// This can be useful in situations where jobs need to be
	// paused globally and then restarted with Start().
	StopJobs() error
	// Shutdown should be called when you no longer need
	// the Scheduler or its Jobs, as the Scheduler cannot
	// be restarted after calling Shutdown. This is similar
	// to a Close or Cleanup method and is often deferred after
	// starting the scheduler.
	Shutdown() error
	// Update replaces the existing Job's JobDefinition with the provided
	// JobDefinition. The Job's Job.ID() remains the same.
	Update(uuid.UUID, JobDefinition, Task, ...JobOption) (Job, error)
}
|
|
|
|
|
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
// ----------------- Scheduler -------------------
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
|
|
|
|
|
// scheduler is the default Scheduler implementation. Its state is owned by
// a single event-loop goroutine started in NewScheduler; the exported
// methods talk to that loop through the channels below.
type scheduler struct {
	// shutdownCtx is cancelled by Shutdown and is the parent of every job context.
	shutdownCtx context.Context
	// shutdownCancel cancels shutdownCtx.
	shutdownCancel context.CancelFunc
	// exec receives jobs to run on exec.jobsIn and reports completed job ids
	// on exec.jobIDsOut.
	exec executor
	// jobs holds every known job by id. It is read and written by the
	// select* handlers and stopScheduler, which run on the event loop.
	jobs map[uuid.UUID]internalJob
	// location is the time zone applied by now() and passed to JobDefinition.setup.
	location *time.Location
	// clock supplies the current time and the timers that trigger job runs.
	clock clockwork.Clock
	// started is set by selectStart and cleared by stopScheduler.
	started bool
	// globalJobOptions are applied to every job before its own options.
	globalJobOptions []JobOption
	// logger receives the scheduler's log messages.
	logger Logger

	// startCh carries Start requests to the event loop.
	startCh chan struct{}
	// startedCh acknowledges a Start request back to the caller.
	startedCh chan struct{}
	// stopCh carries StopJobs requests to the event loop.
	stopCh chan struct{}
	// stopErrCh delivers the result of stopScheduler to StopJobs/Shutdown.
	stopErrCh chan error
	// allJobsOutRequest carries Jobs() requests to the event loop.
	allJobsOutRequest chan allJobsOutRequest
	// jobOutRequestCh carries single-job lookups to the event loop.
	jobOutRequestCh chan jobOutRequest
	// runJobRequestCh carries requests to hand a job to the executor right away.
	runJobRequestCh chan runJobRequest
	// newJobCh carries newly built jobs to the event loop.
	newJobCh chan internalJob
	// removeJobCh carries ids of jobs to remove.
	removeJobCh chan uuid.UUID
	// removeJobsByTagsCh carries tag sets whose matching jobs should be removed.
	removeJobsByTagsCh chan []string
}
|
|
|
|
|
|
|
|
|
|
// jobOutRequest asks the scheduler's event loop for the internalJob with
// the given id. The loop sends the job on outChan when it exists and then
// closes outChan in either case.
type jobOutRequest struct {
	// id identifies the requested job.
	id uuid.UUID
	// outChan receives the job, if found, and is closed by the scheduler.
	outChan chan internalJob
}
|
|
|
|
|
|
2023-12-19 03:13:37 +00:00
|
|
|
// runJobRequest asks the scheduler's event loop to hand the job with the
// given id to the executor. The outcome is reported on outChan with a
// non-blocking send.
type runJobRequest struct {
	// id identifies the job to run.
	id uuid.UUID
	// outChan receives nil on success or an error describing the failure.
	outChan chan error
}
|
|
|
|
|
|
2023-11-08 17:11:42 +00:00
|
|
|
// allJobsOutRequest asks the scheduler's event loop for a snapshot of all
// jobs, which is delivered on outChan.
type allJobsOutRequest struct {
	// outChan receives the list of jobs known to the scheduler.
	outChan chan []Job
}
|
|
|
|
|
|
2023-11-22 12:43:50 +00:00
|
|
|
// NewScheduler creates a new Scheduler instance.
|
|
|
|
|
// The Scheduler is not started until Start() is called.
|
|
|
|
|
//
|
|
|
|
|
// NewJob will add jobs to the Scheduler, but they will not
|
|
|
|
|
// be scheduled until Start() is called.
|
2023-11-08 17:11:42 +00:00
|
|
|
func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
|
|
|
|
|
schCtx, cancel := context.WithCancel(context.Background())
|
|
|
|
|
|
|
|
|
|
exec := executor{
|
|
|
|
|
stopCh: make(chan struct{}),
|
|
|
|
|
stopTimeout: time.Second * 10,
|
|
|
|
|
singletonRunners: make(map[uuid.UUID]singletonRunner),
|
|
|
|
|
logger: &noOpLogger{},
|
|
|
|
|
|
2023-12-19 03:13:37 +00:00
|
|
|
jobsIn: make(chan jobIn),
|
2023-11-08 17:11:42 +00:00
|
|
|
jobIDsOut: make(chan uuid.UUID),
|
|
|
|
|
jobOutRequest: make(chan jobOutRequest, 1000),
|
|
|
|
|
done: make(chan error),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
s := &scheduler{
|
|
|
|
|
shutdownCtx: schCtx,
|
|
|
|
|
shutdownCancel: cancel,
|
|
|
|
|
exec: exec,
|
|
|
|
|
jobs: make(map[uuid.UUID]internalJob),
|
|
|
|
|
location: time.Local,
|
|
|
|
|
clock: clockwork.NewRealClock(),
|
|
|
|
|
logger: &noOpLogger{},
|
|
|
|
|
|
|
|
|
|
newJobCh: make(chan internalJob),
|
|
|
|
|
removeJobCh: make(chan uuid.UUID),
|
|
|
|
|
removeJobsByTagsCh: make(chan []string),
|
|
|
|
|
startCh: make(chan struct{}),
|
|
|
|
|
startedCh: make(chan struct{}),
|
|
|
|
|
stopCh: make(chan struct{}),
|
|
|
|
|
stopErrCh: make(chan error, 1),
|
|
|
|
|
jobOutRequestCh: make(chan jobOutRequest),
|
2023-12-19 03:13:37 +00:00
|
|
|
runJobRequestCh: make(chan runJobRequest),
|
2023-11-08 17:11:42 +00:00
|
|
|
allJobsOutRequest: make(chan allJobsOutRequest),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, option := range options {
|
|
|
|
|
err := option(s)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
go func() {
|
2023-11-28 12:48:22 +00:00
|
|
|
s.logger.Info("gocron: new scheduler created")
|
2023-11-08 17:11:42 +00:00
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case id := <-s.exec.jobIDsOut:
|
|
|
|
|
s.selectExecJobIDsOut(id)
|
|
|
|
|
|
|
|
|
|
case j := <-s.newJobCh:
|
|
|
|
|
s.selectNewJob(j)
|
|
|
|
|
|
|
|
|
|
case id := <-s.removeJobCh:
|
|
|
|
|
s.selectRemoveJob(id)
|
|
|
|
|
|
|
|
|
|
case tags := <-s.removeJobsByTagsCh:
|
|
|
|
|
s.selectRemoveJobsByTags(tags)
|
|
|
|
|
|
|
|
|
|
case out := <-s.exec.jobOutRequest:
|
|
|
|
|
s.selectJobOutRequest(out)
|
|
|
|
|
|
|
|
|
|
case out := <-s.jobOutRequestCh:
|
|
|
|
|
s.selectJobOutRequest(out)
|
|
|
|
|
|
|
|
|
|
case out := <-s.allJobsOutRequest:
|
|
|
|
|
s.selectAllJobsOutRequest(out)
|
|
|
|
|
|
2023-12-19 03:13:37 +00:00
|
|
|
case run := <-s.runJobRequestCh:
|
|
|
|
|
s.selectRunJobRequest(run)
|
|
|
|
|
|
2023-11-08 17:11:42 +00:00
|
|
|
case <-s.startCh:
|
|
|
|
|
s.selectStart()
|
|
|
|
|
|
|
|
|
|
case <-s.stopCh:
|
|
|
|
|
s.stopScheduler()
|
|
|
|
|
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
s.stopScheduler()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
return s, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
// --------- Scheduler Channel Methods -----------
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
|
|
|
|
|
// The scheduler's channel functions are broken out here
|
|
|
|
|
// to allow prioritizing within the select blocks. The idea
|
|
|
|
|
// being that we want to make sure that scheduling tasks
|
|
|
|
|
// are not blocked by requests from the caller for information
|
|
|
|
|
// about jobs.
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) stopScheduler() {
|
2023-11-28 12:48:22 +00:00
|
|
|
s.logger.Debug("gocron: stopping scheduler")
|
2023-11-08 17:11:42 +00:00
|
|
|
if s.started {
|
|
|
|
|
s.exec.stopCh <- struct{}{}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for _, j := range s.jobs {
|
|
|
|
|
j.stop()
|
|
|
|
|
}
|
|
|
|
|
for id, j := range s.jobs {
|
|
|
|
|
<-j.ctx.Done()
|
|
|
|
|
|
|
|
|
|
j.ctx, j.cancel = context.WithCancel(s.shutdownCtx)
|
|
|
|
|
s.jobs[id] = j
|
|
|
|
|
}
|
|
|
|
|
var err error
|
|
|
|
|
if s.started {
|
|
|
|
|
select {
|
|
|
|
|
case err = <-s.exec.done:
|
|
|
|
|
case <-time.After(s.exec.stopTimeout + 1*time.Second):
|
|
|
|
|
err = ErrStopExecutorTimedOut
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
s.stopErrCh <- err
|
|
|
|
|
s.started = false
|
2023-11-28 12:48:22 +00:00
|
|
|
s.logger.Debug("gocron: scheduler stopped")
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) selectAllJobsOutRequest(out allJobsOutRequest) {
|
|
|
|
|
outJobs := make([]Job, len(s.jobs))
|
|
|
|
|
var counter int
|
|
|
|
|
for _, j := range s.jobs {
|
|
|
|
|
outJobs[counter] = s.jobFromInternalJob(j)
|
|
|
|
|
counter++
|
|
|
|
|
}
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
case out.outChan <- outJobs:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-12-19 03:13:37 +00:00
|
|
|
func (s *scheduler) selectRunJobRequest(run runJobRequest) {
|
|
|
|
|
j, ok := s.jobs[run.id]
|
|
|
|
|
if !ok {
|
|
|
|
|
select {
|
|
|
|
|
case run.outChan <- ErrJobNotFound:
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
select {
|
|
|
|
|
case run.outChan <- ErrJobRunNowFailed:
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
case s.exec.jobsIn <- jobIn{
|
|
|
|
|
id: j.id,
|
|
|
|
|
shouldSendOut: false,
|
|
|
|
|
}:
|
|
|
|
|
select {
|
|
|
|
|
case run.outChan <- nil:
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-11-08 17:11:42 +00:00
|
|
|
func (s *scheduler) selectRemoveJob(id uuid.UUID) {
|
|
|
|
|
j, ok := s.jobs[id]
|
|
|
|
|
if !ok {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
j.stop()
|
|
|
|
|
delete(s.jobs, id)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) {
|
|
|
|
|
j := s.jobs[id]
|
|
|
|
|
j.lastRun = j.nextRun
|
|
|
|
|
|
|
|
|
|
if j.limitRunsTo != nil {
|
|
|
|
|
j.limitRunsTo.runCount = j.limitRunsTo.runCount + 1
|
|
|
|
|
if j.limitRunsTo.runCount == j.limitRunsTo.limit {
|
|
|
|
|
go func() {
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
return
|
|
|
|
|
case s.removeJobCh <- id:
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
next := j.next(j.lastRun)
|
2023-12-19 03:13:37 +00:00
|
|
|
if next.IsZero() {
|
|
|
|
|
return
|
|
|
|
|
}
|
2023-11-08 17:11:42 +00:00
|
|
|
j.nextRun = next
|
|
|
|
|
j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() {
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
return
|
2023-12-19 03:13:37 +00:00
|
|
|
case s.exec.jobsIn <- jobIn{
|
|
|
|
|
id: j.id,
|
|
|
|
|
shouldSendOut: true,
|
|
|
|
|
}:
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
s.jobs[id] = j
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) selectJobOutRequest(out jobOutRequest) {
|
|
|
|
|
if j, ok := s.jobs[out.id]; ok {
|
|
|
|
|
select {
|
|
|
|
|
case out.outChan <- j:
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
close(out.outChan)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) selectNewJob(j internalJob) {
|
|
|
|
|
if s.started {
|
|
|
|
|
next := j.startTime
|
|
|
|
|
if j.startImmediately {
|
|
|
|
|
next = s.now()
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
2023-12-19 03:13:37 +00:00
|
|
|
case s.exec.jobsIn <- jobIn{
|
|
|
|
|
id: j.id,
|
|
|
|
|
shouldSendOut: true,
|
|
|
|
|
}:
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
if next.IsZero() {
|
|
|
|
|
next = j.next(s.now())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
id := j.id
|
|
|
|
|
j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() {
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
2023-12-19 03:13:37 +00:00
|
|
|
case s.exec.jobsIn <- jobIn{
|
|
|
|
|
id: id,
|
|
|
|
|
shouldSendOut: true,
|
|
|
|
|
}:
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
j.nextRun = next
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
s.jobs[j.id] = j
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) selectRemoveJobsByTags(tags []string) {
|
|
|
|
|
for _, j := range s.jobs {
|
|
|
|
|
for _, tag := range tags {
|
|
|
|
|
if slices.Contains(j.tags, tag) {
|
|
|
|
|
j.stop()
|
|
|
|
|
delete(s.jobs, j.id)
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) selectStart() {
|
2023-11-28 12:48:22 +00:00
|
|
|
s.logger.Debug("gocron: scheduler starting")
|
2023-11-08 17:11:42 +00:00
|
|
|
go s.exec.start()
|
|
|
|
|
|
|
|
|
|
s.started = true
|
|
|
|
|
for id, j := range s.jobs {
|
|
|
|
|
next := j.startTime
|
|
|
|
|
if j.startImmediately {
|
|
|
|
|
next = s.now()
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
2023-12-19 03:13:37 +00:00
|
|
|
case s.exec.jobsIn <- jobIn{
|
|
|
|
|
id: id,
|
|
|
|
|
shouldSendOut: true,
|
|
|
|
|
}:
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
if next.IsZero() {
|
|
|
|
|
next = j.next(s.now())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
jobID := id
|
|
|
|
|
j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() {
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
2023-12-19 03:13:37 +00:00
|
|
|
case s.exec.jobsIn <- jobIn{
|
|
|
|
|
id: jobID,
|
|
|
|
|
shouldSendOut: true,
|
|
|
|
|
}:
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
j.nextRun = next
|
|
|
|
|
s.jobs[id] = j
|
|
|
|
|
}
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
case s.startedCh <- struct{}{}:
|
2023-11-28 12:48:22 +00:00
|
|
|
s.logger.Info("gocron: scheduler started")
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
// ------------- Scheduler Methods ---------------
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) now() time.Time {
|
|
|
|
|
return s.clock.Now().In(s.location)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) jobFromInternalJob(in internalJob) job {
|
|
|
|
|
return job{
|
|
|
|
|
id: in.id,
|
|
|
|
|
name: in.name,
|
|
|
|
|
tags: slices.Clone(in.tags),
|
|
|
|
|
jobOutRequest: s.jobOutRequestCh,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) Jobs() []Job {
|
|
|
|
|
outChan := make(chan []Job)
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
case s.allJobsOutRequest <- allJobsOutRequest{outChan: outChan}:
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var jobs []Job
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
case jobs = <-outChan:
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return jobs
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) NewJob(jobDefinition JobDefinition, task Task, options ...JobOption) (Job, error) {
|
|
|
|
|
return s.addOrUpdateJob(uuid.Nil, jobDefinition, task, options)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskWrapper Task, options []JobOption) (Job, error) {
|
|
|
|
|
j := internalJob{}
|
|
|
|
|
if id == uuid.Nil {
|
|
|
|
|
j.id = uuid.New()
|
|
|
|
|
} else {
|
|
|
|
|
currentJob := requestJobCtx(s.shutdownCtx, id, s.jobOutRequestCh)
|
|
|
|
|
if currentJob != nil && currentJob.id != uuid.Nil {
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
return nil, nil
|
|
|
|
|
case s.removeJobCh <- id:
|
|
|
|
|
<-currentJob.ctx.Done()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
j.id = id
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
j.ctx, j.cancel = context.WithCancel(s.shutdownCtx)
|
|
|
|
|
|
|
|
|
|
if taskWrapper == nil {
|
|
|
|
|
return nil, ErrNewJobTaskNil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tsk := taskWrapper()
|
|
|
|
|
taskFunc := reflect.ValueOf(tsk.function)
|
|
|
|
|
for taskFunc.Kind() == reflect.Ptr {
|
|
|
|
|
taskFunc = taskFunc.Elem()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if taskFunc.Kind() != reflect.Func {
|
2023-11-09 21:04:18 +00:00
|
|
|
return nil, ErrNewJobTaskNotFunc
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
|
2023-12-14 19:52:32 +00:00
|
|
|
expectedParameterLength := taskFunc.Type().NumIn()
|
|
|
|
|
if len(tsk.parameters) != expectedParameterLength {
|
|
|
|
|
return nil, ErrNewJobWrongNumberOfParameters
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for i := 0; i < expectedParameterLength; i++ {
|
|
|
|
|
t1 := reflect.TypeOf(tsk.parameters[i]).Kind()
|
|
|
|
|
if t1 == reflect.Interface || t1 == reflect.Pointer {
|
|
|
|
|
t1 = reflect.TypeOf(tsk.parameters[i]).Elem().Kind()
|
|
|
|
|
}
|
|
|
|
|
t2 := reflect.New(taskFunc.Type().In(i)).Elem().Kind()
|
|
|
|
|
if t2 == reflect.Interface || t2 == reflect.Pointer {
|
|
|
|
|
t2 = reflect.Indirect(reflect.ValueOf(taskFunc.Type().In(i))).Kind()
|
|
|
|
|
}
|
|
|
|
|
if t1 != t2 {
|
|
|
|
|
return nil, ErrNewJobWrongTypeOfParameters
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-11-14 15:56:05 +00:00
|
|
|
j.name = runtime.FuncForPC(taskFunc.Pointer()).Name()
|
2023-11-08 17:11:42 +00:00
|
|
|
j.function = tsk.function
|
|
|
|
|
j.parameters = tsk.parameters
|
|
|
|
|
|
|
|
|
|
// apply global job options
|
|
|
|
|
for _, option := range s.globalJobOptions {
|
|
|
|
|
if err := option(&j); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// apply job specific options, which take precedence
|
|
|
|
|
for _, option := range options {
|
|
|
|
|
if err := option(&j); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err := definition.setup(&j, s.location); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
case s.newJobCh <- j:
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return &job{
|
|
|
|
|
id: j.id,
|
|
|
|
|
name: j.name,
|
|
|
|
|
tags: slices.Clone(j.tags),
|
|
|
|
|
jobOutRequest: s.jobOutRequestCh,
|
2023-12-19 03:13:37 +00:00
|
|
|
runJobRequest: s.runJobRequestCh,
|
2023-11-08 17:11:42 +00:00
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) RemoveByTags(tags ...string) {
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
case s.removeJobsByTagsCh <- tags:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) RemoveJob(id uuid.UUID) error {
|
|
|
|
|
j := requestJobCtx(s.shutdownCtx, id, s.jobOutRequestCh)
|
|
|
|
|
if j == nil || j.id == uuid.Nil {
|
|
|
|
|
return ErrJobNotFound
|
|
|
|
|
}
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
case s.removeJobCh <- id:
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) Start() {
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
case s.startCh <- struct{}{}:
|
|
|
|
|
<-s.startedCh
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) StopJobs() error {
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
return nil
|
|
|
|
|
case s.stopCh <- struct{}{}:
|
|
|
|
|
}
|
|
|
|
|
select {
|
|
|
|
|
case err := <-s.stopErrCh:
|
|
|
|
|
return err
|
|
|
|
|
case <-time.After(s.exec.stopTimeout + 2*time.Second):
|
|
|
|
|
return ErrStopSchedulerTimedOut
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) Shutdown() error {
|
|
|
|
|
s.shutdownCancel()
|
|
|
|
|
select {
|
|
|
|
|
case err := <-s.stopErrCh:
|
|
|
|
|
return err
|
|
|
|
|
case <-time.After(s.exec.stopTimeout + 2*time.Second):
|
|
|
|
|
return ErrStopSchedulerTimedOut
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) Update(id uuid.UUID, jobDefinition JobDefinition, task Task, options ...JobOption) (Job, error) {
|
|
|
|
|
return s.addOrUpdateJob(id, jobDefinition, task, options)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
// ------------- Scheduler Options ---------------
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
|
|
|
|
|
// SchedulerOption defines the function for setting
// options on the Scheduler. NewScheduler applies the options
// in the order given and stops at the first one that returns
// an error.
type SchedulerOption func(*scheduler) error
|
|
|
|
|
|
2023-11-09 21:04:18 +00:00
|
|
|
// WithClock sets the clock used by the Scheduler
|
|
|
|
|
// to the clock provided. See https://github.com/jonboulle/clockwork
|
|
|
|
|
func WithClock(clock clockwork.Clock) SchedulerOption {
|
|
|
|
|
return func(s *scheduler) error {
|
|
|
|
|
if clock == nil {
|
|
|
|
|
return ErrWithClockNil
|
|
|
|
|
}
|
|
|
|
|
s.clock = clock
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-11-08 17:11:42 +00:00
|
|
|
// WithDistributedElector sets the elector to be used by multiple
|
|
|
|
|
// Scheduler instances to determine who should be the leader.
|
|
|
|
|
// Only the leader runs jobs, while non-leaders wait and continue
|
|
|
|
|
// to check if a new leader has been elected.
|
|
|
|
|
func WithDistributedElector(elector Elector) SchedulerOption {
|
|
|
|
|
return func(s *scheduler) error {
|
|
|
|
|
if elector == nil {
|
2023-11-09 21:04:18 +00:00
|
|
|
return ErrWithDistributedElectorNil
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
s.exec.elector = elector
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-11-14 15:56:05 +00:00
|
|
|
// WithDistributedLocker sets the locker to be used by multiple
|
|
|
|
|
// Scheduler instances to ensure that only one instance of each
|
|
|
|
|
// job is run.
|
|
|
|
|
func WithDistributedLocker(locker Locker) SchedulerOption {
|
|
|
|
|
return func(s *scheduler) error {
|
|
|
|
|
if locker == nil {
|
|
|
|
|
return ErrWithDistributedLockerNil
|
|
|
|
|
}
|
|
|
|
|
s.exec.locker = locker
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-11-08 17:11:42 +00:00
|
|
|
// WithGlobalJobOptions sets JobOption's that will be applied to
|
|
|
|
|
// all jobs added to the scheduler. JobOption's set on the job
|
|
|
|
|
// itself will override if the same JobOption is set globally.
|
|
|
|
|
func WithGlobalJobOptions(jobOptions ...JobOption) SchedulerOption {
|
|
|
|
|
return func(s *scheduler) error {
|
|
|
|
|
s.globalJobOptions = jobOptions
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// LimitMode defines the modes used for handling jobs that reach
// the limit provided in WithLimitConcurrentJobs.
type LimitMode int

const (
	// LimitModeReschedule causes jobs reaching the limit set in
	// WithLimitConcurrentJobs or WithSingletonMode to be skipped
	// and rescheduled for the next run time rather than being
	// queued up to wait.
	LimitModeReschedule = 1

	// LimitModeWait causes jobs reaching the limit set in
	// WithLimitConcurrentJobs or WithSingletonMode to wait
	// in a queue until a slot becomes available to run.
	//
	// Note: this mode can produce unpredictable results as
	// job execution order isn't guaranteed. For example, a job that
	// executes frequently may pile up in the wait queue and be executed
	// many times back to back when the queue opens.
	//
	// Warning: do not use this mode if your jobs will continue to stack
	// up beyond the ability of the limit workers to keep up. An example of
	// what NOT to do:
	//
	//	s, _ := gocron.NewScheduler(gocron.WithLimitConcurrentJobs(1, gocron.LimitModeWait))
	//	s.NewJob(
	//		gocron.DurationJob(
	//			time.Second,
	//			Task{
	//				Function: func() {
	//					time.Sleep(10 * time.Second)
	//				},
	//			},
	//		),
	//	)
	LimitModeWait = 2
)
|
|
|
|
|
|
|
|
|
|
// WithLimitConcurrentJobs sets the limit and mode to be used by the
|
|
|
|
|
// Scheduler for limiting the number of jobs that may be running at
|
|
|
|
|
// a given time.
|
|
|
|
|
func WithLimitConcurrentJobs(limit uint, mode LimitMode) SchedulerOption {
|
|
|
|
|
return func(s *scheduler) error {
|
2023-11-09 21:04:18 +00:00
|
|
|
if limit == 0 {
|
|
|
|
|
return ErrWithLimitConcurrentJobsZero
|
|
|
|
|
}
|
2023-11-08 17:11:42 +00:00
|
|
|
s.exec.limitMode = &limitModeConfig{
|
2023-11-28 12:48:22 +00:00
|
|
|
mode: mode,
|
|
|
|
|
limit: limit,
|
2023-12-19 03:13:37 +00:00
|
|
|
in: make(chan jobIn, 1000),
|
2023-11-28 12:48:22 +00:00
|
|
|
singletonJobs: make(map[uuid.UUID]struct{}),
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
if mode == LimitModeReschedule {
|
|
|
|
|
s.exec.limitMode.rescheduleLimiter = make(chan struct{}, limit)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// WithLocation sets the location (i.e. timezone) that the scheduler
|
|
|
|
|
// should operate within. In many systems time.Local is UTC.
|
|
|
|
|
// Default: time.Local
|
|
|
|
|
func WithLocation(location *time.Location) SchedulerOption {
|
|
|
|
|
return func(s *scheduler) error {
|
|
|
|
|
if location == nil {
|
|
|
|
|
return ErrWithLocationNil
|
|
|
|
|
}
|
|
|
|
|
s.location = location
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-11-22 12:43:50 +00:00
|
|
|
// WithLogger sets the logger to be used by the Scheduler.
|
2023-11-08 17:11:42 +00:00
|
|
|
func WithLogger(logger Logger) SchedulerOption {
|
|
|
|
|
return func(s *scheduler) error {
|
|
|
|
|
if logger == nil {
|
|
|
|
|
return ErrWithLoggerNil
|
|
|
|
|
}
|
|
|
|
|
s.logger = logger
|
|
|
|
|
s.exec.logger = logger
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// WithStopTimeout sets the amount of time the Scheduler should
|
|
|
|
|
// wait gracefully for jobs to complete before returning when
|
|
|
|
|
// StopJobs() or Shutdown() are called.
|
|
|
|
|
// Default: 10 * time.Second
|
|
|
|
|
func WithStopTimeout(timeout time.Duration) SchedulerOption {
|
|
|
|
|
return func(s *scheduler) error {
|
2023-11-09 21:04:18 +00:00
|
|
|
if timeout <= 0 {
|
|
|
|
|
return ErrWithStopTimeoutZeroOrNegative
|
|
|
|
|
}
|
2023-11-08 17:11:42 +00:00
|
|
|
s.exec.stopTimeout = timeout
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|