fix: 修复处理keepOnline会话错误问题

This commit is contained in:
zhouhao 2024-06-12 09:30:41 +08:00
parent 9711f857d2
commit 29174496b6
1 changed files with 37 additions and 19 deletions

View File

@ -16,6 +16,7 @@ import reactor.core.publisher.Mono;
import reactor.util.context.Context; import reactor.util.context.Context;
import java.time.Duration; import java.time.Duration;
import java.util.Objects;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -121,22 +122,27 @@ public class DeviceGatewayHelper {
.createOrUpdateSession(childrenId, .createOrUpdateSession(childrenId,
children, children,
child -> Mono.just(new ChildrenDeviceSession(childrenId, parentSession, child)), child -> Mono.just(new ChildrenDeviceSession(childrenId, parentSession, child)),
Mono::empty)); Mono::empty)
.doOnNext(session -> {
if (session.isWrapFrom(ChildrenDeviceSession.class)) {
ChildrenDeviceSession childrenSession = session.unwrap(ChildrenDeviceSession.class);
//网关发生变化,替换新的上级会话
if (!Objects.equals(deviceId, childrenSession.getParent().getDeviceId())) {
childrenSession.replaceWith(parentSession);
}
}
}));
//子设备注册 //子设备注册
if (isDoRegister(children)) { if (isDoRegister(children)) {
return Mono return this
//延迟2秒因为自动注册是异步的,收到消息后并不能保证马上可以注册成功. .getDeviceForRegister(children.getDeviceId())
.delay(Duration.ofSeconds(2)) .flatMap(device -> device
.then(registry //没有配置状态自管理才自动上线
.getDevice(children.getDeviceId()) .getSelfConfig(DeviceConfigKey.selfManageState)
.flatMap(device -> device .defaultIfEmpty(false)
//没有配置状态自管理才自动上线 .filter(Boolean.FALSE::equals))
.getSelfConfig(DeviceConfigKey.selfManageState) .flatMap(ignore -> sessionHandler)
.defaultIfEmpty(false)
.filter(Boolean.FALSE::equals))
.flatMap(ignore -> sessionHandler))
.then(); .then();
} }
return sessionHandler.then(); return sessionHandler.then();
@ -207,7 +213,7 @@ public class DeviceGatewayHelper {
} }
if (doHandle) { if (doHandle) {
then = then.flatMap(opt -> messageHandler.handleMessage(opt, message).thenReturn(opt)); then = messageHandler.handleMessage(null, message).then(then);
} }
return this return this
@ -260,6 +266,15 @@ public class DeviceGatewayHelper {
.flatMap(Function.identity()); .flatMap(Function.identity());
} }
private Mono<DeviceOperator> getDeviceForRegister(String deviceId) {
return registry
.getDevice(deviceId)
.switchIfEmpty(Mono.defer(() -> Mono
//延迟2秒因为自动注册是异步的,收到消息后并不能保证马上可以注册成功.
.delay(Duration.ofSeconds(2))
.then(registry.getDevice(deviceId))));
}
private Mono<DeviceSession> createNewSession(String deviceId, private Mono<DeviceSession> createNewSession(String deviceId,
DeviceMessage message, DeviceMessage message,
Function<DeviceOperator, Mono<DeviceSession>> sessionBuilder, Function<DeviceOperator, Mono<DeviceSession>> sessionBuilder,
@ -298,7 +313,7 @@ public class DeviceGatewayHelper {
private Mono<DeviceSession> updateSession0(DeviceSession session, private Mono<DeviceSession> updateSession0(DeviceSession session,
DeviceMessage message, DeviceMessage message,
Function<DeviceOperator, Mono<DeviceSession>> sessionBuilder) { Function<DeviceOperator, Mono<DeviceSession>> sessionBuilder) {
Mono<Void> after = null; Mono<DeviceSession> after = null;
//消息中指定保持在线,并且之前的会话不是保持在线,则需要替换之前的会话 //消息中指定保持在线,并且之前的会话不是保持在线,则需要替换之前的会话
if (isNewKeeOnline(session, message)) { if (isNewKeeOnline(session, message)) {
Integer timeoutSeconds = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds); Integer timeoutSeconds = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds);
@ -310,14 +325,13 @@ public class DeviceGatewayHelper {
Integer timeoutSeconds = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds); Integer timeoutSeconds = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds);
after = sessionBuilder after = sessionBuilder
.apply(session.getOperator()) .apply(session.getOperator())
.map(newSession -> new KeepOnlineSession(newSession, Duration.ofSeconds(timeoutSeconds))) .map(newSession -> new KeepOnlineSession(newSession, Duration.ofSeconds(timeoutSeconds)));
.then();
} }
applySessionKeepaliveTimeout(message, session); applySessionKeepaliveTimeout(message, session);
session.keepAlive(); session.keepAlive();
return after == null return after == null
? Mono.just(session) ? Mono.just(session)
: after.thenReturn(session); : after;
} }
private static void applySessionKeepaliveTimeout(DeviceMessage msg, DeviceSession session) { private static void applySessionKeepaliveTimeout(DeviceMessage msg, DeviceSession session) {
@ -348,7 +362,11 @@ public class DeviceGatewayHelper {
//判断保持在线的会话是否以及丢失(服务重启后可能出现) //判断保持在线的会话是否以及丢失(服务重启后可能出现)
private static boolean isKeeOnlineLost(DeviceSession session) { private static boolean isKeeOnlineLost(DeviceSession session) {
return session instanceof KeepOnlineSession && session.isWrapFrom(LostDeviceSession.class); if (!session.isWrapFrom(KeepOnlineSession.class)) {
return false;
}
return session.isWrapFrom(LostDeviceSession.class)
|| !session.unwrap(KeepOnlineSession.class).getParent().isAlive();
} }
//判断是否为设备注册 //判断是否为设备注册