From e813d23bebef3b8cc1a9adcffed52d9460a169fc Mon Sep 17 00:00:00 2001 From: zhou-hao Date: Mon, 30 Nov 2020 18:30:14 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=AD=90=E8=AE=BE=E5=A4=87?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../network/utils/DeviceGatewayHelper.java | 65 +++++++++++-------- 1 file changed, 39 insertions(+), 26 deletions(-) diff --git a/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java b/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java index 78007c28..44c0407a 100644 --- a/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java +++ b/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java @@ -24,7 +24,6 @@ public class DeviceGatewayHelper { private final DeviceSessionManager sessionManager; private final DecodedClientMessageHandler messageHandler; - public static Consumer applySessionKeepaliveTimeout(DeviceMessage msg, Supplier timeoutSupplier) { return session -> { Duration timeout = msg @@ -45,29 +44,35 @@ public class DeviceGatewayHelper { return handleDeviceMessage(message, sessionBuilder, sessionConsumer, () -> Mono.fromRunnable(deviceNotFoundListener)); } + protected Mono handleChildrenDeviceMessage(String deviceId, DeviceMessage children) { + ChildrenDeviceSession deviceSession = sessionManager.getSession(deviceId, children.getDeviceId()); + if (deviceSession == null && null != children.getDeviceId()) { + Mono then = sessionManager + .registerChildren(deviceId, children.getDeviceId()) + .then(); + //子设备注册 + if (isDoRegister(children)) { + then = Mono.delay(Duration.ofSeconds(2)) + .then(then); + } + return then; + } + return Mono.empty(); + } + public Mono handleDeviceMessage(DeviceMessage message, - Function sessionBuilder, - Consumer sessionConsumer, - Supplier> deviceNotFoundListener) { + Function sessionBuilder, + Consumer sessionConsumer, + Supplier> deviceNotFoundListener) { String deviceId = message.getDeviceId(); Mono then = Mono.empty(); boolean doHandle = true; if (message instanceof ChildDeviceMessage) { DeviceMessage childrenMessage = (DeviceMessage) ((ChildDeviceMessage) message).getChildDeviceMessage(); - ChildrenDeviceSession deviceSession = sessionManager.getSession(deviceId, childrenMessage.getDeviceId()); - if (deviceSession == null) { - then = sessionManager - .registerChildren(deviceId, childrenMessage.getDeviceId()) - .then(Mono.empty()); - } + then = handleChildrenDeviceMessage(deviceId,childrenMessage); } else if (message instanceof ChildDeviceMessageReply) { DeviceMessage childrenMessage = (DeviceMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage(); - ChildrenDeviceSession deviceSession = sessionManager.getSession(deviceId, childrenMessage.getDeviceId()); - if (deviceSession == null) { - then = sessionManager - .registerChildren(deviceId, childrenMessage.getDeviceId()) - .then(Mono.empty()); - } + then = handleChildrenDeviceMessage(deviceId,childrenMessage); } else if (message instanceof DeviceOfflineMessage) { //设备离线消息 DeviceSession session = sessionManager.unregister(deviceId); @@ -91,15 +96,12 @@ public class DeviceGatewayHelper { .getDevice(deviceId) .switchIfEmpty(Mono.defer(() -> { //设备注册 - if (message instanceof DeviceRegisterMessage) { - if (message.getHeader(PropertyConstants.deviceName).isPresent() - && message.getHeader(PropertyConstants.productId).isPresent()) { - return messageHandler - .handleMessage(null, message) - //延迟2秒后尝试重新获取设备并上线 - .then(Mono.delay(Duration.ofSeconds(2))) - .then(registry.getDevice(deviceId)); - } + if (isDoRegister(message)) { + return messageHandler + .handleMessage(null, message) + //延迟2秒后尝试重新获取设备并上线 + .then(Mono.delay(Duration.ofSeconds(2))) + .then(registry.getDevice(deviceId)); } if (deviceNotFoundListener != null) { return deviceNotFoundListener.get(); @@ -130,12 +132,23 @@ public class DeviceGatewayHelper { } else { sessionConsumer.accept(session); session.keepAlive(); + if (doHandle) { + return messageHandler + .handleMessage(session.getOperator(), message) + .then(then) + .then(registry.getDevice(deviceId)); + } return then - .then(doHandle ? messageHandler.handleMessage(session.getOperator(), message) : Mono.empty()) .then(registry.getDevice(deviceId)); } } + private boolean isDoRegister(DeviceMessage message) { + return message instanceof DeviceRegisterMessage + && message.getHeader(PropertyConstants.deviceName).isPresent() + && message.getHeader(PropertyConstants.productId).isPresent(); + } + }