重构场景联动 (#227)

* 重构场景联动
This commit is contained in:
bestfeng1020 2022-12-29 11:44:31 +08:00 committed by GitHub
parent ecae631bd9
commit 8cb5ebcd06
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
63 changed files with 2079 additions and 614 deletions

View File

@ -15,9 +15,9 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.commons.collections4.CollectionUtils;
import org.hswebframework.web.exception.ValidationException;
import org.springframework.util.Assert;
import javax.validation.ValidationException;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.io.Serializable;
@ -88,6 +88,12 @@ public class TimerSpec implements Serializable {
}
return predicate.and(range);
}
if (mod == ExecuteMod.once){
LocalTime onceTime = once.localTime();
Predicate<LocalDateTime> predicate
= time -> time.toLocalTime().compareTo(onceTime) == 0;
return predicate.and(range);
}
return range;
}
@ -201,10 +207,16 @@ public class TimerSpec implements Serializable {
if (trigger == null) {
Assert.hasText(cron, "error.scene_rule_timer_cron_cannot_be_empty");
}
try {
toCronExpression();
} catch (Throwable e) {
throw new ValidationException("error.cron_format_error");
if (trigger == Trigger.cron) {
try {
toCronExpression();
} catch (Throwable e) {
ValidationException exception = new ValidationException("cron", "error.cron_format_error", cron);
exception.addSuppressed(e);
throw exception;
}
} else {
nextDurationBuilder().apply(ZonedDateTime.now());
}
}
@ -213,7 +225,8 @@ public class TimerSpec implements Serializable {
@Setter
@AllArgsConstructor(staticName = "of")
@NoArgsConstructor
public static class Once {
public static class Once implements Serializable {
private static final long serialVersionUID = 1L;
//时间点
@Schema(description = "时间点.格式:[hh:mm],或者[hh:mm:ss]")
@NotBlank
@ -226,7 +239,8 @@ public class TimerSpec implements Serializable {
@Getter
@Setter
public static class Period {
public static class Period implements Serializable {
private static final long serialVersionUID = 1L;
//周期执行的时间区间
@Schema(description = "执行时间范围从.格式:[hh:mm],或者[hh:mm:ss]")
private String from;
@ -356,8 +370,43 @@ public class TimerSpec implements Serializable {
};
}
private TimerIterable onceIterable() {
Assert.notNull(once, "once can not be null");
Predicate<LocalDateTime> filter = createTimeFilter();
LocalTime onceTime = once.localTime();
return baseTime -> new Iterator<ZonedDateTime>() {
ZonedDateTime current = baseTime;
@Override
public boolean hasNext() {
return true;
}
@Override
public ZonedDateTime next() {
ZonedDateTime dateTime = current;
int max = MAX_IT_TIMES;
if (dateTime.toLocalTime().compareTo(onceTime) != 0){
dateTime = onceTime.atDate(dateTime.toLocalDate()).atZone(dateTime.getZone());
}
do {
if (filter.test(dateTime.toLocalDateTime()) && current.compareTo(dateTime) <= 0) {
current = dateTime.plusDays(1);
break;
}
dateTime = dateTime.plusDays(1);
max--;
} while (max > 0);
return dateTime;
}
};
}
public TimerIterable iterable() {
return mod == ExecuteMod.period ? periodIterable() : cronIterable();
if ((trigger == Trigger.cron || trigger == null) && cron != null){
return cronIterable();
}
return mod == ExecuteMod.period ? periodIterable() : onceIterable();
}
public List<ZonedDateTime> getNextExecuteTimes(ZonedDateTime from, long times) {

View File

@ -0,0 +1,26 @@
package org.jetlinks.community.reactorql;
import org.springframework.context.annotation.Import;
import java.lang.annotation.*;
/**
* 在配置类上加上此注解,并指定{@link EnableReactorQL#value()},将扫描指定包下注解了{@link ReactorQLOperation}的接口类,
* 并生成代理对象注入到spring中.
*
* @author zhouhao
* @since 1.6
* @see ReactorQL
* @see ReactorQLOperation
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(ReactorQLBeanDefinitionRegistrar.class)
public @interface EnableReactorQL {
/**
* @return 扫描的包名
*/
String[] value() default {};
}

View File

@ -0,0 +1,37 @@
package org.jetlinks.community.reactorql;
import java.lang.annotation.*;
/**
* 在接口的方法上注解,使用sql语句来处理参数
*
* @author zhouhao
* @see org.jetlinks.reactor.ql.ReactorQL
* @see ReactorQLOperation
* @since 1.6
*/
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface ReactorQL {
/**
* 使用SQL语句来处理{@link reactor.core.publisher.Flux}操作.例如分组聚合.
* <a href="https://doc.jetlinks.cn/dev-guide/reactor-ql.html">查看文档说明</a>
*
* <pre>
* select count(1) total,name from "arg0" group by name
* </pre>
* <p>
* <p>
* 当方法有参数时,可通过arg{index}来获取参数,:
* <pre>
* select name newName from "arg0" where id = :arg1
* </pre>
*
* @return SQL语句
*/
String[] value();
}

View File

@ -0,0 +1,59 @@
package org.jetlinks.community.reactorql;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.context.index.CandidateComponentsIndex;
import org.springframework.context.index.CandidateComponentsIndexLoader;
import org.springframework.core.type.AnnotationMetadata;
import javax.annotation.Nonnull;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@Slf4j
public class ReactorQLBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar {
@Override
@SneakyThrows
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, @Nonnull BeanDefinitionRegistry registry) {
Map<String, Object> attr = importingClassMetadata.getAnnotationAttributes(EnableReactorQL.class.getName());
if (attr == null) {
return;
}
String[] packages = (String[]) attr.get("value");
CandidateComponentsIndex index = CandidateComponentsIndexLoader.loadIndex(org.springframework.util.ClassUtils.getDefaultClassLoader());
if (null == index) {
return;
}
Set<String> path = Arrays.stream(packages)
.flatMap(str -> index
.getCandidateTypes(str, ReactorQLOperation.class.getName())
.stream())
.collect(Collectors.toSet());
for (String className : path) {
Class<?> type = org.springframework.util.ClassUtils.forName(className, null);
if (!type.isInterface() || type.getAnnotation(ReactorQLOperation.class) == null) {
continue;
}
RootBeanDefinition definition = new RootBeanDefinition();
definition.setTargetType(type);
definition.setBeanClass(ReactorQLFactoryBean.class);
definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
definition.getPropertyValues().add("target", type);
if (!registry.containsBeanDefinition(type.getName())) {
log.debug("register ReactorQL Operator {}", type);
registry.registerBeanDefinition(type.getName(), definition);
}
}
}
}

View File

@ -0,0 +1,164 @@
package org.jetlinks.community.reactorql;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.reactor.ql.ReactorQLContext;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.LocalVariableTableParameterNameDiscoverer;
import org.springframework.core.Ordered;
import org.springframework.core.ParameterNameDiscoverer;
import org.springframework.core.ResolvableType;
import org.springframework.util.ClassUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
public class ReactorQLFactoryBean implements FactoryBean<Object>, InitializingBean, Ordered {
@Getter
@Setter
private Class<?> target;
private Object proxy;
private static final ParameterNameDiscoverer nameDiscoverer = new LocalVariableTableParameterNameDiscoverer();
public ReactorQLFactoryBean() {
}
@Override
public Object getObject() {
return proxy;
}
@Override
public Class<?> getObjectType() {
return target;
}
@Override
public void afterPropertiesSet() {
Map<Method, Function<Object[], Object>> cache = new ConcurrentHashMap<>();
this.proxy = Proxy
.newProxyInstance(ClassUtils.getDefaultClassLoader(),
new Class[]{target},
(proxy, method, args) ->
cache
.computeIfAbsent(method, mtd -> createInvoker(target, mtd, mtd.getAnnotation(ReactorQL.class)))
.apply(args));
}
@SneakyThrows
private Function<Object[], Object> createInvoker(Class<?> type, Method method, ReactorQL ql) {
if (method.isDefault() || ql == null) {
Constructor<MethodHandles.Lookup> constructor = MethodHandles.Lookup.class
.getDeclaredConstructor(Class.class);
constructor.setAccessible(true);
MethodHandles.Lookup lookup = constructor.newInstance(type);
MethodHandle handle = lookup
.in(type)
.unreflectSpecial(method, type)
.bindTo(proxy);
return args -> {
try {
return handle.invokeWithArguments(args);
} catch (Throwable e) {
return Mono.error(e);
}
};
}
ResolvableType returnType = ResolvableType.forMethodReturnType(method);
if (returnType.toClass() != Mono.class && returnType.toClass() != Flux.class) {
throw new UnsupportedOperationException("方法返回值必须为Mono或者Flux");
}
Class<?> genericType = returnType.getGeneric(0).toClass();
Function<Map<String, Object>, ?> mapper;
if (genericType == Map.class || genericType == Object.class) {
mapper = Function.identity();
} else {
mapper = map -> FastBeanCopier.copy(map, genericType);
}
Function<Flux<?>, Publisher<?>> resultMapper =
returnType.resolve() == Mono.class
? flux -> flux.take(1).singleOrEmpty()
: flux -> flux;
String[] names = nameDiscoverer.getParameterNames(method);
try {
org.jetlinks.reactor.ql.ReactorQL reactorQL =
org.jetlinks.reactor.ql.ReactorQL
.builder()
.sql(ql.value())
.build();
return args -> {
Map<String, Object> argsMap = new HashMap<>();
ReactorQLContext context = ReactorQLContext.ofDatasource(name -> {
if (args.length == 0) {
return Flux.just(1);
}
if (args.length == 1) {
return convertToFlux(args[0]);
}
return convertToFlux(argsMap.get(name));
});
for (int i = 0; i < args.length; i++) {
String indexName = "arg" + i;
String name = names == null ? indexName : names[i];
context.bind(i, args[i]);
context.bind(name, args[i]);
context.bind(indexName, args[i]);
argsMap.put(names == null ? indexName : names[i], args[i]);
argsMap.put(indexName, args[i]);
}
return reactorQL.start(context)
.map(record -> mapper.apply(record.asMap()))
.as(resultMapper);
};
} catch (Throwable e) {
throw new IllegalArgumentException(
"create ReactorQL method [" + method + "] error,sql:\n" + (String.join(" ", ql.value())), e);
}
}
protected Flux<Object> convertToFlux(Object arg) {
if (arg == null) {
return Flux.empty();
}
if (arg instanceof Publisher) {
return Flux.from((Publisher<?>) arg);
}
if (arg instanceof Iterable) {
return Flux.fromIterable(((Iterable<?>) arg));
}
if (arg instanceof Object[]) {
return Flux.fromArray(((Object[]) arg));
}
return Flux.just(arg);
}
@Override
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}
}

View File

@ -0,0 +1,22 @@
package org.jetlinks.community.reactorql;
import org.springframework.stereotype.Indexed;
import java.lang.annotation.*;
/**
* 在接口上添加此注解,开启使用sql来处理reactor数据
*
* @author zhouhao
* @see ReactorQL
* @see EnableReactorQL
* @since 1.6
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Indexed
public @interface ReactorQLOperation {
}

View File

@ -0,0 +1,170 @@
package org.jetlinks.community.reactorql.term;
import lombok.Getter;
import org.hswebframework.ezorm.core.param.Term;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.NativeSql;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.PrepareSqlFragments;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments;
import org.hswebframework.web.i18n.LocaleUtils;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.types.*;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import static org.jetlinks.community.reactorql.term.TermType.OPTIONS_NATIVE_SQL;
@Getter
public enum FixedTermTypeSupport implements TermTypeSupport {
eq("等于", "eq"),
neq("不等于", "neq"),
gt("大于", "gt", DateTimeType.ID, IntType.ID, LongType.ID, FloatType.ID, DoubleType.ID),
gte("大于等于", "gte", DateTimeType.ID, IntType.ID, LongType.ID, FloatType.ID, DoubleType.ID),
lt("小于", "lt", DateTimeType.ID, IntType.ID, LongType.ID, FloatType.ID, DoubleType.ID),
lte("小于等于", "lte", DateTimeType.ID, IntType.ID, LongType.ID, FloatType.ID, DoubleType.ID),
btw("在...之间", "btw", DateTimeType.ID, IntType.ID, LongType.ID, FloatType.ID, DoubleType.ID) {
@Override
protected Object convertValue(Object val, Term term) {
return val;
}
},
nbtw("不在...之间", "nbtw", DateTimeType.ID, IntType.ID, LongType.ID, FloatType.ID, DoubleType.ID) {
@Override
protected Object convertValue(Object val, Term term) {
return val;
}
},
in("在...之中", "in", StringType.ID, IntType.ID, LongType.ID, FloatType.ID, DoubleType.ID, EnumType.ID) {
@Override
protected Object convertValue(Object val, Term term) {
return val;
}
},
nin("不在...之中", "nin", StringType.ID, IntType.ID, LongType.ID, FloatType.ID, DoubleType.ID, EnumType.ID) {
@Override
protected Object convertValue(Object val, Term term) {
return val;
}
},
contains_all("全部包含在...之中", "contains_all", ArrayType.ID) {
@Override
protected Object convertValue(Object val, Term term) {
return val;
}
},
contains_any("任意包含在...之中", "contains_any", ArrayType.ID) {
@Override
protected Object convertValue(Object val, Term term) {
return val;
}
},
not_contains("不包含在...之中", "not_contains", ArrayType.ID) {
@Override
protected Object convertValue(Object val, Term term) {
return val;
}
},
like("包含字符", "str_like", StringType.ID) {
@Override
protected Object convertValue(Object val, Term term) {
if (val instanceof NativeSql) {
NativeSql sql = ((NativeSql) val);
return NativeSql.of("concat('%'," + sql.getSql() + ",'%')");
}
return super.convertValue(val, term);
}
},
nlike("不包含字符", "str_nlike", StringType.ID){
@Override
protected Object convertValue(Object val, Term term) {
if (val instanceof NativeSql) {
NativeSql sql = ((NativeSql) val);
return NativeSql.of("concat('%'," + sql.getSql() + ",'%')");
}
return super.convertValue(val, term);
}
},
// gt(math.sub(column,now()),?)
time_gt_now("距离当前时间大于...秒", "time_gt_now", DateTimeType.ID) {
@Override
protected void appendFunction(String column, PrepareSqlFragments fragments) {
fragments.addSql("gt(math.divi(math.sub(now(),", column, "),1000),");
}
},
time_lt_now("距离当前时间小于...秒", "time_lt_now", DateTimeType.ID) {
@Override
protected void appendFunction(String column, PrepareSqlFragments fragments) {
fragments.addSql("lt(math.divi(math.sub(now(),", column, "),1000),");
}
};
private final String text;
private final Set<String> supportTypes;
private final String function;
private FixedTermTypeSupport(String text, String function, String... supportTypes) {
this.text = text;
this.function = function;
this.supportTypes = new HashSet<>(Arrays.asList(supportTypes));
}
@Override
public boolean isSupported(DataType type) {
return supportTypes.isEmpty() || supportTypes.contains(type.getType());
}
protected Object convertValue(Object val, Term term) {
if (val instanceof Collection) {
//值为数组,则尝试获取第一个值
if (((Collection<?>) val).size() == 1) {
return ((Collection<?>) val).iterator().next();
}
}
return val;
}
protected void appendFunction(String column, PrepareSqlFragments fragments) {
fragments.addSql(function + "(", column, ",");
}
@Override
public SqlFragments createSql(String column, Object value, Term term) {
PrepareSqlFragments fragments = PrepareSqlFragments.of();
appendFunction(column, fragments);
if (term.getOptions().contains(OPTIONS_NATIVE_SQL)) {
value = NativeSql.of(String.valueOf(value));
}
value = convertValue(value, term);
if (value instanceof NativeSql) {
fragments
.addSql(((NativeSql) value).getSql())
.addParameter(((NativeSql) value).getParameters());
} else {
fragments.addSql("?")
.addParameter(value);
}
fragments.addSql(")");
return fragments;
}
@Override
public String getType() {
return name();
}
@Override
public String getName() {
return LocaleUtils.resolveMessage("message.term_type_" + name(), text);
}
}

View File

@ -0,0 +1,19 @@
package org.jetlinks.community.reactorql.term;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@AllArgsConstructor(staticName = "of")
@NoArgsConstructor
public class TermType {
public static final String OPTIONS_NATIVE_SQL = "native";
private String id;
private String name;
}

View File

@ -0,0 +1,20 @@
package org.jetlinks.community.reactorql.term;
import org.hswebframework.ezorm.core.param.Term;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments;
import org.jetlinks.core.metadata.DataType;
public interface TermTypeSupport {
String getType();
String getName();
boolean isSupported(DataType type);
SqlFragments createSql(String column, Object value, Term term);
default TermType type() {
return TermType.of(getType(), getName());
}
}

View File

@ -0,0 +1,40 @@
package org.jetlinks.community.reactorql.term;
import org.jetlinks.core.metadata.DataType;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* @see org.jetlinks.community.utils.ReactorUtils#createFilter(List)
*/
public class TermTypes {
private static final Map<String, TermTypeSupport> supports = new LinkedHashMap<>();
static {
for (FixedTermTypeSupport value : FixedTermTypeSupport.values()) {
register(value);
}
}
public static void register(TermTypeSupport support){
supports.put(support.getType(),support);
}
public static List<TermType> lookup(DataType dataType) {
return supports
.values()
.stream()
.filter(support -> support.isSupported(dataType))
.map(TermTypeSupport::type)
.collect(Collectors.toList());
}
public static Optional<TermTypeSupport> lookupSupport(String type) {
return Optional.ofNullable(supports.get(type));
}
}

View File

@ -1,15 +1,14 @@
package org.jetlinks.community.utils;
import lombok.Getter;
import org.hswebframework.ezorm.core.param.Term;
import org.hswebframework.ezorm.rdb.executor.SqlRequest;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.AbstractTermsFragmentBuilder;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.NativeSql;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.PrepareSqlFragments;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.community.reactorql.term.FixedTermTypeSupport;
import org.jetlinks.community.reactorql.term.TermTypeSupport;
import org.jetlinks.community.reactorql.term.TermTypes;
import org.jetlinks.core.metadata.Jsonable;
import org.jetlinks.core.metadata.types.*;
import org.jetlinks.core.utils.FluxUtils;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.reactor.ql.ReactorQL;
@ -20,7 +19,9 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
@ -28,11 +29,10 @@ import java.util.function.Function;
* 响应式相关工具类
*
* @author zhouhao
* @since 2.0
* @since 1.12
*/
public class ReactorUtils {
public static <T> Function<Flux<T>, Flux<T>> limit(Long pageIndex, Long pageSize) {
if (pageIndex == null || pageSize == null) {
return Function.identity();
@ -130,7 +130,7 @@ public class ReactorUtils {
SqlRequest request = fragments.toRequest();
String sql = "select 1 from dual where " + request.getSql();
String sql = "select 1 from t where " + request.getSql();
String nativeSql = request.toNativeSql();
try {
ReactorQL ql = ReactorQL.builder().sql(sql).build();
@ -173,105 +173,37 @@ public class ReactorUtils {
switch (termType) {
case "is":
case "=":
termType = "eq";
termType = FixedTermTypeSupport.eq.name();
break;
case ">":
termType = "gt";
termType = FixedTermTypeSupport.gt.name();
break;
case ">=":
termType = "gte";
termType = FixedTermTypeSupport.gte.name();
break;
case "<":
termType = "lt";
termType = FixedTermTypeSupport.lt.getName();
break;
case "<=":
termType = "lte";
termType = FixedTermTypeSupport.lte.getName();
break;
case "!=":
case "<>":
termType = "neq";
termType = FixedTermTypeSupport.neq.getName();
break;
}
try {
TermTypeSupport support = TermTypeSupport.valueOf(termType);
return support.createSql("this['" + term.getColumn() + "']", term.getValue());
} catch (Throwable e) {
throw new IllegalArgumentException("unsupported termType " + term.getTermType(), e);
TermTypeSupport support = TermTypes.lookupSupport(termType).orElse(null);
if (support == null) {
throw new UnsupportedOperationException("unsupported termType " + term.getTermType());
}
String column = term.getColumn();
if (!column.contains("[") && !column.contains("'")) {
column = "this['" + column + "']";
}
return support.createSql(column, term.getValue(), term);
}
}
@Getter
enum TermTypeSupport {
eq("等于", "eq"),
neq("不等于", "neq"),
gt("大于", "gt", DateTimeType.ID, IntType.ID, FloatType.ID, DoubleType.ID),
gte("大于等于", "gte", DateTimeType.ID, IntType.ID, FloatType.ID, DoubleType.ID),
lt("小于", "lt", DateTimeType.ID, IntType.ID, FloatType.ID, DoubleType.ID),
lte("小于等于", "lte", DateTimeType.ID, IntType.ID, FloatType.ID, DoubleType.ID),
btw("在...之间", "btw", DateTimeType.ID, IntType.ID, FloatType.ID, DoubleType.ID) {
@Override
protected Object convertValue(Object val) {
return val;
}
},
nbtw("不在...之间", "nbtw", DateTimeType.ID, IntType.ID, FloatType.ID, DoubleType.ID) {
@Override
protected Object convertValue(Object val) {
return val;
}
},
in("在...之中", "in", StringType.ID, IntType.ID, FloatType.ID, DoubleType.ID) {
@Override
protected Object convertValue(Object val) {
return val;
}
},
nin("不在...之中", "not in", StringType.ID, IntType.ID, FloatType.ID, DoubleType.ID) {
@Override
protected Object convertValue(Object val) {
return val;
}
},
like("包含字符", "str_like", StringType.ID),
nlike("不包含字符", "not str_like", StringType.ID),
;
private final String text;
private final Set<String> supportTypes;
private final String function;
TermTypeSupport(String text, String function, String... supportTypes) {
this.text = text;
this.function = function;
this.supportTypes = new HashSet<>(Arrays.asList(supportTypes));
}
protected Object convertValue(Object val) {
return val;
}
public final SqlFragments createSql(String column, Object value) {
PrepareSqlFragments fragments = PrepareSqlFragments.of();
fragments.addSql(function + "(", column, ",");
if (value instanceof NativeSql) {
fragments
.addSql(((NativeSql) value).getSql())
.addParameter(((NativeSql) value).getParameters());
} else {
fragments.addSql("?")
.addParameter(convertValue(value));
}
fragments.addSql(")");
return fragments;
}
}
}
}

View File

@ -54,7 +54,7 @@ public class NotifierTaskExecutorProvider implements TaskExecutorProvider {
properties.getNotifyType().getName(),
properties.getNotifierId(),
properties.getTemplateId());
}).then(Mono.empty());
}).thenReturn(context.newRuleData(input));
}
@Override

View File

@ -43,7 +43,7 @@ public class RuleNotifierProperties {
public Map<String, Object> createVariables(RuleData data) {
Map<String, Object> vars = RuleDataHelper.toContextMap(data);
if (MapUtils.isNotEmpty(variables)) {
vars.putAll(VariableSource.wrap(variables));
vars.putAll(VariableSource.wrap(variables,vars));
}
return vars;
}

View File

@ -41,6 +41,9 @@ public class VariableSource implements Serializable {
@Schema(description = "关系,[source]为[relation]时不能为空")
private VariableObjectSpec relation;
@Schema(description = "拓展信息")
private Map<String, Object> options;
public Map<String, Object> toMap() {
return FastBeanCopier.copy(this, new HashMap<>());
}
@ -136,7 +139,7 @@ public class VariableSource implements Serializable {
return value;
}
if (getSource() == VariableSource.Source.upper) {
return DefaultPropertyFeature.GLOBAL.getProperty(getUpperKey(), context);
return DefaultPropertyFeature.GLOBAL.getProperty(getUpperKey(), context).orElse(null);
}
return value;
}
@ -196,15 +199,15 @@ public class VariableSource implements Serializable {
.orElse(null);
}
public static Map<String,Object> wrap(Map<String,Object> context){
Map<String, Object> vars = Maps.newLinkedHashMapWithExpectedSize(context.size());
public static Map<String, Object> wrap(Map<String, Object> def,Map<String, Object> context) {
Map<String, Object> vars = Maps.newLinkedHashMapWithExpectedSize(def.size());
for (Map.Entry<String, Object> entry : context.entrySet()) {
for (Map.Entry<String, Object> entry : def.entrySet()) {
String key = entry.getKey();
VariableSource source = VariableSource.of(entry.getValue());
if (source.getSource() == VariableSource.Source.upper) {
//替换上游值,防止key冲突(source的key和上游的key一样)导致无法获取到真实到上游值
vars.put(key, VariableSource.fixed(VariableSource.getNestProperty(source.getUpperKey(), vars)));
vars.put(key, VariableSource.fixed(VariableSource.getNestProperty(source.getUpperKey(), context)));
} else {
vars.put(key, source);
}

View File

@ -1,6 +1,7 @@
package org.jetlinks.community.rule.engine.configuration;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.community.rule.engine.commons.TermsConditionEvaluator;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.rule.engine.api.RuleEngine;
import org.jetlinks.rule.engine.api.scheduler.Scheduler;
@ -37,6 +38,11 @@ public class RuleEngineConfiguration {
return new DefaultConditionEvaluator();
}
@Bean
public TermsConditionEvaluator termsConditionEvaluator(){
return new TermsConditionEvaluator();
}
@Bean
public AntVG6RuleModelParserStrategy antVG6RuleModelParserStrategy() {
return new AntVG6RuleModelParserStrategy();

View File

@ -0,0 +1,48 @@
package org.jetlinks.community.rule.engine.alarm;
import org.jetlinks.community.rule.engine.scene.SceneRule;
import reactor.core.publisher.Flux;
import java.util.Map;
import java.util.Optional;
public abstract class AbstractAlarmTarget implements AlarmTarget {
@Override
public final Flux<AlarmTargetInfo> convert(AlarmData data) {
return this
.doConvert(data)
.doOnNext(info -> {
String sourceType = AbstractAlarmTarget
.getFromOutput(SceneRule.SOURCE_TYPE_KEY, data.getOutput())
.map(String::valueOf)
.orElse(null);
String sourceId = AbstractAlarmTarget
.getFromOutput(SceneRule.SOURCE_ID_KEY, data.getOutput())
.map(String::valueOf)
.orElse(null);
String sourceName = AbstractAlarmTarget
.getFromOutput(SceneRule.SOURCE_NAME_KEY, data.getOutput())
.map(String::valueOf)
.orElse(sourceId);
if (sourceType != null && sourceId != null) {
info.withSource(sourceType, sourceId, sourceName);
}
});
}
protected abstract Flux<AlarmTargetInfo> doConvert(AlarmData data);
static Optional<Object> getFromOutput(String key, Map<String, Object> output) {
//优先从场景输出中获取
Object sceneOutput = output.get(SceneRule.CONTEXT_KEY_SCENE_OUTPUT);
if (sceneOutput instanceof Map) {
return Optional.ofNullable(((Map<?, ?>) sceneOutput).get(key));
}
return Optional.ofNullable(output.get(key));
}
}

View File

@ -6,15 +6,19 @@ public interface AlarmConstants {
String alarmConfigId = "alarmConfigId";
String alarming = "alarming";
String firstAlarm = "firstAlarm";
String alarmName = "name";
String alarmName = "alarmName";
String level = "level";
String ownerId = "ownerId";
String targetType = "targetType";
String state = "state";
String alarmTime = "alarmTime";
String lastAlarmTime = "lastAlarmTime";
String targetType = "targetType";
String targetId = "targetId";
String targetName = "targetName";
String sourceType = "sourceType";
String sourceId = "sourceId";
String sourceName = "sourceName";
}
}

View File

@ -16,12 +16,21 @@ import javax.validation.constraints.NotBlank;
public class AlarmHandleInfo {
@Schema(description = "告警记录ID")
private String id;
@NotBlank
private String alarmRecordId;
@Schema(description = "告警ID")
@NotBlank
private String alarmConfigId;
@Schema(description = "告警时间")
@NotBlank
private Long alarmTime;
@Schema(description = "处理说明")
private String describe;
@Schema(description = "处理说明")
@Schema(description = "处理时间")
private Long handleTime;
@NotBlank
@ -29,6 +38,7 @@ public class AlarmHandleInfo {
private AlarmHandleType type;
@Schema(description = "处理后的状态")
@NotBlank
private AlarmRecordState state;

View File

@ -55,7 +55,7 @@ public interface AlarmRuleHandler {
@Schema(description = "告警名称")
private String alarmName;
@Schema(description = "当前是否正在告警")
@Schema(description = "是否重复告警")
private boolean alarming;
@Schema(description = "当前首次触发")
@ -79,12 +79,24 @@ public interface AlarmRuleHandler {
@Schema(description = "告警目标名称")
private String targetName;
@Schema(description = "告警来源类型")
private String sourceType;
@Schema(description = "告警来源ID")
private String sourceId;
@Schema(description = "告警来源名称")
private String sourceName;
public Result copyWith(AlarmTargetInfo targetInfo) {
Result result = FastBeanCopier.copy(this, new Result());
result.setTargetType(targetInfo.getTargetType());
result.setTargetId(targetInfo.getTargetId());
result.setTargetName(targetInfo.getTargetName());
result.setSourceType(targetInfo.getSourceType());
result.setSourceId(targetInfo.getSourceId());
result.setSourceName(targetInfo.getSourceName());
return result;
}
@ -99,10 +111,15 @@ public interface AlarmRuleHandler {
map.put(ConfigKey.level, level);
map.put(ConfigKey.alarmTime, alarmTime);
map.put(ConfigKey.lastAlarmTime, lastAlarmTime);
map.put(ConfigKey.targetType, targetType);
map.put(ConfigKey.targetId, targetId);
map.put(ConfigKey.targetName, targetName);
map.put(ConfigKey.sourceType, sourceType);
map.put(ConfigKey.sourceId, sourceId);
map.put(ConfigKey.sourceName, sourceName);
return map;
}
}

View File

@ -1,11 +1,11 @@
package org.jetlinks.community.rule.engine.alarm;
import lombok.AllArgsConstructor;
import org.hswebframework.web.authorization.ReactiveAuthenticationHolder;
import org.hswebframework.web.crud.events.*;
import org.hswebframework.web.exception.BusinessException;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.event.TopicPayload;
import org.jetlinks.community.gateway.annotation.Subscribe;
import org.jetlinks.community.rule.engine.entity.AlarmConfigEntity;
import org.jetlinks.community.rule.engine.entity.AlarmHistoryInfo;
@ -22,12 +22,14 @@ import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* @author bestfeng
@ -64,17 +66,30 @@ public class AlarmSceneHandler implements SceneFilter, CommandLineRunner {
.fromIterable(alarmConfigMap.values())
.flatMap(alarmConfig -> AlarmTarget
.of(alarmConfig.getTargetType())
.convert(AlarmData.of(alarmConfig.getId(), alarmConfig.getName(), data.getRule().getId(), data.getRule().getName(), data.getOutput()))
.convert(AlarmData.of(alarmConfig.getId(), alarmConfig.getName(), data.getRule().getId(), data
.getRule()
.getName(), data.getOutput()))
.flatMap(targetInfo -> {
AlarmRecordEntity record = ofRecord(targetInfo, alarmConfig);
//修改告警记录
return alarmRecordService
.save(record)
//推送告警信息到消息网关中 topic格式: /alarm/{targetType}/{targetId}/{alarmConfigId}/record
//fixme 已经告警中则不再推送事件
.then(publishAlarmRecord(record, alarmConfig))
//保存告警日志
.then(Mono.defer(() -> {
.createUpdate()
.set(record)
.where(AlarmRecordEntity::getId, record.getId())
// 如果已存在告警中的记录则更新
.and(AlarmRecordEntity::getState, AlarmRecordState.warning)
.execute()
.flatMap(warningRecordCount -> {
if (warningRecordCount == 0) {
return alarmRecordService
.save(record)
.thenReturn(warningRecordCount);
} else {
return Mono.just(warningRecordCount);
}
})
.flatMap(warningRecordCount -> {
//保存告警日志
AlarmHistoryInfo alarmHistoryInfo = AlarmHistoryInfo
.of(record.getId(),
targetInfo,
@ -83,8 +98,13 @@ public class AlarmSceneHandler implements SceneFilter, CommandLineRunner {
//推送告警历史数据
publisher.publishEvent(alarmHistoryInfo);
return alarmHistoryService
.save(alarmHistoryInfo);
}));
.save(alarmHistoryInfo)
//已经告警中则不再推送事件
.then(Mono.defer(() -> warningRecordCount == 0 ?
//推送告警信息到消息网关中 topic格式: /alarm/{targetType}/{targetId}/{alarmConfigId}/record
publishAlarmRecord(alarmHistoryInfo, alarmConfig) :
Mono.empty()));
});
}))
.then()
.thenReturn(true);
@ -92,36 +112,67 @@ public class AlarmSceneHandler implements SceneFilter, CommandLineRunner {
return Mono.empty();
}
public Mono<Void> publishAlarmRecord(AlarmRecordEntity record, AlarmConfigEntity config) {
public Mono<Void> publishAlarmRecord(AlarmHistoryInfo record, AlarmConfigEntity config) {
String topic = Topics.alarm(record.getTargetType(), record.getTargetId(), record.getAlarmConfigId());
return eventBus
.publish(topic, record)
.then();
}
@Subscribe(value = "/_sys/alarm/config/deleted", features = Subscription.Feature.broker)
public Mono<Void> HandleAlarmConfigDelete(AlarmConfigEntity alarmConfig) {
return doAlarmConfigDelete(alarmConfig);
}
@Subscribe(value = "/_sys/alarm/config/created,saved,modified", features = Subscription.Feature.broker)
public Mono<Void> handleAlarmConfigCRU(AlarmConfigEntity alarmConfig) {
return doAlarmConfigCRU(alarmConfig);
}
@EventListener
public void handleAlarmConfigCreated(EntityCreatedEvent<AlarmConfigEntity> event) {
event.async(Mono.defer(() -> Flux
.fromIterable(event.getEntity())
.flatMap(this::handleAlarmConfigCRU)
.flatMap(config -> handleAlarmConfigCRU("/_sys/alarm/config/created", config))
.then()));
}
@EventListener
public void handleAlarmConfigModify(EntityModifyEvent<AlarmConfigEntity> event) {
event.async(Mono.defer(() -> Flux
.fromIterable(event.getAfter())
.flatMap(this::handleAlarmConfigCRU)
.then()));
Map<String, AlarmConfigEntity> beforeMap = event
.getBefore()
.stream()
.collect(Collectors.toMap(AlarmConfigEntity::getId, Function.identity()));
event.async(Flux
.fromIterable(event.getAfter())
.flatMap(config -> handleAlarmConfigCRU("/_sys/alarm/config/modified", config).thenReturn(config))
.filter(config -> {
AlarmConfigEntity before = beforeMap.get(config.getId());
if (before != null) {
// 字段未修改则不需要修改告警记录
if (StringUtils.hasText(before.getName()) &&
before.getName().equals(config.getName()) &&
before.getLevel() != null &&
before.getLevel().equals(config.getLevel())) {
return false;
}
}
return true;
})
.flatMap(this::updateAlarmRecord)
.then());
}
@EventListener
public void handleAlarmConfigSaved(EntitySavedEvent<AlarmConfigEntity> event) {
event.async(Mono.defer(() -> Flux
.fromIterable(event.getEntity())
.flatMap(this::handleAlarmConfigCRU)
.flatMap(config -> handleAlarmConfigCRU("/_sys/alarm/config/saved", config)
.then(updateAlarmRecord(config)))
.then()));
}
@ -153,9 +204,11 @@ public class AlarmSceneHandler implements SceneFilter, CommandLineRunner {
}
//处理告警配置创建修改保存
private Mono<Void> handleAlarmConfigCRU(AlarmConfigEntity config) {
private Mono<Void> handleAlarmConfigCRU(String topic, AlarmConfigEntity config) {
if (AlarmState.enabled.equals(config.getState())) {
return doAlarmConfigCRU(config);
return doAlarmConfigCRU(config)
.then(eventBus.publish(topic, config))
.then();
} else {
return handleAlarmConfigDelete(config);
}
@ -163,34 +216,40 @@ public class AlarmSceneHandler implements SceneFilter, CommandLineRunner {
//处理告警配置删除
private Mono<Void> handleAlarmConfigDelete(AlarmConfigEntity alarmConfig) {
return doAlarmConfigDelete(alarmConfig);
return doAlarmConfigDelete(alarmConfig)
.then(eventBus.publish("/_sys/alarm/config/deleted", alarmConfig))
.then();
}
//告警配置创建修改保存
private Mono<Void> doAlarmConfigCRU(AlarmConfigEntity alarmConfig) {
return Mono
.fromSupplier(() -> alarmConfigCache.compute(alarmConfig.getSceneId(), (k, v) -> {
if (v == null) {
v = new ConcurrentHashMap<>();
}
v.put(alarmConfig.getId(), alarmConfig);
return v;
}))
.then();
if (StringUtils.hasText(alarmConfig.getSceneId())) {
alarmConfigCache
.computeIfAbsent(alarmConfig.getSceneId(), (k) -> new ConcurrentHashMap<>())
.put(alarmConfig.getId(), alarmConfig);
}
return Mono.empty();
}
//告警配置删除
private Mono<Void> doAlarmConfigDelete(AlarmConfigEntity alarmConfig) {
return Mono
.fromSupplier(() -> alarmConfigCache.computeIfPresent(alarmConfig.getSceneId(), (k, v) -> {
v.remove(alarmConfig.getId());
if (v.size() == 0) {
alarmConfigCache.remove(alarmConfig.getSceneId());
if (StringUtils.hasText(alarmConfig.getSceneId())) {
alarmConfigCache.compute(alarmConfig.getSceneId(), (k, v) -> {
if (v != null) {
v.remove(alarmConfig.getId());
if (v.size() == 0) {
return null;
}
}
return v;
}))
.then();
});
}
return Mono.empty();
}
private AlarmRecordEntity ofRecord(AlarmTargetInfo targetInfo,
@ -204,10 +263,24 @@ public class AlarmSceneHandler implements SceneFilter, CommandLineRunner {
entity.setTargetName(targetInfo.getTargetName());
entity.setTargetId(targetInfo.getTargetId());
entity.setAlarmName(alarmConfigEntity.getName());
entity.setSourceId(targetInfo.getSourceId());
entity.setSourceType(targetInfo.getSourceType());
entity.setSourceName(targetInfo.getSourceName());
entity.generateId();
return entity;
}
// 修改告警记录
private Mono<Integer> updateAlarmRecord(AlarmConfigEntity config) {
return alarmRecordService
.createUpdate()
.set(AlarmRecordEntity::getAlarmName, config.getName())
.set(AlarmRecordEntity::getLevel, config.getLevel())
.where(AlarmRecordEntity::getAlarmConfigId, config.getId())
.execute();
}
@Override
public void run(String... args) throws Exception {

View File

@ -20,4 +20,27 @@ public class AlarmTargetInfo {
private String targetType;
private String sourceType;
private String sourceId;
private String sourceName;
public static AlarmTargetInfo of(String targetId, String targetName, String targetType) {
return AlarmTargetInfo.of(targetId, targetName, targetType, null, null, null);
}
public AlarmTargetInfo withTarget(String type, String id, String name) {
this.targetType = type;
this.targetId = id;
this.targetName = name;
return this;
}
public AlarmTargetInfo withSource(String type, String id, String name) {
this.sourceType = type;
this.sourceId = id;
this.sourceName = name;
return this;
}
}

View File

@ -4,6 +4,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.validator.ValidatorUtils;
import org.jetlinks.community.rule.engine.enums.AlarmMode;
@ -23,6 +24,7 @@ import java.util.function.Function;
@AllArgsConstructor
@Component
@Slf4j
public class AlarmTaskExecutorProvider implements TaskExecutorProvider {
public static final String executor = "alarm";
@ -62,6 +64,7 @@ public class AlarmTaskExecutorProvider implements TaskExecutorProvider {
protected Publisher<RuleData> apply(RuleData input) {
return executor
.apply(input)
.doOnError(err -> log.warn("{} alarm error,rule:{}", config.mode, context.getInstanceId(), err))
.map(result -> context.newRuleData(input.newData(result.toMap())));
}

View File

@ -6,33 +6,31 @@ import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
import org.hswebframework.web.authorization.DefaultDimensionType;
import org.hswebframework.web.authorization.ReactiveAuthenticationHolder;
import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.crud.events.EntityCreatedEvent;
import org.hswebframework.web.crud.events.EntityDeletedEvent;
import org.hswebframework.web.crud.events.EntityModifyEvent;
import org.hswebframework.web.crud.events.EntitySavedEvent;
import org.hswebframework.web.i18n.LocaleUtils;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.community.gateway.annotation.Subscribe;
import org.jetlinks.community.rule.engine.RuleEngineConstants;
import org.jetlinks.community.rule.engine.entity.*;
import org.jetlinks.community.rule.engine.enums.AlarmHandleType;
import org.jetlinks.community.rule.engine.enums.AlarmRecordState;
import org.jetlinks.community.rule.engine.enums.AlarmState;
import org.jetlinks.community.rule.engine.scene.SceneRule;
import org.jetlinks.community.rule.engine.service.AlarmConfigService;
import org.jetlinks.community.rule.engine.service.AlarmHistoryService;
import org.jetlinks.community.rule.engine.service.AlarmRecordService;
import org.jetlinks.community.topic.Topics;
import org.jetlinks.community.utils.ObjectMappers;
import org.jetlinks.core.config.ConfigStorage;
import org.jetlinks.core.config.ConfigStorageManager;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.utils.CompositeSet;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.community.gateway.annotation.Subscribe;
import org.jetlinks.community.rule.engine.entity.AlarmConfigEntity;
import org.jetlinks.community.rule.engine.entity.AlarmHistoryInfo;
import org.jetlinks.community.rule.engine.entity.AlarmRecordEntity;
import org.jetlinks.community.rule.engine.entity.AlarmRuleBindEntity;
import org.jetlinks.community.rule.engine.enums.AlarmRecordState;
import org.jetlinks.community.rule.engine.enums.AlarmState;
import org.jetlinks.community.rule.engine.scene.SceneRule;
import org.jetlinks.community.rule.engine.service.AlarmHistoryService;
import org.jetlinks.community.rule.engine.service.AlarmRecordService;
import org.jetlinks.community.topic.Topics;
import org.jetlinks.community.utils.ObjectMappers;
import org.jetlinks.reactor.ql.utils.CastUtils;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.RuleDataHelper;
@ -46,8 +44,13 @@ import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
@Slf4j
@AllArgsConstructor
@ -59,8 +62,6 @@ public class DefaultAlarmRuleHandler implements AlarmRuleHandler, CommandLineRun
AlarmConstants.ConfigKey.alarmConfigId,
AlarmConstants.ConfigKey.alarmName,
AlarmConstants.ConfigKey.level,
AlarmConstants.ConfigKey.alarmTime,
AlarmConstants.ConfigKey.lastAlarmTime,
AlarmConstants.ConfigKey.targetType,
AlarmConstants.ConfigKey.state,
AlarmConstants.ConfigKey.ownerId
@ -77,6 +78,10 @@ public class DefaultAlarmRuleHandler implements AlarmRuleHandler, CommandLineRun
public final ReactiveRepository<AlarmRuleBindEntity, String> bindRepository;
private final ReactiveRepository<AlarmHandleHistoryEntity, String> handleHistoryRepository;
public final AlarmConfigService alarmConfigService;
@Override
public Flux<Result> triggered(ExecutionContext context, RuleData data) {
return this
@ -148,6 +153,11 @@ public class DefaultAlarmRuleHandler implements AlarmRuleHandler, CommandLineRun
entity.setTargetType(result.getTargetType());
entity.setTargetName(result.getTargetName());
entity.setTargetId(result.getTargetId());
entity.setSourceType(result.getSourceType());
entity.setSourceName(result.getSourceName());
entity.setSourceId(result.getSourceId());
entity.setAlarmName(result.getAlarmName());
entity.generateId();
return entity;
@ -175,30 +185,51 @@ public class DefaultAlarmRuleHandler implements AlarmRuleHandler, CommandLineRun
.of(result.getTargetType())
.convert(alarmData)
.map(result::copyWith);
});
})
.flatMap(info -> this
.getRecordCache(info.createRecordId())
.map(info::with)
.defaultIfEmpty(info));
}
private Mono<AlarmInfo> relieveAlarm(AlarmInfo result) {
// 已经被解除不重复更新
if (result.isCached() && !result.isAlarming()) {
return Mono.empty();
}
AlarmRecordEntity record = ofRecord(result);
//更新告警状态.
return alarmRecordService
.createUpdate()
.set(AlarmRecordEntity::getState, AlarmRecordState.normal)
.set(AlarmRecordEntity::getHandleTime, System.currentTimeMillis())
.where(AlarmRecordEntity::getId, record.getId())
.and(AlarmRecordEntity::getState, AlarmRecordState.warning)
.execute()
.map(total -> {
return Mono
.zip(alarmRecordService.changeRecordState(AlarmRecordState.normal, record.getId()),
updateRecordCache(record.getId(), RecordCache::withNormal),
(total, ignore) -> total)
.flatMap(total -> {
//如果有数据被更新说明是正在告警中
result.setAlarming(total > 0);
return result;
});
if (total > 0) {
result.setAlarming(true);
return saveAlarmHandleHistory(record);
}
return Mono.empty();
})
.thenReturn(result);
}
private Mono<Void> saveAlarmHandleHistory(AlarmRecordEntity record) {
AlarmHandleInfo alarmHandleInfo = new AlarmHandleInfo();
alarmHandleInfo.setHandleTime(System.currentTimeMillis());
alarmHandleInfo.setAlarmRecordId(record.getId());
alarmHandleInfo.setAlarmConfigId(record.getAlarmConfigId());
alarmHandleInfo.setAlarmTime(record.getAlarmTime());
alarmHandleInfo.setState(AlarmRecordState.normal);
alarmHandleInfo.setType(AlarmHandleType.system);
alarmHandleInfo.setDescribe(LocaleUtils.resolveMessage("message.scene_triggered_relieve_alarm", "场景触发解除告警"));
// TODO: 2022/12/22 批量缓冲保存
return handleHistoryRepository
.save(AlarmHandleHistoryEntity.of(alarmHandleInfo))
.then();
}
private Mono<AlarmInfo> triggerAlarm(AlarmInfo result) {
AlarmRecordEntity record = ofRecord(result);
@ -216,16 +247,17 @@ public class DefaultAlarmRuleHandler implements AlarmRuleHandler, CommandLineRun
})
.flatMap(total -> {
AlarmHistoryInfo historyInfo = createHistory(record, result);
result.setAlarmTime(record.getAlarmTime());
//更新结果返回0 说明是新产生的告警数据
if (total == 0) {
result.setFirstAlarm(true);
result.setAlarming(false);
result.setAlarmTime(record.getAlarmTime());
return alarmRecordService
.save(record)
.then(historyService.save(historyInfo))
.then(publishAlarmRecord(historyInfo))
.then(publishAlarmRecord(historyInfo, result))
.then(publishEvent(historyInfo))
.then(saveAlarmCache(result, record));
}
@ -245,41 +277,52 @@ public class DefaultAlarmRuleHandler implements AlarmRuleHandler, CommandLineRun
private AlarmHistoryInfo createHistory(AlarmRecordEntity record, AlarmInfo alarmInfo) {
AlarmHistoryInfo info = new AlarmHistoryInfo();
info.setId(IDGenerator.SNOW_FLAKE_STRING.generate());
info.setId(IDGenerator.RANDOM.generate());
info.setAlarmConfigId(record.getAlarmConfigId());
info.setAlarmConfigName(record.getAlarmName());
info.setAlarmRecordId(record.getId());
info.setLevel(record.getLevel());
info.setAlarmTime(record.getAlarmTime());
info.setTargetName(record.getTargetName());
info.setTargetId(record.getTargetId());
info.setTargetType(record.getTargetType());
info.setSourceType(record.getSourceType());
info.setSourceName(record.getSourceName());
info.setSourceId(record.getSourceId());
info.setAlarmInfo(ObjectMappers.toJsonString(alarmInfo.getData().getOutput()));
return info;
}
public Mono<Void> publishAlarmRecord(AlarmHistoryInfo historyInfo) {
public Mono<Void> publishAlarmRecord(AlarmHistoryInfo historyInfo, AlarmInfo alarmInfo) {
String topic = Topics.alarm(historyInfo.getTargetType(), historyInfo.getTargetId(), historyInfo.getAlarmConfigId());
return eventBus.publish(topic, historyInfo).then();
return eventBus
.publish(topic, historyInfo)
.then();
}
private Mono<AlarmInfo> saveAlarmCache(AlarmInfo result,
AlarmRecordEntity record) {
return this
.getAlarmStorage(result.getAlarmConfigId())
.flatMap(store -> {
Mono<Void> save = store.setConfig("lastAlarmTime", record.getAlarmTime()).then();
if (!result.isAlarming()) {
save = save.then(store.setConfig("alarmTime", record.getAlarmTime()).then());
}
return save;
})
.updateRecordCache(record.getId(), cache -> cache.with(result))
.thenReturn(result);
// return this
// .getAlarmStorage(result.getAlarmConfigId())
// .flatMap(store -> {
// Map<String, Object> configs = new HashMap<>();
//
// configs.put(AlarmConstants.ConfigKey.lastAlarmTime, record.getAlarmTime());
// if (!result.isAlarming()) {
// configs.put(AlarmConstants.ConfigKey.alarmTime, record.getAlarmTime());
// }
// return store.setConfigs(configs);
// })
// .thenReturn(result);
}
private Mono<AlarmInfo> getAlarmInfo(ConfigStorage alarm) {
@ -432,6 +475,12 @@ public class DefaultAlarmRuleHandler implements AlarmRuleHandler, CommandLineRun
.createQuery()
.fetch()
.doOnNext(this::handleBind)
//加载告警配置数据到缓存
.thenMany(alarmConfigService
.createQuery()
.fetch()
.doOnNext(this::handleAlarmConfig)
)
.subscribe();
}
@ -446,14 +495,109 @@ public class DefaultAlarmRuleHandler implements AlarmRuleHandler, CommandLineRun
private AlarmData data;
private boolean cached;
@Override
public AlarmInfo copyWith(AlarmTargetInfo targetInfo) {
AlarmInfo result = FastBeanCopier.copy(this, new AlarmInfo());
result.setTargetType(targetInfo.getTargetType());
result.setTargetId(targetInfo.getTargetId());
result.setTargetName(targetInfo.getTargetName());
result.setSourceId(targetInfo.getSourceId());
result.setSourceType(targetInfo.getSourceType());
result.setSourceName(targetInfo.getSourceName());
return result;
}
public AlarmInfo with(RecordCache cache) {
this.setAlarmTime(cache.alarmTime);
this.setLastAlarmTime(cache.lastAlarmTime);
this.setAlarming(cache.isAlarming());
this.cached = true;
return this;
}
public String createRecordId() {
return AlarmRecordEntity.generateId(getTargetId(), getTargetType(), getAlarmConfigId());
}
}
private Mono<RecordCache> getRecordCache(String recordId) {
return storageManager
.getStorage("alarm-records")
.flatMap(store -> store
.getConfig(recordId)
.map(val -> val.as(RecordCache.class)));
}
private Mono<RecordCache> updateRecordCache(String recordId, Function<RecordCache, RecordCache> handler) {
return storageManager
.getStorage("alarm-records")
.flatMap(store -> store
.getConfig(recordId)
.map(val -> val.as(RecordCache.class))
.switchIfEmpty(Mono.fromSupplier(RecordCache::new))
.mapNotNull(handler)
.flatMap(cache -> store.setConfig(recordId, cache)
.thenReturn(cache)));
}
public static class RecordCache implements Externalizable {
static final byte stateNormal = 0x01;
static final byte stateAlarming = 0x02;
byte state;
long alarmTime;
long lastAlarmTime;
public boolean isAlarming() {
return state == stateAlarming;
}
public RecordCache withNormal() {
this.state = stateNormal;
return this;
}
public RecordCache withAlarming() {
this.state = stateAlarming;
return this;
}
public RecordCache with(Result record) {
this.lastAlarmTime = this.alarmTime == 0 ? record.getAlarmTime() : this.alarmTime;
this.alarmTime = record.getAlarmTime();
if (record.isAlarming() || record.isFirstAlarm()) {
this.state = stateAlarming;
} else {
this.state = stateNormal;
}
return this;
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeByte(state);
out.writeLong(alarmTime);
out.writeLong(lastAlarmTime);
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
state = in.readByte();
alarmTime = in.readLong();
lastAlarmTime = in.readLong();
}
}
}

View File

@ -1,7 +1,5 @@
package org.jetlinks.community.rule.engine.alarm;
import org.jetlinks.community.rule.engine.scene.SceneData;
import org.jetlinks.reactor.ql.utils.CastUtils;
import reactor.core.publisher.Flux;
import java.util.Map;
@ -10,7 +8,7 @@ import java.util.Map;
* @author bestfeng
*/
public class DeviceAlarmTarget implements AlarmTarget {
public class DeviceAlarmTarget extends AbstractAlarmTarget {
@Override
public String getType() {
@ -23,10 +21,15 @@ public class DeviceAlarmTarget implements AlarmTarget {
}
@Override
public Flux<AlarmTargetInfo> convert(AlarmData data) {
public Flux<AlarmTargetInfo> doConvert(AlarmData data) {
Map<String, Object> output = data.getOutput();
String deviceId = CastUtils.castString(output.get("deviceId"));
String deviceName = CastUtils.castString(output.getOrDefault("deviceName", deviceId));
String deviceId = AbstractAlarmTarget.getFromOutput("deviceId", output).map(String::valueOf).orElse(null);
String deviceName = AbstractAlarmTarget.getFromOutput("deviceName", output).map(String::valueOf).orElse(deviceId);
if (deviceId == null) {
return Flux.empty();
}
return Flux.just(AlarmTargetInfo.of(deviceId, deviceName, getType()));
}

View File

@ -1,6 +1,5 @@
package org.jetlinks.community.rule.engine.alarm;
import org.jetlinks.community.rule.engine.scene.SceneData;
import reactor.core.publisher.Flux;
/**
@ -22,10 +21,9 @@ public class OtherAlarmTarget implements AlarmTarget {
@Override
public Flux<AlarmTargetInfo> convert(AlarmData data) {
return Flux.just(AlarmTargetInfo
.of(data.getRuleId(),
.of(data.getAlarmConfigId(),
data.getRuleName(),
getType()));
}
}

View File

@ -1,7 +1,5 @@
package org.jetlinks.community.rule.engine.alarm;
import org.jetlinks.reactor.ql.utils.CastUtils;
import reactor.core.publisher.Flux;
import java.util.Map;
@ -10,7 +8,7 @@ import java.util.Map;
* @author bestfeng
*/
public class ProductAlarmTarget implements AlarmTarget {
public class ProductAlarmTarget extends AbstractAlarmTarget {
@Override
public String getType() {
@ -23,11 +21,13 @@ public class ProductAlarmTarget implements AlarmTarget {
}
@Override
public Flux<AlarmTargetInfo> convert(AlarmData data) {
public Flux<AlarmTargetInfo> doConvert(AlarmData data) {
Map<String, Object> output = data.getOutput();
String productId = CastUtils.castString(output.get("productId"));
String productName = CastUtils.castString(output.getOrDefault("productName", productId));
String productId = AbstractAlarmTarget.getFromOutput("productId", output).map(String::valueOf).orElse(null);
String productName = AbstractAlarmTarget.getFromOutput("productName", output).map(String::valueOf).orElse(productId);
return Flux.just(AlarmTargetInfo.of(productId, productName, getType()));
}
}

View File

@ -0,0 +1,109 @@
package org.jetlinks.community.rule.engine.entity;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.Setter;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.community.rule.engine.enums.AlarmState;
import org.jetlinks.community.rule.engine.enums.RuleInstanceState;
import org.jetlinks.community.rule.engine.scene.TriggerType;
import javax.persistence.Column;
import javax.validation.constraints.NotBlank;
import java.util.ArrayList;
import java.util.List;
/**
* 告警配置详情.
*
* @author zhangji 2022/12/13
*/
@Getter
@Setter
public class AlarmConfigDetail {
@Schema(description = "告警配置ID")
private String id;
@Schema(description = "名称")
private String name;
@Schema(description = "告警目标类型")
private String targetType;
@Schema(description = "告警级别")
private Integer level;
@Schema(description = "关联场景")
private List<SceneInfo> scene;
@Schema(description = "状态")
private AlarmState state;
@Schema(description = "场景触发类型")
private TriggerType sceneTriggerType;
@Schema(description = "说明")
private String description;
@Schema(
description = "创建者ID(只读)"
, accessMode = Schema.AccessMode.READ_ONLY
)
private String creatorId;
@Schema(
description = "创建时间(只读)"
, accessMode = Schema.AccessMode.READ_ONLY
)
private Long createTime;
@Schema(description = "更新者ID", accessMode = Schema.AccessMode.READ_ONLY)
private String modifierId;
@Schema(description = "更新时间")
private Long modifyTime;
public static AlarmConfigDetail of(AlarmConfigEntity entity) {
return FastBeanCopier.copy(entity, new AlarmConfigDetail(), "sceneTriggerType");
}
public AlarmConfigDetail withScene(List<SceneEntity> sceneEntityList) {
List<SceneInfo> sceneList = new ArrayList<>();
for (SceneEntity sceneEntity : sceneEntityList) {
sceneList.add(SceneInfo.of(sceneEntity));
TriggerType triggerType = sceneEntity.getTriggerType();
// 存在一个手动触发场景则将告警配置视为手动触发类型
if (this.sceneTriggerType == null || triggerType == TriggerType.manual) {
this.sceneTriggerType = triggerType;
}
}
this.scene = sceneList;
return this;
}
/**
* 场景联动信息
*/
@Getter
@Setter
public static class SceneInfo {
@Schema(description = "场景联动ID")
private String id;
@Column(nullable = false)
@Schema(description = "场景联动名称")
@NotBlank
private String name;
@Schema(description = "触发器类型")
private TriggerType triggerType;
@Schema(description = "状态")
private RuleInstanceState state;
public static SceneInfo of(SceneEntity entity) {
return FastBeanCopier.copy(entity, new SceneInfo());
}
}
}

View File

@ -45,10 +45,12 @@ public class AlarmConfigEntity extends GenericEntity<String> implements RecordCr
@Column(length = 128)
@Schema(description = "关联场景名称")
@Deprecated
private String sceneName;
@Column(length = 64)
@Schema(description = "关联场景Id")
@Deprecated
private String sceneId;
@Column(length = 32, nullable = false)

View File

@ -63,14 +63,11 @@ public class AlarmHandleHistoryEntity extends GenericEntity<String> implements R
)
private Long createTime;
public static AlarmHandleHistoryEntity of(String alarmRecordId,
String alarmConfigId,
Long alarmTime,
AlarmHandleInfo handleInfo) {
public static AlarmHandleHistoryEntity of(AlarmHandleInfo handleInfo) {
AlarmHandleHistoryEntity entity = new AlarmHandleHistoryEntity();
entity.setAlarmId(alarmConfigId);
entity.setAlarmRecordId(alarmRecordId);
entity.setAlarmTime(alarmTime);
entity.setAlarmId(handleInfo.getAlarmConfigId());
entity.setAlarmRecordId(handleInfo.getAlarmRecordId());
entity.setAlarmTime(handleInfo.getAlarmTime());
entity.setHandleType(handleInfo.getType());
entity.setDescription(handleInfo.getDescribe());
entity.setHandleTime(handleInfo.getHandleTime() == null ? System.currentTimeMillis() : handleInfo.getHandleTime());

View File

@ -7,13 +7,15 @@ import lombok.Setter;
import org.jetlinks.community.rule.engine.alarm.AlarmTargetInfo;
import org.jetlinks.community.rule.engine.scene.SceneData;
import java.io.Serializable;
import java.util.*;
@Getter
@Setter
public class AlarmHistoryInfo {
public class AlarmHistoryInfo implements Serializable {
private static final long serialVersionUID = 1L;
@Schema(description = "id")
@Schema(description = "告警数据ID")
private String id;
@Schema(description = "告警配置ID")
@ -26,13 +28,13 @@ public class AlarmHistoryInfo {
private String alarmRecordId;
@Schema(description = "告警级别")
private Integer level;
private int level;
@Schema(description = "说明")
private String description;
@Schema(description = "告警时间")
private Long alarmTime;
private long alarmTime;
@Schema(description = "告警目标类型")
private String targetType;
@ -43,10 +45,23 @@ public class AlarmHistoryInfo {
@Schema(description = "告警目标Id")
private String targetId;
@Schema(description = "告警源类型")
private String sourceType;
@Schema(description = "告警源Id")
private String sourceId;
@Schema(description = "告警源名称")
private String sourceName;
@Schema(description = "告警信息")
private String alarmInfo;
@Schema(description = "绑定信息")
private List<Map<String, Object>> bindings;
@Deprecated
public static AlarmHistoryInfo of(String alarmRecordId,
AlarmTargetInfo targetInfo,
SceneData data,
@ -58,12 +73,43 @@ public class AlarmHistoryInfo {
info.setLevel(alarmConfig.getLevel());
info.setId(data.getId());
info.setAlarmTime(System.currentTimeMillis());
info.setTargetName(targetInfo.getTargetName());
info.setTargetId(targetInfo.getTargetId());
info.setTargetType(targetInfo.getTargetType());
info.setSourceName(targetInfo.getSourceName());
info.setSourceType(targetInfo.getSourceType());
info.setSourceId(targetInfo.getSourceId());
info.setAlarmInfo(JSON.toJSONString(data.getOutput()));
info.setDescription(alarmConfig.getDescription());
info.setBindings(convertBindings(targetInfo, data, alarmConfig));
return info;
}
@SuppressWarnings("all")
@Deprecated
static List<Map<String, Object>> convertBindings(AlarmTargetInfo targetInfo,
SceneData data,
AlarmConfigEntity alarmConfig) {
List<Map<String, Object>> bindings = new ArrayList<>();
bindings.addAll((List) data.getOutput().getOrDefault("_bindings", Collections.emptyList()));
//添加告警配置创建人到bindings中作为用户维度信息
Map<String, Object> userDimension = new HashMap<>(2);
userDimension.put("type", "user");
userDimension.put("id", alarmConfig.getCreatorId());
bindings.add(userDimension);
//添加组织纬度信息
if ("org".equals(alarmConfig.getTargetType())) {
Map<String, Object> orgDimension = new HashMap<>(2);
userDimension.put("type", targetInfo.getTargetType());
userDimension.put("id", targetInfo.getTargetId());
bindings.add(userDimension);
}
return bindings;
}
}

View File

@ -3,12 +3,12 @@ package org.jetlinks.community.rule.engine.entity;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.codec.digest.DigestUtils;
import org.hswebframework.ezorm.rdb.mapping.annotation.ColumnType;
import org.hswebframework.ezorm.rdb.mapping.annotation.Comment;
import org.hswebframework.ezorm.rdb.mapping.annotation.DefaultValue;
import org.hswebframework.ezorm.rdb.mapping.annotation.EnumCodec;
import org.hswebframework.web.api.crud.entity.GenericEntity;
import org.hswebframework.web.utils.DigestUtils;
import org.jetlinks.community.rule.engine.enums.AlarmRecordState;
import javax.persistence.Column;
@ -45,6 +45,19 @@ public class AlarmRecordEntity extends GenericEntity<String> {
@Schema(description = "告警目标名称")
private String targetName;
@Column(length = 32)
@Schema(description = "告警源类型")
private String sourceType;
@Column(length = 64)
@Schema(description = "告警源Id")
private String sourceId;
@Column
@Schema(description = "告警源名称")
private String sourceName;
@Column
@Schema(description = "最近一次告警时间")
private Long alarmTime;
@ -70,12 +83,12 @@ public class AlarmRecordEntity extends GenericEntity<String> {
public String getTargetKey() {
if (targetKey == null) {
generateKey();
generateTargetKey();
}
return targetKey;
}
public void generateKey() {
public void generateTargetKey() {
setTargetKey(generateId(targetId, targetType));
}
@ -89,4 +102,3 @@ public class AlarmRecordEntity extends GenericEntity<String> {
}

View File

@ -5,14 +5,19 @@ import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.Setter;
import org.hswebframework.ezorm.rdb.mapping.annotation.ColumnType;
import org.hswebframework.ezorm.rdb.mapping.annotation.Comment;
import org.hswebframework.ezorm.rdb.mapping.annotation.DefaultValue;
import org.hswebframework.ezorm.rdb.mapping.annotation.EnumCodec;
import org.hswebframework.web.api.crud.entity.GenericEntity;
import org.hswebframework.web.api.crud.entity.RecordCreationEntity;
import org.hswebframework.web.crud.annotation.EnableEntityEvent;
import org.hswebframework.web.crud.generator.Generators;
import org.hswebframework.web.validator.CreateGroup;
import org.jetlinks.community.rule.engine.RuleEngineConstants;
import org.jetlinks.community.rule.engine.enums.RuleInstanceState;
import org.jetlinks.rule.engine.api.model.RuleEngineModelParser;
import org.jetlinks.rule.engine.api.model.RuleModel;
import org.jetlinks.rule.engine.cluster.RuleInstance;
import org.springframework.util.StringUtils;
import javax.persistence.Column;
@ -24,6 +29,8 @@ import java.sql.JDBCType;
@Getter
@Setter
@Table(name = "rule_instance")
@Comment("规则实例表")
@EnableEntityEvent
public class RuleInstanceEntity extends GenericEntity<String> implements RecordCreationEntity {
@Override
@ -51,29 +58,32 @@ public class RuleInstanceEntity extends GenericEntity<String> implements RecordC
private String modelType;
@Column(name = "model_meta")
@ColumnType(jdbcType = JDBCType.CLOB)
@ColumnType(jdbcType = JDBCType.LONGVARCHAR)
@Schema(description = "规则模型配置,不同的类型配置不同.")
private String modelMeta;
@Column(name = "model_version", nullable = false)
@Schema(description = "版本")
@DefaultValue("1")
private Integer modelVersion;
@Column(name = "create_time")
@Schema(description = "创建时间")
@DefaultValue(generator = Generators.CURRENT_TIME)
private Long createTime;
@Column(name = "creator_id")
@Schema(description = "创建者ID")
private String creatorId;
@Column(name = "state",length = 16)
@Column(name = "state", length = 16)
@EnumCodec
@ColumnType(javaType = String.class)
@DefaultValue("stopped")
@DefaultValue("disable")
@Schema(description = "状态")
private RuleInstanceState state;
@Comment("设备详情信息")
@Column(name = "instance_detail_json")
@ColumnType(jdbcType = JDBCType.CLOB)
@Hidden
@ -82,9 +92,17 @@ public class RuleInstanceEntity extends GenericEntity<String> implements RecordC
public RuleModel toRule(RuleEngineModelParser parser) {
RuleModel model = parser.parse(modelType, modelMeta);
model.setId(StringUtils.hasText(modelId)?modelId:getId());
model.setId(StringUtils.hasText(modelId) ? modelId : getId());
model.setName(name);
model.addConfiguration(RuleEngineConstants.ruleCreatorIdKey, creatorId);
return model;
}
public RuleInstance toRuleInstance(RuleEngineModelParser parser) {
RuleModel model = toRule(parser);
RuleInstance instance = new RuleInstance();
instance.setId(getId());
instance.setModel(model);
return instance;
}
}

View File

@ -3,6 +3,7 @@ package org.jetlinks.community.rule.engine.entity;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.collections.CollectionUtils;
import org.hswebframework.ezorm.core.param.Term;
import org.hswebframework.ezorm.rdb.mapping.annotation.ColumnType;
import org.hswebframework.ezorm.rdb.mapping.annotation.DefaultValue;
@ -13,11 +14,11 @@ import org.hswebframework.web.api.crud.entity.RecordCreationEntity;
import org.hswebframework.web.api.crud.entity.RecordModifierEntity;
import org.hswebframework.web.crud.annotation.EnableEntityEvent;
import org.hswebframework.web.crud.generator.Generators;
import org.hswebframework.web.exception.BusinessException;
import org.jetlinks.community.rule.engine.RuleEngineConstants;
import org.jetlinks.community.rule.engine.enums.RuleInstanceState;
import org.jetlinks.community.rule.engine.scene.SceneAction;
import org.jetlinks.community.rule.engine.scene.SceneRule;
import org.jetlinks.community.rule.engine.scene.Trigger;
import org.jetlinks.community.rule.engine.scene.TriggerType;
import org.jetlinks.community.rule.engine.scene.*;
import org.jetlinks.rule.engine.api.model.RuleModel;
import org.jetlinks.rule.engine.cluster.RuleInstance;
import javax.persistence.Column;
@ -26,6 +27,7 @@ import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.sql.JDBCType;
import java.util.List;
import java.util.Map;
@Getter
@Setter
@ -69,6 +71,12 @@ public class SceneEntity extends GenericEntity<String> implements RecordCreation
@Schema(description = "执行动作")
private List<SceneAction> actions;
@Column
@JsonCodec
@ColumnType(javaType = String.class, jdbcType = JDBCType.LONGVARCHAR)
@Schema(description = "动作分支")
private List<SceneConditionAction> branches;
@Column(length = 64, updatable = false)
@Schema(description = "创建人")
private String creatorId;
@ -96,9 +104,15 @@ public class SceneEntity extends GenericEntity<String> implements RecordCreation
@EnumCodec
@ColumnType(javaType = String.class)
@NotBlank
@DefaultValue("started")
@DefaultValue("disable")
private RuleInstanceState state;
@Schema(description = "扩展配置")
@Column(name = "options")
@JsonCodec
@ColumnType(jdbcType = JDBCType.LONGVARCHAR)
private Map<String, Object> options;
@Column
@Schema(description = "说明")
private String description;
@ -108,7 +122,10 @@ public class SceneEntity extends GenericEntity<String> implements RecordCreation
RuleInstance instance = new RuleInstance();
instance.setId(getId());
instance.setModel(rule.toModel());
RuleModel model = rule.toModel();
model.addConfiguration(RuleEngineConstants.ruleCreatorIdKey, modifierId);
model.addConfiguration(RuleEngineConstants.ruleName, getName());
instance.setModel(model);
return instance;
}
@ -117,4 +134,12 @@ public class SceneEntity extends GenericEntity<String> implements RecordCreation
entity.setTriggerType(rule.getTrigger().getType());
return entity;
}
public void validate() {
getTrigger().validate();
if (CollectionUtils.isEmpty(getActions()) && CollectionUtils.isEmpty(getBranches())){
throw new BusinessException("error.scene_action_rule_cannot_be_null");
}
}
}

View File

@ -9,9 +9,8 @@ import org.hswebframework.web.dict.EnumDict;
@AllArgsConstructor
@Dict( "rule-instance-state")
public enum RuleInstanceState implements EnumDict<String> {
disable("已禁用"),
started("已启动"),
stopped("已停止");
started("正常"),
disable("禁用");
private final String text;
@Override

View File

@ -5,6 +5,7 @@ import io.micrometer.core.instrument.MeterRegistry;
import org.jetlinks.community.PropertyConstants;
import org.jetlinks.community.dashboard.supports.StaticMeasurementProvider;
import org.jetlinks.community.micrometer.MeterRegistryManager;
import org.jetlinks.community.rule.engine.alarm.AlarmConstants;
import org.jetlinks.community.rule.engine.entity.AlarmHistoryInfo;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.community.utils.ConverterUtils;
@ -42,10 +43,12 @@ public class AlarmRecordMeasurementProvider extends StaticMeasurementProvider {
public String[] getTags(AlarmHistoryInfo info) {
Map<String, Object> tagMap = Maps.newLinkedHashMap();
tagMap.put("targetId", info.getTargetId());
tagMap.put("targetType", info.getTargetType());
tagMap.put("targetName", info.getTargetName());
tagMap.put("alarmConfigId", info.getAlarmConfigId());
tagMap.put(AlarmConstants.ConfigKey.targetId, info.getTargetId());
tagMap.put(AlarmConstants.ConfigKey.targetType, info.getTargetType());
tagMap.put(AlarmConstants.ConfigKey.targetName, info.getTargetName());
tagMap.put(AlarmConstants.ConfigKey.alarmConfigId, info.getAlarmConfigId());
return ConverterUtils.convertMapToTags(tagMap);
}
}
}

View File

@ -104,22 +104,22 @@ public class DeviceOperation {
List<TermColumn> terms = new ArrayList<>(32);
//服务器时间 // _now
terms.add(TermColumn.of("_now",
resolveI18n("message.scene_term_column_now", "服务器时间"),
DateTimeType.GLOBAL,
resolveI18n("message.scene_term_column_now_desc", "收到设备数据时,服务器的时间.")));
resolveI18n("message.scene_term_column_now", "服务器时间"),
DateTimeType.GLOBAL,
resolveI18n("message.scene_term_column_now_desc", "收到设备数据时,服务器的时间.")));
//数据上报时间 // timestamp
terms.add(TermColumn.of("timestamp",
resolveI18n("message.scene_term_column_timestamp", "数据上报时间"),
DateTimeType.GLOBAL,
resolveI18n("message.scene_term_column_timestamp_desc", "设备上报的数据中指定的时间.")));
resolveI18n("message.scene_term_column_timestamp", "数据上报时间"),
DateTimeType.GLOBAL,
resolveI18n("message.scene_term_column_timestamp_desc", "设备上报的数据中指定的时间.")));
//下发指令操作可以判断结果
if (operator == Operator.readProperty
|| operator == Operator.writeProperty
|| operator == Operator.invokeFunction) {
terms.add(TermColumn.of("success",
resolveI18n("message.scene_term_column_event_success", "执行是否成功"),
BooleanType.GLOBAL));
resolveI18n("message.scene_term_column_event_success", "场景触发是否成功"),
BooleanType.GLOBAL));
}
//属性相关
if (operator == Operator.readProperty
@ -128,31 +128,42 @@ public class DeviceOperation {
terms.addAll(
this.createTerm(
metadata.getProperties(),
(property, column) -> column.setChildren(createTermColumn("properties", property, true))));
(property, column) -> column.setChildren(createTermColumn("properties", property, true, PropertyValueType
.values())),
LocaleUtils.resolveMessage("message.device_metadata_property", "属性"))
);
} else {
//其他操作只能获取属性的上一次的值
terms.addAll(
this.createTerm(
metadata.getProperties(),
(property, column) -> column.setChildren(createTermColumn("properties", property, true, PropertyValueType.last)),
LocaleUtils.resolveMessage("message.device_metadata_property", "属性")));
}
//事件上报
else if (operator == Operator.reportEvent) {
if (operator == Operator.reportEvent) {
terms.addAll(
this.createTerm(
metadata.getEvent(eventId)
.<List<PropertyMetadata>>map(event -> Collections
.singletonList(
of("data",
event.getName(),
event.getType())
event.getName(),
event.getType())
))
.orElse(Collections.emptyList()),
(property, column) -> column.setChildren(createTermColumn("event", property, false))));
}
//调用功能
else if (operator == Operator.invokeFunction) {
if (operator == Operator.invokeFunction) {
terms.addAll(
this.createTerm(
metadata.getFunction(functionId)
.<List<PropertyMetadata>>map(meta -> Collections.singletonList(
of("output",
meta.getName(),
meta.getOutput()))
meta.getName(),
meta.getOutput()))
)
.orElse(Collections.emptyList()),
(property, column) -> column.setChildren(createTermColumn("function", property, false))));
@ -182,7 +193,7 @@ public class DeviceOperation {
return joiner.toString();
}
private List<TermColumn> createTermColumn(String prefix, PropertyMetadata property, boolean last) {
private List<TermColumn> createTermColumn(String prefix, PropertyMetadata property, boolean last, PropertyValueType... valueTypes) {
//对象类型嵌套
if (property.getValueType() instanceof ObjectType) {
ObjectType objType = ((ObjectType) property.getValueType());
@ -191,38 +202,44 @@ public class DeviceOperation {
(prop, column) -> {
String _prefix = prefix == null ? property.getId() : prefix + "." + property.getId();
if (!last && !(prop.getValueType() instanceof ObjectType)) {
TermColumn term = createTermColumn(_prefix, prop, false).get(0);
TermColumn term = createTermColumn(_prefix, prop, false, valueTypes).get(0);
column.setColumn(term.getColumn());
column.setName(term.getName());
} else {
column.setChildren(createTermColumn(_prefix, prop, last));
column.setChildren(createTermColumn(_prefix, prop, last, valueTypes));
}
});
} else {
if (!last) {
return Collections.singletonList(
TermColumn.of(appendColumn(prefix, property.getId()),
property.getName(), property.getValueType())
.withMetrics(property)
property.getName(), property.getValueType())
.withMetrics(property)
.withMetadataTrue()
);
}
return Arrays
.stream(PropertyValueType.values())
.stream(valueTypes)
.map(type -> TermColumn
.of(appendColumn(prefix, property.getId(), type.name()), type.getName(), property.getValueType())
.withMetrics(property))
.withMetrics(property)
.withMetadataTrue()
)
.collect(Collectors.toList());
}
}
private List<TermColumn> createTerm(List<PropertyMetadata> metadataList,
BiConsumer<PropertyMetadata, TermColumn> consumer) {
BiConsumer<PropertyMetadata, TermColumn> consumer,
String... description) {
List<TermColumn> columns = new ArrayList<>(metadataList.size());
for (PropertyMetadata metadata : metadataList) {
TermColumn column = TermColumn.of(metadata);
column.setDescription(String.join("", description));
consumer.accept(metadata, column);
columns.add(column);
columns.add(column.withMetadataTrue());
}
return columns;
}
@ -239,17 +256,17 @@ public class DeviceOperation {
return;
case readProperty:
Assert.notEmpty(readProperties,
"error.scene_rule_trigger_device_operation_read_property_cannot_be_empty");
"error.scene_rule_trigger_device_operation_read_property_cannot_be_empty");
return;
case writeProperty:
Assert.notEmpty(writeProperties,
"error.scene_rule_trigger_device_operation_write_property_cannot_be_empty");
"error.scene_rule_trigger_device_operation_write_property_cannot_be_empty");
return;
case invokeFunction:
Assert.hasText(functionId,
"error.scene_rule_trigger_device_operation_function_id_cannot_be_null");
"error.scene_rule_trigger_device_operation_function_id_cannot_be_null");
Assert.notEmpty(functionParameters,
"error.scene_rule_trigger_device_operation_function_parameter_cannot_be_empty");
"error.scene_rule_trigger_device_operation_function_parameter_cannot_be_empty");
}
}

View File

@ -9,21 +9,22 @@ import org.hswebframework.ezorm.rdb.executor.PrepareSqlRequest;
import org.hswebframework.ezorm.rdb.executor.SqlRequest;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.AbstractTermsFragmentBuilder;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.EmptySqlFragments;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.NativeSql;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments;
import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.i18n.LocaleUtils;
import org.hswebframework.web.validator.ValidatorUtils;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.metadata.DeviceMetadata;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.community.TimerSpec;
import org.jetlinks.community.reactorql.term.TermType;
import org.jetlinks.community.reactorql.term.TermTypeSupport;
import org.jetlinks.community.reactorql.term.TermTypes;
import org.jetlinks.community.rule.engine.executor.DeviceMessageSendTaskExecutorProvider;
import org.jetlinks.community.rule.engine.executor.device.DeviceSelectorSpec;
import org.jetlinks.community.rule.engine.executor.device.SelectorValue;
import org.jetlinks.community.rule.engine.scene.term.TermColumn;
import org.jetlinks.community.rule.engine.scene.term.TermTypeSupport;
import org.jetlinks.community.rule.engine.scene.term.TermTypes;
import org.jetlinks.community.rule.engine.scene.value.TermValue;
import org.jetlinks.reactor.ql.DefaultReactorQLContext;
import org.jetlinks.reactor.ql.ReactorQL;
@ -57,9 +58,6 @@ public class DeviceTrigger extends DeviceSelectorSpec implements Serializable {
@NotNull(message = "error.scene_rule_trigger_device_operation_cannot_be_null")
private DeviceOperation operation;
@Schema(description = "拓展信息")
private Map<String,Object> options;
public SqlRequest createSql(List<Term> terms) {
return createSql(terms, true);
}
@ -86,6 +84,10 @@ public class DeviceTrigger extends DeviceSelectorSpec implements Serializable {
selectColumns.add("this.headers.deviceName \"deviceName\"");
selectColumns.add("this.headers.productId \"productId\"");
selectColumns.add("this.headers.productName \"productName\"");
//触发源信息
selectColumns.add("'device' \"" + SceneRule.SOURCE_TYPE_KEY + "\"");
selectColumns.add("this.deviceId \"" + SceneRule.SOURCE_ID_KEY + "\"");
selectColumns.add("this.deviceName \"" + SceneRule.SOURCE_NAME_KEY + "\"");
//消息唯一ID
selectColumns.add("this.headers._uid \"_uid\"");
//维度绑定信息,如部门等
@ -255,45 +257,95 @@ public class DeviceTrigger extends DeviceSelectorSpec implements Serializable {
@Override
protected SqlFragments createTermFragments(DeviceTrigger trigger, Term term) {
if (!StringUtils.hasText(term.getColumn())) {
return EmptySqlFragments.INSTANCE;
}
String termType = StringUtils.hasText(term.getTermType()) ? term.getTermType() : "is";
TermTypeSupport support = TermTypes
.lookupSupport(termType)
.orElseThrow(() -> new UnsupportedOperationException("unsupported termType " + termType));
String[] arr = term.getColumn().split("[.]");
Term copy = refactorTermValue("t", term.clone());
String column;
if (arr.length > 3 && arr[0].equals("properties")) {
column = "t['" + createColumnAlias(term.getColumn(), false) + "." + String.join(".", Arrays.copyOfRange(arr, 2, arr.length - 1)) + "']";
} else {
column = "t['" + createColumnAlias(term.getColumn(), false) + "']";
}
List<TermValue> values = TermValue.of(term);
if (values.size() == 0) {
return support.createSql(column, null);
}
Object val;
Function<TermValue, Object> parser = value -> {
if (value.getSource() == TermValue.Source.manual) {
return value.getValue();
} else {
return NativeSql.of("t." + arr[1] + "_metric_" + value.getMetric());
}
};
if (values.size() == 1) {
val = parser.apply(values.get(0));
} else {
val = values
.stream()
.map(parser)
.collect(Collectors.toList());
}
return support.createSql(column, val);
return support.createSql(copy.getColumn(), copy.getValue(), term);
}
}
static String createTermColumn(String tableName, String column) {
String[] arr = column.split("[.]");
// properties.xxx.last的场景
if (arr.length > 3 && arr[0].equals("properties")) {
column = tableName + "['" + createColumnAlias(column, false) + "." + String.join(".", Arrays.copyOfRange(arr, 2, arr.length - 1)) + "']";
} else {
column = tableName + "['" + createColumnAlias(column, false) + "']";
}
return column;
}
static Term refactorTermValue(String tableName, Term term) {
if (term.getColumn() == null) {
return term;
}
String[] arr = term.getColumn().split("[.]");
List<TermValue> values = TermValue.of(term);
if (values.size() == 0) {
return term;
}
Function<TermValue, Object> parser = value -> {
//上游变量
if (value.getSource() == TermValue.Source.variable
|| value.getSource() == TermValue.Source.upper) {
term.getOptions().add(TermType.OPTIONS_NATIVE_SQL);
return tableName + "['" + value.getValue() + "']";
}
//指标
else if (value.getSource() == TermValue.Source.metric) {
term.getOptions().add(TermType.OPTIONS_NATIVE_SQL);
return tableName + "['" + arr[1] + "_metric_" + value.getMetric() + "']";
}
//手动设置值
else {
return value.getValue();
}
};
Object val;
if (values.size() == 1) {
val = parser.apply(values.get(0));
} else {
val = values
.stream()
.map(parser)
.collect(Collectors.toList());
}
if (!term.getOptions().contains(TermType.OPTIONS_NATIVE_SQL)) {
String column;
// properties.xxx.last的场景
if (arr.length > 3 && arr[0].equals("properties")) {
column = tableName + "['" + createColumnAlias(term.getColumn(), false) + "." + String.join(".", Arrays.copyOfRange(arr, 2, arr.length - 1)) + "']";
} else if (!isBranchTerm(arr[0])) {
column = tableName + "['" + createColumnAlias(term.getColumn(), false) + "']";
} else {
column = term.getColumn();
}
term.setColumn(column);
}
term.setValue(val);
return term;
}
private static boolean isBranchTerm(String column) {
return column.startsWith("branch_") &&
column.contains("_group_")
&& column.contains("_action_");
}
static String parseProperty(String column) {
String[] arr = column.split("[.]");
@ -330,7 +382,7 @@ public class DeviceTrigger extends DeviceSelectorSpec implements Serializable {
static String createColumnAlias(String column, boolean wrapColumn) {
if (!column.contains(".")) {
return wrapColumnName(column);
return wrapColumn ? wrapColumnName(column) : column;
}
String[] arr = column.split("[.]");
String alias;
@ -361,10 +413,19 @@ public class DeviceTrigger extends DeviceSelectorSpec implements Serializable {
public List<Variable> createDefaultVariable() {
return Arrays.asList(
Variable.of("deviceId", "设备ID").withOption(Variable.OPTION_PRODUCT_ID,productId),
Variable.of("deviceName", "设备名称"),
Variable.of("productId", "产品ID"),
Variable.of("deviceId", "设备ID")
.withOption(Variable.OPTION_PRODUCT_ID, productId)
.withTermType(TermTypes.lookup(StringType.GLOBAL))
.withColumn("deviceId"),
Variable.of("deviceName", "设备名称")
.withTermType(TermTypes.lookup(StringType.GLOBAL))
.withColumn("deviceName"),
Variable.of("productId", "产品ID")
.withTermType(TermTypes.lookup(StringType.GLOBAL))
.withColumn("productId"),
Variable.of("productName", "产品名称")
.withTermType(TermTypes.lookup(StringType.GLOBAL))
.withColumn("productName")
);
}
@ -440,4 +501,4 @@ public class DeviceTrigger extends DeviceSelectorSpec implements Serializable {
}
}
}

View File

@ -1,33 +1,36 @@
package org.jetlinks.community.rule.engine.scene;
import com.google.common.collect.Lists;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.commons.collections4.MapUtils;
import org.hswebframework.ezorm.core.param.Term;
import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.i18n.LocaleUtils;
import org.jetlinks.community.rule.engine.alarm.AlarmTaskExecutorProvider;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.MessageType;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionParameter;
import org.jetlinks.core.message.property.ReadPropertyMessage;
import org.jetlinks.core.message.property.WritePropertyMessage;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.DeviceMetadata;
import org.jetlinks.core.metadata.FunctionMetadata;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.types.BooleanType;
import org.jetlinks.core.metadata.types.IntType;
import org.jetlinks.core.metadata.types.ObjectType;
import org.jetlinks.core.metadata.types.*;
import org.jetlinks.community.reactorql.term.TermTypes;
import org.jetlinks.community.relation.utils.VariableSource;
import org.jetlinks.community.rule.engine.alarm.AlarmConstants;
import org.jetlinks.community.rule.engine.alarm.AlarmTaskExecutorProvider;
import org.jetlinks.community.rule.engine.executor.DelayTaskExecutorProvider;
import org.jetlinks.community.rule.engine.executor.DeviceMessageSendTaskExecutorProvider;
import org.jetlinks.community.rule.engine.executor.device.DeviceSelectorProviders;
import org.jetlinks.community.rule.engine.executor.device.DeviceSelectorSpec;
import org.jetlinks.community.rule.engine.scene.term.TermTypes;
import org.jetlinks.community.utils.ConverterUtils;
import org.jetlinks.rule.engine.api.model.RuleNodeModel;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -38,8 +41,10 @@ import java.io.Serializable;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static org.hswebframework.web.i18n.LocaleUtils.*;
import static org.hswebframework.web.i18n.LocaleUtils.resolveMessage;
import static org.jetlinks.community.rule.engine.scene.SceneRule.createBranchActionId;
/**
* @see org.jetlinks.community.rule.engine.executor.TimerTaskExecutorProvider
@ -70,48 +75,107 @@ public class SceneAction implements Serializable {
private List<Term> terms;
@Schema(description = "拓展信息")
private Map<String,Object> options;
private Map<String, Object> options;
public Flux<Variable> createVariables(DeviceRegistry registry, Integer branchIndex, int index) {
/**
* 从拓展信息中获取需要查询的列,用于在设备触发等场景需要在sql中获取对应的数据.
*
* @param options 拓展信息
* @return terms
*/
private static List<String> parseColumnFromOptions(Map<String, Object> options) {
Object columns;
if (MapUtils.isEmpty(options) || (columns = options.get("columns")) == null) {
return Collections.emptyList();
}
//获取前端设置的columns
return ConverterUtils.convertToList(columns,String::valueOf);
}
/**
* 尝试从动作的变量中提取出需要动态获取的列信息
* @return 条件
*/
private List<String> parseActionTerms() {
if (executor == Executor.device && device != null) {
return device.parseColumns();
}
if (executor == Executor.notify && notify != null) {
return notify.parseColumns();
}
return Collections.emptyList();
}
public List<String> createContextColumns() {
List<String> termList = new ArrayList<>();
termList.addAll(parseColumnFromOptions(options));
termList.addAll(parseActionTerms());
return termList;
}
public Flux<Variable> createVariables(DeviceRegistry registry, Integer branchIndex, Integer group, int index) {
//设备
if (executor == Executor.device && device != null) {
return device
.getDeviceMetadata(registry, device.productId)
.map(metadata -> createVariable(branchIndex, index, device.createVariables(metadata)))
.map(metadata -> createVariable(branchIndex, group, index, device.createVariables(metadata)))
.flux()
.as(LocaleUtils::transform);
}
if (executor == Executor.alarm && alarm != null) {
return Mono
.fromSupplier(() -> createVariable(branchIndex, index, alarm.createVariables()))
.fromSupplier(() -> createVariable(branchIndex, group, index, alarm.createVariables()))
.flux()
.as(LocaleUtils::transform);
}
return Flux.empty();
}
private Variable createVariable(Integer branchIndex, int actionIndex, List<Variable> children) {
int humanIndex = actionIndex + 1;
private Variable createVariable(Integer branchIndex, Integer group, int actionIndex, List<Variable> children) {
String varId = "action_" + humanIndex;
String varId = "action_" + actionIndex;
if (branchIndex != null) {
varId = "branch_" + branchIndex + "_" + varId;
varId = createBranchActionId(branchIndex, group, actionIndex);
}
String message = resolveMessage(
String name = resolveMessage(
"message.action_var_index",
String.format("动作[%s]", humanIndex),
humanIndex
String.format("动作[%s]", actionIndex),
actionIndex
);
String fullName = resolveMessage(
"message.action_var_index_full",
String.format("动作[%s]输出", actionIndex),
actionIndex
);
Variable variable = Variable.of(varId, message);
String description = resolveMessage(
"message.action_var_output_description",
String.format("动作[%s]执行的输出结果", actionIndex),
actionIndex
);
Variable variable = Variable.of(varId, name);
variable.setFullName(fullName);
variable.setDescription(description);
variable.setChildren(children);
return variable;
}
private String getActionDescription() {
if (executor == null) {
return null;
}
return LocaleUtils.resolveMessage("message.scene_action_" + executor.name(), "");
}
public static SceneAction notify(String notifyType,
String notifierId,
String templateId,
@ -219,7 +283,9 @@ public class SceneAction implements Serializable {
"message.action_execute_success",
"执行是否成功"
))
.withType(BooleanType.ID));
.withType(BooleanType.ID)
.withTermType(TermTypes.lookup(BooleanType.GLOBAL))
);
//设备ID
variables.add(Variable
@ -231,6 +297,7 @@ public class SceneAction implements Serializable {
.withType(BooleanType.ID)
//标识变量属于哪个产品
.withOption(Variable.OPTION_PRODUCT_ID, productId)
.withTermType(TermTypes.lookup(StringType.GLOBAL))
);
if (message instanceof ReadPropertyMessage) {
@ -273,13 +340,36 @@ public class SceneAction implements Serializable {
return variables;
}
public List<String> parseColumns() {
if (MapUtils.isEmpty(message)) {
return Collections.emptyList();
}
DeviceMessage msg = (DeviceMessage) MessageType.convertMessage(message).orElse(null);
Collection<Object> readyToParse;
if (msg instanceof WritePropertyMessage) {
readyToParse = ((WritePropertyMessage) msg).getProperties().values();
} else if (msg instanceof FunctionInvokeMessage) {
readyToParse = Lists.transform(((FunctionInvokeMessage) msg).getInputs(), FunctionParameter::getValue);
} else {
return Collections.emptyList();
}
return readyToParse
.stream()
.flatMap(val -> parseColumnFromOptions(VariableSource.of(val).getOptions()).stream())
.collect(Collectors.toList());
}
}
private static Variable toVariable(String prefix,
PropertyMetadata metadata,
String i18nKey,
String msgPattern) {
return toVariable(prefix + metadata.getId(),
return toVariable(prefix.concat(".").concat(metadata.getId()),
metadata.getName(),
metadata.getValueType(),
i18nKey,
@ -300,6 +390,7 @@ public class SceneAction implements Serializable {
fullName));
variable.setType(dataType.getType());
variable.setTermTypes(TermTypes.lookup(dataType));
variable.setColumn(id);
if (dataType instanceof ObjectType) {
List<Variable> children = new ArrayList<>();
for (PropertyMetadata property : ((ObjectType) dataType).getProperties()) {
@ -352,6 +443,17 @@ public class SceneAction implements Serializable {
@Schema(description = "通知变量")
@NotBlank(message = "error.scene_rule_actions_notify_variables_cannot_be_blank")
private Map<String, Object> variables;
public List<String> parseColumns() {
if (MapUtils.isEmpty(variables)) {
return Collections.emptyList();
}
return variables
.values()
.stream()
.flatMap(val -> parseColumnFromOptions(VariableSource.of(val).getOptions()).stream())
.collect(Collectors.toList());
}
}
@ -367,22 +469,46 @@ public class SceneAction implements Serializable {
List<Variable> variables = new ArrayList<>();
variables.add(
Variable.of("alarmName",
Variable.of(AlarmConstants.ConfigKey.alarmName,
LocaleUtils.resolveMessage("message.alarm_config_name", "告警配置名称"))
.withType(StringType.GLOBAL)
);
variables.add(
Variable.of("level",
Variable.of(AlarmConstants.ConfigKey.level,
LocaleUtils.resolveMessage("message.alarm_level", "告警级别"))
.withType(IntType.ID)
.withType(IntType.GLOBAL)
);
// variables.add(
// Variable.of(AlarmConstants.ConfigKey.alarming,
// LocaleUtils.resolveMessage("message.is_alarming", "是否重复告警"))
// .withDescription(LocaleUtils.resolveMessage("message.is_alarming_description", "是否已存在告警中的记录"))
// .withType(BooleanType.GLOBAL)
// );
variables.add(
Variable.of(AlarmConstants.ConfigKey.firstAlarm,
LocaleUtils.resolveMessage("message.first_alarm", "是否首次告警"))
.withDescription(LocaleUtils.resolveMessage("message.first_alarm_description", "是否为首次告警或者解除后的第一次告警"))
.withType(BooleanType.GLOBAL)
);
variables.add(
Variable.of("alarming",
LocaleUtils.resolveMessage("message.is_alarming", "是否正在告警"))
.withType(BooleanType.ID)
Variable.of(AlarmConstants.ConfigKey.alarmTime,
LocaleUtils.resolveMessage("message.alarm_time", "首次告警时间"))
.withDescription(LocaleUtils.resolveMessage("message.alarm_time_description", "首次告警或者解除告警后的第一次告警时间"))
.withType(DateTimeType.GLOBAL)
);
variables.add(
Variable.of(AlarmConstants.ConfigKey.lastAlarmTime,
LocaleUtils.resolveMessage("message.last_alarm_time", "上一次告警时间"))
.withDescription(LocaleUtils.resolveMessage("message.last_alarm_time_description", "上一次触发告警的时间"))
.withType(DateTimeType.GLOBAL)
);
return variables;
}
}
@ -405,4 +531,4 @@ public class SceneAction implements Serializable {
alarm
}
}
}

View File

@ -3,8 +3,10 @@ package org.jetlinks.community.rule.engine.scene;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.collections4.CollectionUtils;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@Getter
@ -18,4 +20,16 @@ public class SceneActions implements Serializable {
private List<SceneAction> actions;
//仅用于设置到reactQl sql的column中
public List<String> createContextColumns(){
List<String> contextTerm = new ArrayList<>();
if (CollectionUtils.isNotEmpty(actions)){
for (SceneAction action : actions) {
contextTerm.addAll(action.createContextColumns());
}
}
return contextTerm;
}
}

View File

@ -3,11 +3,14 @@ package org.jetlinks.community.rule.engine.scene;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.collections4.CollectionUtils;
import org.hswebframework.ezorm.core.param.Term;
import org.jetlinks.community.rule.engine.commons.ShakeLimit;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@Getter
@Setter
@ -15,7 +18,7 @@ public class SceneConditionAction implements Serializable {
/**
* @see org.jetlinks.community.rule.engine.scene.term.TermColumn
* @see org.jetlinks.community.rule.engine.scene.term.TermType
* @see org.jetlinks.community.reactorql.term.TermType
* @see org.jetlinks.community.rule.engine.scene.value.TermValue
*/
@Schema(description = "条件")
@ -27,4 +30,26 @@ public class SceneConditionAction implements Serializable {
@Schema(description = "满足条件时执行的动作")
private List<SceneActions> then;
//仅用于设置到reactQl sql的column中
public List<Term> createContextTerm() {
List<Term> contextTerm = new ArrayList<>();
if (CollectionUtils.isNotEmpty(then)) {
for (SceneActions sceneActions : then) {
contextTerm.addAll(sceneActions
.createContextColumns()
.stream()
.map(column -> {
Term term = new Term();
term.setColumn(column);
return term;
})
.collect(Collectors.toList()));
}
}
if (CollectionUtils.isNotEmpty(when)) {
contextTerm.addAll(when);
}
return contextTerm;
}
}

View File

@ -14,6 +14,7 @@ import org.hswebframework.web.validator.ValidatorUtils;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.metadata.types.DateTimeType;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.community.reactorql.term.TermTypes;
import org.jetlinks.community.rule.engine.commons.ShakeLimit;
import org.jetlinks.community.rule.engine.commons.TermsConditionEvaluator;
import org.jetlinks.community.rule.engine.scene.term.TermColumn;
@ -45,6 +46,13 @@ public class SceneRule implements Serializable {
public static final String ACTION_KEY_GROUP_INDEX = "_groupIndex";
public static final String ACTION_KEY_ACTION_INDEX = "_actionIndex";
public static final String CONTEXT_KEY_SCENE_OUTPUT = "scene";
public static final String SOURCE_TYPE_KEY = "sourceType";
public static final String SOURCE_ID_KEY = "sourceId";
public static final String SOURCE_NAME_KEY = "sourceName";
@Schema(description = "告警ID")
@NotBlank(message = "error.scene_rule_id_cannot_be_blank")
private String id;
@ -58,9 +66,9 @@ public class SceneRule implements Serializable {
private Trigger trigger;
/**
* @see org.jetlinks.community.rule.engine.scene.term.TermColumn
* @see org.jetlinks.community.rule.engine.scene.term.TermType
* @see org.jetlinks.community.rule.engine.scene.value.TermValue
* @see org.jetlinks.pro.rule.engine.scene.term.TermColumn
* @see org.jetlinks.pro.reactorql.term.TermType
* @see org.jetlinks.pro.rule.engine.scene.value.TermValue
*/
@Schema(description = "触发条件")
private List<Term> terms;
@ -82,9 +90,17 @@ public class SceneRule implements Serializable {
public SqlRequest createSql(boolean hasWhere) {
if (trigger != null && trigger.getType() == TriggerType.device) {
List<Term> terms = new ArrayList<>();
if (CollectionUtils.isNotEmpty(this.terms)) {
terms.addAll(this.terms);
}
if (CollectionUtils.isNotEmpty(this.branches)) {
for (SceneConditionAction branch : branches) {
terms.addAll(branch.createContextTerm());
}
}
return trigger.getDevice().createSql(terms, hasWhere);
}
return EmptySqlRequest.INSTANCE;
}
@ -120,7 +136,7 @@ public class SceneRule implements Serializable {
Variable variable = Variable
.of("scene", LocaleUtils.resolveMessage(
"message.scene_trigger_" + trigger.getType().name() + "_output",
trigger.getType().getText() + "输出"
trigger.getType().getText() + "输出的数据"
));
List<Variable> defaultVariables = createDefaultVariable();
@ -128,19 +144,25 @@ public class SceneRule implements Serializable {
List<Variable> variables = new ArrayList<>(defaultVariables.size() + termVar.size());
//设备触发但是没有指定条件,或者其它触发类型,以下是内置的输出参数
if (trigger.getType() != TriggerType.device || CollectionUtils.isEmpty(termVar)) {
if (trigger.getType() != TriggerType.device) {
variables.add(Variable
.of("_now",
LocaleUtils.resolveMessage(
"message.scene_term_column_now",
"服务器时间"))
.withType(DateTimeType.ID));
variables.add(Variable
.of("timestamp",
LocaleUtils.resolveMessage(
"message.scene_term_column_timestamp",
"数据上报时间"))
.withType(DateTimeType.ID));
.withType(DateTimeType.ID)
.withTermType(TermTypes.lookup(DateTimeType.GLOBAL))
.withColumn("_now")
);
// variables.add(Variable
// .of("timestamp",
// LocaleUtils.resolveMessage(
// "message.scene_term_column_timestamp",
// "数据上报时间"))
// .withType(DateTimeType.ID)
// .withTermType(TermTypes.lookup(DateTimeType.GLOBAL))
// .withColumn("timestamp")
// );
}
variables.addAll(defaultVariables);
@ -163,7 +185,9 @@ public class SceneRule implements Serializable {
if (branchIndex == null && !parallel && actionIndex != null && CollectionUtils.isNotEmpty(actions)) {
for (int i = 0; i < Math.min(actions.size(), actionIndex + 1); i++) {
variables = variables.concatWith(actions.get(i).createVariables(registry, branchIndex, i));
variables = variables.concatWith(actions
.get(i)
.createVariables(registry, null, branchGroupIndex, i + 1));
}
}
//分支条件
@ -176,7 +200,9 @@ public class SceneRule implements Serializable {
CollectionUtils.isNotEmpty(actionList = then.getActions())) {
for (int i = 0; i < Math.min(actionList.size(), actionIndex + 1); i++) {
variables = variables.concatWith(actionList.get(i).createVariables(registry, branchIndex, i));
variables = variables.concatWith(actionList
.get(i)
.createVariables(registry, branchIndex + 1, branchGroupIndex + 1, i + 1));
}
}
@ -186,7 +212,7 @@ public class SceneRule implements Serializable {
.doOnNext(Variable::refactorPrefix);
}
private String createBranchActionId(int branchIndex, int groupId, int actionIndex) {
static String createBranchActionId(int branchIndex, int groupId, int actionIndex) {
return "branch_" + branchIndex + "_group_" + groupId + "_action_" + actionIndex;
}
@ -339,7 +365,7 @@ public class SceneRule implements Serializable {
//传递数据到下级节点
sceneNode.addConfiguration(AbstractExecutionContext.RECORD_DATA_TO_HEADER, true);
sceneNode.addConfiguration(AbstractExecutionContext.RECORD_DATA_TO_HEADER_KEY, "scene");
sceneNode.addConfiguration(AbstractExecutionContext.RECORD_DATA_TO_HEADER_KEY, CONTEXT_KEY_SCENE_OUTPUT);
//触发器
trigger.applyModel(model, sceneNode);
@ -372,7 +398,7 @@ public class SceneRule implements Serializable {
RuleLink link = model.link(preNode, actionNode);
//设置上一个节点到此节点的输出条件
if (CollectionUtils.isNotEmpty(preAction.getTerms())) {
link.setCondition(TermsConditionEvaluator.createCondition(preAction.getTerms()));
link.setCondition(TermsConditionEvaluator.createCondition(trigger.refactorTerm("this", preAction.getTerms())));
}
preNode = actionNode;
}
@ -419,11 +445,17 @@ public class SceneRule implements Serializable {
RuleLink link = model.link(preNode, actionNode);
//设置上一个节点到此节点的输出条件
if (CollectionUtils.isNotEmpty(preAction.getTerms())) {
link.setCondition(TermsConditionEvaluator.createCondition(preAction.getTerms()));
link.setCondition(TermsConditionEvaluator.createCondition(trigger.refactorTerm("this", preAction.getTerms())));
}
} else if (trigger.getType() == TriggerType.manual) {
model.link(sceneNode, actionNode);
}
preNode = actionNode;
} else {
if (trigger.getType() == TriggerType.manual) {
model.link(sceneNode, actionNode);
}
}
model.getNodes().add(actionNode);

View File

@ -11,6 +11,7 @@ import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.trace.TraceHolder;
import org.jetlinks.core.utils.FluxUtils;
import org.jetlinks.community.PropertyConstants;
import org.jetlinks.community.rule.engine.RuleEngineConstants;
import org.jetlinks.community.rule.engine.scene.term.limit.ShakeLimitGrouping;
import org.jetlinks.reactor.ql.ReactorQL;
import org.jetlinks.reactor.ql.ReactorQLContext;
@ -26,6 +27,7 @@ import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Map;
import java.util.function.Consumer;
@Slf4j
@AllArgsConstructor
@ -88,6 +90,20 @@ public class SceneTaskExecutorProvider implements TaskExecutorProvider {
this.rule = sceneRule;
}
private Object getDataId(Map<String, Object> data) {
Object id;
Object header = data.get("headers");
if (header instanceof Map) {
id = ((Map<?, ?>) header).get(PropertyConstants.uid.getKey());
} else {
id = data.get(PropertyConstants.uid.getKey());
}
if (null == id) {
id = IDGenerator.RANDOM.generate();
}
return id;
}
private ReactorQLContext createReactorQLContext() {
return ReactorQLContext
.ofDatasource(table -> {
@ -96,13 +112,7 @@ public class SceneTaskExecutorProvider implements TaskExecutorProvider {
return this
.subscribe(table)
//有效期去重,同一个设备在多个部门的场景下,可能收到2条相同的数据问题
.as(FluxUtils.distinct(map -> {
Object id = map.get(PropertyConstants.uid.getKey());
if (null == id) {
id = IDGenerator.SNOW_FLAKE_STRING.generate();
}
return id;
}, Duration.ofSeconds(1)));
.as(FluxUtils.distinct(this::getDataId, Duration.ofSeconds(1)));
} else {
//来自上游(定时等)
return context
@ -152,7 +162,7 @@ public class SceneTaskExecutorProvider implements TaskExecutorProvider {
return rule
.createBranchHandler(
source,
(idx,nodeId, data) -> {
(idx, nodeId, data) -> {
if (log.isDebugEnabled()) {
log.debug("scene [{}] branch [{}] execute", rule.getId(), nodeId);
}
@ -193,25 +203,17 @@ public class SceneTaskExecutorProvider implements TaskExecutorProvider {
.topics(topic)
.subscriberId("scene:" + rule.getId())
.build())
.<Map<String, Object>>handle((topicPayload, synchronousSink) -> {
.handle((topicPayload, synchronousSink) -> {
try {
synchronousSink.next(topicPayload.bodyToJson(true));
} catch (Throwable err) {
log.warn("decode payload error {}", topicPayload.getTopic(), err);
}
})
//有效期去重,同一个设备在多个部门的场景下,可能收到2条相同的数据问题
.as(FluxUtils.distinct(map -> {
Object id = map.get(PropertyConstants.uid.getKey());
if (null == id) {
id = IDGenerator.SNOW_FLAKE_STRING.generate();
}
return id;
}, Duration.ofSeconds(5)));
});
}
private Mono<Void> handleOutput(RuleData data) {
private Mono<Void> handleOutput(RuleData data) {
return data
.dataToMap()
.filterWhen(map -> {
@ -242,6 +244,19 @@ public class SceneTaskExecutorProvider implements TaskExecutorProvider {
@Override
public Mono<Void> execute(RuleData ruleData) {
//分支
if (CollectionUtils.isNotEmpty(rule.getBranches())) {
if (log.isDebugEnabled()) {
log.debug("scene [{}] execute", rule.getId());
}
RuleData newData = context.newRuleData(ruleData);
return context
.getOutput()
.write(newData)
.onErrorResume(err -> context.onError(err, ruleData))
.as(tracer())
.then();
}
return handleOutput(ruleData);
}
}

View File

@ -25,9 +25,6 @@ public class SceneUtils {
*/
public static List<Variable> parseVariable(List<Term> terms,
List<TermColumn> columns) {
// if (CollectionUtils.isEmpty(terms)) {
// return Collections.emptyList();
// }
//平铺条件
Map<String, Term> termCache = expandTerm(terms);
@ -64,15 +61,14 @@ public class SceneUtils {
TermColumn column,
Function<String, Term> termSupplier) {
List<Variable> variables = new ArrayList<>(1);
String variableName = prefixName == null ? column.getName() : prefixName + "_" + column.getName();
String variableName = column.getName(); //prefixName == null ? column.getName() : prefixName + "/" + column.getName();
if (CollectionUtils.isEmpty(column.getChildren())) {
Term term = termSupplier.apply(column.getColumn());
variables.add(Variable.of(column.getVariable("_"), variableName)
.with(column)
);
if (term != null) {
//有条件的数据会有别名 以_分隔
variables.add(Variable
.of(column.getVariable("_"), variableName)
.withType(column.getDataType()));
List<TermValue> termValues = TermValue.of(term);
String property = column.getPropertyOrNull();
for (TermValue termValue : termValues) {
@ -80,18 +76,25 @@ public class SceneUtils {
if (property != null && metric != null && termValue.getSource() == TermValue.Source.metric) {
// temp_metric
variables.add(Variable.of(
property + "_metric_" + termValue.getMetric(),
(prefixName == null ? column.getName() : prefixName) + "_指标_" + metric.getName()));
property + "_metric_" + termValue.getMetric(),
(prefixName == null ? column.getName() : prefixName) + "_指标_" + metric.getName())
.withTermType(column.getTermTypes())
.withColumn(column.getColumn())
.withMetadata(column.isMetadata())
);
}
}
} else {
//没有条件,没有别名
variables.add(Variable.of(column.getVariable("."), variableName));
}
} else {
Variable variable = Variable.of(column.getColumn(), column.getName());
List<Variable> children = new ArrayList<>();
variable.setChildren(children);
variable.with(column);
variables.add(variable);
for (TermColumn child : column.getChildren()) {
variables.addAll(columnToVariable(variableName, child, termSupplier));
children.addAll(columnToVariable(column.getName(), child, termSupplier));
}
}
return variables;

View File

@ -4,6 +4,7 @@ import io.swagger.v3.oas.annotations.Hidden;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.Setter;
import org.hswebframework.ezorm.core.param.Term;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.community.TimerSpec;
import org.jetlinks.community.rule.engine.commons.ShakeLimit;
@ -11,13 +12,11 @@ import org.jetlinks.community.rule.engine.scene.term.limit.ShakeLimitGrouping;
import org.jetlinks.rule.engine.api.model.RuleModel;
import org.jetlinks.rule.engine.api.model.RuleNodeModel;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import javax.validation.constraints.NotNull;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
@Getter
@Setter
@ -37,6 +36,29 @@ public class Trigger implements Serializable {
@Schema(description = "[type]为[timer]时不能为空")
private TimerSpec timer;
/**
* 重构查询条件,替换为实际将要输出的变量.
*
* @param terms 条件
* @return 重构后的条件
* @see DeviceTrigger#refactorTermValue(String,Term)
*/
public List<Term> refactorTerm(String tableName,List<Term> terms) {
if (CollectionUtils.isEmpty(terms)) {
return terms;
}
List<Term> target = new ArrayList<>(terms.size());
for (Term term : terms) {
Term copy = term.clone();
target.add(DeviceTrigger.refactorTermValue(tableName,copy));
if (org.apache.commons.collections4.CollectionUtils.isNotEmpty(copy.getTerms())) {
copy.setTerms(refactorTerm(tableName,copy.getTerms()));
}
}
return target;
}
public void validate() {
Assert.notNull(type, "error.scene_rule_trigger_cannot_be_null");
if (type == TriggerType.device) {

View File

@ -5,8 +5,11 @@ import lombok.Getter;
import lombok.Setter;
import org.apache.commons.collections4.CollectionUtils;
import org.hswebframework.ezorm.core.param.Term;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.community.rule.engine.scene.term.TermType;
import org.jetlinks.community.reactorql.term.TermType;
import org.jetlinks.community.reactorql.term.TermTypes;
import org.jetlinks.community.rule.engine.scene.term.TermColumn;
import org.springframework.util.StringUtils;
import java.util.HashMap;
@ -23,6 +26,12 @@ public class Variable {
@Schema(description = "变量名")
private String name;
@Schema(description = "变量全名")
private String fullName;
@Schema(description = "")
private String column;
@Schema(description = "说明")
private String description;
@ -38,9 +47,26 @@ public class Variable {
@Schema(description = "子级变量")
private List<Variable> children;
@Schema(description = "是否为物模型变量")
private boolean metadata;
@Schema(description = "其他配置")
private Map<String, Object> options;
public String getFullName() {
return fullName == null ? name : fullName;
}
public Variable withDescription(String description) {
this.description = description;
return this;
}
public Variable withMetadata(boolean metadata) {
this.metadata = metadata;
return this;
}
public synchronized Map<String, Object> safeOptions() {
return options == null ? options = new HashMap<>() : options;
}
@ -60,18 +86,58 @@ public class Variable {
return this;
}
public Variable withType(DataType type) {
withType(type.getId())
.withTermType(TermTypes.lookup(type));
return this;
}
public Variable withTermType(List<TermType> termTypes) {
this.termTypes = termTypes;
return this;
}
public Variable withColumn(String column) {
this.column = column;
return this;
}
public String getColumn() {
if (StringUtils.hasText(column)) {
return column;
}
return id;
}
public Variable with(TermColumn column) {
this.name = column.getName();
this.column = column.getColumn();
this.metadata = column.isMetadata();
this.description = column.getDescription();
this.fullName = column.getFullName();
this.type = column.getDataType();
this.termTypes = column.getTermTypes();
return this;
}
public void refactorPrefix() {
refactorPrefix(this);
}
public void refactorPrefix(Variable main) {
if (CollectionUtils.isNotEmpty(children)) {
for (Variable child : children) {
if (!child.getId().startsWith(id + ".")) {
child.setId(id + "." + child.getId());
if (!child.getId().startsWith(main.id + ".")) {
child.setId(main.id + "." + child.getId());
}
if (StringUtils.hasText(child.description)
&& StringUtils.hasText(description)) {
child.setDescription(description + "/" + child.description);
if (StringUtils.hasText(child.getFullName())
&& StringUtils.hasText(main.getFullName())) {
child.setFullName(main.getFullName() + "/" + child.getFullName());
}
child.refactorPrefix();
child.refactorPrefix(main);
}
}
}

View File

@ -13,6 +13,11 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
/**
* @deprecated
* @see org.jetlinks.community.reactorql.term.FixedTermTypeSupport
*/
@Deprecated
@Getter
public enum FixedTermTypeSupport implements TermTypeSupport {

View File

@ -14,6 +14,8 @@ import org.jetlinks.community.PropertyMetadataConstants;
import org.jetlinks.community.PropertyMetric;
import org.jetlinks.community.rule.engine.scene.DeviceOperation;
import org.springframework.util.StringUtils;
import org.jetlinks.community.reactorql.term.TermType;
import org.jetlinks.community.reactorql.term.TermTypes;
import java.util.*;
import java.util.function.Function;
@ -39,6 +41,9 @@ public class TermColumn {
@Schema(description = "数据类型")
private String dataType;
@Schema(description = "是否为物模型列")
private boolean metadata;
/**
* @see Term#getTermType()
*/
@ -54,6 +59,10 @@ public class TermColumn {
@Schema(description = "子列,在类型为object时有值")
private List<TermColumn> children;
public TermColumn withMetadataTrue() {
metadata = true;
return this;
}
public TermColumn copyColumn(Predicate<String> childrenPredicate) {
TermColumn copy = FastBeanCopier.copy(this, new TermColumn());

View File

@ -5,6 +5,10 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
/**
* @deprecated
* @see org.jetlinks.community.reactorql.term.TermType
*/
@Getter
@Setter
@AllArgsConstructor(staticName = "of")

View File

@ -3,6 +3,7 @@ package org.jetlinks.community.rule.engine.scene.term;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments;
import org.jetlinks.core.metadata.DataType;
@Deprecated
public interface TermTypeSupport {
String getType();

View File

@ -8,12 +8,16 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* @deprecated
* @see org.jetlinks.community.reactorql.term.TermTypes
*/
@Deprecated
public class TermTypes {
private static final Map<String, TermTypeSupport> supports = new LinkedHashMap<>();
static {
for (FixedTermTypeSupport value : FixedTermTypeSupport
.values()) {
for (FixedTermTypeSupport value : FixedTermTypeSupport.values()) {
register(value);
}
}

View File

@ -67,6 +67,8 @@ public class TermValue implements Serializable {
public enum Source {
manual,
metric
metric,
variable,
upper
}
}

View File

@ -2,20 +2,24 @@ package org.jetlinks.community.rule.engine.service;
import lombok.AllArgsConstructor;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.hswebframework.web.crud.events.EntityModifyEvent;
import org.hswebframework.web.crud.events.EntitySavedEvent;
import org.hswebframework.web.crud.service.GenericReactiveCrudService;
import org.hswebframework.web.exception.BusinessException;
import org.jetlinks.community.rule.engine.alarm.AlarmHandleInfo;
import org.jetlinks.community.rule.engine.entity.AlarmConfigEntity;
import org.jetlinks.community.rule.engine.entity.AlarmHandleHistoryEntity;
import org.jetlinks.community.rule.engine.entity.SceneEntity;
import org.jetlinks.community.rule.engine.entity.*;
import org.jetlinks.community.rule.engine.enums.AlarmState;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
@ -30,29 +34,66 @@ public class AlarmConfigService extends GenericReactiveCrudService<AlarmConfigEn
private final ReactiveRepository<AlarmHandleHistoryEntity, String> handleHistoryRepository;
private final SceneService sceneService;
/**
* 处理告警
*
* @param id 告警记录ID
* @param handleInfo 处理信息
* @param info 告警处理信息
*/
public Mono<Void> handleAlarm(String id, AlarmHandleInfo handleInfo) {
public Mono<Void> handleAlarm(AlarmHandleInfo info){
return alarmRecordService
.findById(id)
.flatMap(alarmRecord -> alarmRecordService
.changeRecordState(handleInfo.getState(), handleInfo.getId())
.then(handleHistoryRepository
.save(AlarmHandleHistoryEntity.of(handleInfo.getId(),
alarmRecord.getAlarmConfigId(),
alarmRecord.getAlarmTime(),
handleInfo)))
.then());
.changeRecordState(info.getState(), info.getAlarmRecordId())
.flatMap(total-> {
if (total > 0){
return handleHistoryRepository
.save(AlarmHandleHistoryEntity.of(info));
}else {
return Mono.error(new BusinessException("error.the_alarm_record_has_been_processed"));
}
})
.then();
}
public Mono<PagerResult<AlarmConfigDetail>> queryDetailPager(QueryParamEntity query) {
return this
.queryPager(query)
.flatMap(result -> Flux
.fromIterable(result.getData())
.index()
.flatMap(tp2 -> this
// 转换为详情
.convertDetail(tp2.getT2())
.map(detail -> Tuples.of(tp2.getT1(), detail)))
// 重新排序,因为转为详情是异步的可能导致顺序乱掉
.sort(Comparator.comparingLong(Tuple2::getT1))
.map(Tuple2::getT2)
.collectList()
.map(detail -> PagerResult.of(result.getTotal(), detail, query)));
}
/**
* 转换为详情信息
*
* @param entity 告警配置
* @return 告警配置详情
*/
private Mono<AlarmConfigDetail> convertDetail(AlarmConfigEntity entity) {
return sceneService
.createQuery()
.and(SceneEntity::getId, "alarm-bind-rule", entity.getId())
.fetch()
.collectList()
.map(sceneInfo -> AlarmConfigDetail
.of(entity)
.withScene(sceneInfo));
}
//同步场景修改后的数据到告警配置中
@EventListener
@Deprecated
public void handleSceneSaved(EntitySavedEvent<SceneEntity> event) {
event.async(Mono.defer(() -> Flux
.fromIterable(event.getEntity())
@ -62,6 +103,7 @@ public class AlarmConfigService extends GenericReactiveCrudService<AlarmConfigEn
//同步场景修改后的数据到告警配置中
@EventListener
@Deprecated
public void handleSceneSaved(EntityModifyEvent<SceneEntity> event) {
Map<String, SceneEntity> before = event
@ -82,7 +124,7 @@ public class AlarmConfigService extends GenericReactiveCrudService<AlarmConfigEn
);
}
@Deprecated
public Mono<Void> updateByScene(SceneEntity entity) {
return createUpdate()
.set(AlarmConfigEntity::getSceneName, entity.getName())

View File

@ -7,8 +7,6 @@ import org.jetlinks.community.rule.engine.enums.AlarmRecordState;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import java.util.Date;
@Service
@AllArgsConstructor
public class AlarmRecordService extends GenericReactiveCrudService<AlarmRecordEntity, String> {
@ -19,13 +17,13 @@ public class AlarmRecordService extends GenericReactiveCrudService<AlarmRecordEn
* @param id 告警记录ID
* @return
*/
public Mono<Void> changeRecordState(AlarmRecordState state, String id) {
public Mono<Integer> changeRecordState(AlarmRecordState state, String id) {
return createUpdate()
.set(AlarmRecordEntity::getState, state)
.set(AlarmRecordEntity::getAlarmTime, new Date())
.set(AlarmRecordEntity::getHandleTime, System.currentTimeMillis())
.where(AlarmRecordEntity::getId, id)
.execute()
.then();
.not(AlarmRecordEntity::getState, state)
.execute();
}

View File

@ -1,9 +1,6 @@
package org.jetlinks.community.rule.engine.service;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.*;
import java.io.Serializable;
import java.util.Date;
@ -13,6 +10,7 @@ import java.util.Date;
@Setter
@AllArgsConstructor(staticName = "of")
@NoArgsConstructor
@Generated
public class DebugMessage implements Serializable {
private String type;

View File

@ -1,18 +1,25 @@
package org.jetlinks.community.rule.engine.service;
import lombok.AllArgsConstructor;
import org.apache.commons.collections4.CollectionUtils;
import org.hswebframework.ezorm.core.param.QueryParam;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.jetlinks.core.metadata.types.*;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.core.metadata.types.ArrayType;
import org.jetlinks.core.metadata.types.DateTimeType;
import org.jetlinks.core.metadata.types.IntType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.community.PropertyConstants;
import org.jetlinks.community.elastic.search.index.DefaultElasticSearchIndexMetadata;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager;
import org.jetlinks.community.elastic.search.service.ElasticSearchService;
import org.jetlinks.community.rule.engine.entity.AlarmHistoryInfo;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
/**
* @author bestfeng
@ -20,7 +27,8 @@ import java.time.Duration;
@AllArgsConstructor
public class ElasticSearchAlarmHistoryService implements AlarmHistoryService {
public final static String ALARM_HISTORY_INDEX="alarm_history";
public final static String ALARM_HISTORY_INDEX = "alarm_history";
private final ElasticSearchIndexManager indexManager;
@ -32,18 +40,23 @@ public class ElasticSearchAlarmHistoryService implements AlarmHistoryService {
}
public Mono<Void> save(AlarmHistoryInfo historyInfo) {
return elasticSearchService.save(ALARM_HISTORY_INDEX, historyInfo);
return elasticSearchService.commit(ALARM_HISTORY_INDEX, createData(historyInfo));
}
public Mono<Void> save(Flux<AlarmHistoryInfo> historyInfo) {
return elasticSearchService.save(ALARM_HISTORY_INDEX, historyInfo);
return elasticSearchService.save(ALARM_HISTORY_INDEX, historyInfo.map(this::createData));
}
public Mono<Void> save(Mono<AlarmHistoryInfo> historyInfo) {
return elasticSearchService.save(ALARM_HISTORY_INDEX, historyInfo);
return elasticSearchService.save(ALARM_HISTORY_INDEX, historyInfo.map(this::createData));
}
public void init(){
private Map<String, Object> createData(AlarmHistoryInfo info) {
return FastBeanCopier.copy(info, new HashMap<>(16));
}
public void init() {
indexManager.putIndex(
new DefaultElasticSearchIndexMetadata(ALARM_HISTORY_INDEX)
.addProperty("id", StringType.GLOBAL)
@ -56,8 +69,14 @@ public class ElasticSearchAlarmHistoryService implements AlarmHistoryService {
.addProperty("targetType", StringType.GLOBAL)
.addProperty("targetName", StringType.GLOBAL)
.addProperty("targetId", StringType.GLOBAL)
.addProperty("sourceType", StringType.GLOBAL)
.addProperty("sourceName", StringType.GLOBAL)
.addProperty("sourceId", StringType.GLOBAL)
.addProperty("alarmInfo", StringType.GLOBAL)
.addProperty("creatorId", StringType.GLOBAL)
.addProperty("bindings", new ArrayType().elementType(StringType.GLOBAL))
).block(Duration.ofSeconds(10));
}
}

View File

@ -1,18 +0,0 @@
package org.jetlinks.community.rule.engine.service;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class ExecuteRuleRequest {
private String sessionId;
private String contextId;
private String startWith;
private String endWith;
private Object data;
}

View File

@ -17,6 +17,7 @@ import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -41,16 +42,18 @@ public class RuleInstanceService extends GenericReactiveCrudService<RuleInstance
return elasticSearchService.queryPager(RuleEngineLoggerIndexProvider.RULE_LOG, queryParam, RuleEngineExecuteLogInfo.class);
}
@Transactional
public Mono<Void> stop(String id) {
return this.ruleEngine
.shutdown(id)
.then(createUpdate()
.set(RuleInstanceEntity::getState, RuleInstanceState.stopped)
.set(RuleInstanceEntity::getState, RuleInstanceState.disable)
.where(RuleInstanceEntity::getId, id)
.execute())
.then();
}
@Transactional
public Mono<Void> start(String id) {
return findById(Mono.just(id))
.flatMap(this::doStart);

View File

@ -1,7 +1,6 @@
package org.jetlinks.community.rule.engine.service;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.crud.events.EntityCreatedEvent;
import org.hswebframework.web.crud.events.EntityDeletedEvent;
import org.hswebframework.web.crud.events.EntityModifyEvent;
@ -11,9 +10,9 @@ import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.community.rule.engine.entity.SceneEntity;
import org.jetlinks.community.rule.engine.enums.RuleInstanceState;
import org.jetlinks.community.rule.engine.scene.SceneRule;
import org.jetlinks.community.rule.engine.web.request.SceneExecuteRequest;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.RuleEngine;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -23,12 +22,12 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@Service
@AllArgsConstructor
@Slf4j
public class SceneService extends GenericReactiveCrudService<SceneEntity, String> implements CommandLineRunner {
public class SceneService extends GenericReactiveCrudService<SceneEntity, String> {
private final RuleEngine ruleEngine;
@ -45,6 +44,25 @@ public class SceneService extends GenericReactiveCrudService<SceneEntity, String
.then();
}
public Mono<Void> executeBatch(Flux<SceneExecuteRequest> requestFlux) {
long t = System.currentTimeMillis();
return requestFlux
.doOnNext(request -> {
if (request.getContext() == null) {
request.setContext(new HashMap<>());
}
request.getContext().put("_now", t);
request.getContext().put("timestamp", t);
})
.flatMap(request -> ruleEngine
.getTasks(request.getId())
.filter(task -> task.getJob().getNodeId().equals(request.getId()))
.next()//只执行一个
.flatMap(task -> task.execute(RuleData.create(request.getContext()))))
.then();
}
@Transactional(rollbackFor = Throwable.class)
public Mono<SceneEntity> createScene(SceneRule rule) {
if (!StringUtils.hasText(rule.getId())) {
@ -52,7 +70,7 @@ public class SceneService extends GenericReactiveCrudService<SceneEntity, String
}
rule.validate();
SceneEntity entity = new SceneEntity().with(rule);
entity.setState(RuleInstanceState.started);
entity.setState(RuleInstanceState.disable);
return this
.insert(entity)
@ -127,8 +145,11 @@ public class SceneService extends GenericReactiveCrudService<SceneEntity, String
//禁用时,停止规则
if (scene.getState() == RuleInstanceState.disable) {
return ruleEngine.shutdown(scene.getId());
}else if (scene.getState() == RuleInstanceState.started){
scene.validate();
return ruleEngine.startRule(scene.getId(), scene.toRule().getModel());
}
return ruleEngine.startRule(scene.getId(), scene.toRule().getModel());
return Mono.empty();
})
.then();
}
@ -143,19 +164,4 @@ public class SceneService extends GenericReactiveCrudService<SceneEntity, String
);
}
@Override
public void run(String... args) {
createQuery()
.where()
.is(SceneEntity::getState, RuleInstanceState.started)
.fetch()
.flatMap(e -> Mono
.defer(() -> ruleEngine.startRule(e.getId(), e.toRule().getModel()).then())
.onErrorResume(err -> {
log.warn("启动场景[{}]失败", e.getName(), err);
return Mono.empty();
}))
.subscribe();
}
}

View File

@ -1,105 +0,0 @@
package org.jetlinks.community.rule.engine.service.terms;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import org.hswebframework.ezorm.core.param.Term;
import org.hswebframework.ezorm.rdb.metadata.RDBColumnMetadata;
import org.hswebframework.ezorm.rdb.metadata.RDBTableMetadata;
import org.hswebframework.ezorm.rdb.metadata.TableOrViewMetadata;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.*;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.term.AbstractTermFragmentBuilder;
import org.hswebframework.web.api.crud.entity.TermExpressionParser;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 根据告警记录查询设备相关数据可以使用 {@link org.jetlinks.community.rule.engine.entity.DeviceAlarmHistoryEntity}的属性作为关联条件
* <p>
* where id dev-alarm 'state not xxx'
*
* @author zhouhao
* @see org.jetlinks.community.rule.engine.entity.DeviceAlarmHistoryEntity
* @since 1.12
*/
@Component
public class DeviceAlarmTermBuilder extends AbstractTermFragmentBuilder {
public DeviceAlarmTermBuilder() {
super("dev-alarm", "根据告警查询设备相关数据");
}
@SuppressWarnings("all")
public static List<Term> convertTerms(Object value) {
if (value instanceof String) {
String strVal = String.valueOf(value);
//json字符串
if (strVal.startsWith("[")) {
value = JSON.parseArray(strVal);
} else {
//表达式
return TermExpressionParser.parse(strVal);
}
}
if (value instanceof List) {
return new JSONArray(((List) value)).toJavaList(Term.class);
} else {
throw new UnsupportedOperationException("unsupported term value:" + value);
}
}
@Override
public SqlFragments createFragments(String columnFullName, RDBColumnMetadata column, Term term) {
List<Term> terms = convertTerms(term.getValue());
PrepareSqlFragments fragments = PrepareSqlFragments.of();
if(term.getOptions().contains("not")){
fragments.addSql("not");
}
fragments.addSql("exists( select 1 from rule_dev_alarm_history _his where", columnFullName)
.addSql("= _his.device_id");
RDBTableMetadata metadata = column
.getOwner()
.getSchema()
.getTable("rule_dev_alarm_history")
.orElseThrow(() -> new UnsupportedOperationException("unsupported dev-alarm"));
SqlFragments where = builder.createTermFragments(metadata, terms);
if (!where.isEmpty()) {
fragments.addSql("and")
.addFragments(where);
}
fragments.addSql(")");
return fragments;
}
static AlarmTermBuilder builder = new AlarmTermBuilder();
static class AlarmTermBuilder extends AbstractTermsFragmentBuilder<TableOrViewMetadata> {
@Override
protected SqlFragments createTermFragments(TableOrViewMetadata parameter, List<Term> terms) {
return super.createTermFragments(parameter, terms);
}
@Override
protected SqlFragments createTermFragments(TableOrViewMetadata table, Term term) {
if (term.getValue() instanceof NativeSql) {
NativeSql sql = ((NativeSql) term.getValue());
return PrepareSqlFragments.of(sql.getSql(), sql.getParameters());
}
return table
.getColumn(term.getColumn())
.flatMap(column -> table
.findFeature(TermFragmentBuilder.createFeatureId(term.getTermType()))
.map(termFragment -> termFragment.createFragments(column.getFullName("_his"), column, term)))
.orElse(EmptySqlFragments.INSTANCE);
}
}
}

View File

@ -4,6 +4,8 @@ import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.AllArgsConstructor;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.hswebframework.web.authorization.annotation.Authorize;
import org.hswebframework.web.authorization.annotation.QueryAction;
import org.hswebframework.web.authorization.annotation.Resource;
@ -12,6 +14,7 @@ import org.hswebframework.web.crud.service.ReactiveCrudService;
import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
import org.jetlinks.community.rule.engine.alarm.AlarmLevelInfo;
import org.jetlinks.community.rule.engine.alarm.AlarmTargetSupplier;
import org.jetlinks.community.rule.engine.entity.AlarmConfigDetail;
import org.jetlinks.community.rule.engine.entity.AlarmConfigEntity;
import org.jetlinks.community.rule.engine.entity.AlarmLevelEntity;
import org.jetlinks.community.rule.engine.service.AlarmConfigService;
@ -80,6 +83,13 @@ public class AlarmConfigController implements ReactiveServiceCrudController<Alar
.then();
}
@PostMapping("/detail/_query")
@Operation(summary = "查询告警配置详情")
@QueryAction
public Mono<PagerResult<AlarmConfigDetail>> queryDetailPager(@RequestBody Mono<QueryParamEntity> query) {
return query.flatMap(alarmConfigService::queryDetailPager);
}
@GetMapping("/default/level")
@Operation(summary = " 获取默认告警级别")
@QueryAction

View File

@ -10,14 +10,17 @@ import org.hswebframework.web.authorization.annotation.QueryAction;
import org.hswebframework.web.authorization.annotation.Resource;
import org.hswebframework.web.authorization.annotation.SaveAction;
import org.hswebframework.web.crud.service.ReactiveCrudService;
import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
import org.hswebframework.web.crud.web.reactive.ReactiveServiceQueryController;
import org.jetlinks.community.rule.engine.alarm.AlarmHandleInfo;
import org.jetlinks.community.rule.engine.entity.AlarmHandleHistoryEntity;
import org.jetlinks.community.rule.engine.entity.AlarmRecordEntity;
import org.jetlinks.community.rule.engine.service.AlarmConfigService;
import org.jetlinks.community.rule.engine.service.AlarmHandleHistoryService;
import org.jetlinks.community.rule.engine.service.AlarmRecordService;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@RestController
@ -26,7 +29,7 @@ import reactor.core.publisher.Mono;
@Authorize
@Tag(name = "告警记录")
@AllArgsConstructor
public class AlarmRecordController implements ReactiveServiceCrudController<AlarmRecordEntity, String> {
public class AlarmRecordController implements ReactiveServiceQueryController<AlarmRecordEntity, String> {
private final AlarmRecordService recordService;
@ -39,21 +42,18 @@ public class AlarmRecordController implements ReactiveServiceCrudController<Alar
return recordService;
}
@PostMapping("/{id}/_handle")
@PostMapping("/_handle")
@Operation(summary = "处理告警")
@SaveAction
public Mono<Void> handleAlarm(@PathVariable String id, @RequestBody Mono<AlarmHandleInfo> handleInfo) {
public Mono<Void> handleAlarm(@RequestBody Mono<AlarmHandleInfo> handleInfo) {
return handleInfo
.flatMap(info -> configService.handleAlarm(id, info));
.flatMap(configService::handleAlarm);
}
@PostMapping("/handle-history/_query")
@Operation(summary = "告警处理历史查询")
@QueryAction
public Mono<PagerResult<AlarmHandleHistoryEntity>> queryHandleHistoryPager(@RequestBody Mono<QueryParamEntity> query) {
return query.flatMap(handleHistoryService::queryPager);
}
}

View File

@ -14,6 +14,7 @@ import org.hswebframework.web.authorization.annotation.SaveAction;
import org.hswebframework.web.crud.web.reactive.ReactiveServiceQueryController;
import org.hswebframework.web.i18n.LocaleUtils;
import org.jetlinks.community.rule.engine.service.SceneService;
import org.jetlinks.community.rule.engine.web.request.SceneExecuteRequest;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.community.rule.engine.entity.SceneEntity;
import org.jetlinks.community.rule.engine.executor.device.DeviceSelectorProvider;
@ -26,7 +27,6 @@ import reactor.core.publisher.Mono;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
@RestController
@ -81,6 +81,13 @@ public class SceneController implements ReactiveServiceQueryController<SceneEnti
return context.flatMap(ctx -> service.execute(id, ctx));
}
@PostMapping("/batch/_execute")
@Operation(summary = "批量手动执行场景")
@SaveAction
public Mono<Void> executeBatch(@RequestBody Flux<SceneExecuteRequest> request) {
return service.executeBatch(request);
}
@DeleteMapping("/{id}")
@Operation(summary = "删除场景")
@DeleteAction
@ -119,18 +126,12 @@ public class SceneController implements ReactiveServiceQueryController<SceneEnti
.zip(
parseTermColumns(cache).collectList(),
cache,
(columns, rule) -> {
Map<String, Term> terms = SceneUtils.expandTerm(rule.getTerms());
return rule.createVariables(columns
.stream()
.filter(column -> column.hasColumn(terms.keySet()))
.map(column -> column.copyColumn(terms::containsKey))
.collect(Collectors.toList()),
branch,
branchGroup,
action,
deviceRegistry);
})
(columns, rule) -> rule
.createVariables(columns ,
branch,
branchGroup,
action,
deviceRegistry))
.flatMapMany(Function.identity());
}

View File

@ -0,0 +1,24 @@
package org.jetlinks.community.rule.engine.web.request;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.Setter;
import java.util.Map;
/**
* 触发场景请求.
*
* @author zhangji 2022/12/14
*/
@Getter
@Setter
public class SceneExecuteRequest {
@Schema(description = "场景ID")
private String id;
@Schema(description = "数据")
private Map<String, Object> context;
}