From 50b2ca9b446fb52fce5a689d748e83eceaa214ef Mon Sep 17 00:00:00 2001 From: zhouhao Date: Tue, 10 Jun 2025 14:40:09 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8Des=E8=81=9A=E5=90=88?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../search/ElasticSearch7xSupport.java | 2 +- .../search/ElasticSearch8xSupport.java | 2 +- .../reactive/ReactiveAggregationService.java | 52 +++++++++++++------ .../src/main/resources/application.yml | 12 +++++ 4 files changed, 49 insertions(+), 19 deletions(-) diff --git a/jetlinks-components/elasticsearch-component/elasticsearch-7x/src/main/java/org/jetlinks/community/elastic/search/ElasticSearch7xSupport.java b/jetlinks-components/elasticsearch-component/elasticsearch-7x/src/main/java/org/jetlinks/community/elastic/search/ElasticSearch7xSupport.java index 02c59eb4..50f78ea5 100644 --- a/jetlinks-components/elasticsearch-component/elasticsearch-7x/src/main/java/org/jetlinks/community/elastic/search/ElasticSearch7xSupport.java +++ b/jetlinks-components/elasticsearch-component/elasticsearch-7x/src/main/java/org/jetlinks/community/elastic/search/ElasticSearch7xSupport.java @@ -70,7 +70,7 @@ public class ElasticSearch7xSupport extends ElasticSearchSupport { } @Override - protected Object getBucketKey(MultiBucketBase bucket) { + public Object getBucketKey(MultiBucketBase bucket) { if (bucket instanceof DateHistogramBucket _bucket) { return _bucket.key(); } diff --git a/jetlinks-components/elasticsearch-component/elasticsearch-8x/src/main/java/org/jetlinks/community/elastic/search/ElasticSearch8xSupport.java b/jetlinks-components/elasticsearch-component/elasticsearch-8x/src/main/java/org/jetlinks/community/elastic/search/ElasticSearch8xSupport.java index af0c6003..67a1551e 100644 --- a/jetlinks-components/elasticsearch-component/elasticsearch-8x/src/main/java/org/jetlinks/community/elastic/search/ElasticSearch8xSupport.java +++ b/jetlinks-components/elasticsearch-component/elasticsearch-8x/src/main/java/org/jetlinks/community/elastic/search/ElasticSearch8xSupport.java @@ -76,7 +76,7 @@ public class ElasticSearch8xSupport extends ElasticSearchSupport { } @Override - protected Object getBucketKey(MultiBucketBase bucket) { + public Object getBucketKey(MultiBucketBase bucket) { if (bucket instanceof DateHistogramBucket _bucket) { return _bucket.key(); } diff --git a/jetlinks-components/elasticsearch-component/elasticsearch-core/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveAggregationService.java b/jetlinks-components/elasticsearch-component/elasticsearch-core/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveAggregationService.java index 9253b29b..03af8457 100755 --- a/jetlinks-components/elasticsearch-component/elasticsearch-core/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveAggregationService.java +++ b/jetlinks-components/elasticsearch-component/elasticsearch-core/src/main/java/org/jetlinks/community/elastic/search/service/reactive/ReactiveAggregationService.java @@ -84,8 +84,8 @@ public class ReactiveAggregationService implements AggregationService { his.order(Collections.singletonList( NamedValue.of("_key", SortOrder.Desc) )); - if(timeGroup.getOffset()>0){ - his.offset(Time.of(b->b.offset((int)timeGroup.getOffset()))); + if (timeGroup.getOffset() > 0) { + his.offset(Time.of(b -> b.offset((int) timeGroup.getOffset()))); } if (StringUtils.hasText(timeGroup.getFormat())) { String format = timeGroup.getFormat(); @@ -238,7 +238,7 @@ public class ReactiveAggregationService implements AggregationService { .aggregations(createAggregations(metadata, aggregationQueryParam)), Map.class)) .flatMapMany(resp -> Flux .fromIterable(resp.aggregations().entrySet()) - .concatMap(e -> parseAggregation(e.getKey(), e.getValue()))) + .concatMap(e -> parseAggregation(e.getKey(), e.getValue(), -1))) .as(flux -> { if (!isGroup) { return flux @@ -279,7 +279,7 @@ public class ReactiveAggregationService implements AggregationService { .aggregations(createAggregations(metadata, aggregationQueryParam)), Map.class)) .flatMapMany(resp -> Flux .fromIterable(resp.aggregations().entrySet()) - .concatMap(e -> parseAggregation(e.getKey(), e.getValue()))) + .concatMap(e -> parseAggregation(e.getKey(), e.getValue(), -1))) .as(flux -> { if (!isGroup) { return flux @@ -295,18 +295,30 @@ public class ReactiveAggregationService implements AggregationService { } private Flux> parseAggregation(String name, - Aggregate aggregate) { + Aggregate aggregate, + long docCount) { if (aggregate.isSum()) { - return Flux.just(Collections.singletonMap(name, getSafeNumber(aggregate.sum().value()))); + return Flux.just(docCount == 0 + ? Collections.emptyMap() + : Collections.singletonMap(name, getSafeNumber(aggregate.sum().value()))); } if (aggregate.isAvg()) { - return Flux.just(Collections.singletonMap(name, getSafeNumber(aggregate.avg().value()))); + return Flux.just(docCount == 0 + ? Collections.emptyMap() + : Collections.singletonMap(name, getSafeNumber(aggregate.avg().value()))); } if (aggregate.isMax()) { - return Flux.just(Collections.singletonMap(name, getSafeNumber(aggregate.max().value()))); + return Flux.just(docCount == 0 + ? Collections.emptyMap() + : Collections.singletonMap(name, getSafeNumber(aggregate.max().value()))); } if (aggregate.isMin()) { - return Flux.just(Collections.singletonMap(name, getSafeNumber(aggregate.min().value()))); + return Flux.just(docCount == 0 + ? Collections.emptyMap() + : Collections.singletonMap(name, getSafeNumber(aggregate.min().value()))); + } + if (aggregate.isCardinality()) { + return Flux.just(Collections.singletonMap(name, aggregate.cardinality().value())); } if (aggregate.isFilter()) { return Flux.just(Collections.singletonMap(name, aggregate.filter().docCount())); @@ -339,8 +351,13 @@ public class ReactiveAggregationService implements AggregationService { if (aggregate.isTopHits()) { return Flux .fromIterable(aggregate.topHits().hits().hits()) - .map(hit -> { - Map val = hit.source().to(Map.class); + .mapNotNull(hit -> { + JsonData source = hit.source(); + if (source == null) { + return null; + } + @SuppressWarnings("all") + Map val = source.to(Map.class); if (!val.containsKey("id")) { val.put("id", hit.id()); } @@ -350,9 +367,7 @@ public class ReactiveAggregationService implements AggregationService { if (aggregate.isDateHistogram()) { return parseAggregation(name, aggregate.dateHistogram()); } - if (aggregate.isCardinality()) { - return Flux.just(Collections.singletonMap(name, aggregate.cardinality().value())); - } + log.warn("unsupported aggregation {} : {}", aggregate._kind(), aggregate); return Flux.empty(); } @@ -372,7 +387,10 @@ public class ReactiveAggregationService implements AggregationService { return bucketFlux .concatMap(bucket -> Flux .fromIterable(bucket.aggregations().entrySet()) - .concatMap(e -> parseAggregation(e.getKey(), e.getValue()), 0) + .concatMap(e -> { + + return parseAggregation(e.getKey(), e.getValue(), bucket.docCount()); + }, 0) .map(map -> transformBucket(name, map, bucket)), 0); } @@ -407,10 +425,10 @@ public class ReactiveAggregationService implements AggregationService { return flux.concatMap(base -> Flux .fromIterable(base.aggregations().entrySet()) - .concatMap(e -> parseAggregation(e.getKey(), e.getValue()), 0) + .concatMap(e -> parseAggregation(e.getKey(), e.getValue(), base.docCount()), 0) .map(map -> { Map val = new HashMap<>(map); - val.put(name, parseBucket(base)); + val.putIfAbsent(name, parseBucket(base)); return val; }), 0); } diff --git a/jetlinks-standalone/src/main/resources/application.yml b/jetlinks-standalone/src/main/resources/application.yml index 6d181fba..b400349c 100644 --- a/jetlinks-standalone/src/main/resources/application.yml +++ b/jetlinks-standalone/src/main/resources/application.yml @@ -286,3 +286,15 @@ network: - 1883-1890 - 8800-8810 - 5060-5061 +trace: + # 开启链路追踪 + enabled: true + jaeger: # https://hanta.yuque.com/px7kg1/dev/nn2r53n5w8ttap8a + enabled: false # 记录链路数据到jaeger + endpoint: "http://127.0.0.1:14250" + ignore-spans: + - "/device/*/upstream" + - "/device/*/decode" + - "/java/TimeSeriesMessageWriterConnector/writeDeviceMessageToTs" + - "/java/DeviceStatusMeasurementProvider/incrementOnline" + - "/java/DeviceMessageMeasurementProvider/incrementMessage" \ No newline at end of file