From 84ba16399275c72a3770b6d32a1921cdbd384413 Mon Sep 17 00:00:00 2001 From: Tensai <517634644@qq.com> Date: Fri, 28 May 2021 10:57:38 +0800 Subject: [PATCH 1/3] =?UTF-8?q?docs(=E6=95=B0=E6=8D=AE=E6=B5=81=E3=80=90?= =?UTF-8?q?=E8=AE=BE=E5=A4=87=E7=BD=91=E5=85=B3-->=E7=89=A9=E6=A8=A1?= =?UTF-8?q?=E5=9E=8B=E8=BD=AC=E6=8D=A2->=E6=97=B6=E5=BA=8F=E5=85=A5?= =?UTF-8?q?=E5=BA=93=E3=80=91):=20=E5=A2=9E=E5=8A=A0=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E6=B5=81=E5=85=A5=E5=BA=93=E7=9A=84=E4=BB=A3=E7=A0=81=E6=B3=A8?= =?UTF-8?q?=E9=87=8A=20=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81=E7=BB=93?= =?UTF-8?q?=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Tensai <517634644@qq.com> --- .../dashboard/MeasurementParameter.java | 1 + .../service/DefaultElasticSearchService.java | 5 +- .../search/service/ElasticSearchService.java | 5 + .../ReactiveElasticSearchService.java | 4 +- .../supports/DefaultDeviceGatewayManager.java | 32 +- .../supports/DeviceGatewayProperties.java | 3 + .../DeviceGatewayPropertiesManager.java | 9 + .../supports/DeviceGatewayProvider.java | 5 + .../network/utils/DeviceGatewayHelper.java | 28 +- .../tcp/device/TcpServerDeviceGateway.java | 295 ++++++++++-------- .../network/tcp/server/TcpServer.java | 1 + .../community/timeseries/TimeSeriesData.java | 1 + .../message/DeviceMessageConnector.java | 235 +++++++------- .../TimeSeriesMessageWriterConnector.java | 8 +- .../data/AbstractDeviceDataStoragePolicy.java | 131 ++++---- .../data/DefaultDeviceDataService.java | 42 ++- ...meSeriesColumnDeviceDataStoragePolicy.java | 25 +- .../TimeSeriesDeviceDataStoragePolicy.java | 4 + ...SeriesRowDeviceDataStoreStoragePolicy.java | 22 +- .../timeseries/DeviceTimeSeriesMetric.java | 9 +- .../service/DeviceGatewayConfigService.java | 3 + 21 files changed, 546 insertions(+), 322 deletions(-) diff --git a/jetlinks-components/dashboard-component/src/main/java/org/jetlinks/community/dashboard/MeasurementParameter.java b/jetlinks-components/dashboard-component/src/main/java/org/jetlinks/community/dashboard/MeasurementParameter.java index 2941be1b..0cef9c99 100644 --- a/jetlinks-components/dashboard-component/src/main/java/org/jetlinks/community/dashboard/MeasurementParameter.java +++ b/jetlinks-components/dashboard-component/src/main/java/org/jetlinks/community/dashboard/MeasurementParameter.java @@ -17,6 +17,7 @@ import java.util.Optional; public class MeasurementParameter implements ValueObject { private Map params = new HashMap<>(); + @Override public Optional get(String name) { return Optional.ofNullable(params).map(p -> p.get(name)); } diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java index 5c988f4a..a494ba6f 100644 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/DefaultElasticSearchService.java @@ -37,7 +37,6 @@ import org.jetlinks.community.elastic.search.utils.ReactorActionListener; import org.jetlinks.core.utils.FluxUtils; import org.reactivestreams.Publisher; import org.springframework.context.annotation.DependsOn; -import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import reactor.core.publisher.BufferOverflowStrategy; import reactor.core.publisher.Flux; @@ -119,12 +118,14 @@ public class DefaultElasticSearchService implements ElasticSearchService { }); } + @Override public Flux query(String index, QueryParam queryParam, Function, T> mapper) { return this .doQuery(new String[]{index}, queryParam) .flatMapMany(tp2 -> convertQueryResult(tp2.getT1(), tp2.getT2(), mapper)); } + @Override public Flux query(String[] index, QueryParam queryParam, Function, T> mapper) { return this .doQuery(index, queryParam) @@ -373,7 +374,7 @@ public class DefaultElasticSearchService implements ElasticSearchService { private Mono doSearch(SearchRequest request) { return this - .execute(request, restClient.getQueryClient()::searchAsync) + .execute(request, restClient.getQueryClient()::searchAsync) .onErrorResume(err -> { log.error("query elastic error", err); return Mono.empty(); diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/ElasticSearchService.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/ElasticSearchService.java index f067e38b..ec7563f6 100644 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/ElasticSearchService.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/ElasticSearchService.java @@ -12,6 +12,11 @@ import java.util.Collection; import java.util.Map; import java.util.function.Function; +/** + * ES数据库业务操作类 + * + * @author JetLinks + */ public interface ElasticSearchService { default Mono> queryPager(String index, QueryParam queryParam, Function, T> mapper) { diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticSearchService.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticSearchService.java index d12764db..fc80b019 100644 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticSearchService.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticSearchService.java @@ -27,12 +27,12 @@ import org.hswebframework.utils.time.DateFormatter; import org.hswebframework.utils.time.DefaultDateFormatter; import org.hswebframework.web.api.crud.entity.PagerResult; import org.hswebframework.web.bean.FastBeanCopier; -import org.jetlinks.core.utils.FluxUtils; import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager; import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata; import org.jetlinks.community.elastic.search.service.ElasticSearchService; import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter; import org.jetlinks.community.elastic.search.utils.QueryParamTranslator; +import org.jetlinks.core.utils.FluxUtils; import org.reactivestreams.Publisher; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.DependsOn; @@ -119,12 +119,14 @@ public class ReactiveElasticSearchService implements ElasticSearchService { }); } + @Override public Flux query(String index, QueryParam queryParam, Function, T> mapper) { return this .doQuery(new String[]{index}, queryParam) .flatMapMany(tp2 -> convertQueryResult(tp2.getT1(), tp2.getT2(), mapper)); } + @Override public Flux query(String[] index, QueryParam queryParam, Function, T> mapper) { return this .doQuery(index, queryParam) diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DefaultDeviceGatewayManager.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DefaultDeviceGatewayManager.java index 2e47b455..02c1a294 100644 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DefaultDeviceGatewayManager.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DefaultDeviceGatewayManager.java @@ -12,29 +12,53 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +/** + * 设备网关管理器 + *

+ * TCP UDP MQTT CoAP + * + * @author Jetlinks + */ @Component public class DefaultDeviceGatewayManager implements DeviceGatewayManager, BeanPostProcessor { private final DeviceGatewayPropertiesManager propertiesManager; - private Map providers = new ConcurrentHashMap<>(); + /** + * TCP MQTT的设备网关服务提供者 + */ + private final Map providers = new ConcurrentHashMap<>(); - private Map store = new ConcurrentHashMap<>(); + /** + * 启动状态的设备网关 + */ + private final Map store = new ConcurrentHashMap<>(); public DefaultDeviceGatewayManager(DeviceGatewayPropertiesManager propertiesManager) { this.propertiesManager = propertiesManager; } + /** + * 获取设备网关,有则返回,没有就创建返回 + * + * @param id 网关ID + * @return 设备网关 + */ private Mono doGetGateway(String id) { if (store.containsKey(id)) { return Mono.just(store.get(id)); } + + // 数据库查 DeviceGatewayEntity 转换成 DeviceGatewayProperties + // BeanMap中找provider 找不到就是不支持 + // 创建设备网关 + // double check 防止重复创建 return propertiesManager .getProperties(id) - .switchIfEmpty(Mono.error(()->new UnsupportedOperationException("网关配置[" + id + "]不存在"))) + .switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("网关配置[" + id + "]不存在"))) .flatMap(properties -> Mono .justOrEmpty(providers.get(properties.getProvider())) - .switchIfEmpty(Mono.error(()->new UnsupportedOperationException("不支持的网络服务[" + properties.getProvider() + "]"))) + .switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("不支持的网络服务[" + properties.getProvider() + "]"))) .flatMap(provider -> provider .createDeviceGateway(properties) .flatMap(gateway -> { diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayProperties.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayProperties.java index 4cb89919..1e011d27 100644 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayProperties.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayProperties.java @@ -7,6 +7,9 @@ import org.jetlinks.community.ValueObject; import java.util.HashMap; import java.util.Map; +/** + * @author Tensai + */ @Getter @Setter public class DeviceGatewayProperties implements ValueObject { diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayPropertiesManager.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayPropertiesManager.java index 08e609fd..25e02b64 100644 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayPropertiesManager.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayPropertiesManager.java @@ -2,8 +2,17 @@ package org.jetlinks.community.gateway.supports; import reactor.core.publisher.Mono; +/** + * @author Jetlinks + */ public interface DeviceGatewayPropertiesManager { + /** + * 获取网关的属性 + * + * @param id 网关ID + * @return 网关属性 + */ Mono getProperties(String id); diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayProvider.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayProvider.java index 691cf5f5..6b21fb14 100644 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayProvider.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayProvider.java @@ -4,6 +4,11 @@ import org.jetlinks.community.gateway.DeviceGateway; import org.jetlinks.community.network.NetworkType; import reactor.core.publisher.Mono; +/** + * 设备网关服务提供者 + * + * @author Jetlinks + */ public interface DeviceGatewayProvider { String getId(); diff --git a/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java b/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java index 2dec3025..cdc1ca8b 100644 --- a/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java +++ b/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java @@ -21,6 +21,11 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +/** + * 设备网关处理工具 + * + * @author Jetlinks + */ @AllArgsConstructor public class DeviceGatewayHelper { @@ -88,13 +93,13 @@ public class DeviceGatewayHelper { return Mono .delay(Duration.ofSeconds(2)) .then(registry - .getDevice(children.getDeviceId()) - .flatMap(device -> device - //没有配置状态自管理才自动上线 - .getSelfConfig(DeviceConfigKey.selfManageState) - .defaultIfEmpty(false) - .filter(Boolean.FALSE::equals) - .flatMap(ignore -> registerSession)) + .getDevice(children.getDeviceId()) + .flatMap(device -> device + //没有配置状态自管理才自动上线 + .getSelfConfig(DeviceConfigKey.selfManageState) + .defaultIfEmpty(false) + .filter(Boolean.FALSE::equals) + .flatMap(ignore -> registerSession)) ); } return registerSession; @@ -102,6 +107,15 @@ public class DeviceGatewayHelper { return Mono.empty(); } + /** + * 处理来自设备网关的设备消息 + * + * @param message 设备消息 + * @param sessionBuilder 设备操作 + * @param sessionConsumer 设备消费 + * @param deviceNotFoundListener 异常监听 + * @return 设备操作 + */ public Mono handleDeviceMessage(DeviceMessage message, Function sessionBuilder, Consumer sessionConsumer, diff --git a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java index 8b3842f8..9d6ca9e2 100644 --- a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java +++ b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java @@ -3,6 +3,16 @@ package org.jetlinks.community.network.tcp.device; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.hswebframework.web.logger.ReactiveLogger; +import org.jetlinks.community.gateway.DeviceGateway; +import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor; +import org.jetlinks.community.gateway.monitor.GatewayMonitors; +import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway; +import org.jetlinks.community.network.DefaultNetworkType; +import org.jetlinks.community.network.NetworkType; +import org.jetlinks.community.network.tcp.TcpMessage; +import org.jetlinks.community.network.tcp.client.TcpClient; +import org.jetlinks.community.network.tcp.server.TcpServer; +import org.jetlinks.community.network.utils.DeviceGatewayHelper; import org.jetlinks.core.ProtocolSupport; import org.jetlinks.core.ProtocolSupports; import org.jetlinks.core.device.DeviceOperator; @@ -17,16 +27,6 @@ import org.jetlinks.core.message.codec.Transport; import org.jetlinks.core.server.DeviceGatewayContext; import org.jetlinks.core.server.session.DeviceSession; import org.jetlinks.core.server.session.DeviceSessionManager; -import org.jetlinks.community.gateway.DeviceGateway; -import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor; -import org.jetlinks.community.gateway.monitor.GatewayMonitors; -import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway; -import org.jetlinks.community.network.DefaultNetworkType; -import org.jetlinks.community.network.NetworkType; -import org.jetlinks.community.network.tcp.TcpMessage; -import org.jetlinks.community.network.tcp.client.TcpClient; -import org.jetlinks.community.network.tcp.server.TcpServer; -import org.jetlinks.community.network.utils.DeviceGatewayHelper; import org.jetlinks.supports.server.DecodedClientMessageHandler; import reactor.core.Disposable; import reactor.core.publisher.EmitterProcessor; @@ -43,11 +43,14 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; @Slf4j(topic = "system.tcp.gateway") -class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGateway { +public class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGateway { @Getter private final String id; + /** + * 维护所有创建的tcp server + */ private final TcpServer tcpServer; private final String protocol; @@ -60,6 +63,9 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew private final DeviceGatewayMonitor gatewayMonitor; + /** + * 连接计数器 + */ private final LongAdder counter = new LongAdder(); private final EmitterProcessor processor = EmitterProcessor.create(false); @@ -67,10 +73,11 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew private final FluxSink sink = processor.sink(FluxSink.OverflowStrategy.BUFFER); private final AtomicBoolean started = new AtomicBoolean(); - - private Disposable disposable; - private final DeviceGatewayHelper helper; + /** + * 数据流控开关 + */ + private Disposable disposable; public TcpServerDeviceGateway(String id, String protocol, @@ -93,133 +100,39 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew return supports.getProtocol(protocol); } + /** + * 当前总链接 + * + * @return 当前总链接 + */ @Override public long totalConnection() { return counter.sum(); } + /** + * 传输协议 + * + * @return {@link org.jetlinks.core.message.codec.DefaultTransport} + */ @Override public Transport getTransport() { return DefaultTransport.TCP; } + /** + * 网络类型 + * + * @return {@link org.jetlinks.community.network.DefaultNetworkType} + */ @Override public NetworkType getNetworkType() { return DefaultNetworkType.TCP_SERVER; } - - class TcpConnection implements DeviceGatewayContext { - final TcpClient client; - final AtomicReference keepaliveTimeout = new AtomicReference<>(); - final AtomicReference sessionRef = new AtomicReference<>(); - final InetSocketAddress address; - - TcpConnection(TcpClient client) { - this.client = client; - this.address = client.getRemoteAddress(); - gatewayMonitor.totalConnection(counter.sum()); - client.onDisconnect(() -> { - counter.decrement(); - gatewayMonitor.disconnected(); - gatewayMonitor.totalConnection(counter.sum()); - }); - gatewayMonitor.connected(); - DeviceSession session = sessionManager.getSession(client.getId()); - if (session == null) { - session = new UnknownTcpDeviceSession(client.getId(), client, getTransport()) { - @Override - public Mono send(EncodedMessage encodedMessage) { - return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage()); - } - - @Override - public void setKeepAliveTimeout(Duration timeout) { - keepaliveTimeout.set(timeout); - client.setKeepAliveTimeout(timeout); - } - - @Override - public Optional getClientAddress() { - return Optional.of(address); - } - }; - } - - sessionRef.set(session); - - } - - Mono accept() { - return getProtocol() - .flatMap(protocol -> protocol.onClientConnect(getTransport(), client, this)) - .then( - client - .subscribe() - .filter(tcp -> started.get()) - .publishOn(Schedulers.parallel()) - .flatMap(this::handleTcpMessage) - .onErrorResume((err) -> { - log.error(err.getMessage(), err); - client.shutdown(); - return Mono.empty(); - }) - .then() - ) - .doOnCancel(client::shutdown); - } - - Mono handleTcpMessage(TcpMessage message) { - return getProtocol() - .flatMap(pt -> pt.getMessageCodec(getTransport())) - .flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(sessionRef.get(), message, registry))) - .cast(DeviceMessage.class) - .doOnNext(msg -> gatewayMonitor.receivedMessage()) - .flatMap(this::handleDeviceMessage) - .doOnEach(ReactiveLogger.onError(err -> log.error("处理TCP[{}]消息失败:\n{}", - address, - message - , err))) - .onErrorResume((err) -> Mono.fromRunnable(client::reset)) - .then(); - } - - Mono handleDeviceMessage(DeviceMessage message) { - if (processor.hasDownstreams()) { - sink.next(message); - } - return helper - .handleDeviceMessage(message, - device -> new TcpDeviceSession(device, client, getTransport(), gatewayMonitor), - DeviceGatewayHelper - .applySessionKeepaliveTimeout(message, keepaliveTimeout::get) - .andThen(session -> { - TcpDeviceSession deviceSession = session.unwrap(TcpDeviceSession.class); - deviceSession.setClient(client); - sessionRef.set(deviceSession); - }), - () -> log.warn("无法从tcp[{}]消息中获取设备信息:{}", address, message) - ) - .then(); - } - - @Override - public Mono getDevice(String deviceId) { - return registry.getDevice(deviceId); - } - - @Override - public Mono getProduct(String productId) { - return registry.getProduct(productId); - } - - @Override - public Mono onMessage(DeviceMessage message) { - return handleDeviceMessage(message); - } - } - - + /** + * 启动网关 + */ private void doStart() { if (started.getAndSet(true) || disposable != null) { return; @@ -265,4 +178,134 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew public boolean isAlive() { return started.get(); } + + /** + * TCP 客户端连接 + */ + class TcpConnection implements DeviceGatewayContext { + final TcpClient client; + final AtomicReference keepaliveTimeout = new AtomicReference<>(); + final AtomicReference sessionRef = new AtomicReference<>(); + final InetSocketAddress address; + + TcpConnection(TcpClient client) { + this.client = client; + this.address = client.getRemoteAddress(); + gatewayMonitor.totalConnection(counter.sum()); + client.onDisconnect(() -> { + counter.decrement(); + gatewayMonitor.disconnected(); + gatewayMonitor.totalConnection(counter.sum()); + }); + gatewayMonitor.connected(); + DeviceSession session = sessionManager.getSession(client.getId()); + if (session == null) { + session = new UnknownTcpDeviceSession(client.getId(), client, getTransport()) { + @Override + public Mono send(EncodedMessage encodedMessage) { + return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage()); + } + + @Override + public void setKeepAliveTimeout(Duration timeout) { + keepaliveTimeout.set(timeout); + client.setKeepAliveTimeout(timeout); + } + + @Override + public Optional getClientAddress() { + return Optional.of(address); + } + }; + } + + sessionRef.set(session); + + } + + /** + * 接收消息 + * + * @return null + */ + Mono accept() { + return getProtocol() + .flatMap(protocol -> protocol.onClientConnect(getTransport(), client, this)) + .then( + client + .subscribe() + .filter(tcp -> started.get()) + .publishOn(Schedulers.parallel()) + .flatMap(this::handleTcpMessage) + .onErrorResume((err) -> { + log.error(err.getMessage(), err); + client.shutdown(); + return Mono.empty(); + }) + .then() + ) + .doOnCancel(client::shutdown); + } + + /** + * 处理TCP消息 ==>> 设备消息 + * + * @param message tcp消息 + * @return null + */ + Mono handleTcpMessage(TcpMessage message) { + return getProtocol() + .flatMap(pt -> pt.getMessageCodec(getTransport())) + .flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(sessionRef.get(), message, registry))) + .cast(DeviceMessage.class) + .doOnNext(msg -> gatewayMonitor.receivedMessage()) + .flatMap(this::handleDeviceMessage) + .doOnEach(ReactiveLogger.onError(err -> log.error("处理TCP[{}]消息失败:\n{}", + address, + message + , err))) + .onErrorResume((err) -> Mono.fromRunnable(client::reset)) + .then(); + } + + /** + * 处理设备消息 + * + * @param message 设备消息 + * @return null + */ + Mono handleDeviceMessage(DeviceMessage message) { + if (processor.hasDownstreams()) { + sink.next(message); + } + return helper + .handleDeviceMessage(message, + device -> new TcpDeviceSession(device, client, getTransport(), gatewayMonitor), + DeviceGatewayHelper + .applySessionKeepaliveTimeout(message, keepaliveTimeout::get) + .andThen(session -> { + TcpDeviceSession deviceSession = session.unwrap(TcpDeviceSession.class); + deviceSession.setClient(client); + sessionRef.set(deviceSession); + }), + () -> log.warn("无法从tcp[{}]消息中获取设备信息:{}", address, message) + ) + .then(); + } + + @Override + public Mono getDevice(String deviceId) { + return registry.getDevice(deviceId); + } + + @Override + public Mono getProduct(String productId) { + return registry.getProduct(productId); + } + + @Override + public Mono onMessage(DeviceMessage message) { + return handleDeviceMessage(message); + } + } } diff --git a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServer.java b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServer.java index 3a1246bf..42b8a073 100644 --- a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServer.java +++ b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServer.java @@ -23,5 +23,6 @@ public interface TcpServer extends Network { /** * 关闭服务端 */ + @Override void shutdown(); } diff --git a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/TimeSeriesData.java b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/TimeSeriesData.java index 3da01705..8f7e3e71 100644 --- a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/TimeSeriesData.java +++ b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/TimeSeriesData.java @@ -31,6 +31,7 @@ public interface TimeSeriesData extends ValueObject { return new SimpleTimeSeriesData(timestamp, data); } + @Override default T as(Class type) { return FastBeanCopier.copy(getData(), type); } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java index efda209d..19ddd7e0 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java @@ -36,115 +36,9 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler { PropertyConstants.deviceName.getKey(), PropertyConstants.orgId.getKey() }; - - //设备注册中心 - private final DeviceRegistry registry; - - private final EventBus eventBus; - - private final MessageHandler messageHandler; - private final static BiConsumer doOnError = (error, val) -> DeviceMessageConnector.log.error(error.getMessage(), error); - private final static Function> configGetter = operator -> operator.getSelfConfigs(allConfigHeader); - private final static Values emptyValues = Values.of(Collections.emptyMap()); - - public DeviceMessageConnector(EventBus eventBus, - DeviceRegistry registry, - MessageHandler messageHandler, - DeviceSessionManager sessionManager) { - this.registry = registry; - this.eventBus = eventBus; - this.messageHandler = messageHandler; - sessionManager - .onRegister() - .flatMap(session -> { - DeviceOnlineMessage message = new DeviceOnlineMessage(); - message.setDeviceId(session.getDeviceId()); - message.setTimestamp(session.connectTime()); - return onMessage(message); - }) - .onErrorContinue(doOnError) - .subscribe(); - - sessionManager - .onUnRegister() - .flatMap(session -> { - DeviceOfflineMessage message = new DeviceOfflineMessage(); - message.setDeviceId(session.getDeviceId()); - message.setTimestamp(System.currentTimeMillis()); - return onMessage(message); - }) - .onErrorContinue(doOnError) - .subscribe(); - } - - public Mono onMessage(Message message) { - if (null == message) { - return Mono.empty(); - } - return this - .getTopic(message) - .flatMap(topic -> eventBus.publish(topic, message).then()) - .onErrorContinue(doOnError) - .then(); - } - - private Flux getTopic(Message message) { - Flux topicsStream = createDeviceMessageTopic(registry, message); - if (message instanceof ChildDeviceMessage) { //子设备消息 - return this - .onMessage(((ChildDeviceMessage) message).getChildDeviceMessage()) - .thenMany(topicsStream); - } else if (message instanceof ChildDeviceMessageReply) { //子设备消息 - return this - .onMessage(((ChildDeviceMessageReply) message).getChildDeviceMessage()) - .thenMany(topicsStream); - } - return topicsStream; - } - - public static Flux createDeviceMessageTopic(DeviceRegistry deviceRegistry, Message message) { - return Flux.defer(() -> { - if (message instanceof DeviceMessage) { - DeviceMessage deviceMessage = ((DeviceMessage) message); - String deviceId = deviceMessage.getDeviceId(); - if (deviceId == null) { - log.warn("无法从消息中获取设备ID:{}", deviceMessage); - return Mono.empty(); - } - return deviceRegistry - .getDevice(deviceId) - .flatMap(configGetter) - .defaultIfEmpty(emptyValues) - .flatMapIterable(configs -> { - configs.getAllValues().forEach(deviceMessage::addHeader); - String productId = deviceMessage.getHeader(PropertyConstants.productId).orElse("null"); - String topic = createDeviceMessageTopic(productId, deviceId, deviceMessage); - List topics = new ArrayList<>(2); - topics.add(topic); - configs.getValue(PropertyConstants.orgId) - .ifPresent(orgId -> topics.add("/org/" + orgId + topic)); - - return topics; - }); - } - return Mono.just("/device/unknown/message/unknown"); - }); - } - - public static String createDeviceMessageTopic(String productId, String deviceId, DeviceMessage message) { - StringBuilder builder = new StringBuilder(64) - .append("/device/") - .append(productId) - .append("/") - .append(deviceId); - - appendDeviceMessageTopic(message, builder); - return builder.toString(); - } - private static final BiConsumer[] fastTopicBuilder; static { @@ -208,7 +102,7 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler { Message msg = ((ChildDeviceMessage) message).getChildDeviceMessage(); if (msg instanceof DeviceMessage) { builder.append("/message/children/") - .append(((DeviceMessage) msg).getDeviceId()); + .append(((DeviceMessage) msg).getDeviceId()); } else { builder.append("/message/children"); } @@ -219,7 +113,7 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler { Message msg = ((ChildDeviceMessageReply) message).getChildDeviceMessage(); if (msg instanceof DeviceMessage) { builder.append("/message/children/reply/") - .append(((DeviceMessage) msg).getDeviceId()); + .append(((DeviceMessage) msg).getDeviceId()); } else { builder.append("/message/children/reply"); } @@ -229,6 +123,81 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler { createFastBuilder(MessageType.DERIVED_METADATA, "/metadata/derived"); } + //设备注册中心 + private final DeviceRegistry registry; + private final EventBus eventBus; + private final MessageHandler messageHandler; + + public DeviceMessageConnector(EventBus eventBus, + DeviceRegistry registry, + MessageHandler messageHandler, + DeviceSessionManager sessionManager) { + this.registry = registry; + this.eventBus = eventBus; + this.messageHandler = messageHandler; + sessionManager + .onRegister() + .flatMap(session -> { + DeviceOnlineMessage message = new DeviceOnlineMessage(); + message.setDeviceId(session.getDeviceId()); + message.setTimestamp(session.connectTime()); + return onMessage(message); + }) + .onErrorContinue(doOnError) + .subscribe(); + + sessionManager + .onUnRegister() + .flatMap(session -> { + DeviceOfflineMessage message = new DeviceOfflineMessage(); + message.setDeviceId(session.getDeviceId()); + message.setTimestamp(System.currentTimeMillis()); + return onMessage(message); + }) + .onErrorContinue(doOnError) + .subscribe(); + } + + public static Flux createDeviceMessageTopic(DeviceRegistry deviceRegistry, Message message) { + return Flux.defer(() -> { + if (message instanceof DeviceMessage) { + DeviceMessage deviceMessage = ((DeviceMessage) message); + String deviceId = deviceMessage.getDeviceId(); + if (deviceId == null) { + log.warn("无法从消息中获取设备ID:{}", deviceMessage); + return Mono.empty(); + } + return deviceRegistry + .getDevice(deviceId) + .flatMap(configGetter) + .defaultIfEmpty(emptyValues) + .flatMapIterable(configs -> { + configs.getAllValues().forEach(deviceMessage::addHeader); + String productId = deviceMessage.getHeader(PropertyConstants.productId).orElse("null"); + String topic = createDeviceMessageTopic(productId, deviceId, deviceMessage); + List topics = new ArrayList<>(2); + topics.add(topic); + configs.getValue(PropertyConstants.orgId) + .ifPresent(orgId -> topics.add("/org/" + orgId + topic)); + + return topics; + }); + } + return Mono.just("/device/unknown/message/unknown"); + }); + } + + public static String createDeviceMessageTopic(String productId, String deviceId, DeviceMessage message) { + StringBuilder builder = new StringBuilder(64) + .append("/device/") + .append(productId) + .append("/") + .append(deviceId); + + appendDeviceMessageTopic(message, builder); + return builder.toString(); + } + private static void createFastBuilder(MessageType messageType, String topic) { fastTopicBuilder[messageType.ordinal()] = (ignore, builder) -> builder.append(topic); @@ -249,6 +218,37 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler { } } + public Mono onMessage(Message message) { + if (null == message) { + return Mono.empty(); + } + return this + .getTopic(message) + .flatMap(topic -> eventBus.publish(topic, message).then()) + .onErrorContinue(doOnError) + .then(); + } + + private Flux getTopic(Message message) { + Flux topicsStream = createDeviceMessageTopic(registry, message); + if (message instanceof ChildDeviceMessage) { //子设备消息 + return this + .onMessage(((ChildDeviceMessage) message).getChildDeviceMessage()) + .thenMany(topicsStream); + } else if (message instanceof ChildDeviceMessageReply) { //子设备消息 + return this + .onMessage(((ChildDeviceMessageReply) message).getChildDeviceMessage()) + .thenMany(topicsStream); + } + return topicsStream; + } + + /** + * 处理设备消息 + * + * @param message 设备消息 + * @return 处理结果 + */ protected Mono handleChildrenDeviceMessage(Message message) { if (message instanceof DeviceMessageReply) { return doReply(((DeviceMessageReply) message)); @@ -261,10 +261,23 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler { return handleChildrenDeviceMessage(reply.getChildDeviceMessage()); } + /** + * 处理回复消息 + * + * @param reply 子设备回复消息 + * @return 处理结果 + */ protected Mono handleChildrenDeviceMessageReply(ChildDeviceMessageReply reply) { return handleChildrenDeviceMessage(reply.getChildDeviceMessage()); } + /** + * 这里才是真正处理消息的地方 + * + * @param device 设备操作类 + * @param message 设备消息 + * @return 处理结果 + */ @Override public Mono handleMessage(DeviceOperator device, @Nonnull Message message) { Mono then; @@ -284,6 +297,12 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler { } + /** + * 回复消息处理逻辑 + * + * @param reply 设备回复消息 + * @return 处理结果 + */ private Mono doReply(DeviceMessageReply reply) { if (log.isDebugEnabled()) { log.debug("reply message {}", reply.getMessageId()); diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/writer/TimeSeriesMessageWriterConnector.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/writer/TimeSeriesMessageWriterConnector.java index 4d985893..47e9cb2b 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/writer/TimeSeriesMessageWriterConnector.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/writer/TimeSeriesMessageWriterConnector.java @@ -15,10 +15,16 @@ import reactor.core.publisher.Mono; */ @Slf4j @AllArgsConstructor -public class TimeSeriesMessageWriterConnector{ +public class TimeSeriesMessageWriterConnector { private final DeviceDataService dataService; + /** + * 订阅设备消息 入库 + * + * @param message 设备消息 + * @return null + */ @Subscribe(topics = "/device/**", id = "device-message-ts-writer") public Mono writeDeviceMessageToTs(DeviceMessage message) { return dataService.saveDeviceMessage(message); diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java index ebaa8d0d..c99c0d5d 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java @@ -8,28 +8,22 @@ import org.hswebframework.ezorm.core.param.TermType; import org.hswebframework.web.api.crud.entity.PagerResult; import org.hswebframework.web.api.crud.entity.QueryParamEntity; import org.hswebframework.web.id.IDGenerator; +import org.jetlinks.community.device.entity.DeviceEvent; +import org.jetlinks.community.device.entity.DeviceOperationLogEntity; +import org.jetlinks.community.device.entity.DeviceProperty; +import org.jetlinks.community.device.enums.DeviceLogType; +import org.jetlinks.community.device.events.handler.ValueTypeTranslator; import org.jetlinks.community.gateway.DeviceMessageUtils; +import org.jetlinks.community.timeseries.TimeSeriesData; import org.jetlinks.core.device.DeviceConfigKey; import org.jetlinks.core.device.DeviceProductOperator; import org.jetlinks.core.device.DeviceRegistry; import org.jetlinks.core.message.DeviceLogMessage; import org.jetlinks.core.message.DeviceMessage; -import org.jetlinks.core.message.DeviceMessageReply; import org.jetlinks.core.message.Headers; import org.jetlinks.core.message.event.EventMessage; -import org.jetlinks.core.message.property.ReadPropertyMessageReply; -import org.jetlinks.core.message.property.ReportPropertyMessage; -import org.jetlinks.core.message.property.WritePropertyMessageReply; import org.jetlinks.core.metadata.*; import org.jetlinks.core.metadata.types.*; -import org.jetlinks.community.device.entity.DeviceEvent; -import org.jetlinks.community.device.entity.DeviceOperationLogEntity; -import org.jetlinks.community.device.entity.DevicePropertiesEntity; -import org.jetlinks.community.device.entity.DeviceProperty; -import org.jetlinks.community.device.enums.DeviceLogType; -import org.jetlinks.community.device.events.handler.ValueTypeTranslator; -import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric; -import org.jetlinks.community.timeseries.TimeSeriesData; import org.jetlinks.core.utils.DeviceMessageTracer; import org.jetlinks.core.utils.TimestampUtils; import org.reactivestreams.Publisher; @@ -43,7 +37,6 @@ import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; -import java.util.function.Consumer; import java.util.function.Function; import static org.jetlinks.community.device.service.data.StorageConstants.propertyIsIgnoreStorage; @@ -58,8 +51,8 @@ import static org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric.*; */ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStoragePolicy { + private final AtomicInteger nanoInc = new AtomicInteger(); protected DeviceRegistry deviceRegistry; - protected DeviceDataStorageProperties properties; public AbstractDeviceDataStoragePolicy(DeviceRegistry registry, @@ -87,9 +80,11 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora protected abstract Mono doSaveData(String metric, Flux data); /** + * 设备消息转换 二元组 {deviceId, tsData} + * * @param productId 产品ID - * @param message 原始消息 - * @param properties 属性 + * @param message 设备属性消息 + * @param properties 物模型属性 * @return 数据集合 * @see this#convertPropertiesForColumnPolicy(String, DeviceMessage, Map) * @see this#convertPropertiesForRowPolicy(String, DeviceMessage, Map) @@ -106,7 +101,15 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora QueryParamEntity paramEntity, Function mapper); - + /** + * 保存单个设备消息,为了提升性能,存储策略会对保存请求进行缓冲,达到一定条件后 + * 再进行批量写出,具体由不同对存储策略实现。 + *

+ * 如果保存失败,在这里不会得到错误信息. + * + * @param message 设备消息 + * @return void + */ @Nonnull @Override public Mono saveDeviceMessage(@Nonnull DeviceMessage message) { @@ -120,10 +123,10 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora @Override public Mono saveDeviceMessage(@Nonnull Publisher message) { return Flux.from(message) - .flatMap(this::convertMessageToTimeSeriesData) - .groupBy(Tuple2::getT1, Integer.MAX_VALUE) - .flatMap(group -> doSaveData(group.key(), group.map(Tuple2::getT2))) - .then(); + .flatMap(this::convertMessageToTimeSeriesData) + .groupBy(Tuple2::getT1, Integer.MAX_VALUE) + .flatMap(group -> doSaveData(group.key(), group.map(Tuple2::getT2))) + .then(); } protected String createDataId(DeviceMessage message) { @@ -151,6 +154,12 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora .toSimpleMap()))); } + /** + * 设备消息转换成时序数据 二元组 {deviceId, tsData} + * + * @param message 设备消息 + * @return 二元组 + */ protected Flux> convertMessageToTimeSeriesData(DeviceMessage message) { boolean ignoreStorage = message.getHeaderOrDefault(Headers.ignoreStorage); boolean ignoreLog = message.getHeaderOrDefault(Headers.ignoreLog); @@ -193,8 +202,16 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora return Flux.merge(all); } + /** + * 事件消息转换成 二元组{deviceId, tsData} + * + * @param productId 产品ID + * @param message 事件消息 + * @return 二元组 + */ protected Mono> convertEventMessageToTimeSeriesData(String productId, EventMessage message) { - + // 设备注册中心获取设备操作接口 + // 获取设备元数据 物模型 return deviceRegistry .getDevice(message.getDeviceId()) .flatMap(device -> device @@ -226,20 +243,19 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora .map(data -> Tuples.of(deviceEventMetricId(productId, message.getEvent()), data)); } - + @Override public Mono> queryDeviceMessageLog(@Nonnull String deviceId, @Nonnull QueryParamEntity entity) { return deviceRegistry .getDevice(deviceId) .flatMap(operator -> operator.getSelfConfig(DeviceConfigKey.productId)) .flatMap(productId -> this .doQueryPager(deviceLogMetricId(productId), - entity.and("deviceId", TermType.eq, deviceId), - data -> data.as(DeviceOperationLogEntity.class) + entity.and("deviceId", TermType.eq, deviceId), + data -> data.as(DeviceOperationLogEntity.class) )) .defaultIfEmpty(PagerResult.empty()); } - @Nonnull @Override public Flux queryEvent(@Nonnull String deviceId, @@ -255,15 +271,15 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora .where("deviceId", deviceId) .execute(param -> this .doQuery(deviceEventMetricId(tp2.getT1().getId(), event), - param, - data -> { - DeviceEvent deviceEvent = new DeviceEvent(data.values()); - if (format) { - deviceEvent.putFormat(tp2.getT2().getEventOrNull(event)); - } - deviceEvent.putIfAbsent("timestamp", data.getTimestamp()); - return deviceEvent; - }))); + param, + data -> { + DeviceEvent deviceEvent = new DeviceEvent(data.values()); + if (format) { + deviceEvent.putFormat(tp2.getT2().getEventOrNull(event)); + } + deviceEvent.putIfAbsent("timestamp", data.getTimestamp()); + return deviceEvent; + }))); } @Nonnull @@ -277,18 +293,18 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora .getDevice(deviceId) .flatMap(device -> Mono.zip(device.getProduct(), device.getMetadata())) .flatMap(tp2 -> query.toQuery() - .where("deviceId", deviceId) - .execute(param -> this - .doQueryPager(deviceEventMetricId(tp2.getT1().getId(), event), - param, - data -> { - DeviceEvent deviceEvent = new DeviceEvent(data.values()); - if (format) { - deviceEvent.putFormat(tp2.getT2().getEventOrNull(event)); - } - deviceEvent.putIfAbsent("timestamp", data.getTimestamp()); - return deviceEvent; - })) + .where("deviceId", deviceId) + .execute(param -> this + .doQueryPager(deviceEventMetricId(tp2.getT1().getId(), event), + param, + data -> { + DeviceEvent deviceEvent = new DeviceEvent(data.values()); + if (format) { + deviceEvent.putFormat(tp2.getT2().getEventOrNull(event)); + } + deviceEvent.putIfAbsent("timestamp", data.getTimestamp()); + return deviceEvent; + })) ); } @@ -382,6 +398,16 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora return Maps.newHashMapWithExpectedSize(size); } + /** + * 设备消息转换 二元组{deviceId, tsData} + * + * @param productId 产品ID + * @param message 设备属性消息 + * @param properties 物模型属性 + * @return 数据集合 + * @see this#convertPropertiesForColumnPolicy(String, DeviceMessage, Map) + * @see this#convertPropertiesForRowPolicy(String, DeviceMessage, Map) + */ protected Flux> convertPropertiesForRowPolicy(String productId, DeviceMessage message, Map properties) { @@ -414,10 +440,10 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora return Mono .just(TimeSeriesData.of(ts, this .createRowPropertyData(id, - TimestampUtils.toMillis(ts), - device.getDeviceId(), - propertyMetadata, - entry.getT2().getValue())) + TimestampUtils.toMillis(ts), + device.getDeviceId(), + propertyMetadata, + entry.getT2().getValue())) ); }) .map(data -> Tuples.of(devicePropertyMetricId(productId), data))) @@ -511,9 +537,6 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora .flatMap(product -> Mono.zip(Mono.just(product), product.getMetadata())); } - - private final AtomicInteger nanoInc = new AtomicInteger(); - //将毫秒转为纳秒,努力让数据不重复 protected long createUniqueNanoTime(long millis) { long nano = TimeUnit.MILLISECONDS.toNanos(millis); diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DefaultDeviceDataService.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DefaultDeviceDataService.java index a0ead1e8..536c727a 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DefaultDeviceDataService.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DefaultDeviceDataService.java @@ -2,16 +2,16 @@ package org.jetlinks.community.device.service.data; import org.hswebframework.web.api.crud.entity.PagerResult; import org.hswebframework.web.api.crud.entity.QueryParamEntity; +import org.jetlinks.community.device.entity.DeviceEvent; +import org.jetlinks.community.device.entity.DeviceOperationLogEntity; +import org.jetlinks.community.device.entity.DeviceProperty; +import org.jetlinks.community.timeseries.query.AggregationData; import org.jetlinks.core.Value; import org.jetlinks.core.device.DeviceOperator; import org.jetlinks.core.device.DeviceProductOperator; import org.jetlinks.core.device.DeviceRegistry; import org.jetlinks.core.message.DeviceMessage; import org.jetlinks.core.metadata.DeviceMetadata; -import org.jetlinks.community.device.entity.DeviceEvent; -import org.jetlinks.community.device.entity.DeviceOperationLogEntity; -import org.jetlinks.community.device.entity.DeviceProperty; -import org.jetlinks.community.timeseries.query.AggregationData; import org.reactivestreams.Publisher; import org.springframework.beans.factory.ObjectProvider; import org.springframework.stereotype.Component; @@ -23,6 +23,11 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; +/** + * 默认设备数据服务 + * + * @author JetLinks + */ @Component public class DefaultDeviceDataService implements DeviceDataService { @@ -55,8 +60,16 @@ public class DefaultDeviceDataService implements DeviceDataService { .then(); } + /** + * 通过产品ID 获取存储策略 + * + * @param productId 产品ID + * @return 存储策略 + */ Mono getStoreStrategy(String productId) { - + // 从注册中心获取产品操作接口 + // 从配置中获取产品的存储策略 + // 巧妙的双层switchIfEmpty 外层判断空配置 内层判断空策略 return deviceRegistry .getProduct(productId) .flatMap(product -> product @@ -69,7 +82,16 @@ public class DefaultDeviceDataService implements DeviceDataService { .flatMap(Function.identity())); } + /** + * 通过设备ID 获取存储策略 + * + * @param deviceId 设备ID + * @return 存储策略 + */ Mono getDeviceStrategy(String deviceId) { + // 从注册中心获取设备操作接口 + // 转换成产品操作接口 + // 继而通过转换的产品ID获取存储策略 return deviceRegistry.getDevice(deviceId) .flatMap(DeviceOperator::getProduct) .map(DeviceProductOperator::getId) @@ -144,7 +166,15 @@ public class DefaultDeviceDataService implements DeviceDataService { .defaultIfEmpty(PagerResult.empty()); } - + /** + * 保存单个设备消息,为了提升性能,存储策略会对保存请求进行缓冲,达到一定条件后 + * 再进行批量写出,具体由不同对存储策略实现。 + *

+ * 如果保存失败,在这里不会得到错误信息. + * + * @param message 设备消息 + * @return void + */ @Nonnull @Override public Mono saveDeviceMessage(@Nonnull DeviceMessage message) { diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesColumnDeviceDataStoragePolicy.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesColumnDeviceDataStoragePolicy.java index 005782c1..fb2d45b0 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesColumnDeviceDataStoragePolicy.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesColumnDeviceDataStoragePolicy.java @@ -2,7 +2,14 @@ package org.jetlinks.community.device.service.data; import org.hswebframework.web.api.crud.entity.PagerResult; import org.hswebframework.web.api.crud.entity.QueryParamEntity; -import org.jetlinks.community.timeseries.query.*; +import org.jetlinks.community.device.entity.DeviceProperty; +import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetadata; +import org.jetlinks.community.timeseries.TimeSeriesData; +import org.jetlinks.community.timeseries.TimeSeriesManager; +import org.jetlinks.community.timeseries.query.AggregationData; +import org.jetlinks.community.timeseries.query.AggregationQueryParam; +import org.jetlinks.community.timeseries.query.Group; +import org.jetlinks.community.timeseries.query.TimeGroup; import org.jetlinks.core.device.DeviceOperator; import org.jetlinks.core.device.DeviceRegistry; import org.jetlinks.core.message.DeviceMessage; @@ -10,10 +17,6 @@ import org.jetlinks.core.metadata.ConfigMetadata; import org.jetlinks.core.metadata.Converter; import org.jetlinks.core.metadata.DeviceMetadata; import org.jetlinks.core.metadata.PropertyMetadata; -import org.jetlinks.community.device.entity.DeviceProperty; -import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetadata; -import org.jetlinks.community.timeseries.TimeSeriesData; -import org.jetlinks.community.timeseries.TimeSeriesManager; import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import org.springframework.stereotype.Component; @@ -28,7 +31,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric.devicePropertyMetric; -import static org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric.devicePropertyMetricId; @Component public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDataStoragePolicy implements DeviceDataStoragePolicy { @@ -247,11 +249,22 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat .doOnNext(agg -> agg.values().remove("_time")); } + /** + * 设备消息转换 二元组{deviceId, tsData} + * + * @param productId 产品ID + * @param message 设备属性消息 + * @param properties 物模型属性 + * @return 数据集合 + * @see this#convertPropertiesForColumnPolicy(String, DeviceMessage, Map) + * @see this#convertPropertiesForRowPolicy(String, DeviceMessage, Map) + */ @Override protected Flux> convertProperties(String productId, DeviceMessage message, Map properties) { return convertPropertiesForColumnPolicy(productId, message, properties); } + @Override protected Object convertPropertyValue(Object value, PropertyMetadata metadata) { if (value == null || metadata == null) { return value; diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesDeviceDataStoragePolicy.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesDeviceDataStoragePolicy.java index d784d0bb..b25101b1 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesDeviceDataStoragePolicy.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesDeviceDataStoragePolicy.java @@ -22,18 +22,21 @@ public abstract class TimeSeriesDeviceDataStoragePolicy extends AbstractDeviceDa this.timeSeriesManager = timeSeriesManager; } + @Override protected Mono doSaveData(String metric, TimeSeriesData data) { return timeSeriesManager .getService(metric) .commit(data); } + @Override protected Mono doSaveData(String metric, Flux data) { return timeSeriesManager .getService(metric) .save(data); } + @Override protected Flux doQuery(String metric, QueryParamEntity paramEntity, Function mapper) { @@ -44,6 +47,7 @@ public abstract class TimeSeriesDeviceDataStoragePolicy extends AbstractDeviceDa } + @Override protected Mono> doQueryPager(String metric, QueryParamEntity paramEntity, Function mapper) { diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesRowDeviceDataStoreStoragePolicy.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesRowDeviceDataStoreStoragePolicy.java index 650e5618..1a07e413 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesRowDeviceDataStoreStoragePolicy.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesRowDeviceDataStoreStoragePolicy.java @@ -2,18 +2,18 @@ package org.jetlinks.community.device.service.data; import org.hswebframework.web.api.crud.entity.PagerResult; import org.hswebframework.web.api.crud.entity.QueryParamEntity; -import org.jetlinks.core.device.DeviceOperator; -import org.jetlinks.core.device.DeviceRegistry; -import org.jetlinks.core.message.DeviceMessage; -import org.jetlinks.core.metadata.ConfigMetadata; -import org.jetlinks.core.metadata.DeviceMetadata; -import org.jetlinks.core.metadata.PropertyMetadata; import org.jetlinks.community.device.entity.DeviceProperty; import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetadata; import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric; import org.jetlinks.community.timeseries.TimeSeriesData; import org.jetlinks.community.timeseries.TimeSeriesManager; import org.jetlinks.community.timeseries.query.*; +import org.jetlinks.core.device.DeviceOperator; +import org.jetlinks.core.device.DeviceRegistry; +import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.metadata.ConfigMetadata; +import org.jetlinks.core.metadata.DeviceMetadata; +import org.jetlinks.core.metadata.PropertyMetadata; import org.jetlinks.reactor.ql.utils.CastUtils; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; @@ -288,6 +288,16 @@ public class TimeSeriesRowDeviceDataStoreStoragePolicy extends TimeSeriesDeviceD .doOnNext(agg -> agg.values().remove("_time")); } + /** + * 设备消息转换 二元组{deviceId, tsData} + * + * @param productId 产品ID + * @param message 设备属性消息 + * @param properties 物模型属性 + * @return 数据集合 + * @see this#convertPropertiesForColumnPolicy(String, DeviceMessage, Map) + * @see this#convertPropertiesForRowPolicy(String, DeviceMessage, Map) + */ @Override protected Flux> convertProperties(String productId, DeviceMessage message, Map properties) { return convertPropertiesForRowPolicy(productId, message, properties); diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/timeseries/DeviceTimeSeriesMetric.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/timeseries/DeviceTimeSeriesMetric.java index b129da4e..bfba5ac2 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/timeseries/DeviceTimeSeriesMetric.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/timeseries/DeviceTimeSeriesMetric.java @@ -1,8 +1,8 @@ package org.jetlinks.community.device.timeseries; +import org.jetlinks.community.timeseries.TimeSeriesMetric; import org.jetlinks.core.device.DeviceProductOperator; import org.jetlinks.core.metadata.EventMetadata; -import org.jetlinks.community.timeseries.TimeSeriesMetric; /** * 设备时序数据度量标识 @@ -26,6 +26,13 @@ public interface DeviceTimeSeriesMetric { return TimeSeriesMetric.of(deviceEventMetricId(productId, eventId)); } + /** + * 构建事件指标ID + * + * @param productId 产品ID + * @param eventId 事件ID + * @return 事件指标ID + */ static String deviceEventMetricId(String productId, String eventId) { return "event_".concat(productId).concat("_").concat(eventId); } diff --git a/jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/service/DeviceGatewayConfigService.java b/jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/service/DeviceGatewayConfigService.java index 13407c12..3f729b1d 100644 --- a/jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/service/DeviceGatewayConfigService.java +++ b/jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/service/DeviceGatewayConfigService.java @@ -7,6 +7,9 @@ import org.jetlinks.community.gateway.supports.DeviceGatewayPropertiesManager; import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; +/** + * @author jetlinks + */ @Service public class DeviceGatewayConfigService implements DeviceGatewayPropertiesManager { From 9c65346a6c8b7a7b24bee406c6cba0030caf6225 Mon Sep 17 00:00:00 2001 From: Tensai <517634644@qq.com> Date: Fri, 28 May 2021 14:06:12 +0800 Subject: [PATCH 2/3] =?UTF-8?q?docs(=E6=95=B0=E6=8D=AE=E6=B5=81=E3=80=90?= =?UTF-8?q?=E8=AE=BE=E5=A4=87=E7=BD=91=E5=85=B3-->=E7=89=A9=E6=A8=A1?= =?UTF-8?q?=E5=9E=8B=E8=BD=AC=E6=8D=A2->=E6=97=B6=E5=BA=8F=E5=85=A5?= =?UTF-8?q?=E5=BA=93=E3=80=91):=20=E5=A2=9E=E5=8A=A0=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E6=B5=81=E5=85=A5=E5=BA=93=E7=9A=84=E4=BB=A3=E7=A0=81=E6=B3=A8?= =?UTF-8?q?=E9=87=8A=20=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81=E7=BB=93?= =?UTF-8?q?=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Tensai <517634644@qq.com> --- .../search/service/ElasticSearchService.java | 2 +- .../ReactiveElasticSearchService.java | 412 +++++++++--------- .../supports/DefaultDeviceGatewayManager.java | 2 +- .../supports/DeviceGatewayProperties.java | 11 +- .../DeviceGatewayPropertiesManager.java | 4 +- .../supports/DeviceGatewayProvider.java | 7 +- .../network/DefaultNetworkManager.java | 9 +- .../community/network/NetworkManager.java | 39 +- .../network/utils/DeviceGatewayHelper.java | 5 +- .../tcp/device/TcpServerDeviceGateway.java | 8 +- .../TcpServerDeviceGatewayProvider.java | 16 +- .../community/timeseries/TimeSeriesData.java | 21 +- .../message/DeviceMessageConnector.java | 1 - .../TimeSeriesMessageWriterConnector.java | 2 +- .../data/DefaultDeviceDataService.java | 6 +- ...meSeriesColumnDeviceDataStoragePolicy.java | 41 +- .../TimeSeriesDeviceDataStoragePolicy.java | 8 + ...SeriesRowDeviceDataStoreStoragePolicy.java | 19 +- .../service/DeviceGatewayConfigService.java | 4 +- 19 files changed, 348 insertions(+), 269 deletions(-) diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/ElasticSearchService.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/ElasticSearchService.java index ec7563f6..8a51fee0 100644 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/ElasticSearchService.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/ElasticSearchService.java @@ -15,7 +15,7 @@ import java.util.function.Function; /** * ES数据库业务操作类 * - * @author JetLinks + * @author zhouhao */ public interface ElasticSearchService { diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticSearchService.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticSearchService.java index fc80b019..e955ff5d 100644 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticSearchService.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticSearchService.java @@ -57,6 +57,8 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; /** + * 响应式ES数据库操作类 + * * @author zhouhao * @since 1.0 **/ @@ -66,20 +68,23 @@ import java.util.stream.Collectors; @ConfigurationProperties(prefix = "elasticsearch") public class ReactiveElasticSearchService implements ElasticSearchService { - private final ReactiveElasticsearchClient restClient; - - private final ElasticSearchIndexManager indexManager; - - private FluxSink sink; - public static final IndicesOptions indexOptions = IndicesOptions.fromOptions( true, true, false, false ); + //使用对象池处理Buffer,减少GC消耗 + static ObjectPool pool = ObjectPool.newPool(Buffer::new); static { DateFormatter.supportFormatter.add(new DefaultDateFormatter(Pattern.compile("[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.+"), "yyyy-MM-dd'T'HH:mm:ss.SSSZ")); } + private final ReactiveElasticsearchClient restClient; + private final ElasticSearchIndexManager indexManager; + private FluxSink sink; + @Getter + @Setter + private BufferConfig buffer = new BufferConfig(); + public ReactiveElasticSearchService(ReactiveElasticsearchClient restClient, ElasticSearchIndexManager indexManager) { this.restClient = restClient; @@ -136,16 +141,16 @@ public class ReactiveElasticSearchService implements ElasticSearchService { @Override public Mono> queryPager(String[] index, QueryParam queryParam, Function, T> mapper) { return this.doQuery(index, queryParam) - .flatMap(tp2 -> this - .convertQueryResult(tp2.getT1(), tp2.getT2(), mapper) - .collectList() - .filter(CollectionUtils::isNotEmpty) - .map(list -> PagerResult.of((int) tp2 - .getT2() - .getHits() - .getTotalHits().value, list, queryParam)) - ) - .switchIfEmpty(Mono.fromSupplier(PagerResult::empty)); + .flatMap(tp2 -> this + .convertQueryResult(tp2.getT1(), tp2.getT2(), mapper) + .collectList() + .filter(CollectionUtils::isNotEmpty) + .map(list -> PagerResult.of((int) tp2 + .getT2() + .getHits() + .getTotalHits().value, list, queryParam)) + ) + .switchIfEmpty(Mono.fromSupplier(PagerResult::empty)); } private Flux convertQueryResult(List indexList, @@ -164,8 +169,8 @@ public class ReactiveElasticSearchService implements ElasticSearchService { } return mapper .apply(Optional - .ofNullable(metadata.get(hit.getIndex())).orElse(indexList.get(0)) - .convertFromElastic(hitMap)); + .ofNullable(metadata.get(hit.getIndex())).orElse(indexList.get(0)) + .convertFromElastic(hitMap)); }); } @@ -186,7 +191,6 @@ public class ReactiveElasticSearchService implements ElasticSearchService { }); } - @Override public Mono count(String[] index, QueryParam queryParam) { QueryParam param = queryParam.clone(); @@ -225,8 +229,8 @@ public class ReactiveElasticSearchService implements ElasticSearchService { @Override public Mono commit(String index, Publisher data) { return Flux.from(data) - .flatMap(d -> commit(index, d)) - .then(); + .flatMap(d -> commit(index, d)) + .then(); } @Override @@ -237,10 +241,10 @@ public class ReactiveElasticSearchService implements ElasticSearchService { @Override public Mono save(String index, Publisher data) { return Flux.from(data) - .map(v -> Buffer.of(index, v)) - .collectList() - .flatMap(this::doSave) - .then(); + .map(v -> Buffer.of(index, v)) + .collectList() + .flatMap(this::doSave) + .then(); } @Override @@ -253,32 +257,6 @@ public class ReactiveElasticSearchService implements ElasticSearchService { sink.complete(); } - @Getter - @Setter - private BufferConfig buffer = new BufferConfig(); - - @Getter - @Setter - public static class BufferConfig { - //最小间隔 - private int rate = Integer.getInteger("elasticsearch.buffer.rate", 1000); - //缓冲最大数量 - private int bufferSize = Integer.getInteger("elasticsearch.buffer.size", 3000); - //缓冲超时时间 - private Duration bufferTimeout = Duration.ofSeconds(Integer.getInteger("elasticsearch.buffer.timeout", 3)); - //背压堆积数量限制. - private int bufferBackpressure = Integer.getInteger("elasticsearch.buffer.backpressure", Runtime - .getRuntime() - .availableProcessors()); - //最大缓冲字节 - private DataSize bufferBytes = DataSize.parse(System.getProperty("elasticsearch.buffer.bytes", "15MB")); - - //最大重试次数 - private int maxRetry = 3; - //重试间隔 - private Duration minBackoff = Duration.ofSeconds(3); - } - //@PostConstruct public void init() { int flushRate = buffer.rate; @@ -290,10 +268,10 @@ public class ReactiveElasticSearchService implements ElasticSearchService { FluxUtils .bufferRate(Flux.create(sink -> this.sink = sink), - flushRate, - bufferSize, - bufferTimeout, - (b, l) -> bufferedBytes.addAndGet(b.numberOfBytes()) >= bufferBytes) + flushRate, + bufferSize, + bufferTimeout, + (b, l) -> bufferedBytes.addAndGet(b.numberOfBytes()) >= bufferBytes) .doOnNext(buf -> bufferedBytes.set(0)) .onBackpressureBuffer(bufferBackpressure, drop -> { // TODO: 2020/11/25 将丢弃的数据存储到本地磁盘 @@ -321,19 +299,186 @@ public class ReactiveElasticSearchService implements ElasticSearchService { }) .onErrorResume((err) -> Mono .fromRunnable(() -> System.err.println("保存ElasticSearch数据失败:\n" + - org.hswebframework.utils.StringUtils.throwable2String(err)))) + org.hswebframework.utils.StringUtils.throwable2String(err)))) .subscribe(); } - //使用对象池处理Buffer,减少GC消耗 - static ObjectPool pool = ObjectPool.newPool(Buffer::new); + private Mono getIndexForSave(String index) { + return indexManager + .getIndexStrategy(index) + .map(strategy -> strategy.getIndexForSave(index)); + + } + + private Mono getIndexForSearch(String index) { + return indexManager + .getIndexStrategy(index) + .map(strategy -> strategy.getIndexForSearch(index)); + + } + + protected Mono doSave(Collection buffers) { + return Flux.fromIterable(buffers) + .groupBy(Buffer::getIndex, Integer.MAX_VALUE) + .flatMap(group -> { + String index = group.key(); + return this + .getIndexForSave(index) + .flatMapMany(realIndex -> group + .map(buffer -> { + try { + IndexRequest request; + if (buffer.id != null) { + request = new IndexRequest(realIndex).type("_doc").id(buffer.id); + } else { + request = new IndexRequest(realIndex).type("_doc"); + } + request.source(buffer.payload, XContentType.JSON); + return request; + } finally { + buffer.release(); + } + })); + }) + .collectList() + .filter(CollectionUtils::isNotEmpty) + .flatMap(lst -> { + BulkRequest request = new BulkRequest(); + request.timeout(TimeValue.timeValueSeconds(9)); + lst.forEach(request::add); + return restClient + .bulk(request) + .as(save -> { + if (buffer.maxRetry > 0) { + return save.retryWhen(Retry.backoff(buffer.maxRetry, buffer.minBackoff)); + } + return save; + }); + }) + .doOnNext(response -> { + if (response.hasFailures()) { + System.err.println(response.buildFailureMessage()); + } + }) + .thenReturn(buffers.size()); + } + + @SneakyThrows + protected void checkResponse(BulkResponse response) { + if (response.hasFailures()) { + for (BulkItemResponse item : response.getItems()) { + if (item.isFailed()) { + throw item.getFailure().getCause(); + } + } + } + } + + private List translate(Function, T> mapper, SearchResponse response) { + return Arrays.stream(response.getHits().getHits()) + .map(hit -> { + Map hitMap = hit.getSourceAsMap(); + if (StringUtils.isEmpty(hitMap.get("id"))) { + hitMap.put("id", hit.getId()); + } + return mapper.apply(hitMap); + }) + .collect(Collectors.toList()); + } + + private Flux doSearch(SearchRequest request) { + return restClient + .search(request) + .onErrorResume(err -> { + log.error("query elastic error", err); + return Mono.empty(); + }); + } + + private Mono doCount(SearchRequest request) { + return restClient + .count(request) + .onErrorResume(err -> { + log.error("query elastic error", err); + return Mono.empty(); + }); + } + + protected Mono createSearchRequest(QueryParam queryParam, String... indexes) { + return indexManager + .getIndexesMetadata(indexes) + .collectList() + .filter(CollectionUtils::isNotEmpty) + .flatMap(list -> createSearchRequest(queryParam, list)); + } + + protected Mono createSearchRequest(QueryParam queryParam, List indexes) { + + SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0)); + return Flux.fromIterable(indexes) + .flatMap(index -> getIndexForSearch(index.getIndex())) + .collectList() + .map(indexList -> + new SearchRequest(indexList.toArray(new String[0])) + .source(builder) + .indicesOptions(indexOptions)); + } + + protected Mono createQueryBuilder(QueryParam queryParam, String index) { + return indexManager + .getIndexMetadata(index) + .map(metadata -> QueryParamTranslator.createQueryBuilder(queryParam, metadata)) + .switchIfEmpty(Mono.fromSupplier(() -> QueryParamTranslator.createQueryBuilder(queryParam, null))); + } + + protected Mono createCountRequest(QueryParam queryParam, List indexes) { + QueryParam tempQueryParam = queryParam.clone(); + tempQueryParam.setPaging(false); + tempQueryParam.setSorts(Collections.emptyList()); + + SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0)); + return Flux.fromIterable(indexes) + .flatMap(index -> getIndexForSearch(index.getIndex())) + .collectList() + .map(indexList -> new CountRequest(indexList.toArray(new String[0])).source(builder)); + } + + private Mono createCountRequest(QueryParam queryParam, String... index) { + return indexManager + .getIndexesMetadata(index) + .collectList() + .filter(CollectionUtils::isNotEmpty) + .flatMap(list -> createCountRequest(queryParam, list)); + } + + @Getter + @Setter + public static class BufferConfig { + //最小间隔 + private int rate = Integer.getInteger("elasticsearch.buffer.rate", 1000); + //缓冲最大数量 + private int bufferSize = Integer.getInteger("elasticsearch.buffer.size", 3000); + //缓冲超时时间 + private Duration bufferTimeout = Duration.ofSeconds(Integer.getInteger("elasticsearch.buffer.timeout", 3)); + //背压堆积数量限制. + private int bufferBackpressure = Integer.getInteger("elasticsearch.buffer.backpressure", Runtime + .getRuntime() + .availableProcessors()); + //最大缓冲字节 + private DataSize bufferBytes = DataSize.parse(System.getProperty("elasticsearch.buffer.bytes", "15MB")); + + //最大重试次数 + private int maxRetry = 3; + //重试间隔 + private Duration minBackoff = Duration.ofSeconds(3); + } @Getter static class Buffer { + final ObjectPool.Handle handle; String index; String id; String payload; - final ObjectPool.Handle handle; public Buffer(ObjectPool.Handle handle) { this.handle = handle; @@ -369,153 +514,4 @@ public class ReactiveElasticSearchService implements ElasticSearchService { return payload == null ? 0 : payload.length() * 2; } } - - - private Mono getIndexForSave(String index) { - return indexManager - .getIndexStrategy(index) - .map(strategy -> strategy.getIndexForSave(index)); - - } - - private Mono getIndexForSearch(String index) { - return indexManager - .getIndexStrategy(index) - .map(strategy -> strategy.getIndexForSearch(index)); - - } - - protected Mono doSave(Collection buffers) { - return Flux.fromIterable(buffers) - .groupBy(Buffer::getIndex,Integer.MAX_VALUE) - .flatMap(group -> { - String index = group.key(); - return this - .getIndexForSave(index) - .flatMapMany(realIndex -> group - .map(buffer -> { - try { - IndexRequest request; - if (buffer.id != null) { - request = new IndexRequest(realIndex).type("_doc").id(buffer.id); - } else { - request = new IndexRequest(realIndex).type("_doc"); - } - request.source(buffer.payload, XContentType.JSON); - return request; - } finally { - buffer.release(); - } - })); - }) - .collectList() - .filter(CollectionUtils::isNotEmpty) - .flatMap(lst -> { - BulkRequest request = new BulkRequest(); - request.timeout(TimeValue.timeValueSeconds(9)); - lst.forEach(request::add); - return restClient - .bulk(request) - .as(save -> { - if (buffer.maxRetry > 0) { - return save.retryWhen(Retry.backoff(buffer.maxRetry, buffer.minBackoff)); - } - return save; - }); - }) - .doOnNext(response -> { - if (response.hasFailures()) { - System.err.println(response.buildFailureMessage()); - } - }) - .thenReturn(buffers.size()); - } - - @SneakyThrows - protected void checkResponse(BulkResponse response) { - if (response.hasFailures()) { - for (BulkItemResponse item : response.getItems()) { - if (item.isFailed()) { - throw item.getFailure().getCause(); - } - } - } - } - - private List translate(Function, T> mapper, SearchResponse response) { - return Arrays.stream(response.getHits().getHits()) - .map(hit -> { - Map hitMap = hit.getSourceAsMap(); - if (StringUtils.isEmpty(hitMap.get("id"))) { - hitMap.put("id", hit.getId()); - } - return mapper.apply(hitMap); - }) - .collect(Collectors.toList()); - } - - private Flux doSearch(SearchRequest request) { - return restClient - .search(request) - .onErrorResume(err -> { - log.error("query elastic error", err); - return Mono.empty(); - }); - } - - private Mono doCount(SearchRequest request) { - return restClient - .count(request) - .onErrorResume(err -> { - log.error("query elastic error", err); - return Mono.empty(); - }); - } - - protected Mono createSearchRequest(QueryParam queryParam, String... indexes) { - return indexManager - .getIndexesMetadata(indexes) - .collectList() - .filter(CollectionUtils::isNotEmpty) - .flatMap(list -> createSearchRequest(queryParam, list)); - } - - protected Mono createSearchRequest(QueryParam queryParam, List indexes) { - - SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0)); - return Flux.fromIterable(indexes) - .flatMap(index -> getIndexForSearch(index.getIndex())) - .collectList() - .map(indexList -> - new SearchRequest(indexList.toArray(new String[0])) - .source(builder) - .indicesOptions(indexOptions)); - } - - protected Mono createQueryBuilder(QueryParam queryParam, String index) { - return indexManager - .getIndexMetadata(index) - .map(metadata -> QueryParamTranslator.createQueryBuilder(queryParam, metadata)) - .switchIfEmpty(Mono.fromSupplier(() -> QueryParamTranslator.createQueryBuilder(queryParam, null))); - } - - protected Mono createCountRequest(QueryParam queryParam, List indexes) { - QueryParam tempQueryParam = queryParam.clone(); - tempQueryParam.setPaging(false); - tempQueryParam.setSorts(Collections.emptyList()); - - SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0)); - return Flux.fromIterable(indexes) - .flatMap(index -> getIndexForSearch(index.getIndex())) - .collectList() - .map(indexList -> new CountRequest(indexList.toArray(new String[0])).source(builder)); - } - - private Mono createCountRequest(QueryParam queryParam, String... index) { - return indexManager - .getIndexesMetadata(index) - .collectList() - .filter(CollectionUtils::isNotEmpty) - .flatMap(list -> createCountRequest(queryParam, list)); - } } diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DefaultDeviceGatewayManager.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DefaultDeviceGatewayManager.java index 02c1a294..db1b3bb8 100644 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DefaultDeviceGatewayManager.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DefaultDeviceGatewayManager.java @@ -17,7 +17,7 @@ import java.util.concurrent.ConcurrentHashMap; *

* TCP UDP MQTT CoAP * - * @author Jetlinks + * @author zhouhao */ @Component public class DefaultDeviceGatewayManager implements DeviceGatewayManager, BeanPostProcessor { diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayProperties.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayProperties.java index 1e011d27..66cf1d29 100644 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayProperties.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayProperties.java @@ -8,11 +8,16 @@ import java.util.HashMap; import java.util.Map; /** - * @author Tensai + * 设备网关属性外观类 + *

+ * 转换设备网关属性数据 + *

+ * + * @author zhouhao */ @Getter @Setter -public class DeviceGatewayProperties implements ValueObject { +public class DeviceGatewayProperties implements ValueObject { private String id; @@ -20,7 +25,7 @@ public class DeviceGatewayProperties implements ValueObject { private String networkId; - private Map configuration=new HashMap<>(); + private Map configuration = new HashMap<>(); @Override public Map values() { diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayPropertiesManager.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayPropertiesManager.java index 25e02b64..1893b0dd 100644 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayPropertiesManager.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayPropertiesManager.java @@ -3,7 +3,9 @@ package org.jetlinks.community.gateway.supports; import reactor.core.publisher.Mono; /** - * @author Jetlinks + * 设备网关属性管理器 + * + * @author zhouhao */ public interface DeviceGatewayPropertiesManager { diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayProvider.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayProvider.java index 6b21fb14..c1db8364 100644 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayProvider.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayProvider.java @@ -5,9 +5,12 @@ import org.jetlinks.community.network.NetworkType; import reactor.core.publisher.Mono; /** - * 设备网关服务提供者 + * 设备网关支持提供商,用于提供对各种设备网关的支持.在启动设备网关时,会根据对应的提供商以及配置来创建设备网关. + * 实现统一管理网关配置,动态创建设备网关. * - * @author Jetlinks + * @author zhouhao + * @see DeviceGateway + * @since 1.0 */ public interface DeviceGatewayProvider { diff --git a/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/DefaultNetworkManager.java b/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/DefaultNetworkManager.java index 2bec2fc0..3b698aad 100644 --- a/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/DefaultNetworkManager.java +++ b/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/DefaultNetworkManager.java @@ -19,6 +19,11 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +/** + * 默认网络管理器 + * + * @author zhouhao + */ @Component @Slf4j public class DefaultNetworkManager implements NetworkManager, BeanPostProcessor { @@ -26,9 +31,9 @@ public class DefaultNetworkManager implements NetworkManager, BeanPostProcessor private final NetworkConfigManager configManager; - private Map> store = new ConcurrentHashMap<>(); + private final Map> store = new ConcurrentHashMap<>(); - private Map> providerSupport = new ConcurrentHashMap<>(); + private final Map> providerSupport = new ConcurrentHashMap<>(); public DefaultNetworkManager(NetworkConfigManager configManager) { this.configManager = configManager; diff --git a/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/NetworkManager.java b/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/NetworkManager.java index 92ecf2f9..475b0920 100644 --- a/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/NetworkManager.java +++ b/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/NetworkManager.java @@ -4,13 +4,48 @@ import reactor.core.publisher.Mono; import java.util.List; +/** + * 网络服务管理器 + *

+ * 管理所有的网络组件 + * + * @author zhouhao + * @since 1.0 + */ public interface NetworkManager { + /** + * 根据ID获取网络组件,否则根据type和id创建网络组件并返回 + * + * @param type 网络类型 + * @param id 网络组件id + * @param NetWork子类泛型 + * @return 网络组件 + */ Mono getNetwork(NetworkType type, String id); + /** + * 获取所有的网络组件支持提供商 + * + * @return 网络组件支持提供商 + */ List> getProviders(); - Mono reload(NetworkType type, String id); + /** + * 重新加载网络组件 + * + * @param type 网络类型 + * @param id 网络组件ID + * @return void + */ + Mono reload(NetworkType type, String id); - Mono shutdown(NetworkType type, String id); + /** + * 停止网络组件 + * + * @param type 网络类型 + * @param id 网络组件ID + * @return void + */ + Mono shutdown(NetworkType type, String id); } diff --git a/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java b/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java index cdc1ca8b..f4691f43 100644 --- a/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java +++ b/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java @@ -23,8 +23,11 @@ import java.util.function.Supplier; /** * 设备网关处理工具 + *

+ * 封装常用的设备消息处理操作 + *

* - * @author Jetlinks + * @author zhouhao */ @AllArgsConstructor public class DeviceGatewayHelper { diff --git a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java index 9d6ca9e2..e4f4b115 100644 --- a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java +++ b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java @@ -43,7 +43,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; @Slf4j(topic = "system.tcp.gateway") -public class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGateway { +class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGateway { @Getter private final String id; @@ -226,7 +226,7 @@ public class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDevi /** * 接收消息 * - * @return null + * @return void */ Mono accept() { return getProtocol() @@ -251,7 +251,7 @@ public class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDevi * 处理TCP消息 ==>> 设备消息 * * @param message tcp消息 - * @return null + * @return void */ Mono handleTcpMessage(TcpMessage message) { return getProtocol() @@ -272,7 +272,7 @@ public class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDevi * 处理设备消息 * * @param message 设备消息 - * @return null + * @return void */ Mono handleDeviceMessage(DeviceMessage message) { if (processor.hasDownstreams()) { diff --git a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGatewayProvider.java b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGatewayProvider.java index fa7e7658..2f7ab98a 100644 --- a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGatewayProvider.java +++ b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGatewayProvider.java @@ -1,8 +1,5 @@ package org.jetlinks.community.network.tcp.device; -import org.jetlinks.core.ProtocolSupports; -import org.jetlinks.core.device.DeviceRegistry; -import org.jetlinks.core.server.session.DeviceSessionManager; import org.jetlinks.community.gateway.DeviceGateway; import org.jetlinks.community.gateway.supports.DeviceGatewayProperties; import org.jetlinks.community.gateway.supports.DeviceGatewayProvider; @@ -10,11 +7,20 @@ import org.jetlinks.community.network.DefaultNetworkType; import org.jetlinks.community.network.NetworkManager; import org.jetlinks.community.network.NetworkType; import org.jetlinks.community.network.tcp.server.TcpServer; +import org.jetlinks.core.ProtocolSupports; +import org.jetlinks.core.device.DeviceRegistry; +import org.jetlinks.core.server.session.DeviceSessionManager; import org.jetlinks.supports.server.DecodedClientMessageHandler; import org.springframework.stereotype.Component; import org.springframework.util.Assert; import reactor.core.publisher.Mono; +/** + * TCP服务设备网关提供商 + * + * @author zhouhao + * @since 1.0 + */ @Component public class TcpServerDeviceGatewayProvider implements DeviceGatewayProvider { @@ -63,9 +69,9 @@ public class TcpServerDeviceGatewayProvider implements DeviceGatewayProvider { .map(mqttServer -> { String protocol = (String) properties.getConfiguration().get("protocol"); - Assert.hasText(protocol,"protocol can not be empty"); + Assert.hasText(protocol, "protocol can not be empty"); - return new TcpServerDeviceGateway(properties.getId(), + return new TcpServerDeviceGateway(properties.getId(), protocol, protocolSupports, registry, diff --git a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/TimeSeriesData.java b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/TimeSeriesData.java index 8f7e3e71..dd05b4da 100644 --- a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/TimeSeriesData.java +++ b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/TimeSeriesData.java @@ -7,8 +7,21 @@ import java.util.Date; import java.util.Map; import java.util.Optional; +/** + * 时序数据封装类 + * + * @author zhouhao + */ public interface TimeSeriesData extends ValueObject { + static TimeSeriesData of(Date date, Map data) { + return of(date == null ? System.currentTimeMillis() : date.getTime(), data); + } + + static TimeSeriesData of(long timestamp, Map data) { + return new SimpleTimeSeriesData(timestamp, data); + } + long getTimestamp(); Map getData(); @@ -23,14 +36,6 @@ public interface TimeSeriesData extends ValueObject { return Optional.ofNullable(getData().get(name)); } - static TimeSeriesData of(Date date, Map data) { - return of(date == null ? System.currentTimeMillis() : date.getTime(), data); - } - - static TimeSeriesData of(long timestamp, Map data) { - return new SimpleTimeSeriesData(timestamp, data); - } - @Override default T as(Class type) { return FastBeanCopier.copy(getData(), type); diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java index 19ddd7e0..9d9ffc3a 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java @@ -123,7 +123,6 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler { createFastBuilder(MessageType.DERIVED_METADATA, "/metadata/derived"); } - //设备注册中心 private final DeviceRegistry registry; private final EventBus eventBus; private final MessageHandler messageHandler; diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/writer/TimeSeriesMessageWriterConnector.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/writer/TimeSeriesMessageWriterConnector.java index 47e9cb2b..67eb1c59 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/writer/TimeSeriesMessageWriterConnector.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/writer/TimeSeriesMessageWriterConnector.java @@ -23,7 +23,7 @@ public class TimeSeriesMessageWriterConnector { * 订阅设备消息 入库 * * @param message 设备消息 - * @return null + * @return void */ @Subscribe(topics = "/device/**", id = "device-message-ts-writer") public Mono writeDeviceMessageToTs(DeviceMessage message) { diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DefaultDeviceDataService.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DefaultDeviceDataService.java index e3e3e9cc..65879d61 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DefaultDeviceDataService.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DefaultDeviceDataService.java @@ -25,8 +25,10 @@ import java.util.function.Function; /** * 默认设备数据服务 + *

+ * 管理设备存储策略、提供数据查询和入库操作 * - * @author JetLinks + * @author zhouhao */ @Component public class DefaultDeviceDataService implements DeviceDataService { @@ -116,7 +118,7 @@ public class DefaultDeviceDataService implements DeviceDataService { @Nonnull String... properties) { return this .getDeviceStrategy(deviceId) - .flatMapMany(strategy -> strategy.queryEachProperties(deviceId, query,properties)); + .flatMapMany(strategy -> strategy.queryEachProperties(deviceId, query, properties)); } @Nonnull diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesColumnDeviceDataStoragePolicy.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesColumnDeviceDataStoragePolicy.java index 2e627c6b..39ecaf08 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesColumnDeviceDataStoragePolicy.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesColumnDeviceDataStoragePolicy.java @@ -32,6 +32,11 @@ import java.util.stream.Stream; import static org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric.devicePropertyMetric; +/** + * 时序数据列存储策略 + * + * @author zhouhao + */ @Component public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDataStoragePolicy implements DeviceDataStoragePolicy { @@ -67,10 +72,10 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat public Mono registerMetadata(@Nonnull String productId, @Nonnull DeviceMetadata metadata) { return Flux .concat(Flux - .fromIterable(metadata.getEvents()) - .flatMap(event -> timeSeriesManager.registerMetadata(DeviceTimeSeriesMetadata.event(productId, event))), - timeSeriesManager.registerMetadata(DeviceTimeSeriesMetadata.properties(productId, metadata.getProperties())), - timeSeriesManager.registerMetadata(DeviceTimeSeriesMetadata.log(productId))) + .fromIterable(metadata.getEvents()) + .flatMap(event -> timeSeriesManager.registerMetadata(DeviceTimeSeriesMetadata.event(productId, event))), + timeSeriesManager.registerMetadata(DeviceTimeSeriesMetadata.properties(productId, metadata.getProperties())), + timeSeriesManager.registerMetadata(DeviceTimeSeriesMetadata.log(productId))) .then(); } @@ -79,8 +84,6 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat String deviceId, Map property, QueryParamEntity param) { - - //查询多个属性,分组聚合获取第一条数据 return param .toQuery() @@ -121,20 +124,20 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat .getDevice(deviceId) .flatMap(device -> Mono.zip(device.getProduct(), device.getMetadata())) .flatMap(tp2 -> { - PropertyMetadata prop = tp2.getT2().getPropertyOrNull(property); + PropertyMetadata prop = tp2.getT2().getPropertyOrNull(property); - return param - .toQuery() - .includes(property) - .where("deviceId", deviceId) - .execute(query -> timeSeriesManager - .getService(devicePropertyMetric(tp2.getT1().getId())) - .queryPager(query, - data -> DeviceProperty - .of(data, data.get(property).orElse(0), prop) - .property(property) - )); - } + return param + .toQuery() + .includes(property) + .where("deviceId", deviceId) + .execute(query -> timeSeriesManager + .getService(devicePropertyMetric(tp2.getT1().getId())) + .queryPager(query, + data -> DeviceProperty + .of(data, data.get(property).orElse(0), prop) + .property(property) + )); + } ); } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesDeviceDataStoragePolicy.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesDeviceDataStoragePolicy.java index b25101b1..5a724576 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesDeviceDataStoragePolicy.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesDeviceDataStoragePolicy.java @@ -10,6 +10,14 @@ import reactor.core.publisher.Mono; import java.util.function.Function; +/** + * 抽象时序数据存储策略 + *

+ * 提供时序数据通用的查询存储逻辑 + *

+ * + * @author zhouhao + */ public abstract class TimeSeriesDeviceDataStoragePolicy extends AbstractDeviceDataStoragePolicy { diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesRowDeviceDataStoreStoragePolicy.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesRowDeviceDataStoreStoragePolicy.java index 0bdca790..c9cc7a51 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesRowDeviceDataStoreStoragePolicy.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesRowDeviceDataStoreStoragePolicy.java @@ -28,6 +28,11 @@ import java.util.stream.Stream; import static org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric.devicePropertyMetric; +/** + * 设备时序数据行存储策略 + * + * @author zhouhao + */ @Component public class TimeSeriesRowDeviceDataStoreStoragePolicy extends TimeSeriesDeviceDataStoragePolicy implements DeviceDataStoragePolicy { @@ -184,11 +189,11 @@ public class TimeSeriesRowDeviceDataStoreStoragePolicy extends TimeSeriesDeviceD return timeSeriesManager .getService(devicePropertyMetric(tp2.getT1().getId())) .aggregation(AggregationQueryParam - .of() - .agg(new LimitAggregationColumn("property", "property", Aggregation.TOP, query.getPageSize())) - .groupBy(new LimitGroup("property", "property", propertiesMap.size() * 2)) //按property分组 - .filter(query) - .filter(q -> q.where("deviceId", deviceId).in("property", propertiesMap.keySet())) + .of() + .agg(new LimitAggregationColumn("property", "property", Aggregation.TOP, query.getPageSize())) + .groupBy(new LimitGroup("property", "property", propertiesMap.size() * 2)) //按property分组 + .filter(query) + .filter(q -> q.where("deviceId", deviceId).in("property", propertiesMap.keySet())) ).map(data -> DeviceProperty .of(data, data.getString("property").map(propertiesMap::get).orElse(null)) .deviceId(deviceId)); @@ -236,13 +241,13 @@ public class TimeSeriesRowDeviceDataStoreStoragePolicy extends TimeSeriesDeviceD //执行查询 .execute(timeSeriesManager.getService(getTimeSeriesMetric(productId))::aggregation) //按时间分组,然后将返回的结果合并起来 - .groupBy(agg -> agg.getString("time", ""),Integer.MAX_VALUE) + .groupBy(agg -> agg.getString("time", ""), Integer.MAX_VALUE) .flatMap(group -> { String time = group.key(); return group //按属性分组 - .groupBy(agg -> agg.getString("property", ""),Integer.MAX_VALUE) + .groupBy(agg -> agg.getString("property", ""), Integer.MAX_VALUE) .flatMap(propsGroup -> { String property = propsGroup.key(); return propsGroup diff --git a/jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/service/DeviceGatewayConfigService.java b/jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/service/DeviceGatewayConfigService.java index 3f729b1d..1e10ff23 100644 --- a/jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/service/DeviceGatewayConfigService.java +++ b/jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/service/DeviceGatewayConfigService.java @@ -8,7 +8,9 @@ import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; /** - * @author jetlinks + * 设备网关配置服务 + * + * @author zhouhao */ @Service public class DeviceGatewayConfigService implements DeviceGatewayPropertiesManager { From b409932ac90ce136c7ed8f2fda0307b750fb49ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=80=81=E5=91=A8?= Date: Fri, 28 May 2021 14:33:55 +0800 Subject: [PATCH 3/3] Create pull_request.yml --- .github/workflows/pull_request.yml | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 .github/workflows/pull_request.yml diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml new file mode 100644 index 00000000..7a661c33 --- /dev/null +++ b/.github/workflows/pull_request.yml @@ -0,0 +1,27 @@ +# This workflow will build a Java project with Maven +# For more information see: https://help.github.com/actions/language-and-framework-guides/building-and-testing-java-with-maven + +name: Java CI with Maven + +on: + pull_request: + branches: [ master ] + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v1 + - name: Set up JDK 1.8 + uses: actions/setup-java@v1 + with: + java-version: 1.8 + - name: Cache Maven Repository + uses: actions/cache@v1 + with: + path: ~/.m2 + key: jetlinks-community-maven-repository + - name: Build with Maven + run: ./mvnw package -Dmaven.test.skip=true -Pbuild