diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/VertxMqttClient.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/VertxMqttClient.java index 02c9d21b..fad16676 100644 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/VertxMqttClient.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/VertxMqttClient.java @@ -1,6 +1,8 @@ package org.jetlinks.community.network.mqtt.client; +import io.netty.buffer.ByteBuf; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.util.ReferenceCountUtil; import io.vertx.core.buffer.Buffer; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -98,7 +100,11 @@ public class VertxMqttClient implements MqttClient { subscriber .findTopic("/**") .filter(topic -> topic.getSubscribers().size() > 0) - .collectMap(topic -> convertMqttTopic(topic.getSubscribers().iterator().next().getT1()), topic -> topic.getSubscribers().iterator().next().getT3()) + .collectMap(topic -> convertMqttTopic(topic.getSubscribers().iterator().next().getT1()), topic -> topic + .getSubscribers() + .iterator() + .next() + .getT3()) .filter(MapUtils::isNotEmpty) .subscribe(topics -> { log.debug("subscribe mqtt topic {}", topics); @@ -113,13 +119,13 @@ public class VertxMqttClient implements MqttClient { protected String parseTopic(String topic) { //适配emqx共享订阅 if (topic.startsWith("$share")) { - topic= Stream.of(topic.split("/")) - .skip(2) - .collect(Collectors.joining("/", "/", "")); + topic = Stream.of(topic.split("/")) + .skip(2) + .collect(Collectors.joining("/", "/", "")); } else if (topic.startsWith("$queue")) { - topic= topic.substring(6); + topic = topic.substring(6); } - if(topic.startsWith("//")){ + if (topic.startsWith("//")) { return topic.substring(1); } return topic; @@ -172,21 +178,26 @@ public class VertxMqttClient implements MqttClient { private Mono doPublish(MqttMessage message) { return Mono.create((sink) -> { - Buffer buffer = Buffer.buffer(message.getPayload()); + ByteBuf payload = message.getPayload(); + Buffer buffer = Buffer.buffer(payload); client.publish(message.getTopic(), - buffer, - MqttQoS.valueOf(message.getQosLevel()), - message.isDup(), - message.isRetain(), - result -> { - if (result.succeeded()) { - log.info("publish mqtt [{}] message success: {}", client.clientId(), message); - sink.success(); - } else { - log.info("publish mqtt [{}] message error : {}", client.clientId(), message, result.cause()); - sink.error(result.cause()); - } - }); + buffer, + MqttQoS.valueOf(message.getQosLevel()), + message.isDup(), + message.isRetain(), + result -> { + try { + if (result.succeeded()) { + log.info("publish mqtt [{}] message success: {}", client.clientId(), message); + sink.success(); + } else { + log.info("publish mqtt [{}] message error : {}", client.clientId(), message, result.cause()); + sink.error(result.cause()); + } + } finally { + ReferenceCountUtil.safeRelease(payload); + } + }); }); } @@ -194,11 +205,11 @@ public class VertxMqttClient implements MqttClient { public Mono publish(MqttMessage message) { if (loading) { return Mono.create(sink -> - loadSuccessListener - .add(() -> doPublish(message) - .doOnSuccess(sink::success) - .doOnError(sink::error) - .subscribe())); + loadSuccessListener + .add(() -> doPublish(message) + .doOnSuccess(sink::success) + .doOnError(sink::error) + .subscribe())); } return doPublish(message); } diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java index e5742530..eb557f14 100644 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java @@ -5,6 +5,7 @@ import io.netty.buffer.Unpooled; import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.mqtt.MqttConnectReturnCode; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.util.ReferenceCountUtil; import io.vertx.core.buffer.Buffer; import io.vertx.core.net.SocketAddress; import io.vertx.mqtt.MqttEndpoint; @@ -264,19 +265,22 @@ class VertxMqttConnection implements MqttConnection { ping(); return Mono .create(sink -> { - Buffer buffer = Buffer.buffer(message.getPayload()); - endpoint.publish(message.getTopic(), - buffer, - MqttQoS.valueOf(message.getQosLevel()), - message.isDup(), - message.isRetain(), - result -> { - if (result.succeeded()) { - sink.success(); - } else { - sink.error(result.cause()); - } - } + ByteBuf buf = message.getPayload(); + Buffer buffer = Buffer.buffer(buf); + endpoint.publish( + message.getTopic(), + buffer, + MqttQoS.valueOf(message.getQosLevel()), + message.isDup(), + message.isRetain(), + result -> { + if (result.succeeded()) { + sink.success(); + } else { + sink.error(result.cause()); + } + ReferenceCountUtil.safeRelease(buf); + } ); }); }