diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmRule.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmRule.java index 837b5e13..0b4d3337 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmRule.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmRule.java @@ -10,6 +10,7 @@ import org.jetlinks.core.message.property.ReadPropertyMessage; import org.jetlinks.rule.engine.api.executor.RuleNodeConfiguration; import org.jetlinks.rule.engine.api.model.RuleNodeModel; import org.jetlinks.rule.engine.executor.ExecutableRuleNodeFactoryStrategy; +import org.springframework.scheduling.support.CronSequenceGenerator; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; @@ -19,7 +20,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; /** - * 设备预警规则 + * 设备告警规则 * * @author zhouhao * @since 1.1 @@ -27,6 +28,7 @@ import java.util.stream.Stream; @Getter @Setter public class DeviceAlarmRule implements Serializable { + private static final long serialVersionUID = -1L; /** * 规则ID @@ -59,50 +61,45 @@ public class DeviceAlarmRule implements Serializable { private String deviceName; /** - * 类型类型,属性或者事件. + * 触发条件,不能为空 */ - private MessageType type; + private List triggers; /** * 要单独获取哪些字段信息 */ private List properties; - /** - * 执行条件 - */ - private List conditions; - /** * 警告发生后的操作,指向其他规则节点,如发送消息通知. */ - private List operations; + private List actions; public void validate() { - if (org.apache.commons.collections.CollectionUtils.isEmpty(getConditions())) { - throw new IllegalArgumentException("conditions不能为空"); + if (org.apache.commons.collections.CollectionUtils.isEmpty(getTriggers())) { + throw new IllegalArgumentException("触发条件不能为空"); } - + getTriggers().forEach(Trigger::validate); } public List getPlainColumns() { - Stream conditionColumns = conditions + Stream conditionColumns = triggers .stream() - .map(condition -> condition.getColumn(type)); + .flatMap(trigger -> trigger.getColumns().stream()); if (CollectionUtils.isEmpty(properties)) { return conditionColumns.collect(Collectors.toList()); } return Stream.concat(conditionColumns, properties .stream() - .map(property -> type.getPropertyPrefix() + property.toString())) + .map(Property::toString)) .collect(Collectors.toList()); } @Getter @Setter - public static class Operation implements Serializable { + public static class Action implements Serializable { /** * 执行器 @@ -146,12 +143,13 @@ public class DeviceAlarmRule implements Serializable { } @Override - public Optional createMessage(Condition condition) { + public Optional createMessage(Trigger trigger) { ReadPropertyMessage readPropertyMessage = new ReadPropertyMessage(); + readPropertyMessage.setProperties(new ArrayList<>( + StringUtils.hasText(trigger.getModelId()) + ? Collections.singletonList(trigger.getModelId()) + : Collections.emptyList())); - String property = StringUtils.hasText(condition.getModelId()) ? condition.getModelId() : condition.getKey(); - - readPropertyMessage.setProperties(new ArrayList<>(Collections.singletonList(property))); return Optional.of(readPropertyMessage); } }, @@ -170,10 +168,10 @@ public class DeviceAlarmRule implements Serializable { } @Override - public Optional createMessage(Condition condition) { + public Optional createMessage(Trigger trigger) { FunctionInvokeMessage message = new FunctionInvokeMessage(); - message.setFunctionId(condition.getModelId()); - message.setInputs(condition.getParameters()); + message.setFunctionId(trigger.getModelId()); + message.setInputs(trigger.getParameters()); message.setTimestamp(System.currentTimeMillis()); return Optional.of(message); } @@ -185,20 +183,17 @@ public class DeviceAlarmRule implements Serializable { public abstract String getTopic(String productId, String deviceId, String key); - public Optional createMessage(Condition condition) { + public Optional createMessage(Trigger trigger) { return Optional.empty(); } } @Getter @AllArgsConstructor - public enum ConditionType implements Serializable { + public enum TriggerType implements Serializable { //设备消息 - message(Arrays.asList( - MessageType.online, - MessageType.offline, - MessageType.properties, - MessageType.event + device(Arrays.asList( + MessageType.values() )), //定时,定时获取只支持获取设备属性和调用功能. timer(Arrays.asList( @@ -212,20 +207,80 @@ public class DeviceAlarmRule implements Serializable { @Getter @Setter - public static class Condition implements Serializable { + public static class Trigger implements Serializable { - //条件类型,定时 - private ConditionType trigger = ConditionType.message; + //触发方式,定时,设备 + private TriggerType trigger = TriggerType.device; //trigger为定时任务时的cron表达式 private String cron; + //类型,属性或者事件. + private MessageType type; + //trigger为定时任务并且消息类型为功能调用时 private List parameters; //物模型属性或者事件的标识 如: fire_alarm private String modelId; + //过滤条件 + private List filters; + + public Set getColumns() { + return filters == null + ? Collections.emptySet() + : filters.stream() + .map(filter -> filter.getColumn(type)) + .collect(Collectors.toSet()); + } + + public List getFilterValues() { + return filters == null ? Collections.emptyList() : + filters.stream() + .map(ConditionFilter::convertValue) + .collect(Collectors.toList()); + } + + public Optional createExpression() { + if (CollectionUtils.isEmpty(filters)) { + return Optional.empty(); + } + return Optional.of( + filters.stream() + .map(filter -> filter.createExpression(type)) + .collect(Collectors.joining(" and ")) + ); + } + + public void validate() { + if (type == null) { + throw new IllegalArgumentException("类型不能为空"); + } + + if (type != MessageType.online && type != MessageType.offline && StringUtils.isEmpty(modelId)) { + throw new IllegalArgumentException("属性/事件/功能ID不能为空"); + } + + if (trigger == TriggerType.timer) { + if (StringUtils.isEmpty(cron)) { + throw new IllegalArgumentException("cron表达式不能为空"); + } + try { + new CronSequenceGenerator(cron); + } catch (Exception e) { + throw new IllegalArgumentException("cron表达式格式错误", e); + } + } + if (!CollectionUtils.isEmpty(filters)) { + filters.forEach(ConditionFilter::validate); + } + } + } + + @Getter + @Setter + public static class ConditionFilter implements Serializable { //过滤条件key 如: temperature private String key; @@ -240,12 +295,25 @@ public class DeviceAlarmRule implements Serializable { } public String createExpression(MessageType type) { + //函数和this忽略前缀 + if (key.contains("(") || key.startsWith("this")) { + return key; + } return type.getPropertyPrefix() + (key.trim()) + " " + operator.symbol + " ? "; } - public Object convertValue(){ + public Object convertValue() { return operator.convert(value); } + + public void validate() { + if (StringUtils.isEmpty(key)) { + throw new IllegalArgumentException("条件key不能为空"); + } + if (StringUtils.isEmpty(value)) { + throw new IllegalArgumentException("条件值不能为空"); + } + } } diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmRuleNode.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmRuleNode.java index aea2dafa..51d7c9a6 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmRuleNode.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmRuleNode.java @@ -35,13 +35,13 @@ public class DeviceAlarmRuleNode extends CommonExecutableRuleNodeFactoryStrategy private final MessageGateway messageGateway; @Override - public Function> createExecutor(ExecutionContext context, org.jetlinks.community.rule.engine.device.DeviceAlarmRuleNode.Config config) { + public Function> createExecutor(ExecutionContext context, DeviceAlarmRuleNode.Config config) { return Mono::just; } @Override - protected void onStarted(ExecutionContext context, org.jetlinks.community.rule.engine.device.DeviceAlarmRuleNode.Config config) { + protected void onStarted(ExecutionContext context, DeviceAlarmRuleNode.Config config) { context.onStop( config.doSubscribe(messageGateway) .flatMap(result -> { @@ -65,14 +65,14 @@ public class DeviceAlarmRuleNode extends CommonExecutableRuleNodeFactoryStrategy @Setter public static class Config implements RuleNodeConfig { static List default_columns = Arrays.asList( - "timestamp", "deviceId" + "timestamp", "deviceId", "this.header.deviceName deviceName" ); private DeviceAlarmRule rule; @Override public void validate() { - if (CollectionUtils.isEmpty(rule.getConditions())) { + if (CollectionUtils.isEmpty(rule.getTriggers())) { throw new IllegalArgumentException("预警条件不能为空"); } } @@ -82,12 +82,16 @@ public class DeviceAlarmRuleNode extends CommonExecutableRuleNodeFactoryStrategy List wheres = new ArrayList<>(); columns.addAll(rule.getPlainColumns()); - for (DeviceAlarmRule.Condition condition : rule.getConditions()) { - wheres.add(condition.createExpression(rule.getType())); + for (DeviceAlarmRule.Trigger trigger : rule.getTriggers()) { + trigger.createExpression() + .ifPresent(expr -> wheres.add("(" + expr + ")")); } - String sql = "select " + String.join(",", columns) + - " from msg where " + String.join(" or ", wheres); + String sql = "select " + String.join(",", columns) + " from msg "; + + if (!wheres.isEmpty()) { + sql = "where " + String.join(" or ", wheres); + } log.debug("create device alarm sql : {}", sql); return ReactorQL.builder().sql(sql).build(); @@ -98,10 +102,10 @@ public class DeviceAlarmRuleNode extends CommonExecutableRuleNodeFactoryStrategy List binds = new ArrayList<>(); - for (DeviceAlarmRule.Condition condition : rule.getConditions()) { - String topic = rule.getType().getTopic(rule.getProductId(), rule.getDeviceId(), condition.getModelId()); + for (DeviceAlarmRule.Trigger trigger : rule.getTriggers()) { + String topic = trigger.getType().getTopic(rule.getProductId(), rule.getDeviceId(), trigger.getModelId()); topics.add(topic); - binds.add(condition.convertValue()); + binds.add(trigger.getFilterValues()); } List subscriptions = topics.stream().map(Subscription::new).collect(Collectors.toList()); @@ -147,15 +151,15 @@ public class DeviceAlarmRuleNode extends CommonExecutableRuleNodeFactoryStrategy map.putIfAbsent("productName", map.get("productId")); } if (log.isDebugEnabled()) { - log.debug("发生设备预警:{}", map); + log.debug("发生设备告警:{}", map); } - // 推送警告到消息网关中 + // 推送告警信息到消息网关中 // /rule-engine/device/alarm/{productId}/{deviceId}/{ruleId} return gateway .publish(String.format( "/rule-engine/device/alarm/%s/%s/%s", rule.getProductId(), map.get("deviceId"), rule.getId() - ), map) + ), map, true) .then(Mono.just(map)); }); } diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/model/DeviceAlarmModelParser.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/model/DeviceAlarmModelParser.java index de5813ae..8e144452 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/model/DeviceAlarmModelParser.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/model/DeviceAlarmModelParser.java @@ -43,20 +43,20 @@ public class DeviceAlarmModelParser implements RuleModelParserStrategy { //处理定时触发 { - List timerConditions = alarmRule.getConditions().stream() - .filter(condition -> condition.getTrigger() == DeviceAlarmRule.ConditionType.timer) + List timerTriggers = alarmRule.getTriggers().stream() + .filter(trigger -> trigger.getTrigger() == DeviceAlarmRule.TriggerType.timer) .collect(Collectors.toList()); int index = 0; - for (DeviceAlarmRule.Condition timerCondition : timerConditions) { - DeviceMessage msg = alarmRule.getType().createMessage(timerCondition).orElse(null); + for (DeviceAlarmRule.Trigger timerTrigger : timerTriggers) { + DeviceMessage msg = timerTrigger.getType().createMessage(timerTrigger).orElse(null); if (msg == null) { - throw new UnsupportedOperationException("不支持定时条件类型:" + alarmRule.getType()); + throw new UnsupportedOperationException("不支持定时条件类型:" + timerTrigger.getType()); } RuleNodeModel timer = new RuleNodeModel(); timer.setId("timer:" + (++index)); timer.setName("定时发送设备消息"); timer.setExecutor("timer"); - timer.setConfiguration(Collections.singletonMap("cron", timerCondition.getCron())); + timer.setConfiguration(Collections.singletonMap("cron", timerTrigger.getCron())); DeviceMessageSendNode.Config senderConfig = new DeviceMessageSendNode.Config(); senderConfig.setAsync(true); @@ -88,9 +88,9 @@ public class DeviceAlarmModelParser implements RuleModelParserStrategy { conditionNode.setExecutor("device_alarm"); conditionNode.setConfiguration(Collections.singletonMap("rule", rule.getAlarmRule())); model.getNodes().add(conditionNode); - if (CollectionUtils.isNotEmpty(rule.getAlarmRule().getOperations())) { + if (CollectionUtils.isNotEmpty(rule.getAlarmRule().getActions())) { int index = 0; - for (DeviceAlarmRule.Operation operation : rule.getAlarmRule().getOperations()) { + for (DeviceAlarmRule.Action operation : rule.getAlarmRule().getActions()) { RuleNodeModel action = new RuleNodeModel(); action.setId("device_alarm_action:" + index); action.setName("执行动作:" + index);