Merge pull request #62 from qq517634644/master

docs(网络组件&设备网关):
This commit is contained in:
老周 2021-06-07 09:25:52 +08:00 committed by GitHub
commit f04e02590d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 131 additions and 46 deletions

View File

@ -3,6 +3,9 @@ package org.jetlinks.community;
import org.jetlinks.core.config.ConfigKey;
/**
* 数据验证配置常量类
*
* @author zhouhao
* @see ConfigKey
*/
public interface ConfigMetadataConstants {

View File

@ -9,6 +9,11 @@ import org.jetlinks.community.utils.TimeUtils;
import java.util.Date;
/**
* 时间反序列化配置
*
* @author zhouhao
*/
public class SmartDateDeserializer extends JsonDeserializer<Date> {
@Override
@SneakyThrows

View File

@ -29,6 +29,11 @@ import java.time.temporal.TemporalAdjusters;
import java.util.function.LongSupplier;
/**
* 时间转换工具
*
* @author zhouhao
*/
public class DateMathParser {

View File

@ -5,17 +5,19 @@ import org.hswebframework.web.exception.NotFoundException;
import reactor.core.publisher.Mono;
/**
* 异常处理工具
*
* @author wangzheng
* @see
* @since 1.0
*/
public class ErrorUtils {
public static <T> Mono<T> notFound(String message){
return Mono.error(()->new NotFoundException(message));
public static <T> Mono<T> notFound(String message) {
return Mono.error(() -> new NotFoundException(message));
}
public static <T> Mono<T> accessDeny(String message){
return Mono.error(()->new AccessDenyException(message));
public static <T> Mono<T> accessDeny(String message) {
return Mono.error(() -> new AccessDenyException(message));
}
}

View File

@ -5,6 +5,11 @@ import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Date;
/**
* 时间工具类
*
* @author zhouhao
*/
public class TimeUtils {

View File

@ -54,7 +54,12 @@ public class DefaultNetworkManager implements NetworkManager, BeanPostProcessor
.subscribe(t -> this.checkNetwork());
}
/**
* 检查网络 把需要加载的网络组件启动起来
*/
protected void checkNetwork() {
// 获取并过滤所有停止的网络组件
// 重新加载启动状态的网络组件
Flux.fromIterable(store.values())
.flatMapIterable(Map::values)
.filter(i -> !i.isAlive())
@ -89,6 +94,14 @@ public class DefaultNetworkManager implements NetworkManager, BeanPostProcessor
.map(n -> (T) n);
}
/**
* 如果store中不存在网络组件就创建存在就重新加载
*
* @param provider 网络组件支持提供商
* @param id 网络组件唯一标识
* @param properties 网络组件配置
* @return 网络组件
*/
public Network doCreate(NetworkProvider<Object> provider, String id, Object properties) {
return getNetworkStore(provider.getType()).compute(id, (s, network) -> {
if (network == null) {

View File

@ -2,6 +2,11 @@ package org.jetlinks.community.network;
import reactor.core.publisher.Mono;
/**
* 网络组件配置管理器
*
* @author zhouhao
*/
public interface NetworkConfigManager {
Mono<NetworkProperties> getConfig(NetworkType networkType, String id);

View File

@ -6,6 +6,11 @@ import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509KeyManager;
import java.security.cert.X509Certificate;
/**
* 证书接口
*
* @author zhouhao
*/
public interface Certificate {
String getId();

View File

@ -2,6 +2,11 @@ package org.jetlinks.community.network.security;
import reactor.core.publisher.Mono;
/**
* 证书管理接口
*
* @author zhouhao
*/
public interface CertificateManager {
Mono<Certificate> getCertificate(String id);

View File

@ -13,13 +13,18 @@ import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
/**
* 默认证书
*
* @author zhouhao
*/
public class DefaultCertificate implements Certificate {
@Getter
private String id;
private final String id;
@Getter
private String name;
private final String name;
private KeyStoreHelper keyHelper;
@ -131,12 +136,12 @@ public class DefaultCertificate implements Certificate {
return EMPTY;
}
return Arrays.stream(trustHelper
.getTrustMgrs())
.filter(X509TrustManager.class::isInstance)
.map(X509TrustManager.class::cast)
.map(X509TrustManager::getAcceptedIssuers)
.flatMap(Stream::of)
.toArray(X509Certificate[]::new);
.getTrustMgrs())
.filter(X509TrustManager.class::isInstance)
.map(X509TrustManager.class::cast)
.map(X509TrustManager::getAcceptedIssuers)
.flatMap(Stream::of)
.toArray(X509Certificate[]::new);
}
@Override

View File

@ -29,27 +29,23 @@ import java.util.function.Function;
@Slf4j
public class VertxTcpClient implements TcpClient {
public volatile NetClient client;
public NetSocket socket;
volatile PayloadParser payloadParser;
@Getter
private final String id;
private final List<Runnable> disconnectListener = new CopyOnWriteArrayList<>();
private final EmitterProcessor<TcpMessage> processor = EmitterProcessor.create(false);
private final FluxSink<TcpMessage> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
private final boolean serverClient;
public volatile NetClient client;
public NetSocket socket;
volatile PayloadParser payloadParser;
@Setter
private long keepAliveTimeoutMs = Duration.ofMinutes(10).toMillis();
private volatile long lastKeepAliveTime = System.currentTimeMillis();
private final List<Runnable> disconnectListener = new CopyOnWriteArrayList<>();
private final EmitterProcessor<TcpMessage> processor = EmitterProcessor.create(false);
private final FluxSink<TcpMessage> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
private final boolean serverClient;
public VertxTcpClient(String id, boolean serverClient) {
this.id = id;
this.serverClient = serverClient;
}
@Override
public void keepAlive() {
@ -68,7 +64,6 @@ public class VertxTcpClient implements TcpClient {
}
}
@Override
public InetSocketAddress address() {
return getRemoteAddress();
@ -77,7 +72,7 @@ public class VertxTcpClient implements TcpClient {
@Override
public Mono<Void> sendMessage(EncodedMessage message) {
return Mono
.<Void>create((sink) -> {
.create((sink) -> {
if (socket == null) {
sink.error(new SocketException("socket closed"));
return;
@ -116,11 +111,11 @@ public class VertxTcpClient implements TcpClient {
return true;
}
public VertxTcpClient(String id,boolean serverClient) {
this.id = id;
this.serverClient=serverClient;
}
/**
* 接收TCP消息
*
* @param message TCP消息
*/
protected void received(TcpMessage message) {
if (processor.getPending() > processor.getBufferSize() / 2) {
log.warn("tcp [{}] message pending {} ,drop message:{}", processor.getPending(), getRemoteAddress(), message.toString());
@ -178,7 +173,7 @@ public class VertxTcpClient implements TcpClient {
execute(runnable);
}
disconnectListener.clear();
if(serverClient){
if (serverClient) {
processor.onComplete();
}
}
@ -191,6 +186,11 @@ public class VertxTcpClient implements TcpClient {
this.client = client;
}
/**
* 设置客户端消息解析器
*
* @param payloadParser 消息解析器
*/
public void setRecordParser(PayloadParser payloadParser) {
synchronized (this) {
if (null != this.payloadParser && this.payloadParser != payloadParser) {
@ -206,6 +206,11 @@ public class VertxTcpClient implements TcpClient {
}
}
/**
* socket处理
*
* @param socket socket
*/
public void setSocket(NetSocket socket) {
synchronized (this) {
Objects.requireNonNull(payloadParser);

View File

@ -137,6 +137,8 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
if (started.getAndSet(true) || disposable != null) {
return;
}
// 从TCPServer中获取连接的client
// client实例化为TcpConnection之后处理消息
disposable = tcpServer
.handleConnection()
.publishOn(Schedulers.parallel())

View File

@ -19,6 +19,11 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
/**
* TCP服务提供商
*
* @author zhouhao
*/
@Component
@Slf4j
public class TcpServerProvider implements NetworkProvider<TcpServerProperties> {
@ -51,16 +56,26 @@ public class TcpServerProvider implements NetworkProvider<TcpServerProperties> {
return tcpServer;
}
/**
* TCP服务初始化
*
* @param tcpServer TCP服务
* @param properties TCP配置
*/
private void initTcpServer(VertxTcpServer tcpServer, TcpServerProperties properties) {
int instance = Math.max(2, properties.getInstance());
List<NetServer> instances = new ArrayList<>(instance);
for (int i = 0; i < instance; i++) {
instances.add(vertx.createNetServer(properties.getOptions()));
}
// 根据解析类型配置数据解析器
payloadParserBuilder.build(properties.getParserType(), properties);
tcpServer.setParserSupplier(() -> payloadParserBuilder.build(properties.getParserType(), properties));
tcpServer.setServer(instances);
tcpServer.setKeepAliveTimeout(properties.getLong("keepAliveTimeout", Duration.ofMinutes(10).toMillis()));
// 针对JVM做的多路复用优化
// 多个server listen同一个端口每个client连接的时候vertx会分配
// 一个connection只能在一个server中处理
for (NetServer netServer : instances) {
netServer.listen(properties.createSocketAddress(), result -> {
if (result.succeeded()) {

View File

@ -26,19 +26,14 @@ import java.util.function.Supplier;
@Slf4j
public class VertxTcpServer implements TcpServer {
Collection<NetServer> tcpServers;
private Supplier<PayloadParser> parserSupplier;
@Setter
private long keepAliveTimeout = Duration.ofMinutes(10).toMillis();
@Getter
private final String id;
private final EmitterProcessor<TcpClient> processor = EmitterProcessor.create(false);
private final FluxSink<TcpClient> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
Collection<NetServer> tcpServers;
private Supplier<PayloadParser> parserSupplier;
@Setter
private long keepAliveTimeout = Duration.ofMinutes(10).toMillis();
public VertxTcpServer(String id) {
this.id = id;
@ -62,6 +57,11 @@ public class VertxTcpServer implements TcpServer {
this.parserSupplier = parserSupplier;
}
/**
* 为每个NetServer添加connectHandler
*
* @param servers 创建的所有NetServer
*/
public void setServer(Collection<NetServer> servers) {
if (this.tcpServers != null && !this.tcpServers.isEmpty()) {
shutdown();
@ -74,24 +74,34 @@ public class VertxTcpServer implements TcpServer {
}
/**
* TCP连接处理逻辑
*
* @param socket socket
*/
protected void acceptTcpConnection(NetSocket socket) {
if (!processor.hasDownstreams()) {
log.warn("not handler for tcp client[{}]", socket.remoteAddress());
socket.close();
return;
}
// 客户端连接处理
VertxTcpClient client = new VertxTcpClient(id + "_" + socket.remoteAddress(), true);
client.setKeepAliveTimeoutMs(keepAliveTimeout);
try {
// TCP异常和关闭处理
socket.exceptionHandler(err -> {
log.error("tcp server client [{}] error", socket.remoteAddress(), err);
}).closeHandler((nil) -> {
log.debug("tcp server client [{}] closed", socket.remoteAddress());
client.shutdown();
});
// 这个地方是在TCP服务初始化的时候设置的 parserSupplier
// set方法 org.jetlinks.community.network.tcp.server.VertxTcpServer.setParserSupplier
// 调用坐标 org.jetlinks.community.network.tcp.server.TcpServerProvider.initTcpServer
client.setRecordParser(parserSupplier.get());
client.setSocket(socket);
// client放进了发射器
sink.next(client);
log.debug("accept tcp client [{}] connection", socket.remoteAddress());
} catch (Exception e) {

View File

@ -3,10 +3,10 @@ package org.jetlinks.community.network.tcp.server;
import io.vertx.core.Vertx;
import io.vertx.core.net.NetServerOptions;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.community.network.tcp.TcpMessage;
import org.jetlinks.community.network.tcp.client.TcpClient;
import org.jetlinks.community.network.tcp.parser.DefaultPayloadParserBuilder;
import org.jetlinks.community.network.tcp.parser.PayloadParserType;
import org.jetlinks.community.network.tcp.TcpMessage;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;