优化设备告警规则

This commit is contained in:
zhouhao 2020-04-10 22:56:25 +08:00
parent d5cdff3a9b
commit 57a3876ccb
3 changed files with 128 additions and 56 deletions

View File

@ -10,6 +10,7 @@ import org.jetlinks.core.message.property.ReadPropertyMessage;
import org.jetlinks.rule.engine.api.executor.RuleNodeConfiguration;
import org.jetlinks.rule.engine.api.model.RuleNodeModel;
import org.jetlinks.rule.engine.executor.ExecutableRuleNodeFactoryStrategy;
import org.springframework.scheduling.support.CronSequenceGenerator;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
@ -19,7 +20,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* 设备警规则
* 设备警规则
*
* @author zhouhao
* @since 1.1
@ -27,6 +28,7 @@ import java.util.stream.Stream;
@Getter
@Setter
public class DeviceAlarmRule implements Serializable {
private static final long serialVersionUID = -1L;
/**
* 规则ID
@ -59,50 +61,45 @@ public class DeviceAlarmRule implements Serializable {
private String deviceName;
/**
* 类型类型,属性或者事件.
* 触发条件,不能为空
*/
private MessageType type;
private List<Trigger> triggers;
/**
* 要单独获取哪些字段信息
*/
private List<Property> properties;
/**
* 执行条件
*/
private List<Condition> conditions;
/**
* 警告发生后的操作,指向其他规则节点,如发送消息通知.
*/
private List<Operation> operations;
private List<Action> actions;
public void validate() {
if (org.apache.commons.collections.CollectionUtils.isEmpty(getConditions())) {
throw new IllegalArgumentException("conditions不能为空");
if (org.apache.commons.collections.CollectionUtils.isEmpty(getTriggers())) {
throw new IllegalArgumentException("触发条件不能为空");
}
getTriggers().forEach(Trigger::validate);
}
public List<String> getPlainColumns() {
Stream<String> conditionColumns = conditions
Stream<String> conditionColumns = triggers
.stream()
.map(condition -> condition.getColumn(type));
.flatMap(trigger -> trigger.getColumns().stream());
if (CollectionUtils.isEmpty(properties)) {
return conditionColumns.collect(Collectors.toList());
}
return Stream.concat(conditionColumns, properties
.stream()
.map(property -> type.getPropertyPrefix() + property.toString()))
.map(Property::toString))
.collect(Collectors.toList());
}
@Getter
@Setter
public static class Operation implements Serializable {
public static class Action implements Serializable {
/**
* 执行器
@ -146,12 +143,13 @@ public class DeviceAlarmRule implements Serializable {
}
@Override
public Optional<DeviceMessage> createMessage(Condition condition) {
public Optional<DeviceMessage> createMessage(Trigger trigger) {
ReadPropertyMessage readPropertyMessage = new ReadPropertyMessage();
readPropertyMessage.setProperties(new ArrayList<>(
StringUtils.hasText(trigger.getModelId())
? Collections.singletonList(trigger.getModelId())
: Collections.emptyList()));
String property = StringUtils.hasText(condition.getModelId()) ? condition.getModelId() : condition.getKey();
readPropertyMessage.setProperties(new ArrayList<>(Collections.singletonList(property)));
return Optional.of(readPropertyMessage);
}
},
@ -170,10 +168,10 @@ public class DeviceAlarmRule implements Serializable {
}
@Override
public Optional<DeviceMessage> createMessage(Condition condition) {
public Optional<DeviceMessage> createMessage(Trigger trigger) {
FunctionInvokeMessage message = new FunctionInvokeMessage();
message.setFunctionId(condition.getModelId());
message.setInputs(condition.getParameters());
message.setFunctionId(trigger.getModelId());
message.setInputs(trigger.getParameters());
message.setTimestamp(System.currentTimeMillis());
return Optional.of(message);
}
@ -185,20 +183,17 @@ public class DeviceAlarmRule implements Serializable {
public abstract String getTopic(String productId, String deviceId, String key);
public Optional<DeviceMessage> createMessage(Condition condition) {
public Optional<DeviceMessage> createMessage(Trigger trigger) {
return Optional.empty();
}
}
@Getter
@AllArgsConstructor
public enum ConditionType implements Serializable {
public enum TriggerType implements Serializable {
//设备消息
message(Arrays.asList(
MessageType.online,
MessageType.offline,
MessageType.properties,
MessageType.event
device(Arrays.asList(
MessageType.values()
)),
//定时,定时获取只支持获取设备属性和调用功能.
timer(Arrays.asList(
@ -212,20 +207,80 @@ public class DeviceAlarmRule implements Serializable {
@Getter
@Setter
public static class Condition implements Serializable {
public static class Trigger implements Serializable {
//条件类型,定时
private ConditionType trigger = ConditionType.message;
//触发方式,定时,设备
private TriggerType trigger = TriggerType.device;
//trigger为定时任务时的cron表达式
private String cron;
//类型,属性或者事件.
private MessageType type;
//trigger为定时任务并且消息类型为功能调用时
private List<FunctionParameter> parameters;
//物模型属性或者事件的标识 : fire_alarm
private String modelId;
//过滤条件
private List<ConditionFilter> filters;
public Set<String> getColumns() {
return filters == null
? Collections.emptySet()
: filters.stream()
.map(filter -> filter.getColumn(type))
.collect(Collectors.toSet());
}
public List<Object> getFilterValues() {
return filters == null ? Collections.emptyList() :
filters.stream()
.map(ConditionFilter::convertValue)
.collect(Collectors.toList());
}
public Optional<String> createExpression() {
if (CollectionUtils.isEmpty(filters)) {
return Optional.empty();
}
return Optional.of(
filters.stream()
.map(filter -> filter.createExpression(type))
.collect(Collectors.joining(" and "))
);
}
public void validate() {
if (type == null) {
throw new IllegalArgumentException("类型不能为空");
}
if (type != MessageType.online && type != MessageType.offline && StringUtils.isEmpty(modelId)) {
throw new IllegalArgumentException("属性/事件/功能ID不能为空");
}
if (trigger == TriggerType.timer) {
if (StringUtils.isEmpty(cron)) {
throw new IllegalArgumentException("cron表达式不能为空");
}
try {
new CronSequenceGenerator(cron);
} catch (Exception e) {
throw new IllegalArgumentException("cron表达式格式错误", e);
}
}
if (!CollectionUtils.isEmpty(filters)) {
filters.forEach(ConditionFilter::validate);
}
}
}
@Getter
@Setter
public static class ConditionFilter implements Serializable {
//过滤条件key : temperature
private String key;
@ -240,12 +295,25 @@ public class DeviceAlarmRule implements Serializable {
}
public String createExpression(MessageType type) {
//函数和this忽略前缀
if (key.contains("(") || key.startsWith("this")) {
return key;
}
return type.getPropertyPrefix() + (key.trim()) + " " + operator.symbol + " ? ";
}
public Object convertValue(){
public Object convertValue() {
return operator.convert(value);
}
public void validate() {
if (StringUtils.isEmpty(key)) {
throw new IllegalArgumentException("条件key不能为空");
}
if (StringUtils.isEmpty(value)) {
throw new IllegalArgumentException("条件值不能为空");
}
}
}

View File

@ -35,13 +35,13 @@ public class DeviceAlarmRuleNode extends CommonExecutableRuleNodeFactoryStrategy
private final MessageGateway messageGateway;
@Override
public Function<RuleData, ? extends Publisher<?>> createExecutor(ExecutionContext context, org.jetlinks.community.rule.engine.device.DeviceAlarmRuleNode.Config config) {
public Function<RuleData, ? extends Publisher<?>> createExecutor(ExecutionContext context, DeviceAlarmRuleNode.Config config) {
return Mono::just;
}
@Override
protected void onStarted(ExecutionContext context, org.jetlinks.community.rule.engine.device.DeviceAlarmRuleNode.Config config) {
protected void onStarted(ExecutionContext context, DeviceAlarmRuleNode.Config config) {
context.onStop(
config.doSubscribe(messageGateway)
.flatMap(result -> {
@ -65,14 +65,14 @@ public class DeviceAlarmRuleNode extends CommonExecutableRuleNodeFactoryStrategy
@Setter
public static class Config implements RuleNodeConfig {
static List<String> default_columns = Arrays.asList(
"timestamp", "deviceId"
"timestamp", "deviceId", "this.header.deviceName deviceName"
);
private DeviceAlarmRule rule;
@Override
public void validate() {
if (CollectionUtils.isEmpty(rule.getConditions())) {
if (CollectionUtils.isEmpty(rule.getTriggers())) {
throw new IllegalArgumentException("预警条件不能为空");
}
}
@ -82,12 +82,16 @@ public class DeviceAlarmRuleNode extends CommonExecutableRuleNodeFactoryStrategy
List<String> wheres = new ArrayList<>();
columns.addAll(rule.getPlainColumns());
for (DeviceAlarmRule.Condition condition : rule.getConditions()) {
wheres.add(condition.createExpression(rule.getType()));
for (DeviceAlarmRule.Trigger trigger : rule.getTriggers()) {
trigger.createExpression()
.ifPresent(expr -> wheres.add("(" + expr + ")"));
}
String sql = "select " + String.join(",", columns) +
" from msg where " + String.join(" or ", wheres);
String sql = "select " + String.join(",", columns) + " from msg ";
if (!wheres.isEmpty()) {
sql = "where " + String.join(" or ", wheres);
}
log.debug("create device alarm sql : {}", sql);
return ReactorQL.builder().sql(sql).build();
@ -98,10 +102,10 @@ public class DeviceAlarmRuleNode extends CommonExecutableRuleNodeFactoryStrategy
List<Object> binds = new ArrayList<>();
for (DeviceAlarmRule.Condition condition : rule.getConditions()) {
String topic = rule.getType().getTopic(rule.getProductId(), rule.getDeviceId(), condition.getModelId());
for (DeviceAlarmRule.Trigger trigger : rule.getTriggers()) {
String topic = trigger.getType().getTopic(rule.getProductId(), rule.getDeviceId(), trigger.getModelId());
topics.add(topic);
binds.add(condition.convertValue());
binds.add(trigger.getFilterValues());
}
List<Subscription> subscriptions = topics.stream().map(Subscription::new).collect(Collectors.toList());
@ -147,15 +151,15 @@ public class DeviceAlarmRuleNode extends CommonExecutableRuleNodeFactoryStrategy
map.putIfAbsent("productName", map.get("productId"));
}
if (log.isDebugEnabled()) {
log.debug("发生设备警:{}", map);
log.debug("发生设备警:{}", map);
}
// 推送告到消息网关中
// 推送警信息到消息网关中
// /rule-engine/device/alarm/{productId}/{deviceId}/{ruleId}
return gateway
.publish(String.format(
"/rule-engine/device/alarm/%s/%s/%s",
rule.getProductId(), map.get("deviceId"), rule.getId()
), map)
), map, true)
.then(Mono.just(map));
});
}

View File

@ -43,20 +43,20 @@ public class DeviceAlarmModelParser implements RuleModelParserStrategy {
//处理定时触发
{
List<DeviceAlarmRule.Condition> timerConditions = alarmRule.getConditions().stream()
.filter(condition -> condition.getTrigger() == DeviceAlarmRule.ConditionType.timer)
List<DeviceAlarmRule.Trigger> timerTriggers = alarmRule.getTriggers().stream()
.filter(trigger -> trigger.getTrigger() == DeviceAlarmRule.TriggerType.timer)
.collect(Collectors.toList());
int index = 0;
for (DeviceAlarmRule.Condition timerCondition : timerConditions) {
DeviceMessage msg = alarmRule.getType().createMessage(timerCondition).orElse(null);
for (DeviceAlarmRule.Trigger timerTrigger : timerTriggers) {
DeviceMessage msg = timerTrigger.getType().createMessage(timerTrigger).orElse(null);
if (msg == null) {
throw new UnsupportedOperationException("不支持定时条件类型:" + alarmRule.getType());
throw new UnsupportedOperationException("不支持定时条件类型:" + timerTrigger.getType());
}
RuleNodeModel timer = new RuleNodeModel();
timer.setId("timer:" + (++index));
timer.setName("定时发送设备消息");
timer.setExecutor("timer");
timer.setConfiguration(Collections.singletonMap("cron", timerCondition.getCron()));
timer.setConfiguration(Collections.singletonMap("cron", timerTrigger.getCron()));
DeviceMessageSendNode.Config senderConfig = new DeviceMessageSendNode.Config();
senderConfig.setAsync(true);
@ -88,9 +88,9 @@ public class DeviceAlarmModelParser implements RuleModelParserStrategy {
conditionNode.setExecutor("device_alarm");
conditionNode.setConfiguration(Collections.singletonMap("rule", rule.getAlarmRule()));
model.getNodes().add(conditionNode);
if (CollectionUtils.isNotEmpty(rule.getAlarmRule().getOperations())) {
if (CollectionUtils.isNotEmpty(rule.getAlarmRule().getActions())) {
int index = 0;
for (DeviceAlarmRule.Operation operation : rule.getAlarmRule().getOperations()) {
for (DeviceAlarmRule.Action operation : rule.getAlarmRule().getActions()) {
RuleNodeModel action = new RuleNodeModel();
action.setId("device_alarm_action:" + index);
action.setName("执行动作:" + index);