优化消息日志处理逻辑

This commit is contained in:
zhou-hao 2021-08-05 09:39:43 +08:00
parent 84a3418521
commit d27d54a234
2 changed files with 51 additions and 31 deletions

View File

@ -1,5 +1,7 @@
package org.jetlinks.community;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.HeaderKey;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.reactor.ql.utils.CastUtils;
@ -12,12 +14,25 @@ public interface PropertyMetadataConstants {
//数据来源
String id = "source";
HeaderKey<String> headerKey = HeaderKey.of(id, null);
//手动写值
String manual = "manual";
//规则,虚拟属性
String rule = "rule";
static boolean isManual(DeviceMessage message) {
return message
.getHeader(PropertyMetadataConstants.Source.headerKey)
.map(PropertyMetadataConstants.Source.manual::equals)
.orElse(false);
}
static void setManual(DeviceMessage message) {
message.addHeader(headerKey, manual);
}
/**
* 判断属性是否手动赋值
*

View File

@ -14,6 +14,7 @@ import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionParameter;
import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor;
import org.jetlinks.core.message.property.WritePropertyMessage;
import org.jetlinks.core.message.property.WritePropertyMessageReply;
import org.jetlinks.core.metadata.FunctionMetadata;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.ValidateResult;
@ -41,16 +42,17 @@ public class DeviceMessageSendLogInterceptor implements DeviceMessageSenderInter
private final DeviceRegistry registry;
public Mono<Void> doPublish(Message message) {
Mono<Void> then = Mono.empty();
if(message.getHeader(Headers.dispatchToParent).orElse(false)){
return then;
}
if (message instanceof ChildDeviceMessage) {
then = doPublish(((ChildDeviceMessage) message).getChildDeviceMessage());
}
return DeviceMessageConnector
.createDeviceMessageTopic(registry, message)
.flatMap(topic -> {
Mono<Void> publisher = eventBus.publish(topic, message).then();
if (message instanceof ChildDeviceMessage) {
publisher = publisher.then(doPublish(((ChildDeviceMessage) message).getChildDeviceMessage()));
}
return publisher;
})
.then();
.flatMap(topic -> eventBus.publish(topic, message))
.then(then);
}
private Mono<DeviceMessage> convertParameterType(DeviceOperator device, FunctionInvokeMessage message) {
@ -85,34 +87,37 @@ public class DeviceMessageSendLogInterceptor implements DeviceMessageSenderInter
if (message instanceof FunctionInvokeMessage) {
return convertParameterType(device, ((FunctionInvokeMessage) message));
}
if (message instanceof WritePropertyMessage) {
Map<String, Object> properties = ((WritePropertyMessage) message).getProperties();
if (properties.size() == 1) {
String property = properties.keySet().iterator().next();
// Object value = properties.values().iterator().next();
//手动写值的属性则直接返回
return device
.getMetadata()
.doOnNext(metadata -> metadata
.getProperty(property)
.filter(PropertyMetadataConstants.Source::isManual)
//标记手动回复
.ifPresent(ignore -> message.addHeader(PropertyMetadataConstants.Source.headerKey, PropertyMetadataConstants.Source.manual)))
.thenReturn(message);
}
}
return Mono.just(message);
}
@Override
public <R extends DeviceMessage> Flux<R> afterSent(DeviceOperator device, DeviceMessage message, Flux<R> reply) {
if (message instanceof WritePropertyMessage) {
Map<String, Object> properties =((WritePropertyMessage) message).getProperties();
if (properties.size() == 1) {
String property = properties.keySet().iterator().next();
Object value = properties.values().iterator().next();
//手动写值的属性则直接返回
return device
.getMetadata()
.flatMap(metadata -> Mono
.justOrEmpty(
metadata
.getProperty(property)
.filter(PropertyMetadataConstants.Source::isManual)
.map(ignore -> ((WritePropertyMessage) message)
.newReply()
.addHeader("source", PropertyMetadataConstants.Source.manual)
.addProperty(property, value)
.success()
)
))
.map(replyMsg -> this.doPublish(replyMsg).thenReturn((R) replyMsg).flux())
.defaultIfEmpty(reply)
.flatMapMany(Function.identity());
//属性来源是手动
if (PropertyMetadataConstants.Source.isManual(message)) {
if (message instanceof WritePropertyMessage) {
WritePropertyMessageReply messageReply = ((WritePropertyMessage) message).newReply();
PropertyMetadataConstants.Source.setManual(messageReply);
((WritePropertyMessage) message).getProperties().forEach(messageReply::addProperty);
//推送到事件总线然后进行回复
return doPublish(messageReply)
.thenMany(Flux.just(messageReply))
.map(r -> (R) r);
}
}
return reply;