优化设备dashboard

This commit is contained in:
zhou-hao 2020-04-13 11:14:41 +08:00
parent 5466cb21b5
commit f38d1d934a
3 changed files with 155 additions and 81 deletions

View File

@ -19,17 +19,16 @@ import reactor.core.publisher.Mono;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
class DevicePropertiesMeasurement extends StaticMeasurement {
private MessageGateway messageGateway;
private final MessageGateway messageGateway;
private TimeSeriesService timeSeriesService;
private final TimeSeriesService timeSeriesService;
private DeviceMetadata metadata;
private String productId;
private final DeviceMetadata metadata;
private final String productId;
public DevicePropertiesMeasurement(String productId,
MessageGateway messageGateway,
@ -42,6 +41,7 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
this.metadata = deviceMetadata;
addDimension(new RealTimeDevicePropertyDimension());
addDimension(new HistoryDevicePropertyDimension());
}
static AtomicLong num = new AtomicLong();
@ -103,7 +103,7 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
}
static ConfigMetadata configMetadata = new DefaultConfigMetadata()
.add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector"));
.add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector"));
/**
* 历史设备事件
@ -117,25 +117,10 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
@Override
public DataType getValueType() {
SimplePropertyMetadata property = new SimplePropertyMetadata();
property.setId("property");
property.setName("属性");
property.setValueType(new StringType());
SimplePropertyMetadata value = new SimplePropertyMetadata();
value.setId("value");
value.setName("");
value.setValueType(new StringType());
SimplePropertyMetadata formatValue = new SimplePropertyMetadata();
value.setId("formatValue");
value.setName("格式化值");
value.setValueType(new StringType());
return new ObjectType()
.addPropertyMetadata(property)
.addPropertyMetadata(value)
.addPropertyMetadata(formatValue);
.addProperty("property","属性", StringType.GLOBAL)
.addProperty("value","", StringType.GLOBAL)
.addProperty("formatValue","格式化值", StringType.GLOBAL);
}
@Override
@ -171,25 +156,10 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
@Override
public DataType getValueType() {
SimplePropertyMetadata property = new SimplePropertyMetadata();
property.setId("property");
property.setName("属性");
property.setValueType(new StringType());
SimplePropertyMetadata value = new SimplePropertyMetadata();
value.setId("value");
value.setName("");
value.setValueType(new StringType());
SimplePropertyMetadata formatValue = new SimplePropertyMetadata();
value.setId("formatValue");
value.setName("格式化值");
value.setValueType(new StringType());
return new ObjectType()
.addPropertyMetadata(property)
.addPropertyMetadata(value)
.addPropertyMetadata(formatValue);
.addProperty("property","属性", StringType.GLOBAL)
.addProperty("value","", StringType.GLOBAL)
.addProperty("formatValue","格式化值", StringType.GLOBAL);
}
@Override

View File

@ -5,18 +5,25 @@ import org.jetlinks.core.message.property.ReadPropertyMessageReply;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import org.jetlinks.core.message.property.WritePropertyMessageReply;
import org.jetlinks.core.metadata.*;
import org.jetlinks.core.metadata.types.IntType;
import org.jetlinks.core.metadata.types.NumberType;
import org.jetlinks.core.metadata.types.ObjectType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.community.Interval;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.device.message.DeviceMessageUtils;
import org.jetlinks.community.gateway.MessageGateway;
import org.jetlinks.community.gateway.Subscription;
import org.jetlinks.community.timeseries.TimeSeriesService;
import org.jetlinks.core.metadata.types.IntType;
import org.jetlinks.core.metadata.types.ObjectType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.community.timeseries.query.Aggregation;
import org.jetlinks.community.timeseries.query.AggregationQueryParam;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
@ -24,24 +31,28 @@ import java.util.stream.Stream;
class DevicePropertyMeasurement extends StaticMeasurement {
private PropertyMetadata metadata;
private final PropertyMetadata metadata;
private MessageGateway messageGateway;
private final MessageGateway messageGateway;
private TimeSeriesService timeSeriesService;
private final TimeSeriesService timeSeriesService;
private String productId;
private final String productId;
public DevicePropertyMeasurement(String productId,
MessageGateway messageGateway,
PropertyMetadata metadata,
TimeSeriesService timeSeriesService) {
super(MetadataMeasurementDefinition.of(metadata));
this.productId=productId;
this.productId = productId;
this.messageGateway = messageGateway;
this.metadata = metadata;
this.timeSeriesService = timeSeriesService;
addDimension(new RealTimeDevicePropertyDimension());
addDimension(new HistoryDevicePropertyDimension());
if (metadata.getValueType() instanceof NumberType) {
addDimension(new AggDevicePropertyDimension());
}
}
@ -67,8 +78,8 @@ class DevicePropertyMeasurement extends StaticMeasurement {
Flux<MeasurementValue> fromRealTime(String deviceId) {
return messageGateway
.subscribe(Stream.of(
"/device/"+productId+"/" + deviceId + "/message/property/report"
, "/device/"+productId+"/" + deviceId + "/message/property/*/reply")
"/device/" + productId + "/" + deviceId + "/message/property/report"
, "/device/" + productId + "/" + deviceId + "/message/property/*/reply")
.map(Subscription::new)
.collect(Collectors.toList()), true)
.flatMap(val -> Mono.justOrEmpty(DeviceMessageUtils.convert(val)))
@ -92,6 +103,107 @@ class DevicePropertyMeasurement extends StaticMeasurement {
.add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector"))
.add("history", "历史数据量", "查询出历史数据后开始推送实时数据", new IntType().min(0).expand("defaultValue", 10));
static ConfigMetadata aggConfigMetadata = new DefaultConfigMetadata()
.add("deviceId", "设备ID", "", StringType.GLOBAL)
.add("time", "周期", "例如: 1h,10m,30s", StringType.GLOBAL)
.add("agg", "聚合类型", "count,sum,avg,max,min", StringType.GLOBAL)
.add("format", "时间格式", "如: MM-dd:HH", StringType.GLOBAL)
.add("limit", "最大数据量", "", StringType.GLOBAL)
.add("from", "时间从", "", StringType.GLOBAL)
.add("to", "时间至", "", StringType.GLOBAL);
/**
* 聚合数据
*/
private class AggDevicePropertyDimension implements MeasurementDimension {
@Override
public DimensionDefinition getDefinition() {
return CommonDimensionDefinition.agg;
}
@Override
public DataType getValueType() {
return new ObjectType()
.addProperty("value", "数据", new ObjectType()
.addProperty("property", StringType.GLOBAL)
.addProperty("value", metadata.getValueType())
.addProperty("formatValue", StringType.GLOBAL))
.addProperty("timeString", "时间", StringType.GLOBAL);
}
@Override
public ConfigMetadata getParams() {
return aggConfigMetadata;
}
@Override
public boolean isRealTime() {
return false;
}
@Override
public Flux<SimpleMeasurementValue> getValue(MeasurementParameter parameter) {
return AggregationQueryParam.of()
.agg("numberValue", "value", parameter.getString("agg").map(String::toUpperCase).map(Aggregation::valueOf).orElse(Aggregation.AVG))
.filter(query -> query
.where("property", metadata.getId())
.and("deviceId", parameter.getString("deviceId").orElse(null))
)
.limit(parameter.getInt("limit", 10))
.groupBy(parameter.getInterval("time", Interval.ofSeconds(10)), parameter.getString("format", "HH:mm:ss"))
.from(parameter.getDate("from").orElseGet(() -> Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant())))
.to(parameter.getDate("to").orElse(new Date()))
.execute(timeSeriesService::aggregation)
.index((index, data) -> SimpleMeasurementValue.of(
createValue(data.getInt("value").orElse(0)),
data.getString("time").orElse(""),
index))
.sort();
}
}
/**
* 历史设备数据
*/
private class HistoryDevicePropertyDimension implements MeasurementDimension {
@Override
public DimensionDefinition getDefinition() {
return CommonDimensionDefinition.history;
}
@Override
public DataType getValueType() {
return new ObjectType()
.addProperty("property", "属性", StringType.GLOBAL)
.addProperty("value", "", metadata.getValueType())
.addProperty("formatValue", "格式化值", StringType.GLOBAL);
}
@Override
public ConfigMetadata getParams() {
return configMetadata;
}
@Override
public boolean isRealTime() {
return false;
}
@Override
public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
return Mono.justOrEmpty(parameter.getString("deviceId"))
.flatMapMany(deviceId -> {
int history = parameter.getInt("history").orElse(1);
//合并历史数据和实时数据
return fromHistory(deviceId, history);
});
}
}
/**
* 实时设备事件
*/
@ -104,20 +216,10 @@ class DevicePropertyMeasurement extends StaticMeasurement {
@Override
public DataType getValueType() {
SimplePropertyMetadata value = new SimplePropertyMetadata();
value.setId("value");
value.setName("");
value.setValueType(metadata.getValueType());
SimplePropertyMetadata formatValue = new SimplePropertyMetadata();
value.setId("formatValue");
value.setName("格式化值");
value.setValueType(new StringType());
return new ObjectType()
.addPropertyMetadata(value)
.addPropertyMetadata(formatValue);
.addProperty("property", "属性", StringType.GLOBAL)
.addProperty("value", "", metadata.getValueType())
.addProperty("formatValue", "格式化值", StringType.GLOBAL);
}
@Override
@ -146,4 +248,6 @@ class DevicePropertyMeasurement extends StaticMeasurement {
});
}
}
}

View File

@ -1,16 +1,17 @@
package org.jetlinks.community.device.measurements.message;
import org.jetlinks.community.Interval;
import org.jetlinks.core.metadata.ConfigMetadata;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.DateTimeType;
import org.jetlinks.core.metadata.types.IntType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.community.Interval;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
import org.jetlinks.community.gateway.MessageGateway;
import org.jetlinks.community.gateway.Subscription;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.community.timeseries.query.AggregationQueryParam;
import reactor.core.publisher.Flux;
@ -18,13 +19,14 @@ import reactor.core.publisher.Flux;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Collections;
import java.util.Date;
class DeviceMessageMeasurement extends StaticMeasurement {
private MessageGateway messageGateway;
private final MessageGateway messageGateway;
private TimeSeriesManager timeSeriesManager;
private final TimeSeriesManager timeSeriesManager;
static MeasurementDefinition definition = MeasurementDefinition.of("quantity", "设备消息量");
@ -37,8 +39,6 @@ class DeviceMessageMeasurement extends StaticMeasurement {
}
static DataType valueType = new IntType();
static ConfigMetadata realTimeConfigMetadata = new DefaultConfigMetadata()
.add("interval", "数据统计周期", "例如: 1s,10s", new StringType());
@ -51,7 +51,7 @@ class DeviceMessageMeasurement extends StaticMeasurement {
@Override
public DataType getValueType() {
return valueType;
return IntType.GLOBAL;
}
@Override
@ -68,7 +68,7 @@ class DeviceMessageMeasurement extends StaticMeasurement {
public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
//通过订阅消息来统计实时数据量
return messageGateway
.subscribe("/device/**")
.subscribe(Collections.singleton(new Subscription("/device/**")),true)
.window(parameter.getDuration("interval").orElse(Duration.ofSeconds(1)))
.flatMap(Flux::count)
.map(total -> SimpleMeasurementValue.of(total, System.currentTimeMillis()));
@ -95,7 +95,7 @@ class DeviceMessageMeasurement extends StaticMeasurement {
@Override
public DataType getValueType() {
return valueType;
return IntType.GLOBAL;
}
@Override
@ -113,16 +113,17 @@ class DeviceMessageMeasurement extends StaticMeasurement {
return AggregationQueryParam.of()
.sum("count")
.groupBy(parameter.getInterval("time", Interval.ofHours(1)),
parameter.getString("format", "MM月dd日 HH时"))
.groupBy(
parameter.getInterval("time").orElse(Interval.ofHours(1)),
parameter.getString("format").orElse("MM月dd日 HH时"))
.filter(query ->
query.where("name", "message-count")
.is("productId", parameter.getString("productId", null))
.is("msgType", parameter.getString("msgType", null))
.is("productId", parameter.getString("productId").orElse(null))
.is("msgType", parameter.getString("msgType").orElse(null))
)
.limit(parameter.getInt("limit", 1))
.limit(parameter.getInt("limit").orElse(1))
.from(parameter.getDate("from").orElseGet(() -> Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant())))
.to(parameter.getDate("to").orElseGet(Date::new))
.to(parameter.getDate("to").orElse(new Date()))
.execute(timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceMetrics())::aggregation)
.index((index, data) -> SimpleMeasurementValue.of(
data.getInt("count").orElse(0),
@ -132,5 +133,4 @@ class DeviceMessageMeasurement extends StaticMeasurement {
}
}
}