优化设备消息转发

This commit is contained in:
zhou-hao 2020-11-17 18:50:07 +08:00
parent 2774ab8d9c
commit 90849d57c6
7 changed files with 366 additions and 197 deletions

View File

@ -0,0 +1,43 @@
package org.jetlinks.community;
import org.jetlinks.core.config.ConfigKey;
import org.jetlinks.core.message.HeaderKey;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* @author wangzheng
* @since 1.0
*/
public interface PropertyConstants {
Key<String> deviceName = Key.of("deviceName");
Key<String> productId = Key.of("productId");
@SuppressWarnings("all")
static <T> Optional<T> getFromMap(ConfigKey<T> key, Map<String, Object> map) {
return Optional.ofNullable((T) map.get(key.getKey()));
}
interface Key<V> extends ConfigKey<V>, HeaderKey<V> {
static <T> Key<T> of(String key) {
return new Key<T>() {
@Override
public String getKey() {
return key;
}
@Override
public T getDefaultValue() {
return null;
}
};
}
}
}

View File

@ -0,0 +1,32 @@
package org.jetlinks.community.device.configuration;
import org.jetlinks.community.device.message.DeviceMessageConnector;
import org.jetlinks.community.device.message.writer.TimeSeriesMessageWriterConnector;
import org.jetlinks.community.device.service.data.DeviceDataService;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.server.MessageHandler;
import org.jetlinks.core.server.session.DeviceSessionManager;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DeviceManagerConfiguration {
@Bean
public DeviceMessageConnector deviceMessageConnector(EventBus eventBus,
MessageHandler messageHandler,
DeviceSessionManager sessionManager,
DeviceRegistry registry) {
return new DeviceMessageConnector(eventBus, registry, messageHandler, sessionManager);
}
@Bean
@ConditionalOnProperty(prefix = "device.message.writer.time-series", name = "enabled", havingValue = "true", matchIfMissing = true)
public TimeSeriesMessageWriterConnector timeSeriesMessageWriterConnector(DeviceDataService dataService) {
return new TimeSeriesMessageWriterConnector(dataService);
}
}

View File

@ -5,27 +5,34 @@ import lombok.AllArgsConstructor;
import lombok.Getter;
import org.hswebframework.web.dict.EnumDict;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.MessageType;
import java.util.EnumMap;
import java.util.Map;
import java.util.Optional;
@AllArgsConstructor
@Getter
public enum DeviceLogType implements EnumDict<String> {
event("事件上报"),
readProperty("属性读取"),
writeProperty("属性修改"),
readProperty("读取属性"),
writeProperty("修改属性"),
writePropertyReply("修改属性回复"),
reportProperty("属性上报"),
readPropertyReply("读取属性回复"),
child("子设备消息"),
childReply("子设备消息回复"),
functionInvoke("调用功能"),
readPropertyReply("读取属性回复"),
writePropertyReply("修改属性回复"),
functionReply("调用功能回复"),
register("设备注册"),
unregister("设备注销"),
log("日志"),
tag("标签更新"),
offline("离线"),
online("上线"),
other("其它");
@JSONField(serialize = false)
private final String text;
@ -34,43 +41,37 @@ public enum DeviceLogType implements EnumDict<String> {
return name();
}
public static DeviceLogType of(DeviceMessage message) {
switch (message.getMessageType()) {
case EVENT:
return event;
case ONLINE:
return online;
case OFFLINE:
return offline;
case CHILD:
return child;
case CHILD_REPLY:
return childReply;
case REPORT_PROPERTY:
return reportProperty;
case READ_PROPERTY:
return readProperty;
case INVOKE_FUNCTION:
return functionInvoke;
case WRITE_PROPERTY:
return writeProperty;
case INVOKE_FUNCTION_REPLY:
return functionReply;
case READ_PROPERTY_REPLY:
return readPropertyReply;
case WRITE_PROPERTY_REPLY:
return writePropertyReply;
case REGISTER:
return register;
case UN_REGISTER:
return unregister;
default:
return other;
}
private final static Map<MessageType, DeviceLogType> typeMapping = new EnumMap<>(MessageType.class);
static {
typeMapping.put(MessageType.EVENT, event);
typeMapping.put(MessageType.ONLINE, online);
typeMapping.put(MessageType.OFFLINE, offline);
typeMapping.put(MessageType.CHILD, child);
typeMapping.put(MessageType.CHILD_REPLY, childReply);
typeMapping.put(MessageType.LOG, log);
typeMapping.put(MessageType.UPDATE_TAG, tag);
typeMapping.put(MessageType.REPORT_PROPERTY, reportProperty);
typeMapping.put(MessageType.READ_PROPERTY, readProperty);
typeMapping.put(MessageType.READ_PROPERTY_REPLY, readPropertyReply);
typeMapping.put(MessageType.INVOKE_FUNCTION, functionInvoke);
typeMapping.put(MessageType.INVOKE_FUNCTION_REPLY, functionReply);
typeMapping.put(MessageType.WRITE_PROPERTY, writeProperty);
typeMapping.put(MessageType.WRITE_PROPERTY_REPLY, writePropertyReply);
typeMapping.put(MessageType.REGISTER, register);
typeMapping.put(MessageType.UN_REGISTER, unregister);
}
public static DeviceLogType of(DeviceMessage message) {
return Optional.ofNullable(typeMapping.get(message.getMessageType())).orElse(DeviceLogType.other);
}
// @Override
// public Object getWriteJSONObject() {
// return getValue();

View File

@ -1,20 +1,21 @@
package org.jetlinks.community.device.message;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.community.PropertyConstants;
import org.jetlinks.core.Values;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.message.*;
import org.jetlinks.core.message.event.EventMessage;
import org.jetlinks.core.message.firmware.*;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*;
import org.jetlinks.core.server.MessageHandler;
import org.jetlinks.core.server.session.DeviceSessionManager;
import org.jetlinks.supports.server.DecodedClientMessageHandler;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.Collections;
import java.util.HashMap;
import java.util.function.BiConsumer;
import java.util.function.Function;
@ -25,67 +26,106 @@ import java.util.function.Function;
* @since 1.0
*/
@Slf4j
public class DeviceMessageConnector{
//将设备注册中心到配置追加到消息header中,下游订阅者可直接使用.
private final String[] appendConfigHeader = {"productId", "deviceName"};
public class DeviceMessageConnector implements DecodedClientMessageHandler {
//将设备注册中心的配置追加到消息header中,下游订阅者可直接使用.
private final static String[] allConfigHeader = {
PropertyConstants.productId.getKey(),
PropertyConstants.deviceName.getKey(),
};
//设备注册中心
private final DeviceRegistry registry;
private final EventBus eventBus;
private final MessageHandler messageHandler;
private final DeviceSessionManager sessionManager;
private final static BiConsumer<Throwable, Object> doOnError = (error, val) -> log.error(error.getMessage(), error);
private final Function<DeviceOperator, Mono<Values>> configGetter;
private final static Function<DeviceOperator, Mono<Values>> configGetter = operator -> operator.getSelfConfigs(allConfigHeader);
private final static Values emptyValues = Values.of(Collections.emptyMap());
public DeviceMessageConnector(EventBus eventBus,
DeviceRegistry registry) {
DeviceRegistry registry,
MessageHandler messageHandler,
DeviceSessionManager sessionManager) {
this.registry = registry;
this.eventBus = eventBus;
this.configGetter = operator -> operator.getSelfConfigs(appendConfigHeader);
this.messageHandler = messageHandler;
this.sessionManager = sessionManager;
sessionManager
.onRegister()
.flatMap(session -> {
DeviceOnlineMessage message = new DeviceOnlineMessage();
message.setDeviceId(session.getDeviceId());
message.setTimestamp(session.connectTime());
return onMessage(message);
})
.onErrorContinue(doOnError)
.subscribe();
sessionManager
.onUnRegister()
.flatMap(session -> {
DeviceOfflineMessage message = new DeviceOfflineMessage();
message.setDeviceId(session.getDeviceId());
message.setTimestamp(System.currentTimeMillis());
return onMessage(message);
})
.onErrorContinue(doOnError)
.subscribe();
}
public Mono<Void> onMessage(Message message) {
if (null == message) {
return Mono.empty();
}
return this.getTopic(message)
return this
.getTopic(message)
.flatMap(topic -> eventBus.publish(topic, message).then())
.onErrorContinue(doOnError)
.then();
}
public Mono<String> getTopic(Message message) {
if (message instanceof DeviceMessage) {
DeviceMessage deviceMessage = ((DeviceMessage) message);
String deviceId = deviceMessage.getDeviceId();
if (deviceId == null) {
log.warn("无法从消息中获取设备ID:{}", deviceMessage);
return Mono.empty();
}
return registry
.getDevice(deviceId)
.flatMap(configGetter)
.defaultIfEmpty(emptyValues)
.flatMap(configs -> {
configs.getAllValues().forEach(deviceMessage::addHeader);
String productId = deviceMessage.getHeader("productId").map(String::valueOf).orElse("null");
String topic = createDeviceMessageTopic(productId, deviceId, deviceMessage);
if (message instanceof ChildDeviceMessage) { //子设备消息
return onMessage(((ChildDeviceMessage) message).getChildDeviceMessage())
.thenReturn(topic);
} else if (message instanceof ChildDeviceMessageReply) { //子设备消息
return onMessage(((ChildDeviceMessageReply) message).getChildDeviceMessage())
.thenReturn(topic);
}
return Mono.just(topic);
});
private Flux<String> getTopic(Message message) {
Flux<String> topicsStream = createDeviceMessageTopic(registry, message);
if (message instanceof ChildDeviceMessage) { //子设备消息
return this
.onMessage(((ChildDeviceMessage) message).getChildDeviceMessage())
.thenMany(topicsStream);
} else if (message instanceof ChildDeviceMessageReply) { //子设备消息
return this
.onMessage(((ChildDeviceMessageReply) message).getChildDeviceMessage())
.thenMany(topicsStream);
}
return Mono.just("/device/unknown/message/unknown");
return topicsStream;
}
public static Flux<String> createDeviceMessageTopic(DeviceRegistry deviceRegistry, Message message) {
return Flux.defer(() -> {
if (message instanceof DeviceMessage) {
DeviceMessage deviceMessage = ((DeviceMessage) message);
String deviceId = deviceMessage.getDeviceId();
if (deviceId == null) {
log.warn("无法从消息中获取设备ID:{}", deviceMessage);
return Mono.empty();
}
return deviceRegistry
.getDevice(deviceId)
.flatMap(configGetter)
.defaultIfEmpty(emptyValues)
.map(configs -> {
configs.getAllValues().forEach(deviceMessage::addHeader);
String productId = deviceMessage.getHeader(PropertyConstants.productId).orElse("null");
return createDeviceMessageTopic(productId, deviceId, deviceMessage);
});
}
return Mono.just("/device/unknown/message/unknown");
});
}
public static String createDeviceMessageTopic(String productId, String deviceId, DeviceMessage message) {
@ -99,68 +139,165 @@ public class DeviceMessageConnector{
return builder.toString();
}
public static void appendDeviceMessageTopic(Message message, StringBuilder builder) {
if (message instanceof EventMessage) { //事件
private static final BiConsumer<Message, StringBuilder>[] fastTopicBuilder;
static {
fastTopicBuilder = new BiConsumer[MessageType.values().length];
//事件
createFastBuilder(MessageType.EVENT, (message, builder) -> {
EventMessage event = ((EventMessage) message);
builder.append("/message/event/").append(event.getEvent());
} else if (message instanceof ReportPropertyMessage) { //上报属性
builder.append("/message/property/report");
} else if (message instanceof DeviceOnlineMessage) { //设备上线
builder.append("/online");
} else if (message instanceof DeviceOfflineMessage) { //设备离线
builder.append("/offline");
} else if (message instanceof ChildDeviceMessage) { //子设备消息
});
//上报属性
createFastBuilder(MessageType.REPORT_PROPERTY, "/message/property/report");
//读取属性
createFastBuilder(MessageType.READ_PROPERTY, "/message/send/property/read");
//读取属性回复
createFastBuilder(MessageType.READ_PROPERTY_REPLY, "/message/property/read/reply");
//修改属性
createFastBuilder(MessageType.WRITE_PROPERTY, "/message/send/property/write");
//修改属性回复
createFastBuilder(MessageType.WRITE_PROPERTY_REPLY, "/message/property/write/reply");
//调用功能
createFastBuilder(MessageType.INVOKE_FUNCTION, "/message/send/function");
//调用功能回复
createFastBuilder(MessageType.INVOKE_FUNCTION_REPLY, "/message/function/reply");
//注册
createFastBuilder(MessageType.REGISTER, "/register");
//注销
createFastBuilder(MessageType.UN_REGISTER, "/unregister");
//拉取固件
createFastBuilder(MessageType.REQUEST_FIRMWARE, "/firmware/pull");
//拉取固件回复
createFastBuilder(MessageType.REQUEST_FIRMWARE_REPLY, "/firmware/pull/reply");
//上报固件信息
createFastBuilder(MessageType.REPORT_FIRMWARE, "/firmware/report");
//上报固件安装进度
createFastBuilder(MessageType.UPGRADE_FIRMWARE_PROGRESS, "/firmware/progress");
//推送固件
createFastBuilder(MessageType.UPGRADE_FIRMWARE, "/firmware/push");
//推送固件回复
createFastBuilder(MessageType.UPGRADE_FIRMWARE_REPLY, "/firmware/push/reply");
//未知
createFastBuilder(MessageType.UNKNOWN, "/message/unknown");
//日志
createFastBuilder(MessageType.LOG, "/message/log");
//透传
createFastBuilder(MessageType.DIRECT, "/message/direct");
//更新标签
createFastBuilder(MessageType.UPDATE_TAG, "/message/tags/update");
//上线
createFastBuilder(MessageType.ONLINE, "/online");
//离线
createFastBuilder(MessageType.OFFLINE, "/offline");
//断开连接
createFastBuilder(MessageType.DISCONNECT, "/disconnect");
//断开连接回复
createFastBuilder(MessageType.DISCONNECT_REPLY, "/disconnect/reply");
//子设备消息
createFastBuilder(MessageType.CHILD, (message, builder) -> {
Message msg = ((ChildDeviceMessage) message).getChildDeviceMessage();
if (msg instanceof DeviceMessage) {
builder.append("/message/children/")
.append(((DeviceMessage) msg).getDeviceId());
appendDeviceMessageTopic(msg, builder);
.append(((DeviceMessage) msg).getDeviceId());
} else {
builder.append("/message/children");
appendDeviceMessageTopic(message, builder);
}
} else if (message instanceof ChildDeviceMessageReply) { //子设备消息
appendDeviceMessageTopic(msg, builder);
});
//子设备消息回复
createFastBuilder(MessageType.CHILD_REPLY, (message, builder) -> {
Message msg = ((ChildDeviceMessageReply) message).getChildDeviceMessage();
if (msg instanceof DeviceMessage) {
builder.append("/message/children/reply/")
.append(((DeviceMessage) msg).getDeviceId());
appendDeviceMessageTopic(msg, builder);
.append(((DeviceMessage) msg).getDeviceId());
} else {
builder.append("/message/children/reply");
appendDeviceMessageTopic(message, builder);
}
} else if (message instanceof ReadPropertyMessage) { //读取属性
builder.append("/message/send/property/read");
} else if (message instanceof WritePropertyMessage) { //修改属性
builder.append("/message/send/property/write");
} else if (message instanceof FunctionInvokeMessage) { //调用功能
builder.append("/message/send/function");
} else if (message instanceof ReadPropertyMessageReply) { //读取属性回复
builder.append("/message/property/read/reply");
} else if (message instanceof WritePropertyMessageReply) { //修改属性回复
builder.append("/message/property/write/reply");
} else if (message instanceof FunctionInvokeMessageReply) { //调用功能回复
builder.append("/message/function/reply");
} else if (message instanceof DeviceRegisterMessage) { //注册
builder.append("/register");
} else if (message instanceof DeviceUnRegisterMessage) { //注销
builder.append("/unregister");
} else if (message instanceof RequestFirmwareMessage) { //拉取固件请求 since 1.3
builder.append("/firmware/pull");
} else if (message instanceof RequestFirmwareMessageReply) { //拉取固件响应 since 1.3
builder.append("/firmware/pull/reply");
} else if (message instanceof ReportFirmwareMessage) { //上报固件信息 since 1.3
builder.append("/firmware/report");
} else if (message instanceof UpgradeFirmwareProgressMessage) { //上报固件更新进度 since 1.3
builder.append("/firmware/progress");
} else if (message instanceof UpgradeFirmwareMessage) { //推送固件更新 since 1.3
builder.append("/firmware/push");
} else if (message instanceof UpgradeFirmwareMessageReply) { //推送固件更新回复 since 1.3
builder.append("/firmware/push/reply");
} else if (message instanceof DirectDeviceMessage) { //透传消息 since 1.4
builder.append("/message/direct");
appendDeviceMessageTopic(msg, builder);
});
}
private static void createFastBuilder(MessageType messageType,
String topic) {
fastTopicBuilder[messageType.ordinal()] = (ignore, builder) -> builder.append(topic);
}
private static void createFastBuilder(MessageType messageType,
BiConsumer<Message, StringBuilder> builderBiConsumer) {
fastTopicBuilder[messageType.ordinal()] = builderBiConsumer;
}
public static void appendDeviceMessageTopic(Message message, StringBuilder builder) {
BiConsumer<Message, StringBuilder> fastBuilder = fastTopicBuilder[message.getMessageType().ordinal()];
if (null != fastBuilder) {
fastBuilder.accept(message, builder);
} else {
builder.append("/message/").append(message.getMessageType().name().toLowerCase());
}
}
protected Mono<Boolean> handleChildrenDeviceMessage(DeviceOperator device, String childrenId, Message message) {
if (message instanceof DeviceMessageReply) {
return doReply(((DeviceMessageReply) message));
} else if (message instanceof DeviceOnlineMessage) {
return sessionManager.registerChildren(device.getDeviceId(), childrenId)
.thenReturn(true)
.defaultIfEmpty(false);
} else if (message instanceof DeviceOfflineMessage) {
return sessionManager.unRegisterChildren(device.getDeviceId(), childrenId)
.thenReturn(true)
.defaultIfEmpty(false);
}
return Mono.just(true);
}
protected Mono<Boolean> handleChildrenDeviceMessageReply(DeviceOperator session, ChildDeviceMessage reply) {
return handleChildrenDeviceMessage(session, reply.getChildDeviceId(), reply.getChildDeviceMessage());
}
protected Mono<Boolean> handleChildrenDeviceMessageReply(DeviceOperator session, ChildDeviceMessageReply reply) {
return handleChildrenDeviceMessage(session, reply.getChildDeviceId(), reply.getChildDeviceMessage());
}
@Override
public Mono<Boolean> handleMessage(DeviceOperator device, @Nonnull Message message) {
return this
.onMessage(message)
.then(Mono.defer(() -> {
if (device != null) {
if (message instanceof ChildDeviceMessageReply) {
return handleChildrenDeviceMessageReply(device, ((ChildDeviceMessageReply) message));
} else if (message instanceof ChildDeviceMessage) {
return handleChildrenDeviceMessageReply(device, ((ChildDeviceMessage) message));
}
}
if (message instanceof DeviceMessageReply) {
return doReply(((DeviceMessageReply) message));
}
return Mono.just(true);
}))
.defaultIfEmpty(false);
}
private Mono<Boolean> doReply(DeviceMessageReply reply) {
if (log.isDebugEnabled()) {
log.debug("reply message {}", reply.getMessageId());
}
return messageHandler
.reply(reply)
.doOnSuccess(success -> {
if (log.isDebugEnabled()) {
log.debug("reply message {} complete", reply.getMessageId());
}
})
.thenReturn(true)
.doOnError((error) -> log.error("reply message error", error))
;
}
}

View File

@ -142,7 +142,7 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
protected Flux<Tuple2<String, TimeSeriesData>> convertMessageToTimeSeriesData(DeviceMessage message) {
String productId = (String) message.getHeader("productId").orElse("null");
Consumer<DeviceOperationLogEntity> logEntityConsumer = null;
List<Publisher<Tuple2<String, TimeSeriesData>>> all = new ArrayList<>();
List<Publisher<Tuple2<String, TimeSeriesData>>> all = new ArrayList<>(2);
if (message instanceof EventMessage) {
logEntityConsumer = log -> log.setContent(JSON.toJSONString(((EventMessage) message).getData()));

View File

@ -89,6 +89,16 @@
<dependencies>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>

View File

@ -1,6 +1,7 @@
package org.jetlinks.community.standalone.configuration;
import com.google.common.cache.CacheBuilder;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.guava.CaffeinatedGuava;
import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
@ -10,56 +11,44 @@ import org.hswebframework.web.authorization.token.UserTokenManager;
import org.hswebframework.web.authorization.token.redis.RedisUserTokenManager;
import org.jetlinks.community.device.entity.DeviceInstanceEntity;
import org.jetlinks.community.device.entity.DeviceProductEntity;
import org.jetlinks.community.device.message.writer.TimeSeriesMessageWriterConnector;
import org.jetlinks.community.device.service.AutoDiscoverDeviceRegistry;
import org.jetlinks.community.device.service.data.DeviceDataService;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.core.ProtocolSupports;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.core.config.ConfigStorageManager;
import org.jetlinks.core.device.DeviceOperationBroker;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.device.StandaloneDeviceMessageBroker;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.message.DeviceOfflineMessage;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor;
import org.jetlinks.core.server.MessageHandler;
import org.jetlinks.core.server.monitor.GatewayServerMetrics;
import org.jetlinks.core.server.monitor.GatewayServerMonitor;
import org.jetlinks.core.server.session.DeviceSessionManager;
import org.jetlinks.core.spi.ServiceContext;
import org.jetlinks.community.device.message.DeviceMessageConnector;
import org.jetlinks.supports.cluster.ClusterDeviceRegistry;
import org.jetlinks.supports.cluster.redis.RedisClusterManager;
import org.jetlinks.supports.config.EventBusStorageManager;
import org.jetlinks.supports.event.BrokerEventBus;
import org.jetlinks.supports.protocol.ServiceLoaderProtocolSupports;
import org.jetlinks.supports.protocol.management.ClusterProtocolSupportManager;
import org.jetlinks.supports.protocol.management.ProtocolSupportLoader;
import org.jetlinks.supports.protocol.management.ProtocolSupportManager;
import org.jetlinks.supports.server.DecodedClientMessageHandler;
import org.jetlinks.supports.server.DefaultClientMessageHandler;
import org.jetlinks.supports.server.DefaultDecodedClientMessageHandler;
import org.jetlinks.supports.server.DefaultSendToDeviceMessageHandler;
import org.jetlinks.supports.server.monitor.MicrometerGatewayServerMetrics;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory;
import org.springframework.boot.web.server.WebServerFactoryCustomizer;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import java.util.List;
import java.util.Optional;
@Configuration
@ -98,6 +87,13 @@ public class JetLinksConfiguration {
return new StandaloneDeviceMessageBroker();
}
@Bean
public EventBusStorageManager eventBusStorageManager(ClusterManager clusterManager, EventBus eventBus) {
return new EventBusStorageManager(clusterManager,
eventBus,
() -> CaffeinatedGuava.build(Caffeine.newBuilder()));
}
@Bean(initMethod = "startup")
public RedisClusterManager clusterManager(JetLinksProperties properties, ReactiveRedisTemplate<Object, Object> template) {
return new RedisClusterManager(properties.getClusterName(), properties.getServerId(), template);
@ -106,8 +102,14 @@ public class JetLinksConfiguration {
@Bean
public ClusterDeviceRegistry clusterDeviceRegistry(ProtocolSupports supports,
ClusterManager manager,
ConfigStorageManager storageManager,
DeviceOperationBroker handler) {
return new ClusterDeviceRegistry(supports, manager, handler, CacheBuilder.newBuilder().build());
return new ClusterDeviceRegistry(supports,
storageManager,
manager,
handler,
CaffeinatedGuava.build(Caffeine.newBuilder()));
}
@Bean
@ -133,35 +135,6 @@ public class JetLinksConfiguration {
};
}
@Bean
public DeviceMessageConnector deviceMessageConnector(EventBus eventBus, DeviceRegistry registry) {
return new DeviceMessageConnector(eventBus, registry);
}
@Bean
@ConditionalOnProperty(prefix = "device.message.writer.time-series", name = "enabled", havingValue = "true", matchIfMissing = true)
public TimeSeriesMessageWriterConnector timeSeriesMessageWriterConnector(DeviceDataService deviceDataService) {
return new TimeSeriesMessageWriterConnector(deviceDataService);
}
@Bean(destroyMethod = "shutdown")
public DefaultDecodedClientMessageHandler defaultDecodedClientMessageHandler(MessageHandler handler,
DeviceMessageConnector messageConnector,
DeviceSessionManager deviceSessionManager,
Scheduler scheduler) {
DefaultDecodedClientMessageHandler clientMessageHandler = new DefaultDecodedClientMessageHandler(handler, deviceSessionManager,
EmitterProcessor.create(false)
);
clientMessageHandler
.subscribe()
.parallel()
.runOn(scheduler)
.flatMap(msg -> messageConnector.onMessage(msg).onErrorContinue((err, r) -> log.error(err.getMessage(), err)))
.subscribe();
return clientMessageHandler;
}
@Bean(initMethod = "startup")
public DefaultSendToDeviceMessageHandler defaultSendToDeviceMessageHandler(JetLinksProperties properties,
DeviceSessionManager sessionManager,
@ -171,12 +144,6 @@ public class JetLinksConfiguration {
return new DefaultSendToDeviceMessageHandler(properties.getServerId(), sessionManager, messageHandler, registry, clientMessageHandler);
}
@Bean
public DefaultClientMessageHandler defaultClientMessageHandler(DefaultDecodedClientMessageHandler handler) {
return new DefaultClientMessageHandler(handler);
}
@Bean
public GatewayServerMonitor gatewayServerMonitor(JetLinksProperties properties, MeterRegistry registry) {
GatewayServerMetrics metrics = new MicrometerGatewayServerMetrics(properties.getServerId(), registry);
@ -198,33 +165,12 @@ public class JetLinksConfiguration {
@Bean(initMethod = "init", destroyMethod = "shutdown")
public DefaultDeviceSessionManager deviceSessionManager(JetLinksProperties properties,
GatewayServerMonitor monitor,
DeviceMessageConnector messageConnector,
DeviceRegistry registry) {
DefaultDeviceSessionManager sessionManager = new DefaultDeviceSessionManager();
sessionManager.setGatewayServerMonitor(monitor);
sessionManager.setRegistry(registry);
Optional.ofNullable(properties.getTransportLimit()).ifPresent(sessionManager::setTransportLimits);
sessionManager.onRegister()
.flatMap(session -> {
DeviceOnlineMessage message = new DeviceOnlineMessage();
message.setDeviceId(session.getDeviceId());
message.setTimestamp(session.connectTime());
return messageConnector.onMessage(message);
})
.onErrorContinue((err, r) -> log.error(err.getMessage(), err))
.subscribe();
sessionManager.onUnRegister()
.flatMap(session -> {
DeviceOfflineMessage message = new DeviceOfflineMessage();
message.setDeviceId(session.getDeviceId());
message.setTimestamp(System.currentTimeMillis());
return messageConnector.onMessage(message);
})
.onErrorContinue((err, r) -> log.error(err.getMessage(), err))
.subscribe();
return sessionManager;
}