优化指令发送处理

This commit is contained in:
zhou-hao 2021-05-20 17:41:38 +08:00
parent f19c3421b4
commit f6cbc93baa
1 changed files with 58 additions and 11 deletions

View File

@ -6,18 +6,24 @@ import org.hswebframework.web.logger.ReactiveLogger;
import org.jetlinks.community.PropertyMetadataConstants;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.message.ChildDeviceMessage;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.*;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionParameter;
import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor;
import org.jetlinks.core.message.property.WritePropertyMessage;
import org.jetlinks.core.metadata.FunctionMetadata;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.ValidateResult;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* 发送设备指令的时候,将消息推送到网关中.
@ -47,6 +53,41 @@ public class DeviceMessageSendLogInterceptor implements DeviceMessageSenderInter
.then();
}
private Mono<DeviceMessage> convertParameterType(DeviceOperator device, FunctionInvokeMessage message) {
if (message.getHeader(Headers.force).orElse(false)) {
return Mono.just(message);
}
return device
.getMetadata()
.doOnNext(metadata -> {
FunctionMetadata function = metadata
.getFunction(message.getFunctionId())
.orElseThrow(() -> new DeviceOperationException(ErrorCode.FUNCTION_UNDEFINED, "功能[" + message
.getFunctionId() + "]未定义"));
Map<String, FunctionParameter> parameters = message
.getInputs()
.stream()
.collect(Collectors.toMap(FunctionParameter::getName, Function.identity()));
message.addHeaderIfAbsent(Headers.async, function.isAsync());
for (PropertyMetadata input : function.getInputs()) {
FunctionParameter parameter = parameters.get(input.getId());
if (parameter == null) {
continue;
}
ValidateResult result = input.getValueType().validate(parameter.getValue());
parameter.setValue(result.assertSuccess());
}
})
.thenReturn(message);
}
private Mono<DeviceMessage> prepareMessage(DeviceOperator device, DeviceMessage message) {
if (message instanceof FunctionInvokeMessage) {
return convertParameterType(device, ((FunctionInvokeMessage) message));
}
return Mono.just(message);
}
@Override
public <R extends DeviceMessage> Flux<R> afterSent(DeviceOperator device, DeviceMessage message, Flux<R> reply) {
if (message instanceof WritePropertyMessage) {
@ -79,13 +120,19 @@ public class DeviceMessageSendLogInterceptor implements DeviceMessageSenderInter
@Override
public Mono<DeviceMessage> preSend(DeviceOperator device, DeviceMessage message) {
return this
.doPublish(message )
.thenReturn(message)
.doOnEach(ReactiveLogger.onComplete(() -> {
if (log.isDebugEnabled()) {
log.debug("发送指令到设备[{}]:{}", message.getDeviceId(), message.toString());
}
}));
if (message instanceof RepayableDeviceMessage) {
return this
.prepareMessage(device, message)
.flatMap(msg -> this
.doPublish(msg)
.thenReturn(msg)
.doOnEach(ReactiveLogger.onComplete(() -> {
if (log.isDebugEnabled()) {
log.debug("向设备[{}]发送指令:{}", msg.getDeviceId(), msg.toString());
}
})));
} else {
return Mono.just(message);
}
}
}