fix: 修复设备标签无法更新问题
This commit is contained in:
parent
2cfc78f1da
commit
3a1f0b65de
|
|
@ -0,0 +1,18 @@
|
|||
package org.jetlinks.community.device.service.tag;
|
||||
|
||||
import org.jetlinks.community.buffer.BufferProperties;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
@ConfigurationProperties(prefix = "device.tag.synchronizer")
|
||||
public class DeviceTagProperties extends BufferProperties {
|
||||
|
||||
public DeviceTagProperties(){
|
||||
setFilePath("./data/device-tag-synchronizer");
|
||||
setSize(500);
|
||||
setParallelism(1);
|
||||
getEviction().setMaxSize(100_0000);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,195 @@
|
|||
package org.jetlinks.community.device.service.tag;
|
||||
|
||||
import lombok.*;
|
||||
import org.apache.commons.collections4.MapUtils;
|
||||
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
|
||||
import org.hswebframework.web.crud.events.EntityCreatedEvent;
|
||||
import org.hswebframework.web.crud.events.EntitySavedEvent;
|
||||
import org.jetlinks.core.device.DeviceOperator;
|
||||
import org.jetlinks.core.device.DeviceRegistry;
|
||||
import org.jetlinks.core.device.DeviceThingType;
|
||||
import org.jetlinks.core.message.UpdateTagMessage;
|
||||
import org.jetlinks.core.things.ThingsDataManager;
|
||||
import org.jetlinks.core.utils.Reactors;
|
||||
import org.jetlinks.community.buffer.BufferSettings;
|
||||
import org.jetlinks.community.buffer.PersistenceBuffer;
|
||||
import org.jetlinks.community.device.entity.DeviceTagEntity;
|
||||
import org.jetlinks.community.gateway.annotation.Subscribe;
|
||||
import org.jetlinks.community.things.data.ThingsDataWriter;
|
||||
import org.springframework.boot.CommandLineRunner;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.io.Externalizable;
|
||||
import java.io.ObjectInput;
|
||||
import java.io.ObjectOutput;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class DeviceTagSynchronizer implements CommandLineRunner {
|
||||
|
||||
private final DeviceTagProperties properties;
|
||||
|
||||
private final DeviceRegistry registry;
|
||||
|
||||
private final ThingsDataWriter dataWriter;
|
||||
|
||||
private final ThingsDataManager dataManager;
|
||||
|
||||
private final ReactiveRepository<DeviceTagEntity, String> tagRepository;
|
||||
|
||||
public PersistenceBuffer<DeviceTagBuffer> buffer;
|
||||
|
||||
@Subscribe(value = "/device/*/*/message/tags/update")
|
||||
public Mono<Void> updateDeviceTag(UpdateTagMessage message) {
|
||||
Map<String, Object> tags = message.getTags();
|
||||
if (MapUtils.isEmpty(tags)) {
|
||||
return Mono.empty();
|
||||
}
|
||||
String deviceId = message.getDeviceId();
|
||||
|
||||
return registry
|
||||
.getDevice(deviceId)
|
||||
.flatMap(DeviceOperator::getMetadata)
|
||||
.flatMapMany(metadata -> Flux
|
||||
.fromIterable(tags.entrySet())
|
||||
.filter(e -> e.getValue() != null)
|
||||
.flatMap(e -> {
|
||||
DeviceTagEntity tagEntity = metadata
|
||||
.getTag(e.getKey())
|
||||
.map(tagMeta -> DeviceTagEntity.of(tagMeta, e.getValue()))
|
||||
.orElseGet(() -> {
|
||||
DeviceTagEntity entity = new DeviceTagEntity();
|
||||
entity.setKey(e.getKey());
|
||||
entity.setType("string");
|
||||
entity.setName(e.getKey());
|
||||
entity.setCreateTime(new Date());
|
||||
entity.setDescription("设备上报");
|
||||
entity.setValue(String.valueOf(e.getValue()));
|
||||
return entity;
|
||||
});
|
||||
tagEntity.setTimestamp(message.getTimestamp());
|
||||
tagEntity.setDeviceId(deviceId);
|
||||
tagEntity.setId(DeviceTagEntity.createTagId(deviceId, tagEntity.getKey()));
|
||||
return writeBuffer(tagEntity);
|
||||
}))
|
||||
.then();
|
||||
}
|
||||
|
||||
|
||||
public Mono<Void> writeBuffer(DeviceTagEntity entity) {
|
||||
return buffer.writeAsync(new DeviceTagBuffer(entity));
|
||||
}
|
||||
|
||||
|
||||
private Mono<DeviceTagEntity> convertEntity(DeviceTagBuffer buffer) {
|
||||
//从最新缓存中获取最新的数据,并填入准备入库的实体中
|
||||
return dataManager
|
||||
.getLastTag(DeviceThingType.device.getId(),
|
||||
buffer.getTag().getDeviceId(),
|
||||
buffer.getTag().getKey(),
|
||||
System.currentTimeMillis())
|
||||
.map(tag -> {
|
||||
//缓存中的数据比buffer中的新,则更新为buffer中的数据
|
||||
if (tag.getTimestamp() >= buffer.tag.getTimestamp()) {
|
||||
buffer.getTag().setTimestamp(tag.getTimestamp());
|
||||
buffer.getTag().setValue(String.valueOf(tag.getValue()));
|
||||
}
|
||||
return buffer.getTag();
|
||||
})
|
||||
.defaultIfEmpty(buffer.tag);
|
||||
}
|
||||
|
||||
public Mono<Boolean> handleBuffer(Flux<DeviceTagBuffer> buffer) {
|
||||
|
||||
return tagRepository
|
||||
.save(buffer.flatMap(this::convertEntity))
|
||||
.contextWrite(ctx -> ctx.put(DeviceTagSynchronizer.class, this))
|
||||
.then(Reactors.ALWAYS_FALSE);
|
||||
}
|
||||
|
||||
@EventListener
|
||||
public void handleDeviceTagEvent(EntityCreatedEvent<DeviceTagEntity> event) {
|
||||
event.async(updateTag(event.getEntity()));
|
||||
}
|
||||
|
||||
@EventListener
|
||||
public void handleDeviceTagEvent(EntitySavedEvent<DeviceTagEntity> event) {
|
||||
event.async(updateTag(event.getEntity()));
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新标签,界面上手动修改标签?
|
||||
*
|
||||
* @param entityList 标签
|
||||
* @return Void
|
||||
*/
|
||||
private Mono<Void> updateTag(List<DeviceTagEntity> entityList) {
|
||||
return Mono.deferContextual(ctx -> {
|
||||
//更新来自消息的标签,不需要再次更新
|
||||
if (ctx.hasKey(DeviceTagSynchronizer.class)) {
|
||||
return Mono.empty();
|
||||
}
|
||||
return Flux
|
||||
.fromIterable(entityList)
|
||||
.flatMap(entity -> dataWriter
|
||||
.updateTag(DeviceThingType.device.getId(),
|
||||
entity.getDeviceId(),
|
||||
entity.getKey(),
|
||||
System.currentTimeMillis(),
|
||||
entity.getValue()))
|
||||
.then();
|
||||
});
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
buffer = new PersistenceBuffer<>(
|
||||
BufferSettings.create(properties),
|
||||
DeviceTagBuffer::new,
|
||||
this::handleBuffer)
|
||||
.name("device-tag-synchronizer");
|
||||
buffer.init();
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void shutdown() {
|
||||
buffer.stop();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(String... args) throws Exception {
|
||||
buffer.start();
|
||||
SpringApplication
|
||||
.getShutdownHandlers()
|
||||
.add(buffer::dispose);
|
||||
}
|
||||
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public static class DeviceTagBuffer implements Externalizable {
|
||||
private DeviceTagEntity tag;
|
||||
|
||||
@Override
|
||||
public void writeExternal(ObjectOutput out) {
|
||||
tag.writeExternal(out);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readExternal(ObjectInput in) {
|
||||
tag = new DeviceTagEntity();
|
||||
tag.readExternal(in);
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue