fix: 修复es聚合查询错误

This commit is contained in:
zhouhao 2025-06-10 14:40:09 +08:00
parent 2157880905
commit 50b2ca9b44
4 changed files with 49 additions and 19 deletions

View File

@ -70,7 +70,7 @@ public class ElasticSearch7xSupport extends ElasticSearchSupport {
}
@Override
protected Object getBucketKey(MultiBucketBase bucket) {
public Object getBucketKey(MultiBucketBase bucket) {
if (bucket instanceof DateHistogramBucket _bucket) {
return _bucket.key();
}

View File

@ -76,7 +76,7 @@ public class ElasticSearch8xSupport extends ElasticSearchSupport {
}
@Override
protected Object getBucketKey(MultiBucketBase bucket) {
public Object getBucketKey(MultiBucketBase bucket) {
if (bucket instanceof DateHistogramBucket _bucket) {
return _bucket.key();
}

View File

@ -84,8 +84,8 @@ public class ReactiveAggregationService implements AggregationService {
his.order(Collections.singletonList(
NamedValue.of("_key", SortOrder.Desc)
));
if(timeGroup.getOffset()>0){
his.offset(Time.of(b->b.offset((int)timeGroup.getOffset())));
if (timeGroup.getOffset() > 0) {
his.offset(Time.of(b -> b.offset((int) timeGroup.getOffset())));
}
if (StringUtils.hasText(timeGroup.getFormat())) {
String format = timeGroup.getFormat();
@ -238,7 +238,7 @@ public class ReactiveAggregationService implements AggregationService {
.aggregations(createAggregations(metadata, aggregationQueryParam)), Map.class))
.flatMapMany(resp -> Flux
.fromIterable(resp.aggregations().entrySet())
.concatMap(e -> parseAggregation(e.getKey(), e.getValue())))
.concatMap(e -> parseAggregation(e.getKey(), e.getValue(), -1)))
.as(flux -> {
if (!isGroup) {
return flux
@ -279,7 +279,7 @@ public class ReactiveAggregationService implements AggregationService {
.aggregations(createAggregations(metadata, aggregationQueryParam)), Map.class))
.flatMapMany(resp -> Flux
.fromIterable(resp.aggregations().entrySet())
.concatMap(e -> parseAggregation(e.getKey(), e.getValue())))
.concatMap(e -> parseAggregation(e.getKey(), e.getValue(), -1)))
.as(flux -> {
if (!isGroup) {
return flux
@ -295,18 +295,30 @@ public class ReactiveAggregationService implements AggregationService {
}
private Flux<Map<String, Object>> parseAggregation(String name,
Aggregate aggregate) {
Aggregate aggregate,
long docCount) {
if (aggregate.isSum()) {
return Flux.just(Collections.singletonMap(name, getSafeNumber(aggregate.sum().value())));
return Flux.just(docCount == 0
? Collections.emptyMap()
: Collections.singletonMap(name, getSafeNumber(aggregate.sum().value())));
}
if (aggregate.isAvg()) {
return Flux.just(Collections.singletonMap(name, getSafeNumber(aggregate.avg().value())));
return Flux.just(docCount == 0
? Collections.emptyMap()
: Collections.singletonMap(name, getSafeNumber(aggregate.avg().value())));
}
if (aggregate.isMax()) {
return Flux.just(Collections.singletonMap(name, getSafeNumber(aggregate.max().value())));
return Flux.just(docCount == 0
? Collections.emptyMap()
: Collections.singletonMap(name, getSafeNumber(aggregate.max().value())));
}
if (aggregate.isMin()) {
return Flux.just(Collections.singletonMap(name, getSafeNumber(aggregate.min().value())));
return Flux.just(docCount == 0
? Collections.emptyMap()
: Collections.singletonMap(name, getSafeNumber(aggregate.min().value())));
}
if (aggregate.isCardinality()) {
return Flux.just(Collections.singletonMap(name, aggregate.cardinality().value()));
}
if (aggregate.isFilter()) {
return Flux.just(Collections.singletonMap(name, aggregate.filter().docCount()));
@ -339,8 +351,13 @@ public class ReactiveAggregationService implements AggregationService {
if (aggregate.isTopHits()) {
return Flux
.fromIterable(aggregate.topHits().hits().hits())
.map(hit -> {
Map<String, Object> val = hit.source().to(Map.class);
.mapNotNull(hit -> {
JsonData source = hit.source();
if (source == null) {
return null;
}
@SuppressWarnings("all")
Map<String, Object> val = source.to(Map.class);
if (!val.containsKey("id")) {
val.put("id", hit.id());
}
@ -350,9 +367,7 @@ public class ReactiveAggregationService implements AggregationService {
if (aggregate.isDateHistogram()) {
return parseAggregation(name, aggregate.dateHistogram());
}
if (aggregate.isCardinality()) {
return Flux.just(Collections.singletonMap(name, aggregate.cardinality().value()));
}
log.warn("unsupported aggregation {} : {}", aggregate._kind(), aggregate);
return Flux.empty();
}
@ -372,7 +387,10 @@ public class ReactiveAggregationService implements AggregationService {
return bucketFlux
.concatMap(bucket -> Flux
.fromIterable(bucket.aggregations().entrySet())
.concatMap(e -> parseAggregation(e.getKey(), e.getValue()), 0)
.concatMap(e -> {
return parseAggregation(e.getKey(), e.getValue(), bucket.docCount());
}, 0)
.map(map -> transformBucket(name, map, bucket)), 0);
}
@ -407,10 +425,10 @@ public class ReactiveAggregationService implements AggregationService {
return flux.concatMap(base -> Flux
.fromIterable(base.aggregations().entrySet())
.concatMap(e -> parseAggregation(e.getKey(), e.getValue()), 0)
.concatMap(e -> parseAggregation(e.getKey(), e.getValue(), base.docCount()), 0)
.map(map -> {
Map<String, Object> val = new HashMap<>(map);
val.put(name, parseBucket(base));
val.putIfAbsent(name, parseBucket(base));
return val;
}), 0);
}

View File

@ -286,3 +286,15 @@ network:
- 1883-1890
- 8800-8810
- 5060-5061
trace:
# 开启链路追踪
enabled: true
jaeger: # https://hanta.yuque.com/px7kg1/dev/nn2r53n5w8ttap8a
enabled: false # 记录链路数据到jaeger
endpoint: "http://127.0.0.1:14250"
ignore-spans:
- "/device/*/upstream"
- "/device/*/decode"
- "/java/TimeSeriesMessageWriterConnector/writeDeviceMessageToTs"
- "/java/DeviceStatusMeasurementProvider/incrementOnline"
- "/java/DeviceMessageMeasurementProvider/incrementMessage"