feat(设备管理): 透传消息解析支持子设备会话创建

This commit is contained in:
zhouhao 2023-04-07 10:35:23 +08:00
parent 8957c8b386
commit 887bda8ebf
2 changed files with 52 additions and 26 deletions

View File

@ -1,6 +1,7 @@
package org.jetlinks.community.gateway;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
@ -35,8 +36,11 @@ import java.util.function.Supplier;
@AllArgsConstructor
public class DeviceGatewayHelper {
@Getter
private final DeviceRegistry registry;
@Getter
private final DeviceSessionManager sessionManager;
@Getter
private final DecodedClientMessageHandler messageHandler;
public static Consumer<DeviceSession> applySessionKeepaliveTimeout(DeviceMessage msg, Supplier<Duration> timeoutSupplier) {
@ -136,6 +140,7 @@ public class DeviceGatewayHelper {
}
}));
//子设备注册
if (isDoRegister(children)) {
return this
@ -182,7 +187,7 @@ public class DeviceGatewayHelper {
if (l == 0) {
return registry
.getDevice(deviceId)
.flatMap(device -> messageHandler.handleMessage(device, message));
.flatMap(device -> handleMessage(device, message));
}
return Mono.empty();
})
@ -196,28 +201,22 @@ public class DeviceGatewayHelper {
.orElse(false);
}
//忽略会话管理,比如一个设备存在多种接入方式时,其中一种接入方式收到的消息设置忽略会话来防止会话冲突
if (message.getHeaderOrDefault(Headers.ignoreSession)) {
return registry
.getDevice(deviceId)
.flatMap(device -> {
if (!isDoRegister(message)) {
return messageHandler
.handleMessage(device, message)
.thenReturn(device);
}
return Mono.just(device);
});
}
if (then == null) {
then = registry.getDevice(deviceId);
}
//忽略会话管理,比如一个设备存在多种接入方式时,其中一种接入方式收到的消息设置忽略会话来防止会话冲突
if (message.getHeaderOrDefault(Headers.ignoreSession)) {
if (!isDoRegister(message)) {
return handleMessage(null, message)
.then(then);
}
return then;
}
if (doHandle) {
then = messageHandler
.handleMessage(null, message)
then = handleMessage(null, message)
.then(then);
}
@ -229,6 +228,13 @@ public class DeviceGatewayHelper {
}
private Mono<Void> handleMessage(DeviceOperator device, Message message) {
return messageHandler
.handleMessage(device, message)
//转换为empty,减少触发discard
.flatMap(ignore -> Mono.empty());
}
private Mono<DeviceSession> createOrUpdateSession(String deviceId,
DeviceMessage message,
Function<DeviceOperator, Mono<DeviceSession>> sessionBuilder,
@ -256,7 +262,7 @@ public class DeviceGatewayHelper {
() -> {
//设备注册
if (isDoRegister(message)) {
return messageHandler
return this
.handleMessage(null, message)
//延迟2秒后尝试重新获取设备并上线
.then(Mono.delay(Duration.ofSeconds(2)))

View File

@ -12,13 +12,14 @@ import org.hswebframework.web.crud.events.EntityModifyEvent;
import org.hswebframework.web.crud.events.EntitySavedEvent;
import org.hswebframework.web.exception.ValidationException;
import org.jctools.maps.NonBlockingHashMap;
import org.jetlinks.community.gateway.DeviceGatewayHelper;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.device.session.DeviceSessionManager;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DirectDeviceMessage;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.*;
import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor;
import org.jetlinks.community.OperationSource;
import org.jetlinks.community.device.entity.TransparentMessageCodecEntity;
@ -45,14 +46,19 @@ public class TransparentDeviceMessageConnector implements CommandLineRunner, Dev
private final Map<CacheKey, TransparentMessageCodec> codecs = new NonBlockingHashMap<>();
private final DeviceGatewayHelper gatewayHelper;
public TransparentDeviceMessageConnector(@SuppressWarnings("all")
ReactiveRepository<TransparentMessageCodecEntity, String> repository,
ReactiveRepository<TransparentMessageCodecEntity, String> repository,
DecodedClientMessageHandler messageHandler,
DeviceSessionManager sessionManager,
DeviceRegistry registry,
EventBus eventBus,
ObjectProvider<TransparentMessageCodecProvider> providers) {
this.repository = repository;
this.messageHandler = messageHandler;
this.eventBus = eventBus;
this.gatewayHelper = new DeviceGatewayHelper(registry, sessionManager, messageHandler);
for (TransparentMessageCodecProvider provider : providers) {
TransparentMessageCodecProviders.addProvider(provider);
}
@ -69,10 +75,24 @@ public class TransparentDeviceMessageConnector implements CommandLineRunner, Dev
}
return codec
.decode(message)
.flatMap(msg -> messageHandler.handleMessage(null, msg))
.flatMap(this::handleMessage)
.then();
}
private Mono<Void> handleMessage(DeviceMessage msg) {
if (msg instanceof ChildDeviceMessage || msg instanceof ChildDeviceMessageReply) {
msg.addHeader(Headers.ignoreSession, true);
return gatewayHelper
.handleDeviceMessage(
msg,
device -> null
)
.then();
}
return messageHandler.handleMessage(null, msg).then();
}
private TransparentMessageCodec getCodecOrNull(String productId, String deviceId) {
CacheKey cacheKey = new CacheKey(productId, deviceId);
TransparentMessageCodec codec = codecs.get(cacheKey);
@ -94,7 +114,7 @@ public class TransparentDeviceMessageConnector implements CommandLineRunner, Dev
msg.addHeader("encodeBy", message.getMessageType().name());
//所有透传消息都设置为异步
msg.addHeader(Headers.async, true);
// msg.addHeader(Headers.sendAndForget, true);
// msg.addHeader(Headers.sendAndForget, true);
})
)
.defaultIfEmpty(message);
@ -110,7 +130,7 @@ public class TransparentDeviceMessageConnector implements CommandLineRunner, Dev
return provider
.createCodec(entity.getConfiguration())
.doOnNext(codec -> codecs.put(key, codec))
.contextWrite(OperationSource.ofContext(entity.getId(),null,entity))
.contextWrite(OperationSource.ofContext(entity.getId(), null, entity))
.switchIfEmpty(Mono.fromRunnable(() -> codecs.remove(key)))
.then();
}