refactor(规则引擎): 优化定时任务

This commit is contained in:
zhouhao 2023-08-02 10:33:10 +08:00
parent b1319225e8
commit 92ba89714f
2 changed files with 155 additions and 57 deletions

View File

@ -16,18 +16,23 @@ import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.commons.collections4.CollectionUtils;
import org.hswebframework.web.exception.ValidationException;
import org.reactivestreams.Subscription;
import org.springframework.util.Assert;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import javax.annotation.Nonnull;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.io.Serializable;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZonedDateTime;
import java.time.*;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
@ -88,15 +93,27 @@ public class TimerSpec implements Serializable {
}
return predicate.and(range);
}
if (mod == ExecuteMod.once){
if (mod == ExecuteMod.once) {
LocalTime onceTime = once.localTime();
Predicate<LocalDateTime> predicate
= time -> time.toLocalTime().compareTo(onceTime) == 0;
= time -> compareOnceTime(time.toLocalTime(), onceTime) == 0;
return predicate.and(range);
}
return range;
}
public int compareOnceTime(LocalTime time1, LocalTime time2) {
int cmp = Integer.compare(time1.getHour(), time2.getHour());
if (cmp == 0) {
cmp = Integer.compare(time1.getMinute(), time2.getMinute());
if (cmp == 0) {
cmp = Integer.compare(time1.getSecond(), time2.getSecond());
//不比较纳秒
}
}
return cmp;
}
public String toCronExpression() {
return toCron().asString();
}
@ -280,10 +297,23 @@ public class TimerSpec implements Serializable {
* @return 构造器
*/
public Function<ZonedDateTime, Duration> nextDurationBuilder() {
Function<ZonedDateTime, ZonedDateTime> nextTime = nextTimeBuilder();
return time -> Duration.between(time, nextTime.apply(time));
return nextDurationBuilder(ZonedDateTime.now());
}
public Function<ZonedDateTime, Duration> nextDurationBuilder(ZonedDateTime baseTime) {
Iterator<ZonedDateTime> it = iterable().iterator(baseTime);
return (time) -> {
Duration duration;
do {
duration = Duration.between(time, time = it.next());
}
while (duration.toMillis() < 0);
return duration;
};
}
/**
* 创建一个时间构造器,通过构造器来获取下一次时间
* <pre>{@code
@ -386,7 +416,7 @@ public class TimerSpec implements Serializable {
public ZonedDateTime next() {
ZonedDateTime dateTime = current;
int max = MAX_IT_TIMES;
if (dateTime.toLocalTime().compareTo(onceTime) != 0){
if (!dateTime.toLocalTime().equals(onceTime)) {
dateTime = onceTime.atDate(dateTime.toLocalDate()).atZone(dateTime.getZone());
}
do {
@ -403,7 +433,7 @@ public class TimerSpec implements Serializable {
}
public TimerIterable iterable() {
if ((trigger == Trigger.cron || trigger == null) && cron != null){
if ((trigger == Trigger.cron || trigger == null) && cron != null) {
return cronIterable();
}
return mod == ExecuteMod.period ? periodIterable() : onceIterable();
@ -418,6 +448,87 @@ public class TimerSpec implements Serializable {
return timeList;
}
public Flux<Long> flux() {
return flux(Schedulers.parallel());
}
public Flux<Long> flux(Scheduler scheduler) {
return new TimerFlux(nextDurationBuilder(), scheduler);
}
@AllArgsConstructor
static class TimerFlux extends Flux<Long> {
final Function<ZonedDateTime, Duration> spec;
final Scheduler scheduler;
@Override
public void subscribe(@Nonnull CoreSubscriber<? super Long> coreSubscriber) {
TimerSubscriber subscriber = new TimerSubscriber(spec, scheduler, coreSubscriber);
coreSubscriber.onSubscribe(subscriber);
}
}
static class TimerSubscriber implements Subscription {
final Function<ZonedDateTime, Duration> spec;
final CoreSubscriber<? super Long> subscriber;
final Scheduler scheduler;
long count;
Disposable scheduling;
public TimerSubscriber(Function<ZonedDateTime, Duration> spec,
Scheduler scheduler,
CoreSubscriber<? super Long> subscriber) {
this.scheduler = scheduler;
this.spec = spec;
this.subscriber = subscriber;
}
@Override
public void request(long l) {
trySchedule();
}
@Override
public void cancel() {
if (scheduling != null) {
scheduling.dispose();
}
}
public void onNext() {
if (canSchedule()) {
subscriber.onNext(count++);
}
trySchedule();
}
void trySchedule() {
if (scheduling != null) {
scheduling.dispose();
}
ZonedDateTime now = ZonedDateTime.ofInstant(Instant.ofEpochMilli(scheduler.now(TimeUnit.MILLISECONDS)), ZoneId.systemDefault());
Duration delay = spec.apply(now);
scheduling = scheduler
.schedule(
this::onNext,
delay.toMillis(),
TimeUnit.MILLISECONDS
);
}
protected boolean canSchedule() {
return true;
}
}
public enum Trigger {
week,
month,

View File

@ -4,6 +4,7 @@ import lombok.AllArgsConstructor;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.community.TimerSpec;
import org.jetlinks.rule.engine.api.RuleConstants;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
import org.jetlinks.rule.engine.api.task.Task;
import org.jetlinks.rule.engine.api.task.TaskExecutor;
@ -24,6 +25,7 @@ import java.time.ZonedDateTime;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
@ -32,8 +34,6 @@ import java.util.function.Supplier;
@AllArgsConstructor
public class TimerTaskExecutorProvider implements TaskExecutorProvider {
private final Scheduler scheduler = Schedulers.parallel();
@Override
public String getExecutor() {
return "timer";
@ -44,17 +44,13 @@ public class TimerTaskExecutorProvider implements TaskExecutorProvider {
return Mono.just(new TimerTaskExecutor(context));
}
class TimerTaskExecutor extends AbstractTaskExecutor {
Supplier<Duration> nextDelay;
static class TimerTaskExecutor extends AbstractTaskExecutor {
TimerSpec spec;
Predicate<LocalDateTime> filter;
public TimerTaskExecutor(ExecutionContext context) {
super(context);
nextDelay = createNextDelay();
spec = FastBeanCopier.copy(context.getJob().getConfiguration(), new TimerSpec());
}
@Override
@ -68,57 +64,48 @@ public class TimerTaskExecutorProvider implements TaskExecutorProvider {
}
private Disposable execute() {
Duration nextTime = nextDelay.get();
context.getLogger().debug("trigger timed task after {}", nextTime);
if (this.disposable != null) {
this.disposable.dispose();
}
return this.disposable =
Mono.delay(nextTime, scheduler)
.flatMap(t -> {
if (!this.filter.test(LocalDateTime.now())) {
return Mono.empty();
}
Map<String, Object> data = new HashMap<>();
long currentTime = System.currentTimeMillis();
data.put("timestamp", currentTime);
data.put("_now", currentTime);
return context
.getOutput()
.write(Mono.just(context.newRuleData(data)))
.then(context
.fireEvent(RuleConstants.Event.complete, context.newRuleData(System.currentTimeMillis()))
.thenReturn(1));
})
.onErrorResume(err -> context.onError(err, null).then(Mono.empty()))
.doFinally(s -> {
if (getState() == Task.State.running && s != SignalType.CANCEL) {
execute();
}
})
.subscribe();
return spec
.flux()
.onBackpressureDrop()
.concatMap(t -> {
Map<String, Object> data = new HashMap<>();
long currentTime = System.currentTimeMillis();
data.put("timestamp", currentTime);
data.put("_now", currentTime);
data.put("times", t);
RuleData ruleData = context.newRuleData(data);
return context
.getOutput()
.write(ruleData)
.then(context.fireEvent(RuleConstants.Event.result, ruleData))
.onErrorResume(err -> context.onError(err, null).then(Mono.empty()))
.as(tracer());
})
.subscribe();
}
@Override
public void reload() {
nextDelay = createNextDelay();
spec = FastBeanCopier.copy(context.getJob().getConfiguration(), new TimerSpec());
if (disposable != null) {
disposable.dispose();
}
disposable = doStart();
}
@Override
public void validate() {
createNextDelay();
}
private Supplier<Duration> createNextDelay() {
TimerSpec spec = FastBeanCopier.copy(context.getJob().getConfiguration(), new TimerSpec());
Function<ZonedDateTime, Duration> builder = spec.nextDurationBuilder();
this.filter = spec.createTimeFilter();
return () -> builder.apply(ZonedDateTime.now());
spec.nextDurationBuilder();
spec.createTimeFilter();
}
@Override
public synchronized void shutdown() {
super.shutdown();
}
}
public static Flux<ZonedDateTime> getLastExecuteTimes(String cronExpression, Date from, long times) {