mirror of https://github.com/go-co-op/gocron.git
git commit -m "feat: add SchedulerStarted and SchedulerStopped monitoring"
This commit is contained in:
parent
11cf7cf8e2
commit
83f5f60b67
|
|
@ -59,6 +59,7 @@ var (
|
||||||
ErrStartTimeLaterThanEndTime = errors.New("gocron: WithStartDateTime: start must not be later than end")
|
ErrStartTimeLaterThanEndTime = errors.New("gocron: WithStartDateTime: start must not be later than end")
|
||||||
ErrStopTimeEarlierThanStartTime = errors.New("gocron: WithStopDateTime: end must not be earlier than start")
|
ErrStopTimeEarlierThanStartTime = errors.New("gocron: WithStopDateTime: end must not be earlier than start")
|
||||||
ErrWithStopTimeoutZeroOrNegative = errors.New("gocron: WithStopTimeout: timeout must be greater than 0")
|
ErrWithStopTimeoutZeroOrNegative = errors.New("gocron: WithStopTimeout: timeout must be greater than 0")
|
||||||
|
ErrSchedulerMonitorNil = errors.New("gocron: scheduler monitor cannot be nil")
|
||||||
)
|
)
|
||||||
|
|
||||||
// internal errors
|
// internal errors
|
||||||
|
|
|
||||||
47
scheduler.go
47
scheduler.go
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"runtime"
|
"runtime"
|
||||||
"slices"
|
"slices"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
|
@ -100,6 +101,11 @@ type scheduler struct {
|
||||||
removeJobCh chan uuid.UUID
|
removeJobCh chan uuid.UUID
|
||||||
// requests from the client to remove jobs by tags are received here
|
// requests from the client to remove jobs by tags are received here
|
||||||
removeJobsByTagsCh chan []string
|
removeJobsByTagsCh chan []string
|
||||||
|
|
||||||
|
// scheduler monitor from which metrics can be collected
|
||||||
|
schedulerMonitor SchedulerMonitor
|
||||||
|
// mutex to protect access to the scheduler monitor
|
||||||
|
schedulerMonitorMu sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
type newJobIn struct {
|
type newJobIn struct {
|
||||||
|
|
@ -815,8 +821,13 @@ func (s *scheduler) RemoveJob(id uuid.UUID) error {
|
||||||
func (s *scheduler) Start() {
|
func (s *scheduler) Start() {
|
||||||
select {
|
select {
|
||||||
case <-s.shutdownCtx.Done():
|
case <-s.shutdownCtx.Done():
|
||||||
|
// Scheduler already shut down, don't notify
|
||||||
|
return
|
||||||
case s.startCh <- struct{}{}:
|
case s.startCh <- struct{}{}:
|
||||||
<-s.startedCh
|
<-s.startedCh // Wait for scheduler to actually start
|
||||||
|
|
||||||
|
// Scheduler has started
|
||||||
|
s.notifySchedulerStarted()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -849,6 +860,9 @@ func (s *scheduler) Shutdown() error {
|
||||||
select {
|
select {
|
||||||
case err := <-s.stopErrCh:
|
case err := <-s.stopErrCh:
|
||||||
t.Stop()
|
t.Stop()
|
||||||
|
|
||||||
|
// notify monitor that scheduler stopped
|
||||||
|
s.notifySchedulerStopped()
|
||||||
return err
|
return err
|
||||||
case <-t.C:
|
case <-t.C:
|
||||||
return ErrStopSchedulerTimedOut
|
return ErrStopSchedulerTimedOut
|
||||||
|
|
@ -1054,3 +1068,34 @@ func WithMonitorStatus(monitor MonitorStatus) SchedulerOption {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithSchedulerMonitor sets a monitor that will be called with scheduler-level events.
|
||||||
|
func WithSchedulerMonitor(monitor SchedulerMonitor) SchedulerOption {
|
||||||
|
return func(s *scheduler) error {
|
||||||
|
if monitor == nil {
|
||||||
|
return ErrSchedulerMonitorNil
|
||||||
|
}
|
||||||
|
s.schedulerMonitorMu.Lock()
|
||||||
|
defer s.schedulerMonitorMu.Unlock()
|
||||||
|
s.schedulerMonitor = monitor
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// notifySchedulerStarted notifies the monitor that scheduler has started
|
||||||
|
func (s *scheduler) notifySchedulerStarted() {
|
||||||
|
s.schedulerMonitorMu.RLock()
|
||||||
|
defer s.schedulerMonitorMu.RUnlock()
|
||||||
|
if s.schedulerMonitor != nil {
|
||||||
|
s.schedulerMonitor.SchedulerStarted()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// notifySchedulerStopped notifies the monitor that scheduler has stopped
|
||||||
|
func (s *scheduler) notifySchedulerStopped() {
|
||||||
|
s.schedulerMonitorMu.RLock()
|
||||||
|
defer s.schedulerMonitorMu.RUnlock()
|
||||||
|
if s.schedulerMonitor != nil {
|
||||||
|
s.schedulerMonitor.SchedulerStopped()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,11 @@
|
||||||
|
package gocron
|
||||||
|
|
||||||
|
// SchedulerMonitor is called by the Scheduler to provide scheduler-level
|
||||||
|
// metrics and events.
|
||||||
|
type SchedulerMonitor interface {
|
||||||
|
// SchedulerStarted is called when Start() is invoked on the scheduler.
|
||||||
|
SchedulerStarted()
|
||||||
|
|
||||||
|
// SchedulerStopped is called when Shutdown() completes successfully.
|
||||||
|
SchedulerStopped()
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,148 @@
|
||||||
|
package gocron
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
// mockSchedulerMonitor is a test implementation
|
||||||
|
type mockSchedulerMonitor struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
schedulerStartedCount int
|
||||||
|
schedulerStoppedCount int
|
||||||
|
}
|
||||||
|
|
||||||
|
// SchedulerStarted increments the started count
|
||||||
|
func (m *mockSchedulerMonitor) SchedulerStarted() {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
m.schedulerStartedCount++
|
||||||
|
}
|
||||||
|
|
||||||
|
// SchedulerStopped increments the stopped count
|
||||||
|
func (m *mockSchedulerMonitor) SchedulerStopped() {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
m.schedulerStoppedCount++
|
||||||
|
}
|
||||||
|
|
||||||
|
// getSchedulerStartedCount returns the count of SchedulerStarted calls
|
||||||
|
func (m *mockSchedulerMonitor) getSchedulerStartedCount() int {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
return m.schedulerStartedCount
|
||||||
|
}
|
||||||
|
|
||||||
|
// getSchedulerStoppedCount returns the count of SchedulerStopped calls
|
||||||
|
func (m *mockSchedulerMonitor) getSchedulerStoppedCount() int {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
return m.schedulerStoppedCount
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that SchedulerStopped is called
|
||||||
|
func TestSchedulerMonitor_SchedulerStopped(t *testing.T) {
|
||||||
|
monitor := &mockSchedulerMonitor{}
|
||||||
|
s, err := NewScheduler(WithSchedulerMonitor(monitor))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
s.Start()
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
|
||||||
|
err = s.Shutdown()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// verify SchedulerStopped was called exactly once
|
||||||
|
assert.Equal(t, 1, monitor.getSchedulerStoppedCount())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test multiple start/stop cycles
|
||||||
|
func TestSchedulerMonitor_MultipleStartStopCycles(t *testing.T) {
|
||||||
|
monitor := &mockSchedulerMonitor{}
|
||||||
|
s, err := NewScheduler(WithSchedulerMonitor(monitor))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// first cycle
|
||||||
|
s.Start()
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
err = s.Shutdown()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// second cycle
|
||||||
|
s.Start()
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
err = s.Shutdown()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// verifying counts
|
||||||
|
assert.Equal(t, 2, monitor.getSchedulerStartedCount())
|
||||||
|
assert.Equal(t, 2, monitor.getSchedulerStoppedCount())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that start and stop are called in correct order
|
||||||
|
func TestSchedulerMonitor_StartStopOrder(t *testing.T) {
|
||||||
|
events := []string{}
|
||||||
|
var mu sync.Mutex
|
||||||
|
|
||||||
|
monitor := &orderTrackingMonitor{
|
||||||
|
onStart: func() {
|
||||||
|
mu.Lock()
|
||||||
|
events = append(events, "started")
|
||||||
|
mu.Unlock()
|
||||||
|
},
|
||||||
|
onStop: func() {
|
||||||
|
mu.Lock()
|
||||||
|
events = append(events, "stopped")
|
||||||
|
mu.Unlock()
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
s, err := NewScheduler(WithSchedulerMonitor(monitor))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
s.Start()
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
s.Shutdown()
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
assert.Equal(t, []string{"started", "stopped"}, events)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper monitor for tracking order
|
||||||
|
type orderTrackingMonitor struct {
|
||||||
|
onStart func()
|
||||||
|
onStop func()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *orderTrackingMonitor) SchedulerStarted() {
|
||||||
|
if m.onStart != nil {
|
||||||
|
m.onStart()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *orderTrackingMonitor) SchedulerStopped() {
|
||||||
|
if m.onStop != nil {
|
||||||
|
m.onStop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that stopped is not called if scheduler wasn't started
|
||||||
|
func TestSchedulerMonitor_StoppedNotCalledIfNotStarted(t *testing.T) {
|
||||||
|
monitor := &mockSchedulerMonitor{}
|
||||||
|
s, err := NewScheduler(WithSchedulerMonitor(monitor))
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = s.Shutdown()
|
||||||
|
if err != nil {
|
||||||
|
t.Logf("Shutdown returned error as expected: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Depending on implementation, this might return an error
|
||||||
|
// But stopped should not be called
|
||||||
|
assert.Equal(t, 0, monitor.getSchedulerStoppedCount())
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue