From 926c97a996daad0a290d379ae53a77799387badb Mon Sep 17 00:00:00 2001 From: zhou-hao Date: Wed, 2 Sep 2020 11:36:11 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96mqtt=20client?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../network/mqtt/client/MqttClient.java | 6 +- .../mqtt/client/MqttClientProvider.java | 39 ++-- .../network/mqtt/client/VertxMqttClient.java | 198 +++++++++++------- 3 files changed, 150 insertions(+), 93 deletions(-) diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/MqttClient.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/MqttClient.java index 4c0f10aa..9c8d97d6 100644 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/MqttClient.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/MqttClient.java @@ -9,7 +9,11 @@ import java.util.List; public interface MqttClient extends Network { - Flux subscribe(List topics); + default Flux subscribe(List topics){ + return subscribe(topics,0); + } + + Flux subscribe(List topics,int qos); Mono publish(MqttMessage message); diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/MqttClientProvider.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/MqttClientProvider.java index 7d56cee6..30a2b870 100644 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/MqttClientProvider.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/MqttClientProvider.java @@ -1,12 +1,17 @@ package org.jetlinks.community.network.mqtt.client; +import com.alibaba.fastjson.JSONObject; import io.vertx.core.Vertx; import io.vertx.mqtt.MqttClient; import io.vertx.mqtt.MqttClientOptions; import lombok.extern.slf4j.Slf4j; import org.hswebframework.web.bean.FastBeanCopier; -import org.jetlinks.community.network.*; import org.jetlinks.core.metadata.ConfigMetadata; +import org.jetlinks.core.metadata.DefaultConfigMetadata; +import org.jetlinks.core.metadata.types.BooleanType; +import org.jetlinks.core.metadata.types.IntType; +import org.jetlinks.core.metadata.types.StringType; +import org.jetlinks.community.network.*; import org.jetlinks.community.network.security.CertificateManager; import org.jetlinks.community.network.security.VertxKeyCertTrustOptions; import org.springframework.stereotype.Component; @@ -44,19 +49,23 @@ public class MqttClientProvider implements NetworkProvider @Override public void reload(@Nonnull Network network, @Nonnull MqttClientProperties properties) { - VertxMqttClient mqttClient = ((VertxMqttClient) network); - mqttClient.shutdown(); - + VertxMqttClient mqttClient = ((VertxMqttClient) network); + if (mqttClient.isLoading()) { + return; + } initMqttClient(mqttClient, properties); } public void initMqttClient(VertxMqttClient mqttClient, MqttClientProperties properties) { + mqttClient.setLoading(true); MqttClient client = MqttClient.create(vertx, properties.getOptions()); + mqttClient.setClient(client); client.connect(properties.getPort(), properties.getHost(), result -> { + mqttClient.setLoading(false); if (!result.succeeded()) { log.warn("connect mqtt [{}] error", properties.getId(), result.cause()); } else { - mqttClient.setClient(client); + log.debug("connect mqtt [{}] success", properties.getId()); } }); } @@ -64,8 +73,12 @@ public class MqttClientProvider implements NetworkProvider @Nullable @Override public ConfigMetadata getConfigMetadata() { - // TODO: 2019/12/19 - return null; + return new DefaultConfigMetadata() + .add("id", "id", "", new StringType()) + .add("instance", "服务实例数量(线程数)", "", new IntType()) + .add("certId", "证书id", "", new StringType()) + .add("ssl", "是否开启ssl", "", new BooleanType()) + .add("options.port", "MQTT服务设置", "", new IntType()); } @Nonnull @@ -74,12 +87,12 @@ public class MqttClientProvider implements NetworkProvider return Mono.defer(() -> { MqttClientProperties config = FastBeanCopier.copy(properties.getConfigurations(), new MqttClientProperties()); config.setId(properties.getId()); - if (config.getOptions() == null) { - config.setOptions(new MqttClientOptions()); - config.getOptions().setClientId(config.getClientId()); - config.getOptions().setPassword(config.getPassword()); - config.getOptions().setUsername(config.getUsername()); - } + config.setOptions(new JSONObject(properties.getConfigurations()).toJavaObject(MqttClientOptions.class)); + + config.getOptions().setClientId(config.getClientId()); + config.getOptions().setPassword(config.getPassword()); + config.getOptions().setUsername(config.getUsername()); + if (config.isSsl()) { config.getOptions().setSsl(true); return certificateManager.getCertificate(config.getCertId()) diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/VertxMqttClient.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/VertxMqttClient.java index 89939c60..f02bc888 100644 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/VertxMqttClient.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/VertxMqttClient.java @@ -6,19 +6,19 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.jetlinks.core.message.codec.MqttMessage; import org.jetlinks.core.message.codec.SimpleMqttMessage; +import org.jetlinks.core.topic.Topic; import org.jetlinks.community.network.DefaultNetworkType; import org.jetlinks.community.network.NetworkType; -import org.jetlinks.core.utils.TopicUtils; -import reactor.core.publisher.*; +import reactor.core.Disposable; +import reactor.core.Disposables; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; -import java.io.IOException; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; -import java.util.stream.Collectors; +import java.util.concurrent.CopyOnWriteArrayList; @Slf4j public class VertxMqttClient implements MqttClient { @@ -26,101 +26,126 @@ public class VertxMqttClient implements MqttClient { @Getter private io.vertx.mqtt.MqttClient client; - private final FluxProcessor messageProcessor = EmitterProcessor.create(false); - - private final FluxSink sink = messageProcessor.sink(FluxSink.OverflowStrategy.BUFFER); - - private final Map topicsSubscribeCounter = new ConcurrentHashMap<>(); - - private boolean neverSubscribe = true; + private final Topic, Integer>> subscriber = Topic.createRoot(); private final String id; - @Getter - private final AtomicInteger reloadCounter = new AtomicInteger(); + private volatile boolean loading; + + private final List loadSuccessListener = new CopyOnWriteArrayList<>(); + + public void setLoading(boolean loading) { + this.loading = loading; + if (!loading) { + loadSuccessListener.forEach(Runnable::run); + loadSuccessListener.clear(); + } + } + + public boolean isLoading() { + return loading; + } + public VertxMqttClient(String id) { this.id = id; } public void setClient(io.vertx.mqtt.MqttClient client) { + if (this.client != null && this.client != client) { + this.client.disconnect(); + } this.client = client; - if (isAlive()) { - reloadCounter.set(0); - client.publishHandler(msg -> { - //从未订阅,可能消息是还没来得及 - //或者已经有了下游消费者 - if (neverSubscribe || messageProcessor.hasDownstreams()) { - sink.next(SimpleMqttMessage - .builder() - .topic(msg.topicName()) - .clientId(client.clientId()) - .qosLevel(msg.qosLevel().value()) - .retain(msg.isRetain()) - .dup(msg.isDup()) - .payload(msg.payload().getByteBuf()) - .messageId(msg.messageId()) - .build()); - } + client + .closeHandler(nil -> log.debug("mqtt client [{}] closed", id)) + .publishHandler(msg -> { + MqttMessage mqttMessage = SimpleMqttMessage + .builder() + .messageId(msg.messageId()) + .topic(msg.topicName()) + .payload(msg.payload().getByteBuf()) + .dup(msg.isDup()) + .retain(msg.isRetain()) + .qosLevel(msg.qosLevel().value()) + .build(); + log.debug("handle mqtt message \n{}", mqttMessage); + subscriber + .findTopic(msg.topicName().replace("#","**").replace("+","*")) + .flatMapIterable(Topic::getSubscribers) + .subscribe(sink -> { + try { + sink.getT1().next(mqttMessage); + } catch (Exception e) { + log.error("handle mqtt message error", e); + } + }); }); - if (!topicsSubscribeCounter.isEmpty()) { - Map reSubscribe = topicsSubscribeCounter - .entrySet() - .stream() - .filter(e -> e.getValue().get() > 0) - .map(Map.Entry::getKey) - .collect(Collectors.toMap(Function.identity(), (r) -> 0)); - if (!reSubscribe.isEmpty()) { - log.info("re subscribe [{}] topic {}", client.clientId(), reSubscribe.keySet()); - client.subscribe(reSubscribe); - } - } + if (isAlive()) { + reSubscribe(); + } else if (loading) { + loadSuccessListener.add(this::reSubscribe); } } - private AtomicInteger getTopicCounter(String topic) { - return topicsSubscribeCounter.computeIfAbsent(topic, (ignore) -> new AtomicInteger()); + private void reSubscribe() { + subscriber + .findTopic("/**") + .filter(topic -> topic.getSubscribers().size() > 0) + .collectMap(topic -> convertMqttTopic(topic.getTopic()), topic -> topic.getSubscribers().iterator().next().getT2()) + .subscribe(topics -> { + log.debug("subscribe mqtt topic {}", topics); + client.subscribe(topics); + }); } + private String convertMqttTopic(String topic) { + return topic.replace("**", "#").replace("*", "+"); + } @Override - public Flux subscribe(List topics) { - neverSubscribe = false; - AtomicBoolean canceled = new AtomicBoolean(); - return Flux.defer(() -> { - Map subscribeTopic = topics.stream() - .filter(r -> getTopicCounter(r).getAndIncrement() == 0) - .collect(Collectors.toMap(Function.identity(), (r) -> 0)); - if (isAlive()) { - if (!subscribeTopic.isEmpty()) { - log.info("subscribe mqtt [{}] topic : {}", client.clientId(), subscribeTopic); - client.subscribe(subscribeTopic); - } - } - return messageProcessor - .filter(msg -> topics - .stream() - .anyMatch(topic -> TopicUtils.match(topic, msg.getTopic()))); - }).doOnCancel(() -> { - if (!canceled.getAndSet(true)) { - for (String topic : topics) { - if (getTopicCounter(topic).decrementAndGet() <= 0 && isAlive()) { - log.info("unsubscribe mqtt [{}] topic : {}", client.clientId(), topic); - client.unsubscribe(topic); + public Flux subscribe(List topics, int qos) { + return Flux.create(sink -> { + + Disposable.Composite composite = Disposables.composite(); + + for (String topic : topics) { + Topic, Integer>> sinkTopic = subscriber.append(topic.replace("#", "**").replace("+", "*")); + + Tuple2, Integer> topicQos = Tuples.of(sink, qos); + + boolean first = sinkTopic.getSubscribers().size() == 0; + sinkTopic.subscribe(topicQos); + composite.add(() -> { + if (sinkTopic.unsubscribe(topicQos).size() > 0) { + client.unsubscribe(convertMqttTopic(topic), result -> { + if (result.succeeded()) { + log.debug("unsubscribe mqtt topic {}", topic); + } else { + log.debug("unsubscribe mqtt topic {} error", topic, result.cause()); + } + }); } + }); + + //首次订阅 + if (isAlive() && first) { + log.debug("subscribe mqtt topic {}", topic); + client.subscribe(convertMqttTopic(topic), qos, result -> { + if (!result.succeeded()) { + sink.error(result.cause()); + } + }); } } + + sink.onDispose(composite); + }); } - @Override - public Mono publish(MqttMessage message) { + private Mono doPublish(MqttMessage message) { return Mono.create((sink) -> { - if (!isAlive()) { - sink.error(new IOException("mqtt client not alive")); - return; - } Buffer buffer = Buffer.buffer(message.getPayload()); client.publish(message.getTopic(), buffer, @@ -139,6 +164,21 @@ public class VertxMqttClient implements MqttClient { }); } + @Override + public Mono publish(MqttMessage message) { + if (loading) { + return Mono.create(sink -> { + loadSuccessListener.add(() -> { + doPublish(message) + .doOnSuccess(sink::success) + .doOnError(sink::error) + .subscribe(); + }); + }); + } + return doPublish(message); + } + @Override public String getId() { return id; @@ -151,11 +191,11 @@ public class VertxMqttClient implements MqttClient { @Override public void shutdown() { + loading = false; if (isAlive()) { client.disconnect(); client = null; } - } @Override