diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/ConfigMetadataConstants.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/ConfigMetadataConstants.java index 9dff0381..181b3e8b 100644 --- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/ConfigMetadataConstants.java +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/ConfigMetadataConstants.java @@ -3,6 +3,9 @@ package org.jetlinks.community; import org.jetlinks.core.config.ConfigKey; /** + * 数据验证配置常量类 + * + * @author zhouhao * @see ConfigKey */ public interface ConfigMetadataConstants { diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/configuration/SmartDateDeserializer.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/configuration/SmartDateDeserializer.java index c1e36468..12fbc95f 100644 --- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/configuration/SmartDateDeserializer.java +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/configuration/SmartDateDeserializer.java @@ -9,6 +9,11 @@ import org.jetlinks.community.utils.TimeUtils; import java.util.Date; +/** + * 时间反序列化配置 + * + * @author zhouhao + */ public class SmartDateDeserializer extends JsonDeserializer { @Override @SneakyThrows diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/utils/DateMathParser.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/utils/DateMathParser.java index 125767c0..57dca237 100644 --- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/utils/DateMathParser.java +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/utils/DateMathParser.java @@ -29,6 +29,11 @@ import java.time.temporal.TemporalAdjusters; import java.util.function.LongSupplier; +/** + * 时间转换工具 + * + * @author zhouhao + */ public class DateMathParser { diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/utils/ErrorUtils.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/utils/ErrorUtils.java index dd4d31d0..f29d9d04 100644 --- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/utils/ErrorUtils.java +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/utils/ErrorUtils.java @@ -5,17 +5,19 @@ import org.hswebframework.web.exception.NotFoundException; import reactor.core.publisher.Mono; /** + * 异常处理工具 + * * @author wangzheng * @see * @since 1.0 */ public class ErrorUtils { - public static Mono notFound(String message){ - return Mono.error(()->new NotFoundException(message)); + public static Mono notFound(String message) { + return Mono.error(() -> new NotFoundException(message)); } - public static Mono accessDeny(String message){ - return Mono.error(()->new AccessDenyException(message)); + public static Mono accessDeny(String message) { + return Mono.error(() -> new AccessDenyException(message)); } } diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/utils/TimeUtils.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/utils/TimeUtils.java index a2659869..1dc44e28 100644 --- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/utils/TimeUtils.java +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/utils/TimeUtils.java @@ -5,6 +5,11 @@ import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.Date; +/** + * 时间工具类 + * + * @author zhouhao + */ public class TimeUtils { diff --git a/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/DefaultNetworkManager.java b/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/DefaultNetworkManager.java index 3b698aad..4d866549 100644 --- a/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/DefaultNetworkManager.java +++ b/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/DefaultNetworkManager.java @@ -54,7 +54,12 @@ public class DefaultNetworkManager implements NetworkManager, BeanPostProcessor .subscribe(t -> this.checkNetwork()); } + /** + * 检查网络 把需要加载的网络组件启动起来 + */ protected void checkNetwork() { + // 获取并过滤所有停止的网络组件 + // 重新加载启动状态的网络组件 Flux.fromIterable(store.values()) .flatMapIterable(Map::values) .filter(i -> !i.isAlive()) @@ -89,6 +94,14 @@ public class DefaultNetworkManager implements NetworkManager, BeanPostProcessor .map(n -> (T) n); } + /** + * 如果store中不存在网络组件就创建,存在就重新加载 + * + * @param provider 网络组件支持提供商 + * @param id 网络组件唯一标识 + * @param properties 网络组件配置 + * @return 网络组件 + */ public Network doCreate(NetworkProvider provider, String id, Object properties) { return getNetworkStore(provider.getType()).compute(id, (s, network) -> { if (network == null) { diff --git a/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/NetworkConfigManager.java b/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/NetworkConfigManager.java index 4010642e..730c0c58 100644 --- a/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/NetworkConfigManager.java +++ b/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/NetworkConfigManager.java @@ -2,6 +2,11 @@ package org.jetlinks.community.network; import reactor.core.publisher.Mono; +/** + * 网络组件配置管理器 + * + * @author zhouhao + */ public interface NetworkConfigManager { Mono getConfig(NetworkType networkType, String id); diff --git a/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/security/Certificate.java b/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/security/Certificate.java index f0cc0489..3335ac4b 100644 --- a/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/security/Certificate.java +++ b/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/security/Certificate.java @@ -6,6 +6,11 @@ import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.X509KeyManager; import java.security.cert.X509Certificate; +/** + * 证书接口 + * + * @author zhouhao + */ public interface Certificate { String getId(); diff --git a/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/security/CertificateManager.java b/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/security/CertificateManager.java index a6e6b4d4..b991faed 100644 --- a/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/security/CertificateManager.java +++ b/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/security/CertificateManager.java @@ -2,6 +2,11 @@ package org.jetlinks.community.network.security; import reactor.core.publisher.Mono; +/** + * 证书管理接口 + * + * @author zhouhao + */ public interface CertificateManager { Mono getCertificate(String id); diff --git a/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/security/DefaultCertificate.java b/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/security/DefaultCertificate.java index 3da6a5fa..d16d2762 100644 --- a/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/security/DefaultCertificate.java +++ b/jetlinks-components/network-component/network-core/src/main/java/org/jetlinks/community/network/security/DefaultCertificate.java @@ -13,13 +13,18 @@ import java.util.Arrays; import java.util.List; import java.util.stream.Stream; +/** + * 默认证书 + * + * @author zhouhao + */ public class DefaultCertificate implements Certificate { @Getter - private String id; + private final String id; @Getter - private String name; + private final String name; private KeyStoreHelper keyHelper; @@ -131,12 +136,12 @@ public class DefaultCertificate implements Certificate { return EMPTY; } return Arrays.stream(trustHelper - .getTrustMgrs()) - .filter(X509TrustManager.class::isInstance) - .map(X509TrustManager.class::cast) - .map(X509TrustManager::getAcceptedIssuers) - .flatMap(Stream::of) - .toArray(X509Certificate[]::new); + .getTrustMgrs()) + .filter(X509TrustManager.class::isInstance) + .map(X509TrustManager.class::cast) + .map(X509TrustManager::getAcceptedIssuers) + .flatMap(Stream::of) + .toArray(X509Certificate[]::new); } @Override diff --git a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java index 6da4c4f2..9cdae161 100644 --- a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java +++ b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/VertxTcpClient.java @@ -29,27 +29,23 @@ import java.util.function.Function; @Slf4j public class VertxTcpClient implements TcpClient { - public volatile NetClient client; - - public NetSocket socket; - - volatile PayloadParser payloadParser; - @Getter private final String id; - + private final List disconnectListener = new CopyOnWriteArrayList<>(); + private final EmitterProcessor processor = EmitterProcessor.create(false); + private final FluxSink sink = processor.sink(FluxSink.OverflowStrategy.BUFFER); + private final boolean serverClient; + public volatile NetClient client; + public NetSocket socket; + volatile PayloadParser payloadParser; @Setter private long keepAliveTimeoutMs = Duration.ofMinutes(10).toMillis(); - private volatile long lastKeepAliveTime = System.currentTimeMillis(); - private final List disconnectListener = new CopyOnWriteArrayList<>(); - - private final EmitterProcessor processor = EmitterProcessor.create(false); - - private final FluxSink sink = processor.sink(FluxSink.OverflowStrategy.BUFFER); - - private final boolean serverClient; + public VertxTcpClient(String id, boolean serverClient) { + this.id = id; + this.serverClient = serverClient; + } @Override public void keepAlive() { @@ -68,7 +64,6 @@ public class VertxTcpClient implements TcpClient { } } - @Override public InetSocketAddress address() { return getRemoteAddress(); @@ -77,7 +72,7 @@ public class VertxTcpClient implements TcpClient { @Override public Mono sendMessage(EncodedMessage message) { return Mono - .create((sink) -> { + .create((sink) -> { if (socket == null) { sink.error(new SocketException("socket closed")); return; @@ -116,11 +111,11 @@ public class VertxTcpClient implements TcpClient { return true; } - public VertxTcpClient(String id,boolean serverClient) { - this.id = id; - this.serverClient=serverClient; - } - + /** + * 接收TCP消息 + * + * @param message TCP消息 + */ protected void received(TcpMessage message) { if (processor.getPending() > processor.getBufferSize() / 2) { log.warn("tcp [{}] message pending {} ,drop message:{}", processor.getPending(), getRemoteAddress(), message.toString()); @@ -178,7 +173,7 @@ public class VertxTcpClient implements TcpClient { execute(runnable); } disconnectListener.clear(); - if(serverClient){ + if (serverClient) { processor.onComplete(); } } @@ -191,6 +186,11 @@ public class VertxTcpClient implements TcpClient { this.client = client; } + /** + * 设置客户端消息解析器 + * + * @param payloadParser 消息解析器 + */ public void setRecordParser(PayloadParser payloadParser) { synchronized (this) { if (null != this.payloadParser && this.payloadParser != payloadParser) { @@ -206,6 +206,11 @@ public class VertxTcpClient implements TcpClient { } } + /** + * socket处理 + * + * @param socket socket + */ public void setSocket(NetSocket socket) { synchronized (this) { Objects.requireNonNull(payloadParser); diff --git a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java index e4f4b115..532e30bc 100644 --- a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java +++ b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java @@ -137,6 +137,8 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew if (started.getAndSet(true) || disposable != null) { return; } + // 从TCPServer中获取连接的client + // client实例化为TcpConnection之后处理消息 disposable = tcpServer .handleConnection() .publishOn(Schedulers.parallel()) diff --git a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServerProvider.java b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServerProvider.java index 200bf81a..2fcbeff8 100644 --- a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServerProvider.java +++ b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServerProvider.java @@ -19,6 +19,11 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; +/** + * TCP服务提供商 + * + * @author zhouhao + */ @Component @Slf4j public class TcpServerProvider implements NetworkProvider { @@ -51,16 +56,26 @@ public class TcpServerProvider implements NetworkProvider { return tcpServer; } + /** + * TCP服务初始化 + * + * @param tcpServer TCP服务 + * @param properties TCP配置 + */ private void initTcpServer(VertxTcpServer tcpServer, TcpServerProperties properties) { int instance = Math.max(2, properties.getInstance()); List instances = new ArrayList<>(instance); for (int i = 0; i < instance; i++) { instances.add(vertx.createNetServer(properties.getOptions())); } + // 根据解析类型配置数据解析器 payloadParserBuilder.build(properties.getParserType(), properties); tcpServer.setParserSupplier(() -> payloadParserBuilder.build(properties.getParserType(), properties)); tcpServer.setServer(instances); tcpServer.setKeepAliveTimeout(properties.getLong("keepAliveTimeout", Duration.ofMinutes(10).toMillis())); + // 针对JVM做的多路复用优化 + // 多个server listen同一个端口,每个client连接的时候vertx会分配 + // 一个connection只能在一个server中处理 for (NetServer netServer : instances) { netServer.listen(properties.createSocketAddress(), result -> { if (result.succeeded()) { diff --git a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/VertxTcpServer.java b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/VertxTcpServer.java index acaabc39..d4863790 100644 --- a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/VertxTcpServer.java +++ b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/VertxTcpServer.java @@ -26,19 +26,14 @@ import java.util.function.Supplier; @Slf4j public class VertxTcpServer implements TcpServer { - Collection tcpServers; - - private Supplier parserSupplier; - - @Setter - private long keepAliveTimeout = Duration.ofMinutes(10).toMillis(); - @Getter private final String id; - private final EmitterProcessor processor = EmitterProcessor.create(false); - private final FluxSink sink = processor.sink(FluxSink.OverflowStrategy.BUFFER); + Collection tcpServers; + private Supplier parserSupplier; + @Setter + private long keepAliveTimeout = Duration.ofMinutes(10).toMillis(); public VertxTcpServer(String id) { this.id = id; @@ -62,6 +57,11 @@ public class VertxTcpServer implements TcpServer { this.parserSupplier = parserSupplier; } + /** + * 为每个NetServer添加connectHandler + * + * @param servers 创建的所有NetServer + */ public void setServer(Collection servers) { if (this.tcpServers != null && !this.tcpServers.isEmpty()) { shutdown(); @@ -74,24 +74,34 @@ public class VertxTcpServer implements TcpServer { } - + /** + * TCP连接处理逻辑 + * + * @param socket socket + */ protected void acceptTcpConnection(NetSocket socket) { if (!processor.hasDownstreams()) { log.warn("not handler for tcp client[{}]", socket.remoteAddress()); socket.close(); return; } + // 客户端连接处理 VertxTcpClient client = new VertxTcpClient(id + "_" + socket.remoteAddress(), true); client.setKeepAliveTimeoutMs(keepAliveTimeout); try { + // TCP异常和关闭处理 socket.exceptionHandler(err -> { log.error("tcp server client [{}] error", socket.remoteAddress(), err); }).closeHandler((nil) -> { log.debug("tcp server client [{}] closed", socket.remoteAddress()); client.shutdown(); }); + // 这个地方是在TCP服务初始化的时候设置的 parserSupplier + // set方法 org.jetlinks.community.network.tcp.server.VertxTcpServer.setParserSupplier + // 调用坐标 org.jetlinks.community.network.tcp.server.TcpServerProvider.initTcpServer client.setRecordParser(parserSupplier.get()); client.setSocket(socket); + // client放进了发射器 sink.next(client); log.debug("accept tcp client [{}] connection", socket.remoteAddress()); } catch (Exception e) { diff --git a/jetlinks-components/network-component/tcp-component/src/test/java/org/jetlinks/community/network/tcp/server/TcpServerProviderTest.java b/jetlinks-components/network-component/tcp-component/src/test/java/org/jetlinks/community/network/tcp/server/TcpServerProviderTest.java index 69f82b86..b76e1088 100644 --- a/jetlinks-components/network-component/tcp-component/src/test/java/org/jetlinks/community/network/tcp/server/TcpServerProviderTest.java +++ b/jetlinks-components/network-component/tcp-component/src/test/java/org/jetlinks/community/network/tcp/server/TcpServerProviderTest.java @@ -3,10 +3,10 @@ package org.jetlinks.community.network.tcp.server; import io.vertx.core.Vertx; import io.vertx.core.net.NetServerOptions; import lombok.extern.slf4j.Slf4j; +import org.jetlinks.community.network.tcp.TcpMessage; import org.jetlinks.community.network.tcp.client.TcpClient; import org.jetlinks.community.network.tcp.parser.DefaultPayloadParserBuilder; import org.jetlinks.community.network.tcp.parser.PayloadParserType; -import org.jetlinks.community.network.tcp.TcpMessage; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono;