From f6cbc93baa12dde2f57e965ac3451abc4198b59e Mon Sep 17 00:00:00 2001 From: zhou-hao Date: Thu, 20 May 2021 17:41:38 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=8C=87=E4=BB=A4=E5=8F=91?= =?UTF-8?q?=E9=80=81=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../DeviceMessageSendLogInterceptor.java | 69 ++++++++++++++++--- 1 file changed, 58 insertions(+), 11 deletions(-) diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageSendLogInterceptor.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageSendLogInterceptor.java index 67ab4c85..6f09f503 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageSendLogInterceptor.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageSendLogInterceptor.java @@ -6,18 +6,24 @@ import org.hswebframework.web.logger.ReactiveLogger; import org.jetlinks.community.PropertyMetadataConstants; import org.jetlinks.core.device.DeviceOperator; import org.jetlinks.core.device.DeviceRegistry; +import org.jetlinks.core.enums.ErrorCode; import org.jetlinks.core.event.EventBus; -import org.jetlinks.core.message.ChildDeviceMessage; -import org.jetlinks.core.message.DeviceMessage; -import org.jetlinks.core.message.Message; +import org.jetlinks.core.exception.DeviceOperationException; +import org.jetlinks.core.message.*; +import org.jetlinks.core.message.function.FunctionInvokeMessage; +import org.jetlinks.core.message.function.FunctionParameter; import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor; import org.jetlinks.core.message.property.WritePropertyMessage; +import org.jetlinks.core.metadata.FunctionMetadata; +import org.jetlinks.core.metadata.PropertyMetadata; +import org.jetlinks.core.metadata.ValidateResult; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.Map; import java.util.function.Function; +import java.util.stream.Collectors; /** * 发送设备指令的时候,将消息推送到网关中. @@ -47,6 +53,41 @@ public class DeviceMessageSendLogInterceptor implements DeviceMessageSenderInter .then(); } + private Mono convertParameterType(DeviceOperator device, FunctionInvokeMessage message) { + if (message.getHeader(Headers.force).orElse(false)) { + return Mono.just(message); + } + return device + .getMetadata() + .doOnNext(metadata -> { + FunctionMetadata function = metadata + .getFunction(message.getFunctionId()) + .orElseThrow(() -> new DeviceOperationException(ErrorCode.FUNCTION_UNDEFINED, "功能[" + message + .getFunctionId() + "]未定义")); + Map parameters = message + .getInputs() + .stream() + .collect(Collectors.toMap(FunctionParameter::getName, Function.identity())); + message.addHeaderIfAbsent(Headers.async, function.isAsync()); + for (PropertyMetadata input : function.getInputs()) { + FunctionParameter parameter = parameters.get(input.getId()); + if (parameter == null) { + continue; + } + ValidateResult result = input.getValueType().validate(parameter.getValue()); + parameter.setValue(result.assertSuccess()); + } + }) + .thenReturn(message); + } + + private Mono prepareMessage(DeviceOperator device, DeviceMessage message) { + if (message instanceof FunctionInvokeMessage) { + return convertParameterType(device, ((FunctionInvokeMessage) message)); + } + return Mono.just(message); + } + @Override public Flux afterSent(DeviceOperator device, DeviceMessage message, Flux reply) { if (message instanceof WritePropertyMessage) { @@ -79,13 +120,19 @@ public class DeviceMessageSendLogInterceptor implements DeviceMessageSenderInter @Override public Mono preSend(DeviceOperator device, DeviceMessage message) { - return this - .doPublish(message ) - .thenReturn(message) - .doOnEach(ReactiveLogger.onComplete(() -> { - if (log.isDebugEnabled()) { - log.debug("发送指令到设备[{}]:{}", message.getDeviceId(), message.toString()); - } - })); + if (message instanceof RepayableDeviceMessage) { + return this + .prepareMessage(device, message) + .flatMap(msg -> this + .doPublish(msg) + .thenReturn(msg) + .doOnEach(ReactiveLogger.onComplete(() -> { + if (log.isDebugEnabled()) { + log.debug("向设备[{}]发送指令:{}", msg.getDeviceId(), msg.toString()); + } + }))); + } else { + return Mono.just(message); + } } }