diff --git a/README.md b/README.md index ae4b7bc..08e58e0 100644 --- a/README.md +++ b/README.md @@ -87,6 +87,8 @@ Jobs can be run every x days at specific times. Jobs can be run every x weeks on specific days of the week and at specific times. - [**Monthly**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#MonthlyJob): Jobs can be run every x months on specific days of the month and at specific times. +- [**One time**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#OneTimeJob): +Jobs can be run once at a specific time. These are non-recurring jobs. ### Concurrency Limits Jobs can be limited individually or across the entire scheduler. diff --git a/errors.go b/errors.go index 7549135..2c9f7c2 100644 --- a/errors.go +++ b/errors.go @@ -12,6 +12,7 @@ var ( ErrDurationRandomJobMinMax = fmt.Errorf("gocron: DurationRandomJob: minimum duration must be less than maximum duration") ErrEventListenerFuncNil = fmt.Errorf("gocron: eventListenerFunc must not be nil") ErrJobNotFound = fmt.Errorf("gocron: job not found") + ErrJobRunNowFailed = fmt.Errorf("gocron: Job: RunNow: scheduler unreachable") ErrMonthlyJobDays = fmt.Errorf("gocron: MonthlyJob: daysOfTheMonth must be between 31 and -31 inclusive, and not 0") ErrMonthlyJobAtTimeNil = fmt.Errorf("gocron: MonthlyJob: atTime within atTimes must not be nil") ErrMonthlyJobAtTimesNil = fmt.Errorf("gocron: MonthlyJob: atTimes must not be nil") @@ -22,6 +23,7 @@ var ( ErrNewJobTaskNotFunc = fmt.Errorf("gocron: NewJob: Task.Function must be of kind reflect.Func") ErrNewJobWrongNumberOfParameters = fmt.Errorf("gocron: NewJob: Number of provided parameters does not match expected") ErrNewJobWrongTypeOfParameters = fmt.Errorf("gocron: NewJob: Type of provided parameters does not match expected") + ErrOneTimeJobStartDateTimePast = fmt.Errorf("gocron: OneTimeJob: start must not be in the past") ErrStopExecutorTimedOut = fmt.Errorf("gocron: timed out waiting for executor to stop") ErrStopJobsTimedOut = fmt.Errorf("gocron: timed out waiting for jobs to finish") ErrStopSchedulerTimedOut = fmt.Errorf("gocron: timed out waiting for scheduler to stop") diff --git a/example_test.go b/example_test.go index af40a97..1f613ee 100644 --- a/example_test.go +++ b/example_test.go @@ -196,6 +196,28 @@ func ExampleJob_NextRun() { fmt.Println(j.NextRun()) } +func ExampleJob_RunNow() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + j, _ := s.NewJob( + MonthlyJob( + 1, + NewDaysOfTheMonth(3, -5, -1), + NewAtTimes( + NewAtTime(10, 30, 0), + NewAtTime(11, 15, 0), + ), + ), + NewTask( + func() {}, + ), + ) + s.Start() + // Runs the job one time now, without impacting the schedule + _ = j.RunNow() +} + func ExampleMonthlyJob() { s, _ := NewScheduler() defer func() { _ = s.Shutdown() }() @@ -222,6 +244,32 @@ func ExampleNewScheduler() { fmt.Println(s.Jobs()) } +func ExampleOneTimeJob() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + // run a job once, immediately + _, _ = s.NewJob( + OneTimeJob( + OneTimeJobStartImmediately(), + ), + NewTask( + func() {}, + ), + ) + // run a job once in 10 seconds + _, _ = s.NewJob( + OneTimeJob( + OneTimeJobStartDateTime(time.Now().Add(10*time.Second)), + ), + NewTask( + func() {}, + ), + ) + + s.Start() +} + func ExampleScheduler_NewJob() { s, _ := NewScheduler() defer func() { _ = s.Shutdown() }() diff --git a/executor.go b/executor.go index d13d5aa..1f013f5 100644 --- a/executor.go +++ b/executor.go @@ -14,7 +14,7 @@ type executor struct { cancel context.CancelFunc logger Logger stopCh chan struct{} - jobsIDsIn chan uuid.UUID + jobsIn chan jobIn jobIDsOut chan uuid.UUID jobOutRequest chan jobOutRequest stopTimeout time.Duration @@ -25,8 +25,13 @@ type executor struct { locker Locker } +type jobIn struct { + id uuid.UUID + shouldSendOut bool +} + type singletonRunner struct { - in chan uuid.UUID + in chan jobIn rescheduleLimiter chan struct{} } @@ -35,7 +40,7 @@ type limitModeConfig struct { mode LimitMode limit uint rescheduleLimiter chan struct{} - in chan uuid.UUID + in chan jobIn // singletonJobs is used to track singleton jobs that are running // in the limit mode runner. This is used to prevent the same job // from running multiple times across limit mode runners when both @@ -72,7 +77,7 @@ func (e *executor) start() { // are run immediately. // 2. sent from time.AfterFuncs in which job schedules // are spun up by the scheduler - case id := <-e.jobsIDsIn: + case jIn := <-e.jobsIn: select { case <-e.stopCh: e.stop(standardJobsWg, singletonJobsWg, limitModeJobsWg) @@ -111,14 +116,16 @@ func (e *executor) start() { // the executor from building up a waiting queue // and forces rescheduling case e.limitMode.rescheduleLimiter <- struct{}{}: - e.limitMode.in <- id + e.limitMode.in <- jIn default: // all runners are busy, reschedule the work for later // which means we just skip it here and do nothing // TODO when metrics are added, this should increment a rescheduled metric - select { - case e.jobIDsOut <- id: - default: + if jIn.shouldSendOut { + select { + case e.jobIDsOut <- jIn.id: + default: + } } } } else { @@ -127,7 +134,7 @@ func (e *executor) start() { // to work through the channel backlog. A hard limit of 1000 is in place // at which point this call would block. // TODO when metrics are added, this should increment a wait metric - e.limitMode.in <- id + e.limitMode.in <- jIn } } else { // no limit mode, so we're either running a regular job or @@ -135,7 +142,7 @@ func (e *executor) start() { // // get the job, so we can figure out what kind it is and how // to execute it - j := requestJobCtx(ctx, id, e.jobOutRequest) + j := requestJobCtx(ctx, jIn.id, e.jobOutRequest) if j == nil { // safety check as it'd be strange bug if this occurred return @@ -143,15 +150,15 @@ func (e *executor) start() { if j.singletonMode { // for singleton mode, get the existing runner for the job // or spin up a new one - runner, ok := e.singletonRunners[id] + runner, ok := e.singletonRunners[jIn.id] if !ok { - runner.in = make(chan uuid.UUID, 1000) + runner.in = make(chan jobIn, 1000) if j.singletonLimitMode == LimitModeReschedule { runner.rescheduleLimiter = make(chan struct{}, 1) } - e.singletonRunners[id] = runner + e.singletonRunners[jIn.id] = runner singletonJobsWg.Add(1) - go e.singletonModeRunner("singleton-"+id.String(), runner.in, singletonJobsWg, j.singletonLimitMode, runner.rescheduleLimiter) + go e.singletonModeRunner("singleton-"+jIn.id.String(), runner.in, singletonJobsWg, j.singletonLimitMode, runner.rescheduleLimiter) } if j.singletonLimitMode == LimitModeReschedule { @@ -159,19 +166,21 @@ func (e *executor) start() { // for a running job and reschedules if the channel is full. select { case runner.rescheduleLimiter <- struct{}{}: - runner.in <- id + runner.in <- jIn default: // runner is busy, reschedule the work for later // which means we just skip it here and do nothing // TODO when metrics are added, this should increment a rescheduled metric - select { - case e.jobIDsOut <- id: - default: + if jIn.shouldSendOut { + select { + case e.jobIDsOut <- jIn.id: + default: + } } } } else { // wait mode, fill up that queue (buffered channel, so it's ok) - runner.in <- id + runner.in <- jIn } } else { select { @@ -187,7 +196,7 @@ func (e *executor) start() { // complete. standardJobsWg.Add(1) go func(j internalJob) { - e.runJob(j) + e.runJob(j, jIn.shouldSendOut) standardJobsWg.Done() }(*j) } @@ -200,11 +209,11 @@ func (e *executor) start() { } } -func (e *executor) limitModeRunner(name string, in chan uuid.UUID, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) { +func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) { e.logger.Debug("gocron: limitModeRunner starting", "name", name) for { select { - case id := <-in: + case jIn := <-in: select { case <-e.ctx.Done(): e.logger.Debug("gocron: limitModeRunner shutting down", "name", name) @@ -214,24 +223,28 @@ func (e *executor) limitModeRunner(name string, in chan uuid.UUID, wg *waitGroup } ctx, cancel := context.WithCancel(e.ctx) - j := requestJobCtx(ctx, id, e.jobOutRequest) + j := requestJobCtx(ctx, jIn.id, e.jobOutRequest) cancel() if j != nil { if j.singletonMode { e.limitMode.singletonJobsMu.Lock() - _, ok := e.limitMode.singletonJobs[id] + _, ok := e.limitMode.singletonJobs[jIn.id] if ok { // this job is already running, so don't run it // but instead reschedule it e.limitMode.singletonJobsMu.Unlock() - select { - case <-e.ctx.Done(): - return - case <-j.ctx.Done(): - return - case e.jobIDsOut <- j.id: + if jIn.shouldSendOut { + select { + case <-e.ctx.Done(): + return + case <-j.ctx.Done(): + return + case e.jobIDsOut <- j.id: + } } - // remove the limiter block to allow another job to be scheduled + // remove the limiter block, as this particular job + // was a singleton already running, and we want to + // allow another job to be scheduled if limitMode == LimitModeReschedule { select { case <-rescheduleLimiter: @@ -240,14 +253,14 @@ func (e *executor) limitModeRunner(name string, in chan uuid.UUID, wg *waitGroup } continue } - e.limitMode.singletonJobs[id] = struct{}{} + e.limitMode.singletonJobs[jIn.id] = struct{}{} e.limitMode.singletonJobsMu.Unlock() } - e.runJob(*j) + e.runJob(*j, jIn.shouldSendOut) if j.singletonMode { e.limitMode.singletonJobsMu.Lock() - delete(e.limitMode.singletonJobs, id) + delete(e.limitMode.singletonJobs, jIn.id) e.limitMode.singletonJobsMu.Unlock() } } @@ -267,24 +280,24 @@ func (e *executor) limitModeRunner(name string, in chan uuid.UUID, wg *waitGroup } } -func (e *executor) singletonModeRunner(name string, in chan uuid.UUID, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) { - e.logger.Debug("gocron: limitModeRunner starting", "name", name) +func (e *executor) singletonModeRunner(name string, in chan jobIn, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) { + e.logger.Debug("gocron: singletonModeRunner starting", "name", name) for { select { - case id := <-in: + case jIn := <-in: select { case <-e.ctx.Done(): - e.logger.Debug("gocron: limitModeRunner shutting down", "name", name) + e.logger.Debug("gocron: singletonModeRunner shutting down", "name", name) wg.Done() return default: } ctx, cancel := context.WithCancel(e.ctx) - j := requestJobCtx(ctx, id, e.jobOutRequest) + j := requestJobCtx(ctx, jIn.id, e.jobOutRequest) cancel() if j != nil { - e.runJob(*j) + e.runJob(*j, jIn.shouldSendOut) } // remove the limiter block to allow another job to be scheduled @@ -295,14 +308,14 @@ func (e *executor) singletonModeRunner(name string, in chan uuid.UUID, wg *waitG } } case <-e.ctx.Done(): - e.logger.Debug("limitModeRunner shutting down", "name", name) + e.logger.Debug("singletonModeRunner shutting down", "name", name) wg.Done() return } } } -func (e *executor) runJob(j internalJob) { +func (e *executor) runJob(j internalJob, shouldSendOut bool) { if j.ctx == nil { return } @@ -327,12 +340,14 @@ func (e *executor) runJob(j internalJob) { } _ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name) - select { - case <-e.ctx.Done(): - return - case <-j.ctx.Done(): - return - case e.jobIDsOut <- j.id: + if shouldSendOut { + select { + case <-e.ctx.Done(): + return + case <-j.ctx.Done(): + return + case e.jobIDsOut <- j.id: + } } err := callJobFuncWithParams(j.function, j.parameters...) diff --git a/job.go b/job.go index cd5eecb..9d76501 100644 --- a/job.go +++ b/job.go @@ -426,6 +426,48 @@ func MonthlyJob(interval uint, daysOfTheMonth DaysOfTheMonth, atTimes AtTimes) J } } +var _ JobDefinition = (*oneTimeJobDefinition)(nil) + +type oneTimeJobDefinition struct { + startAt OneTimeJobStartAtOption +} + +func (o oneTimeJobDefinition) setup(j *internalJob, _ *time.Location) error { + j.jobSchedule = oneTimeJob{} + return o.startAt(j) +} + +// OneTimeJobStartAtOption defines when the one time job is run +type OneTimeJobStartAtOption func(*internalJob) error + +// OneTimeJobStartImmediately tells the scheduler to run the one time job immediately. +func OneTimeJobStartImmediately() OneTimeJobStartAtOption { + return func(j *internalJob) error { + j.startImmediately = true + return nil + } +} + +// OneTimeJobStartDateTime sets the date & time at which the job should run. +// This datetime must be in the future. +func OneTimeJobStartDateTime(start time.Time) OneTimeJobStartAtOption { + return func(j *internalJob) error { + if start.IsZero() || start.Before(time.Now()) { + return ErrOneTimeJobStartDateTimePast + } + j.startTime = start + return nil + } +} + +// OneTimeJob is to run a job once at a specified time and not on +// any regular schedule. +func OneTimeJob(startAt OneTimeJobStartAtOption) JobDefinition { + return oneTimeJobDefinition{ + startAt: startAt, + } +} + // ----------------------------------------------- // ----------------------------------------------- // ----------------- Job Options ----------------- @@ -772,6 +814,14 @@ func (m monthlyJob) nextMonthDayAtTime(lastRun time.Time, days []int, firstPass return time.Time{} } +var _ jobSchedule = (*oneTimeJob)(nil) + +type oneTimeJob struct{} + +func (o oneTimeJob) next(_ time.Time) time.Time { + return time.Time{} +} + // ----------------------------------------------- // ----------------------------------------------- // ---------------- Job Interface ---------------- @@ -786,6 +836,7 @@ type Job interface { Name() string NextRun() (time.Time, error) Tags() []string + RunNow() error } var _ Job = (*job)(nil) @@ -799,6 +850,7 @@ type job struct { name string tags []string jobOutRequest chan jobOutRequest + runJobRequest chan runJobRequest } // ID returns the job's unique identifier. @@ -833,3 +885,29 @@ func (j job) NextRun() (time.Time, error) { func (j job) Tags() []string { return j.tags } + +// RunNow runs the job once, now. This does not alter +// the existing run schedule, and will respect all job +// and scheduler limits. +func (j job) RunNow() error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + resp := make(chan error, 1) + + select { + case j.runJobRequest <- runJobRequest{ + id: j.id, + outChan: resp, + }: + case <-time.After(100 * time.Millisecond): + return ErrJobRunNowFailed + } + var err error + select { + case <-ctx.Done(): + return ErrJobRunNowFailed + case errReceived := <-resp: + err = errReceived + } + return err +} diff --git a/job_test.go b/job_test.go index 47a9519..fae68db 100644 --- a/job_test.go +++ b/job_test.go @@ -319,6 +319,25 @@ func TestDurationRandomJob_next(t *testing.T) { } } +func TestOneTimeJob_next(t *testing.T) { + otj := oneTimeJob{} + assert.Zero(t, otj.next(time.Time{})) +} + +func TestJob_RunNow_Error(t *testing.T) { + s := newTestScheduler(t) + + j, err := s.NewJob( + DurationJob(time.Second), + NewTask(func() {}), + ) + require.NoError(t, err) + + require.NoError(t, s.Shutdown()) + + assert.EqualError(t, j.RunNow(), ErrJobRunNowFailed.Error()) +} + func TestJob_LastRun(t *testing.T) { testTime := time.Date(2000, 1, 1, 0, 0, 0, 0, time.Local) fakeClock := clockwork.NewFakeClockAt(testTime) diff --git a/scheduler.go b/scheduler.go index 644a70e..fd61343 100644 --- a/scheduler.go +++ b/scheduler.go @@ -49,6 +49,7 @@ type scheduler struct { stopErrCh chan error allJobsOutRequest chan allJobsOutRequest jobOutRequestCh chan jobOutRequest + runJobRequestCh chan runJobRequest newJobCh chan internalJob removeJobCh chan uuid.UUID removeJobsByTagsCh chan []string @@ -59,6 +60,11 @@ type jobOutRequest struct { outChan chan internalJob } +type runJobRequest struct { + id uuid.UUID + outChan chan error +} + type allJobsOutRequest struct { outChan chan []Job } @@ -77,7 +83,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { singletonRunners: make(map[uuid.UUID]singletonRunner), logger: &noOpLogger{}, - jobsIDsIn: make(chan uuid.UUID), + jobsIn: make(chan jobIn), jobIDsOut: make(chan uuid.UUID), jobOutRequest: make(chan jobOutRequest, 1000), done: make(chan error), @@ -100,6 +106,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { stopCh: make(chan struct{}), stopErrCh: make(chan error, 1), jobOutRequestCh: make(chan jobOutRequest), + runJobRequestCh: make(chan runJobRequest), allJobsOutRequest: make(chan allJobsOutRequest), } @@ -135,6 +142,9 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { case out := <-s.allJobsOutRequest: s.selectAllJobsOutRequest(out) + case run := <-s.runJobRequestCh: + s.selectRunJobRequest(run) + case <-s.startCh: s.selectStart() @@ -204,6 +214,31 @@ func (s *scheduler) selectAllJobsOutRequest(out allJobsOutRequest) { } } +func (s *scheduler) selectRunJobRequest(run runJobRequest) { + j, ok := s.jobs[run.id] + if !ok { + select { + case run.outChan <- ErrJobNotFound: + default: + } + } + select { + case <-s.shutdownCtx.Done(): + select { + case run.outChan <- ErrJobRunNowFailed: + default: + } + case s.exec.jobsIn <- jobIn{ + id: j.id, + shouldSendOut: false, + }: + select { + case run.outChan <- nil: + default: + } + } +} + func (s *scheduler) selectRemoveJob(id uuid.UUID) { j, ok := s.jobs[id] if !ok { @@ -232,12 +267,18 @@ func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) { } next := j.next(j.lastRun) + if next.IsZero() { + return + } j.nextRun = next j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() { select { case <-s.shutdownCtx.Done(): return - case s.exec.jobsIDsIn <- id: + case s.exec.jobsIn <- jobIn{ + id: j.id, + shouldSendOut: true, + }: } }) s.jobs[id] = j @@ -260,7 +301,10 @@ func (s *scheduler) selectNewJob(j internalJob) { next = s.now() select { case <-s.shutdownCtx.Done(): - case s.exec.jobsIDsIn <- j.id: + case s.exec.jobsIn <- jobIn{ + id: j.id, + shouldSendOut: true, + }: } } else { if next.IsZero() { @@ -271,7 +315,10 @@ func (s *scheduler) selectNewJob(j internalJob) { j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() { select { case <-s.shutdownCtx.Done(): - case s.exec.jobsIDsIn <- id: + case s.exec.jobsIn <- jobIn{ + id: id, + shouldSendOut: true, + }: } }) } @@ -304,7 +351,10 @@ func (s *scheduler) selectStart() { next = s.now() select { case <-s.shutdownCtx.Done(): - case s.exec.jobsIDsIn <- id: + case s.exec.jobsIn <- jobIn{ + id: id, + shouldSendOut: true, + }: } } else { if next.IsZero() { @@ -315,7 +365,10 @@ func (s *scheduler) selectStart() { j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() { select { case <-s.shutdownCtx.Done(): - case s.exec.jobsIDsIn <- jobID: + case s.exec.jobsIn <- jobIn{ + id: jobID, + shouldSendOut: true, + }: } }) } @@ -453,6 +506,7 @@ func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskW name: j.name, tags: slices.Clone(j.tags), jobOutRequest: s.jobOutRequestCh, + runJobRequest: s.runJobRequestCh, }, nil } @@ -632,7 +686,7 @@ func WithLimitConcurrentJobs(limit uint, mode LimitMode) SchedulerOption { s.exec.limitMode = &limitModeConfig{ mode: mode, limit: limit, - in: make(chan uuid.UUID, 1000), + in: make(chan jobIn, 1000), singletonJobs: make(map[uuid.UUID]struct{}), } if mode == LimitModeReschedule { diff --git a/scheduler_test.go b/scheduler_test.go index c1051c1..a2a0fb1 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -719,6 +719,18 @@ func TestScheduler_NewJobErrors(t *testing.T) { []JobOption{WithStartAt(WithStartDateTime(time.Now().Add(-time.Second)))}, ErrWithStartDateTimePast, }, + { + "oneTimeJob start at is zero", + OneTimeJob(OneTimeJobStartDateTime(time.Time{})), + nil, + ErrOneTimeJobStartDateTimePast, + }, + { + "oneTimeJob start at is in past", + OneTimeJob(OneTimeJobStartDateTime(time.Now().Add(-time.Second))), + nil, + ErrOneTimeJobStartDateTimePast, + }, } for _, tt := range tests { @@ -1425,3 +1437,201 @@ func TestScheduler_ManyJobs(t *testing.T) { assert.GreaterOrEqual(t, count, 9900) assert.LessOrEqual(t, count, 11000) } + +func TestScheduler_RunJobNow(t *testing.T) { + chDuration := make(chan struct{}, 10) + chMonthly := make(chan struct{}, 10) + chDurationImmediate := make(chan struct{}, 10) + chDurationSingleton := make(chan struct{}, 10) + chOneTime := make(chan struct{}, 10) + + tests := []struct { + name string + ch chan struct{} + j JobDefinition + fun any + opts []JobOption + expectedDiff func() time.Duration + expectedRuns int + }{ + { + "duration job", + chDuration, + DurationJob(time.Second * 10), + func() { + chDuration <- struct{}{} + }, + nil, + func() time.Duration { + return 0 + }, + 1, + }, + { + "monthly job", + chMonthly, + MonthlyJob(1, NewDaysOfTheMonth(1), NewAtTimes(NewAtTime(0, 0, 0))), + func() { + chMonthly <- struct{}{} + }, + nil, + func() time.Duration { + return 0 + }, + 1, + }, + { + "duration job - start immediately", + chDurationImmediate, + DurationJob(time.Second * 10), + func() { + chDurationImmediate <- struct{}{} + }, + []JobOption{ + WithStartAt( + WithStartImmediately(), + ), + }, + func() time.Duration { + return 10 * time.Second + }, + 2, + }, + { + "duration job - singleton", + chDurationSingleton, + DurationJob(time.Second * 10), + func() { + chDurationSingleton <- struct{}{} + time.Sleep(200 * time.Millisecond) + }, + []JobOption{ + WithStartAt( + WithStartImmediately(), + ), + WithSingletonMode(LimitModeReschedule), + }, + func() time.Duration { + return 10 * time.Second + }, + 1, + }, + { + "one time job", + chOneTime, + OneTimeJob(OneTimeJobStartImmediately()), + func() { + chOneTime <- struct{}{} + }, + nil, + nil, + 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := newTestScheduler(t) + + j, err := s.NewJob(tt.j, NewTask(tt.fun), tt.opts...) + require.NoError(t, err) + + s.Start() + + var nextRunBefore time.Time + if tt.expectedDiff != nil { + for ; nextRunBefore.IsZero() || err != nil; nextRunBefore, err = j.NextRun() { //nolint:revive + } + } + + assert.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + require.NoError(t, j.RunNow()) + var runCount int + + select { + case <-tt.ch: + runCount++ + case <-time.After(time.Second): + t.Fatal("timed out waiting for job to run") + } + + timeout := time.Now().Add(time.Second) + for time.Now().Before(timeout) { + select { + case <-tt.ch: + runCount++ + default: + } + } + + assert.Equal(t, tt.expectedRuns, runCount) + + nextRunAfter, err := j.NextRun() + if tt.expectedDiff != nil && tt.expectedDiff() > 0 { + for ; nextRunBefore.IsZero() || nextRunAfter.Equal(nextRunBefore); nextRunAfter, err = j.NextRun() { //nolint:revive + } + } + + assert.NoError(t, err) + assert.NoError(t, s.Shutdown()) + + if tt.expectedDiff != nil { + assert.Equal(t, tt.expectedDiff(), nextRunAfter.Sub(nextRunBefore)) + } + }) + } +} + +func TestScheduler_OneTimeJob(t *testing.T) { + tests := []struct { + name string + startAt func() OneTimeJobStartAtOption + }{ + { + "start now", + func() OneTimeJobStartAtOption { + return OneTimeJobStartImmediately() + }, + }, + { + "start in 100 ms", + func() OneTimeJobStartAtOption { + return OneTimeJobStartDateTime(time.Now().Add(100 * time.Millisecond)) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + jobRan := make(chan struct{}, 2) + + s := newTestScheduler(t) + + j, err := s.NewJob( + OneTimeJob(tt.startAt()), + NewTask(func() { + jobRan <- struct{}{} + }), + ) + require.NoError(t, err) + + s.Start() + + select { + case <-jobRan: + case <-time.After(500 * time.Millisecond): + t.Fatal("timed out waiting for job to run") + } + + var nextRun time.Time + for ; nextRun.IsZero(); nextRun, err = j.NextRun() { //nolint:revive + } + assert.NoError(t, err) + assert.True(t, nextRun.Before(time.Now())) + + assert.NoError(t, s.Shutdown()) + }) + } +}