优化设备告警逻辑

This commit is contained in:
zhouhao 2021-12-30 16:31:40 +08:00
parent fc696856d6
commit 0618d457d6
4 changed files with 152 additions and 70 deletions

View File

@ -86,7 +86,13 @@ public class DeviceMessageSendTaskExecutorProvider implements TaskExecutorProvid
.onErrorResume(error -> context.onError(error, input))
.subscribeOn(Schedulers.parallel())
)
.map(reply -> context.newRuleData(input.newData(reply.toJson())))
.map(reply -> {
RuleData data = context.newRuleData(input.newData(reply.toJson()));
if (config.getResponseHeaders() != null) {
config.getResponseHeaders().forEach(data::setHeader);
}
return data;
})
;
}
@ -146,6 +152,8 @@ public class DeviceMessageSendTaskExecutorProvider implements TaskExecutorProvid
private String stateOperator = "ignoreOffline";
private Map<String, Object> responseHeaders;
public Map<String, Object> toMap() {
Map<String, Object> conf = FastBeanCopier.copy(this, new HashMap<>());
conf.put("timeout", timeout.toString());

View File

@ -261,6 +261,45 @@ public class DeviceAlarmRule implements Serializable {
);
}
public String toSQL(int index, List<String> defaultColumns, List<DeviceAlarmRule.Property> properties) {
List<String> columns = new ArrayList<>(defaultColumns);
List<String> wheres = new ArrayList<>();
// select this.properties.this trigger0
columns.add(getType().getPropertyPrefix() + "this trigger" + index);
columns.addAll(toColumns());
createExpression()
.ifPresent(expr -> wheres.add("(" + expr + ")"));
String sql = "select \n\t\t" + String.join("\n\t\t,", columns) + " \n\tfrom dual ";
if (!wheres.isEmpty()) {
sql += "\n\twhere " + String.join("\n\t\t or ", wheres);
}
if (org.apache.commons.collections.CollectionUtils.isNotEmpty(properties)) {
List<String> newColumns = new ArrayList<>(defaultColumns);
for (DeviceAlarmRule.Property property : properties) {
if (StringUtils.isEmpty(property.getProperty())) {
continue;
}
String alias = StringUtils.hasText(property.getAlias()) ? property.getAlias() : property.getProperty();
// 'message',func(),this[name]
if ((property.getProperty().startsWith("'") && property.getProperty().endsWith("'"))
||
property.getProperty().contains("(") || property.getProperty().contains("[")) {
newColumns.add(property.getProperty() + " \"" + alias + "\"");
} else {
newColumns.add("this['" + property.getProperty() + "'] \"" + alias + "\"");
}
}
if (newColumns.size() > defaultColumns.size()) {
sql = "select \n\t" + String.join("\n\t,", newColumns) + "\n from (\n\t" + sql + "\n) t";
}
}
return sql;
}
public void validate() {
if (type == null) {
throw new IllegalArgumentException("类型不能为空");

View File

@ -6,14 +6,17 @@ import org.apache.commons.collections.CollectionUtils;
import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.exception.BusinessException;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.community.PropertyConstants;
import org.jetlinks.community.ValueObject;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.metadata.Jsonable;
import org.jetlinks.core.utils.FluxUtils;
import org.jetlinks.reactor.ql.ReactorQL;
import org.jetlinks.reactor.ql.ReactorQLContext;
import org.jetlinks.reactor.ql.ReactorQLRecord;
import org.jetlinks.reactor.ql.utils.CastUtils;
import org.jetlinks.rule.engine.api.RuleConstants;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
@ -56,21 +59,33 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
static class DeviceAlarmTaskExecutor extends AbstractTaskExecutor {
List<String> default_columns = Arrays.asList(
/**
* 默认要查询的列
*/
static List<String> default_columns = Arrays.asList(
//时间戳
"this.timestamp timestamp",
//设备ID
"this.deviceId deviceId",
//header
"this.headers headers",
//设备名称,通过DeviceMessageConnector自定填充了值
"this.headers.deviceName deviceName",
//消息唯一ID
"this.headers._uid _uid",
//消息类型,下游可以根据消息类型来做处理,比如:离线时,如果网关设备也不在线则不触发.
"this.messageType messageType"
);
private final EventBus eventBus;
private final Scheduler scheduler;
private DeviceAlarmRule rule;
//触发器对应的ReactorQL缓存
private final Map<DeviceAlarmRule.Trigger, ReactorQL> triggerQL = new ConcurrentHashMap<>();
//告警规则
private DeviceAlarmRule rule;
DeviceAlarmTaskExecutor(ExecutionContext context,
EventBus eventBus,
Scheduler scheduler) {
@ -139,48 +154,12 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
}
}
private ReactorQL createQL(int index, DeviceAlarmRule.Trigger trigger, DeviceAlarmRule rule) {
List<String> columns = new ArrayList<>(default_columns);
List<String> wheres = new ArrayList<>();
// select this.properties.this trigger0
columns.add(trigger.getType().getPropertyPrefix() + "this trigger" + index);
columns.addAll(trigger.toColumns());
trigger.createExpression()
.ifPresent(expr -> wheres.add("(" + expr + ")"));
String sql = "select \n\t\t" + String.join("\n\t\t,", columns) + " \n\tfrom dual ";
if (!wheres.isEmpty()) {
sql += "\n\twhere " + String.join("\n\t\t or ", wheres);
}
if (CollectionUtils.isNotEmpty(rule.getProperties())) {
List<String> newColumns = new ArrayList<>(default_columns);
for (DeviceAlarmRule.Property property : rule.getProperties()) {
if (StringUtils.isEmpty(property.getProperty())) {
continue;
}
String alias = StringUtils.hasText(property.getAlias()) ? property.getAlias() : property.getProperty();
// 'message',func(),this[name]
if ((property.getProperty().startsWith("'") && property.getProperty().endsWith("'"))
||
property.getProperty().contains("(") || property.getProperty().contains("[")) {
newColumns.add(property.getProperty() + " \"" + alias + "\"");
} else {
newColumns.add("this['" + property.getProperty() + "'] \"" + alias + "\"");
}
}
if (newColumns.size() > default_columns.size()) {
sql = "select \n\t" + String.join("\n\t,", newColumns) + "\n from (\n\t" + sql + "\n) t";
}
}
static ReactorQL createQL(int index, DeviceAlarmRule.Trigger trigger, DeviceAlarmRule rule) {
String sql = trigger.toSQL(index, default_columns, rule.getProperties());
log.debug("create device alarm sql : \n{}", sql);
return ReactorQL.builder().sql(sql).build();
}
private Map<DeviceAlarmRule.Trigger, ReactorQL> createQL(DeviceAlarmRule rule) {
Map<DeviceAlarmRule.Trigger, ReactorQL> qlMap = new HashMap<>();
int index = 0;
@ -192,22 +171,41 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
public Flux<Map<String, Object>> doSubscribe(EventBus eventBus) {
//满足触发条件的输出数据流
List<Flux<? extends Map<String, Object>>> triggerOutputs = new ArrayList<>();
List<Flux<? extends Map<String, Object>>> inputs = new ArrayList<>();
int index = 0;
for (DeviceAlarmRule.Trigger trigger : rule.getTriggers()) {
//上游节点的输入
//定时触发时: 定时节点输出到设备指令节点,设备指令节点输出到当前节点
Flux<RuleData> input = context
.getInput()
.accept()
//使用cache,多个定时收到相同的数据
//通过header来进行判断具体是哪个触发器触发的,应该还有更好的方式.
.cache(0);
for (DeviceAlarmRule.Trigger trigger : rule.getTriggers()) {
//QL不存在,理论上不会发生
ReactorQL ql = triggerQL.get(trigger);
if (ql == null) {
log.warn("DeviceAlarmRule trigger {} init error", index);
continue;
}
Flux<? extends Map<String, Object>> datasource;
int currentIndex = index;
//since 1.11 定时触发的不从eventBus订阅
if (trigger.getTrigger() == DeviceAlarmRule.TriggerType.timer) {
//从上游获取输入进行处理(通常是定时触发发送指令后得到的回复)
datasource = context
.getInput()
.accept()
datasource = input
.filter(data -> {
//通过上游输出的header来判断是否为同一个触发规则还有更好的方式?
return data
.getHeader("triggerIndex")
.map(idx -> CastUtils.castNumber(idx).intValue() == currentIndex)
.orElse(true);
})
.flatMap(RuleData::dataToMap);
}
//从事件总线中订阅数据
@ -224,28 +222,33 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
);
datasource = eventBus
.subscribe(subscription, DeviceMessage.class)
.map(Jsonable::toJson)
.doOnNext(json -> {
if (StringUtils.hasText(rule.getDeviceName())) {
json.putIfAbsent("deviceName", rule.getDeviceName());
}
if (StringUtils.hasText(rule.getProductName())) {
json.putIfAbsent("productName", rule.getProductName());
}
json.put("productId", rule.getProductId());
json.put("alarmId", rule.getId());
json.put("alarmName", rule.getName());
});
.map(Jsonable::toJson);
}
ReactorQLContext qlContext = ReactorQLContext.ofDatasource((t) -> datasource);
ReactorQLContext qlContext = ReactorQLContext
.ofDatasource((t) -> datasource
.doOnNext(map -> {
if (StringUtils.hasText(rule.getDeviceName())) {
map.putIfAbsent("deviceName", rule.getDeviceName());
}
if (StringUtils.hasText(rule.getProductName())) {
map.putIfAbsent("productName", rule.getProductName());
}
map.put("productId", rule.getProductId());
map.put("alarmId", rule.getId());
map.put("alarmName", rule.getName());
}));
//绑定SQL中的预编译变量
trigger.toFilterBinds().forEach(qlContext::bind);
inputs.add(ql.start(qlContext).map(ReactorQLRecord::asMap));
//启动ReactorQL进行实时数据处理
triggerOutputs.add(ql.start(qlContext).map(ReactorQLRecord::asMap));
}
Flux<Map<String, Object>> resultFlux = Flux.merge(inputs);
Flux<Map<String, Object>> resultFlux = Flux.merge(triggerOutputs);
//防抖
ShakeLimit shakeLimit;
if ((shakeLimit = rule.getShakeLimit()) != null) {
@ -256,6 +259,7 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
//规则已经指定了固定的设备,直接开启时间窗口就行
? flux.window(duration, scheduler)
//规则配置在设备产品上,则按设备ID分组后再开窗口
//设备越多,消耗的内存越大
: flux
.groupBy(map -> String.valueOf(map.get("deviceId")), Integer.MAX_VALUE)
.flatMap(group -> group.window(duration, scheduler), Integer.MAX_VALUE),
@ -264,6 +268,17 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
}
return resultFlux
.as(result -> {
//有多个触发条件时对重复的数据进行去重,
//防止同时满足条件时会产生多个告警记录
if (rule.getTriggers().size() > 1) {
return result
.as(FluxUtils.distinct(
map -> map.getOrDefault(PropertyConstants.uid.getKey(), ""),
Duration.ofSeconds(1)));
}
return result;
})
.flatMap(map -> {
@SuppressWarnings("all")
Map<String, Object> headers = (Map<String, Object>) map.remove("headers");
@ -297,8 +312,6 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
//生成告警记录时生成ID方便下游做处理
map.putIfAbsent("id", IDGenerator.MD5.generate());
// 推送告警信息到消息网关中
// /rule-engine/device/alarm/{productId}/{deviceId}/{ruleId}
return eventBus
.publish(String.format(
"/rule-engine/device/alarm/%s/%s/%s",
@ -306,6 +319,7 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
map.get("deviceId"),
rule.getId()), map)
.thenReturn(map);
});
}
}

View File

@ -3,6 +3,7 @@ package org.jetlinks.community.rule.engine.model;
import com.alibaba.fastjson.JSON;
import org.apache.commons.collections.CollectionUtils;
import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.exception.BusinessException;
import org.jetlinks.community.rule.engine.device.DeviceAlarmRule;
import org.jetlinks.community.rule.engine.entity.DeviceAlarmEntity;
import org.jetlinks.community.rule.engine.executor.DeviceMessageSendTaskExecutorProvider;
@ -32,6 +33,7 @@ public class DeviceAlarmModelParser implements RuleModelParserStrategy {
@Override
public RuleModel parse(String modelDefineString) {
//模型就是DeviceAlarmEntity的json
DeviceAlarmEntity rule = FastBeanCopier.copy(JSON.parseObject(modelDefineString), DeviceAlarmEntity::new);
RuleModel model = new RuleModel();
@ -39,28 +41,33 @@ public class DeviceAlarmModelParser implements RuleModelParserStrategy {
model.setName(rule.getName());
DeviceAlarmRule alarmRule = rule.getAlarmRule();
//验证规则
alarmRule.validate();
//告警条件节点
RuleNodeModel conditionNode = new RuleNodeModel();
conditionNode.setId("conditions");
conditionNode.setName("警条件");
conditionNode.setName("警条件");
conditionNode.setExecutor("device_alarm");
conditionNode.setConfiguration(Collections.singletonMap("rule", rule.getAlarmRule()));
//处理定时触发
//处理定时触发(定时向设备发送指令并获取返回结果)
{
List<DeviceAlarmRule.Trigger> timerTriggers = alarmRule
.getTriggers()
.stream()
//定时节点
.filter(trigger -> trigger.getTrigger() == DeviceAlarmRule.TriggerType.timer)
.collect(Collectors.toList());
int index = 0;
for (DeviceAlarmRule.Trigger timerTrigger : timerTriggers) {
DeviceMessage msg = timerTrigger.getType().createMessage(timerTrigger).orElse(null);
if (msg == null) {
throw new UnsupportedOperationException("不支持定时条件类型:" + timerTrigger.getType());
throw new BusinessException("error.unsupported_timing_condition_type", 500, timerTrigger.getType());
}
//定时节点
//TimerTaskExecutorProvider
RuleNodeModel timer = new RuleNodeModel();
timer.setId("timer:" + (++index));
timer.setName("定时发送设备消息");
@ -68,30 +75,42 @@ public class DeviceAlarmModelParser implements RuleModelParserStrategy {
timer.setConfiguration(Collections.singletonMap("cron", timerTrigger.getCron()));
//发送指令节点
//DeviceMessageSendTaskExecutorProvider
DeviceMessageSendTaskExecutorProvider.DeviceMessageSendConfig senderDeviceMessageSendConfig = new DeviceMessageSendTaskExecutorProvider.DeviceMessageSendConfig();
//同步等待回复
senderDeviceMessageSendConfig.setAsync(false);
//直接发送不管设备是否在线
senderDeviceMessageSendConfig.setStateOperator("direct");
senderDeviceMessageSendConfig.setDeviceId(alarmRule.getDeviceId());
senderDeviceMessageSendConfig.setProductId(alarmRule.getProductId());
senderDeviceMessageSendConfig.setMessage(msg.toJson());
// 添加自定义响应头到RuleData中
// 用于在收到结果时,判断是由哪个触发条件触发的
// 因为所有告警节点只有一个,所有的定时执行结果都会输入到同一个节点中
senderDeviceMessageSendConfig.setResponseHeaders(Collections.singletonMap("triggerIndex", index));
//设备指令发送节点
//DeviceMessageSendTaskExecutorProvider
RuleNodeModel messageSender = new RuleNodeModel();
messageSender.setId("message-sender:" + (++index));
messageSender.setName("定时发送设备消息");
messageSender.setExecutor("device-message-sender");
messageSender.setConfiguration(senderDeviceMessageSendConfig.toMap());
//连接定时和设备指令节点
RuleLink link = new RuleLink();
link.setId(timer.getId().concat(":").concat(messageSender.getId()));
link.setName("执行动作:" + index);
link.setName("发送指令:" + index);
link.setSource(timer);
link.setTarget(messageSender);
//timer -> device-message-sender
timer.getOutputs().add(link);
//device-message-sender -> timer
messageSender.getInputs().add(link);
//添加定时和消息节点到模型
model.getNodes().add(timer);
model.getNodes().add(messageSender);
//输出传递到告警节点
//设备指令和告警条件节点连接起来
RuleLink toAlarm = new RuleLink();
toAlarm.setId(messageSender.getId().concat(":").concat(conditionNode.getId()));
toAlarm.setName("定时触发告警:" + index);
@ -102,7 +121,9 @@ public class DeviceAlarmModelParser implements RuleModelParserStrategy {
}
}
//添加告警条件到模型
model.getNodes().add(conditionNode);
//执行动作
if (CollectionUtils.isNotEmpty(rule.getAlarmRule().getActions())) {
int index = 0;
for (Action operation : rule.getAlarmRule().getActions()) {