commit ad26a71e0ec6028e5638fbbc75ad03824b50a050 Author: John Roesler Date: Wed Nov 8 11:11:42 2023 -0600 initial clean v2 commit history diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml new file mode 100644 index 0000000..6fa87f9 --- /dev/null +++ b/.github/FUNDING.yml @@ -0,0 +1,12 @@ +# These are supported funding model platforms + +github: # Replace with up to 4 GitHub Sponsors-enabled usernames e.g., [user1, user2] +patreon: # Replace with a single Patreon username +open_collective: gocron +ko_fi: # Replace with a single Ko-fi username +tidelift: # Replace with a single Tidelift platform-name/package-name e.g., npm/babel +community_bridge: # Replace with a single Community Bridge project-name e.g., cloud-foundry +liberapay: # Replace with a single Liberapay username +issuehunt: # Replace with a single IssueHunt username +otechie: # Replace with a single Otechie username +custom: # Replace with up to 4 custom sponsorship URLs e.g., ['link1', 'link2'] diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..ca534ca --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,18 @@ +# To get started with Dependabot version updates, you'll need to specify which +# package ecosystems to update and where the package manifests are located. +# Please see the documentation for all configuration options: +# https://help.github.com/github/administering-a-repository/configuration-options-for-dependency-updates + +version: 2 +updates: + # Maintain dependencies for GitHub Actions + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: "weekly" + + # Maintain Go dependencies + - package-ecosystem: "gomod" + directory: "/" + schedule: + interval: "weekly" diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml new file mode 100644 index 0000000..ba012d4 --- /dev/null +++ b/.github/workflows/codeql-analysis.yml @@ -0,0 +1,71 @@ +# For most projects, this workflow file will not need changing; you simply need +# to commit it to your repository. +# +# You may wish to alter this file to override the set of languages analyzed, +# or to provide custom queries or build logic. +# +# ******** NOTE ******** +# We have attempted to detect the languages in your repository. Please check +# the `language` matrix defined below to confirm you have the correct set of +# supported CodeQL languages. +# +name: "CodeQL" + +on: + push: + branches: [ v2 ] + branches-ignore: + - "dependabot/**" + pull_request: + paths-ignore: + - '**.md' + # The branches below must be a subset of the branches above + branches: [ v2 ] + schedule: + - cron: '34 7 * * 1' + +jobs: + analyze: + name: Analyze + runs-on: ubuntu-latest + + strategy: + fail-fast: false + matrix: + language: [ 'go' ] + # CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python' ] + # Learn more: + # https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed + + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + # Initializes the CodeQL tools for scanning. + - name: Initialize CodeQL + uses: github/codeql-action/init@v2 + with: + languages: ${{ matrix.language }} + # If you wish to specify custom queries, you can do so here or in a config file. + # By default, queries listed here will override any specified in a config file. + # Prefix the list here with "+" to use these queries and those in the config file. + # queries: ./path/to/local/query, your-org/your-repo/queries@main + + # Autobuild attempts to build any compiled languages (C/C++, C#, or Java). + # If this step fails, then you should remove it and run the build manually (see below) + - name: Autobuild + uses: github/codeql-action/autobuild@v2 + + # ℹ️ Command-line programs to run using the OS shell. + # 📚 https://git.io/JvXDl + + # ✏️ If the Autobuild fails above, remove it and uncomment the following three lines + # and modify them (or add more) to build your code if your project + # uses a compiled language + + #- run: | + # make bootstrap + # make release + + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v2 diff --git a/.github/workflows/file_formatting.yml b/.github/workflows/file_formatting.yml new file mode 100644 index 0000000..eb3d588 --- /dev/null +++ b/.github/workflows/file_formatting.yml @@ -0,0 +1,19 @@ +on: + push: + branches: + - v2 + pull_request: + branches: + - v2 + +name: formatting +jobs: + check-sorted: + name: check sorted + runs-on: ubuntu-latest + steps: + - name: checkout code + uses: actions/checkout@v3 + - name: verify example_test.go + run: | + grep "^func " example_test.go | sort -C diff --git a/.github/workflows/go_test.yml b/.github/workflows/go_test.yml new file mode 100644 index 0000000..0dcc90a --- /dev/null +++ b/.github/workflows/go_test.yml @@ -0,0 +1,30 @@ +on: + push: + branches: + - v2 + pull_request: + branches: + - v2 + +name: golangci-lint +jobs: + golangci: + strategy: + matrix: + go-version: + - "1.21" + name: lint and test + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v3 + - name: Install Go + uses: actions/setup-go@v4 + with: + go-version: ${{ matrix.go-version }} + - name: golangci-lint + uses: golangci/golangci-lint-action@v3.7.0 + with: + version: v1.55.2 + - name: test + run: make test diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6657e3c --- /dev/null +++ b/.gitignore @@ -0,0 +1,20 @@ +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test +local_testing +coverage.out + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +vendor/ + +# IDE project files +.idea diff --git a/.golangci.yaml b/.golangci.yaml new file mode 100644 index 0000000..faa3fdb --- /dev/null +++ b/.golangci.yaml @@ -0,0 +1,52 @@ +run: + timeout: 5m + issues-exit-code: 1 + tests: true + skip-dirs: + - local + +issues: + max-same-issues: 100 + exclude-rules: + - path: _test\.go + linters: + - bodyclose + - errcheck + - gosec + +linters: + enable: + - bodyclose + - exportloopref + - gofumpt + - goimports + - gosec + - gosimple + - govet + - ineffassign + - misspell + - revive + - staticcheck + - typecheck + - unused + - whitespace + +output: + # colored-line-number|line-number|json|tab|checkstyle|code-climate, default is "colored-line-number" + format: colored-line-number + # print lines of code with issue, default is true + print-issued-lines: true + # print linter name in the end of issue text, default is true + print-linter-name: true + # make issues output unique by line, default is true + uniq-by-line: true + # add a prefix to the output file references; default is no prefix + path-prefix: "" + # sorts results by: filepath, line and column + sort-results: true + +linters-settings: + golint: + min-confidence: 0.8 + +fix: true diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..99b237e --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,24 @@ +# See https://pre-commit.com for more information +# See https://pre-commit.com/hooks.html for more hooks +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.5.0 + hooks: + - id: check-added-large-files + - id: check-case-conflict + - id: check-merge-conflict + - id: check-yaml + - id: detect-private-key + - id: end-of-file-fixer + - id: trailing-whitespace + - repo: https://github.com/golangci/golangci-lint + rev: v1.55.2 + hooks: + - id: golangci-lint + - repo: https://github.com/TekWizely/pre-commit-golang + rev: v1.0.0-rc.1 + hooks: + - id: go-fumpt + args: + - -w + - id: go-mod-tidy diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md new file mode 100644 index 0000000..7d913b5 --- /dev/null +++ b/CODE_OF_CONDUCT.md @@ -0,0 +1,73 @@ +# Contributor Covenant Code of Conduct + +## Our Pledge + +In the interest of fostering an open and welcoming environment, we as +contributors and maintainers pledge to making participation in our project and +our community a harassment-free experience for everyone. And we mean everyone! + +## Our Standards + +Examples of behavior that contributes to creating a positive environment +include: + +* Using welcoming and kind language +* Being respectful of differing viewpoints and experiences +* Gracefully accepting constructive criticism +* Focusing on what is best for the community +* Showing empathy towards other community members + +Examples of unacceptable behavior by participants include: + +* The use of sexualized language or imagery and unwelcome sexual attention or + advances +* Trolling, insulting/derogatory comments, and personal or political attacks +* Public or private harassment +* Publishing others' private information, such as a physical or electronic + address, without explicit permission +* Other conduct which could reasonably be considered inappropriate in a + professional setting + +## Our Responsibilities + +Project maintainers are responsible for clarifying the standards of acceptable +behavior and are expected to take appropriate and fair corrective action in +response to any instances of unacceptable behavior. + +Project maintainers have the right and responsibility to remove, edit, or +reject comments, commits, code, wiki edits, issues, and other contributions +that are not aligned to this Code of Conduct, or to ban temporarily or +permanently any contributor for other behaviors that they deem inappropriate, +threatening, offensive, or harmful. + +## Scope + +This Code of Conduct applies both within project spaces and in public spaces +when an individual is representing the project or its community. Examples of +representing a project or community include using an official project e-mail +address, posting via an official social media account, or acting as an appointed +representative at an online or offline event. Representation of a project may be +further defined and clarified by project maintainers. + +## Enforcement + +Instances of abusive, harassing, or otherwise unacceptable behavior may be +reported by contacting the project team initially on Slack to coordinate private communication. All +complaints will be reviewed and investigated and will result in a response that +is deemed necessary and appropriate to the circumstances. The project team is +obligated to maintain confidentiality with regard to the reporter of an incident. +Further details of specific enforcement policies may be posted separately. + +Project maintainers who do not follow or enforce the Code of Conduct in good +faith may face temporary or permanent repercussions as determined by other +members of the project's leadership. + +## Attribution + +This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4, +available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html + +[homepage]: https://www.contributor-covenant.org + +For answers to common questions about this code of conduct, see +https://www.contributor-covenant.org/faq diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..b2d3be8 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,40 @@ +# Contributing to gocron + +Thank you for coming to contribute to gocron! We welcome new ideas, PRs and general feedback. + +## Reporting Bugs + +If you find a bug then please let the project know by opening an issue after doing the following: + +- Do a quick search of the existing issues to make sure the bug isn't already reported +- Try and make a minimal list of steps that can reliably reproduce the bug you are experiencing +- Collect as much information as you can to help identify what the issue is (project version, configuration files, etc) + +## Suggesting Enhancements + +If you have a use case that you don't see a way to support yet, we would welcome the feedback in an issue. Before opening the issue, please consider: + +- Is this a common use case? +- Is it simple to understand? + +You can help us out by doing the following before raising a new issue: + +- Check that the feature hasn't been requested already by searching existing issues +- Try and reduce your enhancement into a single, concise and deliverable request, rather than a general idea +- Explain your own use cases as the basis of the request + +## Adding Features + +Pull requests are always welcome. However, before going through the trouble of implementing a change it's worth creating a bug or feature request issue. +This allows us to discuss the changes and make sure they are a good fit for the project. + +Please always make sure a pull request has been: + +- Unit tested with `make test` +- Linted with `make lint` +- Vetted with `make vet` +- Formatted with `make fmt` or validated with `make check-fmt` + +## Writing Tests + +Tests should follow the [table driven test pattern](https://dave.cheney.net/2013/06/09/writing-table-driven-tests-in-go). See other tests in the code base for additional examples. diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..3357d57 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2014, 辣椒面 + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..1e16aef --- /dev/null +++ b/Makefile @@ -0,0 +1,13 @@ +.PHONY: fmt check-fmt lint vet test + +GO_PKGS := $(shell go list -f {{.Dir}} ./...) + +fmt: + @go list -f {{.Dir}} ./... | xargs -I{} gofmt -w -s {} + +lint: + @grep "^func " example_test.go | sort -c + @golangci-lint run + +test: + @go test -race -v $(GO_FLAGS) -count=1 $(GO_PKGS) diff --git a/README.md b/README.md new file mode 100644 index 0000000..83a3516 --- /dev/null +++ b/README.md @@ -0,0 +1,121 @@ +# gocron: A Golang Job Scheduling Package. + +[![CI State](https://github.com/go-co-op/gocron/actions/workflows/go_test.yml/badge.svg?branch=v2&event=push)](https://github.com/go-co-op/gocron/actions) +![Go Report Card](https://goreportcard.com/badge/github.com/go-co-op/gocron) [![Go Doc](https://godoc.org/github.com/go-co-op/gocron/v2?status.svg)](https://pkg.go.dev/github.com/go-co-op/gocron/v2) + +gocron is a job scheduling package which lets you run Go functions at pre-determined intervals. + +If you want to chat, you can find us on Slack at +[](https://gophers.slack.com/archives/CQ7T0T1FW) + +## Concepts + +- **Job**: The encapsulates a "task", which is made up of a go func and any function parameters, and then + provides the scheduler with the time the job should be scheduled to run. +- **Executor**: The executor, calls the "task" function and manages the complexities of different job + execution timing (e.g. singletons that shouldn't overrun each other, limiting the max number of jobs running) +- **Scheduler**: The scheduler keeps track of all the jobs and sends each job to the executor when + it is ready to be run. + +## Quick Start + +``` +go get github.com/go-co-op/gocron/v2 +``` + +```golang +package main + +import ( + "fmt" + "time" + + "github.com/go-co-op/gocron/v2" +) + +func main() { + // create a scheduler + s, err := gocron.NewScheduler() + if err != nil { + // handle error + } + + // add a job to the scheduler + j, err := s.NewJob( + gocron.DurationJob( + 10*time.Second, + ), + gocron.NewTask( + func(a string, b int) { + // do things + }, + "hello", + 1, + ), + ) + if err != nil { + // handle error + } + // each job has a unique id + fmt.Println(j.ID()) + + // start the scheduler + s.Start() + + // when you're done, shut it down + err = s.Shutdown() + if err != nil { + // handle error + } +} +``` + +## Features + +- **Job types**: Jobs can be run at various intervals. + - [**Duration**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#DurationJob): + Jobs can be run at a fixed `time.Duration`. + - [**Random duration**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#DurationRandomJob): + Jobs can be run at a random `time.Duration` between a min and max. + - [**Cron**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#CronJob): + Jobs can be run using a crontab. + - [**Daily**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#DailyJob): + Jobs can be run every x days at specific times. + - [**Weekly**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WeeklyJob): + Jobs can be run every x weeks on specific days of the week and at specific times. + - [**Monthly**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#MonthlyJob): + Jobs can be run every x months on specific days of the month and at specific times. +- **Limited Concurrency**: Jobs can be limited individually or across the entire scheduler. + - [**Per job limiting with singleton mode**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithSingletonMode): + Jobs can be limited to a single concurrent execution that either reschedules (skips overlapping executions) + or queues (waits for the previous execution to finish). + - [**Per scheduler limiting with limit mode**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithLimitConcurrentJobs): + Jobs can be limited to a certain number of concurrent executions across the entire scheduler + using either reschedule (skip when the limit is met) or queue (jobs are added to a queue to + wait for the limit to be available). +- **Distributed instances of gocron**: Multiple instances of gocron can be run. + - [**Elector**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithDistributedElector): + An elector can be used to elect a single instance of gocron to run as the primary with the + other instances checking to see if a new leader needs to be elected. +- **Events**: Job events can trigger actions. + - [**Listeners**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithEventListeners): + [Event listeners](https://pkg.go.dev/github.com/go-co-op/gocron/v2#EventListener) + can be added to a job or all jobs in the + [scheduler](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithGlobalJobOptions) + to listen for job events and trigger actions. +- **Options**: Many job and scheduler options are available + - [**Job options**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#JobOption): + Job options can be set when creating a job using `NewJob`. + - [**Global job options**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithGlobalJobOptions): + Global job options can be set when creating a scheduler using `NewScheduler`. + - [**Scheduler options**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#SchedulerOption): + Scheduler options can be set when creating a scheduler using `NewScheduler`. + +## Supporters + +[Jetbrains](https://www.jetbrains.com/?from=gocron) supports this project with Intellij licenses. +We appreciate their support for free and open source software! + +## Star History + +[![Star History Chart](https://api.star-history.com/svg?repos=go-co-op/gocron&type=Date)](https://star-history.com/#go-co-op/gocron&Date) diff --git a/SECURITY.md b/SECURITY.md new file mode 100644 index 0000000..6a97ada --- /dev/null +++ b/SECURITY.md @@ -0,0 +1,15 @@ +# Security Policy + +## Supported Versions + +The current plan is to maintain version 1 as long as possible incorporating any necessary security patches. + +| Version | Supported | +| ------- | ------------------ | +| 1.x.x | :white_check_mark: | + +## Reporting a Vulnerability + +Vulnerabilities can be reported by [opening an issue](https://github.com/go-co-op/gocron/issues/new/choose) or reaching out on Slack: [](https://gophers.slack.com/archives/CQ7T0T1FW) + +We will do our best to addrerss any vulnerabilities in an expeditious manner. diff --git a/distributed.go b/distributed.go new file mode 100644 index 0000000..aef77a0 --- /dev/null +++ b/distributed.go @@ -0,0 +1,11 @@ +package gocron + +import "context" + +// Elector determines the leader from instances asking to be the leader. Only +// the leader runs jobs. If the leader goes down, a new leader will be elected. +type Elector interface { + // IsLeader should return nil if the job should be scheduled by the instance + // making the request and an error if the job should not be scheduled. + IsLeader(context.Context) error +} diff --git a/errors.go b/errors.go new file mode 100644 index 0000000..a331982 --- /dev/null +++ b/errors.go @@ -0,0 +1,45 @@ +package gocron + +import "fmt" + +// Public error definitions +var ( + ErrCronJobParse = fmt.Errorf("gocron: CronJob: crontab parse failure") + ErrDailyJobAtTimeNil = fmt.Errorf("gocron: DailyJob: atTime within atTimes must not be nil") + ErrDailyJobAtTimesNil = fmt.Errorf("gocron: DailyJob: atTimes must not be nil") + ErrDailyJobHours = fmt.Errorf("gocron: DailyJob: atTimes hours must be between 0 and 23 inclusive") + ErrDailyJobMinutesSeconds = fmt.Errorf("gocron: DailyJob: atTimes minutes and seconds must be between 0 and 59 inclusive") + ErrDurationRandomJobMinMax = fmt.Errorf("gocron: DurationRandomJob: minimum duration must be less than maximum duration") + ErrEventListenerFuncNil = fmt.Errorf("gocron: eventListenerFunc must not be nil") + ErrJobNotFound = fmt.Errorf("gocron: job not found") + ErrMonthlyJobDays = fmt.Errorf("gocron: MonthlyJob: daysOfTheMonth must be between 31 and -31 inclusive, and not 0") + ErrMonthlyJobAtTimeNil = fmt.Errorf("gocron: MonthlyJob: atTime within atTimes must not be nil") + ErrMonthlyJobAtTimesNil = fmt.Errorf("gocron: MonthlyJob: atTimes must not be nil") + ErrMonthlyJobDaysNil = fmt.Errorf("gocron: MonthlyJob: daysOfTheMonth must not be nil") + ErrMonthlyJobHours = fmt.Errorf("gocron: MonthlyJob: atTimes hours must be between 0 and 23 inclusive") + ErrMonthlyJobMinutesSeconds = fmt.Errorf("gocron: MonthlyJob: atTimes minutes and seconds must be between 0 and 59 inclusive") + ErrNewJobTask = fmt.Errorf("gocron: NewJob: Task.Function must be of kind reflect.Func") + ErrNewJobTaskNil = fmt.Errorf("gocron: NewJob: Task must not be nil") + ErrStopExecutorTimedOut = fmt.Errorf("gocron: timed out waiting for executor to stop") + ErrStopJobsTimedOut = fmt.Errorf("gocron: timed out waiting for jobs to finish") + ErrStopSchedulerTimedOut = fmt.Errorf("gocron: timed out waiting for scheduler to stop") + ErrWeeklyJobAtTimeNil = fmt.Errorf("gocron: WeeklyJob: atTime within atTimes must not be nil") + ErrWeeklyJobAtTimesNil = fmt.Errorf("gocron: WeeklyJob: atTimes must not be nil") + ErrWeeklyJobDaysOfTheWeekNil = fmt.Errorf("gocron: WeeklyJob: daysOfTheWeek must not be nil") + ErrWeeklyJobHours = fmt.Errorf("gocron: WeeklyJob: atTimes hours must be between 0 and 23 inclusive") + ErrWeeklyJobMinutesSeconds = fmt.Errorf("gocron: WeeklyJob: atTimes minutes and seconds must be between 0 and 59 inclusive") + ErrWithDistributedElector = fmt.Errorf("gocron: WithDistributedElector: elector must not be nil") + ErrWithClockNil = fmt.Errorf("gocron: WithClock: clock must not be nil") + ErrWithLocationNil = fmt.Errorf("gocron: WithLocation: location must not be nil") + ErrWithLoggerNil = fmt.Errorf("gocron: WithLogger: logger must not be nil") + ErrWithNameEmpty = fmt.Errorf("gocron: WithName: name must not be empty") + ErrWithStartDateTimePast = fmt.Errorf("gocron: WithStartDateTime: start must not be in the past") +) + +// internal errors +var ( + errAtTimeNil = fmt.Errorf("errAtTimeNil") + errAtTimesNil = fmt.Errorf("errAtTimesNil") + errAtTimeHours = fmt.Errorf("errAtTimeHours") + errAtTimeMinSec = fmt.Errorf("errAtTimeMinSec") +) diff --git a/example_test.go b/example_test.go new file mode 100644 index 0000000..e94efbc --- /dev/null +++ b/example_test.go @@ -0,0 +1,654 @@ +package gocron_test + +import ( + "fmt" + "log/slog" + "sync" + "time" + + . "github.com/go-co-op/gocron/v2" // nolint:revive + "github.com/google/uuid" + "github.com/jonboulle/clockwork" +) + +func ExampleAfterJobRuns() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func() {}, + ), + WithEventListeners( + AfterJobRuns( + func(jobID uuid.UUID) { + // do something after the job completes + }, + ), + ), + ) +} + +func ExampleAfterJobRunsWithError() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func() {}, + ), + WithEventListeners( + AfterJobRunsWithError( + func(jobID uuid.UUID, err error) { + // do something when the job returns an error + }, + ), + ), + ) +} + +func ExampleBeforeJobRuns() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func() {}, + ), + WithEventListeners( + BeforeJobRuns( + func(jobID uuid.UUID) { + // do something immediately before the job is run + }, + ), + ), + ) +} + +func ExampleCronJob() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + CronJob( + // standard cron tab parsing + "1 * * * *", + false, + ), + NewTask( + func() {}, + ), + ) + _, _ = s.NewJob( + CronJob( + // optionally include seconds as the first field + "* 1 * * * *", + true, + ), + NewTask( + func() {}, + ), + ) +} + +func ExampleDailyJob() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + DailyJob( + 1, + NewAtTimes( + NewAtTime(10, 30, 0), + NewAtTime(14, 0, 0), + ), + ), + NewTask( + func(a, b string) {}, + "a", + "b", + ), + ) +} + +func ExampleDurationJob() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + DurationJob( + time.Second*5, + ), + NewTask( + func() {}, + ), + ) +} + +func ExampleDurationRandomJob() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + DurationRandomJob( + time.Second, + 5*time.Second, + ), + NewTask( + func() {}, + ), + ) +} + +func ExampleJob_ID() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + j, _ := s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func() {}, + ), + ) + + fmt.Println(j.ID()) +} + +func ExampleJob_LastRun() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + j, _ := s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func() {}, + ), + ) + + fmt.Println(j.LastRun()) +} + +func ExampleJob_NextRun() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + j, _ := s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func() {}, + ), + ) + + fmt.Println(j.NextRun()) +} + +func ExampleMonthlyJob() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + MonthlyJob( + 1, + NewDaysOfTheMonth(3, -5, -1), + NewAtTimes( + NewAtTime(10, 30, 0), + NewAtTime(11, 15, 0), + ), + ), + NewTask( + func() {}, + ), + ) +} + +func ExampleNewScheduler() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + fmt.Println(s.Jobs()) +} + +func ExampleScheduler_NewJob() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + j, err := s.NewJob( + DurationJob( + 10*time.Second, + ), + NewTask( + func() {}, + ), + ) + if err != nil { + panic(err) + } + fmt.Println(j.ID()) +} + +func ExampleScheduler_RemoveByTags() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func() {}, + ), + WithTags("tag1"), + ) + _, _ = s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func() {}, + ), + WithTags("tag2"), + ) + fmt.Println(len(s.Jobs())) + + time.Sleep(20 * time.Millisecond) + + s.RemoveByTags("tag1", "tag2") + + fmt.Println(len(s.Jobs())) + // Output: + // 2 + // 0 +} + +func ExampleScheduler_RemoveJob() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + j, _ := s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func() {}, + ), + ) + + fmt.Println(len(s.Jobs())) + time.Sleep(20 * time.Millisecond) + + _ = s.RemoveJob(j.ID()) + + fmt.Println(len(s.Jobs())) + // Output: + // 1 + // 0 +} + +func ExampleScheduler_Start() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + CronJob( + "* * * * *", + false, + ), + NewTask( + func() {}, + ), + ) + + s.Start() +} + +func ExampleScheduler_StopJobs() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + CronJob( + "* * * * *", + false, + ), + NewTask( + func() {}, + ), + ) + + s.Start() + + _ = s.StopJobs() +} + +func ExampleScheduler_Update() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + j, _ := s.NewJob( + CronJob( + "* * * * *", + false, + ), + NewTask( + func() {}, + ), + ) + + s.Start() + + // after some time, need to change the job + + j, _ = s.Update( + j.ID(), + DurationJob( + 5*time.Second, + ), + NewTask( + func() {}, + ), + ) +} + +func ExampleWeeklyJob() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + WeeklyJob( + 2, + NewWeekdays(time.Tuesday, time.Wednesday, time.Saturday), + NewAtTimes( + NewAtTime(1, 30, 0), + NewAtTime(12, 0, 30), + ), + ), + NewTask( + func() {}, + ), + ) +} + +func ExampleWithClock() { + fakeClock := clockwork.NewFakeClock() + s, _ := NewScheduler( + WithClock(fakeClock), + ) + var wg sync.WaitGroup + wg.Add(1) + _, _ = s.NewJob( + DurationJob( + time.Second*5, + ), + NewTask( + func(one string, two int) { + fmt.Printf("%s, %d\n", one, two) + wg.Done() + }, + "one", 2, + ), + ) + s.Start() + fakeClock.BlockUntil(1) + fakeClock.Advance(time.Second * 5) + wg.Wait() + _ = s.StopJobs() + // Output: + // one, 2 +} + +func ExampleWithDistributedElector() { + //var _ Elector = (*myElector)(nil) + // + //type myElector struct{} + // + //func (m myElector) IsLeader(_ context.Context) error { + // return nil + //} + // + //elector := myElector{} + // + //_, _ = NewScheduler( + // WithDistributedElector(elector), + //) +} + +func ExampleWithEventListeners() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func() {}, + ), + WithEventListeners( + AfterJobRuns( + func(jobID uuid.UUID) { + // do something after the job completes + }, + ), + AfterJobRunsWithError( + func(jobID uuid.UUID, err error) { + // do something when the job returns an error + }, + ), + BeforeJobRuns( + func(jobID uuid.UUID) { + // do something immediately before the job is run + }, + ), + ), + ) +} + +func ExampleWithGlobalJobOptions() { + s, _ := NewScheduler( + WithGlobalJobOptions( + WithTags("tag1", "tag2", "tag3"), + ), + ) + + j, _ := s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func(one string, two int) { + fmt.Printf("%s, %d", one, two) + }, + "one", 2, + ), + ) + // The job will have the globally applied tags + fmt.Println(j.Tags()) + + s2, _ := NewScheduler( + WithGlobalJobOptions( + WithTags("tag1", "tag2", "tag3"), + ), + ) + j2, _ := s2.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func(one string, two int) { + fmt.Printf("%s, %d", one, two) + }, + "one", 2, + ), + WithTags("tag4", "tag5", "tag6"), + ) + // The job will have the tags set specifically on the job + // overriding those set globally by the scheduler + fmt.Println(j2.Tags()) + // Output: + // [tag1 tag2 tag3] + // [tag4 tag5 tag6] +} + +func ExampleWithLimitConcurrentJobs() { + _, _ = NewScheduler( + WithLimitConcurrentJobs( + 1, + LimitModeReschedule, + ), + ) +} + +func ExampleWithLimitedRuns() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + DurationJob( + time.Millisecond, + ), + NewTask( + func(one string, two int) { + fmt.Printf("%s, %d\n", one, two) + }, + "one", 2, + ), + WithLimitedRuns(1), + ) + s.Start() + + time.Sleep(100 * time.Millisecond) + fmt.Printf("no jobs in scheduler: %v\n", s.Jobs()) + _ = s.StopJobs() + // Output: + // one, 2 + // no jobs in scheduler: [] +} + +func ExampleWithLocation() { + location, _ := time.LoadLocation("Asia/Kolkata") + + _, _ = NewScheduler( + WithLocation(location), + ) +} + +func ExampleWithLogger() { + _, _ = NewScheduler( + WithLogger( + NewJSONSlogLogger(slog.LevelInfo), + ), + ) +} + +func ExampleWithName() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + j, _ := s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func(one string, two int) { + fmt.Printf("%s, %d", one, two) + }, + "one", 2, + ), + WithName("job 1"), + ) + fmt.Println(j.Name()) + // Output: + // job 1 +} + +func ExampleWithSingletonMode() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func() { + // this job will skip half it's executions + // and effectively run every 2 seconds + time.Sleep(1500 * time.Second) + }, + ), + WithSingletonMode(LimitModeReschedule), + ) +} + +func ExampleWithStartAt() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + start := time.Date(9999, 9, 9, 9, 9, 9, 9, time.UTC) + + j, _ := s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func(one string, two int) { + fmt.Printf("%s, %d", one, two) + }, + "one", 2, + ), + WithStartAt( + WithStartDateTime(start), + ), + ) + s.Start() + time.Sleep(20 * time.Millisecond) + + next, _ := j.NextRun() + fmt.Println(next) + + _ = s.StopJobs() + // Output: + // 9999-09-09 09:09:09.000000009 +0000 UTC +} + +func ExampleWithStopTimeout() { + _, _ = NewScheduler( + WithStopTimeout(time.Second * 5), + ) +} + +func ExampleWithTags() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + j, _ := s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func(one string, two int) { + fmt.Printf("%s, %d", one, two) + }, + "one", 2, + ), + WithTags("tag1", "tag2", "tag3"), + ) + fmt.Println(j.Tags()) + // Output: + // [tag1 tag2 tag3] +} diff --git a/executor.go b/executor.go new file mode 100644 index 0000000..ea83b7b --- /dev/null +++ b/executor.go @@ -0,0 +1,355 @@ +package gocron + +import ( + "context" + "log" + "strconv" + "sync" + "time" + + "github.com/google/uuid" +) + +type executor struct { + ctx context.Context + cancel context.CancelFunc + logger Logger + stopCh chan struct{} + jobsIDsIn chan uuid.UUID + jobIDsOut chan uuid.UUID + jobOutRequest chan jobOutRequest + stopTimeout time.Duration + done chan error + singletonRunners map[uuid.UUID]singletonRunner + limitMode *limitModeConfig + elector Elector +} + +type singletonRunner struct { + in chan uuid.UUID + rescheduleLimiter chan struct{} +} + +type limitModeConfig struct { + started bool + mode LimitMode + limit uint + rescheduleLimiter chan struct{} + in chan uuid.UUID +} + +func (e *executor) start() { + e.logger.Debug("executor started") + + // creating the executor's context here as the executor + // is the only goroutine that should access this context + // any other uses within the executor should create a context + // using the executor context as parent. + e.ctx, e.cancel = context.WithCancel(context.Background()) + + // the standardJobsWg tracks + standardJobsWg := waitGroupWithMutex{ + wg: sync.WaitGroup{}, + mu: sync.Mutex{}, + } + + singletonJobsWg := waitGroupWithMutex{ + wg: sync.WaitGroup{}, + mu: sync.Mutex{}, + } + + limitModeJobsWg := waitGroupWithMutex{ + wg: sync.WaitGroup{}, + mu: sync.Mutex{}, + } + + // start the for leap that is the executor + // selecting on channels for work to do + for { + select { + // job ids in are sent from 1 of 2 places: + // 1. the scheduler sends directly when jobs + // are run immediately. + // 2. sent from time.AfterFuncs in which job schedules + // are spun up by the scheduler + case id := <-e.jobsIDsIn: + select { + case <-e.stopCh: + e.stop(&standardJobsWg, &singletonJobsWg, &limitModeJobsWg) + return + default: + } + // this context is used to handle cancellation of the executor + // on requests for a job to the scheduler via requestJobCtx + ctx, cancel := context.WithCancel(e.ctx) + + if e.limitMode != nil && !e.limitMode.started { + // check if we are already running the limit mode runners + // if not, spin up the required number i.e. limit! + e.limitMode.started = true + for i := e.limitMode.limit; i > 0; i-- { + limitModeJobsWg.Add(1) + go e.limitModeRunner("limitMode-"+strconv.Itoa(int(i)), e.limitMode.in, &limitModeJobsWg, e.limitMode.mode, e.limitMode.rescheduleLimiter) + } + } + + // spin off into a goroutine to unblock the executor and + // allow for processing for more work + go func() { + // make sure to cancel the above context per the docs + // // Canceling this context releases resources associated with it, so code should + // // call cancel as soon as the operations running in this Context complete. + defer cancel() + + // check for limit mode - this spins up a separate runner which handles + // limiting the total number of concurrently running jobs + if e.limitMode != nil { + if e.limitMode.mode == LimitModeReschedule { + select { + // rescheduleLimiter is a channel the size of the limit + // this blocks publishing to the channel and keeps + // the executor from building up a waiting queue + // and forces rescheduling + case e.limitMode.rescheduleLimiter <- struct{}{}: + e.limitMode.in <- id + default: + // all runners are busy, reschedule the work for later + // which means we just skip it here and do nothing + // TODO when metrics are added, this should increment a rescheduled metric + select { + case e.jobIDsOut <- id: + default: + } + } + } else { + // since we're not using LimitModeReschedule, but instead using LimitModeWait + // we do want to queue up the work to the limit mode runners and allow them + // to work through the channel backlog. A hard limit of 1000 is in place + // at which point this call would block. + // TODO when metrics are added, this should increment a wait metric + e.limitMode.in <- id + } + } else { + // no limit mode, so we're either running a regular job or + // a job with a singleton mode + // + // get the job, so we can figure out what kind it is and how + // to execute it + j := requestJobCtx(ctx, id, e.jobOutRequest) + if j == nil { + // safety check as it'd be strange bug if this occurred + // TODO add a log line here + return + } + if j.singletonMode { + // for singleton mode, get the existing runner for the job + // or spin up a new one + runner, ok := e.singletonRunners[id] + if !ok { + runner.in = make(chan uuid.UUID, 1000) + if j.singletonLimitMode == LimitModeReschedule { + runner.rescheduleLimiter = make(chan struct{}, 1) + } + e.singletonRunners[id] = runner + singletonJobsWg.Add(1) + go e.limitModeRunner("singleton-"+id.String(), runner.in, &singletonJobsWg, j.singletonLimitMode, runner.rescheduleLimiter) + } + + if j.singletonLimitMode == LimitModeReschedule { + // reschedule mode uses the limiter channel to check + // for a running job and reschedules if the channel is full. + select { + case runner.rescheduleLimiter <- struct{}{}: + runner.in <- id + default: + // runner is busy, reschedule the work for later + // which means we just skip it here and do nothing + // TODO when metrics are added, this should increment a rescheduled metric + select { + case e.jobIDsOut <- id: + default: + } + } + } else { + // wait mode, fill up that queue (buffered channel, so it's ok) + runner.in <- id + } + } else { + select { + case <-e.stopCh: + e.stop(&standardJobsWg, &singletonJobsWg, &limitModeJobsWg) + return + default: + } + // we've gotten to the basic / standard jobs -- + // the ones without anything special that just want + // to be run. Add to the WaitGroup so that + // stopping or shutting down can wait for the jobs to + // complete. + standardJobsWg.Add(1) + go func(j internalJob) { + e.runJob(j) + standardJobsWg.Done() + }(*j) + } + } + }() + case <-e.stopCh: + e.stop(&standardJobsWg, &singletonJobsWg, &limitModeJobsWg) + return + } + } +} + +func (e *executor) stop(standardJobsWg, singletonJobsWg, limitModeJobsWg *waitGroupWithMutex) { + e.logger.Debug("stopping executor") + // we've been asked to stop. This is either because the scheduler has been told + // to stop all jobs or the scheduler has been asked to completely shutdown. + // + // cancel tells all the functions to stop their work and send in a done response + e.cancel() + + // the wait for job channels are used to report back whether we successfully waited + // for all jobs to complete or if we hit the configured timeout. + waitForJobs := make(chan struct{}, 1) + waitForSingletons := make(chan struct{}, 1) + waitForLimitMode := make(chan struct{}, 1) + + // the waiter context is used to cancel the functions waiting on jobs. + // this is done to avoid goroutine leaks. + waiterCtx, waiterCancel := context.WithCancel(context.Background()) + + // wait for standard jobs to complete + go func() { + e.logger.Debug("waiting for standard jobs to complete") + go func() { + // this is done in a separate goroutine, so we aren't + // blocked by the WaitGroup's Wait call in the event + // that the waiter context is cancelled. + // This particular goroutine could leak in the event that + // some long-running standard job doesn't complete. + standardJobsWg.Wait() + e.logger.Debug("standard jobs completed") + waitForJobs <- struct{}{} + }() + <-waiterCtx.Done() + }() + + // wait for per job singleton limit mode runner jobs to complete + go func() { + e.logger.Debug("waiting for singleton jobs to complete") + go func() { + singletonJobsWg.Wait() + e.logger.Debug("singleton jobs completed") + waitForSingletons <- struct{}{} + }() + <-waiterCtx.Done() + }() + + // wait for limit mode runners to complete + go func() { + e.logger.Debug("waiting for limit mode jobs to complete") + go func() { + limitModeJobsWg.Wait() + e.logger.Debug("limitMode jobs completed") + waitForLimitMode <- struct{}{} + }() + <-waiterCtx.Done() + }() + + // now either wait for all the jobs to complete, + // or hit the timeout. + var count int + timeout := time.Now().Add(e.stopTimeout) + for time.Now().Before(timeout) && count < 3 { + select { + case <-waitForJobs: + count++ + case <-waitForSingletons: + count++ + // emptying the singleton runner map + // as we'll need to spin up new runners + // in the event that the scheduler is started again + e.singletonRunners = make(map[uuid.UUID]singletonRunner) + case <-waitForLimitMode: + count++ + default: + } + } + if count < 3 { + e.done <- ErrStopJobsTimedOut + e.logger.Debug("executor stopped - timed out") + } else { + e.done <- nil + e.logger.Debug("executor stopped") + } + waiterCancel() +} + +func (e *executor) limitModeRunner(name string, in chan uuid.UUID, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) { + e.logger.Debug("limitModeRunner starting", "name", name) + for { + select { + case id := <-in: + log.Println("limitModeRunner got job", id) + select { + case <-e.ctx.Done(): + e.logger.Debug("limitModeRunner shutting down", "name", name) + wg.Done() + return + default: + } + + ctx, cancel := context.WithCancel(e.ctx) + j := requestJobCtx(ctx, id, e.jobOutRequest) + if j != nil { + log.Println("limitModeRunner running job", id) + e.runJob(*j) + } + cancel() + log.Println("limitModeRunner finished job", id) + + // remove the limiter block to allow another job to be scheduled + if limitMode == LimitModeReschedule { + select { + case <-rescheduleLimiter: + default: + } + } + log.Println("limitModeRunner job done", id) + case <-e.ctx.Done(): + e.logger.Debug("limitModeRunner shutting down", "name", name) + wg.Done() + return + } + } +} + +func (e *executor) runJob(j internalJob) { + select { + case <-e.ctx.Done(): + case <-j.ctx.Done(): + default: + if e.elector != nil { + if err := e.elector.IsLeader(j.ctx); err != nil { + return + } + } + _ = callJobFuncWithParams(j.beforeJobRuns, j.id) + + select { + case <-e.ctx.Done(): + return + case <-j.ctx.Done(): + return + case e.jobIDsOut <- j.id: + } + + err := callJobFuncWithParams(j.function, j.parameters...) + if err != nil { + _ = callJobFuncWithParams(j.afterJobRunsWithError, j.id, err) + } else { + _ = callJobFuncWithParams(j.afterJobRuns, j.id) + } + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..2f41e00 --- /dev/null +++ b/go.mod @@ -0,0 +1,21 @@ +module github.com/go-co-op/gocron/v2 + +go 1.21 + +require ( + github.com/google/uuid v1.4.0 + github.com/jonboulle/clockwork v0.4.0 + github.com/robfig/cron/v3 v3.0.1 + github.com/stretchr/testify v1.8.4 + go.uber.org/goleak v1.3.0 + golang.org/x/exp v0.0.0-20231006140011-7918f672742d +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..529bbf4 --- /dev/null +++ b/go.sum @@ -0,0 +1,28 @@ +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4= +github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= +golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/job.go b/job.go new file mode 100644 index 0000000..47aece8 --- /dev/null +++ b/job.go @@ -0,0 +1,779 @@ +package gocron + +import ( + "context" + "errors" + "fmt" + "math/rand" + "slices" + "strings" + "time" + + "github.com/google/uuid" + "github.com/jonboulle/clockwork" + "github.com/robfig/cron/v3" +) + +// internalJob stores the information needed by the scheduler +// to manage scheduling, starting and stopping the job +type internalJob struct { + ctx context.Context + cancel context.CancelFunc + id uuid.UUID + name string + tags []string + jobSchedule + lastRun, nextRun time.Time + function any + parameters []any + timer clockwork.Timer + singletonMode bool + singletonLimitMode LimitMode + limitRunsTo *limitRunsTo + startTime time.Time + startImmediately bool + // event listeners + afterJobRuns func(jobID uuid.UUID) + beforeJobRuns func(jobID uuid.UUID) + afterJobRunsWithError func(jobID uuid.UUID, err error) +} + +// stop is used to stop the job's timer and cancel the context +// stopping the timer is critical for cleaning up jobs that are +// sleeping in a time.AfterFunc timer when the job is being stopped. +// cancelling the context keeps the executor from continuing to try +// and run the job. +func (j *internalJob) stop() { + if j.timer != nil { + j.timer.Stop() + } + j.cancel() +} + +// task stores the function and parameters +// that are actually run when the job is executed. +type task struct { + function any + parameters []any +} + +// Task defines a function that returns the task +// function and parameters. +type Task func() task + +// NewTask provides the job's task function and parameters. +func NewTask(function any, parameters ...any) Task { + return func() task { + return task{ + function: function, + parameters: parameters, + } + } +} + +// limitRunsTo is used for managing the number of runs +// when the user only wants the job to run a certain +// number of times and then be removed from the scheduler. +type limitRunsTo struct { + limit uint + runCount uint +} + +// ----------------------------------------------- +// ----------------------------------------------- +// --------------- Job Variants ------------------ +// ----------------------------------------------- +// ----------------------------------------------- + +// JobDefinition defines the interface that must be +// implemented to create a job from the definition. +type JobDefinition interface { + setup(*internalJob, *time.Location) error +} + +var _ JobDefinition = (*cronJobDefinition)(nil) + +type cronJobDefinition struct { + crontab string + withSeconds bool +} + +func (c cronJobDefinition) setup(j *internalJob, location *time.Location) error { + var withLocation string + if strings.HasPrefix(c.crontab, "TZ=") || strings.HasPrefix(c.crontab, "CRON_TZ=") { + withLocation = c.crontab + } else { + // since the user didn't provide a timezone default to the location + // passed in by the scheduler. Default: time.Local + withLocation = fmt.Sprintf("CRON_TZ=%s %s", location.String(), c.crontab) + } + + var ( + cronSchedule cron.Schedule + err error + ) + + if c.withSeconds { + p := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor) + cronSchedule, err = p.Parse(withLocation) + } else { + cronSchedule, err = cron.ParseStandard(withLocation) + } + if err != nil { + return errors.Join(ErrCronJobParse, err) + } + + j.jobSchedule = &cronJob{cronSchedule: cronSchedule} + return nil +} + +// CronJob defines a new job using the crontab syntax: `* * * * *`. +// An optional 6th field can be used at the beginning if withSeconds +// is set to true: `* * * * * *`. +// The timezone can be set on the Scheduler using WithLocation, or in the +// crontab in the form `TZ=America/Chicago * * * * *` or +// `CRON_TZ=America/Chicago * * * * *` +func CronJob(crontab string, withSeconds bool) JobDefinition { + return cronJobDefinition{ + crontab: crontab, + withSeconds: withSeconds, + } +} + +var _ JobDefinition = (*durationJobDefinition)(nil) + +type durationJobDefinition struct { + duration time.Duration +} + +func (d durationJobDefinition) setup(j *internalJob, _ *time.Location) error { + j.jobSchedule = &durationJob{duration: d.duration} + return nil +} + +// DurationJob defines a new job using time.Duration +// for the interval. +func DurationJob(duration time.Duration) JobDefinition { + return durationJobDefinition{ + duration: duration, + } +} + +var _ JobDefinition = (*durationRandomJobDefinition)(nil) + +type durationRandomJobDefinition struct { + min, max time.Duration +} + +func (d durationRandomJobDefinition) setup(j *internalJob, _ *time.Location) error { + if d.min >= d.max { + return ErrDurationRandomJobMinMax + } + + j.jobSchedule = &durationRandomJob{ + min: d.min, + max: d.max, + rand: rand.New(rand.NewSource(time.Now().UnixNano())), // nolint:gosec + } + return nil +} + +// DurationRandomJob defines a new job that runs on a random interval +// between the min and max duration values provided. +// +// To achieve a similar behavior as tools that use a splay/jitter technique +// consider the median value as the baseline and the difference between the +// max-median or median-min as the splay/jitter. +// +// For example, if you want a job to run every 5 minutes, but want to add +// up to 1 min of jitter to the interval, you could use +// DurationRandomJob(4*time.Minute, 6*time.Minute) +func DurationRandomJob(minDuration, maxDuration time.Duration) JobDefinition { + return durationRandomJobDefinition{ + min: minDuration, + max: maxDuration, + } +} + +func DailyJob(interval uint, atTimes AtTimes) JobDefinition { + return dailyJobDefinition{ + interval: interval, + atTimes: atTimes, + } +} + +var _ JobDefinition = (*dailyJobDefinition)(nil) + +type dailyJobDefinition struct { + interval uint + atTimes AtTimes +} + +func (d dailyJobDefinition) setup(j *internalJob, location *time.Location) error { + atTimesDate, err := convertAtTimesToDateTime(d.atTimes, location) + switch { + case errors.Is(err, errAtTimesNil): + return ErrDailyJobAtTimesNil + case errors.Is(err, errAtTimeNil): + return ErrDailyJobAtTimeNil + case errors.Is(err, errAtTimeHours): + return ErrDailyJobHours + case errors.Is(err, errAtTimeMinSec): + return ErrDailyJobMinutesSeconds + } + + ds := dailyJob{ + interval: d.interval, + atTimes: atTimesDate, + } + j.jobSchedule = ds + return nil +} + +var _ JobDefinition = (*weeklyJobDefinition)(nil) + +type weeklyJobDefinition struct { + interval uint + daysOfTheWeek Weekdays + atTimes AtTimes +} + +func (w weeklyJobDefinition) setup(j *internalJob, location *time.Location) error { + var ws weeklyJob + ws.interval = w.interval + + if w.daysOfTheWeek == nil { + return ErrWeeklyJobDaysOfTheWeekNil + } + + daysOfTheWeek := w.daysOfTheWeek() + + slices.Sort(daysOfTheWeek) + + atTimesDate, err := convertAtTimesToDateTime(w.atTimes, location) + switch { + case errors.Is(err, errAtTimesNil): + return ErrWeeklyJobAtTimesNil + case errors.Is(err, errAtTimeNil): + return ErrWeeklyJobAtTimeNil + case errors.Is(err, errAtTimeHours): + return ErrWeeklyJobHours + case errors.Is(err, errAtTimeMinSec): + return ErrWeeklyJobMinutesSeconds + } + ws.atTimes = atTimesDate + + j.jobSchedule = ws + return nil +} + +type Weekdays func() []time.Weekday + +func NewWeekdays(weekday time.Weekday, weekdays ...time.Weekday) Weekdays { + return func() []time.Weekday { + return append(weekdays, weekday) + } +} + +func WeeklyJob(interval uint, daysOfTheWeek Weekdays, atTimes AtTimes) JobDefinition { + return weeklyJobDefinition{ + interval: interval, + daysOfTheWeek: daysOfTheWeek, + atTimes: atTimes, + } +} + +var _ JobDefinition = (*monthlyJobDefinition)(nil) + +type monthlyJobDefinition struct { + interval uint + daysOfTheMonth DaysOfTheMonth + atTimes AtTimes +} + +func (m monthlyJobDefinition) setup(j *internalJob, location *time.Location) error { + var ms monthlyJob + ms.interval = m.interval + + if m.daysOfTheMonth == nil { + return ErrMonthlyJobDaysNil + } + + var daysStart, daysEnd []int + for _, day := range m.daysOfTheMonth() { + if day > 31 || day == 0 || day < -31 { + return ErrMonthlyJobDays + } + if day > 0 { + daysStart = append(daysStart, day) + } else { + daysEnd = append(daysEnd, day) + } + } + daysStart = removeSliceDuplicatesInt(daysStart) + slices.Sort(daysStart) + ms.days = daysStart + + daysEnd = removeSliceDuplicatesInt(daysEnd) + slices.Sort(daysEnd) + ms.daysFromEnd = daysEnd + + atTimesDate, err := convertAtTimesToDateTime(m.atTimes, location) + switch { + case errors.Is(err, errAtTimesNil): + return ErrMonthlyJobAtTimesNil + case errors.Is(err, errAtTimeNil): + return ErrMonthlyJobAtTimeNil + case errors.Is(err, errAtTimeHours): + return ErrMonthlyJobHours + case errors.Is(err, errAtTimeMinSec): + return ErrMonthlyJobMinutesSeconds + } + ms.atTimes = atTimesDate + + j.jobSchedule = ms + return nil +} + +type days []int + +// DaysOfTheMonth defines a function that returns a list of days. +type DaysOfTheMonth func() days + +// NewDaysOfTheMonth provide the days of the month the job should +// run. The days can be positive 1 to 31 and/or negative -31 to -1. +// Negative values count backwards from the end of the month. +// For example: -1 == the last day of the month. +// +// -5 == 5 days before the end of the month. +func NewDaysOfTheMonth(day int, moreDays ...int) DaysOfTheMonth { + return func() days { + moreDays = append(moreDays, day) + return moreDays + } +} + +type atTime struct { + hours, minutes, seconds uint +} + +func (a atTime) time(location *time.Location) time.Time { + return time.Date(0, 0, 0, int(a.hours), int(a.minutes), int(a.seconds), 0, location) +} + +// AtTime defines a function that returns the internal atTime +type AtTime func() atTime + +// NewAtTime provide the hours, minutes and seconds at which +// the job should be run +func NewAtTime(hours, minutes, seconds uint) AtTime { + return func() atTime { + return atTime{hours: hours, minutes: minutes, seconds: seconds} + } +} + +// AtTimes define a list of AtTime +type AtTimes func() []AtTime + +// NewAtTimes provide the hours, minutes and seconds at which +// the job should be run +func NewAtTimes(atTime AtTime, atTimes ...AtTime) AtTimes { + return func() []AtTime { + atTimes = append(atTimes, atTime) + return atTimes + } +} + +// MonthlyJob runs the job on the interval of months, on the specific days of the month +// specified, and at the set times. Days of the month can be 1 to 31 or negative (-1 to -31), which +// count backwards from the end of the month. E.g. -1 is the last day of the month. +// +// If a day of the month is selected that does not exist in all months (e.g. 31st) +// any month that does not have that day will be skipped. +// +// By default, the job will start the next available day, considering the last run to be now, +// and the time and month based on the interval, days and times you input. +// This means, if you select an interval greater than 1, your job by default will run +// X (interval) months from now. +// You can use WithStartAt to tell the scheduler to start the job sooner. +// +// Carefully consider your configuration! +// - For example: an interval of 2 months on the 31st of each month, starting 12/31 +// would skip Feb, April, June, and next run would be in August. +func MonthlyJob(interval uint, daysOfTheMonth DaysOfTheMonth, atTimes AtTimes) JobDefinition { + return monthlyJobDefinition{ + interval: interval, + daysOfTheMonth: daysOfTheMonth, + atTimes: atTimes, + } +} + +// ----------------------------------------------- +// ----------------------------------------------- +// ----------------- Job Options ----------------- +// ----------------------------------------------- +// ----------------------------------------------- + +type JobOption func(*internalJob) error + +func WithEventListeners(eventListeners ...EventListener) JobOption { + return func(j *internalJob) error { + for _, eventListener := range eventListeners { + if err := eventListener(j); err != nil { + return err + } + } + return nil + } +} + +// WithLimitedRuns limits the number of executions of this job to n. +// Upon reaching the limit, the job is removed from the scheduler. +func WithLimitedRuns(limit uint) JobOption { + return func(j *internalJob) error { + j.limitRunsTo = &limitRunsTo{ + limit: limit, + runCount: 0, + } + return nil + } +} + +// WithName sets the name of the job. Name provides +// a human-readable identifier for the job. +func WithName(name string) JobOption { + // TODO use the name for metrics and future logging option + return func(j *internalJob) error { + if name == "" { + return ErrWithNameEmpty + } + j.name = name + return nil + } +} + +func WithSingletonMode(mode LimitMode) JobOption { + return func(j *internalJob) error { + j.singletonMode = true + j.singletonLimitMode = mode + return nil + } +} + +// WithStartAt sets the option for starting the job +func WithStartAt(option StartAtOption) JobOption { + return func(j *internalJob) error { + return option(j) + } +} + +// StartAtOption defines options for starting the job +type StartAtOption func(*internalJob) error + +// WithStartImmediately tells the scheduler to run the job immediately +// regardless of the type or schedule of job. After this immediate run +// the job is scheduled from this time based on the job definition. +func WithStartImmediately() StartAtOption { + return func(j *internalJob) error { + j.startImmediately = true + return nil + } +} + +// WithStartDateTime sets the first date & time at which the job should run. +func WithStartDateTime(start time.Time) StartAtOption { + return func(j *internalJob) error { + if start.IsZero() || start.Before(time.Now()) { + return ErrWithStartDateTimePast + } + j.startTime = start + return nil + } +} + +func WithTags(tags ...string) JobOption { + return func(j *internalJob) error { + j.tags = tags + return nil + } +} + +// ----------------------------------------------- +// ----------------------------------------------- +// ------------- Job Event Listeners ------------- +// ----------------------------------------------- +// ----------------------------------------------- + +type EventListener func(*internalJob) error + +func AfterJobRuns(eventListenerFunc func(jobID uuid.UUID)) EventListener { + return func(j *internalJob) error { + if eventListenerFunc == nil { + return ErrEventListenerFuncNil + } + j.afterJobRuns = eventListenerFunc + return nil + } +} + +func AfterJobRunsWithError(eventListenerFunc func(jobID uuid.UUID, err error)) EventListener { + return func(j *internalJob) error { + if eventListenerFunc == nil { + return ErrEventListenerFuncNil + } + j.afterJobRunsWithError = eventListenerFunc + return nil + } +} + +func BeforeJobRuns(eventListenerFunc func(jobID uuid.UUID)) EventListener { + return func(j *internalJob) error { + if eventListenerFunc == nil { + return ErrEventListenerFuncNil + } + j.beforeJobRuns = eventListenerFunc + return nil + } +} + +// ----------------------------------------------- +// ----------------------------------------------- +// ---------------- Job Schedules ---------------- +// ----------------------------------------------- +// ----------------------------------------------- + +type jobSchedule interface { + next(lastRun time.Time) time.Time +} + +var _ jobSchedule = (*cronJob)(nil) + +type cronJob struct { + cronSchedule cron.Schedule +} + +func (j *cronJob) next(lastRun time.Time) time.Time { + return j.cronSchedule.Next(lastRun) +} + +var _ jobSchedule = (*durationJob)(nil) + +type durationJob struct { + duration time.Duration +} + +func (j *durationJob) next(lastRun time.Time) time.Time { + return lastRun.Add(j.duration) +} + +var _ jobSchedule = (*durationRandomJob)(nil) + +type durationRandomJob struct { + min, max time.Duration + rand *rand.Rand +} + +func (j *durationRandomJob) next(lastRun time.Time) time.Time { + r := j.rand.Int63n(int64(j.max - j.min)) + return lastRun.Add(j.min + time.Duration(r)) +} + +var _ jobSchedule = (*dailyJob)(nil) + +type dailyJob struct { + interval uint + atTimes []time.Time +} + +func (d dailyJob) next(lastRun time.Time) time.Time { + next := d.nextDay(lastRun) + if !next.IsZero() { + return next + } + startNextDay := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day()+int(d.interval), 0, 0, 0, lastRun.Nanosecond(), lastRun.Location()) + return d.nextDay(startNextDay) +} + +func (d dailyJob) nextDay(lastRun time.Time) time.Time { + for _, at := range d.atTimes { + // sub the at time hour/min/sec onto the lastRun's values + // to use in checks to see if we've got our next run time + atDate := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day(), at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location()) + + if atDate.After(lastRun) { + // checking to see if it is after i.e. greater than, + // and not greater or equal as our lastRun day/time + // will be in the loop, and we don't want to select it again + return atDate + } + } + return time.Time{} +} + +var _ jobSchedule = (*weeklyJob)(nil) + +type weeklyJob struct { + interval uint + daysOfWeek []time.Weekday + atTimes []time.Time +} + +func (w weeklyJob) next(lastRun time.Time) time.Time { + next := w.nextWeekDayAtTime(lastRun) + if !next.IsZero() { + return next + } + + startOfTheNextIntervalWeek := (lastRun.Day() - int(lastRun.Weekday())) + int(w.interval*7) + from := time.Date(lastRun.Year(), lastRun.Month(), startOfTheNextIntervalWeek, 0, 0, 0, 0, lastRun.Location()) + return w.nextWeekDayAtTime(from) +} + +func (w weeklyJob) nextWeekDayAtTime(lastRun time.Time) time.Time { + for _, wd := range w.daysOfWeek { + // checking if we're on the same day or later in the same week + if wd >= lastRun.Weekday() { + // weekDayDiff is used to add the correct amount to the atDate day below + weekDayDiff := wd - lastRun.Weekday() + for _, at := range w.atTimes { + // sub the at time hour/min/sec onto the lastRun's values + // to use in checks to see if we've got our next run time + atDate := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day()+int(weekDayDiff), at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location()) + + if atDate.After(lastRun) { + // checking to see if it is after i.e. greater than, + // and not greater or equal as our lastRun day/time + // will be in the loop, and we don't want to select it again + return atDate + } + } + } + } + return time.Time{} +} + +var _ jobSchedule = (*monthlyJob)(nil) + +type monthlyJob struct { + interval uint + days []int + daysFromEnd []int + atTimes []time.Time +} + +func (m monthlyJob) next(lastRun time.Time) time.Time { + daysList := make([]int, len(m.days)) + copy(daysList, m.days) + firstDayNextMonth := time.Date(lastRun.Year(), lastRun.Month()+1, 1, 0, 0, 0, 0, lastRun.Location()) + for _, daySub := range m.daysFromEnd { + // getting a combined list of all the daysList and the negative daysList + // which count backwards from the first day of the next month + // -1 == the last day of the month + day := firstDayNextMonth.AddDate(0, 0, daySub).Day() + daysList = append(daysList, day) + } + slices.Sort(daysList) + + next := m.nextMonthDayAtTime(lastRun, daysList) + if !next.IsZero() { + return next + } + + from := time.Date(lastRun.Year(), lastRun.Month()+time.Month(m.interval), 1, 0, 0, 0, 0, lastRun.Location()) + for next.IsZero() { + next = m.nextMonthDayAtTime(from, daysList) + from = from.AddDate(0, int(m.interval), 0) + } + + return next +} + +func (m monthlyJob) nextMonthDayAtTime(lastRun time.Time, days []int) time.Time { + // find the next day in the month that should run and then check for an at time + for _, day := range days { + if day >= lastRun.Day() { + for _, at := range m.atTimes { + // sub the day, and the at time hour/min/sec onto the lastRun's values + // to use in checks to see if we've got our next run time + atDate := time.Date(lastRun.Year(), lastRun.Month(), day, at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location()) + + if atDate.Month() != lastRun.Month() { + // this check handles if we're setting a day not in the current month + // e.g. setting day 31 in Feb results in March 2nd + continue + } + + if atDate.After(lastRun) { + // checking to see if it is after i.e. greater than, + // and not greater or equal as our lastRun day/time + // will be in the loop, and we don't want to select it again + return atDate + } + } + continue + } + } + return time.Time{} +} + +// ----------------------------------------------- +// ----------------------------------------------- +// ---------------- Job Interface ---------------- +// ----------------------------------------------- +// ----------------------------------------------- + +// Job provides the available methods on the job +// available to the caller. +type Job interface { + ID() uuid.UUID + LastRun() (time.Time, error) + Name() string + NextRun() (time.Time, error) + Tags() []string +} + +var _ Job = (*job)(nil) + +// job is the internal struct that implements +// the public interface. This is used to avoid +// leaking information the caller never needs +// to have or tinker with. +type job struct { + id uuid.UUID + name string + tags []string + jobOutRequest chan jobOutRequest +} + +// ID returns the job's unique identifier. +func (j job) ID() uuid.UUID { + return j.id +} + +// LastRun returns the time of the job's last run +func (j job) LastRun() (time.Time, error) { + ij := requestJob(j.id, j.jobOutRequest) + if ij == nil || ij.id == uuid.Nil { + return time.Time{}, ErrJobNotFound + } + return ij.lastRun, nil +} + +// Name returns the name defined on the job. +func (j job) Name() string { + return j.name +} + +// NextRun returns the time of the job's next scheduled run. +func (j job) NextRun() (time.Time, error) { + ij := requestJob(j.id, j.jobOutRequest) + if ij == nil || ij.id == uuid.Nil { + return time.Time{}, ErrJobNotFound + } + return ij.nextRun, nil +} + +// Tags returns the job's string tags. +func (j job) Tags() []string { + return j.tags +} diff --git a/job_test.go b/job_test.go new file mode 100644 index 0000000..82a521c --- /dev/null +++ b/job_test.go @@ -0,0 +1,430 @@ +package gocron + +import ( + "math/rand" + "testing" + "time" + + "github.com/google/uuid" + "github.com/jonboulle/clockwork" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDurationJob_next(t *testing.T) { + tests := []time.Duration{ + time.Millisecond, + time.Second, + 100 * time.Second, + 1000 * time.Second, + 5 * time.Second, + 50 * time.Second, + time.Minute, + 5 * time.Minute, + 100 * time.Minute, + time.Hour, + 2 * time.Hour, + 100 * time.Hour, + 1000 * time.Hour, + } + + lastRun := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC) + + for _, duration := range tests { + t.Run(duration.String(), func(t *testing.T) { + d := durationJob{duration: duration} + next := d.next(lastRun) + expected := lastRun.Add(duration) + + assert.Equal(t, expected, next) + }) + } +} + +func TestDailyJob_next(t *testing.T) { + tests := []struct { + name string + interval uint + atTimes []time.Time + lastRun time.Time + expectedNextRun time.Time + expectedDurationToNextRun time.Duration + }{ + { + "daily multiple at times", + 1, + []time.Time{ + time.Date(0, 0, 0, 5, 30, 0, 0, time.UTC), + time.Date(0, 0, 0, 12, 30, 0, 0, time.UTC), + }, + time.Date(2000, 1, 1, 5, 30, 0, 0, time.UTC), + time.Date(2000, 1, 1, 12, 30, 0, 0, time.UTC), + 7 * time.Hour, + }, + { + "every 2 days multiple at times", + 2, + []time.Time{ + time.Date(0, 0, 0, 5, 30, 0, 0, time.UTC), + time.Date(0, 0, 0, 12, 30, 0, 0, time.UTC), + }, + time.Date(2000, 1, 1, 12, 30, 0, 0, time.UTC), + time.Date(2000, 1, 3, 5, 30, 0, 0, time.UTC), + 41 * time.Hour, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + d := dailyJob{ + interval: tt.interval, + atTimes: tt.atTimes, + } + + next := d.next(tt.lastRun) + assert.Equal(t, tt.expectedNextRun, next) + assert.Equal(t, tt.expectedDurationToNextRun, next.Sub(tt.lastRun)) + }) + } +} + +func TestWeeklyJob_next(t *testing.T) { + tests := []struct { + name string + interval uint + daysOfWeek []time.Weekday + atTimes []time.Time + lastRun time.Time + expectedNextRun time.Time + expectedDurationToNextRun time.Duration + }{ + { + "last run Monday, next run is Thursday", + 1, + []time.Weekday{time.Monday, time.Thursday}, + []time.Time{ + time.Date(0, 0, 0, 5, 30, 0, 0, time.UTC), + }, + time.Date(2000, 1, 3, 5, 30, 0, 0, time.UTC), + time.Date(2000, 1, 6, 5, 30, 0, 0, time.UTC), + 3 * 24 * time.Hour, + }, + { + "last run Thursday, next run is Monday", + 1, + []time.Weekday{time.Monday, time.Thursday}, + []time.Time{ + time.Date(0, 0, 0, 5, 30, 0, 0, time.UTC), + }, + time.Date(2000, 1, 6, 5, 30, 0, 0, time.UTC), + time.Date(2000, 1, 10, 5, 30, 0, 0, time.UTC), + 4 * 24 * time.Hour, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + w := weeklyJob{ + interval: tt.interval, + daysOfWeek: tt.daysOfWeek, + atTimes: tt.atTimes, + } + + next := w.next(tt.lastRun) + assert.Equal(t, tt.expectedNextRun, next) + assert.Equal(t, tt.expectedDurationToNextRun, next.Sub(tt.lastRun)) + }) + } +} + +func TestMonthlyJob_next(t *testing.T) { + americaChicago, err := time.LoadLocation("America/Chicago") + require.NoError(t, err) + + tests := []struct { + name string + interval uint + days []int + daysFromEnd []int + atTimes []time.Time + lastRun time.Time + expectedNextRun time.Time + expectedDurationToNextRun time.Duration + }{ + { + "same day - before at time", + 1, + []int{1}, + nil, + []time.Time{ + time.Date(0, 0, 0, 5, 30, 0, 0, time.UTC), + }, + time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), + time.Date(2000, 1, 1, 5, 30, 0, 0, time.UTC), + 5*time.Hour + 30*time.Minute, + }, + { + "same day - after at time, runs next available date", + 1, + []int{1, 10}, + nil, + []time.Time{ + time.Date(0, 0, 0, 5, 30, 0, 0, time.UTC), + }, + time.Date(2000, 1, 1, 5, 30, 0, 0, time.UTC), + time.Date(2000, 1, 10, 5, 30, 0, 0, time.UTC), + 9 * 24 * time.Hour, + }, + { + "same day - after at time, runs next available date, following interval month", + 2, + []int{1}, + nil, + []time.Time{ + time.Date(0, 0, 0, 5, 30, 0, 0, time.UTC), + }, + time.Date(2000, 1, 1, 5, 30, 0, 0, time.UTC), + time.Date(2000, 3, 1, 5, 30, 0, 0, time.UTC), + 60 * 24 * time.Hour, + }, + { + "daylight savings time", + 1, + []int{5}, + nil, + []time.Time{ + time.Date(0, 0, 0, 5, 30, 0, 0, americaChicago), + }, + time.Date(2023, 11, 1, 0, 0, 0, 0, americaChicago), + time.Date(2023, 11, 5, 5, 30, 0, 0, americaChicago), + 4*24*time.Hour + 6*time.Hour + 30*time.Minute, + }, + { + "negative days", + 1, + nil, + []int{-1, -3, -5}, + []time.Time{ + time.Date(0, 0, 0, 5, 30, 0, 0, time.UTC), + }, + time.Date(2000, 1, 29, 5, 30, 0, 0, time.UTC), + time.Date(2000, 1, 31, 5, 30, 0, 0, time.UTC), + 2 * 24 * time.Hour, + }, + { + "day not in current month, runs next month (leap year)", + 1, + []int{31}, + nil, + []time.Time{ + time.Date(0, 0, 0, 5, 30, 0, 0, time.UTC), + }, + time.Date(2000, 1, 31, 5, 30, 0, 0, time.UTC), + time.Date(2000, 3, 31, 5, 30, 0, 0, time.UTC), + 29*24*time.Hour + 31*24*time.Hour, + }, + { + "multiple days not in order", + 1, + []int{10, 7, 19, 2}, + nil, + []time.Time{ + time.Date(0, 0, 0, 5, 30, 0, 0, time.UTC), + }, + time.Date(2000, 1, 2, 5, 30, 0, 0, time.UTC), + time.Date(2000, 1, 7, 5, 30, 0, 0, time.UTC), + 5 * 24 * time.Hour, + }, + { + "day not in next interval month, selects next available option, skips Feb, April & June", + 2, + []int{31}, + nil, + []time.Time{ + time.Date(0, 0, 0, 5, 30, 0, 0, time.UTC), + }, + time.Date(1999, 12, 31, 5, 30, 0, 0, time.UTC), + time.Date(2000, 8, 31, 5, 30, 0, 0, time.UTC), + 244 * 24 * time.Hour, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := monthlyJob{ + interval: tt.interval, + days: tt.days, + daysFromEnd: tt.daysFromEnd, + atTimes: tt.atTimes, + } + + next := m.next(tt.lastRun) + assert.Equal(t, tt.expectedNextRun, next) + assert.Equal(t, tt.expectedDurationToNextRun, next.Sub(tt.lastRun)) + }) + } +} + +func TestDurationRandomJob_next(t *testing.T) { + tests := []struct { + name string + min time.Duration + max time.Duration + lastRun time.Time + expectedMin time.Time + expectedMax time.Time + }{ + { + "min 1s, max 5s", + time.Second, + 5 * time.Second, + time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), + time.Date(2000, 1, 1, 0, 0, 1, 0, time.UTC), + time.Date(2000, 1, 1, 0, 0, 5, 0, time.UTC), + }, + { + "min 100ms, max 1s", + 100 * time.Millisecond, + 1 * time.Second, + time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), + time.Date(2000, 1, 1, 0, 0, 0, 100000000, time.UTC), + time.Date(2000, 1, 1, 0, 0, 1, 0, time.UTC), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rj := durationRandomJob{ + min: tt.min, + max: tt.max, + rand: rand.New(rand.NewSource(time.Now().UnixNano())), // nolint:gosec + } + + for i := 0; i < 100; i++ { + next := rj.next(tt.lastRun) + assert.GreaterOrEqual(t, next, tt.expectedMin) + assert.LessOrEqual(t, next, tt.expectedMax) + } + }) + } +} + +func TestJob_LastRun(t *testing.T) { + testTime := time.Date(2000, 1, 1, 0, 0, 0, 0, time.Local) + fakeClock := clockwork.NewFakeClockAt(testTime) + + s, err := newTestScheduler( + WithClock(fakeClock), + ) + require.NoError(t, err) + + j, err := s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func() {}, + ), + WithStartAt(WithStartImmediately()), + ) + require.NoError(t, err) + + s.Start() + time.Sleep(10 * time.Millisecond) + + lastRun, err := j.LastRun() + assert.NoError(t, err) + + err = s.Shutdown() + require.NoError(t, err) + + assert.Equal(t, testTime, lastRun) +} + +func TestWithEventListeners(t *testing.T) { + tests := []struct { + name string + eventListeners []EventListener + err error + }{ + { + "no event listeners", + nil, + nil, + }, + { + "afterJobRuns", + []EventListener{ + AfterJobRuns(func(_ uuid.UUID) {}), + }, + nil, + }, + { + "afterJobRunsWithError", + []EventListener{ + AfterJobRunsWithError(func(_ uuid.UUID, _ error) {}), + }, + nil, + }, + { + "beforeJobRuns", + []EventListener{ + BeforeJobRuns(func(_ uuid.UUID) {}), + }, + nil, + }, + { + "multiple event listeners", + []EventListener{ + AfterJobRuns(func(_ uuid.UUID) {}), + AfterJobRunsWithError(func(_ uuid.UUID, _ error) {}), + BeforeJobRuns(func(_ uuid.UUID) {}), + }, + nil, + }, + { + "nil after job runs listener", + []EventListener{ + AfterJobRuns(nil), + }, + ErrEventListenerFuncNil, + }, + { + "nil after job runs with error listener", + []EventListener{ + AfterJobRunsWithError(nil), + }, + ErrEventListenerFuncNil, + }, + { + "nil before job runs listener", + []EventListener{ + BeforeJobRuns(nil), + }, + ErrEventListenerFuncNil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var ij internalJob + err := WithEventListeners(tt.eventListeners...)(&ij) + assert.Equal(t, tt.err, err) + + if err != nil { + return + } + var count int + if ij.afterJobRuns != nil { + count++ + } + if ij.afterJobRunsWithError != nil { + count++ + } + if ij.beforeJobRuns != nil { + count++ + } + assert.Equal(t, len(tt.eventListeners), count) + }) + } +} diff --git a/logger.go b/logger.go new file mode 100644 index 0000000..39982c9 --- /dev/null +++ b/logger.go @@ -0,0 +1,67 @@ +package gocron + +import ( + "log/slog" + "os" +) + +// Logger is the interface that wraps the basic logging methods +// used by gocron. The methods are modeled after the standard +// library slog package. The default logger is a no-op logger. +// To enable logging, use one of the provided New*Logger functions +// or implement your own Logger. The actual level of Log that is logged +// is handled by the implementation. +type Logger interface { + Debug(msg string, args ...any) + Error(msg string, args ...any) + Info(msg string, args ...any) + Warn(msg string, args ...any) +} + +var _ Logger = (*noOpLogger)(nil) + +type noOpLogger struct{} + +func (l noOpLogger) Debug(_ string, _ ...any) {} +func (l noOpLogger) Error(_ string, _ ...any) {} +func (l noOpLogger) Info(_ string, _ ...any) {} +func (l noOpLogger) Warn(_ string, _ ...any) {} + +var _ Logger = (*slogLogger)(nil) + +type slogLogger struct { + sl *slog.Logger +} + +func NewJSONSlogLogger(level slog.Level) Logger { + return NewSlogLogger( + slog.New( + slog.NewJSONHandler( + os.Stdout, + &slog.HandlerOptions{ + Level: level, + }, + ), + ), + ) +} + +func NewSlogLogger(sl *slog.Logger) Logger { + return &slogLogger{sl: sl} +} + +func (l *slogLogger) Debug(msg string, args ...any) { + l.sl.Debug(msg, args...) +} + +func (l *slogLogger) Error(msg string, args ...any) { + l.sl.Error(msg, args...) +} + +func (l *slogLogger) Info(msg string, args ...any) { + l.sl.Info(msg, args...) +} + +func (l *slogLogger) Warn(msg string, args ...any) { + l.sl.Warn(msg, args...) +} diff --git a/scheduler.go b/scheduler.go new file mode 100644 index 0000000..9ec670e --- /dev/null +++ b/scheduler.go @@ -0,0 +1,633 @@ +package gocron + +import ( + "context" + "reflect" + "slices" + "time" + + "github.com/google/uuid" + "github.com/jonboulle/clockwork" +) + +var _ Scheduler = (*scheduler)(nil) + +type Scheduler interface { + Jobs() []Job + NewJob(JobDefinition, Task, ...JobOption) (Job, error) + RemoveByTags(...string) + RemoveJob(uuid.UUID) error + Start() + StopJobs() error + Shutdown() error + Update(uuid.UUID, JobDefinition, Task, ...JobOption) (Job, error) +} + +// ----------------------------------------------- +// ----------------------------------------------- +// ----------------- Scheduler ------------------- +// ----------------------------------------------- +// ----------------------------------------------- + +type scheduler struct { + shutdownCtx context.Context + shutdownCancel context.CancelFunc + exec executor + jobs map[uuid.UUID]internalJob + location *time.Location + clock clockwork.Clock + started bool + globalJobOptions []JobOption + logger Logger + + startCh chan struct{} + startedCh chan struct{} + stopCh chan struct{} + stopErrCh chan error + allJobsOutRequest chan allJobsOutRequest + jobOutRequestCh chan jobOutRequest + newJobCh chan internalJob + removeJobCh chan uuid.UUID + removeJobsByTagsCh chan []string +} + +type jobOutRequest struct { + id uuid.UUID + outChan chan internalJob +} + +type allJobsOutRequest struct { + outChan chan []Job +} + +func NewScheduler(options ...SchedulerOption) (Scheduler, error) { + schCtx, cancel := context.WithCancel(context.Background()) + + exec := executor{ + stopCh: make(chan struct{}), + stopTimeout: time.Second * 10, + singletonRunners: make(map[uuid.UUID]singletonRunner), + logger: &noOpLogger{}, + + jobsIDsIn: make(chan uuid.UUID), + jobIDsOut: make(chan uuid.UUID), + jobOutRequest: make(chan jobOutRequest, 1000), + done: make(chan error), + } + + s := &scheduler{ + shutdownCtx: schCtx, + shutdownCancel: cancel, + exec: exec, + jobs: make(map[uuid.UUID]internalJob), + location: time.Local, + clock: clockwork.NewRealClock(), + logger: &noOpLogger{}, + + newJobCh: make(chan internalJob), + removeJobCh: make(chan uuid.UUID), + removeJobsByTagsCh: make(chan []string), + startCh: make(chan struct{}), + startedCh: make(chan struct{}), + stopCh: make(chan struct{}), + stopErrCh: make(chan error, 1), + jobOutRequestCh: make(chan jobOutRequest), + allJobsOutRequest: make(chan allJobsOutRequest), + } + + for _, option := range options { + err := option(s) + if err != nil { + return nil, err + } + } + + go func() { + s.logger.Info("new scheduler created") + for { + select { + case id := <-s.exec.jobIDsOut: + s.selectExecJobIDsOut(id) + + case j := <-s.newJobCh: + s.selectNewJob(j) + + case id := <-s.removeJobCh: + s.selectRemoveJob(id) + + case tags := <-s.removeJobsByTagsCh: + s.selectRemoveJobsByTags(tags) + + case out := <-s.exec.jobOutRequest: + s.selectJobOutRequest(out) + + case out := <-s.jobOutRequestCh: + s.selectJobOutRequest(out) + + case out := <-s.allJobsOutRequest: + s.selectAllJobsOutRequest(out) + + case <-s.startCh: + s.selectStart() + + case <-s.stopCh: + s.stopScheduler() + + case <-s.shutdownCtx.Done(): + s.stopScheduler() + return + } + } + }() + + return s, nil +} + +// ----------------------------------------------- +// ----------------------------------------------- +// --------- Scheduler Channel Methods ----------- +// ----------------------------------------------- +// ----------------------------------------------- + +// The scheduler's channel functions are broken out here +// to allow prioritizing within the select blocks. The idea +// being that we want to make sure that scheduling tasks +// are not blocked by requests from the caller for information +// about jobs. + +func (s *scheduler) stopScheduler() { + s.logger.Debug("stopping scheduler") + if s.started { + s.exec.stopCh <- struct{}{} + } + + for _, j := range s.jobs { + j.stop() + } + for id, j := range s.jobs { + <-j.ctx.Done() + + j.ctx, j.cancel = context.WithCancel(s.shutdownCtx) + s.jobs[id] = j + } + var err error + if s.started { + select { + case err = <-s.exec.done: + case <-time.After(s.exec.stopTimeout + 1*time.Second): + err = ErrStopExecutorTimedOut + } + } + s.stopErrCh <- err + s.started = false + s.logger.Debug("scheduler stopped") +} + +func (s *scheduler) selectAllJobsOutRequest(out allJobsOutRequest) { + outJobs := make([]Job, len(s.jobs)) + var counter int + for _, j := range s.jobs { + outJobs[counter] = s.jobFromInternalJob(j) + counter++ + } + select { + case <-s.shutdownCtx.Done(): + case out.outChan <- outJobs: + } +} + +func (s *scheduler) selectRemoveJob(id uuid.UUID) { + j, ok := s.jobs[id] + if !ok { + return + } + j.stop() + delete(s.jobs, id) +} + +func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) { + j := s.jobs[id] + j.lastRun = j.nextRun + + if j.limitRunsTo != nil { + j.limitRunsTo.runCount = j.limitRunsTo.runCount + 1 + if j.limitRunsTo.runCount == j.limitRunsTo.limit { + go func() { + select { + case <-s.shutdownCtx.Done(): + return + case s.removeJobCh <- id: + } + }() + return + } + } + + next := j.next(j.lastRun) + j.nextRun = next + j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() { + select { + case <-s.shutdownCtx.Done(): + return + case s.exec.jobsIDsIn <- id: + } + }) + s.jobs[id] = j +} + +func (s *scheduler) selectJobOutRequest(out jobOutRequest) { + if j, ok := s.jobs[out.id]; ok { + select { + case out.outChan <- j: + case <-s.shutdownCtx.Done(): + } + } + close(out.outChan) +} + +func (s *scheduler) selectNewJob(j internalJob) { + if s.started { + next := j.startTime + if j.startImmediately { + next = s.now() + select { + case <-s.shutdownCtx.Done(): + case s.exec.jobsIDsIn <- j.id: + } + } else { + if next.IsZero() { + next = j.next(s.now()) + } + + id := j.id + j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() { + select { + case <-s.shutdownCtx.Done(): + case s.exec.jobsIDsIn <- id: + } + }) + } + j.nextRun = next + } + + s.jobs[j.id] = j +} + +func (s *scheduler) selectRemoveJobsByTags(tags []string) { + for _, j := range s.jobs { + for _, tag := range tags { + if slices.Contains(j.tags, tag) { + j.stop() + delete(s.jobs, j.id) + break + } + } + } +} + +func (s *scheduler) selectStart() { + s.logger.Debug("scheduler starting") + go s.exec.start() + + s.started = true + for id, j := range s.jobs { + next := j.startTime + if j.startImmediately { + next = s.now() + select { + case <-s.shutdownCtx.Done(): + case s.exec.jobsIDsIn <- id: + } + } else { + if next.IsZero() { + next = j.next(s.now()) + } + + jobID := id + j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() { + select { + case <-s.shutdownCtx.Done(): + case s.exec.jobsIDsIn <- jobID: + } + }) + } + j.nextRun = next + s.jobs[id] = j + } + select { + case <-s.shutdownCtx.Done(): + case s.startedCh <- struct{}{}: + s.logger.Info("scheduler started") + } +} + +// ----------------------------------------------- +// ----------------------------------------------- +// ------------- Scheduler Methods --------------- +// ----------------------------------------------- +// ----------------------------------------------- + +func (s *scheduler) now() time.Time { + return s.clock.Now().In(s.location) +} + +func (s *scheduler) jobFromInternalJob(in internalJob) job { + return job{ + id: in.id, + name: in.name, + tags: slices.Clone(in.tags), + jobOutRequest: s.jobOutRequestCh, + } +} + +func (s *scheduler) Jobs() []Job { + outChan := make(chan []Job) + select { + case <-s.shutdownCtx.Done(): + case s.allJobsOutRequest <- allJobsOutRequest{outChan: outChan}: + } + + var jobs []Job + select { + case <-s.shutdownCtx.Done(): + case jobs = <-outChan: + } + + return jobs +} + +func (s *scheduler) NewJob(jobDefinition JobDefinition, task Task, options ...JobOption) (Job, error) { + return s.addOrUpdateJob(uuid.Nil, jobDefinition, task, options) +} + +func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskWrapper Task, options []JobOption) (Job, error) { + j := internalJob{} + if id == uuid.Nil { + j.id = uuid.New() + } else { + currentJob := requestJobCtx(s.shutdownCtx, id, s.jobOutRequestCh) + if currentJob != nil && currentJob.id != uuid.Nil { + select { + case <-s.shutdownCtx.Done(): + return nil, nil + case s.removeJobCh <- id: + <-currentJob.ctx.Done() + } + } + + j.id = id + } + + j.ctx, j.cancel = context.WithCancel(s.shutdownCtx) + + if taskWrapper == nil { + return nil, ErrNewJobTaskNil + } + + tsk := taskWrapper() + taskFunc := reflect.ValueOf(tsk.function) + for taskFunc.Kind() == reflect.Ptr { + taskFunc = taskFunc.Elem() + } + + if taskFunc.Kind() != reflect.Func { + return nil, ErrNewJobTask + } + + j.function = tsk.function + j.parameters = tsk.parameters + + // apply global job options + for _, option := range s.globalJobOptions { + if err := option(&j); err != nil { + return nil, err + } + } + + // apply job specific options, which take precedence + for _, option := range options { + if err := option(&j); err != nil { + return nil, err + } + } + + if err := definition.setup(&j, s.location); err != nil { + return nil, err + } + + select { + case <-s.shutdownCtx.Done(): + case s.newJobCh <- j: + } + + return &job{ + id: j.id, + name: j.name, + tags: slices.Clone(j.tags), + jobOutRequest: s.jobOutRequestCh, + }, nil +} + +func (s *scheduler) RemoveByTags(tags ...string) { + select { + case <-s.shutdownCtx.Done(): + case s.removeJobsByTagsCh <- tags: + } +} + +func (s *scheduler) RemoveJob(id uuid.UUID) error { + j := requestJobCtx(s.shutdownCtx, id, s.jobOutRequestCh) + if j == nil || j.id == uuid.Nil { + return ErrJobNotFound + } + select { + case <-s.shutdownCtx.Done(): + case s.removeJobCh <- id: + } + + return nil +} + +// Start begins scheduling jobs for execution based +// on each job's definition. Job's added to an already +// running scheduler will be scheduled immediately based +// on definition. +func (s *scheduler) Start() { + select { + case <-s.shutdownCtx.Done(): + case s.startCh <- struct{}{}: + <-s.startedCh + } +} + +// StopJobs stops the execution of all jobs in the scheduler. +// This can be useful in situations where jobs need to be +// paused globally and then restarted with Start(). +func (s *scheduler) StopJobs() error { + select { + case <-s.shutdownCtx.Done(): + return nil + case s.stopCh <- struct{}{}: + } + select { + case err := <-s.stopErrCh: + return err + case <-time.After(s.exec.stopTimeout + 2*time.Second): + return ErrStopSchedulerTimedOut + } +} + +// Shutdown should be called when you no longer need +// the Scheduler or Job's as the Scheduler cannot +// be restarted after calling Shutdown. +func (s *scheduler) Shutdown() error { + s.shutdownCancel() + select { + case err := <-s.stopErrCh: + return err + case <-time.After(s.exec.stopTimeout + 2*time.Second): + return ErrStopSchedulerTimedOut + } +} + +// Update replaces the existing Job's JobDefinition with the provided +// JobDefinition. The Job's Job.ID() remains the same. +func (s *scheduler) Update(id uuid.UUID, jobDefinition JobDefinition, task Task, options ...JobOption) (Job, error) { + return s.addOrUpdateJob(id, jobDefinition, task, options) +} + +// ----------------------------------------------- +// ----------------------------------------------- +// ------------- Scheduler Options --------------- +// ----------------------------------------------- +// ----------------------------------------------- + +// SchedulerOption defines the function for setting +// options on the Scheduler. +type SchedulerOption func(*scheduler) error + +// WithDistributedElector sets the elector to be used by multiple +// Scheduler instances to determine who should be the leader. +// Only the leader runs jobs, while non-leaders wait and continue +// to check if a new leader has been elected. +func WithDistributedElector(elector Elector) SchedulerOption { + return func(s *scheduler) error { + if elector == nil { + return ErrWithDistributedElector + } + s.exec.elector = elector + return nil + } +} + +// WithClock sets the clock used by the Scheduler +// to the clock provided. See https://github.com/jonboulle/clockwork +func WithClock(clock clockwork.Clock) SchedulerOption { + return func(s *scheduler) error { + if clock == nil { + return ErrWithClockNil + } + s.clock = clock + return nil + } +} + +// WithGlobalJobOptions sets JobOption's that will be applied to +// all jobs added to the scheduler. JobOption's set on the job +// itself will override if the same JobOption is set globally. +func WithGlobalJobOptions(jobOptions ...JobOption) SchedulerOption { + return func(s *scheduler) error { + s.globalJobOptions = jobOptions + return nil + } +} + +// LimitMode defines the modes used for handling jobs that reach +// the limit provided in WithLimitConcurrentJobs +type LimitMode int + +const ( + // LimitModeReschedule causes jobs reaching the limit set in + // WithLimitConcurrentJobs or WithSingletonMode to be skipped + // and rescheduled for the next run time rather than being + // queued up to wait. + LimitModeReschedule = 1 + + // LimitModeWait causes jobs reaching the limit set in + // WithLimitConcurrentJobs or WithSingletonMode to wait + // in a queue until a slot becomes available to run. + // + // Note: this mode can produce unpredictable results as + // job execution order isn't guaranteed. For example, a job that + // executes frequently may pile up in the wait queue and be executed + // many times back to back when the queue opens. + // + // Warning: do not use this mode if your jobs will continue to stack + // up beyond the ability of the limit workers to keep up. An example of + // what NOT to do: + // + // s, _ := gocron.NewScheduler(gocron.WithLimitConcurrentJobs) + // s.NewJob( + // gocron.DurationJob( + // time.Second, + // Task{ + // Function: func() { + // time.Sleep(10 * time.Second) + // }, + // }, + // ), + // ) + LimitModeWait = 2 +) + +// WithLimitConcurrentJobs sets the limit and mode to be used by the +// Scheduler for limiting the number of jobs that may be running at +// a given time. +func WithLimitConcurrentJobs(limit uint, mode LimitMode) SchedulerOption { + return func(s *scheduler) error { + s.exec.limitMode = &limitModeConfig{ + mode: mode, + limit: limit, + in: make(chan uuid.UUID, 1000), + } + if mode == LimitModeReschedule { + s.exec.limitMode.rescheduleLimiter = make(chan struct{}, limit) + } + return nil + } +} + +// WithLocation sets the location (i.e. timezone) that the scheduler +// should operate within. In many systems time.Local is UTC. +// Default: time.Local +func WithLocation(location *time.Location) SchedulerOption { + return func(s *scheduler) error { + if location == nil { + return ErrWithLocationNil + } + s.location = location + return nil + } +} + +func WithLogger(logger Logger) SchedulerOption { + return func(s *scheduler) error { + if logger == nil { + return ErrWithLoggerNil + } + s.logger = logger + s.exec.logger = logger + return nil + } +} + +// WithStopTimeout sets the amount of time the Scheduler should +// wait gracefully for jobs to complete before returning when +// StopJobs() or Shutdown() are called. +// Default: 10 * time.Second +func WithStopTimeout(timeout time.Duration) SchedulerOption { + return func(s *scheduler) error { + s.exec.stopTimeout = timeout + return nil + } +} diff --git a/scheduler_test.go b/scheduler_test.go new file mode 100644 index 0000000..682fd2b --- /dev/null +++ b/scheduler_test.go @@ -0,0 +1,878 @@ +package gocron + +import ( + "context" + "log/slog" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" +) + +func newTestScheduler(options ...SchedulerOption) (Scheduler, error) { + // default test options + out := []SchedulerOption{ + WithLogger(NewJSONSlogLogger(slog.LevelDebug)), + WithStopTimeout(time.Second), + } + + // append any additional options 2nd to override defaults if needed + out = append(out, options...) + return NewScheduler(out...) +} + +func TestScheduler_OneSecond_NoOptions(t *testing.T) { + defer goleak.VerifyNone(t) + cronNoOptionsCh := make(chan struct{}, 10) + durationNoOptionsCh := make(chan struct{}, 10) + + tests := []struct { + name string + ch chan struct{} + jd JobDefinition + tsk Task + }{ + { + "cron", + cronNoOptionsCh, + CronJob( + "* * * * * *", + true, + ), + NewTask( + func() { + cronNoOptionsCh <- struct{}{} + }, + ), + }, + { + "duration", + durationNoOptionsCh, + DurationJob( + time.Second, + ), + NewTask( + func() { + durationNoOptionsCh <- struct{}{} + }, + ), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, err := newTestScheduler() + require.NoError(t, err) + + _, err = s.NewJob(tt.jd, tt.tsk) + require.NoError(t, err) + + s.Start() + + startTime := time.Now() + var runCount int + for runCount < 1 { + <-tt.ch + runCount++ + } + err = s.Shutdown() + require.NoError(t, err) + stopTime := time.Now() + + select { + case <-tt.ch: + t.Fatal("job ran after scheduler was stopped") + case <-time.After(time.Millisecond * 50): + } + + runDuration := stopTime.Sub(startTime) + assert.GreaterOrEqual(t, runDuration, time.Millisecond) + assert.LessOrEqual(t, runDuration, 1500*time.Millisecond) + }) + } +} + +func TestScheduler_LongRunningJobs(t *testing.T) { + defer goleak.VerifyNone(t) + + durationCh := make(chan struct{}, 10) + durationSingletonCh := make(chan struct{}, 10) + + tests := []struct { + name string + ch chan struct{} + jd JobDefinition + tsk Task + opts []JobOption + options []SchedulerOption + expectedRuns int + }{ + { + "duration", + durationCh, + DurationJob( + time.Millisecond * 500, + ), + NewTask( + func() { + time.Sleep(1 * time.Second) + durationCh <- struct{}{} + }, + ), + nil, + []SchedulerOption{WithStopTimeout(time.Second * 2)}, + 3, + }, + { + "duration singleton", + durationSingletonCh, + DurationJob( + time.Millisecond * 500, + ), + NewTask( + func() { + time.Sleep(1 * time.Second) + durationSingletonCh <- struct{}{} + }, + ), + []JobOption{WithSingletonMode(LimitModeWait)}, + []SchedulerOption{WithStopTimeout(time.Second * 5)}, + 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, err := newTestScheduler(tt.options...) + require.NoError(t, err) + + _, err = s.NewJob(tt.jd, tt.tsk, tt.opts...) + require.NoError(t, err) + + s.Start() + time.Sleep(1600 * time.Millisecond) + err = s.Shutdown() + require.NoError(t, err) + + var runCount int + timeout := make(chan struct{}) + go func() { + time.Sleep(2 * time.Second) + close(timeout) + }() + Outer: + for { + select { + case <-tt.ch: + runCount++ + case <-timeout: + break Outer + } + } + + assert.Equal(t, tt.expectedRuns, runCount) + }) + } +} + +func TestScheduler_Update(t *testing.T) { + defer goleak.VerifyNone(t) + + durationJobCh := make(chan struct{}) + + tests := []struct { + name string + initialJob JobDefinition + updateJob JobDefinition + tsk Task + ch chan struct{} + runCount int + updateAfterCount int + expectedMinTime time.Duration + expectedMaxRunTime time.Duration + }{ + { + "duration, updated to another duration", + DurationJob( + time.Millisecond * 500, + ), + DurationJob( + time.Second, + ), + NewTask( + func() { + durationJobCh <- struct{}{} + }, + ), + durationJobCh, + 2, + 1, + time.Second * 1, + time.Second * 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, err := newTestScheduler() + require.NoError(t, err) + + j, err := s.NewJob(tt.initialJob, tt.tsk) + require.NoError(t, err) + + startTime := time.Now() + s.Start() + + var runCount int + for runCount < tt.runCount { + select { + case <-tt.ch: + runCount++ + if runCount == tt.updateAfterCount { + _, err = s.Update(j.ID(), tt.updateJob, tt.tsk) + require.NoError(t, err) + } + default: + } + } + err = s.Shutdown() + require.NoError(t, err) + stopTime := time.Now() + + select { + case <-tt.ch: + t.Fatal("job ran after scheduler was stopped") + case <-time.After(time.Millisecond * 50): + } + + runDuration := stopTime.Sub(startTime) + assert.GreaterOrEqual(t, runDuration, tt.expectedMinTime) + assert.LessOrEqual(t, runDuration, tt.expectedMaxRunTime) + }) + } +} + +func TestScheduler_StopTimeout(t *testing.T) { + defer goleak.VerifyNone(t) + + tests := []struct { + name string + jd JobDefinition + f any + opts []JobOption + }{ + { + "duration", + DurationJob( + time.Millisecond * 100, + ), + func(testDoneCtx context.Context) { + select { + case <-time.After(1 * time.Second): + case <-testDoneCtx.Done(): + } + }, + nil, + }, + { + "duration singleton", + DurationJob( + time.Millisecond * 100, + ), + func(testDoneCtx context.Context) { + select { + case <-time.After(1 * time.Second): + case <-testDoneCtx.Done(): + } + }, + []JobOption{WithSingletonMode(LimitModeWait)}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testDoneCtx, cancel := context.WithCancel(context.Background()) + s, err := newTestScheduler( + WithStopTimeout(time.Millisecond * 100), + ) + require.NoError(t, err) + + _, err = s.NewJob(tt.jd, NewTask(tt.f, testDoneCtx), tt.opts...) + require.NoError(t, err) + + s.Start() + time.Sleep(time.Millisecond * 200) + err = s.Shutdown() + assert.ErrorIs(t, err, ErrStopJobsTimedOut) + cancel() + time.Sleep(2 * time.Second) + }) + } +} + +func TestScheduler_Shutdown(t *testing.T) { + goleak.VerifyNone(t) + + t.Run("start, stop, start, shutdown", func(t *testing.T) { + s, err := newTestScheduler( + WithStopTimeout(time.Second), + ) + require.NoError(t, err) + _, err = s.NewJob( + DurationJob( + 50*time.Millisecond, + ), + NewTask( + func() {}, + ), + WithStartAt( + WithStartImmediately(), + ), + ) + require.NoError(t, err) + + s.Start() + time.Sleep(50 * time.Millisecond) + require.NoError(t, s.StopJobs()) + + time.Sleep(200 * time.Millisecond) + s.Start() + + time.Sleep(50 * time.Millisecond) + require.NoError(t, s.Shutdown()) + time.Sleep(200 * time.Millisecond) + }) + + t.Run("calling Job methods after shutdown errors", func(t *testing.T) { + s, err := newTestScheduler( + WithStopTimeout(time.Second), + ) + require.NoError(t, err) + j, err := s.NewJob( + DurationJob( + 100*time.Millisecond, + ), + NewTask( + func() {}, + ), + WithStartAt( + WithStartImmediately(), + ), + ) + require.NoError(t, err) + + s.Start() + time.Sleep(50 * time.Millisecond) + require.NoError(t, s.Shutdown()) + + _, err = j.LastRun() + assert.ErrorIs(t, err, ErrJobNotFound) + + _, err = j.NextRun() + assert.ErrorIs(t, err, ErrJobNotFound) + }) +} + +func TestScheduler_NewJob(t *testing.T) { + goleak.VerifyNone(t) + tests := []struct { + name string + jd JobDefinition + tsk Task + opts []JobOption + }{ + { + "cron with timezone", + CronJob( + "CRON_TZ=America/Chicago * * * * * *", + true, + ), + NewTask( + func() {}, + ), + nil, + }, + { + "cron with timezone, no seconds", + CronJob( + "CRON_TZ=America/Chicago * * * * *", + false, + ), + NewTask( + func() {}, + ), + nil, + }, + { + "random duration", + DurationRandomJob( + time.Second, + time.Second*5, + ), + NewTask( + func() {}, + ), + nil, + }, + { + "daily", + DailyJob( + 1, + NewAtTimes( + NewAtTime(1, 0, 0), + ), + ), + NewTask( + func() {}, + ), + nil, + }, + { + "weekly", + WeeklyJob( + 1, + NewWeekdays(time.Monday), + NewAtTimes( + NewAtTime(1, 0, 0), + ), + ), + NewTask( + func() {}, + ), + nil, + }, + { + "monthly", + MonthlyJob( + 1, + NewDaysOfTheMonth(1, -1), + NewAtTimes( + NewAtTime(1, 0, 0), + ), + ), + NewTask( + func() {}, + ), + nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, err := newTestScheduler() + require.NoError(t, err) + + _, err = s.NewJob(tt.jd, tt.tsk, tt.opts...) + require.NoError(t, err) + + s.Start() + require.NoError(t, s.Shutdown()) + time.Sleep(50 * time.Millisecond) + }) + } +} + +func TestScheduler_NewJobErrors(t *testing.T) { + goleak.VerifyNone(t) + tests := []struct { + name string + jd JobDefinition + opts []JobOption + err error + }{ + { + "cron with timezone", + CronJob( + "bad cron", + true, + ), + nil, + ErrCronJobParse, + }, + { + "random with bad min/max", + DurationRandomJob( + time.Second*5, + time.Second, + ), + nil, + ErrDurationRandomJobMinMax, + }, + { + "daily job at times nil", + DailyJob( + 1, + nil, + ), + nil, + ErrDailyJobAtTimesNil, + }, + { + "daily job at time nil", + DailyJob( + 1, + NewAtTimes(nil), + ), + nil, + ErrDailyJobAtTimeNil, + }, + { + "daily job hours out of range", + DailyJob( + 1, + NewAtTimes( + NewAtTime(100, 0, 0), + ), + ), + nil, + ErrDailyJobHours, + }, + { + "daily job minutes out of range", + DailyJob( + 1, + NewAtTimes( + NewAtTime(1, 100, 0), + ), + ), + nil, + ErrDailyJobMinutesSeconds, + }, + { + "daily job seconds out of range", + DailyJob( + 1, + NewAtTimes( + NewAtTime(1, 0, 100), + ), + ), + nil, + ErrDailyJobMinutesSeconds, + }, + { + "weekly job at times nil", + WeeklyJob( + 1, + NewWeekdays(time.Monday), + nil, + ), + nil, + ErrWeeklyJobAtTimesNil, + }, + { + "weekly job at time nil", + WeeklyJob( + 1, + NewWeekdays(time.Monday), + NewAtTimes(nil), + ), + nil, + ErrWeeklyJobAtTimeNil, + }, + { + "weekly job weekdays nil", + WeeklyJob( + 1, + nil, + NewAtTimes( + NewAtTime(1, 0, 0), + ), + ), + nil, + ErrWeeklyJobDaysOfTheWeekNil, + }, + { + "weekly job hours out of range", + WeeklyJob( + 1, + NewWeekdays(time.Monday), + NewAtTimes( + NewAtTime(100, 0, 0), + ), + ), + nil, + ErrWeeklyJobHours, + }, + { + "weekly job minutes out of range", + WeeklyJob( + 1, + NewWeekdays(time.Monday), + NewAtTimes( + NewAtTime(1, 100, 0), + ), + ), + nil, + ErrWeeklyJobMinutesSeconds, + }, + { + "weekly job seconds out of range", + WeeklyJob( + 1, + NewWeekdays(time.Monday), + NewAtTimes( + NewAtTime(1, 0, 100), + ), + ), + nil, + ErrWeeklyJobMinutesSeconds, + }, + { + "monthly job at times nil", + MonthlyJob( + 1, + NewDaysOfTheMonth(1), + nil, + ), + nil, + ErrMonthlyJobAtTimesNil, + }, + { + "monthly job at time nil", + MonthlyJob( + 1, + NewDaysOfTheMonth(1), + NewAtTimes(nil), + ), + nil, + ErrMonthlyJobAtTimeNil, + }, + { + "monthly job days out of range", + MonthlyJob( + 1, + NewDaysOfTheMonth(0), + NewAtTimes( + NewAtTime(1, 0, 0), + ), + ), + nil, + ErrMonthlyJobDays, + }, + { + "monthly job days out of range", + MonthlyJob( + 1, + nil, + NewAtTimes( + NewAtTime(1, 0, 0), + ), + ), + nil, + ErrMonthlyJobDaysNil, + }, + { + "monthly job hours out of range", + MonthlyJob( + 1, + NewDaysOfTheMonth(1), + NewAtTimes( + NewAtTime(100, 0, 0), + ), + ), + nil, + ErrMonthlyJobHours, + }, + { + "monthly job minutes out of range", + MonthlyJob( + 1, + NewDaysOfTheMonth(1), + NewAtTimes( + NewAtTime(1, 100, 0), + ), + ), + nil, + ErrMonthlyJobMinutesSeconds, + }, + { + "monthly job seconds out of range", + MonthlyJob( + 1, + NewDaysOfTheMonth(1), + NewAtTimes( + NewAtTime(1, 0, 100), + ), + ), + nil, + ErrMonthlyJobMinutesSeconds, + }, + { + "WithName no name", + DurationJob( + time.Second, + ), + []JobOption{WithName("")}, + ErrWithNameEmpty, + }, + { + "WithStartDateTime is zero", + DurationJob( + time.Second, + ), + []JobOption{WithStartAt(WithStartDateTime(time.Time{}))}, + ErrWithStartDateTimePast, + }, + { + "WithStartDateTime is in the past", + DurationJob( + time.Second, + ), + []JobOption{WithStartAt(WithStartDateTime(time.Now().Add(-time.Second)))}, + ErrWithStartDateTimePast, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, err := newTestScheduler( + WithStopTimeout(time.Millisecond * 50), + ) + require.NoError(t, err) + + _, err = s.NewJob(tt.jd, NewTask(func() {}), tt.opts...) + assert.ErrorIs(t, err, tt.err) + err = s.Shutdown() + require.NoError(t, err) + }) + } +} + +func TestScheduler_Singleton(t *testing.T) { + goleak.VerifyNone(t) + tests := []struct { + name string + duration time.Duration + limitMode LimitMode + runCount int + expectedMin time.Duration + expectedMax time.Duration + }{ + { + "singleton mode reschedule", + time.Millisecond * 100, + LimitModeReschedule, + 3, + time.Millisecond * 600, + time.Millisecond * 1100, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + jobRanCh := make(chan struct{}, 10) + + s, err := newTestScheduler( + WithStopTimeout(1 * time.Second), + ) + require.NoError(t, err) + + _, err = s.NewJob( + DurationJob( + tt.duration, + ), + NewTask(func() { + time.Sleep(tt.duration * 2) + jobRanCh <- struct{}{} + }), + WithSingletonMode(tt.limitMode), + ) + require.NoError(t, err) + + start := time.Now() + s.Start() + + var runCount int + for runCount < tt.runCount { + select { + case <-jobRanCh: + runCount++ + case <-time.After(time.Second): + t.Fatalf("timed out waiting for jobs to run") + } + } + + stop := time.Now() + require.NoError(t, s.Shutdown()) + + assert.GreaterOrEqual(t, stop.Sub(start), tt.expectedMin) + assert.LessOrEqual(t, stop.Sub(start), tt.expectedMax) + }) + } +} + +func TestScheduler_LimitMode(t *testing.T) { + goleak.VerifyNone(t) + tests := []struct { + name string + numJobs int + limit uint + limitMode LimitMode + duration time.Duration + expectedMin time.Duration + expectedMax time.Duration + }{ + { + "limit mode reschedule", + 10, + 2, + LimitModeReschedule, + time.Millisecond * 100, + time.Millisecond * 400, + time.Millisecond * 700, + }, + { + "limit mode wait", + 10, + 2, + LimitModeWait, + time.Millisecond * 100, + time.Millisecond * 200, + time.Millisecond * 500, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, err := newTestScheduler( + WithLimitConcurrentJobs(tt.limit, tt.limitMode), + WithStopTimeout(2*time.Second), + ) + require.NoError(t, err) + + jobRanCh := make(chan struct{}, 20) + + for i := 0; i < tt.numJobs; i++ { + _, err = s.NewJob( + DurationJob(tt.duration), + NewTask(func() { + time.Sleep(tt.duration / 2) + jobRanCh <- struct{}{} + }), + ) + require.NoError(t, err) + } + + start := time.Now() + s.Start() + + var runCount int + for runCount < tt.numJobs { + select { + case <-jobRanCh: + runCount++ + case <-time.After(time.Second): + t.Fatalf("timed out waiting for jobs to run") + } + } + stop := time.Now() + err = s.Shutdown() + require.NoError(t, err) + + assert.GreaterOrEqual(t, stop.Sub(start), tt.expectedMin) + assert.LessOrEqual(t, stop.Sub(start), tt.expectedMax) + }) + } +} diff --git a/util.go b/util.go new file mode 100644 index 0000000..f539078 --- /dev/null +++ b/util.go @@ -0,0 +1,122 @@ +package gocron + +import ( + "context" + "reflect" + "slices" + "sync" + "time" + + "github.com/google/uuid" + "golang.org/x/exp/maps" +) + +func callJobFuncWithParams(jobFunc any, params ...any) error { + if jobFunc == nil { + return nil + } + f := reflect.ValueOf(jobFunc) + if f.IsZero() { + return nil + } + if len(params) != f.Type().NumIn() { + return nil + } + in := make([]reflect.Value, len(params)) + for k, param := range params { + in[k] = reflect.ValueOf(param) + } + vals := f.Call(in) + for _, val := range vals { + i := val.Interface() + if err, ok := i.(error); ok { + return err + } + } + return nil +} + +func requestJob(id uuid.UUID, ch chan jobOutRequest) *internalJob { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + return requestJobCtx(ctx, id, ch) +} + +func requestJobCtx(ctx context.Context, id uuid.UUID, ch chan jobOutRequest) *internalJob { + resp := make(chan internalJob, 1) + select { + case <-ctx.Done(): + return nil + default: + } + + select { + case ch <- jobOutRequest{ + id: id, + outChan: resp, + }: + default: + return nil + } + var j internalJob + select { + case <-ctx.Done(): + return nil + case jobReceived := <-resp: + j = jobReceived + } + return &j +} + +func removeSliceDuplicatesInt(in []int) []int { + m := make(map[int]struct{}) + + for _, i := range in { + m[i] = struct{}{} + } + return maps.Keys(m) +} + +func convertAtTimesToDateTime(atTimes AtTimes, location *time.Location) ([]time.Time, error) { + if atTimes == nil { + return nil, errAtTimesNil + } + var atTimesDate []time.Time + for _, a := range atTimes() { + if a == nil { + return nil, errAtTimeNil + } + at := a() + if at.hours > 23 { + return nil, errAtTimeHours + } else if at.minutes > 59 || at.seconds > 59 { + return nil, errAtTimeMinSec + } + atTimesDate = append(atTimesDate, at.time(location)) + } + slices.SortStableFunc(atTimesDate, func(a, b time.Time) int { + return a.Compare(b) + }) + return atTimesDate, nil +} + +type waitGroupWithMutex struct { + wg sync.WaitGroup + mu sync.Mutex +} + +func (w *waitGroupWithMutex) Add(delta int) { + w.mu.Lock() + defer w.mu.Unlock() + w.wg.Add(delta) +} + +func (w *waitGroupWithMutex) Done() { + w.wg.Done() +} + +func (w *waitGroupWithMutex) Wait() { + w.mu.Lock() + defer w.mu.Unlock() + w.wg.Wait() +} diff --git a/util_test.go b/util_test.go new file mode 100644 index 0000000..2214ade --- /dev/null +++ b/util_test.go @@ -0,0 +1,165 @@ +package gocron + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestRemoveSliceDuplicatesInt(t *testing.T) { + tests := []struct { + name string + input []int + expected []int + }{ + { + "lots of duplicates", + []int{ + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, + 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, + 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, + 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, + 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, + }, + []int{1, 2, 3, 4, 5}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := removeSliceDuplicatesInt(tt.input) + assert.ElementsMatch(t, tt.expected, result) + }) + } +} + +func TestCallJobFuncWithParams(t *testing.T) { + type f1 func() + tests := []struct { + name string + jobFunc any + params []any + expectedErr error + }{ + { + "nil jobFunc", + nil, + nil, + nil, + }, + { + "zero jobFunc", + f1(nil), + nil, + nil, + }, + { + "wrong number of params", + func(one string, two int) {}, + []any{"one"}, + nil, + }, + { + "function that returns an error", + func() error { + return fmt.Errorf("test error") + }, + nil, + fmt.Errorf("test error"), + }, + { + "function that returns no error", + func() error { + return nil + }, + nil, + nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := callJobFuncWithParams(tt.jobFunc, tt.params...) + assert.Equal(t, tt.expectedErr, err) + }) + } +} + +func TestConvertAtTimesToDateTime(t *testing.T) { + tests := []struct { + name string + atTimes AtTimes + location *time.Location + expected []time.Time + err error + }{ + { + "atTimes is nil", + nil, + time.UTC, + nil, + errAtTimesNil, + }, + { + "atTime is nil", + NewAtTimes(nil), + time.UTC, + nil, + errAtTimeNil, + }, + { + "atTimes hours is invalid", + NewAtTimes( + NewAtTime(24, 0, 0), + ), + time.UTC, + nil, + errAtTimeHours, + }, + { + "atTimes minutes are invalid", + NewAtTimes( + NewAtTime(0, 60, 0), + ), + time.UTC, + nil, + errAtTimeMinSec, + }, + { + "atTimes seconds are invalid", + NewAtTimes( + NewAtTime(0, 0, 60), + ), + time.UTC, + nil, + errAtTimeMinSec, + }, + { + "atTimes valid", + NewAtTimes( + NewAtTime(0, 0, 3), + NewAtTime(0, 0, 0), + NewAtTime(0, 0, 1), + NewAtTime(0, 0, 2), + ), + time.UTC, + []time.Time{ + time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC), + time.Date(0, 0, 0, 0, 0, 1, 0, time.UTC), + time.Date(0, 0, 0, 0, 0, 2, 0, time.UTC), + time.Date(0, 0, 0, 0, 0, 3, 0, time.UTC), + }, + nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := convertAtTimesToDateTime(tt.atTimes, tt.location) + assert.Equal(t, tt.expected, result) + assert.Equal(t, tt.err, err) + }) + } +}