This commit is contained in:
zhouhao 2020-02-28 22:46:43 +08:00
parent e28e50dacc
commit d3c26b014d
3 changed files with 69 additions and 46 deletions

View File

@ -53,10 +53,8 @@ public class DefaultAggregationService implements AggregationService {
MetricsAggregationStructure structure,
ElasticIndex provider) {
return searchSourceBuilderMono(queryParam, provider)
.doOnNext(builder -> builder.aggregation(
structure.getType().aggregationBuilder(structure.getName(), structure.getField())))
.map(builder -> new SearchRequest(provider.getStandardIndex())
.source(builder))
.source(builder.aggregation(structure.getType().aggregationBuilder(structure.getName(), structure.getField()))))
.flatMap(request -> Mono.<SearchResponse>create(monoSink ->
restClient.getQueryClient().searchAsync(request, RequestOptions.DEFAULT, translatorActionListener(monoSink))))
.map(searchResponse -> structure.getType().getResponse(structure.getName(), searchResponse));
@ -65,23 +63,18 @@ public class DefaultAggregationService implements AggregationService {
@Override
public Mono<BucketResponse> bucketAggregation(QueryParam queryParam, BucketAggregationsStructure structure, ElasticIndex provider) {
return searchSourceBuilderMono(queryParam, provider)
.doOnNext(builder ->
builder.aggregation(structure.getType().aggregationBuilder(structure))
)
.map(builder -> new SearchRequest(provider.getStandardIndex())
.source(builder))
.source(builder.aggregation(structure.getType().aggregationBuilder(structure))))
.doOnNext(searchRequest ->
log.debug("聚合查询index:{},参数:{}",
provider.getStandardIndex(),
JSON.toJSON(searchRequest.source().toString())))
log.debug("聚合查询index:{},参数:{}", provider.getStandardIndex(), JSON.toJSON(searchRequest.source().toString())))
.flatMap(request -> Mono.<SearchResponse>create(monoSink ->
restClient.getQueryClient().searchAsync(request, RequestOptions.DEFAULT, translatorActionListener(monoSink))))
.map(response -> structure.getType().convert(response.getAggregations().get(structure.getName())))
.map(buckets -> BucketResponse.builder()
restClient
.getQueryClient()
.searchAsync(request, RequestOptions.DEFAULT, translatorActionListener(monoSink))))
.map(response -> BucketResponse.builder()
.name(structure.getName())
.buckets(buckets)
.build()
)
.buckets(structure.getType().convert(response.getAggregations().get(structure.getName())))
.build())
;
}

View File

@ -67,20 +67,32 @@ public class DefaultElasticSearchService implements ElasticSearchService {
public <T> Mono<PagerResult<T>> queryPager(ElasticIndex index, QueryParam queryParam, Class<T> type) {
return query(searchRequestStructure(queryParam, index))
.map(response -> translatePageResult(type, queryParam, response))
.switchIfEmpty(Mono.just(PagerResult.empty()));
.switchIfEmpty(Mono.just(PagerResult.empty()))
.onErrorReturn(err -> {
log.error("query elastic error", err);
return true;
}, PagerResult.empty());
}
@Override
public <T> Flux<T> query(ElasticIndex index, QueryParam queryParam, Class<T> type) {
return query(searchRequestStructure(queryParam, index))
.flatMapIterable(response -> translate(type, response));
.flatMapIterable(response -> translate(type, response))
.onErrorResume(err -> {
log.error("query elastic error", err);
return Flux.empty();
});
}
@Override
public Mono<Long> count(ElasticIndex index, QueryParam queryParam) {
return countQuery(countRequestStructure(queryParam, index))
.map(CountResponse::getCount)
.switchIfEmpty(Mono.just(0L));
.defaultIfEmpty(0L)
.onErrorReturn(err -> {
log.error("query elastic error", err);
return true;
}, 0L);
}
@ -124,8 +136,12 @@ public class DefaultElasticSearchService implements ElasticSearchService {
drop -> System.err.println("无法处理更多索引请求!"),
BufferOverflowStrategy.DROP_OLDEST)
.flatMap(this::doSave)
.doOnNext((len) -> log.debug("保存ES数据成功,数量:{}", len))
.onErrorContinue((err, obj) -> System.err.println(org.hswebframework.utils.StringUtils.throwable2String(err)))
.doOnNext((len) -> {
if (log.isDebugEnabled() && len > 0) {
log.debug("保存ElasticSearch数据成功,数量:{}", len);
}
})
.onErrorContinue((err, obj) -> System.err.println("保存ElasticSearch数据失败:\n" + org.hswebframework.utils.StringUtils.throwable2String(err)))
.subscribe();
}
@ -159,16 +175,16 @@ public class DefaultElasticSearchService implements ElasticSearchService {
return bulkRequest;
})
.flatMap(bulkRequest ->
Mono.<Boolean>create(sink ->
Mono.<Integer>create(sink ->
restClient.getWriteClient()
.bulkAsync(bulkRequest, RequestOptions.DEFAULT, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse responses) {
if (responses.hasFailures()) {
sink.error(new RuntimeException("批量存储es数据失败" + responses.buildFailureMessage()));
sink.error(new RuntimeException("保存ElasticSearch数据失败:" + responses.buildFailureMessage()));
return;
}
sink.success(!responses.hasFailures());
sink.success(buffers.size());
}
@Override
@ -176,7 +192,7 @@ public class DefaultElasticSearchService implements ElasticSearchService {
sink.error(e);
}
})))
.then(Mono.just(buffers.size()));
.collect(Collectors.summingInt(Integer::intValue));
}
private <T> PagerResult<T> translatePageResult(Class<T> clazz, QueryParam param, SearchResponse response) {
@ -196,19 +212,25 @@ public class DefaultElasticSearchService implements ElasticSearchService {
}
private Mono<SearchResponse> query(Mono<SearchRequest> requestMono) {
return requestMono.flatMap((request) -> Mono.create(sink -> {
restClient.getQueryClient()
.searchAsync(request, RequestOptions.DEFAULT, translatorActionListener(sink));
}
));
return requestMono.<SearchResponse>flatMap((request) ->
Mono.create(sink -> restClient
.getQueryClient()
.searchAsync(request, RequestOptions.DEFAULT, translatorActionListener(sink))))
.onErrorResume(err -> {
log.error("query elastic error", err);
return Mono.empty();
});
}
private Mono<CountResponse> countQuery(Mono<CountRequest> requestMono) {
return requestMono.flatMap((request) -> Mono.create(sink -> {
restClient.getQueryClient()
.countAsync(request, RequestOptions.DEFAULT, translatorActionListener(sink));
}
));
return requestMono.<CountResponse>flatMap((request) ->
Mono.create(sink -> restClient
.getQueryClient()
.countAsync(request, RequestOptions.DEFAULT, translatorActionListener(sink))))
.onErrorResume(err -> {
log.error("query elastic error", err);
return Mono.empty();
});
}
private <T> ActionListener<T> translatorActionListener(MonoSink<T> sink) {
@ -244,8 +266,11 @@ public class DefaultElasticSearchService implements ElasticSearchService {
}
return request;
})
.doOnError(e -> log.error("查询index:" + provider.getStandardIndex() + "元数据错误", e))
.doOnNext(searchRequest -> log.debug("查询index{},es查询参数:{}", provider.getStandardIndex(), searchRequest.source().toString()));
.doOnNext(searchRequest -> log.debug("查询index{},es查询参数:{}", provider.getStandardIndex(), searchRequest.source().toString()))
.onErrorResume(err -> {
log.error("query index error", err);
return Mono.empty();
});
}
private Mono<CountRequest> countRequestStructure(QueryParam queryParam, ElasticIndex provider) {
@ -255,6 +280,9 @@ public class DefaultElasticSearchService implements ElasticSearchService {
return indexOperationService.getIndexMappingMetadata(provider.getStandardIndex())
.map(metadata -> new CountRequest(provider.getStandardIndex())
.source(translateService.translate(tempQueryParam, metadata)))
.doOnNext(searchRequest -> log.debug("查询index{},es查询参数:{}", provider.getStandardIndex(), searchRequest.source().toString()));
.onErrorResume(err -> {
log.error("query index error", err);
return Mono.empty();
});
}
}

View File

@ -44,8 +44,7 @@ public class ESTimeSeriesService extends AbstractTimeSeriesService {
return elasticSearchService.query(
cleverGetIndex(),
filterAddDefaultSort(queryParam),
Map.class
)
Map.class)
.map(map -> new TimeSeriesData() {
@Override
@ -75,20 +74,23 @@ public class ESTimeSeriesService extends AbstractTimeSeriesService {
return aggregationService.bucketAggregation(
filterAddDefaultSort(addQueryTimeRange(param.getQueryParam(), param)),
structure,
cleverGetIndex()
).flatMapMany(bucketResponse -> Flux.fromIterable(new BucketsParser(bucketResponse.getBuckets()).result)
.take(param.getLimit())
.map(ESAggregationData::new));
cleverGetIndex())
.onErrorResume(err -> Mono.empty())
.flatMapMany(bucketResponse -> Flux.fromIterable(new BucketsParser(bucketResponse.getBuckets()).result)
.take(param.getLimit())
.map(ESAggregationData::new))
;
}
@Override
public Mono<Void> save(Publisher<TimeSeriesData> data) {
return elasticSearchService.commit(getDefaultIndex(), Flux.from(data)
return Flux.from(data)
.map(timeSeriesData -> {
Map<String, Object> map = timeSeriesData.getData();
map.put("timestamp", timeSeriesData.getTimestamp());
return map;
}));
})
.as(stream -> elasticSearchService.commit(getDefaultIndex(), stream));
}
@Override