refactor: optimize PersistenceBuffer logic

zhouhao 2024-06-11 11:05:41 +08:00
parent af11c55ad9
commit 7e24f3d006
6 changed files with 544 additions and 170 deletions

BufferProperties.java

@@ -22,4 +22,8 @@ public class BufferProperties {
//Maximum retry count; data that exceeds it is moved to the dead queue.
private long maxRetryTimes = 64;
public boolean isExceededRetryCount(int count) {
return maxRetryTimes > 0 && count >= maxRetryTimes;
}
}
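A quick illustration of the guard above: because of the maxRetryTimes > 0 check, a non-positive value disables the dead-queue limit entirely. A hypothetical usage sketch, assuming Lombok-generated accessors on BufferProperties:
BufferProperties props = new BufferProperties();
props.isExceededRetryCount(63);  // false: below the default maxRetryTimes of 64
props.isExceededRetryCount(64);  // true: at the limit, data would go to the dead queue
props.setMaxRetryTimes(0);       // assumed Lombok setter; non-positive disables the limit
props.isExceededRetryCount(999); // false: the check short-circuits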

Buffered.java

@@ -0,0 +1,31 @@
package org.jetlinks.community.buffer;
/**
* Buffered data
*
* @param <T> the data type
* @author zhouhao
* @since 2.2
*/
public interface Buffered<T> {
/**
* @return the data
*/
T getData();
/**
* @return the current retry count
*/
int getRetryTimes();
/**
* Mark whether this data should be retried
*/
void retry(boolean retry);
/**
* Mark this data as a dead letter
*/
void dead();
}
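For orientation, a minimal sketch of a flush handler consuming this contract; MyData and database.insert are hypothetical, and the signature matches the BiFunction handler introduced in PersistenceBuffer below (imports from java.util.function and reactor.core.publisher assumed):
// Sketch only: mark each item's outcome, then return true so the buffer
// requeues retry-marked items and moves dead-marked items to the dead queue.
BiFunction<Collection<Buffered<MyData>>, PersistenceBuffer.FlushContext<MyData>, Mono<Boolean>> handler =
    (batch, context) -> Mono.fromCallable(() -> {
        boolean needsFollowUp = false;
        for (Buffered<MyData> item : batch) {
            try {
                database.insert(item.getData()); // hypothetical persistence call
                item.retry(false);               // success: drop this item from the buffer
            } catch (IllegalArgumentException e) {
                item.dead();                     // unrecoverable: route to the dead queue
                context.error(e);                // surface the failure via the FlushContext
                needsFollowUp = true;
            } catch (Exception e) {
                item.retry(true);                // transient failure: requeue this item
                needsFollowUp = true;
            }
        }
        return needsFollowUp;                    // true triggers the requeue/dead handling
    });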

PersistenceBuffer.java

@@ -1,16 +1,19 @@
package org.jetlinks.community.buffer;
import com.google.common.collect.Collections2;
import io.netty.buffer.*;
import io.netty.util.ReferenceCountUtil;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.SneakyThrows;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.h2.mvstore.WriteBuffer;
import org.h2.mvstore.type.BasicDataType;
import org.jetlinks.community.codec.Serializers;
import org.jetlinks.core.cache.FileQueue;
import org.jetlinks.core.cache.FileQueueProxy;
import org.jetlinks.core.utils.SerializeUtils;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
@@ -26,10 +29,12 @@ import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
@@ -69,12 +74,12 @@ public class PersistenceBuffer<T extends Serializable> implements Disposable {
AtomicIntegerFieldUpdater.newUpdater(PersistenceBuffer.class, "wip");
@SuppressWarnings("all")
private final static AtomicIntegerFieldUpdater<PersistenceBuffer> REMAINDER =
AtomicIntegerFieldUpdater.newUpdater(PersistenceBuffer.class, "remainder");
private final static AtomicLongFieldUpdater<PersistenceBuffer> REMAINDER =
AtomicLongFieldUpdater.newUpdater(PersistenceBuffer.class, "remainder");
@SuppressWarnings("all")
private final static AtomicIntegerFieldUpdater<PersistenceBuffer> DEAD_SZIE =
AtomicIntegerFieldUpdater.newUpdater(PersistenceBuffer.class, "deadSize");
private final static AtomicLongFieldUpdater<PersistenceBuffer> DEAD_SZIE =
AtomicLongFieldUpdater.newUpdater(PersistenceBuffer.class, "deadSize");
@SuppressWarnings("all")
private final static AtomicReferenceFieldUpdater<PersistenceBuffer, Collection> BUFFER =
@@ -95,7 +100,7 @@ public class PersistenceBuffer<T extends Serializable> implements Disposable {
private FileQueue<Buf<T>> deadQueue;
//Buffer data handler: the logic that actually processes buffered data, e.g. writing it to a database.
private final Function<Flux<T>, Mono<Boolean>> handler;
private final BiFunction<Collection<Buffered<T>>, FlushContext<T>, Mono<Boolean>> handler;
//Buffer size; once exceeded, the handler logic is executed
private BufferSettings settings;
@@ -109,18 +114,21 @@ public class PersistenceBuffer<T extends Serializable> implements Disposable {
private long lastFlushTime;
//number of operations currently in progress
private volatile int wip;
volatile int wip;
//remaining count
private volatile int remainder;
private volatile long remainder;
//dead data count
private volatile int deadSize;
private volatile long deadSize;
//scheduled task that flushes the buffer
private Disposable intervalFlush;
private Throwable lastError;
private volatile Boolean disposed = false;
private boolean started = false;
public PersistenceBuffer(String filePath,
String fileName,
@@ -137,7 +145,7 @@ public class PersistenceBuffer<T extends Serializable> implements Disposable {
public PersistenceBuffer(BufferSettings settings,
Supplier<T> newInstance,
Function<Flux<T>, Mono<Boolean>> handler) {
BiFunction<Collection<Buffered<T>>, FlushContext<T>, Mono<Boolean>> handler) {
if (newInstance != null) {
T data = newInstance.get();
if (data instanceof Externalizable) {
@@ -149,10 +157,13 @@ public class PersistenceBuffer<T extends Serializable> implements Disposable {
this.instanceBuilder = null;
}
this.settings = settings;
//Wrap the handler so an exception thrown directly by apply does not break the stream
this.handler = list -> Mono
.defer(() -> handler.apply(list));
this.handler = handler;
}
public PersistenceBuffer(BufferSettings settings,
Supplier<T> newInstance,
Function<Flux<T>, Mono<Boolean>> handler) {
this(settings, newInstance, (list, ignore) -> handler.apply(Flux.fromIterable(list).map(Buffered::getData)));
}
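Both construction styles remain usable; a short sketch of each, where settings, MyData, and the save helpers are hypothetical placeholders:
// New style: per-item control through the Buffered markers and FlushContext.
PersistenceBuffer<MyData> writer = new PersistenceBuffer<>(
    settings,                                // a BufferSettings instance
    MyData::new,
    (batch, ctx) -> saveBatch(batch, ctx));  // hypothetical Mono<Boolean> batch save

// Legacy style: the adapter above unwraps each Buffered into a Flux of plain data.
PersistenceBuffer<MyData> legacy = new PersistenceBuffer<>(
    settings,
    MyData::new,
    flux -> writeAll(flux));                 // hypothetical Mono<Boolean> bulk write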
public PersistenceBuffer<T> bufferSize(int size) {
@@ -200,12 +211,14 @@ public class PersistenceBuffer<T extends Serializable> implements Disposable {
};
}
private void init() {
String filePath = settings.getFilePath();
String fileName = settings.getFileName();
Path path = Paths.get(filePath);
private static String getSafeFileName(String fileName) {
return fileName.replaceAll("[\\s\\\\/:*?\"<>|]", "_");
}
fileName = fileName.replaceAll("[\\s\\\\/:*?\"<>|]", "_");
public void init() {
String filePath = settings.getFilePath();
String fileName = getSafeFileName(settings.getFileName());
Path path = Paths.get(filePath);
BufDataType dataType = new BufDataType();
@@ -229,16 +242,17 @@ public class PersistenceBuffer<T extends Serializable> implements Disposable {
this.buffer = newBuffer();
}
public void start() {
if (intervalFlush != null) {
return;
public synchronized void start() {
if (queue == null) {
init();
}
init();
started = true;
drain();
if (!settings.getBufferTimeout().isZero()) {
if (intervalFlush != null) {
intervalFlush.dispose();
}
//scheduled flush
intervalFlush = Flux
.interval(settings.getBufferTimeout())
@@ -251,32 +265,54 @@ public class PersistenceBuffer<T extends Serializable> implements Disposable {
private void dead(Collection<Buf<T>> buf) {
if (deadQueue.addAll(buf)) {
DEAD_SZIE.addAndGet(this, buf.size());
// DEAD_SZIE.addAndGet(this, buf.size());
}
}
private void dead(Buf<T> buf) {
if (deadQueue.add(buf)) {
DEAD_SZIE.incrementAndGet(this);
// DEAD_SZIE.incrementAndGet(this);
}
}
private void requeue(Collection<Buf<T>> buffer) {
private void requeue(Collection<Buf<T>> buffer, boolean tryDead) {
for (Buf<T> buf : buffer) {
if (++buf.retry >= settings.getMaxRetryTimes()) {
buf.retry++;
if (tryDead && buf.retry >= settings.getMaxRetryTimes() && settings.getMaxRetryTimes() > 0) {
dead(buf);
} else {
//Write directly to the queue instead of calling write(); the data will be retried once new data arrives later
if (queue.offer(buf)) {
REMAINDER.incrementAndGet(this);
// REMAINDER.incrementAndGet(this);
} else {
dead(buf);
}
}
}
}
private void requeue(Collection<Buf<T>> buffer) {
requeue(buffer, true);
}
private void write(Buf<T> data) {
// remainder ++
REMAINDER.incrementAndGet(this);
FileQueue<Buf<T>> queue = this.queue;
if (isDisposed()) {
boolean flushNow;
try {
flushNow = queue == null || !queue.offer(data);
} catch (Throwable ignore) {
flushNow = true;
}
if (flushNow) {
logger.info("file queue closed,write data now:{}", data.data);
buffer().add(data);
flush();
}
return;
}
queue.offer(data);
@@ -287,15 +323,51 @@ public class PersistenceBuffer<T extends Serializable> implements Disposable {
write(new Buf<>(data, instanceBuilder));
}
public void stop() {
started = false;
if (this.intervalFlush != null) {
this.intervalFlush.dispose();
}
}
@SneakyThrows
public void dispose() {
logger.info("dispose buffer:{},wip:{},remainder:{}", name, wip, queue.size());
started = false;
if (DISPOSED.compareAndSet(this, false, true)) {
if (this.intervalFlush != null) {
this.intervalFlush.dispose();
}
int max = 16;
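// wait up to 16 rounds of 500 ms (~8 s) for in-flight flushes to finish cancelling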
long wip = WIP.get(this);
do {
if (wip > 0) {
logger.info("cancel buffer flushing...wip:{},remainder:{}", wip, queue.size());
}
for (FlushSubscriber subscriber : new ArrayList<>(flushing)) {
subscriber.doCancel();
}
try {
Thread.sleep(500);
} catch (Throwable ignore) {
break;
}
wip = WIP.get(this);
}
while (wip > 0 && (max--) > 0);
if (wip > 0) {
logger.warn("wait buffer flushing timeout...wip:{}", wip);
}
@SuppressWarnings("all")
Collection<Buf<T>> remainders = BUFFER.getAndSet(this, newBuffer());
//write the in-memory data out to the queue
queue.addAll(BUFFER.getAndSet(this, newBuffer()));
queue.addAll(remainders);
queue.close();
deadQueue.close();
queue = null;
deadQueue = null;
}
}
@@ -304,17 +376,20 @@ public class PersistenceBuffer<T extends Serializable> implements Disposable {
return DISPOSED.get(this);
}
public int size() {
return remainder;
public long size() {
return queue == null ? 0 : queue.size();
}
private void intervalFlush() {
if (System.currentTimeMillis() - lastFlushTime >= settings.getBufferTimeout().toMillis()
&& WIP.get(this) <= settings.getParallelism()) {
&& WIP.get(this) <= settings.getParallelism()
&& started) {
flush();
}
}
private final Set<FlushSubscriber> flushing = ConcurrentHashMap.newKeySet();
private void flush(Collection<Buf<T>> c) {
try {
lastFlushTime = System.currentTimeMillis();
@@ -323,65 +398,150 @@ public class PersistenceBuffer<T extends Serializable> implements Disposable {
return;
}
// wip++
WIP.incrementAndGet(this);
FlushSubscriber subscriber = new FlushSubscriber(c);
handler
.apply(Flux.fromIterable(c).mapNotNull(buf -> buf.data))
.subscribe(new BaseSubscriber<Boolean>() {
final long startWith = System.currentTimeMillis();
final int remainder = REMAINDER.get(PersistenceBuffer.this);
@Override
protected void hookOnNext(@Nonnull Boolean doRequeue) {
if (logger.isDebugEnabled()) {
logger.debug("write {} data,size:{},remainder:{},requeue: {}.take up time: {} ms",
name,
c.size(),
remainder,
doRequeue,
System.currentTimeMillis() - startWith);
}
if (doRequeue) {
requeue(c);
}
}
@Override
protected void hookOnError(@Nonnull Throwable err) {
if (settings.getRetryWhenError().test(err)) {
if (logger.isWarnEnabled()) {
logger.warn("write {} data failed do retry later,size:{},remainder:{}.use time: {} ms",
name,
c.size(),
remainder,
System.currentTimeMillis() - startWith);
}
requeue(c);
} else {
if (logger.isWarnEnabled()) {
logger.warn("write {} data error,size:{},remainder:{}.use time: {} ms",
name,
c.size(),
remainder,
System.currentTimeMillis() - startWith,
err);
}
dead(c);
}
}
@Override
protected void hookFinally(@Nonnull SignalType type) {
// wip--
WIP.decrementAndGet(PersistenceBuffer.this);
drain();
}
});
.apply(Collections.unmodifiableCollection(c), subscriber)
.subscribe(subscriber);
} catch (Throwable e) {
logger.warn("flush buffer error", e);
WIP.decrementAndGet(this);
requeue(c, true);
}
}
class FlushSubscriber extends BaseSubscriber<Boolean> implements FlushContext<T> {
final Collection<Buf<T>> buffer;
final long startWith = System.currentTimeMillis();
@Override
public void error(Throwable e) {
lastError = e;
}
public FlushSubscriber(Collection<Buf<T>> buffer) {
this.buffer = buffer;
}
@Override
@Nonnull
protected Subscription upstream() {
return super.upstream();
}
@Override
protected void hookOnSubscribe(@Nonnull Subscription subscription) {
flushing.add(this);
WIP.incrementAndGet(PersistenceBuffer.this);
super.hookOnSubscribe(subscription);
}
@Override
protected void hookOnNext(@Nonnull Boolean doRequeue) {
synchronized (this) {
if (isDisposed()) {
return;
}
if (logger.isDebugEnabled()) {
logger.debug("write {} data,size:{},remainder:{},requeue: {}.take up time: {} ms",
name,
buffer.size(),
queue.size(),
doRequeue,
System.currentTimeMillis() - startWith);
}
if (doRequeue) {
//retry, but do not trigger dead
requeue(takeRetryBuffer(), false);
//data explicitly marked as dead
dead(takeDeadBuffer(false));
} else {
lastError = null;
}
}
}
@Override
protected void hookOnError(@Nonnull Throwable err) {
synchronized (this) {
lastError = err;
if (settings.getRetryWhenError().test(err)) {
if (logger.isWarnEnabled()) {
logger.warn("write {} data failed do retry later,size:{},remainder:{}.use time: {} ms,error: {}",
name,
buffer.size(),
size(),
System.currentTimeMillis() - startWith,
ExceptionUtils.getMessage(err));
}
requeue(takeRetryBuffer());
} else {
if (logger.isWarnEnabled()) {
logger.warn("write {} data error,size:{},remainder:{}.use time: {} ms",
name,
buffer.size(),
size(),
System.currentTimeMillis() - startWith,
err);
}
dead(takeDeadBuffer(true));
}
}
}
@Override
protected void hookOnCancel() {
// hookOnNext(true);
}
void doCancel() {
synchronized (this) {
if (this.isDisposed()) {
return;
}
List<Buf<T>> buf = new ArrayList<>(buffer);
buffer.clear();
upstream().cancel();
cancel();
queue.addAll(buf);
// REMAINDER.addAndGet(PersistenceBuffer.this, buffer.size());
}
}
private Collection<Buf<T>> takeRetryBuffer() {
return Collections2.filter(
this.buffer,
//marked for retry (or unset) and not marked dead
buf -> ((buf.doRetry == null) || buf.doRetry) &&
(buf.doDead == null || !buf.doDead));
}
private Collection<Buf<T>> takeDeadBuffer(boolean defaultDead) {
return Collections2.filter(
this.buffer,
//not marked for retry, and dead either explicitly set or implied by defaultDead
buf -> (buf.doRetry == null || !buf.doRetry)
&& ((defaultDead && buf.doDead == null) || Boolean.TRUE.equals(buf.doDead)));
}
@Override
protected void hookFinally(@Nonnull SignalType type) {
int size = buffer.size();
for (Buf<T> tBuf : buffer) {
tBuf.reset();
}
buffer.forEach(Buf::reset);
flushing.remove(this);
// wip--
WIP.decrementAndGet(PersistenceBuffer.this);
drain();
}
}
private void flush() {
@SuppressWarnings("all")
Collection<Buf<T>> c = BUFFER.getAndSet(this, newBuffer());
@@ -393,6 +553,9 @@ public class PersistenceBuffer<T extends Serializable> implements Disposable {
}
private void drain() {
if (!started) {
return;
}
//only request more when the number of in-flight operations is below the parallelism
if (WIP.incrementAndGet(this) <= settings.getParallelism()) {
int size = settings.getBufferSize();
@@ -412,7 +575,6 @@ public class PersistenceBuffer<T extends Serializable> implements Disposable {
}
private void onNext(@Nonnull Buf<T> value) {
REMAINDER.decrementAndGet(this);
Collection<Buf<T>> c;
boolean flush = false;
@@ -447,10 +609,11 @@ public class PersistenceBuffer<T extends Serializable> implements Disposable {
}
@AllArgsConstructor
public static class Buf<T> implements Externalizable {
public static class Buf<T> implements Buffered<T>, Externalizable {
private final Supplier<Externalizable> instanceBuilder;
private T data;
private int retry = 0;
private Boolean doRetry, doDead;
@SneakyThrows
public Buf() {
@@ -488,6 +651,77 @@ public class PersistenceBuffer<T extends Serializable> implements Disposable {
this.data = (T) SerializeUtils.readObject(in);
}
}
@Override
public T getData() {
return data;
}
@Override
public int getRetryTimes() {
return retry;
}
@Override
public void retry(boolean retry) {
doRetry = retry;
}
@Override
public void dead() {
doDead = true;
}
private void reset() {
doRetry = null;
doDead = null;
}
@Override
public String toString() {
return String.valueOf(data);
}
}
//attempt to recover data from a file
public long recovery(String fileName, boolean dead) {
if (fileName.startsWith("../")) {
return 0;
}
if (Objects.equals(fileName, settings.getFileName()) ||
Objects.equals(fileName, settings.getFileName() + ".dead")) {
return 0;
}
File file = new File(settings.getFilePath(), fileName);
if (!file.isFile() || !file.exists()) {
return 0;
}
BufDataType dataType = newType();
//data queue
FileQueue<Buf<T>> _queue = wrap(
FileQueue
.<Buf<T>>builder()
.name(fileName)
.path(Paths.get(settings.getFilePath()))
.option("valueType", dataType)
.build());
try {
long size = _queue.size();
if (dead) {
queue.addAll(_queue);
PersistenceBuffer.REMAINDER.addAndGet(this, size);
} else {
deadQueue.addAll(_queue);
PersistenceBuffer.DEAD_SZIE.addAndGet(this, size);
}
return size;
} finally {
_queue.close();
}
}
BufDataType newType() {
return new BufDataType();
}
class BufDataType extends BasicDataType<Buf<T>> {
@@ -568,4 +802,9 @@ public class PersistenceBuffer<T extends Serializable> implements Disposable {
}
}
public interface FlushContext<T> {
//record the error information
void error(Throwable e);
}
}
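A sketch of how the new recovery(...) entry point might be used; the file name here is hypothetical. Note that, as written above, dead = true re-imports the file's contents into the live queue (so dead-letter data gets reprocessed), while dead = false appends them to the dead queue:
// Re-import a previously renamed dead-letter file so its entries are retried.
long restored = writer.recovery("my-data.queue.dead.bak", true); // hypothetical file name
logger.info("recovered {} buffered entries", restored);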

PersistenceDeviceSessionManager.java

@@ -13,6 +13,7 @@ import org.jetlinks.core.rpc.RpcManager;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.server.session.PersistentSession;
import org.jetlinks.supports.device.session.ClusterDeviceSessionManager;
import org.jetlinks.supports.utils.MVStoreUtils;
import org.springframework.beans.BeansException;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationContext;
@@ -45,25 +46,15 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager
}
static MVMap<String, PersistentSessionEntity> initStore(String file) {
File f = new File(file);
if (!f.getParentFile().exists()) {
f.getParentFile().mkdirs();
}
Supplier<MVMap<String, PersistentSessionEntity>>
builder = () -> {
MVStore store = new MVStore.Builder()
.fileName(file)
.cacheSize(1)
.open();
return store.openMap("device-session");
};
try {
return builder.get();
} catch (MVStoreException e) {
log.warn("load session from {} error,delete it and init.", file, e);
f.delete();
return builder.get();
}
MVStore store =
MVStoreUtils.open(
new File(file),
"device-session",
builder -> {
return builder.cacheSize(1);
});
return MVStoreUtils.openMap(store, "device-session", new MVMap.Builder<>());
}
@Override
@@ -118,8 +109,7 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager
.map(ref -> ref.loaded.unwrap(PersistentSession.class))
.as(this::tryPersistent)
.block();
repository.store.compactMoveChunks();
repository.store.close();
repository.store.close(-1);
}
@Override

ReactiveElasticSearchService.java

@@ -1,5 +1,6 @@
package org.jetlinks.community.elastic.search.service.reactive;
import com.google.common.collect.Collections2;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
@@ -7,13 +8,16 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
@@ -26,11 +30,10 @@ import org.hswebframework.utils.time.DateFormatter;
import org.hswebframework.utils.time.DefaultDateFormatter;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.exception.BusinessException;
import org.jetlinks.community.buffer.*;
import org.jetlinks.core.trace.MonoTracer;
import org.jetlinks.core.utils.SerializeUtils;
import org.jetlinks.community.buffer.BufferProperties;
import org.jetlinks.community.buffer.BufferSettings;
import org.jetlinks.community.buffer.MemoryUsage;
import org.jetlinks.community.buffer.PersistenceBuffer;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
import org.jetlinks.community.elastic.search.service.ElasticSearchService;
@@ -40,8 +43,11 @@ import org.jetlinks.community.utils.ErrorUtils;
import org.jetlinks.community.utils.ObjectMappers;
import org.jetlinks.community.utils.SystemUtils;
import org.reactivestreams.Publisher;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.DependsOn;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.function.client.WebClientException;
@@ -58,6 +64,7 @@ import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.*;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -72,7 +79,7 @@ import java.util.stream.Collectors;
@Slf4j
@DependsOn("reactiveElasticsearchClient")
@ConfigurationProperties(prefix = "elasticsearch")
public class ReactiveElasticSearchService implements ElasticSearchService {
public class ReactiveElasticSearchService implements ElasticSearchService, CommandLineRunner {
@Getter
private final ReactiveElasticsearchClient restClient;
@@ -330,7 +337,20 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
@PreDestroy
public void shutdown() {
writer.dispose();
writer.stop();
}
@Override
public void run(String... args) throws Exception {
//apply the updated configuration after Spring has started
writer
.settings(bufferSettings -> bufferSettings.properties(buffer))
.start();
//dispose last, on shutdown
SpringApplication
.getShutdownHandlers()
.add(writer::dispose);
}
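Taken together with shutdown() above, the lifecycle this change establishes looks roughly like the following summary (an interpretation of the diff, not verbatim code):
writer.init();    // bean construction: open the file queues, no draining yet
writer.start();   // CommandLineRunner.run: apply final buffer properties, start draining
writer.stop();    // @PreDestroy shutdown(): pause the interval flush, keep queues open
writer.dispose(); // SpringApplication shutdown handler: runs after the context closes,
                  // drains in-flight flushes and closes the queues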
@@ -366,22 +386,136 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
return true;
}
}
return ErrorUtils.hasException(e, WebClientException.class)
|| ErrorUtils.hasException(e, IOException.class);
return ErrorUtils.hasException(
e,
WebClientException.class,
IOException.class,
TimeoutException.class,
io.netty.handler.timeout.TimeoutException.class);
});
writer.start();
writer.init();
}
public Mono<Boolean> doSaveBuffer(Flux<Buffer> bufferFlux) {
return bufferFlux
public Mono<Boolean> doSaveBuffer(Collection<Buffered<Buffer>> bufferFlux,
PersistenceBuffer.FlushContext<Buffer> context) {
List<Buffered<Buffer>> list = bufferFlux instanceof List
? ((List<Buffered<Buffer>>) bufferFlux)
: new ArrayList<>(bufferFlux);
int size = list.size();
return this
.doSave0(Collections2.transform(list, Buffered::getData))
.map(response -> {
boolean hasError = false;
BulkItemResponse[] arr = response.getItems();
Set<String> errors = null;
//response item count does not match the batch size?
if (arr.length != size) {
log.warn("ElasticSearch response item size not equals to buffer size," +
" response size:{}, buffer size:{}",
arr.length,
size);
}
for (int i = 0; i < arr.length; i++) {
BulkItemResponse item = arr[i];
Buffered<Buffer> buffered = size > i ? list.get(i) : null;
HttpStatus status = HttpStatus.resolve(item.status().getStatus());
if ((status == null || !status.is2xxSuccessful())) {
hasError = true;
if (null != item.getFailure()) {
context.error(new BusinessException.NoStackTrace(item.getFailure().getMessage()));
}
if (log.isInfoEnabled()) {
String msg = item.getFailureMessage();
if (errors == null) {
errors = new HashSet<>();
}
if (msg == null || errors.add(msg)) {
log.info("write elasticsearch data [{}] failed: {}",
buffered,
Strings.toString(item));
}
}
//failed
if (buffered != null) {
if (isDead(buffered, item)) {
//mark as dead
buffered.dead();
} else {
//mark for retry
buffered.retry(true);
}
}
continue;
}
//success; mark this entry as not needing a retry.
if (buffered != null) {
buffered.retry(false);
}
}
//any error triggers a requeue
return hasError;
});
}
private static final EnumSet<RestStatus> deadStatus = EnumSet.of(
RestStatus.FORBIDDEN,
RestStatus.BAD_REQUEST,
RestStatus.UNAUTHORIZED,
RestStatus.NOT_FOUND,
RestStatus.METHOD_NOT_ALLOWED);
private boolean isDead(Buffered<Buffer> buffered, BulkItemResponse response) {
return buffer.isExceededRetryCount(buffered.getRetryTimes()) ||
//fail fast, no more retries
deadStatus.contains(response.status());
}
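In other words, responses in deadStatus fail fast while everything else is retried until the configured budget is spent; for example (hypothetical statuses for illustration):
// RestStatus.BAD_REQUEST (400, e.g. a mapping conflict)  -> buffered.dead() immediately
// RestStatus.TOO_MANY_REQUESTS (429, write backpressure) -> buffered.retry(true) until
//                                                           buffer.isExceededRetryCount(...)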
protected Mono<BulkResponse> doSave0(Collection<Buffer> buffers) {
return Flux
.fromIterable(buffers)
.groupBy(Buffer::getIndex, Integer.MAX_VALUE)
.flatMap(group -> {
String index = group.key();
return this
.getIndexForSave(index)
.flatMapMany(realIndex -> group
.map(buffer -> {
try {
IndexRequest request;
if (buffer.id != null) {
request = new IndexRequest(realIndex).id(buffer.id);
} else {
request = new IndexRequest(realIndex);
}
if (getRestClient().serverVersion().before(Version.V_7_0_0)) {
@SuppressWarnings("all")
IndexRequest ignore = request.type("_doc");
}
request.source(buffer.payload, XContentType.JSON);
return request;
} finally {
buffer.release();
}
}));
})
.collectList()
.flatMap(this::doSave)
.subscribeOn(Schedulers.parallel())
.map(i -> i == 0);
.filter(CollectionUtils::isNotEmpty)
.flatMap(lst -> {
BulkRequest request = new BulkRequest();
request.timeout(TimeValue.timeValueSeconds(9));
if (buffer.isRefreshWhenWrite()) {
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
}
lst.forEach(request::add);
return restClient.bulk(request);
});
}
@Getter
public static class Buffer implements Externalizable, MemoryUsage {
private static final long serialVersionUID = 1;
@@ -446,44 +580,7 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
}
protected Mono<Integer> doSave(Collection<Buffer> buffers) {
int size = buffers.size();
return Flux
.fromIterable(buffers)
.groupBy(Buffer::getIndex, Integer.MAX_VALUE)
.flatMap(group -> {
String index = group.key();
return this
.getIndexForSave(index)
.flatMapMany(realIndex -> group
.map(buffer -> {
try {
IndexRequest request;
if (buffer.id != null) {
request = new IndexRequest(realIndex).id(buffer.id);
} else {
request = new IndexRequest(realIndex);
}
if (getRestClient().serverVersion().before(Version.V_7_0_0)) {
request.type("_doc");
}
request.source(buffer.payload, XContentType.JSON);
return request;
} finally {
buffer.release();
}
}));
})
.collectList()
.filter(CollectionUtils::isNotEmpty)
.flatMap(lst -> {
BulkRequest request = new BulkRequest();
request.timeout(TimeValue.timeValueSeconds(9));
if (buffer.isRefreshWhenWrite()) {
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
}
lst.forEach(request::add);
return restClient.bulk(request);
})
return doSave0(buffers)
.doOnError((err) -> {
//Errors here are printed to the console; writing them to slf4j might cause recursive logging.
SystemUtils.printError("Failed to save ElasticSearch data:\n%s", () -> new Object[]{
@@ -491,10 +588,13 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
});
})
.map(response -> {
if (response.hasFailures()) {
return 0;
int success = 0;
for (BulkItemResponse item : response.getItems()) {
if (!item.isFailed()) {
success++;
}
}
return size;
return success;
});
}

DatabaseDeviceLatestDataService.java

@@ -41,6 +41,8 @@ import org.jetlinks.community.gateway.annotation.Subscribe;
import org.jetlinks.community.timeseries.query.Aggregation;
import org.jetlinks.community.timeseries.query.AggregationColumn;
import org.jetlinks.reactor.ql.utils.CastUtils;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
@@ -69,7 +71,7 @@ import java.util.stream.Collectors;
* @since 1.5.0
*/
@Slf4j
public class DatabaseDeviceLatestDataService implements DeviceLatestDataService {
public class DatabaseDeviceLatestDataService implements DeviceLatestDataService, CommandLineRunner {
private final DatabaseOperator databaseOperator;
@@ -133,18 +135,26 @@ public class DatabaseDeviceLatestDataService implements DeviceLatestDataService
this::doWrite)
.name("device-latest-data");
writer.start();
writer.init();
}
public void destroy() {
writer.dispose();
writer.stop();
}
static GeoCodec geoCodec = new GeoCodec();
static StringCodec stringCodec = new StringCodec();
@Override
public void run(String... args) throws Exception {
writer.start();
SpringApplication
.getShutdownHandlers()
.add(writer::dispose);
}
static class GeoCodec implements ValueCodec<String, GeoPoint> {
@Override