diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java index 84ec96bd..ed7a72b2 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java @@ -2,6 +2,7 @@ package org.jetlinks.community.device.service.data; import com.alibaba.fastjson.JSON; import com.google.common.collect.Maps; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.collections.MapUtils; import org.hswebframework.ezorm.core.param.TermType; @@ -50,6 +51,7 @@ import static org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric.*; * @author zhouhao * @since 1.5.0 */ +@Slf4j public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStoragePolicy { private final AtomicInteger nanoInc = new AtomicInteger(); @@ -87,8 +89,8 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora * @param message 设备属性消息 * @param properties 物模型属性 * @return 数据集合 - * @see this#convertPropertiesForColumnPolicy(String, DeviceMessage, Map) - * @see this#convertPropertiesForRowPolicy(String, DeviceMessage, Map) + * @see AbstractDeviceDataStoragePolicy#convertPropertiesForColumnPolicy(String, DeviceMessage, Map) + * @see AbstractDeviceDataStoragePolicy#convertPropertiesForRowPolicy(String, DeviceMessage, Map) */ protected abstract Flux> convertProperties(String productId, DeviceMessage message, @@ -135,6 +137,14 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora return DigestUtils.md5Hex(String.join("_", message.getDeviceId(), String.valueOf(createUniqueNanoTime(ts)))); } + protected String getDeviceLogMetric(String productId) { + return deviceLogMetricId(productId); + } + + protected String getDeviceEventMetric(String productId, String eventId) { + return deviceEventMetricId(productId, eventId); + } + protected Mono> createDeviceMessageLog(String productId, DeviceMessage message, BiConsumer logEntityConsumer) { @@ -211,37 +221,45 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora * @return 二元组 */ protected Mono> convertEventMessageToTimeSeriesData(String productId, EventMessage message) { - // 设备注册中心获取设备操作接口 - // 获取设备元数据 物模型 - return deviceRegistry - .getDevice(message.getDeviceId()) - .flatMap(device -> device - .getMetadata() - .map(metadata -> { - Object value = message.getData(); - DataType dataType = metadata - .getEvent(message.getEvent()) - .map(EventMetadata::getType) - .orElseGet(UnknownType::new); - Object tempValue = ValueTypeTranslator.translator(value, dataType); - Map data; - if (tempValue instanceof Map) { - @SuppressWarnings("all") - Map mapValue = ((Map) tempValue); - int size = mapValue.size(); - data = newMap(size); - data.putAll(mapValue); - } else { - data = newMap(16); - data.put("value", tempValue); - } - data.put("id", createDataId(message)); - data.put("deviceId", device.getDeviceId()); - data.put("createTime", System.currentTimeMillis()); - return TimeSeriesData.of(TimestampUtils.toMillis(message.getTimestamp()), data); + return deviceRegistry + .getProduct(productId) + .flatMap(product -> product + .getMetadata() + .handle((metadata, sink) -> { + if (metadata.getEventOrNull(message.getEvent()) == null) { + log.warn("产品[{}]物模型中未定义事件:{}", productId, message.getEvent()); + return; + } + Map data = createEventData(message, metadata); + sink.next(TimeSeriesData.of(TimestampUtils.toMillis(message.getTimestamp()), data)); })) - .map(data -> Tuples.of(deviceEventMetricId(productId, message.getEvent()), data)); + .map(data -> Tuples.of(getDeviceEventMetric(productId, message.getEvent()), data)); + } + + protected Map createEventData(EventMessage message, DeviceMetadata metadata) { + Object value = message.getData(); + DataType dataType = metadata + .getEvent(message.getEvent()) + .map(EventMetadata::getType) + .orElseGet(UnknownType::new); + Object tempValue = ValueTypeTranslator.translator(value, dataType); + Map data; + if (tempValue instanceof Map) { + @SuppressWarnings("all") + Map mapValue = ((Map) tempValue); + int size = mapValue.size(); + data = newMap(size); + data.putAll(mapValue); + } else { + data = newMap(16); + data.put("value", tempValue); + } + data.put("id", createDataId(message)); + data.put("deviceId", message.getDeviceId()); + data.put("createTime", System.currentTimeMillis()); + + return data; } @Override @@ -429,7 +447,7 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora .flatMap(entry -> { String id; String property = entry.getT2().getKey(); - long ts = propertySourceTimes.getOrDefault(property,message.getTimestamp()); + long ts = propertySourceTimes.getOrDefault(property, message.getTimestamp()); //忽略存在没有的属性和忽略存储的属性 PropertyMetadata propertyMetadata = metadata.getPropertyOrNull(property); if (propertyMetadata == null || propertyIsIgnoreStorage(propertyMetadata)) {