From d3c26b014dc0900ca1fcfd861311bf27b68fde43 Mon Sep 17 00:00:00 2001 From: zhouhao Date: Fri, 28 Feb 2020 22:46:43 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96es?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../aggreation/DefaultAggregationService.java | 25 +++---- .../service/DefaultElasticSearchService.java | 72 +++++++++++++------ .../timeseries/ESTimeSeriesService.java | 18 ++--- 3 files changed, 69 insertions(+), 46 deletions(-) diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/DefaultAggregationService.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/DefaultAggregationService.java index 06558667..dfa1beba 100644 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/DefaultAggregationService.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/aggreation/DefaultAggregationService.java @@ -53,10 +53,8 @@ public class DefaultAggregationService implements AggregationService { MetricsAggregationStructure structure, ElasticIndex provider) { return searchSourceBuilderMono(queryParam, provider) - .doOnNext(builder -> builder.aggregation( - structure.getType().aggregationBuilder(structure.getName(), structure.getField()))) .map(builder -> new SearchRequest(provider.getStandardIndex()) - .source(builder)) + .source(builder.aggregation(structure.getType().aggregationBuilder(structure.getName(), structure.getField())))) .flatMap(request -> Mono.create(monoSink -> restClient.getQueryClient().searchAsync(request, RequestOptions.DEFAULT, translatorActionListener(monoSink)))) .map(searchResponse -> structure.getType().getResponse(structure.getName(), searchResponse)); @@ -65,23 +63,18 @@ public class DefaultAggregationService implements AggregationService { @Override public Mono bucketAggregation(QueryParam queryParam, BucketAggregationsStructure structure, ElasticIndex provider) { return searchSourceBuilderMono(queryParam, provider) - .doOnNext(builder -> - builder.aggregation(structure.getType().aggregationBuilder(structure)) - ) .map(builder -> new SearchRequest(provider.getStandardIndex()) - .source(builder)) + .source(builder.aggregation(structure.getType().aggregationBuilder(structure)))) .doOnNext(searchRequest -> - log.debug("聚合查询index:{},参数:{}", - provider.getStandardIndex(), - JSON.toJSON(searchRequest.source().toString()))) + log.debug("聚合查询index:{},参数:{}", provider.getStandardIndex(), JSON.toJSON(searchRequest.source().toString()))) .flatMap(request -> Mono.create(monoSink -> - restClient.getQueryClient().searchAsync(request, RequestOptions.DEFAULT, translatorActionListener(monoSink)))) - .map(response -> structure.getType().convert(response.getAggregations().get(structure.getName()))) - .map(buckets -> BucketResponse.builder() + restClient + .getQueryClient() + .searchAsync(request, RequestOptions.DEFAULT, translatorActionListener(monoSink)))) + .map(response -> BucketResponse.builder() .name(structure.getName()) - .buckets(buckets) - .build() - ) + .buckets(structure.getType().convert(response.getAggregations().get(structure.getName()))) + .build()) ; } diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java index 2634d70e..a31f0ea2 100644 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java @@ -67,20 +67,32 @@ public class DefaultElasticSearchService implements ElasticSearchService { public Mono> queryPager(ElasticIndex index, QueryParam queryParam, Class type) { return query(searchRequestStructure(queryParam, index)) .map(response -> translatePageResult(type, queryParam, response)) - .switchIfEmpty(Mono.just(PagerResult.empty())); + .switchIfEmpty(Mono.just(PagerResult.empty())) + .onErrorReturn(err -> { + log.error("query elastic error", err); + return true; + }, PagerResult.empty()); } @Override public Flux query(ElasticIndex index, QueryParam queryParam, Class type) { return query(searchRequestStructure(queryParam, index)) - .flatMapIterable(response -> translate(type, response)); + .flatMapIterable(response -> translate(type, response)) + .onErrorResume(err -> { + log.error("query elastic error", err); + return Flux.empty(); + }); } @Override public Mono count(ElasticIndex index, QueryParam queryParam) { return countQuery(countRequestStructure(queryParam, index)) .map(CountResponse::getCount) - .switchIfEmpty(Mono.just(0L)); + .defaultIfEmpty(0L) + .onErrorReturn(err -> { + log.error("query elastic error", err); + return true; + }, 0L); } @@ -124,8 +136,12 @@ public class DefaultElasticSearchService implements ElasticSearchService { drop -> System.err.println("无法处理更多索引请求!"), BufferOverflowStrategy.DROP_OLDEST) .flatMap(this::doSave) - .doOnNext((len) -> log.debug("保存ES数据成功,数量:{}", len)) - .onErrorContinue((err, obj) -> System.err.println(org.hswebframework.utils.StringUtils.throwable2String(err))) + .doOnNext((len) -> { + if (log.isDebugEnabled() && len > 0) { + log.debug("保存ElasticSearch数据成功,数量:{}", len); + } + }) + .onErrorContinue((err, obj) -> System.err.println("保存ElasticSearch数据失败:\n" + org.hswebframework.utils.StringUtils.throwable2String(err))) .subscribe(); } @@ -159,16 +175,16 @@ public class DefaultElasticSearchService implements ElasticSearchService { return bulkRequest; }) .flatMap(bulkRequest -> - Mono.create(sink -> + Mono.create(sink -> restClient.getWriteClient() .bulkAsync(bulkRequest, RequestOptions.DEFAULT, new ActionListener() { @Override public void onResponse(BulkResponse responses) { if (responses.hasFailures()) { - sink.error(new RuntimeException("批量存储es数据失败:" + responses.buildFailureMessage())); + sink.error(new RuntimeException("保存ElasticSearch数据失败:" + responses.buildFailureMessage())); return; } - sink.success(!responses.hasFailures()); + sink.success(buffers.size()); } @Override @@ -176,7 +192,7 @@ public class DefaultElasticSearchService implements ElasticSearchService { sink.error(e); } }))) - .then(Mono.just(buffers.size())); + .collect(Collectors.summingInt(Integer::intValue)); } private PagerResult translatePageResult(Class clazz, QueryParam param, SearchResponse response) { @@ -196,19 +212,25 @@ public class DefaultElasticSearchService implements ElasticSearchService { } private Mono query(Mono requestMono) { - return requestMono.flatMap((request) -> Mono.create(sink -> { - restClient.getQueryClient() - .searchAsync(request, RequestOptions.DEFAULT, translatorActionListener(sink)); - } - )); + return requestMono.flatMap((request) -> + Mono.create(sink -> restClient + .getQueryClient() + .searchAsync(request, RequestOptions.DEFAULT, translatorActionListener(sink)))) + .onErrorResume(err -> { + log.error("query elastic error", err); + return Mono.empty(); + }); } private Mono countQuery(Mono requestMono) { - return requestMono.flatMap((request) -> Mono.create(sink -> { - restClient.getQueryClient() - .countAsync(request, RequestOptions.DEFAULT, translatorActionListener(sink)); - } - )); + return requestMono.flatMap((request) -> + Mono.create(sink -> restClient + .getQueryClient() + .countAsync(request, RequestOptions.DEFAULT, translatorActionListener(sink)))) + .onErrorResume(err -> { + log.error("query elastic error", err); + return Mono.empty(); + }); } private ActionListener translatorActionListener(MonoSink sink) { @@ -244,8 +266,11 @@ public class DefaultElasticSearchService implements ElasticSearchService { } return request; }) - .doOnError(e -> log.error("查询index:" + provider.getStandardIndex() + "元数据错误", e)) - .doOnNext(searchRequest -> log.debug("查询index:{},es查询参数:{}", provider.getStandardIndex(), searchRequest.source().toString())); + .doOnNext(searchRequest -> log.debug("查询index:{},es查询参数:{}", provider.getStandardIndex(), searchRequest.source().toString())) + .onErrorResume(err -> { + log.error("query index error", err); + return Mono.empty(); + }); } private Mono countRequestStructure(QueryParam queryParam, ElasticIndex provider) { @@ -255,6 +280,9 @@ public class DefaultElasticSearchService implements ElasticSearchService { return indexOperationService.getIndexMappingMetadata(provider.getStandardIndex()) .map(metadata -> new CountRequest(provider.getStandardIndex()) .source(translateService.translate(tempQueryParam, metadata))) - .doOnNext(searchRequest -> log.debug("查询index:{},es查询参数:{}", provider.getStandardIndex(), searchRequest.source().toString())); + .onErrorResume(err -> { + log.error("query index error", err); + return Mono.empty(); + }); } } diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/timeseries/ESTimeSeriesService.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/timeseries/ESTimeSeriesService.java index 61ee5db9..56804411 100644 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/timeseries/ESTimeSeriesService.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/timeseries/ESTimeSeriesService.java @@ -44,8 +44,7 @@ public class ESTimeSeriesService extends AbstractTimeSeriesService { return elasticSearchService.query( cleverGetIndex(), filterAddDefaultSort(queryParam), - Map.class - ) + Map.class) .map(map -> new TimeSeriesData() { @Override @@ -75,20 +74,23 @@ public class ESTimeSeriesService extends AbstractTimeSeriesService { return aggregationService.bucketAggregation( filterAddDefaultSort(addQueryTimeRange(param.getQueryParam(), param)), structure, - cleverGetIndex() - ).flatMapMany(bucketResponse -> Flux.fromIterable(new BucketsParser(bucketResponse.getBuckets()).result) - .take(param.getLimit()) - .map(ESAggregationData::new)); + cleverGetIndex()) + .onErrorResume(err -> Mono.empty()) + .flatMapMany(bucketResponse -> Flux.fromIterable(new BucketsParser(bucketResponse.getBuckets()).result) + .take(param.getLimit()) + .map(ESAggregationData::new)) + ; } @Override public Mono save(Publisher data) { - return elasticSearchService.commit(getDefaultIndex(), Flux.from(data) + return Flux.from(data) .map(timeSeriesData -> { Map map = timeSeriesData.getData(); map.put("timestamp", timeSeriesData.getTimestamp()); return map; - })); + }) + .as(stream -> elasticSearchService.commit(getDefaultIndex(), stream)); } @Override