diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/Interval.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/Interval.java index b8c1bce5..dec3b3e7 100644 --- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/Interval.java +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/Interval.java @@ -18,9 +18,15 @@ public class Interval { public static String minutes = "m"; public static String seconds = "s"; - private BigDecimal number; + private final BigDecimal number; - private String expression; + private final String expression; + + public boolean isFixed() { + return expression.equals(hours) || + expression.equals(minutes) || + expression.equals(seconds); + } @Override public String toString() { diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/ValueObject.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/ValueObject.java index 46f363e3..6eaa3a58 100644 --- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/ValueObject.java +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/ValueObject.java @@ -11,10 +11,10 @@ import java.util.Optional; public interface ValueObject { - Map getAll(); + Map values(); default Optional get(String name) { - return Optional.ofNullable(getAll()) + return Optional.ofNullable(values()) .map(map -> map.get(name)); } diff --git a/jetlinks-components/dashboard-component/src/main/java/org/jetlinks/community/dashboard/MeasurementParameter.java b/jetlinks-components/dashboard-component/src/main/java/org/jetlinks/community/dashboard/MeasurementParameter.java index 42698764..2941be1b 100644 --- a/jetlinks-components/dashboard-component/src/main/java/org/jetlinks/community/dashboard/MeasurementParameter.java +++ b/jetlinks-components/dashboard-component/src/main/java/org/jetlinks/community/dashboard/MeasurementParameter.java @@ -22,7 +22,7 @@ public class MeasurementParameter implements ValueObject { } @Override - public Map getAll() { + public Map values() { return params; } } diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/AbstractElasticSearchIndexStrategy.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/AbstractElasticSearchIndexStrategy.java index 12299988..c3ecade0 100644 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/AbstractElasticSearchIndexStrategy.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/index/strategies/AbstractElasticSearchIndexStrategy.java @@ -95,7 +95,12 @@ public abstract class AbstractElasticSearchIndexStrategy implements ElasticSearc } Map mappingConfig = new HashMap<>(); PutMappingRequest request = new PutMappingRequest(wrapIndex(metadata.getIndex())); - mappingConfig.put("properties", createElasticProperties(metadata.getProperties())); + request.type("_doc"); + List allProperties = new ArrayList<>(); + allProperties.addAll(metadata.getProperties()); + allProperties.addAll(ignore.getProperties()); + + mappingConfig.put("properties", createElasticProperties(allProperties)); request.source(mappingConfig); return request; } diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/AggType.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/AggType.java new file mode 100644 index 00000000..62939e19 --- /dev/null +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/AggType.java @@ -0,0 +1,79 @@ +package org.jetlinks.community.elastic.search.service.reactive; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregationBuilders; + +@AllArgsConstructor +public enum AggType { + + AVG("平均") { + @Override + public AggregationBuilder aggregationBuilder(String name, String filed) { + return AggregationBuilders.avg(name).field(filed).missing(0); + } + + }, + MAX("最大") { + @Override + public AggregationBuilder aggregationBuilder(String name, String filed) { + return AggregationBuilders.max(name).field(filed).missing(0); + } + + + }, + COUNT("非空值计数") { + @Override + public AggregationBuilder aggregationBuilder(String name, String filed) { + return AggregationBuilders.count(name).field(filed).missing(0); + } + + }, + MIN("最小") { + @Override + public AggregationBuilder aggregationBuilder(String name, String filed) { + return AggregationBuilders.min(name).field(filed).missing(0); + } + }, + FIRST("第一条数据") { + @Override + public AggregationBuilder aggregationBuilder(String name, String filed) { + return AggregationBuilders.topHits(name).size(1); + } + }, + TOP("第N条数据") { + @Override + public AggregationBuilder aggregationBuilder(String name, String filed) { + return AggregationBuilders.topHits(name); + } + }, + SUM("总数") { + @Override + public AggregationBuilder aggregationBuilder(String name, String filed) { + return AggregationBuilders.sum(name).field(filed).missing(0); + } + }, + STATS("统计汇总") { + @Override + public AggregationBuilder aggregationBuilder(String name, String filed) { + return AggregationBuilders.stats(name).field(filed).missing(0); + } + + }; + + @Getter + private final String text; + + public abstract AggregationBuilder aggregationBuilder(String name, String filed); + + public static AggType of(String name) { + for (AggType type : AggType.values()) { + if (type.name().equalsIgnoreCase(name)) { + return type; + } + } + throw new UnsupportedOperationException("不支持的聚合度量类型:" + name); + } + +} diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/DefaultReactiveElasticsearchClient.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/DefaultReactiveElasticsearchClient.java index af388bb0..c3a14bdd 100644 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/DefaultReactiveElasticsearchClient.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/DefaultReactiveElasticsearchClient.java @@ -1,8 +1,9 @@ package org.jetlinks.community.elastic.search.service.reactive; import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson.serializer.SerializerFeature; import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPut; import org.apache.http.entity.ByteArrayEntity; @@ -11,6 +12,7 @@ import org.apache.http.util.EntityUtils; import org.apache.lucene.util.BytesRef; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; @@ -50,16 +52,15 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Request; import org.elasticsearch.cluster.health.ClusterHealthStatus; -import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.*; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -110,8 +111,10 @@ import java.util.StringJoiner; import java.util.function.Function; import java.util.function.Supplier; +import static org.elasticsearch.rest.BaseRestHandler.INCLUDE_TYPE_NAME_PARAMETER; import static org.springframework.data.elasticsearch.client.util.RequestConverters.createContentType; +@Slf4j public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient { private final HostProvider hostProvider; private final RequestCreator requestCreator; @@ -131,6 +134,11 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch this.hostProvider = hostProvider; this.requestCreator = requestCreator; + info() + .subscribe(mainResponse -> { + log.debug("connect elasticsearch server : {}", JSON.toJSONString(mainResponse, SerializerFeature.PrettyFormat)); + version = mainResponse.getVersion(); + }); } public void setHeadersSupplier(Supplier headersSupplier) { @@ -216,7 +224,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#indices() */ @Override - public Indices indices() { + public org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices indices() { return this; } @@ -547,9 +555,10 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch */ @Override public Mono existsIndex(HttpHeaders headers, GetIndexRequest request) { - - return sendRequest(request, requestCreator.indexExists(), RawActionResponse.class, headers) // - .map(response -> response.statusCode().is2xxSuccessful()) // + return sendRequest(request, requestCreator.indexExists() + , RawActionResponse.class, headers) // + .map(response -> response.statusCode().is2xxSuccessful()) + .onErrorReturn(false) .next(); } @@ -585,7 +594,15 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch @Override public Mono openIndex(HttpHeaders headers, OpenIndexRequest request) { - return sendRequest(request, requestCreator.indexOpen(), AcknowledgedResponse.class, headers) // + return sendRequest( + request, + requestCreator + .indexOpen() + .andThen(r -> { + r.addParameter("include_type_name", "true"); + return r; + }), + AcknowledgedResponse.class, headers) // .then(); } @@ -618,7 +635,9 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch @Override public Mono updateMapping(HttpHeaders headers, PutMappingRequest putMappingRequest) { - return sendRequest(putMappingRequest, requestCreator.putMapping(), AcknowledgedResponse.class, headers) // + return sendRequest(putMappingRequest + , requestCreator.putMapping() + , AcknowledgedResponse.class, headers) // .then(); } @@ -638,7 +657,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.ReactiveElasticsearchClientCallback) */ @Override - public Mono execute(ReactiveElasticsearchClientCallback callback) { + public Mono execute(org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.ReactiveElasticsearchClientCallback callback) { return this.hostProvider.getActive(HostProvider.Verification.LAZY) // .flatMap(callback::doWithClient) // @@ -655,7 +674,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch } @Override - public Mono status() { + public Mono status() { return hostProvider.clusterInfo() // .map(it -> new ClientStatus(it.getNodes())); @@ -795,8 +814,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch } catch (Exception e) { return Mono - .error(new ElasticsearchStatusException(content, - RestStatus.fromCode(response.statusCode().value()),errorParseFailure)); + .error(new ElasticsearchStatusException(content, RestStatus.fromCode(response.statusCode().value()))); } } } @@ -900,9 +918,15 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch @Override public Mono searchForPage(SearchRequest request) { - + long startTime = System.currentTimeMillis(); return sendRequest(request, requestCreator.search(), SearchResponse.class, HttpHeaders.EMPTY) - .singleOrEmpty(); + .singleOrEmpty() + .doOnNext(res -> { + log.trace("execute search {} {}ms : {}", request.indices(), System.currentTimeMillis() - startTime, request.source()); + }) + .doOnError(err -> { + log.warn("execute search {} error : {}", request.indices(), request.source(), err); + }); } @SneakyThrows @@ -940,9 +964,7 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch } Request convertGetIndexTemplateRequest(GetIndexTemplatesRequest getIndexTemplatesRequest) { - final Request request = new Request(HttpGet.METHOD_NAME, "/_template/" + String.join(",", getIndexTemplatesRequest.names())); - - return request; + return new Request(HttpGet.METHOD_NAME, "/_template/" + String.join(",", getIndexTemplatesRequest.names())); } @Override @@ -974,16 +996,23 @@ public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearch .singleOrEmpty(); } + private Version version = Version.CURRENT; + + @Override + public Version serverVersion() { + return version; + } + // endregion // region internal classes /** - * Reactive client {@link Status} implementation. + * Reactive client {@link ReactiveElasticsearchClient.Status} implementation. * * @author Christoph Strobl */ - class ClientStatus implements Status { + class ClientStatus implements ReactiveElasticsearchClient.Status { private final Collection connectedHosts; diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveAggregationService.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveAggregationService.java index 83ccdd50..fa9ae412 100644 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveAggregationService.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveAggregationService.java @@ -1,29 +1,41 @@ package org.jetlinks.community.elastic.search.service.reactive; import lombok.extern.slf4j.Slf4j; +import org.elasticsearch.Version; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.BucketOrder; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.search.aggregations.bucket.histogram.ExtendedBounds; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation; +import org.elasticsearch.search.aggregations.metrics.TopHits; +import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.ValueCount; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.SortBuilders; +import org.elasticsearch.search.sort.SortOrder; import org.hswebframework.ezorm.core.param.QueryParam; import org.hswebframework.ezorm.core.param.TermType; -import org.jetlinks.community.elastic.search.aggreation.bucket.Bucket; -import org.jetlinks.community.elastic.search.aggreation.bucket.BucketAggregationsStructure; -import org.jetlinks.community.elastic.search.aggreation.bucket.BucketResponse; -import org.jetlinks.community.elastic.search.aggreation.bucket.Sort; -import org.jetlinks.community.elastic.search.aggreation.enums.BucketType; -import org.jetlinks.community.elastic.search.aggreation.enums.MetricsType; -import org.jetlinks.community.elastic.search.aggreation.enums.OrderType; -import org.jetlinks.community.elastic.search.aggreation.metrics.MetricsAggregationStructure; import org.jetlinks.community.elastic.search.index.ElasticSearchIndexManager; import org.jetlinks.community.elastic.search.service.AggregationService; import org.jetlinks.community.elastic.search.service.DefaultElasticSearchService; import org.jetlinks.community.elastic.search.utils.ElasticSearchConverter; -import org.jetlinks.community.timeseries.query.AggregationQueryParam; +import org.jetlinks.community.timeseries.query.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.time.ZoneId; import java.util.*; import java.util.stream.Collectors; @@ -53,69 +65,186 @@ public class ReactiveAggregationService implements AggregationService { .map(metadata -> ElasticSearchConverter.convertSearchSourceBuilder(queryParam, metadata)); } + private AggregationBuilder createBuilder(Group group, AggregationQueryParam param) { + + if (group instanceof TimeGroup) { + TimeGroup timeGroup = ((TimeGroup) group); + DateHistogramAggregationBuilder builder = AggregationBuilders + .dateHistogram(timeGroup.getAlias()) + .field(timeGroup.getProperty()); + if (StringUtils.hasText(timeGroup.getFormat())) { + String format = timeGroup.getFormat(); + if (format.startsWith("yyyy")) { + format = "8" + format; + } + builder.format(format); + } + builder.order(BucketOrder.key(false)); + if (timeGroup.getInterval() != null) { + if (restClient.serverVersion().after(Version.V_7_2_0)) { + if (timeGroup.getInterval().isFixed()) { + builder.fixedInterval(new DateHistogramInterval(timeGroup.getInterval().toString())); + } else { + builder.calendarInterval(new DateHistogramInterval(timeGroup.getInterval().toString())); + } + } else { + builder.dateHistogramInterval(new DateHistogramInterval(timeGroup.getInterval().toString())); + } + } + + builder.extendedBounds(getExtendedBounds(param)); +// builder.missing(""); + + builder.timeZone(ZoneId.systemDefault()); + return builder; + } else { + TermsAggregationBuilder builder = AggregationBuilders + .terms(group.getAlias()) + .field(group.getProperty()); + if (group instanceof LimitGroup) { + builder.size(((LimitGroup) group).getLimit()); + } else { + builder.size(100); + } +// builder.missing(0); + return builder.executionHint("map"); + } + } + @Override public Flux> aggregation(String[] index, AggregationQueryParam aggregationQueryParam) { QueryParam queryParam = prepareQueryParam(aggregationQueryParam); - BucketAggregationsStructure structure = createAggParameter(aggregationQueryParam); + + List groups = new ArrayList<>(); + // TODO: 2020/9/3 + if (aggregationQueryParam.getGroupByTime() != null) { + groups.add(aggregationQueryParam.getGroupByTime()); + } + groups.addAll(aggregationQueryParam.getGroupBy()); + AggregationBuilder aggregationBuilder; + AggregationBuilder lastAggBuilder; + if (!groups.isEmpty()) { + Group first = groups.get(0); + aggregationBuilder = lastAggBuilder = createBuilder(first, aggregationQueryParam); + for (int i = 1; i < groups.size(); i++) { + aggregationBuilder.subAggregation(lastAggBuilder = createBuilder(groups.get(i), aggregationQueryParam)); + } + } else { + aggregationBuilder = lastAggBuilder = AggregationBuilders.count("count"); + } + for (AggregationColumn aggColumn : aggregationQueryParam.getAggColumns()) { + AggregationBuilder builder = AggType.of(aggColumn.getAggregation().name()) + .aggregationBuilder(aggColumn.getAlias(), aggColumn.getProperty()); + if (builder instanceof TopHitsAggregationBuilder) { + TopHitsAggregationBuilder topHitsBuilder = ((TopHitsAggregationBuilder) builder); + if (CollectionUtils.isEmpty(queryParam.getSorts())) { + topHitsBuilder.sort(aggregationQueryParam.getTimeProperty(), SortOrder.DESC); + } else { + topHitsBuilder.sorts(queryParam.getSorts() + .stream() + .map(sort -> SortBuilders.fieldSort(sort.getName()) + .order("desc".equalsIgnoreCase(sort.getOrder()) ? SortOrder.DESC : SortOrder.ASC)) + .collect(Collectors.toList())); + } + if (aggColumn instanceof LimitAggregationColumn) { + topHitsBuilder.size(((LimitAggregationColumn) aggColumn).getLimit()); + }else { + topHitsBuilder.size(1); + } + } + lastAggBuilder.subAggregation(builder); + } + + AggregationBuilder ageBuilder = aggregationBuilder; + return Flux.fromArray(index) .flatMap(idx -> Mono.zip(indexManager.getIndexStrategy(idx), Mono.just(idx))) .collectList() .flatMap(strategy -> - createSearchSourceBuilder(queryParam, index[0]) + this + .createSearchSourceBuilder(queryParam, index[0]) .map(builder -> new SearchRequest(strategy .stream() .map(tp2 -> tp2.getT1().getIndexForSearch(tp2.getT2())) .toArray(String[]::new)) .indicesOptions(DefaultElasticSearchService.indexOptions) - .source(builder.size(0).aggregation(structure.getType().aggregationBuilder(structure)) - ) + .source(builder.size(0).aggregation(ageBuilder)) ) ) .flatMap(restClient::searchForPage) - .filter(response -> response.getAggregations() != null) - .map(response -> BucketResponse.builder() - .name(structure.getName()) - .buckets(structure.getType().convert(response.getAggregations().get(structure.getName()))) - .build()) - .flatMapIterable(BucketsParser::convert) - .take(aggregationQueryParam.getLimit()) + .flatMapMany(this::parseResult) + .as(flux -> aggregationQueryParam.getLimit() > 0 ? flux.take(aggregationQueryParam.getLimit()) : flux) ; } - static class BucketsParser { + protected Flux> parseResult(SearchResponse searchResponse) { + return Mono.justOrEmpty(searchResponse.getAggregations()) + .flatMapIterable(Aggregations::asList) + .flatMap(agg -> parseAggregation(agg.getName(), agg)); + } - private final List> result = new ArrayList<>(); - - public static List> convert(BucketResponse response) { - return new BucketsParser(response).result; + private Flux> parseAggregation(String name, org.elasticsearch.search.aggregations.Aggregation aggregation) { + if (aggregation instanceof Terms) { + return parseAggregation(((Terms) aggregation)); } - - public BucketsParser(BucketResponse response) { - this(response.getBuckets()); - } - - public BucketsParser(List buckets) { - buckets.forEach(bucket -> parser(bucket, new HashMap<>())); - } - - public void parser(Bucket bucket, Map fMap) { - addBucketProperty(bucket, fMap); - if (bucket.getBuckets() != null && !bucket.getBuckets().isEmpty()) { - bucket.getBuckets().forEach(b -> { - Map map = new HashMap<>(fMap); - addBucketProperty(b, map); - parser(b, map); + if (aggregation instanceof TopHits) { + TopHits topHits = ((TopHits) aggregation); + return Flux + .fromArray(topHits.getHits().getHits()) + .map(hit -> { + Map val = hit.getSourceAsMap(); + if (!val.containsKey("id")) { + val.put("id", hit.getId()); + } + return val; }); - } else { - result.add(fMap); - } + } + if (aggregation instanceof Histogram) { + return parseAggregation(((Histogram) aggregation)); + } + if (aggregation instanceof ValueCount) { + return Flux.just(Collections.singletonMap(name, ((ValueCount) aggregation).getValue())); + } + if (aggregation instanceof NumericMetricsAggregation.SingleValue) { + return Flux.just(Collections.singletonMap(name, getSafeNumber(((NumericMetricsAggregation.SingleValue) aggregation).value()))); } - private void addBucketProperty(Bucket bucket, Map fMap) { - fMap.put(bucket.getName(), bucket.getKey()); - fMap.putAll(bucket.toMap()); - } + return Flux.empty(); + } + + private double getSafeNumber(double number) { + return (Double.isNaN(number) || Double.isInfinite(number)) ? 0D : number; + } + + private Flux> parseAggregation(Histogram aggregation) { + + return Flux + .fromIterable(aggregation.getBuckets()) + .flatMap(bucket -> + Flux.fromIterable(bucket.getAggregations().asList()) + .flatMap(agg -> this.parseAggregation(agg.getName(), agg)) + .defaultIfEmpty(Collections.emptyMap()) + .map(map -> { + Map val = new HashMap<>(map); + val.put(aggregation.getName(), bucket.getKeyAsString()); + val.put("_" + aggregation.getName(), bucket.getKey()); + return val; + }) + ); + } + + private Flux> parseAggregation(Terms aggregation) { + + return Flux.fromIterable(aggregation.getBuckets()) + .flatMap(bucket -> Flux.fromIterable(bucket.getAggregations().asList()) + .flatMap(agg -> parseAggregation(agg.getName(), agg) + .map(map -> { + Map val = new HashMap<>(map); + val.put(aggregation.getName(), bucket.getKeyAsString()); + return val; + }) + )); } protected static QueryParam prepareQueryParam(AggregationQueryParam param) { @@ -128,54 +257,14 @@ public class ReactiveAggregationService implements AggregationService { return queryParam; } - protected BucketAggregationsStructure createAggParameter(AggregationQueryParam param) { - List structures = new ArrayList<>(); - if (param.getGroupByTime() != null) { - structures.add(convertAggGroupTimeStructure(param)); - } - if (param.getGroupBy() != null && !param.getGroupBy().isEmpty()) { - structures.addAll(getTermTypeStructures(param)); - } - for (int i = 0, size = structures.size(); i < size; i++) { - if (i < size - 1) { - structures.get(i).setSubBucketAggregation(Collections.singletonList(structures.get(i + 1))); - } - if (i == size - 1) { - structures.get(i) - .setSubMetricsAggregation(param - .getAggColumns() - .stream() - .map(agg -> { - MetricsAggregationStructure metricsAggregationStructure = new MetricsAggregationStructure(); - metricsAggregationStructure.setField(agg.getProperty()); - metricsAggregationStructure.setName(agg.getAlias()); - metricsAggregationStructure.setType(MetricsType.of(agg.getAggregation().name())); - return metricsAggregationStructure; - }).collect(Collectors.toList())); - } - } - return structures.get(0); - } - - protected BucketAggregationsStructure convertAggGroupTimeStructure(AggregationQueryParam param) { - BucketAggregationsStructure structure = new BucketAggregationsStructure(); - structure.setInterval(param.getGroupByTime().getInterval().toString()); - structure.setType(BucketType.DATE_HISTOGRAM); - structure.setFormat(param.getGroupByTime().getFormat()); - structure.setName(param.getGroupByTime().getAlias()); - structure.setField(param.getGroupByTime().getProperty()); - structure.setSort(Sort.desc(OrderType.KEY)); - structure.setExtendedBounds(getExtendedBounds(param)); - return structure; - } - protected static ExtendedBounds getExtendedBounds(AggregationQueryParam param) { return new ExtendedBounds(calculateStartWithTime(param), param.getEndWithTime()); } private static long calculateStartWithTime(AggregationQueryParam param) { long startWithParam = param.getStartWithTime(); -// if (param.getGroupByTime() != null && param.getGroupByTime().getInterval() != null) { + +// if (param.getGroupByTime() != nullcalculateStartWithTime(param) && param.getGroupByTime().getInterval() != null) { // long timeInterval = param.getGroupByTime().getInterval().toMillis() * param.getLimit(); // long tempStartWithParam = param.getEndWithTime() - timeInterval; // startWithParam = Math.max(tempStartWithParam, startWithParam); @@ -183,17 +272,4 @@ public class ReactiveAggregationService implements AggregationService { return startWithParam; } - protected List getTermTypeStructures(AggregationQueryParam param) { - return param.getGroupBy() - .stream() - .map(group -> { - BucketAggregationsStructure structure = new BucketAggregationsStructure(); - structure.setType(BucketType.TERMS); - structure.setSize(param.getLimit()); - structure.setField(group.getProperty()); - structure.setName(group.getAlias()); - return structure; - }).collect(Collectors.toList()); - } - } diff --git a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticsearchClient.java b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticsearchClient.java index 81c5009f..fae07a3f 100644 --- a/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticsearchClient.java +++ b/jetlinks-components/elasticsearch-component/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveElasticsearchClient.java @@ -1,5 +1,6 @@ package org.jetlinks.community.elastic.search.service.reactive; +import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesRequest; @@ -26,4 +27,6 @@ public interface ReactiveElasticsearchClient extends Mono updateTemplate(PutIndexTemplateRequest request); + Version serverVersion(); + } diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/SubscribeRequest.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/SubscribeRequest.java index 32781226..b846d0f2 100644 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/SubscribeRequest.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/external/SubscribeRequest.java @@ -23,7 +23,7 @@ public class SubscribeRequest implements ValueObject { private Authentication authentication; @Override - public Map getAll() { + public Map values() { return parameter; } diff --git a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayProperties.java b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayProperties.java index c57e79e3..4cb89919 100644 --- a/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayProperties.java +++ b/jetlinks-components/gateway-component/src/main/java/org/jetlinks/community/gateway/supports/DeviceGatewayProperties.java @@ -20,7 +20,7 @@ public class DeviceGatewayProperties implements ValueObject { private Map configuration=new HashMap<>(); @Override - public Map getAll() { + public Map values() { return configuration; } } diff --git a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/TcpClientProperties.java b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/TcpClientProperties.java index b3734e2b..00b2fa5f 100644 --- a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/TcpClientProperties.java +++ b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/client/TcpClientProperties.java @@ -35,7 +35,7 @@ public class TcpClientProperties implements ValueObject { private boolean enabled; @Override - public Map getAll() { + public Map values() { return parserConfiguration; } } diff --git a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServerProperties.java b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServerProperties.java index 26dacd73..28dc2d55 100644 --- a/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServerProperties.java +++ b/jetlinks-components/network-component/tcp-component/src/main/java/org/jetlinks/community/network/tcp/server/TcpServerProperties.java @@ -53,7 +53,7 @@ public class TcpServerProperties implements ValueObject { } @Override - public Map getAll() { + public Map values() { return parserConfiguration; } } diff --git a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/TimeSeriesData.java b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/TimeSeriesData.java index ce019553..3da01705 100644 --- a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/TimeSeriesData.java +++ b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/TimeSeriesData.java @@ -14,7 +14,7 @@ public interface TimeSeriesData extends ValueObject { Map getData(); @Override - default Map getAll() { + default Map values() { return getData(); } diff --git a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/Aggregation.java b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/Aggregation.java index 8528aae7..21739331 100644 --- a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/Aggregation.java +++ b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/Aggregation.java @@ -2,6 +2,13 @@ package org.jetlinks.community.timeseries.query; public enum Aggregation { - MIN, MAX, AVG, SUM, COUNT, NONE; + MIN, + MAX, + AVG, + SUM, + COUNT, + FIRST, + TOP, + NONE; } \ No newline at end of file diff --git a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/AggregationData.java b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/AggregationData.java index 8a2ce964..75d0baa8 100644 --- a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/AggregationData.java +++ b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/AggregationData.java @@ -1,5 +1,6 @@ package org.jetlinks.community.timeseries.query; + import org.jetlinks.community.ValueObject; import java.util.Map; @@ -15,7 +16,7 @@ public interface AggregationData extends ValueObject { } @Override - default Map getAll() { + default Map values() { return asMap(); } diff --git a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/AggregationQueryParam.java b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/AggregationQueryParam.java index 68ce6e84..956d95a1 100644 --- a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/AggregationQueryParam.java +++ b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/AggregationQueryParam.java @@ -4,23 +4,27 @@ import lombok.Getter; import lombok.Setter; import org.hswebframework.ezorm.core.dsl.Query; import org.hswebframework.ezorm.core.param.QueryParam; +import org.hswebframework.web.api.crud.entity.QueryParamEntity; import org.jetlinks.community.Interval; -import java.time.Duration; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.function.Consumer; import java.util.function.Function; +/** + * 聚合查询参数 + */ @Getter @Setter public class AggregationQueryParam { - //集合列 + //聚合列 private List aggColumns = new ArrayList<>(); //按时间分组 + @Deprecated private TimeGroup groupByTime; //按字段分组 @@ -36,12 +40,16 @@ public class AggregationQueryParam { private String timeProperty = "timestamp"; //条件过滤 - private QueryParam queryParam = new QueryParam(); + private QueryParamEntity queryParam = new QueryParamEntity(); public static AggregationQueryParam of() { return new AggregationQueryParam(); } + public T as(Function mapper) { + return mapper.apply(this); + } + public AggregationQueryParam from(long time) { this.startWithTime = time; return this; @@ -66,11 +74,15 @@ public class AggregationQueryParam { return this; } - public AggregationQueryParam agg(String property, String alias, Aggregation agg) { - aggColumns.add(new AggregationColumn(property, alias, agg)); + public AggregationQueryParam agg(AggregationColumn agg) { + aggColumns.add(agg); return this; } + public AggregationQueryParam agg(String property, String alias, Aggregation agg) { + return this.agg(new AggregationColumn(property, alias, agg)); + } + public AggregationQueryParam agg(String property, Aggregation agg) { return agg(property, property, agg); @@ -138,6 +150,10 @@ public class AggregationQueryParam { return groupBy(new Group(property, alias)); } + public AggregationQueryParam groupBy(String property) { + return groupBy(new Group(property, property)); + } + public T execute(Function executor) { return executor.apply(this); } @@ -147,6 +163,11 @@ public class AggregationQueryParam { return this; } + public AggregationQueryParam filter(QueryParamEntity queryParam) { + this.queryParam = queryParam; + return this; + } + public AggregationQueryParam limit(int limit) { this.limit = limit; return this; diff --git a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/LimitAggregationColumn.java b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/LimitAggregationColumn.java new file mode 100644 index 00000000..05099a2b --- /dev/null +++ b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/LimitAggregationColumn.java @@ -0,0 +1,21 @@ +package org.jetlinks.community.timeseries.query; + +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Getter +@Setter +@NoArgsConstructor +public class LimitAggregationColumn extends AggregationColumn { + + private int limit; + + public LimitAggregationColumn(String property, + String alias, + Aggregation aggregation, + int limit) { + super(property, alias, aggregation); + this.limit = limit; + } +} diff --git a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/LimitGroup.java b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/LimitGroup.java new file mode 100644 index 00000000..7aadbb41 --- /dev/null +++ b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/LimitGroup.java @@ -0,0 +1,15 @@ +package org.jetlinks.community.timeseries.query; + +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +public class LimitGroup extends Group { + private int limit; + + public LimitGroup(String property, String alias, int limit) { + super(property, alias); + this.limit = limit; + } +} diff --git a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/TimeGroup.java b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/TimeGroup.java index b5f9fa09..07f00b76 100644 --- a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/TimeGroup.java +++ b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/query/TimeGroup.java @@ -1,34 +1,28 @@ package org.jetlinks.community.timeseries.query; -import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; import org.jetlinks.community.Interval; -import java.time.Duration; - @Getter @Setter -@AllArgsConstructor @NoArgsConstructor -public class TimeGroup { +public class TimeGroup extends Group{ - private String property = "timestamp"; - - //时间分组间隔,如: 1d , 30s + /** + * 时间分组间隔,如: 1d , 30s + */ private Interval interval; - private String alias; - /** * 时序时间返回格式 如 YYYY-MM-dd */ private String format; public TimeGroup(Interval interval, String alias, String format) { + super("timestamp",alias); this.interval = interval; - this.alias = alias; this.format = format; } }