优化es数据写入性能

This commit is contained in:
zhouhao 2022-09-20 15:34:23 +08:00
parent de028b763f
commit 0aac04ba10
10 changed files with 1026 additions and 131 deletions

View File

@ -44,5 +44,16 @@
<groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId>
</dependency>
<dependency>
<groupId>de.ruedigermoeller</groupId>
<artifactId>fst</artifactId>
<version>2.57</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,25 @@
package org.jetlinks.community.buffer;
import lombok.Getter;
import lombok.Setter;
import java.time.Duration;
@Getter
@Setter
public class BufferProperties {
    // Directory in which the buffer files are stored.
    private String filePath;
    // Buffer size; once the buffer exceeds this size the handler logic is executed.
    private int size = 1000;
    // Buffer timeout; a flush is triggered when this much time has elapsed.
    private Duration timeout = Duration.ofSeconds(1);
    // Parallelism: maximum number of threads allowed to write concurrently.
    private int parallelism = Math.max(1, Runtime.getRuntime().availableProcessors());
    // Maximum number of retries; data exceeding this count is moved to the dead queue.
    private long maxRetryTimes = 64;
}

View File

@ -0,0 +1,134 @@
package org.jetlinks.community.buffer;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.jetlinks.community.utils.ErrorUtils;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.dao.QueryTimeoutException;
import org.springframework.transaction.CannotCreateTransactionException;
import java.io.IOException;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
/**
* @author zhouhao
* @since 2.0
*/
@Getter
@AllArgsConstructor
public class BufferSettings {

    /**
     * Default retry predicate: retry when the failure is an I/O, timeout,
     * or (transient) data-access/transaction problem anywhere in the cause chain.
     */
    private static final Predicate<Throwable> DEFAULT_RETRY_WHEN_ERROR =
        e -> ErrorUtils.hasException(e, IOException.class,
                                     TimeoutException.class,
                                     DataAccessResourceFailureException.class,
                                     CannotCreateTransactionException.class,
                                     QueryTimeoutException.class);

    public static Predicate<Throwable> defaultRetryWhenError() {
        return DEFAULT_RETRY_WHEN_ERROR;
    }

    // Directory in which the buffer file lives.
    private final String filePath;
    // Buffer file name.
    private final String fileName;
    // Decides whether a failed batch should be retried.
    private final Predicate<Throwable> retryWhenError;
    // Buffer size; once exceeded the handler logic is executed.
    private final int bufferSize;
    // Buffer timeout; a flush is triggered when this much time has elapsed.
    private final Duration bufferTimeout;
    // Parallelism: maximum number of threads allowed to write concurrently.
    private final int parallelism;
    // Maximum number of retries; data exceeding this count goes to the dead queue.
    // NOTE: this default (5) differs from BufferProperties' default (64).
    private final long maxRetryTimes;

    public static BufferSettings create(String filePath, String fileName) {
        return new BufferSettings(
            filePath,
            fileName,
            defaultRetryWhenError(),
            1000,
            Duration.ofSeconds(1),
            Math.max(1, Runtime.getRuntime().availableProcessors() / 2),
            5);
    }

    public static BufferSettings create(BufferProperties properties) {
        return create("buffer.queue", properties);
    }

    public static BufferSettings create(String fileName, BufferProperties properties) {
        return create(properties.getFilePath(), fileName).properties(properties);
    }

    /**
     * Single construction point for the immutable "wither" methods below;
     * filePath and fileName never change after creation.
     */
    private BufferSettings copy(Predicate<Throwable> retryWhenError,
                                int bufferSize,
                                Duration bufferTimeout,
                                int parallelism,
                                long maxRetryTimes) {
        return new BufferSettings(filePath,
                                  fileName,
                                  retryWhenError,
                                  bufferSize,
                                  bufferTimeout,
                                  parallelism,
                                  maxRetryTimes);
    }

    public BufferSettings bufferSize(int bufferSize) {
        return copy(retryWhenError, bufferSize, bufferTimeout, parallelism, maxRetryTimes);
    }

    public BufferSettings bufferTimeout(Duration bufferTimeout) {
        return copy(retryWhenError, bufferSize, bufferTimeout, parallelism, maxRetryTimes);
    }

    public BufferSettings parallelism(int parallelism) {
        return copy(retryWhenError, bufferSize, bufferTimeout, parallelism, maxRetryTimes);
    }

    public BufferSettings maxRetry(int maxRetryTimes) {
        return copy(retryWhenError, bufferSize, bufferTimeout, parallelism, maxRetryTimes);
    }

    public BufferSettings retryWhenError(Predicate<Throwable> retryWhenError) {
        return copy(Objects.requireNonNull(retryWhenError),
                    bufferSize, bufferTimeout, parallelism, maxRetryTimes);
    }

    /**
     * Copies size/timeout/parallelism/maxRetryTimes from the given properties;
     * retains the current retryWhenError predicate.
     */
    public BufferSettings properties(BufferProperties properties) {
        return copy(Objects.requireNonNull(retryWhenError),
                    properties.getSize(),
                    properties.getTimeout(),
                    properties.getParallelism(),
                    properties.getMaxRetryTimes());
    }
}

View File

@ -0,0 +1,7 @@
package org.jetlinks.community.buffer;
/**
 * Lets buffered values report their own approximate in-memory footprint,
 * used by the persistence layer's memory accounting (see PersistenceBuffer.BufDataType).
 */
public interface MemoryUsage {
    // Approximate memory usage of this object, in bytes.
    int usage();
}

View File

@ -0,0 +1,571 @@
package org.jetlinks.community.buffer;
import io.netty.buffer.*;
import io.netty.util.ReferenceCountUtil;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.SneakyThrows;
import org.h2.mvstore.WriteBuffer;
import org.h2.mvstore.type.BasicDataType;
import org.jetlinks.community.codec.Serializers;
import org.jetlinks.core.cache.FileQueue;
import org.jetlinks.core.cache.FileQueueProxy;
import org.jetlinks.core.utils.SerializeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import javax.annotation.Nonnull;
import java.io.*;
import java.lang.reflect.Array;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
/**
* 支持持久化的缓存批量操作工具,用于支持数据的批量操作,如批量写入数据到数据库等.
* <p>
* 数据将保存在一个文件队列里,如果写入速度跟不上,数据将会尝试写入到本地文件中.
*
* <pre>{@code
*
* BufferWriter<Data> writer = BufferWriter
* .<Data>create(
* "./data/buffer", //文件目录
* "my-data.queue", //文件名
* buffer->{
* return saveData(buffer);
* })
* .bufferSize(1000)//缓冲大小,当缓冲区超过此数量时将会立即执行写出操作.
* .bufferTimeout(Duration.ofSeconds(2))// 缓冲超时时间,当缓冲区超过此时间将会立即执行写出操作.
* .parallelism(2); //并行度,表示支持并行写入的线程数.
*
* //开始批量写入数据
* writer.start();
*
* //写入缓冲区
* writer.write(data);
* }</pre>
*
* @param <T> 数据类型,需要实现Serializable接口
* @author zhouhao
* @since pro 2.0
*/
public class PersistenceBuffer<T extends Serializable> implements Disposable {

    // Field updaters are used instead of Atomic* instances so the counters
    // live inline in this object (one less allocation per buffer).
    @SuppressWarnings("all")
    private final static AtomicIntegerFieldUpdater<PersistenceBuffer> WIP =
        AtomicIntegerFieldUpdater.newUpdater(PersistenceBuffer.class, "wip");

    @SuppressWarnings("all")
    private final static AtomicIntegerFieldUpdater<PersistenceBuffer> REMAINDER =
        AtomicIntegerFieldUpdater.newUpdater(PersistenceBuffer.class, "remainder");

    // NOTE: "SZIE" is a typo for "SIZE" kept for the field name only; it is private.
    @SuppressWarnings("all")
    private final static AtomicIntegerFieldUpdater<PersistenceBuffer> DEAD_SZIE =
        AtomicIntegerFieldUpdater.newUpdater(PersistenceBuffer.class, "deadSize");

    @SuppressWarnings("all")
    private final static AtomicReferenceFieldUpdater<PersistenceBuffer, Collection> BUFFER =
        AtomicReferenceFieldUpdater.newUpdater(PersistenceBuffer.class, Collection.class, "buffer");

    // Boolean CAS relies on autoboxing returning the cached Boolean.TRUE/FALSE
    // instances, so the identity compare inside compareAndSet works.
    @SuppressWarnings("all")
    private final static AtomicReferenceFieldUpdater<PersistenceBuffer, Boolean> DISPOSED =
        AtomicReferenceFieldUpdater.newUpdater(PersistenceBuffer.class, Boolean.class, "disposed");

    private Logger logger = LoggerFactory.getLogger(PersistenceBuffer.class);

    @Getter
    private String name = "unknown";

    // Main file-backed queue holding data waiting to be flushed.
    // (The original comment said "dead queue" here - copy/paste mistake.)
    private FileQueue<Buf<T>> queue;

    // Dead queue: stores data that could not be processed after maxRetryTimes.
    private FileQueue<Buf<T>> deadQueue;

    // Handler that actually processes a batch (e.g. writes it to a database).
    // A result of true means "requeue this batch and retry later".
    private final Function<Flux<T>, Mono<Boolean>> handler;

    // Buffer size and other tunables; when the buffer exceeds the configured
    // size the handler logic is executed.
    private BufferSettings settings;

    // In-memory staging buffer; swapped atomically via BUFFER when flushed.
    private volatile Collection<Buf<T>> buffer;

    // Fast-instantiation hook used during deserialization: non-null only when
    // the element type implements Externalizable.
    private final Supplier<Externalizable> instanceBuilder;

    // Timestamp (millis) of the last flush; used by the interval flusher.
    private long lastFlushTime;

    // Work-in-progress counter: number of in-flight flush/drain operations.
    private volatile int wip;

    // Number of entries remaining in the file queue.
    private volatile int remainder;

    // Number of entries in the dead queue.
    private volatile int deadSize;

    // Scheduled task that flushes the buffer on a timer.
    private Disposable intervalFlush;

    private volatile Boolean disposed = false;

    public PersistenceBuffer(String filePath,
                             String fileName,
                             Supplier<T> newInstance,
                             Function<Flux<T>, Mono<Boolean>> handler) {
        this(BufferSettings.create(filePath, fileName), newInstance, handler);
    }

    public PersistenceBuffer(String filePath,
                             String fileName,
                             Function<Flux<T>, Mono<Boolean>> handler) {
        this(filePath, fileName, null, handler);
    }

    /**
     * @param settings    buffer tunables (size, timeout, parallelism, retries)
     * @param newInstance optional factory for the element type; if the produced
     *                    instance implements Externalizable, a fast serialization
     *                    path is used for the file queue
     * @param handler     batch processor; returns true to request a requeue
     */
    public PersistenceBuffer(BufferSettings settings,
                             Supplier<T> newInstance,
                             Function<Flux<T>, Mono<Boolean>> handler) {
        if (newInstance != null) {
            // Probe one instance to decide whether the fast Externalizable path applies.
            T data = newInstance.get();
            if (data instanceof Externalizable) {
                this.instanceBuilder = () -> (Externalizable) newInstance.get();
            } else {
                this.instanceBuilder = null;
            }
        } else {
            this.instanceBuilder = null;
        }
        this.settings = settings;
        // Wrap in defer so that a synchronous throw from handler.apply becomes
        // an error signal instead of breaking the reactive stream.
        this.handler = list -> Mono
            .defer(() -> handler.apply(list));
    }

    public PersistenceBuffer<T> bufferSize(int size) {
        settings = settings.bufferSize(size);
        return this;
    }

    public PersistenceBuffer<T> bufferTimeout(Duration bufferTimeout) {
        settings = settings.bufferTimeout(bufferTimeout);
        return this;
    }

    public PersistenceBuffer<T> parallelism(int parallelism) {
        settings = settings.parallelism(parallelism);
        return this;
    }

    public PersistenceBuffer<T> maxRetry(int maxRetryTimes) {
        settings = settings.maxRetry(maxRetryTimes);
        return this;
    }

    public PersistenceBuffer<T> retryWhenError(Predicate<Throwable> predicate) {
        settings = settings.retryWhenError(predicate);
        return this;
    }

    public PersistenceBuffer<T> settings(Function<BufferSettings, BufferSettings> mapper) {
        settings = mapper.apply(settings);
        return this;
    }

    // Names this buffer; also switches to a per-name logger for easier filtering.
    public PersistenceBuffer<T> name(String name) {
        this.name = name;
        logger = LoggerFactory.getLogger(PersistenceBuffer.class.getName() + "." + name);
        return this;
    }

    // Proxy that turns clear() into flush(): callers must never wipe the
    // persisted queue, only force it to disk.
    static <T> FileQueue<Buf<T>> wrap(FileQueue<Buf<T>> queue) {
        return new FileQueueProxy<Buf<T>>(queue) {
            @Override
            public void clear() {
                super.flush();
            }
        };
    }

    // Creates both file queues and the initial in-memory buffer.
    private void init() {
        String filePath = settings.getFilePath();
        String fileName = settings.getFileName();
        Path path = Paths.get(filePath);
        // Sanitize the file name: replace characters illegal in file systems.
        fileName = fileName.replaceAll("[\\s\\\\/:*?\"<>|]", "_");
        BufDataType dataType = new BufDataType();
        // Main data queue.
        this.queue = wrap(FileQueue
                              .<Buf<T>>builder()
                              .name(fileName)
                              .path(path)
                              .option("valueType", dataType)
                              .build());
        this.remainder = queue.size();
        // Dead queue, used to store data that permanently failed.
        this.deadQueue = wrap(FileQueue
                                  .<Buf<T>>builder()
                                  .name(fileName + ".dead")
                                  .path(path)
                                  .option("valueType", dataType)
                                  .build());
        this.deadSize = this.deadQueue.size();
        this.buffer = newBuffer();
    }

    // Starts the buffer: initializes queues, drains leftovers from a previous
    // run, and schedules the periodic flush (idempotent).
    public void start() {
        if (intervalFlush != null) {
            return;
        }
        init();
        drain();
        if (!settings.getBufferTimeout().isZero()) {
            // Periodic flush.
            intervalFlush = Flux
                .interval(settings.getBufferTimeout())
                .doOnNext(ignore -> intervalFlush())
                .subscribe();
        }
    }

    private void dead(Collection<Buf<T>> buf) {
        if (deadQueue.addAll(buf)) {
            DEAD_SZIE.addAndGet(this, buf.size());
        }
    }

    private void dead(Buf<T> buf) {
        if (deadQueue.add(buf)) {
            DEAD_SZIE.incrementAndGet(this);
        }
    }

    // Puts a failed batch back for a later retry; entries that exhausted their
    // retries are moved to the dead queue instead.
    private void requeue(Collection<Buf<T>> buffer) {
        for (Buf<T> buf : buffer) {
            if (++buf.retry >= settings.getMaxRetryTimes()) {
                dead(buf);
            } else {
                // Write directly into the queue rather than via write(): the
                // retry should wait until new data arrives and triggers a drain.
                if (queue.offer(buf)) {
                    REMAINDER.incrementAndGet(this);
                }
            }
        }
    }

    private void write(Buf<T> data) {
        // remainder ++
        REMAINDER.incrementAndGet(this);
        queue.offer(data);
        drain();
    }

    // Public entry point: enqueue one element for buffered processing.
    public void write(T data) {
        write(new Buf<>(data, instanceBuilder));
    }

    public void dispose() {
        if (DISPOSED.compareAndSet(this, false, true)) {
            if (this.intervalFlush != null) {
                this.intervalFlush.dispose();
            }
            // Persist whatever is still in memory before closing the queues.
            queue.addAll(BUFFER.getAndSet(this, newBuffer()));
            queue.close();
            deadQueue.close();
        }
    }

    @Override
    public boolean isDisposed() {
        return DISPOSED.get(this);
    }

    // Number of entries currently persisted in the file queue.
    public int size() {
        return remainder;
    }

    // Timer callback: flush only if the timeout elapsed and we are not already
    // at the parallelism limit.
    private void intervalFlush() {
        if (System.currentTimeMillis() - lastFlushTime >= settings.getBufferTimeout().toMillis()
            && WIP.get(this) <= settings.getParallelism()) {
            flush();
        }
    }

    // Hands one batch to the handler and reacts to its outcome:
    // true -> requeue; retryable error -> requeue; other error -> dead queue.
    private void flush(Collection<Buf<T>> c) {
        try {
            lastFlushTime = System.currentTimeMillis();
            if (c.isEmpty()) {
                drain();
                return;
            }
            // wip++
            WIP.incrementAndGet(this);
            handler
                .apply(Flux.fromIterable(c).mapNotNull(buf -> buf.data))
                .subscribe(new BaseSubscriber<Boolean>() {
                    final long startWith = System.currentTimeMillis();
                    final int remainder = REMAINDER.get(PersistenceBuffer.this);

                    @Override
                    protected void hookOnNext(@Nonnull Boolean doRequeue) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("write {} data,size:{},remainder:{},requeue: {}.take up time: {} ms",
                                         name,
                                         c.size(),
                                         remainder,
                                         doRequeue,
                                         System.currentTimeMillis() - startWith);
                        }
                        if (doRequeue) {
                            requeue(c);
                        }
                    }

                    @Override
                    protected void hookOnError(@Nonnull Throwable err) {
                        if (settings.getRetryWhenError().test(err)) {
                            if (logger.isWarnEnabled()) {
                                logger.warn("write {} data failed do retry later,size:{},remainder:{}.use time: {} ms",
                                            name,
                                            c.size(),
                                            remainder,
                                            System.currentTimeMillis() - startWith);
                            }
                            requeue(c);
                        } else {
                            if (logger.isWarnEnabled()) {
                                logger.warn("write {} data error,size:{},remainder:{}.use time: {} ms",
                                            name,
                                            c.size(),
                                            remainder,
                                            System.currentTimeMillis() - startWith,
                                            err);
                            }
                            dead(c);
                        }
                    }

                    @Override
                    protected void hookFinally(@Nonnull SignalType type) {
                        // wip--
                        WIP.decrementAndGet(PersistenceBuffer.this);
                        drain();
                    }
                });
        } catch (Throwable e) {
            logger.warn("flush buffer error", e);
        }
    }

    // Atomically swaps in a fresh in-memory buffer and flushes the old one.
    private void flush() {
        @SuppressWarnings("all")
        Collection<Buf<T>> c = BUFFER.getAndSet(this, newBuffer());
        flush(c);
    }

    private Collection<Buf<T>> newBuffer() {
        return new ArrayList<>(settings.getBufferSize());
    }

    // Pulls up to bufferSize entries from the file queue into the in-memory
    // buffer, but only while the in-flight operation count is within the
    // parallelism limit. WIP doubles as the concurrency guard here.
    private void drain() {
        // Only request more work if fewer than `parallelism` operations are pending.
        if (WIP.incrementAndGet(this) <= settings.getParallelism()) {
            int size = settings.getBufferSize();
            for (int i = 0; i < size; i++) {
                if (isDisposed()) {
                    break;
                }
                Buf<T> poll = queue.poll();
                if (poll != null) {
                    onNext(poll);
                } else {
                    break;
                }
            }
        }
        WIP.decrementAndGet(this);
    }

    // Appends one entry to the in-memory buffer; when the buffer becomes full
    // it is swapped out (inside the lock) and flushed (outside the lock).
    private void onNext(@Nonnull Buf<T> value) {
        REMAINDER.decrementAndGet(this);
        Collection<Buf<T>> c;
        boolean flush = false;
        synchronized (this) {
            c = buffer();
            // size() == bufferSize - 1 means this add makes the buffer full.
            if (c.size() == settings.getBufferSize() - 1) {
                BUFFER.compareAndSet(this, c, newBuffer());
                flush = true;
            }
            c.add(value);
        }
        if (flush) {
            flush(c);
        }
    }

    @SuppressWarnings("unchecked")
    private Collection<Buf<T>> buffer() {
        return BUFFER.get(this);
    }

    // Serialization streams over Netty ByteBufs, delegating to the configured
    // default serializer (FST or JDK - see Serializers).
    @SneakyThrows
    protected ObjectInput createInput(ByteBuf buffer) {
        return Serializers.getDefault().createInput(new ByteBufInputStream(buffer, true));
    }

    @SneakyThrows
    protected ObjectOutput createOutput(ByteBuf buffer) {
        return Serializers.getDefault().createOutput(new ByteBufOutputStream(buffer));
    }

    /**
     * Envelope pairing a payload with its retry counter; Externalizable so the
     * file queue can persist it compactly.
     */
    @AllArgsConstructor
    public static class Buf<T> implements Externalizable {
        private final Supplier<Externalizable> instanceBuilder;
        private T data;
        private int retry = 0;

        // The no-arg constructor required by Externalizable must never be used
        // directly: deserialization goes through Buf(instanceBuilder) instead.
        @SneakyThrows
        public Buf() {
            throw new IllegalAccessException();
        }

        public Buf(Supplier<Externalizable> instanceBuilder) {
            this.instanceBuilder = instanceBuilder;
        }

        public Buf(T data, Supplier<Externalizable> instanceBuilder) {
            this.data = data;
            this.instanceBuilder = instanceBuilder;
        }

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeInt(retry);
            if (instanceBuilder != null) {
                // Fast path: the payload serializes itself.
                ((Externalizable) data).writeExternal(out);
            } else {
                SerializeUtils.writeObject(data, out);
            }
        }

        @Override
        @SuppressWarnings("unchecked")
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            retry = in.readInt();
            if (instanceBuilder != null) {
                Externalizable data = instanceBuilder.get();
                data.readExternal(in);
                this.data = (T) data;
            } else {
                this.data = (T) SerializeUtils.readObject(in);
            }
        }
    }

    /**
     * H2 MVStore value type for Buf entries: handles (de)serialization and
     * memory-usage estimation for the file queue's page cache.
     */
    class BufDataType extends BasicDataType<Buf<T>> {

        // Entries are not ordered; the queue never compares values.
        @Override
        public int compare(Buf<T> a, Buf<T> b) {
            return 0;
        }

        // Best-effort size estimate: exact via MemoryUsage, 2 bytes/char for
        // strings, otherwise a conservative 10 KB default.
        @Override
        public int getMemory(Buf<T> obj) {
            if (obj.data instanceof MemoryUsage) {
                return ((MemoryUsage) obj.data).usage();
            }
            if (obj.data instanceof String) {
                return ((String) obj.data).length() * 2;
            }
            return 10_000;
        }

        @Override
        @SneakyThrows
        public void write(WriteBuffer buff, Buf<T> data) {
            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
            try (ObjectOutput output = createOutput(buffer)) {
                data.writeExternal(output);
                output.flush();
                buff.put(buffer.nioBuffer());
            } finally {
                ReferenceCountUtil.safeRelease(buffer);
            }
        }

        // Bulk variant: serializes `len` entries from the storage array into one buffer.
        @Override
        @SneakyThrows
        public void write(WriteBuffer buff, Object obj, int len) {
            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
            try (ObjectOutput output = createOutput(buffer)) {
                for (int i = 0; i < len; i++) {
                    @SuppressWarnings("all")
                    Buf<T> buf = ((Buf<T>) Array.get(obj, i));
                    buf.writeExternal(output);
                }
                output.flush();
                buff.put(buffer.nioBuffer());
            } finally {
                ReferenceCountUtil.safeRelease(buffer);
            }
        }

        @Override
        @SneakyThrows
        public void read(ByteBuffer buff, Object obj, int len) {
            try (ObjectInput input = createInput(Unpooled.wrappedBuffer(buff))) {
                for (int i = 0; i < len; i++) {
                    Buf<T> data = new Buf<>(instanceBuilder);
                    data.readExternal(input);
                    Array.set(obj, i, data);
                }
            }
        }

        @Override
        @SneakyThrows
        public Buf<T> read(ByteBuffer buff) {
            Buf<T> data = new Buf<>(instanceBuilder);
            try (ObjectInput input = createInput(Unpooled.wrappedBuffer(buff))) {
                data.readExternal(input);
            }
            return data;
        }

        @Override
        public Buf<T>[] createStorage(int size) {
            return new Buf[size];
        }
    }
}

View File

@ -0,0 +1,15 @@
package org.jetlinks.community.codec;
import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.OutputStream;
/**
 * Abstraction over an object (de)serialization mechanism (e.g. JDK or FST),
 * producing ObjectInput/ObjectOutput views over arbitrary streams.
 */
public interface ObjectSerializer {
    // Wraps the stream for reading serialized objects.
    ObjectInput createInput(InputStream stream);

    // Wraps the stream for writing serialized objects.
    ObjectOutput createOutput(OutputStream stream);
}

View File

@ -0,0 +1,71 @@
package org.jetlinks.community.codec;
import io.netty.util.concurrent.FastThreadLocal;
import lombok.SneakyThrows;
import org.nustaq.serialization.FSTConfiguration;
import java.io.*;
/**
 * Registry of the available {@link ObjectSerializer} implementations.
 * <p>
 * The default is selected once at class-load time from the system property
 * {@code jetlinks.object.serializer.type}: {@code "fst"} (the default value)
 * selects FST, any other value falls back to plain JDK serialization.
 */
public class Serializers {

    private Serializers() {
        // utility class - no instances
    }

    // Plain JDK serialization.
    private static final ObjectSerializer JDK = new ObjectSerializer() {
        @Override
        @SneakyThrows
        public ObjectInput createInput(InputStream stream) {
            return new ObjectInputStream(stream);
        }

        @Override
        @SneakyThrows
        public ObjectOutput createOutput(OutputStream stream) {
            return new ObjectOutputStream(stream);
        }
    };

    // FST serialization; FSTConfiguration is not thread-safe, so one
    // configuration is kept per thread via Netty's FastThreadLocal.
    private static final ObjectSerializer FST = new ObjectSerializer() {
        final FastThreadLocal<FSTConfiguration> conf =
            new FastThreadLocal<FSTConfiguration>() {
                @Override
                protected FSTConfiguration initialValue() {
                    FSTConfiguration configuration = FSTConfiguration.createDefaultConfiguration();
                    configuration.setForceSerializable(true);
                    configuration.setClassLoader(FST.getClass().getClassLoader());
                    return configuration;
                }
            };

        @Override
        @SneakyThrows
        public ObjectInput createInput(InputStream stream) {
            return conf.get().getObjectInput(stream);
        }

        @Override
        @SneakyThrows
        public ObjectOutput createOutput(OutputStream stream) {
            return conf.get().getObjectOutput(stream);
        }
    };

    // Chosen once at class load; constant-first equals for clarity.
    private static final ObjectSerializer DEFAULT =
        "fst".equals(System.getProperty("jetlinks.object.serializer.type", "fst")) ? FST : JDK;

    public static ObjectSerializer jdk() {
        return JDK;
    }

    public static ObjectSerializer fst() {
        return FST;
    }

    public static ObjectSerializer getDefault() {
        return DEFAULT;
    }
}

View File

@ -8,7 +8,6 @@ import reactor.core.publisher.Mono;
* 异常处理工具
*
* @author wangzheng
* @see
* @since 1.0
*/
public class ErrorUtils {
@ -16,4 +15,18 @@ public class ErrorUtils {
/**
 * Returns a Mono that terminates with a {@code NotFoundException} carrying
 * the given message; the exception is created lazily on subscription.
 */
public static <T> Mono<T> notFound(String message) {
    return Mono.error(() -> new NotFoundException(message));
}
/**
 * Walks the cause chain of {@code e} and reports whether any link is an
 * instance of one of the given exception types.
 *
 * @param e      the throwable whose cause chain is inspected
 * @param target candidate exception types to look for
 * @return true if {@code e} or any of its causes matches one of the types
 */
@SafeVarargs
public static boolean hasException(Throwable e, Class<? extends Throwable>... target) {
    for (Throwable current = e; current != null; current = current.getCause()) {
        for (Class<? extends Throwable> type : target) {
            if (type.isInstance(current)) {
                return true;
            }
        }
    }
    return false;
}
}

View File

@ -0,0 +1,61 @@
package org.jetlinks.community.utils;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
/**
 * Shared, pre-configured Jackson mappers.
 * <p>
 * {@link #CBOR_MAPPER} and {@link #SMILE_MAPPER} are {@code null} when the
 * corresponding jackson-dataformat module is not on the classpath - callers
 * must null-check before using them.
 */
public class ObjectMappers {

    public static final ObjectMapper JSON_MAPPER;
    public static final ObjectMapper CBOR_MAPPER;
    public static final ObjectMapper SMILE_MAPPER;

    private ObjectMappers() {
        // utility class - no instances
    }

    // Shared defaults: tolerate unknown properties, omit null fields.
    private static ObjectMapper configure(ObjectMapper mapper) {
        return mapper
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
            .setSerializationInclusion(JsonInclude.Include.NON_NULL);
    }

    static {
        JSON_MAPPER = configure(Jackson2ObjectMapperBuilder.json().build());

        ObjectMapper cbor;
        try {
            cbor = configure(Jackson2ObjectMapperBuilder.cbor().build());
        } catch (Throwable ignore) {
            // cbor dataformat module missing from the classpath
            cbor = null;
        }
        CBOR_MAPPER = cbor;

        ObjectMapper smile;
        try {
            smile = configure(Jackson2ObjectMapperBuilder.smile().build());
        } catch (Throwable ignore) {
            // smile dataformat module missing from the classpath
            smile = null;
        }
        SMILE_MAPPER = smile;
    }

    /**
     * Serializes {@code data} to a JSON string using the shared JSON mapper.
     */
    @SneakyThrows
    public static String toJsonString(Object data) {
        return JSON_MAPPER.writeValueAsString(data);
    }
}

View File

@ -1,11 +1,12 @@
package org.jetlinks.community.elastic.search.service.reactive;
import com.alibaba.fastjson.JSON;
import io.netty.util.internal.ObjectPool;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
@ -17,6 +18,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.hswebframework.ezorm.core.param.QueryParam;
@ -24,63 +26,73 @@ import org.hswebframework.utils.time.DateFormatter;
import org.hswebframework.utils.time.DefaultDateFormatter;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.community.buffer.BufferProperties;
import org.jetlinks.community.buffer.BufferSettings;
import org.jetlinks.community.buffer.MemoryUsage;
import org.jetlinks.community.buffer.PersistenceBuffer;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
import org.jetlinks.community.elastic.search.service.ElasticSearchService;
import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter;
import org.jetlinks.community.elastic.search.utils.QueryParamTranslator;
import org.jetlinks.community.utils.ErrorUtils;
import org.jetlinks.community.utils.ObjectMappers;
import org.jetlinks.community.utils.SystemUtils;
import org.jetlinks.core.utils.FluxUtils;
import org.jetlinks.core.utils.SerializeUtils;
import org.reactivestreams.Publisher;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.springframework.util.unit.DataSize;
import reactor.core.publisher.BufferOverflowStrategy;
import org.springframework.web.reactive.function.client.WebClientException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import reactor.util.retry.Retry;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.time.Duration;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* 响应式ES数据库操作类
* 响应式ElasticSearchService
*
* @author zhouhao
* @see ReactiveElasticsearchClient
* @since 1.0
**/
@Service("elasticSearchService")
@Slf4j
@DependsOn("reactiveElasticsearchClient")
@ConfigurationProperties(prefix = "elasticsearch")
public class ReactiveElasticSearchService implements ElasticSearchService {
@Getter
@Setter
private BufferConfig buffer = new BufferConfig();
@Getter
private final ReactiveElasticsearchClient restClient;
@Getter
private final ElasticSearchIndexManager indexManager;
private FluxSink<Buffer> sink;
public static final IndicesOptions indexOptions = IndicesOptions.fromOptions(
true, true, false, false
);
static {
DateFormatter.supportFormatter.add(new DefaultDateFormatter(Pattern.compile("[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.+"), "yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
DateFormatter.supportFormatter.add(new DefaultDateFormatter(Pattern.compile("[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}.+"), "yyyy-MM-dd HH:mm:ss.SSS"));
}
private PersistenceBuffer<Buffer> writer;
public ReactiveElasticSearchService(ReactiveElasticsearchClient restClient,
ElasticSearchIndexManager indexManager) {
this.restClient = restClient;
@ -169,11 +181,9 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
return Flux
.fromIterable(response.getHits())
.map(hit -> {
.mapNotNull(hit -> {
Map<String, Object> hitMap = hit.getSourceAsMap();
if (StringUtils.isEmpty(hitMap.get("id"))) {
hitMap.put("id", hit.getId());
}
hitMap.putIfAbsent("id", hit.getId());
return mapper
.apply(Optional
.ofNullable(metadata.get(hit.getIndex())).orElse(indexList.get(0))
@ -191,11 +201,9 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
return Flux
.just(searchHit)
.map(hit -> {
.mapNotNull(hit -> {
Map<String, Object> hitMap = hit.getSourceAsMap();
if (StringUtils.isEmpty(hitMap.get("id"))) {
hitMap.put("id", hit.getId());
}
hitMap.putIfAbsent("id", hit.getId());
return mapper
.apply(Optional
.ofNullable(metadata.get(hit.getIndex())).orElse(indexList.get(0))
@ -226,12 +234,15 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
.filter(CollectionUtils::isNotEmpty)
.flatMapMany(metadataList -> this
.createSearchRequest(queryParam.clone().noPaging(), metadataList)
.doOnNext(search -> search.source().size(queryParam.getPageSize()))
.doOnNext(search -> search.source().size(getNoPagingPageSize(queryParam)))
.flatMapMany(restClient::scroll)
.map(searchHit -> Tuples.of(metadataList, searchHit))
);
}
/**
 * Page size used for non-paged (scroll) queries: at least 10000 per batch,
 * honouring a larger requested page size.
 */
private int getNoPagingPageSize(QueryParam param) {
    int requested = param.getPageSize();
    return requested > 10000 ? requested : 10000;
}
@Override
public Mono<Long> count(String[] index, QueryParam queryParam) {
@ -255,17 +266,13 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
}
private boolean checkWritable(String index) {
if (SystemUtils.memoryIsOutOfWatermark()) {
SystemUtils.printError("JVM内存不足,elasticsearch无法处理更多索引[%s]请求!", index);
return false;
}
return true;
}
@Override
public <T> Mono<Void> commit(String index, T payload) {
if (checkWritable(index)) {
sink.next(Buffer.of(index, payload));
writer.write(Buffer.of(index, payload));
}
return Mono.empty();
}
@ -274,7 +281,7 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
public <T> Mono<Void> commit(String index, Collection<T> payload) {
if (checkWritable(index)) {
for (T t : payload) {
sink.next(Buffer.of(index, t));
writer.write(Buffer.of(index, t));
}
}
return Mono.empty();
@ -299,7 +306,7 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
public <T> Mono<Void> save(String index, Publisher<T> data) {
return Flux.from(data)
.map(v -> Buffer.of(index, v))
.collectList()
.buffer(buffer.getSize())
.flatMap(this::doSave)
.then();
}
@ -311,123 +318,106 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
@PreDestroy
public void shutdown() {
sink.complete();
writer.dispose();
}
@Getter
@Setter
private BufferConfig buffer = new BufferConfig();
@Getter
@Setter
public static class BufferConfig {
//最小间隔
private int rate = Integer.getInteger("elasticsearch.buffer.rate", 1000);
//缓冲最大数量
private int bufferSize = Integer.getInteger("elasticsearch.buffer.size", 3000);
//缓冲超时时间
private Duration bufferTimeout = Duration.ofSeconds(Integer.getInteger("elasticsearch.buffer.timeout", 3));
//背压堆积数量限制.
private int bufferBackpressure = Integer.getInteger("elasticsearch.buffer.backpressure", Runtime
.getRuntime()
.availableProcessors());
//最大缓冲字节
private DataSize bufferBytes = DataSize.parse(System.getProperty("elasticsearch.buffer.bytes", "15MB"));
//最大重试次数
private int maxRetry = 3;
//重试间隔
private Duration minBackoff = Duration.ofSeconds(3);
public static class BufferConfig extends BufferProperties {
public BufferConfig() {
//固定缓冲文件目录
setFilePath("./data/elasticsearch-buffer");
setSize(3000);
}
private boolean refreshWhenWrite = false;
}
//@PostConstruct
public void init() {
int flushRate = buffer.rate;
int bufferSize = buffer.bufferSize;
Duration bufferTimeout = buffer.bufferTimeout;
int bufferBackpressure = buffer.bufferBackpressure;
long bufferBytes = buffer.bufferBytes.toBytes();
AtomicLong bufferedBytes = new AtomicLong();
FluxUtils
.bufferRate(Flux.<Buffer>create(sink -> this.sink = sink),
flushRate,
bufferSize,
bufferTimeout,
(b, l) -> bufferedBytes.addAndGet(b.numberOfBytes()) >= bufferBytes)
.doOnNext(buf -> bufferedBytes.set(0))
.onBackpressureBuffer(bufferBackpressure, drop -> {
// TODO: 2020/11/25 将丢弃的数据存储到本地磁盘
drop.forEach(Buffer::release);
SystemUtils.printError("elasticsearch无法处理更多索引请求!丢弃数据数量:%d", drop.size());
}, BufferOverflowStrategy.DROP_OLDEST)
.publishOn(Schedulers.boundedElastic(), bufferBackpressure)
.flatMap(buffers -> {
return Mono.create(sink -> {
try {
sink.onCancel(this
.doSave(buffers)
.doFinally((s) -> sink.success())
.subscribe());
} catch (Exception e) {
sink.success();
}
});
})
.onErrorResume((err) -> Mono
.fromRunnable(() -> SystemUtils.printError("保存ElasticSearch数据失败:\n" +
org.hswebframework.utils.StringUtils.throwable2String(err))))
.subscribe();
@PostConstruct
public void reset() {
//spring 启动后更新配置信息
writer.settings(bufferSettings -> bufferSettings.properties(buffer));
}
private void init() {
static ObjectPool<Buffer> pool = ObjectPool.newPool(Buffer::new);
writer = new PersistenceBuffer<>(
BufferSettings.create("writer.queue", buffer),
Buffer::new,
this::doSaveBuffer)
.name("elasticsearch")
.retryWhenError(e -> {
if (e instanceof ElasticsearchException) {
ElasticsearchException elasticsearchException = (ElasticsearchException) e;
if (elasticsearchException.status() == RestStatus.BAD_GATEWAY) {
return true;
}
}
return ErrorUtils.hasException(e, WebClientException.class)
|| ErrorUtils.hasException(e, IOException.class);
});
writer.start();
}
/**
 * Bridge between the {@code PersistenceBuffer} and {@link #doSave(Collection)}:
 * collects the buffered entries and writes them in one bulk request on the
 * parallel scheduler.
 *
 * @return true when the batch should be re-queued and retried - doSave
 *         appears to report 0 on bulk failure (NOTE(review): confirm this
 *         requeue-on-zero convention against doSave's failure path)
 */
public Mono<Boolean> doSaveBuffer(Flux<Buffer> bufferFlux) {
    return bufferFlux
        .collectList()
        .flatMap(this::doSave)
        .subscribeOn(Schedulers.parallel())
        .map(i -> i == 0);
}
@Getter
static class Buffer {
public static class Buffer implements Externalizable, MemoryUsage {
private static final long serialVersionUID = 1;
String index;
String id;
String payload;
final ObjectPool.Handle<Buffer> handle;
public Buffer(ObjectPool.Handle<Buffer> handle) {
this.handle = handle;
}
byte[] payload;
@SneakyThrows
public static Buffer of(String index, Object payload) {
Buffer buffer;
try {
buffer = pool.get();
} catch (Throwable e) {
buffer = new Buffer(null);
}
Buffer buffer = new Buffer();
buffer.index = index;
@SuppressWarnings("unchecked")
Map<String, Object> data = payload instanceof Map
? ((Map) payload) :
FastBeanCopier.copy(payload, HashMap::new);
Object id = data.get("id");
buffer.id = id == null ? null : String.valueOf(id);
buffer.payload = JSON.toJSONString(data);
buffer.payload = ObjectMappers.JSON_MAPPER.writeValueAsBytes(data);
return buffer;
}
void release() {
this.index = null;
this.id = null;
this.payload = null;
if (null != handle) {
handle.recycle(this);
}
}
int numberOfBytes() {
return payload == null ? 0 : payload.length() * 2;
@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeUTF(index);
SerializeUtils.writeNullableUTF(id, out);
out.writeInt(payload.length);
out.write(payload);
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
index = in.readUTF();
id = SerializeUtils.readNullableUTF(in);
int length = in.readInt();
payload = new byte[length];
in.readFully(payload);
}
@Override
public int usage() {
return payload == null ? 64 : 64 + payload.length;
}
}
private Mono<String> getIndexForSave(String index) {
return indexManager
.getIndexStrategy(index)
@ -443,6 +433,7 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
}
protected Mono<Integer> doSave(Collection<Buffer> buffers) {
int size = buffers.size();
return Flux
.fromIterable(buffers)
.groupBy(Buffer::getIndex, Integer.MAX_VALUE)
@ -455,9 +446,12 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
try {
IndexRequest request;
if (buffer.id != null) {
request = new IndexRequest(realIndex).type("_doc").id(buffer.id);
request = new IndexRequest(realIndex).id(buffer.id);
} else {
request = new IndexRequest(realIndex).type("_doc");
request = new IndexRequest(realIndex);
}
if (getRestClient().serverVersion().before(Version.V_7_0_0)) {
request.type("_doc");
}
request.source(buffer.payload, XContentType.JSON);
return request;
@ -475,14 +469,7 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
}
lst.forEach(request::add);
return restClient
.bulk(request)
.as(save -> {
if (buffer.maxRetry > 0) {
return save.retryWhen(Retry.backoff(buffer.maxRetry, buffer.minBackoff));
}
return save;
});
return restClient.bulk(request);
})
.doOnError((err) -> {
//这里的错误都输出到控制台,输入到slf4j可能会造成日志递归.
@ -490,13 +477,12 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
org.hswebframework.utils.StringUtils.throwable2String(err)
});
})
.doOnNext(response -> {
log.trace("保存ElasticSearch数据成功,数量:{},耗时:{}", response.getItems().length, response.getTook());
.map(response -> {
if (response.hasFailures()) {
SystemUtils.printError(response.buildFailureMessage());
return 0;
}
})
.thenReturn(buffers.size());
return size;
});
}
private <T> List<T> translate(Function<Map<String, Object>, T> mapper, SearchResponse response) {
@ -541,4 +527,5 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
.map(metadata -> QueryParamTranslator.createQueryBuilder(queryParam, metadata))
.switchIfEmpty(Mono.fromSupplier(() -> QueryParamTranslator.createQueryBuilder(queryParam, null)));
}
}