From a03b86cb295a86676ae0483bdd2f606d1000ea4a Mon Sep 17 00:00:00 2001 From: zhou-hao Date: Fri, 31 Jul 2020 17:14:23 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96buffer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../server/vertx/VertxMqttConnection.java | 30 +++++++++++++------ .../message/DeviceMessageConnector.java | 6 ++-- 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java index 75b163e7..b3b0d9f3 100644 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java @@ -38,18 +38,30 @@ import java.util.stream.Collectors; @Slf4j class VertxMqttConnection implements MqttConnection { - private MqttEndpoint endpoint; + private final MqttEndpoint endpoint; private long keepAliveTimeoutMs; @Getter private long lastPingTime = System.currentTimeMillis(); private volatile boolean closed = false, accepted = false, autoAckSub = true, autoAckUnSub = true, autoAckMsg = true; - private EmitterProcessor messageProcessor = EmitterProcessor.create(false); + private final EmitterProcessor messageProcessor = EmitterProcessor.create(false); - private FluxSink publishingFluxSink = messageProcessor.sink(); + private final FluxSink publishingFluxSink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER); - private EmitterProcessor subscription = EmitterProcessor.create(false); - private EmitterProcessor unsubscription = EmitterProcessor.create(false); + private final EmitterProcessor subscription = EmitterProcessor.create(false); + private final EmitterProcessor unsubscription = EmitterProcessor.create(false); + + private static final MqttAuth emptyAuth = new MqttAuth() { + @Override + public String getUsername() { + return ""; + } + + @Override + public String getPassword() { + return ""; + } + }; public VertxMqttConnection(MqttEndpoint endpoint) { this.endpoint = endpoint; @@ -73,7 +85,7 @@ class VertxMqttConnection implements MqttConnection { @Override public Optional getAuth() { - return endpoint.auth() == null ? Optional.empty() : Optional.of(new VertxMqttAuth()); + return endpoint.auth() == null ? Optional.of(emptyAuth) : Optional.of(new VertxMqttAuth()); } @Override @@ -339,7 +351,7 @@ class VertxMqttConnection implements MqttConnection { @AllArgsConstructor class VertxMqttPublishing implements MqttPublishing { - private MqttPublishMessage message; + private final MqttPublishMessage message; private volatile boolean acknowledged; @@ -367,7 +379,7 @@ class VertxMqttConnection implements MqttConnection { @AllArgsConstructor class VertxMqttSubscription implements MqttSubscription { - private MqttSubscribeMessage message; + private final MqttSubscribeMessage message; private volatile boolean acknowledged; @@ -390,7 +402,7 @@ class VertxMqttConnection implements MqttConnection { @AllArgsConstructor class VertxMqttMqttUnSubscription implements MqttUnSubscription { - private MqttUnsubscribeMessage message; + private final MqttUnsubscribeMessage message; private volatile boolean acknowledged; diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java index 5390b3ab..c299f99d 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java @@ -30,12 +30,12 @@ public class DeviceMessageConnector MessageConnection, MessagePublisher { - private EmitterProcessor messageProcessor = EmitterProcessor.create(false); + private final EmitterProcessor messageProcessor = EmitterProcessor.create(false); - private FluxSink sink = messageProcessor.sink(); + private final FluxSink sink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER); //将设备注册中心到配置追加到消息header中,下游订阅者可直接使用. - private String[] appendConfigHeader = {"orgId", "productId", "deviceName"}; + private final String[] appendConfigHeader = {"orgId", "productId", "deviceName"}; //设备注册中心 private final DeviceRegistry registry;