diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/enums/DeviceLogType.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/enums/DeviceLogType.java index dc31fdec..81cda088 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/enums/DeviceLogType.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/enums/DeviceLogType.java @@ -27,7 +27,7 @@ public enum DeviceLogType implements EnumDict { other("其它"); @JSONField(serialize = false) - private String text; + private final String text; @Override public String getValue() { @@ -48,6 +48,10 @@ public enum DeviceLogType implements EnumDict { return childReply; case REPORT_PROPERTY: return reportProperty; + case INVOKE_FUNCTION: + return functionInvoke; + case WRITE_PROPERTY: + return writeProperty; case INVOKE_FUNCTION_REPLY: return functionReply; case READ_PROPERTY_REPLY: @@ -61,6 +65,7 @@ public enum DeviceLogType implements EnumDict { default: return other; } + } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java index 04826df4..5390b3ab 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java @@ -35,7 +35,7 @@ public class DeviceMessageConnector private FluxSink sink = messageProcessor.sink(); //将设备注册中心到配置追加到消息header中,下游订阅者可直接使用. - private String[] appendConfigHeader = { "orgId", "productId","deviceName"}; + private String[] appendConfigHeader = {"orgId", "productId", "deviceName"}; //设备注册中心 private final DeviceRegistry registry; @@ -122,7 +122,7 @@ public class DeviceMessageConnector return Mono.just("/device/unknown/message/unknown"); } - public String createDeviceMessageTopic(Message message) { + public static String createDeviceMessageTopic(Message message) { if (message instanceof EventMessage) { //事件 EventMessage event = ((EventMessage) message); return "/message/event/".concat(event.getEvent()); @@ -145,11 +145,11 @@ public class DeviceMessageConnector } return "/message/children/reply/".concat(createDeviceMessageTopic(message)); } else if (message instanceof ReadPropertyMessage) { //读取属性 - return "/message/property/read"; + return "/message/send/property/read"; } else if (message instanceof WritePropertyMessage) { //修改属性 - return "/message/property/write"; + return "/message/send/property/write"; } else if (message instanceof FunctionInvokeMessage) { //调用功能 - return "/message/function/reply"; + return "/message/send/function"; } else if (message instanceof ReadPropertyMessageReply) { //读取属性回复 return "/message/property/read/reply"; } else if (message instanceof WritePropertyMessageReply) { //修改属性回复 diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageSendLogInterceptor.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageSendLogInterceptor.java new file mode 100644 index 00000000..810acce6 --- /dev/null +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageSendLogInterceptor.java @@ -0,0 +1,57 @@ +package org.jetlinks.community.device.message; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.hswebframework.web.logger.ReactiveLogger; +import org.jetlinks.core.device.DeviceOperator; +import org.jetlinks.core.device.DeviceRegistry; +import org.jetlinks.core.message.ChildDeviceMessage; +import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor; +import org.jetlinks.community.gateway.MessageGateway; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Mono; + +/** + * 发送设备指令的时候,将消息推送到网关中. + * + * @author zhouhao + * @since 1.1 + */ +@Component +@Slf4j(topic = "system.device.message.sender") +@AllArgsConstructor +public class DeviceMessageSendLogInterceptor implements DeviceMessageSenderInterceptor { + + private final MessageGateway messageGateway; + + private final DeviceRegistry registry; + + public Mono doPublish(Mono device, DeviceMessage message) { + return device + .zipWhen(DeviceOperator::getProduct) + .doOnNext(tp2 -> message.addHeader("productId", tp2.getT2().getId())) + .flatMap(tp2 -> { + String topic = DeviceMessageConnector.createDeviceMessageTopic(message); + Mono publisher = messageGateway + .publish("/device/" + tp2.getT2().getId() + "/" + tp2.getT1().getDeviceId() + topic, message) + .then(); + if (message instanceof ChildDeviceMessage) { + DeviceMessage msg = (DeviceMessage) ((ChildDeviceMessage) message).getChildDeviceMessage(); + publisher = publisher.then(doPublish(registry.getDevice(msg.getDeviceId()), msg)); + } + return publisher; + }); + } + + @Override + public Mono preSend(DeviceOperator device, DeviceMessage message) { + return doPublish(Mono.just(device), message) + .thenReturn(message) + .doOnEach(ReactiveLogger.onComplete(() -> { + if (log.isDebugEnabled()) { + log.debug("发送指令到设备[{}]:{}", message.getDeviceId(), message.toString()); + } + })); + } +} diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageUtils.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageUtils.java index 61cbc8f9..3585780b 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageUtils.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageUtils.java @@ -11,15 +11,7 @@ import java.util.Optional; public class DeviceMessageUtils { public static Optional convert(TopicMessage message){ - if (message.getMessage() instanceof EncodableMessage) { - Object nativeMessage = ((EncodableMessage) message.getMessage()).getNativePayload(); - if (nativeMessage instanceof DeviceMessage) { - return Optional.of((DeviceMessage)nativeMessage); - } else if (nativeMessage instanceof Map) { - return MessageType.convertMessage(((Map) nativeMessage)); - } - } - return Optional.empty(); + return org.jetlinks.community.gateway.DeviceMessageUtils.convert(message); } } diff --git a/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksConfiguration.java b/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksConfiguration.java index e98ed00a..7c674dcd 100644 --- a/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksConfiguration.java +++ b/jetlinks-standalone/src/main/java/org/jetlinks/community/standalone/configuration/JetLinksConfiguration.java @@ -15,6 +15,7 @@ import org.jetlinks.core.device.DeviceRegistry; import org.jetlinks.core.device.StandaloneDeviceMessageBroker; import org.jetlinks.core.message.DeviceOfflineMessage; import org.jetlinks.core.message.DeviceOnlineMessage; +import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor; import org.jetlinks.core.server.MessageHandler; import org.jetlinks.core.server.monitor.GatewayServerMetrics; import org.jetlinks.core.server.monitor.GatewayServerMonitor; @@ -36,7 +37,9 @@ import org.jetlinks.supports.server.DefaultClientMessageHandler; import org.jetlinks.supports.server.DefaultDecodedClientMessageHandler; import org.jetlinks.supports.server.DefaultSendToDeviceMessageHandler; import org.jetlinks.supports.server.monitor.MicrometerGatewayServerMetrics; +import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -91,10 +94,25 @@ public class JetLinksConfiguration { } @Bean - public DeviceRegistry deviceRegistry(ProtocolSupports supports, ClusterManager manager, DeviceOperationBroker handler) { + public ClusterDeviceRegistry deviceRegistry(ProtocolSupports supports, + ClusterManager manager, + DeviceOperationBroker handler) { return new ClusterDeviceRegistry(supports, manager, handler); } + @Bean + public BeanPostProcessor interceptorRegister(ClusterDeviceRegistry registry){ + return new BeanPostProcessor() { + @Override + public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { + if(bean instanceof DeviceMessageSenderInterceptor){ + registry.addInterceptor(((DeviceMessageSenderInterceptor) bean)); + } + return bean; + } + }; + } + @Bean(initMethod = "startup", destroyMethod = "shutdown") public DefaultMessageGateway defaultMessageGateway(@Autowired(required = false) List connectors) { DefaultMessageGateway gateway = new DefaultMessageGateway("default", "系统默认", new LocalClientSessionManager());