diff --git a/docker/run-all/docker-compose.yml b/docker/run-all/docker-compose.yml
index 762c4a94..c7c65f12 100644
--- a/docker/run-all/docker-compose.yml
+++ b/docker/run-all/docker-compose.yml
@@ -59,7 +59,7 @@ services:
links:
- jetlinks:jetlinks
jetlinks:
- image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-standalone:1.4.0
+ image: registry.cn-shenzhen.aliyuncs.com/jetlinks/jetlinks-standalone:1.5.0-SNAPSHOT
container_name: jetlinks-ce
ports:
- 8848:8848 # API端口
diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/Version.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/Version.java
index 41cdb319..0e9db4b2 100644
--- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/Version.java
+++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/Version.java
@@ -8,6 +8,6 @@ public class Version {
private final String edition = "community";
- private final String version = "1.4.0";
+ private final String version = "1.5.0-SNAPSHOT";
}
diff --git a/jetlinks-components/elasticsearch-component/pom.xml b/jetlinks-components/elasticsearch-component/pom.xml
index 9a91b7d7..347c1a88 100644
--- a/jetlinks-components/elasticsearch-component/pom.xml
+++ b/jetlinks-components/elasticsearch-component/pom.xml
@@ -43,6 +43,12 @@
org.elasticsearch.client
elasticsearch-rest-high-level-client
+
+
+ org.springframework.data
+ spring-data-elasticsearch
+
+
org.hswebframework.web
hsweb-commons-crud
@@ -65,7 +71,12 @@
timeseries-component
${project.version}
-
+
+
+ io.projectreactor.netty
+ reactor-netty
+
+
\ No newline at end of file
diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/DefaultAggregationService.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/DefaultAggregationService.java
index 6782c0b1..08030245 100644
--- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/DefaultAggregationService.java
+++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/DefaultAggregationService.java
@@ -39,7 +39,7 @@ import java.util.stream.Collectors;
* @author bsetfeng
* @since 1.0
**/
-@Service
+//@Service
@Slf4j
public class DefaultAggregationService implements AggregationService {
diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/bucket/AggregationResponseHandle.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/bucket/AggregationResponseHandle.java
index 15cb901b..a284f906 100644
--- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/bucket/AggregationResponseHandle.java
+++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/bucket/AggregationResponseHandle.java
@@ -4,12 +4,7 @@ import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.range.Range;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
-import org.elasticsearch.search.aggregations.metrics.avg.Avg;
-import org.elasticsearch.search.aggregations.metrics.max.Max;
-import org.elasticsearch.search.aggregations.metrics.min.Min;
-import org.elasticsearch.search.aggregations.metrics.stats.Stats;
-import org.elasticsearch.search.aggregations.metrics.sum.Sum;
-import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCount;
+import org.elasticsearch.search.aggregations.metrics.*;
import org.jetlinks.community.elastic.search.aggreation.metrics.MetricsResponseSingleValue;
import java.util.List;
diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/enums/BucketType.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/enums/BucketType.java
index 4fcf3720..b8337454 100644
--- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/enums/BucketType.java
+++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/enums/BucketType.java
@@ -17,9 +17,9 @@ import org.jetlinks.community.elastic.search.aggreation.bucket.Bucket;
import org.jetlinks.community.elastic.search.aggreation.bucket.BucketAggregationsStructure;
import org.jetlinks.community.elastic.search.aggreation.bucket.Sort;
import org.jetlinks.community.elastic.search.aggreation.metrics.MetricsAggregationStructure;
-import org.joda.time.DateTimeZone;
import org.springframework.util.StringUtils;
+import java.time.ZoneId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -104,7 +104,7 @@ public enum BucketType {
if (structure.getMissingValue() != null) {
builder.missing(structure.getMissingValue());
}
- builder.timeZone(DateTimeZone.getDefault());
+ builder.timeZone(ZoneId.systemDefault());
commonAggregationSetting(builder, structure);
return builder;
}
@@ -136,7 +136,7 @@ public enum BucketType {
if (sort != null) {
builder.order(mapping.get(OrderBuilder.of(sort.getOrder(), sort.getType())));
}
- builder.timeZone(DateTimeZone.getDefault());
+ builder.timeZone(ZoneId.systemDefault());
commonAggregationSetting(builder, structure);
return builder;
}
@@ -148,7 +148,7 @@ public enum BucketType {
}
};
- private String text;
+ private final String text;
public abstract AggregationBuilder aggregationBuilder(BucketAggregationsStructure structure);
diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/enums/MetricsType.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/enums/MetricsType.java
index 873f9278..090e12a3 100644
--- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/enums/MetricsType.java
+++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/enums/MetricsType.java
@@ -5,12 +5,7 @@ import lombok.Getter;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
-import org.elasticsearch.search.aggregations.metrics.avg.Avg;
-import org.elasticsearch.search.aggregations.metrics.max.Max;
-import org.elasticsearch.search.aggregations.metrics.min.Min;
-import org.elasticsearch.search.aggregations.metrics.stats.Stats;
-import org.elasticsearch.search.aggregations.metrics.sum.Sum;
-import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCount;
+import org.elasticsearch.search.aggregations.metrics.*;
import org.jetlinks.community.elastic.search.aggreation.metrics.MetricsResponse;
import org.jetlinks.community.elastic.search.aggreation.metrics.MetricsResponseSingleValue;
diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/configuration/ElasticSearchConfiguration.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/configuration/ElasticSearchConfiguration.java
index 2a0079e4..70af8acc 100644
--- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/configuration/ElasticSearchConfiguration.java
+++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/configuration/ElasticSearchConfiguration.java
@@ -1,5 +1,12 @@
package org.jetlinks.community.elastic.search.configuration;
+import io.netty.channel.ChannelOption;
+import io.netty.handler.ssl.ApplicationProtocolConfig;
+import io.netty.handler.ssl.ClientAuth;
+import io.netty.handler.ssl.IdentityCipherSuiteFilter;
+import io.netty.handler.ssl.JdkSslContext;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import io.netty.handler.timeout.WriteTimeoutHandler;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.client.RestClient;
@@ -8,10 +15,25 @@ import org.jetlinks.community.elastic.search.ElasticRestClient;
import org.jetlinks.community.elastic.search.embedded.EmbeddedElasticSearch;
import org.jetlinks.community.elastic.search.embedded.EmbeddedElasticSearchProperties;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties;
+import org.jetlinks.community.elastic.search.service.reactive.DefaultReactiveElasticsearchClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.data.elasticsearch.client.ClientConfiguration;
+import org.springframework.data.elasticsearch.client.reactive.HostProvider;
+import org.springframework.data.elasticsearch.client.reactive.RequestCreator;
+import org.springframework.data.elasticsearch.client.reactive.WebClientProvider;
+import org.springframework.http.client.reactive.ReactorClientHttpConnector;
+import reactor.netty.http.client.HttpClient;
+import reactor.netty.tcp.ProxyProvider;
+import reactor.netty.tcp.TcpClient;
+
+import javax.net.ssl.SSLContext;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
/**
* @author bsetfeng
@@ -34,18 +56,94 @@ public class ElasticSearchConfiguration {
this.properties = properties;
this.embeddedProperties = embeddedProperties;
}
-
@Bean
@SneakyThrows
- public ElasticRestClient elasticRestClient() {
+ public DefaultReactiveElasticsearchClient reactiveElasticsearchClient(ClientConfiguration clientConfiguration) {
if (embeddedProperties.isEnabled()) {
log.debug("starting embedded elasticsearch on {}:{}",
embeddedProperties.getHost(),
embeddedProperties.getPort());
- new EmbeddedElasticSearch(embeddedProperties)
- .start();
+ new EmbeddedElasticSearch(embeddedProperties).start();
}
+
+ WebClientProvider provider = getWebClientProvider(clientConfiguration);
+
+ HostProvider hostProvider = HostProvider.provider(provider, clientConfiguration.getHeadersSupplier(),
+ clientConfiguration.getEndpoints().toArray(new InetSocketAddress[0]));
+
+ DefaultReactiveElasticsearchClient client =
+ new DefaultReactiveElasticsearchClient(hostProvider, new RequestCreator() {
+ });
+
+ client.setHeadersSupplier(clientConfiguration.getHeadersSupplier());
+
+ return client;
+ }
+
+ private static WebClientProvider getWebClientProvider(ClientConfiguration clientConfiguration) {
+
+ Duration connectTimeout = clientConfiguration.getConnectTimeout();
+ Duration soTimeout = clientConfiguration.getSocketTimeout();
+
+ TcpClient tcpClient = TcpClient.create();
+
+ if (!connectTimeout.isNegative()) {
+ tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.toMillis()));
+ }
+
+ if (!soTimeout.isNegative()) {
+ tcpClient = tcpClient.doOnConnected(connection -> connection //
+ .addHandlerLast(new ReadTimeoutHandler(soTimeout.toMillis(), TimeUnit.MILLISECONDS))
+ .addHandlerLast(new WriteTimeoutHandler(soTimeout.toMillis(), TimeUnit.MILLISECONDS)));
+ }
+
+ if (clientConfiguration.getProxy().isPresent()) {
+ String proxy = clientConfiguration.getProxy().get();
+ String[] hostPort = proxy.split(":");
+
+ if (hostPort.length != 2) {
+ throw new IllegalArgumentException("invalid proxy configuration " + proxy + ", should be \"host:port\"");
+ }
+ tcpClient = tcpClient.proxy(proxyOptions -> proxyOptions.type(ProxyProvider.Proxy.HTTP).host(hostPort[0])
+ .port(Integer.parseInt(hostPort[1])));
+ }
+
+ String scheme = "http";
+ HttpClient httpClient = HttpClient.from(tcpClient);
+
+ if (clientConfiguration.useSsl()) {
+
+ Optional sslContext = clientConfiguration.getSslContext();
+
+ if (sslContext.isPresent()) {
+ httpClient = httpClient.secure(sslContextSpec -> {
+ sslContextSpec.sslContext(new JdkSslContext(sslContext.get(), true, null, IdentityCipherSuiteFilter.INSTANCE,
+ ApplicationProtocolConfig.DISABLED, ClientAuth.NONE, null, false));
+ });
+ } else {
+ httpClient = httpClient.secure();
+ }
+
+ scheme = "https";
+ }
+
+ ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
+ WebClientProvider provider = WebClientProvider.create(scheme, connector);
+
+ if (clientConfiguration.getPathPrefix() != null) {
+ provider = provider.withPathPrefix(clientConfiguration.getPathPrefix());
+ }
+
+ provider = provider.withDefaultHeaders(clientConfiguration.getDefaultHeaders()) //
+ .withWebClientConfigurer(clientConfiguration.getWebClientConfigurer());
+ return provider;
+ }
+
+ @Bean
+ @SneakyThrows
+ public ElasticRestClient elasticRestClient() {
+
RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(properties.createHosts())
.setRequestConfigCallback(properties::applyRequestConfigBuilder)
.setHttpClientConfigCallback(properties::applyHttpAsyncClientBuilder));
diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/embedded/EmbeddedElasticSearch.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/embedded/EmbeddedElasticSearch.java
index 70d102a1..fbd01801 100644
--- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/embedded/EmbeddedElasticSearch.java
+++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/embedded/EmbeddedElasticSearch.java
@@ -22,17 +22,12 @@ public class EmbeddedElasticSearch extends Node {
Settings.builder()
.put("node.name", "test")
.put("discovery.type", "single-node")
- .put("transport.type", "netty4")
- .put("http.type", "netty4")
+ .put("transport.type", Netty4Plugin.NETTY_TRANSPORT_NAME)
+ .put("http.type", Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME)
.put("network.host", "0.0.0.0")
.put("http.port", 9200)
- ).build(), null),
+ ).build(), Collections.emptyMap(), null, () -> "default"),
Collections.singleton(Netty4Plugin.class), false);
}
- @Override
- protected void registerDerivedNodeNameWithLogger(String nodeName) {
-
- }
-
}
diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/AbstractElasticSearchIndexStrategy.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/AbstractElasticSearchIndexStrategy.java
index 9a962224..f98e69e2 100644
--- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/AbstractElasticSearchIndexStrategy.java
+++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/AbstractElasticSearchIndexStrategy.java
@@ -3,22 +3,23 @@ package org.jetlinks.community.elastic.search.index.strategies;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.indices.*;
-import org.elasticsearch.cluster.metadata.MappingMetaData;
-import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
+import org.elasticsearch.cluster.metadata.MappingMetadata;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.compress.CompressedXContent;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.SimplePropertyMetadata;
import org.jetlinks.core.metadata.types.*;
-import org.jetlinks.community.elastic.search.ElasticRestClient;
import org.jetlinks.community.elastic.search.enums.ElasticDateFormat;
import org.jetlinks.community.elastic.search.enums.ElasticPropertyType;
import org.jetlinks.community.elastic.search.index.DefaultElasticSearchIndexMetadata;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
+import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexStrategy;
-import org.jetlinks.community.elastic.search.utils.ReactorActionListener;
+import org.jetlinks.community.elastic.search.service.reactive.ReactiveElasticsearchClient;
import reactor.core.publisher.Mono;
import java.util.*;
@@ -30,7 +31,7 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc
@Getter
private final String id;
- protected ElasticRestClient client;
+ protected ReactiveElasticsearchClient client;
protected ElasticSearchIndexProperties properties;
@@ -39,19 +40,11 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc
}
protected Mono indexExists(String index) {
- return ReactorActionListener.mono(
- actionListener ->
- client.getQueryClient()
- .indices()
- .existsAsync(new GetIndexRequest(wrapIndex(index)), RequestOptions.DEFAULT, actionListener));
+ return client.existsIndex(req -> req.indices(wrapIndex(index)));
}
protected Mono doCreateIndex(ElasticSearchIndexMetadata metadata) {
- return ReactorActionListener.mono(
- actionListener -> client.getQueryClient()
- .indices()
- .createAsync(createIndexRequest(metadata), RequestOptions.DEFAULT, actionListener))
- .then();
+ return client.createIndex(createIndexRequest(metadata));
}
protected Mono doPutIndex(ElasticSearchIndexMetadata metadata,
@@ -62,11 +55,7 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc
if (exists) {
return doLoadIndexMetadata(index)
.flatMap(oldMapping -> Mono.justOrEmpty(createPutMappingRequest(metadata, oldMapping)))
- .flatMap(request -> ReactorActionListener.mono(
- actionListener ->
- client.getWriteClient()
- .indices()
- .putMappingAsync(request, RequestOptions.DEFAULT, actionListener)))
+ .flatMap(request -> client.updateMapping(request))
.then();
}
if (justUpdateMapping) {
@@ -78,21 +67,18 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc
protected Mono doLoadIndexMetadata(String _index) {
String index = wrapIndex(_index);
- return ReactorActionListener
- .mono(listener -> client.getQueryClient()
- .indices()
- .getMappingAsync(new GetMappingsRequest().indices(index), RequestOptions.DEFAULT, listener))
+ return client.getMapping(new GetMappingsRequest().indices(index))
.flatMap(resp -> Mono.justOrEmpty(convertMetadata(index, resp.mappings().get(index))));
}
- public CreateIndexRequest createIndexRequest(ElasticSearchIndexMetadata metadata) {
+ protected CreateIndexRequest createIndexRequest(ElasticSearchIndexMetadata metadata) {
CreateIndexRequest request = new CreateIndexRequest(wrapIndex(metadata.getIndex()));
request.settings(properties.toSettings());
Map mappingConfig = new HashMap<>();
mappingConfig.put("properties", createElasticProperties(metadata.getProperties()));
mappingConfig.put("dynamic_templates", createDynamicTemplates());
- request.mapping(mappingConfig);
+ mappingConfig.forEach(request::mapping);
return request;
}
@@ -107,8 +93,10 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc
log.debug("ignore update index [{}] mapping", wrapIndex(metadata.getIndex()));
return null;
}
+ Map mappingConfig = new HashMap<>();
PutMappingRequest request = new PutMappingRequest(wrapIndex(metadata.getIndex()));
- request.source(Collections.singletonMap("properties", properties));
+ mappingConfig.put("properties", createElasticProperties(metadata.getProperties()));
+ request.source(mappingConfig);
return request;
}
@@ -142,6 +130,8 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc
property.put("type", "boolean");
} else if (type instanceof GeoType) {
property.put("type", "geo_point");
+ } else if (type instanceof GeoShapeType) {
+ property.put("type", "geo_shape");
} else if (type instanceof ArrayType) {
ArrayType arrayType = ((ArrayType) type);
return createElasticProperty(arrayType.getElementType());
@@ -151,14 +141,26 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc
property.put("properties", createElasticProperties(objectType.getProperties()));
} else {
property.put("type", "keyword");
- property.put("ignore_above",512);
+ property.put("ignore_above", 512);
}
return property;
}
- protected ElasticSearchIndexMetadata convertMetadata(String index, MappingMetaData metaData) {
- Map response = metaData.getSourceAsMap();
- Object properties = response.get("properties");
+ protected ElasticSearchIndexMetadata convertMetadata(String index, ImmutableOpenMap metaData) {
+ MappingMetadata mappingMetadata = null;
+ if (metaData.size() == 1) {
+ Object res = metaData.values().iterator().next().value;
+ if (res instanceof MappingMetadata) {
+ mappingMetadata = ((MappingMetadata) res);
+ } else if (res instanceof CompressedXContent) {
+ mappingMetadata = new MappingMetadata(((CompressedXContent) res));
+ }
+ }
+ if (mappingMetadata == null) {
+ throw new UnsupportedOperationException("unsupported index metadata" + metaData);
+ }
+ Object properties = mappingMetadata.getSourceAsMap().get("properties");
+
return new DefaultElasticSearchIndexMetadata(index, convertProperties(properties));
}
@@ -216,5 +218,4 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc
return maps;
}
-
}
diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/DirectElasticSearchIndexStrategy.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/DirectElasticSearchIndexStrategy.java
index 96ece4c1..decfd9df 100644
--- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/DirectElasticSearchIndexStrategy.java
+++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/DirectElasticSearchIndexStrategy.java
@@ -1,16 +1,16 @@
package org.jetlinks.community.elastic.search.index.strategies;
-import org.jetlinks.community.elastic.search.ElasticRestClient;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties;
+import org.jetlinks.community.elastic.search.service.reactive.ReactiveElasticsearchClient;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
@Component
public class DirectElasticSearchIndexStrategy extends AbstractElasticSearchIndexStrategy {
- public DirectElasticSearchIndexStrategy(ElasticRestClient client, ElasticSearchIndexProperties properties) {
- super("direct", client,properties);
+ public DirectElasticSearchIndexStrategy(ReactiveElasticsearchClient client, ElasticSearchIndexProperties properties) {
+ super("direct", client, properties);
}
@Override
diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TemplateElasticSearchIndexStrategy.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TemplateElasticSearchIndexStrategy.java
index 178175e7..b6942d1f 100644
--- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TemplateElasticSearchIndexStrategy.java
+++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TemplateElasticSearchIndexStrategy.java
@@ -1,15 +1,11 @@
package org.jetlinks.community.elastic.search.index.strategies;
import org.elasticsearch.action.admin.indices.alias.Alias;
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.indices.GetIndexTemplatesRequest;
-import org.elasticsearch.client.indices.GetIndexTemplatesResponse;
-import org.elasticsearch.client.indices.PutIndexTemplateRequest;
-import org.jetlinks.community.elastic.search.ElasticRestClient;
+import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesRequest;
+import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties;
-import org.jetlinks.community.elastic.search.utils.ReactorActionListener;
+import org.jetlinks.community.elastic.search.service.reactive.ReactiveElasticsearchClient;
import reactor.core.publisher.Mono;
import java.util.Collections;
@@ -19,8 +15,8 @@ import java.util.Map;
public abstract class TemplateElasticSearchIndexStrategy extends AbstractElasticSearchIndexStrategy {
- public TemplateElasticSearchIndexStrategy(String id, ElasticRestClient client, ElasticSearchIndexProperties properties) {
- super(id, client,properties);
+ public TemplateElasticSearchIndexStrategy(String id, ReactiveElasticsearchClient client, ElasticSearchIndexProperties properties) {
+ super(id, client, properties);
}
protected String getTemplate(String index) {
@@ -45,15 +41,12 @@ public abstract class TemplateElasticSearchIndexStrategy extends AbstractElastic
@Override
public Mono putIndex(ElasticSearchIndexMetadata metadata) {
- return ReactorActionListener
- .mono(listener -> client.getWriteClient()
- .indices()//修改索引模版
- .putTemplateAsync(createIndexTemplateRequest(metadata), RequestOptions.DEFAULT, listener))
+ return client
+ .updateTemplate(createIndexTemplateRequest(metadata))
//修改当前索引
.then(doPutIndex(metadata.newIndexName(getIndexForSave(metadata.getIndex())), true));
}
-
protected PutIndexTemplateRequest createIndexTemplateRequest(ElasticSearchIndexMetadata metadata) {
String index = wrapIndex(metadata.getIndex());
PutIndexTemplateRequest request = new PutIndexTemplateRequest(getTemplate(index));
@@ -62,7 +55,7 @@ public abstract class TemplateElasticSearchIndexStrategy extends AbstractElastic
Map mappingConfig = new HashMap<>();
mappingConfig.put("properties", createElasticProperties(metadata.getProperties()));
mappingConfig.put("dynamic_templates", createDynamicTemplates());
- request.mapping(mappingConfig);
+ request.mapping("_doc",mappingConfig);
request.patterns(getIndexPatterns(index));
return request;
}
@@ -70,11 +63,7 @@ public abstract class TemplateElasticSearchIndexStrategy extends AbstractElastic
@Override
public Mono loadIndexMetadata(String index) {
-
- return ReactorActionListener
- .mono(listener -> client.getQueryClient()
- .indices()
- .getIndexTemplateAsync(new GetIndexTemplatesRequest(getTemplate(index)), RequestOptions.DEFAULT, listener))
+ return client.getTemplate(new GetIndexTemplatesRequest(getTemplate(index)))
.filter(resp -> resp.getIndexTemplates().size() > 0)
.flatMap(resp -> Mono.justOrEmpty(convertMetadata(index, resp.getIndexTemplates().get(0).mappings())));
}
diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TimeByMonthElasticSearchIndexStrategy.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TimeByMonthElasticSearchIndexStrategy.java
index 31ec0789..d0c7f276 100644
--- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TimeByMonthElasticSearchIndexStrategy.java
+++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TimeByMonthElasticSearchIndexStrategy.java
@@ -1,8 +1,8 @@
package org.jetlinks.community.elastic.search.index.strategies;
import org.hswebframework.utils.time.DateFormatter;
-import org.jetlinks.community.elastic.search.ElasticRestClient;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexProperties;
+import org.jetlinks.community.elastic.search.service.reactive.ReactiveElasticsearchClient;
import org.springframework.stereotype.Component;
import java.util.Date;
@@ -18,7 +18,7 @@ public class TimeByMonthElasticSearchIndexStrategy extends TemplateElasticSearch
private final String format = "yyyy-MM";
- public TimeByMonthElasticSearchIndexStrategy(ElasticRestClient client, ElasticSearchIndexProperties properties) {
+ public TimeByMonthElasticSearchIndexStrategy(ReactiveElasticsearchClient client, ElasticSearchIndexProperties properties) {
super("time-by-month", client,properties);
}
diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java
index 470f9c5a..5c988f4a 100644
--- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java
+++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java
@@ -59,9 +59,10 @@ import java.util.stream.Collectors;
* @author zhouhao
* @since 1.0
**/
-@Service
+//@Service
@Slf4j
@DependsOn("restHighLevelClient")
+@Deprecated
public class DefaultElasticSearchService implements ElasticSearchService {
private final ElasticRestClient restClient;
@@ -138,7 +139,7 @@ public class DefaultElasticSearchService implements ElasticSearchService {
convertQueryResult(tp2.getT1(), tp2.getT2(), mapper)
.collectList()
.filter(CollectionUtils::isNotEmpty)
- .map(list -> PagerResult.of((int) tp2.getT2().getHits().getTotalHits(), list, queryParam))
+ .map(list -> PagerResult.of((int) tp2.getT2().getHits().getTotalHits().value, list, queryParam))
)
.switchIfEmpty(Mono.fromSupplier(PagerResult::empty));
}
diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/DefaultReactiveElasticsearchClient.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/DefaultReactiveElasticsearchClient.java
new file mode 100644
index 00000000..bb05429a
--- /dev/null
+++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/DefaultReactiveElasticsearchClient.java
@@ -0,0 +1,1295 @@
+package org.jetlinks.community.elastic.search.service.reactive;
+
+import lombok.SneakyThrows;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.util.EntityUtils;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
+import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.flush.FlushRequest;
+import org.elasticsearch.action.admin.indices.flush.FlushResponse;
+import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
+import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
+import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesRequest;
+import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
+import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.get.MultiGetRequest;
+import org.elasticsearch.action.get.MultiGetResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.main.MainRequest;
+import org.elasticsearch.action.main.MainResponse;
+import org.elasticsearch.action.search.*;
+import org.elasticsearch.action.support.ActiveShardCount;
+import org.elasticsearch.action.support.IndicesOptions;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.action.update.UpdateResponse;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.lucene.uid.Versions;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.*;
+import org.elasticsearch.index.VersionType;
+import org.elasticsearch.index.get.GetResult;
+import org.elasticsearch.index.reindex.BulkByScrollResponse;
+import org.elasticsearch.index.reindex.DeleteByQueryRequest;
+import org.elasticsearch.index.seqno.SequenceNumbers;
+import org.elasticsearch.rest.BytesRestResponse;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.aggregations.Aggregation;
+import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
+import org.elasticsearch.tasks.TaskId;
+import org.reactivestreams.Publisher;
+import org.springframework.data.elasticsearch.client.ClientLogger;
+import org.springframework.data.elasticsearch.client.ElasticsearchHost;
+import org.springframework.data.elasticsearch.client.NoReachableHostException;
+import org.springframework.data.elasticsearch.client.reactive.HostProvider;
+import org.springframework.data.elasticsearch.client.reactive.RequestBodyEncodingException;
+import org.springframework.data.elasticsearch.client.reactive.RequestCreator;
+import org.springframework.data.elasticsearch.client.util.NamedXContents;
+import org.springframework.data.elasticsearch.client.util.RequestConverters;
+import org.springframework.data.elasticsearch.client.util.ScrollState;
+import org.springframework.data.util.Lazy;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.lang.Nullable;
+import org.springframework.util.Assert;
+import org.springframework.util.ObjectUtils;
+import org.springframework.util.ReflectionUtils;
+import org.springframework.web.reactive.function.BodyExtractors;
+import org.springframework.web.reactive.function.client.ClientRequest;
+import org.springframework.web.reactive.function.client.ClientResponse;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.EmitterProcessor;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.ConnectException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Locale;
+import java.util.Map;
+import java.util.StringJoiner;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static org.springframework.data.elasticsearch.client.util.RequestConverters.createContentType;
+
+public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient {
+ private final HostProvider hostProvider;
+ private final RequestCreator requestCreator;
+ private Supplier headersSupplier = () -> HttpHeaders.EMPTY;
+
+ /**
+ * Create a new {@link org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient} using the given {@link HostProvider} to obtain server
+ * connections and the given {@link RequestCreator}.
+ *
+ * @param hostProvider must not be {@literal null}.
+ * @param requestCreator must not be {@literal null}.
+ */
+ public DefaultReactiveElasticsearchClient(HostProvider hostProvider, RequestCreator requestCreator) {
+
+ Assert.notNull(hostProvider, "HostProvider must not be null");
+ Assert.notNull(requestCreator, "RequestCreator must not be null");
+
+ this.hostProvider = hostProvider;
+ this.requestCreator = requestCreator;
+ }
+
+ public void setHeadersSupplier(Supplier headersSupplier) {
+
+ Assert.notNull(headersSupplier, "headersSupplier must not be null");
+
+ this.headersSupplier = headersSupplier;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders)
+ */
+ @Override
+ public Mono ping(HttpHeaders headers) {
+
+ return sendRequest(new MainRequest(), requestCreator.ping(), RawActionResponse.class, headers) //
+ .map(response -> response.statusCode().is2xxSuccessful()) //
+ .onErrorResume(NoReachableHostException.class, error -> Mono.just(false)).next();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#info(org.springframework.http.HttpHeaders)
+ */
+ @Override
+ public Mono info(HttpHeaders headers) {
+
+ return sendRequest(new MainRequest(), requestCreator.info(), MainResponse.class, headers) //
+ .next();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#get(org.springframework.http.HttpHeaders, org.elasticsearch.action.get.GetRequest)
+ */
+ @Override
+ public Mono get(HttpHeaders headers, GetRequest getRequest) {
+
+ return sendRequest(getRequest, requestCreator.get(), GetResponse.class, headers) //
+ .filter(GetResponse::isExists) //
+ .map(DefaultReactiveElasticsearchClient::getResponseToGetResult) //
+ .next();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#multiGet(org.springframework.http.HttpHeaders, org.elasticsearch.action.get.MultiGetRequest)
+ */
+ @Override
+ public Flux multiGet(HttpHeaders headers, MultiGetRequest multiGetRequest) {
+
+ return sendRequest(multiGetRequest, requestCreator.multiGet(), MultiGetResponse.class, headers)
+ .map(MultiGetResponse::getResponses) //
+ .flatMap(Flux::fromArray) //
+ .filter(it -> !it.isFailed() && it.getResponse().isExists()) //
+ .map(it -> DefaultReactiveElasticsearchClient.getResponseToGetResult(it.getResponse()));
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#exists(org.springframework.http.HttpHeaders, org.elasticsearch.action.get.GetRequest)
+ */
+ @Override
+ public Mono exists(HttpHeaders headers, GetRequest getRequest) {
+
+ return sendRequest(getRequest, requestCreator.exists(), RawActionResponse.class, headers) //
+ .map(response -> response.statusCode().is2xxSuccessful()) //
+ .next();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.index.IndexRequest)
+ */
+ @Override
+ public Mono index(HttpHeaders headers, IndexRequest indexRequest) {
+ return sendRequest(indexRequest, requestCreator.index(), IndexResponse.class, headers).publishNext();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#indices()
+ */
+ @Override
+ public Indices indices() {
+ return this;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.update.UpdateRequest)
+ */
+ @Override
+ public Mono update(HttpHeaders headers, UpdateRequest updateRequest) {
+ return sendRequest(updateRequest, requestCreator.update(), UpdateResponse.class, headers).publishNext();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.delete.DeleteRequest)
+ */
+ @Override
+ public Mono delete(HttpHeaders headers, DeleteRequest deleteRequest) {
+
+ return sendRequest(deleteRequest, requestCreator.delete(), DeleteResponse.class, headers) //
+ .publishNext();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#count(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
+ */
+ @Override
+ public Mono count(HttpHeaders headers, SearchRequest searchRequest) {
+ searchRequest.source().trackTotalHits(true);
+ searchRequest.source().size(0);
+ searchRequest.source().fetchSource(false);
+ return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers) //
+ .map(SearchResponse::getHits) //
+ .map(searchHits -> searchHits.getTotalHits().value) //
+ .next();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
+ */
+ @Override
+ public Flux search(HttpHeaders headers, SearchRequest searchRequest) {
+
+ return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers) //
+ .map(SearchResponse::getHits) //
+ .flatMap(Flux::fromIterable);
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#aggregate(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
+ */
+ @Override
+ public Flux aggregate(HttpHeaders headers, SearchRequest searchRequest) {
+
+ Assert.notNull(headers, "headers must not be null");
+ Assert.notNull(searchRequest, "searchRequest must not be null");
+
+ searchRequest.source().size(0);
+ searchRequest.source().trackTotalHits(false);
+
+ return sendRequest(searchRequest, requestCreator.search(), SearchResponse.class, headers) //
+ .map(SearchResponse::getAggregations) //
+ .flatMap(Flux::fromIterable);
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#scroll(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
+ */
+ @Override
+ public Flux scroll(HttpHeaders headers, SearchRequest searchRequest) {
+
+ TimeValue scrollTimeout = searchRequest.scroll() != null ? searchRequest.scroll().keepAlive()
+ : TimeValue.timeValueMinutes(1);
+
+ if (searchRequest.scroll() == null) {
+ searchRequest.scroll(scrollTimeout);
+ }
+
+ EmitterProcessor outbound = EmitterProcessor.create(false);
+ FluxSink request = outbound.sink();
+
+ EmitterProcessor inbound = EmitterProcessor.create(false);
+
+ Flux exchange = outbound.startWith(searchRequest).flatMap(it -> {
+
+ if (it instanceof SearchRequest) {
+ return sendRequest((SearchRequest) it, requestCreator.search(), SearchResponse.class, headers);
+ } else if (it instanceof SearchScrollRequest) {
+ return sendRequest((SearchScrollRequest) it, requestCreator.scroll(), SearchResponse.class, headers);
+ } else if (it instanceof ClearScrollRequest) {
+ return sendRequest((ClearScrollRequest) it, requestCreator.clearScroll(), ClearScrollResponse.class, headers)
+ .flatMap(discard -> Flux.empty());
+ }
+
+ throw new IllegalArgumentException(
+ String.format("Cannot handle '%s'. Please make sure to use a 'SearchRequest' or 'SearchScrollRequest'.", it));
+ });
+
+ return Flux.usingWhen(Mono.fromSupplier(ScrollState::new),
+
+ scrollState -> {
+
+ Flux searchHits = inbound.handle((searchResponse, sink) -> {
+
+ scrollState.updateScrollId(searchResponse.getScrollId());
+ if (isEmpty(searchResponse.getHits())) {
+
+ inbound.onComplete();
+ outbound.onComplete();
+
+ } else {
+
+ sink.next(searchResponse);
+
+ SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollState.getScrollId())
+ .scroll(scrollTimeout);
+ request.next(searchScrollRequest);
+ }
+
+ }).map(SearchResponse::getHits) //
+ .flatMap(Flux::fromIterable);
+
+ return searchHits.doOnSubscribe(ignore -> exchange.subscribe(inbound));
+
+ },
+ state -> cleanupScroll(headers, state), //
+ (state,error) -> cleanupScroll(headers, state), //
+ state -> cleanupScroll(headers, state)); //
+ }
+
+ private static boolean isEmpty(@Nullable SearchHits hits) {
+ return hits != null && hits.getHits() != null && hits.getHits().length == 0;
+ }
+
+ private Publisher> cleanupScroll(HttpHeaders headers, ScrollState state) {
+
+ if (state.getScrollIds().isEmpty()) {
+ return Mono.empty();
+ }
+
+ ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
+ clearScrollRequest.scrollIds(state.getScrollIds());
+
+ // just send the request, resources get cleaned up anyways after scrollTimeout has been reached.
+ return sendRequest(clearScrollRequest, requestCreator.clearScroll(), ClearScrollResponse.class, headers);
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.index.reindex.DeleteByQueryRequest)
+ */
+ @Override
+ public Mono deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest) {
+
+ return sendRequest(deleteRequest, requestCreator.deleteByQuery(), BulkByScrollResponse.class, headers) //
+ .publishNext();
+ }
+
+ static XContentType enforceSameContentType(IndexRequest indexRequest, @Nullable XContentType xContentType) {
+ XContentType requestContentType = indexRequest.getContentType();
+ if (requestContentType != XContentType.JSON && requestContentType != XContentType.SMILE) {
+ throw new IllegalArgumentException("Unsupported content-type found for request with content-type ["
+ + requestContentType + "], only JSON and SMILE are supported");
+ }
+ if (xContentType == null) {
+ return requestContentType;
+ }
+ if (requestContentType != xContentType) {
+ throw new IllegalArgumentException("Mismatching content-type found for request with content-type ["
+ + requestContentType + "], previous requests have content-type [" + xContentType + ']');
+ }
+ return xContentType;
+ }
+
+ @SneakyThrows
+ Request convertBulk(BulkRequest bulkRequest) {
+ Request request = new Request(HttpMethod.POST.name(), "/_bulk");
+
+ Params parameters = new Params(request);
+ parameters.withTimeout(bulkRequest.timeout());
+ parameters.withRefreshPolicy(bulkRequest.getRefreshPolicy());
+
+ // parameters.withPipeline(bulkRequest.pipeline());
+ // parameters.withRouting(bulkRequest.routing());
+
+ // Bulk API only supports newline delimited JSON or Smile. Before executing
+ // the bulk, we need to check that all requests have the same content-type
+ // and this content-type is supported by the Bulk API.
+ XContentType bulkContentType = null;
+ for (int i = 0; i < bulkRequest.numberOfActions(); i++) {
+ DocWriteRequest> action = bulkRequest.requests().get(i);
+
+ DocWriteRequest.OpType opType = action.opType();
+ if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) {
+ bulkContentType = enforceSameContentType((IndexRequest) action, bulkContentType);
+
+ } else if (opType == DocWriteRequest.OpType.UPDATE) {
+ UpdateRequest updateRequest = (UpdateRequest) action;
+ if (updateRequest.doc() != null) {
+ bulkContentType = enforceSameContentType(updateRequest.doc(), bulkContentType);
+ }
+ if (updateRequest.upsertRequest() != null) {
+ bulkContentType = enforceSameContentType(updateRequest.upsertRequest(), bulkContentType);
+ }
+ }
+ }
+
+ if (bulkContentType == null) {
+ bulkContentType = XContentType.JSON;
+ }
+
+ final byte separator = bulkContentType.xContent().streamSeparator();
+ final ContentType requestContentType = createContentType(bulkContentType);
+
+ ByteArrayOutputStream content = new ByteArrayOutputStream();
+ for (DocWriteRequest> action : bulkRequest.requests()) {
+ DocWriteRequest.OpType opType = action.opType();
+
+ try (XContentBuilder metadata = XContentBuilder.builder(bulkContentType.xContent())) {
+ metadata.startObject();
+ {
+ metadata.startObject(opType.getLowercase());
+ if (Strings.hasLength(action.index())) {
+ metadata.field("_index", action.index());
+ }
+ if (Strings.hasLength(action.type())) {
+ metadata.field("_type", action.type());
+ }
+ if (Strings.hasLength(action.id())) {
+ metadata.field("_id", action.id());
+ }
+ if (Strings.hasLength(action.routing())) {
+ metadata.field("routing", action.routing());
+ }
+ if (action.version() != Versions.MATCH_ANY) {
+ metadata.field("version", action.version());
+ }
+
+ VersionType versionType = action.versionType();
+ if (versionType != VersionType.INTERNAL) {
+ if (versionType == VersionType.EXTERNAL) {
+ metadata.field("version_type", "external");
+ } else if (versionType == VersionType.EXTERNAL_GTE) {
+ metadata.field("version_type", "external_gte");
+ }
+ }
+
+ if (action.ifSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
+ metadata.field("if_seq_no", action.ifSeqNo());
+ metadata.field("if_primary_term", action.ifPrimaryTerm());
+ }
+
+ if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) {
+ IndexRequest indexRequest = (IndexRequest) action;
+ if (Strings.hasLength(indexRequest.getPipeline())) {
+ metadata.field("pipeline", indexRequest.getPipeline());
+ }
+ } else if (opType == DocWriteRequest.OpType.UPDATE) {
+ UpdateRequest updateRequest = (UpdateRequest) action;
+ if (updateRequest.retryOnConflict() > 0) {
+ metadata.field("retry_on_conflict", updateRequest.retryOnConflict());
+ }
+ if (updateRequest.fetchSource() != null) {
+ metadata.field("_source", updateRequest.fetchSource());
+ }
+ }
+ metadata.endObject();
+ }
+ metadata.endObject();
+
+ BytesRef metadataSource = BytesReference.bytes(metadata).toBytesRef();
+ content.write(metadataSource.bytes, metadataSource.offset, metadataSource.length);
+ content.write(separator);
+ }
+
+ BytesRef source = null;
+ if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) {
+ IndexRequest indexRequest = (IndexRequest) action;
+ BytesReference indexSource = indexRequest.source();
+ XContentType indexXContentType = indexRequest.getContentType();
+
+ try (XContentParser parser = XContentHelper.createParser(
+ /*
+ * EMPTY and THROW are fine here because we just call
+ * copyCurrentStructure which doesn't touch the
+ * registry or deprecation.
+ */
+ NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, indexSource,
+ indexXContentType)) {
+ try (XContentBuilder builder = XContentBuilder.builder(bulkContentType.xContent())) {
+ builder.copyCurrentStructure(parser);
+ source = BytesReference.bytes(builder).toBytesRef();
+ }
+ }
+ } else if (opType == DocWriteRequest.OpType.UPDATE) {
+ source = XContentHelper.toXContent((UpdateRequest) action, bulkContentType, false).toBytesRef();
+ }
+
+ if (source != null) {
+ content.write(source.bytes, source.offset, source.length);
+ content.write(separator);
+ }
+ }
+ request.setEntity(new ByteArrayEntity(content.toByteArray(), 0, content.size(), requestContentType));
+ return request;
+ }
+
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#bulk(org.springframework.http.HttpHeaders, org.elasticsearch.action.bulk.BulkRequest)
+ */
+ @Override
+ public Mono bulk(HttpHeaders headers, BulkRequest bulkRequest) {
+ return sendRequest(bulkRequest, this::convertBulk, BulkResponse.class, headers) //
+ .publishNext();
+ }
+
+ // --> INDICES
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#existsIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.get.GetIndexRequest)
+ */
+ @Override
+ public Mono existsIndex(HttpHeaders headers, GetIndexRequest request) {
+
+ return sendRequest(request, requestCreator.indexExists(), RawActionResponse.class, headers) //
+ .map(response -> response.statusCode().is2xxSuccessful()) //
+ .next();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#deleteIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest)
+ */
+ @Override
+ public Mono deleteIndex(HttpHeaders headers, DeleteIndexRequest request) {
+
+ return sendRequest(request, requestCreator.indexDelete(), AcknowledgedResponse.class, headers) //
+ .then();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#createIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.create.CreateIndexRequest)
+ */
+ @Override
+ public Mono createIndex(HttpHeaders headers, CreateIndexRequest createIndexRequest) {
+
+ return sendRequest(createIndexRequest, requestCreator.indexCreate().andThen(request -> {
+ request.addParameter("include_type_name","true");
+ return request;
+ }), AcknowledgedResponse.class, headers) //
+ .then();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#openIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.open.OpenIndexRequest)
+ */
+ @Override
+ public Mono openIndex(HttpHeaders headers, OpenIndexRequest request) {
+
+ return sendRequest(request, requestCreator.indexOpen(), AcknowledgedResponse.class, headers) //
+ .then();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#closeIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.close.CloseIndexRequest)
+ */
+ @Override
+ public Mono closeIndex(HttpHeaders headers, CloseIndexRequest closeIndexRequest) {
+
+ return sendRequest(closeIndexRequest, requestCreator.indexClose(), AcknowledgedResponse.class, headers) //
+ .then();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#refreshIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.refresh.RefreshRequest)
+ */
+ @Override
+ public Mono refreshIndex(HttpHeaders headers, RefreshRequest refreshRequest) {
+
+ return sendRequest(refreshRequest, requestCreator.indexRefresh(), RefreshResponse.class, headers) //
+ .then();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#updateMapping(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest)
+ */
+ @Override
+ public Mono updateMapping(HttpHeaders headers, PutMappingRequest putMappingRequest) {
+
+ return sendRequest(putMappingRequest, requestCreator.putMapping(), AcknowledgedResponse.class, headers) //
+ .then();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices#flushIndex(org.springframework.http.HttpHeaders, org.elasticsearch.action.admin.indices.flush.FlushRequest)
+ */
+ @Override
+ public Mono flushIndex(HttpHeaders headers, FlushRequest flushRequest) {
+
+ return sendRequest(flushRequest, requestCreator.flushIndex(), FlushResponse.class, headers) //
+ .then();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.ReactiveElasticsearchClientCallback)
+ */
+ @Override
+ public Mono execute(ReactiveElasticsearchClientCallback callback) {
+
+ return this.hostProvider.getActive(HostProvider.Verification.LAZY) //
+ .flatMap(callback::doWithClient) //
+ .onErrorResume(throwable -> {
+
+ if (throwable instanceof ConnectException) {
+
+ return hostProvider.getActive(HostProvider.Verification.ACTIVE) //
+ .flatMap(callback::doWithClient);
+ }
+
+ return Mono.error(throwable);
+ });
+ }
+
+ @Override
+ public Mono status() {
+
+ return hostProvider.clusterInfo() //
+ .map(it -> new ClientStatus(it.getNodes()));
+ }
+
+ // --> Private Response helpers
+
+ private static GetResult getResponseToGetResult(GetResponse response) {
+
+ return new GetResult(response.getIndex(), response.getType(), response.getId(), response.getSeqNo(),
+ response.getPrimaryTerm(), response.getVersion(), response.isExists(), response.getSourceAsBytesRef(),
+ response.getFields(), null);
+ }
+
+ // -->
+
+ private Flux sendRequest(Req request, Function converter,
+ Class responseType, HttpHeaders headers) {
+ return sendRequest(converter.apply(request), responseType, headers);
+ }
+
+ private Flux sendRequest(Request request, Class responseType, HttpHeaders headers) {
+
+ String logId = ClientLogger.newLogId();
+
+ return execute(webClient -> sendRequest(webClient, logId, request, headers))
+ .flatMapMany(response -> readResponseBody(logId, request, response, responseType));
+ }
+
+ private Mono sendRequest(WebClient webClient, String logId, Request request, HttpHeaders headers) {
+
+ WebClient.RequestBodySpec requestBodySpec = webClient.method(HttpMethod.valueOf(request.getMethod().toUpperCase())) //
+ .uri(builder -> {
+
+ builder = builder.path(request.getEndpoint());
+
+ if (!ObjectUtils.isEmpty(request.getParameters())) {
+ for (Map.Entry entry : request.getParameters().entrySet()) {
+ builder = builder.queryParam(entry.getKey(), entry.getValue());
+ }
+ }
+ return builder.build();
+ }) //
+ .attribute(ClientRequest.LOG_ID_ATTRIBUTE, logId) //
+ .headers(theHeaders -> {
+
+ // add all the headers explicitly set
+ theHeaders.addAll(headers);
+
+ // and now those that might be set on the request.
+ if (request.getOptions() != null) {
+
+ if (!ObjectUtils.isEmpty(request.getOptions().getHeaders())) {
+ request.getOptions().getHeaders().forEach(it -> theHeaders.add(it.getName(), it.getValue()));
+ }
+ }
+
+ // plus the ones from the supplier
+ HttpHeaders suppliedHeaders = headersSupplier.get();
+ if (suppliedHeaders != null && suppliedHeaders != HttpHeaders.EMPTY) {
+ theHeaders.addAll(suppliedHeaders);
+ }
+ });
+
+ if (request.getEntity() != null) {
+
+ Lazy body = bodyExtractor(request);
+
+ ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters(),
+ body::get);
+
+ requestBodySpec.contentType(MediaType.valueOf(request.getEntity().getContentType().getValue()));
+ requestBodySpec.body(Mono.fromSupplier(body), String.class);
+ } else {
+ ClientLogger.logRequest(logId, request.getMethod().toUpperCase(), request.getEndpoint(), request.getParameters());
+ }
+
+ return requestBodySpec //
+ .exchange() //
+ .onErrorReturn(ConnectException.class, ClientResponse.create(HttpStatus.SERVICE_UNAVAILABLE).build());
+ }
+
+ private Lazy bodyExtractor(Request request) {
+
+ return Lazy.of(() -> {
+
+ try {
+ return EntityUtils.toString(request.getEntity());
+ } catch (IOException e) {
+ throw new RequestBodyEncodingException("Error encoding request", e);
+ }
+ });
+ }
+
+ private Publisher extends T> readResponseBody(String logId, Request request, ClientResponse response,
+ Class responseType) {
+
+ if (RawActionResponse.class.equals(responseType)) {
+
+ ClientLogger.logRawResponse(logId, response.statusCode());
+ return Mono.just(responseType.cast(RawActionResponse.create(response)));
+ }
+
+ if (response.statusCode().is5xxServerError()) {
+
+ ClientLogger.logRawResponse(logId, response.statusCode());
+ return handleServerError(request, response);
+ }
+
+ if (response.statusCode().is4xxClientError()) {
+
+ ClientLogger.logRawResponse(logId, response.statusCode());
+ return handleClientError(logId, request, response, responseType);
+ }
+
+ return response.body(BodyExtractors.toMono(byte[].class)) //
+ .map(it -> new String(it, StandardCharsets.UTF_8)) //
+ .doOnNext(it -> ClientLogger.logResponse(logId, response.statusCode(), it)) //
+ .flatMap(content -> doDecode(response, responseType, content));
+ }
+
+ private static Mono doDecode(ClientResponse response, Class responseType, String content) {
+
+ String mediaType = response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType());
+
+ try {
+
+ Method fromXContent = ReflectionUtils.findMethod(responseType, "fromXContent", XContentParser.class);
+
+ return Mono.justOrEmpty(responseType
+ .cast(ReflectionUtils.invokeMethod(fromXContent, responseType, createParser(mediaType, content))));
+
+ } catch (Throwable errorParseFailure) { // cause elasticsearch also uses AssertionError
+
+ try {
+ return Mono.error(BytesRestResponse.errorFromXContent(createParser(mediaType, content)));
+ } catch (Exception e) {
+
+ return Mono
+ .error(new ElasticsearchStatusException(content, RestStatus.fromCode(response.statusCode().value())));
+ }
+ }
+ }
+
+ private static XContentParser createParser(String mediaType, String content) throws IOException {
+ return XContentType.fromMediaTypeOrFormat(mediaType) //
+ .xContent() //
+ .createParser(new NamedXContentRegistry(NamedXContents.getDefaultNamedXContents()),
+ DeprecationHandler.THROW_UNSUPPORTED_OPERATION, content);
+ }
+
+ private Publisher extends T> handleServerError(Request request, ClientResponse response) {
+
+ int statusCode = response.statusCode().value();
+ RestStatus status = RestStatus.fromCode(statusCode);
+ String mediaType = response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType());
+
+ return response.body(BodyExtractors.toMono(byte[].class)) //
+ .map(bytes -> new String(bytes, StandardCharsets.UTF_8)) //
+ .flatMap(content -> contentOrError(content, mediaType, status))
+ .flatMap(unused -> Mono
+ .error(new ElasticsearchStatusException(String.format("%s request to %s returned error code %s.",
+ request.getMethod(), request.getEndpoint(), statusCode), status)));
+ }
+
+ private Publisher extends T> handleClientError(String logId, Request request, ClientResponse response,
+ Class responseType) {
+
+ int statusCode = response.statusCode().value();
+ RestStatus status = RestStatus.fromCode(statusCode);
+ String mediaType = response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType());
+
+ return response.body(BodyExtractors.toMono(byte[].class)) //
+ .map(bytes -> new String(bytes, StandardCharsets.UTF_8)) //
+ .flatMap(content -> contentOrError(content, mediaType, status)) //
+ .doOnNext(content -> ClientLogger.logResponse(logId, response.statusCode(), content)) //
+ .flatMap(content -> doDecode(response, responseType, content));
+ }
+
+ // region ElasticsearchException helper
+
+ /**
+ * checks if the given content body contains an {@link ElasticsearchException}, if yes it is returned in a Mono.error.
+ * Otherwise the content is returned in the Mono
+ *
+ * @param content the content to analyze
+ * @param mediaType the returned media type
+ * @param status the response status
+ * @return a Mono with the content or an Mono.error
+ */
+ private static Mono contentOrError(String content, String mediaType, RestStatus status) {
+
+ ElasticsearchException exception = getElasticsearchException(content, mediaType, status);
+
+ if (exception != null) {
+ StringBuilder sb = new StringBuilder();
+ buildExceptionMessages(sb, exception);
+ return Mono.error(new ElasticsearchStatusException(sb.toString(), status, exception));
+ }
+
+ return Mono.just(content);
+ }
+
+ /**
+ * tries to parse an {@link ElasticsearchException} from the given body content
+ *
+ * @param content the content to analyse
+ * @param mediaType the type of the body content
+ * @return an {@link ElasticsearchException} or {@literal null}.
+ */
+ @Nullable
+ private static ElasticsearchException getElasticsearchException(String content, String mediaType, RestStatus status) {
+
+ try {
+ XContentParser parser = createParser(mediaType, content);
+ // we have a JSON object with an error and a status field
+ XContentParser.Token token = parser.nextToken(); // Skip START_OBJECT
+
+ do {
+ token = parser.nextToken();
+
+ if ("error".equals(parser.currentName())) {
+ return ElasticsearchException.failureFromXContent(parser);
+ }
+ } while (token == XContentParser.Token.FIELD_NAME);
+
+ return null;
+ } catch (IOException e) {
+ return new ElasticsearchStatusException(content, status);
+ }
+ }
+
+ private static void buildExceptionMessages(StringBuilder sb, Throwable t) {
+
+ sb.append(t.getMessage());
+ for (Throwable throwable : t.getSuppressed()) {
+ sb.append(", ");
+ buildExceptionMessages(sb, throwable);
+ }
+ }
+
+ @Override
+ public Mono searchForPage(SearchRequest request) {
+
+ return sendRequest(request, requestCreator.search(), SearchResponse.class, HttpHeaders.EMPTY)
+ .singleOrEmpty();
+ }
+
+ @SneakyThrows
+ protected Request convertMultiSearchRequest(MultiSearchRequest searchRequest) {
+ return RequestConverters.multiSearch(searchRequest);
+ }
+
+ @Override
+ @SneakyThrows
+ public Mono multiSearch(MultiSearchRequest request) {
+
+ return sendRequest(request, this::convertMultiSearchRequest, MultiSearchResponse.class, HttpHeaders.EMPTY)
+ .singleOrEmpty();
+ }
+
+ Request convertGetMappingRequest(GetMappingsRequest getMappingsRequest) {
+ String[] indices = getMappingsRequest.indices() == null ? Strings.EMPTY_ARRAY : getMappingsRequest.indices();
+
+ Request request = new Request(HttpGet.METHOD_NAME, "/" + String.join(",", indices) + "/_mapping");
+
+ Params parameters = new Params(request);
+ parameters.withMasterTimeout(getMappingsRequest.masterNodeTimeout());
+ parameters.withIndicesOptions(getMappingsRequest.indicesOptions());
+ parameters.withLocal(getMappingsRequest.local());
+
+ return request;
+ }
+
+
+ @Override
+ public Mono getMapping(GetMappingsRequest request) {
+
+ return sendRequest(request, this::convertGetMappingRequest, GetMappingsResponse.class, HttpHeaders.EMPTY)
+ .singleOrEmpty();
+ }
+
+ Request convertGetIndexTemplateRequest(GetIndexTemplatesRequest getIndexTemplatesRequest) {
+ final Request request = new Request(HttpGet.METHOD_NAME, "/_template/" + String.join(",", getIndexTemplatesRequest.names()));
+
+ return request;
+ }
+
+ @Override
+ public Mono getTemplate(GetIndexTemplatesRequest request) {
+ return sendRequest(request, this::convertGetIndexTemplateRequest, GetIndexTemplatesResponse.class, HttpHeaders.EMPTY)
+ .singleOrEmpty();
+ }
+
+ @SneakyThrows
+ Request convertPutIndexTemplateRequest(PutIndexTemplateRequest putIndexTemplateRequest) {
+ Request request = new Request(HttpPut.METHOD_NAME, "/_template/" + putIndexTemplateRequest.name());
+ Params params = new Params(request);
+ params.withMasterTimeout(putIndexTemplateRequest.masterNodeTimeout());
+ if (putIndexTemplateRequest.create()) {
+ params.putParam("create", Boolean.TRUE.toString());
+ }
+ if (Strings.hasText(putIndexTemplateRequest.cause())) {
+ params.putParam("cause", putIndexTemplateRequest.cause());
+ }
+ params.putParam("include_type_name","true");
+ BytesRef source = XContentHelper.toXContent(putIndexTemplateRequest, XContentType.JSON, false).toBytesRef();
+ request.setEntity(new ByteArrayEntity(source.bytes, source.offset, source.length, ContentType.APPLICATION_JSON));
+ return request;
+ }
+
+ @Override
+ public Mono updateTemplate(PutIndexTemplateRequest request) {
+ return sendRequest(request, this::convertPutIndexTemplateRequest, AcknowledgedResponse.class, HttpHeaders.EMPTY)
+ .singleOrEmpty();
+ }
+
+ // endregion
+
+ // region internal classes
+
+ /**
+ * Reactive client {@link Status} implementation.
+ *
+ * @author Christoph Strobl
+ */
+ class ClientStatus implements Status {
+
+ private final Collection connectedHosts;
+
+ ClientStatus(Collection connectedHosts) {
+ this.connectedHosts = connectedHosts;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Status#hosts()
+ */
+ @Override
+ public Collection hosts() {
+ return connectedHosts;
+ }
+ }
+
+ static class Params {
+ private final Request request;
+
+ Params(Request request) {
+ this.request = request;
+ }
+
+ Params putParam(String name, String value) {
+ if (Strings.hasLength(value)) {
+ request.addParameter(name, value);
+ }
+ return this;
+ }
+
+ Params putParam(String key, TimeValue value) {
+ if (value != null) {
+ return putParam(key, value.getStringRep());
+ }
+ return this;
+ }
+
+ Params withDocAsUpsert(boolean docAsUpsert) {
+ if (docAsUpsert) {
+ return putParam("doc_as_upsert", Boolean.TRUE.toString());
+ }
+ return this;
+ }
+
+ Params withFetchSourceContext(FetchSourceContext fetchSourceContext) {
+ if (fetchSourceContext != null) {
+ if (!fetchSourceContext.fetchSource()) {
+ putParam("_source", Boolean.FALSE.toString());
+ }
+ if (fetchSourceContext.includes() != null && fetchSourceContext.includes().length > 0) {
+ putParam("_source_includes", String.join(",", fetchSourceContext.includes()));
+ }
+ if (fetchSourceContext.excludes() != null && fetchSourceContext.excludes().length > 0) {
+ putParam("_source_excludes", String.join(",", fetchSourceContext.excludes()));
+ }
+ }
+ return this;
+ }
+
+ Params withFields(String[] fields) {
+ if (fields != null && fields.length > 0) {
+ return putParam("fields", String.join(",", fields));
+ }
+ return this;
+ }
+
+ Params withMasterTimeout(TimeValue masterTimeout) {
+ return putParam("master_timeout", masterTimeout);
+ }
+
+ Params withPipeline(String pipeline) {
+ return putParam("pipeline", pipeline);
+ }
+
+ Params withPreference(String preference) {
+ return putParam("preference", preference);
+ }
+
+ Params withRealtime(boolean realtime) {
+ if (!realtime) {
+ return putParam("realtime", Boolean.FALSE.toString());
+ }
+ return this;
+ }
+
+ Params withRefresh(boolean refresh) {
+ if (refresh) {
+ return withRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+ }
+ return this;
+ }
+
+ Params withRefreshPolicy(WriteRequest.RefreshPolicy refreshPolicy) {
+ if (refreshPolicy != WriteRequest.RefreshPolicy.NONE) {
+ return putParam("refresh", refreshPolicy.getValue());
+ }
+ return this;
+ }
+
+ Params withRequestsPerSecond(float requestsPerSecond) {
+ // the default in AbstractBulkByScrollRequest is Float.POSITIVE_INFINITY,
+ // but we don't want to add that to the URL parameters, instead we use -1
+ if (Float.isFinite(requestsPerSecond)) {
+ return putParam("requests_per_second", Float.toString(requestsPerSecond));
+ } else {
+ return putParam("requests_per_second", "-1");
+ }
+ }
+
+ Params withRetryOnConflict(int retryOnConflict) {
+ if (retryOnConflict > 0) {
+ return putParam("retry_on_conflict", String.valueOf(retryOnConflict));
+ }
+ return this;
+ }
+
+ Params withRouting(String routing) {
+ return putParam("routing", routing);
+ }
+
+ Params withStoredFields(String[] storedFields) {
+ if (storedFields != null && storedFields.length > 0) {
+ return putParam("stored_fields", String.join(",", storedFields));
+ }
+ return this;
+ }
+
+ Params withTimeout(TimeValue timeout) {
+ return putParam("timeout", timeout);
+ }
+
+ Params withVersion(long version) {
+ if (version != Versions.MATCH_ANY) {
+ return putParam("version", Long.toString(version));
+ }
+ return this;
+ }
+
+ Params withVersionType(VersionType versionType) {
+ if (versionType != VersionType.INTERNAL) {
+ return putParam("version_type", versionType.name().toLowerCase(Locale.ROOT));
+ }
+ return this;
+ }
+
+ Params withIfSeqNo(long seqNo) {
+ if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
+ return putParam("if_seq_no", Long.toString(seqNo));
+ }
+ return this;
+ }
+
+ Params withIfPrimaryTerm(long primaryTerm) {
+ if (primaryTerm != SequenceNumbers.UNASSIGNED_PRIMARY_TERM) {
+ return putParam("if_primary_term", Long.toString(primaryTerm));
+ }
+ return this;
+ }
+
+ Params withWaitForActiveShards(ActiveShardCount activeShardCount) {
+ return withWaitForActiveShards(activeShardCount, ActiveShardCount.DEFAULT);
+ }
+
+ Params withWaitForActiveShards(ActiveShardCount activeShardCount, ActiveShardCount defaultActiveShardCount) {
+ if (activeShardCount != null && activeShardCount != defaultActiveShardCount) {
+ // in Elasticsearch 7, "default" cannot be sent anymore, so it needs to be mapped to the default value of 1
+ String value = activeShardCount == ActiveShardCount.DEFAULT ? "1"
+ : activeShardCount.toString().toLowerCase(Locale.ROOT);
+ return putParam("wait_for_active_shards", value);
+ }
+ return this;
+ }
+
+ Params withIndicesOptions(IndicesOptions indicesOptions) {
+ withIgnoreUnavailable(indicesOptions.ignoreUnavailable());
+ putParam("allow_no_indices", Boolean.toString(indicesOptions.allowNoIndices()));
+ String expandWildcards;
+ if (!indicesOptions.expandWildcardsOpen() && !indicesOptions.expandWildcardsClosed()) {
+ expandWildcards = "none";
+ } else {
+ StringJoiner joiner = new StringJoiner(",");
+ if (indicesOptions.expandWildcardsOpen()) {
+ joiner.add("open");
+ }
+ if (indicesOptions.expandWildcardsClosed()) {
+ joiner.add("closed");
+ }
+ expandWildcards = joiner.toString();
+ }
+ putParam("expand_wildcards", expandWildcards);
+ return this;
+ }
+
+ Params withIgnoreUnavailable(boolean ignoreUnavailable) {
+ // Always explicitly place the ignore_unavailable value.
+ putParam("ignore_unavailable", Boolean.toString(ignoreUnavailable));
+ return this;
+ }
+
+ Params withHuman(boolean human) {
+ if (human) {
+ putParam("human", "true");
+ }
+ return this;
+ }
+
+ Params withLocal(boolean local) {
+ if (local) {
+ putParam("local", "true");
+ }
+ return this;
+ }
+
+ Params withIncludeDefaults(boolean includeDefaults) {
+ if (includeDefaults) {
+ return putParam("include_defaults", Boolean.TRUE.toString());
+ }
+ return this;
+ }
+
+ Params withPreserveExisting(boolean preserveExisting) {
+ if (preserveExisting) {
+ return putParam("preserve_existing", Boolean.TRUE.toString());
+ }
+ return this;
+ }
+
+ Params withDetailed(boolean detailed) {
+ if (detailed) {
+ return putParam("detailed", Boolean.TRUE.toString());
+ }
+ return this;
+ }
+
+ Params withWaitForCompletion(Boolean waitForCompletion) {
+ return putParam("wait_for_completion", waitForCompletion.toString());
+ }
+
+ Params withNodes(String[] nodes) {
+ if (nodes != null && nodes.length > 0) {
+ return putParam("nodes", String.join(",", nodes));
+ }
+ return this;
+ }
+
+ Params withActions(String[] actions) {
+ if (actions != null && actions.length > 0) {
+ return putParam("actions", String.join(",", actions));
+ }
+ return this;
+ }
+
+ Params withTaskId(TaskId taskId) {
+ if (taskId != null && taskId.isSet()) {
+ return putParam("task_id", taskId.toString());
+ }
+ return this;
+ }
+
+ Params withParentTaskId(TaskId parentTaskId) {
+ if (parentTaskId != null && parentTaskId.isSet()) {
+ return putParam("parent_task_id", parentTaskId.toString());
+ }
+ return this;
+ }
+
+ Params withVerify(boolean verify) {
+ if (verify) {
+ return putParam("verify", Boolean.TRUE.toString());
+ }
+ return this;
+ }
+
+ Params withWaitForStatus(ClusterHealthStatus status) {
+ if (status != null) {
+ return putParam("wait_for_status", status.name().toLowerCase(Locale.ROOT));
+ }
+ return this;
+ }
+
+ Params withWaitForNoRelocatingShards(boolean waitNoRelocatingShards) {
+ if (waitNoRelocatingShards) {
+ return putParam("wait_for_no_relocating_shards", Boolean.TRUE.toString());
+ }
+ return this;
+ }
+
+
+ Params withWaitForNoInitializingShards(boolean waitNoInitShards) {
+ if (waitNoInitShards) {
+ return putParam("wait_for_no_initializing_shards", Boolean.TRUE.toString());
+ }
+ return this;
+ }
+
+ Params withWaitForNodes(String waitForNodes) {
+ return putParam("wait_for_nodes", waitForNodes);
+ }
+
+ Params withLevel(ClusterHealthRequest.Level level) {
+ return putParam("level", level.name().toLowerCase(Locale.ROOT));
+ }
+
+ Params withWaitForEvents(Priority waitForEvents) {
+ if (waitForEvents != null) {
+ return putParam("wait_for_events", waitForEvents.name().toLowerCase(Locale.ROOT));
+ }
+ return this;
+ }
+
+ }
+
+}
diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/RawActionResponse.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/RawActionResponse.java
new file mode 100644
index 00000000..fc4de112
--- /dev/null
+++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/RawActionResponse.java
@@ -0,0 +1,59 @@
+package org.jetlinks.community.elastic.search.service.reactive;
+
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.client.reactive.ClientHttpResponse;
+import org.springframework.web.reactive.function.BodyExtractor;
+import org.springframework.web.reactive.function.client.ClientResponse;
+
+import java.io.IOException;
+
+/**
+ * Extension to {@link ActionResponse} that also delegates to {@link ClientResponse}.
+ *
+ * @author Christoph Strobl
+ * @author Peter-Josef Meisch
+ * @author Mark Paluch
+ * @since 3.2
+ */
+class RawActionResponse extends ActionResponse {
+
+ private final ClientResponse delegate;
+
+ private RawActionResponse(ClientResponse delegate) {
+ this.delegate = delegate;
+ }
+
+ static RawActionResponse create(ClientResponse response) {
+ return new RawActionResponse(response);
+ }
+
+ public HttpStatus statusCode() {
+ return delegate.statusCode();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.web.reactive.function.client.ClientResponse#headers()
+ */
+ public ClientResponse.Headers headers() {
+ return delegate.headers();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.web.reactive.function.client.ClientResponse#body(org.springframework.web.reactive.function.BodyExtractor)
+ */
+ public T body(BodyExtractor extractor) {
+ return delegate.body(extractor);
+ }
+
+ /*
+ * (non-Javadoc)
+ * until Elasticsearch 7.4 this empty implementation was available in the abstract base class
+ */
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ }
+}
\ No newline at end of file
diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveAggregationService.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveAggregationService.java
new file mode 100644
index 00000000..83ccdd50
--- /dev/null
+++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveAggregationService.java
@@ -0,0 +1,199 @@
+package org.jetlinks.community.elastic.search.service.reactive;
+
+import lombok.extern.slf4j.Slf4j;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.search.aggregations.bucket.histogram.ExtendedBounds;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.hswebframework.ezorm.core.param.QueryParam;
+import org.hswebframework.ezorm.core.param.TermType;
+import org.jetlinks.community.elastic.search.aggreation.bucket.Bucket;
+import org.jetlinks.community.elastic.search.aggreation.bucket.BucketAggregationsStructure;
+import org.jetlinks.community.elastic.search.aggreation.bucket.BucketResponse;
+import org.jetlinks.community.elastic.search.aggreation.bucket.Sort;
+import org.jetlinks.community.elastic.search.aggreation.enums.BucketType;
+import org.jetlinks.community.elastic.search.aggreation.enums.MetricsType;
+import org.jetlinks.community.elastic.search.aggreation.enums.OrderType;
+import org.jetlinks.community.elastic.search.aggreation.metrics.MetricsAggregationStructure;
+import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager;
+import org.jetlinks.community.elastic.search.service.AggregationService;
+import org.jetlinks.community.elastic.search.service.DefaultElasticSearchService;
+import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter;
+import org.jetlinks.community.timeseries.query.AggregationQueryParam;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * @author bsetfeng
+ * @since 1.0
+ **/
+@Service
+@Slf4j
+public class ReactiveAggregationService implements AggregationService {
+
+ private final ReactiveElasticsearchClient restClient;
+
+ private final ElasticSearchIndexManager indexManager;
+
+ @Autowired
+ public ReactiveAggregationService(ElasticSearchIndexManager indexManager,
+ ReactiveElasticsearchClient restClient) {
+ this.restClient = restClient;
+ this.indexManager = indexManager;
+ }
+
+ private Mono createSearchSourceBuilder(QueryParam queryParam, String index) {
+
+ return indexManager
+ .getIndexMetadata(index)
+ .map(metadata -> ElasticSearchConverter.convertSearchSourceBuilder(queryParam, metadata));
+ }
+
+ @Override
+ public Flux