From 90849d57c6244d796537e0ceb1a1a56cdfb04fbc Mon Sep 17 00:00:00 2001 From: zhou-hao Date: Tue, 17 Nov 2020 18:50:07 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=AE=BE=E5=A4=87=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E8=BD=AC=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../jetlinks/community/PropertyConstants.java | 43 +++ .../DeviceManagerConfiguration.java | 32 ++ .../community/device/enums/DeviceLogType.java | 77 ++--- .../message/DeviceMessageConnector.java | 309 +++++++++++++----- .../data/AbstractDeviceDataStoragePolicy.java | 2 +- jetlinks-standalone/pom.xml | 10 + .../configuration/JetLinksConfiguration.java | 90 +---- 7 files changed, 366 insertions(+), 197 deletions(-) create mode 100644 jetlinks-components/common-component/src/main/java/org/jetlinks/community/PropertyConstants.java create mode 100644 jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/configuration/DeviceManagerConfiguration.java diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/PropertyConstants.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/PropertyConstants.java new file mode 100644 index 00000000..1daa7a6b --- /dev/null +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/PropertyConstants.java @@ -0,0 +1,43 @@ +package org.jetlinks.community; + +import org.jetlinks.core.config.ConfigKey; +import org.jetlinks.core.message.HeaderKey; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * @author wangzheng + * @since 1.0 + */ +public interface PropertyConstants { + + Key deviceName = Key.of("deviceName"); + + Key productId = Key.of("productId"); + + + @SuppressWarnings("all") + static Optional getFromMap(ConfigKey key, Map map) { + return Optional.ofNullable((T) map.get(key.getKey())); + } + + interface Key extends ConfigKey, HeaderKey { + + static Key of(String key) { + return new Key() { + @Override + public String getKey() { + return key; + } + + @Override + public T getDefaultValue() { + return null; + } + }; + } + + } +} diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/configuration/DeviceManagerConfiguration.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/configuration/DeviceManagerConfiguration.java new file mode 100644 index 00000000..7d41b2c2 --- /dev/null +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/configuration/DeviceManagerConfiguration.java @@ -0,0 +1,32 @@ +package org.jetlinks.community.device.configuration; + +import org.jetlinks.community.device.message.DeviceMessageConnector; +import org.jetlinks.community.device.message.writer.TimeSeriesMessageWriterConnector; +import org.jetlinks.community.device.service.data.DeviceDataService; +import org.jetlinks.core.device.DeviceRegistry; +import org.jetlinks.core.event.EventBus; +import org.jetlinks.core.server.MessageHandler; +import org.jetlinks.core.server.session.DeviceSessionManager; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class DeviceManagerConfiguration { + + @Bean + public DeviceMessageConnector deviceMessageConnector(EventBus eventBus, + MessageHandler messageHandler, + DeviceSessionManager sessionManager, + DeviceRegistry registry) { + return new DeviceMessageConnector(eventBus, registry, messageHandler, sessionManager); + } + + @Bean + @ConditionalOnProperty(prefix = "device.message.writer.time-series", name = "enabled", havingValue = "true", matchIfMissing = true) + public TimeSeriesMessageWriterConnector timeSeriesMessageWriterConnector(DeviceDataService dataService) { + return new TimeSeriesMessageWriterConnector(dataService); + } + + +} diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/enums/DeviceLogType.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/enums/DeviceLogType.java index 09ab6c27..773fb261 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/enums/DeviceLogType.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/enums/DeviceLogType.java @@ -5,27 +5,34 @@ import lombok.AllArgsConstructor; import lombok.Getter; import org.hswebframework.web.dict.EnumDict; import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.message.MessageType; + +import java.util.EnumMap; +import java.util.Map; +import java.util.Optional; @AllArgsConstructor @Getter public enum DeviceLogType implements EnumDict { event("事件上报"), - readProperty("属性读取"), - writeProperty("属性修改"), + readProperty("读取属性"), + writeProperty("修改属性"), + writePropertyReply("修改属性回复"), reportProperty("属性上报"), + readPropertyReply("读取属性回复"), child("子设备消息"), childReply("子设备消息回复"), functionInvoke("调用功能"), - readPropertyReply("读取属性回复"), - writePropertyReply("修改属性回复"), functionReply("调用功能回复"), register("设备注册"), unregister("设备注销"), - + log("日志"), + tag("标签更新"), offline("离线"), online("上线"), other("其它"); + @JSONField(serialize = false) private final String text; @@ -34,43 +41,37 @@ public enum DeviceLogType implements EnumDict { return name(); } - public static DeviceLogType of(DeviceMessage message) { - switch (message.getMessageType()) { - case EVENT: - return event; - case ONLINE: - return online; - case OFFLINE: - return offline; - case CHILD: - return child; - case CHILD_REPLY: - return childReply; - case REPORT_PROPERTY: - return reportProperty; - case READ_PROPERTY: - return readProperty; - case INVOKE_FUNCTION: - return functionInvoke; - case WRITE_PROPERTY: - return writeProperty; - case INVOKE_FUNCTION_REPLY: - return functionReply; - case READ_PROPERTY_REPLY: - return readPropertyReply; - case WRITE_PROPERTY_REPLY: - return writePropertyReply; - case REGISTER: - return register; - case UN_REGISTER: - return unregister; - default: - return other; - } + private final static Map typeMapping = new EnumMap<>(MessageType.class); + + static { + + typeMapping.put(MessageType.EVENT, event); + typeMapping.put(MessageType.ONLINE, online); + typeMapping.put(MessageType.OFFLINE, offline); + typeMapping.put(MessageType.CHILD, child); + typeMapping.put(MessageType.CHILD_REPLY, childReply); + typeMapping.put(MessageType.LOG, log); + typeMapping.put(MessageType.UPDATE_TAG, tag); + + typeMapping.put(MessageType.REPORT_PROPERTY, reportProperty); + typeMapping.put(MessageType.READ_PROPERTY, readProperty); + typeMapping.put(MessageType.READ_PROPERTY_REPLY, readPropertyReply); + + typeMapping.put(MessageType.INVOKE_FUNCTION, functionInvoke); + typeMapping.put(MessageType.INVOKE_FUNCTION_REPLY, functionReply); + + typeMapping.put(MessageType.WRITE_PROPERTY, writeProperty); + typeMapping.put(MessageType.WRITE_PROPERTY_REPLY, writePropertyReply); + + typeMapping.put(MessageType.REGISTER, register); + typeMapping.put(MessageType.UN_REGISTER, unregister); } + public static DeviceLogType of(DeviceMessage message) { + return Optional.ofNullable(typeMapping.get(message.getMessageType())).orElse(DeviceLogType.other); + } // @Override // public Object getWriteJSONObject() { // return getValue(); diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java index a8662b57..95b9b421 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java @@ -1,20 +1,21 @@ package org.jetlinks.community.device.message; import lombok.extern.slf4j.Slf4j; +import org.jetlinks.community.PropertyConstants; import org.jetlinks.core.Values; import org.jetlinks.core.device.DeviceOperator; import org.jetlinks.core.device.DeviceRegistry; import org.jetlinks.core.event.EventBus; import org.jetlinks.core.message.*; import org.jetlinks.core.message.event.EventMessage; -import org.jetlinks.core.message.firmware.*; -import org.jetlinks.core.message.function.FunctionInvokeMessage; -import org.jetlinks.core.message.function.FunctionInvokeMessageReply; -import org.jetlinks.core.message.property.*; +import org.jetlinks.core.server.MessageHandler; +import org.jetlinks.core.server.session.DeviceSessionManager; +import org.jetlinks.supports.server.DecodedClientMessageHandler; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import javax.annotation.Nonnull; import java.util.Collections; -import java.util.HashMap; import java.util.function.BiConsumer; import java.util.function.Function; @@ -25,67 +26,106 @@ import java.util.function.Function; * @since 1.0 */ @Slf4j -public class DeviceMessageConnector{ - //将设备注册中心到配置追加到消息header中,下游订阅者可直接使用. - private final String[] appendConfigHeader = {"productId", "deviceName"}; +public class DeviceMessageConnector implements DecodedClientMessageHandler { + + //将设备注册中心的配置追加到消息header中,下游订阅者可直接使用. + private final static String[] allConfigHeader = { + PropertyConstants.productId.getKey(), + PropertyConstants.deviceName.getKey(), + }; //设备注册中心 private final DeviceRegistry registry; private final EventBus eventBus; + private final MessageHandler messageHandler; + + private final DeviceSessionManager sessionManager; + private final static BiConsumer doOnError = (error, val) -> log.error(error.getMessage(), error); - private final Function> configGetter; + private final static Function> configGetter = operator -> operator.getSelfConfigs(allConfigHeader); private final static Values emptyValues = Values.of(Collections.emptyMap()); public DeviceMessageConnector(EventBus eventBus, - DeviceRegistry registry) { + DeviceRegistry registry, + MessageHandler messageHandler, + DeviceSessionManager sessionManager) { this.registry = registry; this.eventBus = eventBus; - this.configGetter = operator -> operator.getSelfConfigs(appendConfigHeader); + this.messageHandler = messageHandler; + this.sessionManager = sessionManager; + sessionManager + .onRegister() + .flatMap(session -> { + DeviceOnlineMessage message = new DeviceOnlineMessage(); + message.setDeviceId(session.getDeviceId()); + message.setTimestamp(session.connectTime()); + return onMessage(message); + }) + .onErrorContinue(doOnError) + .subscribe(); + + sessionManager + .onUnRegister() + .flatMap(session -> { + DeviceOfflineMessage message = new DeviceOfflineMessage(); + message.setDeviceId(session.getDeviceId()); + message.setTimestamp(System.currentTimeMillis()); + return onMessage(message); + }) + .onErrorContinue(doOnError) + .subscribe(); } public Mono onMessage(Message message) { if (null == message) { return Mono.empty(); } - return this.getTopic(message) + return this + .getTopic(message) .flatMap(topic -> eventBus.publish(topic, message).then()) .onErrorContinue(doOnError) .then(); } - public Mono getTopic(Message message) { - if (message instanceof DeviceMessage) { - DeviceMessage deviceMessage = ((DeviceMessage) message); - String deviceId = deviceMessage.getDeviceId(); - if (deviceId == null) { - log.warn("无法从消息中获取设备ID:{}", deviceMessage); - return Mono.empty(); - } - return registry - .getDevice(deviceId) - .flatMap(configGetter) - .defaultIfEmpty(emptyValues) - .flatMap(configs -> { - configs.getAllValues().forEach(deviceMessage::addHeader); - String productId = deviceMessage.getHeader("productId").map(String::valueOf).orElse("null"); - String topic = createDeviceMessageTopic(productId, deviceId, deviceMessage); - - if (message instanceof ChildDeviceMessage) { //子设备消息 - return onMessage(((ChildDeviceMessage) message).getChildDeviceMessage()) - .thenReturn(topic); - } else if (message instanceof ChildDeviceMessageReply) { //子设备消息 - return onMessage(((ChildDeviceMessageReply) message).getChildDeviceMessage()) - .thenReturn(topic); - } - return Mono.just(topic); - }); - + private Flux getTopic(Message message) { + Flux topicsStream = createDeviceMessageTopic(registry, message); + if (message instanceof ChildDeviceMessage) { //子设备消息 + return this + .onMessage(((ChildDeviceMessage) message).getChildDeviceMessage()) + .thenMany(topicsStream); + } else if (message instanceof ChildDeviceMessageReply) { //子设备消息 + return this + .onMessage(((ChildDeviceMessageReply) message).getChildDeviceMessage()) + .thenMany(topicsStream); } - return Mono.just("/device/unknown/message/unknown"); + return topicsStream; + } + + public static Flux createDeviceMessageTopic(DeviceRegistry deviceRegistry, Message message) { + return Flux.defer(() -> { + if (message instanceof DeviceMessage) { + DeviceMessage deviceMessage = ((DeviceMessage) message); + String deviceId = deviceMessage.getDeviceId(); + if (deviceId == null) { + log.warn("无法从消息中获取设备ID:{}", deviceMessage); + return Mono.empty(); + } + return deviceRegistry + .getDevice(deviceId) + .flatMap(configGetter) + .defaultIfEmpty(emptyValues) + .map(configs -> { + configs.getAllValues().forEach(deviceMessage::addHeader); + String productId = deviceMessage.getHeader(PropertyConstants.productId).orElse("null"); + return createDeviceMessageTopic(productId, deviceId, deviceMessage); + }); + } + return Mono.just("/device/unknown/message/unknown"); + }); } public static String createDeviceMessageTopic(String productId, String deviceId, DeviceMessage message) { @@ -99,68 +139,165 @@ public class DeviceMessageConnector{ return builder.toString(); } - public static void appendDeviceMessageTopic(Message message, StringBuilder builder) { - if (message instanceof EventMessage) { //事件 + private static final BiConsumer[] fastTopicBuilder; + + static { + fastTopicBuilder = new BiConsumer[MessageType.values().length]; + + + //事件 + createFastBuilder(MessageType.EVENT, (message, builder) -> { EventMessage event = ((EventMessage) message); builder.append("/message/event/").append(event.getEvent()); - } else if (message instanceof ReportPropertyMessage) { //上报属性 - builder.append("/message/property/report"); - } else if (message instanceof DeviceOnlineMessage) { //设备上线 - builder.append("/online"); - } else if (message instanceof DeviceOfflineMessage) { //设备离线 - builder.append("/offline"); - } else if (message instanceof ChildDeviceMessage) { //子设备消息 + }); + + //上报属性 + createFastBuilder(MessageType.REPORT_PROPERTY, "/message/property/report"); + //读取属性 + createFastBuilder(MessageType.READ_PROPERTY, "/message/send/property/read"); + //读取属性回复 + createFastBuilder(MessageType.READ_PROPERTY_REPLY, "/message/property/read/reply"); + //修改属性 + createFastBuilder(MessageType.WRITE_PROPERTY, "/message/send/property/write"); + //修改属性回复 + createFastBuilder(MessageType.WRITE_PROPERTY_REPLY, "/message/property/write/reply"); + //调用功能 + createFastBuilder(MessageType.INVOKE_FUNCTION, "/message/send/function"); + //调用功能回复 + createFastBuilder(MessageType.INVOKE_FUNCTION_REPLY, "/message/function/reply"); + //注册 + createFastBuilder(MessageType.REGISTER, "/register"); + //注销 + createFastBuilder(MessageType.UN_REGISTER, "/unregister"); + //拉取固件 + createFastBuilder(MessageType.REQUEST_FIRMWARE, "/firmware/pull"); + //拉取固件回复 + createFastBuilder(MessageType.REQUEST_FIRMWARE_REPLY, "/firmware/pull/reply"); + //上报固件信息 + createFastBuilder(MessageType.REPORT_FIRMWARE, "/firmware/report"); + //上报固件安装进度 + createFastBuilder(MessageType.UPGRADE_FIRMWARE_PROGRESS, "/firmware/progress"); + //推送固件 + createFastBuilder(MessageType.UPGRADE_FIRMWARE, "/firmware/push"); + //推送固件回复 + createFastBuilder(MessageType.UPGRADE_FIRMWARE_REPLY, "/firmware/push/reply"); + //未知 + createFastBuilder(MessageType.UNKNOWN, "/message/unknown"); + //日志 + createFastBuilder(MessageType.LOG, "/message/log"); + //透传 + createFastBuilder(MessageType.DIRECT, "/message/direct"); + //更新标签 + createFastBuilder(MessageType.UPDATE_TAG, "/message/tags/update"); + //上线 + createFastBuilder(MessageType.ONLINE, "/online"); + //离线 + createFastBuilder(MessageType.OFFLINE, "/offline"); + //断开连接 + createFastBuilder(MessageType.DISCONNECT, "/disconnect"); + //断开连接回复 + createFastBuilder(MessageType.DISCONNECT_REPLY, "/disconnect/reply"); + //子设备消息 + createFastBuilder(MessageType.CHILD, (message, builder) -> { Message msg = ((ChildDeviceMessage) message).getChildDeviceMessage(); if (msg instanceof DeviceMessage) { builder.append("/message/children/") - .append(((DeviceMessage) msg).getDeviceId()); - appendDeviceMessageTopic(msg, builder); + .append(((DeviceMessage) msg).getDeviceId()); } else { builder.append("/message/children"); - appendDeviceMessageTopic(message, builder); } - } else if (message instanceof ChildDeviceMessageReply) { //子设备消息 + appendDeviceMessageTopic(msg, builder); + }); + //子设备消息回复 + createFastBuilder(MessageType.CHILD_REPLY, (message, builder) -> { Message msg = ((ChildDeviceMessageReply) message).getChildDeviceMessage(); if (msg instanceof DeviceMessage) { builder.append("/message/children/reply/") - .append(((DeviceMessage) msg).getDeviceId()); - appendDeviceMessageTopic(msg, builder); + .append(((DeviceMessage) msg).getDeviceId()); } else { builder.append("/message/children/reply"); - appendDeviceMessageTopic(message, builder); } - } else if (message instanceof ReadPropertyMessage) { //读取属性 - builder.append("/message/send/property/read"); - } else if (message instanceof WritePropertyMessage) { //修改属性 - builder.append("/message/send/property/write"); - } else if (message instanceof FunctionInvokeMessage) { //调用功能 - builder.append("/message/send/function"); - } else if (message instanceof ReadPropertyMessageReply) { //读取属性回复 - builder.append("/message/property/read/reply"); - } else if (message instanceof WritePropertyMessageReply) { //修改属性回复 - builder.append("/message/property/write/reply"); - } else if (message instanceof FunctionInvokeMessageReply) { //调用功能回复 - builder.append("/message/function/reply"); - } else if (message instanceof DeviceRegisterMessage) { //注册 - builder.append("/register"); - } else if (message instanceof DeviceUnRegisterMessage) { //注销 - builder.append("/unregister"); - } else if (message instanceof RequestFirmwareMessage) { //拉取固件请求 since 1.3 - builder.append("/firmware/pull"); - } else if (message instanceof RequestFirmwareMessageReply) { //拉取固件响应 since 1.3 - builder.append("/firmware/pull/reply"); - } else if (message instanceof ReportFirmwareMessage) { //上报固件信息 since 1.3 - builder.append("/firmware/report"); - } else if (message instanceof UpgradeFirmwareProgressMessage) { //上报固件更新进度 since 1.3 - builder.append("/firmware/progress"); - } else if (message instanceof UpgradeFirmwareMessage) { //推送固件更新 since 1.3 - builder.append("/firmware/push"); - } else if (message instanceof UpgradeFirmwareMessageReply) { //推送固件更新回复 since 1.3 - builder.append("/firmware/push/reply"); - } else if (message instanceof DirectDeviceMessage) { //透传消息 since 1.4 - builder.append("/message/direct"); + appendDeviceMessageTopic(msg, builder); + }); + } + + private static void createFastBuilder(MessageType messageType, + String topic) { + fastTopicBuilder[messageType.ordinal()] = (ignore, builder) -> builder.append(topic); + } + + private static void createFastBuilder(MessageType messageType, + BiConsumer builderBiConsumer) { + fastTopicBuilder[messageType.ordinal()] = builderBiConsumer; + } + + public static void appendDeviceMessageTopic(Message message, StringBuilder builder) { + + BiConsumer fastBuilder = fastTopicBuilder[message.getMessageType().ordinal()]; + if (null != fastBuilder) { + fastBuilder.accept(message, builder); } else { builder.append("/message/").append(message.getMessageType().name().toLowerCase()); } } + + protected Mono handleChildrenDeviceMessage(DeviceOperator device, String childrenId, Message message) { + if (message instanceof DeviceMessageReply) { + return doReply(((DeviceMessageReply) message)); + } else if (message instanceof DeviceOnlineMessage) { + return sessionManager.registerChildren(device.getDeviceId(), childrenId) + .thenReturn(true) + .defaultIfEmpty(false); + } else if (message instanceof DeviceOfflineMessage) { + return sessionManager.unRegisterChildren(device.getDeviceId(), childrenId) + .thenReturn(true) + .defaultIfEmpty(false); + } + return Mono.just(true); + } + + protected Mono handleChildrenDeviceMessageReply(DeviceOperator session, ChildDeviceMessage reply) { + return handleChildrenDeviceMessage(session, reply.getChildDeviceId(), reply.getChildDeviceMessage()); + } + + protected Mono handleChildrenDeviceMessageReply(DeviceOperator session, ChildDeviceMessageReply reply) { + return handleChildrenDeviceMessage(session, reply.getChildDeviceId(), reply.getChildDeviceMessage()); + } + + @Override + public Mono handleMessage(DeviceOperator device, @Nonnull Message message) { + return this + .onMessage(message) + .then(Mono.defer(() -> { + if (device != null) { + if (message instanceof ChildDeviceMessageReply) { + return handleChildrenDeviceMessageReply(device, ((ChildDeviceMessageReply) message)); + } else if (message instanceof ChildDeviceMessage) { + return handleChildrenDeviceMessageReply(device, ((ChildDeviceMessage) message)); + } + } + if (message instanceof DeviceMessageReply) { + return doReply(((DeviceMessageReply) message)); + } + return Mono.just(true); + })) + .defaultIfEmpty(false); + + } + + private Mono doReply(DeviceMessageReply reply) { + if (log.isDebugEnabled()) { + log.debug("reply message {}", reply.getMessageId()); + } + return messageHandler + .reply(reply) + .doOnSuccess(success -> { + if (log.isDebugEnabled()) { + log.debug("reply message {} complete", reply.getMessageId()); + } + }) + .thenReturn(true) + .doOnError((error) -> log.error("reply message error", error)) + ; + } } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java index 0c1af382..7093dd30 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java @@ -142,7 +142,7 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora protected Flux> convertMessageToTimeSeriesData(DeviceMessage message) { String productId = (String) message.getHeader("productId").orElse("null"); Consumer logEntityConsumer = null; - List>> all = new ArrayList<>(); + List>> all = new ArrayList<>(2); if (message instanceof EventMessage) { logEntityConsumer = log -> log.setContent(JSON.toJSONString(((EventMessage) message).getData())); diff --git a/jetlinks-standalone/pom.xml b/jetlinks-standalone/pom.xml index a580fb9f..5fe3a5e6 100644 --- a/jetlinks-standalone/pom.xml +++ b/jetlinks-standalone/pom.xml @@ -89,6 +89,16 @@ + + com.github.ben-manes.caffeine + caffeine + + + + com.github.ben-manes.caffeine + guava + + io.netty netty-transport-native-epoll diff --git a/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksConfiguration.java b/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksConfiguration.java index ed659c93..0ddbd25e 100644 --- a/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksConfiguration.java +++ b/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksConfiguration.java @@ -1,6 +1,7 @@ package org.jetlinks.community.standalone.configuration; -import com.google.common.cache.CacheBuilder; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.guava.CaffeinatedGuava; import io.micrometer.core.instrument.MeterRegistry; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; @@ -10,56 +11,44 @@ import org.hswebframework.web.authorization.token.UserTokenManager; import org.hswebframework.web.authorization.token.redis.RedisUserTokenManager; import org.jetlinks.community.device.entity.DeviceInstanceEntity; import org.jetlinks.community.device.entity.DeviceProductEntity; -import org.jetlinks.community.device.message.writer.TimeSeriesMessageWriterConnector; import org.jetlinks.community.device.service.AutoDiscoverDeviceRegistry; -import org.jetlinks.community.device.service.data.DeviceDataService; -import org.jetlinks.community.timeseries.TimeSeriesManager; import org.jetlinks.core.ProtocolSupports; import org.jetlinks.core.cluster.ClusterManager; +import org.jetlinks.core.config.ConfigStorageManager; import org.jetlinks.core.device.DeviceOperationBroker; import org.jetlinks.core.device.DeviceRegistry; import org.jetlinks.core.device.StandaloneDeviceMessageBroker; import org.jetlinks.core.event.EventBus; -import org.jetlinks.core.message.DeviceOfflineMessage; -import org.jetlinks.core.message.DeviceOnlineMessage; import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor; import org.jetlinks.core.server.MessageHandler; import org.jetlinks.core.server.monitor.GatewayServerMetrics; import org.jetlinks.core.server.monitor.GatewayServerMonitor; import org.jetlinks.core.server.session.DeviceSessionManager; import org.jetlinks.core.spi.ServiceContext; -import org.jetlinks.community.device.message.DeviceMessageConnector; import org.jetlinks.supports.cluster.ClusterDeviceRegistry; import org.jetlinks.supports.cluster.redis.RedisClusterManager; +import org.jetlinks.supports.config.EventBusStorageManager; import org.jetlinks.supports.event.BrokerEventBus; import org.jetlinks.supports.protocol.ServiceLoaderProtocolSupports; import org.jetlinks.supports.protocol.management.ClusterProtocolSupportManager; import org.jetlinks.supports.protocol.management.ProtocolSupportLoader; import org.jetlinks.supports.protocol.management.ProtocolSupportManager; import org.jetlinks.supports.server.DecodedClientMessageHandler; -import org.jetlinks.supports.server.DefaultClientMessageHandler; -import org.jetlinks.supports.server.DefaultDecodedClientMessageHandler; import org.jetlinks.supports.server.DefaultSendToDeviceMessageHandler; import org.jetlinks.supports.server.monitor.MicrometerGatewayServerMetrics; import org.springframework.beans.BeansException; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory; import org.springframework.boot.web.server.WebServerFactoryCustomizer; -import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.data.redis.core.ReactiveRedisOperations; import org.springframework.data.redis.core.ReactiveRedisTemplate; -import reactor.core.publisher.EmitterProcessor; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; -import java.util.List; import java.util.Optional; @Configuration @@ -98,6 +87,13 @@ public class JetLinksConfiguration { return new StandaloneDeviceMessageBroker(); } + @Bean + public EventBusStorageManager eventBusStorageManager(ClusterManager clusterManager, EventBus eventBus) { + return new EventBusStorageManager(clusterManager, + eventBus, + () -> CaffeinatedGuava.build(Caffeine.newBuilder())); + } + @Bean(initMethod = "startup") public RedisClusterManager clusterManager(JetLinksProperties properties, ReactiveRedisTemplate template) { return new RedisClusterManager(properties.getClusterName(), properties.getServerId(), template); @@ -106,8 +102,14 @@ public class JetLinksConfiguration { @Bean public ClusterDeviceRegistry clusterDeviceRegistry(ProtocolSupports supports, ClusterManager manager, + ConfigStorageManager storageManager, DeviceOperationBroker handler) { - return new ClusterDeviceRegistry(supports, manager, handler, CacheBuilder.newBuilder().build()); + + return new ClusterDeviceRegistry(supports, + storageManager, + manager, + handler, + CaffeinatedGuava.build(Caffeine.newBuilder())); } @Bean @@ -133,35 +135,6 @@ public class JetLinksConfiguration { }; } - @Bean - public DeviceMessageConnector deviceMessageConnector(EventBus eventBus, DeviceRegistry registry) { - return new DeviceMessageConnector(eventBus, registry); - } - - @Bean - @ConditionalOnProperty(prefix = "device.message.writer.time-series", name = "enabled", havingValue = "true", matchIfMissing = true) - public TimeSeriesMessageWriterConnector timeSeriesMessageWriterConnector(DeviceDataService deviceDataService) { - return new TimeSeriesMessageWriterConnector(deviceDataService); - } - - @Bean(destroyMethod = "shutdown") - public DefaultDecodedClientMessageHandler defaultDecodedClientMessageHandler(MessageHandler handler, - DeviceMessageConnector messageConnector, - DeviceSessionManager deviceSessionManager, - Scheduler scheduler) { - DefaultDecodedClientMessageHandler clientMessageHandler = new DefaultDecodedClientMessageHandler(handler, deviceSessionManager, - EmitterProcessor.create(false) - ); - clientMessageHandler - .subscribe() - .parallel() - .runOn(scheduler) - .flatMap(msg -> messageConnector.onMessage(msg).onErrorContinue((err, r) -> log.error(err.getMessage(), err))) - .subscribe(); - - return clientMessageHandler; - } - @Bean(initMethod = "startup") public DefaultSendToDeviceMessageHandler defaultSendToDeviceMessageHandler(JetLinksProperties properties, DeviceSessionManager sessionManager, @@ -171,12 +144,6 @@ public class JetLinksConfiguration { return new DefaultSendToDeviceMessageHandler(properties.getServerId(), sessionManager, messageHandler, registry, clientMessageHandler); } - @Bean - public DefaultClientMessageHandler defaultClientMessageHandler(DefaultDecodedClientMessageHandler handler) { - return new DefaultClientMessageHandler(handler); - } - - @Bean public GatewayServerMonitor gatewayServerMonitor(JetLinksProperties properties, MeterRegistry registry) { GatewayServerMetrics metrics = new MicrometerGatewayServerMetrics(properties.getServerId(), registry); @@ -198,33 +165,12 @@ public class JetLinksConfiguration { @Bean(initMethod = "init", destroyMethod = "shutdown") public DefaultDeviceSessionManager deviceSessionManager(JetLinksProperties properties, GatewayServerMonitor monitor, - DeviceMessageConnector messageConnector, DeviceRegistry registry) { DefaultDeviceSessionManager sessionManager = new DefaultDeviceSessionManager(); sessionManager.setGatewayServerMonitor(monitor); sessionManager.setRegistry(registry); Optional.ofNullable(properties.getTransportLimit()).ifPresent(sessionManager::setTransportLimits); - sessionManager.onRegister() - .flatMap(session -> { - DeviceOnlineMessage message = new DeviceOnlineMessage(); - message.setDeviceId(session.getDeviceId()); - message.setTimestamp(session.connectTime()); - return messageConnector.onMessage(message); - }) - .onErrorContinue((err, r) -> log.error(err.getMessage(), err)) - .subscribe(); - - sessionManager.onUnRegister() - .flatMap(session -> { - DeviceOfflineMessage message = new DeviceOfflineMessage(); - message.setDeviceId(session.getDeviceId()); - message.setTimestamp(System.currentTimeMillis()); - return messageConnector.onMessage(message); - }) - .onErrorContinue((err, r) -> log.error(err.getMessage(), err)) - .subscribe(); - return sessionManager; }