使用es实现物数据存储

This commit is contained in:
zhouhao 2022-09-26 17:28:42 +08:00
parent e0fe2a5960
commit 279e21a5d3
20 changed files with 885 additions and 949 deletions

View File

@ -72,6 +72,11 @@
<artifactId>reactor-netty</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>things-component</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -1,268 +0,0 @@
package org.jetlinks.community.elastic.search.aggreation;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.search.aggregations.bucket.histogram.LongBounds;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.hswebframework.ezorm.core.param.QueryParam;
import org.hswebframework.ezorm.core.param.TermType;
import org.jetlinks.community.elastic.search.ElasticRestClient;
import org.jetlinks.community.elastic.search.aggreation.bucket.Bucket;
import org.jetlinks.community.elastic.search.aggreation.bucket.BucketAggregationsStructure;
import org.jetlinks.community.elastic.search.aggreation.bucket.BucketResponse;
import org.jetlinks.community.elastic.search.aggreation.bucket.Sort;
import org.jetlinks.community.elastic.search.aggreation.enums.BucketType;
import org.jetlinks.community.elastic.search.aggreation.enums.MetricsType;
import org.jetlinks.community.elastic.search.aggreation.enums.OrderType;
import org.jetlinks.community.elastic.search.aggreation.metrics.MetricsAggregationStructure;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager;
import org.jetlinks.community.elastic.search.service.AggregationService;
import org.jetlinks.community.elastic.search.service.DefaultElasticSearchService;
import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter;
import org.jetlinks.community.elastic.search.utils.ReactorActionListener;
import org.jetlinks.community.timeseries.query.AggregationQueryParam;
import org.springframework.beans.factory.annotation.Autowired;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import java.util.*;
import java.util.stream.Collectors;
/**
* @author bsetfeng
* @since 1.0
**/
//@Service
@Slf4j
public class DefaultAggregationService implements AggregationService {
private final ElasticRestClient restClient;
private final ElasticSearchIndexManager indexManager;
@Autowired
public DefaultAggregationService(ElasticSearchIndexManager indexManager,
ElasticRestClient restClient) {
this.restClient = restClient;
this.indexManager = indexManager;
}
// @Override
// public Mono<MetricsResponse> metricsAggregation(String index, QueryParam queryParam,
// MetricsAggregationStructure structure) {
// return createSearchSourceBuilder(queryParam, index)
// .map(builder -> new SearchRequest(index)
// .source(builder.aggregation(structure.getType().aggregationBuilder(structure.getName(), structure.getField()))))
// .flatMap(request -> Mono.<SearchResponse>create(monoSink ->
// restClient.getQueryClient().searchAsync(request, RequestOptions.DEFAULT, translatorActionListener(monoSink))))
// .map(searchResponse -> structure.getType().getResponse(structure.getName(), searchResponse));
// }
//
// @Override
// public Mono<BucketResponse> bucketAggregation(String index, QueryParam queryParam, BucketAggregationsStructure structure) {
// return createSearchSourceBuilder(queryParam, index)
// .map(builder -> new SearchRequest(index)
// .source(builder
// .aggregation(structure.getType().aggregationBuilder(structure))
// //.aggregation(AggregationBuilders.topHits("last_val").from(1))
// ))
//// .doOnNext(searchRequest -> {
//// if (log.isDebugEnabled()) {
//// log.debug("聚合查询ElasticSearch:{},参数:{}", index, JSON.toJSON(searchRequest.source().toString()));
//// }
//// })
// .flatMap(request -> Mono.<SearchResponse>create(monoSink ->
// restClient
// .getQueryClient()
// .searchAsync(request, RequestOptions.DEFAULT, translatorActionListener(monoSink))))
// .map(response -> BucketResponse.builder()
// .name(structure.getName())
// .buckets(structure.getType().convert(response.getAggregations().get(structure.getName())))
// .build())
// ;
//
// }
private Mono<SearchSourceBuilder> createSearchSourceBuilder(QueryParam queryParam, String index) {
return indexManager
.getIndexMetadata(index)
.map(metadata -> ElasticSearchConverter.convertSearchSourceBuilder(queryParam, metadata));
}
private <T> ActionListener<T> translatorActionListener(MonoSink<T> sink) {
return new ActionListener<T>() {
@Override
public void onResponse(T response) {
sink.success(response);
}
@Override
public void onFailure(Exception e) {
if (e instanceof ElasticsearchException) {
if (((ElasticsearchException) e).status().getStatus() == 404) {
sink.success();
return;
} else if (((ElasticsearchException) e).status().getStatus() == 400) {
sink.error(new ElasticsearchParseException("查询参数格式错误", e));
}
}
sink.error(e);
}
};
}
@Override
public Flux<Map<String, Object>> aggregation(String[] index, AggregationQueryParam aggregationQueryParam) {
QueryParam queryParam = prepareQueryParam(aggregationQueryParam);
BucketAggregationsStructure structure = createAggParameter(aggregationQueryParam);
return Flux.fromArray(index)
.flatMap(idx -> Mono.zip(indexManager.getIndexStrategy(idx), Mono.just(idx)))
.collectList()
.flatMap(strategy ->
createSearchSourceBuilder(queryParam, index[0])
.map(builder ->
new SearchRequest(strategy
.stream()
.map(tp2 -> tp2.getT1().getIndexForSearch(tp2.getT2()))
.toArray(String[]::new))
.indicesOptions(DefaultElasticSearchService.indexOptions)
.source(builder.size(0).aggregation(structure.getType().aggregationBuilder(structure))
)
)
)
.flatMap(searchRequest ->
ReactorActionListener
.<SearchResponse>mono(listener ->
restClient.getQueryClient()
.searchAsync(searchRequest, RequestOptions.DEFAULT, listener)
))
.filter(response -> response.getAggregations() != null)
.map(response -> BucketResponse.builder()
.name(structure.getName())
.buckets(structure.getType().convert(response.getAggregations().get(structure.getName())))
.build())
.flatMapIterable(BucketsParser::convert)
.take(aggregationQueryParam.getLimit())
;
}
static class BucketsParser {
private final List<Map<String, Object>> result = new ArrayList<>();
public static List<Map<String, Object>> convert(BucketResponse response) {
return new BucketsParser(response).result;
}
public BucketsParser(BucketResponse response) {
this(response.getBuckets());
}
public BucketsParser(List<Bucket> buckets) {
buckets.forEach(bucket -> parser(bucket, new HashMap<>()));
}
public void parser(Bucket bucket, Map<String, Object> fMap) {
addBucketProperty(bucket, fMap);
if (bucket.getBuckets() != null && !bucket.getBuckets().isEmpty()) {
bucket.getBuckets().forEach(b -> {
Map<String, Object> map = new HashMap<>(fMap);
addBucketProperty(b, map);
parser(b, map);
});
} else {
result.add(fMap);
}
}
private void addBucketProperty(Bucket bucket, Map<String, Object> fMap) {
fMap.put(bucket.getName(), bucket.getKey());
fMap.putAll(bucket.toMap());
}
}
protected static QueryParam prepareQueryParam(AggregationQueryParam param) {
QueryParam queryParam = param.getQueryParam().clone();
queryParam.setPaging(false);
queryParam.and(param.getTimeProperty(), TermType.btw, Arrays.asList(calculateStartWithTime(param), param.getEndWithTime()));
if (queryParam.getSorts().isEmpty()) {
queryParam.orderBy(param.getTimeProperty()).desc();
}
return queryParam;
}
protected BucketAggregationsStructure createAggParameter(AggregationQueryParam param) {
List<BucketAggregationsStructure> structures = new ArrayList<>();
if (param.getGroupByTime() != null) {
structures.add(convertAggGroupTimeStructure(param));
}
if (param.getGroupBy() != null && !param.getGroupBy().isEmpty()) {
structures.addAll(getTermTypeStructures(param));
}
for (int i = 0, size = structures.size(); i < size; i++) {
if (i < size - 1) {
structures.get(i).setSubBucketAggregation(Collections.singletonList(structures.get(i + 1)));
}
if (i == size - 1) {
structures.get(i)
.setSubMetricsAggregation(param
.getAggColumns()
.stream()
.map(agg -> {
MetricsAggregationStructure metricsAggregationStructure = new MetricsAggregationStructure();
metricsAggregationStructure.setField(agg.getProperty());
metricsAggregationStructure.setName(agg.getAlias());
metricsAggregationStructure.setType(MetricsType.of(agg.getAggregation().name()));
return metricsAggregationStructure;
}).collect(Collectors.toList()));
}
}
return structures.get(0);
}
protected BucketAggregationsStructure convertAggGroupTimeStructure(AggregationQueryParam param) {
BucketAggregationsStructure structure = new BucketAggregationsStructure();
structure.setInterval(param.getGroupByTime().getInterval().toString());
structure.setType(BucketType.DATE_HISTOGRAM);
structure.setFormat(param.getGroupByTime().getFormat());
structure.setName(param.getGroupByTime().getAlias());
structure.setField(param.getGroupByTime().getProperty());
structure.setSort(Sort.desc(OrderType.KEY));
structure.setExtendedBounds(getExtendedBounds(param));
return structure;
}
protected static LongBounds getExtendedBounds(AggregationQueryParam param) {
return new LongBounds(calculateStartWithTime(param), param.getEndWithTime());
}
private static long calculateStartWithTime(AggregationQueryParam param) {
long startWithParam = param.getStartWithTime();
// if (param.getGroupByTime() != null && param.getGroupByTime().getInterval() != null) {
// long timeInterval = param.getGroupByTime().getInterval().toMillis() * param.getLimit();
// long tempStartWithParam = param.getEndWithTime() - timeInterval;
// startWithParam = Math.max(tempStartWithParam, startWithParam);
// }
return startWithParam;
}
protected List<BucketAggregationsStructure> getTermTypeStructures(AggregationQueryParam param) {
return param.getGroupBy()
.stream()
.map(group -> {
BucketAggregationsStructure structure = new BucketAggregationsStructure();
structure.setType(BucketType.TERMS);
structure.setSize(param.getLimit());
structure.setField(group.getProperty());
structure.setName(group.getAlias());
return structure;
}).collect(Collectors.toList());
}
}

View File

@ -1,64 +1,58 @@
package org.jetlinks.community.elastic.search.configuration;
import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.ApplicationProtocolConfig;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.IdentityCipherSuiteFilter;
import io.netty.handler.ssl.JdkSslContext;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import lombok.Generated;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.jetlinks.community.elastic.search.ElasticRestClient;
import org.jetlinks.community.elastic.search.embedded.EmbeddedElasticSearch;
import org.jetlinks.community.elastic.search.embedded.EmbeddedElasticSearchProperties;
import org.jetlinks.community.elastic.search.index.DefaultElasticSearchIndexManager;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties;
import org.jetlinks.community.elastic.search.service.reactive.DefaultReactiveElasticsearchClient;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexStrategy;
import org.jetlinks.community.elastic.search.index.strategies.DirectElasticSearchIndexStrategy;
import org.jetlinks.community.elastic.search.index.strategies.TimeByMonthElasticSearchIndexStrategy;
import org.jetlinks.community.elastic.search.service.AggregationService;
import org.jetlinks.community.elastic.search.service.ElasticSearchService;
import org.jetlinks.community.elastic.search.service.reactive.*;
import org.jetlinks.community.elastic.search.timeseries.ElasticSearchTimeSeriesManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.data.elasticsearch.ReactiveElasticsearchRestClientAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.reactive.HostProvider;
import org.springframework.data.elasticsearch.client.reactive.RequestCreator;
import org.springframework.data.elasticsearch.client.reactive.WebClientProvider;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import reactor.netty.http.client.HttpClient;
import reactor.netty.tcp.TcpClient;
import reactor.netty.transport.ProxyProvider;
import javax.net.ssl.SSLContext;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.List;
/**
* @author bsetfeng
* @author zhouhao
* @since 1.0
**/
@Configuration
@Configuration(proxyBeanMethods = false)
@Slf4j
@EnableConfigurationProperties({
ElasticSearchProperties.class,
EmbeddedElasticSearchProperties.class,
ElasticSearchIndexProperties.class})
ElasticSearchIndexProperties.class,
ElasticSearchBufferProperties.class})
@AutoConfigureAfter(ReactiveElasticsearchRestClientAutoConfiguration.class)
@ConditionalOnBean(ClientConfiguration.class)
@Generated
public class ElasticSearchConfiguration {
private final ElasticSearchProperties properties;
private final EmbeddedElasticSearchProperties embeddedProperties;
public ElasticSearchConfiguration(ElasticSearchProperties properties, EmbeddedElasticSearchProperties embeddedProperties) {
this.properties = properties;
this.embeddedProperties = embeddedProperties;
}
@Bean
@SneakyThrows
public DefaultReactiveElasticsearchClient reactiveElasticsearchClient(ClientConfiguration clientConfiguration) {
@Primary
public DefaultReactiveElasticsearchClient defaultReactiveElasticsearchClient(EmbeddedElasticSearchProperties embeddedProperties,
ClientConfiguration clientConfiguration) {
if (embeddedProperties.isEnabled()) {
log.debug("starting embedded elasticsearch on {}:{}",
embeddedProperties.getHost(),
@ -66,10 +60,10 @@ public class ElasticSearchConfiguration {
new EmbeddedElasticSearch(embeddedProperties).start();
}
WebClientProvider provider = getWebClientProvider(clientConfiguration);
HostProvider hostProvider = HostProvider.provider(provider, clientConfiguration.getHeadersSupplier(),
HostProvider<?> hostProvider = HostProvider.provider(provider,
clientConfiguration.getHeadersSupplier(),
clientConfiguration
.getEndpoints()
.toArray(new InetSocketAddress[0]));
@ -85,77 +79,45 @@ public class ElasticSearchConfiguration {
private static WebClientProvider getWebClientProvider(ClientConfiguration clientConfiguration) {
Duration connectTimeout = clientConfiguration.getConnectTimeout();
Duration soTimeout = clientConfiguration.getSocketTimeout();
TcpClient tcpClient = TcpClient.create();
if (!connectTimeout.isNegative()) {
tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.toMillis()));
}
if (!soTimeout.isNegative()) {
tcpClient = tcpClient.doOnConnected(connection -> connection //
.addHandlerLast(new ReadTimeoutHandler(soTimeout.toMillis(), TimeUnit.MILLISECONDS))
.addHandlerLast(new WriteTimeoutHandler(soTimeout.toMillis(), TimeUnit.MILLISECONDS)));
}
if (clientConfiguration.getProxy().isPresent()) {
String proxy = clientConfiguration.getProxy().get();
String[] hostPort = proxy.split(":");
if (hostPort.length != 2) {
throw new IllegalArgumentException("invalid proxy configuration " + proxy + ", should be \"host:port\"");
}
tcpClient = tcpClient.proxy(proxyOptions -> proxyOptions
.type(ProxyProvider.Proxy.HTTP).host(hostPort[0])
.port(Integer.parseInt(hostPort[1])));
}
String scheme = "http";
HttpClient httpClient = HttpClient.from(tcpClient);
if (clientConfiguration.useSsl()) {
Optional<SSLContext> sslContext = clientConfiguration.getSslContext();
if (sslContext.isPresent()) {
httpClient = httpClient.secure(sslContextSpec -> {
sslContextSpec.sslContext(new JdkSslContext(sslContext.get(), true, null, IdentityCipherSuiteFilter.INSTANCE,
ApplicationProtocolConfig.DISABLED, ClientAuth.NONE, null, false));
});
} else {
httpClient = httpClient.secure();
}
scheme = "https";
}
ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
WebClientProvider provider = WebClientProvider.create(scheme, connector);
if (clientConfiguration.getPathPrefix() != null) {
provider = provider.withPathPrefix(clientConfiguration.getPathPrefix());
}
provider = provider.withDefaultHeaders(clientConfiguration.getDefaultHeaders()) //
.withWebClientConfigurer(clientConfiguration.getWebClientConfigurer());
return provider;
return WebClientProvider.getWebClientProvider(clientConfiguration);
}
@Bean
@SneakyThrows
public ElasticRestClient elasticRestClient() {
RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(properties.createHosts())
.setRequestConfigCallback(properties::applyRequestConfigBuilder)
.setHttpClientConfigCallback(properties::applyHttpAsyncClientBuilder));
return new ElasticRestClient(client, client);
public DirectElasticSearchIndexStrategy directElasticSearchIndexStrategy(ReactiveElasticsearchClient elasticsearchClient,
ElasticSearchIndexProperties indexProperties) {
return new DirectElasticSearchIndexStrategy(elasticsearchClient, indexProperties);
}
@Bean(destroyMethod = "close")
public RestHighLevelClient restHighLevelClient(ElasticRestClient client) {
return client.getWriteClient();
@Bean
public TimeByMonthElasticSearchIndexStrategy timeByMonthElasticSearchIndexStrategy(ReactiveElasticsearchClient elasticsearchClient,
ElasticSearchIndexProperties indexProperties) {
return new TimeByMonthElasticSearchIndexStrategy(elasticsearchClient, indexProperties);
}
@Bean
public DefaultElasticSearchIndexManager elasticSearchIndexManager(@Autowired(required = false) List<ElasticSearchIndexStrategy> strategies) {
return new DefaultElasticSearchIndexManager(strategies);
}
@Bean
@ConditionalOnMissingBean(ElasticSearchService.class)
public ReactiveElasticSearchService reactiveElasticSearchService(ReactiveElasticsearchClient elasticsearchClient,
ElasticSearchIndexManager indexManager,
ElasticSearchBufferProperties properties) {
return new ReactiveElasticSearchService(elasticsearchClient, indexManager, properties);
}
@Bean
public ReactiveAggregationService reactiveAggregationService(ElasticSearchIndexManager indexManager,
ReactiveElasticsearchClient restClient) {
return new ReactiveAggregationService(indexManager, restClient);
}
@Bean
public ElasticSearchTimeSeriesManager elasticSearchTimeSeriesManager(ElasticSearchIndexManager indexManager,
ElasticSearchService elasticSearchService,
AggregationService aggregationService) {
return new ElasticSearchTimeSeriesManager(indexManager, elasticSearchService, aggregationService);
}
}

View File

@ -1,11 +1,9 @@
package org.jetlinks.community.elastic.search.index.strategies;
import org.hswebframework.utils.time.DateFormatter;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties;
import org.jetlinks.community.elastic.search.service.reactive.ReactiveElasticsearchClient;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.time.LocalDate;
/**
* 按月对来划分索引策略
@ -13,17 +11,16 @@ import java.util.Date;
* @author zhouhao
* @since 1.0
*/
@Component
public class TimeByMonthElasticSearchIndexStrategy extends TemplateElasticSearchIndexStrategy {
private final String format = "yyyy-MM";
public TimeByMonthElasticSearchIndexStrategy(ReactiveElasticsearchClient client, ElasticSearchIndexProperties properties) {
super("time-by-month", client,properties);
super("time-by-month", client, properties);
}
@Override
public String getIndexForSave(String index) {
return wrapIndex(index).concat("_").concat(DateFormatter.toString(new Date(), format));
LocalDate now = LocalDate.now();
String idx = wrapIndex(index);
return idx + "_" + now.getYear() + "-" + now.getMonthValue();
}
}

View File

@ -13,4 +13,13 @@ public interface AggregationService {
Flux<Map<String, Object>> aggregation(String[] index, AggregationQueryParam queryParam);
/**
* @param index 索引
* @param queryParam 聚合查询参数
* @return 查询结果
* @see AggregationService#aggregation(String[], AggregationQueryParam)
*/
default Flux<Map<String, Object>> aggregation(String index, AggregationQueryParam queryParam) {
return aggregation(new String[]{index}, queryParam);
}
}

View File

@ -1,444 +0,0 @@
package org.jetlinks.community.elastic.search.service;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.hswebframework.ezorm.core.param.QueryParam;
import org.hswebframework.utils.time.DateFormatter;
import org.hswebframework.utils.time.DefaultDateFormatter;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.community.elastic.search.ElasticRestClient;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter;
import org.jetlinks.community.elastic.search.utils.QueryParamTranslator;
import org.jetlinks.community.elastic.search.utils.ReactorActionListener;
import org.jetlinks.core.utils.FluxUtils;
import org.reactivestreams.Publisher;
import org.springframework.context.annotation.DependsOn;
import org.springframework.util.StringUtils;
import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.function.Consumer3;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import javax.annotation.PreDestroy;
import java.time.Duration;
import java.util.*;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* @author zhouhao
* @since 1.0
**/
//@Service
@Slf4j
@DependsOn("restHighLevelClient")
@Deprecated
public class DefaultElasticSearchService implements ElasticSearchService {
private final ElasticRestClient restClient;
private final ElasticSearchIndexManager indexManager;
FluxSink<Buffer> sink;
public static final IndicesOptions indexOptions = IndicesOptions.fromOptions(
true, true, false, false
);
static {
DateFormatter.supportFormatter.add(new DefaultDateFormatter(Pattern.compile("[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.+"), "yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
}
public DefaultElasticSearchService(ElasticRestClient restClient,
ElasticSearchIndexManager indexManager) {
this.restClient = restClient;
init();
this.indexManager = indexManager;
}
@Override
public <T> Flux<T> multiQuery(String[] index, Collection<QueryParam> queryParam, Function<Map<String, Object>, T> mapper) {
return indexManager
.getIndexesMetadata(index)
.flatMap(idx -> Mono.zip(
Mono.just(idx), getIndexForSearch(idx.getIndex())
))
.take(1)
.singleOrEmpty()
.flatMapMany(indexMetadata -> {
MultiSearchRequest request = new MultiSearchRequest();
return Flux
.fromIterable(queryParam)
.flatMap(entry -> createSearchRequest(entry, index))
.doOnNext(request::add)
.then(Mono.just(request))
.flatMapMany(searchRequest -> ReactorActionListener
.<MultiSearchResponse>mono(actionListener -> {
restClient.getQueryClient()
.msearchAsync(searchRequest, RequestOptions.DEFAULT, actionListener);
})
.flatMapMany(response -> Flux.fromArray(response.getResponses()))
.flatMap(item -> {
if (item.isFailure()) {
log.warn(item.getFailureMessage(), item.getFailure());
return Mono.empty();
}
return Flux.fromIterable(translate((map) -> mapper.apply(indexMetadata.getT1().convertFromElastic(map)), item.getResponse()));
}))
;
});
}
@Override
public <T> Flux<T> query(String index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
return this
.doQuery(new String[]{index}, queryParam)
.flatMapMany(tp2 -> convertQueryResult(tp2.getT1(), tp2.getT2(), mapper));
}
@Override
public <T> Flux<T> query(String[] index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
return this
.doQuery(index, queryParam)
.flatMapMany(tp2 -> convertQueryResult(tp2.getT1(), tp2.getT2(), mapper));
}
@Override
public <T> Mono<PagerResult<T>> queryPager(String[] index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
return this
.doQuery(index, queryParam)
.flatMap(tp2 ->
convertQueryResult(tp2.getT1(), tp2.getT2(), mapper)
.collectList()
.filter(CollectionUtils::isNotEmpty)
.map(list -> PagerResult.of((int) tp2.getT2().getHits().getTotalHits().value, list, queryParam))
)
.switchIfEmpty(Mono.fromSupplier(PagerResult::empty));
}
private <T> Flux<T> convertQueryResult(List<ElasticSearchIndexMetadata> indexList,
SearchResponse response,
Function<Map<String, Object>, T> mapper) {
return Flux
.create(sink -> {
Map<String, ElasticSearchIndexMetadata> metadata = indexList
.stream()
.collect(Collectors.toMap(ElasticSearchIndexMetadata::getIndex, Function.identity()));
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
Map<String, Object> hitMap = hit.getSourceAsMap();
if (StringUtils.isEmpty(hitMap.get("id"))) {
hitMap.put("id", hit.getId());
}
sink.next(mapper
.apply(Optional
.ofNullable(metadata.get(hit.getIndex())).orElse(indexList.get(0))
.convertFromElastic(hitMap)));
}
sink.complete();
});
}
private Mono<Tuple2<List<ElasticSearchIndexMetadata>, SearchResponse>> doQuery(String[] index,
QueryParam queryParam) {
return indexManager
.getIndexesMetadata(index)
.collectList()
.filter(CollectionUtils::isNotEmpty)
.flatMap(metadataList -> this
.createSearchRequest(queryParam, metadataList)
.flatMap(this::doSearch)
.map(response -> Tuples.of(metadataList, response))
).onErrorResume(err -> {
log.error(err.getMessage(), err);
return Mono.empty();
});
}
@Override
public Mono<Long> count(String[] index, QueryParam queryParam) {
QueryParam param = queryParam.clone();
param.setPaging(false);
return createCountRequest(param, index)
.flatMap(this::doCount)
.map(CountResponse::getCount)
.defaultIfEmpty(0L)
.onErrorReturn(err -> {
log.error("query elastic error", err);
return true;
}, 0L);
}
@Override
public Mono<Long> delete(String index, QueryParam queryParam) {
return createQueryBuilder(queryParam, index)
.flatMap(request -> ReactorActionListener
.<BulkByScrollResponse>mono(listener ->
restClient
.getWriteClient()
.deleteByQueryAsync(new DeleteByQueryRequest(index)
.setQuery(request),
RequestOptions.DEFAULT, listener)))
.map(BulkByScrollResponse::getDeleted);
}
@Override
public <T> Mono<Void> commit(String index, T payload) {
sink.next(new Buffer(index, payload));
return Mono.empty();
}
@Override
public <T> Mono<Void> commit(String index, Collection<T> payload) {
for (T t : payload) {
sink.next(new Buffer(index, t));
}
return Mono.empty();
}
@Override
public <T> Mono<Void> commit(String index, Publisher<T> data) {
return Flux.from(data)
.flatMap(d -> commit(index, d))
.then();
}
@Override
public <T> Mono<Void> save(String index, T payload) {
return save(index, Mono.just(payload));
}
@Override
public <T> Mono<Void> save(String index, Publisher<T> data) {
return Flux.from(data)
.map(v -> new Buffer(index, v))
.collectList()
.flatMap(this::doSave)
.then();
}
@Override
public <T> Mono<Void> save(String index, Collection<T> payload) {
return save(index, Flux.fromIterable(payload));
}
@PreDestroy
public void shutdown() {
sink.complete();
}
//@PostConstruct
public void init() {
//最小间隔
int flushRate = Integer.getInteger("elasticsearch.buffer.rate", 1000);
//缓冲最大数量
int bufferSize = Integer.getInteger("elasticsearch.buffer.size", 3000);
//缓冲超时时间
Duration bufferTimeout = Duration.ofSeconds(Integer.getInteger("elasticsearch.buffer.timeout", 3));
//缓冲背压
int bufferBackpressure = Integer.getInteger("elasticsearch.buffer.backpressure", 64);
FluxUtils.bufferRate(
Flux.<Buffer>create(sink -> this.sink = sink),
flushRate,
bufferSize,
bufferTimeout)
.onBackpressureBuffer(bufferBackpressure,
drop -> System.err.println("无法处理更多索引请求!"),
BufferOverflowStrategy.DROP_OLDEST)
.parallel()
.runOn(Schedulers.newParallel("elasticsearch-writer"))
.flatMap(buffers -> {
long time = System.currentTimeMillis();
return this
.doSave(buffers)
.doOnNext((len) -> log.trace("保存ElasticSearch数据成功,数量:{},耗时:{}ms", len, (System.currentTimeMillis() - time)))
.onErrorContinue((err, obj) -> {
//这里的错误都输出到控制台,输入到slf4j可能会造成日志递归.
System.err.println("保存ElasticSearch数据失败:\n" + org.hswebframework.utils.StringUtils.throwable2String(err));
});
})
.subscribe();
}
@AllArgsConstructor
@Getter
static class Buffer {
String index;
Object payload;
}
private Mono<String> getIndexForSave(String index) {
return indexManager
.getIndexStrategy(index)
.map(strategy -> strategy.getIndexForSave(index));
}
private Mono<String> getIndexForSearch(String index) {
return indexManager
.getIndexStrategy(index)
.map(strategy -> strategy.getIndexForSearch(index));
}
protected Mono<Integer> doSave(Collection<Buffer> buffers) {
return Flux.fromIterable(buffers)
.groupBy(Buffer::getIndex)
.flatMap(group -> {
String index = group.key();
return this.getIndexForSave(index)
.zipWith(indexManager.getIndexMetadata(index))
.flatMapMany(tp2 ->
group.map(buffer -> {
Map<String, Object> data = FastBeanCopier.copy(buffer.getPayload(), HashMap::new);
IndexRequest request;
if (data.get("id") != null) {
request = new IndexRequest(tp2.getT1(), "_doc", String.valueOf(data.get("id")));
} else {
request = new IndexRequest(tp2.getT1(), "_doc");
}
request.source(tp2.getT2().convertToElastic(data));
return request;
}));
})
.collectList()
.filter(CollectionUtils::isNotEmpty)
.flatMap(lst -> {
BulkRequest request = new BulkRequest();
lst.forEach(request::add);
return ReactorActionListener.<BulkResponse>mono(listener ->
restClient.getWriteClient().bulkAsync(request, RequestOptions.DEFAULT, listener))
.doOnNext(this::checkResponse);
})
.thenReturn(buffers.size());
}
@SneakyThrows
protected void checkResponse(BulkResponse response) {
if (response.hasFailures()) {
for (BulkItemResponse item : response.getItems()) {
if (item.isFailed()) {
throw item.getFailure().getCause();
}
}
}
}
private <T> List<T> translate(Function<Map<String, Object>, T> mapper, SearchResponse response) {
return Arrays.stream(response.getHits().getHits())
.map(hit -> {
Map<String, Object> hitMap = hit.getSourceAsMap();
if (StringUtils.isEmpty(hitMap.get("id"))) {
hitMap.put("id", hit.getId());
}
return mapper.apply(hitMap);
})
.collect(Collectors.toList());
}
private Mono<SearchResponse> doSearch(SearchRequest request) {
return this
.execute(request, restClient.getQueryClient()::searchAsync)
.onErrorResume(err -> {
log.error("query elastic error", err);
return Mono.empty();
});
}
private <REQ, RES> Mono<RES> execute(REQ request, Consumer3<REQ, RequestOptions, ActionListener<RES>> function4) {
return ReactorActionListener.mono(actionListener -> function4.accept(request, RequestOptions.DEFAULT, actionListener));
}
private Mono<CountResponse> doCount(CountRequest request) {
return this
.execute(request, restClient.getQueryClient()::countAsync)
.onErrorResume(err -> {
log.error("query elastic error", err);
return Mono.empty();
});
}
protected Mono<SearchRequest> createSearchRequest(QueryParam queryParam, String... indexes) {
return indexManager
.getIndexesMetadata(indexes)
.collectList()
.filter(CollectionUtils::isNotEmpty)
.flatMap(list -> createSearchRequest(queryParam, list));
}
protected Mono<SearchRequest> createSearchRequest(QueryParam queryParam, List<ElasticSearchIndexMetadata> indexes) {
SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0));
return Flux.fromIterable(indexes)
.flatMap(index -> getIndexForSearch(index.getIndex()))
.collectList()
.map(indexList ->
new SearchRequest(indexList.toArray(new String[0]))
.source(builder)
.indicesOptions(indexOptions)
.types("_doc"));
}
protected Mono<QueryBuilder> createQueryBuilder(QueryParam queryParam, String index) {
return indexManager
.getIndexMetadata(index)
.map(metadata -> QueryParamTranslator.createQueryBuilder(queryParam, metadata))
.switchIfEmpty(Mono.fromSupplier(() -> QueryParamTranslator.createQueryBuilder(queryParam, null)));
}
protected Mono<CountRequest> createCountRequest(QueryParam queryParam, List<ElasticSearchIndexMetadata> indexes) {
QueryParam tempQueryParam = queryParam.clone();
tempQueryParam.setPaging(false);
tempQueryParam.setSorts(Collections.emptyList());
SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0));
return Flux.fromIterable(indexes)
.flatMap(index -> getIndexForSearch(index.getIndex()))
.collectList()
.map(indexList -> new CountRequest(indexList.toArray(new String[0])).source(builder));
}
private Mono<CountRequest> createCountRequest(QueryParam queryParam, String... index) {
return indexManager
.getIndexesMetadata(index)
.collectList()
.filter(CollectionUtils::isNotEmpty)
.flatMap(list -> createCountRequest(queryParam, list));
}
}

View File

@ -20,8 +20,20 @@ public enum AggType {
public AggregationBuilder aggregationBuilder(String name, String filed) {
return AggregationBuilders.max(name).field(filed).missing(0);
}
},
MEDIAN("中间值") {
@Override
public AggregationBuilder aggregationBuilder(String name, String filed) {
return AggregationBuilders.medianAbsoluteDeviation(name).field(filed).missing(0);
}
},
STDDEV("标准差") {
@Override
public AggregationBuilder aggregationBuilder(String name, String filed) {
return AggregationBuilders.extendedStats(name)
.field(filed)
.missing(0);
}
},
COUNT("非空值计数") {
@Override
@ -30,13 +42,6 @@ public enum AggType {
}
},
MIN("最小") {
@Override
public AggregationBuilder aggregationBuilder(String name, String filed) {
return AggregationBuilders.min(name).field(filed).missing(0);
}
},
DISTINCT_COUNT("去重计数") {
@Override
public AggregationBuilder aggregationBuilder(String name, String filed) {
@ -47,7 +52,12 @@ public enum AggType {
}
},
MIN("最小") {
@Override
public AggregationBuilder aggregationBuilder(String name, String filed) {
return AggregationBuilders.min(name).field(filed).missing(0);
}
},
FIRST("第一条数据") {
@Override
public AggregationBuilder aggregationBuilder(String name, String filed) {
@ -85,7 +95,7 @@ public enum AggType {
return type;
}
}
throw new UnsupportedOperationException("不支持的聚合度量类型:" + name);
throw new UnsupportedOperationException("不支持的聚合类型:" + name);
}
}

View File

@ -60,17 +60,19 @@ import org.elasticsearch.client.indices.GetFieldMappingsRequest;
import org.elasticsearch.client.indices.GetFieldMappingsResponse;
import org.elasticsearch.client.indices.GetIndexResponse;
import org.elasticsearch.client.indices.IndexTemplatesExistRequest;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.rest.BytesRestResponse;
@ -84,16 +86,17 @@ import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xcontent.*;
import org.reactivestreams.Publisher;
import org.springframework.data.elasticsearch.client.ClientLogger;
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
import org.springframework.data.elasticsearch.client.NoReachableHostException;
import org.springframework.data.elasticsearch.client.reactive.HostProvider;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.client.reactive.RequestCreator;
import org.springframework.data.elasticsearch.client.util.NamedXContents;
import org.springframework.data.elasticsearch.client.util.RequestConverters;
import org.springframework.data.elasticsearch.client.util.ScrollState;
import org.springframework.data.elasticsearch.core.ResponseConverter;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
import org.springframework.data.util.Lazy;
import org.springframework.http.HttpHeaders;
@ -107,12 +110,11 @@ import org.springframework.web.reactive.function.BodyExtractors;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.function.Function3;
import javax.annotation.Nonnull;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.reflect.Method;
@ -130,7 +132,7 @@ import static org.springframework.data.elasticsearch.client.util.RequestConverte
@Slf4j
@Generated
public class DefaultReactiveElasticsearchClient implements org.jetlinks.community.elastic.search.service.reactive.ReactiveElasticsearchClient,
public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient,
org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Cluster {
private final HostProvider<?> hostProvider;
private final RequestCreator requestCreator;
@ -342,12 +344,8 @@ public class DefaultReactiveElasticsearchClient implements org.jetlinks.communit
.flatMap(Flux::fromIterable);
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#scroll(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
*/
@Override
public Flux<SearchHit> scroll(HttpHeaders headers, SearchRequest searchRequest) {
@Override@Nonnull
public Flux<SearchHit> scroll(@Nonnull HttpHeaders headers, SearchRequest searchRequest) {
TimeValue scrollTimeout = searchRequest.scroll() != null ? searchRequest.scroll().keepAlive()
: TimeValue.timeValueMinutes(1);
@ -356,60 +354,31 @@ public class DefaultReactiveElasticsearchClient implements org.jetlinks.communit
searchRequest.scroll(scrollTimeout);
}
EmitterProcessor<ActionRequest> outbound = EmitterProcessor.create(false);
FluxSink<ActionRequest> request = outbound.sink();
return Flux
.usingWhen(Mono.fromSupplier(ScrollState::new),
state -> this
.sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers)
.expand(searchResponse -> {
EmitterProcessor<SearchResponse> inbound = EmitterProcessor.create(false);
state.updateScrollId(searchResponse.getScrollId());
if (isEmpty(searchResponse.getHits())) {
return Mono.empty();
}
Flux<SearchResponse> exchange = outbound.startWith(searchRequest).flatMap(it -> {
return this
.sendRequest(new SearchScrollRequest(searchResponse.getScrollId()).scroll(scrollTimeout),
requestCreator.scroll(),
SearchResponse.class,
headers);
if (it instanceof SearchRequest) {
return sendRequest((SearchRequest) it, requestCreator.search(), SearchResponse.class, headers);
} else if (it instanceof SearchScrollRequest) {
return sendRequest((SearchScrollRequest) it, requestCreator.scroll(), SearchResponse.class, headers);
} else if (it instanceof ClearScrollRequest) {
return sendRequest((ClearScrollRequest) it, requestCreator.clearScroll(), ClearScrollResponse.class, headers)
.flatMap(discard -> Flux.empty());
}
throw new IllegalArgumentException(
String.format("Cannot handle '%s'. Please make sure to use a 'SearchRequest' or 'SearchScrollRequest'.", it));
});
return Flux.usingWhen(Mono.fromSupplier(ScrollState::new),
scrollState -> {
Flux<SearchHit> searchHits = inbound
.<SearchResponse>handle((searchResponse, sink) -> {
scrollState.updateScrollId(searchResponse.getScrollId());
if (isEmpty(searchResponse.getHits())) {
inbound.onComplete();
outbound.onComplete();
} else {
sink.next(searchResponse);
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollState.getScrollId())
.scroll(scrollTimeout);
request.next(searchScrollRequest);
}
})
.map(SearchResponse::getHits) //
.flatMap(Flux::fromIterable);
return searchHits.doOnSubscribe(ignore -> exchange.subscribe(inbound));
},
state -> cleanupScroll(headers, state), //
(state, error) -> cleanupScroll(headers, state), //
state -> cleanupScroll(headers, state)); //
}),
state -> cleanupScroll(headers, state),
(state, ex) -> cleanupScroll(headers, state),
state -> cleanupScroll(headers, state))
.filter(it -> !isEmpty(it.getHits()))
.map(SearchResponse::getHits)
.flatMapIterable(Function.identity());
}
private static boolean isEmpty(@Nullable SearchHits hits) {
return hits != null && hits.getHits() != null && hits.getHits().length == 0;
}
@ -442,7 +411,7 @@ public class DefaultReactiveElasticsearchClient implements org.jetlinks.communit
public Mono<ByQueryResponse> updateBy(HttpHeaders headers, UpdateByQueryRequest updateRequest) {
return sendRequest(updateRequest, requestCreator.updateByQuery(), BulkByScrollResponse.class, headers)
.next()
.map(ByQueryResponse::of);
.map(ResponseConverter::byQueryResponseOf);
}
static XContentType enforceSameContentType(IndexRequest indexRequest, @Nullable XContentType xContentType) {
@ -605,6 +574,17 @@ public class DefaultReactiveElasticsearchClient implements org.jetlinks.communit
.publishNext();
}
@Override
public Mono<BulkByScrollResponse> reindex(HttpHeaders headers, ReindexRequest reindexRequest) {
return sendRequest(reindexRequest, requestCreator.reindex(), BulkByScrollResponse.class, headers).next();
}
@Override
public Mono<String> submitReindex(HttpHeaders headers, ReindexRequest reindexRequest) {
return sendRequest(reindexRequest, requestCreator.submitReindex(), TaskSubmissionResponse.class, headers).next()
.map(TaskSubmissionResponse::getTask);
}
// --> INDICES
/*
@ -712,11 +692,25 @@ public class DefaultReactiveElasticsearchClient implements org.jetlinks.communit
@Override
public Mono<Boolean> putMapping(HttpHeaders headers, org.elasticsearch.client.indices.PutMappingRequest putMappingRequest) {
return sendRequest(putMappingRequest, requestCreator.putMappingRequest(), AcknowledgedResponse.class, headers)
return sendRequest(putMappingRequest, this::createPutMapping, AcknowledgedResponse.class, headers)
.map(AcknowledgedResponse::isAcknowledged)
.next();
}
private Request createPutMapping(org.elasticsearch.client.indices.PutMappingRequest putMappingRequest) {
Request request = requestCreator.putMappingRequest().apply(putMappingRequest);
Request newReq = new Request(request.getMethod(), request.getEndpoint());
Params params = new Params(newReq)
.withTimeout(putMappingRequest.timeout())
.withMasterTimeout(putMappingRequest.masterNodeTimeout());
if (serverVersion().before(Version.V_7_0_0)) {
params.putParam("include_type_name", "false");
}
newReq.setEntity(request.getEntity());
return newReq;
}
/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#flushIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.flush.FlushRequest)
@ -1167,7 +1161,7 @@ public class DefaultReactiveElasticsearchClient implements org.jetlinks.communit
@Override
public Mono<GetMappingsResponse> getMapping(HttpHeaders headers, GetMappingsRequest getMappingsRequest) {
return sendRequest(getMappingsRequest, requestCreator.getMapping(),
org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse.class, headers).next();
GetMappingsResponse.class, headers).next();
}
@Override

View File

@ -0,0 +1,20 @@
package org.jetlinks.community.elastic.search.service.reactive;
import lombok.Getter;
import lombok.Setter;
import org.jetlinks.community.buffer.BufferProperties;
import org.jetlinks.community.buffer.BufferProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
@Getter
@Setter
@ConfigurationProperties(prefix = "elasticsearch.buffer")
public class ElasticSearchBufferProperties extends BufferProperties {
public ElasticSearchBufferProperties() {
//固定缓冲文件目录
setFilePath("./data/elasticsearch-buffer");
setSize(3000);
}
private boolean refreshWhenWrite = false;
}

View File

@ -21,16 +21,14 @@ import org.elasticsearch.search.sort.SortOrder;
import org.hswebframework.ezorm.core.param.QueryParam;
import org.hswebframework.ezorm.core.param.Term;
import org.hswebframework.ezorm.core.param.TermType;
import org.jetlinks.core.metadata.types.DateTimeType;
import org.jetlinks.community.Interval;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager;
import org.jetlinks.community.elastic.search.service.AggregationService;
import org.jetlinks.community.elastic.search.service.DefaultElasticSearchService;
import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter;
import org.jetlinks.community.timeseries.query.*;
import org.jetlinks.core.metadata.types.DateTimeType;
import org.jetlinks.reactor.ql.utils.CastUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
@ -45,11 +43,9 @@ import java.util.stream.Collectors;
* @author zhouhao
* @since 1.5
**/
@Service
@Slf4j
public class ReactiveAggregationService implements AggregationService {
private final ReactiveElasticsearchClient restClient;
private final ElasticSearchIndexManager indexManager;
@ -108,7 +104,9 @@ public class ReactiveAggregationService implements AggregationService {
.terms(group.getAlias())
.field(group.getProperty());
if (group instanceof LimitGroup) {
builder.size(((LimitGroup) group).getLimit());
if (((LimitGroup) group).getLimit() > 0) {
builder.size(((LimitGroup) group).getLimit());
}
} else {
builder.size(100);
}
@ -142,8 +140,10 @@ public class ReactiveAggregationService implements AggregationService {
boolean group = aggregationBuilder != null;
for (AggregationColumn aggColumn : aggregationQueryParam.getAggColumns()) {
AggregationBuilder builder = AggType.of(aggColumn.getAggregation().name())
.aggregationBuilder(aggColumn.getAlias(), aggColumn.getProperty());
AggregationBuilder builder = AggType
.of(aggColumn.getAggregation().name())
.aggregationBuilder(aggColumn.getAlias(), aggColumn.getProperty());
if (builder instanceof TopHitsAggregationBuilder) {
TopHitsAggregationBuilder topHitsBuilder = ((TopHitsAggregationBuilder) builder);
if (CollectionUtils.isEmpty(queryParam.getSorts())) {
@ -154,7 +154,9 @@ public class ReactiveAggregationService implements AggregationService {
.stream()
.map(sort -> SortBuilders
.fieldSort(sort.getName())
.order("desc".equalsIgnoreCase(sort.getOrder()) ? SortOrder.DESC : SortOrder.ASC))
.order("desc".equalsIgnoreCase(sort.getOrder())
? SortOrder.DESC
: SortOrder.ASC))
.collect(Collectors.toList()));
}
if (aggColumn instanceof LimitAggregationColumn) {
@ -170,36 +172,37 @@ public class ReactiveAggregationService implements AggregationService {
}
}
return Flux.fromArray(index)
.flatMap(idx -> Mono.zip(indexManager.getIndexStrategy(idx), Mono.just(idx)))
.collectList()
.flatMap(strategy -> this
.createSearchSourceBuilder(queryParam, index[0])
.map(builder -> {
aggs.forEach(builder.size(0)::aggregation);
return new SearchRequest(strategy
.stream()
.map(tp2 -> tp2
.getT1()
.getIndexForSearch(tp2.getT2()))
.toArray(String[]::new))
.indicesOptions(ReactiveElasticSearchService.indexOptions)
.source(builder);
}
)
)
.flatMap(restClient::searchForPage)
.flatMapMany(this::parseResult)
.as(flux -> {
if (!group) {
return flux
.map(Map::entrySet)
.flatMap(Flux::fromIterable)
.collectMap(Map.Entry::getKey, Map.Entry::getValue)
.flux();
}
return flux;
})
return Flux
.fromArray(index)
.flatMap(idx -> Mono.zip(indexManager.getIndexStrategy(idx), Mono.just(idx)))
.collectList()
.flatMap(strategy -> this
.createSearchSourceBuilder(queryParam, index[0])
.map(builder -> {
aggs.forEach(builder.size(0)::aggregation);
return new SearchRequest(strategy
.stream()
.map(tp2 -> tp2
.getT1()
.getIndexForSearch(tp2.getT2()))
.toArray(String[]::new))
.indicesOptions(ReactiveElasticSearchService.indexOptions)
.source(builder);
}
)
)
.flatMap(restClient::searchForPage)
.flatMapMany(this::parseResult)
.as(flux -> {
if (!group) {
return flux
.map(Map::entrySet)
.flatMap(Flux::fromIterable)
.collectMap(Map.Entry::getKey, Map.Entry::getValue)
.flux();
}
return flux;
})
;
}
@ -209,7 +212,8 @@ public class ReactiveAggregationService implements AggregationService {
.flatMap(agg -> parseAggregation(agg.getName(), agg), Integer.MAX_VALUE);
}
private Flux<Map<String, Object>> parseAggregation(String name, org.elasticsearch.search.aggregations.Aggregation aggregation) {
private Flux<Map<String, Object>> parseAggregation(String name,
org.elasticsearch.search.aggregations.Aggregation aggregation) {
if (aggregation instanceof Terms) {
return parseAggregation(((Terms) aggregation));
}
@ -310,7 +314,7 @@ public class ReactiveAggregationService implements AggregationService {
.ofDays(Integer.getInteger("elasticsearch.agg.default-range-day", 90))
.toMillis();
private static long calculateStartWithTime(AggregationQueryParam param) {
static long calculateStartWithTime(AggregationQueryParam param) {
long startWithParam = param.getStartWithTime();
if (startWithParam == 0) {
//从查询条件中提取时间参数来获取时间区间
@ -336,5 +340,4 @@ public class ReactiveAggregationService implements AggregationService {
return startWithParam;
}
}

View File

@ -14,18 +14,19 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xcontent.XContentType;
import org.hswebframework.ezorm.core.param.QueryParam;
import org.hswebframework.utils.time.DateFormatter;
import org.hswebframework.utils.time.DefaultDateFormatter;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.core.utils.SerializeUtils;
import org.jetlinks.community.buffer.BufferProperties;
import org.jetlinks.community.buffer.BufferSettings;
import org.jetlinks.community.buffer.MemoryUsage;
@ -38,11 +39,9 @@ import org.jetlinks.community.elastic.search.utils.QueryParamTranslator;
import org.jetlinks.community.utils.ErrorUtils;
import org.jetlinks.community.utils.ObjectMappers;
import org.jetlinks.community.utils.SystemUtils;
import org.jetlinks.core.utils.SerializeUtils;
import org.reactivestreams.Publisher;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.function.client.WebClientException;
@ -73,13 +72,8 @@ import java.util.stream.Collectors;
@Slf4j
@DependsOn("reactiveElasticsearchClient")
@ConfigurationProperties(prefix = "elasticsearch")
@Service
public class ReactiveElasticSearchService implements ElasticSearchService {
@Getter
@Setter
private BufferConfig buffer = new BufferConfig();
@Getter
private final ReactiveElasticsearchClient restClient;
@Getter
@ -96,10 +90,21 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
private PersistenceBuffer<Buffer> writer;
@Getter
@Setter
private ElasticSearchBufferProperties buffer;
public ReactiveElasticSearchService(ReactiveElasticsearchClient restClient,
ElasticSearchIndexManager indexManager) {
this(restClient, indexManager, new ElasticSearchBufferProperties());
}
public ReactiveElasticSearchService(ReactiveElasticsearchClient restClient,
ElasticSearchIndexManager indexManager,
ElasticSearchBufferProperties buffer) {
this.restClient = restClient;
this.indexManager = indexManager;
this.buffer = buffer;
init();
}
@ -269,6 +274,10 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
}
private boolean checkWritable(String index) {
// if (SystemUtils.memoryIsOutOfWatermark()) {
// SystemUtils.printError("JVM内存不足,elasticsearch无法处理更多索引[%s]请求!", index);
// return false;
// }
return true;
}
@ -324,6 +333,7 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
writer.dispose();
}
@Getter
@Setter
public static class BufferConfig extends BufferProperties {
@ -468,7 +478,7 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
.flatMap(lst -> {
BulkRequest request = new BulkRequest();
request.timeout(TimeValue.timeValueSeconds(9));
if (buffer.refreshWhenWrite) {
if (buffer.isRefreshWhenWrite()) {
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
}
lst.forEach(request::add);

View File

@ -28,5 +28,4 @@ public interface ReactiveElasticsearchClient extends
Mono<AcknowledgedResponse> updateTemplate(PutIndexTemplateRequest request);
Version serverVersion();
}

View File

@ -0,0 +1,38 @@
package org.jetlinks.community.elastic.search.things;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.community.elastic.search.index.DefaultElasticSearchIndexMetadata;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager;
import org.jetlinks.community.things.data.operations.ColumnModeDDLOperationsBase;
import org.jetlinks.community.things.data.operations.DataSettings;
import org.jetlinks.community.things.data.operations.MetricBuilder;
import reactor.core.publisher.Mono;
import java.util.List;
class ElasticSearchColumnModeDDLOperations extends ColumnModeDDLOperationsBase {
private final ElasticSearchIndexManager indexManager;
public ElasticSearchColumnModeDDLOperations(String thingType,
String templateId,
String thingId,
DataSettings settings,
MetricBuilder metricBuilder,
ElasticSearchIndexManager indexManager) {
super(thingType, templateId, thingId, settings, metricBuilder);
this.indexManager = indexManager;
}
@Override
protected Mono<Void> register(MetricType metricType,String metric, List<PropertyMetadata> properties) {
return indexManager
.putIndex(new DefaultElasticSearchIndexMetadata(metric, properties));
}
@Override
protected Mono<Void> reload(MetricType metricType,String metric, List<PropertyMetadata> properties) {
return indexManager
.putIndex(new DefaultElasticSearchIndexMetadata(metric, properties));
}
}

View File

@ -0,0 +1,130 @@
package org.jetlinks.community.elastic.search.things;
import org.hswebframework.ezorm.core.dsl.Query;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.jetlinks.core.things.ThingsRegistry;
import org.jetlinks.community.elastic.search.service.AggregationService;
import org.jetlinks.community.elastic.search.service.ElasticSearchService;
import org.jetlinks.community.things.data.AggregationRequest;
import org.jetlinks.community.things.data.PropertyAggregation;
import org.jetlinks.community.things.data.operations.ColumnModeQueryOperationsBase;
import org.jetlinks.community.things.data.operations.DataSettings;
import org.jetlinks.community.things.data.operations.MetricBuilder;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.community.timeseries.query.*;
import org.jetlinks.reactor.ql.utils.CastUtils;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
class ElasticSearchColumnModeQueryOperations extends ColumnModeQueryOperationsBase {
private final ElasticSearchService searchService;
private final AggregationService aggregationService;
public ElasticSearchColumnModeQueryOperations(String thingType,
String thingTemplateId,
String thingId,
MetricBuilder metricBuilder,
DataSettings settings,
ThingsRegistry registry,
ElasticSearchService service,
AggregationService aggregationService) {
super(thingType, thingTemplateId, thingId, metricBuilder, settings, registry);
this.searchService = service;
this.aggregationService = aggregationService;
}
@Override
protected Flux<TimeSeriesData> doQuery(String metric, Query<?, QueryParamEntity> query) {
return searchService
.query(metric,
query.getParam(),
data -> {
long ts = CastUtils.castNumber(data.getOrDefault("timestamp", 0L)).longValue();
data.put("timestamp",ts);
return TimeSeriesData.of(ts, data);
});
}
@Override
protected <T> Mono<PagerResult<T>> doQueryPage(String metric,
Query<?, QueryParamEntity> query,
Function<TimeSeriesData, T> mapper) {
return searchService
.queryPager(metric,
query.getParam(),
data -> {
long ts = CastUtils.castNumber(data.getOrDefault("timestamp", 0L)).longValue();
data.put("timestamp",ts);
return mapper.apply(TimeSeriesData.of(ts, data));
});
}
@Override
protected Flux<AggregationData> doAggregation(String metric, AggregationRequest request, AggregationContext context) {
org.joda.time.format.DateTimeFormatter formatter = DateTimeFormat.forPattern(request.getFormat());
PropertyAggregation[] properties = context.getProperties();
return AggregationQueryParam
.of()
.as(param -> {
for (PropertyAggregation property : properties) {
param.agg(property.getProperty(), property.getAlias(), property.getAgg());
}
return param;
})
.as(param -> {
if (request.getInterval() == null) {
return param;
}
return param.groupBy((Group) new TimeGroup(request.getInterval(), "time", request.getFormat()));
})
.limit(request.getLimit() * properties.length)
.from(request.getFrom())
.to(request.getTo())
.filter(request.getFilter())
.execute(param -> aggregationService.aggregation(metric, param))
.map(AggregationData::of)
.groupBy(agg -> agg.getString("time", ""), Integer.MAX_VALUE)
.flatMap(group -> group
.map(data -> {
Map<String, Object> newMap = new HashMap<>();
newMap.put("time", data.get("time").orElse(null));
for (PropertyAggregation property : properties) {
Object val;
if(property.getAgg() ==Aggregation.FIRST || property.getAgg()==Aggregation.TOP){
val = data
.get(property.getProperty())
.orElse(null);
}else {
val = data
.get(property.getAlias())
.orElse(null);
}
if (null != val) {
newMap.put(property.getAlias(), val);
}
}
return newMap;
})
.reduce((a, b) -> {
a.putAll(b);
return a;
})
.map(AggregationData::of))
.sort(Comparator.<AggregationData, Date>comparing(agg -> DateTime
.parse(agg.getString("time", ""), formatter)
.toDate()).reversed())
.take(request.getLimit())
;
}
}

View File

@ -0,0 +1,33 @@
package org.jetlinks.community.elastic.search.things;
import org.jetlinks.core.things.ThingsRegistry;
import org.jetlinks.community.elastic.search.service.ElasticSearchService;
import org.jetlinks.community.things.data.operations.ColumnModeSaveOperationsBase;
import org.jetlinks.community.things.data.operations.DataSettings;
import org.jetlinks.community.things.data.operations.MetricBuilder;
import org.jetlinks.community.timeseries.TimeSeriesData;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
class ElasticSearchColumnModeSaveOperations extends ColumnModeSaveOperationsBase {
private final ElasticSearchService searchService;
public ElasticSearchColumnModeSaveOperations(ThingsRegistry registry,
MetricBuilder metricBuilder,
DataSettings settings,
ElasticSearchService searchService) {
super(registry, metricBuilder, settings);
this.searchService = searchService;
}
@Override
protected Mono<Void> doSave(String metric, TimeSeriesData data) {
return searchService.commit(metric, data.getData());
}
@Override
protected Mono<Void> doSave(String metric, Flux<TimeSeriesData> data) {
return searchService.save(metric, data.map(TimeSeriesData::getData));
}
}

View File

@ -0,0 +1,66 @@
package org.jetlinks.community.elastic.search.things;
import lombok.AllArgsConstructor;
import org.jetlinks.core.things.ThingsRegistry;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager;
import org.jetlinks.community.elastic.search.service.AggregationService;
import org.jetlinks.community.elastic.search.service.ElasticSearchService;
import org.jetlinks.community.things.data.AbstractThingDataRepositoryStrategy;
import org.jetlinks.community.things.data.operations.*;
@AllArgsConstructor
public class ElasticSearchColumnModeStrategy extends AbstractThingDataRepositoryStrategy {
private final ThingsRegistry registry;
private final ElasticSearchService searchService;
private final AggregationService aggregationService;
private final ElasticSearchIndexManager indexManager;
@Override
public String getId() {
return "default-column";
}
@Override
public String getName() {
return "ElasticSearch-列式存储";
}
@Override
public SaveOperations createOpsForSave(OperationsContext context) {
return new ElasticSearchColumnModeSaveOperations(
registry,
context.getMetricBuilder(),
context.getSettings(),
searchService);
}
@Override
protected QueryOperations createForQuery(String thingType, String templateId, String thingId, OperationsContext context) {
return new ElasticSearchColumnModeQueryOperations(
thingType,
templateId,
thingId,
context.getMetricBuilder(),
context.getSettings(),
registry,
searchService,
aggregationService);
}
@Override
protected DDLOperations createForDDL(String thingType, String templateId, String thingId, OperationsContext context) {
return new ElasticSearchColumnModeDDLOperations(
thingType,
templateId,
thingId,
context.getSettings(),
context.getMetricBuilder(),
indexManager);
}
@Override
public int getOrder() {
return 10001;
}
}

View File

@ -0,0 +1,38 @@
package org.jetlinks.community.elastic.search.things;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.community.elastic.search.index.DefaultElasticSearchIndexMetadata;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager;
import org.jetlinks.community.things.data.operations.DataSettings;
import org.jetlinks.community.things.data.operations.MetricBuilder;
import org.jetlinks.community.things.data.operations.RowModeDDLOperationsBase;
import reactor.core.publisher.Mono;
import java.util.List;
class ElasticSearchRowModeDDLOperations extends RowModeDDLOperationsBase {
private final ElasticSearchIndexManager indexManager;
public ElasticSearchRowModeDDLOperations(String thingType,
String templateId,
String thingId,
DataSettings settings,
MetricBuilder metricBuilder,
ElasticSearchIndexManager indexManager) {
super(thingType, templateId, thingId, settings, metricBuilder);
this.indexManager = indexManager;
}
@Override
protected Mono<Void> register(MetricType metricType, String metric, List<PropertyMetadata> properties) {
return indexManager
.putIndex(new DefaultElasticSearchIndexMetadata(metric, properties));
}
@Override
protected Mono<Void> reload(MetricType metricType, String metric, List<PropertyMetadata> properties) {
return indexManager
.putIndex(new DefaultElasticSearchIndexMetadata(metric, properties));
}
}

View File

@ -0,0 +1,235 @@
package org.jetlinks.community.elastic.search.things;
import org.hswebframework.ezorm.core.dsl.Query;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.things.ThingMetadata;
import org.jetlinks.core.things.ThingsRegistry;
import org.jetlinks.community.elastic.search.service.AggregationService;
import org.jetlinks.community.elastic.search.service.ElasticSearchService;
import org.jetlinks.community.things.data.AggregationRequest;
import org.jetlinks.community.things.data.PropertyAggregation;
import org.jetlinks.community.things.data.ThingPropertyDetail;
import org.jetlinks.community.things.data.ThingsDataConstants;
import org.jetlinks.community.things.data.operations.DataSettings;
import org.jetlinks.community.things.data.operations.MetricBuilder;
import org.jetlinks.community.things.data.operations.RowModeQueryOperationsBase;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.community.timeseries.query.*;
import org.jetlinks.reactor.ql.utils.CastUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.*;
import java.util.function.Function;
class ElasticSearchRowModeQueryOperations extends RowModeQueryOperationsBase {
private final ElasticSearchService searchService;
private final AggregationService aggregationService;
public ElasticSearchRowModeQueryOperations(String thingType,
String thingTemplateId,
String thingId,
MetricBuilder metricBuilder,
DataSettings settings,
ThingsRegistry registry,
ElasticSearchService service,
AggregationService aggregationService) {
super(thingType, thingTemplateId, thingId, metricBuilder, settings, registry);
this.searchService = service;
this.aggregationService = aggregationService;
}
@Override
protected Flux<TimeSeriesData> doQuery(String metric, Query<?, QueryParamEntity> query) {
return searchService
.query(metric,
query.getParam(),
data -> {
long ts = CastUtils.castNumber(data.getOrDefault("timestamp", 0L)).longValue();
return TimeSeriesData.of(ts, data);
});
}
@Override
protected <T> Mono<PagerResult<T>> doQueryPage(String metric,
Query<?, QueryParamEntity> query,
Function<TimeSeriesData, T> mapper) {
return searchService
.queryPager(metric,
query.getParam(),
data -> {
long ts = CastUtils.castNumber(data.getOrDefault("timestamp", 0L)).longValue();
return mapper.apply(TimeSeriesData.of(ts, data));
});
}
@Override
protected Flux<ThingPropertyDetail> queryEachProperty(@Nonnull String metric,
@Nonnull Query<?, QueryParamEntity> query,
@Nonnull ThingMetadata metadata,
@Nonnull Map<String, PropertyMetadata> properties) {
QueryParamEntity param = query.getParam();
//不分页或者每页数量大于1000则回退到普通方式查询
if (!param.isPaging() || param.getPageSize() >= 1000) {
return super.queryEachProperty(metric, query, metadata, properties);
}
if (properties.size() <= 200) {
query.in(ThingsDataConstants.COLUMN_PROPERTY_ID, properties.keySet());
}
//通过聚合查询求top n来查询每一个属性数据
return AggregationQueryParam
.of()
.agg(new LimitAggregationColumn(ThingsDataConstants.COLUMN_PROPERTY_ID,
ThingsDataConstants.COLUMN_PROPERTY_ID,
Aggregation.FIRST,
param.getPageSize()))
.groupBy(new LimitGroup(ThingsDataConstants.COLUMN_PROPERTY_ID,
ThingsDataConstants.COLUMN_PROPERTY_ID,
param.getPageSize() * properties.size())) //按property分组
.filter(param)
.execute(params -> aggregationService.aggregation(metric, params))
.mapNotNull(data -> {
long ts = CastUtils.castNumber(data.getOrDefault(ThingsDataConstants.COLUMN_TIMESTAMP, 0L)).longValue();
String property = (String) data.getOrDefault(ThingsDataConstants.COLUMN_PROPERTY_ID, null);
String thingId = (String) data.getOrDefault(metricBuilder.getThingIdProperty(), null);
Object value = data.getOrDefault(ThingsDataConstants.COLUMN_PROPERTY_VALUE, null);
if (property == null || thingId == null || value == null) {
return null;
}
return ThingPropertyDetail
.of(TimeSeriesData.of(ts, data), properties.get(property));
});
}
@Override
protected Flux<AggregationData> doAggregation(String metric, AggregationRequest request, AggregationContext context) {
PropertyAggregation[] properties = context.getProperties();
//只聚合一个属性时
if (properties.length == 1) {
return AggregationQueryParam
.of()
.agg(ThingsDataConstants.COLUMN_PROPERTY_NUMBER_VALUE, properties[0].getAlias(), properties[0].getAgg())
.as(param -> {
if (request.getInterval() == null) {
return param;
}
return param.groupBy(request.getInterval(), request.getFormat());
})
.limit(request.getLimit())
.from(request.getFrom())
.to(request.getTo())
.filter(request.getFilter())
.filter(query -> query.where(ThingsDataConstants.COLUMN_PROPERTY_ID, properties[0].getProperty()))
.execute(param -> aggregationService.aggregation(metric, param))
.doOnNext(agg -> agg.remove("_time"))
.map(AggregationData::of)
.take(request.getLimit());
}
Map<String, String> propertyAlias = context.getPropertyAlias();
Map<String, PropertyAggregation> aliasProperty = context.getAliasToProperty();
return AggregationQueryParam
.of()
.as(param -> {
Arrays.stream(properties)
.forEach(agg -> param.agg(ThingsDataConstants.COLUMN_PROPERTY_NUMBER_VALUE, "value_" + agg.getAlias(), agg.getAgg()));
return param;
})
.as(param -> {
if (request.getInterval() == null) {
return param;
}
return param.groupBy((Group) new TimeGroup(request.getInterval(), "time", request.getFormat()));
})
.groupBy(new LimitGroup(ThingsDataConstants.COLUMN_PROPERTY_ID, ThingsDataConstants.COLUMN_PROPERTY_ID, properties.length))
.limit(request.getLimit() * properties.length)
.from(request.getFrom())
.to(request.getTo())
.filter(request.getFilter())
.filter(query -> query
.where()
.in(ThingsDataConstants.COLUMN_PROPERTY_ID, new HashSet<>(propertyAlias.values())))
//执行查询
.execute(param -> aggregationService.aggregation(metric, param))
.map(AggregationData::of)
//按时间分组,然后将返回的结果合并起来
.groupBy(agg -> agg.getString("time", ""), Integer.MAX_VALUE)
.as(flux -> {
//按时间分组
if (request.getInterval() != null) {
return flux
.flatMap(group -> {
String time = group.key();
return group
//按属性分组
.groupBy(agg -> agg.getString(ThingsDataConstants.COLUMN_PROPERTY_ID, ""), Integer.MAX_VALUE)
.flatMap(propsGroup -> {
String property = propsGroup.key();
return propsGroup
.reduce(AggregationData::merge)
.map(agg -> {
Map<String, Object> data = new HashMap<>();
data.put("_time", agg.get("_time").orElse(time));
data.put("time", time);
aliasProperty.forEach((alias, prp) -> {
if (prp.getAgg() == Aggregation.FIRST || prp.getAgg() == Aggregation.TOP) {
data.putIfAbsent(alias, agg
.get(ThingsDataConstants.COLUMN_PROPERTY_NUMBER_VALUE)
.orElse(agg.get("value").orElse(null)));
} else if (property.equals(prp.getProperty())) {
Object value = agg
.get("value_" + alias)
.orElse(0);
data.putIfAbsent(alias, value);
}
});
return data;
});
})
.<Map<String, Object>>reduceWith(HashMap::new, (a, b) -> {
a.putAll(b);
return a;
});
}
);
} else {
return flux
.flatMap(group -> group
.reduce(AggregationData::merge)
.map(agg -> {
Map<String, Object> values = new HashMap<>();
//values.put("time", group.key());
for (Map.Entry<String, String> props : propertyAlias.entrySet()) {
values.put(props.getKey(), agg
.get("value_" + props.getKey())
.orElse(0));
}
return values;
}));
}
})
.map(map -> {
map.remove("");
propertyAlias
.keySet()
.forEach(key -> map.putIfAbsent(key, 0));
return AggregationData.of(map);
})
.sort(Comparator
.<AggregationData, Date>comparing(agg -> CastUtils.castDate(agg.values().get("_time")))
.reversed())
.doOnNext(agg -> agg.values().remove("_time"))
.take(request.getLimit())
;
}
}

View File

@ -0,0 +1,33 @@
package org.jetlinks.community.elastic.search.things;
import org.jetlinks.core.things.ThingsRegistry;
import org.jetlinks.community.elastic.search.service.ElasticSearchService;
import org.jetlinks.community.things.data.operations.DataSettings;
import org.jetlinks.community.things.data.operations.MetricBuilder;
import org.jetlinks.community.things.data.operations.RowModeSaveOperationsBase;
import org.jetlinks.community.timeseries.TimeSeriesData;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
class ElasticSearchRowModeSaveOperations extends RowModeSaveOperationsBase {
private final ElasticSearchService searchService;
public ElasticSearchRowModeSaveOperations(ThingsRegistry registry,
MetricBuilder metricBuilder,
DataSettings settings,
ElasticSearchService searchService) {
super(registry, metricBuilder, settings);
this.searchService = searchService;
}
@Override
protected Mono<Void> doSave(String metric, TimeSeriesData data) {
return searchService.commit(metric, data.getData());
}
@Override
protected Mono<Void> doSave(String metric, Flux<TimeSeriesData> data) {
return searchService.save(metric, data.map(TimeSeriesData::getData));
}
}

View File

@ -0,0 +1,66 @@
package org.jetlinks.community.elastic.search.things;
import lombok.AllArgsConstructor;
import org.jetlinks.core.things.ThingsRegistry;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager;
import org.jetlinks.community.elastic.search.service.AggregationService;
import org.jetlinks.community.elastic.search.service.ElasticSearchService;
import org.jetlinks.community.things.data.AbstractThingDataRepositoryStrategy;
import org.jetlinks.community.things.data.operations.*;
@AllArgsConstructor
public class ElasticSearchRowModeStrategy extends AbstractThingDataRepositoryStrategy {
private final ThingsRegistry registry;
private final ElasticSearchService searchService;
private final AggregationService aggregationService;
private final ElasticSearchIndexManager indexManager;
@Override
public String getId() {
return "default-row";
}
@Override
public String getName() {
return "ElasticSearch-行式存储";
}
@Override
public SaveOperations createOpsForSave(OperationsContext context) {
return new ElasticSearchRowModeSaveOperations(
registry,
context.getMetricBuilder(),
context.getSettings(),
searchService);
}
@Override
protected QueryOperations createForQuery(String thingType, String templateId, String thingId, OperationsContext context) {
return new ElasticSearchRowModeQueryOperations(
thingType,
templateId,
thingId,
context.getMetricBuilder(),
context.getSettings(),
registry,
searchService,
aggregationService);
}
@Override
protected DDLOperations createForDDL(String thingType, String templateId, String thingId, OperationsContext context) {
return new ElasticSearchRowModeDDLOperations(
thingType,
templateId,
thingId,
context.getSettings(),
context.getMetricBuilder(),
indexManager);
}
@Override
public int getOrder() {
return 10000;
}
}