diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java index 75b163e7..b3b0d9f3 100644 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java @@ -38,18 +38,30 @@ import java.util.stream.Collectors; @Slf4j class VertxMqttConnection implements MqttConnection { - private MqttEndpoint endpoint; + private final MqttEndpoint endpoint; private long keepAliveTimeoutMs; @Getter private long lastPingTime = System.currentTimeMillis(); private volatile boolean closed = false, accepted = false, autoAckSub = true, autoAckUnSub = true, autoAckMsg = true; - private EmitterProcessor messageProcessor = EmitterProcessor.create(false); + private final EmitterProcessor messageProcessor = EmitterProcessor.create(false); - private FluxSink publishingFluxSink = messageProcessor.sink(); + private final FluxSink publishingFluxSink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER); - private EmitterProcessor subscription = EmitterProcessor.create(false); - private EmitterProcessor unsubscription = EmitterProcessor.create(false); + private final EmitterProcessor subscription = EmitterProcessor.create(false); + private final EmitterProcessor unsubscription = EmitterProcessor.create(false); + + private static final MqttAuth emptyAuth = new MqttAuth() { + @Override + public String getUsername() { + return ""; + } + + @Override + public String getPassword() { + return ""; + } + }; public VertxMqttConnection(MqttEndpoint endpoint) { this.endpoint = endpoint; @@ -73,7 +85,7 @@ class VertxMqttConnection implements MqttConnection { @Override public Optional getAuth() { - return endpoint.auth() == null ? Optional.empty() : Optional.of(new VertxMqttAuth()); + return endpoint.auth() == null ? Optional.of(emptyAuth) : Optional.of(new VertxMqttAuth()); } @Override @@ -339,7 +351,7 @@ class VertxMqttConnection implements MqttConnection { @AllArgsConstructor class VertxMqttPublishing implements MqttPublishing { - private MqttPublishMessage message; + private final MqttPublishMessage message; private volatile boolean acknowledged; @@ -367,7 +379,7 @@ class VertxMqttConnection implements MqttConnection { @AllArgsConstructor class VertxMqttSubscription implements MqttSubscription { - private MqttSubscribeMessage message; + private final MqttSubscribeMessage message; private volatile boolean acknowledged; @@ -390,7 +402,7 @@ class VertxMqttConnection implements MqttConnection { @AllArgsConstructor class VertxMqttMqttUnSubscription implements MqttUnSubscription { - private MqttUnsubscribeMessage message; + private final MqttUnsubscribeMessage message; private volatile boolean acknowledged; diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java index 5390b3ab..c299f99d 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java @@ -30,12 +30,12 @@ public class DeviceMessageConnector MessageConnection, MessagePublisher { - private EmitterProcessor messageProcessor = EmitterProcessor.create(false); + private final EmitterProcessor messageProcessor = EmitterProcessor.create(false); - private FluxSink sink = messageProcessor.sink(); + private final FluxSink sink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER); //将设备注册中心到配置追加到消息header中,下游订阅者可直接使用. - private String[] appendConfigHeader = {"orgId", "productId", "deviceName"}; + private final String[] appendConfigHeader = {"orgId", "productId", "deviceName"}; //设备注册中心 private final DeviceRegistry registry;