diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/micrometer/MeterRegistryConfiguration.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/micrometer/MeterRegistryConfiguration.java new file mode 100644 index 00000000..253f2899 --- /dev/null +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/micrometer/MeterRegistryConfiguration.java @@ -0,0 +1,25 @@ +package org.jetlinks.community.micrometer; + +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.config.BeanDefinition; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Role; +import org.springframework.core.Ordered; +import org.springframework.core.annotation.Order; + +import java.util.stream.Collectors; + +@AutoConfiguration +@Role(BeanDefinition.ROLE_INFRASTRUCTURE) +public class MeterRegistryConfiguration { + + @Bean(destroyMethod = "shutdown") + @Order(Ordered.HIGHEST_PRECEDENCE + 100) + public MeterRegistryManager meterRegistryManager(ObjectProvider registrySuppliers, + ObjectProvider customizers) { + return new MeterRegistryManager(registrySuppliers.stream().collect(Collectors.toList()), + customizers.stream().collect(Collectors.toList())); + } + +} diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/micrometer/MeterRegistryManager.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/micrometer/MeterRegistryManager.java index eae786ab..7536e8fa 100644 --- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/micrometer/MeterRegistryManager.java +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/micrometer/MeterRegistryManager.java @@ -19,12 +19,14 @@ import io.micrometer.core.instrument.Clock; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.composite.CompositeMeterRegistry; import lombok.Setter; +import org.jetlinks.core.metadata.DataType; +import org.jetlinks.core.metadata.types.StringType; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -32,24 +34,59 @@ import java.util.stream.Collectors; * @author wanghzeng * @since 1.0 **/ -@Component @Setter public class MeterRegistryManager { + private Map meterRegistryMap = new ConcurrentHashMap<>(); - @Autowired - private List suppliers; + private final List suppliers; - private MeterRegistry createMeterRegistry(String metric, String... tagKeys) { - return new CompositeMeterRegistry(Clock.SYSTEM, - suppliers.stream() - .map(supplier -> supplier.getMeterRegistry(metric, tagKeys)) - .collect(Collectors.toList())); + private final List customizers; + + public MeterRegistryManager(List suppliers, + List customizers) { + this.suppliers = suppliers == null ? Collections.emptyList() : suppliers; + this.customizers = customizers == null ? Collections.emptyList() : customizers; + } + + public void shutdown() { + meterRegistryMap + .values() + .forEach(MeterRegistry::close); + } + + private MeterRegistry createMeterRegistry(String metric, Map tagDefine) { + Map tags = new HashMap<>(tagDefine); + MeterRegistrySettings settings = tags::put; + customizers.forEach(customizer -> customizer.custom(metric, settings)); + + if (suppliers.size() == 1) { + return suppliers.get(0).getMeterRegistry(metric, tags); + } + + return new CompositeMeterRegistry(Clock.SYSTEM, suppliers + .stream() + .map(supplier -> supplier.getMeterRegistry(metric, tags)) + .collect(Collectors.toList())); } public MeterRegistry getMeterRegister(String metric, String... tagKeys) { - return meterRegistryMap.computeIfAbsent(metric, _metric -> createMeterRegistry(_metric, tagKeys)); + + return meterRegistryMap.computeIfAbsent(metric, _metric -> { + if (tagKeys.length == 0) { + return createMeterRegistry(metric, Collections.emptyMap()); + } + return createMeterRegistry(metric, Arrays + .stream(tagKeys) + .collect(Collectors.toMap(Function.identity(), key -> StringType.GLOBAL))); + }); } + @Deprecated + public MeterRegistry getMeterRegister(String metric, Map tagDefine) { + return meterRegistryMap.computeIfAbsent(metric, _metric -> createMeterRegistry(_metric, tagDefine)); + } + + } diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/micrometer/MeterRegistrySupplier.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/micrometer/MeterRegistrySupplier.java index 706e4672..8ff5edc4 100644 --- a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/micrometer/MeterRegistrySupplier.java +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/micrometer/MeterRegistrySupplier.java @@ -16,9 +16,14 @@ package org.jetlinks.community.micrometer; import io.micrometer.core.instrument.MeterRegistry; +import org.jetlinks.core.metadata.DataType; + +import java.util.Map; public interface MeterRegistrySupplier { MeterRegistry getMeterRegistry(String metric, String... tagKeys); + MeterRegistry getMeterRegistry(String metric, Map tagDefine); + } diff --git a/jetlinks-components/common-component/src/main/java/org/jetlinks/community/micrometer/NoopMeterRegistry.java b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/micrometer/NoopMeterRegistry.java new file mode 100644 index 00000000..68469c83 --- /dev/null +++ b/jetlinks-components/common-component/src/main/java/org/jetlinks/community/micrometer/NoopMeterRegistry.java @@ -0,0 +1,91 @@ +package org.jetlinks.community.micrometer; + +import io.micrometer.core.instrument.*; +import io.micrometer.core.instrument.distribution.DistributionStatisticConfig; +import io.micrometer.core.instrument.distribution.pause.PauseDetector; +import io.micrometer.core.instrument.noop.*; + +import javax.annotation.Nonnull; +import java.util.concurrent.TimeUnit; +import java.util.function.ToDoubleFunction; +import java.util.function.ToLongFunction; + +public class NoopMeterRegistry extends MeterRegistry { + public static final NoopMeterRegistry INSTANCE = new NoopMeterRegistry(); + + private NoopMeterRegistry() { + super(Clock.SYSTEM); + } + + @Override + @Nonnull + protected Gauge newGauge(@Nonnull Meter.Id id, T t, @Nonnull ToDoubleFunction toDoubleFunction) { + return new NoopGauge(id); + } + + @Override + @Nonnull + protected Counter newCounter(@Nonnull Meter.Id id) { + return new NoopCounter(id); + } + + @Override + protected LongTaskTimer newLongTaskTimer(Meter.Id id, DistributionStatisticConfig distributionStatisticConfig) { + return new NoopLongTaskTimer(id); + } + + @Override + protected TimeGauge newTimeGauge(Meter.Id id, T obj, TimeUnit valueFunctionUnit, ToDoubleFunction valueFunction) { + return new NoopTimeGauge(id); + } + + @Override + protected LongTaskTimer newLongTaskTimer(Meter.Id id) { + return new NoopLongTaskTimer(id); + } + + @Override + @Nonnull + protected Timer newTimer(@Nonnull Meter.Id id, @Nonnull DistributionStatisticConfig distributionStatisticConfig, @Nonnull PauseDetector pauseDetector) { + return new NoopTimer(id); + } + + @Override + @Nonnull + protected DistributionSummary newDistributionSummary(@Nonnull Meter.Id id, @Nonnull DistributionStatisticConfig distributionStatisticConfig, double v) { + return new NoopDistributionSummary(id); + } + + @Override + @Nonnull + protected Meter newMeter(@Nonnull Meter.Id id, @Nonnull Meter.Type type, @Nonnull Iterable iterable) { + return new NoopMeter(id); + } + + @Override + @Nonnull + protected FunctionTimer newFunctionTimer(@Nonnull Meter.Id id, @Nonnull T t, + @Nonnull ToLongFunction toLongFunction, + @Nonnull ToDoubleFunction toDoubleFunction, + @Nonnull TimeUnit timeUnit) { + return new NoopFunctionTimer(id); + } + + @Override + @Nonnull + protected FunctionCounter newFunctionCounter(@Nonnull Meter.Id id, @Nonnull T t, @Nonnull ToDoubleFunction toDoubleFunction) { + return new NoopFunctionCounter(id); + } + + @Override + @Nonnull + protected TimeUnit getBaseTimeUnit() { + return TimeUnit.MILLISECONDS; + } + + @Override + @Nonnull + protected DistributionStatisticConfig defaultHistogramConfig() { + return DistributionStatisticConfig.NONE; + } +} diff --git a/jetlinks-components/common-component/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/jetlinks-components/common-component/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 8b4bd189..435d46d5 100644 --- a/jetlinks-components/common-component/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/jetlinks-components/common-component/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1,3 +1,4 @@ org.jetlinks.community.configuration.CommonConfiguration org.jetlinks.community.dictionary.DictionaryConfiguration -org.jetlinks.community.configuration.UiResourceConfiguration \ No newline at end of file +org.jetlinks.community.configuration.UiResourceConfiguration +org.jetlinks.community.micrometer.MeterRegistryConfiguration \ No newline at end of file diff --git a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/MeterTimeSeriesData.java b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/MeterTimeSeriesData.java index 32aad402..66a4fef9 100644 --- a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/MeterTimeSeriesData.java +++ b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/MeterTimeSeriesData.java @@ -16,18 +16,22 @@ package org.jetlinks.community.timeseries.micrometer; import io.micrometer.core.instrument.*; +import org.hswebframework.web.id.IDGenerator; import org.jetlinks.community.timeseries.TimeSeriesData; +import org.jetlinks.core.metadata.Converter; +import org.jetlinks.core.metadata.DataType; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.function.Function; public class MeterTimeSeriesData implements TimeSeriesData { - private Map data = new HashMap<>(); + private final Map data = new HashMap<>(24); - private long timestamp = System.currentTimeMillis(); + private final long timestamp = System.currentTimeMillis(); @Override public long getTimestamp() { @@ -52,9 +56,14 @@ public class MeterTimeSeriesData implements TimeSeriesData { return this; } - public MeterTimeSeriesData write(List tags) { + public MeterTimeSeriesData write(List tags, Function typeGetter) { for (Tag tag : tags) { - data.put(tag.getKey(), tag.getValue()); + DataType type = typeGetter.apply(tag.getKey()); + Object value = tag.getValue(); + if (type instanceof Converter) { + value = ((Converter) type).convert(value); + } + data.put(tag.getKey(), value); } return this; } @@ -84,7 +93,6 @@ public class MeterTimeSeriesData implements TimeSeriesData { } public MeterTimeSeriesData write(FunctionTimer timer) { - data.put("count", timer.count()); data.put("sum", timer.totalTime(TimeUnit.MILLISECONDS)); data.put("mean", timer.mean(TimeUnit.MILLISECONDS)); @@ -126,6 +134,7 @@ public class MeterTimeSeriesData implements TimeSeriesData { public static MeterTimeSeriesData of(Meter meter) { MeterTimeSeriesData data = new MeterTimeSeriesData(); + data.data.put("id", IDGenerator.RANDOM.generate()); data.write(meter); meter.match( data::write, diff --git a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/MeterTimeSeriesMetadata.java b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/MeterTimeSeriesMetadata.java index 40614e9c..2b64564c 100644 --- a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/MeterTimeSeriesMetadata.java +++ b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/MeterTimeSeriesMetadata.java @@ -19,6 +19,7 @@ import lombok.AllArgsConstructor; import lombok.Getter; import org.jetlinks.community.timeseries.TimeSeriesMetadata; import org.jetlinks.community.timeseries.TimeSeriesMetric; +import org.jetlinks.core.metadata.DataType; import org.jetlinks.core.metadata.PropertyMetadata; import org.jetlinks.core.metadata.SimplePropertyMetadata; import org.jetlinks.core.metadata.types.DoubleType; @@ -26,14 +27,13 @@ import org.jetlinks.core.metadata.types.StringType; import java.util.ArrayList; import java.util.List; - +import java.util.Map; +@Getter @AllArgsConstructor(staticName = "of") class MeterTimeSeriesMetadata implements TimeSeriesMetadata { - @Getter private TimeSeriesMetric metric; - @Getter - private List keys; + private Map keys; static final List properties = new ArrayList<>(); @@ -133,11 +133,11 @@ class MeterTimeSeriesMetadata implements TimeSeriesMetadata { public List getProperties() { List metadata = new ArrayList<>(properties); - for (String key : keys) { + for (Map.Entry key : keys.entrySet()) { SimplePropertyMetadata property = new SimplePropertyMetadata(); - property.setId(key); - property.setName(key); - property.setValueType(new StringType()); + property.setId(key.getKey()); + property.setName(key.getKey()); + property.setValueType(key.getValue()); metadata.add(property); } return metadata; diff --git a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/TimeSeriesMeterRegistry.java b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/TimeSeriesMeterRegistry.java index 4511c1ea..f6996132 100644 --- a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/TimeSeriesMeterRegistry.java +++ b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/TimeSeriesMeterRegistry.java @@ -18,68 +18,107 @@ package org.jetlinks.community.timeseries.micrometer; import io.micrometer.core.instrument.Clock; import io.micrometer.core.instrument.step.StepMeterRegistry; import io.micrometer.core.instrument.util.NamedThreadFactory; +import io.netty.util.concurrent.FastThreadLocalThread; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.jetlinks.community.timeseries.TimeSeriesManager; import org.jetlinks.community.timeseries.TimeSeriesMetric; +import org.jetlinks.core.metadata.DataType; +import org.jetlinks.core.metadata.types.StringType; import reactor.core.publisher.Flux; import javax.annotation.Nonnull; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; @Slf4j -public class TimeSeriesMeterRegistry extends StepMeterRegistry { +public class TimeSeriesMeterRegistry extends StepMeterRegistry implements ThreadFactory { TimeSeriesManager timeSeriesManager; TimeSeriesMetric metric; - private static final ThreadFactory DEFAULT_THREAD_FACTORY = new NamedThreadFactory("time-series-metrics-publisher"); + AtomicInteger count = new AtomicInteger(); - private Map customTags; + private final Map customTags; - private List keys = new ArrayList<>(); + private final Map tagDefine; public TimeSeriesMeterRegistry(TimeSeriesManager timeSeriesManager, TimeSeriesMetric metric, TimeSeriesRegistryProperties config, - Map customTags,String ...tagKeys) { + Map customTags, + String... tagKeys) { + this(timeSeriesManager, + metric, + config, + customTags, + Arrays + .stream(tagKeys) + .collect(Collectors.toMap(Function.identity(), ignore -> StringType.GLOBAL))); + } + + public TimeSeriesMeterRegistry(TimeSeriesManager timeSeriesManager, + TimeSeriesMetric metric, + TimeSeriesRegistryProperties config, + Map customTags, + Map tagDefine) { super(new TimeSeriesPropertiesPropertiesConfigAdapter(config), Clock.SYSTEM); this.timeSeriesManager = timeSeriesManager; this.metric = metric; this.customTags = customTags; - keys.addAll(customTags.keySet()); - keys.addAll(Arrays.asList(tagKeys)); - keys.addAll(config.getCustomTagKeys()); - start(DEFAULT_THREAD_FACTORY); + + this.tagDefine = new HashMap<>(tagDefine); + start(this); + } + + + @Override + @SuppressWarnings("all") + public Thread newThread(@Nonnull Runnable r) { + return new FastThreadLocalThread( + r, + "time-series-metric-" + metric.getId() + (count.getAndIncrement() == 0 ? "" : ("-" + count.get()))); + } + + public void reload() { + timeSeriesManager + .registerMetadata(MeterTimeSeriesMetadata.of(metric, tagDefine)) + .subscribe(ignore -> { + }, + error -> log.error("register metric[{}] metadata error", metric.getId(), error)); } @Override public void start(ThreadFactory threadFactory) { super.start(threadFactory); - timeSeriesManager.registerMetadata(MeterTimeSeriesMetadata.of(metric,keys)) - .doOnError(e -> log.error("register metric [{}] metadata error", metric.getId(), e)) - .subscribe((r) -> log.error("register metric [{}] metadata success", metric.getId())); + reload(); } @Override + @SneakyThrows protected void publish() { - timeSeriesManager - .getService(metric) - .save(Flux.fromIterable(this.getMeters()) - .map(meter -> MeterTimeSeriesData.of(meter) - .name(getConventionName(meter.getId())) - .write(customTags) - .write(getConventionTags(meter.getId())))) - .doOnError(e -> log.error("failed to send metrics [{}]",metric.getId(), e)) - .doOnSuccess(nil -> log.debug("success send metrics [{}]",metric.getId())) - .subscribe(); + Flux.fromIterable(this.getMeters()) + .map(meter -> MeterTimeSeriesData + .of(meter) + .name(getConventionName(meter.getId())) + .write(customTags) + .write(getConventionTags(meter.getId()), (key) -> tagDefine.getOrDefault(key, StringType.GLOBAL))) + .flatMap(timeSeriesManager.getService(metric)::commit) + .then() + .toFuture() + .get(); } + @Override + public void close() { + log.info("close micrometer metric [{}]", metric.getId()); + super.close(); + } @Override @Nonnull diff --git a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/TimeSeriesMeterRegistrySupplier.java b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/TimeSeriesMeterRegistrySupplier.java index fdccec0c..7e057450 100644 --- a/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/TimeSeriesMeterRegistrySupplier.java +++ b/jetlinks-components/timeseries-component/src/main/java/org/jetlinks/community/timeseries/micrometer/TimeSeriesMeterRegistrySupplier.java @@ -1,26 +1,12 @@ -/* - * Copyright 2025 JetLinks https://www.jetlinks.cn - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package org.jetlinks.community.timeseries.micrometer; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.config.NamingConvention; -import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import lombok.Getter; import lombok.Setter; +import org.jetlinks.core.metadata.DataType; import org.jetlinks.community.micrometer.MeterRegistrySupplier; +import org.jetlinks.community.micrometer.NoopMeterRegistry; import org.jetlinks.community.timeseries.TimeSeriesManager; import org.jetlinks.community.timeseries.TimeSeriesMetric; @@ -63,14 +49,34 @@ public class TimeSeriesMeterRegistrySupplier implements MeterRegistrySupplier { @Override public MeterRegistry getMeterRegistry(String metric, String... tagKeys) { if (ignore.contains(metric)) { - return new SimpleMeterRegistry(); + return NoopMeterRegistry.INSTANCE; } - TimeSeriesMeterRegistry registry = new TimeSeriesMeterRegistry(timeSeriesManager, + TimeSeriesMeterRegistry registry = new TimeSeriesMeterRegistry( + timeSeriesManager, TimeSeriesMetric.of(metric), metrics.getOrDefault(metric, metrics.get("default")), - tags, tagKeys); + tags(), tagKeys); registry.config().namingConvention(namingSupports.get(naming)); return registry; } + private Map tags() { + return this.tags; + } + + @Override + public MeterRegistry getMeterRegistry(String metric, + Map tagDefine) { + if (ignore.contains(metric)) { + return NoopMeterRegistry.INSTANCE; + } + TimeSeriesMeterRegistry registry = new TimeSeriesMeterRegistry( + timeSeriesManager, + TimeSeriesMetric.of(metric), + metrics.getOrDefault(metric, metrics.get("default")), + tags(), + tagDefine); + registry.config().namingConvention(namingSupports.get(naming)); + return registry; + } }