update the functionality for singleton mode jobs

This commit is contained in:
iyashjayesh 2025-10-21 20:10:08 +05:30
parent e94379147a
commit 1ee2481e48
5 changed files with 45 additions and 31 deletions

View File

@ -862,6 +862,31 @@ func ExampleWithIdentifier() {
// 87b95dfc-3e71-11ef-9454-0242ac120002 // 87b95dfc-3e71-11ef-9454-0242ac120002
} }
func ExampleWithIntervalFromCompletion() {
s, _ := gocron.NewScheduler()
defer func() { _ = s.Shutdown() }()
_, _ = s.NewJob(
gocron.DurationJob(
5*time.Minute,
),
gocron.NewTask(
func() {
time.Sleep(30 * time.Second)
},
),
gocron.WithIntervalFromCompletion(),
)
// Without WithIntervalFromCompletion (default behavior):
// If the job starts at 00:00 and completes at 00:00:30,
// the next job starts at 00:05:00 (only 4m30s rest).
// With WithIntervalFromCompletion:
// If the job starts at 00:00 and completes at 00:00:30,
// the next job starts at 00:05:30 (full 5m rest).
}
func ExampleWithLimitConcurrentJobs() { func ExampleWithLimitConcurrentJobs() {
_, _ = gocron.NewScheduler( _, _ = gocron.NewScheduler(
gocron.WithLimitConcurrentJobs( gocron.WithLimitConcurrentJobs(
@ -1016,31 +1041,6 @@ func ExampleWithSingletonMode() {
) )
} }
func ExampleWithIntervalFromCompletion() {
s, _ := gocron.NewScheduler()
defer func() { _ = s.Shutdown() }()
_, _ = s.NewJob(
gocron.DurationJob(
5*time.Minute,
),
gocron.NewTask(
func() {
time.Sleep(30 * time.Second)
},
),
gocron.WithIntervalFromCompletion(),
)
// Without WithIntervalFromCompletion (default behavior):
// If the job starts at 00:00 and completes at 00:00:30,
// the next job starts at 00:05:00 (only 4m30s rest).
// With WithIntervalFromCompletion:
// If the job starts at 00:00 and completes at 00:00:30,
// the next job starts at 00:05:30 (full 5m rest).
}
func ExampleWithStartAt() { func ExampleWithStartAt() {
s, _ := gocron.NewScheduler() s, _ := gocron.NewScheduler()
defer func() { _ = s.Shutdown() }() defer func() { _ = s.Shutdown() }()

View File

@ -200,7 +200,10 @@ func (e *executor) start() {
select { select {
case runner.rescheduleLimiter <- struct{}{}: case runner.rescheduleLimiter <- struct{}{}:
runner.in <- jIn runner.in <- jIn
e.sendOutForRescheduling(&jIn) // For intervalFromCompletion, skip rescheduling here - it will happen after job completes
if !j.intervalFromCompletion {
e.sendOutForRescheduling(&jIn)
}
default: default:
// runner is busy, reschedule the work for later // runner is busy, reschedule the work for later
// which means we just skip it here and do nothing // which means we just skip it here and do nothing
@ -210,7 +213,10 @@ func (e *executor) start() {
} else { } else {
// wait mode, fill up that queue (buffered channel, so it's ok) // wait mode, fill up that queue (buffered channel, so it's ok)
runner.in <- jIn runner.in <- jIn
e.sendOutForRescheduling(&jIn) // For intervalFromCompletion, skip rescheduling here - it will happen after job completes
if !j.intervalFromCompletion {
e.sendOutForRescheduling(&jIn)
}
} }
} else { } else {
select { select {
@ -345,7 +351,10 @@ func (e *executor) singletonModeRunner(name string, in chan jobIn, wg *waitGroup
// need to set shouldSendOut = false here, as there is a duplicative call to sendOutForRescheduling // need to set shouldSendOut = false here, as there is a duplicative call to sendOutForRescheduling
// inside the runJob function that needs to be skipped. sendOutForRescheduling is previously called // inside the runJob function that needs to be skipped. sendOutForRescheduling is previously called
// when the job is sent to the singleton mode runner. // when the job is sent to the singleton mode runner.
jIn.shouldSendOut = false // Exception: for intervalFromCompletion, we want rescheduling to happen AFTER job completion
if !j.intervalFromCompletion {
jIn.shouldSendOut = false
}
e.runJob(*j, jIn) e.runJob(*j, jIn)
} }

2
job.go
View File

@ -701,7 +701,7 @@ func WithSingletonMode(mode LimitMode) JobOption {
// Note: This option only makes sense with interval-based jobs (DurationJob, DurationRandomJob). // Note: This option only makes sense with interval-based jobs (DurationJob, DurationRandomJob).
// For time-based jobs (CronJob, DailyJob, etc.) that run at specific times, this option // For time-based jobs (CronJob, DailyJob, etc.) that run at specific times, this option
// will be ignored as those jobs are inherently scheduled at fixed times. // will be ignored as those jobs are inherently scheduled at fixed times.
// //
// Example: // Example:
// //
// s.NewJob( // s.NewJob(

View File

@ -964,13 +964,18 @@ func TestWithIntervalFromCompletion_LongRunningJob(t *testing.T) {
// Execution 1: 0s-6s, wait 3s → next at 9s // Execution 1: 0s-6s, wait 3s → next at 9s
// Execution 2: 9s-15s, wait 3s → next at 18s // Execution 2: 9s-15s, wait 3s → next at 18s
// Need to wait at least 16 seconds for 2 executions + buffer // Need to wait at least 16 seconds for 2 executions + buffer
time.Sleep(20 * time.Second) time.Sleep(22 * time.Second)
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
require.GreaterOrEqual(t, len(executions), 2, "Expected at least 2 executions") require.GreaterOrEqual(t, len(executions), 2, "Expected at least 2 executions")
if len(executions) < 2 {
t.Logf("Only got %d execution(s), skipping gap assertion", len(executions))
return
}
prev := executions[0] prev := executions[0]
curr := executions[1] curr := executions[1]

View File

@ -347,7 +347,7 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
} }
var scheduleFrom time.Time var scheduleFrom time.Time
// If intervalFromCompletion is enabled, calculate the next run time // If intervalFromCompletion is enabled, calculate the next run time
// from when the job completed (lastRun) rather than when it was scheduled. // from when the job completed (lastRun) rather than when it was scheduled.
if j.intervalFromCompletion { if j.intervalFromCompletion {