From af5d3a1f5330352b7771d6ba3d58f8b0d193f021 Mon Sep 17 00:00:00 2001 From: zhou-hao Date: Thu, 14 May 2020 15:49:41 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=BD=91=E7=BB=9C=E7=BB=84?= =?UTF-8?q?=E4=BB=B6=E8=B0=83=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../network/tcp/client/VertxTcpClient.java | 1 + .../tcp/client/VertxTcpClientProvider.java | 2 +- .../network/manager/debug/DebugUtils.java | 22 +++ .../TcpClientDebugSubscriptionProvider.java | 82 +++++++++++ .../TcpServerDebugSubscriptionProvider.java | 128 ++++++++++++++++++ 5 files changed, 234 insertions(+), 1 deletion(-) create mode 100644 jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/DebugUtils.java create mode 100644 jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/TcpClientDebugSubscriptionProvider.java create mode 100644 jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/TcpServerDebugSubscriptionProvider.java diff --git a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java index 98ed6e63..436bf5d8 100644 --- a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java +++ b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java @@ -110,6 +110,7 @@ public class VertxTcpClient extends AbstractTcpClient { if (this.client != null && this.client != client) { this.client.close(); } + keepAlive(); this.client = client; } diff --git a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClientProvider.java b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClientProvider.java index a707d156..03a0e60f 100644 --- a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClientProvider.java +++ b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClientProvider.java @@ -56,13 +56,13 @@ public class VertxTcpClientProvider implements NetworkProvider { if (result.succeeded()) { log.debug("connect tcp [{}:{}] success", properties.getHost(), properties.getPort()); + client.setRecordParser(payloadParserBuilder.build(properties.getParserType(), properties)); client.setSocket(result.result()); } else { log.error("connect tcp [{}:{}] error", properties.getHost(), properties.getPort(),result.cause()); diff --git a/jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/DebugUtils.java b/jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/DebugUtils.java new file mode 100644 index 00000000..4d48388e --- /dev/null +++ b/jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/DebugUtils.java @@ -0,0 +1,22 @@ +package org.jetlinks.community.network.manager.debug; + +import io.netty.buffer.ByteBufUtil; +import org.springframework.util.StringUtils; + +public class DebugUtils { + + static byte[] stringToBytes(String text){ + byte[] payload; + if (StringUtils.isEmpty(text)) { + payload = new byte[0]; + } else { + if (text.startsWith("0x")) { + payload = ByteBufUtil.decodeHexDump(text, 2, text.length()-2); + } else { + payload = text.getBytes(); + } + } + return payload; + } + +} diff --git a/jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/TcpClientDebugSubscriptionProvider.java b/jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/TcpClientDebugSubscriptionProvider.java new file mode 100644 index 00000000..f33c2552 --- /dev/null +++ b/jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/TcpClientDebugSubscriptionProvider.java @@ -0,0 +1,82 @@ +package org.jetlinks.community.network.manager.debug; + +import io.netty.buffer.Unpooled; +import org.jetlinks.community.gateway.external.SubscribeRequest; +import org.jetlinks.community.gateway.external.SubscriptionProvider; +import org.jetlinks.community.network.DefaultNetworkType; +import org.jetlinks.community.network.NetworkManager; +import org.jetlinks.community.network.tcp.TcpMessage; +import org.jetlinks.community.network.tcp.client.TcpClient; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; +import reactor.core.publisher.Flux; + +@Component +public class TcpClientDebugSubscriptionProvider implements SubscriptionProvider { + + private final NetworkManager networkManager; + + public TcpClientDebugSubscriptionProvider(NetworkManager networkManager) { + this.networkManager = networkManager; + } + + @Override + public String id() { + return "network-tcp-client-debug"; + } + + @Override + public String name() { + return "TCP客户端调试"; + } + + @Override + public String[] getTopicPattern() { + return new String[]{ + "/network/tcp/client/*/_send", + "/network/tcp/client/*/_subscribe" + }; + } + + @Override + public Flux subscribe(SubscribeRequest request) { + String id = request.getTopic().split("[/]")[4]; + if (request.getTopic().endsWith("_send")) { + return send(id, request); + } else { + return subscribe(id, request); + } + } + + public Flux send(String id, SubscribeRequest request) { + String message = request.getString("request") + .orElseThrow(() -> new IllegalArgumentException("参数[request]不能为空")); + + byte[] payload=DebugUtils.stringToBytes(message); + + return networkManager + .getNetwork(DefaultNetworkType.TCP_CLIENT, id) + .flatMap(client -> client.send(new TcpMessage(Unpooled.wrappedBuffer(payload)))) + .thenReturn("推送成功") + .flux(); + } + + @SuppressWarnings("all") + public Flux subscribe(String id, SubscribeRequest request) { + String message = request.getString("response").filter(StringUtils::hasText).orElse(null); + + byte[] payload =DebugUtils.stringToBytes(message); + + return networkManager + .getNetwork(DefaultNetworkType.TCP_CLIENT, id) + .flatMapMany(client -> client + .subscribe() + .flatMap(msg -> client + .send(new TcpMessage(Unpooled.wrappedBuffer(payload))) + .thenReturn(msg)) + .map(TcpMessage::toString) + ); + } + + +} diff --git a/jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/TcpServerDebugSubscriptionProvider.java b/jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/TcpServerDebugSubscriptionProvider.java new file mode 100644 index 00000000..943621c3 --- /dev/null +++ b/jetlinks-manager/network-manager/src/main/java/org/jetlinks/community/network/manager/debug/TcpServerDebugSubscriptionProvider.java @@ -0,0 +1,128 @@ +package org.jetlinks.community.network.manager.debug; + +import io.netty.buffer.Unpooled; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import org.jetlinks.community.gateway.external.SubscribeRequest; +import org.jetlinks.community.gateway.external.SubscriptionProvider; +import org.jetlinks.community.network.DefaultNetworkType; +import org.jetlinks.community.network.NetworkManager; +import org.jetlinks.community.network.tcp.TcpMessage; +import org.jetlinks.community.network.tcp.client.TcpClient; +import org.jetlinks.community.network.tcp.server.TcpServer; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.HashMap; +import java.util.Map; + +@Component +public class TcpServerDebugSubscriptionProvider implements SubscriptionProvider { + + private final NetworkManager networkManager; + + public TcpServerDebugSubscriptionProvider(NetworkManager networkManager) { + this.networkManager = networkManager; + } + + @Override + public String id() { + return "network-tcp-server-debug"; + } + + @Override + public String name() { + return "TCP服务调试"; + } + + @Override + public String[] getTopicPattern() { + return new String[]{ + "/network/tcp/server/*/_subscribe" + }; + } + + @Override + public Flux subscribe(SubscribeRequest request) { + String id = request.getTopic().split("[/]")[4]; + + return subscribe(id, request); + } + + @SuppressWarnings("all") + public Flux subscribe(String id, SubscribeRequest request) { + + String message = request.getString("response").filter(StringUtils::hasText).orElse(null); + + byte[] payload = DebugUtils.stringToBytes(message); + + return Flux.create(sink -> + sink.onDispose(networkManager + .getNetwork(DefaultNetworkType.TCP_SERVER, id) + .flatMap(server -> + server + .handleConnection() + .doOnNext(client -> sink.next(TcpClientMessage.of(client))) + .flatMap(client -> { + client.onDisconnect(() -> { + sink.next(TcpClientMessage.ofDisconnect(client)); + }); + return client + .subscribe() + .map(msg -> TcpClientMessage.of(client, msg)) + .doOnNext(sink::next) + .flatMap(msg -> { + if (payload.length > 0) { + return client.send(new TcpMessage(Unpooled.wrappedBuffer(payload))); + } + return Mono.empty(); + }) + .then(); + }) + .then() + ) + .doOnError(sink::error) + .subscriberContext(sink.currentContext()) + .subscribe() + )); + } + + + @AllArgsConstructor(staticName = "of") + @Getter + @Setter + public static class TcpClientMessage { + private String type; + + private String typeText; + + private Object data; + + public static TcpClientMessage of(TcpClient client) { + Map data = new HashMap<>(); + data.put("address", client.getRemoteAddress()); + + return TcpClientMessage.of("connection", "连接", data); + } + + public static TcpClientMessage ofDisconnect(TcpClient client) { + Map data = new HashMap<>(); + data.put("address", client.getRemoteAddress()); + + return TcpClientMessage.of("disconnection", "断开连接", data); + } + + public static TcpClientMessage of(TcpClient connection, TcpMessage message) { + Map data = new HashMap<>(); + data.put("address", connection.getRemoteAddress().toString()); + data.put("message", message.toString()); + + return TcpClientMessage.of("publish", "订阅", data); + } + + + } +}