From 0438fe7167fd133fb2b05be52a1f5ec1c84c4fa5 Mon Sep 17 00:00:00 2001 From: zhou-hao Date: Mon, 30 Nov 2020 18:51:03 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=AE=BE=E5=A4=87=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E8=BD=AC=E5=8F=91=E5=88=B0=E6=9C=BA=E6=9E=84topic?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../jetlinks/community/PropertyConstants.java | 1 + .../device/message/DeviceMessageConnector.java | 16 +++++++++++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/PropertyConstants.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/PropertyConstants.java index 9fb6ed49..dcae09d7 100644 --- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/PropertyConstants.java +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/PropertyConstants.java @@ -12,6 +12,7 @@ import java.util.Optional; * @since 1.0 */ public interface PropertyConstants { + Key orgId = Key.of("orgId"); Key deviceName = Key.of("deviceName"); diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java index 055865ae..6c5792df 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java @@ -15,7 +15,9 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import javax.annotation.Nonnull; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.function.BiConsumer; import java.util.function.Function; @@ -32,6 +34,7 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler { private final static String[] allConfigHeader = { PropertyConstants.productId.getKey(), PropertyConstants.deviceName.getKey(), + PropertyConstants.orgId.getKey() }; //设备注册中心 @@ -41,8 +44,6 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler { private final MessageHandler messageHandler; - private final DeviceSessionManager sessionManager; - private final static BiConsumer doOnError = (error, val) -> log.error(error.getMessage(), error); private final static Function> configGetter = operator -> operator.getSelfConfigs(allConfigHeader); @@ -56,7 +57,6 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler { this.registry = registry; this.eventBus = eventBus; this.messageHandler = messageHandler; - this.sessionManager = sessionManager; sessionManager .onRegister() .flatMap(session -> { @@ -118,10 +118,16 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler { .getDevice(deviceId) .flatMap(configGetter) .defaultIfEmpty(emptyValues) - .map(configs -> { + .flatMapIterable(configs -> { configs.getAllValues().forEach(deviceMessage::addHeader); String productId = deviceMessage.getHeader(PropertyConstants.productId).orElse("null"); - return createDeviceMessageTopic(productId, deviceId, deviceMessage); + String topic = createDeviceMessageTopic(productId, deviceId, deviceMessage); + List topics = new ArrayList<>(2); + topics.add(topic); + configs.getValue(PropertyConstants.orgId) + .ifPresent(orgId -> topics.add("/org/" + orgId + topic)); + + return topics; }); } return Mono.just("/device/unknown/message/unknown");