diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmRule.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmRule.java index 92dc1fe1..ff0d1b54 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmRule.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmRule.java @@ -73,6 +73,11 @@ public class DeviceAlarmRule implements Serializable { */ private List actions; + /** + * 防抖限制 + */ + private ShakeLimit shakeLimit; + public void validate() { if (org.apache.commons.collections.CollectionUtils.isEmpty(getTriggers())) { @@ -286,6 +291,28 @@ public class DeviceAlarmRule implements Serializable { } + /** + * 抖动限制 + * https://github.com/jetlinks/jetlinks-community/issues/8 + * + * @since 1.3 + */ + @Getter + @Setter + public static class ShakeLimit implements Serializable { + private boolean enabled; + + //时间限制,单位时间内发生多次告警时,只算一次。单位:秒 + private int time; + + //触发阈值,单位时间内发生n次告警,只算一次。 + private int threshold; + + //当发生第一次告警时就触发,为false时表示最后一次才触发(告警有延迟,但是可以统计出次数) + private boolean alarmFirst; + + } + @AllArgsConstructor @Getter public enum Operator { diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmRuleNode.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmRuleNode.java index 08bbb743..e9709fa8 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmRuleNode.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmRuleNode.java @@ -23,7 +23,10 @@ import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; +import java.time.Duration; import java.util.*; import java.util.function.Function; import java.util.stream.Collectors; @@ -157,9 +160,41 @@ public class DeviceAlarmRuleNode extends CommonExecutableRuleNodeFactoryStrategy ); binds.forEach(context::bind); - return (ql == null ? ql = createQL() : ql) + + Flux> resultFlux = (ql == null ? ql = createQL() : ql) .start(context) - .map(ReactorQLRecord::asMap) + .map(ReactorQLRecord::asMap); + + DeviceAlarmRule.ShakeLimit shakeLimit; + if ((shakeLimit = rule.getShakeLimit()) != null + && shakeLimit.isEnabled() + && shakeLimit.getTime() > 0) { + int thresholdNumber = shakeLimit.getThreshold(); + //打开时间窗口 + Flux>> window = resultFlux.window(Duration.ofSeconds(shakeLimit.getTime())); + + Function>>, Publisher>>> mapper = + shakeLimit.isAlarmFirst() + ? + group -> group + .takeUntil(tp -> tp.getT1() >= thresholdNumber) //达到触发阈值 + .take(1) //取第一个 + .singleOrEmpty() + : + group -> group.takeLast(1).singleOrEmpty();//取最后一个 + + resultFlux = window + .flatMap(group -> group + .index((index, data) -> Tuples.of(index + 1, data)) + .transform(mapper) + .filter(tp -> tp.getT1() >= thresholdNumber) //超过阈值告警 + .map(tp2 -> { + tp2.getT2().put("totalAlarms", tp2.getT1()); + return tp2.getT2(); + })); + } + + return resultFlux .flatMap(map -> { map.put("productId", rule.getProductId()); map.put("alarmId", rule.getId());