feat: add option to calculate intervals from job completion time

This commit is contained in:
iyashjayesh 2025-10-21 19:44:25 +05:30
parent 361bc6a4c7
commit e94379147a
6 changed files with 504 additions and 23 deletions

View File

@ -103,6 +103,15 @@ Jobs can be run every x months on specific days of the month and at specific tim
- [**One time**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#OneTimeJob): - [**One time**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#OneTimeJob):
Jobs can be run at specific time(s) (either once or many times). Jobs can be run at specific time(s) (either once or many times).
### Interval Timing
Jobs can be scheduled with different interval timing modes.
- [**Interval from scheduled time (default)**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#DurationJob):
By default, jobs calculate their next run time from when they were scheduled to start, resulting in fixed intervals
regardless of execution time. Good for cron-like scheduling at predictable times.
- [**Interval from completion time**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithIntervalFromCompletion):
Jobs can calculate their next run time from when they complete, ensuring consistent rest periods between executions.
Ideal for rate-limited APIs, resource-intensive jobs, and scenarios where execution time varies.
### Concurrency Limits ### Concurrency Limits
Jobs can be limited individually or across the entire scheduler. Jobs can be limited individually or across the entire scheduler.
- [**Per job limiting with singleton mode**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithSingletonMode): - [**Per job limiting with singleton mode**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithSingletonMode):

View File

@ -1016,6 +1016,31 @@ func ExampleWithSingletonMode() {
) )
} }
func ExampleWithIntervalFromCompletion() {
s, _ := gocron.NewScheduler()
defer func() { _ = s.Shutdown() }()
_, _ = s.NewJob(
gocron.DurationJob(
5*time.Minute,
),
gocron.NewTask(
func() {
time.Sleep(30 * time.Second)
},
),
gocron.WithIntervalFromCompletion(),
)
// Without WithIntervalFromCompletion (default behavior):
// If the job starts at 00:00 and completes at 00:00:30,
// the next job starts at 00:05:00 (only 4m30s rest).
// With WithIntervalFromCompletion:
// If the job starts at 00:00 and completes at 00:00:30,
// the next job starts at 00:05:30 (full 5m rest).
}
func ExampleWithStartAt() { func ExampleWithStartAt() {
s, _ := gocron.NewScheduler() s, _ := gocron.NewScheduler()
defer func() { _ = s.Shutdown() }() defer func() { _ = s.Shutdown() }()

View File

@ -419,10 +419,14 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
return return
} }
e.sendOutForRescheduling(&jIn) // For intervalFromCompletion, we need to reschedule AFTER the job completes,
select { // not before. For regular jobs, we reschedule before execution (existing behavior).
case e.jobsOutCompleted <- j.id: if !j.intervalFromCompletion {
case <-e.ctx.Done(): e.sendOutForRescheduling(&jIn)
select {
case e.jobsOutCompleted <- j.id:
case <-e.ctx.Done():
}
} }
startTime := time.Now() startTime := time.Now()
@ -441,6 +445,15 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
e.incrementJobCounter(j, Success) e.incrementJobCounter(j, Success)
e.recordJobTimingWithStatus(startTime, time.Now(), j, Success, nil) e.recordJobTimingWithStatus(startTime, time.Now(), j, Success, nil)
} }
// For intervalFromCompletion, reschedule AFTER the job completes
if j.intervalFromCompletion {
select {
case e.jobsOutCompleted <- j.id:
case <-e.ctx.Done():
}
e.sendOutForRescheduling(&jIn)
}
} }
func (e *executor) callJobWithRecover(j internalJob) (err error) { func (e *executor) callJobWithRecover(j internalJob) (err error) {

60
job.go
View File

@ -31,16 +31,17 @@ type internalJob struct {
// have multiple nextScheduled times // have multiple nextScheduled times
nextScheduled []time.Time nextScheduled []time.Time
lastRun time.Time lastRun time.Time
function any function any
parameters []any parameters []any
timer clockwork.Timer timer clockwork.Timer
singletonMode bool singletonMode bool
singletonLimitMode LimitMode singletonLimitMode LimitMode
limitRunsTo *limitRunsTo limitRunsTo *limitRunsTo
startTime time.Time startTime time.Time
startImmediately bool startImmediately bool
stopTime time.Time stopTime time.Time
intervalFromCompletion bool
// event listeners // event listeners
afterJobRuns func(jobID uuid.UUID, jobName string) afterJobRuns func(jobID uuid.UUID, jobName string)
beforeJobRuns func(jobID uuid.UUID, jobName string) beforeJobRuns func(jobID uuid.UUID, jobName string)
@ -682,6 +683,45 @@ func WithSingletonMode(mode LimitMode) JobOption {
} }
} }
// WithIntervalFromCompletion configures the job to calculate the next run time
// from the job's completion time rather than its scheduled start time.
// This ensures consistent rest periods between job executions regardless of
// how long each execution takes.
//
// By default (without this option), a job scheduled to run every N time units
// will start N time units after its previous scheduled start time. For example,
// if a job is scheduled to run every 5 minutes starting at 09:00 and takes 2 minutes
// to complete, the next run will start at 09:05 (5 minutes from 09:00), giving
// only 3 minutes of rest between completion and the next start.
//
// With this option enabled, the next run will start N time units after the job
// completes. Using the same example, if the job completes at 09:02, the next run
// will start at 09:07 (5 minutes from 09:02), ensuring a full 5 minutes of rest.
//
// Note: This option only makes sense with interval-based jobs (DurationJob, DurationRandomJob).
// For time-based jobs (CronJob, DailyJob, etc.) that run at specific times, this option
// will be ignored as those jobs are inherently scheduled at fixed times.
//
// Example:
//
// s.NewJob(
// gocron.DurationJob(5*time.Minute),
// gocron.NewTask(func() {
// // Job that takes variable time to complete
// doWork()
// }),
// gocron.WithIntervalFromCompletion(),
// )
//
// In this example, no matter how long doWork() takes, there will always be
// exactly 5 minutes between when it completes and when it starts again.
func WithIntervalFromCompletion() JobOption {
return func(j *internalJob, _ time.Time) error {
j.intervalFromCompletion = true
return nil
}
}
// WithStartAt sets the option for starting the job at // WithStartAt sets the option for starting the job at
// a specific datetime. // a specific datetime.
func WithStartAt(option StartAtOption) JobOption { func WithStartAt(option StartAtOption) JobOption {

View File

@ -2,6 +2,8 @@ package gocron
import ( import (
"math/rand" "math/rand"
"sync"
"sync/atomic"
"testing" "testing"
"time" "time"
@ -787,3 +789,379 @@ func TestNewDaysOfTheMonth(t *testing.T) {
assert.Equal(t, domInts, domIntsAgain) assert.Equal(t, domInts, domIntsAgain)
} }
func TestWithIntervalFromCompletion_BasicFunctionality(t *testing.T) {
t.Run("interval calculated from completion time", func(t *testing.T) {
s, err := NewScheduler()
require.NoError(t, err)
defer func() { _ = s.Shutdown() }()
var mu sync.Mutex
executions := []struct {
startTime time.Time
completeTime time.Time
}{}
jobExecutionTime := 2 * time.Second
scheduledInterval := 5 * time.Second
_, err = s.NewJob(
DurationJob(scheduledInterval),
NewTask(func() {
start := time.Now()
time.Sleep(jobExecutionTime)
complete := time.Now()
mu.Lock()
executions = append(executions, struct {
startTime time.Time
completeTime time.Time
}{start, complete})
mu.Unlock()
}),
WithIntervalFromCompletion(),
)
require.NoError(t, err)
s.Start()
// Wait for at least 3 executions
// With intervalFromCompletion:
// Execution 1: 0s-2s
// Wait: 5s (from 2s to 7s)
// Execution 2: 7s-9s
// Wait: 5s (from 9s to 14s)
// Execution 3: 14s-16s
time.Sleep(18 * time.Second)
mu.Lock()
executionCount := len(executions)
mu.Unlock()
require.GreaterOrEqual(t, executionCount, 2,
"Expected at least 2 executions")
mu.Lock()
defer mu.Unlock()
for i := 1; i < len(executions); i++ {
prev := executions[i-1]
curr := executions[i]
completionToStartGap := curr.startTime.Sub(prev.completeTime)
assert.InDelta(t, scheduledInterval.Seconds(), completionToStartGap.Seconds(), 0.5,
"Gap from completion to start should match the interval")
}
})
}
func TestWithIntervalFromCompletion_VariableExecutionTime(t *testing.T) {
s, err := NewScheduler()
require.NoError(t, err)
defer func() { _ = s.Shutdown() }()
var mu sync.Mutex
executions := []struct {
startTime time.Time
completeTime time.Time
executionDur time.Duration
}{}
executionTimes := []time.Duration{
1 * time.Second,
3 * time.Second,
500 * time.Millisecond,
}
currentExecution := atomic.Int32{}
scheduledInterval := 4 * time.Second
_, err = s.NewJob(
DurationJob(scheduledInterval),
NewTask(func() {
idx := int(currentExecution.Add(1)) - 1
if idx >= len(executionTimes) {
return
}
start := time.Now()
executionTime := executionTimes[idx]
time.Sleep(executionTime)
complete := time.Now()
mu.Lock()
executions = append(executions, struct {
startTime time.Time
completeTime time.Time
executionDur time.Duration
}{start, complete, executionTime})
mu.Unlock()
}),
WithIntervalFromCompletion(),
)
require.NoError(t, err)
s.Start()
// Wait for all 3 executions
// Execution 1: 0s-1s, wait 4s → next at 5s
// Execution 2: 5s-8s, wait 4s → next at 12s
// Execution 3: 12s-12.5s
time.Sleep(15 * time.Second)
mu.Lock()
defer mu.Unlock()
require.GreaterOrEqual(t, len(executions), 2, "Expected at least 2 executions")
for i := 1; i < len(executions); i++ {
prev := executions[i-1]
curr := executions[i]
restPeriod := curr.startTime.Sub(prev.completeTime)
assert.InDelta(t, scheduledInterval.Seconds(), restPeriod.Seconds(), 0.5,
"Rest period should be consistent regardless of execution time")
}
}
func TestWithIntervalFromCompletion_LongRunningJob(t *testing.T) {
s, err := NewScheduler()
require.NoError(t, err)
defer func() { _ = s.Shutdown() }()
var mu sync.Mutex
executions := []struct {
startTime time.Time
completeTime time.Time
}{}
jobExecutionTime := 6 * time.Second
scheduledInterval := 3 * time.Second
_, err = s.NewJob(
DurationJob(scheduledInterval),
NewTask(func() {
start := time.Now()
time.Sleep(jobExecutionTime)
complete := time.Now()
mu.Lock()
executions = append(executions, struct {
startTime time.Time
completeTime time.Time
}{start, complete})
mu.Unlock()
}),
WithIntervalFromCompletion(),
WithSingletonMode(LimitModeReschedule),
)
require.NoError(t, err)
s.Start()
// Wait for 2 executions
// Execution 1: 0s-6s, wait 3s → next at 9s
// Execution 2: 9s-15s, wait 3s → next at 18s
// Need to wait at least 16 seconds for 2 executions + buffer
time.Sleep(20 * time.Second)
mu.Lock()
defer mu.Unlock()
require.GreaterOrEqual(t, len(executions), 2, "Expected at least 2 executions")
prev := executions[0]
curr := executions[1]
completionGap := curr.startTime.Sub(prev.completeTime)
assert.InDelta(t, scheduledInterval.Seconds(), completionGap.Seconds(), 0.5,
"Gap should be the full interval even when execution time exceeds interval")
}
func TestWithIntervalFromCompletion_ComparedToDefault(t *testing.T) {
jobExecutionTime := 2 * time.Second
scheduledInterval := 5 * time.Second
t.Run("default behavior - interval from scheduled time", func(t *testing.T) {
s, err := NewScheduler()
require.NoError(t, err)
defer func() { _ = s.Shutdown() }()
var mu sync.Mutex
executions := []struct {
startTime time.Time
completeTime time.Time
}{}
_, err = s.NewJob(
DurationJob(scheduledInterval),
NewTask(func() {
start := time.Now()
time.Sleep(jobExecutionTime)
complete := time.Now()
mu.Lock()
executions = append(executions, struct {
startTime time.Time
completeTime time.Time
}{start, complete})
mu.Unlock()
}),
)
require.NoError(t, err)
s.Start()
time.Sleep(13 * time.Second)
mu.Lock()
defer mu.Unlock()
require.GreaterOrEqual(t, len(executions), 2, "Expected at least 2 executions")
prev := executions[0]
curr := executions[1]
completionGap := curr.startTime.Sub(prev.completeTime)
expectedGap := scheduledInterval - jobExecutionTime
assert.InDelta(t, expectedGap.Seconds(), completionGap.Seconds(), 0.5,
"Default behavior: gap should be interval minus execution time")
})
t.Run("with intervalFromCompletion - interval from completion time", func(t *testing.T) {
s, err := NewScheduler()
require.NoError(t, err)
defer func() { _ = s.Shutdown() }()
var mu sync.Mutex
executions := []struct {
startTime time.Time
completeTime time.Time
}{}
_, err = s.NewJob(
DurationJob(scheduledInterval),
NewTask(func() {
start := time.Now()
time.Sleep(jobExecutionTime)
complete := time.Now()
mu.Lock()
executions = append(executions, struct {
startTime time.Time
completeTime time.Time
}{start, complete})
mu.Unlock()
}),
WithIntervalFromCompletion(),
)
require.NoError(t, err)
s.Start()
time.Sleep(15 * time.Second)
mu.Lock()
defer mu.Unlock()
require.GreaterOrEqual(t, len(executions), 2, "Expected at least 2 executions")
prev := executions[0]
curr := executions[1]
completionGap := curr.startTime.Sub(prev.completeTime)
assert.InDelta(t, scheduledInterval.Seconds(), completionGap.Seconds(), 0.5,
"With intervalFromCompletion: gap should be the full interval")
})
}
func TestWithIntervalFromCompletion_DurationRandomJob(t *testing.T) {
s, err := NewScheduler()
require.NoError(t, err)
defer func() { _ = s.Shutdown() }()
var mu sync.Mutex
executions := []struct {
startTime time.Time
completeTime time.Time
}{}
jobExecutionTime := 1 * time.Second
minInterval := 3 * time.Second
maxInterval := 4 * time.Second
_, err = s.NewJob(
DurationRandomJob(minInterval, maxInterval),
NewTask(func() {
start := time.Now()
time.Sleep(jobExecutionTime)
complete := time.Now()
mu.Lock()
executions = append(executions, struct {
startTime time.Time
completeTime time.Time
}{start, complete})
mu.Unlock()
}),
WithIntervalFromCompletion(),
)
require.NoError(t, err)
s.Start()
time.Sleep(15 * time.Second)
mu.Lock()
defer mu.Unlock()
require.GreaterOrEqual(t, len(executions), 2, "Expected at least 2 executions")
for i := 1; i < len(executions); i++ {
prev := executions[i-1]
curr := executions[i]
restPeriod := curr.startTime.Sub(prev.completeTime)
assert.GreaterOrEqual(t, restPeriod.Seconds(), minInterval.Seconds()-0.5,
"Rest period should be at least minInterval")
assert.LessOrEqual(t, restPeriod.Seconds(), maxInterval.Seconds()+0.5,
"Rest period should be at most maxInterval")
}
}
func TestWithIntervalFromCompletion_FirstRun(t *testing.T) {
s, err := NewScheduler()
require.NoError(t, err)
defer func() { _ = s.Shutdown() }()
var mu sync.Mutex
var firstRunTime time.Time
_, err = s.NewJob(
DurationJob(5*time.Second),
NewTask(func() {
mu.Lock()
if firstRunTime.IsZero() {
firstRunTime = time.Now()
}
mu.Unlock()
}),
WithIntervalFromCompletion(),
WithStartAt(WithStartImmediately()),
)
require.NoError(t, err)
startTime := time.Now()
s.Start()
time.Sleep(1 * time.Second)
mu.Lock()
defer mu.Unlock()
require.False(t, firstRunTime.IsZero(), "Job should have run at least once")
timeSinceStart := firstRunTime.Sub(startTime)
assert.Less(t, timeSinceStart.Seconds(), 1.0,
"First run should happen quickly with WithStartImmediately")
}

View File

@ -347,16 +347,32 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
} }
var scheduleFrom time.Time var scheduleFrom time.Time
if len(j.nextScheduled) > 0 {
// always grab the last element in the slice as that is the furthest // If intervalFromCompletion is enabled, calculate the next run time
// out in the future and the time from which we want to calculate // from when the job completed (lastRun) rather than when it was scheduled.
// the subsequent next run time. if j.intervalFromCompletion {
slices.SortStableFunc(j.nextScheduled, ascendingTime) // Use the completion time (lastRun is set when the job completes)
scheduleFrom = j.nextScheduled[len(j.nextScheduled)-1] scheduleFrom = j.lastRun
} if scheduleFrom.IsZero() {
// For the first run, use the start time or current time
scheduleFrom = j.startTime
if scheduleFrom.IsZero() {
scheduleFrom = s.now()
}
}
} else {
// Default behavior: use the scheduled time
if len(j.nextScheduled) > 0 {
// always grab the last element in the slice as that is the furthest
// out in the future and the time from which we want to calculate
// the subsequent next run time.
slices.SortStableFunc(j.nextScheduled, ascendingTime)
scheduleFrom = j.nextScheduled[len(j.nextScheduled)-1]
}
if scheduleFrom.IsZero() { if scheduleFrom.IsZero() {
scheduleFrom = j.startTime scheduleFrom = j.startTime
}
} }
next := j.next(scheduleFrom) next := j.next(scheduleFrom)