refactor(基础模块): 优化存储策略以及ID生成策略.

This commit is contained in:
zhouhao 2024-08-02 16:27:49 +08:00
parent 2dd0ef3c93
commit 7322bf1fe7
5 changed files with 148 additions and 59 deletions

View File

@ -87,7 +87,7 @@ public class SystemLoggingAppender extends UnsynchronizedAppenderBase<ILoggingEv
context.putAll(mdc);
}
SerializableSystemLog info = SerializableSystemLog.builder()
.id(IDGenerator.SNOW_FLAKE_STRING.generate())
.id(IDGenerator.RANDOM.generate())
.mavenModule(mavenModule)
.context(context)
.name(event.getLoggerName())

View File

@ -5,7 +5,7 @@ import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.MapUtils;
import org.hswebframework.web.id.IDGenerator;
import org.hswebframework.web.utils.DigestUtils;
import org.jetlinks.community.PropertyConstants;
import org.jetlinks.core.message.DeviceLogMessage;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.ThingMessage;
@ -26,6 +26,7 @@ import org.jetlinks.community.things.data.ThingLogType;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.community.utils.ObjectMappers;
import org.reactivestreams.Publisher;
import org.springframework.util.ObjectUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
@ -33,12 +34,16 @@ import reactor.util.function.Tuples;
import java.util.*;
import static org.hswebframework.web.utils.DigestUtils.md5Hex;
import static org.jetlinks.community.things.data.ThingsDataConstants.*;
@Slf4j
@AllArgsConstructor
public abstract class AbstractSaveOperations implements SaveOperations {
protected static final PropertyConstants.Key<String> ERROR_KEY =
PropertyConstants.Key.of("store_error", () -> null, String.class);
protected final ThingsRegistry registry;
protected final MetricBuilder metricBuilder;
@ -70,9 +75,9 @@ public abstract class AbstractSaveOperations implements SaveOperations {
protected Map<String, Object> createLogData(ThingMessage message) {
Map<String, Object> data = Maps.newHashMapWithExpectedSize(8);
data.put(COLUMN_ID, IDGenerator.SNOW_FLAKE_STRING.generate());
data.put(COLUMN_ID, getOrCreateUid(message));
data.put(metricBuilder.getThingIdProperty(), message.getThingId());
data.put(COLUMN_TIMESTAMP, message.getTimestamp());
data.put(COLUMN_TIMESTAMP, convertTimestamp(message.getTimestamp()));
data.put(COLUMN_CREATE_TIME, System.currentTimeMillis());
data.put(COLUMN_MESSAGE_ID, message.getMessageId());
data.put(COLUMN_LOG_TYPE, ThingLogType.of(message).name());
@ -86,12 +91,31 @@ public abstract class AbstractSaveOperations implements SaveOperations {
return data;
}
protected long convertTimestamp(long timestamp) {
return TimestampUtils.toMillis(timestamp);
}
protected String getOrCreateUid(ThingMessage message) {
return message.getHeaderOrElse(PropertyConstants.uid, this::randomId);
}
protected String randomId() {
return IDGenerator.RANDOM.generate();
}
protected String getTemplateIdFromMessage(ThingMessage message) {
String templateId = message.getHeader(Headers.productId).orElse(null);
if (templateId == null) {
templateId = message.getHeader(ThingConstants.templateId).orElse(null);
}
return templateId == null ? "null" : templateId;
if (templateId == null) {
log.warn("{} [{}] message not contains templateId(productId) : {}",
message.getThingType(),
message.getThingId(),
message);
return null;
}
return templateId;
}
protected Flux<Tuple2<String, TimeSeriesData>> convertMessageToTimeSeriesData(ThingMessage message) {
@ -101,15 +125,55 @@ public abstract class AbstractSaveOperations implements SaveOperations {
return Flux.empty();
}
String templateId = getTemplateIdFromMessage(message);
if (templateId == null) {
return Flux.empty();
}
List<Publisher<Tuple2<String, TimeSeriesData>>> all = new ArrayList<>(2);
//没有忽略数据存储
if (!ignoreStorage) {
//事件上报
if (message instanceof ThingEventMessage) {
all.add(convertEventMessageToTimeSeriesData(templateId, ((ThingEventMessage) message)));
all.add(convertEventMessageToTimeSeriesData(templateId, ((ThingEventMessage) message))
//记录错误信息
.onErrorResume(error -> {
handlerError("convert event message to TimeSeries data", message, error);
return Mono.empty();
}));
}
//属性相关消息
else if (message instanceof PropertyMessage) {
all.add(convertPropertyMessageToTimeSeriesData(templateId, message)
//记录错误信息
.onErrorResume(error -> {
handlerError("convert property message to TimeSeries data", message, error);
return Mono.empty();
}));
}
}
//配置了记录日志,并且消息头里没有标记忽略日志
if (settings.getLogFilter().match(message.getMessageType()) && !ignoreLog) {
all.add(createDeviceMessageLog(templateId, message));
}
return Flux.concat(all);
}
protected void handlerError(String operations, ThingMessage source, Throwable error) {
log.warn("{} {}", operations, source, error);
String msg = operations + ":" + error.getMessage();
source.computeHeader(ERROR_KEY, (ignore, old) -> {
if (old == null) {
old = msg;
} else {
old = old + "\n" + msg;
}
return old;
});
}
Flux<Tuple2<String, TimeSeriesData>> convertPropertyMessageToTimeSeriesData(String templateId, ThingMessage message) {
try {
//配置了只保存属性上报
if (!settings.getProperty().isOnlySaveReport()
|| (message instanceof ThingReportPropertyMessage)) {
@ -121,21 +185,16 @@ public abstract class AbstractSaveOperations implements SaveOperations {
if (propertiesTimes == null) {
propertiesTimes = Collections.emptyMap();
}
all.add(convertProperties(templateId, message, properties, propertiesTimes));
return convertProperties(templateId, message, properties, propertiesTimes);
}
}
} catch (Throwable error) {
return Flux.error(error);
}
return Flux.empty();
}
}
}
//配置了记录日志,并且消息头里没有标记忽略日志
if (settings.getLogFilter().match(message.getMessageType()) && !ignoreLog) {
all.add(createDeviceMessageLog(templateId, message));
}
return Flux.merge(all);
}
private Mono<Tuple2<String, TimeSeriesData>> convertEventMessageToTimeSeriesData(String templateId, ThingEventMessage message) {
Mono<Tuple2<String, TimeSeriesData>> convertEventMessageToTimeSeriesData(String templateId, ThingEventMessage message) {
return registry
.getTemplate(message.getThingType(), templateId)
@ -157,7 +216,7 @@ public abstract class AbstractSaveOperations implements SaveOperations {
return;
}
Map<String, Object> data = createEventData(message, metadata);
sink.next(TimeSeriesData.of(TimestampUtils.toMillis(message.getTimestamp()), data));
sink.next(TimeSeriesData.of(convertTimestamp(message.getTimestamp()), data));
})
.map(data -> Tuples.of(createEventMetric(message.getThingType(), templateId, message.getThingId(), message.getEvent()), data));
}
@ -195,7 +254,7 @@ public abstract class AbstractSaveOperations implements SaveOperations {
@SuppressWarnings("all")
Map<String, Object> mapValue = ((Map) tempValue);
int size = mapValue.size();
data = Maps.newHashMapWithExpectedSize(size);
data = Maps.newHashMapWithExpectedSize(size + 6);
data.putAll(mapValue);
//严格模式,只记录物模型中记录的字段
if (settings.isStrict()) {
@ -217,23 +276,48 @@ public abstract class AbstractSaveOperations implements SaveOperations {
if (settings.getEvent().eventIsAllInOne()) {
data.put(COLUMN_EVENT_ID, message.getEvent());
}
data.put(COLUMN_ID, createEventDataId(message));
long ts = convertTimestamp(message.getTimestamp());
data.put(COLUMN_ID, createEventDataId(ts, message, data.get(COLUMN_ID)));
data.put(metricBuilder.getThingIdProperty(), message.getThingId());
data.put(COLUMN_CREATE_TIME, System.currentTimeMillis());
data.put(COLUMN_TIMESTAMP, message.getTimestamp());
data.put(COLUMN_TIMESTAMP, ts);
return data;
}
protected String createEventDataId(ThingMessage message) {
return DigestUtils
.md5Hex(StringBuilderUtils.buildString(message, (msg, builder) -> builder
protected String createEventDataId(long ts, ThingEventMessage message, Object idMaybe) {
return md5Hex(
StringBuilderUtils
.buildString(message, idMaybe, ts, (msg, _idMaybe, _ts, builder) -> {
//追加thingId,方式不同的设备数据id相同被覆盖.
builder
.append(msg.getThingId())
.append('-')
.append(msg.getTimestamp())));
.append(msg.getEvent());
//使用时间戳作为数据ID
if (useTimestampId(msg)) {
builder.append(_ts);
}
//根据传入的ID生成ID
else {
String dataId = ObjectUtils.isEmpty(_idMaybe)
? getOrCreateUid(msg)
: String.valueOf(_idMaybe);
builder.append(dataId);
}
private Mono<Tuple2<String, TimeSeriesData>> createDeviceMessageLog(String templateId,
}));
}
protected boolean useTimestampId(ThingMessage message) {
return message.getHeaderOrDefault(Headers.useTimestampAsId);
}
Mono<Tuple2<String, TimeSeriesData>> createDeviceMessageLog(String templateId,
ThingMessage message) {

View File

@ -12,7 +12,6 @@ import org.jetlinks.core.metadata.types.NumberType;
import org.jetlinks.core.things.ThingMetadata;
import org.jetlinks.core.things.ThingsRegistry;
import org.jetlinks.core.utils.StringBuilderUtils;
import org.jetlinks.core.utils.TimestampUtils;
import org.jetlinks.community.things.data.ThingsDataConstants;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.community.utils.ObjectMappers;
@ -32,6 +31,9 @@ public abstract class ColumnModeSaveOperationsBase extends AbstractSaveOperation
}
protected String createPropertyDataId(ThingMessage message) {
if (!useTimestampId(message)) {
return randomId();
}
return DigestUtils.md5Hex(
StringBuilderUtils
.buildString(message, (m, builder) -> {
@ -76,13 +78,13 @@ public abstract class ColumnModeSaveOperationsBase extends AbstractSaveOperation
if (data.isEmpty()) {
return null;
}
long timestamp = convertTimestamp(message.getTimestamp());
data.put(metricBuilder.getThingIdProperty(), message.getThingId());
data.put(COLUMN_TIMESTAMP, TimestampUtils.toMillis(message.getTimestamp()));
data.put(COLUMN_TIMESTAMP, timestamp);
data.put(COLUMN_CREATE_TIME, System.currentTimeMillis());
data.put(COLUMN_ID, id);
return Tuples.of(metricBuilder.createPropertyMetric(message.getThingType(), templateId, message.getThingId()),
TimeSeriesData.of(message.getTimestamp(), handlePropertiesData(metadata, data)));
TimeSeriesData.of(timestamp, handlePropertiesData(metadata, data)));
}));
}

View File

@ -57,6 +57,7 @@ public abstract class RowModeSaveOperationsBase extends AbstractSaveOperations {
Map<String, Object> properties,
Map<String, Long> propertySourceTimes) {
List<Tuple2<String, TimeSeriesData>> data = new ArrayList<>(properties.size());
String metric = metricBuilder.createPropertyMetric(message.getThingType(), templateId, message.getThingId());
for (Map.Entry<String, Object> entry : properties.entrySet()) {
@ -67,8 +68,9 @@ public abstract class RowModeSaveOperationsBase extends AbstractSaveOperations {
if (value == null || propertyMetadata == null || propertyIsIgnoreStorage(propertyMetadata)) {
continue;
}
long timestamp = propertySourceTimes.getOrDefault(property, message.getTimestamp());
try {
long timestamp = convertTimestamp(
propertySourceTimes.getOrDefault(property, message.getTimestamp()));
String dataId = createPropertyDataId(property, message, timestamp);
data.add(
@ -82,6 +84,10 @@ public abstract class RowModeSaveOperationsBase extends AbstractSaveOperations {
value))
)
);
} catch (Throwable err) {
handlerError("create property[" + property + "] ts data", message, err);
}
}
return data;
}
@ -90,13 +96,9 @@ public abstract class RowModeSaveOperationsBase extends AbstractSaveOperations {
return ThingsDataConstants.propertyIsIgnoreStorage(metadata);
}
protected boolean useTimestampId(ThingMessage message) {
return message.getHeaderOrDefault(Headers.useTimestampAsId);
}
protected String createPropertyDataId(String property, ThingMessage message, long timestamp) {
if (!useTimestampId(message)) {
return IDGenerator.SNOW_FLAKE_STRING.generate();
return randomId();
}
return DigestUtils.md5Hex(
StringBuilderUtils
@ -137,7 +139,8 @@ public abstract class RowModeSaveOperationsBase extends AbstractSaveOperations {
return;
}
DataType type = property.getValueType();
target.put(COLUMN_PROPERTY_TYPE, type.getId());
//不存储type,没啥意义
// target.put(COLUMN_PROPERTY_TYPE, type.getId());
String convertedValue;
if (type instanceof NumberType) {
NumberType<?> numberType = (NumberType<?>) type;
@ -145,7 +148,7 @@ public abstract class RowModeSaveOperationsBase extends AbstractSaveOperations {
? ((Number) value)
: numberType.convertNumber(value);
if (number == null) {
throw new BusinessException("error.cannot_convert", 500, value, type.getId());
throw new BusinessException.NoStackTrace("error.cannot_convert", 500, value, type.getId());
}
convertedValue = String.valueOf(number);
target.put(COLUMN_PROPERTY_NUMBER_VALUE, convertNumberValue(number));

View File

@ -253,7 +253,7 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
if (null == message) {
return Mono.empty();
}
message.addHeader(PropertyConstants.uid, IDGenerator.SNOW_FLAKE_STRING.generate());
message.addHeaderIfAbsent(PropertyConstants.uid, IDGenerator.RANDOM.generate());
return this
.getTopic(message)
.flatMap(topic -> eventBus.publish(topic, message).then())