refactor: 优化mqtt设备接入
This commit is contained in:
parent
101aa5bfa7
commit
f3a5fb2bff
|
|
@ -13,6 +13,7 @@ import org.jetlinks.community.network.DefaultNetworkType;
|
|||
import org.jetlinks.community.network.NetworkType;
|
||||
import org.jetlinks.community.network.mqtt.gateway.device.session.MqttConnectionSession;
|
||||
import org.jetlinks.community.network.mqtt.server.MqttConnection;
|
||||
import org.jetlinks.community.network.mqtt.server.MqttPublishing;
|
||||
import org.jetlinks.community.network.mqtt.server.MqttServer;
|
||||
import org.jetlinks.community.network.utils.DeviceGatewayHelper;
|
||||
import org.jetlinks.community.utils.SystemUtils;
|
||||
|
|
@ -112,27 +113,25 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway implements MonitorSu
|
|||
conn.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
|
||||
monitor.rejected();
|
||||
}
|
||||
return isStarted();
|
||||
return true;
|
||||
})
|
||||
.publishOn(Schedulers.parallel())
|
||||
//处理mqtt连接请求
|
||||
.flatMap(this::handleConnection)
|
||||
//处理认证结果
|
||||
.flatMap(tuple3 -> handleAuthResponse(tuple3.getT1(), tuple3.getT2(), tuple3.getT3()))
|
||||
.flatMap(tp -> handleAcceptedMqttConnection(tp.getT1(), tp.getT2(), tp.getT3()), Integer.MAX_VALUE)
|
||||
.contextWrite(ReactiveLogger.start("network", mqttServer.getId()))
|
||||
.flatMap(connection -> this
|
||||
.handleConnection(connection)
|
||||
.flatMap(tuple3 -> handleAuthResponse(tuple3.getT1(), tuple3.getT2(), tuple3.getT3()))
|
||||
.flatMap(tp -> handleAcceptedMqttConnection(tp.getT1(), tp.getT2(), tp.getT3()))
|
||||
.onErrorResume(err -> {
|
||||
log.error(err.getMessage(), err);
|
||||
return Mono.empty();
|
||||
}),
|
||||
Integer.MAX_VALUE)
|
||||
.subscribe();
|
||||
|
||||
}
|
||||
|
||||
//处理连接,并进行认证
|
||||
private Mono<Tuple3<DeviceOperator, AuthenticationResponse, MqttConnection>> handleConnection(MqttConnection connection) {
|
||||
//内存不够了
|
||||
if (SystemUtils.memoryIsOutOfWatermark()) {
|
||||
//直接拒绝,响应SERVER_UNAVAILABLE,不再处理此连接
|
||||
connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
return Mono
|
||||
.justOrEmpty(connection.getAuth())
|
||||
.flatMap(auth -> {
|
||||
|
|
@ -170,7 +169,7 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway implements MonitorSu
|
|||
//应答SERVER_UNAVAILABLE
|
||||
connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
|
||||
}))
|
||||
.subscribeOn(Schedulers.parallel());
|
||||
;
|
||||
}
|
||||
|
||||
//处理认证结果
|
||||
|
|
@ -190,7 +189,7 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway implements MonitorSu
|
|||
monitor.totalConnection(counter.sum());
|
||||
|
||||
sessionManager
|
||||
.getSession(deviceId)
|
||||
.getSession(deviceId, false)
|
||||
.flatMap(_tmp -> {
|
||||
//只有与创建的会话相同才移除(下线),因为有可能设置了keepOnline,
|
||||
//或者设备通过其他方式注册了会话,这里断开连接不能影响到以上情况.
|
||||
|
|
@ -219,19 +218,21 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway implements MonitorSu
|
|||
})
|
||||
.defaultIfEmpty(newSession);
|
||||
})
|
||||
.flatMap(session -> Mono.fromCallable(() -> {
|
||||
.mapNotNull(session->{
|
||||
try {
|
||||
return Tuples.of(connection.accept(), device, session.unwrap(MqttConnectionSession.class));
|
||||
} catch (IllegalStateException ignore) {
|
||||
//忽略错误,偶尔可能会出现网络异常,导致accept时,连接已经中断.还有其他更好的处理方式?
|
||||
return null;
|
||||
}
|
||||
}))
|
||||
})
|
||||
.doOnNext(o -> {
|
||||
//监控信息
|
||||
monitor.connected();
|
||||
monitor.totalConnection(counter.sum());
|
||||
});
|
||||
})
|
||||
//会话empty说明注册会话失败?
|
||||
.switchIfEmpty(Mono.fromRunnable(() -> connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED)));
|
||||
} else {
|
||||
//认证失败返回 0x04 BAD_USER_NAME_OR_PASSWORD
|
||||
connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
|
||||
|
|
@ -253,20 +254,16 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway implements MonitorSu
|
|||
private Mono<Void> handleAcceptedMqttConnection(MqttConnection connection,
|
||||
DeviceOperator operator,
|
||||
MqttConnectionSession session) {
|
||||
|
||||
|
||||
return Flux
|
||||
.usingWhen(Mono.just(connection),
|
||||
MqttConnection::handleMessage,
|
||||
MqttConnection::close)
|
||||
//网关暂停或者已停止时,则不处理消息
|
||||
.filter(pb -> isStarted())
|
||||
.doOnNext(msg -> monitor.receivedMessage())
|
||||
.publishOn(Schedulers.parallel())
|
||||
//解码收到的mqtt报文
|
||||
.flatMap(publishing -> this
|
||||
.decodeAndHandleMessage(operator, session, publishing.getMessage(), connection)
|
||||
//应答MQTT(QoS1,2的场景)
|
||||
.doOnSuccess(s -> publishing.acknowledge())
|
||||
.concatMap(publishing -> this
|
||||
.decodeAndHandleMessage(operator, session, publishing, connection)
|
||||
)
|
||||
//合并遗言消息
|
||||
.mergeWith(
|
||||
|
|
@ -282,30 +279,32 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway implements MonitorSu
|
|||
MqttConnectionSession session,
|
||||
MqttMessage message,
|
||||
MqttConnection connection) {
|
||||
monitor.receivedMessage();
|
||||
|
||||
return operator
|
||||
.getProtocol()
|
||||
.flatMap(protocol -> protocol.getMessageCodec(getTransport()))
|
||||
//解码
|
||||
.flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(session, message, registry)))
|
||||
.cast(DeviceMessage.class)
|
||||
.flatMap(msg -> {
|
||||
.concatMap(msg -> {
|
||||
//回填deviceId,有的场景协议包不能或者没有解析出deviceId,则直接使用连接对应的设备id进行填充.
|
||||
if (!StringUtils.hasText(msg.getDeviceId())) {
|
||||
msg.thingId(DeviceThingType.device, operator.getDeviceId());
|
||||
}
|
||||
return this
|
||||
.handleMessage(operator, msg, connection);
|
||||
return this.handleMessage(operator, msg, connection);
|
||||
})
|
||||
.doOnComplete(() -> {
|
||||
if (message instanceof MqttPublishing) {
|
||||
((MqttPublishing) message).acknowledge();
|
||||
}
|
||||
})
|
||||
.doOnEach(ReactiveLogger.onError(err -> log.error("处理MQTT连接[{}]消息失败:{}", operator.getDeviceId(), message, err)))
|
||||
.as(FluxTracer
|
||||
.create(DeviceTracer.SpanName.decode(operator.getDeviceId()),
|
||||
(span, msg) -> span.setAttribute(DeviceTracer.SpanKey.message, msg
|
||||
.toJson()
|
||||
.toJSONString())))
|
||||
//发生错误不中断流
|
||||
.onErrorResume((err) -> Mono.empty())
|
||||
.then()
|
||||
.subscribeOn(Schedulers.parallel());
|
||||
.onErrorResume((err) -> {
|
||||
log.error("handle mqtt message [{}] error:{}", operator.getDeviceId(), message, err);
|
||||
return Mono.empty();
|
||||
})
|
||||
.then();
|
||||
}
|
||||
|
||||
private Mono<DeviceMessage> handleMessage(DeviceOperator mainDevice,
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ import org.jetlinks.core.message.codec.EncodedMessage;
|
|||
import org.jetlinks.core.message.codec.MqttMessage;
|
||||
import org.jetlinks.core.message.codec.SimpleMqttMessage;
|
||||
import org.jetlinks.core.server.mqtt.MqttAuth;
|
||||
import org.jetlinks.core.utils.Reactors;
|
||||
import reactor.core.publisher.*;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
|
|
@ -41,7 +42,7 @@ class VertxMqttConnection implements MqttConnection {
|
|||
private long keepAliveTimeoutMs;
|
||||
@Getter
|
||||
private long lastPingTime = System.currentTimeMillis();
|
||||
private volatile boolean closed = false, accepted = false, autoAckSub = true, autoAckUnSub = true, autoAckMsg = true;
|
||||
private volatile boolean closed = false, accepted = false, autoAckSub = true, autoAckUnSub = true, autoAckMsg = false;
|
||||
private static final MqttAuth emptyAuth = new MqttAuth() {
|
||||
@Override
|
||||
public String getUsername() {
|
||||
|
|
@ -53,19 +54,9 @@ class VertxMqttConnection implements MqttConnection {
|
|||
return "";
|
||||
}
|
||||
};
|
||||
private final Sinks.Many<MqttPublishing> messageProcessor = Sinks
|
||||
.many()
|
||||
.multicast()
|
||||
.onBackpressureBuffer(Integer.MAX_VALUE);
|
||||
|
||||
private final Sinks.Many<MqttSubscription> subscription = Sinks
|
||||
.many()
|
||||
.multicast()
|
||||
.onBackpressureBuffer(Integer.MAX_VALUE);
|
||||
private final Sinks.Many<MqttUnSubscription> unsubscription = Sinks
|
||||
.many()
|
||||
.multicast()
|
||||
.onBackpressureBuffer(Integer.MAX_VALUE);
|
||||
private final Sinks.Many<MqttPublishing> messageProcessor = Reactors.createMany(Integer.MAX_VALUE, false);
|
||||
private final Sinks.Many<MqttSubscription> subscription = Reactors.createMany(Integer.MAX_VALUE, false);
|
||||
private final Sinks.Many<MqttUnSubscription> unsubscription = Reactors.createMany(Integer.MAX_VALUE, false);
|
||||
|
||||
|
||||
public VertxMqttConnection(MqttEndpoint endpoint) {
|
||||
|
|
@ -178,7 +169,7 @@ class VertxMqttConnection implements MqttConnection {
|
|||
publishing.acknowledge();
|
||||
}
|
||||
if (hasDownstream) {
|
||||
this.messageProcessor.tryEmitNext(publishing);
|
||||
this.messageProcessor.emitNext(publishing, Reactors.emitFailureHandler());
|
||||
}
|
||||
})
|
||||
//QoS 1 PUBACK
|
||||
|
|
@ -211,7 +202,7 @@ class VertxMqttConnection implements MqttConnection {
|
|||
subscription.acknowledge();
|
||||
}
|
||||
if (hasDownstream) {
|
||||
this.subscription.tryEmitNext(subscription);
|
||||
this.subscription.emitNext(subscription, Reactors.emitFailureHandler());
|
||||
}
|
||||
})
|
||||
.unsubscribeHandler(msg -> {
|
||||
|
|
@ -222,7 +213,7 @@ class VertxMqttConnection implements MqttConnection {
|
|||
unSubscription.acknowledge();
|
||||
}
|
||||
if (hasDownstream) {
|
||||
this.unsubscription.tryEmitNext(unSubscription);
|
||||
this.unsubscription.emitNext(unSubscription, Reactors.emitFailureHandler());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue