fix: 优化tcp client逻辑

This commit is contained in:
zhouhao 2024-06-12 09:34:48 +08:00
parent 29174496b6
commit 477ecd911a
1 changed files with 40 additions and 49 deletions

View File

@ -1,5 +1,7 @@
package org.jetlinks.community.network.tcp.client;
import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetSocket;
@ -8,15 +10,15 @@ import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Hex;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkType;
import org.jetlinks.community.network.tcp.TcpMessage;
import org.jetlinks.community.network.tcp.parser.PayloadParser;
import org.jetlinks.core.message.codec.EncodedMessage;
import reactor.core.publisher.EmitterProcessor;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkType;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import java.net.InetSocketAddress;
import java.net.SocketException;
@ -24,28 +26,29 @@ import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
@Slf4j
public class VertxTcpClient implements TcpClient {
public volatile NetClient client;
public NetSocket socket;
volatile PayloadParser payloadParser;
@Getter
private final String id;
private final List<Runnable> disconnectListener = new CopyOnWriteArrayList<>();
private final EmitterProcessor<TcpMessage> processor = EmitterProcessor.create(false);
private final FluxSink<TcpMessage> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
private final boolean serverClient;
public volatile NetClient client;
public NetSocket socket;
volatile PayloadParser payloadParser;
@Setter
private long keepAliveTimeoutMs = Duration.ofMinutes(10).toMillis();
private volatile long lastKeepAliveTime = System.currentTimeMillis();
public VertxTcpClient(String id, boolean serverClient) {
this.id = id;
this.serverClient = serverClient;
}
private final List<Runnable> disconnectListener = new CopyOnWriteArrayList<>();
private final Sinks.Many<TcpMessage> sink = Reactors.createMany();
private final boolean serverClient;
@Override
public void keepAlive() {
@ -72,15 +75,18 @@ public class VertxTcpClient implements TcpClient {
@Override
public Mono<Void> sendMessage(EncodedMessage message) {
return Mono
.create((sink) -> {
.<Void>create((sink) -> {
if (socket == null) {
sink.error(new SocketException("socket closed"));
return;
}
Buffer buffer = Buffer.buffer(message.getPayload());
ByteBuf buf = message.getPayload();
Buffer buffer = Buffer.buffer(buf);
int len = buffer.length();
socket.write(buffer, r -> {
keepAlive();
ReferenceCountUtil.safeRelease(buf);
if (r.succeeded()) {
keepAlive();
sink.success();
} else {
sink.error(r.cause());
@ -111,23 +117,18 @@ public class VertxTcpClient implements TcpClient {
return true;
}
/**
* 接收TCP消息
*
* @param message TCP消息
*/
public VertxTcpClient(String id,boolean serverClient) {
this.id = id;
this.serverClient = serverClient;
}
protected void received(TcpMessage message) {
if (processor.getPending() > processor.getBufferSize() / 2) {
log.warn("tcp [{}] message pending {} ,drop message:{}", processor.getPending(), getRemoteAddress(), message.toString());
return;
}
sink.next(message);
sink.emitNext(message,Reactors.RETRY_NON_SERIALIZED);
}
@Override
public Flux<TcpMessage> subscribe() {
return processor
.map(Function.identity());
return sink.asFlux();
}
private void execute(Runnable runnable) {
@ -144,7 +145,7 @@ public class VertxTcpClient implements TcpClient {
return null;
}
SocketAddress socketAddress = socket.remoteAddress();
return new InetSocketAddress(socketAddress.host(), socketAddress.port());
return InetSocketAddress.createUnresolved(socketAddress.host(), socketAddress.port());
}
@Override
@ -154,6 +155,9 @@ public class VertxTcpClient implements TcpClient {
@Override
public void shutdown() {
if (socket == null) {
return;
}
log.debug("tcp client [{}] disconnect", getId());
synchronized (this) {
if (null != client) {
@ -174,7 +178,7 @@ public class VertxTcpClient implements TcpClient {
}
disconnectListener.clear();
if (serverClient) {
processor.onComplete();
sink.tryEmitComplete();
}
}
@ -186,11 +190,6 @@ public class VertxTcpClient implements TcpClient {
this.client = client;
}
/**
* 设置客户端消息解析器
*
* @param payloadParser 消息解析器
*/
public void setRecordParser(PayloadParser payloadParser) {
synchronized (this) {
if (null != this.payloadParser && this.payloadParser != payloadParser) {
@ -199,18 +198,10 @@ public class VertxTcpClient implements TcpClient {
this.payloadParser = payloadParser;
this.payloadParser
.handlePayload()
.onErrorContinue((err, res) -> {
log.error(err.getMessage(), err);
})
.subscribe(buffer -> received(new TcpMessage(buffer.getByteBuf())));
}
}
/**
* socket处理
*
* @param socket socket
*/
public void setSocket(NetSocket socket) {
synchronized (this) {
Objects.requireNonNull(payloadParser);
@ -222,12 +213,12 @@ public class VertxTcpClient implements TcpClient {
.handler(buffer -> {
if (log.isDebugEnabled()) {
log.debug("handle tcp client[{}] payload:[{}]",
socket.remoteAddress(),
Hex.encodeHexString(buffer.getBytes()));
socket.remoteAddress(),
Hex.encodeHexString(buffer.getBytes()));
}
keepAlive();
payloadParser.handle(buffer);
if (this.socket != socket) {
if (this.socket != null && this.socket != socket) {
log.warn("tcp client [{}] memory leak ", socket.remoteAddress());
socket.close();
}