diff --git a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java index 9cdae161..4b29af6c 100644 --- a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java +++ b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java @@ -1,5 +1,7 @@ package org.jetlinks.community.network.tcp.client; +import io.netty.buffer.ByteBuf; +import io.netty.util.ReferenceCountUtil; import io.vertx.core.buffer.Buffer; import io.vertx.core.net.NetClient; import io.vertx.core.net.NetSocket; @@ -8,15 +10,15 @@ import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.binary.Hex; -import org.jetlinks.community.network.DefaultNetworkType; -import org.jetlinks.community.network.NetworkType; import org.jetlinks.community.network.tcp.TcpMessage; import org.jetlinks.community.network.tcp.parser.PayloadParser; import org.jetlinks.core.message.codec.EncodedMessage; -import reactor.core.publisher.EmitterProcessor; +import org.jetlinks.core.utils.Reactors; +import org.jetlinks.community.network.DefaultNetworkType; +import org.jetlinks.community.network.NetworkType; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; import java.net.InetSocketAddress; import java.net.SocketException; @@ -24,28 +26,29 @@ import java.time.Duration; import java.util.List; import java.util.Objects; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.function.Function; @Slf4j public class VertxTcpClient implements TcpClient { + public volatile NetClient client; + + public NetSocket socket; + + volatile PayloadParser payloadParser; + @Getter private final String id; - private final List disconnectListener = new CopyOnWriteArrayList<>(); - private final EmitterProcessor processor = EmitterProcessor.create(false); - private final FluxSink sink = processor.sink(FluxSink.OverflowStrategy.BUFFER); - private final boolean serverClient; - public volatile NetClient client; - public NetSocket socket; - volatile PayloadParser payloadParser; + @Setter private long keepAliveTimeoutMs = Duration.ofMinutes(10).toMillis(); + private volatile long lastKeepAliveTime = System.currentTimeMillis(); - public VertxTcpClient(String id, boolean serverClient) { - this.id = id; - this.serverClient = serverClient; - } + private final List disconnectListener = new CopyOnWriteArrayList<>(); + + private final Sinks.Many sink = Reactors.createMany(); + + private final boolean serverClient; @Override public void keepAlive() { @@ -72,15 +75,18 @@ public class VertxTcpClient implements TcpClient { @Override public Mono sendMessage(EncodedMessage message) { return Mono - .create((sink) -> { + .create((sink) -> { if (socket == null) { sink.error(new SocketException("socket closed")); return; } - Buffer buffer = Buffer.buffer(message.getPayload()); + ByteBuf buf = message.getPayload(); + Buffer buffer = Buffer.buffer(buf); + int len = buffer.length(); socket.write(buffer, r -> { - keepAlive(); + ReferenceCountUtil.safeRelease(buf); if (r.succeeded()) { + keepAlive(); sink.success(); } else { sink.error(r.cause()); @@ -111,23 +117,18 @@ public class VertxTcpClient implements TcpClient { return true; } - /** - * 接收TCP消息 - * - * @param message TCP消息 - */ + public VertxTcpClient(String id,boolean serverClient) { + this.id = id; + this.serverClient = serverClient; + } + protected void received(TcpMessage message) { - if (processor.getPending() > processor.getBufferSize() / 2) { - log.warn("tcp [{}] message pending {} ,drop message:{}", processor.getPending(), getRemoteAddress(), message.toString()); - return; - } - sink.next(message); + sink.emitNext(message,Reactors.RETRY_NON_SERIALIZED); } @Override public Flux subscribe() { - return processor - .map(Function.identity()); + return sink.asFlux(); } private void execute(Runnable runnable) { @@ -144,7 +145,7 @@ public class VertxTcpClient implements TcpClient { return null; } SocketAddress socketAddress = socket.remoteAddress(); - return new InetSocketAddress(socketAddress.host(), socketAddress.port()); + return InetSocketAddress.createUnresolved(socketAddress.host(), socketAddress.port()); } @Override @@ -154,6 +155,9 @@ public class VertxTcpClient implements TcpClient { @Override public void shutdown() { + if (socket == null) { + return; + } log.debug("tcp client [{}] disconnect", getId()); synchronized (this) { if (null != client) { @@ -174,7 +178,7 @@ public class VertxTcpClient implements TcpClient { } disconnectListener.clear(); if (serverClient) { - processor.onComplete(); + sink.tryEmitComplete(); } } @@ -186,11 +190,6 @@ public class VertxTcpClient implements TcpClient { this.client = client; } - /** - * 设置客户端消息解析器 - * - * @param payloadParser 消息解析器 - */ public void setRecordParser(PayloadParser payloadParser) { synchronized (this) { if (null != this.payloadParser && this.payloadParser != payloadParser) { @@ -199,18 +198,10 @@ public class VertxTcpClient implements TcpClient { this.payloadParser = payloadParser; this.payloadParser .handlePayload() - .onErrorContinue((err, res) -> { - log.error(err.getMessage(), err); - }) .subscribe(buffer -> received(new TcpMessage(buffer.getByteBuf()))); } } - /** - * socket处理 - * - * @param socket socket - */ public void setSocket(NetSocket socket) { synchronized (this) { Objects.requireNonNull(payloadParser); @@ -222,12 +213,12 @@ public class VertxTcpClient implements TcpClient { .handler(buffer -> { if (log.isDebugEnabled()) { log.debug("handle tcp client[{}] payload:[{}]", - socket.remoteAddress(), - Hex.encodeHexString(buffer.getBytes())); + socket.remoteAddress(), + Hex.encodeHexString(buffer.getBytes())); } keepAlive(); payloadParser.handle(buffer); - if (this.socket != socket) { + if (this.socket != null && this.socket != socket) { log.warn("tcp client [{}] memory leak ", socket.remoteAddress()); socket.close(); }