Merge remote-tracking branch 'origin/master'

This commit is contained in:
zhouhao 2020-07-03 17:32:34 +08:00
commit c2896f0a66
70 changed files with 3819 additions and 1470 deletions

View File

@ -2,6 +2,7 @@ root = true
[*]
charset = utf-8
end_of_line = lf
[*.java]
indent_style = space

View File

@ -19,7 +19,7 @@ jobs:
path: ~/.m2
key: jetlinks-community-maven-repository
- name: Build with Maven
run: mvn -B package && cd jetlinks-standalone && mvn docker:build
run: mvn -B package -Pbuild && cd jetlinks-standalone && mvn docker:build
- name: Login Docker Repo
run: echo "${{ secrets.ALIYUN_DOCKER_REPO_PWD }}" | docker login registry.cn-shenzhen.aliyuncs.com -u ${{ secrets.ALIYUN_DOCKER_REPO_USERNAME }} --password-stdin
- name: Push Docker

View File

@ -80,14 +80,14 @@ JetLinks 基于Java8,Spring Boot 2.x,WebFlux,Netty,Vert.x,Reactor等开发,
| Http,WebSocket(TLS) | ⭕ | ✅ | ✅ |
| 数据转发:MQTT,HTTP,Kafka... | ⭕ | ✅ | ✅ |
| Geo地理位置支持 | ⭕ | ✅ | ✅ |
| 可视化图表配置 | ⭕ | ✅ | ✅ |
| OpenAPI | ⭕ | ✅ | ✅ |
| 多租户(建设中) | ⭕ | ✅ | ✅ |
| 集群支持 | ⭕ | ✅ | ✅ |
| QQ群技术支持 | ⭕ | ✅ | ✅ |
| 一对一技术支持 | ⭕ | ⭕ | ✅ |
| 微服务架构(建设中) | ⭕ | ⭕ | ✅ |
| 多租户(建设中) | ⭕ | ⭕ | ✅ |
| 统一认证(建设中) | ⭕ | ⭕ | ✅ |
| 选配业务模块(建设中) | ⭕ | ⭕ | ✅ |
| 定制开发 | ⭕ | ⭕ | ✅ |
| 商业限制 | 无 | 单个项目 | 无 |
| 定价 | 免费 | 联系我们 | 联系我们 |

View File

@ -1,7 +1,7 @@
version: '2'
services:
ui:
image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-ui-antd:1.1.1-RELEASE
image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-ui-antd:1.2.0
container_name: jetlinks-ce-ui
ports:
- 9000:80
@ -12,7 +12,7 @@ services:
links:
- jetlinks:jetlinks
jetlinks:
image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-standalone:1.2-SNAPSHOT
image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-standalone:1.3.0-SNAPSHOT
container_name: jetlinks-ce
ports:
- 8848:8848 # API端口

View File

@ -48,7 +48,7 @@ services:
POSTGRES_DB: jetlinks
TZ: Asia/Shanghai
ui:
image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-ui-antd:1.1.1-RELEASE
image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-ui-antd:1.2.0
container_name: jetlinks-ce-ui
ports:
- 9000:80
@ -59,7 +59,7 @@ services:
links:
- jetlinks:jetlinks
jetlinks:
image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-standalone:1.2-SNAPSHOT
image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-standalone:1.3.0-SNAPSHOT
container_name: jetlinks-ce
ports:
- 8848:8848 # API端口

View File

@ -0,0 +1,13 @@
package org.jetlinks.community;
import lombok.Getter;
@Getter
public class Version {
public static Version current = new Version();
private final String edition = "community";
private final String version = "1.3.0-SNAPSHOT";
}

View File

@ -9,6 +9,7 @@ import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.elasticsearch.search.aggregations.metrics.min.Min;
import org.elasticsearch.search.aggregations.metrics.stats.Stats;
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCount;
import org.jetlinks.community.elastic.search.aggreation.metrics.MetricsResponseSingleValue;
import java.util.List;
@ -97,11 +98,22 @@ public class AggregationResponseHandle {
bucket.setSum(sum(a));
} else if (a instanceof Stats) {
stats(bucket, a);
} else if (a instanceof ValueCount) {
bucket.setValueCount(count(a));
} else {
throw new UnsupportedOperationException("不支持的聚合类型");
}
}
public static <A extends Aggregation> MetricsResponseSingleValue count(A a) {
ValueCount max = (ValueCount) a;
return MetricsResponseSingleValue.builder()
.value(max.getValue())
.name(a.getName())
.valueAsString(max.getValueAsString())
.build();
}
public static <A extends Aggregation> MetricsResponseSingleValue avg(A a) {
Avg avg = (Avg) a;
return MetricsResponseSingleValue.builder()

View File

@ -44,31 +44,27 @@ public class Bucket {
private List<Bucket> buckets;
private double toNumber(double number) {
return (Double.isInfinite(number) || Double.isNaN(number)) ? 0 : number;
}
public Map<String, Number> toMap() {
Map<String, Number> map = new HashMap<>();
if (this.sum != null) {
map.put(sum.getName(), sum.getValue());
map.put(sum.getName(), toNumber(sum.getValue()));
}
if (this.valueCount != null) {
map.put(valueCount.getName(), valueCount.getValue());
map.put(valueCount.getName(), toNumber(valueCount.getValue()));
}
if (this.avg != null) {
map.put(avg.getName(), avg.getValue());
map.put(avg.getName(), toNumber(avg.getValue()));
}
if (this.min != null) {
map.put(min.getName(), min.getValue());
map.put(min.getName(), toNumber(min.getValue()));
}
if (this.max != null) {
map.put(max.getName(), max.getValue());
map.put(max.getName(), toNumber(max.getValue()));
}
//
// if (this.getBuckets() != null) {
// bucketFlatMap(this.getBuckets(), map);
// }
return map;
}
// private void bucketFlatMap(List<Bucket> buckets, Map<String, Number> map) {
// buckets.forEach(bucket -> map.putAll(bucket.toMap()));
// }
}

View File

@ -52,8 +52,8 @@ public class ElasticSearchConfiguration {
return new ElasticRestClient(client, client);
}
@Bean
public RestHighLevelClient elasticsearchRestHighLevelClient(ElasticRestClient client) {
@Bean(destroyMethod = "close")
public RestHighLevelClient restHighLevelClient(ElasticRestClient client) {
return client.getWriteClient();
}

View File

@ -27,6 +27,7 @@ import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager;
import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter;
import org.jetlinks.community.elastic.search.utils.ReactorActionListener;
import org.reactivestreams.Publisher;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Service;
import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
@ -47,6 +48,7 @@ import java.util.stream.Collectors;
**/
@Service
@Slf4j
@DependsOn("restHighLevelClient")
public class DefaultElasticSearchService implements ElasticSearchService {
private final ElasticRestClient restClient;

View File

@ -1,25 +1,22 @@
package org.jetlinks.community.gateway;
import com.alibaba.fastjson.JSON;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.MessageType;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
public class DeviceMessageUtils {
@SuppressWarnings("all")
public static Optional<DeviceMessage> convert(TopicMessage message){
if (message.getMessage() instanceof EncodableMessage) {
Object nativeMessage = ((EncodableMessage) message.getMessage()).getNativePayload();
if (nativeMessage instanceof DeviceMessage) {
return Optional.of((DeviceMessage)nativeMessage);
} else if (nativeMessage instanceof Map) {
return MessageType.convertMessage(((Map<String, Object>) nativeMessage));
}
Object nativeMessage = message.convertMessage();
if (nativeMessage instanceof DeviceMessage) {
return Optional.of((DeviceMessage)nativeMessage);
} else if (nativeMessage instanceof Map) {
return MessageType.convertMessage(((Map<String, Object>) nativeMessage));
}
return MessageType.convertMessage(JSON.parseObject(message.getMessage().getPayload().toString(StandardCharsets.UTF_8)));
return Optional.empty();
}
}

View File

@ -1,9 +1,11 @@
package org.jetlinks.community.gateway;
import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBufUtil;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.rule.engine.executor.PayloadType;
import javax.annotation.Nonnull;
import java.nio.charset.StandardCharsets;
public interface TopicMessage {
@ -31,11 +33,17 @@ public interface TopicMessage {
if (getMessage() instanceof EncodableMessage) {
return ((EncodableMessage) getMessage()).getNativePayload();
}
if (getMessage().getPayloadType() == null) {
return getMessage().getBytes();
byte[] payload = getMessage().payloadAsBytes();
//maybe json
if (/* { }*/(payload[0] == 123 && payload[payload.length - 1] == 125)
|| /* [ ] */(payload[0] == 91 && payload[payload.length - 1] == 93)
) {
return JSON.parseObject(new String(payload));
}
return PayloadType.valueOf(getMessage().getPayloadType().name()).read(getMessage().getPayload());
if (ByteBufUtil.isText(getMessage().getPayload(), StandardCharsets.UTF_8)) {
return getMessage().payloadAsString();
}
return payload;
}
static TopicMessage of(String topic, EncodedMessage message) {

View File

@ -1,19 +1,21 @@
package org.jetlinks.community.gateway.rule;
import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.exception.NotFoundException;
import org.jetlinks.community.gateway.MessageGatewayManager;
import org.jetlinks.community.network.PubSubType;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.executor.ExecutionContext;
import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
import org.jetlinks.rule.engine.api.task.TaskExecutor;
import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
import org.jetlinks.rule.engine.defaults.FunctionTaskExecutor;
import org.reactivestreams.Publisher;
import org.springframework.stereotype.Component;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import java.util.function.Function;
@Component
public class MessageGatewayRuleNode extends CommonExecutableRuleNodeFactoryStrategy<MessageGatewayRuleNodeConfig> {
public class MessageGatewayRuleNode implements TaskExecutorProvider {
private final MessageGatewayManager gatewayManager;
@ -26,43 +28,52 @@ public class MessageGatewayRuleNode extends CommonExecutableRuleNodeFactoryStrat
}
@Override
public Function<RuleData, ? extends Publisher<?>> createExecutor(ExecutionContext context, MessageGatewayRuleNodeConfig config) {
if (config.getType() == PubSubType.consumer) {
return Mono::just;
}
return ruleData -> gatewayManager
.getGateway(config.getGatewayId())
.switchIfEmpty(Mono.error(() -> new NotFoundException("消息网关[{" + config.getGatewayId() + "}]不存在")))
.flatMap(gateway -> config.convert(ruleData)
.flatMap(msg -> gateway.publish(msg, config.isShareCluster()))
.then())
.thenReturn(ruleData);
}
@Override
protected void onStarted(ExecutionContext context, MessageGatewayRuleNodeConfig config) {
super.onStarted(context, config);
if (config.getType() == PubSubType.producer) {
return;
}
//订阅网关中的消息
context.onStop(gatewayManager
.getGateway(config.getGatewayId())
.switchIfEmpty(Mono.fromRunnable(() -> context.logger().error("消息网关[{" + config.getGatewayId() + "}]不存在")))
.flatMapMany(gateway -> gateway.subscribe(config.createTopics()))
.map(config::convert)
.flatMap(data -> context.getOutput().write(Mono.just(RuleData.create(data))))
.onErrorContinue((err, obj) -> {
context.logger().error(err.getMessage(), err);
})
.subscribe()::dispose);
}
@Override
public String getSupportType() {
public String getExecutor() {
return "message-gateway";
}
@Override
public Mono<TaskExecutor> createTask(ExecutionContext context) {
return Mono.just(new MessageGatewayPubSubExecutor(context));
}
class MessageGatewayPubSubExecutor extends FunctionTaskExecutor {
MessageGatewayRuleNodeConfig config;
public MessageGatewayPubSubExecutor(ExecutionContext context) {
super("消息网关订阅发布", context);
this.config = FastBeanCopier.copy(context.getJob().getConfiguration(), MessageGatewayRuleNodeConfig.class);
this.config.validate();
}
@Override
protected Publisher<RuleData> apply(RuleData input) {
return gatewayManager
.getGateway(config.getGatewayId())
.switchIfEmpty(Mono.error(() -> new NotFoundException("消息网关[{" + config.getGatewayId() + "}]不存在")))
.flatMap(gateway -> config.convert(input)
.flatMap(msg -> gateway.publish(msg, config.isShareCluster()))
.then())
.thenReturn(input);
}
@Override
protected Disposable doStart() {
if (config.getType() == PubSubType.producer) {
return super.doStart();
}
//订阅网关中的消息
return gatewayManager
.getGateway(config.getGatewayId())
.switchIfEmpty(Mono.fromRunnable(() -> context.getLogger().error("消息网关[{" + config.getGatewayId() + "}]不存在")))
.flatMapMany(gateway -> gateway.subscribe(config.createTopics()))
.map(config::convert)
.flatMap(data -> context.getOutput().write(Mono.just(RuleData.create(data))))
.onErrorContinue((err, obj) -> {
context.getLogger().error(err.getMessage(), err);
})
.subscribe();
}
}
}

View File

@ -5,14 +5,12 @@ import lombok.Setter;
import org.jetlinks.community.gateway.TopicMessage;
import org.jetlinks.community.network.PubSubType;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.model.NodeType;
import org.jetlinks.rule.engine.executor.node.RuleNodeConfig;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
@Getter
@Setter
public class MessageGatewayRuleNodeConfig implements RuleNodeConfig {
public class MessageGatewayRuleNodeConfig {
private String gatewayId;
@ -35,7 +33,6 @@ public class MessageGatewayRuleNodeConfig implements RuleNodeConfig {
return topics.split("[,;\n]");
}
@Override
public void validate() {
Assert.hasText(gatewayId, "gatewayId can not be empty");
Assert.hasText(topics, "topics can not be empty");
@ -43,14 +40,4 @@ public class MessageGatewayRuleNodeConfig implements RuleNodeConfig {
}
@Override
public NodeType getNodeType() {
return NodeType.MAP;
}
@Override
public void setNodeType(NodeType nodeType) {
}
}

View File

@ -15,8 +15,8 @@ import java.util.StringJoiner;
import java.util.function.BiFunction;
class ProxyMessageListener implements MessageListener {
private Class<?> paramType;
private Object target;
private final Class<?> paramType;
private final Object target;
BiFunction<Object, Object, Object> proxy;
@ -72,11 +72,14 @@ class ProxyMessageListener implements MessageListener {
return message.getMessage().getPayload();
}
if (message.getMessage() instanceof EncodableMessage) {
Object payload = ((EncodableMessage) message.getMessage()).getNativePayload();
return FastBeanCopier.DEFAULT_CONVERT.convert(payload, paramType, new Class[]{});
Object payload = message.convertMessage();
if (paramType.isInstance(payload)) {
return payload;
}
return message;
if (payload instanceof byte[]) {
return payload;
}
return FastBeanCopier.DEFAULT_CONVERT.convert(payload, paramType, new Class[]{});
}

View File

@ -0,0 +1,39 @@
package org.jetlinks.community.network.mqtt.executor;
import lombok.Getter;
import lombok.Setter;
import org.hswebframework.web.utils.ExpressionUtils;
import org.jetlinks.community.network.PubSubType;
import org.jetlinks.rule.engine.executor.PayloadType;
import org.springframework.util.Assert;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Getter
@Setter
public class MqttClientTaskConfiguration {
private String clientId;
private PayloadType payloadType = PayloadType.JSON;
private PubSubType[] clientType;
private List<String> topics;
private List<String> topicVariables;
public List<String> getTopics(Map<String, Object> vars) {
return topics.stream()
.map(topic -> ExpressionUtils.analytical(topic, vars, "spel")).collect(Collectors.toList());
}
public void validate() {
Assert.hasText(clientId, "clientId can not be empty");
Assert.notNull(clientType, "clientType can not be null");
Assert.notEmpty(topics, "topics can not be empty");
}
}

View File

@ -0,0 +1,131 @@
package org.jetlinks.community.network.mqtt.executor;
import lombok.AllArgsConstructor;
import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.dict.EnumDict;
import org.jetlinks.core.message.codec.MqttMessage;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkManager;
import org.jetlinks.community.network.PubSubType;
import org.jetlinks.community.network.mqtt.client.MqttClient;
import org.jetlinks.rule.engine.api.RuleConstants;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.RuleDataCodecs;
import org.jetlinks.rule.engine.api.RuleDataHelper;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
import org.jetlinks.rule.engine.api.task.Task;
import org.jetlinks.rule.engine.api.task.TaskExecutor;
import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
import org.jetlinks.rule.engine.defaults.AbstractTaskExecutor;
import org.springframework.stereotype.Component;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@AllArgsConstructor
@Component
public class MqttClientTaskExecutorProvider implements TaskExecutorProvider {
private final NetworkManager networkManager;
static {
MqttRuleDataCodec.load();
}
@Override
public String getExecutor() {
return "mqtt-client";
}
protected Flux<MqttMessage> convertMessage(RuleData message, MqttClientTaskConfiguration config) {
return RuleDataCodecs.getCodec(MqttMessage.class)
.map(codec ->
codec.decode(message,
config.getPayloadType(),
new MqttTopics(config.getTopics(RuleDataHelper.toContextMap(message))))
.cast(MqttMessage.class))
.orElseThrow(() -> new UnsupportedOperationException("unsupported decode message:{}" + message));
}
protected Mono<RuleData> convertMessage(MqttMessage message, MqttClientTaskConfiguration config) {
return Mono.just(RuleDataCodecs.getCodec(MqttMessage.class)
.map(codec -> codec.encode(message, config.getPayloadType(), new TopicVariables(config.getTopicVariables())))
.map(RuleData::create)
.orElseGet(() -> RuleData.create(message)));
}
@Override
public Mono<TaskExecutor> createTask(ExecutionContext context) {
return Mono.just(new MqttClientTaskExecutor(context));
}
class MqttClientTaskExecutor extends AbstractTaskExecutor {
private MqttClientTaskConfiguration config;
public MqttClientTaskExecutor(ExecutionContext context) {
super(context);
reload();
}
@Override
public String getName() {
return "MQTT Client";
}
@Override
public void reload() {
config = FastBeanCopier.copy(context.getJob().getConfiguration(), new MqttClientTaskConfiguration());
config.validate();
if (disposable != null) {
disposable.dispose();
}
}
@Override
public void validate() {
FastBeanCopier
.copy(context.getJob().getConfiguration(), new MqttClientTaskConfiguration())
.validate();
}
@Override
protected Disposable doStart() {
Disposable.Composite disposable = Disposables.composite();
if (EnumDict.in(PubSubType.producer, config.getClientType())) {
disposable.add(context.getInput()
.accept()
.filter((data) -> state == Task.State.running)
.flatMap(data ->
networkManager
.<MqttClient>getNetwork(DefaultNetworkType.MQTT_CLIENT, config.getClientId())
.flatMapMany(client -> convertMessage(data, config)
.flatMap(msg -> client
.publish(msg)
.doOnSuccess((v) -> context.getLogger().debug("推送MQTT[{}]消息:{}", client.getId(), msg))
)
).onErrorContinue((err, e) -> context.onError(err, null).subscribe())
)
.subscribe()
);
}
if (EnumDict.in(PubSubType.consumer, config.getClientType())) {
disposable.add(networkManager
.<MqttClient>getNetwork(DefaultNetworkType.MQTT_CLIENT, config.getClientId())
.flatMapMany(client -> client.subscribe(config.getTopics()))
.filter((data) -> state == Task.State.running)
.doOnNext(message -> context.getLogger().info("consume mqtt message:{}", message))
.flatMap(message -> convertMessage(message, config))
.flatMap(ruleData -> context.getOutput().write(Mono.just(ruleData)).thenReturn(ruleData))
.flatMap(ruleData -> context.fireEvent(RuleConstants.Event.result, ruleData).thenReturn(ruleData))
.onErrorContinue((err, e) -> context.onError(err, null).subscribe())
.subscribe());
}
return disposable;
}
}
}

View File

@ -0,0 +1,111 @@
package org.jetlinks.community.network.mqtt.executor;
import io.netty.buffer.ByteBuf;
import org.apache.commons.collections4.CollectionUtils;
import org.jetlinks.core.message.codec.MessagePayloadType;
import org.jetlinks.core.message.codec.MqttMessage;
import org.jetlinks.core.message.codec.SimpleMqttMessage;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.RuleDataCodec;
import org.jetlinks.rule.engine.api.RuleDataCodecs;
import org.jetlinks.rule.engine.executor.PayloadType;
import org.jetlinks.supports.utils.MqttTopicUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
public class MqttRuleDataCodec implements RuleDataCodec<MqttMessage> {
static {
MqttRuleDataCodec codec = new MqttRuleDataCodec();
// EncodedMessageCodec.register(DefaultTransport.MQTT, codec);
// EncodedMessageCodec.register(DefaultTransport.MQTT_TLS, codec);
RuleDataCodecs.register(MqttMessage.class, codec);
}
static void load() {
}
@Override
public Object encode(MqttMessage message, Feature... features) {
Map<String, Object> payload = new HashMap<>();
payload.put("topic", message.getTopic());
payload.put("will", message.isWill());
payload.put("qos", message.getQosLevel());
payload.put("dup", message.isDup());
payload.put("retain", message.isRetain());
PayloadType payloadType = Feature.find(PayloadType.class, features).orElse(PayloadType.JSON);
Feature.find(TopicVariables.class, features)
.map(TopicVariables::getVariables)
.filter(CollectionUtils::isNotEmpty)
.flatMap(list -> list.stream()
.map(str -> MqttTopicUtils.getPathVariables(str, message.getTopic()))
.reduce((m1, m2) -> {
m1.putAll(m2);
return m1;
}))
.ifPresent(vars -> payload.put("vars", vars));
payload.put("payloadType", payloadType.name());
payload.put("payload", payloadType.read(message.getPayload()));
payload.put("clientId", message.getClientId());
return payload;
}
@Override
public Flux<MqttMessage> decode(RuleData data, Feature... features) {
if (data.getData() instanceof MqttMessage) {
return Flux.just(((MqttMessage) data.getData()));
}
MqttTopics topics = Feature.find(MqttTopics.class, features).orElse(null);
return data
.dataToMap()
.filter(map -> map.containsKey("payload"))
.flatMap(map -> {
if (topics != null && !map.containsKey("topic")) {
return Flux.fromIterable(topics.getTopics())
.flatMap(topic -> {
Map<String, Object> copy = new HashMap<>();
copy.put("topic", topic);
copy.putAll(map);
return Mono.just(copy);
})
.switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("topic not set")));
}
return Flux.just(map);
})
.map(map -> {
PayloadType payloadType = Feature.find(PayloadType.class, features)
.orElseGet(() -> Optional.ofNullable(map.get("payloadType"))
.map(String::valueOf)
.map(PayloadType::valueOf)
.orElse(PayloadType.JSON));
Object payload = map.get("payload");
ByteBuf byteBuf = payloadType.write(payload);
Integer qos = (Integer) map.get("qos");
return SimpleMqttMessage
.builder()
.clientId((String) map.get("clientId"))
.topic((String) map.get("topic"))
.dup(Boolean.TRUE.equals(map.get("dup")))
.will(Boolean.TRUE.equals(map.get("will")))
.retain(Boolean.TRUE.equals(map.get("retain")))
.qosLevel(qos == null ? 0 : qos)
.payloadType(MessagePayloadType.valueOf(payloadType.name()))
.payload(byteBuf)
.build();
});
}
}

View File

@ -0,0 +1,24 @@
package org.jetlinks.community.network.mqtt.executor;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.jetlinks.rule.engine.api.RuleDataCodec;
import java.util.List;
@Getter
@AllArgsConstructor
public class MqttTopics implements RuleDataCodec.Feature {
private List<String> topics;
@Override
public String getId() {
return "mqtt-topic";
}
@Override
public String getName() {
return "MQTT Topics";
}
}

View File

@ -0,0 +1,13 @@
package org.jetlinks.community.network.mqtt.executor;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.jetlinks.rule.engine.api.RuleDataCodec;
import java.util.List;
@Getter
@AllArgsConstructor
public class TopicVariables implements RuleDataCodec.Feature {
List<String> variables;
}

View File

@ -1,90 +0,0 @@
package org.jetlinks.community.network.mqtt.node;
import lombok.AllArgsConstructor;
import org.hswebframework.web.dict.EnumDict;
import org.jetlinks.community.network.mqtt.client.MqttClient;
import org.jetlinks.core.message.codec.MqttMessage;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkManager;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.RuleDataCodecs;
import org.jetlinks.rule.engine.api.RuleDataHelper;
import org.jetlinks.rule.engine.api.events.RuleEvent;
import org.jetlinks.rule.engine.api.executor.ExecutionContext;
import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy;
import org.jetlinks.rule.engine.executor.node.mqtt.*;
import org.reactivestreams.Publisher;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.function.Function;
@AllArgsConstructor
@Component
public class MqttClientNode extends CommonExecutableRuleNodeFactoryStrategy<MqttClientConfiguration> {
private NetworkManager networkManager;
static {
try {
Class.forName("org.jetlinks.rule.engine.executor.node.mqtt.MqttRuleDataCodec");
} catch (ClassNotFoundException ignore) {
}
}
@Override
public Function<RuleData, Publisher<Object>> createExecutor(ExecutionContext context, MqttClientConfiguration config) {
if (!EnumDict.in(ClientType.producer, config.getClientType())) {
return Mono::just;
}
return ruleData -> networkManager
.<org.jetlinks.community.network.mqtt.client.MqttClient>getNetwork(DefaultNetworkType.MQTT_CLIENT, config.getClientId())
.flatMapMany(client -> this.convertMessage(ruleData, config).flatMap(client::publish))
.then(Mono.just(ruleData))
;
}
protected Flux<MqttMessage> convertMessage(RuleData message, MqttClientConfiguration config) {
return RuleDataCodecs.getCodec(MqttMessage.class)
.map(codec ->
codec.decode(message,
config.getPayloadType(),
new MqttTopics(config.getTopics(RuleDataHelper.toContextMap(message))))
.cast(MqttMessage.class))
.orElseThrow(() -> new UnsupportedOperationException("unsupported decode message:{}" + message));
}
protected Mono<RuleData> convertMessage(MqttMessage message, MqttClientConfiguration config) {
return Mono.just(RuleDataCodecs.getCodec(MqttMessage.class)
.map(codec -> codec.encode(message, config.getPayloadType(), new TopicVariables(config.getTopicVariables())))
.map(RuleData::create)
.orElseGet(() -> RuleData.create(message)));
}
@Override
protected void onStarted(ExecutionContext context, MqttClientConfiguration config) {
if (!EnumDict.in(ClientType.consumer, config.getClientType())) {
return;
}
context.onStop(networkManager
.<MqttClient>getNetwork(DefaultNetworkType.MQTT_CLIENT, config.getClientId())
.flatMapMany(client -> client.subscribe(config.getTopics()))
.doOnNext(message -> context.logger().info("consume mqtt message:{}", message))
.flatMap(message -> convertMessage(message, config))
.flatMap(ruleData -> context.getOutput().write(Mono.just(ruleData)).thenReturn(ruleData))
.flatMap(ruleData -> context.fireEvent(RuleEvent.NODE_EXECUTE_RESULT, ruleData).thenReturn(ruleData))
.onErrorContinue((err, e) -> context.onError(RuleData.create("consume mqtt message error"), err).subscribe())
.subscribe()::dispose);
}
@Override
public String getSupportType() {
return "mqtt-client";
}
}

View File

@ -1,7 +1,21 @@
package org.jetlinks.community.network;
public enum PubSubType {
import lombok.Getter;
import org.hswebframework.web.dict.EnumDict;
@Getter
public enum PubSubType implements EnumDict<String> {
producer,
consumer;
@Override
public String getValue() {
return name();
}
@Override
public String getText() {
return name();
}
}

View File

@ -30,14 +30,14 @@ public class VertxTcpClient extends AbstractTcpClient {
volatile PayloadParser payloadParser;
@Getter
private String id;
private final String id;
@Setter
private long keepAliveTimeoutMs = Duration.ofMinutes(10).toMillis();
private volatile long lastKeepAliveTime = System.currentTimeMillis();
private List<Runnable> disconnectListener = new CopyOnWriteArrayList<>();
private final List<Runnable> disconnectListener = new CopyOnWriteArrayList<>();
@Override
public void keepAlive() {

View File

@ -44,23 +44,31 @@ import java.util.function.Function;
class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGateway {
@Getter
private String id;
private final String id;
private TcpServer tcpServer;
private final TcpServer tcpServer;
private String protocol;
private final String protocol;
private ProtocolSupports supports;
private final ProtocolSupports supports;
private DeviceRegistry registry;
private final DeviceRegistry registry;
private DecodedClientMessageHandler clientMessageHandler;
private final DecodedClientMessageHandler clientMessageHandler;
private DeviceSessionManager sessionManager;
private final DeviceSessionManager sessionManager;
private DeviceGatewayMonitor gatewayMonitor;
private final DeviceGatewayMonitor gatewayMonitor;
private LongAdder counter = new LongAdder();
private final LongAdder counter = new LongAdder();
private final EmitterProcessor<Message> processor = EmitterProcessor.create(false);
private final FluxSink<Message> sink = processor.sink();
private final AtomicBoolean started = new AtomicBoolean();
private final List<Disposable> disposable = new CopyOnWriteArrayList<>();
public TcpServerDeviceGateway(String id,
String protocol,
@ -80,12 +88,6 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
}
private EmitterProcessor<Message> processor = EmitterProcessor.create(false);
private FluxSink<Message> sink = processor.sink();
private AtomicBoolean started = new AtomicBoolean();
public Mono<ProtocolSupport> getProtocol() {
return supports.getProtocol(protocol);
}
@ -105,8 +107,6 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
return DefaultNetworkType.TCP_SERVER;
}
private List<Disposable> disposable = new CopyOnWriteArrayList<>();
private void doStart() {
if (started.getAndSet(true) || !disposable.isEmpty()) {
return;
@ -127,7 +127,7 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
AtomicReference<DeviceSession> sessionRef = new AtomicReference<>(sessionManager.getSession(client.getId()));
client.subscribe()
.filter(r -> started.get())
.takeWhile(r -> disposable != null)
.takeWhile(r -> !disposable.isEmpty())
.doOnNext(r -> {
log.debug("收到TCP报文:\n{}", r);
gatewayMonitor.receivedMessage();
@ -165,11 +165,6 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
return getSession().getOperator();
}
}))
.switchIfEmpty(Mono.fromRunnable(() ->
log.warn("无法识别的TCP客户端[{}]消息:\n{}",
clientAddr,
tcpMessage
)))
.cast(DeviceMessage.class)
.flatMap(message -> registry
.getDevice(message.getDeviceId())
@ -182,20 +177,20 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
);
}))
.flatMap(device -> {
DeviceSession fSession = sessionRef.get() == null ?
sessionManager.getSession(device.getDeviceId()) :
sessionRef.get();
DeviceSession fSession = sessionManager.getSession(device.getDeviceId());
//处理设备上线消息
if (message instanceof DeviceOnlineMessage) {
if (fSession == null) {
fSession = new TcpDeviceSession(client.getId(), device, client, getTransport()) {
boolean keepOnline = message.getHeader(Headers.keepOnline).orElse(false);
String sessionId = device.getDeviceId();
fSession = new TcpDeviceSession(sessionId, device, client, getTransport()) {
@Override
public Mono<Boolean> send(EncodedMessage encodedMessage) {
return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage());
}
};
//保持设备一直在线.通过短连接上报数据的场景.可以让设备一直为在线状态
if (message.getHeader(Headers.keepOnline).orElse(false)) {
//保持设备一直在线.短连接上报数据的场景.可以让设备一直为在线状态
if (keepOnline) {
fSession = new KeepOnlineSession(fSession, Duration.ofMillis(-1));
} else {
client.onDisconnect(() -> sessionManager.unregister(device.getDeviceId()));

View File

@ -0,0 +1,25 @@
package org.jetlinks.community.network.tcp.executor;
import lombok.Getter;
import lombok.Setter;
import org.jetlinks.community.network.PubSubType;
import org.jetlinks.rule.engine.executor.PayloadType;
import org.springframework.util.Assert;
@Getter
@Setter
public class TcpClientTaskConfiguration {
private String clientId;
private PubSubType type;
private PayloadType payloadType;
public void validate() {
Assert.hasText(clientId, "clientId can not be empty!");
Assert.notNull(type, "type can not be null!");
Assert.notNull(payloadType, "type can not be null!");
}
}

View File

@ -0,0 +1,109 @@
package org.jetlinks.community.network.tcp.executor;
import lombok.AllArgsConstructor;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkManager;
import org.jetlinks.community.network.PubSubType;
import org.jetlinks.community.network.tcp.TcpMessage;
import org.jetlinks.community.network.tcp.client.TcpClient;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.RuleDataCodecs;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
import org.jetlinks.rule.engine.api.task.TaskExecutor;
import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
import org.jetlinks.rule.engine.defaults.AbstractTaskExecutor;
import org.springframework.stereotype.Component;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@AllArgsConstructor
@Component
public class TcpClientTaskExecutorProvider implements TaskExecutorProvider {
private final NetworkManager clientManager;
static {
TcpMessageCodec.register();
}
@Override
public String getExecutor() {
return "tcp-client";
}
@Override
public Mono<TaskExecutor> createTask(ExecutionContext context) {
return Mono.just(new TcpTaskExecutor(context));
}
class TcpTaskExecutor extends AbstractTaskExecutor {
private TcpClientTaskConfiguration config;
public TcpTaskExecutor(ExecutionContext context) {
super(context);
reload();
}
@Override
public String getName() {
return "Tcp Client";
}
@Override
public void reload() {
config = FastBeanCopier.copy(context.getJob().getConfiguration(), new TcpClientTaskConfiguration());
config.validate();
}
@Override
public void validate() {
FastBeanCopier
.copy(context.getJob().getConfiguration(), new TcpClientTaskConfiguration())
.validate();
}
@Override
protected Disposable doStart() {
Disposable.Composite disposable = Disposables.composite();
if (config.getType() == PubSubType.producer) {
disposable.add(context
.getInput()
.accept()
.flatMap(data ->
clientManager.<TcpClient>getNetwork(DefaultNetworkType.TCP_CLIENT, config.getClientId())
.flatMapMany(client -> RuleDataCodecs
.getCodec(TcpMessage.class)
.map(codec -> codec.decode(data, config.getPayloadType())
.cast(TcpMessage.class)
.switchIfEmpty(Mono.fromRunnable(() -> context.getLogger().warn("can not decode rule data to tcp message:{}", data))))
.orElseGet(() -> Flux.just(new TcpMessage(config.getPayloadType().write(data.getData()))))
.flatMap(client::send)
.onErrorContinue((err, r) -> {
context.onError(err, data).subscribe();
})
.then()
)).subscribe()
)
;
}
if (config.getType() == PubSubType.consumer) {
disposable.add(clientManager.<TcpClient>getNetwork(DefaultNetworkType.TCP_CLIENT, config.getClientId())
.switchIfEmpty(Mono.fromRunnable(() -> context.getLogger().error("tcp client {} not found", config.getClientId())))
.flatMapMany(TcpClient::subscribe)
.doOnNext(msg -> context.getLogger().info("received tcp client message:{}", config.getPayloadType().read(msg.getPayload())))
.map(r -> RuleDataCodecs.getCodec(TcpMessage.class)
.map(codec -> codec.encode(r, config.getPayloadType()))
.orElse(r.getPayload()))
.flatMap(out -> context.getOutput().write(Mono.just(RuleData.create(out))))
.onErrorContinue((err, obj) -> context.getLogger().error("consume tcp message error", err))
.subscribe());
}
return disposable;
}
}
}

View File

@ -1,12 +1,10 @@
package org.jetlinks.community.network.tcp.node;
package org.jetlinks.community.network.tcp.executor;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.community.network.tcp.TcpMessage;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.RuleDataCodec;
import org.jetlinks.rule.engine.api.RuleDataCodecs;
import org.jetlinks.rule.engine.executor.PayloadType;
import org.jetlinks.rule.engine.executor.node.device.EncodedMessageCodec;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -19,9 +17,6 @@ public class TcpMessageCodec implements RuleDataCodec<TcpMessage> {
static {
RuleDataCodecs.register(TcpMessage.class, instance);
EncodedMessageCodec.register(DefaultTransport.TCP, instance);
EncodedMessageCodec.register(DefaultTransport.TCP_TLS, instance);
}
static void register() {

View File

@ -1,70 +0,0 @@
package org.jetlinks.community.network.tcp.node;
import lombok.AllArgsConstructor;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkManager;
import org.jetlinks.community.network.PubSubType;
import org.jetlinks.community.network.tcp.TcpMessage;
import org.jetlinks.community.network.tcp.client.TcpClient;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.RuleDataCodecs;
import org.jetlinks.rule.engine.api.executor.ExecutionContext;
import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy;
import org.reactivestreams.Publisher;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.function.Function;
@AllArgsConstructor
@Component
public class TcpClientNode extends CommonExecutableRuleNodeFactoryStrategy<TcpClientNodeConfig> {
private NetworkManager clientManager;
static {
TcpMessageCodec.register();
}
@Override
public Function<RuleData, ? extends Publisher<?>> createExecutor(ExecutionContext context, TcpClientNodeConfig config) {
if (config.getType() != PubSubType.producer) {
return Mono::just;
}
return data -> clientManager.<TcpClient>getNetwork(DefaultNetworkType.TCP_CLIENT,config.getClientId())
.flatMapMany(client -> RuleDataCodecs
.getCodec(TcpMessage.class)
.map(codec -> codec.decode(data, config.getPayloadType())
.cast(TcpMessage.class)
.switchIfEmpty(Mono.fromRunnable(() -> context.logger().warn("can not decode rule data to tcp message:{}", data))))
.orElseGet(() -> Flux.just(new TcpMessage(config.getPayloadType().write(data.getData()))))
.flatMap(client::send)
.all(r-> r))
;
}
@Override
protected void onStarted(ExecutionContext context, TcpClientNodeConfig config) {
super.onStarted(context, config);
if (config.getType() == PubSubType.consumer) {
context.onStop( clientManager.<TcpClient>getNetwork(DefaultNetworkType.TCP_CLIENT,config.getClientId())
.switchIfEmpty(Mono.fromRunnable(() -> context.logger().error("tcp client {} not found", config.getClientId())))
.flatMapMany(TcpClient::subscribe)
.doOnNext(msg -> context.logger().info("received tcp client message:{}", config.getPayloadType().read(msg.getPayload())))
.map(r -> RuleDataCodecs.getCodec(TcpMessage.class)
.map(codec -> codec.encode(r, config.getPayloadType()))
.orElse(r.getPayload()))
.onErrorContinue((err, obj) -> {
context.logger().error("consume tcp message error", err);
})
.subscribe(msg -> context.getOutput().write(Mono.just(RuleData.create(msg))).subscribe())::dispose);
}
}
@Override
public String getSupportType() {
return "tcp-client";
}
}

View File

@ -1,38 +0,0 @@
package org.jetlinks.community.network.tcp.node;
import lombok.Getter;
import lombok.Setter;
import org.jetlinks.community.network.PubSubType;
import org.jetlinks.rule.engine.api.model.NodeType;
import org.jetlinks.rule.engine.executor.PayloadType;
import org.jetlinks.rule.engine.executor.node.RuleNodeConfig;
import org.springframework.util.Assert;
@Getter
@Setter
public class TcpClientNodeConfig implements RuleNodeConfig {
private String clientId;
private PubSubType type;
private PayloadType payloadType;
@Override
public NodeType getNodeType() {
return NodeType.MAP;
}
@Override
public void setNodeType(NodeType nodeType) {
}
@Override
public void validate() {
Assert.hasText(clientId, "clientId can not be empty!");
Assert.notNull(type, "type can not be null!");
Assert.notNull(payloadType, "payloadType can not be null!");
}
}

View File

@ -1,8 +1,6 @@
package org.jetlinks.community.notify.event;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.*;
import org.jetlinks.community.notify.template.Template;
import javax.annotation.Nonnull;
@ -12,6 +10,8 @@ import java.util.Map;
@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SerializableNotifierEvent {
private boolean success;

View File

@ -1,48 +0,0 @@
package org.jetlinks.community.notify.rule;
import lombok.AllArgsConstructor;
import org.jetlinks.core.Values;
import org.jetlinks.community.notify.NotifierManager;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.RuleDataHelper;
import org.jetlinks.rule.engine.api.executor.ExecutionContext;
import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy;
import org.reactivestreams.Publisher;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import java.util.function.Function;
@Component
@AllArgsConstructor
public class NotifierRuleNode extends CommonExecutableRuleNodeFactoryStrategy<RuleNotifierProperties> {
private NotifierManager notifierManager;
@Override
public String getSupportType() {
return "notifier";
}
@Override
public Function<RuleData, ? extends Publisher<?>> createExecutor(ExecutionContext context, RuleNotifierProperties config) {
return rule -> notifierManager
.getNotifier(config.getNotifyType(), config.getNotifierId())
.switchIfEmpty(Mono.fromRunnable(() -> {
context.logger().warn("通知器[{}-{}]不存在", config.getNodeType(), config.getNotifierId());
}))
.flatMap(notifier -> notifier.send(config.getTemplateId(), Values.of(RuleDataHelper.toContextMap(rule))))
.doOnError(err -> {
context.logger().error("发送[{}]通知[{}-{}]失败",
config.getNotifyType().getName(),
config.getNotifierId(),
config.getTemplateId(), err);
})
.doOnSuccess(ignore -> {
context.logger().info("发送[{}]通知[{}-{}]完成",
config.getNotifyType().getName(),
config.getNotifierId(),
config.getTemplateId());
});
}
}

View File

@ -0,0 +1,66 @@
package org.jetlinks.community.notify.rule;
import lombok.AllArgsConstructor;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.community.notify.NotifierManager;
import org.jetlinks.core.Values;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.RuleDataHelper;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
import org.jetlinks.rule.engine.api.task.TaskExecutor;
import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
import org.jetlinks.rule.engine.defaults.FunctionTaskExecutor;
import org.reactivestreams.Publisher;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import java.util.function.Function;
@Component
@AllArgsConstructor
public class NotifierTaskExecutorProvider implements TaskExecutorProvider {
private final NotifierManager notifierManager;
@Override
public String getExecutor() {
return "notifier";
}
@Override
public Mono<TaskExecutor> createTask(ExecutionContext context) {
RuleNotifierProperties properties = FastBeanCopier.copy(context.getJob().getConfiguration(), RuleNotifierProperties.class);
properties.validate();
Function<RuleData, Publisher<RuleData>> executor = createExecutor(context, properties);
return Mono.just(new FunctionTaskExecutor("消息通知", context) {
@Override
protected Publisher<RuleData> apply(RuleData input) {
return executor.apply(input);
}
});
}
public Function<RuleData, Publisher<RuleData>> createExecutor(ExecutionContext context, RuleNotifierProperties config) {
return rule -> notifierManager
.getNotifier(config.getNotifyType(), config.getNotifierId())
.switchIfEmpty(Mono.fromRunnable(() -> {
context.getLogger().warn("通知器[{}-{}]不存在", config.getNotifyType(), config.getNotifierId());
}))
.flatMap(notifier -> notifier.send(config.getTemplateId(), Values.of(RuleDataHelper.toContextMap(rule))))
.doOnError(err -> {
context.getLogger().error("发送[{}]通知[{}-{}]失败",
config.getNotifyType().getName(),
config.getNotifierId(),
config.getTemplateId(), err);
})
.doOnSuccess(ignore -> {
context.getLogger().info("发送[{}]通知[{}-{}]完成",
config.getNotifyType().getName(),
config.getNotifierId(),
config.getTemplateId());
}).then(Mono.empty());
}
}

View File

@ -3,13 +3,11 @@ package org.jetlinks.community.notify.rule;
import lombok.Getter;
import lombok.Setter;
import org.jetlinks.community.notify.DefaultNotifyType;
import org.jetlinks.rule.engine.api.model.NodeType;
import org.jetlinks.rule.engine.executor.node.RuleNodeConfig;
import org.springframework.util.Assert;
@Getter
@Setter
public class RuleNotifierProperties implements RuleNodeConfig {
public class RuleNotifierProperties {
private DefaultNotifyType notifyType;
@ -17,17 +15,6 @@ public class RuleNotifierProperties implements RuleNodeConfig {
private String templateId;
@Override
public NodeType getNodeType() {
return NodeType.PEEK;
}
@Override
public void setNodeType(NodeType nodeType) {
}
@Override
public void validate() {
Assert.notNull(notifyType,"notifyType can not be null");
Assert.hasText(notifierId,"notifierId can not be empty");

View File

@ -13,6 +13,11 @@
<artifactId>rule-engine-component</artifactId>
<dependencies>
<dependency>
<groupId>com.cronutils</groupId>
<artifactId>cron-utils</artifactId>
<version>9.0.2</version>
</dependency>
<dependency>
<groupId>org.jetlinks</groupId>
<artifactId>rule-engine-support</artifactId>

View File

@ -0,0 +1,42 @@
package org.jetlinks.community.rule.engine.configuration;
import io.netty.buffer.ByteBuf;
import lombok.Getter;
import lombok.Setter;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.community.gateway.EncodableMessage;
import org.jetlinks.community.gateway.TopicMessage;
import org.jetlinks.rule.engine.api.NativePayload;
import org.jetlinks.rule.engine.api.SubscribePayload;
import javax.annotation.Nonnull;
@Getter
@Setter
public class EventTopicMessage implements TopicMessage, EncodableMessage {
private String topic;
private Object nativePayload;
private SubscribePayload payload;
public EventTopicMessage(SubscribePayload payload) {
this.topic = payload.getTopic();
this.nativePayload = ((NativePayload) payload.getPayload()).getNativeObject();
this.payload = payload;
}
@Nonnull
@Override
public ByteBuf getPayload() {
return payload.getBody();
}
@Nonnull
@Override
public EncodedMessage getMessage() {
return this;
}
}

View File

@ -1,33 +1,36 @@
package org.jetlinks.community.rule.engine.configuration;
import org.jetlinks.community.rule.engine.nodes.TimerWorkerNode;
import org.jetlinks.rule.engine.api.ConditionEvaluator;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.community.gateway.MessageGateway;
import org.jetlinks.rule.engine.api.EventBus;
import org.jetlinks.rule.engine.api.RuleEngine;
import org.jetlinks.rule.engine.api.Slf4jLogger;
import org.jetlinks.rule.engine.api.executor.ExecutableRuleNodeFactory;
import org.jetlinks.rule.engine.cluster.logger.ClusterLogger;
import org.jetlinks.rule.engine.api.rpc.RpcService;
import org.jetlinks.rule.engine.api.rpc.RpcServiceFactory;
import org.jetlinks.rule.engine.api.scheduler.Scheduler;
import org.jetlinks.rule.engine.api.task.ConditionEvaluator;
import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
import org.jetlinks.rule.engine.api.worker.Worker;
import org.jetlinks.rule.engine.condition.ConditionEvaluatorStrategy;
import org.jetlinks.rule.engine.condition.DefaultConditionEvaluator;
import org.jetlinks.rule.engine.condition.supports.DefaultScriptEvaluator;
import org.jetlinks.rule.engine.condition.supports.ScriptConditionEvaluatorStrategy;
import org.jetlinks.rule.engine.condition.supports.ScriptEvaluator;
import org.jetlinks.rule.engine.executor.DefaultExecutableRuleNodeFactory;
import org.jetlinks.rule.engine.executor.ExecutableRuleNodeFactoryStrategy;
import org.jetlinks.rule.engine.executor.node.route.RouteEventNode;
import org.jetlinks.rule.engine.defaults.DefaultRuleEngine;
import org.jetlinks.rule.engine.defaults.LocalEventBus;
import org.jetlinks.rule.engine.defaults.LocalScheduler;
import org.jetlinks.rule.engine.defaults.LocalWorker;
import org.jetlinks.rule.engine.defaults.rpc.DefaultRpcServiceFactory;
import org.jetlinks.rule.engine.defaults.rpc.EventBusRcpService;
import org.jetlinks.rule.engine.model.DefaultRuleModelParser;
import org.jetlinks.rule.engine.model.RuleModelParserStrategy;
import org.jetlinks.rule.engine.model.antv.AntVG6RuleModelParserStrategy;
import org.jetlinks.rule.engine.standalone.StandaloneRuleEngine;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ExecutorService;
@Configuration
@Slf4j
public class RuleEngineConfiguration {
@Bean
@ -40,20 +43,46 @@ public class RuleEngineConfiguration {
return new DefaultConditionEvaluator();
}
@Bean
public DefaultExecutableRuleNodeFactory defaultExecutableRuleNodeFactory() {
return new DefaultExecutableRuleNodeFactory();
}
@Bean
public AntVG6RuleModelParserStrategy antVG6RuleModelParserStrategy() {
return new AntVG6RuleModelParserStrategy();
}
@Bean
public EventBus eventBus(MessageGateway messageGateway) {
LocalEventBus local = new LocalEventBus();
//转发到消息网关
local.subscribe("/**")
.flatMap(subscribePayload -> messageGateway.publish(new EventTopicMessage(subscribePayload)).then())
.onErrorContinue((err, obj) -> log.error(err.getMessage(), obj))
.subscribe();
return local;
}
@Bean
public RpcService rpcService(EventBus eventBus) {
return new EventBusRcpService(eventBus);
}
@Bean
public RpcServiceFactory rpcServiceFactory(RpcService rpcService) {
return new DefaultRpcServiceFactory(rpcService);
}
@Bean
public Scheduler localScheduler(Worker worker) {
LocalScheduler scheduler = new LocalScheduler("local");
scheduler.addWorker(worker);
return scheduler;
}
@Bean
public BeanPostProcessor autoRegisterStrategy(DefaultRuleModelParser defaultRuleModelParser,
DefaultConditionEvaluator defaultConditionEvaluator,
DefaultExecutableRuleNodeFactory ruleNodeFactory) {
LocalWorker worker) {
return new BeanPostProcessor() {
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
@ -69,9 +98,10 @@ public class RuleEngineConfiguration {
if (bean instanceof ConditionEvaluatorStrategy) {
defaultConditionEvaluator.register(((ConditionEvaluatorStrategy) bean));
}
if (bean instanceof ExecutableRuleNodeFactoryStrategy) {
ruleNodeFactory.registerStrategy(((ExecutableRuleNodeFactoryStrategy) bean));
if (bean instanceof TaskExecutorProvider) {
worker.addExecutor(((TaskExecutorProvider) bean));
}
return bean;
}
};
@ -88,37 +118,14 @@ public class RuleEngineConfiguration {
}
@Bean
public RuleEngine ruleEngine(ExecutableRuleNodeFactory ruleNodeFactory,
ConditionEvaluator conditionEvaluator,
ApplicationEventPublisher eventPublisher,
ExecutorService executorService) {
StandaloneRuleEngine ruleEngine = new StandaloneRuleEngine();
ruleEngine.setNodeFactory(ruleNodeFactory);
ruleEngine.setExecutor(executorService);
ruleEngine.setEvaluator(conditionEvaluator);
ruleEngine.setEventListener(eventPublisher::publishEvent);
ruleEngine.setLoggerSupplier((ctxId, model) -> {
ClusterLogger logger = new ClusterLogger();
logger.setParent(new Slf4jLogger("rule.engine.logger.".concat(model.getId()).concat(".").concat(model.getName())));
logger.setLogInfoConsumer(eventPublisher::publishEvent);
logger.setNodeId(model.getId());
logger.setInstanceId(ctxId);
return logger;
});
return ruleEngine;
public LocalWorker localWorker(EventBus eventBus, ConditionEvaluator evaluator) {
return new LocalWorker("local", "local", eventBus, evaluator);
}
/* 规则引擎节点 */
@Bean //定时调度
public TimerWorkerNode timerWorkerNode() {
return new TimerWorkerNode();
}
@Bean
public RouteEventNode routeEventNode() {
return new RouteEventNode();
public RuleEngine defaultRuleEngine(Scheduler scheduler) {
return new DefaultRuleEngine(scheduler);
}
}

View File

@ -1,64 +0,0 @@
package org.jetlinks.community.rule.engine.event.handler;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.community.elastic.search.service.ElasticSearchService;
import org.jetlinks.community.gateway.MessageGateway;
import org.jetlinks.community.rule.engine.entity.RuleEngineExecuteEventInfo;
import org.jetlinks.community.rule.engine.entity.RuleEngineExecuteLogInfo;
import org.jetlinks.rule.engine.api.events.NodeExecuteEvent;
import org.jetlinks.rule.engine.api.events.RuleEvent;
import org.jetlinks.rule.engine.cluster.logger.LogInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@Order(3)
public class RuleLogHandler {
@Autowired
private ElasticSearchService elasticSearchService;
@Autowired
private MessageGateway messageGateway;
@EventListener
public void handleRuleLog(LogInfo event) {
RuleEngineExecuteLogInfo logInfo = FastBeanCopier.copy(event, new RuleEngineExecuteLogInfo());
elasticSearchService.commit(RuleEngineLoggerIndexProvider.RULE_LOG, logInfo)
.subscribe();
// /rule-engine/{instanceId}/{nodeId}/log
messageGateway
.publish(String.join("/",
"/rule-engine",
event.getInstanceId(),
event.getNodeId(),
"log"), logInfo, true)
.subscribe();
}
@EventListener
public void handleRuleExecuteEvent(NodeExecuteEvent event) {
//不记录BEFORE和RESULT事件
if (!RuleEvent.NODE_EXECUTE_BEFORE.equals(event.getEvent())
&& !RuleEvent.NODE_EXECUTE_RESULT.equals(event.getEvent())) {
RuleEngineExecuteEventInfo eventInfo = FastBeanCopier.copy(event, new RuleEngineExecuteEventInfo());
elasticSearchService.commit(RuleEngineLoggerIndexProvider.RULE_EVENT_LOG, eventInfo)
.subscribe();
}
// /rule-engine/{instanceId}/{nodeId}/event/{eventType}
messageGateway
.publish(String.join("/",
"/rule-engine",
event.getInstanceId(),
event.getNodeId(),
"event",
event.getEvent().toLowerCase()
), event, true)
.subscribe();
}
}

View File

@ -1,4 +1,4 @@
package org.jetlinks.community.rule.engine.nodes;
package org.jetlinks.community.rule.engine.executor;
import lombok.Getter;
import lombok.Setter;
@ -6,18 +6,15 @@ import lombok.SneakyThrows;
import org.hswebframework.web.bean.Converter;
import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.utils.ExpressionUtils;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.executor.ExecutionContext;
import org.jetlinks.rule.engine.api.model.NodeType;
import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy;
import org.jetlinks.rule.engine.executor.node.RuleNodeConfig;
import org.reactivestreams.Publisher;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
import org.jetlinks.rule.engine.api.task.TaskExecutor;
import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
import org.jetlinks.rule.engine.defaults.LambdaTaskExecutor;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import java.math.BigDecimal;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
@ -25,31 +22,36 @@ import java.util.stream.Collectors;
* @since 1.0.0
*/
@Component
public class DataMappingWorkerNode extends CommonExecutableRuleNodeFactoryStrategy<DataMappingWorkerNode.Config> {
public class DataMappingTaskExecutorProvider implements TaskExecutorProvider {
public static Converter converter = FastBeanCopier.DEFAULT_CONVERT;
@Override
public String getSupportType() {
public String getExecutor() {
return "data-mapping";
}
@Override
public Function<RuleData, Publisher<Object>> createExecutor(ExecutionContext context, Config config) {
public Mono<TaskExecutor> createTask(ExecutionContext context) {
return ruleData -> Mono.just(config.mapping(convertObject(ruleData.getData())));
return Mono.just(new LambdaTaskExecutor("Mapping", context, () -> {
Config config = FastBeanCopier.copy(context.getJob().getConfiguration(), new Config());
return data -> Mono.just(data.newData(config.mapping(data.getData())));
}));
}
@Getter
@Setter
public static class Config implements RuleNodeConfig {
public static class Config {
private List<Mapping> mappings = new ArrayList<>();
private boolean keepSourceData = false;
private NodeType nodeType;
private Map<String, Object> toMap(Object source) {
return FastBeanCopier.copy(source, HashMap::new);
}
@ -62,10 +64,10 @@ public class DataMappingWorkerNode extends CommonExecutableRuleNodeFactoryStrate
if (data instanceof Collection) {
Collection<Object> source = ((Collection) data);
return source
.stream()
.map(this::toMap)
.map(this::doMapping)
.collect(Collectors.toList());
.stream()
.map(this::toMap)
.map(this::doMapping)
.collect(Collectors.toList());
}
return data;
}

View File

@ -1,22 +1,25 @@
package org.jetlinks.community.rule.engine.nodes;
package org.jetlinks.community.rule.engine.executor;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceProductOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.MessageType;
import org.jetlinks.core.message.RepayableDeviceMessage;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.executor.ExecutionContext;
import org.jetlinks.rule.engine.api.model.NodeType;
import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy;
import org.jetlinks.rule.engine.executor.node.RuleNodeConfig;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
import org.jetlinks.rule.engine.api.task.TaskExecutor;
import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
import org.jetlinks.rule.engine.defaults.FunctionTaskExecutor;
import org.reactivestreams.Publisher;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -24,18 +27,35 @@ import reactor.core.scheduler.Schedulers;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
@AllArgsConstructor
@Component
public class DeviceMessageSendNode extends CommonExecutableRuleNodeFactoryStrategy<DeviceMessageSendNode.Config> {
@AllArgsConstructor
public class DeviceMessageSendTaskExecutorProvider implements TaskExecutorProvider {
private final DeviceRegistry registry;
@Override
public Function<RuleData, ? extends Publisher<?>> createExecutor(ExecutionContext context, Config config) {
public String getExecutor() {
return "device-message-sender";
}
return data -> {
@Override
public Mono<TaskExecutor> createTask(ExecutionContext context) {
return Mono.just(new DeviceMessageSendTaskExecutor(context));
}
class DeviceMessageSendTaskExecutor extends FunctionTaskExecutor {
private Config config;
public DeviceMessageSendTaskExecutor(ExecutionContext context) {
super("发送设备消息", context);
validate();
reload();
}
@Override
protected Publisher<RuleData> apply(RuleData input) {
Flux<DeviceOperator> devices = StringUtils.hasText(config.getDeviceId())
? registry.getDevice(config.getDeviceId()).flux()
: registry.getProduct(config.getProductId()).flatMapMany(DeviceProductOperator::getDevices);
@ -44,18 +64,32 @@ public class DeviceMessageSendNode extends CommonExecutableRuleNodeFactoryStrate
.filterWhen(DeviceOperator::isOnline)
.publishOn(Schedulers.parallel())
.flatMap(config::doSend)
.onErrorResume(error -> context.onError(data, error).then(Mono.empty()));
};
.onErrorResume(error -> context.onError(error, input).then(Mono.empty()))
.map(reply -> input.newData(reply.toJson()))
;
}
@Override
public void validate() {
if (CollectionUtils.isEmpty(context.getJob().getConfiguration())) {
throw new IllegalArgumentException("配置不能为空");
}
Config config = FastBeanCopier.copy(context.getJob().getConfiguration(), new Config());
config.validate();
}
@Override
public void reload() {
config = FastBeanCopier.copy(context.getJob().getConfiguration(), new Config());
}
}
@Override
public String getSupportType() {
return "device-message-sender";
}
@Getter
@Setter
public static class Config implements RuleNodeConfig {
public static class Config {
//设备ID
private String deviceId;
@ -67,7 +101,8 @@ public class DeviceMessageSendNode extends CommonExecutableRuleNodeFactoryStrate
private boolean async;
public Publisher<?> doSend(DeviceOperator device) {
@SuppressWarnings("all")
public Publisher<DeviceMessageReply> doSend(DeviceOperator device) {
Map<String, Object> message = new HashMap<>(this.message);
message.put("messageId", IDGenerator.SNOW_FLAKE_STRING.generate());
message.put("deviceId", device.getDeviceId());
@ -78,7 +113,6 @@ public class DeviceMessageSendNode extends CommonExecutableRuleNodeFactoryStrate
.flatMapMany(msg -> device.messageSender().send(Mono.just(msg)));
}
@Override
public void validate() {
if (StringUtils.isEmpty(deviceId) && StringUtils.isEmpty(productId)) {
throw new IllegalArgumentException("deviceId和productId不能同时为空");
@ -86,15 +120,5 @@ public class DeviceMessageSendNode extends CommonExecutableRuleNodeFactoryStrate
MessageType.convertMessage(message).orElseThrow(() -> new IllegalArgumentException("不支持的消息格式"));
}
@Override
public NodeType getNodeType() {
return NodeType.MAP;
}
@Override
public void setNodeType(NodeType nodeType) {
}
}
}

View File

@ -0,0 +1,125 @@
package org.jetlinks.community.rule.engine.executor;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import org.jetlinks.community.gateway.MessageGateway;
import org.jetlinks.community.gateway.Subscription;
import org.jetlinks.community.gateway.TopicMessage;
import org.jetlinks.reactor.ql.ReactorQL;
import org.jetlinks.rule.engine.api.RuleConstants;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.RuleDataHelper;
import org.jetlinks.rule.engine.api.model.RuleNodeModel;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
import org.jetlinks.rule.engine.api.task.TaskExecutor;
import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
import org.jetlinks.rule.engine.defaults.AbstractTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@Component
@AllArgsConstructor
public class ReactorQLTaskExecutorProvider implements TaskExecutorProvider {
private final MessageGateway messageGateway;
@Override
public String getExecutor() {
return "reactor-ql";
}
@Override
public Mono<TaskExecutor> createTask(ExecutionContext context) {
return Mono.just(new ReactorQLTaskExecutor(context));
}
class ReactorQLTaskExecutor extends AbstractTaskExecutor {
private ReactorQL reactorQL;
public ReactorQLTaskExecutor(ExecutionContext context) {
super(context);
reactorQL = createQl();
}
@Override
public String getName() {
return "ReactorQL";
}
@Override
protected Disposable doStart() {
Disposable.Composite composite = Disposables.composite();
Flux<Map<String, Object>> dataStream;
//有上游节点
if (!CollectionUtils.isEmpty(context.getJob().getInputs())) {
dataStream = context.getInput()
.accept()
.map(RuleDataHelper::toContextMap)
.as(reactorQL::start)
;
} else {
dataStream = reactorQL
.start(table -> {
if (table == null || table.equalsIgnoreCase("dual")) {
return Flux.just(1);
}
if (table.startsWith("/")) {
//转换为消息
return messageGateway
.subscribe(
Collections.singleton(new Subscription(table)),
"rule-engine:".concat(context.getInstanceId()),
false)
.map(TopicMessage::convertMessage);
}
return Flux.just(1);
});
}
return dataStream
.flatMap(result -> {
RuleData data = context.newRuleData(result);
//输出到下一节点
return context.getOutput()
.write(Mono.just(data))
.then(context.fireEvent(RuleConstants.Event.result, data));
})
.onErrorResume(err -> context.onError(err, null))
.subscribe();
}
protected ReactorQL createQl() {
ReactorQL.Builder builder = Optional.ofNullable(context.getJob().getConfiguration())
.map(map -> map.get("sql"))
.map(String::valueOf)
.map(ReactorQL.builder()::sql)
.orElseThrow(() -> new IllegalArgumentException("配置sql错误"));
return builder.build();
}
@Override
public void reload() {
reactorQL = createQl();
if (this.disposable != null) {
this.disposable.dispose();
}
start();
}
@Override
public void validate() {
createQl();
}
}
}

View File

@ -1,4 +1,4 @@
package org.jetlinks.community.rule.engine.nodes;
package org.jetlinks.community.rule.engine.executor;
import lombok.Getter;
import lombok.Setter;
@ -7,11 +7,13 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.hswebframework.expands.script.engine.DynamicScriptEngine;
import org.hswebframework.expands.script.engine.DynamicScriptEngineFactory;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.executor.ExecutionContext;
import org.jetlinks.rule.engine.api.model.NodeType;
import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy;
import org.jetlinks.rule.engine.executor.node.RuleNodeConfig;
import org.jetlinks.rule.engine.api.model.RuleNodeModel;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
import org.jetlinks.rule.engine.api.task.TaskExecutor;
import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
import org.jetlinks.rule.engine.defaults.LambdaTaskExecutor;
import org.reactivestreams.Publisher;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@ -24,14 +26,22 @@ import java.util.function.Function;
@Component
@Slf4j
public class ScriptWorkerNode extends CommonExecutableRuleNodeFactoryStrategy<ScriptWorkerNode.Config> {
public class ScriptTaskExecutorProvider implements TaskExecutorProvider {
@Override
public String getSupportType() {
public String getExecutor() {
return "script";
}
@Override
public Mono<TaskExecutor> createTask(ExecutionContext context) {
return Mono.just(new LambdaTaskExecutor("script", context, () -> {
return createExecutor(context, FastBeanCopier.copy(context.getJob().getConfiguration(), new Config()));
}));
}
@SneakyThrows
public Function<RuleData, Publisher<?>> createExecutor(ExecutionContext context, Config config) {
@ -54,16 +64,16 @@ public class ScriptWorkerNode extends CommonExecutableRuleNodeFactoryStrategy<Sc
scriptContext.put("handler", handler);
engine.execute(id, scriptContext).getIfSuccess();
return ruleData -> Flux.defer(()->{
return ruleData -> Flux.defer(() -> {
if (handler.onMessage != null) {
Object result = handler.onMessage.apply(ruleData);
if (result == null || result.getClass().getName().equals("jdk.nashorn.internal.runtime.Undefined")) {
return Flux.empty();
}
if(result instanceof Publisher){
return Flux.from(((Publisher) result));
if (result instanceof Publisher) {
return Flux.from(((Publisher<?>) result));
}
if(result instanceof Map){
if (result instanceof Map) {
result = new HashMap<>((Map<?, ?>) result);
}
return Flux.just(result);
@ -82,13 +92,11 @@ public class ScriptWorkerNode extends CommonExecutableRuleNodeFactoryStrategy<Sc
@Getter
@Setter
public static class Config implements RuleNodeConfig {
public static class Config {
private String lang = "js";
private String script;
private NodeType nodeType;
}
}

View File

@ -1,4 +1,4 @@
package org.jetlinks.community.rule.engine.nodes;
package org.jetlinks.community.rule.engine.executor;
import lombok.Getter;
import lombok.Setter;
@ -6,12 +6,15 @@ import lombok.SneakyThrows;
import org.hswebframework.ezorm.rdb.executor.SqlRequests;
import org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor;
import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrappers;
import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.utils.ExpressionUtils;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.executor.ExecutionContext;
import org.jetlinks.rule.engine.api.RuleDataHelper;
import org.jetlinks.rule.engine.api.model.NodeType;
import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy;
import org.jetlinks.rule.engine.executor.node.RuleNodeConfig;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
import org.jetlinks.rule.engine.api.task.TaskExecutor;
import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
import org.jetlinks.rule.engine.defaults.LambdaTaskExecutor;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -19,31 +22,28 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@Component
public class SqlExecutorWorkerNode extends CommonExecutableRuleNodeFactoryStrategy<SqlExecutorWorkerNode.Config> {
public class SqlExecutorTaskExecutorProvider implements TaskExecutorProvider {
@Autowired
private ReactiveSqlExecutor sqlExecutor;
@Override
public String getSupportType() {
public String getExecutor() {
return "sql";
}
@Override
public Function<RuleData, Publisher<Object>> createExecutor(ExecutionContext context, Config config) {
public Function<RuleData, Publisher<?>> createExecutor(ExecutionContext context, Config config) {
if (config.isQuery()) {
return (data) -> Flux.defer(() -> {
String sql = config.getSql(data);
List<Flux<Map<String, Object>>> fluxes = new ArrayList<>();
data.acceptMap(map -> fluxes.add(sqlExecutor.select(Mono.just(SqlRequests.template(sql, map)), ResultWrappers.map())));
return Flux.concat(fluxes) ;
return Flux.concat(fluxes);
});
} else {
return data -> Mono.defer(() -> {
@ -57,10 +57,15 @@ public class SqlExecutorWorkerNode extends CommonExecutableRuleNodeFactoryStrate
}
@Override
public Mono<TaskExecutor> createTask(ExecutionContext context) {
return Mono.just(new LambdaTaskExecutor("SQL",context, () -> createExecutor(context, FastBeanCopier.copy(context.getJob().getConfiguration(),new Config()))));
}
@Getter
@Setter
public static class Config implements RuleNodeConfig {
public static class Config {
private String dataSourceId;
@ -75,7 +80,7 @@ public class SqlExecutorWorkerNode extends CommonExecutableRuleNodeFactoryStrate
public boolean isQuery() {
return sql.trim().startsWith("SELECT") ||
sql.trim().startsWith("select");
sql.trim().startsWith("select");
}
@SneakyThrows
@ -83,20 +88,10 @@ public class SqlExecutorWorkerNode extends CommonExecutableRuleNodeFactoryStrate
if (!sql.contains("${")) {
return sql;
}
Map<String, Object> map = new HashMap<>();
map.put("data", data.getData());
map.put("ruleData", data);
map.put("attr", data.getAttributes());
return ExpressionUtils.analytical(sql, map, "spel");
return ExpressionUtils.analytical(sql, RuleDataHelper.toContextMap(data), "spel");
}
public void switchDataSource() {
}
public void resetDataSource() {
}
}
}

View File

@ -0,0 +1,125 @@
package org.jetlinks.community.rule.engine.executor;
import com.cronutils.model.Cron;
import com.cronutils.model.CronType;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.model.time.ExecutionTime;
import com.cronutils.parser.CronParser;
import lombok.AllArgsConstructor;
import org.jetlinks.community.ValueObject;
import org.jetlinks.rule.engine.api.RuleConstants;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
import org.jetlinks.rule.engine.api.task.TaskExecutor;
import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
import org.jetlinks.rule.engine.defaults.AbstractTaskExecutor;
import org.springframework.stereotype.Component;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import java.time.Duration;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.function.Supplier;
@Component
@AllArgsConstructor
public class TimerTaskExecutorProvider implements TaskExecutorProvider {
private final Scheduler scheduler;
@Override
public String getExecutor() {
return "timer";
}
@Override
public Mono<TaskExecutor> createTask(ExecutionContext context) {
return Mono.just(new TimerTaskExecutor(context));
}
class TimerTaskExecutor extends AbstractTaskExecutor {
Supplier<Duration> nextDelay;
public TimerTaskExecutor(ExecutionContext context) {
super(context);
nextDelay = createNextDelay();
}
@Override
public String getName() {
return "定时调度";
}
@Override
protected Disposable doStart() {
return execute();
}
private Disposable execute() {
Duration nextTime = nextDelay.get();
context.getLogger().debug("trigger timed task after {}", nextTime);
if (this.disposable != null) {
this.disposable.dispose();
}
return this.disposable =
Mono.delay(nextTime, scheduler)
.flatMap(t -> context.getOutput().write(Mono.just(context.newRuleData(t))))
.then(context.fireEvent(RuleConstants.Event.complete, context.newRuleData(System.currentTimeMillis())).thenReturn(1))
.subscribe(t -> execute());
}
@Override
public void reload() {
nextDelay = createNextDelay();
if (disposable != null) {
disposable.dispose();
}
doStart();
}
@Override
public void validate() {
createNextDelay();
}
private Supplier<Duration> createNextDelay() {
ValueObject config = ValueObject.of(context.getJob().getConfiguration());
CronParser parser = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ));
Cron cron = config.getString("cron")
.map(parser::parse)
.orElseThrow(() -> new IllegalArgumentException("cron配置不存在"));
ExecutionTime executionTime = ExecutionTime.forCron(cron);
return () -> executionTime.timeToNextExecution(ZonedDateTime.now()).orElse(Duration.ofSeconds(10));
}
}
public static Flux<ZonedDateTime> getLastExecuteTimes(String cronExpression, Date from, long times) {
return Flux.create(sink -> {
CronParser parser = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ));
Cron cron = parser.parse(cronExpression);
ExecutionTime executionTime = ExecutionTime.forCron(cron);
ZonedDateTime dateTime = ZonedDateTime.ofInstant(from.toInstant(), ZoneId.systemDefault());
for (long i = 0; i < times; i++) {
dateTime = executionTime.nextExecution(dateTime)
.orElse(null);
if (dateTime != null) {
sink.next(dateTime);
} else {
break;
}
}
sink.complete();
});
}
}

View File

@ -1,134 +0,0 @@
package org.jetlinks.community.rule.engine.nodes;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.core.message.codec.MessagePayloadType;
import org.jetlinks.community.gateway.EncodableMessage;
import org.jetlinks.community.gateway.MessageGateway;
import org.jetlinks.community.gateway.Subscription;
import org.jetlinks.reactor.ql.ReactorQL;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.RuleDataHelper;
import org.jetlinks.rule.engine.api.events.RuleEvent;
import org.jetlinks.rule.engine.api.executor.ExecutionContext;
import org.jetlinks.rule.engine.api.model.NodeType;
import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy;
import org.jetlinks.rule.engine.executor.PayloadType;
import org.jetlinks.rule.engine.executor.node.RuleNodeConfig;
import org.reactivestreams.Publisher;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Collections;
import java.util.function.Function;
/**
* <pre>
* {@code
*
* select avg(this.temperature) avgVal, deviceId
* from "/device/+/message/property/#"
* group _window(10,1) --每10条滚动数据
* having avgVal > 10
*
* }
* </pre>
*/
@Slf4j
@AllArgsConstructor
@Component
public class ReactorSqlNode extends CommonExecutableRuleNodeFactoryStrategy<ReactorSqlNode.Config> {
private final MessageGateway messageGateway;
@Override
public String getSupportType() {
return "reactor-ql";
}
@Override
public Function<RuleData, ? extends Publisher<?>> createExecutor(ExecutionContext context, Config config) {
ReactorQL ql = config.getReactorQL();
return data -> ql.start(Flux.just(RuleDataHelper.toContextMap(data)));
}
@Override
protected void onStarted(ExecutionContext context, Config config) {
log.debug("start reactor ql : {}", config.getSql());
context.onStop(
config.getReactorQL()
.start(table -> {
if (table == null || table.equalsIgnoreCase("dual")) {
return Flux.just(1);
}
if (table.startsWith("/")) {
return messageGateway
.subscribe(
Collections.singleton(new Subscription(table)),
"rule-engine:".concat(context.getInstanceId()),
false)
.map(msg -> {
//转换为消息
if (msg.getMessage() instanceof EncodableMessage) {
return ((EncodableMessage) msg.getMessage()).getNativePayload();
}
MessagePayloadType payloadType = msg.getMessage().getPayloadType();
if (payloadType == null) {
return msg.getMessage().getBytes();
}
return PayloadType.valueOf(payloadType.name()).read(msg.getMessage().getPayload());
});
}
return Flux.just(1);
})
.flatMap(result -> {
RuleData data = RuleData.create(result);
//输出到下一节点
return context.getOutput()
.write(Mono.just(RuleData.create(result)))
.then(context.fireEvent(RuleEvent.NODE_EXECUTE_DONE, data));
})
.onErrorResume(err -> context.onError(RuleData.create(""), err))
.subscribe()::dispose
);
}
public static class Config implements RuleNodeConfig {
@Getter
@Setter
private String sql;
private volatile ReactorQL reactorQL;
@Override
public NodeType getNodeType() {
return NodeType.MAP;
}
@Override
public void setNodeType(NodeType nodeType) {
}
public ReactorQL getReactorQL() {
if (reactorQL == null) {
reactorQL = ReactorQL.builder().sql(sql).build();
}
return reactorQL;
}
@Override
public void validate() {
//不报错就ok
getReactorQL();
}
}
}

View File

@ -1,129 +0,0 @@
package org.jetlinks.community.rule.engine.nodes;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.executor.ExecutionContext;
import org.jetlinks.rule.engine.api.model.NodeType;
import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy;
import org.jetlinks.rule.engine.executor.node.RuleNodeConfig;
import org.reactivestreams.Publisher;
import org.springframework.scheduling.support.CronSequenceGenerator;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
public class TimerWorkerNode extends CommonExecutableRuleNodeFactoryStrategy<TimerWorkerNode.Configuration> {
private Map<String, TimerWorkerNode.TimerJob> jobs = new ConcurrentHashMap<>();
@Override
public Function<RuleData, ? extends Publisher<?>> createExecutor(ExecutionContext context, Configuration config) {
return Mono::just;
}
@Override
protected void onStarted(ExecutionContext context, Configuration config) {
super.onStarted(context, config);
String id = context.getInstanceId() + ":" + context.getNodeId();
context.onStop(() -> {
TimerJob job = jobs.remove(id);
if (null != job) {
job.cancel();
}
});
TimerJob job = jobs.computeIfAbsent(id, _id -> new TimerJob(config, context));
job.start();
}
@Override
public String getSupportType() {
return "timer";
}
@AllArgsConstructor
private static class TimerJob {
private String id;
private TimerWorkerNode.Configuration configuration;
private ExecutionContext context;
private volatile boolean running;
TimerJob(TimerWorkerNode.Configuration configuration,
ExecutionContext context) {
this.configuration = configuration;
this.context = context;
this.id = context.getInstanceId() + ":" + context.getNodeId();
}
void start() {
running = true;
doStart();
}
void doStart() {
if (!running) {
return;
}
running = true;
Mono.delay(Duration.ofMillis(configuration.nextMillis()))
.subscribe(t -> execute(this::doStart));
}
void execute(Runnable runnable) {
if (!running) {
return;
}
context.logger().debug("execute timer:{}", id);
context.getOutput()
.write(Mono.just(RuleData.create(System.currentTimeMillis())))
.doOnError(err -> context.logger().error("fire timer error", err))
.doFinally(s -> runnable.run())
.subscribe();
}
void cancel() {
running = false;
}
}
public static class Configuration implements RuleNodeConfig {
@Getter
@Setter
private String cron;
private volatile CronSequenceGenerator generator;
@Override
public NodeType getNodeType() {
return NodeType.PEEK;
}
@Override
public void setNodeType(NodeType nodeType) {
}
public void init() {
generator = new CronSequenceGenerator(cron);
}
@Override
public void validate() {
init();
}
public long nextMillis() {
return Math.max(100, generator.next(new Date()).getTime() - System.currentTimeMillis());
}
}
}

View File

@ -0,0 +1,24 @@
package org.jetlinks.community.device.entity;
import lombok.Getter;
import lombok.Setter;
import org.hswebframework.web.api.crud.entity.GenericTreeSortSupportEntity;
import java.util.List;
@Getter
@Setter
public class DeviceCategory extends GenericTreeSortSupportEntity<String> {
private String id;
private String key;
private String name;
private String parentId;
private List<DeviceCategory> children;
}

View File

@ -49,10 +49,14 @@ public class DeviceInstanceEntity extends GenericEntity<String> implements Recor
private String describe;
@Comment("产品id")
@Column(name = "product_id", length = 32)
@Column(name = "product_id", length = 64)
@NotBlank(message = "产品ID不能为空", groups = CreateGroup.class)
private String productId;
@Comment("图片地址")
@Column(name = "photo_url", length = 1024)
private String photoUrl;
@Comment("产品名称")
@Column(name = "product_name")
@NotBlank(message = "产品名称不能为空", groups = CreateGroup.class)
@ -89,11 +93,11 @@ public class DeviceInstanceEntity extends GenericEntity<String> implements Recor
@Column(name = "registry_time")
private Long registryTime;
@Column(name = "org_id", length = 32)
@Column(name = "org_id", length = 64)
@Comment("所属机构id")
private String orgId;
@Column(name = "parent_id", length = 32)
@Column(name = "parent_id", length = 64)
@Comment("父级设备ID")
private String parentId;

View File

@ -40,9 +40,13 @@ public class DeviceProductEntity extends GenericEntity<String> implements Record
private String name;
@Comment("所属项目")
@Column(name = "project_id",length = 32)
@Column(name = "project_id",length = 64)
private String projectId;
@Comment("图片地址")
@Column(name = "photo_url", length = 1024)
private String photoUrl;
@Comment("项目名称")
@Column(name = "project_name")
private String projectName;
@ -52,9 +56,13 @@ public class DeviceProductEntity extends GenericEntity<String> implements Record
private String describe;
@Comment("分类ID")
@Column(name = "classified_id")
@Column(name = "classified_id",length = 64)
private String classifiedId;
@Column
@Comment("分类名称")
private String classifiedName;
@Comment("消息协议: Alink,JetLinks")
@Column(name = "message_protocol")
@NotBlank(message = "消息协议不能为空",groups = CreateGroup.class)
@ -63,6 +71,10 @@ public class DeviceProductEntity extends GenericEntity<String> implements Record
})
private String messageProtocol;
@Column
@Comment("协议名称")
private String protocolName;
@Comment("协议元数据")
@Column(name = "metadata")
@ColumnType(jdbcType = JDBCType.CLOB)

View File

@ -60,6 +60,6 @@ public class DeviceTagEntity extends GenericEntity<String> {
}
public static String createTagId(String deviceId,String key){
return DigestUtils.md5Hex(deviceId.concat(":").concat(key));
return DigestUtils.md5Hex(deviceId + ":" + key);
}
}

View File

@ -27,9 +27,15 @@ public class DeviceDetail {
//设备名称
private String name;
//设备图片
private String photoUrl;
//消息协议标识
private String protocol;
//协议名称
private String protocolName;
//通信协议
private String transport;
@ -132,10 +138,11 @@ public class DeviceDetail {
}
setProtocol(productEntity.getMessageProtocol());
setTransport(productEntity.getTransportProtocol());
setPhotoUrl(productEntity.getPhotoUrl());
setProductId(productEntity.getId());
setProductName(productEntity.getName());
setDeviceType(productEntity.getDeviceType());
setProtocolName(productEntity.getProtocolName());
return this;
}

View File

@ -0,0 +1,73 @@
package org.jetlinks.community.device.web;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.api.crud.entity.TreeSupportEntity;
import org.jetlinks.community.device.entity.DeviceCategory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.util.StreamUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
@RestController
@RequestMapping("/device/category")
@Slf4j
public class DeviceCategoryController {
static List<DeviceCategory> statics;
static void rebuild(String parentId, List<DeviceCategory> children) {
if (children == null) {
return;
}
for (DeviceCategory child : children) {
String id = child.getId();
child.setId(parentId + "|" + id + "|");
child.setParentId(parentId + "|");
rebuild(parentId + "|" + id, child.getChildren());
}
}
static {
try {
ClassPathResource resource = new ClassPathResource("device-category.json");
String json = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
List<DeviceCategory> all = JSON.parseArray(json, DeviceCategory.class);
List<DeviceCategory> root = TreeSupportEntity.list2tree(all, DeviceCategory::setChildren);
for (DeviceCategory category : root) {
String id = category.getId();
category.setId("|" + id + "|");
category.setParentId("|" + category.getParentId()+"|");
rebuild("|" + id, category.getChildren());
}
statics = all;
} catch (Exception e) {
statics = new ArrayList<>();
log.error(e.getMessage(), e);
}
}
@GetMapping
public Flux<DeviceCategory> getAllCategory() {
return Flux.fromIterable(statics);
}
@GetMapping("/_tree")
public Flux<DeviceCategory> getAllCategoryTree() {
return Flux.fromIterable(TreeSupportEntity.list2tree(statics, DeviceCategory::setChildren));
}
}

View File

@ -369,14 +369,17 @@ public class DeviceInstanceController implements
.flatMap(DeviceProductOperator::getMetadata)
.map(metadata -> new DeviceWrapper(metadata.getTags()))
.defaultIfEmpty(DeviceWrapper.empty)
.zipWith(productService.findById(productId))
.flatMapMany(wrapper -> importExportService
.getInputStream(fileUrl)
.flatMapMany(inputStream -> ReactorExcel.read(inputStream, FileUtils.getExtension(fileUrl), wrapper)))
.flatMapMany(inputStream -> ReactorExcel.read(inputStream, FileUtils.getExtension(fileUrl), wrapper.getT1()))
.doOnNext(info -> info.setProductName(wrapper.getT2().getName()))
)
.map(info -> {
DeviceInstanceEntity entity = FastBeanCopier.copy(info, new DeviceInstanceEntity());
entity.setProductId(productId);
if (StringUtils.isEmpty(entity.getId())) {
throw new BusinessException("" + info.getRowNumber() + 1 + "行:设备ID不能为空");
throw new BusinessException("" + (info.getRowNumber() + 1) + "行:设备ID不能为空");
}
return Tuples.of(entity, info.getTags());
})

View File

@ -33,7 +33,7 @@ public class DeviceExcelInfo {
private long rowNumber;
public void tag(String key, String name, Object value) {
public void tag(String key, String name, Object value, String type) {
if (value == null) {
return;
}
@ -41,14 +41,17 @@ public class DeviceExcelInfo {
entity.setKey(key);
entity.setValue(String.valueOf(value));
entity.setName(name);
entity.setId(String.valueOf(id).concat(":").concat(key));
entity.setDeviceId(id);
entity.setType(type);
entity.setId(DeviceTagEntity.createTagId(id,key));
tags.add(entity);
}
public void setId(String id) {
this.id = id;
for (DeviceTagEntity tag : tags) {
tag.setId(String.valueOf(id).concat(":").concat(tag.getKey()));
tag.setDeviceId(id);
tag.setId(DeviceTagEntity.createTagId(tag.getDeviceId(),tag.getKey()));
}
}

View File

@ -18,14 +18,14 @@ import java.util.Map;
*/
public class DeviceWrapper extends RowWrapper<DeviceExcelInfo> {
Map<String, String> tagMapping = new HashMap<>();
Map<String, PropertyMetadata> tagMapping = new HashMap<>();
static Map<String, String> headerMapping = DeviceExcelInfo.getImportHeaderMapping();
public static DeviceWrapper empty = new DeviceWrapper(Collections.emptyList());
public DeviceWrapper(List<PropertyMetadata> tags) {
for (PropertyMetadata tag : tags) {
tagMapping.put(tag.getName(), tag.getId());
tagMapping.put(tag.getName(), tag);
}
}
@ -38,9 +38,9 @@ public class DeviceWrapper extends RowWrapper<DeviceExcelInfo> {
protected DeviceExcelInfo wrap(DeviceExcelInfo deviceExcelInfo, Cell header, Cell cell) {
String headerText = header.valueAsText().orElse("null");
String maybeTag = tagMapping.get(headerText);
if (StringUtils.hasText(maybeTag)) {
deviceExcelInfo.tag(maybeTag, headerText, cell.value().orElse(null));
PropertyMetadata maybeTag = tagMapping.get(headerText);
if (maybeTag != null) {
deviceExcelInfo.tag(maybeTag.getId(), headerText, cell.value().orElse(null), maybeTag.getValueType().getId());
} else {
deviceExcelInfo.with(headerMapping.getOrDefault(headerText, headerText), cell.value().orElse(null));
}

File diff suppressed because it is too large Load Diff

View File

@ -73,6 +73,11 @@ public class DeviceAlarmRule implements Serializable {
*/
private List<Action> actions;
/**
* 防抖限制
*/
private ShakeLimit shakeLimit;
public void validate() {
if (org.apache.commons.collections.CollectionUtils.isEmpty(getTriggers())) {
@ -286,6 +291,28 @@ public class DeviceAlarmRule implements Serializable {
}
/**
* 抖动限制
* <a href="https://github.com/jetlinks/jetlinks-community/issues/8">https://github.com/jetlinks/jetlinks-community/issues/8</a>
*
* @since 1.3
*/
@Getter
@Setter
public static class ShakeLimit implements Serializable {
private boolean enabled;
//时间限制,单位时间内发生多次告警时,只算一次单位:
private int time;
//触发阈值,单位时间内发生n次告警,只算一次
private int threshold;
//当发生第一次告警时就触发,为false时表示最后一次才触发(告警有延迟,但是可以统计出次数)
private boolean alarmFirst;
}
@AllArgsConstructor
@Getter
public enum Operator {

View File

@ -1,72 +1,58 @@
package org.jetlinks.community.rule.engine.device;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.jetlinks.core.message.DeviceMessage;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.community.ValueObject;
import org.jetlinks.community.gateway.DeviceMessageUtils;
import org.jetlinks.community.gateway.MessageGateway;
import org.jetlinks.community.gateway.Subscription;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.reactor.ql.ReactorQL;
import org.jetlinks.reactor.ql.ReactorQLContext;
import org.jetlinks.reactor.ql.ReactorQLRecord;
import org.jetlinks.rule.engine.api.RuleConstants;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.events.RuleEvent;
import org.jetlinks.rule.engine.api.executor.ExecutionContext;
import org.jetlinks.rule.engine.api.model.NodeType;
import org.jetlinks.rule.engine.executor.CommonExecutableRuleNodeFactoryStrategy;
import org.jetlinks.rule.engine.executor.node.RuleNodeConfig;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
import org.jetlinks.rule.engine.api.task.Task;
import org.jetlinks.rule.engine.api.task.TaskExecutor;
import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
import org.jetlinks.rule.engine.defaults.AbstractTaskExecutor;
import org.reactivestreams.Publisher;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import java.time.Duration;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
@Slf4j(topic = "system.rule.engine.device.alarm")
@Component
@Slf4j
@AllArgsConstructor
public class DeviceAlarmRuleNode extends CommonExecutableRuleNodeFactoryStrategy<DeviceAlarmRuleNode.Config> {
@Component
public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
private final MessageGateway messageGateway;
@Override
public Function<RuleData, ? extends Publisher<?>> createExecutor(ExecutionContext context,DeviceAlarmRuleNode.Config config) {
return Mono::just;
}
@Override
protected void onStarted(ExecutionContext context, DeviceAlarmRuleNode.Config config) {
context.onStop(
config.doSubscribe(messageGateway)
.flatMap(result -> {
RuleData data = RuleData.create(result);
//输出到下一节点
return context.getOutput()
.write(Mono.just(data))
.then(context.fireEvent(RuleEvent.NODE_EXECUTE_DONE, data));
})
.onErrorResume(err -> context.onError(RuleData.create(err.getMessage()), err))
.subscribe()::dispose
);
}
@Override
public String getSupportType() {
public String getExecutor() {
return "device_alarm";
}
@Override
public Mono<TaskExecutor> createTask(ExecutionContext context) {
return Mono.just(new DeviceAlarmTaskExecutor(context));
}
@Getter
@Setter
public static class Config implements RuleNodeConfig {
static List<String> default_columns = Arrays.asList(
class DeviceAlarmTaskExecutor extends AbstractTaskExecutor {
List<String> default_columns = Arrays.asList(
"timestamp", "deviceId", "this.header.deviceName deviceName"
);
@ -74,19 +60,63 @@ public class DeviceAlarmRuleNode extends CommonExecutableRuleNodeFactoryStrategy
private ReactorQL ql;
public DeviceAlarmTaskExecutor(ExecutionContext context) {
super(context);
rule = createRule();
ql = createQL(rule);
}
@Override
public String getName() {
return "设备告警";
}
@Override
protected Disposable doStart() {
rule.validate();
return doSubscribe(messageGateway)
.filter(ignore -> state == Task.State.running)
.flatMap(result -> {
RuleData data = RuleData.create(result);
//输出到下一节点
return context
.getOutput()
.write(Mono.just(data))
.then(context.fireEvent(RuleConstants.Event.result, data));
})
.onErrorResume(err -> context.onError(err, null))
.subscribe();
}
@Override
public void reload() {
rule = createRule();
ql = createQL(rule);
if (disposable != null) {
disposable.dispose();
}
doStart();
}
private DeviceAlarmRule createRule() {
DeviceAlarmRule rule = ValueObject.of(context.getJob().getConfiguration())
.get("rule")
.map(val -> FastBeanCopier.copy(val, new DeviceAlarmRule())).orElseThrow(() -> new IllegalArgumentException("告警配置错误"));
rule.validate();
return rule;
}
@Override
public void validate() {
if (CollectionUtils.isEmpty(rule.getTriggers())) {
throw new IllegalArgumentException("预警条件不能为空");
}
DeviceAlarmRule rule = createRule();
try {
ql = createQL();
createQL(rule);
} catch (Exception e) {
throw new IllegalArgumentException("配置错误:" + e.getMessage(), e);
}
}
private ReactorQL createQL() {
private ReactorQL createQL(DeviceAlarmRule rule) {
List<String> columns = new ArrayList<>(default_columns);
List<String> wheres = new ArrayList<>();
@ -116,13 +146,21 @@ public class DeviceAlarmRuleNode extends CommonExecutableRuleNodeFactoryStrategy
continue;
}
String alias = StringUtils.hasText(property.getAlias()) ? property.getAlias() : property.getProperty();
newColumns.add("this['" + property.getProperty() + "'] \"" + alias + "\"");
// 'message',func(),this[name]
if ((property.getProperty().startsWith("'") && property.getProperty().endsWith("'"))
||
property.getProperty().contains("(") || property.getProperty().contains("[")) {
newColumns.add(property.getProperty() + "\"" + alias + "\"");
} else {
newColumns.add("this['" + property.getProperty() + "'] \"" + alias + "\"");
}
}
if (newColumns.size() > 3) {
sql = "select \n\t" + String.join("\n\t,", newColumns) + "\n from (\n\t" + sql + "\n) t";
}
}
log.debug("create device alarm sql : \n{}", sql);
return ReactorQL.builder().sql(sql).build();
}
@ -157,9 +195,41 @@ public class DeviceAlarmRuleNode extends CommonExecutableRuleNodeFactoryStrategy
);
binds.forEach(context::bind);
return (ql == null ? ql = createQL() : ql)
Flux<Map<String, Object>> resultFlux = (ql == null ? ql = createQL(rule) : ql)
.start(context)
.map(ReactorQLRecord::asMap)
.map(ReactorQLRecord::asMap);
DeviceAlarmRule.ShakeLimit shakeLimit;
if ((shakeLimit = rule.getShakeLimit()) != null
&& shakeLimit.isEnabled()
&& shakeLimit.getTime() > 0) {
int thresholdNumber = shakeLimit.getThreshold();
//打开时间窗口
Flux<Flux<Map<String, Object>>> window = resultFlux.window(Duration.ofSeconds(shakeLimit.getTime()));
Function<Flux<Tuple2<Long, Map<String, Object>>>, Publisher<Tuple2<Long, Map<String, Object>>>> mapper =
shakeLimit.isAlarmFirst()
?
group -> group
.takeUntil(tp -> tp.getT1() >= thresholdNumber)
.take(1)
.singleOrEmpty()
:
group -> group.takeLast(1).singleOrEmpty();
resultFlux = window
.flatMap(group -> group
.index((index, data) -> Tuples.of(index + 1, data))
.transform(mapper)
.filter(tp -> tp.getT1() >= thresholdNumber) //超过阈值告警
.map(tp2 -> {
tp2.getT2().put("totalAlarms", tp2.getT1());
return tp2.getT2();
}));
}
return resultFlux
.flatMap(map -> {
map.put("productId", rule.getProductId());
map.put("alarmId", rule.getId());
@ -192,15 +262,5 @@ public class DeviceAlarmRuleNode extends CommonExecutableRuleNodeFactoryStrategy
.then(Mono.just(map));
});
}
@Override
public NodeType getNodeType() {
return NodeType.MAP;
}
@Override
public void setNodeType(NodeType nodeType) {
}
}
}

View File

@ -7,7 +7,6 @@ import org.hswebframework.ezorm.rdb.mapping.annotation.EnumCodec;
import org.hswebframework.web.api.crud.entity.GenericEntity;
import org.hswebframework.web.api.crud.entity.RecordCreationEntity;
import org.jetlinks.community.rule.engine.enums.RuleInstanceState;
import org.jetlinks.rule.engine.api.Rule;
import org.jetlinks.rule.engine.api.model.RuleEngineModelParser;
import org.jetlinks.rule.engine.api.model.RuleModel;
import org.springframework.util.StringUtils;
@ -63,14 +62,11 @@ public class RuleInstanceEntity extends GenericEntity<String> implements RecordC
private String instanceDetailJson;
public Rule toRule(RuleEngineModelParser parser) {
public RuleModel toRule(RuleEngineModelParser parser) {
RuleModel model = parser.parse(modelType, modelMeta);
model.setId(StringUtils.hasText(modelId)?modelId:getId());
model.setName(name);
Rule rule = new Rule();
rule.setModel(model);
rule.setVersion(modelVersion);
rule.setId(getId());
return rule;
return model;
}
}

View File

@ -2,9 +2,7 @@ package org.jetlinks.community.rule.engine.model;
import lombok.Getter;
import lombok.Setter;
import org.jetlinks.rule.engine.api.executor.RuleNodeConfiguration;
import org.jetlinks.rule.engine.api.model.RuleNodeModel;
import org.jetlinks.rule.engine.executor.ExecutableRuleNodeFactoryStrategy;
import java.io.Serializable;
import java.util.Map;
@ -18,7 +16,6 @@ public class Action implements Serializable {
* 执行器
*
* @see RuleNodeModel#getExecutor()
* @see ExecutableRuleNodeFactoryStrategy#getSupportType()
*/
private String executor;
@ -26,7 +23,6 @@ public class Action implements Serializable {
* 执行器配置
*
* @see RuleNodeModel#getConfiguration()
* @see RuleNodeConfiguration
*/
private Map<String, Object> configuration;
}

View File

@ -3,11 +3,10 @@ package org.jetlinks.community.rule.engine.model;
import com.alibaba.fastjson.JSON;
import org.apache.commons.collections.CollectionUtils;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.community.rule.engine.executor.DeviceMessageSendTaskExecutorProvider;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.community.rule.engine.device.DeviceAlarmRule;
import org.jetlinks.community.rule.engine.entity.DeviceAlarmEntity;
import org.jetlinks.community.rule.engine.nodes.DeviceMessageSendNode;
import org.jetlinks.rule.engine.api.cluster.RunMode;
import org.jetlinks.rule.engine.api.model.RuleLink;
import org.jetlinks.rule.engine.api.model.RuleModel;
import org.jetlinks.rule.engine.api.model.RuleNodeModel;
@ -37,7 +36,6 @@ public class DeviceAlarmModelParser implements RuleModelParserStrategy {
RuleModel model = new RuleModel();
model.setId("device_alarm:".concat(rule.getId()));
model.setName(rule.getName());
model.setRunMode(RunMode.CLUSTER);
DeviceAlarmRule alarmRule = rule.getAlarmRule();
alarmRule.validate();
@ -59,7 +57,7 @@ public class DeviceAlarmModelParser implements RuleModelParserStrategy {
timer.setExecutor("timer");
timer.setConfiguration(Collections.singletonMap("cron", timerTrigger.getCron()));
DeviceMessageSendNode.Config senderConfig = new DeviceMessageSendNode.Config();
DeviceMessageSendTaskExecutorProvider.Config senderConfig = new DeviceMessageSendTaskExecutorProvider.Config();
senderConfig.setAsync(true);
senderConfig.setDeviceId(alarmRule.getDeviceId());
senderConfig.setProductId(alarmRule.getProductId());

View File

@ -3,7 +3,7 @@ package org.jetlinks.community.rule.engine.model;
import com.alibaba.fastjson.JSON;
import org.jetlinks.community.rule.engine.enums.SqlRuleType;
import org.jetlinks.community.rule.engine.ql.SqlRule;
import org.jetlinks.rule.engine.api.events.RuleEvent;
import org.jetlinks.rule.engine.api.RuleConstants;
import org.jetlinks.rule.engine.api.model.RuleLink;
import org.jetlinks.rule.engine.api.model.RuleModel;
import org.jetlinks.rule.engine.api.model.RuleNodeModel;
@ -59,7 +59,7 @@ public class SqlRuleModelParser implements RuleModelParserStrategy {
link.setId(action.getId().concat(":").concat(action.getId()));
link.setName("错误处理:" + index);
link.setSource(sqlNode);
link.setType(RuleEvent.NODE_EXECUTE_FAIL);
link.setType(RuleConstants.Event.error);
link.setTarget(action);
errorHandler.add(link);
model.getNodes().add(action);

View File

@ -1,361 +0,0 @@
package org.jetlinks.community.rule.engine.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.utils.StringUtils;
import org.hswebframework.web.exception.NotFoundException;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.core.cluster.ClusterQueue;
import org.jetlinks.core.cluster.ClusterTopic;
import org.jetlinks.rule.engine.api.*;
import org.jetlinks.rule.engine.api.executor.*;
import org.jetlinks.rule.engine.api.model.Condition;
import org.jetlinks.rule.engine.api.model.NodeType;
import org.jetlinks.rule.engine.api.model.RuleEngineModelParser;
import org.jetlinks.rule.engine.cluster.logger.ClusterLogger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@Slf4j
@Service
public class RuleEngineDebugService {
@Autowired
private ExecutableRuleNodeFactory executableRuleNodeFactory;
@Autowired
private RuleEngineModelParser modelParser;
@Autowired
private ConditionEvaluator conditionEvaluator;
private Map<String, Session> sessionStore = new ConcurrentHashMap<>();
public Flux<DebugMessage> getDebugMessages(String sessionId) {
return getSession(sessionId)
.consumeOutPut();
}
private Session getSession(String id) {
return Optional.ofNullable(id)
.map(sessionStore::get)
.orElseThrow(() -> new NotFoundException("session不存在"));
}
public Mono<String> startSession() {
return Mono.fromSupplier(() -> {
String sessionId = IDGenerator.UUID.generate();
Session session = new Session(sessionId);
session.local = true;
sessionStore.put(sessionId, session);
return sessionId;
});
}
@SneakyThrows
public String startNode(String sessionId, RuleNodeConfiguration configuration) {
configuration.setNodeType(NodeType.MAP);
Session session = getSession(sessionId);
DebugExecutionContext context = session.createContext(configuration);
ExecutableRuleNode ruleNode = executableRuleNodeFactory.create(configuration);
ruleNode.start(context);
return context.id;
}
public void sendData(String sessionId, String contextId, RuleData ruleData) {
getSession(sessionId)
.getContext(contextId)
.execute(ruleData);
}
public Mono<Boolean> stopContext(String sessionId, String contextId) {
return Mono.fromRunnable(() -> getSession(sessionId).stopContext(contextId))
.thenReturn(true);
}
public Set<String> getAllContext(String sessionId) {
return getSession(sessionId)
.contexts
.keySet();
}
public Mono<Boolean> closeSession(String sessionId) {
return Mono.fromRunnable(() -> getSession(sessionId).close());
}
public Mono<Boolean> testCondition(String sessionId, Condition condition, Object data) {
Session session = getSession(sessionId);
try {
boolean success = conditionEvaluator.evaluate(condition, RuleData.create(data));
return session.writeMessage(DebugMessage.of("output", null, "测试条件:".concat(success ? "通过" : "未通过")));
} catch (Exception e) {
return session.writeMessage(DebugMessage.of("error", null, StringUtils.throwable2String(e)));
}
}
private class Session {
private String id;
private long lastOperationTime;
private Map<String, DebugExecutionContext> contexts = new ConcurrentHashMap<>();
private Map<String, RuleInstanceContext> instanceContext = new ConcurrentHashMap<>();
private Map<String, String> instanceContextMapping = new ConcurrentHashMap<>();
private EmitterProcessor<DebugMessage> messageQueue = EmitterProcessor.create(false);
@Getter
private boolean local = false;
private Session(String id) {
this.lastOperationTime = System.currentTimeMillis();
this.id = id;
}
private boolean isTimeout() {
return System.currentTimeMillis() - lastOperationTime > TimeUnit.MINUTES.toMillis(15);
}
private void checkContextTimeout() {
contexts.entrySet()
.stream()
.filter(e -> e.getValue().isTimeout())
.map(Map.Entry::getKey)
.map(contexts::remove)
.forEach(DebugExecutionContext::stop);
}
private void stopContext(String contextId) {
Optional.ofNullable(contexts.remove(contextId))
.ifPresent(ExecutionContext::stop);
}
private Logger createLogger(String contextId, String nodeId) {
ClusterLogger logger = new ClusterLogger();
logger.setParent(new Slf4jLogger("rule.engine.debug.".concat(nodeId)));
logger.setNodeId(nodeId);
logger.setInstanceId(contextId);
logger.setLogInfoConsumer(logInfo -> {
Map<String, Object> data = new HashMap<>();
data.put("level", logInfo.getLevel());
data.put("message", logInfo.getMessage());
writeMessage(DebugMessage.of("log", contextId, data))
.subscribe();
});
return logger;
}
private DebugExecutionContext createContext(RuleNodeConfiguration configuration) {
lastOperationTime = System.currentTimeMillis();
String id = Optional.ofNullable(configuration.getId()).orElseGet(IDGenerator.MD5::generate);
DebugExecutionContext context = contexts.get(id);
if (context != null) {
context.stop();
contexts.remove(id);
}
context = new DebugExecutionContext(id, createLogger(id, configuration.getNodeId()), this);
context.local = true;
contexts.put(id, context);
return context;
}
private DebugExecutionContext getContext(String id) {
lastOperationTime = System.currentTimeMillis();
return contexts.computeIfAbsent(id, _id -> new DebugExecutionContext(id, new Slf4jLogger("rule.engine.debug.none"), this));
}
private void execute(RuleData ruleData) {
String instanceId = ruleData.getAttribute("instanceId").map(String::valueOf).orElse(null);
RuleInstanceContext context = instanceContext.get(instanceId);
if (context != null) {
doExecute(context, ruleData);
}
}
private void doExecute(RuleInstanceContext context, RuleData ruleData) {
context.execute(Mono.just(ruleData))
.doOnError((throwable) -> {
writeMessage(DebugMessage.of("error", context.getId(), "执行规则失败:" + StringUtils.throwable2String(throwable)));
})
.subscribe(resp -> {
writeMessage(DebugMessage.of("output", context.getId(), resp.getData()));
});
}
private void execute(ExecuteRuleRequest request) {
RuleInstanceContext context = instanceContext.get(request.getContextId());
if (context == null) {
return;
}
RuleData ruleData = RuleData.create(request.getData());
RuleDataHelper.markStartWith(ruleData, request.getStartWith());
RuleDataHelper.markSyncReturn(ruleData, request.getEndWith());
ruleData.setAttribute("debugSessionId", id);
ruleData.setAttribute("instanceId", request.getContextId());
doExecute(context, ruleData);
}
private Mono<Boolean> writeMessage(DebugMessage message) {
lastOperationTime = System.currentTimeMillis();
return Mono.fromRunnable(() -> messageQueue.onNext(message))
.thenReturn(true);
}
@SneakyThrows
public Flux<DebugMessage> consumeOutPut() {
return messageQueue
.map(Function.identity());
}
public void close() {
contexts.forEach((s, context) -> context.stop());
instanceContext.values().forEach(RuleInstanceContext::stop);
instanceContext.clear();
instanceContextMapping.clear();
messageQueue.onComplete();
}
}
private class DebugExecutionContext implements ExecutionContext {
private Session session;
private String id;
private EmitterProcessor<RuleData> inputQueue = EmitterProcessor.create(false);
private Logger logger;
private List<Runnable> stopListener = new CopyOnWriteArrayList<>();
private long lastOperationTime = System.currentTimeMillis();
@Getter
private boolean local = false;
public DebugExecutionContext(String id, Logger logger, Session session) {
this.session = session;
this.logger = logger;
this.id = id;
}
public boolean isTimeout() {
return System.currentTimeMillis() - lastOperationTime > TimeUnit.MINUTES.toMillis(15);
}
@Override
public String getInstanceId() {
return id;
}
@Override
public String getNodeId() {
return id;
}
@Override
public Logger logger() {
return logger;
}
public void execute(RuleData ruleData) {
lastOperationTime = System.currentTimeMillis();
ruleData.setAttribute("debug", true);
inputQueue.onNext(ruleData);
}
@Override
public Input getInput() {
return new Input() {
@Override
public Flux<RuleData> subscribe() {
return inputQueue.map(Function.identity())
.doOnNext(data -> {
log.debug("handle input :{}", data);
})
.doFinally(s -> {
log.debug("unsubscribe input:{}", id);
});
}
@Override
public void close() {
inputQueue.onComplete();
}
};
}
@Override
public Output getOutput() {
return (data) -> Flux.from(data)
.flatMap(d -> session.writeMessage(DebugMessage.of("output", id, JSON.toJSONString(d.getData(), SerializerFeature.PrettyFormat))))
.then(Mono.just(true));
}
@Override
public Mono<Void> fireEvent(String event, RuleData data) {
return Mono.empty();
}
@Override
public Mono<Void> onError(RuleData data, Throwable e) {
return session
.writeMessage(DebugMessage.of("error", id, StringUtils.throwable2String(e)))
.then();
}
@Override
public void stop() {
stopListener.forEach(Runnable::run);
}
@Override
public void onStop(Runnable runnable) {
stopListener.add(runnable);
}
}
}

View File

@ -4,16 +4,15 @@ import lombok.extern.slf4j.Slf4j;
import org.hswebframework.ezorm.core.param.QueryParam;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.crud.service.GenericReactiveCrudService;
import org.jetlinks.community.rule.engine.entity.RuleEngineExecuteLogInfo;
import org.jetlinks.community.rule.engine.entity.RuleEngineExecuteEventInfo;
import org.jetlinks.community.rule.engine.event.handler.RuleEngineLoggerIndexProvider;
import org.jetlinks.community.elastic.search.service.ElasticSearchService;
import org.jetlinks.community.rule.engine.enums.RuleInstanceState;
import org.jetlinks.community.rule.engine.entity.RuleEngineExecuteEventInfo;
import org.jetlinks.community.rule.engine.entity.RuleEngineExecuteLogInfo;
import org.jetlinks.community.rule.engine.entity.RuleInstanceEntity;
import org.jetlinks.rule.engine.api.Rule;
import org.jetlinks.community.rule.engine.enums.RuleInstanceState;
import org.jetlinks.community.rule.engine.event.handler.RuleEngineLoggerIndexProvider;
import org.jetlinks.rule.engine.api.RuleEngine;
import org.jetlinks.rule.engine.api.RuleInstanceContext;
import org.jetlinks.rule.engine.api.model.RuleEngineModelParser;
import org.jetlinks.rule.engine.api.model.RuleModel;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
@ -44,30 +43,29 @@ public class RuleInstanceService extends GenericReactiveCrudService<RuleInstance
public Mono<Void> stop(String id) {
return this.ruleEngine
.getInstance(id)
.flatMap(RuleInstanceContext::stop)
.switchIfEmpty(Mono.empty())
.then(createUpdate()
.set(RuleInstanceEntity::getState, RuleInstanceState.stopped)
.where(RuleInstanceEntity::getId,id)
.execute())
.then();
.shutdown(id)
.then(createUpdate()
.set(RuleInstanceEntity::getState, RuleInstanceState.stopped)
.where(RuleInstanceEntity::getId, id)
.execute())
.then();
}
public Mono<RuleInstanceContext> start(String id) {
public Mono<Void> start(String id) {
return findById(Mono.just(id))
.flatMap(this::doStart);
.flatMapMany(instance -> this.ruleEngine.startRule(id, instance.toRule(modelParser)))
.then();
}
private Mono<RuleInstanceContext> doStart(RuleInstanceEntity entity) {
private Mono<Void> doStart(RuleInstanceEntity entity) {
return Mono.defer(() -> {
Rule rule = entity.toRule(modelParser);
return ruleEngine.startRule(rule)
.flatMap(ctx -> createUpdate()
.set(RuleInstanceEntity::getState, RuleInstanceState.started)
.where(entity::getId)
.execute()
.thenReturn(ctx));
RuleModel model = entity.toRule(modelParser);
return ruleEngine
.startRule(entity.getId(), model)
.then(createUpdate()
.set(RuleInstanceEntity::getState, RuleInstanceState.started)
.where(entity::getId)
.execute()).then();
});
}
@ -81,12 +79,10 @@ public class RuleInstanceService extends GenericReactiveCrudService<RuleInstance
@Override
public void run(String... args) {
createQuery()
.where()
.is(RuleInstanceEntity::getState, RuleInstanceState.started)
.fetch()
.flatMap(this::doStart)
.subscribe(context -> {
log.debug("start rule {}", context.getId());
});
.where()
.is(RuleInstanceEntity::getState, RuleInstanceState.started)
.fetch()
.flatMap(this::doStart)
.subscribe();
}
}

View File

@ -1,80 +0,0 @@
package org.jetlinks.community.rule.engine.web;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.hswebframework.web.authorization.annotation.Authorize;
import org.hswebframework.web.authorization.annotation.Resource;
import org.jetlinks.community.rule.engine.service.DebugMessage;
import org.jetlinks.community.rule.engine.service.RuleEngineDebugService;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.executor.RuleNodeConfiguration;
import org.jetlinks.rule.engine.api.model.Condition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RestController
@RequestMapping("/rule-engine/debug")
@Authorize
@Resource(id="rule-model",name = "规则引擎-模型")
public class RuleEngineDebugController {
@Autowired
private RuleEngineDebugService ruleDebugService;
@PostMapping
public Mono<String> startSession() {
return ruleDebugService.startSession();
}
@PostMapping("/{sessionId}")
public Mono<String> startNode(@PathVariable String sessionId, @RequestBody RuleNodeConfiguration configuration) {
return Mono.just(ruleDebugService.startNode(sessionId, configuration));
}
@GetMapping(value = "/{sessionId}/logs", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<DebugMessage> getSessionLogs(@PathVariable String sessionId) {
return ruleDebugService.getDebugMessages(sessionId);
}
@PostMapping("/{sessionId}/{contextId}")
public Mono<String> executeNode(@PathVariable String sessionId,
@PathVariable String contextId,
@RequestBody String data) {
Object object = data.trim();
if (data.startsWith("[") || data.startsWith("{")) {
object = JSON.parse(data);
}
RuleData ruleData = RuleData.create(object);
ruleDebugService.sendData(sessionId, contextId, ruleData);
return Mono.just(ruleData.getId());
}
@GetMapping("/{sessionId}/contexts")
public Flux<String> getSessionContexts(@PathVariable String sessionId) {
return Flux.fromIterable(ruleDebugService.getAllContext(sessionId));
}
@PostMapping("/{sessionId}/condition")
public Mono<Boolean> testCondition(@PathVariable String sessionId, @RequestBody JSONObject data) {
return ruleDebugService.testCondition(sessionId, data.getJSONObject("condition").toJavaObject(Condition.class), data.get("data"));
}
@DeleteMapping("/{sessionId}")
public Mono<Boolean> stopSession(@PathVariable String sessionId) {
return ruleDebugService.closeSession(sessionId);
}
@DeleteMapping("/{sessionId}/{contextId}")
public Mono<Boolean> stopContext(@PathVariable String sessionId, @PathVariable String contextId) {
return ruleDebugService.stopContext(sessionId, contextId);
}
}

View File

@ -7,18 +7,12 @@ import org.hswebframework.web.authorization.annotation.Resource;
import org.hswebframework.web.authorization.annotation.ResourceAction;
import org.hswebframework.web.crud.service.ReactiveCrudService;
import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
import org.hswebframework.web.exception.NotFoundException;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.community.rule.engine.entity.RuleEngineExecuteLogInfo;
import org.jetlinks.community.rule.engine.entity.RuleEngineExecuteEventInfo;
import org.jetlinks.community.rule.engine.entity.RuleEngineExecuteLogInfo;
import org.jetlinks.community.rule.engine.entity.RuleInstanceEntity;
import org.jetlinks.community.rule.engine.service.RuleInstanceService;
import org.jetlinks.rule.engine.api.DefaultRuleData;
import org.jetlinks.rule.engine.api.RuleDataHelper;
import org.jetlinks.rule.engine.api.RuleEngine;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RestController
@ -29,8 +23,6 @@ public class RuleInstanceController implements ReactiveServiceCrudController<Rul
@Autowired
private RuleInstanceService instanceService;
@Autowired
private RuleEngine ruleEngine;
@PostMapping("/{id}/_start")
@ResourceAction(id = "start", name = "启动")
@ -66,25 +58,6 @@ public class RuleInstanceController implements ReactiveServiceCrudController<Rul
}
@PostMapping("/{id}/_execute/{startWith}/{endWith}")
@ResourceAction(id = "execute", name = "执行")
public Flux<Object> execute(@PathVariable String id,
@PathVariable String startWith,
@PathVariable String endWith,
@RequestBody Flux<DefaultRuleData> payload) {
return ruleEngine
.getInstance(id)
.switchIfEmpty(Mono.error(NotFoundException::new))
.flatMapMany(context -> context
.execute(payload
.map(ruleData -> {
ruleData.setId(IDGenerator.SNOW_FLAKE_STRING.generate());
RuleDataHelper.markStartWith(ruleData, startWith);
return RuleDataHelper.markSyncReturn(ruleData, endWith);
})
));
}
@Override
public ReactiveCrudService<RuleInstanceEntity, String> getService() {
return instanceService;

View File

@ -7,12 +7,15 @@ import org.hswebframework.web.crud.service.ReactiveCrudService;
import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
import org.jetlinks.community.rule.engine.entity.RuleModelEntity;
import org.jetlinks.community.rule.engine.service.RuleModelService;
import org.jetlinks.rule.engine.api.executor.ExecutableRuleNodeFactory;
import org.jetlinks.rule.engine.api.RuleEngine;
import org.jetlinks.rule.engine.api.worker.Worker;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.function.Function;
@RestController
@RequestMapping("rule-engine/model")
@Resource(id = "rule-model", name = "规则引擎-模型")
@ -22,7 +25,7 @@ public class RuleModelController implements ReactiveServiceCrudController<RuleMo
private RuleModelService ruleModelService;
@Autowired
private ExecutableRuleNodeFactory factory;
private RuleEngine ruleEngine;
@Override
public ReactiveCrudService<RuleModelEntity, String> getService() {
@ -39,6 +42,6 @@ public class RuleModelController implements ReactiveServiceCrudController<RuleMo
@GetMapping("/executors")
@QueryAction
public Flux<String> getAllSupportExecutors() {
return Flux.fromIterable(factory.getAllSupportExecutor());
return ruleEngine.getWorkers().flatMap(Worker::getSupportExecutors).flatMapIterable(Function.identity());
}
}

View File

@ -8,6 +8,7 @@ import org.hswebframework.web.loggin.aop.EnableAccessLogger;
import org.hswebframework.web.logging.events.AccessLoggerAfterEvent;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.elasticsearch.rest.RestClientAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Profile;
@ -18,7 +19,8 @@ import javax.annotation.PostConstruct;
@SpringBootApplication(scanBasePackages = "org.jetlinks.community", exclude = {
DataSourceAutoConfiguration.class
DataSourceAutoConfiguration.class,
RestClientAutoConfiguration.class
})
@EnableCaching
@EnableEasyormRepository("org.jetlinks.community.**.entity")

View File

@ -10,6 +10,8 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import java.util.concurrent.*;
@ -34,6 +36,11 @@ public class ExecutorConfiguration {
};
}
@Bean
public Scheduler reactorScheduler(ScheduledExecutorService executorService) {
return Schedulers.fromExecutorService(executorService);
}
@Bean
@ConfigurationProperties(prefix = "jetlinks.executor")
@Primary

View File

@ -0,0 +1,20 @@
package org.jetlinks.community.standalone.web;
import org.hswebframework.web.authorization.annotation.Authorize;
import org.jetlinks.community.Version;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@RequestMapping("/system")
@RestController
public class SystemInfoController {
@GetMapping("/version")
@Authorize(ignore = true)
public Mono<Version> getVersion() {
return Mono.just(Version.current);
}
}

37
pom.xml
View File

@ -16,19 +16,20 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.build.locales>zh_CN</project.build.locales>
<spring.boot.version>2.2.6.RELEASE</spring.boot.version>
<spring.boot.version>2.2.8.RELEASE</spring.boot.version>
<java.version>1.8</java.version>
<project.build.jdk>${java.version}</project.build.jdk>
<hsweb.framework.version>4.0.3</hsweb.framework.version>
<easyorm.version>4.0.3</easyorm.version>
<hsweb.framework.version>4.0.4-SNAPSHOT</hsweb.framework.version>
<easyorm.version>4.0.4-SNAPSHOT</easyorm.version>
<hsweb.expands.version>3.0.2</hsweb.expands.version>
<jetlinks.version>1.0.4-SNAPSHOT</jetlinks.version>
<jetlinks.version>1.1.0-SNAPSHOT</jetlinks.version>
<r2dbc.version>Arabba-SR3</r2dbc.version>
<vertx.version>3.8.5</vertx.version>
<netty.version>4.1.46.Final</netty.version>
<netty.version>4.1.50.Final</netty.version>
<elasticsearch.version>6.8.6</elasticsearch.version>
<reactor.excel.version>1.0-RC</reactor.excel.version>
<reactor.ql.version>1.0.0</reactor.ql.version>
<fastjson.version>1.2.70</fastjson.version>
</properties>
<build>
@ -150,8 +151,22 @@
<version>3.0.0</version>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>build</id>
<repositories>
<repository>
<id>maven-central</id>
<name>central</name>
<url>https://repo1.maven.org/maven2/</url>
</repository>
</repositories>
</profile>
</profiles>
<dependencyManagement>
<dependencies>
@ -182,7 +197,7 @@
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.56</version>
<version>${fastjson.version}</version>
</dependency>
<dependency>
@ -374,16 +389,6 @@
<url>https://repo.spring.io/milestone</url>
</repository>
<!-- <repository>-->
<!-- <id>sonatype-snapshots</id>-->
<!-- <name>Nexus Snapshot Repository</name>-->
<!-- <url>https://oss.sonatype.org/content/repositories/snapshots</url>-->
<!-- <snapshots>-->
<!-- <enabled>true</enabled>-->
<!-- <updatePolicy>daily</updatePolicy>-->
<!-- </snapshots>-->
<!-- </repository>-->
</repositories>
<distributionManagement>