优化设备数据存储

This commit is contained in:
zhou-hao 2020-08-18 18:17:58 +08:00
parent f4496a782c
commit 8dc7bb9c8d
13 changed files with 573 additions and 292 deletions

View File

@ -1,6 +1,5 @@
package org.jetlinks.community.elastic.search.aggreation;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
@ -21,9 +20,9 @@ import org.jetlinks.community.elastic.search.aggreation.enums.BucketType;
import org.jetlinks.community.elastic.search.aggreation.enums.MetricsType;
import org.jetlinks.community.elastic.search.aggreation.enums.OrderType;
import org.jetlinks.community.elastic.search.aggreation.metrics.MetricsAggregationStructure;
import org.jetlinks.community.elastic.search.aggreation.metrics.MetricsResponse;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager;
import org.jetlinks.community.elastic.search.service.AggregationService;
import org.jetlinks.community.elastic.search.service.DefaultElasticSearchService;
import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter;
import org.jetlinks.community.elastic.search.utils.ReactorActionListener;
import org.jetlinks.community.timeseries.query.AggregationQueryParam;
@ -33,7 +32,6 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import java.time.Duration;
import java.util.*;
import java.util.stream.Collectors;
@ -56,44 +54,47 @@ public class DefaultAggregationService implements AggregationService {
this.indexManager = indexManager;
}
@Override
public Mono<MetricsResponse> metricsAggregation(String index, QueryParam queryParam,
MetricsAggregationStructure structure) {
return createSearchSourceBuilder(queryParam, index)
.map(builder -> new SearchRequest(index)
.source(builder.aggregation(structure.getType().aggregationBuilder(structure.getName(), structure.getField()))))
.flatMap(request -> Mono.<SearchResponse>create(monoSink ->
restClient.getQueryClient().searchAsync(request, RequestOptions.DEFAULT, translatorActionListener(monoSink))))
.map(searchResponse -> structure.getType().getResponse(structure.getName(), searchResponse));
}
@Override
public Mono<BucketResponse> bucketAggregation(String index, QueryParam queryParam, BucketAggregationsStructure structure) {
return createSearchSourceBuilder(queryParam, index)
.map(builder -> new SearchRequest(index)
.source(builder.aggregation(structure.getType().aggregationBuilder(structure))))
.doOnNext(searchRequest -> {
if (log.isDebugEnabled()) {
log.debug("聚合查询ElasticSearch:{},参数:{}", index, JSON.toJSON(searchRequest.source().toString()));
}
})
.flatMap(request -> Mono.<SearchResponse>create(monoSink ->
restClient
.getQueryClient()
.searchAsync(request, RequestOptions.DEFAULT, translatorActionListener(monoSink))))
.map(response -> BucketResponse.builder()
.name(structure.getName())
.buckets(structure.getType().convert(response.getAggregations().get(structure.getName())))
.build())
;
}
// @Override
// public Mono<MetricsResponse> metricsAggregation(String index, QueryParam queryParam,
// MetricsAggregationStructure structure) {
// return createSearchSourceBuilder(queryParam, index)
// .map(builder -> new SearchRequest(index)
// .source(builder.aggregation(structure.getType().aggregationBuilder(structure.getName(), structure.getField()))))
// .flatMap(request -> Mono.<SearchResponse>create(monoSink ->
// restClient.getQueryClient().searchAsync(request, RequestOptions.DEFAULT, translatorActionListener(monoSink))))
// .map(searchResponse -> structure.getType().getResponse(structure.getName(), searchResponse));
// }
//
// @Override
// public Mono<BucketResponse> bucketAggregation(String index, QueryParam queryParam, BucketAggregationsStructure structure) {
// return createSearchSourceBuilder(queryParam, index)
// .map(builder -> new SearchRequest(index)
// .source(builder
// .aggregation(structure.getType().aggregationBuilder(structure))
// //.aggregation(AggregationBuilders.topHits("last_val").from(1))
// ))
//// .doOnNext(searchRequest -> {
//// if (log.isDebugEnabled()) {
//// log.debug("聚合查询ElasticSearch:{},参数:{}", index, JSON.toJSON(searchRequest.source().toString()));
//// }
//// })
// .flatMap(request -> Mono.<SearchResponse>create(monoSink ->
// restClient
// .getQueryClient()
// .searchAsync(request, RequestOptions.DEFAULT, translatorActionListener(monoSink))))
// .map(response -> BucketResponse.builder()
// .name(structure.getName())
// .buckets(structure.getType().convert(response.getAggregations().get(structure.getName())))
// .build())
// ;
//
// }
private Mono<SearchSourceBuilder> createSearchSourceBuilder(QueryParam queryParam, String index) {
return indexManager.getIndexMetadata(index)
.map(metadata -> ElasticSearchConverter.convertSearchSourceBuilder(queryParam, metadata))
.doOnError(e -> log.error("解析queryParam错误:{}", index, e));
return indexManager
.getIndexMetadata(index)
.map(metadata -> ElasticSearchConverter.convertSearchSourceBuilder(queryParam, metadata));
}
private <T> ActionListener<T> translatorActionListener(MonoSink<T> sink) {
@ -119,27 +120,31 @@ public class DefaultAggregationService implements AggregationService {
}
@Override
public Flux<Map<String, Object>> aggregation(String index, AggregationQueryParam aggregationQueryParam) {
public Flux<Map<String, Object>> aggregation(String[] index, AggregationQueryParam aggregationQueryParam) {
QueryParam queryParam = prepareQueryParam(aggregationQueryParam);
BucketAggregationsStructure structure = createAggParameter(aggregationQueryParam);
return indexManager
.getIndexStrategy(index)
return Flux.fromArray(index)
.flatMap(idx -> Mono.zip(indexManager.getIndexStrategy(idx), Mono.just(idx)))
.collectList()
.flatMap(strategy ->
createSearchSourceBuilder(queryParam, index)
createSearchSourceBuilder(queryParam, index[0])
.map(builder ->
new SearchRequest(strategy.getIndexForSearch(index))
.source(builder.aggregation(structure.getType().aggregationBuilder(structure)))))
.doOnNext(searchRequest -> {
if (log.isDebugEnabled()) {
log.debug("聚合查询ElasticSearch:{},参数:{}", index, JSON.toJSON(searchRequest.source().toString()));
}
})
new SearchRequest(strategy
.stream()
.map(tp2 -> tp2.getT1().getIndexForSearch(tp2.getT2()))
.toArray(String[]::new))
.indicesOptions(DefaultElasticSearchService.indexOptions)
.source(builder.size(0).aggregation(structure.getType().aggregationBuilder(structure))
)
)
)
.flatMap(searchRequest ->
ReactorActionListener
.<SearchResponse>mono(listener ->
restClient.getQueryClient()
.searchAsync(searchRequest, RequestOptions.DEFAULT, listener)
))
.filter(response -> response.getAggregations() != null)
.map(response -> BucketResponse.builder()
.name(structure.getName())
.buckets(structure.getType().convert(response.getAggregations().get(structure.getName())))
@ -151,7 +156,7 @@ public class DefaultAggregationService implements AggregationService {
static class BucketsParser {
private List<Map<String, Object>> result = new ArrayList<>();
private final List<Map<String, Object>> result = new ArrayList<>();
public static List<Map<String, Object>> convert(BucketResponse response) {
return new BucketsParser(response).result;
@ -261,20 +266,4 @@ public class DefaultAggregationService implements AggregationService {
return structure;
}).collect(Collectors.toList());
}
protected static String durationFormat(Duration duration) {
String durationStr = duration.toString();
if (durationStr.contains("S")) {
return duration.toMillis() / 1000 + "s";
} else if (!durationStr.contains("S") && durationStr.contains("M")) {
return duration.toMinutes() + "m";
} else if (!durationStr.contains("S") && !durationStr.contains("M")) {
if (duration.toHours() % 24 == 0) {
return duration.toDays() + "d";
} else {
return duration.toHours() + "h";
}
}
throw new UnsupportedOperationException("不支持的时间周期:" + duration.toString());
}
}

View File

@ -6,6 +6,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -20,11 +21,11 @@ public class DefaultElasticSearchIndexManager implements ElasticSearchIndexManag
@Getter
@Setter
private Map<String, String> indexUseStrategy = new ConcurrentHashMap<>();
private Map<String, String> indexUseStrategy = new HashMap<>();
private Map<String, ElasticSearchIndexStrategy> strategies = new ConcurrentHashMap<>();
private final Map<String, ElasticSearchIndexStrategy> strategies = new ConcurrentHashMap<>();
private Map<String, ElasticSearchIndexMetadata> indexMetadataStore = new ConcurrentHashMap<>();
private final Map<String, ElasticSearchIndexMetadata> indexMetadataStore = new ConcurrentHashMap<>();
public DefaultElasticSearchIndexManager(List<ElasticSearchIndexStrategy> strategies) {
strategies.forEach(this::registerStrategy);
@ -55,7 +56,12 @@ public class DefaultElasticSearchIndexManager implements ElasticSearchIndexManag
.switchIfEmpty(Mono.error(() -> new IllegalArgumentException("[" + index + "] 不支持任何索引策略")));
}
protected void registerStrategy(ElasticSearchIndexStrategy strategy) {
@Override
public void useStrategy(String index, String strategy) {
indexUseStrategy.put(index, strategy);
}
public void registerStrategy(ElasticSearchIndexStrategy strategy) {
strategies.put(strategy.getId(), strategy);
}

View File

@ -1,13 +1,65 @@
package org.jetlinks.community.elastic.search.index;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface ElasticSearchIndexManager {
/**
* 更新索引
*
* @param index 索引元数据
* @return 更新结果
*/
Mono<Void> putIndex(ElasticSearchIndexMetadata index);
/**
* 获取索引元数据
*
* @param index 索引名称
* @return 索引元数据
*/
Mono<ElasticSearchIndexMetadata> getIndexMetadata(String index);
/**
* 获取多个所有元数据
* @param index 索引名称
* @return 索引元数据
*/
default Flux<ElasticSearchIndexMetadata> getIndexesMetadata(String... index) {
return Flux
.fromArray(index)
.flatMap(this::getIndexMetadata);
}
/**
* 获取索引策略
*
* @param index 索引名称
* @return 索引策略
* @see ElasticSearchIndexStrategy
*/
Mono<ElasticSearchIndexStrategy> getIndexStrategy(String index);
default Flux<ElasticSearchIndexStrategy> getIndexesStrategy(String... index){
return Flux
.fromArray(index)
.flatMap(this::getIndexStrategy);
}
/**
* 设置索引策略
*
* @param index 索引策略
* @param strategy 策略标识
*/
void useStrategy(String index, String strategy);
/**
* 注册索引策略
*
* @param strategy 策略
*/
void registerStrategy(ElasticSearchIndexStrategy strategy);
}

View File

@ -1,14 +1,7 @@
package org.jetlinks.community.elastic.search.service;
import org.hswebframework.ezorm.core.param.QueryParam;
import org.jetlinks.community.elastic.search.aggreation.bucket.BucketAggregationsStructure;
import org.jetlinks.community.elastic.search.aggreation.bucket.BucketResponse;
import org.jetlinks.community.elastic.search.aggreation.metrics.MetricsAggregationStructure;
import org.jetlinks.community.elastic.search.aggreation.metrics.MetricsResponse;
import org.jetlinks.community.elastic.search.index.ElasticIndex;
import org.jetlinks.community.timeseries.query.AggregationQueryParam;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Map;
@ -18,13 +11,6 @@ import java.util.Map;
**/
public interface AggregationService {
Mono<MetricsResponse> metricsAggregation(String index, QueryParam queryParam, MetricsAggregationStructure structure);
Flux<Map<String, Object>> aggregation(String[] index, AggregationQueryParam queryParam);
Mono<BucketResponse> bucketAggregation(String index, QueryParam queryParam, BucketAggregationsStructure structure);
Flux<Map<String, Object>> aggregation(String index, AggregationQueryParam queryParam);
default Flux<Map<String, Object>> aggregation(ElasticIndex index, AggregationQueryParam queryParam) {
return aggregation(index.getIndex(), queryParam);
}
}

View File

@ -1,6 +1,5 @@
package org.jetlinks.community.elastic.search.service;
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.SneakyThrows;
@ -11,21 +10,31 @@ import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.hswebframework.ezorm.core.param.QueryParam;
import org.hswebframework.utils.time.DateFormatter;
import org.hswebframework.utils.time.DefaultDateFormatter;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.jetlinks.core.utils.FluxUtils;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.community.elastic.search.ElasticRestClient;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter;
import org.jetlinks.community.elastic.search.utils.QueryParamTranslator;
import org.jetlinks.community.elastic.search.utils.ReactorActionListener;
import org.jetlinks.core.utils.FluxUtils;
import org.reactivestreams.Publisher;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Service;
@ -34,6 +43,9 @@ import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.function.Consumer3;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import javax.annotation.PreDestroy;
@ -58,6 +70,10 @@ public class DefaultElasticSearchService implements ElasticSearchService {
FluxSink<Buffer> sink;
public static final IndicesOptions indexOptions = IndicesOptions.fromOptions(
true, true, false, false
);
static {
DateFormatter.supportFormatter.add(new DefaultDateFormatter(Pattern.compile("[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.+"), "yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
}
@ -69,33 +85,112 @@ public class DefaultElasticSearchService implements ElasticSearchService {
this.indexManager = indexManager;
}
@Override
public <T> Flux<T> multiQuery(String[] index, Collection<QueryParam> queryParam, Function<Map<String, Object>, T> mapper) {
return indexManager
.getIndexesMetadata(index)
.flatMap(idx -> Mono.zip(
Mono.just(idx), getIndexForSearch(idx.getIndex())
))
.take(1)
.singleOrEmpty()
.flatMapMany(indexMetadata -> {
MultiSearchRequest request = new MultiSearchRequest();
return Flux
.fromIterable(queryParam)
.flatMap(entry -> createSearchRequest(entry, index))
.doOnNext(request::add)
.then(Mono.just(request))
.flatMapMany(searchRequest -> ReactorActionListener
.<MultiSearchResponse>mono(actionListener -> {
restClient.getQueryClient()
.msearchAsync(searchRequest, RequestOptions.DEFAULT, actionListener);
})
.flatMapMany(response -> Flux.fromArray(response.getResponses()))
.flatMap(item -> {
if (item.isFailure()) {
log.warn(item.getFailureMessage(), item.getFailure());
return Mono.empty();
}
return Flux.fromIterable(translate((map) -> mapper.apply(indexMetadata.getT1().convertFromElastic(map)), item.getResponse()));
}))
;
});
}
public <T> Flux<T> query(String index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
return doSearch(createSearchRequest(queryParam, index))
.flatMapIterable(response -> translate(mapper, response))
.onErrorResume(err -> {
log.error("query elastic error", err);
return this
.doQuery(new String[]{index}, queryParam)
.flatMapMany(tp2 -> convertQueryResult(tp2.getT1(), tp2.getT2(), mapper));
}
public <T> Flux<T> query(String[] index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
return this
.doQuery(index, queryParam)
.flatMapMany(tp2 -> convertQueryResult(tp2.getT1(), tp2.getT2(), mapper));
}
@Override
public <T> Mono<PagerResult<T>> queryPager(String[] index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
return this
.doQuery(index, queryParam)
.flatMap(tp2 ->
convertQueryResult(tp2.getT1(), tp2.getT2(), mapper)
.collectList()
.filter(CollectionUtils::isNotEmpty)
.map(list -> PagerResult.of((int) tp2.getT2().getHits().getTotalHits(), list, queryParam))
)
.switchIfEmpty(Mono.fromSupplier(PagerResult::empty));
}
private <T> Flux<T> convertQueryResult(List<ElasticSearchIndexMetadata> indexList,
SearchResponse response,
Function<Map<String, Object>, T> mapper) {
return Flux
.create(sink -> {
Map<String, ElasticSearchIndexMetadata> metadata = indexList
.stream()
.collect(Collectors.toMap(ElasticSearchIndexMetadata::getIndex, Function.identity()));
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
Map<String, Object> hitMap = hit.getSourceAsMap();
if (StringUtils.isEmpty(hitMap.get("id"))) {
hitMap.put("id", hit.getId());
}
sink.next(mapper
.apply(Optional
.ofNullable(metadata.get(hit.getIndex())).orElse(indexList.get(0))
.convertFromElastic(hitMap)));
}
sink.complete();
});
}
private Mono<Tuple2<List<ElasticSearchIndexMetadata>, SearchResponse>> doQuery(String[] index,
QueryParam queryParam) {
return indexManager
.getIndexesMetadata(index)
.collectList()
.filter(CollectionUtils::isNotEmpty)
.flatMap(metadataList -> this
.createSearchRequest(queryParam, metadataList)
.flatMap(this::doSearch)
.map(response -> Tuples.of(metadataList, response))
).onErrorResume(err -> {
log.error(err.getMessage(), err);
return Mono.empty();
});
}
@Override
public <T> Mono<PagerResult<T>> queryPager(String index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
return doSearch(createSearchRequest(queryParam, index))
.map(response -> translatePageResult(mapper, queryParam, response))
.onErrorReturn(err -> {
log.error("query elastic error", err);
return true;
}, PagerResult.empty())
.defaultIfEmpty(PagerResult.empty());
}
@Override
public Mono<Long> count(String index, QueryParam queryParam) {
public Mono<Long> count(String[] index, QueryParam queryParam) {
QueryParam param = queryParam.clone();
param.setPaging(false);
param.setSorts(Collections.emptyList());
return doCount(createCountRequest(param, index))
return createCountRequest(param, index)
.flatMap(this::doCount)
.map(CountResponse::getCount)
.defaultIfEmpty(0L)
.onErrorReturn(err -> {
@ -104,20 +199,32 @@ public class DefaultElasticSearchService implements ElasticSearchService {
}, 0L);
}
@Override
public Mono<Long> delete(String index, QueryParam queryParam) {
return createQueryBuilder(queryParam, index)
.flatMap(request -> ReactorActionListener
.<BulkByScrollResponse>mono(listener ->
restClient
.getWriteClient()
.deleteByQueryAsync(new DeleteByQueryRequest(index)
.setQuery(request),
RequestOptions.DEFAULT, listener)))
.map(BulkByScrollResponse::getDeleted);
}
@Override
public <T> Mono<Void> commit(String index, T payload) {
return Mono.fromRunnable(() -> {
sink.next(new Buffer(index, payload));
});
sink.next(new Buffer(index, payload));
return Mono.empty();
}
@Override
public <T> Mono<Void> commit(String index, Collection<T> payload) {
return Mono.fromRunnable(() -> {
for (T t : payload) {
sink.next(new Buffer(index, t));
}
});
for (T t : payload) {
sink.next(new Buffer(index, t));
}
return Mono.empty();
}
@Override
@ -162,7 +269,6 @@ public class DefaultElasticSearchService implements ElasticSearchService {
//缓冲背压
int bufferBackpressure = Integer.getInteger("elasticsearch.buffer.backpressure", 64);
//这里的警告都输出到控制台,输入到slf4j可能会造成日志递归.
FluxUtils.bufferRate(
Flux.<Buffer>create(sink -> this.sink = sink),
flushRate,
@ -171,13 +277,18 @@ public class DefaultElasticSearchService implements ElasticSearchService {
.onBackpressureBuffer(bufferBackpressure,
drop -> System.err.println("无法处理更多索引请求!"),
BufferOverflowStrategy.DROP_OLDEST)
.flatMap(this::doSave)
.doOnNext((len) -> {
if (log.isDebugEnabled() && len > 0) {
log.debug("保存ElasticSearch数据成功,数量:{}", len);
}
.parallel()
.runOn(Schedulers.newParallel("elasticsearch-writer"))
.flatMap(buffers -> {
long time = System.currentTimeMillis();
return this
.doSave(buffers)
.doOnNext((len) -> log.trace("保存ElasticSearch数据成功,数量:{},耗时:{}ms", len, (System.currentTimeMillis() - time)))
.onErrorContinue((err, obj) -> {
//这里的错误都输出到控制台,输入到slf4j可能会造成日志递归.
System.err.println("保存ElasticSearch数据失败:\n" + org.hswebframework.utils.StringUtils.throwable2String(err));
});
})
.onErrorContinue((err, obj) -> System.err.println("保存ElasticSearch数据失败:\n" + org.hswebframework.utils.StringUtils.throwable2String(err)))
.subscribe();
}
@ -212,13 +323,15 @@ public class DefaultElasticSearchService implements ElasticSearchService {
.zipWith(indexManager.getIndexMetadata(index))
.flatMapMany(tp2 ->
group.map(buffer -> {
IndexRequest request = new IndexRequest(tp2.getT1(), "_doc");
Object o = JSON.toJSON(buffer.getPayload());
if (o instanceof Map) {
request.source(tp2.getT2().convertToElastic((Map<String, Object>) o));
Map<String, Object> data = FastBeanCopier.copy(buffer.getPayload(), HashMap::new);
IndexRequest request;
if (data.get("id") != null) {
request = new IndexRequest(tp2.getT1(), "_doc", String.valueOf(data.get("id")));
} else {
request.source(o.toString(), XContentType.JSON);
request = new IndexRequest(tp2.getT1(), "_doc");
}
request.source(tp2.getT2().convertToElastic(data));
return request;
}));
})
@ -228,9 +341,10 @@ public class DefaultElasticSearchService implements ElasticSearchService {
BulkRequest request = new BulkRequest();
lst.forEach(request::add);
return ReactorActionListener.<BulkResponse>mono(listener ->
restClient.getWriteClient().bulkAsync(request, RequestOptions.DEFAULT, listener)
).doOnNext(this::checkResponse);
}).thenReturn(buffers.size());
restClient.getWriteClient().bulkAsync(request, RequestOptions.DEFAULT, listener))
.doOnNext(this::checkResponse);
})
.thenReturn(buffers.size());
}
@SneakyThrows
@ -244,11 +358,6 @@ public class DefaultElasticSearchService implements ElasticSearchService {
}
}
private <T> PagerResult<T> translatePageResult(Function<Map<String, Object>, T> mapper, QueryParam param, SearchResponse response) {
long total = response.getHits().getTotalHits();
return PagerResult.of((int) total, translate(mapper, response), param);
}
private <T> List<T> translate(Function<Map<String, Object>, T> mapper, SearchResponse response) {
return Arrays.stream(response.getHits().getHits())
.map(hit -> {
@ -261,50 +370,73 @@ public class DefaultElasticSearchService implements ElasticSearchService {
.collect(Collectors.toList());
}
private Mono<SearchResponse> doSearch(Mono<SearchRequest> requestMono) {
return requestMono.flatMap((request) ->
ReactorActionListener
.<SearchResponse>mono(listener ->
restClient
.getQueryClient()
.searchAsync(request, RequestOptions.DEFAULT, listener)))
private Mono<SearchResponse> doSearch(SearchRequest request) {
return this
.<SearchRequest, SearchResponse>execute(request, restClient.getQueryClient()::searchAsync)
.onErrorResume(err -> {
log.error("query elastic error", err);
return Mono.empty();
});
}
private Mono<CountResponse> doCount(Mono<CountRequest> requestMono) {
return requestMono.flatMap((request) ->
ReactorActionListener
.<CountResponse>mono(listener ->
restClient
.getQueryClient()
.countAsync(request, RequestOptions.DEFAULT, listener)))
private <REQ, RES> Mono<RES> execute(REQ request, Consumer3<REQ, RequestOptions, ActionListener<RES>> function4) {
return ReactorActionListener.mono(actionListener -> function4.accept(request, RequestOptions.DEFAULT, actionListener));
}
private Mono<CountResponse> doCount(CountRequest request) {
return this
.execute(request, restClient.getQueryClient()::countAsync)
.onErrorResume(err -> {
log.error("query elastic error", err);
return Mono.empty();
});
}
private Mono<SearchRequest> createSearchRequest(QueryParam queryParam, String index) {
protected Mono<SearchRequest> createSearchRequest(QueryParam queryParam, String... indexes) {
return indexManager
.getIndexesMetadata(indexes)
.collectList()
.filter(CollectionUtils::isNotEmpty)
.flatMap(list -> createSearchRequest(queryParam, list));
}
protected Mono<SearchRequest> createSearchRequest(QueryParam queryParam, List<ElasticSearchIndexMetadata> indexes) {
SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0));
return Flux.fromIterable(indexes)
.flatMap(index -> getIndexForSearch(index.getIndex()))
.collectList()
.map(indexList ->
new SearchRequest(indexList.toArray(new String[0]))
.source(builder)
.indicesOptions(indexOptions)
.types("_doc"));
}
protected Mono<QueryBuilder> createQueryBuilder(QueryParam queryParam, String index) {
return indexManager
.getIndexMetadata(index)
.map(metadata -> ElasticSearchConverter.convertSearchSourceBuilder(queryParam, metadata))
.switchIfEmpty(Mono.fromSupplier(() -> ElasticSearchConverter.convertSearchSourceBuilder(queryParam, null)))
.flatMap(builder -> this.getIndexForSearch(index)
.map(idx -> new SearchRequest(idx).source(builder).types("_doc")));
.map(metadata -> QueryParamTranslator.createQueryBuilder(queryParam, metadata))
.switchIfEmpty(Mono.fromSupplier(() -> QueryParamTranslator.createQueryBuilder(queryParam, null)));
}
private Mono<CountRequest> createCountRequest(QueryParam queryParam, String index) {
protected Mono<CountRequest> createCountRequest(QueryParam queryParam, List<ElasticSearchIndexMetadata> indexes) {
QueryParam tempQueryParam = queryParam.clone();
tempQueryParam.setPaging(false);
tempQueryParam.setSorts(Collections.emptyList());
SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0));
return Flux.fromIterable(indexes)
.flatMap(index -> getIndexForSearch(index.getIndex()))
.collectList()
.map(indexList -> new CountRequest(indexList.toArray(new String[0])).source(builder));
}
private Mono<CountRequest> createCountRequest(QueryParam queryParam, String... index) {
return indexManager
.getIndexMetadata(index)
.map(metadata -> ElasticSearchConverter.convertSearchSourceBuilder(queryParam, metadata))
.switchIfEmpty(Mono.fromSupplier(() -> ElasticSearchConverter.convertSearchSourceBuilder(queryParam, null)))
.flatMap(builder -> this.getIndexForSearch(index)
.map(idx -> new CountRequest(idx).source(builder)));
.getIndexesMetadata(index)
.collectList()
.filter(CollectionUtils::isNotEmpty)
.flatMap(list -> createCountRequest(queryParam, list));
}
}

View File

@ -14,11 +14,30 @@ import java.util.function.Function;
public interface ElasticSearchService {
<T> Mono<PagerResult<T>> queryPager(String index, QueryParam queryParam, Function<Map<String, Object>, T> mapper);
default <T> Mono<PagerResult<T>> queryPager(String index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
return queryPager(new String[]{index}, queryParam, mapper);
}
<T> Mono<PagerResult<T>> queryPager(String[] index, QueryParam queryParam, Function<Map<String, Object>, T> mapper);
<T> Flux<T> query(String index, QueryParam queryParam, Function<Map<String, Object>, T> mapper);
Mono<Long> count(String index, QueryParam queryParam);
<T> Flux<T> query(String[] index, QueryParam queryParam, Function<Map<String, Object>, T> mapper);
default <T> Flux<T> multiQuery(String index, Collection<QueryParam> queryParam, Function<Map<String, Object>, T> mapper) {
return multiQuery(new String[]{index}, queryParam, mapper);
}
<T> Flux<T> multiQuery(String[] index, Collection<QueryParam> queryParam, Function<Map<String, Object>, T> mapper);
default Mono<Long> count(String index, QueryParam queryParam) {
return count(new String[]{index}, queryParam);
}
Mono<Long> count(String[] index, QueryParam queryParam);
Mono<Long> delete(String index, QueryParam queryParam);
<T> Mono<Void> commit(String index, T payload);
@ -26,9 +45,9 @@ public interface ElasticSearchService {
<T> Mono<Void> commit(String index, Publisher<T> data);
<T> Mono<Void> save(String index, T payload);
<T> Mono<Void> save(String index, T payload);
<T> Mono<Void> save(String index, Collection<T> payload);
<T> Mono<Void> save(String index, Collection<T> payload);
<T> Mono<Void> save(String index, Publisher<T> data);

View File

@ -14,6 +14,7 @@ import org.jetlinks.community.timeseries.TimeSeriesService;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -27,7 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
public class ElasticSearchTimeSeriesManager implements TimeSeriesManager {
private Map<String, TimeSeriesService> serviceMap = new ConcurrentHashMap<>(16);
private final Map<String, TimeSeriesService> serviceMap = new ConcurrentHashMap<>(16);
protected final ElasticSearchIndexManager indexManager;
@ -49,10 +50,22 @@ public class ElasticSearchTimeSeriesManager implements TimeSeriesManager {
return getService(metric.getId());
}
@Override
public TimeSeriesService getServices(TimeSeriesMetric... metric) {
return getServices(Arrays
.stream(metric)
.map(TimeSeriesMetric::getId).toArray(String[]::new));
}
@Override
public TimeSeriesService getServices(String... metric) {
return new ElasticSearchTimeSeriesService(metric, elasticSearchService, aggregationService);
}
@Override
public TimeSeriesService getService(String metric) {
return serviceMap.computeIfAbsent(metric,
id -> new ElasticSearchTimeSeriesService(id, elasticSearchService, aggregationService));
id -> new ElasticSearchTimeSeriesService(new String[]{id}, elasticSearchService, aggregationService));
}

View File

@ -16,26 +16,36 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Collection;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
@AllArgsConstructor
@Slf4j
public class ElasticSearchTimeSeriesService implements TimeSeriesService {
private String index;
private final String[] index;
private ElasticSearchService elasticSearchService;
private final ElasticSearchService elasticSearchService;
private AggregationService aggregationService;
private final AggregationService aggregationService;
static DateTimeType timeType=new DateTimeType();
static DateTimeType timeType = DateTimeType.GLOBAL;
@Override
public Flux<TimeSeriesData> query(QueryParam queryParam) {
return elasticSearchService.query(index, applySort(queryParam), map -> TimeSeriesData.of(timeType.convert(map.get("timestamp")), map));
}
@Override
public Flux<TimeSeriesData> multiQuery(Collection<QueryParam> query) {
return elasticSearchService.multiQuery(
index,
query.stream().peek(this::applySort).collect(Collectors.toList()),
map -> TimeSeriesData.of(timeType.convert(map.get("timestamp")), map));
}
@Override
public Mono<Integer> count(QueryParam queryParam) {
return elasticSearchService
@ -65,25 +75,37 @@ public class ElasticSearchTimeSeriesService implements TimeSeriesService {
}
protected QueryParam applySort(QueryParam param){
if(CollectionUtils.isEmpty(param.getSorts())){
protected QueryParam applySort(QueryParam param) {
if (CollectionUtils.isEmpty(param.getSorts())) {
param.orderBy("timestamp").desc();
}
return param;
}
@Override
public Mono<Void> save(Publisher<TimeSeriesData> data) {
public Mono<Void> commit(Publisher<TimeSeriesData> data) {
return Flux.from(data)
.flatMap(this::save)
.flatMap(this::commit)
.then();
}
@Override
public Mono<Void> save(TimeSeriesData data) {
return Mono.defer(() -> {
Map<String, Object> mapData = data.getData();
mapData.put("timestamp", data.getTimestamp());
return elasticSearchService.commit(index, mapData);
});
public Mono<Void> commit(TimeSeriesData data) {
Map<String, Object> mapData = data.getData();
mapData.put("timestamp", data.getTimestamp());
return elasticSearchService.commit(index[0], mapData);
}
@Override
public Mono<Void> save(Publisher<TimeSeriesData> dateList) {
return elasticSearchService.save(index[0],
Flux.from(dateList)
.map(data -> {
Map<String, Object> mapData = data.getData();
mapData.put("timestamp", data.getTimestamp());
return mapData;
}));
}
}

View File

@ -2,6 +2,7 @@ package org.jetlinks.community.elastic.search.utils;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
@ -10,7 +11,6 @@ import org.hswebframework.ezorm.core.param.Sort;
import org.hswebframework.ezorm.core.param.Term;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.types.GeoType;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
import org.jetlinks.community.elastic.search.parser.DefaultLinkTypeParser;
import org.springframework.util.StringUtils;
@ -38,26 +38,7 @@ public class QueryParamTranslator {
};
static {
//地理位置查询
converter.put(GeoType.ID, (type, term) -> {
// TODO: 2020/3/5
});
}
public static SearchSourceBuilder convertSearchSourceBuilder(QueryParam queryParam, ElasticSearchIndexMetadata metadata) {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
if (queryParam.isPaging()) {
sourceBuilder.from(queryParam.getPageIndex() * queryParam.getPageSize());
sourceBuilder.size(queryParam.getPageSize());
}
for (Sort sort : queryParam.getSorts()) {
if (!StringUtils.isEmpty(sort.getName())) {
sourceBuilder.sort(sort.getName(), SortOrder.fromString(sort.getOrder()));
}
}
public static QueryBuilder createQueryBuilder(QueryParam queryParam, ElasticSearchIndexMetadata metadata) {
BoolQueryBuilder queryBuilders = QueryBuilders.boolQuery();
Consumer<Term> paramConverter = doNotingParamConverter;
if (metadata != null) {
@ -75,7 +56,21 @@ public class QueryParamTranslator {
for (Term term : queryParam.getTerms()) {
linkTypeParser.process(term, paramConverter, queryBuilders);
}
return sourceBuilder.query(queryBuilders);
return queryBuilders;
}
public static SearchSourceBuilder convertSearchSourceBuilder(QueryParam queryParam, ElasticSearchIndexMetadata metadata) {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
if (queryParam.isPaging()) {
sourceBuilder.from(queryParam.getPageIndex() * queryParam.getPageSize());
sourceBuilder.size(queryParam.getPageSize());
}
for (Sort sort : queryParam.getSorts()) {
if (!StringUtils.isEmpty(sort.getName())) {
sourceBuilder.sort(sort.getName(), SortOrder.fromString(sort.getOrder()));
}
}
return sourceBuilder.query(createQueryBuilder(queryParam,metadata));
}
}

View File

@ -13,18 +13,28 @@ public interface TimeSeriesManager {
/**
* 根据指标获取服务
*
* @param metric 指标,通常
* @param metric 指标,通常表名
* @return 时序服务
*/
TimeSeriesService getService(TimeSeriesMetric metric);
/**
* 获取多个指标服务
* @param metric 多个指标
* @return 时序服务
*/
TimeSeriesService getServices(TimeSeriesMetric... metric);
TimeSeriesService getServices(String... metric);
TimeSeriesService getService(String metric);
/**
* 注册元数据
* 注册元数据,将更新表结构
*
* @param metadata 元数据
* @return 注册结果
*/
Mono<Void> registerMetadata(TimeSeriesMetadata metadata);
}

View File

@ -8,6 +8,7 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Collection;
import java.util.function.Function;
/**
@ -26,6 +27,8 @@ public interface TimeSeriesService {
*/
Flux<TimeSeriesData> query(QueryParam queryParam);
Flux<TimeSeriesData> multiQuery(Collection<QueryParam> query);
/**
* 查询数量
*
@ -71,25 +74,35 @@ public interface TimeSeriesService {
* .execute(service::aggregation)
*
* </pre>
*
* @param queryParam 聚合查询条件
* @return 查询结果数据流
*/
Flux<AggregationData> aggregation(AggregationQueryParam queryParam);
/**
* 保存数据
* 提交数据,数据不会立即保存
*
* @param data 数据流
* @return 保存结果, {@link Mono#error(Throwable)} 则成功
*/
Mono<Void> save(Publisher<TimeSeriesData> data);
Mono<Void> commit(Publisher<TimeSeriesData> data);
/**
* 保存数据
* 提交数据,数据不会立即保存
*
* @param data 单个数据
* @return 保存结果, {@link Mono#error(Throwable)} 则成功
*/
Mono<Void> save(TimeSeriesData data);
Mono<Void> commit(TimeSeriesData data);
/**
* 批量保存数据
*
* @param data 数据集
* @return 结果
*/
Mono<Void> save(Publisher<TimeSeriesData> data);
}

View File

@ -1,6 +1,8 @@
package org.jetlinks.community.device.message.writer;
import com.alibaba.fastjson.JSON;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.MapUtils;
import org.hswebframework.web.id.IDGenerator;
@ -14,23 +16,23 @@ import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceOfflineMessage;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.event.EventMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.ReadPropertyMessageReply;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import org.jetlinks.core.message.property.WritePropertyMessageReply;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.EventMetadata;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.types.UnknownType;
import org.reactivestreams.Publisher;
import org.springframework.boot.context.properties.ConfigurationProperties;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.function.Consumer;
/**
* 用于将设备消息写入到时序数据库
@ -39,11 +41,16 @@ import java.util.stream.Collectors;
* @since 1.0
*/
@Slf4j
@ConfigurationProperties(prefix = "jetlinks.device.log")
public class TimeSeriesMessageWriterConnector{
public TimeSeriesManager timeSeriesManager;
public DeviceRegistry registry;
@Setter
@Getter
private Set<String> excludes = new HashSet<>();
public TimeSeriesMessageWriterConnector(TimeSeriesManager timeSeriesManager, DeviceRegistry registry) {
this.timeSeriesManager = timeSeriesManager;
this.registry = registry;
@ -51,103 +58,97 @@ public class TimeSeriesMessageWriterConnector{
@Subscribe(topics = "/device/**",id = "device-message-ts-writer")
public Mono<Void> writeDeviceMessageToTs(DeviceMessage message){
return this.doIndex(message);
return commitDeviceMessage(message);
}
private Mono<Void> doIndex(DeviceMessage message) {
Map<String, Object> headers = Optional.ofNullable(message.getHeaders()).orElse(Collections.emptyMap());
public Mono<Void> saveDeviceMessage(Publisher<DeviceMessage> message) {
String productId = (String) headers.getOrDefault("productId","null");
return Flux.from(message)
.flatMap(this::convert)
.groupBy(Tuple2::getT1)
.flatMap(groups -> timeSeriesManager
.getService(groups.key())
.save(groups.map(Tuple2::getT2)))
.then();
}
public Mono<Void> commitDeviceMessage(DeviceMessage message) {
return this
.convert(message)
.flatMap(tp2 -> timeSeriesManager
.getService(tp2.getT1())
.commit(tp2.getT2()))
.then();
}
protected Mono<Tuple2<String, TimeSeriesData>> createLog(String productId, DeviceMessage message, Consumer<DeviceOperationLogEntity> logEntityConsumer) {
if (excludes.contains("*") || excludes.contains(message.getMessageType().name())) {
return Mono.empty();
}
DeviceOperationLogEntity operationLog = new DeviceOperationLogEntity();
operationLog.setId(IDGenerator.MD5.generate());
operationLog.setId(IDGenerator.SNOW_FLAKE_STRING.generate());
operationLog.setDeviceId(message.getDeviceId());
operationLog.setCreateTime(new Date(message.getTimestamp()));
operationLog.setProductId(productId);
operationLog.setType(DeviceLogType.of(message));
Mono<Void> thenJob = null;
if (null != logEntityConsumer) {
logEntityConsumer.accept(operationLog);
}
return Mono.just(Tuples.of(DeviceTimeSeriesMetric.deviceLogMetricId(productId), TimeSeriesData.of(message.getTimestamp(), operationLog.toSimpleMap())));
}
protected Flux<Tuple2<String, TimeSeriesData>> convert(DeviceMessage message) {
Map<String, Object> headers = Optional.ofNullable(message.getHeaders()).orElse(Collections.emptyMap());
String productId = (String) headers.getOrDefault("productId", "null");
Consumer<DeviceOperationLogEntity> logEntityConsumer = null;
List<Publisher<Tuple2<String, TimeSeriesData>>> all = new ArrayList<>();
if (message instanceof EventMessage) {
operationLog.setContent(JSON.toJSONString(((EventMessage) message).getData()));
thenJob = doIndexEventMessage(headers, ((EventMessage) message));
} else if (message instanceof DeviceOfflineMessage) {
operationLog.setContent("设备离线");
} else if (message instanceof DeviceOnlineMessage) {
operationLog.setContent("设备上线");
} else if (message instanceof ReportPropertyMessage) {
logEntityConsumer = log -> log.setContent(JSON.toJSONString(((EventMessage) message).getData()));
all.add(convertEvent(productId, headers, ((EventMessage) message)));
}
//上报属性
else if (message instanceof ReportPropertyMessage) {
ReportPropertyMessage reply = (ReportPropertyMessage) message;
Map<String, Object> properties = reply.getProperties();
if (MapUtils.isNotEmpty(properties)) {
operationLog.setContent(properties);
thenJob = doIndexPropertiesMessage(headers, message, properties);
logEntityConsumer = log -> log.setContent(properties);
all.add(convertProperties(productId, headers, message, properties));
}
} else if (message instanceof ReadPropertyMessageReply) {
ReadPropertyMessageReply reply = (ReadPropertyMessageReply) message;
if (reply.isSuccess()) {
Map<String, Object> properties = reply.getProperties();
operationLog.setContent(properties);
thenJob = doIndexPropertiesMessage(headers, message, properties);
} else {
log.warn("读取设备:{} 属性失败", reply.getDeviceId());
}
} else if (message instanceof WritePropertyMessageReply) {
WritePropertyMessageReply reply = (WritePropertyMessageReply) message;
if (reply.isSuccess()) {
Map<String, Object> properties = reply.getProperties();
operationLog.setContent(properties);
thenJob = doIndexPropertiesMessage(headers, message, properties);
} else {
log.warn("修改设备:{} 属性失败", reply.getDeviceId());
}
} else if (message instanceof FunctionInvokeMessageReply) {
operationLog.setContent(JSON.toJSONString(((FunctionInvokeMessageReply) message).getOutput()));
} else {
operationLog.setContent(JSON.toJSONString(message));
}
if (thenJob == null) {
thenJob = Mono.empty();
//消息回复
else if (message instanceof DeviceMessageReply) {
//失败的回复消息
if (!((DeviceMessageReply) message).isSuccess()) {
logEntityConsumer = log -> log.setContent(message.toString());
} else if (message instanceof ReadPropertyMessageReply) {
ReadPropertyMessageReply reply = (ReadPropertyMessageReply) message;
Map<String, Object> properties = reply.getProperties();
logEntityConsumer = log -> log.setContent(properties);
all.add(convertProperties(productId, headers, message, properties));
} else if (message instanceof WritePropertyMessageReply) {
WritePropertyMessageReply reply = (WritePropertyMessageReply) message;
Map<String, Object> properties = reply.getProperties();
logEntityConsumer = log -> log.setContent(properties);
all.add(convertProperties(productId, headers, message, properties));
} else {
logEntityConsumer = log -> log.setContent(message.toJson().toJSONString());
}
}
return timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceLogMetric(productId))
.save(TimeSeriesData.of(message.getTimestamp(), operationLog.toSimpleMap()))
.then(thenJob);
//其他
else {
logEntityConsumer = log -> log.setContent(message.toJson().toJSONString());
}
all.add(createLog(productId, message, logEntityConsumer));
return Flux.merge(all);
}
protected Mono<Void> doIndexPropertiesMessage(Map<String, Object> headers,
DeviceMessage message,
Map<String, Object> properties) {
String productId = (String) headers.get("productId");
protected Mono<Tuple2<String, TimeSeriesData>> convertEvent(String productId, Map<String, Object> headers, EventMessage message) {
return registry
.getDevice(message.getDeviceId())
.flatMap(device -> device.getMetadata()
.flatMap(metadata -> {
Map<String, PropertyMetadata> propertyMetadata = metadata.getProperties().stream()
.collect(Collectors.toMap(PropertyMetadata::getId, Function.identity()));
return Flux.fromIterable(properties.entrySet())
.map(entry -> {
DevicePropertiesEntity entity = DevicePropertiesEntity.builder()
.deviceId(device.getDeviceId())
.timestamp(message.getTimestamp())
.property(entry.getKey())
.propertyName(entry.getKey())
.orgId((String) headers.get("orgId"))
.productId(productId)
.build()
.withValue(propertyMetadata.get(entry.getKey()), entry.getValue());
return TimeSeriesData.of(message.getTimestamp(), entity.toMap());
})
.flatMap(data -> timeSeriesManager.getService(DeviceTimeSeriesMetric.devicePropertyMetric(productId)).save(data))
.then();
}));
}
protected Mono<Void> doIndexEventMessage(Map<String, Object> headers, EventMessage message) {
String productId = (String) headers.get("productId");
return registry.getDevice(message.getDeviceId())
.flatMap(device -> device.getMetadata()
.map(metadata -> {
Object value = message.getData();
@ -155,7 +156,7 @@ public class TimeSeriesMessageWriterConnector{
.getEvent(message.getEvent())
.map(EventMetadata::getType)
.orElseGet(UnknownType::new);
Map<String, Object> data = new HashMap<>(headers);
Map<String, Object> data = new HashMap<>();
data.put("deviceId", device.getDeviceId());
data.put("createTime", message.getTimestamp());
Object tempValue = ValueTypeTranslator.translator(value, dataType);
@ -166,6 +167,37 @@ public class TimeSeriesMessageWriterConnector{
}
return TimeSeriesData.of(message.getTimestamp(), data);
}))
.flatMap(data -> timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceEventMetric(productId, message.getEvent())).save(data));
.map(data -> Tuples.of(DeviceTimeSeriesMetric.deviceEventMetricId(productId, message.getEvent()), data));
}
protected Flux<Tuple2<String, TimeSeriesData>> convertProperties(String productId,
Map<String, Object> headers,
DeviceMessage message,
Map<String, Object> properties) {
if (MapUtils.isEmpty(properties)) {
return Flux.empty();
}
return registry
.getDevice(message.getDeviceId())
.flatMapMany(device -> device
.getMetadata()
.flatMapMany(metadata -> Flux
.fromIterable(properties.entrySet())
.map(entry -> {
DevicePropertiesEntity entity = DevicePropertiesEntity.builder()
.deviceId(device.getDeviceId())
.timestamp(message.getTimestamp())
.property(entry.getKey())
.propertyName(entry.getKey())
.orgId((String) headers.get("orgId"))
.productId(productId)
.build()
.withValue(metadata.getPropertyOrNull(entry.getKey()), entry.getValue());
return TimeSeriesData.of(message.getTimestamp(), entity.toMap());
})
.map(data -> Tuples.of(DeviceTimeSeriesMetric.devicePropertyMetricId(productId), data)))
);
}
}

View File

@ -23,7 +23,11 @@ public interface DeviceTimeSeriesMetric {
* @return 度量标识
*/
static TimeSeriesMetric deviceEventMetric(String productId, String eventId) {
return TimeSeriesMetric.of("event_".concat(productId).concat("_").concat(eventId));
return TimeSeriesMetric.of(deviceEventMetricId(productId, eventId));
}
static String deviceEventMetricId(String productId, String eventId) {
return "event_".concat(productId).concat("_").concat(eventId);
}
/**
@ -33,7 +37,11 @@ public interface DeviceTimeSeriesMetric {
* @return 度量标识
*/
static TimeSeriesMetric devicePropertyMetric(String productId) {
return TimeSeriesMetric.of("properties_".concat(productId));
return TimeSeriesMetric.of(devicePropertyMetricId(productId));
}
static String devicePropertyMetricId(String productId) {
return "properties_".concat(productId);
}
/**
@ -43,7 +51,11 @@ public interface DeviceTimeSeriesMetric {
* @return 度量标识
*/
static TimeSeriesMetric deviceLogMetric(String productId) {
return TimeSeriesMetric.of("device_log_".concat(productId));
return TimeSeriesMetric.of(deviceLogMetricId(productId));
}
static String deviceLogMetricId(String productId) {
return "device_log_".concat(productId);
}
/**