优化网关
This commit is contained in:
parent
8663d01be8
commit
539702caa6
|
|
@ -30,6 +30,7 @@ import reactor.core.publisher.EmitterProcessor;
|
|||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.FluxSink;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
import reactor.util.function.Tuple3;
|
||||
import reactor.util.function.Tuples;
|
||||
|
||||
|
|
@ -82,11 +83,12 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
|
|||
return counter.sum();
|
||||
}
|
||||
|
||||
|
||||
private void doStart() {
|
||||
if (started.getAndSet(true) || disposable != null) {
|
||||
return;
|
||||
}
|
||||
disposable = mqttServer
|
||||
disposable = (Disposable) mqttServer
|
||||
.handleConnection()
|
||||
.filter(conn -> {
|
||||
if (!started.get()) {
|
||||
|
|
@ -95,11 +97,14 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
|
|||
}
|
||||
return started.get();
|
||||
})
|
||||
|
||||
.publishOn(Schedulers.parallel())
|
||||
.flatMap(this::handleConnection)
|
||||
.flatMap(tuple3 -> handleAuthResponse(tuple3.getT1(), tuple3.getT2(), tuple3.getT3()))
|
||||
.flatMap(tp -> handleAcceptedMqttConnection(tp.getT1(), tp.getT2(), tp.getT3()) , Integer.MAX_VALUE)
|
||||
.onErrorContinue((err, obj) -> log.error("处理MQTT连接失败", err))
|
||||
.subscriberContext(ReactiveLogger.start("network", mqttServer.getId()))
|
||||
.subscribe(tp -> handleAcceptedMqttConnection(tp.getT1(), tp.getT2(), tp.getT3()));
|
||||
.subscribe();
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -180,11 +185,16 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
|
|||
}
|
||||
|
||||
//处理已经建立连接的MQTT连接
|
||||
private void handleAcceptedMqttConnection(MqttConnection connection, DeviceOperator operator, DeviceSession session) {
|
||||
private Mono<Void> handleAcceptedMqttConnection(MqttConnection connection, DeviceOperator operator, DeviceSession session) {
|
||||
|
||||
connection.handleMessage()
|
||||
return connection
|
||||
.handleMessage()
|
||||
.filter(pb -> started.get())
|
||||
.takeWhile(pub -> disposable != null)
|
||||
.doOnCancel(() -> {
|
||||
//流被取消时(可能网关关闭了)断开连接
|
||||
connection.close().subscribe();
|
||||
})
|
||||
.publishOn(Schedulers.parallel())
|
||||
.doOnNext(msg -> gatewayMonitor.receivedMessage())
|
||||
.flatMap(publishing ->
|
||||
this.decodeAndHandleMessage(operator, session, publishing.getMessage(), connection)
|
||||
|
|
@ -197,7 +207,7 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
|
|||
.flatMap(mqttMessage -> this.decodeAndHandleMessage(operator, session, mqttMessage, connection))
|
||||
)
|
||||
.subscriberContext(ReactiveLogger.start("network", mqttServer.getId()))
|
||||
.subscribe();
|
||||
.then();
|
||||
}
|
||||
|
||||
//解码消息并处理
|
||||
|
|
@ -221,7 +231,6 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate
|
|||
sink.next(msg);
|
||||
}
|
||||
String deviceId = msg.getDeviceId();
|
||||
|
||||
//返回了其他设备的消息,则自动创建会话
|
||||
if (!deviceId.equals(operator.getDeviceId())) {
|
||||
DeviceSession anotherSession = sessionManager.getSession(msg.getDeviceId());
|
||||
|
|
|
|||
|
|
@ -1,12 +1,19 @@
|
|||
package org.jetlinks.community.network.tcp.device;
|
||||
|
||||
import io.netty.buffer.ByteBufUtil;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.hswebframework.web.logger.ReactiveLogger;
|
||||
import org.jetlinks.community.gateway.DeviceGateway;
|
||||
import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
|
||||
import org.jetlinks.community.gateway.monitor.GatewayMonitors;
|
||||
import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway;
|
||||
import org.jetlinks.community.network.DefaultNetworkType;
|
||||
import org.jetlinks.community.network.NetworkType;
|
||||
import org.jetlinks.community.network.tcp.TcpMessage;
|
||||
import org.jetlinks.community.network.tcp.client.TcpClient;
|
||||
import org.jetlinks.community.network.tcp.server.TcpServer;
|
||||
import org.jetlinks.core.ProtocolSupport;
|
||||
import org.jetlinks.core.ProtocolSupports;
|
||||
import org.jetlinks.core.device.DeviceOperator;
|
||||
import org.jetlinks.core.device.DeviceRegistry;
|
||||
import org.jetlinks.core.message.*;
|
||||
import org.jetlinks.core.message.codec.DefaultTransport;
|
||||
|
|
@ -15,13 +22,6 @@ import org.jetlinks.core.message.codec.FromDeviceMessageContext;
|
|||
import org.jetlinks.core.message.codec.Transport;
|
||||
import org.jetlinks.core.server.session.DeviceSession;
|
||||
import org.jetlinks.core.server.session.DeviceSessionManager;
|
||||
import org.jetlinks.community.gateway.DeviceGateway;
|
||||
import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
|
||||
import org.jetlinks.community.gateway.monitor.GatewayMonitors;
|
||||
import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway;
|
||||
import org.jetlinks.community.network.DefaultNetworkType;
|
||||
import org.jetlinks.community.network.NetworkType;
|
||||
import org.jetlinks.community.network.tcp.server.TcpServer;
|
||||
import org.jetlinks.core.server.session.KeepOnlineSession;
|
||||
import org.jetlinks.supports.server.DecodedClientMessageHandler;
|
||||
import reactor.core.Disposable;
|
||||
|
|
@ -29,12 +29,11 @@ import reactor.core.publisher.EmitterProcessor;
|
|||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.FluxSink;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
|
|
@ -68,7 +67,7 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|||
|
||||
private final AtomicBoolean started = new AtomicBoolean();
|
||||
|
||||
private final List<Disposable> disposable = new CopyOnWriteArrayList<>();
|
||||
private Disposable disposable;
|
||||
|
||||
public TcpServerDeviceGateway(String id,
|
||||
String protocol,
|
||||
|
|
@ -107,129 +106,149 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|||
return DefaultNetworkType.TCP_SERVER;
|
||||
}
|
||||
|
||||
private void doStart() {
|
||||
if (started.getAndSet(true) || !disposable.isEmpty()) {
|
||||
return;
|
||||
|
||||
class TcpConnection {
|
||||
final TcpClient client;
|
||||
final AtomicReference<Duration> keepaliveTimeout = new AtomicReference<>();
|
||||
final AtomicReference<DeviceSession> sessionRef = new AtomicReference<>();
|
||||
final InetSocketAddress address;
|
||||
|
||||
TcpConnection(TcpClient client) {
|
||||
this.client = client;
|
||||
this.address = client.getRemoteAddress();
|
||||
gatewayMonitor.totalConnection(counter.sum());
|
||||
client.onDisconnect(() -> {
|
||||
counter.decrement();
|
||||
gatewayMonitor.disconnected();
|
||||
gatewayMonitor.totalConnection(counter.sum());
|
||||
});
|
||||
|
||||
DeviceSession session = sessionManager.getSession(client.getId());
|
||||
if (session == null) {
|
||||
session = new UnknownTcpDeviceSession(client.getId(), client, getTransport()) {
|
||||
@Override
|
||||
public Mono<Boolean> send(EncodedMessage encodedMessage) {
|
||||
return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setKeepAliveTimeout(Duration timeout) {
|
||||
keepaliveTimeout.set(timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<InetSocketAddress> getClientAddress() {
|
||||
return Optional.of(address);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
sessionRef.set(session);
|
||||
|
||||
}
|
||||
|
||||
disposable.add(tcpServer
|
||||
Mono<Void> accept() {
|
||||
return client
|
||||
.subscribe()
|
||||
.filter(tcp -> started.get())
|
||||
.doOnCancel(client::shutdown)
|
||||
.flatMap(this::handleTcpMessage)
|
||||
.onErrorContinue((err, ignore) -> log.error(err.getMessage(), err))
|
||||
.then();
|
||||
}
|
||||
|
||||
Mono<Void> handleTcpMessage(TcpMessage message) {
|
||||
return getProtocol()
|
||||
.flatMap(pt -> pt.getMessageCodec(getTransport()))
|
||||
.flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(sessionRef.get(), message)))
|
||||
.cast(DeviceMessage.class)
|
||||
.flatMap(this::handleDeviceMessage)
|
||||
.doOnEach(ReactiveLogger.onError(err ->
|
||||
log.error("处理TCP[{}]消息失败:\n{}",
|
||||
address,
|
||||
message
|
||||
, err)))
|
||||
.onErrorResume((err) -> Mono.fromRunnable(client::reset))
|
||||
.then();
|
||||
}
|
||||
|
||||
Mono<Void> handleDeviceMessage(DeviceMessage message) {
|
||||
return registry
|
||||
.getDevice(message.getDeviceId())
|
||||
.switchIfEmpty(Mono.defer(() -> {
|
||||
if (processor.hasDownstreams()) {
|
||||
sink.next(message);
|
||||
}
|
||||
if (message instanceof DeviceRegisterMessage) {
|
||||
return clientMessageHandler
|
||||
.handleMessage(null, message)
|
||||
.then(Mono.empty());
|
||||
} else {
|
||||
log.warn("无法从tcp[{}]消息中获取设备信息:{}",address, message);
|
||||
return Mono.empty();
|
||||
}
|
||||
}))
|
||||
.flatMap(device -> {
|
||||
DeviceSession fSession = sessionManager.getSession(device.getDeviceId());
|
||||
//处理设备上线消息
|
||||
if (message instanceof DeviceOnlineMessage) {
|
||||
if (fSession == null) {
|
||||
boolean keepOnline = message.getHeader(Headers.keepOnline).orElse(false);
|
||||
String sessionId = device.getDeviceId();
|
||||
fSession = new TcpDeviceSession(sessionId, device, client, getTransport()) {
|
||||
@Override
|
||||
public Mono<Boolean> send(EncodedMessage encodedMessage) {
|
||||
return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage());
|
||||
}
|
||||
};
|
||||
//保持设备一直在线.(短连接上报数据的场景.可以让设备一直为在线状态)
|
||||
if (keepOnline) {
|
||||
fSession = new KeepOnlineSession(fSession, Duration.ofMillis(-1));
|
||||
} else {
|
||||
client.onDisconnect(() -> sessionManager.unregister(device.getDeviceId()));
|
||||
}
|
||||
sessionRef.set(fSession);
|
||||
sessionManager.register(fSession);
|
||||
}
|
||||
fSession.keepAlive();
|
||||
if (keepaliveTimeout.get() != null) {
|
||||
fSession.setKeepAliveTimeout(keepaliveTimeout.get());
|
||||
}
|
||||
return Mono.empty();
|
||||
}
|
||||
if (fSession != null) {
|
||||
fSession.keepAlive();
|
||||
}
|
||||
//设备下线
|
||||
if (message instanceof DeviceOfflineMessage) {
|
||||
sessionManager.unregister(device.getDeviceId());
|
||||
return Mono.empty();
|
||||
}
|
||||
message.addHeaderIfAbsent(Headers.clientAddress, String.valueOf(address));
|
||||
if (processor.hasDownstreams()) {
|
||||
sink.next(message);
|
||||
}
|
||||
return clientMessageHandler.handleMessage(device, message);
|
||||
})
|
||||
.then()
|
||||
;
|
||||
}
|
||||
}
|
||||
|
||||
private void doStart() {
|
||||
if (started.getAndSet(true) || disposable != null) {
|
||||
return;
|
||||
}
|
||||
disposable = tcpServer
|
||||
.handleConnection()
|
||||
.subscribe(client -> {
|
||||
InetSocketAddress clientAddr = client.getRemoteAddress();
|
||||
counter.increment();
|
||||
gatewayMonitor.totalConnection(counter.intValue());
|
||||
client.onDisconnect(() -> {
|
||||
counter.decrement();
|
||||
gatewayMonitor.disconnected();
|
||||
gatewayMonitor.totalConnection(counter.sum());
|
||||
});
|
||||
AtomicReference<Duration> keepaliveTimeout = new AtomicReference<>();
|
||||
AtomicReference<DeviceSession> sessionRef = new AtomicReference<>(sessionManager.getSession(client.getId()));
|
||||
client.subscribe()
|
||||
.filter(r -> started.get())
|
||||
.takeWhile(r -> !disposable.isEmpty())
|
||||
.doOnNext(r -> {
|
||||
log.debug("收到TCP报文:\n{}", r);
|
||||
gatewayMonitor.receivedMessage();
|
||||
})
|
||||
.flatMap(tcpMessage -> getProtocol()
|
||||
.flatMap(pt -> pt.getMessageCodec(getTransport()))
|
||||
.flatMapMany(codec -> codec.decode(new FromDeviceMessageContext() {
|
||||
@Override
|
||||
@Nonnull
|
||||
public EncodedMessage getMessage() {
|
||||
return tcpMessage;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeviceSession getSession() {
|
||||
//session还未注册
|
||||
if (sessionRef.get() == null) {
|
||||
return new UnknownTcpDeviceSession(client.getId(), client, getTransport()) {
|
||||
@Override
|
||||
public Mono<Boolean> send(EncodedMessage encodedMessage) {
|
||||
return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setKeepAliveTimeout(Duration timeout) {
|
||||
keepaliveTimeout.set(timeout);
|
||||
}
|
||||
};
|
||||
}
|
||||
return sessionRef.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeviceOperator getDevice() {
|
||||
return getSession().getOperator();
|
||||
}
|
||||
}))
|
||||
.cast(DeviceMessage.class)
|
||||
.flatMap(message -> registry
|
||||
.getDevice(message.getDeviceId())
|
||||
.switchIfEmpty(Mono.fromRunnable(() -> {
|
||||
log.warn("设备[{}]未注册,TCP[{}]消息:[{}],设备消息:{}",
|
||||
message.getDeviceId(),
|
||||
clientAddr,
|
||||
ByteBufUtil.hexDump(tcpMessage.getPayload()),
|
||||
message
|
||||
);
|
||||
}))
|
||||
.flatMap(device -> {
|
||||
DeviceSession fSession = sessionManager.getSession(device.getDeviceId());
|
||||
//处理设备上线消息
|
||||
if (message instanceof DeviceOnlineMessage) {
|
||||
if (fSession == null) {
|
||||
boolean keepOnline = message.getHeader(Headers.keepOnline).orElse(false);
|
||||
String sessionId = device.getDeviceId();
|
||||
fSession = new TcpDeviceSession(sessionId, device, client, getTransport()) {
|
||||
@Override
|
||||
public Mono<Boolean> send(EncodedMessage encodedMessage) {
|
||||
return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage());
|
||||
}
|
||||
};
|
||||
//保持设备一直在线.(短连接上报数据的场景.可以让设备一直为在线状态)
|
||||
if (keepOnline) {
|
||||
fSession = new KeepOnlineSession(fSession, Duration.ofMillis(-1));
|
||||
} else {
|
||||
client.onDisconnect(() -> sessionManager.unregister(device.getDeviceId()));
|
||||
}
|
||||
sessionRef.set(fSession);
|
||||
sessionManager.register(fSession);
|
||||
}
|
||||
fSession.keepAlive();
|
||||
if (keepaliveTimeout.get() != null) {
|
||||
fSession.setKeepAliveTimeout(keepaliveTimeout.get());
|
||||
}
|
||||
return Mono.empty();
|
||||
}
|
||||
if (fSession != null) {
|
||||
fSession.keepAlive();
|
||||
}
|
||||
//设备下线
|
||||
if (message instanceof DeviceOfflineMessage) {
|
||||
sessionManager.unregister(device.getDeviceId());
|
||||
return Mono.empty();
|
||||
}
|
||||
message.addHeaderIfAbsent(Headers.clientAddress, String.valueOf(clientAddr));
|
||||
|
||||
if (processor.hasDownstreams()) {
|
||||
sink.next(message);
|
||||
}
|
||||
return clientMessageHandler.handleMessage(device, message);
|
||||
}))
|
||||
.doOnEach(ReactiveLogger.onError(err ->
|
||||
log.error("处理TCP[{}]消息失败:\n{}",
|
||||
clientAddr,
|
||||
tcpMessage
|
||||
, err)))
|
||||
.onErrorResume((err) -> Mono.fromRunnable(client::reset))
|
||||
)
|
||||
.onErrorResume((err) -> Mono.fromRunnable(client::reset))
|
||||
.subscriberContext(ReactiveLogger.start("network", tcpServer.getId()))
|
||||
.subscribe();
|
||||
}));
|
||||
.publishOn(Schedulers.parallel())
|
||||
.flatMap(client -> new TcpConnection(client).accept(), Integer.MAX_VALUE)
|
||||
.subscriberContext(ReactiveLogger.start("network", tcpServer.getId()))
|
||||
.subscribe(
|
||||
ignore -> {
|
||||
},
|
||||
error -> log.error(error.getMessage(), error)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -251,10 +270,8 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
|
|||
public Mono<Void> shutdown() {
|
||||
return Mono.fromRunnable(() -> {
|
||||
started.set(false);
|
||||
|
||||
disposable.forEach(Disposable::dispose);
|
||||
|
||||
disposable.clear();
|
||||
disposable.dispose();
|
||||
disposable = null;
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue