feat: Add option to calculate intervals from job completion time for interval-based scheduling (fixes #565) (#884)

This commit is contained in:
Yash Chauhan 2025-10-22 08:39:46 +05:30 committed by GitHub
parent 361bc6a4c7
commit 3ee53e03d9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 521 additions and 26 deletions

View File

@ -103,6 +103,15 @@ Jobs can be run every x months on specific days of the month and at specific tim
- [**One time**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#OneTimeJob):
Jobs can be run at specific time(s) (either once or many times).
### Interval Timing
Jobs can be scheduled with different interval timing modes.
- [**Interval from scheduled time (default)**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#DurationJob):
By default, jobs calculate their next run time from when they were scheduled to start, resulting in fixed intervals
regardless of execution time. Good for cron-like scheduling at predictable times.
- [**Interval from completion time**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithIntervalFromCompletion):
Jobs can calculate their next run time from when they complete, ensuring consistent rest periods between executions.
Ideal for rate-limited APIs, resource-intensive jobs, and scenarios where execution time varies.
### Concurrency Limits
Jobs can be limited individually or across the entire scheduler.
- [**Per job limiting with singleton mode**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithSingletonMode):

View File

@ -862,6 +862,31 @@ func ExampleWithIdentifier() {
// 87b95dfc-3e71-11ef-9454-0242ac120002
}
func ExampleWithIntervalFromCompletion() {
s, _ := gocron.NewScheduler()
defer func() { _ = s.Shutdown() }()
_, _ = s.NewJob(
gocron.DurationJob(
5*time.Minute,
),
gocron.NewTask(
func() {
time.Sleep(30 * time.Second)
},
),
gocron.WithIntervalFromCompletion(),
)
// Without WithIntervalFromCompletion (default behavior):
// If the job starts at 00:00 and completes at 00:00:30,
// the next job starts at 00:05:00 (only 4m30s rest).
// With WithIntervalFromCompletion:
// If the job starts at 00:00 and completes at 00:00:30,
// the next job starts at 00:05:30 (full 5m rest).
}
func ExampleWithLimitConcurrentJobs() {
_, _ = gocron.NewScheduler(
gocron.WithLimitConcurrentJobs(

View File

@ -200,7 +200,10 @@ func (e *executor) start() {
select {
case runner.rescheduleLimiter <- struct{}{}:
runner.in <- jIn
// For intervalFromCompletion, skip rescheduling here - it will happen after job completes
if !j.intervalFromCompletion {
e.sendOutForRescheduling(&jIn)
}
default:
// runner is busy, reschedule the work for later
// which means we just skip it here and do nothing
@ -210,8 +213,11 @@ func (e *executor) start() {
} else {
// wait mode, fill up that queue (buffered channel, so it's ok)
runner.in <- jIn
// For intervalFromCompletion, skip rescheduling here - it will happen after job completes
if !j.intervalFromCompletion {
e.sendOutForRescheduling(&jIn)
}
}
} else {
select {
case <-executorCtx.Done():
@ -345,7 +351,10 @@ func (e *executor) singletonModeRunner(name string, in chan jobIn, wg *waitGroup
// need to set shouldSendOut = false here, as there is a duplicative call to sendOutForRescheduling
// inside the runJob function that needs to be skipped. sendOutForRescheduling is previously called
// when the job is sent to the singleton mode runner.
// Exception: for intervalFromCompletion, we want rescheduling to happen AFTER job completion
if !j.intervalFromCompletion {
jIn.shouldSendOut = false
}
e.runJob(*j, jIn)
}
@ -419,11 +428,15 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
return
}
// For intervalFromCompletion, we need to reschedule AFTER the job completes,
// not before. For regular jobs, we reschedule before execution (existing behavior).
if !j.intervalFromCompletion {
e.sendOutForRescheduling(&jIn)
select {
case e.jobsOutCompleted <- j.id:
case <-e.ctx.Done():
}
}
startTime := time.Now()
if j.afterJobRunsWithPanic != nil {
@ -441,6 +454,15 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
e.incrementJobCounter(j, Success)
e.recordJobTimingWithStatus(startTime, time.Now(), j, Success, nil)
}
// For intervalFromCompletion, reschedule AFTER the job completes
if j.intervalFromCompletion {
select {
case e.jobsOutCompleted <- j.id:
case <-e.ctx.Done():
}
e.sendOutForRescheduling(&jIn)
}
}
func (e *executor) callJobWithRecover(j internalJob) (err error) {

40
job.go
View File

@ -41,6 +41,7 @@ type internalJob struct {
startTime time.Time
startImmediately bool
stopTime time.Time
intervalFromCompletion bool
// event listeners
afterJobRuns func(jobID uuid.UUID, jobName string)
beforeJobRuns func(jobID uuid.UUID, jobName string)
@ -682,6 +683,45 @@ func WithSingletonMode(mode LimitMode) JobOption {
}
}
// WithIntervalFromCompletion configures the job to calculate the next run time
// from the job's completion time rather than its scheduled start time.
// This ensures consistent rest periods between job executions regardless of
// how long each execution takes.
//
// By default (without this option), a job scheduled to run every N time units
// will start N time units after its previous scheduled start time. For example,
// if a job is scheduled to run every 5 minutes starting at 09:00 and takes 2 minutes
// to complete, the next run will start at 09:05 (5 minutes from 09:00), giving
// only 3 minutes of rest between completion and the next start.
//
// With this option enabled, the next run will start N time units after the job
// completes. Using the same example, if the job completes at 09:02, the next run
// will start at 09:07 (5 minutes from 09:02), ensuring a full 5 minutes of rest.
//
// Note: This option only makes sense with interval-based jobs (DurationJob, DurationRandomJob).
// For time-based jobs (CronJob, DailyJob, etc.) that run at specific times, this option
// will be ignored as those jobs are inherently scheduled at fixed times.
//
// Example:
//
// s.NewJob(
// gocron.DurationJob(5*time.Minute),
// gocron.NewTask(func() {
// // Job that takes variable time to complete
// doWork()
// }),
// gocron.WithIntervalFromCompletion(),
// )
//
// In this example, no matter how long doWork() takes, there will always be
// exactly 5 minutes between when it completes and when it starts again.
func WithIntervalFromCompletion() JobOption {
return func(j *internalJob, _ time.Time) error {
j.intervalFromCompletion = true
return nil
}
}
// WithStartAt sets the option for starting the job at
// a specific datetime.
func WithStartAt(option StartAtOption) JobOption {

View File

@ -2,6 +2,8 @@ package gocron
import (
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"
@ -787,3 +789,384 @@ func TestNewDaysOfTheMonth(t *testing.T) {
assert.Equal(t, domInts, domIntsAgain)
}
func TestWithIntervalFromCompletion_BasicFunctionality(t *testing.T) {
t.Run("interval calculated from completion time", func(t *testing.T) {
s, err := NewScheduler()
require.NoError(t, err)
defer func() { _ = s.Shutdown() }()
var mu sync.Mutex
executions := []struct {
startTime time.Time
completeTime time.Time
}{}
jobExecutionTime := 2 * time.Second
scheduledInterval := 5 * time.Second
_, err = s.NewJob(
DurationJob(scheduledInterval),
NewTask(func() {
start := time.Now()
time.Sleep(jobExecutionTime)
complete := time.Now()
mu.Lock()
executions = append(executions, struct {
startTime time.Time
completeTime time.Time
}{start, complete})
mu.Unlock()
}),
WithIntervalFromCompletion(),
)
require.NoError(t, err)
s.Start()
// Wait for at least 3 executions
// With intervalFromCompletion:
// Execution 1: 0s-2s
// Wait: 5s (from 2s to 7s)
// Execution 2: 7s-9s
// Wait: 5s (from 9s to 14s)
// Execution 3: 14s-16s
time.Sleep(18 * time.Second)
mu.Lock()
executionCount := len(executions)
mu.Unlock()
require.GreaterOrEqual(t, executionCount, 2,
"Expected at least 2 executions")
mu.Lock()
defer mu.Unlock()
for i := 1; i < len(executions); i++ {
prev := executions[i-1]
curr := executions[i]
completionToStartGap := curr.startTime.Sub(prev.completeTime)
assert.InDelta(t, scheduledInterval.Seconds(), completionToStartGap.Seconds(), 0.5,
"Gap from completion to start should match the interval")
}
})
}
func TestWithIntervalFromCompletion_VariableExecutionTime(t *testing.T) {
s, err := NewScheduler()
require.NoError(t, err)
defer func() { _ = s.Shutdown() }()
var mu sync.Mutex
executions := []struct {
startTime time.Time
completeTime time.Time
executionDur time.Duration
}{}
executionTimes := []time.Duration{
1 * time.Second,
3 * time.Second,
500 * time.Millisecond,
}
currentExecution := atomic.Int32{}
scheduledInterval := 4 * time.Second
_, err = s.NewJob(
DurationJob(scheduledInterval),
NewTask(func() {
idx := int(currentExecution.Add(1)) - 1
if idx >= len(executionTimes) {
return
}
start := time.Now()
executionTime := executionTimes[idx]
time.Sleep(executionTime)
complete := time.Now()
mu.Lock()
executions = append(executions, struct {
startTime time.Time
completeTime time.Time
executionDur time.Duration
}{start, complete, executionTime})
mu.Unlock()
}),
WithIntervalFromCompletion(),
)
require.NoError(t, err)
s.Start()
// Wait for all 3 executions
// Execution 1: 0s-1s, wait 4s → next at 5s
// Execution 2: 5s-8s, wait 4s → next at 12s
// Execution 3: 12s-12.5s
time.Sleep(15 * time.Second)
mu.Lock()
defer mu.Unlock()
require.GreaterOrEqual(t, len(executions), 2, "Expected at least 2 executions")
for i := 1; i < len(executions); i++ {
prev := executions[i-1]
curr := executions[i]
restPeriod := curr.startTime.Sub(prev.completeTime)
assert.InDelta(t, scheduledInterval.Seconds(), restPeriod.Seconds(), 0.5,
"Rest period should be consistent regardless of execution time")
}
}
func TestWithIntervalFromCompletion_LongRunningJob(t *testing.T) {
s, err := NewScheduler()
require.NoError(t, err)
defer func() { _ = s.Shutdown() }()
var mu sync.Mutex
executions := []struct {
startTime time.Time
completeTime time.Time
}{}
jobExecutionTime := 6 * time.Second
scheduledInterval := 3 * time.Second
_, err = s.NewJob(
DurationJob(scheduledInterval),
NewTask(func() {
start := time.Now()
time.Sleep(jobExecutionTime)
complete := time.Now()
mu.Lock()
executions = append(executions, struct {
startTime time.Time
completeTime time.Time
}{start, complete})
mu.Unlock()
}),
WithIntervalFromCompletion(),
WithSingletonMode(LimitModeReschedule),
)
require.NoError(t, err)
s.Start()
// Wait for 2 executions
// Execution 1: 0s-6s, wait 3s → next at 9s
// Execution 2: 9s-15s, wait 3s → next at 18s
// Need to wait at least 16 seconds for 2 executions + buffer
time.Sleep(22 * time.Second)
mu.Lock()
defer mu.Unlock()
require.GreaterOrEqual(t, len(executions), 2, "Expected at least 2 executions")
if len(executions) < 2 {
t.Logf("Only got %d execution(s), skipping gap assertion", len(executions))
return
}
prev := executions[0]
curr := executions[1]
completionGap := curr.startTime.Sub(prev.completeTime)
assert.InDelta(t, scheduledInterval.Seconds(), completionGap.Seconds(), 0.5,
"Gap should be the full interval even when execution time exceeds interval")
}
func TestWithIntervalFromCompletion_ComparedToDefault(t *testing.T) {
jobExecutionTime := 2 * time.Second
scheduledInterval := 5 * time.Second
t.Run("default behavior - interval from scheduled time", func(t *testing.T) {
s, err := NewScheduler()
require.NoError(t, err)
defer func() { _ = s.Shutdown() }()
var mu sync.Mutex
executions := []struct {
startTime time.Time
completeTime time.Time
}{}
_, err = s.NewJob(
DurationJob(scheduledInterval),
NewTask(func() {
start := time.Now()
time.Sleep(jobExecutionTime)
complete := time.Now()
mu.Lock()
executions = append(executions, struct {
startTime time.Time
completeTime time.Time
}{start, complete})
mu.Unlock()
}),
)
require.NoError(t, err)
s.Start()
time.Sleep(13 * time.Second)
mu.Lock()
defer mu.Unlock()
require.GreaterOrEqual(t, len(executions), 2, "Expected at least 2 executions")
prev := executions[0]
curr := executions[1]
completionGap := curr.startTime.Sub(prev.completeTime)
expectedGap := scheduledInterval - jobExecutionTime
assert.InDelta(t, expectedGap.Seconds(), completionGap.Seconds(), 0.5,
"Default behavior: gap should be interval minus execution time")
})
t.Run("with intervalFromCompletion - interval from completion time", func(t *testing.T) {
s, err := NewScheduler()
require.NoError(t, err)
defer func() { _ = s.Shutdown() }()
var mu sync.Mutex
executions := []struct {
startTime time.Time
completeTime time.Time
}{}
_, err = s.NewJob(
DurationJob(scheduledInterval),
NewTask(func() {
start := time.Now()
time.Sleep(jobExecutionTime)
complete := time.Now()
mu.Lock()
executions = append(executions, struct {
startTime time.Time
completeTime time.Time
}{start, complete})
mu.Unlock()
}),
WithIntervalFromCompletion(),
)
require.NoError(t, err)
s.Start()
time.Sleep(15 * time.Second)
mu.Lock()
defer mu.Unlock()
require.GreaterOrEqual(t, len(executions), 2, "Expected at least 2 executions")
prev := executions[0]
curr := executions[1]
completionGap := curr.startTime.Sub(prev.completeTime)
assert.InDelta(t, scheduledInterval.Seconds(), completionGap.Seconds(), 0.5,
"With intervalFromCompletion: gap should be the full interval")
})
}
func TestWithIntervalFromCompletion_DurationRandomJob(t *testing.T) {
s, err := NewScheduler()
require.NoError(t, err)
defer func() { _ = s.Shutdown() }()
var mu sync.Mutex
executions := []struct {
startTime time.Time
completeTime time.Time
}{}
jobExecutionTime := 1 * time.Second
minInterval := 3 * time.Second
maxInterval := 4 * time.Second
_, err = s.NewJob(
DurationRandomJob(minInterval, maxInterval),
NewTask(func() {
start := time.Now()
time.Sleep(jobExecutionTime)
complete := time.Now()
mu.Lock()
executions = append(executions, struct {
startTime time.Time
completeTime time.Time
}{start, complete})
mu.Unlock()
}),
WithIntervalFromCompletion(),
)
require.NoError(t, err)
s.Start()
time.Sleep(15 * time.Second)
mu.Lock()
defer mu.Unlock()
require.GreaterOrEqual(t, len(executions), 2, "Expected at least 2 executions")
for i := 1; i < len(executions); i++ {
prev := executions[i-1]
curr := executions[i]
restPeriod := curr.startTime.Sub(prev.completeTime)
assert.GreaterOrEqual(t, restPeriod.Seconds(), minInterval.Seconds()-0.5,
"Rest period should be at least minInterval")
assert.LessOrEqual(t, restPeriod.Seconds(), maxInterval.Seconds()+0.5,
"Rest period should be at most maxInterval")
}
}
func TestWithIntervalFromCompletion_FirstRun(t *testing.T) {
s, err := NewScheduler()
require.NoError(t, err)
defer func() { _ = s.Shutdown() }()
var mu sync.Mutex
var firstRunTime time.Time
_, err = s.NewJob(
DurationJob(5*time.Second),
NewTask(func() {
mu.Lock()
if firstRunTime.IsZero() {
firstRunTime = time.Now()
}
mu.Unlock()
}),
WithIntervalFromCompletion(),
WithStartAt(WithStartImmediately()),
)
require.NoError(t, err)
startTime := time.Now()
s.Start()
time.Sleep(1 * time.Second)
mu.Lock()
defer mu.Unlock()
require.False(t, firstRunTime.IsZero(), "Job should have run at least once")
timeSinceStart := firstRunTime.Sub(startTime)
assert.Less(t, timeSinceStart.Seconds(), 1.0,
"First run should happen quickly with WithStartImmediately")
}

View File

@ -347,6 +347,21 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
}
var scheduleFrom time.Time
// If intervalFromCompletion is enabled, calculate the next run time
// from when the job completed (lastRun) rather than when it was scheduled.
if j.intervalFromCompletion {
// Use the completion time (lastRun is set when the job completes)
scheduleFrom = j.lastRun
if scheduleFrom.IsZero() {
// For the first run, use the start time or current time
scheduleFrom = j.startTime
if scheduleFrom.IsZero() {
scheduleFrom = s.now()
}
}
} else {
// Default behavior: use the scheduled time
if len(j.nextScheduled) > 0 {
// always grab the last element in the slice as that is the furthest
// out in the future and the time from which we want to calculate
@ -358,6 +373,7 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
if scheduleFrom.IsZero() {
scheduleFrom = j.startTime
}
}
next := j.next(scheduleFrom)
if next.IsZero() {