diff --git a/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java b/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java index 20044cf1..56ca0bac 100644 --- a/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java +++ b/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java @@ -1,7 +1,6 @@ package org.jetlinks.community.network.utils; import lombok.AllArgsConstructor; -import org.jetlinks.community.PropertyConstants; import org.jetlinks.core.device.DeviceConfigKey; import org.jetlinks.core.device.DeviceOperator; import org.jetlinks.core.device.DeviceRegistry; @@ -9,10 +8,8 @@ import org.jetlinks.core.device.session.DeviceSessionManager; import org.jetlinks.core.message.*; import org.jetlinks.core.message.state.DeviceStateCheckMessage; import org.jetlinks.core.message.state.DeviceStateCheckMessageReply; -import org.jetlinks.core.server.session.ChildrenDeviceSession; -import org.jetlinks.core.server.session.DeviceSession; -import org.jetlinks.core.server.session.KeepOnlineSession; -import org.jetlinks.core.server.session.LostDeviceSession; +import org.jetlinks.core.server.session.*; +import org.jetlinks.community.PropertyConstants; import org.jetlinks.supports.server.DecodedClientMessageHandler; import org.springframework.util.StringUtils; import reactor.core.publisher.Mono; @@ -226,7 +223,8 @@ public class DeviceGatewayHelper { Function> sessionBuilder, Supplier> deviceNotFoundCallback) { return sessionManager - .getSession(deviceId) + .getSession(deviceId, false) + .filterWhen(DeviceSession::isAliveAsync) .map(old -> { //需要更新会话时才进行更新 if (needUpdateSession(old, message)) { @@ -240,7 +238,7 @@ public class DeviceGatewayHelper { //会话不存在则尝试创建或者更新 .defaultIfEmpty(Mono.defer(() -> sessionManager .compute(deviceId, - registerNewSession( + createNewSession( deviceId, message, sessionBuilder, @@ -262,10 +260,10 @@ public class DeviceGatewayHelper { .flatMap(Function.identity()); } - private Mono registerNewSession(String deviceId, - DeviceMessage message, - Function> sessionBuilder, - Supplier> deviceNotFoundCallback) { + private Mono createNewSession(String deviceId, + DeviceMessage message, + Function> sessionBuilder, + Supplier> deviceNotFoundCallback) { return registry .getDevice(deviceId) .switchIfEmpty(Mono.defer(deviceNotFoundCallback)) @@ -284,6 +282,22 @@ public class DeviceGatewayHelper { private Mono updateSession(DeviceSession session, DeviceMessage message, Function> sessionBuilder) { + + return session + .isAliveAsync() + .flatMap(alive -> { + //设备会话存活才更新 + if (alive) { + return updateSession0(session, message, sessionBuilder); + } + //创建新的会话 + return createNewSession(message.getDeviceId(), message, sessionBuilder, Mono::empty); + }); + } + + private Mono updateSession0(DeviceSession session, + DeviceMessage message, + Function> sessionBuilder) { Mono after = null; //消息中指定保持在线,并且之前的会话不是保持在线,则需要替换之前的会话 if (isNewKeeOnline(session, message)) {