diff --git a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/AggregationData.java b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/AggregationData.java index 75d0baa8..70c40a5a 100644 --- a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/AggregationData.java +++ b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/AggregationData.java @@ -3,6 +3,7 @@ package org.jetlinks.community.timeseries.query; import org.jetlinks.community.ValueObject; +import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -20,6 +21,12 @@ public interface AggregationData extends ValueObject { return asMap(); } + default AggregationData merge(AggregationData another) { + Map newVal = new HashMap<>(asMap()); + newVal.putAll(another.asMap()); + return of(newVal); + } + static AggregationData of(Map map) { return () -> map; } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesColumnDeviceDataStoragePolicy.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesColumnDeviceDataStoragePolicy.java index 39ecaf08..ec807f9b 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesColumnDeviceDataStoragePolicy.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesColumnDeviceDataStoragePolicy.java @@ -6,10 +6,7 @@ import org.jetlinks.community.device.entity.DeviceProperty; import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetadata; import org.jetlinks.community.timeseries.TimeSeriesData; import org.jetlinks.community.timeseries.TimeSeriesManager; -import org.jetlinks.community.timeseries.query.AggregationData; -import org.jetlinks.community.timeseries.query.AggregationQueryParam; -import org.jetlinks.community.timeseries.query.Group; -import org.jetlinks.community.timeseries.query.TimeGroup; +import org.jetlinks.community.timeseries.query.*; import org.jetlinks.core.device.DeviceOperator; import org.jetlinks.core.device.DeviceRegistry; import org.jetlinks.core.message.DeviceMessage; @@ -220,9 +217,28 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat .to(request.to) .filter(request.filter) .execute(timeSeriesManager.getService(getPropertyTimeSeriesMetric(productId))::aggregation) - .groupBy(agg -> agg.getString("time", "")) + .groupBy(agg -> agg.getString("time", ""), Integer.MAX_VALUE) .flatMap(group -> group - .map(AggregationData::asMap) + .map(data -> { + Map newMap = new HashMap<>(); + newMap.put("time", data.get("time").orElse(null)); + for (DeviceDataService.DevicePropertyAggregation property : properties) { + Object val; + if(property.getAgg() == Aggregation.FIRST || property.getAgg()==Aggregation.TOP){ + val = data + .get(property.getProperty()) + .orElse(null); + }else { + val = data + .get(property.getAlias()) + .orElse(null); + } + if (null != val) { + newMap.put(property.getAlias(), val); + } + } + return newMap; + }) .reduce((a, b) -> { a.putAll(b); return a; @@ -231,6 +247,7 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat .sort(Comparator.comparing(agg -> DateTime .parse(agg.getString("time", ""), formatter) .toDate()).reversed()) + .take(request.getLimit()) ; } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesRowDeviceDataStoreStoragePolicy.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesRowDeviceDataStoreStoragePolicy.java index c9cc7a51..5c9fcc62 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesRowDeviceDataStoreStoragePolicy.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/TimeSeriesRowDeviceDataStoreStoragePolicy.java @@ -223,47 +223,92 @@ public class TimeSeriesRowDeviceDataStoreStoragePolicy extends TimeSeriesDeviceD } Map propertyAlias = Arrays.stream(properties) - .collect(Collectors.toMap(DeviceDataService.DevicePropertyAggregation::getAlias, DeviceDataService.DevicePropertyAggregation::getProperty)); + .collect(Collectors.toMap(DeviceDataService.DevicePropertyAggregation::getAlias, + DeviceDataService.DevicePropertyAggregation::getProperty)); - return AggregationQueryParam.of() + Map aliasProperty = Arrays + .stream(properties) + .collect(Collectors.toMap(DeviceDataService.DevicePropertyAggregation::getAlias, + Function.identity())); + + return AggregationQueryParam + .of() .as(param -> { Arrays.stream(properties) - .forEach(agg -> param.agg("numberValue", "value_" + agg.getAlias(), agg.getAgg())); + .forEach(agg -> param.agg("numberValue", "value_" + agg.getAlias(), agg.getAgg())); return param; }) - .groupBy((Group) new TimeGroup(request.interval, "time", request.format)) + .as(param -> { + if (request.interval == null) { + return param; + } + return param.groupBy((Group) new TimeGroup(request.interval, "time", request.format)); + }) .groupBy(new LimitGroup("property", "property", properties.length)) .limit(request.limit * properties.length) .from(request.from) .to(request.to) .filter(request.filter) - .filter(query -> query.where().in("property", propertyAlias.values())) + .filter(query -> query + .where() + .in("property", new HashSet<>(propertyAlias.values()))) //执行查询 .execute(timeSeriesManager.getService(getTimeSeriesMetric(productId))::aggregation) //按时间分组,然后将返回的结果合并起来 .groupBy(agg -> agg.getString("time", ""), Integer.MAX_VALUE) - .flatMap(group -> - { - String time = group.key(); - return group - //按属性分组 - .groupBy(agg -> agg.getString("property", ""), Integer.MAX_VALUE) - .flatMap(propsGroup -> { - String property = propsGroup.key(); - return propsGroup - .>reduceWith(HashMap::new, (a, b) -> { - a.putIfAbsent("time", time); - a.putIfAbsent("_time", b.get("_time").orElseGet(Date::new)); - b.get("value_" + property).ifPresent(v -> a.put(property, v)); - return a; - }); - }) - .>reduceWith(HashMap::new, (a, b) -> { - a.putAll(b); - return a; - }); + .as(flux -> { + //按时间分组 + if (request.getInterval() != null) { + return flux + .flatMap(group -> { + String time = group.key(); + return group + //按属性分组 + .groupBy(agg -> agg.getString("property", ""), Integer.MAX_VALUE) + .flatMap(propsGroup -> { + String property = String.valueOf(propsGroup.key()); + return propsGroup + .reduce(AggregationData::merge) + .map(agg -> { + Map data = new HashMap<>(); + data.put("_time", agg.get("_time").orElse(time)); + data.put("time", time); + aliasProperty.forEach((alias, prp) -> { + if (prp.getAgg() == Aggregation.FIRST || prp.getAgg() == Aggregation.TOP) { + data.putIfAbsent(alias, agg + .get("numberValue") + .orElse(agg.get("value").orElse(null))); + } else if (property.equals(prp.getProperty())) { + data.putIfAbsent(alias, agg + .get("value_" + alias) + .orElse(0)); + } + }); + return data; + }); + }) + .>reduceWith(HashMap::new, (a, b) -> { + a.putAll(b); + return a; + }); + } + ); + } else { + return flux + .flatMap(group -> group + .reduce(AggregationData::merge) + .map(agg -> { + Map values = new HashMap<>(); + //values.put("time", group.key()); + for (Map.Entry props : propertyAlias.entrySet()) { + values.put(props.getKey(), agg + .get("value_" + props.getKey()) + .orElse(0)); + } + return values; + })); } - ) + }) .map(map -> { map.remove(""); propertyAlias @@ -271,8 +316,12 @@ public class TimeSeriesRowDeviceDataStoreStoragePolicy extends TimeSeriesDeviceD .forEach(key -> map.putIfAbsent(key, 0)); return AggregationData.of(map); }) - .sort(Comparator.comparing(agg -> CastUtils.castDate(agg.values().get("_time"))).reversed()) + .sort(Comparator.comparing(agg -> CastUtils.castDate(agg + .values() + .get("_time"))) + .reversed()) .doOnNext(agg -> agg.values().remove("_time")) + .take(request.getLimit()) ; }