From ad26a71e0ec6028e5638fbbc75ad03824b50a050 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Wed, 8 Nov 2023 11:11:42 -0600 Subject: [PATCH] initial clean v2 commit history --- .github/FUNDING.yml | 12 + .github/dependabot.yml | 18 + .github/workflows/codeql-analysis.yml | 71 +++ .github/workflows/file_formatting.yml | 19 + .github/workflows/go_test.yml | 30 + .gitignore | 20 + .golangci.yaml | 52 ++ .pre-commit-config.yaml | 24 + CODE_OF_CONDUCT.md | 73 +++ CONTRIBUTING.md | 40 ++ LICENSE | 21 + Makefile | 13 + README.md | 121 ++++ SECURITY.md | 15 + distributed.go | 11 + errors.go | 45 ++ example_test.go | 654 +++++++++++++++++++ executor.go | 355 +++++++++++ go.mod | 21 + go.sum | 28 + job.go | 779 +++++++++++++++++++++++ job_test.go | 430 +++++++++++++ logger.go | 67 ++ scheduler.go | 633 +++++++++++++++++++ scheduler_test.go | 878 ++++++++++++++++++++++++++ util.go | 122 ++++ util_test.go | 165 +++++ 27 files changed, 4717 insertions(+) create mode 100644 .github/FUNDING.yml create mode 100644 .github/dependabot.yml create mode 100644 .github/workflows/codeql-analysis.yml create mode 100644 .github/workflows/file_formatting.yml create mode 100644 .github/workflows/go_test.yml create mode 100644 .gitignore create mode 100644 .golangci.yaml create mode 100644 .pre-commit-config.yaml create mode 100644 CODE_OF_CONDUCT.md create mode 100644 CONTRIBUTING.md create mode 100644 LICENSE create mode 100644 Makefile create mode 100644 README.md create mode 100644 SECURITY.md create mode 100644 distributed.go create mode 100644 errors.go create mode 100644 example_test.go create mode 100644 executor.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 job.go create mode 100644 job_test.go create mode 100644 logger.go create mode 100644 scheduler.go create mode 100644 scheduler_test.go create mode 100644 util.go create mode 100644 util_test.go diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml new file mode 100644 index 0000000..6fa87f9 --- 
/dev/null +++ b/.github/FUNDING.yml @@ -0,0 +1,12 @@ +# These are supported funding model platforms + +github: # Replace with up to 4 GitHub Sponsors-enabled usernames e.g., [user1, user2] +patreon: # Replace with a single Patreon username +open_collective: gocron +ko_fi: # Replace with a single Ko-fi username +tidelift: # Replace with a single Tidelift platform-name/package-name e.g., npm/babel +community_bridge: # Replace with a single Community Bridge project-name e.g., cloud-foundry +liberapay: # Replace with a single Liberapay username +issuehunt: # Replace with a single IssueHunt username +otechie: # Replace with a single Otechie username +custom: # Replace with up to 4 custom sponsorship URLs e.g., ['link1', 'link2'] diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..ca534ca --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,18 @@ +# To get started with Dependabot version updates, you'll need to specify which +# package ecosystems to update and where the package manifests are located. +# Please see the documentation for all configuration options: +# https://help.github.com/github/administering-a-repository/configuration-options-for-dependency-updates + +version: 2 +updates: + # Maintain dependencies for GitHub Actions + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: "weekly" + + # Maintain Go dependencies + - package-ecosystem: "gomod" + directory: "/" + schedule: + interval: "weekly" diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml new file mode 100644 index 0000000..ba012d4 --- /dev/null +++ b/.github/workflows/codeql-analysis.yml @@ -0,0 +1,71 @@ +# For most projects, this workflow file will not need changing; you simply need +# to commit it to your repository. +# +# You may wish to alter this file to override the set of languages analyzed, +# or to provide custom queries or build logic. 
+# +# ******** NOTE ******** +# We have attempted to detect the languages in your repository. Please check +# the `language` matrix defined below to confirm you have the correct set of +# supported CodeQL languages. +# +name: "CodeQL" + +on: + push: + branches: [ v2 ] + branches-ignore: + - "dependabot/**" + pull_request: + paths-ignore: + - '**.md' + # The branches below must be a subset of the branches above + branches: [ v2 ] + schedule: + - cron: '34 7 * * 1' + +jobs: + analyze: + name: Analyze + runs-on: ubuntu-latest + + strategy: + fail-fast: false + matrix: + language: [ 'go' ] + # CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python' ] + # Learn more: + # https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed + + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + # Initializes the CodeQL tools for scanning. + - name: Initialize CodeQL + uses: github/codeql-action/init@v2 + with: + languages: ${{ matrix.language }} + # If you wish to specify custom queries, you can do so here or in a config file. + # By default, queries listed here will override any specified in a config file. + # Prefix the list here with "+" to use these queries and those in the config file. + # queries: ./path/to/local/query, your-org/your-repo/queries@main + + # Autobuild attempts to build any compiled languages (C/C++, C#, or Java). + # If this step fails, then you should remove it and run the build manually (see below) + - name: Autobuild + uses: github/codeql-action/autobuild@v2 + + # ℹ️ Command-line programs to run using the OS shell. 
+ # 📚 https://git.io/JvXDl + + # ✏️ If the Autobuild fails above, remove it and uncomment the following three lines + # and modify them (or add more) to build your code if your project + # uses a compiled language + + #- run: | + # make bootstrap + # make release + + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v2 diff --git a/.github/workflows/file_formatting.yml b/.github/workflows/file_formatting.yml new file mode 100644 index 0000000..eb3d588 --- /dev/null +++ b/.github/workflows/file_formatting.yml @@ -0,0 +1,19 @@ +on: + push: + branches: + - v2 + pull_request: + branches: + - v2 + +name: formatting +jobs: + check-sorted: + name: check sorted + runs-on: ubuntu-latest + steps: + - name: checkout code + uses: actions/checkout@v3 + - name: verify example_test.go + run: | + grep "^func " example_test.go | sort -C diff --git a/.github/workflows/go_test.yml b/.github/workflows/go_test.yml new file mode 100644 index 0000000..0dcc90a --- /dev/null +++ b/.github/workflows/go_test.yml @@ -0,0 +1,30 @@ +on: + push: + branches: + - v2 + pull_request: + branches: + - v2 + +name: golangci-lint +jobs: + golangci: + strategy: + matrix: + go-version: + - "1.21" + name: lint and test + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v3 + - name: Install Go + uses: actions/setup-go@v4 + with: + go-version: ${{ matrix.go-version }} + - name: golangci-lint + uses: golangci/golangci-lint-action@v3.7.0 + with: + version: v1.55.2 + - name: test + run: make test diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6657e3c --- /dev/null +++ b/.gitignore @@ -0,0 +1,20 @@ +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test +local_testing +coverage.out + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +vendor/ + +# IDE project files +.idea diff 
--git a/.golangci.yaml b/.golangci.yaml new file mode 100644 index 0000000..faa3fdb --- /dev/null +++ b/.golangci.yaml @@ -0,0 +1,52 @@ +run: + timeout: 5m + issues-exit-code: 1 + tests: true + skip-dirs: + - local + +issues: + max-same-issues: 100 + exclude-rules: + - path: _test\.go + linters: + - bodyclose + - errcheck + - gosec + +linters: + enable: + - bodyclose + - exportloopref + - gofumpt + - goimports + - gosec + - gosimple + - govet + - ineffassign + - misspell + - revive + - staticcheck + - typecheck + - unused + - whitespace + +output: + # colored-line-number|line-number|json|tab|checkstyle|code-climate, default is "colored-line-number" + format: colored-line-number + # print lines of code with issue, default is true + print-issued-lines: true + # print linter name in the end of issue text, default is true + print-linter-name: true + # make issues output unique by line, default is true + uniq-by-line: true + # add a prefix to the output file references; default is no prefix + path-prefix: "" + # sorts results by: filepath, line and column + sort-results: true + +linters-settings: + golint: + min-confidence: 0.8 + +fix: true diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..99b237e --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,24 @@ +# See https://pre-commit.com for more information +# See https://pre-commit.com/hooks.html for more hooks +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.5.0 + hooks: + - id: check-added-large-files + - id: check-case-conflict + - id: check-merge-conflict + - id: check-yaml + - id: detect-private-key + - id: end-of-file-fixer + - id: trailing-whitespace + - repo: https://github.com/golangci/golangci-lint + rev: v1.55.2 + hooks: + - id: golangci-lint + - repo: https://github.com/TekWizely/pre-commit-golang + rev: v1.0.0-rc.1 + hooks: + - id: go-fumpt + args: + - -w + - id: go-mod-tidy diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md new 
file mode 100644 index 0000000..7d913b5 --- /dev/null +++ b/CODE_OF_CONDUCT.md @@ -0,0 +1,73 @@ +# Contributor Covenant Code of Conduct + +## Our Pledge + +In the interest of fostering an open and welcoming environment, we as +contributors and maintainers pledge to making participation in our project and +our community a harassment-free experience for everyone. And we mean everyone! + +## Our Standards + +Examples of behavior that contributes to creating a positive environment +include: + +* Using welcoming and kind language +* Being respectful of differing viewpoints and experiences +* Gracefully accepting constructive criticism +* Focusing on what is best for the community +* Showing empathy towards other community members + +Examples of unacceptable behavior by participants include: + +* The use of sexualized language or imagery and unwelcome sexual attention or + advances +* Trolling, insulting/derogatory comments, and personal or political attacks +* Public or private harassment +* Publishing others' private information, such as a physical or electronic + address, without explicit permission +* Other conduct which could reasonably be considered inappropriate in a + professional setting + +## Our Responsibilities + +Project maintainers are responsible for clarifying the standards of acceptable +behavior and are expected to take appropriate and fair corrective action in +response to any instances of unacceptable behavior. + +Project maintainers have the right and responsibility to remove, edit, or +reject comments, commits, code, wiki edits, issues, and other contributions +that are not aligned to this Code of Conduct, or to ban temporarily or +permanently any contributor for other behaviors that they deem inappropriate, +threatening, offensive, or harmful. + +## Scope + +This Code of Conduct applies both within project spaces and in public spaces +when an individual is representing the project or its community. 
Examples of +representing a project or community include using an official project e-mail +address, posting via an official social media account, or acting as an appointed +representative at an online or offline event. Representation of a project may be +further defined and clarified by project maintainers. + +## Enforcement + +Instances of abusive, harassing, or otherwise unacceptable behavior may be +reported by contacting the project team initially on Slack to coordinate private communication. All +complaints will be reviewed and investigated and will result in a response that +is deemed necessary and appropriate to the circumstances. The project team is +obligated to maintain confidentiality with regard to the reporter of an incident. +Further details of specific enforcement policies may be posted separately. + +Project maintainers who do not follow or enforce the Code of Conduct in good +faith may face temporary or permanent repercussions as determined by other +members of the project's leadership. + +## Attribution + +This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4, +available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html + +[homepage]: https://www.contributor-covenant.org + +For answers to common questions about this code of conduct, see +https://www.contributor-covenant.org/faq diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..b2d3be8 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,40 @@ +# Contributing to gocron + +Thank you for coming to contribute to gocron! We welcome new ideas, PRs and general feedback. 
+ +## Reporting Bugs + +If you find a bug then please let the project know by opening an issue after doing the following: + +- Do a quick search of the existing issues to make sure the bug isn't already reported +- Try and make a minimal list of steps that can reliably reproduce the bug you are experiencing +- Collect as much information as you can to help identify what the issue is (project version, configuration files, etc) + +## Suggesting Enhancements + +If you have a use case that you don't see a way to support yet, we would welcome the feedback in an issue. Before opening the issue, please consider: + +- Is this a common use case? +- Is it simple to understand? + +You can help us out by doing the following before raising a new issue: + +- Check that the feature hasn't been requested already by searching existing issues +- Try and reduce your enhancement into a single, concise and deliverable request, rather than a general idea +- Explain your own use cases as the basis of the request + +## Adding Features + +Pull requests are always welcome. However, before going through the trouble of implementing a change it's worth creating a bug or feature request issue. +This allows us to discuss the changes and make sure they are a good fit for the project. + +Please always make sure a pull request has been: + +- Unit tested with `make test` +- Linted with `make lint` +- Vetted with `make vet` +- Formatted with `make fmt` or validated with `make check-fmt` + +## Writing Tests + +Tests should follow the [table driven test pattern](https://dave.cheney.net/2013/06/09/writing-table-driven-tests-in-go). See other tests in the code base for additional examples. 
diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..3357d57 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2014, 辣椒面 + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..1e16aef --- /dev/null +++ b/Makefile @@ -0,0 +1,13 @@ +.PHONY: fmt check-fmt lint vet test + +GO_PKGS := $(shell go list -f {{.Dir}} ./...) + +fmt: + @go list -f {{.Dir}} ./... | xargs -I{} gofmt -w -s {} + +lint: + @grep "^func " example_test.go | sort -c + @golangci-lint run + +test: + @go test -race -v $(GO_FLAGS) -count=1 $(GO_PKGS) diff --git a/README.md b/README.md new file mode 100644 index 0000000..83a3516 --- /dev/null +++ b/README.md @@ -0,0 +1,121 @@ +# gocron: A Golang Job Scheduling Package. 
+ +[![CI State](https://github.com/go-co-op/gocron/actions/workflows/go_test.yml/badge.svg?branch=v2&event=push)](https://github.com/go-co-op/gocron/actions) +![Go Report Card](https://goreportcard.com/badge/github.com/go-co-op/gocron) [![Go Doc](https://godoc.org/github.com/go-co-op/gocron/v2?status.svg)](https://pkg.go.dev/github.com/go-co-op/gocron/v2) + +gocron is a job scheduling package which lets you run Go functions at pre-determined intervals. + +If you want to chat, you can find us on Slack at +[](https://gophers.slack.com/archives/CQ7T0T1FW) + +## Concepts + +- **Job**: The encapsulates a "task", which is made up of a go func and any function parameters, and then + provides the scheduler with the time the job should be scheduled to run. +- **Executor**: The executor, calls the "task" function and manages the complexities of different job + execution timing (e.g. singletons that shouldn't overrun each other, limiting the max number of jobs running) +- **Scheduler**: The scheduler keeps track of all the jobs and sends each job to the executor when + it is ready to be run. + +## Quick Start + +``` +go get github.com/go-co-op/gocron/v2 +``` + +```golang +package main + +import ( + "fmt" + "time" + + "github.com/go-co-op/gocron/v2" +) + +func main() { + // create a scheduler + s, err := gocron.NewScheduler() + if err != nil { + // handle error + } + + // add a job to the scheduler + j, err := s.NewJob( + gocron.DurationJob( + 10*time.Second, + ), + gocron.NewTask( + func(a string, b int) { + // do things + }, + "hello", + 1, + ), + ) + if err != nil { + // handle error + } + // each job has a unique id + fmt.Println(j.ID()) + + // start the scheduler + s.Start() + + // when you're done, shut it down + err = s.Shutdown() + if err != nil { + // handle error + } +} +``` + +## Features + +- **Job types**: Jobs can be run at various intervals. 
+ - [**Duration**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#DurationJob): + Jobs can be run at a fixed `time.Duration`. + - [**Random duration**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#DurationRandomJob): + Jobs can be run at a random `time.Duration` between a min and max. + - [**Cron**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#CronJob): + Jobs can be run using a crontab. + - [**Daily**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#DailyJob): + Jobs can be run every x days at specific times. + - [**Weekly**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WeeklyJob): + Jobs can be run every x weeks on specific days of the week and at specific times. + - [**Monthly**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#MonthlyJob): + Jobs can be run every x months on specific days of the month and at specific times. +- **Limited Concurrency**: Jobs can be limited individually or across the entire scheduler. + - [**Per job limiting with singleton mode**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithSingletonMode): + Jobs can be limited to a single concurrent execution that either reschedules (skips overlapping executions) + or queues (waits for the previous execution to finish). + - [**Per scheduler limiting with limit mode**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithLimitConcurrentJobs): + Jobs can be limited to a certain number of concurrent executions across the entire scheduler + using either reschedule (skip when the limit is met) or queue (jobs are added to a queue to + wait for the limit to be available). +- **Distributed instances of gocron**: Multiple instances of gocron can be run. + - [**Elector**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithDistributedElector): + An elector can be used to elect a single instance of gocron to run as the primary with the + other instances checking to see if a new leader needs to be elected. +- **Events**: Job events can trigger actions. 
+ - [**Listeners**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithEventListeners): + [Event listeners](https://pkg.go.dev/github.com/go-co-op/gocron/v2#EventListener) + can be added to a job or all jobs in the + [scheduler](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithGlobalJobOptions) + to listen for job events and trigger actions. +- **Options**: Many job and scheduler options are available + - [**Job options**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#JobOption): + Job options can be set when creating a job using `NewJob`. + - [**Global job options**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithGlobalJobOptions): + Global job options can be set when creating a scheduler using `NewScheduler`. + - [**Scheduler options**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#SchedulerOption): + Scheduler options can be set when creating a scheduler using `NewScheduler`. + +## Supporters + +[Jetbrains](https://www.jetbrains.com/?from=gocron) supports this project with Intellij licenses. +We appreciate their support for free and open source software! + +## Star History + +[![Star History Chart](https://api.star-history.com/svg?repos=go-co-op/gocron&type=Date)](https://star-history.com/#go-co-op/gocron&Date) diff --git a/SECURITY.md b/SECURITY.md new file mode 100644 index 0000000..6a97ada --- /dev/null +++ b/SECURITY.md @@ -0,0 +1,15 @@ +# Security Policy + +## Supported Versions + +The current plan is to maintain version 1 as long as possible incorporating any necessary security patches. + +| Version | Supported | +| ------- | ------------------ | +| 1.x.x | :white_check_mark: | + +## Reporting a Vulnerability + +Vulnerabilities can be reported by [opening an issue](https://github.com/go-co-op/gocron/issues/new/choose) or reaching out on Slack: [](https://gophers.slack.com/archives/CQ7T0T1FW) + +We will do our best to addrerss any vulnerabilities in an expeditious manner. 
diff --git a/distributed.go b/distributed.go new file mode 100644 index 0000000..aef77a0 --- /dev/null +++ b/distributed.go @@ -0,0 +1,11 @@ +package gocron + +import "context" + +// Elector determines the leader from instances asking to be the leader. Only +// the leader runs jobs. If the leader goes down, a new leader will be elected. +type Elector interface { + // IsLeader should return nil if the job should be scheduled by the instance + // making the request and an error if the job should not be scheduled. + IsLeader(context.Context) error +} diff --git a/errors.go b/errors.go new file mode 100644 index 0000000..a331982 --- /dev/null +++ b/errors.go @@ -0,0 +1,45 @@ +package gocron + +import "fmt" + +// Public error definitions +var ( + ErrCronJobParse = fmt.Errorf("gocron: CronJob: crontab parse failure") + ErrDailyJobAtTimeNil = fmt.Errorf("gocron: DailyJob: atTime within atTimes must not be nil") + ErrDailyJobAtTimesNil = fmt.Errorf("gocron: DailyJob: atTimes must not be nil") + ErrDailyJobHours = fmt.Errorf("gocron: DailyJob: atTimes hours must be between 0 and 23 inclusive") + ErrDailyJobMinutesSeconds = fmt.Errorf("gocron: DailyJob: atTimes minutes and seconds must be between 0 and 59 inclusive") + ErrDurationRandomJobMinMax = fmt.Errorf("gocron: DurationRandomJob: minimum duration must be less than maximum duration") + ErrEventListenerFuncNil = fmt.Errorf("gocron: eventListenerFunc must not be nil") + ErrJobNotFound = fmt.Errorf("gocron: job not found") + ErrMonthlyJobDays = fmt.Errorf("gocron: MonthlyJob: daysOfTheMonth must be between 31 and -31 inclusive, and not 0") + ErrMonthlyJobAtTimeNil = fmt.Errorf("gocron: MonthlyJob: atTime within atTimes must not be nil") + ErrMonthlyJobAtTimesNil = fmt.Errorf("gocron: MonthlyJob: atTimes must not be nil") + ErrMonthlyJobDaysNil = fmt.Errorf("gocron: MonthlyJob: daysOfTheMonth must not be nil") + ErrMonthlyJobHours = fmt.Errorf("gocron: MonthlyJob: atTimes hours must be between 0 and 23 inclusive") + 
ErrMonthlyJobMinutesSeconds = fmt.Errorf("gocron: MonthlyJob: atTimes minutes and seconds must be between 0 and 59 inclusive") + ErrNewJobTask = fmt.Errorf("gocron: NewJob: Task.Function must be of kind reflect.Func") + ErrNewJobTaskNil = fmt.Errorf("gocron: NewJob: Task must not be nil") + ErrStopExecutorTimedOut = fmt.Errorf("gocron: timed out waiting for executor to stop") + ErrStopJobsTimedOut = fmt.Errorf("gocron: timed out waiting for jobs to finish") + ErrStopSchedulerTimedOut = fmt.Errorf("gocron: timed out waiting for scheduler to stop") + ErrWeeklyJobAtTimeNil = fmt.Errorf("gocron: WeeklyJob: atTime within atTimes must not be nil") + ErrWeeklyJobAtTimesNil = fmt.Errorf("gocron: WeeklyJob: atTimes must not be nil") + ErrWeeklyJobDaysOfTheWeekNil = fmt.Errorf("gocron: WeeklyJob: daysOfTheWeek must not be nil") + ErrWeeklyJobHours = fmt.Errorf("gocron: WeeklyJob: atTimes hours must be between 0 and 23 inclusive") + ErrWeeklyJobMinutesSeconds = fmt.Errorf("gocron: WeeklyJob: atTimes minutes and seconds must be between 0 and 59 inclusive") + ErrWithDistributedElector = fmt.Errorf("gocron: WithDistributedElector: elector must not be nil") + ErrWithClockNil = fmt.Errorf("gocron: WithClock: clock must not be nil") + ErrWithLocationNil = fmt.Errorf("gocron: WithLocation: location must not be nil") + ErrWithLoggerNil = fmt.Errorf("gocron: WithLogger: logger must not be nil") + ErrWithNameEmpty = fmt.Errorf("gocron: WithName: name must not be empty") + ErrWithStartDateTimePast = fmt.Errorf("gocron: WithStartDateTime: start must not be in the past") +) + +// internal errors +var ( + errAtTimeNil = fmt.Errorf("errAtTimeNil") + errAtTimesNil = fmt.Errorf("errAtTimesNil") + errAtTimeHours = fmt.Errorf("errAtTimeHours") + errAtTimeMinSec = fmt.Errorf("errAtTimeMinSec") +) diff --git a/example_test.go b/example_test.go new file mode 100644 index 0000000..e94efbc --- /dev/null +++ b/example_test.go @@ -0,0 +1,654 @@ +package gocron_test + +import ( + "fmt" + "log/slog" + 
"sync" + "time" + + . "github.com/go-co-op/gocron/v2" // nolint:revive + "github.com/google/uuid" + "github.com/jonboulle/clockwork" +) + +func ExampleAfterJobRuns() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func() {}, + ), + WithEventListeners( + AfterJobRuns( + func(jobID uuid.UUID) { + // do something after the job completes + }, + ), + ), + ) +} + +func ExampleAfterJobRunsWithError() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func() {}, + ), + WithEventListeners( + AfterJobRunsWithError( + func(jobID uuid.UUID, err error) { + // do something when the job returns an error + }, + ), + ), + ) +} + +func ExampleBeforeJobRuns() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func() {}, + ), + WithEventListeners( + BeforeJobRuns( + func(jobID uuid.UUID) { + // do something immediately before the job is run + }, + ), + ), + ) +} + +func ExampleCronJob() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + CronJob( + // standard cron tab parsing + "1 * * * *", + false, + ), + NewTask( + func() {}, + ), + ) + _, _ = s.NewJob( + CronJob( + // optionally include seconds as the first field + "* 1 * * * *", + true, + ), + NewTask( + func() {}, + ), + ) +} + +func ExampleDailyJob() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + DailyJob( + 1, + NewAtTimes( + NewAtTime(10, 30, 0), + NewAtTime(14, 0, 0), + ), + ), + NewTask( + func(a, b string) {}, + "a", + "b", + ), + ) +} + +func ExampleDurationJob() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + DurationJob( + time.Second*5, + ), + NewTask( + func() {}, + ), + ) +} + +func ExampleDurationRandomJob() { + s, _ := NewScheduler() + defer func() { _ 
= s.Shutdown() }() + + _, _ = s.NewJob( + DurationRandomJob( + time.Second, + 5*time.Second, + ), + NewTask( + func() {}, + ), + ) +} + +func ExampleJob_ID() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + j, _ := s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func() {}, + ), + ) + + fmt.Println(j.ID()) +} + +func ExampleJob_LastRun() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + j, _ := s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func() {}, + ), + ) + + fmt.Println(j.LastRun()) +} + +func ExampleJob_NextRun() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + j, _ := s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func() {}, + ), + ) + + fmt.Println(j.NextRun()) +} + +func ExampleMonthlyJob() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + MonthlyJob( + 1, + NewDaysOfTheMonth(3, -5, -1), + NewAtTimes( + NewAtTime(10, 30, 0), + NewAtTime(11, 15, 0), + ), + ), + NewTask( + func() {}, + ), + ) +} + +func ExampleNewScheduler() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + fmt.Println(s.Jobs()) +} + +func ExampleScheduler_NewJob() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + j, err := s.NewJob( + DurationJob( + 10*time.Second, + ), + NewTask( + func() {}, + ), + ) + if err != nil { + panic(err) + } + fmt.Println(j.ID()) +} + +func ExampleScheduler_RemoveByTags() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func() {}, + ), + WithTags("tag1"), + ) + _, _ = s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func() {}, + ), + WithTags("tag2"), + ) + fmt.Println(len(s.Jobs())) + + time.Sleep(20 * time.Millisecond) + + s.RemoveByTags("tag1", "tag2") + + fmt.Println(len(s.Jobs())) + // Output: + // 2 + // 0 +} + +func ExampleScheduler_RemoveJob() { + s, _ := NewScheduler() + defer func() { _ = 
s.Shutdown() }() + + j, _ := s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func() {}, + ), + ) + + fmt.Println(len(s.Jobs())) + time.Sleep(20 * time.Millisecond) + + _ = s.RemoveJob(j.ID()) + + fmt.Println(len(s.Jobs())) + // Output: + // 1 + // 0 +} + +func ExampleScheduler_Start() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + CronJob( + "* * * * *", + false, + ), + NewTask( + func() {}, + ), + ) + + s.Start() +} + +func ExampleScheduler_StopJobs() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + CronJob( + "* * * * *", + false, + ), + NewTask( + func() {}, + ), + ) + + s.Start() + + _ = s.StopJobs() +} + +func ExampleScheduler_Update() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + j, _ := s.NewJob( + CronJob( + "* * * * *", + false, + ), + NewTask( + func() {}, + ), + ) + + s.Start() + + // after some time, need to change the job + + j, _ = s.Update( + j.ID(), + DurationJob( + 5*time.Second, + ), + NewTask( + func() {}, + ), + ) +} + +func ExampleWeeklyJob() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + WeeklyJob( + 2, + NewWeekdays(time.Tuesday, time.Wednesday, time.Saturday), + NewAtTimes( + NewAtTime(1, 30, 0), + NewAtTime(12, 0, 30), + ), + ), + NewTask( + func() {}, + ), + ) +} + +func ExampleWithClock() { + fakeClock := clockwork.NewFakeClock() + s, _ := NewScheduler( + WithClock(fakeClock), + ) + var wg sync.WaitGroup + wg.Add(1) + _, _ = s.NewJob( + DurationJob( + time.Second*5, + ), + NewTask( + func(one string, two int) { + fmt.Printf("%s, %d\n", one, two) + wg.Done() + }, + "one", 2, + ), + ) + s.Start() + fakeClock.BlockUntil(1) + fakeClock.Advance(time.Second * 5) + wg.Wait() + _ = s.StopJobs() + // Output: + // one, 2 +} + +func ExampleWithDistributedElector() { + //var _ Elector = (*myElector)(nil) + // + //type myElector struct{} + // + //func (m myElector) IsLeader(_ context.Context) error { 
+ // return nil + //} + // + //elector := myElector{} + // + //_, _ = NewScheduler( + // WithDistributedElector(elector), + //) +} + +func ExampleWithEventListeners() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func() {}, + ), + WithEventListeners( + AfterJobRuns( + func(jobID uuid.UUID) { + // do something after the job completes + }, + ), + AfterJobRunsWithError( + func(jobID uuid.UUID, err error) { + // do something when the job returns an error + }, + ), + BeforeJobRuns( + func(jobID uuid.UUID) { + // do something immediately before the job is run + }, + ), + ), + ) +} + +func ExampleWithGlobalJobOptions() { + s, _ := NewScheduler( + WithGlobalJobOptions( + WithTags("tag1", "tag2", "tag3"), + ), + ) + + j, _ := s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func(one string, two int) { + fmt.Printf("%s, %d", one, two) + }, + "one", 2, + ), + ) + // The job will have the globally applied tags + fmt.Println(j.Tags()) + + s2, _ := NewScheduler( + WithGlobalJobOptions( + WithTags("tag1", "tag2", "tag3"), + ), + ) + j2, _ := s2.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func(one string, two int) { + fmt.Printf("%s, %d", one, two) + }, + "one", 2, + ), + WithTags("tag4", "tag5", "tag6"), + ) + // The job will have the tags set specifically on the job + // overriding those set globally by the scheduler + fmt.Println(j2.Tags()) + // Output: + // [tag1 tag2 tag3] + // [tag4 tag5 tag6] +} + +func ExampleWithLimitConcurrentJobs() { + _, _ = NewScheduler( + WithLimitConcurrentJobs( + 1, + LimitModeReschedule, + ), + ) +} + +func ExampleWithLimitedRuns() { + s, _ := NewScheduler() + defer func() { _ = s.Shutdown() }() + + _, _ = s.NewJob( + DurationJob( + time.Millisecond, + ), + NewTask( + func(one string, two int) { + fmt.Printf("%s, %d\n", one, two) + }, + "one", 2, + ), + WithLimitedRuns(1), + ) + s.Start() + + time.Sleep(100 * time.Millisecond) + 
fmt.Printf("no jobs in scheduler: %v\n", s.Jobs())
+	_ = s.StopJobs()
+	// Output:
+	// one, 2
+	// no jobs in scheduler: []
+}
+
+func ExampleWithLocation() {
+	location, _ := time.LoadLocation("Asia/Kolkata")
+
+	_, _ = NewScheduler(
+		WithLocation(location),
+	)
+}
+
+func ExampleWithLogger() {
+	_, _ = NewScheduler(
+		WithLogger(
+			NewJSONSlogLogger(slog.LevelInfo),
+		),
+	)
+}
+
+func ExampleWithName() {
+	s, _ := NewScheduler()
+	defer func() { _ = s.Shutdown() }()
+
+	j, _ := s.NewJob(
+		DurationJob(
+			time.Second,
+		),
+		NewTask(
+			func(one string, two int) {
+				fmt.Printf("%s, %d", one, two)
+			},
+			"one", 2,
+		),
+		WithName("job 1"),
+	)
+	fmt.Println(j.Name())
+	// Output:
+	// job 1
+}
+
+func ExampleWithSingletonMode() {
+	s, _ := NewScheduler()
+	defer func() { _ = s.Shutdown() }()
+
+	_, _ = s.NewJob(
+		DurationJob(
+			time.Second,
+		),
+		NewTask(
+			func() {
+				// this job will skip half its executions
+				// and effectively run every 2 seconds
+				time.Sleep(1500 * time.Millisecond)
+			},
+		),
+		WithSingletonMode(LimitModeReschedule),
+	)
+}
+
+func ExampleWithStartAt() {
+	s, _ := NewScheduler()
+	defer func() { _ = s.Shutdown() }()
+
+	start := time.Date(9999, 9, 9, 9, 9, 9, 9, time.UTC)
+
+	j, _ := s.NewJob(
+		DurationJob(
+			time.Second,
+		),
+		NewTask(
+			func(one string, two int) {
+				fmt.Printf("%s, %d", one, two)
+			},
+			"one", 2,
+		),
+		WithStartAt(
+			WithStartDateTime(start),
+		),
+	)
+	s.Start()
+	time.Sleep(20 * time.Millisecond)
+
+	next, _ := j.NextRun()
+	fmt.Println(next)
+
+	_ = s.StopJobs()
+	// Output:
+	// 9999-09-09 09:09:09.000000009 +0000 UTC
+}
+
+func ExampleWithStopTimeout() {
+	_, _ = NewScheduler(
+		WithStopTimeout(time.Second * 5),
+	)
+}
+
+func ExampleWithTags() {
+	s, _ := NewScheduler()
+	defer func() { _ = s.Shutdown() }()
+
+	j, _ := s.NewJob(
+		DurationJob(
+			time.Second,
+		),
+		NewTask(
+			func(one string, two int) {
+				fmt.Printf("%s, %d", one, two)
+			},
+			"one", 2,
+		),
+		WithTags("tag1", "tag2", "tag3"),
+	)
+
fmt.Println(j.Tags())
+	// Output:
+	// [tag1 tag2 tag3]
+}
diff --git a/executor.go b/executor.go
new file mode 100644
index 0000000..ea83b7b
--- /dev/null
+++ b/executor.go
@@ -0,0 +1,350 @@
+package gocron
+
+import (
+	"context"
+	"strconv"
+	"sync"
+	"time"
+
+	"github.com/google/uuid"
+)
+
+type executor struct {
+	ctx              context.Context
+	cancel           context.CancelFunc
+	logger           Logger
+	stopCh           chan struct{}
+	jobsIDsIn        chan uuid.UUID
+	jobIDsOut        chan uuid.UUID
+	jobOutRequest    chan jobOutRequest
+	stopTimeout      time.Duration
+	done             chan error
+	singletonRunners map[uuid.UUID]singletonRunner
+	limitMode        *limitModeConfig
+	elector          Elector
+}
+
+type singletonRunner struct {
+	in                chan uuid.UUID
+	rescheduleLimiter chan struct{}
+}
+
+type limitModeConfig struct {
+	started           bool
+	mode              LimitMode
+	limit             uint
+	rescheduleLimiter chan struct{}
+	in                chan uuid.UUID
+}
+
+func (e *executor) start() {
+	e.logger.Debug("executor started")
+
+	// creating the executor's context here as the executor
+	// is the only goroutine that should access this context
+	// any other uses within the executor should create a context
+	// using the executor context as parent.
+	e.ctx, e.cancel = context.WithCancel(context.Background())
+
+	// the standardJobsWg tracks the in-flight standard (non-singleton, non-limit-mode) jobs
+	standardJobsWg := waitGroupWithMutex{
+		wg: sync.WaitGroup{},
+		mu: sync.Mutex{},
+	}
+
+	singletonJobsWg := waitGroupWithMutex{
+		wg: sync.WaitGroup{},
+		mu: sync.Mutex{},
+	}
+
+	limitModeJobsWg := waitGroupWithMutex{
+		wg: sync.WaitGroup{},
+		mu: sync.Mutex{},
+	}
+
+	// start the for loop that is the executor
+	// selecting on channels for work to do
+	for {
+		select {
+		// job ids in are sent from 1 of 2 places:
+		// 1. the scheduler sends directly when jobs
+		// are run immediately.
+		// 2. sent from time.AfterFuncs in which job schedules
+		// are spun up by the scheduler
+		case id := <-e.jobsIDsIn:
+			select {
+			case <-e.stopCh:
+				e.stop(&standardJobsWg, &singletonJobsWg, &limitModeJobsWg)
+				return
+			default:
+			}
+			// this context is used to handle cancellation of the executor
+			// on requests for a job to the scheduler via requestJobCtx
+			ctx, cancel := context.WithCancel(e.ctx)
+
+			if e.limitMode != nil && !e.limitMode.started {
+				// check if we are already running the limit mode runners
+				// if not, spin up the required number i.e. limit!
+				e.limitMode.started = true
+				for i := e.limitMode.limit; i > 0; i-- {
+					limitModeJobsWg.Add(1)
+					go e.limitModeRunner("limitMode-"+strconv.Itoa(int(i)), e.limitMode.in, &limitModeJobsWg, e.limitMode.mode, e.limitMode.rescheduleLimiter)
+				}
+			}
+
+			// spin off into a goroutine to unblock the executor and
+			// allow for processing for more work
+			go func() {
+				// make sure to cancel the above context per the docs
+				// // Canceling this context releases resources associated with it, so code should
+				// // call cancel as soon as the operations running in this Context complete.
+				defer cancel()
+
+				// check for limit mode - this spins up a separate runner which handles
+				// limiting the total number of concurrently running jobs
+				if e.limitMode != nil {
+					if e.limitMode.mode == LimitModeReschedule {
+						select {
+						// rescheduleLimiter is a channel the size of the limit
+						// this blocks publishing to the channel and keeps
+						// the executor from building up a waiting queue
+						// and forces rescheduling
+						case e.limitMode.rescheduleLimiter <- struct{}{}:
+							e.limitMode.in <- id
+						default:
+							// all runners are busy, reschedule the work for later
+							// which means we just skip it here and do nothing
+							// TODO when metrics are added, this should increment a rescheduled metric
+							select {
+							case e.jobIDsOut <- id:
+							default:
+							}
+						}
+					} else {
+						// since we're not using LimitModeReschedule, but instead using LimitModeWait
+						// we do want to queue up the work to the limit mode runners and allow them
+						// to work through the channel backlog. A hard limit of 1000 is in place
+						// at which point this call would block.
+						// TODO when metrics are added, this should increment a wait metric
+						e.limitMode.in <- id
+					}
+				} else {
+					// no limit mode, so we're either running a regular job or
+					// a job with a singleton mode
+					//
+					// get the job, so we can figure out what kind it is and how
+					// to execute it
+					j := requestJobCtx(ctx, id, e.jobOutRequest)
+					if j == nil {
+						// safety check as it'd be strange bug if this occurred
+						// TODO add a log line here
+						return
+					}
+					if j.singletonMode {
+						// for singleton mode, get the existing runner for the job
+						// or spin up a new one
+						runner, ok := e.singletonRunners[id]
+						if !ok {
+							runner.in = make(chan uuid.UUID, 1000)
+							if j.singletonLimitMode == LimitModeReschedule {
+								runner.rescheduleLimiter = make(chan struct{}, 1)
+							}
+							e.singletonRunners[id] = runner
+							singletonJobsWg.Add(1)
+							go e.limitModeRunner("singleton-"+id.String(), runner.in, &singletonJobsWg, j.singletonLimitMode, runner.rescheduleLimiter)
+						}
+
+						if j.singletonLimitMode == LimitModeReschedule {
+							// reschedule mode uses the limiter channel to check
+							// for a running job and reschedules if the channel is full.
+							select {
+							case runner.rescheduleLimiter <- struct{}{}:
+								runner.in <- id
+							default:
+								// runner is busy, reschedule the work for later
+								// which means we just skip it here and do nothing
+								// TODO when metrics are added, this should increment a rescheduled metric
+								select {
+								case e.jobIDsOut <- id:
+								default:
+								}
+							}
+						} else {
+							// wait mode, fill up that queue (buffered channel, so it's ok)
+							runner.in <- id
+						}
+					} else {
+						select {
+						case <-e.stopCh:
+							e.stop(&standardJobsWg, &singletonJobsWg, &limitModeJobsWg)
+							return
+						default:
+						}
+						// we've gotten to the basic / standard jobs --
+						// the ones without anything special that just want
+						// to be run. Add to the WaitGroup so that
+						// stopping or shutting down can wait for the jobs to
+						// complete.
+						standardJobsWg.Add(1)
+						go func(j internalJob) {
+							e.runJob(j)
+							standardJobsWg.Done()
+						}(*j)
+					}
+				}
+			}()
+		case <-e.stopCh:
+			e.stop(&standardJobsWg, &singletonJobsWg, &limitModeJobsWg)
+			return
+		}
+	}
+}
+
+func (e *executor) stop(standardJobsWg, singletonJobsWg, limitModeJobsWg *waitGroupWithMutex) {
+	e.logger.Debug("stopping executor")
+	// we've been asked to stop. This is either because the scheduler has been told
+	// to stop all jobs or the scheduler has been asked to completely shutdown.
+	//
+	// cancel tells all the functions to stop their work and send in a done response
+	e.cancel()
+
+	// the wait for job channels are used to report back whether we successfully waited
+	// for all jobs to complete or if we hit the configured timeout.
+	waitForJobs := make(chan struct{}, 1)
+	waitForSingletons := make(chan struct{}, 1)
+	waitForLimitMode := make(chan struct{}, 1)
+
+	// the waiter context is used to cancel the functions waiting on jobs.
+	// this is done to avoid goroutine leaks.
+	waiterCtx, waiterCancel := context.WithCancel(context.Background())
+
+	// wait for standard jobs to complete
+	go func() {
+		e.logger.Debug("waiting for standard jobs to complete")
+		go func() {
+			// this is done in a separate goroutine, so we aren't
+			// blocked by the WaitGroup's Wait call in the event
+			// that the waiter context is cancelled.
+			// This particular goroutine could leak in the event that
+			// some long-running standard job doesn't complete.
+			standardJobsWg.Wait()
+			e.logger.Debug("standard jobs completed")
+			waitForJobs <- struct{}{}
+		}()
+		<-waiterCtx.Done()
+	}()
+
+	// wait for per job singleton limit mode runner jobs to complete
+	go func() {
+		e.logger.Debug("waiting for singleton jobs to complete")
+		go func() {
+			singletonJobsWg.Wait()
+			e.logger.Debug("singleton jobs completed")
+			waitForSingletons <- struct{}{}
+		}()
+		<-waiterCtx.Done()
+	}()
+
+	// wait for limit mode runners to complete
+	go func() {
+		e.logger.Debug("waiting for limit mode jobs to complete")
+		go func() {
+			limitModeJobsWg.Wait()
+			e.logger.Debug("limitMode jobs completed")
+			waitForLimitMode <- struct{}{}
+		}()
+		<-waiterCtx.Done()
+	}()
+
+	// now either wait for all the jobs to complete,
+	// or busy-poll until the configured timeout elapses.
+	var count int
+	timeout := time.Now().Add(e.stopTimeout)
+	for time.Now().Before(timeout) && count < 3 {
+		select {
+		case <-waitForJobs:
+			count++
+		case <-waitForSingletons:
+			count++
+			// emptying the singleton runner map
+			// as we'll need to spin up new runners
+			// in the event that the scheduler is started again
+			e.singletonRunners = make(map[uuid.UUID]singletonRunner)
+		case <-waitForLimitMode:
+			count++
+		default:
+		}
+	}
+	if count < 3 {
+		e.done <- ErrStopJobsTimedOut
+		e.logger.Debug("executor stopped - timed out")
+	} else {
+		e.done <- nil
+		e.logger.Debug("executor stopped")
+	}
+	waiterCancel()
+}
+
+func (e *executor) limitModeRunner(name string, in chan uuid.UUID, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) {
+	e.logger.Debug("limitModeRunner starting", "name", name)
+	for {
+		select {
+		case id := <-in:
+			select {
+			case <-e.ctx.Done():
+				e.logger.Debug("limitModeRunner shutting down", "name", name)
+				wg.Done()
+				return
+			default:
+			}
+
+			ctx, cancel := context.WithCancel(e.ctx)
+			j := requestJobCtx(ctx, id, e.jobOutRequest)
+			if j != nil {
+				e.runJob(*j)
+			}
+			cancel()
+
+			// remove the limiter block to allow another job to be scheduled
+			if limitMode == LimitModeReschedule {
+				select {
+				case <-rescheduleLimiter:
+				default:
+				}
+			}
+		case <-e.ctx.Done():
+			e.logger.Debug("limitModeRunner shutting down", "name", name)
+			wg.Done()
+			return
+		}
+	}
+}
+
+func (e *executor) runJob(j internalJob) {
+	select {
+	case <-e.ctx.Done():
+	case <-j.ctx.Done():
+	default:
+		if e.elector != nil {
+			if err := e.elector.IsLeader(j.ctx); err != nil {
+				return
+			}
+		}
+		_ = callJobFuncWithParams(j.beforeJobRuns, j.id)
+
+		select {
+		case <-e.ctx.Done():
+			return
+		case <-j.ctx.Done():
+			return
+		case e.jobIDsOut <- j.id:
+		}
+
+		err := callJobFuncWithParams(j.function, j.parameters...)
+		if err != nil {
+			_ = callJobFuncWithParams(j.afterJobRunsWithError, j.id, err)
+		} else {
+			_ = callJobFuncWithParams(j.afterJobRuns, j.id)
+		}
+	}
+}
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..2f41e00
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,21 @@
+module github.com/go-co-op/gocron/v2
+
+go 1.21
+
+require (
+	github.com/google/uuid v1.4.0
+	github.com/jonboulle/clockwork v0.4.0
+	github.com/robfig/cron/v3 v3.0.1
+	github.com/stretchr/testify v1.8.4
+	go.uber.org/goleak v1.3.0
+	golang.org/x/exp v0.0.0-20231006140011-7918f672742d
+)
+
+require (
+	github.com/davecgh/go-spew v1.1.1 // indirect
+	github.com/kr/text v0.2.0 // indirect
+	github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
+	github.com/pmezard/go-difflib v1.0.0 // indirect
+	gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
+	gopkg.in/yaml.v3 v3.0.1 // indirect
+)
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..529bbf4
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,28 @@
+github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
+github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4= +github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= +golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/job.go b/job.go new file mode 100644 index 0000000..47aece8 --- /dev/null +++ b/job.go @@ -0,0 +1,779 @@ +package gocron + +import ( + "context" + "errors" + "fmt" + "math/rand" + "slices" + "strings" + "time" + + "github.com/google/uuid" + "github.com/jonboulle/clockwork" + "github.com/robfig/cron/v3" +) + +// internalJob stores the information needed by the scheduler +// to manage scheduling, starting and stopping the job +type internalJob struct { + ctx context.Context + cancel context.CancelFunc + id uuid.UUID + name string + tags []string + jobSchedule + lastRun, nextRun time.Time + function any + parameters []any + timer clockwork.Timer + singletonMode bool + singletonLimitMode LimitMode + limitRunsTo *limitRunsTo + startTime time.Time + startImmediately bool + // event listeners + afterJobRuns func(jobID uuid.UUID) + beforeJobRuns func(jobID uuid.UUID) + afterJobRunsWithError func(jobID uuid.UUID, err error) +} + +// stop is used to stop the job's timer and cancel the context +// stopping the timer is critical for cleaning up jobs that are +// sleeping in a time.AfterFunc timer when the job is being stopped. +// cancelling the context keeps the executor from continuing to try +// and run the job. +func (j *internalJob) stop() { + if j.timer != nil { + j.timer.Stop() + } + j.cancel() +} + +// task stores the function and parameters +// that are actually run when the job is executed. +type task struct { + function any + parameters []any +} + +// Task defines a function that returns the task +// function and parameters. 
+type Task func() task + +// NewTask provides the job's task function and parameters. +func NewTask(function any, parameters ...any) Task { + return func() task { + return task{ + function: function, + parameters: parameters, + } + } +} + +// limitRunsTo is used for managing the number of runs +// when the user only wants the job to run a certain +// number of times and then be removed from the scheduler. +type limitRunsTo struct { + limit uint + runCount uint +} + +// ----------------------------------------------- +// ----------------------------------------------- +// --------------- Job Variants ------------------ +// ----------------------------------------------- +// ----------------------------------------------- + +// JobDefinition defines the interface that must be +// implemented to create a job from the definition. +type JobDefinition interface { + setup(*internalJob, *time.Location) error +} + +var _ JobDefinition = (*cronJobDefinition)(nil) + +type cronJobDefinition struct { + crontab string + withSeconds bool +} + +func (c cronJobDefinition) setup(j *internalJob, location *time.Location) error { + var withLocation string + if strings.HasPrefix(c.crontab, "TZ=") || strings.HasPrefix(c.crontab, "CRON_TZ=") { + withLocation = c.crontab + } else { + // since the user didn't provide a timezone default to the location + // passed in by the scheduler. Default: time.Local + withLocation = fmt.Sprintf("CRON_TZ=%s %s", location.String(), c.crontab) + } + + var ( + cronSchedule cron.Schedule + err error + ) + + if c.withSeconds { + p := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor) + cronSchedule, err = p.Parse(withLocation) + } else { + cronSchedule, err = cron.ParseStandard(withLocation) + } + if err != nil { + return errors.Join(ErrCronJobParse, err) + } + + j.jobSchedule = &cronJob{cronSchedule: cronSchedule} + return nil +} + +// CronJob defines a new job using the crontab syntax: `* * * * *`. 
+// An optional 6th field can be used at the beginning if withSeconds +// is set to true: `* * * * * *`. +// The timezone can be set on the Scheduler using WithLocation, or in the +// crontab in the form `TZ=America/Chicago * * * * *` or +// `CRON_TZ=America/Chicago * * * * *` +func CronJob(crontab string, withSeconds bool) JobDefinition { + return cronJobDefinition{ + crontab: crontab, + withSeconds: withSeconds, + } +} + +var _ JobDefinition = (*durationJobDefinition)(nil) + +type durationJobDefinition struct { + duration time.Duration +} + +func (d durationJobDefinition) setup(j *internalJob, _ *time.Location) error { + j.jobSchedule = &durationJob{duration: d.duration} + return nil +} + +// DurationJob defines a new job using time.Duration +// for the interval. +func DurationJob(duration time.Duration) JobDefinition { + return durationJobDefinition{ + duration: duration, + } +} + +var _ JobDefinition = (*durationRandomJobDefinition)(nil) + +type durationRandomJobDefinition struct { + min, max time.Duration +} + +func (d durationRandomJobDefinition) setup(j *internalJob, _ *time.Location) error { + if d.min >= d.max { + return ErrDurationRandomJobMinMax + } + + j.jobSchedule = &durationRandomJob{ + min: d.min, + max: d.max, + rand: rand.New(rand.NewSource(time.Now().UnixNano())), // nolint:gosec + } + return nil +} + +// DurationRandomJob defines a new job that runs on a random interval +// between the min and max duration values provided. +// +// To achieve a similar behavior as tools that use a splay/jitter technique +// consider the median value as the baseline and the difference between the +// max-median or median-min as the splay/jitter. 
+// +// For example, if you want a job to run every 5 minutes, but want to add +// up to 1 min of jitter to the interval, you could use +// DurationRandomJob(4*time.Minute, 6*time.Minute) +func DurationRandomJob(minDuration, maxDuration time.Duration) JobDefinition { + return durationRandomJobDefinition{ + min: minDuration, + max: maxDuration, + } +} + +func DailyJob(interval uint, atTimes AtTimes) JobDefinition { + return dailyJobDefinition{ + interval: interval, + atTimes: atTimes, + } +} + +var _ JobDefinition = (*dailyJobDefinition)(nil) + +type dailyJobDefinition struct { + interval uint + atTimes AtTimes +} + +func (d dailyJobDefinition) setup(j *internalJob, location *time.Location) error { + atTimesDate, err := convertAtTimesToDateTime(d.atTimes, location) + switch { + case errors.Is(err, errAtTimesNil): + return ErrDailyJobAtTimesNil + case errors.Is(err, errAtTimeNil): + return ErrDailyJobAtTimeNil + case errors.Is(err, errAtTimeHours): + return ErrDailyJobHours + case errors.Is(err, errAtTimeMinSec): + return ErrDailyJobMinutesSeconds + } + + ds := dailyJob{ + interval: d.interval, + atTimes: atTimesDate, + } + j.jobSchedule = ds + return nil +} + +var _ JobDefinition = (*weeklyJobDefinition)(nil) + +type weeklyJobDefinition struct { + interval uint + daysOfTheWeek Weekdays + atTimes AtTimes +} + +func (w weeklyJobDefinition) setup(j *internalJob, location *time.Location) error { + var ws weeklyJob + ws.interval = w.interval + + if w.daysOfTheWeek == nil { + return ErrWeeklyJobDaysOfTheWeekNil + } + + daysOfTheWeek := w.daysOfTheWeek() + + slices.Sort(daysOfTheWeek) + + atTimesDate, err := convertAtTimesToDateTime(w.atTimes, location) + switch { + case errors.Is(err, errAtTimesNil): + return ErrWeeklyJobAtTimesNil + case errors.Is(err, errAtTimeNil): + return ErrWeeklyJobAtTimeNil + case errors.Is(err, errAtTimeHours): + return ErrWeeklyJobHours + case errors.Is(err, errAtTimeMinSec): + return ErrWeeklyJobMinutesSeconds + } + ws.atTimes = atTimesDate + + 
j.jobSchedule = ws + return nil +} + +type Weekdays func() []time.Weekday + +func NewWeekdays(weekday time.Weekday, weekdays ...time.Weekday) Weekdays { + return func() []time.Weekday { + return append(weekdays, weekday) + } +} + +func WeeklyJob(interval uint, daysOfTheWeek Weekdays, atTimes AtTimes) JobDefinition { + return weeklyJobDefinition{ + interval: interval, + daysOfTheWeek: daysOfTheWeek, + atTimes: atTimes, + } +} + +var _ JobDefinition = (*monthlyJobDefinition)(nil) + +type monthlyJobDefinition struct { + interval uint + daysOfTheMonth DaysOfTheMonth + atTimes AtTimes +} + +func (m monthlyJobDefinition) setup(j *internalJob, location *time.Location) error { + var ms monthlyJob + ms.interval = m.interval + + if m.daysOfTheMonth == nil { + return ErrMonthlyJobDaysNil + } + + var daysStart, daysEnd []int + for _, day := range m.daysOfTheMonth() { + if day > 31 || day == 0 || day < -31 { + return ErrMonthlyJobDays + } + if day > 0 { + daysStart = append(daysStart, day) + } else { + daysEnd = append(daysEnd, day) + } + } + daysStart = removeSliceDuplicatesInt(daysStart) + slices.Sort(daysStart) + ms.days = daysStart + + daysEnd = removeSliceDuplicatesInt(daysEnd) + slices.Sort(daysEnd) + ms.daysFromEnd = daysEnd + + atTimesDate, err := convertAtTimesToDateTime(m.atTimes, location) + switch { + case errors.Is(err, errAtTimesNil): + return ErrMonthlyJobAtTimesNil + case errors.Is(err, errAtTimeNil): + return ErrMonthlyJobAtTimeNil + case errors.Is(err, errAtTimeHours): + return ErrMonthlyJobHours + case errors.Is(err, errAtTimeMinSec): + return ErrMonthlyJobMinutesSeconds + } + ms.atTimes = atTimesDate + + j.jobSchedule = ms + return nil +} + +type days []int + +// DaysOfTheMonth defines a function that returns a list of days. +type DaysOfTheMonth func() days + +// NewDaysOfTheMonth provide the days of the month the job should +// run. The days can be positive 1 to 31 and/or negative -31 to -1. +// Negative values count backwards from the end of the month. 
+// For example: -1 == the last day of the month.
+//
+// -5 == 5 days before the end of the month.
+func NewDaysOfTheMonth(day int, moreDays ...int) DaysOfTheMonth {
+	return func() days {
+		// do not reassign the captured variadic slice: doing so would
+		// append `day` again on every invocation of the returned func.
+		all := append(moreDays, day)
+		return all
+	}
+}
+
+type atTime struct {
+	hours, minutes, seconds uint
+}
+
+func (a atTime) time(location *time.Location) time.Time {
+	return time.Date(0, 0, 0, int(a.hours), int(a.minutes), int(a.seconds), 0, location)
+}
+
+// AtTime defines a function that returns the internal atTime
+type AtTime func() atTime
+
+// NewAtTime provide the hours, minutes and seconds at which
+// the job should be run
+func NewAtTime(hours, minutes, seconds uint) AtTime {
+	return func() atTime {
+		return atTime{hours: hours, minutes: minutes, seconds: seconds}
+	}
+}
+
+// AtTimes define a list of AtTime
+type AtTimes func() []AtTime
+
+// NewAtTimes provide the hours, minutes and seconds at which
+// the job should be run. The returned func is safe to invoke repeatedly.
+func NewAtTimes(atTime AtTime, atTimes ...AtTime) AtTimes {
+	return func() []AtTime {
+		all := append(atTimes, atTime)
+		return all
+	}
+}
+
+// MonthlyJob runs the job on the interval of months, on the specific days of the month
+// specified, and at the set times. Days of the month can be 1 to 31 or negative (-1 to -31), which
+// count backwards from the end of the month. E.g. -1 is the last day of the month.
+//
+// If a day of the month is selected that does not exist in all months (e.g. 31st)
+// any month that does not have that day will be skipped.
+//
+// By default, the job will start the next available day, considering the last run to be now,
+// and the time and month based on the interval, days and times you input.
+// This means, if you select an interval greater than 1, your job by default will run
+// X (interval) months from now.
+// You can use WithStartAt to tell the scheduler to start the job sooner.
+//
+// Carefully consider your configuration!
+// - For example: an interval of 2 months on the 31st of each month, starting 12/31 +// would skip Feb, April, June, and next run would be in August. +func MonthlyJob(interval uint, daysOfTheMonth DaysOfTheMonth, atTimes AtTimes) JobDefinition { + return monthlyJobDefinition{ + interval: interval, + daysOfTheMonth: daysOfTheMonth, + atTimes: atTimes, + } +} + +// ----------------------------------------------- +// ----------------------------------------------- +// ----------------- Job Options ----------------- +// ----------------------------------------------- +// ----------------------------------------------- + +type JobOption func(*internalJob) error + +func WithEventListeners(eventListeners ...EventListener) JobOption { + return func(j *internalJob) error { + for _, eventListener := range eventListeners { + if err := eventListener(j); err != nil { + return err + } + } + return nil + } +} + +// WithLimitedRuns limits the number of executions of this job to n. +// Upon reaching the limit, the job is removed from the scheduler. +func WithLimitedRuns(limit uint) JobOption { + return func(j *internalJob) error { + j.limitRunsTo = &limitRunsTo{ + limit: limit, + runCount: 0, + } + return nil + } +} + +// WithName sets the name of the job. Name provides +// a human-readable identifier for the job. 
+func WithName(name string) JobOption { + // TODO use the name for metrics and future logging option + return func(j *internalJob) error { + if name == "" { + return ErrWithNameEmpty + } + j.name = name + return nil + } +} + +func WithSingletonMode(mode LimitMode) JobOption { + return func(j *internalJob) error { + j.singletonMode = true + j.singletonLimitMode = mode + return nil + } +} + +// WithStartAt sets the option for starting the job +func WithStartAt(option StartAtOption) JobOption { + return func(j *internalJob) error { + return option(j) + } +} + +// StartAtOption defines options for starting the job +type StartAtOption func(*internalJob) error + +// WithStartImmediately tells the scheduler to run the job immediately +// regardless of the type or schedule of job. After this immediate run +// the job is scheduled from this time based on the job definition. +func WithStartImmediately() StartAtOption { + return func(j *internalJob) error { + j.startImmediately = true + return nil + } +} + +// WithStartDateTime sets the first date & time at which the job should run. 
+func WithStartDateTime(start time.Time) StartAtOption { + return func(j *internalJob) error { + if start.IsZero() || start.Before(time.Now()) { + return ErrWithStartDateTimePast + } + j.startTime = start + return nil + } +} + +func WithTags(tags ...string) JobOption { + return func(j *internalJob) error { + j.tags = tags + return nil + } +} + +// ----------------------------------------------- +// ----------------------------------------------- +// ------------- Job Event Listeners ------------- +// ----------------------------------------------- +// ----------------------------------------------- + +type EventListener func(*internalJob) error + +func AfterJobRuns(eventListenerFunc func(jobID uuid.UUID)) EventListener { + return func(j *internalJob) error { + if eventListenerFunc == nil { + return ErrEventListenerFuncNil + } + j.afterJobRuns = eventListenerFunc + return nil + } +} + +func AfterJobRunsWithError(eventListenerFunc func(jobID uuid.UUID, err error)) EventListener { + return func(j *internalJob) error { + if eventListenerFunc == nil { + return ErrEventListenerFuncNil + } + j.afterJobRunsWithError = eventListenerFunc + return nil + } +} + +func BeforeJobRuns(eventListenerFunc func(jobID uuid.UUID)) EventListener { + return func(j *internalJob) error { + if eventListenerFunc == nil { + return ErrEventListenerFuncNil + } + j.beforeJobRuns = eventListenerFunc + return nil + } +} + +// ----------------------------------------------- +// ----------------------------------------------- +// ---------------- Job Schedules ---------------- +// ----------------------------------------------- +// ----------------------------------------------- + +type jobSchedule interface { + next(lastRun time.Time) time.Time +} + +var _ jobSchedule = (*cronJob)(nil) + +type cronJob struct { + cronSchedule cron.Schedule +} + +func (j *cronJob) next(lastRun time.Time) time.Time { + return j.cronSchedule.Next(lastRun) +} + +var _ jobSchedule = (*durationJob)(nil) + +type durationJob 
struct { + duration time.Duration +} + +func (j *durationJob) next(lastRun time.Time) time.Time { + return lastRun.Add(j.duration) +} + +var _ jobSchedule = (*durationRandomJob)(nil) + +type durationRandomJob struct { + min, max time.Duration + rand *rand.Rand +} + +func (j *durationRandomJob) next(lastRun time.Time) time.Time { + r := j.rand.Int63n(int64(j.max - j.min)) + return lastRun.Add(j.min + time.Duration(r)) +} + +var _ jobSchedule = (*dailyJob)(nil) + +type dailyJob struct { + interval uint + atTimes []time.Time +} + +func (d dailyJob) next(lastRun time.Time) time.Time { + next := d.nextDay(lastRun) + if !next.IsZero() { + return next + } + startNextDay := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day()+int(d.interval), 0, 0, 0, lastRun.Nanosecond(), lastRun.Location()) + return d.nextDay(startNextDay) +} + +func (d dailyJob) nextDay(lastRun time.Time) time.Time { + for _, at := range d.atTimes { + // sub the at time hour/min/sec onto the lastRun's values + // to use in checks to see if we've got our next run time + atDate := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day(), at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location()) + + if atDate.After(lastRun) { + // checking to see if it is after i.e. 
greater than, + // and not greater or equal as our lastRun day/time + // will be in the loop, and we don't want to select it again + return atDate + } + } + return time.Time{} +} + +var _ jobSchedule = (*weeklyJob)(nil) + +type weeklyJob struct { + interval uint + daysOfWeek []time.Weekday + atTimes []time.Time +} + +func (w weeklyJob) next(lastRun time.Time) time.Time { + next := w.nextWeekDayAtTime(lastRun) + if !next.IsZero() { + return next + } + + startOfTheNextIntervalWeek := (lastRun.Day() - int(lastRun.Weekday())) + int(w.interval*7) + from := time.Date(lastRun.Year(), lastRun.Month(), startOfTheNextIntervalWeek, 0, 0, 0, 0, lastRun.Location()) + return w.nextWeekDayAtTime(from) +} + +func (w weeklyJob) nextWeekDayAtTime(lastRun time.Time) time.Time { + for _, wd := range w.daysOfWeek { + // checking if we're on the same day or later in the same week + if wd >= lastRun.Weekday() { + // weekDayDiff is used to add the correct amount to the atDate day below + weekDayDiff := wd - lastRun.Weekday() + for _, at := range w.atTimes { + // sub the at time hour/min/sec onto the lastRun's values + // to use in checks to see if we've got our next run time + atDate := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day()+int(weekDayDiff), at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location()) + + if atDate.After(lastRun) { + // checking to see if it is after i.e. 
greater than, + // and not greater or equal as our lastRun day/time + // will be in the loop, and we don't want to select it again + return atDate + } + } + } + } + return time.Time{} +} + +var _ jobSchedule = (*monthlyJob)(nil) + +type monthlyJob struct { + interval uint + days []int + daysFromEnd []int + atTimes []time.Time +} + +func (m monthlyJob) next(lastRun time.Time) time.Time { + daysList := make([]int, len(m.days)) + copy(daysList, m.days) + firstDayNextMonth := time.Date(lastRun.Year(), lastRun.Month()+1, 1, 0, 0, 0, 0, lastRun.Location()) + for _, daySub := range m.daysFromEnd { + // getting a combined list of all the daysList and the negative daysList + // which count backwards from the first day of the next month + // -1 == the last day of the month + day := firstDayNextMonth.AddDate(0, 0, daySub).Day() + daysList = append(daysList, day) + } + slices.Sort(daysList) + + next := m.nextMonthDayAtTime(lastRun, daysList) + if !next.IsZero() { + return next + } + + from := time.Date(lastRun.Year(), lastRun.Month()+time.Month(m.interval), 1, 0, 0, 0, 0, lastRun.Location()) + for next.IsZero() { + next = m.nextMonthDayAtTime(from, daysList) + from = from.AddDate(0, int(m.interval), 0) + } + + return next +} + +func (m monthlyJob) nextMonthDayAtTime(lastRun time.Time, days []int) time.Time { + // find the next day in the month that should run and then check for an at time + for _, day := range days { + if day >= lastRun.Day() { + for _, at := range m.atTimes { + // sub the day, and the at time hour/min/sec onto the lastRun's values + // to use in checks to see if we've got our next run time + atDate := time.Date(lastRun.Year(), lastRun.Month(), day, at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location()) + + if atDate.Month() != lastRun.Month() { + // this check handles if we're setting a day not in the current month + // e.g. 
setting day 31 in Feb results in March 2nd + continue + } + + if atDate.After(lastRun) { + // checking to see if it is after i.e. greater than, + // and not greater or equal as our lastRun day/time + // will be in the loop, and we don't want to select it again + return atDate + } + } + continue + } + } + return time.Time{} +} + +// ----------------------------------------------- +// ----------------------------------------------- +// ---------------- Job Interface ---------------- +// ----------------------------------------------- +// ----------------------------------------------- + +// Job provides the available methods on the job +// available to the caller. +type Job interface { + ID() uuid.UUID + LastRun() (time.Time, error) + Name() string + NextRun() (time.Time, error) + Tags() []string +} + +var _ Job = (*job)(nil) + +// job is the internal struct that implements +// the public interface. This is used to avoid +// leaking information the caller never needs +// to have or tinker with. +type job struct { + id uuid.UUID + name string + tags []string + jobOutRequest chan jobOutRequest +} + +// ID returns the job's unique identifier. +func (j job) ID() uuid.UUID { + return j.id +} + +// LastRun returns the time of the job's last run +func (j job) LastRun() (time.Time, error) { + ij := requestJob(j.id, j.jobOutRequest) + if ij == nil || ij.id == uuid.Nil { + return time.Time{}, ErrJobNotFound + } + return ij.lastRun, nil +} + +// Name returns the name defined on the job. +func (j job) Name() string { + return j.name +} + +// NextRun returns the time of the job's next scheduled run. +func (j job) NextRun() (time.Time, error) { + ij := requestJob(j.id, j.jobOutRequest) + if ij == nil || ij.id == uuid.Nil { + return time.Time{}, ErrJobNotFound + } + return ij.nextRun, nil +} + +// Tags returns the job's string tags. 
+func (j job) Tags() []string { + return j.tags +} diff --git a/job_test.go b/job_test.go new file mode 100644 index 0000000..82a521c --- /dev/null +++ b/job_test.go @@ -0,0 +1,430 @@ +package gocron + +import ( + "math/rand" + "testing" + "time" + + "github.com/google/uuid" + "github.com/jonboulle/clockwork" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDurationJob_next(t *testing.T) { + tests := []time.Duration{ + time.Millisecond, + time.Second, + 100 * time.Second, + 1000 * time.Second, + 5 * time.Second, + 50 * time.Second, + time.Minute, + 5 * time.Minute, + 100 * time.Minute, + time.Hour, + 2 * time.Hour, + 100 * time.Hour, + 1000 * time.Hour, + } + + lastRun := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC) + + for _, duration := range tests { + t.Run(duration.String(), func(t *testing.T) { + d := durationJob{duration: duration} + next := d.next(lastRun) + expected := lastRun.Add(duration) + + assert.Equal(t, expected, next) + }) + } +} + +func TestDailyJob_next(t *testing.T) { + tests := []struct { + name string + interval uint + atTimes []time.Time + lastRun time.Time + expectedNextRun time.Time + expectedDurationToNextRun time.Duration + }{ + { + "daily multiple at times", + 1, + []time.Time{ + time.Date(0, 0, 0, 5, 30, 0, 0, time.UTC), + time.Date(0, 0, 0, 12, 30, 0, 0, time.UTC), + }, + time.Date(2000, 1, 1, 5, 30, 0, 0, time.UTC), + time.Date(2000, 1, 1, 12, 30, 0, 0, time.UTC), + 7 * time.Hour, + }, + { + "every 2 days multiple at times", + 2, + []time.Time{ + time.Date(0, 0, 0, 5, 30, 0, 0, time.UTC), + time.Date(0, 0, 0, 12, 30, 0, 0, time.UTC), + }, + time.Date(2000, 1, 1, 12, 30, 0, 0, time.UTC), + time.Date(2000, 1, 3, 5, 30, 0, 0, time.UTC), + 41 * time.Hour, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + d := dailyJob{ + interval: tt.interval, + atTimes: tt.atTimes, + } + + next := d.next(tt.lastRun) + assert.Equal(t, tt.expectedNextRun, next) + assert.Equal(t, 
tt.expectedDurationToNextRun, next.Sub(tt.lastRun)) + }) + } +} + +func TestWeeklyJob_next(t *testing.T) { + tests := []struct { + name string + interval uint + daysOfWeek []time.Weekday + atTimes []time.Time + lastRun time.Time + expectedNextRun time.Time + expectedDurationToNextRun time.Duration + }{ + { + "last run Monday, next run is Thursday", + 1, + []time.Weekday{time.Monday, time.Thursday}, + []time.Time{ + time.Date(0, 0, 0, 5, 30, 0, 0, time.UTC), + }, + time.Date(2000, 1, 3, 5, 30, 0, 0, time.UTC), + time.Date(2000, 1, 6, 5, 30, 0, 0, time.UTC), + 3 * 24 * time.Hour, + }, + { + "last run Thursday, next run is Monday", + 1, + []time.Weekday{time.Monday, time.Thursday}, + []time.Time{ + time.Date(0, 0, 0, 5, 30, 0, 0, time.UTC), + }, + time.Date(2000, 1, 6, 5, 30, 0, 0, time.UTC), + time.Date(2000, 1, 10, 5, 30, 0, 0, time.UTC), + 4 * 24 * time.Hour, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + w := weeklyJob{ + interval: tt.interval, + daysOfWeek: tt.daysOfWeek, + atTimes: tt.atTimes, + } + + next := w.next(tt.lastRun) + assert.Equal(t, tt.expectedNextRun, next) + assert.Equal(t, tt.expectedDurationToNextRun, next.Sub(tt.lastRun)) + }) + } +} + +func TestMonthlyJob_next(t *testing.T) { + americaChicago, err := time.LoadLocation("America/Chicago") + require.NoError(t, err) + + tests := []struct { + name string + interval uint + days []int + daysFromEnd []int + atTimes []time.Time + lastRun time.Time + expectedNextRun time.Time + expectedDurationToNextRun time.Duration + }{ + { + "same day - before at time", + 1, + []int{1}, + nil, + []time.Time{ + time.Date(0, 0, 0, 5, 30, 0, 0, time.UTC), + }, + time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), + time.Date(2000, 1, 1, 5, 30, 0, 0, time.UTC), + 5*time.Hour + 30*time.Minute, + }, + { + "same day - after at time, runs next available date", + 1, + []int{1, 10}, + nil, + []time.Time{ + time.Date(0, 0, 0, 5, 30, 0, 0, time.UTC), + }, + time.Date(2000, 1, 1, 5, 30, 0, 0, time.UTC), 
+ time.Date(2000, 1, 10, 5, 30, 0, 0, time.UTC), + 9 * 24 * time.Hour, + }, + { + "same day - after at time, runs next available date, following interval month", + 2, + []int{1}, + nil, + []time.Time{ + time.Date(0, 0, 0, 5, 30, 0, 0, time.UTC), + }, + time.Date(2000, 1, 1, 5, 30, 0, 0, time.UTC), + time.Date(2000, 3, 1, 5, 30, 0, 0, time.UTC), + 60 * 24 * time.Hour, + }, + { + "daylight savings time", + 1, + []int{5}, + nil, + []time.Time{ + time.Date(0, 0, 0, 5, 30, 0, 0, americaChicago), + }, + time.Date(2023, 11, 1, 0, 0, 0, 0, americaChicago), + time.Date(2023, 11, 5, 5, 30, 0, 0, americaChicago), + 4*24*time.Hour + 6*time.Hour + 30*time.Minute, + }, + { + "negative days", + 1, + nil, + []int{-1, -3, -5}, + []time.Time{ + time.Date(0, 0, 0, 5, 30, 0, 0, time.UTC), + }, + time.Date(2000, 1, 29, 5, 30, 0, 0, time.UTC), + time.Date(2000, 1, 31, 5, 30, 0, 0, time.UTC), + 2 * 24 * time.Hour, + }, + { + "day not in current month, runs next month (leap year)", + 1, + []int{31}, + nil, + []time.Time{ + time.Date(0, 0, 0, 5, 30, 0, 0, time.UTC), + }, + time.Date(2000, 1, 31, 5, 30, 0, 0, time.UTC), + time.Date(2000, 3, 31, 5, 30, 0, 0, time.UTC), + 29*24*time.Hour + 31*24*time.Hour, + }, + { + "multiple days not in order", + 1, + []int{10, 7, 19, 2}, + nil, + []time.Time{ + time.Date(0, 0, 0, 5, 30, 0, 0, time.UTC), + }, + time.Date(2000, 1, 2, 5, 30, 0, 0, time.UTC), + time.Date(2000, 1, 7, 5, 30, 0, 0, time.UTC), + 5 * 24 * time.Hour, + }, + { + "day not in next interval month, selects next available option, skips Feb, April & June", + 2, + []int{31}, + nil, + []time.Time{ + time.Date(0, 0, 0, 5, 30, 0, 0, time.UTC), + }, + time.Date(1999, 12, 31, 5, 30, 0, 0, time.UTC), + time.Date(2000, 8, 31, 5, 30, 0, 0, time.UTC), + 244 * 24 * time.Hour, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := monthlyJob{ + interval: tt.interval, + days: tt.days, + daysFromEnd: tt.daysFromEnd, + atTimes: tt.atTimes, + } + + next := m.next(tt.lastRun) 
+ assert.Equal(t, tt.expectedNextRun, next) + assert.Equal(t, tt.expectedDurationToNextRun, next.Sub(tt.lastRun)) + }) + } +} + +func TestDurationRandomJob_next(t *testing.T) { + tests := []struct { + name string + min time.Duration + max time.Duration + lastRun time.Time + expectedMin time.Time + expectedMax time.Time + }{ + { + "min 1s, max 5s", + time.Second, + 5 * time.Second, + time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), + time.Date(2000, 1, 1, 0, 0, 1, 0, time.UTC), + time.Date(2000, 1, 1, 0, 0, 5, 0, time.UTC), + }, + { + "min 100ms, max 1s", + 100 * time.Millisecond, + 1 * time.Second, + time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), + time.Date(2000, 1, 1, 0, 0, 0, 100000000, time.UTC), + time.Date(2000, 1, 1, 0, 0, 1, 0, time.UTC), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rj := durationRandomJob{ + min: tt.min, + max: tt.max, + rand: rand.New(rand.NewSource(time.Now().UnixNano())), // nolint:gosec + } + + for i := 0; i < 100; i++ { + next := rj.next(tt.lastRun) + assert.GreaterOrEqual(t, next, tt.expectedMin) + assert.LessOrEqual(t, next, tt.expectedMax) + } + }) + } +} + +func TestJob_LastRun(t *testing.T) { + testTime := time.Date(2000, 1, 1, 0, 0, 0, 0, time.Local) + fakeClock := clockwork.NewFakeClockAt(testTime) + + s, err := newTestScheduler( + WithClock(fakeClock), + ) + require.NoError(t, err) + + j, err := s.NewJob( + DurationJob( + time.Second, + ), + NewTask( + func() {}, + ), + WithStartAt(WithStartImmediately()), + ) + require.NoError(t, err) + + s.Start() + time.Sleep(10 * time.Millisecond) + + lastRun, err := j.LastRun() + assert.NoError(t, err) + + err = s.Shutdown() + require.NoError(t, err) + + assert.Equal(t, testTime, lastRun) +} + +func TestWithEventListeners(t *testing.T) { + tests := []struct { + name string + eventListeners []EventListener + err error + }{ + { + "no event listeners", + nil, + nil, + }, + { + "afterJobRuns", + []EventListener{ + AfterJobRuns(func(_ uuid.UUID) {}), + }, + nil, + 
}, + { + "afterJobRunsWithError", + []EventListener{ + AfterJobRunsWithError(func(_ uuid.UUID, _ error) {}), + }, + nil, + }, + { + "beforeJobRuns", + []EventListener{ + BeforeJobRuns(func(_ uuid.UUID) {}), + }, + nil, + }, + { + "multiple event listeners", + []EventListener{ + AfterJobRuns(func(_ uuid.UUID) {}), + AfterJobRunsWithError(func(_ uuid.UUID, _ error) {}), + BeforeJobRuns(func(_ uuid.UUID) {}), + }, + nil, + }, + { + "nil after job runs listener", + []EventListener{ + AfterJobRuns(nil), + }, + ErrEventListenerFuncNil, + }, + { + "nil after job runs with error listener", + []EventListener{ + AfterJobRunsWithError(nil), + }, + ErrEventListenerFuncNil, + }, + { + "nil before job runs listener", + []EventListener{ + BeforeJobRuns(nil), + }, + ErrEventListenerFuncNil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var ij internalJob + err := WithEventListeners(tt.eventListeners...)(&ij) + assert.Equal(t, tt.err, err) + + if err != nil { + return + } + var count int + if ij.afterJobRuns != nil { + count++ + } + if ij.afterJobRunsWithError != nil { + count++ + } + if ij.beforeJobRuns != nil { + count++ + } + assert.Equal(t, len(tt.eventListeners), count) + }) + } +} diff --git a/logger.go b/logger.go new file mode 100644 index 0000000..39982c9 --- /dev/null +++ b/logger.go @@ -0,0 +1,67 @@ +package gocron + +import ( + "log/slog" + "os" +) + +// Logger is the interface that wraps the basic logging methods +// used by gocron. The methods are modeled after the standard +// library slog package. The default logger is a no-op logger. +// To enable logging, use one of the provided New*Logger functions +// or implement your own Logger. The actual level of Log that is logged +// is handled by the implementation. 
+type Logger interface { + Debug(msg string, args ...any) + Error(msg string, args ...any) + Info(msg string, args ...any) + Warn(msg string, args ...any) +} + +var _ Logger = (*noOpLogger)(nil) + +type noOpLogger struct{} + +func (l noOpLogger) Debug(_ string, _ ...any) {} +func (l noOpLogger) Error(_ string, _ ...any) {} +func (l noOpLogger) Info(_ string, _ ...any) {} +func (l noOpLogger) Warn(_ string, _ ...any) {} + +var _ Logger = (*slogLogger)(nil) + +type slogLogger struct { + sl *slog.Logger +} + +func NewJSONSlogLogger(level slog.Level) Logger { + return NewSlogLogger( + slog.New( + slog.NewJSONHandler( + os.Stdout, + &slog.HandlerOptions{ + Level: level, + }, + ), + ), + ) +} + +func NewSlogLogger(sl *slog.Logger) Logger { + return &slogLogger{sl: sl} +} + +func (l *slogLogger) Debug(msg string, args ...any) { + l.sl.Debug(msg, args...) +} + +func (l *slogLogger) Error(msg string, args ...any) { + l.sl.Error(msg, args...) +} + +func (l *slogLogger) Info(msg string, args ...any) { + l.sl.Info(msg, args...) +} + +func (l *slogLogger) Warn(msg string, args ...any) { + l.sl.Warn(msg, args...) 
+} diff --git a/scheduler.go b/scheduler.go new file mode 100644 index 0000000..9ec670e --- /dev/null +++ b/scheduler.go @@ -0,0 +1,633 @@ +package gocron + +import ( + "context" + "reflect" + "slices" + "time" + + "github.com/google/uuid" + "github.com/jonboulle/clockwork" +) + +var _ Scheduler = (*scheduler)(nil) + +type Scheduler interface { + Jobs() []Job + NewJob(JobDefinition, Task, ...JobOption) (Job, error) + RemoveByTags(...string) + RemoveJob(uuid.UUID) error + Start() + StopJobs() error + Shutdown() error + Update(uuid.UUID, JobDefinition, Task, ...JobOption) (Job, error) +} + +// ----------------------------------------------- +// ----------------------------------------------- +// ----------------- Scheduler ------------------- +// ----------------------------------------------- +// ----------------------------------------------- + +type scheduler struct { + shutdownCtx context.Context + shutdownCancel context.CancelFunc + exec executor + jobs map[uuid.UUID]internalJob + location *time.Location + clock clockwork.Clock + started bool + globalJobOptions []JobOption + logger Logger + + startCh chan struct{} + startedCh chan struct{} + stopCh chan struct{} + stopErrCh chan error + allJobsOutRequest chan allJobsOutRequest + jobOutRequestCh chan jobOutRequest + newJobCh chan internalJob + removeJobCh chan uuid.UUID + removeJobsByTagsCh chan []string +} + +type jobOutRequest struct { + id uuid.UUID + outChan chan internalJob +} + +type allJobsOutRequest struct { + outChan chan []Job +} + +func NewScheduler(options ...SchedulerOption) (Scheduler, error) { + schCtx, cancel := context.WithCancel(context.Background()) + + exec := executor{ + stopCh: make(chan struct{}), + stopTimeout: time.Second * 10, + singletonRunners: make(map[uuid.UUID]singletonRunner), + logger: &noOpLogger{}, + + jobsIDsIn: make(chan uuid.UUID), + jobIDsOut: make(chan uuid.UUID), + jobOutRequest: make(chan jobOutRequest, 1000), + done: make(chan error), + } + + s := &scheduler{ + 
shutdownCtx: schCtx, + shutdownCancel: cancel, + exec: exec, + jobs: make(map[uuid.UUID]internalJob), + location: time.Local, + clock: clockwork.NewRealClock(), + logger: &noOpLogger{}, + + newJobCh: make(chan internalJob), + removeJobCh: make(chan uuid.UUID), + removeJobsByTagsCh: make(chan []string), + startCh: make(chan struct{}), + startedCh: make(chan struct{}), + stopCh: make(chan struct{}), + stopErrCh: make(chan error, 1), + jobOutRequestCh: make(chan jobOutRequest), + allJobsOutRequest: make(chan allJobsOutRequest), + } + + for _, option := range options { + err := option(s) + if err != nil { + return nil, err + } + } + + go func() { + s.logger.Info("new scheduler created") + for { + select { + case id := <-s.exec.jobIDsOut: + s.selectExecJobIDsOut(id) + + case j := <-s.newJobCh: + s.selectNewJob(j) + + case id := <-s.removeJobCh: + s.selectRemoveJob(id) + + case tags := <-s.removeJobsByTagsCh: + s.selectRemoveJobsByTags(tags) + + case out := <-s.exec.jobOutRequest: + s.selectJobOutRequest(out) + + case out := <-s.jobOutRequestCh: + s.selectJobOutRequest(out) + + case out := <-s.allJobsOutRequest: + s.selectAllJobsOutRequest(out) + + case <-s.startCh: + s.selectStart() + + case <-s.stopCh: + s.stopScheduler() + + case <-s.shutdownCtx.Done(): + s.stopScheduler() + return + } + } + }() + + return s, nil +} + +// ----------------------------------------------- +// ----------------------------------------------- +// --------- Scheduler Channel Methods ----------- +// ----------------------------------------------- +// ----------------------------------------------- + +// The scheduler's channel functions are broken out here +// to allow prioritizing within the select blocks. The idea +// being that we want to make sure that scheduling tasks +// are not blocked by requests from the caller for information +// about jobs. 
+ +func (s *scheduler) stopScheduler() { + s.logger.Debug("stopping scheduler") + if s.started { + s.exec.stopCh <- struct{}{} + } + + for _, j := range s.jobs { + j.stop() + } + for id, j := range s.jobs { + <-j.ctx.Done() + + j.ctx, j.cancel = context.WithCancel(s.shutdownCtx) + s.jobs[id] = j + } + var err error + if s.started { + select { + case err = <-s.exec.done: + case <-time.After(s.exec.stopTimeout + 1*time.Second): + err = ErrStopExecutorTimedOut + } + } + s.stopErrCh <- err + s.started = false + s.logger.Debug("scheduler stopped") +} + +func (s *scheduler) selectAllJobsOutRequest(out allJobsOutRequest) { + outJobs := make([]Job, len(s.jobs)) + var counter int + for _, j := range s.jobs { + outJobs[counter] = s.jobFromInternalJob(j) + counter++ + } + select { + case <-s.shutdownCtx.Done(): + case out.outChan <- outJobs: + } +} + +func (s *scheduler) selectRemoveJob(id uuid.UUID) { + j, ok := s.jobs[id] + if !ok { + return + } + j.stop() + delete(s.jobs, id) +} + +func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) { + j := s.jobs[id] + j.lastRun = j.nextRun + + if j.limitRunsTo != nil { + j.limitRunsTo.runCount = j.limitRunsTo.runCount + 1 + if j.limitRunsTo.runCount == j.limitRunsTo.limit { + go func() { + select { + case <-s.shutdownCtx.Done(): + return + case s.removeJobCh <- id: + } + }() + return + } + } + + next := j.next(j.lastRun) + j.nextRun = next + j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() { + select { + case <-s.shutdownCtx.Done(): + return + case s.exec.jobsIDsIn <- id: + } + }) + s.jobs[id] = j +} + +func (s *scheduler) selectJobOutRequest(out jobOutRequest) { + if j, ok := s.jobs[out.id]; ok { + select { + case out.outChan <- j: + case <-s.shutdownCtx.Done(): + } + } + close(out.outChan) +} + +func (s *scheduler) selectNewJob(j internalJob) { + if s.started { + next := j.startTime + if j.startImmediately { + next = s.now() + select { + case <-s.shutdownCtx.Done(): + case s.exec.jobsIDsIn <- j.id: + } + } else { + if 
next.IsZero() { + next = j.next(s.now()) + } + + id := j.id + j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() { + select { + case <-s.shutdownCtx.Done(): + case s.exec.jobsIDsIn <- id: + } + }) + } + j.nextRun = next + } + + s.jobs[j.id] = j +} + +func (s *scheduler) selectRemoveJobsByTags(tags []string) { + for _, j := range s.jobs { + for _, tag := range tags { + if slices.Contains(j.tags, tag) { + j.stop() + delete(s.jobs, j.id) + break + } + } + } +} + +func (s *scheduler) selectStart() { + s.logger.Debug("scheduler starting") + go s.exec.start() + + s.started = true + for id, j := range s.jobs { + next := j.startTime + if j.startImmediately { + next = s.now() + select { + case <-s.shutdownCtx.Done(): + case s.exec.jobsIDsIn <- id: + } + } else { + if next.IsZero() { + next = j.next(s.now()) + } + + jobID := id + j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() { + select { + case <-s.shutdownCtx.Done(): + case s.exec.jobsIDsIn <- jobID: + } + }) + } + j.nextRun = next + s.jobs[id] = j + } + select { + case <-s.shutdownCtx.Done(): + case s.startedCh <- struct{}{}: + s.logger.Info("scheduler started") + } +} + +// ----------------------------------------------- +// ----------------------------------------------- +// ------------- Scheduler Methods --------------- +// ----------------------------------------------- +// ----------------------------------------------- + +func (s *scheduler) now() time.Time { + return s.clock.Now().In(s.location) +} + +func (s *scheduler) jobFromInternalJob(in internalJob) job { + return job{ + id: in.id, + name: in.name, + tags: slices.Clone(in.tags), + jobOutRequest: s.jobOutRequestCh, + } +} + +func (s *scheduler) Jobs() []Job { + outChan := make(chan []Job) + select { + case <-s.shutdownCtx.Done(): + case s.allJobsOutRequest <- allJobsOutRequest{outChan: outChan}: + } + + var jobs []Job + select { + case <-s.shutdownCtx.Done(): + case jobs = <-outChan: + } + + return jobs +} + +func (s *scheduler) NewJob(jobDefinition 
JobDefinition, task Task, options ...JobOption) (Job, error) { + return s.addOrUpdateJob(uuid.Nil, jobDefinition, task, options) +} + +func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskWrapper Task, options []JobOption) (Job, error) { + j := internalJob{} + if id == uuid.Nil { + j.id = uuid.New() + } else { + currentJob := requestJobCtx(s.shutdownCtx, id, s.jobOutRequestCh) + if currentJob != nil && currentJob.id != uuid.Nil { + select { + case <-s.shutdownCtx.Done(): + return nil, nil + case s.removeJobCh <- id: + <-currentJob.ctx.Done() + } + } + + j.id = id + } + + j.ctx, j.cancel = context.WithCancel(s.shutdownCtx) + + if taskWrapper == nil { + return nil, ErrNewJobTaskNil + } + + tsk := taskWrapper() + taskFunc := reflect.ValueOf(tsk.function) + for taskFunc.Kind() == reflect.Ptr { + taskFunc = taskFunc.Elem() + } + + if taskFunc.Kind() != reflect.Func { + return nil, ErrNewJobTask + } + + j.function = tsk.function + j.parameters = tsk.parameters + + // apply global job options + for _, option := range s.globalJobOptions { + if err := option(&j); err != nil { + return nil, err + } + } + + // apply job specific options, which take precedence + for _, option := range options { + if err := option(&j); err != nil { + return nil, err + } + } + + if err := definition.setup(&j, s.location); err != nil { + return nil, err + } + + select { + case <-s.shutdownCtx.Done(): + case s.newJobCh <- j: + } + + return &job{ + id: j.id, + name: j.name, + tags: slices.Clone(j.tags), + jobOutRequest: s.jobOutRequestCh, + }, nil +} + +func (s *scheduler) RemoveByTags(tags ...string) { + select { + case <-s.shutdownCtx.Done(): + case s.removeJobsByTagsCh <- tags: + } +} + +func (s *scheduler) RemoveJob(id uuid.UUID) error { + j := requestJobCtx(s.shutdownCtx, id, s.jobOutRequestCh) + if j == nil || j.id == uuid.Nil { + return ErrJobNotFound + } + select { + case <-s.shutdownCtx.Done(): + case s.removeJobCh <- id: + } + + return nil +} + +// Start begins 
scheduling jobs for execution based +// on each job's definition. Job's added to an already +// running scheduler will be scheduled immediately based +// on definition. +func (s *scheduler) Start() { + select { + case <-s.shutdownCtx.Done(): + case s.startCh <- struct{}{}: + <-s.startedCh + } +} + +// StopJobs stops the execution of all jobs in the scheduler. +// This can be useful in situations where jobs need to be +// paused globally and then restarted with Start(). +func (s *scheduler) StopJobs() error { + select { + case <-s.shutdownCtx.Done(): + return nil + case s.stopCh <- struct{}{}: + } + select { + case err := <-s.stopErrCh: + return err + case <-time.After(s.exec.stopTimeout + 2*time.Second): + return ErrStopSchedulerTimedOut + } +} + +// Shutdown should be called when you no longer need +// the Scheduler or Job's as the Scheduler cannot +// be restarted after calling Shutdown. +func (s *scheduler) Shutdown() error { + s.shutdownCancel() + select { + case err := <-s.stopErrCh: + return err + case <-time.After(s.exec.stopTimeout + 2*time.Second): + return ErrStopSchedulerTimedOut + } +} + +// Update replaces the existing Job's JobDefinition with the provided +// JobDefinition. The Job's Job.ID() remains the same. +func (s *scheduler) Update(id uuid.UUID, jobDefinition JobDefinition, task Task, options ...JobOption) (Job, error) { + return s.addOrUpdateJob(id, jobDefinition, task, options) +} + +// ----------------------------------------------- +// ----------------------------------------------- +// ------------- Scheduler Options --------------- +// ----------------------------------------------- +// ----------------------------------------------- + +// SchedulerOption defines the function for setting +// options on the Scheduler. +type SchedulerOption func(*scheduler) error + +// WithDistributedElector sets the elector to be used by multiple +// Scheduler instances to determine who should be the leader. 
+// Only the leader runs jobs, while non-leaders wait and continue +// to check if a new leader has been elected. +func WithDistributedElector(elector Elector) SchedulerOption { + return func(s *scheduler) error { + if elector == nil { + return ErrWithDistributedElector + } + s.exec.elector = elector + return nil + } +} + +// WithClock sets the clock used by the Scheduler +// to the clock provided. See https://github.com/jonboulle/clockwork +func WithClock(clock clockwork.Clock) SchedulerOption { + return func(s *scheduler) error { + if clock == nil { + return ErrWithClockNil + } + s.clock = clock + return nil + } +} + +// WithGlobalJobOptions sets JobOption's that will be applied to +// all jobs added to the scheduler. JobOption's set on the job +// itself will override if the same JobOption is set globally. +func WithGlobalJobOptions(jobOptions ...JobOption) SchedulerOption { + return func(s *scheduler) error { + s.globalJobOptions = jobOptions + return nil + } +} + +// LimitMode defines the modes used for handling jobs that reach +// the limit provided in WithLimitConcurrentJobs +type LimitMode int + +const ( + // LimitModeReschedule causes jobs reaching the limit set in + // WithLimitConcurrentJobs or WithSingletonMode to be skipped + // and rescheduled for the next run time rather than being + // queued up to wait. + LimitModeReschedule = 1 + + // LimitModeWait causes jobs reaching the limit set in + // WithLimitConcurrentJobs or WithSingletonMode to wait + // in a queue until a slot becomes available to run. + // + // Note: this mode can produce unpredictable results as + // job execution order isn't guaranteed. For example, a job that + // executes frequently may pile up in the wait queue and be executed + // many times back to back when the queue opens. + // + // Warning: do not use this mode if your jobs will continue to stack + // up beyond the ability of the limit workers to keep up. 
An example of + // what NOT to do: + // + // s, _ := gocron.NewScheduler(gocron.WithLimitConcurrentJobs) + // s.NewJob( + // gocron.DurationJob( + // time.Second, + // Task{ + // Function: func() { + // time.Sleep(10 * time.Second) + // }, + // }, + // ), + // ) + LimitModeWait = 2 +) + +// WithLimitConcurrentJobs sets the limit and mode to be used by the +// Scheduler for limiting the number of jobs that may be running at +// a given time. +func WithLimitConcurrentJobs(limit uint, mode LimitMode) SchedulerOption { + return func(s *scheduler) error { + s.exec.limitMode = &limitModeConfig{ + mode: mode, + limit: limit, + in: make(chan uuid.UUID, 1000), + } + if mode == LimitModeReschedule { + s.exec.limitMode.rescheduleLimiter = make(chan struct{}, limit) + } + return nil + } +} + +// WithLocation sets the location (i.e. timezone) that the scheduler +// should operate within. In many systems time.Local is UTC. +// Default: time.Local +func WithLocation(location *time.Location) SchedulerOption { + return func(s *scheduler) error { + if location == nil { + return ErrWithLocationNil + } + s.location = location + return nil + } +} + +func WithLogger(logger Logger) SchedulerOption { + return func(s *scheduler) error { + if logger == nil { + return ErrWithLoggerNil + } + s.logger = logger + s.exec.logger = logger + return nil + } +} + +// WithStopTimeout sets the amount of time the Scheduler should +// wait gracefully for jobs to complete before returning when +// StopJobs() or Shutdown() are called. 
+// Default: 10 * time.Second +func WithStopTimeout(timeout time.Duration) SchedulerOption { + return func(s *scheduler) error { + s.exec.stopTimeout = timeout + return nil + } +} diff --git a/scheduler_test.go b/scheduler_test.go new file mode 100644 index 0000000..682fd2b --- /dev/null +++ b/scheduler_test.go @@ -0,0 +1,878 @@ +package gocron + +import ( + "context" + "log/slog" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" +) + +func newTestScheduler(options ...SchedulerOption) (Scheduler, error) { + // default test options + out := []SchedulerOption{ + WithLogger(NewJSONSlogLogger(slog.LevelDebug)), + WithStopTimeout(time.Second), + } + + // append any additional options 2nd to override defaults if needed + out = append(out, options...) + return NewScheduler(out...) +} + +func TestScheduler_OneSecond_NoOptions(t *testing.T) { + defer goleak.VerifyNone(t) + cronNoOptionsCh := make(chan struct{}, 10) + durationNoOptionsCh := make(chan struct{}, 10) + + tests := []struct { + name string + ch chan struct{} + jd JobDefinition + tsk Task + }{ + { + "cron", + cronNoOptionsCh, + CronJob( + "* * * * * *", + true, + ), + NewTask( + func() { + cronNoOptionsCh <- struct{}{} + }, + ), + }, + { + "duration", + durationNoOptionsCh, + DurationJob( + time.Second, + ), + NewTask( + func() { + durationNoOptionsCh <- struct{}{} + }, + ), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, err := newTestScheduler() + require.NoError(t, err) + + _, err = s.NewJob(tt.jd, tt.tsk) + require.NoError(t, err) + + s.Start() + + startTime := time.Now() + var runCount int + for runCount < 1 { + <-tt.ch + runCount++ + } + err = s.Shutdown() + require.NoError(t, err) + stopTime := time.Now() + + select { + case <-tt.ch: + t.Fatal("job ran after scheduler was stopped") + case <-time.After(time.Millisecond * 50): + } + + runDuration := stopTime.Sub(startTime) + assert.GreaterOrEqual(t, 
runDuration, time.Millisecond) + assert.LessOrEqual(t, runDuration, 1500*time.Millisecond) + }) + } +} + +func TestScheduler_LongRunningJobs(t *testing.T) { + defer goleak.VerifyNone(t) + + durationCh := make(chan struct{}, 10) + durationSingletonCh := make(chan struct{}, 10) + + tests := []struct { + name string + ch chan struct{} + jd JobDefinition + tsk Task + opts []JobOption + options []SchedulerOption + expectedRuns int + }{ + { + "duration", + durationCh, + DurationJob( + time.Millisecond * 500, + ), + NewTask( + func() { + time.Sleep(1 * time.Second) + durationCh <- struct{}{} + }, + ), + nil, + []SchedulerOption{WithStopTimeout(time.Second * 2)}, + 3, + }, + { + "duration singleton", + durationSingletonCh, + DurationJob( + time.Millisecond * 500, + ), + NewTask( + func() { + time.Sleep(1 * time.Second) + durationSingletonCh <- struct{}{} + }, + ), + []JobOption{WithSingletonMode(LimitModeWait)}, + []SchedulerOption{WithStopTimeout(time.Second * 5)}, + 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, err := newTestScheduler(tt.options...) + require.NoError(t, err) + + _, err = s.NewJob(tt.jd, tt.tsk, tt.opts...) 
+ require.NoError(t, err) + + s.Start() + time.Sleep(1600 * time.Millisecond) + err = s.Shutdown() + require.NoError(t, err) + + var runCount int + timeout := make(chan struct{}) + go func() { + time.Sleep(2 * time.Second) + close(timeout) + }() + Outer: + for { + select { + case <-tt.ch: + runCount++ + case <-timeout: + break Outer + } + } + + assert.Equal(t, tt.expectedRuns, runCount) + }) + } +} + +func TestScheduler_Update(t *testing.T) { + defer goleak.VerifyNone(t) + + durationJobCh := make(chan struct{}) + + tests := []struct { + name string + initialJob JobDefinition + updateJob JobDefinition + tsk Task + ch chan struct{} + runCount int + updateAfterCount int + expectedMinTime time.Duration + expectedMaxRunTime time.Duration + }{ + { + "duration, updated to another duration", + DurationJob( + time.Millisecond * 500, + ), + DurationJob( + time.Second, + ), + NewTask( + func() { + durationJobCh <- struct{}{} + }, + ), + durationJobCh, + 2, + 1, + time.Second * 1, + time.Second * 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, err := newTestScheduler() + require.NoError(t, err) + + j, err := s.NewJob(tt.initialJob, tt.tsk) + require.NoError(t, err) + + startTime := time.Now() + s.Start() + + var runCount int + for runCount < tt.runCount { + select { + case <-tt.ch: + runCount++ + if runCount == tt.updateAfterCount { + _, err = s.Update(j.ID(), tt.updateJob, tt.tsk) + require.NoError(t, err) + } + default: + } + } + err = s.Shutdown() + require.NoError(t, err) + stopTime := time.Now() + + select { + case <-tt.ch: + t.Fatal("job ran after scheduler was stopped") + case <-time.After(time.Millisecond * 50): + } + + runDuration := stopTime.Sub(startTime) + assert.GreaterOrEqual(t, runDuration, tt.expectedMinTime) + assert.LessOrEqual(t, runDuration, tt.expectedMaxRunTime) + }) + } +} + +func TestScheduler_StopTimeout(t *testing.T) { + defer goleak.VerifyNone(t) + + tests := []struct { + name string + jd JobDefinition + f any + 
opts []JobOption + }{ + { + "duration", + DurationJob( + time.Millisecond * 100, + ), + func(testDoneCtx context.Context) { + select { + case <-time.After(1 * time.Second): + case <-testDoneCtx.Done(): + } + }, + nil, + }, + { + "duration singleton", + DurationJob( + time.Millisecond * 100, + ), + func(testDoneCtx context.Context) { + select { + case <-time.After(1 * time.Second): + case <-testDoneCtx.Done(): + } + }, + []JobOption{WithSingletonMode(LimitModeWait)}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testDoneCtx, cancel := context.WithCancel(context.Background()) + s, err := newTestScheduler( + WithStopTimeout(time.Millisecond * 100), + ) + require.NoError(t, err) + + _, err = s.NewJob(tt.jd, NewTask(tt.f, testDoneCtx), tt.opts...) + require.NoError(t, err) + + s.Start() + time.Sleep(time.Millisecond * 200) + err = s.Shutdown() + assert.ErrorIs(t, err, ErrStopJobsTimedOut) + cancel() + time.Sleep(2 * time.Second) + }) + } +} + +func TestScheduler_Shutdown(t *testing.T) { + goleak.VerifyNone(t) + + t.Run("start, stop, start, shutdown", func(t *testing.T) { + s, err := newTestScheduler( + WithStopTimeout(time.Second), + ) + require.NoError(t, err) + _, err = s.NewJob( + DurationJob( + 50*time.Millisecond, + ), + NewTask( + func() {}, + ), + WithStartAt( + WithStartImmediately(), + ), + ) + require.NoError(t, err) + + s.Start() + time.Sleep(50 * time.Millisecond) + require.NoError(t, s.StopJobs()) + + time.Sleep(200 * time.Millisecond) + s.Start() + + time.Sleep(50 * time.Millisecond) + require.NoError(t, s.Shutdown()) + time.Sleep(200 * time.Millisecond) + }) + + t.Run("calling Job methods after shutdown errors", func(t *testing.T) { + s, err := newTestScheduler( + WithStopTimeout(time.Second), + ) + require.NoError(t, err) + j, err := s.NewJob( + DurationJob( + 100*time.Millisecond, + ), + NewTask( + func() {}, + ), + WithStartAt( + WithStartImmediately(), + ), + ) + require.NoError(t, err) + + s.Start() + time.Sleep(50 * 
time.Millisecond) + require.NoError(t, s.Shutdown()) + + _, err = j.LastRun() + assert.ErrorIs(t, err, ErrJobNotFound) + + _, err = j.NextRun() + assert.ErrorIs(t, err, ErrJobNotFound) + }) +} + +func TestScheduler_NewJob(t *testing.T) { + goleak.VerifyNone(t) + tests := []struct { + name string + jd JobDefinition + tsk Task + opts []JobOption + }{ + { + "cron with timezone", + CronJob( + "CRON_TZ=America/Chicago * * * * * *", + true, + ), + NewTask( + func() {}, + ), + nil, + }, + { + "cron with timezone, no seconds", + CronJob( + "CRON_TZ=America/Chicago * * * * *", + false, + ), + NewTask( + func() {}, + ), + nil, + }, + { + "random duration", + DurationRandomJob( + time.Second, + time.Second*5, + ), + NewTask( + func() {}, + ), + nil, + }, + { + "daily", + DailyJob( + 1, + NewAtTimes( + NewAtTime(1, 0, 0), + ), + ), + NewTask( + func() {}, + ), + nil, + }, + { + "weekly", + WeeklyJob( + 1, + NewWeekdays(time.Monday), + NewAtTimes( + NewAtTime(1, 0, 0), + ), + ), + NewTask( + func() {}, + ), + nil, + }, + { + "monthly", + MonthlyJob( + 1, + NewDaysOfTheMonth(1, -1), + NewAtTimes( + NewAtTime(1, 0, 0), + ), + ), + NewTask( + func() {}, + ), + nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, err := newTestScheduler() + require.NoError(t, err) + + _, err = s.NewJob(tt.jd, tt.tsk, tt.opts...) 
+ require.NoError(t, err) + + s.Start() + require.NoError(t, s.Shutdown()) + time.Sleep(50 * time.Millisecond) + }) + } +} + +func TestScheduler_NewJobErrors(t *testing.T) { + goleak.VerifyNone(t) + tests := []struct { + name string + jd JobDefinition + opts []JobOption + err error + }{ + { + "cron with timezone", + CronJob( + "bad cron", + true, + ), + nil, + ErrCronJobParse, + }, + { + "random with bad min/max", + DurationRandomJob( + time.Second*5, + time.Second, + ), + nil, + ErrDurationRandomJobMinMax, + }, + { + "daily job at times nil", + DailyJob( + 1, + nil, + ), + nil, + ErrDailyJobAtTimesNil, + }, + { + "daily job at time nil", + DailyJob( + 1, + NewAtTimes(nil), + ), + nil, + ErrDailyJobAtTimeNil, + }, + { + "daily job hours out of range", + DailyJob( + 1, + NewAtTimes( + NewAtTime(100, 0, 0), + ), + ), + nil, + ErrDailyJobHours, + }, + { + "daily job minutes out of range", + DailyJob( + 1, + NewAtTimes( + NewAtTime(1, 100, 0), + ), + ), + nil, + ErrDailyJobMinutesSeconds, + }, + { + "daily job seconds out of range", + DailyJob( + 1, + NewAtTimes( + NewAtTime(1, 0, 100), + ), + ), + nil, + ErrDailyJobMinutesSeconds, + }, + { + "weekly job at times nil", + WeeklyJob( + 1, + NewWeekdays(time.Monday), + nil, + ), + nil, + ErrWeeklyJobAtTimesNil, + }, + { + "weekly job at time nil", + WeeklyJob( + 1, + NewWeekdays(time.Monday), + NewAtTimes(nil), + ), + nil, + ErrWeeklyJobAtTimeNil, + }, + { + "weekly job weekdays nil", + WeeklyJob( + 1, + nil, + NewAtTimes( + NewAtTime(1, 0, 0), + ), + ), + nil, + ErrWeeklyJobDaysOfTheWeekNil, + }, + { + "weekly job hours out of range", + WeeklyJob( + 1, + NewWeekdays(time.Monday), + NewAtTimes( + NewAtTime(100, 0, 0), + ), + ), + nil, + ErrWeeklyJobHours, + }, + { + "weekly job minutes out of range", + WeeklyJob( + 1, + NewWeekdays(time.Monday), + NewAtTimes( + NewAtTime(1, 100, 0), + ), + ), + nil, + ErrWeeklyJobMinutesSeconds, + }, + { + "weekly job seconds out of range", + WeeklyJob( + 1, + NewWeekdays(time.Monday), + 
NewAtTimes( + NewAtTime(1, 0, 100), + ), + ), + nil, + ErrWeeklyJobMinutesSeconds, + }, + { + "monthly job at times nil", + MonthlyJob( + 1, + NewDaysOfTheMonth(1), + nil, + ), + nil, + ErrMonthlyJobAtTimesNil, + }, + { + "monthly job at time nil", + MonthlyJob( + 1, + NewDaysOfTheMonth(1), + NewAtTimes(nil), + ), + nil, + ErrMonthlyJobAtTimeNil, + }, + { + "monthly job days out of range", + MonthlyJob( + 1, + NewDaysOfTheMonth(0), + NewAtTimes( + NewAtTime(1, 0, 0), + ), + ), + nil, + ErrMonthlyJobDays, + }, + { + "monthly job days out of range", + MonthlyJob( + 1, + nil, + NewAtTimes( + NewAtTime(1, 0, 0), + ), + ), + nil, + ErrMonthlyJobDaysNil, + }, + { + "monthly job hours out of range", + MonthlyJob( + 1, + NewDaysOfTheMonth(1), + NewAtTimes( + NewAtTime(100, 0, 0), + ), + ), + nil, + ErrMonthlyJobHours, + }, + { + "monthly job minutes out of range", + MonthlyJob( + 1, + NewDaysOfTheMonth(1), + NewAtTimes( + NewAtTime(1, 100, 0), + ), + ), + nil, + ErrMonthlyJobMinutesSeconds, + }, + { + "monthly job seconds out of range", + MonthlyJob( + 1, + NewDaysOfTheMonth(1), + NewAtTimes( + NewAtTime(1, 0, 100), + ), + ), + nil, + ErrMonthlyJobMinutesSeconds, + }, + { + "WithName no name", + DurationJob( + time.Second, + ), + []JobOption{WithName("")}, + ErrWithNameEmpty, + }, + { + "WithStartDateTime is zero", + DurationJob( + time.Second, + ), + []JobOption{WithStartAt(WithStartDateTime(time.Time{}))}, + ErrWithStartDateTimePast, + }, + { + "WithStartDateTime is in the past", + DurationJob( + time.Second, + ), + []JobOption{WithStartAt(WithStartDateTime(time.Now().Add(-time.Second)))}, + ErrWithStartDateTimePast, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, err := newTestScheduler( + WithStopTimeout(time.Millisecond * 50), + ) + require.NoError(t, err) + + _, err = s.NewJob(tt.jd, NewTask(func() {}), tt.opts...) 
+ assert.ErrorIs(t, err, tt.err) + err = s.Shutdown() + require.NoError(t, err) + }) + } +} + +func TestScheduler_Singleton(t *testing.T) { + goleak.VerifyNone(t) + tests := []struct { + name string + duration time.Duration + limitMode LimitMode + runCount int + expectedMin time.Duration + expectedMax time.Duration + }{ + { + "singleton mode reschedule", + time.Millisecond * 100, + LimitModeReschedule, + 3, + time.Millisecond * 600, + time.Millisecond * 1100, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + jobRanCh := make(chan struct{}, 10) + + s, err := newTestScheduler( + WithStopTimeout(1 * time.Second), + ) + require.NoError(t, err) + + _, err = s.NewJob( + DurationJob( + tt.duration, + ), + NewTask(func() { + time.Sleep(tt.duration * 2) + jobRanCh <- struct{}{} + }), + WithSingletonMode(tt.limitMode), + ) + require.NoError(t, err) + + start := time.Now() + s.Start() + + var runCount int + for runCount < tt.runCount { + select { + case <-jobRanCh: + runCount++ + case <-time.After(time.Second): + t.Fatalf("timed out waiting for jobs to run") + } + } + + stop := time.Now() + require.NoError(t, s.Shutdown()) + + assert.GreaterOrEqual(t, stop.Sub(start), tt.expectedMin) + assert.LessOrEqual(t, stop.Sub(start), tt.expectedMax) + }) + } +} + +func TestScheduler_LimitMode(t *testing.T) { + goleak.VerifyNone(t) + tests := []struct { + name string + numJobs int + limit uint + limitMode LimitMode + duration time.Duration + expectedMin time.Duration + expectedMax time.Duration + }{ + { + "limit mode reschedule", + 10, + 2, + LimitModeReschedule, + time.Millisecond * 100, + time.Millisecond * 400, + time.Millisecond * 700, + }, + { + "limit mode wait", + 10, + 2, + LimitModeWait, + time.Millisecond * 100, + time.Millisecond * 200, + time.Millisecond * 500, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s, err := newTestScheduler( + WithLimitConcurrentJobs(tt.limit, tt.limitMode), + 
WithStopTimeout(2*time.Second), + ) + require.NoError(t, err) + + jobRanCh := make(chan struct{}, 20) + + for i := 0; i < tt.numJobs; i++ { + _, err = s.NewJob( + DurationJob(tt.duration), + NewTask(func() { + time.Sleep(tt.duration / 2) + jobRanCh <- struct{}{} + }), + ) + require.NoError(t, err) + } + + start := time.Now() + s.Start() + + var runCount int + for runCount < tt.numJobs { + select { + case <-jobRanCh: + runCount++ + case <-time.After(time.Second): + t.Fatalf("timed out waiting for jobs to run") + } + } + stop := time.Now() + err = s.Shutdown() + require.NoError(t, err) + + assert.GreaterOrEqual(t, stop.Sub(start), tt.expectedMin) + assert.LessOrEqual(t, stop.Sub(start), tt.expectedMax) + }) + } +} diff --git a/util.go b/util.go new file mode 100644 index 0000000..f539078 --- /dev/null +++ b/util.go @@ -0,0 +1,122 @@ +package gocron + +import ( + "context" + "reflect" + "slices" + "sync" + "time" + + "github.com/google/uuid" + "golang.org/x/exp/maps" +) + +func callJobFuncWithParams(jobFunc any, params ...any) error { + if jobFunc == nil { + return nil + } + f := reflect.ValueOf(jobFunc) + if f.IsZero() { + return nil + } + if len(params) != f.Type().NumIn() { + return nil + } + in := make([]reflect.Value, len(params)) + for k, param := range params { + in[k] = reflect.ValueOf(param) + } + vals := f.Call(in) + for _, val := range vals { + i := val.Interface() + if err, ok := i.(error); ok { + return err + } + } + return nil +} + +func requestJob(id uuid.UUID, ch chan jobOutRequest) *internalJob { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + return requestJobCtx(ctx, id, ch) +} + +func requestJobCtx(ctx context.Context, id uuid.UUID, ch chan jobOutRequest) *internalJob { + resp := make(chan internalJob, 1) + select { + case <-ctx.Done(): + return nil + default: + } + + select { + case ch <- jobOutRequest{ + id: id, + outChan: resp, + }: + default: + return nil + } + var j internalJob + select { + case 
<-ctx.Done(): + return nil + case jobReceived := <-resp: + j = jobReceived + } + return &j +} + +func removeSliceDuplicatesInt(in []int) []int { + m := make(map[int]struct{}) + + for _, i := range in { + m[i] = struct{}{} + } + return maps.Keys(m) +} + +func convertAtTimesToDateTime(atTimes AtTimes, location *time.Location) ([]time.Time, error) { + if atTimes == nil { + return nil, errAtTimesNil + } + var atTimesDate []time.Time + for _, a := range atTimes() { + if a == nil { + return nil, errAtTimeNil + } + at := a() + if at.hours > 23 { + return nil, errAtTimeHours + } else if at.minutes > 59 || at.seconds > 59 { + return nil, errAtTimeMinSec + } + atTimesDate = append(atTimesDate, at.time(location)) + } + slices.SortStableFunc(atTimesDate, func(a, b time.Time) int { + return a.Compare(b) + }) + return atTimesDate, nil +} + +type waitGroupWithMutex struct { + wg sync.WaitGroup + mu sync.Mutex +} + +func (w *waitGroupWithMutex) Add(delta int) { + w.mu.Lock() + defer w.mu.Unlock() + w.wg.Add(delta) +} + +func (w *waitGroupWithMutex) Done() { + w.wg.Done() +} + +func (w *waitGroupWithMutex) Wait() { + w.mu.Lock() + defer w.mu.Unlock() + w.wg.Wait() +} diff --git a/util_test.go b/util_test.go new file mode 100644 index 0000000..2214ade --- /dev/null +++ b/util_test.go @@ -0,0 +1,165 @@ +package gocron + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestRemoveSliceDuplicatesInt(t *testing.T) { + tests := []struct { + name string + input []int + expected []int + }{ + { + "lots of duplicates", + []int{ + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, + 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, + 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, + 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, + 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, + }, + []int{1, 2, 3, 4, 5}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := removeSliceDuplicatesInt(tt.input) + assert.ElementsMatch(t, tt.expected, 
				result)
		})
	}
}

// TestCallJobFuncWithParams pins the lenient contract of
// callJobFuncWithParams: nil/zero functions and arity mismatches are
// silent no-ops returning nil, while an error returned by the called
// function is propagated.
func TestCallJobFuncWithParams(t *testing.T) {
	type f1 func()
	tests := []struct {
		name        string
		jobFunc     any
		params      []any
		expectedErr error
	}{
		{
			"nil jobFunc",
			nil,
			nil,
			nil,
		},
		{
			"zero jobFunc",
			f1(nil),
			nil,
			nil,
		},
		{
			// arity mismatch is deliberately swallowed, not an error
			"wrong number of params",
			func(one string, two int) {},
			[]any{"one"},
			nil,
		},
		{
			"function that returns an error",
			func() error {
				return fmt.Errorf("test error")
			},
			nil,
			fmt.Errorf("test error"),
		},
		{
			"function that returns no error",
			func() error {
				return nil
			},
			nil,
			nil,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			err := callJobFuncWithParams(tt.jobFunc, tt.params...)
			assert.Equal(t, tt.expectedErr, err)
		})
	}
}

// TestConvertAtTimesToDateTime covers the validation sentinels
// (errAtTimesNil, errAtTimeNil, errAtTimeHours, errAtTimeMinSec) and
// verifies that valid AtTimes come back sorted in ascending order.
func TestConvertAtTimesToDateTime(t *testing.T) {
	tests := []struct {
		name     string
		atTimes  AtTimes
		location *time.Location
		expected []time.Time
		err      error
	}{
		{
			"atTimes is nil",
			nil,
			time.UTC,
			nil,
			errAtTimesNil,
		},
		{
			"atTime is nil",
			NewAtTimes(nil),
			time.UTC,
			nil,
			errAtTimeNil,
		},
		{
			"atTimes hours is invalid",
			NewAtTimes(
				NewAtTime(24, 0, 0),
			),
			time.UTC,
			nil,
			errAtTimeHours,
		},
		{
			"atTimes minutes are invalid",
			NewAtTimes(
				NewAtTime(0, 60, 0),
			),
			time.UTC,
			nil,
			errAtTimeMinSec,
		},
		{
			"atTimes seconds are invalid",
			NewAtTimes(
				NewAtTime(0, 0, 60),
			),
			time.UTC,
			nil,
			errAtTimeMinSec,
		},
		{
			// inputs given out of order (3, 0, 1, 2); output must be
			// sorted ascending by convertAtTimesToDateTime
			"atTimes valid",
			NewAtTimes(
				NewAtTime(0, 0, 3),
				NewAtTime(0, 0, 0),
				NewAtTime(0, 0, 1),
				NewAtTime(0, 0, 2),
			),
			time.UTC,
			// NOTE(review): time.Date with month/day 0 relies on Go's
			// date normalization — presumably intentional here; verify
			// against atTime.time's construction.
			[]time.Time{
				time.Date(0, 0, 0, 0, 0, 0, 0, time.UTC),
				time.Date(0, 0, 0, 0, 0, 1, 0, time.UTC),
				time.Date(0, 0, 0, 0, 0, 2, 0, time.UTC),
				time.Date(0, 0, 0, 0, 0, 3, 0, time.UTC),
			},
			nil,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			result, err := convertAtTimesToDateTime(tt.atTimes, tt.location)
			assert.Equal(t, tt.expected, result)
			assert.Equal(t, tt.err, err)
		})
	}
}