diff --git a/jetlinks-components/elasticsearch-component/elasticsearch-core/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TemplateElasticSearchIndexStrategy.java b/jetlinks-components/elasticsearch-component/elasticsearch-core/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TemplateElasticSearchIndexStrategy.java index b8c904f2..47c3ecc3 100755 --- a/jetlinks-components/elasticsearch-component/elasticsearch-core/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TemplateElasticSearchIndexStrategy.java +++ b/jetlinks-components/elasticsearch-component/elasticsearch-core/src/main/java/org/jetlinks/community/elastic/search/index/strategies/TemplateElasticSearchIndexStrategy.java @@ -89,6 +89,7 @@ public abstract class TemplateElasticSearchIndexStrategy extends AbstractElastic builder.template(template -> { template.aliases(getAlias(index), a -> a); + template.settings(properties::toSettings); template.mappings(mapping -> { mapping.dynamicTemplates(createDynamicTemplates()); mapping.properties(createElasticProperties(metadata.getProperties())); diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/configuration/DeviceManagerConfiguration.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/configuration/DeviceManagerConfiguration.java index 8e1b0a61..3f37d33c 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/configuration/DeviceManagerConfiguration.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/configuration/DeviceManagerConfiguration.java @@ -26,6 +26,7 @@ import org.jetlinks.community.device.message.writer.TimeSeriesMessageWriterConne import org.jetlinks.community.device.service.data.*; import org.jetlinks.community.rule.engine.executor.DeviceSelectorBuilder; import org.jetlinks.community.rule.engine.executor.device.DeviceSelectorProvider; +import org.jetlinks.community.things.data.ThingsDataWriter; import org.jetlinks.core.device.DeviceRegistry; import org.jetlinks.core.device.session.DeviceSessionManager; import org.jetlinks.core.event.EventBus; @@ -70,8 +71,9 @@ public class DeviceManagerConfiguration { @Bean @ConditionalOnProperty(prefix = "device.message.writer.time-series", name = "enabled", havingValue = "true", matchIfMissing = true) - public TimeSeriesMessageWriterConnector timeSeriesMessageWriterConnector(DeviceDataService dataService) { - return new TimeSeriesMessageWriterConnector(dataService); + public TimeSeriesMessageWriterConnector timeSeriesMessageWriterConnector(DeviceDataService dataService, + ThingsDataWriter writer) { + return new TimeSeriesMessageWriterConnector(dataService,writer); } @AutoConfiguration diff --git a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/writer/TimeSeriesMessageWriterConnector.java b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/writer/TimeSeriesMessageWriterConnector.java index 5be7e07c..32fc8956 100644 --- a/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/writer/TimeSeriesMessageWriterConnector.java +++ b/jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/writer/TimeSeriesMessageWriterConnector.java @@ -18,9 +18,13 @@ package org.jetlinks.community.device.message.writer; import lombok.AllArgsConstructor; import lombok.Generated; import lombok.extern.slf4j.Slf4j; +import org.jetlinks.community.things.data.ThingsDataWriter; import org.jetlinks.core.message.DeviceMessage; import org.jetlinks.community.device.service.data.DeviceDataService; import org.jetlinks.community.gateway.annotation.Subscribe; +import org.jetlinks.core.message.property.Property; +import org.jetlinks.core.message.property.PropertyMessage; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -36,17 +40,32 @@ public class TimeSeriesMessageWriterConnector { private final DeviceDataService dataService; + private final ThingsDataWriter thingsDataWriter; - @Subscribe(topics = "/device/**", id = "device-message-ts-writer") + @Subscribe(topics = "/device/**", id = "device-message-ts-writer", priority = 100) @Generated public Mono writeDeviceMessageToTs(DeviceMessage message) { return dataService .saveDeviceMessage(message) + .then(writeToThingsDataWriter(message)) .onErrorResume(err -> { log.warn("write device message error {}", message, err); return Mono.empty(); }); } + private Mono writeToThingsDataWriter(DeviceMessage message) { + if (message instanceof PropertyMessage) { + return Flux + .fromIterable(((PropertyMessage) message).getCompleteProperties()) + .concatMap(prop -> thingsDataWriter + .updateProperty(message.getThingType(), + message.getThingId(), + prop)) + .then(); + } + return Mono.empty(); + } + }