fix: 修复设备统计没有产品字段问题

This commit is contained in:
zhouhao 2025-07-30 15:44:28 +08:00
parent 773f9f4bb1
commit b9cf1b3db5
9 changed files with 283 additions and 70 deletions

View File

@ -0,0 +1,25 @@
package org.jetlinks.community.micrometer;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Role;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import java.util.stream.Collectors;
@AutoConfiguration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class MeterRegistryConfiguration {
@Bean(destroyMethod = "shutdown")
@Order(Ordered.HIGHEST_PRECEDENCE + 100)
public MeterRegistryManager meterRegistryManager(ObjectProvider<MeterRegistrySupplier> registrySuppliers,
ObjectProvider<MeterRegistryCustomizer> customizers) {
return new MeterRegistryManager(registrySuppliers.stream().collect(Collectors.toList()),
customizers.stream().collect(Collectors.toList()));
}
}

View File

@ -19,12 +19,14 @@ import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import lombok.Setter;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.types.StringType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
@ -32,24 +34,59 @@ import java.util.stream.Collectors;
* @author wanghzeng
* @since 1.0
**/
@Component
@Setter
public class MeterRegistryManager {
private Map<String, MeterRegistry> meterRegistryMap = new ConcurrentHashMap<>();
@Autowired
private List<MeterRegistrySupplier> suppliers;
private final List<MeterRegistrySupplier> suppliers;
private MeterRegistry createMeterRegistry(String metric, String... tagKeys) {
return new CompositeMeterRegistry(Clock.SYSTEM,
suppliers.stream()
.map(supplier -> supplier.getMeterRegistry(metric, tagKeys))
.collect(Collectors.toList()));
private final List<MeterRegistryCustomizer> customizers;
public MeterRegistryManager(List<MeterRegistrySupplier> suppliers,
List<MeterRegistryCustomizer> customizers) {
this.suppliers = suppliers == null ? Collections.emptyList() : suppliers;
this.customizers = customizers == null ? Collections.emptyList() : customizers;
}
public void shutdown() {
meterRegistryMap
.values()
.forEach(MeterRegistry::close);
}
private MeterRegistry createMeterRegistry(String metric, Map<String, DataType> tagDefine) {
Map<String, DataType> tags = new HashMap<>(tagDefine);
MeterRegistrySettings settings = tags::put;
customizers.forEach(customizer -> customizer.custom(metric, settings));
if (suppliers.size() == 1) {
return suppliers.get(0).getMeterRegistry(metric, tags);
}
return new CompositeMeterRegistry(Clock.SYSTEM, suppliers
.stream()
.map(supplier -> supplier.getMeterRegistry(metric, tags))
.collect(Collectors.toList()));
}
public MeterRegistry getMeterRegister(String metric, String... tagKeys) {
return meterRegistryMap.computeIfAbsent(metric, _metric -> createMeterRegistry(_metric, tagKeys));
return meterRegistryMap.computeIfAbsent(metric, _metric -> {
if (tagKeys.length == 0) {
return createMeterRegistry(metric, Collections.emptyMap());
}
return createMeterRegistry(metric, Arrays
.stream(tagKeys)
.collect(Collectors.toMap(Function.identity(), key -> StringType.GLOBAL)));
});
}
@Deprecated
public MeterRegistry getMeterRegister(String metric, Map<String, DataType> tagDefine) {
return meterRegistryMap.computeIfAbsent(metric, _metric -> createMeterRegistry(_metric, tagDefine));
}
}

View File

@ -16,9 +16,14 @@
package org.jetlinks.community.micrometer;
import io.micrometer.core.instrument.MeterRegistry;
import org.jetlinks.core.metadata.DataType;
import java.util.Map;
public interface MeterRegistrySupplier {
MeterRegistry getMeterRegistry(String metric, String... tagKeys);
MeterRegistry getMeterRegistry(String metric, Map<String, DataType> tagDefine);
}

View File

@ -0,0 +1,91 @@
package org.jetlinks.community.micrometer;
import io.micrometer.core.instrument.*;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.distribution.pause.PauseDetector;
import io.micrometer.core.instrument.noop.*;
import javax.annotation.Nonnull;
import java.util.concurrent.TimeUnit;
import java.util.function.ToDoubleFunction;
import java.util.function.ToLongFunction;
public class NoopMeterRegistry extends MeterRegistry {
public static final NoopMeterRegistry INSTANCE = new NoopMeterRegistry();
private NoopMeterRegistry() {
super(Clock.SYSTEM);
}
@Override
@Nonnull
protected <T> Gauge newGauge(@Nonnull Meter.Id id, T t, @Nonnull ToDoubleFunction<T> toDoubleFunction) {
return new NoopGauge(id);
}
@Override
@Nonnull
protected Counter newCounter(@Nonnull Meter.Id id) {
return new NoopCounter(id);
}
@Override
protected LongTaskTimer newLongTaskTimer(Meter.Id id, DistributionStatisticConfig distributionStatisticConfig) {
return new NoopLongTaskTimer(id);
}
@Override
protected <T> TimeGauge newTimeGauge(Meter.Id id, T obj, TimeUnit valueFunctionUnit, ToDoubleFunction<T> valueFunction) {
return new NoopTimeGauge(id);
}
@Override
protected LongTaskTimer newLongTaskTimer(Meter.Id id) {
return new NoopLongTaskTimer(id);
}
@Override
@Nonnull
protected Timer newTimer(@Nonnull Meter.Id id, @Nonnull DistributionStatisticConfig distributionStatisticConfig, @Nonnull PauseDetector pauseDetector) {
return new NoopTimer(id);
}
@Override
@Nonnull
protected DistributionSummary newDistributionSummary(@Nonnull Meter.Id id, @Nonnull DistributionStatisticConfig distributionStatisticConfig, double v) {
return new NoopDistributionSummary(id);
}
@Override
@Nonnull
protected Meter newMeter(@Nonnull Meter.Id id, @Nonnull Meter.Type type, @Nonnull Iterable<Measurement> iterable) {
return new NoopMeter(id);
}
@Override
@Nonnull
protected <T> FunctionTimer newFunctionTimer(@Nonnull Meter.Id id, @Nonnull T t,
@Nonnull ToLongFunction<T> toLongFunction,
@Nonnull ToDoubleFunction<T> toDoubleFunction,
@Nonnull TimeUnit timeUnit) {
return new NoopFunctionTimer(id);
}
@Override
@Nonnull
protected <T> FunctionCounter newFunctionCounter(@Nonnull Meter.Id id, @Nonnull T t, @Nonnull ToDoubleFunction<T> toDoubleFunction) {
return new NoopFunctionCounter(id);
}
@Override
@Nonnull
protected TimeUnit getBaseTimeUnit() {
return TimeUnit.MILLISECONDS;
}
@Override
@Nonnull
protected DistributionStatisticConfig defaultHistogramConfig() {
return DistributionStatisticConfig.NONE;
}
}

View File

@ -1,3 +1,4 @@
org.jetlinks.community.configuration.CommonConfiguration
org.jetlinks.community.dictionary.DictionaryConfiguration
org.jetlinks.community.configuration.UiResourceConfiguration
org.jetlinks.community.micrometer.MeterRegistryConfiguration

View File

@ -16,18 +16,22 @@
package org.jetlinks.community.timeseries.micrometer;
import io.micrometer.core.instrument.*;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.core.metadata.Converter;
import org.jetlinks.core.metadata.DataType;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
public class MeterTimeSeriesData implements TimeSeriesData {
private Map<String, Object> data = new HashMap<>();
private final Map<String, Object> data = new HashMap<>(24);
private long timestamp = System.currentTimeMillis();
private final long timestamp = System.currentTimeMillis();
@Override
public long getTimestamp() {
@ -52,9 +56,14 @@ public class MeterTimeSeriesData implements TimeSeriesData {
return this;
}
public MeterTimeSeriesData write(List<Tag> tags) {
public MeterTimeSeriesData write(List<Tag> tags, Function<String, DataType> typeGetter) {
for (Tag tag : tags) {
data.put(tag.getKey(), tag.getValue());
DataType type = typeGetter.apply(tag.getKey());
Object value = tag.getValue();
if (type instanceof Converter) {
value = ((Converter<?>) type).convert(value);
}
data.put(tag.getKey(), value);
}
return this;
}
@ -84,7 +93,6 @@ public class MeterTimeSeriesData implements TimeSeriesData {
}
public MeterTimeSeriesData write(FunctionTimer timer) {
data.put("count", timer.count());
data.put("sum", timer.totalTime(TimeUnit.MILLISECONDS));
data.put("mean", timer.mean(TimeUnit.MILLISECONDS));
@ -126,6 +134,7 @@ public class MeterTimeSeriesData implements TimeSeriesData {
public static MeterTimeSeriesData of(Meter meter) {
MeterTimeSeriesData data = new MeterTimeSeriesData();
data.data.put("id", IDGenerator.RANDOM.generate());
data.write(meter);
meter.match(
data::write,

View File

@ -19,6 +19,7 @@ import lombok.AllArgsConstructor;
import lombok.Getter;
import org.jetlinks.community.timeseries.TimeSeriesMetadata;
import org.jetlinks.community.timeseries.TimeSeriesMetric;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.SimplePropertyMetadata;
import org.jetlinks.core.metadata.types.DoubleType;
@ -26,14 +27,13 @@ import org.jetlinks.core.metadata.types.StringType;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@Getter
@AllArgsConstructor(staticName = "of")
class MeterTimeSeriesMetadata implements TimeSeriesMetadata {
@Getter
private TimeSeriesMetric metric;
@Getter
private List<String> keys;
private Map<String, DataType> keys;
static final List<PropertyMetadata> properties = new ArrayList<>();
@ -133,11 +133,11 @@ class MeterTimeSeriesMetadata implements TimeSeriesMetadata {
public List<PropertyMetadata> getProperties() {
List<PropertyMetadata> metadata = new ArrayList<>(properties);
for (String key : keys) {
for (Map.Entry<String,DataType> key : keys.entrySet()) {
SimplePropertyMetadata property = new SimplePropertyMetadata();
property.setId(key);
property.setName(key);
property.setValueType(new StringType());
property.setId(key.getKey());
property.setName(key.getKey());
property.setValueType(key.getValue());
metadata.add(property);
}
return metadata;

View File

@ -18,68 +18,107 @@ package org.jetlinks.community.timeseries.micrometer;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.step.StepMeterRegistry;
import io.micrometer.core.instrument.util.NamedThreadFactory;
import io.netty.util.concurrent.FastThreadLocalThread;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.community.timeseries.TimeSeriesMetric;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.types.StringType;
import reactor.core.publisher.Flux;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
@Slf4j
public class TimeSeriesMeterRegistry extends StepMeterRegistry {
public class TimeSeriesMeterRegistry extends StepMeterRegistry implements ThreadFactory {
TimeSeriesManager timeSeriesManager;
TimeSeriesMetric metric;
private static final ThreadFactory DEFAULT_THREAD_FACTORY = new NamedThreadFactory("time-series-metrics-publisher");
AtomicInteger count = new AtomicInteger();
private Map<String, String> customTags;
private final Map<String, String> customTags;
private List<String> keys = new ArrayList<>();
private final Map<String, DataType> tagDefine;
public TimeSeriesMeterRegistry(TimeSeriesManager timeSeriesManager,
TimeSeriesMetric metric,
TimeSeriesRegistryProperties config,
Map<String, String> customTags,String ...tagKeys) {
Map<String, String> customTags,
String... tagKeys) {
this(timeSeriesManager,
metric,
config,
customTags,
Arrays
.stream(tagKeys)
.collect(Collectors.toMap(Function.identity(), ignore -> StringType.GLOBAL)));
}
public TimeSeriesMeterRegistry(TimeSeriesManager timeSeriesManager,
TimeSeriesMetric metric,
TimeSeriesRegistryProperties config,
Map<String, String> customTags,
Map<String, DataType> tagDefine) {
super(new TimeSeriesPropertiesPropertiesConfigAdapter(config), Clock.SYSTEM);
this.timeSeriesManager = timeSeriesManager;
this.metric = metric;
this.customTags = customTags;
keys.addAll(customTags.keySet());
keys.addAll(Arrays.asList(tagKeys));
keys.addAll(config.getCustomTagKeys());
start(DEFAULT_THREAD_FACTORY);
this.tagDefine = new HashMap<>(tagDefine);
start(this);
}
@Override
@SuppressWarnings("all")
public Thread newThread(@Nonnull Runnable r) {
return new FastThreadLocalThread(
r,
"time-series-metric-" + metric.getId() + (count.getAndIncrement() == 0 ? "" : ("-" + count.get())));
}
public void reload() {
timeSeriesManager
.registerMetadata(MeterTimeSeriesMetadata.of(metric, tagDefine))
.subscribe(ignore -> {
},
error -> log.error("register metric[{}] metadata error", metric.getId(), error));
}
@Override
public void start(ThreadFactory threadFactory) {
super.start(threadFactory);
timeSeriesManager.registerMetadata(MeterTimeSeriesMetadata.of(metric,keys))
.doOnError(e -> log.error("register metric [{}] metadata error", metric.getId(), e))
.subscribe((r) -> log.error("register metric [{}] metadata success", metric.getId()));
reload();
}
@Override
@SneakyThrows
protected void publish() {
timeSeriesManager
.getService(metric)
.save(Flux.fromIterable(this.getMeters())
.map(meter -> MeterTimeSeriesData.of(meter)
.name(getConventionName(meter.getId()))
.write(customTags)
.write(getConventionTags(meter.getId()))))
.doOnError(e -> log.error("failed to send metrics [{}]",metric.getId(), e))
.doOnSuccess(nil -> log.debug("success send metrics [{}]",metric.getId()))
.subscribe();
Flux.fromIterable(this.getMeters())
.map(meter -> MeterTimeSeriesData
.of(meter)
.name(getConventionName(meter.getId()))
.write(customTags)
.write(getConventionTags(meter.getId()), (key) -> tagDefine.getOrDefault(key, StringType.GLOBAL)))
.flatMap(timeSeriesManager.getService(metric)::commit)
.then()
.toFuture()
.get();
}
@Override
public void close() {
log.info("close micrometer metric [{}]", metric.getId());
super.close();
}
@Override
@Nonnull

View File

@ -1,26 +1,12 @@
/*
* Copyright 2025 JetLinks https://www.jetlinks.cn
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jetlinks.community.timeseries.micrometer;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.config.NamingConvention;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import lombok.Getter;
import lombok.Setter;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.community.micrometer.MeterRegistrySupplier;
import org.jetlinks.community.micrometer.NoopMeterRegistry;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.community.timeseries.TimeSeriesMetric;
@ -63,14 +49,34 @@ public class TimeSeriesMeterRegistrySupplier implements MeterRegistrySupplier {
@Override
public MeterRegistry getMeterRegistry(String metric, String... tagKeys) {
if (ignore.contains(metric)) {
return new SimpleMeterRegistry();
return NoopMeterRegistry.INSTANCE;
}
TimeSeriesMeterRegistry registry = new TimeSeriesMeterRegistry(timeSeriesManager,
TimeSeriesMeterRegistry registry = new TimeSeriesMeterRegistry(
timeSeriesManager,
TimeSeriesMetric.of(metric),
metrics.getOrDefault(metric, metrics.get("default")),
tags, tagKeys);
tags(), tagKeys);
registry.config().namingConvention(namingSupports.get(naming));
return registry;
}
private Map<String, String> tags() {
return this.tags;
}
@Override
public MeterRegistry getMeterRegistry(String metric,
Map<String, DataType> tagDefine) {
if (ignore.contains(metric)) {
return NoopMeterRegistry.INSTANCE;
}
TimeSeriesMeterRegistry registry = new TimeSeriesMeterRegistry(
timeSeriesManager,
TimeSeriesMetric.of(metric),
metrics.getOrDefault(metric, metrics.get("default")),
tags(),
tagDefine);
registry.config().namingConvention(namingSupports.get(naming));
return registry;
}
}