diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/DeviceGatewayHelper.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/DeviceGatewayHelper.java index edf8d6d0..3abdbaf7 100755 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/DeviceGatewayHelper.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/DeviceGatewayHelper.java @@ -1,6 +1,7 @@ package org.jetlinks.community.gateway; import lombok.AllArgsConstructor; +import lombok.Getter; import org.jetlinks.core.device.DeviceConfigKey; import org.jetlinks.core.device.DeviceOperator; import org.jetlinks.core.device.DeviceRegistry; @@ -35,8 +36,11 @@ import java.util.function.Supplier; @AllArgsConstructor public class DeviceGatewayHelper { + @Getter private final DeviceRegistry registry; + @Getter private final DeviceSessionManager sessionManager; + @Getter private final DecodedClientMessageHandler messageHandler; public static Consumer applySessionKeepaliveTimeout(DeviceMessage msg, Supplier timeoutSupplier) { @@ -136,6 +140,7 @@ public class DeviceGatewayHelper { } })); + //子设备注册 if (isDoRegister(children)) { return this @@ -182,7 +187,7 @@ public class DeviceGatewayHelper { if (l == 0) { return registry .getDevice(deviceId) - .flatMap(device -> messageHandler.handleMessage(device, message)); + .flatMap(device -> handleMessage(device, message)); } return Mono.empty(); }) @@ -196,28 +201,22 @@ public class DeviceGatewayHelper { .orElse(false); } - //忽略会话管理,比如一个设备存在多种接入方式时,其中一种接入方式收到的消息设置忽略会话来防止会话冲突 - if (message.getHeaderOrDefault(Headers.ignoreSession)) { - return registry - .getDevice(deviceId) - .flatMap(device -> { - if (!isDoRegister(message)) { - return messageHandler - .handleMessage(device, message) - .thenReturn(device); - } - return Mono.just(device); - }); - - } - if (then == null) { then = registry.getDevice(deviceId); } + //忽略会话管理,比如一个设备存在多种接入方式时,其中一种接入方式收到的消息设置忽略会话来防止会话冲突 + if (message.getHeaderOrDefault(Headers.ignoreSession)) { + if (!isDoRegister(message)) { + return handleMessage(null, message) + .then(then); + } + return then; + + } + if (doHandle) { - then = messageHandler - .handleMessage(null, message) + then = handleMessage(null, message) .then(then); } @@ -229,6 +228,13 @@ public class DeviceGatewayHelper { } + private Mono handleMessage(DeviceOperator device, Message message) { + return messageHandler + .handleMessage(device, message) + //转换为empty,减少触发discard + .flatMap(ignore -> Mono.empty()); + } + private Mono createOrUpdateSession(String deviceId, DeviceMessage message, Function> sessionBuilder, @@ -256,7 +262,7 @@ public class DeviceGatewayHelper { () -> { //设备注册 if (isDoRegister(message)) { - return messageHandler + return this .handleMessage(null, message) //延迟2秒后尝试重新获取设备并上线 .then(Mono.delay(Duration.ofSeconds(2))) diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/transparent/TransparentDeviceMessageConnector.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/transparent/TransparentDeviceMessageConnector.java index 22f6e27e..c902a668 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/transparent/TransparentDeviceMessageConnector.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/transparent/TransparentDeviceMessageConnector.java @@ -12,13 +12,14 @@ import org.hswebframework.web.crud.events.EntityModifyEvent; import org.hswebframework.web.crud.events.EntitySavedEvent; import org.hswebframework.web.exception.ValidationException; import org.jctools.maps.NonBlockingHashMap; +import org.jetlinks.community.gateway.DeviceGatewayHelper; import org.jetlinks.core.device.DeviceConfigKey; import org.jetlinks.core.device.DeviceOperator; +import org.jetlinks.core.device.DeviceRegistry; +import org.jetlinks.core.device.session.DeviceSessionManager; import org.jetlinks.core.event.EventBus; import org.jetlinks.core.event.Subscription; -import org.jetlinks.core.message.DeviceMessage; -import org.jetlinks.core.message.DirectDeviceMessage; -import org.jetlinks.core.message.Headers; +import org.jetlinks.core.message.*; import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor; import org.jetlinks.community.OperationSource; import org.jetlinks.community.device.entity.TransparentMessageCodecEntity; @@ -45,14 +46,19 @@ public class TransparentDeviceMessageConnector implements CommandLineRunner, Dev private final Map codecs = new NonBlockingHashMap<>(); + private final DeviceGatewayHelper gatewayHelper; + public TransparentDeviceMessageConnector(@SuppressWarnings("all") - ReactiveRepository repository, + ReactiveRepository repository, DecodedClientMessageHandler messageHandler, + DeviceSessionManager sessionManager, + DeviceRegistry registry, EventBus eventBus, ObjectProvider providers) { this.repository = repository; this.messageHandler = messageHandler; this.eventBus = eventBus; + this.gatewayHelper = new DeviceGatewayHelper(registry, sessionManager, messageHandler); for (TransparentMessageCodecProvider provider : providers) { TransparentMessageCodecProviders.addProvider(provider); } @@ -69,10 +75,24 @@ public class TransparentDeviceMessageConnector implements CommandLineRunner, Dev } return codec .decode(message) - .flatMap(msg -> messageHandler.handleMessage(null, msg)) + .flatMap(this::handleMessage) .then(); } + private Mono handleMessage(DeviceMessage msg) { + if (msg instanceof ChildDeviceMessage || msg instanceof ChildDeviceMessageReply) { + msg.addHeader(Headers.ignoreSession, true); + return gatewayHelper + .handleDeviceMessage( + msg, + device -> null + ) + .then(); + } + + return messageHandler.handleMessage(null, msg).then(); + } + private TransparentMessageCodec getCodecOrNull(String productId, String deviceId) { CacheKey cacheKey = new CacheKey(productId, deviceId); TransparentMessageCodec codec = codecs.get(cacheKey); @@ -94,7 +114,7 @@ public class TransparentDeviceMessageConnector implements CommandLineRunner, Dev msg.addHeader("encodeBy", message.getMessageType().name()); //所有透传消息都设置为异步 msg.addHeader(Headers.async, true); - // msg.addHeader(Headers.sendAndForget, true); + // msg.addHeader(Headers.sendAndForget, true); }) ) .defaultIfEmpty(message); @@ -110,7 +130,7 @@ public class TransparentDeviceMessageConnector implements CommandLineRunner, Dev return provider .createCodec(entity.getConfiguration()) .doOnNext(codec -> codecs.put(key, codec)) - .contextWrite(OperationSource.ofContext(entity.getId(),null,entity)) + .contextWrite(OperationSource.ofContext(entity.getId(), null, entity)) .switchIfEmpty(Mono.fromRunnable(() -> codecs.remove(key))) .then(); }