From 06c57e91a6b559334ba4a6b5cbb96fc55a36cec0 Mon Sep 17 00:00:00 2001 From: zhou-hao Date: Wed, 22 Apr 2020 11:00:32 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=A7=84=E5=88=99=E5=BC=95?= =?UTF-8?q?=E6=93=8E=20reactor-ql=E8=8A=82=E7=82=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rule-engine-component/pom.xml | 6 +- .../rule/engine/nodes/ReactorSqlNode.java | 134 ++++++++++++++++++ 2 files changed, 136 insertions(+), 4 deletions(-) create mode 100644 jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/nodes/ReactorSqlNode.java diff --git a/jetlinks-components/rule-engine-component/pom.xml b/jetlinks-components/rule-engine-component/pom.xml index 8c17cfe2..e89c9b69 100644 --- a/jetlinks-components/rule-engine-component/pom.xml +++ b/jetlinks-components/rule-engine-component/pom.xml @@ -55,10 +55,8 @@ - ${project.groupId} - tcp-component - ${project.version} - compile + org.jetlinks + reactor-ql diff --git a/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/nodes/ReactorSqlNode.java b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/nodes/ReactorSqlNode.java new file mode 100644 index 00000000..45612e27 --- /dev/null +++ b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/nodes/ReactorSqlNode.java @@ -0,0 +1,134 @@ +package org.jetlinks.community.rule.engine.nodes; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.jetlinks.core.message.codec.MessagePayloadType; +import org.jetlinks.community.gateway.EncodableMessage; +import org.jetlinks.community.gateway.MessageGateway; +import org.jetlinks.community.gateway.Subscription; +import org.jetlinks.reactor.ql.ReactorQL; +import org.jetlinks.rule.engine.api.RuleData; +import org.jetlinks.rule.engine.api.RuleDataHelper; +import org.jetlinks.rule.engine.api.events.RuleEvent; +import org.jetlinks.rule.engine.api.executor.ExecutionContext; +import org.jetlinks.rule.engine.api.model.NodeType; +import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy; +import org.jetlinks.rule.engine.executor.PayloadType; +import org.jetlinks.rule.engine.executor.node.RuleNodeConfig; +import org.reactivestreams.Publisher; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.Collections; +import java.util.function.Function; + +/** + *
+ *     {@code
+ *
+ *     select avg(this.temperature) avgVal, deviceId
+ *     from "/device/+/message/property/#"
+ *     group _window(10,1) --每10条滚动数据
+ *     having avgVal > 10
+ *
+ *     }
+ * 
+ */ +@Slf4j +@AllArgsConstructor +@Component +public class ReactorSqlNode extends CommonExecutableRuleNodeFactoryStrategy { + + private final MessageGateway messageGateway; + + @Override + public String getSupportType() { + return "reactor-ql"; + } + + @Override + public Function> createExecutor(ExecutionContext context, Config config) { + ReactorQL ql = config.getReactorQL(); + + return data -> ql.start(Flux.just(RuleDataHelper.toContextMap(data))); + } + + @Override + protected void onStarted(ExecutionContext context, Config config) { + log.debug("start reactor ql : {}", config.getSql()); + context.onStop( + config.getReactorQL() + .start(table -> { + if (table == null || table.equalsIgnoreCase("dual")) { + return Flux.just(1); + } + + if (table.startsWith("/")) { + return messageGateway + .subscribe( + Collections.singleton(new Subscription(table)), + "rule-engine:".concat(context.getInstanceId()), + false) + .map(msg -> { + //转换为消息 + if (msg.getMessage() instanceof EncodableMessage) { + return ((EncodableMessage) msg.getMessage()).getNativePayload(); + } + MessagePayloadType payloadType = msg.getMessage().getPayloadType(); + if (payloadType == null) { + return msg.getMessage().getBytes(); + } + return PayloadType.valueOf(payloadType.name()).read(msg.getMessage().getPayload()); + }); + } + return Flux.just(1); + }) + .flatMap(result -> { + RuleData data = RuleData.create(result); + //输出到下一节点 + return context.getOutput() + .write(Mono.just(RuleData.create(result))) + .then(context.fireEvent(RuleEvent.NODE_EXECUTE_DONE, data)); + }) + .onErrorResume(err -> context.onError(RuleData.create(""), err)) + .subscribe()::dispose + ); + + } + + public static class Config implements RuleNodeConfig { + + @Getter + @Setter + private String sql; + + private volatile ReactorQL reactorQL; + + @Override + public NodeType getNodeType() { + return NodeType.MAP; + } + + @Override + public void setNodeType(NodeType nodeType) { + + } + + public ReactorQL getReactorQL() { + if (reactorQL == null) { + reactorQL = ReactorQL.builder().sql(sql).build(); + } + return reactorQL; + } + + @Override + public void validate() { + //不报错就ok + getReactorQL(); + } + } + +}