From bc985539e22f98f81aef3e9bebb2e47cfe424d98 Mon Sep 17 00:00:00 2001 From: zhouhao Date: Mon, 27 Jun 2022 15:40:37 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96MQTT?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../device/MqttServerDeviceGateway.java | 55 +++--- .../network/mqtt/server/MqttPublishing.java | 3 +- .../server/vertx/VertxMqttConnection.java | 174 +++++++++++------- 3 files changed, 136 insertions(+), 96 deletions(-) diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java index ef79ed22..a0a5c24c 100644 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java @@ -182,44 +182,45 @@ class MqttServerDeviceGateway extends AbstractDeviceGateway implements MonitorSu String deviceId = device.getDeviceId(); //认证通过 if (resp.isSuccess()) { + //监听断开连接 + connection.onClose(conn -> { + counter.decrement(); + //监控信息 + monitor.disconnected(); + monitor.totalConnection(counter.sum()); + + sessionManager + .getSession(deviceId) + .flatMap(_tmp -> { + //只有与创建的会话相同才移除(下线),因为有可能设置了keepOnline, + //或者设备通过其他方式注册了会话,这里断开连接不能影响到以上情况. + if (_tmp != null && _tmp.isWrapFrom(MqttConnectionSession.class) && !(_tmp instanceof KeepOnlineSession)) { + MqttConnectionSession connectionSession = _tmp.unwrap(MqttConnectionSession.class); + if (connectionSession.getConnection() == conn) { + return sessionManager.remove(deviceId, true); + } + } + return Mono.empty(); + }) + .subscribe(); + }); + counter.increment(); return sessionManager .compute(deviceId, old -> { MqttConnectionSession newSession = new MqttConnectionSession(deviceId, device, getTransport(), connection, monitor); return old - .doOnNext(session -> { - if (session instanceof ReplaceableDeviceSession) { - //如果是可替换的会话,则替换为新的会话 - //通常是设置了keepOnline或者之前的会话还没有来得及移除时,直接更新为新的会话. - ((ReplaceableDeviceSession) session).replaceWith(newSession); + .map(session -> { + if (session instanceof KeepOnlineSession) { + //KeepOnlineSession 则依旧保持keepOnline + return new KeepOnlineSession(newSession, session.getKeepAliveTimeout()); } + return newSession; }) .defaultIfEmpty(newSession); }) .flatMap(session -> Mono.fromCallable(() -> { try { - //监听断开连接 - connection.onClose(conn -> { - counter.decrement(); - //监控信息 - monitor.disconnected(); - monitor.totalConnection(counter.sum()); - - sessionManager - .getSession(session.getDeviceId()) - .flatMap(_tmp -> { - //只有与创建的会话相同才移除(下线),因为有可能设置了keepOnline, - //或者设备通过其他方式注册了会话,这里断开连接不能影响到以上情况. - if (_tmp != null && _tmp.isWrapFrom(MqttConnectionSession.class) && !(_tmp instanceof KeepOnlineSession)) { - MqttConnectionSession connectionSession = _tmp.unwrap(MqttConnectionSession.class); - if (connectionSession.getConnection() == conn) { - return sessionManager.remove(deviceId, true); - } - } - return Mono.empty(); - }) - .subscribe(); - }); return Tuples.of(connection.accept(), device, session.unwrap(MqttConnectionSession.class)); } catch (IllegalStateException ignore) { //忽略错误,偶尔可能会出现网络异常,导致accept时,连接已经中断.还有其他更好的处理方式? diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/MqttPublishing.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/MqttPublishing.java index ecea2f9b..d31f3d65 100644 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/MqttPublishing.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/MqttPublishing.java @@ -1,8 +1,9 @@ package org.jetlinks.community.network.mqtt.server; import org.jetlinks.core.message.codec.MqttMessage; +import org.jetlinks.core.server.mqtt.MqttPublishingMessage; -public interface MqttPublishing { +public interface MqttPublishing extends MqttPublishingMessage { MqttMessage getMessage(); diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java index b018aede..e5742530 100644 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java @@ -2,6 +2,7 @@ package org.jetlinks.community.network.mqtt.server.vertx; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.mqtt.MqttConnectReturnCode; import io.netty.handler.codec.mqtt.MqttQoS; import io.vertx.core.buffer.Buffer; @@ -18,17 +19,16 @@ import org.jetlinks.community.network.mqtt.server.MqttConnection; import org.jetlinks.community.network.mqtt.server.MqttPublishing; import org.jetlinks.community.network.mqtt.server.MqttSubscription; import org.jetlinks.community.network.mqtt.server.MqttUnSubscription; +import org.jetlinks.core.message.codec.EncodedMessage; import org.jetlinks.core.message.codec.MqttMessage; import org.jetlinks.core.message.codec.SimpleMqttMessage; import org.jetlinks.core.server.mqtt.MqttAuth; -import reactor.core.publisher.EmitterProcessor; -import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; -import reactor.core.publisher.Mono; +import reactor.core.publisher.*; import javax.annotation.Nonnull; import java.net.InetSocketAddress; import java.time.Duration; +import java.util.Objects; import java.util.Optional; import java.util.function.Consumer; import java.util.function.Function; @@ -42,14 +42,6 @@ class VertxMqttConnection implements MqttConnection { @Getter private long lastPingTime = System.currentTimeMillis(); private volatile boolean closed = false, accepted = false, autoAckSub = true, autoAckUnSub = true, autoAckMsg = true; - - private final EmitterProcessor messageProcessor = EmitterProcessor.create(false); - - private final FluxSink publishingFluxSink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER); - - private final EmitterProcessor subscription = EmitterProcessor.create(false); - private final EmitterProcessor unsubscription = EmitterProcessor.create(false); - private static final MqttAuth emptyAuth = new MqttAuth() { @Override public String getUsername() { @@ -61,6 +53,20 @@ class VertxMqttConnection implements MqttConnection { return ""; } }; + private final Sinks.Many messageProcessor = Sinks + .many() + .multicast() + .onBackpressureBuffer(Integer.MAX_VALUE); + + private final Sinks.Many subscription = Sinks + .many() + .multicast() + .onBackpressureBuffer(Integer.MAX_VALUE); + private final Sinks.Many unsubscription = Sinks + .many() + .multicast() + .onBackpressureBuffer(Integer.MAX_VALUE); + public VertxMqttConnection(MqttEndpoint endpoint) { this.endpoint = endpoint; @@ -68,10 +74,10 @@ class VertxMqttConnection implements MqttConnection { } private final Consumer defaultListener = mqttConnection -> { - log.debug("mqtt client [{}] disconnected", getClientId()); - subscription.onComplete(); - unsubscription.onComplete(); - messageProcessor.onComplete(); + VertxMqttConnection.log.debug("mqtt client [{}] disconnected", getClientId()); + subscription.tryEmitComplete(); + unsubscription.tryEmitComplete(); + messageProcessor.tryEmitComplete(); }; @@ -92,20 +98,28 @@ class VertxMqttConnection implements MqttConnection { if (closed) { return; } - endpoint.reject(code); - complete(); + try { + endpoint.reject(code); + } catch (Throwable ignore) { + } + try { + complete(); + } catch (Throwable ignore) { + + } } @Override public Optional getWillMessage() { return Optional.ofNullable(endpoint.will()) - .filter(will -> will.getWillMessageBytes() != null) - .map(will -> SimpleMqttMessage.builder() - .will(true) - .payload(Unpooled.wrappedBuffer(will.getWillMessageBytes())) - .topic(will.getWillTopic()) - .qosLevel(will.getWillQos()) - .build()); + .filter(will -> will.getWillMessageBytes() != null) + .map(will -> SimpleMqttMessage + .builder() + .will(true) + .payload(Unpooled.wrappedBuffer(will.getWillMessageBytes())) + .topic(will.getWillTopic()) + .qosLevel(will.getWillQos()) + .build()); } @Override @@ -141,6 +155,15 @@ class VertxMqttConnection implements MqttConnection { this.endpoint .disconnectHandler(ignore -> this.complete()) .closeHandler(ignore -> this.complete()) + .exceptionHandler(error -> { + if (error instanceof DecoderException) { + if (error.getMessage().contains("too large message")) { + log.error("MQTT消息过大,请在网络组件中设置[最大消息长度].", error); + return; + } + } + log.error(error.getMessage(), error); + }) .pingHandler(ignore -> { this.ping(); if (!endpoint.isAutoKeepAlive()) { @@ -150,12 +173,12 @@ class VertxMqttConnection implements MqttConnection { .publishHandler(msg -> { ping(); VertxMqttPublishing publishing = new VertxMqttPublishing(msg, false); - boolean hasDownstream = this.messageProcessor.hasDownstreams(); - if (autoAckMsg || !hasDownstream) { + boolean hasDownstream = this.messageProcessor.currentSubscriberCount() > 0; + if (autoAckMsg && hasDownstream) { publishing.acknowledge(); } if (hasDownstream) { - this.publishingFluxSink.next(publishing); + this.messageProcessor.tryEmitNext(publishing); } }) //QoS 1 PUBACK @@ -183,23 +206,23 @@ class VertxMqttConnection implements MqttConnection { .subscribeHandler(msg -> { ping(); VertxMqttSubscription subscription = new VertxMqttSubscription(msg, false); - boolean hasDownstream = this.subscription.hasDownstreams(); + boolean hasDownstream = this.subscription.currentSubscriberCount() > 0; if (autoAckSub || !hasDownstream) { subscription.acknowledge(); } if (hasDownstream) { - this.subscription.onNext(subscription); + this.subscription.tryEmitNext(subscription); } }) .unsubscribeHandler(msg -> { ping(); VertxMqttMqttUnSubscription unSubscription = new VertxMqttMqttUnSubscription(msg, false); - boolean hasDownstream = this.unsubscription.hasDownstreams(); + boolean hasDownstream = this.unsubscription.currentSubscriberCount() > 0; if (autoAckUnSub || !hasDownstream) { unSubscription.acknowledge(); } if (hasDownstream) { - this.unsubscription.onNext(unSubscription); + this.unsubscription.tryEmitNext(unSubscription); } }); } @@ -220,7 +243,7 @@ class VertxMqttConnection implements MqttConnection { clientAddress = new InetSocketAddress(address.host(), address.port()); } } - }catch (Throwable ignore){ + } catch (Throwable ignore) { } return clientAddress; @@ -233,11 +256,7 @@ class VertxMqttConnection implements MqttConnection { @Override public Flux handleMessage() { - if (messageProcessor.isCancelled()) { - return Flux.empty(); - } - return messageProcessor - .map(Function.identity()); + return messageProcessor.asFlux(); } @Override @@ -247,17 +266,17 @@ class VertxMqttConnection implements MqttConnection { .create(sink -> { Buffer buffer = Buffer.buffer(message.getPayload()); endpoint.publish(message.getTopic(), - buffer, - MqttQoS.valueOf(message.getQosLevel()), - message.isDup(), - message.isRetain(), - result -> { - if (result.succeeded()) { - sink.success(); - } else { - sink.error(result.cause()); - } - } + buffer, + MqttQoS.valueOf(message.getQosLevel()), + message.isDup(), + message.isRetain(), + result -> { + if (result.succeeded()) { + sink.success(); + } else { + sink.error(result.cause()); + } + } ); }); } @@ -266,13 +285,13 @@ class VertxMqttConnection implements MqttConnection { public Flux handleSubscribe(boolean autoAck) { autoAckSub = autoAck; - return subscription.map(Function.identity()); + return subscription.asFlux(); } @Override public Flux handleUnSubscribe(boolean autoAck) { autoAckUnSub = autoAck; - return unsubscription.map(Function.identity()); + return unsubscription.asFlux(); } @Override @@ -282,11 +301,19 @@ class VertxMqttConnection implements MqttConnection { @Override public Mono close() { + if (closed) { + return Mono.empty(); + } return Mono.fromRunnable(() -> { - if (endpoint.isConnected()) { - endpoint.close(); + try { + if (endpoint.isConnected()) { + endpoint.close(); + } else { + complete(); + } + } catch (Throwable ignore) { } - }).doFinally(s -> this.complete()); + }); } @@ -296,12 +323,15 @@ class VertxMqttConnection implements MqttConnection { } closed = true; disconnectConsumer.accept(this); - disconnectConsumer = defaultListener; } + @AllArgsConstructor - class VertxMqttMessage implements MqttMessage { - MqttPublishMessage message; + class VertxMqttPublishing implements MqttPublishing { + + private final MqttPublishMessage message; + + private volatile boolean acknowledged; @Nonnull @Override @@ -349,18 +379,10 @@ class VertxMqttConnection implements MqttConnection { public String toString() { return print(); } - } - - @AllArgsConstructor - class VertxMqttPublishing implements MqttPublishing { - - private final MqttPublishMessage message; - - private volatile boolean acknowledged; @Override public MqttMessage getMessage() { - return new VertxMqttMessage(message); + return this; } @Override @@ -397,8 +419,11 @@ class VertxMqttConnection implements MqttConnection { return; } acknowledged = true; - endpoint.subscribeAcknowledge(message.messageId(), message.topicSubscriptions().stream() - .map(MqttTopicSubscription::qualityOfService).collect(Collectors.toList())); + endpoint.subscribeAcknowledge(message.messageId(), message + .topicSubscriptions() + .stream() + .map(MqttTopicSubscription::qualityOfService) + .collect(Collectors.toList())); } } @@ -438,4 +463,17 @@ class VertxMqttConnection implements MqttConnection { return endpoint.auth().getPassword(); } } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + VertxMqttConnection that = (VertxMqttConnection) o; + return Objects.equals(endpoint, that.endpoint); + } + + @Override + public int hashCode() { + return Objects.hash(endpoint); + } }