diff --git a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/TcpClient.java b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/TcpClient.java index dbbed028..97cca036 100644 --- a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/TcpClient.java +++ b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/TcpClient.java @@ -2,6 +2,7 @@ package org.jetlinks.community.network.tcp.client; import org.jetlinks.community.network.Network; import org.jetlinks.community.network.tcp.TcpMessage; +import org.jetlinks.core.server.ClientConnection; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -14,7 +15,7 @@ import java.time.Duration; * @author zhouhao * @version 1.0 */ -public interface TcpClient extends Network { +public interface TcpClient extends Network, ClientConnection { /** * 获取客户端远程地址 diff --git a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java index bae531dc..6da4c4f2 100644 --- a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java +++ b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java @@ -12,6 +12,7 @@ import org.jetlinks.community.network.DefaultNetworkType; import org.jetlinks.community.network.NetworkType; import org.jetlinks.community.network.tcp.TcpMessage; import org.jetlinks.community.network.tcp.parser.PayloadParser; +import org.jetlinks.core.message.codec.EncodedMessage; import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; @@ -67,6 +68,44 @@ public class VertxTcpClient implements TcpClient { } } + + @Override + public InetSocketAddress address() { + return getRemoteAddress(); + } + + @Override + public Mono sendMessage(EncodedMessage message) { + return Mono + .create((sink) -> { + if (socket == null) { + sink.error(new SocketException("socket closed")); + return; + } + Buffer buffer = Buffer.buffer(message.getPayload()); + socket.write(buffer, r -> { + keepAlive(); + if (r.succeeded()) { + sink.success(); + } else { + sink.error(r.cause()); + } + }); + }); + } + + @Override + public Flux receiveMessage() { + return this + .subscribe() + .cast(EncodedMessage.class); + } + + @Override + public void disconnect() { + shutdown(); + } + @Override public boolean isAlive() { return socket != null && (keepAliveTimeoutMs < 0 || System.currentTimeMillis() - lastKeepAliveTime < keepAliveTimeoutMs); @@ -193,21 +232,8 @@ public class VertxTcpClient implements TcpClient { @Override public Mono send(TcpMessage message) { - return Mono.create((sink) -> { - if (socket == null) { - sink.error(new SocketException("socket closed")); - return; - } - Buffer buffer = Buffer.buffer(message.getPayload()); - socket.write(buffer, r -> { - keepAlive(); - if (r.succeeded()) { - sink.success(true); - } else { - sink.error(r.cause()); - } - }); - }); + return sendMessage(message) + .thenReturn(true); } @Override diff --git a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java index 9a88d082..8b3842f8 100644 --- a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java +++ b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java @@ -5,6 +5,8 @@ import lombok.extern.slf4j.Slf4j; import org.hswebframework.web.logger.ReactiveLogger; import org.jetlinks.core.ProtocolSupport; import org.jetlinks.core.ProtocolSupports; +import org.jetlinks.core.device.DeviceOperator; +import org.jetlinks.core.device.DeviceProductOperator; import org.jetlinks.core.device.DeviceRegistry; import org.jetlinks.core.message.DeviceMessage; import org.jetlinks.core.message.Message; @@ -12,6 +14,7 @@ import org.jetlinks.core.message.codec.DefaultTransport; import org.jetlinks.core.message.codec.EncodedMessage; import org.jetlinks.core.message.codec.FromDeviceMessageContext; import org.jetlinks.core.message.codec.Transport; +import org.jetlinks.core.server.DeviceGatewayContext; import org.jetlinks.core.server.session.DeviceSession; import org.jetlinks.core.server.session.DeviceSessionManager; import org.jetlinks.community.gateway.DeviceGateway; @@ -106,7 +109,7 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew } - class TcpConnection { + class TcpConnection implements DeviceGatewayContext { final TcpClient client; final AtomicReference keepaliveTimeout = new AtomicReference<>(); final AtomicReference sessionRef = new AtomicReference<>(); @@ -148,17 +151,21 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew } Mono accept() { - return client - .subscribe() - .filter(tcp -> started.get()) - .publishOn(Schedulers.parallel()) - .flatMap(this::handleTcpMessage) - .onErrorResume((err) -> { - log.error(err.getMessage(), err); - client.shutdown(); - return Mono.empty(); - }) - .then() + return getProtocol() + .flatMap(protocol -> protocol.onClientConnect(getTransport(), client, this)) + .then( + client + .subscribe() + .filter(tcp -> started.get()) + .publishOn(Schedulers.parallel()) + .flatMap(this::handleTcpMessage) + .onErrorResume((err) -> { + log.error(err.getMessage(), err); + client.shutdown(); + return Mono.empty(); + }) + .then() + ) .doOnCancel(client::shutdown); } @@ -169,8 +176,10 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew .cast(DeviceMessage.class) .doOnNext(msg -> gatewayMonitor.receivedMessage()) .flatMap(this::handleDeviceMessage) - .doOnEach(ReactiveLogger.onError(err -> log - .error("处理TCP[{}]消息失败:\n{}", address, message, err))) + .doOnEach(ReactiveLogger.onError(err -> log.error("处理TCP[{}]消息失败:\n{}", + address, + message + , err))) .onErrorResume((err) -> Mono.fromRunnable(client::reset)) .then(); } @@ -194,6 +203,20 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew .then(); } + @Override + public Mono getDevice(String deviceId) { + return registry.getDevice(deviceId); + } + + @Override + public Mono getProduct(String productId) { + return registry.getProduct(productId); + } + + @Override + public Mono onMessage(DeviceMessage message) { + return handleDeviceMessage(message); + } }