Merge remote-tracking branch 'origin/master' into 2.10
# Conflicts: # jetlinks-components/common-component/src/main/java/org/jetlinks/community/configuration/UiResourceConfiguration.java # jetlinks-components/common-component/src/main/java/org/jetlinks/community/resource/ui/UiMenuResourceProvider.java # jetlinks-components/common-component/src/main/java/org/jetlinks/community/resource/ui/UiResourceProvider.java # jetlinks-components/common-component/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
This commit is contained in:
commit
94342f13dc
|
|
@ -125,12 +125,12 @@ public class MqttClientProvider implements NetworkProvider<MqttClientProperties>
|
|||
return new DefaultConfigMetadata()
|
||||
.add("id", "id", "", new StringType())
|
||||
.add("remoteHost", "远程地址", "", new StringType())
|
||||
.add("remotePort", "远程地址", "", new IntType())
|
||||
.add("certId", "证书id", "", new StringType())
|
||||
.add("remotePort", "远程端口", "", new IntType())
|
||||
.add("certId", "证书ID", "", new StringType())
|
||||
.add("secure", "开启TSL", "", new BooleanType())
|
||||
.add("clientId", "客户端ID", "", new BooleanType())
|
||||
.add("username", "用户名", "", new BooleanType())
|
||||
.add("password", "密码", "", new BooleanType());
|
||||
.add("clientId", "客户端ID", "", new StringType())
|
||||
.add("username", "用户名", "", new StringType())
|
||||
.add("password", "密码", "", new StringType());
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
|
|
|
|||
|
|
@ -142,24 +142,27 @@ public class DeviceTagEntity extends GenericEntity<String> {
|
|||
return tag;
|
||||
}
|
||||
|
||||
|
||||
public DeviceProperty toProperty() {
|
||||
DeviceProperty property = new DeviceProperty();
|
||||
property.setProperty(getKey());
|
||||
property.setDeviceId(deviceId);
|
||||
property.setType(type);
|
||||
property.setPropertyName(name);
|
||||
property.setValue(parseValue());
|
||||
return property;
|
||||
}
|
||||
|
||||
public Object parseValue() {
|
||||
DataType type = Optional
|
||||
.ofNullable(DataTypes.lookup(getType()))
|
||||
.map(Supplier::get)
|
||||
.orElseGet(UnknownType::new);
|
||||
if (type instanceof Converter) {
|
||||
property.setValue(((Converter<?>) type).convert(getValue()));
|
||||
return ((Converter<?>) type).convert(getValue());
|
||||
} else {
|
||||
property.setValue(getValue());
|
||||
return getValue();
|
||||
}
|
||||
return property;
|
||||
|
||||
|
||||
}
|
||||
|
||||
//以物模型标签基础数据为准,重构数据库保存的可能已过时的标签数据
|
||||
|
|
|
|||
|
|
@ -0,0 +1,18 @@
|
|||
package org.jetlinks.community.device.service.tag;
|
||||
|
||||
import org.jetlinks.community.buffer.BufferProperties;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
@ConfigurationProperties(prefix = "device.tag.synchronizer")
|
||||
public class DeviceTagProperties extends BufferProperties {
|
||||
|
||||
public DeviceTagProperties(){
|
||||
setFilePath("./data/device-tag-synchronizer");
|
||||
setSize(500);
|
||||
setParallelism(1);
|
||||
getEviction().setMaxSize(100_0000);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,224 @@
|
|||
package org.jetlinks.community.device.service.tag;
|
||||
|
||||
import lombok.*;
|
||||
import org.apache.commons.collections4.MapUtils;
|
||||
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
|
||||
import org.hswebframework.web.crud.events.EntityCreatedEvent;
|
||||
import org.hswebframework.web.crud.events.EntityDeletedEvent;
|
||||
import org.hswebframework.web.crud.events.EntityModifyEvent;
|
||||
import org.hswebframework.web.crud.events.EntitySavedEvent;
|
||||
import org.jetlinks.core.device.DeviceOperator;
|
||||
import org.jetlinks.core.device.DeviceRegistry;
|
||||
import org.jetlinks.core.device.DeviceThingType;
|
||||
import org.jetlinks.core.message.UpdateTagMessage;
|
||||
import org.jetlinks.core.things.ThingsDataManager;
|
||||
import org.jetlinks.core.utils.Reactors;
|
||||
import org.jetlinks.community.buffer.BufferSettings;
|
||||
import org.jetlinks.community.buffer.PersistenceBuffer;
|
||||
import org.jetlinks.community.device.entity.DeviceTagEntity;
|
||||
import org.jetlinks.community.gateway.annotation.Subscribe;
|
||||
import org.jetlinks.community.things.data.ThingsDataWriter;
|
||||
import org.springframework.boot.CommandLineRunner;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.io.Externalizable;
|
||||
import java.io.ObjectInput;
|
||||
import java.io.ObjectOutput;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class DeviceTagSynchronizer implements CommandLineRunner {
|
||||
|
||||
private final DeviceTagProperties properties;
|
||||
|
||||
private final DeviceRegistry registry;
|
||||
|
||||
private final ThingsDataWriter dataWriter;
|
||||
|
||||
private final ThingsDataManager dataManager;
|
||||
|
||||
private final ReactiveRepository<DeviceTagEntity, String> tagRepository;
|
||||
|
||||
public PersistenceBuffer<DeviceTagBuffer> buffer;
|
||||
|
||||
@Subscribe(value = "/device/*/*/message/tags/update")
|
||||
public Mono<Void> updateDeviceTag(UpdateTagMessage message) {
|
||||
Map<String, Object> tags = message.getTags();
|
||||
if (MapUtils.isEmpty(tags)) {
|
||||
return Mono.empty();
|
||||
}
|
||||
String deviceId = message.getDeviceId();
|
||||
|
||||
return registry
|
||||
.getDevice(deviceId)
|
||||
.flatMap(DeviceOperator::getMetadata)
|
||||
.flatMapMany(metadata -> Flux
|
||||
.fromIterable(tags.entrySet())
|
||||
.filter(e -> e.getValue() != null)
|
||||
.flatMap(e -> {
|
||||
DeviceTagEntity tagEntity = metadata
|
||||
.getTag(e.getKey())
|
||||
.map(tagMeta -> DeviceTagEntity.of(tagMeta, e.getValue()))
|
||||
.orElseGet(() -> {
|
||||
DeviceTagEntity entity = new DeviceTagEntity();
|
||||
entity.setKey(e.getKey());
|
||||
entity.setType("string");
|
||||
entity.setName(e.getKey());
|
||||
entity.setCreateTime(new Date());
|
||||
entity.setDescription("设备上报");
|
||||
entity.setValue(String.valueOf(e.getValue()));
|
||||
return entity;
|
||||
});
|
||||
tagEntity.setTimestamp(message.getTimestamp());
|
||||
tagEntity.setDeviceId(deviceId);
|
||||
tagEntity.setId(DeviceTagEntity.createTagId(deviceId, tagEntity.getKey()));
|
||||
|
||||
return dataWriter
|
||||
.updateTag(DeviceThingType.device.getId(),
|
||||
tagEntity.getDeviceId(),
|
||||
tagEntity.getKey(),
|
||||
System.currentTimeMillis(),
|
||||
e.getValue())
|
||||
.then(writeBuffer(tagEntity));
|
||||
|
||||
}))
|
||||
.then();
|
||||
}
|
||||
|
||||
|
||||
public Mono<Void> writeBuffer(DeviceTagEntity entity) {
|
||||
return buffer.writeAsync(new DeviceTagBuffer(entity));
|
||||
}
|
||||
|
||||
|
||||
private Mono<DeviceTagEntity> convertEntity(DeviceTagBuffer buffer) {
|
||||
//从最新缓存中获取最新的数据,并填入准备入库的实体中
|
||||
return dataManager
|
||||
.getLastTag(DeviceThingType.device.getId(),
|
||||
buffer.getTag().getDeviceId(),
|
||||
buffer.getTag().getKey(),
|
||||
System.currentTimeMillis())
|
||||
.map(tag -> {
|
||||
//缓存中的数据比buffer中的新,则更新为buffer中的数据
|
||||
if (tag.getTimestamp() >= buffer.tag.getTimestamp()) {
|
||||
buffer.getTag().setTimestamp(tag.getTimestamp());
|
||||
buffer.getTag().setValue(String.valueOf(tag.getValue()));
|
||||
}
|
||||
return buffer.getTag();
|
||||
})
|
||||
.defaultIfEmpty(buffer.tag);
|
||||
}
|
||||
|
||||
public Mono<Boolean> handleBuffer(Flux<DeviceTagBuffer> buffer) {
|
||||
|
||||
return tagRepository
|
||||
.save(buffer.flatMap(this::convertEntity))
|
||||
.contextWrite(ctx -> ctx.put(DeviceTagSynchronizer.class, this))
|
||||
.then(Reactors.ALWAYS_FALSE);
|
||||
}
|
||||
|
||||
@EventListener
|
||||
public void handleDeviceTagEvent(EntityCreatedEvent<DeviceTagEntity> event) {
|
||||
event.async(updateTag(event.getEntity()));
|
||||
}
|
||||
|
||||
@EventListener
|
||||
public void handleDeviceTagEvent(EntitySavedEvent<DeviceTagEntity> event) {
|
||||
event.async(updateTag(event.getEntity()));
|
||||
}
|
||||
|
||||
@EventListener
|
||||
public void handleDeviceTagEvent(EntityModifyEvent<DeviceTagEntity> event) {
|
||||
event.async(updateTag(event.getAfter()));
|
||||
}
|
||||
|
||||
@EventListener
|
||||
public void handleDeviceTagEvent(EntityDeletedEvent<DeviceTagEntity> event) {
|
||||
event.async(
|
||||
Flux
|
||||
.fromIterable(event.getEntity())
|
||||
.flatMap(entity -> dataWriter
|
||||
.removeTag(DeviceThingType.device.getId(),
|
||||
entity.getDeviceId(),
|
||||
entity.getKey())
|
||||
.then()
|
||||
));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 更新标签,界面上手动修改标签?
|
||||
*
|
||||
* @param entityList 标签
|
||||
* @return Void
|
||||
*/
|
||||
private Mono<Void> updateTag(List<DeviceTagEntity> entityList) {
|
||||
return Mono.deferContextual(ctx -> {
|
||||
//更新来自消息的标签,不需要再次更新
|
||||
if (ctx.hasKey(DeviceTagSynchronizer.class)) {
|
||||
return Mono.empty();
|
||||
}
|
||||
return Flux
|
||||
.fromIterable(entityList)
|
||||
.flatMap(entity -> dataWriter
|
||||
.updateTag(DeviceThingType.device.getId(),
|
||||
entity.getDeviceId(),
|
||||
entity.getKey(),
|
||||
System.currentTimeMillis(),
|
||||
entity.parseValue()))
|
||||
.then();
|
||||
});
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
buffer = new PersistenceBuffer<>(
|
||||
BufferSettings.create(properties),
|
||||
DeviceTagBuffer::new,
|
||||
this::handleBuffer)
|
||||
.name("device-tag-synchronizer");
|
||||
buffer.init();
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void shutdown() {
|
||||
buffer.stop();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(String... args) throws Exception {
|
||||
buffer.start();
|
||||
SpringApplication
|
||||
.getShutdownHandlers()
|
||||
.add(buffer::dispose);
|
||||
}
|
||||
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public static class DeviceTagBuffer implements Externalizable {
|
||||
private DeviceTagEntity tag;
|
||||
|
||||
@Override
|
||||
public void writeExternal(ObjectOutput out) {
|
||||
tag.writeExternal(out);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readExternal(ObjectInput in) {
|
||||
tag = new DeviceTagEntity();
|
||||
tag.readExternal(in);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -234,15 +234,32 @@ public class SceneUtils {
|
|||
}
|
||||
}
|
||||
|
||||
public static void refactorUpperKey(DeviceSelectorSpec deviceSelectorSpec) {
|
||||
@SuppressWarnings("all")
|
||||
public static void refactorUpperKey(Object source) {
|
||||
// 将变量格式改为与查询的别名一致
|
||||
if (VariableSource.Source.upper.equals(deviceSelectorSpec.getSource())) {
|
||||
// scene.xx.current -> scene.scene_xx_current
|
||||
if (deviceSelectorSpec.getUpperKey().startsWith("scene.")) {
|
||||
String alias = SceneUtils.createColumnAlias("properties", deviceSelectorSpec.getUpperKey(), false);
|
||||
deviceSelectorSpec.setUpperKey("scene." + alias);
|
||||
if (source instanceof VariableSource) {
|
||||
VariableSource variableSource = (VariableSource) source;
|
||||
if (VariableSource.Source.upper.equals(variableSource.getSource())) {
|
||||
variableSource.setUpperKey(transferSceneUpperKey(variableSource.getUpperKey()));
|
||||
}
|
||||
}
|
||||
if (source instanceof Map) {
|
||||
Map<String, Object> map = (Map<String, Object>) source;
|
||||
VariableSource variableSource = VariableSource.of(source);
|
||||
// 将变量格式改为与查询的别名一致
|
||||
if (VariableSource.Source.upper.equals(variableSource.getSource())) {
|
||||
map.put("upperKey", transferSceneUpperKey(variableSource.getUpperKey()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static String transferSceneUpperKey(String upperKey) {
|
||||
// scene.xx.current -> scene.scene_xx_current
|
||||
if (upperKey.startsWith("scene.")) {
|
||||
String alias = SceneUtils.createColumnAlias("scene", upperKey, false);
|
||||
return "scene." + alias;
|
||||
}
|
||||
return upperKey;
|
||||
}
|
||||
|
||||
private static boolean isContainThis(String[] arr) {
|
||||
|
|
|
|||
|
|
@ -163,11 +163,13 @@ public class Variable {
|
|||
}
|
||||
|
||||
public void refactorPrefix(Variable main) {
|
||||
id = SceneUtils.transferSceneUpperKey(id);
|
||||
if (CollectionUtils.isNotEmpty(children)) {
|
||||
for (Variable child : children) {
|
||||
if (!child.getId().startsWith(main.id + ".")) {
|
||||
child.setId(main.id + "." + child.getId());
|
||||
}
|
||||
child.setId(SceneUtils.transferSceneUpperKey(child.getId()));
|
||||
|
||||
if (StringUtils.hasText(child.getFullName()) && StringUtils.hasText(main.getFullName())) {
|
||||
child.setFullName(main.getFullName() + "/" + child.getFullName());
|
||||
|
|
|
|||
Loading…
Reference in New Issue