mqtt 增加短链接支持

This commit is contained in:
zhou-hao 2020-08-24 16:23:53 +08:00
parent 5cce02a1e1
commit defd5a294f
2 changed files with 110 additions and 44 deletions

View File

@ -12,9 +12,7 @@ import org.jetlinks.core.device.AuthenticationResponse;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.device.MqttAuthenticationRequest;
import org.jetlinks.core.message.CommonDeviceMessage;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.*;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.server.session.DeviceSessionManager;
@ -24,6 +22,8 @@ import org.jetlinks.community.network.NetworkType;
import org.jetlinks.community.network.mqtt.gateway.device.session.MqttConnectionSession;
import org.jetlinks.community.network.mqtt.server.MqttConnection;
import org.jetlinks.community.network.mqtt.server.MqttServer;
import org.jetlinks.core.server.session.KeepOnlineSession;
import org.jetlinks.core.server.session.ReplaceableDeviceSession;
import org.jetlinks.supports.server.DecodedClientMessageHandler;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
@ -36,6 +36,7 @@ import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;
import javax.annotation.Nonnull;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
@ -74,7 +75,7 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
MqttServer mqttServer,
DecodedClientMessageHandler messageHandler,
Mono<ProtocolSupport> customProtocol
) {
) {
this.gatewayMonitor = GatewayMonitors.getDeviceGatewayMonitor(id);
this.id = id;
this.registry = registry;
@ -89,7 +90,6 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
return counter.sum();
}
private void doStart() {
if (started.getAndSet(true) || disposable != null) {
return;
@ -103,7 +103,6 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
}
return started.get();
})
.publishOn(Schedulers.parallel())
.flatMap(this::handleConnection)
.flatMap(tuple3 -> handleAuthResponse(tuple3.getT1(), tuple3.getT2(), tuple3.getT3()))
@ -116,7 +115,8 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
//处理连接并进行认证
private Mono<Tuple3<DeviceOperator, AuthenticationResponse, MqttConnection>> handleConnection(MqttConnection connection) {
return Mono
return Mono
.justOrEmpty(connection.getAuth())
.flatMap(auth -> {
MqttAuthenticationRequest request = new MqttAuthenticationRequest(connection.getClientId(), auth.getUsername(), auth.getPassword(), getTransport());
@ -147,31 +147,35 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
}
//处理认证结果
private Mono<Tuple3<MqttConnection, DeviceOperator, DeviceSession>> handleAuthResponse(DeviceOperator device,
AuthenticationResponse resp,
MqttConnection connection) {
private Mono<Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession>> handleAuthResponse(DeviceOperator device,
AuthenticationResponse resp,
MqttConnection connection) {
return Mono
.fromCallable(() -> {
String deviceId = device.getDeviceId();
if (resp.isSuccess()) {
counter.increment();
DeviceSession session = new MqttConnectionSession(deviceId, device, getTransport(), connection) {
@Override
public Mono<Boolean> send(EncodedMessage encodedMessage) {
return super.send(encodedMessage).doOnSuccess(s -> gatewayMonitor.sentMessage());
}
};
sessionManager.register(session);
DeviceSession session = sessionManager.getSession(deviceId);
MqttConnectionSession newSession = new MqttConnectionSession(deviceId, device, getTransport(), connection, gatewayMonitor);
if (null == session) {
sessionManager.register(newSession);
} else if (session instanceof ReplaceableDeviceSession) {
((ReplaceableDeviceSession) session).replaceWith(newSession);
}
gatewayMonitor.connected();
gatewayMonitor.totalConnection(counter.sum());
//监听断开连接
connection.onClose(conn -> {
counter.decrement();
sessionManager.unregister(deviceId);
DeviceSession _tmp = sessionManager.getSession(newSession.getId());
if (newSession == _tmp || _tmp == null) {
sessionManager.unregister(deviceId);
}
gatewayMonitor.disconnected();
gatewayMonitor.totalConnection(counter.sum());
});
return Tuples.of(connection.accept(), device, session);
return Tuples.of(connection.accept(), device, newSession);
} else {
connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
gatewayMonitor.rejected();
@ -187,7 +191,7 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
}
//处理已经建立连接的MQTT连接
private Mono<Void> handleAcceptedMqttConnection(MqttConnection connection, DeviceOperator operator, DeviceSession session) {
private Mono<Void> handleAcceptedMqttConnection(MqttConnection connection, DeviceOperator operator, MqttConnectionSession session) {
return connection
.handleMessage()
@ -214,7 +218,7 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
//解码消息并处理
private Mono<Void> decodeAndHandleMessage(DeviceOperator operator,
DeviceSession session,
MqttConnectionSession session,
MqttMessage message,
MqttConnection connection) {
return operator
@ -229,34 +233,19 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
_msg.setDeviceId(operator.getDeviceId());
}
}
if (messageProcessor.hasDownstreams()) {
sink.next(msg);
}
String deviceId = msg.getDeviceId();
if (!StringUtils.isEmpty(deviceId)) {
//返回了其他设备的消息,则自动创建会话
if (!deviceId.equals(operator.getDeviceId())) {
DeviceSession anotherSession = sessionManager.getSession(msg.getDeviceId());
if (anotherSession == null) {
connection.onClose(c -> sessionManager.unregister(deviceId));
return registry
.getDevice(msg.getDeviceId())
.doOnNext(device -> sessionManager.register(
new MqttConnectionSession(msg.getDeviceId(), device, getTransport(), connection) {
@Override
public Mono<Boolean> send(EncodedMessage encodedMessage) {
return super.send(encodedMessage).doOnSuccess(s -> gatewayMonitor.sentMessage());
}
}))
.then(messageHandler.handleMessage(operator, msg))
;
.flatMap(device -> handleMessage(device.getDeviceId(), device, msg, session));
}
}
}
//丢给默认的消息处理逻辑
return messageHandler.handleMessage(operator, msg);
return handleMessage(deviceId, operator, msg, session);
})
.then()
.doOnEach(ReactiveLogger.onError(err -> log.error("处理MQTT连接[{}]消息失败:{}", operator.getDeviceId(), message, err)))
@ -264,6 +253,50 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
;
}
private Mono<Void> handleMessage(String deviceId,
DeviceOperator device,
DeviceMessage message,
MqttConnectionSession firstSession) {
DeviceSession managedSession = sessionManager.getSession(deviceId);
//主动离线
if (message instanceof DeviceOfflineMessage) {
sessionManager.unregister(deviceId);
return Mono.empty();
}
//session 不存在,可能是同一个mqtt返回多个设备消息
if (managedSession == null) {
firstSession = new MqttConnectionSession(deviceId, device, getTransport(), firstSession.getConnection(), gatewayMonitor);
sessionManager.register(managedSession = firstSession);
}
//保持会话在低功率设备上,可能无法保持mqtt长连接.
if (message.getHeader(Headers.keepOnline).orElse(false)) {
if (!managedSession.isWrapFrom(KeepOnlineSession.class)) {
int timeout = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds);
KeepOnlineSession keepOnlineSession = new KeepOnlineSession(firstSession, Duration.ofSeconds(timeout));
//替换会话
managedSession = sessionManager.replace(firstSession, keepOnlineSession);
}
} else {
managedSession = firstSession;
}
managedSession.keepAlive();
if (messageProcessor.hasDownstreams()) {
sink.next(message);
}
if (message instanceof DeviceOnlineMessage) {
return Mono.empty();
}
return messageHandler
.handleMessage(device, message)
.then();
}
@Override
public Transport getTransport() {
return DefaultTransport.MQTT;

View File

@ -1,40 +1,51 @@
package org.jetlinks.community.network.mqtt.gateway.device.session;
import lombok.Getter;
import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.MqttMessage;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.community.network.mqtt.server.MqttConnection;
import org.jetlinks.core.server.session.ReplaceableDeviceSession;
import reactor.core.publisher.Mono;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
public class MqttConnectionSession implements DeviceSession {
public class MqttConnectionSession implements DeviceSession, ReplaceableDeviceSession {
@Getter
private String id;
private final String id;
@Getter
private DeviceOperator operator;
private final DeviceOperator operator;
@Getter
private Transport transport;
private final Transport transport;
@Getter
private MqttConnection connection;
public MqttConnectionSession(String id, DeviceOperator operator, Transport transport, MqttConnection connection) {
private final DeviceGatewayMonitor monitor;
private final long connectTime = System.currentTimeMillis();
public MqttConnectionSession(String id,
DeviceOperator operator,
Transport transport,
MqttConnection connection,
DeviceGatewayMonitor monitor) {
this.id = id;
this.operator = operator;
this.transport = transport;
this.connection = connection;
this.monitor = monitor;
}
private long connectTime = System.currentTimeMillis();
@Override
public String getDeviceId() {
@ -54,6 +65,7 @@ public class MqttConnectionSession implements DeviceSession {
@Override
public Mono<Boolean> send(EncodedMessage encodedMessage) {
return Mono.defer(() -> connection.publish(((MqttMessage) encodedMessage)))
.doOnSuccess(nil -> monitor.sentMessage())
.thenReturn(true);
}
@ -86,4 +98,25 @@ public class MqttConnectionSession implements DeviceSession {
public Optional<InetSocketAddress> getClientAddress() {
return Optional.ofNullable(connection.getClientAddress());
}
@Override
public void replaceWith(DeviceSession session) {
if (session instanceof MqttConnectionSession) {
MqttConnectionSession connectionSession = ((MqttConnectionSession) session);
this.connection = connectionSession.connection;
}
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
MqttConnectionSession that = (MqttConnectionSession) o;
return Objects.equals(connection, that.connection);
}
@Override
public int hashCode() {
return Objects.hash(connection);
}
}