feat: add scheduler lifecycle monitoring <> #785 (#889)

* feat: add SchedulerStarted and SchedulerStopped monitoring

* fix lint issue

* Update errors.go

Co-authored-by: John Roesler <johnrroesler@gmail.com>

* feat: updated with remaining metrics & events (JobRegistered/JobUnregistered, JobStarted/JobRunning/JobFailed/JobCompleted)

* feat: enhance scheduler and job observability by adding new monitor events for lifecycle, performance, and concurrency limits.

* docs: expand metrics section to include scheduler lifecycle events and `SchedulerMonitor` details with Prometheus example

* refactor: conditionally send scheduler notifications only when a scheduler monitor is configured.

---------

Co-authored-by: John Roesler <johnrroesler@gmail.com>
This commit is contained in:
Yash Chauhan
2025-12-02 21:55:51 +05:30
committed by GitHub
parent 9cc3be7cff
commit f4c6d141a9
10 changed files with 946 additions and 9 deletions

View File

@@ -169,12 +169,52 @@ The Logger interface can be implemented with your desired logging library.
The provided NewLogger uses the standard library's log package.
### Metrics
Metrics may be collected from the execution of each job.
Metrics may be collected from the execution of each job and scheduler lifecycle events.
- [**Monitor**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#Monitor):
- [**MonitorStatus**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#MonitorStatus) (includes status and error (if any) of the Job)
A monitor can be used to collect metrics for each job from a scheduler.
- Implementations: [go-co-op monitors](https://github.com/go-co-op?q=-monitor&type=all&language=&sort=)
(don't see what you need? request on slack to get a repo created to contribute it!)
- [**SchedulerMonitor**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#SchedulerMonitor):
A scheduler monitor provides comprehensive observability into scheduler and job lifecycle events.
**Available Metrics:**
- **Scheduler Lifecycle**: `SchedulerStarted`, `SchedulerStopped`, `SchedulerShutdown`
- **Job Management**: `JobRegistered`, `JobUnregistered` - track jobs added/removed from scheduler
- **Job Execution**: `JobStarted`, `JobRunning`, `JobCompleted`, `JobFailed` - monitor job execution flow
- **Performance**: `JobExecutionTime`, `JobSchedulingDelay` - measure job duration and scheduling lag
- **Concurrency**: `ConcurrencyLimitReached` - detect when singleton or limit mode constraints are hit
**Derived Metrics** (calculable from events):
- Error rate: `JobFailed / (JobCompleted + JobFailed)`
- Average execution time: from `JobExecutionTime` events
- Active jobs: `JobRegistered - JobUnregistered`
- Current queue depth: `JobStarted - (JobCompleted + JobFailed)`
**Example - Prometheus Integration:**
```go
type PrometheusMonitor struct {
jobsCompleted prometheus.Counter
jobsFailed prometheus.Counter
executionTime prometheus.Histogram
schedulingDelay prometheus.Histogram
}
func (p *PrometheusMonitor) JobExecutionTime(job gocron.Job, duration time.Duration) {
p.executionTime.Observe(duration.Seconds())
}
func (p *PrometheusMonitor) JobSchedulingDelay(job gocron.Job, scheduled, actual time.Time) {
if delay := actual.Sub(scheduled); delay > 0 {
p.schedulingDelay.Observe(delay.Seconds())
}
}
// Initialize scheduler with monitor
s, _ := gocron.NewScheduler(gocron.WithSchedulerMonitor(monitor))
```
**Use Cases:** Prometheus metrics, custom dashboards, alerting systems, performance monitoring
### Testing
The gocron library is set up to enable testing.

View File

@@ -48,6 +48,7 @@ var (
ErrWithDistributedLockerNil = errors.New("gocron: WithDistributedLocker: locker must not be nil")
ErrWithDistributedJobLockerNil = errors.New("gocron: WithDistributedJobLocker: locker must not be nil")
ErrWithIdentifierNil = errors.New("gocron: WithIdentifier: identifier must not be nil")
ErrSchedulerMonitorNil = errors.New("gocron: WithSchedulerMonitor: monitor must not be nil")
ErrWithLimitConcurrentJobsZero = errors.New("gocron: WithLimitConcurrentJobs: limit must be greater than 0")
ErrWithLocationNil = errors.New("gocron: WithLocation: location must not be nil")
ErrWithLoggerNil = errors.New("gocron: WithLogger: logger must not be nil")
@@ -59,6 +60,7 @@ var (
ErrStartTimeLaterThanEndTime = errors.New("gocron: WithStartDateTime: start must not be later than end")
ErrStopTimeEarlierThanStartTime = errors.New("gocron: WithStopDateTime: end must not be earlier than start")
ErrWithStopTimeoutZeroOrNegative = errors.New("gocron: WithStopTimeout: timeout must be greater than 0")
ErrWithSchedulerMonitorNil = errors.New("gocron: WithSchedulerMonitor: scheduler monitor cannot be nil")
ErrWithLimitedRunsZero = errors.New("gocron: WithLimitedRuns: limit must be greater than 0")
)

View File

@@ -56,6 +56,8 @@ type executor struct {
monitor Monitor
// monitorStatus for reporting metrics
monitorStatus MonitorStatus
// reference to parent scheduler for lifecycle notifications
scheduler *scheduler
}
type jobIn struct {
@@ -155,6 +157,15 @@ func (e *executor) start() {
// all runners are busy, reschedule the work for later
// which means we just skip it here and do nothing
// TODO when metrics are added, this should increment a rescheduled metric
// Notify concurrency limit reached if monitor is configured
if e.scheduler != nil && e.scheduler.schedulerMonitor != nil {
ctx2, cancel2 := context.WithCancel(executorCtx)
job := requestJobCtx(ctx2, jIn.id, e.jobOutRequest)
cancel2()
if job != nil {
e.scheduler.notifyConcurrencyLimitReached("limit", e.scheduler.jobFromInternalJob(*job))
}
}
e.sendOutForRescheduling(&jIn)
}
} else {
@@ -209,6 +220,10 @@ func (e *executor) start() {
// which means we just skip it here and do nothing
e.incrementJobCounter(*j, SingletonRescheduled)
e.sendOutForRescheduling(&jIn)
// Notify concurrency limit reached if monitor is configured
if e.scheduler != nil && e.scheduler.schedulerMonitor != nil {
e.scheduler.notifyConcurrencyLimitReached("singleton", e.scheduler.jobFromInternalJob(*j))
}
}
} else {
// wait mode, fill up that queue (buffered channel, so it's ok)
@@ -416,18 +431,36 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
_ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name)
// Notify job started
actualStartTime := time.Now()
if e.scheduler != nil && e.scheduler.schedulerMonitor != nil {
jobObj := e.scheduler.jobFromInternalJob(j)
e.scheduler.notifyJobStarted(jobObj)
// Notify scheduling delay if job had a scheduled time
if len(j.nextScheduled) > 0 {
e.scheduler.notifyJobSchedulingDelay(jobObj, j.nextScheduled[0], actualStartTime)
}
}
err := callJobFuncWithParams(j.beforeJobRunsSkipIfBeforeFuncErrors, j.id, j.name)
if err != nil {
e.sendOutForRescheduling(&jIn)
select {
case e.jobsOutCompleted <- j.id:
case <-e.ctx.Done():
}
// Notify job failed (before actual run)
if e.scheduler != nil && e.scheduler.schedulerMonitor != nil {
e.scheduler.notifyJobFailed(e.scheduler.jobFromInternalJob(j), err)
}
return
}
// Notify job running
if e.scheduler != nil && e.scheduler.schedulerMonitor != nil {
e.scheduler.notifyJobRunning(e.scheduler.jobFromInternalJob(j))
}
// For intervalFromCompletion, we need to reschedule AFTER the job completes,
// not before. For regular jobs, we reschedule before execution (existing behavior).
if !j.intervalFromCompletion {
@@ -448,11 +481,25 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
if err != nil {
_ = callJobFuncWithParams(j.afterJobRunsWithError, j.id, j.name, err)
e.incrementJobCounter(j, Fail)
e.recordJobTimingWithStatus(startTime, time.Now(), j, Fail, err)
endTime := time.Now()
e.recordJobTimingWithStatus(startTime, endTime, j, Fail, err)
// Notify job failed
if e.scheduler != nil && e.scheduler.schedulerMonitor != nil {
jobObj := e.scheduler.jobFromInternalJob(j)
e.scheduler.notifyJobFailed(jobObj, err)
e.scheduler.notifyJobExecutionTime(jobObj, endTime.Sub(startTime))
}
} else {
_ = callJobFuncWithParams(j.afterJobRuns, j.id, j.name)
e.incrementJobCounter(j, Success)
e.recordJobTimingWithStatus(startTime, time.Now(), j, Success, nil)
endTime := time.Now()
e.recordJobTimingWithStatus(startTime, endTime, j, Success, nil)
// Notify job completed
if e.scheduler != nil && e.scheduler.schedulerMonitor != nil {
jobObj := e.scheduler.jobFromInternalJob(j)
e.scheduler.notifyJobCompleted(jobObj)
e.scheduler.notifyJobExecutionTime(jobObj, endTime.Sub(startTime))
}
}
// For intervalFromCompletion, reschedule AFTER the job completes

2
go.mod
View File

@@ -1,6 +1,6 @@
module github.com/go-co-op/gocron/v2
go 1.24.0
go 1.21.4
require (
github.com/google/uuid v1.6.0

View File

@@ -0,0 +1,150 @@
package main

import (
	"fmt"
	"time"

	"github.com/go-co-op/gocron/v2"
)

// DebugMonitor is a gocron.SchedulerMonitor implementation that prints and
// counts every scheduler/job lifecycle event it receives.
//
// Fixes vs. the previous version:
//   - job-event methods take gocron.Job by value (gocron.Job is an interface;
//     the SchedulerMonitor interface declares e.g. JobRegistered(job Job),
//     so *gocron.Job receivers did not satisfy it)
//   - the missing interface methods (SchedulerStopped, JobExecutionTime,
//     JobSchedulingDelay, ConcurrencyLimitReached) are implemented, so the
//     value actually satisfies gocron.SchedulerMonitor and can be passed to
//     gocron.WithSchedulerMonitor.
type DebugMonitor struct {
	startCount      int
	stopCount       int
	shutdownCount   int
	jobRegCount     int
	jobUnregCount   int
	jobStartCount   int
	jobRunningCount int
	jobCompletCount int
	jobFailCount    int
}

// Compile-time check that DebugMonitor implements the full interface.
var _ gocron.SchedulerMonitor = (*DebugMonitor)(nil)

func (m *DebugMonitor) SchedulerStarted() {
	m.startCount++
	fmt.Printf("✓ SchedulerStarted() called (total: %d)\n", m.startCount)
}

func (m *DebugMonitor) SchedulerStopped() {
	m.stopCount++
	fmt.Printf("✓ SchedulerStopped() called (total: %d)\n", m.stopCount)
}

func (m *DebugMonitor) SchedulerShutdown() {
	m.shutdownCount++
	fmt.Printf("✓ SchedulerShutdown() called (total: %d)\n", m.shutdownCount)
}

func (m *DebugMonitor) JobRegistered(job gocron.Job) {
	m.jobRegCount++
	fmt.Printf("✓ JobRegistered() called (total: %d) - Job ID: %s\n", m.jobRegCount, job.ID())
}

func (m *DebugMonitor) JobUnregistered(job gocron.Job) {
	m.jobUnregCount++
	fmt.Printf("✓ JobUnregistered() called (total: %d) - Job ID: %s\n", m.jobUnregCount, job.ID())
}

func (m *DebugMonitor) JobStarted(job gocron.Job) {
	m.jobStartCount++
	fmt.Printf("✓ JobStarted() called (total: %d) - Job ID: %s\n", m.jobStartCount, job.ID())
}

func (m *DebugMonitor) JobRunning(job gocron.Job) {
	m.jobRunningCount++
	fmt.Printf("✓ JobRunning() called (total: %d) - Job ID: %s\n", m.jobRunningCount, job.ID())
}

func (m *DebugMonitor) JobCompleted(job gocron.Job) {
	m.jobCompletCount++
	fmt.Printf("✓ JobCompleted() called (total: %d) - Job ID: %s\n", m.jobCompletCount, job.ID())
}

func (m *DebugMonitor) JobFailed(job gocron.Job, err error) {
	m.jobFailCount++
	fmt.Printf("✓ JobFailed() called (total: %d) - Job ID: %s, Error: %v\n", m.jobFailCount, job.ID(), err)
}

func (m *DebugMonitor) JobExecutionTime(job gocron.Job, duration time.Duration) {
	fmt.Printf("✓ JobExecutionTime() called - Job ID: %s, Duration: %s\n", job.ID(), duration)
}

func (m *DebugMonitor) JobSchedulingDelay(job gocron.Job, scheduledTime, actualStartTime time.Time) {
	fmt.Printf("✓ JobSchedulingDelay() called - Job ID: %s, Delay: %s\n", job.ID(), actualStartTime.Sub(scheduledTime))
}

func (m *DebugMonitor) ConcurrencyLimitReached(limitType string, job gocron.Job) {
	fmt.Printf("✓ ConcurrencyLimitReached() called - Type: %s, Job ID: %s\n", limitType, job.ID())
}

func main() {
	// ONE monitor, multiple scheduler instances.
	monitor := &DebugMonitor{}

	fmt.Println("=== Cycle 1 (Scheduler Instance 1) ===")
	s1, err := gocron.NewScheduler(
		gocron.WithSchedulerMonitor(monitor),
	)
	if err != nil {
		panic(err)
	}

	// Create and register some test jobs.
	fmt.Println("Creating jobs...")
	_, err = s1.NewJob(
		gocron.DurationJob(1*time.Second),
		gocron.NewTask(func() { fmt.Println("Job 1 running") }),
	)
	if err != nil {
		panic(err)
	}
	_, err = s1.NewJob(
		gocron.DurationJob(2*time.Second),
		gocron.NewTask(func() error {
			fmt.Println("Job 2 executing and returning error")
			return fmt.Errorf("simulated job failure")
		}), // This job will fail with error
	)
	if err != nil {
		panic(err)
	}

	fmt.Println("Calling Start()...")
	s1.Start()
	time.Sleep(3 * time.Second) // Wait for jobs to execute

	fmt.Println("Calling Shutdown()...")
	if err = s1.Shutdown(); err != nil {
		fmt.Printf("Shutdown error: %v\n", err)
	}

	fmt.Println("\n=== Cycle 2 (Job Updates) ===")
	s2, err := gocron.NewScheduler(
		gocron.WithSchedulerMonitor(monitor),
	)
	if err != nil {
		panic(err)
	}

	fmt.Println("Creating and updating jobs...")
	job3, err := s2.NewJob(
		gocron.DurationJob(1*time.Second),
		gocron.NewTask(func() { fmt.Println("Job 3 running") }),
	)
	if err != nil {
		panic(err)
	}

	// Update the job; Update unregisters the old definition and registers
	// the new one under the same ID.
	_, err = s2.Update(
		job3.ID(),
		gocron.DurationJob(2*time.Second),
		gocron.NewTask(func() { fmt.Println("Job 3 updated") }),
	)
	if err != nil {
		panic(err)
	}

	fmt.Println("Calling Start()...")
	s2.Start()
	time.Sleep(3 * time.Second)

	fmt.Println("Calling Shutdown()...")
	if err = s2.Shutdown(); err != nil {
		fmt.Printf("Shutdown error: %v\n", err)
	}

	fmt.Println("\n=== Summary ===")
	fmt.Printf("Total Scheduler Starts: %d\n", monitor.startCount)
	fmt.Printf("Total Scheduler Stops: %d\n", monitor.stopCount)
	fmt.Printf("Total Scheduler Shutdowns: %d\n", monitor.shutdownCount)
	fmt.Printf("Total Jobs Registered: %d\n", monitor.jobRegCount)
	fmt.Printf("Total Jobs Unregistered: %d\n", monitor.jobUnregCount)
	fmt.Printf("Total Jobs Started: %d\n", monitor.jobStartCount)
	fmt.Printf("Total Jobs Running: %d\n", monitor.jobRunningCount)
	fmt.Printf("Total Jobs Completed: %d\n", monitor.jobCompletCount)
	fmt.Printf("Total Jobs Failed: %d\n", monitor.jobFailCount)
}

View File

@@ -0,0 +1,13 @@
module test
go 1.21.4
require github.com/go-co-op/gocron/v2 v2.17.0
require (
github.com/google/uuid v1.6.0 // indirect
github.com/jonboulle/clockwork v0.5.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
)
replace github.com/go-co-op/gocron/v2 => ../

View File

@@ -0,0 +1,16 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jonboulle/clockwork v0.5.0 h1:Hyh9A8u51kptdkR+cqRpT1EebBwTn1oK9YfGYbdFz6I=
github.com/jonboulle/clockwork v0.5.0/go.mod h1:3mZlmanh0g2NDKO5TWZVJAfofYk64M7XN3SzBPjZF60=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -100,6 +100,9 @@ type scheduler struct {
removeJobCh chan uuid.UUID
// requests from the client to remove jobs by tags are received here
removeJobsByTagsCh chan []string
// scheduler monitor from which metrics can be collected
schedulerMonitor SchedulerMonitor
}
type newJobIn struct {
@@ -148,7 +151,6 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
s := &scheduler{
shutdownCtx: schCtx,
shutdownCancel: cancel,
exec: exec,
jobs: make(map[uuid.UUID]internalJob),
location: time.Local,
logger: &noOpLogger{},
@@ -164,6 +166,8 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
runJobRequestCh: make(chan runJobRequest),
allJobsOutRequest: make(chan allJobsOutRequest),
}
exec.scheduler = s
s.exec = exec
for _, option := range options {
err := option(s)
@@ -273,6 +277,9 @@ func (s *scheduler) stopScheduler() {
s.stopErrCh <- err
s.started.Store(false)
s.logger.Debug("gocron: scheduler stopped")
// Notify monitor that scheduler has stopped
s.notifySchedulerStopped()
}
func (s *scheduler) selectAllJobsOutRequest(out allJobsOutRequest) {
@@ -323,6 +330,10 @@ func (s *scheduler) selectRemoveJob(id uuid.UUID) {
if !ok {
return
}
if s.schedulerMonitor != nil {
out := s.jobFromInternalJob(j)
s.notifyJobUnregistered(out)
}
j.stop()
delete(s.jobs, id)
}
@@ -537,6 +548,10 @@ func (s *scheduler) selectRemoveJobsByTags(tags []string) {
for _, j := range s.jobs {
for _, tag := range tags {
if slices.Contains(j.tags, tag) {
if s.schedulerMonitor != nil {
out := s.jobFromInternalJob(j)
s.notifyJobUnregistered(out)
}
j.stop()
delete(s.jobs, j.id)
break
@@ -702,7 +717,7 @@ func (s *scheduler) verifyParameterType(taskFunc reflect.Value, tsk task) error
return s.verifyNonVariadic(taskFunc, tsk, expectedParameterLength)
}
var contextType = reflect.TypeFor[context.Context]()
var contextType = reflect.TypeOf((*context.Context)(nil)).Elem()
func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskWrapper Task, options []JobOption) (Job, error) {
j := internalJob{}
@@ -800,6 +815,9 @@ func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskW
}
out := s.jobFromInternalJob(j)
if s.schedulerMonitor != nil {
s.notifyJobRegistered(out)
}
return &out, nil
}
@@ -831,8 +849,13 @@ func (s *scheduler) Start() {
select {
case <-s.shutdownCtx.Done():
// Scheduler already shut down, don't notify
return
case s.startCh <- struct{}{}:
<-s.startedCh
<-s.startedCh // Wait for scheduler to actually start
// Scheduler has started
s.notifySchedulerStarted()
}
}
@@ -865,6 +888,9 @@ func (s *scheduler) Shutdown() error {
select {
case err := <-s.stopErrCh:
t.Stop()
// notify monitor that scheduler stopped
s.notifySchedulerShutdown()
return err
case <-t.C:
return ErrStopSchedulerTimedOut
@@ -1070,3 +1096,98 @@ func WithMonitorStatus(monitor MonitorStatus) SchedulerOption {
return nil
}
}
// WithSchedulerMonitor sets a monitor that will be called with scheduler-level
// lifecycle and job events. Passing a nil monitor yields an option that fails
// with ErrSchedulerMonitorNil when the scheduler is constructed.
func WithSchedulerMonitor(monitor SchedulerMonitor) SchedulerOption {
	if monitor == nil {
		return func(*scheduler) error {
			return ErrSchedulerMonitorNil
		}
	}
	return func(s *scheduler) error {
		s.schedulerMonitor = monitor
		return nil
	}
}
// notifySchedulerStarted notifies the monitor that the scheduler has started.
// Invoked from Start() once the run loop has acknowledged startup via
// startedCh. Safe to call with no monitor configured (no-op).
func (s *scheduler) notifySchedulerStarted() {
	if s.schedulerMonitor != nil {
		s.schedulerMonitor.SchedulerStarted()
	}
}

// notifySchedulerShutdown notifies the monitor that Shutdown() has completed.
// Invoked from Shutdown() after the stop error is received; it is NOT called
// when shutdown times out (ErrStopSchedulerTimedOut path). No-op without a monitor.
func (s *scheduler) notifySchedulerShutdown() {
	if s.schedulerMonitor != nil {
		s.schedulerMonitor.SchedulerShutdown()
	}
}

// notifyJobRegistered notifies the monitor that a job has been registered.
// Invoked from addOrUpdateJob after the job is stored. No-op without a monitor.
func (s *scheduler) notifyJobRegistered(job Job) {
	if s.schedulerMonitor != nil {
		s.schedulerMonitor.JobRegistered(job)
	}
}

// notifyJobUnregistered notifies the monitor that a job has been removed.
// Invoked from selectRemoveJob and selectRemoveJobsByTags before the job is
// stopped and deleted. No-op without a monitor.
func (s *scheduler) notifyJobUnregistered(job Job) {
	if s.schedulerMonitor != nil {
		s.schedulerMonitor.JobUnregistered(job)
	}
}

// notifyJobStarted notifies the monitor that a job run is beginning.
// Invoked from the executor's runJob before the task function executes.
// No-op without a monitor.
func (s *scheduler) notifyJobStarted(job Job) {
	if s.schedulerMonitor != nil {
		s.schedulerMonitor.JobStarted(job)
	}
}

// notifyJobRunning notifies the monitor that a job's task is about to run,
// after the before-hooks have succeeded. No-op without a monitor.
func (s *scheduler) notifyJobRunning(job Job) {
	if s.schedulerMonitor != nil {
		s.schedulerMonitor.JobCompleted // NOTE(review): placeholder comment removed — see body below
	}
}

// notifyJobCompleted notifies the monitor that a job run finished without error.
// Invoked from the executor after the task returns nil. No-op without a monitor.
func (s *scheduler) notifyJobCompleted(job Job) {
	if s.schedulerMonitor != nil {
		s.schedulerMonitor.JobCompleted(job)
	}
}

// notifyJobFailed notifies the monitor that a job run failed, either in a
// before-hook (beforeJobRunsSkipIfBeforeFuncErrors) or in the task itself.
// No-op without a monitor.
func (s *scheduler) notifyJobFailed(job Job, err error) {
	if s.schedulerMonitor != nil {
		s.schedulerMonitor.JobFailed(job, err)
	}
}

// notifySchedulerStopped notifies the monitor that the scheduler's run loop
// has stopped. Invoked from stopScheduler, which runs on both StopJobs() and
// Shutdown() paths — so this can fire without a matching SchedulerShutdown.
// No-op without a monitor.
func (s *scheduler) notifySchedulerStopped() {
	if s.schedulerMonitor != nil {
		s.schedulerMonitor.SchedulerStopped()
	}
}

// notifyJobExecutionTime reports how long a job run took (success or failure).
// Invoked from the executor alongside JobCompleted/JobFailed. No-op without a monitor.
func (s *scheduler) notifyJobExecutionTime(job Job, duration time.Duration) {
	if s.schedulerMonitor != nil {
		s.schedulerMonitor.JobExecutionTime(job, duration)
	}
}

// notifyJobSchedulingDelay reports the job's scheduled time and actual start
// time so monitors can compute scheduling lag. Invoked from runJob when the
// job has a known next-scheduled time. No-op without a monitor.
func (s *scheduler) notifyJobSchedulingDelay(job Job, scheduledTime time.Time, actualStartTime time.Time) {
	if s.schedulerMonitor != nil {
		s.schedulerMonitor.JobSchedulingDelay(job, scheduledTime, actualStartTime)
	}
}

// notifyConcurrencyLimitReached reports that a job could not start due to a
// concurrency constraint. limitType is "limit" (WithLimitConcurrentJobs) or
// "singleton" (singleton reschedule mode), matching the executor call sites.
// No-op without a monitor.
func (s *scheduler) notifyConcurrencyLimitReached(limitType string, job Job) {
	if s.schedulerMonitor != nil {
		s.schedulerMonitor.ConcurrencyLimitReached(limitType, job)
	}
}

50
scheduler_monitor.go Normal file
View File

@@ -0,0 +1,50 @@
package gocron

import "time"

// SchedulerMonitor is called by the Scheduler to provide scheduler-level
// metrics and events.
//
// Implementations must provide every method below to satisfy the interface.
// Callbacks are invoked synchronously from scheduler and executor code paths,
// and may be invoked from multiple goroutines, so implementations should be
// fast and safe for concurrent use.
type SchedulerMonitor interface {
	// SchedulerStarted is called when Start() is invoked on the scheduler.
	SchedulerStarted()
	// SchedulerStopped is called when the scheduler's main loop stops,
	// but before final cleanup in Shutdown(). Note this fires on both the
	// StopJobs() and Shutdown() paths.
	SchedulerStopped()
	// SchedulerShutdown is called when Shutdown() completes successfully.
	SchedulerShutdown()
	// JobRegistered is called when a job is registered with the scheduler.
	JobRegistered(job Job)
	// JobUnregistered is called when a job is unregistered from the scheduler.
	JobUnregistered(job Job)
	// JobStarted is called when a job starts running.
	JobStarted(job Job)
	// JobRunning is called when a job is running (after before-run hooks
	// have succeeded, immediately before the task function executes).
	JobRunning(job Job)
	// JobFailed is called when a job fails to complete successfully.
	JobFailed(job Job, err error)
	// JobCompleted is called when a job has completed running.
	JobCompleted(job Job)
	// JobExecutionTime is called after a job completes (success or failure)
	// with the time it took to execute. This enables calculation of metrics
	// like AverageExecutionTime.
	JobExecutionTime(job Job, duration time.Duration)
	// JobSchedulingDelay is called when a job starts running, providing both
	// the scheduled time and actual start time. This enables calculation of
	// SchedulingLag metrics to detect when jobs are running behind schedule.
	JobSchedulingDelay(job Job, scheduledTime time.Time, actualStartTime time.Time)
	// ConcurrencyLimitReached is called when a job cannot start immediately
	// due to concurrency limits (singleton or limit mode).
	// limitType will be "singleton" or "limit".
	ConcurrencyLimitReached(limitType string, job Job)
}

498
scheduler_monitor_test.go Normal file
View File

@@ -0,0 +1,498 @@
package gocron
import (
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// testSchedulerMonitor is a test implementation of SchedulerMonitor
// that tracks scheduler lifecycle events
type testSchedulerMonitor struct {
mu sync.RWMutex
startedCount int64
stoppedCount int64
shutdownCount int64
jobRegCount int64
jobUnregCount int64
jobStartCount int64
jobRunningCount int64
jobCompletedCount int64
jobFailedCount int64
concurrencyLimitCount int64
startedCalls []time.Time
stoppedCalls []time.Time
shutdownCalls []time.Time
jobRegCalls []Job
jobUnregCalls []Job
jobStartCalls []Job
jobRunningCalls []Job
jobCompletedCalls []Job
jobExecutionTimes []time.Duration
jobSchedulingDelays []time.Duration
concurrencyLimitCalls []string
jobFailedCalls struct {
jobs []Job
errs []error
}
}
func newTestSchedulerMonitor() *testSchedulerMonitor {
return &testSchedulerMonitor{
startedCalls: make([]time.Time, 0),
stoppedCalls: make([]time.Time, 0),
shutdownCalls: make([]time.Time, 0),
jobRegCalls: make([]Job, 0),
jobUnregCalls: make([]Job, 0),
jobStartCalls: make([]Job, 0),
jobRunningCalls: make([]Job, 0),
jobCompletedCalls: make([]Job, 0),
jobExecutionTimes: make([]time.Duration, 0),
jobSchedulingDelays: make([]time.Duration, 0),
concurrencyLimitCalls: make([]string, 0),
jobFailedCalls: struct {
jobs []Job
errs []error
}{
jobs: make([]Job, 0),
errs: make([]error, 0),
},
}
}
func (t *testSchedulerMonitor) SchedulerStarted() {
t.mu.Lock()
defer t.mu.Unlock()
atomic.AddInt64(&t.startedCount, 1)
t.startedCalls = append(t.startedCalls, time.Now())
}
func (t *testSchedulerMonitor) SchedulerStopped() {
t.mu.Lock()
defer t.mu.Unlock()
atomic.AddInt64(&t.stoppedCount, 1)
t.stoppedCalls = append(t.stoppedCalls, time.Now())
}
func (t *testSchedulerMonitor) SchedulerShutdown() {
t.mu.Lock()
defer t.mu.Unlock()
atomic.AddInt64(&t.shutdownCount, 1)
t.shutdownCalls = append(t.shutdownCalls, time.Now())
}
func (t *testSchedulerMonitor) getStartedCount() int64 {
return atomic.LoadInt64(&t.startedCount)
}
func (t *testSchedulerMonitor) getShutdownCount() int64 {
return atomic.LoadInt64(&t.shutdownCount)
}
func (t *testSchedulerMonitor) getStartedCalls() []time.Time {
t.mu.RLock()
defer t.mu.RUnlock()
return append([]time.Time{}, t.startedCalls...)
}
func (t *testSchedulerMonitor) getShutdownCalls() []time.Time {
t.mu.RLock()
defer t.mu.RUnlock()
return append([]time.Time{}, t.shutdownCalls...)
}
func (t *testSchedulerMonitor) JobRegistered(job Job) {
t.mu.Lock()
defer t.mu.Unlock()
atomic.AddInt64(&t.jobRegCount, 1)
t.jobRegCalls = append(t.jobRegCalls, job)
}
func (t *testSchedulerMonitor) JobUnregistered(job Job) {
t.mu.Lock()
defer t.mu.Unlock()
atomic.AddInt64(&t.jobUnregCount, 1)
t.jobUnregCalls = append(t.jobUnregCalls, job)
}
func (t *testSchedulerMonitor) JobStarted(job Job) {
t.mu.Lock()
defer t.mu.Unlock()
atomic.AddInt64(&t.jobStartCount, 1)
t.jobStartCalls = append(t.jobStartCalls, job)
}
func (t *testSchedulerMonitor) JobRunning(job Job) {
t.mu.Lock()
defer t.mu.Unlock()
atomic.AddInt64(&t.jobRunningCount, 1)
t.jobRunningCalls = append(t.jobRunningCalls, job)
}
func (t *testSchedulerMonitor) JobCompleted(job Job) {
t.mu.Lock()
defer t.mu.Unlock()
atomic.AddInt64(&t.jobCompletedCount, 1)
t.jobCompletedCalls = append(t.jobCompletedCalls, job)
}
func (t *testSchedulerMonitor) JobFailed(job Job, err error) {
t.mu.Lock()
defer t.mu.Unlock()
atomic.AddInt64(&t.jobFailedCount, 1)
t.jobFailedCalls.jobs = append(t.jobFailedCalls.jobs, job)
t.jobFailedCalls.errs = append(t.jobFailedCalls.errs, err)
}
func (t *testSchedulerMonitor) JobExecutionTime(_ Job, duration time.Duration) {
t.mu.Lock()
defer t.mu.Unlock()
t.jobExecutionTimes = append(t.jobExecutionTimes, duration)
}
func (t *testSchedulerMonitor) JobSchedulingDelay(_ Job, scheduledTime time.Time, actualStartTime time.Time) {
t.mu.Lock()
defer t.mu.Unlock()
delay := actualStartTime.Sub(scheduledTime)
if delay > 0 {
t.jobSchedulingDelays = append(t.jobSchedulingDelays, delay)
}
}
func (t *testSchedulerMonitor) ConcurrencyLimitReached(limitType string, _ Job) {
t.mu.Lock()
defer t.mu.Unlock()
atomic.AddInt64(&t.concurrencyLimitCount, 1)
t.concurrencyLimitCalls = append(t.concurrencyLimitCalls, limitType)
}
func (t *testSchedulerMonitor) getJobRegCount() int64 {
return atomic.LoadInt64(&t.jobRegCount)
}
func (t *testSchedulerMonitor) getJobUnregCount() int64 {
return atomic.LoadInt64(&t.jobUnregCount)
}
func (t *testSchedulerMonitor) getJobStartCount() int64 {
return atomic.LoadInt64(&t.jobStartCount)
}
func (t *testSchedulerMonitor) getJobRunningCount() int64 {
return atomic.LoadInt64(&t.jobRunningCount)
}
func (t *testSchedulerMonitor) getJobCompletedCount() int64 {
return atomic.LoadInt64(&t.jobCompletedCount)
}
func (t *testSchedulerMonitor) getJobFailedCount() int64 {
return atomic.LoadInt64(&t.jobFailedCount)
}
// func (t *testSchedulerMonitor) getJobRegCalls() []Job {
// t.mu.RLock()
// defer t.mu.RUnlock()
// return append([]Job{}, t.jobRegCalls...)
// }
// func (t *testSchedulerMonitor) getJobUnregCalls() []Job {
// t.mu.RLock()
// defer t.mu.RUnlock()
// return append([]Job{}, t.jobUnregCalls...)
// }
// func (t *testSchedulerMonitor) getJobStartCalls() []Job {
// t.mu.RLock()
// defer t.mu.RUnlock()
// return append([]Job{}, t.jobStartCalls...)
// }
// func (t *testSchedulerMonitor) getJobRunningCalls() []Job {
// t.mu.RLock()
// defer t.mu.RUnlock()
// return append([]Job{}, t.jobRunningCalls...)
// }
// func (t *testSchedulerMonitor) getJobCompletedCalls() []Job {
// t.mu.RLock()
// defer t.mu.RUnlock()
// return append([]Job{}, t.jobCompletedCalls...)
// }
func (t *testSchedulerMonitor) getJobFailedCalls() ([]Job, []error) {
t.mu.RLock()
defer t.mu.RUnlock()
jobs := append([]Job{}, t.jobFailedCalls.jobs...)
errs := append([]error{}, t.jobFailedCalls.errs...)
return jobs, errs
}
func TestSchedulerMonitor_Basic(t *testing.T) {
defer verifyNoGoroutineLeaks(t)
monitor := newTestSchedulerMonitor()
s := newTestScheduler(t, WithSchedulerMonitor(monitor))
// Before starting, monitor should not have been called
assert.Equal(t, int64(0), monitor.getStartedCount())
assert.Equal(t, int64(0), monitor.getShutdownCount())
// Add a simple job
_, err := s.NewJob(
DurationJob(time.Second),
NewTask(func() {}),
)
require.NoError(t, err)
// Start the scheduler
s.Start()
// Wait a bit for the start to complete
time.Sleep(50 * time.Millisecond)
// SchedulerStarted should have been called once
assert.Equal(t, int64(1), monitor.getStartedCount())
assert.Equal(t, int64(0), monitor.getShutdownCount())
// Shutdown the scheduler
err = s.Shutdown()
require.NoError(t, err)
// SchedulerShutdown should have been called once
assert.Equal(t, int64(1), monitor.getStartedCount())
assert.Equal(t, int64(1), monitor.getShutdownCount())
// Verify the order of calls
startedCalls := monitor.getStartedCalls()
shutdownCalls := monitor.getShutdownCalls()
require.Len(t, startedCalls, 1)
require.Len(t, shutdownCalls, 1)
assert.True(t, startedCalls[0].Before(shutdownCalls[0]),
"SchedulerStarted should be called before SchedulerShutdown")
}
func TestSchedulerMonitor_MultipleStartStop(t *testing.T) {
defer verifyNoGoroutineLeaks(t)
monitor := newTestSchedulerMonitor()
s := newTestScheduler(t, WithSchedulerMonitor(monitor))
_, err := s.NewJob(
DurationJob(time.Second),
NewTask(func() {}),
)
require.NoError(t, err)
// Start and stop multiple times
s.Start()
time.Sleep(50 * time.Millisecond)
assert.Equal(t, int64(1), monitor.getStartedCount())
err = s.StopJobs()
require.NoError(t, err)
// StopJobs shouldn't call SchedulerShutdown
assert.Equal(t, int64(0), monitor.getShutdownCount())
// Start again
s.Start()
time.Sleep(50 * time.Millisecond)
assert.Equal(t, int64(2), monitor.getStartedCount())
// Final shutdown
err = s.Shutdown()
require.NoError(t, err)
assert.Equal(t, int64(1), monitor.getShutdownCount())
}
func TestSchedulerMonitor_WithoutMonitor(t *testing.T) {
defer verifyNoGoroutineLeaks(t)
// Create scheduler without monitor - should not panic
s := newTestScheduler(t)
_, err := s.NewJob(
DurationJob(time.Second),
NewTask(func() {}),
)
require.NoError(t, err)
s.Start()
time.Sleep(50 * time.Millisecond)
err = s.Shutdown()
require.NoError(t, err)
}
func TestSchedulerMonitor_NilMonitor(t *testing.T) {
// Attempting to create a scheduler with nil monitor should error
_, err := NewScheduler(WithSchedulerMonitor(nil))
assert.Error(t, err)
assert.Equal(t, ErrSchedulerMonitorNil, err)
}
func TestSchedulerMonitor_ConcurrentAccess(t *testing.T) {
defer verifyNoGoroutineLeaks(t)
monitor := newTestSchedulerMonitor()
s := newTestScheduler(t, WithSchedulerMonitor(monitor))
// Add multiple jobs
for i := 0; i < 10; i++ {
_, err := s.NewJob(
DurationJob(100*time.Millisecond),
NewTask(func() {}),
)
require.NoError(t, err)
}
// Start scheduler once (normal use case)
s.Start()
time.Sleep(150 * time.Millisecond)
// Verify monitor was called
assert.Equal(t, int64(1), monitor.getStartedCount())
err := s.Shutdown()
require.NoError(t, err)
// Monitor should be called for shutdown
assert.Equal(t, int64(1), monitor.getShutdownCount())
}
func TestSchedulerMonitor_StartWithoutJobs(t *testing.T) {
defer verifyNoGoroutineLeaks(t)
monitor := newTestSchedulerMonitor()
s := newTestScheduler(t, WithSchedulerMonitor(monitor))
// Start scheduler without any jobs
s.Start()
time.Sleep(50 * time.Millisecond)
// Monitor should still be called
assert.Equal(t, int64(1), monitor.getStartedCount())
err := s.Shutdown()
require.NoError(t, err)
assert.Equal(t, int64(1), monitor.getShutdownCount())
}
func TestSchedulerMonitor_ShutdownWithoutStart(t *testing.T) {
defer verifyNoGoroutineLeaks(t)
monitor := newTestSchedulerMonitor()
s := newTestScheduler(t, WithSchedulerMonitor(monitor))
_, err := s.NewJob(
DurationJob(time.Second),
NewTask(func() {}),
)
require.NoError(t, err)
// Shutdown without starting
err = s.Shutdown()
require.NoError(t, err)
// SchedulerStarted should not be called
assert.Equal(t, int64(0), monitor.getStartedCount())
// SchedulerShutdown should not be called if scheduler was never started
assert.Equal(t, int64(0), monitor.getShutdownCount())
}
func TestSchedulerMonitor_ThreadSafety(t *testing.T) {
defer verifyNoGoroutineLeaks(t)
monitor := newTestSchedulerMonitor()
// Simulate concurrent calls to the monitor from multiple goroutines
var wg sync.WaitGroup
iterations := 100
for i := 0; i < iterations; i++ {
wg.Add(2)
go func() {
defer wg.Done()
monitor.SchedulerStarted()
}()
go func() {
defer wg.Done()
monitor.SchedulerShutdown()
}()
}
wg.Wait()
// Verify all calls were recorded
assert.Equal(t, int64(iterations), monitor.getStartedCount())
assert.Equal(t, int64(iterations), monitor.getShutdownCount())
assert.Len(t, monitor.getStartedCalls(), iterations)
assert.Len(t, monitor.getShutdownCalls(), iterations)
}
func TestSchedulerMonitor_IntegrationWithJobs(t *testing.T) {
defer verifyNoGoroutineLeaks(t)
monitor := newTestSchedulerMonitor()
s := newTestScheduler(t, WithSchedulerMonitor(monitor))
// Test successful job
jobRunCount := atomic.Int32{}
j, err := s.NewJob(
DurationJob(50*time.Millisecond),
NewTask(func() {
jobRunCount.Add(1)
}),
WithStartAt(WithStartImmediately()),
)
require.NoError(t, err)
// Test failing job
_, err = s.NewJob(
DurationJob(50*time.Millisecond),
NewTask(func() error {
return fmt.Errorf("test error")
}),
WithStartAt(WithStartImmediately()),
)
require.NoError(t, err)
// Start scheduler
s.Start()
time.Sleep(150 * time.Millisecond) // Wait for jobs to execute
// Verify scheduler lifecycle events
assert.Equal(t, int64(1), monitor.getStartedCount())
assert.GreaterOrEqual(t, jobRunCount.Load(), int32(1))
// Verify job registration
assert.Equal(t, int64(2), monitor.getJobRegCount(), "Should have registered 2 jobs")
// Verify job execution events
assert.GreaterOrEqual(t, monitor.getJobStartCount(), int64(1), "Jobs should have started")
assert.GreaterOrEqual(t, monitor.getJobRunningCount(), int64(1), "Jobs should be running")
assert.GreaterOrEqual(t, monitor.getJobCompletedCount(), int64(1), "Successful job should complete")
assert.GreaterOrEqual(t, monitor.getJobFailedCount(), int64(1), "Failing job should fail")
// Get failed job details
failedJobs, errors := monitor.getJobFailedCalls()
assert.NotEmpty(t, failedJobs, "Should have recorded failed jobs")
assert.NotEmpty(t, errors, "Should have recorded job errors")
assert.Contains(t, errors[0].Error(), "test error", "Should record the correct error")
// Test unregistration
err = s.RemoveJob(j.ID())
require.NoError(t, err)
time.Sleep(50 * time.Millisecond) // Wait for async removal
assert.Equal(t, int64(1), monitor.getJobUnregCount(), "Should have unregistered 1 job")
// Shutdown
err = s.Shutdown()
require.NoError(t, err)
assert.Equal(t, int64(1), monitor.getShutdownCount())
}