diff --git a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java index 5dcb8ecf..b6790af5 100644 --- a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java +++ b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java @@ -1,15 +1,13 @@ package org.jetlinks.community.network.tcp.device; +import io.netty.buffer.ByteBufUtil; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.jetlinks.core.ProtocolSupport; import org.jetlinks.core.ProtocolSupports; import org.jetlinks.core.device.DeviceOperator; import org.jetlinks.core.device.DeviceRegistry; -import org.jetlinks.core.message.DeviceMessage; -import org.jetlinks.core.message.DeviceOfflineMessage; -import org.jetlinks.core.message.DeviceOnlineMessage; -import org.jetlinks.core.message.Message; +import org.jetlinks.core.message.*; import org.jetlinks.core.message.codec.DefaultTransport; import org.jetlinks.core.message.codec.EncodedMessage; import org.jetlinks.core.message.codec.FromDeviceMessageContext; @@ -23,6 +21,7 @@ import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway; import org.jetlinks.community.network.DefaultNetworkType; import org.jetlinks.community.network.NetworkType; import org.jetlinks.community.network.tcp.server.TcpServer; +import org.jetlinks.core.server.session.KeepOnlineSession; import org.jetlinks.supports.server.DecodedClientMessageHandler; import reactor.core.Disposable; import reactor.core.publisher.EmitterProcessor; @@ -119,10 +118,9 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew counter.decrement(); gatewayMonitor.disconnected(); gatewayMonitor.totalConnection(counter.sum()); - sessionManager.unregister(client.getId()); }); AtomicReference keepaliveTimeout = new AtomicReference<>(); - + DeviceSession session = sessionManager.getSession(client.getId()); return client.subscribe() .filter(r -> started.get()) .doOnNext(r -> gatewayMonitor.receivedMessage()) @@ -136,7 +134,6 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew @Override public DeviceSession getSession() { - DeviceSession session = sessionManager.getSession(client.getId()); //session还未注册 if (session == null) { return new UnknownTcpDeviceSession(client.getId(), client, getTransport()) { @@ -158,39 +155,70 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew public DeviceOperator getDevice() { return null; } - }))) - .cast(DeviceMessage.class) - .flatMap(message -> registry - .getDevice(message.getDeviceId()) - .flatMap(device -> { - //设备上线 - if (message instanceof DeviceOnlineMessage) { - TcpDeviceSession session = new TcpDeviceSession(client.getId(), device, client, getTransport()) { - @Override - public Mono send(EncodedMessage encodedMessage) { - return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage()); - } - }; - if (keepaliveTimeout.get() != null) { - session.setKeepAliveTimeout(keepaliveTimeout.get()); - } - sessionManager.register(session); - return Mono.empty(); - } - //设备下线 - if (message instanceof DeviceOfflineMessage) { - sessionManager.unregister(device.getDeviceId()); - return Mono.empty(); - } - if (processor.hasDownstreams()) { - sink.next(message); - } - return clientMessageHandler.handleMessage(device, message); })) - .onErrorContinue((err, o) -> log.error(err.getMessage(), err)); + .switchIfEmpty(Mono.fromRunnable(() -> + log.warn("无法识别的TCP客户端[{}]消息:[{}]", + client.getRemoteAddress(), + ByteBufUtil.hexDump(tcpMessage.getPayload()) + ))) + .cast(DeviceMessage.class) + .flatMap(message -> registry + .getDevice(message.getDeviceId()) + .switchIfEmpty(Mono.fromRunnable(() -> { + log.warn("设备[{}]未注册,TCP[{}]消息:[{}],设备消息:{}", + message.getDeviceId(), + client.getRemoteAddress(), + ByteBufUtil.hexDump(tcpMessage.getPayload()), + message + ); + })) + .flatMap(device -> { + //处理设备上线消息 + if (message instanceof DeviceOnlineMessage) { + DeviceSession fSession = session == null ? + sessionManager.getSession(device.getDeviceId()) : + session; + + if (fSession == null) { + fSession = new TcpDeviceSession(client.getId(), device, client, getTransport()) { + @Override + public Mono send(EncodedMessage encodedMessage) { + return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage()); + } + }; + //保持设备一直在线.(通过短连接上报数据的场景.可以让设备一直为在线状态) + if (message.getHeader(Headers.keepOnline).orElse(false)) { + fSession = new KeepOnlineSession(fSession, Duration.ofMillis(-1)); + } else { + client.onDisconnect(() -> sessionManager.unregister(client.getId())); + } + sessionManager.register(fSession); + } + if (keepaliveTimeout.get() != null) { + fSession.setKeepAliveTimeout(keepaliveTimeout.get()); + } + return Mono.empty(); + } + //设备下线 + if (message instanceof DeviceOfflineMessage) { + sessionManager.unregister(device.getDeviceId()); + return Mono.empty(); + } + message.addHeaderIfAbsent(Headers.clientAddress, client.getRemoteAddress().toString()); + + if (processor.hasDownstreams()) { + sink.next(message); + } + return clientMessageHandler.handleMessage(device, message); + })) + .onErrorContinue((err, o) -> + log.error("处理TCP[{}]消息[{}]失败", + client.getRemoteAddress(), + ByteBufUtil.hexDump(tcpMessage.getPayload()) + , err)) + ); }).subscribe()); } - @Override public Flux onMessage() { return processor.map(Function.identity());