compressChildren() {
+ if (CollectionUtils.isEmpty(children)) {
+ return children;
+ }
+ if (children.size() == 1) {
+ TermSpec child = children.get(0);
+ if (child.column == null && Objects.equals(child.getTermType(), TermType.eq)) {
+ return child.compressChildren();
+ }
+ }
+ return children;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ toString(sb);
+ return sb.toString();
+ }
+}
diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/topic/Topics.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/topic/Topics.java
index e43c73b7..10f4d3b8 100755
--- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/topic/Topics.java
+++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/topic/Topics.java
@@ -3,14 +3,65 @@ package org.jetlinks.community.topic;
import lombok.Generated;
import org.jetlinks.core.utils.StringBuilderUtils;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
public interface Topics {
+ @Deprecated
+ static String org(String orgId, String topic) {
+ if (!topic.startsWith("/")) {
+ topic = "/" + topic;
+ }
+ return String.join("", "/org/", orgId, topic);
+ }
+
+ static String creator(String creatorId, String topic) {
+ return StringBuilderUtils.buildString(creatorId, topic, Topics::creator);
+ }
+
+ static void creator(String creatorId, String topic, StringBuilder builder) {
+ builder.append("/user/").append(creatorId);
+ if (topic.charAt(0) != '/') {
+ builder.append('/');
+ }
+ builder.append(topic);
+ }
+
+ static void binding(String type, String id, String topic, StringBuilder builder) {
+ builder.append('/')
+ .append(type)
+ .append('/')
+ .append(id);
+ if (topic.charAt(0) != '/') {
+ builder.append('/');
+ }
+ builder.append(topic);
+ }
+
+ /**
+ * 根据关系构造topic
+ * {@code
+ * /rel/{objectType}/{objectId}/{relation}/{topic}
+ *
+ * 如: /rel/用户/user1/manager/{topic}
+ * }
+ *
+ * @param objectType 对象类型
+ * @param objectId 对象ID
+ * @param relation 关系标识
+ * @param topic topic后缀
+ * @param builder StringBuilder
+ */
+ static void relation(String objectType, String objectId, String relation, String topic, StringBuilder builder) {
+ builder.append("/rel/")
+ .append(objectType)
+ .append('/')
+ .append(objectId)
+ .append('/')
+ .append(relation);
+ if (topic.charAt(0) != '/') {
+ builder.append('/');
+ }
+ builder.append(topic);
+ }
String allDeviceRegisterEvent = "/_sys/registry-device/*/register";
String allDeviceUnRegisterEvent = "/_sys/registry-device/*/unregister";
@@ -64,6 +115,11 @@ public interface Topics {
return String.join("", "/alarm/", targetType, "/", targetId, "/", alarmId, "/record");
}
+ static String alarmRelieve(String targetType, String targetId, String alarmId) {
+ // /alarm/{targetType}/{targetId}/{alarmId}/relieve
+ return String.join("", "/alarm/", targetType, "/", targetId, "/", alarmId, "/relieve");
+ }
+
interface Authentications {
String allUserAuthenticationChanged = "/_sys/user-dimension-changed/*";
diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/utils/ReactorUtils.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/utils/ReactorUtils.java
index 3d1aae29..1bf3261e 100644
--- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/utils/ReactorUtils.java
+++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/utils/ReactorUtils.java
@@ -58,6 +58,7 @@ public class ReactorUtils {
return FluxUtils.distinct(keySelector, duration);
}
+
/**
* 尝试执行 {@link Disposable#dispose()}
*
@@ -198,14 +199,14 @@ public class ReactorUtils {
termType = FixedTermTypeSupport.gte.name();
break;
case "<":
- termType = FixedTermTypeSupport.lt.getName();
+ termType = FixedTermTypeSupport.lt.name();
break;
case "<=":
- termType = FixedTermTypeSupport.lte.getName();
+ termType = FixedTermTypeSupport.lte.name();
break;
case "!=":
case "<>":
- termType = FixedTermTypeSupport.neq.getName();
+ termType = FixedTermTypeSupport.neq.name();
break;
}
diff --git a/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/commons/ShakeLimit.java b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/commons/ShakeLimit.java
index 35241b3e..b8dc5a3d 100644
--- a/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/commons/ShakeLimit.java
+++ b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/commons/ShakeLimit.java
@@ -5,6 +5,7 @@ import lombok.Getter;
import lombok.Setter;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;
import javax.annotation.Nonnull;
@@ -41,9 +42,8 @@ public class ShakeLimit implements Serializable {
private boolean alarmFirst;
/**
- *
* 利用窗口函数,将ReactorQL语句包装为支持抖动限制的SQL.
- *
+ *
* select * from ( sql )
* group by
* _window('1s') --时间窗口
@@ -78,6 +78,7 @@ public class ShakeLimit implements Serializable {
* @param totalConsumer 总数接收器
* @param 数据类型
* @return 新流
+ * @deprecated {@link ShakeLimitProvider#shakeLimit(String, Flux, ShakeLimit)}
*/
public Flux transfer(Flux source,
BiFunction, Flux>> windowFunction,
@@ -88,18 +89,27 @@ public class ShakeLimit implements Serializable {
int thresholdNumber = getThreshold();
Duration windowTime = Duration.ofSeconds(getTime());
- return source
- .as(flux -> windowFunction.apply(windowTime, flux))
+ return windowFunction
+ .apply(windowTime, source)
//处理每一组数据
.flatMap(group -> group
//给数据打上索引,索引号就是告警次数
.index((index, data) -> Tuples.of(index + 1, data))
+ .switchOnFirst((e, flux) -> {
+ if (e.hasValue()) {
+ @SuppressWarnings("all")
+ T ele = e.get().getT2();
+ return flux.map(tp2 -> Tuples.of(tp2.getT1(), tp2.getT2(), ele));
+ }
+ return flux.then(Mono.empty());
+ })
//超过阈值告警时
.filter(tp -> tp.getT1() >= thresholdNumber)
.as(flux -> isAlarmFirst() ? flux.take(1) : flux.takeLast(1))//取第一个或者最后一个
- .map(tp2 -> {
- totalConsumer.accept(tp2.getT2(), tp2.getT1());
- return tp2.getT2();
- }));
+ .map(tp3 -> {
+ T next = isAlarmFirst() ? tp3.getT3() : tp3.getT2();
+ totalConsumer.accept(next, tp3.getT1());
+ return next;
+ }), Integer.MAX_VALUE);
}
}
\ No newline at end of file
diff --git a/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/commons/ShakeLimitProvider.java b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/commons/ShakeLimitProvider.java
new file mode 100644
index 00000000..86215b9e
--- /dev/null
+++ b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/commons/ShakeLimitProvider.java
@@ -0,0 +1,37 @@
+package org.jetlinks.community.rule.engine.commons;
+
+import org.jetlinks.community.spi.Provider;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.GroupedFlux;
+
+/**
+ * 防抖提供商
+ *
+ * @author zhouhao
+ * @since 2.2
+ */
+public interface ShakeLimitProvider {
+
+ Provider supports = Provider.create(ShakeLimitProvider.class);
+
+ /**
+ * @return 提供商唯一标识
+ */
+ String provider();
+
+ /**
+ * 对指定分组数据源进行防抖,并输出满足条件的数据.
+ *
+ * @param sourceKey 数据源唯一标识
+ * @param grouped 分组数据源
+ * @param limit 防抖条件
+ * @param 数据类型
+ * @return 防抖结果
+ */
+ Flux> shakeLimit(
+ String sourceKey,
+ Flux> grouped,
+ ShakeLimit limit);
+
+
+}
diff --git a/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/commons/ShakeLimitResult.java b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/commons/ShakeLimitResult.java
new file mode 100644
index 00000000..24b4c60c
--- /dev/null
+++ b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/commons/ShakeLimitResult.java
@@ -0,0 +1,16 @@
+package org.jetlinks.community.rule.engine.commons;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.ToString;
+
+@Getter
+@AllArgsConstructor
+@ToString
+public class ShakeLimitResult {
+
+ private String groupKey;
+ private long times;
+ private T element;
+
+}
diff --git a/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/commons/impl/SimpleShakeLimitProvider.java b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/commons/impl/SimpleShakeLimitProvider.java
new file mode 100644
index 00000000..f270bad4
--- /dev/null
+++ b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/commons/impl/SimpleShakeLimitProvider.java
@@ -0,0 +1,87 @@
+package org.jetlinks.community.rule.engine.commons.impl;
+
+import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.community.rule.engine.commons.ShakeLimit;
+import org.jetlinks.community.rule.engine.commons.ShakeLimitProvider;
+import org.jetlinks.community.rule.engine.commons.ShakeLimitResult;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.GroupedFlux;
+import reactor.core.publisher.Mono;
+import reactor.util.function.Tuples;
+
+import java.time.Duration;
+
+@Slf4j
+public class SimpleShakeLimitProvider implements ShakeLimitProvider {
+
+ public static final ShakeLimitProvider GLOBAL = new SimpleShakeLimitProvider();
+
+ public static final String PROVIDER = "simple";
+
+ @Override
+ public String provider() {
+ return PROVIDER;
+ }
+
+ protected Flux wrapSource(String sourceKey, Flux source) {
+ return source;
+ }
+
+ @Override
+ public Flux> shakeLimit(String sourceKey,
+ Flux> grouped,
+ ShakeLimit limit) {
+ int thresholdNumber = limit.getThreshold();
+ boolean isAlarmFirst = limit.isAlarmFirst();
+ Duration windowSpan = Duration.ofSeconds(limit.getTime());
+ return grouped
+ .flatMap(group -> {
+ String groupKey = group.key();
+ String key = sourceKey + ":" + groupKey;
+ return Flux
+ .defer(() -> this
+ //使用timeout,当2倍窗口时间没有收到数据时,则结束分组.释放内存.
+ .wrapSource(key, group.timeout(windowSpan.plus(windowSpan), Mono.empty())))
+ //按时间窗口分组
+ .window(windowSpan)
+ .flatMap(source -> this
+ .handleWindow(key,
+ groupKey,
+ windowSpan,
+ source,
+ thresholdNumber,
+ isAlarmFirst))
+ .onErrorResume(err -> {
+ log.warn("shake limit [{}] error", key, err);
+ return Mono.empty();
+ });
+ }, Integer.MAX_VALUE);
+ }
+
+ protected Mono> handleWindow(String key,
+ String groupKey,
+ Duration duration,
+ Flux source,
+ long thresholdNumber,
+ boolean isAlarmFirst) {
+ //给数据打上索引,索引号就是告警次数
+ return source
+ .index((index, data) -> Tuples.of(index + 1, data))
+ .switchOnFirst((e, flux) -> {
+ if (e.hasValue()) {
+ @SuppressWarnings("all")
+ T ele = e.get().getT2();
+ return flux.map(tp2 -> Tuples.of(tp2.getT1(), tp2.getT2(), ele));
+ }
+ return flux.then(Mono.empty());
+ })
+ //超过阈值告警时
+ .filter(tp -> tp.getT1() >= thresholdNumber)
+ .as(flux -> isAlarmFirst ? flux.take(1) : flux.takeLast(1))//取第一个或者最后一个
+ .map(tp3 -> {
+ T next = isAlarmFirst ? tp3.getT3() : tp3.getT2();
+ return new ShakeLimitResult<>(groupKey, tp3.getT1(), next);
+ })
+ .singleOrEmpty();
+ }
+}
diff --git a/jetlinks-manager/rule-engine-manager/pom.xml b/jetlinks-manager/rule-engine-manager/pom.xml
index 3dc5b7bd..8ab8c789 100644
--- a/jetlinks-manager/rule-engine-manager/pom.xml
+++ b/jetlinks-manager/rule-engine-manager/pom.xml
@@ -44,6 +44,10 @@
${project.version}