diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java index f213ef25..8fb531b6 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java @@ -19,17 +19,16 @@ import reactor.core.publisher.Mono; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; -import java.util.stream.Stream; class DevicePropertiesMeasurement extends StaticMeasurement { - private MessageGateway messageGateway; + private final MessageGateway messageGateway; - private TimeSeriesService timeSeriesService; + private final TimeSeriesService timeSeriesService; - private DeviceMetadata metadata; - private String productId; + private final DeviceMetadata metadata; + + private final String productId; public DevicePropertiesMeasurement(String productId, MessageGateway messageGateway, @@ -42,6 +41,7 @@ class DevicePropertiesMeasurement extends StaticMeasurement { this.metadata = deviceMetadata; addDimension(new RealTimeDevicePropertyDimension()); addDimension(new HistoryDevicePropertyDimension()); + } static AtomicLong num = new AtomicLong(); @@ -103,7 +103,7 @@ class DevicePropertiesMeasurement extends StaticMeasurement { } static ConfigMetadata configMetadata = new DefaultConfigMetadata() - .add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector")); + .add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector")); /** * 历史设备事件 @@ -117,25 +117,10 @@ class DevicePropertiesMeasurement extends StaticMeasurement { @Override public DataType getValueType() { - SimplePropertyMetadata property = new SimplePropertyMetadata(); - property.setId("property"); - property.setName("属性"); - property.setValueType(new StringType()); - - SimplePropertyMetadata value = new SimplePropertyMetadata(); - value.setId("value"); - value.setName("值"); - value.setValueType(new StringType()); - - SimplePropertyMetadata formatValue = new SimplePropertyMetadata(); - value.setId("formatValue"); - value.setName("格式化值"); - value.setValueType(new StringType()); - return new ObjectType() - .addPropertyMetadata(property) - .addPropertyMetadata(value) - .addPropertyMetadata(formatValue); + .addProperty("property","属性", StringType.GLOBAL) + .addProperty("value","值", StringType.GLOBAL) + .addProperty("formatValue","格式化值", StringType.GLOBAL); } @Override @@ -171,25 +156,10 @@ class DevicePropertiesMeasurement extends StaticMeasurement { @Override public DataType getValueType() { - SimplePropertyMetadata property = new SimplePropertyMetadata(); - property.setId("property"); - property.setName("属性"); - property.setValueType(new StringType()); - - SimplePropertyMetadata value = new SimplePropertyMetadata(); - value.setId("value"); - value.setName("值"); - value.setValueType(new StringType()); - - SimplePropertyMetadata formatValue = new SimplePropertyMetadata(); - value.setId("formatValue"); - value.setName("格式化值"); - value.setValueType(new StringType()); - return new ObjectType() - .addPropertyMetadata(property) - .addPropertyMetadata(value) - .addPropertyMetadata(formatValue); + .addProperty("property","属性", StringType.GLOBAL) + .addProperty("value","值", StringType.GLOBAL) + .addProperty("formatValue","格式化值", StringType.GLOBAL); } @Override diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertyMeasurement.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertyMeasurement.java index 963cd8da..5f8ce748 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertyMeasurement.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertyMeasurement.java @@ -5,18 +5,25 @@ import org.jetlinks.core.message.property.ReadPropertyMessageReply; import org.jetlinks.core.message.property.ReportPropertyMessage; import org.jetlinks.core.message.property.WritePropertyMessageReply; import org.jetlinks.core.metadata.*; +import org.jetlinks.core.metadata.types.IntType; +import org.jetlinks.core.metadata.types.NumberType; +import org.jetlinks.core.metadata.types.ObjectType; +import org.jetlinks.core.metadata.types.StringType; +import org.jetlinks.community.Interval; import org.jetlinks.community.dashboard.*; import org.jetlinks.community.dashboard.supports.StaticMeasurement; import org.jetlinks.community.device.message.DeviceMessageUtils; import org.jetlinks.community.gateway.MessageGateway; import org.jetlinks.community.gateway.Subscription; import org.jetlinks.community.timeseries.TimeSeriesService; -import org.jetlinks.core.metadata.types.IntType; -import org.jetlinks.core.metadata.types.ObjectType; -import org.jetlinks.core.metadata.types.StringType; +import org.jetlinks.community.timeseries.query.Aggregation; +import org.jetlinks.community.timeseries.query.AggregationQueryParam; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.stream.Collectors; @@ -24,24 +31,28 @@ import java.util.stream.Stream; class DevicePropertyMeasurement extends StaticMeasurement { - private PropertyMetadata metadata; + private final PropertyMetadata metadata; - private MessageGateway messageGateway; + private final MessageGateway messageGateway; - private TimeSeriesService timeSeriesService; + private final TimeSeriesService timeSeriesService; - private String productId; + private final String productId; public DevicePropertyMeasurement(String productId, MessageGateway messageGateway, PropertyMetadata metadata, TimeSeriesService timeSeriesService) { super(MetadataMeasurementDefinition.of(metadata)); - this.productId=productId; + this.productId = productId; this.messageGateway = messageGateway; this.metadata = metadata; this.timeSeriesService = timeSeriesService; addDimension(new RealTimeDevicePropertyDimension()); + addDimension(new HistoryDevicePropertyDimension()); + if (metadata.getValueType() instanceof NumberType) { + addDimension(new AggDevicePropertyDimension()); + } } @@ -67,8 +78,8 @@ class DevicePropertyMeasurement extends StaticMeasurement { Flux fromRealTime(String deviceId) { return messageGateway .subscribe(Stream.of( - "/device/"+productId+"/" + deviceId + "/message/property/report" - , "/device/"+productId+"/" + deviceId + "/message/property/*/reply") + "/device/" + productId + "/" + deviceId + "/message/property/report" + , "/device/" + productId + "/" + deviceId + "/message/property/*/reply") .map(Subscription::new) .collect(Collectors.toList()), true) .flatMap(val -> Mono.justOrEmpty(DeviceMessageUtils.convert(val))) @@ -92,6 +103,107 @@ class DevicePropertyMeasurement extends StaticMeasurement { .add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector")) .add("history", "历史数据量", "查询出历史数据后开始推送实时数据", new IntType().min(0).expand("defaultValue", 10)); + static ConfigMetadata aggConfigMetadata = new DefaultConfigMetadata() + .add("deviceId", "设备ID", "", StringType.GLOBAL) + .add("time", "周期", "例如: 1h,10m,30s", StringType.GLOBAL) + .add("agg", "聚合类型", "count,sum,avg,max,min", StringType.GLOBAL) + .add("format", "时间格式", "如: MM-dd:HH", StringType.GLOBAL) + .add("limit", "最大数据量", "", StringType.GLOBAL) + .add("from", "时间从", "", StringType.GLOBAL) + .add("to", "时间至", "", StringType.GLOBAL); + + /** + * 聚合数据 + */ + private class AggDevicePropertyDimension implements MeasurementDimension { + + @Override + public DimensionDefinition getDefinition() { + return CommonDimensionDefinition.agg; + } + + @Override + public DataType getValueType() { + return new ObjectType() + .addProperty("value", "数据", new ObjectType() + .addProperty("property", StringType.GLOBAL) + .addProperty("value", metadata.getValueType()) + .addProperty("formatValue", StringType.GLOBAL)) + .addProperty("timeString", "时间", StringType.GLOBAL); + } + + @Override + public ConfigMetadata getParams() { + return aggConfigMetadata; + } + + @Override + public boolean isRealTime() { + return false; + } + + @Override + public Flux getValue(MeasurementParameter parameter) { + + return AggregationQueryParam.of() + .agg("numberValue", "value", parameter.getString("agg").map(String::toUpperCase).map(Aggregation::valueOf).orElse(Aggregation.AVG)) + .filter(query -> query + .where("property", metadata.getId()) + .and("deviceId", parameter.getString("deviceId").orElse(null)) + ) + .limit(parameter.getInt("limit", 10)) + .groupBy(parameter.getInterval("time", Interval.ofSeconds(10)), parameter.getString("format", "HH:mm:ss")) + .from(parameter.getDate("from").orElseGet(() -> Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant()))) + .to(parameter.getDate("to").orElse(new Date())) + .execute(timeSeriesService::aggregation) + .index((index, data) -> SimpleMeasurementValue.of( + createValue(data.getInt("value").orElse(0)), + data.getString("time").orElse(""), + index)) + .sort(); + + } + } + + /** + * 历史设备数据 + */ + private class HistoryDevicePropertyDimension implements MeasurementDimension { + + @Override + public DimensionDefinition getDefinition() { + return CommonDimensionDefinition.history; + } + + @Override + public DataType getValueType() { + return new ObjectType() + .addProperty("property", "属性", StringType.GLOBAL) + .addProperty("value", "值", metadata.getValueType()) + .addProperty("formatValue", "格式化值", StringType.GLOBAL); + } + + @Override + public ConfigMetadata getParams() { + return configMetadata; + } + + @Override + public boolean isRealTime() { + return false; + } + + @Override + public Flux getValue(MeasurementParameter parameter) { + return Mono.justOrEmpty(parameter.getString("deviceId")) + .flatMapMany(deviceId -> { + int history = parameter.getInt("history").orElse(1); + //合并历史数据和实时数据 + return fromHistory(deviceId, history); + }); + } + } + /** * 实时设备事件 */ @@ -104,20 +216,10 @@ class DevicePropertyMeasurement extends StaticMeasurement { @Override public DataType getValueType() { - - SimplePropertyMetadata value = new SimplePropertyMetadata(); - value.setId("value"); - value.setName("值"); - value.setValueType(metadata.getValueType()); - - SimplePropertyMetadata formatValue = new SimplePropertyMetadata(); - value.setId("formatValue"); - value.setName("格式化值"); - value.setValueType(new StringType()); - return new ObjectType() - .addPropertyMetadata(value) - .addPropertyMetadata(formatValue); + .addProperty("property", "属性", StringType.GLOBAL) + .addProperty("value", "值", metadata.getValueType()) + .addProperty("formatValue", "格式化值", StringType.GLOBAL); } @Override @@ -146,4 +248,6 @@ class DevicePropertyMeasurement extends StaticMeasurement { }); } } + + } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurement.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurement.java index 5774a61f..02563840 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurement.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurement.java @@ -1,16 +1,17 @@ package org.jetlinks.community.device.measurements.message; -import org.jetlinks.community.Interval; import org.jetlinks.core.metadata.ConfigMetadata; import org.jetlinks.core.metadata.DataType; import org.jetlinks.core.metadata.DefaultConfigMetadata; import org.jetlinks.core.metadata.types.DateTimeType; import org.jetlinks.core.metadata.types.IntType; import org.jetlinks.core.metadata.types.StringType; +import org.jetlinks.community.Interval; import org.jetlinks.community.dashboard.*; import org.jetlinks.community.dashboard.supports.StaticMeasurement; import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric; import org.jetlinks.community.gateway.MessageGateway; +import org.jetlinks.community.gateway.Subscription; import org.jetlinks.community.timeseries.TimeSeriesManager; import org.jetlinks.community.timeseries.query.AggregationQueryParam; import reactor.core.publisher.Flux; @@ -18,13 +19,14 @@ import reactor.core.publisher.Flux; import java.time.Duration; import java.time.LocalDateTime; import java.time.ZoneId; +import java.util.Collections; import java.util.Date; class DeviceMessageMeasurement extends StaticMeasurement { - private MessageGateway messageGateway; + private final MessageGateway messageGateway; - private TimeSeriesManager timeSeriesManager; + private final TimeSeriesManager timeSeriesManager; static MeasurementDefinition definition = MeasurementDefinition.of("quantity", "设备消息量"); @@ -37,8 +39,6 @@ class DeviceMessageMeasurement extends StaticMeasurement { } - static DataType valueType = new IntType(); - static ConfigMetadata realTimeConfigMetadata = new DefaultConfigMetadata() .add("interval", "数据统计周期", "例如: 1s,10s", new StringType()); @@ -51,7 +51,7 @@ class DeviceMessageMeasurement extends StaticMeasurement { @Override public DataType getValueType() { - return valueType; + return IntType.GLOBAL; } @Override @@ -68,7 +68,7 @@ class DeviceMessageMeasurement extends StaticMeasurement { public Flux getValue(MeasurementParameter parameter) { //通过订阅消息来统计实时数据量 return messageGateway - .subscribe("/device/**") + .subscribe(Collections.singleton(new Subscription("/device/**")),true) .window(parameter.getDuration("interval").orElse(Duration.ofSeconds(1))) .flatMap(Flux::count) .map(total -> SimpleMeasurementValue.of(total, System.currentTimeMillis())); @@ -95,7 +95,7 @@ class DeviceMessageMeasurement extends StaticMeasurement { @Override public DataType getValueType() { - return valueType; + return IntType.GLOBAL; } @Override @@ -113,16 +113,17 @@ class DeviceMessageMeasurement extends StaticMeasurement { return AggregationQueryParam.of() .sum("count") - .groupBy(parameter.getInterval("time", Interval.ofHours(1)), - parameter.getString("format", "MM月dd日 HH时")) + .groupBy( + parameter.getInterval("time").orElse(Interval.ofHours(1)), + parameter.getString("format").orElse("MM月dd日 HH时")) .filter(query -> query.where("name", "message-count") - .is("productId", parameter.getString("productId", null)) - .is("msgType", parameter.getString("msgType", null)) + .is("productId", parameter.getString("productId").orElse(null)) + .is("msgType", parameter.getString("msgType").orElse(null)) ) - .limit(parameter.getInt("limit", 1)) + .limit(parameter.getInt("limit").orElse(1)) .from(parameter.getDate("from").orElseGet(() -> Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant()))) - .to(parameter.getDate("to").orElseGet(Date::new)) + .to(parameter.getDate("to").orElse(new Date())) .execute(timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceMetrics())::aggregation) .index((index, data) -> SimpleMeasurementValue.of( data.getInt("count").orElse(0), @@ -132,5 +133,4 @@ class DeviceMessageMeasurement extends StaticMeasurement { } } - }