Merge remote-tracking branch 'origin/master'

This commit is contained in:
zhou-hao 2021-05-28 17:43:41 +08:00
commit c93df4715b
25 changed files with 913 additions and 579 deletions

27
.github/workflows/pull_request.yml vendored Normal file
View File

@ -0,0 +1,27 @@
# This workflow will build a Java project with Maven
# For more information see: https://help.github.com/actions/language-and-framework-guides/building-and-testing-java-with-maven
name: Java CI with Maven
on:
pull_request:
branches: [ master ]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- name: Set up JDK 1.8
uses: actions/setup-java@v1
with:
java-version: 1.8
- name: Cache Maven Repository
uses: actions/cache@v1
with:
path: ~/.m2
key: jetlinks-community-maven-repository
- name: Build with Maven
run: ./mvnw package -Dmaven.test.skip=true -Pbuild

View File

@ -17,6 +17,7 @@ import java.util.Optional;
public class MeasurementParameter implements ValueObject {
private Map<String, Object> params = new HashMap<>();
@Override
public Optional<Object> get(String name) {
return Optional.ofNullable(params).map(p -> p.get(name));
}

View File

@ -37,7 +37,6 @@ import org.jetlinks.community.elastic.search.utils.ReactorActionListener;
import org.jetlinks.core.utils.FluxUtils;
import org.reactivestreams.Publisher;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
@ -119,12 +118,14 @@ public class DefaultElasticSearchService implements ElasticSearchService {
});
}
@Override
public <T> Flux<T> query(String index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
return this
.doQuery(new String[]{index}, queryParam)
.flatMapMany(tp2 -> convertQueryResult(tp2.getT1(), tp2.getT2(), mapper));
}
@Override
public <T> Flux<T> query(String[] index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
return this
.doQuery(index, queryParam)
@ -373,7 +374,7 @@ public class DefaultElasticSearchService implements ElasticSearchService {
private Mono<SearchResponse> doSearch(SearchRequest request) {
return this
.<SearchRequest, SearchResponse>execute(request, restClient.getQueryClient()::searchAsync)
.execute(request, restClient.getQueryClient()::searchAsync)
.onErrorResume(err -> {
log.error("query elastic error", err);
return Mono.empty();

View File

@ -12,6 +12,11 @@ import java.util.Collection;
import java.util.Map;
import java.util.function.Function;
/**
* ES数据库业务操作类
*
* @author zhouhao
*/
public interface ElasticSearchService {
default <T> Mono<PagerResult<T>> queryPager(String index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {

View File

@ -27,12 +27,12 @@ import org.hswebframework.utils.time.DateFormatter;
import org.hswebframework.utils.time.DefaultDateFormatter;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.core.utils.FluxUtils;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager;
import org.jetlinks.community.elastic.search.index.ElasticSearchIndexMetadata;
import org.jetlinks.community.elastic.search.service.ElasticSearchService;
import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter;
import org.jetlinks.community.elastic.search.utils.QueryParamTranslator;
import org.jetlinks.core.utils.FluxUtils;
import org.reactivestreams.Publisher;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.DependsOn;
@ -57,6 +57,8 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* 响应式ES数据库操作类
*
* @author zhouhao
* @since 1.0
**/
@ -66,20 +68,23 @@ import java.util.stream.Collectors;
@ConfigurationProperties(prefix = "elasticsearch")
public class ReactiveElasticSearchService implements ElasticSearchService {
private final ReactiveElasticsearchClient restClient;
private final ElasticSearchIndexManager indexManager;
private FluxSink<Buffer> sink;
public static final IndicesOptions indexOptions = IndicesOptions.fromOptions(
true, true, false, false
);
//使用对象池处理Buffer,减少GC消耗
static ObjectPool<Buffer> pool = ObjectPool.newPool(Buffer::new);
static {
DateFormatter.supportFormatter.add(new DefaultDateFormatter(Pattern.compile("[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.+"), "yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
}
private final ReactiveElasticsearchClient restClient;
private final ElasticSearchIndexManager indexManager;
private FluxSink<Buffer> sink;
@Getter
@Setter
private BufferConfig buffer = new BufferConfig();
public ReactiveElasticSearchService(ReactiveElasticsearchClient restClient,
ElasticSearchIndexManager indexManager) {
this.restClient = restClient;
@ -119,12 +124,14 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
});
}
@Override
public <T> Flux<T> query(String index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
return this
.doQuery(new String[]{index}, queryParam)
.flatMapMany(tp2 -> convertQueryResult(tp2.getT1(), tp2.getT2(), mapper));
}
@Override
public <T> Flux<T> query(String[] index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
return this
.doQuery(index, queryParam)
@ -134,16 +141,16 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
@Override
public <T> Mono<PagerResult<T>> queryPager(String[] index, QueryParam queryParam, Function<Map<String, Object>, T> mapper) {
return this.doQuery(index, queryParam)
.flatMap(tp2 -> this
.convertQueryResult(tp2.getT1(), tp2.getT2(), mapper)
.collectList()
.filter(CollectionUtils::isNotEmpty)
.map(list -> PagerResult.of((int) tp2
.getT2()
.getHits()
.getTotalHits().value, list, queryParam))
)
.switchIfEmpty(Mono.fromSupplier(PagerResult::empty));
.flatMap(tp2 -> this
.convertQueryResult(tp2.getT1(), tp2.getT2(), mapper)
.collectList()
.filter(CollectionUtils::isNotEmpty)
.map(list -> PagerResult.of((int) tp2
.getT2()
.getHits()
.getTotalHits().value, list, queryParam))
)
.switchIfEmpty(Mono.fromSupplier(PagerResult::empty));
}
private <T> Flux<T> convertQueryResult(List<ElasticSearchIndexMetadata> indexList,
@ -162,8 +169,8 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
}
return mapper
.apply(Optional
.ofNullable(metadata.get(hit.getIndex())).orElse(indexList.get(0))
.convertFromElastic(hitMap));
.ofNullable(metadata.get(hit.getIndex())).orElse(indexList.get(0))
.convertFromElastic(hitMap));
});
}
@ -184,7 +191,6 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
});
}
@Override
public Mono<Long> count(String[] index, QueryParam queryParam) {
QueryParam param = queryParam.clone();
@ -223,8 +229,8 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
@Override
public <T> Mono<Void> commit(String index, Publisher<T> data) {
return Flux.from(data)
.flatMap(d -> commit(index, d))
.then();
.flatMap(d -> commit(index, d))
.then();
}
@Override
@ -235,10 +241,10 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
@Override
public <T> Mono<Void> save(String index, Publisher<T> data) {
return Flux.from(data)
.map(v -> Buffer.of(index, v))
.collectList()
.flatMap(this::doSave)
.then();
.map(v -> Buffer.of(index, v))
.collectList()
.flatMap(this::doSave)
.then();
}
@Override
@ -251,32 +257,6 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
sink.complete();
}
@Getter
@Setter
private BufferConfig buffer = new BufferConfig();
@Getter
@Setter
public static class BufferConfig {
//最小间隔
private int rate = Integer.getInteger("elasticsearch.buffer.rate", 1000);
//缓冲最大数量
private int bufferSize = Integer.getInteger("elasticsearch.buffer.size", 3000);
//缓冲超时时间
private Duration bufferTimeout = Duration.ofSeconds(Integer.getInteger("elasticsearch.buffer.timeout", 3));
//背压堆积数量限制.
private int bufferBackpressure = Integer.getInteger("elasticsearch.buffer.backpressure", Runtime
.getRuntime()
.availableProcessors());
//最大缓冲字节
private DataSize bufferBytes = DataSize.parse(System.getProperty("elasticsearch.buffer.bytes", "15MB"));
//最大重试次数
private int maxRetry = 3;
//重试间隔
private Duration minBackoff = Duration.ofSeconds(3);
}
//@PostConstruct
public void init() {
int flushRate = buffer.rate;
@ -288,10 +268,10 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
FluxUtils
.bufferRate(Flux.<Buffer>create(sink -> this.sink = sink),
flushRate,
bufferSize,
bufferTimeout,
(b, l) -> bufferedBytes.addAndGet(b.numberOfBytes()) >= bufferBytes)
flushRate,
bufferSize,
bufferTimeout,
(b, l) -> bufferedBytes.addAndGet(b.numberOfBytes()) >= bufferBytes)
.doOnNext(buf -> bufferedBytes.set(0))
.onBackpressureBuffer(bufferBackpressure, drop -> {
// TODO: 2020/11/25 将丢弃的数据存储到本地磁盘
@ -319,19 +299,186 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
})
.onErrorResume((err) -> Mono
.fromRunnable(() -> System.err.println("保存ElasticSearch数据失败:\n" +
org.hswebframework.utils.StringUtils.throwable2String(err))))
org.hswebframework.utils.StringUtils.throwable2String(err))))
.subscribe();
}
//使用对象池处理Buffer,减少GC消耗
static ObjectPool<Buffer> pool = ObjectPool.newPool(Buffer::new);
private Mono<String> getIndexForSave(String index) {
return indexManager
.getIndexStrategy(index)
.map(strategy -> strategy.getIndexForSave(index));
}
private Mono<String> getIndexForSearch(String index) {
return indexManager
.getIndexStrategy(index)
.map(strategy -> strategy.getIndexForSearch(index));
}
protected Mono<Integer> doSave(Collection<Buffer> buffers) {
return Flux.fromIterable(buffers)
.groupBy(Buffer::getIndex, Integer.MAX_VALUE)
.flatMap(group -> {
String index = group.key();
return this
.getIndexForSave(index)
.flatMapMany(realIndex -> group
.map(buffer -> {
try {
IndexRequest request;
if (buffer.id != null) {
request = new IndexRequest(realIndex).type("_doc").id(buffer.id);
} else {
request = new IndexRequest(realIndex).type("_doc");
}
request.source(buffer.payload, XContentType.JSON);
return request;
} finally {
buffer.release();
}
}));
})
.collectList()
.filter(CollectionUtils::isNotEmpty)
.flatMap(lst -> {
BulkRequest request = new BulkRequest();
request.timeout(TimeValue.timeValueSeconds(9));
lst.forEach(request::add);
return restClient
.bulk(request)
.as(save -> {
if (buffer.maxRetry > 0) {
return save.retryWhen(Retry.backoff(buffer.maxRetry, buffer.minBackoff));
}
return save;
});
})
.doOnNext(response -> {
if (response.hasFailures()) {
System.err.println(response.buildFailureMessage());
}
})
.thenReturn(buffers.size());
}
@SneakyThrows
protected void checkResponse(BulkResponse response) {
if (response.hasFailures()) {
for (BulkItemResponse item : response.getItems()) {
if (item.isFailed()) {
throw item.getFailure().getCause();
}
}
}
}
private <T> List<T> translate(Function<Map<String, Object>, T> mapper, SearchResponse response) {
return Arrays.stream(response.getHits().getHits())
.map(hit -> {
Map<String, Object> hitMap = hit.getSourceAsMap();
if (StringUtils.isEmpty(hitMap.get("id"))) {
hitMap.put("id", hit.getId());
}
return mapper.apply(hitMap);
})
.collect(Collectors.toList());
}
private Flux<SearchHit> doSearch(SearchRequest request) {
return restClient
.search(request)
.onErrorResume(err -> {
log.error("query elastic error", err);
return Mono.empty();
});
}
private Mono<Long> doCount(SearchRequest request) {
return restClient
.count(request)
.onErrorResume(err -> {
log.error("query elastic error", err);
return Mono.empty();
});
}
protected Mono<SearchRequest> createSearchRequest(QueryParam queryParam, String... indexes) {
return indexManager
.getIndexesMetadata(indexes)
.collectList()
.filter(CollectionUtils::isNotEmpty)
.flatMap(list -> createSearchRequest(queryParam, list));
}
protected Mono<SearchRequest> createSearchRequest(QueryParam queryParam, List<ElasticSearchIndexMetadata> indexes) {
SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0));
return Flux.fromIterable(indexes)
.flatMap(index -> getIndexForSearch(index.getIndex()))
.collectList()
.map(indexList ->
new SearchRequest(indexList.toArray(new String[0]))
.source(builder)
.indicesOptions(indexOptions));
}
protected Mono<QueryBuilder> createQueryBuilder(QueryParam queryParam, String index) {
return indexManager
.getIndexMetadata(index)
.map(metadata -> QueryParamTranslator.createQueryBuilder(queryParam, metadata))
.switchIfEmpty(Mono.fromSupplier(() -> QueryParamTranslator.createQueryBuilder(queryParam, null)));
}
protected Mono<CountRequest> createCountRequest(QueryParam queryParam, List<ElasticSearchIndexMetadata> indexes) {
QueryParam tempQueryParam = queryParam.clone();
tempQueryParam.setPaging(false);
tempQueryParam.setSorts(Collections.emptyList());
SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0));
return Flux.fromIterable(indexes)
.flatMap(index -> getIndexForSearch(index.getIndex()))
.collectList()
.map(indexList -> new CountRequest(indexList.toArray(new String[0])).source(builder));
}
private Mono<CountRequest> createCountRequest(QueryParam queryParam, String... index) {
return indexManager
.getIndexesMetadata(index)
.collectList()
.filter(CollectionUtils::isNotEmpty)
.flatMap(list -> createCountRequest(queryParam, list));
}
@Getter
@Setter
public static class BufferConfig {
//最小间隔
private int rate = Integer.getInteger("elasticsearch.buffer.rate", 1000);
//缓冲最大数量
private int bufferSize = Integer.getInteger("elasticsearch.buffer.size", 3000);
//缓冲超时时间
private Duration bufferTimeout = Duration.ofSeconds(Integer.getInteger("elasticsearch.buffer.timeout", 3));
//背压堆积数量限制.
private int bufferBackpressure = Integer.getInteger("elasticsearch.buffer.backpressure", Runtime
.getRuntime()
.availableProcessors());
//最大缓冲字节
private DataSize bufferBytes = DataSize.parse(System.getProperty("elasticsearch.buffer.bytes", "15MB"));
//最大重试次数
private int maxRetry = 3;
//重试间隔
private Duration minBackoff = Duration.ofSeconds(3);
}
@Getter
static class Buffer {
final ObjectPool.Handle<Buffer> handle;
String index;
String id;
String payload;
final ObjectPool.Handle<Buffer> handle;
public Buffer(ObjectPool.Handle<Buffer> handle) {
this.handle = handle;
@ -367,153 +514,4 @@ public class ReactiveElasticSearchService implements ElasticSearchService {
return payload == null ? 0 : payload.length() * 2;
}
}
private Mono<String> getIndexForSave(String index) {
return indexManager
.getIndexStrategy(index)
.map(strategy -> strategy.getIndexForSave(index));
}
private Mono<String> getIndexForSearch(String index) {
return indexManager
.getIndexStrategy(index)
.map(strategy -> strategy.getIndexForSearch(index));
}
protected Mono<Integer> doSave(Collection<Buffer> buffers) {
return Flux.fromIterable(buffers)
.groupBy(Buffer::getIndex,Integer.MAX_VALUE)
.flatMap(group -> {
String index = group.key();
return this
.getIndexForSave(index)
.flatMapMany(realIndex -> group
.map(buffer -> {
try {
IndexRequest request;
if (buffer.id != null) {
request = new IndexRequest(realIndex).type("_doc").id(buffer.id);
} else {
request = new IndexRequest(realIndex).type("_doc");
}
request.source(buffer.payload, XContentType.JSON);
return request;
} finally {
buffer.release();
}
}));
})
.collectList()
.filter(CollectionUtils::isNotEmpty)
.flatMap(lst -> {
BulkRequest request = new BulkRequest();
request.timeout(TimeValue.timeValueSeconds(9));
lst.forEach(request::add);
return restClient
.bulk(request)
.as(save -> {
if (buffer.maxRetry > 0) {
return save.retryWhen(Retry.backoff(buffer.maxRetry, buffer.minBackoff));
}
return save;
});
})
.doOnNext(response -> {
if (response.hasFailures()) {
System.err.println(response.buildFailureMessage());
}
})
.thenReturn(buffers.size());
}
@SneakyThrows
protected void checkResponse(BulkResponse response) {
if (response.hasFailures()) {
for (BulkItemResponse item : response.getItems()) {
if (item.isFailed()) {
throw item.getFailure().getCause();
}
}
}
}
private <T> List<T> translate(Function<Map<String, Object>, T> mapper, SearchResponse response) {
return Arrays.stream(response.getHits().getHits())
.map(hit -> {
Map<String, Object> hitMap = hit.getSourceAsMap();
if (StringUtils.isEmpty(hitMap.get("id"))) {
hitMap.put("id", hit.getId());
}
return mapper.apply(hitMap);
})
.collect(Collectors.toList());
}
private Flux<SearchHit> doSearch(SearchRequest request) {
return restClient
.search(request)
.onErrorResume(err -> {
log.error("query elastic error", err);
return Mono.empty();
});
}
private Mono<Long> doCount(SearchRequest request) {
return restClient
.count(request)
.onErrorResume(err -> {
log.error("query elastic error", err);
return Mono.empty();
});
}
protected Mono<SearchRequest> createSearchRequest(QueryParam queryParam, String... indexes) {
return indexManager
.getIndexesMetadata(indexes)
.collectList()
.filter(CollectionUtils::isNotEmpty)
.flatMap(list -> createSearchRequest(queryParam, list));
}
protected Mono<SearchRequest> createSearchRequest(QueryParam queryParam, List<ElasticSearchIndexMetadata> indexes) {
SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0));
return Flux.fromIterable(indexes)
.flatMap(index -> getIndexForSearch(index.getIndex()))
.collectList()
.map(indexList ->
new SearchRequest(indexList.toArray(new String[0]))
.source(builder)
.indicesOptions(indexOptions));
}
protected Mono<QueryBuilder> createQueryBuilder(QueryParam queryParam, String index) {
return indexManager
.getIndexMetadata(index)
.map(metadata -> QueryParamTranslator.createQueryBuilder(queryParam, metadata))
.switchIfEmpty(Mono.fromSupplier(() -> QueryParamTranslator.createQueryBuilder(queryParam, null)));
}
protected Mono<CountRequest> createCountRequest(QueryParam queryParam, List<ElasticSearchIndexMetadata> indexes) {
QueryParam tempQueryParam = queryParam.clone();
tempQueryParam.setPaging(false);
tempQueryParam.setSorts(Collections.emptyList());
SearchSourceBuilder builder = ElasticSearchConverter.convertSearchSourceBuilder(queryParam, indexes.get(0));
return Flux.fromIterable(indexes)
.flatMap(index -> getIndexForSearch(index.getIndex()))
.collectList()
.map(indexList -> new CountRequest(indexList.toArray(new String[0])).source(builder));
}
private Mono<CountRequest> createCountRequest(QueryParam queryParam, String... index) {
return indexManager
.getIndexesMetadata(index)
.collectList()
.filter(CollectionUtils::isNotEmpty)
.flatMap(list -> createCountRequest(queryParam, list));
}
}

View File

@ -12,29 +12,53 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 设备网关管理器
* <p>
* TCP UDP MQTT CoAP
*
* @author zhouhao
*/
@Component
public class DefaultDeviceGatewayManager implements DeviceGatewayManager, BeanPostProcessor {
private final DeviceGatewayPropertiesManager propertiesManager;
private Map<String, DeviceGatewayProvider> providers = new ConcurrentHashMap<>();
/**
* TCP MQTT的设备网关服务提供者
*/
private final Map<String, DeviceGatewayProvider> providers = new ConcurrentHashMap<>();
private Map<String, DeviceGateway> store = new ConcurrentHashMap<>();
/**
* 启动状态的设备网关
*/
private final Map<String, DeviceGateway> store = new ConcurrentHashMap<>();
public DefaultDeviceGatewayManager(DeviceGatewayPropertiesManager propertiesManager) {
this.propertiesManager = propertiesManager;
}
/**
* 获取设备网关有则返回没有就创建返回
*
* @param id 网关ID
* @return 设备网关
*/
private Mono<DeviceGateway> doGetGateway(String id) {
if (store.containsKey(id)) {
return Mono.just(store.get(id));
}
// 数据库查 DeviceGatewayEntity 转换成 DeviceGatewayProperties
// BeanMap中找provider 找不到就是不支持
// 创建设备网关
// double check 防止重复创建
return propertiesManager
.getProperties(id)
.switchIfEmpty(Mono.error(()->new UnsupportedOperationException("网关配置[" + id + "]不存在")))
.switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("网关配置[" + id + "]不存在")))
.flatMap(properties -> Mono
.justOrEmpty(providers.get(properties.getProvider()))
.switchIfEmpty(Mono.error(()->new UnsupportedOperationException("不支持的网络服务[" + properties.getProvider() + "]")))
.switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("不支持的网络服务[" + properties.getProvider() + "]")))
.flatMap(provider -> provider
.createDeviceGateway(properties)
.flatMap(gateway -> {

View File

@ -7,9 +7,17 @@ import org.jetlinks.community.ValueObject;
import java.util.HashMap;
import java.util.Map;
/**
* 设备网关属性外观类
* <p>
* 转换设备网关属性数据
* </p>
*
* @author zhouhao
*/
@Getter
@Setter
public class DeviceGatewayProperties implements ValueObject {
public class DeviceGatewayProperties implements ValueObject {
private String id;
@ -17,7 +25,7 @@ public class DeviceGatewayProperties implements ValueObject {
private String networkId;
private Map<String,Object> configuration=new HashMap<>();
private Map<String, Object> configuration = new HashMap<>();
@Override
public Map<String, Object> values() {

View File

@ -2,8 +2,19 @@ package org.jetlinks.community.gateway.supports;
import reactor.core.publisher.Mono;
/**
* 设备网关属性管理器
*
* @author zhouhao
*/
public interface DeviceGatewayPropertiesManager {
/**
* 获取网关的属性
*
* @param id 网关ID
* @return 网关属性
*/
Mono<DeviceGatewayProperties> getProperties(String id);

View File

@ -4,6 +4,14 @@ import org.jetlinks.community.gateway.DeviceGateway;
import org.jetlinks.community.network.NetworkType;
import reactor.core.publisher.Mono;
/**
* 设备网关支持提供商,用于提供对各种设备网关的支持.在启动设备网关时,会根据对应的提供商以及配置来创建设备网关.
* 实现统一管理网关配置,动态创建设备网关.
*
* @author zhouhao
* @see DeviceGateway
* @since 1.0
*/
public interface DeviceGatewayProvider {
String getId();

View File

@ -19,6 +19,11 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 默认网络管理器
*
* @author zhouhao
*/
@Component
@Slf4j
public class DefaultNetworkManager implements NetworkManager, BeanPostProcessor {
@ -26,9 +31,9 @@ public class DefaultNetworkManager implements NetworkManager, BeanPostProcessor
private final NetworkConfigManager configManager;
private Map<String, Map<String, Network>> store = new ConcurrentHashMap<>();
private final Map<String, Map<String, Network>> store = new ConcurrentHashMap<>();
private Map<String, NetworkProvider<Object>> providerSupport = new ConcurrentHashMap<>();
private final Map<String, NetworkProvider<Object>> providerSupport = new ConcurrentHashMap<>();
public DefaultNetworkManager(NetworkConfigManager configManager) {
this.configManager = configManager;

View File

@ -4,13 +4,48 @@ import reactor.core.publisher.Mono;
import java.util.List;
/**
* 网络服务管理器
* <p>
* 管理所有的网络组件
*
* @author zhouhao
* @since 1.0
*/
public interface NetworkManager {
/**
* 根据ID获取网络组件否则根据type和id创建网络组件并返回
*
* @param type 网络类型
* @param id 网络组件id
* @param <T> NetWork子类泛型
* @return 网络组件
*/
<T extends Network> Mono<T> getNetwork(NetworkType type, String id);
/**
* 获取所有的网络组件支持提供商
*
* @return 网络组件支持提供商
*/
List<NetworkProvider<?>> getProviders();
Mono<Void> reload(NetworkType type, String id);
/**
* 重新加载网络组件
*
* @param type 网络类型
* @param id 网络组件ID
* @return void
*/
Mono<Void> reload(NetworkType type, String id);
Mono<Void> shutdown(NetworkType type, String id);
/**
* 停止网络组件
*
* @param type 网络类型
* @param id 网络组件ID
* @return void
*/
Mono<Void> shutdown(NetworkType type, String id);
}

View File

@ -21,6 +21,14 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* 设备网关处理工具
* <p>
* 封装常用的设备消息处理操作
* </p>
*
* @author zhouhao
*/
@AllArgsConstructor
public class DeviceGatewayHelper {
@ -88,13 +96,13 @@ public class DeviceGatewayHelper {
return Mono
.delay(Duration.ofSeconds(2))
.then(registry
.getDevice(children.getDeviceId())
.flatMap(device -> device
//没有配置状态自管理才自动上线
.getSelfConfig(DeviceConfigKey.selfManageState)
.defaultIfEmpty(false)
.filter(Boolean.FALSE::equals)
.flatMap(ignore -> registerSession))
.getDevice(children.getDeviceId())
.flatMap(device -> device
//没有配置状态自管理才自动上线
.getSelfConfig(DeviceConfigKey.selfManageState)
.defaultIfEmpty(false)
.filter(Boolean.FALSE::equals)
.flatMap(ignore -> registerSession))
);
}
return registerSession;
@ -102,6 +110,15 @@ public class DeviceGatewayHelper {
return Mono.empty();
}
/**
* 处理来自设备网关的设备消息
*
* @param message 设备消息
* @param sessionBuilder 设备操作
* @param sessionConsumer 设备消费
* @param deviceNotFoundListener 异常监听
* @return 设备操作
*/
public Mono<DeviceOperator> handleDeviceMessage(DeviceMessage message,
Function<DeviceOperator, DeviceSession> sessionBuilder,
Consumer<DeviceSession> sessionConsumer,

View File

@ -3,6 +3,16 @@ package org.jetlinks.community.network.tcp.device;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.hswebframework.web.logger.ReactiveLogger;
import org.jetlinks.community.gateway.DeviceGateway;
import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
import org.jetlinks.community.gateway.monitor.GatewayMonitors;
import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkType;
import org.jetlinks.community.network.tcp.TcpMessage;
import org.jetlinks.community.network.tcp.client.TcpClient;
import org.jetlinks.community.network.tcp.server.TcpServer;
import org.jetlinks.community.network.utils.DeviceGatewayHelper;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.ProtocolSupports;
import org.jetlinks.core.device.DeviceOperator;
@ -17,16 +27,6 @@ import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.server.DeviceGatewayContext;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.server.session.DeviceSessionManager;
import org.jetlinks.community.gateway.DeviceGateway;
import org.jetlinks.community.gateway.monitor.DeviceGatewayMonitor;
import org.jetlinks.community.gateway.monitor.GatewayMonitors;
import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkType;
import org.jetlinks.community.network.tcp.TcpMessage;
import org.jetlinks.community.network.tcp.client.TcpClient;
import org.jetlinks.community.network.tcp.server.TcpServer;
import org.jetlinks.community.network.utils.DeviceGatewayHelper;
import org.jetlinks.supports.server.DecodedClientMessageHandler;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
@ -48,6 +48,9 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
@Getter
private final String id;
/**
* 维护所有创建的tcp server
*/
private final TcpServer tcpServer;
private final String protocol;
@ -60,6 +63,9 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
private final DeviceGatewayMonitor gatewayMonitor;
/**
* 连接计数器
*/
private final LongAdder counter = new LongAdder();
private final EmitterProcessor<Message> processor = EmitterProcessor.create(false);
@ -67,10 +73,11 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
private final FluxSink<Message> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
private final AtomicBoolean started = new AtomicBoolean();
private Disposable disposable;
private final DeviceGatewayHelper helper;
/**
* 数据流控开关
*/
private Disposable disposable;
public TcpServerDeviceGateway(String id,
String protocol,
@ -93,133 +100,39 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
return supports.getProtocol(protocol);
}
/**
* 当前总链接
*
* @return 当前总链接
*/
@Override
public long totalConnection() {
return counter.sum();
}
/**
* 传输协议
*
* @return {@link org.jetlinks.core.message.codec.DefaultTransport}
*/
@Override
public Transport getTransport() {
return DefaultTransport.TCP;
}
/**
* 网络类型
*
* @return {@link org.jetlinks.community.network.DefaultNetworkType}
*/
@Override
public NetworkType getNetworkType() {
return DefaultNetworkType.TCP_SERVER;
}
class TcpConnection implements DeviceGatewayContext {
final TcpClient client;
final AtomicReference<Duration> keepaliveTimeout = new AtomicReference<>();
final AtomicReference<DeviceSession> sessionRef = new AtomicReference<>();
final InetSocketAddress address;
TcpConnection(TcpClient client) {
this.client = client;
this.address = client.getRemoteAddress();
gatewayMonitor.totalConnection(counter.sum());
client.onDisconnect(() -> {
counter.decrement();
gatewayMonitor.disconnected();
gatewayMonitor.totalConnection(counter.sum());
});
gatewayMonitor.connected();
DeviceSession session = sessionManager.getSession(client.getId());
if (session == null) {
session = new UnknownTcpDeviceSession(client.getId(), client, getTransport()) {
@Override
public Mono<Boolean> send(EncodedMessage encodedMessage) {
return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage());
}
@Override
public void setKeepAliveTimeout(Duration timeout) {
keepaliveTimeout.set(timeout);
client.setKeepAliveTimeout(timeout);
}
@Override
public Optional<InetSocketAddress> getClientAddress() {
return Optional.of(address);
}
};
}
sessionRef.set(session);
}
Mono<Void> accept() {
return getProtocol()
.flatMap(protocol -> protocol.onClientConnect(getTransport(), client, this))
.then(
client
.subscribe()
.filter(tcp -> started.get())
.publishOn(Schedulers.parallel())
.flatMap(this::handleTcpMessage)
.onErrorResume((err) -> {
log.error(err.getMessage(), err);
client.shutdown();
return Mono.empty();
})
.then()
)
.doOnCancel(client::shutdown);
}
Mono<Void> handleTcpMessage(TcpMessage message) {
return getProtocol()
.flatMap(pt -> pt.getMessageCodec(getTransport()))
.flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(sessionRef.get(), message, registry)))
.cast(DeviceMessage.class)
.doOnNext(msg -> gatewayMonitor.receivedMessage())
.flatMap(this::handleDeviceMessage)
.doOnEach(ReactiveLogger.onError(err -> log.error("处理TCP[{}]消息失败:\n{}",
address,
message
, err)))
.onErrorResume((err) -> Mono.fromRunnable(client::reset))
.then();
}
Mono<Void> handleDeviceMessage(DeviceMessage message) {
if (processor.hasDownstreams()) {
sink.next(message);
}
return helper
.handleDeviceMessage(message,
device -> new TcpDeviceSession(device, client, getTransport(), gatewayMonitor),
DeviceGatewayHelper
.applySessionKeepaliveTimeout(message, keepaliveTimeout::get)
.andThen(session -> {
TcpDeviceSession deviceSession = session.unwrap(TcpDeviceSession.class);
deviceSession.setClient(client);
sessionRef.set(deviceSession);
}),
() -> log.warn("无法从tcp[{}]消息中获取设备信息:{}", address, message)
)
.then();
}
@Override
public Mono<DeviceOperator> getDevice(String deviceId) {
return registry.getDevice(deviceId);
}
@Override
public Mono<DeviceProductOperator> getProduct(String productId) {
return registry.getProduct(productId);
}
@Override
public Mono<Void> onMessage(DeviceMessage message) {
return handleDeviceMessage(message);
}
}
/**
* 启动网关
*/
private void doStart() {
if (started.getAndSet(true) || disposable != null) {
return;
@ -265,4 +178,134 @@ class TcpServerDeviceGateway implements DeviceGateway, MonitorSupportDeviceGatew
public boolean isAlive() {
return started.get();
}
/**
* TCP 客户端连接
*/
class TcpConnection implements DeviceGatewayContext {
final TcpClient client;
final AtomicReference<Duration> keepaliveTimeout = new AtomicReference<>();
final AtomicReference<DeviceSession> sessionRef = new AtomicReference<>();
final InetSocketAddress address;
TcpConnection(TcpClient client) {
this.client = client;
this.address = client.getRemoteAddress();
gatewayMonitor.totalConnection(counter.sum());
client.onDisconnect(() -> {
counter.decrement();
gatewayMonitor.disconnected();
gatewayMonitor.totalConnection(counter.sum());
});
gatewayMonitor.connected();
DeviceSession session = sessionManager.getSession(client.getId());
if (session == null) {
session = new UnknownTcpDeviceSession(client.getId(), client, getTransport()) {
@Override
public Mono<Boolean> send(EncodedMessage encodedMessage) {
return super.send(encodedMessage).doOnSuccess(r -> gatewayMonitor.sentMessage());
}
@Override
public void setKeepAliveTimeout(Duration timeout) {
keepaliveTimeout.set(timeout);
client.setKeepAliveTimeout(timeout);
}
@Override
public Optional<InetSocketAddress> getClientAddress() {
return Optional.of(address);
}
};
}
sessionRef.set(session);
}
/**
* 接收消息
*
* @return void
*/
Mono<Void> accept() {
return getProtocol()
.flatMap(protocol -> protocol.onClientConnect(getTransport(), client, this))
.then(
client
.subscribe()
.filter(tcp -> started.get())
.publishOn(Schedulers.parallel())
.flatMap(this::handleTcpMessage)
.onErrorResume((err) -> {
log.error(err.getMessage(), err);
client.shutdown();
return Mono.empty();
})
.then()
)
.doOnCancel(client::shutdown);
}
/**
* 处理TCP消息 ==>> 设备消息
*
* @param message tcp消息
* @return void
*/
Mono<Void> handleTcpMessage(TcpMessage message) {
return getProtocol()
.flatMap(pt -> pt.getMessageCodec(getTransport()))
.flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(sessionRef.get(), message, registry)))
.cast(DeviceMessage.class)
.doOnNext(msg -> gatewayMonitor.receivedMessage())
.flatMap(this::handleDeviceMessage)
.doOnEach(ReactiveLogger.onError(err -> log.error("处理TCP[{}]消息失败:\n{}",
address,
message
, err)))
.onErrorResume((err) -> Mono.fromRunnable(client::reset))
.then();
}
/**
* 处理设备消息
*
* @param message 设备消息
* @return void
*/
Mono<Void> handleDeviceMessage(DeviceMessage message) {
if (processor.hasDownstreams()) {
sink.next(message);
}
return helper
.handleDeviceMessage(message,
device -> new TcpDeviceSession(device, client, getTransport(), gatewayMonitor),
DeviceGatewayHelper
.applySessionKeepaliveTimeout(message, keepaliveTimeout::get)
.andThen(session -> {
TcpDeviceSession deviceSession = session.unwrap(TcpDeviceSession.class);
deviceSession.setClient(client);
sessionRef.set(deviceSession);
}),
() -> log.warn("无法从tcp[{}]消息中获取设备信息:{}", address, message)
)
.then();
}
@Override
public Mono<DeviceOperator> getDevice(String deviceId) {
return registry.getDevice(deviceId);
}
@Override
public Mono<DeviceProductOperator> getProduct(String productId) {
return registry.getProduct(productId);
}
@Override
public Mono<Void> onMessage(DeviceMessage message) {
return handleDeviceMessage(message);
}
}
}

View File

@ -1,8 +1,5 @@
package org.jetlinks.community.network.tcp.device;
import org.jetlinks.core.ProtocolSupports;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.server.session.DeviceSessionManager;
import org.jetlinks.community.gateway.DeviceGateway;
import org.jetlinks.community.gateway.supports.DeviceGatewayProperties;
import org.jetlinks.community.gateway.supports.DeviceGatewayProvider;
@ -10,11 +7,20 @@ import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkManager;
import org.jetlinks.community.network.NetworkType;
import org.jetlinks.community.network.tcp.server.TcpServer;
import org.jetlinks.core.ProtocolSupports;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.server.session.DeviceSessionManager;
import org.jetlinks.supports.server.DecodedClientMessageHandler;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import reactor.core.publisher.Mono;
/**
* TCP服务设备网关提供商
*
* @author zhouhao
* @since 1.0
*/
@Component
public class TcpServerDeviceGatewayProvider implements DeviceGatewayProvider {
@ -63,9 +69,9 @@ public class TcpServerDeviceGatewayProvider implements DeviceGatewayProvider {
.map(mqttServer -> {
String protocol = (String) properties.getConfiguration().get("protocol");
Assert.hasText(protocol,"protocol can not be empty");
Assert.hasText(protocol, "protocol can not be empty");
return new TcpServerDeviceGateway(properties.getId(),
return new TcpServerDeviceGateway(properties.getId(),
protocol,
protocolSupports,
registry,

View File

@ -23,5 +23,6 @@ public interface TcpServer extends Network {
/**
* 关闭服务端
*/
@Override
void shutdown();
}

View File

@ -7,8 +7,21 @@ import java.util.Date;
import java.util.Map;
import java.util.Optional;
/**
* 时序数据封装类
*
* @author zhouhao
*/
public interface TimeSeriesData extends ValueObject {
static TimeSeriesData of(Date date, Map<String, Object> data) {
return of(date == null ? System.currentTimeMillis() : date.getTime(), data);
}
static TimeSeriesData of(long timestamp, Map<String, Object> data) {
return new SimpleTimeSeriesData(timestamp, data);
}
long getTimestamp();
Map<String, Object> getData();
@ -23,14 +36,7 @@ public interface TimeSeriesData extends ValueObject {
return Optional.ofNullable(getData().get(name));
}
static TimeSeriesData of(Date date, Map<String, Object> data) {
return of(date == null ? System.currentTimeMillis() : date.getTime(), data);
}
static TimeSeriesData of(long timestamp, Map<String, Object> data) {
return new SimpleTimeSeriesData(timestamp, data);
}
@Override
default <T> T as(Class<T> type) {
return FastBeanCopier.copy(getData(), type);
}

View File

@ -36,115 +36,9 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
PropertyConstants.deviceName.getKey(),
PropertyConstants.orgId.getKey()
};
//设备注册中心
private final DeviceRegistry registry;
private final EventBus eventBus;
private final MessageHandler messageHandler;
private final static BiConsumer<Throwable, Object> doOnError = (error, val) -> DeviceMessageConnector.log.error(error.getMessage(), error);
private final static Function<DeviceOperator, Mono<Values>> configGetter = operator -> operator.getSelfConfigs(allConfigHeader);
private final static Values emptyValues = Values.of(Collections.emptyMap());
public DeviceMessageConnector(EventBus eventBus,
DeviceRegistry registry,
MessageHandler messageHandler,
DeviceSessionManager sessionManager) {
this.registry = registry;
this.eventBus = eventBus;
this.messageHandler = messageHandler;
sessionManager
.onRegister()
.flatMap(session -> {
DeviceOnlineMessage message = new DeviceOnlineMessage();
message.setDeviceId(session.getDeviceId());
message.setTimestamp(session.connectTime());
return onMessage(message);
})
.onErrorContinue(doOnError)
.subscribe();
sessionManager
.onUnRegister()
.flatMap(session -> {
DeviceOfflineMessage message = new DeviceOfflineMessage();
message.setDeviceId(session.getDeviceId());
message.setTimestamp(System.currentTimeMillis());
return onMessage(message);
})
.onErrorContinue(doOnError)
.subscribe();
}
public Mono<Void> onMessage(Message message) {
if (null == message) {
return Mono.empty();
}
return this
.getTopic(message)
.flatMap(topic -> eventBus.publish(topic, message).then())
.onErrorContinue(doOnError)
.then();
}
private Flux<String> getTopic(Message message) {
Flux<String> topicsStream = createDeviceMessageTopic(registry, message);
if (message instanceof ChildDeviceMessage) { //子设备消息
return this
.onMessage(((ChildDeviceMessage) message).getChildDeviceMessage())
.thenMany(topicsStream);
} else if (message instanceof ChildDeviceMessageReply) { //子设备消息
return this
.onMessage(((ChildDeviceMessageReply) message).getChildDeviceMessage())
.thenMany(topicsStream);
}
return topicsStream;
}
public static Flux<String> createDeviceMessageTopic(DeviceRegistry deviceRegistry, Message message) {
return Flux.defer(() -> {
if (message instanceof DeviceMessage) {
DeviceMessage deviceMessage = ((DeviceMessage) message);
String deviceId = deviceMessage.getDeviceId();
if (deviceId == null) {
log.warn("无法从消息中获取设备ID:{}", deviceMessage);
return Mono.empty();
}
return deviceRegistry
.getDevice(deviceId)
.flatMap(configGetter)
.defaultIfEmpty(emptyValues)
.flatMapIterable(configs -> {
configs.getAllValues().forEach(deviceMessage::addHeader);
String productId = deviceMessage.getHeader(PropertyConstants.productId).orElse("null");
String topic = createDeviceMessageTopic(productId, deviceId, deviceMessage);
List<String> topics = new ArrayList<>(2);
topics.add(topic);
configs.getValue(PropertyConstants.orgId)
.ifPresent(orgId -> topics.add("/org/" + orgId + topic));
return topics;
});
}
return Mono.just("/device/unknown/message/unknown");
});
}
public static String createDeviceMessageTopic(String productId, String deviceId, DeviceMessage message) {
StringBuilder builder = new StringBuilder(64)
.append("/device/")
.append(productId)
.append("/")
.append(deviceId);
appendDeviceMessageTopic(message, builder);
return builder.toString();
}
private static final BiConsumer<Message, StringBuilder>[] fastTopicBuilder;
static {
@ -208,7 +102,7 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
Message msg = ((ChildDeviceMessage) message).getChildDeviceMessage();
if (msg instanceof DeviceMessage) {
builder.append("/message/children/")
.append(((DeviceMessage) msg).getDeviceId());
.append(((DeviceMessage) msg).getDeviceId());
} else {
builder.append("/message/children");
}
@ -219,7 +113,7 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
Message msg = ((ChildDeviceMessageReply) message).getChildDeviceMessage();
if (msg instanceof DeviceMessage) {
builder.append("/message/children/reply/")
.append(((DeviceMessage) msg).getDeviceId());
.append(((DeviceMessage) msg).getDeviceId());
} else {
builder.append("/message/children/reply");
}
@ -229,6 +123,80 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
createFastBuilder(MessageType.DERIVED_METADATA, "/metadata/derived");
}
private final DeviceRegistry registry;
private final EventBus eventBus;
private final MessageHandler messageHandler;
public DeviceMessageConnector(EventBus eventBus,
DeviceRegistry registry,
MessageHandler messageHandler,
DeviceSessionManager sessionManager) {
this.registry = registry;
this.eventBus = eventBus;
this.messageHandler = messageHandler;
sessionManager
.onRegister()
.flatMap(session -> {
DeviceOnlineMessage message = new DeviceOnlineMessage();
message.setDeviceId(session.getDeviceId());
message.setTimestamp(session.connectTime());
return onMessage(message);
})
.onErrorContinue(doOnError)
.subscribe();
sessionManager
.onUnRegister()
.flatMap(session -> {
DeviceOfflineMessage message = new DeviceOfflineMessage();
message.setDeviceId(session.getDeviceId());
message.setTimestamp(System.currentTimeMillis());
return onMessage(message);
})
.onErrorContinue(doOnError)
.subscribe();
}
public static Flux<String> createDeviceMessageTopic(DeviceRegistry deviceRegistry, Message message) {
return Flux.defer(() -> {
if (message instanceof DeviceMessage) {
DeviceMessage deviceMessage = ((DeviceMessage) message);
String deviceId = deviceMessage.getDeviceId();
if (deviceId == null) {
log.warn("无法从消息中获取设备ID:{}", deviceMessage);
return Mono.empty();
}
return deviceRegistry
.getDevice(deviceId)
.flatMap(configGetter)
.defaultIfEmpty(emptyValues)
.flatMapIterable(configs -> {
configs.getAllValues().forEach(deviceMessage::addHeader);
String productId = deviceMessage.getHeader(PropertyConstants.productId).orElse("null");
String topic = createDeviceMessageTopic(productId, deviceId, deviceMessage);
List<String> topics = new ArrayList<>(2);
topics.add(topic);
configs.getValue(PropertyConstants.orgId)
.ifPresent(orgId -> topics.add("/org/" + orgId + topic));
return topics;
});
}
return Mono.just("/device/unknown/message/unknown");
});
}
public static String createDeviceMessageTopic(String productId, String deviceId, DeviceMessage message) {
StringBuilder builder = new StringBuilder(64)
.append("/device/")
.append(productId)
.append("/")
.append(deviceId);
appendDeviceMessageTopic(message, builder);
return builder.toString();
}
private static void createFastBuilder(MessageType messageType,
String topic) {
fastTopicBuilder[messageType.ordinal()] = (ignore, builder) -> builder.append(topic);
@ -249,6 +217,37 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
}
}
public Mono<Void> onMessage(Message message) {
if (null == message) {
return Mono.empty();
}
return this
.getTopic(message)
.flatMap(topic -> eventBus.publish(topic, message).then())
.onErrorContinue(doOnError)
.then();
}
private Flux<String> getTopic(Message message) {
Flux<String> topicsStream = createDeviceMessageTopic(registry, message);
if (message instanceof ChildDeviceMessage) { //子设备消息
return this
.onMessage(((ChildDeviceMessage) message).getChildDeviceMessage())
.thenMany(topicsStream);
} else if (message instanceof ChildDeviceMessageReply) { //子设备消息
return this
.onMessage(((ChildDeviceMessageReply) message).getChildDeviceMessage())
.thenMany(topicsStream);
}
return topicsStream;
}
/**
* 处理设备消息
*
* @param message 设备消息
* @return 处理结果
*/
protected Mono<Boolean> handleChildrenDeviceMessage(Message message) {
if (message instanceof DeviceMessageReply) {
return doReply(((DeviceMessageReply) message));
@ -261,10 +260,23 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
return handleChildrenDeviceMessage(reply.getChildDeviceMessage());
}
/**
* 处理回复消息
*
* @param reply 子设备回复消息
* @return 处理结果
*/
protected Mono<Boolean> handleChildrenDeviceMessageReply(ChildDeviceMessageReply reply) {
return handleChildrenDeviceMessage(reply.getChildDeviceMessage());
}
/**
* 这里才是真正处理消息的地方
*
* @param device 设备操作类
* @param message 设备消息
* @return 处理结果
*/
@Override
public Mono<Boolean> handleMessage(DeviceOperator device, @Nonnull Message message) {
Mono<Boolean> then;
@ -284,6 +296,12 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
}
/**
* 回复消息处理逻辑
*
* @param reply 设备回复消息
* @return 处理结果
*/
private Mono<Boolean> doReply(DeviceMessageReply reply) {
if (log.isDebugEnabled()) {
log.debug("reply message {}", reply.getMessageId());

View File

@ -15,10 +15,16 @@ import reactor.core.publisher.Mono;
*/
@Slf4j
@AllArgsConstructor
public class TimeSeriesMessageWriterConnector{
public class TimeSeriesMessageWriterConnector {
private final DeviceDataService dataService;
/**
* 订阅设备消息 入库
*
* @param message 设备消息
* @return void
*/
@Subscribe(topics = "/device/**", id = "device-message-ts-writer")
public Mono<Void> writeDeviceMessageToTs(DeviceMessage message) {
return dataService.saveDeviceMessage(message);

View File

@ -8,28 +8,22 @@ import org.hswebframework.ezorm.core.param.TermType;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.community.device.entity.DeviceEvent;
import org.jetlinks.community.device.entity.DeviceOperationLogEntity;
import org.jetlinks.community.device.entity.DeviceProperty;
import org.jetlinks.community.device.enums.DeviceLogType;
import org.jetlinks.community.device.events.handler.ValueTypeTranslator;
import org.jetlinks.community.gateway.DeviceMessageUtils;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.device.DeviceProductOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.message.DeviceLogMessage;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.event.EventMessage;
import org.jetlinks.core.message.property.ReadPropertyMessageReply;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import org.jetlinks.core.message.property.WritePropertyMessageReply;
import org.jetlinks.core.metadata.*;
import org.jetlinks.core.metadata.types.*;
import org.jetlinks.community.device.entity.DeviceEvent;
import org.jetlinks.community.device.entity.DeviceOperationLogEntity;
import org.jetlinks.community.device.entity.DevicePropertiesEntity;
import org.jetlinks.community.device.entity.DeviceProperty;
import org.jetlinks.community.device.enums.DeviceLogType;
import org.jetlinks.community.device.events.handler.ValueTypeTranslator;
import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.core.utils.DeviceMessageTracer;
import org.jetlinks.core.utils.TimestampUtils;
import org.reactivestreams.Publisher;
@ -43,7 +37,6 @@ import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -59,8 +52,8 @@ import static org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric.*;
*/
public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStoragePolicy {
private final AtomicInteger nanoInc = new AtomicInteger();
protected DeviceRegistry deviceRegistry;
protected DeviceDataStorageProperties properties;
public AbstractDeviceDataStoragePolicy(DeviceRegistry registry,
@ -88,9 +81,11 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
protected abstract Mono<Void> doSaveData(String metric, Flux<TimeSeriesData> data);
/**
* 设备消息转换 二元组 {deviceId, tsData}
*
* @param productId 产品ID
* @param message 原始消息
* @param properties 属性
* @param message 设备属性消息
* @param properties 物模型属性
* @return 数据集合
* @see this#convertPropertiesForColumnPolicy(String, DeviceMessage, Map)
* @see this#convertPropertiesForRowPolicy(String, DeviceMessage, Map)
@ -107,7 +102,15 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
QueryParamEntity paramEntity,
Function<TimeSeriesData, T> mapper);
/**
* 保存单个设备消息,为了提升性能,存储策略会对保存请求进行缓冲,达到一定条件后
* 再进行批量写出,具体由不同对存储策略实现
* <p>
* 如果保存失败,在这里不会得到错误信息.
*
* @param message 设备消息
* @return void
*/
@Nonnull
@Override
public Mono<Void> saveDeviceMessage(@Nonnull DeviceMessage message) {
@ -121,10 +124,10 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
@Override
public Mono<Void> saveDeviceMessage(@Nonnull Publisher<DeviceMessage> message) {
return Flux.from(message)
.flatMap(this::convertMessageToTimeSeriesData)
.groupBy(Tuple2::getT1, Integer.MAX_VALUE)
.flatMap(group -> doSaveData(group.key(), group.map(Tuple2::getT2)))
.then();
.flatMap(this::convertMessageToTimeSeriesData)
.groupBy(Tuple2::getT1, Integer.MAX_VALUE)
.flatMap(group -> doSaveData(group.key(), group.map(Tuple2::getT2)))
.then();
}
protected String createDataId(DeviceMessage message) {
@ -152,6 +155,12 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
.toSimpleMap())));
}
/**
* 设备消息转换成时序数据 二元组 {deviceId, tsData}
*
* @param message 设备消息
* @return 二元组
*/
protected Flux<Tuple2<String, TimeSeriesData>> convertMessageToTimeSeriesData(DeviceMessage message) {
boolean ignoreStorage = message.getHeaderOrDefault(Headers.ignoreStorage);
boolean ignoreLog = message.getHeaderOrDefault(Headers.ignoreLog);
@ -194,8 +203,16 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
return Flux.merge(all);
}
/**
* 事件消息转换成 二元组{deviceId, tsData}
*
* @param productId 产品ID
* @param message 事件消息
* @return 二元组
*/
protected Mono<Tuple2<String, TimeSeriesData>> convertEventMessageToTimeSeriesData(String productId, EventMessage message) {
// 设备注册中心获取设备操作接口
// 获取设备元数据 物模型
return deviceRegistry
.getDevice(message.getDeviceId())
.flatMap(device -> device
@ -227,20 +244,19 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
.map(data -> Tuples.of(deviceEventMetricId(productId, message.getEvent()), data));
}
@Override
public Mono<PagerResult<DeviceOperationLogEntity>> queryDeviceMessageLog(@Nonnull String deviceId, @Nonnull QueryParamEntity entity) {
return deviceRegistry
.getDevice(deviceId)
.flatMap(operator -> operator.getSelfConfig(DeviceConfigKey.productId))
.flatMap(productId -> this
.doQueryPager(deviceLogMetricId(productId),
entity.and("deviceId", TermType.eq, deviceId),
data -> data.as(DeviceOperationLogEntity.class)
entity.and("deviceId", TermType.eq, deviceId),
data -> data.as(DeviceOperationLogEntity.class)
))
.defaultIfEmpty(PagerResult.empty());
}
@Nonnull
@Override
public Flux<DeviceEvent> queryEvent(@Nonnull String deviceId,
@ -256,15 +272,15 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
.where("deviceId", deviceId)
.execute(param -> this
.doQuery(deviceEventMetricId(tp2.getT1().getId(), event),
param,
data -> {
DeviceEvent deviceEvent = new DeviceEvent(data.values());
if (format) {
deviceEvent.putFormat(tp2.getT2().getEventOrNull(event));
}
deviceEvent.putIfAbsent("timestamp", data.getTimestamp());
return deviceEvent;
})));
param,
data -> {
DeviceEvent deviceEvent = new DeviceEvent(data.values());
if (format) {
deviceEvent.putFormat(tp2.getT2().getEventOrNull(event));
}
deviceEvent.putIfAbsent("timestamp", data.getTimestamp());
return deviceEvent;
})));
}
@Nonnull
@ -278,18 +294,18 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
.getDevice(deviceId)
.flatMap(device -> Mono.zip(device.getProduct(), device.getMetadata()))
.flatMap(tp2 -> query.toQuery()
.where("deviceId", deviceId)
.execute(param -> this
.doQueryPager(deviceEventMetricId(tp2.getT1().getId(), event),
param,
data -> {
DeviceEvent deviceEvent = new DeviceEvent(data.values());
if (format) {
deviceEvent.putFormat(tp2.getT2().getEventOrNull(event));
}
deviceEvent.putIfAbsent("timestamp", data.getTimestamp());
return deviceEvent;
}))
.where("deviceId", deviceId)
.execute(param -> this
.doQueryPager(deviceEventMetricId(tp2.getT1().getId(), event),
param,
data -> {
DeviceEvent deviceEvent = new DeviceEvent(data.values());
if (format) {
deviceEvent.putFormat(tp2.getT2().getEventOrNull(event));
}
deviceEvent.putIfAbsent("timestamp", data.getTimestamp());
return deviceEvent;
}))
);
}
@ -383,6 +399,16 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
return Maps.newHashMapWithExpectedSize(size);
}
/**
* 设备消息转换 二元组{deviceId, tsData}
*
* @param productId 产品ID
* @param message 设备属性消息
* @param properties 物模型属性
* @return 数据集合
* @see this#convertPropertiesForColumnPolicy(String, DeviceMessage, Map)
* @see this#convertPropertiesForRowPolicy(String, DeviceMessage, Map)
*/
protected Flux<Tuple2<String, TimeSeriesData>> convertPropertiesForRowPolicy(String productId,
DeviceMessage message,
Map<String, Object> properties) {
@ -415,10 +441,10 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
return Mono
.just(TimeSeriesData.of(ts, this
.createRowPropertyData(id,
TimestampUtils.toMillis(ts),
device.getDeviceId(),
propertyMetadata,
entry.getT2().getValue()))
TimestampUtils.toMillis(ts),
device.getDeviceId(),
propertyMetadata,
entry.getT2().getValue()))
);
})
.map(data -> Tuples.of(devicePropertyMetricId(productId), data)))
@ -518,8 +544,8 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
}
if (properties.length == 1) {
return metadata.getProperty(properties[0])
.map(Arrays::asList)
.orElseGet(Collections::emptyList);
.map(Arrays::asList)
.orElseGet(Collections::emptyList);
}
Set<String> ids = new HashSet<>(Arrays.asList(properties));
return metadata
@ -529,11 +555,12 @@ public abstract class AbstractDeviceDataStoragePolicy implements DeviceDataStora
.collect(Collectors.toList());
}
private final AtomicInteger nanoInc = new AtomicInteger();
//将毫秒转为纳秒努力让数据不重复
/**
* 将毫秒转为纳秒努力让数据不重复
*
* @param millis 毫秒值
* @return 尽可能不会重复的long值
*/
protected long createUniqueNanoTime(long millis) {
long nano = TimeUnit.MILLISECONDS.toNanos(millis);

View File

@ -2,16 +2,16 @@ package org.jetlinks.community.device.service.data;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.jetlinks.community.device.entity.DeviceEvent;
import org.jetlinks.community.device.entity.DeviceOperationLogEntity;
import org.jetlinks.community.device.entity.DeviceProperty;
import org.jetlinks.community.timeseries.query.AggregationData;
import org.jetlinks.core.Value;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceProductOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.metadata.DeviceMetadata;
import org.jetlinks.community.device.entity.DeviceEvent;
import org.jetlinks.community.device.entity.DeviceOperationLogEntity;
import org.jetlinks.community.device.entity.DeviceProperty;
import org.jetlinks.community.timeseries.query.AggregationData;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.stereotype.Component;
@ -23,6 +23,13 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
/**
* 默认设备数据服务
* <p>
* 管理设备存储策略提供数据查询和入库操作
*
* @author zhouhao
*/
@Component
public class DefaultDeviceDataService implements DeviceDataService {
@ -55,8 +62,16 @@ public class DefaultDeviceDataService implements DeviceDataService {
.then();
}
/**
* 通过产品ID 获取存储策略
*
* @param productId 产品ID
* @return 存储策略
*/
Mono<DeviceDataStoragePolicy> getStoreStrategy(String productId) {
// 从注册中心获取产品操作接口
// 从配置中获取产品的存储策略
// 巧妙的双层switchIfEmpty 外层判断空配置 内层判断空策略
return deviceRegistry
.getProduct(productId)
.flatMap(product -> product
@ -69,7 +84,16 @@ public class DefaultDeviceDataService implements DeviceDataService {
.flatMap(Function.identity()));
}
/**
* 通过设备ID 获取存储策略
*
* @param deviceId 设备ID
* @return 存储策略
*/
Mono<DeviceDataStoragePolicy> getDeviceStrategy(String deviceId) {
// 从注册中心获取设备操作接口
// 转换成产品操作接口
// 继而通过转换的产品ID获取存储策略
return deviceRegistry.getDevice(deviceId)
.flatMap(DeviceOperator::getProduct)
.map(DeviceProductOperator::getId)
@ -94,7 +118,7 @@ public class DefaultDeviceDataService implements DeviceDataService {
@Nonnull String... properties) {
return this
.getDeviceStrategy(deviceId)
.flatMapMany(strategy -> strategy.queryEachProperties(deviceId, query,properties));
.flatMapMany(strategy -> strategy.queryEachProperties(deviceId, query, properties));
}
@Nonnull
@ -145,7 +169,15 @@ public class DefaultDeviceDataService implements DeviceDataService {
.defaultIfEmpty(PagerResult.empty());
}
/**
* 保存单个设备消息,为了提升性能,存储策略会对保存请求进行缓冲,达到一定条件后
* 再进行批量写出,具体由不同对存储策略实现
* <p>
* 如果保存失败,在这里不会得到错误信息.
*
* @param message 设备消息
* @return void
*/
@Nonnull
@Override
public Mono<Void> saveDeviceMessage(@Nonnull DeviceMessage message) {

View File

@ -2,7 +2,14 @@ package org.jetlinks.community.device.service.data;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.jetlinks.community.timeseries.query.*;
import org.jetlinks.community.device.entity.DeviceProperty;
import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetadata;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.community.timeseries.query.AggregationData;
import org.jetlinks.community.timeseries.query.AggregationQueryParam;
import org.jetlinks.community.timeseries.query.Group;
import org.jetlinks.community.timeseries.query.TimeGroup;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.message.DeviceMessage;
@ -10,10 +17,6 @@ import org.jetlinks.core.metadata.ConfigMetadata;
import org.jetlinks.core.metadata.Converter;
import org.jetlinks.core.metadata.DeviceMetadata;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.community.device.entity.DeviceProperty;
import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetadata;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.springframework.stereotype.Component;
@ -28,8 +31,12 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric.devicePropertyMetric;
import static org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric.devicePropertyMetricId;
/**
* 时序数据列存储策略
*
* @author zhouhao
*/
@Component
public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDataStoragePolicy implements DeviceDataStoragePolicy {
@ -65,10 +72,10 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat
public Mono<Void> registerMetadata(@Nonnull String productId, @Nonnull DeviceMetadata metadata) {
return Flux
.concat(Flux
.fromIterable(metadata.getEvents())
.flatMap(event -> timeSeriesManager.registerMetadata(DeviceTimeSeriesMetadata.event(productId, event))),
timeSeriesManager.registerMetadata(DeviceTimeSeriesMetadata.properties(productId, metadata.getProperties())),
timeSeriesManager.registerMetadata(DeviceTimeSeriesMetadata.log(productId)))
.fromIterable(metadata.getEvents())
.flatMap(event -> timeSeriesManager.registerMetadata(DeviceTimeSeriesMetadata.event(productId, event))),
timeSeriesManager.registerMetadata(DeviceTimeSeriesMetadata.properties(productId, metadata.getProperties())),
timeSeriesManager.registerMetadata(DeviceTimeSeriesMetadata.log(productId)))
.then();
}
@ -77,8 +84,6 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat
String deviceId,
Map<String, PropertyMetadata> property,
QueryParamEntity param) {
//查询多个属性,分组聚合获取第一条数据
return param
.toQuery()
@ -119,20 +124,20 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat
.getDevice(deviceId)
.flatMap(device -> Mono.zip(device.getProduct(), device.getMetadata()))
.flatMap(tp2 -> {
PropertyMetadata prop = tp2.getT2().getPropertyOrNull(property);
PropertyMetadata prop = tp2.getT2().getPropertyOrNull(property);
return param
.toQuery()
.includes(property)
.where("deviceId", deviceId)
.execute(query -> timeSeriesManager
.getService(devicePropertyMetric(tp2.getT1().getId()))
.queryPager(query,
data -> DeviceProperty
.of(data, data.get(property).orElse(0), prop)
.property(property)
));
}
return param
.toQuery()
.includes(property)
.where("deviceId", deviceId)
.execute(query -> timeSeriesManager
.getService(devicePropertyMetric(tp2.getT1().getId()))
.queryPager(query,
data -> DeviceProperty
.of(data, data.get(property).orElse(0), prop)
.property(property)
));
}
);
}
@ -243,11 +248,22 @@ public class TimeSeriesColumnDeviceDataStoragePolicy extends TimeSeriesDeviceDat
.doOnNext(agg -> agg.values().remove("_time"));
}
/**
* 设备消息转换 二元组{deviceId, tsData}
*
* @param productId 产品ID
* @param message 设备属性消息
* @param properties 物模型属性
* @return 数据集合
* @see this#convertPropertiesForColumnPolicy(String, DeviceMessage, Map)
* @see this#convertPropertiesForRowPolicy(String, DeviceMessage, Map)
*/
@Override
protected Flux<Tuple2<String, TimeSeriesData>> convertProperties(String productId, DeviceMessage message, Map<String, Object> properties) {
return convertPropertiesForColumnPolicy(productId, message, properties);
}
@Override
protected Object convertPropertyValue(Object value, PropertyMetadata metadata) {
if (value == null || metadata == null) {
return value;

View File

@ -10,6 +10,14 @@ import reactor.core.publisher.Mono;
import java.util.function.Function;
/**
* 抽象时序数据存储策略
* <p>
* 提供时序数据通用的查询存储逻辑
* </p>
*
* @author zhouhao
*/
public abstract class TimeSeriesDeviceDataStoragePolicy extends AbstractDeviceDataStoragePolicy {
@ -22,18 +30,21 @@ public abstract class TimeSeriesDeviceDataStoragePolicy extends AbstractDeviceDa
this.timeSeriesManager = timeSeriesManager;
}
@Override
protected Mono<Void> doSaveData(String metric, TimeSeriesData data) {
return timeSeriesManager
.getService(metric)
.commit(data);
}
@Override
protected Mono<Void> doSaveData(String metric, Flux<TimeSeriesData> data) {
return timeSeriesManager
.getService(metric)
.save(data);
}
@Override
protected <T> Flux<T> doQuery(String metric,
QueryParamEntity paramEntity,
Function<TimeSeriesData, T> mapper) {
@ -44,6 +55,7 @@ public abstract class TimeSeriesDeviceDataStoragePolicy extends AbstractDeviceDa
}
@Override
protected <T> Mono<PagerResult<T>> doQueryPager(String metric,
QueryParamEntity paramEntity,
Function<TimeSeriesData, T> mapper) {

View File

@ -2,18 +2,18 @@ package org.jetlinks.community.device.service.data;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.metadata.ConfigMetadata;
import org.jetlinks.core.metadata.DeviceMetadata;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.community.device.entity.DeviceProperty;
import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetadata;
import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.community.timeseries.query.*;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.metadata.ConfigMetadata;
import org.jetlinks.core.metadata.DeviceMetadata;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.reactor.ql.utils.CastUtils;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
@ -28,6 +28,11 @@ import java.util.stream.Stream;
import static org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric.devicePropertyMetric;
/**
* 设备时序数据行存储策略
*
* @author zhouhao
*/
@Component
public class TimeSeriesRowDeviceDataStoreStoragePolicy extends TimeSeriesDeviceDataStoragePolicy implements DeviceDataStoragePolicy {
@ -184,11 +189,11 @@ public class TimeSeriesRowDeviceDataStoreStoragePolicy extends TimeSeriesDeviceD
return timeSeriesManager
.getService(devicePropertyMetric(tp2.getT1().getId()))
.aggregation(AggregationQueryParam
.of()
.agg(new LimitAggregationColumn("property", "property", Aggregation.TOP, query.getPageSize()))
.groupBy(new LimitGroup("property", "property", propertiesMap.size() * 2)) //按property分组
.filter(query)
.filter(q -> q.where("deviceId", deviceId).in("property", propertiesMap.keySet()))
.of()
.agg(new LimitAggregationColumn("property", "property", Aggregation.TOP, query.getPageSize()))
.groupBy(new LimitGroup("property", "property", propertiesMap.size() * 2)) //按property分组
.filter(query)
.filter(q -> q.where("deviceId", deviceId).in("property", propertiesMap.keySet()))
).map(data -> DeviceProperty
.of(data, data.getString("property").map(propertiesMap::get).orElse(null))
.deviceId(deviceId));
@ -236,13 +241,13 @@ public class TimeSeriesRowDeviceDataStoreStoragePolicy extends TimeSeriesDeviceD
//执行查询
.execute(timeSeriesManager.getService(getTimeSeriesMetric(productId))::aggregation)
//按时间分组,然后将返回的结果合并起来
.groupBy(agg -> agg.getString("time", ""),Integer.MAX_VALUE)
.groupBy(agg -> agg.getString("time", ""), Integer.MAX_VALUE)
.flatMap(group ->
{
String time = group.key();
return group
//按属性分组
.groupBy(agg -> agg.getString("property", ""),Integer.MAX_VALUE)
.groupBy(agg -> agg.getString("property", ""), Integer.MAX_VALUE)
.flatMap(propsGroup -> {
String property = propsGroup.key();
return propsGroup
@ -285,6 +290,16 @@ public class TimeSeriesRowDeviceDataStoreStoragePolicy extends TimeSeriesDeviceD
.doOnNext(agg -> agg.values().remove("_time"));
}
/**
* 设备消息转换 二元组{deviceId, tsData}
*
* @param productId 产品ID
* @param message 设备属性消息
* @param properties 物模型属性
* @return 数据集合
* @see this#convertPropertiesForColumnPolicy(String, DeviceMessage, Map)
* @see this#convertPropertiesForRowPolicy(String, DeviceMessage, Map)
*/
@Override
protected Flux<Tuple2<String, TimeSeriesData>> convertProperties(String productId, DeviceMessage message, Map<String, Object> properties) {
return convertPropertiesForRowPolicy(productId, message, properties);

View File

@ -1,8 +1,8 @@
package org.jetlinks.community.device.timeseries;
import org.jetlinks.community.timeseries.TimeSeriesMetric;
import org.jetlinks.core.device.DeviceProductOperator;
import org.jetlinks.core.metadata.EventMetadata;
import org.jetlinks.community.timeseries.TimeSeriesMetric;
/**
* 设备时序数据度量标识
@ -26,6 +26,13 @@ public interface DeviceTimeSeriesMetric {
return TimeSeriesMetric.of(deviceEventMetricId(productId, eventId));
}
/**
* 构建事件指标ID
*
* @param productId 产品ID
* @param eventId 事件ID
* @return 事件指标ID
*/
static String deviceEventMetricId(String productId, String eventId) {
return "event_".concat(productId).concat("_").concat(eventId);
}

View File

@ -7,6 +7,11 @@ import org.jetlinks.community.gateway.supports.DeviceGatewayPropertiesManager;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
/**
* 设备网关配置服务
*
* @author zhouhao
*/
@Service
public class DeviceGatewayConfigService implements DeviceGatewayPropertiesManager {