diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmTaskExecutorProvider.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmTaskExecutorProvider.java index 1c5968ae..dea6fbe3 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmTaskExecutorProvider.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/device/DeviceAlarmTaskExecutorProvider.java @@ -4,6 +4,7 @@ import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.hswebframework.web.bean.FastBeanCopier; +import org.hswebframework.web.exception.BusinessException; import org.hswebframework.web.id.IDGenerator; import org.jetlinks.community.ValueObject; import org.jetlinks.core.event.EventBus; @@ -29,8 +30,10 @@ import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuples; +import javax.annotation.Nonnull; import java.time.Duration; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; @Slf4j @AllArgsConstructor @@ -54,7 +57,11 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider { static class DeviceAlarmTaskExecutor extends AbstractTaskExecutor { List default_columns = Arrays.asList( - "timestamp", "deviceId", "this.headers headers", "this.headers.deviceName deviceName" + "this.timestamp timestamp", + "this.deviceId deviceId", + "this.headers headers", + "this.headers.deviceName deviceName", + "this.messageType messageType" ); private final EventBus eventBus; @@ -62,7 +69,7 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider { private DeviceAlarmRule rule; - private ReactorQL ql; + private final Map triggerQL = new ConcurrentHashMap<>(); DeviceAlarmTaskExecutor(ExecutionContext context, EventBus eventBus, @@ -70,10 +77,10 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider { super(context); this.eventBus = eventBus; this.scheduler = scheduler; - rule = createRule(); - ql = createQL(rule); + init(); } + @Override public String getName() { return "设备告警"; @@ -96,50 +103,52 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider { .subscribe(); } + void init() { + rule = createRule(); + Map ql = createQL(rule); + triggerQL.clear(); + triggerQL.putAll(ql); + } + @Override public void reload() { - rule = createRule(); - ql = createQL(rule); + init(); if (disposable != null) { disposable.dispose(); } disposable = doStart(); } + @Nonnull private DeviceAlarmRule createRule() { DeviceAlarmRule rule = ValueObject .of(context.getJob().getConfiguration()) .get("rule") .map(val -> FastBeanCopier.copy(val, new DeviceAlarmRule())) - .orElseThrow(() -> new IllegalArgumentException("告警配置错误")); + .orElseThrow(() -> new IllegalArgumentException("error.alarm_configuration_error")); rule.validate(); return rule; } @Override public void validate() { - DeviceAlarmRule rule = createRule(); try { - createQL(rule); + createQL(createRule()); } catch (Exception e) { - throw new IllegalArgumentException("配置错误:" + e.getMessage(), e); + throw new BusinessException("error.configuration_error", 500, e.getMessage(), e); } } - private ReactorQL createQL(DeviceAlarmRule rule) { + private ReactorQL createQL(int index, DeviceAlarmRule.Trigger trigger, DeviceAlarmRule rule) { List columns = new ArrayList<>(default_columns); List wheres = new ArrayList<>(); - List triggers = rule.getTriggers(); + // select this.properties.this trigger0 + columns.add(trigger.getType().getPropertyPrefix() + "this trigger" + index); + columns.addAll(trigger.toColumns()); + trigger.createExpression() + .ifPresent(expr -> wheres.add("(" + expr + ")")); - for (int i = 0; i < triggers.size(); i++) { - DeviceAlarmRule.Trigger trigger = triggers.get(i); - // select this.properties.this trigger0 - columns.add(trigger.getType().getPropertyPrefix() + "this trigger" + i); - columns.addAll(trigger.toColumns()); - trigger.createExpression() - .ifPresent(expr -> wheres.add("(" + expr + ")")); - } String sql = "select \n\t\t" + String.join("\n\t\t,", columns) + " \n\tfrom dual "; if (!wheres.isEmpty()) { @@ -147,11 +156,7 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider { } if (CollectionUtils.isNotEmpty(rule.getProperties())) { - List newColumns = new ArrayList<>(Arrays.asList( - "this.deviceName deviceName", - "this.deviceId deviceId", - "this.headers headers", - "this.timestamp timestamp")); + List newColumns = new ArrayList<>(default_columns); for (DeviceAlarmRule.Property property : rule.getProperties()) { if (StringUtils.isEmpty(property.getProperty())) { continue; @@ -166,7 +171,7 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider { newColumns.add("this['" + property.getProperty() + "'] \"" + alias + "\""); } } - if (newColumns.size() > 4) { + if (newColumns.size() > default_columns.size()) { sql = "select \n\t" + String.join("\n\t,", newColumns) + "\n from (\n\t" + sql + "\n) t"; } } @@ -175,43 +180,49 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider { return ReactorQL.builder().sql(sql).build(); } - public Flux> doSubscribe(EventBus eventBus) { - Set topics = new HashSet<>(); - - List binds = new ArrayList<>(); + private Map createQL(DeviceAlarmRule rule) { + Map qlMap = new HashMap<>(); + int index = 0; for (DeviceAlarmRule.Trigger trigger : rule.getTriggers()) { - binds.addAll(trigger.toFilterBinds()); - //since 1.11 定时触发的不从eventBus订阅 - if (trigger.getTrigger() == DeviceAlarmRule.TriggerType.timer) { - continue; - } - - String topic = trigger - .getType() - .getTopic(rule.getProductId(), rule.getDeviceId(), trigger.getModelId()); - topics.add(topic); + qlMap.put(trigger, createQL(index++, trigger, rule)); } + return qlMap; + } + + public Flux> doSubscribe(EventBus eventBus) { + List>> inputs = new ArrayList<>(); + int index = 0; + for (DeviceAlarmRule.Trigger trigger : rule.getTriggers()) { - //从上游获取输入进行处理(通常是定时触发发送指令后得到的回复) - inputs.add( - context - .getInput() - .accept() - .flatMap(RuleData::dataToMap) - ); + ReactorQL ql = triggerQL.get(trigger); + if (ql == null) { + continue; + } + Flux> datasource; + //since 1.11 定时触发的不从eventBus订阅 + if (trigger.getTrigger() == DeviceAlarmRule.TriggerType.timer) { + //从上游获取输入进行处理(通常是定时触发发送指令后得到的回复) + datasource = context + .getInput() + .accept() + .flatMap(RuleData::dataToMap); + } + //从事件总线中订阅数据 + else { + String topic = trigger + .getType() + .getTopic(rule.getProductId(), rule.getDeviceId(), trigger.getModelId()); - //从事件总线订阅数据进行处理 - if (!topics.isEmpty()) { - Subscription subscription = Subscription.of( - "device_alarm:" + rule.getId(), - topics.toArray(new String[0]), - Subscription.Feature.local - ); - inputs.add( - eventBus + //从事件总线订阅数据进行处理 + Subscription subscription = Subscription.of( + "device_alarm:" + rule.getId() + ":" + index++, + topic, + Subscription.Feature.local + ); + datasource = eventBus .subscribe(subscription, DeviceMessage.class) .map(Jsonable::toJson) .doOnNext(json -> { @@ -224,17 +235,16 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider { json.put("productId", rule.getProductId()); json.put("alarmId", rule.getId()); json.put("alarmName", rule.getName()); - }) - ); + }); + + } + + ReactorQLContext qlContext = ReactorQLContext.ofDatasource((t) -> datasource); + trigger.toFilterBinds().forEach(qlContext::bind); + inputs.add(ql.start(qlContext).map(ReactorQLRecord::asMap)); } - ReactorQLContext context = ReactorQLContext - .ofDatasource(ignore -> Flux.merge(inputs)); - binds.forEach(context::bind); - - Flux> resultFlux = (ql == null ? ql = createQL(rule) : ql) - .start(context) - .map(ReactorQLRecord::asMap); + Flux> resultFlux = Flux.merge(inputs); ShakeLimit shakeLimit; if ((shakeLimit = rule.getShakeLimit()) != null) { @@ -296,7 +306,6 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider { map.get("deviceId"), rule.getId()), map) .thenReturn(map); - }); } }