diff --git a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/LocalFileThingsDataManager.java b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/LocalFileThingsDataManager.java index 39c401ea..4e225bf1 100644 --- a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/LocalFileThingsDataManager.java +++ b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/LocalFileThingsDataManager.java @@ -3,6 +3,7 @@ package org.jetlinks.community.things.data; import io.netty.buffer.*; import io.netty.util.ReferenceCountUtil; import lombok.*; +import lombok.extern.slf4j.Slf4j; import org.h2.mvstore.Cursor; import org.h2.mvstore.MVMap; import org.h2.mvstore.MVStore; @@ -11,7 +12,10 @@ import org.h2.mvstore.type.BasicDataType; import org.jetlinks.community.codec.Serializers; import org.jetlinks.core.things.ThingEvent; import org.jetlinks.core.things.ThingProperty; +import org.jetlinks.core.things.ThingTag; import org.jetlinks.core.things.ThingsDataManager; +import org.jetlinks.core.utils.NumberUtils; +import org.jetlinks.core.utils.RecyclerUtils; import org.jetlinks.core.utils.SerializeUtils; import org.jetlinks.core.utils.StringBuilderUtils; import org.jetlinks.supports.utils.MVStoreUtils; @@ -29,9 +33,9 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.BiConsumer; +@Slf4j public class LocalFileThingsDataManager implements ThingsDataManager, ThingsDataWriter { - private static final AtomicIntegerFieldUpdater TAG_INC = AtomicIntegerFieldUpdater.newUpdater(LocalFileThingsDataManager.class, "tagInc"); @@ -74,10 +78,14 @@ public class LocalFileThingsDataManager implements ThingsDataManager, ThingsData this.mvStore = store; this.tagStore = mvStore.openMap("tags"); this.tagInc = this.tagStore.size(); - this.historyStore = mvStore - .openMap("store", new MVMap - .Builder() - .valueType(new HistoryType())); + this.historyStore = MVStoreUtils + .openMap( + mvStore, + "store", + new MVMap + .Builder() + .valueType(new HistoryType()) + ); } public void shutdown() { @@ -92,8 +100,7 @@ public class LocalFileThingsDataManager implements ThingsDataManager, ThingsData historyStore.put(entry.getKey(), entry.getValue()); } } - mvStore.compactFile(60_000); - mvStore.close(60_000); + mvStore.close(-1); } @Override @@ -335,8 +342,8 @@ public class LocalFileThingsDataManager implements ThingsDataManager, ThingsData Property p = new Property(); p.setTime(timestamp); - p.setValue(value); - p.setState(state); + p.setValue(tryIntern(value)); + p.setState(RecyclerUtils.intern(state)); propertyStore.update(p); propertyStore.tryStore(key.toTag(), historyStore::put); } @@ -365,6 +372,16 @@ public class LocalFileThingsDataManager implements ThingsDataManager, ThingsData return this.updateProperty(thingType, thingId, createEventProperty(eventId), timestamp, data, null); } + @Nonnull + @Override + public Mono updateTag(@Nonnull String thingType, + @Nonnull String thingId, + @Nonnull String tagKey, + long timestamp, + @Nonnull Object tagValue) { + return this.updateProperty(thingType, thingId, createTagProperty(tagKey), timestamp, tagValue, null); + } + @Nonnull @Override public Mono removeProperties(@Nonnull String thingType, @Nonnull String thingId) { @@ -390,6 +407,14 @@ public class LocalFileThingsDataManager implements ThingsDataManager, ThingsData return this.removeProperty(thingType, thingId, createEventProperty(eventId)); } + @Nonnull + @Override + public Mono removeTag(@Nonnull String thingType, + @Nonnull String thingId, + @Nonnull String tagKey) { + return this.removeProperty(thingType, thingId, createTagProperty(tagKey)); + } + @Nonnull @Override public Mono removeProperty(@Nonnull String thingType, @@ -449,6 +474,51 @@ public class LocalFileThingsDataManager implements ThingsDataManager, ThingsData } } + @Override + public Mono getLastTag(String thingType, + String thingId, + String tag, + long baseTime) { + String eventKey = createTagProperty(tag); + PropertyHistory propertyStore = getHistory(thingType, thingId, eventKey); + if (propertyStore == null) { + return Mono.empty(); + } + Property pro = propertyStore.getProperty(baseTime); + if (pro == null) { + return Mono.empty(); + } + return pro + .toProperty(eventKey) + .map(PropertyThingTag::new); + } + + protected String createTagProperty(String tag) { + return "t@" + tag; + } + + @AllArgsConstructor + private static class PropertyThingTag implements ThingTag { + private final ThingProperty property; + + @Override + public String getTag() { + return property + .getProperty() + .substring(2); + } + + @Override + public long getTimestamp() { + return property.getTimestamp(); + } + + @Override + public Object getValue() { + return property.getValue(); + } + } + @AllArgsConstructor @EqualsAndHashCode(cacheStrategy = EqualsAndHashCode.CacheStrategy.LAZY) protected static class StoreKey { @@ -536,8 +606,13 @@ public class LocalFileThingsDataManager implements ThingsDataManager, ThingsData @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeShort(refs.size()); - for (Property ref : refs.values()) { + Collection properties = refs.values(); + int size = Math.min(properties.size(), DEFAULT_MAX_STORE_SIZE_EACH_KEY); + out.writeShort(size); + for (Property ref : properties) { + if (size-- == 0) { + break; + } ref.writeExternal(out); } out.writeBoolean(first != null); @@ -575,6 +650,28 @@ public class LocalFileThingsDataManager implements ThingsDataManager, ThingsData } } + public static T tryIntern(T val) { + + if (val instanceof Number) { + if (NumberUtils.isIntNumber(((Number) val))) { + int v = ((Number) val).intValue(); + if (v > Short.MIN_VALUE && v < Short.MAX_VALUE) { + return RecyclerUtils.intern(val); + } + } else { + double v = ((Number) val).doubleValue(); + //缓存2位小数 + if (v > Byte.MIN_VALUE && + v < Byte.MAX_VALUE && + v * 1000 == (int) (v * 1000)) { + return RecyclerUtils.intern(val); + } + } + } + + return val; + } + @Getter @Setter public static class Property implements Externalizable { @@ -626,8 +723,8 @@ public class LocalFileThingsDataManager implements ThingsDataManager, ThingsData @Override public void readExternal(ObjectInput in) throws IOException { time = in.readLong(); - state = (String) SerializeUtils.readObject(in); - value = SerializeUtils.readObject(in); + state = (String) tryIntern(SerializeUtils.readObject(in)); + this.value = tryIntern(SerializeUtils.readObject(in)); } @Override @@ -670,6 +767,8 @@ public class LocalFileThingsDataManager implements ThingsDataManager, ThingsData output.flush(); buff.put(buffer.nioBuffer()); + } catch (Throwable err) { + log.warn("write thing data error", err); } finally { ReferenceCountUtil.safeRelease(buffer); } @@ -683,12 +782,17 @@ public class LocalFileThingsDataManager implements ThingsDataManager, ThingsData try (ObjectOutput output = createOutput(buffer)) { for (int i = 0; i < len; i++) { - ((PropertyHistory) Array.get(obj, i)).writeExternal(output); + PropertyHistory history = ((PropertyHistory) Array.get(obj, i)); + if (history != null) { + history.writeExternal(output); + } } output.flush(); buff.put(buffer.nioBuffer()); + } catch (Throwable err) { + log.warn("write thing data error", err); } finally { ReferenceCountUtil.safeRelease(buffer); } @@ -703,6 +807,8 @@ public class LocalFileThingsDataManager implements ThingsDataManager, ThingsData data.readExternal(input); Array.set(obj, i, data); } + } catch (Throwable err) { + log.warn("read thing data error", err); } } @@ -717,6 +823,8 @@ public class LocalFileThingsDataManager implements ThingsDataManager, ThingsData PropertyHistory data = new PropertyHistory(); try (ObjectInput input = createInput(Unpooled.wrappedBuffer(buff))) { data.readExternal(input); + } catch (Throwable err) { + log.warn("read thing data error", err); } return data; } diff --git a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/ThingsDataWriter.java b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/ThingsDataWriter.java index 6132042e..422ae132 100644 --- a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/ThingsDataWriter.java +++ b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/ThingsDataWriter.java @@ -27,6 +27,13 @@ public interface ThingsDataWriter { long timestamp, @Nonnull Object data); + @Nonnull + Mono updateTag(@Nonnull String thingType, + @Nonnull String thingId, + @Nonnull String tagKey, + long timestamp, + @Nonnull Object tagValue); + @Nonnull Mono removeProperty(@Nonnull String thingType, @Nonnull String thingId, @@ -40,4 +47,9 @@ public interface ThingsDataWriter { Mono removeEvent(@Nonnull String thingType, @Nonnull String thingId, @Nonnull String eventId); + + @Nonnull + Mono removeTag(@Nonnull String thingType, + @Nonnull String thingId, + @Nonnull String tagKey); } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceTagEntity.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceTagEntity.java index ec269d41..34d605f3 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceTagEntity.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/entity/DeviceTagEntity.java @@ -14,9 +14,13 @@ import org.hswebframework.web.crud.generator.Generators; import org.hswebframework.web.validator.CreateGroup; import org.jetlinks.core.metadata.Converter; import org.jetlinks.core.metadata.DataType; +import org.jetlinks.core.metadata.DeviceMetadata; import org.jetlinks.core.metadata.PropertyMetadata; import org.jetlinks.core.metadata.types.ArrayType; +import org.jetlinks.core.metadata.types.DataTypes; import org.jetlinks.core.metadata.types.ObjectType; +import org.jetlinks.core.metadata.types.UnknownType; +import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec; import javax.persistence.Column; import javax.persistence.Index; @@ -24,6 +28,10 @@ import javax.persistence.Table; import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotNull; import java.util.Date; +import java.util.Optional; +import java.util.Set; +import java.util.function.Supplier; +import java.util.stream.Collectors; @Getter @Setter @@ -39,7 +47,7 @@ public class DeviceTagEntity extends GenericEntity { @Schema(description = "设备ID") private String deviceId; - @Column(length = 32, updatable = false, nullable = false) + @Column(length = 64, updatable = false, nullable = false) @NotBlank(message = "[key]不能为空", groups = CreateGroup.class) @Schema(description = "标签标识") private String key; @@ -71,7 +79,7 @@ public class DeviceTagEntity extends GenericEntity { private DataType dataType; public static DeviceTagEntity of(PropertyMetadata property) { - DeviceTagEntity entity = new DeviceTagEntity(); + DeviceTagEntity entity = new DeviceTagEntity(); entity.setKey(property.getId()); entity.setName(property.getName()); entity.setType(property.getValueType().getId()); @@ -81,17 +89,6 @@ public class DeviceTagEntity extends GenericEntity { return entity; } - - //以物模型标签基础数据为准,重构数据库保存的可能已过时的标签数据 - public DeviceTagEntity restructure(DeviceTagEntity tag) { - this.setDataType(tag.getDataType()); - this.setName(tag.getName()); - this.setType(tag.getType()); - this.setKey(tag.getKey()); - this.setDescription(tag.getDescription()); - return this; - } - public static DeviceTagEntity of(PropertyMetadata property, Object value) { DeviceTagEntity tag = of(property); @@ -118,8 +115,59 @@ public class DeviceTagEntity extends GenericEntity { return tag; } + public DeviceProperty toProperty() { + DeviceProperty property = new DeviceProperty(); + property.setProperty(getKey()); + property.setDeviceId(deviceId); + property.setType(type); + property.setPropertyName(name); + DataType type = Optional + .ofNullable(DataTypes.lookup(getType())) + .map(Supplier::get) + .orElseGet(UnknownType::new); + if (type instanceof Converter) { + property.setValue(((Converter) type).convert(getValue())); + } else { + property.setValue(getValue()); + } + return property; + + + } + + //以物模型标签基础数据为准,重构数据库保存的可能已过时的标签数据 + public DeviceTagEntity restructure(DeviceTagEntity tag) { + this.setDataType(tag.getDataType()); + this.setName(tag.getName()); + this.setType(tag.getType()); + this.setKey(tag.getKey()); + this.setDescription(tag.getDescription()); + return this; + } + + public void generateId() { + setId(createTagId(deviceId, key)); + } public static String createTagId(String deviceId, String key) { return DigestUtils.md5Hex(deviceId + ":" + key); } + + public static Set parseTagKey(String metadata) { + return JetLinksDeviceMetadataCodec + .getInstance() + .doDecode(metadata) + .getTags() + .stream() + .map(PropertyMetadata::getId) + .collect(Collectors.toSet()); + } + + public static Set parseTagKey(DeviceMetadata metadata) { + return metadata + .getTags() + .stream() + .map(PropertyMetadata::getId) + .collect(Collectors.toSet()); + } } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/DeviceTagHandler.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/DeviceTagHandler.java new file mode 100644 index 00000000..9a827eb7 --- /dev/null +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/DeviceTagHandler.java @@ -0,0 +1,166 @@ +package org.jetlinks.community.device.service; + +import lombok.AllArgsConstructor; +import org.hswebframework.ezorm.core.dsl.Query; +import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository; +import org.hswebframework.web.crud.events.EntityCreatedEvent; +import org.hswebframework.web.crud.events.EntityPrepareModifyEvent; +import org.hswebframework.web.crud.events.EntityPrepareSaveEvent; +import org.hswebframework.web.crud.events.EntitySavedEvent; +import org.jetlinks.core.device.DeviceProductOperator; +import org.jetlinks.core.device.DeviceRegistry; +import org.jetlinks.core.device.DeviceThingType; +import org.jetlinks.community.device.entity.DeviceInstanceEntity; +import org.jetlinks.community.device.entity.DeviceProductEntity; +import org.jetlinks.community.device.entity.DeviceTagEntity; +import org.jetlinks.community.device.service.term.DeviceInstanceTerm; +import org.jetlinks.community.things.data.ThingsDataWriter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.List; + +/** + * 设备标签自动删除. + * + * @author zhangji 2023/3/28 + */ +@Component +@ConditionalOnProperty(prefix = "jetlinks.tag.auto-delete", name = "enabled", havingValue = "true", matchIfMissing = true) +@AllArgsConstructor +public class DeviceTagHandler { + + private final DeviceRegistry deviceRegistry; + + private final ReactiveRepository tagRepository; + + private final ThingsDataWriter dataWriter; + + /** + * 更新设备物模型时,若删除了标签,自动删除设备标签 + */ + @EventListener + public void handleDeviceEvent(EntityPrepareSaveEvent event) { + event.async(deleteTagByDevice(event.getEntity())); + } + + /** + * 更新设备物模型时,若删除了标签,自动删除设备标签 + */ + @EventListener + public void handleDeviceEvent(EntityPrepareModifyEvent event) { + event.async(deleteTagByDevice(event.getAfter())); + } + + private Mono deleteTagByDevice(List entity) { + return Flux + .fromIterable(entity) + .flatMap(device -> deviceRegistry + .getDevice(device.getId()) + .flatMap(operator -> Mono + .zip( + // tp1:旧的标签 + operator.getMetadata() + .map(DeviceTagEntity::parseTagKey), + // tp2:设备ID + Mono.just(operator.getId()), + // tp3:新的标签 + Mono.justOrEmpty(device.getDeriveMetadata()) + // 设备物模型为空,则获取产品物模型 + .filter(StringUtils::hasText) + .map(metadata -> DeviceTagEntity.parseTagKey(device.getDeriveMetadata())) + .switchIfEmpty(operator + .getProduct() + .flatMap(DeviceProductOperator::getMetadata) + .map(DeviceTagEntity::parseTagKey)) + )) + .flatMapMany(tp3 -> Flux + .fromIterable(tp3.getT1()) + .filter(tag -> !tp3.getT3().contains(tag)) + .map(tag -> DeviceTagEntity.createTagId(tp3.getT2(), tag)) + )) + .as(tagRepository::deleteById) + .then(); + } + + /** + * 更新产品物模型时,若删除了标签,自动删除设备标签 + */ + @EventListener + public void autoUpdateDeviceTag(EntityPrepareSaveEvent event) { + event.async(deleteTagByProduct(event.getEntity())); + } + + /** + * 更新产品物模型时,若删除了标签,自动删除设备标签 + */ + @EventListener + public void autoUpdateDeviceTag(EntityPrepareModifyEvent event) { + event.async(deleteTagByProduct(event.getAfter())); + } + + private Mono deleteTagByProduct(List entity) { + return Flux.fromIterable(entity) + .flatMap(product -> deviceRegistry + .getProduct(product.getId()) + .flatMap(productOperator -> Mono + .zip( + // tp1:旧的产品物模型 + productOperator + .getMetadata() + .map(DeviceTagEntity::parseTagKey), + // tp2:新的产品物模型 + Mono.just(DeviceTagEntity.parseTagKey(product.getMetadata())) + , (oldTags, newTags) -> { + oldTags.removeAll(newTags); + return oldTags; + })) + .filter(list -> !CollectionUtils.isEmpty(list)) + .flatMap(tag -> tagRepository + .createDelete() + .in(DeviceTagEntity::getKey, tag) + .and( + DeviceTagEntity::getDeviceId, + DeviceInstanceTerm.termType, + Query + .of() + .is(DeviceInstanceEntity::getProductId, product.getId()) + .getParam() + .getTerms()) + .execute())) + .then(); + } + + @EventListener + public void handleDeviceTagEvent(EntityCreatedEvent event) { + event.async(updateTag(event.getEntity())); + } + + @EventListener + public void handleDeviceTagEvent(EntitySavedEvent event) { + event.async(updateTag(event.getEntity())); + } + + /** + * 更新标签消息 + * + * @param entityList 标签 + * @return Void + */ + private Mono updateTag(List entityList) { + return Flux + .fromIterable(entityList) + .flatMap(entity -> dataWriter + .updateTag(DeviceThingType.device.getId(), + entity.getDeviceId(), + entity.getKey(), + System.currentTimeMillis(), + entity.getValue())) + .then(); + } +}