From 331f31a62b87036b027141146ccf25fc6512b5af Mon Sep 17 00:00:00 2001 From: zhou-hao Date: Wed, 28 Jul 2021 14:45:18 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=AE=BE=E5=A4=87=E6=8C=87?= =?UTF-8?q?=E4=BB=A4=E5=8F=91=E9=80=81=E8=A7=84=E5=88=99=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...DeviceMessageSendTaskExecutorProvider.java | 141 ++++++++++++++---- .../engine/model/DeviceAlarmModelParser.java | 4 +- 2 files changed, 110 insertions(+), 35 deletions(-) diff --git a/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/DeviceMessageSendTaskExecutorProvider.java b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/DeviceMessageSendTaskExecutorProvider.java index 5260e15e..195d99b8 100644 --- a/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/DeviceMessageSendTaskExecutorProvider.java +++ b/jetlinks-components/rule-engine-component/src/main/java/org/jetlinks/community/rule/engine/executor/DeviceMessageSendTaskExecutorProvider.java @@ -9,10 +9,9 @@ import org.hswebframework.web.utils.ExpressionUtils; import org.jetlinks.core.device.DeviceOperator; import org.jetlinks.core.device.DeviceProductOperator; import org.jetlinks.core.device.DeviceRegistry; -import org.jetlinks.core.message.DeviceMessageReply; -import org.jetlinks.core.message.Headers; -import org.jetlinks.core.message.MessageType; -import org.jetlinks.core.message.RepayableDeviceMessage; +import org.jetlinks.core.enums.ErrorCode; +import org.jetlinks.core.exception.DeviceOperationException; +import org.jetlinks.core.message.*; import org.jetlinks.core.message.function.FunctionInvokeMessage; import org.jetlinks.core.message.function.FunctionParameter; import org.jetlinks.core.message.property.ReadPropertyMessage; @@ -33,9 +32,11 @@ import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; +import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Function; import java.util.stream.Collectors; @Component @@ -56,26 +57,36 @@ public class DeviceMessageSendTaskExecutorProvider implements TaskExecutorProvid class DeviceMessageSendTaskExecutor extends FunctionTaskExecutor { - private Config config; + private DeviceMessageSendConfig config; + + private Function, Flux> selector; public DeviceMessageSendTaskExecutor(ExecutionContext context) { super("发送设备消息", context); - validate(); reload(); } + protected Flux selectDevice(Map ctx) { + return selector.apply(ctx); + } + @Override protected Publisher apply(RuleData input) { - Flux devices = StringUtils.hasText(config.getDeviceId()) - ? registry.getDevice(config.getDeviceId()).flux() - : registry.getProduct(config.getProductId()).flatMapMany(DeviceProductOperator::getDevices); Map ctx = RuleDataHelper.toContextMap(input); - return devices - .filterWhen(DeviceOperator::isOnline) - .publishOn(Schedulers.parallel()) - .flatMap(device -> config.doSend(ctx, device)) - .onErrorResume(error -> context.onError(error, input).then(Mono.empty())) - .map(reply -> input.newData(reply.toJson())) + + Flux readySendDevice = + "ignoreOffline".equals(config.getStateOperator()) + ? selectDevice(ctx).filterWhen(DeviceOperator::isOnline) + : selectDevice(ctx); + + return readySendDevice + .switchIfEmpty(context.onError(() -> new DeviceOperationException(ErrorCode.SYSTEM_ERROR, "无可用设备"), input)) + .flatMap(device -> config + .doSend(ctx, context, device, input) + .onErrorResume(error -> context.onError(error, input)) + .subscribeOn(Schedulers.parallel()) + ) + .map(reply -> context.newRuleData(input.newData(reply.toJson()))) ; } @@ -84,22 +95,37 @@ public class DeviceMessageSendTaskExecutorProvider implements TaskExecutorProvid if (CollectionUtils.isEmpty(context.getJob().getConfiguration())) { throw new IllegalArgumentException("配置不能为空"); } - Config config = FastBeanCopier.copy(context.getJob().getConfiguration(), new Config()); - config.validate(); + FastBeanCopier.copy(context.getJob().getConfiguration(), new DeviceMessageSendConfig()).validate(); } @Override public void reload() { - config = FastBeanCopier.copy(context.getJob().getConfiguration(), new Config()); + config = FastBeanCopier.copy(context.getJob().getConfiguration(), new DeviceMessageSendConfig()); + config.validate(); + if (StringUtils.hasText(config.deviceId)) { + selector = ctx -> registry.getDevice(config.getDeviceId()).flux(); + } else if (StringUtils.hasText(config.productId)) { + selector = ctx -> registry.getProduct(config.getProductId()).flatMapMany(DeviceProductOperator::getDevices); + } else { + if (config.isFixed() && config.getMessage() != null) { + selector = ctx -> registry.getDevice((String) config.getMessage().get("deviceId")).flux(); + } else { + selector = ctx -> registry + .getDevice((String) ctx + .getOrDefault("deviceId", + config.getMessage() == null + ? null + : config.getMessage().get("deviceId"))) + .flux(); + } + } } - } - @Getter @Setter - public static class Config { + public static class DeviceMessageSendConfig { //设备ID private String deviceId; @@ -107,21 +133,55 @@ public class DeviceMessageSendTaskExecutorProvider implements TaskExecutorProvid //产品ID private String productId; + //消息来源: pre-node(上游节点),fixed(固定消息) + private String from; + + private Duration timeout = Duration.ofSeconds(10); + private Map message; private boolean async; + private String waitType = "sync"; + + private String stateOperator = "ignoreOffline"; + + public Map toMap() { + Map conf = FastBeanCopier.copy(this, new HashMap<>()); + conf.put("timeout", timeout.toString()); + return conf; + } + @SuppressWarnings("all") - public Publisher doSend(Map ctx, DeviceOperator device) { - Map message = new HashMap<>(this.message); + public Flux doSend(Map ctx, + ExecutionContext context, + DeviceOperator device, + RuleData input) { + Map message = new HashMap<>("pre-node".equals(from) ? ctx : this.message); message.put("messageId", IDGenerator.SNOW_FLAKE_STRING.generate()); message.put("deviceId", device.getDeviceId()); + message.put("timestamp", System.currentTimeMillis()); return Mono .justOrEmpty(MessageType.convertMessage(message)) - .cast(RepayableDeviceMessage.class) + .switchIfEmpty(context.onError(() -> new DeviceOperationException(ErrorCode.UNSUPPORTED_MESSAGE), input)) + .cast(DeviceMessage.class) .map(msg -> applyMessageExpression(ctx, msg)) - .doOnNext(msg -> msg.addHeader(Headers.async, async)) - .flatMapMany(msg -> device.messageSender().send(Mono.just(msg))); + .doOnNext(msg -> msg + .addHeader(Headers.async, async || !"sync".equals(waitType)) + .addHeader(Headers.sendAndForget, "forget".equals(waitType)) + .addHeader(Headers.timeout, timeout.toMillis())) + .flatMapMany(msg -> "forget".equals(waitType) + ? device.messageSender().send(msg).then(Mono.empty()) + : device.messageSender() + .send(msg) + .onErrorResume(err -> { + //失败尝试转为消息回复 + if (msg instanceof RepayableDeviceMessage) { + return Mono.just(((RepayableDeviceMessage) msg).newReply().error(err)); + } + return Mono.error(err); + }) + ); } private ReadPropertyMessage applyMessageExpression(Map ctx, ReadPropertyMessage message) { @@ -129,7 +189,10 @@ public class DeviceMessageSendTaskExecutorProvider implements TaskExecutorProvid if (!CollectionUtils.isEmpty(properties)) { message.setProperties( - properties.stream().map(prop -> ExpressionUtils.analytical(prop, ctx, "spel")).collect(Collectors.toList()) + properties + .stream() + .map(prop -> ExpressionUtils.analytical(prop, ctx, "spel")) + .collect(Collectors.toList()) ); } @@ -141,7 +204,8 @@ public class DeviceMessageSendTaskExecutorProvider implements TaskExecutorProvid if (!CollectionUtils.isEmpty(properties)) { message.setProperties( - properties.entrySet() + properties + .entrySet() .stream() .map(prop -> Tuples.of(prop.getKey(), ExpressionUtils.analytical(String.valueOf(prop.getValue()), ctx, "spel"))) .collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2)) @@ -156,14 +220,17 @@ public class DeviceMessageSendTaskExecutorProvider implements TaskExecutorProvid if (!CollectionUtils.isEmpty(inputs)) { for (FunctionParameter input : inputs) { - input.setValue(ExpressionUtils.analytical(String.valueOf(input.getValue()), ctx, "spel")); + String stringVal = String.valueOf(input.getValue()); + if (stringVal.contains("$")) { + input.setValue(ExpressionUtils.analytical(stringVal, ctx, "spel")); + } } } return message; } - private RepayableDeviceMessage applyMessageExpression(Map ctx, RepayableDeviceMessage message) { + private DeviceMessage applyMessageExpression(Map ctx, DeviceMessage message) { if (message instanceof ReadPropertyMessage) { return applyMessageExpression(ctx, ((ReadPropertyMessage) message)); } @@ -176,11 +243,19 @@ public class DeviceMessageSendTaskExecutorProvider implements TaskExecutorProvid return message; } + private boolean isFixed() { + return "fixed".equals(from); + } + + private boolean isPreNode() { + return "pre-node".equals(from); + } + + public void validate() { - if (StringUtils.isEmpty(deviceId) && StringUtils.isEmpty(productId)) { - throw new IllegalArgumentException("deviceId和productId不能同时为空"); + if ("fixed".equals(from)) { + MessageType.convertMessage(message).orElseThrow(() -> new IllegalArgumentException("不支持的消息格式")); } - MessageType.convertMessage(message).orElseThrow(() -> new IllegalArgumentException("不支持的消息格式")); } } diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/model/DeviceAlarmModelParser.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/model/DeviceAlarmModelParser.java index dcb0638d..37e5c28a 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/model/DeviceAlarmModelParser.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/model/DeviceAlarmModelParser.java @@ -57,7 +57,7 @@ public class DeviceAlarmModelParser implements RuleModelParserStrategy { timer.setExecutor("timer"); timer.setConfiguration(Collections.singletonMap("cron", timerTrigger.getCron())); - DeviceMessageSendTaskExecutorProvider.Config senderConfig = new DeviceMessageSendTaskExecutorProvider.Config(); + DeviceMessageSendTaskExecutorProvider.DeviceMessageSendConfig senderConfig = new DeviceMessageSendTaskExecutorProvider.DeviceMessageSendConfig(); senderConfig.setAsync(true); senderConfig.setDeviceId(alarmRule.getDeviceId()); senderConfig.setProductId(alarmRule.getProductId()); @@ -67,7 +67,7 @@ public class DeviceAlarmModelParser implements RuleModelParserStrategy { messageSender.setId("message-sender:" + (++index)); messageSender.setName("定时发送设备消息"); messageSender.setExecutor("device-message-sender"); - messageSender.setConfiguration(FastBeanCopier.copy(senderConfig, new HashMap<>())); + messageSender.setConfiguration(senderConfig.toMap()); RuleLink link = new RuleLink(); link.setId(timer.getId().concat(":").concat(messageSender.getId()));