优化统计

This commit is contained in:
zhou-hao 2020-09-29 10:12:05 +08:00
parent 15dd5dc9ca
commit c2518daf29
12 changed files with 296 additions and 212 deletions

View File

@ -1,8 +1,15 @@
package org.jetlinks.community.gateway;
import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import org.jetlinks.core.event.TopicPayload;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.MessageType;
import org.jetlinks.core.message.property.ReadPropertyMessageReply;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import org.jetlinks.core.message.property.WritePropertyMessageReply;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
@ -19,4 +26,29 @@ public class DeviceMessageUtils {
return Optional.empty();
}
public static Optional<DeviceMessage> convert(TopicPayload message) {
return Optional.of(message.decode(DeviceMessage.class));
}
public static Optional<DeviceMessage> convert(ByteBuf payload) {
return MessageType.convertMessage(JSON.parseObject(payload.toString(StandardCharsets.UTF_8)));
}
public static Optional<Map<String, Object>> tryGetProperties(DeviceMessage message) {
if (message instanceof ReportPropertyMessage) {
return Optional.ofNullable(((ReportPropertyMessage) message).getProperties());
}
if (message instanceof ReadPropertyMessageReply) {
return Optional.ofNullable(((ReadPropertyMessageReply) message).getProperties());
}
if (message instanceof WritePropertyMessageReply) {
return Optional.ofNullable(((WritePropertyMessageReply) message).getProperties());
}
return Optional.empty();
}
}

View File

@ -1,13 +1,12 @@
package org.jetlinks.community.device.measurements;
import org.jetlinks.community.dashboard.DashboardObject;
import org.jetlinks.community.dashboard.Measurement;
import org.jetlinks.community.dashboard.ObjectDefinition;
import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.core.device.DeviceProductOperator;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.metadata.DeviceMetadata;
import org.jetlinks.community.dashboard.DashboardObject;
import org.jetlinks.community.dashboard.Measurement;
import org.jetlinks.community.dashboard.ObjectDefinition;
import org.jetlinks.community.device.service.data.DeviceDataService;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -20,24 +19,24 @@ public class DeviceDashboardObject implements DashboardObject {
private final EventBus eventBus;
private final TimeSeriesManager timeSeriesManager;
private final DeviceDataService deviceDataService;
private DeviceDashboardObject(String id, String name,
DeviceProductOperator productOperator,
EventBus eventBus,
TimeSeriesManager timeSeriesManager) {
DeviceDataService dataService) {
this.id = id;
this.name = name;
this.productOperator = productOperator;
this.eventBus = eventBus;
this.timeSeriesManager = timeSeriesManager;
this.deviceDataService = dataService;
}
public static DeviceDashboardObject of(String id, String name,
DeviceProductOperator productOperator,
EventBus eventBus,
TimeSeriesManager timeSeriesManager) {
return new DeviceDashboardObject(id, name, productOperator, eventBus, timeSeriesManager);
DeviceProductOperator productOperator,
EventBus eventBus,
DeviceDataService dataService ) {
return new DeviceDashboardObject(id, name, productOperator, eventBus, dataService);
}
@Override
@ -61,17 +60,17 @@ public class DeviceDashboardObject implements DashboardObject {
productOperator.getMetadata()
.flatMapIterable(DeviceMetadata::getEvents)
.map(event -> new DeviceEventMeasurement(productOperator.getId(), eventBus, event, timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceEventMetric(id, event.getId())))),
.map(event -> new DeviceEventMeasurement(productOperator.getId(), eventBus, event, deviceDataService)),
productOperator.getMetadata()
.map(metadata -> new DevicePropertiesMeasurement(productOperator.getId(),eventBus, metadata, timeSeriesManager.getService(DeviceTimeSeriesMetric.devicePropertyMetric(id)))),
.map(metadata -> new DevicePropertiesMeasurement(productOperator.getId(), eventBus, deviceDataService, metadata)),
productOperator.getMetadata()
.map(metadata -> new DeviceEventsMeasurement(productOperator.getId(), eventBus, metadata, timeSeriesManager)),
.map(metadata -> new DeviceEventsMeasurement(productOperator.getId(), eventBus, metadata, deviceDataService)),
productOperator.getMetadata()
.flatMapIterable(DeviceMetadata::getProperties)
.map(event -> new DevicePropertyMeasurement(productOperator.getId(),eventBus, event, timeSeriesManager.getService(DeviceTimeSeriesMetric.devicePropertyMetric(id))))
.map(event -> new DevicePropertyMeasurement(productOperator.getId(), eventBus, event, deviceDataService))
);
}
@ -79,18 +78,18 @@ public class DeviceDashboardObject implements DashboardObject {
public Mono<Measurement> getMeasurement(String id) {
if ("properties".equals(id)) {
return productOperator.getMetadata()
.map(metadata -> new DevicePropertiesMeasurement(productOperator.getId(),eventBus, metadata, timeSeriesManager.getService(DeviceTimeSeriesMetric.devicePropertyMetric(this.id))));
.map(metadata -> new DevicePropertiesMeasurement(productOperator.getId(), eventBus, deviceDataService, metadata));
}
if ("events".equals(id)) {
return productOperator.getMetadata()
.map(metadata -> new DeviceEventsMeasurement(productOperator.getId(), eventBus, metadata, timeSeriesManager));
.map(metadata -> new DeviceEventsMeasurement(productOperator.getId(), eventBus, metadata, deviceDataService));
}
return productOperator.getMetadata()
.flatMap(metadata -> Mono.justOrEmpty(metadata.getEvent(id)))
.<Measurement>map(event -> new DeviceEventMeasurement(productOperator.getId(),eventBus, event, timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceEventMetric(this.id, event.getId()))))
.<Measurement>map(event -> new DeviceEventMeasurement(productOperator.getId(), eventBus, event, deviceDataService))
//事件没获取到则尝试获取属性
.switchIfEmpty(productOperator.getMetadata()
.flatMap(metadata -> Mono.justOrEmpty(metadata.getProperty(id)))
.map(event -> new DevicePropertyMeasurement(productOperator.getId(),eventBus, event, timeSeriesManager.getService(DeviceTimeSeriesMetric.devicePropertyMetric(this.id)))));
.map(event -> new DevicePropertyMeasurement(productOperator.getId(), eventBus, event, deviceDataService)));
}
}

View File

@ -1,11 +1,11 @@
package org.jetlinks.community.device.measurements;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.community.dashboard.DashboardObject;
import org.jetlinks.community.device.entity.DeviceProductEntity;
import org.jetlinks.community.device.service.LocalDeviceProductService;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.community.device.service.data.DeviceDataService;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -21,16 +21,16 @@ public class DeviceDynamicDashboard implements DeviceDashboard {
private final EventBus eventBus;
private final TimeSeriesManager timeSeriesManager;
private final DeviceDataService dataService;
public DeviceDynamicDashboard(LocalDeviceProductService productService,
DeviceRegistry registry,
EventBus eventBus,
TimeSeriesManager timeSeriesManager) {
DeviceDataService deviceDataService,
EventBus eventBus) {
this.productService = productService;
this.registry = registry;
this.eventBus = eventBus;
this.timeSeriesManager = timeSeriesManager;
this.dataService = deviceDataService;
}
@PostConstruct
@ -53,6 +53,6 @@ public class DeviceDynamicDashboard implements DeviceDashboard {
protected Mono<DeviceDashboardObject> convertObject(DeviceProductEntity product) {
return registry.getProduct(product.getId())
.map(operator -> DeviceDashboardObject.of(product.getId(), product.getName(), operator, eventBus, timeSeriesManager));
.map(operator -> DeviceDashboardObject.of(product.getId(), product.getName(), operator, eventBus, dataService));
}
}

View File

@ -1,9 +1,6 @@
package org.jetlinks.community.device.measurements;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.timeseries.TimeSeriesService;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.message.DeviceMessage;
@ -14,6 +11,9 @@ import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.EventMetadata;
import org.jetlinks.core.metadata.types.IntType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.device.service.data.DeviceDataService;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -23,19 +23,19 @@ class DeviceEventMeasurement extends StaticMeasurement {
public EventBus eventBus;
private final TimeSeriesService eventTsService;
private final DeviceDataService deviceDataService;
private final String productId;
public DeviceEventMeasurement(String productId,
EventBus eventBus,
EventMetadata eventMetadata,
TimeSeriesService eventTsService) {
DeviceDataService deviceDataService) {
super(MetadataMeasurementDefinition.of(eventMetadata));
this.productId = productId;
this.eventBus = eventBus;
this.eventMetadata = eventMetadata;
this.eventTsService = eventTsService;
this.deviceDataService = deviceDataService;
addDimension(new RealTimeDeviceEventDimension());
}
@ -48,14 +48,14 @@ class DeviceEventMeasurement extends StaticMeasurement {
return history <= 0 ? Flux.empty() : QueryParamEntity.newQuery()
.doPaging(0, history)
.where("deviceId", deviceId)
.execute(eventTsService::query)
.map(data -> SimpleMeasurementValue.of(data.getData(), data.getTimestamp()))
.execute(q->deviceDataService.queryEvent(deviceId,eventMetadata.getId(),q,false))
.map(data -> SimpleMeasurementValue.of(data, data.getTimestamp()))
.sort(MeasurementValue.sort());
}
Flux<MeasurementValue> fromRealTime(String deviceId) {
org.jetlinks.core.event.Subscription subscription = org.jetlinks.core.event.Subscription
Subscription subscription = Subscription
.of("deviceEventMeasurement", "/device/" + productId + "/" + deviceId + "/message/event/" + eventMetadata.getId(), Subscription.Feature.local);
return eventBus

View File

@ -1,10 +1,6 @@
package org.jetlinks.community.device.measurements;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.message.DeviceMessage;
@ -12,6 +8,9 @@ import org.jetlinks.core.message.event.EventMessage;
import org.jetlinks.core.metadata.*;
import org.jetlinks.core.metadata.types.ObjectType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.device.service.data.DeviceDataService;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -23,7 +22,7 @@ class DeviceEventsMeasurement extends StaticMeasurement {
private final EventBus eventBus;
private final TimeSeriesManager timeSeriesManager;
private final DeviceDataService deviceDataService;
private final DeviceMetadata metadata;
@ -32,11 +31,11 @@ class DeviceEventsMeasurement extends StaticMeasurement {
public DeviceEventsMeasurement(String productId,
EventBus eventBus,
DeviceMetadata deviceMetadata,
TimeSeriesManager timeSeriesManager) {
DeviceDataService deviceDataService) {
super(MeasurementDefinition.of("events", "事件记录"));
this.productId = productId;
this.eventBus = eventBus;
this.timeSeriesManager = timeSeriesManager;
this.deviceDataService = deviceDataService;
this.metadata = deviceMetadata;
addDimension(new RealTimeDevicePropertyDimension());
}
@ -47,9 +46,8 @@ class DeviceEventsMeasurement extends StaticMeasurement {
return history <= 0 ? Flux.empty() : Flux.fromIterable(metadata.getEvents())
.flatMap(event -> QueryParamEntity.newQuery()
.doPaging(0, history)
.where("deviceId", deviceId)
.execute(timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceEventMetric(productId, event.getId()))::query)
.map(data -> SimpleMeasurementValue.of(createValue(event.getId(), data.getData()), data.getTimestamp()))
.execute(q -> deviceDataService.queryEvent(deviceId, event.getId(), q, false))
.map(data -> SimpleMeasurementValue.of(createValue(event.getId(), data), data.getTimestamp()))
.sort(MeasurementValue.sort()));
}
@ -61,10 +59,10 @@ class DeviceEventsMeasurement extends StaticMeasurement {
}
Flux<MeasurementValue> fromRealTime(String deviceId) {
org.jetlinks.core.event.Subscription subscription = org.jetlinks.core.event.Subscription.of(
Subscription subscription = Subscription.of(
"realtime-device-events-measurement",
"/device/" + productId + "/" + deviceId + "/message/event/*",
org.jetlinks.core.event.Subscription.Feature.local, Subscription.Feature.broker
Subscription.Feature.local, Subscription.Feature.broker
);
return
eventBus
@ -117,17 +115,18 @@ class DeviceEventsMeasurement extends StaticMeasurement {
@Override
public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
return Mono.justOrEmpty(parameter.getString("deviceId"))
return Mono
.justOrEmpty(parameter.getString("deviceId"))
.flatMapMany(deviceId -> {
int history = parameter.getInt("history").orElse(0);
//合并历史数据和实时数据
return Flux.concat(
//查询历史数据
fromHistory(deviceId, history)
,
//从消息网关订阅实时事件消息
fromRealTime(deviceId)
);
return //合并历史数据和实时数据
Flux.concat(
//查询历史数据
fromHistory(deviceId, history)
,
//从消息网关订阅实时事件消息
fromRealTime(deviceId)
);
});
}
}

View File

@ -1,57 +1,54 @@
package org.jetlinks.community.device.measurements;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.timeseries.TimeSeriesService;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.property.ReadPropertyMessageReply;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import org.jetlinks.core.message.property.WritePropertyMessageReply;
import org.jetlinks.core.metadata.*;
import org.jetlinks.core.metadata.types.ObjectType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.device.service.data.DeviceDataService;
import org.jetlinks.community.gateway.DeviceMessageUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.HashMap;
import java.util.Map;
@Slf4j
class DevicePropertiesMeasurement extends StaticMeasurement {
private final EventBus eventBus;
private final TimeSeriesService timeSeriesService;
private final DeviceMetadata metadata;
private final DeviceDataService dataService;
private final String productId;
public DevicePropertiesMeasurement(String productId,
EventBus eventBus,
DeviceMetadata deviceMetadata,
TimeSeriesService timeSeriesService) {
DeviceDataService dataService,
DeviceMetadata deviceMetadata) {
super(MeasurementDefinition.of("properties", "属性记录"));
this.productId = productId;
this.eventBus = eventBus;
this.timeSeriesService = timeSeriesService;
this.metadata = deviceMetadata;
this.dataService = dataService;
addDimension(new RealTimeDevicePropertyDimension());
addDimension(new HistoryDevicePropertyDimension());
}
Flux<SimpleMeasurementValue> fromHistory(String deviceId, int history) {
return history <= 0 ? Flux.empty() : Flux.fromIterable(metadata.getProperties())
.flatMap(propertyMetadata -> QueryParamEntity.newQuery()
.doPaging(0, history)
.where("deviceId", deviceId)
.and("property", propertyMetadata.getId())
.execute(timeSeriesService::query)
.map(data -> SimpleMeasurementValue.of(createValue(propertyMetadata.getId(), data.get("value").orElse(null)), data.getTimestamp()))
.sort(MeasurementValue.sort()));
return history <= 0 ? Flux.empty() : QueryParamEntity.newQuery()
.doPaging(0, history)
.execute(q -> dataService.queryEachProperties(deviceId, q))
.map(data -> SimpleMeasurementValue.of(data, data.getTimestamp()))
.sort(MeasurementValue.sort());
}
Map<String, Object> createValue(String property, Object value) {
@ -77,32 +74,22 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
Flux<MeasurementValue> fromRealTime(String deviceId) {
org.jetlinks.core.event.Subscription subscription= org.jetlinks.core.event.Subscription.of(
Subscription subscription = Subscription.of(
"realtime-device-properties-measurement",
new String[]{
"/device/" + productId + "/" + deviceId + "/message/property/report",
"/device/" + productId + "/" + deviceId + "/message/property/*/reply"
},
org.jetlinks.core.event.Subscription.Feature.local, Subscription.Feature.broker
Subscription.Feature.local, Subscription.Feature.broker
);
return
eventBus
.subscribe(subscription, DeviceMessage.class)
.flatMap(msg -> {
if (msg instanceof ReportPropertyMessage) {
return Mono.justOrEmpty(((ReportPropertyMessage) msg).getProperties());
}
if (msg instanceof ReadPropertyMessageReply) {
return Mono.justOrEmpty(((ReadPropertyMessageReply) msg).getProperties());
}
if (msg instanceof WritePropertyMessageReply) {
return Mono.justOrEmpty(((WritePropertyMessageReply) msg).getProperties());
}
return Mono.empty();
})
.flatMap(msg -> Mono.justOrEmpty(DeviceMessageUtils.tryGetProperties(msg)))
.flatMap(map -> Flux.fromIterable(map.entrySet()))
.map(kv -> SimpleMeasurementValue.of(createValue(kv.getKey(), kv.getValue()), System.currentTimeMillis()))
.<MeasurementValue>map(kv -> SimpleMeasurementValue.of(createValue(kv.getKey(), kv.getValue()), System.currentTimeMillis()))
.onErrorContinue((err, v) -> log.error(err.getMessage(), err))
;
}
@ -110,7 +97,7 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
.add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector"));
/**
* 历史设备事件
* 历史
*/
private class HistoryDevicePropertyDimension implements MeasurementDimension {
@ -122,9 +109,9 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
@Override
public DataType getValueType() {
return new ObjectType()
.addProperty("property","属性", StringType.GLOBAL)
.addProperty("value","", StringType.GLOBAL)
.addProperty("formatValue","格式化值", StringType.GLOBAL);
.addProperty("property", "属性", StringType.GLOBAL)
.addProperty("value", "", StringType.GLOBAL)
.addProperty("formatValue", "格式化值", StringType.GLOBAL);
}
@Override
@ -149,7 +136,7 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
}
/**
* 实时设备事件
* 实时
*/
private class RealTimeDevicePropertyDimension implements MeasurementDimension {
@ -161,9 +148,9 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
@Override
public DataType getValueType() {
return new ObjectType()
.addProperty("property","属性", StringType.GLOBAL)
.addProperty("value","", StringType.GLOBAL)
.addProperty("formatValue","格式化值", StringType.GLOBAL);
.addProperty("property", "属性", StringType.GLOBAL)
.addProperty("value", "", StringType.GLOBAL)
.addProperty("formatValue", "格式化值", StringType.GLOBAL);
}
@Override
@ -182,7 +169,7 @@ class DevicePropertiesMeasurement extends StaticMeasurement {
.flatMapMany(deviceId -> {
int history = parameter.getInt("history").orElse(0);
//合并历史数据和实时数据
return Flux.concat(
return Flux.concat(
//查询历史数据
fromHistory(deviceId, history)
,

View File

@ -2,27 +2,27 @@ package org.jetlinks.community.device.measurements;
import org.hswebframework.utils.time.DateFormatter;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.jetlinks.community.Interval;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.timeseries.TimeSeriesService;
import org.jetlinks.community.timeseries.query.Aggregation;
import org.jetlinks.community.timeseries.query.AggregationQueryParam;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.property.ReadPropertyMessageReply;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import org.jetlinks.core.message.property.WritePropertyMessageReply;
import org.jetlinks.core.metadata.*;
import org.jetlinks.core.metadata.types.IntType;
import org.jetlinks.core.metadata.types.NumberType;
import org.jetlinks.core.metadata.types.ObjectType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.community.Interval;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.device.service.data.DeviceDataService;
import org.jetlinks.community.gateway.DeviceMessageUtils;
import org.jetlinks.community.timeseries.query.Aggregation;
import org.jetlinks.community.timeseries.query.AggregationData;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@ -33,19 +33,19 @@ class DevicePropertyMeasurement extends StaticMeasurement {
private final EventBus eventBus;
private final TimeSeriesService timeSeriesService;
private final DeviceDataService deviceDataService;
private final String productId;
public DevicePropertyMeasurement(String productId,
EventBus eventBus,
PropertyMetadata metadata,
TimeSeriesService timeSeriesService) {
DeviceDataService deviceDataService) {
super(MetadataMeasurementDefinition.of(metadata));
this.productId = productId;
this.eventBus = eventBus;
this.metadata = metadata;
this.timeSeriesService = timeSeriesService;
this.deviceDataService = deviceDataService;
addDimension(new RealTimeDevicePropertyDimension());
addDimension(new HistoryDevicePropertyDimension());
if (metadata.getValueType() instanceof NumberType) {
@ -64,12 +64,15 @@ class DevicePropertyMeasurement extends StaticMeasurement {
}
Flux<SimpleMeasurementValue> fromHistory(String deviceId, int history) {
return history <= 0 ? Flux.empty() : QueryParamEntity.newQuery()
return history <= 0
? Flux.empty()
: QueryParamEntity
.newQuery()
.doPaging(0, history)
.where("deviceId", deviceId)
.and("property", metadata.getId())
.execute(timeSeriesService::query)
.map(data -> SimpleMeasurementValue.of(createValue(data.get("value").orElse(null)), data.getTimestamp()))
.execute(q -> deviceDataService.queryProperty(deviceId, q, metadata.getId()))
.map(data -> SimpleMeasurementValue.of(data, data.getTimestamp()))
.sort(MeasurementValue.sort());
}
@ -85,18 +88,7 @@ class DevicePropertyMeasurement extends StaticMeasurement {
return eventBus
.subscribe(subscription, DeviceMessage.class)
.flatMap(msg -> {
if (msg instanceof ReportPropertyMessage) {
return Mono.justOrEmpty(((ReportPropertyMessage) msg).getProperties());
}
if (msg instanceof ReadPropertyMessageReply) {
return Mono.justOrEmpty(((ReadPropertyMessageReply) msg).getProperties());
}
if (msg instanceof WritePropertyMessageReply) {
return Mono.justOrEmpty(((WritePropertyMessageReply) msg).getProperties());
}
return Mono.empty();
})
.flatMap(msg -> Mono.justOrEmpty(DeviceMessageUtils.tryGetProperties(msg)))
.filter(msg -> msg.containsKey(metadata.getId()))
.map(msg -> SimpleMeasurementValue.of(createValue(msg.get(metadata.getId())), System.currentTimeMillis()));
}
@ -150,23 +142,38 @@ class DevicePropertyMeasurement extends StaticMeasurement {
@Override
public Flux<SimpleMeasurementValue> getValue(MeasurementParameter parameter) {
return AggregationQueryParam.of()
.agg("numberValue", "value", parameter.getString("agg").map(String::toUpperCase).map(Aggregation::valueOf).orElse(Aggregation.AVG))
.filter(query -> query
.where("property", metadata.getId())
.and("deviceId", parameter.getString("deviceId").orElse(null))
)
.limit(parameter.getInt("limit", 10))
.groupBy(parameter.getInterval("time", Interval.ofSeconds(10)), parameter.getString("format", "HH:mm:ss"))
.from(parameter.getDate("from").orElseGet(() -> Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant())))
.to(parameter.getDate("to").orElse(new Date()))
.execute(timeSeriesService::aggregation)
.index((index, data) -> SimpleMeasurementValue.of(
createValue(data.getInt("value").orElse(0)),
data.getString("time").orElse(""),
index))
.sort();
String deviceId = parameter.getString("deviceId", null);
DeviceDataService.AggregationRequest request = new DeviceDataService.AggregationRequest();
DeviceDataService.DevicePropertyAggregation aggregation = new DeviceDataService.DevicePropertyAggregation(
metadata.getId(), metadata.getId(), parameter.getString("agg").map(String::toUpperCase).map(Aggregation::valueOf).orElse(Aggregation.AVG)
);
String format = parameter.getString("format", "HH:mm:ss");
DateTimeFormatter formatter = DateTimeFormat.forPattern(format);
request.setLimit(parameter.getInt("limit", 10));
request.setInterval(parameter.getInterval("time", Interval.ofSeconds(10)));
request.setFormat(format);
request.setFrom(parameter.getDate("from", DateTime.now().plusDays(-1).toDate()));
request.setTo(parameter.getDate("to", DateTime.now().plusDays(-1).toDate()));
Flux<AggregationData> dataFlux;
if (StringUtils.hasText(deviceId)) {
dataFlux = deviceDataService
.aggregationPropertiesByDevice(deviceId, request, aggregation);
} else {
dataFlux = deviceDataService.aggregationPropertiesByProduct(productId, request, aggregation);
}
return dataFlux
.map(data -> {
long ts = data.getString("time")
.map(time -> DateTime.parse(time, formatter).getMillis())
.orElse(System.currentTimeMillis());
return SimpleMeasurementValue.of(createValue(
data.get(metadata.getId()).orElse(0)),
data.getString("time",""),
ts);
})
.sort();
}
}
@ -203,18 +210,15 @@ class DevicePropertyMeasurement extends StaticMeasurement {
return Mono.justOrEmpty(parameter.getString("deviceId"))
.flatMapMany(deviceId -> {
int history = parameter.getInt("history").orElse(1);
return QueryParamEntity.newQuery()
return QueryParamEntity.newQuery()
.doPaging(0, history)
.where("deviceId", deviceId)
.and("property", metadata.getId())
.as(query -> query
.gte("timestamp", parameter.getDate("from").orElse(null))
.lte("timestamp", parameter.getDate("to").orElse(null)))
.execute(timeSeriesService::query)
.execute(q -> deviceDataService.queryProperty(deviceId, q, metadata.getId()))
.map(data -> SimpleMeasurementValue.of(
createValue(data.get("value").orElse(null)),
DateFormatter.toString(new Date(data.getTimestamp()), parameter.getString("timeFormat","HH:mm:ss")),
data,
DateFormatter.toString(new Date(data.getTimestamp()), parameter.getString("timeFormat", "HH:mm:ss")),
data.getTimestamp()))
.sort(MeasurementValue.sort());
});
@ -254,14 +258,14 @@ class DevicePropertyMeasurement extends StaticMeasurement {
return Mono.justOrEmpty(parameter.getString("deviceId"))
.flatMapMany(deviceId -> {
int history = parameter.getInt("history").orElse(0);
//合并历史数据和实时数据
return Flux.concat(
//查询历史数据
fromHistory(deviceId, history)
,
//从消息网关订阅实时事件消息
fromRealTime(deviceId)
);
return //合并历史数据和实时数据
Flux.concat(
//查询历史数据
fromHistory(deviceId, history)
,
//从消息网关订阅实时事件消息
fromRealTime(deviceId)
);
});
}
}

View File

@ -1,11 +1,7 @@
package org.jetlinks.community.device.measurements.message;
import org.jetlinks.community.Interval;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.community.timeseries.query.AggregationQueryParam;
import org.jetlinks.core.device.DeviceProductOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.metadata.ConfigMetadata;
@ -14,12 +10,25 @@ import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.DateTimeType;
import org.jetlinks.core.metadata.types.IntType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.community.Interval;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.community.timeseries.TimeSeriesMetric;
import org.jetlinks.community.timeseries.query.AggregationData;
import org.jetlinks.community.timeseries.query.AggregationQueryParam;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
class DeviceMessageMeasurement extends StaticMeasurement {
@ -27,11 +36,14 @@ class DeviceMessageMeasurement extends StaticMeasurement {
private final TimeSeriesManager timeSeriesManager;
private final DeviceRegistry deviceRegistry;
static MeasurementDefinition definition = MeasurementDefinition.of("quantity", "设备消息量");
public DeviceMessageMeasurement(EventBus eventBus,
DeviceRegistry registry,
TimeSeriesManager timeSeriesManager) {
super(definition);
this.deviceRegistry = registry;
this.eventBus = eventBus;
this.timeSeriesManager = timeSeriesManager;
addDimension(new RealTimeMessageDimension());
@ -65,9 +77,11 @@ class DeviceMessageMeasurement extends StaticMeasurement {
@Override
public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
//通过订阅消息来统计实时数据量
return eventBus
.subscribe(org.jetlinks.core.event.Subscription.of("real-time-device-message", "/device/**", org.jetlinks.core.event.Subscription.Feature.local, Subscription.Feature.broker))
.subscribe(Subscription.of("real-time-device-message", "/device/**", Subscription.Feature.local, Subscription.Feature.broker))
.window(parameter.getDuration("interval").orElse(Duration.ofSeconds(1)))
.flatMap(Flux::count)
.map(total -> SimpleMeasurementValue.of(total, System.currentTimeMillis()));
@ -107,10 +121,41 @@ class DeviceMessageMeasurement extends StaticMeasurement {
return false;
}
private AggregationQueryParam createQueryParam(MeasurementParameter parameter) {
return AggregationQueryParam.of()
// .sum("count")
.groupBy(
parameter.getInterval("time").orElse(Interval.ofHours(1)),
parameter.getString("format").orElse("MM月dd日 HH时"))
// .filter(query ->
// query
// .where("name", "message-count")
// .is("productId", parameter.getString("productId").orElse(null))
// .is("msgType", parameter.getString("msgType").orElse(null))
// )
.limit(parameter.getInt("limit").orElse(1))
.from(parameter.getDate("from").orElseGet(() -> Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant())))
.to(parameter.getDate("to").orElse(new Date()));
}
private Mono<TimeSeriesMetric[]> getProductMetrics(List<String> productIdList) {
return Flux
.fromIterable(productIdList)
.flatMap(id -> deviceRegistry
.getProduct(id)
.flatMap(DeviceProductOperator::getMetadata)
.onErrorResume(err -> Mono.empty())
.flatMapMany(metadata -> Flux.fromIterable(metadata.getEvents())
.map(event -> DeviceTimeSeriesMetric.deviceEventMetric(id, event.getId())))
.concatWithValues(DeviceTimeSeriesMetric.devicePropertyMetric(id)))
.collectList()
.map(list -> list.toArray(new TimeSeriesMetric[0]));
}
@Override
public Flux<SimpleMeasurementValue> getValue(MeasurementParameter parameter) {
return AggregationQueryParam.of()
return AggregationQueryParam.of()
.sum("count")
.groupBy(
parameter.getInterval("time").orElse(Interval.ofHours(1)),

View File

@ -1,6 +1,9 @@
package org.jetlinks.community.device.measurements.message;
import io.micrometer.core.instrument.MeterRegistry;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.community.dashboard.supports.StaticMeasurementProvider;
import org.jetlinks.community.device.measurements.DeviceDashboardDefinition;
import org.jetlinks.community.device.measurements.DeviceObjectDefinition;
@ -8,25 +11,24 @@ import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
import org.jetlinks.community.gateway.annotation.Subscribe;
import org.jetlinks.community.micrometer.MeterRegistryManager;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.message.DeviceMessage;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
@Component
public class DeviceMessageMeasurementProvider extends StaticMeasurementProvider {
MeterRegistry registry;
public DeviceMessageMeasurementProvider(EventBus eventBus,
MeterRegistryManager registryManager,
DeviceRegistry deviceRegistry,
TimeSeriesManager timeSeriesManager) {
super(DeviceDashboardDefinition.instance, DeviceObjectDefinition.message);
registry = registryManager.getMeterRegister(DeviceTimeSeriesMetric.deviceMetrics().getId(),
"target", "msgType", "productId");
addMeasurement(new DeviceMessageMeasurement(eventBus, timeSeriesManager));
addMeasurement(new DeviceMessageMeasurement(eventBus, deviceRegistry, timeSeriesManager));
}
@ -46,7 +48,6 @@ public class DeviceMessageMeasurementProvider extends StaticMeasurementProvider
return empty;
}
return new String[]{
"msgType", message.getMessageType().name().toLowerCase(),
"productId", message.getHeader("productId").map(String::valueOf).orElse("unknown")
};
}

View File

@ -1,11 +1,5 @@
package org.jetlinks.community.device.measurements.status;
import org.jetlinks.community.Interval;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.community.timeseries.query.AggregationQueryParam;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.message.DeviceMessage;
@ -17,6 +11,15 @@ import org.jetlinks.core.metadata.types.DateTimeType;
import org.jetlinks.core.metadata.types.EnumType;
import org.jetlinks.core.metadata.types.IntType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.community.Interval;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.community.timeseries.query.AggregationQueryParam;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -32,7 +35,7 @@ class DeviceStatusChangeMeasurement extends StaticMeasurement {
private final TimeSeriesManager timeSeriesManager;
static final MeasurementDefinition definition = MeasurementDefinition.of("change", "设备状态变更");
static MeasurementDefinition definition = MeasurementDefinition.of("change", "设备状态变更");
static ConfigMetadata configMetadata = new DefaultConfigMetadata()
.add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector"));
@ -88,11 +91,12 @@ class DeviceStatusChangeMeasurement extends StaticMeasurement {
@Override
public Flux<SimpleMeasurementValue> getValue(MeasurementParameter parameter) {
String format = parameter.getString("format").orElse("yyyy年MM月dd日");
DateTimeFormatter formatter = DateTimeFormat.forPattern(format);
return AggregationQueryParam.of()
.sum("count")
.groupBy(parameter.getInterval("time").orElse(Interval.ofHours(1)),
parameter.getString("format").orElse("MM月dd日 HH时"))
.groupBy(parameter.getInterval("time", Interval.ofDays(1)), format)
.filter(query ->
query.where("name", parameter.getString("type").orElse("online"))
.is("productId", parameter.getString("productId").orElse(null))
@ -101,10 +105,15 @@ class DeviceStatusChangeMeasurement extends StaticMeasurement {
.from(parameter.getDate("from").orElse(Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant())))
.to(parameter.getDate("to").orElse(new Date()))
.execute(timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceMetrics())::aggregation)
.index((index, data) -> SimpleMeasurementValue.of(
data.getInt("count").orElse(0),
data.getString("time").orElse(""),
index))
.map(data -> {
long ts = data.getString("time")
.map(time -> DateTime.parse(time, formatter).getMillis())
.orElse(System.currentTimeMillis());
return SimpleMeasurementValue.of(
data.get("count").orElse(0),
data.getString("time", ""),
ts);
})
.sort();
}
}
@ -124,7 +133,6 @@ class DeviceStatusChangeMeasurement extends StaticMeasurement {
return type;
}
@Override
public ConfigMetadata getParams() {
return configMetadata;
@ -137,21 +145,20 @@ class DeviceStatusChangeMeasurement extends StaticMeasurement {
@Override
public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
return Mono.justOrEmpty(parameter.getString("deviceId"))
.flatMapMany(deviceId -> {
//从消息网关订阅消息
return eventBus.subscribe(org.jetlinks.core.event.Subscription.of(
.flatMapMany(deviceId ->//从消息网关订阅消息
eventBus.subscribe(Subscription.of(
"RealTimeDeviceStateDimension"
, new String[]{
"/device/*/" + deviceId + "/online",
"/device/*/" + deviceId + "/offline"
},
org.jetlinks.core.event.Subscription.Feature.local,
Subscription.Feature.local,
Subscription.Feature.broker
), DeviceMessage.class)
.map(msg -> SimpleMeasurementValue.of(createStateValue(msg), msg.getTimestamp()))
;
});
.map(msg -> SimpleMeasurementValue.of(createStateValue(msg), msg.getTimestamp())));
}
Map<String, Object> createStateValue(DeviceMessage message) {

View File

@ -2,6 +2,8 @@ package org.jetlinks.community.device.measurements.status;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.community.dashboard.supports.StaticMeasurementProvider;
import org.jetlinks.community.device.measurements.DeviceDashboardDefinition;
import org.jetlinks.community.device.measurements.DeviceObjectDefinition;
@ -10,8 +12,6 @@ import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
import org.jetlinks.community.gateway.annotation.Subscribe;
import org.jetlinks.community.micrometer.MeterRegistryManager;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.message.DeviceMessage;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
@ -51,8 +51,8 @@ public class DeviceStatusMeasurementProvider extends StaticMeasurementProvider {
}
@Subscribe("/device/*/*/online")
public Mono<Void> incrementOnline(DeviceMessage msg) {
return Mono.fromRunnable(() -> {
public Mono<Void> incrementOnline(DeviceMessage msg){
return Mono.fromRunnable(()->{
String productId = parseProductId(msg);
counterAdder.apply(productId).increment();
registry
@ -62,18 +62,18 @@ public class DeviceStatusMeasurementProvider extends StaticMeasurementProvider {
}
@Subscribe("/device/*/*/offline")
public Mono<Void> incrementOffline(DeviceMessage msg) {
return Mono.fromRunnable(() -> {
public Mono<Void> incrementOffline(DeviceMessage msg){
return Mono.fromRunnable(()->{
String productId = parseProductId(msg);
counterAdder.apply(productId).increment();
// counterAdder.apply(productId).decrement();
registry
.counter("offline", "productId", productId)
.increment();
});
}
private String parseProductId(DeviceMessage deviceMessage) {
return deviceMessage
private String parseProductId(DeviceMessage msg) {
return msg
.getHeader("productId")
.map(String::valueOf)
.orElse("unknown");

View File

@ -1,6 +1,5 @@
package org.jetlinks.community.device.measurements.status;
import org.jetlinks.community.Interval;
import org.jetlinks.core.metadata.ConfigMetadata;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
@ -8,6 +7,7 @@ import org.jetlinks.core.metadata.types.DateTimeType;
import org.jetlinks.core.metadata.types.EnumType;
import org.jetlinks.core.metadata.types.IntType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.community.Interval;
import org.jetlinks.community.dashboard.*;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.device.entity.DeviceInstanceEntity;
@ -16,10 +16,12 @@ import org.jetlinks.community.device.service.LocalDeviceInstanceService;
import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.community.timeseries.query.AggregationQueryParam;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
@ -27,9 +29,9 @@ import java.util.Date;
class DeviceStatusRecordMeasurement
extends StaticMeasurement {
private final LocalDeviceInstanceService instanceService;
public LocalDeviceInstanceService instanceService;
private final TimeSeriesManager timeSeriesManager;
private TimeSeriesManager timeSeriesManager;
static MeasurementDefinition definition = MeasurementDefinition.of("record", "设备状态记录");
@ -76,6 +78,9 @@ class DeviceStatusRecordMeasurement
@Override
public Flux<SimpleMeasurementValue> getValue(MeasurementParameter parameter) {
String format = parameter.getString("format").orElse("yyyy年MM月dd日");
DateTimeFormatter formatter = DateTimeFormat.forPattern(format);
return AggregationQueryParam.of()
.max("value")
.filter(query ->
@ -88,10 +93,15 @@ class DeviceStatusRecordMeasurement
parameter.getString("format").orElse("yyyy年MM月dd日"))
.limit(parameter.getInt("limit").orElse(10))
.execute(timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceMetrics())::aggregation)
.index((index, data) -> SimpleMeasurementValue.of(
data.getInt("value").orElse(0),
data.getString("time").orElse("-"),
index))
.map(data -> {
long ts = data.getString("time")
.map(time -> DateTime.parse(time, formatter).getMillis())
.orElse(System.currentTimeMillis());
return SimpleMeasurementValue.of(
data.get("value").orElse(0),
data.getString("time", ""),
ts);
})
.sort();
}
}