From 345ad8ad3abd09396daf3bf1d97824fd2f3a77ae Mon Sep 17 00:00:00 2001 From: iyashjayesh Date: Tue, 11 Nov 2025 19:16:34 +0530 Subject: [PATCH] feat() updated with remaning metrics & events (JobRegistered/JobUnregistered, JobStarted/JobRunning/JobFailed/JobCompleted) --- errors.go | 3 +- executor.go | 36 +- go.mod | 2 +- gocron-monitor-test/debug_restart.go | 150 ++++++++ gocron-monitor-test/go.mod | 13 + gocron-monitor-test/go.sum | 16 + scheduler.go | 78 +++- scheduler_monitor.go | 22 +- scheduler_monitor_test.go | 553 +++++++++++++++++++++------ 9 files changed, 728 insertions(+), 145 deletions(-) create mode 100644 gocron-monitor-test/debug_restart.go create mode 100644 gocron-monitor-test/go.mod create mode 100644 gocron-monitor-test/go.sum diff --git a/errors.go b/errors.go index e087adb..63a86f3 100644 --- a/errors.go +++ b/errors.go @@ -48,6 +48,7 @@ var ( ErrWithDistributedLockerNil = errors.New("gocron: WithDistributedLocker: locker must not be nil") ErrWithDistributedJobLockerNil = errors.New("gocron: WithDistributedJobLocker: locker must not be nil") ErrWithIdentifierNil = errors.New("gocron: WithIdentifier: identifier must not be nil") + ErrSchedulerMonitorNil = errors.New("gocron: WithSchedulerMonitor: monitor must not be nil") ErrWithLimitConcurrentJobsZero = errors.New("gocron: WithLimitConcurrentJobs: limit must be greater than 0") ErrWithLocationNil = errors.New("gocron: WithLocation: location must not be nil") ErrWithLoggerNil = errors.New("gocron: WithLogger: logger must not be nil") @@ -59,7 +60,7 @@ var ( ErrStartTimeLaterThanEndTime = errors.New("gocron: WithStartDateTime: start must not be later than end") ErrStopTimeEarlierThanStartTime = errors.New("gocron: WithStopDateTime: end must not be earlier than start") ErrWithStopTimeoutZeroOrNegative = errors.New("gocron: WithStopTimeout: timeout must be greater than 0") - ErrWithSchedulerMonitorNil = errors.New("gocron: WithSchedulerMonitor: scheduler monitor cannot be nil") + ErrWithSchedulerMonitorNil = errors.New("gocron: WithSchedulerMonitor: scheduler monitor cannot be nil") ) // internal errors diff --git a/executor.go b/executor.go index c5c2a03..0830753 100644 --- a/executor.go +++ b/executor.go @@ -56,6 +56,8 @@ type executor struct { monitor Monitor // monitorStatus for reporting metrics monitorStatus MonitorStatus + // reference to parent scheduler for lifecycle notifications + scheduler *scheduler } type jobIn struct { @@ -416,18 +418,36 @@ func (e *executor) runJob(j internalJob, jIn jobIn) { _ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name) + // Notify job started + if e.scheduler != nil { + out := e.scheduler.jobFromInternalJob(j) + var jobIface Job = out + e.scheduler.notifyJobStarted(&jobIface) + } + err := callJobFuncWithParams(j.beforeJobRunsSkipIfBeforeFuncErrors, j.id, j.name) if err != nil { e.sendOutForRescheduling(&jIn) - select { case e.jobsOutCompleted <- j.id: case <-e.ctx.Done(): } - + // Notify job failed (before actual run) + if e.scheduler != nil { + out := e.scheduler.jobFromInternalJob(j) + var jobIface Job = out + e.scheduler.notifyJobFailed(&jobIface, err) + } return } + // Notify job running + if e.scheduler != nil { + out := e.scheduler.jobFromInternalJob(j) + var jobIface Job = out + e.scheduler.notifyJobRunning(&jobIface) + } + // For intervalFromCompletion, we need to reschedule AFTER the job completes, // not before. For regular jobs, we reschedule before execution (existing behavior). if !j.intervalFromCompletion { @@ -449,10 +469,22 @@ func (e *executor) runJob(j internalJob, jIn jobIn) { _ = callJobFuncWithParams(j.afterJobRunsWithError, j.id, j.name, err) e.incrementJobCounter(j, Fail) e.recordJobTimingWithStatus(startTime, time.Now(), j, Fail, err) + // Notify job failed + if e.scheduler != nil { + out := e.scheduler.jobFromInternalJob(j) + var jobIface Job = out + e.scheduler.notifyJobFailed(&jobIface, err) + } } else { _ = callJobFuncWithParams(j.afterJobRuns, j.id, j.name) e.incrementJobCounter(j, Success) e.recordJobTimingWithStatus(startTime, time.Now(), j, Success, nil) + // Notify job completed + if e.scheduler != nil { + out := e.scheduler.jobFromInternalJob(j) + var jobIface Job = out + e.scheduler.notifyJobCompleted(&jobIface) + } } // For intervalFromCompletion, reschedule AFTER the job completes diff --git a/go.mod b/go.mod index 0bd95cb..12c6440 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/go-co-op/gocron/v2 -go 1.24.0 +go 1.21.4 require ( github.com/google/uuid v1.6.0 diff --git a/gocron-monitor-test/debug_restart.go b/gocron-monitor-test/debug_restart.go new file mode 100644 index 0000000..c5711a0 --- /dev/null +++ b/gocron-monitor-test/debug_restart.go @@ -0,0 +1,150 @@ +package main + +import ( + "fmt" + "time" + + "github.com/go-co-op/gocron/v2" +) + +type DebugMonitor struct { + startCount int + stopCount int + jobRegCount int + jobUnregCount int + jobStartCount int + jobRunningCount int + jobCompletCount int + jobFailCount int +} + +func (m *DebugMonitor) SchedulerStarted() { + m.startCount++ + fmt.Printf("✓ SchedulerStarted() called (total: %d)\n", m.startCount) +} + +func (m *DebugMonitor) SchedulerShutdown() { + m.stopCount++ + fmt.Printf("✓ SchedulerShutdown() called (total: %d)\n", m.stopCount) +} + +func (m *DebugMonitor) JobRegistered(job *gocron.Job) { + m.jobRegCount++ + fmt.Printf("✓ JobRegistered() called (total: %d) - Job ID: %s\n", m.jobRegCount, (*job).ID()) +} + +func (m *DebugMonitor) JobUnregistered(job *gocron.Job) { + m.jobUnregCount++ + fmt.Printf("✓ JobUnregistered() called (total: %d) - Job ID: %s\n", m.jobUnregCount, (*job).ID()) +} + +func (m *DebugMonitor) JobStarted(job *gocron.Job) { + m.jobStartCount++ + fmt.Printf("✓ JobStarted() called (total: %d) - Job ID: %s\n", m.jobStartCount, (*job).ID()) +} + +func (m *DebugMonitor) JobRunning(job *gocron.Job) { + m.jobRunningCount++ + fmt.Printf("✓ JobRunning() called (total: %d) - Job ID: %s\n", m.jobRunningCount, (*job).ID()) +} + +func (m *DebugMonitor) JobCompleted(job *gocron.Job) { + m.jobCompletCount++ + fmt.Printf("✓ JobCompleted() called (total: %d) - Job ID: %s\n", m.jobCompletCount, (*job).ID()) +} + +func (m *DebugMonitor) JobFailed(job *gocron.Job, err error) { + m.jobFailCount++ + fmt.Printf("✓ JobFailed() called (total: %d) - Job ID: %s, Error: %v\n", m.jobFailCount, (*job).ID(), err) +} + +func main() { + // ONE monitor, multiple scheduler instances + monitor := &DebugMonitor{} + + fmt.Println("=== Cycle 1 (Scheduler Instance 1) ===") + s1, err := gocron.NewScheduler( + gocron.WithSchedulerMonitor(monitor), + ) + if err != nil { + panic(err) + } + + // Create and register some test jobs + fmt.Println("Creating jobs...") + _, err = s1.NewJob( + gocron.DurationJob(1*time.Second), + gocron.NewTask(func() { fmt.Println("Job 1 running") }), + ) + if err != nil { + panic(err) + } + + _, err = s1.NewJob( + gocron.DurationJob(2*time.Second), + gocron.NewTask(func() error { + fmt.Println("Job 2 executing and returning error") + return fmt.Errorf("simulated job failure") + }), // This job will fail with error + ) + if err != nil { + panic(err) + } + + fmt.Println("Calling Start()...") + s1.Start() + time.Sleep(3 * time.Second) // Wait for jobs to execute + + fmt.Println("Calling Shutdown()...") + err = s1.Shutdown() + if err != nil { + fmt.Printf("Shutdown error: %v\n", err) + } + + fmt.Println("\n=== Cycle 2 (Job Updates) ===") + s2, err := gocron.NewScheduler( + gocron.WithSchedulerMonitor(monitor), + ) + if err != nil { + panic(err) + } + + fmt.Println("Creating and updating jobs...") + job3, err := s2.NewJob( + gocron.DurationJob(1*time.Second), + gocron.NewTask(func() { fmt.Println("Job 3 running") }), + ) + if err != nil { + panic(err) + } + + // Update the job + _, err = s2.Update( + job3.ID(), + gocron.DurationJob(2*time.Second), + gocron.NewTask(func() { fmt.Println("Job 3 updated") }), + ) + if err != nil { + panic(err) + } + + fmt.Println("Calling Start()...") + s2.Start() + time.Sleep(3 * time.Second) + + fmt.Println("Calling Shutdown()...") + err = s2.Shutdown() + if err != nil { + fmt.Printf("Shutdown error: %v\n", err) + } + + fmt.Println("\n=== Summary ===") + fmt.Printf("Total Scheduler Starts: %d\n", monitor.startCount) + fmt.Printf("Total Scheduler Stops: %d\n", monitor.stopCount) + fmt.Printf("Total Jobs Registered: %d\n", monitor.jobRegCount) + fmt.Printf("Total Jobs Unregistered: %d\n", monitor.jobUnregCount) + fmt.Printf("Total Jobs Started: %d\n", monitor.jobStartCount) + fmt.Printf("Total Jobs Running: %d\n", monitor.jobRunningCount) + fmt.Printf("Total Jobs Completed: %d\n", monitor.jobCompletCount) + fmt.Printf("Total Jobs Failed: %d\n", monitor.jobFailCount) +} diff --git a/gocron-monitor-test/go.mod b/gocron-monitor-test/go.mod new file mode 100644 index 0000000..6c479ac --- /dev/null +++ b/gocron-monitor-test/go.mod @@ -0,0 +1,13 @@ +module test + +go 1.21.4 + +require github.com/go-co-op/gocron/v2 v2.17.0 + +require ( + github.com/google/uuid v1.6.0 // indirect + github.com/jonboulle/clockwork v0.5.0 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect +) + +replace github.com/go-co-op/gocron/v2 => ../ diff --git a/gocron-monitor-test/go.sum b/gocron-monitor-test/go.sum new file mode 100644 index 0000000..0bfec8d --- /dev/null +++ b/gocron-monitor-test/go.sum @@ -0,0 +1,16 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/jonboulle/clockwork v0.5.0 h1:Hyh9A8u51kptdkR+cqRpT1EebBwTn1oK9YfGYbdFz6I= +github.com/jonboulle/clockwork v0.5.0/go.mod h1:3mZlmanh0g2NDKO5TWZVJAfofYk64M7XN3SzBPjZF60= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/scheduler.go b/scheduler.go index 0d7ff78..93b5ab5 100644 --- a/scheduler.go +++ b/scheduler.go @@ -7,7 +7,6 @@ import ( "runtime" "slices" "strings" - "sync" "sync/atomic" "time" @@ -104,8 +103,6 @@ type scheduler struct { // scheduler monitor from which metrics can be collected schedulerMonitor SchedulerMonitor - // mutex to protect access to the scheduler monitor - schedulerMonitorMu sync.RWMutex } type newJobIn struct { @@ -154,7 +151,6 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { s := &scheduler{ shutdownCtx: schCtx, shutdownCancel: cancel, - exec: exec, jobs: make(map[uuid.UUID]internalJob), location: time.Local, logger: &noOpLogger{}, @@ -170,6 +166,8 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { runJobRequestCh: make(chan runJobRequest), allJobsOutRequest: make(chan allJobsOutRequest), } + exec.scheduler = s + s.exec = exec for _, option := range options { err := option(s) @@ -329,6 +327,11 @@ func (s *scheduler) selectRemoveJob(id uuid.UUID) { if !ok { return } + if s.schedulerMonitor != nil { + out := s.jobFromInternalJob(j) + var job Job = &out + s.notifyJobUnregistered(&job) + } j.stop() delete(s.jobs, id) } @@ -537,6 +540,11 @@ func (s *scheduler) selectRemoveJobsByTags(tags []string) { for _, j := range s.jobs { for _, tag := range tags { if slices.Contains(j.tags, tag) { + if s.schedulerMonitor != nil { + out := s.jobFromInternalJob(j) + var job Job = &out + s.notifyJobUnregistered(&job) + } j.stop() delete(s.jobs, j.id) break @@ -697,7 +705,7 @@ func (s *scheduler) verifyParameterType(taskFunc reflect.Value, tsk task) error return s.verifyNonVariadic(taskFunc, tsk, expectedParameterLength) } -var contextType = reflect.TypeFor[context.Context]() +var contextType = reflect.TypeOf((*context.Context)(nil)).Elem() func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskWrapper Task, options []JobOption) (Job, error) { j := internalJob{} @@ -795,6 +803,10 @@ func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskW } out := s.jobFromInternalJob(j) + if s.schedulerMonitor != nil { + var job Job = out + s.notifyJobRegistered(&job) + } return &out, nil } @@ -862,7 +874,7 @@ func (s *scheduler) Shutdown() error { t.Stop() // notify monitor that scheduler stopped - s.notifySchedulerStopped() + s.notifySchedulerShutdown() return err case <-t.C: return ErrStopSchedulerTimedOut @@ -1075,8 +1087,6 @@ func WithSchedulerMonitor(monitor SchedulerMonitor) SchedulerOption { if monitor == nil { return ErrSchedulerMonitorNil } - s.schedulerMonitorMu.Lock() - defer s.schedulerMonitorMu.Unlock() s.schedulerMonitor = monitor return nil } @@ -1084,18 +1094,56 @@ func WithSchedulerMonitor(monitor SchedulerMonitor) SchedulerOption { // notifySchedulerStarted notifies the monitor that scheduler has started func (s *scheduler) notifySchedulerStarted() { - s.schedulerMonitorMu.RLock() - defer s.schedulerMonitorMu.RUnlock() if s.schedulerMonitor != nil { s.schedulerMonitor.SchedulerStarted() } } -// notifySchedulerStopped notifies the monitor that scheduler has stopped -func (s *scheduler) notifySchedulerStopped() { - s.schedulerMonitorMu.RLock() - defer s.schedulerMonitorMu.RUnlock() +// notifySchedulerShutdown notifies the monitor that scheduler has stopped +func (s *scheduler) notifySchedulerShutdown() { if s.schedulerMonitor != nil { - s.schedulerMonitor.SchedulerStopped() + s.schedulerMonitor.SchedulerShutdown() + } +} + +// notifyJobRegistered notifies the monitor that a job has been registered +func (s *scheduler) notifyJobRegistered(job *Job) { + if s.schedulerMonitor != nil { + s.schedulerMonitor.JobRegistered(job) + } +} + +// notifyJobUnregistered notifies the monitor that a job has been unregistered +func (s *scheduler) notifyJobUnregistered(job *Job) { + if s.schedulerMonitor != nil { + s.schedulerMonitor.JobUnregistered(job) + } +} + +// notifyJobStarted notifies the monitor that a job has started +func (s *scheduler) notifyJobStarted(job *Job) { + if s.schedulerMonitor != nil { + s.schedulerMonitor.JobStarted(job) + } +} + +// notifyJobRunning notifies the monitor that a job is running. +func (s *scheduler) notifyJobRunning(job *Job) { + if s.schedulerMonitor != nil { + s.schedulerMonitor.JobRunning(job) + } +} + +// notifyJobCompleted notifies the monitor that a job has completed. +func (s *scheduler) notifyJobCompleted(job *Job) { + if s.schedulerMonitor != nil { + s.schedulerMonitor.JobCompleted(job) + } +} + +// notifyJobFailed notifies the monitor that a job has failed. +func (s *scheduler) notifyJobFailed(job *Job, err error) { + if s.schedulerMonitor != nil { + s.schedulerMonitor.JobFailed(job, err) } } diff --git a/scheduler_monitor.go b/scheduler_monitor.go index ab788c8..f5619e1 100644 --- a/scheduler_monitor.go +++ b/scheduler_monitor.go @@ -6,6 +6,24 @@ type SchedulerMonitor interface { // SchedulerStarted is called when Start() is invoked on the scheduler. SchedulerStarted() - // SchedulerStopped is called when Shutdown() completes successfully. - SchedulerStopped() + // SchedulerShutdown is called when Shutdown() completes successfully. + SchedulerShutdown() + + // JobRegistered is called when a job is registered with the scheduler. + JobRegistered(job *Job) + + // JobUnregistered is called when a job is unregistered from the scheduler. + JobUnregistered(job *Job) + + // JobStarted is called when a job starts running. + JobStarted(job *Job) + + // JobRunning is called when a job is running. + JobRunning(job *Job) + + // JobFailed is called when a job fails to complete successfully. + JobFailed(job *Job, err error) + + // JobCompleted is called when a job has completed running. + JobCompleted(job *Job) } diff --git a/scheduler_monitor_test.go b/scheduler_monitor_test.go index e686635..889d97a 100644 --- a/scheduler_monitor_test.go +++ b/scheduler_monitor_test.go @@ -1,7 +1,9 @@ package gocron import ( + "fmt" "sync" + "sync/atomic" "testing" "time" @@ -9,140 +11,443 @@ import ( "github.com/stretchr/testify/require" ) -// mockSchedulerMonitor is a test implementation -type mockSchedulerMonitor struct { - mu sync.Mutex - schedulerStartedCount int - schedulerStoppedCount int +// testSchedulerMonitor is a test implementation of SchedulerMonitor +// that tracks scheduler lifecycle events +type testSchedulerMonitor struct { + mu sync.RWMutex + startedCount int64 + shutdownCount int64 + jobRegCount int64 + jobUnregCount int64 + jobStartCount int64 + jobRunningCount int64 + jobCompletedCount int64 + jobFailedCount int64 + startedCalls []time.Time + shutdownCalls []time.Time + jobRegCalls []Job + jobUnregCalls []Job + jobStartCalls []Job + jobRunningCalls []Job + jobCompletedCalls []Job + jobFailedCalls struct { + jobs []Job + errs []error + } } -// SchedulerStarted increments the started count -func (m *mockSchedulerMonitor) SchedulerStarted() { - m.mu.Lock() - defer m.mu.Unlock() - m.schedulerStartedCount++ -} - -// SchedulerStopped increments the stopped count -func (m *mockSchedulerMonitor) SchedulerStopped() { - m.mu.Lock() - defer m.mu.Unlock() - m.schedulerStoppedCount++ -} - -// getSchedulerStartedCount returns the count of SchedulerStarted calls -func (m *mockSchedulerMonitor) getSchedulerStartedCount() int { - m.mu.Lock() - defer m.mu.Unlock() - return m.schedulerStartedCount -} - -// getSchedulerStoppedCount returns the count of SchedulerStopped calls -func (m *mockSchedulerMonitor) getSchedulerStoppedCount() int { - m.mu.Lock() - defer m.mu.Unlock() - return m.schedulerStoppedCount -} - -// Test that SchedulerStopped is called -func TestSchedulerMonitor_SchedulerStopped(t *testing.T) { - monitor := &mockSchedulerMonitor{} - s, err := NewScheduler(WithSchedulerMonitor(monitor)) - require.NoError(t, err) - - s.Start() - time.Sleep(10 * time.Millisecond) - - err = s.Shutdown() - require.NoError(t, err) - - // verify SchedulerStopped was called exactly once - assert.Equal(t, 1, monitor.getSchedulerStoppedCount()) -} - -// Test multiple start/stop cycles -func TestSchedulerMonitor_MultipleStartStopCycles(t *testing.T) { - monitor := &mockSchedulerMonitor{} - s, err := NewScheduler(WithSchedulerMonitor(monitor)) - require.NoError(t, err) - - // first cycle - s.Start() - time.Sleep(10 * time.Millisecond) - err = s.Shutdown() - require.NoError(t, err) - - // second cycle - s.Start() - time.Sleep(10 * time.Millisecond) - err = s.Shutdown() - require.NoError(t, err) - - // verifying counts - assert.Equal(t, 2, monitor.getSchedulerStartedCount()) - assert.Equal(t, 2, monitor.getSchedulerStoppedCount()) -} - -// Test that start and stop are called in correct order -func TestSchedulerMonitor_StartStopOrder(t *testing.T) { - events := []string{} - var mu sync.Mutex - - monitor := &orderTrackingMonitor{ - onStart: func() { - mu.Lock() - events = append(events, "started") - mu.Unlock() - }, - onStop: func() { - mu.Lock() - events = append(events, "stopped") - mu.Unlock() +func newTestSchedulerMonitor() *testSchedulerMonitor { + return &testSchedulerMonitor{ + startedCalls: make([]time.Time, 0), + shutdownCalls: make([]time.Time, 0), + jobRegCalls: make([]Job, 0), + jobUnregCalls: make([]Job, 0), + jobStartCalls: make([]Job, 0), + jobRunningCalls: make([]Job, 0), + jobCompletedCalls: make([]Job, 0), + jobFailedCalls: struct { + jobs []Job + errs []error + }{ + jobs: make([]Job, 0), + errs: make([]error, 0), }, } +} - s, err := NewScheduler(WithSchedulerMonitor(monitor)) +func (t *testSchedulerMonitor) SchedulerStarted() { + t.mu.Lock() + defer t.mu.Unlock() + atomic.AddInt64(&t.startedCount, 1) + t.startedCalls = append(t.startedCalls, time.Now()) +} + +func (t *testSchedulerMonitor) SchedulerShutdown() { + t.mu.Lock() + defer t.mu.Unlock() + atomic.AddInt64(&t.shutdownCount, 1) + t.shutdownCalls = append(t.shutdownCalls, time.Now()) +} + +func (t *testSchedulerMonitor) getStartedCount() int64 { + return atomic.LoadInt64(&t.startedCount) +} + +func (t *testSchedulerMonitor) getShutdownCount() int64 { + return atomic.LoadInt64(&t.shutdownCount) +} + +func (t *testSchedulerMonitor) getStartedCalls() []time.Time { + t.mu.RLock() + defer t.mu.RUnlock() + return append([]time.Time{}, t.startedCalls...) +} + +func (t *testSchedulerMonitor) getShutdownCalls() []time.Time { + t.mu.RLock() + defer t.mu.RUnlock() + return append([]time.Time{}, t.shutdownCalls...) +} + +func (t *testSchedulerMonitor) JobRegistered(job *Job) { + t.mu.Lock() + defer t.mu.Unlock() + atomic.AddInt64(&t.jobRegCount, 1) + t.jobRegCalls = append(t.jobRegCalls, *job) +} + +func (t *testSchedulerMonitor) JobUnregistered(job *Job) { + t.mu.Lock() + defer t.mu.Unlock() + atomic.AddInt64(&t.jobUnregCount, 1) + t.jobUnregCalls = append(t.jobUnregCalls, *job) +} + +func (t *testSchedulerMonitor) JobStarted(job *Job) { + t.mu.Lock() + defer t.mu.Unlock() + atomic.AddInt64(&t.jobStartCount, 1) + t.jobStartCalls = append(t.jobStartCalls, *job) +} + +func (t *testSchedulerMonitor) JobRunning(job *Job) { + t.mu.Lock() + defer t.mu.Unlock() + atomic.AddInt64(&t.jobRunningCount, 1) + t.jobRunningCalls = append(t.jobRunningCalls, *job) +} + +func (t *testSchedulerMonitor) JobCompleted(job *Job) { + t.mu.Lock() + defer t.mu.Unlock() + atomic.AddInt64(&t.jobCompletedCount, 1) + t.jobCompletedCalls = append(t.jobCompletedCalls, *job) +} + +func (t *testSchedulerMonitor) JobFailed(job *Job, err error) { + t.mu.Lock() + defer t.mu.Unlock() + atomic.AddInt64(&t.jobFailedCount, 1) + t.jobFailedCalls.jobs = append(t.jobFailedCalls.jobs, *job) + t.jobFailedCalls.errs = append(t.jobFailedCalls.errs, err) +} + +func (t *testSchedulerMonitor) getJobRegCount() int64 { + return atomic.LoadInt64(&t.jobRegCount) +} + +// func (t *testSchedulerMonitor) getJobUnregCount() int64 { +// return atomic.LoadInt64(&t.jobUnregCount) +// } + +func (t *testSchedulerMonitor) getJobStartCount() int64 { + return atomic.LoadInt64(&t.jobStartCount) +} + +func (t *testSchedulerMonitor) getJobRunningCount() int64 { + return atomic.LoadInt64(&t.jobRunningCount) +} + +func (t *testSchedulerMonitor) getJobCompletedCount() int64 { + return atomic.LoadInt64(&t.jobCompletedCount) +} + +func (t *testSchedulerMonitor) getJobFailedCount() int64 { + return atomic.LoadInt64(&t.jobFailedCount) +} + +// func (t *testSchedulerMonitor) getJobRegCalls() []Job { +// t.mu.RLock() +// defer t.mu.RUnlock() +// return append([]Job{}, t.jobRegCalls...) +// } + +// func (t *testSchedulerMonitor) getJobUnregCalls() []Job { +// t.mu.RLock() +// defer t.mu.RUnlock() +// return append([]Job{}, t.jobUnregCalls...) +// } + +// func (t *testSchedulerMonitor) getJobStartCalls() []Job { +// t.mu.RLock() +// defer t.mu.RUnlock() +// return append([]Job{}, t.jobStartCalls...) +// } + +// func (t *testSchedulerMonitor) getJobRunningCalls() []Job { +// t.mu.RLock() +// defer t.mu.RUnlock() +// return append([]Job{}, t.jobRunningCalls...) +// } + +// func (t *testSchedulerMonitor) getJobCompletedCalls() []Job { +// t.mu.RLock() +// defer t.mu.RUnlock() +// return append([]Job{}, t.jobCompletedCalls...) +// } + +func (t *testSchedulerMonitor) getJobFailedCalls() ([]Job, []error) { + t.mu.RLock() + defer t.mu.RUnlock() + jobs := append([]Job{}, t.jobFailedCalls.jobs...) + errs := append([]error{}, t.jobFailedCalls.errs...) + return jobs, errs +} + +func TestSchedulerMonitor_Basic(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + + monitor := newTestSchedulerMonitor() + s := newTestScheduler(t, WithSchedulerMonitor(monitor)) + + // Before starting, monitor should not have been called + assert.Equal(t, int64(0), monitor.getStartedCount()) + assert.Equal(t, int64(0), monitor.getShutdownCount()) + + // Add a simple job + _, err := s.NewJob( + DurationJob(time.Second), + NewTask(func() {}), + ) + require.NoError(t, err) + + // Start the scheduler + s.Start() + + // Wait a bit for the start to complete + time.Sleep(50 * time.Millisecond) + + // SchedulerStarted should have been called once + assert.Equal(t, int64(1), monitor.getStartedCount()) + assert.Equal(t, int64(0), monitor.getShutdownCount()) + + // Shutdown the scheduler + err = s.Shutdown() + require.NoError(t, err) + + // SchedulerShutdown should have been called once + assert.Equal(t, int64(1), monitor.getStartedCount()) + assert.Equal(t, int64(1), monitor.getShutdownCount()) + + // Verify the order of calls + startedCalls := monitor.getStartedCalls() + shutdownCalls := monitor.getShutdownCalls() + require.Len(t, startedCalls, 1) + require.Len(t, shutdownCalls, 1) + assert.True(t, startedCalls[0].Before(shutdownCalls[0]), + "SchedulerStarted should be called before SchedulerShutdown") +} + +func TestSchedulerMonitor_MultipleStartStop(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + + monitor := newTestSchedulerMonitor() + s := newTestScheduler(t, WithSchedulerMonitor(monitor)) + + _, err := s.NewJob( + DurationJob(time.Second), + NewTask(func() {}), + ) + require.NoError(t, err) + + // Start and stop multiple times + s.Start() + time.Sleep(50 * time.Millisecond) + assert.Equal(t, int64(1), monitor.getStartedCount()) + + err = s.StopJobs() + require.NoError(t, err) + // StopJobs shouldn't call SchedulerShutdown + assert.Equal(t, int64(0), monitor.getShutdownCount()) + + // Start again + s.Start() + time.Sleep(50 * time.Millisecond) + assert.Equal(t, int64(2), monitor.getStartedCount()) + + // Final shutdown + err = s.Shutdown() + require.NoError(t, err) + assert.Equal(t, int64(1), monitor.getShutdownCount()) +} + +func TestSchedulerMonitor_WithoutMonitor(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + + // Create scheduler without monitor - should not panic + s := newTestScheduler(t) + + _, err := s.NewJob( + DurationJob(time.Second), + NewTask(func() {}), + ) require.NoError(t, err) s.Start() - time.Sleep(10 * time.Millisecond) - require.NoError(t, s.Shutdown()) - time.Sleep(10 * time.Millisecond) + time.Sleep(50 * time.Millisecond) - mu.Lock() - defer mu.Unlock() - assert.Equal(t, []string{"started", "stopped"}, events) -} - -// Helper monitor for tracking order -type orderTrackingMonitor struct { - onStart func() - onStop func() -} - -func (m *orderTrackingMonitor) SchedulerStarted() { - if m.onStart != nil { - m.onStart() - } -} - -func (m *orderTrackingMonitor) SchedulerStopped() { - if m.onStop != nil { - m.onStop() - } -} - -// Test that stopped is not called if scheduler wasn't started -func TestSchedulerMonitor_StoppedNotCalledIfNotStarted(t *testing.T) { - monitor := &mockSchedulerMonitor{} - s, err := NewScheduler(WithSchedulerMonitor(monitor)) - require.NoError(t, err) err = s.Shutdown() - if err != nil { - t.Logf("Shutdown returned error as expected: %v", err) + require.NoError(t, err) +} + +func TestSchedulerMonitor_NilMonitor(t *testing.T) { + // Attempting to create a scheduler with nil monitor should error + _, err := NewScheduler(WithSchedulerMonitor(nil)) + assert.Error(t, err) + assert.Equal(t, ErrSchedulerMonitorNil, err) +} + +func TestSchedulerMonitor_ConcurrentAccess(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + + monitor := newTestSchedulerMonitor() + s := newTestScheduler(t, WithSchedulerMonitor(monitor)) + + // Add multiple jobs + for i := 0; i < 10; i++ { + _, err := s.NewJob( + DurationJob(100*time.Millisecond), + NewTask(func() {}), + ) + require.NoError(t, err) } - // Depending on implementation, this might return an error - // But stopped should not be called - assert.Equal(t, 0, monitor.getSchedulerStoppedCount()) + // Start scheduler once (normal use case) + s.Start() + time.Sleep(150 * time.Millisecond) + + // Verify monitor was called + assert.Equal(t, int64(1), monitor.getStartedCount()) + + err := s.Shutdown() + require.NoError(t, err) + + // Monitor should be called for shutdown + assert.Equal(t, int64(1), monitor.getShutdownCount()) +} + +func TestSchedulerMonitor_StartWithoutJobs(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + + monitor := newTestSchedulerMonitor() + s := newTestScheduler(t, WithSchedulerMonitor(monitor)) + + // Start scheduler without any jobs + s.Start() + time.Sleep(50 * time.Millisecond) + + // Monitor should still be called + assert.Equal(t, int64(1), monitor.getStartedCount()) + + err := s.Shutdown() + require.NoError(t, err) + assert.Equal(t, int64(1), monitor.getShutdownCount()) +} + +func TestSchedulerMonitor_ShutdownWithoutStart(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + + monitor := newTestSchedulerMonitor() + s := newTestScheduler(t, WithSchedulerMonitor(monitor)) + + _, err := s.NewJob( + DurationJob(time.Second), + NewTask(func() {}), + ) + require.NoError(t, err) + + // Shutdown without starting + err = s.Shutdown() + require.NoError(t, err) + + // SchedulerStarted should not be called + assert.Equal(t, int64(0), monitor.getStartedCount()) + // SchedulerShutdown should not be called if scheduler was never started + assert.Equal(t, int64(0), monitor.getShutdownCount()) +} + +func TestSchedulerMonitor_ThreadSafety(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + + monitor := newTestSchedulerMonitor() + + // Simulate concurrent calls to the monitor from multiple goroutines + var wg sync.WaitGroup + iterations := 100 + + for i := 0; i < iterations; i++ { + wg.Add(2) + go func() { + defer wg.Done() + monitor.SchedulerStarted() + }() + go func() { + defer wg.Done() + monitor.SchedulerShutdown() + }() + } + + wg.Wait() + + // Verify all calls were recorded + assert.Equal(t, int64(iterations), monitor.getStartedCount()) + assert.Equal(t, int64(iterations), monitor.getShutdownCount()) + assert.Len(t, monitor.getStartedCalls(), iterations) + assert.Len(t, monitor.getShutdownCalls(), iterations) +} + +func TestSchedulerMonitor_IntegrationWithJobs(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + + monitor := newTestSchedulerMonitor() + s := newTestScheduler(t, WithSchedulerMonitor(monitor)) + + // Test successful job + jobRunCount := atomic.Int32{} + _, err := s.NewJob( + DurationJob(50*time.Millisecond), + NewTask(func() { + jobRunCount.Add(1) + }), + WithStartAt(WithStartImmediately()), + ) + require.NoError(t, err) + + // Test failing job + _, err = s.NewJob( + DurationJob(50*time.Millisecond), + NewTask(func() error { + return fmt.Errorf("test error") + }), + WithStartAt(WithStartImmediately()), + ) + require.NoError(t, err) + + // Start scheduler + s.Start() + time.Sleep(150 * time.Millisecond) // Wait for jobs to execute + + // Verify scheduler lifecycle events + assert.Equal(t, int64(1), monitor.getStartedCount()) + assert.GreaterOrEqual(t, jobRunCount.Load(), int32(1)) + + // Verify job registration + assert.Equal(t, int64(2), monitor.getJobRegCount(), "Should have registered 2 jobs") + + // Verify job execution events + assert.GreaterOrEqual(t, monitor.getJobStartCount(), int64(1), "Jobs should have started") + assert.GreaterOrEqual(t, monitor.getJobRunningCount(), int64(1), "Jobs should be running") + assert.GreaterOrEqual(t, monitor.getJobCompletedCount(), int64(1), "Successful job should complete") + assert.GreaterOrEqual(t, monitor.getJobFailedCount(), int64(1), "Failing job should fail") + + // Get failed job details + failedJobs, errors := monitor.getJobFailedCalls() + assert.NotEmpty(t, failedJobs, "Should have recorded failed jobs") + assert.NotEmpty(t, errors, "Should have recorded job errors") + assert.Contains(t, errors[0].Error(), "test error", "Should record the correct error") + + // Shutdown + err = s.Shutdown() + require.NoError(t, err) + assert.Equal(t, int64(1), monitor.getShutdownCount()) }