diff --git a/jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/device/HttpDeviceSession.java b/jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/device/HttpDeviceSession.java index 41868ed8..84d86f3e 100755 --- a/jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/device/HttpDeviceSession.java +++ b/jetlinks-components/network-component/http-component/src/main/java/org/jetlinks/community/network/http/device/HttpDeviceSession.java @@ -2,6 +2,8 @@ package org.jetlinks.community.network.http.device; import lombok.Setter; import org.jetlinks.core.device.DeviceOperator; +import org.jetlinks.core.enums.ErrorCode; +import org.jetlinks.core.exception.DeviceOperationException; import org.jetlinks.core.message.codec.DefaultTransport; import org.jetlinks.core.message.codec.EncodedMessage; import org.jetlinks.core.message.codec.Transport; @@ -70,8 +72,9 @@ class HttpDeviceSession implements DeviceSession { @Override public Mono send(EncodedMessage encodedMessage) { - if(websocket==null){ - return Reactors.ALWAYS_FALSE; + //未建立websocket链接,不支持此类消息. + if(websocket == null){ + return Mono.error(new DeviceOperationException(ErrorCode.UNSUPPORTED_MESSAGE)); } if (encodedMessage instanceof WebSocketMessage) { return websocket diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttClientDeviceGateway.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttClientDeviceGateway.java index 9be03b5b..665e4d17 100755 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttClientDeviceGateway.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/MqttClientDeviceGateway.java @@ -174,7 +174,9 @@ public class MqttClientDeviceGateway extends AbstractDeviceGateway { } private MqttClientSession createDeviceSession(DeviceOperator device, MqttClient client) { - return new MqttClientSession(device.getDeviceId(), device, client, monitor); + MqttClientSession session = new MqttClientSession(device.getDeviceId(), device, client, monitor); + session.setGatewayId(getId()); + return session; } @Override diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/session/MqttClientSession.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/session/MqttClientSession.java index 1b81310e..3d1afb73 100755 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/session/MqttClientSession.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/session/MqttClientSession.java @@ -1,14 +1,17 @@ package org.jetlinks.community.network.mqtt.gateway.device.session; +import lombok.AccessLevel; import lombok.Getter; import lombok.Setter; -import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor; import org.jetlinks.core.device.DeviceOperator; +import org.jetlinks.core.enums.ErrorCode; +import org.jetlinks.core.exception.DeviceOperationException; import org.jetlinks.core.message.codec.DefaultTransport; import org.jetlinks.core.message.codec.EncodedMessage; import org.jetlinks.core.message.codec.MqttMessage; import org.jetlinks.core.message.codec.Transport; -import org.jetlinks.core.server.session.DeviceSession; +import org.jetlinks.core.server.session.PersistentSession; +import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor; import org.jetlinks.community.network.mqtt.client.MqttClient; import reactor.core.publisher.Mono; @@ -20,29 +23,45 @@ import java.time.Duration; * @author zhouhao * @since 1.0 */ -public class MqttClientSession implements DeviceSession { +public class MqttClientSession implements PersistentSession { @Getter private final String id; @Getter private final DeviceOperator operator; + private MqttClient clientTemp; + @Getter @Setter - private MqttClient client; + private Mono client; - private final long connectTime = System.currentTimeMillis(); + @Setter(AccessLevel.PROTECTED) + private long connectTime = System.currentTimeMillis(); + @Setter(AccessLevel.PROTECTED) private long lastPingTime = System.currentTimeMillis(); private long keepAliveTimeout = -1; private final DeviceGatewayMonitor monitor; + @Getter + @Setter + private String gatewayId; + public MqttClientSession(String id, DeviceOperator operator, MqttClient client, DeviceGatewayMonitor monitor) { + this(id, operator, Mono.just(client), monitor); + this.clientTemp = client; + } + + public MqttClientSession(String id, + DeviceOperator operator, + Mono client, + DeviceGatewayMonitor monitor) { this.id = id; this.operator = operator; this.client = client; @@ -67,13 +86,19 @@ public class MqttClientSession implements DeviceSession { @Override public Mono send(EncodedMessage encodedMessage) { if (encodedMessage instanceof MqttMessage) { - monitor.sentMessage(); + if (monitor != null) { + monitor.sentMessage(); + } return client - .publish(((MqttMessage) encodedMessage)) - .thenReturn(true) - ; + .flatMap(client -> { + this.clientTemp = client; + return client + .publish(((MqttMessage) encodedMessage)) + .thenReturn(true); + }); } - return Mono.error(new UnsupportedOperationException("unsupported message type:" + encodedMessage.getClass())); + return Mono.error(new DeviceOperationException + .NoStackTrace(ErrorCode.UNSUPPORTED_MESSAGE, "error.unsupported_mqtt_message_type")); } @Override @@ -93,10 +118,19 @@ public class MqttClientSession implements DeviceSession { @Override public boolean isAlive() { - return client.isAlive() && + return (clientTemp == null || clientTemp.isAlive()) && (keepAliveTimeout <= 0 || System.currentTimeMillis() - lastPingTime < keepAliveTimeout); } + @Override + public Mono isAliveAsync() { + return client + .map(client -> { + this.clientTemp = client; + return isAlive(); + }); + } + @Override public void onClose(Runnable call) { @@ -113,4 +147,9 @@ public class MqttClientSession implements DeviceSession { "id=" + id + ",device=" + getDeviceId() + '}'; } + + @Override + public String getProvider() { + return MqttClientSessionPersistentProvider.PROVIDER; + } } diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/session/MqttClientSessionPersistentProvider.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/session/MqttClientSessionPersistentProvider.java new file mode 100644 index 00000000..441b132a --- /dev/null +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/gateway/device/session/MqttClientSessionPersistentProvider.java @@ -0,0 +1,133 @@ +package org.jetlinks.community.network.mqtt.gateway.device.session; + +import lombok.SneakyThrows; +import org.jetlinks.core.device.DeviceRegistry; +import org.jetlinks.core.server.session.DeviceSessionProvider; +import org.jetlinks.core.server.session.DeviceSessionProviders; +import org.jetlinks.core.server.session.PersistentSession; +import org.jetlinks.core.utils.SerializeUtils; +import org.jetlinks.community.gateway.monitor.GatewayMonitors; +import org.jetlinks.community.network.DefaultNetworkType; +import org.jetlinks.community.network.NetworkManager; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Mono; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.time.Duration; + +import static org.jetlinks.community.codec.Serializers.getDefault; + +@Component +public class MqttClientSessionPersistentProvider implements DeviceSessionProvider { + public static final String PROVIDER = "mqtt-client"; + + private final NetworkManager networkManager; + + public MqttClientSessionPersistentProvider(NetworkManager networkManager) { + this.networkManager = networkManager; + DeviceSessionProviders.register(this); + } + + @Override + public String getId() { + return PROVIDER; + } + + @Override + public Mono deserialize(byte[] sessionData, DeviceRegistry registry) { + + return Mono + .fromCallable(() -> { + ByteArrayInputStream stream = new ByteArrayInputStream(sessionData); + SessionData data = new SessionData(); + try (ObjectInput input = getDefault().createInput(stream)) { + data.readExternal(input); + } + return data; + }) + .flatMap(data -> data.toSession(registry, networkManager)); + } + + @Override + public Mono serialize(PersistentSession session, DeviceRegistry registry) { + if (!session.isWrapFrom(MqttClientSession.class)) { + return Mono.empty(); + } + return SessionData + .of(session.unwrap(MqttClientSession.class)) + .flatMap(data -> Mono + .fromCallable(() -> { + ByteArrayOutputStream stream = new ByteArrayOutputStream(128); + try (ObjectOutput output = getDefault().createOutput(stream)) { + data.writeExternal(output); + } + return stream.toByteArray(); + })); + } + + static class SessionData { + private String deviceId; + private String networkId; + private String gatewayId; + private long lastPingTime; + private long connectTime; + private long keepAliveTimeout; + + public SessionData() { + } + + public static Mono of(MqttClientSession session) { + SessionData data = new SessionData(); + data.deviceId = session.getDeviceId(); + data.gatewayId = session.getGatewayId(); + data.lastPingTime = session.lastPingTime(); + data.connectTime = session.connectTime(); + data.keepAliveTimeout = session.getKeepAliveTimeout().toMillis(); + return session + .getClient() + .doOnNext(client -> data.networkId = client.getId()) + .thenReturn(data); + } + + public Mono toSession(DeviceRegistry registry, + NetworkManager manager) { + return registry + .getDevice(deviceId) + .map(device -> { + MqttClientSession session = new MqttClientSession( + deviceId, + device, + manager.getNetwork(DefaultNetworkType.MQTT_CLIENT, networkId), + GatewayMonitors.getDeviceGatewayMonitor(gatewayId)); + + session.setKeepAliveTimeout(Duration.ofMillis(keepAliveTimeout)); + session.setLastPingTime(lastPingTime); + session.setConnectTime(connectTime); + return session; + }); + } + + @SneakyThrows + public void writeExternal(ObjectOutput out) { + out.writeUTF(deviceId); + out.writeUTF(networkId); + SerializeUtils.writeNullableUTF(gatewayId, out); + out.writeLong(lastPingTime); + out.writeLong(connectTime); + out.writeLong(keepAliveTimeout); + } + + @SneakyThrows + public void readExternal(ObjectInput in) { + deviceId = in.readUTF(); + networkId = in.readUTF(); + gatewayId = SerializeUtils.readNullableUTF(in); + lastPingTime = in.readLong(); + connectTime = in.readLong(); + keepAliveTimeout = in.readLong(); + } + } +}