Merge remote-tracking branch 'origin/master'

This commit is contained in:
zhouhao 2023-11-01 09:26:47 +08:00
commit edd1b9aba2
4 changed files with 129 additions and 62 deletions

View File

@ -1,10 +1,12 @@
package org.jetlinks.community.device.measurements;
import lombok.Generated;
import org.jetlinks.community.dashboard.DashboardObject;
import org.jetlinks.community.dashboard.Measurement;
import org.jetlinks.community.dashboard.ObjectDefinition;
import org.jetlinks.community.device.service.data.DeviceDataService;
import org.jetlinks.core.device.DeviceProductOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.metadata.DeviceMetadata;
import reactor.core.publisher.Flux;
@ -21,33 +23,40 @@ public class DeviceDashboardObject implements DashboardObject {
private final DeviceDataService deviceDataService;
private final DeviceRegistry registry;
private DeviceDashboardObject(String id, String name,
DeviceProductOperator productOperator,
EventBus eventBus,
DeviceDataService dataService) {
DeviceDataService dataService,
DeviceRegistry registry) {
this.id = id;
this.name = name;
this.productOperator = productOperator;
this.eventBus = eventBus;
this.deviceDataService = dataService;
this.registry = registry;
}
public static DeviceDashboardObject of(String id, String name,
DeviceProductOperator productOperator,
EventBus eventBus,
DeviceDataService dataService ) {
return new DeviceDashboardObject(id, name, productOperator, eventBus, dataService);
DeviceProductOperator productOperator,
EventBus eventBus,
DeviceDataService dataService,
DeviceRegistry registry) {
return new DeviceDashboardObject(id, name, productOperator, eventBus, dataService, registry);
}
@Override
public ObjectDefinition getDefinition() {
return new ObjectDefinition() {
@Override
@Generated
public String getId() {
return id;
}
@Override
@Generated
public String getName() {
return name;
}
@ -56,40 +65,57 @@ public class DeviceDashboardObject implements DashboardObject {
@Override
public Flux<Measurement> getMeasurements() {
return Flux.concat(
return Flux
.concat(
productOperator.getMetadata()
.flatMapIterable(DeviceMetadata::getEvents)
.map(event -> new DeviceEventMeasurement(productOperator.getId(), eventBus, event, deviceDataService)),
productOperator
.getMetadata()
.flatMapIterable(DeviceMetadata::getEvents)
.map(event -> new DeviceEventMeasurement(productOperator.getId(),
eventBus,
event,
deviceDataService)),
productOperator.getMetadata()
.map(metadata -> new DevicePropertiesMeasurement(productOperator.getId(), eventBus, deviceDataService, metadata)),
Mono.just(new DevicePropertiesMeasurement(productOperator.getId(),
eventBus,
deviceDataService,
registry)),
productOperator.getMetadata()
.map(metadata -> new DeviceEventsMeasurement(productOperator.getId(), eventBus, metadata, deviceDataService)),
productOperator
.getMetadata()
.map(metadata -> new DeviceEventsMeasurement(productOperator.getId(),
eventBus,
metadata,
deviceDataService)),
productOperator.getMetadata()
.flatMapIterable(DeviceMetadata::getProperties)
.map(event -> new DevicePropertyMeasurement(productOperator.getId(), eventBus, event, deviceDataService))
);
productOperator
.getMetadata()
.flatMapIterable(DeviceMetadata::getProperties)
.map(event -> new DevicePropertyMeasurement(productOperator.getId(),
eventBus,
event,
deviceDataService))
);
}
@Override
public Mono<Measurement> getMeasurement(String id) {
if ("properties".equals(id)) {
return productOperator.getMetadata()
.map(metadata -> new DevicePropertiesMeasurement(productOperator.getId(), eventBus, deviceDataService, metadata));
return Mono.just(new DevicePropertiesMeasurement(productOperator.getId(), eventBus, deviceDataService, registry));
}
if ("events".equals(id)) {
return productOperator.getMetadata()
return productOperator
.getMetadata()
.map(metadata -> new DeviceEventsMeasurement(productOperator.getId(), eventBus, metadata, deviceDataService));
}
return productOperator.getMetadata()
return productOperator
.getMetadata()
.flatMap(metadata -> Mono.justOrEmpty(metadata.getEvent(id)))
.<Measurement>map(event -> new DeviceEventMeasurement(productOperator.getId(), eventBus, event, deviceDataService))
//事件没获取到则尝试获取属性
.switchIfEmpty(productOperator.getMetadata()
.flatMap(metadata -> Mono.justOrEmpty(metadata.getProperty(id)))
.map(event -> new DevicePropertyMeasurement(productOperator.getId(), eventBus, event, deviceDataService)));
.switchIfEmpty(productOperator
.getMetadata()
.flatMap(metadata -> Mono.justOrEmpty(metadata.getProperty(id)))
.map(event -> new DevicePropertyMeasurement(productOperator.getId(), eventBus, event, deviceDataService)));
}
}

View File

@ -53,6 +53,6 @@ public class DeviceDynamicDashboard implements DeviceDashboard {
protected Mono<DeviceDashboardObject> convertObject(DeviceProductEntity product) {
return registry.getProduct(product.getId())
.map(operator -> DeviceDashboardObject.of(product.getId(), product.getName(), operator, eventBus, dataService));
.map(operator -> DeviceDashboardObject.of(product.getId(), product.getName(), operator, eventBus, dataService,registry));
}
}

View File

@ -1,22 +1,29 @@
package org.jetlinks.community.device.measurements;
import lombok.Generated;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.device.service.data.DeviceDataService;
import org.jetlinks.community.gateway.DeviceMessageUtils;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.property.Property;
import org.jetlinks.core.metadata.*;
import org.jetlinks.core.metadata.types.NumberType;
import org.jetlinks.core.metadata.types.ObjectType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.core.metadata.unit.ValueUnit;
import org.jetlinks.reactor.ql.utils.CastUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
@Slf4j
@ -24,21 +31,21 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
private final EventBus eventBus;
private final DeviceMetadata metadata;
private final DeviceDataService dataService;
private final String productId;
private final DeviceRegistry registry;
public DevicePropertiesMeasurement(String productId,
EventBus eventBus,
DeviceDataService dataService,
DeviceMetadata deviceMetadata) {
DeviceRegistry registry) {
super(MeasurementDefinition.of("properties", "属性记录"));
this.productId = productId;
this.eventBus = eventBus;
this.metadata = deviceMetadata;
this.dataService = dataService;
this.registry = registry;
addDimension(new RealTimeDevicePropertyDimension());
addDimension(new HistoryDevicePropertyDimension());
@ -55,29 +62,50 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
.sort(MeasurementValue.sort());
}
Map<String, Object> createValue(String property, Object value) {
Map<String, Object> createValue(DeviceMetadata metadata, Property property) {
return metadata
.getProperty(property)
.getProperty(property.getId())
.map(meta -> {
Map<String, Object> values = new HashMap<>();
DataType type = meta.getValueType();
Object val = type instanceof Converter ? ((Converter<?>) type).convert(value) : value;
Object val;
if (type instanceof NumberType) {
NumberType<?> numberType = ((NumberType<?>) type);
val = NumberType.convertScaleNumber(property.getValue(), numberType.getScale(), numberType.getRound(), Function.identity());
} else if (type instanceof Converter) {
val = ((Converter<?>) type).convert(property.getValue());
} else {
val = property.getValue();
}
values.put("formatValue", type.format(val));
values.put("value", val);
values.put("property", property);
values.put("state", property.getState());
values.put("property", property.getId());
values.put("timestamp",property.getTimestamp());
if (type instanceof UnitSupported) {
UnitSupported unitSupported = (UnitSupported) type;
values.put("unit", Optional.ofNullable(unitSupported.getUnit())
.map(ValueUnit::getSymbol)
.orElse(null));
}
return values;
})
.orElseGet(() -> {
Map<String, Object> values = new HashMap<>();
values.put("formatValue", value);
values.put("value", value);
values.put("property", property);
values.put("formatValue", property.getValue());
values.put("value", property.getValue());
values.put("state", property.getState());
values.put("property", property.getId());
values.put("timestamp",property.getTimestamp());
return values;
});
}
Flux<MeasurementValue> fromRealTime(String deviceId, Set<String> properties) {
static Subscription.Feature[] clusterFeature = {Subscription.Feature.local, Subscription.Feature.broker};
static Subscription.Feature[] nonClusterFeature = {Subscription.Feature.local};
Flux<MeasurementValue> fromRealTime(String deviceId, Set<String> properties, boolean cluster) {
Subscription subscription = Subscription.of(
"realtime-device-properties-measurement",
@ -85,27 +113,32 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
"/device/" + productId + "/" + deviceId + "/message/property/report",
"/device/" + productId + "/" + deviceId + "/message/property/*/reply"
},
Subscription.Feature.local, Subscription.Feature.broker
cluster ? clusterFeature : nonClusterFeature
);
List<PropertyMetadata> props = metadata.getProperties();
Map<String, Integer> index = new HashMap<>();
int idx = 0;
for (PropertyMetadata prop : props) {
if (properties.isEmpty() || properties.contains(prop.getId())) {
index.put(prop.getId(), idx++);
}
}
return
eventBus
.subscribe(subscription, DeviceMessage.class)
.flatMap(msg -> Mono.justOrEmpty(DeviceMessageUtils.tryGetProperties(msg)))
.flatMap(map -> Flux
.fromIterable(map.entrySet())
//对本次上报的属性进行排序
.sort(Comparator.comparingInt(e -> index.getOrDefault(e.getKey(), 0))))
.<MeasurementValue>map(kv -> SimpleMeasurementValue.of(createValue(kv.getKey(), kv.getValue()), System.currentTimeMillis()))
.onErrorContinue((err, v) -> log.error(err.getMessage(), err))
;
return registry
.getDevice(deviceId)
.flatMap(DeviceOperator::getMetadata)
.flatMapMany(metadata -> {
List<PropertyMetadata> props = metadata.getProperties();
Map<String, Integer> index = new HashMap<>();
int idx = 0;
for (PropertyMetadata prop : props) {
if (properties.isEmpty() || properties.contains(prop.getId())) {
index.put(prop.getId(), idx++);
}
}
return
eventBus
.subscribe(subscription, DeviceMessage.class)
.flatMap(msg -> Flux
.fromIterable(DeviceMessageUtils.tryGetCompleteProperties(msg))
.filter(e -> index.containsKey(e.getId()))
//对本次上报的属性进行排序
.sort(Comparator.comparingInt(e -> index.getOrDefault(e.getId(), 0)))
.<MeasurementValue>map(e -> SimpleMeasurementValue.of(createValue(metadata, e), e.getTimestamp())))
.onErrorContinue((err, v) -> log.error(err.getMessage(), err))
;
});
}
static ConfigMetadata configMetadata = new DefaultConfigMetadata()
@ -127,11 +160,13 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
private class HistoryDevicePropertyDimension implements MeasurementDimension {
@Override
@Generated
public DimensionDefinition getDefinition() {
return CommonDimensionDefinition.history;
}
@Override
@Generated
public DataType getValueType() {
return new ObjectType()
.addProperty("property", "属性", StringType.GLOBAL)
@ -140,11 +175,13 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
}
@Override
@Generated
public ConfigMetadata getParams() {
return configMetadata;
}
@Override
@Generated
public boolean isRealTime() {
return false;
}
@ -167,11 +204,13 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
private class RealTimeDevicePropertyDimension implements MeasurementDimension {
@Override
@Generated
public DimensionDefinition getDefinition() {
return CommonDimensionDefinition.realTime;
}
@Override
@Generated
public DataType getValueType() {
return new ObjectType()
.addProperty("property", "属性", StringType.GLOBAL)
@ -180,11 +219,13 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
}
@Override
@Generated
public ConfigMetadata getParams() {
return configMetadata;
}
@Override
@Generated
public boolean isRealTime() {
return true;
}
@ -196,12 +237,12 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
.flatMapMany(deviceId -> {
int history = parameter.getInt("history").orElse(0);
//合并历史数据和实时数据
return Flux.concat(
return Flux.concat(
//查询历史数据
fromHistory(deviceId, history, getPropertiesFromParameter(parameter))
,
//从消息网关订阅实时事件消息
fromRealTime(deviceId, getPropertiesFromParameter(parameter))
fromRealTime(deviceId, getPropertiesFromParameter(parameter), parameter.getBoolean("cluster", true))
);
});
}

View File

@ -37,7 +37,7 @@
<r2dbc.version>Borca-SR2</r2dbc.version>
<hsweb.expands.version>3.0.2</hsweb.expands.version>
<netty.version>4.1.97.Final</netty.version>
<elasticsearch.version>7.17.5</elasticsearch.version>
<elasticsearch.version>7.17.13</elasticsearch.version>
<californium.version>3.7.0</californium.version>
<fastjson.version>1.2.83</fastjson.version>
<reactor.version>2020.0.31</reactor.version>
@ -223,7 +223,7 @@
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20230227</version>
<version>20231013</version>
</dependency>
<dependency>