Merge remote-tracking branch 'origin/master'
This commit is contained in:
commit
d42f9e6638
|
|
@ -1,5 +1,6 @@
|
|||
package org.jetlinks.community.rule.engine.executor;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import lombok.AllArgsConstructor;
|
||||
import org.jetlinks.core.codec.defaults.JsonCodec;
|
||||
import org.jetlinks.core.event.EventBus;
|
||||
|
|
@ -8,6 +9,7 @@ import org.jetlinks.reactor.ql.ReactorQL;
|
|||
import org.jetlinks.rule.engine.api.RuleConstants;
|
||||
import org.jetlinks.rule.engine.api.RuleData;
|
||||
import org.jetlinks.rule.engine.api.RuleDataHelper;
|
||||
import org.jetlinks.rule.engine.api.model.RuleNodeModel;
|
||||
import org.jetlinks.rule.engine.api.task.ExecutionContext;
|
||||
import org.jetlinks.rule.engine.api.task.TaskExecutor;
|
||||
import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
|
||||
|
|
@ -18,6 +20,7 @@ import reactor.core.Disposable;
|
|||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
|
|
@ -53,19 +56,20 @@ public class ReactorQLTaskExecutorProvider implements TaskExecutorProvider {
|
|||
|
||||
@Override
|
||||
protected Disposable doStart() {
|
||||
Flux<Map<String, Object>> dataStream;
|
||||
Flux<Object> dataStream;
|
||||
//有上游节点
|
||||
if (!CollectionUtils.isEmpty(context.getJob().getInputs())) {
|
||||
|
||||
dataStream = context.getInput()
|
||||
dataStream = context
|
||||
.getInput()
|
||||
.accept()
|
||||
.map(RuleDataHelper::toContextMap)
|
||||
.flatMap(v -> reactorQL.start(Flux.just(v))
|
||||
.flatMap(ruleData -> reactorQL
|
||||
.start(Flux.just(RuleDataHelper.toContextMap(ruleData)))
|
||||
.map(ruleData::newData)
|
||||
.onErrorResume(err -> {
|
||||
context.getLogger().error(err.getMessage(),err);
|
||||
context.getLogger().error(err.getMessage(), err);
|
||||
return context.onError(err, null).then(Mono.empty());
|
||||
}))
|
||||
;
|
||||
}));
|
||||
} else {
|
||||
dataStream = reactorQL
|
||||
.start(table -> {
|
||||
|
|
@ -75,26 +79,33 @@ public class ReactorQLTaskExecutorProvider implements TaskExecutorProvider {
|
|||
if (table.startsWith("/")) {
|
||||
//转换为消息
|
||||
return eventBus
|
||||
.subscribe(org.jetlinks.core.event.Subscription.of(
|
||||
"rule-engine:"
|
||||
.concat(context.getInstanceId())
|
||||
.concat(":")
|
||||
.concat(context.getJob().getNodeId()),
|
||||
table,
|
||||
Subscription.Feature.local
|
||||
)
|
||||
)
|
||||
.map(payload -> payload.bodyToJson(true));
|
||||
.subscribe(Subscription
|
||||
.builder()
|
||||
.subscriberId("rule-engine:"
|
||||
.concat(context.getInstanceId())
|
||||
.concat(":")
|
||||
.concat(context.getJob().getNodeId()))
|
||||
.local()
|
||||
.build())
|
||||
.flatMap(payload -> {
|
||||
try {
|
||||
return Mono.just(payload.bodyToJson(true));
|
||||
} catch (Throwable error) {
|
||||
return context.onError(error, null);
|
||||
}
|
||||
});
|
||||
}
|
||||
return Flux.just(1);
|
||||
});
|
||||
})
|
||||
.cast(Object.class);
|
||||
}
|
||||
|
||||
return dataStream
|
||||
.flatMap(result -> {
|
||||
RuleData data = context.newRuleData(result);
|
||||
//输出到下一节点
|
||||
return context.getOutput()
|
||||
return context
|
||||
.getOutput()
|
||||
.write(Mono.just(data))
|
||||
.then(context.fireEvent(RuleConstants.Event.result, data));
|
||||
})
|
||||
|
|
@ -103,12 +114,17 @@ public class ReactorQLTaskExecutorProvider implements TaskExecutorProvider {
|
|||
}
|
||||
|
||||
protected ReactorQL createQl() {
|
||||
ReactorQL.Builder builder = Optional.ofNullable(context.getJob().getConfiguration())
|
||||
.map(map -> map.get("sql"))
|
||||
.map(String::valueOf)
|
||||
.map(ReactorQL.builder()::sql)
|
||||
.orElseThrow(() -> new IllegalArgumentException("配置sql错误"));
|
||||
return builder.build();
|
||||
try {
|
||||
ReactorQL.Builder builder = Optional
|
||||
.ofNullable(context.getJob().getConfiguration())
|
||||
.map(map -> map.get("sql"))
|
||||
.map(String::valueOf)
|
||||
.map(ReactorQL.builder()::sql)
|
||||
.orElseThrow(() -> new IllegalArgumentException("配置sql错误"));
|
||||
return builder.build();
|
||||
} catch (Exception e) {
|
||||
throw new IllegalArgumentException("SQL格式错误:" + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
Loading…
Reference in New Issue