diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java index 122c26bd..3a9a272c 100644 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java @@ -12,6 +12,7 @@ import org.jetlinks.core.device.DeviceOperator; import org.jetlinks.core.device.DeviceRegistry; import org.jetlinks.core.device.MqttAuthenticationRequest; import org.jetlinks.core.message.CommonDeviceMessage; +import org.jetlinks.core.message.DeviceMessage; import org.jetlinks.core.message.Message; import org.jetlinks.core.message.codec.*; import org.jetlinks.core.server.session.DeviceSession; @@ -38,7 +39,7 @@ import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; @Slf4j -class MqttServerDeviceGateway implements DeviceGateway , MonitorSupportDeviceGateway { +class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGateway { @Getter private final String id; @@ -103,7 +104,7 @@ class MqttServerDeviceGateway implements DeviceGateway , MonitorSupportDeviceGat } //处理连接,并进行认证 - private Mono> handleConnection(MqttConnection connection) { + private Mono> handleConnection(MqttConnection connection) { return Mono.justOrEmpty(connection.getAuth()) //没有认证信息,则拒绝连接. .switchIfEmpty(Mono.fromRunnable(() -> { @@ -186,14 +187,14 @@ class MqttServerDeviceGateway implements DeviceGateway , MonitorSupportDeviceGat .takeWhile(pub -> disposable != null) .doOnNext(msg -> gatewayMonitor.receivedMessage()) .flatMap(publishing -> - this.decodeAndHandleMessage(operator, session, publishing.getMessage()) + this.decodeAndHandleMessage(operator, session, publishing.getMessage(), connection) //ack .doOnSuccess(s -> publishing.acknowledge()) ) //合并遗言消息 .mergeWith( Mono.justOrEmpty(connection.getWillMessage()) - .flatMap(mqttMessage -> this.decodeAndHandleMessage(operator, session, mqttMessage)) + .flatMap(mqttMessage -> this.decodeAndHandleMessage(operator, session, mqttMessage, connection)) ) .subscriberContext(ReactiveLogger.start("network", mqttServer.getId())) .subscribe(); @@ -202,21 +203,45 @@ class MqttServerDeviceGateway implements DeviceGateway , MonitorSupportDeviceGat //解码消息并处理 private Mono decodeAndHandleMessage(DeviceOperator operator, DeviceSession session, - MqttMessage message) { + MqttMessage message, + MqttConnection connection) { return operator .getProtocol() .flatMap(protocol -> protocol.getMessageCodec(getTransport())) .flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(session, message))) + .cast(DeviceMessage.class) .flatMap(msg -> { - if(msg instanceof CommonDeviceMessage){ - CommonDeviceMessage _msg= ((CommonDeviceMessage) msg); - if(StringUtils.isEmpty(_msg.getDeviceId())){ + if (msg instanceof CommonDeviceMessage) { + CommonDeviceMessage _msg = ((CommonDeviceMessage) msg); + if (StringUtils.isEmpty(_msg.getDeviceId())) { _msg.setDeviceId(operator.getDeviceId()); } } if (messageProcessor.hasDownstreams()) { sink.next(msg); } + String deviceId = msg.getDeviceId(); + + //返回了其他设备的消息,则自动创建会话 + if (!deviceId.equals(operator.getDeviceId())) { + DeviceSession anotherSession = sessionManager.getSession(msg.getDeviceId()); + if (anotherSession == null) { + + connection.onClose(c -> sessionManager.unregister(deviceId)); + + return registry + .getDevice(msg.getDeviceId()) + .doOnNext(device -> sessionManager.register( + new MqttConnectionSession(msg.getDeviceId(), device, getTransport(), connection) { + @Override + public Mono send(EncodedMessage encodedMessage) { + return super.send(encodedMessage).doOnSuccess(s -> gatewayMonitor.sentMessage()); + } + })) + .then(messageHandler.handleMessage(operator, msg)) + ; + } + } //丢给默认的消息处理逻辑 return messageHandler.handleMessage(operator, msg); }) @@ -225,6 +250,7 @@ class MqttServerDeviceGateway implements DeviceGateway , MonitorSupportDeviceGat .onErrorResume((err) -> Mono.empty())//发生错误不中断流 ; } + @Override public Transport getTransport() { return DefaultTransport.MQTT;