From bf3a8e3d7379c5283b5c3b6f531a6fc5a903e21e Mon Sep 17 00:00:00 2001 From: zhouhao Date: Sat, 13 Mar 2021 18:43:49 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E4=BF=AE=E5=A4=8DReactorQL=E8=8A=82?= =?UTF-8?q?=E7=82=B9=E6=9C=89=E4=B8=8A=E6=B8=B8=E8=8A=82=E7=82=B9=E6=97=B6?= =?UTF-8?q?,header=E6=97=A0=E6=B3=95=E4=BC=A0=E9=80=92=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ReactorQLTaskExecutorProvider.java | 66 ++++++++++++------- 1 file changed, 41 insertions(+), 25 deletions(-) diff --git a/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/ReactorQLTaskExecutorProvider.java b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/ReactorQLTaskExecutorProvider.java index 508cb81b..8c555536 100644 --- a/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/ReactorQLTaskExecutorProvider.java +++ b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/ReactorQLTaskExecutorProvider.java @@ -1,5 +1,6 @@ package org.jetlinks.community.rule.engine.executor; +import com.alibaba.fastjson.JSONObject; import lombok.AllArgsConstructor; import org.jetlinks.core.codec.defaults.JsonCodec; import org.jetlinks.core.event.EventBus; @@ -8,6 +9,7 @@ import org.jetlinks.reactor.ql.ReactorQL; import org.jetlinks.rule.engine.api.RuleConstants; import org.jetlinks.rule.engine.api.RuleData; import org.jetlinks.rule.engine.api.RuleDataHelper; +import org.jetlinks.rule.engine.api.model.RuleNodeModel; import org.jetlinks.rule.engine.api.task.ExecutionContext; import org.jetlinks.rule.engine.api.task.TaskExecutor; import org.jetlinks.rule.engine.api.task.TaskExecutorProvider; @@ -18,6 +20,7 @@ import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -53,19 +56,20 @@ public class ReactorQLTaskExecutorProvider implements TaskExecutorProvider { @Override protected Disposable doStart() { - Flux> dataStream; + Flux dataStream; //有上游节点 if (!CollectionUtils.isEmpty(context.getJob().getInputs())) { - dataStream = context.getInput() + dataStream = context + .getInput() .accept() - .map(RuleDataHelper::toContextMap) - .flatMap(v -> reactorQL.start(Flux.just(v)) + .flatMap(ruleData -> reactorQL + .start(Flux.just(RuleDataHelper.toContextMap(ruleData))) + .map(ruleData::newData) .onErrorResume(err -> { - context.getLogger().error(err.getMessage(),err); + context.getLogger().error(err.getMessage(), err); return context.onError(err, null).then(Mono.empty()); - })) - ; + })); } else { dataStream = reactorQL .start(table -> { @@ -75,26 +79,33 @@ public class ReactorQLTaskExecutorProvider implements TaskExecutorProvider { if (table.startsWith("/")) { //转换为消息 return eventBus - .subscribe(org.jetlinks.core.event.Subscription.of( - "rule-engine:" - .concat(context.getInstanceId()) - .concat(":") - .concat(context.getJob().getNodeId()), - table, - Subscription.Feature.local - ) - ) - .map(payload -> payload.bodyToJson(true)); + .subscribe(Subscription + .builder() + .subscriberId("rule-engine:" + .concat(context.getInstanceId()) + .concat(":") + .concat(context.getJob().getNodeId())) + .local() + .build()) + .flatMap(payload -> { + try { + return Mono.just(payload.bodyToJson(true)); + } catch (Throwable error) { + return context.onError(error, null); + } + }); } return Flux.just(1); - }); + }) + .cast(Object.class); } return dataStream .flatMap(result -> { RuleData data = context.newRuleData(result); //输出到下一节点 - return context.getOutput() + return context + .getOutput() .write(Mono.just(data)) .then(context.fireEvent(RuleConstants.Event.result, data)); }) @@ -103,12 +114,17 @@ public class ReactorQLTaskExecutorProvider implements TaskExecutorProvider { } protected ReactorQL createQl() { - ReactorQL.Builder builder = Optional.ofNullable(context.getJob().getConfiguration()) - .map(map -> map.get("sql")) - .map(String::valueOf) - .map(ReactorQL.builder()::sql) - .orElseThrow(() -> new IllegalArgumentException("配置sql错误")); - return builder.build(); + try { + ReactorQL.Builder builder = Optional + .ofNullable(context.getJob().getConfiguration()) + .map(map -> map.get("sql")) + .map(String::valueOf) + .map(ReactorQL.builder()::sql) + .orElseThrow(() -> new IllegalArgumentException("配置sql错误")); + return builder.build(); + } catch (Exception e) { + throw new IllegalArgumentException("SQL格式错误:" + e.getMessage(), e); + } } @Override From 181da62f50ef8ae84b8fc61945a114d7e8eb5ed1 Mon Sep 17 00:00:00 2001 From: zhouhao Date: Sat, 13 Mar 2021 18:44:24 +0800 Subject: [PATCH 2/2] remove spring repository --- pom.xml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pom.xml b/pom.xml index a8a941c9..19238a56 100644 --- a/pom.xml +++ b/pom.xml @@ -409,12 +409,6 @@ - - spring.io - spring - https://repo.spring.io/milestone - -