From 75c7022ebfc4eec91c22fb4ab2fecc3ae41965fa Mon Sep 17 00:00:00 2001 From: zhou-hao Date: Tue, 1 Sep 2020 18:20:26 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8D=87=E7=BA=A7elasticsearch=207.9.0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docker/run-all/docker-compose.yml | 2 +- .../java/org/jetlinks/community/Version.java | 2 +- .../elasticsearch-component/pom.xml | 13 +- .../aggreation/DefaultAggregationService.java | 2 +- .../bucket/AggregationResponseHandle.java | 7 +- .../search/aggreation/enums/BucketType.java | 8 +- .../search/aggreation/enums/MetricsType.java | 7 +- .../ElasticSearchConfiguration.java | 106 +- .../embedded/EmbeddedElasticSearch.java | 11 +- .../AbstractElasticSearchIndexStrategy.java | 71 +- .../DirectElasticSearchIndexStrategy.java | 6 +- .../TemplateElasticSearchIndexStrategy.java | 29 +- ...TimeByMonthElasticSearchIndexStrategy.java | 4 +- .../service/DefaultElasticSearchService.java | 5 +- .../DefaultReactiveElasticsearchClient.java | 1295 +++++++++++++++++ .../service/reactive/RawActionResponse.java | 59 + .../reactive/ReactiveAggregationService.java | 199 +++ .../ReactiveElasticSearchService.java | 413 ++++++ .../reactive/ReactiveElasticsearchClient.java | 29 + .../main/resources/application-embedded.yml | 8 + pom.xml | 2 +- 21 files changed, 2183 insertions(+), 95 deletions(-) create mode 100644 jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/DefaultReactiveElasticsearchClient.java create mode 100644 jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/RawActionResponse.java create mode 100644 jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveAggregationService.java create mode 100644 jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticSearchService.java create mode 100644 jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticsearchClient.java diff --git a/docker/run-all/docker-compose.yml b/docker/run-all/docker-compose.yml index 762c4a94..c7c65f12 100644 --- a/docker/run-all/docker-compose.yml +++ b/docker/run-all/docker-compose.yml @@ -59,7 +59,7 @@ services: links: - jetlinks:jetlinks jetlinks: - image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-standalone:1.4.0 + image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-standalone:1.5.0-SNAPSHOT container_name: jetlinks-ce ports: - 8848:8848 # API端口 diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/Version.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/Version.java index 41cdb319..0e9db4b2 100644 --- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/Version.java +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/Version.java @@ -8,6 +8,6 @@ public class Version { private final String edition = "community"; - private final String version = "1.4.0"; + private final String version = "1.5.0-SNAPSHOT"; } diff --git a/jetlinks-components/elasticsearch-component/pom.xml b/jetlinks-components/elasticsearch-component/pom.xml index 9a91b7d7..347c1a88 100644 --- a/jetlinks-components/elasticsearch-component/pom.xml +++ b/jetlinks-components/elasticsearch-component/pom.xml @@ -43,6 +43,12 @@ org.elasticsearch.client elasticsearch-rest-high-level-client + + + org.springframework.data + spring-data-elasticsearch + + org.hswebframework.web hsweb-commons-crud @@ -65,7 +71,12 @@ timeseries-component ${project.version} - + + + io.projectreactor.netty + reactor-netty + + \ No newline at end of file diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/DefaultAggregationService.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/DefaultAggregationService.java index 6782c0b1..08030245 100644 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/DefaultAggregationService.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/DefaultAggregationService.java @@ -39,7 +39,7 @@ import java.util.stream.Collectors; * @author bsetfeng * @since 1.0 **/ -@Service +//@Service @Slf4j public class DefaultAggregationService implements AggregationService { diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/bucket/AggregationResponseHandle.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/bucket/AggregationResponseHandle.java index 15cb901b..a284f906 100644 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/bucket/AggregationResponseHandle.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/bucket/AggregationResponseHandle.java @@ -4,12 +4,7 @@ import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.range.Range; import org.elasticsearch.search.aggregations.bucket.terms.Terms; -import org.elasticsearch.search.aggregations.metrics.avg.Avg; -import org.elasticsearch.search.aggregations.metrics.max.Max; -import org.elasticsearch.search.aggregations.metrics.min.Min; -import org.elasticsearch.search.aggregations.metrics.stats.Stats; -import org.elasticsearch.search.aggregations.metrics.sum.Sum; -import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCount; +import org.elasticsearch.search.aggregations.metrics.*; import org.jetlinks.community.elastic.search.aggreation.metrics.MetricsResponseSingleValue; import java.util.List; diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/enums/BucketType.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/enums/BucketType.java index 4fcf3720..b8337454 100644 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/enums/BucketType.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/enums/BucketType.java @@ -17,9 +17,9 @@ import org.jetlinks.community.elastic.search.aggreation.bucket.Bucket; import org.jetlinks.community.elastic.search.aggreation.bucket.BucketAggregationsStructure; import org.jetlinks.community.elastic.search.aggreation.bucket.Sort; import org.jetlinks.community.elastic.search.aggreation.metrics.MetricsAggregationStructure; -import org.joda.time.DateTimeZone; import org.springframework.util.StringUtils; +import java.time.ZoneId; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -104,7 +104,7 @@ public enum BucketType { if (structure.getMissingValue() != null) { builder.missing(structure.getMissingValue()); } - builder.timeZone(DateTimeZone.getDefault()); + builder.timeZone(ZoneId.systemDefault()); commonAggregationSetting(builder, structure); return builder; } @@ -136,7 +136,7 @@ public enum BucketType { if (sort != null) { builder.order(mapping.get(OrderBuilder.of(sort.getOrder(), sort.getType()))); } - builder.timeZone(DateTimeZone.getDefault()); + builder.timeZone(ZoneId.systemDefault()); commonAggregationSetting(builder, structure); return builder; } @@ -148,7 +148,7 @@ public enum BucketType { } }; - private String text; + private final String text; public abstract AggregationBuilder aggregationBuilder(BucketAggregationsStructure structure); diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/enums/MetricsType.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/enums/MetricsType.java index 873f9278..090e12a3 100644 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/enums/MetricsType.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/enums/MetricsType.java @@ -5,12 +5,7 @@ import lombok.Getter; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregationBuilders; -import org.elasticsearch.search.aggregations.metrics.avg.Avg; -import org.elasticsearch.search.aggregations.metrics.max.Max; -import org.elasticsearch.search.aggregations.metrics.min.Min; -import org.elasticsearch.search.aggregations.metrics.stats.Stats; -import org.elasticsearch.search.aggregations.metrics.sum.Sum; -import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCount; +import org.elasticsearch.search.aggregations.metrics.*; import org.jetlinks.community.elastic.search.aggreation.metrics.MetricsResponse; import org.jetlinks.community.elastic.search.aggreation.metrics.MetricsResponseSingleValue; diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/configuration/ElasticSearchConfiguration.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/configuration/ElasticSearchConfiguration.java index 2a0079e4..70af8acc 100644 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/configuration/ElasticSearchConfiguration.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/configuration/ElasticSearchConfiguration.java @@ -1,5 +1,12 @@ package org.jetlinks.community.elastic.search.configuration; +import io.netty.channel.ChannelOption; +import io.netty.handler.ssl.ApplicationProtocolConfig; +import io.netty.handler.ssl.ClientAuth; +import io.netty.handler.ssl.IdentityCipherSuiteFilter; +import io.netty.handler.ssl.JdkSslContext; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.handler.timeout.WriteTimeoutHandler; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.elasticsearch.client.RestClient; @@ -8,10 +15,25 @@ import org.jetlinks.community.elastic.search.ElasticRestClient; import org.jetlinks.community.elastic.search.embedded.EmbeddedElasticSearch; import org.jetlinks.community.elastic.search.embedded.EmbeddedElasticSearchProperties; import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties; +import org.jetlinks.community.elastic.search.service.reactive.DefaultReactiveElasticsearchClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.data.elasticsearch.client.ClientConfiguration; +import org.springframework.data.elasticsearch.client.reactive.HostProvider; +import org.springframework.data.elasticsearch.client.reactive.RequestCreator; +import org.springframework.data.elasticsearch.client.reactive.WebClientProvider; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import reactor.netty.http.client.HttpClient; +import reactor.netty.tcp.ProxyProvider; +import reactor.netty.tcp.TcpClient; + +import javax.net.ssl.SSLContext; +import java.net.InetSocketAddress; +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.TimeUnit; /** * @author bsetfeng @@ -34,18 +56,94 @@ public class ElasticSearchConfiguration { this.properties = properties; this.embeddedProperties = embeddedProperties; } - @Bean @SneakyThrows - public ElasticRestClient elasticRestClient() { + public DefaultReactiveElasticsearchClient reactiveElasticsearchClient(ClientConfiguration clientConfiguration) { if (embeddedProperties.isEnabled()) { log.debug("starting embedded elasticsearch on {}:{}", embeddedProperties.getHost(), embeddedProperties.getPort()); - new EmbeddedElasticSearch(embeddedProperties) - .start(); + new EmbeddedElasticSearch(embeddedProperties).start(); } + + WebClientProvider provider = getWebClientProvider(clientConfiguration); + + HostProvider hostProvider = HostProvider.provider(provider, clientConfiguration.getHeadersSupplier(), + clientConfiguration.getEndpoints().toArray(new InetSocketAddress[0])); + + DefaultReactiveElasticsearchClient client = + new DefaultReactiveElasticsearchClient(hostProvider, new RequestCreator() { + }); + + client.setHeadersSupplier(clientConfiguration.getHeadersSupplier()); + + return client; + } + + private static WebClientProvider getWebClientProvider(ClientConfiguration clientConfiguration) { + + Duration connectTimeout = clientConfiguration.getConnectTimeout(); + Duration soTimeout = clientConfiguration.getSocketTimeout(); + + TcpClient tcpClient = TcpClient.create(); + + if (!connectTimeout.isNegative()) { + tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.toMillis())); + } + + if (!soTimeout.isNegative()) { + tcpClient = tcpClient.doOnConnected(connection -> connection // + .addHandlerLast(new ReadTimeoutHandler(soTimeout.toMillis(), TimeUnit.MILLISECONDS)) + .addHandlerLast(new WriteTimeoutHandler(soTimeout.toMillis(), TimeUnit.MILLISECONDS))); + } + + if (clientConfiguration.getProxy().isPresent()) { + String proxy = clientConfiguration.getProxy().get(); + String[] hostPort = proxy.split(":"); + + if (hostPort.length != 2) { + throw new IllegalArgumentException("invalid proxy configuration " + proxy + ", should be \"host:port\""); + } + tcpClient = tcpClient.proxy(proxyOptions -> proxyOptions.type(ProxyProvider.Proxy.HTTP).host(hostPort[0]) + .port(Integer.parseInt(hostPort[1]))); + } + + String scheme = "http"; + HttpClient httpClient = HttpClient.from(tcpClient); + + if (clientConfiguration.useSsl()) { + + Optional sslContext = clientConfiguration.getSslContext(); + + if (sslContext.isPresent()) { + httpClient = httpClient.secure(sslContextSpec -> { + sslContextSpec.sslContext(new JdkSslContext(sslContext.get(), true, null, IdentityCipherSuiteFilter.INSTANCE, + ApplicationProtocolConfig.DISABLED, ClientAuth.NONE, null, false)); + }); + } else { + httpClient = httpClient.secure(); + } + + scheme = "https"; + } + + ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient); + WebClientProvider provider = WebClientProvider.create(scheme, connector); + + if (clientConfiguration.getPathPrefix() != null) { + provider = provider.withPathPrefix(clientConfiguration.getPathPrefix()); + } + + provider = provider.withDefaultHeaders(clientConfiguration.getDefaultHeaders()) // + .withWebClientConfigurer(clientConfiguration.getWebClientConfigurer()); + return provider; + } + + @Bean + @SneakyThrows + public ElasticRestClient elasticRestClient() { + RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(properties.createHosts()) .setRequestConfigCallback(properties::applyRequestConfigBuilder) .setHttpClientConfigCallback(properties::applyHttpAsyncClientBuilder)); diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/embedded/EmbeddedElasticSearch.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/embedded/EmbeddedElasticSearch.java index 70d102a1..fbd01801 100644 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/embedded/EmbeddedElasticSearch.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/embedded/EmbeddedElasticSearch.java @@ -22,17 +22,12 @@ public class EmbeddedElasticSearch extends Node { Settings.builder() .put("node.name", "test") .put("discovery.type", "single-node") - .put("transport.type", "netty4") - .put("http.type", "netty4") + .put("transport.type", Netty4Plugin.NETTY_TRANSPORT_NAME) + .put("http.type", Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME) .put("network.host", "0.0.0.0") .put("http.port", 9200) - ).build(), null), + ).build(), Collections.emptyMap(), null, () -> "default"), Collections.singleton(Netty4Plugin.class), false); } - @Override - protected void registerDerivedNodeNameWithLogger(String nodeName) { - - } - } diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/AbstractElasticSearchIndexStrategy.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/AbstractElasticSearchIndexStrategy.java index 9a962224..f98e69e2 100644 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/AbstractElasticSearchIndexStrategy.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/AbstractElasticSearchIndexStrategy.java @@ -3,22 +3,23 @@ package org.jetlinks.community.elastic.search.index.strategies; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.indices.*; -import org.elasticsearch.cluster.metadata.MappingMetaData; -import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.cluster.metadata.MappingMetadata; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.compress.CompressedXContent; import org.jetlinks.core.metadata.DataType; import org.jetlinks.core.metadata.PropertyMetadata; import org.jetlinks.core.metadata.SimplePropertyMetadata; import org.jetlinks.core.metadata.types.*; -import org.jetlinks.community.elastic.search.ElasticRestClient; import org.jetlinks.community.elastic.search.enums.ElasticDateFormat; import org.jetlinks.community.elastic.search.enums.ElasticPropertyType; import org.jetlinks.community.elastic.search.index.DefaultElasticSearchIndexMetadata; import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata; +import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties; import org.jetlinks.community.elastic.search.index.ElasticSearchIndexStrategy; -import org.jetlinks.community.elastic.search.utils.ReactorActionListener; +import org.jetlinks.community.elastic.search.service.reactive.ReactiveElasticsearchClient; import reactor.core.publisher.Mono; import java.util.*; @@ -30,7 +31,7 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc @Getter private final String id; - protected ElasticRestClient client; + protected ReactiveElasticsearchClient client; protected ElasticSearchIndexProperties properties; @@ -39,19 +40,11 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc } protected Mono indexExists(String index) { - return ReactorActionListener.mono( - actionListener -> - client.getQueryClient() - .indices() - .existsAsync(new GetIndexRequest(wrapIndex(index)), RequestOptions.DEFAULT, actionListener)); + return client.existsIndex(req -> req.indices(wrapIndex(index))); } protected Mono doCreateIndex(ElasticSearchIndexMetadata metadata) { - return ReactorActionListener.mono( - actionListener -> client.getQueryClient() - .indices() - .createAsync(createIndexRequest(metadata), RequestOptions.DEFAULT, actionListener)) - .then(); + return client.createIndex(createIndexRequest(metadata)); } protected Mono doPutIndex(ElasticSearchIndexMetadata metadata, @@ -62,11 +55,7 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc if (exists) { return doLoadIndexMetadata(index) .flatMap(oldMapping -> Mono.justOrEmpty(createPutMappingRequest(metadata, oldMapping))) - .flatMap(request -> ReactorActionListener.mono( - actionListener -> - client.getWriteClient() - .indices() - .putMappingAsync(request, RequestOptions.DEFAULT, actionListener))) + .flatMap(request -> client.updateMapping(request)) .then(); } if (justUpdateMapping) { @@ -78,21 +67,18 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc protected Mono doLoadIndexMetadata(String _index) { String index = wrapIndex(_index); - return ReactorActionListener - .mono(listener -> client.getQueryClient() - .indices() - .getMappingAsync(new GetMappingsRequest().indices(index), RequestOptions.DEFAULT, listener)) + return client.getMapping(new GetMappingsRequest().indices(index)) .flatMap(resp -> Mono.justOrEmpty(convertMetadata(index, resp.mappings().get(index)))); } - public CreateIndexRequest createIndexRequest(ElasticSearchIndexMetadata metadata) { + protected CreateIndexRequest createIndexRequest(ElasticSearchIndexMetadata metadata) { CreateIndexRequest request = new CreateIndexRequest(wrapIndex(metadata.getIndex())); request.settings(properties.toSettings()); Map mappingConfig = new HashMap<>(); mappingConfig.put("properties", createElasticProperties(metadata.getProperties())); mappingConfig.put("dynamic_templates", createDynamicTemplates()); - request.mapping(mappingConfig); + mappingConfig.forEach(request::mapping); return request; } @@ -107,8 +93,10 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc log.debug("ignore update index [{}] mapping", wrapIndex(metadata.getIndex())); return null; } + Map mappingConfig = new HashMap<>(); PutMappingRequest request = new PutMappingRequest(wrapIndex(metadata.getIndex())); - request.source(Collections.singletonMap("properties", properties)); + mappingConfig.put("properties", createElasticProperties(metadata.getProperties())); + request.source(mappingConfig); return request; } @@ -142,6 +130,8 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc property.put("type", "boolean"); } else if (type instanceof GeoType) { property.put("type", "geo_point"); + } else if (type instanceof GeoShapeType) { + property.put("type", "geo_shape"); } else if (type instanceof ArrayType) { ArrayType arrayType = ((ArrayType) type); return createElasticProperty(arrayType.getElementType()); @@ -151,14 +141,26 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc property.put("properties", createElasticProperties(objectType.getProperties())); } else { property.put("type", "keyword"); - property.put("ignore_above",512); + property.put("ignore_above", 512); } return property; } - protected ElasticSearchIndexMetadata convertMetadata(String index, MappingMetaData metaData) { - Map response = metaData.getSourceAsMap(); - Object properties = response.get("properties"); + protected ElasticSearchIndexMetadata convertMetadata(String index, ImmutableOpenMap metaData) { + MappingMetadata mappingMetadata = null; + if (metaData.size() == 1) { + Object res = metaData.values().iterator().next().value; + if (res instanceof MappingMetadata) { + mappingMetadata = ((MappingMetadata) res); + } else if (res instanceof CompressedXContent) { + mappingMetadata = new MappingMetadata(((CompressedXContent) res)); + } + } + if (mappingMetadata == null) { + throw new UnsupportedOperationException("unsupported index metadata" + metaData); + } + Object properties = mappingMetadata.getSourceAsMap().get("properties"); + return new DefaultElasticSearchIndexMetadata(index, convertProperties(properties)); } @@ -216,5 +218,4 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc return maps; } - } diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/DirectElasticSearchIndexStrategy.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/DirectElasticSearchIndexStrategy.java index 96ece4c1..decfd9df 100644 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/DirectElasticSearchIndexStrategy.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/DirectElasticSearchIndexStrategy.java @@ -1,16 +1,16 @@ package org.jetlinks.community.elastic.search.index.strategies; -import org.jetlinks.community.elastic.search.ElasticRestClient; import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata; import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties; +import org.jetlinks.community.elastic.search.service.reactive.ReactiveElasticsearchClient; import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; @Component public class DirectElasticSearchIndexStrategy extends AbstractElasticSearchIndexStrategy { - public DirectElasticSearchIndexStrategy(ElasticRestClient client, ElasticSearchIndexProperties properties) { - super("direct", client,properties); + public DirectElasticSearchIndexStrategy(ReactiveElasticsearchClient client, ElasticSearchIndexProperties properties) { + super("direct", client, properties); } @Override diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TemplateElasticSearchIndexStrategy.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TemplateElasticSearchIndexStrategy.java index 178175e7..b6942d1f 100644 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TemplateElasticSearchIndexStrategy.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TemplateElasticSearchIndexStrategy.java @@ -1,15 +1,11 @@ package org.jetlinks.community.elastic.search.index.strategies; import org.elasticsearch.action.admin.indices.alias.Alias; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.indices.GetIndexTemplatesRequest; -import org.elasticsearch.client.indices.GetIndexTemplatesResponse; -import org.elasticsearch.client.indices.PutIndexTemplateRequest; -import org.jetlinks.community.elastic.search.ElasticRestClient; +import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesRequest; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata; import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties; -import org.jetlinks.community.elastic.search.utils.ReactorActionListener; +import org.jetlinks.community.elastic.search.service.reactive.ReactiveElasticsearchClient; import reactor.core.publisher.Mono; import java.util.Collections; @@ -19,8 +15,8 @@ import java.util.Map; public abstract class TemplateElasticSearchIndexStrategy extends AbstractElasticSearchIndexStrategy { - public TemplateElasticSearchIndexStrategy(String id, ElasticRestClient client, ElasticSearchIndexProperties properties) { - super(id, client,properties); + public TemplateElasticSearchIndexStrategy(String id, ReactiveElasticsearchClient client, ElasticSearchIndexProperties properties) { + super(id, client, properties); } protected String getTemplate(String index) { @@ -45,15 +41,12 @@ public abstract class TemplateElasticSearchIndexStrategy extends AbstractElastic @Override public Mono putIndex(ElasticSearchIndexMetadata metadata) { - return ReactorActionListener - .mono(listener -> client.getWriteClient() - .indices()//修改索引模版 - .putTemplateAsync(createIndexTemplateRequest(metadata), RequestOptions.DEFAULT, listener)) + return client + .updateTemplate(createIndexTemplateRequest(metadata)) //修改当前索引 .then(doPutIndex(metadata.newIndexName(getIndexForSave(metadata.getIndex())), true)); } - protected PutIndexTemplateRequest createIndexTemplateRequest(ElasticSearchIndexMetadata metadata) { String index = wrapIndex(metadata.getIndex()); PutIndexTemplateRequest request = new PutIndexTemplateRequest(getTemplate(index)); @@ -62,7 +55,7 @@ public abstract class TemplateElasticSearchIndexStrategy extends AbstractElastic Map mappingConfig = new HashMap<>(); mappingConfig.put("properties", createElasticProperties(metadata.getProperties())); mappingConfig.put("dynamic_templates", createDynamicTemplates()); - request.mapping(mappingConfig); + request.mapping("_doc",mappingConfig); request.patterns(getIndexPatterns(index)); return request; } @@ -70,11 +63,7 @@ public abstract class TemplateElasticSearchIndexStrategy extends AbstractElastic @Override public Mono loadIndexMetadata(String index) { - - return ReactorActionListener - .mono(listener -> client.getQueryClient() - .indices() - .getIndexTemplateAsync(new GetIndexTemplatesRequest(getTemplate(index)), RequestOptions.DEFAULT, listener)) + return client.getTemplate(new GetIndexTemplatesRequest(getTemplate(index))) .filter(resp -> resp.getIndexTemplates().size() > 0) .flatMap(resp -> Mono.justOrEmpty(convertMetadata(index, resp.getIndexTemplates().get(0).mappings()))); } diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TimeByMonthElasticSearchIndexStrategy.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TimeByMonthElasticSearchIndexStrategy.java index 31ec0789..d0c7f276 100644 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TimeByMonthElasticSearchIndexStrategy.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TimeByMonthElasticSearchIndexStrategy.java @@ -1,8 +1,8 @@ package org.jetlinks.community.elastic.search.index.strategies; import org.hswebframework.utils.time.DateFormatter; -import org.jetlinks.community.elastic.search.ElasticRestClient; import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties; +import org.jetlinks.community.elastic.search.service.reactive.ReactiveElasticsearchClient; import org.springframework.stereotype.Component; import java.util.Date; @@ -18,7 +18,7 @@ public class TimeByMonthElasticSearchIndexStrategy extends TemplateElasticSearch private final String format = "yyyy-MM"; - public TimeByMonthElasticSearchIndexStrategy(ElasticRestClient client, ElasticSearchIndexProperties properties) { + public TimeByMonthElasticSearchIndexStrategy(ReactiveElasticsearchClient client, ElasticSearchIndexProperties properties) { super("time-by-month", client,properties); } diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java index 470f9c5a..5c988f4a 100644 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java @@ -59,9 +59,10 @@ import java.util.stream.Collectors; * @author zhouhao * @since 1.0 **/ -@Service +//@Service @Slf4j @DependsOn("restHighLevelClient") +@Deprecated public class DefaultElasticSearchService implements ElasticSearchService { private final ElasticRestClient restClient; @@ -138,7 +139,7 @@ public class DefaultElasticSearchService implements ElasticSearchService { convertQueryResult(tp2.getT1(), tp2.getT2(), mapper) .collectList() .filter(CollectionUtils::isNotEmpty) - .map(list -> PagerResult.of((int) tp2.getT2().getHits().getTotalHits(), list, queryParam)) + .map(list -> PagerResult.of((int) tp2.getT2().getHits().getTotalHits().value, list, queryParam)) ) .switchIfEmpty(Mono.fromSupplier(PagerResult::empty)); } diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/DefaultReactiveElasticsearchClient.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/DefaultReactiveElasticsearchClient.java new file mode 100644 index 00000000..bb05429a --- /dev/null +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/DefaultReactiveElasticsearchClient.java @@ -0,0 +1,1295 @@ +package org.jetlinks.community.elastic.search.service.reactive; + +import lombok.SneakyThrows; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.entity.ContentType; +import org.apache.http.util.EntityUtils; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.admin.indices.flush.FlushResponse; +import org.elasticsearch.action.admin.indices.get.GetIndexRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; +import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesRequest; +import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.get.MultiGetRequest; +import org.elasticsearch.action.get.MultiGetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.main.MainRequest; +import org.elasticsearch.action.main.MainResponse; +import org.elasticsearch.action.search.*; +import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.client.Request; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.lucene.uid.Versions; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.*; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.rest.BytesRestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.aggregations.Aggregation; +import org.elasticsearch.search.fetch.subphase.FetchSourceContext; +import org.elasticsearch.tasks.TaskId; +import org.reactivestreams.Publisher; +import org.springframework.data.elasticsearch.client.ClientLogger; +import org.springframework.data.elasticsearch.client.ElasticsearchHost; +import org.springframework.data.elasticsearch.client.NoReachableHostException; +import org.springframework.data.elasticsearch.client.reactive.HostProvider; +import org.springframework.data.elasticsearch.client.reactive.RequestBodyEncodingException; +import org.springframework.data.elasticsearch.client.reactive.RequestCreator; +import org.springframework.data.elasticsearch.client.util.NamedXContents; +import org.springframework.data.elasticsearch.client.util.RequestConverters; +import org.springframework.data.elasticsearch.client.util.ScrollState; +import org.springframework.data.util.Lazy; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import org.springframework.util.ObjectUtils; +import org.springframework.util.ReflectionUtils; +import org.springframework.web.reactive.function.BodyExtractors; +import org.springframework.web.reactive.function.client.ClientRequest; +import org.springframework.web.reactive.function.client.ClientResponse; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.EmitterProcessor; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.ConnectException; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Locale; +import java.util.Map; +import java.util.StringJoiner; +import java.util.function.Function; +import java.util.function.Supplier; + +import static org.springframework.data.elasticsearch.client.util.RequestConverters.createContentType; + +public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient { + private final HostProvider hostProvider; + private final RequestCreator requestCreator; + private Supplier headersSupplier = () -> HttpHeaders.EMPTY; + + /** + * Create a new {@link org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient} using the given {@link HostProvider} to obtain server + * connections and the given {@link RequestCreator}. + * + * @param hostProvider must not be {@literal null}. + * @param requestCreator must not be {@literal null}. + */ + public DefaultReactiveElasticsearchClient(HostProvider hostProvider, RequestCreator requestCreator) { + + Assert.notNull(hostProvider, "HostProvider must not be null"); + Assert.notNull(requestCreator, "RequestCreator must not be null"); + + this.hostProvider = hostProvider; + this.requestCreator = requestCreator; + } + + public void setHeadersSupplier(Supplier headersSupplier) { + + Assert.notNull(headersSupplier, "headersSupplier must not be null"); + + this.headersSupplier = headersSupplier; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders) + */ + @Override + public Mono ping(HttpHeaders headers) { + + return sendRequest(new MainRequest(), requestCreator.ping(), RawActionResponse.class, headers) // + .map(response -> response.statusCode().is2xxSuccessful()) // + .onErrorResume(NoReachableHostException.class, error -> Mono.just(false)).next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#info(org.springframework.http.HttpHeaders) + */ + @Override + public Mono info(HttpHeaders headers) { + + return sendRequest(new MainRequest(), requestCreator.info(), MainResponse.class, headers) // + .next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#get(org.springframework.http.HttpHeaders, org.elasticsearch.action.get.GetRequest) + */ + @Override + public Mono get(HttpHeaders headers, GetRequest getRequest) { + + return sendRequest(getRequest, requestCreator.get(), GetResponse.class, headers) // + .filter(GetResponse::isExists) // + .map(DefaultReactiveElasticsearchClient::getResponseToGetResult) // + .next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#multiGet(org.springframework.http.HttpHeaders, org.elasticsearch.action.get.MultiGetRequest) + */ + @Override + public Flux multiGet(HttpHeaders headers, MultiGetRequest multiGetRequest) { + + return sendRequest(multiGetRequest, requestCreator.multiGet(), MultiGetResponse.class, headers) + .map(MultiGetResponse::getResponses) // + .flatMap(Flux::fromArray) // + .filter(it -> !it.isFailed() && it.getResponse().isExists()) // + .map(it -> DefaultReactiveElasticsearchClient.getResponseToGetResult(it.getResponse())); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#exists(org.springframework.http.HttpHeaders, org.elasticsearch.action.get.GetRequest) + */ + @Override + public Mono exists(HttpHeaders headers, GetRequest getRequest) { + + return sendRequest(getRequest, requestCreator.exists(), RawActionResponse.class, headers) // + .map(response -> response.statusCode().is2xxSuccessful()) // + .next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.index.IndexRequest) + */ + @Override + public Mono index(HttpHeaders headers, IndexRequest indexRequest) { + return sendRequest(indexRequest, requestCreator.index(), IndexResponse.class, headers).publishNext(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#indices() + */ + @Override + public Indices indices() { + return this; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.update.UpdateRequest) + */ + @Override + public Mono update(HttpHeaders headers, UpdateRequest updateRequest) { + return sendRequest(updateRequest, requestCreator.update(), UpdateResponse.class, headers).publishNext(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.delete.DeleteRequest) + */ + @Override + public Mono delete(HttpHeaders headers, DeleteRequest deleteRequest) { + + return sendRequest(deleteRequest, requestCreator.delete(), DeleteResponse.class, headers) // + .publishNext(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#count(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest) + */ + @Override + public Mono count(HttpHeaders headers, SearchRequest searchRequest) { + searchRequest.source().trackTotalHits(true); + searchRequest.source().size(0); + searchRequest.source().fetchSource(false); + return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers) // + .map(SearchResponse::getHits) // + .map(searchHits -> searchHits.getTotalHits().value) // + .next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest) + */ + @Override + public Flux search(HttpHeaders headers, SearchRequest searchRequest) { + + return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers) // + .map(SearchResponse::getHits) // + .flatMap(Flux::fromIterable); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#aggregate(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest) + */ + @Override + public Flux aggregate(HttpHeaders headers, SearchRequest searchRequest) { + + Assert.notNull(headers, "headers must not be null"); + Assert.notNull(searchRequest, "searchRequest must not be null"); + + searchRequest.source().size(0); + searchRequest.source().trackTotalHits(false); + + return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers) // + .map(SearchResponse::getAggregations) // + .flatMap(Flux::fromIterable); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#scroll(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest) + */ + @Override + public Flux scroll(HttpHeaders headers, SearchRequest searchRequest) { + + TimeValue scrollTimeout = searchRequest.scroll() != null ? searchRequest.scroll().keepAlive() + : TimeValue.timeValueMinutes(1); + + if (searchRequest.scroll() == null) { + searchRequest.scroll(scrollTimeout); + } + + EmitterProcessor outbound = EmitterProcessor.create(false); + FluxSink request = outbound.sink(); + + EmitterProcessor inbound = EmitterProcessor.create(false); + + Flux exchange = outbound.startWith(searchRequest).flatMap(it -> { + + if (it instanceof SearchRequest) { + return sendRequest((SearchRequest) it, requestCreator.search(), SearchResponse.class, headers); + } else if (it instanceof SearchScrollRequest) { + return sendRequest((SearchScrollRequest) it, requestCreator.scroll(), SearchResponse.class, headers); + } else if (it instanceof ClearScrollRequest) { + return sendRequest((ClearScrollRequest) it, requestCreator.clearScroll(), ClearScrollResponse.class, headers) + .flatMap(discard -> Flux.empty()); + } + + throw new IllegalArgumentException( + String.format("Cannot handle '%s'. Please make sure to use a 'SearchRequest' or 'SearchScrollRequest'.", it)); + }); + + return Flux.usingWhen(Mono.fromSupplier(ScrollState::new), + + scrollState -> { + + Flux searchHits = inbound.handle((searchResponse, sink) -> { + + scrollState.updateScrollId(searchResponse.getScrollId()); + if (isEmpty(searchResponse.getHits())) { + + inbound.onComplete(); + outbound.onComplete(); + + } else { + + sink.next(searchResponse); + + SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollState.getScrollId()) + .scroll(scrollTimeout); + request.next(searchScrollRequest); + } + + }).map(SearchResponse::getHits) // + .flatMap(Flux::fromIterable); + + return searchHits.doOnSubscribe(ignore -> exchange.subscribe(inbound)); + + }, + state -> cleanupScroll(headers, state), // + (state,error) -> cleanupScroll(headers, state), // + state -> cleanupScroll(headers, state)); // + } + + private static boolean isEmpty(@Nullable SearchHits hits) { + return hits != null && hits.getHits() != null && hits.getHits().length == 0; + } + + private Publisher cleanupScroll(HttpHeaders headers, ScrollState state) { + + if (state.getScrollIds().isEmpty()) { + return Mono.empty(); + } + + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.scrollIds(state.getScrollIds()); + + // just send the request, resources get cleaned up anyways after scrollTimeout has been reached. + return sendRequest(clearScrollRequest, requestCreator.clearScroll(), ClearScrollResponse.class, headers); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.index.reindex.DeleteByQueryRequest) + */ + @Override + public Mono deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest) { + + return sendRequest(deleteRequest, requestCreator.deleteByQuery(), BulkByScrollResponse.class, headers) // + .publishNext(); + } + + static XContentType enforceSameContentType(IndexRequest indexRequest, @Nullable XContentType xContentType) { + XContentType requestContentType = indexRequest.getContentType(); + if (requestContentType != XContentType.JSON && requestContentType != XContentType.SMILE) { + throw new IllegalArgumentException("Unsupported content-type found for request with content-type [" + + requestContentType + "], only JSON and SMILE are supported"); + } + if (xContentType == null) { + return requestContentType; + } + if (requestContentType != xContentType) { + throw new IllegalArgumentException("Mismatching content-type found for request with content-type [" + + requestContentType + "], previous requests have content-type [" + xContentType + ']'); + } + return xContentType; + } + + @SneakyThrows + Request convertBulk(BulkRequest bulkRequest) { + Request request = new Request(HttpMethod.POST.name(), "/_bulk"); + + Params parameters = new Params(request); + parameters.withTimeout(bulkRequest.timeout()); + parameters.withRefreshPolicy(bulkRequest.getRefreshPolicy()); + + // parameters.withPipeline(bulkRequest.pipeline()); + // parameters.withRouting(bulkRequest.routing()); + + // Bulk API only supports newline delimited JSON or Smile. Before executing + // the bulk, we need to check that all requests have the same content-type + // and this content-type is supported by the Bulk API. + XContentType bulkContentType = null; + for (int i = 0; i < bulkRequest.numberOfActions(); i++) { + DocWriteRequest action = bulkRequest.requests().get(i); + + DocWriteRequest.OpType opType = action.opType(); + if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) { + bulkContentType = enforceSameContentType((IndexRequest) action, bulkContentType); + + } else if (opType == DocWriteRequest.OpType.UPDATE) { + UpdateRequest updateRequest = (UpdateRequest) action; + if (updateRequest.doc() != null) { + bulkContentType = enforceSameContentType(updateRequest.doc(), bulkContentType); + } + if (updateRequest.upsertRequest() != null) { + bulkContentType = enforceSameContentType(updateRequest.upsertRequest(), bulkContentType); + } + } + } + + if (bulkContentType == null) { + bulkContentType = XContentType.JSON; + } + + final byte separator = bulkContentType.xContent().streamSeparator(); + final ContentType requestContentType = createContentType(bulkContentType); + + ByteArrayOutputStream content = new ByteArrayOutputStream(); + for (DocWriteRequest action : bulkRequest.requests()) { + DocWriteRequest.OpType opType = action.opType(); + + try (XContentBuilder metadata = XContentBuilder.builder(bulkContentType.xContent())) { + metadata.startObject(); + { + metadata.startObject(opType.getLowercase()); + if (Strings.hasLength(action.index())) { + metadata.field("_index", action.index()); + } + if (Strings.hasLength(action.type())) { + metadata.field("_type", action.type()); + } + if (Strings.hasLength(action.id())) { + metadata.field("_id", action.id()); + } + if (Strings.hasLength(action.routing())) { + metadata.field("routing", action.routing()); + } + if (action.version() != Versions.MATCH_ANY) { + metadata.field("version", action.version()); + } + + VersionType versionType = action.versionType(); + if (versionType != VersionType.INTERNAL) { + if (versionType == VersionType.EXTERNAL) { + metadata.field("version_type", "external"); + } else if (versionType == VersionType.EXTERNAL_GTE) { + metadata.field("version_type", "external_gte"); + } + } + + if (action.ifSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { + metadata.field("if_seq_no", action.ifSeqNo()); + metadata.field("if_primary_term", action.ifPrimaryTerm()); + } + + if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) { + IndexRequest indexRequest = (IndexRequest) action; + if (Strings.hasLength(indexRequest.getPipeline())) { + metadata.field("pipeline", indexRequest.getPipeline()); + } + } else if (opType == DocWriteRequest.OpType.UPDATE) { + UpdateRequest updateRequest = (UpdateRequest) action; + if (updateRequest.retryOnConflict() > 0) { + metadata.field("retry_on_conflict", updateRequest.retryOnConflict()); + } + if (updateRequest.fetchSource() != null) { + metadata.field("_source", updateRequest.fetchSource()); + } + } + metadata.endObject(); + } + metadata.endObject(); + + BytesRef metadataSource = BytesReference.bytes(metadata).toBytesRef(); + content.write(metadataSource.bytes, metadataSource.offset, metadataSource.length); + content.write(separator); + } + + BytesRef source = null; + if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) { + IndexRequest indexRequest = (IndexRequest) action; + BytesReference indexSource = indexRequest.source(); + XContentType indexXContentType = indexRequest.getContentType(); + + try (XContentParser parser = XContentHelper.createParser( + /* + * EMPTY and THROW are fine here because we just call + * copyCurrentStructure which doesn't touch the + * registry or deprecation. + */ + NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, indexSource, + indexXContentType)) { + try (XContentBuilder builder = XContentBuilder.builder(bulkContentType.xContent())) { + builder.copyCurrentStructure(parser); + source = BytesReference.bytes(builder).toBytesRef(); + } + } + } else if (opType == DocWriteRequest.OpType.UPDATE) { + source = XContentHelper.toXContent((UpdateRequest) action, bulkContentType, false).toBytesRef(); + } + + if (source != null) { + content.write(source.bytes, source.offset, source.length); + content.write(separator); + } + } + request.setEntity(new ByteArrayEntity(content.toByteArray(), 0, content.size(), requestContentType)); + return request; + } + + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#bulk(org.springframework.http.HttpHeaders, org.elasticsearch.action.bulk.BulkRequest) + */ + @Override + public Mono bulk(HttpHeaders headers, BulkRequest bulkRequest) { + return sendRequest(bulkRequest, this::convertBulk, BulkResponse.class, headers) // + .publishNext(); + } + + // --> INDICES + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#existsIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.get.GetIndexRequest) + */ + @Override + public Mono existsIndex(HttpHeaders headers, GetIndexRequest request) { + + return sendRequest(request, requestCreator.indexExists(), RawActionResponse.class, headers) // + .map(response -> response.statusCode().is2xxSuccessful()) // + .next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#deleteIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest) + */ + @Override + public Mono deleteIndex(HttpHeaders headers, DeleteIndexRequest request) { + + return sendRequest(request, requestCreator.indexDelete(), AcknowledgedResponse.class, headers) // + .then(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#createIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.create.CreateIndexRequest) + */ + @Override + public Mono createIndex(HttpHeaders headers, CreateIndexRequest createIndexRequest) { + + return sendRequest(createIndexRequest, requestCreator.indexCreate().andThen(request -> { + request.addParameter("include_type_name","true"); + return request; + }), AcknowledgedResponse.class, headers) // + .then(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#openIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.open.OpenIndexRequest) + */ + @Override + public Mono openIndex(HttpHeaders headers, OpenIndexRequest request) { + + return sendRequest(request, requestCreator.indexOpen(), AcknowledgedResponse.class, headers) // + .then(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#closeIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.close.CloseIndexRequest) + */ + @Override + public Mono closeIndex(HttpHeaders headers, CloseIndexRequest closeIndexRequest) { + + return sendRequest(closeIndexRequest, requestCreator.indexClose(), AcknowledgedResponse.class, headers) // + .then(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#refreshIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.refresh.RefreshRequest) + */ + @Override + public Mono refreshIndex(HttpHeaders headers, RefreshRequest refreshRequest) { + + return sendRequest(refreshRequest, requestCreator.indexRefresh(), RefreshResponse.class, headers) // + .then(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#updateMapping(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest) + */ + @Override + public Mono updateMapping(HttpHeaders headers, PutMappingRequest putMappingRequest) { + + return sendRequest(putMappingRequest, requestCreator.putMapping(), AcknowledgedResponse.class, headers) // + .then(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#flushIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.flush.FlushRequest) + */ + @Override + public Mono flushIndex(HttpHeaders headers, FlushRequest flushRequest) { + + return sendRequest(flushRequest, requestCreator.flushIndex(), FlushResponse.class, headers) // + .then(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.ReactiveElasticsearchClientCallback) + */ + @Override + public Mono execute(ReactiveElasticsearchClientCallback callback) { + + return this.hostProvider.getActive(HostProvider.Verification.LAZY) // + .flatMap(callback::doWithClient) // + .onErrorResume(throwable -> { + + if (throwable instanceof ConnectException) { + + return hostProvider.getActive(HostProvider.Verification.ACTIVE) // + .flatMap(callback::doWithClient); + } + + return Mono.error(throwable); + }); + } + + @Override + public Mono status() { + + return hostProvider.clusterInfo() // + .map(it -> new ClientStatus(it.getNodes())); + } + + // --> Private Response helpers + + private static GetResult getResponseToGetResult(GetResponse response) { + + return new GetResult(response.getIndex(), response.getType(), response.getId(), response.getSeqNo(), + response.getPrimaryTerm(), response.getVersion(), response.isExists(), response.getSourceAsBytesRef(), + response.getFields(), null); + } + + // --> + + private Flux sendRequest(Req request, Function converter, + Class responseType, HttpHeaders headers) { + return sendRequest(converter.apply(request), responseType, headers); + } + + private Flux sendRequest(Request request, Class responseType, HttpHeaders headers) { + + String logId = ClientLogger.newLogId(); + + return execute(webClient -> sendRequest(webClient, logId, request, headers)) + .flatMapMany(response -> readResponseBody(logId, request, response, responseType)); + } + + private Mono sendRequest(WebClient webClient, String logId, Request request, HttpHeaders headers) { + + WebClient.RequestBodySpec requestBodySpec = webClient.method(HttpMethod.valueOf(request.getMethod().toUpperCase())) // + .uri(builder -> { + + builder = builder.path(request.getEndpoint()); + + if (!ObjectUtils.isEmpty(request.getParameters())) { + for (Map.Entry entry : request.getParameters().entrySet()) { + builder = builder.queryParam(entry.getKey(), entry.getValue()); + } + } + return builder.build(); + }) // + .attribute(ClientRequest.LOG_ID_ATTRIBUTE, logId) // + .headers(theHeaders -> { + + // add all the headers explicitly set + theHeaders.addAll(headers); + + // and now those that might be set on the request. + if (request.getOptions() != null) { + + if (!ObjectUtils.isEmpty(request.getOptions().getHeaders())) { + request.getOptions().getHeaders().forEach(it -> theHeaders.add(it.getName(), it.getValue())); + } + } + + // plus the ones from the supplier + HttpHeaders suppliedHeaders = headersSupplier.get(); + if (suppliedHeaders != null && suppliedHeaders != HttpHeaders.EMPTY) { + theHeaders.addAll(suppliedHeaders); + } + }); + + if (request.getEntity() != null) { + + Lazy body = bodyExtractor(request); + + ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters(), + body::get); + + requestBodySpec.contentType(MediaType.valueOf(request.getEntity().getContentType().getValue())); + requestBodySpec.body(Mono.fromSupplier(body), String.class); + } else { + ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters()); + } + + return requestBodySpec // + .exchange() // + .onErrorReturn(ConnectException.class, ClientResponse.create(HttpStatus.SERVICE_UNAVAILABLE).build()); + } + + private Lazy bodyExtractor(Request request) { + + return Lazy.of(() -> { + + try { + return EntityUtils.toString(request.getEntity()); + } catch (IOException e) { + throw new RequestBodyEncodingException("Error encoding request", e); + } + }); + } + + private Publisher readResponseBody(String logId, Request request, ClientResponse response, + Class responseType) { + + if (RawActionResponse.class.equals(responseType)) { + + ClientLogger.logRawResponse(logId, response.statusCode()); + return Mono.just(responseType.cast(RawActionResponse.create(response))); + } + + if (response.statusCode().is5xxServerError()) { + + ClientLogger.logRawResponse(logId, response.statusCode()); + return handleServerError(request, response); + } + + if (response.statusCode().is4xxClientError()) { + + ClientLogger.logRawResponse(logId, response.statusCode()); + return handleClientError(logId, request, response, responseType); + } + + return response.body(BodyExtractors.toMono(byte[].class)) // + .map(it -> new String(it, StandardCharsets.UTF_8)) // + .doOnNext(it -> ClientLogger.logResponse(logId, response.statusCode(), it)) // + .flatMap(content -> doDecode(response, responseType, content)); + } + + private static Mono doDecode(ClientResponse response, Class responseType, String content) { + + String mediaType = response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType()); + + try { + + Method fromXContent = ReflectionUtils.findMethod(responseType, "fromXContent", XContentParser.class); + + return Mono.justOrEmpty(responseType + .cast(ReflectionUtils.invokeMethod(fromXContent, responseType, createParser(mediaType, content)))); + + } catch (Throwable errorParseFailure) { // cause elasticsearch also uses AssertionError + + try { + return Mono.error(BytesRestResponse.errorFromXContent(createParser(mediaType, content))); + } catch (Exception e) { + + return Mono + .error(new ElasticsearchStatusException(content, RestStatus.fromCode(response.statusCode().value()))); + } + } + } + + private static XContentParser createParser(String mediaType, String content) throws IOException { + return XContentType.fromMediaTypeOrFormat(mediaType) // + .xContent() // + .createParser(new NamedXContentRegistry(NamedXContents.getDefaultNamedXContents()), + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, content); + } + + private Publisher handleServerError(Request request, ClientResponse response) { + + int statusCode = response.statusCode().value(); + RestStatus status = RestStatus.fromCode(statusCode); + String mediaType = response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType()); + + return response.body(BodyExtractors.toMono(byte[].class)) // + .map(bytes -> new String(bytes, StandardCharsets.UTF_8)) // + .flatMap(content -> contentOrError(content, mediaType, status)) + .flatMap(unused -> Mono + .error(new ElasticsearchStatusException(String.format("%s request to %s returned error code %s.", + request.getMethod(), request.getEndpoint(), statusCode), status))); + } + + private Publisher handleClientError(String logId, Request request, ClientResponse response, + Class responseType) { + + int statusCode = response.statusCode().value(); + RestStatus status = RestStatus.fromCode(statusCode); + String mediaType = response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType()); + + return response.body(BodyExtractors.toMono(byte[].class)) // + .map(bytes -> new String(bytes, StandardCharsets.UTF_8)) // + .flatMap(content -> contentOrError(content, mediaType, status)) // + .doOnNext(content -> ClientLogger.logResponse(logId, response.statusCode(), content)) // + .flatMap(content -> doDecode(response, responseType, content)); + } + + // region ElasticsearchException helper + + /** + * checks if the given content body contains an {@link ElasticsearchException}, if yes it is returned in a Mono.error. + * Otherwise the content is returned in the Mono + * + * @param content the content to analyze + * @param mediaType the returned media type + * @param status the response status + * @return a Mono with the content or an Mono.error + */ + private static Mono contentOrError(String content, String mediaType, RestStatus status) { + + ElasticsearchException exception = getElasticsearchException(content, mediaType, status); + + if (exception != null) { + StringBuilder sb = new StringBuilder(); + buildExceptionMessages(sb, exception); + return Mono.error(new ElasticsearchStatusException(sb.toString(), status, exception)); + } + + return Mono.just(content); + } + + /** + * tries to parse an {@link ElasticsearchException} from the given body content + * + * @param content the content to analyse + * @param mediaType the type of the body content + * @return an {@link ElasticsearchException} or {@literal null}. + */ + @Nullable + private static ElasticsearchException getElasticsearchException(String content, String mediaType, RestStatus status) { + + try { + XContentParser parser = createParser(mediaType, content); + // we have a JSON object with an error and a status field + XContentParser.Token token = parser.nextToken(); // Skip START_OBJECT + + do { + token = parser.nextToken(); + + if ("error".equals(parser.currentName())) { + return ElasticsearchException.failureFromXContent(parser); + } + } while (token == XContentParser.Token.FIELD_NAME); + + return null; + } catch (IOException e) { + return new ElasticsearchStatusException(content, status); + } + } + + private static void buildExceptionMessages(StringBuilder sb, Throwable t) { + + sb.append(t.getMessage()); + for (Throwable throwable : t.getSuppressed()) { + sb.append(", "); + buildExceptionMessages(sb, throwable); + } + } + + @Override + public Mono searchForPage(SearchRequest request) { + + return sendRequest(request, requestCreator.search(), SearchResponse.class, HttpHeaders.EMPTY) + .singleOrEmpty(); + } + + @SneakyThrows + protected Request convertMultiSearchRequest(MultiSearchRequest searchRequest) { + return RequestConverters.multiSearch(searchRequest); + } + + @Override + @SneakyThrows + public Mono multiSearch(MultiSearchRequest request) { + + return sendRequest(request, this::convertMultiSearchRequest, MultiSearchResponse.class, HttpHeaders.EMPTY) + .singleOrEmpty(); + } + + Request convertGetMappingRequest(GetMappingsRequest getMappingsRequest) { + String[] indices = getMappingsRequest.indices() == null ? Strings.EMPTY_ARRAY : getMappingsRequest.indices(); + + Request request = new Request(HttpGet.METHOD_NAME, "/" + String.join(",", indices) + "/_mapping"); + + Params parameters = new Params(request); + parameters.withMasterTimeout(getMappingsRequest.masterNodeTimeout()); + parameters.withIndicesOptions(getMappingsRequest.indicesOptions()); + parameters.withLocal(getMappingsRequest.local()); + + return request; + } + + + @Override + public Mono getMapping(GetMappingsRequest request) { + + return sendRequest(request, this::convertGetMappingRequest, GetMappingsResponse.class, HttpHeaders.EMPTY) + .singleOrEmpty(); + } + + Request convertGetIndexTemplateRequest(GetIndexTemplatesRequest getIndexTemplatesRequest) { + final Request request = new Request(HttpGet.METHOD_NAME, "/_template/" + String.join(",", getIndexTemplatesRequest.names())); + + return request; + } + + @Override + public Mono getTemplate(GetIndexTemplatesRequest request) { + return sendRequest(request, this::convertGetIndexTemplateRequest, GetIndexTemplatesResponse.class, HttpHeaders.EMPTY) + .singleOrEmpty(); + } + + @SneakyThrows + Request convertPutIndexTemplateRequest(PutIndexTemplateRequest putIndexTemplateRequest) { + Request request = new Request(HttpPut.METHOD_NAME, "/_template/" + putIndexTemplateRequest.name()); + Params params = new Params(request); + params.withMasterTimeout(putIndexTemplateRequest.masterNodeTimeout()); + if (putIndexTemplateRequest.create()) { + params.putParam("create", Boolean.TRUE.toString()); + } + if (Strings.hasText(putIndexTemplateRequest.cause())) { + params.putParam("cause", putIndexTemplateRequest.cause()); + } + params.putParam("include_type_name","true"); + BytesRef source = XContentHelper.toXContent(putIndexTemplateRequest, XContentType.JSON, false).toBytesRef(); + request.setEntity(new ByteArrayEntity(source.bytes, source.offset, source.length, ContentType.APPLICATION_JSON)); + return request; + } + + @Override + public Mono updateTemplate(PutIndexTemplateRequest request) { + return sendRequest(request, this::convertPutIndexTemplateRequest, AcknowledgedResponse.class, HttpHeaders.EMPTY) + .singleOrEmpty(); + } + + // endregion + + // region internal classes + + /** + * Reactive client {@link Status} implementation. + * + * @author Christoph Strobl + */ + class ClientStatus implements Status { + + private final Collection connectedHosts; + + ClientStatus(Collection connectedHosts) { + this.connectedHosts = connectedHosts; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Status#hosts() + */ + @Override + public Collection hosts() { + return connectedHosts; + } + } + + static class Params { + private final Request request; + + Params(Request request) { + this.request = request; + } + + Params putParam(String name, String value) { + if (Strings.hasLength(value)) { + request.addParameter(name, value); + } + return this; + } + + Params putParam(String key, TimeValue value) { + if (value != null) { + return putParam(key, value.getStringRep()); + } + return this; + } + + Params withDocAsUpsert(boolean docAsUpsert) { + if (docAsUpsert) { + return putParam("doc_as_upsert", Boolean.TRUE.toString()); + } + return this; + } + + Params withFetchSourceContext(FetchSourceContext fetchSourceContext) { + if (fetchSourceContext != null) { + if (!fetchSourceContext.fetchSource()) { + putParam("_source", Boolean.FALSE.toString()); + } + if (fetchSourceContext.includes() != null && fetchSourceContext.includes().length > 0) { + putParam("_source_includes", String.join(",", fetchSourceContext.includes())); + } + if (fetchSourceContext.excludes() != null && fetchSourceContext.excludes().length > 0) { + putParam("_source_excludes", String.join(",", fetchSourceContext.excludes())); + } + } + return this; + } + + Params withFields(String[] fields) { + if (fields != null && fields.length > 0) { + return putParam("fields", String.join(",", fields)); + } + return this; + } + + Params withMasterTimeout(TimeValue masterTimeout) { + return putParam("master_timeout", masterTimeout); + } + + Params withPipeline(String pipeline) { + return putParam("pipeline", pipeline); + } + + Params withPreference(String preference) { + return putParam("preference", preference); + } + + Params withRealtime(boolean realtime) { + if (!realtime) { + return putParam("realtime", Boolean.FALSE.toString()); + } + return this; + } + + Params withRefresh(boolean refresh) { + if (refresh) { + return withRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + } + return this; + } + + Params withRefreshPolicy(WriteRequest.RefreshPolicy refreshPolicy) { + if (refreshPolicy != WriteRequest.RefreshPolicy.NONE) { + return putParam("refresh", refreshPolicy.getValue()); + } + return this; + } + + Params withRequestsPerSecond(float requestsPerSecond) { + // the default in AbstractBulkByScrollRequest is Float.POSITIVE_INFINITY, + // but we don't want to add that to the URL parameters, instead we use -1 + if (Float.isFinite(requestsPerSecond)) { + return putParam("requests_per_second", Float.toString(requestsPerSecond)); + } else { + return putParam("requests_per_second", "-1"); + } + } + + Params withRetryOnConflict(int retryOnConflict) { + if (retryOnConflict > 0) { + return putParam("retry_on_conflict", String.valueOf(retryOnConflict)); + } + return this; + } + + Params withRouting(String routing) { + return putParam("routing", routing); + } + + Params withStoredFields(String[] storedFields) { + if (storedFields != null && storedFields.length > 0) { + return putParam("stored_fields", String.join(",", storedFields)); + } + return this; + } + + Params withTimeout(TimeValue timeout) { + return putParam("timeout", timeout); + } + + Params withVersion(long version) { + if (version != Versions.MATCH_ANY) { + return putParam("version", Long.toString(version)); + } + return this; + } + + Params withVersionType(VersionType versionType) { + if (versionType != VersionType.INTERNAL) { + return putParam("version_type", versionType.name().toLowerCase(Locale.ROOT)); + } + return this; + } + + Params withIfSeqNo(long seqNo) { + if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { + return putParam("if_seq_no", Long.toString(seqNo)); + } + return this; + } + + Params withIfPrimaryTerm(long primaryTerm) { + if (primaryTerm != SequenceNumbers.UNASSIGNED_PRIMARY_TERM) { + return putParam("if_primary_term", Long.toString(primaryTerm)); + } + return this; + } + + Params withWaitForActiveShards(ActiveShardCount activeShardCount) { + return withWaitForActiveShards(activeShardCount, ActiveShardCount.DEFAULT); + } + + Params withWaitForActiveShards(ActiveShardCount activeShardCount, ActiveShardCount defaultActiveShardCount) { + if (activeShardCount != null && activeShardCount != defaultActiveShardCount) { + // in Elasticsearch 7, "default" cannot be sent anymore, so it needs to be mapped to the default value of 1 + String value = activeShardCount == ActiveShardCount.DEFAULT ? "1" + : activeShardCount.toString().toLowerCase(Locale.ROOT); + return putParam("wait_for_active_shards", value); + } + return this; + } + + Params withIndicesOptions(IndicesOptions indicesOptions) { + withIgnoreUnavailable(indicesOptions.ignoreUnavailable()); + putParam("allow_no_indices", Boolean.toString(indicesOptions.allowNoIndices())); + String expandWildcards; + if (!indicesOptions.expandWildcardsOpen() && !indicesOptions.expandWildcardsClosed()) { + expandWildcards = "none"; + } else { + StringJoiner joiner = new StringJoiner(","); + if (indicesOptions.expandWildcardsOpen()) { + joiner.add("open"); + } + if (indicesOptions.expandWildcardsClosed()) { + joiner.add("closed"); + } + expandWildcards = joiner.toString(); + } + putParam("expand_wildcards", expandWildcards); + return this; + } + + Params withIgnoreUnavailable(boolean ignoreUnavailable) { + // Always explicitly place the ignore_unavailable value. + putParam("ignore_unavailable", Boolean.toString(ignoreUnavailable)); + return this; + } + + Params withHuman(boolean human) { + if (human) { + putParam("human", "true"); + } + return this; + } + + Params withLocal(boolean local) { + if (local) { + putParam("local", "true"); + } + return this; + } + + Params withIncludeDefaults(boolean includeDefaults) { + if (includeDefaults) { + return putParam("include_defaults", Boolean.TRUE.toString()); + } + return this; + } + + Params withPreserveExisting(boolean preserveExisting) { + if (preserveExisting) { + return putParam("preserve_existing", Boolean.TRUE.toString()); + } + return this; + } + + Params withDetailed(boolean detailed) { + if (detailed) { + return putParam("detailed", Boolean.TRUE.toString()); + } + return this; + } + + Params withWaitForCompletion(Boolean waitForCompletion) { + return putParam("wait_for_completion", waitForCompletion.toString()); + } + + Params withNodes(String[] nodes) { + if (nodes != null && nodes.length > 0) { + return putParam("nodes", String.join(",", nodes)); + } + return this; + } + + Params withActions(String[] actions) { + if (actions != null && actions.length > 0) { + return putParam("actions", String.join(",", actions)); + } + return this; + } + + Params withTaskId(TaskId taskId) { + if (taskId != null && taskId.isSet()) { + return putParam("task_id", taskId.toString()); + } + return this; + } + + Params withParentTaskId(TaskId parentTaskId) { + if (parentTaskId != null && parentTaskId.isSet()) { + return putParam("parent_task_id", parentTaskId.toString()); + } + return this; + } + + Params withVerify(boolean verify) { + if (verify) { + return putParam("verify", Boolean.TRUE.toString()); + } + return this; + } + + Params withWaitForStatus(ClusterHealthStatus status) { + if (status != null) { + return putParam("wait_for_status", status.name().toLowerCase(Locale.ROOT)); + } + return this; + } + + Params withWaitForNoRelocatingShards(boolean waitNoRelocatingShards) { + if (waitNoRelocatingShards) { + return putParam("wait_for_no_relocating_shards", Boolean.TRUE.toString()); + } + return this; + } + + + Params withWaitForNoInitializingShards(boolean waitNoInitShards) { + if (waitNoInitShards) { + return putParam("wait_for_no_initializing_shards", Boolean.TRUE.toString()); + } + return this; + } + + Params withWaitForNodes(String waitForNodes) { + return putParam("wait_for_nodes", waitForNodes); + } + + Params withLevel(ClusterHealthRequest.Level level) { + return putParam("level", level.name().toLowerCase(Locale.ROOT)); + } + + Params withWaitForEvents(Priority waitForEvents) { + if (waitForEvents != null) { + return putParam("wait_for_events", waitForEvents.name().toLowerCase(Locale.ROOT)); + } + return this; + } + + } + +} diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/RawActionResponse.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/RawActionResponse.java new file mode 100644 index 00000000..fc4de112 --- /dev/null +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/RawActionResponse.java @@ -0,0 +1,59 @@ +package org.jetlinks.community.elastic.search.service.reactive; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.springframework.http.HttpStatus; +import org.springframework.http.client.reactive.ClientHttpResponse; +import org.springframework.web.reactive.function.BodyExtractor; +import org.springframework.web.reactive.function.client.ClientResponse; + +import java.io.IOException; + +/** + * Extension to {@link ActionResponse} that also delegates to {@link ClientResponse}. + * + * @author Christoph Strobl + * @author Peter-Josef Meisch + * @author Mark Paluch + * @since 3.2 + */ +class RawActionResponse extends ActionResponse { + + private final ClientResponse delegate; + + private RawActionResponse(ClientResponse delegate) { + this.delegate = delegate; + } + + static RawActionResponse create(ClientResponse response) { + return new RawActionResponse(response); + } + + public HttpStatus statusCode() { + return delegate.statusCode(); + } + + /* + * (non-Javadoc) + * @see org.springframework.web.reactive.function.client.ClientResponse#headers() + */ + public ClientResponse.Headers headers() { + return delegate.headers(); + } + + /* + * (non-Javadoc) + * @see org.springframework.web.reactive.function.client.ClientResponse#body(org.springframework.web.reactive.function.BodyExtractor) + */ + public T body(BodyExtractor extractor) { + return delegate.body(extractor); + } + + /* + * (non-Javadoc) + * until Elasticsearch 7.4 this empty implementation was available in the abstract base class + */ + @Override + public void writeTo(StreamOutput out) throws IOException { + } +} \ No newline at end of file diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveAggregationService.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveAggregationService.java new file mode 100644 index 00000000..83ccdd50 --- /dev/null +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveAggregationService.java @@ -0,0 +1,199 @@ +package org.jetlinks.community.elastic.search.service.reactive; + +import lombok.extern.slf4j.Slf4j; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.search.aggregations.bucket.histogram.ExtendedBounds; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.hswebframework.ezorm.core.param.QueryParam; +import org.hswebframework.ezorm.core.param.TermType; +import org.jetlinks.community.elastic.search.aggreation.bucket.Bucket; +import org.jetlinks.community.elastic.search.aggreation.bucket.BucketAggregationsStructure; +import org.jetlinks.community.elastic.search.aggreation.bucket.BucketResponse; +import org.jetlinks.community.elastic.search.aggreation.bucket.Sort; +import org.jetlinks.community.elastic.search.aggreation.enums.BucketType; +import org.jetlinks.community.elastic.search.aggreation.enums.MetricsType; +import org.jetlinks.community.elastic.search.aggreation.enums.OrderType; +import org.jetlinks.community.elastic.search.aggreation.metrics.MetricsAggregationStructure; +import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager; +import org.jetlinks.community.elastic.search.service.AggregationService; +import org.jetlinks.community.elastic.search.service.DefaultElasticSearchService; +import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter; +import org.jetlinks.community.timeseries.query.AggregationQueryParam; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.*; +import java.util.stream.Collectors; + +/** + * @author bsetfeng + * @since 1.0 + **/ +@Service +@Slf4j +public class ReactiveAggregationService implements AggregationService { + + private final ReactiveElasticsearchClient restClient; + + private final ElasticSearchIndexManager indexManager; + + @Autowired + public ReactiveAggregationService(ElasticSearchIndexManager indexManager, + ReactiveElasticsearchClient restClient) { + this.restClient = restClient; + this.indexManager = indexManager; + } + + private Mono createSearchSourceBuilder(QueryParam queryParam, String index) { + + return indexManager + .getIndexMetadata(index) + .map(metadata -> ElasticSearchConverter.convertSearchSourceBuilder(queryParam, metadata)); + } + + @Override + public Flux> aggregation(String[] index, AggregationQueryParam aggregationQueryParam) { + QueryParam queryParam = prepareQueryParam(aggregationQueryParam); + BucketAggregationsStructure structure = createAggParameter(aggregationQueryParam); + return Flux.fromArray(index) + .flatMap(idx -> Mono.zip(indexManager.getIndexStrategy(idx), Mono.just(idx))) + .collectList() + .flatMap(strategy -> + createSearchSourceBuilder(queryParam, index[0]) + .map(builder -> + new SearchRequest(strategy + .stream() + .map(tp2 -> tp2.getT1().getIndexForSearch(tp2.getT2())) + .toArray(String[]::new)) + .indicesOptions(DefaultElasticSearchService.indexOptions) + .source(builder.size(0).aggregation(structure.getType().aggregationBuilder(structure)) + ) + ) + ) + .flatMap(restClient::searchForPage) + .filter(response -> response.getAggregations() != null) + .map(response -> BucketResponse.builder() + .name(structure.getName()) + .buckets(structure.getType().convert(response.getAggregations().get(structure.getName()))) + .build()) + .flatMapIterable(BucketsParser::convert) + .take(aggregationQueryParam.getLimit()) + ; + } + + static class BucketsParser { + + private final List> result = new ArrayList<>(); + + public static List> convert(BucketResponse response) { + return new BucketsParser(response).result; + } + + public BucketsParser(BucketResponse response) { + this(response.getBuckets()); + } + + public BucketsParser(List buckets) { + buckets.forEach(bucket -> parser(bucket, new HashMap<>())); + } + + public void parser(Bucket bucket, Map fMap) { + addBucketProperty(bucket, fMap); + if (bucket.getBuckets() != null && !bucket.getBuckets().isEmpty()) { + bucket.getBuckets().forEach(b -> { + Map map = new HashMap<>(fMap); + addBucketProperty(b, map); + parser(b, map); + }); + } else { + result.add(fMap); + } + } + + private void addBucketProperty(Bucket bucket, Map fMap) { + fMap.put(bucket.getName(), bucket.getKey()); + fMap.putAll(bucket.toMap()); + } + } + + protected static QueryParam prepareQueryParam(AggregationQueryParam param) { + QueryParam queryParam = param.getQueryParam().clone(); + queryParam.setPaging(false); + queryParam.and(param.getTimeProperty(), TermType.btw, Arrays.asList(calculateStartWithTime(param), param.getEndWithTime())); + if (queryParam.getSorts().isEmpty()) { + queryParam.orderBy(param.getTimeProperty()).desc(); + } + return queryParam; + } + + protected BucketAggregationsStructure createAggParameter(AggregationQueryParam param) { + List structures = new ArrayList<>(); + if (param.getGroupByTime() != null) { + structures.add(convertAggGroupTimeStructure(param)); + } + if (param.getGroupBy() != null && !param.getGroupBy().isEmpty()) { + structures.addAll(getTermTypeStructures(param)); + } + for (int i = 0, size = structures.size(); i < size; i++) { + if (i < size - 1) { + structures.get(i).setSubBucketAggregation(Collections.singletonList(structures.get(i + 1))); + } + if (i == size - 1) { + structures.get(i) + .setSubMetricsAggregation(param + .getAggColumns() + .stream() + .map(agg -> { + MetricsAggregationStructure metricsAggregationStructure = new MetricsAggregationStructure(); + metricsAggregationStructure.setField(agg.getProperty()); + metricsAggregationStructure.setName(agg.getAlias()); + metricsAggregationStructure.setType(MetricsType.of(agg.getAggregation().name())); + return metricsAggregationStructure; + }).collect(Collectors.toList())); + } + } + return structures.get(0); + } + + protected BucketAggregationsStructure convertAggGroupTimeStructure(AggregationQueryParam param) { + BucketAggregationsStructure structure = new BucketAggregationsStructure(); + structure.setInterval(param.getGroupByTime().getInterval().toString()); + structure.setType(BucketType.DATE_HISTOGRAM); + structure.setFormat(param.getGroupByTime().getFormat()); + structure.setName(param.getGroupByTime().getAlias()); + structure.setField(param.getGroupByTime().getProperty()); + structure.setSort(Sort.desc(OrderType.KEY)); + structure.setExtendedBounds(getExtendedBounds(param)); + return structure; + } + + protected static ExtendedBounds getExtendedBounds(AggregationQueryParam param) { + return new ExtendedBounds(calculateStartWithTime(param), param.getEndWithTime()); + } + + private static long calculateStartWithTime(AggregationQueryParam param) { + long startWithParam = param.getStartWithTime(); +// if (param.getGroupByTime() != null && param.getGroupByTime().getInterval() != null) { +// long timeInterval = param.getGroupByTime().getInterval().toMillis() * param.getLimit(); +// long tempStartWithParam = param.getEndWithTime() - timeInterval; +// startWithParam = Math.max(tempStartWithParam, startWithParam); +// } + return startWithParam; + } + + protected List getTermTypeStructures(AggregationQueryParam param) { + return param.getGroupBy() + .stream() + .map(group -> { + BucketAggregationsStructure structure = new BucketAggregationsStructure(); + structure.setType(BucketType.TERMS); + structure.setSize(param.getLimit()); + structure.setField(group.getProperty()); + structure.setName(group.getAlias()); + return structure; + }).collect(Collectors.toList()); + } + +} diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticSearchService.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticSearchService.java new file mode 100644 index 00000000..0d0777bc --- /dev/null +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticSearchService.java @@ -0,0 +1,413 @@ +package org.jetlinks.community.elastic.search.service.reactive; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.MultiSearchRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.core.CountRequest; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.hswebframework.ezorm.core.param.QueryParam; +import org.hswebframework.utils.time.DateFormatter; +import org.hswebframework.utils.time.DefaultDateFormatter; +import org.hswebframework.web.api.crud.entity.PagerResult; +import org.hswebframework.web.bean.FastBeanCopier; +import org.jetlinks.core.utils.FluxUtils; +import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager; +import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata; +import org.jetlinks.community.elastic.search.service.ElasticSearchService; +import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter; +import org.jetlinks.community.elastic.search.utils.QueryParamTranslator; +import org.reactivestreams.Publisher; +import org.springframework.context.annotation.DependsOn; +import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; +import reactor.core.publisher.BufferOverflowStrategy; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; + +import javax.annotation.PreDestroy; +import java.time.Duration; +import java.util.*; +import java.util.function.Function; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * @author zhouhao + * @since 1.0 + **/ +@Service("elasticSearchService") +@Slf4j +@DependsOn("reactiveElasticsearchClient") +public class ReactiveElasticSearchService implements ElasticSearchService { + + private final ReactiveElasticsearchClient restClient; + + private final ElasticSearchIndexManager indexManager; + + FluxSink sink; + + public static final IndicesOptions indexOptions = IndicesOptions.fromOptions( + true, true, false, false + ); + + static { + DateFormatter.supportFormatter.add(new DefaultDateFormatter(Pattern.compile("[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.+"), "yyyy-MM-dd'T'HH:mm:ss.SSSZ")); + } + + public ReactiveElasticSearchService(ReactiveElasticsearchClient restClient, + ElasticSearchIndexManager indexManager) { + this.restClient = restClient; + init(); + this.indexManager = indexManager; + } + + @Override + public Flux multiQuery(String[] index, Collection queryParam, Function, T> mapper) { + return indexManager + .getIndexesMetadata(index) + .flatMap(idx -> Mono.zip( + Mono.just(idx), getIndexForSearch(idx.getIndex()) + )) + .take(1) + .singleOrEmpty() + .flatMapMany(indexMetadata -> { + MultiSearchRequest request = new MultiSearchRequest(); + return Flux + .fromIterable(queryParam) + .flatMap(entry -> createSearchRequest(entry, index)) + .doOnNext(request::add) + .then(Mono.just(request)) + .flatMapMany(searchRequest -> restClient.multiSearch(searchRequest) + .flatMapMany(response -> Flux.fromArray(response.getResponses())) + .flatMap(item -> { + if (item.isFailure()) { + log.warn(item.getFailureMessage(), item.getFailure()); + return Mono.empty(); + } + return Flux.fromIterable(translate((map) -> mapper.apply(indexMetadata.getT1().convertFromElastic(map)), item.getResponse())); + })) + ; + }); + } + + public Flux query(String index, QueryParam queryParam, Function, T> mapper) { + return this + .doQuery(new String[]{index}, queryParam) + .flatMapMany(tp2 -> convertQueryResult(tp2.getT1(), tp2.getT2(), mapper)); + } + + public Flux query(String[] index, QueryParam queryParam, Function, T> mapper) { + return this + .doQuery(index, queryParam) + .flatMapMany(tp2 -> convertQueryResult(tp2.getT1(), tp2.getT2(), mapper)); + } + + @Override + public Mono> queryPager(String[] index, QueryParam queryParam, Function, T> mapper) { + return this.doQuery(index, queryParam) + .flatMap(tp2 -> + convertQueryResult(tp2.getT1(), tp2.getT2(), mapper) + .collectList() + .filter(CollectionUtils::isNotEmpty) + .map(list -> PagerResult.of((int) tp2.getT2().getHits().getTotalHits().value, list, queryParam)) + ) + .switchIfEmpty(Mono.fromSupplier(PagerResult::empty)); + } + + private Flux convertQueryResult(List indexList, + SearchResponse response, + Function, T> mapper) { + Map metadata = indexList + .stream() + .collect(Collectors.toMap(ElasticSearchIndexMetadata::getIndex, Function.identity())); + + return Flux + .fromIterable(response.getHits()) + .map(hit -> { + Map hitMap = hit.getSourceAsMap(); + if (StringUtils.isEmpty(hitMap.get("id"))) { + hitMap.put("id", hit.getId()); + } + return mapper + .apply(Optional + .ofNullable(metadata.get(hit.getIndex())).orElse(indexList.get(0)) + .convertFromElastic(hitMap)); + }); + + } + + private Mono, SearchResponse>> doQuery(String[] index, + QueryParam queryParam) { + return indexManager + .getIndexesMetadata(index) + .collectList() + .filter(CollectionUtils::isNotEmpty) + .flatMap(metadataList -> this + .createSearchRequest(queryParam, metadataList) + .flatMap(restClient::searchForPage) + .map(response -> Tuples.of(metadataList, response)) + ).onErrorResume(err -> { + log.error(err.getMessage(), err); + return Mono.empty(); + }); + } + + + @Override + public Mono count(String[] index, QueryParam queryParam) { + QueryParam param = queryParam.clone(); + param.setPaging(false); + return createSearchRequest(param, index) + .flatMap(this::doCount) + .defaultIfEmpty(0L) + .onErrorReturn(err -> { + log.error("query elastic error", err); + return true; + }, 0L); + } + + @Override + public Mono delete(String index, QueryParam queryParam) { + + return createQueryBuilder(queryParam, index) + .flatMap(request -> restClient.deleteBy(delete -> delete.setQuery(request).indices(index))) + .map(BulkByScrollResponse::getDeleted); + } + + @Override + public Mono commit(String index, T payload) { + sink.next(new Buffer(index, payload)); + return Mono.empty(); + } + + @Override + public Mono commit(String index, Collection payload) { + for (T t : payload) { + sink.next(new Buffer(index, t)); + } + return Mono.empty(); + } + + @Override + public Mono commit(String index, Publisher data) { + return Flux.from(data) + .flatMap(d -> commit(index, d)) + .then(); + } + + @Override + public Mono save(String index, T payload) { + return save(index, Mono.just(payload)); + } + + @Override + public Mono save(String index, Publisher data) { + return Flux.from(data) + .map(v -> new Buffer(index, v)) + .collectList() + .flatMap(this::doSave) + .then(); + } + + @Override + public Mono save(String index, Collection payload) { + return save(index, Flux.fromIterable(payload)); + } + + @PreDestroy + public void shutdown() { + sink.complete(); + } + + //@PostConstruct + public void init() { + //最小间隔 + int flushRate = Integer.getInteger("elasticsearch.buffer.rate", 1000); + //缓冲最大数量 + int bufferSize = Integer.getInteger("elasticsearch.buffer.size", 3000); + //缓冲超时时间 + Duration bufferTimeout = Duration.ofSeconds(Integer.getInteger("elasticsearch.buffer.timeout", 3)); + //缓冲背压 + int bufferBackpressure = Integer.getInteger("elasticsearch.buffer.backpressure", 64); + + FluxUtils.bufferRate( + Flux.create(sink -> this.sink = sink), + flushRate, + bufferSize, + bufferTimeout) + .onBackpressureBuffer(bufferBackpressure, + drop -> System.err.println("无法处理更多索引请求!"), + BufferOverflowStrategy.DROP_OLDEST) + .parallel() + .runOn(Schedulers.parallel()) + .flatMap(buffers -> { + long time = System.currentTimeMillis(); + return this + .doSave(buffers) + .doOnNext((len) -> log.trace("保存ElasticSearch数据成功,数量:{},耗时:{}ms", len, (System.currentTimeMillis() - time))) + .onErrorContinue((err, obj) -> { + //这里的错误都输出到控制台,输入到slf4j可能会造成日志递归. + System.err.println("保存ElasticSearch数据失败:\n" + org.hswebframework.utils.StringUtils.throwable2String(err)); + }); + }) + .subscribe(); + } + + @AllArgsConstructor + @Getter + static class Buffer { + String index; + Object payload; + } + + + private Mono getIndexForSave(String index) { + return indexManager + .getIndexStrategy(index) + .map(strategy -> strategy.getIndexForSave(index)); + + } + + private Mono getIndexForSearch(String index) { + return indexManager + .getIndexStrategy(index) + .map(strategy -> strategy.getIndexForSearch(index)); + + } + + protected Mono doSave(Collection buffers) { + return Flux.fromIterable(buffers) + .groupBy(Buffer::getIndex) + .flatMap(group -> { + String index = group.key(); + return this.getIndexForSave(index) + .zipWith(indexManager.getIndexMetadata(index)) + .flatMapMany(tp2 -> + group.map(buffer -> { + Map data = FastBeanCopier.copy(buffer.getPayload(), HashMap::new); + + IndexRequest request; + if (data.get("id") != null) { + request = new IndexRequest(tp2.getT1()).type("_doc").id(String.valueOf(data.get("id"))); + } else { + request = new IndexRequest(tp2.getT1()).type("_doc"); + } + request.source(tp2.getT2().convertToElastic(data)); + return request; + })); + }) + .collectList() + .filter(CollectionUtils::isNotEmpty) + .flatMap(lst -> { + BulkRequest request = new BulkRequest(); + lst.forEach(request::add); + return restClient.bulk(request); + }) + .thenReturn(buffers.size()); + } + + @SneakyThrows + protected void checkResponse(BulkResponse response) { + if (response.hasFailures()) { + for (BulkItemResponse item : response.getItems()) { + if (item.isFailed()) { + throw item.getFailure().getCause(); + } + } + } + } + + private List translate(Function, T> mapper, SearchResponse response) { + return Arrays.stream(response.getHits().getHits()) + .map(hit -> { + Map hitMap = hit.getSourceAsMap(); + if (StringUtils.isEmpty(hitMap.get("id"))) { + hitMap.put("id", hit.getId()); + } + return mapper.apply(hitMap); + }) + .collect(Collectors.toList()); + } + + private Flux doSearch(SearchRequest request) { + return restClient + .search(request) + .onErrorResume(err -> { + log.error("query elastic error", err); + return Mono.empty(); + }); + } + + private Mono doCount(SearchRequest request) { + return restClient + .count(request) + .onErrorResume(err -> { + log.error("query elastic error", err); + return Mono.empty(); + }); + } + + protected Mono createSearchRequest(QueryParam queryParam, String... indexes) { + return indexManager + .getIndexesMetadata(indexes) + .collectList() + .filter(CollectionUtils::isNotEmpty) + .flatMap(list -> createSearchRequest(queryParam, list)); + } + + protected Mono createSearchRequest(QueryParam queryParam, List indexes) { + + SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0)); + return Flux.fromIterable(indexes) + .flatMap(index -> getIndexForSearch(index.getIndex())) + .collectList() + .map(indexList -> + new SearchRequest(indexList.toArray(new String[0])) + .source(builder) + .indicesOptions(indexOptions)); + } + + protected Mono createQueryBuilder(QueryParam queryParam, String index) { + return indexManager + .getIndexMetadata(index) + .map(metadata -> QueryParamTranslator.createQueryBuilder(queryParam, metadata)) + .switchIfEmpty(Mono.fromSupplier(() -> QueryParamTranslator.createQueryBuilder(queryParam, null))); + } + + protected Mono createCountRequest(QueryParam queryParam, List indexes) { + QueryParam tempQueryParam = queryParam.clone(); + tempQueryParam.setPaging(false); + tempQueryParam.setSorts(Collections.emptyList()); + + SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0)); + return Flux.fromIterable(indexes) + .flatMap(index -> getIndexForSearch(index.getIndex())) + .collectList() + .map(indexList -> new CountRequest(indexList.toArray(new String[0])).source(builder)); + } + + private Mono createCountRequest(QueryParam queryParam, String... index) { + return indexManager + .getIndexesMetadata(index) + .collectList() + .filter(CollectionUtils::isNotEmpty) + .flatMap(list -> createCountRequest(queryParam, list)); + } +} diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticsearchClient.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticsearchClient.java new file mode 100644 index 00000000..81c5009f --- /dev/null +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticsearchClient.java @@ -0,0 +1,29 @@ +package org.jetlinks.community.elastic.search.service.reactive; + +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesRequest; +import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; +import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; +import org.elasticsearch.action.search.MultiSearchRequest; +import org.elasticsearch.action.search.MultiSearchResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import reactor.core.publisher.Mono; + +public interface ReactiveElasticsearchClient extends + org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient + , org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices { + + Mono searchForPage(SearchRequest request); + + Mono multiSearch(MultiSearchRequest request); + + Mono getMapping(GetMappingsRequest request); + + Mono getTemplate(GetIndexTemplatesRequest request); + + Mono updateTemplate(PutIndexTemplateRequest request); + +} diff --git a/jetlinks-standalone/src/main/resources/application-embedded.yml b/jetlinks-standalone/src/main/resources/application-embedded.yml index 3afcc6e4..34386f53 100644 --- a/jetlinks-standalone/src/main/resources/application-embedded.yml +++ b/jetlinks-standalone/src/main/resources/application-embedded.yml @@ -14,6 +14,14 @@ spring: password: pool: max-size: 32 + data: + elasticsearch: + client: + reactive: + endpoints: ${elasticsearch.client.host}:${elasticsearch.client.port} + max-in-memory-size: 100MB + socket-timeout: ${elasticsearch.client.socket-timeout} + connection-timeout: ${elasticsearch.client.socket-timeout} easyorm: default-schema: PUBLIC # 数据库默认的schema dialect: h2 #数据库方言 diff --git a/pom.xml b/pom.xml index e9823220..745f6c41 100644 --- a/pom.xml +++ b/pom.xml @@ -26,7 +26,7 @@ Arabba-SR6 3.8.5 4.1.50.Final - 6.8.11 + 7.9.0 1.0.0 1.0.6 1.2.70