增加保持设备在线功能

This commit is contained in:
zhouhao 2020-03-20 23:48:00 +08:00
parent 7bf9fe8214
commit b04bb10329
1 changed files with 65 additions and 37 deletions

View File

@ -1,15 +1,13 @@
package org.jetlinks.community.network.tcp.device;
import io.netty.buffer.ByteBufUtil;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.ProtocolSupports;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceOfflineMessage;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.*;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.FromDeviceMessageContext;
@ -23,6 +21,7 @@ import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkType;
import org.jetlinks.community.network.tcp.server.TcpServer;
import org.jetlinks.core.server.session.KeepOnlineSession;
import org.jetlinks.supports.server.DecodedClientMessageHandler;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
@ -119,10 +118,9 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
counter.decrement();
gatewayMonitor.disconnected();
gatewayMonitor.totalConnection(counter.sum());
sessionManager.unregister(client.getId());
});
AtomicReference<Duration> keepaliveTimeout = new AtomicReference<>();
DeviceSession session = sessionManager.getSession(client.getId());
return client.subscribe()
.filter(r -> started.get())
.doOnNext(r -> gatewayMonitor.receivedMessage())
@ -136,7 +134,6 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
@Override
public DeviceSession getSession() {
DeviceSession session = sessionManager.getSession(client.getId());
//session还未注册
if (session == null) {
return new UnknownTcpDeviceSession(client.getId(), client, getTransport()) {
@ -158,39 +155,70 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
public DeviceOperator getDevice() {
return null;
}
})))
.cast(DeviceMessage.class)
.flatMap(message -> registry
.getDevice(message.getDeviceId())
.flatMap(device -> {
//设备上线
if (message instanceof DeviceOnlineMessage) {
TcpDeviceSession session = new TcpDeviceSession(client.getId(), device, client, getTransport()) {
@Override
public Mono<Boolean> send(EncodedMessage encodedMessage) {
return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage());
}
};
if (keepaliveTimeout.get() != null) {
session.setKeepAliveTimeout(keepaliveTimeout.get());
}
sessionManager.register(session);
return Mono.empty();
}
//设备下线
if (message instanceof DeviceOfflineMessage) {
sessionManager.unregister(device.getDeviceId());
return Mono.empty();
}
if (processor.hasDownstreams()) {
sink.next(message);
}
return clientMessageHandler.handleMessage(device, message);
}))
.onErrorContinue((err, o) -> log.error(err.getMessage(), err));
.switchIfEmpty(Mono.fromRunnable(() ->
log.warn("无法识别的TCP客户端[{}]消息:[{}]",
client.getRemoteAddress(),
ByteBufUtil.hexDump(tcpMessage.getPayload())
)))
.cast(DeviceMessage.class)
.flatMap(message -> registry
.getDevice(message.getDeviceId())
.switchIfEmpty(Mono.fromRunnable(() -> {
log.warn("设备[{}]未注册,TCP[{}]消息:[{}],设备消息:{}",
message.getDeviceId(),
client.getRemoteAddress(),
ByteBufUtil.hexDump(tcpMessage.getPayload()),
message
);
}))
.flatMap(device -> {
//处理设备上线消息
if (message instanceof DeviceOnlineMessage) {
DeviceSession fSession = session == null ?
sessionManager.getSession(device.getDeviceId()) :
session;
if (fSession == null) {
fSession = new TcpDeviceSession(client.getId(), device, client, getTransport()) {
@Override
public Mono<Boolean> send(EncodedMessage encodedMessage) {
return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage());
}
};
//保持设备一直在线.通过短连接上报数据的场景.可以让设备一直为在线状态
if (message.getHeader(Headers.keepOnline).orElse(false)) {
fSession = new KeepOnlineSession(fSession, Duration.ofMillis(-1));
} else {
client.onDisconnect(() -> sessionManager.unregister(client.getId()));
}
sessionManager.register(fSession);
}
if (keepaliveTimeout.get() != null) {
fSession.setKeepAliveTimeout(keepaliveTimeout.get());
}
return Mono.empty();
}
//设备下线
if (message instanceof DeviceOfflineMessage) {
sessionManager.unregister(device.getDeviceId());
return Mono.empty();
}
message.addHeaderIfAbsent(Headers.clientAddress, client.getRemoteAddress().toString());
if (processor.hasDownstreams()) {
sink.next(message);
}
return clientMessageHandler.handleMessage(device, message);
}))
.onErrorContinue((err, o) ->
log.error("处理TCP[{}]消息[{}]失败",
client.getRemoteAddress(),
ByteBufUtil.hexDump(tcpMessage.getPayload())
, err))
);
}).subscribe());
}
@Override
public Flux<Message> onMessage() {
return processor.map(Function.identity());