diff --git a/README.md b/README.md index 90b5a06..3b9b7bd 100644 --- a/README.md +++ b/README.md @@ -169,12 +169,52 @@ The Logger interface can be implemented with your desired logging library. The provided NewLogger uses the standard library's log package. ### Metrics -Metrics may be collected from the execution of each job. +Metrics may be collected from the execution of each job and scheduler lifecycle events. - [**Monitor**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#Monitor): - [**MonitorStatus**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#MonitorStatus) (includes status and error (if any) of the Job) A monitor can be used to collect metrics for each job from a scheduler. - Implementations: [go-co-op monitors](https://github.com/go-co-op?q=-monitor&type=all&language=&sort=) (don't see what you need? request on slack to get a repo created to contribute it!) +- [**SchedulerMonitor**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#SchedulerMonitor): +A scheduler monitor provides comprehensive observability into scheduler and job lifecycle events. 
+ + **Available Metrics:** + - **Scheduler Lifecycle**: `SchedulerStarted`, `SchedulerStopped`, `SchedulerShutdown` + - **Job Management**: `JobRegistered`, `JobUnregistered` - track jobs added/removed from scheduler + - **Job Execution**: `JobStarted`, `JobRunning`, `JobCompleted`, `JobFailed` - monitor job execution flow + - **Performance**: `JobExecutionTime`, `JobSchedulingDelay` - measure job duration and scheduling lag + - **Concurrency**: `ConcurrencyLimitReached` - detect when singleton or limit mode constraints are hit + + **Derived Metrics** (calculable from events): + - Error rate: `JobFailed / (JobCompleted + JobFailed)` + - Average execution time: from `JobExecutionTime` events + - Active jobs: `JobRegistered - JobUnregistered` + - In-flight jobs (currently executing): `JobStarted - (JobCompleted + JobFailed)` + + **Example - Prometheus Integration:** + ```go + type PrometheusMonitor struct { + jobsCompleted prometheus.Counter + jobsFailed prometheus.Counter + executionTime prometheus.Histogram + schedulingDelay prometheus.Histogram + } + + func (p *PrometheusMonitor) JobExecutionTime(job gocron.Job, duration time.Duration) { + p.executionTime.Observe(duration.Seconds()) + } + + func (p *PrometheusMonitor) JobSchedulingDelay(job gocron.Job, scheduled, actual time.Time) { + if delay := actual.Sub(scheduled); delay > 0 { + p.schedulingDelay.Observe(delay.Seconds()) + } + } + + // monitor must implement every SchedulerMonitor method (only two shown here) + s, _ := gocron.NewScheduler(gocron.WithSchedulerMonitor(monitor)) + ``` + + **Use Cases:** Prometheus metrics, custom dashboards, alerting systems, performance monitoring ### Testing The gocron library is set up to enable testing. 
diff --git a/errors.go b/errors.go index 36f7455..523da0d 100644 --- a/errors.go +++ b/errors.go @@ -48,6 +48,7 @@ var ( ErrWithDistributedLockerNil = errors.New("gocron: WithDistributedLocker: locker must not be nil") ErrWithDistributedJobLockerNil = errors.New("gocron: WithDistributedJobLocker: locker must not be nil") ErrWithIdentifierNil = errors.New("gocron: WithIdentifier: identifier must not be nil") + ErrSchedulerMonitorNil = errors.New("gocron: WithSchedulerMonitor: monitor must not be nil") ErrWithLimitConcurrentJobsZero = errors.New("gocron: WithLimitConcurrentJobs: limit must be greater than 0") ErrWithLocationNil = errors.New("gocron: WithLocation: location must not be nil") ErrWithLoggerNil = errors.New("gocron: WithLogger: logger must not be nil") diff --git a/executor.go b/executor.go index c5c2a03..ca4363c 100644 --- a/executor.go +++ b/executor.go @@ -56,6 +56,8 @@ type executor struct { monitor Monitor // monitorStatus for reporting metrics monitorStatus MonitorStatus + // reference to parent scheduler for lifecycle notifications + scheduler *scheduler } type jobIn struct { @@ -155,6 +157,15 @@ func (e *executor) start() { // all runners are busy, reschedule the work for later // which means we just skip it here and do nothing // TODO when metrics are added, this should increment a rescheduled metric + // Notify concurrency limit reached if monitor is configured + if e.scheduler != nil && 
e.scheduler.schedulerMonitor != nil { + ctx2, cancel2 := context.WithCancel(executorCtx) + job := requestJobCtx(ctx2, jIn.id, e.jobOutRequest) + cancel2() + if job != nil { + e.scheduler.notifyConcurrencyLimitReached("limit", e.scheduler.jobFromInternalJob(*job)) + } + } e.sendOutForRescheduling(&jIn) } } else { @@ -209,6 +220,10 @@ func (e *executor) start() { // which means we just skip it here and do nothing e.incrementJobCounter(*j, SingletonRescheduled) e.sendOutForRescheduling(&jIn) + // Notify concurrency limit reached if monitor is configured + if e.scheduler != nil && e.scheduler.schedulerMonitor != nil { + e.scheduler.notifyConcurrencyLimitReached("singleton", e.scheduler.jobFromInternalJob(*j)) + } } } else { // wait mode, fill up that queue (buffered channel, so it's ok) @@ -416,18 +431,36 @@ func (e *executor) runJob(j internalJob, jIn jobIn) { _ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name) + // Notify job started + actualStartTime := time.Now() + if e.scheduler != nil && e.scheduler.schedulerMonitor != nil { + jobObj := e.scheduler.jobFromInternalJob(j) + e.scheduler.notifyJobStarted(jobObj) + // Notify scheduling delay if job had a scheduled time + if len(j.nextScheduled) > 0 { + e.scheduler.notifyJobSchedulingDelay(jobObj, j.nextScheduled[0], actualStartTime) + } + } + err := callJobFuncWithParams(j.beforeJobRunsSkipIfBeforeFuncErrors, j.id, j.name) if err != nil { e.sendOutForRescheduling(&jIn) - select { case e.jobsOutCompleted <- j.id: case <-e.ctx.Done(): } - + // Notify job failed (before actual run) + if e.scheduler != nil && e.scheduler.schedulerMonitor != nil { + e.scheduler.notifyJobFailed(e.scheduler.jobFromInternalJob(j), err) + } return } + // Notify job running + if e.scheduler != nil && e.scheduler.schedulerMonitor != nil { + e.scheduler.notifyJobRunning(e.scheduler.jobFromInternalJob(j)) + } + // For intervalFromCompletion, we need to reschedule AFTER the job completes, // not before. 
For regular jobs, we reschedule before execution (existing behavior). if !j.intervalFromCompletion { @@ -448,11 +481,25 @@ func (e *executor) runJob(j internalJob, jIn jobIn) { if err != nil { _ = callJobFuncWithParams(j.afterJobRunsWithError, j.id, j.name, err) e.incrementJobCounter(j, Fail) - e.recordJobTimingWithStatus(startTime, time.Now(), j, Fail, err) + endTime := time.Now() + e.recordJobTimingWithStatus(startTime, endTime, j, Fail, err) + // Notify job failed + if e.scheduler != nil && e.scheduler.schedulerMonitor != nil { + jobObj := e.scheduler.jobFromInternalJob(j) + e.scheduler.notifyJobFailed(jobObj, err) + e.scheduler.notifyJobExecutionTime(jobObj, endTime.Sub(startTime)) + } } else { _ = callJobFuncWithParams(j.afterJobRuns, j.id, j.name) e.incrementJobCounter(j, Success) - e.recordJobTimingWithStatus(startTime, time.Now(), j, Success, nil) + endTime := time.Now() + e.recordJobTimingWithStatus(startTime, endTime, j, Success, nil) + // Notify job completed + if e.scheduler != nil && e.scheduler.schedulerMonitor != nil { + jobObj := e.scheduler.jobFromInternalJob(j) + e.scheduler.notifyJobCompleted(jobObj) + e.scheduler.notifyJobExecutionTime(jobObj, endTime.Sub(startTime)) + } } // For intervalFromCompletion, reschedule AFTER the job completes diff --git a/go.mod b/go.mod index 0bd95cb..12c6440 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/go-co-op/gocron/v2 -go 1.24.0 +go 1.21.4 require ( github.com/google/uuid v1.6.0 diff --git a/gocron-monitor-test/debug_restart.go b/gocron-monitor-test/debug_restart.go new file mode 100644 index 0000000..c5711a0 --- /dev/null +++ b/gocron-monitor-test/debug_restart.go @@ -0,0 +1,166 @@ +package main + +import ( + "fmt" + "time" + + "github.com/go-co-op/gocron/v2" +) + +type DebugMonitor struct { + startCount int + stopCount int + jobRegCount int + jobUnregCount int + jobStartCount int + jobRunningCount int + jobCompletCount int + jobFailCount int +} + +func (m *DebugMonitor) SchedulerStarted() { + m.startCount++ + fmt.Printf("✓ SchedulerStarted() called (total: %d)\n", m.startCount) +} + +func (m *DebugMonitor) SchedulerStopped() { + fmt.Println("✓ SchedulerStopped() called") +} + +func (m *DebugMonitor) SchedulerShutdown() { + m.stopCount++ + fmt.Printf("✓ SchedulerShutdown() called (total: %d)\n", m.stopCount) +} + +func (m *DebugMonitor) JobRegistered(job gocron.Job) { + m.jobRegCount++ + fmt.Printf("✓ JobRegistered() called (total: %d) - Job ID: %s\n", m.jobRegCount, job.ID()) +} + +func (m *DebugMonitor) JobUnregistered(job gocron.Job) { + m.jobUnregCount++ + fmt.Printf("✓ JobUnregistered() called (total: %d) - Job ID: %s\n", m.jobUnregCount, job.ID()) +} + +func (m *DebugMonitor) JobStarted(job gocron.Job) { + m.jobStartCount++ + fmt.Printf("✓ JobStarted() called (total: %d) - Job ID: %s\n", m.jobStartCount, job.ID()) +} + +func (m *DebugMonitor) JobRunning(job gocron.Job) { + m.jobRunningCount++ + fmt.Printf("✓ JobRunning() called (total: %d) - Job ID: %s\n", m.jobRunningCount, job.ID()) +} + +func (m *DebugMonitor) JobCompleted(job gocron.Job) { + m.jobCompletCount++ + fmt.Printf("✓ JobCompleted() called (total: %d) - Job ID: %s\n", m.jobCompletCount, job.ID()) +} + +func (m *DebugMonitor) JobFailed(job gocron.Job, err error) { + m.jobFailCount++ + fmt.Printf("✓ JobFailed() called (total: %d) - Job ID: %s, Error: %v\n", m.jobFailCount, job.ID(), err) +} + +func (m *DebugMonitor) JobExecutionTime(job gocron.Job, duration time.Duration) { + fmt.Printf("✓ JobExecutionTime() called - Job ID: %s, Duration: %v\n", job.ID(), duration) +} + +func (m *DebugMonitor) JobSchedulingDelay(job gocron.Job, scheduledTime time.Time, actualStartTime time.Time) { + fmt.Printf("✓ JobSchedulingDelay() called - Job ID: %s, Delay: %v\n", job.ID(), actualStartTime.Sub(scheduledTime)) +} + +func (m *DebugMonitor) ConcurrencyLimitReached(limitType string, job gocron.Job) { + fmt.Printf("✓ ConcurrencyLimitReached() called - Type: %s, Job ID: %s\n", limitType, job.ID()) +} + +func main() { + // ONE monitor, multiple scheduler instances + monitor := &DebugMonitor{} + + fmt.Println("=== Cycle 1 (Scheduler Instance 1) ===") + s1, err := gocron.NewScheduler( + gocron.WithSchedulerMonitor(monitor), + ) + if err != nil { + panic(err) + } + + // Create and register some test jobs + fmt.Println("Creating jobs...") + _, err = s1.NewJob( + gocron.DurationJob(1*time.Second), + gocron.NewTask(func() { fmt.Println("Job 1 running") }), + ) + if err != nil { + panic(err) + } + + _, err = s1.NewJob( + gocron.DurationJob(2*time.Second), + gocron.NewTask(func() error { + fmt.Println("Job 2 executing and returning error") + return fmt.Errorf("simulated 
job failure") + }), // This job will fail with error + ) + if err != nil { + panic(err) + } + + fmt.Println("Calling Start()...") + s1.Start() + time.Sleep(3 * time.Second) // Wait for jobs to execute + + fmt.Println("Calling Shutdown()...") + err = s1.Shutdown() + if err != nil { + fmt.Printf("Shutdown error: %v\n", err) + } + + fmt.Println("\n=== Cycle 2 (Job Updates) ===") + s2, err := gocron.NewScheduler( + gocron.WithSchedulerMonitor(monitor), + ) + if err != nil { + panic(err) + } + + fmt.Println("Creating and updating jobs...") + job3, err := s2.NewJob( + gocron.DurationJob(1*time.Second), + gocron.NewTask(func() { fmt.Println("Job 3 running") }), + ) + if err != nil { + panic(err) + } + + // Update the job + _, err = s2.Update( + job3.ID(), + gocron.DurationJob(2*time.Second), + gocron.NewTask(func() { fmt.Println("Job 3 updated") }), + ) + if err != nil { + panic(err) + } + + fmt.Println("Calling Start()...") + s2.Start() + time.Sleep(3 * time.Second) + + fmt.Println("Calling Shutdown()...") + err = s2.Shutdown() + if err != nil { + fmt.Printf("Shutdown error: %v\n", err) + } + + fmt.Println("\n=== Summary ===") + fmt.Printf("Total Scheduler Starts: %d\n", monitor.startCount) + fmt.Printf("Total Scheduler Stops: %d\n", monitor.stopCount) + fmt.Printf("Total Jobs Registered: %d\n", monitor.jobRegCount) + fmt.Printf("Total Jobs Unregistered: %d\n", monitor.jobUnregCount) + fmt.Printf("Total Jobs Started: %d\n", monitor.jobStartCount) + fmt.Printf("Total Jobs Running: %d\n", monitor.jobRunningCount) + fmt.Printf("Total Jobs Completed: %d\n", monitor.jobCompletCount) + fmt.Printf("Total Jobs Failed: %d\n", monitor.jobFailCount) +} diff --git a/gocron-monitor-test/go.mod b/gocron-monitor-test/go.mod new file mode 100644 index 0000000..6c479ac --- /dev/null +++ b/gocron-monitor-test/go.mod @@ -0,0 +1,13 @@ +module test + +go 1.21.4 + +require github.com/go-co-op/gocron/v2 v2.17.0 + +require ( + github.com/google/uuid v1.6.0 // indirect + 
github.com/jonboulle/clockwork v0.5.0 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect +) + +replace github.com/go-co-op/gocron/v2 => ../ diff --git a/gocron-monitor-test/go.sum b/gocron-monitor-test/go.sum new file mode 100644 index 0000000..0bfec8d --- /dev/null +++ b/gocron-monitor-test/go.sum @@ -0,0 +1,16 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/jonboulle/clockwork v0.5.0 h1:Hyh9A8u51kptdkR+cqRpT1EebBwTn1oK9YfGYbdFz6I= +github.com/jonboulle/clockwork v0.5.0/go.mod h1:3mZlmanh0g2NDKO5TWZVJAfofYk64M7XN3SzBPjZF60= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/scheduler.go b/scheduler.go index c57f826..a35c42c 100644 --- a/scheduler.go +++ b/scheduler.go @@ -100,6 +100,9 @@ type scheduler struct { removeJobCh chan uuid.UUID // requests from the client to remove jobs by tags are received here removeJobsByTagsCh chan []string + + // scheduler monitor from which 
metrics can be collected + schedulerMonitor SchedulerMonitor } type newJobIn struct { @@ -148,7 +151,6 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { s := &scheduler{ shutdownCtx: schCtx, shutdownCancel: cancel, - exec: exec, jobs: make(map[uuid.UUID]internalJob), location: time.Local, logger: &noOpLogger{}, @@ -164,6 +166,8 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { runJobRequestCh: make(chan runJobRequest), allJobsOutRequest: make(chan allJobsOutRequest), } + exec.scheduler = s + s.exec = exec for _, option := range options { err := option(s) @@ -273,6 +277,9 @@ func (s *scheduler) stopScheduler() { s.stopErrCh <- err s.started.Store(false) s.logger.Debug("gocron: scheduler stopped") + + // Notify monitor that scheduler has stopped + s.notifySchedulerStopped() } func (s *scheduler) selectAllJobsOutRequest(out allJobsOutRequest) { @@ -323,6 +330,10 @@ func (s *scheduler) selectRemoveJob(id uuid.UUID) { if !ok { return } + if s.schedulerMonitor != nil { + out := s.jobFromInternalJob(j) + s.notifyJobUnregistered(out) + } j.stop() delete(s.jobs, id) } @@ -537,6 +548,10 @@ func (s *scheduler) selectRemoveJobsByTags(tags []string) { for _, j := range s.jobs { for _, tag := range tags { if slices.Contains(j.tags, tag) { + if s.schedulerMonitor != nil { + out := s.jobFromInternalJob(j) + s.notifyJobUnregistered(out) + } j.stop() delete(s.jobs, j.id) break @@ -702,7 +717,7 @@ func (s *scheduler) verifyParameterType(taskFunc reflect.Value, tsk task) error return s.verifyNonVariadic(taskFunc, tsk, expectedParameterLength) } -var contextType = reflect.TypeFor[context.Context]() +var contextType = reflect.TypeOf((*context.Context)(nil)).Elem() func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskWrapper Task, options []JobOption) (Job, error) { j := internalJob{} @@ -800,6 +815,9 @@ func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskW } out := s.jobFromInternalJob(j) + if 
s.schedulerMonitor != nil { + s.notifyJobRegistered(out) + } return &out, nil } @@ -831,8 +849,13 @@ func (s *scheduler) Start() { select { case <-s.shutdownCtx.Done(): + // Scheduler already shut down, don't notify + return case s.startCh <- struct{}{}: - <-s.startedCh + <-s.startedCh // Wait for scheduler to actually start + + // Scheduler has started + s.notifySchedulerStarted() } } @@ -865,6 +888,9 @@ func (s *scheduler) Shutdown() error { select { case err := <-s.stopErrCh: t.Stop() + + // notify monitor that scheduler stopped + s.notifySchedulerShutdown() return err case <-t.C: return ErrStopSchedulerTimedOut @@ -1070,3 +1096,98 @@ func WithMonitorStatus(monitor MonitorStatus) SchedulerOption { return nil } } + +// WithSchedulerMonitor sets a monitor that will be called with scheduler-level events. +func WithSchedulerMonitor(monitor SchedulerMonitor) SchedulerOption { + return func(s *scheduler) error { + if monitor == nil { + return ErrSchedulerMonitorNil + } + s.schedulerMonitor = monitor + return nil + } +} + +// notifySchedulerStarted notifies the monitor that scheduler has started +func (s *scheduler) notifySchedulerStarted() { + if s.schedulerMonitor != nil { + s.schedulerMonitor.SchedulerStarted() + } +} + +// notifySchedulerShutdown notifies the monitor that scheduler has stopped +func (s *scheduler) notifySchedulerShutdown() { + if s.schedulerMonitor != nil { + s.schedulerMonitor.SchedulerShutdown() + } +} + +// notifyJobRegistered notifies the monitor that a job has been registered +func (s *scheduler) notifyJobRegistered(job Job) { + if s.schedulerMonitor != nil { + s.schedulerMonitor.JobRegistered(job) + } +} + +// notifyJobUnregistered notifies the monitor that a job has been unregistered +func (s *scheduler) notifyJobUnregistered(job Job) { + if s.schedulerMonitor != nil { + s.schedulerMonitor.JobUnregistered(job) + } +} + +// notifyJobStarted notifies the monitor that a job has started +func (s *scheduler) notifyJobStarted(job Job) { + if 
s.schedulerMonitor != nil { + s.schedulerMonitor.JobStarted(job) + } +} + +// notifyJobRunning notifies the monitor that a job is running. +func (s *scheduler) notifyJobRunning(job Job) { + if s.schedulerMonitor != nil { + s.schedulerMonitor.JobRunning(job) + } +} + +// notifyJobCompleted notifies the monitor that a job has completed. +func (s *scheduler) notifyJobCompleted(job Job) { + if s.schedulerMonitor != nil { + s.schedulerMonitor.JobCompleted(job) + } +} + +// notifyJobFailed notifies the monitor that a job has failed. +func (s *scheduler) notifyJobFailed(job Job, err error) { + if s.schedulerMonitor != nil { + s.schedulerMonitor.JobFailed(job, err) + } +} + +// notifySchedulerStopped notifies the monitor that the scheduler has stopped +func (s *scheduler) notifySchedulerStopped() { + if s.schedulerMonitor != nil { + s.schedulerMonitor.SchedulerStopped() + } +} + +// notifyJobExecutionTime notifies the monitor of a job's execution time +func (s *scheduler) notifyJobExecutionTime(job Job, duration time.Duration) { + if s.schedulerMonitor != nil { + s.schedulerMonitor.JobExecutionTime(job, duration) + } +} + +// notifyJobSchedulingDelay notifies the monitor of scheduling delay +func (s *scheduler) notifyJobSchedulingDelay(job Job, scheduledTime time.Time, actualStartTime time.Time) { + if s.schedulerMonitor != nil { + s.schedulerMonitor.JobSchedulingDelay(job, scheduledTime, actualStartTime) + } +} + +// notifyConcurrencyLimitReached notifies the monitor that a concurrency limit was reached +func (s *scheduler) notifyConcurrencyLimitReached(limitType string, job Job) { + if s.schedulerMonitor != nil { + s.schedulerMonitor.ConcurrencyLimitReached(limitType, job) + } +} diff --git a/scheduler_monitor.go b/scheduler_monitor.go new file mode 100644 index 0000000..b097025 --- /dev/null +++ b/scheduler_monitor.go @@ -0,0 +1,50 @@ +package gocron + +import "time" + +// SchedulerMonitor is called by the Scheduler to provide scheduler-level +// metrics and events. 
+type SchedulerMonitor interface { + // SchedulerStarted is called when Start() is invoked on the scheduler. + SchedulerStarted() + + // SchedulerStopped is called when the scheduler's main loop stops, + // but before final cleanup in Shutdown(). + SchedulerStopped() + + // SchedulerShutdown is called when Shutdown() completes successfully. + SchedulerShutdown() + + // JobRegistered is called when a job is registered with the scheduler. + JobRegistered(job Job) + + // JobUnregistered is called when a job is unregistered from the scheduler. + JobUnregistered(job Job) + + // JobStarted is called when a job starts running. + JobStarted(job Job) + + // JobRunning is called when a job is running. + JobRunning(job Job) + + // JobFailed is called when a job fails to complete successfully. + JobFailed(job Job, err error) + + // JobCompleted is called when a job has completed running. + JobCompleted(job Job) + + // JobExecutionTime is called after a job completes (success or failure) + // with the time it took to execute. This enables calculation of metrics + // like AverageExecutionTime. + JobExecutionTime(job Job, duration time.Duration) + + // JobSchedulingDelay is called when a job starts running, providing both + // the scheduled time and actual start time. This enables calculation of + // SchedulingLag metrics to detect when jobs are running behind schedule. + JobSchedulingDelay(job Job, scheduledTime time.Time, actualStartTime time.Time) + + // ConcurrencyLimitReached is called when a job cannot start immediately + // due to concurrency limits (singleton or limit mode). + // limitType will be "singleton" or "limit". 
+ ConcurrencyLimitReached(limitType string, job Job) +} diff --git a/scheduler_monitor_test.go b/scheduler_monitor_test.go new file mode 100644 index 0000000..5c64313 --- /dev/null +++ b/scheduler_monitor_test.go @@ -0,0 +1,498 @@ +package gocron + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// testSchedulerMonitor is a test implementation of SchedulerMonitor +// that tracks scheduler lifecycle events +type testSchedulerMonitor struct { + mu sync.RWMutex + startedCount int64 + stoppedCount int64 + shutdownCount int64 + jobRegCount int64 + jobUnregCount int64 + jobStartCount int64 + jobRunningCount int64 + jobCompletedCount int64 + jobFailedCount int64 + concurrencyLimitCount int64 + startedCalls []time.Time + stoppedCalls []time.Time + shutdownCalls []time.Time + jobRegCalls []Job + jobUnregCalls []Job + jobStartCalls []Job + jobRunningCalls []Job + jobCompletedCalls []Job + jobExecutionTimes []time.Duration + jobSchedulingDelays []time.Duration + concurrencyLimitCalls []string + jobFailedCalls struct { + jobs []Job + errs []error + } +} + +func newTestSchedulerMonitor() *testSchedulerMonitor { + return &testSchedulerMonitor{ + startedCalls: make([]time.Time, 0), + stoppedCalls: make([]time.Time, 0), + shutdownCalls: make([]time.Time, 0), + jobRegCalls: make([]Job, 0), + jobUnregCalls: make([]Job, 0), + jobStartCalls: make([]Job, 0), + jobRunningCalls: make([]Job, 0), + jobCompletedCalls: make([]Job, 0), + jobExecutionTimes: make([]time.Duration, 0), + jobSchedulingDelays: make([]time.Duration, 0), + concurrencyLimitCalls: make([]string, 0), + jobFailedCalls: struct { + jobs []Job + errs []error + }{ + jobs: make([]Job, 0), + errs: make([]error, 0), + }, + } +} + +func (t *testSchedulerMonitor) SchedulerStarted() { + t.mu.Lock() + defer t.mu.Unlock() + atomic.AddInt64(&t.startedCount, 1) + t.startedCalls = append(t.startedCalls, time.Now()) +} + +func (t 
*testSchedulerMonitor) SchedulerStopped() { + t.mu.Lock() + defer t.mu.Unlock() + atomic.AddInt64(&t.stoppedCount, 1) + t.stoppedCalls = append(t.stoppedCalls, time.Now()) +} + +func (t *testSchedulerMonitor) SchedulerShutdown() { + t.mu.Lock() + defer t.mu.Unlock() + atomic.AddInt64(&t.shutdownCount, 1) + t.shutdownCalls = append(t.shutdownCalls, time.Now()) +} + +func (t *testSchedulerMonitor) getStartedCount() int64 { + return atomic.LoadInt64(&t.startedCount) +} + +func (t *testSchedulerMonitor) getShutdownCount() int64 { + return atomic.LoadInt64(&t.shutdownCount) +} + +func (t *testSchedulerMonitor) getStartedCalls() []time.Time { + t.mu.RLock() + defer t.mu.RUnlock() + return append([]time.Time{}, t.startedCalls...) +} + +func (t *testSchedulerMonitor) getShutdownCalls() []time.Time { + t.mu.RLock() + defer t.mu.RUnlock() + return append([]time.Time{}, t.shutdownCalls...) +} + +func (t *testSchedulerMonitor) JobRegistered(job Job) { + t.mu.Lock() + defer t.mu.Unlock() + atomic.AddInt64(&t.jobRegCount, 1) + t.jobRegCalls = append(t.jobRegCalls, job) +} + +func (t *testSchedulerMonitor) JobUnregistered(job Job) { + t.mu.Lock() + defer t.mu.Unlock() + atomic.AddInt64(&t.jobUnregCount, 1) + t.jobUnregCalls = append(t.jobUnregCalls, job) +} + +func (t *testSchedulerMonitor) JobStarted(job Job) { + t.mu.Lock() + defer t.mu.Unlock() + atomic.AddInt64(&t.jobStartCount, 1) + t.jobStartCalls = append(t.jobStartCalls, job) +} + +func (t *testSchedulerMonitor) JobRunning(job Job) { + t.mu.Lock() + defer t.mu.Unlock() + atomic.AddInt64(&t.jobRunningCount, 1) + t.jobRunningCalls = append(t.jobRunningCalls, job) +} + +func (t *testSchedulerMonitor) JobCompleted(job Job) { + t.mu.Lock() + defer t.mu.Unlock() + atomic.AddInt64(&t.jobCompletedCount, 1) + t.jobCompletedCalls = append(t.jobCompletedCalls, job) +} + +func (t *testSchedulerMonitor) JobFailed(job Job, err error) { + t.mu.Lock() + defer t.mu.Unlock() + atomic.AddInt64(&t.jobFailedCount, 1) + t.jobFailedCalls.jobs = 
append(t.jobFailedCalls.jobs, job) + t.jobFailedCalls.errs = append(t.jobFailedCalls.errs, err) +} + +func (t *testSchedulerMonitor) JobExecutionTime(_ Job, duration time.Duration) { + t.mu.Lock() + defer t.mu.Unlock() + t.jobExecutionTimes = append(t.jobExecutionTimes, duration) +} + +func (t *testSchedulerMonitor) JobSchedulingDelay(_ Job, scheduledTime time.Time, actualStartTime time.Time) { + t.mu.Lock() + defer t.mu.Unlock() + delay := actualStartTime.Sub(scheduledTime) + if delay > 0 { + t.jobSchedulingDelays = append(t.jobSchedulingDelays, delay) + } +} + +func (t *testSchedulerMonitor) ConcurrencyLimitReached(limitType string, _ Job) { + t.mu.Lock() + defer t.mu.Unlock() + atomic.AddInt64(&t.concurrencyLimitCount, 1) + t.concurrencyLimitCalls = append(t.concurrencyLimitCalls, limitType) +} + +func (t *testSchedulerMonitor) getJobRegCount() int64 { + return atomic.LoadInt64(&t.jobRegCount) +} + +func (t *testSchedulerMonitor) getJobUnregCount() int64 { + return atomic.LoadInt64(&t.jobUnregCount) +} + +func (t *testSchedulerMonitor) getJobStartCount() int64 { + return atomic.LoadInt64(&t.jobStartCount) +} + +func (t *testSchedulerMonitor) getJobRunningCount() int64 { + return atomic.LoadInt64(&t.jobRunningCount) +} + +func (t *testSchedulerMonitor) getJobCompletedCount() int64 { + return atomic.LoadInt64(&t.jobCompletedCount) +} + +func (t *testSchedulerMonitor) getJobFailedCount() int64 { + return atomic.LoadInt64(&t.jobFailedCount) +} + +// func (t *testSchedulerMonitor) getJobRegCalls() []Job { +// t.mu.RLock() +// defer t.mu.RUnlock() +// return append([]Job{}, t.jobRegCalls...) +// } + +// func (t *testSchedulerMonitor) getJobUnregCalls() []Job { +// t.mu.RLock() +// defer t.mu.RUnlock() +// return append([]Job{}, t.jobUnregCalls...) +// } + +// func (t *testSchedulerMonitor) getJobStartCalls() []Job { +// t.mu.RLock() +// defer t.mu.RUnlock() +// return append([]Job{}, t.jobStartCalls...) 
+// } + +// func (t *testSchedulerMonitor) getJobRunningCalls() []Job { +// t.mu.RLock() +// defer t.mu.RUnlock() +// return append([]Job{}, t.jobRunningCalls...) +// } + +// func (t *testSchedulerMonitor) getJobCompletedCalls() []Job { +// t.mu.RLock() +// defer t.mu.RUnlock() +// return append([]Job{}, t.jobCompletedCalls...) +// } + +func (t *testSchedulerMonitor) getJobFailedCalls() ([]Job, []error) { + t.mu.RLock() + defer t.mu.RUnlock() + jobs := append([]Job{}, t.jobFailedCalls.jobs...) + errs := append([]error{}, t.jobFailedCalls.errs...) + return jobs, errs +} + +func TestSchedulerMonitor_Basic(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + + monitor := newTestSchedulerMonitor() + s := newTestScheduler(t, WithSchedulerMonitor(monitor)) + + // Before starting, monitor should not have been called + assert.Equal(t, int64(0), monitor.getStartedCount()) + assert.Equal(t, int64(0), monitor.getShutdownCount()) + + // Add a simple job + _, err := s.NewJob( + DurationJob(time.Second), + NewTask(func() {}), + ) + require.NoError(t, err) + + // Start the scheduler + s.Start() + + // Wait a bit for the start to complete + time.Sleep(50 * time.Millisecond) + + // SchedulerStarted should have been called once + assert.Equal(t, int64(1), monitor.getStartedCount()) + assert.Equal(t, int64(0), monitor.getShutdownCount()) + + // Shutdown the scheduler + err = s.Shutdown() + require.NoError(t, err) + + // SchedulerShutdown should have been called once + assert.Equal(t, int64(1), monitor.getStartedCount()) + assert.Equal(t, int64(1), monitor.getShutdownCount()) + + // Verify the order of calls + startedCalls := monitor.getStartedCalls() + shutdownCalls := monitor.getShutdownCalls() + require.Len(t, startedCalls, 1) + require.Len(t, shutdownCalls, 1) + assert.True(t, startedCalls[0].Before(shutdownCalls[0]), + "SchedulerStarted should be called before SchedulerShutdown") +} + +func TestSchedulerMonitor_MultipleStartStop(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + 
+ monitor := newTestSchedulerMonitor() + s := newTestScheduler(t, WithSchedulerMonitor(monitor)) + + _, err := s.NewJob( + DurationJob(time.Second), + NewTask(func() {}), + ) + require.NoError(t, err) + + // Start and stop multiple times + s.Start() + time.Sleep(50 * time.Millisecond) + assert.Equal(t, int64(1), monitor.getStartedCount()) + + err = s.StopJobs() + require.NoError(t, err) + // StopJobs shouldn't call SchedulerShutdown + assert.Equal(t, int64(0), monitor.getShutdownCount()) + + // Start again + s.Start() + time.Sleep(50 * time.Millisecond) + assert.Equal(t, int64(2), monitor.getStartedCount()) + + // Final shutdown + err = s.Shutdown() + require.NoError(t, err) + assert.Equal(t, int64(1), monitor.getShutdownCount()) +} + +func TestSchedulerMonitor_WithoutMonitor(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + + // Create scheduler without monitor - should not panic + s := newTestScheduler(t) + + _, err := s.NewJob( + DurationJob(time.Second), + NewTask(func() {}), + ) + require.NoError(t, err) + + s.Start() + time.Sleep(50 * time.Millisecond) + + err = s.Shutdown() + require.NoError(t, err) +} + +func TestSchedulerMonitor_NilMonitor(t *testing.T) { + // Attempting to create a scheduler with nil monitor should error + _, err := NewScheduler(WithSchedulerMonitor(nil)) + assert.Error(t, err) + assert.Equal(t, ErrSchedulerMonitorNil, err) +} + +func TestSchedulerMonitor_ConcurrentAccess(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + + monitor := newTestSchedulerMonitor() + s := newTestScheduler(t, WithSchedulerMonitor(monitor)) + + // Add multiple jobs + for i := 0; i < 10; i++ { + _, err := s.NewJob( + DurationJob(100*time.Millisecond), + NewTask(func() {}), + ) + require.NoError(t, err) + } + + // Start scheduler once (normal use case) + s.Start() + time.Sleep(150 * time.Millisecond) + + // Verify monitor was called + assert.Equal(t, int64(1), monitor.getStartedCount()) + + err := s.Shutdown() + require.NoError(t, err) + + // Monitor should be 
called for shutdown + assert.Equal(t, int64(1), monitor.getShutdownCount()) +} + +func TestSchedulerMonitor_StartWithoutJobs(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + + monitor := newTestSchedulerMonitor() + s := newTestScheduler(t, WithSchedulerMonitor(monitor)) + + // Start scheduler without any jobs + s.Start() + time.Sleep(50 * time.Millisecond) + + // Monitor should still be called + assert.Equal(t, int64(1), monitor.getStartedCount()) + + err := s.Shutdown() + require.NoError(t, err) + assert.Equal(t, int64(1), monitor.getShutdownCount()) +} + +func TestSchedulerMonitor_ShutdownWithoutStart(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + + monitor := newTestSchedulerMonitor() + s := newTestScheduler(t, WithSchedulerMonitor(monitor)) + + _, err := s.NewJob( + DurationJob(time.Second), + NewTask(func() {}), + ) + require.NoError(t, err) + + // Shutdown without starting + err = s.Shutdown() + require.NoError(t, err) + + // SchedulerStarted should not be called + assert.Equal(t, int64(0), monitor.getStartedCount()) + // SchedulerShutdown should not be called if scheduler was never started + assert.Equal(t, int64(0), monitor.getShutdownCount()) +} + +func TestSchedulerMonitor_ThreadSafety(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + + monitor := newTestSchedulerMonitor() + + // Simulate concurrent calls to the monitor from multiple goroutines + var wg sync.WaitGroup + iterations := 100 + + for i := 0; i < iterations; i++ { + wg.Add(2) + go func() { + defer wg.Done() + monitor.SchedulerStarted() + }() + go func() { + defer wg.Done() + monitor.SchedulerShutdown() + }() + } + + wg.Wait() + + // Verify all calls were recorded + assert.Equal(t, int64(iterations), monitor.getStartedCount()) + assert.Equal(t, int64(iterations), monitor.getShutdownCount()) + assert.Len(t, monitor.getStartedCalls(), iterations) + assert.Len(t, monitor.getShutdownCalls(), iterations) +} + +func TestSchedulerMonitor_IntegrationWithJobs(t *testing.T) { + defer 
verifyNoGoroutineLeaks(t) + + monitor := newTestSchedulerMonitor() + s := newTestScheduler(t, WithSchedulerMonitor(monitor)) + + // Test successful job + jobRunCount := atomic.Int32{} + j, err := s.NewJob( + DurationJob(50*time.Millisecond), + NewTask(func() { + jobRunCount.Add(1) + }), + WithStartAt(WithStartImmediately()), + ) + require.NoError(t, err) + + // Test failing job + _, err = s.NewJob( + DurationJob(50*time.Millisecond), + NewTask(func() error { + return fmt.Errorf("test error") + }), + WithStartAt(WithStartImmediately()), + ) + require.NoError(t, err) + + // Start scheduler + s.Start() + time.Sleep(150 * time.Millisecond) // Wait for jobs to execute + + // Verify scheduler lifecycle events + assert.Equal(t, int64(1), monitor.getStartedCount()) + assert.GreaterOrEqual(t, jobRunCount.Load(), int32(1)) + + // Verify job registration + assert.Equal(t, int64(2), monitor.getJobRegCount(), "Should have registered 2 jobs") + + // Verify job execution events + assert.GreaterOrEqual(t, monitor.getJobStartCount(), int64(1), "Jobs should have started") + assert.GreaterOrEqual(t, monitor.getJobRunningCount(), int64(1), "Jobs should be running") + assert.GreaterOrEqual(t, monitor.getJobCompletedCount(), int64(1), "Successful job should complete") + assert.GreaterOrEqual(t, monitor.getJobFailedCount(), int64(1), "Failing job should fail") + + // Get failed job details + failedJobs, errors := monitor.getJobFailedCalls() + assert.NotEmpty(t, failedJobs, "Should have recorded failed jobs") + assert.NotEmpty(t, errors, "Should have recorded job errors") + assert.Contains(t, errors[0].Error(), "test error", "Should record the correct error") + + // Test unregistration + err = s.RemoveJob(j.ID()) + require.NoError(t, err) + time.Sleep(50 * time.Millisecond) // Wait for async removal + assert.Equal(t, int64(1), monitor.getJobUnregCount(), "Should have unregistered 1 job") + + // Shutdown + err = s.Shutdown() + require.NoError(t, err) + assert.Equal(t, int64(1), 
monitor.getShutdownCount()) +}