优化mqtt设备网关逻辑
This commit is contained in:
parent
81a3f4f58d
commit
8bee960d66
|
|
@ -12,6 +12,7 @@ import org.jetlinks.core.device.DeviceOperator;
|
|||
import org.jetlinks.core.device.DeviceRegistry;
|
||||
import org.jetlinks.core.device.MqttAuthenticationRequest;
|
||||
import org.jetlinks.core.message.CommonDeviceMessage;
|
||||
import org.jetlinks.core.message.DeviceMessage;
|
||||
import org.jetlinks.core.message.Message;
|
||||
import org.jetlinks.core.message.codec.*;
|
||||
import org.jetlinks.core.server.session.DeviceSession;
|
||||
|
|
@ -38,7 +39,7 @@ import java.util.concurrent.atomic.LongAdder;
|
|||
import java.util.function.Function;
|
||||
|
||||
@Slf4j
|
||||
class MqttServerDeviceGateway implements DeviceGateway , MonitorSupportDeviceGateway {
|
||||
class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGateway {
|
||||
|
||||
@Getter
|
||||
private final String id;
|
||||
|
|
@ -103,7 +104,7 @@ class MqttServerDeviceGateway implements DeviceGateway , MonitorSupportDeviceGat
|
|||
}
|
||||
|
||||
//处理连接,并进行认证
|
||||
private Mono<Tuple3<DeviceOperator,AuthenticationResponse,MqttConnection>> handleConnection(MqttConnection connection) {
|
||||
private Mono<Tuple3<DeviceOperator, AuthenticationResponse, MqttConnection>> handleConnection(MqttConnection connection) {
|
||||
return Mono.justOrEmpty(connection.getAuth())
|
||||
//没有认证信息,则拒绝连接.
|
||||
.switchIfEmpty(Mono.fromRunnable(() -> {
|
||||
|
|
@ -186,14 +187,14 @@ class MqttServerDeviceGateway implements DeviceGateway , MonitorSupportDeviceGat
|
|||
.takeWhile(pub -> disposable != null)
|
||||
.doOnNext(msg -> gatewayMonitor.receivedMessage())
|
||||
.flatMap(publishing ->
|
||||
this.decodeAndHandleMessage(operator, session, publishing.getMessage())
|
||||
this.decodeAndHandleMessage(operator, session, publishing.getMessage(), connection)
|
||||
//ack
|
||||
.doOnSuccess(s -> publishing.acknowledge())
|
||||
)
|
||||
//合并遗言消息
|
||||
.mergeWith(
|
||||
Mono.justOrEmpty(connection.getWillMessage())
|
||||
.flatMap(mqttMessage -> this.decodeAndHandleMessage(operator, session, mqttMessage))
|
||||
.flatMap(mqttMessage -> this.decodeAndHandleMessage(operator, session, mqttMessage, connection))
|
||||
)
|
||||
.subscriberContext(ReactiveLogger.start("network", mqttServer.getId()))
|
||||
.subscribe();
|
||||
|
|
@ -202,21 +203,45 @@ class MqttServerDeviceGateway implements DeviceGateway , MonitorSupportDeviceGat
|
|||
//解码消息并处理
|
||||
private Mono<Void> decodeAndHandleMessage(DeviceOperator operator,
|
||||
DeviceSession session,
|
||||
MqttMessage message) {
|
||||
MqttMessage message,
|
||||
MqttConnection connection) {
|
||||
return operator
|
||||
.getProtocol()
|
||||
.flatMap(protocol -> protocol.getMessageCodec(getTransport()))
|
||||
.flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(session, message)))
|
||||
.cast(DeviceMessage.class)
|
||||
.flatMap(msg -> {
|
||||
if(msg instanceof CommonDeviceMessage){
|
||||
CommonDeviceMessage _msg= ((CommonDeviceMessage) msg);
|
||||
if(StringUtils.isEmpty(_msg.getDeviceId())){
|
||||
if (msg instanceof CommonDeviceMessage) {
|
||||
CommonDeviceMessage _msg = ((CommonDeviceMessage) msg);
|
||||
if (StringUtils.isEmpty(_msg.getDeviceId())) {
|
||||
_msg.setDeviceId(operator.getDeviceId());
|
||||
}
|
||||
}
|
||||
if (messageProcessor.hasDownstreams()) {
|
||||
sink.next(msg);
|
||||
}
|
||||
String deviceId = msg.getDeviceId();
|
||||
|
||||
//返回了其他设备的消息,则自动创建会话
|
||||
if (!deviceId.equals(operator.getDeviceId())) {
|
||||
DeviceSession anotherSession = sessionManager.getSession(msg.getDeviceId());
|
||||
if (anotherSession == null) {
|
||||
|
||||
connection.onClose(c -> sessionManager.unregister(deviceId));
|
||||
|
||||
return registry
|
||||
.getDevice(msg.getDeviceId())
|
||||
.doOnNext(device -> sessionManager.register(
|
||||
new MqttConnectionSession(msg.getDeviceId(), device, getTransport(), connection) {
|
||||
@Override
|
||||
public Mono<Boolean> send(EncodedMessage encodedMessage) {
|
||||
return super.send(encodedMessage).doOnSuccess(s -> gatewayMonitor.sentMessage());
|
||||
}
|
||||
}))
|
||||
.then(messageHandler.handleMessage(operator, msg))
|
||||
;
|
||||
}
|
||||
}
|
||||
//丢给默认的消息处理逻辑
|
||||
return messageHandler.handleMessage(operator, msg);
|
||||
})
|
||||
|
|
@ -225,6 +250,7 @@ class MqttServerDeviceGateway implements DeviceGateway , MonitorSupportDeviceGat
|
|||
.onErrorResume((err) -> Mono.empty())//发生错误不中断流
|
||||
;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Transport getTransport() {
|
||||
return DefaultTransport.MQTT;
|
||||
|
|
|
|||
Loading…
Reference in New Issue