diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/TimerSpec.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/TimerSpec.java index 579d024a..e91d52c8 100644 --- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/TimerSpec.java +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/TimerSpec.java @@ -16,18 +16,23 @@ import lombok.NoArgsConstructor; import lombok.Setter; import org.apache.commons.collections4.CollectionUtils; import org.hswebframework.web.exception.ValidationException; +import org.reactivestreams.Subscription; import org.springframework.util.Assert; +import reactor.core.CoreSubscriber; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; +import javax.annotation.Nonnull; import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotNull; import java.io.Serializable; -import java.time.Duration; -import java.time.LocalDateTime; -import java.time.LocalTime; -import java.time.ZonedDateTime; +import java.time.*; import java.time.temporal.ChronoUnit; import java.time.temporal.TemporalUnit; import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Predicate; @@ -88,15 +93,27 @@ public class TimerSpec implements Serializable { } return predicate.and(range); } - if (mod == ExecuteMod.once){ + if (mod == ExecuteMod.once) { LocalTime onceTime = once.localTime(); Predicate predicate - = time -> time.toLocalTime().compareTo(onceTime) == 0; + = time -> compareOnceTime(time.toLocalTime(), onceTime) == 0; return predicate.and(range); } return range; } + public int compareOnceTime(LocalTime time1, LocalTime time2) { + int cmp = Integer.compare(time1.getHour(), time2.getHour()); + if (cmp == 0) { + cmp = Integer.compare(time1.getMinute(), time2.getMinute()); + if (cmp == 0) { + cmp = Integer.compare(time1.getSecond(), time2.getSecond()); + //不比较纳秒 + } + } + return cmp; + } + public String toCronExpression() { return toCron().asString(); } @@ -280,10 +297,23 @@ public class TimerSpec implements Serializable { * @return 构造器 */ public Function nextDurationBuilder() { - Function nextTime = nextTimeBuilder(); - return time -> Duration.between(time, nextTime.apply(time)); + return nextDurationBuilder(ZonedDateTime.now()); } + + public Function nextDurationBuilder(ZonedDateTime baseTime) { + Iterator it = iterable().iterator(baseTime); + return (time) -> { + Duration duration; + do { + duration = Duration.between(time, time = it.next()); + } + while (duration.toMillis() < 0); + return duration; + }; + } + + /** * 创建一个时间构造器,通过构造器来获取下一次时间 *
{@code
@@ -386,7 +416,7 @@ public class TimerSpec implements Serializable {
             public ZonedDateTime next() {
                 ZonedDateTime dateTime = current;
                 int max = MAX_IT_TIMES;
-                if (dateTime.toLocalTime().compareTo(onceTime) != 0){
+                if (!dateTime.toLocalTime().equals(onceTime)) {
                     dateTime = onceTime.atDate(dateTime.toLocalDate()).atZone(dateTime.getZone());
                 }
                 do {
@@ -403,7 +433,7 @@ public class TimerSpec implements Serializable {
     }
 
     public TimerIterable iterable() {
-        if ((trigger == Trigger.cron || trigger == null) && cron != null){
+        if ((trigger == Trigger.cron || trigger == null) && cron != null) {
             return cronIterable();
         }
         return mod == ExecuteMod.period ? periodIterable() : onceIterable();
@@ -418,6 +448,87 @@ public class TimerSpec implements Serializable {
         return timeList;
     }
 
+
+    public Flux flux() {
+        return flux(Schedulers.parallel());
+    }
+
+    public Flux flux(Scheduler scheduler) {
+        return new TimerFlux(nextDurationBuilder(), scheduler);
+    }
+
+    @AllArgsConstructor
+    static class TimerFlux extends Flux {
+        final Function  spec;
+        final Scheduler scheduler;
+
+        @Override
+        public void subscribe(@Nonnull CoreSubscriber coreSubscriber) {
+
+            TimerSubscriber subscriber = new TimerSubscriber(spec, scheduler, coreSubscriber);
+            coreSubscriber.onSubscribe(subscriber);
+        }
+    }
+
+    static class TimerSubscriber implements Subscription {
+        final Function spec;
+        final CoreSubscriber subscriber;
+        final Scheduler scheduler;
+        long count;
+        Disposable scheduling;
+
+        public TimerSubscriber(Function spec,
+                               Scheduler scheduler,
+                               CoreSubscriber subscriber) {
+            this.scheduler = scheduler;
+            this.spec = spec;
+            this.subscriber = subscriber;
+        }
+
+
+        @Override
+        public void request(long l) {
+            trySchedule();
+        }
+
+        @Override
+        public void cancel() {
+            if (scheduling != null) {
+                scheduling.dispose();
+            }
+        }
+
+        public void onNext() {
+
+            if (canSchedule()) {
+                subscriber.onNext(count++);
+            }
+            trySchedule();
+        }
+
+        void trySchedule() {
+            if (scheduling != null) {
+                scheduling.dispose();
+            }
+
+            ZonedDateTime now = ZonedDateTime.ofInstant(Instant.ofEpochMilli(scheduler.now(TimeUnit.MILLISECONDS)), ZoneId.systemDefault());
+            Duration delay = spec.apply(now);
+
+            scheduling = scheduler
+                .schedule(
+                    this::onNext,
+                    delay.toMillis(),
+                    TimeUnit.MILLISECONDS
+                );
+        }
+
+        protected boolean canSchedule() {
+            return true;
+        }
+
+
+    }
+
     public enum Trigger {
         week,
         month,
diff --git a/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/TimerTaskExecutorProvider.java b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/TimerTaskExecutorProvider.java
index 35782fb3..6bdaeb5e 100644
--- a/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/TimerTaskExecutorProvider.java
+++ b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/TimerTaskExecutorProvider.java
@@ -4,6 +4,7 @@ import lombok.AllArgsConstructor;
 import org.hswebframework.web.bean.FastBeanCopier;
 import org.jetlinks.community.TimerSpec;
 import org.jetlinks.rule.engine.api.RuleConstants;
+import org.jetlinks.rule.engine.api.RuleData;
 import org.jetlinks.rule.engine.api.task.ExecutionContext;
 import org.jetlinks.rule.engine.api.task.Task;
 import org.jetlinks.rule.engine.api.task.TaskExecutor;
@@ -24,6 +25,7 @@ import java.time.ZonedDateTime;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
@@ -32,8 +34,6 @@ import java.util.function.Supplier;
 @AllArgsConstructor
 public class TimerTaskExecutorProvider implements TaskExecutorProvider {
 
-    private final Scheduler scheduler = Schedulers.parallel();
-
     @Override
     public String getExecutor() {
         return "timer";
@@ -44,17 +44,13 @@ public class TimerTaskExecutorProvider implements TaskExecutorProvider {
         return Mono.just(new TimerTaskExecutor(context));
     }
 
-    class TimerTaskExecutor extends AbstractTaskExecutor {
-
-        Supplier nextDelay;
+    static class TimerTaskExecutor extends AbstractTaskExecutor {
 
         TimerSpec spec;
 
-        Predicate filter;
-
         public TimerTaskExecutor(ExecutionContext context) {
             super(context);
-            nextDelay = createNextDelay();
+            spec = FastBeanCopier.copy(context.getJob().getConfiguration(), new TimerSpec());
         }
 
         @Override
@@ -68,57 +64,48 @@ public class TimerTaskExecutorProvider implements TaskExecutorProvider {
         }
 
         private Disposable execute() {
-            Duration nextTime = nextDelay.get();
-            context.getLogger().debug("trigger timed task after {}", nextTime);
-            if (this.disposable != null) {
-                this.disposable.dispose();
-            }
-            return this.disposable =
-                Mono.delay(nextTime, scheduler)
-                    .flatMap(t -> {
-                        if (!this.filter.test(LocalDateTime.now())) {
-                            return Mono.empty();
-                        }
-                        Map data = new HashMap<>();
-                        long currentTime = System.currentTimeMillis();
-                        data.put("timestamp", currentTime);
-                        data.put("_now", currentTime);
-                        return context
-                            .getOutput()
-                            .write(Mono.just(context.newRuleData(data)))
-                            .then(context
-                                      .fireEvent(RuleConstants.Event.complete, context.newRuleData(System.currentTimeMillis()))
-                                      .thenReturn(1));
-
-                    })
-                    .onErrorResume(err -> context.onError(err, null).then(Mono.empty()))
-                    .doFinally(s -> {
-                        if (getState() == Task.State.running && s != SignalType.CANCEL) {
-                            execute();
-                        }
-                    })
-                    .subscribe();
+            return spec
+                .flux()
+                .onBackpressureDrop()
+                .concatMap(t -> {
+                    Map data = new HashMap<>();
+                    long currentTime = System.currentTimeMillis();
+                    data.put("timestamp", currentTime);
+                    data.put("_now", currentTime);
+                    data.put("times", t);
+                    RuleData ruleData = context.newRuleData(data);
+                    return context
+                        .getOutput()
+                        .write(ruleData)
+                        .then(context.fireEvent(RuleConstants.Event.result, ruleData))
+                        .onErrorResume(err -> context.onError(err, null).then(Mono.empty()))
+                        .as(tracer());
+                })
+                .subscribe();
         }
 
         @Override
         public void reload() {
-            nextDelay = createNextDelay();
+            spec = FastBeanCopier.copy(context.getJob().getConfiguration(), new TimerSpec());
+            if (disposable != null) {
+                disposable.dispose();
+            }
             disposable = doStart();
         }
 
         @Override
         public void validate() {
-            createNextDelay();
-        }
-
-        private Supplier createNextDelay() {
             TimerSpec spec = FastBeanCopier.copy(context.getJob().getConfiguration(), new TimerSpec());
-            Function builder = spec.nextDurationBuilder();
-            this.filter = spec.createTimeFilter();
-            return () -> builder.apply(ZonedDateTime.now());
-
+            spec.nextDurationBuilder();
+            spec.createTimeFilter();
         }
 
+        @Override
+        public synchronized void shutdown() {
+            super.shutdown();
+        }
+
+
     }
 
     public static Flux getLastExecuteTimes(String cronExpression, Date from, long times) {