2023-12-11 16:39:59 +00:00
|
|
|
//go:generate mockgen -destination=mocks/scheduler.go -package=gocronmocks . Scheduler
|
2023-11-08 17:11:42 +00:00
|
|
|
package gocron
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"reflect"
|
2023-11-14 15:56:05 +00:00
|
|
|
"runtime"
|
2025-01-31 15:17:45 +00:00
|
|
|
"slices"
|
|
|
|
|
"strings"
|
2025-10-09 16:19:35 +00:00
|
|
|
"sync/atomic"
|
2023-11-08 17:11:42 +00:00
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/google/uuid"
|
|
|
|
|
"github.com/jonboulle/clockwork"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// Compile-time assertion that *scheduler satisfies the Scheduler interface.
var _ Scheduler = (*scheduler)(nil)
|
|
|
|
|
|
2023-11-22 12:43:50 +00:00
|
|
|
// Scheduler defines the interface for the Scheduler.
type Scheduler interface {
	// Jobs returns all the jobs currently in the scheduler,
	// sorted by job ID.
	Jobs() []Job
	// NewJob creates a new job in the Scheduler. The job is scheduled per the provided
	// definition when the Scheduler is started. If the Scheduler is already running
	// the job will be scheduled immediately.
	// If you set the first argument of your Task func to be a context.Context,
	// gocron will pass in a context (either the default Job context, or one
	// provided via WithContext) to the job and will cancel the context on shutdown.
	// This allows you to listen for and handle cancellation within your job.
	NewJob(JobDefinition, Task, ...JobOption) (Job, error)
	// RemoveByTags removes all jobs that have at least one of the provided tags.
	RemoveByTags(...string)
	// RemoveJob removes the job with the provided id.
	RemoveJob(uuid.UUID) error
	// Shutdown should be called when you no longer need
	// the Scheduler or its Jobs, as the Scheduler cannot
	// be restarted after calling Shutdown. This is similar
	// to a Close or Cleanup method and is often deferred after
	// starting the scheduler.
	Shutdown() error
	// Start begins scheduling jobs for execution based
	// on each job's definition. Jobs added to an already
	// running scheduler will be scheduled immediately based
	// on definition. Start is non-blocking.
	Start()
	// StopJobs stops the execution of all jobs in the scheduler.
	// This can be useful in situations where jobs need to be
	// paused globally and then restarted with Start().
	StopJobs() error
	// Update replaces the existing Job's JobDefinition with the provided
	// JobDefinition. The Job's Job.ID() remains the same.
	Update(uuid.UUID, JobDefinition, Task, ...JobOption) (Job, error)
	// JobsWaitingInQueue returns the number of jobs waiting in the queue
	// when LimitModeWait is in use. With LimitModeReschedule, or when no
	// limit is set, it is always zero.
	JobsWaitingInQueue() int
}
|
|
|
|
|
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
// ----------------- Scheduler -------------------
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
|
|
|
|
|
// scheduler is the package's implementation of the Scheduler interface.
// Its channel fields are serviced by the goroutine started in NewScheduler.
type scheduler struct {
	// context used for shutting down
	shutdownCtx context.Context
	// cancel used to signal scheduler should shut down
	shutdownCancel context.CancelFunc
	// the executor, which actually runs the jobs sent to it via the scheduler
	exec executor
	// the map of jobs registered in the scheduler
	jobs map[uuid.UUID]internalJob
	// the location used by the scheduler for scheduling when relevant
	location *time.Location
	// whether the scheduler has been started or not
	started atomic.Bool
	// globally applied JobOption's set on all jobs added to the scheduler
	// note: individually set JobOption's take precedence.
	globalJobOptions []JobOption
	// the scheduler's logger
	logger Logger

	// used to tell the scheduler to start
	startCh chan struct{}
	// used to report that the scheduler has started
	startedCh chan struct{}
	// used to tell the scheduler to stop
	stopCh chan struct{}
	// used to report that the scheduler has stopped
	stopErrCh chan error
	// used to send all the jobs out when a request is made by the client
	allJobsOutRequest chan allJobsOutRequest
	// used to send a single job out when a request is made by the client
	jobOutRequestCh chan *jobOutRequest
	// used to run a job on-demand when requested by the client
	runJobRequestCh chan runJobRequest
	// new jobs are received here
	newJobCh chan newJobIn
	// requests from the client to remove jobs by ID are received here
	removeJobCh chan uuid.UUID
	// requests from the client to remove jobs by tags are received here
	removeJobsByTagsCh chan []string

	// scheduler monitor from which metrics can be collected
	schedulerMonitor SchedulerMonitor
}
|
|
|
|
|
|
2024-01-17 17:48:25 +00:00
|
|
|
// newJobIn is the message sent on scheduler.newJobCh to register a job.
type newJobIn struct {
	// context paired with cancel
	// NOTE(review): presumably the sender waits on this context to learn that
	// the job was registered - confirm against the sending side.
	ctx context.Context
	// called by selectNewJob once the job has been stored in the scheduler
	cancel context.CancelFunc
	// the job to register
	job internalJob
}
|
|
|
|
|
|
2023-11-08 17:11:42 +00:00
|
|
|
// jobOutRequest asks the scheduler for the current state of a single job.
type jobOutRequest struct {
	// id of the requested job
	id uuid.UUID
	// receives the job when it exists; selectJobOutRequest closes the
	// channel after handling the request, whether or not a job was sent
	outChan chan internalJob
}
|
|
|
|
|
|
2023-12-19 03:13:37 +00:00
|
|
|
// runJobRequest asks the scheduler to run a job on demand.
type runJobRequest struct {
	// id of the job to run
	id uuid.UUID
	// receives the outcome of the request: nil once the job was handed to
	// the executor, ErrJobNotFound or ErrJobRunNowFailed otherwise. The
	// scheduler sends without blocking, so an unready channel gets nothing.
	outChan chan error
}
|
|
|
|
|
|
2023-11-08 17:11:42 +00:00
|
|
|
// allJobsOutRequest asks the scheduler for a snapshot of all registered jobs.
type allJobsOutRequest struct {
	// receives the jobs, sorted by ID
	outChan chan []Job
}
|
|
|
|
|
|
2023-11-22 12:43:50 +00:00
|
|
|
// NewScheduler creates a new Scheduler instance.
|
|
|
|
|
// The Scheduler is not started until Start() is called.
|
|
|
|
|
//
|
|
|
|
|
// NewJob will add jobs to the Scheduler, but they will not
|
|
|
|
|
// be scheduled until Start() is called.
|
2023-11-08 17:11:42 +00:00
|
|
|
func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
|
|
|
|
|
schCtx, cancel := context.WithCancel(context.Background())
|
|
|
|
|
|
|
|
|
|
exec := executor{
|
|
|
|
|
stopCh: make(chan struct{}),
|
|
|
|
|
stopTimeout: time.Second * 10,
|
2024-01-30 21:50:10 +00:00
|
|
|
singletonRunners: nil,
|
2023-11-08 17:11:42 +00:00
|
|
|
logger: &noOpLogger{},
|
2024-07-18 16:32:56 +00:00
|
|
|
clock: clockwork.NewRealClock(),
|
2023-11-08 17:11:42 +00:00
|
|
|
|
2024-03-26 14:55:21 +00:00
|
|
|
jobsIn: make(chan jobIn),
|
|
|
|
|
jobsOutForRescheduling: make(chan uuid.UUID),
|
2025-02-17 21:43:46 +00:00
|
|
|
jobUpdateNextRuns: make(chan uuid.UUID),
|
2024-03-26 14:55:21 +00:00
|
|
|
jobsOutCompleted: make(chan uuid.UUID),
|
2025-08-27 16:03:34 +00:00
|
|
|
jobOutRequest: make(chan *jobOutRequest, 100),
|
2025-02-26 15:28:43 +00:00
|
|
|
done: make(chan error, 1),
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
s := &scheduler{
|
|
|
|
|
shutdownCtx: schCtx,
|
|
|
|
|
shutdownCancel: cancel,
|
|
|
|
|
jobs: make(map[uuid.UUID]internalJob),
|
|
|
|
|
location: time.Local,
|
|
|
|
|
logger: &noOpLogger{},
|
|
|
|
|
|
2024-01-17 17:48:25 +00:00
|
|
|
newJobCh: make(chan newJobIn),
|
2023-11-08 17:11:42 +00:00
|
|
|
removeJobCh: make(chan uuid.UUID),
|
|
|
|
|
removeJobsByTagsCh: make(chan []string),
|
|
|
|
|
startCh: make(chan struct{}),
|
|
|
|
|
startedCh: make(chan struct{}),
|
|
|
|
|
stopCh: make(chan struct{}),
|
|
|
|
|
stopErrCh: make(chan error, 1),
|
2025-08-27 16:03:34 +00:00
|
|
|
jobOutRequestCh: make(chan *jobOutRequest),
|
2023-12-19 03:13:37 +00:00
|
|
|
runJobRequestCh: make(chan runJobRequest),
|
2023-11-08 17:11:42 +00:00
|
|
|
allJobsOutRequest: make(chan allJobsOutRequest),
|
|
|
|
|
}
|
2025-12-02 16:25:51 +00:00
|
|
|
exec.scheduler = s
|
|
|
|
|
s.exec = exec
|
2023-11-08 17:11:42 +00:00
|
|
|
|
|
|
|
|
for _, option := range options {
|
|
|
|
|
err := option(s)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
go func() {
|
2023-11-28 12:48:22 +00:00
|
|
|
s.logger.Info("gocron: new scheduler created")
|
2023-11-08 17:11:42 +00:00
|
|
|
for {
|
|
|
|
|
select {
|
2024-03-26 14:55:21 +00:00
|
|
|
case id := <-s.exec.jobsOutForRescheduling:
|
|
|
|
|
s.selectExecJobsOutForRescheduling(id)
|
2025-02-17 21:43:46 +00:00
|
|
|
case id := <-s.exec.jobUpdateNextRuns:
|
|
|
|
|
s.updateNextScheduled(id)
|
2024-03-26 14:55:21 +00:00
|
|
|
case id := <-s.exec.jobsOutCompleted:
|
|
|
|
|
s.selectExecJobsOutCompleted(id)
|
2023-11-08 17:11:42 +00:00
|
|
|
|
2024-01-17 17:48:25 +00:00
|
|
|
case in := <-s.newJobCh:
|
|
|
|
|
s.selectNewJob(in)
|
2023-11-08 17:11:42 +00:00
|
|
|
|
|
|
|
|
case id := <-s.removeJobCh:
|
|
|
|
|
s.selectRemoveJob(id)
|
|
|
|
|
|
|
|
|
|
case tags := <-s.removeJobsByTagsCh:
|
|
|
|
|
s.selectRemoveJobsByTags(tags)
|
|
|
|
|
|
|
|
|
|
case out := <-s.exec.jobOutRequest:
|
|
|
|
|
s.selectJobOutRequest(out)
|
|
|
|
|
|
|
|
|
|
case out := <-s.jobOutRequestCh:
|
|
|
|
|
s.selectJobOutRequest(out)
|
|
|
|
|
|
|
|
|
|
case out := <-s.allJobsOutRequest:
|
|
|
|
|
s.selectAllJobsOutRequest(out)
|
|
|
|
|
|
2023-12-19 03:13:37 +00:00
|
|
|
case run := <-s.runJobRequestCh:
|
|
|
|
|
s.selectRunJobRequest(run)
|
|
|
|
|
|
2023-11-08 17:11:42 +00:00
|
|
|
case <-s.startCh:
|
|
|
|
|
s.selectStart()
|
|
|
|
|
|
|
|
|
|
case <-s.stopCh:
|
|
|
|
|
s.stopScheduler()
|
|
|
|
|
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
s.stopScheduler()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
return s, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
// --------- Scheduler Channel Methods -----------
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
|
|
|
|
|
// The scheduler's channel functions are broken out here
|
|
|
|
|
// to allow prioritizing within the select blocks. The idea
|
|
|
|
|
// being that we want to make sure that scheduling tasks
|
|
|
|
|
// are not blocked by requests from the caller for information
|
|
|
|
|
// about jobs.
|
|
|
|
|
|
|
|
|
|
// stopScheduler stops the executor (when the scheduler was started), stops
// every registered job, waits for the executor to report completion and then
// gives each job a fresh context. The resulting error (nil on success) is
// sent on s.stopErrCh and the scheduler is marked as not started.
func (s *scheduler) stopScheduler() {
	s.logger.Debug("gocron: stopping scheduler")

	// the executor is only signalled when the scheduler has been started
	if s.started.Load() {
		s.exec.stopCh <- struct{}{}
	}

	for _, j := range s.jobs {
		j.stop()
	}
	// wait until every job's context reports done
	for _, j := range s.jobs {
		<-j.ctx.Done()
	}
	var err error
	if s.started.Load() {
		// wait for the executor's result, allowing one second on top of the
		// executor's own stop timeout before giving up
		t := time.NewTimer(s.exec.stopTimeout + 1*time.Second)
		select {
		case err = <-s.exec.done:
			t.Stop()
		case <-t.C:
			err = ErrStopExecutorTimedOut
		}
	}
	// give every job a new context derived from its parent context
	// (the scheduler's shutdown context when no parent was set)
	// NOTE(review): presumably so the jobs can run again after a later Start - confirm.
	for id, j := range s.jobs {
		oldCtx := j.ctx
		if j.parentCtx == nil {
			j.parentCtx = s.shutdownCtx
		}
		j.ctx, j.cancel = context.WithCancel(j.parentCtx)

		// also replace the old context with the new one in the parameters
		if len(j.parameters) > 0 && j.parameters[0] == oldCtx {
			j.parameters[0] = j.ctx
		}

		s.jobs[id] = j
	}

	s.stopErrCh <- err
	s.started.Store(false)
	s.logger.Debug("gocron: scheduler stopped")

	// Notify monitor that scheduler has stopped
	s.notifySchedulerStopped()
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) selectAllJobsOutRequest(out allJobsOutRequest) {
|
|
|
|
|
outJobs := make([]Job, len(s.jobs))
|
|
|
|
|
var counter int
|
|
|
|
|
for _, j := range s.jobs {
|
|
|
|
|
outJobs[counter] = s.jobFromInternalJob(j)
|
|
|
|
|
counter++
|
|
|
|
|
}
|
2024-01-02 16:47:01 +00:00
|
|
|
slices.SortFunc(outJobs, func(a, b Job) int {
|
|
|
|
|
aID, bID := a.ID().String(), b.ID().String()
|
2025-01-31 15:17:45 +00:00
|
|
|
return strings.Compare(aID, bID)
|
2024-01-02 16:47:01 +00:00
|
|
|
})
|
2023-11-08 17:11:42 +00:00
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
case out.outChan <- outJobs:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-12-19 03:13:37 +00:00
|
|
|
func (s *scheduler) selectRunJobRequest(run runJobRequest) {
|
|
|
|
|
j, ok := s.jobs[run.id]
|
|
|
|
|
if !ok {
|
|
|
|
|
select {
|
|
|
|
|
case run.outChan <- ErrJobNotFound:
|
|
|
|
|
default:
|
|
|
|
|
}
|
2025-09-17 15:06:55 +00:00
|
|
|
return
|
2023-12-19 03:13:37 +00:00
|
|
|
}
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
select {
|
|
|
|
|
case run.outChan <- ErrJobRunNowFailed:
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
case s.exec.jobsIn <- jobIn{
|
|
|
|
|
id: j.id,
|
|
|
|
|
shouldSendOut: false,
|
|
|
|
|
}:
|
|
|
|
|
select {
|
|
|
|
|
case run.outChan <- nil:
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-11-08 17:11:42 +00:00
|
|
|
func (s *scheduler) selectRemoveJob(id uuid.UUID) {
|
|
|
|
|
j, ok := s.jobs[id]
|
|
|
|
|
if !ok {
|
|
|
|
|
return
|
|
|
|
|
}
|
2025-12-02 16:25:51 +00:00
|
|
|
if s.schedulerMonitor != nil {
|
|
|
|
|
out := s.jobFromInternalJob(j)
|
|
|
|
|
s.notifyJobUnregistered(out)
|
|
|
|
|
}
|
2023-11-08 17:11:42 +00:00
|
|
|
j.stop()
|
|
|
|
|
delete(s.jobs, id)
|
|
|
|
|
}
|
|
|
|
|
|
2024-01-02 16:32:14 +00:00
|
|
|
// Jobs coming back from the executor to the scheduler that
|
2024-07-24 15:24:54 +00:00
|
|
|
// need to be evaluated for rescheduling.
|
2024-03-26 14:55:21 +00:00
|
|
|
func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
|
2024-05-02 16:35:57 +00:00
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
return
|
|
|
|
|
default:
|
|
|
|
|
}
|
2024-03-23 18:57:09 +00:00
|
|
|
j, ok := s.jobs[id]
|
|
|
|
|
if !ok {
|
|
|
|
|
// the job was removed while it was running, and
|
|
|
|
|
// so we don't need to reschedule it.
|
|
|
|
|
return
|
|
|
|
|
}
|
2024-07-09 14:39:51 +00:00
|
|
|
|
2024-07-19 18:10:28 +00:00
|
|
|
if j.stopTimeReached(s.now()) {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
2025-01-29 03:22:56 +00:00
|
|
|
var scheduleFrom time.Time
|
2023-11-08 17:11:42 +00:00
|
|
|
|
2025-10-22 03:09:46 +00:00
|
|
|
// If intervalFromCompletion is enabled, calculate the next run time
|
|
|
|
|
// from when the job completed (lastRun) rather than when it was scheduled.
|
|
|
|
|
if j.intervalFromCompletion {
|
|
|
|
|
// Use the completion time (lastRun is set when the job completes)
|
|
|
|
|
scheduleFrom = j.lastRun
|
|
|
|
|
if scheduleFrom.IsZero() {
|
|
|
|
|
// For the first run, use the start time or current time
|
|
|
|
|
scheduleFrom = j.startTime
|
|
|
|
|
if scheduleFrom.IsZero() {
|
|
|
|
|
scheduleFrom = s.now()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
// Default behavior: use the scheduled time
|
|
|
|
|
if len(j.nextScheduled) > 0 {
|
|
|
|
|
// always grab the last element in the slice as that is the furthest
|
|
|
|
|
// out in the future and the time from which we want to calculate
|
|
|
|
|
// the subsequent next run time.
|
|
|
|
|
slices.SortStableFunc(j.nextScheduled, ascendingTime)
|
|
|
|
|
scheduleFrom = j.nextScheduled[len(j.nextScheduled)-1]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if scheduleFrom.IsZero() {
|
|
|
|
|
scheduleFrom = j.startTime
|
|
|
|
|
}
|
2024-07-09 14:39:51 +00:00
|
|
|
}
|
|
|
|
|
|
2024-04-06 01:56:22 +00:00
|
|
|
next := j.next(scheduleFrom)
|
2023-12-19 03:13:37 +00:00
|
|
|
if next.IsZero() {
|
2024-01-02 16:32:14 +00:00
|
|
|
// the job's next function will return zero for OneTime jobs.
|
|
|
|
|
// since they are one time only, they do not need rescheduling.
|
2023-12-19 03:13:37 +00:00
|
|
|
return
|
|
|
|
|
}
|
2024-07-09 14:39:51 +00:00
|
|
|
|
2024-01-02 16:32:14 +00:00
|
|
|
if next.Before(s.now()) {
|
|
|
|
|
// in some cases the next run time can be in the past, for example:
|
|
|
|
|
// - the time on the machine was incorrect and has been synced with ntp
|
|
|
|
|
// - the machine went to sleep, and woke up some time later
|
|
|
|
|
// in those cases, we want to increment to the next run in the future
|
|
|
|
|
// and schedule the job for that time.
|
|
|
|
|
for next.Before(s.now()) {
|
|
|
|
|
next = j.next(next)
|
|
|
|
|
}
|
|
|
|
|
}
|
2024-09-16 14:57:39 +00:00
|
|
|
|
2025-01-29 03:22:56 +00:00
|
|
|
if slices.Contains(j.nextScheduled, next) {
|
|
|
|
|
// if the next value is a duplicate of what's already in the nextScheduled slice, for example:
|
|
|
|
|
// - the job is being rescheduled off the same next run value as before
|
|
|
|
|
// increment to the next, next value
|
|
|
|
|
for slices.Contains(j.nextScheduled, next) {
|
|
|
|
|
next = j.next(next)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2024-09-16 14:57:39 +00:00
|
|
|
// Clean up any existing timer to prevent leaks
|
|
|
|
|
if j.timer != nil {
|
|
|
|
|
j.timer.Stop()
|
|
|
|
|
j.timer = nil // Ensure timer is cleared for GC
|
|
|
|
|
}
|
|
|
|
|
|
2024-04-06 01:56:22 +00:00
|
|
|
j.nextScheduled = append(j.nextScheduled, next)
|
2024-07-18 16:32:56 +00:00
|
|
|
j.timer = s.exec.clock.AfterFunc(next.Sub(s.now()), func() {
|
2024-01-02 16:32:14 +00:00
|
|
|
// set the actual timer on the job here and listen for
|
|
|
|
|
// shut down events so that the job doesn't attempt to
|
|
|
|
|
// run if the scheduler has been shutdown.
|
2023-11-08 17:11:42 +00:00
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
return
|
2023-12-19 03:13:37 +00:00
|
|
|
case s.exec.jobsIn <- jobIn{
|
|
|
|
|
id: j.id,
|
|
|
|
|
shouldSendOut: true,
|
|
|
|
|
}:
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
})
|
2024-01-02 16:32:14 +00:00
|
|
|
// update the job with its new next and last run times and timer.
|
2023-11-08 17:11:42 +00:00
|
|
|
s.jobs[id] = j
|
|
|
|
|
}
|
|
|
|
|
|
2025-02-17 21:43:46 +00:00
|
|
|
func (s *scheduler) updateNextScheduled(id uuid.UUID) {
|
|
|
|
|
j, ok := s.jobs[id]
|
|
|
|
|
if !ok {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
var newNextScheduled []time.Time
|
2026-01-28 15:43:58 +00:00
|
|
|
now := s.now()
|
2025-02-17 21:43:46 +00:00
|
|
|
for _, t := range j.nextScheduled {
|
2026-01-28 15:43:58 +00:00
|
|
|
if t.After(now) { // Changed to match selectExecJobsOutCompleted
|
|
|
|
|
newNextScheduled = append(newNextScheduled, t)
|
2025-02-17 21:43:46 +00:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
j.nextScheduled = newNextScheduled
|
|
|
|
|
s.jobs[id] = j
|
|
|
|
|
}
|
|
|
|
|
|
2024-03-26 14:55:21 +00:00
|
|
|
func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) {
|
|
|
|
|
j, ok := s.jobs[id]
|
|
|
|
|
if !ok {
|
|
|
|
|
return
|
|
|
|
|
}
|
2024-03-26 21:29:25 +00:00
|
|
|
|
2024-06-21 14:17:34 +00:00
|
|
|
// if the job has nextScheduled time in the past,
|
2026-01-28 15:43:58 +00:00
|
|
|
// we need to remove any that are in the past or at the current time (just executed).
|
2024-06-21 14:17:34 +00:00
|
|
|
var newNextScheduled []time.Time
|
2026-01-28 15:43:58 +00:00
|
|
|
now := s.now()
|
2024-06-21 14:17:34 +00:00
|
|
|
for _, t := range j.nextScheduled {
|
2026-01-28 15:43:58 +00:00
|
|
|
if t.After(now) {
|
|
|
|
|
newNextScheduled = append(newNextScheduled, t)
|
2024-04-06 01:56:22 +00:00
|
|
|
}
|
|
|
|
|
}
|
2024-06-21 14:17:34 +00:00
|
|
|
j.nextScheduled = newNextScheduled
|
2024-04-06 01:56:22 +00:00
|
|
|
|
2024-03-26 21:29:25 +00:00
|
|
|
// if the job has a limited number of runs set, we need to
|
|
|
|
|
// check how many runs have occurred and stop running this
|
|
|
|
|
// job if it has reached the limit.
|
|
|
|
|
if j.limitRunsTo != nil {
|
|
|
|
|
j.limitRunsTo.runCount = j.limitRunsTo.runCount + 1
|
|
|
|
|
if j.limitRunsTo.runCount == j.limitRunsTo.limit {
|
|
|
|
|
go func() {
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
return
|
|
|
|
|
case s.removeJobCh <- id:
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2024-03-26 14:55:21 +00:00
|
|
|
j.lastRun = s.now()
|
|
|
|
|
s.jobs[id] = j
|
|
|
|
|
}
|
|
|
|
|
|
2025-08-27 16:03:34 +00:00
|
|
|
func (s *scheduler) selectJobOutRequest(out *jobOutRequest) {
|
2023-11-08 17:11:42 +00:00
|
|
|
if j, ok := s.jobs[out.id]; ok {
|
|
|
|
|
select {
|
|
|
|
|
case out.outChan <- j:
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
close(out.outChan)
|
|
|
|
|
}
|
|
|
|
|
|
2024-01-17 17:48:25 +00:00
|
|
|
func (s *scheduler) selectNewJob(in newJobIn) {
|
|
|
|
|
j := in.job
|
2025-10-09 16:19:35 +00:00
|
|
|
if s.started.Load() {
|
2023-11-08 17:11:42 +00:00
|
|
|
next := j.startTime
|
|
|
|
|
if j.startImmediately {
|
|
|
|
|
next = s.now()
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
2023-12-19 03:13:37 +00:00
|
|
|
case s.exec.jobsIn <- jobIn{
|
|
|
|
|
id: j.id,
|
|
|
|
|
shouldSendOut: true,
|
|
|
|
|
}:
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
if next.IsZero() {
|
|
|
|
|
next = j.next(s.now())
|
|
|
|
|
}
|
|
|
|
|
|
2025-11-24 16:08:17 +00:00
|
|
|
if next.Before(s.now()) {
|
|
|
|
|
for next.Before(s.now()) {
|
|
|
|
|
next = j.next(next)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-11-08 17:11:42 +00:00
|
|
|
id := j.id
|
2024-07-18 16:32:56 +00:00
|
|
|
j.timer = s.exec.clock.AfterFunc(next.Sub(s.now()), func() {
|
2023-11-08 17:11:42 +00:00
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
2023-12-19 03:13:37 +00:00
|
|
|
case s.exec.jobsIn <- jobIn{
|
|
|
|
|
id: id,
|
|
|
|
|
shouldSendOut: true,
|
|
|
|
|
}:
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
}
|
2024-07-09 14:39:51 +00:00
|
|
|
j.startTime = next
|
2024-04-06 01:56:22 +00:00
|
|
|
j.nextScheduled = append(j.nextScheduled, next)
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
s.jobs[j.id] = j
|
2024-01-17 17:48:25 +00:00
|
|
|
in.cancel()
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) selectRemoveJobsByTags(tags []string) {
|
|
|
|
|
for _, j := range s.jobs {
|
|
|
|
|
for _, tag := range tags {
|
|
|
|
|
if slices.Contains(j.tags, tag) {
|
2025-12-02 16:25:51 +00:00
|
|
|
if s.schedulerMonitor != nil {
|
|
|
|
|
out := s.jobFromInternalJob(j)
|
|
|
|
|
s.notifyJobUnregistered(out)
|
|
|
|
|
}
|
2023-11-08 17:11:42 +00:00
|
|
|
j.stop()
|
|
|
|
|
delete(s.jobs, j.id)
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) selectStart() {
|
2023-11-28 12:48:22 +00:00
|
|
|
s.logger.Debug("gocron: scheduler starting")
|
2023-11-08 17:11:42 +00:00
|
|
|
go s.exec.start()
|
|
|
|
|
|
2025-10-09 16:19:35 +00:00
|
|
|
s.started.Store(true)
|
2023-11-08 17:11:42 +00:00
|
|
|
for id, j := range s.jobs {
|
|
|
|
|
next := j.startTime
|
|
|
|
|
if j.startImmediately {
|
|
|
|
|
next = s.now()
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
2023-12-19 03:13:37 +00:00
|
|
|
case s.exec.jobsIn <- jobIn{
|
|
|
|
|
id: id,
|
|
|
|
|
shouldSendOut: true,
|
|
|
|
|
}:
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
if next.IsZero() {
|
|
|
|
|
next = j.next(s.now())
|
|
|
|
|
}
|
2025-11-24 16:08:17 +00:00
|
|
|
if next.Before(s.now()) {
|
|
|
|
|
for next.Before(s.now()) {
|
|
|
|
|
next = j.next(next)
|
|
|
|
|
}
|
|
|
|
|
}
|
2023-11-08 17:11:42 +00:00
|
|
|
|
|
|
|
|
jobID := id
|
2024-07-18 16:32:56 +00:00
|
|
|
j.timer = s.exec.clock.AfterFunc(next.Sub(s.now()), func() {
|
2023-11-08 17:11:42 +00:00
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
2023-12-19 03:13:37 +00:00
|
|
|
case s.exec.jobsIn <- jobIn{
|
|
|
|
|
id: jobID,
|
|
|
|
|
shouldSendOut: true,
|
|
|
|
|
}:
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
}
|
2024-07-09 14:39:51 +00:00
|
|
|
j.startTime = next
|
2024-04-06 01:56:22 +00:00
|
|
|
j.nextScheduled = append(j.nextScheduled, next)
|
2023-11-08 17:11:42 +00:00
|
|
|
s.jobs[id] = j
|
|
|
|
|
}
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
case s.startedCh <- struct{}{}:
|
2023-11-28 12:48:22 +00:00
|
|
|
s.logger.Info("gocron: scheduler started")
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
// ------------- Scheduler Methods ---------------
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) now() time.Time {
|
2024-07-18 16:32:56 +00:00
|
|
|
return s.exec.clock.Now().In(s.location)
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) jobFromInternalJob(in internalJob) job {
|
|
|
|
|
return job{
|
2024-02-02 15:46:08 +00:00
|
|
|
in.id,
|
|
|
|
|
in.name,
|
|
|
|
|
slices.Clone(in.tags),
|
|
|
|
|
s.jobOutRequestCh,
|
|
|
|
|
s.runJobRequestCh,
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) Jobs() []Job {
|
|
|
|
|
outChan := make(chan []Job)
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
case s.allJobsOutRequest <- allJobsOutRequest{outChan: outChan}:
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var jobs []Job
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
case jobs = <-outChan:
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return jobs
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) NewJob(jobDefinition JobDefinition, task Task, options ...JobOption) (Job, error) {
|
|
|
|
|
return s.addOrUpdateJob(uuid.Nil, jobDefinition, task, options)
|
|
|
|
|
}
|
|
|
|
|
|
2024-07-17 11:56:17 +00:00
|
|
|
func (s *scheduler) verifyInterfaceVariadic(taskFunc reflect.Value, tsk task, variadicStart int) error {
|
|
|
|
|
ifaceType := taskFunc.Type().In(variadicStart).Elem()
|
|
|
|
|
for i := variadicStart; i < len(tsk.parameters); i++ {
|
|
|
|
|
if !reflect.TypeOf(tsk.parameters[i]).Implements(ifaceType) {
|
|
|
|
|
return ErrNewJobWrongTypeOfParameters
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2024-07-12 02:24:08 +00:00
|
|
|
func (s *scheduler) verifyVariadic(taskFunc reflect.Value, tsk task, variadicStart int) error {
|
|
|
|
|
if err := s.verifyNonVariadic(taskFunc, tsk, variadicStart); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
2025-09-29 03:35:39 +00:00
|
|
|
parameterType := taskFunc.Type().In(variadicStart)
|
|
|
|
|
parameterTypeKind := parameterType.Elem().Kind()
|
|
|
|
|
if parameterTypeKind == reflect.Interface {
|
2024-07-17 11:56:17 +00:00
|
|
|
return s.verifyInterfaceVariadic(taskFunc, tsk, variadicStart)
|
|
|
|
|
}
|
2025-09-29 03:35:39 +00:00
|
|
|
if parameterTypeKind == reflect.Pointer {
|
|
|
|
|
parameterTypeKind = reflect.Indirect(reflect.ValueOf(parameterType)).Kind()
|
2024-07-12 02:24:08 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for i := variadicStart; i < len(tsk.parameters); i++ {
|
2025-09-29 03:35:39 +00:00
|
|
|
argumentType := reflect.TypeOf(tsk.parameters[i])
|
|
|
|
|
argumentTypeKind := argumentType.Kind()
|
|
|
|
|
if argumentTypeKind == reflect.Interface || argumentTypeKind == reflect.Pointer {
|
|
|
|
|
argumentTypeKind = argumentType.Elem().Kind()
|
2024-07-12 02:24:08 +00:00
|
|
|
}
|
2025-09-29 03:35:39 +00:00
|
|
|
if argumentTypeKind != parameterTypeKind {
|
2024-07-12 02:24:08 +00:00
|
|
|
return ErrNewJobWrongTypeOfParameters
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) verifyNonVariadic(taskFunc reflect.Value, tsk task, length int) error {
|
|
|
|
|
for i := 0; i < length; i++ {
|
2025-09-29 03:35:39 +00:00
|
|
|
argumentType := reflect.TypeOf(tsk.parameters[i])
|
|
|
|
|
t1 := argumentType.Kind()
|
2024-07-12 02:24:08 +00:00
|
|
|
if t1 == reflect.Interface || t1 == reflect.Pointer {
|
2025-09-29 03:35:39 +00:00
|
|
|
t1 = argumentType.Elem().Kind()
|
2024-07-12 02:24:08 +00:00
|
|
|
}
|
2025-09-29 03:35:39 +00:00
|
|
|
parameterType := taskFunc.Type().In(i)
|
|
|
|
|
t2 := reflect.New(parameterType).Elem().Kind()
|
2024-07-12 02:24:08 +00:00
|
|
|
if t2 == reflect.Interface || t2 == reflect.Pointer {
|
2025-09-29 03:35:39 +00:00
|
|
|
t2 = reflect.Indirect(reflect.ValueOf(parameterType)).Kind()
|
2024-07-12 02:24:08 +00:00
|
|
|
}
|
|
|
|
|
if t1 != t2 {
|
|
|
|
|
return ErrNewJobWrongTypeOfParameters
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) verifyParameterType(taskFunc reflect.Value, tsk task) error {
|
2025-09-29 03:35:39 +00:00
|
|
|
taskFuncType := taskFunc.Type()
|
|
|
|
|
isVariadic := taskFuncType.IsVariadic()
|
2024-07-12 02:24:08 +00:00
|
|
|
if isVariadic {
|
2025-09-29 03:35:39 +00:00
|
|
|
variadicStart := taskFuncType.NumIn() - 1
|
2024-07-12 02:24:08 +00:00
|
|
|
return s.verifyVariadic(taskFunc, tsk, variadicStart)
|
|
|
|
|
}
|
2025-09-29 03:35:39 +00:00
|
|
|
expectedParameterLength := taskFuncType.NumIn()
|
2024-07-12 02:24:08 +00:00
|
|
|
if len(tsk.parameters) != expectedParameterLength {
|
|
|
|
|
return ErrNewJobWrongNumberOfParameters
|
|
|
|
|
}
|
|
|
|
|
return s.verifyNonVariadic(taskFunc, tsk, expectedParameterLength)
|
|
|
|
|
}
|
|
|
|
|
|
2025-12-02 16:25:51 +00:00
|
|
|
// contextType is the reflect.Type of the context.Context interface. It is
// compared with the type of a task function's first parameter to detect
// tasks that accept a context.
var contextType = reflect.TypeOf((*context.Context)(nil)).Elem()
|
2025-09-29 03:35:39 +00:00
|
|
|
|
2023-11-08 17:11:42 +00:00
|
|
|
func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskWrapper Task, options []JobOption) (Job, error) {
|
|
|
|
|
j := internalJob{}
|
|
|
|
|
if id == uuid.Nil {
|
|
|
|
|
j.id = uuid.New()
|
|
|
|
|
} else {
|
|
|
|
|
currentJob := requestJobCtx(s.shutdownCtx, id, s.jobOutRequestCh)
|
|
|
|
|
if currentJob != nil && currentJob.id != uuid.Nil {
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
return nil, nil
|
|
|
|
|
case s.removeJobCh <- id:
|
|
|
|
|
<-currentJob.ctx.Done()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
j.id = id
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if taskWrapper == nil {
|
|
|
|
|
return nil, ErrNewJobTaskNil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tsk := taskWrapper()
|
|
|
|
|
taskFunc := reflect.ValueOf(tsk.function)
|
|
|
|
|
for taskFunc.Kind() == reflect.Ptr {
|
|
|
|
|
taskFunc = taskFunc.Elem()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if taskFunc.Kind() != reflect.Func {
|
2023-11-09 21:04:18 +00:00
|
|
|
return nil, ErrNewJobTaskNotFunc
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
|
2023-11-14 15:56:05 +00:00
|
|
|
j.name = runtime.FuncForPC(taskFunc.Pointer()).Name()
|
2023-11-08 17:11:42 +00:00
|
|
|
j.function = tsk.function
|
|
|
|
|
j.parameters = tsk.parameters
|
|
|
|
|
|
|
|
|
|
// apply global job options
|
|
|
|
|
for _, option := range s.globalJobOptions {
|
2024-07-18 16:32:56 +00:00
|
|
|
if err := option(&j, s.now()); err != nil {
|
2023-11-08 17:11:42 +00:00
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// apply job specific options, which take precedence
|
|
|
|
|
for _, option := range options {
|
2024-07-18 16:32:56 +00:00
|
|
|
if err := option(&j, s.now()); err != nil {
|
2023-11-08 17:11:42 +00:00
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2025-01-23 18:27:28 +00:00
|
|
|
if j.parentCtx == nil {
|
|
|
|
|
j.parentCtx = s.shutdownCtx
|
|
|
|
|
}
|
|
|
|
|
j.ctx, j.cancel = context.WithCancel(j.parentCtx)
|
|
|
|
|
|
|
|
|
|
if !taskFunc.IsZero() && taskFunc.Type().NumIn() > 0 {
|
|
|
|
|
// if the first parameter is a context.Context and params have no context.Context, add current ctx to the params
|
2025-09-29 03:35:39 +00:00
|
|
|
if taskFunc.Type().In(0) == contextType {
|
2025-01-23 18:27:28 +00:00
|
|
|
if len(tsk.parameters) == 0 {
|
|
|
|
|
tsk.parameters = []any{j.ctx}
|
|
|
|
|
j.parameters = []any{j.ctx}
|
|
|
|
|
} else if _, ok := tsk.parameters[0].(context.Context); !ok {
|
|
|
|
|
tsk.parameters = append([]any{j.ctx}, tsk.parameters...)
|
|
|
|
|
j.parameters = append([]any{j.ctx}, j.parameters...)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err := s.verifyParameterType(taskFunc, tsk); err != nil {
|
2025-07-21 17:28:36 +00:00
|
|
|
j.cancel()
|
2025-01-23 18:27:28 +00:00
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
2024-07-18 16:32:56 +00:00
|
|
|
if err := definition.setup(&j, s.location, s.exec.clock.Now()); err != nil {
|
2025-07-21 17:28:36 +00:00
|
|
|
j.cancel()
|
2023-11-08 17:11:42 +00:00
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
2024-01-17 17:48:25 +00:00
|
|
|
newJobCtx, newJobCancel := context.WithCancel(context.Background())
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
2025-07-21 17:28:36 +00:00
|
|
|
newJobCancel()
|
2024-01-17 17:48:25 +00:00
|
|
|
case s.newJobCh <- newJobIn{
|
|
|
|
|
ctx: newJobCtx,
|
|
|
|
|
cancel: newJobCancel,
|
|
|
|
|
job: j,
|
|
|
|
|
}:
|
|
|
|
|
}
|
|
|
|
|
|
2023-11-08 17:11:42 +00:00
|
|
|
select {
|
2024-01-17 17:48:25 +00:00
|
|
|
case <-newJobCtx.Done():
|
2023-11-08 17:11:42 +00:00
|
|
|
case <-s.shutdownCtx.Done():
|
2025-07-21 17:28:36 +00:00
|
|
|
newJobCancel()
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
|
2024-07-01 14:28:04 +00:00
|
|
|
out := s.jobFromInternalJob(j)
|
2025-12-02 16:25:51 +00:00
|
|
|
if s.schedulerMonitor != nil {
|
|
|
|
|
s.notifyJobRegistered(out)
|
|
|
|
|
}
|
2024-07-01 14:28:04 +00:00
|
|
|
return &out, nil
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) RemoveByTags(tags ...string) {
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
case s.removeJobsByTagsCh <- tags:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) RemoveJob(id uuid.UUID) error {
|
|
|
|
|
j := requestJobCtx(s.shutdownCtx, id, s.jobOutRequestCh)
|
|
|
|
|
if j == nil || j.id == uuid.Nil {
|
|
|
|
|
return ErrJobNotFound
|
|
|
|
|
}
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
case s.removeJobCh <- id:
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) Start() {
|
2025-11-26 19:38:55 +00:00
|
|
|
if s.started.Load() {
|
|
|
|
|
s.logger.Warn("gocron: scheduler already started")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
2023-11-08 17:11:42 +00:00
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
2025-12-02 16:25:51 +00:00
|
|
|
// Scheduler already shut down, don't notify
|
|
|
|
|
return
|
2023-11-08 17:11:42 +00:00
|
|
|
case s.startCh <- struct{}{}:
|
2025-12-02 16:25:51 +00:00
|
|
|
<-s.startedCh // Wait for scheduler to actually start
|
|
|
|
|
|
|
|
|
|
// Scheduler has started
|
|
|
|
|
s.notifySchedulerStarted()
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) StopJobs() error {
|
|
|
|
|
select {
|
|
|
|
|
case <-s.shutdownCtx.Done():
|
|
|
|
|
return nil
|
|
|
|
|
case s.stopCh <- struct{}{}:
|
|
|
|
|
}
|
2024-12-11 16:43:27 +00:00
|
|
|
|
|
|
|
|
t := time.NewTimer(s.exec.stopTimeout + 2*time.Second)
|
2023-11-08 17:11:42 +00:00
|
|
|
select {
|
|
|
|
|
case err := <-s.stopErrCh:
|
2024-12-11 16:43:27 +00:00
|
|
|
t.Stop()
|
2023-11-08 17:11:42 +00:00
|
|
|
return err
|
2024-12-11 16:43:27 +00:00
|
|
|
case <-t.C:
|
2023-11-08 17:11:42 +00:00
|
|
|
return ErrStopSchedulerTimedOut
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) Shutdown() error {
|
2025-10-09 16:19:35 +00:00
|
|
|
s.logger.Debug("scheduler shutting down")
|
|
|
|
|
|
2023-11-08 17:11:42 +00:00
|
|
|
s.shutdownCancel()
|
2025-10-09 16:19:35 +00:00
|
|
|
if !s.started.Load() {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
2024-12-11 16:43:27 +00:00
|
|
|
|
|
|
|
|
t := time.NewTimer(s.exec.stopTimeout + 2*time.Second)
|
2023-11-08 17:11:42 +00:00
|
|
|
select {
|
|
|
|
|
case err := <-s.stopErrCh:
|
2024-12-11 16:43:27 +00:00
|
|
|
t.Stop()
|
2025-12-02 16:25:51 +00:00
|
|
|
|
|
|
|
|
// notify monitor that scheduler stopped
|
|
|
|
|
s.notifySchedulerShutdown()
|
2023-11-08 17:11:42 +00:00
|
|
|
return err
|
2024-12-11 16:43:27 +00:00
|
|
|
case <-t.C:
|
2023-11-08 17:11:42 +00:00
|
|
|
return ErrStopSchedulerTimedOut
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *scheduler) Update(id uuid.UUID, jobDefinition JobDefinition, task Task, options ...JobOption) (Job, error) {
|
|
|
|
|
return s.addOrUpdateJob(id, jobDefinition, task, options)
|
|
|
|
|
}
|
|
|
|
|
|
2024-04-30 19:32:20 +00:00
|
|
|
func (s *scheduler) JobsWaitingInQueue() int {
|
|
|
|
|
if s.exec.limitMode != nil && s.exec.limitMode.mode == LimitModeWait {
|
|
|
|
|
return len(s.exec.limitMode.in)
|
|
|
|
|
}
|
|
|
|
|
return 0
|
|
|
|
|
}
|
|
|
|
|
|
2023-11-08 17:11:42 +00:00
|
|
|
// -----------------------------------------------
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
// ------------- Scheduler Options ---------------
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
// -----------------------------------------------
|
|
|
|
|
|
|
|
|
|
// SchedulerOption defines the function for setting
// options on the Scheduler. An option receives the scheduler being
// configured and returns a non-nil error when the supplied value is invalid.
type SchedulerOption func(*scheduler) error
|
|
|
|
|
|
2023-11-09 21:04:18 +00:00
|
|
|
// WithClock sets the clock used by the Scheduler
|
|
|
|
|
// to the clock provided. See https://github.com/jonboulle/clockwork
|
|
|
|
|
func WithClock(clock clockwork.Clock) SchedulerOption {
|
|
|
|
|
return func(s *scheduler) error {
|
|
|
|
|
if clock == nil {
|
|
|
|
|
return ErrWithClockNil
|
|
|
|
|
}
|
2024-07-18 16:32:56 +00:00
|
|
|
s.exec.clock = clock
|
2023-11-09 21:04:18 +00:00
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-11-08 17:11:42 +00:00
|
|
|
// WithDistributedElector sets the elector to be used by multiple
|
|
|
|
|
// Scheduler instances to determine who should be the leader.
|
|
|
|
|
// Only the leader runs jobs, while non-leaders wait and continue
|
|
|
|
|
// to check if a new leader has been elected.
|
|
|
|
|
func WithDistributedElector(elector Elector) SchedulerOption {
|
|
|
|
|
return func(s *scheduler) error {
|
|
|
|
|
if elector == nil {
|
2023-11-09 21:04:18 +00:00
|
|
|
return ErrWithDistributedElectorNil
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
s.exec.elector = elector
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-11-14 15:56:05 +00:00
|
|
|
// WithDistributedLocker sets the locker to be used by multiple
|
|
|
|
|
// Scheduler instances to ensure that only one instance of each
|
|
|
|
|
// job is run.
|
2025-01-03 21:12:34 +00:00
|
|
|
// To disable this global locker for specific jobs, see
|
|
|
|
|
// WithDisabledDistributedJobLocker.
|
2023-11-14 15:56:05 +00:00
|
|
|
func WithDistributedLocker(locker Locker) SchedulerOption {
|
|
|
|
|
return func(s *scheduler) error {
|
|
|
|
|
if locker == nil {
|
|
|
|
|
return ErrWithDistributedLockerNil
|
|
|
|
|
}
|
|
|
|
|
s.exec.locker = locker
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-11-08 17:11:42 +00:00
|
|
|
// WithGlobalJobOptions sets JobOption's that will be applied to
|
|
|
|
|
// all jobs added to the scheduler. JobOption's set on the job
|
|
|
|
|
// itself will override if the same JobOption is set globally.
|
|
|
|
|
func WithGlobalJobOptions(jobOptions ...JobOption) SchedulerOption {
|
|
|
|
|
return func(s *scheduler) error {
|
|
|
|
|
s.globalJobOptions = jobOptions
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// LimitMode defines the modes used for handling jobs that reach
// the limit provided in WithLimitConcurrentJobs. The available modes are
// LimitModeReschedule and LimitModeWait.
type LimitMode int
|
|
|
|
|
|
|
|
|
|
const (
	// LimitModeReschedule causes jobs reaching the limit set in
	// WithLimitConcurrentJobs or WithSingletonMode to be skipped
	// and rescheduled for the next run time rather than being
	// queued up to wait.
	LimitModeReschedule = 1

	// LimitModeWait causes jobs reaching the limit set in
	// WithLimitConcurrentJobs or WithSingletonMode to wait
	// in a queue until a slot becomes available to run.
	//
	// Note: this mode can produce unpredictable results as
	// job execution order isn't guaranteed. For example, a job that
	// executes frequently may pile up in the wait queue and be executed
	// many times back to back when the queue opens.
	//
	// Warning: do not use this mode if your jobs will continue to stack
	// up beyond the ability of the limit workers to keep up. An
	// illustrative example of what NOT to do (a job that fires every
	// second but takes ten seconds to run, with a limit of one):
	//
	//	s, _ := gocron.NewScheduler(gocron.WithLimitConcurrentJobs(1, gocron.LimitModeWait))
	//	s.NewJob(
	//		gocron.DurationJob(
	//			time.Second,
	//			Task{
	//				Function: func() {
	//					time.Sleep(10 * time.Second)
	//				},
	//			},
	//		),
	//	)
	LimitModeWait = 2
)
|
|
|
|
|
|
|
|
|
|
// WithLimitConcurrentJobs sets the limit and mode to be used by the
|
|
|
|
|
// Scheduler for limiting the number of jobs that may be running at
|
|
|
|
|
// a given time.
|
2024-05-06 15:26:56 +00:00
|
|
|
//
|
|
|
|
|
// Note: the limit mode selected for WithLimitConcurrentJobs takes initial
|
|
|
|
|
// precedence in the event you are also running a limit mode at the job level
|
|
|
|
|
// using WithSingletonMode.
|
|
|
|
|
//
|
|
|
|
|
// Warning: a single time consuming job can dominate your limit in the event
|
|
|
|
|
// you are running both the scheduler limit WithLimitConcurrentJobs(1, LimitModeWait)
|
|
|
|
|
// and a job limit WithSingletonMode(LimitModeReschedule).
|
2023-11-08 17:11:42 +00:00
|
|
|
func WithLimitConcurrentJobs(limit uint, mode LimitMode) SchedulerOption {
|
|
|
|
|
return func(s *scheduler) error {
|
2023-11-09 21:04:18 +00:00
|
|
|
if limit == 0 {
|
|
|
|
|
return ErrWithLimitConcurrentJobsZero
|
|
|
|
|
}
|
2023-11-08 17:11:42 +00:00
|
|
|
s.exec.limitMode = &limitModeConfig{
|
2023-11-28 12:48:22 +00:00
|
|
|
mode: mode,
|
|
|
|
|
limit: limit,
|
2023-12-19 03:13:37 +00:00
|
|
|
in: make(chan jobIn, 1000),
|
2023-11-28 12:48:22 +00:00
|
|
|
singletonJobs: make(map[uuid.UUID]struct{}),
|
2023-11-08 17:11:42 +00:00
|
|
|
}
|
|
|
|
|
if mode == LimitModeReschedule {
|
|
|
|
|
s.exec.limitMode.rescheduleLimiter = make(chan struct{}, limit)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// WithLocation sets the location (i.e. timezone) that the scheduler
|
|
|
|
|
// should operate within. In many systems time.Local is UTC.
|
|
|
|
|
// Default: time.Local
|
|
|
|
|
func WithLocation(location *time.Location) SchedulerOption {
|
|
|
|
|
return func(s *scheduler) error {
|
|
|
|
|
if location == nil {
|
|
|
|
|
return ErrWithLocationNil
|
|
|
|
|
}
|
|
|
|
|
s.location = location
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2023-11-22 12:43:50 +00:00
|
|
|
// WithLogger sets the logger to be used by the Scheduler.
|
2023-11-08 17:11:42 +00:00
|
|
|
func WithLogger(logger Logger) SchedulerOption {
|
|
|
|
|
return func(s *scheduler) error {
|
|
|
|
|
if logger == nil {
|
|
|
|
|
return ErrWithLoggerNil
|
|
|
|
|
}
|
|
|
|
|
s.logger = logger
|
|
|
|
|
s.exec.logger = logger
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// WithStopTimeout sets the amount of time the Scheduler should
|
|
|
|
|
// wait gracefully for jobs to complete before returning when
|
|
|
|
|
// StopJobs() or Shutdown() are called.
|
|
|
|
|
// Default: 10 * time.Second
|
|
|
|
|
func WithStopTimeout(timeout time.Duration) SchedulerOption {
|
|
|
|
|
return func(s *scheduler) error {
|
2023-11-09 21:04:18 +00:00
|
|
|
if timeout <= 0 {
|
|
|
|
|
return ErrWithStopTimeoutZeroOrNegative
|
|
|
|
|
}
|
2023-11-08 17:11:42 +00:00
|
|
|
s.exec.stopTimeout = timeout
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
2024-01-17 21:39:11 +00:00
|
|
|
|
|
|
|
|
// WithMonitor sets the metrics provider to be used by the Scheduler.
|
|
|
|
|
func WithMonitor(monitor Monitor) SchedulerOption {
|
|
|
|
|
return func(s *scheduler) error {
|
|
|
|
|
if monitor == nil {
|
|
|
|
|
return ErrWithMonitorNil
|
|
|
|
|
}
|
|
|
|
|
s.exec.monitor = monitor
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
2024-12-12 04:23:57 +00:00
|
|
|
|
|
|
|
|
// WithMonitorStatus sets the metrics provider to be used by the Scheduler.
|
|
|
|
|
func WithMonitorStatus(monitor MonitorStatus) SchedulerOption {
|
|
|
|
|
return func(s *scheduler) error {
|
|
|
|
|
if monitor == nil {
|
|
|
|
|
return ErrWithMonitorNil
|
|
|
|
|
}
|
|
|
|
|
s.exec.monitorStatus = monitor
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
2025-12-02 16:25:51 +00:00
|
|
|
|
|
|
|
|
// WithSchedulerMonitor sets a monitor that will be called with scheduler-level events.
|
|
|
|
|
func WithSchedulerMonitor(monitor SchedulerMonitor) SchedulerOption {
|
|
|
|
|
return func(s *scheduler) error {
|
|
|
|
|
if monitor == nil {
|
|
|
|
|
return ErrSchedulerMonitorNil
|
|
|
|
|
}
|
|
|
|
|
s.schedulerMonitor = monitor
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// notifySchedulerStarted notifies the monitor that scheduler has started
|
|
|
|
|
func (s *scheduler) notifySchedulerStarted() {
|
|
|
|
|
if s.schedulerMonitor != nil {
|
|
|
|
|
s.schedulerMonitor.SchedulerStarted()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// notifySchedulerShutdown notifies the monitor that scheduler has stopped
|
|
|
|
|
func (s *scheduler) notifySchedulerShutdown() {
|
|
|
|
|
if s.schedulerMonitor != nil {
|
|
|
|
|
s.schedulerMonitor.SchedulerShutdown()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// notifyJobRegistered notifies the monitor that a job has been registered
|
|
|
|
|
func (s *scheduler) notifyJobRegistered(job Job) {
|
|
|
|
|
if s.schedulerMonitor != nil {
|
|
|
|
|
s.schedulerMonitor.JobRegistered(job)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// notifyJobUnregistered notifies the monitor that a job has been unregistered
|
|
|
|
|
func (s *scheduler) notifyJobUnregistered(job Job) {
|
|
|
|
|
if s.schedulerMonitor != nil {
|
|
|
|
|
s.schedulerMonitor.JobUnregistered(job)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// notifyJobStarted notifies the monitor that a job has started
|
|
|
|
|
func (s *scheduler) notifyJobStarted(job Job) {
|
|
|
|
|
if s.schedulerMonitor != nil {
|
|
|
|
|
s.schedulerMonitor.JobStarted(job)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// notifyJobRunning notifies the monitor that a job is running.
|
|
|
|
|
func (s *scheduler) notifyJobRunning(job Job) {
|
|
|
|
|
if s.schedulerMonitor != nil {
|
|
|
|
|
s.schedulerMonitor.JobRunning(job)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// notifyJobCompleted notifies the monitor that a job has completed.
|
|
|
|
|
func (s *scheduler) notifyJobCompleted(job Job) {
|
|
|
|
|
if s.schedulerMonitor != nil {
|
|
|
|
|
s.schedulerMonitor.JobCompleted(job)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// notifyJobFailed notifies the monitor that a job has failed.
|
|
|
|
|
func (s *scheduler) notifyJobFailed(job Job, err error) {
|
|
|
|
|
if s.schedulerMonitor != nil {
|
|
|
|
|
s.schedulerMonitor.JobFailed(job, err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// notifySchedulerStopped notifies the monitor that the scheduler has stopped
|
|
|
|
|
func (s *scheduler) notifySchedulerStopped() {
|
|
|
|
|
if s.schedulerMonitor != nil {
|
|
|
|
|
s.schedulerMonitor.SchedulerStopped()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// notifyJobExecutionTime notifies the monitor of a job's execution time
|
|
|
|
|
func (s *scheduler) notifyJobExecutionTime(job Job, duration time.Duration) {
|
|
|
|
|
if s.schedulerMonitor != nil {
|
|
|
|
|
s.schedulerMonitor.JobExecutionTime(job, duration)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// notifyJobSchedulingDelay notifies the monitor of scheduling delay
|
|
|
|
|
func (s *scheduler) notifyJobSchedulingDelay(job Job, scheduledTime time.Time, actualStartTime time.Time) {
|
|
|
|
|
if s.schedulerMonitor != nil {
|
|
|
|
|
s.schedulerMonitor.JobSchedulingDelay(job, scheduledTime, actualStartTime)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// notifyConcurrencyLimitReached notifies the monitor that a concurrency limit was reached
|
|
|
|
|
func (s *scheduler) notifyConcurrencyLimitReached(limitType string, job Job) {
|
|
|
|
|
if s.schedulerMonitor != nil {
|
|
|
|
|
s.schedulerMonitor.ConcurrencyLimitReached(limitType, job)
|
|
|
|
|
}
|
|
|
|
|
}
|