fix: 修复在设备里单独定义物模型时,订阅的数据格式不对问题. (#432)

* fix: 修复在设备里单独定义物模型时,订阅的数据格式不对问题.

* fix: 修复编译错误
This commit is contained in:
老周 2023-10-31 16:03:14 +08:00 committed by GitHub
parent 827dc1ef6b
commit 2814fac218
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 127 additions and 60 deletions

View File

@ -1,10 +1,12 @@
package org.jetlinks.community.device.measurements; package org.jetlinks.community.device.measurements;
import lombok.Generated;
import org.jetlinks.community.dashboard.DashboardObject; import org.jetlinks.community.dashboard.DashboardObject;
import org.jetlinks.community.dashboard.Measurement; import org.jetlinks.community.dashboard.Measurement;
import org.jetlinks.community.dashboard.ObjectDefinition; import org.jetlinks.community.dashboard.ObjectDefinition;
import org.jetlinks.community.device.service.data.DeviceDataService; import org.jetlinks.community.device.service.data.DeviceDataService;
import org.jetlinks.core.device.DeviceProductOperator; import org.jetlinks.core.device.DeviceProductOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.event.EventBus; import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.metadata.DeviceMetadata; import org.jetlinks.core.metadata.DeviceMetadata;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
@ -21,33 +23,40 @@ public class DeviceDashboardObject implements DashboardObject {
private final DeviceDataService deviceDataService; private final DeviceDataService deviceDataService;
private final DeviceRegistry registry;
private DeviceDashboardObject(String id, String name, private DeviceDashboardObject(String id, String name,
DeviceProductOperator productOperator, DeviceProductOperator productOperator,
EventBus eventBus, EventBus eventBus,
DeviceDataService dataService) { DeviceDataService dataService,
DeviceRegistry registry) {
this.id = id; this.id = id;
this.name = name; this.name = name;
this.productOperator = productOperator; this.productOperator = productOperator;
this.eventBus = eventBus; this.eventBus = eventBus;
this.deviceDataService = dataService; this.deviceDataService = dataService;
this.registry = registry;
} }
public static DeviceDashboardObject of(String id, String name, public static DeviceDashboardObject of(String id, String name,
DeviceProductOperator productOperator, DeviceProductOperator productOperator,
EventBus eventBus, EventBus eventBus,
DeviceDataService dataService ) { DeviceDataService dataService,
return new DeviceDashboardObject(id, name, productOperator, eventBus, dataService); DeviceRegistry registry) {
return new DeviceDashboardObject(id, name, productOperator, eventBus, dataService, registry);
} }
@Override @Override
public ObjectDefinition getDefinition() { public ObjectDefinition getDefinition() {
return new ObjectDefinition() { return new ObjectDefinition() {
@Override @Override
@Generated
public String getId() { public String getId() {
return id; return id;
} }
@Override @Override
@Generated
public String getName() { public String getName() {
return name; return name;
} }
@ -56,40 +65,57 @@ public class DeviceDashboardObject implements DashboardObject {
@Override @Override
public Flux<Measurement> getMeasurements() { public Flux<Measurement> getMeasurements() {
return Flux.concat( return Flux
.concat(
productOperator.getMetadata() productOperator
.flatMapIterable(DeviceMetadata::getEvents) .getMetadata()
.map(event -> new DeviceEventMeasurement(productOperator.getId(), eventBus, event, deviceDataService)), .flatMapIterable(DeviceMetadata::getEvents)
.map(event -> new DeviceEventMeasurement(productOperator.getId(),
eventBus,
event,
deviceDataService)),
productOperator.getMetadata() Mono.just(new DevicePropertiesMeasurement(productOperator.getId(),
.map(metadata -> new DevicePropertiesMeasurement(productOperator.getId(), eventBus, deviceDataService, metadata)), eventBus,
deviceDataService,
registry)),
productOperator.getMetadata() productOperator
.map(metadata -> new DeviceEventsMeasurement(productOperator.getId(), eventBus, metadata, deviceDataService)), .getMetadata()
.map(metadata -> new DeviceEventsMeasurement(productOperator.getId(),
eventBus,
metadata,
deviceDataService)),
productOperator.getMetadata() productOperator
.flatMapIterable(DeviceMetadata::getProperties) .getMetadata()
.map(event -> new DevicePropertyMeasurement(productOperator.getId(), eventBus, event, deviceDataService)) .flatMapIterable(DeviceMetadata::getProperties)
); .map(event -> new DevicePropertyMeasurement(productOperator.getId(),
eventBus,
event,
deviceDataService))
);
} }
@Override @Override
public Mono<Measurement> getMeasurement(String id) { public Mono<Measurement> getMeasurement(String id) {
if ("properties".equals(id)) { if ("properties".equals(id)) {
return productOperator.getMetadata() return Mono.just(new DevicePropertiesMeasurement(productOperator.getId(), eventBus, deviceDataService, registry));
.map(metadata -> new DevicePropertiesMeasurement(productOperator.getId(), eventBus, deviceDataService, metadata));
} }
if ("events".equals(id)) { if ("events".equals(id)) {
return productOperator.getMetadata() return productOperator
.getMetadata()
.map(metadata -> new DeviceEventsMeasurement(productOperator.getId(), eventBus, metadata, deviceDataService)); .map(metadata -> new DeviceEventsMeasurement(productOperator.getId(), eventBus, metadata, deviceDataService));
} }
return productOperator.getMetadata() return productOperator
.getMetadata()
.flatMap(metadata -> Mono.justOrEmpty(metadata.getEvent(id))) .flatMap(metadata -> Mono.justOrEmpty(metadata.getEvent(id)))
.<Measurement>map(event -> new DeviceEventMeasurement(productOperator.getId(), eventBus, event, deviceDataService)) .<Measurement>map(event -> new DeviceEventMeasurement(productOperator.getId(), eventBus, event, deviceDataService))
//事件没获取到则尝试获取属性 //事件没获取到则尝试获取属性
.switchIfEmpty(productOperator.getMetadata() .switchIfEmpty(productOperator
.flatMap(metadata -> Mono.justOrEmpty(metadata.getProperty(id))) .getMetadata()
.map(event -> new DevicePropertyMeasurement(productOperator.getId(), eventBus, event, deviceDataService))); .flatMap(metadata -> Mono.justOrEmpty(metadata.getProperty(id)))
.map(event -> new DevicePropertyMeasurement(productOperator.getId(), eventBus, event, deviceDataService)));
} }
} }

View File

@ -53,6 +53,6 @@ public class DeviceDynamicDashboard implements DeviceDashboard {
protected Mono<DeviceDashboardObject> convertObject(DeviceProductEntity product) { protected Mono<DeviceDashboardObject> convertObject(DeviceProductEntity product) {
return registry.getProduct(product.getId()) return registry.getProduct(product.getId())
.map(operator -> DeviceDashboardObject.of(product.getId(), product.getName(), operator, eventBus, dataService)); .map(operator -> DeviceDashboardObject.of(product.getId(), product.getName(), operator, eventBus, dataService,registry));
} }
} }

View File

@ -1,22 +1,29 @@
package org.jetlinks.community.device.measurements; package org.jetlinks.community.device.measurements;
import lombok.Generated;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.api.crud.entity.QueryParamEntity; import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.jetlinks.community.dashboard.*; import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement; import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.device.service.data.DeviceDataService; import org.jetlinks.community.device.service.data.DeviceDataService;
import org.jetlinks.community.gateway.DeviceMessageUtils; import org.jetlinks.community.gateway.DeviceMessageUtils;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.event.EventBus; import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription; import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.message.DeviceMessage; import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.property.Property;
import org.jetlinks.core.metadata.*; import org.jetlinks.core.metadata.*;
import org.jetlinks.core.metadata.types.NumberType;
import org.jetlinks.core.metadata.types.ObjectType; import org.jetlinks.core.metadata.types.ObjectType;
import org.jetlinks.core.metadata.types.StringType; import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.core.metadata.unit.ValueUnit;
import org.jetlinks.reactor.ql.utils.CastUtils; import org.jetlinks.reactor.ql.utils.CastUtils;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.util.*; import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@Slf4j @Slf4j
@ -24,21 +31,21 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
private final EventBus eventBus; private final EventBus eventBus;
private final DeviceMetadata metadata;
private final DeviceDataService dataService; private final DeviceDataService dataService;
private final String productId; private final String productId;
private final DeviceRegistry registry;
public DevicePropertiesMeasurement(String productId, public DevicePropertiesMeasurement(String productId,
EventBus eventBus, EventBus eventBus,
DeviceDataService dataService, DeviceDataService dataService,
DeviceMetadata deviceMetadata) { DeviceRegistry registry) {
super(MeasurementDefinition.of("properties", "属性记录")); super(MeasurementDefinition.of("properties", "属性记录"));
this.productId = productId; this.productId = productId;
this.eventBus = eventBus; this.eventBus = eventBus;
this.metadata = deviceMetadata;
this.dataService = dataService; this.dataService = dataService;
this.registry = registry;
addDimension(new RealTimeDevicePropertyDimension()); addDimension(new RealTimeDevicePropertyDimension());
addDimension(new HistoryDevicePropertyDimension()); addDimension(new HistoryDevicePropertyDimension());
@ -55,29 +62,50 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
.sort(MeasurementValue.sort()); .sort(MeasurementValue.sort());
} }
Map<String, Object> createValue(String property, Object value) { Map<String, Object> createValue(DeviceMetadata metadata, Property property) {
return metadata return metadata
.getProperty(property) .getProperty(property.getId())
.map(meta -> { .map(meta -> {
Map<String, Object> values = new HashMap<>(); Map<String, Object> values = new HashMap<>();
DataType type = meta.getValueType(); DataType type = meta.getValueType();
Object val = type instanceof Converter ? ((Converter<?>) type).convert(value) : value; Object val;
if (type instanceof NumberType) {
NumberType<?> numberType = ((NumberType<?>) type);
val = NumberType.convertScaleNumber(property.getValue(), numberType.getScale(), numberType.getRound(), Function.identity());
} else if (type instanceof Converter) {
val = ((Converter<?>) type).convert(property.getValue());
} else {
val = property.getValue();
}
values.put("formatValue", type.format(val)); values.put("formatValue", type.format(val));
values.put("value", val); values.put("value", val);
values.put("property", property); values.put("state", property.getState());
values.put("property", property.getId());
values.put("timestamp",property.getTimestamp());
if (type instanceof UnitSupported) {
UnitSupported unitSupported = (UnitSupported) type;
values.put("unit", Optional.ofNullable(unitSupported.getUnit())
.map(ValueUnit::getSymbol)
.orElse(null));
}
return values; return values;
}) })
.orElseGet(() -> { .orElseGet(() -> {
Map<String, Object> values = new HashMap<>(); Map<String, Object> values = new HashMap<>();
values.put("formatValue", value); values.put("formatValue", property.getValue());
values.put("value", value); values.put("value", property.getValue());
values.put("property", property); values.put("state", property.getState());
values.put("property", property.getId());
values.put("timestamp",property.getTimestamp());
return values; return values;
}); });
} }
Flux<MeasurementValue> fromRealTime(String deviceId, Set<String> properties) { static Subscription.Feature[] clusterFeature = {Subscription.Feature.local, Subscription.Feature.broker};
static Subscription.Feature[] nonClusterFeature = {Subscription.Feature.local};
Flux<MeasurementValue> fromRealTime(String deviceId, Set<String> properties, boolean cluster) {
Subscription subscription = Subscription.of( Subscription subscription = Subscription.of(
"realtime-device-properties-measurement", "realtime-device-properties-measurement",
@ -85,27 +113,32 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
"/device/" + productId + "/" + deviceId + "/message/property/report", "/device/" + productId + "/" + deviceId + "/message/property/report",
"/device/" + productId + "/" + deviceId + "/message/property/*/reply" "/device/" + productId + "/" + deviceId + "/message/property/*/reply"
}, },
Subscription.Feature.local, Subscription.Feature.broker cluster ? clusterFeature : nonClusterFeature
); );
List<PropertyMetadata> props = metadata.getProperties(); return registry
Map<String, Integer> index = new HashMap<>(); .getDevice(deviceId)
int idx = 0; .flatMap(DeviceOperator::getMetadata)
for (PropertyMetadata prop : props) { .flatMapMany(metadata -> {
if (properties.isEmpty() || properties.contains(prop.getId())) { List<PropertyMetadata> props = metadata.getProperties();
index.put(prop.getId(), idx++); Map<String, Integer> index = new HashMap<>();
} int idx = 0;
} for (PropertyMetadata prop : props) {
return if (properties.isEmpty() || properties.contains(prop.getId())) {
eventBus index.put(prop.getId(), idx++);
.subscribe(subscription, DeviceMessage.class) }
.flatMap(msg -> Mono.justOrEmpty(DeviceMessageUtils.tryGetProperties(msg))) }
.flatMap(map -> Flux return
.fromIterable(map.entrySet()) eventBus
//对本次上报的属性进行排序 .subscribe(subscription, DeviceMessage.class)
.sort(Comparator.comparingInt(e -> index.getOrDefault(e.getKey(), 0)))) .flatMap(msg -> Flux
.<MeasurementValue>map(kv -> SimpleMeasurementValue.of(createValue(kv.getKey(), kv.getValue()), System.currentTimeMillis())) .fromIterable(DeviceMessageUtils.tryGetCompleteProperties(msg))
.onErrorContinue((err, v) -> log.error(err.getMessage(), err)) .filter(e -> index.containsKey(e.getId()))
; //对本次上报的属性进行排序
.sort(Comparator.comparingInt(e -> index.getOrDefault(e.getId(), 0)))
.<MeasurementValue>map(e -> SimpleMeasurementValue.of(createValue(metadata, e), e.getTimestamp())))
.onErrorContinue((err, v) -> log.error(err.getMessage(), err))
;
});
} }
static ConfigMetadata configMetadata = new DefaultConfigMetadata() static ConfigMetadata configMetadata = new DefaultConfigMetadata()
@ -127,11 +160,13 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
private class HistoryDevicePropertyDimension implements MeasurementDimension { private class HistoryDevicePropertyDimension implements MeasurementDimension {
@Override @Override
@Generated
public DimensionDefinition getDefinition() { public DimensionDefinition getDefinition() {
return CommonDimensionDefinition.history; return CommonDimensionDefinition.history;
} }
@Override @Override
@Generated
public DataType getValueType() { public DataType getValueType() {
return new ObjectType() return new ObjectType()
.addProperty("property", "属性", StringType.GLOBAL) .addProperty("property", "属性", StringType.GLOBAL)
@ -140,11 +175,13 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
} }
@Override @Override
@Generated
public ConfigMetadata getParams() { public ConfigMetadata getParams() {
return configMetadata; return configMetadata;
} }
@Override @Override
@Generated
public boolean isRealTime() { public boolean isRealTime() {
return false; return false;
} }
@ -167,11 +204,13 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
private class RealTimeDevicePropertyDimension implements MeasurementDimension { private class RealTimeDevicePropertyDimension implements MeasurementDimension {
@Override @Override
@Generated
public DimensionDefinition getDefinition() { public DimensionDefinition getDefinition() {
return CommonDimensionDefinition.realTime; return CommonDimensionDefinition.realTime;
} }
@Override @Override
@Generated
public DataType getValueType() { public DataType getValueType() {
return new ObjectType() return new ObjectType()
.addProperty("property", "属性", StringType.GLOBAL) .addProperty("property", "属性", StringType.GLOBAL)
@ -180,11 +219,13 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
} }
@Override @Override
@Generated
public ConfigMetadata getParams() { public ConfigMetadata getParams() {
return configMetadata; return configMetadata;
} }
@Override @Override
@Generated
public boolean isRealTime() { public boolean isRealTime() {
return true; return true;
} }
@ -196,12 +237,12 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
.flatMapMany(deviceId -> { .flatMapMany(deviceId -> {
int history = parameter.getInt("history").orElse(0); int history = parameter.getInt("history").orElse(0);
//合并历史数据和实时数据 //合并历史数据和实时数据
return Flux.concat( return Flux.concat(
//查询历史数据 //查询历史数据
fromHistory(deviceId, history, getPropertiesFromParameter(parameter)) fromHistory(deviceId, history, getPropertiesFromParameter(parameter))
, ,
//从消息网关订阅实时事件消息 //从消息网关订阅实时事件消息
fromRealTime(deviceId, getPropertiesFromParameter(parameter)) fromRealTime(deviceId, getPropertiesFromParameter(parameter), parameter.getBoolean("cluster", true))
); );
}); });
} }