优化自动注册以及设备会话管理

This commit is contained in:
zhouhao 2022-08-03 17:43:43 +08:00
parent 80185b911a
commit f31f71dc14
2 changed files with 32 additions and 24 deletions

View File

@ -126,17 +126,14 @@ public class DeviceGatewayHelper {
//子设备注册
if (isDoRegister(children)) {
return Mono
//延迟2秒因为自动注册是异步的,收到消息后并不能保证马上可以注册成功.
.delay(Duration.ofSeconds(2))
.then(registry
.getDevice(children.getDeviceId())
.flatMap(device -> device
//没有配置状态自管理才自动上线
.getSelfConfig(DeviceConfigKey.selfManageState)
.defaultIfEmpty(false)
.filter(Boolean.FALSE::equals))
.flatMap(ignore -> sessionHandler))
return this
.getDeviceForRegister(children.getDeviceId())
.flatMap(device -> device
//没有配置状态自管理才自动上线
.getSelfConfig(DeviceConfigKey.selfManageState)
.defaultIfEmpty(false)
.filter(Boolean.FALSE::equals))
.flatMap(ignore -> sessionHandler)
.then();
}
return sessionHandler.then();
@ -207,7 +204,7 @@ public class DeviceGatewayHelper {
}
if (doHandle) {
then = then.flatMap(opt -> messageHandler.handleMessage(opt, message).thenReturn(opt));
then = messageHandler.handleMessage(null, message).then(then);
}
return this
@ -260,6 +257,15 @@ public class DeviceGatewayHelper {
.flatMap(Function.identity());
}
private Mono<DeviceOperator> getDeviceForRegister(String deviceId) {
return registry
.getDevice(deviceId)
.switchIfEmpty(Mono.defer(() -> Mono
//延迟2秒因为自动注册是异步的,收到消息后并不能保证马上可以注册成功.
.delay(Duration.ofSeconds(2))
.then(registry.getDevice(deviceId))));
}
private Mono<DeviceSession> createNewSession(String deviceId,
DeviceMessage message,
Function<DeviceOperator, Mono<DeviceSession>> sessionBuilder,
@ -298,7 +304,7 @@ public class DeviceGatewayHelper {
private Mono<DeviceSession> updateSession0(DeviceSession session,
DeviceMessage message,
Function<DeviceOperator, Mono<DeviceSession>> sessionBuilder) {
Mono<Void> after = null;
Mono<DeviceSession> after = null;
//消息中指定保持在线,并且之前的会话不是保持在线,则需要替换之前的会话
if (isNewKeeOnline(session, message)) {
Integer timeoutSeconds = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds);
@ -310,14 +316,13 @@ public class DeviceGatewayHelper {
Integer timeoutSeconds = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds);
after = sessionBuilder
.apply(session.getOperator())
.map(newSession -> new KeepOnlineSession(newSession, Duration.ofSeconds(timeoutSeconds)))
.then();
.map(newSession -> new KeepOnlineSession(newSession, Duration.ofSeconds(timeoutSeconds)));
}
applySessionKeepaliveTimeout(message, session);
session.keepAlive();
return after == null
? Mono.just(session)
: after.thenReturn(session);
: after;
}
private static void applySessionKeepaliveTimeout(DeviceMessage msg, DeviceSession session) {
@ -348,7 +353,11 @@ public class DeviceGatewayHelper {
//判断保持在线的会话是否以及丢失(服务重启后可能出现)
private static boolean isKeeOnlineLost(DeviceSession session) {
return session instanceof KeepOnlineSession && session.isWrapFrom(LostDeviceSession.class);
if (!session.isWrapFrom(KeepOnlineSession.class)) {
return false;
}
return session.isWrapFrom(LostDeviceSession.class)
|| !session.unwrap(KeepOnlineSession.class).getParent().isAlive();
}
//判断是否为设备注册

View File

@ -98,13 +98,12 @@ public class DeviceMessageBusinessHandler {
instance.mergeConfiguration(tps.getT5());
return deviceService
.save(Mono.just(instance))
.thenReturn(instance)
.flatMap(device -> registry
.register(device.toDeviceInfo()
.addConfig("state", selfManageState
? org.jetlinks.core.device.DeviceState.offline
: org.jetlinks.core.device.DeviceState.online)));
.save(instance)
.then(Mono.defer(() -> registry
.register(instance.toDeviceInfo()
.addConfig("state", selfManageState
? org.jetlinks.core.device.DeviceState.offline
: org.jetlinks.core.device.DeviceState.online))));
});
}