diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java index 225f06a0..addd6365 100644 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java @@ -171,47 +171,31 @@ public class DefaultElasticSearchService implements ElasticSearchService { protected Mono doSave(Collection buffers) { return Flux.fromIterable(buffers) - .collect(Collectors.groupingBy(Buffer::getIndex)) - .flatMapMany(map -> Flux - .fromIterable(map.entrySet()) - .flatMap(e -> - this.getIndexForSave(e.getKey()) - .zipWith(indexManager.getIndexMetadata(e.getKey()), (index, metadata) -> Tuples.of(index, metadata, e.getValue())))) - .map(entry -> { - String index = entry.getT1(); - BulkRequest bulkRequest = new BulkRequest(index, "_doc"); - for (Buffer buffer : entry.getT3()) { - IndexRequest request = new IndexRequest(); - Object o = JSON.toJSON(buffer.getPayload()); - if (o instanceof Map) { - request.source(ElasticSearchConverter.convertDataToElastic((Map) o, entry.getT2().getProperties())); - } else { - request.source(o.toString(), XContentType.JSON); - } - bulkRequest.add(request); - } - entry.getT3().clear(); - return bulkRequest; + .groupBy(Buffer::getIndex) + .flatMap(group -> { + String index = group.key(); + return this.getIndexForSave(index) + .zipWith(indexManager.getIndexMetadata(index)) + .flatMapMany(tp2 -> + group.map(buffer -> { + IndexRequest request = new IndexRequest(tp2.getT1(),"_doc"); + Object o = JSON.toJSON(buffer.getPayload()); + if (o instanceof Map) { + request.source(tp2.getT2().convertToElastic((Map) o)); + } else { + request.source(o.toString(), XContentType.JSON); + } + return request; + })); }) - .flatMap(bulkRequest -> - Mono.create(sink -> - restClient.getWriteClient() - .bulkAsync(bulkRequest, RequestOptions.DEFAULT, new ActionListener() { - @Override - public void onResponse(BulkResponse responses) { - if (responses.hasFailures()) { - sink.error(new RuntimeException("保存ElasticSearch数据失败:" + responses.buildFailureMessage())); - return; - } - sink.success(1); - } - - @Override - public void onFailure(Exception e) { - sink.error(e); - } - }))) - .then(Mono.just(buffers.size())); + .collectList() + .flatMap(lst -> { + BulkRequest request = new BulkRequest(); + lst.forEach(request::add); + return ReactorActionListener.mono(listener -> { + restClient.getWriteClient().bulkAsync(request, RequestOptions.DEFAULT, listener); + }); + }).thenReturn(buffers.size()); } private PagerResult translatePageResult(Function, T> mapper, QueryParam param, SearchResponse response) {