Merge remote-tracking branch 'origin/master'

This commit is contained in:
zhouhao 2022-04-06 18:23:00 +08:00
commit 3621ef1ea8
60 changed files with 1094 additions and 477 deletions

View File

@ -1,11 +1,14 @@
# JetLinks 物联网基础平台
![GitHub Workflow Status](https://img.shields.io/github/workflow/status/jetlinks/jetlinks-community/Auto%20Deploy%20Docker?label=docker)
![Version](https://img.shields.io/badge/version-1.11--RELEASE-brightgreen)
![Version](https://img.shields.io/badge/version-1.12--RELEASE-brightgreen)
[![Codacy Badge](https://api.codacy.com/project/badge/Grade/e8d527d692c24633aba4f869c1c5d6ad)](https://app.codacy.com/gh/jetlinks/jetlinks-community?utm_source=github.com&utm_medium=referral&utm_content=jetlinks/jetlinks-community&utm_campaign=Badge_Grade_Settings)
![jetlinks](https://visitor-badge.glitch.me/badge?page_id=jetlinks)
[![QQ①群2021514](https://img.shields.io/badge/QQ①群-2021514-brightgreen)](https://qm.qq.com/cgi-bin/qm/qr?k=LGf0OPQqvLGdJIZST3VTcypdVWhdfAOG&jump_from=webapi)
[![QQ②群324606263](https://img.shields.io/badge/QQ②群-324606263-brightgreen)](https://qm.qq.com/cgi-bin/qm/qr?k=IMas2cH-TNsYxUcY8lRbsXqPnA2sGHYQ&jump_from=webapi)
![jetlinks](https://visitor-badge.glitch.me/badge?page_id=jetlinks)
[![QQ③群647954464](https://img.shields.io/badge/QQ③群-647954464-brightgreen)](https://qm.qq.com/cgi-bin/qm/qr?k=K5m27CkhDn3B_Owr-g6rfiTBC5DKEY59&jump_from=webapi)
JetLinks 基于Java8,Spring Boot 2.x,WebFlux,Netty,Vert.x,Reactor等开发,
是一个开箱即用,可二次开发的企业级物联网基础平台。平台实现了物联网相关的众多基础功能,

View File

@ -48,7 +48,7 @@ services:
POSTGRES_DB: jetlinks
TZ: Asia/Shanghai
ui:
image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-ui-antd:1.12.0
image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-ui-antd:1.13.0
container_name: jetlinks-ce-ui
ports:
- 9000:80
@ -59,7 +59,7 @@ services:
links:
- jetlinks:jetlinks
jetlinks:
image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-standalone:1.12.0-SNAPSHOT
image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-standalone:1.13.0-SNAPSHOT
container_name: jetlinks-ce
ports:
- "8848:8848" # API端口

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>jetlinks-components</artifactId>
<groupId>org.jetlinks.community</groupId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -16,6 +16,7 @@ public interface PropertyConstants {
Key<String> deviceName = Key.of("deviceName");
Key<String> productId = Key.of("productId");
Key<String> uid = Key.of("_uid");
@SuppressWarnings("all")

View File

@ -0,0 +1,88 @@
package org.jetlinks.community.utils;
import java.util.function.Supplier;
public class SystemUtils {
static float memoryWatermark = Float.parseFloat(
System.getProperty("memory.watermark", System.getProperty("memory.waterline", "0.1")));
//水位线持续
static long memoryWatermarkDuration = TimeUtils
.parse(System.getProperty("memory.watermark.duration", "5s"))
.toMillis();
static long errorPintInterval = TimeUtils
.parse(System.getProperty("memory.watermark.duration", "500"))
.toMillis();
static Supplier<Float> memoryRemainderSupplier = () -> {
Runtime rt = Runtime.getRuntime();
long free = rt.freeMemory();
long total = rt.totalMemory();
long max = rt.maxMemory();
return (max - total + free) / (max + 0.0F);
};
/**
* 获取内存剩余比例,值为0-1之间,值越小,剩余可用内存越小
*
* @return 内存剩余比例
*/
public static float getMemoryRemainder() {
return memoryRemainderSupplier.get();
}
private static volatile long outTimes = 0;
private static volatile long lastPrintTime = 0;
/**
* 判断当前内存是否已经超过水位线
*
* @return 是否已经超过水位线
*/
public static boolean memoryIsOutOfWatermark() {
boolean out = getMemoryRemainder() < memoryWatermark;
if (!out) {
outTimes = 0;
return false;
}
//连续超水位线
if (outTimes == 0) {
outTimes = System.currentTimeMillis();
} else {
if(System.currentTimeMillis() - outTimes > memoryWatermarkDuration){
System.gc();
return true;
}
}
return false;
}
/**
* 直接打印消息到控制台,支持格式化,<code>printError("save error %s",id);</code>
*
* @param format 格式化
* @param args 格式化参数
* @see java.util.Formatter
*/
public static void printError(String format, Object... args) {
printError(format, () -> args);
}
/**
* 直接打印消息到控制台,支持格式化,<code>printError("save error %s",id);</code>
*
* @param format 格式化
* @param argSupplier 格式化参数
* @see java.util.Formatter
*/
public static void printError(String format, Supplier<Object[]> argSupplier) {
long now = System.currentTimeMillis();
//防止频繁打印导致线程阻塞
if (now - lastPrintTime > errorPintInterval) {
lastPrintTime = now;
System.err.printf((format) + "%n", argSupplier.get());
}
}
}

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>jetlinks-components</artifactId>
<groupId>org.jetlinks.community</groupId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>jetlinks-components</artifactId>
<groupId>org.jetlinks.community</groupId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>jetlinks-components</artifactId>
<groupId>org.jetlinks.community</groupId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -0,0 +1,32 @@
package org.jetlinks.community.elastic.search.index.strategies;
import org.hswebframework.utils.time.DateFormatter;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties;
import org.jetlinks.community.elastic.search.service.reactive.ReactiveElasticsearchClient;
import org.springframework.stereotype.Component;
import java.time.LocalDate;
import java.util.Date;
/**
* 按日期来划分索引策略
*
* @author caizz
* @since 1.0
*/
@Component
public class TimeByDayElasticSearchIndexStrategy extends TemplateElasticSearchIndexStrategy {
public TimeByDayElasticSearchIndexStrategy(ReactiveElasticsearchClient client, ElasticSearchIndexProperties properties) {
super("time-by-day", client, properties);
}
@Override
public String getIndexForSave(String index) {
LocalDate now = LocalDate.now();
String idx = wrapIndex(index);
return idx + "_" + now.getYear()
+ "-" + (now.getMonthValue() < 10 ? "0" : "") + now.getMonthValue()
+ "-" + (now.getDayOfMonth() < 10 ? "0" : "") + now.getDayOfMonth();
}
}

View File

@ -4,18 +4,15 @@ import com.alibaba.fastjson.JSON;
import io.netty.util.internal.ObjectPool;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
@ -32,6 +29,7 @@ import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
import org.jetlinks.community.elastic.search.service.ElasticSearchService;
import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter;
import org.jetlinks.community.elastic.search.utils.QueryParamTranslator;
import org.jetlinks.community.utils.SystemUtils;
import org.jetlinks.core.utils.FluxUtils;
import org.reactivestreams.Publisher;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ -68,28 +66,26 @@ import java.util.stream.Collectors;
@ConfigurationProperties(prefix = "elasticsearch")
public class ReactiveElasticSearchService implements ElasticSearchService {
@Getter
private final ReactiveElasticsearchClient restClient;
@Getter
private final ElasticSearchIndexManager indexManager;
private FluxSink<Buffer> sink;
public static final IndicesOptions indexOptions = IndicesOptions.fromOptions(
true, true, false, false
);
//使用对象池处理Buffer,减少GC消耗
static ObjectPool<Buffer> pool = ObjectPool.newPool(Buffer::new);
static {
DateFormatter.supportFormatter.add(new DefaultDateFormatter(Pattern.compile("[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.+"), "yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
}
private final ReactiveElasticsearchClient restClient;
private final ElasticSearchIndexManager indexManager;
private FluxSink<Buffer> sink;
@Getter
@Setter
private BufferConfig buffer = new BufferConfig();
public ReactiveElasticSearchService(ReactiveElasticsearchClient restClient,
ElasticSearchIndexManager indexManager) {
this.restClient = restClient;
init();
this.indexManager = indexManager;
init();
}
@Override
@ -124,23 +120,34 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
});
}
@Override
public <T> Flux<T> query(String index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
return this
.doQuery(new String[]{index}, queryParam)
.flatMapMany(tp2 -> convertQueryResult(tp2.getT1(), tp2.getT2(), mapper));
return this.query(new String[]{index}, queryParam, mapper);
}
@Override
public <T> Flux<T> query(String[] index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
if (queryParam.isPaging()) {
return this
.doQuery(index, queryParam)
.flatMapMany(tp2 -> convertQueryResult(tp2.getT1(), tp2.getT2(), mapper));
}
return this
.doQuery(index, queryParam)
.flatMapMany(tp2 -> convertQueryResult(tp2.getT1(), tp2.getT2(), mapper));
.doScrollQuery(index, queryParam)
.flatMap(tp2 -> convertQueryHit(tp2.getT1(), tp2.getT2(), mapper));
}
@Override
public <T> Mono<PagerResult<T>> queryPager(String[] index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
return this.doQuery(index, queryParam)
if (!queryParam.isPaging()) {
return Mono
.zip(
this.count(index, queryParam),
this.query(index, queryParam, mapper).collectList(),
(total, list) -> PagerResult.of(total.intValue(), list, queryParam)
)
.switchIfEmpty(Mono.fromSupplier(PagerResult::empty));
}
return this
.doQuery(index, queryParam)
.flatMap(tp2 -> this
.convertQueryResult(tp2.getT1(), tp2.getT2(), mapper)
.collectList()
@ -169,8 +176,30 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
}
return mapper
.apply(Optional
.ofNullable(metadata.get(hit.getIndex())).orElse(indexList.get(0))
.convertFromElastic(hitMap));
.ofNullable(metadata.get(hit.getIndex())).orElse(indexList.get(0))
.convertFromElastic(hitMap));
});
}
private <T> Flux<T> convertQueryHit(List<ElasticSearchIndexMetadata> indexList,
SearchHit searchHit,
Function<Map<String, Object>, T> mapper) {
Map<String, ElasticSearchIndexMetadata> metadata = indexList
.stream()
.collect(Collectors.toMap(ElasticSearchIndexMetadata::getIndex, Function.identity()));
return Flux
.just(searchHit)
.map(hit -> {
Map<String, Object> hitMap = hit.getSourceAsMap();
if (StringUtils.isEmpty(hitMap.get("id"))) {
hitMap.put("id", hit.getId());
}
return mapper
.apply(Optional
.ofNullable(metadata.get(hit.getIndex())).orElse(indexList.get(0))
.convertFromElastic(hitMap));
});
}
@ -185,52 +214,79 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
.createSearchRequest(queryParam, metadataList)
.flatMap(restClient::searchForPage)
.map(response -> Tuples.of(metadataList, response))
).onErrorResume(err -> {
log.error(err.getMessage(), err);
return Mono.empty();
});
)
;
}
private Flux<Tuple2<List<ElasticSearchIndexMetadata>, SearchHit>> doScrollQuery(String[] index,
QueryParam queryParam) {
return indexManager
.getIndexesMetadata(index)
.collectList()
.filter(CollectionUtils::isNotEmpty)
.flatMapMany(metadataList -> this
.createSearchRequest(queryParam, metadataList)
.flatMapMany(restClient::scroll)
.map(searchHit -> Tuples.of(metadataList, searchHit))
);
}
@Override
public Mono<Long> count(String[] index, QueryParam queryParam) {
QueryParam param = queryParam.clone();
param.setPaging(false);
return createSearchRequest(param, index)
return this
.createSearchRequest(param, index)
.flatMap(this::doCount)
.defaultIfEmpty(0L)
.onErrorReturn(err -> {
log.error("query elastic error", err);
return true;
}, 0L);
.defaultIfEmpty(0L);
}
@Override
public Mono<Long> delete(String index, QueryParam queryParam) {
return this
.getIndexForSearch(index)
.flatMap(inx -> this
.createQueryBuilder(queryParam, index)
.flatMap(request -> restClient.deleteBy(delete -> delete.setQuery(request).indices(inx)))
.map(BulkByScrollResponse::getDeleted))
.defaultIfEmpty(0L);
}
return createQueryBuilder(queryParam, index)
.flatMap(request -> restClient.deleteBy(delete -> delete.setQuery(request).indices(index)))
.map(BulkByScrollResponse::getDeleted);
private boolean checkWritable(String index) {
if (SystemUtils.memoryIsOutOfWatermark()) {
SystemUtils.printError("JVM内存不足,elasticsearch无法处理更多索引[%s]请求!", index);
return false;
}
return true;
}
@Override
public <T> Mono<Void> commit(String index, T payload) {
sink.next(Buffer.of(index, payload));
if (checkWritable(index)) {
sink.next(Buffer.of(index, payload));
}
return Mono.empty();
}
@Override
public <T> Mono<Void> commit(String index, Collection<T> payload) {
for (T t : payload) {
sink.next(Buffer.of(index, t));
if (checkWritable(index)) {
for (T t : payload) {
sink.next(Buffer.of(index, t));
}
}
return Mono.empty();
}
@Override
public <T> Mono<Void> commit(String index, Publisher<T> data) {
if (!checkWritable(index)) {
return Mono.empty();
}
return Flux.from(data)
.flatMap(d -> commit(index, d))
.then();
.flatMap(d -> commit(index, d))
.then();
}
@Override
@ -241,10 +297,10 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
@Override
public <T> Mono<Void> save(String index, Publisher<T> data) {
return Flux.from(data)
.map(v -> Buffer.of(index, v))
.collectList()
.flatMap(this::doSave)
.then();
.map(v -> Buffer.of(index, v))
.collectList()
.flatMap(this::doSave)
.then();
}
@Override
@ -257,6 +313,34 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
sink.complete();
}
@Getter
@Setter
private BufferConfig buffer = new BufferConfig();
@Getter
@Setter
public static class BufferConfig {
//最小间隔
private int rate = Integer.getInteger("elasticsearch.buffer.rate", 1000);
//缓冲最大数量
private int bufferSize = Integer.getInteger("elasticsearch.buffer.size", 3000);
//缓冲超时时间
private Duration bufferTimeout = Duration.ofSeconds(Integer.getInteger("elasticsearch.buffer.timeout", 3));
//背压堆积数量限制.
private int bufferBackpressure = Integer.getInteger("elasticsearch.buffer.backpressure", Runtime
.getRuntime()
.availableProcessors());
//最大缓冲字节
private DataSize bufferBytes = DataSize.parse(System.getProperty("elasticsearch.buffer.bytes", "15MB"));
//最大重试次数
private int maxRetry = 3;
//重试间隔
private Duration minBackoff = Duration.ofSeconds(3);
private boolean refreshWhenWrite = false;
}
//@PostConstruct
public void init() {
int flushRate = buffer.rate;
@ -268,41 +352,81 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
FluxUtils
.bufferRate(Flux.<Buffer>create(sink -> this.sink = sink),
flushRate,
bufferSize,
bufferTimeout,
(b, l) -> bufferedBytes.addAndGet(b.numberOfBytes()) >= bufferBytes)
flushRate,
bufferSize,
bufferTimeout,
(b, l) -> bufferedBytes.addAndGet(b.numberOfBytes()) >= bufferBytes)
.doOnNext(buf -> bufferedBytes.set(0))
.onBackpressureBuffer(bufferBackpressure, drop -> {
// TODO: 2020/11/25 将丢弃的数据存储到本地磁盘
drop.forEach(Buffer::release);
System.err.println("elasticsearch无法处理更多索引请求!丢弃数据数量:" + drop.size());
SystemUtils.printError("elasticsearch无法处理更多索引请求!丢弃数据数量:%d", drop.size());
}, BufferOverflowStrategy.DROP_OLDEST)
.publishOn(Schedulers.boundedElastic(), bufferBackpressure)
.flatMap(buffers -> {
long time = System.currentTimeMillis();
return Mono.create(sink -> {
try {
this
.doSave(buffers)
.doOnNext((len) -> log.trace("保存ElasticSearch数据成功,数量:{},耗时:{}ms", len, (System.currentTimeMillis() - time)))
.doOnError((err) -> {
//这里的错误都输出到控制台,输入到slf4j可能会造成日志递归.
System.err.println("保存ElasticSearch数据失败:\n" + org.hswebframework.utils.StringUtils.throwable2String(err));
})
.doFinally((s) -> sink.success())
.subscribe();
sink.onCancel(this
.doSave(buffers)
.doFinally((s) -> sink.success())
.subscribe());
} catch (Exception e) {
sink.success();
}
});
})
.onErrorResume((err) -> Mono
.fromRunnable(() -> System.err.println("保存ElasticSearch数据失败:\n" +
org.hswebframework.utils.StringUtils.throwable2String(err))))
.fromRunnable(() -> SystemUtils.printError("保存ElasticSearch数据失败:\n" +
org.hswebframework.utils.StringUtils.throwable2String(err))))
.subscribe();
}
static ObjectPool<Buffer> pool = ObjectPool.newPool(Buffer::new);
@Getter
static class Buffer {
String index;
String id;
String payload;
final ObjectPool.Handle<Buffer> handle;
public Buffer(ObjectPool.Handle<Buffer> handle) {
this.handle = handle;
}
public static Buffer of(String index, Object payload) {
Buffer buffer;
try {
buffer = pool.get();
} catch (Throwable e) {
buffer = new Buffer(null);
}
buffer.index = index;
Map<String, Object> data = payload instanceof Map
? ((Map) payload) :
FastBeanCopier.copy(payload, HashMap::new);
Object id = data.get("id");
buffer.id = id == null ? null : String.valueOf(id);
buffer.payload = JSON.toJSONString(data);
return buffer;
}
void release() {
this.index = null;
this.id = null;
this.payload = null;
if (null != handle) {
handle.recycle(this);
}
}
int numberOfBytes() {
return payload == null ? 0 : payload.length() * 2;
}
}
private Mono<String> getIndexForSave(String index) {
return indexManager
.getIndexStrategy(index)
@ -318,7 +442,8 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
}
protected Mono<Integer> doSave(Collection<Buffer> buffers) {
return Flux.fromIterable(buffers)
return Flux
.fromIterable(buffers)
.groupBy(Buffer::getIndex, Integer.MAX_VALUE)
.flatMap(group -> {
String index = group.key();
@ -345,6 +470,9 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
.flatMap(lst -> {
BulkRequest request = new BulkRequest();
request.timeout(TimeValue.timeValueSeconds(9));
if (buffer.refreshWhenWrite) {
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
}
lst.forEach(request::add);
return restClient
.bulk(request)
@ -355,53 +483,35 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
return save;
});
})
.doOnError((err) -> {
//这里的错误都输出到控制台,输入到slf4j可能会造成日志递归.
SystemUtils.printError("保存ElasticSearch数据失败:\n%s", () -> new Object[]{
org.hswebframework.utils.StringUtils.throwable2String(err)
});
})
.doOnNext(response -> {
log.trace("保存ElasticSearch数据成功,数量:{},耗时:{}", response.getItems().length, response.getTook());
if (response.hasFailures()) {
System.err.println(response.buildFailureMessage());
SystemUtils.printError(response.buildFailureMessage());
}
})
.thenReturn(buffers.size());
}
@SneakyThrows
protected void checkResponse(BulkResponse response) {
if (response.hasFailures()) {
for (BulkItemResponse item : response.getItems()) {
if (item.isFailed()) {
throw item.getFailure().getCause();
}
}
}
}
private <T> List<T> translate(Function<Map<String, Object>, T> mapper, SearchResponse response) {
return Arrays.stream(response.getHits().getHits())
.map(hit -> {
Map<String, Object> hitMap = hit.getSourceAsMap();
if (StringUtils.isEmpty(hitMap.get("id"))) {
hitMap.put("id", hit.getId());
}
return mapper.apply(hitMap);
})
.collect(Collectors.toList());
}
private Flux<SearchHit> doSearch(SearchRequest request) {
return restClient
.search(request)
.onErrorResume(err -> {
log.error("query elastic error", err);
return Mono.empty();
});
.map(hit -> {
Map<String, Object> hitMap = hit.getSourceAsMap();
if (StringUtils.isEmpty(hitMap.get("id"))) {
hitMap.put("id", hit.getId());
}
return mapper.apply(hitMap);
})
.collect(Collectors.toList());
}
private Mono<Long> doCount(SearchRequest request) {
return restClient
.count(request)
.onErrorResume(err -> {
log.error("query elastic error", err);
return Mono.empty();
});
return restClient.count(request);
}
protected Mono<SearchRequest> createSearchRequest(QueryParam queryParam, String... indexes) {
@ -416,12 +526,12 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0));
return Flux.fromIterable(indexes)
.flatMap(index -> getIndexForSearch(index.getIndex()))
.collectList()
.map(indexList ->
new SearchRequest(indexList.toArray(new String[0]))
.source(builder)
.indicesOptions(indexOptions));
.flatMap(index -> getIndexForSearch(index.getIndex()))
.collectList()
.map(indexList ->
new SearchRequest(indexList.toArray(new String[0]))
.source(builder)
.indicesOptions(indexOptions));
}
protected Mono<QueryBuilder> createQueryBuilder(QueryParam queryParam, String index) {
@ -430,88 +540,4 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
.map(metadata -> QueryParamTranslator.createQueryBuilder(queryParam, metadata))
.switchIfEmpty(Mono.fromSupplier(() -> QueryParamTranslator.createQueryBuilder(queryParam, null)));
}
protected Mono<CountRequest> createCountRequest(QueryParam queryParam, List<ElasticSearchIndexMetadata> indexes) {
QueryParam tempQueryParam = queryParam.clone();
tempQueryParam.setPaging(false);
tempQueryParam.setSorts(Collections.emptyList());
SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0));
return Flux.fromIterable(indexes)
.flatMap(index -> getIndexForSearch(index.getIndex()))
.collectList()
.map(indexList -> new CountRequest(indexList.toArray(new String[0])).source(builder));
}
private Mono<CountRequest> createCountRequest(QueryParam queryParam, String... index) {
return indexManager
.getIndexesMetadata(index)
.collectList()
.filter(CollectionUtils::isNotEmpty)
.flatMap(list -> createCountRequest(queryParam, list));
}
@Getter
@Setter
public static class BufferConfig {
//最小间隔
private int rate = Integer.getInteger("elasticsearch.buffer.rate", 1000);
//缓冲最大数量
private int bufferSize = Integer.getInteger("elasticsearch.buffer.size", 3000);
//缓冲超时时间
private Duration bufferTimeout = Duration.ofSeconds(Integer.getInteger("elasticsearch.buffer.timeout", 3));
//背压堆积数量限制.
private int bufferBackpressure = Integer.getInteger("elasticsearch.buffer.backpressure", Runtime
.getRuntime()
.availableProcessors());
//最大缓冲字节
private DataSize bufferBytes = DataSize.parse(System.getProperty("elasticsearch.buffer.bytes", "15MB"));
//最大重试次数
private int maxRetry = 3;
//重试间隔
private Duration minBackoff = Duration.ofSeconds(3);
}
@Getter
static class Buffer {
final ObjectPool.Handle<Buffer> handle;
String index;
String id;
String payload;
public Buffer(ObjectPool.Handle<Buffer> handle) {
this.handle = handle;
}
public static Buffer of(String index, Object payload) {
Buffer buffer;
try {
buffer = pool.get();
} catch (Exception e) {
buffer = new Buffer(null);
}
buffer.index = index;
Map<String, Object> data = payload instanceof Map
? ((Map) payload) :
FastBeanCopier.copy(payload, HashMap::new);
Object id = data.get("id");
buffer.id = id == null ? null : String.valueOf(id);
buffer.payload = JSON.toJSONString(data);
return buffer;
}
void release() {
this.index = null;
this.id = null;
this.payload = null;
if (null != handle) {
handle.recycle(this);
}
}
int numberOfBytes() {
return payload == null ? 0 : payload.length() * 2;
}
}
}

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>jetlinks-components</artifactId>
<groupId>org.jetlinks.community</groupId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -6,10 +6,19 @@ import org.springframework.core.annotation.AliasFor;
import java.lang.annotation.*;
/**
* 订阅来自消息网关的消息
* 从事件总线{@link org.jetlinks.core.event.EventBus}中订阅消息并执行注解的方法,
* 事件总线的输出数据可以作为方法参数,如果类型不一致会自动转换
* 也可以通过方法参数直接获取事件总线的原始数据:{@link org.jetlinks.core.event.TopicPayload}
*
* <pre>
* &#64;Subscribe("/device/&#42;/&#42;/message")
* public Mono&lt;Void&gt; handleEvent(DeviceMessage msg){
* return doSomeThing(msg);
* }
* </pre>
* @author zhouhao
* @see org.jetlinks.community.gateway.MessageSubscriber
* @see org.jetlinks.core.event.EventBus
* @see org.jetlinks.community.gateway.spring.SpringMessageBroker
* @since 1.0
*/
@Target({ElementType.METHOD})
@ -18,14 +27,55 @@ import java.lang.annotation.*;
@Documented
public @interface Subscribe {
/**
* 要订阅的topic,topic是树结构,
* {@link org.springframework.util.AntPathMatcher}类似,支持通配符: **表示多层目录,*表示单层目录.
* <ul>
* <li>
* /device/p1/d1/online
* </li>
* <li>
* /device/p1/d1,d2/online
* </li>
* <li>
* /device/p1/&#42;/online
* </li>
* <li>
* /device/&#42;&#42;
* </li>
* </ul>
* <p>
* 支持使用表达式
* <pre>
* /device/${sub.product-id}/**
* </pre>
*
* @return topics
* @see Subscribe#value()
* @see org.jetlinks.core.event.EventBus#subscribe(Subscription)
*/
@AliasFor("value")
String[] topics() default {};
/**
* @return topics
* @see Subscribe#topics()
*/
@AliasFor("topics")
String[] value() default {};
/**
* 指定订阅者ID,默认为方法名
*
* @return 订阅者ID
*/
String id() default "";
/**
* 订阅特性默认只订阅本地进程内部的消息
*
* @return 订阅特性
*/
Subscription.Feature[] features() default Subscription.Feature.local;
}

View File

@ -3,18 +3,26 @@ package org.jetlinks.community.gateway.spring;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.logger.ReactiveLogger;
import org.hswebframework.web.utils.TemplateParser;
import org.jetlinks.community.gateway.annotation.Subscribe;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.utils.TopicUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.annotation.AnnotationAttributes;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Signal;
import java.util.Arrays;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@Component
@Slf4j
@ -23,6 +31,8 @@ public class SpringMessageBroker implements BeanPostProcessor {
private final EventBus eventBus;
private final Environment environment;
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<?> type = ClassUtils.getUserClass(bean);
@ -35,24 +45,35 @@ public class SpringMessageBroker implements BeanPostProcessor {
if (!StringUtils.hasText(id)) {
id = type.getSimpleName().concat(".").concat(method.getName());
}
Subscription subscription = Subscription.of(
"spring:" + id,
subscribes.getStringArray("value"),
(Subscription.Feature[]) subscribes.get("features"));
Subscription subscription = Subscription
.builder()
.subscriberId("spring:" + id)
.topics(Arrays.stream(subscribes.getStringArray("value"))
.map(this::convertTopic)
.flatMap(topic -> TopicUtils.expand(topic).stream())
.collect(Collectors.toList())
)
.features((Subscription.Feature[]) subscribes.get("features"))
.build();
ProxyMessageListener listener = new ProxyMessageListener(bean, method);
Consumer<Signal<Void>> logError = ReactiveLogger
.onError(error -> log.error("handle[{}] event message error", listener, error));
eventBus
.subscribe(subscription)
.doOnNext(msg ->
listener
.onMessage(msg)
.doOnEach(ReactiveLogger.onError(error -> {
log.error(error.getMessage(), error);
}))
.subscribe()
)
.onErrorContinue((err, v) -> log.error(err.getMessage(), err))
.doOnNext(msg -> {
try {
listener
.onMessage(msg)
.doOnEach(logError)
.subscribe();
} catch (Throwable e) {
log.error("handle[{}] event message error", listener, e);
}
})
.subscribe();
});
@ -60,4 +81,18 @@ public class SpringMessageBroker implements BeanPostProcessor {
return bean;
}
protected String convertTopic(String topic) {
if (!topic.contains("${")) {
return topic;
}
return TemplateParser.parse(topic, template -> {
String[] arr = template.split(":", 2);
String property = environment.getProperty(arr[0], arr.length > 1 ? arr[1] : "");
if (StringUtils.isEmpty(property)) {
throw new IllegalArgumentException("Parse topic [" + template + "] error, can not get property : " + arr[0]);
}
return property;
});
}
}

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>jetlinks-components</artifactId>
<groupId>org.jetlinks.community</groupId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>jetlinks-components</artifactId>
<groupId>org.jetlinks.community</groupId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>network-component</artifactId>
<groupId>org.jetlinks.community</groupId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -113,7 +113,6 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
.flatMap(this::handleConnection)
.flatMap(tuple3 -> handleAuthResponse(tuple3.getT1(), tuple3.getT2(), tuple3.getT3()))
.flatMap(tp -> handleAcceptedMqttConnection(tp.getT1(), tp.getT2(), tp.getT3()), Integer.MAX_VALUE)
.onErrorContinue((err, obj) -> log.error("处理MQTT连接失败", err))
.subscriberContext(ReactiveLogger.start("network", mqttServer.getId()))
.subscribe();
@ -175,9 +174,13 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
connection.onClose(conn -> {
counter.decrement();
DeviceSession _tmp = sessionManager.getSession(newSession.getId());
if (newSession == _tmp || _tmp == null) {
sessionManager.unregister(deviceId);
//只有与创建的会话相同才移除(下线),因为有可能设置了keepOnline,
//或者设备通过其他方式注册了会话,这里断开连接不能影响到以上情况.
if (_tmp != null && _tmp.isWrapFrom(MqttConnectionSession.class)) {
MqttConnectionSession connectionSession = _tmp.unwrap(MqttConnectionSession.class);
if (connectionSession.getConnection() == conn) {
sessionManager.unregister(deviceId);
}
}
gatewayMonitor.disconnected();
gatewayMonitor.totalConnection(counter.sum());

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>network-component</artifactId>
<groupId>org.jetlinks.community</groupId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -71,6 +71,11 @@ public class DeviceGatewayHelper {
}
}
ChildrenDeviceSession deviceSession = sessionManager.getSession(deviceId, children.getDeviceId());
if (deviceSession != null) {
deviceSession.keepAlive();
applySessionKeepaliveTimeout(children, () -> null)
.accept(deviceSession);
}
//子设备离线或者注销
if (children instanceof DeviceOfflineMessage || children instanceof DeviceUnRegisterMessage) {
//注销会话,这里子设备可能会收到多次离线消息
@ -96,13 +101,13 @@ public class DeviceGatewayHelper {
return Mono
.delay(Duration.ofSeconds(2))
.then(registry
.getDevice(children.getDeviceId())
.flatMap(device -> device
//没有配置状态自管理才自动上线
.getSelfConfig(DeviceConfigKey.selfManageState)
.defaultIfEmpty(false)
.filter(Boolean.FALSE::equals)
.flatMap(ignore -> registerSession))
.getDevice(children.getDeviceId())
.flatMap(device -> device
//没有配置状态自管理才自动上线
.getSelfConfig(DeviceConfigKey.selfManageState)
.defaultIfEmpty(false)
.filter(Boolean.FALSE::equals)
.flatMap(ignore -> registerSession))
);
}
return registerSession;

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>jetlinks-components</artifactId>
<groupId>org.jetlinks.community</groupId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<packaging>pom</packaging>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>network-component</artifactId>
<groupId>org.jetlinks.community</groupId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>notify-component</artifactId>
<groupId>org.jetlinks.community</groupId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>notify-component</artifactId>
<groupId>org.jetlinks.community</groupId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>notify-component</artifactId>
<groupId>org.jetlinks.community</groupId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@ -41,7 +41,7 @@
<dependency>
<groupId>org.jsoup</groupId>
<artifactId>jsoup</artifactId>
<version>1.11.3</version>
<version>1.14.3</version>
</dependency>
</dependencies>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>notify-component</artifactId>
<groupId>org.jetlinks.community</groupId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>notify-component</artifactId>
<groupId>org.jetlinks.community</groupId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>notify-component</artifactId>
<groupId>org.jetlinks.community</groupId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>jetlinks-components</artifactId>
<groupId>org.jetlinks.community</groupId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>jetlinks-community</artifactId>
<groupId>org.jetlinks.community</groupId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>jetlinks-components</artifactId>
<groupId>org.jetlinks.community</groupId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
@ -16,7 +16,7 @@
<dependency>
<groupId>com.cronutils</groupId>
<artifactId>cron-utils</artifactId>
<version>9.0.2</version>
<version>9.1.6</version>
</dependency>
<dependency>
<groupId>org.jetlinks</groupId>

View File

@ -86,7 +86,13 @@ public class DeviceMessageSendTaskExecutorProvider implements TaskExecutorProvid
.onErrorResume(error -> context.onError(error, input))
.subscribeOn(Schedulers.parallel())
)
.map(reply -> context.newRuleData(input.newData(reply.toJson())))
.map(reply -> {
RuleData data = context.newRuleData(input.newData(reply.toJson()));
if (config.getResponseHeaders() != null) {
config.getResponseHeaders().forEach(data::setHeader);
}
return data;
})
;
}
@ -146,6 +152,8 @@ public class DeviceMessageSendTaskExecutorProvider implements TaskExecutorProvid
private String stateOperator = "ignoreOffline";
private Map<String, Object> responseHeaders;
public Map<String, Object> toMap() {
Map<String, Object> conf = FastBeanCopier.copy(this, new HashMap<>());
conf.put("timeout", timeout.toString());

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>jetlinks-components</artifactId>
<groupId>org.jetlinks.community</groupId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -3,6 +3,7 @@ package org.jetlinks.community.timeseries.query;
import org.jetlinks.community.ValueObject;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@ -20,6 +21,12 @@ public interface AggregationData extends ValueObject {
return asMap();
}
default AggregationData merge(AggregationData another) {
Map<String, Object> newVal = new HashMap<>(asMap());
newVal.putAll(another.asMap());
return of(newVal);
}
static AggregationData of(Map<String, Object> map) {
return () -> map;
}

View File

@ -7,7 +7,7 @@
<parent>
<groupId>org.jetlinks.community</groupId>
<artifactId>jetlinks-manager</artifactId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>authentication-manager</artifactId>

View File

@ -5,12 +5,14 @@ import lombok.Getter;
import lombok.Setter;
import org.apache.commons.collections4.CollectionUtils;
import java.io.Serializable;
import java.util.*;
import java.util.function.BiPredicate;
@Getter
@Setter
public class MenuButtonInfo {
public class MenuButtonInfo implements Serializable {
private static final long serialVersionUID = 1L;
@Schema(description = "按钮ID")
private String id;

View File

@ -6,13 +6,15 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import java.io.Serializable;
import java.util.Set;
@Getter
@Setter
@AllArgsConstructor(staticName = "of")
@NoArgsConstructor
public class PermissionInfo {
public class PermissionInfo implements Serializable {
private static final long serialVersionUID = 1L;
@Schema(description = "权限ID")
private String permission;

View File

@ -13,9 +13,8 @@ import org.jetlinks.community.auth.entity.MenuEntity;
import org.jetlinks.community.auth.entity.MenuView;
import org.jetlinks.community.auth.entity.PermissionInfo;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -55,7 +54,7 @@ public class MenuGrantRequest {
detail.setMerge(merge);
detail.setPriority(priority);
List<AuthorizationSettingDetail.PermissionInfo> permissionInfos = new ArrayList<>();
Map<String, Set<String>> permissionInfos = new ConcurrentHashMap<>();
for (MenuView menu : menus) {
//平铺
@ -68,7 +67,9 @@ public class MenuGrantRequest {
//自动持有配置的权限
if (CollectionUtils.isNotEmpty(entity.getPermissions())) {
for (PermissionInfo permission : entity.getPermissions()) {
permissionInfos.add(AuthorizationSettingDetail.PermissionInfo.of(permission.getPermission(), permission.getActions()));
permissionInfos
.computeIfAbsent(permission.getPermission(), ignore -> new HashSet<>())
.addAll(permission.getActions());
}
}
@ -78,8 +79,12 @@ public class MenuGrantRequest {
.ifPresent(buttonInfo -> {
if (CollectionUtils.isNotEmpty(buttonInfo.getPermissions())) {
for (PermissionInfo permission : buttonInfo.getPermissions()) {
if (CollectionUtils.isEmpty(permission.getActions())) {
continue;
}
permissionInfos
.add(AuthorizationSettingDetail.PermissionInfo.of(permission.getPermission(), permission.getActions()));
.computeIfAbsent(permission.getPermission(), ignore -> new HashSet<>())
.addAll(permission.getActions());
}
}
@ -88,7 +93,12 @@ public class MenuGrantRequest {
}
}
}
detail.setPermissionList(permissionInfos);
detail.setPermissionList(permissionInfos
.entrySet()
.stream()
.map(e -> AuthorizationSettingDetail.PermissionInfo.of(e.getKey(), e.getValue()))
.collect(Collectors.toList()));
return detail;
}

View File

@ -7,7 +7,7 @@
<parent>
<groupId>org.jetlinks.community</groupId>
<artifactId>jetlinks-manager</artifactId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>device-manager</artifactId>

View File

@ -13,8 +13,12 @@ import org.jetlinks.community.device.enums.DeviceFeature;
import org.jetlinks.community.device.enums.DeviceState;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.device.DeviceInfo;
import org.jetlinks.core.metadata.DeviceMetadata;
import org.jetlinks.core.metadata.MergeOption;
import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono;
import javax.persistence.Column;
import javax.persistence.GeneratedValue;
@ -23,7 +27,7 @@ import javax.persistence.Table;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.Pattern;
import java.sql.JDBCType;
import java.util.Map;
import java.util.*;
import java.util.stream.Stream;
@Getter
@ -158,6 +162,65 @@ public class DeviceInstanceEntity extends GenericEntity<String> implements Recor
return info;
}
public void mergeConfiguration(Map<String, Object> configuration) {
if (this.configuration == null) {
this.configuration = new HashMap<>();
}
Map<String, Object> newConf = new HashMap<>(configuration);
//状态自管理单独设置到feature中
Object selfManageState = newConf.remove(DeviceConfigKey.selfManageState.getKey());
if (null != selfManageState) {
if (Boolean.TRUE.equals(selfManageState)) {
addFeature(DeviceFeature.selfManageState);
} else {
removeFeature(DeviceFeature.selfManageState);
}
}
//物模型单独设置
Object metadata = newConf.remove(DeviceConfigKey.metadata.getKey());
if (null != metadata) {
setDeriveMetadata(String.valueOf(metadata));
}
this.configuration.putAll(newConf);
}
public void removeFeature(DeviceFeature... features) {
if (this.features != null) {
List<DeviceFeature> featureList = new ArrayList<>(Arrays.asList(this.features));
for (DeviceFeature feature : features) {
featureList.remove(feature);
}
this.features = featureList.toArray(new DeviceFeature[0]);
}
}
public Mono<String> mergeMetadata(String metadata) {
return JetLinksDeviceMetadataCodec
.getInstance()
.decode(metadata)
.flatMap(this::mergeMetadata);
}
public Mono<String> mergeMetadata(DeviceMetadata metadata) {
JetLinksDeviceMetadataCodec codec = JetLinksDeviceMetadataCodec.getInstance();
if (StringUtils.isEmpty(this.getDeriveMetadata())) {
return codec.encode(metadata)
.doOnNext(this::setDeriveMetadata);
}
return Mono
.zip(
codec.decode(getDeriveMetadata()),
Mono.just(metadata),
(derive, another) -> derive.merge(another, MergeOption.ignoreExists)
)
.flatMap(codec::encode)
.doOnNext(this::setDeriveMetadata);
}
public void addFeature(DeviceFeature... features) {
if (this.features == null) {
this.features = features;

View File

@ -1,6 +1,7 @@
package org.jetlinks.community.device.message;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.community.PropertyConstants;
import org.jetlinks.core.Values;
import org.jetlinks.core.device.DeviceOperator;
@ -221,6 +222,7 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
if (null == message) {
return Mono.empty();
}
message.addHeader(PropertyConstants.uid, IDGenerator.SNOW_FLAKE_STRING.generate());
return this
.getTopic(message)
.flatMap(topic -> eventBus.publish(topic, message).then())

View File

@ -12,6 +12,8 @@ import org.jetlinks.community.device.enums.DeviceType;
import org.jetlinks.core.Values;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.metadata.ConfigPropertyMetadata;
import org.jetlinks.core.metadata.DeviceMetadata;
import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono;
@ -127,9 +129,29 @@ public class DeviceDetail {
public DeviceDetail notActive() {
state = DeviceState.notActive;
initTags();
return this;
}
private DeviceMetadata decodeMetadata() {
if (StringUtils.isEmpty(metadata)) {
return null;
}
return JetLinksDeviceMetadataCodec.getInstance().doDecode(metadata);
}
private void initTags() {
DeviceMetadata metadata = decodeMetadata();
if (null != metadata) {
with(metadata
.getTags()
.stream()
.map(DeviceTagEntity::of)
.collect(Collectors.toList()));
}
}
public Mono<DeviceDetail> with(DeviceOperator operator, List<ConfigPropertyMetadata> configs) {
return Mono
.zip(
@ -140,7 +162,7 @@ public class DeviceDetail {
//T3: 离线时间
operator.getOfflineTime().defaultIfEmpty(0L),
//T4: 物模型
operator.getMetadata(),
operator.getMetadata().switchIfEmpty(Mono.fromSupplier(this::decodeMetadata)),
//T5: 真实的配置信息
operator.getSelfConfigs(configs
.stream()

View File

@ -4,6 +4,7 @@ import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.MapUtils;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
import org.jetlinks.community.PropertyConstants;
import org.jetlinks.community.device.entity.DeviceInstanceEntity;
import org.jetlinks.community.device.entity.DeviceTagEntity;
import org.jetlinks.community.device.enums.DeviceFeature;
@ -49,6 +50,14 @@ public class DeviceMessageBusinessHandler {
private final EventBus eventBus;
/**
* 自动注册设备信息
* <p>
* 设备消息的header需要包含{@code deviceName},{@code productId}才会自动注册.
*
* @param message 注册消息
* @return 注册后的设备操作接口
*/
private Mono<DeviceOperator> doAutoRegister(DeviceRegisterMessage message) {
//自动注册
return Mono
@ -75,15 +84,18 @@ public class DeviceMessageBusinessHandler {
instance.setCreateTimeNow();
instance.setCreatorId(tps.getT4().getCreatorId());
instance.setOrgId(tps.getT4().getOrgId());
//设备自状态管理
//网关注册设备子设备时,设置自状态管理
//在检查子设备状态时,将会发送ChildDeviceMessage<DeviceStateCheckMessage>到网关
//网关需要回复ChildDeviceMessageReply<DeviceStateCheckMessageReply>
@SuppressWarnings("all")
boolean selfManageState = CastUtils
.castBoolean(tps.getT5().getOrDefault(DeviceConfigKey.selfManageState.getKey(), false));
if (selfManageState) {
instance.addFeature(DeviceFeature.selfManageState);
}
instance.setState(selfManageState ? DeviceState.offline : DeviceState.online);
//合并配置
instance.mergeConfiguration(tps.getT5());
return deviceService
.save(Mono.just(instance))
@ -96,25 +108,29 @@ public class DeviceMessageBusinessHandler {
});
}
@Subscribe("/device/*/*/register")
@Transactional(propagation = Propagation.NEVER)
public Mono<Void> autoRegisterDevice(DeviceRegisterMessage message) {
return registry
.getDevice(message.getDeviceId())
.flatMap(device -> {
//注册消息中修改了配置信息
@SuppressWarnings("all")
Map<String, Object> config = message.getHeader("configuration").map(Map.class::cast).orElse(null);
if (MapUtils.isNotEmpty(config)) {
return device
.setConfigs(config)
return deviceService
.mergeConfiguration(device.getDeviceId(), config, update ->
//更新设备名称
update.set(DeviceInstanceEntity::getName,
message.getHeader(PropertyConstants.deviceName).orElse(null)))
.thenReturn(device);
}
return Mono.just(device);
})
.switchIfEmpty(Mono.defer(() -> {
//自动注册
return doAutoRegister(message);
}))
//注册中心中没有此设备则进行自动注册
.switchIfEmpty(Mono.defer(() -> doAutoRegister(message)))
.then();
}

View File

@ -5,6 +5,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
import org.hswebframework.ezorm.rdb.mapping.ReactiveUpdate;
import org.hswebframework.ezorm.rdb.mapping.defaults.SaveResult;
import org.hswebframework.ezorm.rdb.operator.dml.Terms;
import org.hswebframework.web.crud.service.GenericReactiveCrudService;
@ -60,6 +61,7 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
public LocalDeviceInstanceService(DeviceRegistry registry,
LocalDeviceProductService deviceProductService,
DeviceConfigMetadataManager metadataManager,
@SuppressWarnings("all")
ReactiveRepository<DeviceTagEntity, String> tagRepository) {
this.registry = registry;
this.deviceProductService = deviceProductService;
@ -327,20 +329,20 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
.collect(Collectors.groupingBy(Tuple2::getT1))
.flatMapIterable(Map::entrySet)
.flatMap(group -> {
List<String> deviceId = group
List<String> deviceIdList = group
.getValue()
.stream()
.map(Tuple3::getT2)
.collect(Collectors.toList());
DeviceState state = DeviceState.of(group.getKey());
return Mono
.zip(
return Flux
.concat(
//批量修改设备状态
this.getRepository()
getRepository()
.createUpdate()
.set(DeviceInstanceEntity::getState, state)
.where()
.in(DeviceInstanceEntity::getId, deviceId)
.in(DeviceInstanceEntity::getId, deviceIdList)
.execute()
.thenReturn(group.getValue().size()),
//修改子设备状态
@ -355,6 +357,8 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
.set(DeviceInstanceEntity::getState, state)
.where()
.in(DeviceInstanceEntity::getParentId, parents)
//不修改未激活的状态
.not(DeviceInstanceEntity::getState, DeviceState.notActive)
.nest()
/* */.accept(DeviceInstanceEntity::getFeatures, Terms.Enums.notInAny, DeviceFeature.selfManageState)
/* */.or()
@ -363,10 +367,12 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
.execute())
.defaultIfEmpty(0)
)
.thenReturn(deviceId
.stream()
.map(id -> DeviceStateInfo.of(id, state))
.collect(Collectors.toList()));
.then(Mono.just(
deviceIdList
.stream()
.map(id -> DeviceStateInfo.of(id, state))
.collect(Collectors.toList())
));
}));
}
@ -472,4 +478,35 @@ public class LocalDeviceInstanceService extends GenericReactiveCrudService<Devic
return checker.check(instance);
}
public Mono<Void> mergeConfiguration(String deviceId,
Map<String, Object> configuration,
Function<ReactiveUpdate<DeviceInstanceEntity>,
ReactiveUpdate<DeviceInstanceEntity>> updateOperation) {
if (MapUtils.isEmpty(configuration)) {
return Mono.empty();
}
return this
.findById(deviceId)
.flatMap(device -> {
//合并更新配置
device.mergeConfiguration(configuration);
return createUpdate()
.set(device::getConfiguration)
.set(device::getFeatures)
.set(device::getDeriveMetadata)
.as(updateOperation)
.where(device::getId)
.execute();
})
.then(
//更新缓存里到信息
registry
.getDevice(deviceId)
.flatMap(device -> device.setConfigs(configuration))
)
.then();
}
}

View File

@ -6,10 +6,7 @@ import org.jetlinks.community.device.entity.DeviceProperty;
import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetadata;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.community.timeseries.query.AggregationData;
import org.jetlinks.community.timeseries.query.AggregationQueryParam;
import org.jetlinks.community.timeseries.query.Group;
import org.jetlinks.community.timeseries.query.TimeGroup;
import org.jetlinks.community.timeseries.query.*;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.message.DeviceMessage;
@ -220,9 +217,28 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat
.to(request.to)
.filter(request.filter)
.execute(timeSeriesManager.getService(getPropertyTimeSeriesMetric(productId))::aggregation)
.groupBy(agg -> agg.getString("time", ""))
.groupBy(agg -> agg.getString("time", ""), Integer.MAX_VALUE)
.flatMap(group -> group
.map(AggregationData::asMap)
.map(data -> {
Map<String, Object> newMap = new HashMap<>();
newMap.put("time", data.get("time").orElse(null));
for (DeviceDataService.DevicePropertyAggregation property : properties) {
Object val;
if(property.getAgg() == Aggregation.FIRST || property.getAgg()==Aggregation.TOP){
val = data
.get(property.getProperty())
.orElse(null);
}else {
val = data
.get(property.getAlias())
.orElse(null);
}
if (null != val) {
newMap.put(property.getAlias(), val);
}
}
return newMap;
})
.reduce((a, b) -> {
a.putAll(b);
return a;
@ -231,6 +247,7 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat
.sort(Comparator.<AggregationData, Date>comparing(agg -> DateTime
.parse(agg.getString("time", ""), formatter)
.toDate()).reversed())
.take(request.getLimit())
;
}

View File

@ -223,47 +223,92 @@ public class TimeSeriesRowDeviceDataStoreStoragePolicy extends TimeSeriesDeviceD
}
Map<String, String> propertyAlias = Arrays.stream(properties)
.collect(Collectors.toMap(DeviceDataService.DevicePropertyAggregation::getAlias, DeviceDataService.DevicePropertyAggregation::getProperty));
.collect(Collectors.toMap(DeviceDataService.DevicePropertyAggregation::getAlias,
DeviceDataService.DevicePropertyAggregation::getProperty));
return AggregationQueryParam.of()
Map<String, DeviceDataService.DevicePropertyAggregation> aliasProperty = Arrays
.stream(properties)
.collect(Collectors.toMap(DeviceDataService.DevicePropertyAggregation::getAlias,
Function.identity()));
return AggregationQueryParam
.of()
.as(param -> {
Arrays.stream(properties)
.forEach(agg -> param.agg("numberValue", "value_" + agg.getAlias(), agg.getAgg()));
.forEach(agg -> param.agg("numberValue", "value_" + agg.getAlias(), agg.getAgg()));
return param;
})
.groupBy((Group) new TimeGroup(request.interval, "time", request.format))
.as(param -> {
if (request.interval == null) {
return param;
}
return param.groupBy((Group) new TimeGroup(request.interval, "time", request.format));
})
.groupBy(new LimitGroup("property", "property", properties.length))
.limit(request.limit * properties.length)
.from(request.from)
.to(request.to)
.filter(request.filter)
.filter(query -> query.where().in("property", propertyAlias.values()))
.filter(query -> query
.where()
.in("property", new HashSet<>(propertyAlias.values())))
//执行查询
.execute(timeSeriesManager.getService(getTimeSeriesMetric(productId))::aggregation)
//按时间分组,然后将返回的结果合并起来
.groupBy(agg -> agg.getString("time", ""), Integer.MAX_VALUE)
.flatMap(group ->
{
String time = group.key();
return group
//按属性分组
.groupBy(agg -> agg.getString("property", ""), Integer.MAX_VALUE)
.flatMap(propsGroup -> {
String property = propsGroup.key();
return propsGroup
.<Map<String, Object>>reduceWith(HashMap::new, (a, b) -> {
a.putIfAbsent("time", time);
a.putIfAbsent("_time", b.get("_time").orElseGet(Date::new));
b.get("value_" + property).ifPresent(v -> a.put(property, v));
return a;
});
})
.<Map<String, Object>>reduceWith(HashMap::new, (a, b) -> {
a.putAll(b);
return a;
});
.as(flux -> {
//按时间分组
if (request.getInterval() != null) {
return flux
.flatMap(group -> {
String time = group.key();
return group
//按属性分组
.groupBy(agg -> agg.getString("property", ""), Integer.MAX_VALUE)
.flatMap(propsGroup -> {
String property = String.valueOf(propsGroup.key());
return propsGroup
.reduce(AggregationData::merge)
.map(agg -> {
Map<String, Object> data = new HashMap<>();
data.put("_time", agg.get("_time").orElse(time));
data.put("time", time);
aliasProperty.forEach((alias, prp) -> {
if (prp.getAgg() == Aggregation.FIRST || prp.getAgg() == Aggregation.TOP) {
data.putIfAbsent(alias, agg
.get("numberValue")
.orElse(agg.get("value").orElse(null)));
} else if (property.equals(prp.getProperty())) {
data.putIfAbsent(alias, agg
.get("value_" + alias)
.orElse(0));
}
});
return data;
});
})
.<Map<String, Object>>reduceWith(HashMap::new, (a, b) -> {
a.putAll(b);
return a;
});
}
);
} else {
return flux
.flatMap(group -> group
.reduce(AggregationData::merge)
.map(agg -> {
Map<String, Object> values = new HashMap<>();
//values.put("time", group.key());
for (Map.Entry<String, String> props : propertyAlias.entrySet()) {
values.put(props.getKey(), agg
.get("value_" + props.getKey())
.orElse(0));
}
return values;
}));
}
)
})
.map(map -> {
map.remove("");
propertyAlias
@ -271,8 +316,12 @@ public class TimeSeriesRowDeviceDataStoreStoragePolicy extends TimeSeriesDeviceD
.forEach(key -> map.putIfAbsent(key, 0));
return AggregationData.of(map);
})
.sort(Comparator.<AggregationData, Date>comparing(agg -> CastUtils.castDate(agg.values().get("_time"))).reversed())
.sort(Comparator.<AggregationData, Date>comparing(agg -> CastUtils.castDate(agg
.values()
.get("_time")))
.reversed())
.doOnNext(agg -> agg.values().remove("_time"))
.take(request.getLimit())
;
}

View File

@ -819,5 +819,29 @@ public class DeviceInstanceController implements
.then());
}
//合并产品的物模型
@PutMapping(value = "/{id}/metadata/merge-product")
@SaveAction
@Operation(summary = "合并产品的物模型")
public Mono<Void> mergeProductMetadata(@PathVariable String id) {
return service
.findById(id)
//只有单独保存过物模型的才合并
.filter(deviceInstance -> StringUtils.hasText(deviceInstance.getDeriveMetadata()))
.flatMap(deviceInstance -> productService
.findById(deviceInstance.getProductId())
.flatMap(product -> deviceInstance.mergeMetadata(product.getMetadata()))
.then(
Mono.defer(() -> service
.createUpdate()
.set(deviceInstance::getDeriveMetadata)
.where(deviceInstance::getId)
.execute()
.then(registry.getDevice(deviceInstance.getId()))
.flatMap(device -> device.updateMetadata(deviceInstance.getDeriveMetadata()))
.then())
));
}
}

View File

@ -65,39 +65,44 @@ public class GatewayDeviceController {
@QueryAction
@QueryOperation(summary = "查询网关设备详情")
public Mono<PagerResult<GatewayDeviceInfo>> queryGatewayDevice(@Parameter(hidden = true) QueryParamEntity param) {
return getGatewayProductList()
.flatMap(productIdList ->
param.toNestQuery(query -> query.in(DeviceInstanceEntity::getProductId, productIdList))
.execute(instanceService::queryPager)
.filter(r -> r.getTotal() > 0)
.flatMap(result -> {
Map<String, DeviceInstanceEntity> mapping =
result.getData()
.stream()
.collect(Collectors.toMap(DeviceInstanceEntity::getId, Function.identity()));
return this
.getGatewayProductList()
.flatMap(productIdList -> param
.toNestQuery(query -> query.in(DeviceInstanceEntity::getProductId, productIdList))
.execute(instanceService::queryPager)
.filter(r -> r.getTotal() > 0)
.flatMap(result -> {
Map<String, DeviceInstanceEntity> mapping =
result.getData()
.stream()
.collect(Collectors.toMap(DeviceInstanceEntity::getId, Function.identity()));
//查询所有子设备并按父设备ID分组
return instanceService.createQuery()
.where()
.in(DeviceInstanceEntity::getParentId, mapping.keySet())
.fetch()
.groupBy(DeviceInstanceEntity::getParentId, Integer.MAX_VALUE)
.flatMap(group -> {
String parentId = group.key();
return group
.collectList()
//将父设备和分组的子设备合并在一起
.map(children -> GatewayDeviceInfo.of(mapping.get(parentId), children));
})
.collectMap(GatewayDeviceInfo::getId)//收集所有有子设备的网关设备信息
.defaultIfEmpty(Collections.emptyMap())
.flatMapMany(map -> Flux.fromIterable(mapping.values())
.flatMap(ins -> Mono.justOrEmpty(map.get(ins.getId()))
//处理没有子设备的网关信息
.switchIfEmpty(Mono.fromSupplier(() -> GatewayDeviceInfo.of(ins, Collections.emptyList())))))
.collectList()
.map(list -> PagerResult.of(result.getTotal(), list, param));
}))
//查询所有子设备并按父设备ID分组
return instanceService
.createQuery()
.where()
.in(DeviceInstanceEntity::getParentId, mapping.keySet())
.fetch()
.groupBy(DeviceInstanceEntity::getParentId, Integer.MAX_VALUE)
.flatMap(group -> {
String parentId = group.key();
return group
.collectList()
//将父设备和分组的子设备合并在一起
.map(children -> GatewayDeviceInfo.of(mapping.get(parentId), children));
})
//收集所有有子设备的网关设备信息
.collectMap(GatewayDeviceInfo::getId)
.defaultIfEmpty(Collections.emptyMap())
.flatMapMany(map -> Flux
.fromIterable(mapping.values())
.flatMap(ins -> Mono
.justOrEmpty(map.get(ins.getId()))
//处理没有子设备的网关信息
.switchIfEmpty(Mono.fromSupplier(() -> GatewayDeviceInfo.of(ins, Collections.emptyList())))))
.collectList()
.map(list -> PagerResult.of(result.getTotal(), list, param));
}))
.defaultIfEmpty(PagerResult.empty());
}

View File

@ -7,7 +7,7 @@
<parent>
<groupId>org.jetlinks.community</groupId>
<artifactId>jetlinks-manager</artifactId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
</parent>
<artifactId>logging-manager</artifactId>

View File

@ -7,7 +7,7 @@
<parent>
<groupId>org.jetlinks.community</groupId>
<artifactId>jetlinks-manager</artifactId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>network-manager</artifactId>

View File

@ -7,7 +7,7 @@
<parent>
<groupId>org.jetlinks.community</groupId>
<artifactId>jetlinks-manager</artifactId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>notify-manager</artifactId>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>jetlinks-community</artifactId>
<groupId>org.jetlinks.community</groupId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
</parent>
<packaging>pom</packaging>
<modelVersion>4.0.0</modelVersion>

View File

@ -7,7 +7,7 @@
<parent>
<groupId>org.jetlinks.community</groupId>
<artifactId>jetlinks-manager</artifactId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>rule-engine-manager</artifactId>

View File

@ -225,16 +225,22 @@ public class DeviceAlarmRule implements Serializable {
private List<ConditionFilter> filters;
public Set<String> toColumns() {
Set<String> columns = new LinkedHashSet<>();
columns.add(type.getPropertyPrefix() + "this $this");
return Stream.concat(
(StringUtils.hasText(modelId)
? Collections.singleton(type.getPropertyPrefix() + "this['" + modelId + "'] \"" + modelId + "\"")
: Collections.<String>emptySet()).stream(),
(CollectionUtils.isEmpty(filters)
? Stream.<ConditionFilter>empty()
: filters.stream())
.map(filter -> filter.getColumn(type)))
.collect(Collectors.toSet());
if (StringUtils.hasText(modelId)) {
//this.properties.this['temp'] temp
columns.add(
type.getPropertyPrefix() + "this['" + modelId + "'] \"" + modelId + "\""
);
}
if (!CollectionUtils.isEmpty(filters)) {
for (ConditionFilter filter : filters) {
columns.add(filter.getColumn(type));
}
}
return columns;
}
public List<Object> toFilterBinds() {
@ -255,6 +261,45 @@ public class DeviceAlarmRule implements Serializable {
);
}
public String toSQL(int index, List<String> defaultColumns, List<DeviceAlarmRule.Property> properties) {
List<String> columns = new ArrayList<>(defaultColumns);
List<String> wheres = new ArrayList<>();
// select this.properties.this trigger0
columns.add(getType().getPropertyPrefix() + "this trigger" + index);
columns.addAll(toColumns());
createExpression()
.ifPresent(expr -> wheres.add("(" + expr + ")"));
String sql = "select \n\t\t" + String.join("\n\t\t,", columns) + " \n\tfrom dual ";
if (!wheres.isEmpty()) {
sql += "\n\twhere " + String.join("\n\t\t or ", wheres);
}
if (org.apache.commons.collections.CollectionUtils.isNotEmpty(properties)) {
List<String> newColumns = new ArrayList<>(defaultColumns);
for (DeviceAlarmRule.Property property : properties) {
if (StringUtils.isEmpty(property.getProperty())) {
continue;
}
String alias = StringUtils.hasText(property.getAlias()) ? property.getAlias() : property.getProperty();
// 'message',func(),this[name]
if ((property.getProperty().startsWith("'") && property.getProperty().endsWith("'"))
||
property.getProperty().contains("(") || property.getProperty().contains("[")) {
newColumns.add(property.getProperty() + " \"" + alias + "\"");
} else {
newColumns.add("this['" + property.getProperty() + "'] \"" + alias + "\"");
}
}
if (newColumns.size() > defaultColumns.size()) {
sql = "select \n\t" + String.join("\n\t,", newColumns) + "\n from (\n\t" + sql + "\n) t";
}
}
return sql;
}
public void validate() {
if (type == null) {
throw new IllegalArgumentException("类型不能为空");

View File

@ -4,15 +4,19 @@ import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.exception.BusinessException;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.community.PropertyConstants;
import org.jetlinks.community.ValueObject;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.metadata.Jsonable;
import org.jetlinks.core.utils.FluxUtils;
import org.jetlinks.reactor.ql.ReactorQL;
import org.jetlinks.reactor.ql.ReactorQLContext;
import org.jetlinks.reactor.ql.ReactorQLRecord;
import org.jetlinks.reactor.ql.utils.CastUtils;
import org.jetlinks.rule.engine.api.RuleConstants;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
@ -29,8 +33,10 @@ import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuples;
import javax.annotation.Nonnull;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@AllArgsConstructor
@ -53,16 +59,32 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
static class DeviceAlarmTaskExecutor extends AbstractTaskExecutor {
List<String> default_columns = Arrays.asList(
"timestamp", "deviceId", "this.headers headers", "this.headers.deviceName deviceName"
/**
* 默认要查询的列
*/
static List<String> default_columns = Arrays.asList(
//时间戳
"this.timestamp timestamp",
//设备ID
"this.deviceId deviceId",
//header
"this.headers headers",
//设备名称,通过DeviceMessageConnector自定填充了值
"this.headers.deviceName deviceName",
//消息唯一ID
"this.headers._uid _uid",
//消息类型,下游可以根据消息类型来做处理,比如:离线时,如果网关设备也不在线则不触发.
"this.messageType messageType"
);
private final EventBus eventBus;
private final Scheduler scheduler;
private DeviceAlarmRule rule;
//触发器对应的ReactorQL缓存
private final Map<DeviceAlarmRule.Trigger, ReactorQL> triggerQL = new ConcurrentHashMap<>();
private ReactorQL ql;
//告警规则
private DeviceAlarmRule rule;
DeviceAlarmTaskExecutor(ExecutionContext context,
EventBus eventBus,
@ -70,10 +92,10 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
super(context);
this.eventBus = eventBus;
this.scheduler = scheduler;
rule = createRule();
ql = createQL(rule);
init();
}
@Override
public String getName() {
return "设备告警";
@ -96,146 +118,137 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
.subscribe();
}
void init() {
rule = createRule();
Map<DeviceAlarmRule.Trigger, ReactorQL> ql = createQL(rule);
triggerQL.clear();
triggerQL.putAll(ql);
}
@Override
public void reload() {
rule = createRule();
ql = createQL(rule);
init();
if (disposable != null) {
disposable.dispose();
}
disposable = doStart();
}
@Nonnull
private DeviceAlarmRule createRule() {
DeviceAlarmRule rule = ValueObject
.of(context.getJob().getConfiguration())
.get("rule")
.map(val -> FastBeanCopier.copy(val, new DeviceAlarmRule()))
.orElseThrow(() -> new IllegalArgumentException("告警配置错误"));
.orElseThrow(() -> new IllegalArgumentException("error.alarm_configuration_error"));
rule.validate();
return rule;
}
@Override
public void validate() {
DeviceAlarmRule rule = createRule();
try {
createQL(rule);
createQL(createRule());
} catch (Exception e) {
throw new IllegalArgumentException("配置错误:" + e.getMessage(), e);
throw new BusinessException("error.configuration_error", 500, e.getMessage(), e);
}
}
private ReactorQL createQL(DeviceAlarmRule rule) {
List<String> columns = new ArrayList<>(default_columns);
List<String> wheres = new ArrayList<>();
List<DeviceAlarmRule.Trigger> triggers = rule.getTriggers();
for (int i = 0; i < triggers.size(); i++) {
DeviceAlarmRule.Trigger trigger = triggers.get(i);
// select this.properties.this trigger0
columns.add(trigger.getType().getPropertyPrefix() + "this trigger" + i);
columns.addAll(trigger.toColumns());
trigger.createExpression()
.ifPresent(expr -> wheres.add("(" + expr + ")"));
}
String sql = "select \n\t\t" + String.join("\n\t\t,", columns) + " \n\tfrom dual ";
if (!wheres.isEmpty()) {
sql += "\n\twhere " + String.join("\n\t\t or ", wheres);
}
if (CollectionUtils.isNotEmpty(rule.getProperties())) {
List<String> newColumns = new ArrayList<>(Arrays.asList(
"this.deviceName deviceName",
"this.deviceId deviceId",
"this.headers headers",
"this.timestamp timestamp"));
for (DeviceAlarmRule.Property property : rule.getProperties()) {
if (StringUtils.isEmpty(property.getProperty())) {
continue;
}
String alias = StringUtils.hasText(property.getAlias()) ? property.getAlias() : property.getProperty();
// 'message',func(),this[name]
if ((property.getProperty().startsWith("'") && property.getProperty().endsWith("'"))
||
property.getProperty().contains("(") || property.getProperty().contains("[")) {
newColumns.add(property.getProperty() + " \"" + alias + "\"");
} else {
newColumns.add("this['" + property.getProperty() + "'] \"" + alias + "\"");
}
}
if (newColumns.size() > 4) {
sql = "select \n\t" + String.join("\n\t,", newColumns) + "\n from (\n\t" + sql + "\n) t";
}
}
static ReactorQL createQL(int index, DeviceAlarmRule.Trigger trigger, DeviceAlarmRule rule) {
String sql = trigger.toSQL(index, default_columns, rule.getProperties());
log.debug("create device alarm sql : \n{}", sql);
return ReactorQL.builder().sql(sql).build();
}
public Flux<Map<String, Object>> doSubscribe(EventBus eventBus) {
Set<String> topics = new HashSet<>();
private Map<DeviceAlarmRule.Trigger, ReactorQL> createQL(DeviceAlarmRule rule) {
Map<DeviceAlarmRule.Trigger, ReactorQL> qlMap = new HashMap<>();
int index = 0;
for (DeviceAlarmRule.Trigger trigger : rule.getTriggers()) {
qlMap.put(trigger, createQL(index++, trigger, rule));
}
return qlMap;
}
List<Object> binds = new ArrayList<>();
public Flux<Map<String, Object>> doSubscribe(EventBus eventBus) {
//满足触发条件的输出数据流
List<Flux<? extends Map<String, Object>>> triggerOutputs = new ArrayList<>();
int index = 0;
//上游节点的输入
//定时触发时: 定时节点输出到设备指令节点,设备指令节点输出到当前节点
Flux<RuleData> input = context
.getInput()
.accept()
//使用cache,多个定时收到相同的数据
//通过header来进行判断具体是哪个触发器触发的,应该还有更好的方式.
.cache(0);
for (DeviceAlarmRule.Trigger trigger : rule.getTriggers()) {
binds.addAll(trigger.toFilterBinds());
//since 1.11 定时触发的不从eventBus订阅
if (trigger.getTrigger() == DeviceAlarmRule.TriggerType.timer) {
//QL不存在,理论上不会发生
ReactorQL ql = triggerQL.get(trigger);
if (ql == null) {
log.warn("DeviceAlarmRule trigger {} init error", index);
continue;
}
Flux<? extends Map<String, Object>> datasource;
String topic = trigger
.getType()
.getTopic(rule.getProductId(), rule.getDeviceId(), trigger.getModelId());
topics.add(topic);
}
int currentIndex = index;
//since 1.11 定时触发的不从eventBus订阅
if (trigger.getTrigger() == DeviceAlarmRule.TriggerType.timer) {
//从上游获取输入进行处理(通常是定时触发发送指令后得到的回复)
datasource = input
.filter(data -> {
//通过上游输出的header来判断是否为同一个触发规则还有更好的方式?
return data
.getHeader("triggerIndex")
.map(idx -> CastUtils.castNumber(idx).intValue() == currentIndex)
.orElse(true);
})
.flatMap(RuleData::dataToMap);
}
//从事件总线中订阅数据
else {
String topic = trigger
.getType()
.getTopic(rule.getProductId(), rule.getDeviceId(), trigger.getModelId());
List<Flux<? extends Map<String, Object>>> inputs = new ArrayList<>();
//从上游获取输入进行处理(通常是定时触发发送指令后得到的回复)
inputs.add(
context
.getInput()
.accept()
.flatMap(RuleData::dataToMap)
);
//从事件总线订阅数据进行处理
if (!topics.isEmpty()) {
Subscription subscription = Subscription.of(
"device_alarm:" + rule.getId(),
topics.toArray(new String[0]),
Subscription.Feature.local
);
inputs.add(
eventBus
//从事件总线订阅数据进行处理
Subscription subscription = Subscription.of(
"device_alarm:" + rule.getId() + ":" + index++,
topic,
Subscription.Feature.local
);
datasource = eventBus
.subscribe(subscription, DeviceMessage.class)
.map(Jsonable::toJson)
.doOnNext(json -> {
.map(Jsonable::toJson);
}
ReactorQLContext qlContext = ReactorQLContext
.ofDatasource((t) -> datasource
.doOnNext(map -> {
if (StringUtils.hasText(rule.getDeviceName())) {
json.putIfAbsent("deviceName", rule.getDeviceName());
map.putIfAbsent("deviceName", rule.getDeviceName());
}
if (StringUtils.hasText(rule.getProductName())) {
json.putIfAbsent("productName", rule.getProductName());
map.putIfAbsent("productName", rule.getProductName());
}
json.put("productId", rule.getProductId());
json.put("alarmId", rule.getId());
json.put("alarmName", rule.getName());
})
);
map.put("productId", rule.getProductId());
map.put("alarmId", rule.getId());
map.put("alarmName", rule.getName());
}));
//绑定SQL中的预编译变量
trigger.toFilterBinds().forEach(qlContext::bind);
//启动ReactorQL进行实时数据处理
triggerOutputs.add(ql.start(qlContext).map(ReactorQLRecord::asMap));
}
ReactorQLContext context = ReactorQLContext
.ofDatasource(ignore -> Flux.merge(inputs));
binds.forEach(context::bind);
Flux<Map<String, Object>> resultFlux = (ql == null ? ql = createQL(rule) : ql)
.start(context)
.map(ReactorQLRecord::asMap);
Flux<Map<String, Object>> resultFlux = Flux.merge(triggerOutputs);
//防抖
ShakeLimit shakeLimit;
if ((shakeLimit = rule.getShakeLimit()) != null) {
@ -246,6 +259,7 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
//规则已经指定了固定的设备,直接开启时间窗口就行
? flux.window(duration, scheduler)
//规则配置在设备产品上,则按设备ID分组后再开窗口
//设备越多,消耗的内存越大
: flux
.groupBy(map -> String.valueOf(map.get("deviceId")), Integer.MAX_VALUE)
.flatMap(group -> group.window(duration, scheduler), Integer.MAX_VALUE),
@ -254,6 +268,17 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
}
return resultFlux
.as(result -> {
//有多个触发条件时对重复的数据进行去重,
//防止同时满足条件时会产生多个告警记录
if (rule.getTriggers().size() > 1) {
return result
.as(FluxUtils.distinct(
map -> map.getOrDefault(PropertyConstants.uid.getKey(), ""),
Duration.ofSeconds(1)));
}
return result;
})
.flatMap(map -> {
@SuppressWarnings("all")
Map<String, Object> headers = (Map<String, Object>) map.remove("headers");
@ -287,8 +312,6 @@ public class DeviceAlarmTaskExecutorProvider implements TaskExecutorProvider {
//生成告警记录时生成ID方便下游做处理
map.putIfAbsent("id", IDGenerator.MD5.generate());
// 推送告警信息到消息网关中
// /rule-engine/device/alarm/{productId}/{deviceId}/{ruleId}
return eventBus
.publish(String.format(
"/rule-engine/device/alarm/%s/%s/%s",

View File

@ -3,6 +3,7 @@ package org.jetlinks.community.rule.engine.model;
import com.alibaba.fastjson.JSON;
import org.apache.commons.collections.CollectionUtils;
import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.exception.BusinessException;
import org.jetlinks.community.rule.engine.device.DeviceAlarmRule;
import org.jetlinks.community.rule.engine.entity.DeviceAlarmEntity;
import org.jetlinks.community.rule.engine.executor.DeviceMessageSendTaskExecutorProvider;
@ -32,6 +33,7 @@ public class DeviceAlarmModelParser implements RuleModelParserStrategy {
@Override
public RuleModel parse(String modelDefineString) {
//模型就是DeviceAlarmEntity的json
DeviceAlarmEntity rule = FastBeanCopier.copy(JSON.parseObject(modelDefineString), DeviceAlarmEntity::new);
RuleModel model = new RuleModel();
@ -39,28 +41,33 @@ public class DeviceAlarmModelParser implements RuleModelParserStrategy {
model.setName(rule.getName());
DeviceAlarmRule alarmRule = rule.getAlarmRule();
//验证规则
alarmRule.validate();
//告警条件节点
RuleNodeModel conditionNode = new RuleNodeModel();
conditionNode.setId("conditions");
conditionNode.setName("警条件");
conditionNode.setName("警条件");
conditionNode.setExecutor("device_alarm");
conditionNode.setConfiguration(Collections.singletonMap("rule", rule.getAlarmRule()));
//处理定时触发
//处理定时触发(定时向设备发送指令并获取返回结果)
{
List<DeviceAlarmRule.Trigger> timerTriggers = alarmRule
.getTriggers()
.stream()
//定时节点
.filter(trigger -> trigger.getTrigger() == DeviceAlarmRule.TriggerType.timer)
.collect(Collectors.toList());
int index = 0;
for (DeviceAlarmRule.Trigger timerTrigger : timerTriggers) {
DeviceMessage msg = timerTrigger.getType().createMessage(timerTrigger).orElse(null);
if (msg == null) {
throw new UnsupportedOperationException("不支持定时条件类型:" + timerTrigger.getType());
throw new BusinessException("error.unsupported_timing_condition_type", 500, timerTrigger.getType());
}
//定时节点
//TimerTaskExecutorProvider
RuleNodeModel timer = new RuleNodeModel();
timer.setId("timer:" + (index));
timer.setName("定时发送设备消息");
@ -68,30 +75,42 @@ public class DeviceAlarmModelParser implements RuleModelParserStrategy {
timer.setConfiguration(Collections.singletonMap("cron", timerTrigger.getCron()));
//发送指令节点
//DeviceMessageSendTaskExecutorProvider
DeviceMessageSendTaskExecutorProvider.DeviceMessageSendConfig senderDeviceMessageSendConfig = new DeviceMessageSendTaskExecutorProvider.DeviceMessageSendConfig();
//同步等待回复
senderDeviceMessageSendConfig.setAsync(false);
//直接发送不管设备是否在线
senderDeviceMessageSendConfig.setStateOperator("direct");
senderDeviceMessageSendConfig.setDeviceId(alarmRule.getDeviceId());
senderDeviceMessageSendConfig.setProductId(alarmRule.getProductId());
senderDeviceMessageSendConfig.setMessage(msg.toJson());
// 添加自定义响应头到RuleData中
// 用于在收到结果时,判断是由哪个触发条件触发的
// 因为所有告警节点只有一个,所有的定时执行结果都会输入到同一个节点中
senderDeviceMessageSendConfig.setResponseHeaders(Collections.singletonMap("triggerIndex", index));
//设备指令发送节点
//DeviceMessageSendTaskExecutorProvider
RuleNodeModel messageSender = new RuleNodeModel();
messageSender.setId("message-sender:" + (index));
messageSender.setName("定时发送设备消息");
messageSender.setExecutor("device-message-sender");
messageSender.setConfiguration(senderDeviceMessageSendConfig.toMap());
//连接定时和设备指令节点
RuleLink link = new RuleLink();
link.setId(timer.getId().concat(":").concat(messageSender.getId()));
link.setName("执行动作:" + index);
link.setName("发送指令:" + index);
link.setSource(timer);
link.setTarget(messageSender);
//timer -> device-message-sender
timer.getOutputs().add(link);
//device-message-sender -> timer
messageSender.getInputs().add(link);
//添加定时和消息节点到模型
model.getNodes().add(timer);
model.getNodes().add(messageSender);
//输出传递到告警节点
//设备指令和告警条件节点连接起来
RuleLink toAlarm = new RuleLink();
toAlarm.setId(messageSender.getId().concat(":").concat(conditionNode.getId()));
toAlarm.setName("定时触发告警:" + index);
@ -103,7 +122,9 @@ public class DeviceAlarmModelParser implements RuleModelParserStrategy {
}
}
//添加告警条件到模型
model.getNodes().add(conditionNode);
//执行动作
if (CollectionUtils.isNotEmpty(rule.getAlarmRule().getActions())) {
int index = 0;
for (Action operation : rule.getAlarmRule().getActions()) {

View File

@ -7,7 +7,7 @@
<parent>
<groupId>org.jetlinks.community</groupId>
<artifactId>jetlinks-manager</artifactId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>visualization-manager</artifactId>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>jetlinks-community</artifactId>
<groupId>org.jetlinks.community</groupId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@ -30,7 +30,7 @@ import java.util.List;
title = "物联网平台",
description = "物联网平台接口文档",
contact = @Contact(name = "admin",url = "https://github.com/jetlinks"),
version = "1.12.0-SNAPSHOT"
version = "1.12.0"
)
)
@SecuritySchemes(

View File

@ -25,7 +25,9 @@ spring:
# database: 3
# max-wait: 10s
r2dbc:
# 需要手动创建数据库,启动会自动创建表,修改了配置easyorm相关配置也要修改
url: r2dbc:postgresql://localhost:5432/jetlinks
# url: r2dbc:mysql://localhost:3306/jetlinks?ssl=false&serverZoneId=Asia/Shanghai # 修改了配置easyorm相关配置也要修改
username: postgres
password: jetlinks
pool:

37
pom.xml
View File

@ -6,7 +6,7 @@
<groupId>org.jetlinks.community</groupId>
<artifactId>jetlinks-community</artifactId>
<version>1.12.0-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
<modules>
<module>jetlinks-components</module>
<module>jetlinks-manager</module>
@ -19,17 +19,19 @@
<spring.boot.version>2.3.11.RELEASE</spring.boot.version>
<java.version>1.8</java.version>
<project.build.jdk>${java.version}</project.build.jdk>
<hsweb.framework.version>4.0.13-SNAPSHOT</hsweb.framework.version>
<easyorm.version>4.0.13-SNAPSHOT</easyorm.version>
<hsweb.framework.version>4.0.14-SNAPSHOT</hsweb.framework.version>
<easyorm.version>4.0.14-SNAPSHOT</easyorm.version>
<hsweb.expands.version>3.0.2</hsweb.expands.version>
<jetlinks.version>1.1.9-SNAPSHOT</jetlinks.version>
<jetlinks.version>1.1.10-SNAPSHOT</jetlinks.version>
<r2dbc.version>Arabba-SR10</r2dbc.version>
<vertx.version>3.8.5</vertx.version>
<netty.version>4.1.51.Final</netty.version>
<vertx.version>4.2.3</vertx.version>
<netty.version>4.1.73.Final</netty.version>
<elasticsearch.version>7.11.2</elasticsearch.version>
<reactor.excel.version>1.0.1</reactor.excel.version>
<reactor.ql.version>1.0.13</reactor.ql.version>
<fastjson.version>1.2.70</fastjson.version>
<log4j.version>2.17.1</log4j.version>
<logback.version>1.2.9</logback.version>
</properties>
<build>
@ -171,6 +173,24 @@
<dependencies>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-bom</artifactId>
@ -350,8 +370,7 @@
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
<version>2.4.17</version>
<artifactId>groovy</artifactId>
</dependency>
<dependency>
@ -363,7 +382,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
<version>1.7.32</version>
</dependency>
<dependency>