refactor: ThingsDataManager增加标签实现

This commit is contained in:
zhouhao 2024-05-30 18:25:26 +08:00
parent 3d722235c0
commit 173680bd92
4 changed files with 361 additions and 27 deletions

View File

@ -3,6 +3,7 @@ package org.jetlinks.community.things.data;
import io.netty.buffer.*;
import io.netty.util.ReferenceCountUtil;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.h2.mvstore.Cursor;
import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVStore;
@ -11,7 +12,10 @@ import org.h2.mvstore.type.BasicDataType;
import org.jetlinks.community.codec.Serializers;
import org.jetlinks.core.things.ThingEvent;
import org.jetlinks.core.things.ThingProperty;
import org.jetlinks.core.things.ThingTag;
import org.jetlinks.core.things.ThingsDataManager;
import org.jetlinks.core.utils.NumberUtils;
import org.jetlinks.core.utils.RecyclerUtils;
import org.jetlinks.core.utils.SerializeUtils;
import org.jetlinks.core.utils.StringBuilderUtils;
import org.jetlinks.supports.utils.MVStoreUtils;
@ -29,9 +33,9 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.BiConsumer;
@Slf4j
public class LocalFileThingsDataManager implements ThingsDataManager, ThingsDataWriter {
private static final AtomicIntegerFieldUpdater<LocalFileThingsDataManager>
TAG_INC = AtomicIntegerFieldUpdater.newUpdater(LocalFileThingsDataManager.class, "tagInc");
@ -74,10 +78,14 @@ public class LocalFileThingsDataManager implements ThingsDataManager, ThingsData
this.mvStore = store;
this.tagStore = mvStore.openMap("tags");
this.tagInc = this.tagStore.size();
this.historyStore = mvStore
.openMap("store", new MVMap
.Builder<Long, PropertyHistory>()
.valueType(new HistoryType()));
this.historyStore = MVStoreUtils
.openMap(
mvStore,
"store",
new MVMap
.Builder<Long, PropertyHistory>()
.valueType(new HistoryType())
);
}
public void shutdown() {
@ -92,8 +100,7 @@ public class LocalFileThingsDataManager implements ThingsDataManager, ThingsData
historyStore.put(entry.getKey(), entry.getValue());
}
}
mvStore.compactFile(60_000);
mvStore.close(60_000);
mvStore.close(-1);
}
@Override
@ -335,8 +342,8 @@ public class LocalFileThingsDataManager implements ThingsDataManager, ThingsData
Property p = new Property();
p.setTime(timestamp);
p.setValue(value);
p.setState(state);
p.setValue(tryIntern(value));
p.setState(RecyclerUtils.intern(state));
propertyStore.update(p);
propertyStore.tryStore(key.toTag(), historyStore::put);
}
@ -365,6 +372,16 @@ public class LocalFileThingsDataManager implements ThingsDataManager, ThingsData
return this.updateProperty(thingType, thingId, createEventProperty(eventId), timestamp, data, null);
}
@Nonnull
@Override
public Mono<Void> updateTag(@Nonnull String thingType,
@Nonnull String thingId,
@Nonnull String tagKey,
long timestamp,
@Nonnull Object tagValue) {
return this.updateProperty(thingType, thingId, createTagProperty(tagKey), timestamp, tagValue, null);
}
@Nonnull
@Override
public Mono<Void> removeProperties(@Nonnull String thingType, @Nonnull String thingId) {
@ -390,6 +407,14 @@ public class LocalFileThingsDataManager implements ThingsDataManager, ThingsData
return this.removeProperty(thingType, thingId, createEventProperty(eventId));
}
@Nonnull
@Override
public Mono<Void> removeTag(@Nonnull String thingType,
@Nonnull String thingId,
@Nonnull String tagKey) {
return this.removeProperty(thingType, thingId, createTagProperty(tagKey));
}
@Nonnull
@Override
public Mono<Void> removeProperty(@Nonnull String thingType,
@ -449,6 +474,51 @@ public class LocalFileThingsDataManager implements ThingsDataManager, ThingsData
}
}
@Override
public Mono<ThingTag> getLastTag(String thingType,
String thingId,
String tag,
long baseTime) {
String eventKey = createTagProperty(tag);
PropertyHistory propertyStore = getHistory(thingType, thingId, eventKey);
if (propertyStore == null) {
return Mono.empty();
}
Property pro = propertyStore.getProperty(baseTime);
if (pro == null) {
return Mono.empty();
}
return pro
.toProperty(eventKey)
.map(PropertyThingTag::new);
}
protected String createTagProperty(String tag) {
return "t@" + tag;
}
@AllArgsConstructor
private static class PropertyThingTag implements ThingTag {
private final ThingProperty property;
@Override
public String getTag() {
return property
.getProperty()
.substring(2);
}
@Override
public long getTimestamp() {
return property.getTimestamp();
}
@Override
public Object getValue() {
return property.getValue();
}
}
@AllArgsConstructor
@EqualsAndHashCode(cacheStrategy = EqualsAndHashCode.CacheStrategy.LAZY)
protected static class StoreKey {
@ -536,8 +606,13 @@ public class LocalFileThingsDataManager implements ThingsDataManager, ThingsData
@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeShort(refs.size());
for (Property ref : refs.values()) {
Collection<Property> properties = refs.values();
int size = Math.min(properties.size(), DEFAULT_MAX_STORE_SIZE_EACH_KEY);
out.writeShort(size);
for (Property ref : properties) {
if (size-- == 0) {
break;
}
ref.writeExternal(out);
}
out.writeBoolean(first != null);
@ -575,6 +650,28 @@ public class LocalFileThingsDataManager implements ThingsDataManager, ThingsData
}
}
public static <T> T tryIntern(T val) {
if (val instanceof Number) {
if (NumberUtils.isIntNumber(((Number) val))) {
int v = ((Number) val).intValue();
if (v > Short.MIN_VALUE && v < Short.MAX_VALUE) {
return RecyclerUtils.intern(val);
}
} else {
double v = ((Number) val).doubleValue();
//缓存2位小数
if (v > Byte.MIN_VALUE &&
v < Byte.MAX_VALUE &&
v * 1000 == (int) (v * 1000)) {
return RecyclerUtils.intern(val);
}
}
}
return val;
}
@Getter
@Setter
public static class Property implements Externalizable {
@ -626,8 +723,8 @@ public class LocalFileThingsDataManager implements ThingsDataManager, ThingsData
@Override
public void readExternal(ObjectInput in) throws IOException {
time = in.readLong();
state = (String) SerializeUtils.readObject(in);
value = SerializeUtils.readObject(in);
state = (String) tryIntern(SerializeUtils.readObject(in));
this.value = tryIntern(SerializeUtils.readObject(in));
}
@Override
@ -670,6 +767,8 @@ public class LocalFileThingsDataManager implements ThingsDataManager, ThingsData
output.flush();
buff.put(buffer.nioBuffer());
} catch (Throwable err) {
log.warn("write thing data error", err);
} finally {
ReferenceCountUtil.safeRelease(buffer);
}
@ -683,12 +782,17 @@ public class LocalFileThingsDataManager implements ThingsDataManager, ThingsData
try (ObjectOutput output = createOutput(buffer)) {
for (int i = 0; i < len; i++) {
((PropertyHistory) Array.get(obj, i)).writeExternal(output);
PropertyHistory history = ((PropertyHistory) Array.get(obj, i));
if (history != null) {
history.writeExternal(output);
}
}
output.flush();
buff.put(buffer.nioBuffer());
} catch (Throwable err) {
log.warn("write thing data error", err);
} finally {
ReferenceCountUtil.safeRelease(buffer);
}
@ -703,6 +807,8 @@ public class LocalFileThingsDataManager implements ThingsDataManager, ThingsData
data.readExternal(input);
Array.set(obj, i, data);
}
} catch (Throwable err) {
log.warn("read thing data error", err);
}
}
@ -717,6 +823,8 @@ public class LocalFileThingsDataManager implements ThingsDataManager, ThingsData
PropertyHistory data = new PropertyHistory();
try (ObjectInput input = createInput(Unpooled.wrappedBuffer(buff))) {
data.readExternal(input);
} catch (Throwable err) {
log.warn("read thing data error", err);
}
return data;
}

View File

@ -27,6 +27,13 @@ public interface ThingsDataWriter {
long timestamp,
@Nonnull Object data);
@Nonnull
Mono<Void> updateTag(@Nonnull String thingType,
@Nonnull String thingId,
@Nonnull String tagKey,
long timestamp,
@Nonnull Object tagValue);
@Nonnull
Mono<Void> removeProperty(@Nonnull String thingType,
@Nonnull String thingId,
@ -40,4 +47,9 @@ public interface ThingsDataWriter {
Mono<Void> removeEvent(@Nonnull String thingType,
@Nonnull String thingId,
@Nonnull String eventId);
@Nonnull
Mono<Void> removeTag(@Nonnull String thingType,
@Nonnull String thingId,
@Nonnull String tagKey);
}

View File

@ -14,9 +14,13 @@ import org.hswebframework.web.crud.generator.Generators;
import org.hswebframework.web.validator.CreateGroup;
import org.jetlinks.core.metadata.Converter;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.DeviceMetadata;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.types.ArrayType;
import org.jetlinks.core.metadata.types.DataTypes;
import org.jetlinks.core.metadata.types.ObjectType;
import org.jetlinks.core.metadata.types.UnknownType;
import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec;
import javax.persistence.Column;
import javax.persistence.Index;
@ -24,6 +28,10 @@ import javax.persistence.Table;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.util.Date;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@Getter
@Setter
@ -39,7 +47,7 @@ public class DeviceTagEntity extends GenericEntity<String> {
@Schema(description = "设备ID")
private String deviceId;
@Column(length = 32, updatable = false, nullable = false)
@Column(length = 64, updatable = false, nullable = false)
@NotBlank(message = "[key]不能为空", groups = CreateGroup.class)
@Schema(description = "标签标识")
private String key;
@ -71,7 +79,7 @@ public class DeviceTagEntity extends GenericEntity<String> {
private DataType dataType;
public static DeviceTagEntity of(PropertyMetadata property) {
DeviceTagEntity entity = new DeviceTagEntity();
DeviceTagEntity entity = new DeviceTagEntity();
entity.setKey(property.getId());
entity.setName(property.getName());
entity.setType(property.getValueType().getId());
@ -81,17 +89,6 @@ public class DeviceTagEntity extends GenericEntity<String> {
return entity;
}
//以物模型标签基础数据为准重构数据库保存的可能已过时的标签数据
public DeviceTagEntity restructure(DeviceTagEntity tag) {
this.setDataType(tag.getDataType());
this.setName(tag.getName());
this.setType(tag.getType());
this.setKey(tag.getKey());
this.setDescription(tag.getDescription());
return this;
}
public static DeviceTagEntity of(PropertyMetadata property, Object value) {
DeviceTagEntity tag = of(property);
@ -118,8 +115,59 @@ public class DeviceTagEntity extends GenericEntity<String> {
return tag;
}
public DeviceProperty toProperty() {
DeviceProperty property = new DeviceProperty();
property.setProperty(getKey());
property.setDeviceId(deviceId);
property.setType(type);
property.setPropertyName(name);
DataType type = Optional
.ofNullable(DataTypes.lookup(getType()))
.map(Supplier::get)
.orElseGet(UnknownType::new);
if (type instanceof Converter) {
property.setValue(((Converter<?>) type).convert(getValue()));
} else {
property.setValue(getValue());
}
return property;
}
//以物模型标签基础数据为准重构数据库保存的可能已过时的标签数据
public DeviceTagEntity restructure(DeviceTagEntity tag) {
this.setDataType(tag.getDataType());
this.setName(tag.getName());
this.setType(tag.getType());
this.setKey(tag.getKey());
this.setDescription(tag.getDescription());
return this;
}
public void generateId() {
setId(createTagId(deviceId, key));
}
public static String createTagId(String deviceId, String key) {
return DigestUtils.md5Hex(deviceId + ":" + key);
}
public static Set<String> parseTagKey(String metadata) {
return JetLinksDeviceMetadataCodec
.getInstance()
.doDecode(metadata)
.getTags()
.stream()
.map(PropertyMetadata::getId)
.collect(Collectors.toSet());
}
public static Set<String> parseTagKey(DeviceMetadata metadata) {
return metadata
.getTags()
.stream()
.map(PropertyMetadata::getId)
.collect(Collectors.toSet());
}
}

View File

@ -0,0 +1,166 @@
package org.jetlinks.community.device.service;
import lombok.AllArgsConstructor;
import org.hswebframework.ezorm.core.dsl.Query;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
import org.hswebframework.web.crud.events.EntityCreatedEvent;
import org.hswebframework.web.crud.events.EntityPrepareModifyEvent;
import org.hswebframework.web.crud.events.EntityPrepareSaveEvent;
import org.hswebframework.web.crud.events.EntitySavedEvent;
import org.jetlinks.core.device.DeviceProductOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.device.DeviceThingType;
import org.jetlinks.community.device.entity.DeviceInstanceEntity;
import org.jetlinks.community.device.entity.DeviceProductEntity;
import org.jetlinks.community.device.entity.DeviceTagEntity;
import org.jetlinks.community.device.service.term.DeviceInstanceTerm;
import org.jetlinks.community.things.data.ThingsDataWriter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
/**
* 设备标签自动删除.
*
* @author zhangji 2023/3/28
*/
@Component
@ConditionalOnProperty(prefix = "jetlinks.tag.auto-delete", name = "enabled", havingValue = "true", matchIfMissing = true)
@AllArgsConstructor
public class DeviceTagHandler {
private final DeviceRegistry deviceRegistry;
private final ReactiveRepository<DeviceTagEntity, String> tagRepository;
private final ThingsDataWriter dataWriter;
/**
* 更新设备物模型时若删除了标签自动删除设备标签
*/
@EventListener
public void handleDeviceEvent(EntityPrepareSaveEvent<DeviceInstanceEntity> event) {
event.async(deleteTagByDevice(event.getEntity()));
}
/**
* 更新设备物模型时若删除了标签自动删除设备标签
*/
@EventListener
public void handleDeviceEvent(EntityPrepareModifyEvent<DeviceInstanceEntity> event) {
event.async(deleteTagByDevice(event.getAfter()));
}
private Mono<Void> deleteTagByDevice(List<DeviceInstanceEntity> entity) {
return Flux
.fromIterable(entity)
.flatMap(device -> deviceRegistry
.getDevice(device.getId())
.flatMap(operator -> Mono
.zip(
// tp1旧的标签
operator.getMetadata()
.map(DeviceTagEntity::parseTagKey),
// tp2设备ID
Mono.just(operator.getId()),
// tp3新的标签
Mono.justOrEmpty(device.getDeriveMetadata())
// 设备物模型为空则获取产品物模型
.filter(StringUtils::hasText)
.map(metadata -> DeviceTagEntity.parseTagKey(device.getDeriveMetadata()))
.switchIfEmpty(operator
.getProduct()
.flatMap(DeviceProductOperator::getMetadata)
.map(DeviceTagEntity::parseTagKey))
))
.flatMapMany(tp3 -> Flux
.fromIterable(tp3.getT1())
.filter(tag -> !tp3.getT3().contains(tag))
.map(tag -> DeviceTagEntity.createTagId(tp3.getT2(), tag))
))
.as(tagRepository::deleteById)
.then();
}
/**
* 更新产品物模型时若删除了标签自动删除设备标签
*/
@EventListener
public void autoUpdateDeviceTag(EntityPrepareSaveEvent<DeviceProductEntity> event) {
event.async(deleteTagByProduct(event.getEntity()));
}
/**
* 更新产品物模型时若删除了标签自动删除设备标签
*/
@EventListener
public void autoUpdateDeviceTag(EntityPrepareModifyEvent<DeviceProductEntity> event) {
event.async(deleteTagByProduct(event.getAfter()));
}
private Mono<Void> deleteTagByProduct(List<DeviceProductEntity> entity) {
return Flux.fromIterable(entity)
.flatMap(product -> deviceRegistry
.getProduct(product.getId())
.flatMap(productOperator -> Mono
.zip(
// tp1旧的产品物模型
productOperator
.getMetadata()
.map(DeviceTagEntity::parseTagKey),
// tp2新的产品物模型
Mono.just(DeviceTagEntity.parseTagKey(product.getMetadata()))
, (oldTags, newTags) -> {
oldTags.removeAll(newTags);
return oldTags;
}))
.filter(list -> !CollectionUtils.isEmpty(list))
.flatMap(tag -> tagRepository
.createDelete()
.in(DeviceTagEntity::getKey, tag)
.and(
DeviceTagEntity::getDeviceId,
DeviceInstanceTerm.termType,
Query
.of()
.is(DeviceInstanceEntity::getProductId, product.getId())
.getParam()
.getTerms())
.execute()))
.then();
}
@EventListener
public void handleDeviceTagEvent(EntityCreatedEvent<DeviceTagEntity> event) {
event.async(updateTag(event.getEntity()));
}
@EventListener
public void handleDeviceTagEvent(EntitySavedEvent<DeviceTagEntity> event) {
event.async(updateTag(event.getEntity()));
}
/**
* 更新标签消息
*
* @param entityList 标签
* @return Void
*/
private Mono<Void> updateTag(List<DeviceTagEntity> entityList) {
return Flux
.fromIterable(entityList)
.flatMap(entity -> dataWriter
.updateTag(DeviceThingType.device.getId(),
entity.getDeviceId(),
entity.getKey(),
System.currentTimeMillis(),
entity.getValue()))
.then();
}
}