增加format

This commit is contained in:
zhouhao 2020-02-14 12:03:22 +08:00
parent 8018949808
commit ea639a12bb
1 changed files with 10 additions and 7 deletions

View File

@ -36,16 +36,17 @@ class DevicePropertyMeasurement extends StaticMeasurement {
}
Flux<MeasurementValue> fromHistory(String deviceId, int history) {
Flux<MeasurementValue> fromHistory(String deviceId, int history, boolean format) {
return history <= 0 ? Flux.empty() : QueryParamEntity.newQuery()
.doPaging(0, history)
.where("deviceId", deviceId)
.execute(timeSeriesService::query)
// TODO: 2020/2/7 获取合适的值
.map(data -> SimpleMeasurementValue.of(data.getData().get("value"), data.getTimestamp()));
.map(data -> SimpleMeasurementValue.of(format ?
data.get("formatValue").orElse("/") :
data.get("value").orElse(null), data.getTimestamp()));
}
Flux<MeasurementValue> fromRealTime(String deviceId) {
Flux<MeasurementValue> fromRealTime(String deviceId, boolean format) {
return messageGateway
.subscribe(Stream.of(
"/device/" + deviceId + "/message/property/report"
@ -66,7 +67,7 @@ class DevicePropertyMeasurement extends StaticMeasurement {
return Mono.empty();
})
.filter(msg -> msg.containsKey(metadata.getId()))
.map(msg -> SimpleMeasurementValue.of(msg.get(metadata.getId()), System.currentTimeMillis()));
.map(msg -> SimpleMeasurementValue.of(format ? metadata.getValueType().format(msg.get(metadata.getId())) : msg.get(metadata.getId()), System.currentTimeMillis()));
}
/**
@ -97,16 +98,18 @@ class DevicePropertyMeasurement extends StaticMeasurement {
@Override
public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
boolean format = parameter.getBoolean("format").orElse(true);
return Mono.justOrEmpty(parameter.getString("deviceId"))
.flatMapMany(deviceId -> {
int history = parameter.getInt("history").orElse(0);
//合并历史数据和实时数据
return Flux.concat(
//查询历史数据
fromHistory(deviceId, history)
fromHistory(deviceId, history, format)
,
//从消息网关订阅实时事件消息
fromRealTime(deviceId)
fromRealTime(deviceId, format)
);
});
}