优化消息日志

This commit is contained in:
zhou-hao 2020-04-24 17:16:17 +08:00
parent 3bfb1219d8
commit db541ce3bf
5 changed files with 88 additions and 16 deletions

View File

@ -27,7 +27,7 @@ public enum DeviceLogType implements EnumDict<String> {
other("其它");
@JSONField(serialize = false)
private String text;
private final String text;
@Override
public String getValue() {
@ -48,6 +48,10 @@ public enum DeviceLogType implements EnumDict<String> {
return childReply;
case REPORT_PROPERTY:
return reportProperty;
case INVOKE_FUNCTION:
return functionInvoke;
case WRITE_PROPERTY:
return writeProperty;
case INVOKE_FUNCTION_REPLY:
return functionReply;
case READ_PROPERTY_REPLY:
@ -61,6 +65,7 @@ public enum DeviceLogType implements EnumDict<String> {
default:
return other;
}
}

View File

@ -35,7 +35,7 @@ public class DeviceMessageConnector
private FluxSink<TopicMessage> sink = messageProcessor.sink();
//将设备注册中心到配置追加到消息header中,下游订阅者可直接使用.
private String[] appendConfigHeader = { "orgId", "productId","deviceName"};
private String[] appendConfigHeader = {"orgId", "productId", "deviceName"};
//设备注册中心
private final DeviceRegistry registry;
@ -122,7 +122,7 @@ public class DeviceMessageConnector
return Mono.just("/device/unknown/message/unknown");
}
public String createDeviceMessageTopic(Message message) {
public static String createDeviceMessageTopic(Message message) {
if (message instanceof EventMessage) { //事件
EventMessage event = ((EventMessage) message);
return "/message/event/".concat(event.getEvent());
@ -145,11 +145,11 @@ public class DeviceMessageConnector
}
return "/message/children/reply/".concat(createDeviceMessageTopic(message));
} else if (message instanceof ReadPropertyMessage) { //读取属性
return "/message/property/read";
return "/message/send/property/read";
} else if (message instanceof WritePropertyMessage) { //修改属性
return "/message/property/write";
return "/message/send/property/write";
} else if (message instanceof FunctionInvokeMessage) { //调用功能
return "/message/function/reply";
return "/message/send/function";
} else if (message instanceof ReadPropertyMessageReply) { //读取属性回复
return "/message/property/read/reply";
} else if (message instanceof WritePropertyMessageReply) { //修改属性回复

View File

@ -0,0 +1,57 @@
package org.jetlinks.community.device.message;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.logger.ReactiveLogger;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.message.ChildDeviceMessage;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor;
import org.jetlinks.community.gateway.MessageGateway;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
/**
* 发送设备指令的时候,将消息推送到网关中.
*
* @author zhouhao
* @since 1.1
*/
@Component
@Slf4j(topic = "system.device.message.sender")
@AllArgsConstructor
public class DeviceMessageSendLogInterceptor implements DeviceMessageSenderInterceptor {
private final MessageGateway messageGateway;
private final DeviceRegistry registry;
public Mono<Void> doPublish(Mono<DeviceOperator> device, DeviceMessage message) {
return device
.zipWhen(DeviceOperator::getProduct)
.doOnNext(tp2 -> message.addHeader("productId", tp2.getT2().getId()))
.flatMap(tp2 -> {
String topic = DeviceMessageConnector.createDeviceMessageTopic(message);
Mono<Void> publisher = messageGateway
.publish("/device/" + tp2.getT2().getId() + "/" + tp2.getT1().getDeviceId() + topic, message)
.then();
if (message instanceof ChildDeviceMessage) {
DeviceMessage msg = (DeviceMessage) ((ChildDeviceMessage) message).getChildDeviceMessage();
publisher = publisher.then(doPublish(registry.getDevice(msg.getDeviceId()), msg));
}
return publisher;
});
}
@Override
public Mono<DeviceMessage> preSend(DeviceOperator device, DeviceMessage message) {
return doPublish(Mono.just(device), message)
.thenReturn(message)
.doOnEach(ReactiveLogger.onComplete(() -> {
if (log.isDebugEnabled()) {
log.debug("发送指令到设备[{}]:{}", message.getDeviceId(), message.toString());
}
}));
}
}

View File

@ -11,15 +11,7 @@ import java.util.Optional;
public class DeviceMessageUtils {
public static Optional<DeviceMessage> convert(TopicMessage message){
if (message.getMessage() instanceof EncodableMessage) {
Object nativeMessage = ((EncodableMessage) message.getMessage()).getNativePayload();
if (nativeMessage instanceof DeviceMessage) {
return Optional.of((DeviceMessage)nativeMessage);
} else if (nativeMessage instanceof Map) {
return MessageType.convertMessage(((Map<String, Object>) nativeMessage));
}
}
return Optional.empty();
return org.jetlinks.community.gateway.DeviceMessageUtils.convert(message);
}
}

View File

@ -15,6 +15,7 @@ import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.device.StandaloneDeviceMessageBroker;
import org.jetlinks.core.message.DeviceOfflineMessage;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor;
import org.jetlinks.core.server.MessageHandler;
import org.jetlinks.core.server.monitor.GatewayServerMetrics;
import org.jetlinks.core.server.monitor.GatewayServerMonitor;
@ -36,7 +37,9 @@ import org.jetlinks.supports.server.DefaultClientMessageHandler;
import org.jetlinks.supports.server.DefaultDecodedClientMessageHandler;
import org.jetlinks.supports.server.DefaultSendToDeviceMessageHandler;
import org.jetlinks.supports.server.monitor.MicrometerGatewayServerMetrics;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@ -91,10 +94,25 @@ public class JetLinksConfiguration {
}
@Bean
public DeviceRegistry deviceRegistry(ProtocolSupports supports, ClusterManager manager, DeviceOperationBroker handler) {
public ClusterDeviceRegistry deviceRegistry(ProtocolSupports supports,
ClusterManager manager,
DeviceOperationBroker handler) {
return new ClusterDeviceRegistry(supports, manager, handler);
}
@Bean
public BeanPostProcessor interceptorRegister(ClusterDeviceRegistry registry){
return new BeanPostProcessor() {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if(bean instanceof DeviceMessageSenderInterceptor){
registry.addInterceptor(((DeviceMessageSenderInterceptor) bean));
}
return bean;
}
};
}
@Bean(initMethod = "startup", destroyMethod = "shutdown")
public DefaultMessageGateway defaultMessageGateway(@Autowired(required = false) List<MessageConnector> connectors) {
DefaultMessageGateway gateway = new DefaultMessageGateway("default", "系统默认", new LocalClientSessionManager());