From defd5a294f618a11da743420d68fbd92431aa977 Mon Sep 17 00:00:00 2001 From: zhou-hao Date: Mon, 24 Aug 2020 16:23:53 +0800 Subject: [PATCH] =?UTF-8?q?mqtt=20=E5=A2=9E=E5=8A=A0=E7=9F=AD=E9=93=BE?= =?UTF-8?q?=E6=8E=A5=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../device/MqttServerDeviceGateway.java | 109 ++++++++++++------ .../device/session/MqttConnectionSession.java | 45 +++++++- 2 files changed, 110 insertions(+), 44 deletions(-) diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java index c66785e0..35682579 100644 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java @@ -12,9 +12,7 @@ import org.jetlinks.core.device.AuthenticationResponse; import org.jetlinks.core.device.DeviceOperator; import org.jetlinks.core.device.DeviceRegistry; import org.jetlinks.core.device.MqttAuthenticationRequest; -import org.jetlinks.core.message.CommonDeviceMessage; -import org.jetlinks.core.message.DeviceMessage; -import org.jetlinks.core.message.Message; +import org.jetlinks.core.message.*; import org.jetlinks.core.message.codec.*; import org.jetlinks.core.server.session.DeviceSession; import org.jetlinks.core.server.session.DeviceSessionManager; @@ -24,6 +22,8 @@ import org.jetlinks.community.network.NetworkType; import org.jetlinks.community.network.mqtt.gateway.device.session.MqttConnectionSession; import org.jetlinks.community.network.mqtt.server.MqttConnection; import org.jetlinks.community.network.mqtt.server.MqttServer; +import org.jetlinks.core.server.session.KeepOnlineSession; +import org.jetlinks.core.server.session.ReplaceableDeviceSession; import org.jetlinks.supports.server.DecodedClientMessageHandler; import org.springframework.util.StringUtils; import reactor.core.Disposable; @@ -36,6 +36,7 @@ import reactor.util.function.Tuple3; import reactor.util.function.Tuples; import javax.annotation.Nonnull; +import java.time.Duration; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; @@ -74,7 +75,7 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate MqttServer mqttServer, DecodedClientMessageHandler messageHandler, Mono customProtocol - ) { + ) { this.gatewayMonitor = GatewayMonitors.getDeviceGatewayMonitor(id); this.id = id; this.registry = registry; @@ -89,7 +90,6 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate return counter.sum(); } - private void doStart() { if (started.getAndSet(true) || disposable != null) { return; @@ -103,7 +103,6 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate } return started.get(); }) - .publishOn(Schedulers.parallel()) .flatMap(this::handleConnection) .flatMap(tuple3 -> handleAuthResponse(tuple3.getT1(), tuple3.getT2(), tuple3.getT3())) @@ -116,7 +115,8 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate //处理连接,并进行认证 private Mono> handleConnection(MqttConnection connection) { - return Mono + + return Mono .justOrEmpty(connection.getAuth()) .flatMap(auth -> { MqttAuthenticationRequest request = new MqttAuthenticationRequest(connection.getClientId(), auth.getUsername(), auth.getPassword(), getTransport()); @@ -147,31 +147,35 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate } //处理认证结果 - private Mono> handleAuthResponse(DeviceOperator device, - AuthenticationResponse resp, - MqttConnection connection) { + private Mono> handleAuthResponse(DeviceOperator device, + AuthenticationResponse resp, + MqttConnection connection) { return Mono .fromCallable(() -> { String deviceId = device.getDeviceId(); if (resp.isSuccess()) { counter.increment(); - DeviceSession session = new MqttConnectionSession(deviceId, device, getTransport(), connection) { - @Override - public Mono send(EncodedMessage encodedMessage) { - return super.send(encodedMessage).doOnSuccess(s -> gatewayMonitor.sentMessage()); - } - }; - sessionManager.register(session); + DeviceSession session = sessionManager.getSession(deviceId); + MqttConnectionSession newSession = new MqttConnectionSession(deviceId, device, getTransport(), connection, gatewayMonitor); + if (null == session) { + sessionManager.register(newSession); + } else if (session instanceof ReplaceableDeviceSession) { + ((ReplaceableDeviceSession) session).replaceWith(newSession); + } gatewayMonitor.connected(); gatewayMonitor.totalConnection(counter.sum()); //监听断开连接 connection.onClose(conn -> { counter.decrement(); - sessionManager.unregister(deviceId); + DeviceSession _tmp = sessionManager.getSession(newSession.getId()); + + if (newSession == _tmp || _tmp == null) { + sessionManager.unregister(deviceId); + } gatewayMonitor.disconnected(); gatewayMonitor.totalConnection(counter.sum()); }); - return Tuples.of(connection.accept(), device, session); + return Tuples.of(connection.accept(), device, newSession); } else { connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD); gatewayMonitor.rejected(); @@ -187,7 +191,7 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate } //处理已经建立连接的MQTT连接 - private Mono handleAcceptedMqttConnection(MqttConnection connection, DeviceOperator operator, DeviceSession session) { + private Mono handleAcceptedMqttConnection(MqttConnection connection, DeviceOperator operator, MqttConnectionSession session) { return connection .handleMessage() @@ -214,7 +218,7 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate //解码消息并处理 private Mono decodeAndHandleMessage(DeviceOperator operator, - DeviceSession session, + MqttConnectionSession session, MqttMessage message, MqttConnection connection) { return operator @@ -229,34 +233,19 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate _msg.setDeviceId(operator.getDeviceId()); } } - if (messageProcessor.hasDownstreams()) { - sink.next(msg); - } String deviceId = msg.getDeviceId(); if (!StringUtils.isEmpty(deviceId)) { //返回了其他设备的消息,则自动创建会话 if (!deviceId.equals(operator.getDeviceId())) { DeviceSession anotherSession = sessionManager.getSession(msg.getDeviceId()); if (anotherSession == null) { - - connection.onClose(c -> sessionManager.unregister(deviceId)); - return registry .getDevice(msg.getDeviceId()) - .doOnNext(device -> sessionManager.register( - new MqttConnectionSession(msg.getDeviceId(), device, getTransport(), connection) { - @Override - public Mono send(EncodedMessage encodedMessage) { - return super.send(encodedMessage).doOnSuccess(s -> gatewayMonitor.sentMessage()); - } - })) - .then(messageHandler.handleMessage(operator, msg)) - ; + .flatMap(device -> handleMessage(device.getDeviceId(), device, msg, session)); } } } - //丢给默认的消息处理逻辑 - return messageHandler.handleMessage(operator, msg); + return handleMessage(deviceId, operator, msg, session); }) .then() .doOnEach(ReactiveLogger.onError(err -> log.error("处理MQTT连接[{}]消息失败:{}", operator.getDeviceId(), message, err))) @@ -264,6 +253,50 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate ; } + private Mono handleMessage(String deviceId, + DeviceOperator device, + DeviceMessage message, + MqttConnectionSession firstSession) { + DeviceSession managedSession = sessionManager.getSession(deviceId); + + //主动离线 + if (message instanceof DeviceOfflineMessage) { + sessionManager.unregister(deviceId); + return Mono.empty(); + } + + //session 不存在,可能是同一个mqtt返回多个设备消息 + if (managedSession == null) { + firstSession = new MqttConnectionSession(deviceId, device, getTransport(), firstSession.getConnection(), gatewayMonitor); + sessionManager.register(managedSession = firstSession); + } + + //保持会话,在低功率设备上,可能无法保持mqtt长连接. + if (message.getHeader(Headers.keepOnline).orElse(false)) { + if (!managedSession.isWrapFrom(KeepOnlineSession.class)) { + int timeout = message.getHeaderOrDefault(Headers.keepOnlineTimeoutSeconds); + KeepOnlineSession keepOnlineSession = new KeepOnlineSession(firstSession, Duration.ofSeconds(timeout)); + //替换会话 + managedSession = sessionManager.replace(firstSession, keepOnlineSession); + } + } else { + managedSession = firstSession; + } + + managedSession.keepAlive(); + + if (messageProcessor.hasDownstreams()) { + sink.next(message); + } + if (message instanceof DeviceOnlineMessage) { + return Mono.empty(); + } + + return messageHandler + .handleMessage(device, message) + .then(); + } + @Override public Transport getTransport() { return DefaultTransport.MQTT; diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/session/MqttConnectionSession.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/session/MqttConnectionSession.java index b9c02656..dea3a7c1 100644 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/session/MqttConnectionSession.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/session/MqttConnectionSession.java @@ -1,40 +1,51 @@ package org.jetlinks.community.network.mqtt.gateway.device.session; import lombok.Getter; +import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor; import org.jetlinks.core.device.DeviceOperator; import org.jetlinks.core.message.codec.EncodedMessage; import org.jetlinks.core.message.codec.MqttMessage; import org.jetlinks.core.message.codec.Transport; import org.jetlinks.core.server.session.DeviceSession; import org.jetlinks.community.network.mqtt.server.MqttConnection; +import org.jetlinks.core.server.session.ReplaceableDeviceSession; import reactor.core.publisher.Mono; import java.net.InetSocketAddress; import java.time.Duration; +import java.util.Objects; import java.util.Optional; -public class MqttConnectionSession implements DeviceSession { +public class MqttConnectionSession implements DeviceSession, ReplaceableDeviceSession { @Getter - private String id; + private final String id; @Getter - private DeviceOperator operator; + private final DeviceOperator operator; @Getter - private Transport transport; + private final Transport transport; @Getter private MqttConnection connection; - public MqttConnectionSession(String id, DeviceOperator operator, Transport transport, MqttConnection connection) { + private final DeviceGatewayMonitor monitor; + + private final long connectTime = System.currentTimeMillis(); + + public MqttConnectionSession(String id, + DeviceOperator operator, + Transport transport, + MqttConnection connection, + DeviceGatewayMonitor monitor) { this.id = id; this.operator = operator; this.transport = transport; this.connection = connection; + this.monitor = monitor; } - private long connectTime = System.currentTimeMillis(); @Override public String getDeviceId() { @@ -54,6 +65,7 @@ public class MqttConnectionSession implements DeviceSession { @Override public Mono send(EncodedMessage encodedMessage) { return Mono.defer(() -> connection.publish(((MqttMessage) encodedMessage))) + .doOnSuccess(nil -> monitor.sentMessage()) .thenReturn(true); } @@ -86,4 +98,25 @@ public class MqttConnectionSession implements DeviceSession { public Optional getClientAddress() { return Optional.ofNullable(connection.getClientAddress()); } + + @Override + public void replaceWith(DeviceSession session) { + if (session instanceof MqttConnectionSession) { + MqttConnectionSession connectionSession = ((MqttConnectionSession) session); + this.connection = connectionSession.connection; + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + MqttConnectionSession that = (MqttConnectionSession) o; + return Objects.equals(connection, that.connection); + } + + @Override + public int hashCode() { + return Objects.hash(connection); + } }