refactor: 优化
This commit is contained in:
parent
ab95bb1d37
commit
fc62dab895
|
|
@ -89,6 +89,7 @@ public abstract class TemplateElasticSearchIndexStrategy extends AbstractElastic
|
|||
|
||||
builder.template(template -> {
|
||||
template.aliases(getAlias(index), a -> a);
|
||||
template.settings(properties::toSettings);
|
||||
template.mappings(mapping -> {
|
||||
mapping.dynamicTemplates(createDynamicTemplates());
|
||||
mapping.properties(createElasticProperties(metadata.getProperties()));
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ import org.jetlinks.community.device.message.writer.TimeSeriesMessageWriterConne
|
|||
import org.jetlinks.community.device.service.data.*;
|
||||
import org.jetlinks.community.rule.engine.executor.DeviceSelectorBuilder;
|
||||
import org.jetlinks.community.rule.engine.executor.device.DeviceSelectorProvider;
|
||||
import org.jetlinks.community.things.data.ThingsDataWriter;
|
||||
import org.jetlinks.core.device.DeviceRegistry;
|
||||
import org.jetlinks.core.device.session.DeviceSessionManager;
|
||||
import org.jetlinks.core.event.EventBus;
|
||||
|
|
@ -70,8 +71,9 @@ public class DeviceManagerConfiguration {
|
|||
|
||||
@Bean
|
||||
@ConditionalOnProperty(prefix = "device.message.writer.time-series", name = "enabled", havingValue = "true", matchIfMissing = true)
|
||||
public TimeSeriesMessageWriterConnector timeSeriesMessageWriterConnector(DeviceDataService dataService) {
|
||||
return new TimeSeriesMessageWriterConnector(dataService);
|
||||
public TimeSeriesMessageWriterConnector timeSeriesMessageWriterConnector(DeviceDataService dataService,
|
||||
ThingsDataWriter writer) {
|
||||
return new TimeSeriesMessageWriterConnector(dataService,writer);
|
||||
}
|
||||
|
||||
@AutoConfiguration
|
||||
|
|
|
|||
|
|
@ -18,9 +18,13 @@ package org.jetlinks.community.device.message.writer;
|
|||
import lombok.AllArgsConstructor;
|
||||
import lombok.Generated;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.jetlinks.community.things.data.ThingsDataWriter;
|
||||
import org.jetlinks.core.message.DeviceMessage;
|
||||
import org.jetlinks.community.device.service.data.DeviceDataService;
|
||||
import org.jetlinks.community.gateway.annotation.Subscribe;
|
||||
import org.jetlinks.core.message.property.Property;
|
||||
import org.jetlinks.core.message.property.PropertyMessage;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
/**
|
||||
|
|
@ -36,17 +40,32 @@ public class TimeSeriesMessageWriterConnector {
|
|||
|
||||
private final DeviceDataService dataService;
|
||||
|
||||
private final ThingsDataWriter thingsDataWriter;
|
||||
|
||||
@Subscribe(topics = "/device/**", id = "device-message-ts-writer")
|
||||
@Subscribe(topics = "/device/**", id = "device-message-ts-writer", priority = 100)
|
||||
@Generated
|
||||
public Mono<Void> writeDeviceMessageToTs(DeviceMessage message) {
|
||||
return dataService
|
||||
.saveDeviceMessage(message)
|
||||
.then(writeToThingsDataWriter(message))
|
||||
.onErrorResume(err -> {
|
||||
log.warn("write device message error {}", message, err);
|
||||
return Mono.empty();
|
||||
});
|
||||
}
|
||||
|
||||
private Mono<Void> writeToThingsDataWriter(DeviceMessage message) {
|
||||
if (message instanceof PropertyMessage) {
|
||||
return Flux
|
||||
.fromIterable(((PropertyMessage) message).getCompleteProperties())
|
||||
.concatMap(prop -> thingsDataWriter
|
||||
.updateProperty(message.getThingType(),
|
||||
message.getThingId(),
|
||||
prop))
|
||||
.then();
|
||||
}
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue