支持指定属性查询记录

This commit is contained in:
zhou-hao 2021-05-26 11:14:42 +08:00
parent d8d4668da1
commit c1363bd555
8 changed files with 93 additions and 60 deletions

View File

@ -12,13 +12,12 @@ import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.device.service.data.DeviceDataService;
import org.jetlinks.community.gateway.DeviceMessageUtils;
import org.jetlinks.reactor.ql.utils.CastUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
@Slf4j
class DevicePropertiesMeasurement extends StaticMeasurement {
@ -45,16 +44,20 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
}
Flux<SimpleMeasurementValue> fromHistory(String deviceId, int history) {
return history <= 0 ? Flux.empty() : QueryParamEntity.newQuery()
Flux<SimpleMeasurementValue> fromHistory(String deviceId, int history, Set<String> properties) {
return history <= 0
? Flux.empty()
: QueryParamEntity
.newQuery()
.doPaging(0, history)
.execute(q -> dataService.queryEachProperties(deviceId, q))
.execute(q -> dataService.queryEachProperties(deviceId, q, properties.toArray(new String[0])))
.map(data -> SimpleMeasurementValue.of(data, data.getTimestamp()))
.sort(MeasurementValue.sort());
}
Map<String, Object> createValue(String property, Object value) {
return metadata.getProperty(property)
return metadata
.getProperty(property)
.map(meta -> {
Map<String, Object> values = new HashMap<>();
DataType type = meta.getValueType();
@ -74,7 +77,7 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
});
}
Flux<MeasurementValue> fromRealTime(String deviceId) {
Flux<MeasurementValue> fromRealTime(String deviceId, Set<String> properties) {
Subscription subscription = Subscription.of(
"realtime-device-properties-measurement",
@ -88,7 +91,9 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
Map<String, Integer> index = new HashMap<>();
int idx = 0;
for (PropertyMetadata prop : props) {
index.put(prop.getId(), idx++);
if (properties.isEmpty() || properties.contains(prop.getId())) {
index.put(prop.getId(), idx++);
}
}
return
eventBus
@ -106,6 +111,16 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
static ConfigMetadata configMetadata = new DefaultConfigMetadata()
.add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector"));
static Set<String> getPropertiesFromParameter(MeasurementParameter parameter) {
return parameter
.get("properties")
.map(CastUtils::castArray)
.orElse(Collections.emptyList())
.stream()
.map(String::valueOf)
.collect(Collectors.toSet());
}
/**
* 历史
*/
@ -136,11 +151,12 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
@Override
public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
return Mono.justOrEmpty(parameter.getString("deviceId"))
return Mono
.justOrEmpty(parameter.getString("deviceId"))
.flatMapMany(deviceId -> {
int history = parameter.getInt("history").orElse(1);
//合并历史数据和实时数据
return fromHistory(deviceId, history);
return fromHistory(deviceId, history, getPropertiesFromParameter(parameter));
});
}
}
@ -175,16 +191,17 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
@Override
public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
return Mono.justOrEmpty(parameter.getString("deviceId"))
return Mono
.justOrEmpty(parameter.getString("deviceId"))
.flatMapMany(deviceId -> {
int history = parameter.getInt("history").orElse(0);
//合并历史数据和实时数据
return Flux.concat(
//查询历史数据
fromHistory(deviceId, history)
fromHistory(deviceId, history, getPropertiesFromParameter(parameter))
,
//从消息网关订阅实时事件消息
fromRealTime(deviceId)
fromRealTime(deviceId, getPropertiesFromParameter(parameter))
);
});
}

View File

@ -45,6 +45,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.jetlinks.community.device.service.data.StorageConstants.propertyIsIgnoreStorage;
import static org.jetlinks.community.device.service.data.StorageConstants.propertyIsJsonStringStorage;
@ -511,6 +512,24 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
.flatMap(product -> Mono.zip(Mono.just(product), product.getMetadata()));
}
protected List<PropertyMetadata> getPropertyMetadata(DeviceMetadata metadata, String... properties) {
if (properties == null || properties.length == 0) {
return metadata.getProperties();
}
if (properties.length == 1) {
return metadata.getProperty(properties[0])
.map(Arrays::asList)
.orElseGet(Collections::emptyList);
}
Set<String> ids = new HashSet<>(Arrays.asList(properties));
return metadata
.getProperties()
.stream()
.filter(prop -> ids.isEmpty() || ids.contains(prop.getId()))
.collect(Collectors.toList());
}
private final AtomicInteger nanoInc = new AtomicInteger();

View File

@ -90,10 +90,11 @@ public class DefaultDeviceDataService implements DeviceDataService {
@Nonnull
@Override
public Flux<DeviceProperty> queryEachProperties(@Nonnull String deviceId,
@Nonnull QueryParamEntity query) {
@Nonnull QueryParamEntity query,
@Nonnull String... properties) {
return this
.getDeviceStrategy(deviceId)
.flatMapMany(strategy -> strategy.queryEachProperties(deviceId, query));
.flatMapMany(strategy -> strategy.queryEachProperties(deviceId, query,properties));
}
@Nonnull

View File

@ -98,7 +98,8 @@ public interface DeviceDataService {
*/
@Nonnull
Flux<DeviceProperty> queryEachProperties(@Nonnull String deviceId,
@Nonnull QueryParamEntity query);
@Nonnull QueryParamEntity query,
@Nonnull String... properties);
/**
* 查询指定的设备属性列表

View File

@ -127,7 +127,8 @@ public interface DeviceDataStoragePolicy {
*/
@Nonnull
Flux<DeviceProperty> queryEachProperties(@Nonnull String deviceId,
@Nonnull QueryParamEntity query);
@Nonnull QueryParamEntity query,
@Nonnull String... property);
/**
* 查询指定的设备属性列表

View File

@ -84,7 +84,8 @@ public class NoneDeviceDataStoragePolicy implements DeviceDataStoragePolicy {
@Nonnull
@Override
public Flux<DeviceProperty> queryEachProperties(@Nonnull String deviceId,
@Nonnull QueryParamEntity query) {
@Nonnull QueryParamEntity query,
@Nonnull String... property) {
return Flux.empty();
}

View File

@ -174,23 +174,19 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat
@Nonnull
@Override
public Flux<DeviceProperty> queryEachProperties(@Nonnull String deviceId,
@Nonnull QueryParamEntity query) {
@Nonnull QueryParamEntity query,
@Nonnull String... property) {
return deviceRegistry
.getDevice(deviceId)
.flatMapMany(device -> Mono
.zip(device.getProduct(), device.getMetadata())
.flatMapMany(tp2 -> {
return this
.getProductAndMetadataByDevice(deviceId)
.flatMapMany(tp2 -> {
Map<String, PropertyMetadata> propertiesMap = tp2
.getT2()
.getProperties()
.stream()
.collect(Collectors.toMap(PropertyMetadata::getId, Function
.identity(), (a, b) -> a));
Map<String, PropertyMetadata> propertiesMap = getPropertyMetadata(tp2.getT2(), property)
.stream()
.collect(Collectors.toMap(PropertyMetadata::getId, Function.identity(), (a, b) -> a));
return queryEachDeviceProperty(tp2.getT1().getId(), deviceId, propertiesMap, query);
}));
return queryEachDeviceProperty(tp2.getT1().getId(), deviceId, propertiesMap, query);
});
}

View File

@ -169,33 +169,30 @@ public class TimeSeriesRowDeviceDataStoreStoragePolicy extends TimeSeriesDeviceD
@Nonnull
@Override
public Flux<DeviceProperty> queryEachProperties(@Nonnull String deviceId,
@Nonnull QueryParamEntity query) {
@Nonnull QueryParamEntity query,
@Nonnull String... property) {
return deviceRegistry
.getDevice(deviceId)
.flatMapMany(device -> Mono
.zip(device.getProduct(), device.getMetadata())
.flatMapMany(tp2 -> {
return getProductAndMetadataByDevice(deviceId)
.flatMapMany(tp2 -> {
Map<String, PropertyMetadata> propertiesMap = tp2.getT2()
.getProperties()
.stream()
.collect(Collectors.toMap(PropertyMetadata::getId, Function.identity(), (a, b) -> a));
if (propertiesMap.isEmpty()) {
return Flux.empty();
}
return timeSeriesManager
.getService(devicePropertyMetric(tp2.getT1().getId()))
.aggregation(AggregationQueryParam
.of()
.agg(new LimitAggregationColumn("property", "property", Aggregation.TOP, query.getPageSize()))
.groupBy(new LimitGroup("property", "property", propertiesMap.size() * 2)) //按property分组
.filter(query)
.filter(q -> q.where("deviceId", deviceId))
).map(data -> DeviceProperty
.of(data, data.getString("property").map(propertiesMap::get).orElse(null))
.deviceId(deviceId));
}));
Map<String, PropertyMetadata> propertiesMap = getPropertyMetadata(tp2.getT2(), property)
.stream()
.collect(Collectors.toMap(PropertyMetadata::getId, Function.identity(), (a, b) -> a));
if (propertiesMap.isEmpty()) {
return Flux.empty();
}
return timeSeriesManager
.getService(devicePropertyMetric(tp2.getT1().getId()))
.aggregation(AggregationQueryParam
.of()
.agg(new LimitAggregationColumn("property", "property", Aggregation.TOP, query.getPageSize()))
.groupBy(new LimitGroup("property", "property", propertiesMap.size() * 2)) //按property分组
.filter(query)
.filter(q -> q.where("deviceId", deviceId).in("property", propertiesMap.keySet()))
).map(data -> DeviceProperty
.of(data, data.getString("property").map(propertiesMap::get).orElse(null))
.deviceId(deviceId));
});
}
protected String getTimeSeriesMetric(String productId) {