Added JobsWaitingInQueue in Scheduler (#721)

* Added JobsWaitingInQueue in Scheduler

* Fixed tests
This commit is contained in:
Giridharan Ramasamy 2024-05-01 01:02:20 +05:30 committed by GitHub
parent dbec7a9d47
commit 5f14dac979
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 79 additions and 16 deletions

View File

@ -44,6 +44,9 @@ type Scheduler interface {
// Update replaces the existing Job's JobDefinition with the provided
// JobDefinition. The Job's Job.ID() remains the same.
Update(uuid.UUID, JobDefinition, Task, ...JobOption) (Job, error)
// JobsWaitingInQueue number of jobs waiting in Queue in case of LimitModeWait
// In case of LimitModeReschedule or no limit it will be always zero
JobsWaitingInQueue() int
}
// -----------------------------------------------
@ -678,6 +681,13 @@ func (s *scheduler) Update(id uuid.UUID, jobDefinition JobDefinition, task Task,
return s.addOrUpdateJob(id, jobDefinition, task, options)
}
func (s *scheduler) JobsWaitingInQueue() int {
if s.exec.limitMode != nil && s.exec.limitMode.mode == LimitModeWait {
return len(s.exec.limitMode.in)
}
return 0
}
// -----------------------------------------------
// -----------------------------------------------
// ------------- Scheduler Options ---------------

View File

@ -308,7 +308,7 @@ func TestScheduler_StopTimeout(t *testing.T) {
}
func TestScheduler_Shutdown(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)
t.Run("start, stop, start, shutdown", func(t *testing.T) {
s := newTestScheduler(t,
@ -365,7 +365,7 @@ func TestScheduler_Shutdown(t *testing.T) {
}
func TestScheduler_NewJob(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)
tests := []struct {
name string
jd JobDefinition
@ -462,7 +462,7 @@ func TestScheduler_NewJob(t *testing.T) {
}
func TestScheduler_NewJobErrors(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)
tests := []struct {
name string
jd JobDefinition
@ -762,7 +762,7 @@ func TestScheduler_NewJobErrors(t *testing.T) {
}
func TestScheduler_NewJobTask(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)
testFuncPtr := func() {}
testFuncWithParams := func(one, two string) {}
@ -867,7 +867,7 @@ func TestScheduler_NewJobTask(t *testing.T) {
}
func TestScheduler_WithOptionsErrors(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)
tests := []struct {
name string
opt SchedulerOption
@ -929,7 +929,7 @@ func TestScheduler_WithOptionsErrors(t *testing.T) {
}
func TestScheduler_Singleton(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)
tests := []struct {
name string
duration time.Duration
@ -992,7 +992,7 @@ func TestScheduler_Singleton(t *testing.T) {
}
func TestScheduler_LimitMode(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)
tests := []struct {
name string
numJobs int
@ -1064,7 +1064,7 @@ func TestScheduler_LimitMode(t *testing.T) {
}
func TestScheduler_LimitModeAndSingleton(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)
tests := []struct {
name string
numJobs int
@ -1205,7 +1205,7 @@ func TestScheduler_WithDistributed(t *testing.T) {
notLocked := make(chan struct{}, 10)
notLeader := make(chan struct{}, 10)
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)
tests := []struct {
name string
count int
@ -1354,7 +1354,7 @@ func TestScheduler_WithDistributed(t *testing.T) {
}
func TestScheduler_RemoveJob(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)
tests := []struct {
name string
addJob bool
@ -1392,8 +1392,61 @@ func TestScheduler_RemoveJob(t *testing.T) {
}
}
func TestScheduler_JobsWaitingInQueue(t *testing.T) {
defer goleak.VerifyNone(t)
tests := []struct {
name string
limit uint
mode LimitMode
startAt func() OneTimeJobStartAtOption
expectedInQueue int
}{
{
"with mode wait limit 1",
1,
LimitModeWait,
func() OneTimeJobStartAtOption {
return OneTimeJobStartDateTime(time.Now().Add(10 * time.Millisecond))
},
4,
},
{
"with mode wait limit 10",
10,
LimitModeWait,
func() OneTimeJobStartAtOption {
return OneTimeJobStartDateTime(time.Now().Add(10 * time.Millisecond))
},
0,
},
{
"with mode Reschedule",
1,
LimitModeReschedule,
func() OneTimeJobStartAtOption {
return OneTimeJobStartDateTime(time.Now().Add(10 * time.Millisecond))
},
0,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := newTestScheduler(t, WithLimitConcurrentJobs(tt.limit, tt.mode))
for i := 0; i <= 4; i++ {
_, err := s.NewJob(OneTimeJob(tt.startAt()), NewTask(func() { time.Sleep(500 * time.Millisecond) }))
require.NoError(t, err)
}
s.Start()
time.Sleep(20 * time.Millisecond)
assert.Equal(t, tt.expectedInQueue, s.JobsWaitingInQueue())
require.NoError(t, s.Shutdown())
})
}
}
func TestScheduler_RemoveLotsOfJobs(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)
tests := []struct {
name string
numJobs int
@ -1435,7 +1488,7 @@ func TestScheduler_RemoveLotsOfJobs(t *testing.T) {
}
func TestScheduler_RemoveJob_RemoveSelf(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)
s := newTestScheduler(t)
s.Start()
@ -1458,7 +1511,7 @@ func TestScheduler_RemoveJob_RemoveSelf(t *testing.T) {
}
func TestScheduler_WithEventListeners(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)
listenerRunCh := make(chan error, 1)
testErr := fmt.Errorf("test error")
@ -1733,7 +1786,7 @@ func TestScheduler_RunJobNow(t *testing.T) {
}
func TestScheduler_LastRunSingleton(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)
tests := []struct {
name string
@ -1846,7 +1899,7 @@ func TestScheduler_OneTimeJob(t *testing.T) {
}
func TestScheduler_WithLimitedRuns(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)
tests := []struct {
name string
@ -1984,7 +2037,7 @@ func (t *testMonitor) RecordJobTiming(startTime, endTime time.Time, _ uuid.UUID,
}
func TestScheduler_WithMonitor(t *testing.T) {
goleak.VerifyNone(t)
defer goleak.VerifyNone(t)
tests := []struct {
name string
jd JobDefinition