From 539702caa6ae92f088d9187bbd45181596c4efc3 Mon Sep 17 00:00:00 2001 From: zhouhao Date: Thu, 30 Jul 2020 21:51:20 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=BD=91=E5=85=B3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../device/MqttServerDeviceGateway.java | 23 +- .../tcp/device/TcpServerDeviceGateway.java | 291 +++++++++--------- 2 files changed, 170 insertions(+), 144 deletions(-) diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java index 3a9a272c..ab656d5c 100644 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java @@ -30,6 +30,7 @@ import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuple3; import reactor.util.function.Tuples; @@ -82,11 +83,12 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate return counter.sum(); } + private void doStart() { if (started.getAndSet(true) || disposable != null) { return; } - disposable = mqttServer + disposable = (Disposable) mqttServer .handleConnection() .filter(conn -> { if (!started.get()) { @@ -95,11 +97,14 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate } return started.get(); }) + + .publishOn(Schedulers.parallel()) .flatMap(this::handleConnection) .flatMap(tuple3 -> handleAuthResponse(tuple3.getT1(), tuple3.getT2(), tuple3.getT3())) + .flatMap(tp -> handleAcceptedMqttConnection(tp.getT1(), tp.getT2(), tp.getT3()) , Integer.MAX_VALUE) .onErrorContinue((err, obj) -> log.error("处理MQTT连接失败", err)) .subscriberContext(ReactiveLogger.start("network", mqttServer.getId())) - .subscribe(tp -> handleAcceptedMqttConnection(tp.getT1(), tp.getT2(), tp.getT3())); + .subscribe(); } @@ -180,11 +185,16 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate } //处理已经建立连接的MQTT连接 - private void handleAcceptedMqttConnection(MqttConnection connection, DeviceOperator operator, DeviceSession session) { + private Mono handleAcceptedMqttConnection(MqttConnection connection, DeviceOperator operator, DeviceSession session) { - connection.handleMessage() + return connection + .handleMessage() .filter(pb -> started.get()) - .takeWhile(pub -> disposable != null) + .doOnCancel(() -> { + //流被取消时(可能网关关闭了)断开连接 + connection.close().subscribe(); + }) + .publishOn(Schedulers.parallel()) .doOnNext(msg -> gatewayMonitor.receivedMessage()) .flatMap(publishing -> this.decodeAndHandleMessage(operator, session, publishing.getMessage(), connection) @@ -197,7 +207,7 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate .flatMap(mqttMessage -> this.decodeAndHandleMessage(operator, session, mqttMessage, connection)) ) .subscriberContext(ReactiveLogger.start("network", mqttServer.getId())) - .subscribe(); + .then(); } //解码消息并处理 @@ -221,7 +231,6 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate sink.next(msg); } String deviceId = msg.getDeviceId(); - //返回了其他设备的消息,则自动创建会话 if (!deviceId.equals(operator.getDeviceId())) { DeviceSession anotherSession = sessionManager.getSession(msg.getDeviceId()); diff --git a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java index 7c494389..8b3cfff2 100644 --- a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java +++ b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/device/TcpServerDeviceGateway.java @@ -1,12 +1,19 @@ package org.jetlinks.community.network.tcp.device; -import io.netty.buffer.ByteBufUtil; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.hswebframework.web.logger.ReactiveLogger; +import org.jetlinks.community.gateway.DeviceGateway; +import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor; +import org.jetlinks.community.gateway.monitor.GatewayMonitors; +import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway; +import org.jetlinks.community.network.DefaultNetworkType; +import org.jetlinks.community.network.NetworkType; +import org.jetlinks.community.network.tcp.TcpMessage; +import org.jetlinks.community.network.tcp.client.TcpClient; +import org.jetlinks.community.network.tcp.server.TcpServer; import org.jetlinks.core.ProtocolSupport; import org.jetlinks.core.ProtocolSupports; -import org.jetlinks.core.device.DeviceOperator; import org.jetlinks.core.device.DeviceRegistry; import org.jetlinks.core.message.*; import org.jetlinks.core.message.codec.DefaultTransport; @@ -15,13 +22,6 @@ import org.jetlinks.core.message.codec.FromDeviceMessageContext; import org.jetlinks.core.message.codec.Transport; import org.jetlinks.core.server.session.DeviceSession; import org.jetlinks.core.server.session.DeviceSessionManager; -import org.jetlinks.community.gateway.DeviceGateway; -import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor; -import org.jetlinks.community.gateway.monitor.GatewayMonitors; -import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway; -import org.jetlinks.community.network.DefaultNetworkType; -import org.jetlinks.community.network.NetworkType; -import org.jetlinks.community.network.tcp.server.TcpServer; import org.jetlinks.core.server.session.KeepOnlineSession; import org.jetlinks.supports.server.DecodedClientMessageHandler; import reactor.core.Disposable; @@ -29,12 +29,11 @@ import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; -import javax.annotation.Nonnull; import java.net.InetSocketAddress; import java.time.Duration; -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; @@ -68,7 +67,7 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew private final AtomicBoolean started = new AtomicBoolean(); - private final List disposable = new CopyOnWriteArrayList<>(); + private Disposable disposable; public TcpServerDeviceGateway(String id, String protocol, @@ -107,129 +106,149 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew return DefaultNetworkType.TCP_SERVER; } - private void doStart() { - if (started.getAndSet(true) || !disposable.isEmpty()) { - return; + + class TcpConnection { + final TcpClient client; + final AtomicReference keepaliveTimeout = new AtomicReference<>(); + final AtomicReference sessionRef = new AtomicReference<>(); + final InetSocketAddress address; + + TcpConnection(TcpClient client) { + this.client = client; + this.address = client.getRemoteAddress(); + gatewayMonitor.totalConnection(counter.sum()); + client.onDisconnect(() -> { + counter.decrement(); + gatewayMonitor.disconnected(); + gatewayMonitor.totalConnection(counter.sum()); + }); + + DeviceSession session = sessionManager.getSession(client.getId()); + if (session == null) { + session = new UnknownTcpDeviceSession(client.getId(), client, getTransport()) { + @Override + public Mono send(EncodedMessage encodedMessage) { + return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage()); + } + + @Override + public void setKeepAliveTimeout(Duration timeout) { + keepaliveTimeout.set(timeout); + } + + @Override + public Optional getClientAddress() { + return Optional.of(address); + } + }; + } + + sessionRef.set(session); + } - disposable.add(tcpServer + Mono accept() { + return client + .subscribe() + .filter(tcp -> started.get()) + .doOnCancel(client::shutdown) + .flatMap(this::handleTcpMessage) + .onErrorContinue((err, ignore) -> log.error(err.getMessage(), err)) + .then(); + } + + Mono handleTcpMessage(TcpMessage message) { + return getProtocol() + .flatMap(pt -> pt.getMessageCodec(getTransport())) + .flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(sessionRef.get(), message))) + .cast(DeviceMessage.class) + .flatMap(this::handleDeviceMessage) + .doOnEach(ReactiveLogger.onError(err -> + log.error("处理TCP[{}]消息失败:\n{}", + address, + message + , err))) + .onErrorResume((err) -> Mono.fromRunnable(client::reset)) + .then(); + } + + Mono handleDeviceMessage(DeviceMessage message) { + return registry + .getDevice(message.getDeviceId()) + .switchIfEmpty(Mono.defer(() -> { + if (processor.hasDownstreams()) { + sink.next(message); + } + if (message instanceof DeviceRegisterMessage) { + return clientMessageHandler + .handleMessage(null, message) + .then(Mono.empty()); + } else { + log.warn("无法从tcp[{}]消息中获取设备信息:{}",address, message); + return Mono.empty(); + } + })) + .flatMap(device -> { + DeviceSession fSession = sessionManager.getSession(device.getDeviceId()); + //处理设备上线消息 + if (message instanceof DeviceOnlineMessage) { + if (fSession == null) { + boolean keepOnline = message.getHeader(Headers.keepOnline).orElse(false); + String sessionId = device.getDeviceId(); + fSession = new TcpDeviceSession(sessionId, device, client, getTransport()) { + @Override + public Mono send(EncodedMessage encodedMessage) { + return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage()); + } + }; + //保持设备一直在线.(短连接上报数据的场景.可以让设备一直为在线状态) + if (keepOnline) { + fSession = new KeepOnlineSession(fSession, Duration.ofMillis(-1)); + } else { + client.onDisconnect(() -> sessionManager.unregister(device.getDeviceId())); + } + sessionRef.set(fSession); + sessionManager.register(fSession); + } + fSession.keepAlive(); + if (keepaliveTimeout.get() != null) { + fSession.setKeepAliveTimeout(keepaliveTimeout.get()); + } + return Mono.empty(); + } + if (fSession != null) { + fSession.keepAlive(); + } + //设备下线 + if (message instanceof DeviceOfflineMessage) { + sessionManager.unregister(device.getDeviceId()); + return Mono.empty(); + } + message.addHeaderIfAbsent(Headers.clientAddress, String.valueOf(address)); + if (processor.hasDownstreams()) { + sink.next(message); + } + return clientMessageHandler.handleMessage(device, message); + }) + .then() + ; + } + } + + private void doStart() { + if (started.getAndSet(true) || disposable != null) { + return; + } + disposable = tcpServer .handleConnection() - .subscribe(client -> { - InetSocketAddress clientAddr = client.getRemoteAddress(); - counter.increment(); - gatewayMonitor.totalConnection(counter.intValue()); - client.onDisconnect(() -> { - counter.decrement(); - gatewayMonitor.disconnected(); - gatewayMonitor.totalConnection(counter.sum()); - }); - AtomicReference keepaliveTimeout = new AtomicReference<>(); - AtomicReference sessionRef = new AtomicReference<>(sessionManager.getSession(client.getId())); - client.subscribe() - .filter(r -> started.get()) - .takeWhile(r -> !disposable.isEmpty()) - .doOnNext(r -> { - log.debug("收到TCP报文:\n{}", r); - gatewayMonitor.receivedMessage(); - }) - .flatMap(tcpMessage -> getProtocol() - .flatMap(pt -> pt.getMessageCodec(getTransport())) - .flatMapMany(codec -> codec.decode(new FromDeviceMessageContext() { - @Override - @Nonnull - public EncodedMessage getMessage() { - return tcpMessage; - } - - @Override - public DeviceSession getSession() { - //session还未注册 - if (sessionRef.get() == null) { - return new UnknownTcpDeviceSession(client.getId(), client, getTransport()) { - @Override - public Mono send(EncodedMessage encodedMessage) { - return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage()); - } - - @Override - public void setKeepAliveTimeout(Duration timeout) { - keepaliveTimeout.set(timeout); - } - }; - } - return sessionRef.get(); - } - - @Override - public DeviceOperator getDevice() { - return getSession().getOperator(); - } - })) - .cast(DeviceMessage.class) - .flatMap(message -> registry - .getDevice(message.getDeviceId()) - .switchIfEmpty(Mono.fromRunnable(() -> { - log.warn("设备[{}]未注册,TCP[{}]消息:[{}],设备消息:{}", - message.getDeviceId(), - clientAddr, - ByteBufUtil.hexDump(tcpMessage.getPayload()), - message - ); - })) - .flatMap(device -> { - DeviceSession fSession = sessionManager.getSession(device.getDeviceId()); - //处理设备上线消息 - if (message instanceof DeviceOnlineMessage) { - if (fSession == null) { - boolean keepOnline = message.getHeader(Headers.keepOnline).orElse(false); - String sessionId = device.getDeviceId(); - fSession = new TcpDeviceSession(sessionId, device, client, getTransport()) { - @Override - public Mono send(EncodedMessage encodedMessage) { - return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage()); - } - }; - //保持设备一直在线.(短连接上报数据的场景.可以让设备一直为在线状态) - if (keepOnline) { - fSession = new KeepOnlineSession(fSession, Duration.ofMillis(-1)); - } else { - client.onDisconnect(() -> sessionManager.unregister(device.getDeviceId())); - } - sessionRef.set(fSession); - sessionManager.register(fSession); - } - fSession.keepAlive(); - if (keepaliveTimeout.get() != null) { - fSession.setKeepAliveTimeout(keepaliveTimeout.get()); - } - return Mono.empty(); - } - if (fSession != null) { - fSession.keepAlive(); - } - //设备下线 - if (message instanceof DeviceOfflineMessage) { - sessionManager.unregister(device.getDeviceId()); - return Mono.empty(); - } - message.addHeaderIfAbsent(Headers.clientAddress, String.valueOf(clientAddr)); - - if (processor.hasDownstreams()) { - sink.next(message); - } - return clientMessageHandler.handleMessage(device, message); - })) - .doOnEach(ReactiveLogger.onError(err -> - log.error("处理TCP[{}]消息失败:\n{}", - clientAddr, - tcpMessage - , err))) - .onErrorResume((err) -> Mono.fromRunnable(client::reset)) - ) - .onErrorResume((err) -> Mono.fromRunnable(client::reset)) - .subscriberContext(ReactiveLogger.start("network", tcpServer.getId())) - .subscribe(); - })); + .publishOn(Schedulers.parallel()) + .flatMap(client -> new TcpConnection(client).accept(), Integer.MAX_VALUE) + .subscriberContext(ReactiveLogger.start("network", tcpServer.getId())) + .subscribe( + ignore -> { + }, + error -> log.error(error.getMessage(), error) + ); } @Override @@ -251,10 +270,8 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew public Mono shutdown() { return Mono.fromRunnable(() -> { started.set(false); - - disposable.forEach(Disposable::dispose); - - disposable.clear(); + disposable.dispose(); + disposable = null; }); }