diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/LocalMessageConnector.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/LocalMessageConnector.java index ae31a4a4..de6799bd 100644 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/LocalMessageConnector.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/LocalMessageConnector.java @@ -4,6 +4,7 @@ import org.jetlinks.community.gateway.MessageConnection; import org.jetlinks.community.gateway.MessageConnector; import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; import javax.annotation.Nonnull; import java.util.function.Function; @@ -26,11 +27,13 @@ class LocalMessageConnector implements MessageConnector { return "本地连接器"; } - EmitterProcessor processor = EmitterProcessor.create(false); + private final EmitterProcessor processor = EmitterProcessor.create(false); + + private final FluxSink sink = processor.sink(FluxSink.OverflowStrategy.BUFFER); public LocalMessageConnection addConnection(String id, boolean shareCluster) { - LocalMessageConnection connection = new LocalMessageConnection(id,shareCluster); - processor.onNext(connection); + LocalMessageConnection connection = new LocalMessageConnection(id, shareCluster); + sink.next(connection); return connection; } diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/VertxMqttClient.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/VertxMqttClient.java index c628277d..1de44b1c 100644 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/VertxMqttClient.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/VertxMqttClient.java @@ -26,24 +26,21 @@ public class VertxMqttClient implements MqttClient { @Getter private io.vertx.mqtt.MqttClient client; - private FluxProcessor messageProcessor; + private final FluxProcessor messageProcessor = EmitterProcessor.create(false); - private FluxSink sink; + private final FluxSink sink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER); - private Map topicsSubscribeCounter = new ConcurrentHashMap<>(); + private final Map topicsSubscribeCounter = new ConcurrentHashMap<>(); private boolean neverSubscribe = true; - private String id; + private final String id; @Getter - private AtomicInteger reloadCounter = new AtomicInteger(); - + private final AtomicInteger reloadCounter = new AtomicInteger(); public VertxMqttClient(String id) { this.id = id; - this.messageProcessor = EmitterProcessor.create(false); - sink = this.messageProcessor.sink(); } public void setClient(io.vertx.mqtt.MqttClient client) { diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttClientDeviceGateway.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttClientDeviceGateway.java index aad55918..32ec09d3 100644 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttClientDeviceGateway.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttClientDeviceGateway.java @@ -32,6 +32,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; +import javax.annotation.Nonnull; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.List; @@ -45,32 +46,31 @@ import java.util.function.Function; public class MqttClientDeviceGateway implements DeviceGateway { @Getter - private String id; + private final String id; - private MqttClient mqttClient; + private final MqttClient mqttClient; - private DeviceRegistry registry; + private final DeviceRegistry registry; - private List topics; + private final List topics; - private String protocol; + private final String protocol; - private ProtocolSupports protocolSupport; + private final ProtocolSupports protocolSupport; - private DecodedClientMessageHandler clientMessageHandler; + private final DecodedClientMessageHandler clientMessageHandler; + private final EmitterProcessor messageProcessor = EmitterProcessor.create(false); - private EmitterProcessor messageProcessor = EmitterProcessor.create(false); + private final FluxSink sink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER); - private FluxSink sink = messageProcessor.sink(); + private final AtomicBoolean started = new AtomicBoolean(); - private AtomicBoolean started = new AtomicBoolean(); + private final List disposable = new CopyOnWriteArrayList<>(); - private List disposable = new CopyOnWriteArrayList<>(); + private final DeviceGatewayMonitor gatewayMonitor; - private DeviceSessionManager sessionManager; - - private DeviceGatewayMonitor gatewayMonitor; + private final DeviceSessionManager sessionManager; public MqttClientDeviceGateway(String id, MqttClient mqttClient, @@ -87,8 +87,8 @@ public class MqttClientDeviceGateway implements DeviceGateway { this.registry = Objects.requireNonNull(registry, "registry"); this.protocolSupport = Objects.requireNonNull(protocolSupport, "protocolSupport"); this.protocol = Objects.requireNonNull(protocol, "protocol"); - this.sessionManager = Objects.requireNonNull(sessionManager, "sessionManager"); this.clientMessageHandler = Objects.requireNonNull(clientMessageHandler, "clientMessageHandler"); + this.sessionManager = Objects.requireNonNull(sessionManager, "sessionManager"); this.topics = Objects.requireNonNull(topics, "topics"); } @@ -109,6 +109,7 @@ public class MqttClientDeviceGateway implements DeviceGateway { return getProtocol() .flatMap(codec -> codec.getMessageCodec(getTransport())) .flatMapMany(codec -> codec.decode(new FromDeviceMessageContext() { + @Nonnull @Override public EncodedMessage getMessage() { return mqttMessage; diff --git a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/PipePayloadParser.java b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/PipePayloadParser.java index a6c67c95..5858e240 100644 --- a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/PipePayloadParser.java +++ b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/parser/strateies/PipePayloadParser.java @@ -28,13 +28,13 @@ import java.util.function.Function; @Slf4j public class PipePayloadParser implements PayloadParser { - private EmitterProcessor processor = EmitterProcessor.create(true); + private final EmitterProcessor processor = EmitterProcessor.create(true); - private FluxSink sink = processor.sink(); + private final FluxSink sink = processor.sink(FluxSink.OverflowStrategy.BUFFER); - private List> pipe = new CopyOnWriteArrayList<>(); + private final List> pipe = new CopyOnWriteArrayList<>(); - private List result = new CopyOnWriteArrayList<>(); + private final List result = new CopyOnWriteArrayList<>(); private volatile RecordParser recordParser; @@ -42,7 +42,7 @@ public class PipePayloadParser implements PayloadParser { private Consumer firstInit; - private AtomicInteger currentPipe = new AtomicInteger(); + private final AtomicInteger currentPipe = new AtomicInteger(); public PipePayloadParser result(String buffer) { return result(Buffer.buffer(buffer)); @@ -68,6 +68,10 @@ public class PipePayloadParser implements PayloadParser { } public PipePayloadParser fixed(int size) { + if (size == 0) { + complete(); + return this; + } if (recordParser == null) { setParser(RecordParser.newFixed(size)); firstInit = (parser -> parser.fixedSizeMode(size)); diff --git a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/AbstractTcpServer.java b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/AbstractTcpServer.java deleted file mode 100644 index cfba88d4..00000000 --- a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/AbstractTcpServer.java +++ /dev/null @@ -1,31 +0,0 @@ -package org.jetlinks.community.network.tcp.server; - -import org.jetlinks.community.network.tcp.client.TcpClient; -import reactor.core.publisher.EmitterProcessor; -import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; - -import java.util.function.Function; - -/** - * @author bsetfeng - * @since 1.0 - **/ -public abstract class AbstractTcpServer implements TcpServer{ - - private EmitterProcessor processor = EmitterProcessor.create(false); - - FluxSink sink=processor.sink(); - - protected void received(TcpClient tcpClient) { - // if (processor.hasDownstreams()) { - sink.next(tcpClient); - // } - } - - @Override - public Flux handleConnection() { - return processor - .map(Function.identity()); - } -} diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/DeviceAlarmHistoryService.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/DeviceAlarmHistoryService.java index 8baec436..479ad7b0 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/DeviceAlarmHistoryService.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/service/DeviceAlarmHistoryService.java @@ -20,7 +20,7 @@ public class DeviceAlarmHistoryService extends GenericReactiveCrudService processor = EmitterProcessor.create(false); - FluxSink sink = processor.sink(); + FluxSink sink = processor.sink(FluxSink.OverflowStrategy.BUFFER); @PostConstruct public void init() {