diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/DeviceMessageUtils.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/DeviceMessageUtils.java index 34ad1683..5bda65ea 100644 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/DeviceMessageUtils.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/DeviceMessageUtils.java @@ -1,8 +1,15 @@ package org.jetlinks.community.gateway; +import com.alibaba.fastjson.JSON; +import io.netty.buffer.ByteBuf; +import org.jetlinks.core.event.TopicPayload; import org.jetlinks.core.message.DeviceMessage; import org.jetlinks.core.message.MessageType; +import org.jetlinks.core.message.property.ReadPropertyMessageReply; +import org.jetlinks.core.message.property.ReportPropertyMessage; +import org.jetlinks.core.message.property.WritePropertyMessageReply; +import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.Optional; @@ -19,4 +26,29 @@ public class DeviceMessageUtils { return Optional.empty(); } + public static Optional convert(TopicPayload message) { + return Optional.of(message.decode(DeviceMessage.class)); + } + + public static Optional convert(ByteBuf payload) { + + return MessageType.convertMessage(JSON.parseObject(payload.toString(StandardCharsets.UTF_8))); + + } + + public static Optional> tryGetProperties(DeviceMessage message) { + + if (message instanceof ReportPropertyMessage) { + return Optional.ofNullable(((ReportPropertyMessage) message).getProperties()); + } + + if (message instanceof ReadPropertyMessageReply) { + return Optional.ofNullable(((ReadPropertyMessageReply) message).getProperties()); + } + if (message instanceof WritePropertyMessageReply) { + return Optional.ofNullable(((WritePropertyMessageReply) message).getProperties()); + } + return Optional.empty(); + } + } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDashboardObject.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDashboardObject.java index 799009ac..e8fce0c2 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDashboardObject.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDashboardObject.java @@ -1,13 +1,12 @@ package org.jetlinks.community.device.measurements; -import org.jetlinks.community.dashboard.DashboardObject; -import org.jetlinks.community.dashboard.Measurement; -import org.jetlinks.community.dashboard.ObjectDefinition; -import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric; -import org.jetlinks.community.timeseries.TimeSeriesManager; import org.jetlinks.core.device.DeviceProductOperator; import org.jetlinks.core.event.EventBus; import org.jetlinks.core.metadata.DeviceMetadata; +import org.jetlinks.community.dashboard.DashboardObject; +import org.jetlinks.community.dashboard.Measurement; +import org.jetlinks.community.dashboard.ObjectDefinition; +import org.jetlinks.community.device.service.data.DeviceDataService; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -20,24 +19,24 @@ public class DeviceDashboardObject implements DashboardObject { private final EventBus eventBus; - private final TimeSeriesManager timeSeriesManager; + private final DeviceDataService deviceDataService; private DeviceDashboardObject(String id, String name, DeviceProductOperator productOperator, EventBus eventBus, - TimeSeriesManager timeSeriesManager) { + DeviceDataService dataService) { this.id = id; this.name = name; this.productOperator = productOperator; this.eventBus = eventBus; - this.timeSeriesManager = timeSeriesManager; + this.deviceDataService = dataService; } public static DeviceDashboardObject of(String id, String name, - DeviceProductOperator productOperator, - EventBus eventBus, - TimeSeriesManager timeSeriesManager) { - return new DeviceDashboardObject(id, name, productOperator, eventBus, timeSeriesManager); + DeviceProductOperator productOperator, + EventBus eventBus, + DeviceDataService dataService ) { + return new DeviceDashboardObject(id, name, productOperator, eventBus, dataService); } @Override @@ -61,17 +60,17 @@ public class DeviceDashboardObject implements DashboardObject { productOperator.getMetadata() .flatMapIterable(DeviceMetadata::getEvents) - .map(event -> new DeviceEventMeasurement(productOperator.getId(), eventBus, event, timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceEventMetric(id, event.getId())))), + .map(event -> new DeviceEventMeasurement(productOperator.getId(), eventBus, event, deviceDataService)), productOperator.getMetadata() - .map(metadata -> new DevicePropertiesMeasurement(productOperator.getId(),eventBus, metadata, timeSeriesManager.getService(DeviceTimeSeriesMetric.devicePropertyMetric(id)))), + .map(metadata -> new DevicePropertiesMeasurement(productOperator.getId(), eventBus, deviceDataService, metadata)), productOperator.getMetadata() - .map(metadata -> new DeviceEventsMeasurement(productOperator.getId(), eventBus, metadata, timeSeriesManager)), + .map(metadata -> new DeviceEventsMeasurement(productOperator.getId(), eventBus, metadata, deviceDataService)), productOperator.getMetadata() .flatMapIterable(DeviceMetadata::getProperties) - .map(event -> new DevicePropertyMeasurement(productOperator.getId(),eventBus, event, timeSeriesManager.getService(DeviceTimeSeriesMetric.devicePropertyMetric(id)))) + .map(event -> new DevicePropertyMeasurement(productOperator.getId(), eventBus, event, deviceDataService)) ); } @@ -79,18 +78,18 @@ public class DeviceDashboardObject implements DashboardObject { public Mono getMeasurement(String id) { if ("properties".equals(id)) { return productOperator.getMetadata() - .map(metadata -> new DevicePropertiesMeasurement(productOperator.getId(),eventBus, metadata, timeSeriesManager.getService(DeviceTimeSeriesMetric.devicePropertyMetric(this.id)))); + .map(metadata -> new DevicePropertiesMeasurement(productOperator.getId(), eventBus, deviceDataService, metadata)); } if ("events".equals(id)) { return productOperator.getMetadata() - .map(metadata -> new DeviceEventsMeasurement(productOperator.getId(), eventBus, metadata, timeSeriesManager)); + .map(metadata -> new DeviceEventsMeasurement(productOperator.getId(), eventBus, metadata, deviceDataService)); } return productOperator.getMetadata() .flatMap(metadata -> Mono.justOrEmpty(metadata.getEvent(id))) - .map(event -> new DeviceEventMeasurement(productOperator.getId(),eventBus, event, timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceEventMetric(this.id, event.getId())))) + .map(event -> new DeviceEventMeasurement(productOperator.getId(), eventBus, event, deviceDataService)) //事件没获取到则尝试获取属性 .switchIfEmpty(productOperator.getMetadata() .flatMap(metadata -> Mono.justOrEmpty(metadata.getProperty(id))) - .map(event -> new DevicePropertyMeasurement(productOperator.getId(),eventBus, event, timeSeriesManager.getService(DeviceTimeSeriesMetric.devicePropertyMetric(this.id))))); + .map(event -> new DevicePropertyMeasurement(productOperator.getId(), eventBus, event, deviceDataService))); } } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDynamicDashboard.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDynamicDashboard.java index 77d9b613..056fe6b1 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDynamicDashboard.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceDynamicDashboard.java @@ -1,11 +1,11 @@ package org.jetlinks.community.device.measurements; +import org.jetlinks.core.device.DeviceRegistry; +import org.jetlinks.core.event.EventBus; import org.jetlinks.community.dashboard.DashboardObject; import org.jetlinks.community.device.entity.DeviceProductEntity; import org.jetlinks.community.device.service.LocalDeviceProductService; -import org.jetlinks.community.timeseries.TimeSeriesManager; -import org.jetlinks.core.device.DeviceRegistry; -import org.jetlinks.core.event.EventBus; +import org.jetlinks.community.device.service.data.DeviceDataService; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -21,16 +21,16 @@ public class DeviceDynamicDashboard implements DeviceDashboard { private final EventBus eventBus; - private final TimeSeriesManager timeSeriesManager; + private final DeviceDataService dataService; public DeviceDynamicDashboard(LocalDeviceProductService productService, DeviceRegistry registry, - EventBus eventBus, - TimeSeriesManager timeSeriesManager) { + DeviceDataService deviceDataService, + EventBus eventBus) { this.productService = productService; this.registry = registry; this.eventBus = eventBus; - this.timeSeriesManager = timeSeriesManager; + this.dataService = deviceDataService; } @PostConstruct @@ -53,6 +53,6 @@ public class DeviceDynamicDashboard implements DeviceDashboard { protected Mono convertObject(DeviceProductEntity product) { return registry.getProduct(product.getId()) - .map(operator -> DeviceDashboardObject.of(product.getId(), product.getName(), operator, eventBus, timeSeriesManager)); + .map(operator -> DeviceDashboardObject.of(product.getId(), product.getName(), operator, eventBus, dataService)); } } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceEventMeasurement.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceEventMeasurement.java index a2690e9b..f9e79616 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceEventMeasurement.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceEventMeasurement.java @@ -1,9 +1,6 @@ package org.jetlinks.community.device.measurements; import org.hswebframework.web.api.crud.entity.QueryParamEntity; -import org.jetlinks.community.dashboard.*; -import org.jetlinks.community.dashboard.supports.StaticMeasurement; -import org.jetlinks.community.timeseries.TimeSeriesService; import org.jetlinks.core.event.EventBus; import org.jetlinks.core.event.Subscription; import org.jetlinks.core.message.DeviceMessage; @@ -14,6 +11,9 @@ import org.jetlinks.core.metadata.DefaultConfigMetadata; import org.jetlinks.core.metadata.EventMetadata; import org.jetlinks.core.metadata.types.IntType; import org.jetlinks.core.metadata.types.StringType; +import org.jetlinks.community.dashboard.*; +import org.jetlinks.community.dashboard.supports.StaticMeasurement; +import org.jetlinks.community.device.service.data.DeviceDataService; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -23,19 +23,19 @@ class DeviceEventMeasurement extends StaticMeasurement { public EventBus eventBus; - private final TimeSeriesService eventTsService; + private final DeviceDataService deviceDataService; private final String productId; public DeviceEventMeasurement(String productId, EventBus eventBus, EventMetadata eventMetadata, - TimeSeriesService eventTsService) { + DeviceDataService deviceDataService) { super(MetadataMeasurementDefinition.of(eventMetadata)); this.productId = productId; this.eventBus = eventBus; this.eventMetadata = eventMetadata; - this.eventTsService = eventTsService; + this.deviceDataService = deviceDataService; addDimension(new RealTimeDeviceEventDimension()); } @@ -48,14 +48,14 @@ class DeviceEventMeasurement extends StaticMeasurement { return history <= 0 ? Flux.empty() : QueryParamEntity.newQuery() .doPaging(0, history) .where("deviceId", deviceId) - .execute(eventTsService::query) - .map(data -> SimpleMeasurementValue.of(data.getData(), data.getTimestamp())) + .execute(q->deviceDataService.queryEvent(deviceId,eventMetadata.getId(),q,false)) + .map(data -> SimpleMeasurementValue.of(data, data.getTimestamp())) .sort(MeasurementValue.sort()); } Flux fromRealTime(String deviceId) { - org.jetlinks.core.event.Subscription subscription = org.jetlinks.core.event.Subscription + Subscription subscription = Subscription .of("deviceEventMeasurement", "/device/" + productId + "/" + deviceId + "/message/event/" + eventMetadata.getId(), Subscription.Feature.local); return eventBus diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceEventsMeasurement.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceEventsMeasurement.java index 0ea3bfe7..e5254cfb 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceEventsMeasurement.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DeviceEventsMeasurement.java @@ -1,10 +1,6 @@ package org.jetlinks.community.device.measurements; import org.hswebframework.web.api.crud.entity.QueryParamEntity; -import org.jetlinks.community.dashboard.*; -import org.jetlinks.community.dashboard.supports.StaticMeasurement; -import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric; -import org.jetlinks.community.timeseries.TimeSeriesManager; import org.jetlinks.core.event.EventBus; import org.jetlinks.core.event.Subscription; import org.jetlinks.core.message.DeviceMessage; @@ -12,6 +8,9 @@ import org.jetlinks.core.message.event.EventMessage; import org.jetlinks.core.metadata.*; import org.jetlinks.core.metadata.types.ObjectType; import org.jetlinks.core.metadata.types.StringType; +import org.jetlinks.community.dashboard.*; +import org.jetlinks.community.dashboard.supports.StaticMeasurement; +import org.jetlinks.community.device.service.data.DeviceDataService; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -23,7 +22,7 @@ class DeviceEventsMeasurement extends StaticMeasurement { private final EventBus eventBus; - private final TimeSeriesManager timeSeriesManager; + private final DeviceDataService deviceDataService; private final DeviceMetadata metadata; @@ -32,11 +31,11 @@ class DeviceEventsMeasurement extends StaticMeasurement { public DeviceEventsMeasurement(String productId, EventBus eventBus, DeviceMetadata deviceMetadata, - TimeSeriesManager timeSeriesManager) { + DeviceDataService deviceDataService) { super(MeasurementDefinition.of("events", "事件记录")); this.productId = productId; this.eventBus = eventBus; - this.timeSeriesManager = timeSeriesManager; + this.deviceDataService = deviceDataService; this.metadata = deviceMetadata; addDimension(new RealTimeDevicePropertyDimension()); } @@ -47,9 +46,8 @@ class DeviceEventsMeasurement extends StaticMeasurement { return history <= 0 ? Flux.empty() : Flux.fromIterable(metadata.getEvents()) .flatMap(event -> QueryParamEntity.newQuery() .doPaging(0, history) - .where("deviceId", deviceId) - .execute(timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceEventMetric(productId, event.getId()))::query) - .map(data -> SimpleMeasurementValue.of(createValue(event.getId(), data.getData()), data.getTimestamp())) + .execute(q -> deviceDataService.queryEvent(deviceId, event.getId(), q, false)) + .map(data -> SimpleMeasurementValue.of(createValue(event.getId(), data), data.getTimestamp())) .sort(MeasurementValue.sort())); } @@ -61,10 +59,10 @@ class DeviceEventsMeasurement extends StaticMeasurement { } Flux fromRealTime(String deviceId) { - org.jetlinks.core.event.Subscription subscription = org.jetlinks.core.event.Subscription.of( + Subscription subscription = Subscription.of( "realtime-device-events-measurement", "/device/" + productId + "/" + deviceId + "/message/event/*", - org.jetlinks.core.event.Subscription.Feature.local, Subscription.Feature.broker + Subscription.Feature.local, Subscription.Feature.broker ); return eventBus @@ -117,17 +115,18 @@ class DeviceEventsMeasurement extends StaticMeasurement { @Override public Flux getValue(MeasurementParameter parameter) { - return Mono.justOrEmpty(parameter.getString("deviceId")) + return Mono + .justOrEmpty(parameter.getString("deviceId")) .flatMapMany(deviceId -> { int history = parameter.getInt("history").orElse(0); - //合并历史数据和实时数据 - return Flux.concat( - //查询历史数据 - fromHistory(deviceId, history) - , - //从消息网关订阅实时事件消息 - fromRealTime(deviceId) - ); + return //合并历史数据和实时数据 + Flux.concat( + //查询历史数据 + fromHistory(deviceId, history) + , + //从消息网关订阅实时事件消息 + fromRealTime(deviceId) + ); }); } } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java index 898f277c..19b5c9bb 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertiesMeasurement.java @@ -1,57 +1,54 @@ package org.jetlinks.community.device.measurements; +import lombok.extern.slf4j.Slf4j; import org.hswebframework.web.api.crud.entity.QueryParamEntity; -import org.jetlinks.community.dashboard.*; -import org.jetlinks.community.dashboard.supports.StaticMeasurement; -import org.jetlinks.community.timeseries.TimeSeriesService; import org.jetlinks.core.event.EventBus; import org.jetlinks.core.event.Subscription; import org.jetlinks.core.message.DeviceMessage; -import org.jetlinks.core.message.property.ReadPropertyMessageReply; -import org.jetlinks.core.message.property.ReportPropertyMessage; -import org.jetlinks.core.message.property.WritePropertyMessageReply; import org.jetlinks.core.metadata.*; import org.jetlinks.core.metadata.types.ObjectType; import org.jetlinks.core.metadata.types.StringType; +import org.jetlinks.community.dashboard.*; +import org.jetlinks.community.dashboard.supports.StaticMeasurement; +import org.jetlinks.community.device.service.data.DeviceDataService; +import org.jetlinks.community.gateway.DeviceMessageUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.HashMap; import java.util.Map; +@Slf4j class DevicePropertiesMeasurement extends StaticMeasurement { private final EventBus eventBus; - private final TimeSeriesService timeSeriesService; - private final DeviceMetadata metadata; + private final DeviceDataService dataService; + private final String productId; public DevicePropertiesMeasurement(String productId, EventBus eventBus, - DeviceMetadata deviceMetadata, - TimeSeriesService timeSeriesService) { + DeviceDataService dataService, + DeviceMetadata deviceMetadata) { super(MeasurementDefinition.of("properties", "属性记录")); this.productId = productId; this.eventBus = eventBus; - this.timeSeriesService = timeSeriesService; this.metadata = deviceMetadata; + this.dataService = dataService; addDimension(new RealTimeDevicePropertyDimension()); addDimension(new HistoryDevicePropertyDimension()); } Flux fromHistory(String deviceId, int history) { - return history <= 0 ? Flux.empty() : Flux.fromIterable(metadata.getProperties()) - .flatMap(propertyMetadata -> QueryParamEntity.newQuery() - .doPaging(0, history) - .where("deviceId", deviceId) - .and("property", propertyMetadata.getId()) - .execute(timeSeriesService::query) - .map(data -> SimpleMeasurementValue.of(createValue(propertyMetadata.getId(), data.get("value").orElse(null)), data.getTimestamp())) - .sort(MeasurementValue.sort())); + return history <= 0 ? Flux.empty() : QueryParamEntity.newQuery() + .doPaging(0, history) + .execute(q -> dataService.queryEachProperties(deviceId, q)) + .map(data -> SimpleMeasurementValue.of(data, data.getTimestamp())) + .sort(MeasurementValue.sort()); } Map createValue(String property, Object value) { @@ -77,32 +74,22 @@ class DevicePropertiesMeasurement extends StaticMeasurement { Flux fromRealTime(String deviceId) { - org.jetlinks.core.event.Subscription subscription= org.jetlinks.core.event.Subscription.of( + Subscription subscription = Subscription.of( "realtime-device-properties-measurement", new String[]{ "/device/" + productId + "/" + deviceId + "/message/property/report", "/device/" + productId + "/" + deviceId + "/message/property/*/reply" }, - org.jetlinks.core.event.Subscription.Feature.local, Subscription.Feature.broker + Subscription.Feature.local, Subscription.Feature.broker ); return eventBus .subscribe(subscription, DeviceMessage.class) - .flatMap(msg -> { - if (msg instanceof ReportPropertyMessage) { - return Mono.justOrEmpty(((ReportPropertyMessage) msg).getProperties()); - } - if (msg instanceof ReadPropertyMessageReply) { - return Mono.justOrEmpty(((ReadPropertyMessageReply) msg).getProperties()); - } - if (msg instanceof WritePropertyMessageReply) { - return Mono.justOrEmpty(((WritePropertyMessageReply) msg).getProperties()); - } - return Mono.empty(); - }) + .flatMap(msg -> Mono.justOrEmpty(DeviceMessageUtils.tryGetProperties(msg))) .flatMap(map -> Flux.fromIterable(map.entrySet())) - .map(kv -> SimpleMeasurementValue.of(createValue(kv.getKey(), kv.getValue()), System.currentTimeMillis())) + .map(kv -> SimpleMeasurementValue.of(createValue(kv.getKey(), kv.getValue()), System.currentTimeMillis())) + .onErrorContinue((err, v) -> log.error(err.getMessage(), err)) ; } @@ -110,7 +97,7 @@ class DevicePropertiesMeasurement extends StaticMeasurement { .add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector")); /** - * 历史设备事件 + * 历史 */ private class HistoryDevicePropertyDimension implements MeasurementDimension { @@ -122,9 +109,9 @@ class DevicePropertiesMeasurement extends StaticMeasurement { @Override public DataType getValueType() { return new ObjectType() - .addProperty("property","属性", StringType.GLOBAL) - .addProperty("value","值", StringType.GLOBAL) - .addProperty("formatValue","格式化值", StringType.GLOBAL); + .addProperty("property", "属性", StringType.GLOBAL) + .addProperty("value", "值", StringType.GLOBAL) + .addProperty("formatValue", "格式化值", StringType.GLOBAL); } @Override @@ -149,7 +136,7 @@ class DevicePropertiesMeasurement extends StaticMeasurement { } /** - * 实时设备事件 + * 实时 */ private class RealTimeDevicePropertyDimension implements MeasurementDimension { @@ -161,9 +148,9 @@ class DevicePropertiesMeasurement extends StaticMeasurement { @Override public DataType getValueType() { return new ObjectType() - .addProperty("property","属性", StringType.GLOBAL) - .addProperty("value","值", StringType.GLOBAL) - .addProperty("formatValue","格式化值", StringType.GLOBAL); + .addProperty("property", "属性", StringType.GLOBAL) + .addProperty("value", "值", StringType.GLOBAL) + .addProperty("formatValue", "格式化值", StringType.GLOBAL); } @Override @@ -182,7 +169,7 @@ class DevicePropertiesMeasurement extends StaticMeasurement { .flatMapMany(deviceId -> { int history = parameter.getInt("history").orElse(0); //合并历史数据和实时数据 - return Flux.concat( + return Flux.concat( //查询历史数据 fromHistory(deviceId, history) , diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertyMeasurement.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertyMeasurement.java index 5533e4dd..38348f54 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertyMeasurement.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertyMeasurement.java @@ -2,27 +2,27 @@ package org.jetlinks.community.device.measurements; import org.hswebframework.utils.time.DateFormatter; import org.hswebframework.web.api.crud.entity.QueryParamEntity; -import org.jetlinks.community.Interval; -import org.jetlinks.community.dashboard.*; -import org.jetlinks.community.dashboard.supports.StaticMeasurement; -import org.jetlinks.community.timeseries.TimeSeriesService; -import org.jetlinks.community.timeseries.query.Aggregation; -import org.jetlinks.community.timeseries.query.AggregationQueryParam; import org.jetlinks.core.event.EventBus; import org.jetlinks.core.message.DeviceMessage; -import org.jetlinks.core.message.property.ReadPropertyMessageReply; -import org.jetlinks.core.message.property.ReportPropertyMessage; -import org.jetlinks.core.message.property.WritePropertyMessageReply; import org.jetlinks.core.metadata.*; import org.jetlinks.core.metadata.types.IntType; import org.jetlinks.core.metadata.types.NumberType; import org.jetlinks.core.metadata.types.ObjectType; import org.jetlinks.core.metadata.types.StringType; +import org.jetlinks.community.Interval; +import org.jetlinks.community.dashboard.*; +import org.jetlinks.community.dashboard.supports.StaticMeasurement; +import org.jetlinks.community.device.service.data.DeviceDataService; +import org.jetlinks.community.gateway.DeviceMessageUtils; +import org.jetlinks.community.timeseries.query.Aggregation; +import org.jetlinks.community.timeseries.query.AggregationData; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.time.LocalDateTime; -import java.time.ZoneId; import java.util.Date; import java.util.HashMap; import java.util.Map; @@ -33,19 +33,19 @@ class DevicePropertyMeasurement extends StaticMeasurement { private final EventBus eventBus; - private final TimeSeriesService timeSeriesService; + private final DeviceDataService deviceDataService; private final String productId; public DevicePropertyMeasurement(String productId, EventBus eventBus, PropertyMetadata metadata, - TimeSeriesService timeSeriesService) { + DeviceDataService deviceDataService) { super(MetadataMeasurementDefinition.of(metadata)); this.productId = productId; this.eventBus = eventBus; this.metadata = metadata; - this.timeSeriesService = timeSeriesService; + this.deviceDataService = deviceDataService; addDimension(new RealTimeDevicePropertyDimension()); addDimension(new HistoryDevicePropertyDimension()); if (metadata.getValueType() instanceof NumberType) { @@ -64,12 +64,15 @@ class DevicePropertyMeasurement extends StaticMeasurement { } Flux fromHistory(String deviceId, int history) { - return history <= 0 ? Flux.empty() : QueryParamEntity.newQuery() + return history <= 0 + ? Flux.empty() + : QueryParamEntity + .newQuery() .doPaging(0, history) .where("deviceId", deviceId) .and("property", metadata.getId()) - .execute(timeSeriesService::query) - .map(data -> SimpleMeasurementValue.of(createValue(data.get("value").orElse(null)), data.getTimestamp())) + .execute(q -> deviceDataService.queryProperty(deviceId, q, metadata.getId())) + .map(data -> SimpleMeasurementValue.of(data, data.getTimestamp())) .sort(MeasurementValue.sort()); } @@ -85,18 +88,7 @@ class DevicePropertyMeasurement extends StaticMeasurement { return eventBus .subscribe(subscription, DeviceMessage.class) - .flatMap(msg -> { - if (msg instanceof ReportPropertyMessage) { - return Mono.justOrEmpty(((ReportPropertyMessage) msg).getProperties()); - } - if (msg instanceof ReadPropertyMessageReply) { - return Mono.justOrEmpty(((ReadPropertyMessageReply) msg).getProperties()); - } - if (msg instanceof WritePropertyMessageReply) { - return Mono.justOrEmpty(((WritePropertyMessageReply) msg).getProperties()); - } - return Mono.empty(); - }) + .flatMap(msg -> Mono.justOrEmpty(DeviceMessageUtils.tryGetProperties(msg))) .filter(msg -> msg.containsKey(metadata.getId())) .map(msg -> SimpleMeasurementValue.of(createValue(msg.get(metadata.getId())), System.currentTimeMillis())); } @@ -150,23 +142,38 @@ class DevicePropertyMeasurement extends StaticMeasurement { @Override public Flux getValue(MeasurementParameter parameter) { - return AggregationQueryParam.of() - .agg("numberValue", "value", parameter.getString("agg").map(String::toUpperCase).map(Aggregation::valueOf).orElse(Aggregation.AVG)) - .filter(query -> query - .where("property", metadata.getId()) - .and("deviceId", parameter.getString("deviceId").orElse(null)) - ) - .limit(parameter.getInt("limit", 10)) - .groupBy(parameter.getInterval("time", Interval.ofSeconds(10)), parameter.getString("format", "HH:mm:ss")) - .from(parameter.getDate("from").orElseGet(() -> Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant()))) - .to(parameter.getDate("to").orElse(new Date())) - .execute(timeSeriesService::aggregation) - .index((index, data) -> SimpleMeasurementValue.of( - createValue(data.getInt("value").orElse(0)), - data.getString("time").orElse(""), - index)) - .sort(); + String deviceId = parameter.getString("deviceId", null); + DeviceDataService.AggregationRequest request = new DeviceDataService.AggregationRequest(); + DeviceDataService.DevicePropertyAggregation aggregation = new DeviceDataService.DevicePropertyAggregation( + metadata.getId(), metadata.getId(), parameter.getString("agg").map(String::toUpperCase).map(Aggregation::valueOf).orElse(Aggregation.AVG) + ); + String format = parameter.getString("format", "HH:mm:ss"); + DateTimeFormatter formatter = DateTimeFormat.forPattern(format); + request.setLimit(parameter.getInt("limit", 10)); + request.setInterval(parameter.getInterval("time", Interval.ofSeconds(10))); + request.setFormat(format); + request.setFrom(parameter.getDate("from", DateTime.now().plusDays(-1).toDate())); + request.setTo(parameter.getDate("to", DateTime.now().plusDays(-1).toDate())); + Flux dataFlux; + + if (StringUtils.hasText(deviceId)) { + dataFlux = deviceDataService + .aggregationPropertiesByDevice(deviceId, request, aggregation); + } else { + dataFlux = deviceDataService.aggregationPropertiesByProduct(productId, request, aggregation); + } + return dataFlux + .map(data -> { + long ts = data.getString("time") + .map(time -> DateTime.parse(time, formatter).getMillis()) + .orElse(System.currentTimeMillis()); + return SimpleMeasurementValue.of(createValue( + data.get(metadata.getId()).orElse(0)), + data.getString("time",""), + ts); + }) + .sort(); } } @@ -203,18 +210,15 @@ class DevicePropertyMeasurement extends StaticMeasurement { return Mono.justOrEmpty(parameter.getString("deviceId")) .flatMapMany(deviceId -> { int history = parameter.getInt("history").orElse(1); - - return QueryParamEntity.newQuery() + return QueryParamEntity.newQuery() .doPaging(0, history) - .where("deviceId", deviceId) - .and("property", metadata.getId()) .as(query -> query .gte("timestamp", parameter.getDate("from").orElse(null)) .lte("timestamp", parameter.getDate("to").orElse(null))) - .execute(timeSeriesService::query) + .execute(q -> deviceDataService.queryProperty(deviceId, q, metadata.getId())) .map(data -> SimpleMeasurementValue.of( - createValue(data.get("value").orElse(null)), - DateFormatter.toString(new Date(data.getTimestamp()), parameter.getString("timeFormat","HH:mm:ss")), + data, + DateFormatter.toString(new Date(data.getTimestamp()), parameter.getString("timeFormat", "HH:mm:ss")), data.getTimestamp())) .sort(MeasurementValue.sort()); }); @@ -254,14 +258,14 @@ class DevicePropertyMeasurement extends StaticMeasurement { return Mono.justOrEmpty(parameter.getString("deviceId")) .flatMapMany(deviceId -> { int history = parameter.getInt("history").orElse(0); - //合并历史数据和实时数据 - return Flux.concat( - //查询历史数据 - fromHistory(deviceId, history) - , - //从消息网关订阅实时事件消息 - fromRealTime(deviceId) - ); + return //合并历史数据和实时数据 + Flux.concat( + //查询历史数据 + fromHistory(deviceId, history) + , + //从消息网关订阅实时事件消息 + fromRealTime(deviceId) + ); }); } } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurement.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurement.java index d20cdceb..66fd8277 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurement.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurement.java @@ -1,11 +1,7 @@ package org.jetlinks.community.device.measurements.message; -import org.jetlinks.community.Interval; -import org.jetlinks.community.dashboard.*; -import org.jetlinks.community.dashboard.supports.StaticMeasurement; -import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric; -import org.jetlinks.community.timeseries.TimeSeriesManager; -import org.jetlinks.community.timeseries.query.AggregationQueryParam; +import org.jetlinks.core.device.DeviceProductOperator; +import org.jetlinks.core.device.DeviceRegistry; import org.jetlinks.core.event.EventBus; import org.jetlinks.core.event.Subscription; import org.jetlinks.core.metadata.ConfigMetadata; @@ -14,12 +10,25 @@ import org.jetlinks.core.metadata.DefaultConfigMetadata; import org.jetlinks.core.metadata.types.DateTimeType; import org.jetlinks.core.metadata.types.IntType; import org.jetlinks.core.metadata.types.StringType; +import org.jetlinks.community.Interval; +import org.jetlinks.community.dashboard.*; +import org.jetlinks.community.dashboard.supports.StaticMeasurement; +import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric; +import org.jetlinks.community.timeseries.TimeSeriesManager; +import org.jetlinks.community.timeseries.TimeSeriesMetric; +import org.jetlinks.community.timeseries.query.AggregationData; +import org.jetlinks.community.timeseries.query.AggregationQueryParam; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import java.time.Duration; import java.time.LocalDateTime; import java.time.ZoneId; import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; class DeviceMessageMeasurement extends StaticMeasurement { @@ -27,11 +36,14 @@ class DeviceMessageMeasurement extends StaticMeasurement { private final TimeSeriesManager timeSeriesManager; + private final DeviceRegistry deviceRegistry; static MeasurementDefinition definition = MeasurementDefinition.of("quantity", "设备消息量"); public DeviceMessageMeasurement(EventBus eventBus, + DeviceRegistry registry, TimeSeriesManager timeSeriesManager) { super(definition); + this.deviceRegistry = registry; this.eventBus = eventBus; this.timeSeriesManager = timeSeriesManager; addDimension(new RealTimeMessageDimension()); @@ -65,9 +77,11 @@ class DeviceMessageMeasurement extends StaticMeasurement { @Override public Flux getValue(MeasurementParameter parameter) { + + //通过订阅消息来统计实时数据量 return eventBus - .subscribe(org.jetlinks.core.event.Subscription.of("real-time-device-message", "/device/**", org.jetlinks.core.event.Subscription.Feature.local, Subscription.Feature.broker)) + .subscribe(Subscription.of("real-time-device-message", "/device/**", Subscription.Feature.local, Subscription.Feature.broker)) .window(parameter.getDuration("interval").orElse(Duration.ofSeconds(1))) .flatMap(Flux::count) .map(total -> SimpleMeasurementValue.of(total, System.currentTimeMillis())); @@ -107,10 +121,41 @@ class DeviceMessageMeasurement extends StaticMeasurement { return false; } + private AggregationQueryParam createQueryParam(MeasurementParameter parameter) { + return AggregationQueryParam.of() +// .sum("count") + .groupBy( + parameter.getInterval("time").orElse(Interval.ofHours(1)), + parameter.getString("format").orElse("MM月dd日 HH时")) +// .filter(query -> +// query +// .where("name", "message-count") +// .is("productId", parameter.getString("productId").orElse(null)) +// .is("msgType", parameter.getString("msgType").orElse(null)) +// ) + .limit(parameter.getInt("limit").orElse(1)) + .from(parameter.getDate("from").orElseGet(() -> Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant()))) + .to(parameter.getDate("to").orElse(new Date())); + } + + private Mono getProductMetrics(List productIdList) { + return Flux + .fromIterable(productIdList) + .flatMap(id -> deviceRegistry + .getProduct(id) + .flatMap(DeviceProductOperator::getMetadata) + .onErrorResume(err -> Mono.empty()) + .flatMapMany(metadata -> Flux.fromIterable(metadata.getEvents()) + .map(event -> DeviceTimeSeriesMetric.deviceEventMetric(id, event.getId()))) + .concatWithValues(DeviceTimeSeriesMetric.devicePropertyMetric(id))) + .collectList() + .map(list -> list.toArray(new TimeSeriesMetric[0])); + } + @Override public Flux getValue(MeasurementParameter parameter) { - return AggregationQueryParam.of() + return AggregationQueryParam.of() .sum("count") .groupBy( parameter.getInterval("time").orElse(Interval.ofHours(1)), diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurementProvider.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurementProvider.java index 97a35fa1..9dbdd985 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurementProvider.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/message/DeviceMessageMeasurementProvider.java @@ -1,6 +1,9 @@ package org.jetlinks.community.device.measurements.message; import io.micrometer.core.instrument.MeterRegistry; +import org.jetlinks.core.device.DeviceRegistry; +import org.jetlinks.core.event.EventBus; +import org.jetlinks.core.message.DeviceMessage; import org.jetlinks.community.dashboard.supports.StaticMeasurementProvider; import org.jetlinks.community.device.measurements.DeviceDashboardDefinition; import org.jetlinks.community.device.measurements.DeviceObjectDefinition; @@ -8,25 +11,24 @@ import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric; import org.jetlinks.community.gateway.annotation.Subscribe; import org.jetlinks.community.micrometer.MeterRegistryManager; import org.jetlinks.community.timeseries.TimeSeriesManager; -import org.jetlinks.core.event.EventBus; -import org.jetlinks.core.message.DeviceMessage; import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; @Component public class DeviceMessageMeasurementProvider extends StaticMeasurementProvider { - MeterRegistry registry; public DeviceMessageMeasurementProvider(EventBus eventBus, MeterRegistryManager registryManager, + DeviceRegistry deviceRegistry, TimeSeriesManager timeSeriesManager) { super(DeviceDashboardDefinition.instance, DeviceObjectDefinition.message); + registry = registryManager.getMeterRegister(DeviceTimeSeriesMetric.deviceMetrics().getId(), "target", "msgType", "productId"); - addMeasurement(new DeviceMessageMeasurement(eventBus, timeSeriesManager)); + addMeasurement(new DeviceMessageMeasurement(eventBus, deviceRegistry, timeSeriesManager)); } @@ -46,7 +48,6 @@ public class DeviceMessageMeasurementProvider extends StaticMeasurementProvider return empty; } return new String[]{ - "msgType", message.getMessageType().name().toLowerCase(), "productId", message.getHeader("productId").map(String::valueOf).orElse("unknown") }; } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusChangeMeasurement.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusChangeMeasurement.java index 960b4bd2..25480084 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusChangeMeasurement.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusChangeMeasurement.java @@ -1,11 +1,5 @@ package org.jetlinks.community.device.measurements.status; -import org.jetlinks.community.Interval; -import org.jetlinks.community.dashboard.*; -import org.jetlinks.community.dashboard.supports.StaticMeasurement; -import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric; -import org.jetlinks.community.timeseries.TimeSeriesManager; -import org.jetlinks.community.timeseries.query.AggregationQueryParam; import org.jetlinks.core.event.EventBus; import org.jetlinks.core.event.Subscription; import org.jetlinks.core.message.DeviceMessage; @@ -17,6 +11,15 @@ import org.jetlinks.core.metadata.types.DateTimeType; import org.jetlinks.core.metadata.types.EnumType; import org.jetlinks.core.metadata.types.IntType; import org.jetlinks.core.metadata.types.StringType; +import org.jetlinks.community.Interval; +import org.jetlinks.community.dashboard.*; +import org.jetlinks.community.dashboard.supports.StaticMeasurement; +import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric; +import org.jetlinks.community.timeseries.TimeSeriesManager; +import org.jetlinks.community.timeseries.query.AggregationQueryParam; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -32,7 +35,7 @@ class DeviceStatusChangeMeasurement extends StaticMeasurement { private final TimeSeriesManager timeSeriesManager; - static final MeasurementDefinition definition = MeasurementDefinition.of("change", "设备状态变更"); + static MeasurementDefinition definition = MeasurementDefinition.of("change", "设备状态变更"); static ConfigMetadata configMetadata = new DefaultConfigMetadata() .add("deviceId", "设备", "指定设备", new StringType().expand("selector", "device-selector")); @@ -88,11 +91,12 @@ class DeviceStatusChangeMeasurement extends StaticMeasurement { @Override public Flux getValue(MeasurementParameter parameter) { + String format = parameter.getString("format").orElse("yyyy年MM月dd日"); + DateTimeFormatter formatter = DateTimeFormat.forPattern(format); return AggregationQueryParam.of() .sum("count") - .groupBy(parameter.getInterval("time").orElse(Interval.ofHours(1)), - parameter.getString("format").orElse("MM月dd日 HH时")) + .groupBy(parameter.getInterval("time", Interval.ofDays(1)), format) .filter(query -> query.where("name", parameter.getString("type").orElse("online")) .is("productId", parameter.getString("productId").orElse(null)) @@ -101,10 +105,15 @@ class DeviceStatusChangeMeasurement extends StaticMeasurement { .from(parameter.getDate("from").orElse(Date.from(LocalDateTime.now().plusDays(-1).atZone(ZoneId.systemDefault()).toInstant()))) .to(parameter.getDate("to").orElse(new Date())) .execute(timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceMetrics())::aggregation) - .index((index, data) -> SimpleMeasurementValue.of( - data.getInt("count").orElse(0), - data.getString("time").orElse(""), - index)) + .map(data -> { + long ts = data.getString("time") + .map(time -> DateTime.parse(time, formatter).getMillis()) + .orElse(System.currentTimeMillis()); + return SimpleMeasurementValue.of( + data.get("count").orElse(0), + data.getString("time", ""), + ts); + }) .sort(); } } @@ -124,7 +133,6 @@ class DeviceStatusChangeMeasurement extends StaticMeasurement { return type; } - @Override public ConfigMetadata getParams() { return configMetadata; @@ -137,21 +145,20 @@ class DeviceStatusChangeMeasurement extends StaticMeasurement { @Override public Flux getValue(MeasurementParameter parameter) { + + return Mono.justOrEmpty(parameter.getString("deviceId")) - .flatMapMany(deviceId -> { - //从消息网关订阅消息 - return eventBus.subscribe(org.jetlinks.core.event.Subscription.of( + .flatMapMany(deviceId ->//从消息网关订阅消息 + eventBus.subscribe(Subscription.of( "RealTimeDeviceStateDimension" , new String[]{ "/device/*/" + deviceId + "/online", "/device/*/" + deviceId + "/offline" }, - org.jetlinks.core.event.Subscription.Feature.local, + Subscription.Feature.local, Subscription.Feature.broker ), DeviceMessage.class) - .map(msg -> SimpleMeasurementValue.of(createStateValue(msg), msg.getTimestamp())) - ; - }); + .map(msg -> SimpleMeasurementValue.of(createStateValue(msg), msg.getTimestamp()))); } Map createStateValue(DeviceMessage message) { diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusMeasurementProvider.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusMeasurementProvider.java index e24bbf7e..5a26350a 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusMeasurementProvider.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusMeasurementProvider.java @@ -2,6 +2,8 @@ package org.jetlinks.community.device.measurements.status; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; +import org.jetlinks.core.event.EventBus; +import org.jetlinks.core.message.DeviceMessage; import org.jetlinks.community.dashboard.supports.StaticMeasurementProvider; import org.jetlinks.community.device.measurements.DeviceDashboardDefinition; import org.jetlinks.community.device.measurements.DeviceObjectDefinition; @@ -10,8 +12,6 @@ import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric; import org.jetlinks.community.gateway.annotation.Subscribe; import org.jetlinks.community.micrometer.MeterRegistryManager; import org.jetlinks.community.timeseries.TimeSeriesManager; -import org.jetlinks.core.event.EventBus; -import org.jetlinks.core.message.DeviceMessage; import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; @@ -51,8 +51,8 @@ public class DeviceStatusMeasurementProvider extends StaticMeasurementProvider { } @Subscribe("/device/*/*/online") - public Mono incrementOnline(DeviceMessage msg) { - return Mono.fromRunnable(() -> { + public Mono incrementOnline(DeviceMessage msg){ + return Mono.fromRunnable(()->{ String productId = parseProductId(msg); counterAdder.apply(productId).increment(); registry @@ -62,18 +62,18 @@ public class DeviceStatusMeasurementProvider extends StaticMeasurementProvider { } @Subscribe("/device/*/*/offline") - public Mono incrementOffline(DeviceMessage msg) { - return Mono.fromRunnable(() -> { + public Mono incrementOffline(DeviceMessage msg){ + return Mono.fromRunnable(()->{ String productId = parseProductId(msg); - counterAdder.apply(productId).increment(); + // counterAdder.apply(productId).decrement(); registry .counter("offline", "productId", productId) .increment(); }); } - private String parseProductId(DeviceMessage deviceMessage) { - return deviceMessage + private String parseProductId(DeviceMessage msg) { + return msg .getHeader("productId") .map(String::valueOf) .orElse("unknown"); diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusRecordMeasurement.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusRecordMeasurement.java index c19e859c..3037c8d8 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusRecordMeasurement.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/status/DeviceStatusRecordMeasurement.java @@ -1,6 +1,5 @@ package org.jetlinks.community.device.measurements.status; -import org.jetlinks.community.Interval; import org.jetlinks.core.metadata.ConfigMetadata; import org.jetlinks.core.metadata.DataType; import org.jetlinks.core.metadata.DefaultConfigMetadata; @@ -8,6 +7,7 @@ import org.jetlinks.core.metadata.types.DateTimeType; import org.jetlinks.core.metadata.types.EnumType; import org.jetlinks.core.metadata.types.IntType; import org.jetlinks.core.metadata.types.StringType; +import org.jetlinks.community.Interval; import org.jetlinks.community.dashboard.*; import org.jetlinks.community.dashboard.supports.StaticMeasurement; import org.jetlinks.community.device.entity.DeviceInstanceEntity; @@ -16,10 +16,12 @@ import org.jetlinks.community.device.service.LocalDeviceInstanceService; import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric; import org.jetlinks.community.timeseries.TimeSeriesManager; import org.jetlinks.community.timeseries.query.AggregationQueryParam; +import org.joda.time.DateTime; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.time.Duration; import java.time.LocalDateTime; import java.time.ZoneId; import java.util.Date; @@ -27,9 +29,9 @@ import java.util.Date; class DeviceStatusRecordMeasurement extends StaticMeasurement { - private final LocalDeviceInstanceService instanceService; + public LocalDeviceInstanceService instanceService; - private final TimeSeriesManager timeSeriesManager; + private TimeSeriesManager timeSeriesManager; static MeasurementDefinition definition = MeasurementDefinition.of("record", "设备状态记录"); @@ -76,6 +78,9 @@ class DeviceStatusRecordMeasurement @Override public Flux getValue(MeasurementParameter parameter) { + String format = parameter.getString("format").orElse("yyyy年MM月dd日"); + DateTimeFormatter formatter = DateTimeFormat.forPattern(format); + return AggregationQueryParam.of() .max("value") .filter(query -> @@ -88,10 +93,15 @@ class DeviceStatusRecordMeasurement parameter.getString("format").orElse("yyyy年MM月dd日")) .limit(parameter.getInt("limit").orElse(10)) .execute(timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceMetrics())::aggregation) - .index((index, data) -> SimpleMeasurementValue.of( - data.getInt("value").orElse(0), - data.getString("time").orElse("-"), - index)) + .map(data -> { + long ts = data.getString("time") + .map(time -> DateTime.parse(time, formatter).getMillis()) + .orElse(System.currentTimeMillis()); + return SimpleMeasurementValue.of( + data.get("value").orElse(0), + data.getString("time", ""), + ts); + }) .sort(); } }