From 5f14dac97965473003609e9dafaab2bdb35b7d66 Mon Sep 17 00:00:00 2001 From: Giridharan Ramasamy <46867712+giri-vsr@users.noreply.github.com> Date: Wed, 1 May 2024 01:02:20 +0530 Subject: [PATCH] Added JobsWaitingInQueue in Scheduler (#721) * Added JobsWaitingInQueue in Scheduler * Fixed tests --- scheduler.go | 10 ++++++ scheduler_test.go | 85 ++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 79 insertions(+), 16 deletions(-) diff --git a/scheduler.go b/scheduler.go index 7980349..89afc97 100644 --- a/scheduler.go +++ b/scheduler.go @@ -44,6 +44,9 @@ type Scheduler interface { // Update replaces the existing Job's JobDefinition with the provided // JobDefinition. The Job's Job.ID() remains the same. Update(uuid.UUID, JobDefinition, Task, ...JobOption) (Job, error) + // JobsWaitingInQueue number of jobs waiting in Queue in case of LimitModeWait + // In case of LimitModeReschedule or no limit it will be always zero + JobsWaitingInQueue() int } // ----------------------------------------------- @@ -678,6 +681,13 @@ func (s *scheduler) Update(id uuid.UUID, jobDefinition JobDefinition, task Task, return s.addOrUpdateJob(id, jobDefinition, task, options) } +func (s *scheduler) JobsWaitingInQueue() int { + if s.exec.limitMode != nil && s.exec.limitMode.mode == LimitModeWait { + return len(s.exec.limitMode.in) + } + return 0 +} + // ----------------------------------------------- // ----------------------------------------------- // ------------- Scheduler Options --------------- diff --git a/scheduler_test.go b/scheduler_test.go index f26a7c6..a4724b7 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -308,7 +308,7 @@ func TestScheduler_StopTimeout(t *testing.T) { } func TestScheduler_Shutdown(t *testing.T) { - goleak.VerifyNone(t) + defer goleak.VerifyNone(t) t.Run("start, stop, start, shutdown", func(t *testing.T) { s := newTestScheduler(t, @@ -365,7 +365,7 @@ func TestScheduler_Shutdown(t *testing.T) { } func TestScheduler_NewJob(t *testing.T) { - goleak.VerifyNone(t) + defer goleak.VerifyNone(t) tests := []struct { name string jd JobDefinition @@ -462,7 +462,7 @@ func TestScheduler_NewJob(t *testing.T) { } func TestScheduler_NewJobErrors(t *testing.T) { - goleak.VerifyNone(t) + defer goleak.VerifyNone(t) tests := []struct { name string jd JobDefinition @@ -762,7 +762,7 @@ func TestScheduler_NewJobErrors(t *testing.T) { } func TestScheduler_NewJobTask(t *testing.T) { - goleak.VerifyNone(t) + defer goleak.VerifyNone(t) testFuncPtr := func() {} testFuncWithParams := func(one, two string) {} @@ -867,7 +867,7 @@ func TestScheduler_NewJobTask(t *testing.T) { } func TestScheduler_WithOptionsErrors(t *testing.T) { - goleak.VerifyNone(t) + defer goleak.VerifyNone(t) tests := []struct { name string opt SchedulerOption @@ -929,7 +929,7 @@ func TestScheduler_WithOptionsErrors(t *testing.T) { } func TestScheduler_Singleton(t *testing.T) { - goleak.VerifyNone(t) + defer goleak.VerifyNone(t) tests := []struct { name string duration time.Duration @@ -992,7 +992,7 @@ func TestScheduler_Singleton(t *testing.T) { } func TestScheduler_LimitMode(t *testing.T) { - goleak.VerifyNone(t) + defer goleak.VerifyNone(t) tests := []struct { name string numJobs int @@ -1064,7 +1064,7 @@ func TestScheduler_LimitMode(t *testing.T) { } func TestScheduler_LimitModeAndSingleton(t *testing.T) { - goleak.VerifyNone(t) + defer goleak.VerifyNone(t) tests := []struct { name string numJobs int @@ -1205,7 +1205,7 @@ func TestScheduler_WithDistributed(t *testing.T) { notLocked := make(chan struct{}, 10) notLeader := make(chan struct{}, 10) - goleak.VerifyNone(t) + defer goleak.VerifyNone(t) tests := []struct { name string count int @@ -1354,7 +1354,7 @@ func TestScheduler_WithDistributed(t *testing.T) { } func TestScheduler_RemoveJob(t *testing.T) { - goleak.VerifyNone(t) + defer goleak.VerifyNone(t) tests := []struct { name string addJob bool @@ -1392,8 +1392,61 @@ func TestScheduler_RemoveJob(t *testing.T) { } } +func TestScheduler_JobsWaitingInQueue(t *testing.T) { + defer goleak.VerifyNone(t) + tests := []struct { + name string + limit uint + mode LimitMode + startAt func() OneTimeJobStartAtOption + expectedInQueue int + }{ + { + "with mode wait limit 1", + 1, + LimitModeWait, + func() OneTimeJobStartAtOption { + return OneTimeJobStartDateTime(time.Now().Add(10 * time.Millisecond)) + }, + 4, + }, + { + "with mode wait limit 10", + 10, + LimitModeWait, + func() OneTimeJobStartAtOption { + return OneTimeJobStartDateTime(time.Now().Add(10 * time.Millisecond)) + }, + 0, + }, + { + "with mode Reschedule", + 1, + LimitModeReschedule, + func() OneTimeJobStartAtOption { + return OneTimeJobStartDateTime(time.Now().Add(10 * time.Millisecond)) + }, + 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := newTestScheduler(t, WithLimitConcurrentJobs(tt.limit, tt.mode)) + for i := 0; i <= 4; i++ { + _, err := s.NewJob(OneTimeJob(tt.startAt()), NewTask(func() { time.Sleep(500 * time.Millisecond) })) + require.NoError(t, err) + } + s.Start() + time.Sleep(20 * time.Millisecond) + assert.Equal(t, tt.expectedInQueue, s.JobsWaitingInQueue()) + require.NoError(t, s.Shutdown()) + }) + } +} + func TestScheduler_RemoveLotsOfJobs(t *testing.T) { - goleak.VerifyNone(t) + defer goleak.VerifyNone(t) tests := []struct { name string numJobs int @@ -1435,7 +1488,7 @@ func TestScheduler_RemoveLotsOfJobs(t *testing.T) { } func TestScheduler_RemoveJob_RemoveSelf(t *testing.T) { - goleak.VerifyNone(t) + defer goleak.VerifyNone(t) s := newTestScheduler(t) s.Start() @@ -1458,7 +1511,7 @@ func TestScheduler_RemoveJob_RemoveSelf(t *testing.T) { } func TestScheduler_WithEventListeners(t *testing.T) { - goleak.VerifyNone(t) + defer goleak.VerifyNone(t) listenerRunCh := make(chan error, 1) testErr := fmt.Errorf("test error") @@ -1733,7 +1786,7 @@ func TestScheduler_RunJobNow(t *testing.T) { } func TestScheduler_LastRunSingleton(t *testing.T) { - goleak.VerifyNone(t) + defer goleak.VerifyNone(t) tests := []struct { name string @@ -1846,7 +1899,7 @@ func TestScheduler_OneTimeJob(t *testing.T) { } func TestScheduler_WithLimitedRuns(t *testing.T) { - goleak.VerifyNone(t) + defer goleak.VerifyNone(t) tests := []struct { name string @@ -1984,7 +2037,7 @@ func (t *testMonitor) RecordJobTiming(startTime, endTime time.Time, _ uuid.UUID, } func TestScheduler_WithMonitor(t *testing.T) { - goleak.VerifyNone(t) + defer goleak.VerifyNone(t) tests := []struct { name string jd JobDefinition