优化ElasticSearchService

This commit is contained in:
zhouhao 2022-02-08 14:44:06 +08:00
parent 0d62f1c9eb
commit 7eb0a320d8
2 changed files with 303 additions and 189 deletions

View File

@ -0,0 +1,88 @@
package org.jetlinks.community.utils;
import java.util.function.Supplier;
/**
 * JVM-level helpers: a heap "watermark" check that callers can use to shed load while
 * memory is low, and a rate-limited printer that writes diagnostics straight to
 * {@code System.err}.
 */
public class SystemUtils {

    /**
     * Minimum fraction (0-1) of heap that must remain available. Read from the
     * {@code memory.watermark} system property, falling back to the legacy
     * {@code memory.waterline} property and finally to {@code 0.1}.
     */
    static float memoryWatermark = Float.parseFloat(
        System.getProperty("memory.watermark", System.getProperty("memory.waterline", "0.1")));

    /**
     * How long (in milliseconds) the remaining memory has to stay below the watermark
     * before {@link #memoryIsOutOfWatermark()} starts reporting {@code true}.
     * Configured through {@code memory.watermark.duration}, default {@code 5s}.
     */
    static long memoryWatermarkDuration = TimeUtils
        .parse(System.getProperty("memory.watermark.duration", "5s"))
        .toMillis();

    /**
     * Minimum interval (in milliseconds) between two messages emitted by {@code printError}.
     * <p>
     * This used to read {@code memory.watermark.duration}, so tuning the watermark hold
     * time silently changed the print throttle as well. It now has its own property,
     * {@code system.error.print.interval}, with the same default value as before.
     * NOTE(review): a bare number such as "500" is presumably parsed by TimeUtils as
     * milliseconds - confirm against TimeUtils.parse.
     */
    static long errorPintInterval = TimeUtils
        .parse(System.getProperty("system.error.print.interval", "500"))
        .toMillis();

    /**
     * Computes the remaining-memory ratio as {@code (max - total + free) / max} from
     * {@link Runtime}. Kept as a replaceable, package-visible supplier.
     */
    static Supplier<Float> memoryRemainderSupplier = () -> {
        Runtime rt = Runtime.getRuntime();
        long free = rt.freeMemory();
        long total = rt.totalMemory();
        long max = rt.maxMemory();
        return (max - total + free) / (max + 0.0F);
    };

    /**
     * Returns the fraction of memory still available, between 0 and 1. The smaller the
     * value, the less memory is left.
     *
     * @return remaining memory ratio
     */
    public static float getMemoryRemainder() {
        return memoryRemainderSupplier.get();
    }

    /** Timestamp (ms) at which memory was first seen below the watermark; 0 means "not below". */
    private static volatile long outTimes = 0;

    /** Timestamp (ms) of the last message actually printed by {@code printError}. */
    private static volatile long lastPrintTime = 0;

    /**
     * Tells whether the remaining memory has been below the watermark continuously for
     * longer than {@code memoryWatermarkDuration}.
     * <p>
     * The first call that observes low memory only records the time and returns
     * {@code false}. Any call that observes enough memory resets the record. Every call
     * that returns {@code true} also requests a garbage collection via
     * {@link System#gc()}.
     *
     * @return {@code true} if memory has stayed below the watermark for the configured duration
     */
    public static boolean memoryIsOutOfWatermark() {
        boolean out = getMemoryRemainder() < memoryWatermark;
        if (!out) {
            outTimes = 0;
            return false;
        }
        // Read the volatile field exactly once. Re-reading it after the zero check allowed a
        // concurrent reset to 0 to slip in, which made "now - 0" look like a huge duration
        // and produced a spurious "out of watermark" result plus an unnecessary GC request.
        long firstSeenLow = outTimes;
        if (firstSeenLow == 0) {
            // first observation below the watermark: start the clock
            outTimes = System.currentTimeMillis();
            return false;
        }
        if (System.currentTimeMillis() - firstSeenLow > memoryWatermarkDuration) {
            System.gc();
            return true;
        }
        return false;
    }

    /**
     * Prints a message directly to the console ({@code System.err}) with format support,
     * e.g. <code>printError("save error %s",id);</code>
     * <p>
     * Output is throttled, see {@link #printError(String, Supplier)}.
     *
     * @param format format string
     * @param args   format arguments
     * @see java.util.Formatter
     */
    public static void printError(String format, Object... args) {
        printError(format, () -> args);
    }

    /**
     * Prints a message directly to the console ({@code System.err}) with format support,
     * e.g. <code>printError("save error %s",id);</code>
     * <p>
     * To keep frequent printing from blocking threads, a message is dropped when the
     * previous one was printed less than {@code errorPintInterval} milliseconds ago; in
     * that case {@code argSupplier} is not invoked.
     * <p>
     * If {@code format} is not a valid format string for the supplied arguments (for
     * example a raw message containing {@code %}), the text is printed verbatim instead
     * of throwing, so this diagnostic helper cannot break the caller's error path.
     *
     * @param format      format string
     * @param argSupplier supplier of the format arguments, evaluated only when printing
     * @see java.util.Formatter
     */
    public static void printError(String format, Supplier<Object[]> argSupplier) {
        long now = System.currentTimeMillis();
        if (now - lastPrintTime > errorPintInterval) {
            lastPrintTime = now;
            Object[] args = argSupplier.get();
            String message;
            try {
                message = String.format(format, args);
            } catch (java.util.IllegalFormatException e) {
                // not a usable format string: fall back to the raw text
                message = format;
            }
            System.err.println(message);
        }
    }
}

View File

@ -4,18 +4,15 @@ import com.alibaba.fastjson.JSON;
import io.netty.util.internal.ObjectPool;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
@ -32,6 +29,7 @@ import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
import org.jetlinks.community.elastic.search.service.ElasticSearchService;
import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter;
import org.jetlinks.community.elastic.search.utils.QueryParamTranslator;
import org.jetlinks.community.utils.SystemUtils;
import org.jetlinks.core.utils.FluxUtils;
import org.reactivestreams.Publisher;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ -68,28 +66,26 @@ import java.util.stream.Collectors;
@ConfigurationProperties(prefix = "elasticsearch")
public class ReactiveElasticSearchService implements ElasticSearchService {
@Getter
private final ReactiveElasticsearchClient restClient;
@Getter
private final ElasticSearchIndexManager indexManager;
private FluxSink<Buffer> sink;
public static final IndicesOptions indexOptions = IndicesOptions.fromOptions(
true, true, false, false
);
//使用对象池处理Buffer,减少GC消耗
static ObjectPool<Buffer> pool = ObjectPool.newPool(Buffer::new);
static {
DateFormatter.supportFormatter.add(new DefaultDateFormatter(Pattern.compile("[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.+"), "yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
}
private final ReactiveElasticsearchClient restClient;
private final ElasticSearchIndexManager indexManager;
private FluxSink<Buffer> sink;
@Getter
@Setter
private BufferConfig buffer = new BufferConfig();
public ReactiveElasticSearchService(ReactiveElasticsearchClient restClient,
ElasticSearchIndexManager indexManager) {
this.restClient = restClient;
init();
this.indexManager = indexManager;
init();
}
@Override
@ -124,23 +120,34 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
});
}
@Override
public <T> Flux<T> query(String index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
return this
.doQuery(new String[]{index}, queryParam)
.flatMapMany(tp2 -> convertQueryResult(tp2.getT1(), tp2.getT2(), mapper));
return this.query(new String[]{index}, queryParam, mapper);
}
@Override
public <T> Flux<T> query(String[] index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
if (queryParam.isPaging()) {
return this
.doQuery(index, queryParam)
.flatMapMany(tp2 -> convertQueryResult(tp2.getT1(), tp2.getT2(), mapper));
}
return this
.doQuery(index, queryParam)
.flatMapMany(tp2 -> convertQueryResult(tp2.getT1(), tp2.getT2(), mapper));
.doScrollQuery(index, queryParam)
.flatMap(tp2 -> convertQueryHit(tp2.getT1(), tp2.getT2(), mapper));
}
@Override
public <T> Mono<PagerResult<T>> queryPager(String[] index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
return this.doQuery(index, queryParam)
if (!queryParam.isPaging()) {
return Mono
.zip(
this.count(index, queryParam),
this.query(index, queryParam, mapper).collectList(),
(total, list) -> PagerResult.of(total.intValue(), list, queryParam)
)
.switchIfEmpty(Mono.fromSupplier(PagerResult::empty));
}
return this
.doQuery(index, queryParam)
.flatMap(tp2 -> this
.convertQueryResult(tp2.getT1(), tp2.getT2(), mapper)
.collectList()
@ -169,8 +176,30 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
}
return mapper
.apply(Optional
.ofNullable(metadata.get(hit.getIndex())).orElse(indexList.get(0))
.convertFromElastic(hitMap));
.ofNullable(metadata.get(hit.getIndex())).orElse(indexList.get(0))
.convertFromElastic(hitMap));
});
}
/**
 * Converts a single search hit into the caller's result type.
 * <p>
 * The hit's source map gets an {@code id} entry (taken from the hit id) when it has none,
 * is converted by the metadata of the index the hit came from, and is then passed to
 * {@code mapper}. When no metadata is registered under the hit's index name, the first
 * entry of {@code indexList} is used.
 *
 * @param indexList metadata of the queried indexes
 * @param searchHit hit to convert
 * @param mapper    maps the converted source map to the result type
 * @param <T>       result type
 * @return a flux emitting the single converted element
 */
private <T> Flux<T> convertQueryHit(List<ElasticSearchIndexMetadata> indexList,
                                    SearchHit searchHit,
                                    Function<Map<String, Object>, T> mapper) {
    // index name -> metadata lookup
    Map<String, ElasticSearchIndexMetadata> byIndexName = indexList
        .stream()
        .collect(Collectors.toMap(ElasticSearchIndexMetadata::getIndex, Function.identity()));

    return Flux
        .just(searchHit)
        .map(hit -> {
            Map<String, Object> source = hit.getSourceAsMap();
            if (StringUtils.isEmpty(source.get("id"))) {
                source.put("id", hit.getId());
            }
            ElasticSearchIndexMetadata fallback = indexList.get(0);
            ElasticSearchIndexMetadata owner = byIndexName.get(hit.getIndex());
            if (owner == null) {
                owner = fallback;
            }
            return mapper.apply(owner.convertFromElastic(source));
        });
}
@ -185,52 +214,79 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
.createSearchRequest(queryParam, metadataList)
.flatMap(restClient::searchForPage)
.map(response -> Tuples.of(metadataList, response))
).onErrorResume(err -> {
log.error(err.getMessage(), err);
return Mono.empty();
});
)
;
}
/**
 * Runs a scroll query over the given indexes.
 * <p>
 * Resolves the metadata of {@code index}, builds a search request from
 * {@code queryParam} and streams the hits returned by {@code restClient.scroll}, pairing
 * each hit with the resolved metadata list. Nothing is emitted when no metadata could be
 * resolved.
 *
 * @param index      names of the indexes to query
 * @param queryParam query conditions
 * @return tuples of (metadata list, search hit)
 */
private Flux<Tuple2<List<ElasticSearchIndexMetadata>, SearchHit>> doScrollQuery(String[] index,
                                                                                QueryParam queryParam) {
    Mono<List<ElasticSearchIndexMetadata>> resolvedMetadata = indexManager
        .getIndexesMetadata(index)
        .collectList()
        .filter(CollectionUtils::isNotEmpty);

    return resolvedMetadata.flatMapMany(metadataList -> {
        Mono<SearchRequest> searchRequest = this.createSearchRequest(queryParam, metadataList);
        return searchRequest
            .flatMapMany(restClient::scroll)
            .map(hit -> Tuples.of(metadataList, hit));
    });
}
@Override
public Mono<Long> count(String[] index, QueryParam queryParam) {
QueryParam param = queryParam.clone();
param.setPaging(false);
return createSearchRequest(param, index)
return this
.createSearchRequest(param, index)
.flatMap(this::doCount)
.defaultIfEmpty(0L)
.onErrorReturn(err -> {
log.error("query elastic error", err);
return true;
}, 0L);
.defaultIfEmpty(0L);
}
/**
 * Deletes the documents of {@code index} that match {@code queryParam}.
 * <p>
 * The index name used for the request is resolved through {@code getIndexForSearch};
 * the query is built by {@code createQueryBuilder}.
 *
 * @param index      logical index name
 * @param queryParam conditions selecting the documents to delete
 * @return number of deleted documents, or {@code 0} when the pipeline yields nothing
 */
@Override
public Mono<Long> delete(String index, QueryParam queryParam) {
    Mono<Long> deletedCount = this
        .getIndexForSearch(index)
        .flatMap(searchIndex -> {
            Mono<QueryBuilder> query = this.createQueryBuilder(queryParam, index);
            return query
                .flatMap(condition -> restClient
                    .deleteBy(deleteRequest -> deleteRequest.setQuery(condition).indices(searchIndex)))
                .map(BulkByScrollResponse::getDeleted);
        });
    return deletedCount.defaultIfEmpty(0L);
}
return createQueryBuilder(queryParam, index)
.flatMap(request -> restClient.deleteBy(delete -> delete.setQuery(request).indices(index)))
.map(BulkByScrollResponse::getDeleted);
/**
 * Decides whether a write for {@code index} may be accepted right now.
 * <p>
 * Writes are refused while {@code SystemUtils.memoryIsOutOfWatermark()} reports that the
 * JVM is short of memory; the refusal is reported through {@code SystemUtils.printError}.
 *
 * @param index index the write targets (only used in the message)
 * @return {@code true} if the write may proceed, {@code false} if it must be dropped
 */
private boolean checkWritable(String index) {
    boolean memoryExhausted = SystemUtils.memoryIsOutOfWatermark();
    if (memoryExhausted) {
        SystemUtils.printError("JVM内存不足,elasticsearch无法处理更多索引[%s]请求!", index);
    }
    return !memoryExhausted;
}
@Override
public <T> Mono<Void> commit(String index, T payload) {
sink.next(Buffer.of(index, payload));
if (checkWritable(index)) {
sink.next(Buffer.of(index, payload));
}
return Mono.empty();
}
@Override
public <T> Mono<Void> commit(String index, Collection<T> payload) {
for (T t : payload) {
sink.next(Buffer.of(index, t));
if (checkWritable(index)) {
for (T t : payload) {
sink.next(Buffer.of(index, t));
}
}
return Mono.empty();
}
@Override
public <T> Mono<Void> commit(String index, Publisher<T> data) {
if (!checkWritable(index)) {
return Mono.empty();
}
return Flux.from(data)
.flatMap(d -> commit(index, d))
.then();
.flatMap(d -> commit(index, d))
.then();
}
@Override
@ -241,10 +297,10 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
@Override
public <T> Mono<Void> save(String index, Publisher<T> data) {
return Flux.from(data)
.map(v -> Buffer.of(index, v))
.collectList()
.flatMap(this::doSave)
.then();
.map(v -> Buffer.of(index, v))
.collectList()
.flatMap(this::doSave)
.then();
}
@Override
@ -257,6 +313,34 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
sink.complete();
}
@Getter
@Setter
private BufferConfig buffer = new BufferConfig();
/**
 * Tunables for the buffered write pipeline. Most defaults are read from JVM system
 * properties; accessors are generated by Lombok.
 */
@Getter
@Setter
public static class BufferConfig {
// Minimum interval; passed to FluxUtils.bufferRate as the flush rate.
// NOTE(review): the unit is not visible here - confirm against FluxUtils.bufferRate.
private int rate = Integer.getInteger("elasticsearch.buffer.rate", 1000);
// Maximum number of buffered items.
private int bufferSize = Integer.getInteger("elasticsearch.buffer.size", 3000);
// Buffer timeout (the system property value is taken as seconds).
private Duration bufferTimeout = Duration.ofSeconds(Integer.getInteger("elasticsearch.buffer.timeout", 3));
// Limit on the number of batches that may pile up under backpressure;
// defaults to the number of available processors.
private int bufferBackpressure = Integer.getInteger("elasticsearch.buffer.backpressure", Runtime
.getRuntime()
.availableProcessors());
// Maximum buffered bytes.
private DataSize bufferBytes = DataSize.parse(System.getProperty("elasticsearch.buffer.bytes", "15MB"));
// Maximum number of retries.
private int maxRetry = 3;
// Retry interval.
// NOTE(review): the code consuming maxRetry/minBackoff is not visible here - confirm usage.
private Duration minBackoff = Duration.ofSeconds(3);
// When true, bulk requests are sent with WriteRequest.RefreshPolicy.IMMEDIATE.
private boolean refreshWhenWrite = false;
}
//@PostConstruct
public void init() {
int flushRate = buffer.rate;
@ -268,41 +352,81 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
FluxUtils
.bufferRate(Flux.<Buffer>create(sink -> this.sink = sink),
flushRate,
bufferSize,
bufferTimeout,
(b, l) -> bufferedBytes.addAndGet(b.numberOfBytes()) >= bufferBytes)
flushRate,
bufferSize,
bufferTimeout,
(b, l) -> bufferedBytes.addAndGet(b.numberOfBytes()) >= bufferBytes)
.doOnNext(buf -> bufferedBytes.set(0))
.onBackpressureBuffer(bufferBackpressure, drop -> {
// TODO: 2020/11/25 将丢弃的数据存储到本地磁盘
drop.forEach(Buffer::release);
System.err.println("elasticsearch无法处理更多索引请求!丢弃数据数量:" + drop.size());
SystemUtils.printError("elasticsearch无法处理更多索引请求!丢弃数据数量:%d", drop.size());
}, BufferOverflowStrategy.DROP_OLDEST)
.publishOn(Schedulers.boundedElastic(), bufferBackpressure)
.flatMap(buffers -> {
long time = System.currentTimeMillis();
return Mono.create(sink -> {
try {
this
.doSave(buffers)
.doOnNext((len) -> log.trace("保存ElasticSearch数据成功,数量:{},耗时:{}ms", len, (System.currentTimeMillis() - time)))
.doOnError((err) -> {
//这里的错误都输出到控制台,输入到slf4j可能会造成日志递归.
System.err.println("保存ElasticSearch数据失败:\n" + org.hswebframework.utils.StringUtils.throwable2String(err));
})
.doFinally((s) -> sink.success())
.subscribe();
sink.onCancel(this
.doSave(buffers)
.doFinally((s) -> sink.success())
.subscribe());
} catch (Exception e) {
sink.success();
}
});
})
.onErrorResume((err) -> Mono
.fromRunnable(() -> System.err.println("保存ElasticSearch数据失败:\n" +
org.hswebframework.utils.StringUtils.throwable2String(err))))
.fromRunnable(() -> SystemUtils.printError("保存ElasticSearch数据失败:\n" +
org.hswebframework.utils.StringUtils.throwable2String(err))))
.subscribe();
}
static ObjectPool<Buffer> pool = ObjectPool.newPool(Buffer::new);
/**
 * One pending document write: target index, optional document id and the JSON payload.
 * Instances are taken from the enclosing class's object pool and handed back through
 * {@link #release()}.
 */
@Getter
static class Buffer {
    // pool handle used to recycle this instance; null when it was not obtained from the pool
    final ObjectPool.Handle<Buffer> handle;
    String index;
    String id;
    String payload;

    public Buffer(ObjectPool.Handle<Buffer> handle) {
        this.handle = handle;
    }

    /**
     * Creates a buffer for {@code payload} targeting {@code index}.
     * <p>
     * A {@code Map} payload is used as is, anything else is copied into a new
     * {@code HashMap}. The {@code id} entry, if present, becomes the buffer id and the
     * map is serialized to JSON.
     *
     * @param index   target index
     * @param payload document to write
     * @return the populated buffer
     */
    public static Buffer of(String index, Object payload) {
        Buffer instance = acquire();
        instance.index = index;

        Map<String, Object> document;
        if (payload instanceof Map) {
            document = (Map) payload;
        } else {
            document = FastBeanCopier.copy(payload, HashMap::new);
        }

        Object rawId = document.get("id");
        instance.id = rawId == null ? null : String.valueOf(rawId);
        instance.payload = JSON.toJSONString(document);
        return instance;
    }

    // Takes an instance from the pool; falls back to an unpooled one if the pool fails.
    private static Buffer acquire() {
        try {
            return pool.get();
        } catch (Throwable e) {
            return new Buffer(null);
        }
    }

    /**
     * Clears all fields and, for pooled instances, returns this object to the pool.
     */
    void release() {
        index = null;
        id = null;
        payload = null;
        if (handle != null) {
            handle.recycle(this);
        }
    }

    /**
     * @return payload size counted as two bytes per character, 0 when there is no payload
     */
    int numberOfBytes() {
        if (payload == null) {
            return 0;
        }
        return payload.length() * 2;
    }
}
private Mono<String> getIndexForSave(String index) {
return indexManager
.getIndexStrategy(index)
@ -318,7 +442,8 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
}
protected Mono<Integer> doSave(Collection<Buffer> buffers) {
return Flux.fromIterable(buffers)
return Flux
.fromIterable(buffers)
.groupBy(Buffer::getIndex, Integer.MAX_VALUE)
.flatMap(group -> {
String index = group.key();
@ -345,6 +470,9 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
.flatMap(lst -> {
BulkRequest request = new BulkRequest();
request.timeout(TimeValue.timeValueSeconds(9));
if (buffer.refreshWhenWrite) {
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
}
lst.forEach(request::add);
return restClient
.bulk(request)
@ -355,53 +483,35 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
return save;
});
})
.doOnError((err) -> {
//这里的错误都输出到控制台,输入到slf4j可能会造成日志递归.
SystemUtils.printError("保存ElasticSearch数据失败:\n%s", () -> new Object[]{
org.hswebframework.utils.StringUtils.throwable2String(err)
});
})
.doOnNext(response -> {
log.trace("保存ElasticSearch数据成功,数量:{},耗时:{}", response.getItems().length, response.getTook());
if (response.hasFailures()) {
System.err.println(response.buildFailureMessage());
SystemUtils.printError(response.buildFailureMessage());
}
})
.thenReturn(buffers.size());
}
@SneakyThrows
protected void checkResponse(BulkResponse response) {
if (response.hasFailures()) {
for (BulkItemResponse item : response.getItems()) {
if (item.isFailed()) {
throw item.getFailure().getCause();
}
}
}
}
private <T> List<T> translate(Function<Map<String, Object>, T> mapper, SearchResponse response) {
return Arrays.stream(response.getHits().getHits())
.map(hit -> {
Map<String, Object> hitMap = hit.getSourceAsMap();
if (StringUtils.isEmpty(hitMap.get("id"))) {
hitMap.put("id", hit.getId());
}
return mapper.apply(hitMap);
})
.collect(Collectors.toList());
}
private Flux<SearchHit> doSearch(SearchRequest request) {
return restClient
.search(request)
.onErrorResume(err -> {
log.error("query elastic error", err);
return Mono.empty();
});
.map(hit -> {
Map<String, Object> hitMap = hit.getSourceAsMap();
if (StringUtils.isEmpty(hitMap.get("id"))) {
hitMap.put("id", hit.getId());
}
return mapper.apply(hitMap);
})
.collect(Collectors.toList());
}
private Mono<Long> doCount(SearchRequest request) {
return restClient
.count(request)
.onErrorResume(err -> {
log.error("query elastic error", err);
return Mono.empty();
});
return restClient.count(request);
}
protected Mono<SearchRequest> createSearchRequest(QueryParam queryParam, String... indexes) {
@ -416,12 +526,12 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0));
return Flux.fromIterable(indexes)
.flatMap(index -> getIndexForSearch(index.getIndex()))
.collectList()
.map(indexList ->
new SearchRequest(indexList.toArray(new String[0]))
.source(builder)
.indicesOptions(indexOptions));
.flatMap(index -> getIndexForSearch(index.getIndex()))
.collectList()
.map(indexList ->
new SearchRequest(indexList.toArray(new String[0]))
.source(builder)
.indicesOptions(indexOptions));
}
protected Mono<QueryBuilder> createQueryBuilder(QueryParam queryParam, String index) {
@ -430,88 +540,4 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
.map(metadata -> QueryParamTranslator.createQueryBuilder(queryParam, metadata))
.switchIfEmpty(Mono.fromSupplier(() -> QueryParamTranslator.createQueryBuilder(queryParam, null)));
}
protected Mono<CountRequest> createCountRequest(QueryParam queryParam, List<ElasticSearchIndexMetadata> indexes) {
QueryParam tempQueryParam = queryParam.clone();
tempQueryParam.setPaging(false);
tempQueryParam.setSorts(Collections.emptyList());
SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0));
return Flux.fromIterable(indexes)
.flatMap(index -> getIndexForSearch(index.getIndex()))
.collectList()
.map(indexList -> new CountRequest(indexList.toArray(new String[0])).source(builder));
}
private Mono<CountRequest> createCountRequest(QueryParam queryParam, String... index) {
return indexManager
.getIndexesMetadata(index)
.collectList()
.filter(CollectionUtils::isNotEmpty)
.flatMap(list -> createCountRequest(queryParam, list));
}
@Getter
@Setter
public static class BufferConfig {
//最小间隔
private int rate = Integer.getInteger("elasticsearch.buffer.rate", 1000);
//缓冲最大数量
private int bufferSize = Integer.getInteger("elasticsearch.buffer.size", 3000);
//缓冲超时时间
private Duration bufferTimeout = Duration.ofSeconds(Integer.getInteger("elasticsearch.buffer.timeout", 3));
//背压堆积数量限制.
private int bufferBackpressure = Integer.getInteger("elasticsearch.buffer.backpressure", Runtime
.getRuntime()
.availableProcessors());
//最大缓冲字节
private DataSize bufferBytes = DataSize.parse(System.getProperty("elasticsearch.buffer.bytes", "15MB"));
//最大重试次数
private int maxRetry = 3;
//重试间隔
private Duration minBackoff = Duration.ofSeconds(3);
}
@Getter
static class Buffer {
final ObjectPool.Handle<Buffer> handle;
String index;
String id;
String payload;
public Buffer(ObjectPool.Handle<Buffer> handle) {
this.handle = handle;
}
public static Buffer of(String index, Object payload) {
Buffer buffer;
try {
buffer = pool.get();
} catch (Exception e) {
buffer = new Buffer(null);
}
buffer.index = index;
Map<String, Object> data = payload instanceof Map
? ((Map) payload) :
FastBeanCopier.copy(payload, HashMap::new);
Object id = data.get("id");
buffer.id = id == null ? null : String.valueOf(id);
buffer.payload = JSON.toJSONString(data);
return buffer;
}
void release() {
this.index = null;
this.id = null;
this.payload = null;
if (null != handle) {
handle.recycle(this);
}
}
int numberOfBytes() {
return payload == null ? 0 : payload.length() * 2;
}
}
}