diff --git a/jetlinks-components/logging-component/src/main/java/org/jetlinks/community/logging/logback/SystemLoggingAppender.java b/jetlinks-components/logging-component/src/main/java/org/jetlinks/community/logging/logback/SystemLoggingAppender.java index 35b88f83..d56e5d4f 100644 --- a/jetlinks-components/logging-component/src/main/java/org/jetlinks/community/logging/logback/SystemLoggingAppender.java +++ b/jetlinks-components/logging-component/src/main/java/org/jetlinks/community/logging/logback/SystemLoggingAppender.java @@ -87,7 +87,7 @@ public class SystemLoggingAppender extends UnsynchronizedAppenderBase ERROR_KEY = + PropertyConstants.Key.of("store_error", () -> null, String.class); + protected final ThingsRegistry registry; protected final MetricBuilder metricBuilder; @@ -70,9 +75,9 @@ public abstract class AbstractSaveOperations implements SaveOperations { protected Map createLogData(ThingMessage message) { Map data = Maps.newHashMapWithExpectedSize(8); - data.put(COLUMN_ID, IDGenerator.SNOW_FLAKE_STRING.generate()); + data.put(COLUMN_ID, getOrCreateUid(message)); data.put(metricBuilder.getThingIdProperty(), message.getThingId()); - data.put(COLUMN_TIMESTAMP, message.getTimestamp()); + data.put(COLUMN_TIMESTAMP, convertTimestamp(message.getTimestamp())); data.put(COLUMN_CREATE_TIME, System.currentTimeMillis()); data.put(COLUMN_MESSAGE_ID, message.getMessageId()); data.put(COLUMN_LOG_TYPE, ThingLogType.of(message).name()); @@ -86,12 +91,31 @@ public abstract class AbstractSaveOperations implements SaveOperations { return data; } + protected long convertTimestamp(long timestamp) { + return TimestampUtils.toMillis(timestamp); + } + + protected String getOrCreateUid(ThingMessage message) { + return message.getHeaderOrElse(PropertyConstants.uid, this::randomId); + } + + protected String randomId() { + return IDGenerator.RANDOM.generate(); + } + protected String getTemplateIdFromMessage(ThingMessage message) { String templateId = message.getHeader(Headers.productId).orElse(null); if (templateId == null) { templateId = message.getHeader(ThingConstants.templateId).orElse(null); } - return templateId == null ? "null" : templateId; + if (templateId == null) { + log.warn("{} [{}] message not contains templateId(productId) : {}", + message.getThingType(), + message.getThingId(), + message); + return null; + } + return templateId; } protected Flux> convertMessageToTimeSeriesData(ThingMessage message) { @@ -101,29 +125,29 @@ public abstract class AbstractSaveOperations implements SaveOperations { return Flux.empty(); } String templateId = getTemplateIdFromMessage(message); + if (templateId == null) { + return Flux.empty(); + } List>> all = new ArrayList<>(2); //没有忽略数据存储 if (!ignoreStorage) { //事件上报 if (message instanceof ThingEventMessage) { - all.add(convertEventMessageToTimeSeriesData(templateId, ((ThingEventMessage) message))); + all.add(convertEventMessageToTimeSeriesData(templateId, ((ThingEventMessage) message)) + //记录错误信息 + .onErrorResume(error -> { + handlerError("convert event message to TimeSeries data", message, error); + return Mono.empty(); + })); } //属性相关消息 else if (message instanceof PropertyMessage) { - //配置了只保存属性上报 - if (!settings.getProperty().isOnlySaveReport() - || (message instanceof ThingReportPropertyMessage)) { - PropertyMessage propertyMessage = ((PropertyMessage) message); - Map properties = propertyMessage.getProperties(); - if (MapUtils.isNotEmpty(properties)) { - //属性源时间 - Map propertiesTimes = propertyMessage.getPropertySourceTimes(); - if (propertiesTimes == null) { - propertiesTimes = Collections.emptyMap(); - } - all.add(convertProperties(templateId, message, properties, propertiesTimes)); - } - } + all.add(convertPropertyMessageToTimeSeriesData(templateId, message) + //记录错误信息 + .onErrorResume(error -> { + handlerError("convert property message to TimeSeries data", message, error); + return Mono.empty(); + })); } } @@ -132,10 +156,45 @@ public abstract class AbstractSaveOperations implements SaveOperations { all.add(createDeviceMessageLog(templateId, message)); } - return Flux.merge(all); + return Flux.concat(all); } - private Mono> convertEventMessageToTimeSeriesData(String templateId, ThingEventMessage message) { + protected void handlerError(String operations, ThingMessage source, Throwable error) { + log.warn("{} {}", operations, source, error); + String msg = operations + ":" + error.getMessage(); + source.computeHeader(ERROR_KEY, (ignore, old) -> { + if (old == null) { + old = msg; + } else { + old = old + "\n" + msg; + } + return old; + }); + } + + Flux> convertPropertyMessageToTimeSeriesData(String templateId, ThingMessage message) { + try { + //配置了只保存属性上报 + if (!settings.getProperty().isOnlySaveReport() + || (message instanceof ThingReportPropertyMessage)) { + PropertyMessage propertyMessage = ((PropertyMessage) message); + Map properties = propertyMessage.getProperties(); + if (MapUtils.isNotEmpty(properties)) { + //属性源时间 + Map propertiesTimes = propertyMessage.getPropertySourceTimes(); + if (propertiesTimes == null) { + propertiesTimes = Collections.emptyMap(); + } + return convertProperties(templateId, message, properties, propertiesTimes); + } + } + } catch (Throwable error) { + return Flux.error(error); + } + return Flux.empty(); + } + + Mono> convertEventMessageToTimeSeriesData(String templateId, ThingEventMessage message) { return registry .getTemplate(message.getThingType(), templateId) @@ -157,7 +216,7 @@ public abstract class AbstractSaveOperations implements SaveOperations { return; } Map data = createEventData(message, metadata); - sink.next(TimeSeriesData.of(TimestampUtils.toMillis(message.getTimestamp()), data)); + sink.next(TimeSeriesData.of(convertTimestamp(message.getTimestamp()), data)); }) .map(data -> Tuples.of(createEventMetric(message.getThingType(), templateId, message.getThingId(), message.getEvent()), data)); } @@ -195,7 +254,7 @@ public abstract class AbstractSaveOperations implements SaveOperations { @SuppressWarnings("all") Map mapValue = ((Map) tempValue); int size = mapValue.size(); - data = Maps.newHashMapWithExpectedSize(size); + data = Maps.newHashMapWithExpectedSize(size + 6); data.putAll(mapValue); //严格模式,只记录物模型中记录的字段 if (settings.isStrict()) { @@ -217,24 +276,49 @@ public abstract class AbstractSaveOperations implements SaveOperations { if (settings.getEvent().eventIsAllInOne()) { data.put(COLUMN_EVENT_ID, message.getEvent()); } - data.put(COLUMN_ID, createEventDataId(message)); + long ts = convertTimestamp(message.getTimestamp()); + data.put(COLUMN_ID, createEventDataId(ts, message, data.get(COLUMN_ID))); data.put(metricBuilder.getThingIdProperty(), message.getThingId()); data.put(COLUMN_CREATE_TIME, System.currentTimeMillis()); - data.put(COLUMN_TIMESTAMP, message.getTimestamp()); + data.put(COLUMN_TIMESTAMP, ts); return data; } - protected String createEventDataId(ThingMessage message) { - return DigestUtils - .md5Hex(StringBuilderUtils.buildString(message, (msg, builder) -> builder - .append(msg.getThingId()) - .append('-') - .append(msg.getTimestamp()))); + protected String createEventDataId(long ts, ThingEventMessage message, Object idMaybe) { + + return md5Hex( + StringBuilderUtils + .buildString(message, idMaybe, ts, (msg, _idMaybe, _ts, builder) -> { + //追加thingId,方式不同的设备数据id相同被覆盖. + builder + .append(msg.getThingId()) + .append('-') + .append(msg.getEvent()); + + //使用时间戳作为数据ID + if (useTimestampId(msg)) { + builder.append(_ts); + } + //根据传入的ID生成ID + else { + String dataId = ObjectUtils.isEmpty(_idMaybe) + ? getOrCreateUid(msg) + : String.valueOf(_idMaybe); + + builder.append(dataId); + } + + })); } - private Mono> createDeviceMessageLog(String templateId, - ThingMessage message) { + protected boolean useTimestampId(ThingMessage message) { + return message.getHeaderOrDefault(Headers.useTimestampAsId); + } + + + Mono> createDeviceMessageLog(String templateId, + ThingMessage message) { return Mono.just(Tuples.of( diff --git a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/operations/ColumnModeSaveOperationsBase.java b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/operations/ColumnModeSaveOperationsBase.java index 37cdda22..946525d3 100644 --- a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/operations/ColumnModeSaveOperationsBase.java +++ b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/operations/ColumnModeSaveOperationsBase.java @@ -12,7 +12,6 @@ import org.jetlinks.core.metadata.types.NumberType; import org.jetlinks.core.things.ThingMetadata; import org.jetlinks.core.things.ThingsRegistry; import org.jetlinks.core.utils.StringBuilderUtils; -import org.jetlinks.core.utils.TimestampUtils; import org.jetlinks.community.things.data.ThingsDataConstants; import org.jetlinks.community.timeseries.TimeSeriesData; import org.jetlinks.community.utils.ObjectMappers; @@ -32,6 +31,9 @@ public abstract class ColumnModeSaveOperationsBase extends AbstractSaveOperation } protected String createPropertyDataId(ThingMessage message) { + if (!useTimestampId(message)) { + return randomId(); + } return DigestUtils.md5Hex( StringBuilderUtils .buildString(message, (m, builder) -> { @@ -76,13 +78,13 @@ public abstract class ColumnModeSaveOperationsBase extends AbstractSaveOperation if (data.isEmpty()) { return null; } - + long timestamp = convertTimestamp(message.getTimestamp()); data.put(metricBuilder.getThingIdProperty(), message.getThingId()); - data.put(COLUMN_TIMESTAMP, TimestampUtils.toMillis(message.getTimestamp())); + data.put(COLUMN_TIMESTAMP, timestamp); data.put(COLUMN_CREATE_TIME, System.currentTimeMillis()); data.put(COLUMN_ID, id); return Tuples.of(metricBuilder.createPropertyMetric(message.getThingType(), templateId, message.getThingId()), - TimeSeriesData.of(message.getTimestamp(), handlePropertiesData(metadata, data))); + TimeSeriesData.of(timestamp, handlePropertiesData(metadata, data))); })); } diff --git a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/operations/RowModeSaveOperationsBase.java b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/operations/RowModeSaveOperationsBase.java index b3521634..a44cc4a9 100644 --- a/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/operations/RowModeSaveOperationsBase.java +++ b/jetlinks-components/things-component/src/main/java/org/jetlinks/community/things/data/operations/RowModeSaveOperationsBase.java @@ -57,6 +57,7 @@ public abstract class RowModeSaveOperationsBase extends AbstractSaveOperations { Map properties, Map propertySourceTimes) { List> data = new ArrayList<>(properties.size()); + String metric = metricBuilder.createPropertyMetric(message.getThingType(), templateId, message.getThingId()); for (Map.Entry entry : properties.entrySet()) { @@ -67,21 +68,26 @@ public abstract class RowModeSaveOperationsBase extends AbstractSaveOperations { if (value == null || propertyMetadata == null || propertyIsIgnoreStorage(propertyMetadata)) { continue; } - long timestamp = propertySourceTimes.getOrDefault(property, message.getTimestamp()); + try { + long timestamp = convertTimestamp( + propertySourceTimes.getOrDefault(property, message.getTimestamp())); + String dataId = createPropertyDataId(property, message, timestamp); - String dataId = createPropertyDataId(property, message, timestamp); + data.add( + Tuples.of( + metric, + TimeSeriesData.of(timestamp, this + .createRowPropertyData(dataId, + TimestampUtils.toMillis(timestamp), + message, + propertyMetadata, + value)) + ) + ); + } catch (Throwable err) { + handlerError("create property[" + property + "] ts data", message, err); + } - data.add( - Tuples.of( - metric, - TimeSeriesData.of(timestamp, this - .createRowPropertyData(dataId, - TimestampUtils.toMillis(timestamp), - message, - propertyMetadata, - value)) - ) - ); } return data; } @@ -90,13 +96,9 @@ public abstract class RowModeSaveOperationsBase extends AbstractSaveOperations { return ThingsDataConstants.propertyIsIgnoreStorage(metadata); } - protected boolean useTimestampId(ThingMessage message) { - return message.getHeaderOrDefault(Headers.useTimestampAsId); - } - protected String createPropertyDataId(String property, ThingMessage message, long timestamp) { if (!useTimestampId(message)) { - return IDGenerator.SNOW_FLAKE_STRING.generate(); + return randomId(); } return DigestUtils.md5Hex( StringBuilderUtils @@ -137,7 +139,8 @@ public abstract class RowModeSaveOperationsBase extends AbstractSaveOperations { return; } DataType type = property.getValueType(); - target.put(COLUMN_PROPERTY_TYPE, type.getId()); + //不存储type,没啥意义 + // target.put(COLUMN_PROPERTY_TYPE, type.getId()); String convertedValue; if (type instanceof NumberType) { NumberType numberType = (NumberType) type; @@ -145,7 +148,7 @@ public abstract class RowModeSaveOperationsBase extends AbstractSaveOperations { ? ((Number) value) : numberType.convertNumber(value); if (number == null) { - throw new BusinessException("error.cannot_convert", 500, value, type.getId()); + throw new BusinessException.NoStackTrace("error.cannot_convert", 500, value, type.getId()); } convertedValue = String.valueOf(number); target.put(COLUMN_PROPERTY_NUMBER_VALUE, convertNumberValue(number)); diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java index a4dab208..61ba77ca 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java @@ -253,7 +253,7 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler { if (null == message) { return Mono.empty(); } - message.addHeader(PropertyConstants.uid, IDGenerator.SNOW_FLAKE_STRING.generate()); + message.addHeaderIfAbsent(PropertyConstants.uid, IDGenerator.RANDOM.generate()); return this .getTopic(message) .flatMap(topic -> eventBus.publish(topic, message).then())