diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/PropertyMetadataConstants.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/PropertyMetadataConstants.java index 52c39147..69538767 100644 --- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/PropertyMetadataConstants.java +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/PropertyMetadataConstants.java @@ -1,5 +1,7 @@ package org.jetlinks.community; +import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.message.HeaderKey; import org.jetlinks.core.metadata.PropertyMetadata; import org.jetlinks.reactor.ql.utils.CastUtils; @@ -12,12 +14,25 @@ public interface PropertyMetadataConstants { //数据来源 String id = "source"; + HeaderKey headerKey = HeaderKey.of(id, null); + //手动写值 String manual = "manual"; //规则,虚拟属性 String rule = "rule"; + static boolean isManual(DeviceMessage message) { + return message + .getHeader(PropertyMetadataConstants.Source.headerKey) + .map(PropertyMetadataConstants.Source.manual::equals) + .orElse(false); + } + + static void setManual(DeviceMessage message) { + message.addHeader(headerKey, manual); + } + /** * 判断属性是否手动赋值 * diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageSendLogInterceptor.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageSendLogInterceptor.java index 6f09f503..6c7dd276 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageSendLogInterceptor.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageSendLogInterceptor.java @@ -14,6 +14,7 @@ import org.jetlinks.core.message.function.FunctionInvokeMessage; import org.jetlinks.core.message.function.FunctionParameter; import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor; import org.jetlinks.core.message.property.WritePropertyMessage; +import org.jetlinks.core.message.property.WritePropertyMessageReply; import org.jetlinks.core.metadata.FunctionMetadata; import org.jetlinks.core.metadata.PropertyMetadata; import org.jetlinks.core.metadata.ValidateResult; @@ -41,16 +42,17 @@ public class DeviceMessageSendLogInterceptor implements DeviceMessageSenderInter private final DeviceRegistry registry; public Mono doPublish(Message message) { + Mono then = Mono.empty(); + if(message.getHeader(Headers.dispatchToParent).orElse(false)){ + return then; + } + if (message instanceof ChildDeviceMessage) { + then = doPublish(((ChildDeviceMessage) message).getChildDeviceMessage()); + } return DeviceMessageConnector .createDeviceMessageTopic(registry, message) - .flatMap(topic -> { - Mono publisher = eventBus.publish(topic, message).then(); - if (message instanceof ChildDeviceMessage) { - publisher = publisher.then(doPublish(((ChildDeviceMessage) message).getChildDeviceMessage())); - } - return publisher; - }) - .then(); + .flatMap(topic -> eventBus.publish(topic, message)) + .then(then); } private Mono convertParameterType(DeviceOperator device, FunctionInvokeMessage message) { @@ -85,34 +87,37 @@ public class DeviceMessageSendLogInterceptor implements DeviceMessageSenderInter if (message instanceof FunctionInvokeMessage) { return convertParameterType(device, ((FunctionInvokeMessage) message)); } + if (message instanceof WritePropertyMessage) { + Map properties = ((WritePropertyMessage) message).getProperties(); + if (properties.size() == 1) { + String property = properties.keySet().iterator().next(); +// Object value = properties.values().iterator().next(); + //手动写值的属性则直接返回 + return device + .getMetadata() + .doOnNext(metadata -> metadata + .getProperty(property) + .filter(PropertyMetadataConstants.Source::isManual) + //标记手动回复 + .ifPresent(ignore -> message.addHeader(PropertyMetadataConstants.Source.headerKey, PropertyMetadataConstants.Source.manual))) + .thenReturn(message); + } + } return Mono.just(message); } @Override public Flux afterSent(DeviceOperator device, DeviceMessage message, Flux reply) { - if (message instanceof WritePropertyMessage) { - Map properties =((WritePropertyMessage) message).getProperties(); - if (properties.size() == 1) { - String property = properties.keySet().iterator().next(); - Object value = properties.values().iterator().next(); - //手动写值的属性则直接返回 - return device - .getMetadata() - .flatMap(metadata -> Mono - .justOrEmpty( - metadata - .getProperty(property) - .filter(PropertyMetadataConstants.Source::isManual) - .map(ignore -> ((WritePropertyMessage) message) - .newReply() - .addHeader("source", PropertyMetadataConstants.Source.manual) - .addProperty(property, value) - .success() - ) - )) - .map(replyMsg -> this.doPublish(replyMsg).thenReturn((R) replyMsg).flux()) - .defaultIfEmpty(reply) - .flatMapMany(Function.identity()); + //属性来源是手动 + if (PropertyMetadataConstants.Source.isManual(message)) { + if (message instanceof WritePropertyMessage) { + WritePropertyMessageReply messageReply = ((WritePropertyMessage) message).newReply(); + PropertyMetadataConstants.Source.setManual(messageReply); + ((WritePropertyMessage) message).getProperties().forEach(messageReply::addProperty); + //推送到事件总线然后进行回复 + return doPublish(messageReply) + .thenMany(Flux.just(messageReply)) + .map(r -> (R) r); } } return reply;