From 7e24f3d00633bf3d883ae2abe802ee8ac3014413 Mon Sep 17 00:00:00 2001 From: zhouhao Date: Tue, 11 Jun 2024 11:05:41 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E4=BC=98=E5=8C=96PersistenceBuffer?= =?UTF-8?q?=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../community/buffer/BufferProperties.java | 4 + .../jetlinks/community/buffer/Buffered.java | 31 ++ .../community/buffer/PersistenceBuffer.java | 421 ++++++++++++++---- .../PersistenceDeviceSessionManager.java | 32 +- .../ReactiveElasticSearchService.java | 210 ++++++--- .../data/DatabaseDeviceLatestDataService.java | 16 +- 6 files changed, 544 insertions(+), 170 deletions(-) create mode 100644 jetlinks-components/common-component/src/main/java/org/jetlinks/community/buffer/Buffered.java diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/buffer/BufferProperties.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/buffer/BufferProperties.java index 9d04a626..46f04ba5 100644 --- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/buffer/BufferProperties.java +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/buffer/BufferProperties.java @@ -22,4 +22,8 @@ public class BufferProperties { //最大重试次数,超过此次数的数据将会放入死队列. private long maxRetryTimes = 64; + + public boolean isExceededRetryCount(int count) { + return maxRetryTimes > 0 && count >= maxRetryTimes; + } } diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/buffer/Buffered.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/buffer/Buffered.java new file mode 100644 index 00000000..cb19a9df --- /dev/null +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/buffer/Buffered.java @@ -0,0 +1,31 @@ +package org.jetlinks.community.buffer; + +/** + * 已缓冲的数据 + * + * @param 数据类型 + * @author zhouhao + * @since 2.2 + */ +public interface Buffered { + + /** + * @return 数据 + */ + T getData(); + + /** + * @return 当前重试次数 + */ + int getRetryTimes(); + + /** + * 标记是否重试此数据 + */ + void retry(boolean retry); + + /** + * 标记此数据为死信 + */ + void dead(); +} diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/buffer/PersistenceBuffer.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/buffer/PersistenceBuffer.java index dae08b8c..a12c0e55 100644 --- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/buffer/PersistenceBuffer.java +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/buffer/PersistenceBuffer.java @@ -1,16 +1,19 @@ package org.jetlinks.community.buffer; +import com.google.common.collect.Collections2; import io.netty.buffer.*; import io.netty.util.ReferenceCountUtil; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.SneakyThrows; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.h2.mvstore.WriteBuffer; import org.h2.mvstore.type.BasicDataType; import org.jetlinks.community.codec.Serializers; import org.jetlinks.core.cache.FileQueue; import org.jetlinks.core.cache.FileQueueProxy; import org.jetlinks.core.utils.SerializeUtils; +import org.reactivestreams.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Disposable; @@ -26,10 +29,12 @@ import java.nio.ByteBuffer; import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -69,12 +74,12 @@ public class PersistenceBuffer implements Disposable { AtomicIntegerFieldUpdater.newUpdater(PersistenceBuffer.class, "wip"); @SuppressWarnings("all") - private final static AtomicIntegerFieldUpdater REMAINDER = - AtomicIntegerFieldUpdater.newUpdater(PersistenceBuffer.class, "remainder"); + private final static AtomicLongFieldUpdater REMAINDER = + AtomicLongFieldUpdater.newUpdater(PersistenceBuffer.class, "remainder"); @SuppressWarnings("all") - private final static AtomicIntegerFieldUpdater DEAD_SZIE = - AtomicIntegerFieldUpdater.newUpdater(PersistenceBuffer.class, "deadSize"); + private final static AtomicLongFieldUpdater DEAD_SZIE = + AtomicLongFieldUpdater.newUpdater(PersistenceBuffer.class, "deadSize"); @SuppressWarnings("all") private final static AtomicReferenceFieldUpdater BUFFER = @@ -95,7 +100,7 @@ public class PersistenceBuffer implements Disposable { private FileQueue> deadQueue; //缓冲数据处理器,实际处理缓冲数据的逻辑,比如写入数据库. - private final Function, Mono> handler; + private final BiFunction>, FlushContext, Mono> handler; //缓冲区大小,超过此大小将执行 handler 处理逻辑 private BufferSettings settings; @@ -109,18 +114,21 @@ public class PersistenceBuffer implements Disposable { private long lastFlushTime; //当前正在进行的操作 - private volatile int wip; + volatile int wip; //剩余数量 - private volatile int remainder; + private volatile long remainder; //死数据数量 - private volatile int deadSize; + private volatile long deadSize; //刷新缓冲区定时任务 private Disposable intervalFlush; + private Throwable lastError; + private volatile Boolean disposed = false; + private boolean started = false; public PersistenceBuffer(String filePath, String fileName, @@ -137,7 +145,7 @@ public class PersistenceBuffer implements Disposable { public PersistenceBuffer(BufferSettings settings, Supplier newInstance, - Function, Mono> handler) { + BiFunction>, FlushContext, Mono> handler) { if (newInstance != null) { T data = newInstance.get(); if (data instanceof Externalizable) { @@ -149,10 +157,13 @@ public class PersistenceBuffer implements Disposable { this.instanceBuilder = null; } this.settings = settings; - //包装一层,防止apply直接报错导致流中断 - this.handler = list -> Mono - .defer(() -> handler.apply(list)); + this.handler = handler; + } + public PersistenceBuffer(BufferSettings settings, + Supplier newInstance, + Function, Mono> handler) { + this(settings, newInstance, (list, ignore) -> handler.apply(Flux.fromIterable(list).map(Buffered::getData))); } public PersistenceBuffer bufferSize(int size) { @@ -200,12 +211,14 @@ public class PersistenceBuffer implements Disposable { }; } - private void init() { - String filePath = settings.getFilePath(); - String fileName = settings.getFileName(); - Path path = Paths.get(filePath); + private static String getSafeFileName(String fileName) { + return fileName.replaceAll("[\\s\\\\/:*?\"<>|]", "_"); + } - fileName = fileName.replaceAll("[\\s\\\\/:*?\"<>|]", "_"); + public void init() { + String filePath = settings.getFilePath(); + String fileName = getSafeFileName(settings.getFileName()); + Path path = Paths.get(filePath); BufDataType dataType = new BufDataType(); @@ -229,16 +242,17 @@ public class PersistenceBuffer implements Disposable { this.buffer = newBuffer(); } - public void start() { - if (intervalFlush != null) { - return; + public synchronized void start() { + if (queue == null) { + init(); } - - init(); - + started = true; drain(); if (!settings.getBufferTimeout().isZero()) { + if (intervalFlush != null) { + intervalFlush.dispose(); + } //定时刷新 intervalFlush = Flux .interval(settings.getBufferTimeout()) @@ -251,32 +265,54 @@ public class PersistenceBuffer implements Disposable { private void dead(Collection> buf) { if (deadQueue.addAll(buf)) { - DEAD_SZIE.addAndGet(this, buf.size()); + // DEAD_SZIE.addAndGet(this, buf.size()); } } private void dead(Buf buf) { if (deadQueue.add(buf)) { - DEAD_SZIE.incrementAndGet(this); + // DEAD_SZIE.incrementAndGet(this); } } - private void requeue(Collection> buffer) { + private void requeue(Collection> buffer, boolean tryDead) { for (Buf buf : buffer) { - if (++buf.retry >= settings.getMaxRetryTimes()) { + + buf.retry++; + + if (tryDead && buf.retry >= settings.getMaxRetryTimes() && settings.getMaxRetryTimes() > 0) { dead(buf); } else { //直接写入queue,而不是使用write,等待后续有新的数据进入再重试 if (queue.offer(buf)) { - REMAINDER.incrementAndGet(this); + // REMAINDER.incrementAndGet(this); + } else { + dead(buf); } } } } + private void requeue(Collection> buffer) { + requeue(buffer, true); + } + private void write(Buf data) { - // remainder ++ - REMAINDER.incrementAndGet(this); + FileQueue> queue = this.queue; + if (isDisposed()) { + boolean flushNow; + try { + flushNow = queue == null || !queue.offer(data); + } catch (Throwable ignore) { + flushNow = true; + } + if (flushNow) { + logger.info("file queue closed,write data now:{}", data.data); + buffer().add(data); + flush(); + } + return; + } queue.offer(data); @@ -287,15 +323,51 @@ public class PersistenceBuffer implements Disposable { write(new Buf<>(data, instanceBuilder)); } + public void stop() { + started = false; + if (this.intervalFlush != null) { + this.intervalFlush.dispose(); + } + } + + @SneakyThrows public void dispose() { + logger.info("dispose buffer:{},wip:{},remainder:{}", name, wip, queue.size()); + started = false; if (DISPOSED.compareAndSet(this, false, true)) { if (this.intervalFlush != null) { this.intervalFlush.dispose(); } + int max = 16; + long wip = WIP.get(this); + do { + if (wip > 0) { + logger.info("cancel buffer flushing...wip:{},remainder:{}", wip, queue.size()); + } + for (FlushSubscriber subscriber : new ArrayList<>(flushing)) { + subscriber.doCancel(); + } + + try { + Thread.sleep(500); + } catch (Throwable ignore) { + break; + } + wip = WIP.get(this); + } + while (wip > 0 && (max--) > 0); + + if (wip > 0) { + logger.warn("wait buffer flushing timeout...wip:{}", wip); + } + @SuppressWarnings("all") + Collection> remainders = BUFFER.getAndSet(this, newBuffer()); //写出内存中的数据 - queue.addAll(BUFFER.getAndSet(this, newBuffer())); + queue.addAll(remainders); queue.close(); deadQueue.close(); + queue = null; + deadQueue = null; } } @@ -304,17 +376,20 @@ public class PersistenceBuffer implements Disposable { return DISPOSED.get(this); } - public int size() { - return remainder; + public long size() { + return queue == null ? 0 : queue.size(); } private void intervalFlush() { if (System.currentTimeMillis() - lastFlushTime >= settings.getBufferTimeout().toMillis() - && WIP.get(this) <= settings.getParallelism()) { + && WIP.get(this) <= settings.getParallelism() + && started) { flush(); } } + private final Set flushing = ConcurrentHashMap.newKeySet(); + private void flush(Collection> c) { try { lastFlushTime = System.currentTimeMillis(); @@ -323,65 +398,150 @@ public class PersistenceBuffer implements Disposable { return; } // wip++ - WIP.incrementAndGet(this); - + FlushSubscriber subscriber = new FlushSubscriber(c); handler - .apply(Flux.fromIterable(c).mapNotNull(buf -> buf.data)) - .subscribe(new BaseSubscriber() { - final long startWith = System.currentTimeMillis(); - final int remainder = REMAINDER.get(PersistenceBuffer.this); - - @Override - protected void hookOnNext(@Nonnull Boolean doRequeue) { - if (logger.isDebugEnabled()) { - logger.debug("write {} data,size:{},remainder:{},requeue: {}.take up time: {} ms", - name, - c.size(), - remainder, - doRequeue, - System.currentTimeMillis() - startWith); - } - if (doRequeue) { - requeue(c); - } - } - - @Override - protected void hookOnError(@Nonnull Throwable err) { - if (settings.getRetryWhenError().test(err)) { - if (logger.isWarnEnabled()) { - logger.warn("write {} data failed do retry later,size:{},remainder:{}.use time: {} ms", - name, - c.size(), - remainder, - System.currentTimeMillis() - startWith); - } - requeue(c); - } else { - if (logger.isWarnEnabled()) { - logger.warn("write {} data error,size:{},remainder:{}.use time: {} ms", - name, - c.size(), - remainder, - System.currentTimeMillis() - startWith, - err); - } - dead(c); - } - } - - @Override - protected void hookFinally(@Nonnull SignalType type) { - // wip-- - WIP.decrementAndGet(PersistenceBuffer.this); - drain(); - } - }); + .apply(Collections.unmodifiableCollection(c), subscriber) + .subscribe(subscriber); } catch (Throwable e) { logger.warn("flush buffer error", e); + WIP.decrementAndGet(this); + requeue(c, true); } } + class FlushSubscriber extends BaseSubscriber implements FlushContext { + final Collection> buffer; + final long startWith = System.currentTimeMillis(); + + @Override + public void error(Throwable e) { + lastError = e; + } + + public FlushSubscriber(Collection> buffer) { + this.buffer = buffer; + } + + @Override + @Nonnull + protected Subscription upstream() { + return super.upstream(); + } + + @Override + protected void hookOnSubscribe(@Nonnull Subscription subscription) { + flushing.add(this); + WIP.incrementAndGet(PersistenceBuffer.this); + super.hookOnSubscribe(subscription); + } + + @Override + protected void hookOnNext(@Nonnull Boolean doRequeue) { + synchronized (this) { + if (isDisposed()) { + return; + } + + if (logger.isDebugEnabled()) { + logger.debug("write {} data,size:{},remainder:{},requeue: {}.take up time: {} ms", + name, + buffer.size(), + queue.size(), + doRequeue, + System.currentTimeMillis() - startWith); + } + if (doRequeue) { + //重试,但是不触发dead + requeue(takeRetryBuffer(), false); + //手动指定了dead的数据 + dead(takeDeadBuffer(false)); + } else { + lastError = null; + } + } + } + + @Override + protected void hookOnError(@Nonnull Throwable err) { + synchronized (this) { + lastError = err; + if (settings.getRetryWhenError().test(err)) { + if (logger.isWarnEnabled()) { + logger.warn("write {} data failed do retry later,size:{},remainder:{}.use time: {} ms,error: {}", + name, + buffer.size(), + size(), + System.currentTimeMillis() - startWith, + ExceptionUtils.getMessage(err)); + } + requeue(takeRetryBuffer()); + } else { + if (logger.isWarnEnabled()) { + logger.warn("write {} data error,size:{},remainder:{}.use time: {} ms", + name, + buffer.size(), + size(), + System.currentTimeMillis() - startWith, + err); + } + dead(takeDeadBuffer(true)); + } + } + } + + @Override + protected void hookOnCancel() { + // hookOnNext(true); + } + + void doCancel() { + + synchronized (this) { + if (this.isDisposed()) { + return; + } + List> buf = new ArrayList<>(buffer); + buffer.clear(); + + upstream().cancel(); + cancel(); + + queue.addAll(buf); + // REMAINDER.addAndGet(PersistenceBuffer.this, buffer.size()); + } + } + + private Collection> takeRetryBuffer() { + return Collections2.filter( + this.buffer, + //指定了retry并且没有dead + buf -> ((buf.doRetry == null) || buf.doRetry) && + (buf.doDead == null || !buf.doDead)); + } + + private Collection> takeDeadBuffer(boolean defaultDead) { + return Collections2.filter( + this.buffer, + //没有指定重试并且指定了dead + buf -> (buf.doRetry == null || !buf.doRetry) + && ((defaultDead && buf.doDead == null) || Boolean.TRUE.equals(buf.doDead))); + } + + @Override + protected void hookFinally(@Nonnull SignalType type) { + int size = buffer.size(); + for (Buf tBuf : buffer) { + tBuf.reset(); + } + buffer.forEach(Buf::reset); + flushing.remove(this); + // wip-- + WIP.decrementAndGet(PersistenceBuffer.this); + drain(); + } + + } + private void flush() { @SuppressWarnings("all") Collection> c = BUFFER.getAndSet(this, newBuffer()); @@ -393,6 +553,9 @@ public class PersistenceBuffer implements Disposable { } private void drain() { + if (!started) { + return; + } //当前未执行完成的操作小于并行度才请求 if (WIP.incrementAndGet(this) <= settings.getParallelism()) { int size = settings.getBufferSize(); @@ -412,7 +575,6 @@ public class PersistenceBuffer implements Disposable { } private void onNext(@Nonnull Buf value) { - REMAINDER.decrementAndGet(this); Collection> c; boolean flush = false; @@ -447,10 +609,11 @@ public class PersistenceBuffer implements Disposable { } @AllArgsConstructor - public static class Buf implements Externalizable { + public static class Buf implements Buffered, Externalizable { private final Supplier instanceBuilder; private T data; private int retry = 0; + private Boolean doRetry, doDead; @SneakyThrows public Buf() { @@ -488,6 +651,77 @@ public class PersistenceBuffer implements Disposable { this.data = (T) SerializeUtils.readObject(in); } } + + @Override + public T getData() { + return data; + } + + @Override + public int getRetryTimes() { + return retry; + } + + @Override + public void retry(boolean retry) { + doRetry = retry; + } + + @Override + public void dead() { + doDead = true; + } + + private void reset() { + doRetry = null; + doDead = null; + } + + @Override + public String toString() { + return String.valueOf(data); + } + } + + //尝试恢复文件数据 + public long recovery(String fileName, boolean dead) { + if (fileName.startsWith("../")) { + return 0; + } + if (Objects.equals(fileName, settings.getFileName()) || + Objects.equals(fileName, settings.getFileName() + ".dead")) { + return 0; + } + File file = new File(settings.getFilePath(), fileName); + if (!file.isFile() || !file.exists()) { + return 0; + } + BufDataType dataType = newType(); + //数据队列 + FileQueue> _queue = wrap( + FileQueue + .>builder() + .name(fileName) + .path(Paths.get(settings.getFilePath())) + .option("valueType", dataType) + .build()); + try { + long size = _queue.size(); + if (dead) { + queue.addAll(_queue); + PersistenceBuffer.REMAINDER.addAndGet(this, size); + } else { + deadQueue.addAll(_queue); + PersistenceBuffer.DEAD_SZIE.addAndGet(this, size); + } + return size; + } finally { + _queue.close(); + } + } + + BufDataType newType() { + return new BufDataType(); } class BufDataType extends BasicDataType> { @@ -568,4 +802,9 @@ public class PersistenceBuffer implements Disposable { } } + public interface FlushContext { + + //标记错误信息 + void error(Throwable e); + } } diff --git a/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/PersistenceDeviceSessionManager.java b/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/PersistenceDeviceSessionManager.java index d92391db..175c0df6 100644 --- a/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/PersistenceDeviceSessionManager.java +++ b/jetlinks-components/configure-component/src/main/java/org/jetlinks/community/configure/device/PersistenceDeviceSessionManager.java @@ -13,6 +13,7 @@ import org.jetlinks.core.rpc.RpcManager; import org.jetlinks.core.server.session.DeviceSession; import org.jetlinks.core.server.session.PersistentSession; import org.jetlinks.supports.device.session.ClusterDeviceSessionManager; +import org.jetlinks.supports.utils.MVStoreUtils; import org.springframework.beans.BeansException; import org.springframework.boot.CommandLineRunner; import org.springframework.context.ApplicationContext; @@ -45,25 +46,15 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager } static MVMap initStore(String file) { - File f = new File(file); - if (!f.getParentFile().exists()) { - f.getParentFile().mkdirs(); - } - Supplier> - builder = () -> { - MVStore store = new MVStore.Builder() - .fileName(file) - .cacheSize(1) - .open(); - return store.openMap("device-session"); - }; - try { - return builder.get(); - } catch (MVStoreException e) { - log.warn("load session from {} error,delete it and init.", file, e); - f.delete(); - return builder.get(); - } + MVStore store = + MVStoreUtils.open( + new File(file), + "device-session", + builder -> { + return builder.cacheSize(1); + }); + + return MVStoreUtils.openMap(store, "device-session", new MVMap.Builder<>()); } @Override @@ -118,8 +109,7 @@ public class PersistenceDeviceSessionManager extends ClusterDeviceSessionManager .map(ref -> ref.loaded.unwrap(PersistentSession.class)) .as(this::tryPersistent) .block(); - repository.store.compactMoveChunks(); - repository.store.close(); + repository.store.close(-1); } @Override diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticSearchService.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticSearchService.java index b9ce08ac..0c8d5643 100755 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticSearchService.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticSearchService.java @@ -1,5 +1,6 @@ package org.jetlinks.community.elastic.search.service.reactive; +import com.google.common.collect.Collections2; import lombok.Getter; import lombok.Setter; import lombok.SneakyThrows; @@ -7,13 +8,16 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; +import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.MultiSearchRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.common.Strings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.reindex.BulkByScrollResponse; @@ -26,11 +30,10 @@ import org.hswebframework.utils.time.DateFormatter; import org.hswebframework.utils.time.DefaultDateFormatter; import org.hswebframework.web.api.crud.entity.PagerResult; import org.hswebframework.web.bean.FastBeanCopier; +import org.hswebframework.web.exception.BusinessException; +import org.jetlinks.community.buffer.*; +import org.jetlinks.core.trace.MonoTracer; import org.jetlinks.core.utils.SerializeUtils; -import org.jetlinks.community.buffer.BufferProperties; -import org.jetlinks.community.buffer.BufferSettings; -import org.jetlinks.community.buffer.MemoryUsage; -import org.jetlinks.community.buffer.PersistenceBuffer; import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager; import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata; import org.jetlinks.community.elastic.search.service.ElasticSearchService; @@ -40,8 +43,11 @@ import org.jetlinks.community.utils.ErrorUtils; import org.jetlinks.community.utils.ObjectMappers; import org.jetlinks.community.utils.SystemUtils; import org.reactivestreams.Publisher; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.DependsOn; +import org.springframework.http.HttpStatus; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import org.springframework.web.reactive.function.client.WebClientException; @@ -58,6 +64,7 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.*; +import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -72,7 +79,7 @@ import java.util.stream.Collectors; @Slf4j @DependsOn("reactiveElasticsearchClient") @ConfigurationProperties(prefix = "elasticsearch") -public class ReactiveElasticSearchService implements ElasticSearchService { +public class ReactiveElasticSearchService implements ElasticSearchService, CommandLineRunner { @Getter private final ReactiveElasticsearchClient restClient; @@ -330,7 +337,20 @@ public class ReactiveElasticSearchService implements ElasticSearchService { @PreDestroy public void shutdown() { - writer.dispose(); + writer.stop(); + } + + @Override + public void run(String... args) throws Exception { + //spring 启动后更新配置信息 + writer + .settings(bufferSettings -> bufferSettings.properties(buffer)) + .start(); + + //最后 shutdown + SpringApplication + .getShutdownHandlers() + .add(writer::dispose); } @@ -366,22 +386,136 @@ public class ReactiveElasticSearchService implements ElasticSearchService { return true; } } - return ErrorUtils.hasException(e, WebClientException.class) - || ErrorUtils.hasException(e, IOException.class); + return ErrorUtils.hasException( + e, + WebClientException.class, + IOException.class, + TimeoutException.class, + io.netty.handler.timeout.TimeoutException.class); }); - writer.start(); + writer.init(); } - public Mono doSaveBuffer(Flux bufferFlux) { - return bufferFlux + + public Mono doSaveBuffer(Collection> bufferFlux, + PersistenceBuffer.FlushContext context) { + List> list = bufferFlux instanceof List + ? ((List>) bufferFlux) + : new ArrayList<>(bufferFlux); + int size = list.size(); + return this + .doSave0(Collections2.transform(list, Buffered::getData)) + .map(response -> { + boolean hasError = false; + BulkItemResponse[] arr = response.getItems(); + Set errors = null; + //响应数量不一致? + if (arr.length != size) { + log.warn("ElasticSearch response item size not equals to buffer size," + + " response size:{}, buffer size:{}", + arr.length, + size); + } + for (int i = 0; i < arr.length; i++) { + BulkItemResponse item = arr[i]; + Buffered buffered = size > i ? list.get(i) : null; + HttpStatus status = HttpStatus.resolve(item.status().getStatus()); + if ((status == null || !status.is2xxSuccessful())) { + hasError = true; + if (null != item.getFailure()) { + context.error(new BusinessException.NoStackTrace(item.getFailure().getMessage())); + } + if (log.isInfoEnabled()) { + String msg = item.getFailureMessage(); + if (errors == null) { + errors = new HashSet<>(); + } + if (msg == null || errors.add(msg)) { + log.info("write elasticsearch data [{}] failed: {}", + buffered, + Strings.toString(item)); + } + } + //失败 + if (buffered != null) { + if (isDead(buffered, item)) { + //标记dead + buffered.dead(); + } else { + //标记重试 + buffered.retry(true); + } + } + continue; + } + //成功,标记此条数据不重试. + if (buffered != null) { + buffered.retry(false); + } + } + //有任何错误,则触发重试 + return hasError; + }); + + } + + private static final EnumSet deadStatus = EnumSet.of( + RestStatus.FORBIDDEN, + RestStatus.BAD_REQUEST, + RestStatus.UNAUTHORIZED, + RestStatus.NOT_FOUND, + RestStatus.METHOD_NOT_ALLOWED); + + private boolean isDead(Buffered buffered, BulkItemResponse response) { + return buffer.isExceededRetryCount(buffered.getRetryTimes()) || + //快速失败,不再重试 + deadStatus.contains(response.status()); + } + + protected Mono doSave0(Collection buffers) { + return Flux + .fromIterable(buffers) + .groupBy(Buffer::getIndex, Integer.MAX_VALUE) + .flatMap(group -> { + String index = group.key(); + return this + .getIndexForSave(index) + .flatMapMany(realIndex -> group + .map(buffer -> { + try { + IndexRequest request; + if (buffer.id != null) { + request = new IndexRequest(realIndex).id(buffer.id); + } else { + request = new IndexRequest(realIndex); + } + if (getRestClient().serverVersion().before(Version.V_7_0_0)) { + @SuppressWarnings("all") + IndexRequest ignore = request.type("_doc"); + } + request.source(buffer.payload, XContentType.JSON); + return request; + } finally { + buffer.release(); + } + })); + }) .collectList() - .flatMap(this::doSave) - .subscribeOn(Schedulers.parallel()) - .map(i -> i == 0); + .filter(CollectionUtils::isNotEmpty) + .flatMap(lst -> { + BulkRequest request = new BulkRequest(); + request.timeout(TimeValue.timeValueSeconds(9)); + if (buffer.isRefreshWhenWrite()) { + request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + } + lst.forEach(request::add); + return restClient.bulk(request); + }); } + @Getter public static class Buffer implements Externalizable, MemoryUsage { private static final long serialVersionUID = 1; @@ -446,44 +580,7 @@ public class ReactiveElasticSearchService implements ElasticSearchService { } protected Mono doSave(Collection buffers) { - int size = buffers.size(); - return Flux - .fromIterable(buffers) - .groupBy(Buffer::getIndex, Integer.MAX_VALUE) - .flatMap(group -> { - String index = group.key(); - return this - .getIndexForSave(index) - .flatMapMany(realIndex -> group - .map(buffer -> { - try { - IndexRequest request; - if (buffer.id != null) { - request = new IndexRequest(realIndex).id(buffer.id); - } else { - request = new IndexRequest(realIndex); - } - if (getRestClient().serverVersion().before(Version.V_7_0_0)) { - request.type("_doc"); - } - request.source(buffer.payload, XContentType.JSON); - return request; - } finally { - buffer.release(); - } - })); - }) - .collectList() - .filter(CollectionUtils::isNotEmpty) - .flatMap(lst -> { - BulkRequest request = new BulkRequest(); - request.timeout(TimeValue.timeValueSeconds(9)); - if (buffer.isRefreshWhenWrite()) { - request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - } - lst.forEach(request::add); - return restClient.bulk(request); - }) + return doSave0(buffers) .doOnError((err) -> { //这里的错误都输出到控制台,输入到slf4j可能会造成日志递归. SystemUtils.printError("保存ElasticSearch数据失败:\n%s", () -> new Object[]{ @@ -491,10 +588,13 @@ public class ReactiveElasticSearchService implements ElasticSearchService { }); }) .map(response -> { - if (response.hasFailures()) { - return 0; + int success = 0; + for (BulkItemResponse item : response.getItems()) { + if (!item.isFailed()) { + success++; + } } - return size; + return success; }); } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DatabaseDeviceLatestDataService.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DatabaseDeviceLatestDataService.java index ab36b2f6..933b47bb 100755 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DatabaseDeviceLatestDataService.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DatabaseDeviceLatestDataService.java @@ -41,6 +41,8 @@ import org.jetlinks.community.gateway.annotation.Subscribe; import org.jetlinks.community.timeseries.query.Aggregation; import org.jetlinks.community.timeseries.query.AggregationColumn; import org.jetlinks.reactor.ql.utils.CastUtils; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; @@ -69,7 +71,7 @@ import java.util.stream.Collectors; * @since 1.5.0 */ @Slf4j -public class DatabaseDeviceLatestDataService implements DeviceLatestDataService { +public class DatabaseDeviceLatestDataService implements DeviceLatestDataService, CommandLineRunner { private final DatabaseOperator databaseOperator; @@ -133,18 +135,26 @@ public class DatabaseDeviceLatestDataService implements DeviceLatestDataService this::doWrite) .name("device-latest-data"); - writer.start(); + writer.init(); } public void destroy() { - writer.dispose(); + writer.stop(); } static GeoCodec geoCodec = new GeoCodec(); static StringCodec stringCodec = new StringCodec(); + @Override + public void run(String... args) throws Exception { + writer.start(); + SpringApplication + .getShutdownHandlers() + .add(writer::dispose); + } + static class GeoCodec implements ValueCodec { @Override