diff --git a/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java b/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java index 56ca0bac..565c6220 100644 --- a/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java +++ b/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/utils/DeviceGatewayHelper.java @@ -126,17 +126,14 @@ public class DeviceGatewayHelper { //子设备注册 if (isDoRegister(children)) { - return Mono - //延迟2秒,因为自动注册是异步的,收到消息后并不能保证马上可以注册成功. - .delay(Duration.ofSeconds(2)) - .then(registry - .getDevice(children.getDeviceId()) - .flatMap(device -> device - //没有配置状态自管理才自动上线 - .getSelfConfig(DeviceConfigKey.selfManageState) - .defaultIfEmpty(false) - .filter(Boolean.FALSE::equals)) - .flatMap(ignore -> sessionHandler)) + return this + .getDeviceForRegister(children.getDeviceId()) + .flatMap(device -> device + //没有配置状态自管理才自动上线 + .getSelfConfig(DeviceConfigKey.selfManageState) + .defaultIfEmpty(false) + .filter(Boolean.FALSE::equals)) + .flatMap(ignore -> sessionHandler) .then(); } return sessionHandler.then(); @@ -207,7 +204,7 @@ public class DeviceGatewayHelper { } if (doHandle) { - then = then.flatMap(opt -> messageHandler.handleMessage(opt, message).thenReturn(opt)); + then = messageHandler.handleMessage(null, message).then(then); } return this @@ -260,6 +257,15 @@ public class DeviceGatewayHelper { .flatMap(Function.identity()); } + private Mono getDeviceForRegister(String deviceId) { + return registry + .getDevice(deviceId) + .switchIfEmpty(Mono.defer(() -> Mono + //延迟2秒,因为自动注册是异步的,收到消息后并不能保证马上可以注册成功. + .delay(Duration.ofSeconds(2)) + .then(registry.getDevice(deviceId)))); + } + private Mono createNewSession(String deviceId, DeviceMessage message, Function> sessionBuilder, @@ -298,7 +304,7 @@ public class DeviceGatewayHelper { private Mono updateSession0(DeviceSession session, DeviceMessage message, Function> sessionBuilder) { - Mono after = null; + Mono after = null; //消息中指定保持在线,并且之前的会话不是保持在线,则需要替换之前的会话 if (isNewKeeOnline(session, message)) { Integer timeoutSeconds = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds); @@ -310,14 +316,13 @@ public class DeviceGatewayHelper { Integer timeoutSeconds = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds); after = sessionBuilder .apply(session.getOperator()) - .map(newSession -> new KeepOnlineSession(newSession, Duration.ofSeconds(timeoutSeconds))) - .then(); + .map(newSession -> new KeepOnlineSession(newSession, Duration.ofSeconds(timeoutSeconds))); } applySessionKeepaliveTimeout(message, session); session.keepAlive(); return after == null ? Mono.just(session) - : after.thenReturn(session); + : after; } private static void applySessionKeepaliveTimeout(DeviceMessage msg, DeviceSession session) { @@ -348,7 +353,11 @@ public class DeviceGatewayHelper { //判断保持在线的会话是否以及丢失(服务重启后可能出现) private static boolean isKeeOnlineLost(DeviceSession session) { - return session instanceof KeepOnlineSession && session.isWrapFrom(LostDeviceSession.class); + if (!session.isWrapFrom(KeepOnlineSession.class)) { + return false; + } + return session.isWrapFrom(LostDeviceSession.class) + || !session.unwrap(KeepOnlineSession.class).getParent().isAlive(); } //判断是否为设备注册 diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/DeviceMessageBusinessHandler.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/DeviceMessageBusinessHandler.java index 6df7415d..01075052 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/DeviceMessageBusinessHandler.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/DeviceMessageBusinessHandler.java @@ -98,13 +98,12 @@ public class DeviceMessageBusinessHandler { instance.mergeConfiguration(tps.getT5()); return deviceService - .save(Mono.just(instance)) - .thenReturn(instance) - .flatMap(device -> registry - .register(device.toDeviceInfo() - .addConfig("state", selfManageState - ? org.jetlinks.core.device.DeviceState.offline - : org.jetlinks.core.device.DeviceState.online))); + .save(instance) + .then(Mono.defer(() -> registry + .register(instance.toDeviceInfo() + .addConfig("state", selfManageState + ? org.jetlinks.core.device.DeviceState.offline + : org.jetlinks.core.device.DeviceState.online)))); }); }