diff --git a/docker/dev-env/docker-compose.yml b/docker/dev-env/docker-compose.yml index da82505e..7347e38e 100644 --- a/docker/dev-env/docker-compose.yml +++ b/docker/dev-env/docker-compose.yml @@ -6,7 +6,7 @@ services: ports: - "6379:6379" volumes: - - "redis-volume:/data" + - "./data/redis:/data" command: redis-server --appendonly yes environment: - TZ=Asia/Shanghai @@ -40,7 +40,7 @@ services: ports: - "5432:5432" volumes: - - "postgres-volume:/var/lib/postgresql/data" + - "./data/pg:/var/lib/postgresql/data" environment: POSTGRES_PASSWORD: jetlinks POSTGRES_DB: jetlinks diff --git a/jetlinks-components/relation-component/src/main/java/org/jetlinks/community/relation/impl/DefaultRelationOperation.java b/jetlinks-components/relation-component/src/main/java/org/jetlinks/community/relation/impl/DefaultRelationOperation.java index 55ed2b41..4aec690d 100644 --- a/jetlinks-components/relation-component/src/main/java/org/jetlinks/community/relation/impl/DefaultRelationOperation.java +++ b/jetlinks-components/relation-component/src/main/java/org/jetlinks/community/relation/impl/DefaultRelationOperation.java @@ -79,8 +79,8 @@ class DefaultRelationOperation implements RelationOperation { private RelatedObject toObject(RelatedEntity entity) { if (reverse) { return new DefaultRelatedObject( - entity.getRelatedType(), - entity.getRelatedId(), + entity.getObjectType(), + entity.getObjectId(), type, id, entity.getRelation(), @@ -88,10 +88,10 @@ class DefaultRelationOperation implements RelationOperation { objectProvider); } return new DefaultRelatedObject( - type, - id, entity.getRelatedType(), entity.getRelatedId(), + type, + id, entity.getRelation(), relatedRepository, objectProvider); diff --git a/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/RuleEngineConstants.java b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/RuleEngineConstants.java new file mode 100644 index 00000000..ec3e015f --- /dev/null +++ b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/RuleEngineConstants.java @@ -0,0 +1,24 @@ +package org.jetlinks.community.rule.engine; + +import org.jetlinks.rule.engine.api.task.ExecutionContext; + +import java.util.Optional; + +public interface RuleEngineConstants { + + String ruleCreatorIdKey = "creatorId"; + + String ruleName = "name"; + + static Optional getCreatorId(ExecutionContext context) { + return context.getJob() + .getRuleConfiguration(ruleCreatorIdKey) + .map(String::valueOf); + } + + static Optional getRuleName(ExecutionContext context) { + return context.getJob() + .getRuleConfiguration(ruleName) + .map(String::valueOf); + } +} diff --git a/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/device/DeviceSelectorProvider.java b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/device/DeviceSelectorProvider.java index 4be0f675..6c84f0c8 100644 --- a/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/device/DeviceSelectorProvider.java +++ b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/device/DeviceSelectorProvider.java @@ -7,6 +7,8 @@ import reactor.core.publisher.Mono; import java.util.List; import java.util.Map; +import java.util.function.Function; +import java.util.function.Supplier; public interface DeviceSelectorProvider extends Ordered { @@ -21,6 +23,13 @@ public interface DeviceSelectorProvider extends Ordered { Map ctx, NestConditional conditional); + + default > Function, Mono>> createLazy( + DeviceSelectorSpec source, + Supplier> conditionalSupplier) { + return ctx -> applyCondition(source, ctx, conditionalSupplier.get()); + } + @Override default int getOrder() { return 0; diff --git a/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/device/DeviceSelectorProviders.java b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/device/DeviceSelectorProviders.java index 9f889627..9026ba60 100644 --- a/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/device/DeviceSelectorProviders.java +++ b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/device/DeviceSelectorProviders.java @@ -12,6 +12,12 @@ public class DeviceSelectorProviders { private final static Map providers = new LinkedHashMap<>(); + public static final String + PROVIDER_FIXED = "fixed", + PROVIDER_CONTEXT = "context", + PROVIDER_PRODUCT = "product", + PROVIDER_TAG = "tag"; + static { register(SimpleDeviceSelectorProvider @@ -19,21 +25,44 @@ public class DeviceSelectorProviders { "all", "全部设备", (args, query) -> query)); - register(SimpleDeviceSelectorProvider - .of( - "fixed", "固定设备", - (args, query) -> query.in("id", args))); + { //固定设备,fixed和context作用和效果完全相同,只是为了前端方便区分不同的操作 + + /* + 选择固定的设备 + { + "selector":"fixed", + "selectorValues":[ {"value":"deviceId","name":"设备名称"} ], + } + */ + register(SimpleDeviceSelectorProvider + .of( + PROVIDER_FIXED, "固定设备", + (args, query) -> query.in("id", args))); + /* + 根据上下文变量选择设备 + { + "selector":"context", + "source":"upper", + "upperKey":"deviceId" + } + */ + register(SimpleDeviceSelectorProvider + .of( + PROVIDER_CONTEXT, "内置参数", + (args, query) -> query.in("id", args))); + } + register(SimpleDeviceSelectorProvider .of("state", "按状态", (args, query) -> query.in("state", args))); register(SimpleDeviceSelectorProvider - .of("product", "按产品", + .of(PROVIDER_PRODUCT, "按产品", (args, query) -> query.in("productId", args))); register(SimpleDeviceSelectorProvider - .of("tag", "按标签", + .of(PROVIDER_TAG, "按标签", (args, query) -> { if (args.size() == 1) { return query.accept("id", @@ -69,9 +98,15 @@ public class DeviceSelectorProviders { } + //判断是否为固定设备选择器,固定设备选择器不需要执行查询库,性能更高 + public static boolean isFixed(DeviceSelectorSpec spec) { + return PROVIDER_FIXED.equals(spec.getSelector()) || + PROVIDER_CONTEXT.equals(spec.getSelector()); + } + public static DeviceSelectorSpec fixed(Object value) { DeviceSelectorSpec spec = new DeviceSelectorSpec(); - spec.setSelector("fixed"); + spec.setSelector(PROVIDER_CONTEXT); spec.setSource(VariableSource.Source.fixed); spec.setValue(value); return spec; @@ -79,7 +114,7 @@ public class DeviceSelectorProviders { public static DeviceSelectorSpec product(String productId) { DeviceSelectorSpec spec = new DeviceSelectorSpec(); - spec.setSelector("product"); + spec.setSelector(PROVIDER_PRODUCT); spec.setSource(VariableSource.Source.fixed); spec.setValue(productId); return spec; @@ -118,4 +153,4 @@ public class DeviceSelectorProviders { return new ArrayList<>(providers.values()); } -} +} \ No newline at end of file diff --git a/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/device/DeviceSelectorSpec.java b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/device/DeviceSelectorSpec.java index a07279be..232e9f85 100644 --- a/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/device/DeviceSelectorSpec.java +++ b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/device/DeviceSelectorSpec.java @@ -122,7 +122,7 @@ public class DeviceSelectorSpec extends VariableSource { if (CollectionUtils.isNotEmpty(selectorValues)) { return Flux .fromIterable(selectorValues) - .map(SelectorValue::getValue); + .mapNotNull(SelectorValue::getValue); } return super.resolve(context); } @@ -148,3 +148,4 @@ public class DeviceSelectorSpec extends VariableSource { .flatMap(DeviceProductOperator::getMetadata); } } + diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/function/ReactorQLDeviceSelectorBuilder.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/function/ReactorQLDeviceSelectorBuilder.java index c71fe590..ab8046fc 100755 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/function/ReactorQLDeviceSelectorBuilder.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/function/ReactorQLDeviceSelectorBuilder.java @@ -1,6 +1,8 @@ package org.jetlinks.community.device.function; import lombok.AllArgsConstructor; +import org.hswebframework.ezorm.core.NestConditional; +import org.hswebframework.ezorm.rdb.mapping.ReactiveQuery; import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository; import org.jetlinks.core.device.DeviceOperator; import org.jetlinks.core.device.DeviceRegistry; @@ -13,11 +15,26 @@ import org.jetlinks.community.rule.engine.executor.device.DeviceSelectorSpec; import org.jetlinks.reactor.ql.ReactorQL; import org.jetlinks.reactor.ql.ReactorQLContext; import org.jetlinks.reactor.ql.ReactorQLRecord; +import org.jetlinks.reactor.ql.feature.FromFeature; +import org.springframework.data.util.Lazy; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import java.util.Map; +import java.util.function.Function; /** + * 基于ReactorQL的设备选择器,通过自定义{@link FromFeature}来实现设备数据源. + *
+ * in_gourp('groupId') 在指定的设备分组中
+ * in_group_tree('groupId') 在指定分组中(包含下级分组)
+ * same_group('deviceId') 在指定设备的相同分组中
+ * product('productId') 指定产品ID对应的设备
+ * tag('tag1Key','tag1Value','tag2Key','tag2Value') 按指定的标签获取
+ * state('online') 按指定的状态获取
+ * in_tenant('租户ID') 在指定租户中的设备
+ * 
+ * * @author zhouhao * @since 2.0 */ @@ -28,6 +45,7 @@ public class ReactorQLDeviceSelectorBuilder implements DeviceSelectorBuilder { private final ReactiveRepository deviceRepository; + @Override @SuppressWarnings("all") public DeviceSelector createSelector(DeviceSelectorSpec spec) { @@ -35,17 +53,25 @@ public class ReactorQLDeviceSelectorBuilder implements DeviceSelectorBuilder { .getProvider(spec.getSelector()) .orElseThrow(() -> new UnsupportedOperationException("unsupported selector:" + spec.getSelector())); - return context -> provider - .applyCondition(spec, - context, - deviceRepository - .createQuery() - .select(DeviceInstanceEntity::getId) - .nest()) - .flatMapMany(ctd -> ctd - .end() - .fetch() - .map(DeviceInstanceEntity::getId)) + //固定设备,直接获取,避免查询数据库性能低. + if (DeviceSelectorProviders.isFixed(spec)) { + return ctx -> { + return spec + .resolveSelectorValues(ctx) + .map(String::valueOf) + .flatMap(registry::getDevice); + }; + } + Function, Mono>>> lazy = provider + .createLazy(spec, + Lazy.of(() -> deviceRepository + .createQuery() + .select(DeviceInstanceEntity::getId) + .nest())); + + return context -> lazy + .apply(context) + .flatMapMany(ctd -> ctd.end().fetch().map(DeviceInstanceEntity::getId)) .flatMap(registry::getDevice); } diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmConstants.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmConstants.java new file mode 100644 index 00000000..873b3ba2 --- /dev/null +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmConstants.java @@ -0,0 +1,20 @@ +package org.jetlinks.community.rule.engine.alarm; + +public interface AlarmConstants { + + interface ConfigKey { + String alarmConfigId = "alarmConfigId"; + String alarming = "alarming"; + String firstAlarm = "firstAlarm"; + String alarmName = "name"; + String level = "level"; + String ownerId = "ownerId"; + String targetType = "targetType"; + String state = "state"; + String alarmTime = "alarmTime"; + String lastAlarmTime = "lastAlarmTime"; + String targetId = "targetId"; + String targetName = "targetName"; + + } +} diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmData.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmData.java new file mode 100644 index 00000000..5773b1fd --- /dev/null +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmData.java @@ -0,0 +1,25 @@ +package org.jetlinks.community.rule.engine.alarm; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +import java.io.Serializable; +import java.util.Map; + +@Getter +@Setter +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +public class AlarmData implements Serializable { + private static final long serialVersionUID = 1L; + + private String alarmConfigId; + private String alarmName; + + private String ruleId; + private String ruleName; + + private Map output; +} diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmRuleHandler.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmRuleHandler.java new file mode 100644 index 00000000..b04409aa --- /dev/null +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmRuleHandler.java @@ -0,0 +1,110 @@ +package org.jetlinks.community.rule.engine.alarm; + +import com.google.common.collect.Maps; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import org.hswebframework.web.bean.FastBeanCopier; +import org.jetlinks.community.rule.engine.alarm.AlarmConstants.ConfigKey; +import org.jetlinks.rule.engine.api.RuleData; +import org.jetlinks.rule.engine.api.task.ExecutionContext; +import reactor.core.publisher.Flux; + +import java.util.Map; + +/** + * 告警规则数据处理器,当场景规则中配置的告警动作被执行时,将调用此处理器的相关方法. + * + * @author zhouhao + * @since 2.0 + * @see AlarmTaskExecutorProvider + */ +public interface AlarmRuleHandler { + + /** + * 触发告警 + * + * @param context 告警规则上下文 + * @param data 告警数据 + * @return 处理结果 + * @see org.jetlinks.community.rule.engine.enums.AlarmMode#trigger + */ + Flux triggered(ExecutionContext context, RuleData data); + + /** + * 解除告警 + * + * @param context 告警规则上下文 + * @param data 告警数据 + * @return 处理结果 + * @see org.jetlinks.community.rule.engine.enums.AlarmMode#relieve + */ + Flux relieved(ExecutionContext context, RuleData data); + + @Getter + @Setter + @AllArgsConstructor(staticName = "of") + @NoArgsConstructor + class Result { + + @Schema(description = "告警配置ID") + private String alarmConfigId; + + @Schema(description = "告警名称") + private String alarmName; + + @Schema(description = "当前是否正在告警") + private boolean alarming; + + @Schema(description = "当前首次触发") + private boolean firstAlarm; + + @Schema(description = "告警级别") + private int level; + + @Schema(description = "上一次告警时间") + private long lastAlarmTime; + + @Schema(description = "首次告警或者解除告警后的再一次告警时间.") + private long alarmTime; + + @Schema(description = "告警目标类型") + private String targetType; + + @Schema(description = "告警目标ID") + private String targetId; + + @Schema(description = "告警目标名称") + private String targetName; + + + public Result copyWith(AlarmTargetInfo targetInfo) { + Result result = FastBeanCopier.copy(this, new Result()); + result.setTargetType(targetInfo.getTargetType()); + result.setTargetId(targetInfo.getTargetId()); + result.setTargetName(targetInfo.getTargetName()); + return result; + } + + + public Map toMap() { + Map map = Maps.newHashMapWithExpectedSize(16); + + map.put(ConfigKey.alarmConfigId, alarmConfigId); + map.put(ConfigKey.alarmName, alarmName); + map.put(ConfigKey.alarming, alarming); + map.put(ConfigKey.firstAlarm, firstAlarm); + map.put(ConfigKey.level, level); + map.put(ConfigKey.alarmTime, alarmTime); + map.put(ConfigKey.lastAlarmTime, lastAlarmTime); + map.put(ConfigKey.targetType, targetType); + map.put(ConfigKey.targetId, targetId); + map.put(ConfigKey.targetName, targetName); + + return map; + } + } + +} diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmSceneHandler.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmSceneHandler.java index 7c5626ae..f6e63971 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmSceneHandler.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmSceneHandler.java @@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap; */ @Component @AllArgsConstructor +@Deprecated public class AlarmSceneHandler implements SceneFilter, CommandLineRunner { private final EventBus eventBus; @@ -63,7 +64,7 @@ public class AlarmSceneHandler implements SceneFilter, CommandLineRunner { .fromIterable(alarmConfigMap.values()) .flatMap(alarmConfig -> AlarmTarget .of(alarmConfig.getTargetType()) - .convert(data) + .convert(AlarmData.of(alarmConfig.getId(), alarmConfig.getName(), data.getRule().getId(), data.getRule().getName(), data.getOutput())) .flatMap(targetInfo -> { AlarmRecordEntity record = ofRecord(targetInfo, alarmConfig); //修改告警记录 diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmTarget.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmTarget.java index 7e7aaac1..10ca5907 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmTarget.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmTarget.java @@ -1,6 +1,5 @@ package org.jetlinks.community.rule.engine.alarm; -import org.jetlinks.community.rule.engine.scene.SceneData; import reactor.core.publisher.Flux; /** @@ -15,7 +14,7 @@ public interface AlarmTarget { String getName(); - Flux convert(SceneData data); + Flux convert(AlarmData data); static AlarmTarget of(String type) { return AlarmTargetSupplier diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmTaskExecutorProvider.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmTaskExecutorProvider.java new file mode 100644 index 00000000..821ecb6a --- /dev/null +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/AlarmTaskExecutorProvider.java @@ -0,0 +1,90 @@ +package org.jetlinks.community.rule.engine.alarm; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import org.hswebframework.web.bean.FastBeanCopier; +import org.hswebframework.web.validator.ValidatorUtils; +import org.jetlinks.community.rule.engine.enums.AlarmMode; +import org.jetlinks.rule.engine.api.RuleData; +import org.jetlinks.rule.engine.api.task.ExecutionContext; +import org.jetlinks.rule.engine.api.task.TaskExecutor; +import org.jetlinks.rule.engine.api.task.TaskExecutorProvider; +import org.jetlinks.rule.engine.defaults.FunctionTaskExecutor; +import org.reactivestreams.Publisher; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import javax.validation.constraints.NotNull; +import java.io.Serializable; +import java.util.function.Function; + +@AllArgsConstructor +@Component +public class AlarmTaskExecutorProvider implements TaskExecutorProvider { + public static final String executor = "alarm"; + + private final AlarmRuleHandler alarmHandler; + + @Override + public String getExecutor() { + return executor; + } + + @Override + public Mono createTask(ExecutionContext context) { + return Mono.just(new AlarmTaskExecutor(context, alarmHandler)); + } + + static class AlarmTaskExecutor extends FunctionTaskExecutor { + + private final AlarmRuleHandler handler; + + private Function> executor; + + private Config config; + + public AlarmTaskExecutor(ExecutionContext context, AlarmRuleHandler handler) { + super("告警", context); + this.handler = handler; + reload(); + } + + @Override + public String getName() { + return config.getMode() == AlarmMode.relieve + ? "解除告警" : "触发告警"; + } + + @Override + protected Publisher apply(RuleData input) { + return executor + .apply(input) + .map(result -> context.newRuleData(input.newData(result.toMap()))); + } + + @Override + public void reload() { + config = FastBeanCopier.copy(context.getJob().getConfiguration(), new Config()); + ValidatorUtils.tryValidate(config); + if (config.mode == AlarmMode.relieve) { + executor = input -> handler.relieved(context, input); + } else { + executor = input -> handler.triggered(context, input); + } + } + } + + + @Getter + @Setter + public static class Config implements Serializable { + @NotNull + @Schema(description = "告警方式") + private AlarmMode mode; + + + } +} diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/DefaultAlarmRuleHandler.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/DefaultAlarmRuleHandler.java new file mode 100644 index 00000000..9a54ab6a --- /dev/null +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/DefaultAlarmRuleHandler.java @@ -0,0 +1,459 @@ +package org.jetlinks.community.rule.engine.alarm; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository; +import org.hswebframework.web.authorization.DefaultDimensionType; +import org.hswebframework.web.authorization.ReactiveAuthenticationHolder; +import org.hswebframework.web.bean.FastBeanCopier; +import org.hswebframework.web.crud.events.EntityCreatedEvent; +import org.hswebframework.web.crud.events.EntityDeletedEvent; +import org.hswebframework.web.crud.events.EntityModifyEvent; +import org.hswebframework.web.crud.events.EntitySavedEvent; +import org.hswebframework.web.id.IDGenerator; +import org.jetlinks.community.rule.engine.RuleEngineConstants; +import org.jetlinks.core.config.ConfigStorage; +import org.jetlinks.core.config.ConfigStorageManager; +import org.jetlinks.core.event.EventBus; +import org.jetlinks.core.event.Subscription; +import org.jetlinks.core.utils.CompositeSet; +import org.jetlinks.core.utils.Reactors; +import org.jetlinks.community.gateway.annotation.Subscribe; +import org.jetlinks.community.rule.engine.entity.AlarmConfigEntity; +import org.jetlinks.community.rule.engine.entity.AlarmHistoryInfo; +import org.jetlinks.community.rule.engine.entity.AlarmRecordEntity; +import org.jetlinks.community.rule.engine.entity.AlarmRuleBindEntity; +import org.jetlinks.community.rule.engine.enums.AlarmRecordState; +import org.jetlinks.community.rule.engine.enums.AlarmState; +import org.jetlinks.community.rule.engine.scene.SceneRule; +import org.jetlinks.community.rule.engine.service.AlarmHistoryService; +import org.jetlinks.community.rule.engine.service.AlarmRecordService; +import org.jetlinks.community.topic.Topics; +import org.jetlinks.community.utils.ObjectMappers; +import org.jetlinks.reactor.ql.utils.CastUtils; +import org.jetlinks.rule.engine.api.RuleData; +import org.jetlinks.rule.engine.api.RuleDataHelper; +import org.jetlinks.rule.engine.api.task.ExecutionContext; +import org.springframework.boot.CommandLineRunner; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +@Slf4j +@AllArgsConstructor +@Component +public class DefaultAlarmRuleHandler implements AlarmRuleHandler, CommandLineRunner { + + private static final Set configInfoKey = new HashSet<>( + Arrays.asList( + AlarmConstants.ConfigKey.alarmConfigId, + AlarmConstants.ConfigKey.alarmName, + AlarmConstants.ConfigKey.level, + AlarmConstants.ConfigKey.alarmTime, + AlarmConstants.ConfigKey.lastAlarmTime, + AlarmConstants.ConfigKey.targetType, + AlarmConstants.ConfigKey.state, + AlarmConstants.ConfigKey.ownerId + )); + + private final Map, Set> ruleAlarmBinds = new ConcurrentHashMap<>(); + + private final AlarmRecordService alarmRecordService; + private final AlarmHistoryService historyService; + private final ConfigStorageManager storageManager; + private final ApplicationEventPublisher eventPublisher; + + private final EventBus eventBus; + + public final ReactiveRepository bindRepository; + + @Override + public Flux triggered(ExecutionContext context, RuleData data) { + return this + .parseAlarmInfo(context, data) + .flatMap(this::triggerAlarm); + } + + @Override + public Flux relieved(ExecutionContext context, RuleData data) { + return this + .parseAlarmInfo(context, data) + .flatMap(this::relieveAlarm); + } + + private Flux parseAlarmInfo(ExecutionContext context, RuleData data) { + if (ruleAlarmBinds.isEmpty()) { + return Flux.empty(); + } + //节点所在的条件分支索引 + int branchIndex = context + .getJob() + .getConfiguration(SceneRule.ACTION_KEY_BRANCH_INDEX) + .map(idx -> CastUtils.castNumber(idx).intValue()) + .orElse(AlarmRuleBindEntity.ANY_BRANCH_INDEX); + + Set alarmId = getBoundAlarmId(context.getInstanceId(), branchIndex); + + if (CollectionUtils.isEmpty(alarmId)) { + return Flux.empty(); + } + + Map contextMap = RuleDataHelper.toContextMap(data); + return Flux + .fromIterable(alarmId) + .flatMap(this::getAlarmStorage) + .flatMap(store -> parseAlarm(context, store, contextMap)); + } + + private Set getBoundAlarmId(String ruleId, int branchIndex) { + //指定和特定分支绑定的告警 + Set specific = ruleAlarmBinds.get(Tuples.of(ruleId, branchIndex)); + + //未指定特定分支的告警 + Set any = ruleAlarmBinds.get(Tuples.of(ruleId, AlarmRuleBindEntity.ANY_BRANCH_INDEX)); + + //没有任何告警绑定了规则 + if (CollectionUtils.isEmpty(specific) && CollectionUtils.isEmpty(any)) { + return Collections.emptySet(); + } + + //只有特定分支 + if (CollectionUtils.isNotEmpty(specific) && CollectionUtils.isEmpty(any)) { + return specific; + } + //只有任意规则 + else if (CollectionUtils.isEmpty(specific) && CollectionUtils.isNotEmpty(any)) { + return any; + } else { + return new CompositeSet<>(specific, any); + } + } + + private AlarmRecordEntity ofRecord(Result result) { + AlarmRecordEntity entity = new AlarmRecordEntity(); + entity.setAlarmConfigId(result.getAlarmConfigId()); + entity.setState(AlarmRecordState.warning); + entity.setAlarmTime(System.currentTimeMillis()); + entity.setLevel(result.getLevel()); + entity.setTargetType(result.getTargetType()); + entity.setTargetName(result.getTargetName()); + entity.setTargetId(result.getTargetId()); + entity.setAlarmName(result.getAlarmName()); + entity.generateId(); + return entity; + } + + private Flux parseAlarm(ExecutionContext context, ConfigStorage alarm, Map contextMap) { + return this + .getAlarmInfo(alarm) + .flatMapMany(result -> { + + String ruleName = RuleEngineConstants + .getRuleName(context) + .orElse(result.getAlarmName()); + + AlarmData alarmData = AlarmData.of( + result.getAlarmConfigId(), + result.getAlarmName(), + context.getInstanceId(), + ruleName, + contextMap); + + result.setData(alarmData); + + return AlarmTarget + .of(result.getTargetType()) + .convert(alarmData) + .map(result::copyWith); + }); + } + + private Mono relieveAlarm(AlarmInfo result) { + AlarmRecordEntity record = ofRecord(result); + + //更新告警状态. + return alarmRecordService + .createUpdate() + .set(AlarmRecordEntity::getState, AlarmRecordState.normal) + .set(AlarmRecordEntity::getHandleTime, System.currentTimeMillis()) + .where(AlarmRecordEntity::getId, record.getId()) + .and(AlarmRecordEntity::getState, AlarmRecordState.warning) + .execute() + .map(total -> { + + //如果有数据被更新说明是正在告警中 + result.setAlarming(total > 0); + + return result; + }); + + } + + private Mono triggerAlarm(AlarmInfo result) { + AlarmRecordEntity record = ofRecord(result); + + //更新告警状态. + return alarmRecordService + .createUpdate() + .set(record) + .where(AlarmRecordEntity::getId, record.getId()) + .and(AlarmRecordEntity::getState, AlarmRecordState.warning) + .execute() + //更新数据库报错,依然尝试触发告警! + .onErrorResume(err -> { + log.error("trigger alarm error", err); + return Reactors.ALWAYS_ZERO; + }) + .flatMap(total -> { + AlarmHistoryInfo historyInfo = createHistory(record, result); + //更新结果返回0 说明是新产生的告警数据 + if (total == 0) { + result.setFirstAlarm(true); + result.setAlarming(false); + result.setAlarmTime(record.getAlarmTime()); + + return alarmRecordService + .save(record) + .then(historyService.save(historyInfo)) + .then(publishAlarmRecord(historyInfo)) + .then(publishEvent(historyInfo)) + .then(saveAlarmCache(result, record)); + } + result.setFirstAlarm(false); + result.setAlarming(true); + + return historyService + .save(historyInfo) + .then(publishEvent(historyInfo)) + .then(saveAlarmCache(result, record)); + }); + } + + private Mono publishEvent(AlarmHistoryInfo historyInfo) { + return Mono.fromRunnable(() -> eventPublisher.publishEvent(historyInfo)); + } + + private AlarmHistoryInfo createHistory(AlarmRecordEntity record, AlarmInfo alarmInfo) { + AlarmHistoryInfo info = new AlarmHistoryInfo(); + info.setId(IDGenerator.SNOW_FLAKE_STRING.generate()); + info.setAlarmConfigId(record.getAlarmConfigId()); + info.setAlarmConfigName(record.getAlarmName()); + info.setAlarmRecordId(record.getId()); + info.setLevel(record.getLevel()); + info.setAlarmTime(record.getAlarmTime()); + info.setTargetName(record.getTargetName()); + info.setTargetId(record.getTargetId()); + info.setTargetType(record.getTargetType()); + info.setAlarmInfo(ObjectMappers.toJsonString(alarmInfo.getData().getOutput())); + return info; + } + + + public Mono publishAlarmRecord(AlarmHistoryInfo historyInfo) { + String topic = Topics.alarm(historyInfo.getTargetType(), historyInfo.getTargetId(), historyInfo.getAlarmConfigId()); + return eventBus.publish(topic, historyInfo).then(); + } + + private Mono saveAlarmCache(AlarmInfo result, + AlarmRecordEntity record) { + + + return this + .getAlarmStorage(result.getAlarmConfigId()) + .flatMap(store -> { + Mono save = store.setConfig("lastAlarmTime", record.getAlarmTime()).then(); + + if (!result.isAlarming()) { + save = save.then(store.setConfig("alarmTime", record.getAlarmTime()).then()); + } + + return save; + }) + .thenReturn(result); + } + + private Mono getAlarmInfo(ConfigStorage alarm) { + return alarm + .getConfigs(configInfoKey) + .mapNotNull(values -> { + //告警禁用了 + if (values + .getString(AlarmConstants.ConfigKey.state, AlarmState.enabled.name()) + .equals(AlarmState.disabled.name())) { + return null; + } + + AlarmInfo result = FastBeanCopier.copy(values.getAllValues(), new AlarmInfo()); + + if (result.getAlarmConfigId() == null || + result.getAlarmName() == null) { + //缓存丢失了?从数据库里获取? + return null; + } + + return result; + }); + } + + private Mono getAlarmStorage(String alarmId) { + return storageManager.getStorage("alarm:" + alarmId); + } + + + /* 处理告警配置缓存事件 */ + + static final String TOPIC_ALARM_CONFIG_SAVE = "/_sys/device-alarm-config/save"; + static final String TOPIC_ALARM_CONFIG_DELETE = "/_sys/device-alarm-rule/del"; + + @EventListener + public void handleConfigEvent(EntitySavedEvent event) { + event.async( + Flux.fromIterable(event.getEntity()) + .flatMap(e -> eventBus.publish(TOPIC_ALARM_CONFIG_SAVE, e)) + ); + } + + @EventListener + public void handleConfigEvent(EntityCreatedEvent event) { + event.async( + Flux.fromIterable(event.getEntity()) + .flatMap(e -> eventBus.publish(TOPIC_ALARM_CONFIG_SAVE, e)) + ); + } + + @EventListener + public void handleConfigEvent(EntityModifyEvent event) { + event.async( + Flux.fromIterable(event.getAfter()) + .flatMap(e -> eventBus.publish(TOPIC_ALARM_CONFIG_SAVE, e)) + ); + } + + @EventListener + public void handleConfigEvent(EntityDeletedEvent event) { + event.async( + Flux.fromIterable(event.getEntity()) + .flatMap(e -> eventBus.publish(TOPIC_ALARM_CONFIG_DELETE, e)) + ); + } + + + @Subscribe(value = TOPIC_ALARM_CONFIG_SAVE, features = {Subscription.Feature.local, Subscription.Feature.broker}) + public Mono handleAlarmConfig(AlarmConfigEntity entity) { + return this + .getAlarmStorage(entity.getId()) + .flatMap(store -> store.setConfigs(entity.toConfigMap())) + .then(); + } + + @Subscribe(value = TOPIC_ALARM_CONFIG_DELETE, features = {Subscription.Feature.local, Subscription.Feature.broker}) + public Mono removeAlarmConfig(AlarmConfigEntity entity) { + return this + .getAlarmStorage(entity.getId()) + .flatMap(ConfigStorage::clear) + .then(); + } + + + /* 处理告警和规则绑定事件 */ + static final String TOPIC_ALARM_RULE_BIND = "/_sys/device-alarm-rule/bind"; + static final String TOPIC_ALARM_RULE_UNBIND = "/_sys/device-alarm-rule/unbind"; + + + @EventListener + public void handleBindEvent(EntitySavedEvent event) { + event.async( + Flux.fromIterable(event.getEntity()) + .flatMap(e -> eventBus.publish(TOPIC_ALARM_RULE_BIND, e)) + ); + } + + @EventListener + public void handleBindEvent(EntityCreatedEvent event) { + event.async( + Flux.fromIterable(event.getEntity()) + .flatMap(e -> eventBus.publish(TOPIC_ALARM_RULE_BIND, e)) + ); + } + + @EventListener + public void handleBindEvent(EntityDeletedEvent event) { + event.async( + Flux.fromIterable(event.getEntity()) + .flatMap(e -> eventBus.publish(TOPIC_ALARM_RULE_UNBIND, e)) + ); + } + + @Subscribe(value = TOPIC_ALARM_RULE_UNBIND, features = {Subscription.Feature.local, Subscription.Feature.broker}) + public void handleUnBind(AlarmRuleBindEntity entity) { + Integer index = entity.getBranchIndex(); + if (index == null) { + index = AlarmRuleBindEntity.ANY_BRANCH_INDEX; + } + + ruleAlarmBinds + .compute(Tuples.of(entity.getRuleId(), index), (key, value) -> { + if (value == null) { + return null; + } + value.remove(entity.getAlarmId()); + if (value.isEmpty()) { + return null; + } + return value; + }); + } + + @Subscribe(value = TOPIC_ALARM_RULE_BIND, features = {Subscription.Feature.local, Subscription.Feature.broker}) + public void handleBind(AlarmRuleBindEntity entity) { + Integer index = entity.getBranchIndex(); + if (index == null) { + index = AlarmRuleBindEntity.ANY_BRANCH_INDEX; + } + ruleAlarmBinds + .computeIfAbsent(Tuples.of(entity.getRuleId(), index), ignore -> ConcurrentHashMap.newKeySet()) + .add(entity.getAlarmId()); + } + + @Override + public void run(String... args) throws Exception { + //启动时加载绑定配置 + bindRepository + .createQuery() + .fetch() + .doOnNext(this::handleBind) + .subscribe(); + + } + + @Getter + @Setter + public static class AlarmInfo extends Result { + /** + * 告警所有者用户ID,表示告警是属于哪个用户的,用于进行数据权限控制 + */ + private String ownerId; + + private AlarmData data; + + @Override + public AlarmInfo copyWith(AlarmTargetInfo targetInfo) { + AlarmInfo result = FastBeanCopier.copy(this, new AlarmInfo()); + result.setTargetType(targetInfo.getTargetType()); + result.setTargetId(targetInfo.getTargetId()); + result.setTargetName(targetInfo.getTargetName()); + return result; + } + } + +} diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/DeviceAlarmTarget.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/DeviceAlarmTarget.java index ef90cf65..4a589d01 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/DeviceAlarmTarget.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/DeviceAlarmTarget.java @@ -23,7 +23,7 @@ public class DeviceAlarmTarget implements AlarmTarget { } @Override - public Flux convert(SceneData data) { + public Flux convert(AlarmData data) { Map output = data.getOutput(); String deviceId = CastUtils.castString(output.get("deviceId")); String deviceName = CastUtils.castString(output.getOrDefault("deviceName", deviceId)); diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/OtherAlarmTarget.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/OtherAlarmTarget.java index 0e242bf2..89fb8049 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/OtherAlarmTarget.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/OtherAlarmTarget.java @@ -20,11 +20,12 @@ public class OtherAlarmTarget implements AlarmTarget { } @Override - public Flux convert(SceneData data) { + public Flux convert(AlarmData data) { return Flux.just(AlarmTargetInfo - .of(data.getRule().getId(), - data.getRule().getName(), + .of(data.getRuleId(), + data.getRuleName(), getType())); } + } diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/ProductAlarmTarget.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/ProductAlarmTarget.java index 293d3890..42ec8428 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/ProductAlarmTarget.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/alarm/ProductAlarmTarget.java @@ -1,6 +1,6 @@ package org.jetlinks.community.rule.engine.alarm; -import org.jetlinks.community.rule.engine.scene.SceneData; + import org.jetlinks.reactor.ql.utils.CastUtils; import reactor.core.publisher.Flux; @@ -23,7 +23,7 @@ public class ProductAlarmTarget implements AlarmTarget { } @Override - public Flux convert(SceneData data) { + public Flux convert(AlarmData data) { Map output = data.getOutput(); String productId = CastUtils.castString(output.get("productId")); String productName = CastUtils.castString(output.getOrDefault("productName", productId)); diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/configuration/RuleEngineManagerConfiguration.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/configuration/RuleEngineManagerConfiguration.java index 1b017550..e6d884bc 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/configuration/RuleEngineManagerConfiguration.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/configuration/RuleEngineManagerConfiguration.java @@ -18,9 +18,9 @@ public class RuleEngineManagerConfiguration { @Bean public SceneTaskExecutorProvider sceneTaskExecutorProvider(EventBus eventBus, - ObjectProvider filters) { + ObjectProvider filters) { return new SceneTaskExecutorProvider(eventBus, - SceneFilter.composite(filters)); + SceneFilter.composite(filters)); } @Configuration(proxyBeanMethods = false) diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/entity/AlarmConfigEntity.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/entity/AlarmConfigEntity.java index 0010cb97..84eccb9f 100755 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/entity/AlarmConfigEntity.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/entity/AlarmConfigEntity.java @@ -9,14 +9,18 @@ import org.hswebframework.ezorm.rdb.mapping.annotation.DefaultValue; import org.hswebframework.ezorm.rdb.mapping.annotation.EnumCodec; import org.hswebframework.web.api.crud.entity.GenericEntity; import org.hswebframework.web.api.crud.entity.RecordCreationEntity; +import org.hswebframework.web.api.crud.entity.RecordModifierEntity; import org.hswebframework.web.crud.annotation.EnableEntityEvent; import org.hswebframework.web.crud.generator.Generators; +import org.jetlinks.community.rule.engine.alarm.AlarmConstants; import org.jetlinks.community.rule.engine.enums.AlarmState; import org.jetlinks.community.rule.engine.scene.TriggerType; import javax.persistence.Column; import javax.persistence.Index; import javax.persistence.Table; +import java.util.HashMap; +import java.util.Map; @Getter @Setter @@ -25,7 +29,7 @@ import javax.persistence.Table; }) @Comment("告警配置表") @EnableEntityEvent -public class AlarmConfigEntity extends GenericEntity implements RecordCreationEntity { +public class AlarmConfigEntity extends GenericEntity implements RecordCreationEntity, RecordModifierEntity { @Column(length = 64, nullable = false) @Schema(description = "名称") @@ -39,32 +43,33 @@ public class AlarmConfigEntity extends GenericEntity implements RecordCr @Schema(description = "告警级别") private Integer level; - @Column(length = 128, nullable = false) + @Column(length = 128) @Schema(description = "关联场景名称") private String sceneName; - @Column(length = 64, nullable = false) + @Column(length = 64) @Schema(description = "关联场景Id") private String sceneId; @Column(length = 32, nullable = false) @EnumCodec @ColumnType(javaType = String.class) - @DefaultValue("enabled") + @DefaultValue("disabled") @Schema(description = "状态") private AlarmState state; - @Column(length = 32, nullable = false) + @Column(length = 32) @EnumCodec @ColumnType(javaType = String.class) @Schema(description = "场景触发类型") + @Deprecated private TriggerType sceneTriggerType; @Column(length = 256) @Schema(description = "说明") private String description; - @Column(updatable = false) + @Column(length = 64, updatable = false) @Schema( description = "创建者ID(只读)" , accessMode = Schema.AccessMode.READ_ONLY @@ -78,4 +83,27 @@ public class AlarmConfigEntity extends GenericEntity implements RecordCr , accessMode = Schema.AccessMode.READ_ONLY ) private Long createTime; + + @Column(length = 64) + @Schema(description = "更新者ID", accessMode = Schema.AccessMode.READ_ONLY) + private String modifierId; + + @Column + @DefaultValue(generator = Generators.CURRENT_TIME) + @Schema(description = "更新时间") + private Long modifyTime; + + + public Map toConfigMap() { + Map configs = new HashMap<>(); + + configs.put(AlarmConstants.ConfigKey.alarmConfigId, getId()); + configs.put(AlarmConstants.ConfigKey.alarmName, getName()); + configs.put(AlarmConstants.ConfigKey.level, getLevel()); + configs.put(AlarmConstants.ConfigKey.ownerId, getModifierId() == null ? getCreatorId() : getModifierId()); + configs.put(AlarmConstants.ConfigKey.targetType, getTargetType()); + configs.put(AlarmConstants.ConfigKey.state, getState().name()); + + return configs; + } } diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/entity/AlarmRecordEntity.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/entity/AlarmRecordEntity.java index bdce70b6..cc6b88fc 100755 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/entity/AlarmRecordEntity.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/entity/AlarmRecordEntity.java @@ -25,18 +25,22 @@ public class AlarmRecordEntity extends GenericEntity { @Schema(description = "告警配置ID") private String alarmConfigId; - @Column(length = 64, nullable = false, updatable = false) + @Column(length = 64, nullable = false) @Schema(description = "告警配置名称") private String alarmName; - @Column + @Column(length = 32, updatable = false) @Schema(description = "告警目标类型") private String targetType; - @Column + @Column(length = 64, updatable = false) @Schema(description = "告警目标Id") private String targetId; + @Column(length = 64, updatable = false) + @Schema(description = "告警目标Key") + private String targetKey; + @Column @Schema(description = "告警目标名称") private String targetName; @@ -45,6 +49,10 @@ public class AlarmRecordEntity extends GenericEntity { @Schema(description = "最近一次告警时间") private Long alarmTime; + @Column + @Schema(description = "处理时间") + private Long handleTime; + @Column @Schema(description = "告警级别") private Integer level; @@ -60,14 +68,25 @@ public class AlarmRecordEntity extends GenericEntity { @Schema(description = "说明") private String description; + public String getTargetKey() { + if (targetKey == null) { + generateKey(); + } + return targetKey; + } + + public void generateKey() { + setTargetKey(generateId(targetId, targetType)); + } public void generateId() { setId(generateId(targetId, targetType, alarmConfigId)); } - public static String generateId(String targetId, String targetType, String alarmConfigId) { - return DigestUtils.md5Hex(String.join("-", targetId, targetType, alarmConfigId)); + public static String generateId(String... args) { + return DigestUtils.md5Hex(String.join("-", args)); } } + diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/entity/AlarmRuleBindEntity.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/entity/AlarmRuleBindEntity.java new file mode 100644 index 00000000..e3138199 --- /dev/null +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/entity/AlarmRuleBindEntity.java @@ -0,0 +1,52 @@ +package org.jetlinks.community.rule.engine.entity; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Getter; +import lombok.Setter; +import org.hswebframework.ezorm.rdb.mapping.annotation.DefaultValue; +import org.hswebframework.web.api.crud.entity.GenericEntity; +import org.hswebframework.web.crud.annotation.EnableEntityEvent; +import org.hswebframework.web.utils.DigestUtils; +import org.springframework.util.StringUtils; + +import javax.persistence.Column; +import javax.persistence.Index; +import javax.persistence.Table; +import javax.validation.constraints.NotBlank; + +@Table(name = "s_alarm_rule_bind", indexes = { + @Index(name = "idx_alarm_rule_aid", columnList = "alarmId"), + @Index(name = "idx_alarm_rule_rid", columnList = "ruleId"), +}) +@Getter +@Setter +@Schema(description = "告警规则绑定信息") +@EnableEntityEvent +public class AlarmRuleBindEntity extends GenericEntity { + + public static final int ANY_BRANCH_INDEX = -1; + + @Column(nullable = false, updatable = false) + @NotBlank + @Schema(description = "告警ID") + private String alarmId; + + @Column(nullable = false, updatable = false) + @NotBlank + @Schema(description = "场景规则ID") + private String ruleId; + + @Column(nullable = false, updatable = false) + @Schema(description = "规则条件分支ID") + @DefaultValue("-1") + private Integer branchIndex; + + @Override + public String getId() { + if (StringUtils.hasText(super.getId())) { + return super.getId(); + } + setId(DigestUtils.md5Hex(String.join("|", alarmId, ruleId, String.valueOf(branchIndex)))); + return super.getId(); + } +} diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/enums/AlarmMode.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/enums/AlarmMode.java new file mode 100644 index 00000000..c4a46986 --- /dev/null +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/enums/AlarmMode.java @@ -0,0 +1,8 @@ +package org.jetlinks.community.rule.engine.enums; + +public enum AlarmMode { + //触发告警 + trigger, + //解除告警 + relieve +} \ No newline at end of file diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/DeviceOperation.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/DeviceOperation.java index a67ac68a..29b7a9e5 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/DeviceOperation.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/DeviceOperation.java @@ -135,13 +135,13 @@ public class DeviceOperation { terms.addAll( this.createTerm( metadata.getEvent(eventId) - .>map(event -> Collections - .singletonList( - of("data", - event.getName(), - event.getType()) - )) - .orElse(Collections.emptyList()), + .>map(event -> Collections + .singletonList( + of("data", + event.getName(), + event.getType()) + )) + .orElse(Collections.emptyList()), (property, column) -> column.setChildren(createTermColumn("event", property, false)))); } //调用功能 @@ -149,12 +149,12 @@ public class DeviceOperation { terms.addAll( this.createTerm( metadata.getFunction(functionId) - .>map(meta -> Collections.singletonList( - of("output", - meta.getName(), - meta.getOutput())) - ) - .orElse(Collections.emptyList()), + .>map(meta -> Collections.singletonList( + of("output", + meta.getName(), + meta.getOutput())) + ) + .orElse(Collections.emptyList()), (property, column) -> column.setChildren(createTermColumn("function", property, false)))); } diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/DeviceTrigger.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/DeviceTrigger.java index 05e14acf..a5416306 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/DeviceTrigger.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/DeviceTrigger.java @@ -25,6 +25,7 @@ import org.jetlinks.community.rule.engine.scene.term.TermColumn; import org.jetlinks.community.rule.engine.scene.term.TermTypeSupport; import org.jetlinks.community.rule.engine.scene.term.TermTypes; import org.jetlinks.community.rule.engine.scene.value.TermValue; +import org.jetlinks.reactor.ql.DefaultReactorQLContext; import org.jetlinks.reactor.ql.ReactorQL; import org.jetlinks.reactor.ql.ReactorQLContext; import org.jetlinks.rule.engine.api.model.RuleModel; @@ -56,6 +57,9 @@ public class DeviceTrigger extends DeviceSelectorSpec implements Serializable { @NotNull(message = "error.scene_rule_trigger_device_operation_cannot_be_null") private DeviceOperation operation; + @Schema(description = "拓展信息") + private Map options; + public SqlRequest createSql(List terms) { return createSql(terms, true); } @@ -170,16 +174,12 @@ public class DeviceTrigger extends DeviceSelectorSpec implements Serializable { .builder() .sql(sql) .build(); - Object[] args = request.getParameters(); + List args = Arrays.asList(request.getParameters()); String sqlString = request.toNativeSql(); return new Function, Mono>() { @Override public Mono apply(Map map) { - ReactorQLContext context = ReactorQLContext.ofDatasource((t) -> Flux.just(map)); - for (Object arg : args) { - context.bind(arg); - } - + ReactorQLContext context = new DefaultReactorQLContext((t) -> Flux.just(map), args); return ql .start(context) .hasElements(); @@ -361,7 +361,7 @@ public class DeviceTrigger extends DeviceSelectorSpec implements Serializable { public List createDefaultVariable() { return Arrays.asList( - Variable.of("deviceId", "设备ID"), + Variable.of("deviceId", "设备ID").withOption(Variable.OPTION_PRODUCT_ID,productId), Variable.of("deviceName", "设备名称"), Variable.of("productId", "产品ID"), Variable.of("productName", "产品名称") diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneAction.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneAction.java index 9ddcd746..ca81b405 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneAction.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneAction.java @@ -3,10 +3,12 @@ package org.jetlinks.community.rule.engine.scene; import io.swagger.v3.oas.annotations.media.Schema; import lombok.AllArgsConstructor; import lombok.Getter; +import lombok.NoArgsConstructor; import lombok.Setter; import org.hswebframework.ezorm.core.param.Term; import org.hswebframework.web.bean.FastBeanCopier; import org.hswebframework.web.i18n.LocaleUtils; +import org.jetlinks.community.rule.engine.alarm.AlarmTaskExecutorProvider; import org.jetlinks.core.device.DeviceRegistry; import org.jetlinks.core.message.DeviceMessage; import org.jetlinks.core.message.MessageType; @@ -18,6 +20,7 @@ import org.jetlinks.core.metadata.DeviceMetadata; import org.jetlinks.core.metadata.FunctionMetadata; import org.jetlinks.core.metadata.PropertyMetadata; import org.jetlinks.core.metadata.types.BooleanType; +import org.jetlinks.core.metadata.types.IntType; import org.jetlinks.core.metadata.types.ObjectType; import org.jetlinks.community.relation.utils.VariableSource; import org.jetlinks.community.rule.engine.executor.DelayTaskExecutorProvider; @@ -27,8 +30,10 @@ import org.jetlinks.community.rule.engine.executor.device.DeviceSelectorSpec; import org.jetlinks.community.rule.engine.scene.term.TermTypes; import org.jetlinks.rule.engine.api.model.RuleNodeModel; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; import java.io.Serializable; import java.time.temporal.ChronoUnit; import java.util.*; @@ -46,6 +51,7 @@ import static org.hswebframework.web.i18n.LocaleUtils.*; public class SceneAction implements Serializable { @Schema(description = "执行器类型") + @NotNull private Executor executor; @Schema(description = "执行器类型为[notify]时不能为空") @@ -57,23 +63,55 @@ public class SceneAction implements Serializable { @Schema(description = "执行器类型为[device]时不能为空") private Device device; + @Schema(description = "执行器类型为[alarm]时不能为空") + private Alarm alarm; + @Schema(description = "输出过滤条件,串行执行动作时,满足条件才会进入下一个节点") private List terms; - public Flux createVariables(DeviceRegistry registry, int index) { + @Schema(description = "拓展信息") + private Map options; + + public Flux createVariables(DeviceRegistry registry, Integer branchIndex, int index) { //设备 if (executor == Executor.device && device != null) { return device .getDeviceMetadata(registry, device.productId) - .flatMapMany(metadata -> currentReactive() - .flatMapIterable(locale -> - doWith(metadata, - locale, - (m, l) -> device.createVariables(metadata, index)))); + .map(metadata -> createVariable(branchIndex, index, device.createVariables(metadata))) + .flux() + .as(LocaleUtils::transform); + } + if (executor == Executor.alarm && alarm != null) { + return Mono + .fromSupplier(() -> createVariable(branchIndex, index, alarm.createVariables())) + .flux() + .as(LocaleUtils::transform); } return Flux.empty(); } + private Variable createVariable(Integer branchIndex, int actionIndex, List children) { + int humanIndex = actionIndex + 1; + + String varId = "action_" + humanIndex; + + if (branchIndex != null) { + varId = "branch_" + branchIndex + "_" + varId; + } + + String message = resolveMessage( + "message.action_var_index", + String.format("动作[%s]", humanIndex), + humanIndex + ); + + + Variable variable = Variable.of(varId, message); + variable.setChildren(children); + + return variable; + } + public static SceneAction notify(String notifyType, String notifierId, String templateId, @@ -114,17 +152,27 @@ public class SceneAction implements Serializable { node.setConfiguration(config); return; } + case alarm: + node.setExecutor(AlarmTaskExecutorProvider.executor); + node.setConfiguration(FastBeanCopier.copy(alarm, new HashMap<>())); + return; //设备指令 case device: { DeviceMessageSendTaskExecutorProvider.DeviceMessageSendConfig config = new DeviceMessageSendTaskExecutorProvider.DeviceMessageSendConfig(); config.setMessage(device.message); - config.setSelectorSpec( - DeviceSelectorProviders.composite( - //先选择产品下的设备 - DeviceSelectorProviders.product(device.productId), - FastBeanCopier.copy(device, new DeviceSelectorSpec()) - )); + + if (DeviceSelectorProviders.isFixed(device)) { + config.setSelectorSpec(FastBeanCopier.copy(device, new DeviceSelectorSpec())); + } else { + config.setSelectorSpec( + DeviceSelectorProviders.composite( + //先选择产品下的设备 + DeviceSelectorProviders.product(device.productId), + FastBeanCopier.copy(device, new DeviceSelectorSpec()) + )); + } + config.setFrom("fixed"); config.setStateOperator("direct"); config.setProductId(device.productId); @@ -153,8 +201,7 @@ public class SceneAction implements Serializable { private Map message; - public List createVariables(DeviceMetadata metadata, - int actionIndex) { + public List createVariables(DeviceMetadata metadata) { DeviceMessage message = MessageType .convertMessage(this.message) .filter(DeviceMessage.class::isInstance) @@ -164,25 +211,28 @@ public class SceneAction implements Serializable { return Collections.emptyList(); } List variables = new ArrayList<>(); - int humanIndex = actionIndex + 1; - - Variable action = Variable - .of("action_" + humanIndex, resolveMessage( - "message.action_var_index", - String.format("动作[%s]", humanIndex), - humanIndex - )); //下发指令是否成功 variables.add(Variable .of("success", resolveMessage( "message.action_execute_success", - "执行是否成功", - humanIndex + "执行是否成功" )) .withType(BooleanType.ID)); + //设备ID + variables.add(Variable + .of("deviceId", + resolveMessage( + "message.device_id", + "设备ID" + )) + .withType(BooleanType.ID) + //标识变量属于哪个产品 + .withOption(Variable.OPTION_PRODUCT_ID, productId) + ); + if (message instanceof ReadPropertyMessage) { List properties = ((ReadPropertyMessage) message).getProperties(); for (String property : properties) { @@ -220,8 +270,7 @@ public class SceneAction implements Serializable { } } - action.setChildren(variables); - return Collections.singletonList(action); + return variables; } } @@ -272,7 +321,9 @@ public class SceneAction implements Serializable { @Getter @Setter - public static class Delay { + @AllArgsConstructor + @NoArgsConstructor + public static class Delay implements Serializable { @Schema(description = "延迟时间") private int time; @@ -282,7 +333,7 @@ public class SceneAction implements Serializable { @Getter @Setter - public static class Notify { + public static class Notify implements Serializable { @Schema(description = "通知类型") @NotBlank(message = "error.scene_rule_actions_notify_type_cannot_be_empty") private String notifyType; @@ -303,6 +354,40 @@ public class SceneAction implements Serializable { private Map variables; } + + @Getter + @Setter + public static class Alarm extends AlarmTaskExecutorProvider.Config { + + /** + * @see org.jetlinks.community.rule.engine.alarm.AlarmRuleHandler.Result + */ + public List createVariables() { + + List variables = new ArrayList<>(); + + variables.add( + Variable.of("alarmName", + LocaleUtils.resolveMessage("message.alarm_config_name", "告警配置名称")) + ); + + variables.add( + Variable.of("level", + LocaleUtils.resolveMessage("message.alarm_level", "告警级别")) + .withType(IntType.ID) + ); + + variables.add( + Variable.of("alarming", + LocaleUtils.resolveMessage("message.is_alarming", "是否正在告警")) + .withType(BooleanType.ID) + ); + + return variables; + } + } + + @Getter @AllArgsConstructor public enum DelayUnit { @@ -316,7 +401,8 @@ public class SceneAction implements Serializable { public enum Executor { notify, delay, - device + device, + alarm } -} +} \ No newline at end of file diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneConditionAction.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneConditionAction.java index 9a9a1981..bc680df6 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneConditionAction.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneConditionAction.java @@ -13,6 +13,11 @@ import java.util.List; @Setter public class SceneConditionAction implements Serializable { + /** + * @see org.jetlinks.community.rule.engine.scene.term.TermColumn + * @see org.jetlinks.community.rule.engine.scene.term.TermType + * @see org.jetlinks.community.rule.engine.scene.value.TermValue + */ @Schema(description = "条件") private List when; @@ -20,6 +25,6 @@ public class SceneConditionAction implements Serializable { private ShakeLimit shakeLimit; @Schema(description = "满足条件时执行的动作") - private SceneActions then; + private List then; } diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneRule.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneRule.java index b35980ed..1a6b9528 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneRule.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneRule.java @@ -18,6 +18,7 @@ import org.jetlinks.community.rule.engine.commons.ShakeLimit; import org.jetlinks.community.rule.engine.commons.TermsConditionEvaluator; import org.jetlinks.community.rule.engine.scene.term.TermColumn; import org.jetlinks.community.rule.engine.scene.term.limit.ShakeLimitGrouping; +import org.jetlinks.rule.engine.api.RuleData; import org.jetlinks.rule.engine.api.model.RuleLink; import org.jetlinks.rule.engine.api.model.RuleModel; import org.jetlinks.rule.engine.api.model.RuleNodeModel; @@ -27,19 +28,23 @@ import reactor.core.Disposables; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; +import reactor.function.Function3; import reactor.util.concurrent.Queues; import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotNull; import java.io.Serializable; import java.util.*; -import java.util.function.BiFunction; import java.util.function.Function; @Getter @Setter public class SceneRule implements Serializable { + public static final String ACTION_KEY_BRANCH_INDEX = "_branchIndex"; + public static final String ACTION_KEY_GROUP_INDEX = "_groupIndex"; + public static final String ACTION_KEY_ACTION_INDEX = "_actionIndex"; + @Schema(description = "告警ID") @NotBlank(message = "error.scene_rule_id_cannot_be_blank") private String id; @@ -52,6 +57,11 @@ public class SceneRule implements Serializable { @NotNull(message = "error.scene_rule_trigger_cannot_be_null") private Trigger trigger; + /** + * @see org.jetlinks.community.rule.engine.scene.term.TermColumn + * @see org.jetlinks.community.rule.engine.scene.term.TermType + * @see org.jetlinks.community.rule.engine.scene.value.TermValue + */ @Schema(description = "触发条件") private List terms; @@ -64,6 +74,9 @@ public class SceneRule implements Serializable { @Schema(description = "动作分支") private List branches; + @Schema(description = "扩展配置") + private Map options; + @Schema(description = "说明") private String description; @@ -141,6 +154,7 @@ public class SceneRule implements Serializable { public Flux createVariables(List columns, Integer branchIndex, + Integer branchGroupIndex, Integer actionIndex, DeviceRegistry registry) { Flux variables = createSceneVariables(columns); @@ -149,19 +163,20 @@ public class SceneRule implements Serializable { if (branchIndex == null && !parallel && actionIndex != null && CollectionUtils.isNotEmpty(actions)) { for (int i = 0; i < Math.min(actions.size(), actionIndex + 1); i++) { - variables = variables.concatWith(actions.get(i).createVariables(registry, i)); + variables = variables.concatWith(actions.get(i).createVariables(registry, branchIndex, i)); } } //分支条件 - if (branchIndex != null && CollectionUtils.isNotEmpty(branches) && branches.size() > branchIndex) { + if (branchIndex != null && branchGroupIndex != null && CollectionUtils.isNotEmpty(branches) && branches.size() > branchIndex) { SceneConditionAction branch = branches.get(branchIndex); + SceneActions then = branch.getThen() != null && branch.getThen().size() > branchGroupIndex + ? branch.getThen().get(branchGroupIndex) : null; List actionList; - if (branch.getThen() != null && !branch.getThen().isParallel() && - - CollectionUtils.isNotEmpty(actionList = branch.getThen().getActions())) { + if (then != null && !then.isParallel() && + CollectionUtils.isNotEmpty(actionList = then.getActions())) { for (int i = 0; i < Math.min(actionList.size(), actionIndex + 1); i++) { - variables = variables.concatWith(actionList.get(i).createVariables(registry, i)); + variables = variables.concatWith(actionList.get(i).createVariables(registry, branchIndex, i)); } } @@ -171,8 +186,12 @@ public class SceneRule implements Serializable { .doOnNext(Variable::refactorPrefix); } + private String createBranchActionId(int branchIndex, int groupId, int actionIndex) { + return "branch_" + branchIndex + "_group_" + groupId + "_action_" + actionIndex; + } + public Disposable createBranchHandler(Flux> sourceData, - BiFunction, Mono> output) { + Function3, Mono> output) { if (CollectionUtils.isEmpty(branches)) { return Disposables.disposed(); } @@ -186,65 +205,72 @@ public class SceneRule implements Serializable { //执行条件 Function, Mono> filter = createFilter(branch.getWhen()); //满足条件后的输出操作 - Function, Mono> out; + List, Mono>> outs = new ArrayList<>(); - SceneActions then = branch.getThen(); - //执行动作 - if (then != null && CollectionUtils.isNotEmpty(then.getActions())) { + List groups = branch.getThen(); + int thenIndex = 0; + if (CollectionUtils.isNotEmpty(groups)) { + thenIndex++; - int size = then.getActions().size(); - //串行,只传递到第一个动作 - if (!then.isParallel() || size == 1) { - String nodeId = "branch_" + _branchIndex + "_action_1"; - out = data -> output.apply(nodeId, data); - } else { - //多个并行执行动作 - String[] nodeIds = new String[size]; - for (int i = 0; i < nodeIds.length; i++) { - nodeIds[0] = "branch_" + _branchIndex + "_action_" + (i + 1); + for (SceneActions then : groups) { + Function, Mono> out; + + int size = then.getActions().size(); + //串行,只传递到第一个动作 + if (!then.isParallel() || size == 1) { + String nodeId = createBranchActionId(_branchIndex, thenIndex, 1); + out = data -> output.apply(_branchIndex, nodeId, data); + } else { + //多个并行执行动作 + String[] nodeIds = new String[size]; + for (int i = 0; i < nodeIds.length; i++) { + nodeIds[0] = createBranchActionId(_branchIndex, thenIndex, 1 + (i + 1)); + } + Flux nodeIdFlux = Flux.fromArray(nodeIds); + //并行 + out = data -> nodeIdFlux + .flatMap(nodeId -> output.apply(_branchIndex, nodeId, data)) + .then(); } - Flux nodeIdFlux = Flux.fromArray(nodeIds); - //并行 - out = data -> nodeIdFlux - .flatMap(nodeId -> output.apply(nodeId, data)) - .then(); + //防抖 + ShakeLimit shakeLimit = branch.getShakeLimit(); + if (shakeLimit != null && shakeLimit.isEnabled()) { + + Sinks.Many> sinks = Sinks + .many() + .unicast() + .onBackpressureBuffer(Queues.>unboundedMultiproducer().get()); + + //分组方式,比如设备触发时,应该按设备分组,每个设备都走独立的防抖策略 + ShakeLimitGrouping> grouping = createGrouping(); + + Function, Mono> handler = out; + + disposable.add( + shakeLimit + .transfer(sinks.asFlux(), + (duration, stream) -> + grouping + .group(stream)//先按自定义分组再按事件窗口进行分组 + .flatMap(group -> group.window(duration), Integer.MAX_VALUE), + (map, total) -> map.put("_total", total)) + .flatMap(handler) + .subscribe() + ); + //输出到sink进行防抖控制 + out = data -> { + sinks.emitNext(data, Reactors.emitFailureHandler()); + return Mono.empty(); + }; + } + outs.add(out); } - //防抖 - ShakeLimit shakeLimit = branch.getShakeLimit(); - if (shakeLimit != null && shakeLimit.isEnabled()) { - - Sinks.Many> sinks = Sinks - .many() - .unicast() - .onBackpressureBuffer(Queues.>unboundedMultiproducer().get()); - - //分组方式,比如设备触发时,应该按设备分组,每个设备都走独立的防抖策略 - ShakeLimitGrouping> grouping = createGrouping(); - - Function, Mono> handler = out; - - disposable.add( - shakeLimit - .transfer(sinks.asFlux(), - (duration, stream) -> - grouping - .group(stream)//先按自定义分组再按事件窗口进行分组 - .flatMap(group -> group.window(duration), Integer.MAX_VALUE), - (map, total) -> map.put("_total", total)) - .flatMap(handler) - .subscribe() - ); - //输出到sink进行防抖控制 - out = data -> { - sinks.emitNext(data, Reactors.emitFailureHandler()); - return Mono.empty(); - }; - } - } else { - out = ignore -> Mono.empty(); } - Function, Mono> fOut = out; + + Flux, Mono>> outFlux = Flux.fromIterable(outs); + + Function, Mono> fOut = out -> outFlux.flatMap(fun -> fun.apply(out)).then(); Function, Mono> handler = @@ -363,40 +389,52 @@ public class SceneRule implements Serializable { for (SceneConditionAction branch : branches) { branchIndex++; - SceneActions actions = branch.getThen(); - if (actions != null && CollectionUtils.isNotEmpty(actions.getActions())) { - int actionIndex = 1; - RuleNodeModel preNode = null; - SceneAction preAction = null; - for (SceneAction action : actions.getActions()) { - RuleNodeModel actionNode = new RuleNodeModel(); - actionNode.setId("branch_" + branchIndex + "_action_" + actionIndex); - actionNode.setName("条件_" + branchIndex + "_动作_" + actionIndex); + List group = branch.getThen(); - action.applyNode(actionNode); - //串行 - if (!actions.isParallel()) { - //串行的时候 标记记录每一个动作的数据到header中,用于进行条件判断或者数据引用 - actionNode.addConfiguration(AbstractExecutionContext.RECORD_DATA_TO_HEADER, true); - actionNode.addConfiguration(AbstractExecutionContext.RECORD_DATA_TO_HEADER_KEY, actionNode.getId()); + if (CollectionUtils.isNotEmpty(group)) { + int groupIndex = 0; + for (SceneActions actions : group) { + groupIndex++; + if (actions != null && CollectionUtils.isNotEmpty(actions.getActions())) { + int actionIndex = 1; + RuleNodeModel preNode = null; + SceneAction preAction = null; + for (SceneAction action : actions.getActions()) { + RuleNodeModel actionNode = new RuleNodeModel(); + actionNode.setId(createBranchActionId(branchIndex, groupIndex, actionIndex)); + actionNode.setName("条件" + branchIndex + "_分组" + groupIndex + "_动作" + actionIndex); - if (preNode != null) { - //上一个节点->当前动作节点 - RuleLink link = model.link(preNode, actionNode); - //设置上一个节点到此节点的输出条件 - if (CollectionUtils.isNotEmpty(preAction.getTerms())) { - link.setCondition(TermsConditionEvaluator.createCondition(preAction.getTerms())); + action.applyNode(actionNode); + //串行 + if (!actions.isParallel()) { + //串行的时候 标记记录每一个动作的数据到header中,用于进行条件判断或者数据引用 + actionNode.addConfiguration(RuleData.RECORD_DATA_TO_HEADER, true); + actionNode.addConfiguration(RuleData.RECORD_DATA_TO_HEADER_KEY, actionNode.getId()); + actionNode.addConfiguration(ACTION_KEY_BRANCH_INDEX, branchIndex); + actionNode.addConfiguration(ACTION_KEY_GROUP_INDEX, groupIndex); + actionNode.addConfiguration(ACTION_KEY_ACTION_INDEX, actionIndex); + + if (preNode != null) { + //上一个节点->当前动作节点 + RuleLink link = model.link(preNode, actionNode); + //设置上一个节点到此节点的输出条件 + if (CollectionUtils.isNotEmpty(preAction.getTerms())) { + link.setCondition(TermsConditionEvaluator.createCondition(preAction.getTerms())); + } + } + + preNode = actionNode; } + + model.getNodes().add(actionNode); + preAction = action; + actionIndex++; } - - preNode = actionNode; } - - model.getNodes().add(actionNode); - preAction = action; - actionIndex++; } } + + } } @@ -406,7 +444,5 @@ public class SceneRule implements Serializable { public void validate() { ValidatorUtils.tryValidate(this); - trigger.validate(); - } } diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneTaskExecutorProvider.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneTaskExecutorProvider.java index acc5bab3..bb73ff83 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneTaskExecutorProvider.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneTaskExecutorProvider.java @@ -8,6 +8,7 @@ import org.hswebframework.web.bean.FastBeanCopier; import org.hswebframework.web.id.IDGenerator; import org.jetlinks.core.event.EventBus; import org.jetlinks.core.event.Subscription; +import org.jetlinks.core.trace.TraceHolder; import org.jetlinks.core.utils.FluxUtils; import org.jetlinks.community.PropertyConstants; import org.jetlinks.community.rule.engine.scene.term.limit.ShakeLimitGrouping; @@ -24,14 +25,15 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.Duration; -import java.util.Collections; -import java.util.List; import java.util.Map; @Slf4j @AllArgsConstructor public class SceneTaskExecutorProvider implements TaskExecutorProvider { + private static final int BACKPRESSURE_BUFFER_MAX_SIZE = + Integer.getInteger("scene.backpressure-buffer-size", 10_0000); + public static final String EXECUTOR = "scene"; private final EventBus eventBus; @@ -86,49 +88,13 @@ public class SceneTaskExecutorProvider implements TaskExecutorProvider { this.rule = sceneRule; } - private Disposable init() { - if (disposable != null) { - disposable.dispose(); - } - boolean useBranch = CollectionUtils.isNotEmpty(rule.getBranches()); - - SqlRequest request = rule.createSql(!useBranch); - - //不是通过SQL来处理数据 - if (request.isEmpty()) { - return context - .getInput() - .accept() - .flatMap(this::handleOutput) - .subscribe(); - } - if (log.isDebugEnabled()) { - log.debug("init scene [{}:{}], sql:{}", rule.getId(), rule.getName(), request.toNativeSql()); - } - //数据源 - ReactorQLContext qlContext = ReactorQLContext + private ReactorQLContext createReactorQLContext() { + return ReactorQLContext .ofDatasource(table -> { - //来自上游(定时等) if (table.startsWith("/")) { //来自事件总线 return this - .refactorTopic(table) - .flatMapMany(topics -> eventBus - .subscribe( - Subscription - .builder() - .justLocal() - .topics(topics) - .subscriberId("scene:" + rule.getId()) - .build())) - .>handle((topicPayload, synchronousSink) -> { - String topic = topicPayload.getTopic(); - try { - synchronousSink.next(topicPayload.bodyToJson(true)); - } catch (Throwable err) { - log.warn("decode payload error {}", topic, err); - } - }) + .subscribe(table) //有效期去重,同一个设备在多个部门的场景下,可能收到2条相同的数据问题 .as(FluxUtils.distinct(map -> { Object id = map.get(PropertyConstants.uid.getKey()); @@ -136,26 +102,68 @@ public class SceneTaskExecutorProvider implements TaskExecutorProvider { id = IDGenerator.SNOW_FLAKE_STRING.generate(); } return id; - }, Duration.ofSeconds(5))); + }, Duration.ofSeconds(1))); } else { + //来自上游(定时等) return context .getInput() .accept() .flatMap(RuleData::dataToMap); } }); + } - //sql参数 - for (Object parameter : request.getParameters()) { - qlContext.bind(parameter); + private Disposable init() { + if (disposable != null) { + disposable.dispose(); + } + boolean useBranch = CollectionUtils.isNotEmpty(rule.getBranches()); + + SqlRequest request = rule.createSql(!useBranch); + Flux> source; + + //不是通过SQL来处理数据 + if (request.isEmpty()) { + source = context + .getInput() + .accept() + .flatMap(RuleData::dataToMap); + } else { + if (log.isDebugEnabled()) { + log.debug("init scene [{}:{}], sql:{}", rule.getId(), rule.getName(), request.toNativeSql()); + } + + ReactorQLContext qlContext = createReactorQLContext(); + + //sql参数 + for (Object parameter : request.getParameters()) { + qlContext.bind(parameter); + } + source = ReactorQL + .builder() + .sql(request.getSql()) + .build() + .start(qlContext) + .map(ReactorQLRecord::asMap); } - Flux> source = ReactorQL - .builder() - .sql(request.getSql()) - .build() - .start(qlContext) - .map(ReactorQLRecord::asMap); + // 分支条件 + if (useBranch) { + return rule + .createBranchHandler( + source, + (idx,nodeId, data) -> { + if (log.isDebugEnabled()) { + log.debug("scene [{}] branch [{}] execute", rule.getId(), nodeId); + } + RuleData ruleData = context.newRuleData(data); + return context + .getOutput() + .write(nodeId, ruleData) + .onErrorResume(err -> context.onError(err, ruleData)) + .as(tracer()); + }); + } //防抖 Trigger.GroupShakeLimit shakeLimit = rule.getTrigger().getShakeLimit(); @@ -176,10 +184,30 @@ public class SceneTaskExecutorProvider implements TaskExecutorProvider { .subscribe(); } - private Mono> refactorTopic(String topic) { - //todo 根据权限对topic进行重构 - - return Mono.just(Collections.singletonList(topic)); + private Flux> subscribe(String topic) { + return eventBus + .subscribe( + Subscription + .builder() + .justLocal() + .topics(topic) + .subscriberId("scene:" + rule.getId()) + .build()) + .>handle((topicPayload, synchronousSink) -> { + try { + synchronousSink.next(topicPayload.bodyToJson(true)); + } catch (Throwable err) { + log.warn("decode payload error {}", topicPayload.getTopic(), err); + } + }) + //有效期去重,同一个设备在多个部门的场景下,可能收到2条相同的数据问题 + .as(FluxUtils.distinct(map -> { + Object id = map.get(PropertyConstants.uid.getKey()); + if (null == id) { + id = IDGenerator.SNOW_FLAKE_STRING.generate(); + } + return id; + }, Duration.ofSeconds(5))); } private Mono handleOutput(RuleData data) { @@ -198,7 +226,11 @@ public class SceneTaskExecutorProvider implements TaskExecutorProvider { .filter(sceneData) .defaultIfEmpty(true); }) - .flatMap(map -> context.getOutput().write(data.newData(map))) + .flatMap(map -> context + .getOutput() + .write(data.newData(map)) + .as(tracer()) + .contextWrite(ctx -> TraceHolder.readToContext(ctx, map))) .onErrorResume(err -> context.onError(err, data)) .then(); diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneUtils.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneUtils.java index 7315c072..30063a38 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneUtils.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneUtils.java @@ -71,8 +71,8 @@ public class SceneUtils { if (term != null) { //有条件的数据会有别名 以_分隔 variables.add(Variable - .of(column.getVariable("_"), variableName) - .withType(column.getDataType())); + .of(column.getVariable("_"), variableName) + .withType(column.getDataType())); List termValues = TermValue.of(term); String property = column.getPropertyOrNull(); for (TermValue termValue : termValues) { diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/Trigger.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/Trigger.java index d99b66d8..318a1b52 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/Trigger.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/Trigger.java @@ -89,6 +89,8 @@ public class Trigger implements Serializable { timerNode.setId("scene:timer"); timerNode.setName("定时触发场景"); timerNode.setExecutor("timer"); + //使用最小负载节点来执行定时 + // timerNode.setSchedulingRule(SchedulerSelectorStrategy.minimumLoad()); timerNode.setConfiguration(FastBeanCopier.copy(timer, new HashMap<>())); model.getNodes().add(timerNode); //定时->场景 diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/Variable.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/Variable.java index abe04c03..b1595720 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/Variable.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/Variable.java @@ -9,11 +9,14 @@ import org.jetlinks.core.metadata.types.StringType; import org.jetlinks.community.rule.engine.scene.term.TermType; import org.springframework.util.StringUtils; +import java.util.HashMap; import java.util.List; +import java.util.Map; @Getter @Setter public class Variable { + public static final String OPTION_PRODUCT_ID = "productId"; @Schema(description = "变量ID") private String id; @@ -35,6 +38,23 @@ public class Variable { @Schema(description = "子级变量") private List children; + @Schema(description = "其他配置") + private Map options; + + public synchronized Map safeOptions() { + return options == null ? options = new HashMap<>() : options; + } + + public Variable withOption(String key, Object value) { + safeOptions().put(key, value); + return this; + } + + public Variable withOptions(Map options) { + safeOptions().putAll(options); + return this; + } + public Variable withType(String type) { this.type = type; return this; diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/term/FixedTermTypeSupport.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/term/FixedTermTypeSupport.java index b9ad329b..1b766f72 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/term/FixedTermTypeSupport.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/term/FixedTermTypeSupport.java @@ -36,13 +36,31 @@ public enum FixedTermTypeSupport implements TermTypeSupport { return val; } }, - in("在...之中", "in", StringType.ID, IntType.ID, LongType.ID, FloatType.ID, DoubleType.ID, EnumType.ID, ArrayType.ID) { + in("在...之中", "in", StringType.ID, IntType.ID, LongType.ID, FloatType.ID, DoubleType.ID, EnumType.ID) { @Override protected Object convertValue(Object val) { return val; } }, - nin("不在...之中", "not in", StringType.ID, IntType.ID, LongType.ID, FloatType.ID, DoubleType.ID, EnumType.ID, ArrayType.ID) { + nin("不在...之中", "nin", StringType.ID, IntType.ID, LongType.ID, FloatType.ID, DoubleType.ID, EnumType.ID) { + @Override + protected Object convertValue(Object val) { + return val; + } + }, + contains_all("全部包含在...之中", "contains_all", ArrayType.ID) { + @Override + protected Object convertValue(Object val) { + return val; + } + }, + contains_any("任意包含在...之中", "contains_any", ArrayType.ID) { + @Override + protected Object convertValue(Object val) { + return val; + } + }, + not_contains("不包含在...之中", "not_contains", ArrayType.ID) { @Override protected Object convertValue(Object val) { return val; @@ -50,9 +68,21 @@ public enum FixedTermTypeSupport implements TermTypeSupport { }, like("包含字符", "str_like", StringType.ID), - nlike("不包含字符", "not str_like", StringType.ID), + nlike("不包含字符", "str_nlike", StringType.ID), - ; + // gt(math.sub(column,now()),?) + time_gt_now("距离当前时间大于...秒", "time_gt_now", DateTimeType.ID) { + @Override + protected void appendFunction(String column, PrepareSqlFragments fragments) { + fragments.addSql("gt(math.divi(math.sub(now(),", column, "),1000),"); + } + }, + time_lt_now("距离当前时间小于...秒", "time_lt_now", DateTimeType.ID){ + @Override + protected void appendFunction(String column, PrepareSqlFragments fragments) { + fragments.addSql("lt(math.divi(math.sub(now(),", column, "),1000),"); + } + }; private final String text; private final Set supportTypes; @@ -80,17 +110,23 @@ public enum FixedTermTypeSupport implements TermTypeSupport { return val; } - @Override - public final SqlFragments createSql(String column, Object value) { - PrepareSqlFragments fragments = PrepareSqlFragments.of(); + protected void appendFunction(String column, PrepareSqlFragments fragments) { fragments.addSql(function + "(", column, ","); + } + + @Override + public SqlFragments createSql(String column, Object value) { + PrepareSqlFragments fragments = PrepareSqlFragments.of(); + appendFunction(column, fragments); + value = convertValue(value); + if (value instanceof NativeSql) { fragments .addSql(((NativeSql) value).getSql()) .addParameter(((NativeSql) value).getParameters()); } else { fragments.addSql("?") - .addParameter(convertValue(value)); + .addParameter(value); } fragments.addSql(")"); return fragments; diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/term/TermColumn.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/term/TermColumn.java index f7d012cc..6d63a023 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/term/TermColumn.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/term/TermColumn.java @@ -9,10 +9,10 @@ import org.hswebframework.ezorm.core.param.Term; import org.hswebframework.web.bean.FastBeanCopier; import org.jetlinks.core.metadata.DataType; import org.jetlinks.core.metadata.PropertyMetadata; +import org.jetlinks.core.metadata.types.EnumType; import org.jetlinks.community.PropertyMetadataConstants; import org.jetlinks.community.PropertyMetric; import org.jetlinks.community.rule.engine.scene.DeviceOperation; -import org.jetlinks.core.metadata.types.EnumType; import org.springframework.util.StringUtils; import java.util.*; @@ -45,30 +45,31 @@ public class TermColumn { @Schema(description = "支持的条件类型") private List termTypes; - @Schema(description = "可选内容") - private List options; - @Schema(description = "支持的指标") private List metrics; + @Schema(description = "可选内容") + private List options; + @Schema(description = "子列,在类型为object时有值") private List children; public TermColumn copyColumn(Predicate childrenPredicate) { - TermColumn copy = FastBeanCopier.copy(this,new TermColumn()); + TermColumn copy = FastBeanCopier.copy(this, new TermColumn()); if (CollectionUtils.isNotEmpty(children)) { copy.setChildren( - children.stream() - .filter(child -> childrenPredicate.test(child.getColumn())) - .map(child -> child.copyColumn(childrenPredicate)) - .collect(Collectors.toList()) + children.stream() + .filter(child -> childrenPredicate.test(child.getColumn())) + .map(child -> child.copyColumn(childrenPredicate)) + .collect(Collectors.toList()) ); } return copy; } + public boolean hasColumn(Collection columns) { for (String column : columns) { if (hasColumn(column)) { diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/term/TermTypes.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/term/TermTypes.java index 05d8ca2d..68c5594a 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/term/TermTypes.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/term/TermTypes.java @@ -12,7 +12,8 @@ public class TermTypes { private static final Map supports = new LinkedHashMap<>(); static { - for (FixedTermTypeSupport value : FixedTermTypeSupport.values()) { + for (FixedTermTypeSupport value : FixedTermTypeSupport + .values()) { register(value); } } diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/AlarmRuleBindService.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/AlarmRuleBindService.java new file mode 100644 index 00000000..dd61ad67 --- /dev/null +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/AlarmRuleBindService.java @@ -0,0 +1,17 @@ +package org.jetlinks.community.rule.engine.service; + +import lombok.AllArgsConstructor; +import org.hswebframework.web.crud.service.GenericReactiveCrudService; +import org.jetlinks.community.rule.engine.entity.AlarmRuleBindEntity; +import org.springframework.stereotype.Component; + +/** + * 告警规则绑定. + * + * @author zhangji 2022/11/23 + */ +@Component +@AllArgsConstructor +public class AlarmRuleBindService extends GenericReactiveCrudService { + +} diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/ElasticSearchAlarmHistoryService.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/ElasticSearchAlarmHistoryService.java index f4fcc590..94dbde0f 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/ElasticSearchAlarmHistoryService.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/ElasticSearchAlarmHistoryService.java @@ -18,7 +18,6 @@ import java.time.Duration; * @author bestfeng */ @AllArgsConstructor -@Service public class ElasticSearchAlarmHistoryService implements AlarmHistoryService { public final static String ALARM_HISTORY_INDEX="alarm_history"; diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/terms/AlarmBindRuleTerm.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/terms/AlarmBindRuleTerm.java new file mode 100644 index 00000000..989e816d --- /dev/null +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/terms/AlarmBindRuleTerm.java @@ -0,0 +1,58 @@ +package org.jetlinks.community.rule.engine.service.terms; + +import org.hswebframework.ezorm.core.param.Term; +import org.hswebframework.ezorm.rdb.metadata.RDBColumnMetadata; +import org.hswebframework.ezorm.rdb.operator.builder.fragments.PrepareSqlFragments; +import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments; +import org.hswebframework.ezorm.rdb.operator.builder.fragments.term.AbstractTermFragmentBuilder; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * 根据告警配置查询规则. + * + * 例如:查询告警ID为alarm-id绑定的场景联动 + *
+ *     {
+ *             "column":"id",
+ *             "termType":"alarm-bind-rule",
+ *             "value":"alarm-id"
+ *     }
+ * 
+ * + * @author zhangji 2022/11/23 + */ +@Component +public class AlarmBindRuleTerm extends AbstractTermFragmentBuilder { + + public AlarmBindRuleTerm() { + super("alarm-bind-rule", "告警绑定的规则"); + } + + @Override + public SqlFragments createFragments(String columnFullName, + RDBColumnMetadata column, + Term term) { + + PrepareSqlFragments sqlFragments = PrepareSqlFragments.of(); + if (term.getOptions().contains("not")) { + sqlFragments.addSql("not"); + } + sqlFragments + .addSql("exists(select 1 from ", getTableName("s_alarm_rule_bind", column), " _bind where _bind.rule_id =", columnFullName); + + List alarmId = convertList(column, term); + sqlFragments + .addSql( + "and _bind.alarm_id in (", + alarmId.stream().map(r -> "?").collect(Collectors.joining(",")), + ")") + .addParameter(alarmId); + + sqlFragments.addSql(")"); + + return sqlFragments; + } +} diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/terms/RuleBindAlarmTerm.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/terms/RuleBindAlarmTerm.java new file mode 100644 index 00000000..3297762a --- /dev/null +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/terms/RuleBindAlarmTerm.java @@ -0,0 +1,58 @@ +package org.jetlinks.community.rule.engine.service.terms; + +import org.hswebframework.ezorm.core.param.Term; +import org.hswebframework.ezorm.rdb.metadata.RDBColumnMetadata; +import org.hswebframework.ezorm.rdb.operator.builder.fragments.PrepareSqlFragments; +import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments; +import org.hswebframework.ezorm.rdb.operator.builder.fragments.term.AbstractTermFragmentBuilder; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * 根据规则查询告警配置. + * + * 例如:查询场景联动ID为rule-id绑定的告警 + *
+ *     {
+ *             "column":"id",
+ *             "termType":"rule-bind-alarm",
+ *             "value":"rule-id"
+ *     }
+ * 
+ * + * @author zhangji 2022/11/23 + */ +@Component +public class RuleBindAlarmTerm extends AbstractTermFragmentBuilder { + + public RuleBindAlarmTerm() { + super("rule-bind-alarm", "规则绑定的告警"); + } + + @Override + public SqlFragments createFragments(String columnFullName, + RDBColumnMetadata column, + Term term) { + + PrepareSqlFragments sqlFragments = PrepareSqlFragments.of(); + if (term.getOptions().contains("not")) { + sqlFragments.addSql("not"); + } + sqlFragments + .addSql("exists(select 1 from ", getTableName("s_alarm_rule_bind", column), " _bind where _bind.alarm_id =", columnFullName); + + List ruleId = convertList(column, term); + sqlFragments + .addSql( + "and _bind.rule_id in (", + ruleId.stream().map(r -> "?").collect(Collectors.joining(",")), + ")") + .addParameter(ruleId); + + sqlFragments.addSql(")"); + + return sqlFragments; + } +} diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/AlarmRuleBindController.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/AlarmRuleBindController.java new file mode 100644 index 00000000..4570cdd5 --- /dev/null +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/AlarmRuleBindController.java @@ -0,0 +1,52 @@ +package org.jetlinks.community.rule.engine.web; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.tags.Tag; +import lombok.AllArgsConstructor; +import org.hswebframework.web.authorization.annotation.Authorize; +import org.hswebframework.web.authorization.annotation.DeleteAction; +import org.hswebframework.web.authorization.annotation.Resource; +import org.hswebframework.web.crud.service.ReactiveCrudService; +import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController; +import org.jetlinks.community.rule.engine.entity.AlarmRuleBindEntity; +import org.jetlinks.community.rule.engine.service.AlarmRuleBindService; +import org.springframework.web.bind.annotation.*; +import reactor.core.publisher.Mono; + +import java.util.List; + +/** + * 告警规则绑定. + * + * @author zhangji 2022/11/23 + */ +@RestController +@RequestMapping("/alarm/rule/bind") +@Resource(id = "alarm-config", name = "告警配置") +@Authorize +@Tag(name = "告警规则绑定") +@AllArgsConstructor +public class AlarmRuleBindController implements ReactiveServiceCrudController { + + private final AlarmRuleBindService service; + + @Override + public ReactiveCrudService getService() { + return service; + } + + @PostMapping("/{alarmId}/_delete") + @DeleteAction + @Operation(summary = "批量删除告警规则绑定") + public Mono deleteAlarmBind(@PathVariable @Parameter(description = "告警配置ID") String alarmId, + @RequestBody @Parameter(description = "场景联动ID") Mono> ruleId) { + return ruleId + .flatMap(idList -> service + .createDelete() + .where(AlarmRuleBindEntity::getAlarmId, alarmId) + .in(AlarmRuleBindEntity::getRuleId, idList) + .execute()); + } + +} diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/SceneController.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/SceneController.java index d0d4ca50..ed51569f 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/SceneController.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/web/SceneController.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; + @RestController @RequestMapping("/scene") @Tag(name = "场景管理") @@ -51,8 +52,8 @@ public class SceneController implements ReactiveServiceQueryController updateScene(@PathVariable String id, - @RequestBody Mono sceneRuleMono) { + public Mono update(@PathVariable String id, + @RequestBody Mono sceneRuleMono) { return sceneRuleMono .flatMap(sceneRule -> service.updateScene(id, sceneRule)) .then(); @@ -111,6 +112,7 @@ public class SceneController implements ReactiveServiceQueryController parseVariables(@RequestBody Mono ruleMono, @RequestParam(required = false) Integer branch, + @RequestParam(required = false) Integer branchGroup, @RequestParam(required = false) Integer action) { Mono cache = ruleMono.cache(); return Mono @@ -125,6 +127,7 @@ public class SceneController implements ReactiveServiceQueryController column.copyColumn(terms::containsKey)) .collect(Collectors.toList()), branch, + branchGroup, action, deviceRegistry); })