优化子设备消息处理

This commit is contained in:
zhou-hao 2020-11-30 18:30:14 +08:00
parent 1ce8d1e0d9
commit e813d23beb
1 changed files with 39 additions and 26 deletions

View File

@ -24,7 +24,6 @@ public class DeviceGatewayHelper {
private final DeviceSessionManager sessionManager;
private final DecodedClientMessageHandler messageHandler;
public static Consumer<DeviceSession> applySessionKeepaliveTimeout(DeviceMessage msg, Supplier<Duration> timeoutSupplier) {
return session -> {
Duration timeout = msg
@ -45,29 +44,35 @@ public class DeviceGatewayHelper {
return handleDeviceMessage(message, sessionBuilder, sessionConsumer, () -> Mono.fromRunnable(deviceNotFoundListener));
}
protected Mono<Void> handleChildrenDeviceMessage(String deviceId, DeviceMessage children) {
ChildrenDeviceSession deviceSession = sessionManager.getSession(deviceId, children.getDeviceId());
if (deviceSession == null && null != children.getDeviceId()) {
Mono<Void> then = sessionManager
.registerChildren(deviceId, children.getDeviceId())
.then();
//子设备注册
if (isDoRegister(children)) {
then = Mono.delay(Duration.ofSeconds(2))
.then(then);
}
return then;
}
return Mono.empty();
}
public Mono<DeviceOperator> handleDeviceMessage(DeviceMessage message,
Function<DeviceOperator, DeviceSession> sessionBuilder,
Consumer<DeviceSession> sessionConsumer,
Supplier<Mono<DeviceOperator>> deviceNotFoundListener) {
Function<DeviceOperator, DeviceSession> sessionBuilder,
Consumer<DeviceSession> sessionConsumer,
Supplier<Mono<DeviceOperator>> deviceNotFoundListener) {
String deviceId = message.getDeviceId();
Mono<Void> then = Mono.empty();
boolean doHandle = true;
if (message instanceof ChildDeviceMessage) {
DeviceMessage childrenMessage = (DeviceMessage) ((ChildDeviceMessage) message).getChildDeviceMessage();
ChildrenDeviceSession deviceSession = sessionManager.getSession(deviceId, childrenMessage.getDeviceId());
if (deviceSession == null) {
then = sessionManager
.registerChildren(deviceId, childrenMessage.getDeviceId())
.then(Mono.empty());
}
then = handleChildrenDeviceMessage(deviceId,childrenMessage);
} else if (message instanceof ChildDeviceMessageReply) {
DeviceMessage childrenMessage = (DeviceMessage) ((ChildDeviceMessageReply) message).getChildDeviceMessage();
ChildrenDeviceSession deviceSession = sessionManager.getSession(deviceId, childrenMessage.getDeviceId());
if (deviceSession == null) {
then = sessionManager
.registerChildren(deviceId, childrenMessage.getDeviceId())
.then(Mono.empty());
}
then = handleChildrenDeviceMessage(deviceId,childrenMessage);
} else if (message instanceof DeviceOfflineMessage) {
//设备离线消息
DeviceSession session = sessionManager.unregister(deviceId);
@ -91,15 +96,12 @@ public class DeviceGatewayHelper {
.getDevice(deviceId)
.switchIfEmpty(Mono.defer(() -> {
//设备注册
if (message instanceof DeviceRegisterMessage) {
if (message.getHeader(PropertyConstants.deviceName).isPresent()
&& message.getHeader(PropertyConstants.productId).isPresent()) {
return messageHandler
.handleMessage(null, message)
//延迟2秒后尝试重新获取设备并上线
.then(Mono.delay(Duration.ofSeconds(2)))
.then(registry.getDevice(deviceId));
}
if (isDoRegister(message)) {
return messageHandler
.handleMessage(null, message)
//延迟2秒后尝试重新获取设备并上线
.then(Mono.delay(Duration.ofSeconds(2)))
.then(registry.getDevice(deviceId));
}
if (deviceNotFoundListener != null) {
return deviceNotFoundListener.get();
@ -130,12 +132,23 @@ public class DeviceGatewayHelper {
} else {
sessionConsumer.accept(session);
session.keepAlive();
if (doHandle) {
return messageHandler
.handleMessage(session.getOperator(), message)
.then(then)
.then(registry.getDevice(deviceId));
}
return then
.then(doHandle ? messageHandler.handleMessage(session.getOperator(), message) : Mono.empty())
.then(registry.getDevice(deviceId));
}
}
private boolean isDoRegister(DeviceMessage message) {
return message instanceof DeviceRegisterMessage
&& message.getHeader(PropertyConstants.deviceName).isPresent()
&& message.getHeader(PropertyConstants.productId).isPresent();
}
}