diff --git a/jetlinks-components/rule-engine-component/pom.xml b/jetlinks-components/rule-engine-component/pom.xml
index 8c17cfe2..e89c9b69 100644
--- a/jetlinks-components/rule-engine-component/pom.xml
+++ b/jetlinks-components/rule-engine-component/pom.xml
@@ -55,10 +55,8 @@
- ${project.groupId}
- tcp-component
- ${project.version}
- compile
+ org.jetlinks
+ reactor-ql
diff --git a/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/nodes/ReactorSqlNode.java b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/nodes/ReactorSqlNode.java
new file mode 100644
index 00000000..45612e27
--- /dev/null
+++ b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/nodes/ReactorSqlNode.java
@@ -0,0 +1,134 @@
+package org.jetlinks.community.rule.engine.nodes;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.core.message.codec.MessagePayloadType;
+import org.jetlinks.community.gateway.EncodableMessage;
+import org.jetlinks.community.gateway.MessageGateway;
+import org.jetlinks.community.gateway.Subscription;
+import org.jetlinks.reactor.ql.ReactorQL;
+import org.jetlinks.rule.engine.api.RuleData;
+import org.jetlinks.rule.engine.api.RuleDataHelper;
+import org.jetlinks.rule.engine.api.events.RuleEvent;
+import org.jetlinks.rule.engine.api.executor.ExecutionContext;
+import org.jetlinks.rule.engine.api.model.NodeType;
+import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy;
+import org.jetlinks.rule.engine.executor.PayloadType;
+import org.jetlinks.rule.engine.executor.node.RuleNodeConfig;
+import org.reactivestreams.Publisher;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.Collections;
+import java.util.function.Function;
+
+/**
+ *
+ * {@code
+ *
+ * select avg(this.temperature) avgVal, deviceId
+ * from "/device/+/message/property/#"
+ * group _window(10,1) --每10条滚动数据
+ * having avgVal > 10
+ *
+ * }
+ *
+ */
+@Slf4j
+@AllArgsConstructor
+@Component
+public class ReactorSqlNode extends CommonExecutableRuleNodeFactoryStrategy {
+
+ private final MessageGateway messageGateway;
+
+ @Override
+ public String getSupportType() {
+ return "reactor-ql";
+ }
+
+ @Override
+ public Function> createExecutor(ExecutionContext context, Config config) {
+ ReactorQL ql = config.getReactorQL();
+
+ return data -> ql.start(Flux.just(RuleDataHelper.toContextMap(data)));
+ }
+
+ @Override
+ protected void onStarted(ExecutionContext context, Config config) {
+ log.debug("start reactor ql : {}", config.getSql());
+ context.onStop(
+ config.getReactorQL()
+ .start(table -> {
+ if (table == null || table.equalsIgnoreCase("dual")) {
+ return Flux.just(1);
+ }
+
+ if (table.startsWith("/")) {
+ return messageGateway
+ .subscribe(
+ Collections.singleton(new Subscription(table)),
+ "rule-engine:".concat(context.getInstanceId()),
+ false)
+ .map(msg -> {
+ //转换为消息
+ if (msg.getMessage() instanceof EncodableMessage) {
+ return ((EncodableMessage) msg.getMessage()).getNativePayload();
+ }
+ MessagePayloadType payloadType = msg.getMessage().getPayloadType();
+ if (payloadType == null) {
+ return msg.getMessage().getBytes();
+ }
+ return PayloadType.valueOf(payloadType.name()).read(msg.getMessage().getPayload());
+ });
+ }
+ return Flux.just(1);
+ })
+ .flatMap(result -> {
+ RuleData data = RuleData.create(result);
+ //输出到下一节点
+ return context.getOutput()
+ .write(Mono.just(RuleData.create(result)))
+ .then(context.fireEvent(RuleEvent.NODE_EXECUTE_DONE, data));
+ })
+ .onErrorResume(err -> context.onError(RuleData.create(""), err))
+ .subscribe()::dispose
+ );
+
+ }
+
+ public static class Config implements RuleNodeConfig {
+
+ @Getter
+ @Setter
+ private String sql;
+
+ private volatile ReactorQL reactorQL;
+
+ @Override
+ public NodeType getNodeType() {
+ return NodeType.MAP;
+ }
+
+ @Override
+ public void setNodeType(NodeType nodeType) {
+
+ }
+
+ public ReactorQL getReactorQL() {
+ if (reactorQL == null) {
+ reactorQL = ReactorQL.builder().sql(sql).build();
+ }
+ return reactorQL;
+ }
+
+ @Override
+ public void validate() {
+ //不报错就ok
+ getReactorQL();
+ }
+ }
+
+}