diff --git a/jetlinks-components/elasticsearch-component/pom.xml b/jetlinks-components/elasticsearch-component/pom.xml index a619d3be..a46253f5 100644 --- a/jetlinks-components/elasticsearch-component/pom.xml +++ b/jetlinks-components/elasticsearch-component/pom.xml @@ -72,6 +72,11 @@ reactor-netty + + ${project.groupId} + things-component + ${project.version} + \ No newline at end of file diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/DefaultAggregationService.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/DefaultAggregationService.java deleted file mode 100644 index c26796ec..00000000 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/DefaultAggregationService.java +++ /dev/null @@ -1,268 +0,0 @@ -package org.jetlinks.community.elastic.search.aggreation; - -import lombok.extern.slf4j.Slf4j; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.search.aggregations.bucket.histogram.LongBounds; -import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.hswebframework.ezorm.core.param.QueryParam; -import org.hswebframework.ezorm.core.param.TermType; -import org.jetlinks.community.elastic.search.ElasticRestClient; -import org.jetlinks.community.elastic.search.aggreation.bucket.Bucket; -import org.jetlinks.community.elastic.search.aggreation.bucket.BucketAggregationsStructure; -import org.jetlinks.community.elastic.search.aggreation.bucket.BucketResponse; -import org.jetlinks.community.elastic.search.aggreation.bucket.Sort; -import org.jetlinks.community.elastic.search.aggreation.enums.BucketType; -import org.jetlinks.community.elastic.search.aggreation.enums.MetricsType; -import org.jetlinks.community.elastic.search.aggreation.enums.OrderType; -import org.jetlinks.community.elastic.search.aggreation.metrics.MetricsAggregationStructure; -import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager; -import org.jetlinks.community.elastic.search.service.AggregationService; -import org.jetlinks.community.elastic.search.service.DefaultElasticSearchService; -import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter; -import org.jetlinks.community.elastic.search.utils.ReactorActionListener; -import org.jetlinks.community.timeseries.query.AggregationQueryParam; -import org.springframework.beans.factory.annotation.Autowired; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoSink; - -import java.util.*; -import java.util.stream.Collectors; - -/** - * @author bsetfeng - * @since 1.0 - **/ -//@Service -@Slf4j -public class DefaultAggregationService implements AggregationService { - - private final ElasticRestClient restClient; - - private final ElasticSearchIndexManager indexManager; - - @Autowired - public DefaultAggregationService(ElasticSearchIndexManager indexManager, - ElasticRestClient restClient) { - this.restClient = restClient; - this.indexManager = indexManager; - } - -// @Override -// public Mono metricsAggregation(String index, QueryParam queryParam, -// MetricsAggregationStructure structure) { -// return createSearchSourceBuilder(queryParam, index) -// .map(builder -> new SearchRequest(index) -// .source(builder.aggregation(structure.getType().aggregationBuilder(structure.getName(), structure.getField())))) -// .flatMap(request -> Mono.create(monoSink -> -// restClient.getQueryClient().searchAsync(request, RequestOptions.DEFAULT, translatorActionListener(monoSink)))) -// .map(searchResponse -> structure.getType().getResponse(structure.getName(), searchResponse)); -// } -// -// @Override -// public Mono bucketAggregation(String index, QueryParam queryParam, BucketAggregationsStructure structure) { -// return createSearchSourceBuilder(queryParam, index) -// .map(builder -> new SearchRequest(index) -// .source(builder -// .aggregation(structure.getType().aggregationBuilder(structure)) -// //.aggregation(AggregationBuilders.topHits("last_val").from(1)) -// )) -//// .doOnNext(searchRequest -> { -//// if (log.isDebugEnabled()) { -//// log.debug("聚合查询ElasticSearch:{},参数:{}", index, JSON.toJSON(searchRequest.source().toString())); -//// } -//// }) -// .flatMap(request -> Mono.create(monoSink -> -// restClient -// .getQueryClient() -// .searchAsync(request, RequestOptions.DEFAULT, translatorActionListener(monoSink)))) -// .map(response -> BucketResponse.builder() -// .name(structure.getName()) -// .buckets(structure.getType().convert(response.getAggregations().get(structure.getName()))) -// .build()) -// ; -// -// } - - private Mono createSearchSourceBuilder(QueryParam queryParam, String index) { - - return indexManager - .getIndexMetadata(index) - .map(metadata -> ElasticSearchConverter.convertSearchSourceBuilder(queryParam, metadata)); - } - - private ActionListener translatorActionListener(MonoSink sink) { - return new ActionListener() { - @Override - public void onResponse(T response) { - sink.success(response); - } - - @Override - public void onFailure(Exception e) { - if (e instanceof ElasticsearchException) { - if (((ElasticsearchException) e).status().getStatus() == 404) { - sink.success(); - return; - } else if (((ElasticsearchException) e).status().getStatus() == 400) { - sink.error(new ElasticsearchParseException("查询参数格式错误", e)); - } - } - sink.error(e); - } - }; - } - - @Override - public Flux> aggregation(String[] index, AggregationQueryParam aggregationQueryParam) { - QueryParam queryParam = prepareQueryParam(aggregationQueryParam); - BucketAggregationsStructure structure = createAggParameter(aggregationQueryParam); - return Flux.fromArray(index) - .flatMap(idx -> Mono.zip(indexManager.getIndexStrategy(idx), Mono.just(idx))) - .collectList() - .flatMap(strategy -> - createSearchSourceBuilder(queryParam, index[0]) - .map(builder -> - new SearchRequest(strategy - .stream() - .map(tp2 -> tp2.getT1().getIndexForSearch(tp2.getT2())) - .toArray(String[]::new)) - .indicesOptions(DefaultElasticSearchService.indexOptions) - .source(builder.size(0).aggregation(structure.getType().aggregationBuilder(structure)) - ) - ) - ) - .flatMap(searchRequest -> - ReactorActionListener - .mono(listener -> - restClient.getQueryClient() - .searchAsync(searchRequest, RequestOptions.DEFAULT, listener) - )) - .filter(response -> response.getAggregations() != null) - .map(response -> BucketResponse.builder() - .name(structure.getName()) - .buckets(structure.getType().convert(response.getAggregations().get(structure.getName()))) - .build()) - .flatMapIterable(BucketsParser::convert) - .take(aggregationQueryParam.getLimit()) - ; - } - - static class BucketsParser { - - private final List> result = new ArrayList<>(); - - public static List> convert(BucketResponse response) { - return new BucketsParser(response).result; - } - - public BucketsParser(BucketResponse response) { - this(response.getBuckets()); - } - - public BucketsParser(List buckets) { - buckets.forEach(bucket -> parser(bucket, new HashMap<>())); - } - - public void parser(Bucket bucket, Map fMap) { - addBucketProperty(bucket, fMap); - if (bucket.getBuckets() != null && !bucket.getBuckets().isEmpty()) { - bucket.getBuckets().forEach(b -> { - Map map = new HashMap<>(fMap); - addBucketProperty(b, map); - parser(b, map); - }); - } else { - result.add(fMap); - } - } - - private void addBucketProperty(Bucket bucket, Map fMap) { - fMap.put(bucket.getName(), bucket.getKey()); - fMap.putAll(bucket.toMap()); - } - } - - protected static QueryParam prepareQueryParam(AggregationQueryParam param) { - QueryParam queryParam = param.getQueryParam().clone(); - queryParam.setPaging(false); - queryParam.and(param.getTimeProperty(), TermType.btw, Arrays.asList(calculateStartWithTime(param), param.getEndWithTime())); - if (queryParam.getSorts().isEmpty()) { - queryParam.orderBy(param.getTimeProperty()).desc(); - } - return queryParam; - } - - protected BucketAggregationsStructure createAggParameter(AggregationQueryParam param) { - List structures = new ArrayList<>(); - if (param.getGroupByTime() != null) { - structures.add(convertAggGroupTimeStructure(param)); - } - if (param.getGroupBy() != null && !param.getGroupBy().isEmpty()) { - structures.addAll(getTermTypeStructures(param)); - } - for (int i = 0, size = structures.size(); i < size; i++) { - if (i < size - 1) { - structures.get(i).setSubBucketAggregation(Collections.singletonList(structures.get(i + 1))); - } - if (i == size - 1) { - structures.get(i) - .setSubMetricsAggregation(param - .getAggColumns() - .stream() - .map(agg -> { - MetricsAggregationStructure metricsAggregationStructure = new MetricsAggregationStructure(); - metricsAggregationStructure.setField(agg.getProperty()); - metricsAggregationStructure.setName(agg.getAlias()); - metricsAggregationStructure.setType(MetricsType.of(agg.getAggregation().name())); - return metricsAggregationStructure; - }).collect(Collectors.toList())); - } - } - return structures.get(0); - } - - protected BucketAggregationsStructure convertAggGroupTimeStructure(AggregationQueryParam param) { - BucketAggregationsStructure structure = new BucketAggregationsStructure(); - structure.setInterval(param.getGroupByTime().getInterval().toString()); - structure.setType(BucketType.DATE_HISTOGRAM); - structure.setFormat(param.getGroupByTime().getFormat()); - structure.setName(param.getGroupByTime().getAlias()); - structure.setField(param.getGroupByTime().getProperty()); - structure.setSort(Sort.desc(OrderType.KEY)); - structure.setExtendedBounds(getExtendedBounds(param)); - return structure; - } - - protected static LongBounds getExtendedBounds(AggregationQueryParam param) { - return new LongBounds(calculateStartWithTime(param), param.getEndWithTime()); - } - - private static long calculateStartWithTime(AggregationQueryParam param) { - long startWithParam = param.getStartWithTime(); -// if (param.getGroupByTime() != null && param.getGroupByTime().getInterval() != null) { -// long timeInterval = param.getGroupByTime().getInterval().toMillis() * param.getLimit(); -// long tempStartWithParam = param.getEndWithTime() - timeInterval; -// startWithParam = Math.max(tempStartWithParam, startWithParam); -// } - return startWithParam; - } - - protected List getTermTypeStructures(AggregationQueryParam param) { - return param.getGroupBy() - .stream() - .map(group -> { - BucketAggregationsStructure structure = new BucketAggregationsStructure(); - structure.setType(BucketType.TERMS); - structure.setSize(param.getLimit()); - structure.setField(group.getProperty()); - structure.setName(group.getAlias()); - return structure; - }).collect(Collectors.toList()); - } -} diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/configuration/ElasticSearchConfiguration.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/configuration/ElasticSearchConfiguration.java old mode 100644 new mode 100755 index 2d58119b..da55377d --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/configuration/ElasticSearchConfiguration.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/configuration/ElasticSearchConfiguration.java @@ -1,64 +1,58 @@ package org.jetlinks.community.elastic.search.configuration; -import io.netty.channel.ChannelOption; -import io.netty.handler.ssl.ApplicationProtocolConfig; -import io.netty.handler.ssl.ClientAuth; -import io.netty.handler.ssl.IdentityCipherSuiteFilter; -import io.netty.handler.ssl.JdkSslContext; -import io.netty.handler.timeout.ReadTimeoutHandler; -import io.netty.handler.timeout.WriteTimeoutHandler; +import lombok.Generated; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestHighLevelClient; -import org.jetlinks.community.elastic.search.ElasticRestClient; import org.jetlinks.community.elastic.search.embedded.EmbeddedElasticSearch; import org.jetlinks.community.elastic.search.embedded.EmbeddedElasticSearchProperties; +import org.jetlinks.community.elastic.search.index.DefaultElasticSearchIndexManager; +import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager; import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties; -import org.jetlinks.community.elastic.search.service.reactive.DefaultReactiveElasticsearchClient; +import org.jetlinks.community.elastic.search.index.ElasticSearchIndexStrategy; +import org.jetlinks.community.elastic.search.index.strategies.DirectElasticSearchIndexStrategy; +import org.jetlinks.community.elastic.search.index.strategies.TimeByMonthElasticSearchIndexStrategy; +import org.jetlinks.community.elastic.search.service.AggregationService; +import org.jetlinks.community.elastic.search.service.ElasticSearchService; +import org.jetlinks.community.elastic.search.service.reactive.*; +import org.jetlinks.community.elastic.search.timeseries.ElasticSearchTimeSeriesManager; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.data.elasticsearch.ReactiveElasticsearchRestClientAutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.data.elasticsearch.client.reactive.HostProvider; import org.springframework.data.elasticsearch.client.reactive.RequestCreator; import org.springframework.data.elasticsearch.client.reactive.WebClientProvider; -import org.springframework.http.client.reactive.ReactorClientHttpConnector; -import reactor.netty.http.client.HttpClient; -import reactor.netty.tcp.TcpClient; -import reactor.netty.transport.ProxyProvider; -import javax.net.ssl.SSLContext; import java.net.InetSocketAddress; -import java.time.Duration; -import java.util.Optional; -import java.util.concurrent.TimeUnit; +import java.util.List; /** * @author bsetfeng * @author zhouhao * @since 1.0 **/ -@Configuration +@Configuration(proxyBeanMethods = false) @Slf4j @EnableConfigurationProperties({ - ElasticSearchProperties.class, EmbeddedElasticSearchProperties.class, - ElasticSearchIndexProperties.class}) + ElasticSearchIndexProperties.class, + ElasticSearchBufferProperties.class}) +@AutoConfigureAfter(ReactiveElasticsearchRestClientAutoConfiguration.class) +@ConditionalOnBean(ClientConfiguration.class) +@Generated public class ElasticSearchConfiguration { - private final ElasticSearchProperties properties; - - private final EmbeddedElasticSearchProperties embeddedProperties; - - public ElasticSearchConfiguration(ElasticSearchProperties properties, EmbeddedElasticSearchProperties embeddedProperties) { - this.properties = properties; - this.embeddedProperties = embeddedProperties; - } - @Bean @SneakyThrows - public DefaultReactiveElasticsearchClient reactiveElasticsearchClient(ClientConfiguration clientConfiguration) { + @Primary + public DefaultReactiveElasticsearchClient defaultReactiveElasticsearchClient(EmbeddedElasticSearchProperties embeddedProperties, + ClientConfiguration clientConfiguration) { if (embeddedProperties.isEnabled()) { log.debug("starting embedded elasticsearch on {}:{}", embeddedProperties.getHost(), @@ -66,10 +60,10 @@ public class ElasticSearchConfiguration { new EmbeddedElasticSearch(embeddedProperties).start(); } - WebClientProvider provider = getWebClientProvider(clientConfiguration); - HostProvider hostProvider = HostProvider.provider(provider, clientConfiguration.getHeadersSupplier(), + HostProvider hostProvider = HostProvider.provider(provider, + clientConfiguration.getHeadersSupplier(), clientConfiguration .getEndpoints() .toArray(new InetSocketAddress[0])); @@ -85,77 +79,45 @@ public class ElasticSearchConfiguration { private static WebClientProvider getWebClientProvider(ClientConfiguration clientConfiguration) { - Duration connectTimeout = clientConfiguration.getConnectTimeout(); - Duration soTimeout = clientConfiguration.getSocketTimeout(); - - TcpClient tcpClient = TcpClient.create(); - - if (!connectTimeout.isNegative()) { - tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.toMillis())); - } - - if (!soTimeout.isNegative()) { - tcpClient = tcpClient.doOnConnected(connection -> connection // - .addHandlerLast(new ReadTimeoutHandler(soTimeout.toMillis(), TimeUnit.MILLISECONDS)) - .addHandlerLast(new WriteTimeoutHandler(soTimeout.toMillis(), TimeUnit.MILLISECONDS))); - } - - if (clientConfiguration.getProxy().isPresent()) { - String proxy = clientConfiguration.getProxy().get(); - String[] hostPort = proxy.split(":"); - - if (hostPort.length != 2) { - throw new IllegalArgumentException("invalid proxy configuration " + proxy + ", should be \"host:port\""); - } - tcpClient = tcpClient.proxy(proxyOptions -> proxyOptions - .type(ProxyProvider.Proxy.HTTP).host(hostPort[0]) - .port(Integer.parseInt(hostPort[1]))); - } - - String scheme = "http"; - HttpClient httpClient = HttpClient.from(tcpClient); - - if (clientConfiguration.useSsl()) { - - Optional sslContext = clientConfiguration.getSslContext(); - - if (sslContext.isPresent()) { - httpClient = httpClient.secure(sslContextSpec -> { - sslContextSpec.sslContext(new JdkSslContext(sslContext.get(), true, null, IdentityCipherSuiteFilter.INSTANCE, - ApplicationProtocolConfig.DISABLED, ClientAuth.NONE, null, false)); - }); - } else { - httpClient = httpClient.secure(); - } - - scheme = "https"; - } - - ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient); - WebClientProvider provider = WebClientProvider.create(scheme, connector); - - if (clientConfiguration.getPathPrefix() != null) { - provider = provider.withPathPrefix(clientConfiguration.getPathPrefix()); - } - - provider = provider.withDefaultHeaders(clientConfiguration.getDefaultHeaders()) // - .withWebClientConfigurer(clientConfiguration.getWebClientConfigurer()); - return provider; + return WebClientProvider.getWebClientProvider(clientConfiguration); } @Bean - @SneakyThrows - public ElasticRestClient elasticRestClient() { - - RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(properties.createHosts()) - .setRequestConfigCallback(properties::applyRequestConfigBuilder) - .setHttpClientConfigCallback(properties::applyHttpAsyncClientBuilder)); - return new ElasticRestClient(client, client); + public DirectElasticSearchIndexStrategy directElasticSearchIndexStrategy(ReactiveElasticsearchClient elasticsearchClient, + ElasticSearchIndexProperties indexProperties) { + return new DirectElasticSearchIndexStrategy(elasticsearchClient, indexProperties); } - @Bean(destroyMethod = "close") - public RestHighLevelClient restHighLevelClient(ElasticRestClient client) { - return client.getWriteClient(); + @Bean + public TimeByMonthElasticSearchIndexStrategy timeByMonthElasticSearchIndexStrategy(ReactiveElasticsearchClient elasticsearchClient, + ElasticSearchIndexProperties indexProperties) { + return new TimeByMonthElasticSearchIndexStrategy(elasticsearchClient, indexProperties); + } + + @Bean + public DefaultElasticSearchIndexManager elasticSearchIndexManager(@Autowired(required = false) List strategies) { + return new DefaultElasticSearchIndexManager(strategies); + } + + @Bean + @ConditionalOnMissingBean(ElasticSearchService.class) + public ReactiveElasticSearchService reactiveElasticSearchService(ReactiveElasticsearchClient elasticsearchClient, + ElasticSearchIndexManager indexManager, + ElasticSearchBufferProperties properties) { + return new ReactiveElasticSearchService(elasticsearchClient, indexManager, properties); + } + + @Bean + public ReactiveAggregationService reactiveAggregationService(ElasticSearchIndexManager indexManager, + ReactiveElasticsearchClient restClient) { + return new ReactiveAggregationService(indexManager, restClient); + } + + @Bean + public ElasticSearchTimeSeriesManager elasticSearchTimeSeriesManager(ElasticSearchIndexManager indexManager, + ElasticSearchService elasticSearchService, + AggregationService aggregationService) { + return new ElasticSearchTimeSeriesManager(indexManager, elasticSearchService, aggregationService); } } diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TimeByMonthElasticSearchIndexStrategy.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TimeByMonthElasticSearchIndexStrategy.java old mode 100644 new mode 100755 index d0c7f276..1971fd6b --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TimeByMonthElasticSearchIndexStrategy.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TimeByMonthElasticSearchIndexStrategy.java @@ -1,11 +1,9 @@ package org.jetlinks.community.elastic.search.index.strategies; -import org.hswebframework.utils.time.DateFormatter; import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties; import org.jetlinks.community.elastic.search.service.reactive.ReactiveElasticsearchClient; -import org.springframework.stereotype.Component; -import java.util.Date; +import java.time.LocalDate; /** * 按月对来划分索引策略 @@ -13,17 +11,16 @@ import java.util.Date; * @author zhouhao * @since 1.0 */ -@Component public class TimeByMonthElasticSearchIndexStrategy extends TemplateElasticSearchIndexStrategy { - private final String format = "yyyy-MM"; - public TimeByMonthElasticSearchIndexStrategy(ReactiveElasticsearchClient client, ElasticSearchIndexProperties properties) { - super("time-by-month", client,properties); + super("time-by-month", client, properties); } @Override public String getIndexForSave(String index) { - return wrapIndex(index).concat("_").concat(DateFormatter.toString(new Date(), format)); + LocalDate now = LocalDate.now(); + String idx = wrapIndex(index); + return idx + "_" + now.getYear() + "-" + now.getMonthValue(); } } diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/AggregationService.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/AggregationService.java index 2e15f373..6a3d87bb 100644 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/AggregationService.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/AggregationService.java @@ -13,4 +13,13 @@ public interface AggregationService { Flux> aggregation(String[] index, AggregationQueryParam queryParam); + /** + * @param index 索引 + * @param queryParam 聚合查询参数 + * @return 查询结果 + * @see AggregationService#aggregation(String[], AggregationQueryParam) + */ + default Flux> aggregation(String index, AggregationQueryParam queryParam) { + return aggregation(new String[]{index}, queryParam); + } } diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java deleted file mode 100644 index a494ba6f..00000000 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java +++ /dev/null @@ -1,444 +0,0 @@ -package org.jetlinks.community.elastic.search.service; - -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.SneakyThrows; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections4.CollectionUtils; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.MultiSearchRequest; -import org.elasticsearch.action.search.MultiSearchResponse; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.core.CountRequest; -import org.elasticsearch.client.core.CountResponse; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.reindex.BulkByScrollResponse; -import org.elasticsearch.index.reindex.DeleteByQueryRequest; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.hswebframework.ezorm.core.param.QueryParam; -import org.hswebframework.utils.time.DateFormatter; -import org.hswebframework.utils.time.DefaultDateFormatter; -import org.hswebframework.web.api.crud.entity.PagerResult; -import org.hswebframework.web.bean.FastBeanCopier; -import org.jetlinks.community.elastic.search.ElasticRestClient; -import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager; -import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata; -import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter; -import org.jetlinks.community.elastic.search.utils.QueryParamTranslator; -import org.jetlinks.community.elastic.search.utils.ReactorActionListener; -import org.jetlinks.core.utils.FluxUtils; -import org.reactivestreams.Publisher; -import org.springframework.context.annotation.DependsOn; -import org.springframework.util.StringUtils; -import reactor.core.publisher.BufferOverflowStrategy; -import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; -import reactor.function.Consumer3; -import reactor.util.function.Tuple2; -import reactor.util.function.Tuples; - -import javax.annotation.PreDestroy; -import java.time.Duration; -import java.util.*; -import java.util.function.Function; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - -/** - * @author zhouhao - * @since 1.0 - **/ -//@Service -@Slf4j -@DependsOn("restHighLevelClient") -@Deprecated -public class DefaultElasticSearchService implements ElasticSearchService { - - private final ElasticRestClient restClient; - - private final ElasticSearchIndexManager indexManager; - - FluxSink sink; - - public static final IndicesOptions indexOptions = IndicesOptions.fromOptions( - true, true, false, false - ); - - static { - DateFormatter.supportFormatter.add(new DefaultDateFormatter(Pattern.compile("[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.+"), "yyyy-MM-dd'T'HH:mm:ss.SSSZ")); - } - - public DefaultElasticSearchService(ElasticRestClient restClient, - ElasticSearchIndexManager indexManager) { - this.restClient = restClient; - init(); - this.indexManager = indexManager; - } - - @Override - public Flux multiQuery(String[] index, Collection queryParam, Function, T> mapper) { - return indexManager - .getIndexesMetadata(index) - .flatMap(idx -> Mono.zip( - Mono.just(idx), getIndexForSearch(idx.getIndex()) - )) - .take(1) - .singleOrEmpty() - .flatMapMany(indexMetadata -> { - MultiSearchRequest request = new MultiSearchRequest(); - return Flux - .fromIterable(queryParam) - .flatMap(entry -> createSearchRequest(entry, index)) - .doOnNext(request::add) - .then(Mono.just(request)) - .flatMapMany(searchRequest -> ReactorActionListener - .mono(actionListener -> { - restClient.getQueryClient() - .msearchAsync(searchRequest, RequestOptions.DEFAULT, actionListener); - }) - .flatMapMany(response -> Flux.fromArray(response.getResponses())) - .flatMap(item -> { - if (item.isFailure()) { - log.warn(item.getFailureMessage(), item.getFailure()); - return Mono.empty(); - } - return Flux.fromIterable(translate((map) -> mapper.apply(indexMetadata.getT1().convertFromElastic(map)), item.getResponse())); - })) - ; - }); - } - - @Override - public Flux query(String index, QueryParam queryParam, Function, T> mapper) { - return this - .doQuery(new String[]{index}, queryParam) - .flatMapMany(tp2 -> convertQueryResult(tp2.getT1(), tp2.getT2(), mapper)); - } - - @Override - public Flux query(String[] index, QueryParam queryParam, Function, T> mapper) { - return this - .doQuery(index, queryParam) - .flatMapMany(tp2 -> convertQueryResult(tp2.getT1(), tp2.getT2(), mapper)); - } - - @Override - public Mono> queryPager(String[] index, QueryParam queryParam, Function, T> mapper) { - return this - .doQuery(index, queryParam) - .flatMap(tp2 -> - convertQueryResult(tp2.getT1(), tp2.getT2(), mapper) - .collectList() - .filter(CollectionUtils::isNotEmpty) - .map(list -> PagerResult.of((int) tp2.getT2().getHits().getTotalHits().value, list, queryParam)) - ) - .switchIfEmpty(Mono.fromSupplier(PagerResult::empty)); - } - - private Flux convertQueryResult(List indexList, - SearchResponse response, - Function, T> mapper) { - return Flux - .create(sink -> { - Map metadata = indexList - .stream() - .collect(Collectors.toMap(ElasticSearchIndexMetadata::getIndex, Function.identity())); - SearchHit[] hits = response.getHits().getHits(); - for (SearchHit hit : hits) { - Map hitMap = hit.getSourceAsMap(); - if (StringUtils.isEmpty(hitMap.get("id"))) { - hitMap.put("id", hit.getId()); - } - - sink.next(mapper - .apply(Optional - .ofNullable(metadata.get(hit.getIndex())).orElse(indexList.get(0)) - .convertFromElastic(hitMap))); - } - sink.complete(); - }); - - } - - private Mono, SearchResponse>> doQuery(String[] index, - QueryParam queryParam) { - return indexManager - .getIndexesMetadata(index) - .collectList() - .filter(CollectionUtils::isNotEmpty) - .flatMap(metadataList -> this - .createSearchRequest(queryParam, metadataList) - .flatMap(this::doSearch) - .map(response -> Tuples.of(metadataList, response)) - ).onErrorResume(err -> { - log.error(err.getMessage(), err); - return Mono.empty(); - }); - } - - - @Override - public Mono count(String[] index, QueryParam queryParam) { - QueryParam param = queryParam.clone(); - param.setPaging(false); - return createCountRequest(param, index) - .flatMap(this::doCount) - .map(CountResponse::getCount) - .defaultIfEmpty(0L) - .onErrorReturn(err -> { - log.error("query elastic error", err); - return true; - }, 0L); - } - - @Override - public Mono delete(String index, QueryParam queryParam) { - - return createQueryBuilder(queryParam, index) - .flatMap(request -> ReactorActionListener - .mono(listener -> - restClient - .getWriteClient() - .deleteByQueryAsync(new DeleteByQueryRequest(index) - .setQuery(request), - RequestOptions.DEFAULT, listener))) - .map(BulkByScrollResponse::getDeleted); - } - - @Override - public Mono commit(String index, T payload) { - sink.next(new Buffer(index, payload)); - return Mono.empty(); - } - - @Override - public Mono commit(String index, Collection payload) { - for (T t : payload) { - sink.next(new Buffer(index, t)); - } - return Mono.empty(); - } - - @Override - public Mono commit(String index, Publisher data) { - return Flux.from(data) - .flatMap(d -> commit(index, d)) - .then(); - } - - @Override - public Mono save(String index, T payload) { - return save(index, Mono.just(payload)); - } - - @Override - public Mono save(String index, Publisher data) { - return Flux.from(data) - .map(v -> new Buffer(index, v)) - .collectList() - .flatMap(this::doSave) - .then(); - } - - @Override - public Mono save(String index, Collection payload) { - return save(index, Flux.fromIterable(payload)); - } - - @PreDestroy - public void shutdown() { - sink.complete(); - } - - //@PostConstruct - public void init() { - //最小间隔 - int flushRate = Integer.getInteger("elasticsearch.buffer.rate", 1000); - //缓冲最大数量 - int bufferSize = Integer.getInteger("elasticsearch.buffer.size", 3000); - //缓冲超时时间 - Duration bufferTimeout = Duration.ofSeconds(Integer.getInteger("elasticsearch.buffer.timeout", 3)); - //缓冲背压 - int bufferBackpressure = Integer.getInteger("elasticsearch.buffer.backpressure", 64); - - FluxUtils.bufferRate( - Flux.create(sink -> this.sink = sink), - flushRate, - bufferSize, - bufferTimeout) - .onBackpressureBuffer(bufferBackpressure, - drop -> System.err.println("无法处理更多索引请求!"), - BufferOverflowStrategy.DROP_OLDEST) - .parallel() - .runOn(Schedulers.newParallel("elasticsearch-writer")) - .flatMap(buffers -> { - long time = System.currentTimeMillis(); - return this - .doSave(buffers) - .doOnNext((len) -> log.trace("保存ElasticSearch数据成功,数量:{},耗时:{}ms", len, (System.currentTimeMillis() - time))) - .onErrorContinue((err, obj) -> { - //这里的错误都输出到控制台,输入到slf4j可能会造成日志递归. - System.err.println("保存ElasticSearch数据失败:\n" + org.hswebframework.utils.StringUtils.throwable2String(err)); - }); - }) - .subscribe(); - } - - @AllArgsConstructor - @Getter - static class Buffer { - String index; - Object payload; - } - - - private Mono getIndexForSave(String index) { - return indexManager - .getIndexStrategy(index) - .map(strategy -> strategy.getIndexForSave(index)); - - } - - private Mono getIndexForSearch(String index) { - return indexManager - .getIndexStrategy(index) - .map(strategy -> strategy.getIndexForSearch(index)); - - } - - protected Mono doSave(Collection buffers) { - return Flux.fromIterable(buffers) - .groupBy(Buffer::getIndex) - .flatMap(group -> { - String index = group.key(); - return this.getIndexForSave(index) - .zipWith(indexManager.getIndexMetadata(index)) - .flatMapMany(tp2 -> - group.map(buffer -> { - Map data = FastBeanCopier.copy(buffer.getPayload(), HashMap::new); - - IndexRequest request; - if (data.get("id") != null) { - request = new IndexRequest(tp2.getT1(), "_doc", String.valueOf(data.get("id"))); - } else { - request = new IndexRequest(tp2.getT1(), "_doc"); - } - request.source(tp2.getT2().convertToElastic(data)); - return request; - })); - }) - .collectList() - .filter(CollectionUtils::isNotEmpty) - .flatMap(lst -> { - BulkRequest request = new BulkRequest(); - lst.forEach(request::add); - return ReactorActionListener.mono(listener -> - restClient.getWriteClient().bulkAsync(request, RequestOptions.DEFAULT, listener)) - .doOnNext(this::checkResponse); - }) - .thenReturn(buffers.size()); - } - - @SneakyThrows - protected void checkResponse(BulkResponse response) { - if (response.hasFailures()) { - for (BulkItemResponse item : response.getItems()) { - if (item.isFailed()) { - throw item.getFailure().getCause(); - } - } - } - } - - private List translate(Function, T> mapper, SearchResponse response) { - return Arrays.stream(response.getHits().getHits()) - .map(hit -> { - Map hitMap = hit.getSourceAsMap(); - if (StringUtils.isEmpty(hitMap.get("id"))) { - hitMap.put("id", hit.getId()); - } - return mapper.apply(hitMap); - }) - .collect(Collectors.toList()); - } - - private Mono doSearch(SearchRequest request) { - return this - .execute(request, restClient.getQueryClient()::searchAsync) - .onErrorResume(err -> { - log.error("query elastic error", err); - return Mono.empty(); - }); - } - - private Mono execute(REQ request, Consumer3> function4) { - return ReactorActionListener.mono(actionListener -> function4.accept(request, RequestOptions.DEFAULT, actionListener)); - } - - private Mono doCount(CountRequest request) { - return this - .execute(request, restClient.getQueryClient()::countAsync) - .onErrorResume(err -> { - log.error("query elastic error", err); - return Mono.empty(); - }); - } - - protected Mono createSearchRequest(QueryParam queryParam, String... indexes) { - return indexManager - .getIndexesMetadata(indexes) - .collectList() - .filter(CollectionUtils::isNotEmpty) - .flatMap(list -> createSearchRequest(queryParam, list)); - } - - protected Mono createSearchRequest(QueryParam queryParam, List indexes) { - - SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0)); - return Flux.fromIterable(indexes) - .flatMap(index -> getIndexForSearch(index.getIndex())) - .collectList() - .map(indexList -> - new SearchRequest(indexList.toArray(new String[0])) - .source(builder) - .indicesOptions(indexOptions) - .types("_doc")); - } - - protected Mono createQueryBuilder(QueryParam queryParam, String index) { - return indexManager - .getIndexMetadata(index) - .map(metadata -> QueryParamTranslator.createQueryBuilder(queryParam, metadata)) - .switchIfEmpty(Mono.fromSupplier(() -> QueryParamTranslator.createQueryBuilder(queryParam, null))); - } - - protected Mono createCountRequest(QueryParam queryParam, List indexes) { - QueryParam tempQueryParam = queryParam.clone(); - tempQueryParam.setPaging(false); - tempQueryParam.setSorts(Collections.emptyList()); - - SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0)); - return Flux.fromIterable(indexes) - .flatMap(index -> getIndexForSearch(index.getIndex())) - .collectList() - .map(indexList -> new CountRequest(indexList.toArray(new String[0])).source(builder)); - } - - private Mono createCountRequest(QueryParam queryParam, String... index) { - return indexManager - .getIndexesMetadata(index) - .collectList() - .filter(CollectionUtils::isNotEmpty) - .flatMap(list -> createCountRequest(queryParam, list)); - } -} diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/AggType.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/AggType.java old mode 100644 new mode 100755 index 38dcaf0d..409ba07c --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/AggType.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/AggType.java @@ -20,8 +20,20 @@ public enum AggType { public AggregationBuilder aggregationBuilder(String name, String filed) { return AggregationBuilders.max(name).field(filed).missing(0); } - - + }, + MEDIAN("中间值") { + @Override + public AggregationBuilder aggregationBuilder(String name, String filed) { + return AggregationBuilders.medianAbsoluteDeviation(name).field(filed).missing(0); + } + }, + STDDEV("标准差") { + @Override + public AggregationBuilder aggregationBuilder(String name, String filed) { + return AggregationBuilders.extendedStats(name) + .field(filed) + .missing(0); + } }, COUNT("非空值计数") { @Override @@ -30,13 +42,6 @@ public enum AggType { } }, - MIN("最小") { - @Override - public AggregationBuilder aggregationBuilder(String name, String filed) { - return AggregationBuilders.min(name).field(filed).missing(0); - } - }, - DISTINCT_COUNT("去重计数") { @Override public AggregationBuilder aggregationBuilder(String name, String filed) { @@ -47,7 +52,12 @@ public enum AggType { } }, - + MIN("最小") { + @Override + public AggregationBuilder aggregationBuilder(String name, String filed) { + return AggregationBuilders.min(name).field(filed).missing(0); + } + }, FIRST("第一条数据") { @Override public AggregationBuilder aggregationBuilder(String name, String filed) { @@ -85,7 +95,7 @@ public enum AggType { return type; } } - throw new UnsupportedOperationException("不支持的聚合度量类型:" + name); + throw new UnsupportedOperationException("不支持的聚合类型:" + name); } } diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/DefaultReactiveElasticsearchClient.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/DefaultReactiveElasticsearchClient.java index f0db48e5..56a0bd44 100755 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/DefaultReactiveElasticsearchClient.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/DefaultReactiveElasticsearchClient.java @@ -60,17 +60,19 @@ import org.elasticsearch.client.indices.GetFieldMappingsRequest; import org.elasticsearch.client.indices.GetFieldMappingsResponse; import org.elasticsearch.client.indices.GetIndexResponse; import org.elasticsearch.client.indices.IndexTemplatesExistRequest; +import org.elasticsearch.client.tasks.TaskSubmissionResponse; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.uid.Versions; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.*; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.rest.BytesRestResponse; @@ -84,16 +86,17 @@ import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xcontent.*; import org.reactivestreams.Publisher; import org.springframework.data.elasticsearch.client.ClientLogger; import org.springframework.data.elasticsearch.client.ElasticsearchHost; import org.springframework.data.elasticsearch.client.NoReachableHostException; import org.springframework.data.elasticsearch.client.reactive.HostProvider; -import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient; import org.springframework.data.elasticsearch.client.reactive.RequestCreator; import org.springframework.data.elasticsearch.client.util.NamedXContents; import org.springframework.data.elasticsearch.client.util.RequestConverters; import org.springframework.data.elasticsearch.client.util.ScrollState; +import org.springframework.data.elasticsearch.core.ResponseConverter; import org.springframework.data.elasticsearch.core.query.ByQueryResponse; import org.springframework.data.util.Lazy; import org.springframework.http.HttpHeaders; @@ -107,12 +110,11 @@ import org.springframework.web.reactive.function.BodyExtractors; import org.springframework.web.reactive.function.client.ClientRequest; import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; -import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.function.Function3; +import javax.annotation.Nonnull; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.lang.reflect.Method; @@ -130,7 +132,7 @@ import static org.springframework.data.elasticsearch.client.util.RequestConverte @Slf4j @Generated -public class DefaultReactiveElasticsearchClient implements org.jetlinks.community.elastic.search.service.reactive.ReactiveElasticsearchClient, +public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient, org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Cluster { private final HostProvider hostProvider; private final RequestCreator requestCreator; @@ -342,12 +344,8 @@ public class DefaultReactiveElasticsearchClient implements org.jetlinks.communit .flatMap(Flux::fromIterable); } - /* - * (non-Javadoc) - * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#scroll(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest) - */ - @Override - public Flux scroll(HttpHeaders headers, SearchRequest searchRequest) { + @Override@Nonnull + public Flux scroll(@Nonnull HttpHeaders headers, SearchRequest searchRequest) { TimeValue scrollTimeout = searchRequest.scroll() != null ? searchRequest.scroll().keepAlive() : TimeValue.timeValueMinutes(1); @@ -356,60 +354,31 @@ public class DefaultReactiveElasticsearchClient implements org.jetlinks.communit searchRequest.scroll(scrollTimeout); } - EmitterProcessor outbound = EmitterProcessor.create(false); - FluxSink request = outbound.sink(); + return Flux + .usingWhen(Mono.fromSupplier(ScrollState::new), + state -> this + .sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers) + .expand(searchResponse -> { - EmitterProcessor inbound = EmitterProcessor.create(false); + state.updateScrollId(searchResponse.getScrollId()); + if (isEmpty(searchResponse.getHits())) { + return Mono.empty(); + } - Flux exchange = outbound.startWith(searchRequest).flatMap(it -> { + return this + .sendRequest(new SearchScrollRequest(searchResponse.getScrollId()).scroll(scrollTimeout), + requestCreator.scroll(), + SearchResponse.class, + headers); - if (it instanceof SearchRequest) { - return sendRequest((SearchRequest) it, requestCreator.search(), SearchResponse.class, headers); - } else if (it instanceof SearchScrollRequest) { - return sendRequest((SearchScrollRequest) it, requestCreator.scroll(), SearchResponse.class, headers); - } else if (it instanceof ClearScrollRequest) { - return sendRequest((ClearScrollRequest) it, requestCreator.clearScroll(), ClearScrollResponse.class, headers) - .flatMap(discard -> Flux.empty()); - } - - throw new IllegalArgumentException( - String.format("Cannot handle '%s'. Please make sure to use a 'SearchRequest' or 'SearchScrollRequest'.", it)); - }); - - return Flux.usingWhen(Mono.fromSupplier(ScrollState::new), - - scrollState -> { - - Flux searchHits = inbound - .handle((searchResponse, sink) -> { - - scrollState.updateScrollId(searchResponse.getScrollId()); - if (isEmpty(searchResponse.getHits())) { - - inbound.onComplete(); - outbound.onComplete(); - - } else { - - sink.next(searchResponse); - - SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollState.getScrollId()) - .scroll(scrollTimeout); - request.next(searchScrollRequest); - } - - }) - .map(SearchResponse::getHits) // - .flatMap(Flux::fromIterable); - - return searchHits.doOnSubscribe(ignore -> exchange.subscribe(inbound)); - - }, - state -> cleanupScroll(headers, state), // - (state, error) -> cleanupScroll(headers, state), // - state -> cleanupScroll(headers, state)); // + }), + state -> cleanupScroll(headers, state), + (state, ex) -> cleanupScroll(headers, state), + state -> cleanupScroll(headers, state)) + .filter(it -> !isEmpty(it.getHits())) + .map(SearchResponse::getHits) + .flatMapIterable(Function.identity()); } - private static boolean isEmpty(@Nullable SearchHits hits) { return hits != null && hits.getHits() != null && hits.getHits().length == 0; } @@ -442,7 +411,7 @@ public class DefaultReactiveElasticsearchClient implements org.jetlinks.communit public Mono updateBy(HttpHeaders headers, UpdateByQueryRequest updateRequest) { return sendRequest(updateRequest, requestCreator.updateByQuery(), BulkByScrollResponse.class, headers) .next() - .map(ByQueryResponse::of); + .map(ResponseConverter::byQueryResponseOf); } static XContentType enforceSameContentType(IndexRequest indexRequest, @Nullable XContentType xContentType) { @@ -605,6 +574,17 @@ public class DefaultReactiveElasticsearchClient implements org.jetlinks.communit .publishNext(); } + @Override + public Mono reindex(HttpHeaders headers, ReindexRequest reindexRequest) { + return sendRequest(reindexRequest, requestCreator.reindex(), BulkByScrollResponse.class, headers).next(); + } + + @Override + public Mono submitReindex(HttpHeaders headers, ReindexRequest reindexRequest) { + return sendRequest(reindexRequest, requestCreator.submitReindex(), TaskSubmissionResponse.class, headers).next() + .map(TaskSubmissionResponse::getTask); + } + // --> INDICES /* @@ -712,11 +692,25 @@ public class DefaultReactiveElasticsearchClient implements org.jetlinks.communit @Override public Mono putMapping(HttpHeaders headers, org.elasticsearch.client.indices.PutMappingRequest putMappingRequest) { - return sendRequest(putMappingRequest, requestCreator.putMappingRequest(), AcknowledgedResponse.class, headers) + return sendRequest(putMappingRequest, this::createPutMapping, AcknowledgedResponse.class, headers) .map(AcknowledgedResponse::isAcknowledged) .next(); } + private Request createPutMapping(org.elasticsearch.client.indices.PutMappingRequest putMappingRequest) { + Request request = requestCreator.putMappingRequest().apply(putMappingRequest); + Request newReq = new Request(request.getMethod(), request.getEndpoint()); + + Params params = new Params(newReq) + .withTimeout(putMappingRequest.timeout()) + .withMasterTimeout(putMappingRequest.masterNodeTimeout()); + if (serverVersion().before(Version.V_7_0_0)) { + params.putParam("include_type_name", "false"); + } + newReq.setEntity(request.getEntity()); + return newReq; + } + /* * (non-Javadoc) * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#flushIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.flush.FlushRequest) @@ -1167,7 +1161,7 @@ public class DefaultReactiveElasticsearchClient implements org.jetlinks.communit @Override public Mono getMapping(HttpHeaders headers, GetMappingsRequest getMappingsRequest) { return sendRequest(getMappingsRequest, requestCreator.getMapping(), - org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse.class, headers).next(); + GetMappingsResponse.class, headers).next(); } @Override diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ElasticSearchBufferProperties.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ElasticSearchBufferProperties.java new file mode 100644 index 00000000..ff6d334c --- /dev/null +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ElasticSearchBufferProperties.java @@ -0,0 +1,20 @@ +package org.jetlinks.community.elastic.search.service.reactive; + +import lombok.Getter; +import lombok.Setter; +import org.jetlinks.community.buffer.BufferProperties; +import org.jetlinks.community.buffer.BufferProperties; +import org.springframework.boot.context.properties.ConfigurationProperties; + +@Getter +@Setter +@ConfigurationProperties(prefix = "elasticsearch.buffer") +public class ElasticSearchBufferProperties extends BufferProperties { + public ElasticSearchBufferProperties() { + //固定缓冲文件目录 + setFilePath("./data/elasticsearch-buffer"); + setSize(3000); + } + + private boolean refreshWhenWrite = false; +} \ No newline at end of file diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveAggregationService.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveAggregationService.java old mode 100644 new mode 100755 index f38a4ba8..06bfd0f6 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveAggregationService.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveAggregationService.java @@ -21,16 +21,14 @@ import org.elasticsearch.search.sort.SortOrder; import org.hswebframework.ezorm.core.param.QueryParam; import org.hswebframework.ezorm.core.param.Term; import org.hswebframework.ezorm.core.param.TermType; +import org.jetlinks.core.metadata.types.DateTimeType; import org.jetlinks.community.Interval; import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager; import org.jetlinks.community.elastic.search.service.AggregationService; -import org.jetlinks.community.elastic.search.service.DefaultElasticSearchService; import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter; import org.jetlinks.community.timeseries.query.*; -import org.jetlinks.core.metadata.types.DateTimeType; import org.jetlinks.reactor.ql.utils.CastUtils; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; @@ -45,11 +43,9 @@ import java.util.stream.Collectors; * @author zhouhao * @since 1.5 **/ -@Service @Slf4j public class ReactiveAggregationService implements AggregationService { - private final ReactiveElasticsearchClient restClient; private final ElasticSearchIndexManager indexManager; @@ -108,7 +104,9 @@ public class ReactiveAggregationService implements AggregationService { .terms(group.getAlias()) .field(group.getProperty()); if (group instanceof LimitGroup) { - builder.size(((LimitGroup) group).getLimit()); + if (((LimitGroup) group).getLimit() > 0) { + builder.size(((LimitGroup) group).getLimit()); + } } else { builder.size(100); } @@ -142,8 +140,10 @@ public class ReactiveAggregationService implements AggregationService { boolean group = aggregationBuilder != null; for (AggregationColumn aggColumn : aggregationQueryParam.getAggColumns()) { - AggregationBuilder builder = AggType.of(aggColumn.getAggregation().name()) - .aggregationBuilder(aggColumn.getAlias(), aggColumn.getProperty()); + AggregationBuilder builder = AggType + .of(aggColumn.getAggregation().name()) + .aggregationBuilder(aggColumn.getAlias(), aggColumn.getProperty()); + if (builder instanceof TopHitsAggregationBuilder) { TopHitsAggregationBuilder topHitsBuilder = ((TopHitsAggregationBuilder) builder); if (CollectionUtils.isEmpty(queryParam.getSorts())) { @@ -154,7 +154,9 @@ public class ReactiveAggregationService implements AggregationService { .stream() .map(sort -> SortBuilders .fieldSort(sort.getName()) - .order("desc".equalsIgnoreCase(sort.getOrder()) ? SortOrder.DESC : SortOrder.ASC)) + .order("desc".equalsIgnoreCase(sort.getOrder()) + ? SortOrder.DESC + : SortOrder.ASC)) .collect(Collectors.toList())); } if (aggColumn instanceof LimitAggregationColumn) { @@ -170,36 +172,37 @@ public class ReactiveAggregationService implements AggregationService { } } - return Flux.fromArray(index) - .flatMap(idx -> Mono.zip(indexManager.getIndexStrategy(idx), Mono.just(idx))) - .collectList() - .flatMap(strategy -> this - .createSearchSourceBuilder(queryParam, index[0]) - .map(builder -> { - aggs.forEach(builder.size(0)::aggregation); - return new SearchRequest(strategy - .stream() - .map(tp2 -> tp2 - .getT1() - .getIndexForSearch(tp2.getT2())) - .toArray(String[]::new)) - .indicesOptions(ReactiveElasticSearchService.indexOptions) - .source(builder); - } - ) - ) - .flatMap(restClient::searchForPage) - .flatMapMany(this::parseResult) - .as(flux -> { - if (!group) { - return flux - .map(Map::entrySet) - .flatMap(Flux::fromIterable) - .collectMap(Map.Entry::getKey, Map.Entry::getValue) - .flux(); - } - return flux; - }) + return Flux + .fromArray(index) + .flatMap(idx -> Mono.zip(indexManager.getIndexStrategy(idx), Mono.just(idx))) + .collectList() + .flatMap(strategy -> this + .createSearchSourceBuilder(queryParam, index[0]) + .map(builder -> { + aggs.forEach(builder.size(0)::aggregation); + return new SearchRequest(strategy + .stream() + .map(tp2 -> tp2 + .getT1() + .getIndexForSearch(tp2.getT2())) + .toArray(String[]::new)) + .indicesOptions(ReactiveElasticSearchService.indexOptions) + .source(builder); + } + ) + ) + .flatMap(restClient::searchForPage) + .flatMapMany(this::parseResult) + .as(flux -> { + if (!group) { + return flux + .map(Map::entrySet) + .flatMap(Flux::fromIterable) + .collectMap(Map.Entry::getKey, Map.Entry::getValue) + .flux(); + } + return flux; + }) ; } @@ -209,7 +212,8 @@ public class ReactiveAggregationService implements AggregationService { .flatMap(agg -> parseAggregation(agg.getName(), agg), Integer.MAX_VALUE); } - private Flux> parseAggregation(String name, org.elasticsearch.search.aggregations.Aggregation aggregation) { + private Flux> parseAggregation(String name, + org.elasticsearch.search.aggregations.Aggregation aggregation) { if (aggregation instanceof Terms) { return parseAggregation(((Terms) aggregation)); } @@ -310,7 +314,7 @@ public class ReactiveAggregationService implements AggregationService { .ofDays(Integer.getInteger("elasticsearch.agg.default-range-day", 90)) .toMillis(); - private static long calculateStartWithTime(AggregationQueryParam param) { + static long calculateStartWithTime(AggregationQueryParam param) { long startWithParam = param.getStartWithTime(); if (startWithParam == 0) { //从查询条件中提取时间参数来获取时间区间 @@ -336,5 +340,4 @@ public class ReactiveAggregationService implements AggregationService { return startWithParam; } - } diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticSearchService.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticSearchService.java index 2d8de503..b9ce08ac 100755 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticSearchService.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticSearchService.java @@ -14,18 +14,19 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.xcontent.XContentType; import org.hswebframework.ezorm.core.param.QueryParam; import org.hswebframework.utils.time.DateFormatter; import org.hswebframework.utils.time.DefaultDateFormatter; import org.hswebframework.web.api.crud.entity.PagerResult; import org.hswebframework.web.bean.FastBeanCopier; +import org.jetlinks.core.utils.SerializeUtils; import org.jetlinks.community.buffer.BufferProperties; import org.jetlinks.community.buffer.BufferSettings; import org.jetlinks.community.buffer.MemoryUsage; @@ -38,11 +39,9 @@ import org.jetlinks.community.elastic.search.utils.QueryParamTranslator; import org.jetlinks.community.utils.ErrorUtils; import org.jetlinks.community.utils.ObjectMappers; import org.jetlinks.community.utils.SystemUtils; -import org.jetlinks.core.utils.SerializeUtils; import org.reactivestreams.Publisher; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.DependsOn; -import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import org.springframework.web.reactive.function.client.WebClientException; @@ -73,13 +72,8 @@ import java.util.stream.Collectors; @Slf4j @DependsOn("reactiveElasticsearchClient") @ConfigurationProperties(prefix = "elasticsearch") -@Service public class ReactiveElasticSearchService implements ElasticSearchService { - @Getter - @Setter - private BufferConfig buffer = new BufferConfig(); - @Getter private final ReactiveElasticsearchClient restClient; @Getter @@ -96,10 +90,21 @@ public class ReactiveElasticSearchService implements ElasticSearchService { private PersistenceBuffer writer; + @Getter + @Setter + private ElasticSearchBufferProperties buffer; + public ReactiveElasticSearchService(ReactiveElasticsearchClient restClient, ElasticSearchIndexManager indexManager) { + this(restClient, indexManager, new ElasticSearchBufferProperties()); + } + + public ReactiveElasticSearchService(ReactiveElasticsearchClient restClient, + ElasticSearchIndexManager indexManager, + ElasticSearchBufferProperties buffer) { this.restClient = restClient; this.indexManager = indexManager; + this.buffer = buffer; init(); } @@ -269,6 +274,10 @@ public class ReactiveElasticSearchService implements ElasticSearchService { } private boolean checkWritable(String index) { +// if (SystemUtils.memoryIsOutOfWatermark()) { +// SystemUtils.printError("JVM内存不足,elasticsearch无法处理更多索引[%s]请求!", index); +// return false; +// } return true; } @@ -324,6 +333,7 @@ public class ReactiveElasticSearchService implements ElasticSearchService { writer.dispose(); } + @Getter @Setter public static class BufferConfig extends BufferProperties { @@ -468,7 +478,7 @@ public class ReactiveElasticSearchService implements ElasticSearchService { .flatMap(lst -> { BulkRequest request = new BulkRequest(); request.timeout(TimeValue.timeValueSeconds(9)); - if (buffer.refreshWhenWrite) { + if (buffer.isRefreshWhenWrite()) { request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); } lst.forEach(request::add); diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticsearchClient.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticsearchClient.java old mode 100644 new mode 100755 index fae07a3f..6c3d7730 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticsearchClient.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticsearchClient.java @@ -28,5 +28,4 @@ public interface ReactiveElasticsearchClient extends Mono updateTemplate(PutIndexTemplateRequest request); Version serverVersion(); - } diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/things/ElasticSearchColumnModeDDLOperations.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/things/ElasticSearchColumnModeDDLOperations.java new file mode 100644 index 00000000..ba82742d --- /dev/null +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/things/ElasticSearchColumnModeDDLOperations.java @@ -0,0 +1,38 @@ +package org.jetlinks.community.elastic.search.things; + +import org.jetlinks.core.metadata.PropertyMetadata; +import org.jetlinks.community.elastic.search.index.DefaultElasticSearchIndexMetadata; +import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager; +import org.jetlinks.community.things.data.operations.ColumnModeDDLOperationsBase; +import org.jetlinks.community.things.data.operations.DataSettings; +import org.jetlinks.community.things.data.operations.MetricBuilder; +import reactor.core.publisher.Mono; + +import java.util.List; + +class ElasticSearchColumnModeDDLOperations extends ColumnModeDDLOperationsBase { + + private final ElasticSearchIndexManager indexManager; + + public ElasticSearchColumnModeDDLOperations(String thingType, + String templateId, + String thingId, + DataSettings settings, + MetricBuilder metricBuilder, + ElasticSearchIndexManager indexManager) { + super(thingType, templateId, thingId, settings, metricBuilder); + this.indexManager = indexManager; + } + + @Override + protected Mono register(MetricType metricType,String metric, List properties) { + return indexManager + .putIndex(new DefaultElasticSearchIndexMetadata(metric, properties)); + } + + @Override + protected Mono reload(MetricType metricType,String metric, List properties) { + return indexManager + .putIndex(new DefaultElasticSearchIndexMetadata(metric, properties)); + } +} diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/things/ElasticSearchColumnModeQueryOperations.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/things/ElasticSearchColumnModeQueryOperations.java new file mode 100644 index 00000000..12210986 --- /dev/null +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/things/ElasticSearchColumnModeQueryOperations.java @@ -0,0 +1,130 @@ +package org.jetlinks.community.elastic.search.things; + +import org.hswebframework.ezorm.core.dsl.Query; +import org.hswebframework.web.api.crud.entity.PagerResult; +import org.hswebframework.web.api.crud.entity.QueryParamEntity; +import org.jetlinks.core.things.ThingsRegistry; +import org.jetlinks.community.elastic.search.service.AggregationService; +import org.jetlinks.community.elastic.search.service.ElasticSearchService; +import org.jetlinks.community.things.data.AggregationRequest; +import org.jetlinks.community.things.data.PropertyAggregation; +import org.jetlinks.community.things.data.operations.ColumnModeQueryOperationsBase; +import org.jetlinks.community.things.data.operations.DataSettings; +import org.jetlinks.community.things.data.operations.MetricBuilder; +import org.jetlinks.community.timeseries.TimeSeriesData; +import org.jetlinks.community.timeseries.query.*; +import org.jetlinks.reactor.ql.utils.CastUtils; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; + +class ElasticSearchColumnModeQueryOperations extends ColumnModeQueryOperationsBase { + + private final ElasticSearchService searchService; + private final AggregationService aggregationService; + + public ElasticSearchColumnModeQueryOperations(String thingType, + String thingTemplateId, + String thingId, + MetricBuilder metricBuilder, + DataSettings settings, + ThingsRegistry registry, + ElasticSearchService service, + AggregationService aggregationService) { + super(thingType, thingTemplateId, thingId, metricBuilder, settings, registry); + this.searchService = service; + this.aggregationService = aggregationService; + } + + @Override + protected Flux doQuery(String metric, Query query) { + + return searchService + .query(metric, + query.getParam(), + data -> { + long ts = CastUtils.castNumber(data.getOrDefault("timestamp", 0L)).longValue(); + data.put("timestamp",ts); + return TimeSeriesData.of(ts, data); + }); + } + + @Override + protected Mono> doQueryPage(String metric, + Query query, + Function mapper) { + return searchService + .queryPager(metric, + query.getParam(), + data -> { + long ts = CastUtils.castNumber(data.getOrDefault("timestamp", 0L)).longValue(); + data.put("timestamp",ts); + return mapper.apply(TimeSeriesData.of(ts, data)); + }); + } + + @Override + protected Flux doAggregation(String metric, AggregationRequest request, AggregationContext context) { + org.joda.time.format.DateTimeFormatter formatter = DateTimeFormat.forPattern(request.getFormat()); + PropertyAggregation[] properties = context.getProperties(); + return AggregationQueryParam + .of() + .as(param -> { + for (PropertyAggregation property : properties) { + param.agg(property.getProperty(), property.getAlias(), property.getAgg()); + } + return param; + }) + .as(param -> { + if (request.getInterval() == null) { + return param; + } + return param.groupBy((Group) new TimeGroup(request.getInterval(), "time", request.getFormat())); + }) + .limit(request.getLimit() * properties.length) + .from(request.getFrom()) + .to(request.getTo()) + .filter(request.getFilter()) + .execute(param -> aggregationService.aggregation(metric, param)) + .map(AggregationData::of) + .groupBy(agg -> agg.getString("time", ""), Integer.MAX_VALUE) + .flatMap(group -> group + .map(data -> { + Map newMap = new HashMap<>(); + newMap.put("time", data.get("time").orElse(null)); + for (PropertyAggregation property : properties) { + Object val; + if(property.getAgg() ==Aggregation.FIRST || property.getAgg()==Aggregation.TOP){ + val = data + .get(property.getProperty()) + .orElse(null); + }else { + val = data + .get(property.getAlias()) + .orElse(null); + } + if (null != val) { + newMap.put(property.getAlias(), val); + } + } + return newMap; + }) + .reduce((a, b) -> { + a.putAll(b); + return a; + }) + .map(AggregationData::of)) + .sort(Comparator.comparing(agg -> DateTime + .parse(agg.getString("time", ""), formatter) + .toDate()).reversed()) + .take(request.getLimit()) + ; + } +} diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/things/ElasticSearchColumnModeSaveOperations.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/things/ElasticSearchColumnModeSaveOperations.java new file mode 100644 index 00000000..807e3d03 --- /dev/null +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/things/ElasticSearchColumnModeSaveOperations.java @@ -0,0 +1,33 @@ +package org.jetlinks.community.elastic.search.things; + +import org.jetlinks.core.things.ThingsRegistry; +import org.jetlinks.community.elastic.search.service.ElasticSearchService; +import org.jetlinks.community.things.data.operations.ColumnModeSaveOperationsBase; +import org.jetlinks.community.things.data.operations.DataSettings; +import org.jetlinks.community.things.data.operations.MetricBuilder; +import org.jetlinks.community.timeseries.TimeSeriesData; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +class ElasticSearchColumnModeSaveOperations extends ColumnModeSaveOperationsBase { + + private final ElasticSearchService searchService; + + public ElasticSearchColumnModeSaveOperations(ThingsRegistry registry, + MetricBuilder metricBuilder, + DataSettings settings, + ElasticSearchService searchService) { + super(registry, metricBuilder, settings); + this.searchService = searchService; + } + + @Override + protected Mono doSave(String metric, TimeSeriesData data) { + return searchService.commit(metric, data.getData()); + } + + @Override + protected Mono doSave(String metric, Flux data) { + return searchService.save(metric, data.map(TimeSeriesData::getData)); + } +} diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/things/ElasticSearchColumnModeStrategy.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/things/ElasticSearchColumnModeStrategy.java new file mode 100644 index 00000000..db83c929 --- /dev/null +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/things/ElasticSearchColumnModeStrategy.java @@ -0,0 +1,66 @@ +package org.jetlinks.community.elastic.search.things; + +import lombok.AllArgsConstructor; +import org.jetlinks.core.things.ThingsRegistry; +import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager; +import org.jetlinks.community.elastic.search.service.AggregationService; +import org.jetlinks.community.elastic.search.service.ElasticSearchService; +import org.jetlinks.community.things.data.AbstractThingDataRepositoryStrategy; +import org.jetlinks.community.things.data.operations.*; + +@AllArgsConstructor +public class ElasticSearchColumnModeStrategy extends AbstractThingDataRepositoryStrategy { + + private final ThingsRegistry registry; + private final ElasticSearchService searchService; + private final AggregationService aggregationService; + private final ElasticSearchIndexManager indexManager; + + @Override + public String getId() { + return "default-column"; + } + + @Override + public String getName() { + return "ElasticSearch-列式存储"; + } + + @Override + public SaveOperations createOpsForSave(OperationsContext context) { + return new ElasticSearchColumnModeSaveOperations( + registry, + context.getMetricBuilder(), + context.getSettings(), + searchService); + } + + @Override + protected QueryOperations createForQuery(String thingType, String templateId, String thingId, OperationsContext context) { + return new ElasticSearchColumnModeQueryOperations( + thingType, + templateId, + thingId, + context.getMetricBuilder(), + context.getSettings(), + registry, + searchService, + aggregationService); + } + + @Override + protected DDLOperations createForDDL(String thingType, String templateId, String thingId, OperationsContext context) { + return new ElasticSearchColumnModeDDLOperations( + thingType, + templateId, + thingId, + context.getSettings(), + context.getMetricBuilder(), + indexManager); + } + + @Override + public int getOrder() { + return 10001; + } +} diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/things/ElasticSearchRowModeDDLOperations.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/things/ElasticSearchRowModeDDLOperations.java new file mode 100644 index 00000000..b5816669 --- /dev/null +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/things/ElasticSearchRowModeDDLOperations.java @@ -0,0 +1,38 @@ +package org.jetlinks.community.elastic.search.things; + +import org.jetlinks.core.metadata.PropertyMetadata; +import org.jetlinks.community.elastic.search.index.DefaultElasticSearchIndexMetadata; +import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager; +import org.jetlinks.community.things.data.operations.DataSettings; +import org.jetlinks.community.things.data.operations.MetricBuilder; +import org.jetlinks.community.things.data.operations.RowModeDDLOperationsBase; +import reactor.core.publisher.Mono; + +import java.util.List; + +class ElasticSearchRowModeDDLOperations extends RowModeDDLOperationsBase { + + private final ElasticSearchIndexManager indexManager; + + public ElasticSearchRowModeDDLOperations(String thingType, + String templateId, + String thingId, + DataSettings settings, + MetricBuilder metricBuilder, + ElasticSearchIndexManager indexManager) { + super(thingType, templateId, thingId, settings, metricBuilder); + this.indexManager = indexManager; + } + + @Override + protected Mono register(MetricType metricType, String metric, List properties) { + return indexManager + .putIndex(new DefaultElasticSearchIndexMetadata(metric, properties)); + } + + @Override + protected Mono reload(MetricType metricType, String metric, List properties) { + return indexManager + .putIndex(new DefaultElasticSearchIndexMetadata(metric, properties)); + } +} diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/things/ElasticSearchRowModeQueryOperations.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/things/ElasticSearchRowModeQueryOperations.java new file mode 100644 index 00000000..8aec383d --- /dev/null +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/things/ElasticSearchRowModeQueryOperations.java @@ -0,0 +1,235 @@ +package org.jetlinks.community.elastic.search.things; + +import org.hswebframework.ezorm.core.dsl.Query; +import org.hswebframework.web.api.crud.entity.PagerResult; +import org.hswebframework.web.api.crud.entity.QueryParamEntity; +import org.jetlinks.core.metadata.PropertyMetadata; +import org.jetlinks.core.things.ThingMetadata; +import org.jetlinks.core.things.ThingsRegistry; +import org.jetlinks.community.elastic.search.service.AggregationService; +import org.jetlinks.community.elastic.search.service.ElasticSearchService; +import org.jetlinks.community.things.data.AggregationRequest; +import org.jetlinks.community.things.data.PropertyAggregation; +import org.jetlinks.community.things.data.ThingPropertyDetail; +import org.jetlinks.community.things.data.ThingsDataConstants; +import org.jetlinks.community.things.data.operations.DataSettings; +import org.jetlinks.community.things.data.operations.MetricBuilder; +import org.jetlinks.community.things.data.operations.RowModeQueryOperationsBase; +import org.jetlinks.community.timeseries.TimeSeriesData; +import org.jetlinks.community.timeseries.query.*; +import org.jetlinks.reactor.ql.utils.CastUtils; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import javax.annotation.Nonnull; +import java.util.*; +import java.util.function.Function; + +class ElasticSearchRowModeQueryOperations extends RowModeQueryOperationsBase { + + private final ElasticSearchService searchService; + private final AggregationService aggregationService; + + public ElasticSearchRowModeQueryOperations(String thingType, + String thingTemplateId, + String thingId, + MetricBuilder metricBuilder, + DataSettings settings, + ThingsRegistry registry, + ElasticSearchService service, + AggregationService aggregationService) { + super(thingType, thingTemplateId, thingId, metricBuilder, settings, registry); + this.searchService = service; + this.aggregationService = aggregationService; + } + + @Override + protected Flux doQuery(String metric, Query query) { + + return searchService + .query(metric, + query.getParam(), + data -> { + long ts = CastUtils.castNumber(data.getOrDefault("timestamp", 0L)).longValue(); + return TimeSeriesData.of(ts, data); + }); + } + + @Override + protected Mono> doQueryPage(String metric, + Query query, + Function mapper) { + return searchService + .queryPager(metric, + query.getParam(), + data -> { + long ts = CastUtils.castNumber(data.getOrDefault("timestamp", 0L)).longValue(); + return mapper.apply(TimeSeriesData.of(ts, data)); + }); + } + + @Override + protected Flux queryEachProperty(@Nonnull String metric, + @Nonnull Query query, + @Nonnull ThingMetadata metadata, + @Nonnull Map properties) { + QueryParamEntity param = query.getParam(); + //不分页或者每页数量大于1000则回退到普通方式查询 + if (!param.isPaging() || param.getPageSize() >= 1000) { + return super.queryEachProperty(metric, query, metadata, properties); + } + + if (properties.size() <= 200) { + query.in(ThingsDataConstants.COLUMN_PROPERTY_ID, properties.keySet()); + } + + //通过聚合查询求top n来查询每一个属性数据 + return AggregationQueryParam + .of() + .agg(new LimitAggregationColumn(ThingsDataConstants.COLUMN_PROPERTY_ID, + ThingsDataConstants.COLUMN_PROPERTY_ID, + Aggregation.FIRST, + param.getPageSize())) + .groupBy(new LimitGroup(ThingsDataConstants.COLUMN_PROPERTY_ID, + ThingsDataConstants.COLUMN_PROPERTY_ID, + param.getPageSize() * properties.size())) //按property分组 + .filter(param) + .execute(params -> aggregationService.aggregation(metric, params)) + .mapNotNull(data -> { + long ts = CastUtils.castNumber(data.getOrDefault(ThingsDataConstants.COLUMN_TIMESTAMP, 0L)).longValue(); + + String property = (String) data.getOrDefault(ThingsDataConstants.COLUMN_PROPERTY_ID, null); + String thingId = (String) data.getOrDefault(metricBuilder.getThingIdProperty(), null); + Object value = data.getOrDefault(ThingsDataConstants.COLUMN_PROPERTY_VALUE, null); + if (property == null || thingId == null || value == null) { + return null; + } + return ThingPropertyDetail + .of(TimeSeriesData.of(ts, data), properties.get(property)); + }); + } + + @Override + protected Flux doAggregation(String metric, AggregationRequest request, AggregationContext context) { + PropertyAggregation[] properties = context.getProperties(); + //只聚合一个属性时 + if (properties.length == 1) { + return AggregationQueryParam + .of() + .agg(ThingsDataConstants.COLUMN_PROPERTY_NUMBER_VALUE, properties[0].getAlias(), properties[0].getAgg()) + .as(param -> { + if (request.getInterval() == null) { + return param; + } + return param.groupBy(request.getInterval(), request.getFormat()); + }) + .limit(request.getLimit()) + .from(request.getFrom()) + .to(request.getTo()) + .filter(request.getFilter()) + .filter(query -> query.where(ThingsDataConstants.COLUMN_PROPERTY_ID, properties[0].getProperty())) + .execute(param -> aggregationService.aggregation(metric, param)) + .doOnNext(agg -> agg.remove("_time")) + .map(AggregationData::of) + .take(request.getLimit()); + } + + Map propertyAlias = context.getPropertyAlias(); + + Map aliasProperty = context.getAliasToProperty(); + + return AggregationQueryParam + .of() + .as(param -> { + Arrays.stream(properties) + .forEach(agg -> param.agg(ThingsDataConstants.COLUMN_PROPERTY_NUMBER_VALUE, "value_" + agg.getAlias(), agg.getAgg())); + return param; + }) + .as(param -> { + if (request.getInterval() == null) { + return param; + } + return param.groupBy((Group) new TimeGroup(request.getInterval(), "time", request.getFormat())); + }) + .groupBy(new LimitGroup(ThingsDataConstants.COLUMN_PROPERTY_ID, ThingsDataConstants.COLUMN_PROPERTY_ID, properties.length)) + .limit(request.getLimit() * properties.length) + .from(request.getFrom()) + .to(request.getTo()) + .filter(request.getFilter()) + .filter(query -> query + .where() + .in(ThingsDataConstants.COLUMN_PROPERTY_ID, new HashSet<>(propertyAlias.values()))) + //执行查询 + .execute(param -> aggregationService.aggregation(metric, param)) + .map(AggregationData::of) + //按时间分组,然后将返回的结果合并起来 + .groupBy(agg -> agg.getString("time", ""), Integer.MAX_VALUE) + .as(flux -> { + //按时间分组 + if (request.getInterval() != null) { + return flux + .flatMap(group -> { + String time = group.key(); + return group + //按属性分组 + .groupBy(agg -> agg.getString(ThingsDataConstants.COLUMN_PROPERTY_ID, ""), Integer.MAX_VALUE) + .flatMap(propsGroup -> { + String property = propsGroup.key(); + return propsGroup + .reduce(AggregationData::merge) + .map(agg -> { + Map data = new HashMap<>(); + data.put("_time", agg.get("_time").orElse(time)); + data.put("time", time); + aliasProperty.forEach((alias, prp) -> { + if (prp.getAgg() == Aggregation.FIRST || prp.getAgg() == Aggregation.TOP) { + data.putIfAbsent(alias, agg + .get(ThingsDataConstants.COLUMN_PROPERTY_NUMBER_VALUE) + .orElse(agg.get("value").orElse(null))); + } else if (property.equals(prp.getProperty())) { + Object value = agg + .get("value_" + alias) + .orElse(0); + data.putIfAbsent(alias, value); + } + }); + return data; + }); + }) + .>reduceWith(HashMap::new, (a, b) -> { + a.putAll(b); + return a; + }); + } + ); + } else { + return flux + .flatMap(group -> group + .reduce(AggregationData::merge) + .map(agg -> { + Map values = new HashMap<>(); + //values.put("time", group.key()); + for (Map.Entry props : propertyAlias.entrySet()) { + values.put(props.getKey(), agg + .get("value_" + props.getKey()) + .orElse(0)); + } + return values; + })); + } + }) + .map(map -> { + map.remove(""); + propertyAlias + .keySet() + .forEach(key -> map.putIfAbsent(key, 0)); + return AggregationData.of(map); + }) + .sort(Comparator + .comparing(agg -> CastUtils.castDate(agg.values().get("_time"))) + .reversed()) + .doOnNext(agg -> agg.values().remove("_time")) + .take(request.getLimit()) + ; + } +} diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/things/ElasticSearchRowModeSaveOperations.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/things/ElasticSearchRowModeSaveOperations.java new file mode 100644 index 00000000..e2ff31ec --- /dev/null +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/things/ElasticSearchRowModeSaveOperations.java @@ -0,0 +1,33 @@ +package org.jetlinks.community.elastic.search.things; + +import org.jetlinks.core.things.ThingsRegistry; +import org.jetlinks.community.elastic.search.service.ElasticSearchService; +import org.jetlinks.community.things.data.operations.DataSettings; +import org.jetlinks.community.things.data.operations.MetricBuilder; +import org.jetlinks.community.things.data.operations.RowModeSaveOperationsBase; +import org.jetlinks.community.timeseries.TimeSeriesData; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +class ElasticSearchRowModeSaveOperations extends RowModeSaveOperationsBase { + + private final ElasticSearchService searchService; + + public ElasticSearchRowModeSaveOperations(ThingsRegistry registry, + MetricBuilder metricBuilder, + DataSettings settings, + ElasticSearchService searchService) { + super(registry, metricBuilder, settings); + this.searchService = searchService; + } + + @Override + protected Mono doSave(String metric, TimeSeriesData data) { + return searchService.commit(metric, data.getData()); + } + + @Override + protected Mono doSave(String metric, Flux data) { + return searchService.save(metric, data.map(TimeSeriesData::getData)); + } +} diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/things/ElasticSearchRowModeStrategy.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/things/ElasticSearchRowModeStrategy.java new file mode 100644 index 00000000..36366ac1 --- /dev/null +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/things/ElasticSearchRowModeStrategy.java @@ -0,0 +1,66 @@ +package org.jetlinks.community.elastic.search.things; + +import lombok.AllArgsConstructor; +import org.jetlinks.core.things.ThingsRegistry; +import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager; +import org.jetlinks.community.elastic.search.service.AggregationService; +import org.jetlinks.community.elastic.search.service.ElasticSearchService; +import org.jetlinks.community.things.data.AbstractThingDataRepositoryStrategy; +import org.jetlinks.community.things.data.operations.*; + +@AllArgsConstructor +public class ElasticSearchRowModeStrategy extends AbstractThingDataRepositoryStrategy { + + private final ThingsRegistry registry; + private final ElasticSearchService searchService; + private final AggregationService aggregationService; + private final ElasticSearchIndexManager indexManager; + + @Override + public String getId() { + return "default-row"; + } + + @Override + public String getName() { + return "ElasticSearch-行式存储"; + } + + @Override + public SaveOperations createOpsForSave(OperationsContext context) { + return new ElasticSearchRowModeSaveOperations( + registry, + context.getMetricBuilder(), + context.getSettings(), + searchService); + } + + @Override + protected QueryOperations createForQuery(String thingType, String templateId, String thingId, OperationsContext context) { + return new ElasticSearchRowModeQueryOperations( + thingType, + templateId, + thingId, + context.getMetricBuilder(), + context.getSettings(), + registry, + searchService, + aggregationService); + } + + @Override + protected DDLOperations createForDDL(String thingType, String templateId, String thingId, OperationsContext context) { + return new ElasticSearchRowModeDDLOperations( + thingType, + templateId, + thingId, + context.getSettings(), + context.getMetricBuilder(), + indexManager); + } + + @Override + public int getOrder() { + return 10000; + } +}