From ec73bfec9568151168b676d31dafc4f1c041253f Mon Sep 17 00:00:00 2001 From: zhouhao Date: Thu, 20 Feb 2020 19:30:15 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96tcp=20client=20node?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../community/network/tcp/node/TcpClientNode.java | 8 ++++---- .../community/network/tcp/node/TcpClientNodeConfig.java | 5 ++--- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/node/TcpClientNode.java b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/node/TcpClientNode.java index 4be6b079..31cc872d 100644 --- a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/node/TcpClientNode.java +++ b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/node/TcpClientNode.java @@ -34,10 +34,10 @@ public class TcpClientNode extends CommonExecutableRuleNodeFactoryStrategy clientManager.getNetwork(DefaultNetworkType.TCP_CLIENT,config.getClientId()) .flatMapMany(client -> RuleDataCodecs .getCodec(TcpMessage.class) - .map(codec -> codec.decode(data, config.getSendPayloadType()) + .map(codec -> codec.decode(data, config.getPayloadType()) .cast(TcpMessage.class) .switchIfEmpty(Mono.fromRunnable(() -> context.logger().warn("can not decode rule data to tcp message:{}", data)))) - .orElseGet(() -> Flux.just(new TcpMessage(config.getSendPayloadType().write(data.getData())))) + .orElseGet(() -> Flux.just(new TcpMessage(config.getPayloadType().write(data.getData())))) .flatMap(client::send) .all(r-> r)) ; @@ -50,9 +50,9 @@ public class TcpClientNode extends CommonExecutableRuleNodeFactoryStrategygetNetwork(DefaultNetworkType.TCP_CLIENT,config.getClientId()) .switchIfEmpty(Mono.fromRunnable(() -> context.logger().error("tcp client {} not found", config.getClientId()))) .flatMapMany(TcpClient::subscribe) - .doOnNext(msg -> context.logger().info("received tcp client message:{}", config.getSubPayloadType().read(msg.getPayload()))) + .doOnNext(msg -> context.logger().info("received tcp client message:{}", config.getPayloadType().read(msg.getPayload()))) .map(r -> RuleDataCodecs.getCodec(TcpMessage.class) - .map(codec -> codec.encode(r, config.getSubPayloadType())) + .map(codec -> codec.encode(r, config.getPayloadType())) .orElse(r.getPayload())) .onErrorContinue((err, obj) -> { context.logger().error("consume tcp message error", err); diff --git a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/node/TcpClientNodeConfig.java b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/node/TcpClientNodeConfig.java index db4c516a..ce9a3668 100644 --- a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/node/TcpClientNodeConfig.java +++ b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/node/TcpClientNodeConfig.java @@ -16,9 +16,7 @@ public class TcpClientNodeConfig implements RuleNodeConfig { private PubSubType type; - private PayloadType sendPayloadType; - - private PayloadType subPayloadType; + private PayloadType payloadType; @Override public NodeType getNodeType() { @@ -34,6 +32,7 @@ public class TcpClientNodeConfig implements RuleNodeConfig { public void validate() { Assert.hasText(clientId, "clientId can not be empty!"); Assert.notNull(type, "type can not be null!"); + Assert.notNull(payloadType, "payloadType can not be null!"); } }