diff --git a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/MqttClientProvider.java b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/MqttClientProvider.java index c9def9aa..6c94a356 100755 --- a/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/MqttClientProvider.java +++ b/jetlinks-components/network-component/mqtt-component/src/main/java/org/jetlinks/community/network/mqtt/client/MqttClientProvider.java @@ -125,12 +125,12 @@ public class MqttClientProvider implements NetworkProvider return new DefaultConfigMetadata() .add("id", "id", "", new StringType()) .add("remoteHost", "远程地址", "", new StringType()) - .add("remotePort", "远程地址", "", new IntType()) - .add("certId", "证书id", "", new StringType()) + .add("remotePort", "远程端口", "", new IntType()) + .add("certId", "证书ID", "", new StringType()) .add("secure", "开启TSL", "", new BooleanType()) - .add("clientId", "客户端ID", "", new BooleanType()) - .add("username", "用户名", "", new BooleanType()) - .add("password", "密码", "", new BooleanType()); + .add("clientId", "客户端ID", "", new StringType()) + .add("username", "用户名", "", new StringType()) + .add("password", "密码", "", new StringType()); } @Nonnull diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceTagEntity.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceTagEntity.java index eedeb32d..ecd5e14e 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceTagEntity.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceTagEntity.java @@ -142,24 +142,27 @@ public class DeviceTagEntity extends GenericEntity { return tag; } + public DeviceProperty toProperty() { DeviceProperty property = new DeviceProperty(); property.setProperty(getKey()); property.setDeviceId(deviceId); property.setType(type); property.setPropertyName(name); + property.setValue(parseValue()); + return property; + } + + public Object parseValue() { DataType type = Optional .ofNullable(DataTypes.lookup(getType())) .map(Supplier::get) .orElseGet(UnknownType::new); if (type instanceof Converter) { - property.setValue(((Converter) type).convert(getValue())); + return ((Converter) type).convert(getValue()); } else { - property.setValue(getValue()); + return getValue(); } - return property; - - } //以物模型标签基础数据为准,重构数据库保存的可能已过时的标签数据 diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/tag/DeviceTagProperties.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/tag/DeviceTagProperties.java new file mode 100644 index 00000000..82719f3e --- /dev/null +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/tag/DeviceTagProperties.java @@ -0,0 +1,18 @@ +package org.jetlinks.community.device.service.tag; + +import org.jetlinks.community.buffer.BufferProperties; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +@Component +@ConfigurationProperties(prefix = "device.tag.synchronizer") +public class DeviceTagProperties extends BufferProperties { + + public DeviceTagProperties(){ + setFilePath("./data/device-tag-synchronizer"); + setSize(500); + setParallelism(1); + getEviction().setMaxSize(100_0000); + } + +} diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/tag/DeviceTagSynchronizer.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/tag/DeviceTagSynchronizer.java new file mode 100644 index 00000000..4bb4cf17 --- /dev/null +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/tag/DeviceTagSynchronizer.java @@ -0,0 +1,224 @@ +package org.jetlinks.community.device.service.tag; + +import lombok.*; +import org.apache.commons.collections4.MapUtils; +import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository; +import org.hswebframework.web.crud.events.EntityCreatedEvent; +import org.hswebframework.web.crud.events.EntityDeletedEvent; +import org.hswebframework.web.crud.events.EntityModifyEvent; +import org.hswebframework.web.crud.events.EntitySavedEvent; +import org.jetlinks.core.device.DeviceOperator; +import org.jetlinks.core.device.DeviceRegistry; +import org.jetlinks.core.device.DeviceThingType; +import org.jetlinks.core.message.UpdateTagMessage; +import org.jetlinks.core.things.ThingsDataManager; +import org.jetlinks.core.utils.Reactors; +import org.jetlinks.community.buffer.BufferSettings; +import org.jetlinks.community.buffer.PersistenceBuffer; +import org.jetlinks.community.device.entity.DeviceTagEntity; +import org.jetlinks.community.gateway.annotation.Subscribe; +import org.jetlinks.community.things.data.ThingsDataWriter; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.io.Externalizable; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Date; +import java.util.List; +import java.util.Map; + +@Component +@RequiredArgsConstructor +public class DeviceTagSynchronizer implements CommandLineRunner { + + private final DeviceTagProperties properties; + + private final DeviceRegistry registry; + + private final ThingsDataWriter dataWriter; + + private final ThingsDataManager dataManager; + + private final ReactiveRepository tagRepository; + + public PersistenceBuffer buffer; + + @Subscribe(value = "/device/*/*/message/tags/update") + public Mono updateDeviceTag(UpdateTagMessage message) { + Map tags = message.getTags(); + if (MapUtils.isEmpty(tags)) { + return Mono.empty(); + } + String deviceId = message.getDeviceId(); + + return registry + .getDevice(deviceId) + .flatMap(DeviceOperator::getMetadata) + .flatMapMany(metadata -> Flux + .fromIterable(tags.entrySet()) + .filter(e -> e.getValue() != null) + .flatMap(e -> { + DeviceTagEntity tagEntity = metadata + .getTag(e.getKey()) + .map(tagMeta -> DeviceTagEntity.of(tagMeta, e.getValue())) + .orElseGet(() -> { + DeviceTagEntity entity = new DeviceTagEntity(); + entity.setKey(e.getKey()); + entity.setType("string"); + entity.setName(e.getKey()); + entity.setCreateTime(new Date()); + entity.setDescription("设备上报"); + entity.setValue(String.valueOf(e.getValue())); + return entity; + }); + tagEntity.setTimestamp(message.getTimestamp()); + tagEntity.setDeviceId(deviceId); + tagEntity.setId(DeviceTagEntity.createTagId(deviceId, tagEntity.getKey())); + + return dataWriter + .updateTag(DeviceThingType.device.getId(), + tagEntity.getDeviceId(), + tagEntity.getKey(), + System.currentTimeMillis(), + e.getValue()) + .then(writeBuffer(tagEntity)); + + })) + .then(); + } + + + public Mono writeBuffer(DeviceTagEntity entity) { + return buffer.writeAsync(new DeviceTagBuffer(entity)); + } + + + private Mono convertEntity(DeviceTagBuffer buffer) { + //从最新缓存中获取最新的数据,并填入准备入库的实体中 + return dataManager + .getLastTag(DeviceThingType.device.getId(), + buffer.getTag().getDeviceId(), + buffer.getTag().getKey(), + System.currentTimeMillis()) + .map(tag -> { + //缓存中的数据比buffer中的新,则更新为buffer中的数据 + if (tag.getTimestamp() >= buffer.tag.getTimestamp()) { + buffer.getTag().setTimestamp(tag.getTimestamp()); + buffer.getTag().setValue(String.valueOf(tag.getValue())); + } + return buffer.getTag(); + }) + .defaultIfEmpty(buffer.tag); + } + + public Mono handleBuffer(Flux buffer) { + + return tagRepository + .save(buffer.flatMap(this::convertEntity)) + .contextWrite(ctx -> ctx.put(DeviceTagSynchronizer.class, this)) + .then(Reactors.ALWAYS_FALSE); + } + + @EventListener + public void handleDeviceTagEvent(EntityCreatedEvent event) { + event.async(updateTag(event.getEntity())); + } + + @EventListener + public void handleDeviceTagEvent(EntitySavedEvent event) { + event.async(updateTag(event.getEntity())); + } + + @EventListener + public void handleDeviceTagEvent(EntityModifyEvent event) { + event.async(updateTag(event.getAfter())); + } + + @EventListener + public void handleDeviceTagEvent(EntityDeletedEvent event) { + event.async( + Flux + .fromIterable(event.getEntity()) + .flatMap(entity -> dataWriter + .removeTag(DeviceThingType.device.getId(), + entity.getDeviceId(), + entity.getKey()) + .then() + )); + } + + + /** + * 更新标签,界面上手动修改标签? + * + * @param entityList 标签 + * @return Void + */ + private Mono updateTag(List entityList) { + return Mono.deferContextual(ctx -> { + //更新来自消息的标签,不需要再次更新 + if (ctx.hasKey(DeviceTagSynchronizer.class)) { + return Mono.empty(); + } + return Flux + .fromIterable(entityList) + .flatMap(entity -> dataWriter + .updateTag(DeviceThingType.device.getId(), + entity.getDeviceId(), + entity.getKey(), + System.currentTimeMillis(), + entity.parseValue())) + .then(); + }); + } + + @PostConstruct + public void init() { + buffer = new PersistenceBuffer<>( + BufferSettings.create(properties), + DeviceTagBuffer::new, + this::handleBuffer) + .name("device-tag-synchronizer"); + buffer.init(); + } + + @PreDestroy + public void shutdown() { + buffer.stop(); + } + + @Override + public void run(String... args) throws Exception { + buffer.start(); + SpringApplication + .getShutdownHandlers() + .add(buffer::dispose); + } + + + @Getter + @Setter + @AllArgsConstructor + @NoArgsConstructor + public static class DeviceTagBuffer implements Externalizable { + private DeviceTagEntity tag; + + @Override + public void writeExternal(ObjectOutput out) { + tag.writeExternal(out); + } + + @Override + public void readExternal(ObjectInput in) { + tag = new DeviceTagEntity(); + tag.readExternal(in); + } + } +} diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneUtils.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneUtils.java index 11aedc91..428f3c84 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneUtils.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/SceneUtils.java @@ -234,15 +234,32 @@ public class SceneUtils { } } - public static void refactorUpperKey(DeviceSelectorSpec deviceSelectorSpec) { + @SuppressWarnings("all") + public static void refactorUpperKey(Object source) { // 将变量格式改为与查询的别名一致 - if (VariableSource.Source.upper.equals(deviceSelectorSpec.getSource())) { - // scene.xx.current -> scene.scene_xx_current - if (deviceSelectorSpec.getUpperKey().startsWith("scene.")) { - String alias = SceneUtils.createColumnAlias("properties", deviceSelectorSpec.getUpperKey(), false); - deviceSelectorSpec.setUpperKey("scene." + alias); + if (source instanceof VariableSource) { + VariableSource variableSource = (VariableSource) source; + if (VariableSource.Source.upper.equals(variableSource.getSource())) { + variableSource.setUpperKey(transferSceneUpperKey(variableSource.getUpperKey())); } } + if (source instanceof Map) { + Map map = (Map) source; + VariableSource variableSource = VariableSource.of(source); + // 将变量格式改为与查询的别名一致 + if (VariableSource.Source.upper.equals(variableSource.getSource())) { + map.put("upperKey", transferSceneUpperKey(variableSource.getUpperKey())); + } + } + } + + public static String transferSceneUpperKey(String upperKey) { + // scene.xx.current -> scene.scene_xx_current + if (upperKey.startsWith("scene.")) { + String alias = SceneUtils.createColumnAlias("scene", upperKey, false); + return "scene." + alias; + } + return upperKey; } private static boolean isContainThis(String[] arr) { diff --git a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/Variable.java b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/Variable.java index 2535df23..8f87afc1 100644 --- a/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/Variable.java +++ b/jetlinks-manager/rule-engine-manager/src/main/java/org/jetlinks/community/rule/engine/scene/Variable.java @@ -163,11 +163,13 @@ public class Variable { } public void refactorPrefix(Variable main) { + id = SceneUtils.transferSceneUpperKey(id); if (CollectionUtils.isNotEmpty(children)) { for (Variable child : children) { if (!child.getId().startsWith(main.id + ".")) { child.setId(main.id + "." + child.getId()); } + child.setId(SceneUtils.transferSceneUpperKey(child.getId())); if (StringUtils.hasText(child.getFullName()) && StringUtils.hasText(main.getFullName())) { child.setFullName(main.getFullName() + "/" + child.getFullName());