feat(场景联动): 添加设备数据执行动作,扩展数组条件,优化国际化 (#605)

This commit is contained in:
Zhang Ji 2025-01-23 18:57:15 +08:00 committed by GitHub
parent ad6279ed22
commit 27a9fd6618
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
53 changed files with 2331 additions and 144 deletions

View File

@ -19,6 +19,11 @@
<version>${jetlinks.version}</version>
</dependency>
<dependency>
<groupId>org.jetlinks</groupId>
<artifactId>jetlinks-supports</artifactId>
</dependency>
<dependency>
<groupId>org.hswebframework.web</groupId>
<artifactId>hsweb-authorization-api</artifactId>

View File

@ -20,6 +20,8 @@ import org.jetlinks.community.config.ConfigScopeProperties;
import org.jetlinks.community.config.SimpleConfigManager;
import org.jetlinks.community.config.entity.ConfigEntity;
import org.jetlinks.community.dictionary.DictionaryJsonDeserializer;
import org.jetlinks.community.reactorql.aggregation.InternalAggregationSupports;
import org.jetlinks.community.reactorql.function.InternalFunctionSupport;
import org.jetlinks.community.reference.DataReferenceManager;
import org.jetlinks.community.reference.DataReferenceProvider;
import org.jetlinks.community.reference.DefaultDataReferenceManager;
@ -32,10 +34,12 @@ import org.jetlinks.community.service.DefaultUserBindService;
import org.jetlinks.community.utils.TimeUtils;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.types.DataTypes;
import org.jetlinks.core.rpc.RpcManager;
import org.jetlinks.reactor.ql.feature.Feature;
import org.jetlinks.reactor.ql.supports.DefaultReactorQLMetadata;
import org.jetlinks.reactor.ql.utils.CastUtils;
import org.jetlinks.supports.official.JetLinksDataTypeCodecs;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.config.BeanPostProcessor;
@ -47,6 +51,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.http.MediaType;
import org.springframework.util.StringUtils;
import org.springframework.util.unit.DataSize;
import reactor.core.Exceptions;
import reactor.core.publisher.Hooks;
@ -59,6 +64,7 @@ import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.Map;
@Configuration
@SuppressWarnings("all")
@ -66,6 +72,9 @@ import java.util.Date;
public class CommonConfiguration {
static {
InternalAggregationSupports.register();
InternalFunctionSupport.register();
BeanUtilsBean.getInstance().getConvertUtils().register(new Converter() {
@Override
public <T> T convert(Class<T> aClass, Object o) {
@ -149,6 +158,23 @@ public class CommonConfiguration {
}
}, EnumDict.class);
BeanUtilsBean.getInstance().getConvertUtils().register(new Converter() {
@Override
@Generated
public <T> T convert(Class<T> type, Object value) {
if (value instanceof Map) {
Map<String, Object> map = ((Map) value);
String typeId = (String) map.get("type");
if (StringUtils.isEmpty(typeId)) {
return null;
}
return (T) JetLinksDataTypeCodecs.decode(DataTypes.lookup(typeId).get(), map);
}
return null;
}
}, DataType.class);
//捕获jvm错误,防止Flux被挂起
Hooks.onOperatorError((err, val) -> {
if (Exceptions.isJvmFatal(err)) {
@ -180,6 +206,7 @@ public class CommonConfiguration {
@Bean
public Jackson2ObjectMapperBuilderCustomizer jackson2ObjectMapperBuilderCustomizer(){
return builder->{
builder.deserializerByType(DataType.class, new DataTypeJSONDeserializer());
builder.deserializerByType(Date.class,new SmartDateDeserializer());
builder.deserializerByType(EnumDict.class, new DictionaryJsonDeserializer());
};

View File

@ -0,0 +1,26 @@
package org.jetlinks.community.configuration;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import org.apache.commons.beanutils.BeanUtilsBean;
import org.jetlinks.core.metadata.DataType;
import java.io.IOException;
import java.util.Map;
/**
*
* @author zhangji 2025/1/23
* @since 2.3
*/
public class DataTypeJSONDeserializer extends JsonDeserializer<DataType> {
@Override
public DataType deserialize(JsonParser parser, DeserializationContext ctxt) throws IOException, JsonProcessingException {
Map<String,Object> map= ctxt.readValue(parser, Map.class);
return (DataType) BeanUtilsBean.getInstance().getConvertUtils().convert(map, DataType.class);
}
}

View File

@ -0,0 +1,35 @@
package org.jetlinks.community.reactorql.aggregation;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments;
import org.hswebframework.ezorm.rdb.operator.dml.FunctionColumn;
import org.jetlinks.community.spi.Provider;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import java.util.function.Function;
/**
* 聚合函数支持.
*
* @author zhangji 2025/1/22
* @since 2.3
*/
public interface AggregationSupport extends Function<Publisher<?>, Mono<?>> {
Provider<AggregationSupport> supports = Provider.create(AggregationSupport.class);
String getId();
String getName();
SqlFragments createSql(FunctionColumn column);
static AggregationSupport getNow(String id) {
return AggregationSupport.supports
.get(id.toUpperCase())
.orElseGet(() -> AggregationSupport.supports
.get(id.toLowerCase())
.orElseGet(() -> AggregationSupport.supports.getNow(id)));
}
}

View File

@ -0,0 +1,82 @@
package org.jetlinks.community.reactorql.aggregation;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.BatchSqlFragments;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments;
import org.hswebframework.ezorm.rdb.operator.dml.FunctionColumn;
import org.jetlinks.reactor.ql.supports.agg.MapAggFeature;
import org.jetlinks.reactor.ql.utils.CastUtils;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.math.MathFlux;
import java.util.Comparator;
import java.util.function.Function;
import static org.jetlinks.reactor.ql.supports.DefaultReactorQLMetadata.addGlobal;
/**
*
* @author zhangji 2025/1/22
* @since 2.3
*/
@AllArgsConstructor
public enum InternalAggregationSupports implements AggregationSupport {
COUNT("总数", Flux::count, 0),
//去重计数
DISTINCT_COUNT("总数(去重)", flux -> flux.distinct().count(), 0) {
@Override
public SqlFragments createSql(FunctionColumn column) {
return new BatchSqlFragments().addSql("count(distinct ", column.getColumn(), ")");
}
},
MIN("最小值",
numberFlux -> MathFlux.min(numberFlux.map(CastUtils::castNumber), Comparator.comparing(Number::doubleValue)), null),
MAX("最大值", numberFlux -> MathFlux.max(numberFlux.map(CastUtils::castNumber), Comparator.comparing(Number::doubleValue)), null),
AVG("平均值", numberFlux -> MathFlux.averageDouble(numberFlux.map(CastUtils::castNumber), Number::doubleValue), null),
SUM("总和", numberFlux -> MathFlux.sumDouble(numberFlux.map(CastUtils::castNumber), Number::doubleValue), 0),
FIRST("第一个值", numberFlux -> numberFlux.take(1).singleOrEmpty(), null),
LAST("最后一个值", numberFlux -> numberFlux.takeLast(1).singleOrEmpty(), null),
// MEDIAN("中位数", numberFlux -> Mono.empty(), null),//中位数
// SPREAD("极差", numberFlux -> Mono.empty(), null),//差值
// STDDEV("标准差", numberFlux -> Mono.empty(), null),//标准差
;
static {
for (InternalAggregationSupports value : values()) {
addGlobal(new MapAggFeature(value.getId(), value::apply));
AggregationSupport.supports.register(value.getId(), value);
}
}
public static void register(){
}
@Getter
private final String name;
private final Function<Flux<?>, Mono<?>> computer;
@Getter
private final Object defaultValue;
@Override
public SqlFragments createSql(FunctionColumn column) {
return new BatchSqlFragments()
.addSql(name() + "(").addSql(column.getColumn()).addSql(")");
}
@Override
public String getId() {
return name();
}
@Override
public Mono<?> apply(Publisher<?> publisher) {
return computer.apply(Flux.from(publisher));
}
}

View File

@ -0,0 +1,25 @@
package org.jetlinks.community.reactorql.function;
import lombok.Getter;
import lombok.Setter;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.PropertyMetadata;
import java.util.List;
/**
*
* @author zhangji 2025/1/22
* @since 2.3
*/
@Getter
@Setter
public class FunctionInfo {
private String id;
private String name;
private DataType outputType;
private List<PropertyMetadata> args;
}

View File

@ -0,0 +1,72 @@
package org.jetlinks.community.reactorql.function;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.community.spi.Provider;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* 函数支持,用于定义在可以在ReactorQL中使用的函数.
*
* @author zhangji 2025/1/22
* @since 2.3
*/
public interface FunctionSupport {
Provider<FunctionSupport> supports = Provider.create(FunctionSupport.class);
String getId();
String getName();
/**
* 是否支持列的数据类型
*
* @param type 数据类型
* @return 是否支持
*/
boolean isSupported(DataType type);
/**
* 获取输出数据类型
*
* @return 输出数据类型
*/
DataType getOutputType();
/**
* 创建SQL函数片段
*
* @param column 列名
* @param args 参数
* @return SQL函数片段
*/
SqlFragments createSql(String column, Map<String, Object> args);
/**
* 查找支持的函数
*
* @param type 数据类型
* @return 函数信息
*/
static List<FunctionInfo> lookup(DataType type) {
return supports
.getAll()
.stream()
.filter(support -> support.isSupported(type))
.map(FunctionSupport::toInfo)
.collect(Collectors.toList());
}
default FunctionInfo toInfo() {
FunctionInfo info = new FunctionInfo();
info.setId(getId());
info.setOutputType(getOutputType());
info.setName(getName());
return info;
}
}

View File

@ -0,0 +1,70 @@
package org.jetlinks.community.reactorql.function;
import com.google.common.collect.Sets;
import lombok.AllArgsConstructor;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.types.ArrayType;
import org.jetlinks.core.metadata.types.LongType;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
/**
*
* @author zhangji 2025/1/22
* @since 2.3
*/
@AllArgsConstructor
public enum InternalFunctionSupport implements FunctionSupport {
array_len("集合长度", LongType.GLOBAL, ArrayType.ID);
private final String name;
private final DataType outputType;
private final Set<String> supportTypes;
static {
for (InternalFunctionSupport value : values()) {
InternalFunctionSupport.supports.register(value.getId(), value);
}
}
public static void register(){
}
InternalFunctionSupport(String name, DataType outputType, String... supportTypes) {
this.name = name;
this.outputType = outputType;
this.supportTypes = Collections.unmodifiableSet(
Sets.newHashSet(supportTypes)
);
}
@Override
public String getId() {
return name();
}
@Override
public String getName() {
return name;
}
@Override
public boolean isSupported(DataType type) {
return supportTypes.contains(type.getId());
}
@Override
public DataType getOutputType() {
return outputType;
}
@Override
public SqlFragments createSql(String column, Map<String, Object> args) {
return SqlFragments.of(getId() + "(", column, ")");
}
}

View File

@ -0,0 +1,424 @@
package org.jetlinks.community.reactorql.impl;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Maps;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import net.sf.jsqlparser.expression.*;
import net.sf.jsqlparser.expression.operators.relational.ExpressionList;
import net.sf.jsqlparser.statement.select.Select;
import net.sf.jsqlparser.statement.select.SubSelect;
import org.apache.commons.collections4.CollectionUtils;
import org.hswebframework.ezorm.core.param.Term;
import org.hswebframework.ezorm.rdb.executor.SqlRequest;
import org.hswebframework.ezorm.rdb.executor.SqlRequests;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.BatchSqlFragments;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.EmptySqlFragments;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments;
import org.hswebframework.ezorm.rdb.operator.dml.FunctionColumn;
import org.hswebframework.ezorm.rdb.operator.dml.FunctionTerm;
import org.hswebframework.web.api.crud.entity.TermExpressionParser;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.community.reactorql.aggregation.AggregationSupport;
import org.jetlinks.community.utils.ConverterUtils;
import org.jetlinks.community.utils.ReactorUtils;
import org.jetlinks.core.metadata.Jsonable;
import org.jetlinks.reactor.ql.DefaultReactorQLContext;
import org.jetlinks.reactor.ql.ReactorQL;
import org.jetlinks.reactor.ql.ReactorQLMetadata;
import org.jetlinks.reactor.ql.ReactorQLRecord;
import org.jetlinks.reactor.ql.feature.FeatureId;
import org.jetlinks.reactor.ql.feature.ValueMapFeature;
import org.jetlinks.reactor.ql.supports.DefaultReactorQLMetadata;
import org.jetlinks.reactor.ql.utils.CastUtils;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
*
* @author zhangji 2025/1/22
* @since 2.3
*/
public class ComplexExistsFunction implements ValueMapFeature {
//集合中的元素
public static final String COL_ELEMENT = "_element";
//集合自身
public static final String COL_SELF = "_self";
//原始行
public static final String COL_ROW = "_row";
public static final ComplexExistsFunction INSTANCE = new ComplexExistsFunction();
public static final String function = "complex_exists";
static {
DefaultReactorQLMetadata.addGlobal(INSTANCE);
}
public static void register() {
}
public static ExistsSpec createExistsSpec(Object val) {
if (val instanceof ExistsSpec) {
return ((ExistsSpec) val);
}
if (val instanceof Map) {
ExistsSpec spec = new ExistsSpec();
spec.fromJson(new JSONObject((Map) val));
return spec;
}
if (val instanceof String) {
ExistsSpec spec = new ExistsSpec();
spec.setFilter(TermExpressionParser.parse(val.toString()));
return spec;
}
return FastBeanCopier.copy(val, new ExistsSpec());
}
private static ExistsProcessor createExprProcessor(Expression expr) {
String sql;
if (expr instanceof Select) {
sql = expr.toString();
} else if (expr instanceof SubSelect) {
sql = ((SubSelect) expr).getSelectBody().toString();
} else if (expr instanceof BinaryExpression) {
sql = "select 1 from t where " + expr;
} else {
throw new UnsupportedOperationException("不支持的表达式:" + expr);
}
SqlRequest request = SqlRequests.of(sql);
ReactorQL ql = ReactorQL
.builder()
.sql(request.getSql())
.build();
return new ReactorQLProcessor(request, ql);
}
private static ExistsProcessor createExprProcessor(String str) {
if (str.startsWith("select") || str.startsWith("SELECT")) {
SqlRequest request = SqlRequests.of(str);
ReactorQL ql = ReactorQL
.builder()
.sql(request.getSql())
.build();
return new ReactorQLProcessor(request, ql);
}
ExistsSpec spec = new ExistsSpec();
spec.setFilter(TermExpressionParser.parse(str));
return spec.compile();
}
@Override
public Function<ReactorQLRecord, Publisher<?>> createMapper(Expression expression, ReactorQLMetadata metadata) {
net.sf.jsqlparser.expression.Function func = ((net.sf.jsqlparser.expression.Function) expression);
ExpressionList args = func.getParameters();
if (args == null || args.getExpressions() == null || args.getExpressions().size() < 2) {
throw new UnsupportedOperationException("complex_exists函数参数错误");
}
Function<ReactorQLRecord, Mono<ExistsProcessor>> processorMapper;
List<Function<ReactorQLRecord, Publisher<?>>> mappers = args
.getExpressions()
.stream()
.skip(1)
.map(expr -> ValueMapFeature.createMapperNow(expr, metadata))
.collect(Collectors.toList());
Expression firstExpr = args.getExpressions().get(0);
//以字符串来定义 complex_exists('name is test')
if (firstExpr instanceof StringValue) {
Mono<ExistsProcessor> processor = Mono.just(createExprProcessor(((StringValue) firstExpr).getValue()));
processorMapper = record -> processor;
} else if (firstExpr instanceof NumericBind) {
int argIndex = ((NumericBind) firstExpr).getBindId() - 1;
processorMapper = record -> Mono.justOrEmpty(
record.getContext().getParameter(argIndex).map(this::transformProcessor)
);
} else if (firstExpr instanceof JdbcParameter) {
int argIndex = ((JdbcParameter) firstExpr).getIndex() - 1;
processorMapper = record -> Mono.justOrEmpty(
record.getContext().getParameter(argIndex).map(this::transformProcessor)
);
}
// complex_exists(select 1 from dual where ,arr)
else if (firstExpr instanceof JdbcNamedParameter) {
String name = ((JdbcNamedParameter) firstExpr).getName();
processorMapper = record -> Mono.justOrEmpty(
transformProcessor(record.getContext().getParameter(name))
);
} else {
Mono<ExistsProcessor> processor = Mono.just(createExprProcessor(firstExpr));
processorMapper = record -> processor;
}
return record ->
processorMapper
.apply(record)
.flatMap(processor -> {
if (mappers.size() == 1) {
return processor.apply(
record,
mappers.get(0).apply(record));
}
return processor.apply(
record,
Flux.fromIterable(mappers)
.flatMap(mapper -> mapper.apply(record))
);
});
}
@Override
public String getId() {
return FeatureId.ValueMap.of(function).getId();
}
private ExistsProcessor transformProcessor(Object value) {
if (value instanceof ExistsProcessor) {
return (ExistsProcessor) value;
}
if (value instanceof ExistsSpec) {
return ((ExistsSpec) value).compile();
}
return null;
}
public interface ExistsProcessor extends BiFunction<ReactorQLRecord, Publisher<?>, Mono<Boolean>> {
@Override
Mono<Boolean> apply(ReactorQLRecord record, Publisher<?> publisher);
Mono<Boolean> apply(ReactorQLRecord record, List<?> list);
}
@AllArgsConstructor
static class ReactorQLProcessor implements ExistsProcessor {
private final SqlRequest request;
private final ReactorQL ql;
public Mono<Boolean> apply(ReactorQLRecord record, List<?> list) {
Flux<?> data = Flux
.fromIterable(list)
.map(v -> {
Map<String, Object> _data = Maps.newHashMapWithExpectedSize(3);
_data.put(COL_ROW,record.getRecord());
_data.put(COL_ELEMENT, v);
_data.put(COL_SELF, list);
return _data;
});
DefaultReactorQLContext ctx = new DefaultReactorQLContext((t) -> data);
for (Object parameter : request.getParameters()) {
ctx.bind(parameter);
}
return ql.start(ctx)
.hasElements();
}
@Override
public Mono<Boolean> apply(ReactorQLRecord record, Publisher<?> publisher) {
if (publisher instanceof Mono) {
return ((Mono<?>) publisher)
.map(ConverterUtils::convertToList)
.flatMap(list -> apply(record, list));
}
return Flux
.from(publisher)
.as(CastUtils::flatStream)
.collectList()
.flatMap(list -> apply(record, list));
}
@Override
public String toString() {
return request.toNativeSql();
}
}
private static FunctionTerm convertFunctionTerm(Object obj) {
if (obj instanceof FunctionTerm) {
return (FunctionTerm) obj;
}
if (obj instanceof Map) {
return convertFunctionTerm(new JSONObject((Map) obj));
}
throw new UnsupportedOperationException("不支持的类型:" + obj);
}
private static FunctionTerm convertFunctionTerm(JSONObject obj) {
FunctionTerm term = new FunctionTerm();
FastBeanCopier.copy(obj, term, "terms");
JSONArray terms = obj.getJSONArray("terms");
if (terms != null) {
terms
.forEach(o -> {
term.addTerm(convertFunctionTerm((JSONObject) o));
});
}
return term;
}
@Getter
@Setter
public static class ExistsSpec implements Jsonable {
//过滤
private List<Term> filter;
//聚合
private List<FunctionTerm> aggregation;
@Override
public void fromJson(JSONObject json) {
FastBeanCopier.copy(json, this, "aggregation");
JSONArray aggregation = json.getJSONArray("aggregation");
if (aggregation != null) {
this.aggregation = aggregation
.stream()
.map(ComplexExistsFunction::convertFunctionTerm)
.collect(Collectors.toList());
}
}
//todo 其他简便配置的方式?: 任意满足,全部满足等
public void walkTerms(Consumer<Term> consumer) {
if (filter != null) {
filter.forEach(consumer);
}
if (aggregation != null) {
aggregation.forEach(consumer);
}
}
private void applyAggregation(BatchSqlFragments fragments,
AtomicInteger count,
FunctionTerm agg,
Map<String, String> distinct,
Map<Term, String> aliasMapping) {
// 大小写都支持
AggregationSupport support = AggregationSupport.getNow(agg.getFunction());
FunctionColumn col = new FunctionColumn();
col.setFunction(agg.getFunction());
col.setColumn("this['" + agg.getColumn() + "']");
col.setOpts(agg.getOpts() == null ? null : Maps.transformValues(agg.getOpts(), Object.class::cast));
SqlFragments frg = support.createSql(col);
String sqlStr = frg.toRequest().toNativeSql();
String alias = distinct.get(frg.toRequest().toNativeSql());
if (alias == null) {
alias = "_agg_" + count.incrementAndGet();
distinct.put(sqlStr, alias);
aliasMapping.put(agg, alias);
if (count.get() > 1) {
fragments.addSql(",");
}
fragments.add(frg).addSql(alias);
}
if (CollectionUtils.isNotEmpty(agg.getTerms())) {
for (Term term : agg.getTerms()) {
if (term instanceof FunctionTerm) {
applyAggregation(fragments,
count,
((FunctionTerm) term),
distinct,
aliasMapping);
}
}
}
}
private List<Term> createHavingTerm(List<? extends Term> aggregation,
Map<Term, String> aliasMapping) {
List<Term> terms = new ArrayList<>(aggregation.size());
for (Term functionTerm : aggregation) {
Term term = new Term();
term.setColumn(aliasMapping.getOrDefault(functionTerm, functionTerm.getColumn()));
term.setTermType(functionTerm.getTermType());
term.setValue(functionTerm.getValue());
term.setOptions(functionTerm.getOptions());
term.setType(Term.Type.and);
if (CollectionUtils.isNotEmpty(functionTerm.getTerms())) {
term.setTerms(createHavingTerm(functionTerm.getTerms(), aliasMapping));
}
terms.add(term);
}
return terms;
}
public ExistsProcessor compile() {
SqlFragments cols;
SqlFragments having;
if (CollectionUtils.isNotEmpty(aggregation)) {
Map<String, String> distinct = new HashMap<>();
Map<Term, String> aliasMapping = new HashMap<>();
BatchSqlFragments cols_ = new BatchSqlFragments();
AtomicInteger count = new AtomicInteger(1);
count.incrementAndGet();
cols_.addSql(COL_SELF,",",COL_ELEMENT);
for (FunctionTerm functionTerm : aggregation) {
applyAggregation(cols_, count, functionTerm, distinct, aliasMapping);
}
cols = cols_;
List<Term> havingTerms = createHavingTerm(aggregation, aliasMapping);
having = ReactorUtils.createFilterSql(havingTerms);
} else {
cols = SqlFragments.ONE;
having = EmptySqlFragments.INSTANCE;
}
SqlFragments where = ReactorUtils.createFilterSql(filter);
BatchSqlFragments fragments = new BatchSqlFragments();
if (having.isNotEmpty()) {
fragments.addSql("select 1 from (");
}
fragments.addSql("select").addFragments(cols).addSql("from t");
if (where.isNotEmpty()) {
fragments.addSql("where").addFragments(where);
}
if (having.isNotEmpty()) {
fragments.add(SqlFragments.RIGHT_BRACKET)
.add(SqlFragments.WHERE)
.addFragments(having);
}
SqlRequest request = fragments.toRequest();
ReactorQL ql = ReactorQL.builder()
.sql(request.getSql())
.build();
return new ReactorQLProcessor(request, ql);
}
}
}

View File

@ -0,0 +1,73 @@
package org.jetlinks.community.reactorql.term;
import org.hswebframework.ezorm.core.param.Term;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.BatchSqlFragments;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments;
import org.jetlinks.community.reactorql.impl.ComplexExistsFunction;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.types.ArrayType;
import java.util.function.BiFunction;
/**
* @author zhangji 2025/1/23
* @since 2.3
*/
public class ExistsTermSupport implements TermTypeSupport {
static {
ComplexExistsFunction.register();
}
@Override
public String getType() {
return "complex_exists";
}
@Override
public String getName() {
return "满足";
}
@Override
public boolean isSupported(DataType type) {
return type instanceof ArrayType;
}
@Override
public Term refactorTerm(String tableName,
Term term,
BiFunction<String, Term, Term> refactor) {
Term t = refactor.apply(tableName, term);
ComplexExistsFunction.ExistsSpec existsSpec = ComplexExistsFunction.createExistsSpec(t.getValue());
existsSpec.walkTerms(__term -> {
String col = __term.getColumn();
//使用 _row 获取原始行数据
refactor.apply(ComplexExistsFunction.COL_ROW, __term);
//由原始条件指定的列名为准,: _element.this _element.num
__term.setColumn(col);
});
t.setValue(existsSpec);
return t;
}
@Override
public SqlFragments createSql(String column, Object value, Term term) {
ComplexExistsFunction.ExistsSpec existsSpec = ComplexExistsFunction.createExistsSpec(value);
BatchSqlFragments fragments = new BatchSqlFragments();
fragments
.addSql("complex_exists(?,", column, ")")
.addParameter(existsSpec.compile());
return fragments;
}
}

View File

@ -9,6 +9,7 @@ import reactor.core.publisher.Mono;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
public interface TermTypeSupport {
@ -40,6 +41,18 @@ public interface TermTypeSupport {
*/
SqlFragments createSql(String column, Object value, Term term);
/**
* 重构条件
*
* @param tableName 表名
* @param term 条件
* @param refactor 重构函数
* @return 重构后的条件
*/
default Term refactorTerm(String tableName, Term term, BiFunction<String,Term,Term> refactor){
return refactor.apply(tableName,term);
}
/**
* 判断是否已经过时,过时的条件应当不可选择.
*

View File

@ -18,6 +18,7 @@ public class TermTypes {
for (FixedTermTypeSupport value : FixedTermTypeSupport.values()) {
register(value);
}
register(new ExistsTermSupport());
}
public static void register(TermTypeSupport support){
@ -35,6 +36,9 @@ public class TermTypes {
}
public static Optional<TermTypeSupport> lookupSupport(String type) {
if (type == null) {
return Optional.empty();
}
return Optional.ofNullable(supports.get(type));
}
}

View File

@ -3,6 +3,7 @@ package org.jetlinks.community.utils;
import org.hswebframework.ezorm.core.param.Term;
import org.hswebframework.ezorm.rdb.executor.SqlRequest;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.AbstractTermsFragmentBuilder;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.EmptySqlFragments;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.community.reactorql.term.FixedTermTypeSupport;
@ -132,6 +133,13 @@ public class ReactorUtils {
return createFilter(terms, converter, (arg, data) -> arg);
}
public static SqlFragments createFilterSql(List<Term> terms) {
if (CollectionUtils.isEmpty(terms)) {
return EmptySqlFragments.INSTANCE;
}
return termBuilder.createTermFragments(null, terms);
}
@SuppressWarnings("all")
public static <T> Function<T, Mono<Boolean>> createFilter(List<Term> terms,
Function<T, Map<String, Object>> converter,

View File

@ -84,6 +84,25 @@ public class VariableSource implements Serializable {
return variableSource;
}
public static Object resolveStatic(Object value, Map<String, Object> ctx) {
if (value instanceof VariableSource) {
return ((VariableSource) value).resolveStatic(ctx);
}
if (value instanceof Map) {
Map<?, ?> mapVal = ((Map<?, ?>) value);
if (!mapVal.containsKey("$noVariable")) {
Object sourceName = mapVal.get("source");
if (sourceName != null && VariableSource.Source.of(String.valueOf(sourceName)).isPresent()) {
VariableSource source = FastBeanCopier.copy(mapVal, new VariableSource());
if (source.getSource() != null) {
return source.resolveStatic(ctx);
}
}
}
}
return value;
}
public static VariableSource of(Object value) {
if (value instanceof VariableSource) {
return ((VariableSource) value);

View File

@ -1,8 +1,12 @@
package org.jetlinks.community.rule.engine.commons;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.collections4.CollectionUtils;
import org.hswebframework.ezorm.core.param.Term;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.NativeSql;
import org.hswebframework.web.api.crud.entity.TermExpressionParser;
import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.exception.I18nSupportException;
import org.jetlinks.community.relation.utils.VariableSource;
import org.jetlinks.community.utils.ReactorUtils;
import org.jetlinks.reactor.ql.utils.CastUtils;
@ -14,6 +18,7 @@ import reactor.core.publisher.Mono;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -35,14 +40,20 @@ import java.util.stream.Collectors;
* }
*
* }</pre>
* <p>
* 建议使用{@link TermsConditionEvaluator#createCondition(List)}来创建用户输入的条件
*
* @author zhouhao
* @see TermsConditionEvaluator#createCondition(List)
* @since 2.0
*/
public class TermsConditionEvaluator implements ConditionEvaluatorStrategy {
public static final String TYPE = "terms";
public static Condition createCondition(List<Term> terms) {
//校验条件是否合法
validateUnsafeTerm(terms);
Condition condition = new Condition();
condition.setType(TYPE);
condition.setConfiguration(Collections.singletonMap("terms", terms));
@ -59,6 +70,49 @@ public class TermsConditionEvaluator implements ConditionEvaluatorStrategy {
return false;
}
public static void validateUnsafeTerm(List<Term> term) {
if (CollectionUtils.isNotEmpty(term)) {
term.forEach(TermsConditionEvaluator::validateUnsafeTerm);
}
}
public static void validateUnsafeTerm(Term term) {
Object val = term.getValue();
//如果值是Map,并且包含sql字段,说明是有用户输入了sql参数.
//程序构造应当使用NativeSql
if (val instanceof Map) {
@SuppressWarnings("all")
JSONObject map = new JSONObject((Map) val);
if (map.containsKey("sql")) {
throw new I18nSupportException("error.illegal_term_value");
}
}
validateUnsafeTerm(term.getTerms());
}
private Term convertTermValue(Term term) {
Object val = term.getValue();
if (val instanceof Map) {
@SuppressWarnings("all")
JSONObject map = new JSONObject((Map) val);
//nativeSql,这里存在缺陷? 用户在特殊情况下可以传入任意sql(不使用 createCondition 方法创建的条件)
//但是此sql使用reactorQL执行,不会直接执行到数据库,所以不会有sql注入风险.
if (map.containsKey("sql")) {
String sql = map.getString("sql");
Object[] params = map.containsKey("parameters") ? map
.getJSONArray("parameters")
.toArray() : new Object[0];
term.setValue(NativeSql.of(sql, params));
}
}
if (CollectionUtils.isNotEmpty(term.getTerms())) {
for (Term termTerm : term.getTerms()) {
convertTermValue(termTerm);
}
}
return term;
}
@Override
public Function<RuleData, Mono<Boolean>> prepare(Condition condition) {
List<Term> terms = condition
@ -66,7 +120,7 @@ public class TermsConditionEvaluator implements ConditionEvaluatorStrategy {
.map(val -> CastUtils
.castArray(val)
.stream()
.map(obj -> FastBeanCopier.copy(obj, new Term()))
.map(obj -> convertTermValue(FastBeanCopier.copy(obj, new Term())))
.collect(Collectors.toList()))
.orElseGet(() -> condition
.getConfig("where")
@ -77,6 +131,6 @@ public class TermsConditionEvaluator implements ConditionEvaluatorStrategy {
return ReactorUtils.createFilter(terms,
RuleDataHelper::toContextMap,
(arg, map) -> VariableSource.of(arg).resolveStatic(map));
VariableSource::resolveStatic);
}
}

View File

@ -0,0 +1,265 @@
package org.jetlinks.community.rule.engine.commons.things;
import com.google.common.collect.Maps;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.collections4.CollectionUtils;
import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.i18n.LocaleUtils;
import org.jetlinks.core.Values;
import org.jetlinks.core.metadata.EventMetadata;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.types.DateTimeType;
import org.jetlinks.core.metadata.types.ObjectType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.core.things.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
*
* @author zhangji 2025/1/22
* @since 2.3
*/
@Getter
@Setter
public class ThingInfoSpec {
private Set<String> configs;
private Set<String> properties;
private Set<String> tags;
private Set<String> events;
public Mono<Map<String, Object>> read(Thing thing, ThingsDataManager dataManager, long time) {
Map<String, Object> data = Maps.newConcurrentMap();
Mono<?> task = Mono.empty();
if (CollectionUtils.isNotEmpty(properties)) {
task = this
.readProperties(dataManager, thing, time)
.doOnNext(v -> data.put("properties", v));
}
if (CollectionUtils.isNotEmpty(configs)) {
task = task
.then(readConfigs(thing))
.doOnNext(v -> data.put("configs", v));
}
if (CollectionUtils.isNotEmpty(tags)) {
task = task
.then(readTags(dataManager, thing, time))
.doOnNext(v -> data.put("tags", v));
}
if (CollectionUtils.isNotEmpty(events)) {
task = task
.then(readEvent(dataManager, thing, time))
.doOnNext(v -> data.put("events", v));
}
return task.thenReturn(data);
}
private Mono<Map<String, Object>> readConfigs(Thing thing) {
return thing
.getSelfConfigs(configs)
.map(Values::getAllValues);
}
private Mono<Map<String, Object>> readEvent(ThingsDataManager dataManager, Thing thing, long baseTime) {
if (CollectionUtils.isEmpty(events)) {
return Mono.empty();
}
int size = tags.size();
if (size == 1) {
return dataManager
.getLastEvent(thing.getType().getId(), thing.getId(), events.iterator().next(), baseTime)
.map(p -> converterEvent(p, Maps.newHashMapWithExpectedSize(1)));
}
Map<String, Object> tags = Maps.newConcurrentMap();
return Flux
.fromIterable(this.tags)
.flatMap(property -> dataManager
.getLastEvent(thing.getType().getId(), thing.getId(), property, baseTime)
.map(p -> converterEvent(p, tags)))
.then(Mono.just(tags));
}
private Mono<Map<String, Object>> readTags(ThingsDataManager dataManager, Thing thing, long baseTime) {
if (CollectionUtils.isEmpty(tags)) {
return Mono.empty();
}
int size = tags.size();
if (size == 1) {
return dataManager
.getLastTag(thing.getType().getId(), thing.getId(), tags.iterator().next(), baseTime)
.map(p -> converterTag(p, Maps.newHashMapWithExpectedSize(1)));
}
Map<String, Object> tags = Maps.newConcurrentMap();
return Flux
.fromIterable(this.tags)
.flatMap(property -> dataManager
.getLastTag(thing.getType().getId(), thing.getId(), property, baseTime)
.map(p -> converterTag(p, tags)))
.then(Mono.just(tags));
}
private Mono<Map<String, Object>> readProperties(ThingsDataManager dataManager, Thing thing, long baseTime) {
if (CollectionUtils.isEmpty(properties)) {
return Mono.empty();
}
int size = properties.size();
if (size == 1) {
return dataManager
.getLastProperty(thing.getType().getId(), thing.getId(), properties.iterator().next(), baseTime)
.map(p -> convertProperty(p, Maps.newHashMapWithExpectedSize(1)));
}
Map<String, Object> properties = Maps.newConcurrentMap();
return Flux
.fromIterable(this.properties)
.flatMap(property -> dataManager
.getLastProperty(thing.getType().getId(), thing.getId(), property, baseTime)
.map(p -> convertProperty(p, properties)))
.then(Mono.just(properties));
}
private Map<String, Object> converterEvent(ThingEvent event, Map<String, Object> container) {
Map<String, Object> val = Maps.newHashMapWithExpectedSize(3);
val.put("data", event.getData());
val.put("timestamp", event.getTimestamp());
container.put(event.getEvent(), val);
return container;
}
private Map<String, Object> converterTag(ThingTag tag, Map<String, Object> container) {
Map<String, Object> val = Maps.newHashMapWithExpectedSize(3);
val.put("value", tag.getValue());
val.put("timestamp", tag.getTimestamp());
container.put(tag.getTag(), val);
return container;
}
private Map<String, Object> convertProperty(ThingProperty property, Map<String, Object> container) {
Map<String, Object> val = Maps.newHashMapWithExpectedSize(3);
val.put("value", property.getValue());
val.put("timestamp", property.getTimestamp());
val.put("state", property.getState());
container.put(property.getProperty(), val);
return container;
}
protected ObjectType createConfigType() {
return new ObjectType();
}
protected ObjectType createPropertiesType(ThingMetadata metadata) {
ObjectType type = new ObjectType();
for (PropertyMetadata tag : metadata.getProperties()) {
type.addProperty(
tag.getId(),
tag.getName(),
new ObjectType()
.addProperty("value",
LocaleUtils.resolveMessage("message.thing.info.spec.property.value", "属性值"),
tag.getValueType())
.addProperty("state",
LocaleUtils.resolveMessage("message.thing.info.spec.property.value", "属性状态"),
StringType.GLOBAL)
.addProperty("timestamp",
LocaleUtils.resolveMessage("message.thing.info.spec.property.timestamp", "时间戳"),
DateTimeType.GLOBAL)
);
}
return type;
}
protected ObjectType createTagsType(ThingMetadata metadata) {
ObjectType type = new ObjectType();
for (PropertyMetadata tag : metadata.getTags()) {
type.addProperty(
tag.getId(),
tag.getName(),
new ObjectType()
.addProperty("value",
LocaleUtils.resolveMessage("message.thing.info.spec.tag.value", "标签值"),
tag.getValueType())
.addProperty("timestamp",
LocaleUtils.resolveMessage("message.thing.info.spec.tag.timestamp", "时间戳"),
DateTimeType.GLOBAL)
);
}
return type;
}
protected ObjectType createEventsType(ThingMetadata metadata) {
ObjectType type = new ObjectType();
for (EventMetadata event : metadata.getEvents()) {
type.addProperty(
event.getId(),
event.getName(),
new ObjectType()
.addProperty("data",
LocaleUtils.resolveMessage("message.thing.info.spec.event.data", "时间数据"),
event.getType())
.addProperty("timestamp",
LocaleUtils.resolveMessage("message.thing.info.spec.event.timestamp", "时间戳"),
DateTimeType.GLOBAL)
);
}
return type;
}
public ObjectType createOutputType(ThingMetadata metadata) {
ObjectType type = new ObjectType();
//基本信息
{
ObjectType _type = createConfigType();
if (_type != null && CollectionUtils.isNotEmpty(_type.getProperties())) {
type.addProperty("configs",
LocaleUtils.resolveMessage("message.thing.info.spec.configs", "基本信息"),
_type);
}
}
//属性
{
ObjectType _type = createPropertiesType(metadata);
if (_type != null && CollectionUtils.isNotEmpty(_type.getProperties())) {
type.addProperty("properties",
LocaleUtils.resolveMessage("message.thing.info.spec.properties", "物模型属性"),
_type);
}
}
//标签
{
ObjectType _type = createTagsType(metadata);
if (_type != null && CollectionUtils.isNotEmpty(_type.getProperties())) {
type.addProperty("tags",
LocaleUtils.resolveMessage("message.thing.info.spec.tags", "标签信息"),
_type);
}
}
//事件
{
ObjectType _type = createEventsType(metadata);
if (_type != null && CollectionUtils.isNotEmpty(_type.getProperties())) {
type.addProperty("events",
LocaleUtils.resolveMessage("message.thing.info.spec.events", "事件信息"),
_type);
}
}
return type;
}
public Map<String, Object> toMap() {
return FastBeanCopier.copy(this, new HashMap<>());
}
}

View File

@ -2,7 +2,10 @@ package org.jetlinks.community.rule.engine.configuration;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.community.rule.engine.commons.TermsConditionEvaluator;
import org.jetlinks.community.rule.engine.executor.DeviceSelectorBuilder;
import org.jetlinks.community.rule.engine.executor.device.DeviceDataTaskExecutorProvider;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.things.ThingsDataManager;
import org.jetlinks.rule.engine.api.RuleEngine;
import org.jetlinks.rule.engine.api.scheduler.Scheduler;
import org.jetlinks.rule.engine.api.task.ConditionEvaluator;
@ -104,4 +107,10 @@ public class RuleEngineConfiguration {
return new DefaultRuleEngine(scheduler);
}
@Bean
public DeviceDataTaskExecutorProvider deviceDataTaskExecutorProvider(ThingsDataManager dataManager,
DeviceSelectorBuilder selectorBuilder) {
return new DeviceDataTaskExecutorProvider(dataManager, selectorBuilder);
}
}

View File

@ -0,0 +1,144 @@
package org.jetlinks.community.rule.engine.executor.device;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.i18n.LocaleUtils;
import org.jetlinks.community.PropertyConstants;
import org.jetlinks.community.relation.utils.VariableSource;
import org.jetlinks.community.rule.engine.commons.things.ThingInfoSpec;
import org.jetlinks.community.rule.engine.executor.DeviceSelector;
import org.jetlinks.community.rule.engine.executor.DeviceSelectorBuilder;
import org.jetlinks.core.device.DeviceState;
import org.jetlinks.core.metadata.types.DateTimeType;
import org.jetlinks.core.metadata.types.EnumType;
import org.jetlinks.core.metadata.types.ObjectType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.core.things.ThingsDataManager;
import org.jetlinks.reactor.ql.utils.CastUtils;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.RuleDataHelper;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
import org.jetlinks.rule.engine.api.task.TaskExecutor;
import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
import org.jetlinks.rule.engine.defaults.FunctionTaskExecutor;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import java.util.Map;
/**
*
* @author zhangji 2025/1/22
* @since 2.3
*/
@AllArgsConstructor
public class DeviceDataTaskExecutorProvider implements TaskExecutorProvider {
public static final String ID = "device-info";
private final ThingsDataManager dataManager;
private final DeviceSelectorBuilder selectorBuilder;
@Override
public String getExecutor() {
return ID;
}
@Override
public Mono<TaskExecutor> createTask(ExecutionContext context) {
return Mono.just(new ThingInfoTaskExecutor(context, dataManager, selectorBuilder));
}
static class ThingInfoTaskExecutor extends FunctionTaskExecutor {
private Config config;
private DeviceSelector selector;
private final ThingsDataManager dataManager;
private final DeviceSelectorBuilder selectorBuilder;
public ThingInfoTaskExecutor(ExecutionContext context,
ThingsDataManager dataManager,
DeviceSelectorBuilder selectorBuilder) {
super("获取设备信息", context);
this.dataManager = dataManager;
this.selectorBuilder = selectorBuilder;
reload();
}
@Override
public void reload() {
this.config = createConfig();
this.selector = selectorBuilder.createSelector(config.getSelector());
}
@Override
protected Publisher<RuleData> apply(RuleData input) {
Map<String, Object> ctx = RuleDataHelper.toContextMap(input);
long ts = config.resolveTime(ctx);
return selector
.select(ctx)
.flatMap(device -> config
.read(device, dataManager, ts)
.map(output -> context.newRuleData(input, output)));
}
private Config createConfig() {
return FastBeanCopier.copy(context.getJob().getConfiguration(), new Config());
}
}
@Getter
@Setter
public static class Config extends ThingInfoSpec {
@Schema(title = "设备选择器")
private DeviceSelectorSpec selector;
@Schema(title = "基准时间来源")
private VariableSource baseTime;
@Override
protected ObjectType createConfigType() {
return new ObjectType()
.addProperty("state",
LocaleUtils.resolveMessage("message.thing.info.spec.config.state", "在线状态"),
new EnumType()
.addElement(EnumType.Element.of(
String.valueOf(DeviceState.online),
LocaleUtils.resolveMessage("message.thing.info.spec.config.state-online", "在线")))
.addElement(EnumType.Element.of(
String.valueOf(DeviceState.offline),
LocaleUtils.resolveMessage("message.thing.info.spec.config.state-offline", "离线")))
)
.addProperty("onlineTime",
LocaleUtils.resolveMessage("message.thing.info.spec.config.onlineTime", "上一次在线时间"),
DateTimeType.GLOBAL
)
.addProperty("offlineTime",
LocaleUtils.resolveMessage("message.thing.info.spec.config.offlineTime", "上一次离线时间"),
DateTimeType.GLOBAL
)
.addProperty(PropertyConstants.deviceName.getKey(),
LocaleUtils.resolveMessage("message.thing.info.spec.config.deviceName", "设备名称"),
StringType.GLOBAL
)
.addProperty(PropertyConstants.productName.getKey(),
LocaleUtils.resolveMessage("message.thing.info.spec.config.productName", "产品名称"),
StringType.GLOBAL
);
}
long resolveTime(Map<String, Object> ctx) {
if (baseTime != null) {
Number time = CastUtils.castNumber(baseTime.resolveStatic(ctx), (ignore) -> null);
if (time != null) {
return time.longValue();
}
}
return System.currentTimeMillis();
}
}
}

View File

@ -17,10 +17,11 @@ import org.jetlinks.core.metadata.ConfigMetadata;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.SimplePropertyMetadata;
import org.jetlinks.core.metadata.types.IntType;
import org.jetlinks.core.metadata.types.LongType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.core.utils.FluxUtils;
import org.springframework.stereotype.Component;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -28,7 +29,6 @@ import java.time.Duration;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
@Component
@Slf4j
@ -67,7 +67,6 @@ public class AlarmProvider implements SubscriberProvider {
}
protected Mono<Subscriber> doCreateSubscriber(String id,
Authentication authentication,
String topic) {
@ -76,21 +75,26 @@ public class AlarmProvider implements SubscriberProvider {
.as(FluxUtils.distinct(Notify::getDataId, Duration.ofSeconds(10))));
}
protected String getAlarmId( Map<String, Object> config) {
protected String getAlarmId(Map<String, Object> config) {
ValueObject configs = ValueObject.of(config);
return configs.getString("alarmConfigId").orElse("*");
}
private Flux<Notify> createSubscribe(Locale locale,
String id,
String[] topics) {
Subscription.Feature[] features = new Subscription.Feature[]{Subscription.Feature.local};
return Flux
.defer(() -> this
String[] topic) {
return this
.eventBus
.subscribe(Subscription.of("alarm:" + id, topics, features))
.map(msg -> {
JSONObject json = msg.bodyToJson();
.subscribe(
Subscription
.builder()
.justLocal()
.subscriberId("alarm:" + id)
.topics(topic)
.build())
.mapNotNull(payload -> {
try {
JSONObject json = payload.bodyToJson();
return Notify.of(
getNotifyMessage(locale, json),
//告警记录ID
@ -99,9 +103,12 @@ public class AlarmProvider implements SubscriberProvider {
"alarm",
json
);
}));
} catch (Throwable error) {
log.warn("handle alarm notify error", error);
}
return null;
});
}
private static String getNotifyMessage(Locale locale, JSONObject json) {
@ -109,7 +116,7 @@ public class AlarmProvider implements SubscriberProvider {
TargetType targetType = TargetType.of(json.getString("targetType"));
String targetName = json.getString("targetName");
String alarmName = json.getString("alarmConfigName");
if (targetType == TargetType.other) {
if (targetType == TargetType.scene) {
message = String.format("[%s]发生告警:[%s]!", targetName, alarmName);
} else {
message = String.format("%s[%s]发生告警:[%s]!", targetType.getText(), targetName, alarmName);
@ -123,7 +130,11 @@ public class AlarmProvider implements SubscriberProvider {
return Flux.just(
SimplePropertyMetadata.of("targetType", "告警类型", StringType.GLOBAL),
SimplePropertyMetadata.of("alarmConfigName", "告警名称", StringType.GLOBAL),
SimplePropertyMetadata.of("targetName", "目标名称", StringType.GLOBAL)
SimplePropertyMetadata.of("targetName", "告警目标名称", StringType.GLOBAL),
SimplePropertyMetadata.of("level", "告警级别", IntType.GLOBAL),
SimplePropertyMetadata.of("alarmTime", "告警时间", LongType.GLOBAL),
SimplePropertyMetadata.of("sourceType", "告警源类型", StringType.GLOBAL),
SimplePropertyMetadata.of("sourceName", "告警源名称", StringType.GLOBAL)
);
}
@ -132,7 +143,7 @@ public class AlarmProvider implements SubscriberProvider {
enum TargetType {
device("设备"),
product("产品"),
other("其它");
scene("场景");
private final String text;
@ -142,7 +153,7 @@ public class AlarmProvider implements SubscriberProvider {
return value;
}
}
return TargetType.other;
return TargetType.scene;
}
}
}

View File

@ -12,9 +12,9 @@ import java.util.Map;
@Component
@Slf4j
public class AlarmOtherProvider extends AlarmProvider {
public class AlarmSceneProvider extends AlarmProvider {
public AlarmOtherProvider(EventBus eventBus) {
public AlarmSceneProvider(EventBus eventBus) {
super(eventBus);
}
@ -25,12 +25,12 @@ public class AlarmOtherProvider extends AlarmProvider {
@Override
public String getName() {
return "其他告警";
return "场景告警";
}
@Override
public Mono<Subscriber> createSubscriber(String id, Authentication authentication, Map<String, Object> config) {
String topic = Topics.alarm(TargetType.other.name(), "*", getAlarmId(config));
String topic = Topics.alarm(TargetType.scene.name(), "*", getAlarmId(config));
return doCreateSubscriber(id, authentication, topic);
}

View File

@ -16,6 +16,10 @@ public interface AlarmTarget {
Flux<AlarmTargetInfo> convert(AlarmData data);
default boolean isSupported(String trigger) {
return true;
};
static AlarmTarget of(String type) {
return AlarmTargetSupplier
.get()

View File

@ -23,7 +23,6 @@ import java.io.Serializable;
import java.util.function.Function;
@AllArgsConstructor
@Component
@Slf4j
public class AlarmTaskExecutorProvider implements TaskExecutorProvider {
public static final String executor = "alarm";

View File

@ -13,12 +13,6 @@ public class CustomAlarmTargetSupplier implements AlarmTargetSupplier {
public static CustomAlarmTargetSupplier defaultSupplier = new CustomAlarmTargetSupplier();
static {
register(new ProductAlarmTarget());
register(new DeviceAlarmTarget());
register(new OtherAlarmTarget());
}
public static void register(AlarmTarget target) {
targets.put(target.getType(), target);
}

View File

@ -1,5 +1,7 @@
package org.jetlinks.community.rule.engine.alarm;
import org.jetlinks.community.rule.engine.scene.internal.triggers.DeviceTriggerProvider;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import java.util.Map;
@ -7,7 +9,7 @@ import java.util.Map;
/**
* @author bestfeng
*/
@Component
public class DeviceAlarmTarget extends AbstractAlarmTarget {
@Override
@ -33,4 +35,9 @@ public class DeviceAlarmTarget extends AbstractAlarmTarget {
return Flux.just(AlarmTargetInfo.of(deviceId, deviceName, getType()));
}
@Override
public boolean isSupported(String trigger) {
return DeviceTriggerProvider.PROVIDER.equals(trigger);
};
}

View File

@ -1,5 +1,7 @@
package org.jetlinks.community.rule.engine.alarm;
import org.jetlinks.community.rule.engine.scene.internal.triggers.DeviceTriggerProvider;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import java.util.Map;
@ -7,7 +9,7 @@ import java.util.Map;
/**
* @author bestfeng
*/
@Component
public class ProductAlarmTarget extends AbstractAlarmTarget {
@Override
@ -29,5 +31,9 @@ public class ProductAlarmTarget extends AbstractAlarmTarget {
return Flux.just(AlarmTargetInfo.of(productId, productName, getType()));
}
@Override
public boolean isSupported(String trigger) {
return DeviceTriggerProvider.PROVIDER.equals(trigger);
};
}

View File

@ -1,21 +1,24 @@
package org.jetlinks.community.rule.engine.alarm;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
/**
* @author bestfeng
*/
@Component
public class SceneAlarmTarget implements AlarmTarget {
public class OtherAlarmTarget implements AlarmTarget {
public static final String TYPE = "scene";
@Override
public String getType() {
return "other";
return TYPE;
}
@Override
public String getName() {
return "其它";
return "场景";
}
@Override
@ -23,7 +26,8 @@ public class OtherAlarmTarget implements AlarmTarget {
return Flux.just(AlarmTargetInfo
.of(data.getRuleId(),
data.getRuleName(),
getType()));
getType())
.withSource(TYPE, data.getRuleId(), data.getRuleName()));
}
}

View File

@ -0,0 +1,26 @@
package org.jetlinks.community.rule.engine.configuration;
import org.jetlinks.community.rule.engine.alarm.AlarmRuleHandler;
import org.jetlinks.community.rule.engine.alarm.AlarmTarget;
import org.jetlinks.community.rule.engine.alarm.AlarmTaskExecutorProvider;
import org.jetlinks.community.rule.engine.alarm.CustomAlarmTargetSupplier;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.context.annotation.Bean;
/**
*
* @author zhangji 2025/1/22
* @since 2.3
*/
@AutoConfiguration
public class AlarmTargetConfiguration {
@Bean
public AlarmTaskExecutorProvider alarmTaskExecutorProvider(AlarmRuleHandler alarmHandler,
ObjectProvider<AlarmTarget> targetProviders) {
targetProviders.forEach(CustomAlarmTargetSupplier::register);
return new AlarmTaskExecutorProvider(alarmHandler);
}
}

View File

@ -5,6 +5,7 @@ import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.hswebframework.web.i18n.LocaleUtils;
import org.jetlinks.community.rule.engine.utils.TermColumnUtils;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionParameter;
import org.jetlinks.core.message.property.ReadPropertyMessage;
@ -129,18 +130,21 @@ public class DeviceOperation {
|| operator == Operator.reportProperty
|| operator == Operator.writeProperty) {
terms.addAll(
this.createTerm(
TermColumnUtils.createTerm(
metadata.getProperties(),
(property, column) -> column.setChildren(createTermColumn("properties", property, true, PropertyValueType.values())),
(property, column) -> column.setChildren(TermColumnUtils.createTermColumn("properties",
property,
true,
PropertyValueType.values())),
LocaleUtils.resolveMessage("message.device_metadata_property", "属性"))
);
} else {
//其他操作只能获取属性的上一次的值
terms.addAll(
this.createTerm(
TermColumnUtils.createTerm(
metadata.getProperties(),
(property, column) -> column.setChildren(
createTermColumn(
TermColumnUtils.createTermColumn(
"properties",
property,
true,
@ -151,7 +155,7 @@ public class DeviceOperation {
//事件上报
if (operator == Operator.reportEvent) {
terms.addAll(
this.createTerm(
TermColumnUtils.createTerm(
metadata.getEvent(eventId)
.<List<PropertyMetadata>>map(event -> Collections
.singletonList(
@ -160,13 +164,13 @@ public class DeviceOperation {
event.getType())
))
.orElse(Collections.emptyList()),
(property, column) -> column.setChildren(createTermColumn("event", property, false)),
(property, column) -> column.setChildren(TermColumnUtils.createTermColumn("event", property, false)),
LocaleUtils.resolveMessage("message.device_metadata_event", "事件")));
}
//调用功能
if (operator == Operator.invokeFunction) {
terms.addAll(
this.createTerm(
TermColumnUtils.createTerm(
metadata.getFunction(functionId)
//过滤掉异步功能和无返回值功能的参数输出
.filter(fun -> !fun.isAsync() && !(fun.getOutput() instanceof UnknownType))
@ -176,76 +180,13 @@ public class DeviceOperation {
meta.getOutput()))
)
.orElse(Collections.emptyList()),
(property, column) -> column.setChildren(createTermColumn("function", property, false)),
(property, column) -> column.setChildren(TermColumnUtils.createTermColumn("function", property, false)),
LocaleUtils.resolveMessage("message.device_metadata_function", "功能调用")));
}
return TermColumn.refactorTermsInfo("properties", terms);
}
private String resolveI18n(String key, String name) {
return LocaleUtils.resolveMessage(key, name);
}
private List<TermColumn> createTermColumn(String prefix, PropertyMetadata property, boolean last, PropertyValueType... valueTypes) {
//对象类型嵌套
if (property.getValueType() instanceof ObjectType) {
ObjectType objType = ((ObjectType) property.getValueType());
return this.createTerm(
objType.getProperties(),
(prop, column) -> {
String _prefix = prefix == null ? property.getId() : prefix + "." + property.getId();
if (!last && !(prop.getValueType() instanceof ObjectType)) {
TermColumn term = createTermColumn(_prefix, prop, false, valueTypes).get(0);
column.setColumn(term.getColumn());
column.setName(term.getName());
column.setOptions(term.getOptions());
column.withOthers(term.getOthers());
} else {
column.setChildren(createTermColumn(_prefix, prop, last, valueTypes));
}
});
} else {
if (!last) {
return Collections.singletonList(
TermColumn.of(SceneUtils.appendColumn(prefix, property.getId()),
property.getName(), property.getValueType())
.withMetrics(property)
.withMetadataTrue()
);
}
return Arrays
.stream(valueTypes)
.map(type -> TermColumn
.of(SceneUtils
.appendColumn(prefix,
property.getId(),
type.name()),
type.getKey(),
null,
type.dataType == null ? property.getValueType() : type.dataType)
.withMetrics(property)
.withMetadataTrue()
)
.collect(Collectors.toList());
}
}
private List<TermColumn> createTerm(List<PropertyMetadata> metadataList,
BiConsumer<PropertyMetadata, TermColumn> consumer,
String... description) {
List<TermColumn> columns = new ArrayList<>(metadataList.size());
for (PropertyMetadata metadata : metadataList) {
TermColumn column = TermColumn.of(metadata);
column.setDescription(String.join("", description));
consumer.accept(metadata, column);
columns.add(column.withMetadataTrue());
}
return columns;
}
public void validate() {
Assert.notNull(operator, "error.scene_rule_trigger_device_operation_cannot_be_null");
switch (operator) {

View File

@ -1,29 +1,74 @@
package org.jetlinks.community.rule.engine.scene;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.core.utils.SerializeUtils;
import org.jetlinks.community.rule.engine.alarm.AlarmConstants;
import org.jetlinks.community.terms.TermSpec;
import org.jetlinks.rule.engine.api.model.RuleNodeModel;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
public interface SceneActionProvider<C> {
/**
* 提供商标识
*
* @return 提供商标识
*/
String getProvider();
/**
* 创建一个新的配置
*
* @return 配置
*/
C newConfig();
/**
* 尝试从动作的变量中提取出需要动态获取的列信息
*
* @param config 配置
* @return 名称
*/
List<String> parseColumns(C config);
/**
* 根据配置创建变量,用于获取此动作将要输出的变量
*
* @param config 配置
* @return 变量
*/
Flux<Variable> createVariable(C config);
/**
* 应用配置到规则节点
*
* @param config 配置
* @param model 规则节点
*/
void applyRuleNode(C config, RuleNodeModel model);
/**
* 应用过滤条件描述到规则节点
*
* @param node 规则节点
* @param specs 过滤条件
*/
default void applyFilterSpec(RuleNodeModel node, List<TermSpec> specs) {
node.addConfiguration(
AlarmConstants.ConfigKey.alarmFilterTermSpecs,
SerializeUtils.convertToSafelySerializable(specs)
);
}
/**
* 获取详细类型
* 用于区分同一个类型支持的多个执行动作
* @return 详细类型
*/
default List<String> getMode() {
return null;
}
}

View File

@ -161,7 +161,7 @@ public class SceneRule implements Serializable {
));
List<Variable> defaultVariables = createDefaultVariable();
List<Variable> termVar = SceneUtils.parseVariable(terms, columns);
List<Variable> termVar = trigger.provider().parseVariable(terms, columns);
List<Variable> variables = new ArrayList<>(defaultVariables.size() + termVar.size());
variables.addAll(defaultVariables);
@ -561,7 +561,7 @@ public class SceneRule implements Serializable {
});
link.setCondition(TermsConditionEvaluator.createCondition(
trigger.refactorTerm("this", termList)));
trigger.refactorTerm("this", preAction.getTerms())));
}
} else if (Objects.equals(trigger.getType(), ManualTriggerProvider.PROVIDER)) {

View File

@ -9,6 +9,7 @@ import org.jetlinks.community.rule.engine.commons.ShakeLimitResult;
import org.jetlinks.community.rule.engine.commons.impl.SimpleShakeLimitProvider;
import org.jetlinks.community.rule.engine.scene.term.TermColumn;
import org.jetlinks.community.terms.TermSpec;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.rule.engine.api.model.RuleModel;
import org.jetlinks.rule.engine.api.model.RuleNodeModel;
import reactor.core.publisher.Flux;
@ -98,6 +99,11 @@ public interface SceneTriggerProvider<E extends SceneTriggerProvider.TriggerConf
*/
List<Variable> createDefaultVariable(E config);
default List<Variable> parseVariable(List<Term> terms,
List<TermColumn> columns) {
return SceneUtils.parseVariable(terms, columns);
}
/**
* 应用规则节点配置
*

View File

@ -2,8 +2,10 @@ package org.jetlinks.community.rule.engine.scene;
import org.apache.commons.collections4.CollectionUtils;
import org.hswebframework.ezorm.core.param.Term;
import org.hswebframework.ezorm.rdb.executor.SqlRequest;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.NativeSql;
import org.jetlinks.community.PropertyMetric;
import org.jetlinks.community.reactorql.function.FunctionSupport;
import org.jetlinks.community.reactorql.term.TermType;
import org.jetlinks.community.rule.engine.executor.device.DeviceSelectorProviders;
import org.jetlinks.community.rule.engine.scene.term.TermColumn;
@ -14,6 +16,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.*;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -154,18 +157,15 @@ public class SceneUtils {
return variables;
}
public static Flux<String> getSupportTriggers() {
return Flux
.fromIterable(SceneProviders.triggerProviders())
.map(SceneTriggerProvider::getProvider);
public static Flux<SceneTriggerProvider<SceneTriggerProvider.TriggerConfig>> getSupportTriggers() {
return Flux.fromIterable(SceneProviders.triggerProviders());
}
public static Flux<String> getSupportActions() {
return Flux
.fromIterable(SceneProviders.actionProviders())
.map(SceneActionProvider::getProvider);
public static Flux<SceneActionProvider<?>> getSupportActions() {
return Flux.fromIterable(SceneProviders.actionProviders());
}
public static Flux<TermColumn> parseTermColumns(SceneRule ruleMono) {
Trigger trigger = ruleMono.getTrigger();
if (trigger != null) {
@ -196,7 +196,9 @@ public class SceneUtils {
.map(SelectorInfo::of);
}
public static Term refactorTerm(String tableName, Term term) {
public static Term refactorTerm(String tableName,
Term term,
BiFunction<String, String, String> columnRefactor) {
if (term.getColumn() == null) {
return term;
}
@ -219,6 +221,15 @@ public class SceneUtils {
term.getOptions().add(TermType.OPTIONS_NATIVE_SQL);
return tableName + "['" + arr[1] + "_metric_" + value.getMetric() + "']";
}
//函数, : array_len() , device_prop()
else if (value.getSource() == TermValue.Source.function) {
SqlRequest request = FunctionSupport
.supports
.getNow(value.getFunction())
.createSql(columnRefactor.apply(tableName, value.getColumn()), value.getArgs())
.toRequest();
return NativeSql.of(request.getSql(), request.getParameters());
}
//手动设置值
else {
return value.getValue();
@ -234,28 +245,42 @@ public class SceneUtils {
.collect(Collectors.toList());
}
String column;
// properties.xxx.last的场景
if (arr.length > 3 && arr[0].equals("properties")) {
column = tableName + "['" + createColumnAlias("properties", term.getColumn(), false)
+ "." + String.join(".", Arrays.copyOfRange(arr, 2, arr.length - 1)) + "']";
} else if (!isDirectTerm(arr[0])) {
column = tableName + "['" + createColumnAlias("properties", term.getColumn(), false) + "']";
} else {
column = term.getColumn();
}
if (term.getOptions().contains(TermType.OPTIONS_NATIVE_SQL) && !(val instanceof NativeSql)) {
val = NativeSql.of(String.valueOf(val));
}
term.setColumn(column);
term.setColumn(columnRefactor.apply(tableName, term.getColumn()));
term.setValue(val);
return term;
}
public static Term refactorTerm(String tableName, Term term) {
return refactorTerm(tableName, term, SceneUtils::refactorColumn);
}
private static String refactorColumn(String tableName, String column) {
String[] arr = column.split("[.]");
// fixme 重构 条件列解析逻辑
// properties.xxx.last的场景
if (arr.length > 3 && arr[0].equals("properties")) {
return tableName + "['" + createColumnAlias("properties", column, false)
+ "." + String.join(".", Arrays.copyOfRange(arr, 2, arr.length - 1)) + "']";
} else if (!isDirectTerm(arr[0])) {
return tableName + "['" + createColumnAlias(arr[0], column, false) + "']";
} else {
// scene.obj1.xx.val1.current => t['scene.obj1_current.val1']
if (arr.length > 3 && isSceneTerm(column)) {
return tableName + "['" + arr[0] + "." + createColumnAlias(arr[0], column, false) +
"." + String.join(".", Arrays.copyOfRange(arr, 2, arr.length - 1))
+ "']";
} else {
return tableName + "['" + column + "']";
}
}
}
private static boolean isDirectTerm(String column) {
//直接term,构建Condition输出条件时使用

View File

@ -9,6 +9,8 @@ import org.hswebframework.ezorm.rdb.executor.EmptySqlRequest;
import org.hswebframework.ezorm.rdb.executor.SqlRequest;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.EmptySqlFragments;
import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments;
import org.jetlinks.community.reactorql.term.TermTypeSupport;
import org.jetlinks.community.reactorql.term.TermTypes;
import org.jetlinks.community.rule.engine.commons.ShakeLimit;
import org.jetlinks.community.rule.engine.scene.internal.triggers.*;
import org.jetlinks.community.rule.engine.scene.term.TermColumn;
@ -72,7 +74,12 @@ public class Trigger implements Serializable {
List<Term> target = new ArrayList<>(terms.size());
for (Term term : terms) {
Term copy = term.clone();
TermTypeSupport support = TermTypes.lookupSupport(term.getTermType()).orElse(null);
if (support != null) {
target.add(support.refactorTerm(tableName, copy, provider()::refactorTerm));
} else {
target.add(provider().refactorTerm(tableName, copy));
}
if (org.apache.commons.collections4.CollectionUtils.isNotEmpty(copy.getTerms())) {
copy.setTerms(refactorTerm(tableName, copy.getTerms()));
}

View File

@ -2,12 +2,14 @@ package org.jetlinks.community.rule.engine.scene.internal.actions;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.community.rule.engine.alarm.AlarmTaskExecutorProvider;
import org.jetlinks.community.rule.engine.enums.AlarmMode;
import org.jetlinks.community.rule.engine.scene.SceneActionProvider;
import org.jetlinks.community.rule.engine.scene.Variable;
import org.jetlinks.rule.engine.api.model.RuleNodeModel;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -40,4 +42,9 @@ public class AlarmActionProvider implements SceneActionProvider<AlarmAction> {
model.setExecutor(AlarmTaskExecutorProvider.executor);
model.setConfiguration(FastBeanCopier.copy(config, new HashMap<>()));
}
@Override
public List<String> getMode() {
return Arrays.asList(AlarmMode.trigger.name(), AlarmMode.relieve.name());
}
}

View File

@ -0,0 +1,99 @@
package org.jetlinks.community.rule.engine.scene.internal.actions;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.community.rule.engine.executor.device.DeviceDataTaskExecutorProvider;
import org.jetlinks.community.rule.engine.executor.device.DeviceSelectorProviders;
import org.jetlinks.community.rule.engine.executor.device.DeviceSelectorSpec;
import org.jetlinks.community.rule.engine.scene.SceneAction;
import org.jetlinks.community.rule.engine.scene.SceneActionProvider;
import org.jetlinks.community.rule.engine.scene.Variable;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.things.ThingsRegistry;
import org.jetlinks.rule.engine.api.model.RuleNodeModel;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
*
* @author zhangji 2025/1/22
* @since 2.3
*/
@Component
@AllArgsConstructor
public class DeviceDataActionProvider implements SceneActionProvider<DeviceDataActionProvider.DeviceDataAction> {
public static final String PROVIDER = "device-data";
private final ThingsRegistry registry;
@Override
public String getProvider() {
return PROVIDER;
}
@Override
public DeviceDataAction newConfig() {
return new DeviceDataAction();
}
@Override
public List<String> parseColumns(DeviceDataAction config) {
return Collections.emptyList();
}
@Override
public Flux<Variable> createVariable(DeviceDataAction config) {
return config
.getSelector()
.getDeviceMetadata(registry, config.getProductId())
.map(config::createOutputType)
.flatMapIterable(type -> {
List<PropertyMetadata> props = type.getProperties();
List<Variable> variables = new ArrayList<>(props.size());
for (PropertyMetadata prop : props) {
variables.add(
SceneAction
.toVariable(prop.getId(),
prop.getName(),
prop.getValueType(),
"message.scene.action.device-data." + prop.getId(),
"设备[%s]信息",
null)
);
}
return variables;
});
}
@Override
public void applyRuleNode(DeviceDataAction config, RuleNodeModel model) {
model.setExecutor(DeviceDataTaskExecutorProvider.ID);
if (DeviceSelectorProviders.isFixed(config.getSelector())) {
config.setSelector(FastBeanCopier.copy(config.getSelector(), new DeviceSelectorSpec()));
} else {
config.setSelector(
DeviceSelectorProviders.composite(
//先选择产品下的设备
DeviceSelectorProviders.product(config.getProductId()),
FastBeanCopier.copy(config.getSelector(), new DeviceSelectorSpec())
));
}
model.setConfiguration(config.toMap());
}
@Getter
@Setter
public static class DeviceDataAction extends DeviceDataTaskExecutorProvider.Config {
private String productId;
}
}

View File

@ -242,7 +242,11 @@ public class DeviceTrigger extends DeviceSelectorSpec implements SceneTriggerPro
.lookupSupport(termType)
.orElseThrow(() -> new UnsupportedOperationException("unsupported termType " + termType));
Term copy = refactorTermValue(DEFAULT_FILTER_TABLE, term.clone());
Term copy = support
.refactorTerm(
DEFAULT_FILTER_TABLE, term.clone(),
DeviceTrigger::refactorTermValue
);
return support.createSql(copy.getColumn(), copy.getValue(), copy);
}

View File

@ -50,6 +50,11 @@ public class DeviceTriggerProvider extends AbstractSceneTriggerProvider<DeviceTr
return config.createFragments(terms);
}
@Override
public Term refactorTerm(String mainTableName, Term term) {
return DeviceTrigger.refactorTermValue(mainTableName, term);
}
@Override
public List<Variable> createDefaultVariable(DeviceTrigger config) {
return config.createDefaultVariable();

View File

@ -8,6 +8,7 @@ import org.apache.commons.collections4.CollectionUtils;
import org.hswebframework.ezorm.core.param.Term;
import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.i18n.LocaleUtils;
import org.jetlinks.community.reactorql.function.FunctionInfo;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.types.BooleanType;
@ -66,6 +67,9 @@ public class TermColumn {
@Schema(description = "支持的条件类型")
private List<TermType> termTypes;
@Schema(description = "支持的函数")
private List<FunctionInfo> functions;
@Schema(description = "支持的指标")
private List<PropertyMetric> metrics;

View File

@ -28,6 +28,15 @@ public class TermValue implements Serializable {
@Schema(description = "[source]为[metric]时不能为空")
private String metric;
@Schema(description = "[source]为[function]时不能为空")
private String function;
@Schema(description = "[source]为[function]时有效")
private String column;
@Schema(description = "[source]为[function]时有效")
private Map<String, Object> args;
public static TermValue manual(Object value) {
TermValue termValue = new TermValue();
termValue.setValue(value);
@ -66,9 +75,26 @@ public class TermValue implements Serializable {
}
public enum Source {
/**
* 和manual一样,
* 兼容{@link org.jetlinks.community.relation.utils.VariableSource.Source#fixed}
*/
fixed,
manual,
metric,
variable,
upper
/**
* 和variable一样,兼容{@link org.jetlinks.community.relation.utils.VariableSource.Source#upper}
*/
upper,
/**
* 函数
*
* @see org.jetlinks.community.reactorql.function.FunctionSupport
*/
function
}
}

View File

@ -0,0 +1,178 @@
package org.jetlinks.community.rule.engine.utils;
import org.hswebframework.web.i18n.LocaleUtils;
import org.jetlinks.community.reactorql.impl.ComplexExistsFunction;
import org.jetlinks.community.rule.engine.scene.DeviceOperation;
import org.jetlinks.community.rule.engine.scene.SceneUtils;
import org.jetlinks.community.rule.engine.scene.term.TermColumn;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.SimplePropertyMetadata;
import org.jetlinks.core.metadata.types.ArrayType;
import org.jetlinks.core.metadata.types.IntType;
import org.jetlinks.core.metadata.types.LongType;
import org.jetlinks.core.metadata.types.ObjectType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import static org.jetlinks.core.metadata.SimplePropertyMetadata.of;
/**
*
* @author zhangji 2025/1/22
* @since 2.3
*/
public class TermColumnUtils {
public static List<TermColumn> parseArrayChildTermColumns(DataType dataType) {
String prefix = ComplexExistsFunction.COL_ELEMENT;
if (!(dataType instanceof ArrayType)) {
return new ArrayList<>();
}
ArrayType arrayType = (ArrayType) dataType;
List<TermColumn> columns = new ArrayList<>();
if (arrayType.getElementType() instanceof ObjectType) {
List<PropertyMetadata> properties = ((ObjectType) arrayType.getElementType()).getProperties();
return TermColumnUtils.createTerm(
prefix,
properties,
(property, column) -> column.setChildren(TermColumnUtils.createTermColumn(prefix, property, true)),
LocaleUtils.resolveMessage("message.device_metadata_property", "属性"));
} else {
SimplePropertyMetadata prop =
of("this",
resolveI18n("message.term_element_of_array",
"数组元素"),
(arrayType.getElementType() instanceof ArrayType) ? IntType.GLOBAL : arrayType.getElementType());
columns.addAll(TermColumnUtils.createTermColumn(prefix, prop, false));
}
return columns;
}
public static List<TermColumn> createTermColumn(String prefix,
PropertyMetadata property,
boolean last,
DeviceOperation.PropertyValueType... valueTypes) {
//对象类型嵌套
if (property.getValueType() instanceof ObjectType) {
ObjectType objType = ((ObjectType) property.getValueType());
if (objType.getProperties().isEmpty()) {
String _prefix = prefix == null ? property.getId() : prefix + "." + property.getId();
return createTermColumn0(_prefix, property, last, valueTypes);
}
return createTerm(
objType.getProperties(),
(prop, column) -> {
String _prefix = prefix == null ? property.getId() : prefix + "." + property.getId();
if (!last && !(prop.getValueType() instanceof ObjectType)) {
TermColumn term = createTermColumn(_prefix, prop, false, valueTypes).get(0);
column.setColumn(term.getColumn());
column.setName(term.getName());
column.setOptions(term.getOptions());
column.withOthers(term.getOthers());
} else {
column.setColumn(SceneUtils.appendColumn(_prefix, prop.getId()));
column.setChildren(createTermColumn(_prefix, prop, last, valueTypes));
}
});
} else if (property.getValueType() instanceof ArrayType) {
PropertyMetadata sizeMetadata = of("size",
resolveI18n("message.term_size_of_array", "长度"),
new LongType());
PropertyMetadata valueMetadata = of("this",
resolveI18n("message.term_origin_of_array", "原始值"),
property.getValueType());
String _prefix = prefix == null ? property.getId() : prefix + "." + property.getId();
TermColumn size = TermColumn
.of(SceneUtils.appendColumn(_prefix, sizeMetadata.getId()),
sizeMetadata.getName(), sizeMetadata.getValueType())
.withMetadataTrue();
size.setChildren(createTermColumn0(_prefix, sizeMetadata, last, valueTypes));
TermColumn valueThis =
TermColumn
.of(SceneUtils.appendColumn(_prefix, valueMetadata.getId()),
valueMetadata.getName(), valueMetadata.getValueType())
.withMetadataTrue();
valueThis.setChildren(createTermColumn0(_prefix, valueMetadata, last, valueTypes));
return Arrays.asList(size, valueThis);
} else {
return createTermColumn0(prefix, property, last, valueTypes);
}
}
public static List<TermColumn> createTerm(List<PropertyMetadata> metadataList,
BiConsumer<PropertyMetadata, TermColumn> consumer,
String... description) {
List<TermColumn> columns = new ArrayList<>(metadataList.size());
for (PropertyMetadata metadata : metadataList) {
TermColumn column = TermColumn.of(metadata);
column.setDescription(String.join("", description));
consumer.accept(metadata, column);
columns.add(column.withMetadataTrue());
}
return columns;
}
public static List<TermColumn> createTerm(String prefix,
List<PropertyMetadata> metadataList,
BiConsumer<PropertyMetadata, TermColumn> consumer,
String... description) {
List<TermColumn> columns = new ArrayList<>(metadataList.size());
for (PropertyMetadata metadata : metadataList) {
TermColumn column = TermColumn.
of(SceneUtils.appendColumn(prefix, metadata.getId()),
metadata.getName(),
metadata.getValueType());
column.setDescription(String.join("", description));
consumer.accept(metadata, column);
columns.add(column.withMetadataTrue());
}
return columns;
}
private static List<TermColumn> createTermColumn0(String prefix,
PropertyMetadata property,
boolean last,
DeviceOperation.PropertyValueType... valueTypes) {
if (!last) {
return Collections.singletonList(
TermColumn.of(SceneUtils.appendColumn(prefix, property.getId()),
property.getName(), property.getValueType())
.withMetrics(property)
.withMetadataTrue()
);
}
return Arrays
.stream(valueTypes)
.map(type -> TermColumn
.of(SceneUtils
.appendColumn(prefix,
property.getId(),
type.name()),
type.getKey(),
null,
type.getDataType() == null ? property.getValueType() : type.getDataType())
.withMetrics(property)
.withMetadataTrue()
)
.collect(Collectors.toList());
}
private static String resolveI18n(String key, String name) {
return LocaleUtils.resolveMessage(key, name);
}
}

View File

@ -17,6 +17,8 @@ import org.jetlinks.community.rule.engine.alarm.AlarmTargetSupplier;
import org.jetlinks.community.rule.engine.entity.AlarmConfigDetail;
import org.jetlinks.community.rule.engine.entity.AlarmConfigEntity;
import org.jetlinks.community.rule.engine.entity.AlarmLevelEntity;
import org.jetlinks.community.rule.engine.scene.SceneUtils;
import org.jetlinks.community.rule.engine.scene.SceneTriggerProvider;
import org.jetlinks.community.rule.engine.service.AlarmConfigService;
import org.jetlinks.community.rule.engine.service.AlarmLevelService;
import org.jetlinks.community.rule.engine.web.response.AlarmTargetTypeInfo;
@ -55,12 +57,21 @@ public class AlarmConfigController implements ReactiveServiceCrudController<Alar
@GetMapping("/target-type/supports")
@Operation(summary = "获取支持的告警目标类型")
public Flux<AlarmTargetTypeInfo> getTargetTypeSupports() {
Flux<String> triggerCache = SceneUtils
.getSupportTriggers()
.map(SceneTriggerProvider::getProvider)
.cache();
return Flux
.fromIterable(AlarmTargetSupplier
.get()
.getAll()
.values())
.map(AlarmTargetTypeInfo::of);
.flatMap(alarmTarget -> triggerCache
.filter(alarmTarget::isSupported)
.collectList()
.map(supportTriggers -> AlarmTargetTypeInfo
.of(alarmTarget)
.with(supportTriggers)));
}

View File

@ -47,4 +47,30 @@ public class AlarmHistoryController {
.flatMap(alarmHistoryService::queryPager);
}
@PostMapping("/alarm-record/{recordId}/_query")
@Operation(summary = "按告警记录查询告警历史")
@QueryAction
public Mono<PagerResult<AlarmHistoryInfo>> queryHistoryPager(
@PathVariable @Parameter(description = "告警记录ID") String recordId,
@RequestBody Mono<QueryParamEntity> query) {
return query
.map(q -> q
.toNestQuery()
.and(AlarmHistoryInfo::getAlarmRecordId, recordId)
.getParam())
.flatMap(alarmHistoryService::queryPager);
}
@PostMapping("/{dimensionType}/{alarmConfigId}/_query")
@Operation(summary = "按维度查询告警历史")
@QueryAction
public Mono<PagerResult<AlarmHistoryInfo>> queryHandleHistoryPagerByDimensionType(@PathVariable @Parameter(description = "告警配置ID") String alarmConfigId,
@PathVariable @Parameter(description = "告警维度") String dimensionType,
@RequestBody Mono<QueryParamEntity> query) {
return query
.doOnNext(queryParamEntity -> queryParamEntity
.toNestQuery(q -> q.and(AlarmHistoryInfo::getAlarmConfigId, alarmConfigId)))
.flatMap(alarmHistoryService::queryPager);
}
}

View File

@ -3,6 +3,7 @@ package org.jetlinks.community.rule.engine.web;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.AllArgsConstructor;
import org.hswebframework.ezorm.core.param.TermType;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.hswebframework.web.authorization.annotation.Authorize;
@ -11,16 +12,14 @@ import org.hswebframework.web.authorization.annotation.Resource;
import org.hswebframework.web.authorization.annotation.SaveAction;
import org.hswebframework.web.crud.service.ReactiveCrudService;
import org.hswebframework.web.crud.web.reactive.ReactiveServiceQueryController;
import org.hswebframework.web.exception.NotFoundException;
import org.jetlinks.community.rule.engine.alarm.AlarmHandleInfo;
import org.jetlinks.community.rule.engine.entity.AlarmHandleHistoryEntity;
import org.jetlinks.community.rule.engine.entity.AlarmRecordEntity;
import org.jetlinks.community.rule.engine.service.AlarmConfigService;
import org.jetlinks.community.rule.engine.service.AlarmHandleHistoryService;
import org.jetlinks.community.rule.engine.service.AlarmRecordService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;
@RestController
@ -42,6 +41,17 @@ public class AlarmRecordController implements ReactiveServiceQueryController<Ala
return recordService;
}
@PostMapping("/{dimensionType}/_query")
@Operation(summary = "按不同维度查询告警记录")
@QueryAction
public Mono<PagerResult<AlarmRecordEntity>> queryPagerByDimensionType(@PathVariable String dimensionType,
@RequestBody Mono<QueryParamEntity> query) {
return query
.doOnNext(queryParamEntity -> queryParamEntity
.toNestQuery(q -> q.and(AlarmRecordEntity::getTargetType, TermType.eq, dimensionType)))
.flatMap(this::queryPager1);
}
@PostMapping("/_handle")
@Operation(summary = "处理告警")
@SaveAction
@ -56,4 +66,51 @@ public class AlarmRecordController implements ReactiveServiceQueryController<Ala
public Mono<PagerResult<AlarmHandleHistoryEntity>> queryHandleHistoryPager(@RequestBody Mono<QueryParamEntity> query) {
return query.flatMap(handleHistoryService::queryPager);
}
@PostMapping("/{id}/handle-history/_query")
@Operation(summary = "按告警记录查询告警处理历史")
@QueryAction
public Mono<PagerResult<AlarmHandleHistoryEntity>> queryHandleHistoryPager(
@PathVariable String id,
@RequestBody Mono<QueryParamEntity> query) {
return query
.doOnNext(queryParamEntity -> queryParamEntity
.toNestQuery(q -> q.and(AlarmHandleHistoryEntity::getAlarmRecordId, TermType.eq, id)))
.flatMap(handleHistoryService::queryPager);
}
@PostMapping("/{dimensionType}/_handle")
@Operation(summary = "按维度处理告警")
@SaveAction
@Deprecated
public Mono<Void> handleAlarm(@PathVariable String dimensionType,
@RequestBody Mono<AlarmHandleInfo> handleInfo) {
return handleAlarm(handleInfo);
}
@PostMapping("/handle-history/{dimensionType}/{recordId}/_query")
@Operation(summary = "根据维度查询告警处理历史")
@QueryAction
public Mono<PagerResult<AlarmHandleHistoryEntity>> queryHandleHistoryPager(@PathVariable String dimensionType,
@PathVariable String recordId,
@RequestBody Mono<QueryParamEntity> query) {
return recordService
.createQuery()
.where(AlarmRecordEntity::getId, recordId)
.fetchOne()
.switchIfEmpty(Mono.error(() -> new NotFoundException.NoStackTrace("error.alarm_record_not_exist", 500, recordId)))
.flatMap(record -> query.flatMap(handleHistoryService::queryPager));
}
private Mono<PagerResult<AlarmRecordEntity>> queryPager1(QueryParamEntity query) {
if (query.getTotal() != null) {
return getService()
.createQuery()
.setParam(query.rePaging(query.getTotal()))
.fetch()
.collectList()
.map(list -> PagerResult.of(query.getTotal(), list, query));
}
return getService().queryPager(query);
}
}

View File

@ -13,14 +13,20 @@ import org.hswebframework.web.authorization.annotation.Resource;
import org.hswebframework.web.authorization.annotation.SaveAction;
import org.hswebframework.web.crud.web.reactive.ReactiveServiceQueryController;
import org.hswebframework.web.i18n.LocaleUtils;
import org.jetlinks.community.reactorql.aggregation.AggregationSupport;
import org.jetlinks.community.rule.engine.service.SceneService;
import org.jetlinks.community.rule.engine.utils.TermColumnUtils;
import org.jetlinks.community.rule.engine.web.request.SceneExecuteRequest;
import org.jetlinks.community.rule.engine.web.response.SceneActionInfo;
import org.jetlinks.community.rule.engine.web.response.SceneAggregationInfo;
import org.jetlinks.community.rule.engine.web.response.SceneTriggerInfo;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.community.rule.engine.entity.SceneEntity;
import org.jetlinks.community.rule.engine.executor.device.DeviceSelectorProvider;
import org.jetlinks.community.rule.engine.executor.device.DeviceSelectorProviders;
import org.jetlinks.community.rule.engine.scene.*;
import org.jetlinks.community.rule.engine.scene.term.TermColumn;
import org.jetlinks.core.metadata.SimplePropertyMetadata;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -110,6 +116,32 @@ public class SceneController implements ReactiveServiceQueryController<SceneEnti
.then();
}
@GetMapping("/trigger/supports")
@Operation(summary = "获取支持的触发器类型")
public Flux<SceneTriggerInfo> getSupportTriggers() {
return SceneUtils
.getSupportTriggers()
.map(SceneTriggerInfo::of);
}
@GetMapping("/action/supports")
@Operation(summary = "获取支持的动作类型")
public Flux<SceneActionInfo> getSupportActions() {
return SceneUtils
.getSupportActions()
.flatMap(provider -> SceneActionInfo.of(provider));
}
@GetMapping("/aggregation/supports")
@Operation(summary = "获取支持的聚合函数")
public Flux<SceneAggregationInfo> getSupportAggregations() {
return LocaleUtils
.currentReactive()
.flatMapMany(locale -> Flux
.fromIterable(AggregationSupport.supports.getAll())
.map(aggregation -> SceneAggregationInfo.of(aggregation, locale)));
}
@PostMapping("/parse-term-column")
@Operation(summary = "根据触发器解析出支持的条件列")
@QueryAction
@ -124,6 +156,15 @@ public class SceneController implements ReactiveServiceQueryController<SceneEnti
});
}
@PostMapping("/parse-array-child-term-column")
@Operation(summary = "解析数组需要的子元素支持的条件列")
@QueryAction
public Flux<TermColumn> parseArrayChildTermColumns(@RequestBody Mono<SimplePropertyMetadata> metadataMono) {
return metadataMono
.flatMapMany(metadata -> Flux
.fromIterable(TermColumnUtils.parseArrayChildTermColumns(metadata.getValueType())));
}
@PostMapping("/parse-variables")
@Operation(summary = "解析规则中输出的变量")
@QueryAction

View File

@ -0,0 +1,24 @@
package org.jetlinks.community.rule.engine.web.request;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.Setter;
import org.jetlinks.core.metadata.SimplePropertyMetadata;
/**
*
* @author zhangji 2025/1/22
* @since 2.3
*/
@Getter
@Setter
public class ArrayChildTermColumnRequest {
@Schema(description = "条件前缀")
private String prefix;
@Schema(description = "数组属性")
private SimplePropertyMetadata propertyMetadata;
}

View File

@ -2,8 +2,11 @@ package org.jetlinks.community.rule.engine.web.response;
import lombok.Getter;
import lombok.Setter;
import org.hswebframework.web.i18n.LocaleUtils;
import org.jetlinks.community.rule.engine.alarm.AlarmTarget;
import java.util.List;
/**
* @author bestfeng
*/
@ -15,6 +18,7 @@ public class AlarmTargetTypeInfo {
private String name;
private List<String> supportTriggers;
public static AlarmTargetTypeInfo of(AlarmTarget type) {
@ -26,4 +30,13 @@ public class AlarmTargetTypeInfo {
return info;
}
public AlarmTargetTypeInfo with(List<String> supportTriggers) {
this.supportTriggers = supportTriggers;
return this;
}
public String getName() {
return LocaleUtils.resolveMessage("message.rule_engine_alarm_" + id, name);
}
}

View File

@ -0,0 +1,55 @@
package org.jetlinks.community.rule.engine.web.response;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.Setter;
import org.hswebframework.web.i18n.LocaleUtils;
import org.jetlinks.community.rule.engine.scene.SceneActionProvider;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.function.Function;
/**
* 执行动作类型.
*
* @author zhangji 2025/1/22
* @since 2.3
*/
@Getter
@Setter
public class SceneActionInfo {
@Schema(description = "类型")
private String provider;
@Schema(description = "名称")
private String name;
@Schema(description = "说明")
private String description;
public String getName() {
return LocaleUtils.resolveMessage("message.scene_action_name_" + provider, name);
}
public String getDescription() {
return LocaleUtils.resolveMessage("message.scene_action_desc_" + provider, description);
}
public static Flux<SceneActionInfo> of(SceneActionProvider<?> actionProvider) {
return Mono
.justOrEmpty(actionProvider.getMode())
.flatMapIterable(Function.identity())
.map(SceneActionInfo::of)
.defaultIfEmpty(SceneActionInfo.of(actionProvider.getProvider()));
}
public static SceneActionInfo of(String provider) {
SceneActionInfo actionInfo = new SceneActionInfo();
actionInfo.setProvider(provider);
actionInfo.setName(provider);
return actionInfo;
}
}

View File

@ -0,0 +1,38 @@
package org.jetlinks.community.rule.engine.web.response;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.Setter;
import org.hswebframework.web.i18n.LocaleUtils;
import org.jetlinks.community.reactorql.aggregation.AggregationSupport;
import java.util.Locale;
/**
* 聚合函数类型.
*
* @author zhangji 2025/1/22
* @since 2.3
*/
@Getter
@Setter
public class SceneAggregationInfo {
@Schema(description = "函数")
private String id;
@Schema(description = "名称")
private String name;
public static SceneAggregationInfo of(AggregationSupport support, Locale locale) {
String id = support.getId();
SceneAggregationInfo aggregationInfo = new SceneAggregationInfo();
aggregationInfo.setId(id);
aggregationInfo.setName(LocaleUtils
.resolveMessage("message.scene_aggregation_name_" + id,
locale,
support.getName()));
return aggregationInfo;
}
}

View File

@ -0,0 +1,44 @@
package org.jetlinks.community.rule.engine.web.response;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;
import lombok.Setter;
import org.hswebframework.web.i18n.LocaleUtils;
import org.jetlinks.community.rule.engine.scene.SceneTriggerProvider;
/**
* 触发器类型.
*
* @author zhangji 2025/1/22
* @since 2.3
*/
@Getter
@Setter
public class SceneTriggerInfo {
@Schema(description = "类型")
private String provider;
@Schema(description = "名称")
private String name;
@Schema(description = "说明")
private String description;
public String getName() {
return LocaleUtils.resolveMessage("message.scene_trigger_name_" + provider, name);
}
public String getDescription() {
return LocaleUtils.resolveMessage("message.scene_trigger_desc_" + provider, description);
}
public static SceneTriggerInfo of(SceneTriggerProvider<?> triggerProvider) {
String provider = triggerProvider.getProvider();
SceneTriggerInfo triggerInfo = new SceneTriggerInfo();
triggerInfo.setProvider(provider);
triggerInfo.setName(triggerProvider.getName());
return triggerInfo;
}
}

View File

@ -1 +1,2 @@
org.jetlinks.community.rule.engine.configuration.RuleEngineManagerConfiguration
org.jetlinks.community.rule.engine.configuration.AlarmTargetConfiguration

View File

@ -94,7 +94,69 @@ message.property_value_type_recent_nest_desc=When the current value is empty, th
message.property_value_type_last=Last value
message.property_value_type_last_time=Last time
message.property_value_type_last_time_desc=Last reported data time
message.property_value_type_last_time_nest_desc=Data time in last [{0}]
message.property_value_type_last_desc=Last reported data value
message.property_value_type_last_nest_desc=Data value in last [{0}]
message.scene_triggered_relieve_alarm=scene triggered relieve alarm
message.scene_term_column_full_name={1} of {0}
error.alarm_record_not_exist = The alarm record [{0}] not exist
message.scene_trigger_type=Scene trigger type
message.term_type_scene_manual_desc=When the system receives a manual trigger command, it triggers the scene
message.term_type_scene_manual_actual_desc=Manual trigger
message.term_type_scene_timer_actual_desc=Timer trigger
message.term_size_of_array=array size
message.term_origin_of_array=array origin
message.term_element_of_array=array element
message.rule_engine_alarm_collector=collector
message.rule_engine_alarm_aiModel=aiModel
message.rule_engine_alarm_aiTask=aiTask
message.rule_engine_alarm_device=device
message.rule_engine_alarm_organization=org
message.rule_engine_alarm_product=product
message.rule_engine_alarm_scene=scene
message.rule_engine_alarm_subscriber_provider_alarm-other=scene alarm
message.rule_engine_alarm_subscriber_provider_alarm-product=product alarm
message.rule_engine_alarm_subscriber_provider_alarm-device=device alarm
message.rule_engine_alarm_subscriber_provider_alarm-org=org alarm
message.rule_engine_alarm_subscriber_provider_alarm=alarm
#scene-trigger
message.scene_trigger_name_manual=manual trigger
message.scene_trigger_name_device=device trigger
message.scene_trigger_name_timer=timer trigger
message.scene_trigger_name_collector=collector trigger
message.scene_trigger_desc_manual=Suitable for third-party platforms issuing instructions to IoT platforms to control devices
message.scene_trigger_desc_device=Suitable for executing specified actions when device data or behavior meets triggering conditions
message.scene_trigger_desc_timer=Suitable for regularly executing fixed tasks
message.scene_trigger_desc_collector=Suitable for executing specified actions when the collector point meets the triggering conditions
#scene-action
message.scene_action_name_device=device output
message.scene_action_name_device-data=device data
message.scene_action_name_notify=message notification
message.scene_action_name_delay=delay execution
message.scene_action_name_trigger=trigger alarm
message.scene_action_name_relieve=relieve alarm
message.scene_action_name_collector=collector output
message.scene_action_desc_device=Configure device invocation function, read attributes, set attribute rules
message.scene_action_desc_device-data=Get basic information, attributes, tags, events, etc. of the device
message.scene_action_desc_notify=Configure notifications to be sent to designated users via email, DingTalk, WeChat, SMS, etc
message.scene_action_desc_delay=Wait for a period of time before executing subsequent actions
message.scene_action_desc_trigger=Configure trigger alarm rules, which need to be used in conjunction with "alarm configuration"
message.scene_action_desc_relieve=Configure relieve alarm rules, which need to be used in conjunction with "Alarm Configuration"
message.scene_action_desc_collector=Configure reading points and set point rules
#aggregation
message.scene_aggregation_name_COUNT=COUNT
message.scene_aggregation_name_DISTINCT_COUNT=DISTINCT_COUNT
message.scene_aggregation_name_MIN=MIN
message.scene_aggregation_name_MAX=MAX
message.scene_aggregation_name_AVG=AVG
message.scene_aggregation_name_SUM=SUM
message.scene_aggregation_name_FIRST=FIRST
message.scene_aggregation_name_LAST=LAST
message.scene_aggregation_name_MEDIAN=MEDIAN
message.scene_aggregation_name_SPREAD=SPREAD
message.scene_aggregation_name_STDDEV=STDDEV

View File

@ -27,11 +27,12 @@ error.scene_rule_trigger_device_operation_event_id_cannot_be_null=\u8BBE\u5907\u
error.scene_rule_trigger_device_operation_read_property_cannot_be_empty=\u5C5E\u6027\u8BFB\u53D6\u4E0D\u80FD\u4E3A\u7A7A
error.scene_rule_trigger_device_operation_write_property_cannot_be_empty=\u4FEE\u6539\u5C5E\u6027\u4E0D\u80FD\u4E3A\u7A7A
error.scene_rule_trigger_device_operation_function_id_cannot_be_null=\u529F\u80FDid\u4E0D\u80FD\u4E3A\u7A7A
error.scene_rule_trigger_device_operation_function_parameter_cannot_be_empty=\u529F\u80FD\u53C2\u6570\u4E0D\u80FD\u4E3A\u7A7A
error.scene_rule_actions_notify_type_cannot_be_empty=\u901A\u77E5\u7C7B\u578B\u4E0D\u80FD\u4E3A\u7A7A
error.scene_rule_actions_notify_id_cannot_be_empty=\u901A\u77E5\u7C7B\u578B\u4E0D\u80FD\u4E3A\u7A7A
error.scene_rule_actions_notify_template_cannot_be_blank=\u901A\u77E5\u6A21\u677F\u4E0D\u80FD\u4E3A\u7A7A
error.scene_rule_actions_notify_variables_cannot_be_blank=\u901A\u77E5\u53D8\u91CF\u4E0D\u80FD\u4E3A\u7A7A
error.alarm_record_not_fount=\u627E\u4E0D\u5230\u3010{0}\u3011\u62A5\u8B66\u8BB0\u5F55
error.the_alarm_record_has_been_processed=\u544A\u8B66\u8BB0\u5F55\u5DF2\u88AB\u5904\u7406
#entity-package
error.not_set_alarm_rule=\u672A\u8BBE\u7F6E\u544A\u8B66\u89C4\u5219
@ -99,7 +100,68 @@ message.property_value_type_recent_nest_desc=\u5F53\u524D\u503C\u4E3A\u7A7A\u65F
message.property_value_type_last=\u4E0A\u4E00\u503C
message.property_value_type_last_time=\u4E0A\u4E00\u6B21\u4E0A\u62A5\u65F6\u95F4
message.property_value_type_last_time_desc=\u4E0A\u4E00\u6B21\u4E0A\u62A5\u6570\u636E\u7684\u65F6\u95F4
message.property_value_type_last_time_nest_desc=\u4e0a\u4e00\u6b21\u3010{0}\u3011\u4e2d\u4e0a\u62a5\u6570\u636e\u7684\u65f6\u95f4
message.property_value_type_last_desc=\u4E0A\u4E00\u6B21\u4E0A\u62A5\u7684\u6570\u636E\u503C
message.property_value_type_last_nest_desc=\u4E0A\u4E00\u6B21\u3010{0}\u3011\u4E2D\u7684\u6570\u636E\u503C
message.scene_triggered_relieve_alarm=\u573A\u666F\u89E6\u53D1\u89E3\u9664\u544A\u8B66
message.scene_term_column_full_name={0}/{1}
error.alarm_record_not_exist = \u544A\u8B66\u8BB0\u5F55[{0}]\u4E0D\u5B58\u5728
message.scene_trigger_type=\u573A\u666F\u89E6\u53D1\u7C7B\u578B
message.term_type_scene_manual_trigger_desc=\u7CFB\u7EDF\u5728\u63A5\u6536\u5230\u624B\u52A8\u89E6\u53D1\u6307\u4EE4\u65F6\uFF0C\u89E6\u53D1\u573A\u666F
message.term_type_scene_manual_actual_desc=\u624B\u52A8\u89E6\u53D1\u544A\u8B66
message.term_type_scene_timer_actual_desc=\u5B9A\u65F6\u89E6\u53D1\u544A\u8B66
message.term_size_of_array=\u6570\u7EC4\u957F\u5EA6
message.term_origin_of_array=\u539F\u59CB\u503C
message.term_element_of_array=\u6570\u503C\u5143\u7D20
message.rule_engine_alarm_collector=\u91C7\u96C6\u5668
message.rule_engine_alarm_aiModel=AI\u6A21\u578B
message.rule_engine_alarm_aiTask=AI\u4EFB\u52A1
message.rule_engine_alarm_device=\u8BBE\u5907
message.rule_engine_alarm_organization=\u7EC4\u7EC7
message.rule_engine_alarm_product=\u4EA7\u54C1
message.rule_engine_alarm_scene=\u573A\u666F
message.rule_engine_alarm_subscriber_provider_alarm-other=\u573A\u666F\u544A\u8B66
message.rule_engine_alarm_subscriber_provider_alarm-product=\u4EA7\u54C1\u544A\u8B66
message.rule_engine_alarm_subscriber_provider_alarm-device=\u8BBE\u5907\u544A\u8B66
message.rule_engine_alarm_subscriber_provider_alarm-org=\u7EC4\u7EC7\u544A\u8B66
message.rule_engine_alarm_subscriber_provider_alarm=\u544A\u8B66
#scene-trigger
message.scene_trigger_name_manual=\u624b\u52a8\u89e6\u53d1
message.scene_trigger_name_device=\u8bbe\u5907\u89e6\u53d1
message.scene_trigger_name_timer=\u5b9a\u65f6\u89e6\u53d1
message.scene_trigger_name_collector=\u91c7\u96c6\u5668\u89e6\u53d1
message.scene_trigger_desc_manual=\u9002\u7528\u4e8e\u7b2c\u4e09\u65b9\u5e73\u53f0\u5411\u7269\u8054\u7f51\u5e73\u53f0\u4e0b\u53d1\u6307\u4ee4\u63a7\u5236\u8bbe\u5907
message.scene_trigger_desc_device=\u9002\u7528\u4e8e\u8bbe\u5907\u6570\u636e\u6216\u884c\u4e3a\u6ee1\u8db3\u89e6\u53d1\u6761\u4ef6\u65f6\uff0c\u6267\u884c\u6307\u5b9a\u7684\u52a8\u4f5c
message.scene_trigger_desc_timer=\u9002\u7528\u4e8e\u5b9a\u671f\u6267\u884c\u56fa\u5b9a\u4efb\u52a1
message.scene_trigger_desc_collector=\u9002\u7528\u4e8e\u91c7\u96c6\u5668\u70b9\u4f4d\u6ee1\u8db3\u89e6\u53d1\u6761\u4ef6\u65f6\uff0c\u6267\u884c\u6307\u5b9a\u7684\u52a8\u4f5c
#scene-action
message.scene_action_name_device=\u8bbe\u5907\u8f93\u51fa
message.scene_action_name_device-data=\u8bbe\u5907\u4fe1\u606f
message.scene_action_name_notify=\u6d88\u606f\u901a\u77e5
message.scene_action_name_delay=\u5ef6\u8fdf\u6267\u884c
message.scene_action_name_trigger=\u89e6\u53d1\u544a\u8b66
message.scene_action_name_relieve=\u89e3\u9664\u544a\u8b66
message.scene_action_name_collector=\u91c7\u96c6\u5668\u8f93\u51fa
message.scene_action_desc_device=\u914d\u7f6e\u8bbe\u5907\u8c03\u7528\u529f\u80fd\u3001\u8bfb\u53d6\u5c5e\u6027\u3001\u8bbe\u7f6e\u5c5e\u6027\u89c4\u5219
message.scene_action_desc_device-data=\u83b7\u53d6\u8bbe\u5907\u7684\u57fa\u672c\u4fe1\u606f\u3001\u5c5e\u6027\u3001\u6807\u7b7e\u3001\u4e8b\u4ef6\u7b49
message.scene_action_desc_notify=\u914d\u7f6e\u5411\u6307\u5b9a\u7528\u6237\u53d1\u90ae\u4ef6\u3001\u9489\u9489\u3001\u5fae\u4fe1\u3001\u77ed\u4fe1\u7b49\u901a\u77e5
message.scene_action_desc_delay=\u7b49\u5f85\u4e00\u6bb5\u65f6\u95f4\u540e\uff0c\u518d\u6267\u884c\u540e\u7eed\u52a8\u4f5c
message.scene_action_desc_trigger=\u914d\u7f6e\u89e6\u53d1\u544a\u8b66\u89c4\u5219\uff0c\u9700\u914d\u5408\u201c\u544a\u8b66\u914d\u7f6e\u201d\u4f7f\u7528
message.scene_action_desc_relieve=\u914d\u7f6e\u89e3\u9664\u544a\u8b66\u89c4\u5219\uff0c\u9700\u914d\u5408\u201c\u544a\u8b66\u914d\u7f6e\u201d\u4f7f\u7528
message.scene_action_desc_collector=\u914d\u7f6e\u8bfb\u53d6\u70b9\u4f4d\u3001\u8bbe\u7f6e\u70b9\u4f4d\u89c4\u5219
#aggregation
message.scene_aggregation_name_COUNT=\u603b\u6570
message.scene_aggregation_name_DISTINCT_COUNT=\u603b\u6570(\u53bb\u91cd)
message.scene_aggregation_name_MIN=\u6700\u5c0f\u503c
message.scene_aggregation_name_MAX=\u6700\u5927\u503c
message.scene_aggregation_name_AVG=\u5e73\u5747\u503c
message.scene_aggregation_name_SUM=\u603b\u548c
message.scene_aggregation_name_FIRST=\u7b2c\u4e00\u4e2a\u503c
message.scene_aggregation_name_LAST=\u6700\u540e\u4e00\u4e2a\u503c
message.scene_aggregation_name_MEDIAN=\u4e2d\u4f4d\u6570
message.scene_aggregation_name_SPREAD=\u6781\u5dee
message.scene_aggregation_name_STDDEV=\u6807\u51c6\u5dee

View File

@ -30,7 +30,7 @@
<!-- https://github.com/hs-web/reactor-excel -->
<reactor.excel.version>1.0.6</reactor.excel.version>
<!-- https://github.com/jetlinks/reactor-ql -->
<reactor.ql.version>1.0.17</reactor.ql.version>
<reactor.ql.version>1.0.18-SNAPSHOT</reactor.ql.version>
<!-- https://github.com/jetlinks/jetlinks-plugin -->
<jetlinks.plugin.version>1.0.2</jetlinks.plugin.version>
<!-- https://github.com/jetlinks/jetlinks-sdk -->