From ea639a12bb890d5fac76d3a271936c55ca611cef Mon Sep 17 00:00:00 2001 From: zhouhao Date: Fri, 14 Feb 2020 12:03:22 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0format?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../measurements/DevicePropertyMeasurement.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertyMeasurement.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertyMeasurement.java index e65ebd13..3a5403a2 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertyMeasurement.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/measurements/DevicePropertyMeasurement.java @@ -36,16 +36,17 @@ class DevicePropertyMeasurement extends StaticMeasurement { } - Flux fromHistory(String deviceId, int history) { + Flux fromHistory(String deviceId, int history, boolean format) { return history <= 0 ? Flux.empty() : QueryParamEntity.newQuery() .doPaging(0, history) .where("deviceId", deviceId) .execute(timeSeriesService::query) - // TODO: 2020/2/7 获取合适的值 - .map(data -> SimpleMeasurementValue.of(data.getData().get("value"), data.getTimestamp())); + .map(data -> SimpleMeasurementValue.of(format ? + data.get("formatValue").orElse("/") : + data.get("value").orElse(null), data.getTimestamp())); } - Flux fromRealTime(String deviceId) { + Flux fromRealTime(String deviceId, boolean format) { return messageGateway .subscribe(Stream.of( "/device/" + deviceId + "/message/property/report" @@ -66,7 +67,7 @@ class DevicePropertyMeasurement extends StaticMeasurement { return Mono.empty(); }) .filter(msg -> msg.containsKey(metadata.getId())) - .map(msg -> SimpleMeasurementValue.of(msg.get(metadata.getId()), System.currentTimeMillis())); + .map(msg -> SimpleMeasurementValue.of(format ? metadata.getValueType().format(msg.get(metadata.getId())) : msg.get(metadata.getId()), System.currentTimeMillis())); } /** @@ -97,16 +98,18 @@ class DevicePropertyMeasurement extends StaticMeasurement { @Override public Flux getValue(MeasurementParameter parameter) { + boolean format = parameter.getBoolean("format").orElse(true); + return Mono.justOrEmpty(parameter.getString("deviceId")) .flatMapMany(deviceId -> { int history = parameter.getInt("history").orElse(0); //合并历史数据和实时数据 return Flux.concat( //查询历史数据 - fromHistory(deviceId, history) + fromHistory(deviceId, history, format) , //从消息网关订阅实时事件消息 - fromRealTime(deviceId) + fromRealTime(deviceId, format) ); }); }