feat(基础模块): MqttClient设备会话支持可恢复. (#538)

This commit is contained in:
老周 2024-07-10 10:59:22 +08:00 committed by GitHub
parent 8b3509442f
commit 2dd0ef3c93
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 191 additions and 14 deletions

View File

@ -2,6 +2,8 @@ package org.jetlinks.community.network.http.device;
import lombok.Setter;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.Transport;
@ -70,8 +72,9 @@ class HttpDeviceSession implements DeviceSession {
@Override
public Mono<Boolean> send(EncodedMessage encodedMessage) {
if(websocket==null){
return Reactors.ALWAYS_FALSE;
//未建立websocket链接,不支持此类消息.
if(websocket == null){
return Mono.error(new DeviceOperationException(ErrorCode.UNSUPPORTED_MESSAGE));
}
if (encodedMessage instanceof WebSocketMessage) {
return websocket

View File

@ -174,7 +174,9 @@ public class MqttClientDeviceGateway extends AbstractDeviceGateway {
}
private MqttClientSession createDeviceSession(DeviceOperator device, MqttClient client) {
return new MqttClientSession(device.getDeviceId(), device, client, monitor);
MqttClientSession session = new MqttClientSession(device.getDeviceId(), device, client, monitor);
session.setGatewayId(getId());
return session;
}
@Override

View File

@ -1,14 +1,17 @@
package org.jetlinks.community.network.mqtt.gateway.device.session;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.MqttMessage;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.server.session.PersistentSession;
import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
import org.jetlinks.community.network.mqtt.client.MqttClient;
import reactor.core.publisher.Mono;
@ -20,29 +23,45 @@ import java.time.Duration;
* @author zhouhao
* @since 1.0
*/
public class MqttClientSession implements DeviceSession {
public class MqttClientSession implements PersistentSession {
@Getter
private final String id;
@Getter
private final DeviceOperator operator;
private MqttClient clientTemp;
@Getter
@Setter
private MqttClient client;
private Mono<MqttClient> client;
private final long connectTime = System.currentTimeMillis();
@Setter(AccessLevel.PROTECTED)
private long connectTime = System.currentTimeMillis();
@Setter(AccessLevel.PROTECTED)
private long lastPingTime = System.currentTimeMillis();
private long keepAliveTimeout = -1;
private final DeviceGatewayMonitor monitor;
@Getter
@Setter
private String gatewayId;
public MqttClientSession(String id,
DeviceOperator operator,
MqttClient client,
DeviceGatewayMonitor monitor) {
this(id, operator, Mono.just(client), monitor);
this.clientTemp = client;
}
public MqttClientSession(String id,
DeviceOperator operator,
Mono<MqttClient> client,
DeviceGatewayMonitor monitor) {
this.id = id;
this.operator = operator;
this.client = client;
@ -67,13 +86,19 @@ public class MqttClientSession implements DeviceSession {
@Override
public Mono<Boolean> send(EncodedMessage encodedMessage) {
if (encodedMessage instanceof MqttMessage) {
monitor.sentMessage();
if (monitor != null) {
monitor.sentMessage();
}
return client
.publish(((MqttMessage) encodedMessage))
.thenReturn(true)
;
.flatMap(client -> {
this.clientTemp = client;
return client
.publish(((MqttMessage) encodedMessage))
.thenReturn(true);
});
}
return Mono.error(new UnsupportedOperationException("unsupported message type:" + encodedMessage.getClass()));
return Mono.error(new DeviceOperationException
.NoStackTrace(ErrorCode.UNSUPPORTED_MESSAGE, "error.unsupported_mqtt_message_type"));
}
@Override
@ -93,10 +118,19 @@ public class MqttClientSession implements DeviceSession {
@Override
public boolean isAlive() {
return client.isAlive() &&
return (clientTemp == null || clientTemp.isAlive()) &&
(keepAliveTimeout <= 0 || System.currentTimeMillis() - lastPingTime < keepAliveTimeout);
}
@Override
public Mono<Boolean> isAliveAsync() {
return client
.map(client -> {
this.clientTemp = client;
return isAlive();
});
}
@Override
public void onClose(Runnable call) {
@ -113,4 +147,9 @@ public class MqttClientSession implements DeviceSession {
"id=" + id + ",device=" + getDeviceId() +
'}';
}
@Override
public String getProvider() {
return MqttClientSessionPersistentProvider.PROVIDER;
}
}

View File

@ -0,0 +1,133 @@
package org.jetlinks.community.network.mqtt.gateway.device.session;
import lombok.SneakyThrows;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.server.session.DeviceSessionProvider;
import org.jetlinks.core.server.session.DeviceSessionProviders;
import org.jetlinks.core.server.session.PersistentSession;
import org.jetlinks.core.utils.SerializeUtils;
import org.jetlinks.community.gateway.monitor.GatewayMonitors;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkManager;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.time.Duration;
import static org.jetlinks.community.codec.Serializers.getDefault;
@Component
public class MqttClientSessionPersistentProvider implements DeviceSessionProvider {
public static final String PROVIDER = "mqtt-client";
private final NetworkManager networkManager;
public MqttClientSessionPersistentProvider(NetworkManager networkManager) {
this.networkManager = networkManager;
DeviceSessionProviders.register(this);
}
@Override
public String getId() {
return PROVIDER;
}
@Override
public Mono<PersistentSession> deserialize(byte[] sessionData, DeviceRegistry registry) {
return Mono
.fromCallable(() -> {
ByteArrayInputStream stream = new ByteArrayInputStream(sessionData);
SessionData data = new SessionData();
try (ObjectInput input = getDefault().createInput(stream)) {
data.readExternal(input);
}
return data;
})
.flatMap(data -> data.toSession(registry, networkManager));
}
@Override
public Mono<byte[]> serialize(PersistentSession session, DeviceRegistry registry) {
if (!session.isWrapFrom(MqttClientSession.class)) {
return Mono.empty();
}
return SessionData
.of(session.unwrap(MqttClientSession.class))
.flatMap(data -> Mono
.fromCallable(() -> {
ByteArrayOutputStream stream = new ByteArrayOutputStream(128);
try (ObjectOutput output = getDefault().createOutput(stream)) {
data.writeExternal(output);
}
return stream.toByteArray();
}));
}
static class SessionData {
private String deviceId;
private String networkId;
private String gatewayId;
private long lastPingTime;
private long connectTime;
private long keepAliveTimeout;
public SessionData() {
}
public static Mono<SessionData> of(MqttClientSession session) {
SessionData data = new SessionData();
data.deviceId = session.getDeviceId();
data.gatewayId = session.getGatewayId();
data.lastPingTime = session.lastPingTime();
data.connectTime = session.connectTime();
data.keepAliveTimeout = session.getKeepAliveTimeout().toMillis();
return session
.getClient()
.doOnNext(client -> data.networkId = client.getId())
.thenReturn(data);
}
public Mono<PersistentSession> toSession(DeviceRegistry registry,
NetworkManager manager) {
return registry
.getDevice(deviceId)
.map(device -> {
MqttClientSession session = new MqttClientSession(
deviceId,
device,
manager.getNetwork(DefaultNetworkType.MQTT_CLIENT, networkId),
GatewayMonitors.getDeviceGatewayMonitor(gatewayId));
session.setKeepAliveTimeout(Duration.ofMillis(keepAliveTimeout));
session.setLastPingTime(lastPingTime);
session.setConnectTime(connectTime);
return session;
});
}
@SneakyThrows
public void writeExternal(ObjectOutput out) {
out.writeUTF(deviceId);
out.writeUTF(networkId);
SerializeUtils.writeNullableUTF(gatewayId, out);
out.writeLong(lastPingTime);
out.writeLong(connectTime);
out.writeLong(keepAliveTimeout);
}
@SneakyThrows
public void readExternal(ObjectInput in) {
deviceId = in.readUTF();
networkId = in.readUTF();
gatewayId = SerializeUtils.readNullableUTF(in);
lastPingTime = in.readLong();
connectTime = in.readLong();
keepAliveTimeout = in.readLong();
}
}
}