From c1363bd555d881558684d07c6d930e7fb9b1c2aa Mon Sep 17 00:00:00 2001 From: zhou-hao Date: Wed, 26 May 2021 11:14:42 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81=E6=8C=87=E5=AE=9A=E5=B1=9E?= =?UTF-8?q?=E6=80=A7=E6=9F=A5=E8=AF=A2=E8=AE=B0=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../DevicePropertiesMeasurement.java | 49 +++++++++++++------ .../data/AbstractDeviceDataStoragePolicy.java | 19 +++++++ .../data/DefaultDeviceDataService.java | 5 +- .../service/data/DeviceDataService.java | 3 +- .../service/data/DeviceDataStoragePolicy.java | 3 +- .../data/NoneDeviceDataStoragePolicy.java | 3 +- ...meSeriesColumnDeviceDataStoragePolicy.java | 24 ++++----- ...SeriesRowDeviceDataStoreStoragePolicy.java | 47 +++++++++--------- 8 files changed, 93 insertions(+), 60 deletions(-) diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java index 1798c50a..8871153a 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java @@ -12,13 +12,12 @@ import org.jetlinks.community.dashboard.*; import org.jetlinks.community.dashboard.supports.StaticMeasurement; import org.jetlinks.community.device.service.data.DeviceDataService; import org.jetlinks.community.gateway.DeviceMessageUtils; +import org.jetlinks.reactor.ql.utils.CastUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; +import java.util.stream.Collectors; @Slf4j class DevicePropertiesMeasurement extends StaticMeasurement { @@ -45,16 +44,20 @@ class DevicePropertiesMeasurement extends StaticMeasurement { } - Flux fromHistory(String deviceId, int history) { - return history <= 0 ? Flux.empty() : QueryParamEntity.newQuery() + Flux fromHistory(String deviceId, int history, Set properties) { + return history <= 0 + ? Flux.empty() + : QueryParamEntity + .newQuery() .doPaging(0, history) - .execute(q -> dataService.queryEachProperties(deviceId, q)) + .execute(q -> dataService.queryEachProperties(deviceId, q, properties.toArray(new String[0]))) .map(data -> SimpleMeasurementValue.of(data, data.getTimestamp())) .sort(MeasurementValue.sort()); } Map createValue(String property, Object value) { - return metadata.getProperty(property) + return metadata + .getProperty(property) .map(meta -> { Map values = new HashMap<>(); DataType type = meta.getValueType(); @@ -74,7 +77,7 @@ class DevicePropertiesMeasurement extends StaticMeasurement { }); } - Flux fromRealTime(String deviceId) { + Flux fromRealTime(String deviceId, Set properties) { Subscription subscription = Subscription.of( "realtime-device-properties-measurement", @@ -88,7 +91,9 @@ class DevicePropertiesMeasurement extends StaticMeasurement { Map index = new HashMap<>(); int idx = 0; for (PropertyMetadata prop : props) { - index.put(prop.getId(), idx++); + if (properties.isEmpty() || properties.contains(prop.getId())) { + index.put(prop.getId(), idx++); + } } return eventBus @@ -106,6 +111,16 @@ class DevicePropertiesMeasurement extends StaticMeasurement { static ConfigMetadata configMetadata = new DefaultConfigMetadata() .add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector")); + static Set getPropertiesFromParameter(MeasurementParameter parameter) { + return parameter + .get("properties") + .map(CastUtils::castArray) + .orElse(Collections.emptyList()) + .stream() + .map(String::valueOf) + .collect(Collectors.toSet()); + } + /** * 历史 */ @@ -136,11 +151,12 @@ class DevicePropertiesMeasurement extends StaticMeasurement { @Override public Flux getValue(MeasurementParameter parameter) { - return Mono.justOrEmpty(parameter.getString("deviceId")) + return Mono + .justOrEmpty(parameter.getString("deviceId")) .flatMapMany(deviceId -> { int history = parameter.getInt("history").orElse(1); - //合并历史数据和实时数据 - return fromHistory(deviceId, history); + + return fromHistory(deviceId, history, getPropertiesFromParameter(parameter)); }); } } @@ -175,16 +191,17 @@ class DevicePropertiesMeasurement extends StaticMeasurement { @Override public Flux getValue(MeasurementParameter parameter) { - return Mono.justOrEmpty(parameter.getString("deviceId")) + return Mono + .justOrEmpty(parameter.getString("deviceId")) .flatMapMany(deviceId -> { int history = parameter.getInt("history").orElse(0); //合并历史数据和实时数据 return Flux.concat( //查询历史数据 - fromHistory(deviceId, history) + fromHistory(deviceId, history, getPropertiesFromParameter(parameter)) , //从消息网关订阅实时事件消息 - fromRealTime(deviceId) + fromRealTime(deviceId, getPropertiesFromParameter(parameter)) ); }); } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java index ebaa8d0d..ca87cce3 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java @@ -45,6 +45,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; +import java.util.stream.Collectors; import static org.jetlinks.community.device.service.data.StorageConstants.propertyIsIgnoreStorage; import static org.jetlinks.community.device.service.data.StorageConstants.propertyIsJsonStringStorage; @@ -511,6 +512,24 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora .flatMap(product -> Mono.zip(Mono.just(product), product.getMetadata())); } + protected List getPropertyMetadata(DeviceMetadata metadata, String... properties) { + if (properties == null || properties.length == 0) { + return metadata.getProperties(); + } + if (properties.length == 1) { + return metadata.getProperty(properties[0]) + .map(Arrays::asList) + .orElseGet(Collections::emptyList); + } + Set ids = new HashSet<>(Arrays.asList(properties)); + return metadata + .getProperties() + .stream() + .filter(prop -> ids.isEmpty() || ids.contains(prop.getId())) + .collect(Collectors.toList()); + } + + private final AtomicInteger nanoInc = new AtomicInteger(); diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DefaultDeviceDataService.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DefaultDeviceDataService.java index a0ead1e8..d8fb1c53 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DefaultDeviceDataService.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DefaultDeviceDataService.java @@ -90,10 +90,11 @@ public class DefaultDeviceDataService implements DeviceDataService { @Nonnull @Override public Flux queryEachProperties(@Nonnull String deviceId, - @Nonnull QueryParamEntity query) { + @Nonnull QueryParamEntity query, + @Nonnull String... properties) { return this .getDeviceStrategy(deviceId) - .flatMapMany(strategy -> strategy.queryEachProperties(deviceId, query)); + .flatMapMany(strategy -> strategy.queryEachProperties(deviceId, query,properties)); } @Nonnull diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DeviceDataService.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DeviceDataService.java index 3812c454..d8e70831 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DeviceDataService.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DeviceDataService.java @@ -98,7 +98,8 @@ public interface DeviceDataService { */ @Nonnull Flux queryEachProperties(@Nonnull String deviceId, - @Nonnull QueryParamEntity query); + @Nonnull QueryParamEntity query, + @Nonnull String... properties); /** * 查询指定的设备属性列表 diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DeviceDataStoragePolicy.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DeviceDataStoragePolicy.java index d652fb11..ac00b135 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DeviceDataStoragePolicy.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/DeviceDataStoragePolicy.java @@ -127,7 +127,8 @@ public interface DeviceDataStoragePolicy { */ @Nonnull Flux queryEachProperties(@Nonnull String deviceId, - @Nonnull QueryParamEntity query); + @Nonnull QueryParamEntity query, + @Nonnull String... property); /** * 查询指定的设备属性列表 diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/NoneDeviceDataStoragePolicy.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/NoneDeviceDataStoragePolicy.java index bf4ff3e5..27fb5e40 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/NoneDeviceDataStoragePolicy.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/NoneDeviceDataStoragePolicy.java @@ -84,7 +84,8 @@ public class NoneDeviceDataStoragePolicy implements DeviceDataStoragePolicy { @Nonnull @Override public Flux queryEachProperties(@Nonnull String deviceId, - @Nonnull QueryParamEntity query) { + @Nonnull QueryParamEntity query, + @Nonnull String... property) { return Flux.empty(); } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesColumnDeviceDataStoragePolicy.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesColumnDeviceDataStoragePolicy.java index 005782c1..ee3a8e64 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesColumnDeviceDataStoragePolicy.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesColumnDeviceDataStoragePolicy.java @@ -174,23 +174,19 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat @Nonnull @Override public Flux queryEachProperties(@Nonnull String deviceId, - @Nonnull QueryParamEntity query) { + @Nonnull QueryParamEntity query, + @Nonnull String... property) { - return deviceRegistry - .getDevice(deviceId) - .flatMapMany(device -> Mono - .zip(device.getProduct(), device.getMetadata()) - .flatMapMany(tp2 -> { + return this + .getProductAndMetadataByDevice(deviceId) + .flatMapMany(tp2 -> { - Map propertiesMap = tp2 - .getT2() - .getProperties() - .stream() - .collect(Collectors.toMap(PropertyMetadata::getId, Function - .identity(), (a, b) -> a)); + Map propertiesMap = getPropertyMetadata(tp2.getT2(), property) + .stream() + .collect(Collectors.toMap(PropertyMetadata::getId, Function.identity(), (a, b) -> a)); - return queryEachDeviceProperty(tp2.getT1().getId(), deviceId, propertiesMap, query); - })); + return queryEachDeviceProperty(tp2.getT1().getId(), deviceId, propertiesMap, query); + }); } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesRowDeviceDataStoreStoragePolicy.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesRowDeviceDataStoreStoragePolicy.java index 650e5618..4207bbc6 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesRowDeviceDataStoreStoragePolicy.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesRowDeviceDataStoreStoragePolicy.java @@ -169,33 +169,30 @@ public class TimeSeriesRowDeviceDataStoreStoragePolicy extends TimeSeriesDeviceD @Nonnull @Override public Flux queryEachProperties(@Nonnull String deviceId, - @Nonnull QueryParamEntity query) { + @Nonnull QueryParamEntity query, + @Nonnull String... property) { - return deviceRegistry - .getDevice(deviceId) - .flatMapMany(device -> Mono - .zip(device.getProduct(), device.getMetadata()) - .flatMapMany(tp2 -> { + return getProductAndMetadataByDevice(deviceId) + .flatMapMany(tp2 -> { - Map propertiesMap = tp2.getT2() - .getProperties() - .stream() - .collect(Collectors.toMap(PropertyMetadata::getId, Function.identity(), (a, b) -> a)); - if (propertiesMap.isEmpty()) { - return Flux.empty(); - } - return timeSeriesManager - .getService(devicePropertyMetric(tp2.getT1().getId())) - .aggregation(AggregationQueryParam - .of() - .agg(new LimitAggregationColumn("property", "property", Aggregation.TOP, query.getPageSize())) - .groupBy(new LimitGroup("property", "property", propertiesMap.size() * 2)) //按property分组 - .filter(query) - .filter(q -> q.where("deviceId", deviceId)) - ).map(data -> DeviceProperty - .of(data, data.getString("property").map(propertiesMap::get).orElse(null)) - .deviceId(deviceId)); - })); + Map propertiesMap = getPropertyMetadata(tp2.getT2(), property) + .stream() + .collect(Collectors.toMap(PropertyMetadata::getId, Function.identity(), (a, b) -> a)); + if (propertiesMap.isEmpty()) { + return Flux.empty(); + } + return timeSeriesManager + .getService(devicePropertyMetric(tp2.getT1().getId())) + .aggregation(AggregationQueryParam + .of() + .agg(new LimitAggregationColumn("property", "property", Aggregation.TOP, query.getPageSize())) + .groupBy(new LimitGroup("property", "property", propertiesMap.size() * 2)) //按property分组 + .filter(query) + .filter(q -> q.where("deviceId", deviceId).in("property", propertiesMap.keySet())) + ).map(data -> DeviceProperty + .of(data, data.getString("property").map(propertiesMap::get).orElse(null)) + .deviceId(deviceId)); + }); } protected String getTimeSeriesMetric(String productId) {