优化设备消息存储

This commit is contained in:
zhou-hao 2021-04-06 18:05:41 +08:00
parent 62edadc979
commit d449aad80d
3 changed files with 255 additions and 102 deletions

View File

@ -44,7 +44,7 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
private final MessageHandler messageHandler;
private final static BiConsumer<Throwable, Object> doOnError = (error, val) -> log.error(error.getMessage(), error);
private final static BiConsumer<Throwable, Object> doOnError = (error, val) -> DeviceMessageConnector.log.error(error.getMessage(), error);
private final static Function<DeviceOperator, Mono<Values>> configGetter = operator -> operator.getSelfConfigs(allConfigHeader);

View File

@ -1,15 +1,18 @@
package org.jetlinks.community.device.service.data;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Maps;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.collections.MapUtils;
import org.hswebframework.ezorm.core.param.TermType;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.community.gateway.DeviceMessageUtils;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.device.DeviceProductOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.message.DeviceLogMessage;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.Headers;
@ -18,7 +21,7 @@ import org.jetlinks.core.message.property.ReadPropertyMessageReply;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import org.jetlinks.core.message.property.WritePropertyMessageReply;
import org.jetlinks.core.metadata.*;
import org.jetlinks.core.metadata.types.UnknownType;
import org.jetlinks.core.metadata.types.*;
import org.jetlinks.community.device.entity.DeviceEvent;
import org.jetlinks.community.device.entity.DeviceOperationLogEntity;
import org.jetlinks.community.device.entity.DevicePropertiesEntity;
@ -27,6 +30,8 @@ import org.jetlinks.community.device.enums.DeviceLogType;
import org.jetlinks.community.device.events.handler.ValueTypeTranslator;
import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.core.utils.DeviceMessageTracer;
import org.jetlinks.core.utils.TimestampUtils;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -37,9 +42,14 @@ import javax.annotation.Nonnull;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import static org.jetlinks.community.device.service.data.StorageConstants.propertyIsIgnoreStorage;
import static org.jetlinks.community.device.service.data.StorageConstants.propertyIsJsonStringStorage;
import static org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric.*;
/**
* 抽象设备数据数据存储,实现一些通用的逻辑
*
@ -110,10 +120,10 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
@Override
public Mono<Void> saveDeviceMessage(@Nonnull Publisher<DeviceMessage> message) {
return Flux.from(message)
.flatMap(this::convertMessageToTimeSeriesData)
.groupBy(Tuple2::getT1)
.flatMap(group -> doSaveData(group.key(), group.map(Tuple2::getT2)))
.then();
.flatMap(this::convertMessageToTimeSeriesData)
.groupBy(Tuple2::getT1, Integer.MAX_VALUE)
.flatMap(group -> doSaveData(group.key(), group.map(Tuple2::getT2)))
.then();
}
protected String createDataId(DeviceMessage message) {
@ -123,67 +133,60 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
protected Mono<Tuple2<String, TimeSeriesData>> createDeviceMessageLog(String productId,
DeviceMessage message,
Consumer<DeviceOperationLogEntity> logEntityConsumer) {
BiConsumer<DeviceMessage, DeviceOperationLogEntity> logEntityConsumer) {
DeviceOperationLogEntity operationLog = new DeviceOperationLogEntity();
operationLog.setId(IDGenerator.SNOW_FLAKE_STRING.generate());
operationLog.setDeviceId(message.getDeviceId());
operationLog.setTimestamp(message.getTimestamp());
operationLog.setTimestamp(TimestampUtils.toMillis(message.getTimestamp()));
operationLog.setCreateTime(System.currentTimeMillis());
operationLog.setProductId(productId);
operationLog.setMessageId(message.getMessageId());
operationLog.setType(DeviceLogType.of(message));
if (null != logEntityConsumer) {
logEntityConsumer.accept(operationLog);
logEntityConsumer.accept(message, operationLog);
}
message.getHeader("log").ifPresent(operationLog::setContent);
return Mono.just(Tuples.of(DeviceTimeSeriesMetric.deviceLogMetricId(productId), TimeSeriesData.of(message.getTimestamp(), operationLog.toSimpleMap())));
return Mono.just(Tuples.of(deviceLogMetricId(productId), TimeSeriesData.of(message.getTimestamp(), operationLog
.toSimpleMap())));
}
protected Flux<Tuple2<String, TimeSeriesData>> convertMessageToTimeSeriesData(DeviceMessage message) {
boolean ignoreStorage = message.getHeaderOrDefault(Headers.ignoreStorage);
boolean ignoreLog = message.getHeaderOrDefault(Headers.ignoreLog);
if (ignoreStorage && ignoreLog) {
return Flux.empty();
}
DeviceMessageTracer.trace(message, "save.before");
String productId = (String) message.getHeader("productId").orElse("null");
Consumer<DeviceOperationLogEntity> logEntityConsumer = null;
BiConsumer<DeviceMessage, DeviceOperationLogEntity> logEntityConsumer = null;
List<Publisher<Tuple2<String, TimeSeriesData>>> all = new ArrayList<>(2);
if (message instanceof EventMessage) {
logEntityConsumer = log -> log.setContent(JSON.toJSONString(((EventMessage) message).getData()));
all.add(convertEventMessageToTimeSeriesData(productId, ((EventMessage) message)));
}
//上报属性
else if (message instanceof ReportPropertyMessage) {
ReportPropertyMessage reply = (ReportPropertyMessage) message;
Map<String, Object> properties = reply.getProperties();
if (MapUtils.isNotEmpty(properties)) {
logEntityConsumer = log -> log.setContent(properties);
all.add(convertProperties(productId, message, properties));
}
}
//消息回复
else if (message instanceof DeviceMessageReply) {
//失败的回复消息
if (!((DeviceMessageReply) message).isSuccess()) {
logEntityConsumer = log -> log.setContent(message.toString());
} else if (message instanceof ReadPropertyMessageReply) {
ReadPropertyMessageReply reply = (ReadPropertyMessageReply) message;
Map<String, Object> properties = reply.getProperties();
logEntityConsumer = log -> log.setContent(properties);
all.add(convertProperties(productId, message, properties));
} else if (message instanceof WritePropertyMessageReply) {
WritePropertyMessageReply reply = (WritePropertyMessageReply) message;
Map<String, Object> properties = reply.getProperties();
logEntityConsumer = log -> log.setContent(properties);
all.add(convertProperties(productId, message, properties));
//没有忽略数据存储
if (!ignoreStorage) {
//事件上报
if (message instanceof EventMessage) {
all.add(convertEventMessageToTimeSeriesData(productId, ((EventMessage) message)));
} else {
logEntityConsumer = log -> log.setContent(message.toJson().toJSONString());
//属性相关
Map<String, Object> properties = DeviceMessageUtils
.tryGetProperties(message)
.orElseGet(Collections::emptyMap);
if (MapUtils.isNotEmpty(properties)) {
all.add(convertProperties(productId, message, properties));
}
}
}
//其他
else {
logEntityConsumer = log -> log.setContent(message.toJson().toJSONString());
//日志
if (message instanceof DeviceLogMessage) {
logEntityConsumer = (msg, log) -> log.setContent(((DeviceLogMessage) msg).getLog());
}
//配置了记录日志
//配置了记录日志,并且消息头里没有标记忽略日志
if (properties.getLog().match(message.getMessageType())
&& !message.getHeader("ignoreLog").isPresent()) {
&& !ignoreLog) {
if (logEntityConsumer == null) {
logEntityConsumer = (msg, log) -> log.setContent(msg.toJson());
}
all.add(createDeviceMessageLog(productId, message, logEntityConsumer));
}
@ -194,7 +197,8 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
return deviceRegistry
.getDevice(message.getDeviceId())
.flatMap(device -> device.getMetadata()
.flatMap(device -> device
.getMetadata()
.map(metadata -> {
Object value = message.getData();
DataType dataType = metadata
@ -207,19 +211,19 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
@SuppressWarnings("all")
Map<String, Object> mapValue = ((Map) tempValue);
int size = mapValue.size();
data = new HashMap<>((int) ((size / 0.75) + 7));
data = newMap(size);
data.putAll(mapValue);
} else {
data = new HashMap<>();
data = newMap(16);
data.put("value", tempValue);
}
data.put("id", createDataId(message));
data.put("deviceId", device.getDeviceId());
data.put("createTime", System.currentTimeMillis());
return TimeSeriesData.of(message.getTimestamp(), data);
return TimeSeriesData.of(TimestampUtils.toMillis(message.getTimestamp()), data);
}))
.map(data -> Tuples.of(DeviceTimeSeriesMetric.deviceEventMetricId(productId, message.getEvent()), data));
.map(data -> Tuples.of(deviceEventMetricId(productId, message.getEvent()), data));
}
@ -228,9 +232,9 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
.getDevice(deviceId)
.flatMap(operator -> operator.getSelfConfig(DeviceConfigKey.productId))
.flatMap(productId -> this
.doQueryPager(DeviceTimeSeriesMetric.deviceLogMetricId(productId),
entity.and("deviceId", TermType.eq, deviceId),
data -> data.as(DeviceOperationLogEntity.class)
.doQueryPager(deviceLogMetricId(productId),
entity.and("deviceId", TermType.eq, deviceId),
data -> data.as(DeviceOperationLogEntity.class)
))
.defaultIfEmpty(PagerResult.empty());
}
@ -246,19 +250,20 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
return deviceRegistry
.getDevice(deviceId)
.flatMap(device -> Mono.zip(device.getProduct(), device.getMetadata()))
.flatMapMany(tp2 -> query.toQuery()
.flatMapMany(tp2 -> query
.toQuery()
.where("deviceId", deviceId)
.execute(param -> this
.doQuery(DeviceTimeSeriesMetric.deviceEventMetricId(tp2.getT1().getId(), event),
param,
data -> {
DeviceEvent deviceEvent = new DeviceEvent(data.values());
if (format) {
deviceEvent.putFormat(tp2.getT2().getEventOrNull(event));
}
deviceEvent.putIfAbsent("timestamp", data.getTimestamp());
return deviceEvent;
})));
.doQuery(deviceEventMetricId(tp2.getT1().getId(), event),
param,
data -> {
DeviceEvent deviceEvent = new DeviceEvent(data.values());
if (format) {
deviceEvent.putFormat(tp2.getT2().getEventOrNull(event));
}
deviceEvent.putIfAbsent("timestamp", data.getTimestamp());
return deviceEvent;
})));
}
@Nonnull
@ -272,18 +277,18 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
.getDevice(deviceId)
.flatMap(device -> Mono.zip(device.getProduct(), device.getMetadata()))
.flatMap(tp2 -> query.toQuery()
.where("deviceId", deviceId)
.execute(param -> this
.doQueryPager(DeviceTimeSeriesMetric.deviceEventMetricId(tp2.getT1().getId(), event),
param,
data -> {
DeviceEvent deviceEvent = new DeviceEvent(data.values());
if (format) {
deviceEvent.putFormat(tp2.getT2().getEventOrNull(event));
}
deviceEvent.putIfAbsent("timestamp", data.getTimestamp());
return deviceEvent;
}))
.where("deviceId", deviceId)
.execute(param -> this
.doQueryPager(deviceEventMetricId(tp2.getT1().getId(), event),
param,
data -> {
DeviceEvent deviceEvent = new DeviceEvent(data.values());
if (format) {
deviceEvent.putFormat(tp2.getT2().getEventOrNull(event));
}
deviceEvent.putIfAbsent("timestamp", data.getTimestamp());
return deviceEvent;
}))
);
}
@ -302,8 +307,12 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
if (value == null || metadata == null) {
return value;
}
if (metadata instanceof Converter) {
return ((Converter<?>) metadata).convert(value);
//使用json字符串来存储
if (propertyIsJsonStringStorage(metadata)) {
return value instanceof String ? String.valueOf(value) : JSON.toJSONString(value);
}
if (metadata.getValueType() instanceof Converter) {
return ((Converter<?>) metadata.getValueType()).convert(value);
}
return value;
}
@ -319,7 +328,7 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
.getDevice(message.getDeviceId())
.flatMapMany(device -> device
.getMetadata()
.map(metadata -> {
.flatMap(metadata -> {
int size = properties.size();
String id;
//强制使用时间戳作为数据ID
@ -328,17 +337,51 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
} else {
id = createDataId(message);
}
Map<String, Object> newData = new HashMap<>(size < 5 ? 16 : (int) ((size + 5) / 0.75D) + 1);
properties.forEach((k, v) -> newData.put(k, convertPropertyValue(v, metadata.getPropertyOrNull(k))));
newData.put("deviceId", message.getDeviceId());
newData.put("productId", productId);
newData.put("timestamp", message.getTimestamp());
newData.put("createTime", System.currentTimeMillis());
newData.put("id", DigestUtils.md5Hex(id));
return Tuples.of(getPropertyTimeSeriesMetric(productId), TimeSeriesData.of(message.getTimestamp(), newData));
Mono<Map<String, Object>> dataSupplier;
int metaSize = metadata.getProperties().size();
//标记了是部分属性
if (message.getHeader(Headers.partialProperties).orElse(false)) {
dataSupplier = this
.queryEachOneProperties(message.getDeviceId(), QueryParamEntity.of())
.collectMap(DeviceProperty::getProperty, DeviceProperty::getValue, () -> newMap(metaSize + 5));
} else {
dataSupplier = Mono.just(newMap(size));
}
return dataSupplier
.flatMap(newData -> {
//转换属性数据
for (Map.Entry<String, Object> entry : properties.entrySet()) {
PropertyMetadata propertyMetadata = metadata.getPropertyOrNull(entry.getKey());
//没有配置物模型或者忽略了存储
if (propertyMetadata == null || propertyIsIgnoreStorage(propertyMetadata)) {
continue;
}
Object value = convertPropertyValue(entry.getValue(), propertyMetadata);
if (null != value) {
newData.put(entry.getKey(), value);
}
}
//没有属性值,可能全部都配置了不存储
if (newData.isEmpty()) {
return Mono.empty();
}
newData.put("deviceId", message.getDeviceId());
newData.put("productId", productId);
newData.put("timestamp", TimestampUtils.toMillis(message.getTimestamp()));
newData.put("createTime", System.currentTimeMillis());
newData.put("id", DigestUtils.md5Hex(id));
return Mono.just(
Tuples.of(getPropertyTimeSeriesMetric(productId), TimeSeriesData.of(message.getTimestamp(), newData))
);
});
}));
}
private Map<String, Object> newMap(int size) {
return Maps.newHashMapWithExpectedSize(size);
}
protected Flux<Tuple2<String, TimeSeriesData>> convertPropertiesForRowPolicy(String productId,
DeviceMessage message,
Map<String, Object> properties) {
@ -353,34 +396,107 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
.flatMapMany(metadata -> Flux
.fromIterable(properties.entrySet())
.index()
.map(entry -> {
.flatMap(entry -> {
String id;
long ts = message.getTimestamp();
String property = entry.getT2().getKey();
//忽略存在没有的属性和忽略存储的属性
PropertyMetadata propertyMetadata = metadata.getPropertyOrNull(property);
if (propertyMetadata == null || propertyIsIgnoreStorage(propertyMetadata)) {
return Mono.empty();
}
//强制使用时间戳作为数据ID
if (message.getHeader(Headers.useTimestampAsId).orElse(false)) {
id = String.join("_", message.getDeviceId(), property, String.valueOf(message.getTimestamp()));
} else {
id = String.join("_", message.getDeviceId(), property, String.valueOf(createUniqueNanoTime(ts)));
}
DevicePropertiesEntity entity = DevicePropertiesEntity.builder()
.id(DigestUtils.md5Hex(id))
.deviceId(device.getDeviceId())
.timestamp(ts)
.property(property)
.productId(productId)
.createTime(System.currentTimeMillis())
.build()
.withValue(metadata.getPropertyOrNull(entry.getT2().getKey()), entry.getT2().getValue());
return TimeSeriesData.of(entity.getTimestamp(), entity.toMap());
return Mono
.just(TimeSeriesData.of(ts, this
.createRowPropertyData(id,
TimestampUtils.toMillis(ts),
device.getDeviceId(),
propertyMetadata,
entry.getT2().getValue()))
);
})
.map(data -> Tuples.of(DeviceTimeSeriesMetric.devicePropertyMetricId(productId), data)))
.map(data -> Tuples.of(devicePropertyMetricId(productId), data)))
);
}
protected Map<String, Object> createRowPropertyData(String id,
long timestamp,
String deviceId,
PropertyMetadata property,
Object value) {
Map<String, Object> propertyData = newMap(24);
propertyData.put("id", DigestUtils.md5Hex(id));
propertyData.put("deviceId", deviceId);
propertyData.put("timestamp", timestamp);
propertyData.put("property", property.getId());
propertyData.put("createTime", System.currentTimeMillis());
fillRowPropertyValue(propertyData, property, value);
return propertyData;
}
protected void fillRowPropertyValue(Map<String, Object> target, PropertyMetadata property, Object value) {
if (value == null) {
return;
}
if (property == null) {
if (value instanceof Number) {
target.put("numberValue", value);
} else if (value instanceof Date) {
target.put("timeValue", value);
}
target.put("value", String.valueOf(value));
return;
}
DataType type = property.getValueType();
target.put("type", type.getId());
String convertedValue;
if (type instanceof NumberType) {
NumberType<?> numberType = (NumberType<?>) type;
Number number = numberType.convertNumber(value);
if (number == null) {
throw new UnsupportedOperationException("无法将" + value + "转为" + type.getId());
}
convertedValue = String.valueOf(number);
target.put("numberValue", number);
} else if (type instanceof DateTimeType) {
DateTimeType dateTimeType = (DateTimeType) type;
convertedValue = String.valueOf(value);
target.put("timeValue", dateTimeType.convert(convertedValue));
} else if (propertyIsJsonStringStorage(property)) {
//使用json字符来存储
convertedValue = value instanceof String
? String.valueOf(value)
: JSON.toJSONString(value);
} else if (type instanceof ObjectType) {
ObjectType objectType = (ObjectType) type;
Object val = objectType.convert(value);
convertedValue = JSON.toJSONString(val);
target.put("objectValue", val);
} else if (type instanceof ArrayType) {
ArrayType objectType = (ArrayType) type;
Object val = objectType.convert(value);
convertedValue = JSON.toJSONString(val);
target.put("arrayValue", val);
} else if (type instanceof GeoType) {
GeoType geoType = (GeoType) type;
GeoPoint val = geoType.convert(value);
convertedValue = String.valueOf(val);
target.put("geoValue", val);
} else {
convertedValue = String.valueOf(value);
}
target.put("value", convertedValue);
}
protected String getPropertyTimeSeriesMetric(String productId) {
return DeviceTimeSeriesMetric.devicePropertyMetricId(productId);
return devicePropertyMetricId(productId);
}
protected Mono<Tuple2<DeviceProductOperator, DeviceMetadata>> getProductAndMetadataByDevice(String deviceId) {
@ -411,5 +527,4 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
return nano + inc;
}
}

View File

@ -0,0 +1,38 @@
package org.jetlinks.community.device.service.data;
import org.jetlinks.core.metadata.PropertyMetadata;
public interface StorageConstants {
String storePolicyConfigKey = "storePolicy";
String propertyStorageType = "storageType";
String propertyStorageTypeJson = "json-string";
String propertyStorageTypeIgnore = "ignore";
/**
* 判断属性是否使用json字符串来存储
*
* @param metadata 属性物模型
* @return 是否使用json字符串存储
*/
static boolean propertyIsJsonStringStorage(PropertyMetadata metadata) {
return metadata
.getExpand(propertyStorageType)
.map(propertyStorageTypeJson::equals)
.orElse(false);
}
/**
* 判断属性是否忽略存储
*
* @param metadata 属性物模型
* @return 属性是否忽略存储
*/
static boolean propertyIsIgnoreStorage(PropertyMetadata metadata) {
return metadata
.getExpand(propertyStorageType)
.map(propertyStorageTypeIgnore::equals)
.orElse(false);
}
}