From 3ee53e03d97118dd400353aa95aeab80699bbaa5 Mon Sep 17 00:00:00 2001 From: Yash Chauhan <53042582+iyashjayesh@users.noreply.github.com> Date: Wed, 22 Oct 2025 08:39:46 +0530 Subject: [PATCH] feat: Add option to calculate intervals from job completion time for interval-based scheduling (fixes #565) (#884) --- README.md | 9 ++ example_test.go | 25 ++++ executor.go | 36 ++++- job.go | 60 ++++++-- job_test.go | 383 ++++++++++++++++++++++++++++++++++++++++++++++++ scheduler.go | 34 +++-- 6 files changed, 521 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index 5762e4c..0f0d61d 100644 --- a/README.md +++ b/README.md @@ -103,6 +103,15 @@ Jobs can be run every x months on specific days of the month and at specific tim - [**One time**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#OneTimeJob): Jobs can be run at specific time(s) (either once or many times). +### Interval Timing +Jobs can be scheduled with different interval timing modes. +- [**Interval from scheduled time (default)**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#DurationJob): +By default, jobs calculate their next run time from when they were scheduled to start, resulting in fixed intervals +regardless of execution time. Good for cron-like scheduling at predictable times. +- [**Interval from completion time**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithIntervalFromCompletion): +Jobs can calculate their next run time from when they complete, ensuring consistent rest periods between executions. +Ideal for rate-limited APIs, resource-intensive jobs, and scenarios where execution time varies. + ### Concurrency Limits Jobs can be limited individually or across the entire scheduler. - [**Per job limiting with singleton mode**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithSingletonMode): diff --git a/example_test.go b/example_test.go index 015f3b5..341ea2a 100644 --- a/example_test.go +++ b/example_test.go @@ -862,6 +862,31 @@ func ExampleWithIdentifier() { // 87b95dfc-3e71-11ef-9454-0242ac120002 } +func ExampleWithIntervalFromCompletion() { + s, _ := gocron.NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + gocron.DurationJob( + 5*time.Minute, + ), + gocron.NewTask( + func() { + time.Sleep(30 * time.Second) + }, + ), + gocron.WithIntervalFromCompletion(), + ) + + // Without WithIntervalFromCompletion (default behavior): + // If the job starts at 00:00 and completes at 00:00:30, + // the next job starts at 00:05:00 (only 4m30s rest). + + // With WithIntervalFromCompletion: + // If the job starts at 00:00 and completes at 00:00:30, + // the next job starts at 00:05:30 (full 5m rest). +} + func ExampleWithLimitConcurrentJobs() { _, _ = gocron.NewScheduler( gocron.WithLimitConcurrentJobs( diff --git a/executor.go b/executor.go index 594d5ea..c5c2a03 100644 --- a/executor.go +++ b/executor.go @@ -200,7 +200,10 @@ func (e *executor) start() { select { case runner.rescheduleLimiter <- struct{}{}: runner.in <- jIn - e.sendOutForRescheduling(&jIn) + // For intervalFromCompletion, skip rescheduling here - it will happen after job completes + if !j.intervalFromCompletion { + e.sendOutForRescheduling(&jIn) + } default: // runner is busy, reschedule the work for later // which means we just skip it here and do nothing @@ -210,7 +213,10 @@ func (e *executor) start() { } else { // wait mode, fill up that queue (buffered channel, so it's ok) runner.in <- jIn - e.sendOutForRescheduling(&jIn) + // For intervalFromCompletion, skip rescheduling here - it will happen after job completes + if !j.intervalFromCompletion { + e.sendOutForRescheduling(&jIn) + } } } else { select { @@ -345,7 +351,10 @@ func (e *executor) singletonModeRunner(name string, in chan jobIn, wg *waitGroup // need to set shouldSendOut = false here, as there is a duplicative call to sendOutForRescheduling // inside the runJob function that needs to be skipped. sendOutForRescheduling is previously called // when the job is sent to the singleton mode runner. - jIn.shouldSendOut = false + // Exception: for intervalFromCompletion, we want rescheduling to happen AFTER job completion + if !j.intervalFromCompletion { + jIn.shouldSendOut = false + } e.runJob(*j, jIn) } @@ -419,10 +428,14 @@ func (e *executor) runJob(j internalJob, jIn jobIn) { return } - e.sendOutForRescheduling(&jIn) - select { - case e.jobsOutCompleted <- j.id: - case <-e.ctx.Done(): + // For intervalFromCompletion, we need to reschedule AFTER the job completes, + // not before. For regular jobs, we reschedule before execution (existing behavior). + if !j.intervalFromCompletion { + e.sendOutForRescheduling(&jIn) + select { + case e.jobsOutCompleted <- j.id: + case <-e.ctx.Done(): + } } startTime := time.Now() @@ -441,6 +454,15 @@ func (e *executor) runJob(j internalJob, jIn jobIn) { e.incrementJobCounter(j, Success) e.recordJobTimingWithStatus(startTime, time.Now(), j, Success, nil) } + + // For intervalFromCompletion, reschedule AFTER the job completes + if j.intervalFromCompletion { + select { + case e.jobsOutCompleted <- j.id: + case <-e.ctx.Done(): + } + e.sendOutForRescheduling(&jIn) + } } func (e *executor) callJobWithRecover(j internalJob) (err error) { diff --git a/job.go b/job.go index 80d1310..b89f10d 100644 --- a/job.go +++ b/job.go @@ -31,16 +31,17 @@ type internalJob struct { // have multiple nextScheduled times nextScheduled []time.Time - lastRun time.Time - function any - parameters []any - timer clockwork.Timer - singletonMode bool - singletonLimitMode LimitMode - limitRunsTo *limitRunsTo - startTime time.Time - startImmediately bool - stopTime time.Time + lastRun time.Time + function any + parameters []any + timer clockwork.Timer + singletonMode bool + singletonLimitMode LimitMode + limitRunsTo *limitRunsTo + startTime time.Time + startImmediately bool + stopTime time.Time + intervalFromCompletion bool // event listeners afterJobRuns func(jobID uuid.UUID, jobName string) beforeJobRuns func(jobID uuid.UUID, jobName string) @@ -682,6 +683,45 @@ func WithSingletonMode(mode LimitMode) JobOption { } } +// WithIntervalFromCompletion configures the job to calculate the next run time +// from the job's completion time rather than its scheduled start time. +// This ensures consistent rest periods between job executions regardless of +// how long each execution takes. +// +// By default (without this option), a job scheduled to run every N time units +// will start N time units after its previous scheduled start time. For example, +// if a job is scheduled to run every 5 minutes starting at 09:00 and takes 2 minutes +// to complete, the next run will start at 09:05 (5 minutes from 09:00), giving +// only 3 minutes of rest between completion and the next start. +// +// With this option enabled, the next run will start N time units after the job +// completes. Using the same example, if the job completes at 09:02, the next run +// will start at 09:07 (5 minutes from 09:02), ensuring a full 5 minutes of rest. +// +// Note: This option only makes sense with interval-based jobs (DurationJob, DurationRandomJob). +// For time-based jobs (CronJob, DailyJob, etc.) that run at specific times, this option +// will be ignored as those jobs are inherently scheduled at fixed times. +// +// Example: +// +// s.NewJob( +// gocron.DurationJob(5*time.Minute), +// gocron.NewTask(func() { +// // Job that takes variable time to complete +// doWork() +// }), +// gocron.WithIntervalFromCompletion(), +// ) +// +// In this example, no matter how long doWork() takes, there will always be +// exactly 5 minutes between when it completes and when it starts again. +func WithIntervalFromCompletion() JobOption { + return func(j *internalJob, _ time.Time) error { + j.intervalFromCompletion = true + return nil + } +} + // WithStartAt sets the option for starting the job at // a specific datetime. func WithStartAt(option StartAtOption) JobOption { diff --git a/job_test.go b/job_test.go index f3d960f..7128667 100644 --- a/job_test.go +++ b/job_test.go @@ -2,6 +2,8 @@ package gocron import ( "math/rand" + "sync" + "sync/atomic" "testing" "time" @@ -787,3 +789,384 @@ func TestNewDaysOfTheMonth(t *testing.T) { assert.Equal(t, domInts, domIntsAgain) } + +func TestWithIntervalFromCompletion_BasicFunctionality(t *testing.T) { + t.Run("interval calculated from completion time", func(t *testing.T) { + s, err := NewScheduler() + require.NoError(t, err) + defer func() { _ = s.Shutdown() }() + + var mu sync.Mutex + executions := []struct { + startTime time.Time + completeTime time.Time + }{} + + jobExecutionTime := 2 * time.Second + scheduledInterval := 5 * time.Second + + _, err = s.NewJob( + DurationJob(scheduledInterval), + NewTask(func() { + start := time.Now() + time.Sleep(jobExecutionTime) + complete := time.Now() + + mu.Lock() + executions = append(executions, struct { + startTime time.Time + completeTime time.Time + }{start, complete}) + mu.Unlock() + }), + WithIntervalFromCompletion(), + ) + require.NoError(t, err) + + s.Start() + + // Wait for at least 3 executions + // With intervalFromCompletion: + // Execution 1: 0s-2s + // Wait: 5s (from 2s to 7s) + // Execution 2: 7s-9s + // Wait: 5s (from 9s to 14s) + // Execution 3: 14s-16s + time.Sleep(18 * time.Second) + + mu.Lock() + executionCount := len(executions) + mu.Unlock() + + require.GreaterOrEqual(t, executionCount, 2, + "Expected at least 2 executions") + + mu.Lock() + defer mu.Unlock() + + for i := 1; i < len(executions); i++ { + prev := executions[i-1] + curr := executions[i] + + completionToStartGap := curr.startTime.Sub(prev.completeTime) + + assert.InDelta(t, scheduledInterval.Seconds(), completionToStartGap.Seconds(), 0.5, + "Gap from completion to start should match the interval") + } + }) +} + +func TestWithIntervalFromCompletion_VariableExecutionTime(t *testing.T) { + s, err := NewScheduler() + require.NoError(t, err) + defer func() { _ = s.Shutdown() }() + + var mu sync.Mutex + executions := []struct { + startTime time.Time + completeTime time.Time + executionDur time.Duration + }{} + + executionTimes := []time.Duration{ + 1 * time.Second, + 3 * time.Second, + 500 * time.Millisecond, + } + currentExecution := atomic.Int32{} + scheduledInterval := 4 * time.Second + + _, err = s.NewJob( + DurationJob(scheduledInterval), + NewTask(func() { + idx := int(currentExecution.Add(1)) - 1 + if idx >= len(executionTimes) { + return + } + + start := time.Now() + executionTime := executionTimes[idx] + time.Sleep(executionTime) + complete := time.Now() + + mu.Lock() + executions = append(executions, struct { + startTime time.Time + completeTime time.Time + executionDur time.Duration + }{start, complete, executionTime}) + mu.Unlock() + }), + WithIntervalFromCompletion(), + ) + require.NoError(t, err) + + s.Start() + + // Wait for all 3 executions + // Execution 1: 0s-1s, wait 4s → next at 5s + // Execution 2: 5s-8s, wait 4s → next at 12s + // Execution 3: 12s-12.5s + time.Sleep(15 * time.Second) + + mu.Lock() + defer mu.Unlock() + + require.GreaterOrEqual(t, len(executions), 2, "Expected at least 2 executions") + + for i := 1; i < len(executions); i++ { + prev := executions[i-1] + curr := executions[i] + + restPeriod := curr.startTime.Sub(prev.completeTime) + + assert.InDelta(t, scheduledInterval.Seconds(), restPeriod.Seconds(), 0.5, + "Rest period should be consistent regardless of execution time") + } +} + +func TestWithIntervalFromCompletion_LongRunningJob(t *testing.T) { + s, err := NewScheduler() + require.NoError(t, err) + defer func() { _ = s.Shutdown() }() + + var mu sync.Mutex + executions := []struct { + startTime time.Time + completeTime time.Time + }{} + + jobExecutionTime := 6 * time.Second + scheduledInterval := 3 * time.Second + + _, err = s.NewJob( + DurationJob(scheduledInterval), + NewTask(func() { + start := time.Now() + time.Sleep(jobExecutionTime) + complete := time.Now() + + mu.Lock() + executions = append(executions, struct { + startTime time.Time + completeTime time.Time + }{start, complete}) + mu.Unlock() + }), + WithIntervalFromCompletion(), + WithSingletonMode(LimitModeReschedule), + ) + require.NoError(t, err) + + s.Start() + + // Wait for 2 executions + // Execution 1: 0s-6s, wait 3s → next at 9s + // Execution 2: 9s-15s, wait 3s → next at 18s + // Need to wait at least 16 seconds for 2 executions + buffer + time.Sleep(22 * time.Second) + + mu.Lock() + defer mu.Unlock() + + require.GreaterOrEqual(t, len(executions), 2, "Expected at least 2 executions") + + if len(executions) < 2 { + t.Logf("Only got %d execution(s), skipping gap assertion", len(executions)) + return + } + + prev := executions[0] + curr := executions[1] + + completionGap := curr.startTime.Sub(prev.completeTime) + + assert.InDelta(t, scheduledInterval.Seconds(), completionGap.Seconds(), 0.5, + "Gap should be the full interval even when execution time exceeds interval") +} + +func TestWithIntervalFromCompletion_ComparedToDefault(t *testing.T) { + jobExecutionTime := 2 * time.Second + scheduledInterval := 5 * time.Second + + t.Run("default behavior - interval from scheduled time", func(t *testing.T) { + s, err := NewScheduler() + require.NoError(t, err) + defer func() { _ = s.Shutdown() }() + + var mu sync.Mutex + executions := []struct { + startTime time.Time + completeTime time.Time + }{} + + _, err = s.NewJob( + DurationJob(scheduledInterval), + NewTask(func() { + start := time.Now() + time.Sleep(jobExecutionTime) + complete := time.Now() + + mu.Lock() + executions = append(executions, struct { + startTime time.Time + completeTime time.Time + }{start, complete}) + mu.Unlock() + }), + ) + require.NoError(t, err) + + s.Start() + time.Sleep(13 * time.Second) + + mu.Lock() + defer mu.Unlock() + + require.GreaterOrEqual(t, len(executions), 2, "Expected at least 2 executions") + + prev := executions[0] + curr := executions[1] + completionGap := curr.startTime.Sub(prev.completeTime) + + expectedGap := scheduledInterval - jobExecutionTime + assert.InDelta(t, expectedGap.Seconds(), completionGap.Seconds(), 0.5, + "Default behavior: gap should be interval minus execution time") + }) + + t.Run("with intervalFromCompletion - interval from completion time", func(t *testing.T) { + s, err := NewScheduler() + require.NoError(t, err) + defer func() { _ = s.Shutdown() }() + + var mu sync.Mutex + executions := []struct { + startTime time.Time + completeTime time.Time + }{} + + _, err = s.NewJob( + DurationJob(scheduledInterval), + NewTask(func() { + start := time.Now() + time.Sleep(jobExecutionTime) + complete := time.Now() + + mu.Lock() + executions = append(executions, struct { + startTime time.Time + completeTime time.Time + }{start, complete}) + mu.Unlock() + }), + WithIntervalFromCompletion(), + ) + require.NoError(t, err) + + s.Start() + time.Sleep(15 * time.Second) + + mu.Lock() + defer mu.Unlock() + + require.GreaterOrEqual(t, len(executions), 2, "Expected at least 2 executions") + + prev := executions[0] + curr := executions[1] + completionGap := curr.startTime.Sub(prev.completeTime) + + assert.InDelta(t, scheduledInterval.Seconds(), completionGap.Seconds(), 0.5, + "With intervalFromCompletion: gap should be the full interval") + }) +} + +func TestWithIntervalFromCompletion_DurationRandomJob(t *testing.T) { + s, err := NewScheduler() + require.NoError(t, err) + defer func() { _ = s.Shutdown() }() + + var mu sync.Mutex + executions := []struct { + startTime time.Time + completeTime time.Time + }{} + + jobExecutionTime := 1 * time.Second + minInterval := 3 * time.Second + maxInterval := 4 * time.Second + + _, err = s.NewJob( + DurationRandomJob(minInterval, maxInterval), + NewTask(func() { + start := time.Now() + time.Sleep(jobExecutionTime) + complete := time.Now() + + mu.Lock() + executions = append(executions, struct { + startTime time.Time + completeTime time.Time + }{start, complete}) + mu.Unlock() + }), + WithIntervalFromCompletion(), + ) + require.NoError(t, err) + + s.Start() + + time.Sleep(15 * time.Second) + + mu.Lock() + defer mu.Unlock() + + require.GreaterOrEqual(t, len(executions), 2, "Expected at least 2 executions") + + for i := 1; i < len(executions); i++ { + prev := executions[i-1] + curr := executions[i] + + restPeriod := curr.startTime.Sub(prev.completeTime) + assert.GreaterOrEqual(t, restPeriod.Seconds(), minInterval.Seconds()-0.5, + "Rest period should be at least minInterval") + assert.LessOrEqual(t, restPeriod.Seconds(), maxInterval.Seconds()+0.5, + "Rest period should be at most maxInterval") + } +} + +func TestWithIntervalFromCompletion_FirstRun(t *testing.T) { + s, err := NewScheduler() + require.NoError(t, err) + defer func() { _ = s.Shutdown() }() + + var mu sync.Mutex + var firstRunTime time.Time + + _, err = s.NewJob( + DurationJob(5*time.Second), + NewTask(func() { + mu.Lock() + if firstRunTime.IsZero() { + firstRunTime = time.Now() + } + mu.Unlock() + }), + WithIntervalFromCompletion(), + WithStartAt(WithStartImmediately()), + ) + require.NoError(t, err) + + startTime := time.Now() + s.Start() + + time.Sleep(1 * time.Second) + + mu.Lock() + defer mu.Unlock() + + require.False(t, firstRunTime.IsZero(), "Job should have run at least once") + + timeSinceStart := firstRunTime.Sub(startTime) + assert.Less(t, timeSinceStart.Seconds(), 1.0, + "First run should happen quickly with WithStartImmediately") +} diff --git a/scheduler.go b/scheduler.go index 1656aee..274f1e2 100644 --- a/scheduler.go +++ b/scheduler.go @@ -347,16 +347,32 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) { } var scheduleFrom time.Time - if len(j.nextScheduled) > 0 { - // always grab the last element in the slice as that is the furthest - // out in the future and the time from which we want to calculate - // the subsequent next run time. - slices.SortStableFunc(j.nextScheduled, ascendingTime) - scheduleFrom = j.nextScheduled[len(j.nextScheduled)-1] - } - if scheduleFrom.IsZero() { - scheduleFrom = j.startTime + // If intervalFromCompletion is enabled, calculate the next run time + // from when the job completed (lastRun) rather than when it was scheduled. + if j.intervalFromCompletion { + // Use the completion time (lastRun is set when the job completes) + scheduleFrom = j.lastRun + if scheduleFrom.IsZero() { + // For the first run, use the start time or current time + scheduleFrom = j.startTime + if scheduleFrom.IsZero() { + scheduleFrom = s.now() + } + } + } else { + // Default behavior: use the scheduled time + if len(j.nextScheduled) > 0 { + // always grab the last element in the slice as that is the furthest + // out in the future and the time from which we want to calculate + // the subsequent next run time. + slices.SortStableFunc(j.nextScheduled, ascendingTime) + scheduleFrom = j.nextScheduled[len(j.nextScheduled)-1] + } + + if scheduleFrom.IsZero() { + scheduleFrom = j.startTime + } } next := j.next(scheduleFrom)