优化设备会话处理

This commit is contained in:
zhouhao 2022-06-27 18:17:37 +08:00
parent bc985539e2
commit 3fc16470b0
1 changed files with 25 additions and 11 deletions

View File

@ -1,7 +1,6 @@
package org.jetlinks.community.network.utils;
import lombok.AllArgsConstructor;
import org.jetlinks.community.PropertyConstants;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
@ -9,10 +8,8 @@ import org.jetlinks.core.device.session.DeviceSessionManager;
import org.jetlinks.core.message.*;
import org.jetlinks.core.message.state.DeviceStateCheckMessage;
import org.jetlinks.core.message.state.DeviceStateCheckMessageReply;
import org.jetlinks.core.server.session.ChildrenDeviceSession;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.server.session.KeepOnlineSession;
import org.jetlinks.core.server.session.LostDeviceSession;
import org.jetlinks.core.server.session.*;
import org.jetlinks.community.PropertyConstants;
import org.jetlinks.supports.server.DecodedClientMessageHandler;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono;
@ -226,7 +223,8 @@ public class DeviceGatewayHelper {
Function<DeviceOperator, Mono<DeviceSession>> sessionBuilder,
Supplier<Mono<DeviceOperator>> deviceNotFoundCallback) {
return sessionManager
.getSession(deviceId)
.getSession(deviceId, false)
.filterWhen(DeviceSession::isAliveAsync)
.map(old -> {
//需要更新会话时才进行更新
if (needUpdateSession(old, message)) {
@ -240,7 +238,7 @@ public class DeviceGatewayHelper {
//会话不存在则尝试创建或者更新
.defaultIfEmpty(Mono.defer(() -> sessionManager
.compute(deviceId,
registerNewSession(
createNewSession(
deviceId,
message,
sessionBuilder,
@ -262,10 +260,10 @@ public class DeviceGatewayHelper {
.flatMap(Function.identity());
}
private Mono<DeviceSession> registerNewSession(String deviceId,
DeviceMessage message,
Function<DeviceOperator, Mono<DeviceSession>> sessionBuilder,
Supplier<Mono<DeviceOperator>> deviceNotFoundCallback) {
private Mono<DeviceSession> createNewSession(String deviceId,
DeviceMessage message,
Function<DeviceOperator, Mono<DeviceSession>> sessionBuilder,
Supplier<Mono<DeviceOperator>> deviceNotFoundCallback) {
return registry
.getDevice(deviceId)
.switchIfEmpty(Mono.defer(deviceNotFoundCallback))
@ -284,6 +282,22 @@ public class DeviceGatewayHelper {
private Mono<DeviceSession> updateSession(DeviceSession session,
DeviceMessage message,
Function<DeviceOperator, Mono<DeviceSession>> sessionBuilder) {
return session
.isAliveAsync()
.flatMap(alive -> {
//设备会话存活才更新
if (alive) {
return updateSession0(session, message, sessionBuilder);
}
//创建新的会话
return createNewSession(message.getDeviceId(), message, sessionBuilder, Mono::empty);
});
}
private Mono<DeviceSession> updateSession0(DeviceSession session,
DeviceMessage message,
Function<DeviceOperator, Mono<DeviceSession>> sessionBuilder) {
Mono<Void> after = null;
//消息中指定保持在线,并且之前的会话不是保持在线,则需要替换之前的会话
if (isNewKeeOnline(session, message)) {