优化聚合查询

This commit is contained in:
zhou-hao 2021-12-08 10:54:26 +08:00
parent 7a13499887
commit ec414a3cc4
3 changed files with 106 additions and 33 deletions

View File

@ -3,6 +3,7 @@ package org.jetlinks.community.timeseries.query;
import org.jetlinks.community.ValueObject;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@ -20,6 +21,12 @@ public interface AggregationData extends ValueObject {
return asMap();
}
default AggregationData merge(AggregationData another) {
Map<String, Object> newVal = new HashMap<>(asMap());
newVal.putAll(another.asMap());
return of(newVal);
}
static AggregationData of(Map<String, Object> map) {
return () -> map;
}

View File

@ -6,10 +6,7 @@ import org.jetlinks.community.device.entity.DeviceProperty;
import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetadata;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.community.timeseries.query.AggregationData;
import org.jetlinks.community.timeseries.query.AggregationQueryParam;
import org.jetlinks.community.timeseries.query.Group;
import org.jetlinks.community.timeseries.query.TimeGroup;
import org.jetlinks.community.timeseries.query.*;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.message.DeviceMessage;
@ -220,9 +217,28 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat
.to(request.to)
.filter(request.filter)
.execute(timeSeriesManager.getService(getPropertyTimeSeriesMetric(productId))::aggregation)
.groupBy(agg -> agg.getString("time", ""))
.groupBy(agg -> agg.getString("time", ""), Integer.MAX_VALUE)
.flatMap(group -> group
.map(AggregationData::asMap)
.map(data -> {
Map<String, Object> newMap = new HashMap<>();
newMap.put("time", data.get("time").orElse(null));
for (DeviceDataService.DevicePropertyAggregation property : properties) {
Object val;
if(property.getAgg() == Aggregation.FIRST || property.getAgg()==Aggregation.TOP){
val = data
.get(property.getProperty())
.orElse(null);
}else {
val = data
.get(property.getAlias())
.orElse(null);
}
if (null != val) {
newMap.put(property.getAlias(), val);
}
}
return newMap;
})
.reduce((a, b) -> {
a.putAll(b);
return a;
@ -231,6 +247,7 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat
.sort(Comparator.<AggregationData, Date>comparing(agg -> DateTime
.parse(agg.getString("time", ""), formatter)
.toDate()).reversed())
.take(request.getLimit())
;
}

View File

@ -223,47 +223,92 @@ public class TimeSeriesRowDeviceDataStoreStoragePolicy extends TimeSeriesDeviceD
}
Map<String, String> propertyAlias = Arrays.stream(properties)
.collect(Collectors.toMap(DeviceDataService.DevicePropertyAggregation::getAlias, DeviceDataService.DevicePropertyAggregation::getProperty));
.collect(Collectors.toMap(DeviceDataService.DevicePropertyAggregation::getAlias,
DeviceDataService.DevicePropertyAggregation::getProperty));
return AggregationQueryParam.of()
Map<String, DeviceDataService.DevicePropertyAggregation> aliasProperty = Arrays
.stream(properties)
.collect(Collectors.toMap(DeviceDataService.DevicePropertyAggregation::getAlias,
Function.identity()));
return AggregationQueryParam
.of()
.as(param -> {
Arrays.stream(properties)
.forEach(agg -> param.agg("numberValue", "value_" + agg.getAlias(), agg.getAgg()));
.forEach(agg -> param.agg("numberValue", "value_" + agg.getAlias(), agg.getAgg()));
return param;
})
.groupBy((Group) new TimeGroup(request.interval, "time", request.format))
.as(param -> {
if (request.interval == null) {
return param;
}
return param.groupBy((Group) new TimeGroup(request.interval, "time", request.format));
})
.groupBy(new LimitGroup("property", "property", properties.length))
.limit(request.limit * properties.length)
.from(request.from)
.to(request.to)
.filter(request.filter)
.filter(query -> query.where().in("property", propertyAlias.values()))
.filter(query -> query
.where()
.in("property", new HashSet<>(propertyAlias.values())))
//执行查询
.execute(timeSeriesManager.getService(getTimeSeriesMetric(productId))::aggregation)
//按时间分组,然后将返回的结果合并起来
.groupBy(agg -> agg.getString("time", ""), Integer.MAX_VALUE)
.flatMap(group ->
{
String time = group.key();
return group
//按属性分组
.groupBy(agg -> agg.getString("property", ""), Integer.MAX_VALUE)
.flatMap(propsGroup -> {
String property = propsGroup.key();
return propsGroup
.<Map<String, Object>>reduceWith(HashMap::new, (a, b) -> {
a.putIfAbsent("time", time);
a.putIfAbsent("_time", b.get("_time").orElseGet(Date::new));
b.get("value_" + property).ifPresent(v -> a.put(property, v));
return a;
});
})
.<Map<String, Object>>reduceWith(HashMap::new, (a, b) -> {
a.putAll(b);
return a;
});
.as(flux -> {
//按时间分组
if (request.getInterval() != null) {
return flux
.flatMap(group -> {
String time = group.key();
return group
//按属性分组
.groupBy(agg -> agg.getString("property", ""), Integer.MAX_VALUE)
.flatMap(propsGroup -> {
String property = String.valueOf(propsGroup.key());
return propsGroup
.reduce(AggregationData::merge)
.map(agg -> {
Map<String, Object> data = new HashMap<>();
data.put("_time", agg.get("_time").orElse(time));
data.put("time", time);
aliasProperty.forEach((alias, prp) -> {
if (prp.getAgg() == Aggregation.FIRST || prp.getAgg() == Aggregation.TOP) {
data.putIfAbsent(alias, agg
.get("numberValue")
.orElse(agg.get("value").orElse(null)));
} else if (property.equals(prp.getProperty())) {
data.putIfAbsent(alias, agg
.get("value_" + alias)
.orElse(0));
}
});
return data;
});
})
.<Map<String, Object>>reduceWith(HashMap::new, (a, b) -> {
a.putAll(b);
return a;
});
}
);
} else {
return flux
.flatMap(group -> group
.reduce(AggregationData::merge)
.map(agg -> {
Map<String, Object> values = new HashMap<>();
//values.put("time", group.key());
for (Map.Entry<String, String> props : propertyAlias.entrySet()) {
values.put(props.getKey(), agg
.get("value_" + props.getKey())
.orElse(0));
}
return values;
}));
}
)
})
.map(map -> {
map.remove("");
propertyAlias
@ -271,8 +316,12 @@ public class TimeSeriesRowDeviceDataStoreStoragePolicy extends TimeSeriesDeviceD
.forEach(key -> map.putIfAbsent(key, 0));
return AggregationData.of(map);
})
.sort(Comparator.<AggregationData, Date>comparing(agg -> CastUtils.castDate(agg.values().get("_time"))).reversed())
.sort(Comparator.<AggregationData, Date>comparing(agg -> CastUtils.castDate(agg
.values()
.get("_time")))
.reversed())
.doOnNext(agg -> agg.values().remove("_time"))
.take(request.getLimit())
;
}