feat: add remaining monitor metrics & events (JobRegistered/JobUnregistered, JobStarted/JobRunning/JobFailed/JobCompleted)

This commit is contained in:
iyashjayesh 2025-11-11 19:16:34 +05:30
parent ddfeae61ed
commit 345ad8ad3a
9 changed files with 728 additions and 145 deletions

View File

@ -48,6 +48,7 @@ var (
ErrWithDistributedLockerNil = errors.New("gocron: WithDistributedLocker: locker must not be nil") ErrWithDistributedLockerNil = errors.New("gocron: WithDistributedLocker: locker must not be nil")
ErrWithDistributedJobLockerNil = errors.New("gocron: WithDistributedJobLocker: locker must not be nil") ErrWithDistributedJobLockerNil = errors.New("gocron: WithDistributedJobLocker: locker must not be nil")
ErrWithIdentifierNil = errors.New("gocron: WithIdentifier: identifier must not be nil") ErrWithIdentifierNil = errors.New("gocron: WithIdentifier: identifier must not be nil")
ErrSchedulerMonitorNil = errors.New("gocron: WithSchedulerMonitor: monitor must not be nil")
ErrWithLimitConcurrentJobsZero = errors.New("gocron: WithLimitConcurrentJobs: limit must be greater than 0") ErrWithLimitConcurrentJobsZero = errors.New("gocron: WithLimitConcurrentJobs: limit must be greater than 0")
ErrWithLocationNil = errors.New("gocron: WithLocation: location must not be nil") ErrWithLocationNil = errors.New("gocron: WithLocation: location must not be nil")
ErrWithLoggerNil = errors.New("gocron: WithLogger: logger must not be nil") ErrWithLoggerNil = errors.New("gocron: WithLogger: logger must not be nil")
@ -59,7 +60,7 @@ var (
ErrStartTimeLaterThanEndTime = errors.New("gocron: WithStartDateTime: start must not be later than end") ErrStartTimeLaterThanEndTime = errors.New("gocron: WithStartDateTime: start must not be later than end")
ErrStopTimeEarlierThanStartTime = errors.New("gocron: WithStopDateTime: end must not be earlier than start") ErrStopTimeEarlierThanStartTime = errors.New("gocron: WithStopDateTime: end must not be earlier than start")
ErrWithStopTimeoutZeroOrNegative = errors.New("gocron: WithStopTimeout: timeout must be greater than 0") ErrWithStopTimeoutZeroOrNegative = errors.New("gocron: WithStopTimeout: timeout must be greater than 0")
ErrWithSchedulerMonitorNil = errors.New("gocron: WithSchedulerMonitor: scheduler monitor cannot be nil") ErrWithSchedulerMonitorNil = errors.New("gocron: WithSchedulerMonitor: scheduler monitor cannot be nil")
) )
// internal errors // internal errors

View File

@ -56,6 +56,8 @@ type executor struct {
monitor Monitor monitor Monitor
// monitorStatus for reporting metrics // monitorStatus for reporting metrics
monitorStatus MonitorStatus monitorStatus MonitorStatus
// reference to parent scheduler for lifecycle notifications
scheduler *scheduler
} }
type jobIn struct { type jobIn struct {
@ -416,18 +418,36 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
_ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name) _ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name)
// Notify job started
if e.scheduler != nil {
out := e.scheduler.jobFromInternalJob(j)
var jobIface Job = out
e.scheduler.notifyJobStarted(&jobIface)
}
err := callJobFuncWithParams(j.beforeJobRunsSkipIfBeforeFuncErrors, j.id, j.name) err := callJobFuncWithParams(j.beforeJobRunsSkipIfBeforeFuncErrors, j.id, j.name)
if err != nil { if err != nil {
e.sendOutForRescheduling(&jIn) e.sendOutForRescheduling(&jIn)
select { select {
case e.jobsOutCompleted <- j.id: case e.jobsOutCompleted <- j.id:
case <-e.ctx.Done(): case <-e.ctx.Done():
} }
// Notify job failed (before actual run)
if e.scheduler != nil {
out := e.scheduler.jobFromInternalJob(j)
var jobIface Job = out
e.scheduler.notifyJobFailed(&jobIface, err)
}
return return
} }
// Notify job running
if e.scheduler != nil {
out := e.scheduler.jobFromInternalJob(j)
var jobIface Job = out
e.scheduler.notifyJobRunning(&jobIface)
}
// For intervalFromCompletion, we need to reschedule AFTER the job completes, // For intervalFromCompletion, we need to reschedule AFTER the job completes,
// not before. For regular jobs, we reschedule before execution (existing behavior). // not before. For regular jobs, we reschedule before execution (existing behavior).
if !j.intervalFromCompletion { if !j.intervalFromCompletion {
@ -449,10 +469,22 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
_ = callJobFuncWithParams(j.afterJobRunsWithError, j.id, j.name, err) _ = callJobFuncWithParams(j.afterJobRunsWithError, j.id, j.name, err)
e.incrementJobCounter(j, Fail) e.incrementJobCounter(j, Fail)
e.recordJobTimingWithStatus(startTime, time.Now(), j, Fail, err) e.recordJobTimingWithStatus(startTime, time.Now(), j, Fail, err)
// Notify job failed
if e.scheduler != nil {
out := e.scheduler.jobFromInternalJob(j)
var jobIface Job = out
e.scheduler.notifyJobFailed(&jobIface, err)
}
} else { } else {
_ = callJobFuncWithParams(j.afterJobRuns, j.id, j.name) _ = callJobFuncWithParams(j.afterJobRuns, j.id, j.name)
e.incrementJobCounter(j, Success) e.incrementJobCounter(j, Success)
e.recordJobTimingWithStatus(startTime, time.Now(), j, Success, nil) e.recordJobTimingWithStatus(startTime, time.Now(), j, Success, nil)
// Notify job completed
if e.scheduler != nil {
out := e.scheduler.jobFromInternalJob(j)
var jobIface Job = out
e.scheduler.notifyJobCompleted(&jobIface)
}
} }
// For intervalFromCompletion, reschedule AFTER the job completes // For intervalFromCompletion, reschedule AFTER the job completes

2
go.mod
View File

@ -1,6 +1,6 @@
module github.com/go-co-op/gocron/v2 module github.com/go-co-op/gocron/v2
go 1.24.0 go 1.21.4
require ( require (
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0

View File

@ -0,0 +1,150 @@
package main
import (
	"fmt"
	"sync"
	"time"

	"github.com/go-co-op/gocron/v2"
)
// DebugMonitor is a SchedulerMonitor implementation for this example: it
// counts every lifecycle callback and prints a line each time one fires.
//
// The scheduler invokes the Job* callbacks from job goroutines, so two
// overlapping jobs can call into the monitor concurrently. A mutex guards
// the counters; the previous unsynchronized `count++` was a data race
// (visible under `go run -race`).
type DebugMonitor struct {
	mu              sync.Mutex // guards every counter below
	startCount      int
	stopCount       int
	jobRegCount     int
	jobUnregCount   int
	jobStartCount   int
	jobRunningCount int
	jobCompletCount int
	jobFailCount    int
}

// SchedulerStarted records one Start() callback.
func (m *DebugMonitor) SchedulerStarted() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.startCount++
	fmt.Printf("✓ SchedulerStarted() called (total: %d)\n", m.startCount)
}

// SchedulerShutdown records one Shutdown() callback.
func (m *DebugMonitor) SchedulerShutdown() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.stopCount++
	fmt.Printf("✓ SchedulerShutdown() called (total: %d)\n", m.stopCount)
}

// JobRegistered records a job being added to the scheduler.
func (m *DebugMonitor) JobRegistered(job *gocron.Job) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.jobRegCount++
	fmt.Printf("✓ JobRegistered() called (total: %d) - Job ID: %s\n", m.jobRegCount, (*job).ID())
}

// JobUnregistered records a job being removed from the scheduler.
func (m *DebugMonitor) JobUnregistered(job *gocron.Job) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.jobUnregCount++
	fmt.Printf("✓ JobUnregistered() called (total: %d) - Job ID: %s\n", m.jobUnregCount, (*job).ID())
}

// JobStarted records a job beginning execution.
func (m *DebugMonitor) JobStarted(job *gocron.Job) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.jobStartCount++
	fmt.Printf("✓ JobStarted() called (total: %d) - Job ID: %s\n", m.jobStartCount, (*job).ID())
}

// JobRunning records a job entering its running phase.
func (m *DebugMonitor) JobRunning(job *gocron.Job) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.jobRunningCount++
	fmt.Printf("✓ JobRunning() called (total: %d) - Job ID: %s\n", m.jobRunningCount, (*job).ID())
}

// JobCompleted records a job finishing without error.
func (m *DebugMonitor) JobCompleted(job *gocron.Job) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.jobCompletCount++
	fmt.Printf("✓ JobCompleted() called (total: %d) - Job ID: %s\n", m.jobCompletCount, (*job).ID())
}

// JobFailed records a job finishing with an error.
func (m *DebugMonitor) JobFailed(job *gocron.Job, err error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.jobFailCount++
	fmt.Printf("✓ JobFailed() called (total: %d) - Job ID: %s, Error: %v\n", m.jobFailCount, (*job).ID(), err)
}
// main exercises a single DebugMonitor across two independent scheduler
// instances to show that every lifecycle callback fires, then prints a
// summary of the accumulated counts.
func main() {
	// ONE monitor, multiple scheduler instances
	mon := &DebugMonitor{}

	fmt.Println("=== Cycle 1 (Scheduler Instance 1) ===")
	first, err := gocron.NewScheduler(
		gocron.WithSchedulerMonitor(mon),
	)
	if err != nil {
		panic(err)
	}

	// Register two jobs: one that succeeds and one that always errors,
	// so both the JobCompleted and JobFailed paths are exercised.
	fmt.Println("Creating jobs...")
	if _, err = first.NewJob(
		gocron.DurationJob(1*time.Second),
		gocron.NewTask(func() { fmt.Println("Job 1 running") }),
	); err != nil {
		panic(err)
	}
	if _, err = first.NewJob(
		gocron.DurationJob(2*time.Second),
		gocron.NewTask(func() error {
			fmt.Println("Job 2 executing and returning error")
			return fmt.Errorf("simulated job failure")
		}), // This job will fail with error
	); err != nil {
		panic(err)
	}

	fmt.Println("Calling Start()...")
	first.Start()
	time.Sleep(3 * time.Second) // Wait for jobs to execute

	fmt.Println("Calling Shutdown()...")
	if err = first.Shutdown(); err != nil {
		fmt.Printf("Shutdown error: %v\n", err)
	}

	fmt.Println("\n=== Cycle 2 (Job Updates) ===")
	second, err := gocron.NewScheduler(
		gocron.WithSchedulerMonitor(mon),
	)
	if err != nil {
		panic(err)
	}

	fmt.Println("Creating and updating jobs...")
	tracked, err := second.NewJob(
		gocron.DurationJob(1*time.Second),
		gocron.NewTask(func() { fmt.Println("Job 3 running") }),
	)
	if err != nil {
		panic(err)
	}
	// Update the job
	if _, err = second.Update(
		tracked.ID(),
		gocron.DurationJob(2*time.Second),
		gocron.NewTask(func() { fmt.Println("Job 3 updated") }),
	); err != nil {
		panic(err)
	}

	fmt.Println("Calling Start()...")
	second.Start()
	time.Sleep(3 * time.Second)

	fmt.Println("Calling Shutdown()...")
	if err = second.Shutdown(); err != nil {
		fmt.Printf("Shutdown error: %v\n", err)
	}

	fmt.Println("\n=== Summary ===")
	fmt.Printf("Total Scheduler Starts: %d\n", mon.startCount)
	fmt.Printf("Total Scheduler Stops: %d\n", mon.stopCount)
	fmt.Printf("Total Jobs Registered: %d\n", mon.jobRegCount)
	fmt.Printf("Total Jobs Unregistered: %d\n", mon.jobUnregCount)
	fmt.Printf("Total Jobs Started: %d\n", mon.jobStartCount)
	fmt.Printf("Total Jobs Running: %d\n", mon.jobRunningCount)
	fmt.Printf("Total Jobs Completed: %d\n", mon.jobCompletCount)
	fmt.Printf("Total Jobs Failed: %d\n", mon.jobFailCount)
}

View File

@ -0,0 +1,13 @@
module test
go 1.21.4
require github.com/go-co-op/gocron/v2 v2.17.0
require (
github.com/google/uuid v1.6.0 // indirect
github.com/jonboulle/clockwork v0.5.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
)
replace github.com/go-co-op/gocron/v2 => ../

View File

@ -0,0 +1,16 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jonboulle/clockwork v0.5.0 h1:Hyh9A8u51kptdkR+cqRpT1EebBwTn1oK9YfGYbdFz6I=
github.com/jonboulle/clockwork v0.5.0/go.mod h1:3mZlmanh0g2NDKO5TWZVJAfofYk64M7XN3SzBPjZF60=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -7,7 +7,6 @@ import (
"runtime" "runtime"
"slices" "slices"
"strings" "strings"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -104,8 +103,6 @@ type scheduler struct {
// scheduler monitor from which metrics can be collected // scheduler monitor from which metrics can be collected
schedulerMonitor SchedulerMonitor schedulerMonitor SchedulerMonitor
// mutex to protect access to the scheduler monitor
schedulerMonitorMu sync.RWMutex
} }
type newJobIn struct { type newJobIn struct {
@ -154,7 +151,6 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
s := &scheduler{ s := &scheduler{
shutdownCtx: schCtx, shutdownCtx: schCtx,
shutdownCancel: cancel, shutdownCancel: cancel,
exec: exec,
jobs: make(map[uuid.UUID]internalJob), jobs: make(map[uuid.UUID]internalJob),
location: time.Local, location: time.Local,
logger: &noOpLogger{}, logger: &noOpLogger{},
@ -170,6 +166,8 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
runJobRequestCh: make(chan runJobRequest), runJobRequestCh: make(chan runJobRequest),
allJobsOutRequest: make(chan allJobsOutRequest), allJobsOutRequest: make(chan allJobsOutRequest),
} }
exec.scheduler = s
s.exec = exec
for _, option := range options { for _, option := range options {
err := option(s) err := option(s)
@ -329,6 +327,11 @@ func (s *scheduler) selectRemoveJob(id uuid.UUID) {
if !ok { if !ok {
return return
} }
if s.schedulerMonitor != nil {
out := s.jobFromInternalJob(j)
var job Job = &out
s.notifyJobUnregistered(&job)
}
j.stop() j.stop()
delete(s.jobs, id) delete(s.jobs, id)
} }
@ -537,6 +540,11 @@ func (s *scheduler) selectRemoveJobsByTags(tags []string) {
for _, j := range s.jobs { for _, j := range s.jobs {
for _, tag := range tags { for _, tag := range tags {
if slices.Contains(j.tags, tag) { if slices.Contains(j.tags, tag) {
if s.schedulerMonitor != nil {
out := s.jobFromInternalJob(j)
var job Job = &out
s.notifyJobUnregistered(&job)
}
j.stop() j.stop()
delete(s.jobs, j.id) delete(s.jobs, j.id)
break break
@ -697,7 +705,7 @@ func (s *scheduler) verifyParameterType(taskFunc reflect.Value, tsk task) error
return s.verifyNonVariadic(taskFunc, tsk, expectedParameterLength) return s.verifyNonVariadic(taskFunc, tsk, expectedParameterLength)
} }
var contextType = reflect.TypeFor[context.Context]() var contextType = reflect.TypeOf((*context.Context)(nil)).Elem()
func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskWrapper Task, options []JobOption) (Job, error) { func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskWrapper Task, options []JobOption) (Job, error) {
j := internalJob{} j := internalJob{}
@ -795,6 +803,10 @@ func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskW
} }
out := s.jobFromInternalJob(j) out := s.jobFromInternalJob(j)
if s.schedulerMonitor != nil {
var job Job = out
s.notifyJobRegistered(&job)
}
return &out, nil return &out, nil
} }
@ -862,7 +874,7 @@ func (s *scheduler) Shutdown() error {
t.Stop() t.Stop()
// notify monitor that scheduler stopped // notify monitor that scheduler stopped
s.notifySchedulerStopped() s.notifySchedulerShutdown()
return err return err
case <-t.C: case <-t.C:
return ErrStopSchedulerTimedOut return ErrStopSchedulerTimedOut
@ -1075,8 +1087,6 @@ func WithSchedulerMonitor(monitor SchedulerMonitor) SchedulerOption {
if monitor == nil { if monitor == nil {
return ErrSchedulerMonitorNil return ErrSchedulerMonitorNil
} }
s.schedulerMonitorMu.Lock()
defer s.schedulerMonitorMu.Unlock()
s.schedulerMonitor = monitor s.schedulerMonitor = monitor
return nil return nil
} }
@ -1084,18 +1094,56 @@ func WithSchedulerMonitor(monitor SchedulerMonitor) SchedulerOption {
// notifySchedulerStarted notifies the monitor that scheduler has started // notifySchedulerStarted notifies the monitor that scheduler has started
func (s *scheduler) notifySchedulerStarted() { func (s *scheduler) notifySchedulerStarted() {
s.schedulerMonitorMu.RLock()
defer s.schedulerMonitorMu.RUnlock()
if s.schedulerMonitor != nil { if s.schedulerMonitor != nil {
s.schedulerMonitor.SchedulerStarted() s.schedulerMonitor.SchedulerStarted()
} }
} }
// notifySchedulerStopped notifies the monitor that scheduler has stopped // notifySchedulerShutdown notifies the monitor that scheduler has stopped
func (s *scheduler) notifySchedulerStopped() { func (s *scheduler) notifySchedulerShutdown() {
s.schedulerMonitorMu.RLock()
defer s.schedulerMonitorMu.RUnlock()
if s.schedulerMonitor != nil { if s.schedulerMonitor != nil {
s.schedulerMonitor.SchedulerStopped() s.schedulerMonitor.SchedulerShutdown()
}
}
// notifyJobRegistered forwards a job-registration event to the configured
// monitor; it is a no-op when no monitor was supplied.
func (s *scheduler) notifyJobRegistered(job *Job) {
	monitor := s.schedulerMonitor
	if monitor == nil {
		return
	}
	monitor.JobRegistered(job)
}
// notifyJobUnregistered forwards a job-removal event to the configured
// monitor; it is a no-op when no monitor was supplied.
func (s *scheduler) notifyJobUnregistered(job *Job) {
	monitor := s.schedulerMonitor
	if monitor == nil {
		return
	}
	monitor.JobUnregistered(job)
}
// notifyJobStarted forwards a job-started event to the configured
// monitor; it is a no-op when no monitor was supplied.
func (s *scheduler) notifyJobStarted(job *Job) {
	monitor := s.schedulerMonitor
	if monitor == nil {
		return
	}
	monitor.JobStarted(job)
}
// notifyJobRunning forwards a job-running event to the configured
// monitor; it is a no-op when no monitor was supplied.
func (s *scheduler) notifyJobRunning(job *Job) {
	monitor := s.schedulerMonitor
	if monitor == nil {
		return
	}
	monitor.JobRunning(job)
}
// notifyJobCompleted forwards a job-completed event to the configured
// monitor; it is a no-op when no monitor was supplied.
func (s *scheduler) notifyJobCompleted(job *Job) {
	monitor := s.schedulerMonitor
	if monitor == nil {
		return
	}
	monitor.JobCompleted(job)
}
// notifyJobFailed notifies the monitor that a job has failed.
func (s *scheduler) notifyJobFailed(job *Job, err error) {
if s.schedulerMonitor != nil {
s.schedulerMonitor.JobFailed(job, err)
} }
} }

View File

@ -6,6 +6,24 @@ type SchedulerMonitor interface {
// SchedulerStarted is called when Start() is invoked on the scheduler. // SchedulerStarted is called when Start() is invoked on the scheduler.
SchedulerStarted() SchedulerStarted()
// SchedulerStopped is called when Shutdown() completes successfully. // SchedulerShutdown is called when Shutdown() completes successfully.
SchedulerStopped() SchedulerShutdown()
// JobRegistered is called when a job is registered with the scheduler.
JobRegistered(job *Job)
// JobUnregistered is called when a job is unregistered from the scheduler.
JobUnregistered(job *Job)
// JobStarted is called when a job starts running.
JobStarted(job *Job)
// JobRunning is called when a job is running.
JobRunning(job *Job)
// JobFailed is called when a job fails to complete successfully.
JobFailed(job *Job, err error)
// JobCompleted is called when a job has completed running.
JobCompleted(job *Job)
} }

View File

@ -1,7 +1,9 @@
package gocron package gocron
import ( import (
"fmt"
"sync" "sync"
"sync/atomic"
"testing" "testing"
"time" "time"
@ -9,140 +11,443 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
// mockSchedulerMonitor is a test implementation // testSchedulerMonitor is a test implementation of SchedulerMonitor
type mockSchedulerMonitor struct { // that tracks scheduler lifecycle events
mu sync.Mutex type testSchedulerMonitor struct {
schedulerStartedCount int mu sync.RWMutex
schedulerStoppedCount int startedCount int64
shutdownCount int64
jobRegCount int64
jobUnregCount int64
jobStartCount int64
jobRunningCount int64
jobCompletedCount int64
jobFailedCount int64
startedCalls []time.Time
shutdownCalls []time.Time
jobRegCalls []Job
jobUnregCalls []Job
jobStartCalls []Job
jobRunningCalls []Job
jobCompletedCalls []Job
jobFailedCalls struct {
jobs []Job
errs []error
}
} }
// SchedulerStarted increments the started count func newTestSchedulerMonitor() *testSchedulerMonitor {
func (m *mockSchedulerMonitor) SchedulerStarted() { return &testSchedulerMonitor{
m.mu.Lock() startedCalls: make([]time.Time, 0),
defer m.mu.Unlock() shutdownCalls: make([]time.Time, 0),
m.schedulerStartedCount++ jobRegCalls: make([]Job, 0),
} jobUnregCalls: make([]Job, 0),
jobStartCalls: make([]Job, 0),
// SchedulerStopped increments the stopped count jobRunningCalls: make([]Job, 0),
func (m *mockSchedulerMonitor) SchedulerStopped() { jobCompletedCalls: make([]Job, 0),
m.mu.Lock() jobFailedCalls: struct {
defer m.mu.Unlock() jobs []Job
m.schedulerStoppedCount++ errs []error
} }{
jobs: make([]Job, 0),
// getSchedulerStartedCount returns the count of SchedulerStarted calls errs: make([]error, 0),
func (m *mockSchedulerMonitor) getSchedulerStartedCount() int {
m.mu.Lock()
defer m.mu.Unlock()
return m.schedulerStartedCount
}
// getSchedulerStoppedCount returns the count of SchedulerStopped calls
func (m *mockSchedulerMonitor) getSchedulerStoppedCount() int {
m.mu.Lock()
defer m.mu.Unlock()
return m.schedulerStoppedCount
}
// Test that SchedulerStopped is called
func TestSchedulerMonitor_SchedulerStopped(t *testing.T) {
monitor := &mockSchedulerMonitor{}
s, err := NewScheduler(WithSchedulerMonitor(monitor))
require.NoError(t, err)
s.Start()
time.Sleep(10 * time.Millisecond)
err = s.Shutdown()
require.NoError(t, err)
// verify SchedulerStopped was called exactly once
assert.Equal(t, 1, monitor.getSchedulerStoppedCount())
}
// Test multiple start/stop cycles
func TestSchedulerMonitor_MultipleStartStopCycles(t *testing.T) {
monitor := &mockSchedulerMonitor{}
s, err := NewScheduler(WithSchedulerMonitor(monitor))
require.NoError(t, err)
// first cycle
s.Start()
time.Sleep(10 * time.Millisecond)
err = s.Shutdown()
require.NoError(t, err)
// second cycle
s.Start()
time.Sleep(10 * time.Millisecond)
err = s.Shutdown()
require.NoError(t, err)
// verifying counts
assert.Equal(t, 2, monitor.getSchedulerStartedCount())
assert.Equal(t, 2, monitor.getSchedulerStoppedCount())
}
// Test that start and stop are called in correct order
func TestSchedulerMonitor_StartStopOrder(t *testing.T) {
events := []string{}
var mu sync.Mutex
monitor := &orderTrackingMonitor{
onStart: func() {
mu.Lock()
events = append(events, "started")
mu.Unlock()
},
onStop: func() {
mu.Lock()
events = append(events, "stopped")
mu.Unlock()
}, },
} }
}
s, err := NewScheduler(WithSchedulerMonitor(monitor)) func (t *testSchedulerMonitor) SchedulerStarted() {
t.mu.Lock()
defer t.mu.Unlock()
atomic.AddInt64(&t.startedCount, 1)
t.startedCalls = append(t.startedCalls, time.Now())
}
func (t *testSchedulerMonitor) SchedulerShutdown() {
t.mu.Lock()
defer t.mu.Unlock()
atomic.AddInt64(&t.shutdownCount, 1)
t.shutdownCalls = append(t.shutdownCalls, time.Now())
}
func (t *testSchedulerMonitor) getStartedCount() int64 {
return atomic.LoadInt64(&t.startedCount)
}
func (t *testSchedulerMonitor) getShutdownCount() int64 {
return atomic.LoadInt64(&t.shutdownCount)
}
func (t *testSchedulerMonitor) getStartedCalls() []time.Time {
t.mu.RLock()
defer t.mu.RUnlock()
return append([]time.Time{}, t.startedCalls...)
}
func (t *testSchedulerMonitor) getShutdownCalls() []time.Time {
t.mu.RLock()
defer t.mu.RUnlock()
return append([]time.Time{}, t.shutdownCalls...)
}
func (t *testSchedulerMonitor) JobRegistered(job *Job) {
t.mu.Lock()
defer t.mu.Unlock()
atomic.AddInt64(&t.jobRegCount, 1)
t.jobRegCalls = append(t.jobRegCalls, *job)
}
func (t *testSchedulerMonitor) JobUnregistered(job *Job) {
t.mu.Lock()
defer t.mu.Unlock()
atomic.AddInt64(&t.jobUnregCount, 1)
t.jobUnregCalls = append(t.jobUnregCalls, *job)
}
func (t *testSchedulerMonitor) JobStarted(job *Job) {
t.mu.Lock()
defer t.mu.Unlock()
atomic.AddInt64(&t.jobStartCount, 1)
t.jobStartCalls = append(t.jobStartCalls, *job)
}
func (t *testSchedulerMonitor) JobRunning(job *Job) {
t.mu.Lock()
defer t.mu.Unlock()
atomic.AddInt64(&t.jobRunningCount, 1)
t.jobRunningCalls = append(t.jobRunningCalls, *job)
}
func (t *testSchedulerMonitor) JobCompleted(job *Job) {
t.mu.Lock()
defer t.mu.Unlock()
atomic.AddInt64(&t.jobCompletedCount, 1)
t.jobCompletedCalls = append(t.jobCompletedCalls, *job)
}
func (t *testSchedulerMonitor) JobFailed(job *Job, err error) {
t.mu.Lock()
defer t.mu.Unlock()
atomic.AddInt64(&t.jobFailedCount, 1)
t.jobFailedCalls.jobs = append(t.jobFailedCalls.jobs, *job)
t.jobFailedCalls.errs = append(t.jobFailedCalls.errs, err)
}
func (t *testSchedulerMonitor) getJobRegCount() int64 {
return atomic.LoadInt64(&t.jobRegCount)
}
// func (t *testSchedulerMonitor) getJobUnregCount() int64 {
// return atomic.LoadInt64(&t.jobUnregCount)
// }
func (t *testSchedulerMonitor) getJobStartCount() int64 {
return atomic.LoadInt64(&t.jobStartCount)
}
func (t *testSchedulerMonitor) getJobRunningCount() int64 {
return atomic.LoadInt64(&t.jobRunningCount)
}
func (t *testSchedulerMonitor) getJobCompletedCount() int64 {
return atomic.LoadInt64(&t.jobCompletedCount)
}
func (t *testSchedulerMonitor) getJobFailedCount() int64 {
return atomic.LoadInt64(&t.jobFailedCount)
}
// func (t *testSchedulerMonitor) getJobRegCalls() []Job {
// t.mu.RLock()
// defer t.mu.RUnlock()
// return append([]Job{}, t.jobRegCalls...)
// }
// func (t *testSchedulerMonitor) getJobUnregCalls() []Job {
// t.mu.RLock()
// defer t.mu.RUnlock()
// return append([]Job{}, t.jobUnregCalls...)
// }
// func (t *testSchedulerMonitor) getJobStartCalls() []Job {
// t.mu.RLock()
// defer t.mu.RUnlock()
// return append([]Job{}, t.jobStartCalls...)
// }
// func (t *testSchedulerMonitor) getJobRunningCalls() []Job {
// t.mu.RLock()
// defer t.mu.RUnlock()
// return append([]Job{}, t.jobRunningCalls...)
// }
// func (t *testSchedulerMonitor) getJobCompletedCalls() []Job {
// t.mu.RLock()
// defer t.mu.RUnlock()
// return append([]Job{}, t.jobCompletedCalls...)
// }
// getJobFailedCalls returns defensive copies of the jobs and errors that
// JobFailed recorded, so callers can inspect them without holding the lock.
func (t *testSchedulerMonitor) getJobFailedCalls() ([]Job, []error) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	jobs := make([]Job, len(t.jobFailedCalls.jobs))
	copy(jobs, t.jobFailedCalls.jobs)
	errs := make([]error, len(t.jobFailedCalls.errs))
	copy(errs, t.jobFailedCalls.errs)
	return jobs, errs
}
// TestSchedulerMonitor_Basic verifies the scheduler lifecycle callbacks:
// SchedulerStarted fires exactly once on Start, SchedulerShutdown exactly
// once on Shutdown, and the recorded start timestamp precedes the shutdown
// timestamp.
// NOTE(review): relies on a 50ms sleep for Start to complete — assumed
// sufficient on CI; confirm this is not flaky.
func TestSchedulerMonitor_Basic(t *testing.T) {
	defer verifyNoGoroutineLeaks(t)

	monitor := newTestSchedulerMonitor()
	s := newTestScheduler(t, WithSchedulerMonitor(monitor))

	// Before starting, monitor should not have been called
	assert.Equal(t, int64(0), monitor.getStartedCount())
	assert.Equal(t, int64(0), monitor.getShutdownCount())

	// Add a simple job
	_, err := s.NewJob(
		DurationJob(time.Second),
		NewTask(func() {}),
	)
	require.NoError(t, err)

	// Start the scheduler
	s.Start()

	// Wait a bit for the start to complete
	time.Sleep(50 * time.Millisecond)

	// SchedulerStarted should have been called once
	assert.Equal(t, int64(1), monitor.getStartedCount())
	assert.Equal(t, int64(0), monitor.getShutdownCount())

	// Shutdown the scheduler
	err = s.Shutdown()
	require.NoError(t, err)

	// SchedulerShutdown should have been called once
	assert.Equal(t, int64(1), monitor.getStartedCount())
	assert.Equal(t, int64(1), monitor.getShutdownCount())

	// Verify the order of calls
	startedCalls := monitor.getStartedCalls()
	shutdownCalls := monitor.getShutdownCalls()
	require.Len(t, startedCalls, 1)
	require.Len(t, shutdownCalls, 1)
	assert.True(t, startedCalls[0].Before(shutdownCalls[0]),
		"SchedulerStarted should be called before SchedulerShutdown")
}
// TestSchedulerMonitor_MultipleStartStop verifies that StopJobs does NOT
// trigger SchedulerShutdown, that a second Start fires SchedulerStarted
// again, and that the final Shutdown fires SchedulerShutdown exactly once.
func TestSchedulerMonitor_MultipleStartStop(t *testing.T) {
	defer verifyNoGoroutineLeaks(t)

	monitor := newTestSchedulerMonitor()
	s := newTestScheduler(t, WithSchedulerMonitor(monitor))

	_, err := s.NewJob(
		DurationJob(time.Second),
		NewTask(func() {}),
	)
	require.NoError(t, err)

	// Start and stop multiple times
	s.Start()
	time.Sleep(50 * time.Millisecond)
	assert.Equal(t, int64(1), monitor.getStartedCount())

	err = s.StopJobs()
	require.NoError(t, err)
	// StopJobs shouldn't call SchedulerShutdown
	assert.Equal(t, int64(0), monitor.getShutdownCount())

	// Start again
	s.Start()
	time.Sleep(50 * time.Millisecond)
	assert.Equal(t, int64(2), monitor.getStartedCount())

	// Final shutdown
	err = s.Shutdown()
	require.NoError(t, err)
	assert.Equal(t, int64(1), monitor.getShutdownCount())
}
func TestSchedulerMonitor_WithoutMonitor(t *testing.T) {
defer verifyNoGoroutineLeaks(t)
// Create scheduler without monitor - should not panic
s := newTestScheduler(t)
_, err := s.NewJob(
DurationJob(time.Second),
NewTask(func() {}),
)
require.NoError(t, err) require.NoError(t, err)
s.Start() s.Start()
time.Sleep(10 * time.Millisecond) time.Sleep(50 * time.Millisecond)
require.NoError(t, s.Shutdown())
time.Sleep(10 * time.Millisecond)
mu.Lock()
defer mu.Unlock()
assert.Equal(t, []string{"started", "stopped"}, events)
}
// Helper monitor for tracking order
type orderTrackingMonitor struct {
onStart func()
onStop func()
}
func (m *orderTrackingMonitor) SchedulerStarted() {
if m.onStart != nil {
m.onStart()
}
}
func (m *orderTrackingMonitor) SchedulerStopped() {
if m.onStop != nil {
m.onStop()
}
}
// Test that stopped is not called if scheduler wasn't started
func TestSchedulerMonitor_StoppedNotCalledIfNotStarted(t *testing.T) {
monitor := &mockSchedulerMonitor{}
s, err := NewScheduler(WithSchedulerMonitor(monitor))
require.NoError(t, err)
err = s.Shutdown() err = s.Shutdown()
if err != nil { require.NoError(t, err)
t.Logf("Shutdown returned error as expected: %v", err) }
// TestSchedulerMonitor_NilMonitor verifies that WithSchedulerMonitor(nil)
// is rejected at construction time with ErrSchedulerMonitorNil.
func TestSchedulerMonitor_NilMonitor(t *testing.T) {
	// Attempting to create a scheduler with nil monitor should error
	_, err := NewScheduler(WithSchedulerMonitor(nil))
	assert.Error(t, err)
	assert.Equal(t, ErrSchedulerMonitorNil, err)
}
func TestSchedulerMonitor_ConcurrentAccess(t *testing.T) {
defer verifyNoGoroutineLeaks(t)
monitor := newTestSchedulerMonitor()
s := newTestScheduler(t, WithSchedulerMonitor(monitor))
// Add multiple jobs
for i := 0; i < 10; i++ {
_, err := s.NewJob(
DurationJob(100*time.Millisecond),
NewTask(func() {}),
)
require.NoError(t, err)
} }
// Depending on implementation, this might return an error // Start scheduler once (normal use case)
// But stopped should not be called s.Start()
assert.Equal(t, 0, monitor.getSchedulerStoppedCount()) time.Sleep(150 * time.Millisecond)
// Verify monitor was called
assert.Equal(t, int64(1), monitor.getStartedCount())
err := s.Shutdown()
require.NoError(t, err)
// Monitor should be called for shutdown
assert.Equal(t, int64(1), monitor.getShutdownCount())
}
// TestSchedulerMonitor_StartWithoutJobs verifies the lifecycle callbacks
// still fire for a scheduler that has no jobs registered.
func TestSchedulerMonitor_StartWithoutJobs(t *testing.T) {
	defer verifyNoGoroutineLeaks(t)

	monitor := newTestSchedulerMonitor()
	s := newTestScheduler(t, WithSchedulerMonitor(monitor))

	// Start scheduler without any jobs
	s.Start()
	time.Sleep(50 * time.Millisecond)

	// Monitor should still be called
	assert.Equal(t, int64(1), monitor.getStartedCount())

	err := s.Shutdown()
	require.NoError(t, err)
	assert.Equal(t, int64(1), monitor.getShutdownCount())
}
// TestSchedulerMonitor_ShutdownWithoutStart verifies that neither
// SchedulerStarted nor SchedulerShutdown fires when Shutdown is called on
// a scheduler that was never started.
func TestSchedulerMonitor_ShutdownWithoutStart(t *testing.T) {
	defer verifyNoGoroutineLeaks(t)

	monitor := newTestSchedulerMonitor()
	s := newTestScheduler(t, WithSchedulerMonitor(monitor))

	_, err := s.NewJob(
		DurationJob(time.Second),
		NewTask(func() {}),
	)
	require.NoError(t, err)

	// Shutdown without starting
	err = s.Shutdown()
	require.NoError(t, err)

	// SchedulerStarted should not be called
	assert.Equal(t, int64(0), monitor.getStartedCount())
	// SchedulerShutdown should not be called if scheduler was never started
	assert.Equal(t, int64(0), monitor.getShutdownCount())
}
// TestSchedulerMonitor_ThreadSafety hammers the monitor's Started/Shutdown
// callbacks from concurrent goroutines (no scheduler involved) and checks
// that every call is recorded; run with -race to surface unsynchronized
// counter updates in the monitor implementation.
func TestSchedulerMonitor_ThreadSafety(t *testing.T) {
	defer verifyNoGoroutineLeaks(t)

	monitor := newTestSchedulerMonitor()

	// Simulate concurrent calls to the monitor from multiple goroutines
	var wg sync.WaitGroup
	iterations := 100

	for i := 0; i < iterations; i++ {
		wg.Add(2)
		go func() {
			defer wg.Done()
			monitor.SchedulerStarted()
		}()
		go func() {
			defer wg.Done()
			monitor.SchedulerShutdown()
		}()
	}

	wg.Wait()

	// Verify all calls were recorded
	assert.Equal(t, int64(iterations), monitor.getStartedCount())
	assert.Equal(t, int64(iterations), monitor.getShutdownCount())
	assert.Len(t, monitor.getStartedCalls(), iterations)
	assert.Len(t, monitor.getShutdownCalls(), iterations)
}
// TestSchedulerMonitor_IntegrationWithJobs runs one succeeding and one
// failing job and verifies the full set of job lifecycle callbacks fires:
// registration, start, running, completion, and failure (with the error).
// NOTE(review): relies on 150ms of wall-clock sleep for the 50ms jobs to
// run at least once — assumed sufficient on CI; confirm this is not flaky.
func TestSchedulerMonitor_IntegrationWithJobs(t *testing.T) {
	defer verifyNoGoroutineLeaks(t)

	monitor := newTestSchedulerMonitor()
	s := newTestScheduler(t, WithSchedulerMonitor(monitor))

	// Test successful job
	jobRunCount := atomic.Int32{}
	_, err := s.NewJob(
		DurationJob(50*time.Millisecond),
		NewTask(func() {
			jobRunCount.Add(1)
		}),
		WithStartAt(WithStartImmediately()),
	)
	require.NoError(t, err)

	// Test failing job
	_, err = s.NewJob(
		DurationJob(50*time.Millisecond),
		NewTask(func() error {
			return fmt.Errorf("test error")
		}),
		WithStartAt(WithStartImmediately()),
	)
	require.NoError(t, err)

	// Start scheduler
	s.Start()
	time.Sleep(150 * time.Millisecond) // Wait for jobs to execute

	// Verify scheduler lifecycle events
	assert.Equal(t, int64(1), monitor.getStartedCount())
	assert.GreaterOrEqual(t, jobRunCount.Load(), int32(1))

	// Verify job registration
	assert.Equal(t, int64(2), monitor.getJobRegCount(), "Should have registered 2 jobs")

	// Verify job execution events
	assert.GreaterOrEqual(t, monitor.getJobStartCount(), int64(1), "Jobs should have started")
	assert.GreaterOrEqual(t, monitor.getJobRunningCount(), int64(1), "Jobs should be running")
	assert.GreaterOrEqual(t, monitor.getJobCompletedCount(), int64(1), "Successful job should complete")
	assert.GreaterOrEqual(t, monitor.getJobFailedCount(), int64(1), "Failing job should fail")

	// Get failed job details
	// NOTE(review): the local name `errors` shadows the stdlib package name
	// within this test; rename it if the errors package is ever needed here.
	failedJobs, errors := monitor.getJobFailedCalls()
	assert.NotEmpty(t, failedJobs, "Should have recorded failed jobs")
	assert.NotEmpty(t, errors, "Should have recorded job errors")
	assert.Contains(t, errors[0].Error(), "test error", "Should record the correct error")

	// Shutdown
	err = s.Shutdown()
	require.NoError(t, err)
	assert.Equal(t, int64(1), monitor.getShutdownCount())
}