diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDashboardObject.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDashboardObject.java index aee05203..7b37da5c 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDashboardObject.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDashboardObject.java @@ -1,10 +1,12 @@ package org.jetlinks.community.device.measurements; +import lombok.Generated; import org.jetlinks.community.dashboard.DashboardObject; import org.jetlinks.community.dashboard.Measurement; import org.jetlinks.community.dashboard.ObjectDefinition; import org.jetlinks.community.device.service.data.DeviceDataService; import org.jetlinks.core.device.DeviceProductOperator; +import org.jetlinks.core.device.DeviceRegistry; import org.jetlinks.core.event.EventBus; import org.jetlinks.core.metadata.DeviceMetadata; import reactor.core.publisher.Flux; @@ -21,33 +23,40 @@ public class DeviceDashboardObject implements DashboardObject { private final DeviceDataService deviceDataService; + private final DeviceRegistry registry; + private DeviceDashboardObject(String id, String name, DeviceProductOperator productOperator, EventBus eventBus, - DeviceDataService dataService) { + DeviceDataService dataService, + DeviceRegistry registry) { this.id = id; this.name = name; this.productOperator = productOperator; this.eventBus = eventBus; this.deviceDataService = dataService; + this.registry = registry; } public static DeviceDashboardObject of(String id, String name, - DeviceProductOperator productOperator, - EventBus eventBus, - DeviceDataService dataService ) { - return new DeviceDashboardObject(id, name, productOperator, eventBus, dataService); + DeviceProductOperator productOperator, + EventBus eventBus, + DeviceDataService dataService, + DeviceRegistry registry) { + return new DeviceDashboardObject(id, name, productOperator, eventBus, dataService, registry); } @Override public ObjectDefinition getDefinition() { return new ObjectDefinition() { @Override + @Generated public String getId() { return id; } @Override + @Generated public String getName() { return name; } @@ -56,40 +65,57 @@ public class DeviceDashboardObject implements DashboardObject { @Override public Flux getMeasurements() { - return Flux.concat( + return Flux + .concat( - productOperator.getMetadata() - .flatMapIterable(DeviceMetadata::getEvents) - .map(event -> new DeviceEventMeasurement(productOperator.getId(), eventBus, event, deviceDataService)), + productOperator + .getMetadata() + .flatMapIterable(DeviceMetadata::getEvents) + .map(event -> new DeviceEventMeasurement(productOperator.getId(), + eventBus, + event, + deviceDataService)), - productOperator.getMetadata() - .map(metadata -> new DevicePropertiesMeasurement(productOperator.getId(), eventBus, deviceDataService, metadata)), + Mono.just(new DevicePropertiesMeasurement(productOperator.getId(), + eventBus, + deviceDataService, + registry)), - productOperator.getMetadata() - .map(metadata -> new DeviceEventsMeasurement(productOperator.getId(), eventBus, metadata, deviceDataService)), + productOperator + .getMetadata() + .map(metadata -> new DeviceEventsMeasurement(productOperator.getId(), + eventBus, + metadata, + deviceDataService)), - productOperator.getMetadata() - .flatMapIterable(DeviceMetadata::getProperties) - .map(event -> new DevicePropertyMeasurement(productOperator.getId(), eventBus, event, deviceDataService)) - ); + productOperator + .getMetadata() + .flatMapIterable(DeviceMetadata::getProperties) + .map(event -> new DevicePropertyMeasurement(productOperator.getId(), + eventBus, + event, + deviceDataService)) + ); } @Override public Mono getMeasurement(String id) { if ("properties".equals(id)) { - return productOperator.getMetadata() - .map(metadata -> new DevicePropertiesMeasurement(productOperator.getId(), eventBus, deviceDataService, metadata)); + return Mono.just(new DevicePropertiesMeasurement(productOperator.getId(), eventBus, deviceDataService, registry)); } if ("events".equals(id)) { - return productOperator.getMetadata() + return productOperator + .getMetadata() .map(metadata -> new DeviceEventsMeasurement(productOperator.getId(), eventBus, metadata, deviceDataService)); } - return productOperator.getMetadata() + return productOperator + .getMetadata() .flatMap(metadata -> Mono.justOrEmpty(metadata.getEvent(id))) .map(event -> new DeviceEventMeasurement(productOperator.getId(), eventBus, event, deviceDataService)) //事件没获取到则尝试获取属性 - .switchIfEmpty(productOperator.getMetadata() - .flatMap(metadata -> Mono.justOrEmpty(metadata.getProperty(id))) - .map(event -> new DevicePropertyMeasurement(productOperator.getId(), eventBus, event, deviceDataService))); + .switchIfEmpty(productOperator + .getMetadata() + .flatMap(metadata -> Mono.justOrEmpty(metadata.getProperty(id))) + .map(event -> new DevicePropertyMeasurement(productOperator.getId(), eventBus, event, deviceDataService))); } } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDynamicDashboard.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDynamicDashboard.java index 98b574d0..58edcd51 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDynamicDashboard.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDynamicDashboard.java @@ -53,6 +53,6 @@ public class DeviceDynamicDashboard implements DeviceDashboard { protected Mono convertObject(DeviceProductEntity product) { return registry.getProduct(product.getId()) - .map(operator -> DeviceDashboardObject.of(product.getId(), product.getName(), operator, eventBus, dataService)); + .map(operator -> DeviceDashboardObject.of(product.getId(), product.getName(), operator, eventBus, dataService,registry)); } } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java index 6bb0b2c4..c17e6723 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java @@ -1,22 +1,29 @@ package org.jetlinks.community.device.measurements; +import lombok.Generated; import lombok.extern.slf4j.Slf4j; import org.hswebframework.web.api.crud.entity.QueryParamEntity; import org.jetlinks.community.dashboard.*; import org.jetlinks.community.dashboard.supports.StaticMeasurement; import org.jetlinks.community.device.service.data.DeviceDataService; import org.jetlinks.community.gateway.DeviceMessageUtils; +import org.jetlinks.core.device.DeviceOperator; +import org.jetlinks.core.device.DeviceRegistry; import org.jetlinks.core.event.EventBus; import org.jetlinks.core.event.Subscription; import org.jetlinks.core.message.DeviceMessage; +import org.jetlinks.core.message.property.Property; import org.jetlinks.core.metadata.*; +import org.jetlinks.core.metadata.types.NumberType; import org.jetlinks.core.metadata.types.ObjectType; import org.jetlinks.core.metadata.types.StringType; +import org.jetlinks.core.metadata.unit.ValueUnit; import org.jetlinks.reactor.ql.utils.CastUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.*; +import java.util.function.Function; import java.util.stream.Collectors; @Slf4j @@ -24,21 +31,21 @@ class DevicePropertiesMeasurement extends StaticMeasurement { private final EventBus eventBus; - private final DeviceMetadata metadata; - private final DeviceDataService dataService; private final String productId; + private final DeviceRegistry registry; + public DevicePropertiesMeasurement(String productId, EventBus eventBus, DeviceDataService dataService, - DeviceMetadata deviceMetadata) { + DeviceRegistry registry) { super(MeasurementDefinition.of("properties", "属性记录")); this.productId = productId; this.eventBus = eventBus; - this.metadata = deviceMetadata; this.dataService = dataService; + this.registry = registry; addDimension(new RealTimeDevicePropertyDimension()); addDimension(new HistoryDevicePropertyDimension()); @@ -55,29 +62,50 @@ class DevicePropertiesMeasurement extends StaticMeasurement { .sort(MeasurementValue.sort()); } - Map createValue(String property, Object value) { + Map createValue(DeviceMetadata metadata, Property property) { return metadata - .getProperty(property) + .getProperty(property.getId()) .map(meta -> { Map values = new HashMap<>(); DataType type = meta.getValueType(); - Object val = type instanceof Converter ? ((Converter) type).convert(value) : value; + Object val; + if (type instanceof NumberType) { + NumberType numberType = ((NumberType) type); + val = NumberType.convertScaleNumber(property.getValue(), numberType.getScale(), numberType.getRound(), Function.identity()); + } else if (type instanceof Converter) { + val = ((Converter) type).convert(property.getValue()); + } else { + val = property.getValue(); + } values.put("formatValue", type.format(val)); values.put("value", val); - values.put("property", property); - + values.put("state", property.getState()); + values.put("property", property.getId()); + values.put("timestamp",property.getTimestamp()); + if (type instanceof UnitSupported) { + UnitSupported unitSupported = (UnitSupported) type; + values.put("unit", Optional.ofNullable(unitSupported.getUnit()) + .map(ValueUnit::getSymbol) + .orElse(null)); + } return values; }) .orElseGet(() -> { Map values = new HashMap<>(); - values.put("formatValue", value); - values.put("value", value); - values.put("property", property); + values.put("formatValue", property.getValue()); + values.put("value", property.getValue()); + values.put("state", property.getState()); + values.put("property", property.getId()); + values.put("timestamp",property.getTimestamp()); return values; }); } - Flux fromRealTime(String deviceId, Set properties) { + static Subscription.Feature[] clusterFeature = {Subscription.Feature.local, Subscription.Feature.broker}; + static Subscription.Feature[] nonClusterFeature = {Subscription.Feature.local}; + + + Flux fromRealTime(String deviceId, Set properties, boolean cluster) { Subscription subscription = Subscription.of( "realtime-device-properties-measurement", @@ -85,27 +113,32 @@ class DevicePropertiesMeasurement extends StaticMeasurement { "/device/" + productId + "/" + deviceId + "/message/property/report", "/device/" + productId + "/" + deviceId + "/message/property/*/reply" }, - Subscription.Feature.local, Subscription.Feature.broker + cluster ? clusterFeature : nonClusterFeature ); - List props = metadata.getProperties(); - Map index = new HashMap<>(); - int idx = 0; - for (PropertyMetadata prop : props) { - if (properties.isEmpty() || properties.contains(prop.getId())) { - index.put(prop.getId(), idx++); - } - } - return - eventBus - .subscribe(subscription, DeviceMessage.class) - .flatMap(msg -> Mono.justOrEmpty(DeviceMessageUtils.tryGetProperties(msg))) - .flatMap(map -> Flux - .fromIterable(map.entrySet()) - //对本次上报的属性进行排序 - .sort(Comparator.comparingInt(e -> index.getOrDefault(e.getKey(), 0)))) - .map(kv -> SimpleMeasurementValue.of(createValue(kv.getKey(), kv.getValue()), System.currentTimeMillis())) - .onErrorContinue((err, v) -> log.error(err.getMessage(), err)) - ; + return registry + .getDevice(deviceId) + .flatMap(DeviceOperator::getMetadata) + .flatMapMany(metadata -> { + List props = metadata.getProperties(); + Map index = new HashMap<>(); + int idx = 0; + for (PropertyMetadata prop : props) { + if (properties.isEmpty() || properties.contains(prop.getId())) { + index.put(prop.getId(), idx++); + } + } + return + eventBus + .subscribe(subscription, DeviceMessage.class) + .flatMap(msg -> Flux + .fromIterable(DeviceMessageUtils.tryGetCompleteProperties(msg)) + .filter(e -> index.containsKey(e.getId())) + //对本次上报的属性进行排序 + .sort(Comparator.comparingInt(e -> index.getOrDefault(e.getId(), 0))) + .map(e -> SimpleMeasurementValue.of(createValue(metadata, e), e.getTimestamp()))) + .onErrorContinue((err, v) -> log.error(err.getMessage(), err)) + ; + }); } static ConfigMetadata configMetadata = new DefaultConfigMetadata() @@ -127,11 +160,13 @@ class DevicePropertiesMeasurement extends StaticMeasurement { private class HistoryDevicePropertyDimension implements MeasurementDimension { @Override + @Generated public DimensionDefinition getDefinition() { return CommonDimensionDefinition.history; } @Override + @Generated public DataType getValueType() { return new ObjectType() .addProperty("property", "属性", StringType.GLOBAL) @@ -140,11 +175,13 @@ class DevicePropertiesMeasurement extends StaticMeasurement { } @Override + @Generated public ConfigMetadata getParams() { return configMetadata; } @Override + @Generated public boolean isRealTime() { return false; } @@ -167,11 +204,13 @@ class DevicePropertiesMeasurement extends StaticMeasurement { private class RealTimeDevicePropertyDimension implements MeasurementDimension { @Override + @Generated public DimensionDefinition getDefinition() { return CommonDimensionDefinition.realTime; } @Override + @Generated public DataType getValueType() { return new ObjectType() .addProperty("property", "属性", StringType.GLOBAL) @@ -180,11 +219,13 @@ class DevicePropertiesMeasurement extends StaticMeasurement { } @Override + @Generated public ConfigMetadata getParams() { return configMetadata; } @Override + @Generated public boolean isRealTime() { return true; } @@ -196,12 +237,12 @@ class DevicePropertiesMeasurement extends StaticMeasurement { .flatMapMany(deviceId -> { int history = parameter.getInt("history").orElse(0); //合并历史数据和实时数据 - return Flux.concat( + return Flux.concat( //查询历史数据 fromHistory(deviceId, history, getPropertiesFromParameter(parameter)) , //从消息网关订阅实时事件消息 - fromRealTime(deviceId, getPropertiesFromParameter(parameter)) + fromRealTime(deviceId, getPropertiesFromParameter(parameter), parameter.getBoolean("cluster", true)) ); }); } diff --git a/pom.xml b/pom.xml index 7fc2dd95..94c97a75 100644 --- a/pom.xml +++ b/pom.xml @@ -37,7 +37,7 @@ Borca-SR2 3.0.2 4.1.97.Final - 7.17.5 + 7.17.13 3.7.0 1.2.83 2020.0.31 @@ -223,7 +223,7 @@ org.json json - 20230227 + 20231013