优化设备消息转发到机构topic

This commit is contained in:
zhou-hao 2020-11-30 18:51:03 +08:00
parent c728da0e4f
commit 0438fe7167
2 changed files with 12 additions and 5 deletions

View File

@ -12,6 +12,7 @@ import java.util.Optional;
* @since 1.0
*/
public interface PropertyConstants {
Key<String> orgId = Key.of("orgId");
Key<String> deviceName = Key.of("deviceName");

View File

@ -15,7 +15,9 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
@ -32,6 +34,7 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
private final static String[] allConfigHeader = {
PropertyConstants.productId.getKey(),
PropertyConstants.deviceName.getKey(),
PropertyConstants.orgId.getKey()
};
//设备注册中心
@ -41,8 +44,6 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
private final MessageHandler messageHandler;
private final DeviceSessionManager sessionManager;
private final static BiConsumer<Throwable, Object> doOnError = (error, val) -> log.error(error.getMessage(), error);
private final static Function<DeviceOperator, Mono<Values>> configGetter = operator -> operator.getSelfConfigs(allConfigHeader);
@ -56,7 +57,6 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
this.registry = registry;
this.eventBus = eventBus;
this.messageHandler = messageHandler;
this.sessionManager = sessionManager;
sessionManager
.onRegister()
.flatMap(session -> {
@ -118,10 +118,16 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
.getDevice(deviceId)
.flatMap(configGetter)
.defaultIfEmpty(emptyValues)
.map(configs -> {
.flatMapIterable(configs -> {
configs.getAllValues().forEach(deviceMessage::addHeader);
String productId = deviceMessage.getHeader(PropertyConstants.productId).orElse("null");
return createDeviceMessageTopic(productId, deviceId, deviceMessage);
String topic = createDeviceMessageTopic(productId, deviceId, deviceMessage);
List<String> topics = new ArrayList<>(2);
topics.add(topic);
configs.getValue(PropertyConstants.orgId)
.ifPresent(orgId -> topics.add("/org/" + orgId + topic));
return topics;
});
}
return Mono.just("/device/unknown/message/unknown");