refactor: 优化DeviceGatewayHelper

This commit is contained in:
zhouhao 2025-03-19 10:54:27 +08:00
parent c01d31606c
commit 7b7b96f096
1 changed files with 126 additions and 62 deletions

View File

@ -14,17 +14,21 @@ import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.server.session.KeepOnlineSession;
import org.jetlinks.core.server.session.LostDeviceSession;
import org.jetlinks.community.PropertyConstants;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.supports.server.DecodedClientMessageHandler;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
import javax.validation.constraints.NotNull;
import java.time.Duration;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import static org.jetlinks.core.message.Headers.ignoreIfOffline;
/**
* 设备网关消息处理,会话管理工具类,用于统一封装对设备消息和会话的处理逻辑
*
@ -41,19 +45,6 @@ public class DeviceGatewayHelper {
private final DeviceSessionManager sessionManager;
private final DecodedClientMessageHandler messageHandler;
public static Consumer<DeviceSession> applySessionKeepaliveTimeout(DeviceMessage msg, Supplier<Duration> timeoutSupplier) {
return session -> {
Integer timeout = msg.getHeaderOrElse(Headers.keepOnlineTimeoutSeconds, () -> null);
if (null != timeout) {
session.setKeepAliveTimeout(Duration.ofSeconds(timeout));
} else {
Duration defaultTimeout = timeoutSupplier.get();
if (null != defaultTimeout) {
session.setKeepAliveTimeout(defaultTimeout);
}
}
};
}
public Mono<DeviceOperator> handleDeviceMessage(DeviceMessage message,
Function<DeviceOperator, DeviceSession> sessionBuilder) {
@ -83,7 +74,7 @@ public class DeviceGatewayHelper {
return handleDeviceMessage(message, sessionBuilder, sessionConsumer, () -> Mono.fromRunnable(deviceNotFoundCallback));
}
private Mono<Void> handleChildrenDeviceMessage(String deviceId, DeviceMessage children) {
private void handleChildrenDeviceMessage(String deviceId, DeviceMessage children, HandlerContext ctx) {
//设备状态检查,断开设备连接的消息都忽略
//这些消息属于状态管理,通常是用来自定义子设备状态的,所以这些消息都忽略处理会话
if (deviceId == null
@ -92,13 +83,13 @@ public class DeviceGatewayHelper {
|| children instanceof DisconnectDeviceMessage
|| children instanceof DisconnectDeviceMessageReply
|| children.getHeaderOrDefault(Headers.ignoreSession)) {
return Mono.empty();
return;
}
//子设备回复失败的也忽略
if (children instanceof DeviceMessageReply) {
DeviceMessageReply reply = ((DeviceMessageReply) children);
if (!reply.isSuccess()) {
return Mono.empty();
return;
}
}
String childrenId = children.getDeviceId();
@ -107,27 +98,42 @@ public class DeviceGatewayHelper {
if (children instanceof DeviceOfflineMessage || children instanceof DeviceUnRegisterMessage) {
//注销会话,这里子设备可能会收到多次离线消息
//注销会话一次离线,消息网关转发子设备消息一次
return sessionManager
.remove(childrenId, removeSessionOnlyLocal(children))
.doOnNext(total -> {
if (total > 0 && children instanceof DeviceOfflineMessage) {
children.addHeader(Headers.ignore, true);
}
})
.then();
//先执行移除子设备会话,防止header设置失败
ctx.before(
sessionManager
.remove(childrenId, removeSessionOnlyLocal(children))
.doOnNext(total -> {
//移除了会话会触发离线消息,忽略掉本次的消息.
if (total > 0 && children instanceof DeviceOfflineMessage) {
children.addHeaderIfAbsent(Headers.ignore, true);
}
//没有会话被移除(已经离线),但是手动指定了忽略离线消息.
if (total == 0 && children.getHeaderOrDefault(ignoreIfOffline)) {
children.addHeader(Headers.ignore, true);
}
})
.then()
);
} else {
//子设备上线
if (children instanceof DeviceOnlineMessage) {
children.addHeader(Headers.ignore, true);
//没有标记force,则忽略上线消息,避免产生重复的上线消息.
if (!children.getHeader(Headers.force).orElse(false)) {
children.addHeaderIfAbsent(Headers.ignore, true);
}
}
//子设备会话处理
Mono<DeviceSession> sessionHandler = sessionManager
.getSession(deviceId)
.flatMap(parentSession -> this
.createOrUpdateSession(childrenId,
children,
child -> Mono.just(new ChildrenDeviceSession(childrenId, parentSession, child)),
Mono::empty)
.createOrUpdateSession(
childrenId,
children,
child -> {
//新创建了的会话?
return Mono.just(new ChildrenDeviceSession(childrenId, parentSession, child));
},
Mono::empty)
.doOnNext(session -> {
if (session.isWrapFrom(ChildrenDeviceSession.class)) {
ChildrenDeviceSession childrenSession = session.unwrap(ChildrenDeviceSession.class);
@ -141,17 +147,21 @@ public class DeviceGatewayHelper {
//子设备注册
if (isDoRegister(children)) {
return this
.getDeviceForRegister(children.getDeviceId())
.flatMap(device -> device
//没有配置状态自管理才自动上线
.getSelfConfig(DeviceConfigKey.selfManageState)
.defaultIfEmpty(false)
.filter(Boolean.FALSE::equals))
.flatMap(ignore -> sessionHandler)
.then();
ctx.after(
this
.getDeviceForRegister(children.getDeviceId())
.flatMap(device -> device
//没有配置状态自管理才自动上线
.getSelfConfig(DeviceConfigKey.selfManageState)
.defaultIfEmpty(false)
.filter(Boolean.FALSE::equals))
.flatMap(ignore -> sessionHandler)
.then()
);
} else {
ctx.after(sessionHandler.then());
}
return sessionHandler.then();
}
}
@ -163,26 +173,26 @@ public class DeviceGatewayHelper {
if (!StringUtils.hasText(deviceId)) {
return Mono.empty();
}
Mono<DeviceOperator> then = null;
HandlerContext ctx = new HandlerContext();
boolean doHandle = true;
Context context = Context.of(DeviceMessage.class, message);
//子设备消息
if (message instanceof ChildDeviceMessage) {
DeviceMessage childrenMessage = (DeviceMessage) ((ChildDeviceMessage) message).getChildDeviceMessage();
then = handleChildrenDeviceMessage(deviceId, childrenMessage)
.then(registry.getDevice(deviceId));
handleChildrenDeviceMessage(deviceId, childrenMessage, ctx);
}
//子设备消息回复
else if (message instanceof ChildDeviceMessageReply) {
DeviceMessage childrenMessage = (DeviceMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage();
then = handleChildrenDeviceMessage(deviceId, childrenMessage)
.then(registry.getDevice(deviceId));
handleChildrenDeviceMessage(deviceId, childrenMessage, ctx);
}
//设备离线消息
else if (message instanceof DeviceOfflineMessage) {
return sessionManager
.remove(deviceId, removeSessionOnlyLocal(message))
.flatMap(l -> {
if (l == 0) {
if (l == 0 && !message.getHeaderOrDefault(ignoreIfOffline)) {
return registry
.getDevice(deviceId)
.flatMap(device -> handleMessage(device, message));
@ -190,7 +200,7 @@ public class DeviceGatewayHelper {
return Mono.empty();
})
.then(registry.getDevice(deviceId))
.contextWrite(Context.of(DeviceMessage.class, message));
.contextWrite(context);
}
//设备上线消息,不发送到messageHandler,防止设备上线存在重复消息
else if (message instanceof DeviceOnlineMessage) {
@ -199,30 +209,37 @@ public class DeviceGatewayHelper {
.orElse(false);
}
if (then == null) {
then = registry.getDevice(deviceId);
}
//忽略会话管理,比如一个设备存在多种接入方式时,其中一种接入方式收到的消息设置忽略会话来防止会话冲突
if (message.getHeaderOrDefault(Headers.ignoreSession)) {
if (!isDoRegister(message)) {
return handleMessage(null, message)
.then(then);
return ctx
.execute(handleMessage(null, message))
.then(registry.getDevice(deviceId))
.contextWrite(context);
}
return then;
return ctx
.execute(Mono.empty())
.then(registry.getDevice(deviceId))
.contextWrite(context);
}
if (doHandle) {
then = handleMessage(null, message)
.then(then);
ctx.after(handleMessage(null, message));
}
return this
.createOrUpdateSession(deviceId, message, sessionBuilder, deviceNotFoundCallback)
.flatMap(sessionConsumer)
.then(then)
.contextWrite(Context.of(DeviceMessage.class, message));
return ctx
.execute(
this
.createOrUpdateSession(deviceId, message, sessionBuilder, deviceNotFoundCallback)
.flatMap(sessionConsumer)
)
.then(registry.getDevice(deviceId))
.contextWrite(context);
// return this
// .createOrUpdateSession(deviceId, message, sessionBuilder, deviceNotFoundCallback)
// .flatMap(sessionConsumer)
// .then(then)
// .contextWrite(context);
}
@ -272,7 +289,8 @@ public class DeviceGatewayHelper {
return Mono.empty();
}),
session -> updateSession(session, message, sessionBuilder))))
.flatMap(Function.identity());
.flatMap(Function.identity())
.map(session -> handleSession(message, session));
}
private Mono<DeviceOperator> getDeviceForRegister(String deviceId) {
@ -343,6 +361,17 @@ public class DeviceGatewayHelper {
: after;
}
private DeviceSession handleSession(DeviceMessage message, DeviceSession session) {
//尝试设置ignoreParent
if (session.isWrapFrom(KeepOnlineSession.class)) {
message
.getHeader(Headers.keepOnlineIgnoreParent)
.ifPresent(session.unwrap(KeepOnlineSession.class)::setIgnoreParent);
}
return session;
}
private static void applySessionKeepaliveTimeout(DeviceMessage msg, DeviceSession session) {
Integer timeout = msg.getHeaderOrElse(Headers.keepOnlineTimeoutSeconds, () -> null);
if (null != timeout) {
@ -413,4 +442,39 @@ public class DeviceGatewayHelper {
}
private static class HandlerContext {
Mono<Void> before;
Mono<Void> after;
public void before(Mono<Void> before) {
if (this.before == null) {
this.before = before;
} else {
this.before = before.then(this.before);
}
}
public void after(Mono<Void> after) {
if (this.after == null) {
this.after = after;
} else {
this.after = this.after.then(after);
}
}
public Mono<Void> execute(Mono<Void> executor) {
Mono<Void> task = executor;
if (before != null) {
task = before.then(task);
}
if (after != null) {
task = task.then(after);
}
return task;
}
}
}