diff --git a/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java b/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java index 56ca0bac..ad130d8e 100644 --- a/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java +++ b/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java @@ -16,6 +16,7 @@ import reactor.core.publisher.Mono; import reactor.util.context.Context; import java.time.Duration; +import java.util.Objects; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -121,22 +122,27 @@ public class DeviceGatewayHelper { .createOrUpdateSession(childrenId, children, child -> Mono.just(new ChildrenDeviceSession(childrenId, parentSession, child)), - Mono::empty)); - + Mono::empty) + .doOnNext(session -> { + if (session.isWrapFrom(ChildrenDeviceSession.class)) { + ChildrenDeviceSession childrenSession = session.unwrap(ChildrenDeviceSession.class); + //网关发生变化,替换新的上级会话 + if (!Objects.equals(deviceId, childrenSession.getParent().getDeviceId())) { + childrenSession.replaceWith(parentSession); + } + } + })); //子设备注册 if (isDoRegister(children)) { - return Mono - //延迟2秒,因为自动注册是异步的,收到消息后并不能保证马上可以注册成功. - .delay(Duration.ofSeconds(2)) - .then(registry - .getDevice(children.getDeviceId()) - .flatMap(device -> device - //没有配置状态自管理才自动上线 - .getSelfConfig(DeviceConfigKey.selfManageState) - .defaultIfEmpty(false) - .filter(Boolean.FALSE::equals)) - .flatMap(ignore -> sessionHandler)) + return this + .getDeviceForRegister(children.getDeviceId()) + .flatMap(device -> device + //没有配置状态自管理才自动上线 + .getSelfConfig(DeviceConfigKey.selfManageState) + .defaultIfEmpty(false) + .filter(Boolean.FALSE::equals)) + .flatMap(ignore -> sessionHandler) .then(); } return sessionHandler.then(); @@ -207,7 +213,7 @@ public class DeviceGatewayHelper { } if (doHandle) { - then = then.flatMap(opt -> messageHandler.handleMessage(opt, message).thenReturn(opt)); + then = messageHandler.handleMessage(null, message).then(then); } return this @@ -260,6 +266,15 @@ public class DeviceGatewayHelper { .flatMap(Function.identity()); } + private Mono getDeviceForRegister(String deviceId) { + return registry + .getDevice(deviceId) + .switchIfEmpty(Mono.defer(() -> Mono + //延迟2秒,因为自动注册是异步的,收到消息后并不能保证马上可以注册成功. + .delay(Duration.ofSeconds(2)) + .then(registry.getDevice(deviceId)))); + } + private Mono createNewSession(String deviceId, DeviceMessage message, Function> sessionBuilder, @@ -298,7 +313,7 @@ public class DeviceGatewayHelper { private Mono updateSession0(DeviceSession session, DeviceMessage message, Function> sessionBuilder) { - Mono after = null; + Mono after = null; //消息中指定保持在线,并且之前的会话不是保持在线,则需要替换之前的会话 if (isNewKeeOnline(session, message)) { Integer timeoutSeconds = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds); @@ -310,14 +325,13 @@ public class DeviceGatewayHelper { Integer timeoutSeconds = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds); after = sessionBuilder .apply(session.getOperator()) - .map(newSession -> new KeepOnlineSession(newSession, Duration.ofSeconds(timeoutSeconds))) - .then(); + .map(newSession -> new KeepOnlineSession(newSession, Duration.ofSeconds(timeoutSeconds))); } applySessionKeepaliveTimeout(message, session); session.keepAlive(); return after == null ? Mono.just(session) - : after.thenReturn(session); + : after; } private static void applySessionKeepaliveTimeout(DeviceMessage msg, DeviceSession session) { @@ -348,7 +362,11 @@ public class DeviceGatewayHelper { //判断保持在线的会话是否以及丢失(服务重启后可能出现) private static boolean isKeeOnlineLost(DeviceSession session) { - return session instanceof KeepOnlineSession && session.isWrapFrom(LostDeviceSession.class); + if (!session.isWrapFrom(KeepOnlineSession.class)) { + return false; + } + return session.isWrapFrom(LostDeviceSession.class) + || !session.unwrap(KeepOnlineSession.class).getParent().isAlive(); } //判断是否为设备注册