diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java index ffeed9b0..58731e8b 100644 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.java @@ -158,38 +158,38 @@ class MqttServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGate MqttConnection connection) { return Mono .fromCallable(() -> { - String deviceId = device.getDeviceId(); - if (resp.isSuccess()) { - counter.increment(); - DeviceSession session = sessionManager.getSession(deviceId); - MqttConnectionSession newSession = new MqttConnectionSession(deviceId, device, getTransport(), connection, gatewayMonitor); - if (null == session) { - sessionManager.register(newSession); - } else if (session instanceof ReplaceableDeviceSession) { - ((ReplaceableDeviceSession) session).replaceWith(newSession); - } - gatewayMonitor.connected(); - gatewayMonitor.totalConnection(counter.sum()); - //监听断开连接 - connection.onClose(conn -> { - counter.decrement(); - DeviceSession _tmp = sessionManager.getSession(newSession.getId()); - - if (newSession == _tmp || _tmp == null) { - sessionManager.unregister(deviceId); + try { + String deviceId = device.getDeviceId(); + if (resp.isSuccess()) { + counter.increment(); + DeviceSession session = sessionManager.getSession(deviceId); + MqttConnectionSession newSession = new MqttConnectionSession(deviceId, device, getTransport(), connection, gatewayMonitor); + if (null == session) { + sessionManager.register(newSession); + } else if (session instanceof ReplaceableDeviceSession) { + ((ReplaceableDeviceSession) session).replaceWith(newSession); } - gatewayMonitor.disconnected(); + gatewayMonitor.connected(); gatewayMonitor.totalConnection(counter.sum()); - }); - try { - return Tuples.of(connection.accept(), device, newSession); - } catch (IllegalStateException ignore) { + //监听断开连接 + connection.onClose(conn -> { + counter.decrement(); + DeviceSession _tmp = sessionManager.getSession(newSession.getId()); + if (newSession == _tmp || _tmp == null) { + sessionManager.unregister(deviceId); + } + gatewayMonitor.disconnected(); + gatewayMonitor.totalConnection(counter.sum()); + }); + return Tuples.of(connection.accept(), device, newSession); + } else { + log.warn("MQTT客户端认证[{}]失败:{}", deviceId, resp.getMessage()); + connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD); + gatewayMonitor.rejected(); } - } else { - connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD); - gatewayMonitor.rejected(); - log.warn("MQTT客户端认证[{}]失败:{}", deviceId, resp.getMessage()); + } catch (IllegalStateException ignore) { + } return null; })