From d449aad80d858dfe459bc2f511dc7f2872c67b1e Mon Sep 17 00:00:00 2001 From: zhou-hao Date: Tue, 6 Apr 2021 18:05:41 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=AE=BE=E5=A4=87=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E5=AD=98=E5=82=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../message/DeviceMessageConnector.java | 2 +- .../data/AbstractDeviceDataStoragePolicy.java | 317 ++++++++++++------ .../device/service/data/StorageConstants.java | 38 +++ 3 files changed, 255 insertions(+), 102 deletions(-) create mode 100755 jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/StorageConstants.java diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java index 41c7ac73..efda209d 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java @@ -44,7 +44,7 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler { private final MessageHandler messageHandler; - private final static BiConsumer doOnError = (error, val) -> log.error(error.getMessage(), error); + private final static BiConsumer doOnError = (error, val) -> DeviceMessageConnector.log.error(error.getMessage(), error); private final static Function> configGetter = operator -> operator.getSelfConfigs(allConfigHeader); diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java index 4424815f..ebaa8d0d 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/AbstractDeviceDataStoragePolicy.java @@ -1,15 +1,18 @@ package org.jetlinks.community.device.service.data; import com.alibaba.fastjson.JSON; +import com.google.common.collect.Maps; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.collections.MapUtils; import org.hswebframework.ezorm.core.param.TermType; import org.hswebframework.web.api.crud.entity.PagerResult; import org.hswebframework.web.api.crud.entity.QueryParamEntity; import org.hswebframework.web.id.IDGenerator; +import org.jetlinks.community.gateway.DeviceMessageUtils; import org.jetlinks.core.device.DeviceConfigKey; import org.jetlinks.core.device.DeviceProductOperator; import org.jetlinks.core.device.DeviceRegistry; +import org.jetlinks.core.message.DeviceLogMessage; import org.jetlinks.core.message.DeviceMessage; import org.jetlinks.core.message.DeviceMessageReply; import org.jetlinks.core.message.Headers; @@ -18,7 +21,7 @@ import org.jetlinks.core.message.property.ReadPropertyMessageReply; import org.jetlinks.core.message.property.ReportPropertyMessage; import org.jetlinks.core.message.property.WritePropertyMessageReply; import org.jetlinks.core.metadata.*; -import org.jetlinks.core.metadata.types.UnknownType; +import org.jetlinks.core.metadata.types.*; import org.jetlinks.community.device.entity.DeviceEvent; import org.jetlinks.community.device.entity.DeviceOperationLogEntity; import org.jetlinks.community.device.entity.DevicePropertiesEntity; @@ -27,6 +30,8 @@ import org.jetlinks.community.device.enums.DeviceLogType; import org.jetlinks.community.device.events.handler.ValueTypeTranslator; import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric; import org.jetlinks.community.timeseries.TimeSeriesData; +import org.jetlinks.core.utils.DeviceMessageTracer; +import org.jetlinks.core.utils.TimestampUtils; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -37,9 +42,14 @@ import javax.annotation.Nonnull; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; +import static org.jetlinks.community.device.service.data.StorageConstants.propertyIsIgnoreStorage; +import static org.jetlinks.community.device.service.data.StorageConstants.propertyIsJsonStringStorage; +import static org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric.*; + /** * 抽象设备数据数据存储,实现一些通用的逻辑 * @@ -110,10 +120,10 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora @Override public Mono saveDeviceMessage(@Nonnull Publisher message) { return Flux.from(message) - .flatMap(this::convertMessageToTimeSeriesData) - .groupBy(Tuple2::getT1) - .flatMap(group -> doSaveData(group.key(), group.map(Tuple2::getT2))) - .then(); + .flatMap(this::convertMessageToTimeSeriesData) + .groupBy(Tuple2::getT1, Integer.MAX_VALUE) + .flatMap(group -> doSaveData(group.key(), group.map(Tuple2::getT2))) + .then(); } protected String createDataId(DeviceMessage message) { @@ -123,67 +133,60 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora protected Mono> createDeviceMessageLog(String productId, DeviceMessage message, - Consumer logEntityConsumer) { + BiConsumer logEntityConsumer) { DeviceOperationLogEntity operationLog = new DeviceOperationLogEntity(); operationLog.setId(IDGenerator.SNOW_FLAKE_STRING.generate()); operationLog.setDeviceId(message.getDeviceId()); - operationLog.setTimestamp(message.getTimestamp()); + operationLog.setTimestamp(TimestampUtils.toMillis(message.getTimestamp())); operationLog.setCreateTime(System.currentTimeMillis()); operationLog.setProductId(productId); operationLog.setMessageId(message.getMessageId()); operationLog.setType(DeviceLogType.of(message)); if (null != logEntityConsumer) { - logEntityConsumer.accept(operationLog); + logEntityConsumer.accept(message, operationLog); } message.getHeader("log").ifPresent(operationLog::setContent); - return Mono.just(Tuples.of(DeviceTimeSeriesMetric.deviceLogMetricId(productId), TimeSeriesData.of(message.getTimestamp(), operationLog.toSimpleMap()))); + return Mono.just(Tuples.of(deviceLogMetricId(productId), TimeSeriesData.of(message.getTimestamp(), operationLog + .toSimpleMap()))); } protected Flux> convertMessageToTimeSeriesData(DeviceMessage message) { + boolean ignoreStorage = message.getHeaderOrDefault(Headers.ignoreStorage); + boolean ignoreLog = message.getHeaderOrDefault(Headers.ignoreLog); + if (ignoreStorage && ignoreLog) { + return Flux.empty(); + } + DeviceMessageTracer.trace(message, "save.before"); String productId = (String) message.getHeader("productId").orElse("null"); - Consumer logEntityConsumer = null; + BiConsumer logEntityConsumer = null; List>> all = new ArrayList<>(2); - if (message instanceof EventMessage) { - logEntityConsumer = log -> log.setContent(JSON.toJSONString(((EventMessage) message).getData())); - all.add(convertEventMessageToTimeSeriesData(productId, ((EventMessage) message))); - } - //上报属性 - else if (message instanceof ReportPropertyMessage) { - ReportPropertyMessage reply = (ReportPropertyMessage) message; - Map properties = reply.getProperties(); - if (MapUtils.isNotEmpty(properties)) { - logEntityConsumer = log -> log.setContent(properties); - all.add(convertProperties(productId, message, properties)); - } - } - //消息回复 - else if (message instanceof DeviceMessageReply) { - //失败的回复消息 - if (!((DeviceMessageReply) message).isSuccess()) { - logEntityConsumer = log -> log.setContent(message.toString()); - } else if (message instanceof ReadPropertyMessageReply) { - ReadPropertyMessageReply reply = (ReadPropertyMessageReply) message; - Map properties = reply.getProperties(); - logEntityConsumer = log -> log.setContent(properties); - all.add(convertProperties(productId, message, properties)); - } else if (message instanceof WritePropertyMessageReply) { - WritePropertyMessageReply reply = (WritePropertyMessageReply) message; - Map properties = reply.getProperties(); - logEntityConsumer = log -> log.setContent(properties); - all.add(convertProperties(productId, message, properties)); + //没有忽略数据存储 + if (!ignoreStorage) { + //事件上报 + if (message instanceof EventMessage) { + all.add(convertEventMessageToTimeSeriesData(productId, ((EventMessage) message))); } else { - logEntityConsumer = log -> log.setContent(message.toJson().toJSONString()); + //属性相关 + Map properties = DeviceMessageUtils + .tryGetProperties(message) + .orElseGet(Collections::emptyMap); + if (MapUtils.isNotEmpty(properties)) { + all.add(convertProperties(productId, message, properties)); + } } } - //其他 - else { - logEntityConsumer = log -> log.setContent(message.toJson().toJSONString()); + //日志 + if (message instanceof DeviceLogMessage) { + logEntityConsumer = (msg, log) -> log.setContent(((DeviceLogMessage) msg).getLog()); } - //配置了记录日志 + //配置了记录日志,并且消息头里没有标记忽略日志 if (properties.getLog().match(message.getMessageType()) - && !message.getHeader("ignoreLog").isPresent()) { + && !ignoreLog) { + if (logEntityConsumer == null) { + logEntityConsumer = (msg, log) -> log.setContent(msg.toJson()); + } all.add(createDeviceMessageLog(productId, message, logEntityConsumer)); } @@ -194,7 +197,8 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora return deviceRegistry .getDevice(message.getDeviceId()) - .flatMap(device -> device.getMetadata() + .flatMap(device -> device + .getMetadata() .map(metadata -> { Object value = message.getData(); DataType dataType = metadata @@ -207,19 +211,19 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora @SuppressWarnings("all") Map mapValue = ((Map) tempValue); int size = mapValue.size(); - data = new HashMap<>((int) ((size / 0.75) + 7)); + data = newMap(size); data.putAll(mapValue); } else { - data = new HashMap<>(); + data = newMap(16); data.put("value", tempValue); } data.put("id", createDataId(message)); data.put("deviceId", device.getDeviceId()); data.put("createTime", System.currentTimeMillis()); - return TimeSeriesData.of(message.getTimestamp(), data); + return TimeSeriesData.of(TimestampUtils.toMillis(message.getTimestamp()), data); })) - .map(data -> Tuples.of(DeviceTimeSeriesMetric.deviceEventMetricId(productId, message.getEvent()), data)); + .map(data -> Tuples.of(deviceEventMetricId(productId, message.getEvent()), data)); } @@ -228,9 +232,9 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora .getDevice(deviceId) .flatMap(operator -> operator.getSelfConfig(DeviceConfigKey.productId)) .flatMap(productId -> this - .doQueryPager(DeviceTimeSeriesMetric.deviceLogMetricId(productId), - entity.and("deviceId", TermType.eq, deviceId), - data -> data.as(DeviceOperationLogEntity.class) + .doQueryPager(deviceLogMetricId(productId), + entity.and("deviceId", TermType.eq, deviceId), + data -> data.as(DeviceOperationLogEntity.class) )) .defaultIfEmpty(PagerResult.empty()); } @@ -246,19 +250,20 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora return deviceRegistry .getDevice(deviceId) .flatMap(device -> Mono.zip(device.getProduct(), device.getMetadata())) - .flatMapMany(tp2 -> query.toQuery() + .flatMapMany(tp2 -> query + .toQuery() .where("deviceId", deviceId) .execute(param -> this - .doQuery(DeviceTimeSeriesMetric.deviceEventMetricId(tp2.getT1().getId(), event), - param, - data -> { - DeviceEvent deviceEvent = new DeviceEvent(data.values()); - if (format) { - deviceEvent.putFormat(tp2.getT2().getEventOrNull(event)); - } - deviceEvent.putIfAbsent("timestamp", data.getTimestamp()); - return deviceEvent; - }))); + .doQuery(deviceEventMetricId(tp2.getT1().getId(), event), + param, + data -> { + DeviceEvent deviceEvent = new DeviceEvent(data.values()); + if (format) { + deviceEvent.putFormat(tp2.getT2().getEventOrNull(event)); + } + deviceEvent.putIfAbsent("timestamp", data.getTimestamp()); + return deviceEvent; + }))); } @Nonnull @@ -272,18 +277,18 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora .getDevice(deviceId) .flatMap(device -> Mono.zip(device.getProduct(), device.getMetadata())) .flatMap(tp2 -> query.toQuery() - .where("deviceId", deviceId) - .execute(param -> this - .doQueryPager(DeviceTimeSeriesMetric.deviceEventMetricId(tp2.getT1().getId(), event), - param, - data -> { - DeviceEvent deviceEvent = new DeviceEvent(data.values()); - if (format) { - deviceEvent.putFormat(tp2.getT2().getEventOrNull(event)); - } - deviceEvent.putIfAbsent("timestamp", data.getTimestamp()); - return deviceEvent; - })) + .where("deviceId", deviceId) + .execute(param -> this + .doQueryPager(deviceEventMetricId(tp2.getT1().getId(), event), + param, + data -> { + DeviceEvent deviceEvent = new DeviceEvent(data.values()); + if (format) { + deviceEvent.putFormat(tp2.getT2().getEventOrNull(event)); + } + deviceEvent.putIfAbsent("timestamp", data.getTimestamp()); + return deviceEvent; + })) ); } @@ -302,8 +307,12 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora if (value == null || metadata == null) { return value; } - if (metadata instanceof Converter) { - return ((Converter) metadata).convert(value); + //使用json字符串来存储 + if (propertyIsJsonStringStorage(metadata)) { + return value instanceof String ? String.valueOf(value) : JSON.toJSONString(value); + } + if (metadata.getValueType() instanceof Converter) { + return ((Converter) metadata.getValueType()).convert(value); } return value; } @@ -319,7 +328,7 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora .getDevice(message.getDeviceId()) .flatMapMany(device -> device .getMetadata() - .map(metadata -> { + .flatMap(metadata -> { int size = properties.size(); String id; //强制使用时间戳作为数据ID @@ -328,17 +337,51 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora } else { id = createDataId(message); } - Map newData = new HashMap<>(size < 5 ? 16 : (int) ((size + 5) / 0.75D) + 1); - properties.forEach((k, v) -> newData.put(k, convertPropertyValue(v, metadata.getPropertyOrNull(k)))); - newData.put("deviceId", message.getDeviceId()); - newData.put("productId", productId); - newData.put("timestamp", message.getTimestamp()); - newData.put("createTime", System.currentTimeMillis()); - newData.put("id", DigestUtils.md5Hex(id)); - return Tuples.of(getPropertyTimeSeriesMetric(productId), TimeSeriesData.of(message.getTimestamp(), newData)); + Mono> dataSupplier; + + int metaSize = metadata.getProperties().size(); + //标记了是部分属性 + if (message.getHeader(Headers.partialProperties).orElse(false)) { + dataSupplier = this + .queryEachOneProperties(message.getDeviceId(), QueryParamEntity.of()) + .collectMap(DeviceProperty::getProperty, DeviceProperty::getValue, () -> newMap(metaSize + 5)); + } else { + dataSupplier = Mono.just(newMap(size)); + } + return dataSupplier + .flatMap(newData -> { + //转换属性数据 + for (Map.Entry entry : properties.entrySet()) { + PropertyMetadata propertyMetadata = metadata.getPropertyOrNull(entry.getKey()); + //没有配置物模型或者忽略了存储 + if (propertyMetadata == null || propertyIsIgnoreStorage(propertyMetadata)) { + continue; + } + Object value = convertPropertyValue(entry.getValue(), propertyMetadata); + if (null != value) { + newData.put(entry.getKey(), value); + } + } + //没有属性值,可能全部都配置了不存储 + if (newData.isEmpty()) { + return Mono.empty(); + } + newData.put("deviceId", message.getDeviceId()); + newData.put("productId", productId); + newData.put("timestamp", TimestampUtils.toMillis(message.getTimestamp())); + newData.put("createTime", System.currentTimeMillis()); + newData.put("id", DigestUtils.md5Hex(id)); + return Mono.just( + Tuples.of(getPropertyTimeSeriesMetric(productId), TimeSeriesData.of(message.getTimestamp(), newData)) + ); + }); })); } + private Map newMap(int size) { + return Maps.newHashMapWithExpectedSize(size); + } + protected Flux> convertPropertiesForRowPolicy(String productId, DeviceMessage message, Map properties) { @@ -353,34 +396,107 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora .flatMapMany(metadata -> Flux .fromIterable(properties.entrySet()) .index() - .map(entry -> { + .flatMap(entry -> { String id; long ts = message.getTimestamp(); String property = entry.getT2().getKey(); + //忽略存在没有的属性和忽略存储的属性 + PropertyMetadata propertyMetadata = metadata.getPropertyOrNull(property); + if (propertyMetadata == null || propertyIsIgnoreStorage(propertyMetadata)) { + return Mono.empty(); + } //强制使用时间戳作为数据ID if (message.getHeader(Headers.useTimestampAsId).orElse(false)) { id = String.join("_", message.getDeviceId(), property, String.valueOf(message.getTimestamp())); } else { id = String.join("_", message.getDeviceId(), property, String.valueOf(createUniqueNanoTime(ts))); } - DevicePropertiesEntity entity = DevicePropertiesEntity.builder() - .id(DigestUtils.md5Hex(id)) - .deviceId(device.getDeviceId()) - .timestamp(ts) - .property(property) - .productId(productId) - .createTime(System.currentTimeMillis()) - .build() - .withValue(metadata.getPropertyOrNull(entry.getT2().getKey()), entry.getT2().getValue()); - - return TimeSeriesData.of(entity.getTimestamp(), entity.toMap()); + return Mono + .just(TimeSeriesData.of(ts, this + .createRowPropertyData(id, + TimestampUtils.toMillis(ts), + device.getDeviceId(), + propertyMetadata, + entry.getT2().getValue())) + ); }) - .map(data -> Tuples.of(DeviceTimeSeriesMetric.devicePropertyMetricId(productId), data))) + .map(data -> Tuples.of(devicePropertyMetricId(productId), data))) ); } + protected Map createRowPropertyData(String id, + long timestamp, + String deviceId, + PropertyMetadata property, + Object value) { + Map propertyData = newMap(24); + propertyData.put("id", DigestUtils.md5Hex(id)); + propertyData.put("deviceId", deviceId); + propertyData.put("timestamp", timestamp); + propertyData.put("property", property.getId()); + propertyData.put("createTime", System.currentTimeMillis()); + + fillRowPropertyValue(propertyData, property, value); + return propertyData; + } + + protected void fillRowPropertyValue(Map target, PropertyMetadata property, Object value) { + if (value == null) { + return; + } + if (property == null) { + if (value instanceof Number) { + target.put("numberValue", value); + } else if (value instanceof Date) { + target.put("timeValue", value); + } + target.put("value", String.valueOf(value)); + return; + } + DataType type = property.getValueType(); + target.put("type", type.getId()); + String convertedValue; + if (type instanceof NumberType) { + NumberType numberType = (NumberType) type; + Number number = numberType.convertNumber(value); + if (number == null) { + throw new UnsupportedOperationException("无法将" + value + "转为" + type.getId()); + } + convertedValue = String.valueOf(number); + target.put("numberValue", number); + } else if (type instanceof DateTimeType) { + DateTimeType dateTimeType = (DateTimeType) type; + convertedValue = String.valueOf(value); + target.put("timeValue", dateTimeType.convert(convertedValue)); + } else if (propertyIsJsonStringStorage(property)) { + //使用json字符来存储 + convertedValue = value instanceof String + ? String.valueOf(value) + : JSON.toJSONString(value); + + } else if (type instanceof ObjectType) { + ObjectType objectType = (ObjectType) type; + Object val = objectType.convert(value); + convertedValue = JSON.toJSONString(val); + target.put("objectValue", val); + } else if (type instanceof ArrayType) { + ArrayType objectType = (ArrayType) type; + Object val = objectType.convert(value); + convertedValue = JSON.toJSONString(val); + target.put("arrayValue", val); + } else if (type instanceof GeoType) { + GeoType geoType = (GeoType) type; + GeoPoint val = geoType.convert(value); + convertedValue = String.valueOf(val); + target.put("geoValue", val); + } else { + convertedValue = String.valueOf(value); + } + target.put("value", convertedValue); + } + protected String getPropertyTimeSeriesMetric(String productId) { - return DeviceTimeSeriesMetric.devicePropertyMetricId(productId); + return devicePropertyMetricId(productId); } protected Mono> getProductAndMetadataByDevice(String deviceId) { @@ -411,5 +527,4 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora return nano + inc; } - } diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/StorageConstants.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/StorageConstants.java new file mode 100755 index 00000000..97f4df90 --- /dev/null +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/service/data/StorageConstants.java @@ -0,0 +1,38 @@ +package org.jetlinks.community.device.service.data; + +import org.jetlinks.core.metadata.PropertyMetadata; + +public interface StorageConstants { + String storePolicyConfigKey = "storePolicy"; + + String propertyStorageType = "storageType"; + String propertyStorageTypeJson = "json-string"; + String propertyStorageTypeIgnore = "ignore"; + + /** + * 判断属性是否使用json字符串来存储 + * + * @param metadata 属性物模型 + * @return 是否使用json字符串存储 + */ + static boolean propertyIsJsonStringStorage(PropertyMetadata metadata) { + return metadata + .getExpand(propertyStorageType) + .map(propertyStorageTypeJson::equals) + .orElse(false); + } + + /** + * 判断属性是否忽略存储 + * + * @param metadata 属性物模型 + * @return 属性是否忽略存储 + */ + static boolean propertyIsIgnoreStorage(PropertyMetadata metadata) { + return metadata + .getExpand(propertyStorageType) + .map(propertyStorageTypeIgnore::equals) + .orElse(false); + } + +}