From fbba69a28f8314f4115dc9837efa16b064de9fcf Mon Sep 17 00:00:00 2001 From: zhouhao Date: Wed, 2 Nov 2022 18:20:23 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=B5=84=E6=BA=90=E9=87=8A?= =?UTF-8?q?=E6=94=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../http/server/vertx/VertxHttpExchange.java | 9 ++++- .../network/mqtt/client/VertxMqttClient.java | 21 ++++++++---- .../server/vertx/VertxMqttConnection.java | 33 +++++++++++-------- 3 files changed, 41 insertions(+), 22 deletions(-) diff --git a/jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/server/vertx/VertxHttpExchange.java b/jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/server/vertx/VertxHttpExchange.java index f2897f75..4ecbdc59 100755 --- a/jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/server/vertx/VertxHttpExchange.java +++ b/jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/server/vertx/VertxHttpExchange.java @@ -2,6 +2,8 @@ package org.jetlinks.community.network.http.server.vertx; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.buffer.UnpooledHeapByteBuf; +import io.netty.util.ReferenceCountUtil; import io.vertx.core.MultiMap; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpMethod; @@ -224,7 +226,12 @@ public class VertxHttpExchange implements HttpExchange, HttpResponse, HttpReques .create(sink -> { Buffer buf = Buffer.buffer(buffer); setResponseDefaultLength(buf.length()); - response.write(buf, v -> sink.success()); + response.write(buf, v -> { + sink.success(); + if(!(buffer instanceof UnpooledHeapByteBuf)){ + ReferenceCountUtil.safeRelease(buffer); + } + }); }); } diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/VertxMqttClient.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/VertxMqttClient.java index c6d40351..dc7cf842 100755 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/VertxMqttClient.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/VertxMqttClient.java @@ -1,6 +1,8 @@ package org.jetlinks.community.network.mqtt.client; +import io.netty.buffer.ByteBuf; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.util.ReferenceCountUtil; import io.vertx.core.buffer.Buffer; import lombok.Getter; import lombok.Setter; @@ -199,19 +201,24 @@ public class VertxMqttClient implements MqttClient { private Mono doPublish(MqttMessage message) { return Mono.create((sink) -> { - Buffer buffer = Buffer.buffer(message.getPayload()); + ByteBuf payload = message.getPayload(); + Buffer buffer = Buffer.buffer(payload); client.publish(message.getTopic(), buffer, MqttQoS.valueOf(message.getQosLevel()), message.isDup(), message.isRetain(), result -> { - if (result.succeeded()) { - log.info("publish mqtt [{}] message success: {}", client.clientId(), message); - sink.success(); - } else { - log.info("publish mqtt [{}] message error : {}", client.clientId(), message, result.cause()); - sink.error(result.cause()); + try { + if (result.succeeded()) { + log.info("publish mqtt [{}] message success: {}", client.clientId(), message); + sink.success(); + } else { + log.info("publish mqtt [{}] message error : {}", client.clientId(), message, result.cause()); + sink.error(result.cause()); + } + } finally { + ReferenceCountUtil.safeRelease(payload); } }); }); diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java index 7b427050..7a1f4cad 100755 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.java @@ -5,6 +5,7 @@ import io.netty.buffer.Unpooled; import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.mqtt.MqttConnectReturnCode; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.util.ReferenceCountUtil; import io.vertx.core.buffer.Buffer; import io.vertx.core.net.SocketAddress; import io.vertx.mqtt.MqttEndpoint; @@ -32,6 +33,7 @@ import java.time.Duration; import java.util.Objects; import java.util.Optional; import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Collectors; @Slf4j @@ -41,7 +43,7 @@ class VertxMqttConnection implements MqttConnection { private long keepAliveTimeoutMs; @Getter private long lastPingTime = System.currentTimeMillis(); - private volatile boolean closed = false, accepted = false, autoAckSub = true, autoAckUnSub = true, autoAckMsg = false; + private volatile boolean closed = false, accepted = false, autoAckSub = true, autoAckUnSub = true, autoAckMsg = true; private static final MqttAuth emptyAuth = new MqttAuth() { @Override public String getUsername() { @@ -259,19 +261,22 @@ class VertxMqttConnection implements MqttConnection { ping(); return Mono .create(sink -> { - Buffer buffer = Buffer.buffer(message.getPayload()); - endpoint.publish(message.getTopic(), - buffer, - MqttQoS.valueOf(message.getQosLevel()), - message.isDup(), - message.isRetain(), - result -> { - if (result.succeeded()) { - sink.success(); - } else { - sink.error(result.cause()); - } - } + ByteBuf buf = message.getPayload(); + Buffer buffer = Buffer.buffer(buf); + endpoint.publish( + message.getTopic(), + buffer, + MqttQoS.valueOf(message.getQosLevel()), + message.isDup(), + message.isRetain(), + result -> { + if (result.succeeded()) { + sink.success(); + } else { + sink.error(result.cause()); + } + ReferenceCountUtil.safeRelease(buf); + } ); }); }