优化资源释放

This commit is contained in:
zhouhao 2022-11-02 18:13:30 +08:00
parent 7540ab9cac
commit c67d7b99ce
2 changed files with 53 additions and 38 deletions

View File

@ -1,6 +1,8 @@
package org.jetlinks.community.network.mqtt.client;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.ReferenceCountUtil;
import io.vertx.core.buffer.Buffer;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@ -98,7 +100,11 @@ public class VertxMqttClient implements MqttClient {
subscriber
.findTopic("/**")
.filter(topic -> topic.getSubscribers().size() > 0)
.collectMap(topic -> convertMqttTopic(topic.getSubscribers().iterator().next().getT1()), topic -> topic.getSubscribers().iterator().next().getT3())
.collectMap(topic -> convertMqttTopic(topic.getSubscribers().iterator().next().getT1()), topic -> topic
.getSubscribers()
.iterator()
.next()
.getT3())
.filter(MapUtils::isNotEmpty)
.subscribe(topics -> {
log.debug("subscribe mqtt topic {}", topics);
@ -113,13 +119,13 @@ public class VertxMqttClient implements MqttClient {
protected String parseTopic(String topic) {
//适配emqx共享订阅
if (topic.startsWith("$share")) {
topic= Stream.of(topic.split("/"))
.skip(2)
.collect(Collectors.joining("/", "/", ""));
topic = Stream.of(topic.split("/"))
.skip(2)
.collect(Collectors.joining("/", "/", ""));
} else if (topic.startsWith("$queue")) {
topic= topic.substring(6);
topic = topic.substring(6);
}
if(topic.startsWith("//")){
if (topic.startsWith("//")) {
return topic.substring(1);
}
return topic;
@ -172,21 +178,26 @@ public class VertxMqttClient implements MqttClient {
private Mono<Void> doPublish(MqttMessage message) {
return Mono.create((sink) -> {
Buffer buffer = Buffer.buffer(message.getPayload());
ByteBuf payload = message.getPayload();
Buffer buffer = Buffer.buffer(payload);
client.publish(message.getTopic(),
buffer,
MqttQoS.valueOf(message.getQosLevel()),
message.isDup(),
message.isRetain(),
result -> {
if (result.succeeded()) {
log.info("publish mqtt [{}] message success: {}", client.clientId(), message);
sink.success();
} else {
log.info("publish mqtt [{}] message error : {}", client.clientId(), message, result.cause());
sink.error(result.cause());
}
});
buffer,
MqttQoS.valueOf(message.getQosLevel()),
message.isDup(),
message.isRetain(),
result -> {
try {
if (result.succeeded()) {
log.info("publish mqtt [{}] message success: {}", client.clientId(), message);
sink.success();
} else {
log.info("publish mqtt [{}] message error : {}", client.clientId(), message, result.cause());
sink.error(result.cause());
}
} finally {
ReferenceCountUtil.safeRelease(payload);
}
});
});
}
@ -194,11 +205,11 @@ public class VertxMqttClient implements MqttClient {
public Mono<Void> publish(MqttMessage message) {
if (loading) {
return Mono.create(sink ->
loadSuccessListener
.add(() -> doPublish(message)
.doOnSuccess(sink::success)
.doOnError(sink::error)
.subscribe()));
loadSuccessListener
.add(() -> doPublish(message)
.doOnSuccess(sink::success)
.doOnError(sink::error)
.subscribe()));
}
return doPublish(message);
}

View File

@ -5,6 +5,7 @@ import io.netty.buffer.Unpooled;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.ReferenceCountUtil;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.SocketAddress;
import io.vertx.mqtt.MqttEndpoint;
@ -264,19 +265,22 @@ class VertxMqttConnection implements MqttConnection {
ping();
return Mono
.<Void>create(sink -> {
Buffer buffer = Buffer.buffer(message.getPayload());
endpoint.publish(message.getTopic(),
buffer,
MqttQoS.valueOf(message.getQosLevel()),
message.isDup(),
message.isRetain(),
result -> {
if (result.succeeded()) {
sink.success();
} else {
sink.error(result.cause());
}
}
ByteBuf buf = message.getPayload();
Buffer buffer = Buffer.buffer(buf);
endpoint.publish(
message.getTopic(),
buffer,
MqttQoS.valueOf(message.getQosLevel()),
message.isDup(),
message.isRetain(),
result -> {
if (result.succeeded()) {
sink.success();
} else {
sink.error(result.cause());
}
ReferenceCountUtil.safeRelease(buf);
}
);
});
}