diff --git a/executor.go b/executor.go index 0830753..ef7c9f8 100644 --- a/executor.go +++ b/executor.go @@ -157,6 +157,12 @@ func (e *executor) start() { // all runners are busy, reschedule the work for later // which means we just skip it here and do nothing // TODO when metrics are added, this should increment a rescheduled metric + ctx2, cancel2 := context.WithCancel(executorCtx) + job := requestJobCtx(ctx2, jIn.id, e.jobOutRequest) + cancel2() + if job != nil && e.scheduler != nil { + e.scheduler.notifyConcurrencyLimitReached("limit", e.scheduler.jobFromInternalJob(*job)) + } e.sendOutForRescheduling(&jIn) } } else { @@ -211,6 +217,10 @@ func (e *executor) start() { // which means we just skip it here and do nothing e.incrementJobCounter(*j, SingletonRescheduled) e.sendOutForRescheduling(&jIn) + // Notify concurrency limit reached + if e.scheduler != nil { + e.scheduler.notifyConcurrencyLimitReached("singleton", e.scheduler.jobFromInternalJob(*j)) + } } } else { // wait mode, fill up that queue (buffered channel, so it's ok) @@ -418,11 +428,15 @@ func (e *executor) runJob(j internalJob, jIn jobIn) { _ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name) - // Notify job started + // Notify job started + actualStartTime := time.Now() if e.scheduler != nil { - out := e.scheduler.jobFromInternalJob(j) - var jobIface Job = out - e.scheduler.notifyJobStarted(&jobIface) + jobObj := e.scheduler.jobFromInternalJob(j) + e.scheduler.notifyJobStarted(jobObj) + // Notify scheduling delay if job had a scheduled time + if len(j.nextScheduled) > 0 { + e.scheduler.notifyJobSchedulingDelay(jobObj, j.nextScheduled[0], actualStartTime) + } } err := callJobFuncWithParams(j.beforeJobRunsSkipIfBeforeFuncErrors, j.id, j.name) @@ -434,18 +448,14 @@ func (e *executor) runJob(j internalJob, jIn jobIn) { } // Notify job failed (before actual run) if e.scheduler != nil { - out := e.scheduler.jobFromInternalJob(j) - var jobIface Job = out - e.scheduler.notifyJobFailed(&jobIface, err) + e.scheduler.notifyJobFailed(e.scheduler.jobFromInternalJob(j), err) } return } // Notify job running if e.scheduler != nil { - out := e.scheduler.jobFromInternalJob(j) - var jobIface Job = out - e.scheduler.notifyJobRunning(&jobIface) + e.scheduler.notifyJobRunning(e.scheduler.jobFromInternalJob(j)) } // For intervalFromCompletion, we need to reschedule AFTER the job completes, @@ -468,22 +478,24 @@ func (e *executor) runJob(j internalJob, jIn jobIn) { if err != nil { _ = callJobFuncWithParams(j.afterJobRunsWithError, j.id, j.name, err) e.incrementJobCounter(j, Fail) - e.recordJobTimingWithStatus(startTime, time.Now(), j, Fail, err) + endTime := time.Now() + e.recordJobTimingWithStatus(startTime, endTime, j, Fail, err) // Notify job failed if e.scheduler != nil { - out := e.scheduler.jobFromInternalJob(j) - var jobIface Job = out - e.scheduler.notifyJobFailed(&jobIface, err) + jobObj := e.scheduler.jobFromInternalJob(j) + e.scheduler.notifyJobFailed(jobObj, err) + e.scheduler.notifyJobExecutionTime(jobObj, endTime.Sub(startTime)) } } else { _ = callJobFuncWithParams(j.afterJobRuns, j.id, j.name) e.incrementJobCounter(j, Success) - e.recordJobTimingWithStatus(startTime, time.Now(), j, Success, nil) + endTime := time.Now() + e.recordJobTimingWithStatus(startTime, endTime, j, Success, nil) // Notify job completed if e.scheduler != nil { - out := e.scheduler.jobFromInternalJob(j) - var jobIface Job = out - e.scheduler.notifyJobCompleted(&jobIface) + jobObj := e.scheduler.jobFromInternalJob(j) + e.scheduler.notifyJobCompleted(jobObj) + e.scheduler.notifyJobExecutionTime(jobObj, endTime.Sub(startTime)) } } diff --git a/scheduler.go b/scheduler.go index 93b5ab5..ea1b8bf 100644 --- a/scheduler.go +++ b/scheduler.go @@ -277,6 +277,9 @@ func (s *scheduler) stopScheduler() { s.stopErrCh <- err s.started.Store(false) s.logger.Debug("gocron: scheduler stopped") + + // Notify monitor that scheduler has stopped + s.notifySchedulerStopped() } func (s *scheduler) selectAllJobsOutRequest(out allJobsOutRequest) { @@ -329,8 +332,7 @@ func (s *scheduler) selectRemoveJob(id uuid.UUID) { } if s.schedulerMonitor != nil { out := s.jobFromInternalJob(j) - var job Job = &out - s.notifyJobUnregistered(&job) + s.notifyJobUnregistered(out) } j.stop() delete(s.jobs, id) @@ -542,8 +544,7 @@ func (s *scheduler) selectRemoveJobsByTags(tags []string) { if slices.Contains(j.tags, tag) { if s.schedulerMonitor != nil { out := s.jobFromInternalJob(j) - var job Job = &out - s.notifyJobUnregistered(&job) + s.notifyJobUnregistered(out) } j.stop() delete(s.jobs, j.id) @@ -804,8 +805,7 @@ func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskW out := s.jobFromInternalJob(j) if s.schedulerMonitor != nil { - var job Job = out - s.notifyJobRegistered(&job) + s.notifyJobRegistered(out) } return &out, nil } @@ -1107,43 +1107,71 @@ func (s *scheduler) notifySchedulerShutdown() { } // notifyJobRegistered notifies the monitor that a job has been registered -func (s *scheduler) notifyJobRegistered(job *Job) { +func (s *scheduler) notifyJobRegistered(job Job) { if s.schedulerMonitor != nil { s.schedulerMonitor.JobRegistered(job) } } // notifyJobUnregistered notifies the monitor that a job has been unregistered -func (s *scheduler) notifyJobUnregistered(job *Job) { +func (s *scheduler) notifyJobUnregistered(job Job) { if s.schedulerMonitor != nil { s.schedulerMonitor.JobUnregistered(job) } } // notifyJobStarted notifies the monitor that a job has started -func (s *scheduler) notifyJobStarted(job *Job) { +func (s *scheduler) notifyJobStarted(job Job) { if s.schedulerMonitor != nil { s.schedulerMonitor.JobStarted(job) } } // notifyJobRunning notifies the monitor that a job is running. -func (s *scheduler) notifyJobRunning(job *Job) { +func (s *scheduler) notifyJobRunning(job Job) { if s.schedulerMonitor != nil { s.schedulerMonitor.JobRunning(job) } } // notifyJobCompleted notifies the monitor that a job has completed. -func (s *scheduler) notifyJobCompleted(job *Job) { +func (s *scheduler) notifyJobCompleted(job Job) { if s.schedulerMonitor != nil { s.schedulerMonitor.JobCompleted(job) } } // notifyJobFailed notifies the monitor that a job has failed. -func (s *scheduler) notifyJobFailed(job *Job, err error) { +func (s *scheduler) notifyJobFailed(job Job, err error) { if s.schedulerMonitor != nil { s.schedulerMonitor.JobFailed(job, err) } } + +// notifySchedulerStopped notifies the monitor that the scheduler has stopped +func (s *scheduler) notifySchedulerStopped() { + if s.schedulerMonitor != nil { + s.schedulerMonitor.SchedulerStopped() + } +} + +// notifyJobExecutionTime notifies the monitor of a job's execution time +func (s *scheduler) notifyJobExecutionTime(job Job, duration time.Duration) { + if s.schedulerMonitor != nil { + s.schedulerMonitor.JobExecutionTime(job, duration) + } +} + +// notifyJobSchedulingDelay notifies the monitor of scheduling delay +func (s *scheduler) notifyJobSchedulingDelay(job Job, scheduledTime time.Time, actualStartTime time.Time) { + if s.schedulerMonitor != nil { + s.schedulerMonitor.JobSchedulingDelay(job, scheduledTime, actualStartTime) + } +} + +// notifyConcurrencyLimitReached notifies the monitor that a concurrency limit was reached +func (s *scheduler) notifyConcurrencyLimitReached(limitType string, job Job) { + if s.schedulerMonitor != nil { + s.schedulerMonitor.ConcurrencyLimitReached(limitType, job) + } +} diff --git a/scheduler_monitor.go b/scheduler_monitor.go index f5619e1..b097025 100644 --- a/scheduler_monitor.go +++ b/scheduler_monitor.go @@ -1,29 +1,50 @@ package gocron +import "time" + // SchedulerMonitor is called by the Scheduler to provide scheduler-level // metrics and events. type SchedulerMonitor interface { // SchedulerStarted is called when Start() is invoked on the scheduler. SchedulerStarted() + // SchedulerStopped is called when the scheduler's main loop stops, + // but before final cleanup in Shutdown(). + SchedulerStopped() + // SchedulerShutdown is called when Shutdown() completes successfully. SchedulerShutdown() // JobRegistered is called when a job is registered with the scheduler. - JobRegistered(job *Job) + JobRegistered(job Job) // JobUnregistered is called when a job is unregistered from the scheduler. - JobUnregistered(job *Job) + JobUnregistered(job Job) // JobStarted is called when a job starts running. - JobStarted(job *Job) + JobStarted(job Job) // JobRunning is called when a job is running. - JobRunning(job *Job) + JobRunning(job Job) // JobFailed is called when a job fails to complete successfully. - JobFailed(job *Job, err error) + JobFailed(job Job, err error) // JobCompleted is called when a job has completed running. - JobCompleted(job *Job) + JobCompleted(job Job) + + // JobExecutionTime is called after a job completes (success or failure) + // with the time it took to execute. This enables calculation of metrics + // like AverageExecutionTime. + JobExecutionTime(job Job, duration time.Duration) + + // JobSchedulingDelay is called when a job starts running, providing both + // the scheduled time and actual start time. This enables calculation of + // SchedulingLag metrics to detect when jobs are running behind schedule. + JobSchedulingDelay(job Job, scheduledTime time.Time, actualStartTime time.Time) + + // ConcurrencyLimitReached is called when a job cannot start immediately + // due to concurrency limits (singleton or limit mode). + // limitType will be "singleton" or "limit". + ConcurrencyLimitReached(limitType string, job Job) } diff --git a/scheduler_monitor_test.go b/scheduler_monitor_test.go index 889d97a..5c64313 100644 --- a/scheduler_monitor_test.go +++ b/scheduler_monitor_test.go @@ -14,23 +14,29 @@ import ( // testSchedulerMonitor is a test implementation of SchedulerMonitor // that tracks scheduler lifecycle events type testSchedulerMonitor struct { - mu sync.RWMutex - startedCount int64 - shutdownCount int64 - jobRegCount int64 - jobUnregCount int64 - jobStartCount int64 - jobRunningCount int64 - jobCompletedCount int64 - jobFailedCount int64 - startedCalls []time.Time - shutdownCalls []time.Time - jobRegCalls []Job - jobUnregCalls []Job - jobStartCalls []Job - jobRunningCalls []Job - jobCompletedCalls []Job - jobFailedCalls struct { + mu sync.RWMutex + startedCount int64 + stoppedCount int64 + shutdownCount int64 + jobRegCount int64 + jobUnregCount int64 + jobStartCount int64 + jobRunningCount int64 + jobCompletedCount int64 + jobFailedCount int64 + concurrencyLimitCount int64 + startedCalls []time.Time + stoppedCalls []time.Time + shutdownCalls []time.Time + jobRegCalls []Job + jobUnregCalls []Job + jobStartCalls []Job + jobRunningCalls []Job + jobCompletedCalls []Job + jobExecutionTimes []time.Duration + jobSchedulingDelays []time.Duration + concurrencyLimitCalls []string + jobFailedCalls struct { jobs []Job errs []error } @@ -38,13 +44,17 @@ type testSchedulerMonitor struct { func newTestSchedulerMonitor() *testSchedulerMonitor { return &testSchedulerMonitor{ - startedCalls: make([]time.Time, 0), - shutdownCalls: make([]time.Time, 0), - jobRegCalls: make([]Job, 0), - jobUnregCalls: make([]Job, 0), - jobStartCalls: make([]Job, 0), - jobRunningCalls: make([]Job, 0), - jobCompletedCalls: make([]Job, 0), + startedCalls: make([]time.Time, 0), + stoppedCalls: make([]time.Time, 0), + shutdownCalls: make([]time.Time, 0), + jobRegCalls: make([]Job, 0), + jobUnregCalls: make([]Job, 0), + jobStartCalls: make([]Job, 0), + jobRunningCalls: make([]Job, 0), + jobCompletedCalls: make([]Job, 0), + jobExecutionTimes: make([]time.Duration, 0), + jobSchedulingDelays: make([]time.Duration, 0), + concurrencyLimitCalls: make([]string, 0), jobFailedCalls: struct { jobs []Job errs []error @@ -62,6 +72,13 @@ func (t *testSchedulerMonitor) SchedulerStarted() { t.startedCalls = append(t.startedCalls, time.Now()) } +func (t *testSchedulerMonitor) SchedulerStopped() { + t.mu.Lock() + defer t.mu.Unlock() + atomic.AddInt64(&t.stoppedCount, 1) + t.stoppedCalls = append(t.stoppedCalls, time.Now()) +} + func (t *testSchedulerMonitor) SchedulerShutdown() { t.mu.Lock() defer t.mu.Unlock() @@ -89,56 +106,78 @@ func (t *testSchedulerMonitor) getShutdownCalls() []time.Time { return append([]time.Time{}, t.shutdownCalls...) } -func (t *testSchedulerMonitor) JobRegistered(job *Job) { +func (t *testSchedulerMonitor) JobRegistered(job Job) { t.mu.Lock() defer t.mu.Unlock() atomic.AddInt64(&t.jobRegCount, 1) - t.jobRegCalls = append(t.jobRegCalls, *job) + t.jobRegCalls = append(t.jobRegCalls, job) } -func (t *testSchedulerMonitor) JobUnregistered(job *Job) { +func (t *testSchedulerMonitor) JobUnregistered(job Job) { t.mu.Lock() defer t.mu.Unlock() atomic.AddInt64(&t.jobUnregCount, 1) - t.jobUnregCalls = append(t.jobUnregCalls, *job) + t.jobUnregCalls = append(t.jobUnregCalls, job) } -func (t *testSchedulerMonitor) JobStarted(job *Job) { +func (t *testSchedulerMonitor) JobStarted(job Job) { t.mu.Lock() defer t.mu.Unlock() atomic.AddInt64(&t.jobStartCount, 1) - t.jobStartCalls = append(t.jobStartCalls, *job) + t.jobStartCalls = append(t.jobStartCalls, job) } -func (t *testSchedulerMonitor) JobRunning(job *Job) { +func (t *testSchedulerMonitor) JobRunning(job Job) { t.mu.Lock() defer t.mu.Unlock() atomic.AddInt64(&t.jobRunningCount, 1) - t.jobRunningCalls = append(t.jobRunningCalls, *job) + t.jobRunningCalls = append(t.jobRunningCalls, job) } -func (t *testSchedulerMonitor) JobCompleted(job *Job) { +func (t *testSchedulerMonitor) JobCompleted(job Job) { t.mu.Lock() defer t.mu.Unlock() atomic.AddInt64(&t.jobCompletedCount, 1) - t.jobCompletedCalls = append(t.jobCompletedCalls, *job) + t.jobCompletedCalls = append(t.jobCompletedCalls, job) } -func (t *testSchedulerMonitor) JobFailed(job *Job, err error) { +func (t *testSchedulerMonitor) JobFailed(job Job, err error) { t.mu.Lock() defer t.mu.Unlock() atomic.AddInt64(&t.jobFailedCount, 1) - t.jobFailedCalls.jobs = append(t.jobFailedCalls.jobs, *job) + t.jobFailedCalls.jobs = append(t.jobFailedCalls.jobs, job) t.jobFailedCalls.errs = append(t.jobFailedCalls.errs, err) } +func (t *testSchedulerMonitor) JobExecutionTime(_ Job, duration time.Duration) { + t.mu.Lock() + defer t.mu.Unlock() + t.jobExecutionTimes = append(t.jobExecutionTimes, duration) +} + +func (t *testSchedulerMonitor) JobSchedulingDelay(_ Job, scheduledTime time.Time, actualStartTime time.Time) { + t.mu.Lock() + defer t.mu.Unlock() + delay := actualStartTime.Sub(scheduledTime) + if delay > 0 { + t.jobSchedulingDelays = append(t.jobSchedulingDelays, delay) + } +} + +func (t *testSchedulerMonitor) ConcurrencyLimitReached(limitType string, _ Job) { + t.mu.Lock() + defer t.mu.Unlock() + atomic.AddInt64(&t.concurrencyLimitCount, 1) + t.concurrencyLimitCalls = append(t.concurrencyLimitCalls, limitType) +} + func (t *testSchedulerMonitor) getJobRegCount() int64 { return atomic.LoadInt64(&t.jobRegCount) } -// func (t *testSchedulerMonitor) getJobUnregCount() int64 { -// return atomic.LoadInt64(&t.jobUnregCount) -// } +func (t *testSchedulerMonitor) getJobUnregCount() int64 { + return atomic.LoadInt64(&t.jobUnregCount) +} func (t *testSchedulerMonitor) getJobStartCount() int64 { return atomic.LoadInt64(&t.jobStartCount) @@ -404,7 +443,7 @@ func TestSchedulerMonitor_IntegrationWithJobs(t *testing.T) { // Test successful job jobRunCount := atomic.Int32{} - _, err := s.NewJob( + j, err := s.NewJob( DurationJob(50*time.Millisecond), NewTask(func() { jobRunCount.Add(1) @@ -446,6 +485,12 @@ func TestSchedulerMonitor_IntegrationWithJobs(t *testing.T) { assert.NotEmpty(t, errors, "Should have recorded job errors") assert.Contains(t, errors[0].Error(), "test error", "Should record the correct error") + // Test unregistration + err = s.RemoveJob(j.ID()) + require.NoError(t, err) + time.Sleep(50 * time.Millisecond) // Wait for async removal + assert.Equal(t, int64(1), monitor.getJobUnregCount(), "Should have unregistered 1 job") + // Shutdown err = s.Shutdown() require.NoError(t, err)